[PATCH 12/25] Use background thread fetching when streaming

Martin Blix Grydeland martin at varnish-software.com
Sun Jan 22 18:53:18 CET 2012


---
 bin/varnishd/cache/cache.h          |    5 ++
 bin/varnishd/cache/cache_center.c   |   96 +++++++++++++++++++++++++++++-----
 bin/varnishd/cache/cache_response.c |   84 ++++++++++++++++++++++--------
 3 files changed, 149 insertions(+), 36 deletions(-)

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index da06965..880f5e3 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -537,6 +537,7 @@ struct busyobj {
 	unsigned		do_gzip;
 	unsigned		do_gunzip;
 	unsigned		do_stream;
+	unsigned		do_stream_flipflop;
 
 	/* Stream stuff */
 	ssize_t			stream_max;
@@ -793,6 +794,8 @@ int FetchError(struct worker *w, const char *error);
 int FetchError2(struct worker *w, const char *error, const char *more);
 int FetchHdr(struct sess *sp, int need_host_hdr);
 int FetchBody(struct worker *w, struct busyobj *bo);
+void FetchBodyBackground(struct sess *sp, struct busyobj *bo);
+void FetchBodyWait(struct busyobj *bo);
 int FetchReqBody(const struct sess *sp);
 void Fetch_Init(void);
 
@@ -999,6 +1002,8 @@ void WSL_Flush(struct worker *w, int overflow);
 void RES_BuildHttp(const struct sess *sp);
 void RES_WriteObj(struct sess *sp);
 void RES_StreamStart(struct sess *sp);
+void RES_StreamBody(struct sess *sp);
+void RES_StreamWrite(struct sess *sp);
 void RES_StreamEnd(struct sess *sp);
 void RES_StreamPoll(struct worker *wrk);
 
diff --git a/bin/varnishd/cache/cache_center.c b/bin/varnishd/cache/cache_center.c
index 386012c..1f1a239 100644
--- a/bin/varnishd/cache/cache_center.c
+++ b/bin/varnishd/cache/cache_center.c
@@ -963,13 +963,47 @@ DOT }
 DOT streambody -> DONE [style=bold,color=cyan]
  */
 
+/* Background fetch task. Should be called with a ref on the busyobj,
+   and on the objcore if present */
+
+static void
+cnt_streambody_task(struct worker *wrk, void *priv)
+{
+	struct object *obj;
+	struct objcore *objcore;
+	unsigned u;
+
+	AZ(wrk->busyobj);
+	CAST_OBJ_NOTNULL(wrk->busyobj, priv, BUSYOBJ_MAGIC);
+	AN(wrk->busyobj->use_locks);
+
+	CHECK_OBJ_NOTNULL(wrk->busyobj->fetch_obj, OBJECT_MAGIC);
+	AN(wrk->busyobj->vbc);
+	obj = wrk->busyobj->fetch_obj;
+	objcore = obj->objcore;
+
+	wrk->busyobj->fetch_failed = FetchBody(wrk, wrk->busyobj);
+	AZ(wrk->busyobj->fetch_obj);
+	AZ(wrk->busyobj->vbc);
+	wrk->busyobj->vfp = NULL;
+	VBO_StreamStopped(wrk->busyobj);
+
+	u = VBO_DerefBusyObj(wrk, &wrk->busyobj);
+	if (objcore != NULL || u == 0) {
+		/* Only deref object if it has its own refcnt, or we
+		 * were the last to deref the busyobj */
+		(void)HSH_Deref(wrk, NULL, &obj);
+	}
+}
+
 static int
 cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req)
 {
-	int i;
 	struct stream_ctx sctx;
 	uint8_t obuf[sp->wrk->res_mode & RES_GUNZIP ?
 	    cache_param->gzip_stack_buffer : 1];
+	struct worker *wrk_ex;
+	unsigned u;
 
 	CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
 	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
@@ -987,28 +1021,56 @@ cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req)
 		sctx.obuf_len = sizeof (obuf);
 	}
 
-	RES_StreamStart(sp);
-
 	AssertObjCorePassOrBusy(req->obj->objcore);
+	RES_StreamStart(sp);
 
+	/* MBGXXX: Test on OC_F_BUSY to see if we should initiate
+	 * fetch at all. This code now assumes all passes through here
+	 * need to do the fetch as well. (Multiple streaming clients
+	 * not implemented yet) */
 	AZ(wrk->busyobj->fetch_obj);
 	wrk->busyobj->fetch_obj = req->obj;
-	i = FetchBody(wrk, wrk->busyobj);
-	AZ(wrk->busyobj->fetch_obj);
-
 	http_Setup(wrk->busyobj->bereq, NULL);
 	http_Setup(wrk->busyobj->beresp, NULL);
-	wrk->busyobj->vfp = NULL;
-	AZ(wrk->busyobj->vbc);
-	AN(req->director);
+	wrk_ex = SES_GrabWorker(sp, 100); /* MBGXXX: Configurable
+					   * thread grabbing
+					   * timeout */
+	if (wrk_ex != NULL) {
+		/* Set up separate thread fetch */
+		wrk->busyobj->use_locks = 1;
+		if (req->obj->objcore != NULL)
+			/* Grab a ref on the objcore for the other thread */
+			HSH_Ref(req->obj->objcore);
+		VBO_RefBusyObj(wrk->busyobj); /* Ref for the other thread */
+		WRK_DoTask(wrk_ex, cnt_streambody_task, wrk->busyobj);
+	} else {
+		/* We have no worker */
+		if (wrk->busyobj->fetch_obj->objcore == NULL ||
+		    wrk->busyobj->fetch_obj->objcore->flags & OC_F_PASS) {
+			/* It's a pass, prefer flipflop
+			 * streaming. (MBGXXX: Flipflop not finished
+			 * yet) */
+			wrk->busyobj->do_stream_flipflop = 1;
+		}
+		wrk->busyobj->fetch_failed = FetchBody(sp->wrk, wrk->busyobj);
+		VBO_StreamStopped(wrk->busyobj);
+	}
 
-	if (!i && req->obj->objcore != NULL) {
+	RES_StreamBody(sp);
+
+	if (wrk->busyobj->htc.ws == wrk->ws)
+		/* Busyobj's htc has buffer on our workspace,
+		   wait for it to be released */
+		VBO_StreamWait(wrk->busyobj);
+
+	if (wrk->busyobj->fetch_failed) {
+		req->doclose = "Stream error";
+	} else if (req->obj->objcore != NULL) {
+		/* MBGXXX: This should be done on the bg task */
 		EXP_Insert(req->obj);
 		AN(req->obj->objcore);
 		AN(req->obj->objcore->ban);
 		HSH_Unbusy(wrk);
-	} else {
-		req->doclose = "Stream error";
 	}
 	wrk->acct_tmp.fetch++;
 	req->director = NULL;
@@ -1021,8 +1083,14 @@ cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req)
 	wrk->sctx = NULL;
 	assert(WRW_IsReleased(wrk));
 	assert(wrk->wrw.ciov == wrk->wrw.siov);
-	(void)HSH_Deref(wrk, NULL, &req->obj);
-	(void)VBO_DerefBusyObj(wrk, &wrk->busyobj);
+	u = VBO_DerefBusyObj(wrk, &wrk->busyobj);
+	if (req->obj->objcore != NULL || u == 0) {
+		/* Only deref object if it has its own refcnt, or we
+		 * were the last to deref the busyobj */
+		(void)HSH_Deref(wrk, NULL, &req->obj);
+	} else
+		/* Object will be deref'ed by fetch thread */
+		req->obj = NULL;
 	http_Setup(req->resp, NULL);
 	sp->step = STP_DONE;
 	return (0);
diff --git a/bin/varnishd/cache/cache_response.c b/bin/varnishd/cache/cache_response.c
index cb6ddd6..a49acbe 100644
--- a/bin/varnishd/cache/cache_response.c
+++ b/bin/varnishd/cache/cache_response.c
@@ -366,22 +366,39 @@ RES_StreamStart(struct sess *sp)
 }
 
 void
-RES_StreamPoll(struct worker *wrk)
+RES_StreamBody(struct sess *sp)
 {
 	struct stream_ctx *sctx;
+	struct busyobj *bo;
+
+	sctx = sp->wrk->sctx;
+	CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC);
+	bo = sp->wrk->busyobj;
+	CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
+	AN(sp->req->wantbody);
+
+	while (!sctx->stream_stopped || sctx->stream_next < sctx->stream_max)  {
+		VBO_StreamSync(sp->wrk);
+		RES_StreamWrite(sp);
+	}
+}
+
+void
+RES_StreamWrite(struct sess *sp)
+{
+	struct worker *wrk;
+	struct stream_ctx *sctx;
 	struct storage *st;
-	struct object *fetch_obj;
 	ssize_t l, l2, stlen;
 	void *ptr;
 
+	CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+	wrk = sp->wrk;
 	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
 	CHECK_OBJ_NOTNULL(wrk->sp->req->obj, OBJECT_MAGIC);
 	sctx = wrk->sctx;
 	CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC);
 
-	VBO_StreamData(wrk->busyobj);
-	VBO_StreamSync(wrk);
-
 	if (sctx->stream_max == sctx->stream_next)
 		return;
 	assert(sctx->stream_max > sctx->stream_next);
@@ -418,23 +435,6 @@ RES_StreamPoll(struct worker *wrk)
 	}
 	if (!(wrk->res_mode & RES_GUNZIP))
 		(void)WRW_Flush(wrk);
-
-	if (wrk->busyobj->stream_frontchunk == NULL)
-		return;
-
-	/* It's a pass - remove chunks already delivered */
-	fetch_obj = wrk->busyobj->fetch_obj;
-	CHECK_OBJ_NOTNULL(fetch_obj, OBJECT_MAGIC);
-	assert(fetch_obj->objcore == NULL ||
-	       (fetch_obj->objcore->flags & OC_F_PASS));
-	while (1) {
-		st = VTAILQ_FIRST(&fetch_obj->store);
-		if (st == NULL || st == wrk->busyobj->stream_frontchunk)
-			break;
-		CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC);
-		VTAILQ_REMOVE(&fetch_obj->store, st, list);
-		STV_free(st);
-	}
 }
 
 void
@@ -453,3 +453,43 @@ RES_StreamEnd(struct sess *sp)
 	if (WRW_FlushRelease(sp->wrk))
 		SES_Close(sp, "remote closed");
 }
+
+void
+RES_StreamPoll(struct worker *wrk)
+{
+	struct object *fetch_obj;
+	struct storage *st;
+
+	CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
+
+	VBO_StreamData(wrk->busyobj);
+	if (wrk->busyobj->do_stream_flipflop == 1) {
+		AN(wrk->sctx);
+		/* MBGXXX: Do flip-flop streaming */
+		/* MBGXXX: Loop around waiting for the lag behind to
+		 * be less than some configurable size, to keep the
+		 * cache memory usage low (this for streaming
+		 * extremely large objects with pass) */
+		VBO_StreamSync(wrk);
+		RES_StreamWrite(wrk->sp);
+	}
+
+	if (wrk->busyobj->stream_frontchunk == NULL)
+		return;
+
+	/* It's a pass - remove chunks already delivered. Should be OK
+	 * to do lock-free, as we are not fiddling pointers of any
+	 * storage chunk past busyobj->stream_frontchunk */
+	fetch_obj = wrk->busyobj->fetch_obj;
+	CHECK_OBJ_NOTNULL(fetch_obj, OBJECT_MAGIC);
+	assert(fetch_obj->objcore == NULL ||
+	       (fetch_obj->objcore->flags & OC_F_PASS));
+	while (1) {
+		st = VTAILQ_FIRST(&fetch_obj->store);
+		if (st == NULL || st == wrk->busyobj->stream_frontchunk)
+			break;
+		CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC);
+		VTAILQ_REMOVE(&fetch_obj->store, st, list);
+		STV_free(st);
+	}
+}
-- 
1.7.4.1




More information about the varnish-dev mailing list