changeset 6443:fc72784b1f52

Threads: writing via threads pools in event pipe. The "aio_write" directive is introduced, which enables use of aio for writing. Currently it is meaningful only with "aio threads". Note that aio operations can be done by both event pipe and output chain, so proper mapping between r->aio and p->aio is provided when calling ngx_event_pipe() and in output filter. In collaboration with Valentin Bartenev.
author Maxim Dounin <mdounin@mdounin.ru>
date Fri, 18 Mar 2016 06:44:49 +0300
parents 6e10518f95d8
children 043914d19be8
files src/event/ngx_event_pipe.c src/event/ngx_event_pipe.h src/http/ngx_http_core_module.c src/http/ngx_http_core_module.h src/http/ngx_http_upstream.c
diffstat 5 files changed, 214 insertions(+), 21 deletions(-) [+]
line wrap: on
line diff
--- a/src/event/ngx_event_pipe.c	Fri Mar 18 06:44:03 2016 +0300
+++ b/src/event/ngx_event_pipe.c	Fri Mar 18 06:44:49 2016 +0300
@@ -112,6 +112,14 @@
         return NGX_OK;
     }
 
+#if (NGX_THREADS)
+    if (p->aio) {
+        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
+                       "pipe read upstream: aio");
+        return NGX_AGAIN;
+    }
+#endif
+
     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    "pipe read upstream: %d", p->upstream->read->ready);
 
@@ -258,19 +266,6 @@
                     break;
                 }
 
-                if (rc == NGX_AGAIN) {
-                    if (ngx_event_flags & NGX_USE_LEVEL_EVENT
-                        && p->upstream->read->active
-                        && p->upstream->read->ready)
-                    {
-                        if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
-                            == NGX_ERROR)
-                        {
-                            return NGX_ABORT;
-                        }
-                    }
-                }
-
                 if (rc != NGX_OK) {
                     return rc;
                 }
@@ -475,8 +470,10 @@
         ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                        "pipe write chain");
 
-        if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
-            return NGX_ABORT;
+        rc = ngx_event_pipe_write_chain_to_temp_file(p);
+
+        if (rc != NGX_OK) {
+            return rc;
         }
     }
 
@@ -499,6 +496,18 @@
     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    "pipe write downstream: %d", downstream->write->ready);
 
+#if (NGX_THREADS)
+
+    if (p->writing) {
+        rc = ngx_event_pipe_write_chain_to_temp_file(p);
+
+        if (rc == NGX_ABORT) {
+            return NGX_ABORT;
+        }
+    }
+
+#endif
+
     flushed = 0;
 
     for ( ;; ) {
@@ -532,6 +541,10 @@
                 p->out = NULL;
             }
 
+            if (p->writing) {
+                break;
+            }
+
             if (p->in) {
                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                                "pipe write downstream flush in");
@@ -608,7 +621,7 @@
 
                 p->out = p->out->next;
 
-            } else if (!p->cacheable && p->in) {
+            } else if (!p->cacheable && !p->writing && p->in) {
                 cl = p->in;
 
                 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
@@ -710,12 +723,38 @@
     ssize_t       size, bsize, n;
     ngx_buf_t    *b;
     ngx_uint_t    prev_last_shadow;
-    ngx_chain_t  *cl, *tl, *next, *out, **ll, **last_out, **last_free, fl;
+    ngx_chain_t  *cl, *tl, *next, *out, **ll, **last_out, **last_free;
+
+#if (NGX_THREADS)
+
+    if (p->writing) {
+
+        if (p->aio) {
+            return NGX_AGAIN;
+        }
+
+        out = p->writing;
+        p->writing = NULL;
+
+        n = ngx_write_chain_to_temp_file(p->temp_file, NULL);
+
+        if (n == NGX_ERROR) {
+            return NGX_ABORT;
+        }
+
+        goto done;
+    }
+
+#endif
 
     if (p->buf_to_file) {
-        fl.buf = p->buf_to_file;
-        fl.next = p->in;
-        out = &fl;
+        out = ngx_alloc_chain_link(p->pool);
+        if (out == NULL) {
+            return NGX_ABORT;
+        }
+
+        out->buf = p->buf_to_file;
+        out->next = p->in;
 
     } else {
         out = p->in;
@@ -775,12 +814,31 @@
         p->last_in = &p->in;
     }
 
+#if (NGX_THREADS)
+    p->temp_file->thread_write = p->thread_handler ? 1 : 0;
+    p->temp_file->file.thread_task = p->thread_task;
+    p->temp_file->file.thread_handler = p->thread_handler;
+    p->temp_file->file.thread_ctx = p->thread_ctx;
+#endif
+
     n = ngx_write_chain_to_temp_file(p->temp_file, out);
 
     if (n == NGX_ERROR) {
         return NGX_ABORT;
     }
 
+#if (NGX_THREADS)
+
+    if (n == NGX_AGAIN) {
+        p->writing = out;
+        p->thread_task = p->temp_file->file.thread_task;
+        return NGX_AGAIN;
+    }
+
+done:
+
+#endif
+
     if (p->buf_to_file) {
         p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
         n -= p->buf_to_file->last - p->buf_to_file->pos;
--- a/src/event/ngx_event_pipe.h	Fri Mar 18 06:44:03 2016 +0300
+++ b/src/event/ngx_event_pipe.h	Fri Mar 18 06:44:49 2016 +0300
@@ -30,6 +30,8 @@
     ngx_chain_t       *in;
     ngx_chain_t      **last_in;
 
+    ngx_chain_t       *writing;
+
     ngx_chain_t       *out;
     ngx_chain_t       *free;
     ngx_chain_t       *busy;
@@ -45,6 +47,13 @@
     ngx_event_pipe_output_filter_pt   output_filter;
     void                             *output_ctx;
 
+#if (NGX_THREADS)
+    ngx_int_t                       (*thread_handler)(ngx_thread_task_t *task,
+                                                      ngx_file_t *file);
+    void                             *thread_ctx;
+    ngx_thread_task_t                *thread_task;
+#endif
+
     unsigned           read:1;
     unsigned           cacheable:1;
     unsigned           single_buf:1;
@@ -56,6 +65,7 @@
     unsigned           downstream_done:1;
     unsigned           downstream_error:1;
     unsigned           cyclic_temp_file:1;
+    unsigned           aio:1;
 
     ngx_int_t          allocated;
     ngx_bufs_t         bufs;
--- a/src/http/ngx_http_core_module.c	Fri Mar 18 06:44:03 2016 +0300
+++ b/src/http/ngx_http_core_module.c	Fri Mar 18 06:44:49 2016 +0300
@@ -402,6 +402,13 @@
       0,
       NULL },
 
+    { ngx_string("aio_write"),
+      NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_FLAG,
+      ngx_conf_set_flag_slot,
+      NGX_HTTP_LOC_CONF_OFFSET,
+      offsetof(ngx_http_core_loc_conf_t, aio_write),
+      NULL },
+
     { ngx_string("read_ahead"),
       NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
       ngx_conf_set_size_slot,
@@ -3608,6 +3615,7 @@
     clcf->sendfile = NGX_CONF_UNSET;
     clcf->sendfile_max_chunk = NGX_CONF_UNSET_SIZE;
     clcf->aio = NGX_CONF_UNSET;
+    clcf->aio_write = NGX_CONF_UNSET;
 #if (NGX_THREADS)
     clcf->thread_pool = NGX_CONF_UNSET_PTR;
     clcf->thread_pool_value = NGX_CONF_UNSET_PTR;
@@ -3829,6 +3837,7 @@
                               prev->sendfile_max_chunk, 0);
 #if (NGX_HAVE_FILE_AIO || NGX_THREADS)
     ngx_conf_merge_value(conf->aio, prev->aio, NGX_HTTP_AIO_OFF);
+    ngx_conf_merge_value(conf->aio_write, prev->aio_write, 0);
 #endif
 #if (NGX_THREADS)
     ngx_conf_merge_ptr_value(conf->thread_pool, prev->thread_pool, NULL);
--- a/src/http/ngx_http_core_module.h	Fri Mar 18 06:44:03 2016 +0300
+++ b/src/http/ngx_http_core_module.h	Fri Mar 18 06:44:49 2016 +0300
@@ -404,6 +404,7 @@
     ngx_flag_t    internal;                /* internal */
     ngx_flag_t    sendfile;                /* sendfile */
     ngx_flag_t    aio;                     /* aio */
+    ngx_flag_t    aio_write;               /* aio_write */
     ngx_flag_t    tcp_nopush;              /* tcp_nopush */
     ngx_flag_t    tcp_nodelay;             /* tcp_nodelay */
     ngx_flag_t    reset_timedout_connection; /* reset_timedout_connection */
--- a/src/http/ngx_http_upstream.c	Fri Mar 18 06:44:03 2016 +0300
+++ b/src/http/ngx_http_upstream.c	Fri Mar 18 06:44:49 2016 +0300
@@ -76,6 +76,13 @@
 static ngx_int_t ngx_http_upstream_non_buffered_filter_init(void *data);
 static ngx_int_t ngx_http_upstream_non_buffered_filter(void *data,
     ssize_t bytes);
+#if (NGX_THREADS)
+static ngx_int_t ngx_http_upstream_thread_handler(ngx_thread_task_t *task,
+    ngx_file_t *file);
+static void ngx_http_upstream_thread_event_handler(ngx_event_t *ev);
+#endif
+static ngx_int_t ngx_http_upstream_output_filter(void *data,
+    ngx_chain_t *chain);
 static void ngx_http_upstream_process_downstream(ngx_http_request_t *r);
 static void ngx_http_upstream_process_upstream(ngx_http_request_t *r,
     ngx_http_upstream_t *u);
@@ -2870,7 +2877,7 @@
 
     p = u->pipe;
 
-    p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter;
+    p->output_filter = ngx_http_upstream_output_filter;
     p->output_ctx = r;
     p->tag = u->output.tag;
     p->bufs = u->conf->bufs;
@@ -2913,6 +2920,13 @@
     p->max_temp_file_size = u->conf->max_temp_file_size;
     p->temp_file_write_size = u->conf->temp_file_write_size;
 
+#if (NGX_THREADS)
+    if (clcf->aio == NGX_HTTP_AIO_THREADS && clcf->aio_write) {
+        p->thread_handler = ngx_http_upstream_thread_handler;
+        p->thread_ctx = r;
+    }
+#endif
+
     p->preread_bufs = ngx_alloc_chain_link(r->pool);
     if (p->preread_bufs == NULL) {
         ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
@@ -3487,6 +3501,97 @@
 }
 
 
+#if (NGX_THREADS)
+
+static ngx_int_t
+ngx_http_upstream_thread_handler(ngx_thread_task_t *task, ngx_file_t *file)
+{
+    ngx_str_t                  name;
+    ngx_event_pipe_t          *p;
+    ngx_thread_pool_t         *tp;
+    ngx_http_request_t        *r;
+    ngx_http_core_loc_conf_t  *clcf;
+
+    r = file->thread_ctx;
+    p = r->upstream->pipe;
+
+    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
+    tp = clcf->thread_pool;
+
+    if (tp == NULL) {
+        if (ngx_http_complex_value(r, clcf->thread_pool_value, &name)
+            != NGX_OK)
+        {
+            return NGX_ERROR;
+        }
+
+        tp = ngx_thread_pool_get((ngx_cycle_t *) ngx_cycle, &name);
+
+        if (tp == NULL) {
+            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+                          "thread pool \"%V\" not found", &name);
+            return NGX_ERROR;
+        }
+    }
+
+    task->event.data = r;
+    task->event.handler = ngx_http_upstream_thread_event_handler;
+
+    if (ngx_thread_task_post(tp, task) != NGX_OK) {
+        return NGX_ERROR;
+    }
+
+    r->main->blocked++;
+    r->aio = 1;
+    p->aio = 1;
+
+    return NGX_OK;
+}
+
+
+static void
+ngx_http_upstream_thread_event_handler(ngx_event_t *ev)
+{
+    ngx_connection_t    *c;
+    ngx_http_request_t  *r;
+
+    r = ev->data;
+    c = r->connection;
+
+    ngx_http_set_log_request(c->log, r);
+
+    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
+                   "http upstream thread: \"%V?%V\"", &r->uri, &r->args);
+
+    r->main->blocked--;
+    r->aio = 0;
+
+    r->write_event_handler(r);
+
+    ngx_http_run_posted_requests(c);
+}
+
+#endif
+
+
+static ngx_int_t
+ngx_http_upstream_output_filter(void *data, ngx_chain_t *chain)
+{
+    ngx_int_t            rc;
+    ngx_event_pipe_t    *p;
+    ngx_http_request_t  *r;
+
+    r = data;
+    p = r->upstream->pipe;
+
+    rc = ngx_http_output_filter(r, chain);
+
+    p->aio = r->aio;
+
+    return rc;
+}
+
+
 static void
 ngx_http_upstream_process_downstream(ngx_http_request_t *r)
 {
@@ -3505,6 +3610,10 @@
 
     c->log->action = "sending to client";
 
+#if (NGX_THREADS)
+    p->aio = r->aio;
+#endif
+
     if (wev->timedout) {
 
         if (wev->delayed) {
@@ -3634,6 +3743,12 @@
 
     p = u->pipe;
 
+#if (NGX_THREADS)
+    if (p->writing) {
+        return;
+    }
+#endif
+
     if (u->peer.connection) {
 
         if (u->store) {