# HG changeset patch # User Roman Arutyunyan # Date 1435079868 -10800 # Node ID 24488e6db782e24b9a30ba31f0b719204e582918 # Parent abee77018d3aa4bbbecc7d637fd99868f4b4db6f Stream: upstream and downstream limit rates. diff -r abee77018d3a -r 24488e6db782 src/stream/ngx_stream_proxy_module.c --- a/src/stream/ngx_stream_proxy_module.c Tue Jun 23 20:17:47 2015 +0300 +++ b/src/stream/ngx_stream_proxy_module.c Tue Jun 23 20:17:48 2015 +0300 @@ -18,7 +18,9 @@ ngx_msec_t timeout; ngx_msec_t next_upstream_timeout; size_t downstream_buf_size; + size_t downstream_limit_rate; size_t upstream_buf_size; + size_t upstream_limit_rate; ngx_uint_t next_upstream_tries; ngx_flag_t next_upstream; ngx_flag_t proxy_protocol; @@ -132,6 +134,13 @@ offsetof(ngx_stream_proxy_srv_conf_t, downstream_buf_size), NULL }, + { ngx_string("proxy_downstream_limit_rate"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_size_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, downstream_limit_rate), + NULL }, + { ngx_string("proxy_upstream_buffer"), NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, ngx_conf_set_size_slot, @@ -139,6 +148,13 @@ offsetof(ngx_stream_proxy_srv_conf_t, upstream_buf_size), NULL }, + { ngx_string("proxy_upstream_limit_rate"), + NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_size_slot, + NGX_STREAM_SRV_CONF_OFFSET, + offsetof(ngx_stream_proxy_srv_conf_t, upstream_limit_rate), + NULL }, + { ngx_string("proxy_next_upstream"), NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG, ngx_conf_set_flag_slot, @@ -340,6 +356,7 @@ } u->proxy_protocol = pscf->proxy_protocol; + u->start_sec = ngx_time(); p = ngx_pnalloc(c->pool, pscf->downstream_buf_size); if (p == NULL) { @@ -831,17 +848,56 @@ static void ngx_stream_proxy_process_connection(ngx_event_t *ev, ngx_uint_t from_upstream) { - ngx_connection_t *c; - ngx_stream_session_t *s; - ngx_stream_upstream_t *u; + ngx_connection_t *c, *pc; + ngx_stream_session_t *s; + ngx_stream_upstream_t *u; + ngx_stream_proxy_srv_conf_t *pscf; c = ev->data; s = c->data; u = s->upstream; if (ev->timedout) { - ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out"); - ngx_stream_proxy_finalize(s, NGX_DECLINED); + + if (ev->delayed) { + + ev->timedout = 0; + ev->delayed = 0; + + if (!ev->ready) { + if (ngx_handle_read_event(ev, 0) != NGX_OK) { + ngx_stream_proxy_finalize(s, NGX_ERROR); + return; + } + + if (u->upstream_buf.start) { + pc = u->peer.connection; + + if (!c->read->delayed && !pc->read->delayed) { + pscf = ngx_stream_get_module_srv_conf(s, + ngx_stream_proxy_module); + ngx_add_timer(c->write, pscf->timeout); + } + } + + return; + } + + } else { + ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out"); + ngx_stream_proxy_finalize(s, NGX_DECLINED); + return; + } + + } else if (ev->delayed) { + + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0, + "stream connection delayed"); + + if (ngx_handle_read_event(ev, 0) != NGX_OK) { + ngx_stream_proxy_finalize(s, NGX_ERROR); + } + return; } @@ -930,10 +986,12 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream, ngx_uint_t do_write) { - size_t size; + off_t *received, limit; + size_t size, limit_rate; ssize_t n; ngx_buf_t *b; ngx_uint_t flags; + ngx_msec_t delay; ngx_connection_t *c, *pc, *src, *dst; ngx_log_handler_pt handler; ngx_stream_upstream_t *u; @@ -944,15 +1002,21 @@ c = s->connection; pc = u->upstream_buf.start ? u->peer.connection : NULL; + pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); + if (from_upstream) { src = pc; dst = c; b = &u->upstream_buf; + limit_rate = pscf->upstream_limit_rate; + received = &u->received; } else { src = c; dst = pc; b = &u->downstream_buf; + limit_rate = pscf->downstream_limit_rate; + received = &s->received; } for ( ;; ) { @@ -983,7 +1047,23 @@ size = b->end - b->last; - if (size && src->read->ready) { + if (size && src->read->ready && !src->read->delayed) { + + if (limit_rate) { + limit = (off_t) limit_rate * (ngx_time() - u->start_sec + 1) + - *received; + + if (limit <= 0) { + src->read->delayed = 1; + delay = (ngx_msec_t) (- limit * 1000 / limit_rate + 1); + ngx_add_timer(src->read, delay); + break; + } + + if (size > (size_t) limit) { + size = limit; + } + } n = src->recv(src, b->last, size); @@ -992,15 +1072,19 @@ } if (n > 0) { - if (from_upstream) { - u->received += n; + if (limit_rate) { + delay = (ngx_msec_t) (n * 1000 / limit_rate); - } else { - s->received += n; + if (delay > 0) { + src->read->delayed = 1; + ngx_add_timer(src->read, delay); + } } + *received += n; + b->last += n; do_write = 1; - b->last += n; + continue; } @@ -1012,8 +1096,6 @@ break; } - pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module); - if (src->read->eof && (b->pos == b->last || (dst && dst->read->eof))) { handler = c->log->handler; c->log->handler = NULL; @@ -1044,7 +1126,12 @@ return NGX_ERROR; } - ngx_add_timer(c->read, pscf->timeout); + if (!c->read->delayed && !pc->read->delayed) { + ngx_add_timer(c->write, pscf->timeout); + + } else if (c->write->timer_set) { + ngx_del_timer(c->write); + } } return NGX_OK; @@ -1207,7 +1294,9 @@ conf->timeout = NGX_CONF_UNSET_MSEC; conf->next_upstream_timeout = NGX_CONF_UNSET_MSEC; conf->downstream_buf_size = NGX_CONF_UNSET_SIZE; + conf->downstream_limit_rate = NGX_CONF_UNSET_SIZE; conf->upstream_buf_size = NGX_CONF_UNSET_SIZE; + conf->upstream_limit_rate = NGX_CONF_UNSET_SIZE; conf->next_upstream_tries = NGX_CONF_UNSET_UINT; conf->next_upstream = NGX_CONF_UNSET; conf->proxy_protocol = NGX_CONF_UNSET; @@ -1244,9 +1333,15 @@ ngx_conf_merge_size_value(conf->downstream_buf_size, prev->downstream_buf_size, 16384); + ngx_conf_merge_size_value(conf->downstream_limit_rate, + prev->downstream_limit_rate, 0); + ngx_conf_merge_size_value(conf->upstream_buf_size, prev->upstream_buf_size, 16384); + ngx_conf_merge_size_value(conf->upstream_limit_rate, + prev->upstream_limit_rate, 0); + ngx_conf_merge_uint_value(conf->next_upstream_tries, prev->next_upstream_tries, 0); diff -r abee77018d3a -r 24488e6db782 src/stream/ngx_stream_upstream.h --- a/src/stream/ngx_stream_upstream.h Tue Jun 23 20:17:47 2015 +0300 +++ b/src/stream/ngx_stream_upstream.h Tue Jun 23 20:17:48 2015 +0300 @@ -83,6 +83,7 @@ ngx_buf_t downstream_buf; ngx_buf_t upstream_buf; off_t received; + time_t start_sec; #if (NGX_STREAM_SSL) ngx_str_t ssl_name; #endif