[FFmpeg-soc] [PATCH 6/9] Add the frame-threading support code.

Alexander Strange astrange at ithinksw.com
Thu May 29 05:51:53 CEST 2008


---
  libavcodec/Makefile      |    2 +-
  libavcodec/framethread.c |  342 +++++++++++++++++++++++++++++++++++++ 
+++++++++
  libavcodec/thread.c      |    7 +-
  libavcodec/utils.c       |    7 +-
  4 files changed, 353 insertions(+), 5 deletions(-)
  create mode 100644 libavcodec/framethread.c

diff --git a/libavcodec/Makefile b/libavcodec/Makefile
index c9b237f..0d80b92 100644
--- a/libavcodec/Makefile
+++ b/libavcodec/Makefile
@@ -364,7 +364,7 @@ OBJS-$(CONFIG_TEXT2MOVSUB_BSF)         +=  
movsub_bsf.o

  OBJS-$(HAVE_BEOSTHREADS)               += beosthread.o
  OBJS-$(HAVE_OS2THREADS)                += os2thread.o
-OBJS-$(HAVE_PTHREADS)                  += thread.o
+OBJS-$(HAVE_PTHREADS)                  += framethread.o thread.o
  OBJS-$(HAVE_W32THREADS)                += w32thread.o

  OBJS-$(HAVE_XVMC)                      += xvmcvideo.o
diff --git a/libavcodec/framethread.c b/libavcodec/framethread.c
new file mode 100644
index 0000000..f707cf3
--- /dev/null
+++ b/libavcodec/framethread.c
@@ -0,0 +1,342 @@
+/*
+ * Frame-based multithreading support
+ * Copyright (c) 2008 Alexander Strange (astrange at ithinksw.com)
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+#include <pthread.h>
+#include "avcodec.h"
+#include "thread.h"
+
+/*
+FIXME:
+There are maybe more (or less) conds/mutexes than necessary
+Lots of 'volatile'
+Only works with pthreads
+*/
+
+typedef struct PerThreadContext { // state of one decoding thread; FrameThreadContext.threads holds one per thread
+    pthread_t thread; // runs decode_frame_thread(), created in ff_frame_thread_init()
+    pthread_cond_t input_cond, progress_cond, output_cond; /* input_cond: signalled when a packet is stored or the threads are told to exit;
+       progress_cond: signalled on predecode completion, on reported frame progress and when a frame is finished;
+       output_cond: only initialised and destroyed in this file */
+    pthread_mutex_t mutex, progress_mutex, buffer_mutex; /* mutex: held while a packet is stored and while it is decoded;
+       progress_mutex: paired with progress_cond;
+       buffer_mutex: held around the get_buffer()/release_buffer() callbacks */
+
+    AVCodecContext *context; // the caller's context for threads[0], a private copy for the others
+
+    uint8_t *input; // private copy of the packet being decoded
+    int input_size; // size of that packet; reset to 0 once it has been decoded
+
+    AVFrame result; // picture written by the codec's decode callback
+    int output_size, output_res; // data_size written by the decode callback, and its return value
+
+    struct FrameThreadContext *parent; // shared state of all threads
+
+    int frame_number, decoded_frame_number, decode_progress; /* frame_number: number given to the packet on submit;
+       decoded_frame_number: frame_number of the last finished decode, -1 initially;
+       decode_progress: reset to -1 on submit, not read in this file */
+
+    int predecoded; // unneeded? (cleared on submit, set by ff_report_predecode_done())
+} PerThreadContext;
+
+/**
+ * State shared by all decoding threads that belong to one codec context.
+ */
+typedef struct FrameThreadContext {
+    PerThreadContext *threads;     // array with one entry per thread (avctx->thread_count entries)
+    PerThreadContext *last_thread; // thread that received the most recent packet; NULL until the first submit
+
+    int next_available; // index of the thread that gets the next packet
+    int next_ready;     // index of the thread whose result is returned next
+    int delaying;       // non-zero while the pipeline is filling and no frame is returned
+    int submitted;      // packets submitted so far; also the source of frame numbers
+    int last_output;    // frame number of the last frame returned, -1 before the first one
+    // last_output is max(submitted - threadcount+1, 0) or so, unnecessary?
+
+    int die; // set by ff_frame_thread_free() to make the threads exit
+} FrameThreadContext;
+
+/**
+ * Entry point of a decoding thread.
+ *
+ * Sleeps on input_cond until a packet has been stored (input_size != 0) or
+ * the parent's die flag is set, then runs the codec's decode callback with
+ * p->mutex held and signals progress_cond once the result has been stored.
+ *
+ * @param arg the PerThreadContext served by this thread
+ * @return always NULL
+ */
+static void *decode_frame_thread(void *arg)
+{
+    PerThreadContext * volatile p = arg;
+    AVCodecContext *avctx = p->context;
+    FrameThreadContext * volatile fctx = p->parent;
+    AVCodec *codec = avctx->codec;
+    int res;
+
+    while (1) {
+        pthread_mutex_lock(&p->mutex);
+        while (!p->input_size && !fctx->die)
+            pthread_cond_wait(&p->input_cond, &p->mutex);
+        pthread_mutex_unlock(&p->mutex);
+
+        if (fctx->die) break;
+
+        // No update_context callback: report predecode completion right away,
+        // which ends the wait at the bottom of submit_frame().
+        if (!codec->update_context) ff_report_predecode_done(avctx);
+
+        pthread_mutex_lock(&p->mutex);
+        res = codec->decode(avctx, &p->result, &p->output_size, p->input, p->input_size);
+
+        // The codec did not report it itself; do it now so the submitter is released. (duplication)
+        if (!p->predecoded) ff_report_predecode_done(avctx);
+
+        p->output_res = res;
+        p->decoded_frame_number = p->frame_number;
+        p->input_size = 0; // packet consumed; the wait above blocks again
+
+        // Wake ff_decode_frame_threaded(), which waits for decoded_frame_number to advance.
+        pthread_mutex_lock(&p->progress_mutex);
+        pthread_cond_signal(&p->progress_cond);
+        pthread_mutex_unlock(&p->progress_mutex);
+        pthread_mutex_unlock(&p->mutex);
+    }
+
+    return NULL;
+}
+
+/**
+ * Hand one packet to a decoding thread and wait until it reports that
+ * predecoding is done.
+ *
+ * The packet is copied into the thread's private input buffer. If the codec
+ * has an update_context callback, it is first called to bring this thread's
+ * context up to date from the thread that received the previous packet.
+ * Empty packets are ignored unless the codec has CODEC_CAP_DELAY.
+ *
+ * @param p            thread that receives the packet
+ * @param buf          packet data
+ * @param buf_size     size of buf in bytes
+ * @param frame_number number stored with the packet and reported back as
+ *                     decoded_frame_number once it has been decoded
+ */
+static void submit_frame(PerThreadContext * volatile p, const uint8_t *buf, int buf_size, int frame_number)
+{
+    AVCodec *codec = p->context->codec;
+    PerThreadContext *last_thread = p->parent->last_thread;
+    // av_fast_realloc() wants an unsigned size; input_size is an int and is
+    // overwritten with buf_size below, so a local stands in for the capacity.
+    unsigned int alloc_size = 0;
+
+    if (!buf_size && !(codec->capabilities & CODEC_CAP_DELAY)) return;
+
+    pthread_mutex_lock(&p->mutex);
+
+    if (codec->update_context && last_thread) {
+        codec->update_context(p->context, last_thread->context);
+    }
+
+    // NOTE(review): a failed allocation is not handled; p->input would be NULL here.
+    p->input = av_fast_realloc(p->input, &alloc_size, buf_size);
+    if (buf_size) memcpy(p->input, buf, buf_size);
+    p->frame_number = frame_number;
+    p->input_size = buf_size;
+    p->decode_progress = -1;
+    p->predecoded = 0;
+
+    pthread_cond_signal(&p->input_cond);
+    pthread_mutex_unlock(&p->mutex);
+
+    // NOTE(review): with an empty packet (CODEC_CAP_DELAY flush) input_size stays 0,
+    // so decode_frame_thread() keeps sleeping and nothing sets predecoded;
+    // this wait then looks like it never ends -- confirm and add a separate "has input" flag.
+    pthread_mutex_lock(&p->progress_mutex);
+    while (!p->predecoded) pthread_cond_wait(&p->progress_cond, &p->progress_mutex);
+    pthread_mutex_unlock(&p->progress_mutex);
+
+    p->parent->last_thread = p;
+}
+
+/**
+ * Frame-threaded stand-in for calling the codec's decode callback directly.
+ *
+ * Submits the packet to the next thread. While the pipeline is still filling
+ * no picture is returned; afterwards every call returns the oldest
+ * outstanding frame. The threads are created on the first call.
+ *
+ * @param avctx     codec context; avctx->thread_count threads are used
+ * @param data      AVFrame that receives the decoded picture
+ * @param data_size receives the data_size the codec produced for the returned
+ *                  frame, or 0 when no frame is returned
+ * @param buf       input packet
+ * @param buf_size  size of buf in bytes
+ * @return 0 while the pipeline is filling, -1 if the threads could not be set
+ *         up, otherwise the decode callback's return value for the frame
+ *         being returned
+ */
+int ff_decode_frame_threaded(AVCodecContext *avctx,
+                        void *data, int *data_size,
+                        const uint8_t *buf, int buf_size)
+{
+    FrameThreadContext *fctx;
+    PerThreadContext * volatile p;
+    int thread_count = avctx->thread_count;
+
+    // We can't call init until now, because avcodec_thread_init
+    // is sometimes called before avcodec_open, and we need codec info
+    if (!avctx->thread_opaque &&
+        (ff_frame_thread_init(avctx) < 0 || !avctx->thread_opaque)) {
+        *data_size = 0;
+        return -1;
+    }
+
+    fctx = ((PerThreadContext*)avctx->thread_opaque)->parent;
+
+    p = &fctx->threads[fctx->next_available];
+    submit_frame(p, buf, buf_size, fctx->submitted++);
+
+    if (fctx->delaying) {
+        // Still filling the pipeline: stop delaying once all but one thread have a packet.
+        fctx->next_available++;
+        if (fctx->next_available >= (thread_count-1)) fctx->delaying = 0;
+
+        *data_size=0;
+        return 0;
+    }
+
+    fctx->next_available++;
+
+    p = &fctx->threads[fctx->next_ready];
+
+    // Wait until this thread has finished a frame newer than the last one returned.
+    pthread_mutex_lock(&p->progress_mutex);
+    while (p->decoded_frame_number <= fctx->last_output)
+        pthread_cond_wait(&p->progress_cond, &p->progress_mutex);
+    pthread_mutex_unlock(&p->progress_mutex);
+
+    fctx->next_ready++;
+
+    *(AVFrame*)data = p->result;
+    *data_size = p->output_size;
+
+    // Both indices walk the thread array as a ring.
+    if (fctx->next_available >= thread_count) fctx->next_available = 0;
+    if (fctx->next_ready >= thread_count) fctx->next_ready = 0;
+
+    fctx->last_output++;
+
+    return p->output_res;
+}
+
+/**
+ * Publish decoding progress for a frame and wake every thread waiting on it.
+ *
+ * Does nothing when the frame's codec context has no threading state.
+ *
+ * @param f frame whose progress counter (f->thread_opaque) is updated
+ * @param n new progress value
+ */
+void ff_report_decode_progress(AVFrame *f, int n) {
+    PerThreadContext *owner = f->avctx->thread_opaque;
+
+    if (owner) {
+        int *counter = f->thread_opaque;
+
+        pthread_mutex_lock(&owner->progress_mutex);
+        *counter = n;
+        pthread_cond_broadcast(&owner->progress_cond);
+        pthread_mutex_unlock(&owner->progress_mutex);
+    }
+}
+
+/**
+ * Block until a frame's progress counter has reached at least n.
+ *
+ * Returns immediately when the frame's codec context has no threading state
+ * or when the counter is already large enough.
+ *
+ * @param f frame to wait for; its counter lives in f->thread_opaque
+ * @param n progress value to wait for
+ */
+void ff_await_decode_progress(AVFrame *f, int n) {
+    PerThreadContext *owner = f->avctx->thread_opaque;
+    int * volatile counter;
+
+    if (!owner) return;
+
+    counter = f->thread_opaque;
+
+    // First look without the lock; only take it if we actually have to wait.
+    if (*counter < n) {
+        pthread_mutex_lock(&owner->progress_mutex);
+        while (*counter < n) {
+            pthread_cond_wait(&owner->progress_cond, &owner->progress_mutex);
+        }
+        pthread_mutex_unlock(&owner->progress_mutex);
+    }
+}
+
+/**
+ * Mark the current packet of a context as predecoded and wake all threads
+ * waiting on its progress condition (submit_frame() waits for this flag).
+ *
+ * Does nothing when the context has no threading state.
+ *
+ * @param avctx codec context of the thread reporting completion
+ */
+void ff_report_predecode_done(AVCodecContext *avctx) {
+    PerThreadContext *worker = avctx->thread_opaque;
+
+    if (worker) {
+        pthread_mutex_lock(&worker->progress_mutex);
+        worker->predecoded = 1;
+        pthread_cond_broadcast(&worker->progress_cond);
+        pthread_mutex_unlock(&worker->progress_mutex);
+    }
+}
+
+/**
+ * Create the frame-threading state and start avctx->thread_count threads.
+ *
+ * Thread 0 decodes with the caller's context. Every other thread gets a
+ * byte copy of it with is_copy set, its own copy of priv_data, and a call to
+ * the codec's init_copy callback if there is one.
+ *
+ * @param avctx opened codec context; avctx->thread_opaque is set on success
+ * @return 0 on success, -1 if the shared state could not be allocated
+ */
+int ff_frame_thread_init(AVCodecContext *avctx) {
+    FrameThreadContext *fctx;
+    int i, thread_count = avctx->thread_count;
+
+    fctx = av_mallocz(sizeof(FrameThreadContext));
+    if (!fctx) return -1;
+
+    fctx->delaying = 1;
+    fctx->last_output = -1;
+
+    fctx->threads = av_mallocz(sizeof(PerThreadContext) * thread_count);
+    if (!fctx->threads) {
+        av_free(fctx);
+        return -1;
+    }
+
+    for (i = 0; i < thread_count; i++) {
+        PerThreadContext *p = &fctx->threads[i];
+
+        pthread_mutex_init(&p->mutex, NULL);
+        pthread_mutex_init(&p->progress_mutex, NULL);
+        pthread_mutex_init(&p->buffer_mutex, NULL);
+        pthread_cond_init(&p->input_cond, NULL);
+        pthread_cond_init(&p->progress_cond, NULL);
+        pthread_cond_init(&p->output_cond, NULL);
+
+        p->parent = fctx;
+        p->decoded_frame_number = -1;
+
+        if (!i) {
+            p->context = avctx;
+            avctx->thread_opaque = p;
+        } else {
+            // NOTE(review): these two allocations and pthread_create() below are
+            // still unchecked; failing here needs a teardown of the threads already started.
+            AVCodecContext *copy = av_malloc(sizeof(AVCodecContext));
+
+            *copy = *avctx;
+            copy->is_copy = 1;
+            copy->priv_data = av_malloc(avctx->codec->priv_data_size);
+            memcpy(copy->priv_data, avctx->priv_data, copy->codec->priv_data_size);
+            if (copy->codec->init_copy) copy->codec->init_copy(copy);
+
+            p->context = copy;
+            copy->thread_opaque = p;
+        }
+
+        pthread_create(&p->thread, NULL, decode_frame_thread, p);
+    }
+
+    return 0;
+}
+
+/**
+ * Stop all decoding threads and free the frame-threading state.
+ *
+ * If the last packet went to a thread other than thread 0 and the codec has
+ * an update_context callback, thread 0's context (the caller's) is updated
+ * from that thread first. Each thread is then woken, joined and its
+ * resources released; contexts of threads other than 0 are closed and freed.
+ *
+ * @param avctx codec context the threads were created for; a no-op when it
+ *              has no threading state
+ */
+void ff_frame_thread_free(AVCodecContext *avctx) {
+    FrameThreadContext *fctx;
+    AVCodec *codec;
+    int i;
+
+    if (!avctx->thread_opaque) return;
+
+    fctx = ((PerThreadContext*)avctx->thread_opaque)->parent;
+
+    // last_thread is still NULL if no packet was ever submitted.
+    if (fctx->last_thread && fctx->last_thread != &fctx->threads[0]) {
+        codec = fctx->last_thread->context->codec;
+        if (codec->update_context)
+            codec->update_context(fctx->threads[0].context, fctx->last_thread->context);
+    }
+
+    fctx->die = 1;
+
+    for (i = 0; i < avctx->thread_count; i++) {
+        PerThreadContext *p = &fctx->threads[i];
+
+        // Wake the thread so it sees the die flag; taking the mutex also
+        // waits for a decode in progress, which runs with it held.
+        pthread_mutex_lock(&p->mutex);
+        pthread_cond_signal(&p->input_cond);
+        pthread_mutex_unlock(&p->mutex);
+
+        pthread_join(p->thread, NULL);
+
+        pthread_mutex_destroy(&p->mutex);
+        pthread_mutex_destroy(&p->progress_mutex);
+        pthread_mutex_destroy(&p->buffer_mutex);
+        pthread_cond_destroy(&p->input_cond);
+        pthread_cond_destroy(&p->progress_cond);
+        pthread_cond_destroy(&p->output_cond);
+        av_freep(&p->input);
+
+        // Thread 0 uses the caller's context, which is not ours to close or free.
+        if (i) {
+            if (p->context->codec->close)
+                p->context->codec->close(p->context);
+            avcodec_default_free_buffers(p->context);
+
+            av_freep(&p->context->priv_data);
+            av_freep(&p->context);
+        }
+    }
+
+    // The caller's context points into the array freed below; clear it so a
+    // later free or decode call does not use the stale pointer.
+    fctx->threads[0].context->thread_opaque = NULL;
+
+    av_freep(&fctx->threads);
+    av_free(fctx);
+}
+
+/**
+ * get_buffer() wrapper for frame-threaded decoders.
+ *
+ * Records the owning context in f->avctx. When the context has threading
+ * state, the get_buffer callback runs with that thread's buffer_mutex held
+ * and the frame gets a progress counter in f->thread_opaque, starting at -1.
+ *
+ * NOTE(review): buffer_mutex belongs to one PerThreadContext, so calls made
+ * through different thread contexts do not exclude each other -- confirm
+ * this is intended.
+ *
+ * @param avctx codec context requesting the buffer
+ * @param f     frame to fill in
+ * @return the get_buffer callback's result, or -1 if the progress counter
+ *         could not be allocated (the buffer is released again in that case)
+ */
+int ff_mt_get_buffer(AVCodecContext *avctx, AVFrame *f)
+{
+    int ret, *progress;
+    PerThreadContext *p = avctx->thread_opaque;
+
+    f->avctx = avctx;
+
+    if (!p) {
+        return avctx->get_buffer(avctx, f);
+    }
+
+    pthread_mutex_lock(&p->buffer_mutex);
+    ret = avctx->get_buffer(avctx, f);
+    pthread_mutex_unlock(&p->buffer_mutex);
+
+    // No buffer means nobody will release the frame, so do not allocate a counter for it.
+    if (ret < 0) {
+        f->thread_opaque = NULL;
+        return ret;
+    }
+
+    progress = av_malloc(sizeof(int));
+    if (!progress) {
+        pthread_mutex_lock(&p->buffer_mutex);
+        avctx->release_buffer(avctx, f);
+        pthread_mutex_unlock(&p->buffer_mutex);
+
+        f->thread_opaque = NULL;
+        return -1;
+    }
+
+    *progress = -1;
+    f->thread_opaque = progress;
+
+    return ret;
+}
+
+/**
+ * release_buffer() wrapper for frame-threaded decoders.
+ *
+ * Calls the release_buffer callback of the context stored in f->avctx. When
+ * that context has threading state, the call is made with the thread's
+ * buffer_mutex held and the frame's progress counter is freed afterwards.
+ *
+ * @param f frame to release
+ */
+void ff_mt_release_buffer(AVFrame *f)
+{
+    AVCodecContext *owner = f->avctx;
+    PerThreadContext *worker = owner->thread_opaque;
+
+    if (worker) {
+        pthread_mutex_lock(&worker->buffer_mutex);
+        owner->release_buffer(owner, f);
+        pthread_mutex_unlock(&worker->buffer_mutex);
+
+        av_freep(&f->thread_opaque);
+    } else {
+        owner->release_buffer(owner, f);
+    }
+}
diff --git a/libavcodec/thread.c b/libavcodec/thread.c
index 5b9f4fa..7529c29 100644
--- a/libavcodec/thread.c
+++ b/libavcodec/thread.c
@@ -174,7 +174,7 @@ int avcodec_thread_init(AVCodecContext *s, int  
thread_count)
  {
      s->thread_count = thread_count;

-    if (!s->thread_opaque)
+    /* capabilities is a bit mask: test the frame-threading bit with '&' ('&&' was true for any non-zero mask) */
+    if (s->codec && !(s->codec->capabilities & CODEC_CAP_FRAME_THREADS) && !s->thread_opaque)
          return ff_thread_init(s, thread_count);

      return 0;
@@ -182,5 +182,8 @@ int avcodec_thread_init(AVCodecContext *s, int  
thread_count)

  void avcodec_thread_free(AVCodecContext *s)
  {
-    ff_thread_free(s);
+    /* s->codec may be unset, as avcodec_thread_init() already allows for; fall back to ff_thread_free() then */
+    if (s->codec && (s->codec->capabilities & CODEC_CAP_FRAME_THREADS))
+        ff_frame_thread_free(s);
+    else
+        ff_thread_free(s);
  }
diff --git a/libavcodec/utils.c b/libavcodec/utils.c
index 72dc3c7..85f3682 100644
--- a/libavcodec/utils.c
+++ b/libavcodec/utils.c
@@ -886,7 +886,7 @@ int attribute_align_arg  
avcodec_open(AVCodecContext *avctx, AVCodec *codec)
          }
      }

-    if (avctx->thread_count > 1) {
+    if (avctx->thread_count > 1 && !(avctx->codec->capabilities &  
CODEC_CAP_FRAME_THREADS)) {
          ret = ff_thread_init(avctx, avctx->thread_count);
          if (ret < 0) {
              av_freep(&avctx->priv_data);
@@ -949,12 +949,15 @@ int attribute_align_arg  
avcodec_decode_video(AVCodecContext *avctx, AVFrame *pic
                           const uint8_t *buf, int buf_size)
  {
      int ret;
+    int threaded = (avctx->codec->capabilities &  
CODEC_CAP_FRAME_THREADS) && avctx->thread_count > 1;

      *got_picture_ptr= 0;
      if((avctx->coded_width||avctx->coded_height) &&  
avcodec_check_dimensions(avctx,avctx->coded_width,avctx->coded_height))
          return -1;
      if((avctx->codec->capabilities & CODEC_CAP_DELAY) || buf_size){
-        ret = avctx->codec->decode(avctx, picture, got_picture_ptr,
+        if (threaded) ret = ff_decode_frame_threaded(avctx, picture,
+                                got_picture_ptr, buf, buf_size);
+        else ret = avctx->codec->decode(avctx, picture,  
got_picture_ptr,
                                  buf, buf_size);

          emms_c(); //needed to avoid an emms_c() call before every  
return;
-- 
1.5.5.1





More information about the FFmpeg-soc mailing list