Mercurial > hg > nginx
view src/event/ngx_event_accept.c @ 6959:7fcf209d40c8
Limit req: fixed delaying subrequests.
Since limit_req uses connection's write event to delay request processing,
it can conflict with timers in other subrequests. In particular, even
if applied to an active subrequest, it can break things if wev->delayed
is already set (due to limit_rate or sendfile_max_chunk), since after
limit_req finishes the wev->delayed flag will be set and no timer will be
active.
Fix is to use the wev->delayed flag in limit_req as well. This ensures that
wev->delayed won't be set after limit_req finishes, and also ensures that
limit_req's timers will be properly handled by other subrequests if the one
delayed by limit_req is not active.
author | Maxim Dounin <mdounin@mdounin.ru> |
---|---|
date | Sun, 02 Apr 2017 14:32:26 +0300 |
parents | 56fc55e32f23 |
children | 3e2d90073adf |
line wrap: on
line source
/* * Copyright (C) Igor Sysoev * Copyright (C) Nginx, Inc. */ #include <ngx_config.h> #include <ngx_core.h> #include <ngx_event.h> static ngx_int_t ngx_enable_accept_events(ngx_cycle_t *cycle); static ngx_int_t ngx_disable_accept_events(ngx_cycle_t *cycle, ngx_uint_t all); static void ngx_close_accepted_connection(ngx_connection_t *c); #if (NGX_DEBUG) static void ngx_debug_accepted_connection(ngx_event_conf_t *ecf, ngx_connection_t *c); #endif void ngx_event_accept(ngx_event_t *ev) { socklen_t socklen; ngx_err_t err; ngx_log_t *log; ngx_uint_t level; ngx_socket_t s; ngx_event_t *rev, *wev; ngx_sockaddr_t sa; ngx_listening_t *ls; ngx_connection_t *c, *lc; ngx_event_conf_t *ecf; #if (NGX_HAVE_ACCEPT4) static ngx_uint_t use_accept4 = 1; #endif if (ev->timedout) { if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) { return; } ev->timedout = 0; } ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module); if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) { ev->available = ecf->multi_accept; } lc = ev->data; ls = lc->listening; ev->ready = 0; ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0, "accept on %V, ready: %d", &ls->addr_text, ev->available); do { socklen = sizeof(ngx_sockaddr_t); #if (NGX_HAVE_ACCEPT4) if (use_accept4) { s = accept4(lc->fd, &sa.sockaddr, &socklen, SOCK_NONBLOCK); } else { s = accept(lc->fd, &sa.sockaddr, &socklen); } #else s = accept(lc->fd, &sa.sockaddr, &socklen); #endif if (s == (ngx_socket_t) -1) { err = ngx_socket_errno; if (err == NGX_EAGAIN) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, err, "accept() not ready"); return; } level = NGX_LOG_ALERT; if (err == NGX_ECONNABORTED) { level = NGX_LOG_ERR; } else if (err == NGX_EMFILE || err == NGX_ENFILE) { level = NGX_LOG_CRIT; } #if (NGX_HAVE_ACCEPT4) ngx_log_error(level, ev->log, err, use_accept4 ? "accept4() failed" : "accept() failed"); if (use_accept4 && err == NGX_ENOSYS) { use_accept4 = 0; ngx_inherited_nonblocking = 0; continue; } #else ngx_log_error(level, ev->log, err, "accept() failed"); #endif if (err == NGX_ECONNABORTED) { if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { ev->available--; } if (ev->available) { continue; } } if (err == NGX_EMFILE || err == NGX_ENFILE) { if (ngx_disable_accept_events((ngx_cycle_t *) ngx_cycle, 1) != NGX_OK) { return; } if (ngx_use_accept_mutex) { if (ngx_accept_mutex_held) { ngx_shmtx_unlock(&ngx_accept_mutex); ngx_accept_mutex_held = 0; } ngx_accept_disabled = 1; } else { ngx_add_timer(ev, ecf->accept_mutex_delay); } } return; } #if (NGX_STAT_STUB) (void) ngx_atomic_fetch_add(ngx_stat_accepted, 1); #endif ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n; c = ngx_get_connection(s, ev->log); if (c == NULL) { if (ngx_close_socket(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_close_socket_n " failed"); } return; } c->type = SOCK_STREAM; #if (NGX_STAT_STUB) (void) ngx_atomic_fetch_add(ngx_stat_active, 1); #endif c->pool = ngx_create_pool(ls->pool_size, ev->log); if (c->pool == NULL) { ngx_close_accepted_connection(c); return; } c->sockaddr = ngx_palloc(c->pool, socklen); if (c->sockaddr == NULL) { ngx_close_accepted_connection(c); return; } ngx_memcpy(c->sockaddr, &sa, socklen); log = ngx_palloc(c->pool, sizeof(ngx_log_t)); if (log == NULL) { ngx_close_accepted_connection(c); return; } /* set a blocking mode for iocp and non-blocking mode for others */ if (ngx_inherited_nonblocking) { if (ngx_event_flags & NGX_USE_IOCP_EVENT) { if (ngx_blocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_blocking_n " failed"); ngx_close_accepted_connection(c); return; } } } else { if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) { if (ngx_nonblocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno, ngx_nonblocking_n " failed"); ngx_close_accepted_connection(c); return; } } } *log = ls->log; c->recv = ngx_recv; c->send = ngx_send; c->recv_chain = ngx_recv_chain; c->send_chain = ngx_send_chain; c->log = log; c->pool->log = log; c->socklen = socklen; c->listening = ls; c->local_sockaddr = ls->sockaddr; c->local_socklen = ls->socklen; #if (NGX_HAVE_UNIX_DOMAIN) if (c->sockaddr->sa_family == AF_UNIX) { c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED; c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED; #if (NGX_SOLARIS) /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */ c->sendfile = 0; #endif } #endif rev = c->read; wev = c->write; wev->ready = 1; if (ngx_event_flags & NGX_USE_IOCP_EVENT) { rev->ready = 1; } if (ev->deferred_accept) { rev->ready = 1; #if (NGX_HAVE_KQUEUE) rev->available = 1; #endif } rev->log = log; wev->log = log; /* * TODO: MT: - ngx_atomic_fetch_add() * or protection by critical section or light mutex * * TODO: MP: - allocated in a shared memory * - ngx_atomic_fetch_add() * or protection by critical section or light mutex */ c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); #if (NGX_STAT_STUB) (void) ngx_atomic_fetch_add(ngx_stat_handled, 1); #endif if (ls->addr_ntop) { c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len); if (c->addr_text.data == NULL) { ngx_close_accepted_connection(c); return; } c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen, c->addr_text.data, ls->addr_text_max_len, 0); if (c->addr_text.len == 0) { ngx_close_accepted_connection(c); return; } } #if (NGX_DEBUG) { ngx_str_t addr; u_char text[NGX_SOCKADDR_STRLEN]; ngx_debug_accepted_connection(ecf, c); if (log->log_level & NGX_LOG_DEBUG_EVENT) { addr.data = text; addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text, NGX_SOCKADDR_STRLEN, 1); ngx_log_debug3(NGX_LOG_DEBUG_EVENT, log, 0, "*%uA accept: %V fd:%d", c->number, &addr, s); } } #endif if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) { if (ngx_add_conn(c) == NGX_ERROR) { ngx_close_accepted_connection(c); return; } } log->data = NULL; log->handler = NULL; ls->handler(c); if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { ev->available--; } } while (ev->available); } #if !(NGX_WIN32) void ngx_event_recvmsg(ngx_event_t *ev) { ssize_t n; ngx_log_t *log; ngx_err_t err; ngx_event_t *rev, *wev; struct iovec iov[1]; struct msghdr msg; ngx_sockaddr_t sa; ngx_listening_t *ls; ngx_event_conf_t *ecf; ngx_connection_t *c, *lc; static u_char buffer[65535]; #if (NGX_HAVE_MSGHDR_MSG_CONTROL) #if (NGX_HAVE_IP_RECVDSTADDR) u_char msg_control[CMSG_SPACE(sizeof(struct in_addr))]; #elif (NGX_HAVE_IP_PKTINFO) u_char msg_control[CMSG_SPACE(sizeof(struct in_pktinfo))]; #endif #if (NGX_HAVE_INET6 && NGX_HAVE_IPV6_RECVPKTINFO) u_char msg_control6[CMSG_SPACE(sizeof(struct in6_pktinfo))]; #endif #endif if (ev->timedout) { if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) { return; } ev->timedout = 0; } ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module); if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) { ev->available = ecf->multi_accept; } lc = ev->data; ls = lc->listening; ev->ready = 0; ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0, "recvmsg on %V, ready: %d", &ls->addr_text, ev->available); do { ngx_memzero(&msg, sizeof(struct msghdr)); iov[0].iov_base = (void *) buffer; iov[0].iov_len = sizeof(buffer); msg.msg_name = &sa; msg.msg_namelen = sizeof(ngx_sockaddr_t); msg.msg_iov = iov; msg.msg_iovlen = 1; #if (NGX_HAVE_MSGHDR_MSG_CONTROL) if (ls->wildcard) { #if (NGX_HAVE_IP_RECVDSTADDR || NGX_HAVE_IP_PKTINFO) if (ls->sockaddr->sa_family == AF_INET) { msg.msg_control = &msg_control; msg.msg_controllen = sizeof(msg_control); } #endif #if (NGX_HAVE_INET6 && NGX_HAVE_IPV6_RECVPKTINFO) if (ls->sockaddr->sa_family == AF_INET6) { msg.msg_control = &msg_control6; msg.msg_controllen = sizeof(msg_control6); } #endif } #endif n = recvmsg(lc->fd, &msg, 0); if (n == -1) { err = ngx_socket_errno; if (err == NGX_EAGAIN) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, err, "recvmsg() not ready"); return; } ngx_log_error(NGX_LOG_ALERT, ev->log, err, "recvmsg() failed"); return; } #if (NGX_STAT_STUB) (void) ngx_atomic_fetch_add(ngx_stat_accepted, 1); #endif #if (NGX_HAVE_MSGHDR_MSG_CONTROL) if (msg.msg_flags & (MSG_TRUNC|MSG_CTRUNC)) { ngx_log_error(NGX_LOG_ALERT, ev->log, 0, "recvmsg() truncated data"); continue; } #endif ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n; c = ngx_get_connection(lc->fd, ev->log); if (c == NULL) { return; } c->shared = 1; c->type = SOCK_DGRAM; c->socklen = msg.msg_namelen; #if (NGX_STAT_STUB) (void) ngx_atomic_fetch_add(ngx_stat_active, 1); #endif c->pool = ngx_create_pool(ls->pool_size, ev->log); if (c->pool == NULL) { ngx_close_accepted_connection(c); return; } c->sockaddr = ngx_palloc(c->pool, c->socklen); if (c->sockaddr == NULL) { ngx_close_accepted_connection(c); return; } ngx_memcpy(c->sockaddr, msg.msg_name, c->socklen); log = ngx_palloc(c->pool, sizeof(ngx_log_t)); if (log == NULL) { ngx_close_accepted_connection(c); return; } *log = ls->log; c->send = ngx_udp_send; c->send_chain = ngx_udp_send_chain; c->log = log; c->pool->log = log; c->listening = ls; c->local_sockaddr = ls->sockaddr; c->local_socklen = ls->socklen; #if (NGX_HAVE_MSGHDR_MSG_CONTROL) if (ls->wildcard) { struct cmsghdr *cmsg; struct sockaddr *sockaddr; sockaddr = ngx_palloc(c->pool, c->local_socklen); if (sockaddr == NULL) { ngx_close_accepted_connection(c); return; } ngx_memcpy(sockaddr, c->local_sockaddr, c->local_socklen); c->local_sockaddr = sockaddr; for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg)) { #if (NGX_HAVE_IP_RECVDSTADDR) if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_RECVDSTADDR && sockaddr->sa_family == AF_INET) { struct in_addr *addr; struct sockaddr_in *sin; addr = (struct in_addr *) CMSG_DATA(cmsg); sin = (struct sockaddr_in *) sockaddr; sin->sin_addr = *addr; break; } #elif (NGX_HAVE_IP_PKTINFO) if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO && sockaddr->sa_family == AF_INET) { struct in_pktinfo *pkt; struct sockaddr_in *sin; pkt = (struct in_pktinfo *) CMSG_DATA(cmsg); sin = (struct sockaddr_in *) sockaddr; sin->sin_addr = pkt->ipi_addr; break; } #endif #if (NGX_HAVE_INET6 && NGX_HAVE_IPV6_RECVPKTINFO) if (cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_PKTINFO && sockaddr->sa_family == AF_INET6) { struct in6_pktinfo *pkt6; struct sockaddr_in6 *sin6; pkt6 = (struct in6_pktinfo *) CMSG_DATA(cmsg); sin6 = (struct sockaddr_in6 *) sockaddr; sin6->sin6_addr = pkt6->ipi6_addr; break; } #endif } } #endif c->buffer = ngx_create_temp_buf(c->pool, n); if (c->buffer == NULL) { ngx_close_accepted_connection(c); return; } c->buffer->last = ngx_cpymem(c->buffer->last, buffer, n); rev = c->read; wev = c->write; wev->ready = 1; rev->log = log; wev->log = log; /* * TODO: MT: - ngx_atomic_fetch_add() * or protection by critical section or light mutex * * TODO: MP: - allocated in a shared memory * - ngx_atomic_fetch_add() * or protection by critical section or light mutex */ c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); #if (NGX_STAT_STUB) (void) ngx_atomic_fetch_add(ngx_stat_handled, 1); #endif if (ls->addr_ntop) { c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len); if (c->addr_text.data == NULL) { ngx_close_accepted_connection(c); return; } c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen, c->addr_text.data, ls->addr_text_max_len, 0); if (c->addr_text.len == 0) { ngx_close_accepted_connection(c); return; } } #if (NGX_DEBUG) { ngx_str_t addr; u_char text[NGX_SOCKADDR_STRLEN]; ngx_debug_accepted_connection(ecf, c); if (log->log_level & NGX_LOG_DEBUG_EVENT) { addr.data = text; addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text, NGX_SOCKADDR_STRLEN, 1); ngx_log_debug4(NGX_LOG_DEBUG_EVENT, log, 0, "*%uA recvmsg: %V fd:%d n:%z", c->number, &addr, c->fd, n); } } #endif log->data = NULL; log->handler = NULL; ls->handler(c); if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) { ev->available -= n; } } while (ev->available); } #endif ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle) { if (ngx_shmtx_trylock(&ngx_accept_mutex)) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "accept mutex locked"); if (ngx_accept_mutex_held && ngx_accept_events == 0) { return NGX_OK; } if (ngx_enable_accept_events(cycle) == NGX_ERROR) { ngx_shmtx_unlock(&ngx_accept_mutex); return NGX_ERROR; } ngx_accept_events = 0; ngx_accept_mutex_held = 1; return NGX_OK; } ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "accept mutex lock failed: %ui", ngx_accept_mutex_held); if (ngx_accept_mutex_held) { if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) { return NGX_ERROR; } ngx_accept_mutex_held = 0; } return NGX_OK; } static ngx_int_t ngx_enable_accept_events(ngx_cycle_t *cycle) { ngx_uint_t i; ngx_listening_t *ls; ngx_connection_t *c; ls = cycle->listening.elts; for (i = 0; i < cycle->listening.nelts; i++) { c = ls[i].connection; if (c == NULL || c->read->active) { continue; } if (ngx_add_event(c->read, NGX_READ_EVENT, 0) == NGX_ERROR) { return NGX_ERROR; } } return NGX_OK; } static ngx_int_t ngx_disable_accept_events(ngx_cycle_t *cycle, ngx_uint_t all) { ngx_uint_t i; ngx_listening_t *ls; ngx_connection_t *c; ls = cycle->listening.elts; for (i = 0; i < cycle->listening.nelts; i++) { c = ls[i].connection; if (c == NULL || !c->read->active) { continue; } #if (NGX_HAVE_REUSEPORT) /* * do not disable accept on worker's own sockets * when disabling accept events due to accept mutex */ if (ls[i].reuseport && !all) { continue; } #endif if (ngx_del_event(c->read, NGX_READ_EVENT, NGX_DISABLE_EVENT) == NGX_ERROR) { return NGX_ERROR; } } return NGX_OK; } static void ngx_close_accepted_connection(ngx_connection_t *c) { ngx_socket_t fd; ngx_free_connection(c); fd = c->fd; c->fd = (ngx_socket_t) -1; if (!c->shared && ngx_close_socket(fd) == -1) { ngx_log_error(NGX_LOG_ALERT, c->log, ngx_socket_errno, ngx_close_socket_n " failed"); } if (c->pool) { ngx_destroy_pool(c->pool); } #if (NGX_STAT_STUB) (void) ngx_atomic_fetch_add(ngx_stat_active, -1); #endif } u_char * ngx_accept_log_error(ngx_log_t *log, u_char *buf, size_t len) { return ngx_snprintf(buf, len, " while accepting new connection on %V", log->data); } #if (NGX_DEBUG) static void ngx_debug_accepted_connection(ngx_event_conf_t *ecf, ngx_connection_t *c) { struct sockaddr_in *sin; ngx_cidr_t *cidr; ngx_uint_t i; #if (NGX_HAVE_INET6) struct sockaddr_in6 *sin6; ngx_uint_t n; #endif cidr = ecf->debug_connection.elts; for (i = 0; i < ecf->debug_connection.nelts; i++) { if (cidr[i].family != (ngx_uint_t) c->sockaddr->sa_family) { goto next; } switch (cidr[i].family) { #if (NGX_HAVE_INET6) case AF_INET6: sin6 = (struct sockaddr_in6 *) c->sockaddr; for (n = 0; n < 16; n++) { if ((sin6->sin6_addr.s6_addr[n] & cidr[i].u.in6.mask.s6_addr[n]) != cidr[i].u.in6.addr.s6_addr[n]) { goto next; } } break; #endif #if (NGX_HAVE_UNIX_DOMAIN) case AF_UNIX: break; #endif default: /* AF_INET */ sin = (struct sockaddr_in *) c->sockaddr; if ((sin->sin_addr.s_addr & cidr[i].u.in.mask) != cidr[i].u.in.addr) { goto next; } break; } c->log->log_level = NGX_LOG_DEBUG_CONNECTION|NGX_LOG_DEBUG_ALL; break; next: continue; } } #endif