[PATCH 24/25] Thundering horde elimination

Martin Blix Grydeland martin at varnish-software.com
Sun Jan 22 18:53:30 CET 2012


Deal with the thundering herd problem by limiting the number of clients
allowed to wait for additional data on the stream to stream_tokens.
Only threads that acquire a token are allowed to wait on the condvar;
all others have to go on a queue. A token is passed on to a queued
client when a token-holding thread has sent some data to its own
client.

The maximum wait time to get a token is controlled through the
stream_token_timeout run-time parameter. This is to prevent a
token-holding client that suddenly blocks from starving other clients
that are waiting for a token.
---
 bin/varnishd/cache/cache.h         |    5 ++
 bin/varnishd/cache/cache_busyobj.c |   96 +++++++++++++++++++++++++++++++++++-
 bin/varnishd/cache/cache_center.c  |    4 ++
 bin/varnishd/cache/cache_vrt_var.c |   20 ++++++++
 bin/varnishd/common/params.h       |    2 +
 bin/varnishd/mgt/mgt_param.c       |   12 +++++
 include/tbl/vsc_f_main.h           |    3 +
 lib/libvcl/generate.py             |    6 ++
 8 files changed, 146 insertions(+), 2 deletions(-)

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index a64a0ae..b02bea6 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -278,6 +278,9 @@ struct stream_ctx {
 
 	/* Backend fetch has finished */
 	unsigned		stream_stopped;
+
+	/* Are we currently holding a token from the busyobj */
+	unsigned		has_token;
 };
 
 /*--------------------------------------------------------------------*/
@@ -545,6 +548,7 @@ struct busyobj {
 	volatile struct storage	*stream_frontchunk;
 	unsigned		stream_stopped;
 	ssize_t			stream_pass_bufsize;
+	unsigned		stream_tokens;
 };
 
 /* Object structure --------------------------------------------------*/
@@ -759,6 +763,7 @@ void VBO_StreamStopped(struct busyobj *busyobj);
 void VBO_StreamWait(struct busyobj *busyobj);
 void VBO_StreamData(struct busyobj *busyobj);
 void VBO_StreamSync(struct worker *wrk);
+void VBO_ReleaseToken(struct worker *wrk, struct busyobj *busyobj);
 
 /* cache_center.c [CNT] */
 void CNT_Session(struct sess *sp);
diff --git a/bin/varnishd/cache/cache_busyobj.c b/bin/varnishd/cache/cache_busyobj.c
index 8b97d4a..2b6ebc0 100644
--- a/bin/varnishd/cache/cache_busyobj.c
+++ b/bin/varnishd/cache/cache_busyobj.c
@@ -43,6 +43,7 @@ struct vbo {
 #define VBO_MAGIC		0xde3d8223
 	struct lock		mtx;
 	pthread_cond_t		cond;
+	VTAILQ_HEAD(, worker)	token_wait_queue;
 	unsigned		refcount;
 	uint16_t		nhttp;
 	struct busyobj		bo;
@@ -51,6 +52,9 @@ struct vbo {
 static struct lock vbo_mtx;
 static struct vbo *nvbo;
 
+static void vbo_acquire_token(struct worker *wrk, struct busyobj *busyobj);
+static void vbo_release_token(struct worker *wrk, struct busyobj *busyobj);
+
 void
 VBO_Init(void)
 {
@@ -82,6 +86,7 @@ vbo_New(void)
 	vbo->nhttp = nhttp;
 	Lck_New(&vbo->mtx, lck_busyobj);
 	AZ(pthread_cond_init(&vbo->cond, NULL));
+	VTAILQ_INIT(&vbo->token_wait_queue);
 	return (vbo);
 }
 
@@ -145,6 +150,7 @@ VBO_GetBusyObj(struct worker *wrk)
 	vbo->bo.beresp = HTTP_create(p, vbo->nhttp);
 
 	vbo->bo.stream_pass_bufsize = cache_param->stream_pass_bufsize;
+	vbo->bo.stream_tokens = cache_param->stream_tokens;
 
 	return (&vbo->bo);
 }
@@ -291,6 +297,11 @@ VBO_StreamSync(struct worker *wrk)
 		Lck_Lock(&busyobj->vbo->mtx);
 	assert(sctx->stream_max <= busyobj->stream_max);
 
+	if (sctx->has_token)
+		/* Release token to give other clients that have
+		 * reached the end of data a chance */
+		vbo_release_token(wrk, busyobj);
+
 	if (wrk->sp->req->obj->objcore == NULL ||
 	    (wrk->sp->req->obj->objcore->flags & OC_F_PASS)) {
 		/* Give notice to backend fetch that we are finished
@@ -303,8 +314,11 @@ VBO_StreamSync(struct worker *wrk)
 	sctx->stream_stopped = busyobj->stream_stopped;
 	sctx->stream_max = busyobj->stream_max;
 
-	if (busyobj->use_locks && sctx->stream_next == sctx->stream_max) {
-		while (!busyobj->stream_stopped &&
+	if (busyobj->use_locks && !sctx->stream_stopped &&
+	    sctx->stream_next == sctx->stream_max) {
+		/* We've exhausted available data, wait for more */
+		vbo_acquire_token(wrk, busyobj);
+		while (sctx->has_token && !busyobj->stream_stopped &&
 		       sctx->stream_max == busyobj->stream_max) {
 			Lck_CondWait(&busyobj->vbo->cond, &busyobj->vbo->mtx,
 				     NULL);
@@ -316,3 +330,81 @@ VBO_StreamSync(struct worker *wrk)
 	if (busyobj->use_locks)
 		Lck_Unlock(&busyobj->vbo->mtx);
 }
+
+/* Acquire a stream token for the worker from the busyobj. If none is free,
+   queue on token_wait_queue and wait on wrk->cond, with a deadline of now +
+   stream_token_timeout ms. wrk->sctx->has_token is 1 on return if successful */
+static void
+vbo_acquire_token(struct worker *wrk, struct busyobj *busyobj)
+{
+	struct timespec ts;
+
+	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+	CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC);
+	AZ(wrk->sctx->has_token);	/* must not already hold a token */
+	CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC);
+	AN(busyobj->use_locks);		/* only used when locking is on */
+
+	if (busyobj->stream_tokens > 0) {	/* fast path: a token is free */
+		busyobj->stream_tokens--;
+		wrk->sctx->has_token = 1;
+		return;
+	}
+
+	AZ(clock_gettime(CLOCK_REALTIME, &ts));	/* deadline = now + timeout */
+	ts.tv_sec += cache_param->stream_token_timeout / 1000;
+	ts.tv_nsec += (cache_param->stream_token_timeout % 1000) * 1000000;
+	if (ts.tv_nsec >= 1000000000) {	/* carry nanoseconds into seconds */
+		ts.tv_sec++;
+		ts.tv_nsec -= 1000000000;
+	}
+	VTAILQ_INSERT_TAIL(&busyobj->vbo->token_wait_queue, wrk, list);
+	Lck_CondWait(&wrk->cond, &busyobj->vbo->mtx, &ts);	/* vbo_release_token() signals wrk->cond */
+	if (wrk->sctx->has_token == 0) {	/* no hand-over: leave queue; counted as a timeout whatever woke us */
+		VTAILQ_REMOVE(&busyobj->vbo->token_wait_queue, wrk, list);
+		wrk->stats.n_tokentimeout++;
+	}
+}
+
+/* Release the token held by wrk: hand it to the first worker on the queue and
+   signal it, or return it to busyobj->stream_tokens. Resets wrk->sctx->has_token */
+static void
+vbo_release_token(struct worker *wrk, struct busyobj *busyobj)
+{
+	struct worker *wrk_waiting;
+
+	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+	CHECK_OBJ_NOTNULL(wrk->sctx, STREAM_CTX_MAGIC);
+	AN(wrk->sctx->has_token);	/* caller must currently hold a token */
+	CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC);
+	AN(busyobj->use_locks);
+
+	wrk_waiting = VTAILQ_FIRST(&busyobj->vbo->token_wait_queue);	/* NULL when nobody is queued */
+	if (wrk_waiting != NULL) {
+		/* Transfer our token to the first on the waiting
+		   list, and wake it */
+		CHECK_OBJ_NOTNULL(wrk_waiting, WORKER_MAGIC);
+		CHECK_OBJ_NOTNULL(wrk_waiting->sctx, STREAM_CTX_MAGIC);
+		VTAILQ_REMOVE(&busyobj->vbo->token_wait_queue, wrk_waiting,
+			      list);
+		AZ(wrk_waiting->sctx->has_token);
+		wrk_waiting->sctx->has_token = 1;	/* stream_tokens is left unchanged */
+		AZ(pthread_cond_signal(&wrk_waiting->cond));
+	} else {
+		busyobj->stream_tokens++;	/* nobody waiting: back to the pool */
+	}
+
+	wrk->sctx->has_token = 0;
+}
+
+void	/* Exported wrapper: releases wrk's token under busyobj->vbo->mtx */
+VBO_ReleaseToken(struct worker *wrk, struct busyobj *busyobj)
+{
+	CHECK_OBJ_NOTNULL(busyobj, BUSYOBJ_MAGIC);
+
+	if (busyobj->use_locks == 0)	/* locking not in use: nothing is released */
+		return;
+	Lck_Lock(&busyobj->vbo->mtx);
+	vbo_release_token(wrk, busyobj);	/* asserts that wrk holds a token */
+	Lck_Unlock(&busyobj->vbo->mtx);
+}
diff --git a/bin/varnishd/cache/cache_center.c b/bin/varnishd/cache/cache_center.c
index 96e23fc..5daa949 100644
--- a/bin/varnishd/cache/cache_center.c
+++ b/bin/varnishd/cache/cache_center.c
@@ -1084,6 +1084,10 @@ cnt_streambody(struct sess *sp, struct worker *wrk, struct req *req)
 	else
 		AN(wrk->sctx->stream_stopped);
 
+	if (wrk->sctx->has_token)
+		VBO_ReleaseToken(wrk, wrk->busyobj);
+	AZ(wrk->sctx->has_token);
+
 	if (wrk->busyobj->htc.ws == wrk->ws) {
 		/* Busyobj's htc has buffer on our workspace,
 		   wait for it to be released */
diff --git a/bin/varnishd/cache/cache_vrt_var.c b/bin/varnishd/cache/cache_vrt_var.c
index e655abf..82d77c6 100644
--- a/bin/varnishd/cache/cache_vrt_var.c
+++ b/bin/varnishd/cache/cache_vrt_var.c
@@ -320,6 +320,26 @@ VRT_l_beresp_stream_pass_bufsize(const struct sess *sp, double val)
 		sp->wrk->busyobj->stream_pass_bufsize = 0;
 }
 
+int	/* Returns the busyobj's current stream_tokens value (presumably the VCL read accessor for beresp.stream_tokens) */
+VRT_r_beresp_stream_tokens(const struct sess *sp)
+{
+	CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+	CHECK_OBJ_NOTNULL(sp->wrk->busyobj, BUSYOBJ_MAGIC);
+	return (sp->wrk->busyobj->stream_tokens);	/* unsigned field, returned as int */
+}
+
+void	/* Sets the busyobj's stream_tokens to val, clamped to a minimum of 1 (presumably the VCL write accessor) */
+VRT_l_beresp_stream_tokens(const struct sess *sp, int val)
+{
+	CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+	CHECK_OBJ_NOTNULL(sp->wrk->busyobj, BUSYOBJ_MAGIC);
+	if (val >= 1)
+		sp->wrk->busyobj->stream_tokens = val;
+	else
+		sp->wrk->busyobj->stream_tokens = 1;	/* zero or negative values become 1 */
+}
+
+
 /*--------------------------------------------------------------------*/
 
 void
diff --git a/bin/varnishd/common/params.h b/bin/varnishd/common/params.h
index 20b346a..a68d608 100644
--- a/bin/varnishd/common/params.h
+++ b/bin/varnishd/common/params.h
@@ -101,6 +101,8 @@ struct params {
 	ssize_t			stream_maxchunksize;
 	unsigned		stream_grab_timeout;
 	ssize_t			stream_pass_bufsize;
+	unsigned		stream_tokens;
+	unsigned		stream_token_timeout;
 
 	unsigned		nuke_limit;
 
diff --git a/bin/varnishd/mgt/mgt_param.c b/bin/varnishd/mgt/mgt_param.c
index 3a64b96..e5d6888 100644
--- a/bin/varnishd/mgt/mgt_param.c
+++ b/bin/varnishd/mgt/mgt_param.c
@@ -858,6 +858,18 @@ static const struct parspec input_parspec[] = {
 		"Zero means unlimited.\n",
 		EXPERIMENTAL,
 		"10mb", "bytes" },
+	{ "stream_tokens",
+		tweak_uint, &mgt_param.stream_tokens, 1, UINT_MAX,
+		"Default number of tokens available for racing streaming "
+		"clients.\n",
+		EXPERIMENTAL,
+		"10", "tokens" },
+	{ "stream_token_timeout",
+		tweak_uint, &mgt_param.stream_token_timeout, 1, UINT_MAX,
+		"Timeout for acquiring token during streaming and waiting "
+		"for more data.\n",
+		EXPERIMENTAL,
+		"100", "ms" },
 #ifdef SENDFILE_WORKS
 	{ "sendfile_threshold",
 		tweak_bytes, &mgt_param.sendfile_threshold, 0, HUGE_VAL,
diff --git a/include/tbl/vsc_f_main.h b/include/tbl/vsc_f_main.h
index 5761c51..415b07d 100644
--- a/include/tbl/vsc_f_main.h
+++ b/include/tbl/vsc_f_main.h
@@ -262,6 +262,9 @@ VSC_F(n_objwrite,		uint64_t, 0, 'a', "Objects sent with write",
       "or if the sendfile call has been disabled")
 VSC_F(n_objoverflow,	uint64_t, 1, 'a',
 					"Objects overflowing workspace", "")
+VSC_F(n_tokentimeout,	uint64_t, 1, 'a', "Token wait timeouts",
+      "The number of times a fast streaming client has timed out waiting for "
+      "token to be notified about new incoming data.")
 
 VSC_F(s_sess,			uint64_t, 1, 'a', "Total Sessions", "")
 VSC_F(s_req,			uint64_t, 1, 'a', "Total Requests", "")
diff --git a/lib/libvcl/generate.py b/lib/libvcl/generate.py
index 51b2294..1dab49d 100755
--- a/lib/libvcl/generate.py
+++ b/lib/libvcl/generate.py
@@ -337,6 +337,12 @@ sp_variables = (
 		( 'fetch',),
 		'const struct sess *'
 	),
+	('beresp.stream_tokens',
+		'INT',
+		( 'fetch',),
+		( 'fetch',),
+		'const struct sess *'
+	),
 	('beresp.ttl',
 		'DURATION',
 		( 'fetch',),
-- 
1.7.4.1




More information about the varnish-dev mailing list