Mercurial > hg > nginx
diff src/stream/ngx_stream_proxy_module.c @ 6692:56fc55e32f23
Stream: filters.
author | Roman Arutyunyan <arut@nginx.com> |
---|---|
date | Thu, 15 Sep 2016 14:55:46 +0300 |
parents | c02290241cbe |
children | edcd9303a4d3 |
line wrap: on
line diff
--- a/src/stream/ngx_stream_proxy_module.c Thu Sep 15 14:56:26 2016 +0300 +++ b/src/stream/ngx_stream_proxy_module.c Thu Sep 15 14:55:46 2016 +0300 @@ -84,10 +84,10 @@ void *conf); static char *ngx_stream_proxy_bind(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); -static ngx_int_t ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s); #if (NGX_STREAM_SSL) +static ngx_int_t ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s); static char *ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); static void ngx_stream_proxy_ssl_init_connection(ngx_stream_session_t *s); @@ -385,8 +385,6 @@ } u->peer.type = c->type; - - u->proxy_protocol = pscf->proxy_protocol; u->start_sec = ngx_time(); c->write->handler = ngx_stream_proxy_downstream_handler; @@ -411,28 +409,6 @@ u->downstream_buf.pos = p; u->downstream_buf.last = p; - if (u->proxy_protocol -#if (NGX_STREAM_SSL) - && pscf->ssl == NULL -#endif - && pscf->buffer_size >= NGX_PROXY_PROTOCOL_MAX_HEADER) - { - /* optimization for a typical case */ - - ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0, - "stream proxy send PROXY protocol header"); - - p = ngx_proxy_protocol_write(c, u->downstream_buf.last, - u->downstream_buf.end); - if (p == NULL) { - ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); - return; - } - - u->downstream_buf.last = p; - u->proxy_protocol = 0; - } - if (c->read->ready) { ngx_post_event(c->read, &ngx_posted_events); } @@ -682,8 +658,13 @@ c->log->action = "connecting to upstream"; + pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); + u = s->upstream; + u->connected = 0; + u->proxy_protocol = pscf->proxy_protocol; + if (u->state) { u->state->response_time = ngx_current_msec - u->state->response_time; } @@ -740,8 +721,6 @@ pc->read->handler = ngx_stream_proxy_connect_handler; pc->write->handler = ngx_stream_proxy_connect_handler; - pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); - ngx_add_timer(pc->write, pscf->connect_timeout); } @@ -751,6 +730,7 @@ { int tcp_nodelay; u_char *p; + ngx_chain_t *cl; ngx_connection_t *c, *pc; ngx_log_handler_pt handler; ngx_stream_upstream_t *u; @@ -782,21 +762,26 @@ pc->tcp_nodelay = NGX_TCP_NODELAY_SET; } - if (u->proxy_protocol) { - if (ngx_stream_proxy_send_proxy_protocol(s) != NGX_OK) { - return; - } - - u->proxy_protocol = 0; - } - pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); #if (NGX_STREAM_SSL) - if (pc->type == SOCK_STREAM && pscf->ssl && pc->ssl == NULL) { - ngx_stream_proxy_ssl_init_connection(s); - return; + + if (pc->type == SOCK_STREAM && pscf->ssl) { + + if (u->proxy_protocol) { + if (ngx_stream_proxy_send_proxy_protocol(s) != NGX_OK) { + return; + } + + u->proxy_protocol = 0; + } + + if (pc->ssl == NULL) { + ngx_stream_proxy_ssl_init_connection(s); + return; + } } + #endif c = s->connection; @@ -838,14 +823,66 @@ u->upstream_buf.last = p; } - if (c->type == SOCK_DGRAM) { - s->received = c->buffer->last - c->buffer->pos; - u->downstream_buf = *c->buffer; - - if (pscf->responses == 0) { - pc->read->ready = 0; - pc->read->eof = 1; + if (c->buffer && c->buffer->pos < c->buffer->last) { + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0, + "stream proxy add preread buffer: %uz", + c->buffer->last - c->buffer->pos); + + cl = ngx_chain_get_free_buf(c->pool, &u->free); + if (cl == NULL) { + ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); + return; + } + + *cl->buf = *c->buffer; + + cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module; + cl->buf->flush = 1; + cl->buf->last_buf = (c->type == SOCK_DGRAM); + + cl->next = u->upstream_out; + u->upstream_out = cl; + } + + if (u->proxy_protocol) { + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0, + "stream proxy add PROXY protocol header"); + + cl = ngx_chain_get_free_buf(c->pool, &u->free); + if (cl == NULL) { + ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); + return; } + + p = ngx_pnalloc(c->pool, NGX_PROXY_PROTOCOL_MAX_HEADER); + if (p == NULL) { + ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); + return; + } + + cl->buf->pos = p; + + p = ngx_proxy_protocol_write(c, p, p + NGX_PROXY_PROTOCOL_MAX_HEADER); + if (p == NULL) { + ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); + return; + } + + cl->buf->last = p; + cl->buf->temporary = 1; + cl->buf->flush = 0; + cl->buf->last_buf = 0; + cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module; + + cl->next = u->upstream_out; + u->upstream_out = cl; + + u->proxy_protocol = 0; + } + + if (c->type == SOCK_DGRAM && pscf->responses == 0) { + pc->read->ready = 0; + pc->read->eof = 1; } u->connected = 1; @@ -861,6 +898,8 @@ } +#if (NGX_STREAM_SSL) + static ngx_int_t ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s) { @@ -931,8 +970,6 @@ } -#if (NGX_STREAM_SSL) - static char * ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) @@ -1412,8 +1449,10 @@ size_t size, limit_rate; ssize_t n; ngx_buf_t *b; + ngx_int_t rc; ngx_uint_t flags; ngx_msec_t delay; + ngx_chain_t *cl, **ll, **out, **busy; ngx_connection_t *c, *pc, *src, *dst; ngx_log_handler_pt handler; ngx_stream_upstream_t *u; @@ -1447,6 +1486,8 @@ b = &u->upstream_buf; limit_rate = pscf->download_rate; received = &u->received; + out = &u->downstream_out; + busy = &u->downstream_busy; } else { src = c; @@ -1454,24 +1495,18 @@ b = &u->downstream_buf; limit_rate = pscf->upload_rate; received = &s->received; + out = &u->upstream_out; + busy = &u->upstream_busy; } for ( ;; ) { - if (do_write) { - - size = b->last - b->pos; - - if (size && dst && dst->write->ready) { - - n = dst->send(dst, b->pos, size); - - if (n == NGX_AGAIN && dst->shared) { - /* cannot wait on a shared socket */ - n = NGX_ERROR; - } - - if (n == NGX_ERROR) { + if (do_write && dst) { + + if (*out || *busy || dst->buffered) { + rc = ngx_stream_top_filter(s, *out, from_upstream); + + if (rc == NGX_ERROR) { if (c->type == SOCK_DGRAM && !from_upstream) { ngx_stream_proxy_next_upstream(s); return; @@ -1481,13 +1516,12 @@ return; } - if (n > 0) { - b->pos += n; - - if (b->pos == b->last) { - b->pos = b->start; - b->last = b->start; - } + ngx_chain_update_chains(c->pool, &u->free, busy, out, + (ngx_buf_tag_t) &ngx_stream_proxy_module); + + if (*busy == NULL) { + b->pos = b->start; + b->last = b->start; } } } @@ -1514,11 +1548,21 @@ n = src->recv(src, b->last, size); - if (n == NGX_AGAIN || n == 0) { + if (n == NGX_AGAIN) { break; } - if (n > 0) { + if (n == NGX_ERROR) { + if (c->type == SOCK_DGRAM && u->received == 0) { + ngx_stream_proxy_next_upstream(s); + return; + } + + src->read->eof = 1; + n = 0; + } + + if (n >= 0) { if (limit_rate) { delay = (ngx_msec_t) (n * 1000 / limit_rate); @@ -1541,27 +1585,37 @@ src->read->eof = 1; } + for (ll = out; *ll; ll = &(*ll)->next) { /* void */ } + + cl = ngx_chain_get_free_buf(c->pool, &u->free); + if (cl == NULL) { + ngx_stream_proxy_finalize(s, + NGX_STREAM_INTERNAL_SERVER_ERROR); + return; + } + + *ll = cl; + + cl->buf->pos = b->last; + cl->buf->last = b->last + n; + cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module; + + cl->buf->temporary = (n ? 1 : 0); + cl->buf->last_buf = src->read->eof; + cl->buf->flush = 1; + *received += n; b->last += n; do_write = 1; continue; } - - if (n == NGX_ERROR) { - if (c->type == SOCK_DGRAM && u->received == 0) { - ngx_stream_proxy_next_upstream(s); - return; - } - - src->read->eof = 1; - } } break; } - if (src->read->eof && (b->pos == b->last || (dst && dst->read->eof))) { + if (src->read->eof && dst && (dst->read->eof || !dst->buffered)) { handler = c->log->handler; c->log->handler = NULL; @@ -1614,6 +1668,14 @@ "stream proxy next upstream"); u = s->upstream; + pc = u->peer.connection; + + if (u->upstream_out || u->upstream_busy || (pc && pc->buffered)) { + ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, + "pending buffers on next upstream"); + ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR); + return; + } if (u->peer.sockaddr) { u->peer.free(&u->peer, u->peer.data, NGX_PEER_FAILED); @@ -1632,8 +1694,6 @@ return; } - pc = u->peer.connection; - if (pc) { ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, "close proxy upstream connection: %d", pc->fd);