[PATCH 22/25] Implement stream_pass_bufsize throttling of streaming in pass mode

Martin Blix Grydeland martin at varnish-software.com
Sun Jan 22 18:53:28 CET 2012


---
 bin/varnishd/cache/cache.h          |    3 +-
 bin/varnishd/cache/cache_busyobj.c  |    5 ++-
 bin/varnishd/cache/cache_response.c |   67 +++++++++++++++++++++-------------
 3 files changed, 46 insertions(+), 29 deletions(-)

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index fe65dbd..a64a0ae 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -541,7 +541,8 @@ struct busyobj {
 
 	/* Stream stuff */
 	ssize_t			stream_max;
-	struct storage		*stream_frontchunk;
+	volatile ssize_t	stream_next;
+	volatile struct storage	*stream_frontchunk;
 	unsigned		stream_stopped;
 	ssize_t			stream_pass_bufsize;
 };
diff --git a/bin/varnishd/cache/cache_busyobj.c b/bin/varnishd/cache/cache_busyobj.c
index b39c170..8b97d4a 100644
--- a/bin/varnishd/cache/cache_busyobj.c
+++ b/bin/varnishd/cache/cache_busyobj.c
@@ -295,14 +295,15 @@ VBO_StreamSync(struct worker *wrk)
 	    (wrk->sp->req->obj->objcore->flags & OC_F_PASS)) {
 		/* Give notice to backend fetch that we are finished
 		 * with all chunks before this one */
+		assert(busyobj->stream_next <= sctx->stream_next);
+		busyobj->stream_next = sctx->stream_next;
 		busyobj->stream_frontchunk = sctx->stream_frontchunk;
 	}
 
 	sctx->stream_stopped = busyobj->stream_stopped;
 	sctx->stream_max = busyobj->stream_max;
 
-	if (busyobj->use_locks && !sctx->stream_stopped &&
-	    sctx->stream_next == sctx->stream_max) {
+	if (busyobj->use_locks && sctx->stream_next == sctx->stream_max) {
 		while (!busyobj->stream_stopped &&
 		       sctx->stream_max == busyobj->stream_max) {
 			Lck_CondWait(&busyobj->vbo->cond, &busyobj->vbo->mtx,
diff --git a/bin/varnishd/cache/cache_response.c b/bin/varnishd/cache/cache_response.c
index 6bd4da7..bc7f122 100644
--- a/bin/varnishd/cache/cache_response.c
+++ b/bin/varnishd/cache/cache_response.c
@@ -459,36 +459,51 @@ RES_StreamPoll(struct worker *wrk)
 {
 	struct object *fetch_obj;
 	struct storage *st;
+	struct busyobj *bo;
+	int pass = 0;
 
-	CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
-
-	VBO_StreamData(wrk->busyobj);
-	if (wrk->busyobj->do_stream_flipflop == 1) {
-		AN(wrk->sctx);
-		/* MBGXXX: Loop around waiting for the lag behind to
-		 * be less than some configurable size, to keep the
-		 * cache memory usage low (this for streaming
-		 * extremely large objects with pass) */
-		VBO_StreamSync(wrk);
-		RES_StreamWrite(wrk->sp);
-	}
-
-	if (wrk->busyobj->stream_frontchunk == NULL)
-		return;
-
-	/* It's a pass - remove chunks already delivered. Should be OK
-	 * to do lock-free, as we are not fiddling pointers of any
-	 * storage chunk passed busyobj->stream_frontchunk */
+	bo = wrk->busyobj;
+	CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
 	fetch_obj = wrk->busyobj->fetch_obj;
 	CHECK_OBJ_NOTNULL(fetch_obj, OBJECT_MAGIC);
-	assert(fetch_obj->objcore == NULL ||
-	       (fetch_obj->objcore->flags & OC_F_PASS));
+
+	if (fetch_obj->objcore == NULL || fetch_obj->objcore->flags & OC_F_PASS)
+		pass = 1;
+
+	VBO_StreamData(bo);
 	while (1) {
-		st = VTAILQ_FIRST(&fetch_obj->store);
-		if (st == NULL || st == wrk->busyobj->stream_frontchunk)
+		if (bo->do_stream_flipflop == 1) {
+			CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC);
+			VBO_StreamSync(wrk);
+			RES_StreamWrite(wrk->sp);
+		}
+
+		if (bo->stream_frontchunk != NULL) {
+			/* It's a pass - remove chunks already
+			 * delivered. Should be OK to do lock-free, as
+			 * we are not fiddling pointers of any storage
+			 * chunk beyond busyobj->stream_frontchunk */
+			AN(pass);
+			while (1) {
+				st = VTAILQ_FIRST(&fetch_obj->store);
+				if (st == NULL ||
+				    st == bo->stream_frontchunk)
+					break;
+				CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC);
+				VTAILQ_REMOVE(&fetch_obj->store, st, list);
+				STV_free(st);
+			}
+		}
+
+		if (pass == 0 || bo->stream_pass_bufsize == 0 ||
+		    bo->stream_max - bo->stream_next < bo->stream_pass_bufsize)
 			break;
-		CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC);
-		VTAILQ_REMOVE(&fetch_obj->store, st, list);
-		STV_free(st);
+
+		/* Loop around waiting for the lag behind to be less
+		 * than some configurable size, to keep the cache
+		 * memory usage low (this for streaming large objects
+		 * with pass) */
+		if (bo->do_stream_flipflop == 0)
+			VTIM_sleep(0.1);
 	}
 }
-- 
1.7.4.1




More information about the varnish-dev mailing list