00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00027 #define _BSD_SOURCE
00028
00029 #include "avformat.h"
00030 #include "avio_internal.h"
00031 #include "libavutil/parseutils.h"
00032 #include "libavutil/fifo.h"
00033 #include <unistd.h>
00034 #include "internal.h"
00035 #include "network.h"
00036 #include "os_support.h"
00037 #include "url.h"
00038
00039 #if HAVE_PTHREADS
00040 #include <pthread.h>
00041 #endif
00042
00043 #include <sys/time.h>
00044
00045 #ifndef IPV6_ADD_MEMBERSHIP
00046 #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
00047 #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
00048 #endif
00049
00050 typedef struct {
00051 int udp_fd;
00052 int ttl;
00053 int buffer_size;
00054 int is_multicast;
00055 int local_port;
00056 int reuse_socket;
00057 struct sockaddr_storage dest_addr;
00058 int dest_addr_len;
00059 int is_connected;
00060
00061
00062 int circular_buffer_size;
00063 AVFifoBuffer *fifo;
00064 int circular_buffer_error;
00065 #if HAVE_PTHREADS
00066 pthread_t circular_buffer_thread;
00067 #endif
00068 } UDPContext;
00069
/* Default buffer_size used by udp_open() for output sockets. */
#define UDP_TX_BUF_SIZE 32768
/* Default buffer_size used by udp_open() for input sockets. */
#define UDP_MAX_PKT_SIZE 65536
00072
00073 static int udp_set_multicast_ttl(int sockfd, int mcastTTL,
00074 struct sockaddr *addr)
00075 {
00076 #ifdef IP_MULTICAST_TTL
00077 if (addr->sa_family == AF_INET) {
00078 if (setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL, &mcastTTL, sizeof(mcastTTL)) < 0) {
00079 av_log(NULL, AV_LOG_ERROR, "setsockopt(IP_MULTICAST_TTL): %s\n", strerror(errno));
00080 return -1;
00081 }
00082 }
00083 #endif
00084 #if defined(IPPROTO_IPV6) && defined(IPV6_MULTICAST_HOPS)
00085 if (addr->sa_family == AF_INET6) {
00086 if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &mcastTTL, sizeof(mcastTTL)) < 0) {
00087 av_log(NULL, AV_LOG_ERROR, "setsockopt(IPV6_MULTICAST_HOPS): %s\n", strerror(errno));
00088 return -1;
00089 }
00090 }
00091 #endif
00092 return 0;
00093 }
00094
00095 static int udp_join_multicast_group(int sockfd, struct sockaddr *addr)
00096 {
00097 #ifdef IP_ADD_MEMBERSHIP
00098 if (addr->sa_family == AF_INET) {
00099 struct ip_mreq mreq;
00100
00101 mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
00102 mreq.imr_interface.s_addr= INADDR_ANY;
00103 if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
00104 av_log(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_MEMBERSHIP): %s\n", strerror(errno));
00105 return -1;
00106 }
00107 }
00108 #endif
00109 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
00110 if (addr->sa_family == AF_INET6) {
00111 struct ipv6_mreq mreq6;
00112
00113 memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
00114 mreq6.ipv6mr_interface= 0;
00115 if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
00116 av_log(NULL, AV_LOG_ERROR, "setsockopt(IPV6_ADD_MEMBERSHIP): %s\n", strerror(errno));
00117 return -1;
00118 }
00119 }
00120 #endif
00121 return 0;
00122 }
00123
00124 static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr)
00125 {
00126 #ifdef IP_DROP_MEMBERSHIP
00127 if (addr->sa_family == AF_INET) {
00128 struct ip_mreq mreq;
00129
00130 mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
00131 mreq.imr_interface.s_addr= INADDR_ANY;
00132 if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
00133 av_log(NULL, AV_LOG_ERROR, "setsockopt(IP_DROP_MEMBERSHIP): %s\n", strerror(errno));
00134 return -1;
00135 }
00136 }
00137 #endif
00138 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
00139 if (addr->sa_family == AF_INET6) {
00140 struct ipv6_mreq mreq6;
00141
00142 memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
00143 mreq6.ipv6mr_interface= 0;
00144 if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
00145 av_log(NULL, AV_LOG_ERROR, "setsockopt(IPV6_DROP_MEMBERSHIP): %s\n", strerror(errno));
00146 return -1;
00147 }
00148 }
00149 #endif
00150 return 0;
00151 }
00152
/**
 * Resolve a host/port pair with getaddrinfo().
 *
 * @param hostname host to resolve; NULL, "" or a string starting with '?'
 *                 means "no node" (NULL is passed to getaddrinfo())
 * @param port     port number; values <= 0 are resolved as service "0"
 * @param type     value for hints.ai_socktype
 * @param family   value for hints.ai_family
 * @param flags    value for hints.ai_flags
 * @return list to be released with freeaddrinfo(), or NULL on failure
 *         (the failure is logged)
 */
static struct addrinfo* udp_resolve_host(const char *hostname, int port,
                                         int type, int family, int flags)
{
    struct addrinfo hints, *result = NULL;
    char portbuf[16];
    const char *node = NULL;
    const char *service = "0";
    int gai_err;

    if (port > 0) {
        snprintf(portbuf, sizeof(portbuf), "%d", port);
        service = portbuf;
    }
    if (hostname && hostname[0] != '\0' && hostname[0] != '?')
        node = hostname;

    memset(&hints, 0, sizeof(hints));
    hints.ai_socktype = type;
    hints.ai_family   = family;
    hints.ai_flags    = flags;

    gai_err = getaddrinfo(node, service, &hints, &result);
    if (gai_err) {
        av_log(NULL, AV_LOG_ERROR, "udp_resolve_host: %s\n", gai_strerror(gai_err));
        return NULL;
    }

    return result;
}
00179
00180 static int udp_set_url(struct sockaddr_storage *addr,
00181 const char *hostname, int port)
00182 {
00183 struct addrinfo *res0;
00184 int addr_len;
00185
00186 res0 = udp_resolve_host(hostname, port, SOCK_DGRAM, AF_UNSPEC, 0);
00187 if (res0 == 0) return AVERROR(EIO);
00188 memcpy(addr, res0->ai_addr, res0->ai_addrlen);
00189 addr_len = res0->ai_addrlen;
00190 freeaddrinfo(res0);
00191
00192 return addr_len;
00193 }
00194
00195 static int udp_socket_create(UDPContext *s,
00196 struct sockaddr_storage *addr, int *addr_len)
00197 {
00198 int udp_fd = -1;
00199 struct addrinfo *res0 = NULL, *res = NULL;
00200 int family = AF_UNSPEC;
00201
00202 if (((struct sockaddr *) &s->dest_addr)->sa_family)
00203 family = ((struct sockaddr *) &s->dest_addr)->sa_family;
00204 res0 = udp_resolve_host(0, s->local_port, SOCK_DGRAM, family, AI_PASSIVE);
00205 if (res0 == 0)
00206 goto fail;
00207 for (res = res0; res; res=res->ai_next) {
00208 udp_fd = socket(res->ai_family, SOCK_DGRAM, 0);
00209 if (udp_fd > 0) break;
00210 av_log(NULL, AV_LOG_ERROR, "socket: %s\n", strerror(errno));
00211 }
00212
00213 if (udp_fd < 0)
00214 goto fail;
00215
00216 memcpy(addr, res->ai_addr, res->ai_addrlen);
00217 *addr_len = res->ai_addrlen;
00218
00219 freeaddrinfo(res0);
00220
00221 return udp_fd;
00222
00223 fail:
00224 if (udp_fd >= 0)
00225 closesocket(udp_fd);
00226 if(res0)
00227 freeaddrinfo(res0);
00228 return -1;
00229 }
00230
00231 static int udp_port(struct sockaddr_storage *addr, int addr_len)
00232 {
00233 char sbuf[sizeof(int)*3+1];
00234
00235 if (getnameinfo((struct sockaddr *)addr, addr_len, NULL, 0, sbuf, sizeof(sbuf), NI_NUMERICSERV) != 0) {
00236 av_log(NULL, AV_LOG_ERROR, "getnameinfo: %s\n", strerror(errno));
00237 return -1;
00238 }
00239
00240 return strtol(sbuf, NULL, 10);
00241 }
00242
00243
00259 int ff_udp_set_remote_url(URLContext *h, const char *uri)
00260 {
00261 UDPContext *s = h->priv_data;
00262 char hostname[256], buf[10];
00263 int port;
00264 const char *p;
00265
00266 av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
00267
00268
00269 s->dest_addr_len = udp_set_url(&s->dest_addr, hostname, port);
00270 if (s->dest_addr_len < 0) {
00271 return AVERROR(EIO);
00272 }
00273 s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr);
00274 p = strchr(uri, '?');
00275 if (p) {
00276 if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
00277 int was_connected = s->is_connected;
00278 s->is_connected = strtol(buf, NULL, 10);
00279 if (s->is_connected && !was_connected) {
00280 if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr,
00281 s->dest_addr_len)) {
00282 s->is_connected = 0;
00283 av_log(h, AV_LOG_ERROR, "connect: %s\n", strerror(errno));
00284 return AVERROR(EIO);
00285 }
00286 }
00287 }
00288 }
00289
00290 return 0;
00291 }
00292
00298 int ff_udp_get_local_port(URLContext *h)
00299 {
00300 UDPContext *s = h->priv_data;
00301 return s->local_port;
00302 }
00303
00309 static int udp_get_file_handle(URLContext *h)
00310 {
00311 UDPContext *s = h->priv_data;
00312 return s->udp_fd;
00313 }
00314
00315 static void *circular_buffer_task( void *_URLContext)
00316 {
00317 URLContext *h = _URLContext;
00318 UDPContext *s = h->priv_data;
00319 fd_set rfds;
00320 struct timeval tv;
00321
00322 for(;;) {
00323 int left;
00324 int ret;
00325 int len;
00326
00327 if (url_interrupt_cb()) {
00328 s->circular_buffer_error = EINTR;
00329 return NULL;
00330 }
00331
00332 FD_ZERO(&rfds);
00333 FD_SET(s->udp_fd, &rfds);
00334 tv.tv_sec = 1;
00335 tv.tv_usec = 0;
00336 ret = select(s->udp_fd + 1, &rfds, NULL, NULL, &tv);
00337 if (ret < 0) {
00338 if (ff_neterrno() == AVERROR(EINTR))
00339 continue;
00340 s->circular_buffer_error = EIO;
00341 return NULL;
00342 }
00343
00344 if (!(ret > 0 && FD_ISSET(s->udp_fd, &rfds)))
00345 continue;
00346
00347
00348
00349 left = av_fifo_space(s->fifo);
00350 left = FFMIN(left, s->fifo->end - s->fifo->wptr);
00351
00352
00353 if( !left) {
00354 av_log(h, AV_LOG_ERROR, "circular_buffer: OVERRUN\n");
00355 s->circular_buffer_error = EIO;
00356 return NULL;
00357 }
00358
00359 len = recv(s->udp_fd, s->fifo->wptr, left, 0);
00360 if (len < 0) {
00361 if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
00362 s->circular_buffer_error = EIO;
00363 return NULL;
00364 }
00365 }
00366 s->fifo->wptr += len;
00367 if (s->fifo->wptr >= s->fifo->end)
00368 s->fifo->wptr = s->fifo->buffer;
00369 s->fifo->wndx += len;
00370 }
00371
00372 return NULL;
00373 }
00374
00375
00376
00377 static int udp_open(URLContext *h, const char *uri, int flags)
00378 {
00379 char hostname[1024];
00380 int port, udp_fd = -1, tmp, bind_ret = -1;
00381 UDPContext *s = NULL;
00382 int is_output;
00383 const char *p;
00384 char buf[256];
00385 struct sockaddr_storage my_addr;
00386 int len;
00387 int reuse_specified = 0;
00388
00389 h->is_streamed = 1;
00390 h->max_packet_size = 1472;
00391
00392 is_output = !(flags & AVIO_FLAG_READ);
00393
00394 s = av_mallocz(sizeof(UDPContext));
00395 if (!s)
00396 return AVERROR(ENOMEM);
00397
00398 h->priv_data = s;
00399 s->ttl = 16;
00400 s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE;
00401
00402 s->circular_buffer_size = 7*188*4096;
00403
00404 p = strchr(uri, '?');
00405 if (p) {
00406 if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) {
00407 char *endptr=NULL;
00408 s->reuse_socket = strtol(buf, &endptr, 10);
00409
00410 if (buf == endptr)
00411 s->reuse_socket = 1;
00412 reuse_specified = 1;
00413 }
00414 if (av_find_info_tag(buf, sizeof(buf), "ttl", p)) {
00415 s->ttl = strtol(buf, NULL, 10);
00416 }
00417 if (av_find_info_tag(buf, sizeof(buf), "localport", p)) {
00418 s->local_port = strtol(buf, NULL, 10);
00419 }
00420 if (av_find_info_tag(buf, sizeof(buf), "pkt_size", p)) {
00421 h->max_packet_size = strtol(buf, NULL, 10);
00422 }
00423 if (av_find_info_tag(buf, sizeof(buf), "buffer_size", p)) {
00424 s->buffer_size = strtol(buf, NULL, 10);
00425 }
00426 if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
00427 s->is_connected = strtol(buf, NULL, 10);
00428 }
00429 if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
00430 s->circular_buffer_size = strtol(buf, NULL, 10)*188;
00431 }
00432 }
00433
00434
00435 av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
00436
00437
00438 if (hostname[0] == '\0' || hostname[0] == '?') {
00439
00440 if (!(flags & AVIO_FLAG_READ))
00441 goto fail;
00442 } else {
00443 if (ff_udp_set_remote_url(h, uri) < 0)
00444 goto fail;
00445 }
00446
00447 if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
00448 s->local_port = port;
00449 udp_fd = udp_socket_create(s, &my_addr, &len);
00450 if (udp_fd < 0)
00451 goto fail;
00452
00453
00454
00455
00456 if (s->reuse_socket || (s->is_multicast && !reuse_specified)) {
00457 s->reuse_socket = 1;
00458 if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)) != 0)
00459 goto fail;
00460 }
00461
00462
00463
00464 if (s->is_multicast && (h->flags & AVIO_FLAG_READ)) {
00465 bind_ret = bind(udp_fd,(struct sockaddr *)&s->dest_addr, len);
00466 }
00467
00468
00469 if (bind_ret < 0 && bind(udp_fd,(struct sockaddr *)&my_addr, len) < 0)
00470 goto fail;
00471
00472 len = sizeof(my_addr);
00473 getsockname(udp_fd, (struct sockaddr *)&my_addr, &len);
00474 s->local_port = udp_port(&my_addr, len);
00475
00476 if (s->is_multicast) {
00477 if (!(h->flags & AVIO_FLAG_READ)) {
00478
00479 if (udp_set_multicast_ttl(udp_fd, s->ttl, (struct sockaddr *)&s->dest_addr) < 0)
00480 goto fail;
00481 } else {
00482
00483 if (udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr) < 0)
00484 goto fail;
00485 }
00486 }
00487
00488 if (is_output) {
00489
00490 tmp = s->buffer_size;
00491 if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &tmp, sizeof(tmp)) < 0) {
00492 av_log(h, AV_LOG_ERROR, "setsockopt(SO_SNDBUF): %s\n", strerror(errno));
00493 goto fail;
00494 }
00495 } else {
00496
00497
00498 tmp = s->buffer_size;
00499 if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, sizeof(tmp)) < 0) {
00500 av_log(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF): %s\n", strerror(errno));
00501 }
00502
00503 ff_socket_nonblock(udp_fd, 1);
00504 }
00505 if (s->is_connected) {
00506 if (connect(udp_fd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len)) {
00507 av_log(h, AV_LOG_ERROR, "connect: %s\n", strerror(errno));
00508 goto fail;
00509 }
00510 }
00511
00512 s->udp_fd = udp_fd;
00513
00514 #if HAVE_PTHREADS
00515 if (!is_output && s->circular_buffer_size) {
00516
00517 s->fifo = av_fifo_alloc(s->circular_buffer_size);
00518 if (pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h)) {
00519 av_log(h, AV_LOG_ERROR, "pthread_create failed\n");
00520 goto fail;
00521 }
00522 }
00523 #endif
00524
00525 return 0;
00526 fail:
00527 if (udp_fd >= 0)
00528 closesocket(udp_fd);
00529 av_fifo_free(s->fifo);
00530 av_free(s);
00531 return AVERROR(EIO);
00532 }
00533
00534 static int udp_read(URLContext *h, uint8_t *buf, int size)
00535 {
00536 UDPContext *s = h->priv_data;
00537 int ret;
00538 int avail;
00539 fd_set rfds;
00540 struct timeval tv;
00541
00542 if (s->fifo) {
00543
00544 do {
00545 avail = av_fifo_size(s->fifo);
00546 if (avail) {
00547
00548
00549 size = FFMIN( avail, size);
00550 av_fifo_generic_read(s->fifo, buf, size, NULL);
00551 return size;
00552 }
00553 else {
00554 FD_ZERO(&rfds);
00555 FD_SET(s->udp_fd, &rfds);
00556 tv.tv_sec = 1;
00557 tv.tv_usec = 0;
00558 ret = select(s->udp_fd + 1, &rfds, NULL, NULL, &tv);
00559 if (ret<0)
00560 return ret;
00561 }
00562 } while( 1);
00563 }
00564
00565 if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
00566 ret = ff_network_wait_fd(s->udp_fd, 0);
00567 if (ret < 0)
00568 return ret;
00569 }
00570 ret = recv(s->udp_fd, buf, size, 0);
00571
00572 return ret < 0 ? ff_neterrno() : ret;
00573 }
00574
00575 static int udp_write(URLContext *h, const uint8_t *buf, int size)
00576 {
00577 UDPContext *s = h->priv_data;
00578 int ret;
00579
00580 if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
00581 ret = ff_network_wait_fd(s->udp_fd, 1);
00582 if (ret < 0)
00583 return ret;
00584 }
00585
00586 if (!s->is_connected) {
00587 ret = sendto (s->udp_fd, buf, size, 0,
00588 (struct sockaddr *) &s->dest_addr,
00589 s->dest_addr_len);
00590 } else
00591 ret = send(s->udp_fd, buf, size, 0);
00592
00593 return ret < 0 ? ff_neterrno() : ret;
00594 }
00595
00596 static int udp_close(URLContext *h)
00597 {
00598 UDPContext *s = h->priv_data;
00599
00600 if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
00601 udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr);
00602 closesocket(s->udp_fd);
00603 av_fifo_free(s->fifo);
00604 av_free(s);
00605 return 0;
00606 }
00607
00608 URLProtocol ff_udp_protocol = {
00609 .name = "udp",
00610 .url_open = udp_open,
00611 .url_read = udp_read,
00612 .url_write = udp_write,
00613 .url_close = udp_close,
00614 .url_get_file_handle = udp_get_file_handle,
00615 };