[FFmpeg-devel] [PATCH] avfilter/asubprocess: add audio subprocess filter

Stefan Westerfeld stefan at space.twc.de
Thu Jun 27 19:22:22 EEST 2024


This filter sends the audio stream to an external command as a WAV file and
reads back the output of the subprocess, which must also be a WAV file with
the same sample rate, channel count and length.

=== Examples:

# filter mp3 audio file using sox to decrease volume:
ffmpeg -i in.mp3 -af "asubprocess=sox --ignore-length - -t wav - gain -10" out.mp3

# filter mp3 audio file using ffmpeg to decrease volume:
ffmpeg -i in.mp3 -af "asubprocess=ffmpeg -v quiet -i - -af volume=0.2 -f wav -" out.mp3

=== Why?

The main reason I implemented this is to be able to use the audiowmark
open source audio watermarker directly from ffmpeg. Integrating it in
this loosely coupled way will allow binaries of ffmpeg to ship with the
asubprocess filter by default, while audiowmark can evolve at its own
pace, and users can combine any version of audiowmark with ffmpeg.

One use case is described here:

https://github.com/swesterfeld/audiowmark/issues/56

# typical use case: watermark an mp3 file while preserving its tags
ffmpeg -i input.mp3 -af "asubprocess=audiowmark add --format wav-pipe - - f0" output.mp3

Once this is integrated, other external programs could use the same
interface to add audio filters to ffmpeg without changing ffmpeg itself.

Signed-off-by: Stefan Westerfeld <stefan at space.twc.de>
---
 libavfilter/Makefile         |   1 +
 libavfilter/af_asubprocess.c | 754 +++++++++++++++++++++++++++++++++++
 libavfilter/allfilters.c     |   1 +
 3 files changed, 756 insertions(+)
 create mode 100644 libavfilter/af_asubprocess.c

diff --git a/libavfilter/Makefile b/libavfilter/Makefile
index 5992fd161f..e43382a65a 100644
--- a/libavfilter/Makefile
+++ b/libavfilter/Makefile
@@ -110,6 +110,7 @@ OBJS-$(CONFIG_ASR_FILTER)                    += af_asr.o
 OBJS-$(CONFIG_ASTATS_FILTER)                 += af_astats.o
 OBJS-$(CONFIG_ASTREAMSELECT_FILTER)          += f_streamselect.o framesync.o
 OBJS-$(CONFIG_ASUBBOOST_FILTER)              += af_asubboost.o
+OBJS-$(CONFIG_ASUBPROCESS_FILTER)            += af_asubprocess.o
 OBJS-$(CONFIG_ASUBCUT_FILTER)                += af_asupercut.o
 OBJS-$(CONFIG_ASUPERCUT_FILTER)              += af_asupercut.o
 OBJS-$(CONFIG_ASUPERPASS_FILTER)             += af_asupercut.o
diff --git a/libavfilter/af_asubprocess.c b/libavfilter/af_asubprocess.c
new file mode 100644
index 0000000000..a5bc836911
--- /dev/null
+++ b/libavfilter/af_asubprocess.c
@@ -0,0 +1,754 @@
+/*
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "libavutil/opt.h"
+#include "libavutil/mem.h"
+#include "libavutil/intreadwrite.h"
+#include "libavutil/avassert.h"
+#include "libavformat/avio.h"
+#include "libavformat/avio_internal.h"
+
+#include "audio.h"
+#include "avfilter.h"
+#include "filters.h"
+#include "internal.h"
+#include "framequeue.h"
+
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <sys/stat.h>
+
+#include <stdio.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <string.h>
+
+#define BUFFER_SIZE             (1024 * 128)        /* payload bytes per SPB chunk; parenthesized so the macro is safe inside larger expressions */
+#define MAX_INPUT_BEFORE_DATA   (16 * 1024 * 1024)  /* max number of bytes before data chunk */
+
+typedef struct SPB /* one fixed-size chunk of subprocess output; chunks are chained into singly linked lists */
+{
+    struct SPB *next; /* following chunk in the list, or NULL */
+    int         offset; /* bytes of data[] already consumed by sp_read() */
+    int         count; /* bytes of data[] filled so far by read() in sp_write() */
+    char        data[BUFFER_SIZE];
+} SPB;
+
+typedef struct SP /* state of the child process and the two pipes connected to it */
+{
+    int     input_pipe; /* write end of the pipe feeding the child's stdin; sp_write() sets it to -1 once closed */
+    int     output_pipe; /* read end of the pipe carrying the child's stdout */
+    SPB    *out_buffers; /* list of chunks holding child output that has not been consumed yet */
+    SPB    *unused_buffers; /* drained chunks kept for reuse by spb_new() */
+    pid_t   pid; /* child pid as returned by fork() */
+    size_t  input_count; /* total number of bytes read from the child so far */
+    size_t  can_read; /* bytes currently buffered and available to sp_read() */
+    bool    done; /* set by sp_write() after EOF on output_pipe and waitpid() */
+} SP;
+
+enum { /* parser states for the wav stream coming back from the subprocess */
+    STATE_START = 0, /* subprocess has not been started yet */
+    STATE_EXPECT_RIFF, /* waiting for the 12 byte "RIFF"/"RF64" ... "WAVE" header */
+    STATE_EXPECT_CHUNK, /* waiting for an 8 byte chunk header (id + size) */
+    STATE_SKIP_CHUNK, /* discarding the remaining chunk_size bytes of the current chunk */
+    STATE_IN_DATA_CHUNK, /* 'data' chunk reached: everything that follows is sample data */
+    STATE_IN_FMT_CHUNK, /* waiting for the first 16 bytes of the 'fmt ' chunk */
+    STATE_EXT_FMT /* waiting for the 24 extension bytes of a format 0xFFFE 'fmt ' chunk */
+};
+
+typedef struct ASubProcessContext { /* private context of the asubprocess filter */
+    const AVClass *class;
+
+    const char   *command; /* "command" option: passed to /bin/sh -c by sp_new() */
+    int           state; /* one of the STATE_* parser states */
+    unsigned int  chunk_size; /* bytes left in the wav chunk currently being parsed/skipped */
+    int           in_bit_depth; /* "bits" option: bit depth of the wav written to the subprocess */
+    int           out_bit_depth; /* bit depth read from the subprocess 'fmt ' chunk */
+    SP           *sp; /* running subprocess, created by sp_new() */
+    void         *sample_buffer; /* scratch buffer used for sample format conversion */
+    size_t        sample_buffer_size; /* allocated size of sample_buffer in bytes */
+    int           eof; /* non-zero once the input link reported AVERROR_EOF */
+    int64_t       last_pts; /* pts reported with the input status, reused for the output EOF status */
+    uint64_t      nb_input_samples; /* sum of nb_samples of all frames sent to the subprocess */
+    uint64_t      nb_output_samples; /* sum of nb_samples of all frames read back and output */
+    FFFrameQueue  frame_queue; /* frames already sent to the subprocess, waiting for their filtered data */
+} ASubProcessContext;
+
+#define OFFSET(x) offsetof(ASubProcessContext, x)
+#define A AV_OPT_FLAG_AUDIO_PARAM|AV_OPT_FLAG_FILTERING_PARAM
+
+static const AVOption asubprocess_options[] = { /* user-settable options of the filter */
+    { "command",    "set command to run as subprocess",       OFFSET(command),       AV_OPT_TYPE_STRING, {.str=NULL},  0, 0, A}, /* mandatory: init() fails when it is unset */
+    { "bits",       "set wav bit depth for subprocess input", OFFSET(in_bit_depth),  AV_OPT_TYPE_INT,    {.i64=32},    0, 32, A}, /* range is 0..32 here, but init() only accepts 16, 24 and 32 */
+    { NULL },
+};
+
+AVFILTER_DEFINE_CLASS(asubprocess);
+
+/**
+ * Obtain an empty output chunk.
+ *
+ * A chunk is taken from the subprocess' list of unused chunks when one is
+ * available; otherwise a new one is allocated with av_malloc().
+ *
+ * @return chunk with next/offset/count reset, or NULL if the allocation failed
+ */
+static SPB *spb_new(AVFilterContext *ctx)
+{
+    ASubProcessContext *priv = ctx->priv;
+    SP *proc = priv->sp;
+    SPB *chunk = proc->unused_buffers;
+
+    if (!chunk) {
+        chunk = av_malloc(sizeof(SPB));
+        if (!chunk)
+            return NULL;
+    } else {
+        /* unlink the recycled chunk from the free list */
+        proc->unused_buffers = chunk->next;
+    }
+
+    chunk->count  = 0;
+    chunk->offset = 0;
+    chunk->next   = NULL;
+    return chunk;
+}
+
+/**
+ * Free every chunk of a chunk list and reset the list head to NULL.
+ *
+ * @param spbp address of the list head
+ */
+static void spb_free_all(SPB **spbp)
+{
+    SPB *following;
+
+    for (SPB *cur = *spbp; cur; cur = following) {
+        /* fetch the successor before the node is released */
+        following = cur->next;
+        av_free(cur);
+    }
+    *spbp = NULL;
+}
+
+/* Close both descriptors of a pipe() pair (read end first, then write end). */
+static void
+close_pipe(int *pipefd)
+{
+    for (int end = 0; end < 2; end++)
+        close(pipefd[end]);
+}
+
+/**
+ * Release everything owned by a subprocess object.
+ *
+ * Closes the pipe ends that are still open, waits for the child unless it
+ * was already reaped (sp->done), frees all chunks and finally sp itself.
+ */
+static void
+sp_cleanup(SP *sp)
+{
+    const int fds[2] = { sp->input_pipe, sp->output_pipe };
+
+    for (int k = 0; k < 2; k++) {
+        if (fds[k] >= 0)
+            close(fds[k]);
+    }
+
+    if (!sp->done) {
+        int wstatus;
+        waitpid(sp->pid, &wstatus, 0);
+    }
+
+    spb_free_all(&sp->unused_buffers);
+    spb_free_all(&sp->out_buffers);
+    av_free(sp);
+}
+
+/**
+ * Put a file descriptor into non-blocking mode.
+ *
+ * @param name "input" or "output", only used for the log message
+ * @return 0 on success, a negative AVERROR code (already logged) on failure
+ */
+static int
+sp_set_nonblocking(AVFilterContext *ctx, int fd, const char *name)
+{
+    int flags = fcntl(fd, F_GETFL);
+
+    if (flags < 0) {
+        int ret = AVERROR(errno);
+        av_log(ctx, AV_LOG_ERROR, "Failed to get %s pipe flags: %s.\n", name, strerror(errno));
+        return ret;
+    }
+    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
+        int ret = AVERROR(errno);
+        av_log(ctx, AV_LOG_ERROR, "Failed to set %s pipe flags: %s.\n", name, strerror(errno));
+        return ret;
+    }
+    return 0;
+}
+
+/**
+ * Start the configured command via "/bin/sh -c" with its stdin and stdout
+ * connected to two pipes, and store the resulting SP object in the filter
+ * context.
+ *
+ * Both parent-side pipe ends are switched to non-blocking mode.
+ *
+ * @return 0 on success, a negative AVERROR code on failure
+ */
+static int
+sp_new(AVFilterContext *ctx)
+{
+    ASubProcessContext *s = ctx->priv;
+    SP *sp;
+    int output_pipe[2];
+    int input_pipe[2];
+    int ret;
+    pid_t pid;
+
+    /* allocate before fork() so an allocation failure cannot leave an unmanaged child behind */
+    sp = av_mallocz(sizeof(*sp));
+    if (!sp)
+        return AVERROR(ENOMEM);
+
+    if (pipe(output_pipe) < 0) {
+        ret = AVERROR(errno);
+        av_log(ctx, AV_LOG_ERROR, "Failed create subprocess output pipe: %s.\n", strerror(errno));
+        av_free(sp);
+        return ret;
+    }
+    if (pipe(input_pipe) < 0) {
+        ret = AVERROR(errno);
+        av_log(ctx, AV_LOG_ERROR, "Failed create subprocess input pipe: %s.\n", strerror(errno));
+        close_pipe(output_pipe);
+        av_free(sp);
+        return ret;
+    }
+    pid = fork();
+    if (pid < 0) {
+        ret = AVERROR(errno);
+        /* log first: close() may overwrite errno */
+        av_log(ctx, AV_LOG_ERROR, "Failed to fork() subprocess: %s.\n", strerror(errno));
+        close_pipe(input_pipe);
+        close_pipe(output_pipe);
+        av_free(sp);
+        return ret;
+    }
+    if (pid == 0) {
+        /* child: use _exit() so that atexit handlers and stdio buffers
+         * inherited from the parent are not run/flushed a second time */
+        if (dup2(input_pipe[0], 0) < 0) {
+            perror("asubprocess dup2() for stdin failed");
+            _exit(EXIT_FAILURE);
+        }
+        if (dup2(output_pipe[1], 1) < 0) {
+            perror("asubprocess dup2() for stdout failed");
+            _exit(EXIT_FAILURE);
+        }
+        close_pipe(input_pipe);
+        close_pipe(output_pipe);
+        execl("/bin/sh", "/bin/sh", "-c", s->command, NULL);
+        perror("asubprocess execl() failed");
+        _exit(EXIT_FAILURE);
+    }
+
+    /* parent: keep the write end of the child's stdin and the read end of its stdout */
+    sp->pid = pid;
+    sp->input_pipe = input_pipe[1];
+    sp->output_pipe = output_pipe[0];
+
+    close(input_pipe[0]);
+    close(output_pipe[1]);
+
+    ret = sp_set_nonblocking(ctx, sp->output_pipe, "output");
+    if (ret >= 0)
+        ret = sp_set_nonblocking(ctx, sp->input_pipe, "input");
+    if (ret < 0) {
+        /* closes both pipes and waits for the child */
+        sp_cleanup(sp);
+        return ret;
+    }
+
+    s->sp = sp;
+    return 0;
+}
+
+static size_t /* returns the number of buffered subprocess output bytes that sp_read() may consume */
+sp_can_read(SP *sp)
+{
+    return sp->can_read;
+}
+
+/**
+ * Copy count bytes of buffered subprocess output into buffer.
+ *
+ * The caller must make sure that at least count bytes are available
+ * (see sp_can_read()); this is enforced with av_assert0(). Chunks that
+ * become fully consumed are moved to the list of unused chunks.
+ */
+static void
+sp_read(SP *sp, char *buffer, size_t count)
+{
+    SPB *cur;
+
+    av_assert0(count <= sp->can_read); // caller should check this before calling sp_read
+    sp->can_read -= count;
+
+    for (cur = sp->out_buffers; cur; cur = sp->out_buffers) {
+        int chunk = FFMIN(cur->count - cur->offset, count);
+
+        memcpy(buffer, cur->data + cur->offset, chunk);
+        buffer      += chunk;
+        count       -= chunk;
+        cur->offset += chunk;
+
+        /* unread bytes left in this chunk: the request is satisfied */
+        if (cur->offset != cur->count)
+            return;
+
+        /* chunk drained: unlink it from the output list and recycle it */
+        sp->out_buffers    = cur->next;
+        cur->next          = sp->unused_buffers;
+        sp->unused_buffers = cur;
+    }
+}
+
+static int try_read_frame(AVFilterContext *ctx);
+
+/**
+ * Parse the wav stream that has been buffered from the subprocess.
+ *
+ * Header chunks are consumed with a small state machine (s->state) until the
+ * 'data' chunk is reached; from then on queued frames are filled with sample
+ * data and sent downstream via try_read_frame() for as long as enough data is
+ * buffered.
+ *
+ * @return 0 on success (also when more data is needed), a negative AVERROR
+ *         code if the stream is invalid or a frame could not be output
+ */
+static int process_input(AVFilterContext *ctx)
+{
+    AVFilterLink *outlink = ctx->outputs[0];
+    ASubProcessContext *s = ctx->priv;
+
+    while (s->state != STATE_IN_DATA_CHUNK) {
+        size_t can_read = sp_can_read(s->sp);
+
+        if (s->state == STATE_EXPECT_RIFF && can_read >= 12) {
+            char buffer[12];
+            sp_read(s->sp, buffer, 12);
+            if ((memcmp (buffer, "RIFF", 4) && memcmp (buffer, "RF64", 4)) || memcmp (buffer + 8, "WAVE", 4)) {
+                av_log(ctx, AV_LOG_ERROR, "Subprocess output is not a valid wav file.\n");
+                return AVERROR_INVALIDDATA;
+            }
+            s->state = STATE_EXPECT_CHUNK;
+        }
+        else if (s->state == STATE_EXPECT_CHUNK && can_read >= 8) {
+            unsigned char x[8];
+            sp_read(s->sp, (char *)x, 8);
+            s->chunk_size = AV_RL32 (x + 4);
+            if (!memcmp (x, "data", 4)) {
+                /* out_bit_depth is only assigned by the 'fmt ' parser below
+                 * (assumes the context starts out zeroed -- TODO confirm).
+                 * Without it the sample size would be 0 and try_read_frame()
+                 * would divide by zero. */
+                if (!s->out_bit_depth) {
+                    av_log(ctx, AV_LOG_ERROR,
+                           "Subprocess output is not a valid wav file: no valid 'fmt ' chunk before 'data' chunk.\n");
+                    return AVERROR_INVALIDDATA;
+                }
+                s->state = STATE_IN_DATA_CHUNK;
+            }
+            else if (!memcmp (x, "fmt ", 4) && s->chunk_size >= 16)
+                s->state = STATE_IN_FMT_CHUNK;
+            else
+                s->state = STATE_SKIP_CHUNK;
+        }
+        else if (s->state == STATE_IN_FMT_CHUNK && can_read >= 16) {
+            unsigned char data[16];
+            int format, nb_channels, sample_rate;
+            sp_read(s->sp, (char *)data, 16);
+
+            format           = AV_RL16(data);
+            nb_channels      = AV_RL16(data + 2);
+            sample_rate      = AV_RL32(data + 4);
+            s->out_bit_depth = AV_RL16(data + 14);
+
+            if (format != 1 && format != 0xFFFE) {
+                av_log(ctx, AV_LOG_ERROR,
+                       "Unsupported wav format (%d) from subprocess (expected PCM).\n", format);
+                return AVERROR_INVALIDDATA;
+            }
+            if (nb_channels != outlink->ch_layout.nb_channels) {
+                av_log(ctx, AV_LOG_ERROR,
+                       "Number of output channels (%d) from subprocess doesn't match input channels (%d).\n",
+                       nb_channels, outlink->ch_layout.nb_channels);
+                return AVERROR_INVALIDDATA;
+            }
+            if (sample_rate != outlink->sample_rate) {
+                av_log(ctx, AV_LOG_ERROR,
+                       "Sample rate (%d) from subprocess doesn't match input sample rate (%d).\n",
+                       sample_rate, outlink->sample_rate);
+                return AVERROR_INVALIDDATA;
+            }
+            if (s->out_bit_depth != 8 && s->out_bit_depth != 16 && s->out_bit_depth != 24 && s->out_bit_depth != 32) {
+                av_log(ctx, AV_LOG_ERROR, "Unsupported output wav bit depth (%d) from subprocess.\n", s->out_bit_depth);
+                return AVERROR_INVALIDDATA;
+            }
+            s->chunk_size -= 16; /* cannot underflow: chunk_size >= 16 was checked when entering this state */
+            if (format == 0xFFFE) {
+                /* the extension parsed in STATE_EXT_FMT is 24 bytes long; a
+                 * shorter chunk would make the unsigned chunk_size wrap around */
+                if (s->chunk_size < 24) {
+                    av_log(ctx, AV_LOG_ERROR, "Truncated extended wav format chunk from subprocess.\n");
+                    return AVERROR_INVALIDDATA;
+                }
+                s->state = STATE_EXT_FMT;
+            }
+            else
+                s->state = STATE_SKIP_CHUNK;
+        }
+        else if (s->state == STATE_EXT_FMT && can_read >= 24) {
+            unsigned char data[24];
+            static const unsigned char pcm_fmt_guid[16] = {
+               0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00,
+               0x80, 0x00, 0x00, 0xAA, 0x00, 0x38, 0x9B, 0x71
+            };
+            sp_read(s->sp, (char *)data, 24);
+            /* the sub format GUID occupies the last 16 of the 24 extension bytes */
+            if (memcmp (data + 8, pcm_fmt_guid, 16)) {
+                av_log(ctx, AV_LOG_ERROR, "Unsupported extended wav format from subprocess (expected PCM).\n");
+                return AVERROR_INVALIDDATA;
+            }
+            s->chunk_size -= 24;
+            s->state = STATE_SKIP_CHUNK;
+        }
+        else if (s->state == STATE_SKIP_CHUNK && can_read > 0) {
+            unsigned int to_skip = FFMIN(s->chunk_size, can_read);
+            s->chunk_size -= to_skip;
+            if (!s->chunk_size)
+                s->state = STATE_EXPECT_CHUNK;
+
+            /* discard the bytes through a small scratch buffer */
+            while (to_skip) {
+                char buffer[1024];
+                unsigned int n = FFMIN(sizeof(buffer), to_skip);
+                sp_read(s->sp, buffer, n);
+                to_skip -= n;
+            }
+        }
+        else {
+            /* not enough data buffered for the current state */
+            if (s->sp->input_count > MAX_INPUT_BEFORE_DATA) {
+                av_log(ctx, AV_LOG_ERROR, "Subprocess output is not a valid wav file: no 'data' chunk found.\n");
+                return AVERROR_INVALIDDATA;
+            }
+            break;
+        }
+    }
+    if (s->state == STATE_IN_DATA_CHUNK) {
+        int ret;
+        int bytes_per_sample = s->out_bit_depth / 8 * outlink->ch_layout.nb_channels;
+        size_t expect_bytes = (s->nb_input_samples - s->nb_output_samples) * bytes_per_sample;
+        if ((s->nb_input_samples * bytes_per_sample) % 2) // padding byte for odd length input
+            expect_bytes++;
+        /* the subprocess must not return more samples than it was given */
+        if (sp_can_read(s->sp) > expect_bytes) {
+            av_log(ctx, AV_LOG_ERROR, "Subprocess produced more output data than expected.\n");
+            return AVERROR_INVALIDDATA;
+        }
+        /* try_read_frame() returns > 0 for as long as it could output a frame */
+        do {
+            ret = try_read_frame(ctx);
+            if (ret < 0)
+                return ret;
+        } while (ret > 0);
+    }
+    return 0;
+}
+
+/**
+ * Write count bytes to the subprocess while draining its output.
+ *
+ * Both pipes are non-blocking; poll() is used to wait until the subprocess
+ * accepts input or has produced output. Output is appended to the chunk list
+ * and handed to process_input(). A count of 0 signals end of input: the
+ * child's stdin is closed and one poll/read round is performed.
+ *
+ * NOTE(review): write() on a pipe whose reader has exited raises SIGPIPE
+ * unless the embedding program ignores it -- confirm how callers handle this.
+ *
+ * @return 0 on success, a negative AVERROR code on failure
+ */
+static int
+sp_write(AVFilterContext *ctx, char *buffer, int count)
+{
+    ASubProcessContext *s = ctx->priv;
+    SP *sp = s->sp;
+    int offset = 0;
+    if (count == 0 && sp->input_pipe >= 0) { // eof
+        close(sp->input_pipe);
+        sp->input_pipe = -1;
+    }
+    if (count && sp->done) {
+        av_log(ctx, AV_LOG_ERROR, "Subprocess terminated before all input data could be written.\n");
+        return AVERROR(EIO);
+    }
+    do {
+        int rc;
+        /* a negative fd (closed input pipe) is ignored by poll() */
+        struct pollfd fds[2] = {
+            { .fd = sp->input_pipe, .events = POLLOUT, .revents = 0 },
+            { .fd = sp->output_pipe, .events = POLLIN, .revents = 0 },
+        };
+        errno = 0;
+        rc = poll(fds, 2, -1);
+        if ((rc <= 0 && errno != EAGAIN && errno != EINTR) || (fds[0].revents & POLLNVAL) || (fds[1].revents & (POLLERR | POLLNVAL))) {
+            av_log(ctx, AV_LOG_ERROR, "Poll for subprocess failed.\n");
+            return AVERROR(errno ? errno : EIO);
+        }
+        if (fds[0].revents & (POLLOUT | POLLHUP)) {
+            rc = write(sp->input_pipe, &buffer[offset], count);
+            if (rc > 0) {
+                offset += rc;
+                count -= rc;
+            }
+            else if (rc < 0 && errno != EAGAIN && errno != EINTR) {
+                int ret = AVERROR(errno);
+                av_log(ctx, AV_LOG_ERROR, "Write to subprocess failed: %s.\n", strerror(errno));
+                return ret;
+            }
+        }
+        if (fds[1].revents & (POLLIN | POLLHUP)) {
+            SPB *last;
+            int ret;
+
+            if (!sp->out_buffers) {
+                sp->out_buffers = spb_new(ctx);
+                if (!sp->out_buffers)
+                    return AVERROR(ENOMEM);
+            }
+
+            /* append to the last chunk, starting a new one when it is full */
+            last = sp->out_buffers;
+            while (last->next)
+                last = last->next;
+
+            if (last->count == sizeof(last->data)) {
+                last->next = spb_new(ctx);
+                if (!last->next)
+                    return AVERROR(ENOMEM);
+                last = last->next;
+            }
+            rc = read(sp->output_pipe, last->data + last->count, sizeof(last->data) - last->count);
+            if (rc > 0) {
+                last->count += rc;
+                sp->input_count += rc;
+                sp->can_read += rc;
+                ret = process_input(ctx);
+                if (ret != 0)
+                    return ret;
+            }
+            else if (rc == 0) {
+                /* EOF on the child's stdout: reap it and evaluate how it ended */
+                int status = 0;
+                pid_t reaped;
+
+                do {
+                    reaped = waitpid(sp->pid, &status, 0);
+                } while (reaped < 0 && errno == EINTR);
+                /* mark as done in any case so that sp_cleanup() does not wait again */
+                sp->done = true;
+
+                if (reaped < 0) {
+                    ret = AVERROR(errno);
+                    av_log(ctx, AV_LOG_ERROR, "Failed to wait for subprocess: %s.\n", strerror(errno));
+                    return ret;
+                }
+                /* WEXITSTATUS() is only meaningful for a normal exit; a child
+                 * killed by a signal must not be mistaken for exit code 0 */
+                if (!WIFEXITED(status)) {
+                    av_log(ctx, AV_LOG_ERROR, "Subprocess terminated abnormally.\n");
+                    return AVERROR(EIO);
+                }
+                if (WEXITSTATUS(status) != 0) {
+                    av_log(ctx, AV_LOG_ERROR, "Subprocess failed with non-zero exit code (%d).\n", WEXITSTATUS(status));
+                    return AVERROR(EIO);
+                }
+                if (count) {
+                    av_log(ctx, AV_LOG_ERROR, "Subprocess terminated before all input data could be written.\n");
+                    return AVERROR(EIO);
+                }
+                return 0;
+            }
+            else if (errno != EAGAIN && errno != EINTR) {
+                ret = AVERROR(errno);
+                av_log(ctx, AV_LOG_ERROR, "Read from subprocess failed: %s.\n", strerror(errno));
+                return ret;
+            }
+        }
+    } while (count);
+    return 0;
+}
+
+static bool /* returns true once sp_write() has seen EOF on the subprocess output and waited for the child */
+sp_done (SP *sp)
+{
+    return sp->done;
+}
+
+/**
+ * Validate the filter options and set up the frame queue.
+ *
+ * @return 0 on success, AVERROR(EINVAL) if no command was given or the
+ *         requested input bit depth is not 16, 24 or 32
+ */
+static av_cold int init(AVFilterContext *ctx)
+{
+    ASubProcessContext *priv = ctx->priv;
+    FFFrameQueueGlobal queue_global;
+
+    if (priv->command == NULL) {
+        av_log(ctx, AV_LOG_ERROR, "No command provided\n");
+        return AVERROR(EINVAL);
+    }
+
+    switch (priv->in_bit_depth) {
+    case 16:
+    case 24:
+    case 32:
+        break;
+    default:
+        av_log(ctx, AV_LOG_ERROR, "Only 16, 24 and 32 input bit depth supported\n");
+        return AVERROR(EINVAL);
+    }
+
+    ff_framequeue_global_init(&queue_global);
+    ff_framequeue_init(&priv->frame_queue, &queue_global);
+    return 0;
+}
+
+/**
+ * Tear down the filter: clean up the subprocess object (if one was
+ * created), then free the frame queue and the conversion buffer.
+ */
+static av_cold void uninit(AVFilterContext *ctx)
+{
+    ASubProcessContext *priv = ctx->priv;
+    SP *child = priv->sp;
+
+    if (child != NULL)
+        sp_cleanup(child);
+
+    ff_framequeue_free(&priv->frame_queue);
+    av_freep(&priv->sample_buffer);
+}
+
+/**
+ * Build a canonical 16 byte 'fmt ' PCM wav header and send it to the subprocess.
+ *
+ * The RIFF and data chunk sizes are written as -1 (0xFFFFFFFF) because the
+ * stream length is not known in advance.
+ *
+ * @param nb_channels number of audio channels
+ * @param sample_rate sample rate in Hz
+ * @return 0 on success, a negative AVERROR code on failure
+ */
+static int write_wav_header(AVFilterContext *ctx, int nb_channels, int sample_rate)
+{
+    ASubProcessContext *s = ctx->priv;
+    int bit_depth = s->in_bit_depth;
+    AVIOContext *wav_header = NULL;
+    uint8_t *buffer = NULL;
+    int size;
+    int ret;
+
+    /* without this check a failed allocation leads to a NULL dereference below */
+    ret = avio_open_dyn_buf(&wav_header);
+    if (ret < 0)
+        return ret;
+
+    ffio_wfourcc(wav_header, "RIFF");
+    avio_wl32(wav_header, -1);
+    ffio_wfourcc(wav_header, "WAVE");
+    // subchunk 1
+    ffio_wfourcc(wav_header, "fmt ");
+    avio_wl32(wav_header, 16); // subchunk size
+    avio_wl16(wav_header, 1);  // uncompressed audio
+    avio_wl16(wav_header, nb_channels);
+    avio_wl32(wav_header, sample_rate);
+    avio_wl32(wav_header, sample_rate * nb_channels * bit_depth / 8); // byte rate
+    avio_wl16(wav_header, nb_channels * bit_depth / 8); // block align
+    avio_wl16(wav_header, bit_depth); // bits per sample
+
+    // subchunk 2
+    ffio_wfourcc(wav_header, "data");
+    avio_wl32(wav_header, -1);
+    size = avio_close_dyn_buf(wav_header, &buffer);
+    /* a zero sized write would be interpreted as end of input by sp_write() */
+    if (size <= 0 || !buffer) {
+        av_free(buffer);
+        return AVERROR(ENOMEM);
+    }
+    ret = sp_write(ctx, (char *)buffer, size);
+    av_free(buffer);
+    return ret;
+}
+
+/**
+ * Read count samples of subprocess output and convert them to native
+ * signed 32 bit samples.
+ *
+ * 8 bit input is treated as offset binary (0x80 maps to 0); 16, 24 and
+ * 32 bit input is little endian and is shifted into the upper bits.
+ * All shifts are done on unsigned values: shifting negative values or
+ * shifting into the sign bit of an int is undefined behaviour.
+ *
+ * The caller must make sure that count samples are buffered and that
+ * s->sample_buffer can hold count * (out_bit_depth / 8) bytes.
+ *
+ * @param data  destination for count 32 bit samples
+ * @param count number of samples (not frames) to read
+ */
+static void read_samples(ASubProcessContext *s, int32_t *data, int count)
+{
+    int sample_size = s->out_bit_depth / 8;
+    const uint8_t *in;
+#if !HAVE_BIGENDIAN
+    if (s->out_bit_depth == 32) {
+        /* optimized case: on little endian systems we don't need any conversion */
+        sp_read(s->sp, (char *)data, count * sample_size);
+        return;
+    }
+#endif
+    sp_read(s->sp, s->sample_buffer, count * sample_size);
+    in = s->sample_buffer;
+
+    switch (s->out_bit_depth) {
+    case 8:
+        /* flipping the top bit converts offset binary to two's complement */
+        for (int k = 0; k < count; k++)
+            data[k] = (int32_t)((uint32_t)(in[k] ^ 0x80) << 24);
+        break;
+    case 16:
+        for (int k = 0; k < count; k++)
+            data[k] = (int32_t)((uint32_t)AV_RL16(in + 2 * k) << 16);
+        break;
+    case 24:
+        for (int k = 0; k < count; k++)
+            data[k] = (int32_t)((uint32_t)AV_RL24(in + 3 * k) << 8);
+        break;
+    case 32:
+        for (int k = 0; k < count; k++)
+            data[k] = (int32_t)AV_RL32(in + 4 * k);
+        break;
+    default:
+        /* other bit depths are rejected when the 'fmt ' chunk is parsed */
+        break;
+    }
+}
+
+/**
+ * Convert count native signed 32 bit samples to the configured input bit
+ * depth (little endian) and write them to the subprocess.
+ *
+ * The conversion buffer is always grown to 4 bytes per sample because it is
+ * reused for converting the subprocess output.
+ *
+ * @param data  source samples
+ * @param count number of samples (not frames) to write
+ * @return 0 on success, a negative AVERROR code on failure
+ */
+static int write_samples(AVFilterContext *ctx, int32_t *data, int count)
+{
+    ASubProcessContext *s = ctx->priv;
+    int sample_size = s->in_bit_depth / 8;
+    size_t bsize;
+
+    if (count < 0)
+        return AVERROR(EINVAL);
+    /* sp_write() treats a zero byte write as end of input and closes the
+     * pipe, so an empty frame must not reach it */
+    if (count == 0)
+        return 0;
+
+    bsize = (size_t)count * 4; /* allocate sample buffer for 32 bit per sample (required for conversions of input/output) */
+    if (s->sample_buffer_size < bsize) {
+        /* keep the old buffer reachable if the reallocation fails */
+        void *grown = av_realloc(s->sample_buffer, bsize);
+        if (!grown)
+            return AVERROR(ENOMEM);
+        s->sample_buffer = grown;
+        s->sample_buffer_size = bsize;
+    }
+
+#if !HAVE_BIGENDIAN
+    if (s->in_bit_depth == 32) {
+        /* optimized case: on little endian systems we don't need any conversion */
+        return sp_write(ctx, (char *)data, sample_size * count);
+    }
+#endif
+    if (s->in_bit_depth == 16) {
+        int16_t *out = s->sample_buffer;
+        for (int k = 0; k < count; k++)
+            AV_WL16(out + k, data[k] >> 16);
+    }
+    if (s->in_bit_depth == 24) {
+        uint8_t *out = s->sample_buffer;
+        for (int k = 0; k < count; k++) {
+            AV_WL24(out, data[k] >> 8);
+            out += 3;
+        }
+    }
+    if (s->in_bit_depth == 32) {
+        int32_t *out = s->sample_buffer;
+        for (int k = 0; k < count; k++)
+            AV_WL32(out + k, data[k]);
+    }
+    return sp_write(ctx, s->sample_buffer, sample_size * count);
+}
+
+/**
+ * Send the samples of an input frame to the subprocess and queue a frame
+ * that will later receive the filtered samples.
+ *
+ * Takes ownership of in. If in is writable it is queued itself, otherwise a
+ * new buffer with the same properties is queued and in is freed.
+ *
+ * @return 0 on success, a negative AVERROR code on failure
+ */
+static int write_frame(AVFilterContext *ctx, AVFilterLink *inlink, AVFrame *in)
+{
+    ASubProcessContext *s = ctx->priv;
+    AVFrame *out;
+    int ret;
+
+    s->nb_input_samples += in->nb_samples;
+    ret = write_samples(ctx, (int32_t *)in->data[0], in->nb_samples * inlink->ch_layout.nb_channels);
+    if (ret < 0) {
+        av_frame_free(&in);
+        return ret;
+    }
+
+    if (av_frame_is_writable(in)) {
+        out = in;
+    } else {
+        out = ff_get_audio_buffer(ctx->outputs[0], in->nb_samples);
+        if (!out) {
+            av_frame_free(&in);
+            return AVERROR(ENOMEM);
+        }
+        ret = av_frame_copy_props(out, in);
+        if (ret < 0) {
+            av_frame_free(&in);
+            av_frame_free(&out);
+            return ret;
+        }
+        av_frame_free(&in);
+    }
+
+    /* queueing can fail; the frame must not be leaked in that case */
+    ret = ff_framequeue_add(&s->frame_queue, out);
+    if (ret < 0) {
+        av_frame_free(&out);
+        return ret;
+    }
+    return 0;
+}
+
+/**
+ * Output the oldest queued frame if the subprocess has already produced
+ * enough sample data for it.
+ *
+ * @return the number of samples of the frame that was output, 0 if no frame
+ *         could be output (yet), a negative AVERROR code on failure
+ */
+static int try_read_frame(AVFilterContext *ctx)
+{
+    AVFilterLink *outlink = ctx->outputs[0];
+    ASubProcessContext *s = ctx->priv;
+    const int nb_channels = outlink->ch_layout.nb_channels;
+    const int sample_size = s->out_bit_depth / 8;
+    AVFrame *out;
+    size_t avail;
+    int nb_samples, ret;
+
+    if (s->state != STATE_IN_DATA_CHUNK || !ff_framequeue_queued_frames(&s->frame_queue))
+        return 0;
+
+    /* guards the divisions below, e.g. when no 'fmt ' chunk has been parsed */
+    if (sample_size <= 0 || nb_channels <= 0) {
+        av_log(ctx, AV_LOG_ERROR, "Subprocess output has no valid sample format.\n");
+        return AVERROR_INVALIDDATA;
+    }
+
+    out = ff_framequeue_peek(&s->frame_queue, 0);
+    avail = sp_can_read(s->sp) / sample_size / nb_channels;
+    if (avail < out->nb_samples)
+        return 0;
+
+    out = ff_framequeue_take(&s->frame_queue);
+    /* remember the size now: ff_filter_frame() takes ownership of the frame */
+    nb_samples = out->nb_samples;
+    /* the conversion buffer size is in bytes, so compare against bytes */
+    av_assert0((size_t)nb_samples * nb_channels * sample_size <= s->sample_buffer_size);
+    s->nb_output_samples += nb_samples;
+    read_samples(s, (int32_t *)out->data[0], nb_samples * nb_channels);
+    ret = ff_filter_frame(outlink, out);
+    if (ret < 0)
+        return ret;
+    return nb_samples;
+}
+
+/**
+ * Filter activation callback.
+ *
+ * On the first call the subprocess is started and the wav header is sent.
+ * Each consumed input frame is written to the subprocess; filtered frames
+ * are output from within sp_write() as soon as enough data has come back.
+ * On input EOF the child's stdin is closed, the remaining output is drained
+ * and the number of returned samples is checked against the number sent.
+ *
+ * @return 0 or FFERROR_NOT_READY, or a negative AVERROR code on failure
+ */
+static int activate(AVFilterContext *ctx)
+{
+    AVFilterLink *inlink = ctx->inputs[0];
+    AVFilterLink *outlink = ctx->outputs[0];
+    ASubProcessContext *s = ctx->priv;
+    AVFrame *in = NULL;
+    int status, ret;
+
+    if (s->state == STATE_START) {
+        ret = sp_new(ctx);
+        if (ret != 0)
+            return ret;
+
+        s->state = STATE_EXPECT_RIFF;
+
+        ret = write_wav_header(ctx, inlink->ch_layout.nb_channels, inlink->sample_rate);
+        if (ret != 0)
+            return ret;
+    }
+
+    FF_FILTER_FORWARD_STATUS_BACK(outlink, inlink);
+
+    ret = ff_inlink_consume_frame(inlink, &in);
+    if (ret < 0)
+        return ret;
+    if (ret > 0) {
+        ret = write_frame(ctx, inlink, in);
+
+        /* NOTE(review): a ">= 0" test on a sample count looks always true, so the
+         * filter is rescheduled after every frame; "> 0" may have been intended */
+        if (ff_inlink_queued_samples(inlink) >= 0)
+            ff_filter_set_ready(ctx, 100);
+
+        if (ret != 0)
+            return ret;
+    }
+
+    if (!s->eof && ff_inlink_acknowledge_status(inlink, &status, &s->last_pts))
+        s->eof |= status == AVERROR_EOF;
+
+    if (s->eof) {
+        /* a zero sized write closes the child's stdin and drains its output */
+        while (!sp_done(s->sp)) {
+            ret = sp_write(ctx, 0, 0);
+            if (ret != 0)
+                return ret;
+        }
+        if (s->state != STATE_IN_DATA_CHUNK) {
+            av_log(ctx, AV_LOG_ERROR, "Subprocess output wav is incomplete (no data chunk found)\n");
+            return AVERROR_INVALIDDATA;
+        }
+        if (s->nb_input_samples != s->nb_output_samples) {
+            /* uint64_t is not unsigned long on every platform: print via unsigned long long */
+            av_log(ctx, AV_LOG_ERROR, "Subprocess output wav is incomplete (%llu input samples, %llu output samples)\n",
+                   (unsigned long long)s->nb_input_samples, (unsigned long long)s->nb_output_samples);
+            return AVERROR_INVALIDDATA;
+        }
+        ff_outlink_set_status(outlink, AVERROR_EOF, s->last_pts);
+        return 0;
+    }
+
+    FF_FILTER_FORWARD_WANTED(outlink, inlink);
+
+    return FFERROR_NOT_READY;
+}
+
+const AVFilter ff_af_asubprocess = { /* filter definition; declared extern in allfilters.c */
+    .name          = "asubprocess",
+    .description   = NULL_IF_CONFIG_SMALL("Filter audio stream with subprocess."),
+    .priv_size     = sizeof(ASubProcessContext),
+    .priv_class    = &asubprocess_class,
+    .init          = init,
+    .uninit        = uninit,
+    .activate      = activate,
+    FILTER_INPUTS(ff_audio_default_filterpad),
+    FILTER_OUTPUTS(ff_audio_default_filterpad),
+    FILTER_SINGLE_SAMPLEFMT(AV_SAMPLE_FMT_S32), /* read_samples()/write_samples() access frame data as int32_t */
+};
diff --git a/libavfilter/allfilters.c b/libavfilter/allfilters.c
index c532682fc2..e9ae4265ca 100644
--- a/libavfilter/allfilters.c
+++ b/libavfilter/allfilters.c
@@ -101,6 +101,7 @@ extern const AVFilter ff_af_asubcut;
 extern const AVFilter ff_af_asupercut;
 extern const AVFilter ff_af_asuperpass;
 extern const AVFilter ff_af_asuperstop;
+extern const AVFilter ff_af_asubprocess;
 extern const AVFilter ff_af_atempo;
 extern const AVFilter ff_af_atilt;
 extern const AVFilter ff_af_atrim;
-- 
2.34.1



More information about the ffmpeg-devel mailing list