Mercurial > hg > nginx
comparison src/event/ngx_event_pipe.c @ 6443:fc72784b1f52
Threads: writing via threads pools in event pipe.
The "aio_write" directive is introduced, which enables use of aio
for writing. Currently it is meaningful only with "aio threads".
Note that aio operations can be done by both event pipe and output
chain, so proper mapping between r->aio and p->aio is provided when
calling ngx_event_pipe() and in output filter.
In collaboration with Valentin Bartenev.
author | Maxim Dounin <mdounin@mdounin.ru> |
---|---|
date | Fri, 18 Mar 2016 06:44:49 +0300 |
parents | d811f22033ad |
children | 2cd019520210 |
comparison
equal
deleted
inserted
replaced
6442:6e10518f95d8 | 6443:fc72784b1f52 |
---|---|
110 | 110 |
111 if (p->upstream_eof || p->upstream_error || p->upstream_done) { | 111 if (p->upstream_eof || p->upstream_error || p->upstream_done) { |
112 return NGX_OK; | 112 return NGX_OK; |
113 } | 113 } |
114 | 114 |
115 #if (NGX_THREADS) | |
116 if (p->aio) { | |
117 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, | |
118 "pipe read upstream: aio"); | |
119 return NGX_AGAIN; | |
120 } | |
121 #endif | |
122 | |
115 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, | 123 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, |
116 "pipe read upstream: %d", p->upstream->read->ready); | 124 "pipe read upstream: %d", p->upstream->read->ready); |
117 | 125 |
118 for ( ;; ) { | 126 for ( ;; ) { |
119 | 127 |
254 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, | 262 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, |
255 "pipe temp offset: %O", p->temp_file->offset); | 263 "pipe temp offset: %O", p->temp_file->offset); |
256 | 264 |
257 if (rc == NGX_BUSY) { | 265 if (rc == NGX_BUSY) { |
258 break; | 266 break; |
259 } | |
260 | |
261 if (rc == NGX_AGAIN) { | |
262 if (ngx_event_flags & NGX_USE_LEVEL_EVENT | |
263 && p->upstream->read->active | |
264 && p->upstream->read->ready) | |
265 { | |
266 if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0) | |
267 == NGX_ERROR) | |
268 { | |
269 return NGX_ABORT; | |
270 } | |
271 } | |
272 } | 267 } |
273 | 268 |
274 if (rc != NGX_OK) { | 269 if (rc != NGX_OK) { |
275 return rc; | 270 return rc; |
276 } | 271 } |
473 if (p->cacheable && (p->in || p->buf_to_file)) { | 468 if (p->cacheable && (p->in || p->buf_to_file)) { |
474 | 469 |
475 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, | 470 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, |
476 "pipe write chain"); | 471 "pipe write chain"); |
477 | 472 |
478 if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) { | 473 rc = ngx_event_pipe_write_chain_to_temp_file(p); |
479 return NGX_ABORT; | 474 |
475 if (rc != NGX_OK) { | |
476 return rc; | |
480 } | 477 } |
481 } | 478 } |
482 | 479 |
483 return NGX_OK; | 480 return NGX_OK; |
484 } | 481 } |
497 downstream = p->downstream; | 494 downstream = p->downstream; |
498 | 495 |
499 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, | 496 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, |
500 "pipe write downstream: %d", downstream->write->ready); | 497 "pipe write downstream: %d", downstream->write->ready); |
501 | 498 |
499 #if (NGX_THREADS) | |
500 | |
501 if (p->writing) { | |
502 rc = ngx_event_pipe_write_chain_to_temp_file(p); | |
503 | |
504 if (rc == NGX_ABORT) { | |
505 return NGX_ABORT; | |
506 } | |
507 } | |
508 | |
509 #endif | |
510 | |
502 flushed = 0; | 511 flushed = 0; |
503 | 512 |
504 for ( ;; ) { | 513 for ( ;; ) { |
505 if (p->downstream_error) { | 514 if (p->downstream_error) { |
506 return ngx_event_pipe_drain_chains(p); | 515 return ngx_event_pipe_drain_chains(p); |
528 p->downstream_error = 1; | 537 p->downstream_error = 1; |
529 return ngx_event_pipe_drain_chains(p); | 538 return ngx_event_pipe_drain_chains(p); |
530 } | 539 } |
531 | 540 |
532 p->out = NULL; | 541 p->out = NULL; |
542 } | |
543 | |
544 if (p->writing) { | |
545 break; | |
533 } | 546 } |
534 | 547 |
535 if (p->in) { | 548 if (p->in) { |
536 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, | 549 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, |
537 "pipe write downstream flush in"); | 550 "pipe write downstream flush in"); |
606 "recycled buffer in pipe out chain"); | 619 "recycled buffer in pipe out chain"); |
607 } | 620 } |
608 | 621 |
609 p->out = p->out->next; | 622 p->out = p->out->next; |
610 | 623 |
611 } else if (!p->cacheable && p->in) { | 624 } else if (!p->cacheable && !p->writing && p->in) { |
612 cl = p->in; | 625 cl = p->in; |
613 | 626 |
614 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0, | 627 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0, |
615 "pipe write buf ls:%d %p %z", | 628 "pipe write buf ls:%d %p %z", |
616 cl->buf->last_shadow, | 629 cl->buf->last_shadow, |
708 ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p) | 721 ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p) |
709 { | 722 { |
710 ssize_t size, bsize, n; | 723 ssize_t size, bsize, n; |
711 ngx_buf_t *b; | 724 ngx_buf_t *b; |
712 ngx_uint_t prev_last_shadow; | 725 ngx_uint_t prev_last_shadow; |
713 ngx_chain_t *cl, *tl, *next, *out, **ll, **last_out, **last_free, fl; | 726 ngx_chain_t *cl, *tl, *next, *out, **ll, **last_out, **last_free; |
727 | |
728 #if (NGX_THREADS) | |
729 | |
730 if (p->writing) { | |
731 | |
732 if (p->aio) { | |
733 return NGX_AGAIN; | |
734 } | |
735 | |
736 out = p->writing; | |
737 p->writing = NULL; | |
738 | |
739 n = ngx_write_chain_to_temp_file(p->temp_file, NULL); | |
740 | |
741 if (n == NGX_ERROR) { | |
742 return NGX_ABORT; | |
743 } | |
744 | |
745 goto done; | |
746 } | |
747 | |
748 #endif | |
714 | 749 |
715 if (p->buf_to_file) { | 750 if (p->buf_to_file) { |
716 fl.buf = p->buf_to_file; | 751 out = ngx_alloc_chain_link(p->pool); |
717 fl.next = p->in; | 752 if (out == NULL) { |
718 out = &fl; | 753 return NGX_ABORT; |
754 } | |
755 | |
756 out->buf = p->buf_to_file; | |
757 out->next = p->in; | |
719 | 758 |
720 } else { | 759 } else { |
721 out = p->in; | 760 out = p->in; |
722 } | 761 } |
723 | 762 |
773 } else { | 812 } else { |
774 p->in = NULL; | 813 p->in = NULL; |
775 p->last_in = &p->in; | 814 p->last_in = &p->in; |
776 } | 815 } |
777 | 816 |
817 #if (NGX_THREADS) | |
818 p->temp_file->thread_write = p->thread_handler ? 1 : 0; | |
819 p->temp_file->file.thread_task = p->thread_task; | |
820 p->temp_file->file.thread_handler = p->thread_handler; | |
821 p->temp_file->file.thread_ctx = p->thread_ctx; | |
822 #endif | |
823 | |
778 n = ngx_write_chain_to_temp_file(p->temp_file, out); | 824 n = ngx_write_chain_to_temp_file(p->temp_file, out); |
779 | 825 |
780 if (n == NGX_ERROR) { | 826 if (n == NGX_ERROR) { |
781 return NGX_ABORT; | 827 return NGX_ABORT; |
782 } | 828 } |
829 | |
830 #if (NGX_THREADS) | |
831 | |
832 if (n == NGX_AGAIN) { | |
833 p->writing = out; | |
834 p->thread_task = p->temp_file->file.thread_task; | |
835 return NGX_AGAIN; | |
836 } | |
837 | |
838 done: | |
839 | |
840 #endif | |
783 | 841 |
784 if (p->buf_to_file) { | 842 if (p->buf_to_file) { |
785 p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos; | 843 p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos; |
786 n -= p->buf_to_file->last - p->buf_to_file->pos; | 844 n -= p->buf_to_file->last - p->buf_to_file->pos; |
787 p->buf_to_file = NULL; | 845 p->buf_to_file = NULL; |