[PATCH 15/25] Make streaming work with multiple streaming clients
Martin Blix Grydeland
martin at varnish-software.com
Sun Jan 22 18:53:21 CET 2012
---
bin/varnishd/cache/cache_center.c | 117 ++++++++++++++++++++++---------------
bin/varnishd/cache/cache_expire.c | 1 -
bin/varnishd/cache/cache_fetch.c | 2 -
bin/varnishd/cache/cache_hash.c | 33 ++++++++++-
bin/varnishd/hash/hash_slinger.h | 1 +
5 files changed, 103 insertions(+), 51 deletions(-)
diff --git a/bin/varnishd/cache/cache_center.c b/bin/varnishd/cache/cache_center.c
index 7a9134e..23b5f71 100644
--- a/bin/varnishd/cache/cache_center.c
+++ b/bin/varnishd/cache/cache_center.c
@@ -224,7 +224,6 @@ cnt_prepresp(struct sess *sp, struct worker *wrk, struct req *req)
if (wrk->busyobj != NULL) {
CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
AN(wrk->busyobj->do_stream);
- AssertObjCorePassOrBusy(req->obj->objcore);
}
wrk->res_mode = 0;
@@ -301,14 +300,17 @@ cnt_prepresp(struct sess *sp, struct worker *wrk, struct req *req)
case VCL_RET_RESTART:
if (req->restarts >= cache_param->max_restarts)
break;
- if (wrk->busyobj != NULL) {
+ if (wrk->busyobj != NULL &&
+ (req->obj->objcore == NULL ||
+ req->obj->objcore->flags & OC_F_BUSY)) {
AN(wrk->busyobj->do_stream);
VDI_CloseFd(wrk, &wrk->busyobj->vbc);
HSH_Drop(wrk);
- VBO_DerefBusyObj(wrk, &wrk->busyobj);
} else {
(void)HSH_Deref(wrk, NULL, &req->obj);
}
+ if (wrk->busyobj != NULL)
+ (void)VBO_DerefBusyObj(wrk, &wrk->busyobj);
AZ(req->obj);
req->restarts++;
req->director = NULL;
@@ -318,8 +320,8 @@ cnt_prepresp(struct sess *sp, struct worker *wrk, struct req *req)
default:
WRONG("Illegal action in vcl_deliver{}");
}
- if (wrk->busyobj != NULL && wrk->busyobj->do_stream) {
- AssertObjCorePassOrBusy(req->obj->objcore);
+ if (wrk->busyobj != NULL) {
+ AN(wrk->busyobj->do_stream);
sp->step = STP_STREAMBODY;
} else {
sp->step = STP_DELIVER;
@@ -983,13 +985,18 @@ cnt_streambody_task(struct worker *wrk, void *priv)
objcore = obj->objcore;
wrk->busyobj->fetch_failed = FetchBody(wrk, wrk->busyobj);
+ VBO_StreamStopped(wrk->busyobj);
wrk->stats.fetch_threaded++;
+ if (obj->objcore != NULL) {
+ HSH_RemoveBusyObj(wrk, obj->objcore);
+ if (wrk->busyobj->fetch_failed == 0)
+ EXP_Insert(obj);
+ }
AZ(wrk->busyobj->fetch_obj);
AZ(wrk->busyobj->vbc);
wrk->busyobj->vfp = NULL;
- VBO_StreamStopped(wrk->busyobj);
u = VBO_DerefBusyObj(wrk, &wrk->busyobj);
if (objcore != NULL || u == 0) {
@@ -1013,6 +1020,7 @@ cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req)
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
CHECK_OBJ_NOTNULL(wrk->busyobj, BUSYOBJ_MAGIC);
+ AN(wrk->busyobj->do_stream);
memset(&sctx, 0, sizeof sctx);
sctx.magic = STREAM_CTX_MAGIC;
AZ(wrk->sctx);
@@ -1024,58 +1032,72 @@ cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req)
sctx.obuf_len = sizeof (obuf);
}
- AssertObjCorePassOrBusy(req->obj->objcore);
RES_StreamStart(sp);
- /* MBGXXX: Test on OC_F_BUSY to see if we should initiate
- * fetch at all. This code now assumes all passes through here
- * needs to do the fetch as well. (Multiple streaming clients
- * not implemented yet) */
- wrk->acct_tmp.fetch++;
- AZ(wrk->busyobj->fetch_obj);
- wrk->busyobj->fetch_obj = req->obj;
- http_Setup(wrk->busyobj->bereq, NULL);
- http_Setup(wrk->busyobj->beresp, NULL);
- wrk_ex = SES_GrabWorker(sp, 100); /* MBGXXX: Configurable
- * thread grabbing
- * timeout */
- if (wrk_ex != NULL) {
- /* Set up separate thread fetch */
- wrk->busyobj->use_locks = 1;
- if (req->obj->objcore != NULL)
- /* Grab a ref on the objcore for the other thread */
- HSH_Ref(req->obj->objcore);
- VBO_RefBusyObj(wrk->busyobj); /* Ref for the other thread */
- WRK_DoTask(wrk_ex, cnt_streambody_task, wrk->busyobj);
- } else {
- /* We have no worker */
- if (wrk->busyobj->fetch_obj->objcore == NULL ||
- wrk->busyobj->fetch_obj->objcore->flags & OC_F_PASS) {
- /* It's a pass, prefer flipflop
- * streaming. (MBGXXX: Flipflop not finished
- * yet) */
- wrk->busyobj->do_stream_flipflop = 1;
- wrk->stats.fetch_flipflop++;
+ if (req->obj->objcore == NULL || req->obj->objcore->flags & OC_F_BUSY) {
+ /* Initiate a fetch of the body */
+ wrk->acct_tmp.fetch++;
+ AZ(wrk->busyobj->fetch_obj);
+ wrk->busyobj->fetch_obj = req->obj;
+ http_Setup(wrk->busyobj->bereq, NULL);
+ http_Setup(wrk->busyobj->beresp, NULL);
+ wrk_ex = SES_GrabWorker(sp, 100); /* MBGXXX: Configurable
+ * thread grabbing
+ * timeout */
+ if (wrk_ex != NULL)
+ wrk->busyobj->use_locks = 1;
+ if (req->obj->objcore != NULL) {
+ AN(req->obj->objcore->ban);
+ HSH_Unbusy(wrk);
+ AN(req->obj->objcore->busyobj);
+ }
+ if (wrk_ex != NULL) {
+ /* Set up separate thread fetch */
+ if (req->obj->objcore != NULL)
+ /* Grab a ref on the objcore for the
+ * other thread */
+ HSH_Ref(req->obj->objcore);
+ WRK_DoTask(wrk_ex, cnt_streambody_task,
+ VBO_RefBusyObj(wrk->busyobj));
+ } else {
+ /* We have no fetch worker */
+ if (req->obj->objcore == NULL ||
+ req->obj->objcore->flags & OC_F_PASS) {
+ /* It's a pass, prefer flipflop
+ * streaming. (MBGXXX: Flipflop not
+ * finished yet) */
+ wrk->busyobj->do_stream_flipflop = 1;
+ wrk->stats.fetch_flipflop++;
+ }
+ wrk->busyobj->fetch_failed =
+ FetchBody(wrk, wrk->busyobj);
+ VBO_StreamStopped(wrk->busyobj);
+ if (req->obj->objcore != NULL) {
+ HSH_RemoveBusyObj(wrk, req->obj->objcore);
+ if (wrk->busyobj->fetch_failed == 0)
+ EXP_Insert(req->obj);
+ }
+ AZ(wrk->busyobj->fetch_obj);
+ AZ(wrk->busyobj->vbc);
+ wrk->busyobj->vfp = NULL;
+ VBO_StreamSync(wrk);
}
- wrk->busyobj->fetch_failed = FetchBody(sp->wrk, wrk->busyobj);
- VBO_StreamStopped(wrk->busyobj);
}
- RES_StreamBody(sp);
+ if (wrk->busyobj->do_stream_flipflop == 0)
+ RES_StreamBody(sp);
+ else
+ AN(wrk->sctx->stream_stopped);
- if (wrk->busyobj->htc.ws == wrk->ws)
+ if (wrk->busyobj->htc.ws == wrk->ws) {
/* Busyobj's htc has buffer on our workspace,
wait for it to be released */
+ AZ(wrk->busyobj->do_stream_flipflop);
VBO_StreamWait(wrk->busyobj);
+ }
if (wrk->busyobj->fetch_failed) {
req->doclose = "Stream error";
- } else if (req->obj->objcore != NULL) {
- /* MBGXXX: This should be done on the bg task */
- EXP_Insert(req->obj);
- AN(req->obj->objcore);
- AN(req->obj->objcore->ban);
- HSH_Unbusy(wrk);
}
req->director = NULL;
req->restarts = 0;
@@ -1169,7 +1191,6 @@ cnt_hit(struct sess *sp, struct worker *wrk, struct req *req)
CHECK_OBJ_NOTNULL(req->obj, OBJECT_MAGIC);
CHECK_OBJ_NOTNULL(req->vcl, VCL_CONF_MAGIC);
- AZ(wrk->busyobj);
assert(!(req->obj->objcore->flags & OC_F_PASS));
@@ -1315,6 +1336,8 @@ cnt_lookup(struct sess *sp, struct worker *wrk, struct req *req)
wrk->stats.cache_hitpass++;
WSP(sp, SLT_HitPass, "%u", req->obj->xid);
(void)HSH_Deref(wrk, NULL, &req->obj);
+ if (wrk->busyobj != NULL)
+ (void)VBO_DerefBusyObj(wrk, &wrk->busyobj);
req->objcore = NULL;
sp->step = STP_PASS;
return (0);
diff --git a/bin/varnishd/cache/cache_expire.c b/bin/varnishd/cache/cache_expire.c
index e93a1aa..212adf3 100644
--- a/bin/varnishd/cache/cache_expire.c
+++ b/bin/varnishd/cache/cache_expire.c
@@ -224,7 +224,6 @@ EXP_Insert(struct object *o)
CHECK_OBJ_NOTNULL(o, OBJECT_MAGIC);
oc = o->objcore;
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
- AssertObjBusy(o);
HSH_Ref(oc);
assert(o->exp.entered != 0 && !isnan(o->exp.entered));
diff --git a/bin/varnishd/cache/cache_fetch.c b/bin/varnishd/cache/cache_fetch.c
index d5ae4d5..0643389 100644
--- a/bin/varnishd/cache/cache_fetch.c
+++ b/bin/varnishd/cache/cache_fetch.c
@@ -507,8 +507,6 @@ FetchBody(struct worker *wrk, struct busyobj *bo)
if (bo->vfp == NULL)
bo->vfp = &vfp_nop;
- AssertObjCorePassOrBusy(obj->objcore);
-
AZ(bo->vgz_rx);
AZ(VTAILQ_FIRST(&obj->store));
diff --git a/bin/varnishd/cache/cache_hash.c b/bin/varnishd/cache/cache_hash.c
index e78eb60..a12d43a 100644
--- a/bin/varnishd/cache/cache_hash.c
+++ b/bin/varnishd/cache/cache_hash.c
@@ -63,6 +63,8 @@
static const struct hash_slinger *hash;
+static void hsh_rush(struct objhead *oh);
+
/*---------------------------------------------------------------------*/
/* Precreate an objhead and object for later use */
void
@@ -414,6 +416,13 @@ HSH_Lookup(struct sess *sp, struct objhead **poh)
if (o->hits < INT_MAX)
o->hits++;
assert(oh->refcnt > 1);
+ if (oc->busyobj != NULL) {
+ /* It's streamable */
+ CHECK_OBJ_NOTNULL(oc->busyobj, BUSYOBJ_MAGIC);
+ wrk->busyobj = VBO_RefBusyObj(oc->busyobj);
+ if (oh->waitinglist != NULL)
+ hsh_rush(oh);
+ }
Lck_Unlock(&oh->mtx);
assert(hash->deref(oh));
*poh = oh;
@@ -623,7 +632,7 @@ HSH_Unbusy(struct worker *wrk)
VTAILQ_REMOVE(&oh->objcs, oc, list);
VTAILQ_INSERT_HEAD(&oh->objcs, oc, list);
oc->flags &= ~OC_F_BUSY;
- if (oc->busyobj != NULL)
+ if (oc->busyobj->do_stream == 0)
(void)VBO_DerefBusyObj(wrk, &oc->busyobj);
if (oh->waitinglist != NULL)
hsh_rush(oh);
@@ -632,6 +641,28 @@ HSH_Unbusy(struct worker *wrk)
assert(oc_getobj(wrk, oc) == o);
}
+/*---------------------------------------------------------------------
+ * Drop the objcore's ref on the busyobj while holding the objhead mutex. Asserts that oc is not OC_F_BUSY, has a busyobj, and that any stream on it has stopped.
+ */
+
+void
+HSH_RemoveBusyObj(struct worker *wrk, struct objcore *oc)
+{
+ struct objhead *oh;
+
+ CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+ CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
+ oh = oc->objhead; /* the objhead whose mutex guards the deref below */
+ CHECK_OBJ_NOTNULL(oh, OBJHEAD_MAGIC);
+
+ AZ(oc->flags & OC_F_BUSY); /* objcore must no longer be flagged busy */
+ AN(oc->busyobj); /* there must be a busyobj reference to drop */
+ assert(oc->busyobj->do_stream == 0 || oc->busyobj->stream_stopped == 1); /* a streaming busyobj must already be marked stopped */
+ Lck_Lock(&oh->mtx);
+ (void)VBO_DerefBusyObj(wrk, &oc->busyobj); /* return value deliberately ignored; NOTE(review): presumably clears oc->busyobj - confirm */
+ Lck_Unlock(&oh->mtx);
+}
+
void
HSH_Ref(struct objcore *oc)
{
diff --git a/bin/varnishd/hash/hash_slinger.h b/bin/varnishd/hash/hash_slinger.h
index b45e604..8affa42 100644
--- a/bin/varnishd/hash/hash_slinger.h
+++ b/bin/varnishd/hash/hash_slinger.h
@@ -55,6 +55,7 @@ void HSH_Prealloc(const struct sess *sp);
void HSH_Cleanup(struct worker *w);
struct objcore *HSH_Lookup(struct sess *sp, struct objhead **poh);
void HSH_Unbusy(struct worker *wrk);
+void HSH_RemoveBusyObj(struct worker *wrk, struct objcore *oc);
void HSH_Ref(struct objcore *o);
void HSH_Drop(struct worker *wrk);
void HSH_Init(const struct hash_slinger *slinger);
--
1.7.4.1
More information about the varnish-dev
mailing list