[FFmpeg-devel] Implement RTMP flow control and proper (or at least better) closing

Momtchil Momtchev momtchil at momtchev.com
Thu Feb 1 18:40:33 EET 2024


https://trac.ffmpeg.org/ticket/10838


Hello,


This small PR implements:

     * RTMP flow control on the server side - now the server will stop 
sending data if it has filled the window and the client hasn't ACKed

     * Improves the tear-down by not returning an error when the server 
closes the connection (which seems to be the usual method of signalling 
the end of the connection)

     * Fixes the client ignoring the last frames



-- 
Momtchil Momtchev <momtchil at momtchev.com>
-------------- next part --------------
diff --git a/doc/protocols.texi b/doc/protocols.texi
index f54600b846..642cde962a 100644
--- a/doc/protocols.texi
+++ b/doc/protocols.texi
@@ -948,6 +948,9 @@ URL to player swf file, compute hash/size automatically.
 @item rtmp_tcurl
 URL of the target stream. Defaults to proto://host[:port]/app.
 
+@item rtmp_window
+Size of the RTMP window in bytes (server only). Defaults to 2500000.
+
 @item tcp_nodelay=@var{1|0}
 Set TCP_NODELAY to disable Nagle's algorithm. Default value is 0.
 
diff --git a/libavformat/rtmppkt.c b/libavformat/rtmppkt.c
index a602bf6a96..3fcb4e8f14 100644
--- a/libavformat/rtmppkt.c
+++ b/libavformat/rtmppkt.c
@@ -159,7 +159,10 @@ int ff_rtmp_packet_read(URLContext *h, RTMPPacket *p,
 {
     uint8_t hdr;
 
-    if (ffurl_read(h, &hdr, 1) != 1)
+    int ret = ffurl_read(h, &hdr, 1);
+    if (ret == AVERROR_EOF)
+        return AVERROR_EOF;
+    if (ret != 1)
         return AVERROR(EIO);
 
     return ff_rtmp_packet_read_internal(h, p, chunk_size, prev_pkt,
diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c
index 98718bc6da..6aed9db3b0 100644
--- a/libavformat/rtmpproto.c
+++ b/libavformat/rtmpproto.c
@@ -97,6 +97,8 @@ typedef struct RTMPContext {
     uint32_t      receive_report_size;        ///< number of bytes after which we should report the number of received bytes to the peer
     uint64_t      bytes_read;                 ///< number of bytes read from server
     uint64_t      last_bytes_read;            ///< number of bytes read last reported to server
+    uint64_t      bytes_sent;                 ///< number of bytes sent to the client
+    uint64_t      last_bytes_sent;            ///< number of bytes last acknowledged by the client
     uint32_t      last_timestamp;             ///< last timestamp received in a packet
     int           skip_bytes;                 ///< number of bytes to skip from the input FLV stream in the next write call
     int           has_audio;                  ///< presence of audio data
@@ -251,6 +253,7 @@ static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track)
 
     ret = ff_rtmp_packet_write(rt->stream, pkt, rt->out_chunk_size,
                                &rt->prev_pkt[1], &rt->nb_prev_pkt[1]);
+    rt->bytes_sent += pkt->size;
 fail:
     ff_rtmp_packet_destroy(pkt);
     return ret;
@@ -472,7 +475,8 @@ static int read_connect(URLContext *s, RTMPContext *rt)
             ff_rtmp_packet_destroy(&pkt);
             return AVERROR_UNKNOWN;
         } else if (pkt.type == RTMP_PT_BYTES_READ) {
-            av_log(s, AV_LOG_TRACE, "received acknowledgement\n");
+            rt->last_bytes_read = AV_RB32(pkt.data);
+            av_log(s, AV_LOG_TRACE, "received acknowledgement (%lu)\n", rt->last_bytes_read);
         } else if (pkt.type == RTMP_PT_WINDOW_ACK_SIZE) {
             if ((ret = handle_window_ack_size(s, &pkt)) < 0) {
                 ff_rtmp_packet_destroy(&pkt);
@@ -2344,7 +2348,8 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt)
 
     switch (pkt->type) {
     case RTMP_PT_BYTES_READ:
-        av_log(s, AV_LOG_TRACE, "received bytes read report\n");
+        rt->last_bytes_sent = AV_RB32(pkt->data);
+        av_log(s, AV_LOG_TRACE, "received bytes read report (%lu)\n", rt->last_bytes_sent);
         break;
     case RTMP_PT_CHUNK_SIZE:
         if ((ret = handle_chunk_size(s, pkt)) < 0)
@@ -2455,6 +2460,8 @@ static int get_packet(URLContext *s, int for_header)
                                        &rt->nb_prev_pkt[0])) <= 0) {
             if (ret == 0) {
                 return AVERROR(EAGAIN);
+            } else if (ret == AVERROR_EOF) {
+                return AVERROR_EOF;
             } else {
                 return AVERROR(EIO);
             }
@@ -2468,7 +2475,7 @@ static int get_packet(URLContext *s, int for_header)
             av_log(s, AV_LOG_DEBUG, "Sending bytes read report\n");
             if ((ret = gen_bytes_read(s, rt, rpkt.timestamp + 1)) < 0) {
                 ff_rtmp_packet_destroy(&rpkt);
-                return ret;
+                av_log(s, AV_LOG_DEBUG, "Failed ACKing to the server, connection finished?\n");
             }
             rt->last_bytes_read = rt->bytes_read;
         }
@@ -2835,11 +2842,12 @@ reconnect:
 
     rt->receive_report_size = 1048576;
     rt->bytes_read = 0;
+    rt->bytes_sent = 0;
     rt->has_audio = 0;
     rt->has_video = 0;
     rt->received_metadata = 0;
     rt->last_bytes_read = 0;
-    rt->max_sent_unacked = 2500000;
+    rt->last_bytes_sent = 0;
     rt->duration = 0;
 
     av_log(s, AV_LOG_DEBUG, "Proto = %s, path = %s, app = %s, fname = %s\n",
@@ -3095,8 +3103,14 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
         return size;
     rt->flv_nb_packets = 0;
 
-    /* set stream into nonblocking mode */
-    rt->stream->flags |= AVIO_FLAG_NONBLOCK;
+    /* is it time to start throttling? */
+    if (rt->bytes_sent < rt->last_bytes_sent + rt->max_sent_unacked) {
+        /* set stream into nonblocking mode */
+        rt->stream->flags |= AVIO_FLAG_NONBLOCK;
+    } else {
+        av_log(s, AV_LOG_DEBUG, "Throttling, sent %lu bytes, client has acknowledged %lu bytes\n", rt->bytes_sent,
+             rt->last_bytes_sent);
+    }
 
     /* try to read one byte from the stream */
     ret = ffurl_read(rt->stream, &c, 1);
@@ -3139,6 +3153,7 @@ static const AVOption rtmp_options[] = {
     {"rtmp_flush_interval", "Number of packets flushed in the same request (RTMPT only).", OFFSET(flush_interval), AV_OPT_TYPE_INT, {.i64 = 10}, 0, INT_MAX, ENC},
     {"rtmp_enhanced_codecs", "Specify the codec(s) to use in an enhanced rtmp live stream", OFFSET(enhanced_codecs), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, ENC},
     {"rtmp_live", "Specify that the media is a live stream.", OFFSET(live), AV_OPT_TYPE_INT, {.i64 = -2}, INT_MIN, INT_MAX, DEC, "rtmp_live"},
+    {"rtmp_window", "RTMP window size (server only).", OFFSET(max_sent_unacked), AV_OPT_TYPE_INT, {.i64 = 2500000}, INT_MIN, INT_MAX, DEC, "rtmp_window"},
     {"any", "both", 0, AV_OPT_TYPE_CONST, {.i64 = -2}, 0, 0, DEC, "rtmp_live"},
     {"live", "live stream", 0, AV_OPT_TYPE_CONST, {.i64 = -1}, 0, 0, DEC, "rtmp_live"},
     {"recorded", "recorded stream", 0, AV_OPT_TYPE_CONST, {.i64 = 0}, 0, 0, DEC, "rtmp_live"},


More information about the ffmpeg-devel mailing list