[FFmpeg-devel] [PATCH 05/13] fftools/ffmpeg_filter: move filtering to a separate thread

Anton Khirnov anton at khirnov.net
Thu Nov 23 21:15:00 EET 2023


As previously for decoding, this is merely "scaffolding" for moving to a
fully threaded architecture and does not yet make filtering truly
parallel - the main thread will currently wait for the filtering thread
to finish its work before continuing. That will change in future commits
after encoders are also moved to threads and a thread-aware scheduler is
added.
---
 fftools/ffmpeg.h        |   9 +-
 fftools/ffmpeg_dec.c    |  39 +-
 fftools/ffmpeg_filter.c | 825 ++++++++++++++++++++++++++++++++++------
 3 files changed, 730 insertions(+), 143 deletions(-)

diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 1f11a2f002..f50222472c 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -80,6 +80,14 @@ enum HWAccelID {
     HWACCEL_GENERIC,
 };
 
+enum FrameOpaque {
+    FRAME_OPAQUE_REAP_FILTERS = 1,
+    FRAME_OPAQUE_CHOOSE_INPUT,
+    FRAME_OPAQUE_SUB_HEARTBEAT,
+    FRAME_OPAQUE_EOF,
+    FRAME_OPAQUE_SEND_COMMAND,
+};
+
 typedef struct HWDevice {
     const char *name;
     enum AVHWDeviceType type;
@@ -730,7 +738,6 @@ const FrameData *frame_data_c(AVFrame *frame);
 
 int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference);
 int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb);
-int ifilter_sub2video(InputFilter *ifilter, const AVFrame *frame);
 void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb);
 
 /**
diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c
index 517d6b3ced..b60bad1220 100644
--- a/fftools/ffmpeg_dec.c
+++ b/fftools/ffmpeg_dec.c
@@ -147,11 +147,12 @@ fail:
 
 static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
 {
-    int i, ret;
+    int i, ret = 0;
 
-    av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */
     for (i = 0; i < ist->nb_filters; i++) {
-        ret = ifilter_send_frame(ist->filters[i], decoded_frame, i < ist->nb_filters - 1);
+        ret = ifilter_send_frame(ist->filters[i], decoded_frame,
+                                 i < ist->nb_filters - 1 ||
+                                 ist->dec->type == AVMEDIA_TYPE_SUBTITLE);
         if (ret == AVERROR_EOF)
             ret = 0; /* ignore */
         if (ret < 0) {
@@ -380,15 +381,6 @@ static int video_frame_process(InputStream *ist, AVFrame *frame)
     return 0;
 }
 
-static void sub2video_flush(InputStream *ist)
-{
-    for (int i = 0; i < ist->nb_filters; i++) {
-        int ret = ifilter_sub2video(ist->filters[i], NULL);
-        if (ret != AVERROR_EOF && ret < 0)
-            av_log(NULL, AV_LOG_WARNING, "Flush the frame error.\n");
-    }
-}
-
 static int process_subtitle(InputStream *ist, AVFrame *frame)
 {
     Decoder *d = ist->decoder;
@@ -426,14 +418,9 @@ static int process_subtitle(InputStream *ist, AVFrame *frame)
     if (!subtitle)
         return 0;
 
-    for (int i = 0; i < ist->nb_filters; i++) {
-        ret = ifilter_sub2video(ist->filters[i], frame);
-        if (ret < 0) {
-            av_log(ist, AV_LOG_ERROR, "Error sending a subtitle for filtering: %s\n",
-                   av_err2str(ret));
-            return ret;
-        }
-    }
+    ret = send_frame_to_filters(ist, frame);
+    if (ret < 0)
+        return ret;
 
     subtitle = (AVSubtitle*)frame->buf[0]->data;
     if (!subtitle->num_rects)
@@ -824,14 +811,10 @@ finish:
         return ret;
 
     // signal EOF to our downstreams
-    if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE)
-        sub2video_flush(ist);
-    else {
-        ret = send_filter_eof(ist);
-        if (ret < 0) {
-            av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
-            return ret;
-        }
+    ret = send_filter_eof(ist);
+    if (ret < 0) {
+        av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
+        return ret;
     }
 
     return AVERROR_EOF;
diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
index 69c28a6b2b..1b964fc53f 100644
--- a/fftools/ffmpeg_filter.c
+++ b/fftools/ffmpeg_filter.c
@@ -21,6 +21,8 @@
 #include <stdint.h>
 
 #include "ffmpeg.h"
+#include "ffmpeg_utils.h"
+#include "thread_queue.h"
 
 #include "libavfilter/avfilter.h"
 #include "libavfilter/buffersink.h"
@@ -53,12 +55,50 @@ typedef struct FilterGraphPriv {
     int is_meta;
     int disable_conversions;
 
+    int nb_inputs_bound;
+    int nb_outputs_bound;
+
     const char *graph_desc;
 
     // frame for temporarily holding output from the filtergraph
     AVFrame *frame;
     // frame for sending output to the encoder
     AVFrame *frame_enc;
+
+    pthread_t        thread;
+    /**
+     * Queue for sending frames from the main thread to the filtergraph. Has
+     * nb_inputs+1 streams - the first nb_inputs stream correspond to
+     * filtergraph inputs. Frames on those streams may have their opaque set to
+     * - FRAME_OPAQUE_EOF: frame contains no data, but pts+timebase of the
+     *   EOF event for the correspondint stream. Will be immediately followed by
+     *   this stream being send-closed.
+     * - FRAME_OPAQUE_SUB_HEARTBEAT: frame contains no data, but pts+timebase of
+     *   a subtitle heartbeat event. Will only be sent for sub2video streams.
+     *
+     * The last stream is "control" - the main thread sends empty AVFrames with
+     * opaque set to
+     * - FRAME_OPAQUE_REAP_FILTERS: a request to retrieve all frame available
+     *   from filtergraph outputs. These frames are sent to corresponding
+     *   streams in queue_out. Finally an empty frame is sent to the control
+     *   stream in queue_out.
+     * - FRAME_OPAQUE_CHOOSE_INPUT: same as above, but in case no frames are
+     *   available the terminating empty frame's opaque will contain the index+1
+     *   of the filtergraph input to which more input frames should be supplied.
+     */
+    ThreadQueue     *queue_in;
+    /**
+     * Queue for sending frames from the filtergraph back to the main thread.
+     * Has nb_outputs+1 streams - the first nb_outputs stream correspond to
+     * filtergraph outputs.
+     *
+     * The last stream is "control" - see documentation for queue_in for more
+     * details.
+     */
+    ThreadQueue     *queue_out;
+    // submitting frames to filter thread returned EOF
+    // this only happens on thread exit, so is not per-input
+    int              eof_in;
 } FilterGraphPriv;
 
 static FilterGraphPriv *fgp_from_fg(FilterGraph *fg)
@@ -71,6 +111,22 @@ static const FilterGraphPriv *cfgp_from_cfg(const FilterGraph *fg)
     return (const FilterGraphPriv*)fg;
 }
 
+// data that is local to the filter thread and not visible outside of it
+typedef struct FilterGraphThread {
+    AVFrame *frame;
+
+    // Temporary buffer for output frames, since on filtergraph reset
+    // we cannot send them to encoders immediately.
+    // The output index is stored in frame opaque.
+    AVFifo  *frame_queue_out;
+
+    int      got_frame;
+
+    // EOF status of each input/output, as received by the thread
+    uint8_t *eof_in;
+    uint8_t *eof_out;
+} FilterGraphThread;
+
 typedef struct InputFilterPriv {
     InputFilter ifilter;
 
@@ -204,7 +260,25 @@ static OutputFilterPriv *ofp_from_ofilter(OutputFilter *ofilter)
     return (OutputFilterPriv*)ofilter;
 }
 
-static int configure_filtergraph(FilterGraph *fg);
+typedef struct FilterCommand {
+    char *target;
+    char *command;
+    char *arg;
+
+    double time;
+    int    all_filters;
+} FilterCommand;
+
+static void filter_command_free(void *opaque, uint8_t *data)
+{
+    FilterCommand *fc = (FilterCommand*)data;
+
+    av_freep(&fc->target);
+    av_freep(&fc->command);
+    av_freep(&fc->arg);
+
+    av_free(data);
+}
 
 static int sub2video_get_blank_frame(InputFilterPriv *ifp)
 {
@@ -574,6 +648,59 @@ static int ifilter_has_all_input_formats(FilterGraph *fg)
     return 1;
 }
 
+static void *filter_thread(void *arg);
+
+// start the filtering thread once all inputs and outputs are bound
+static int fg_thread_try_start(FilterGraphPriv *fgp)
+{
+    FilterGraph *fg = &fgp->fg;
+    ObjPool *op;
+    int ret = 0;
+
+    if (fgp->nb_inputs_bound  < fg->nb_inputs ||
+        fgp->nb_outputs_bound < fg->nb_outputs)
+        return 0;
+
+    op = objpool_alloc_frames();
+    if (!op)
+        return AVERROR(ENOMEM);
+
+    fgp->queue_in = tq_alloc(fg->nb_inputs + 1, 1, op, frame_move);
+    if (!fgp->queue_in) {
+        objpool_free(&op);
+        return AVERROR(ENOMEM);
+    }
+
+    // at least one output is mandatory
+    op = objpool_alloc_frames();
+    if (!op)
+        goto fail;
+
+    fgp->queue_out = tq_alloc(fg->nb_outputs + 1, 1, op, frame_move);
+    if (!fgp->queue_out) {
+        objpool_free(&op);
+        goto fail;
+    }
+
+    ret = pthread_create(&fgp->thread, NULL, filter_thread, fgp);
+    if (ret) {
+        ret = AVERROR(ret);
+        av_log(NULL, AV_LOG_ERROR, "pthread_create() for filtergraph %d failed: %s\n",
+               fg->index, av_err2str(ret));
+        goto fail;
+    }
+
+    return 0;
+fail:
+    if (ret >= 0)
+        ret = AVERROR(ENOMEM);
+
+    tq_free(&fgp->queue_in);
+    tq_free(&fgp->queue_out);
+
+    return ret;
+}
+
 static char *describe_filter_link(FilterGraph *fg, AVFilterInOut *inout, int in)
 {
     AVFilterContext *ctx = inout->filter_ctx;
@@ -607,6 +734,7 @@ static OutputFilter *ofilter_alloc(FilterGraph *fg)
 static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist)
 {
     InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
+    FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
     int ret;
 
     av_assert0(!ifp->ist);
@@ -624,7 +752,10 @@ static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist)
             return AVERROR(ENOMEM);
     }
 
-    return 0;
+    fgp->nb_inputs_bound++;
+    av_assert0(fgp->nb_inputs_bound <= ifilter->graph->nb_inputs);
+
+    return fg_thread_try_start(fgp);
 }
 
 static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost)
@@ -756,24 +887,10 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost)
         break;
     }
 
-    // if we have all input parameters and all outputs are bound,
-    // the graph can now be configured
-    if (ifilter_has_all_input_formats(fg)) {
-        int ret;
+    fgp->nb_outputs_bound++;
+    av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs);
 
-        for (int i = 0; i < fg->nb_outputs; i++)
-            if (!fg->outputs[i]->ost)
-                return 0;
-
-        ret = configure_filtergraph(fg);
-        if (ret < 0) {
-            av_log(fg, AV_LOG_ERROR, "Error configuring filter graph: %s\n",
-                   av_err2str(ret));
-            return ret;
-        }
-    }
-
-    return 0;
+    return fg_thread_try_start(fgp);
 }
 
 static InputFilter *ifilter_alloc(FilterGraph *fg)
@@ -803,6 +920,34 @@ static InputFilter *ifilter_alloc(FilterGraph *fg)
     return ifilter;
 }
 
+static int fg_thread_stop(FilterGraphPriv *fgp)
+{
+    void *ret;
+
+    if (!fgp->queue_in)
+        return 0;
+
+    for (int i = 0; i <= fgp->fg.nb_inputs; i++) {
+        InputFilterPriv *ifp = i < fgp->fg.nb_inputs ?
+                               ifp_from_ifilter(fgp->fg.inputs[i]) : NULL;
+
+        if (ifp)
+            ifp->eof = 1;
+
+        tq_send_finish(fgp->queue_in, i);
+    }
+
+    for (int i = 0; i <= fgp->fg.nb_outputs; i++)
+        tq_receive_finish(fgp->queue_out, i);
+
+    pthread_join(fgp->thread, &ret);
+
+    tq_free(&fgp->queue_in);
+    tq_free(&fgp->queue_out);
+
+    return (int)(intptr_t)ret;
+}
+
 void fg_free(FilterGraph **pfg)
 {
     FilterGraph *fg = *pfg;
@@ -812,6 +957,8 @@ void fg_free(FilterGraph **pfg)
         return;
     fgp = fgp_from_fg(fg);
 
+    fg_thread_stop(fgp);
+
     avfilter_graph_free(&fg->graph);
     for (int j = 0; j < fg->nb_inputs; j++) {
         InputFilter *ifilter = fg->inputs[j];
@@ -1622,7 +1769,7 @@ static int graph_is_meta(AVFilterGraph *graph)
     return 1;
 }
 
-static int configure_filtergraph(FilterGraph *fg)
+static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt)
 {
     FilterGraphPriv *fgp = fgp_from_fg(fg);
     AVBufferRef *hw_device;
@@ -1746,7 +1893,7 @@ static int configure_filtergraph(FilterGraph *fg)
     /* send the EOFs for the finished inputs */
     for (i = 0; i < fg->nb_inputs; i++) {
         InputFilterPriv *ifp = ifp_from_ifilter(fg->inputs[i]);
-        if (ifp->eof) {
+        if (fgt->eof_in[i]) {
             ret = av_buffersrc_add_frame(ifp->filter, NULL);
             if (ret < 0)
                 goto fail;
@@ -1829,8 +1976,8 @@ int filtergraph_is_simple(const FilterGraph *fg)
     return fgp->is_simple;
 }
 
-void fg_send_command(FilterGraph *fg, double time, const char *target,
-                     const char *command, const char *arg, int all_filters)
+static void send_command(FilterGraph *fg, double time, const char *target,
+                         const char *command, const char *arg, int all_filters)
 {
     int ret;
 
@@ -1853,6 +2000,29 @@ void fg_send_command(FilterGraph *fg, double time, const char *target,
     }
 }
 
+static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt)
+{
+    int nb_requests, nb_requests_max = 0;
+    int best_input = -1;
+
+    for (int i = 0; i < fg->nb_inputs; i++) {
+        InputFilter *ifilter = fg->inputs[i];
+        InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
+        InputStream     *ist = ifp->ist;
+
+        if (input_files[ist->file_index]->eagain || fgt->eof_in[i])
+            continue;
+
+        nb_requests = av_buffersrc_get_nb_failed_requests(ifp->filter);
+        if (nb_requests > nb_requests_max) {
+            nb_requests_max = nb_requests;
+            best_input = i;
+        }
+    }
+
+    return best_input;
+}
+
 static int choose_out_timebase(OutputFilterPriv *ofp, AVFrame *frame)
 {
     OutputFilter *ofilter = &ofp->ofilter;
@@ -2088,16 +2258,16 @@ finish:
     fps->dropped_keyframe |= fps->last_dropped && (frame->flags & AV_FRAME_FLAG_KEY);
 }
 
-static int fg_output_frame(OutputFilterPriv *ofp, AVFrame *frame)
+static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
+                           AVFrame *frame, int buffer)
 {
     FilterGraphPriv  *fgp = fgp_from_fg(ofp->ofilter.graph);
-    OutputStream     *ost = ofp->ofilter.ost;
     AVFrame   *frame_prev = ofp->fps.last_frame;
     enum AVMediaType type = ofp->ofilter.type;
 
-    int64_t nb_frames = 1, nb_frames_prev = 0;
+    int64_t nb_frames = !!frame, nb_frames_prev = 0;
 
-    if (type == AVMEDIA_TYPE_VIDEO)
+    if (type == AVMEDIA_TYPE_VIDEO && (frame || fgt->got_frame))
         video_sync_process(ofp, frame, &nb_frames, &nb_frames_prev);
 
     for (int64_t i = 0; i < nb_frames; i++) {
@@ -2136,10 +2306,31 @@ static int fg_output_frame(OutputFilterPriv *ofp, AVFrame *frame)
             frame_out = frame;
         }
 
-        ret = enc_frame(ost, frame_out);
-        av_frame_unref(frame_out);
-        if (ret < 0)
-            return ret;
+        if (buffer) {
+            AVFrame *f = av_frame_alloc();
+
+            if (!f) {
+                av_frame_unref(frame_out);
+                return AVERROR(ENOMEM);
+            }
+
+            av_frame_move_ref(f, frame_out);
+            f->opaque = (void*)(intptr_t)ofp->index;
+
+            ret = av_fifo_write(fgt->frame_queue_out, &f, 1);
+            if (ret < 0) {
+                av_frame_free(&f);
+                return AVERROR(ENOMEM);
+            }
+        } else {
+            // return the frame to the main thread
+            ret = tq_send(fgp->queue_out, ofp->index, frame_out);
+            if (ret < 0) {
+                av_frame_unref(frame_out);
+                fgt->eof_out[ofp->index] = 1;
+                return ret == AVERROR_EOF ? 0 : ret;
+            }
+        }
 
         if (type == AVMEDIA_TYPE_VIDEO) {
             ofp->fps.frame_number++;
@@ -2149,7 +2340,7 @@ static int fg_output_frame(OutputFilterPriv *ofp, AVFrame *frame)
                 frame->flags &= ~AV_FRAME_FLAG_KEY;
         }
 
-        ofp->got_frame = 1;
+        fgt->got_frame = 1;
     }
 
     if (frame && frame_prev) {
@@ -2157,23 +2348,27 @@ static int fg_output_frame(OutputFilterPriv *ofp, AVFrame *frame)
         av_frame_move_ref(frame_prev, frame);
     }
 
+    if (!frame) {
+        tq_send_finish(fgp->queue_out, ofp->index);
+        fgt->eof_out[ofp->index] = 1;
+    }
+
     return 0;
 }
 
-static int fg_output_step(OutputFilterPriv *ofp, int flush)
+static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
+                          AVFrame *frame,  int buffer)
 {
     FilterGraphPriv    *fgp = fgp_from_fg(ofp->ofilter.graph);
     OutputStream       *ost = ofp->ofilter.ost;
-    AVFrame          *frame = fgp->frame;
     AVFilterContext *filter = ofp->filter;
     FrameData *fd;
     int ret;
 
     ret = av_buffersink_get_frame_flags(filter, frame,
                                         AV_BUFFERSINK_FLAG_NO_REQUEST);
-    if (flush && ret == AVERROR_EOF && ofp->got_frame &&
-        ost->type == AVMEDIA_TYPE_VIDEO) {
-        ret = fg_output_frame(ofp, NULL);
+    if (ret == AVERROR_EOF && !buffer && !fgt->eof_out[ofp->index]) {
+        ret = fg_output_frame(ofp, fgt, NULL, buffer);
         return (ret < 0) ? ret : 1;
     } else if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
         return 1;
@@ -2183,22 +2378,18 @@ static int fg_output_step(OutputFilterPriv *ofp, int flush)
                av_err2str(ret));
         return ret;
     }
-    if (ost->finished) {
+
+    if (fgt->eof_out[ofp->index]) {
         av_frame_unref(frame);
         return 0;
     }
 
     frame->time_base = av_buffersink_get_time_base(filter);
 
-    if (frame->pts != AV_NOPTS_VALUE) {
-        ost->filter->last_pts = av_rescale_q(frame->pts, frame->time_base,
-                                             AV_TIME_BASE_Q);
-
-        if (debug_ts)
-            av_log(fgp, AV_LOG_INFO, "filter_raw -> pts:%s pts_time:%s time_base:%d/%d\n",
-                   av_ts2str(frame->pts), av_ts2timestr(frame->pts, &frame->time_base),
-                             frame->time_base.num, frame->time_base.den);
-    }
+    if (debug_ts)
+        av_log(fgp, AV_LOG_INFO, "filter_raw -> pts:%s pts_time:%s time_base:%d/%d\n",
+               av_ts2str(frame->pts), av_ts2timestr(frame->pts, &frame->time_base),
+                         frame->time_base.num, frame->time_base.den);
 
     // Choose the output timebase the first time we get a frame.
     if (!ofp->tb_out_locked) {
@@ -2231,7 +2422,7 @@ static int fg_output_step(OutputFilterPriv *ofp, int flush)
         fd->frame_rate_filter = ofp->fps.framerate;
     }
 
-    ret = fg_output_frame(ofp, frame);
+    ret = fg_output_frame(ofp, fgt, frame, buffer);
     av_frame_unref(frame);
     if (ret < 0)
         return ret;
@@ -2239,18 +2430,38 @@ static int fg_output_step(OutputFilterPriv *ofp, int flush)
     return 0;
 }
 
-int reap_filters(FilterGraph *fg, int flush)
+/* retrieve all frames available at filtergraph outputs and either send them to
+ * the main thread (buffer=0) or buffer them for later (buffer=1) */
+static int read_frames(FilterGraph *fg, FilterGraphThread *fgt,
+                       AVFrame *frame, int buffer)
 {
+    FilterGraphPriv *fgp = fgp_from_fg(fg);
+    int ret = 0;
+
     if (!fg->graph)
         return 0;
 
+    // process buffered frames
+    if (!buffer) {
+        AVFrame *f;
+
+        while (av_fifo_read(fgt->frame_queue_out, &f, 1) >= 0) {
+            int out_idx = (intptr_t)f->opaque;
+            f->opaque = NULL;
+            ret = tq_send(fgp->queue_out, out_idx, f);
+            av_frame_free(&f);
+            if (ret < 0 && ret != AVERROR_EOF)
+                return ret;
+        }
+    }
+
     /* Reap all buffers present in the buffer sinks */
     for (int i = 0; i < fg->nb_outputs; i++) {
         OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
         int ret = 0;
 
         while (!ret) {
-            ret = fg_output_step(ofp, flush);
+            ret = fg_output_step(ofp, fgt, frame, buffer);
             if (ret < 0)
                 return ret;
         }
@@ -2259,7 +2470,7 @@ int reap_filters(FilterGraph *fg, int flush)
     return 0;
 }
 
-void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
+static void sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
 {
     InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
     int64_t pts2;
@@ -2284,11 +2495,17 @@ void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational t
         sub2video_push_ref(ifp, pts2);
 }
 
-int ifilter_sub2video(InputFilter *ifilter, const AVFrame *frame)
+static int sub2video_frame(InputFilter *ifilter, const AVFrame *frame)
 {
     InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
     int ret;
 
+    // heartbeat frame
+    if (frame && !frame->buf[0]) {
+        sub2video_heartbeat(ifilter, frame->pts, frame->time_base);
+        return 0;
+    }
+
     if (ifilter->graph->graph) {
         if (!frame) {
             if (ifp->sub2video.end_pts < INT64_MAX)
@@ -2317,12 +2534,13 @@ int ifilter_sub2video(InputFilter *ifilter, const AVFrame *frame)
     return 0;
 }
 
-int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb)
+static int send_eof(FilterGraphThread *fgt, InputFilter *ifilter,
+                    int64_t pts, AVRational tb)
 {
     InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
     int ret;
 
-    ifp->eof = 1;
+    fgt->eof_in[ifp->index] = 1;
 
     if (ifp->filter) {
         pts = av_rescale_q_rnd(pts, tb, ifp->time_base,
@@ -2346,7 +2564,7 @@ int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb)
                 return ret;
 
             if (ifilter_has_all_input_formats(ifilter->graph)) {
-                ret = configure_filtergraph(ifilter->graph);
+                ret = configure_filtergraph(ifilter->graph, fgt);
                 if (ret < 0) {
                     av_log(NULL, AV_LOG_ERROR, "Error initializing filters!\n");
                     return ret;
@@ -2365,10 +2583,10 @@ int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb)
     return 0;
 }
 
-int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference)
+static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
+                      InputFilter *ifilter, AVFrame *frame)
 {
     InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
-    FilterGraph *fg = ifilter->graph;
     AVFrameSideData *sd;
     int need_reinit, ret;
 
@@ -2408,10 +2626,13 @@ int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference)
 
     /* (re)init the graph if possible, otherwise buffer the frame and return */
     if (need_reinit || !fg->graph) {
+        AVFrame *tmp = av_frame_alloc();
+
+        if (!tmp)
+            return AVERROR(ENOMEM);
+
         if (!ifilter_has_all_input_formats(fg)) {
-            AVFrame *tmp = av_frame_clone(frame);
-            if (!tmp)
-                return AVERROR(ENOMEM);
+            av_frame_move_ref(tmp, frame);
 
             ret = av_fifo_write(ifp->frame_queue, &tmp, 1);
             if (ret < 0)
@@ -2420,27 +2641,18 @@ int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference)
             return ret;
         }
 
-        ret = reap_filters(fg, 0);
-        if (ret < 0 && ret != AVERROR_EOF) {
-            av_log(fg, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret));
+        ret = fg->graph ? read_frames(fg, fgt, tmp, 1) : 0;
+        av_frame_free(&tmp);
+        if (ret < 0)
             return ret;
-        }
 
-        ret = configure_filtergraph(fg);
+        ret = configure_filtergraph(fg, fgt);
         if (ret < 0) {
             av_log(fg, AV_LOG_ERROR, "Error reinitializing filters!\n");
             return ret;
         }
     }
 
-    if (keep_reference) {
-        ret = av_frame_ref(ifp->frame, frame);
-        if (ret < 0)
-            return ret;
-    } else
-        av_frame_move_ref(ifp->frame, frame);
-    frame = ifp->frame;
-
     frame->pts       = av_rescale_q(frame->pts,      frame->time_base, ifp->time_base);
     frame->duration  = av_rescale_q(frame->duration, frame->time_base, ifp->time_base);
     frame->time_base = ifp->time_base;
@@ -2462,20 +2674,30 @@ int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference)
     return 0;
 }
 
-int fg_transcode_step(FilterGraph *graph, InputStream **best_ist)
+static int msg_process(FilterGraphPriv *fgp, FilterGraphThread *fgt,
+                       AVFrame *frame)
 {
-    FilterGraphPriv *fgp = fgp_from_fg(graph);
-    int i, ret;
-    int nb_requests, nb_requests_max = 0;
-    InputStream *ist;
+    const enum FrameOpaque msg = (intptr_t)frame->opaque;
+    FilterGraph            *fg = &fgp->fg;
+    int              graph_eof = 0;
+    int ret;
 
-    if (!graph->graph) {
-        for (int i = 0; i < graph->nb_inputs; i++) {
-            InputFilter *ifilter = graph->inputs[i];
+    frame->opaque = NULL;
+    av_assert0(msg > 0);
+    av_assert0(msg == FRAME_OPAQUE_SEND_COMMAND || !frame->buf[0]);
+
+    if (!fg->graph) {
+        // graph not configured yet, ignore all messages other than choosing
+        // the input to read from
+        if (msg != FRAME_OPAQUE_CHOOSE_INPUT)
+            goto done;
+
+        for (int i = 0; i < fg->nb_inputs; i++) {
+            InputFilter *ifilter = fg->inputs[i];
             InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
-            if (ifp->format < 0 && !ifp->eof) {
-                *best_ist = ifp->ist;
-                return 0;
+            if (ifp->format < 0 && !fgt->eof_in[i]) {
+                frame->opaque = (void*)(intptr_t)(i + 1);
+                goto done;
             }
         }
 
@@ -2486,16 +2708,310 @@ int fg_transcode_step(FilterGraph *graph, InputStream **best_ist)
         return AVERROR_BUG;
     }
 
-    *best_ist = NULL;
-    ret = avfilter_graph_request_oldest(graph->graph);
-    if (ret >= 0)
-        return reap_filters(graph, 0);
+    if (msg == FRAME_OPAQUE_SEND_COMMAND) {
+        FilterCommand *fc = (FilterCommand*)frame->buf[0]->data;
+        send_command(fg, fc->time, fc->target, fc->command, fc->arg, fc->all_filters);
+        av_frame_unref(frame);
+        goto done;
+    }
 
-    if (ret == AVERROR_EOF) {
-        reap_filters(graph, 1);
-        for (int i = 0; i < graph->nb_outputs; i++) {
-            OutputFilter *ofilter = graph->outputs[i];
-            OutputFilterPriv *ofp = ofp_from_ofilter(ofilter);
+    if (msg == FRAME_OPAQUE_CHOOSE_INPUT) {
+        ret = avfilter_graph_request_oldest(fg->graph);
+
+        graph_eof = ret == AVERROR_EOF;
+
+        if (ret == AVERROR(EAGAIN)) {
+            frame->opaque = (void*)(intptr_t)(choose_input(fg, fgt) + 1);
+            goto done;
+        } else if (ret < 0 && !graph_eof)
+            return ret;
+    }
+
+    ret = read_frames(fg, fgt, frame, 0);
+    if (ret < 0) {
+        av_log(fg, AV_LOG_ERROR, "Error sending filtered frames for encoding\n");
+        return ret;
+    }
+
+    if (graph_eof)
+        return AVERROR_EOF;
+
+    // signal to the main thread that we are done processing the message
+done:
+    ret = tq_send(fgp->queue_out, fg->nb_outputs, frame);
+    if (ret < 0) {
+        if (ret != AVERROR_EOF)
+            av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n");
+        return ret;
+    }
+
+    return 0;
+}
+
+static void fg_thread_set_name(const FilterGraph *fg)
+{
+    char name[16];
+    if (filtergraph_is_simple(fg)) {
+        OutputStream *ost = fg->outputs[0]->ost;
+        snprintf(name, sizeof(name), "%cf#%d:%d",
+                 av_get_media_type_string(ost->type)[0],
+                 ost->file_index, ost->index);
+    } else {
+        snprintf(name, sizeof(name), "fc%d", fg->index);
+    }
+
+    ff_thread_setname(name);
+}
+
+static void fg_thread_uninit(FilterGraphThread *fgt)
+{
+    if (fgt->frame_queue_out) {
+        AVFrame *frame;
+        while (av_fifo_read(fgt->frame_queue_out, &frame, 1) >= 0)
+            av_frame_free(&frame);
+        av_fifo_freep2(&fgt->frame_queue_out);
+    }
+
+    av_frame_free(&fgt->frame);
+    av_freep(&fgt->eof_in);
+    av_freep(&fgt->eof_out);
+
+    memset(fgt, 0, sizeof(*fgt));
+}
+
+static int fg_thread_init(FilterGraphThread *fgt, const FilterGraph *fg)
+{
+    memset(fgt, 0, sizeof(*fgt));
+
+    fgt->frame = av_frame_alloc();
+    if (!fgt->frame)
+        goto fail;
+
+    fgt->eof_in = av_calloc(fg->nb_inputs, sizeof(*fgt->eof_in));
+    if (!fgt->eof_in)
+        goto fail;
+
+    fgt->eof_out = av_calloc(fg->nb_outputs, sizeof(*fgt->eof_out));
+    if (!fgt->eof_out)
+        goto fail;
+
+    fgt->frame_queue_out = av_fifo_alloc2(1, sizeof(AVFrame*), AV_FIFO_FLAG_AUTO_GROW);
+    if (!fgt->frame_queue_out)
+        goto fail;
+
+    return 0;
+
+fail:
+    fg_thread_uninit(fgt);
+    return AVERROR(ENOMEM);
+}
+
+static void *filter_thread(void *arg)
+{
+    FilterGraphPriv *fgp = arg;
+    FilterGraph      *fg = &fgp->fg;
+
+    FilterGraphThread fgt;
+    int ret = 0, input_status = 0;
+
+    ret = fg_thread_init(&fgt, fg);
+    if (ret < 0)
+        goto finish;
+
+    fg_thread_set_name(fg);
+
+    // if we have all input parameters the graph can now be configured
+    if (ifilter_has_all_input_formats(fg)) {
+        ret = configure_filtergraph(fg, &fgt);
+        if (ret < 0) {
+            av_log(fg, AV_LOG_ERROR, "Error configuring filter graph: %s\n",
+                   av_err2str(ret));
+            goto finish;
+        }
+    }
+
+    while (1) {
+        InputFilter *ifilter;
+        InputFilterPriv *ifp;
+        enum FrameOpaque o;
+        int input_idx, eof_frame;
+
+        input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
+        if (input_idx < 0 ||
+            (input_idx == fg->nb_inputs && input_status < 0)) {
+            av_log(fg, AV_LOG_VERBOSE, "Filtering thread received EOF\n");
+            break;
+        }
+
+        o = (intptr_t)fgt.frame->opaque;
+
+        // message on the control stream
+        if (input_idx == fg->nb_inputs) {
+            ret = msg_process(fgp, &fgt, fgt.frame);
+            if (ret < 0)
+                goto finish;
+
+            continue;
+        }
+
+        // we received an input frame or EOF
+        ifilter   = fg->inputs[input_idx];
+        ifp       = ifp_from_ifilter(ifilter);
+        eof_frame = input_status >= 0 && o == FRAME_OPAQUE_EOF;
+        if (ifp->type_src == AVMEDIA_TYPE_SUBTITLE) {
+            int hb_frame = input_status >= 0 && o == FRAME_OPAQUE_SUB_HEARTBEAT;
+            ret = sub2video_frame(ifilter, (fgt.frame->buf[0] || hb_frame) ? fgt.frame : NULL);
+        } else if (input_status >= 0 && fgt.frame->buf[0]) {
+            ret = send_frame(fg, &fgt, ifilter, fgt.frame);
+        } else {
+            int64_t   pts = input_status >= 0 ? fgt.frame->pts : AV_NOPTS_VALUE;
+            AVRational tb = input_status >= 0 ? fgt.frame->time_base : (AVRational){ 1, 1 };
+            ret = send_eof(&fgt, ifilter, pts, tb);
+        }
+        av_frame_unref(fgt.frame);
+        if (ret < 0)
+            break;
+
+        if (eof_frame) {
+            // an EOF frame is immediately followed by sender closing
+            // the corresponding stream, so retrieve that event
+            input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
+            av_assert0(input_status == AVERROR_EOF && input_idx == ifp->index);
+        }
+
+        // signal to the main thread that we are done
+        ret = tq_send(fgp->queue_out, fg->nb_outputs, fgt.frame);
+        if (ret < 0) {
+            if (ret == AVERROR_EOF)
+                break;
+
+            av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n");
+            goto finish;
+        }
+    }
+
+finish:
+    // EOF is normal termination
+    if (ret == AVERROR_EOF)
+        ret = 0;
+
+    for (int i = 0; i <= fg->nb_inputs; i++)
+        tq_receive_finish(fgp->queue_in, i);
+    for (int i = 0; i <= fg->nb_outputs; i++)
+        tq_send_finish(fgp->queue_out, i);
+
+    fg_thread_uninit(&fgt);
+
+    av_log(fg, AV_LOG_VERBOSE, "Terminating filtering thread\n");
+
+    return (void*)(intptr_t)ret;
+}
+
+static int thread_send_frame(FilterGraphPriv *fgp, InputFilter *ifilter,
+                             AVFrame *frame, enum FrameOpaque type)
+{
+    InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
+    int output_idx, ret;
+
+    if (ifp->eof) {
+        av_frame_unref(frame);
+        return AVERROR_EOF;
+    }
+
+    frame->opaque = (void*)(intptr_t)type;
+
+    ret = tq_send(fgp->queue_in, ifp->index, frame);
+    if (ret < 0) {
+        ifp->eof = 1;
+        av_frame_unref(frame);
+        return ret;
+    }
+
+    if (type == FRAME_OPAQUE_EOF)
+        tq_send_finish(fgp->queue_in, ifp->index);
+
+    // wait for the frame to be processed
+    ret = tq_receive(fgp->queue_out, &output_idx, frame);
+    av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
+
+    return ret;
+}
+
+int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference)
+{
+    FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
+    int ret;
+
+    if (keep_reference) {
+        ret = av_frame_ref(fgp->frame, frame);
+        if (ret < 0)
+            return ret;
+    } else
+        av_frame_move_ref(fgp->frame, frame);
+
+    return thread_send_frame(fgp, ifilter, fgp->frame, 0);
+}
+
+int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb)
+{
+    FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
+    int ret;
+
+    fgp->frame->pts       = pts;
+    fgp->frame->time_base = tb;
+
+    ret = thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_EOF);
+
+    return ret == AVERROR_EOF ? 0 : ret;
+}
+
+void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
+{
+    FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
+
+    fgp->frame->pts       = pts;
+    fgp->frame->time_base = tb;
+
+    thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_SUB_HEARTBEAT);
+}
+
+int fg_transcode_step(FilterGraph *graph, InputStream **best_ist)
+{
+    FilterGraphPriv *fgp = fgp_from_fg(graph);
+    int ret, got_frames = 0;
+
+    if (fgp->eof_in)
+        return AVERROR_EOF;
+
+    // signal to the filtering thread to return all frames it can
+    av_assert0(!fgp->frame->buf[0]);
+    fgp->frame->opaque = (void*)(intptr_t)(best_ist             ?
+                                           FRAME_OPAQUE_CHOOSE_INPUT :
+                                           FRAME_OPAQUE_REAP_FILTERS);
+
+    ret = tq_send(fgp->queue_in, graph->nb_inputs, fgp->frame);
+    if (ret < 0) {
+        fgp->eof_in = 1;
+        goto finish;
+    }
+
+    while (1) {
+        OutputFilter *ofilter;
+        OutputFilterPriv *ofp;
+        OutputStream *ost;
+        int output_idx;
+
+        ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
+
+        // EOF on the whole queue or the control stream
+        if (output_idx < 0 ||
+            (ret < 0 && output_idx == graph->nb_outputs))
+            goto finish;
+
+        // EOF for a specific stream
+        if (ret < 0) {
+            ofilter = graph->outputs[output_idx];
+            ofp     = ofp_from_ofilter(ofilter);
 
             // we are finished and no frames were ever seen at this output,
             // at least initialize the encoder with a dummy frame
@@ -2533,30 +3049,111 @@ int fg_transcode_step(FilterGraph *graph, InputStream **best_ist)
                 av_frame_unref(frame);
             }
 
-            close_output_stream(ofilter->ost);
-        }
-        return 0;
-    }
-    if (ret != AVERROR(EAGAIN))
-        return ret;
-
-    for (i = 0; i < graph->nb_inputs; i++) {
-        InputFilter *ifilter = graph->inputs[i];
-        InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
-
-        ist = ifp->ist;
-        if (input_files[ist->file_index]->eagain || ifp->eof)
+            close_output_stream(graph->outputs[output_idx]->ost);
             continue;
-        nb_requests = av_buffersrc_get_nb_failed_requests(ifp->filter);
-        if (nb_requests > nb_requests_max) {
-            nb_requests_max = nb_requests;
-            *best_ist = ist;
         }
+
+        // request was fully processed by the filtering thread,
+        // return the input stream to read from, if needed
+        if (output_idx == graph->nb_outputs) {
+            int input_idx = (intptr_t)fgp->frame->opaque - 1;
+            av_assert0(input_idx <= graph->nb_inputs);
+
+            if (best_ist) {
+                *best_ist = (input_idx >= 0 && input_idx < graph->nb_inputs) ?
+                            ifp_from_ifilter(graph->inputs[input_idx])->ist : NULL;
+
+                if (input_idx < 0 && !got_frames) {
+                    for (int i = 0; i < graph->nb_outputs; i++)
+                        graph->outputs[i]->ost->unavailable = 1;
+                }
+            }
+            break;
+        }
+
+        // got a frame from the filtering thread, send it for encoding
+        ofilter = graph->outputs[output_idx];
+        ost     = ofilter->ost;
+        ofp     = ofp_from_ofilter(ofilter);
+
+        if (ost->finished) {
+            av_frame_unref(fgp->frame);
+            tq_receive_finish(fgp->queue_out, output_idx);
+            continue;
+        }
+
+        if (fgp->frame->pts != AV_NOPTS_VALUE) {
+            ofilter->last_pts = av_rescale_q(fgp->frame->pts,
+                                             fgp->frame->time_base,
+                                             AV_TIME_BASE_Q);
+        }
+
+        ret = enc_frame(ost, fgp->frame);
+        av_frame_unref(fgp->frame);
+        if (ret < 0)
+            goto finish;
+
+        ofp->got_frame = 1;
+        got_frames     = 1;
     }
 
-    if (!*best_ist)
-        for (i = 0; i < graph->nb_outputs; i++)
-            graph->outputs[i]->ost->unavailable = 1;
+finish:
+    if (ret < 0) {
+        fgp->eof_in = 1;
+        for (int i = 0; i < graph->nb_outputs; i++)
+            close_output_stream(graph->outputs[i]->ost);
+    }
 
-    return 0;
+    return ret;
+}
+
+int reap_filters(FilterGraph *fg, int flush)
+{
+    return fg_transcode_step(fg, NULL);
+}
+
+void fg_send_command(FilterGraph *fg, double time, const char *target,
+                     const char *command, const char *arg, int all_filters)
+{
+    FilterGraphPriv *fgp = fgp_from_fg(fg);
+    AVBufferRef *buf;
+    FilterCommand *fc;
+    int output_idx, ret;
+
+    if (!fgp->queue_in)
+        return;
+
+    fc = av_mallocz(sizeof(*fc));
+    if (!fc)
+        return;
+
+    buf = av_buffer_create((uint8_t*)fc, sizeof(*fc), filter_command_free, NULL, 0);
+    if (!buf) {
+        av_freep(&fc);
+        return;
+    }
+
+    fc->target  = av_strdup(target);
+    fc->command = av_strdup(command);
+    fc->arg     = av_strdup(arg);
+    if (!fc->target || !fc->command || !fc->arg) {
+        av_buffer_unref(&buf);
+        return;
+    }
+
+    fc->time        = time;
+    fc->all_filters = all_filters;
+
+    fgp->frame->buf[0] = buf;
+    fgp->frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SEND_COMMAND;
+
+    ret = tq_send(fgp->queue_in, fg->nb_inputs, fgp->frame);
+    if (ret < 0) {
+        av_frame_unref(fgp->frame);
+        return;
+    }
+
+    // wait for the frame to be processed
+    ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
+    av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
 }
-- 
2.42.0



More information about the ffmpeg-devel mailing list