[master] 91143910a Redo H/2 tx data handling

Dridi Boukelmoune dridi.boukelmoune at gmail.com
Mon Aug 30 08:31:07 UTC 2021


commit 91143910a2ea2a7af7db390e02096786c7a78645
Author: Martin Blix Grydeland <martin at varnish-software.com>
Date:   Tue Jun 22 11:49:24 2021 +0200

    Redo H/2 tx data handling
    
    This implements stream data handling using a buffer between the H/2
    session thread and each stream thread. This is needed to avoid head of
    line blocking on the session socket when a data frame is received for a
    stream thread that is not yet ready to receive it.
    
    The buffer used will have to be as large as the send window the peer
    expects at the time the stream is opened. This will typically be 65535
    unless the h2_initial_window_size parameter has been changed.
    
    Stream window updates will then be issued only once data is removed from
    the buffer by the request body being consumed from the request handling
    thread, limited in size to what space is then available in the buffer.

diff --git a/bin/varnishd/http2/cache_http2.h b/bin/varnishd/http2/cache_http2.h
index ee91d6fac..a075f6305 100644
--- a/bin/varnishd/http2/cache_http2.h
+++ b/bin/varnishd/http2/cache_http2.h
@@ -115,6 +115,16 @@ enum h2_stream_e {
 #define H2_FRAME_FLAGS(l,u,v)   extern const uint8_t H2FF_##u;
 #include "tbl/h2_frames.h"
 
+struct h2_rxbuf {
+	unsigned			magic;
+#define H2_RXBUF_MAGIC			0x73f9fb27
+	unsigned			size;
+	uint64_t			tail;
+	uint64_t			head;
+	struct stv_buffer		*stvbuf;
+	uint8_t				data[];
+};
+
 struct h2_req {
 	unsigned			magic;
 #define H2_REQ_MAGIC			0x03411584
@@ -134,7 +144,7 @@ struct h2_req {
 	/* Where to wake this stream up */
 	struct worker			*wrk;
 
-	ssize_t				reqbody_bytes;
+	struct h2_rxbuf			*rxbuf;
 
 	VTAILQ_ENTRY(h2_req)		tx_list;
 	h2_error			error;
@@ -147,7 +157,6 @@ struct h2_sess {
 #define H2_SESS_MAGIC			0xa16f7e4b
 
 	pthread_t			rxthr;
-	struct h2_req			*mailcall;
 	pthread_cond_t			*cond;
 	pthread_cond_t			winupd_cond[1];
 
diff --git a/bin/varnishd/http2/cache_http2_proto.c b/bin/varnishd/http2/cache_http2_proto.c
index 61c97a894..155717fac 100644
--- a/bin/varnishd/http2/cache_http2_proto.c
+++ b/bin/varnishd/http2/cache_http2_proto.c
@@ -40,6 +40,7 @@
 #include "cache/cache_filter.h"
 #include "http2/cache_http2.h"
 #include "cache/cache_objhead.h"
+#include "storage/storage.h"
 
 #include "vend.h"
 #include "vtcp.h"
@@ -172,6 +173,7 @@ h2_del_req(struct worker *wrk, struct h2_req *r2)
 {
 	struct h2_sess *h2;
 	struct sess *sp;
+	struct stv_buffer *stvbuf;
 
 	CHECK_OBJ_NOTNULL(r2, H2_REQ_MAGIC);
 	AZ(r2->scheduled);
@@ -185,7 +187,18 @@ h2_del_req(struct worker *wrk, struct h2_req *r2)
 	/* XXX: PRIORITY reshuffle */
 	VTAILQ_REMOVE(&h2->streams, r2, list);
 	Lck_Unlock(&sp->mtx);
+
 	assert(!WS_IsReserved(r2->req->ws));
+	AZ(r2->req->ws->r);
+
+	CHECK_OBJ_ORNULL(r2->rxbuf, H2_RXBUF_MAGIC);
+	if (r2->rxbuf) {
+		stvbuf = r2->rxbuf->stvbuf;
+		r2->rxbuf = NULL;
+		STV_FreeBuf(wrk, &stvbuf);
+		AZ(stvbuf);
+	}
+
 	Req_Cleanup(sp, wrk, r2->req);
 	Req_Release(r2->req);
 }
@@ -537,10 +550,6 @@ h2_do_req(struct worker *wrk, void *priv)
 		r2->scheduled = 0;
 		r2->state = H2_S_CLOSED;
 		r2->h2sess->do_sweep = 1;
-		if (h2->mailcall == r2) {
-			h2->mailcall = NULL;
-			AZ(pthread_cond_signal(h2->cond));
-		}
 		Lck_Unlock(&h2->sess->mtx);
 	}
 	THR_SetRequest(NULL);
@@ -582,8 +591,16 @@ h2_end_headers(struct worker *wrk, struct h2_sess *h2,
 	if (req->req_body_status == NULL) {
 		if (cl == -1)
 			req->req_body_status = BS_EOF;
-		else
+		else {
+			/* Note: If cl==0 here, we still need to have
+			 * req_body_status==BS_LENGTH, so that there will
+			 * be a wait for the stream to reach H2_S_CLOS_REM
+			 * while dealing with the request body. */
 			req->req_body_status = BS_LENGTH;
+		}
+		/* Set req->htc->content_length because this is used as
+		 * the hint in vrb_pull() for how large the storage
+		 * buffers need to be */
 		req->htc->content_length = cl;
 	} else {
 		/* A HEADER frame contained END_STREAM */
@@ -742,78 +759,181 @@ h2_rx_continuation(struct worker *wrk, struct h2_sess *h2, struct h2_req *r2)
 static h2_error v_matchproto_(h2_rxframe_f)
 h2_rx_data(struct worker *wrk, struct h2_sess *h2, struct h2_req *r2)
 {
-	int w1 = 0, w2 = 0;
 	char buf[4];
-	unsigned wi;
-	ssize_t cl;
+	uint64_t l, l2, head;
+	const uint8_t *src;
+
+	/* XXX: Shouldn't error handling, setting of r2->error and
+	 * r2->cond signalling be handled more generally at the end of
+	 * procframe()??? */
 
 	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
 	ASSERT_RXTHR(h2);
 	CHECK_OBJ_ORNULL(r2, H2_REQ_MAGIC);
 
-	if (r2 == NULL || !r2->scheduled)
+	if (r2 == NULL)
 		return (0);
+
 	if (r2->state >= H2_S_CLOS_REM) {
 		r2->error = H2SE_STREAM_CLOSED;
 		return (H2SE_STREAM_CLOSED); // rfc7540,l,1766,1769
 	}
+
 	Lck_Lock(&h2->sess->mtx);
-	while (h2->mailcall != NULL && h2->error == 0 && r2->error == 0)
-		AZ(Lck_CondWait(h2->cond, &h2->sess->mtx, 0));
+	CHECK_OBJ_ORNULL(r2->rxbuf, H2_RXBUF_MAGIC);
+
 	if (h2->error || r2->error) {
+		if (r2->cond)
+			AZ(pthread_cond_signal(r2->cond));
 		Lck_Unlock(&h2->sess->mtx);
 		return (h2->error ? h2->error : r2->error);
 	}
 
-	r2->reqbody_bytes += h2->rxf_len;
-	if (h2->rxf_flags & H2FF_DATA_END_STREAM)
-		r2->state = H2_S_CLOS_REM;
-	cl = r2->req->htc->content_length;
-	if (cl >= 0 && (r2->reqbody_bytes > cl ||
-	      (r2->state >= H2_S_CLOS_REM && r2->reqbody_bytes != cl))) {
-		VSLb(h2->vsl, SLT_Debug,
-		    "H2: stream %u: Received data and Content-Length"
-		    " mismatch", h2->rxf_stream);
-		r2->error = H2SE_PROTOCOL_ERROR; // rfc7540,l,3150,3163
+	/* Check against the Content-Length header if given */
+	if (r2->req->htc->content_length >= 0) {
+		if (r2->rxbuf)
+			l = r2->rxbuf->head;
+		else
+			l = 0;
+		l += h2->rxf_len;
+		if (l > r2->req->htc->content_length ||
+		    ((h2->rxf_flags & H2FF_DATA_END_STREAM) &&
+		     l != r2->req->htc->content_length)) {
+			VSLb(h2->vsl, SLT_Debug,
+			    "H2: stream %u: Received data and Content-Length"
+			    " mismatch", h2->rxf_stream);
+			r2->error = H2SE_PROTOCOL_ERROR;
+			if (r2->cond)
+				AZ(pthread_cond_signal(r2->cond));
+			Lck_Unlock(&h2->sess->mtx);
+			return (H2SE_PROTOCOL_ERROR);
+		}
+	}
+
+	/* Handle zero size frame before starting to allocate buffers */
+	if (h2->rxf_len == 0) {
+		if (h2->rxf_flags & H2FF_DATA_END_STREAM)
+			r2->state = H2_S_CLOS_REM;
 		if (r2->cond)
 			AZ(pthread_cond_signal(r2->cond));
 		Lck_Unlock(&h2->sess->mtx);
-		return (H2SE_PROTOCOL_ERROR);
+		return (0);
 	}
 
-	AZ(h2->mailcall);
-	h2->mailcall = r2;
+	/* Check and charge connection window */
+	if (h2->rxf_len > h2->req0->r_window) {
+		VSLb(h2->vsl, SLT_Debug,
+		    "H2: stream %u: Exceeded connection receive window",
+		    h2->rxf_stream);
+		r2->error = H2CE_FLOW_CONTROL_ERROR;
+		if (r2->cond)
+			AZ(pthread_cond_signal(r2->cond));
+		Lck_Unlock(&h2->sess->mtx);
+		return (H2CE_FLOW_CONTROL_ERROR);
+	}
 	h2->req0->r_window -= h2->rxf_len;
-	r2->r_window -= h2->rxf_len;
-	// req_bodybytes accounted in CNT code.
-	if (r2->cond)
-		AZ(pthread_cond_signal(r2->cond));
-	while (h2->mailcall != NULL && h2->error == 0 && r2->error == 0)
-		AZ(Lck_CondWait(h2->cond, &h2->sess->mtx, 0));
-	wi = cache_param->h2_rx_window_increment;
 	if (h2->req0->r_window < cache_param->h2_rx_window_low_water) {
-		h2->req0->r_window += wi;
-		w1 = 1;
+		h2->req0->r_window += cache_param->h2_rx_window_increment;
+		vbe32enc(buf, cache_param->h2_rx_window_increment);
+		Lck_Unlock(&h2->sess->mtx);
+		H2_Send_Get(wrk, h2, h2->req0);
+		H2_Send_Frame(wrk, h2, H2_F_WINDOW_UPDATE, 0, 4, 0, buf);
+		H2_Send_Rel(h2, h2->req0);
+		Lck_Lock(&h2->sess->mtx);
 	}
-	if (r2->r_window < cache_param->h2_rx_window_low_water) {
-		r2->r_window += wi;
-		w2 = 1;
+
+	/* Check stream window */
+	if (h2->rxf_len > r2->r_window) {
+		VSLb(h2->vsl, SLT_Debug,
+		    "H2: stream %u: Exceeded stream receive window",
+		    h2->rxf_stream);
+		r2->error = H2SE_FLOW_CONTROL_ERROR;
+		if (r2->cond)
+			AZ(pthread_cond_signal(r2->cond));
+		Lck_Unlock(&h2->sess->mtx);
+		return (H2SE_FLOW_CONTROL_ERROR);
 	}
 
+	/* Make the buffer on demand */
+	if (r2->rxbuf == NULL) {
+		unsigned bufsize;
+		size_t bstest;
+		struct stv_buffer *stvbuf;
+		struct h2_rxbuf *rxbuf;
 
-	Lck_Unlock(&h2->sess->mtx);
+		Lck_Unlock(&h2->sess->mtx);
 
-	if (w1 || w2) {
-		vbe32enc(buf, wi);
-		H2_Send_Get(wrk, h2, h2->req0);
-		if (w1)
-			H2_Send_Frame(wrk, h2, H2_F_WINDOW_UPDATE, 0,
-			    4, 0, buf);
-		if (w2)
-			H2_Send_Frame(wrk, h2, H2_F_WINDOW_UPDATE, 0,
-			    4, r2->stream, buf);
-		H2_Send_Rel(h2, h2->req0);
+		bufsize = r2->r_window;
+		/* This is the first data frame, so r_window will be the
+		 * initial window size. */
+		assert(bufsize > 0);
+		if ((h2->rxf_flags & H2FF_DATA_END_STREAM) &&
+		    bufsize > h2->rxf_len)
+			/* Cap the buffer size when we know this is the
+			 * single data frame. */
+			bufsize = h2->rxf_len;
+		stvbuf = STV_AllocBuf(wrk, stv_transient,
+		    bufsize + sizeof *rxbuf);
+		if (stvbuf == NULL) {
+			VSLb(h2->vsl, SLT_Debug,
+			    "H2: stream %u: Failed to allocate request body"
+			    " buffer",
+			    h2->rxf_stream);
+			Lck_Lock(&h2->sess->mtx);
+			r2->error = H2SE_INTERNAL_ERROR;
+			if (r2->cond)
+				AZ(pthread_cond_signal(r2->cond));
+			Lck_Unlock(&h2->sess->mtx);
+			return (H2SE_INTERNAL_ERROR);
+		}
+		rxbuf = STV_GetBufPtr(stvbuf, &bstest);
+		AN(rxbuf);
+		assert(bstest >= bufsize + sizeof *rxbuf);
+		assert(PAOK(rxbuf));
+		INIT_OBJ(rxbuf, H2_RXBUF_MAGIC);
+		rxbuf->size = bufsize;
+		rxbuf->stvbuf = stvbuf;
+
+		r2->rxbuf = rxbuf;
+
+		Lck_Lock(&h2->sess->mtx);
 	}
+
+	CHECK_OBJ_NOTNULL(r2->rxbuf, H2_RXBUF_MAGIC);
+	assert(r2->rxbuf->tail <= r2->rxbuf->head);
+	l = r2->rxbuf->head - r2->rxbuf->tail;
+	assert(l <= r2->rxbuf->size);
+	l = r2->rxbuf->size - l;
+	assert(h2->rxf_len <= l); /* Stream window handling ensures
+				   * this */
+
+	Lck_Unlock(&h2->sess->mtx);
+
+	src = h2->rxf_data;
+	l = h2->rxf_len;
+	head = r2->rxbuf->head;
+	do {
+		l2 = l;
+		if ((head % r2->rxbuf->size) + l2 > r2->rxbuf->size)
+			l2 = r2->rxbuf->size - (head % r2->rxbuf->size);
+		assert(l2 > 0);
+		memcpy(&r2->rxbuf->data[head % r2->rxbuf->size], src, l2);
+		src += l2;
+		head += l2;
+		l -= l2;
+	} while (l > 0);
+
+	Lck_Lock(&h2->sess->mtx);
+	/* Charge stream window */
+	r2->r_window -= h2->rxf_len;
+	r2->rxbuf->head += h2->rxf_len;
+	assert(r2->rxbuf->tail <= r2->rxbuf->head);
+	if (h2->rxf_flags & H2FF_DATA_END_STREAM)
+		r2->state = H2_S_CLOS_REM;
+	if (r2->cond)
+		AZ(pthread_cond_signal(r2->cond));
+	Lck_Unlock(&h2->sess->mtx);
+
 	return (0);
 }
 
@@ -822,8 +942,10 @@ h2_vfp_body(struct vfp_ctx *vc, struct vfp_entry *vfe, void *ptr, ssize_t *lp)
 {
 	struct h2_req *r2;
 	struct h2_sess *h2;
-	unsigned l;
 	enum vfp_status retval;
+	uint64_t l, l2, tail;
+	uint8_t *dst;
+	char buf[4];
 
 	CHECK_OBJ_NOTNULL(vc, VFP_CTX_MAGIC);
 	CHECK_OBJ_NOTNULL(vfe, VFP_ENTRY_MAGIC);
@@ -832,40 +954,87 @@ h2_vfp_body(struct vfp_ctx *vc, struct vfp_entry *vfe, void *ptr, ssize_t *lp)
 
 	AN(ptr);
 	AN(lp);
-	l = *lp;
-	*lp = 0;
+	assert(*lp >= 0);
 
 	Lck_Lock(&h2->sess->mtx);
+
 	r2->cond = &vc->wrk->cond;
-	while (h2->mailcall != r2 && h2->error == 0 && r2->error == 0)
-		AZ(Lck_CondWait(r2->cond, &h2->sess->mtx, 0));
-	r2->cond = NULL;
-	if (h2->error || r2->error) {
-		retval = VFP_ERROR;
-	} else {
-		assert(h2->mailcall == r2);
-		if (l > h2->rxf_len)
-			l = h2->rxf_len;
-		if (l > 0) {
-			memcpy(ptr, h2->rxf_data, l);
-			h2->rxf_data += l;
-			h2->rxf_len -= l;
-		}
-		*lp = l;
-		if (h2->rxf_len > 0) {
-			/* We ran out of storage: Have VFP call us
-			 * again with a fresh buffer */
-			Lck_Unlock(&h2->sess->mtx);
-			return (VFP_OK);
-		}
-		if (h2->rxf_len == 0 && r2->state >= H2_S_CLOS_REM)
+	while (1) {
+		CHECK_OBJ_ORNULL(r2->rxbuf, H2_RXBUF_MAGIC);
+		if (r2->rxbuf) {
+			assert(r2->rxbuf->tail <= r2->rxbuf->head);
+			l = r2->rxbuf->head - r2->rxbuf->tail;
+		} else
+			l = 0;
+
+		if (h2->error || r2->error)
+			retval = VFP_ERROR;
+		else if (r2->state >= H2_S_CLOS_REM && l <= *lp)
 			retval = VFP_END;
-		else
+		else {
+			if (l > *lp)
+				l = *lp;
 			retval = VFP_OK;
-		h2->mailcall = NULL;
-		AZ(pthread_cond_signal(h2->cond));
+		}
+
+		if (retval != VFP_OK || l > 0)
+			break;
+
+		/* XXX: Timeout */
+		AZ(Lck_CondWait(r2->cond, &h2->sess->mtx, 0));
 	}
+	r2->cond = NULL;
+
+	Lck_Unlock(&h2->sess->mtx);
+
+	if (l == 0 || retval == VFP_ERROR) {
+		*lp = 0;
+		return (retval);
+	}
+
+	*lp = l;
+	dst = ptr;
+	tail = r2->rxbuf->tail;
+	do {
+		l2 = l;
+		if ((tail % r2->rxbuf->size) + l2 > r2->rxbuf->size)
+			l2 = r2->rxbuf->size - (tail % r2->rxbuf->size);
+		assert(l2 > 0);
+		memcpy(dst, &r2->rxbuf->data[tail % r2->rxbuf->size], l2);
+		dst += l2;
+		tail += l2;
+		l -= l2;
+	} while (l > 0);
+
+	Lck_Lock(&h2->sess->mtx);
+
+	CHECK_OBJ_NOTNULL(r2->rxbuf, H2_RXBUF_MAGIC);
+	r2->rxbuf->tail = tail;
+	assert(r2->rxbuf->tail <= r2->rxbuf->head);
+
+	if (r2->r_window < cache_param->h2_rx_window_low_water &&
+	    r2->state < H2_S_CLOS_REM) {
+		/* l is free buffer space */
+		/* l2 is calculated window increment */
+		l = r2->rxbuf->size - (r2->rxbuf->head - r2->rxbuf->tail);
+		assert(r2->r_window <= l);
+		l2 = cache_param->h2_rx_window_increment;
+		if (r2->r_window + l2 > l)
+			l2 = l - r2->r_window;
+		r2->r_window += l2;
+	} else
+		l2 = 0;
+
 	Lck_Unlock(&h2->sess->mtx);
+
+	if (l2 > 0) {
+		vbe32enc(buf, l2);
+		H2_Send_Get(vc->wrk, h2, r2);
+		H2_Send_Frame(vc->wrk, h2, H2_F_WINDOW_UPDATE, 0, 4,
+		    r2->stream, buf);
+		H2_Send_Rel(h2, r2);
+	}
+
 	return (retval);
 }
 
@@ -874,6 +1043,7 @@ h2_vfp_body_fini(struct vfp_ctx *vc, struct vfp_entry *vfe)
 {
 	struct h2_req *r2;
 	struct h2_sess *h2;
+	struct stv_buffer *stvbuf = NULL;
 
 	CHECK_OBJ_NOTNULL(vc, VFP_CTX_MAGIC);
 	CHECK_OBJ_NOTNULL(vfe, VFP_ENTRY_MAGIC);
@@ -889,11 +1059,21 @@ h2_vfp_body_fini(struct vfp_ctx *vc, struct vfp_entry *vfe)
 		H2_Send_Rel(h2, r2);
 		Lck_Lock(&h2->sess->mtx);
 		r2->error = H2SE_REFUSED_STREAM;
-		if (h2->mailcall == r2) {
-			h2->mailcall = NULL;
-			AZ(pthread_cond_signal(h2->cond));
+		Lck_Unlock(&h2->sess->mtx);
+	}
+
+	if (r2->state >= H2_S_CLOS_REM && r2->rxbuf != NULL) {
+		Lck_Lock(&h2->sess->mtx);
+		CHECK_OBJ_ORNULL(r2->rxbuf, H2_RXBUF_MAGIC);
+		if (r2->rxbuf != NULL) {
+			stvbuf = r2->rxbuf->stvbuf;
+			r2->rxbuf = NULL;
 		}
 		Lck_Unlock(&h2->sess->mtx);
+		if (stvbuf != NULL) {
+			STV_FreeBuf(vc->wrk, &stvbuf);
+			AZ(stvbuf);
+		}
 	}
 }
 


More information about the varnish-commit mailing list