[PATCH 15/25] Make streaming work with multiple streaming clients

Martin Blix Grydeland martin at varnish-software.com
Sun Jan 22 18:53:21 CET 2012


---
 bin/varnishd/cache/cache_center.c |  117 ++++++++++++++++++++++---------------
 bin/varnishd/cache/cache_expire.c |    1 -
 bin/varnishd/cache/cache_fetch.c  |    2 -
 bin/varnishd/cache/cache_hash.c   |   33 ++++++++++-
 bin/varnishd/hash/hash_slinger.h  |    1 +
 5 files changed, 103 insertions(+), 51 deletions(-)

diff --git a/bin/varnishd/cache/cache_center.c b/bin/varnishd/cache/cache_center.c
index 7a9134e..23b5f71 100644
--- a/bin/varnishd/cache/cache_center.c
+++ b/bin/varnishd/cache/cache_center.c
@@ -224,7 +224,6 @@ cnt_prepresp(struct sess *sp, struct worker *wrk, struct req *req)
 	if (wrk->busyobj != NULL) {
 		CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
 		AN(wrk->busyobj->do_stream);
-		AssertObjCorePassOrBusy(req->obj->objcore);
 	}
 
 	wrk->res_mode = 0;
@@ -301,14 +300,17 @@ cnt_prepresp(struct sess *sp, struct worker *wrk, struct req *req)
 	case VCL_RET_RESTART:
 		if (req->restarts >= cache_param->max_restarts)
 			break;
-		if (wrk->busyobj != NULL) {
+		if (wrk->busyobj != NULL &&
+		    (req->obj->objcore == NULL ||
+		     req->obj->objcore->flags & OC_F_BUSY)) {
 			AN(wrk->busyobj->do_stream);
 			VDI_CloseFd(wrk, &wrk->busyobj->vbc);
 			HSH_Drop(wrk);
-			VBO_DerefBusyObj(wrk, &wrk->busyobj);
 		} else {
 			(void)HSH_Deref(wrk, NULL, &req->obj);
 		}
+		if (wrk->busyobj != NULL)
+			(void)VBO_DerefBusyObj(wrk, &wrk->busyobj);
 		AZ(req->obj);
 		req->restarts++;
 		req->director = NULL;
@@ -318,8 +320,8 @@ cnt_prepresp(struct sess *sp, struct worker *wrk, struct req *req)
 	default:
 		WRONG("Illegal action in vcl_deliver{}");
 	}
-	if (wrk->busyobj != NULL && wrk->busyobj->do_stream) {
-		AssertObjCorePassOrBusy(req->obj->objcore);
+	if (wrk->busyobj != NULL) {
+		AN(wrk->busyobj->do_stream);
 		sp->step = STP_STREAMBODY;
 	} else {
 		sp->step = STP_DELIVER;
@@ -983,13 +985,18 @@ cnt_streambody_task(struct worker *wrk, void *priv)
 	objcore = obj->objcore;
 
 	wrk->busyobj->fetch_failed = FetchBody(wrk, wrk->busyobj);
+	VBO_StreamStopped(wrk->busyobj);
 
 	wrk->stats.fetch_threaded++;
 
+	if (obj->objcore != NULL) {
+		HSH_RemoveBusyObj(wrk, obj->objcore);
+		if (wrk->busyobj->fetch_failed == 0)
+			EXP_Insert(obj);
+	}
 	AZ(wrk->busyobj->fetch_obj);
 	AZ(wrk->busyobj->vbc);
 	wrk->busyobj->vfp = NULL;
-	VBO_StreamStopped(wrk->busyobj);
 
 	u = VBO_DerefBusyObj(wrk, &wrk->busyobj);
 	if (objcore != NULL || u == 0) {
@@ -1013,6 +1020,7 @@ cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req)
 	CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
 
 	CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
+	AN(wrk->busyobj->do_stream);
 	memset(&sctx, 0, sizeof sctx);
 	sctx.magic = STREAM_CTX_MAGIC;
 	AZ(wrk->sctx);
@@ -1024,58 +1032,72 @@ cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req)
 		sctx.obuf_len = sizeof (obuf);
 	}
 
-	AssertObjCorePassOrBusy(req->obj->objcore);
 	RES_StreamStart(sp);
 
-	/* MBGXXX: Test on OC_F_BUSY to see if we should initiate
-	 * fetch at all. This code now assumes all passes through here
-	 * needs to do the fetch as well. (Multiple streaming clients
-	 * not implemented yet) */
-	wrk->acct_tmp.fetch++;
-	AZ(wrk->busyobj->fetch_obj);
-	wrk->busyobj->fetch_obj = req->obj;
-	http_Setup(wrk->busyobj->bereq, NULL);
-	http_Setup(wrk->busyobj->beresp, NULL);
-	wrk_ex = SES_GrabWorker(sp, 100); /* MBGXXX: Configurable
-					   * thread grabbing
-					   * timeout */
-	if (wrk_ex != NULL) {
-		/* Set up separate thread fetch */
-		wrk->busyobj->use_locks = 1;
-		if (req->obj->objcore != NULL)
-			/* Grab a ref on the objcore for the other thread */
-			HSH_Ref(req->obj->objcore);
-		VBO_RefBusyObj(wrk->busyobj); /* Ref for the other thread */
-		WRK_DoTask(wrk_ex, cnt_streambody_task, wrk->busyobj);
-	} else {
-		/* We have no worker */
-		if (wrk->busyobj->fetch_obj->objcore == NULL ||
-		    wrk->busyobj->fetch_obj->objcore->flags & OC_F_PASS) {
-			/* It's a pass, prefer flipflop
-			 * streaming. (MBGXXX: Flipflop not finished
-			 * yet) */
-			wrk->busyobj->do_stream_flipflop = 1;
-			wrk->stats.fetch_flipflop++;
+	if (req->obj->objcore == NULL || req->obj->objcore->flags & OC_F_BUSY) {
+		/* Initiate a fetch of the body */
+		wrk->acct_tmp.fetch++;
+		AZ(wrk->busyobj->fetch_obj);
+		wrk->busyobj->fetch_obj = req->obj;
+		http_Setup(wrk->busyobj->bereq, NULL);
+		http_Setup(wrk->busyobj->beresp, NULL);
+		wrk_ex = SES_GrabWorker(sp, 100); /* MBGXXX: Configurable
+						   * thread grabbing
+						   * timeout */
+		if (wrk_ex != NULL)
+			wrk->busyobj->use_locks = 1;
+		if (req->obj->objcore != NULL) {
+			AN(req->obj->objcore->ban);
+			HSH_Unbusy(wrk);
+			AN(req->obj->objcore->busyobj);
+		}
+		if (wrk_ex != NULL) {
+			/* Set up separate thread fetch */
+			if (req->obj->objcore != NULL)
+				/* Grab a ref on the objcore for the
+				 * other thread */
+				HSH_Ref(req->obj->objcore);
+			WRK_DoTask(wrk_ex, cnt_streambody_task,
+				   VBO_RefBusyObj(wrk->busyobj));
+		} else {
+			/* We have no fetch worker */
+			if (req->obj->objcore == NULL ||
+			    req->obj->objcore->flags & OC_F_PASS) {
+				/* It's a pass, prefer flipflop
+				 * streaming. (MBGXXX: Flipflop not
+				 * finished yet) */
+				wrk->busyobj->do_stream_flipflop = 1;
+				wrk->stats.fetch_flipflop++;
+			}
+			wrk->busyobj->fetch_failed =
+				FetchBody(wrk, wrk->busyobj);
+			VBO_StreamStopped(wrk->busyobj);
+			if (req->obj->objcore != NULL) {
+				HSH_RemoveBusyObj(wrk, req->obj->objcore);
+				if (wrk->busyobj->fetch_failed == 0)
+					EXP_Insert(req->obj);
+			}
+			AZ(wrk->busyobj->fetch_obj);
+			AZ(wrk->busyobj->vbc);
+			wrk->busyobj->vfp = NULL;
+			VBO_StreamSync(wrk);
 		}
-		wrk->busyobj->fetch_failed = FetchBody(sp->wrk, wrk->busyobj);
-		VBO_StreamStopped(wrk->busyobj);
 	}
 
-	RES_StreamBody(sp);
+	if (wrk->busyobj->do_stream_flipflop == 0)
+		RES_StreamBody(sp);
+	else
+		AN(wrk->sctx->stream_stopped);
 
-	if (wrk->busyobj->htc.ws == wrk->ws)
+	if (wrk->busyobj->htc.ws == wrk->ws) {
 		/* Busyobj's htc has buffer on our workspace,
 		   wait for it to be released */
+		AZ(wrk->busyobj->do_stream_flipflop);
 		VBO_StreamWait(wrk->busyobj);
+	}
 
 	if (wrk->busyobj->fetch_failed) {
 		req->doclose = "Stream error";
-	} else if (req->obj->objcore != NULL) {
-		/* MBGXXX: This should be done on the bg task */
-		EXP_Insert(req->obj);
-		AN(req->obj->objcore);
-		AN(req->obj->objcore->ban);
-		HSH_Unbusy(wrk);
 	}
 	req->director = NULL;
 	req->restarts = 0;
@@ -1169,7 +1191,6 @@ cnt_hit(struct sess *sp, struct worker *wrk, struct req *req)
 
 	CHECK_OBJ_NOTNULL(req->obj, OBJECT_MAGIC);
 	CHECK_OBJ_NOTNULL(req->vcl, VCL_CONF_MAGIC);
-	AZ(wrk->busyobj);
 
 	assert(!(req->obj->objcore->flags & OC_F_PASS));
 
@@ -1315,6 +1336,8 @@ cnt_lookup(struct sess *sp, struct worker *wrk, struct req *req)
 		wrk->stats.cache_hitpass++;
 		WSP(sp, SLT_HitPass, "%u", req->obj->xid);
 		(void)HSH_Deref(wrk, NULL, &req->obj);
+		if (wrk->busyobj != NULL)
+			(void)VBO_DerefBusyObj(wrk, &wrk->busyobj);
 		req->objcore = NULL;
 		sp->step = STP_PASS;
 		return (0);
diff --git a/bin/varnishd/cache/cache_expire.c b/bin/varnishd/cache/cache_expire.c
index e93a1aa..212adf3 100644
--- a/bin/varnishd/cache/cache_expire.c
+++ b/bin/varnishd/cache/cache_expire.c
@@ -224,7 +224,6 @@ EXP_Insert(struct object *o)
 	CHECK_OBJ_NOTNULL(o, OBJECT_MAGIC);
 	oc = o->objcore;
 	CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
-	AssertObjBusy(o);
 	HSH_Ref(oc);
 
 	assert(o->exp.entered != 0 && !isnan(o->exp.entered));
diff --git a/bin/varnishd/cache/cache_fetch.c b/bin/varnishd/cache/cache_fetch.c
index d5ae4d5..0643389 100644
--- a/bin/varnishd/cache/cache_fetch.c
+++ b/bin/varnishd/cache/cache_fetch.c
@@ -507,8 +507,6 @@ FetchBody(struct worker *wrk, struct busyobj *bo)
 	if (bo->vfp == NULL)
 		bo->vfp = &vfp_nop;
 
-	AssertObjCorePassOrBusy(obj->objcore);
-
 	AZ(bo->vgz_rx);
 	AZ(VTAILQ_FIRST(&obj->store));
 
diff --git a/bin/varnishd/cache/cache_hash.c b/bin/varnishd/cache/cache_hash.c
index e78eb60..a12d43a 100644
--- a/bin/varnishd/cache/cache_hash.c
+++ b/bin/varnishd/cache/cache_hash.c
@@ -63,6 +63,8 @@
 
 static const struct hash_slinger *hash;
 
+static void hsh_rush(struct objhead *oh);
+
 /*---------------------------------------------------------------------*/
 /* Precreate an objhead and object for later use */
 void
@@ -414,6 +416,13 @@ HSH_Lookup(struct sess *sp, struct objhead **poh)
 		if (o->hits < INT_MAX)
 			o->hits++;
 		assert(oh->refcnt > 1);
+		if (oc->busyobj != NULL) {
+			/* It's streamable */
+			CHECK_OBJ_NOTNULL(oc->busyobj, BUSYOBJ_MAGIC);
+			wrk->busyobj = VBO_RefBusyObj(oc->busyobj);
+			if (oh->waitinglist != NULL)
+				hsh_rush(oh);
+		}
 		Lck_Unlock(&oh->mtx);
 		assert(hash->deref(oh));
 		*poh = oh;
@@ -623,7 +632,7 @@ HSH_Unbusy(struct worker *wrk)
 	VTAILQ_REMOVE(&oh->objcs, oc, list);
 	VTAILQ_INSERT_HEAD(&oh->objcs, oc, list);
 	oc->flags &= ~OC_F_BUSY;
-	if (oc->busyobj != NULL)
+	if (oc->busyobj->do_stream == 0)
 		(void)VBO_DerefBusyObj(wrk, &oc->busyobj);
 	if (oh->waitinglist != NULL)
 		hsh_rush(oh);
@@ -632,6 +641,28 @@ HSH_Unbusy(struct worker *wrk)
 	assert(oc_getobj(wrk, oc) == o);
 }
 
+/*---------------------------------------------------------------------
+ * Release the objcore's reference on its busyobj under the objhead mutex
+ */
+
+void
+HSH_RemoveBusyObj(struct worker *wrk, struct objcore *oc)
+{
+	struct objhead *oh;
+
+	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+	CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
+	oh = oc->objhead;
+	CHECK_OBJ_NOTNULL(oh, OBJHEAD_MAGIC);
+
+	/* Preconditions: OC_F_BUSY cleared, busyobj set, stream (if any) stopped */
+	AZ(oc->flags & OC_F_BUSY);
+	AN(oc->busyobj);
+	assert(oc->busyobj->do_stream == 0 || oc->busyobj->stream_stopped == 1);
+	Lck_Lock(&oh->mtx);	/* HSH_Lookup refs oc->busyobj under this lock */
+	(void)VBO_DerefBusyObj(wrk, &oc->busyobj);	/* result ignored */
+	Lck_Unlock(&oh->mtx);
+}
+
 void
 HSH_Ref(struct objcore *oc)
 {
diff --git a/bin/varnishd/hash/hash_slinger.h b/bin/varnishd/hash/hash_slinger.h
index b45e604..8affa42 100644
--- a/bin/varnishd/hash/hash_slinger.h
+++ b/bin/varnishd/hash/hash_slinger.h
@@ -55,6 +55,7 @@ void HSH_Prealloc(const struct sess *sp);
 void HSH_Cleanup(struct worker *w);
 struct objcore *HSH_Lookup(struct sess *sp, struct objhead **poh);
 void HSH_Unbusy(struct worker *wrk);
+void HSH_RemoveBusyObj(struct worker *wrk, struct objcore *oc);
 void HSH_Ref(struct objcore *o);
 void HSH_Drop(struct worker *wrk);
 void HSH_Init(const struct hash_slinger *slinger);
-- 
1.7.4.1




More information about the varnish-dev mailing list