[master] eff28c6 Generalize the "special" way we use to schedule accepted sockets onto a worker thread (falling back to the current (=acceptor) thread if the pool is empty).

Poul-Henning Kamp phk at FreeBSD.org
Tue Mar 17 10:10:01 CET 2015


commit eff28c6dd923394a86ceb0429df6ed538028c9f3
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date:   Tue Mar 17 09:09:20 2015 +0000

    Generalize the "special" way we use to schedule accepted sockets
    onto a worker thread (falling back to the current (=acceptor) thread
    if the pool is empty).

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index d176ef6..7317a74 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -322,6 +322,7 @@ struct wrk_accept {
 	socklen_t		acceptaddrlen;
 	int			acceptsock;
 	struct listen_sock	*acceptlsock;
+	struct sesspool		*sesspool;
 };
 
 /* Worker pool stuff -------------------------------------------------*/
diff --git a/bin/varnishd/cache/cache_pool.c b/bin/varnishd/cache/cache_pool.c
index d1023f3..689baf7 100644
--- a/bin/varnishd/cache/cache_pool.c
+++ b/bin/varnishd/cache/cache_pool.c
@@ -49,6 +49,7 @@ struct poolsock {
 #define POOLSOCK_MAGIC			0x1b0a2d38
 	struct listen_sock		*lsock;
 	struct pool_task		task;
+	struct sesspool			*sesspool;
 };
 
 /* Number of work requests queued in excess of worker threads available */
@@ -178,6 +179,47 @@ pool_getidleworker(struct pool *pp)
 }
 
 /*--------------------------------------------------------------------
+ * Special scheduling:  If no thread can be found, the current thread
+ * will be prepared for rescheduling instead.
+ * The selected threads workspace is reserved and the argument put there.
+ * Return one if another thread was scheduled, otherwise zero.
+ */
+
+static int
+Pool_Task_Arg(struct worker *wrk, const void *arg, size_t arg_len)
+{
+	struct pool *pp;
+	struct worker *wrk2;
+	int retval;
+
+	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+	AN(arg);
+	AN(arg_len);
+	pp = wrk->pool;
+	CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
+
+	Lck_Lock(&pp->mtx);
+	wrk2 = pool_getidleworker(pp);
+	if (wrk2 != NULL) {
+		VTAILQ_REMOVE(&pp->idle_queue, &wrk2->task, list);
+		retval = 1;
+	} else {
+		wrk2 = wrk;
+		retval = 0;
+	}
+	Lck_Unlock(&pp->mtx);
+	AZ(wrk2->task.func);
+
+	assert(arg_len == WS_Reserve(wrk2->aws, arg_len));
+	memcpy(wrk2->aws->f, arg, arg_len);
+	wrk2->task.func = SES_pool_accept_task;
+	wrk2->task.priv = wrk2->aws->f;
+	if (retval)
+		AZ(pthread_cond_signal(&wrk2->cond));
+	return (retval);
+}
+
+/*--------------------------------------------------------------------
  * Nobody is accepting on this socket, so we do.
  *
  * As long as we can stick the accepted connection to another thread
@@ -191,19 +233,13 @@ pool_getidleworker(struct pool *pp)
 static void __match_proto__(task_func_t)
 pool_accept(struct worker *wrk, void *arg)
 {
-	struct worker *wrk2;
-	struct wrk_accept *wa, *wa2;
-	struct pool *pp;
+	struct wrk_accept wa;
 	struct poolsock *ps;
 
 	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
-	pp = wrk->pool;
-	CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
 	CAST_OBJ_NOTNULL(ps, arg, POOLSOCK_MAGIC);
 
 	CHECK_OBJ_NOTNULL(ps->lsock, LISTEN_SOCK_MAGIC);
-	assert(sizeof *wa == WS_Reserve(wrk->aws, sizeof *wa));
-	wa = (void*)wrk->aws->f;
 
 	/* Delay until we are ready (flag is set when all
 	 * initialization has finished) */
@@ -211,36 +247,22 @@ pool_accept(struct worker *wrk, void *arg)
 		VTIM_sleep(.1);
 
 	while (1) {
-		INIT_OBJ(wa, WRK_ACCEPT_MAGIC);
+		INIT_OBJ(&wa, WRK_ACCEPT_MAGIC);
+		wa.sesspool = ps->sesspool;
 
 		assert(ps->lsock->sock > 0);	// We know where stdin is
 
-		if (VCA_Accept(ps->lsock, wa) < 0) {
+		if (VCA_Accept(ps->lsock, &wa) < 0) {
 			wrk->stats->sess_fail++;
 			/* We're going to pace in vca anyway... */
 			(void)Pool_TrySumstat(wrk);
 			continue;
 		}
 
-		Lck_Lock(&pp->mtx);
-		wrk2 = pool_getidleworker(pp);
-		if (wrk2 == NULL) {
-			/* No idle threads, do it ourselves */
-			Lck_Unlock(&pp->mtx);
-			AZ(Pool_Task(pp, &ps->task, POOL_QUEUE_BACK));
-			wrk->task.func = SES_pool_accept_task;
-			wrk->task.priv = pp->sesspool;
+		if (!Pool_Task_Arg(wrk, &wa, sizeof wa)) {
+			AZ(Pool_Task(wrk->pool, &ps->task, POOL_QUEUE_BACK));
 			return;
 		}
-		VTAILQ_REMOVE(&pp->idle_queue, &wrk2->task, list);
-		AZ(wrk2->task.func);
-		assert(sizeof *wa2 == WS_Reserve(wrk2->aws, sizeof *wa2));
-		wa2 = (void*)wrk2->aws->f;
-		memcpy(wa2, wa, sizeof *wa);
-		wrk2->task.func = SES_pool_accept_task;
-		wrk2->task.priv = pp->sesspool;
-		Lck_Unlock(&pp->mtx);
-		AZ(pthread_cond_signal(&wrk2->cond));
 
 		/*
 		 * We were able to hand off, so release this threads VCL
@@ -589,11 +611,12 @@ pool_mkpool(unsigned pool_no)
 	VTAILQ_INIT(&pp->idle_queue);
 	VTAILQ_INIT(&pp->front_queue);
 	VTAILQ_INIT(&pp->back_queue);
-	pp->sesspool = SES_NewPool(pp, pool_no);
-	AN(pp->sesspool);
 	AZ(pthread_cond_init(&pp->herder_cond, NULL));
 	AZ(pthread_create(&pp->herder_thr, NULL, pool_herder, pp));
 
+	pp->sesspool = SES_NewPool(pp, pool_no);
+	AN(pp->sesspool);
+
 	VTAILQ_FOREACH(ls, &heritage.socks, list) {
 		assert(ls->sock > 0);		// We know where stdin is
 		ALLOC_OBJ(ps, POOLSOCK_MAGIC);
@@ -601,6 +624,7 @@ pool_mkpool(unsigned pool_no)
 		ps->lsock = ls;
 		ps->task.func = pool_accept;
 		ps->task.priv = ps;
+		ps->sesspool = pp->sesspool;
 		AZ(Pool_Task(pp, &ps->task, POOL_QUEUE_BACK));
 	}
 
diff --git a/bin/varnishd/cache/cache_session.c b/bin/varnishd/cache/cache_session.c
index 7542b0c..56d760a 100644
--- a/bin/varnishd/cache/cache_session.c
+++ b/bin/varnishd/cache/cache_session.c
@@ -196,9 +196,11 @@ SES_pool_accept_task(struct worker *wrk, void *arg)
 	struct sesspool *pp;
 	struct sess *sp;
 	const char *lsockname;
+	struct wrk_accept *wa;
 
 	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
-	CAST_OBJ_NOTNULL(pp, arg, SESSPOOL_MAGIC);
+	CAST_OBJ_NOTNULL(wa, arg, WRK_ACCEPT_MAGIC);
+	pp = wa->sesspool;
 
 	/* Turn accepted socket into a session */
 	AN(wrk->aws->r);



More information about the varnish-commit mailing list