[master] b6d1f73b2 param: New transit_buffer parameter

Nils Goroll nils.goroll at uplex.de
Mon Nov 7 14:57:06 UTC 2022


commit b6d1f73b232133c6d4731644e6d2d606ea3a17f2
Author: Alf-André Walla <fwsgonzo at hotmail.com>
Date:   Tue Dec 7 12:30:44 2021 +0100

    param: New transit_buffer parameter
    
    It pauses the fetch progress when clients are lagging behind for
    uncacheable streaming deliveries.
    
    Signed-off-by: Dridi Boukelmoune <dridi.boukelmoune at gmail.com>

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index c46ef68b9..49ef26195 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -296,6 +296,8 @@ struct boc {
 	enum boc_state_e	state;
 	uint8_t			*vary;
 	uint64_t		fetched_so_far;
+	uint64_t		delivered_so_far;
+	uint64_t		transit_buffer;
 };
 
 /* Object core structure ---------------------------------------------
diff --git a/bin/varnishd/cache/cache_fetch.c b/bin/varnishd/cache/cache_fetch.c
index bc46e280c..cf1005aa4 100644
--- a/bin/varnishd/cache/cache_fetch.c
+++ b/bin/varnishd/cache/cache_fetch.c
@@ -549,6 +549,8 @@ vbf_stp_startfetch(struct worker *wrk, struct busyobj *bo)
 		bo->uncacheable = 1;
 		wrk->vpi->handling = VCL_RET_DELIVER;
 	}
+	if (!bo->uncacheable || !bo->do_stream)
+		oc->boc->transit_buffer = 0;
 	if (bo->uncacheable)
 		oc->flags |= OC_F_HFM;
 
@@ -597,6 +599,9 @@ vbf_stp_fetchbody(struct worker *wrk, struct busyobj *bo)
 		}
 		AZ(vfc->failed);
 		l = est;
+		oc = bo->fetch_objcore;
+		if (oc->boc->transit_buffer > 0)
+			l = vmin_t(ssize_t, l, oc->boc->transit_buffer);
 		assert(l >= 0);
 		if (VFP_GetStorage(vfc, &l, &ptr) != VFP_OK) {
 			bo->htc->doclose = SC_RX_BODY;
@@ -970,6 +975,8 @@ vbf_stp_error(struct worker *wrk, struct busyobj *bo)
 		return (F_STP_FAIL);
 	}
 
+	oc->boc->transit_buffer = 0;
+
 	ll = VSB_len(synth_body);
 	o = 0;
 	while (ll > 0) {
diff --git a/bin/varnishd/cache/cache_obj.c b/bin/varnishd/cache/cache_obj.c
index 0c9e68fb3..b839b9e68 100644
--- a/bin/varnishd/cache/cache_obj.c
+++ b/bin/varnishd/cache/cache_obj.c
@@ -109,6 +109,7 @@ obj_newboc(void)
 	Lck_New(&boc->mtx, lck_busyobj);
 	AZ(pthread_cond_init(&boc->cond, NULL));
 	boc->refcount = 1;
+	boc->transit_buffer = cache_param->transit_buffer;
 	return (boc);
 }
 
@@ -216,6 +217,29 @@ ObjGetSpace(struct worker *wrk, struct objcore *oc, ssize_t *sz, uint8_t **ptr)
  * surplus space allocated.
  */
 
+static void
+obj_extend_condwait(const struct objcore *oc)
+{
+
+	if (oc->boc->transit_buffer == 0)
+		return;
+
+	assert(oc->flags & (OC_F_PRIVATE | OC_F_HFM | OC_F_HFP));
+	/* Wait while fetch leads delivery by more than transit_buffer.
+	 * NB: strictly signaling progress both ways would be prone to
+	 * deadlocks, so we wait for client-side signals on delivered_so_far
+	 * updates; if the fetch thread was not waiting at signal time, it
+	 * sees the new delivered_so_far after timing out.
+	 */
+	while (!(oc->flags & OC_F_CANCEL) && oc->boc->fetched_so_far >
+	    oc->boc->delivered_so_far + oc->boc->transit_buffer) {
+		(void)Lck_CondWaitTimeout(&oc->boc->cond, &oc->boc->mtx, 0.1);
+		/* Fallback: Check if we are alone waiting on this object */
+		if (oc->refcnt == 1)
+			break;
+	}
+}
+
 void
 ObjExtend(struct worker *wrk, struct objcore *oc, ssize_t l, int final)
 {
@@ -228,6 +252,7 @@ ObjExtend(struct worker *wrk, struct objcore *oc, ssize_t l, int final)
 
 	Lck_Lock(&oc->boc->mtx);
 	if (l > 0) {
+		obj_extend_condwait(oc);
 		om->objextend(wrk, oc, l);
 		oc->boc->fetched_so_far += l;
 		AZ(pthread_cond_broadcast(&oc->boc->cond));
@@ -254,6 +279,12 @@ ObjWaitExtend(const struct worker *wrk, const struct objcore *oc, uint64_t l)
 	while (1) {
 		rv = oc->boc->fetched_so_far;
 		assert(l <= rv || oc->boc->state == BOS_FAILED);
+		if (oc->boc->transit_buffer > 0) {
+			assert(oc->flags & (OC_F_PRIVATE | OC_F_HFM | OC_F_HFP));
+			/* Signal the new client position */
+			oc->boc->delivered_so_far = l;
+			AZ(pthread_cond_signal(&oc->boc->cond));
+		}
 		if (rv > l || oc->boc->state >= BOS_FINISHED)
 			break;
 		(void)Lck_CondWait(&oc->boc->cond, &oc->boc->mtx);
diff --git a/bin/varnishtest/tests/c00110.vtc b/bin/varnishtest/tests/c00110.vtc
new file mode 100644
index 000000000..ea841c505
--- /dev/null
+++ b/bin/varnishtest/tests/c00110.vtc
@@ -0,0 +1,22 @@
+varnishtest "Transit buffering with early close"
+
+feature cmd {test $(uname) != SunOS}
+
+server s1 {
+	non_fatal
+	rxreq
+	txresp -bodylen 2000000
+} -start
+
+varnish v1 -cliok "param.set transit_buffer 1k"
+varnish v1 -vcl+backend { } -start
+
+client c1 -rcvbuf 128 {
+	txreq -method POST
+	rxresphdrs
+	expect resp.status == 200
+	recv 100
+} -run
+
+varnish v1 -expect VBE.vcl1.s1.conn == 0
+varnish v1 -expect VBE.vcl1.s1.busy == 0
diff --git a/bin/varnishtest/tests/c00111.vtc b/bin/varnishtest/tests/c00111.vtc
new file mode 100644
index 000000000..706ee7041
--- /dev/null
+++ b/bin/varnishtest/tests/c00111.vtc
@@ -0,0 +1,29 @@
+varnishtest "LRU error without transit buffer"
+
+server s1 -repeat 2 {
+	non_fatal
+	rxreq
+	txresp -bodylen 1850000
+} -start
+
+varnish v1 -arg "-s Transient=malloc,1m" -vcl+backend { } -start
+
+client c1 {
+	non_fatal
+	txreq -method POST
+	rxresp
+} -run
+
+varnish v1 -vsl_catchup
+varnish v1 -expect fetch_failed == 1
+
+varnish v1 -cliok "param.set transit_buffer 4k"
+
+client c2 {
+	txreq -method POST
+	rxresp
+} -run
+
+varnish v1 -vsl_catchup
+varnish v1 -expect s_fetch == 2
+varnish v1 -expect fetch_failed == 1
diff --git a/bin/varnishtest/tests/c00112.vtc b/bin/varnishtest/tests/c00112.vtc
new file mode 100644
index 000000000..491daedee
--- /dev/null
+++ b/bin/varnishtest/tests/c00112.vtc
@@ -0,0 +1,39 @@
+varnishtest "Transit buffering deadlock test"
+
+server s1 {
+	rxreq
+	txresp -status 404
+} -start
+
+server s2 {
+	non_fatal
+	rxreq
+	txresp -bodylen 2000000
+} -start
+
+varnish v1 -cliok "param.set transit_buffer 1k"
+varnish v1 -vcl+backend {
+	sub vcl_recv {
+		set req.backend_hint = s2;
+		if (req.restarts == 1) {
+			set req.backend_hint = s1;
+		}
+	}
+	sub vcl_deliver {
+		if (req.restarts < 1) {
+			return (restart);
+		}
+	}
+} -start
+
+client c1 {
+	txreq -method POST
+	rxresp
+	expect resp.bodylen == 0
+	expect resp.status == 404
+} -run
+
+varnish v1 -expect VBE.vcl1.s1.conn == 0
+varnish v1 -expect VBE.vcl1.s1.busy == 0
+varnish v1 -expect VBE.vcl1.s2.conn == 0
+varnish v1 -expect VBE.vcl1.s2.busy == 0
diff --git a/include/tbl/params.h b/include/tbl/params.h
index 1d3416daa..5ed5d7f86 100644
--- a/include/tbl/params.h
+++ b/include/tbl/params.h
@@ -952,6 +952,23 @@ PARAM_SIMPLE(
 	/* flags */	EXPERIMENTAL
 )
 
+PARAM_SIMPLE(
+	/* name */	transit_buffer,
+	/* type */	bytes,
+	/* min */	"0k",
+	/* max */	NULL,
+	/* def */	"0k",
+	/* units */	"bytes",
+	/* descr */
+	"The amount of buffer allowed through Varnish for uncacheable "
+	"backend streaming fetches. Zero means no limit: the fetch is "
+	"never paused and the object is fetched as fast as possible.\n\n"
+	"When a client is slow, this can prevent large uncacheable objects "
+	"from being stored indefinitely when the intent is to simply stream "
+	"them to the client. As a result, a slow client transaction holds "
+	"onto a backend connection until the end of the delivery."
+)
+
 PARAM_SIMPLE(
 	/* name */	vary_notice,
 	/* type */	uint,


More information about the varnish-commit mailing list