FFmpeg
libamqp.c
Go to the documentation of this file.
1 /*
2  * Advanced Message Queuing Protocol (AMQP) 0-9-1
3  * Copyright (c) 2020 Andriy Gelman
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <amqp.h>
23 #include <amqp_tcp_socket.h>
24 #include <sys/time.h>
25 #include "avformat.h"
26 #include "libavutil/mem.h"
27 #include "libavutil/opt.h"
28 #include "network.h"
29 #include "url.h"
30 #include "urldecode.h"
31 
32 typedef struct AMQPContext {
33  const AVClass *class;
34  amqp_connection_state_t conn;
35  amqp_socket_t *socket;
36  const char *exchange;
37  const char *routing_key;
38  int pkt_size;
42 } AMQPContext;
43 
44 #define STR_LEN 1024
45 #define DEFAULT_CHANNEL 1
46 
47 #define OFFSET(x) offsetof(AMQPContext, x)
48 #define D AV_OPT_FLAG_DECODING_PARAM
49 #define E AV_OPT_FLAG_ENCODING_PARAM
50 static const AVOption options[] = {
51  { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E },
52  { "exchange", "Exchange to send/read packets", OFFSET(exchange), AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E },
53  { "routing_key", "Key to filter streams", OFFSET(routing_key), AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E },
54  { "connection_timeout", "Initial connection timeout", OFFSET(connection_timeout), AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT64_MAX, .flags = D | E},
55  { "delivery_mode", "Delivery mode", OFFSET(delivery_mode), AV_OPT_TYPE_INT, { .i64 = AMQP_DELIVERY_PERSISTENT }, 1, 2, .flags = E, .unit = "delivery_mode"},
56  { "persistent", "Persistent delivery mode", 0, AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_PERSISTENT }, 0, 0, E, .unit = "delivery_mode" },
57  { "non-persistent", "Non-persistent delivery mode", 0, AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_NONPERSISTENT }, 0, 0, E, .unit = "delivery_mode" },
58  { NULL }
59 };
60 
61 static int amqp_proto_open(URLContext *h, const char *uri, int flags)
62 {
63  int ret, server_msg;
64  char hostname[STR_LEN], credentials[STR_LEN], path[STR_LEN];
65  int port;
66  const char *user, *password = NULL, *vhost;
67  const char *user_decoded, *password_decoded, *vhost_decoded;
68  char *p;
69  amqp_rpc_reply_t broker_reply;
70  struct timeval tval = { 0 };
71 
72  AMQPContext *s = h->priv_data;
73 
74  h->is_streamed = 1;
75  h->max_packet_size = s->pkt_size;
76 
77  av_url_split(NULL, 0, credentials, sizeof(credentials),
78  hostname, sizeof(hostname), &port, path, sizeof(path), uri);
79 
80  if (port < 0)
81  port = 5672;
82 
83  if (hostname[0] == '\0' || port <= 0 || port > 65535 ) {
84  av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n");
85  return AVERROR(EINVAL);
86  }
87 
88  p = strchr(credentials, ':');
89  if (p) {
90  *p = '\0';
91  password = p + 1;
92  }
93 
94  if (!password || *password == '\0')
95  password = "guest";
96 
97  password_decoded = ff_urldecode(password, 0);
98  if (!password_decoded)
99  return AVERROR(ENOMEM);
100 
101  user = credentials;
102  if (*user == '\0')
103  user = "guest";
104 
105  user_decoded = ff_urldecode(user, 0);
106  if (!user_decoded) {
107  av_freep(&password_decoded);
108  return AVERROR(ENOMEM);
109  }
110 
111  /* skip query for now */
112  p = strchr(path, '?');
113  if (p)
114  *p = '\0';
115 
116  vhost = path;
117  if (*vhost == '\0')
118  vhost = "/";
119  else
120  vhost++; /* skip leading '/' */
121 
122  vhost_decoded = ff_urldecode(vhost, 0);
123  if (!vhost_decoded) {
124  av_freep(&user_decoded);
125  av_freep(&password_decoded);
126  return AVERROR(ENOMEM);
127  }
128 
129  s->conn = amqp_new_connection();
130  if (!s->conn) {
131  av_freep(&vhost_decoded);
132  av_freep(&user_decoded);
133  av_freep(&password_decoded);
134  av_log(h, AV_LOG_ERROR, "Error creating connection\n");
135  return AVERROR_EXTERNAL;
136  }
137 
138  s->socket = amqp_tcp_socket_new(s->conn);
139  if (!s->socket) {
140  av_log(h, AV_LOG_ERROR, "Error creating socket\n");
141  goto destroy_connection;
142  }
143 
144  if (s->connection_timeout < 0)
145  s->connection_timeout = (h->rw_timeout > 0 ? h->rw_timeout : 5000000);
146 
147  tval.tv_sec = s->connection_timeout / 1000000;
148  tval.tv_usec = s->connection_timeout % 1000000;
149  ret = amqp_socket_open_noblock(s->socket, hostname, port, &tval);
150 
151  if (ret) {
152  av_log(h, AV_LOG_ERROR, "Error connecting to server: %s\n",
153  amqp_error_string2(ret));
154  goto destroy_connection;
155  }
156 
157  broker_reply = amqp_login(s->conn, vhost_decoded, 0, s->pkt_size, 0,
158  AMQP_SASL_METHOD_PLAIN, user_decoded, password_decoded);
159 
160  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
161  av_log(h, AV_LOG_ERROR, "Error login\n");
162  server_msg = AMQP_ACCESS_REFUSED;
163  goto close_connection;
164  }
165 
166  amqp_channel_open(s->conn, DEFAULT_CHANNEL);
167  broker_reply = amqp_get_rpc_reply(s->conn);
168 
169  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
170  av_log(h, AV_LOG_ERROR, "Error set channel\n");
171  server_msg = AMQP_CHANNEL_ERROR;
172  goto close_connection;
173  }
174 
175  if (h->flags & AVIO_FLAG_READ) {
176  amqp_bytes_t queuename;
177  char queuename_buff[STR_LEN];
178  amqp_queue_declare_ok_t *r;
179 
180  r = amqp_queue_declare(s->conn, DEFAULT_CHANNEL, amqp_empty_bytes,
181  0, 0, 0, 1, amqp_empty_table);
182  broker_reply = amqp_get_rpc_reply(s->conn);
183  if (!r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
184  av_log(h, AV_LOG_ERROR, "Error declare queue\n");
185  server_msg = AMQP_RESOURCE_ERROR;
186  goto close_channel;
187  }
188 
189  /* store queuename */
190  queuename.bytes = queuename_buff;
191  queuename.len = FFMIN(r->queue.len, STR_LEN);
192  memcpy(queuename.bytes, r->queue.bytes, queuename.len);
193 
194  amqp_queue_bind(s->conn, DEFAULT_CHANNEL, queuename,
195  amqp_cstring_bytes(s->exchange),
196  amqp_cstring_bytes(s->routing_key), amqp_empty_table);
197 
198  broker_reply = amqp_get_rpc_reply(s->conn);
199  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
200  av_log(h, AV_LOG_ERROR, "Queue bind error\n");
201  server_msg = AMQP_INTERNAL_ERROR;
202  goto close_channel;
203  }
204 
205  amqp_basic_consume(s->conn, DEFAULT_CHANNEL, queuename, amqp_empty_bytes,
206  0, 1, 0, amqp_empty_table);
207 
208  broker_reply = amqp_get_rpc_reply(s->conn);
209  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
210  av_log(h, AV_LOG_ERROR, "Set consume error\n");
211  server_msg = AMQP_INTERNAL_ERROR;
212  goto close_channel;
213  }
214  }
215 
216  av_freep(&vhost_decoded);
217  av_freep(&user_decoded);
218  av_freep(&password_decoded);
219  return 0;
220 
221 close_channel:
222  amqp_channel_close(s->conn, DEFAULT_CHANNEL, server_msg);
223 close_connection:
224  amqp_connection_close(s->conn, server_msg);
225 destroy_connection:
226  amqp_destroy_connection(s->conn);
227 
228  av_freep(&vhost_decoded);
229  av_freep(&user_decoded);
230  av_freep(&password_decoded);
231  return AVERROR_EXTERNAL;
232 }
233 
234 static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
235 {
236  int ret;
237  AMQPContext *s = h->priv_data;
238  int fd = amqp_socket_get_sockfd(s->socket);
239 
240  amqp_bytes_t message = { size, (void *)buf };
241  amqp_basic_properties_t props;
242 
243  ret = ff_network_wait_fd_timeout(fd, 1, h->rw_timeout, &h->interrupt_callback);
244  if (ret)
245  return ret;
246 
247  props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
248  props.content_type = amqp_cstring_bytes("octet/stream");
249  props.delivery_mode = s->delivery_mode;
250 
251  ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL, amqp_cstring_bytes(s->exchange),
252  amqp_cstring_bytes(s->routing_key), 0, 0,
253  &props, message);
254 
255  if (ret) {
256  av_log(h, AV_LOG_ERROR, "Error publish: %s\n", amqp_error_string2(ret));
257  return AVERROR_EXTERNAL;
258  }
259 
260  return size;
261 }
262 
263 static int amqp_proto_read(URLContext *h, unsigned char *buf, int size)
264 {
265  AMQPContext *s = h->priv_data;
266  int fd = amqp_socket_get_sockfd(s->socket);
267  int ret;
268 
269  amqp_rpc_reply_t broker_reply;
270  amqp_envelope_t envelope;
271 
272  ret = ff_network_wait_fd_timeout(fd, 0, h->rw_timeout, &h->interrupt_callback);
273  if (ret)
274  return ret;
275 
276  amqp_maybe_release_buffers(s->conn);
277  broker_reply = amqp_consume_message(s->conn, &envelope, NULL, 0);
278 
279  if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL)
280  return AVERROR_EXTERNAL;
281 
282  if (envelope.message.body.len > size) {
283  s->pkt_size_overflow = FFMAX(s->pkt_size_overflow, envelope.message.body.len);
284  av_log(h, AV_LOG_WARNING, "Message exceeds space in the buffer. "
285  "Message will be truncated. Setting -pkt_size %d "
286  "may resolve this issue.\n", s->pkt_size_overflow);
287  }
288  size = FFMIN(size, envelope.message.body.len);
289 
290  memcpy(buf, envelope.message.body.bytes, size);
291  amqp_destroy_envelope(&envelope);
292 
293  return size;
294 }
295 
297 {
298  AMQPContext *s = h->priv_data;
299  amqp_channel_close(s->conn, DEFAULT_CHANNEL, AMQP_REPLY_SUCCESS);
300  amqp_connection_close(s->conn, AMQP_REPLY_SUCCESS);
301  amqp_destroy_connection(s->conn);
302 
303  return 0;
304 }
305 
306 static const AVClass amqp_context_class = {
307  .class_name = "amqp",
308  .item_name = av_default_item_name,
309  .option = options,
310  .version = LIBAVUTIL_VERSION_INT,
311 };
312 
314  .name = "amqp",
315  .url_close = amqp_proto_close,
316  .url_open = amqp_proto_open,
317  .url_read = amqp_proto_read,
318  .url_write = amqp_proto_write,
319  .priv_data_size = sizeof(AMQPContext),
320  .priv_data_class = &amqp_context_class,
322 };
AV_LOG_WARNING
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:186
AMQPContext::conn
amqp_connection_state_t conn
Definition: libamqp.c:34
STR_LEN
#define STR_LEN
Definition: libamqp.c:44
r
const char * r
Definition: vf_curves.c:127
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
opt.h
URL_PROTOCOL_FLAG_NETWORK
#define URL_PROTOCOL_FLAG_NETWORK
Definition: url.h:33
message
Definition: api-threadmessage-test.c:47
exchange
static int exchange(MqcState *mqc, uint8_t *cxstate, int lps)
Definition: mqcdec.c:45
AVOption
AVOption.
Definition: opt.h:357
AV_OPT_TYPE_DURATION
@ AV_OPT_TYPE_DURATION
Definition: opt.h:259
amqp_proto_read
static int amqp_proto_read(URLContext *h, unsigned char *buf, int size)
Definition: libamqp.c:263
FFMAX
#define FFMAX(a, b)
Definition: macros.h:47
URLProtocol
Definition: url.h:51
ff_urldecode
char * ff_urldecode(const char *url, int decode_plus_sign)
Decodes an URL from its percent-encoded form back into normal representation.
Definition: urldecode.c:35
AMQPContext::socket
amqp_socket_t * socket
Definition: libamqp.c:35
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:180
s
#define s(width, name)
Definition: cbs_vp9.c:198
ff_libamqp_protocol
const URLProtocol ff_libamqp_protocol
Definition: libamqp.c:313
LIBAVUTIL_VERSION_INT
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:66
NULL
#define NULL
Definition: coverity.c:32
av_default_item_name
const char * av_default_item_name(void *ptr)
Return the context name.
Definition: log.c:237
ff_network_wait_fd_timeout
int ff_network_wait_fd_timeout(int fd, int write, int64_t timeout, AVIOInterruptCB *int_cb)
This works similarly to ff_network_wait_fd, but waits up to 'timeout' microseconds Uses ff_network_wa...
Definition: network.c:83
size
int size
Definition: twinvq_data.h:10344
URLProtocol::name
const char * name
Definition: url.h:52
DEFAULT_CHANNEL
#define DEFAULT_CHANNEL
Definition: libamqp.c:45
AMQPContext::exchange
const char * exchange
Definition: libamqp.c:36
OFFSET
#define OFFSET(x)
Definition: libamqp.c:47
AVERROR_EXTERNAL
#define AVERROR_EXTERNAL
Generic error in an external library.
Definition: error.h:59
AMQPContext::delivery_mode
int delivery_mode
Definition: libamqp.c:41
amqp_proto_close
static int amqp_proto_close(URLContext *h)
Definition: libamqp.c:296
amqp_proto_open
static int amqp_proto_open(URLContext *h, const char *uri, int flags)
Definition: libamqp.c:61
URLContext
Definition: url.h:35
envelope
static float envelope(const float x)
Definition: vf_monochrome.c:45
amqp_context_class
static const AVClass amqp_context_class
Definition: libamqp.c:306
av_url_split
void av_url_split(char *proto, int proto_size, char *authorization, int authorization_size, char *hostname, int hostname_size, int *port_ptr, char *path, int path_size, const char *url)
Split a URL string into components.
Definition: utils.c:346
FFMIN
#define FFMIN(a, b)
Definition: macros.h:49
url.h
AMQPContext::connection_timeout
int64_t connection_timeout
Definition: libamqp.c:39
ret
ret
Definition: filter_design.txt:187
AVClass::class_name
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:71
avformat.h
network.h
AMQPContext::pkt_size
int pkt_size
Definition: libamqp.c:38
urldecode.h
AV_OPT_TYPE_INT
@ AV_OPT_TYPE_INT
Definition: opt.h:245
options
static const AVOption options[]
Definition: libamqp.c:50
AVIO_FLAG_READ
#define AVIO_FLAG_READ
read-only
Definition: avio.h:617
mem.h
AMQPContext
Definition: libamqp.c:32
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:34
flags
#define flags(name, subs,...)
Definition: cbs_av1.c:474
AMQPContext::pkt_size_overflow
int pkt_size_overflow
Definition: libamqp.c:40
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
E
#define E
Definition: libamqp.c:49
h
h
Definition: vp9dsp_template.c:2038
AV_OPT_TYPE_STRING
@ AV_OPT_TYPE_STRING
Definition: opt.h:249
D
#define D
Definition: libamqp.c:48
AMQPContext::routing_key
const char * routing_key
Definition: libamqp.c:37
amqp_proto_write
static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
Definition: libamqp.c:234
AV_OPT_TYPE_CONST
@ AV_OPT_TYPE_CONST
Definition: opt.h:254