[PATCH 12/25] Use background thread fetching when streaming
Martin Blix Grydeland
martin at varnish-software.com
Sun Jan 22 18:53:18 CET 2012
---
bin/varnishd/cache/cache.h | 5 ++
bin/varnishd/cache/cache_center.c | 96 +++++++++++++++++++++++++++++-----
bin/varnishd/cache/cache_response.c | 84 ++++++++++++++++++++++--------
3 files changed, 149 insertions(+), 36 deletions(-)
diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index da06965..880f5e3 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -537,6 +537,7 @@ struct busyobj {
unsigned do_gzip;
unsigned do_gunzip;
unsigned do_stream;
+ unsigned do_stream_flipflop;
/* Stream stuff */
ssize_t stream_max;
@@ -793,6 +794,8 @@ int FetchError(struct worker *w, const char *error);
int FetchError2(struct worker *w, const char *error, const char *more);
int FetchHdr(struct sess *sp, int need_host_hdr);
int FetchBody(struct worker *w, struct busyobj *bo);
+void FetchBodyBackground(struct sess *sp, struct busyobj *bo);
+void FetchBodyWait(struct busyobj *bo);
int FetchReqBody(const struct sess *sp);
void Fetch_Init(void);
@@ -999,6 +1002,8 @@ void WSL_Flush(struct worker *w, int overflow);
void RES_BuildHttp(const struct sess *sp);
void RES_WriteObj(struct sess *sp);
void RES_StreamStart(struct sess *sp);
+void RES_StreamBody(struct sess *sp);
+void RES_StreamWrite(struct sess *sp);
void RES_StreamEnd(struct sess *sp);
void RES_StreamPoll(struct worker *wrk);
diff --git a/bin/varnishd/cache/cache_center.c b/bin/varnishd/cache/cache_center.c
index 386012c..1f1a239 100644
--- a/bin/varnishd/cache/cache_center.c
+++ b/bin/varnishd/cache/cache_center.c
@@ -963,13 +963,47 @@ DOT }
DOT streambody -> DONE [style=bold,color=cyan]
*/
+/* Background fetch task. Should be called with ref on busyobj, and
+ the objcore if present */
+
+static void
+cnt_streambody_task(struct worker *wrk, void *priv)
+{
+ struct object *obj;
+ struct objcore *objcore;
+ unsigned u;
+
+ AZ(wrk->busyobj);
+ CAST_OBJ_NOTNULL(wrk->busyobj, priv, BUSYOBJ_MAGIC);
+ AN(wrk->busyobj->use_locks);
+
+ CHECK_OBJ_NOTNULL(wrk->busyobj->fetch_obj, OBJECT_MAGIC);
+ AN(wrk->busyobj->vbc);
+ obj = wrk->busyobj->fetch_obj;
+ objcore = obj->objcore;
+
+ wrk->busyobj->fetch_failed = FetchBody(wrk, wrk->busyobj);
+ AZ(wrk->busyobj->fetch_obj);
+ AZ(wrk->busyobj->vbc);
+ wrk->busyobj->vfp = NULL;
+ VBO_StreamStopped(wrk->busyobj);
+
+ u = VBO_DerefBusyObj(wrk, &wrk->busyobj);
+ if (objcore != NULL || u == 0) {
+ /* Only deref object if it has its own refcnt, or we
+ * were the last to deref the busyobj */
+ (void)HSH_Deref(wrk, NULL, &obj);
+ }
+}
+
static int
cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req)
{
- int i;
struct stream_ctx sctx;
uint8_t obuf[sp->wrk->res_mode & RES_GUNZIP ?
cache_param->gzip_stack_buffer : 1];
+ struct worker *wrk_ex;
+ unsigned u;
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
@@ -987,28 +1021,56 @@ cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req)
sctx.obuf_len = sizeof (obuf);
}
- RES_StreamStart(sp);
-
AssertObjCorePassOrBusy(req->obj->objcore);
+ RES_StreamStart(sp);
+ /* MBGXXX: Test on OC_F_BUSY to see if we should initiate
+ * fetch at all. This code now assumes all passes through here
+ * need to do the fetch as well. (Multiple streaming clients
+ * not implemented yet) */
AZ(wrk->busyobj->fetch_obj);
wrk->busyobj->fetch_obj = req->obj;
- i = FetchBody(wrk, wrk->busyobj);
- AZ(wrk->busyobj->fetch_obj);
-
http_Setup(wrk->busyobj->bereq, NULL);
http_Setup(wrk->busyobj->beresp, NULL);
- wrk->busyobj->vfp = NULL;
- AZ(wrk->busyobj->vbc);
- AN(req->director);
+ wrk_ex = SES_GrabWorker(sp, 100); /* MBGXXX: Configurable
+ * thread grabbing
+ * timeout */
+ if (wrk_ex != NULL) {
+ /* Set up separate thread fetch */
+ wrk->busyobj->use_locks = 1;
+ if (req->obj->objcore != NULL)
+ /* Grab a ref on the objcore for the other thread */
+ HSH_Ref(req->obj->objcore);
+ VBO_RefBusyObj(wrk->busyobj); /* Ref for the other thread */
+ WRK_DoTask(wrk_ex, cnt_streambody_task, wrk->busyobj);
+ } else {
+ /* We have no worker */
+ if (wrk->busyobj->fetch_obj->objcore == NULL ||
+ wrk->busyobj->fetch_obj->objcore->flags & OC_F_PASS) {
+ /* It's a pass, prefer flipflop
+ * streaming. (MBGXXX: Flipflop not finished
+ * yet) */
+ wrk->busyobj->do_stream_flipflop = 1;
+ }
+ wrk->busyobj->fetch_failed = FetchBody(sp->wrk, wrk->busyobj);
+ VBO_StreamStopped(wrk->busyobj);
+ }
- if (!i && req->obj->objcore != NULL) {
+ RES_StreamBody(sp);
+
+ if (wrk->busyobj->htc.ws == wrk->ws)
+ /* Busyobj's htc has buffer on our workspace,
+ wait for it to be released */
+ VBO_StreamWait(wrk->busyobj);
+
+ if (wrk->busyobj->fetch_failed) {
+ req->doclose = "Stream error";
+ } else if (req->obj->objcore != NULL) {
+ /* MBGXXX: This should be done on the bg task */
EXP_Insert(req->obj);
AN(req->obj->objcore);
AN(req->obj->objcore->ban);
HSH_Unbusy(wrk);
- } else {
- req->doclose = "Stream error";
}
wrk->acct_tmp.fetch++;
req->director = NULL;
@@ -1021,8 +1083,14 @@ cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req)
wrk->sctx = NULL;
assert(WRW_IsReleased(wrk));
assert(wrk->wrw.ciov == wrk->wrw.siov);
- (void)HSH_Deref(wrk, NULL, &req->obj);
- (void)VBO_DerefBusyObj(wrk, &wrk->busyobj);
+ u = VBO_DerefBusyObj(wrk, &wrk->busyobj);
+ if (req->obj->objcore != NULL || u == 0) {
+ /* Only deref object if it has its own refcnt, or we
+ * were the last to deref the busyobj */
+ (void)HSH_Deref(wrk, NULL, &req->obj);
+ } else
+ /* Object will be deref'ed by fetch thread */
+ req->obj = NULL;
http_Setup(req->resp, NULL);
sp->step = STP_DONE;
return (0);
diff --git a/bin/varnishd/cache/cache_response.c b/bin/varnishd/cache/cache_response.c
index cb6ddd6..a49acbe 100644
--- a/bin/varnishd/cache/cache_response.c
+++ b/bin/varnishd/cache/cache_response.c
@@ -366,22 +366,39 @@ RES_StreamStart(struct sess *sp)
}
void
-RES_StreamPoll(struct worker *wrk)
+RES_StreamBody(struct sess *sp)
{
struct stream_ctx *sctx;
+ struct busyobj *bo;
+
+ sctx = sp->wrk->sctx;
+ CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC);
+ bo = sp->wrk->busyobj;
+ CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
+ AN(sp->req->wantbody);
+
+ while (!sctx->stream_stopped || sctx->stream_next < sctx->stream_max) {
+ VBO_StreamSync(sp->wrk);
+ RES_StreamWrite(sp);
+ }
+}
+
+void
+RES_StreamWrite(struct sess *sp)
+{
+ struct worker *wrk;
+ struct stream_ctx *sctx;
struct storage *st;
- struct object *fetch_obj;
ssize_t l, l2, stlen;
void *ptr;
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ wrk = sp->wrk;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(wrk->sp->req->obj, OBJECT_MAGIC);
sctx = wrk->sctx;
CHECK_OBJ_NOTNULL(sctx, STREAM_CTX_MAGIC);
- VBO_StreamData(wrk->busyobj);
- VBO_StreamSync(wrk);
-
if (sctx->stream_max == sctx->stream_next)
return;
assert(sctx->stream_max > sctx->stream_next);
@@ -418,23 +435,6 @@ RES_StreamPoll(struct worker *wrk)
}
if (!(wrk->res_mode & RES_GUNZIP))
(void)WRW_Flush(wrk);
-
- if (wrk->busyobj->stream_frontchunk == NULL)
- return;
-
- /* It's a pass - remove chunks already delivered */
- fetch_obj = wrk->busyobj->fetch_obj;
- CHECK_OBJ_NOTNULL(fetch_obj, OBJECT_MAGIC);
- assert(fetch_obj->objcore == NULL ||
- (fetch_obj->objcore->flags & OC_F_PASS));
- while (1) {
- st = VTAILQ_FIRST(&fetch_obj->store);
- if (st == NULL || st == wrk->busyobj->stream_frontchunk)
- break;
- CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC);
- VTAILQ_REMOVE(&fetch_obj->store, st, list);
- STV_free(st);
- }
}
void
@@ -453,3 +453,43 @@ RES_StreamEnd(struct sess *sp)
if (WRW_FlushRelease(sp->wrk))
SES_Close(sp, "remote closed");
}
+
+void
+RES_StreamPoll(struct worker *wrk)
+{
+ struct object *fetch_obj;
+ struct storage *st;
+
+ CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
+
+ VBO_StreamData(wrk->busyobj);
+ if (wrk->busyobj->do_stream_flipflop == 1) {
+ AN(wrk->sctx);
+ /* MBGXXX: Do flip-flop streaming */
+ /* MBGXXX: Loop around waiting for the lag behind to
+ * be less than some configurable size, to keep the
+ * cache memory usage low (this for streaming
+ * extremely large objects with pass) */
+ VBO_StreamSync(wrk);
+ RES_StreamWrite(wrk->sp);
+ }
+
+ if (wrk->busyobj->stream_frontchunk == NULL)
+ return;
+
+ /* It's a pass - remove chunks already delivered. Should be OK
+ * to do lock-free, as we are not fiddling pointers of any
+ * storage chunk past busyobj->stream_frontchunk */
+ fetch_obj = wrk->busyobj->fetch_obj;
+ CHECK_OBJ_NOTNULL(fetch_obj, OBJECT_MAGIC);
+ assert(fetch_obj->objcore == NULL ||
+ (fetch_obj->objcore->flags & OC_F_PASS));
+ while (1) {
+ st = VTAILQ_FIRST(&fetch_obj->store);
+ if (st == NULL || st == wrk->busyobj->stream_frontchunk)
+ break;
+ CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC);
+ VTAILQ_REMOVE(&fetch_obj->store, st, list);
+ STV_free(st);
+ }
+}
--
1.7.4.1
More information about the varnish-dev
mailing list