[PATCH 24/25] Thundering herd elimination
Martin Blix Grydeland
martin at varnish-software.com
Sun Jan 22 18:53:30 CET 2012
Deal with the thundering herd problem by limiting the number of
clients allowed to wait for additional data on the stream to
stream_tokens. Only threads that acquire a token are allowed to wait
on the condvar; the others go on a queue. Tokens are passed on to
queued clients when a token-holding thread has sent some data to its
client.
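As a rough, self-contained illustration of that hand-off (the actual
code in cache_busyobj.c below works on struct vbo and struct worker
and uses Varnish's VTAILQ and Lck wrappers; the token_pool and waiter
names here are made up for the sketch):

    #include <pthread.h>
    #include <sys/queue.h>
    #include <time.h>

    struct waiter {
            pthread_cond_t          cond;
            int                     has_token;
            TAILQ_ENTRY(waiter)     list;
    };

    struct token_pool {
            unsigned                tokens; /* free tokens left */
            TAILQ_HEAD(, waiter)    queue;  /* workers waiting for one */
    };

    /* Called with the pool's mutex held.  Either grab a free token or
     * queue up and wait (bounded) for a releasing worker to hand us one. */
    static void
    token_acquire(struct token_pool *tp, struct waiter *w,
        pthread_mutex_t *mtx, const struct timespec *deadline)
    {
            if (tp->tokens > 0) {
                    tp->tokens--;
                    w->has_token = 1;
                    return;
            }
            TAILQ_INSERT_TAIL(&tp->queue, w, list);
            (void)pthread_cond_timedwait(&w->cond, mtx, deadline);
            if (!w->has_token)
                    TAILQ_REMOVE(&tp->queue, w, list);      /* timed out */
    }

    /* Called with the pool's mutex held.  Hand the token straight to the
     * first queued waiter if there is one, otherwise return it to the pool. */
    static void
    token_release(struct token_pool *tp, struct waiter *w)
    {
            struct waiter *next;

            next = TAILQ_FIRST(&tp->queue);
            if (next != NULL) {
                    TAILQ_REMOVE(&tp->queue, next, list);
                    next->has_token = 1;
                    (void)pthread_cond_signal(&next->cond);
            } else
                    tp->tokens++;
            w->has_token = 0;
    }
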
The maximum time to wait for a token is controlled through the
stream_token_timeout run-time parameter. This prevents a
token-holding client that suddenly blocks from starving other
clients waiting for a token.
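The parameter is in milliseconds; roughly, the absolute deadline
handed to the timed condvar wait is computed like this (the same
arithmetic as in vbo_acquire_token() below, with timeout_ms standing
in for cache_param->stream_token_timeout):

    #include <time.h>

    /* Convert a relative timeout in milliseconds into the absolute
     * CLOCK_REALTIME deadline pthread_cond_timedwait() expects.  A waiter
     * that has not been handed a token by the deadline gives up, removes
     * itself from the queue and bumps the n_tokentimeout counter. */
    static void
    deadline_from_ms(unsigned timeout_ms, struct timespec *deadline)
    {
            clock_gettime(CLOCK_REALTIME, deadline);
            deadline->tv_sec += timeout_ms / 1000;
            deadline->tv_nsec += (timeout_ms % 1000) * 1000000L;
            if (deadline->tv_nsec >= 1000000000L) {
                    deadline->tv_sec++;
                    deadline->tv_nsec -= 1000000000L;
            }
    }

The patch defaults to 10 tokens and a 100 ms timeout (see the
stream_tokens and stream_token_timeout parameters below); the token
count can also be overridden per object from vcl_fetch via
beresp.stream_tokens.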
---
bin/varnishd/cache/cache.h | 5 ++
bin/varnishd/cache/cache_busyobj.c | 96 +++++++++++++++++++++++++++++++++++-
bin/varnishd/cache/cache_center.c | 4 ++
bin/varnishd/cache/cache_vrt_var.c | 20 ++++++++
bin/varnishd/common/params.h | 2 +
bin/varnishd/mgt/mgt_param.c | 12 +++++
include/tbl/vsc_f_main.h | 3 +
lib/libvcl/generate.py | 6 ++
8 files changed, 146 insertions(+), 2 deletions(-)
diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index a64a0ae..b02bea6 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -278,6 +278,9 @@ struct stream_ctx {
/* Backend fetch has finished */
unsigned stream_stopped;
+
+ /* Are we currently holding a token from the busyobj */
+ unsigned has_token;
};
/*--------------------------------------------------------------------*/
@@ -545,6 +548,7 @@ struct busyobj {
volatile struct storage *stream_frontchunk;
unsigned stream_stopped;
ssize_t stream_pass_bufsize;
+ unsigned stream_tokens;
};
/* Object structure --------------------------------------------------*/
@@ -759,6 +763,7 @@ void VBO_StreamStopped(struct busyobj *busyobj);
void VBO_StreamWait(struct busyobj *busyobj);
void VBO_StreamData(struct busyobj *busyobj);
void VBO_StreamSync(struct worker *wrk);
+void VBO_ReleaseToken(struct worker *wrk, struct busyobj *busyobj);
/* cache_center.c [CNT] */
void CNT_Session(struct sess *sp);
diff --git a/bin/varnishd/cache/cache_busyobj.c b/bin/varnishd/cache/cache_busyobj.c
index 8b97d4a..2b6ebc0 100644
--- a/bin/varnishd/cache/cache_busyobj.c
+++ b/bin/varnishd/cache/cache_busyobj.c
@@ -43,6 +43,7 @@ struct vbo {
#define VBO_MAGIC 0xde3d8223
struct lock mtx;
pthread_cond_t cond;
+ VTAILQ_HEAD(, worker) token_wait_queue;
unsigned refcount;
uint16_t nhttp;
struct busyobj bo;
@@ -51,6 +52,9 @@ struct vbo {
static struct lock vbo_mtx;
static struct vbo *nvbo;
+static void vbo_acquire_token(struct worker *wrk, struct busyobj *busyobj);
+static void vbo_release_token(struct worker *wrk, struct busyobj *busyobj);
+
void
VBO_Init(void)
{
@@ -82,6 +86,7 @@ vbo_New(void)
vbo->nhttp = nhttp;
Lck_New(&vbo->mtx, lck_busyobj);
AZ(pthread_cond_init(&vbo->cond, NULL));
+ VTAILQ_INIT(&vbo->token_wait_queue);
return (vbo);
}
@@ -145,6 +150,7 @@ VBO_GetBusyObj(struct worker *wrk)
vbo->bo.beresp = HTTP_create(p, vbo->nhttp);
vbo->bo.stream_pass_bufsize = cache_param->stream_pass_bufsize;
+ vbo->bo.stream_tokens = cache_param->stream_tokens;
return (&vbo->bo);
}
@@ -291,6 +297,11 @@ VBO_StreamSync(struct worker *wrk)
Lck_Lock(&busyobj->vbo->mtx);
assert(sctx->stream_max <= busyobj->stream_max);
+ if (sctx->has_token)
+ /* Release token to give other clients that have
+ * reached the end of data a chance */
+ vbo_release_token(wrk, busyobj);
+
if (wrk->sp->req->obj->objcore == NULL ||
(wrk->sp->req->obj->objcore->flags & OC_F_PASS)) {
/* Give notice to backend fetch that we are finished
@@ -303,8 +314,11 @@ VBO_StreamSync(struct worker *wrk)
sctx->stream_stopped = busyobj->stream_stopped;
sctx->stream_max = busyobj->stream_max;
- if (busyobj->use_locks && sctx->stream_next == sctx->stream_max) {
- while (!busyobj->stream_stopped &&
+ if (busyobj->use_locks && !sctx->stream_stopped &&
+ sctx->stream_next == sctx->stream_max) {
+ /* We've exhausted available data, wait for more */
+ vbo_acquire_token(wrk, busyobj);
+ while (sctx->has_token && !busyobj->stream_stopped &&
sctx->stream_max == busyobj->stream_max) {
Lck_CondWait(&busyobj->vbo->cond, &busyobj->vbo->mtx,
NULL);
@@ -316,3 +330,81 @@ VBO_StreamSync(struct worker *wrk)
if (busyobj->use_locks)
Lck_Unlock(&busyobj->vbo->mtx);
}
+
+/* Acquire a token for the worker from the busyobj. If none is available,
+ wait for stream_token_timeout ms on the queue.
+ wrk->sctx->has_token will be true if successful */
+static void
+vbo_acquire_token(struct worker *wrk, struct busyobj *busyobj)
+{
+ struct timespec ts;
+
+ CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+ CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC);
+ AZ(wrk->sctx->has_token);
+ CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC);
+ AN(busyobj->use_locks);
+
+ if (busyobj->stream_tokens > 0) {
+ busyobj->stream_tokens--;
+ wrk->sctx->has_token = 1;
+ return;
+ }
+
+ AZ(clock_gettime(CLOCK_REALTIME, &ts));
+ ts.tv_sec += cache_param->stream_token_timeout / 1000;
+ ts.tv_nsec += (cache_param->stream_token_timeout % 1000) * 1000000;
+ if (ts.tv_nsec >= 1000000000) {
+ ts.tv_sec++;
+ ts.tv_nsec -= 1000000000;
+ }
+ VTAILQ_INSERT_TAIL(&busyobj->vbo->token_wait_queue, wrk, list);
+ Lck_CondWait(&wrk->cond, &busyobj->vbo->mtx, &ts);
+ if (wrk->sctx->has_token == 0) {
+ VTAILQ_REMOVE(&busyobj->vbo->token_wait_queue, wrk, list);
+ wrk->stats.n_tokentimeout++;
+ }
+}
+
+/* Release our currently held token, passing it on to the first on the
+ queue if the queue is non-empty. Resets wrk->sctx->has_token */
+static void
+vbo_release_token(struct worker *wrk, struct busyobj *busyobj)
+{
+ struct worker *wrk_waiting;
+
+ CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+ CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC);
+ AN(wrk->sctx->has_token);
+ CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC);
+ AN(busyobj->use_locks);
+
+ wrk_waiting = VTAILQ_FIRST(&busyobj->vbo->token_wait_queue);
+ if (wrk_waiting != NULL) {
+ /* Transfer our token to the first on the waiting
+ list, and wake it */
+ CHECK_OBJ_NOTNULL(wrk_waiting, WORKER_MAGIC);
+ CHECK_OBJ_NOTNULL(wrk_waiting->sctx, STREAM_CTX_MAGIC);
+ VTAILQ_REMOVE(&busyobj->vbo->token_wait_queue, wrk_waiting,
+ list);
+ AZ(wrk_waiting->sctx->has_token);
+ wrk_waiting->sctx->has_token = 1;
+ AZ(pthread_cond_signal(&wrk_waiting->cond));
+ } else {
+ busyobj->stream_tokens++;
+ }
+
+ wrk->sctx->has_token = 0;
+}
+
+void
+VBO_ReleaseToken(struct worker *wrk, struct busyobj *busyobj)
+{
+ CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC);
+
+ if (busyobj->use_locks == 0)
+ return;
+ Lck_Lock(&busyobj->vbo->mtx);
+ vbo_release_token(wrk, busyobj);
+ Lck_Unlock(&busyobj->vbo->mtx);
+}
diff --git a/bin/varnishd/cache/cache_center.c b/bin/varnishd/cache/cache_center.c
index 96e23fc..5daa949 100644
--- a/bin/varnishd/cache/cache_center.c
+++ b/bin/varnishd/cache/cache_center.c
@@ -1084,6 +1084,10 @@ cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req)
else
AN(wrk->sctx->stream_stopped);
+ if (wrk->sctx->has_token)
+ VBO_ReleaseToken(wrk, wrk->busyobj);
+ AZ(wrk->sctx->has_token);
+
if (wrk->busyobj->htc.ws == wrk->ws) {
/* Busyobj's htc has buffer on our workspace,
wait for it to be released */
diff --git a/bin/varnishd/cache/cache_vrt_var.c b/bin/varnishd/cache/cache_vrt_var.c
index e655abf..82d77c6 100644
--- a/bin/varnishd/cache/cache_vrt_var.c
+++ b/bin/varnishd/cache/cache_vrt_var.c
@@ -320,6 +320,26 @@ VRT_l_beresp_stream_pass_bufsize(const struct sess *sp, double val)
sp->wrk->busyobj->stream_pass_bufsize = 0;
}
+int
+VRT_r_beresp_stream_tokens(const struct sess *sp)
+{
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ CHECK_OBJ_NOTNULL(sp->wrk->busyobj, BUSYOBJ_MAGIC);
+ return (sp->wrk->busyobj->stream_tokens);
+}
+
+void
+VRT_l_beresp_stream_tokens(const struct sess *sp, int val)
+{
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ CHECK_OBJ_NOTNULL(sp->wrk->busyobj, BUSYOBJ_MAGIC);
+ if (val >= 1)
+ sp->wrk->busyobj->stream_tokens = val;
+ else
+ sp->wrk->busyobj->stream_tokens = 1;
+}
+
+
/*--------------------------------------------------------------------*/
void
diff --git a/bin/varnishd/common/params.h b/bin/varnishd/common/params.h
index 20b346a..a68d608 100644
--- a/bin/varnishd/common/params.h
+++ b/bin/varnishd/common/params.h
@@ -101,6 +101,8 @@ struct params {
ssize_t stream_maxchunksize;
unsigned stream_grab_timeout;
ssize_t stream_pass_bufsize;
+ unsigned stream_tokens;
+ unsigned stream_token_timeout;
unsigned nuke_limit;
diff --git a/bin/varnishd/mgt/mgt_param.c b/bin/varnishd/mgt/mgt_param.c
index 3a64b96..e5d6888 100644
--- a/bin/varnishd/mgt/mgt_param.c
+++ b/bin/varnishd/mgt/mgt_param.c
@@ -858,6 +858,18 @@ static const struct parspec input_parspec[] = {
"Zero means unlimited.\n",
EXPERIMENTAL,
"10mb", "bytes" },
+ { "stream_tokens",
+ tweak_uint, &mgt_param.stream_tokens, 1, UINT_MAX,
+ "Default number of tokens available for racing streaming "
+ "clients.\n",
+ EXPERIMENTAL,
+ "10", "tokens" },
+ { "stream_token_timeout",
+ tweak_uint, &mgt_param.stream_token_timeout, 1, UINT_MAX,
+ "Timeout for acquiring token during streaming and waiting "
+ "for more data.\n",
+ EXPERIMENTAL,
+ "100", "ms" },
#ifdef SENDFILE_WORKS
{ "sendfile_threshold",
tweak_bytes, &mgt_param.sendfile_threshold, 0, HUGE_VAL,
diff --git a/include/tbl/vsc_f_main.h b/include/tbl/vsc_f_main.h
index 5761c51..415b07d 100644
--- a/include/tbl/vsc_f_main.h
+++ b/include/tbl/vsc_f_main.h
@@ -262,6 +262,9 @@ VSC_F(n_objwrite, uint64_t, 0, 'a', "Objects sent with write",
"or if the sendfile call has been disabled")
VSC_F(n_objoverflow, uint64_t, 1, 'a',
"Objects overflowing workspace", "")
+VSC_F(n_tokentimeout, uint64_t, 1, 'a', "Token wait timeouts",
+ "The number of times a fast streaming client has timed out waiting for "
+ "token to be notified about new incoming data.")
VSC_F(s_sess, uint64_t, 1, 'a', "Total Sessions", "")
VSC_F(s_req, uint64_t, 1, 'a', "Total Requests", "")
diff --git a/lib/libvcl/generate.py b/lib/libvcl/generate.py
index 51b2294..1dab49d 100755
--- a/lib/libvcl/generate.py
+++ b/lib/libvcl/generate.py
@@ -337,6 +337,12 @@ sp_variables = (
( 'fetch',),
'const struct sess *'
),
+ ('beresp.stream_tokens',
+ 'INT',
+ ( 'fetch',),
+ ( 'fetch',),
+ 'const struct sess *'
+ ),
('beresp.ttl',
'DURATION',
( 'fetch',),
--
1.7.4.1