[FFmpeg-devel] [PATCH v4 2/4] lavf/dashdec: Multithreaded DASH initialization
Lukas Fellechner
lukas.fellechner at gmx.net
Tue Sep 6 00:16:32 EEST 2022
This patch adds an "init_threads" option that specifies the maximum
number of threads to use for DASH initialization. When it is greater
than 1, multiple worker threads are spun up to open the streams in
parallel, which massively brings down init times.
---
libavformat/dashdec.c | 286 +++++++++++++++++++++++++++++++++++++++++-
1 file changed, 285 insertions(+), 1 deletion(-)
diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
index e82da45e43..0532e2c918 100644
--- a/libavformat/dashdec.c
+++ b/libavformat/dashdec.c
@@ -24,6 +24,8 @@
#include "libavutil/opt.h"
#include "libavutil/time.h"
#include "libavutil/parseutils.h"
+#include "libavutil/thread.h"
+#include "libavutil/slicethread.h"
#include "internal.h"
#include "avio_internal.h"
#include "dash.h"
@@ -152,6 +154,8 @@ typedef struct DASHContext {
int max_url_size;
char *cenc_decryption_key;
+ int init_threads;
+
/* Flags for init section*/
int is_init_section_common_video;
int is_init_section_common_audio;
@@ -2033,6 +2037,265 @@ static void move_metadata(AVStream *st, const char *key, char **value)
}
}
+#if HAVE_THREADS
+
+/**
+ * Description of one initialization job (one representation).
+ * The work pool is an array of these, indexed by slice-thread job number.
+ */
+typedef struct WorkPoolData {
+    AVFormatContext *ctx;               ///< demuxer context passed to the open helpers
+    struct representation *pls;         ///< representation handled by this job
+    struct representation *common_pls;  ///< representation providing the shared init section, or NULL
+    pthread_mutex_t *common_mutex;      ///< mutex guarding init_done of the group's provider
+    pthread_cond_t *common_condition;   ///< condition broadcast once the provider is done
+    int is_common;                      ///< non-zero if this job provides the shared init section
+    int is_started;                     ///< not referenced by the code visible here
+    int result;                         ///< job result: negative AVERROR on failure
+    int init_done;                      ///< provider only: set (under common_mutex) after
+                                        ///< update_init_section() returned, even on failure
+} WorkPoolData;
+
+/**
+ * Find the work pool entry that provides the shared init section
+ * for the given representation.
+ *
+ * @return the provider entry, or NULL if none of the nb_jobs entries matches
+ */
+static WorkPoolData *find_common_provider(WorkPoolData *work_pool, int nb_jobs,
+                                          const struct representation *common_pls)
+{
+    for (int i = 0; i < nb_jobs; i++) {
+        if (work_pool[i].is_common && work_pool[i].pls == common_pls)
+            return &work_pool[i];
+    }
+    return NULL;
+}
+
+/**
+ * Mark the provider's init section as finished and wake all dependents.
+ *
+ * @return 0 on success, negative AVERROR if a pthread call failed
+ */
+static int publish_common_init(WorkPoolData *provider)
+{
+    int err = pthread_mutex_lock(provider->common_mutex);
+    if (err)
+        return AVERROR(err);
+
+    provider->init_done = 1;
+    // broadcast: several dependents may be waiting on the same condition
+    err = pthread_cond_broadcast(provider->common_condition);
+    pthread_mutex_unlock(provider->common_mutex);
+
+    return AVERROR(err);
+}
+
+/**
+ * Block until the provider has published its init section.
+ *
+ * The predicate is checked under the mutex, so a broadcast that happened
+ * before this call is not lost and spurious wakeups are tolerated.
+ *
+ * @return 0 on success, negative AVERROR if a pthread call failed
+ */
+static int wait_for_common_init(WorkPoolData *data, WorkPoolData *provider)
+{
+    int err = pthread_mutex_lock(data->common_mutex);
+    if (err)
+        return AVERROR(err);
+
+    while (!err && !provider->init_done)
+        err = pthread_cond_wait(data->common_condition, data->common_mutex);
+    pthread_mutex_unlock(data->common_mutex);
+
+    return AVERROR(err);
+}
+
+/**
+ * Slice-thread worker: runs the first phase of opening one representation.
+ *
+ * A provider job runs update_init_section() and then wakes its dependents.
+ * A dependent job waits for its provider and copies the init section from it.
+ * Every job finally calls begin_open_demux_for_component().
+ * The outcome is stored in the job's WorkPoolData.result.
+ *
+ * NOTE(review): a dependent blocks its thread until the provider job has run;
+ * confirm that the slice-thread scheduling cannot starve a provider when
+ * fewer threads than jobs are used.
+ *
+ * @param priv       WorkPoolData array with nb_jobs entries
+ * @param jobnr      index of the entry to process
+ * @param threadnr   unused
+ * @param nb_jobs    number of entries in the work pool
+ * @param nb_threads unused
+ */
+static void thread_worker(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads)
+{
+    WorkPoolData *work_pool = priv;
+    WorkPoolData *data = work_pool + jobnr;
+    int ret = 0;
+
+    // if we are common section provider, init and signal
+    if (data->is_common) {
+        int publish_ret;
+
+        data->pls->parent = data->ctx;
+        ret = update_init_section(data->pls);
+
+        // publish even on failure, otherwise dependents would wait forever
+        publish_ret = publish_common_init(data);
+        if (ret < 0)
+            goto end;
+
+        ret = publish_ret;
+        if (ret < 0)
+            goto end;
+    }
+
+    // if we depend on common section provider, wait for signal and copy
+    if (data->common_pls) {
+        WorkPoolData *provider = find_common_provider(work_pool, nb_jobs,
+                                                      data->common_pls);
+        if (!provider) {
+            ret = AVERROR(EINVAL);
+            goto end;
+        }
+
+        ret = wait_for_common_init(data, provider);
+        if (ret < 0)
+            goto end;
+
+        // the provider failed to produce an init section
+        if (!data->common_pls->init_sec_buf) {
+            ret = AVERROR(EFAULT);
+            goto end;
+        }
+
+        ret = copy_init_section(data->pls, data->common_pls);
+        if (ret < 0)
+            goto end;
+    }
+
+    ret = begin_open_demux_for_component(data->ctx, data->pls);
+
+end:
+    data->result = ret;
+}
+
+/**
+ * Fill the work pool entries for one group of representations.
+ *
+ * Entries are written starting at work_pool[*stream_index]; each
+ * representation gets the current *stream_index as its stream_index, and
+ * *stream_index is advanced by one per representation.
+ *
+ * When is_init_section_common is set, the first representation of the group
+ * is flagged as provider of the shared init section and all others reference
+ * it through common_pls.
+ *
+ * @param ctx                    context stored in every entry
+ * @param stream_index           in/out: next free stream index / pool slot
+ * @param streams                representations of this group
+ * @param num_streams            number of representations in streams
+ * @param is_init_section_common non-zero if the group shares one init section
+ * @param work_pool              start of the whole work pool array
+ * @param common_mutex           mutex stored in every entry of the group
+ * @param common_condition       condition stored in every entry of the group
+ */
+static void create_work_pool_data(AVFormatContext *ctx, int *stream_index,
+    struct representation **streams, int num_streams, int is_init_section_common,
+    WorkPoolData *work_pool, pthread_mutex_t* common_mutex,
+    pthread_cond_t* common_condition)
+{
+    WorkPoolData *entry = &work_pool[*stream_index];
+    int idx = 0;
+
+    while (idx < num_streams) {
+        struct representation *rep = streams[idx];
+
+        entry->ctx              = ctx;
+        entry->pls              = rep;
+        rep->stream_index       = *stream_index;
+        entry->common_condition = common_condition;
+        entry->common_mutex     = common_mutex;
+        entry->result           = -1; // stays negative until the worker ran
+
+        if (is_init_section_common && idx == 0)
+            entry->is_common = 1;
+        else if (is_init_section_common)
+            entry->common_pls = streams[0];
+
+        entry++;
+        (*stream_index)++;
+        idx++;
+    }
+}
+
+/**
+ * Allocate and initialize a mutex with default attributes.
+ *
+ * @return the new mutex, or NULL if allocation or initialization failed;
+ *         release it with free_mutex()
+ */
+static pthread_mutex_t *create_mutex(void)
+{
+    pthread_mutex_t *mutex = av_malloc(sizeof(*mutex));
+    if (!mutex)
+        return NULL;
+
+    if (pthread_mutex_init(mutex, NULL)) {
+        av_free(mutex);
+        return NULL;
+    }
+
+    return mutex;
+}
+
+/**
+ * Destroy and free a mutex created by create_mutex() and reset the pointer.
+ *
+ * @param mutex address of the mutex pointer; *mutex may be NULL
+ * @return 0 on success, negative AVERROR if pthread_mutex_destroy() failed
+ */
+static int free_mutex(pthread_mutex_t **mutex)
+{
+    int ret = 0;
+    if (*mutex) {
+        // pthread functions return a positive errno value; callers test for < 0
+        ret = AVERROR(pthread_mutex_destroy(*mutex));
+        av_free(*mutex);
+        *mutex = NULL;
+    }
+    return ret;
+}
+
+/**
+ * Allocate and initialize a condition variable with default attributes.
+ *
+ * @return the new condition variable, or NULL if allocation or
+ *         initialization failed; release it with free_cond()
+ */
+static pthread_cond_t *create_cond(void)
+{
+    pthread_cond_t *cond = av_malloc(sizeof(*cond));
+    if (!cond)
+        return NULL;
+
+    if (pthread_cond_init(cond, NULL)) {
+        av_free(cond);
+        return NULL;
+    }
+
+    return cond;
+}
+
+/**
+ * Destroy and free a condition variable created by create_cond() and
+ * reset the pointer.
+ *
+ * @param cond address of the condition pointer; *cond may be NULL
+ * @return 0 on success, negative AVERROR if pthread_cond_destroy() failed
+ */
+static int free_cond(pthread_cond_t **cond)
+{
+    int ret = 0;
+    if (*cond) {
+        // pthread functions return a positive errno value; callers test for < 0
+        ret = AVERROR(pthread_cond_destroy(*cond));
+        av_free(*cond);
+        *cond = NULL;
+    }
+    return ret;
+}
+
+/**
+ * Open all video, audio and subtitle representations using worker threads.
+ *
+ * The function runs in three phases: it sets up the work pool and the
+ * synchronization objects, runs thread_worker() once per stream, and then
+ * calls end_open_demux_for_component() for each stream on the calling thread.
+ * Cleanup always runs; the first failing phase determines the return value.
+ *
+ * @param s        demuxer context; s->priv_data is the DASHContext
+ * @param nstreams total number of representations (videos + audios + subtitles)
+ * @param threads  number of threads passed to the slice-thread pool
+ * @return 0 on success, negative AVERROR on failure
+ */
+static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads)
+{
+    DASHContext *c = s->priv_data;
+    AVSliceThread *slice_thread = NULL;
+    WorkPoolData *work_pool;
+    pthread_mutex_t *common_video_mutex    = NULL;
+    pthread_cond_t  *common_video_cond     = NULL;
+    pthread_mutex_t *common_audio_mutex    = NULL;
+    pthread_cond_t  *common_audio_cond     = NULL;
+    pthread_mutex_t *common_subtitle_mutex = NULL;
+    pthread_cond_t  *common_subtitle_cond  = NULL;
+    int stream_index = 0;
+    int ret;
+
+    // we need to cleanup even in case of errors,
+    // so we need to store results of run and cleanup phase
+    int init_result    = 0;
+    int run_result     = 0;
+    int cleanup_result = 0;
+
+    // alloc data (zeroed, so all flags and pointers start out cleared)
+    work_pool = av_mallocz(sizeof(*work_pool) * nstreams);
+    if (!work_pool)
+        return AVERROR(ENOMEM);
+
+    // returns the thread count on success and a negative AVERROR on failure
+    ret = avpriv_slicethread_create(&slice_thread, work_pool, thread_worker,
+                                    NULL, threads);
+    if (ret < 0) {
+        av_free(work_pool);
+        return ret;
+    }
+
+    // alloc mutex and conditions
+    // NOTE(review): c->init_mutex is a DASHContext member declared outside
+    // this excerpt
+    c->init_mutex = create_mutex();
+
+    common_video_mutex    = create_mutex();
+    common_video_cond     = create_cond();
+
+    common_audio_mutex    = create_mutex();
+    common_audio_cond     = create_cond();
+
+    common_subtitle_mutex = create_mutex();
+    common_subtitle_cond  = create_cond();
+
+    if (!c->init_mutex ||
+        !common_video_mutex    || !common_video_cond ||
+        !common_audio_mutex    || !common_audio_cond ||
+        !common_subtitle_mutex || !common_subtitle_cond) {
+        init_result = AVERROR(ENOMEM);
+        goto cleanup;
+    }
+
+    // set work pool data
+    create_work_pool_data(s, &stream_index, c->videos, c->n_videos,
+                          c->is_init_section_common_video, work_pool,
+                          common_video_mutex, common_video_cond);
+
+    create_work_pool_data(s, &stream_index, c->audios, c->n_audios,
+                          c->is_init_section_common_audio, work_pool,
+                          common_audio_mutex, common_audio_cond);
+
+    create_work_pool_data(s, &stream_index, c->subtitles, c->n_subtitles,
+                          c->is_init_section_common_subtitle, work_pool,
+                          common_subtitle_mutex, common_subtitle_cond);
+
+    // run threads
+    avpriv_slicethread_execute(slice_thread, nstreams, 0);
+
+    // finalize streams and collect results; stop at the first failure
+    for (int i = 0; i < nstreams; i++) {
+        WorkPoolData *current_data = &work_pool[i];
+
+        if (current_data->result < 0) {
+            // thread ran into error: collect result and break
+            run_result = current_data->result;
+            break;
+        }
+
+        // thread success: create streams on AVFormatContext
+        ret = end_open_demux_for_component(s, current_data->pls);
+        if (ret < 0) {
+            run_result = ret;
+            break;
+        }
+    }
+
+cleanup:
+    // cleanup mutex and conditions; the last failure is the one reported
+    ret = free_mutex(&c->init_mutex);
+    if (ret < 0)
+        cleanup_result = ret;
+
+    ret = free_mutex(&common_video_mutex);
+    if (ret < 0)
+        cleanup_result = ret;
+
+    ret = free_cond(&common_video_cond);
+    if (ret < 0)
+        cleanup_result = ret;
+
+    ret = free_mutex(&common_audio_mutex);
+    if (ret < 0)
+        cleanup_result = ret;
+
+    ret = free_cond(&common_audio_cond);
+    if (ret < 0)
+        cleanup_result = ret;
+
+    ret = free_mutex(&common_subtitle_mutex);
+    if (ret < 0)
+        cleanup_result = ret;
+
+    ret = free_cond(&common_subtitle_cond);
+    if (ret < 0)
+        cleanup_result = ret;
+
+    // cleanup threads first: they hold a pointer to the work pool
+    avpriv_slicethread_free(&slice_thread);
+    av_free(work_pool);
+
+    // return results if errors have occurred in one of the phases
+    if (init_result < 0)
+        return init_result;
+
+    if (run_result < 0)
+        return run_result;
+
+    if (cleanup_result < 0)
+        return cleanup_result;
+
+    return 0;
+}
+
+#endif
+
static int dash_read_header(AVFormatContext *s)
{
DASHContext *c = s->priv_data;
@@ -2067,6 +2330,23 @@ static int dash_read_header(AVFormatContext *s)
if (c->n_subtitles)
c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
+ int threads = 1;
+ int nstreams = c->n_videos + c->n_audios + c->n_subtitles;
+
+#if HAVE_THREADS
+ threads = FFMIN(nstreams, c->init_threads);
+#endif
+
+ if (threads > 1)
+ {
+#if HAVE_THREADS
+ ret = init_streams_multithreaded(s, nstreams, threads);
+ if (ret < 0)
+ return ret;
+#endif
+ }
+ else
+ {
/* Open the demuxer for video and audio components if available */
for (i = 0; i < c->n_videos; i++) {
rep = c->videos[i];
@@ -2115,6 +2395,7 @@ static int dash_read_header(AVFormatContext *s)
if (!stream_index)
return AVERROR_INVALIDDATA;
+ }
/* Create a program */
program = av_new_program(s, 0);
@@ -2366,7 +2647,10 @@ static const AVOption dash_options[] = {
OFFSET(allowed_extensions), AV_OPT_TYPE_STRING,
{.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"},
INT_MIN, INT_MAX, FLAGS},
- { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
+ { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key),
+ AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
+ { "init_threads", "Number of threads to use for initializing the DASH stream",
+ OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 1}, 1, INT_MAX, FLAGS },
{NULL}
};
--
2.28.0.windows.1
More information about the ffmpeg-devel
mailing list