[FFmpeg-devel] [PATCH] avutil/WIP: add AVAsync API

Clément Bœsch u at pkh.me
Wed Nov 11 01:27:20 CET 2015


From: Clément Bœsch <clement at stupeflix.com>

---

So here is a first prototype of a higher-level API following an
asynchronous model which (for now) simply wraps the current synchronous API.

I suggest looking at libavutil/async.h (sorry, no doxy yet) and
doc/examples/async_demuxing_decoding.c for an example of usage.

Basically, what the current draft proposes is that the user instantiates
an asynchronous context, in which they register Readers (the user will
generally do the demuxing in the associated callback to get a packet). In
each Reader they register Decoders (basically just a wrapper around an
AVCodecContext), and to each Decoder they associate a push frame callback.

You construct something like this:

  AVAsyncContext(
    AVAsyncReader(
      AVAsyncDecoder(...),
      AVAsyncDecoder(...),
      AVAsyncDecoder(...),
      ...
    ),
    AVAsyncReader(
      AVAsyncDecoder(...),
      AVAsyncDecoder(...),
      ...
    ),
    ....
  )

You will generally have one Reader per file, but it can be anything you
want as long as you can pull packets from it.

Now, implementation-wise, the interesting part is that every level is
asynchronous:

- The context will spawn a thread for each reader
- Then in each reader, you will have a thread for each decoder
- Then each decoder will have its own queue of packets, and its own
  queue of output frames
- Then along with each decoder, you will have a Watcher in its own
  dedicated thread: the watcher monitors the frame queue of its decoder,
  and calls the user push frame callback any time a new frame appears in
  the queue.

The interesting part about the watcher is that it will allow the user to
transparently do any operation on the frame without blocking the
decoding or the demuxing: basically, blocking in push_frame for the user
will just block any subsequent call to the callback, but the packet
and frame queues of the decoder will continue to fill up (until they
reach their maximum size), ready to be popped.
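
To make the watcher idea concrete, here is a minimal standalone sketch
using plain pthreads instead of threadmessage. Every name in it
(FrameQueue, fq_send, fq_recv, run_watcher_demo, ...) is made up for
illustration and is not part of the patch; ints stand in for frames.

```c
#include <assert.h>
#include <pthread.h>

#define QUEUE_CAP 3 /* hypothetical frame queue size */

/* Bounded FIFO of ints standing in for decoded frames. */
typedef struct FrameQueue {
    int items[QUEUE_CAP];
    int head, count, closed;
    pthread_mutex_t lock;
    pthread_cond_t not_empty, not_full;
} FrameQueue;

static void fq_init(FrameQueue *q)
{
    q->head = q->count = q->closed = 0;
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
}

/* Decoder side: blocks only while the queue is full. */
static void fq_send(FrameQueue *q, int frame)
{
    pthread_mutex_lock(&q->lock);
    assert(!q->closed);
    while (q->count == QUEUE_CAP)
        pthread_cond_wait(&q->not_full, &q->lock);
    q->items[(q->head + q->count) % QUEUE_CAP] = frame;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
}

/* Signal end of stream; the watcher exits once the queue is drained. */
static void fq_close(FrameQueue *q)
{
    pthread_mutex_lock(&q->lock);
    q->closed = 1;
    pthread_cond_broadcast(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
}

/* Watcher side: returns 0 with a frame, or -1 once closed and empty. */
static int fq_recv(FrameQueue *q, int *frame)
{
    int ret = -1;

    pthread_mutex_lock(&q->lock);
    while (!q->count && !q->closed)
        pthread_cond_wait(&q->not_empty, &q->lock);
    if (q->count) {
        *frame = q->items[q->head];
        q->head = (q->head + 1) % QUEUE_CAP;
        q->count--;
        pthread_cond_signal(&q->not_full);
        ret = 0;
    }
    pthread_mutex_unlock(&q->lock);
    return ret;
}

typedef struct Watcher {
    FrameQueue *q;
    int delivered; /* frames handed to the user callback so far */
} Watcher;

/* Stand-in for the user's push_frame callback; it may take arbitrarily long. */
static void push_frame(Watcher *w, int frame)
{
    (void)frame;
    w->delivered++;
}

static void *watcher_main(void *arg)
{
    Watcher *w = arg;
    int frame;

    while (fq_recv(w->q, &frame) == 0)
        push_frame(w, frame);
    return NULL;
}

/* Queue nb_frames from the "decoder" and return how many the watcher delivered. */
int run_watcher_demo(int nb_frames)
{
    FrameQueue q;
    Watcher w = { &q, 0 };
    pthread_t tid;
    int i;

    fq_init(&q);
    if (pthread_create(&tid, NULL, watcher_main, &w))
        return -1;
    for (i = 0; i < nb_frames; i++)
        fq_send(&q, i);
    fq_close(&q);
    pthread_join(tid, NULL);
    return w.delivered;
}
```

The producer only stalls in fq_send() once QUEUE_CAP frames are pending,
which is the same trade-off as the configurable queue sizes in the draft.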

Note: the size of the packet queue and of the frame queue is configurable
for each decoder.

Note2: the packet queue is not set in the reader so that a decoder that
isn't pulling its packets and decoding fast enough will not block the
others. As a result, the Reader just reads one packet and is then
responsible for broadcasting it to the appropriate decoder (by just
adding it to its internal queue).
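
The dispatch step described in Note2 can be sketched in isolation like
this. It is a single-threaded simplification with hypothetical types
(Packet, Decoder, dispatch_packet, run_dispatch_demo) that only mirror
the pkt_id_match idea; the real code uses AVPacket and a thread message
queue per decoder.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_QUEUED 4 /* hypothetical per-decoder packet queue size */

typedef struct Packet {
    int stream_index;
    int size;
} Packet;

typedef struct Decoder {
    int pkt_id_match;          /* stream index this decoder accepts */
    Packet queue[MAX_QUEUED];  /* private packet queue */
    int nb_queued;
} Decoder;

/* Hand one packet to the decoder whose pkt_id_match equals its stream index.
 * Returns that decoder, or NULL if nobody wants the packet. */
static Decoder *dispatch_packet(Decoder *decs, int nb_decs, const Packet *pkt)
{
    int i;

    for (i = 0; i < nb_decs; i++) {
        Decoder *d = &decs[i];
        if (d->pkt_id_match != pkt->stream_index)
            continue;
        /* A threaded queue would wait here until the decoder frees a slot. */
        assert(d->nb_queued < MAX_QUEUED);
        d->queue[d->nb_queued++] = *pkt;
        return d;
    }
    return NULL;
}

/* Dispatch a small interleaved sequence; returns the number of dropped packets. */
int run_dispatch_demo(int *nb_audio, int *nb_video)
{
    Decoder decs[2] = { { .pkt_id_match = 0 }, { .pkt_id_match = 1 } };
    const Packet pkts[] = { {0, 100}, {1, 2000}, {0, 120}, {2, 50}, {1, 1800} };
    int dropped = 0;
    size_t i;

    for (i = 0; i < sizeof(pkts) / sizeof(*pkts); i++)
        if (!dispatch_packet(decs, 2, &pkts[i]))
            dropped++;
    *nb_audio = decs[0].nb_queued;
    *nb_video = decs[1].nb_queued;
    return dropped;
}
```

Because each decoder owns its queue, a slow decoder only delays the
reader once its own queue is full; packets for the other decoders that
were read earlier are already sitting in their queues.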

Now, none of this requires any modification to the current API, so
it doesn't really solve the problem of writing asynchronous
decoders/hwaccels. The goal for the next iteration will be to adjust the
VideoToolbox hwaccel to make use of the asynchronous model, and
basically just make it call the user callback by itself. It should be a
good candidate for such a thing.

Another concerning limitation in the current code is the inability to
seek: basically the user cannot lock the codec and reader contexts to
execute a seek, nor flush the queues. I'd like suggestions on which
direction to take in that regard:

- first, in threadmessage it seems I can't yet flush the fifo properly
  (for example by calling av_frame_free() on each entry of the frame
  queue): should I extend the API? Nicolas, maybe you have a suggestion?

- secondly, how should I allow the user to lock/unlock the reader (and
  as a result, most likely, the AVFormatContext on the user side) and the
  decoder? I added a few prototypes in async.h but I'm pretty sure they
  won't fit many cases.  Comments very welcome.
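
To illustrate one possible answer to the first question, here is a
standalone sketch of a queue that remembers a per-element free callback
and can therefore flush itself. This is not threadmessage code: MsgQueue,
mq_send, mq_flush and run_flush_demo are hypothetical names, and for a
frame queue the callback would be a small wrapper around av_frame_free().

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

#define MQ_CAP 8 /* hypothetical queue capacity */

typedef struct MsgQueue {
    void *msgs[MQ_CAP];
    int head, count;
    void (*free_func)(void *msg); /* how to release one queued element */
    pthread_mutex_t lock;
} MsgQueue;

static void mq_init(MsgQueue *q, void (*free_func)(void *msg))
{
    q->head = q->count = 0;
    q->free_func = free_func;
    pthread_mutex_init(&q->lock, NULL);
}

/* Non-blocking send for brevity: returns 0 on success, -1 if the queue is full. */
static int mq_send(MsgQueue *q, void *msg)
{
    int ret = -1;

    pthread_mutex_lock(&q->lock);
    if (q->count < MQ_CAP) {
        q->msgs[(q->head + q->count) % MQ_CAP] = msg;
        q->count++;
        ret = 0;
    }
    pthread_mutex_unlock(&q->lock);
    return ret;
}

/* Drop every queued element through free_func; returns how many were dropped. */
static int mq_flush(MsgQueue *q)
{
    int dropped;

    pthread_mutex_lock(&q->lock);
    dropped = q->count;
    while (q->count) {
        if (q->free_func)
            q->free_func(q->msgs[q->head]);
        q->head = (q->head + 1) % MQ_CAP;
        q->count--;
    }
    pthread_mutex_unlock(&q->lock);
    return dropped;
}

/* Queue nb heap-allocated "frames", flush them, and report how many were freed. */
int run_flush_demo(int nb)
{
    MsgQueue q;
    int i, queued = 0;

    mq_init(&q, free);
    for (i = 0; i < nb; i++) {
        int *frame = malloc(sizeof(*frame));
        if (!frame)
            break;
        if (mq_send(&q, frame) < 0) {
            free(frame); /* queue full: the caller still owns the element */
            break;
        }
        queued++;
    }
    i = mq_flush(&q);
    assert(i == queued && q.count == 0);
    pthread_mutex_destroy(&q.lock);
    return i;
}
```

Flushing under the queue lock means a seek could drop all pending
entries atomically with respect to senders and receivers.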

I hope this is going in a direction most developers like. If it's not
the case, I'm of course open to any reworking but please be specific.

Also, I'm really sorry I'm not sending this to libav-devel even though I
apparently looked forward to a collaboration in a previous thread. I was
planning to, but unfortunately the threadmessage API is not there yet and
it would have required way too much work for an initial prototype. When
we come up with something that satisfies everyone and this code becomes
something more than just a draft, I will maybe do the necessary work.
Unfortunately, porting the VideoToolbox hwaccel, threadmessage and probably
a bunch of other FFmpeg-specific features (I can think of
av_dynarray2_add and maybe more) is going to require a bit too much code
adjustment, so I might just share async.h and redirect them to a
working implementation here....

Anyway, please comment.
---
 .gitignore                             |   1 +
 configure                              |   2 +
 doc/Makefile                           |   1 +
 doc/examples/Makefile                  |   3 +-
 doc/examples/async_demuxing_decoding.c | 155 +++++++++++++
 libavutil/Makefile                     |   1 +
 libavutil/async.c                      | 400 +++++++++++++++++++++++++++++++++
 libavutil/async.h                      |  92 ++++++++
 8 files changed, 654 insertions(+), 1 deletion(-)
 create mode 100644 doc/examples/async_demuxing_decoding.c
 create mode 100644 libavutil/async.c
 create mode 100644 libavutil/async.h

diff --git a/.gitignore b/.gitignore
index 93b0dca..010acb0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -40,6 +40,7 @@
 /doc/avoptions_codec.texi
 /doc/avoptions_format.texi
 /doc/doxy/html/
+/doc/examples/async_demuxing_decoding
 /doc/examples/avio_dir_cmd
 /doc/examples/avio_reading
 /doc/examples/decoding_encoding
diff --git a/configure b/configure
index d5e76de..12ca262 100755
--- a/configure
+++ b/configure
@@ -1368,6 +1368,7 @@ COMPONENT_LIST="
 "
 
 EXAMPLE_LIST="
+    async_demuxing_decoding_example
     avio_reading_example
     avio_dir_cmd_example
     decoding_encoding_example
@@ -2855,6 +2856,7 @@ zoompan_filter_deps="swscale"
 zscale_filter_deps="libzimg"
 
 # examples
+async_demuxing_decoding_example_deps="avcodec avformat avutil"
 avio_reading="avformat avcodec avutil"
 avio_dir_cmd="avformat avutil"
 avcodec_example_deps="avcodec avutil"
diff --git a/doc/Makefile b/doc/Makefile
index 3e67c2a..66112a9 100644
--- a/doc/Makefile
+++ b/doc/Makefile
@@ -36,6 +36,7 @@ DOCS-$(CONFIG_MANPAGES)  += $(MANPAGES)
 DOCS-$(CONFIG_TXTPAGES)  += $(TXTPAGES)
 DOCS = $(DOCS-yes)
 
+DOC_EXAMPLES-$(CONFIG_ASYNC_DEMUXING_DECODING_EXAMPLE) += async_demuxing_decoding
 DOC_EXAMPLES-$(CONFIG_AVIO_DIR_CMD_EXAMPLE)      += avio_dir_cmd
 DOC_EXAMPLES-$(CONFIG_AVIO_READING_EXAMPLE)      += avio_reading
 DOC_EXAMPLES-$(CONFIG_AVCODEC_EXAMPLE)           += avcodec
diff --git a/doc/examples/Makefile b/doc/examples/Makefile
index af38159..ed06bc2 100644
--- a/doc/examples/Makefile
+++ b/doc/examples/Makefile
@@ -11,7 +11,8 @@ CFLAGS += -Wall -g
 CFLAGS := $(shell pkg-config --cflags $(FFMPEG_LIBS)) $(CFLAGS)
 LDLIBS := $(shell pkg-config --libs $(FFMPEG_LIBS)) $(LDLIBS)
 
-EXAMPLES=       avio_dir_cmd                       \
+EXAMPLES=       async_demuxing_decoding            \
+                avio_dir_cmd                       \
                 avio_reading                       \
                 decoding_encoding                  \
                 demuxing_decoding                  \
diff --git a/doc/examples/async_demuxing_decoding.c b/doc/examples/async_demuxing_decoding.c
new file mode 100644
index 0000000..0c25a68
--- /dev/null
+++ b/doc/examples/async_demuxing_decoding.c
@@ -0,0 +1,155 @@
+/*
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include <libavutil/async.h>
+#include <libavutil/imgutils.h>
+#include <libavutil/samplefmt.h>
+#include <libavutil/timestamp.h>
+#include <libavformat/avformat.h>
+
+static int pull_packet_cb(void *priv, AVPacket *pkt)
+{
+    AVFormatContext *fmt_ctx = priv;
+    int ret = av_read_frame(fmt_ctx, pkt);
+    if (ret >= 0)
+        printf("read packet of size %d from stream %d\n",
+               pkt->size, pkt->stream_index);
+    return ret;
+}
+
+static int push_frame_cb(void *priv, AVFrame *frame)
+{
+    AVStream *st = priv;
+    const char *type = st->codec->codec_type == AVMEDIA_TYPE_VIDEO ?
+                       av_get_pix_fmt_name(frame->format)
+                     : av_get_sample_fmt_name(frame->format);
+
+    printf("decoded %s frame / ts:%s\n", type,
+           av_ts2str(av_frame_get_best_effort_timestamp(frame)));
+
+    av_frame_free(&frame);
+    return 0;
+}
+
+static int open_codec_context(AVAsyncReader *r, AVFormatContext *fmt_ctx,
+                              enum AVMediaType type)
+{
+    int ret, stream_index = -1;
+    AVStream *st;
+    AVCodec *dec;
+    AVCodecContext *dec_ctx;
+
+    ret = av_find_best_stream(fmt_ctx, type, -1, -1, NULL, 0);
+    if (ret < 0)
+        return 0; // ignore if no appropriate stream found
+
+    stream_index = ret;
+    st = fmt_ctx->streams[stream_index];
+
+    dec_ctx = st->codec;
+    dec = avcodec_find_decoder(dec_ctx->codec_id);
+    if (!dec) {
+        fprintf(stderr, "Unable to find %s codec\n", av_get_media_type_string(type));
+        return AVERROR(EINVAL);
+    }
+
+    ret = avcodec_open2(dec_ctx, dec, NULL);
+    if (ret < 0) {
+        fprintf(stderr, "Unable to open %s codec\n", av_get_media_type_string(type));
+        return ret;
+    }
+
+    return avasync_register_decoder(r, st->codec, st, push_frame_cb, st->index);
+}
+
+int main(int ac, char **av)
+{
+    int i, ret = 0;
+    AVAsyncContext *actx;
+
+    if (ac < 2) {
+        fprintf(stderr, "Usage: %s <files...>\n", av[0]);
+        return 0;
+    }
+
+    av_register_all();
+    av_log_set_level(AV_LOG_TRACE);
+
+    actx = avasync_alloc_context();
+    if (!actx)
+        return 1;
+
+    for (i = 1; i < ac; i++) {
+        AVFormatContext *fmt_ctx = NULL;
+        AVAsyncReader *r;
+
+        ret = avformat_open_input(&fmt_ctx, av[i], NULL, NULL);
+        if (ret < 0) {
+            fprintf(stderr, "Unable to open '%s'\n", av[i]);
+            goto end;
+        }
+
+        ret = avformat_find_stream_info(fmt_ctx, NULL);
+        if (ret < 0) {
+            fprintf(stderr, "Unable to find stream info\n");
+            goto end;
+        }
+
+        av_dump_format(fmt_ctx, 0, av[i], 0);
+
+        ret = avasync_register_reader(actx, av[i], fmt_ctx, pull_packet_cb, &r);
+        if (ret < 0)
+            goto end;
+
+        ret = open_codec_context(r, fmt_ctx, AVMEDIA_TYPE_AUDIO);
+        if (ret < 0)
+            goto end;
+
+        ret = open_codec_context(r, fmt_ctx, AVMEDIA_TYPE_VIDEO);
+        if (ret < 0)
+            goto end;
+
+        if (!r->nb_decoders) {
+            fprintf(stderr, "No video or audio stream found\n");
+            ret = AVERROR_STREAM_NOT_FOUND;
+            goto end;
+        }
+    }
+
+    avasync_start(actx);
+
+end:
+
+    avasync_wait(actx);
+
+    for (i = 0; i < actx->nb_readers; i++) {
+        int j;
+        AVAsyncReader *r = &actx->readers[i];
+
+        for (j = 0; j < r->nb_decoders; j++) {
+            AVAsyncDecoder *d = &r->decoders[j];
+            avcodec_close(d->codec_ctx);
+        }
+        avformat_close_input((AVFormatContext **)&actx->readers[i].priv_data);
+    }
+
+    avasync_free(&actx);
+    return ret < 0 ? 1 : 0;
+}
diff --git a/libavutil/Makefile b/libavutil/Makefile
index 1bac2b9..2880b6c 100644
--- a/libavutil/Makefile
+++ b/libavutil/Makefile
@@ -80,6 +80,7 @@ BUILT_HEADERS = avconfig.h                                              \
 
 OBJS = adler32.o                                                        \
        aes.o                                                            \
+       async.o                                                          \
        audio_fifo.o                                                     \
        avstring.o                                                       \
        base64.o                                                         \
diff --git a/libavutil/async.c b/libavutil/async.c
new file mode 100644
index 0000000..f1a2c02
--- /dev/null
+++ b/libavutil/async.c
@@ -0,0 +1,400 @@
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <pthread.h>
+
+#include <libavutil/async.h>
+#include <libavutil/opt.h>
+#include <libavutil/time.h>
+
+static const AVClass async_context_class = {
+    .class_name = "async_context",
+    .item_name  = av_default_item_name,
+    .version    = LIBAVUTIL_VERSION_INT,
+};
+
+AVAsyncContext *avasync_alloc_context(void)
+{
+    AVAsyncContext *actx = av_mallocz(sizeof(*actx));
+    if (!actx)
+        return NULL;
+    actx->class = &async_context_class;
+    return actx;
+}
+
+#define OFFSET_READER(x) offsetof(AVAsyncReader, x)
+static const AVOption async_reader_options[] = {
+    { "non_blocking", "set non blocking mode", OFFSET_READER(non_blocking), AV_OPT_TYPE_BOOL, {.i64=0}, 0, 1 },
+    { NULL }
+};
+
+static const AVClass async_reader_class = {
+    .class_name = "async_reader",
+    .item_name  = av_default_item_name,
+    .option     = async_reader_options,
+    .version    = LIBAVUTIL_VERSION_INT,
+};
+
+int avasync_register_reader(AVAsyncContext *actx,
+                            const char *name, void *priv,
+                            pull_packet_func_type pull_packet_cb,
+                            AVAsyncReader **r)
+{
+    AVAsyncReader *reader = av_dynarray2_add((void**)&actx->readers, &actx->nb_readers,
+                                             sizeof(*actx->readers), NULL);
+    if (!reader)
+        return AVERROR(ENOMEM);
+
+    memset(reader, 0, sizeof(*reader));
+
+    reader->class = &async_reader_class;
+    av_opt_set_defaults(reader);
+
+    if (name) {
+        reader->name = av_strdup(name);
+        if (!reader->name)
+            return AVERROR(ENOMEM);
+    }
+    reader->priv_data      = priv;
+    reader->pull_packet_cb = pull_packet_cb;
+
+    *r = reader;
+    return 0;
+}
+
+#define OFFSET_DEC(x) offsetof(AVAsyncDecoder, x)
+#define FLAGS AV_OPT_FLAG_DECODING_PARAM
+static const AVOption async_decoder_options[] = {
+    { "max_packets_queue", "set the maximum number of packets in the queue", OFFSET_DEC(max_packets_queue), AV_OPT_TYPE_INT, {.i64=5}, 1, 100, FLAGS },
+    { "max_frames_queue",  "set the maximum number of frames in the queue",  OFFSET_DEC(max_frames_queue),  AV_OPT_TYPE_INT, {.i64=3}, 1, 100, FLAGS },
+    { NULL }
+};
+
+static const AVClass async_decoder_class = {
+    .class_name = "async_decoder",
+    .item_name  = av_default_item_name,
+    .option     = async_decoder_options,
+    .version    = LIBAVUTIL_VERSION_INT,
+};
+
+int avasync_register_decoder(AVAsyncReader *r,
+                             AVCodecContext *codec_ctx, void *priv,
+                             push_frame_func_type push_frame_cb,
+                             int pkt_id_match)
+{
+    AVAsyncDecoder *adec = av_dynarray2_add((void**)&r->decoders, &r->nb_decoders,
+                                            sizeof(*r->decoders), NULL);
+    if (!adec)
+        return AVERROR(ENOMEM);
+
+    memset(adec, 0, sizeof(*adec));
+
+    adec->class = &async_decoder_class;
+    av_opt_set_defaults(adec);
+
+    adec->codec_ctx     = codec_ctx;
+    adec->priv_data     = priv;
+    adec->push_frame_cb = push_frame_cb;
+    adec->pkt_id_match  = pkt_id_match;
+    return 0;
+}
+
+/* Watch the frames queue and push them to the user */
+static void *watcher_thread(void *arg)
+{
+    AVAsyncDecoder *d = arg;
+
+    av_log(d, AV_LOG_TRACE, "watching thread starting\n");
+
+    for (;;) {
+        int ret;
+        AVFrame *frame = NULL;
+        /* Wait to get a frame from the queue. If it fails, then the watcher
+         * has to die. */
+        av_log(d, AV_LOG_TRACE, "fetch frame\n");
+        ret = av_thread_message_queue_recv(d->frames_queue, &frame, 0);
+        av_log(d, AV_LOG_TRACE, "watcher recv frame %p ret=%d\n", frame, ret);
+        if (ret < 0)
+            break;
+
+        ret = d->push_frame_cb(d->priv_data, frame);
+
+        if (ret < 0) {
+            /* watcher will die, notify the decoder */
+            av_thread_message_queue_set_err_send(d->frames_queue, ret);
+            break;
+        }
+    }
+
+    return NULL;
+}
+
+static int decode_packet(AVAsyncDecoder *d, AVPacket *pkt,
+                         AVFrame **frame, int *got_frame)
+{
+    int ret;
+    int decoded = pkt->size;
+    AVFrame *dec_frame = av_frame_alloc(); // XXX: pre-alloc them in the fifo?
+
+    *got_frame = 0;
+    *frame = NULL;
+
+    if (!dec_frame)
+        return AVERROR(ENOMEM);
+
+    switch (d->codec_ctx->codec_type) {
+    case AVMEDIA_TYPE_VIDEO:
+        ret = avcodec_decode_video2(d->codec_ctx, dec_frame, got_frame, pkt);
+        break;
+    case AVMEDIA_TYPE_AUDIO:
+        ret = avcodec_decode_audio4(d->codec_ctx, dec_frame, got_frame, pkt);
+        break;
+    default:
+        av_log(d, AV_LOG_ERROR, "Unsupported codec type :(\n");
+        ret = AVERROR(EINVAL); /* do not leave ret uninitialized */
+    }
+
+    if (ret < 0) {
+        av_log(d, AV_LOG_ERROR, "Error decoding %s frame\n",
+               av_get_media_type_string(d->codec_ctx->codec_type));
+        av_frame_free(&dec_frame);
+        return ret;
+    }
+
+    if (*got_frame)
+        *frame = dec_frame;
+    else
+        av_frame_free(&dec_frame);
+
+    decoded = FFMIN(ret, pkt->size);
+
+    return decoded;
+}
+
+static int queue_frame(AVAsyncDecoder *d, AVFrame *frame)
+{
+    av_log(d, AV_LOG_TRACE, "queue frame %p\n", frame);
+    int ret = av_thread_message_queue_send(d->frames_queue, &frame, 0);
+    if (ret < 0)
+        av_log(d, AV_LOG_ERROR, "Unable to push frame: %s\n", av_err2str(ret));
+    return ret;
+}
+
+static void *decoder_thread(void *arg)
+{
+    int ret, got_frame;
+    AVPacket pkt, orig_pkt;
+    AVFrame *dec_frame;
+    AVAsyncDecoder *d = arg;
+
+    /* Initialize the frame queue (communication decode <-> watcher) */
+    ret = av_thread_message_queue_alloc(&d->frames_queue, d->max_frames_queue, sizeof(AVFrame *));
+    if (ret < 0) {
+        return NULL;
+    }
+
+    /* Spawn frame queue watcher */
+    av_log(d, AV_LOG_TRACE, "decoding thread starting\n");
+    if ((ret = pthread_create(&d->watcher_tid, NULL, watcher_thread, d))) {
+        ret = AVERROR(ret); /* pthread_create() returns the error, errno is not set */
+        av_log(d, AV_LOG_ERROR, "Unable to start watcher thread: %s\n",
+               av_err2str(ret));
+        av_thread_message_queue_free(&d->frames_queue);
+        return NULL;
+    }
+
+    /* Main packet decoding loop */
+    av_log(d, AV_LOG_TRACE, "main packet decoding loop\n");
+    for (;;) {
+        ret = av_thread_message_queue_recv(d->pkt_queue, &pkt, 0);
+        if (ret < 0)
+            break;
+
+        orig_pkt = pkt;
+        do {
+            ret = decode_packet(d, &pkt, &dec_frame, &got_frame);
+            if (ret < 0)
+                break;
+            pkt.data += ret;
+            pkt.size -= ret;
+            if (got_frame && (ret = queue_frame(d, dec_frame)) < 0)
+                break;
+        } while (pkt.size > 0);
+        av_packet_unref(&orig_pkt);
+    }
+
+    /* flush cached frames */
+    av_log(d, AV_LOG_TRACE, "flush cached frames\n");
+    av_init_packet(&pkt);
+    pkt.data = NULL;
+    pkt.size = 0;
+    do {
+        ret = decode_packet(d, &pkt, &dec_frame, &got_frame);
+        if (ret == 0 && got_frame && (ret = queue_frame(d, dec_frame)) < 0)
+            break;
+        av_log(d, AV_LOG_TRACE, "flushed, got_frame=%d\n", got_frame);
+    } while (got_frame);
+    av_log(d, AV_LOG_TRACE, "flush end\n");
+
+    /* Decoder ends, notify frame watcher so it dies */
+    av_thread_message_queue_set_err_recv(d->frames_queue, ret < 0 ? ret : AVERROR_EOF);
+    pthread_join(d->watcher_tid, NULL);
+
+    av_thread_message_queue_free(&d->frames_queue);
+    return NULL;
+}
+
+static void *reader_thread(void *arg)
+{
+    int ret, i;
+    AVAsyncReader *r = arg;
+
+    av_log(r, AV_LOG_TRACE, "reader thread starting\n");
+
+    /* Spawn decoders */
+    for (i = 0; i < r->nb_decoders; i++) {
+        AVAsyncDecoder *d = &r->decoders[i];
+
+        av_log(d, AV_LOG_TRACE, "    decoder[%d]: matching pkt id #%d\n", i, d->pkt_id_match);
+
+        /* Initialize the packet queue (communication reader <-> decoder) */
+        ret = av_thread_message_queue_alloc(&d->pkt_queue, d->max_packets_queue, sizeof(AVPacket));
+        if (ret < 0) {
+            return NULL;
+        }
+
+        /* Start its working thread */
+        if ((ret = pthread_create(&d->tid, NULL, decoder_thread, d))) {
+            ret = AVERROR(ret); /* pthread_create() returns the error, errno is not set */
+            av_log(d, AV_LOG_ERROR, "Unable to start decoding thread %d: %s\n",
+                   i, av_err2str(ret));
+            goto end;
+        }
+
+        d->started = 1;
+    }
+
+    while (1) {
+        AVPacket pkt;
+        AVAsyncDecoder *decoder = NULL;
+
+        ret = r->pull_packet_cb(r->priv_data, &pkt);
+
+        if (ret == AVERROR(EAGAIN)) {
+            av_usleep(10000);
+            continue;
+        }
+        if (ret < 0)
+            break;
+
+        /* Find decoder for a given packet */
+        // FIXME: faster broadcasting needed
+        for (i = 0; i < r->nb_decoders; i++) {
+            AVAsyncDecoder *d = &r->decoders[i];
+            if (pkt.stream_index == d->pkt_id_match) {
+                decoder = d;
+                break;
+            }
+        }
+
+        if (!decoder) {
+            av_log(r, AV_LOG_DEBUG, "No decoder for stream %d, ignoring packet\n", pkt.stream_index);
+            av_packet_unref(&pkt);
+            continue;
+        }
+
+        ret = av_thread_message_queue_send(decoder->pkt_queue, &pkt, 0);
+        if (ret < 0) {
+            if (ret != AVERROR_EOF)
+                av_log(decoder, AV_LOG_ERROR, "Unable to send packet to decoder: %s\n", av_err2str(ret));
+            av_packet_unref(&pkt);
+            av_thread_message_queue_set_err_recv(decoder->pkt_queue, ret);
+        }
+    }
+
+end:
+
+    /* Notify all decoders about the error/EOF so they die */
+    for (i = 0; i < r->nb_decoders; i++) {
+        AVAsyncDecoder *d = &r->decoders[i];
+        av_thread_message_queue_set_err_recv(d->pkt_queue, ret);
+        if (d->started)
+            pthread_join(d->tid, NULL);
+        av_thread_message_queue_free(&d->pkt_queue);
+    }
+
+    return NULL;
+}
+
+int avasync_start(AVAsyncContext *actx)
+{
+    int i;
+
+    av_log(actx, AV_LOG_TRACE, "Starting AVAsync loop\n");
+
+    for (i = 0; i < actx->nb_readers; i++) {
+        int ret;
+        AVAsyncReader *r = &actx->readers[i];
+
+        av_log(actx, AV_LOG_TRACE, "  Reader[%d]: %s (non-blocking: %s)\n",
+               i, r->name, r->non_blocking ? "yes" : "no");
+
+        ret = pthread_create(&r->tid, NULL, reader_thread, r);
+        if (ret) {
+            const int err = AVERROR(ret);
+            av_log(actx, AV_LOG_ERROR, "Unable to start reader thread %d: %s\n",
+                   i, av_err2str(err));
+            return err;
+        }
+        r->started = 1;
+    }
+
+    return 0;
+}
+
+int avasync_wait(AVAsyncContext *actx)
+{
+    int i;
+
+    av_log(actx, AV_LOG_TRACE, "waiting for readers to end\n");
+    for (i = 0; i < actx->nb_readers; i++) {
+        const AVAsyncReader *r = &actx->readers[i];
+
+        if (r->started) {
+            int ret = pthread_join(r->tid, NULL);
+            if (ret)
+                av_log(actx, AV_LOG_ERROR, "Unable to join reader #%d: %s\n",
+                       i, av_err2str(AVERROR(ret)));
+        }
+    }
+    return 0;
+}
+
+void avasync_free(AVAsyncContext **actxp)
+{
+    int i;
+    AVAsyncContext *actx = *actxp;
+
+    for (i = 0; i < actx->nb_readers; i++) {
+        AVAsyncReader *r = &actx->readers[i];
+        av_freep(&r->decoders);
+        av_freep(&r->name);
+    }
+    av_freep(&actx->readers);
+    av_freep(actxp);
+}
diff --git a/libavutil/async.h b/libavutil/async.h
new file mode 100644
index 0000000..9c50e79
--- /dev/null
+++ b/libavutil/async.h
@@ -0,0 +1,92 @@
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef AVUTIL_ASYNC_H
+#define AVUTIL_ASYNC_H
+
+#include <libavcodec/avcodec.h>
+#include <libavutil/threadmessage.h>
+
+typedef int (*pull_packet_func_type)(void *priv, AVPacket *pkt);
+typedef int (*push_frame_func_type)(void *priv, AVFrame *frame);
+
+typedef struct AVAsyncDecoder {
+    const AVClass *class;
+    AVCodecContext *codec_ctx;
+    void *priv_data;
+    push_frame_func_type push_frame_cb;
+    int pkt_id_match;
+
+    int started;
+    pthread_t tid;
+    pthread_t watcher_tid;
+
+    AVThreadMessageQueue *pkt_queue;
+    AVThreadMessageQueue *frames_queue;
+
+    int max_packets_queue;
+    int max_frames_queue;
+
+} AVAsyncDecoder;
+
+typedef struct AVAsyncReader {
+    const AVClass *class;
+    char *name;
+    void *priv_data;
+    pull_packet_func_type pull_packet_cb;
+    AVAsyncDecoder *decoders;
+    int nb_decoders;
+
+    int started;
+    pthread_t tid;
+
+    int non_blocking; // TODO: honor
+
+} AVAsyncReader;
+
+typedef struct AVAsyncContext {
+    const AVClass *class;
+    AVAsyncReader *readers;
+    int nb_readers;
+} AVAsyncContext;
+
+AVAsyncContext *avasync_alloc_context(void);
+
+int avasync_register_reader(AVAsyncContext *actx,
+                            const char *name, void *priv,
+                            pull_packet_func_type pull_packet_cb,
+                            AVAsyncReader **r);
+
+int avasync_register_decoder(AVAsyncReader *r,
+                             AVCodecContext *codec_ctx, void *priv,
+                             push_frame_func_type push_frame_cb,
+                             int pkt_id_match);
+
+int avasync_start(AVAsyncContext *actx);
+
+int avasync_lock_decoder(AVAsyncDecoder *d);
+int avasync_unlock_decoder(AVAsyncDecoder *d);
+
+int avasync_lock_reader(AVAsyncReader *r);
+int avasync_unlock_reader(AVAsyncReader *r);
+
+int avasync_wait(AVAsyncContext *actx);
+
+void avasync_free(AVAsyncContext **actxp);
+
+#endif /* AVUTIL_ASYNC_H */
-- 
2.6.2


