Mercurial > hg > nginx
view src/http/ngx_http_copy_filter_module.c @ 9202:e88cdaa0f1ff
AIO operations now add timers (ticket #2162).
Each AIO (thread IO) operation being run is now accompanied by a 1-minute
timer. This timer prevents unexpected shutdown of the worker process while
an AIO operation is running, and logs an alert if the operation runs
for too long.
This fixes "open socket left" alerts during worker processes shutdown
due to pending AIO (or thread IO) operations while corresponding requests
have no timers. In particular, such errors were observed while reading
cache headers (ticket #2162), and with worker_shutdown_timeout.
author | Maxim Dounin <mdounin@mdounin.ru> |
---|---|
date | Mon, 29 Jan 2024 10:31:37 +0300 |
parents | ec2e6893caaa |
children | 0de20f43db25 |
line wrap: on
line source
/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) Nginx, Inc.
 */


#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>


/* Per-location settings of the copy filter. */
typedef struct {
    ngx_bufs_t  bufs;    /* filled in by the "output_buffers" directive */
} ngx_http_copy_filter_conf_t;


#if (NGX_HAVE_FILE_AIO)
static void ngx_http_copy_aio_handler(ngx_output_chain_ctx_t *ctx,
    ngx_file_t *file);
static void ngx_http_copy_aio_event_handler(ngx_event_t *ev);
#endif
#if (NGX_THREADS)
static ngx_int_t ngx_http_copy_thread_handler(ngx_thread_task_t *task,
    ngx_file_t *file);
static void ngx_http_copy_thread_event_handler(ngx_event_t *ev);
#endif

static ngx_output_chain_ctx_t *ngx_http_copy_filter_new_ctx(
    ngx_http_request_t *r, ngx_chain_t *in);
static void *ngx_http_copy_filter_create_conf(ngx_conf_t *cf);
static char *ngx_http_copy_filter_merge_conf(ngx_conf_t *cf,
    void *parent, void *child);
static ngx_int_t ngx_http_copy_filter_init(ngx_conf_t *cf);


/*
 * Directives of the module: "output_buffers" takes two arguments, is
 * accepted at the main, server and location levels, and is stored in
 * the "bufs" member of the location configuration.
 */
static ngx_command_t  ngx_http_copy_filter_commands[] = {

    { ngx_string("output_buffers"),
      NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE2,
      ngx_conf_set_bufs_slot,
      NGX_HTTP_LOC_CONF_OFFSET,
      offsetof(ngx_http_copy_filter_conf_t, bufs),
      NULL },

      ngx_null_command
};


/* Only postconfiguration and the location-level conf hooks are used. */
static ngx_http_module_t  ngx_http_copy_filter_module_ctx = {
    NULL,                                  /* preconfiguration */
    ngx_http_copy_filter_init,             /* postconfiguration */

    NULL,                                  /* create main configuration */
    NULL,                                  /* init main configuration */

    NULL,                                  /* create server configuration */
    NULL,                                  /* merge server configuration */

    ngx_http_copy_filter_create_conf,      /* create location configuration */
    ngx_http_copy_filter_merge_conf        /* merge location configuration */
};


ngx_module_t  ngx_http_copy_filter_module = {
    NGX_MODULE_V1,
    &ngx_http_copy_filter_module_ctx,      /* module context */
    ngx_http_copy_filter_commands,         /* module directives */
    NGX_HTTP_MODULE,                       /* module type */
    NULL,                                  /* init master */
    NULL,                                  /* init module */
    NULL,                                  /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};


/* The body filter that was on top of the chain before this module. */
static ngx_http_output_body_filter_pt    ngx_http_next_body_filter;


/*
 * Allocate the output chain context for a request, register it as this
 * module's request context and fill it in from the request, the
 * connection and the location configuration.
 *
 * Also marks the request as having produced output when the first chain
 * link passed to the filter carries a non-empty buffer.
 *
 * Returns the new context, or NULL if the allocation failed.
 */
static ngx_output_chain_ctx_t *
ngx_http_copy_filter_new_ctx(ngx_http_request_t *r, ngx_chain_t *in)
{
    ngx_output_chain_ctx_t       *octx;
    ngx_http_core_loc_conf_t     *core_conf;
    ngx_http_copy_filter_conf_t  *copy_conf;

    octx = ngx_pcalloc(r->pool, sizeof(ngx_output_chain_ctx_t));
    if (octx == NULL) {
        return NULL;
    }

    ngx_http_set_ctx(r, octx, ngx_http_copy_filter_module);

    copy_conf = ngx_http_get_module_loc_conf(r, ngx_http_copy_filter_module);
    core_conf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

    octx->sendfile = r->connection->sendfile;
    octx->need_in_memory = r->main_filter_need_in_memory
                           || r->filter_need_in_memory;
    octx->need_in_temp = r->filter_need_temporary;

    octx->alignment = core_conf->directio_alignment;

    octx->pool = r->pool;
    octx->bufs = copy_conf->bufs;
    octx->tag = (ngx_buf_tag_t) &ngx_http_copy_filter_module;

    octx->output_filter = (ngx_output_chain_filter_pt)
                               ngx_http_next_body_filter;
    octx->filter_ctx = r;

#if (NGX_HAVE_FILE_AIO)
    if (ngx_file_aio && core_conf->aio == NGX_HTTP_AIO_ON) {
        octx->aio_handler = ngx_http_copy_aio_handler;
    }
#endif

#if (NGX_THREADS)
    if (core_conf->aio == NGX_HTTP_AIO_THREADS) {
        octx->thread_handler = ngx_http_copy_thread_handler;
    }
#endif

    if (in != NULL && in->buf != NULL && ngx_buf_size(in->buf)) {
        r->request_output = 1;
    }

    return octx;
}


/*
 * Body filter entry point.
 *
 * Obtains (creating it on the first call) the per-request output chain
 * context, runs ngx_output_chain() on the incoming chain and keeps the
 * NGX_HTTP_COPY_BUFFERED bit of r->buffered in sync with whether the
 * context still holds pending input.
 *
 * Returns NGX_ERROR if the context cannot be allocated, otherwise the
 * result of ngx_output_chain().
 */
static ngx_int_t
ngx_http_copy_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
    ngx_int_t                result;
    ngx_output_chain_ctx_t  *octx;

    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http copy filter: \"%V?%V\"", &r->uri, &r->args);

    octx = ngx_http_get_module_ctx(r, ngx_http_copy_filter_module);

    if (octx == NULL) {
        octx = ngx_http_copy_filter_new_ctx(r, in);
        if (octx == NULL) {
            return NGX_ERROR;
        }
    }

#if (NGX_HAVE_FILE_AIO || NGX_THREADS)
    /* mirror the request's current aio state into the chain context */
    octx->aio = r->aio;
#endif

    result = ngx_output_chain(octx, in);

    if (octx->in != NULL) {
        r->buffered |= NGX_HTTP_COPY_BUFFERED;

    } else {
        r->buffered &= ~NGX_HTTP_COPY_BUFFERED;
    }

    ngx_log_debug3(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http copy filter: %i \"%V?%V\"", result, &r->uri,
                   &r->args);

    return result;
}


#if (NGX_HAVE_FILE_AIO)

/*
 * AIO hook installed into the output chain context.
 *
 * Binds the file's AIO object to the request, arms a 60000 ms
 * (1-minute) timer on the AIO event and marks the request and the
 * chain context as having an operation in flight.
 */
static void
ngx_http_copy_aio_handler(ngx_output_chain_ctx_t *ctx, ngx_file_t *file)
{
    ngx_event_aio_t     *aio;
    ngx_http_request_t  *req;

    req = ctx->filter_ctx;
    aio = file->aio;

    aio->data = req;
    aio->handler = ngx_http_copy_aio_event_handler;

    /*
     * the timer keeps the worker from shutting down unexpectedly while
     * the operation runs, and lets an overly long operation be reported
     */
    ngx_add_timer(&aio->event, 60000);

    req->main->blocked++;
    req->aio = 1;
    ctx->aio = 1;
}


/*
 * Event handler for the AIO event armed by ngx_http_copy_aio_handler().
 *
 * When invoked because the timer expired, it only logs an alert and
 * clears the "timedout" flag.  Otherwise it removes the timer if one is
 * still set, undoes the bookkeeping done when the operation was started
 * and resumes the request through its write event handler.
 */
static void
ngx_http_copy_aio_event_handler(ngx_event_t *ev)
{
    ngx_event_aio_t     *aio;
    ngx_connection_t    *conn;
    ngx_http_request_t  *req;

    aio = ev->data;
    req = aio->data;
    conn = req->connection;

    ngx_http_set_log_request(conn->log, req);

    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, conn->log, 0,
                   "http aio: \"%V?%V\"", &req->uri, &req->args);

    if (ev->timedout) {
        /* timer expiry: report it and leave the request state untouched */
        ngx_log_error(NGX_LOG_ALERT, conn->log, 0,
                      "aio operation took too long");
        ev->timedout = 0;
        return;
    }

    if (ev->timer_set) {
        ngx_del_timer(ev);
    }

    req->main->blocked--;
    req->aio = 0;

    req->write_event_handler(req);

    ngx_http_run_posted_requests(conn);
}

#endif


#if (NGX_THREADS)

/*
 * Thread hook installed into the output chain context.
 *
 * Posts the task to the thread pool of the request's location (looked
 * up by name through the "thread_pool_value" complex value when the
 * location has no pool set), arms a 60000 ms (1-minute) timer on the
 * task event and marks the request and its chain context as having an
 * operation in flight.
 *
 * Returns NGX_OK on success, and also when the task is the connection's
 * sendfile task and the request already has an operation running.
 * Returns NGX_ERROR if the pool name cannot be evaluated, the pool is
 * not found, or the task cannot be posted.
 */
static ngx_int_t
ngx_http_copy_thread_handler(ngx_thread_task_t *task, ngx_file_t *file)
{
    ngx_str_t                  pool_name;
    ngx_connection_t          *conn;
    ngx_thread_pool_t         *pool;
    ngx_http_request_t        *req;
    ngx_output_chain_ctx_t    *octx;
    ngx_http_core_loc_conf_t  *core_conf;

    req = file->thread_ctx;

    if (req->aio) {

        /*
         * a sendfile() call is tolerated while another operation is
         * already running; this may be caused by subrequests, by a
         * filter calling the next body filter several times, or, in
         * HTTP/2, by a write event on the main connection
         */

        conn = req->connection;

#if (NGX_HTTP_V2)
        if (req->stream) {
            conn = req->stream->connection->connection;
        }
#endif

        if (task == conn->sendfile_task) {
            return NGX_OK;
        }
    }

    core_conf = ngx_http_get_module_loc_conf(req, ngx_http_core_module);

    pool = core_conf->thread_pool;

    if (pool == NULL) {
        /* no pool set on the location: resolve it by name */

        if (ngx_http_complex_value(req, core_conf->thread_pool_value,
                                   &pool_name)
            != NGX_OK)
        {
            return NGX_ERROR;
        }

        pool = ngx_thread_pool_get((ngx_cycle_t *) ngx_cycle, &pool_name);

        if (pool == NULL) {
            ngx_log_error(NGX_LOG_ERR, req->connection->log, 0,
                          "thread pool \"%V\" not found", &pool_name);
            return NGX_ERROR;
        }
    }

    task->event.data = req;
    task->event.handler = ngx_http_copy_thread_event_handler;

    if (ngx_thread_task_post(pool, task) != NGX_OK) {
        return NGX_ERROR;
    }

    /*
     * the timer keeps the worker from shutting down unexpectedly while
     * the operation runs, and lets an overly long operation be reported
     */
    ngx_add_timer(&task->event, 60000);

    req->main->blocked++;
    req->aio = 1;

    octx = ngx_http_get_module_ctx(req, ngx_http_copy_filter_module);
    octx->aio = 1;

    return NGX_OK;
}


/*
 * Event handler for the task event armed by
 * ngx_http_copy_thread_handler().
 *
 * When invoked because the timer expired, it only logs an alert and
 * clears the "timedout" flag.  Otherwise it removes the timer if one is
 * still set, undoes the bookkeeping done when the task was posted and
 * resumes processing: through the request's write event handler, or
 * through the connection's write handler if the request is already
 * marked as done.
 */
static void
ngx_http_copy_thread_event_handler(ngx_event_t *ev)
{
    ngx_connection_t    *conn;
    ngx_http_request_t  *req;

    req = ev->data;
    conn = req->connection;

    ngx_http_set_log_request(conn->log, req);

    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, conn->log, 0,
                   "http thread: \"%V?%V\"", &req->uri, &req->args);

    if (ev->timedout) {
        /* timer expiry: report it and leave the request state untouched */
        ngx_log_error(NGX_LOG_ALERT, conn->log, 0,
                      "thread operation took too long");
        ev->timedout = 0;
        return;
    }

    if (ev->timer_set) {
        ngx_del_timer(ev);
    }

    req->main->blocked--;
    req->aio = 0;

#if (NGX_HTTP_V2)

    if (req->stream) {
        /*
         * with HTTP/2 the write event is updated so that processing
         * reaches the main connection, where sendfile() in threads
         * is handled
         */

        conn->write->ready = 1;
        conn->write->active = 0;
    }

#endif

    if (!req->done) {
        req->write_event_handler(req);
        ngx_http_run_posted_requests(conn);
        return;
    }

    /*
     * the subrequest has already been finalized, which is possible
     * when this handler serves sendfile() in threads; run the
     * connection's write event handler instead
     */

    conn->write->handler(conn->write);
}

#endif


/*
 * Allocate the location configuration from the configuration pool.
 *
 * Only bufs.num is initialised here; presumably 0 marks the value as
 * unset for the merge step -- confirm against ngx_conf_merge_bufs_value.
 *
 * Returns the new configuration, or NULL if the allocation failed.
 */
static void *
ngx_http_copy_filter_create_conf(ngx_conf_t *cf)
{
    ngx_http_copy_filter_conf_t  *copy_conf;

    copy_conf = ngx_palloc(cf->pool, sizeof(ngx_http_copy_filter_conf_t));

    if (copy_conf != NULL) {
        copy_conf->bufs.num = 0;
    }

    return copy_conf;
}


/*
 * Merge the parent location configuration into the child one.
 *
 * The values 2 and 32768 are handed to the merge macro as defaults
 * (presumably the number and the size of the buffers -- confirm against
 * the ngx_conf_merge_bufs_value definition).
 *
 * Always returns NULL.
 */
static char *
ngx_http_copy_filter_merge_conf(ngx_conf_t *cf, void *parent, void *child)
{
    ngx_http_copy_filter_conf_t  *inherited = parent;
    ngx_http_copy_filter_conf_t  *current = child;

    ngx_conf_merge_bufs_value(current->bufs, inherited->bufs, 2, 32768);

    return NULL;
}


/*
 * Postconfiguration hook: remember the current top body filter as the
 * next one and put the copy filter on top of the body filter chain.
 */
static ngx_int_t
ngx_http_copy_filter_init(ngx_conf_t *cf)
{
    ngx_http_next_body_filter = ngx_http_top_body_filter;
    ngx_http_top_body_filter = ngx_http_copy_filter;

    return NGX_OK;
}