[FFmpeg-devel] [PATCH] avcodec/pthread_slice: rewrite implementation
Muhammad Faiz
mfcc64 at gmail.com
Sun Jul 9 13:27:42 EEST 2017
Avoid pthread_cond_broadcast, which wakes up all workers. Instead, give each
worker its own mutex/cond. Also let the main thread help run jobs.
Similar to 'avfilter/pthread: rewrite implementation'.
Benchmark on x86_64 with 4 CPUs (2 cores x 2 hyperthreads):
./ffmpeg -threads $threads -thread_type slice -i 10slices.mp4 -f rawvideo -y /dev/null
threads=2:
old: 1m21.492s
new: 1m11.075s
threads=3:
old: 1m12.554s
new: 1m11.771s
threads=4:
old: 1m8.915s
new: 1m4.194s
threads=5:
old: 1m2.417s
new: 57.524s
threads=6:
old: 1m6.710s
new: 1m0.731s
threads=7:
old: 1m5.217s
new: 1m2.166s
threads=8:
old: 1m6.974s
new: 1m2.431s
threads=9:
old: 59.830s
new: 58.162s
threads=10:
old: 58.711s
new: 55.859s
Signed-off-by: Muhammad Faiz <mfcc64 at gmail.com>
---
libavcodec/pthread_slice.c | 217 ++++++++++++++++++++++++++++-----------------
1 file changed, 138 insertions(+), 79 deletions(-)
diff --git a/libavcodec/pthread_slice.c b/libavcodec/pthread_slice.c
index 60f5b78..a290dab 100644
--- a/libavcodec/pthread_slice.c
+++ b/libavcodec/pthread_slice.c
@@ -22,6 +22,7 @@
* @see doc/multithreading.txt
*/
+#include <stdatomic.h>
#include "config.h"
#include "avcodec.h"
@@ -38,21 +39,25 @@
typedef int (action_func)(AVCodecContext *c, void *arg);
typedef int (action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr);
+typedef struct WorkerContext WorkerContext; /* forward declaration; the struct is defined after SliceThreadContext */
+
typedef struct SliceThreadContext {
- pthread_t *workers;
+ AVCodecContext *avctx; ///< codec context passed to the job callbacks
+ WorkerContext *workers; ///< thread_count - 1 worker threads; the caller of execute() runs jobs too
action_func *func;
action_func2 *func2;
void *args;
int *rets;
- int job_count;
- int job_size;
-
- pthread_cond_t last_job_cond;
- pthread_cond_t current_job_cond;
- pthread_mutex_t current_job_lock;
- unsigned current_execute;
- int current_job;
+ unsigned job_count; ///< number of jobs in the current batch
+ unsigned job_size; ///< byte stride between the per-job arguments passed to func
+
+ pthread_mutex_t mutex_user; ///< NOTE(review): initialized, but not locked by the job dispatch path
+ pthread_mutex_t mutex_done; ///< protects worker_done
+ pthread_cond_t cond_done; ///< signalled when a worker completes the last job of a batch
+ atomic_uint current_job; ///< index of the next job to be claimed
+ atomic_uint nb_finished_jobs; ///< number of jobs completed in the current batch
int done;
+ int worker_done; ///< set under mutex_done by the worker that completes the last job
int *entries;
int entries_count;
@@ -61,68 +66,122 @@ typedef struct SliceThreadContext {
pthread_mutex_t *progress_mutex;
} SliceThreadContext;
-static void* attribute_align_arg worker(void *v)
+/**
+ * Per-worker state. A worker holds its mutex whenever it is not blocked in
+ * pthread_cond_wait(), so holding that mutex guarantees the worker is not
+ * running jobs. done is 1 while the worker waits; clearing it and
+ * signalling cond wakes the worker.
+ */
+struct WorkerContext {
+ SliceThreadContext *ctx;
+ pthread_t thread;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ int done;
+};
+
+/**
+ * Claim and run jobs of the current batch until none are left.
+ *
+ * @param threadnr thread number passed to func2; per the execute2()
+ * documentation in avcodec.h it must be below thread_count and
+ * never shared by two jobs running at the same time
+ * @return the number of finished jobs this thread last observed; it
+ * equals job_count only for the thread that completed the final job
+ */
+static unsigned run_jobs_thread(SliceThreadContext *c, int threadnr)
{
- AVCodecContext *avctx = v;
- SliceThreadContext *c = avctx->internal->thread_ctx;
- unsigned last_execute = 0;
- int our_job = c->job_count;
- int thread_count = avctx->thread_count;
- int self_id;
-
- pthread_mutex_lock(&c->current_job_lock);
- self_id = c->current_job++;
- for (;;){
- int ret;
- while (our_job >= c->job_count) {
- if (c->current_job == thread_count + c->job_count)
- pthread_cond_signal(&c->last_job_cond);
-
- while (last_execute == c->current_execute && !c->done)
- pthread_cond_wait(&c->current_job_cond, &c->current_job_lock);
- last_execute = c->current_execute;
- our_job = self_id;
-
- if (c->done) {
- pthread_mutex_unlock(&c->current_job_lock);
- return NULL;
- }
- }
- pthread_mutex_unlock(&c->current_job_lock);
+ unsigned current_job, nb_finished_jobs = 0;
- ret = c->func ? c->func(avctx, (char*)c->args + our_job*c->job_size):
- c->func2(avctx, c->args, our_job, self_id);
+ while (nb_finished_jobs != c->job_count &&
+ (current_job = atomic_fetch_add_explicit(&c->current_job, 1, memory_order_acq_rel)) < c->job_count) {
+ int ret = c->func ? c->func(c->avctx, (char *)c->args + current_job * (size_t) c->job_size)
+ : c->func2(c->avctx, c->args, current_job, threadnr);
if (c->rets)
- c->rets[our_job%c->job_count] = ret;
+ c->rets[current_job] = ret;
+ nb_finished_jobs = atomic_fetch_add_explicit(&c->nb_finished_jobs, 1, memory_order_acq_rel) + 1;
+ }
+
+ return nb_finished_jobs;
+}
+
+/** Run jobs on the thread that called execute()/execute2(), as thread 0. */
+static unsigned run_jobs(SliceThreadContext *c)
+{
+ return run_jobs_thread(c, 0);
+}
+
+/**
+ * Worker thread: sleep until woken, then help run the current batch. The
+ * worker that completes the final job reports it through cond_done.
+ */
+static void* attribute_align_arg worker(void *v)
+{
+ WorkerContext *w = v;
+ SliceThreadContext *c = w->ctx;
+ /* Thread number 0 belongs to the caller of execute(), so worker i uses
+ * i + 1; this keeps func2's threadnr unique among running jobs. */
+ int threadnr = (int)(w - c->workers) + 1;
+
+ pthread_mutex_lock(&w->mutex);
+ /* wake ff_slice_thread_init(), which waits for done to become 1 */
+ pthread_cond_signal(&w->cond);
+
+ while (1) {
+ w->done = 1;
+ while (w->done)
+ pthread_cond_wait(&w->cond, &w->mutex);
+
+ if (c->done) {
+ pthread_mutex_unlock(&w->mutex);
+ return NULL;
+ }
- pthread_mutex_lock(&c->current_job_lock);
- our_job = c->current_job++;
+ if (run_jobs_thread(c, threadnr) == c->job_count) {
+ pthread_mutex_lock(&c->mutex_done);
+ c->worker_done = 1;
+ pthread_cond_signal(&c->cond_done);
+ pthread_mutex_unlock(&c->mutex_done);
+ }
}
}
void ff_slice_thread_free(AVCodecContext *avctx)
{
SliceThreadContext *c = avctx->internal->thread_ctx;
- int i;
+ int i, nb_workers = avctx->thread_count - 1;
+
+ /* Once every worker mutex is held, no worker is running jobs. */
+ for (i = 0; i < nb_workers; i++)
+ pthread_mutex_lock(&c->workers[i].mutex);
- pthread_mutex_lock(&c->current_job_lock);
c->done = 1;
- pthread_cond_broadcast(&c->current_job_cond);
+
for (i = 0; i < c->thread_count; i++)
pthread_cond_broadcast(&c->progress_cond[i]);
- pthread_mutex_unlock(&c->current_job_lock);
- for (i=0; i<avctx->thread_count; i++)
- pthread_join(c->workers[i], NULL);
+ for (i = 0; i < nb_workers; i++) {
+ WorkerContext *w = &c->workers[i];
+ w->done = 0;
+ pthread_cond_signal(&w->cond);
+ pthread_mutex_unlock(&w->mutex);
+ }
+
+ for (i = 0; i < nb_workers; i++) {
+ WorkerContext *w = &c->workers[i];
+ pthread_join(w->thread, NULL);
+ pthread_cond_destroy(&w->cond);
+ pthread_mutex_destroy(&w->mutex);
+ }
for (i = 0; i < c->thread_count; i++) {
pthread_mutex_destroy(&c->progress_mutex[i]);
pthread_cond_destroy(&c->progress_cond[i]);
}
- pthread_mutex_destroy(&c->current_job_lock);
- pthread_cond_destroy(&c->current_job_cond);
- pthread_cond_destroy(&c->last_job_cond);
+ pthread_cond_destroy(&c->cond_done);
+ pthread_mutex_destroy(&c->mutex_done);
+ pthread_mutex_destroy(&c->mutex_user);
av_freep(&c->entries);
av_freep(&c->progress_mutex);
@@ -132,16 +162,19 @@ void ff_slice_thread_free(AVCodecContext *avctx)
av_freep(&avctx->internal->thread_ctx);
}
-static av_always_inline void thread_park_workers(SliceThreadContext *c, int thread_count)
-{
- while (c->current_job != thread_count + c->job_count)
- pthread_cond_wait(&c->last_job_cond, &c->current_job_lock);
- pthread_mutex_unlock(&c->current_job_lock);
-}
-
-static int thread_execute(AVCodecContext *avctx, action_func* func, void *arg, int *ret, int job_count, int job_size)
+/**
+ * Common implementation of execute() and execute2(): run job_count jobs on
+ * the worker threads and on the calling thread. func is used when non-NULL,
+ * otherwise func2.
+ */
+static int thread_execute_internal(AVCodecContext *avctx, action_func *func, action_func2 *func2,
+ void *arg, int *ret, int job_count, int job_size)
{
SliceThreadContext *c = avctx->internal->thread_ctx;
+ int i, nb_workers;
- if (!(avctx->active_thread_type&FF_THREAD_SLICE) || avctx->thread_count <= 1)
- return avcodec_default_execute(avctx, func, arg, ret, job_count, job_size);
+ /* Single-threaded fallback: execute2() passes func == NULL, so it must go
+ * through the execute2 helper instead of calling a NULL func. */
+ if (!(avctx->active_thread_type&FF_THREAD_SLICE) || avctx->thread_count <= 1)
+ return func ? avcodec_default_execute(avctx, func, arg, ret, job_count, job_size)
+ : avcodec_default_execute2(avctx, func2, arg, ret, job_count);
@@ -149,32 +174,61 @@ static int thread_execute(AVCodecContext *avctx, action_func* func, void *arg, i
if (job_count <= 0)
return 0;
- pthread_mutex_lock(&c->current_job_lock);
+ nb_workers = FFMIN(job_count - 1, avctx->thread_count - 1);
+
+ /* Lock every worker, not only the nb_workers that will be woken: a
+ * worker woken by an earlier call may still be in its job loop (it
+ * releases its mutex only once it waits again), and it must not see the
+ * job state below while it is being reset. */
+ for (i = 0; i < avctx->thread_count - 1; i++)
+ pthread_mutex_lock(&c->workers[i].mutex);
- c->current_job = avctx->thread_count;
+ atomic_store_explicit(&c->current_job, 0, memory_order_relaxed);
+ atomic_store_explicit(&c->nb_finished_jobs, 0, memory_order_relaxed);
c->job_count = job_count;
c->job_size = job_size;
c->args = arg;
c->func = func;
+ c->func2 = func2;
c->rets = ret;
- c->current_execute++;
- pthread_cond_broadcast(&c->current_job_cond);
- thread_park_workers(c, avctx->thread_count);
+ /* Wake only the nb_workers workers needed for the jobs this thread does
+ * not take itself, but release every mutex locked above. */
+ for (i = 0; i < avctx->thread_count - 1; i++) {
+ WorkerContext *w = &c->workers[i];
+ if (i < nb_workers) {
+ w->done = 0;
+ pthread_cond_signal(&w->cond);
+ }
+ pthread_mutex_unlock(&w->mutex);
+ }
+
+ /* Unless this thread completed the final job, wait for the worker that did. */
+ if (run_jobs(c) != c->job_count) {
+ pthread_mutex_lock(&c->mutex_done);
+ while (!c->worker_done)
+ pthread_cond_wait(&c->cond_done, &c->mutex_done);
+ c->worker_done = 0;
+ pthread_mutex_unlock(&c->mutex_done);
+ }
return 0;
}
+/** AVCodecContext.execute() implementation; see thread_execute_internal(). */
+static int thread_execute(AVCodecContext *avctx, action_func *func, void *arg, int *ret, int job_count, int job_size)
+{
+ return thread_execute_internal(avctx, func, NULL, arg, ret, job_count, job_size);
+}
+
static int thread_execute2(AVCodecContext *avctx, action_func2* func2, void *arg, int *ret, int job_count)
{
- SliceThreadContext *c = avctx->internal->thread_ctx;
- c->func2 = func2;
- return thread_execute(avctx, NULL, arg, ret, job_count, 0);
+ return thread_execute_internal(avctx, NULL, func2, arg, ret, job_count, 0);
}
int ff_slice_thread_init(AVCodecContext *avctx)
{
- int i;
+ int i, nb_workers;
SliceThreadContext *c;
int thread_count = avctx->thread_count;
@@ -208,31 +252,46 @@ int ff_slice_thread_init(AVCodecContext *avctx)
if (!c)
return -1;
- c->workers = av_mallocz_array(thread_count, sizeof(pthread_t));
+ nb_workers = thread_count - 1; /* the thread calling execute() runs jobs as well */
+ c->workers = av_mallocz_array(nb_workers, sizeof(*c->workers));
if (!c->workers) {
av_free(c);
return -1;
}
avctx->internal->thread_ctx = c;
- c->current_job = 0;
+ c->avctx = avctx;
+ pthread_mutex_init(&c->mutex_user, NULL); /* NOTE(review): not locked by the execute()/execute2() path */
+ pthread_mutex_init(&c->mutex_done, NULL);
+ pthread_cond_init(&c->cond_done, NULL);
+ atomic_init(&c->current_job, 0);
+ atomic_init(&c->nb_finished_jobs, 0);
c->job_count = 0;
c->job_size = 0;
c->done = 0;
- pthread_cond_init(&c->current_job_cond, NULL);
- pthread_cond_init(&c->last_job_cond, NULL);
- pthread_mutex_init(&c->current_job_lock, NULL);
- pthread_mutex_lock(&c->current_job_lock);
- for (i=0; i<thread_count; i++) {
- if(pthread_create(&c->workers[i], NULL, worker, avctx)) {
- avctx->thread_count = i;
- pthread_mutex_unlock(&c->current_job_lock);
+ c->worker_done = 0;
+
+ for (i = 0; i < nb_workers; i++) {
+ WorkerContext *w = &c->workers[i];
+
+ w->ctx = c;
+ pthread_mutex_init(&w->mutex, NULL);
+ pthread_cond_init(&w->cond, NULL);
+ pthread_mutex_lock(&w->mutex);
+ w->done = 0; /* the worker sets this to 1 once it is waiting for work */
+ if (pthread_create(&w->thread, NULL, worker, w)) {
+ avctx->thread_count = i + 1; /* this thread plus the i workers already started */
+ pthread_mutex_unlock(&w->mutex);
+ pthread_cond_destroy(&w->cond);
+ pthread_mutex_destroy(&w->mutex);
ff_thread_free(avctx);
return -1;
}
- }
- thread_park_workers(c, thread_count);
+ while (!w->done) /* wait until the worker has started and is idle */
+ pthread_cond_wait(&w->cond, &w->mutex);
+ pthread_mutex_unlock(&w->mutex);
+ }
avctx->execute = thread_execute;
avctx->execute2 = thread_execute2;
--
2.9.3
More information about the ffmpeg-devel
mailing list