[FFmpeg-devel] [PATCH] lavd: implement threaded NewTek NDI output

Maksym Veremeyenko verem at m1stereo.tv
Tue Sep 5 10:09:54 EEST 2017


04.09.2017 17:10, Maksym Veremeyenko пише:
> Hi,
> 
> attached patch implemented threaded NDI output - separate output thread 
> for each stream. it makes audio preview in my case more smooth.

updated patch allows running audio/video threads separately

please review

-- 
Maksym Veremeyenko

-------------- next part --------------
From 67ee166a5504d72294638125b210f58c88973a54 Mon Sep 17 00:00:00 2001
From: Maksym Veremeyenko <verem at m1.tv>
Date: Tue, 5 Sep 2017 03:04:11 -0400
Subject: [PATCH 1/3] lavd: implement threaded NewTek NDI output

---
 configure                       |   2 +-
 doc/outdevs.texi                |   8 ++
 libavdevice/libndi_newtek_enc.c | 171 +++++++++++++++++++++++++++++++++++++++-
 3 files changed, 178 insertions(+), 3 deletions(-)

diff --git a/configure b/configure
index d582705..7626901 100755
--- a/configure
+++ b/configure
@@ -3019,7 +3019,7 @@ decklink_outdev_deps="decklink threads"
 decklink_outdev_extralibs="-lstdc++"
 libndi_newtek_indev_deps="libndi_newtek"
 libndi_newtek_indev_extralibs="-lndi"
-libndi_newtek_outdev_deps="libndi_newtek"
+libndi_newtek_outdev_deps="libndi_newtek threads"
 libndi_newtek_outdev_extralibs="-lndi"
 dshow_indev_deps="IBaseFilter"
 dshow_indev_extralibs="-lpsapi -lole32 -lstrmiids -luuid -loleaut32 -lshlwapi"
diff --git a/doc/outdevs.texi b/doc/outdevs.texi
index 0012b0f..595864b 100644
--- a/doc/outdevs.texi
+++ b/doc/outdevs.texi
@@ -213,6 +213,14 @@ Defaults to @option{false}.
 These specify whether audio "clock" themselves.
 Defaults to @option{false}.
 
+ at item video_queue
+Enable video packets output in separate thread. Specify video packets queue length.
+Defaults to @option{0}.
+
+ at item audio_queue
+Enable audio packets output in separate thread. Specify audio packets queue length.
+Defaults to @option{0}.
+
 @end table
 
 @subsection Examples
diff --git a/libavdevice/libndi_newtek_enc.c b/libavdevice/libndi_newtek_enc.c
index 6ca6f41..f8af851 100644
--- a/libavdevice/libndi_newtek_enc.c
+++ b/libavdevice/libndi_newtek_enc.c
@@ -23,9 +23,16 @@
 #include "libavformat/internal.h"
 #include "libavutil/opt.h"
 #include "libavutil/imgutils.h"
+#include "libavutil/threadmessage.h"
 
 #include "libndi_newtek_common.h"
 
+#include <pthread.h>
+
+#define THREAD_VIDEO    0
+#define THREAD_AUDIO    1
+#define THREAD_LAST     2
+
 struct NDIContext {
     const AVClass *cclass;
 
@@ -37,12 +44,104 @@ struct NDIContext {
     NDIlib_audio_frame_interleaved_16s_t *audio;
     NDIlib_send_instance_t ndi_send;
     AVFrame *last_avframe;
+
+    /* threaded operations */
+    AVFormatContext *avctx;
+    struct
+    {
+        int length;
+        AVThreadMessageQueue *queue;
+        pthread_t thread;
+    } threads[THREAD_LAST];
 };
 
+static int ndi_write_video_packet(AVFormatContext *avctx, AVStream *st, AVPacket *pkt);
+static int ndi_write_audio_packet(AVFormatContext *avctx, AVStream *st, AVPacket *pkt);
+
+static void* ndi_thread_audio(void* p)
+{
+    int ret;
+    AVPacket pkt;
+    struct NDIContext *ctx = p;
+
+    av_log(ctx->avctx, AV_LOG_DEBUG, "%s: entering\n", __func__);
+
+    while (1) {
+        ret = av_thread_message_queue_recv(ctx->threads[THREAD_AUDIO].queue, &pkt, 0);
+        if (ret < 0) {
+            if (ret != AVERROR_EOF)
+                av_log(ctx->avctx, AV_LOG_ERROR, "Failed av_thread_message_queue_recv of audio queue.\n");
+            break;
+        }
+
+        ret = ndi_write_audio_packet(ctx->avctx, ctx->avctx->streams[pkt.stream_index], &pkt);
+        av_packet_unref(&pkt);
+        if (ret) {
+            av_log(ctx->avctx, AV_LOG_ERROR, "Failed ndi_write_audio_packet.\n");
+            break;
+        }
+    }
+
+    av_log(ctx->avctx, AV_LOG_DEBUG, "%s: exiting, ret=%d\n", __func__, ret);
+
+    return NULL;
+}
+
+static void* ndi_thread_video(void* p)
+{
+    int ret;
+    AVPacket pkt;
+    struct NDIContext *ctx = p;
+
+    av_log(ctx->avctx, AV_LOG_DEBUG, "%s: entering\n", __func__);
+
+    while (1) {
+        ret = av_thread_message_queue_recv(ctx->threads[THREAD_VIDEO].queue, &pkt, 0);
+        if (ret < 0) {
+            if (ret != AVERROR_EOF)
+                av_log(ctx->avctx, AV_LOG_ERROR, "Failed av_thread_message_queue_recv of video queue.\n");
+            break;
+        }
+
+        ret = ndi_write_video_packet(ctx->avctx, ctx->avctx->streams[pkt.stream_index], &pkt);
+        av_packet_unref(&pkt);
+        if (ret) {
+            av_log(ctx->avctx, AV_LOG_ERROR, "Failed ndi_write_video_packet.\n");
+            break;
+        }
+    }
+
+    av_log(ctx->avctx, AV_LOG_DEBUG, "%s: exiting, ret=%d\n", __func__, ret);
+
+    return NULL;
+}
+
 static int ndi_write_trailer(AVFormatContext *avctx)
 {
+    int i;
     struct NDIContext *ctx = avctx->priv_data;
 
+    for (i = 0; i < THREAD_LAST; i++)
+    {
+        AVPacket pkt;
+
+        if (!ctx->threads[i].queue)
+            continue;
+
+        av_log(ctx->avctx, AV_LOG_DEBUG, "%s: freeing queue %d\n", __func__, i);
+
+        av_thread_message_queue_set_err_recv(ctx->threads[i].queue, AVERROR_EOF);
+
+        pthread_join(ctx->threads[i].thread, NULL);
+
+        while (av_thread_message_queue_recv(ctx->threads[i].queue, &pkt, 0) >= 0) {
+            av_log(ctx->avctx, AV_LOG_DEBUG, "%s: freeing packet queue %d\n", __func__, i);
+            av_packet_unref(&pkt);
+        }
+
+        av_thread_message_queue_free(&ctx->threads[i].queue);
+    }
+
     if (ctx->ndi_send) {
         NDIlib_send_destroy(ctx->ndi_send);
         av_frame_free(&ctx->last_avframe);
@@ -119,18 +218,47 @@ static int ndi_write_audio_packet(AVFormatContext *avctx, AVStream *st, AVPacket
 
 static int ndi_write_packet(AVFormatContext *avctx, AVPacket *pkt)
 {
+    struct NDIContext *ctx = avctx->priv_data;
     AVStream *st = avctx->streams[pkt->stream_index];
+    AVThreadMessageQueue *queue;
 
     if      (st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO)
-        return ndi_write_video_packet(avctx, st, pkt);
+        queue = ctx->threads[THREAD_VIDEO].queue;
     else if (st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO)
-        return ndi_write_audio_packet(avctx, st, pkt);
+        queue = ctx->threads[THREAD_AUDIO].queue;
+
+    if (queue) {
+
+        int ret;
+        AVPacket enq;
+
+        av_init_packet(&enq);
+
+        ret = av_packet_ref(&enq, pkt);
+        if (ret)
+            return ret;
+
+        ret = av_thread_message_queue_send(queue, &enq, 0);
+        if (ret) {
+            av_packet_unref(&enq);
+            return ret;
+        }
+
+        return 0;
+
+    } else {
+        if      (st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO)
+            return ndi_write_video_packet(avctx, st, pkt);
+        else if (st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO)
+            return ndi_write_audio_packet(avctx, st, pkt);
+    }
 
     return AVERROR_BUG;
 }
 
 static int ndi_setup_audio(AVFormatContext *avctx, AVStream *st)
 {
+    int ret;
     struct NDIContext *ctx = avctx->priv_data;
     AVCodecParameters *c = st->codecpar;
 
@@ -149,11 +277,30 @@ static int ndi_setup_audio(AVFormatContext *avctx, AVStream *st)
 
     avpriv_set_pts_info(st, 64, 1, NDI_TIME_BASE);
 
+    if (ctx->threads[THREAD_AUDIO].length) {
+        ret = av_thread_message_queue_alloc(&ctx->threads[THREAD_AUDIO].queue,
+            ctx->threads[THREAD_AUDIO].length, sizeof(AVPacket));
+        if (ret) {
+            av_log(avctx, AV_LOG_ERROR, "Failed to av_thread_message_queue_alloc!\n");
+            return ret;
+        }
+
+        ret = pthread_create(&ctx->threads[THREAD_AUDIO].thread, NULL, ndi_thread_audio, ctx);
+        if (ret) {
+            av_log(NULL, AV_LOG_ERROR, "Failed to pthread_create: %s\n", strerror(ret));
+            av_thread_message_queue_free(&ctx->threads[THREAD_AUDIO].queue);
+            return AVERROR(ret);
+        }
+
+        ctx->avctx = avctx;
+    }
+
     return 0;
 }
 
 static int ndi_setup_video(AVFormatContext *avctx, AVStream *st)
 {
+    int ret;
     struct NDIContext *ctx = avctx->priv_data;
     AVCodecParameters *c = st->codecpar;
 
@@ -225,6 +372,24 @@ static int ndi_setup_video(AVFormatContext *avctx, AVStream *st)
 
     avpriv_set_pts_info(st, 64, 1, NDI_TIME_BASE);
 
+    if (ctx->threads[THREAD_VIDEO].length) {
+        ret = av_thread_message_queue_alloc(&ctx->threads[THREAD_VIDEO].queue,
+            ctx->threads[THREAD_VIDEO].length, sizeof(AVPacket));
+        if (ret) {
+            av_log(avctx, AV_LOG_ERROR, "Failed to av_thread_message_queue_alloc!\n");
+            return ret;
+        }
+
+        ret = pthread_create(&ctx->threads[THREAD_VIDEO].thread, NULL, ndi_thread_video, ctx);
+        if (ret) {
+            av_log(NULL, AV_LOG_ERROR, "Failed to pthread_create: %s\n", strerror(ret));
+            av_thread_message_queue_free(&ctx->threads[THREAD_VIDEO].queue);
+            return AVERROR(ret);
+        }
+
+        ctx->avctx = avctx;
+    }
+
     return 0;
 }
 
@@ -273,6 +438,8 @@ static const AVOption options[] = {
     { "reference_level", "The audio reference level in dB"  , OFFSET(reference_level), AV_OPT_TYPE_INT, { .i64 = 0 }, -20, 20, AV_OPT_FLAG_ENCODING_PARAM | AV_OPT_FLAG_AUDIO_PARAM},
     { "clock_video", "These specify whether video 'clock' themselves"  , OFFSET(clock_video), AV_OPT_TYPE_BOOL, { .i64 = 0 }, 0, 1, AV_OPT_FLAG_ENCODING_PARAM | AV_OPT_FLAG_VIDEO_PARAM },
     { "clock_audio", "These specify whether audio 'clock' themselves"  , OFFSET(clock_audio), AV_OPT_TYPE_BOOL, { .i64 = 0 }, 0, 1, AV_OPT_FLAG_ENCODING_PARAM | AV_OPT_FLAG_AUDIO_PARAM },
+    { "video_queue", "Video queue length", OFFSET(threads[THREAD_VIDEO].length), AV_OPT_TYPE_INT, { .i64 = 0 }, 1, 128, AV_OPT_FLAG_ENCODING_PARAM | AV_OPT_FLAG_VIDEO_PARAM},
+    { "audio_queue", "Audio queue length", OFFSET(threads[THREAD_AUDIO].length), AV_OPT_TYPE_INT, { .i64 = 0 }, 1, 128, AV_OPT_FLAG_ENCODING_PARAM | AV_OPT_FLAG_AUDIO_PARAM},
     { NULL },
 };
 
-- 
1.8.3.1



More information about the ffmpeg-devel mailing list