changeset 9010:a5aebd51e4c7 quic

QUIC: stream lingering. Now ngx_quic_stream_t is decoupled from ngx_connection_t in a way that it can persist after connection is closed by application. During this period, server is expecting stream final size from client for correct flow control. Also, buffered output is sent to client as more flow control credit is granted.
author Roman Arutyunyan <arut@nginx.com>
date Sat, 05 Feb 2022 12:54:54 +0300
parents e5f16d886c97
children f9c788f3f5cc
files src/event/quic/ngx_event_quic.c src/event/quic/ngx_event_quic.h src/event/quic/ngx_event_quic_connection.h src/event/quic/ngx_event_quic_frames.c src/event/quic/ngx_event_quic_streams.c src/http/v3/ngx_http_v3_uni.c
diffstat 6 files changed, 273 insertions(+), 194 deletions(-) [+]
line wrap: on
line diff
--- a/src/event/quic/ngx_event_quic.c	Tue Feb 15 14:12:34 2022 +0300
+++ b/src/event/quic/ngx_event_quic.c	Sat Feb 05 12:54:54 2022 +0300
@@ -303,6 +303,7 @@
     ctp->active_connection_id_limit = 2;
 
     ngx_queue_init(&qc->streams.uninitialized);
+    ngx_queue_init(&qc->streams.free);
 
     qc->streams.recv_max_data = qc->tp.initial_max_data;
     qc->streams.recv_window = qc->streams.recv_max_data;
--- a/src/event/quic/ngx_event_quic.h	Tue Feb 15 14:12:34 2022 +0300
+++ b/src/event/quic/ngx_event_quic.h	Sat Feb 05 12:54:54 2022 +0300
@@ -78,12 +78,14 @@
     uint64_t                       id;
     uint64_t                       acked;
     uint64_t                       send_max_data;
+    uint64_t                       send_offset;
+    uint64_t                       send_final_size;
     uint64_t                       recv_max_data;
     uint64_t                       recv_offset;
     uint64_t                       recv_window;
     uint64_t                       recv_last;
     uint64_t                       recv_size;
-    uint64_t                       final_size;
+    uint64_t                       recv_final_size;
     ngx_chain_t                   *in;
     ngx_chain_t                   *out;
     ngx_uint_t                     cancelable;  /* unsigned  cancelable:1; */
--- a/src/event/quic/ngx_event_quic_connection.h	Tue Feb 15 14:12:34 2022 +0300
+++ b/src/event/quic/ngx_event_quic_connection.h	Sat Feb 05 12:54:54 2022 +0300
@@ -114,13 +114,16 @@
 typedef struct {
     ngx_rbtree_t                      tree;
     ngx_rbtree_node_t                 sentinel;
+
     ngx_queue_t                       uninitialized;
+    ngx_queue_t                       free;
 
     uint64_t                          sent;
     uint64_t                          recv_offset;
     uint64_t                          recv_window;
     uint64_t                          recv_last;
     uint64_t                          recv_max_data;
+    uint64_t                          send_offset;
     uint64_t                          send_max_data;
 
     uint64_t                          server_max_streams_uni;
--- a/src/event/quic/ngx_event_quic_frames.c	Tue Feb 15 14:12:34 2022 +0300
+++ b/src/event/quic/ngx_event_quic_frames.c	Sat Feb 05 12:54:54 2022 +0300
@@ -391,6 +391,10 @@
         return NGX_ERROR;
     }
 
+    if (f->type == NGX_QUIC_FT_STREAM) {
+        f->u.stream.fin = 0;
+    }
+
     ngx_queue_insert_after(&f->queue, &nf->queue);
 
     return NGX_OK;
--- a/src/event/quic/ngx_event_quic_streams.c	Tue Feb 15 14:12:34 2022 +0300
+++ b/src/event/quic/ngx_event_quic_streams.c	Sat Feb 05 12:54:54 2022 +0300
@@ -13,6 +13,8 @@
 #define NGX_QUIC_STREAM_GONE     (void *) -1
 
 
+static ngx_int_t ngx_quic_do_reset_stream(ngx_quic_stream_t *qs,
+    ngx_uint_t err);
 static ngx_int_t ngx_quic_shutdown_stream_send(ngx_connection_t *c);
 static ngx_int_t ngx_quic_shutdown_stream_recv(ngx_connection_t *c);
 static ngx_quic_stream_t *ngx_quic_get_stream(ngx_connection_t *c, uint64_t id);
@@ -28,11 +30,12 @@
     size_t size);
 static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
     ngx_chain_t *in, off_t limit);
-static size_t ngx_quic_max_stream_flow(ngx_connection_t *c);
+static ngx_int_t ngx_quic_stream_flush(ngx_quic_stream_t *qs);
 static void ngx_quic_stream_cleanup_handler(void *data);
-static ngx_int_t ngx_quic_control_flow(ngx_connection_t *c, uint64_t last);
-static ngx_int_t ngx_quic_update_flow(ngx_connection_t *c, uint64_t last);
-static ngx_int_t ngx_quic_update_max_stream_data(ngx_connection_t *c);
+static ngx_int_t ngx_quic_close_stream(ngx_quic_stream_t *qs);
+static ngx_int_t ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last);
+static ngx_int_t ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last);
+static ngx_int_t ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs);
 static ngx_int_t ngx_quic_update_max_data(ngx_connection_t *c);
 static void ngx_quic_set_event(ngx_event_t *ev);
 
@@ -186,15 +189,20 @@
     ns = 0;
 #endif
 
-    for (node = ngx_rbtree_min(tree->root, tree->sentinel);
-         node;
-         node = ngx_rbtree_next(tree, node))
-    {
+    node = ngx_rbtree_min(tree->root, tree->sentinel);
+
+    while (node) {
         qs = (ngx_quic_stream_t *) node;
+        node = ngx_rbtree_next(tree, node);
 
         qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
         qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
 
+        if (qs->connection == NULL) {
+            ngx_quic_close_stream(qs);
+            continue;
+        }
+
         ngx_quic_set_event(qs->connection->read);
         ngx_quic_set_event(qs->connection->write);
 
@@ -213,13 +221,17 @@
 ngx_int_t
 ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
 {
+    return ngx_quic_do_reset_stream(c->quic, err);
+}
+
+
+static ngx_int_t
+ngx_quic_do_reset_stream(ngx_quic_stream_t *qs, ngx_uint_t err)
+{
     ngx_connection_t       *pc;
     ngx_quic_frame_t       *frame;
-    ngx_quic_stream_t      *qs;
     ngx_quic_connection_t  *qc;
 
-    qs = c->quic;
-
     if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_RECVD
         || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT
         || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD)
@@ -228,10 +240,14 @@
     }
 
     qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
+    qs->send_final_size = qs->send_offset;
 
     pc = qs->parent;
     qc = ngx_quic_get_connection(pc);
 
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
+                   "quic stream id:0x%xL reset", qs->id);
+
     frame = ngx_quic_alloc_frame(pc);
     if (frame == NULL) {
         return NGX_ERROR;
@@ -241,10 +257,13 @@
     frame->type = NGX_QUIC_FT_RESET_STREAM;
     frame->u.reset_stream.id = qs->id;
     frame->u.reset_stream.error_code = err;
-    frame->u.reset_stream.final_size = c->sent;
+    frame->u.reset_stream.final_size = qs->send_offset;
 
     ngx_quic_queue_frame(qc, frame);
 
+    ngx_quic_free_chain(pc, qs->out);
+    qs->out = NULL;
+
     return NGX_OK;
 }
 
@@ -271,10 +290,7 @@
 static ngx_int_t
 ngx_quic_shutdown_stream_send(ngx_connection_t *c)
 {
-    ngx_connection_t       *pc;
-    ngx_quic_frame_t       *frame;
-    ngx_quic_stream_t      *qs;
-    ngx_quic_connection_t  *qc;
+    ngx_quic_stream_t  *qs;
 
     qs = c->quic;
 
@@ -284,32 +300,13 @@
         return NGX_OK;
     }
 
-    qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT;
-
-    pc = qs->parent;
-    qc = ngx_quic_get_connection(pc);
+    qs->send_state = NGX_QUIC_STREAM_SEND_SEND;
+    qs->send_final_size = c->sent;
 
-    frame = ngx_quic_alloc_frame(pc);
-    if (frame == NULL) {
-        return NGX_ERROR;
-    }
-
-    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0,
                    "quic stream id:0x%xL send shutdown", qs->id);
 
-    frame->level = ssl_encryption_application;
-    frame->type = NGX_QUIC_FT_STREAM;
-    frame->u.stream.off = 1;
-    frame->u.stream.len = 1;
-    frame->u.stream.fin = 1;
-
-    frame->u.stream.stream_id = qs->id;
-    frame->u.stream.offset = c->sent;
-    frame->u.stream.length = 0;
-
-    ngx_quic_queue_frame(qc, frame);
-
-    return NGX_OK;
+    return ngx_quic_stream_flush(qs);
 }
 
 
@@ -341,7 +338,7 @@
         return NGX_ERROR;
     }
 
-    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
                    "quic stream id:0x%xL recv shutdown", qs->id);
 
     frame->level = ssl_encryption_application;
@@ -591,6 +588,7 @@
 {
     ngx_log_t              *log;
     ngx_pool_t             *pool;
+    ngx_queue_t            *q;
     ngx_connection_t       *sc;
     ngx_quic_stream_t      *qs;
     ngx_pool_cleanup_t     *cln;
@@ -601,25 +599,41 @@
 
     qc = ngx_quic_get_connection(c);
 
-    pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
-    if (pool == NULL) {
-        return NULL;
+    if (!ngx_queue_empty(&qc->streams.free)) {
+        q = ngx_queue_head(&qc->streams.free);
+        qs = ngx_queue_data(q, ngx_quic_stream_t, queue);
+        ngx_queue_remove(&qs->queue);
+
+    } else {
+        /*
+         * the number of streams is limited by transport
+         * parameters and application requirements
+         */
+
+        qs = ngx_palloc(c->pool, sizeof(ngx_quic_stream_t));
+        if (qs == NULL) {
+            return NULL;
+        }
     }
 
-    qs = ngx_pcalloc(pool, sizeof(ngx_quic_stream_t));
-    if (qs == NULL) {
-        ngx_destroy_pool(pool);
-        return NULL;
-    }
+    ngx_memzero(qs, sizeof(ngx_quic_stream_t));
 
     qs->node.key = id;
     qs->parent = c;
     qs->id = id;
-    qs->final_size = (uint64_t) -1;
+    qs->send_final_size = (uint64_t) -1;
+    qs->recv_final_size = (uint64_t) -1;
+
+    pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
+    if (pool == NULL) {
+        ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
+        return NULL;
+    }
 
     log = ngx_palloc(pool, sizeof(ngx_log_t));
     if (log == NULL) {
         ngx_destroy_pool(pool);
+        ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
         return NULL;
     }
 
@@ -629,6 +643,7 @@
     sc = ngx_get_connection(c->fd, log);
     if (sc == NULL) {
         ngx_destroy_pool(pool);
+        ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
         return NULL;
     }
 
@@ -697,6 +712,7 @@
     if (cln == NULL) {
         ngx_close_connection(sc);
         ngx_destroy_pool(pool);
+        ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
         return NULL;
     }
 
@@ -737,7 +753,7 @@
         return NGX_ERROR;
     }
 
-    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
+    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
                    "quic stream id:0x%xL recv buf:%uz", qs->id, size);
 
     if (size == 0) {
@@ -763,7 +779,7 @@
         rev->ready = 0;
 
         if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_RECVD
-            && qs->recv_offset == qs->final_size)
+            && qs->recv_offset == qs->recv_final_size)
         {
             qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ;
         }
@@ -781,7 +797,7 @@
     ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
                    "quic stream id:0x%xL recv len:%z", qs->id, len);
 
-    if (ngx_quic_update_flow(c, qs->recv_offset + len) != NGX_OK) {
+    if (ngx_quic_update_flow(qs, qs->recv_offset + len) != NGX_OK) {
         return NGX_ERROR;
     }
 
@@ -822,9 +838,7 @@
     off_t                   flow;
     size_t                  n;
     ngx_event_t            *wev;
-    ngx_chain_t            *out;
     ngx_connection_t       *pc;
-    ngx_quic_frame_t       *frame;
     ngx_quic_stream_t      *qs;
     ngx_quic_connection_t  *qc;
 
@@ -842,7 +856,8 @@
 
     qs->send_state = NGX_QUIC_STREAM_SEND_SEND;
 
-    flow = ngx_quic_max_stream_flow(c);
+    flow = qs->acked + qc->conf->stream_buffer_size - c->sent;
+
     if (flow == 0) {
         wev->ready = 0;
         return in;
@@ -852,37 +867,15 @@
         limit = flow;
     }
 
-    in = ngx_quic_write_chain(pc, &qs->out, in, limit, 0, &n);
+    in = ngx_quic_write_chain(pc, &qs->out, in, limit,
+                              c->sent - qs->send_offset, &n);
     if (in == NGX_CHAIN_ERROR) {
         return NGX_CHAIN_ERROR;
     }
 
-    out = ngx_quic_read_chain(pc, &qs->out, n);
-    if (out == NGX_CHAIN_ERROR) {
-        return NGX_CHAIN_ERROR;
-    }
-
-    frame = ngx_quic_alloc_frame(pc);
-    if (frame == NULL) {
-        return NGX_CHAIN_ERROR;
-    }
-
-    frame->level = ssl_encryption_application;
-    frame->type = NGX_QUIC_FT_STREAM;
-    frame->data = out;
-    frame->u.stream.off = 1;
-    frame->u.stream.len = 1;
-    frame->u.stream.fin = 0;
-
-    frame->u.stream.stream_id = qs->id;
-    frame->u.stream.offset = c->sent;
-    frame->u.stream.length = n;
-
     c->sent += n;
     qc->streams.sent += n;
 
-    ngx_quic_queue_frame(qc, frame);
-
     if (flow == (off_t) n) {
         wev->ready = 0;
     }
@@ -890,61 +883,96 @@
     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
                    "quic send_chain sent:%uz", n);
 
+    if (ngx_quic_stream_flush(qs) != NGX_OK) {
+        return NGX_CHAIN_ERROR;
+    }
+
     return in;
 }
 
 
-static size_t
-ngx_quic_max_stream_flow(ngx_connection_t *c)
+static ngx_int_t
+ngx_quic_stream_flush(ngx_quic_stream_t *qs)
 {
-    size_t                  size;
-    uint64_t                sent, unacked;
-    ngx_quic_stream_t      *qs;
+    off_t                   limit;
+    size_t                  len;
+    ngx_uint_t              last;
+    ngx_chain_t            *out, *cl;
+    ngx_quic_frame_t       *frame;
+    ngx_connection_t       *pc;
     ngx_quic_connection_t  *qc;
 
-    qs = c->quic;
-    qc = ngx_quic_get_connection(qs->parent);
+    if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) {
+        return NGX_OK;
+    }
 
-    size = qc->conf->stream_buffer_size;
-    sent = c->sent;
-    unacked = sent - qs->acked;
+    pc = qs->parent;
+    qc = ngx_quic_get_connection(pc);
 
     if (qc->streams.send_max_data == 0) {
         qc->streams.send_max_data = qc->ctp.initial_max_data;
     }
 
-    if (unacked >= size) {
-        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic send flow hit buffer size");
-        return 0;
+    limit = ngx_min(qc->streams.send_max_data - qc->streams.send_offset,
+                    qs->send_max_data - qs->send_offset);
+
+    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
+                   "quic stream id:0x%xL flush limit:%O", qs->id, limit);
+
+    out = ngx_quic_read_chain(pc, &qs->out, limit);
+    if (out == NGX_CHAIN_ERROR) {
+        return NGX_ERROR;
     }
 
-    size -= unacked;
+    len = 0;
+    last = 0;
+
+    for (cl = out; cl; cl = cl->next) {
+        len += cl->buf->last - cl->buf->pos;
+    }
 
-    if (qc->streams.sent >= qc->streams.send_max_data) {
-        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic send flow hit MAX_DATA");
-        return 0;
+    if (qs->send_final_size != (uint64_t) -1
+        && qs->send_final_size == qs->send_offset + len)
+    {
+        qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT;
+        last = 1;
+    }
+
+    if (len == 0 && !last) {
+        return NGX_OK;
     }
 
-    if (qc->streams.sent + size > qc->streams.send_max_data) {
-        size = qc->streams.send_max_data - qc->streams.sent;
+    frame = ngx_quic_alloc_frame(pc);
+    if (frame == NULL) {
+        return NGX_ERROR;
     }
 
-    if (sent >= qs->send_max_data) {
-        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                       "quic send flow hit MAX_STREAM_DATA");
-        return 0;
+    frame->level = ssl_encryption_application;
+    frame->type = NGX_QUIC_FT_STREAM;
+    frame->data = out;
+
+    frame->u.stream.off = 1;
+    frame->u.stream.len = 1;
+    frame->u.stream.fin = last;
+
+    frame->u.stream.stream_id = qs->id;
+    frame->u.stream.offset = qs->send_offset;
+    frame->u.stream.length = len;
+
+    ngx_quic_queue_frame(qc, frame);
+
+    qs->send_offset += len;
+    qc->streams.send_offset += len;
+
+    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0,
+                   "quic stream id:0x%xL flush len:%uz last:%ui",
+                   qs->id, len, last);
+
+    if (qs->connection == NULL) {
+        return ngx_quic_close_stream(qs);
     }
 
-    if (sent + size > qs->send_max_data) {
-        size = qs->send_max_data - sent;
-    }
-
-    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic send flow:%uz", size);
-
-    return size;
+    return NGX_OK;
 }
 
 
@@ -953,40 +981,67 @@
 {
     ngx_connection_t *c = data;
 
+    ngx_quic_stream_t  *qs;
+
+    qs = c->quic;
+
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0,
+                   "quic stream id:0x%xL cleanup", qs->id);
+
+    if (ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN) != NGX_OK) {
+        ngx_quic_close_connection(c, NGX_ERROR);
+        return;
+    }
+
+    qs->connection = NULL;
+
+    if (ngx_quic_close_stream(qs) != NGX_OK) {
+        ngx_quic_close_connection(c, NGX_ERROR);
+        return;
+    }
+}
+
+
+static ngx_int_t
+ngx_quic_close_stream(ngx_quic_stream_t *qs)
+{
     ngx_connection_t       *pc;
     ngx_quic_frame_t       *frame;
-    ngx_quic_stream_t      *qs;
     ngx_quic_connection_t  *qc;
 
-    qs = c->quic;
     pc = qs->parent;
     qc = ngx_quic_get_connection(pc);
 
-    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic stream id:0x%xL cleanup", qs->id);
+    if (!qc->closing) {
+        /* make sure everything is sent and final size is received */
+
+        if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV
+            || qs->send_state == NGX_QUIC_STREAM_SEND_READY
+            || qs->send_state == NGX_QUIC_STREAM_SEND_SEND)
+        {
+            return NGX_OK;
+        }
+    }
+
+    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
+                   "quic stream id:0x%xL close", qs->id);
+
+    ngx_quic_free_chain(pc, qs->in);
+    ngx_quic_free_chain(pc, qs->out);
 
     ngx_rbtree_delete(&qc->streams.tree, &qs->node);
-    ngx_quic_free_chain(pc, qs->in);
-    ngx_quic_free_chain(pc, qs->out);
+    ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
 
     if (qc->closing) {
         /* schedule handler call to continue ngx_quic_close_connection() */
         ngx_post_event(pc->read, &ngx_posted_events);
-        return;
+        return NGX_OK;
     }
 
-    if (qc->error) {
-        goto done;
-    }
-
-    (void) ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN);
-
-    (void) ngx_quic_update_flow(c, qs->recv_last);
-
     if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0) {
         frame = ngx_quic_alloc_frame(pc);
         if (frame == NULL) {
-            goto done;
+            return NGX_ERROR;
         }
 
         frame->level = ssl_encryption_application;
@@ -1004,13 +1059,11 @@
         ngx_quic_queue_frame(qc, frame);
     }
 
-done:
-
-    (void) ngx_quic_output(pc);
-
     if (qc->shutdown) {
         ngx_post_event(pc->read, &ngx_posted_events);
     }
+
+    return NGX_OK;
 }
 
 
@@ -1020,7 +1073,6 @@
 {
     size_t                    size;
     uint64_t                  last;
-    ngx_connection_t         *sc;
     ngx_quic_stream_t        *qs;
     ngx_quic_connection_t    *qc;
     ngx_quic_stream_frame_t  *f;
@@ -1048,19 +1100,17 @@
         return NGX_OK;
     }
 
-    sc = qs->connection;
-
     if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV
         && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN)
     {
         return NGX_OK;
     }
 
-    if (ngx_quic_control_flow(sc, last) != NGX_OK) {
+    if (ngx_quic_control_flow(qs, last) != NGX_OK) {
         return NGX_ERROR;
     }
 
-    if (qs->final_size != (uint64_t) -1 && last > qs->final_size) {
+    if (qs->recv_final_size != (uint64_t) -1 && last > qs->recv_final_size) {
         qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
         return NGX_ERROR;
     }
@@ -1075,7 +1125,8 @@
     }
 
     if (f->fin) {
-        if (qs->final_size != (uint64_t) -1 && qs->final_size != last) {
+        if (qs->recv_final_size != (uint64_t) -1 && qs->recv_final_size != last)
+        {
             qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
             return NGX_ERROR;
         }
@@ -1085,7 +1136,7 @@
             return NGX_ERROR;
         }
 
-        qs->final_size = last;
+        qs->recv_final_size = last;
         qs->recv_state = NGX_QUIC_STREAM_RECV_SIZE_KNOWN;
     }
 
@@ -1099,13 +1150,17 @@
     qs->recv_size += size;
 
     if (qs->recv_state == NGX_QUIC_STREAM_RECV_SIZE_KNOWN
-        && qs->recv_size == qs->final_size)
+        && qs->recv_size == qs->recv_final_size)
     {
         qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_RECVD;
     }
 
+    if (qs->connection == NULL) {
+        return ngx_quic_close_stream(qs);
+    }
+
     if (f->offset == qs->recv_offset) {
-        ngx_quic_set_event(sc->read);
+        ngx_quic_set_event(qs->connection->read);
     }
 
     return NGX_OK;
@@ -1128,20 +1183,26 @@
         return NGX_OK;
     }
 
-    if (tree->root != tree->sentinel
-        && qc->streams.sent >= qc->streams.send_max_data)
+    if (tree->root == tree->sentinel
+        || qc->streams.send_offset < qc->streams.send_max_data)
     {
-
-        for (node = ngx_rbtree_min(tree->root, tree->sentinel);
-             node;
-             node = ngx_rbtree_next(tree, node))
-        {
-            qs = (ngx_quic_stream_t *) node;
-            ngx_quic_set_event(qs->connection->write);
-        }
+        /* not blocked on MAX_DATA */
+        qc->streams.send_max_data = f->max_data;
+        return NGX_OK;
     }
 
     qc->streams.send_max_data = f->max_data;
+    node = ngx_rbtree_min(tree->root, tree->sentinel);
+
+    while (node && qc->streams.send_offset < qc->streams.send_max_data) {
+
+        qs = (ngx_quic_stream_t *) node;
+        node = ngx_rbtree_next(tree, node);
+
+        if (ngx_quic_stream_flush(qs) != NGX_OK) {
+            return NGX_ERROR;
+        }
+    }
 
     return NGX_OK;
 }
@@ -1189,7 +1250,7 @@
         return NGX_OK;
     }
 
-    return ngx_quic_update_max_stream_data(qs->connection);
+    return ngx_quic_update_max_stream_data(qs);
 }
 
 
@@ -1197,7 +1258,6 @@
 ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
     ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f)
 {
-    uint64_t                sent;
     ngx_quic_stream_t      *qs;
     ngx_quic_connection_t  *qc;
 
@@ -1224,15 +1284,15 @@
         return NGX_OK;
     }
 
-    sent = qs->connection->sent;
-
-    if (sent >= qs->send_max_data) {
-        ngx_quic_set_event(qs->connection->write);
+    if (qs->send_offset < qs->send_max_data) {
+        /* not blocked on MAX_STREAM_DATA */
+        qs->send_max_data = f->limit;
+        return NGX_OK;
     }
 
     qs->send_max_data = f->limit;
 
-    return NGX_OK;
+    return ngx_quic_stream_flush(qs);
 }
 
 
@@ -1240,7 +1300,6 @@
 ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
     ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f)
 {
-    ngx_connection_t       *sc;
     ngx_quic_stream_t      *qs;
     ngx_quic_connection_t  *qc;
 
@@ -1271,13 +1330,13 @@
 
     qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
 
-    sc = qs->connection;
-
-    if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
+    if (ngx_quic_control_flow(qs, f->final_size) != NGX_OK) {
         return NGX_ERROR;
     }
 
-    if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) {
+    if (qs->recv_final_size != (uint64_t) -1
+        && qs->recv_final_size != f->final_size)
+    {
         qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
         return NGX_ERROR;
     }
@@ -1287,12 +1346,16 @@
         return NGX_ERROR;
     }
 
-    qs->final_size = f->final_size;
+    qs->recv_final_size = f->final_size;
 
-    if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) {
+    if (ngx_quic_update_flow(qs, qs->recv_final_size) != NGX_OK) {
         return NGX_ERROR;
     }
 
+    if (qs->connection == NULL) {
+        return ngx_quic_close_stream(qs);
+    }
+
     ngx_quic_set_event(qs->connection->read);
 
     return NGX_OK;
@@ -1325,10 +1388,14 @@
         return NGX_OK;
     }
 
-    if (ngx_quic_reset_stream(qs->connection, f->error_code) != NGX_OK) {
+    if (ngx_quic_do_reset_stream(qs, f->error_code) != NGX_OK) {
         return NGX_ERROR;
     }
 
+    if (qs->connection == NULL) {
+        return ngx_quic_close_stream(qs);
+    }
+
     ngx_quic_set_event(qs->connection->write);
 
     return NGX_OK;
@@ -1378,30 +1445,37 @@
         return;
     }
 
+    if (qs->connection == NULL) {
+        qs->acked += f->u.stream.length;
+        return;
+    }
+
     sent = qs->connection->sent;
     unacked = sent - qs->acked;
+    qs->acked += f->u.stream.length;
 
-    if (unacked >= qc->conf->stream_buffer_size) {
-        ngx_quic_set_event(qs->connection->write);
+    ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "quic stream id:0x%xL ack len:%uL acked:%uL unacked:%uL",
+                   qs->id, f->u.stream.length, qs->acked, sent - qs->acked);
+
+    if (unacked != qc->conf->stream_buffer_size) {
+        /* not blocked on buffer size */
+        return;
     }
 
-    qs->acked += f->u.stream.length;
-
-    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, qs->connection->log, 0,
-                   "quic stream ack len:%uL acked:%uL unacked:%uL",
-                   f->u.stream.length, qs->acked, sent - qs->acked);
+    ngx_quic_set_event(qs->connection->write);
 }
 
 
 static ngx_int_t
-ngx_quic_control_flow(ngx_connection_t *c, uint64_t last)
+ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last)
 {
     uint64_t                len;
-    ngx_quic_stream_t      *qs;
+    ngx_connection_t       *pc;
     ngx_quic_connection_t  *qc;
 
-    qs = c->quic;
-    qc = ngx_quic_get_connection(qs->parent);
+    pc = qs->parent;
+    qc = ngx_quic_get_connection(pc);
 
     if (last <= qs->recv_last) {
         return NGX_OK;
@@ -1409,9 +1483,9 @@
 
     len = last - qs->recv_last;
 
-    ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic flow control msd:%uL/%uL md:%uL/%uL",
-                   last, qs->recv_max_data, qc->streams.recv_last + len,
+    ngx_log_debug5(NGX_LOG_DEBUG_EVENT, pc->log, 0,
+                   "quic stream id:0x%xL flow control msd:%uL/%uL md:%uL/%uL",
+                   qs->id, last, qs->recv_max_data, qc->streams.recv_last + len,
                    qc->streams.recv_max_data);
 
     qs->recv_last += len;
@@ -1435,14 +1509,12 @@
 
 
 static ngx_int_t
-ngx_quic_update_flow(ngx_connection_t *c, uint64_t last)
+ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last)
 {
     uint64_t                len;
     ngx_connection_t       *pc;
-    ngx_quic_stream_t      *qs;
     ngx_quic_connection_t  *qc;
 
-    qs = c->quic;
     pc = qs->parent;
     qc = ngx_quic_get_connection(pc);
 
@@ -1452,13 +1524,13 @@
 
     len = last - qs->recv_offset;
 
-    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic flow update %uL", last);
+    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
+                   "quic stream id:0x%xL flow update %uL", qs->id, last);
 
     qs->recv_offset += len;
 
     if (qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) {
-        if (ngx_quic_update_max_stream_data(c) != NGX_OK) {
+        if (ngx_quic_update_max_stream_data(qs) != NGX_OK) {
             return NGX_ERROR;
         }
     }
@@ -1478,15 +1550,13 @@
 
 
 static ngx_int_t
-ngx_quic_update_max_stream_data(ngx_connection_t *c)
+ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs)
 {
     uint64_t                recv_max_data;
     ngx_connection_t       *pc;
     ngx_quic_frame_t       *frame;
-    ngx_quic_stream_t      *qs;
     ngx_quic_connection_t  *qc;
 
-    qs = c->quic;
     pc = qs->parent;
     qc = ngx_quic_get_connection(pc);
 
@@ -1502,8 +1572,9 @@
 
     qs->recv_max_data = recv_max_data;
 
-    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                   "quic flow update msd:%uL", qs->recv_max_data);
+    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
+                   "quic stream id:0x%xL flow update msd:%uL",
+                   qs->id, qs->recv_max_data);
 
     frame = ngx_quic_alloc_frame(pc);
     if (frame == NULL) {
--- a/src/http/v3/ngx_http_v3_uni.c	Tue Feb 15 14:12:34 2022 +0300
+++ b/src/http/v3/ngx_http_v3_uni.c	Sat Feb 05 12:54:54 2022 +0300
@@ -295,8 +295,6 @@
 }
 
 
-/* XXX async & buffered stream writes */
-
 ngx_connection_t *
 ngx_http_v3_create_push_stream(ngx_connection_t *c, uint64_t push_id)
 {