[FFmpeg-devel] [PATCH 2/3] pthread : Ensure all state access is protected by progress_mutex

Aaron Colwell acolwell at chromium.org
Thu Mar 22 17:56:00 CET 2012


The current code appears to be inconsistent about holding the
progress_mutex when it checks or updates the state member variable. This
patch makes sure that progress_mutex is always held when state is checked
or modified. I've tried to be conservative in my approach here under the
assumption that protecting all sites is the safest initial strategy. If
other conditions make the locking unnecessarly I'd really like to
understand why and what mechanisms are in place to make sure those
preconditions are always maintained.

Questions:
Why is double-check locking being used in several places? This seems like a
popular threading anti-pattern.
Why are places where I've added locking considered safe right now? Please
help me understand the justification at each site.

---
 libavcodec/pthread.c |   60
++++++++++++++++++++++++++++++-------------------
 1 files changed, 37 insertions(+), 23 deletions(-)

diff --git a/libavcodec/pthread.c b/libavcodec/pthread.c
index e90f21f..9e4686b 100644
--- a/libavcodec/pthread.c
+++ b/libavcodec/pthread.c
@@ -351,6 +351,19 @@ static int thread_init(AVCodecContext *avctx)
     return 0;
 }

+static void thread_finish_setup_locked(AVCodecContext *avctx) {
+    PerThreadContext *p = avctx->thread_opaque;
+
+    if (!(avctx->active_thread_type&FF_THREAD_FRAME)) return;
+
+    if(p->state == STATE_SETUP_FINISHED){
+        av_log(avctx, AV_LOG_WARNING, "Multiple ff_thread_finish_setup()
calls\n");
+    }
+
+    p->state = STATE_SETUP_FINISHED;
+    pthread_cond_broadcast(&p->progress_cond);
+}
+
 /**
  * Codec worker thread.
  *
@@ -368,8 +381,13 @@ static attribute_align_arg void
*frame_worker_thread(void *arg)
     pthread_mutex_lock(&p->mutex);
     while (1) {
         int i;
-            while (p->state == STATE_INPUT_READY && !fctx->die)
-                pthread_cond_wait(&p->input_cond, &p->mutex);
+        pthread_mutex_lock(&p->progress_mutex);
+        while (p->state == STATE_INPUT_READY && !fctx->die) {
+          pthread_mutex_unlock(&p->progress_mutex);
+          pthread_cond_wait(&p->input_cond, &p->mutex);
+          pthread_mutex_lock(&p->progress_mutex);
+        }
+        pthread_mutex_unlock(&p->progress_mutex);

         if (fctx->die) break;

@@ -383,9 +401,9 @@ static attribute_align_arg void
*frame_worker_thread(void *arg)
         p->got_frame = 0;
         p->result = codec->decode(avctx, &p->frame, &p->got_frame,
&p->avpkt);

-        if (p->state == STATE_SETTING_UP) ff_thread_finish_setup(avctx);
-
         pthread_mutex_lock(&p->progress_mutex);
+        if (p->state == STATE_SETTING_UP)
thread_finish_setup_locked(avctx);
+
         for (i = 0; i < MAX_BUFFERS; i++)
             if (p->progress_used[i] && (p->got_frame || p->result<0 ||
avctx->codec_id != CODEC_ID_H264)) {
                 p->progress[i][0] = INT_MAX;
@@ -543,12 +561,10 @@ static int submit_packet(PerThreadContext *p,
AVPacket *avpkt)

     if (prev_thread) {
         int err;
-        if (prev_thread->state == STATE_SETTING_UP) {
             pthread_mutex_lock(&prev_thread->progress_mutex);
             while (prev_thread->state == STATE_SETTING_UP)
                 pthread_cond_wait(&prev_thread->progress_cond,
&prev_thread->progress_mutex);
             pthread_mutex_unlock(&prev_thread->progress_mutex);
-        }

         err = update_context_from_thread(p->avctx, prev_thread->avctx, 0);
         if (err) {
@@ -563,7 +579,9 @@ static int submit_packet(PerThreadContext *p, AVPacket
*avpkt)
     memcpy(buf, avpkt->data, avpkt->size);
     memset(buf + avpkt->size, 0, FF_INPUT_BUFFER_PADDING_SIZE);

+    pthread_mutex_lock(&p->progress_mutex);
     p->state = STATE_SETTING_UP;
+    pthread_mutex_unlock(&p->progress_mutex);
     pthread_cond_signal(&p->input_cond);
     pthread_mutex_unlock(&p->mutex);

@@ -575,8 +593,9 @@ static int submit_packet(PerThreadContext *p, AVPacket
*avpkt)

     if (!p->avctx->thread_safe_callbacks &&
          p->avctx->get_buffer != avcodec_default_get_buffer) {
-        while (p->state != STATE_SETUP_FINISHED && p->state !=
STATE_INPUT_READY) {
-            pthread_mutex_lock(&p->progress_mutex);
+      pthread_mutex_lock(&p->progress_mutex);
+
+      while (p->state != STATE_SETUP_FINISHED && p->state !=
STATE_INPUT_READY) {
             while (p->state == STATE_SETTING_UP)
                 pthread_cond_wait(&p->progress_cond, &p->progress_mutex);

@@ -585,8 +604,8 @@ static int submit_packet(PerThreadContext *p, AVPacket
*avpkt)
                 p->state  = STATE_SETTING_UP;
                 pthread_cond_signal(&p->progress_cond);
             }
-            pthread_mutex_unlock(&p->progress_mutex);
         }
+      pthread_mutex_unlock(&p->progress_mutex);
     }

     fctx->prev_thread = p;
@@ -635,12 +654,10 @@ int ff_thread_decode_frame(AVCodecContext *avctx,
     do {
         p = &fctx->threads[finished++];

-        if (p->state != STATE_INPUT_READY) {
             pthread_mutex_lock(&p->progress_mutex);
             while (p->state != STATE_INPUT_READY)
                 pthread_cond_wait(&p->output_cond, &p->progress_mutex);
             pthread_mutex_unlock(&p->progress_mutex);
-        }

         *picture = p->frame;
         *got_picture_ptr = p->got_frame;
@@ -712,13 +729,8 @@ void ff_thread_finish_setup(AVCodecContext *avctx) {

     if (!(avctx->active_thread_type&FF_THREAD_FRAME)) return;

-    if(p->state == STATE_SETUP_FINISHED){
-        av_log(avctx, AV_LOG_WARNING, "Multiple ff_thread_finish_setup()
calls\n");
-    }
-
     pthread_mutex_lock(&p->progress_mutex);
-    p->state = STATE_SETUP_FINISHED;
-    pthread_cond_broadcast(&p->progress_cond);
+    thread_finish_setup_locked(avctx);
     pthread_mutex_unlock(&p->progress_mutex);
 }

@@ -730,12 +742,11 @@ static void
park_frame_worker_threads(FrameThreadContext *fctx, int thread_count
     for (i = 0; i < thread_count; i++) {
         PerThreadContext *p = &fctx->threads[i];

-        if (p->state != STATE_INPUT_READY) {
             pthread_mutex_lock(&p->progress_mutex);
             while (p->state != STATE_INPUT_READY)
                 pthread_cond_wait(&p->output_cond, &p->progress_mutex);
             pthread_mutex_unlock(&p->progress_mutex);
-        }
+
         p->got_frame = 0;
     }
 }
@@ -945,12 +956,15 @@ int ff_thread_get_buffer(AVCodecContext *avctx,
AVFrame *f)
         return avctx->get_buffer(avctx, f);
     }

+    pthread_mutex_lock(&p->progress_mutex);
     if (p->state != STATE_SETTING_UP &&
         (avctx->codec->update_thread_context ||
(!avctx->thread_safe_callbacks &&
                 avctx->get_buffer != avcodec_default_get_buffer))) {
         av_log(avctx, AV_LOG_ERROR, "get_buffer() cannot be called after
ff_thread_finish_setup()\n");
+        pthread_mutex_unlock(&p->progress_mutex);
         return -1;
     }
+    pthread_mutex_unlock(&p->progress_mutex);

     pthread_mutex_lock(&p->parent->buffer_mutex);
     f->thread_opaque = progress = allocate_progress(p);
@@ -968,8 +982,8 @@ int ff_thread_get_buffer(AVCodecContext *avctx, AVFrame
*f)
         err = avctx->get_buffer(avctx, f);
     } else {
         p->requested_frame = f;
-        p->state = STATE_GET_BUFFER;
         pthread_mutex_lock(&p->progress_mutex);
+        p->state = STATE_GET_BUFFER;
         pthread_cond_broadcast(&p->progress_cond);

         while (p->state != STATE_SETTING_UP)
@@ -977,10 +991,10 @@ int ff_thread_get_buffer(AVCodecContext *avctx,
AVFrame *f)

         err = p->result;

-        pthread_mutex_unlock(&p->progress_mutex);
-
         if (!avctx->codec->update_thread_context)
-            ff_thread_finish_setup(avctx);
+            thread_finish_setup_locked(avctx);
+
+        pthread_mutex_unlock(&p->progress_mutex);
     }

     pthread_mutex_unlock(&p->parent->buffer_mutex);

-- 
1.7.7.3


More information about the ffmpeg-devel mailing list