[FFmpeg-devel] Implement RTMP flow control and proper (or least better) closing
Momtchil Momtchev
momtchil at momtchev.com
Thu Feb 1 20:00:33 EET 2024
Alternative patch which eliminates the I/O error, but slightly modifies
the behavior by introducing a two-stage shutdown (the only difference is
the end of the rtmp_close function) which keeps the server open until
the client closes its end.
On 01/02/2024 17:40, Momtchil Momtchev wrote:
>
> https://trac.ffmpeg.org/ticket/10838
>
>
> Hello,
>
>
> This small PR implements:
>
> * RTMP flow control on the server side - now the server will stop
> sending data if it has filled the window and the client hasn't ACKed
>
> * Improves the tear-down by not returning an error when the server
> closes the connection (which seems to be the usual method of
> signalling the end of the connection)
>
> * Fixes the client ignoring the last frames
>
>
>
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel at ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request at ffmpeg.org with subject "unsubscribe".
--
Momtchil Momtchev <momtchil at momtchev.com>
-------------- next part --------------
diff --git a/doc/protocols.texi b/doc/protocols.texi
index f54600b846..642cde962a 100644
--- a/doc/protocols.texi
+++ b/doc/protocols.texi
@@ -948,6 +948,9 @@ URL to player swf file, compute hash/size automatically.
@item rtmp_tcurl
URL of the target stream. Defaults to proto://host[:port]/app.
+ at item rtmp_window
+Size of the RTMP window. Defaults to 2500000.
+
@item tcp_nodelay=@var{1|0}
Set TCP_NODELAY to disable Nagle's algorithm. Default value is 0.
diff --git a/libavformat/rtmppkt.c b/libavformat/rtmppkt.c
index a602bf6a96..3fcb4e8f14 100644
--- a/libavformat/rtmppkt.c
+++ b/libavformat/rtmppkt.c
@@ -159,7 +159,10 @@ int ff_rtmp_packet_read(URLContext *h, RTMPPacket *p,
{
uint8_t hdr;
- if (ffurl_read(h, &hdr, 1) != 1)
+ int ret = ffurl_read(h, &hdr, 1);
+ if (ret == AVERROR_EOF)
+ return AVERROR_EOF;
+ if (ret != 1)
return AVERROR(EIO);
return ff_rtmp_packet_read_internal(h, p, chunk_size, prev_pkt,
diff --git a/libavformat/rtmpproto.c b/libavformat/rtmpproto.c
index 98718bc6da..189e1c7d02 100644
--- a/libavformat/rtmpproto.c
+++ b/libavformat/rtmpproto.c
@@ -97,6 +97,8 @@ typedef struct RTMPContext {
uint32_t receive_report_size; ///< number of bytes after which we should report the number of received bytes to the peer
uint64_t bytes_read; ///< number of bytes read from server
uint64_t last_bytes_read; ///< number of bytes read last reported to server
+ uint64_t bytes_sent; ///< number of bytes sent to the client
+ uint64_t last_bytes_sent; ///< number of bytes last acknowledged by the client
uint32_t last_timestamp; ///< last timestamp received in a packet
int skip_bytes; ///< number of bytes to skip from the input FLV stream in the next write call
int has_audio; ///< presence of audio data
@@ -251,6 +253,7 @@ static int rtmp_send_packet(RTMPContext *rt, RTMPPacket *pkt, int track)
ret = ff_rtmp_packet_write(rt->stream, pkt, rt->out_chunk_size,
&rt->prev_pkt[1], &rt->nb_prev_pkt[1]);
+ rt->bytes_sent += pkt->size;
fail:
ff_rtmp_packet_destroy(pkt);
return ret;
@@ -472,7 +475,8 @@ static int read_connect(URLContext *s, RTMPContext *rt)
ff_rtmp_packet_destroy(&pkt);
return AVERROR_UNKNOWN;
} else if (pkt.type == RTMP_PT_BYTES_READ) {
- av_log(s, AV_LOG_TRACE, "received acknowledgement\n");
+ rt->last_bytes_read = AV_RB32(pkt.data);
+ av_log(s, AV_LOG_TRACE, "received acknowledgement (%lu)\n", rt->last_bytes_read);
} else if (pkt.type == RTMP_PT_WINDOW_ACK_SIZE) {
if ((ret = handle_window_ack_size(s, &pkt)) < 0) {
ff_rtmp_packet_destroy(&pkt);
@@ -2344,7 +2348,8 @@ static int rtmp_parse_result(URLContext *s, RTMPContext *rt, RTMPPacket *pkt)
switch (pkt->type) {
case RTMP_PT_BYTES_READ:
- av_log(s, AV_LOG_TRACE, "received bytes read report\n");
+ rt->last_bytes_sent = AV_RB32(pkt->data);
+ av_log(s, AV_LOG_TRACE, "received bytes read report (%lu)\n", rt->last_bytes_sent);
break;
case RTMP_PT_CHUNK_SIZE:
if ((ret = handle_chunk_size(s, pkt)) < 0)
@@ -2455,6 +2460,8 @@ static int get_packet(URLContext *s, int for_header)
&rt->nb_prev_pkt[0])) <= 0) {
if (ret == 0) {
return AVERROR(EAGAIN);
+ } else if (ret == AVERROR_EOF) {
+ return AVERROR_EOF;
} else {
return AVERROR(EIO);
}
@@ -2468,7 +2475,7 @@ static int get_packet(URLContext *s, int for_header)
av_log(s, AV_LOG_DEBUG, "Sending bytes read report\n");
if ((ret = gen_bytes_read(s, rt, rpkt.timestamp + 1)) < 0) {
ff_rtmp_packet_destroy(&rpkt);
- return ret;
+ av_log(s, AV_LOG_DEBUG, "Failed ACKing to the server, connection finished?\n");
}
rt->last_bytes_read = rt->bytes_read;
}
@@ -2529,6 +2536,7 @@ static int rtmp_close(URLContext *h)
{
RTMPContext *rt = h->priv_data;
int ret = 0, i, j;
+ uint8_t c;
if (!rt->is_input) {
rt->flv_data = NULL;
@@ -2547,7 +2555,12 @@ static int rtmp_close(URLContext *h)
free_tracked_methods(rt);
av_freep(&rt->flv_data);
- ffurl_closep(&rt->stream);
+ if (rt->state > STATE_HANDSHAKED) {
+ ffurl_shutdown(rt->stream, AVIO_FLAG_WRITE);
+ while (ffurl_read(rt->stream, &c, 1) >= 0);
+ } else {
+ ffurl_closep(&rt->stream);
+ }
return ret;
}
@@ -2835,11 +2848,12 @@ reconnect:
rt->receive_report_size = 1048576;
rt->bytes_read = 0;
+ rt->bytes_sent = 0;
rt->has_audio = 0;
rt->has_video = 0;
rt->received_metadata = 0;
rt->last_bytes_read = 0;
- rt->max_sent_unacked = 2500000;
+ rt->last_bytes_sent = 0;
rt->duration = 0;
av_log(s, AV_LOG_DEBUG, "Proto = %s, path = %s, app = %s, fname = %s\n",
@@ -3095,8 +3109,14 @@ static int rtmp_write(URLContext *s, const uint8_t *buf, int size)
return size;
rt->flv_nb_packets = 0;
- /* set stream into nonblocking mode */
- rt->stream->flags |= AVIO_FLAG_NONBLOCK;
+ /* it is time to start throttling? */
+ if (rt->bytes_sent < rt->last_bytes_sent + rt->max_sent_unacked) {
+ /* set stream into nonblocking mode */
+ rt->stream->flags |= AVIO_FLAG_NONBLOCK;
+ } else {
+ av_log(s, AV_LOG_DEBUG, "Throttling, sent %lu bytes, client has acknowledged %lu bytes\n", rt->bytes_sent,
+ rt->last_bytes_sent);
+ }
/* try to read one byte from the stream */
ret = ffurl_read(rt->stream, &c, 1);
@@ -3139,6 +3159,7 @@ static const AVOption rtmp_options[] = {
{"rtmp_flush_interval", "Number of packets flushed in the same request (RTMPT only).", OFFSET(flush_interval), AV_OPT_TYPE_INT, {.i64 = 10}, 0, INT_MAX, ENC},
{"rtmp_enhanced_codecs", "Specify the codec(s) to use in an enhanced rtmp live stream", OFFSET(enhanced_codecs), AV_OPT_TYPE_STRING, {.str = NULL }, 0, 0, ENC},
{"rtmp_live", "Specify that the media is a live stream.", OFFSET(live), AV_OPT_TYPE_INT, {.i64 = -2}, INT_MIN, INT_MAX, DEC, "rtmp_live"},
+ {"rtmp_window", "RTMP window size (server only).", OFFSET(max_sent_unacked), AV_OPT_TYPE_INT, {.i64 = 2500000}, INT_MIN, INT_MAX, DEC, "rtmp_window"},
{"any", "both", 0, AV_OPT_TYPE_CONST, {.i64 = -2}, 0, 0, DEC, "rtmp_live"},
{"live", "live stream", 0, AV_OPT_TYPE_CONST, {.i64 = -1}, 0, 0, DEC, "rtmp_live"},
{"recorded", "recorded stream", 0, AV_OPT_TYPE_CONST, {.i64 = 0}, 0, 0, DEC, "rtmp_live"},
More information about the ffmpeg-devel
mailing list