[FFmpeg-devel] [PATCH 2/2] fate/api: test threadmessage

Clément Bœsch u at pkh.me
Mon Nov 30 18:40:07 CET 2015


From: Clément Bœsch <clement at stupeflix.com>

---
 tests/api/Makefile                 |   1 +
 tests/api/api-threadmessage-test.c | 202 +++++++++++++++++++++++++++++++++++++
 tests/fate/api.mak                 |   6 ++
 3 files changed, 209 insertions(+)
 create mode 100644 tests/api/api-threadmessage-test.c

diff --git a/tests/api/Makefile b/tests/api/Makefile
index c48c34a..3556a9b 100644
--- a/tests/api/Makefile
+++ b/tests/api/Makefile
@@ -3,6 +3,7 @@ APITESTPROGS-$(call DEMDEC, H264, H264) += api-h264
 APITESTPROGS-yes += api-seek
 APITESTPROGS-yes += api-codec-param
 APITESTPROGS-$(call DEMDEC, H263, H263) += api-band
+APITESTPROGS-yes += api-threadmessage
 APITESTPROGS += $(APITESTPROGS-yes)
 
 APITESTOBJS  := $(APITESTOBJS:%=$(APITESTSDIR)%) $(APITESTPROGS:%=$(APITESTSDIR)/%-test.o)
diff --git a/tests/api/api-threadmessage-test.c b/tests/api/api-threadmessage-test.c
new file mode 100644
index 0000000..abf0154
--- /dev/null
+++ b/tests/api/api-threadmessage-test.c
@@ -0,0 +1,202 @@
+/*
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+/**
+ * Thread message API test
+ */
+
+#include <pthread.h>
+
+#include "libavutil/frame.h"
+#include "libavutil/avstring.h"
+#include "libavutil/threadmessage.h"
+
+struct tdata {
+    int id;                         /* worker index, used in log messages and frame tags */
+    pthread_t tid;                  /* thread handle filled in by pthread_create() */
+    int workload;                   /* iteration count of the worker's producing loop */
+    AVThreadMessageQueue *queue;    /* queue shared by all workers and the consumer */
+};
+
+static void free_frame(void *arg) /* callback given to av_thread_message_queue_alloc2() */
+{
+    AVFrame *frame = arg; /* NOTE(review): assumes arg is the AVFrame itself, not the queue slot - confirm */
+    av_frame_free(&frame);
+}
+
+/* Frame producing thread; arg is the worker's struct tdata, returns NULL.
+ * Sends frames tagged with the worker id and flushes the queue half way just
+ * to be a jerk, then flags the queue's receive side with its error (or EOF). */
+static void *worker_thread(void *arg)
+{
+    /* ret starts at 0: a workload of 0 or 1 never assigns it inside the loop */
+    int i, ret = 0;
+    struct tdata *td = arg;
+    AVFrame *frame = NULL; /* frame being built; NULL once handed to the queue */
+
+    av_log(NULL, AV_LOG_INFO, "worker #%d: workload=%d\n", td->id, td->workload);
+    for (i = 0; i < td->workload; i++) {
+        if (i == td->workload/2) {
+            av_log(NULL, AV_LOG_INFO, "worker #%d: flushing the queue\n", td->id);
+            av_thread_message_flush(td->queue);
+        } else {
+            char *val;
+            AVDictionary *meta = NULL;
+
+            frame = av_frame_alloc();
+            if (!frame) {
+                ret = AVERROR(ENOMEM);
+                break;
+            }
+
+            /* we add some metadata to identify the frames */
+            val = av_asprintf("frame from worker %d", td->id);
+            if (!val) {
+                ret = AVERROR(ENOMEM);
+                break;
+            }
+            ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL);
+            if (ret < 0)
+                break;
+            av_frame_set_metadata(frame, meta);
+
+            /* allocate a real frame in order to simulate "real" work */
+            frame->format = AV_PIX_FMT_RGBA;
+            frame->width  = 320;
+            frame->height = 240;
+            ret = av_frame_get_buffer(frame, 32);
+            if (ret < 0)
+                break;
+
+            /* push the frame in the common queue */
+            av_log(NULL, AV_LOG_INFO, "worker #%d: sending my work (%p), %d left\n",
+                   td->id, frame, td->workload - i - 1);
+            ret = av_thread_message_queue_send(td->queue, &frame, 0);
+            if (ret < 0)
+                break;
+            frame = NULL; /* sent: the consumer is now the one freeing it */
+        }
+    }
+    av_frame_free(&frame); /* only non-NULL when a step above failed */
+    av_log(NULL, AV_LOG_INFO, "worker #%d: my work is done here (%s)\n",
+           td->id, av_err2str(ret));
+    av_thread_message_queue_set_err_recv(td->queue, ret < 0 ? ret : AVERROR_EOF);
+    return NULL;
+}
+
+/* Receive up to n frames from q and log each one's "sig" tag ("?" if missing),
+ * then set q's send error. Returns 0 if n frames came in, else the recv error. */
+static int consume_queue(AVThreadMessageQueue *q, int n)
+{
+    int i, ret = 0;
+
+    for (i = 0; i < n; i++) {
+        AVFrame *frame;
+        AVDictionaryEntry *e;
+
+        ret = av_thread_message_queue_recv(q, &frame, 0);
+        if (ret < 0)
+            break;
+        e = av_dict_get(av_frame_get_metadata(frame), "sig", NULL, 0);
+        av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e ? e->value : "?", frame);
+        av_frame_free(&frame);
+    }
+    av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i);
+    av_thread_message_queue_set_err_send(q, ret < 0 ? ret : AVERROR_EOF);
+    return ret;
+}
+
+/* Spawn the workers on one shared queue, consume <stop_production> frames, then
+ * join the workers. Exit code is 1 on usage or runtime error, 0 otherwise. */
+int main(int ac, char **av)
+{
+    int i, ret = 0;
+    int nb_workers, max_queue_size, nb_started = 0;
+    int worker_min_production, worker_max_production, stop_production;
+    struct tdata *workers;
+    AVThreadMessageQueue *queue = NULL;
+
+    if (ac != 6) {
+        av_log(NULL, AV_LOG_ERROR, "%s <nb_workers> <max_queue_size> "
+               "<worker_min_production> <worker_max_production> <stop_production>\n", av[0]);
+        return 1;
+    }
+
+    nb_workers            = atoi(av[1]);
+    max_queue_size        = atoi(av[2]);
+    worker_min_production = atoi(av[3]);
+    worker_max_production = atoi(av[4]);
+    stop_production       = atoi(av[5]);
+
+    av_log(NULL, AV_LOG_INFO, "%d workers with producing range [%d;%d] on a queue of size %d, "
+           "will stop after %d is globally produced\n", nb_workers, worker_min_production,
+           worker_max_production, max_queue_size, stop_production);
+
+    workers = av_mallocz_array(nb_workers, sizeof(*workers));
+    if (!workers) {
+        ret = AVERROR(ENOMEM);
+        goto end;
+    }
+
+    ret = av_thread_message_queue_alloc2(&queue, max_queue_size, sizeof(AVFrame*), free_frame);
+    if (ret < 0)
+        goto end;
+
+    for (i = 0; i < nb_workers; i++) {
+        struct tdata *td = &workers[i];
+
+        td->id = i;
+        td->queue = queue;
+        td->workload = worker_max_production == worker_min_production ? worker_max_production
+                     : rand() % (worker_max_production - worker_min_production) + worker_min_production;
+
+        ret = pthread_create(&td->tid, NULL, worker_thread, td);
+        if (ret) {
+            ret = AVERROR(ret); /* pthread reports positive errno values */
+            av_log(NULL, AV_LOG_ERROR, "Unable to start worker thread: %s\n", av_err2str(ret));
+            av_thread_message_queue_set_err_send(queue, ret); /* stop running workers */
+            break;
+        }
+        nb_started++;
+    }
+
+    if (ret >= 0) {
+        av_log(NULL, AV_LOG_INFO, "All workers spawned, start consuming\n");
+        ret = consume_queue(queue, stop_production);
+    }
+
+    for (i = 0; i < nb_started; i++) { /* only the threads actually created */
+        const int err = pthread_join(workers[i].tid, NULL);
+        if (err) {
+            av_log(NULL, AV_LOG_ERROR, "Unable to join worker thread: %s\n", av_err2str(AVERROR(err)));
+            ret = AVERROR(err); /* report it, but keep joining the others */
+        }
+    }
+
+end:
+    av_thread_message_queue_free(&queue);
+    av_freep(&workers);
+
+    if (ret < 0 && ret != AVERROR_EOF) {
+        av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret));
+        return 1;
+    }
+    return 0;
+}
diff --git a/tests/fate/api.mak b/tests/fate/api.mak
index 325f64a..1c51dd3 100644
--- a/tests/fate/api.mak
+++ b/tests/fate/api.mak
@@ -28,6 +28,12 @@ FATE_API_SAMPLES_LIBAVFORMAT-yes += fate-api-jpeg-codec-param
 fate-api-jpeg-codec-param: $(APITESTSDIR)/api-codec-param-test$(EXESUF)
 fate-api-jpeg-codec-param: CMD = run $(APITESTSDIR)/api-codec-param-test $(TARGET_SAMPLES)/exif/image_small.jpg
 
+FATE_API-$(CONFIG_AVUTIL) += fate-api-threadmessage
+fate-api-threadmessage: $(APITESTSDIR)/api-threadmessage-test$(EXESUF)
+fate-api-threadmessage: CMD = run $(APITESTSDIR)/api-threadmessage-test 5 10 50 100 300
+fate-api-threadmessage: CMP = null
+fate-api-threadmessage: REF = /dev/null
+
 FATE_API_SAMPLES-$(CONFIG_AVFORMAT) += $(FATE_API_SAMPLES_LIBAVFORMAT-yes)
 
 ifdef SAMPLES
-- 
2.6.2



More information about the ffmpeg-devel mailing list