[PATCH 09/25] Add stream data synchronization functions to cache_busyobj.c in preparation of threaded streaming.
Martin Blix Grydeland
martin at varnish-software.com
Sun Jan 22 18:53:15 CET 2012
VBO_StreamData is called by the fetch to update the busyobj with how
much data is available.
VBO_StreamSync is called by the dilvery to update it's local
stream_ctx struct with the new pointers.
VBO_StreamStopped signals object fetch has finished.
VBO_StreamWait waits for the streaming thread to finish looking at
other thread's workspace.
---
bin/varnishd/cache/cache.h | 16 ++++++
bin/varnishd/cache/cache_busyobj.c | 91 ++++++++++++++++++++++++++++++++++++
2 files changed, 107 insertions(+), 0 deletions(-)
diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index 0bc3f59..0dd8ef4 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -271,6 +271,13 @@ struct stream_ctx {
/* First byte of storage if we free it as we go (pass) */
ssize_t stream_front;
+ struct storage *stream_frontchunk;
+
+ /* Max byte we can stream */
+ ssize_t stream_max;
+
+ /* Backend fetch has finished */
+ unsigned stream_stopped;
};
/*--------------------------------------------------------------------*/
@@ -530,6 +537,11 @@ struct busyobj {
unsigned do_gzip;
unsigned do_gunzip;
unsigned do_stream;
+
+ /* Stream stuff */
+ ssize_t stream_max;
+ struct storage *stream_frontchunk;
+ unsigned stream_stopped;
};
/* Object structure --------------------------------------------------*/
@@ -738,6 +750,10 @@ struct busyobj *VBO_GetBusyObj(struct worker *wrk);
struct busyobj *VBO_RefBusyObj(struct busyobj *busyobj);
unsigned VBO_DerefBusyObj(struct worker *wrk, struct busyobj **busyobj);
void VBO_Free(struct vbo **vbo);
+void VBO_StreamStopped(struct busyobj *busyobj);
+void VBO_StreamWait(struct busyobj *busyobj);
+void VBO_StreamData(struct busyobj *busyobj);
+void VBO_StreamSync(struct worker *wrk);
/* cache_center.c [CNT] */
void CNT_Session(struct sess *sp);
diff --git a/bin/varnishd/cache/cache_busyobj.c b/bin/varnishd/cache/cache_busyobj.c
index 423df93..7651999 100644
--- a/bin/varnishd/cache/cache_busyobj.c
+++ b/bin/varnishd/cache/cache_busyobj.c
@@ -208,3 +208,94 @@ VBO_DerefBusyObj(struct worker *wrk, struct busyobj **pbo)
return (r);
}
+
+/* Signal that the fetch thread has stopped */
+void
+VBO_StreamStopped(struct busyobj *busyobj)
+{
+ if (!busyobj->use_locks) {
+ busyobj->stream_stopped = 1;
+ return;
+ }
+ Lck_Lock(&busyobj->vbo->mtx);
+ busyobj->stream_stopped = 1;
+ AZ(pthread_cond_broadcast(&busyobj->vbo->cond));
+ Lck_Unlock(&busyobj->vbo->mtx);
+}
+
+/* Wait for the fetch thread to finish reading the pipeline buffer */
+void
+VBO_StreamWait(struct busyobj *busyobj)
+{
+ if (!busyobj->use_locks)
+ return;
+ Lck_Lock(&busyobj->vbo->mtx);
+ while (busyobj->htc.pipeline.b != NULL && busyobj->stream_stopped == 0)
+ Lck_CondWait(&busyobj->vbo->cond, &busyobj->vbo->mtx, NULL);
+ Lck_Unlock(&busyobj->vbo->mtx);
+}
+
+/* Signal additional data available */
+void
+VBO_StreamData(struct busyobj *busyobj)
+{
+ CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC);
+ CHECK_OBJ_NOTNULL(busyobj->fetch_obj, OBJECT_MAGIC);
+
+ if (busyobj->use_locks)
+ Lck_Lock(&busyobj->vbo->mtx);
+ assert(busyobj->fetch_obj->len >= busyobj->stream_max);
+ if (busyobj->fetch_obj->len > busyobj->stream_max) {
+ busyobj->stream_max = busyobj->fetch_obj->len;
+ if (busyobj->use_locks)
+ AZ(pthread_cond_broadcast(&busyobj->vbo->cond));
+ }
+ if (busyobj->use_locks)
+ Lck_Unlock(&busyobj->vbo->mtx);
+}
+
+/* Sync the client's stream_ctx with the busyobj, and block on no more
+ * data available */
+void
+VBO_StreamSync(struct worker *wrk)
+{
+ struct busyobj *busyobj;
+ struct stream_ctx *sctx;
+
+ CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+ CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
+ busyobj = wrk->busyobj;
+ CHECK_OBJ_NOTNULL(wrk->sp, SESS_MAGIC);
+ CHECK_OBJ_NOTNULL(wrk->sp->req, REQ_MAGIC);
+ CHECK_OBJ_NOTNULL(wrk->sp->req->obj, OBJECT_MAGIC);
+ CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC);
+ sctx = wrk->sctx;
+
+ if (busyobj->use_locks)
+ Lck_Lock(&busyobj->vbo->mtx);
+ assert(sctx->stream_max <= busyobj->stream_max);
+
+ if (wrk->sp->req->obj->objcore == NULL ||
+ (wrk->sp->req->obj->objcore->flags & OC_F_PASS)) {
+ /* Give notice to backend fetch that we are finished
+ * with all chunks before this one */
+ busyobj->stream_frontchunk = sctx->stream_frontchunk;
+ }
+
+ sctx->stream_stopped = busyobj->stream_stopped;
+ sctx->stream_max = busyobj->stream_max;
+
+ if (busyobj->use_locks && !sctx->stream_stopped &&
+ sctx->stream_next == sctx->stream_max) {
+ while (!busyobj->stream_stopped &&
+ sctx->stream_max == busyobj->stream_max) {
+ Lck_CondWait(&busyobj->vbo->cond, &busyobj->vbo->mtx,
+ NULL);
+ }
+ sctx->stream_stopped = busyobj->stream_stopped;
+ sctx->stream_max = busyobj->stream_max;
+ }
+
+ if (busyobj->use_locks)
+ Lck_Unlock(&busyobj->vbo->mtx);
+}
--
1.7.4.1
More information about the varnish-dev
mailing list