[FFmpeg-devel] [RFC] execute2/simplify dnxhdenc threading

Reimar Döffinger Reimar.Doeffinger
Sat Oct 10 17:08:35 CEST 2009


Hello,
this is my second attempt at simplifying dnxhdenc. It also changes the
code so that the number of jobs/tasks can exceed the number of threads,
which avoids performance issues when not all threads finish at the same speed.
Obviously execute2 must be implemented for the other thread
implementations, too, and dnxhdenc can probably be further simplified
(in particular, I doubt that setting start_mb_y and end_mb_y is actually
necessary), but I wanted to hear if this is on the right track before
I take it further.
I haven't tested it very well, since I only just realized that I had been
compiling FFmpeg without thread support all along. If someone volunteers to
benchmark the speeds of the different implementations, they are more than
welcome.
-------------- next part --------------
Index: libavcodec/options.c
===================================================================
--- libavcodec/options.c	(revision 20198)
+++ libavcodec/options.c	(working copy)
@@ -433,6 +433,7 @@
     s->release_buffer= avcodec_default_release_buffer;
     s->get_format= avcodec_default_get_format;
     s->execute= avcodec_default_execute;
+    s->execute2= avcodec_default_execute2;
     s->sample_aspect_ratio= (AVRational){0,1};
     s->pix_fmt= PIX_FMT_NONE;
     s->sample_fmt= SAMPLE_FMT_S16; // FIXME: set to NONE
Index: libavcodec/avcodec.h
===================================================================
--- libavcodec/avcodec.h	(revision 20198)
+++ libavcodec/avcodec.h	(working copy)
@@ -30,7 +30,7 @@
 #include "libavutil/avutil.h"
 
 #define LIBAVCODEC_VERSION_MAJOR 52
-#define LIBAVCODEC_VERSION_MINOR 36
+#define LIBAVCODEC_VERSION_MINOR 37
 #define LIBAVCODEC_VERSION_MICRO  0
 
 #define LIBAVCODEC_VERSION_INT  AV_VERSION_INT(LIBAVCODEC_VERSION_MAJOR, \
@@ -2526,6 +2526,18 @@
      * - decoding: Set by libavcodec
      */
     enum AVChromaLocation chroma_sample_location;
+
+    /**
+     * The codec may call this to execute several independent things.
+     * It will return only after finishing all tasks.
+     * The user may replace this with some multithreaded implementation,
+     * the default implementation will execute the parts serially.
+     * @param count the number of things to execute
+     * @param arg2 argument passed unchanged to func
+     * - encoding: Set by libavcodec, user can override.
+     * - decoding: Set by libavcodec, user can override.
+     */
+    int (*execute2)(struct AVCodecContext *c, int (*func)(struct AVCodecContext *c2, void *arg, int jobnr, int threadnr), void *arg2, int *ret, int count);
 } AVCodecContext;
 
 /**
@@ -3154,6 +3166,7 @@
 void avcodec_thread_free(AVCodecContext *s);
 int avcodec_thread_execute(AVCodecContext *s, int (*func)(AVCodecContext *c2, void *arg2),void *arg, int *ret, int count, int size);
 int avcodec_default_execute(AVCodecContext *c, int (*func)(AVCodecContext *c2, void *arg2),void *arg, int *ret, int count, int size);
+int avcodec_default_execute2(AVCodecContext *c, int (*func)(AVCodecContext *c2, void *arg2, int, int),void *arg, int *ret, int count);
 //FIXME func typedef
 
 /**
Index: libavcodec/dnxhdenc.c
===================================================================
--- libavcodec/dnxhdenc.c	(revision 20198)
+++ libavcodec/dnxhdenc.c	(working copy)
@@ -204,6 +204,7 @@
         return -1;
 
     FF_ALLOCZ_OR_GOTO(ctx->m.avctx, ctx->slice_size, ctx->m.mb_height*sizeof(uint32_t), fail);
+    FF_ALLOCZ_OR_GOTO(ctx->m.avctx, ctx->slice_offs, ctx->m.mb_height*sizeof(uint32_t), fail);
     FF_ALLOCZ_OR_GOTO(ctx->m.avctx, ctx->mb_bits,    ctx->m.mb_num   *sizeof(uint16_t), fail);
     FF_ALLOCZ_OR_GOTO(ctx->m.avctx, ctx->mb_qscale,  ctx->m.mb_num   *sizeof(uint8_t) , fail);
 
@@ -211,7 +212,7 @@
     ctx->frame.pict_type = FF_I_TYPE;
     ctx->m.avctx->coded_frame = &ctx->frame;
 
-    if (avctx->thread_count > MAX_THREADS || (avctx->thread_count > ctx->m.mb_height)) {
+    if (avctx->thread_count > MAX_THREADS) {
         av_log(avctx, AV_LOG_ERROR, "too many threads\n");
         return -1;
     }
@@ -222,11 +223,6 @@
         memcpy(ctx->thread[i], ctx, sizeof(DNXHDEncContext));
     }
 
-    for (i = 0; i < avctx->thread_count; i++) {
-        ctx->thread[i]->m.start_mb_y = (ctx->m.mb_height*(i  ) + avctx->thread_count/2) / avctx->thread_count;
-        ctx->thread[i]->m.end_mb_y   = (ctx->m.mb_height*(i+1) + avctx->thread_count/2) / avctx->thread_count;
-    }
-
     return 0;
  fail: //for FF_ALLOCZ_OR_GOTO
     return -1;
@@ -397,13 +393,15 @@
     }
 }
 
-static int dnxhd_calc_bits_thread(AVCodecContext *avctx, void *arg)
+static int dnxhd_calc_bits_thread(AVCodecContext *avctx, void *arg, int jobnr, int threadnr)
 {
-    DNXHDEncContext *ctx = *(void**)arg;
-    int mb_y, mb_x;
-    int qscale = ctx->thread[0]->qscale;
+    DNXHDEncContext *ctx = avctx->priv_data;
+    int mb_y = jobnr, mb_x;
+    int qscale = ctx->qscale;
+    ctx = ctx->thread[threadnr];
+    ctx->m.start_mb_y = jobnr;
+    ctx->m.end_mb_y   = jobnr + 1;
 
-    for (mb_y = ctx->m.start_mb_y; mb_y < ctx->m.end_mb_y; mb_y++) {
         ctx->m.last_dc[0] =
         ctx->m.last_dc[1] =
         ctx->m.last_dc[2] = 1024;
@@ -443,16 +441,18 @@
             ctx->mb_rc[qscale][mb].ssd = ssd;
             ctx->mb_rc[qscale][mb].bits = ac_bits+dc_bits+12+8*ctx->vlc_bits[0];
         }
-    }
     return 0;
 }
 
-static int dnxhd_encode_thread(AVCodecContext *avctx, void *arg)
+static int dnxhd_encode_thread(AVCodecContext *avctx, void *arg, int jobnr, int threadnr)
 {
-    DNXHDEncContext *ctx = *(void**)arg;
-    int mb_y, mb_x;
+    DNXHDEncContext *ctx = avctx->priv_data;
+    int mb_y = jobnr, mb_x;
+    ctx = ctx->thread[threadnr];
+    ctx->m.start_mb_y = jobnr;
+    ctx->m.end_mb_y   = jobnr + 1;
+    init_put_bits(&ctx->m.pb, (uint8_t *)arg + 640 + ctx->slice_offs[jobnr], ctx->slice_size[jobnr]);
 
-    for (mb_y = ctx->m.start_mb_y; mb_y < ctx->m.end_mb_y; mb_y++) {
         ctx->m.last_dc[0] =
         ctx->m.last_dc[1] =
         ctx->m.last_dc[2] = 1024;
@@ -477,18 +477,17 @@
         }
         if (put_bits_count(&ctx->m.pb)&31)
             put_bits(&ctx->m.pb, 32-(put_bits_count(&ctx->m.pb)&31), 0);
-    }
     flush_put_bits(&ctx->m.pb);
     return 0;
 }
 
-static void dnxhd_setup_threads_slices(DNXHDEncContext *ctx, uint8_t *buf)
+static void dnxhd_setup_threads_slices(DNXHDEncContext *ctx)
 {
     int mb_y, mb_x;
-    int i, offset = 0;
-    for (i = 0; i < ctx->m.avctx->thread_count; i++) {
-        int thread_size = 0;
-        for (mb_y = ctx->thread[i]->m.start_mb_y; mb_y < ctx->thread[i]->m.end_mb_y; mb_y++) {
+    int offset = 0;
+    for (mb_y = 0; mb_y < ctx->m.mb_height; mb_y++) {
+        int thread_size;
+        ctx->slice_offs[mb_y] = offset;
             ctx->slice_size[mb_y] = 0;
             for (mb_x = 0; mb_x < ctx->m.mb_width; mb_x++) {
                 unsigned mb = mb_y * ctx->m.mb_width + mb_x;
@@ -496,18 +495,18 @@
             }
             ctx->slice_size[mb_y] = (ctx->slice_size[mb_y]+31)&~31;
             ctx->slice_size[mb_y] >>= 3;
-            thread_size += ctx->slice_size[mb_y];
-        }
-        init_put_bits(&ctx->thread[i]->m.pb, buf + 640 + offset, thread_size);
+            thread_size = ctx->slice_size[mb_y];
         offset += thread_size;
     }
 }
 
-static int dnxhd_mb_var_thread(AVCodecContext *avctx, void *arg)
+static int dnxhd_mb_var_thread(AVCodecContext *avctx, void *arg, int jobnr, int threadnr)
 {
-    DNXHDEncContext *ctx = *(void**)arg;
-    int mb_y, mb_x;
-    for (mb_y = ctx->m.start_mb_y; mb_y < ctx->m.end_mb_y; mb_y++) {
+    DNXHDEncContext *ctx = avctx->priv_data;
+    int mb_y = jobnr, mb_x;
+    ctx = ctx->thread[threadnr];
+    ctx->m.start_mb_y = jobnr;
+    ctx->m.end_mb_y   = jobnr + 1;
         for (mb_x = 0; mb_x < ctx->m.mb_width; mb_x++) {
             unsigned mb  = mb_y * ctx->m.mb_width + mb_x;
             uint8_t *pix = ctx->thread[0]->src[0] + ((mb_y<<4) * ctx->m.linesize) + (mb_x<<4);
@@ -516,7 +515,6 @@
             ctx->mb_cmp[mb].value = varc;
             ctx->mb_cmp[mb].mb = mb;
         }
-    }
     return 0;
 }
 
@@ -528,7 +526,7 @@
 
     for (q = 1; q < avctx->qmax; q++) {
         ctx->qscale = q;
-        avctx->execute(avctx, dnxhd_calc_bits_thread, &ctx->thread[0], NULL, avctx->thread_count, sizeof(void*));
+        avctx->execute2(avctx, dnxhd_calc_bits_thread, NULL, NULL, ctx->m.mb_height);
     }
     up_step = down_step = 2<<LAMBDA_FRAC_BITS;
     lambda = ctx->lambda;
@@ -608,7 +606,7 @@
         bits = 0;
         ctx->qscale = qscale;
         // XXX avoid recalculating bits
-        ctx->m.avctx->execute(ctx->m.avctx, dnxhd_calc_bits_thread, &ctx->thread[0], NULL, ctx->m.avctx->thread_count, sizeof(void*));
+        ctx->m.avctx->execute2(ctx->m.avctx, dnxhd_calc_bits_thread, NULL, NULL, ctx->m.mb_height);
         for (y = 0; y < ctx->m.mb_height; y++) {
             for (x = 0; x < ctx->m.mb_width; x++)
                 bits += ctx->mb_rc[qscale][y*ctx->m.mb_width+x].bits;
@@ -732,7 +730,7 @@
     }
     if (!ret) {
         if (RC_VARIANCE)
-            avctx->execute(avctx, dnxhd_mb_var_thread, &ctx->thread[0], NULL, avctx->thread_count, sizeof(void*));
+            avctx->execute2(avctx, dnxhd_mb_var_thread, NULL, NULL, ctx->m.mb_height);
         radix_sort(ctx->mb_cmp, ctx->m.mb_num);
         for (x = 0; x < ctx->m.mb_num && max_bits > ctx->frame_bits; x++) {
             int mb = ctx->mb_cmp[x].mb;
@@ -795,7 +793,7 @@
         return -1;
     }
 
-    dnxhd_setup_threads_slices(ctx, buf);
+    dnxhd_setup_threads_slices(ctx);
 
     offset = 0;
     for (i = 0; i < ctx->m.mb_height; i++) {
@@ -804,7 +802,7 @@
         assert(!(ctx->slice_size[i] & 3));
     }
 
-    avctx->execute(avctx, dnxhd_encode_thread, &ctx->thread[0], NULL, avctx->thread_count, sizeof(void*));
+    avctx->execute2(avctx, dnxhd_encode_thread, buf, NULL, ctx->m.mb_height);
 
     assert(640 + offset + 4 <= ctx->cid_table->coding_unit_size);
     memset(buf + 640 + offset, 0, ctx->cid_table->coding_unit_size - 4 - offset - 640);
@@ -840,6 +838,7 @@
     av_freep(&ctx->mb_rc);
     av_freep(&ctx->mb_cmp);
     av_freep(&ctx->slice_size);
+    av_freep(&ctx->slice_offs);
 
     av_freep(&ctx->qmatrix_c);
     av_freep(&ctx->qmatrix_l);
Index: libavcodec/dnxhdenc.h
===================================================================
--- libavcodec/dnxhdenc.h	(revision 20198)
+++ libavcodec/dnxhdenc.h	(working copy)
@@ -46,6 +46,7 @@
     const CIDEntry *cid_table;
     uint8_t *msip; ///< Macroblock Scan Indexes Payload
     uint32_t *slice_size;
+    uint32_t *slice_offs;
 
     struct DNXHDEncContext *thread[MAX_THREADS];
 
Index: libavcodec/utils.c
===================================================================
--- libavcodec/utils.c	(revision 20198)
+++ libavcodec/utils.c	(working copy)
@@ -414,6 +414,16 @@
     return 0;
 }
 
+int avcodec_default_execute2(AVCodecContext *c, int (*func)(AVCodecContext *c2, void *arg2, int jobnr, int threadnr),void *arg, int *ret, int count){
+    int job = 0; /* serial fallback: jobs run in order, each reported as thread 0 */
+    while (job < count) {
+        int status = func(c, arg, job, 0);
+        if (ret) ret[job] = status; /* ret may be NULL when results are not wanted */
+        job++;
+    }
+    return 0; /* per-job results are only available through ret[] */
+}
+
 enum PixelFormat avcodec_default_get_format(struct AVCodecContext *s, const enum PixelFormat *fmt){
     while (*fmt != PIX_FMT_NONE && ff_is_hwaccel_pix_fmt(*fmt))
         ++fmt;
Index: libavcodec/pthread.c
===================================================================
--- libavcodec/pthread.c	(revision 20198)
+++ libavcodec/pthread.c	(working copy)
@@ -26,10 +26,12 @@
 #include "avcodec.h"
 
 typedef int (action_func)(AVCodecContext *c, void *arg);
+typedef int (action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr);
 
 typedef struct ThreadContext {
     pthread_t *workers;
     action_func *func;
+    action_func2 *func2;
     void *args;
     int *rets;
     int rets_count;
@@ -68,7 +70,8 @@
         }
         pthread_mutex_unlock(&c->current_job_lock);
 
-        c->rets[our_job%c->rets_count] = c->func(avctx, (char*)c->args + our_job*c->job_size);
+        c->rets[our_job%c->rets_count] = c->func ? c->func(avctx, (char*)c->args + our_job*c->job_size):
+                                                   c->func2(avctx, c->args, our_job, self_id);
 
         pthread_mutex_lock(&c->current_job_lock);
         our_job = c->current_job++;
@@ -130,6 +133,13 @@
     return 0;
 }
 
+int avcodec_thread_execute2(AVCodecContext *avctx, action_func2* func2, void *arg, int *ret, int job_count)
+{
+    ThreadContext *c= avctx->thread_opaque;
+    c->func2 = func2; /* the worker loop calls func2 whenever func is NULL */
+    return avcodec_thread_execute(avctx, NULL, arg, ret, job_count, 0); /* propagate the result; the "return" was missing */
+}
+
 int avcodec_thread_init(AVCodecContext *avctx, int thread_count)
 {
     int i;
@@ -167,5 +177,6 @@
     avcodec_thread_park_workers(c, thread_count);
 
     avctx->execute = avcodec_thread_execute;
+    avctx->execute2 = avcodec_thread_execute2;
     return 0;
 }



More information about the ffmpeg-devel mailing list