[FFmpeg-devel] [PATCH 10/24] fftools/ffmpeg_filter: move filtering to a separate thread
Anton Khirnov
anton at khirnov.net
Sat Nov 4 09:56:19 EET 2023
As previously for decoding, this is merely "scaffolding" for moving to a
fully threaded architecture and does not yet make filtering truly
parallel - the main thread will currently wait for the filtering thread
to finish its work before continuing. That will change in future commits
after encoders are also moved to threads and a thread-aware scheduler is
added.
---
fftools/ffmpeg.h | 9 +-
fftools/ffmpeg_dec.c | 39 +-
fftools/ffmpeg_filter.c | 825 ++++++++++++++++++++++++++++++++++------
3 files changed, 730 insertions(+), 143 deletions(-)
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 41935d39d5..9852df8320 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -80,6 +80,14 @@ enum HWAccelID {
HWACCEL_GENERIC,
};
+enum FrameOpaque {
+ FRAME_OPAQUE_REAP_FILTERS = 1,
+ FRAME_OPAQUE_CHOOSE_INPUT,
+ FRAME_OPAQUE_SUB_HEARTBEAT,
+ FRAME_OPAQUE_EOF,
+ FRAME_OPAQUE_SEND_COMMAND,
+};
+
typedef struct HWDevice {
const char *name;
enum AVHWDeviceType type;
@@ -728,7 +736,6 @@ FrameData *frame_data(AVFrame *frame);
int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference);
int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb);
-int ifilter_sub2video(InputFilter *ifilter, const AVFrame *frame);
void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb);
/**
diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c
index 517d6b3ced..b60bad1220 100644
--- a/fftools/ffmpeg_dec.c
+++ b/fftools/ffmpeg_dec.c
@@ -147,11 +147,12 @@ fail:
static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
{
- int i, ret;
+ int i, ret = 0;
- av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */
for (i = 0; i < ist->nb_filters; i++) {
- ret = ifilter_send_frame(ist->filters[i], decoded_frame, i < ist->nb_filters - 1);
+ ret = ifilter_send_frame(ist->filters[i], decoded_frame,
+ i < ist->nb_filters - 1 ||
+ ist->dec->type == AVMEDIA_TYPE_SUBTITLE);
if (ret == AVERROR_EOF)
ret = 0; /* ignore */
if (ret < 0) {
@@ -380,15 +381,6 @@ static int video_frame_process(InputStream *ist, AVFrame *frame)
return 0;
}
-static void sub2video_flush(InputStream *ist)
-{
- for (int i = 0; i < ist->nb_filters; i++) {
- int ret = ifilter_sub2video(ist->filters[i], NULL);
- if (ret != AVERROR_EOF && ret < 0)
- av_log(NULL, AV_LOG_WARNING, "Flush the frame error.\n");
- }
-}
-
static int process_subtitle(InputStream *ist, AVFrame *frame)
{
Decoder *d = ist->decoder;
@@ -426,14 +418,9 @@ static int process_subtitle(InputStream *ist, AVFrame *frame)
if (!subtitle)
return 0;
- for (int i = 0; i < ist->nb_filters; i++) {
- ret = ifilter_sub2video(ist->filters[i], frame);
- if (ret < 0) {
- av_log(ist, AV_LOG_ERROR, "Error sending a subtitle for filtering: %s\n",
- av_err2str(ret));
- return ret;
- }
- }
+ ret = send_frame_to_filters(ist, frame);
+ if (ret < 0)
+ return ret;
subtitle = (AVSubtitle*)frame->buf[0]->data;
if (!subtitle->num_rects)
@@ -824,14 +811,10 @@ finish:
return ret;
// signal EOF to our downstreams
- if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE)
- sub2video_flush(ist);
- else {
- ret = send_filter_eof(ist);
- if (ret < 0) {
- av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
- return ret;
- }
+ ret = send_filter_eof(ist);
+ if (ret < 0) {
+ av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
+ return ret;
}
return AVERROR_EOF;
diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
index 4edf634b26..384cdedcd0 100644
--- a/fftools/ffmpeg_filter.c
+++ b/fftools/ffmpeg_filter.c
@@ -21,6 +21,8 @@
#include <stdint.h>
#include "ffmpeg.h"
+#include "ffmpeg_utils.h"
+#include "thread_queue.h"
#include "libavfilter/avfilter.h"
#include "libavfilter/buffersink.h"
@@ -53,12 +55,50 @@ typedef struct FilterGraphPriv {
int is_meta;
int disable_conversions;
+ int nb_inputs_bound;
+ int nb_outputs_bound;
+
const char *graph_desc;
// frame for temporarily holding output from the filtergraph
AVFrame *frame;
// frame for sending output to the encoder
AVFrame *frame_enc;
+
+ pthread_t thread;
+ /**
+ * Queue for sending frames from the main thread to the filtergraph. Has
+ * nb_inputs+1 streams - the first nb_inputs stream correspond to
+ * filtergraph inputs. Frames on those streams may have their opaque set to
+ * - FRAME_OPAQUE_EOF: frame contains no data, but pts+timebase of the
+ * EOF event for the correspondint stream. Will be immediately followed by
+ * this stream being send-closed.
+ * - FRAME_OPAQUE_SUB_HEARTBEAT: frame contains no data, but pts+timebase of
+ * a subtitle heartbeat event. Will only be sent for sub2video streams.
+ *
+ * The last stream is "control" - the main thread sends empty AVFrames with
+ * opaque set to
+ * - FRAME_OPAQUE_REAP_FILTERS: a request to retrieve all frame available
+ * from filtergraph outputs. These frames are sent to corresponding
+ * streams in queue_out. Finally an empty frame is sent to the control
+ * stream in queue_out.
+ * - FRAME_OPAQUE_CHOOSE_INPUT: same as above, but in case no frames are
+ * available the terminating empty frame's opaque will contain the index+1
+ * of the filtergraph input to which more input frames should be supplied.
+ */
+ ThreadQueue *queue_in;
+ /**
+ * Queue for sending frames from the filtergraph back to the main thread.
+ * Has nb_outputs+1 streams - the first nb_outputs stream correspond to
+ * filtergraph outputs.
+ *
+ * The last stream is "control" - see documentation for queue_in for more
+ * details.
+ */
+ ThreadQueue *queue_out;
+ // submitting frames to filter thread returned EOF
+ // this only happens on thread exit, so is not per-input
+ int eof_in;
} FilterGraphPriv;
static FilterGraphPriv *fgp_from_fg(FilterGraph *fg)
@@ -71,6 +111,22 @@ static const FilterGraphPriv *cfgp_from_cfg(const FilterGraph *fg)
return (const FilterGraphPriv*)fg;
}
+// data that is local to the filter thread and not visible outside of it
+typedef struct FilterGraphThread {
+ AVFrame *frame;
+
+ // Temporary buffer for output frames, since on filtergraph reset
+ // we cannot send them to encoders immediately.
+ // The output index is stored in frame opaque.
+ AVFifo *frame_queue_out;
+
+ int got_frame;
+
+ // EOF status of each input/output, as received by the thread
+ uint8_t *eof_in;
+ uint8_t *eof_out;
+} FilterGraphThread;
+
typedef struct InputFilterPriv {
InputFilter ifilter;
@@ -204,7 +260,25 @@ static OutputFilterPriv *ofp_from_ofilter(OutputFilter *ofilter)
return (OutputFilterPriv*)ofilter;
}
-static int configure_filtergraph(FilterGraph *fg);
+typedef struct FilterCommand {
+ char *target;
+ char *command;
+ char *arg;
+
+ double time;
+ int all_filters;
+} FilterCommand;
+
+static void filter_command_free(void *opaque, uint8_t *data)
+{
+ FilterCommand *fc = (FilterCommand*)data;
+
+ av_freep(&fc->target);
+ av_freep(&fc->command);
+ av_freep(&fc->arg);
+
+ av_free(data);
+}
static int sub2video_get_blank_frame(InputFilterPriv *ifp)
{
@@ -574,6 +648,59 @@ static int ifilter_has_all_input_formats(FilterGraph *fg)
return 1;
}
+static void *filter_thread(void *arg);
+
+// start the filtering thread once all inputs and outputs are bound
+static int fg_thread_try_start(FilterGraphPriv *fgp)
+{
+ FilterGraph *fg = &fgp->fg;
+ ObjPool *op;
+ int ret = 0;
+
+ if (fgp->nb_inputs_bound < fg->nb_inputs ||
+ fgp->nb_outputs_bound < fg->nb_outputs)
+ return 0;
+
+ op = objpool_alloc_frames();
+ if (!op)
+ return AVERROR(ENOMEM);
+
+ fgp->queue_in = tq_alloc(fg->nb_inputs + 1, 1, op, frame_move);
+ if (!fgp->queue_in) {
+ objpool_free(&op);
+ return AVERROR(ENOMEM);
+ }
+
+ // at least one output is mandatory
+ op = objpool_alloc_frames();
+ if (!op)
+ goto fail;
+
+ fgp->queue_out = tq_alloc(fg->nb_outputs + 1, 1, op, frame_move);
+ if (!fgp->queue_out) {
+ objpool_free(&op);
+ goto fail;
+ }
+
+ ret = pthread_create(&fgp->thread, NULL, filter_thread, fgp);
+ if (ret) {
+ ret = AVERROR(ret);
+ av_log(NULL, AV_LOG_ERROR, "pthread_create() for filtergraph %d failed: %s\n",
+ fg->index, av_err2str(ret));
+ goto fail;
+ }
+
+ return 0;
+fail:
+ if (ret >= 0)
+ ret = AVERROR(ENOMEM);
+
+ tq_free(&fgp->queue_in);
+ tq_free(&fgp->queue_out);
+
+ return ret;
+}
+
static char *describe_filter_link(FilterGraph *fg, AVFilterInOut *inout, int in)
{
AVFilterContext *ctx = inout->filter_ctx;
@@ -607,6 +734,7 @@ static OutputFilter *ofilter_alloc(FilterGraph *fg)
static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist)
{
InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
+ FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
int ret;
av_assert0(!ifp->ist);
@@ -624,7 +752,10 @@ static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist)
return AVERROR(ENOMEM);
}
- return 0;
+ fgp->nb_inputs_bound++;
+ av_assert0(fgp->nb_inputs_bound <= ifilter->graph->nb_inputs);
+
+ return fg_thread_try_start(fgp);
}
static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost)
@@ -756,24 +887,10 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost)
break;
}
- // if we have all input parameters and all outputs are bound,
- // the graph can now be configured
- if (ifilter_has_all_input_formats(fg)) {
- int ret;
+ fgp->nb_outputs_bound++;
+ av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs);
- for (int i = 0; i < fg->nb_outputs; i++)
- if (!fg->outputs[i]->ost)
- return 0;
-
- ret = configure_filtergraph(fg);
- if (ret < 0) {
- av_log(fg, AV_LOG_ERROR, "Error configuring filter graph: %s\n",
- av_err2str(ret));
- return ret;
- }
- }
-
- return 0;
+ return fg_thread_try_start(fgp);
}
static InputFilter *ifilter_alloc(FilterGraph *fg)
@@ -803,6 +920,34 @@ static InputFilter *ifilter_alloc(FilterGraph *fg)
return ifilter;
}
+static int fg_thread_stop(FilterGraphPriv *fgp)
+{
+ void *ret;
+
+ if (!fgp->queue_in)
+ return 0;
+
+ for (int i = 0; i <= fgp->fg.nb_inputs; i++) {
+ InputFilterPriv *ifp = i < fgp->fg.nb_inputs ?
+ ifp_from_ifilter(fgp->fg.inputs[i]) : NULL;
+
+ if (ifp)
+ ifp->eof = 1;
+
+ tq_send_finish(fgp->queue_in, i);
+ }
+
+ for (int i = 0; i <= fgp->fg.nb_outputs; i++)
+ tq_receive_finish(fgp->queue_out, i);
+
+ pthread_join(fgp->thread, &ret);
+
+ tq_free(&fgp->queue_in);
+ tq_free(&fgp->queue_out);
+
+ return (int)(intptr_t)ret;
+}
+
void fg_free(FilterGraph **pfg)
{
FilterGraph *fg = *pfg;
@@ -812,6 +957,8 @@ void fg_free(FilterGraph **pfg)
return;
fgp = fgp_from_fg(fg);
+ fg_thread_stop(fgp);
+
avfilter_graph_free(&fg->graph);
for (int j = 0; j < fg->nb_inputs; j++) {
InputFilter *ifilter = fg->inputs[j];
@@ -1614,7 +1761,7 @@ static int graph_is_meta(AVFilterGraph *graph)
return 1;
}
-static int configure_filtergraph(FilterGraph *fg)
+static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt)
{
FilterGraphPriv *fgp = fgp_from_fg(fg);
AVBufferRef *hw_device;
@@ -1738,7 +1885,7 @@ static int configure_filtergraph(FilterGraph *fg)
/* send the EOFs for the finished inputs */
for (i = 0; i < fg->nb_inputs; i++) {
InputFilterPriv *ifp = ifp_from_ifilter(fg->inputs[i]);
- if (ifp->eof) {
+ if (fgt->eof_in[i]) {
ret = av_buffersrc_add_frame(ifp->filter, NULL);
if (ret < 0)
goto fail;
@@ -1821,8 +1968,8 @@ int filtergraph_is_simple(const FilterGraph *fg)
return fgp->is_simple;
}
-void fg_send_command(FilterGraph *fg, double time, const char *target,
- const char *command, const char *arg, int all_filters)
+static void send_command(FilterGraph *fg, double time, const char *target,
+ const char *command, const char *arg, int all_filters)
{
int ret;
@@ -1845,6 +1992,29 @@ void fg_send_command(FilterGraph *fg, double time, const char *target,
}
}
+static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt)
+{
+ int nb_requests, nb_requests_max = 0;
+ int best_input = -1;
+
+ for (int i = 0; i < fg->nb_inputs; i++) {
+ InputFilter *ifilter = fg->inputs[i];
+ InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
+ InputStream *ist = ifp->ist;
+
+ if (input_files[ist->file_index]->eagain || fgt->eof_in[i])
+ continue;
+
+ nb_requests = av_buffersrc_get_nb_failed_requests(ifp->filter);
+ if (nb_requests > nb_requests_max) {
+ nb_requests_max = nb_requests;
+ best_input = i;
+ }
+ }
+
+ return best_input;
+}
+
static int choose_out_timebase(OutputFilterPriv *ofp, AVFrame *frame)
{
OutputFilter *ofilter = &ofp->ofilter;
@@ -2080,16 +2250,16 @@ finish:
fps->dropped_keyframe |= fps->last_dropped && (frame->flags & AV_FRAME_FLAG_KEY);
}
-static int fg_output_frame(OutputFilterPriv *ofp, AVFrame *frame)
+static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
+ AVFrame *frame, int buffer)
{
FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
- OutputStream *ost = ofp->ofilter.ost;
AVFrame *frame_prev = ofp->fps.last_frame;
enum AVMediaType type = ofp->ofilter.type;
- int64_t nb_frames = 1, nb_frames_prev = 0;
+ int64_t nb_frames = !!frame, nb_frames_prev = 0;
- if (type == AVMEDIA_TYPE_VIDEO)
+ if (type == AVMEDIA_TYPE_VIDEO && (frame || fgt->got_frame))
video_sync_process(ofp, frame, &nb_frames, &nb_frames_prev);
for (int64_t i = 0; i < nb_frames; i++) {
@@ -2128,10 +2298,31 @@ static int fg_output_frame(OutputFilterPriv *ofp, AVFrame *frame)
frame_out = frame;
}
- ret = enc_frame(ost, frame_out);
- av_frame_unref(frame_out);
- if (ret < 0)
- return ret;
+ if (buffer) {
+ AVFrame *f = av_frame_alloc();
+
+ if (!f) {
+ av_frame_unref(frame_out);
+ return AVERROR(ENOMEM);
+ }
+
+ av_frame_move_ref(f, frame_out);
+ f->opaque = (void*)(intptr_t)ofp->index;
+
+ ret = av_fifo_write(fgt->frame_queue_out, &f, 1);
+ if (ret < 0) {
+ av_frame_free(&f);
+ return AVERROR(ENOMEM);
+ }
+ } else {
+ // return the frame to the main thread
+ ret = tq_send(fgp->queue_out, ofp->index, frame_out);
+ if (ret < 0) {
+ av_frame_unref(frame_out);
+ fgt->eof_out[ofp->index] = 1;
+ return ret == AVERROR_EOF ? 0 : ret;
+ }
+ }
if (type == AVMEDIA_TYPE_VIDEO) {
ofp->fps.frame_number++;
@@ -2141,7 +2332,7 @@ static int fg_output_frame(OutputFilterPriv *ofp, AVFrame *frame)
frame->flags &= ~AV_FRAME_FLAG_KEY;
}
- ofp->got_frame = 1;
+ fgt->got_frame = 1;
}
if (frame && frame_prev) {
@@ -2149,23 +2340,27 @@ static int fg_output_frame(OutputFilterPriv *ofp, AVFrame *frame)
av_frame_move_ref(frame_prev, frame);
}
+ if (!frame) {
+ tq_send_finish(fgp->queue_out, ofp->index);
+ fgt->eof_out[ofp->index] = 1;
+ }
+
return 0;
}
-static int fg_output_step(OutputFilterPriv *ofp, int flush)
+static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
+ AVFrame *frame, int buffer)
{
FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
OutputStream *ost = ofp->ofilter.ost;
- AVFrame *frame = fgp->frame;
AVFilterContext *filter = ofp->filter;
FrameData *fd;
int ret;
ret = av_buffersink_get_frame_flags(filter, frame,
AV_BUFFERSINK_FLAG_NO_REQUEST);
- if (flush && ret == AVERROR_EOF && ofp->got_frame &&
- ost->type == AVMEDIA_TYPE_VIDEO) {
- ret = fg_output_frame(ofp, NULL);
+ if (ret == AVERROR_EOF && !buffer && !fgt->eof_out[ofp->index]) {
+ ret = fg_output_frame(ofp, fgt, NULL, buffer);
return (ret < 0) ? ret : 1;
} else if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
return 1;
@@ -2175,22 +2370,18 @@ static int fg_output_step(OutputFilterPriv *ofp, int flush)
av_err2str(ret));
return ret;
}
- if (ost->finished) {
+
+ if (fgt->eof_out[ofp->index]) {
av_frame_unref(frame);
return 0;
}
frame->time_base = av_buffersink_get_time_base(filter);
- if (frame->pts != AV_NOPTS_VALUE) {
- ost->filter->last_pts = av_rescale_q(frame->pts, frame->time_base,
- AV_TIME_BASE_Q);
-
- if (debug_ts)
- av_log(fgp, AV_LOG_INFO, "filter_raw -> pts:%s pts_time:%s time_base:%d/%d\n",
- av_ts2str(frame->pts), av_ts2timestr(frame->pts, &frame->time_base),
- frame->time_base.num, frame->time_base.den);
- }
+ if (debug_ts)
+ av_log(fgp, AV_LOG_INFO, "filter_raw -> pts:%s pts_time:%s time_base:%d/%d\n",
+ av_ts2str(frame->pts), av_ts2timestr(frame->pts, &frame->time_base),
+ frame->time_base.num, frame->time_base.den);
// Choose the output timebase the first time we get a frame.
if (!ofp->tb_out_locked) {
@@ -2223,7 +2414,7 @@ static int fg_output_step(OutputFilterPriv *ofp, int flush)
fd->frame_rate_filter = ofp->fps.framerate;
}
- ret = fg_output_frame(ofp, frame);
+ ret = fg_output_frame(ofp, fgt, frame, buffer);
av_frame_unref(frame);
if (ret < 0)
return ret;
@@ -2231,18 +2422,38 @@ static int fg_output_step(OutputFilterPriv *ofp, int flush)
return 0;
}
-int reap_filters(FilterGraph *fg, int flush)
+/* retrieve all frames available at filtergraph outputs and either send them to
+ * the main thread (buffer=0) or buffer them for later (buffer=1) */
+static int read_frames(FilterGraph *fg, FilterGraphThread *fgt,
+ AVFrame *frame, int buffer)
{
+ FilterGraphPriv *fgp = fgp_from_fg(fg);
+ int ret = 0;
+
if (!fg->graph)
return 0;
+ // process buffered frames
+ if (!buffer) {
+ AVFrame *f;
+
+ while (av_fifo_read(fgt->frame_queue_out, &f, 1) >= 0) {
+ int out_idx = (intptr_t)f->opaque;
+ f->opaque = NULL;
+ ret = tq_send(fgp->queue_out, out_idx, f);
+ av_frame_free(&f);
+ if (ret < 0 && ret != AVERROR_EOF)
+ return ret;
+ }
+ }
+
/* Reap all buffers present in the buffer sinks */
for (int i = 0; i < fg->nb_outputs; i++) {
OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
int ret = 0;
while (!ret) {
- ret = fg_output_step(ofp, flush);
+ ret = fg_output_step(ofp, fgt, frame, buffer);
if (ret < 0)
return ret;
}
@@ -2251,7 +2462,7 @@ int reap_filters(FilterGraph *fg, int flush)
return 0;
}
-void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
+static void sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
{
InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
int64_t pts2;
@@ -2274,11 +2485,17 @@ void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational t
sub2video_update(ifp, pts2 + 1, NULL);
}
-int ifilter_sub2video(InputFilter *ifilter, const AVFrame *frame)
+static int sub2video_frame(InputFilter *ifilter, const AVFrame *frame)
{
InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
int ret;
+ // heartbeat frame
+ if (frame && !frame->buf[0]) {
+ sub2video_heartbeat(ifilter, frame->pts, frame->time_base);
+ return 0;
+ }
+
if (ifilter->graph->graph) {
if (!frame) {
if (ifp->sub2video.end_pts < INT64_MAX)
@@ -2307,12 +2524,13 @@ int ifilter_sub2video(InputFilter *ifilter, const AVFrame *frame)
return 0;
}
-int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb)
+static int send_eof(FilterGraphThread *fgt, InputFilter *ifilter,
+ int64_t pts, AVRational tb)
{
InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
int ret;
- ifp->eof = 1;
+ fgt->eof_in[ifp->index] = 1;
if (ifp->filter) {
pts = av_rescale_q_rnd(pts, tb, ifp->time_base,
@@ -2336,7 +2554,7 @@ int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb)
return ret;
if (ifilter_has_all_input_formats(ifilter->graph)) {
- ret = configure_filtergraph(ifilter->graph);
+ ret = configure_filtergraph(ifilter->graph, fgt);
if (ret < 0) {
av_log(NULL, AV_LOG_ERROR, "Error initializing filters!\n");
return ret;
@@ -2355,10 +2573,10 @@ int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb)
return 0;
}
-int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference)
+static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
+ InputFilter *ifilter, AVFrame *frame)
{
InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
- FilterGraph *fg = ifilter->graph;
AVFrameSideData *sd;
int need_reinit, ret;
@@ -2398,10 +2616,13 @@ int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference)
/* (re)init the graph if possible, otherwise buffer the frame and return */
if (need_reinit || !fg->graph) {
+ AVFrame *tmp = av_frame_alloc();
+
+ if (!tmp)
+ return AVERROR(ENOMEM);
+
if (!ifilter_has_all_input_formats(fg)) {
- AVFrame *tmp = av_frame_clone(frame);
- if (!tmp)
- return AVERROR(ENOMEM);
+ av_frame_move_ref(tmp, frame);
ret = av_fifo_write(ifp->frame_queue, &tmp, 1);
if (ret < 0)
@@ -2410,27 +2631,18 @@ int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference)
return ret;
}
- ret = reap_filters(fg, 0);
- if (ret < 0 && ret != AVERROR_EOF) {
- av_log(fg, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret));
+ ret = fg->graph ? read_frames(fg, fgt, tmp, 1) : 0;
+ av_frame_free(&tmp);
+ if (ret < 0)
return ret;
- }
- ret = configure_filtergraph(fg);
+ ret = configure_filtergraph(fg, fgt);
if (ret < 0) {
av_log(fg, AV_LOG_ERROR, "Error reinitializing filters!\n");
return ret;
}
}
- if (keep_reference) {
- ret = av_frame_ref(ifp->frame, frame);
- if (ret < 0)
- return ret;
- } else
- av_frame_move_ref(ifp->frame, frame);
- frame = ifp->frame;
-
frame->pts = av_rescale_q(frame->pts, frame->time_base, ifp->time_base);
frame->duration = av_rescale_q(frame->duration, frame->time_base, ifp->time_base);
frame->time_base = ifp->time_base;
@@ -2452,20 +2664,30 @@ int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference)
return 0;
}
-int fg_transcode_step(FilterGraph *graph, InputStream **best_ist)
+static int msg_process(FilterGraphPriv *fgp, FilterGraphThread *fgt,
+ AVFrame *frame)
{
- FilterGraphPriv *fgp = fgp_from_fg(graph);
- int i, ret;
- int nb_requests, nb_requests_max = 0;
- InputStream *ist;
+ const enum FrameOpaque msg = (intptr_t)frame->opaque;
+ FilterGraph *fg = &fgp->fg;
+ int graph_eof = 0;
+ int ret;
- if (!graph->graph) {
- for (int i = 0; i < graph->nb_inputs; i++) {
- InputFilter *ifilter = graph->inputs[i];
+ frame->opaque = NULL;
+ av_assert0(msg > 0);
+ av_assert0(msg == FRAME_OPAQUE_SEND_COMMAND || !frame->buf[0]);
+
+ if (!fg->graph) {
+ // graph not configured yet, ignore all messages other than choosing
+ // the input to read from
+ if (msg != FRAME_OPAQUE_CHOOSE_INPUT)
+ goto done;
+
+ for (int i = 0; i < fg->nb_inputs; i++) {
+ InputFilter *ifilter = fg->inputs[i];
InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
- if (ifp->format < 0 && !ifp->eof) {
- *best_ist = ifp->ist;
- return 0;
+ if (ifp->format < 0 && !fgt->eof_in[i]) {
+ frame->opaque = (void*)(intptr_t)(i + 1);
+ goto done;
}
}
@@ -2476,16 +2698,310 @@ int fg_transcode_step(FilterGraph *graph, InputStream **best_ist)
return AVERROR_BUG;
}
- *best_ist = NULL;
- ret = avfilter_graph_request_oldest(graph->graph);
- if (ret >= 0)
- return reap_filters(graph, 0);
+ if (msg == FRAME_OPAQUE_SEND_COMMAND) {
+ FilterCommand *fc = (FilterCommand*)frame->buf[0]->data;
+ send_command(fg, fc->time, fc->target, fc->command, fc->arg, fc->all_filters);
+ av_frame_unref(frame);
+ goto done;
+ }
- if (ret == AVERROR_EOF) {
- reap_filters(graph, 1);
- for (int i = 0; i < graph->nb_outputs; i++) {
- OutputFilter *ofilter = graph->outputs[i];
- OutputFilterPriv *ofp = ofp_from_ofilter(ofilter);
+ if (msg == FRAME_OPAQUE_CHOOSE_INPUT) {
+ ret = avfilter_graph_request_oldest(fg->graph);
+
+ graph_eof = ret == AVERROR_EOF;
+
+ if (ret == AVERROR(EAGAIN)) {
+ frame->opaque = (void*)(intptr_t)(choose_input(fg, fgt) + 1);
+ goto done;
+ } else if (ret < 0 && !graph_eof)
+ return ret;
+ }
+
+ ret = read_frames(fg, fgt, frame, 0);
+ if (ret < 0) {
+ av_log(fg, AV_LOG_ERROR, "Error sending filtered frames for encoding\n");
+ return ret;
+ }
+
+ if (graph_eof)
+ return AVERROR_EOF;
+
+ // signal to the main thread that we are done processing the message
+done:
+ ret = tq_send(fgp->queue_out, fg->nb_outputs, frame);
+ if (ret < 0) {
+ if (ret != AVERROR_EOF)
+ av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n");
+ return ret;
+ }
+
+ return 0;
+}
+
+static void fg_thread_set_name(const FilterGraph *fg)
+{
+ char name[16];
+ if (filtergraph_is_simple(fg)) {
+ OutputStream *ost = fg->outputs[0]->ost;
+ snprintf(name, sizeof(name), "%cf#%d:%d",
+ av_get_media_type_string(ost->type)[0],
+ ost->file_index, ost->index);
+ } else {
+ snprintf(name, sizeof(name), "fc%d", fg->index);
+ }
+
+ ff_thread_setname(name);
+}
+
+static void fg_thread_uninit(FilterGraphThread *fgt)
+{
+ if (fgt->frame_queue_out) {
+ AVFrame *frame;
+ while (av_fifo_read(fgt->frame_queue_out, &frame, 1) >= 0)
+ av_frame_free(&frame);
+ av_fifo_freep2(&fgt->frame_queue_out);
+ }
+
+ av_frame_free(&fgt->frame);
+ av_freep(&fgt->eof_in);
+ av_freep(&fgt->eof_out);
+
+ memset(fgt, 0, sizeof(*fgt));
+}
+
+static int fg_thread_init(FilterGraphThread *fgt, const FilterGraph *fg)
+{
+ memset(fgt, 0, sizeof(*fgt));
+
+ fgt->frame = av_frame_alloc();
+ if (!fgt->frame)
+ goto fail;
+
+ fgt->eof_in = av_calloc(fg->nb_inputs, sizeof(*fgt->eof_in));
+ if (!fgt->eof_in)
+ goto fail;
+
+ fgt->eof_out = av_calloc(fg->nb_outputs, sizeof(*fgt->eof_out));
+ if (!fgt->eof_out)
+ goto fail;
+
+ fgt->frame_queue_out = av_fifo_alloc2(1, sizeof(AVFrame*), AV_FIFO_FLAG_AUTO_GROW);
+ if (!fgt->frame_queue_out)
+ goto fail;
+
+ return 0;
+
+fail:
+ fg_thread_uninit(fgt);
+ return AVERROR(ENOMEM);
+}
+
+static void *filter_thread(void *arg)
+{
+ FilterGraphPriv *fgp = arg;
+ FilterGraph *fg = &fgp->fg;
+
+ FilterGraphThread fgt;
+ int ret = 0, input_status = 0;
+
+ ret = fg_thread_init(&fgt, fg);
+ if (ret < 0)
+ goto finish;
+
+ fg_thread_set_name(fg);
+
+ // if we have all input parameters the graph can now be configured
+ if (ifilter_has_all_input_formats(fg)) {
+ ret = configure_filtergraph(fg, &fgt);
+ if (ret < 0) {
+ av_log(fg, AV_LOG_ERROR, "Error configuring filter graph: %s\n",
+ av_err2str(ret));
+ goto finish;
+ }
+ }
+
+ while (1) {
+ InputFilter *ifilter;
+ InputFilterPriv *ifp;
+ enum FrameOpaque o;
+ int input_idx, eof_frame;
+
+ input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
+ if (input_idx < 0 ||
+ (input_idx == fg->nb_inputs && input_status < 0)) {
+ av_log(fg, AV_LOG_VERBOSE, "Filtering thread received EOF\n");
+ break;
+ }
+
+ o = (intptr_t)fgt.frame->opaque;
+
+ // message on the control stream
+ if (input_idx == fg->nb_inputs) {
+ ret = msg_process(fgp, &fgt, fgt.frame);
+ if (ret < 0)
+ goto finish;
+
+ continue;
+ }
+
+ // we received an input frame or EOF
+ ifilter = fg->inputs[input_idx];
+ ifp = ifp_from_ifilter(ifilter);
+ eof_frame = input_status >= 0 && o == FRAME_OPAQUE_EOF;
+ if (ifp->type_src == AVMEDIA_TYPE_SUBTITLE) {
+ int hb_frame = input_status >= 0 && o == FRAME_OPAQUE_SUB_HEARTBEAT;
+ ret = sub2video_frame(ifilter, (fgt.frame->buf[0] || hb_frame) ? fgt.frame : NULL);
+ } else if (input_status >= 0 && fgt.frame->buf[0]) {
+ ret = send_frame(fg, &fgt, ifilter, fgt.frame);
+ } else {
+ int64_t pts = input_status >= 0 ? fgt.frame->pts : AV_NOPTS_VALUE;
+ AVRational tb = input_status >= 0 ? fgt.frame->time_base : (AVRational){ 1, 1 };
+ ret = send_eof(&fgt, ifilter, pts, tb);
+ }
+ av_frame_unref(fgt.frame);
+ if (ret < 0)
+ break;
+
+ if (eof_frame) {
+ // an EOF frame is immediately followed by sender closing
+ // the corresponding stream, so retrieve that event
+ input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
+ av_assert0(input_status == AVERROR_EOF && input_idx == ifp->index);
+ }
+
+ // signal to the main thread that we are done
+ ret = tq_send(fgp->queue_out, fg->nb_outputs, fgt.frame);
+ if (ret < 0) {
+ if (ret == AVERROR_EOF)
+ break;
+
+ av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n");
+ goto finish;
+ }
+ }
+
+finish:
+ // EOF is normal termination
+ if (ret == AVERROR_EOF)
+ ret = 0;
+
+ for (int i = 0; i <= fg->nb_inputs; i++)
+ tq_receive_finish(fgp->queue_in, i);
+ for (int i = 0; i <= fg->nb_outputs; i++)
+ tq_send_finish(fgp->queue_out, i);
+
+ fg_thread_uninit(&fgt);
+
+ av_log(fg, AV_LOG_VERBOSE, "Terminating filtering thread\n");
+
+ return (void*)(intptr_t)ret;
+}
+
+static int thread_send_frame(FilterGraphPriv *fgp, InputFilter *ifilter,
+ AVFrame *frame, enum FrameOpaque type)
+{
+ InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
+ int output_idx, ret;
+
+ if (ifp->eof) {
+ av_frame_unref(frame);
+ return AVERROR_EOF;
+ }
+
+ frame->opaque = (void*)(intptr_t)type;
+
+ ret = tq_send(fgp->queue_in, ifp->index, frame);
+ if (ret < 0) {
+ ifp->eof = 1;
+ av_frame_unref(frame);
+ return ret;
+ }
+
+ if (type == FRAME_OPAQUE_EOF)
+ tq_send_finish(fgp->queue_in, ifp->index);
+
+ // wait for the frame to be processed
+ ret = tq_receive(fgp->queue_out, &output_idx, frame);
+ av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
+
+ return ret;
+}
+
+int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference)
+{
+ FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
+ int ret;
+
+ if (keep_reference) {
+ ret = av_frame_ref(fgp->frame, frame);
+ if (ret < 0)
+ return ret;
+ } else
+ av_frame_move_ref(fgp->frame, frame);
+
+ return thread_send_frame(fgp, ifilter, fgp->frame, 0);
+}
+
+int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb)
+{
+ FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
+ int ret;
+
+ fgp->frame->pts = pts;
+ fgp->frame->time_base = tb;
+
+ ret = thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_EOF);
+
+ return ret == AVERROR_EOF ? 0 : ret;
+}
+
+void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
+{
+ FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
+
+ fgp->frame->pts = pts;
+ fgp->frame->time_base = tb;
+
+ thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_SUB_HEARTBEAT);
+}
+
+int fg_transcode_step(FilterGraph *graph, InputStream **best_ist)
+{
+ FilterGraphPriv *fgp = fgp_from_fg(graph);
+ int ret, got_frames = 0;
+
+ if (fgp->eof_in)
+ return AVERROR_EOF;
+
+ // signal to the filtering thread to return all frames it can
+ av_assert0(!fgp->frame->buf[0]);
+ fgp->frame->opaque = (void*)(intptr_t)(best_ist ?
+ FRAME_OPAQUE_CHOOSE_INPUT :
+ FRAME_OPAQUE_REAP_FILTERS);
+
+ ret = tq_send(fgp->queue_in, graph->nb_inputs, fgp->frame);
+ if (ret < 0) {
+ fgp->eof_in = 1;
+ goto finish;
+ }
+
+ while (1) {
+ OutputFilter *ofilter;
+ OutputFilterPriv *ofp;
+ OutputStream *ost;
+ int output_idx;
+
+ ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
+
+ // EOF on the whole queue or the control stream
+ if (output_idx < 0 ||
+ (ret < 0 && output_idx == graph->nb_outputs))
+ goto finish;
+
+ // EOF for a specific stream
+ if (ret < 0) {
+ ofilter = graph->outputs[output_idx];
+ ofp = ofp_from_ofilter(ofilter);
// we are finished and no frames were ever seen at this output,
// at least initialize the encoder with a dummy frame
@@ -2523,30 +3039,111 @@ int fg_transcode_step(FilterGraph *graph, InputStream **best_ist)
av_frame_unref(frame);
}
- close_output_stream(ofilter->ost);
- }
- return 0;
- }
- if (ret != AVERROR(EAGAIN))
- return ret;
-
- for (i = 0; i < graph->nb_inputs; i++) {
- InputFilter *ifilter = graph->inputs[i];
- InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
-
- ist = ifp->ist;
- if (input_files[ist->file_index]->eagain || ifp->eof)
+ close_output_stream(graph->outputs[output_idx]->ost);
continue;
- nb_requests = av_buffersrc_get_nb_failed_requests(ifp->filter);
- if (nb_requests > nb_requests_max) {
- nb_requests_max = nb_requests;
- *best_ist = ist;
}
+
+ // request was fully processed by the filtering thread,
+ // return the input stream to read from, if needed
+ if (output_idx == graph->nb_outputs) {
+ int input_idx = (intptr_t)fgp->frame->opaque - 1;
+ av_assert0(input_idx <= graph->nb_inputs);
+
+ if (best_ist) {
+ *best_ist = (input_idx >= 0 && input_idx < graph->nb_inputs) ?
+ ifp_from_ifilter(graph->inputs[input_idx])->ist : NULL;
+
+ if (input_idx < 0 && !got_frames) {
+ for (int i = 0; i < graph->nb_outputs; i++)
+ graph->outputs[i]->ost->unavailable = 1;
+ }
+ }
+ break;
+ }
+
+ // got a frame from the filtering thread, send it for encoding
+ ofilter = graph->outputs[output_idx];
+ ost = ofilter->ost;
+ ofp = ofp_from_ofilter(ofilter);
+
+ if (ost->finished) {
+ av_frame_unref(fgp->frame);
+ tq_receive_finish(fgp->queue_out, output_idx);
+ continue;
+ }
+
+ if (fgp->frame->pts != AV_NOPTS_VALUE) {
+ ofilter->last_pts = av_rescale_q(fgp->frame->pts,
+ fgp->frame->time_base,
+ AV_TIME_BASE_Q);
+ }
+
+ ret = enc_frame(ost, fgp->frame);
+ av_frame_unref(fgp->frame);
+ if (ret < 0)
+ goto finish;
+
+ ofp->got_frame = 1;
+ got_frames = 1;
}
- if (!*best_ist)
- for (i = 0; i < graph->nb_outputs; i++)
- graph->outputs[i]->ost->unavailable = 1;
+finish:
+ if (ret < 0) {
+ fgp->eof_in = 1;
+ for (int i = 0; i < graph->nb_outputs; i++)
+ close_output_stream(graph->outputs[i]->ost);
+ }
- return 0;
+ return ret;
+}
+
+int reap_filters(FilterGraph *fg, int flush)
+{
+ return fg_transcode_step(fg, NULL);
+}
+
+void fg_send_command(FilterGraph *fg, double time, const char *target,
+ const char *command, const char *arg, int all_filters)
+{
+ FilterGraphPriv *fgp = fgp_from_fg(fg);
+ AVBufferRef *buf;
+ FilterCommand *fc;
+ int output_idx, ret;
+
+ if (!fgp->queue_in)
+ return;
+
+ fc = av_mallocz(sizeof(*fc));
+ if (!fc)
+ return;
+
+ buf = av_buffer_create((uint8_t*)fc, sizeof(*fc), filter_command_free, NULL, 0);
+ if (!buf) {
+ av_freep(&fc);
+ return;
+ }
+
+ fc->target = av_strdup(target);
+ fc->command = av_strdup(command);
+ fc->arg = av_strdup(arg);
+ if (!fc->target || !fc->command || !fc->arg) {
+ av_buffer_unref(&buf);
+ return;
+ }
+
+ fc->time = time;
+ fc->all_filters = all_filters;
+
+ fgp->frame->buf[0] = buf;
+ fgp->frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SEND_COMMAND;
+
+ ret = tq_send(fgp->queue_in, fg->nb_inputs + 1, fgp->frame);
+ if (ret < 0) {
+ av_frame_unref(fgp->frame);
+ return;
+ }
+
+ // wait for the frame to be processed
+ ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
+ av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
}
--
2.42.0
More information about the ffmpeg-devel
mailing list