[PATCH 10/25] Rework RES_StreamPoll to use the VBO_StreamData and VBO_StreamSync functions, and make the object data access thread safe

Martin Blix Grydeland martin at varnish-software.com
Sun Jan 22 18:53:16 CET 2012


---
 bin/varnishd/cache/cache.h          |    4 +-
 bin/varnishd/cache/cache_response.c |   67 ++++++++++++++++++++++------------
 2 files changed, 45 insertions(+), 26 deletions(-)

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index 0dd8ef4..b59a217 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -269,7 +269,7 @@ struct stream_ctx {
 	/* Next byte we will take from storage */
 	ssize_t			stream_next;
 
-	/* First byte of storage if we free it as we go (pass) */
+	/* Point in storage chunk chain we have reached */
 	ssize_t			stream_front;
 	struct storage		*stream_frontchunk;
 
@@ -998,7 +998,7 @@ void RES_BuildHttp(const struct sess *sp);
 void RES_WriteObj(struct sess *sp);
 void RES_StreamStart(struct sess *sp);
 void RES_StreamEnd(struct sess *sp);
-void RES_StreamPoll(struct worker *);
+void RES_StreamPoll(struct worker *wrk);
 
 /* cache_vary.c */
 struct vsb *VRY_Create(const struct sess *sp, const struct http *hp);
diff --git a/bin/varnishd/cache/cache_response.c b/bin/varnishd/cache/cache_response.c
index 86f4517..cb6ddd6 100644
--- a/bin/varnishd/cache/cache_response.c
+++ b/bin/varnishd/cache/cache_response.c
@@ -370,23 +370,39 @@ RES_StreamPoll(struct worker *wrk)
 {
 	struct stream_ctx *sctx;
 	struct storage *st;
-	ssize_t l, l2;
+	struct object *fetch_obj;
+	ssize_t l, l2, stlen;
 	void *ptr;
 
 	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
-	CHECK_OBJ_NOTNULL(wrk->busyobj->fetch_obj, OBJECT_MAGIC);
+	CHECK_OBJ_NOTNULL(wrk->sp->req->obj, OBJECT_MAGIC);
 	sctx = wrk->sctx;
 	CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC);
-	if (wrk->busyobj->fetch_obj->len == sctx->stream_next)
+
+	VBO_StreamData(wrk->busyobj);
+	VBO_StreamSync(wrk);
+
+	if (sctx->stream_max == sctx->stream_next)
 		return;
-	assert(wrk->busyobj->fetch_obj->len > sctx->stream_next);
+	assert(sctx->stream_max > sctx->stream_next);
+
 	l = sctx->stream_front;
-	VTAILQ_FOREACH(st, &wrk->busyobj->fetch_obj->store, list) {
-		if (st->len + l <= sctx->stream_next) {
-			l += st->len;
+	st = sctx->stream_frontchunk;
+	if (st == NULL)
+		st = VTAILQ_FIRST(&wrk->sp->req->obj->store);
+	for (; st != NULL; st = VTAILQ_NEXT(st, list)) {
+		CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC);
+		sctx->stream_front = l;
+		sctx->stream_frontchunk = st;
+		stlen = st->len;
+		if (l + stlen <= sctx->stream_next) {
+			l += stlen;
 			continue;
 		}
-		l2 = st->len + l - sctx->stream_next;
+		assert(l + stlen > sctx->stream_next);
+		l2 = l + stlen - sctx->stream_next;
+		if (sctx->stream_next + l2 > sctx->stream_max)
+			l2 = sctx->stream_max - sctx->stream_next;
 		ptr = st->ptr + (sctx->stream_next - l);
 		if (wrk->res_mode & RES_GUNZIP) {
 			(void)VGZ_WrwGunzip(wrk, sctx->vgz, ptr, l2,
@@ -394,27 +410,30 @@ RES_StreamPoll(struct worker *wrk)
 		} else {
 			(void)WRW_Write(wrk, ptr, l2);
 		}
-		l += st->len;
 		sctx->stream_next += l2;
+		if (sctx->stream_next == sctx->stream_max)
+			break;
+		AN(VTAILQ_NEXT(st, list));
+		l += st->len;
 	}
 	if (!(wrk->res_mode & RES_GUNZIP))
 		(void)WRW_Flush(wrk);
 
-	if (wrk->busyobj->fetch_obj->objcore == NULL ||
-	    (wrk->busyobj->fetch_obj->objcore->flags & OC_F_PASS)) {
-		/*
-		 * This is a pass object, release storage as soon as we
-		 * have delivered it.
-		 */
-		while (1) {
-			st = VTAILQ_FIRST(&wrk->busyobj->fetch_obj->store);
-			if (st == NULL ||
-			    sctx->stream_front + st->len > sctx->stream_next)
-				break;
-			VTAILQ_REMOVE(&wrk->busyobj->fetch_obj->store, st, list);
-			sctx->stream_front += st->len;
-			STV_free(st);
-		}
+	if (wrk->busyobj->stream_frontchunk == NULL)
+		return;
+
+	/* It's a pass - remove chunks already delivered */
+	fetch_obj = wrk->busyobj->fetch_obj;
+	CHECK_OBJ_NOTNULL(fetch_obj, OBJECT_MAGIC);
+	assert(fetch_obj->objcore == NULL ||
+	       (fetch_obj->objcore->flags & OC_F_PASS));
+	while (1) {
+		st = VTAILQ_FIRST(&fetch_obj->store);
+		if (st == NULL || st == wrk->busyobj->stream_frontchunk)
+			break;
+		CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC);
+		VTAILQ_REMOVE(&fetch_obj->store, st, list);
+		STV_free(st);
 	}
 }
 
-- 
1.7.4.1




More information about the varnish-dev mailing list