[master] 91143910a Redo H/2 tx data handling
Dridi Boukelmoune
dridi.boukelmoune at gmail.com
Mon Aug 30 08:31:07 UTC 2021
commit 91143910a2ea2a7af7db390e02096786c7a78645
Author: Martin Blix Grydeland <martin at varnish-software.com>
Date: Tue Jun 22 11:49:24 2021 +0200
Redo H/2 tx data handling
This implements stream data handling using a buffer between the H/2
session thread and each stream thread. This is needed to avoid
head-of-line blocking on the session socket when a data frame is received
for a stream thread that is not yet ready to receive it.
The buffer has to be as large as the send window the peer expects at
the time the stream is opened. This will typically be 65535 bytes
unless the h2_initial_window_size parameter has been changed.
Stream window updates are then issued only once data has been removed
from the buffer, that is, when the request handling thread consumes the
request body. Each update is limited in size to the space then available
in the buffer.
diff --git a/bin/varnishd/http2/cache_http2.h b/bin/varnishd/http2/cache_http2.h
index ee91d6fac..a075f6305 100644
--- a/bin/varnishd/http2/cache_http2.h
+++ b/bin/varnishd/http2/cache_http2.h
@@ -115,6 +115,16 @@ enum h2_stream_e {
#define H2_FRAME_FLAGS(l,u,v) extern const uint8_t H2FF_##u;
#include "tbl/h2_frames.h"
+struct h2_rxbuf {
+ unsigned magic;
+#define H2_RXBUF_MAGIC 0x73f9fb27
+ unsigned size;
+ uint64_t tail;
+ uint64_t head;
+ struct stv_buffer *stvbuf;
+ uint8_t data[];
+};
+
struct h2_req {
unsigned magic;
#define H2_REQ_MAGIC 0x03411584
@@ -134,7 +144,7 @@ struct h2_req {
/* Where to wake this stream up */
struct worker *wrk;
- ssize_t reqbody_bytes;
+ struct h2_rxbuf *rxbuf;
VTAILQ_ENTRY(h2_req) tx_list;
h2_error error;
@@ -147,7 +157,6 @@ struct h2_sess {
#define H2_SESS_MAGIC 0xa16f7e4b
pthread_t rxthr;
- struct h2_req *mailcall;
pthread_cond_t *cond;
pthread_cond_t winupd_cond[1];
diff --git a/bin/varnishd/http2/cache_http2_proto.c b/bin/varnishd/http2/cache_http2_proto.c
index 61c97a894..155717fac 100644
--- a/bin/varnishd/http2/cache_http2_proto.c
+++ b/bin/varnishd/http2/cache_http2_proto.c
@@ -40,6 +40,7 @@
#include "cache/cache_filter.h"
#include "http2/cache_http2.h"
#include "cache/cache_objhead.h"
+#include "storage/storage.h"
#include "vend.h"
#include "vtcp.h"
@@ -172,6 +173,7 @@ h2_del_req(struct worker *wrk, struct h2_req *r2)
{
struct h2_sess *h2;
struct sess *sp;
+ struct stv_buffer *stvbuf;
CHECK_OBJ_NOTNULL(r2, H2_REQ_MAGIC);
AZ(r2->scheduled);
@@ -185,7 +187,18 @@ h2_del_req(struct worker *wrk, struct h2_req *r2)
/* XXX: PRIORITY reshuffle */
VTAILQ_REMOVE(&h2->streams, r2, list);
Lck_Unlock(&sp->mtx);
+
assert(!WS_IsReserved(r2->req->ws));
+ AZ(r2->req->ws->r);
+
+ CHECK_OBJ_ORNULL(r2->rxbuf, H2_RXBUF_MAGIC);
+ if (r2->rxbuf) {
+ stvbuf = r2->rxbuf->stvbuf;
+ r2->rxbuf = NULL;
+ STV_FreeBuf(wrk, &stvbuf);
+ AZ(stvbuf);
+ }
+
Req_Cleanup(sp, wrk, r2->req);
Req_Release(r2->req);
}
@@ -537,10 +550,6 @@ h2_do_req(struct worker *wrk, void *priv)
r2->scheduled = 0;
r2->state = H2_S_CLOSED;
r2->h2sess->do_sweep = 1;
- if (h2->mailcall == r2) {
- h2->mailcall = NULL;
- AZ(pthread_cond_signal(h2->cond));
- }
Lck_Unlock(&h2->sess->mtx);
}
THR_SetRequest(NULL);
@@ -582,8 +591,16 @@ h2_end_headers(struct worker *wrk, struct h2_sess *h2,
if (req->req_body_status == NULL) {
if (cl == -1)
req->req_body_status = BS_EOF;
- else
+ else {
+ /* Note: If cl==0 here, we still need to have
+ * req_body_status==BS_LENGTH, so that there will
+ * be a wait for the stream to reach H2_S_CLOS_REM
+ * while dealing with the request body. */
req->req_body_status = BS_LENGTH;
+ }
+ /* Set req->htc->content_length because this is used as
+ * the hint in vrb_pull() for how large the storage
+ * buffers need to be */
req->htc->content_length = cl;
} else {
/* A HEADER frame contained END_STREAM */
@@ -742,78 +759,181 @@ h2_rx_continuation(struct worker *wrk, struct h2_sess *h2, struct h2_req *r2)
static h2_error v_matchproto_(h2_rxframe_f)
h2_rx_data(struct worker *wrk, struct h2_sess *h2, struct h2_req *r2)
{
- int w1 = 0, w2 = 0;
char buf[4];
- unsigned wi;
- ssize_t cl;
+ uint64_t l, l2, head;
+ const uint8_t *src;
+
+ /* XXX: Shouldn't error handling, setting of r2->error and
+ * r2->cond signalling be handled more generally at the end of
+ * procframe()??? */
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
ASSERT_RXTHR(h2);
CHECK_OBJ_ORNULL(r2, H2_REQ_MAGIC);
- if (r2 == NULL || !r2->scheduled)
+ if (r2 == NULL)
return (0);
+
if (r2->state >= H2_S_CLOS_REM) {
r2->error = H2SE_STREAM_CLOSED;
return (H2SE_STREAM_CLOSED); // rfc7540,l,1766,1769
}
+
Lck_Lock(&h2->sess->mtx);
- while (h2->mailcall != NULL && h2->error == 0 && r2->error == 0)
- AZ(Lck_CondWait(h2->cond, &h2->sess->mtx, 0));
+ CHECK_OBJ_ORNULL(r2->rxbuf, H2_RXBUF_MAGIC);
+
if (h2->error || r2->error) {
+ if (r2->cond)
+ AZ(pthread_cond_signal(r2->cond));
Lck_Unlock(&h2->sess->mtx);
return (h2->error ? h2->error : r2->error);
}
- r2->reqbody_bytes += h2->rxf_len;
- if (h2->rxf_flags & H2FF_DATA_END_STREAM)
- r2->state = H2_S_CLOS_REM;
- cl = r2->req->htc->content_length;
- if (cl >= 0 && (r2->reqbody_bytes > cl ||
- (r2->state >= H2_S_CLOS_REM && r2->reqbody_bytes != cl))) {
- VSLb(h2->vsl, SLT_Debug,
- "H2: stream %u: Received data and Content-Length"
- " mismatch", h2->rxf_stream);
- r2->error = H2SE_PROTOCOL_ERROR; // rfc7540,l,3150,3163
+ /* Check against the Content-Length header if given */
+ if (r2->req->htc->content_length >= 0) {
+ if (r2->rxbuf)
+ l = r2->rxbuf->head;
+ else
+ l = 0;
+ l += h2->rxf_len;
+ if (l > r2->req->htc->content_length ||
+ ((h2->rxf_flags & H2FF_DATA_END_STREAM) &&
+ l != r2->req->htc->content_length)) {
+ VSLb(h2->vsl, SLT_Debug,
+ "H2: stream %u: Received data and Content-Length"
+ " mismatch", h2->rxf_stream);
+ r2->error = H2SE_PROTOCOL_ERROR;
+ if (r2->cond)
+ AZ(pthread_cond_signal(r2->cond));
+ Lck_Unlock(&h2->sess->mtx);
+ return (H2SE_PROTOCOL_ERROR);
+ }
+ }
+
+ /* Handle zero size frame before starting to allocate buffers */
+ if (h2->rxf_len == 0) {
+ if (h2->rxf_flags & H2FF_DATA_END_STREAM)
+ r2->state = H2_S_CLOS_REM;
if (r2->cond)
AZ(pthread_cond_signal(r2->cond));
Lck_Unlock(&h2->sess->mtx);
- return (H2SE_PROTOCOL_ERROR);
+ return (0);
}
- AZ(h2->mailcall);
- h2->mailcall = r2;
+ /* Check and charge connection window */
+ if (h2->rxf_len > h2->req0->r_window) {
+ VSLb(h2->vsl, SLT_Debug,
+ "H2: stream %u: Exceeded connection receive window",
+ h2->rxf_stream);
+ r2->error = H2CE_FLOW_CONTROL_ERROR;
+ if (r2->cond)
+ AZ(pthread_cond_signal(r2->cond));
+ Lck_Unlock(&h2->sess->mtx);
+ return (H2CE_FLOW_CONTROL_ERROR);
+ }
h2->req0->r_window -= h2->rxf_len;
- r2->r_window -= h2->rxf_len;
- // req_bodybytes accounted in CNT code.
- if (r2->cond)
- AZ(pthread_cond_signal(r2->cond));
- while (h2->mailcall != NULL && h2->error == 0 && r2->error == 0)
- AZ(Lck_CondWait(h2->cond, &h2->sess->mtx, 0));
- wi = cache_param->h2_rx_window_increment;
if (h2->req0->r_window < cache_param->h2_rx_window_low_water) {
- h2->req0->r_window += wi;
- w1 = 1;
+ h2->req0->r_window += cache_param->h2_rx_window_increment;
+ vbe32enc(buf, cache_param->h2_rx_window_increment);
+ Lck_Unlock(&h2->sess->mtx);
+ H2_Send_Get(wrk, h2, h2->req0);
+ H2_Send_Frame(wrk, h2, H2_F_WINDOW_UPDATE, 0, 4, 0, buf);
+ H2_Send_Rel(h2, h2->req0);
+ Lck_Lock(&h2->sess->mtx);
}
- if (r2->r_window < cache_param->h2_rx_window_low_water) {
- r2->r_window += wi;
- w2 = 1;
+
+ /* Check stream window */
+ if (h2->rxf_len > r2->r_window) {
+ VSLb(h2->vsl, SLT_Debug,
+ "H2: stream %u: Exceeded stream receive window",
+ h2->rxf_stream);
+ r2->error = H2SE_FLOW_CONTROL_ERROR;
+ if (r2->cond)
+ AZ(pthread_cond_signal(r2->cond));
+ Lck_Unlock(&h2->sess->mtx);
+ return (H2SE_FLOW_CONTROL_ERROR);
}
+ /* Make the buffer on demand */
+ if (r2->rxbuf == NULL) {
+ unsigned bufsize;
+ size_t bstest;
+ struct stv_buffer *stvbuf;
+ struct h2_rxbuf *rxbuf;
- Lck_Unlock(&h2->sess->mtx);
+ Lck_Unlock(&h2->sess->mtx);
- if (w1 || w2) {
- vbe32enc(buf, wi);
- H2_Send_Get(wrk, h2, h2->req0);
- if (w1)
- H2_Send_Frame(wrk, h2, H2_F_WINDOW_UPDATE, 0,
- 4, 0, buf);
- if (w2)
- H2_Send_Frame(wrk, h2, H2_F_WINDOW_UPDATE, 0,
- 4, r2->stream, buf);
- H2_Send_Rel(h2, h2->req0);
+ bufsize = r2->r_window;
+ /* This is the first data frame, so r_window will be the
+ * initial window size. */
+ assert(bufsize > 0);
+ if ((h2->rxf_flags & H2FF_DATA_END_STREAM) &&
+ bufsize > h2->rxf_len)
+ /* Cap the buffer size when we know this is the
+ * single data frame. */
+ bufsize = h2->rxf_len;
+ stvbuf = STV_AllocBuf(wrk, stv_transient,
+ bufsize + sizeof *rxbuf);
+ if (stvbuf == NULL) {
+ VSLb(h2->vsl, SLT_Debug,
+ "H2: stream %u: Failed to allocate request body"
+ " buffer",
+ h2->rxf_stream);
+ Lck_Lock(&h2->sess->mtx);
+ r2->error = H2SE_INTERNAL_ERROR;
+ if (r2->cond)
+ AZ(pthread_cond_signal(r2->cond));
+ Lck_Unlock(&h2->sess->mtx);
+ return (H2SE_INTERNAL_ERROR);
+ }
+ rxbuf = STV_GetBufPtr(stvbuf, &bstest);
+ AN(rxbuf);
+ assert(bstest >= bufsize + sizeof *rxbuf);
+ assert(PAOK(rxbuf));
+ INIT_OBJ(rxbuf, H2_RXBUF_MAGIC);
+ rxbuf->size = bufsize;
+ rxbuf->stvbuf = stvbuf;
+
+ r2->rxbuf = rxbuf;
+
+ Lck_Lock(&h2->sess->mtx);
}
+
+ CHECK_OBJ_NOTNULL(r2->rxbuf, H2_RXBUF_MAGIC);
+ assert(r2->rxbuf->tail <= r2->rxbuf->head);
+ l = r2->rxbuf->head - r2->rxbuf->tail;
+ assert(l <= r2->rxbuf->size);
+ l = r2->rxbuf->size - l;
+ assert(h2->rxf_len <= l); /* Stream window handling ensures
+ * this */
+
+ Lck_Unlock(&h2->sess->mtx);
+
+ src = h2->rxf_data;
+ l = h2->rxf_len;
+ head = r2->rxbuf->head;
+ do {
+ l2 = l;
+ if ((head % r2->rxbuf->size) + l2 > r2->rxbuf->size)
+ l2 = r2->rxbuf->size - (head % r2->rxbuf->size);
+ assert(l2 > 0);
+ memcpy(&r2->rxbuf->data[head % r2->rxbuf->size], src, l2);
+ src += l2;
+ head += l2;
+ l -= l2;
+ } while (l > 0);
+
+ Lck_Lock(&h2->sess->mtx);
+ /* Charge stream window */
+ r2->r_window -= h2->rxf_len;
+ r2->rxbuf->head += h2->rxf_len;
+ assert(r2->rxbuf->tail <= r2->rxbuf->head);
+ if (h2->rxf_flags & H2FF_DATA_END_STREAM)
+ r2->state = H2_S_CLOS_REM;
+ if (r2->cond)
+ AZ(pthread_cond_signal(r2->cond));
+ Lck_Unlock(&h2->sess->mtx);
+
return (0);
}
@@ -822,8 +942,10 @@ h2_vfp_body(struct vfp_ctx *vc, struct vfp_entry *vfe, void *ptr, ssize_t *lp)
{
struct h2_req *r2;
struct h2_sess *h2;
- unsigned l;
enum vfp_status retval;
+ uint64_t l, l2, tail;
+ uint8_t *dst;
+ char buf[4];
CHECK_OBJ_NOTNULL(vc, VFP_CTX_MAGIC);
CHECK_OBJ_NOTNULL(vfe, VFP_ENTRY_MAGIC);
@@ -832,40 +954,87 @@ h2_vfp_body(struct vfp_ctx *vc, struct vfp_entry *vfe, void *ptr, ssize_t *lp)
AN(ptr);
AN(lp);
- l = *lp;
- *lp = 0;
+ assert(*lp >= 0);
Lck_Lock(&h2->sess->mtx);
+
r2->cond = &vc->wrk->cond;
- while (h2->mailcall != r2 && h2->error == 0 && r2->error == 0)
- AZ(Lck_CondWait(r2->cond, &h2->sess->mtx, 0));
- r2->cond = NULL;
- if (h2->error || r2->error) {
- retval = VFP_ERROR;
- } else {
- assert(h2->mailcall == r2);
- if (l > h2->rxf_len)
- l = h2->rxf_len;
- if (l > 0) {
- memcpy(ptr, h2->rxf_data, l);
- h2->rxf_data += l;
- h2->rxf_len -= l;
- }
- *lp = l;
- if (h2->rxf_len > 0) {
- /* We ran out of storage: Have VFP call us
- * again with a fresh buffer */
- Lck_Unlock(&h2->sess->mtx);
- return (VFP_OK);
- }
- if (h2->rxf_len == 0 && r2->state >= H2_S_CLOS_REM)
+ while (1) {
+ CHECK_OBJ_ORNULL(r2->rxbuf, H2_RXBUF_MAGIC);
+ if (r2->rxbuf) {
+ assert(r2->rxbuf->tail <= r2->rxbuf->head);
+ l = r2->rxbuf->head - r2->rxbuf->tail;
+ } else
+ l = 0;
+
+ if (h2->error || r2->error)
+ retval = VFP_ERROR;
+ else if (r2->state >= H2_S_CLOS_REM && l <= *lp)
retval = VFP_END;
- else
+ else {
+ if (l > *lp)
+ l = *lp;
retval = VFP_OK;
- h2->mailcall = NULL;
- AZ(pthread_cond_signal(h2->cond));
+ }
+
+ if (retval != VFP_OK || l > 0)
+ break;
+
+ /* XXX: Timeout */
+ AZ(Lck_CondWait(r2->cond, &h2->sess->mtx, 0));
}
+ r2->cond = NULL;
+
+ Lck_Unlock(&h2->sess->mtx);
+
+ if (l == 0 || retval == VFP_ERROR) {
+ *lp = 0;
+ return (retval);
+ }
+
+ *lp = l;
+ dst = ptr;
+ tail = r2->rxbuf->tail;
+ do {
+ l2 = l;
+ if ((tail % r2->rxbuf->size) + l2 > r2->rxbuf->size)
+ l2 = r2->rxbuf->size - (tail % r2->rxbuf->size);
+ assert(l2 > 0);
+ memcpy(dst, &r2->rxbuf->data[tail % r2->rxbuf->size], l2);
+ dst += l2;
+ tail += l2;
+ l -= l2;
+ } while (l > 0);
+
+ Lck_Lock(&h2->sess->mtx);
+
+ CHECK_OBJ_NOTNULL(r2->rxbuf, H2_RXBUF_MAGIC);
+ r2->rxbuf->tail = tail;
+ assert(r2->rxbuf->tail <= r2->rxbuf->head);
+
+ if (r2->r_window < cache_param->h2_rx_window_low_water &&
+ r2->state < H2_S_CLOS_REM) {
+ /* l is free buffer space */
+ /* l2 is calculated window increment */
+ l = r2->rxbuf->size - (r2->rxbuf->head - r2->rxbuf->tail);
+ assert(r2->r_window <= l);
+ l2 = cache_param->h2_rx_window_increment;
+ if (r2->r_window + l2 > l)
+ l2 = l - r2->r_window;
+ r2->r_window += l2;
+ } else
+ l2 = 0;
+
Lck_Unlock(&h2->sess->mtx);
+
+ if (l2 > 0) {
+ vbe32enc(buf, l2);
+ H2_Send_Get(vc->wrk, h2, r2);
+ H2_Send_Frame(vc->wrk, h2, H2_F_WINDOW_UPDATE, 0, 4,
+ r2->stream, buf);
+ H2_Send_Rel(h2, r2);
+ }
+
return (retval);
}
@@ -874,6 +1043,7 @@ h2_vfp_body_fini(struct vfp_ctx *vc, struct vfp_entry *vfe)
{
struct h2_req *r2;
struct h2_sess *h2;
+ struct stv_buffer *stvbuf = NULL;
CHECK_OBJ_NOTNULL(vc, VFP_CTX_MAGIC);
CHECK_OBJ_NOTNULL(vfe, VFP_ENTRY_MAGIC);
@@ -889,11 +1059,21 @@ h2_vfp_body_fini(struct vfp_ctx *vc, struct vfp_entry *vfe)
H2_Send_Rel(h2, r2);
Lck_Lock(&h2->sess->mtx);
r2->error = H2SE_REFUSED_STREAM;
- if (h2->mailcall == r2) {
- h2->mailcall = NULL;
- AZ(pthread_cond_signal(h2->cond));
+ Lck_Unlock(&h2->sess->mtx);
+ }
+
+ if (r2->state >= H2_S_CLOS_REM && r2->rxbuf != NULL) {
+ Lck_Lock(&h2->sess->mtx);
+ CHECK_OBJ_ORNULL(r2->rxbuf, H2_RXBUF_MAGIC);
+ if (r2->rxbuf != NULL) {
+ stvbuf = r2->rxbuf->stvbuf;
+ r2->rxbuf = NULL;
}
Lck_Unlock(&h2->sess->mtx);
+ if (stvbuf != NULL) {
+ STV_FreeBuf(vc->wrk, &stvbuf);
+ AZ(stvbuf);
+ }
}
}
More information about the varnish-commit
mailing list