FFmpeg
libamqp.c
Go to the documentation of this file.
1 /*
2  * Advanced Message Queuing Protocol (AMQP) 0-9-1
3  * Copyright (c) 2020 Andriy Gelman
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <amqp.h>
23 #include <amqp_tcp_socket.h>
24 #include <sys/time.h>
25 #include "avformat.h"
26 #include "libavutil/avstring.h"
27 #include "libavutil/opt.h"
28 #include "libavutil/time.h"
29 #include "network.h"
30 #include "url.h"
31 #include "urldecode.h"
32 
33 typedef struct AMQPContext {
34  const AVClass *class;
35  amqp_connection_state_t conn;
36  amqp_socket_t *socket;
37  const char *exchange;
38  const char *routing_key;
39  int pkt_size;
43 } AMQPContext;
44 
/* Size in bytes of the fixed buffers used for URL components and the queue name. */
#define STR_LEN         1024
/* The single AMQP channel number used for every operation in this file. */
#define DEFAULT_CHANNEL 1

/* Shorthands used by the options table. */
#define OFFSET(x) offsetof(AMQPContext, x)
#define D         AV_OPT_FLAG_DECODING_PARAM
#define E         AV_OPT_FLAG_ENCODING_PARAM
51 static const AVOption options[] = {
52  { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E },
53  { "exchange", "Exchange to send/read packets", OFFSET(exchange), AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E },
54  { "routing_key", "Key to filter streams", OFFSET(routing_key), AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E },
55  { "connection_timeout", "Initial connection timeout", OFFSET(connection_timeout), AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT64_MAX, .flags = D | E},
56  { "delivery_mode", "Delivery mode", OFFSET(delivery_mode), AV_OPT_TYPE_INT, { .i64 = AMQP_DELIVERY_PERSISTENT }, 1, 2, .flags = E, "delivery_mode"},
57  { "persistent", "Persistent delivery mode", 0, AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_PERSISTENT }, 0, 0, E, "delivery_mode" },
58  { "non-persistent", "Non-persistent delivery mode", 0, AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_NONPERSISTENT }, 0, 0, E, "delivery_mode" },
59  { NULL }
60 };
61 
62 static int amqp_proto_open(URLContext *h, const char *uri, int flags)
63 {
64  int ret, server_msg;
65  char hostname[STR_LEN], credentials[STR_LEN], path[STR_LEN];
66  int port;
67  const char *user, *password = NULL, *vhost;
68  const char *user_decoded, *password_decoded, *vhost_decoded;
69  char *p;
70  amqp_rpc_reply_t broker_reply;
71  struct timeval tval = { 0 };
72 
73  AMQPContext *s = h->priv_data;
74 
75  h->is_streamed = 1;
76  h->max_packet_size = s->pkt_size;
77 
78  av_url_split(NULL, 0, credentials, sizeof(credentials),
79  hostname, sizeof(hostname), &port, path, sizeof(path), uri);
80 
81  if (port < 0)
82  port = 5672;
83 
84  if (hostname[0] == '\0' || port <= 0 || port > 65535 ) {
85  av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n");
86  return AVERROR(EINVAL);
87  }
88 
89  p = strchr(credentials, ':');
90  if (p) {
91  *p = '\0';
92  password = p + 1;
93  }
94 
95  if (!password || *password == '\0')
96  password = "guest";
97 
98  password_decoded = ff_urldecode(password, 0);
99  if (!password_decoded)
100  return AVERROR(ENOMEM);
101 
102  user = credentials;
103  if (*user == '\0')
104  user = "guest";
105 
106  user_decoded = ff_urldecode(user, 0);
107  if (!user_decoded) {
108  av_freep(&password_decoded);
109  return AVERROR(ENOMEM);
110  }
111 
112  /* skip query for now */
113  p = strchr(path, '?');
114  if (p)
115  *p = '\0';
116 
117  vhost = path;
118  if (*vhost == '\0')
119  vhost = "/";
120  else
121  vhost++; /* skip leading '/' */
122 
123  vhost_decoded = ff_urldecode(vhost, 0);
124  if (!vhost_decoded) {
125  av_freep(&user_decoded);
126  av_freep(&password_decoded);
127  return AVERROR(ENOMEM);
128  }
129 
130  s->conn = amqp_new_connection();
131  if (!s->conn) {
132  av_freep(&vhost_decoded);
133  av_freep(&user_decoded);
134  av_freep(&password_decoded);
135  av_log(h, AV_LOG_ERROR, "Error creating connection\n");
136  return AVERROR_EXTERNAL;
137  }
138 
139  s->socket = amqp_tcp_socket_new(s->conn);
140  if (!s->socket) {
141  av_log(h, AV_LOG_ERROR, "Error creating socket\n");
142  goto destroy_connection;
143  }
144 
145  if (s->connection_timeout < 0)
146  s->connection_timeout = (h->rw_timeout > 0 ? h->rw_timeout : 5000000);
147 
148  tval.tv_sec = s->connection_timeout / 1000000;
149  tval.tv_usec = s->connection_timeout % 1000000;
150  ret = amqp_socket_open_noblock(s->socket, hostname, port, &tval);
151 
152  if (ret) {
153  av_log(h, AV_LOG_ERROR, "Error connecting to server: %s\n",
154  amqp_error_string2(ret));
155  goto destroy_connection;
156  }
157 
158  broker_reply = amqp_login(s->conn, vhost_decoded, 0, s->pkt_size, 0,
159  AMQP_SASL_METHOD_PLAIN, user_decoded, password_decoded);
160 
161  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
162  av_log(h, AV_LOG_ERROR, "Error login\n");
163  server_msg = AMQP_ACCESS_REFUSED;
164  goto close_connection;
165  }
166 
167  amqp_channel_open(s->conn, DEFAULT_CHANNEL);
168  broker_reply = amqp_get_rpc_reply(s->conn);
169 
170  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
171  av_log(h, AV_LOG_ERROR, "Error set channel\n");
172  server_msg = AMQP_CHANNEL_ERROR;
173  goto close_connection;
174  }
175 
176  if (h->flags & AVIO_FLAG_READ) {
177  amqp_bytes_t queuename;
178  char queuename_buff[STR_LEN];
179  amqp_queue_declare_ok_t *r;
180 
181  r = amqp_queue_declare(s->conn, DEFAULT_CHANNEL, amqp_empty_bytes,
182  0, 0, 0, 1, amqp_empty_table);
183  broker_reply = amqp_get_rpc_reply(s->conn);
184  if (!r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
185  av_log(h, AV_LOG_ERROR, "Error declare queue\n");
186  server_msg = AMQP_RESOURCE_ERROR;
187  goto close_channel;
188  }
189 
190  /* store queuename */
191  queuename.bytes = queuename_buff;
192  queuename.len = FFMIN(r->queue.len, STR_LEN);
193  memcpy(queuename.bytes, r->queue.bytes, queuename.len);
194 
195  amqp_queue_bind(s->conn, DEFAULT_CHANNEL, queuename,
196  amqp_cstring_bytes(s->exchange),
197  amqp_cstring_bytes(s->routing_key), amqp_empty_table);
198 
199  broker_reply = amqp_get_rpc_reply(s->conn);
200  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
201  av_log(h, AV_LOG_ERROR, "Queue bind error\n");
202  server_msg = AMQP_INTERNAL_ERROR;
203  goto close_channel;
204  }
205 
206  amqp_basic_consume(s->conn, DEFAULT_CHANNEL, queuename, amqp_empty_bytes,
207  0, 1, 0, amqp_empty_table);
208 
209  broker_reply = amqp_get_rpc_reply(s->conn);
210  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
211  av_log(h, AV_LOG_ERROR, "Set consume error\n");
212  server_msg = AMQP_INTERNAL_ERROR;
213  goto close_channel;
214  }
215  }
216 
217  av_freep(&vhost_decoded);
218  av_freep(&user_decoded);
219  av_freep(&password_decoded);
220  return 0;
221 
222 close_channel:
223  amqp_channel_close(s->conn, DEFAULT_CHANNEL, server_msg);
224 close_connection:
225  amqp_connection_close(s->conn, server_msg);
226 destroy_connection:
227  amqp_destroy_connection(s->conn);
228 
229  av_freep(&vhost_decoded);
230  av_freep(&user_decoded);
231  av_freep(&password_decoded);
232  return AVERROR_EXTERNAL;
233 }
234 
235 static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
236 {
237  int ret;
238  AMQPContext *s = h->priv_data;
239  int fd = amqp_socket_get_sockfd(s->socket);
240 
241  amqp_bytes_t message = { size, (void *)buf };
242  amqp_basic_properties_t props;
243 
244  ret = ff_network_wait_fd_timeout(fd, 1, h->rw_timeout, &h->interrupt_callback);
245  if (ret)
246  return ret;
247 
248  props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
249  props.content_type = amqp_cstring_bytes("octet/stream");
250  props.delivery_mode = s->delivery_mode;
251 
252  ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL, amqp_cstring_bytes(s->exchange),
253  amqp_cstring_bytes(s->routing_key), 0, 0,
254  &props, message);
255 
256  if (ret) {
257  av_log(h, AV_LOG_ERROR, "Error publish: %s\n", amqp_error_string2(ret));
258  return AVERROR_EXTERNAL;
259  }
260 
261  return size;
262 }
263 
264 static int amqp_proto_read(URLContext *h, unsigned char *buf, int size)
265 {
266  AMQPContext *s = h->priv_data;
267  int fd = amqp_socket_get_sockfd(s->socket);
268  int ret;
269 
270  amqp_rpc_reply_t broker_reply;
271  amqp_envelope_t envelope;
272 
273  ret = ff_network_wait_fd_timeout(fd, 0, h->rw_timeout, &h->interrupt_callback);
274  if (ret)
275  return ret;
276 
277  amqp_maybe_release_buffers(s->conn);
278  broker_reply = amqp_consume_message(s->conn, &envelope, NULL, 0);
279 
280  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL)
281  return AVERROR_EXTERNAL;
282 
283  if (envelope.message.body.len > size) {
284  s->pkt_size_overflow = FFMAX(s->pkt_size_overflow, envelope.message.body.len);
285  av_log(h, AV_LOG_WARNING, "Message exceeds space in the buffer. "
286  "Message will be truncated. Setting -pkt_size %d "
287  "may resolve this issue.\n", s->pkt_size_overflow);
288  }
289  size = FFMIN(size, envelope.message.body.len);
290 
291  memcpy(buf, envelope.message.body.bytes, size);
292  amqp_destroy_envelope(&envelope);
293 
294  return size;
295 }
296 
298 {
299  AMQPContext *s = h->priv_data;
300  amqp_channel_close(s->conn, DEFAULT_CHANNEL, AMQP_REPLY_SUCCESS);
301  amqp_connection_close(s->conn, AMQP_REPLY_SUCCESS);
302  amqp_destroy_connection(s->conn);
303 
304  return 0;
305 }
306 
307 static const AVClass amqp_context_class = {
308  .class_name = "amqp",
309  .item_name = av_default_item_name,
310  .option = options,
311  .version = LIBAVUTIL_VERSION_INT,
312 };
313 
315  .name = "amqp",
316  .url_close = amqp_proto_close,
317  .url_open = amqp_proto_open,
318  .url_read = amqp_proto_read,
319  .url_write = amqp_proto_write,
320  .priv_data_size = sizeof(AMQPContext),
321  .priv_data_class = &amqp_context_class,
323 };
AV_LOG_WARNING
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:186
AMQPContext::conn
amqp_connection_state_t conn
Definition: libamqp.c:35
STR_LEN
#define STR_LEN
Definition: libamqp.c:45
r
const char * r
Definition: vf_curves.c:126
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
opt.h
URL_PROTOCOL_FLAG_NETWORK
#define URL_PROTOCOL_FLAG_NETWORK
Definition: url.h:33
message
Definition: api-threadmessage-test.c:46
exchange
static int exchange(MqcState *mqc, uint8_t *cxstate, int lps)
Definition: mqcdec.c:45
AVOption
AVOption.
Definition: opt.h:251
AV_OPT_TYPE_DURATION
@ AV_OPT_TYPE_DURATION
Definition: opt.h:239
amqp_proto_read
static int amqp_proto_read(URLContext *h, unsigned char *buf, int size)
Definition: libamqp.c:264
FFMAX
#define FFMAX(a, b)
Definition: macros.h:47
URLProtocol
Definition: url.h:53
ff_urldecode
char * ff_urldecode(const char *url, int decode_plus_sign)
Decodes an URL from its percent-encoded form back into normal representation.
Definition: urldecode.c:35
AMQPContext::socket
amqp_socket_t * socket
Definition: libamqp.c:36
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:180
s
#define s(width, name)
Definition: cbs_vp9.c:198
ff_libamqp_protocol
const URLProtocol ff_libamqp_protocol
Definition: libamqp.c:314
LIBAVUTIL_VERSION_INT
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:66
NULL
#define NULL
Definition: coverity.c:32
av_default_item_name
const char * av_default_item_name(void *ptr)
Return the context name.
Definition: log.c:237
ff_network_wait_fd_timeout
int ff_network_wait_fd_timeout(int fd, int write, int64_t timeout, AVIOInterruptCB *int_cb)
This works similarly to ff_network_wait_fd, but waits up to 'timeout' microseconds Uses ff_network_wa...
Definition: network.c:78
time.h
size
int size
Definition: twinvq_data.h:10344
URLProtocol::name
const char * name
Definition: url.h:54
DEFAULT_CHANNEL
#define DEFAULT_CHANNEL
Definition: libamqp.c:46
AMQPContext::exchange
const char * exchange
Definition: libamqp.c:37
OFFSET
#define OFFSET(x)
Definition: libamqp.c:48
AVERROR_EXTERNAL
#define AVERROR_EXTERNAL
Generic error in an external library.
Definition: error.h:59
AMQPContext::delivery_mode
int delivery_mode
Definition: libamqp.c:42
amqp_proto_close
static int amqp_proto_close(URLContext *h)
Definition: libamqp.c:297
amqp_proto_open
static int amqp_proto_open(URLContext *h, const char *uri, int flags)
Definition: libamqp.c:62
URLContext
Definition: url.h:37
envelope
static float envelope(const float x)
Definition: vf_monochrome.c:45
amqp_context_class
static const AVClass amqp_context_class
Definition: libamqp.c:307
av_url_split
void av_url_split(char *proto, int proto_size, char *authorization, int authorization_size, char *hostname, int hostname_size, int *port_ptr, char *path, int path_size, const char *url)
Split a URL string into components.
Definition: utils.c:358
FFMIN
#define FFMIN(a, b)
Definition: macros.h:49
url.h
AMQPContext::connection_timeout
int64_t connection_timeout
Definition: libamqp.c:40
ret
ret
Definition: filter_design.txt:187
AVClass::class_name
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:71
avformat.h
network.h
AMQPContext::pkt_size
int pkt_size
Definition: libamqp.c:39
urldecode.h
AV_OPT_TYPE_INT
@ AV_OPT_TYPE_INT
Definition: opt.h:225
options
static const AVOption options[]
Definition: libamqp.c:51
AVIO_FLAG_READ
#define AVIO_FLAG_READ
read-only
Definition: avio.h:636
AMQPContext
Definition: libamqp.c:33
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:34
flags
#define flags(name, subs,...)
Definition: cbs_av1.c:474
AMQPContext::pkt_size_overflow
int pkt_size_overflow
Definition: libamqp.c:41
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
E
#define E
Definition: libamqp.c:50
h
h
Definition: vp9dsp_template.c:2038
avstring.h
AV_OPT_TYPE_STRING
@ AV_OPT_TYPE_STRING
Definition: opt.h:229
D
#define D
Definition: libamqp.c:49
AMQPContext::routing_key
const char * routing_key
Definition: libamqp.c:38
amqp_proto_write
static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
Definition: libamqp.c:235
AV_OPT_TYPE_CONST
@ AV_OPT_TYPE_CONST
Definition: opt.h:234