comparison src/event/ngx_event_proxy.c @ 145:58557d0cccd1

nginx-0.0.1-2003-10-13-20:32:29 import
author Igor Sysoev <igor@sysoev.ru>
date Mon, 13 Oct 2003 16:32:29 +0000
parents ef8c87afcfc5
children 5ac79e574285
comparison
equal deleted inserted replaced
144:ef8c87afcfc5 145:58557d0cccd1
16 16
17 int ngx_event_proxy_read_upstream(ngx_event_proxy_t *p) 17 int ngx_event_proxy_read_upstream(ngx_event_proxy_t *p)
18 { 18 {
19 int n, rc, size; 19 int n, rc, size;
20 ngx_hunk_t *h, *nh; 20 ngx_hunk_t *h, *nh;
21 ngx_chain_t *chain, *temp, *entry, *next; 21 ngx_chain_t *chain, *rest, *ce, *next;
22 22
23 #if (NGX_SUPPRESS_WARN) 23 #if (NGX_SUPPRESS_WARN)
24 entry = NULL; 24 rest = NULL;
25 #endif 25 #endif
26 26
27 #if (NGX_EVENT_COPY_FILTER) 27 #if (NGX_EVENT_COPY_FILTER)
28 28
29 if (p->input_filter == NULL) { 29 if (p->input_filter == NULL) {
36 36
37 ngx_log_debug(p->log, "read upstream"); 37 ngx_log_debug(p->log, "read upstream");
38 38
39 for ( ;; ) { 39 for ( ;; ) {
40 40
41 /* use the pre-read hunks if they exist */
42
43 if (p->preread_hunks) { 41 if (p->preread_hunks) {
42
43 /* use the pre-read hunks if they exist */
44
44 chain = p->preread_hunks; 45 chain = p->preread_hunks;
45 p->preread_hunks = NULL; 46 p->preread_hunks = NULL;
46 n = p->preread_size; 47 n = p->preread_size;
47 48
49 ngx_log_debug(p->log, "preread: %d" _ n);
50
48 } else { 51 } else {
49 52
50 #if (HAVE_KQUEUE) /* kqueue notifies about the end of file or a pending error */ 53 #if (HAVE_KQUEUE)
54
55 /*
56 * kqueue notifies about the end of file or a pending error.
57 * This test allows not to allocate a hunk on these conditions
58 * and not to call ngx_recv_chain().
59 */
51 60
52 if (ngx_event_flags == NGX_HAVE_KQUEUE_EVENT) { 61 if (ngx_event_flags == NGX_HAVE_KQUEUE_EVENT) {
53 62
54 if (p->upstream->read->error) { 63 if (p->upstream->read->error) {
55 ngx_log_error(NGX_LOG_ERR, p->log, 64 ngx_log_error(NGX_LOG_ERR, p->log, p->upstream->read->error,
56 p->upstream->read->error,
57 "readv() failed"); 65 "readv() failed");
58 p->upstream_error = 1; 66 p->upstream_error = 1;
59 67
60 return NGX_ERROR; 68 return NGX_ERROR;
61 69
62 } else if (p->upstream->read->eof 70 } else if (p->upstream->read->eof
63 && p->upstream->read->available == 0) { 71 && p->upstream->read->available == 0) {
64 p->upstream_eof = 1; 72 p->upstream_eof = 1;
65 p->block_upstream = 0;
66 73
67 break; 74 break;
68 } 75 }
69 } 76 }
70 #endif 77 #endif
71 /* use the free hunks if they exist */
72 78
73 if (p->free_hunks) { 79 if (p->free_hunks) {
80
81 /* use the free hunks if they exist */
82
74 chain = p->free_hunks; 83 chain = p->free_hunks;
75 p->free_hunks = NULL; 84 p->free_hunks = NULL;
76 85
77 ngx_log_debug(p->log, "free hunk: %08X:%d" _ chain->hunk _ 86 ngx_log_debug(p->log, "free hunk: %08X:%d" _ chain->hunk _
78 chain->hunk->end - chain->hunk->last); 87 chain->hunk->end - chain->hunk->last);
79 88
80 /* allocate a new hunk if it's still allowed */ 89 } else if (p->hunks < p->bufs.num) {
81 90
82 } else if (p->allocated < p->max_block_size) { 91 /* allocate a new hunk if it's still allowed */
83 h = ngx_create_temp_hunk(p->pool, p->block_size, 20, 20); 92
84 if (h == NULL) { 93 ngx_test_null(h, ngx_create_temp_hunk(p->pool,
85 return NGX_ABORT; 94 p->bufs.size, 20, 20);
86 } 95 NGX_ABORT);
87 96 p->hunks++;
88 p->allocated += p->block_size; 97
89 98 ngx_alloc_ce_and_set_hunk(te, h, p->pool, NGX_ABORT);
90 temp = ngx_alloc_chain_entry(p->pool); 99 chain = te;
91 if (temp == NULL) {
92 return NGX_ABORT;
93 }
94
95 temp->hunk = h;
96 temp->next = NULL;
97 chain = temp;
98 100
99 ngx_log_debug(p->log, "new hunk: %08X" _ chain->hunk); 101 ngx_log_debug(p->log, "new hunk: %08X" _ chain->hunk);
100 102
101 /* use the file hunks if they exist */
102
103 } else if (p->file_hunks) { 103 } else if (p->file_hunks) {
104
105 /* use the file hunks if they exist */
106
104 chain = p->file_hunks; 107 chain = p->file_hunks;
105 p->file_hunks = NULL; 108 p->file_hunks = NULL;
106 109
107 ngx_log_debug(p->log, "file hunk: %08X" _ chain->hunk _ 110 ngx_log_debug(p->log, "file hunk: %08X" _ chain->hunk _
108 chain->hunk->end - chain->hunk->last); 111 chain->hunk->end - chain->hunk->last);
109 112
110 /* if the hunks are not needed to be saved in a cache and
111 a downstream is ready then write the hunks to a downstream */
112
113 } else if (p->cachable == 0 && p->downstream->write->ready) { 113 } else if (p->cachable == 0 && p->downstream->write->ready) {
114 114
115 /*
116 * if the hunks are not needed to be saved in a cache and
117 * a downstream is ready then write the hunks to a downstream
118 */
119
115 ngx_log_debug(p->log, "downstream ready"); 120 ngx_log_debug(p->log, "downstream ready");
116 121
117 break; 122 break;
118 123
119 /* if it's allowed then save the incoming hunks to a temporary
120 file, move the saved read hunks to a file chain,
121 convert the incoming hunks into the file hunks
122 and add them to an outgoing chain */
123
124 } else if (p->temp_offset < p->max_temp_file_size) { 124 } else if (p->temp_offset < p->max_temp_file_size) {
125
126 /*
127 * if it's allowed then save the incoming hunks to a temporary
128 * file, move the saved read hunks to a file chain,
129 * convert the incoming hunks into the file hunks
130 * and add them to an outgoing chain
131 */
132
125 rc = ngx_event_proxy_write_chain_to_temp_file(p); 133 rc = ngx_event_proxy_write_chain_to_temp_file(p);
126 134
127 ngx_log_debug(p->log, "temp offset: %d" _ p->temp_offset); 135 ngx_log_debug(p->log, "temp offset: %d" _ p->temp_offset);
128 136
129 if (rc != NGX_OK) { 137 if (rc != NGX_OK) {
134 p->file_hunks = NULL; 142 p->file_hunks = NULL;
135 143
136 ngx_log_debug(p->log, "new file hunk: %08X:%d" _ chain->hunk _ 144 ngx_log_debug(p->log, "new file hunk: %08X:%d" _ chain->hunk _
137 chain->hunk->end - chain->hunk->last); 145 chain->hunk->end - chain->hunk->last);
138 146
139 /* if there're no hunks to read in then disable a level event */
140
141 } else { 147 } else {
142 p->block_upstream = 1; 148
149 /* if there're no hunks to read in then disable a level event */
143 150
144 ngx_log_debug(p->log, "no hunks to read in"); 151 ngx_log_debug(p->log, "no hunks to read in");
145 152
146 break; 153 break;
147 } 154 }
148 155
149 n = ngx_recv_chain(p->upstream, chain); 156 n = ngx_recv_chain(p->upstream, chain);
150 157
151 }
152
153 ngx_log_debug(p->log, "recv_chain: %d" _ n); 158 ngx_log_debug(p->log, "recv_chain: %d" _ n);
154 159
155 if (n == NGX_ERROR) { 160 if (n == NGX_ERROR) {
156 p->upstream_error = 1; 161 p->upstream_error = 1;
157 return NGX_ERROR; 162 return NGX_ERROR;
158 } 163 }
159 164
160 if (n == NGX_AGAIN) { 165 if (n == NGX_AGAIN) {
161 if (p->upstream->read->blocked) { 166 if (ngx_handle_read_event(p->upstream->read) == NGX_ERROR) {
162 if (ngx_add_event(p->upstream->read, NGX_READ_EVENT,
163 NGX_LEVEL_EVENT) == NGX_ERROR) {
164 return NGX_ABORT; 167 return NGX_ABORT;
165 } 168 }
166 p->block_upstream = 0; 169
167 p->upstream->read->blocked = 0; 170 break;
168 } 171 }
169 172
170 break; 173 if (n == 0) {
171 } 174 if (chain->hunk->shadow == NULL) {
172 175 p->free_hunks = chain;
173 if (n == 0) { 176 }
174 if (chain->hunk->shadow == NULL) { 177 p->upstream_eof = 1;
175 p->free_hunks = chain; 178
176 } 179 break;
177 p->upstream_eof = 1; 180 }
178 p->block_upstream = 0; 181
179 182 }
180 break; 183
181 } 184 /*
182 185 * move the full hunks to a read chain, the partial filled hunk
183 /* move the full hunks to a read chain 186 * to a free chain, and remove the shadow links for these hunks
184 and the partial filled hunk to a free chain 187 */
185 and remove the shadow links for these hunks */ 188
186 189 for (ce = chain; ce && n > 0; ce = next) {
187 for (entry = chain; entry && n > 0; entry = next) { 190 next = ce->next;
188 next = entry->next; 191 ce->next = NULL;
189 entry->next = NULL; 192
190 193 if (ce->hunk->shadow) {
191 if (entry->hunk->shadow) { 194 for (h = ce->hunk->shadow;
192 for (h = entry->hunk->shadow;
193 (h->type & NGX_HUNK_LAST_SHADOW) == 0; 195 (h->type & NGX_HUNK_LAST_SHADOW) == 0;
194 h = nh) 196 h = nh)
195 { 197 {
196 nh = h->shadow; 198 nh = h->shadow;
197 h->shadow = NULL; 199 h->shadow = NULL;
203 h->shadow = NULL; 205 h->shadow = NULL;
204 h->type &= ~(NGX_HUNK_TEMP 206 h->type &= ~(NGX_HUNK_TEMP
205 |NGX_HUNK_IN_MEMORY 207 |NGX_HUNK_IN_MEMORY
206 |NGX_HUNK_RECYCLED 208 |NGX_HUNK_RECYCLED
207 |NGX_HUNK_LAST_SHADOW); 209 |NGX_HUNK_LAST_SHADOW);
208 entry->hunk->shadow = NULL; 210 ce->hunk->shadow = NULL;
209 } 211 }
210 212
211 size = entry->hunk->end - entry->hunk->last; 213 size = ce->hunk->end - ce->hunk->last;
212 214
213 if (n >= size) { 215 if (n >= size) {
214 entry->hunk->last = entry->hunk->end; 216 ce->hunk->last = ce->hunk->end;
215 217
216 if (p->read_hunks) { 218 if (p->read_hunks) {
217 p->last_read_hunk->next = entry; 219 p->last_read_hunk->next = ce;
218 220
219 } else { 221 } else {
220 p->read_hunks = entry; 222 p->read_hunks = ce;
221 } 223 }
222 224
223 p->last_read_hunk = entry; 225 p->last_read_hunk = ce;
224 226
225 n -= size; 227 n -= size;
226 228
227 #if !(NGX_EVENT_COPY_FILTER) 229 #if !(NGX_EVENT_COPY_FILTER)
228 230
230 continue; 232 continue;
231 } 233 }
232 234
233 /* the inline copy input filter */ 235 /* the inline copy input filter */
234 236
235 h = ngx_alloc_hunk(p->pool); 237 ngx_test_null(h, ngx_alloc_hunk(p->pool), NGX_ABORT);
236 if (h == NULL) { 238
237 return NGX_ABORT; 239 ngx_memcpy(h, ce->hunk, sizeof(ngx_hunk_t));
238 } 240 h->shadow = ce->hunk;
239
240 ngx_memcpy(h, entry->hunk, sizeof(ngx_hunk_t));
241 h->shadow = entry->hunk;
242 h->type |= NGX_HUNK_LAST_SHADOW|NGX_HUNK_RECYCLED; 241 h->type |= NGX_HUNK_LAST_SHADOW|NGX_HUNK_RECYCLED;
243 entry->hunk->shadow = h; 242 ce->hunk->shadow = h;
244 243
245 temp = ngx_alloc_chain_entry(p->pool); 244 ngx_alloc_ce_and_set_hunk(te, h, p->pool, NGX_ABORT);
246 if (temp == NULL) { 245
247 return NGX_ABORT; 246 ngx_chain_add_ce(p->in_hunks, p->last_in_hunk, te);
248 }
249
250 temp->hunk = h;
251 temp->next = NULL;
252
253 if (p->in_hunks) {
254 p->last_in_hunk->next = temp;
255
256 } else {
257 p->in_hunks = temp;
258 }
259
260 p->last_in_hunk = temp;
261 #endif 247 #endif
262 248
263 } else { 249 } else {
264 entry->hunk->last += n; 250 ce->hunk->last += n;
265 p->free_hunks = entry; 251 p->free_hunks = ce;
266 252
267 n = 0; 253 n = 0;
268 } 254 }
269 } 255 }
270 256
271 if (chain == p->free_hunks) { 257 if (chain == p->free_hunks) {
272 chain = NULL; 258 chain = NULL;
273 } 259 }
274 260
275 /* the input filter i.e. that moves HTTP/1.1 chunks 261 /*
276 from a read chain to an incoming chain */ 262 * the input filter i.e. that moves HTTP/1.1 chunks
263 * from a read chain to an incoming chain
264 */
277 265
278 if (p->input_filter) { 266 if (p->input_filter) {
279 if (p->input_filter(p, chain) == NGX_ERROR) { 267 if (p->input_filter(p, chain) == NGX_ERROR) {
280 return NGX_ABORT; 268 return NGX_ABORT;
281 } 269 }
282 } 270 }
283 271
284 ngx_log_debug(p->log, "rest chain: %08X" _ entry); 272 ngx_log_debug(p->log, "rest chain: %08X" _ ce);
285 273
286 /* if the rest hunks are file hunks then move them to a file chain 274 /*
287 otherwise add them to a free chain */ 275 * if the rest hunks are file hunks then move them to a file chain
288 276 * otherwise add them to a free chain
289 if (entry) { 277 */
290 if (entry->hunk->shadow) { 278
291 p->file_hunks = entry; 279 if (rest) {
280 if (rest->hunk->shadow) {
281 p->file_hunks = rest;
292 282
293 } else { 283 } else {
294 if (p->free_hunks) { 284 if (p->free_hunks) {
295 p->free_hunks->next = entry; 285 p->free_hunks->next = rest;
296 286
297 } else { 287 } else {
298 p->free_hunks = entry; 288 p->free_hunks = rest;
299 } 289 }
300 } 290 }
301 291
302 p->block_upstream = 0;
303 break; 292 break;
304 } 293 }
305 } 294 }
306 295
307 ngx_log_debug(p->log, "eof: %d block: %d" _ 296 ngx_log_debug(p->log, "eof: %d" _ p->upstream_eof);
308 p->upstream_eof _ p->block_upstream); 297
309 298 /*
310 /* if there's the end of upstream response then move 299 * if there's the end of upstream response then move
311 the partially filled hunk from a free chain to an incoming chain */ 300 * the partially filled hunk from a free chain to an incoming chain
301 */
312 302
313 if (p->upstream_eof) { 303 if (p->upstream_eof) {
314 p->upstream->read->ready = 0;
315
316 if (p->free_hunks 304 if (p->free_hunks
317 && p->free_hunks->hunk->pos < p->free_hunks->hunk->last) 305 && p->free_hunks->hunk->pos < p->free_hunks->hunk->last)
318 { 306 {
319 307
320 #if (NGX_EVENT_COPY_FILTER) 308 #if (NGX_EVENT_COPY_FILTER)
321 309
322 if (p->input_filter(p, NULL) == NGX_ERROR) { 310 if (p->input_filter(p, NULL) == NGX_ERROR) {
323 return NGX_ABORT; 311 return NGX_ABORT;
324 } 312 }
325 #else 313 #else
314
326 if (p->input_filter) { 315 if (p->input_filter) {
327 if (p->input_filter(p, NULL) == NGX_ERROR) { 316 if (p->input_filter(p, NULL) == NGX_ERROR) {
328 return NGX_ABORT; 317 return NGX_ABORT;
329 } 318 }
330 319
331 } else { 320 } else {
332 entry = p->free_hunks; 321 ce = p->free_hunks;
333 322
334 if (p->in_hunks) { 323 if (p->in_hunks) {
335 p->last_in_hunk->next = entry; 324 p->last_in_hunk->next = ce;
336 325
337 } else { 326 } else {
338 p->in_hunks = entry; 327 p->in_hunks = ce;
339 } 328 }
340 329
341 p->last_in_hunk = entry; 330 p->last_in_hunk = ce;
342 } 331 }
343 332
344 p->free_hunks = entry->next; 333 p->free_hunks = ce->next;
345 entry->next = NULL; 334 ce->next = NULL;
346 #endif 335
336 #endif /* NGX_EVENT_COPY_FILTER */
347 } 337 }
348 338
349 #if 0 339 #if 0
350 /* free the unneeded hunks */ 340 /* free the unneeded hunks */
351 341
352 for (entry = p->free_hunks; entry; entry = ce->next) { 342 for (ce = p->free_hunks; ce; ce = ce->next) {
353 ngx_free_hunk(p->pool, entry->hunk); 343 ngx_free_hunk(p->pool, ce->hunk);
354 } 344 }
355 #endif 345 #endif
356 346
357 if (p->in_hunks) { 347 if (p->in_hunks) {
358 p->last_in_hunk->hunk->type |= NGX_HUNK_LAST; 348 p->last_in_hunk->hunk->type |= NGX_HUNK_LAST;
385 375
386 p->upstream_level--; 376 p->upstream_level--;
387 377
388 ngx_log_debug(p->log, "upstream level: %d" _ p->upstream_level); 378 ngx_log_debug(p->log, "upstream level: %d" _ p->upstream_level);
389 379
390 if (p->upstream_level == 0 380 if (p->upstream_level == 0) {
391 && p->block_upstream 381 if (ngx_handler_read_event(p->upstream->read) == NGX_ERROR) {
392 && ngx_event_flags & NGX_USE_LEVEL_EVENT)
393 {
394 if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0) == NGX_ERROR) {
395 return NGX_ABORT; 382 return NGX_ABORT;
396 } 383 }
397
398 p->upstream->read->blocked = 1;
399
400 return NGX_AGAIN;
401 } 384 }
402 385
403 if (p->upstream_eof) { 386 if (p->upstream_eof) {
404 return NGX_OK; 387 return NGX_OK;
405 388
412 int ngx_event_proxy_write_to_downstream(ngx_event_proxy_t *p) 395 int ngx_event_proxy_write_to_downstream(ngx_event_proxy_t *p)
413 { 396 {
414 int rc; 397 int rc;
415 ngx_hunk_t *h; 398 ngx_hunk_t *h;
416 ngx_chain_t *entry; 399 ngx_chain_t *entry;
400
401 #if 0
417 402
418 if (p->upstream_level == 0 403 if (p->upstream_level == 0
419 && p->downstream_level == 0 404 && p->downstream_level == 0
420 && p->busy_hunk == NULL 405 && p->busy_hunk == NULL
421 && p->out_hunks == NULL 406 && p->out_hunks == NULL
429 414
430 p->downstream->write->blocked = 1; 415 p->downstream->write->blocked = 1;
431 return NGX_AGAIN; 416 return NGX_AGAIN;
432 } 417 }
433 418
419 #endif
420
434 p->downstream_level++; 421 p->downstream_level++;
435 422
436 ngx_log_debug(p->log, "write to downstream"); 423 ngx_log_debug(p->log, "write to downstream");
437 424
438 entry = NULL; 425 entry = NULL;
693 680
694 static int ngx_event_proxy_copy_input_filter(ngx_event_proxy_t *p, 681 static int ngx_event_proxy_copy_input_filter(ngx_event_proxy_t *p,
695 ngx_chain_t *chain) 682 ngx_chain_t *chain)
696 { 683 {
697 ngx_hunk_t *h; 684 ngx_hunk_t *h;
698 ngx_chain_t *entry, *temp; 685 ngx_chain_t *ce, *temp;
699 686
700 if (p->upstream_eof) { 687 if (p->upstream_eof) {
701 entry = p->free_hunks; 688
702 689 /* THINK comment */
703 if (p->in_hunks) { 690
704 p->last_in_hunk->next = entry; 691 ce = p->free_hunks;
705 692
706 } else { 693 ngx_chain_add_ce(p->in_hunk, p->last_in_hunk, ce);
707 p->in_hunks = entry; 694
708 } 695 p->free_hunks = ce->next;
709 696 ce->next = NULL;
710 p->last_in_hunk = entry;
711
712 p->free_hunks = entry->next;
713 entry->next = NULL;
714 697
715 return NGX_OK; 698 return NGX_OK;
716 } 699 }
717 700
718 for (entry = chain; entry; entry = entry->next) { 701 for (ce = chain; ce; ce = ce->next) {
719 ngx_test_null(h, ngx_alloc_hunk(p->pool), NGX_ERROR); 702 ngx_test_null(h, ngx_alloc_hunk(p->pool), NGX_ERROR);
720 ngx_memcpy(h, entry->hunk, sizeof(ngx_hunk_t)); 703 ngx_memcpy(h, ce->hunk, sizeof(ngx_hunk_t));
721 h->shadow = entry->hunk; 704 h->shadow = ce->hunk;
722 h->type |= NGX_HUNK_LAST_SHADOW|NGX_HUNK_RECYCLED; 705 h->type |= NGX_HUNK_LAST_SHADOW|NGX_HUNK_RECYCLED;
723 entry->hunk->shadow = h; 706 ce->hunk->shadow = h;
724 707
725 ngx_test_null(temp, ngx_alloc_chain_entry(p->pool), NGX_ERROR); 708 ngx_alloc_ce_and_set_hunk(te, h, p->pool, NGX_ERROR);
726 temp->hunk = h; 709
727 temp->next = NULL; 710 ngx_chain_add_ce(p->in_hunk, p->last_in_hunk, te);
728
729 if (p->in_hunks) {
730 p->last_in_hunk->next = temp;
731
732 } else {
733 p->in_hunks = temp;
734 }
735
736 p->last_in_hunk = temp;
737 } 711 }
738 712
739 return NGX_OK; 713 return NGX_OK;
740 } 714 }
741 715