[PATCH 09/25] Add stream data synchronization functions to cache_busyobj.c in preparation of threaded streaming.

Martin Blix Grydeland martin at varnish-software.com
Sun Jan 22 18:53:15 CET 2012


VBO_StreamData is called by the fetch to update the busyobj with how
much data is available.

VBO_StreamSync is called by the delivery to update its local
stream_ctx struct with the new pointers.

VBO_StreamStopped signals that the object fetch has finished.

VBO_StreamWait waits for the streaming thread to finish looking at
the other thread's workspace.
---
 bin/varnishd/cache/cache.h         |   16 ++++++
 bin/varnishd/cache/cache_busyobj.c |   91 ++++++++++++++++++++++++++++++++++++
 2 files changed, 107 insertions(+), 0 deletions(-)

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index 0bc3f59..0dd8ef4 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -271,6 +271,13 @@ struct stream_ctx {
 
 	/* First byte of storage if we free it as we go (pass) */
 	ssize_t			stream_front;
+	struct storage		*stream_frontchunk;
+
+	/* Max byte we can stream */
+	ssize_t			stream_max;
+
+	/* Backend fetch has finished */
+	unsigned		stream_stopped;
 };
 
 /*--------------------------------------------------------------------*/
@@ -530,6 +537,11 @@ struct busyobj {
 	unsigned		do_gzip;
 	unsigned		do_gunzip;
 	unsigned		do_stream;
+
+	/* Stream stuff */
+	ssize_t			stream_max;
+	struct storage		*stream_frontchunk;
+	unsigned		stream_stopped;
 };
 
 /* Object structure --------------------------------------------------*/
@@ -738,6 +750,10 @@ struct busyobj *VBO_GetBusyObj(struct worker *wrk);
 struct busyobj *VBO_RefBusyObj(struct busyobj *busyobj);
 unsigned VBO_DerefBusyObj(struct worker *wrk, struct busyobj **busyobj);
 void VBO_Free(struct vbo **vbo);
+void VBO_StreamStopped(struct busyobj *busyobj);
+void VBO_StreamWait(struct busyobj *busyobj);
+void VBO_StreamData(struct busyobj *busyobj);
+void VBO_StreamSync(struct worker *wrk);
 
 /* cache_center.c [CNT] */
 void CNT_Session(struct sess *sp);
diff --git a/bin/varnishd/cache/cache_busyobj.c b/bin/varnishd/cache/cache_busyobj.c
index 423df93..7651999 100644
--- a/bin/varnishd/cache/cache_busyobj.c
+++ b/bin/varnishd/cache/cache_busyobj.c
@@ -208,3 +208,94 @@ VBO_DerefBusyObj(struct worker *wrk, struct busyobj **pbo)
 
 	return (r);
 }
+
+/* Fetch has stopped: set stream_stopped; if use_locks, broadcast vbo->cond */
+void
+VBO_StreamStopped(struct busyobj *busyobj)
+{
+	if (!busyobj->use_locks) {
+		busyobj->stream_stopped = 1;
+		return;
+	}
+	Lck_Lock(&busyobj->vbo->mtx);
+	busyobj->stream_stopped = 1;
+	AZ(pthread_cond_broadcast(&busyobj->vbo->cond));
+	Lck_Unlock(&busyobj->vbo->mtx);
+}
+
+/* Wait (if use_locks) until htc.pipeline.b is NULL or stream_stopped is set */
+void
+VBO_StreamWait(struct busyobj *busyobj)
+{
+	if (!busyobj->use_locks)
+		return;
+	Lck_Lock(&busyobj->vbo->mtx);
+	while (busyobj->htc.pipeline.b != NULL && busyobj->stream_stopped == 0)
+		Lck_CondWait(&busyobj->vbo->cond, &busyobj->vbo->mtx, NULL);
+	Lck_Unlock(&busyobj->vbo->mtx);
+}
+
+/* Raise stream_max to fetch_obj->len; if it grew and use_locks, wake waiters */
+void
+VBO_StreamData(struct busyobj *busyobj)
+{
+	CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC);
+	CHECK_OBJ_NOTNULL(busyobj->fetch_obj, OBJECT_MAGIC);
+
+	if (busyobj->use_locks)
+		Lck_Lock(&busyobj->vbo->mtx);
+	assert(busyobj->fetch_obj->len >= busyobj->stream_max);
+	if (busyobj->fetch_obj->len > busyobj->stream_max) {
+		busyobj->stream_max = busyobj->fetch_obj->len;
+		if (busyobj->use_locks)
+			AZ(pthread_cond_broadcast(&busyobj->vbo->cond));
+	}
+	if (busyobj->use_locks)
+		Lck_Unlock(&busyobj->vbo->mtx);
+}
+
+/* Sync wrk->sctx (stream_max, stream_stopped) from the busyobj; with locks,
+ * wait for new data or stop when stream_next has caught up to stream_max */
+void
+VBO_StreamSync(struct worker *wrk)
+{
+	struct busyobj *busyobj;
+	struct stream_ctx *sctx;
+
+	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+	CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
+	busyobj = wrk->busyobj;
+	CHECK_OBJ_NOTNULL(wrk->sp, SESS_MAGIC);
+	CHECK_OBJ_NOTNULL(wrk->sp->req, REQ_MAGIC);
+	CHECK_OBJ_NOTNULL(wrk->sp->req->obj, OBJECT_MAGIC);
+	CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC);
+	sctx = wrk->sctx;
+
+	if (busyobj->use_locks)
+		Lck_Lock(&busyobj->vbo->mtx);
+	assert(sctx->stream_max <= busyobj->stream_max);
+
+	if (wrk->sp->req->obj->objcore == NULL ||
+	    (wrk->sp->req->obj->objcore->flags & OC_F_PASS)) {
+		/* Publish our front chunk so the backend fetch knows we are
+		 * finished with all chunks before this one */
+		busyobj->stream_frontchunk = sctx->stream_frontchunk;
+	}
+
+	sctx->stream_stopped = busyobj->stream_stopped;
+	sctx->stream_max = busyobj->stream_max;
+
+	if (busyobj->use_locks && !sctx->stream_stopped &&
+	    sctx->stream_next == sctx->stream_max) {
+		while (!busyobj->stream_stopped &&
+		       sctx->stream_max == busyobj->stream_max) {
+			Lck_CondWait(&busyobj->vbo->cond, &busyobj->vbo->mtx,
+				     NULL);
+		}
+		sctx->stream_stopped = busyobj->stream_stopped;
+		sctx->stream_max = busyobj->stream_max;
+	}
+
+	if (busyobj->use_locks)
+		Lck_Unlock(&busyobj->vbo->mtx);
+}
-- 
1.7.4.1




More information about the varnish-dev mailing list