[FFmpeg-devel] [PATCH v2] avcodec/pthread_slice: rewrite implementation

wm4 nfxjfg at googlemail.com
Mon Jul 10 12:25:23 EEST 2017


On Sun,  9 Jul 2017 23:26:54 +0700
Muhammad Faiz <mfcc64 at gmail.com> wrote:

> Avoid pthread_cond_broadcast, which wakes up all workers. Make each of them
> use a distinct mutex/cond. Also let the main thread help run jobs, but still
> allocate thread_count workers. The last worker is currently unused; its role
> is emulated by the main thread.
> Similar to 'avfilter/pthread: rewrite implementation'
> 
> Benchmark on x86_64 with 4 CPUs (2 cores x 2 hyperthreads)
> ./ffmpeg -threads $threads -thread_type slice -i 10slices.mp4 -f rawvideo -y /dev/null
> threads=2:
> old: 1m15.888s
> new:  1m5.710s
> threads=3:
> old:  1m6.480s
> new:  1m5.260s
> threads=4:
> old:  1m2.292s
> new:   59.677s
> threads=5:
> old:   58.939s
> new:   55.166s
> 
> Signed-off-by: Muhammad Faiz <mfcc64 at gmail.com>
> ---
>  libavcodec/pthread_slice.c | 219 +++++++++++++++++++++++++++++----------------
>  1 file changed, 142 insertions(+), 77 deletions(-)
> 
> diff --git a/libavcodec/pthread_slice.c b/libavcodec/pthread_slice.c
> index 60f5b78..7223205 100644
> --- a/libavcodec/pthread_slice.c
> +++ b/libavcodec/pthread_slice.c
> @@ -22,6 +22,7 @@
>   * @see doc/multithreading.txt
>   */
>  
> +#include <stdatomic.h>
>  #include "config.h"
>  
>  #include "avcodec.h"
> @@ -38,21 +39,26 @@
>  typedef int (action_func)(AVCodecContext *c, void *arg);
>  typedef int (action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr);
>  
> +typedef struct WorkerContext WorkerContext;
> +
>  typedef struct SliceThreadContext {
> -    pthread_t *workers;
> +    AVCodecContext *avctx;
> +    WorkerContext *workers;
>      action_func *func;
>      action_func2 *func2;
>      void *args;
>      int *rets;
> -    int job_count;
> -    int job_size;
> -
> -    pthread_cond_t last_job_cond;
> -    pthread_cond_t current_job_cond;
> -    pthread_mutex_t current_job_lock;
> -    unsigned current_execute;
> -    int current_job;
> +    unsigned job_count;
> +    unsigned job_size;
> +
> +    pthread_mutex_t mutex_user;
> +    pthread_mutex_t mutex_done;
> +    pthread_cond_t cond_done;
> +    atomic_uint first_job;
> +    atomic_uint current_job;
> +    atomic_uint nb_finished_jobs;
>      int done;
> +    int worker_done;
>  
>      int *entries;
>      int entries_count;
> @@ -61,42 +67,55 @@ typedef struct SliceThreadContext {
>      pthread_mutex_t *progress_mutex;
>  } SliceThreadContext;
>  
> -static void* attribute_align_arg worker(void *v)
> +struct WorkerContext {
> +    SliceThreadContext  *ctx;
> +    pthread_t           thread;
> +    pthread_mutex_t     mutex;
> +    pthread_cond_t      cond;
> +    int                 done;
> +};
> +
> +static unsigned run_jobs(SliceThreadContext *c)
>  {
> -    AVCodecContext *avctx = v;
> -    SliceThreadContext *c = avctx->internal->thread_ctx;
> -    unsigned last_execute = 0;
> -    int our_job = c->job_count;
> -    int thread_count = avctx->thread_count;
> -    int self_id;
> -
> -    pthread_mutex_lock(&c->current_job_lock);
> -    self_id = c->current_job++;
> -    for (;;){
> -        int ret;
> -        while (our_job >= c->job_count) {
> -            if (c->current_job == thread_count + c->job_count)
> -                pthread_cond_signal(&c->last_job_cond);
> -
> -            while (last_execute == c->current_execute && !c->done)
> -                pthread_cond_wait(&c->current_job_cond, &c->current_job_lock);
> -            last_execute = c->current_execute;
> -            our_job = self_id;
> -
> -            if (c->done) {
> -                pthread_mutex_unlock(&c->current_job_lock);
> -                return NULL;
> -            }
> -        }
> -        pthread_mutex_unlock(&c->current_job_lock);
> +    unsigned current_job, first_job, nb_finished_jobs;
> +
> +    current_job = first_job = atomic_fetch_add_explicit(&c->first_job, 1, memory_order_acq_rel);
>  
> -        ret = c->func ? c->func(avctx, (char*)c->args + our_job*c->job_size):
> -                                c->func2(avctx, c->args, our_job, self_id);
> +    do {
> +        int ret = c->func ? c->func(c->avctx, (char *)c->args + current_job * (size_t) c->job_size)
> +                          : c->func2(c->avctx, c->args, current_job, first_job);
>          if (c->rets)
> -            c->rets[our_job%c->job_count] = ret;
> +            c->rets[current_job] = ret;
> +        nb_finished_jobs = atomic_fetch_add_explicit(&c->nb_finished_jobs, 1, memory_order_relaxed) + 1;
> +    } while ((current_job = atomic_fetch_add_explicit(&c->current_job, 1, memory_order_acq_rel)) < c->job_count);
>  
> -        pthread_mutex_lock(&c->current_job_lock);
> -        our_job = c->current_job++;
> +    return nb_finished_jobs;
> +}
> +
> +static void* attribute_align_arg worker(void *v)
> +{
> +    WorkerContext *w = v;
> +    SliceThreadContext *c = w->ctx;
> +
> +    pthread_mutex_lock(&w->mutex);
> +    pthread_cond_signal(&w->cond);
> +
> +    while (1) {
> +        w->done = 1;
> +        while (w->done)
> +            pthread_cond_wait(&w->cond, &w->mutex);
> +
> +        if (c->done) {
> +            pthread_mutex_unlock(&w->mutex);
> +            return NULL;
> +        }
> +
> +        if (run_jobs(c) == c->job_count) {
> +            pthread_mutex_lock(&c->mutex_done);
> +            c->worker_done = 1;
> +            pthread_cond_signal(&c->cond_done);
> +            pthread_mutex_unlock(&c->mutex_done);
> +        }
>      }
>  }
>  
> @@ -105,24 +124,36 @@ void ff_slice_thread_free(AVCodecContext *avctx)
>      SliceThreadContext *c = avctx->internal->thread_ctx;
>      int i;
>  
> -    pthread_mutex_lock(&c->current_job_lock);
> +    for (i = 0; i < avctx->thread_count; i++)
> +        pthread_mutex_lock(&c->workers[i].mutex);
> +
>      c->done = 1;
> -    pthread_cond_broadcast(&c->current_job_cond);
> +
>      for (i = 0; i < c->thread_count; i++)
>          pthread_cond_broadcast(&c->progress_cond[i]);
> -    pthread_mutex_unlock(&c->current_job_lock);
>  
> -    for (i=0; i<avctx->thread_count; i++)
> -         pthread_join(c->workers[i], NULL);
> +    for (i = 0; i < avctx->thread_count; i++) {
> +        WorkerContext *w = &c->workers[i];
> +        w->done = 0;
> +        pthread_cond_signal(&w->cond);
> +        pthread_mutex_unlock(&w->mutex);
> +    }
> +
> +    for (i = 0; i < avctx->thread_count; i++) {
> +        WorkerContext *w = &c->workers[i];
> +        pthread_join(w->thread, NULL);
> +        pthread_cond_destroy(&w->cond);
> +        pthread_mutex_destroy(&w->mutex);
> +    }
>  
>      for (i = 0; i < c->thread_count; i++) {
>          pthread_mutex_destroy(&c->progress_mutex[i]);
>          pthread_cond_destroy(&c->progress_cond[i]);
>      }
>  
> -    pthread_mutex_destroy(&c->current_job_lock);
> -    pthread_cond_destroy(&c->current_job_cond);
> -    pthread_cond_destroy(&c->last_job_cond);
> +    pthread_cond_destroy(&c->cond_done);
> +    pthread_mutex_destroy(&c->mutex_done);

> +    pthread_mutex_lock(&c->mutex_user);

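This looks suspicious. Does it really acquire the lock and keep it
locked after leaving this deinit function? In the hunks quoted here,
mutex_user is only initialized in ff_slice_thread_init() and never used
otherwise. The context is also freed right afterwards with the mutex
still held. Was this meant to be pthread_mutex_destroy() instead?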

>  
>      av_freep(&c->entries);
>      av_freep(&c->progress_mutex);
> @@ -132,16 +163,11 @@ void ff_slice_thread_free(AVCodecContext *avctx)
>      av_freep(&avctx->internal->thread_ctx);
>  }
>  
> -static av_always_inline void thread_park_workers(SliceThreadContext *c, int thread_count)
> -{
> -    while (c->current_job != thread_count + c->job_count)
> -        pthread_cond_wait(&c->last_job_cond, &c->current_job_lock);
> -    pthread_mutex_unlock(&c->current_job_lock);
> -}
> -
> -static int thread_execute(AVCodecContext *avctx, action_func* func, void *arg, int *ret, int job_count, int job_size)
> +static int thread_execute_internal(AVCodecContext *avctx, action_func *func, action_func2 *func2,
> +                                   void *arg, int *ret, int job_count, int job_size)
>  {
>      SliceThreadContext *c = avctx->internal->thread_ctx;
> +    int i, nb_workers;
>  
>      if (!(avctx->active_thread_type&FF_THREAD_SLICE) || avctx->thread_count <= 1)
>          return avcodec_default_execute(avctx, func, arg, ret, job_count, job_size);
> @@ -149,27 +175,49 @@ static int thread_execute(AVCodecContext *avctx, action_func* func, void *arg, i
>      if (job_count <= 0)
>          return 0;
>  
> -    pthread_mutex_lock(&c->current_job_lock);
> +    // last worker is unused
> +    nb_workers = FFMIN(job_count - 1, avctx->thread_count - 1);
> +
> +    for (i = 0; i < nb_workers; i++)
> +        pthread_mutex_lock(&c->workers[i].mutex);

This looks suspicious... does it lock all workers in the "hot" path?
Wouldn't this cause a lot of contention? And why mix this with atomic
accesses?

>  
> -    c->current_job = avctx->thread_count;
> +    atomic_store_explicit(&c->first_job, 0, memory_order_relaxed);
> +    atomic_store_explicit(&c->current_job, avctx->thread_count, memory_order_relaxed);
> +    atomic_store_explicit(&c->nb_finished_jobs, 0, memory_order_relaxed);
>      c->job_count = job_count;
>      c->job_size = job_size;
>      c->args = arg;
>      c->func = func;
> +    c->func2 = func2;
>      c->rets = ret;
> -    c->current_execute++;
> -    pthread_cond_broadcast(&c->current_job_cond);
>  
> -    thread_park_workers(c, avctx->thread_count);
> +    for (i = 0; i < nb_workers; i++) {
> +        WorkerContext *w = &c->workers[i];
> +        w->done = 0;
> +        pthread_cond_signal(&w->cond);
> +        pthread_mutex_unlock(&w->mutex);
> +    }
> +
> +    // emulate the last worker, no need to wait if all jobs is complete
> +    if (run_jobs(c) != c->job_count) {
> +        pthread_mutex_lock(&c->mutex_done);
> +        while (!c->worker_done)
> +            pthread_cond_wait(&c->cond_done, &c->mutex_done);
> +        c->worker_done = 0;
> +        pthread_mutex_unlock(&c->mutex_done);
> +    }
>  
>      return 0;
>  }
>  
> +static int thread_execute(AVCodecContext *avctx, action_func *func, void *arg, int *ret, int job_count, int job_size)
> +{
> +    return thread_execute_internal(avctx, func, NULL, arg, ret, job_count, job_size);
> +}
> +
>  static int thread_execute2(AVCodecContext *avctx, action_func2* func2, void *arg, int *ret, int job_count)
>  {
> -    SliceThreadContext *c = avctx->internal->thread_ctx;
> -    c->func2 = func2;
> -    return thread_execute(avctx, NULL, arg, ret, job_count, 0);
> +    return thread_execute_internal(avctx, NULL, func2, arg, ret, job_count, 0);
>  }
>  
>  int ff_slice_thread_init(AVCodecContext *avctx)
> @@ -208,31 +256,48 @@ int ff_slice_thread_init(AVCodecContext *avctx)
>      if (!c)
>          return -1;
>  
> -    c->workers = av_mallocz_array(thread_count, sizeof(pthread_t));
> +    // allocate thread_count workers, but currently last worker is unused, emulated by main thread
> +    // anticipate when main thread needs to do something
> +    c->workers = av_mallocz_array(thread_count, sizeof(*c->workers));
>      if (!c->workers) {
>          av_free(c);
>          return -1;
>      }
>  
>      avctx->internal->thread_ctx = c;
> -    c->current_job = 0;
> +    c->avctx = avctx;
> +    pthread_mutex_init(&c->mutex_user, NULL);
> +    pthread_mutex_init(&c->mutex_done, NULL);
> +    pthread_cond_init(&c->cond_done, NULL);
> +    atomic_init(&c->first_job, 0);
> +    atomic_init(&c->current_job, 0);
> +    atomic_init(&c->nb_finished_jobs, 0);
>      c->job_count = 0;
>      c->job_size = 0;
>      c->done = 0;
> -    pthread_cond_init(&c->current_job_cond, NULL);
> -    pthread_cond_init(&c->last_job_cond, NULL);
> -    pthread_mutex_init(&c->current_job_lock, NULL);
> -    pthread_mutex_lock(&c->current_job_lock);
> -    for (i=0; i<thread_count; i++) {
> -        if(pthread_create(&c->workers[i], NULL, worker, avctx)) {
> -           avctx->thread_count = i;
> -           pthread_mutex_unlock(&c->current_job_lock);
> +    c->worker_done = 0;
> +
> +    for (i = 0; i < thread_count; i++) {
> +        WorkerContext *w = &c->workers[i];
> +
> +        w->ctx = c;
> +        pthread_mutex_init(&w->mutex, NULL);
> +        pthread_cond_init(&w->cond, NULL);
> +        pthread_mutex_lock(&w->mutex);
> +        w->done = 0;
> +        if (pthread_create(&w->thread, NULL, worker, w)) {
> +           avctx->thread_count = i + 1;
> +           pthread_mutex_unlock(&w->mutex);
> +           pthread_cond_destroy(&w->cond);
> +           pthread_mutex_destroy(&w->mutex);
>             ff_thread_free(avctx);
>             return -1;
>          }
> -    }
>  
> -    thread_park_workers(c, thread_count);
> +        while (!w->done)
> +            pthread_cond_wait(&w->cond, &w->mutex);
> +        pthread_mutex_unlock(&w->mutex);
> +    }
>  
>      avctx->execute = thread_execute;
>      avctx->execute2 = thread_execute2;



More information about the ffmpeg-devel mailing list