Mercurial > hg > nginx
comparison src/event/ngx_event_udp.c @ 7286:d27aa9060c95
Stream: udp streams.
Previously, only one client packet could be processed in a udp stream session
even though multiple response packets were supported. Now multiple packets
coming from the same client address and port are delivered to the same stream
session.
If it's required to maintain a single stream of data, nginx should be
configured in a way that all packets from a client are delivered to the same
worker. On Linux and DragonFly BSD the "reuseport" parameter should be
specified for this. Other systems do not currently provide appropriate
mechanisms. For these systems a single stream of udp packets is only
guaranteed in single-worker configurations.
The proxy_response directive now specifies how many packets are expected in
response to a single client packet.
author | Roman Arutyunyan <arut@nginx.com> |
---|---|
date | Mon, 04 Jun 2018 19:50:00 +0300 |
parents | 88a624c9b491 |
children | 27559d4a5151 |
comparison
equal
deleted
inserted
replaced
7285:88a624c9b491 | 7286:d27aa9060c95 |
---|---|
10 #include <ngx_event.h> | 10 #include <ngx_event.h> |
11 | 11 |
12 | 12 |
13 #if !(NGX_WIN32) | 13 #if !(NGX_WIN32) |
14 | 14 |
/*
 * Per-connection UDP session record stored in the listening socket's
 * red-black tree.  "node" is the first member, which is what lets the
 * tree code below cast an ngx_rbtree_node_t pointer back to this type.
 */
15 struct ngx_udp_connection_s { | |
/* tree linkage; node.key is set to a CRC32 over the peer address (plus the local address on wildcard listeners) */
16 ngx_rbtree_node_t node; | |
/* the connection this record was created for */
17 ngx_connection_t *connection; | |
/* datagram being handed to the read handler by ngx_event_recvmsg(); reset to NULL once consumed or after the handler returns */
18 ngx_buf_t *buffer; | |
19 }; | |
20 | |
21 | |
/* Forward declarations of the file-local (static) UDP helpers. */
15 static void ngx_close_accepted_udp_connection(ngx_connection_t *c); | 22 static void ngx_close_accepted_udp_connection(ngx_connection_t *c); |
/* recv handler that serves the datagram referenced by c->udp->buffer */
23 static ssize_t ngx_udp_shared_recv(ngx_connection_t *c, u_char *buf, | |
24 size_t size); | |
/* registers c in c->listening->rbtree */
25 static ngx_int_t ngx_insert_udp_connection(ngx_connection_t *c); | |
/* pool cleanup handler: removes the connection's node from the rbtree */
26 static void ngx_delete_udp_connection(void *data); | |
/* returns the connection already registered for the given addresses, or NULL */
27 static ngx_connection_t *ngx_lookup_udp_connection(ngx_listening_t *ls, | |
28 struct sockaddr *sockaddr, socklen_t socklen, | |
29 struct sockaddr *local_sockaddr, socklen_t local_socklen); | |
16 | 30 |
17 | 31 |
18 void | 32 void |
19 ngx_event_recvmsg(ngx_event_t *ev) | 33 ngx_event_recvmsg(ngx_event_t *ev) |
20 { | 34 { |
21 ssize_t n; | 35 ssize_t n; |
36 ngx_buf_t buf; | |
22 ngx_log_t *log; | 37 ngx_log_t *log; |
23 ngx_err_t err; | 38 ngx_err_t err; |
24 socklen_t socklen, local_socklen; | 39 socklen_t socklen, local_socklen; |
25 ngx_event_t *rev, *wev; | 40 ngx_event_t *rev, *wev; |
26 struct iovec iov[1]; | 41 struct iovec iov[1]; |
213 } | 228 } |
214 } | 229 } |
215 | 230 |
216 #endif | 231 #endif |
217 | 232 |
233 c = ngx_lookup_udp_connection(ls, sockaddr, socklen, local_sockaddr, | |
234 local_socklen); | |
235 | |
236 if (c) { | |
237 | |
238 #if (NGX_DEBUG) | |
239 if (c->log->log_level & NGX_LOG_DEBUG_EVENT) { | |
240 ngx_log_handler_pt handler; | |
241 | |
242 handler = c->log->handler; | |
243 c->log->handler = NULL; | |
244 | |
245 ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, | |
246 "recvmsg: fd:%d n:%z", c->fd, n); | |
247 | |
248 c->log->handler = handler; | |
249 } | |
250 #endif | |
251 | |
252 ngx_memzero(&buf, sizeof(ngx_buf_t)); | |
253 | |
254 buf.pos = buffer; | |
255 buf.last = buffer + n; | |
256 | |
257 rev = c->read; | |
258 | |
259 c->udp->buffer = &buf; | |
260 rev->ready = 1; | |
261 | |
262 rev->handler(rev); | |
263 | |
264 c->udp->buffer = NULL; | |
265 rev->ready = 0; | |
266 | |
267 goto next; | |
268 } | |
269 | |
218 #if (NGX_STAT_STUB) | 270 #if (NGX_STAT_STUB) |
219 (void) ngx_atomic_fetch_add(ngx_stat_accepted, 1); | 271 (void) ngx_atomic_fetch_add(ngx_stat_accepted, 1); |
220 #endif | 272 #endif |
221 | 273 |
222 ngx_accept_disabled = ngx_cycle->connection_n / 8 | 274 ngx_accept_disabled = ngx_cycle->connection_n / 8 |
255 return; | 307 return; |
256 } | 308 } |
257 | 309 |
258 *log = ls->log; | 310 *log = ls->log; |
259 | 311 |
312 c->recv = ngx_udp_shared_recv; | |
260 c->send = ngx_udp_send; | 313 c->send = ngx_udp_send; |
261 c->send_chain = ngx_udp_send_chain; | 314 c->send_chain = ngx_udp_send_chain; |
262 | 315 |
263 c->log = log; | 316 c->log = log; |
264 c->pool->log = log; | 317 c->pool->log = log; |
342 } | 395 } |
343 | 396 |
344 } | 397 } |
345 #endif | 398 #endif |
346 | 399 |
400 if (ngx_insert_udp_connection(c) != NGX_OK) { | |
401 ngx_close_accepted_udp_connection(c); | |
402 return; | |
403 } | |
404 | |
347 log->data = NULL; | 405 log->data = NULL; |
348 log->handler = NULL; | 406 log->handler = NULL; |
349 | 407 |
350 ls->handler(c); | 408 ls->handler(c); |
409 | |
410 next: | |
351 | 411 |
352 if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { | 412 if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { |
353 ev->available -= n; | 413 ev->available -= n; |
354 } | 414 } |
355 | 415 |
371 #if (NGX_STAT_STUB) | 431 #if (NGX_STAT_STUB) |
372 (void) ngx_atomic_fetch_add(ngx_stat_active, -1); | 432 (void) ngx_atomic_fetch_add(ngx_stat_active, -1); |
373 #endif | 433 #endif |
374 } | 434 } |
375 | 435 |
376 #endif | 436 |
/*
 * recv handler assigned to c->recv for connections created by
 * ngx_event_recvmsg().  It does not read from the socket: it copies the
 * datagram that ngx_event_recvmsg() published through c->udp->buffer.
 *
 * c    - connection whose pending datagram is read
 * buf  - destination buffer
 * size - capacity of buf in bytes
 *
 * Returns NGX_AGAIN when no datagram is pending, otherwise the number of
 * bytes copied (at most size).
 */
437 static ssize_t | |
438 ngx_udp_shared_recv(ngx_connection_t *c, u_char *buf, size_t size) | |
439 { | |
440 ssize_t n; | |
441 ngx_buf_t *b; | |
442 | |
/* no session record yet, or no datagram currently being delivered */
443 if (c->udp == NULL || c->udp->buffer == NULL) { | |
444 return NGX_AGAIN; | |
445 } | |
446 | |
447 b = c->udp->buffer; | |
448 | |
/* copy no more than the caller's buffer can hold */
449 n = ngx_min(b->last - b->pos, (ssize_t) size); | |
450 | |
451 ngx_memcpy(buf, b->pos, n); | |
452 | |
/*
 * The datagram is consumed in one call: the buffer reference is dropped,
 * so bytes that did not fit into buf are not returned by a later call.
 */
453 c->udp->buffer = NULL; | |
454 c->read->ready = 0; | |
455 | |
456 return n; | |
457 } | |
458 | |
459 | |
/*
 * Insert callback for the tree of UDP connections: walks down from temp
 * to find the leaf position for node and links it there as a red node.
 *
 * Nodes are ordered by key first.  When keys are equal, the peer socket
 * addresses of the two connections are compared, and for wildcard
 * listeners the local addresses are compared as well.
 *
 * temp     - node to start the descent from
 * node     - node being inserted (embedded in an ngx_udp_connection_t)
 * sentinel - the tree's sentinel (leaf marker)
 */
460 void | |
461 ngx_udp_rbtree_insert_value(ngx_rbtree_node_t *temp, | |
462 ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel) | |
463 { | |
464 ngx_int_t rc; | |
465 ngx_connection_t *c, *ct; | |
466 ngx_rbtree_node_t **p; | |
467 ngx_udp_connection_t *udp, *udpt; | |
468 | |
469 for ( ;; ) { | |
470 | |
471 if (node->key < temp->key) { | |
472 | |
473 p = &temp->left; | |
474 | |
475 } else if (node->key > temp->key) { | |
476 | |
477 p = &temp->right; | |
478 | |
479 } else { /* node->key == temp->key */ | |
480 | |
/* equal keys: fall back to comparing the addresses themselves */
481 udp = (ngx_udp_connection_t *) node; | |
482 c = udp->connection; | |
483 | |
484 udpt = (ngx_udp_connection_t *) temp; | |
485 ct = udpt->connection; | |
486 | |
/* NOTE(review): the last argument (1) presumably asks for ports to be compared too - confirm against ngx_cmp_sockaddr() */
487 rc = ngx_cmp_sockaddr(c->sockaddr, c->socklen, | |
488 ct->sockaddr, ct->socklen, 1); | |
489 | |
/* the local address takes part only for wildcard listeners */
490 if (rc == 0 && c->listening->wildcard) { | |
491 rc = ngx_cmp_sockaddr(c->local_sockaddr, c->local_socklen, | |
492 ct->local_sockaddr, ct->local_socklen, 1); | |
493 } | |
494 | |
/*
 * Negative goes left; zero (fully equal) or positive goes right.
 * NOTE(review): this relies on ngx_cmp_sockaddr() returning a value
 * whose sign gives a consistent ordering - its return convention is
 * not visible here, confirm.
 */
495 p = (rc < 0) ? &temp->left : &temp->right; | |
496 } | |
497 | |
/* reached a leaf slot: insert here */
498 if (*p == sentinel) { | |
499 break; | |
500 } | |
501 | |
502 temp = *p; | |
503 } | |
504 | |
505 *p = node; | |
506 node->parent = temp; | |
507 node->left = sentinel; | |
508 node->right = sentinel; | |
509 ngx_rbt_red(node); | |
510 } | |
511 | |
512 | |
/*
 * Registers connection c in its listening socket's rbtree so that later
 * datagrams from the same peer can be found by ngx_lookup_udp_connection().
 *
 * The session record is allocated from c->pool, and a pool cleanup handler
 * (ngx_delete_udp_connection) is registered to remove the node again.
 *
 * Returns NGX_OK on success or if c is already registered, NGX_ERROR if
 * either pool allocation fails.
 */
513 static ngx_int_t | |
514 ngx_insert_udp_connection(ngx_connection_t *c) | |
515 { | |
516 uint32_t hash; | |
517 ngx_pool_cleanup_t *cln; | |
518 ngx_udp_connection_t *udp; | |
519 | |
/* already registered: nothing to do */
520 if (c->udp) { | |
521 return NGX_OK; | |
522 } | |
523 | |
524 udp = ngx_pcalloc(c->pool, sizeof(ngx_udp_connection_t)); | |
525 if (udp == NULL) { | |
526 return NGX_ERROR; | |
527 } | |
528 | |
529 udp->connection = c; | |
530 | |
/*
 * Tree key: CRC32 over the raw peer sockaddr bytes, extended with the
 * local sockaddr bytes on wildcard listeners.  ngx_lookup_udp_connection()
 * computes its hash the same way.
 */
531 ngx_crc32_init(hash); | |
532 ngx_crc32_update(&hash, (u_char *) c->sockaddr, c->socklen); | |
533 | |
534 if (c->listening->wildcard) { | |
535 ngx_crc32_update(&hash, (u_char *) c->local_sockaddr, c->local_socklen); | |
536 } | |
537 | |
538 ngx_crc32_final(hash); | |
539 | |
540 udp->node.key = hash; | |
541 | |
/* register the removal handler before the node is linked into the tree */
542 cln = ngx_pool_cleanup_add(c->pool, 0); | |
543 if (cln == NULL) { | |
544 return NGX_ERROR; | |
545 } | |
546 | |
547 cln->data = c; | |
548 cln->handler = ngx_delete_udp_connection; | |
549 | |
550 ngx_rbtree_insert(&c->listening->rbtree, &udp->node); | |
551 | |
552 c->udp = udp; | |
553 | |
554 return NGX_OK; | |
555 } | |
556 | |
557 | |
/*
 * Pool cleanup handler installed by ngx_insert_udp_connection().
 * data is the ngx_connection_t whose session node is removed from the
 * listening socket's rbtree.
 *
 * NOTE(review): c->udp is dereferenced without a NULL check and is not
 * reset afterwards; this assumes the handler runs only after a successful
 * insertion and at most once per connection - confirm.
 */
558 static void | |
559 ngx_delete_udp_connection(void *data) | |
560 { | |
561 ngx_connection_t *c = data; | |
562 | |
563 ngx_rbtree_delete(&c->listening->rbtree, &c->udp->node); | |
564 } | |
565 | |
566 | |
/*
 * Looks up the connection registered in ls->rbtree for a datagram's peer
 * address (and, on wildcard listeners, its local address).
 *
 * ls                           - listening socket that owns the tree
 * sockaddr, socklen            - peer address of the received datagram
 * local_sockaddr, local_socklen - local address; used only if ls->wildcard
 *
 * Returns the matching connection, or NULL if none is registered.
 */
567 static ngx_connection_t * | |
568 ngx_lookup_udp_connection(ngx_listening_t *ls, struct sockaddr *sockaddr, | |
569 socklen_t socklen, struct sockaddr *local_sockaddr, socklen_t local_socklen) | |
570 { | |
571 uint32_t hash; | |
572 ngx_int_t rc; | |
573 ngx_connection_t *c; | |
574 ngx_rbtree_node_t *node, *sentinel; | |
575 ngx_udp_connection_t *udp; | |
576 | |
577 #if (NGX_HAVE_UNIX_DOMAIN) | |
578 | |
/*
 * A unix-domain peer whose address carries no path (length does not reach
 * past sun_path, or the path starts with a NUL byte) is treated as unbound
 * and is never matched to an existing connection.
 */
579 if (sockaddr->sa_family == AF_UNIX) { | |
580 struct sockaddr_un *saun = (struct sockaddr_un *) sockaddr; | |
581 | |
582 if (socklen <= (socklen_t) offsetof(struct sockaddr_un, sun_path) | |
583 || saun->sun_path[0] == '\0') | |
584 { | |
585 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ngx_cycle->log, 0, | |
586 "unbound unix socket"); | |
587 return NULL; | |
588 } | |
589 } | |
590 | |
591 #endif | |
592 | |
593 node = ls->rbtree.root; | |
594 sentinel = ls->rbtree.sentinel; | |
595 | |
/* same key derivation as in ngx_insert_udp_connection() */
596 ngx_crc32_init(hash); | |
597 ngx_crc32_update(&hash, (u_char *) sockaddr, socklen); | |
598 | |
599 if (ls->wildcard) { | |
600 ngx_crc32_update(&hash, (u_char *) local_sockaddr, local_socklen); | |
601 } | |
602 | |
603 ngx_crc32_final(hash); | |
604 | |
/* ordinary binary-search descent on the hash key */
605 while (node != sentinel) { | |
606 | |
607 if (hash < node->key) { | |
608 node = node->left; | |
609 continue; | |
610 } | |
611 | |
612 if (hash > node->key) { | |
613 node = node->right; | |
614 continue; | |
615 } | |
616 | |
617 /* hash == node->key */ | |
618 | |
/* equal hashes may still be different peers: compare the addresses */
619 udp = (ngx_udp_connection_t *) node; | |
620 | |
621 c = udp->connection; | |
622 | |
623 rc = ngx_cmp_sockaddr(sockaddr, socklen, | |
624 c->sockaddr, c->socklen, 1); | |
625 | |
626 if (rc == 0 && ls->wildcard) { | |
627 rc = ngx_cmp_sockaddr(local_sockaddr, local_socklen, | |
628 c->local_sockaddr, c->local_socklen, 1); | |
629 } | |
630 | |
631 if (rc == 0) { | |
632 return c; | |
633 } | |
634 | |
/*
 * Follows the same left/right rule as ngx_udp_rbtree_insert_value().
 * NOTE(review): correctness for colliding hashes depends on the sign of
 * ngx_cmp_sockaddr()'s result forming a consistent order - confirm.
 */
635 node = (rc < 0) ? node->left : node->right; | |
636 } | |
637 | |
638 return NULL; | |
639 } | |
640 | |
641 #endif |