r1031 - trunk/varnish-cache/bin/varnishd

phk at projects.linpro.no phk at projects.linpro.no
Sat Sep 16 21:54:34 CEST 2006


Author: phk
Date: 2006-09-16 21:54:34 +0200 (Sat, 16 Sep 2006)
New Revision: 1031

Modified:
   trunk/varnish-cache/bin/varnishd/cache_pool.c
   trunk/varnish-cache/bin/varnishd/heritage.h
   trunk/varnish-cache/bin/varnishd/mgt_param.c
Log:
Make it possible to have multiple worker pools.

The acceptor selects the pool based on the file descriptor modulo
the number of pools.

This is an attempt to reduce lock contention.


Modified: trunk/varnish-cache/bin/varnishd/cache_pool.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_pool.c	2006-09-16 16:00:48 UTC (rev 1030)
+++ trunk/varnish-cache/bin/varnishd/cache_pool.c	2006-09-16 19:54:34 UTC (rev 1031)
@@ -29,17 +29,22 @@
 #include "cli_priv.h"
 #include "cache.h"
 
-static MTX wrk_mtx;
+TAILQ_HEAD(workerhead, worker);
 
 /* Number of work requests queued in excess of worker threads available */
-static unsigned		wrk_overflow;
 
-TAILQ_HEAD(workerhead, worker);
+struct wq {
+	MTX 			mtx;
+	struct workerhead	idle;
+	TAILQ_HEAD(, workreq)   req;
+	unsigned		overflow;
+};
 
-static struct workerhead wrk_idle = TAILQ_HEAD_INITIALIZER(wrk_idle);
-static struct workerhead wrk_busy = TAILQ_HEAD_INITIALIZER(wrk_busy);
-static TAILQ_HEAD(, workreq) wrk_reqhead = TAILQ_HEAD_INITIALIZER(wrk_reqhead);
+static MTX			tmtx;
 
+static struct wq		**wq;
+static unsigned			nwq;
+
 /*--------------------------------------------------------------------
  * Write data to fd
  * We try to use writev() if possible in order to minimize number of
@@ -169,9 +174,10 @@
 wrk_thread(void *priv)
 {
 	struct worker *w, ww;
+	struct wq *qp;
 	char c;
 
-	(void)priv;
+	qp = priv;
 	w = &ww;
 	memset(w, 0, sizeof *w);
 	w->magic = WORKER_MAGIC;
@@ -179,40 +185,38 @@
 	AZ(pipe(w->pipe));
 
 	VSL(SLT_WorkThread, 0, "%p start", w);
-	LOCK(&wrk_mtx);
+	LOCK(&qp->mtx);
 	VSL_stats->n_wrk_create++;
-	TAILQ_INSERT_HEAD(&wrk_busy, w, list);
 	VSL_stats->n_wrk_busy++;
 	while (1) {
 		CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
 
 		/* Process overflow requests, if any */
-		if (wrk_overflow > 0) {
-			wrk_overflow--;
-			w->wrq = TAILQ_FIRST(&wrk_reqhead);
+		if (qp->overflow > 0) {
+			qp->overflow--;
+			w->wrq = TAILQ_FIRST(&qp->req);
 			AN(w->wrq);
-			TAILQ_REMOVE(&wrk_reqhead, w->wrq, list);
+			TAILQ_REMOVE(&qp->req, w->wrq, list);
 			VSL_stats->n_wrk_queue--;
-			UNLOCK(&wrk_mtx);
+			UNLOCK(&qp->mtx);
 			wrk_do_one(w);
-			LOCK(&wrk_mtx);
+			LOCK(&qp->mtx);
 			continue;
 		}
 		
-		TAILQ_REMOVE(&wrk_busy, w, list);
-		TAILQ_INSERT_HEAD(&wrk_idle, w, list);
+		TAILQ_INSERT_HEAD(&qp->idle, w, list);
 		assert(w->idle != 0);
 		VSL_stats->n_wrk_busy--;
-		UNLOCK(&wrk_mtx);
+		UNLOCK(&qp->mtx);
 		assert(1 == read(w->pipe[0], &c, 1));
 		if (w->idle == 0)
 			break;
 		wrk_do_one(w);
-		LOCK(&wrk_mtx);
+		LOCK(&qp->mtx);
 	}
-	LOCK(&wrk_mtx);
+	LOCK(&tmtx);
 	VSL_stats->n_wrk--;
-	UNLOCK(&wrk_mtx);
+	UNLOCK(&tmtx);
 	VSL(SLT_WorkThread, 0, "%p end", w);
 	close(w->pipe[0]);
 	close(w->pipe[1]);
@@ -226,39 +230,42 @@
 {
 	struct worker *w;
 	pthread_t tp;
+	struct wq *qp;
 
 	sp->workreq.sess = sp;
+	qp = wq[sp->fd % nwq];
 
-	LOCK(&wrk_mtx);
+	LOCK(&qp->mtx);
 
 	/* If there are idle threads, we tickle the first one into action */
-	w = TAILQ_FIRST(&wrk_idle);
+	w = TAILQ_FIRST(&qp->idle);
 	if (w != NULL) {
-		TAILQ_REMOVE(&wrk_idle, w, list);
-		TAILQ_INSERT_TAIL(&wrk_busy, w, list);
+		TAILQ_REMOVE(&qp->idle, w, list);
 		VSL_stats->n_wrk_busy++;
-		UNLOCK(&wrk_mtx);
+		UNLOCK(&qp->mtx);
 		w->wrq = &sp->workreq;
 		assert(1 == write(w->pipe[1], w, 1));
 		return;
 	}
 	
-	TAILQ_INSERT_TAIL(&wrk_reqhead, &sp->workreq, list);
+	TAILQ_INSERT_TAIL(&qp->req, &sp->workreq, list);
 	VSL_stats->n_wrk_queue++;
-	wrk_overflow++;
+	qp->overflow++;
+	UNLOCK(&qp->mtx);
 
+	LOCK(&tmtx);
 	/* Can we create more threads ? */
 	if (VSL_stats->n_wrk >= params->wthread_max) {
 		VSL_stats->n_wrk_max++;
-		UNLOCK(&wrk_mtx);
+		UNLOCK(&tmtx);
 		return;
 	}
 
 	/* Try to create a thread */
 	VSL_stats->n_wrk++;
-	UNLOCK(&wrk_mtx);
+	UNLOCK(&tmtx);
 
-	if (!pthread_create(&tp, NULL, wrk_thread, NULL)) {
+	if (!pthread_create(&tp, NULL, wrk_thread, qp)) {
 		AZ(pthread_detach(tp));
 		return;
 	}
@@ -266,40 +273,75 @@
 	VSL(SLT_Debug, 0, "Create worker thread failed %d %s",
 	    errno, strerror(errno));
 
+	LOCK(&tmtx);
 	/* Register overflow */
-	LOCK(&wrk_mtx);
 	VSL_stats->n_wrk--;
 	VSL_stats->n_wrk_failed++;
-	UNLOCK(&wrk_mtx);
+	UNLOCK(&tmtx);
 }
 
 /*--------------------------------------------------------------------*/
+
+static void
+wrk_addpools(unsigned t)
+{
+	struct wq **pwq, **owq;
+	unsigned u;
+
+	if (t <= nwq)
+		return;
+
+	pwq = calloc(sizeof *pwq, params->wthread_pools);
+	if (pwq == NULL)
+		return;
+	if (wq != NULL)
+		memcpy(pwq, wq, sizeof *pwq * nwq);
+	owq = wq;
+	wq = pwq;
+	for (u = nwq; u < t; u++) {
+		wq[u] = calloc(sizeof *wq[u], 1);
+		XXXAN(wq[u]);
+		MTX_INIT(&wq[u]->mtx);
+		TAILQ_INIT(&wq[u]->idle);
+		TAILQ_INIT(&wq[u]->req);
+	}
+	free(owq);
+	nwq = t;
+}
+
+/*--------------------------------------------------------------------*/
 	
 static void *
 wrk_reaperthread(void *priv)
 {
 	time_t	now;
 	struct worker *w;
+	struct wq *qp;
+	unsigned u;
 
 	(void)priv;
 	while (1) {
+		wrk_addpools(params->wthread_pools);
 		sleep(1);
 		if (VSL_stats->n_wrk <= params->wthread_min)
 			continue; 
 		now = time(NULL);
-		LOCK(&wrk_mtx);
-		w = TAILQ_LAST(&wrk_idle, workerhead);
-		if (w != NULL &&
-		   (w->idle + params->wthread_timeout < now ||
-		    VSL_stats->n_wrk <= params->wthread_max))
-			TAILQ_REMOVE(&wrk_idle, w, list);
-		else 
-			w = NULL;
-		UNLOCK(&wrk_mtx);
-		if (w == NULL)
-			continue;
-		w->idle = 0;
-		assert(1 == write(w->pipe[1], w, 1));
+		for (u = 0; u < nwq; u++) {
+			qp = wq[u];
+			LOCK(&qp->mtx);
+			w = TAILQ_LAST(&qp->idle, workerhead);
+			if (w != NULL &&
+			   (w->idle + params->wthread_timeout < now ||
+			    VSL_stats->n_wrk <= params->wthread_max))
+				TAILQ_REMOVE(&qp->idle, w, list);
+			else 
+				w = NULL;
+			UNLOCK(&qp->mtx);
+			if (w == NULL)
+				continue;
+			w->idle = 0;
+			assert(1 == write(w->pipe[1], w, 1));
+		}
 	}
 	INCOMPL();
 }
@@ -310,53 +352,20 @@
 WRK_Init(void)
 {
 	pthread_t tp;
-	int i;
 
-	MTX_INIT(&wrk_mtx);
-
+	wrk_addpools(params->wthread_pools);
+	MTX_INIT(&tmtx);
 	AZ(pthread_create(&tp, NULL, wrk_reaperthread, NULL));
 	AZ(pthread_detach(tp));
-
-	VSL(SLT_Debug, 0, "Starting %u worker threads", params->wthread_min);
-	for (i = 0; i < params->wthread_min; i++) {
-		VSL_stats->n_wrk++;
-		AZ(pthread_create(&tp, NULL, wrk_thread, NULL));
-		AZ(pthread_detach(tp));
-	}
 }
 
-
 /*--------------------------------------------------------------------*/
 
 void
 cli_func_dump_pool(struct cli *cli, char **av, void *priv)
 {
-	unsigned u;
-	struct sess *s;
-	time_t t;
 
+	(void)cli;
 	(void)av;
 	(void)priv;
-	struct worker *w;
-	LOCK(&wrk_mtx);
-	t = time(NULL);
-	TAILQ_FOREACH(w, &wrk_busy, list) {
-		cli_out(cli, "\n");
-		cli_out(cli, "W %p", w);
-		if (w->wrq == NULL)
-			continue;
-		s = w->wrq->sess;
-		if (s == NULL)
-			continue;
-		cli_out(cli, "sess %p fd %d xid %u step %d handling %d age %d",
-		    s, s->fd, s->xid, s->step, s->handling,
-		    t - s->t_req.tv_sec);
-	}
-	cli_out(cli, "\n");
-
-	u = 0;
-	TAILQ_FOREACH(w, &wrk_idle, list)
-		u++;
-	cli_out(cli, "%u idle workers\n", u);
-	UNLOCK(&wrk_mtx);
 }

Modified: trunk/varnish-cache/bin/varnishd/heritage.h
===================================================================
--- trunk/varnish-cache/bin/varnishd/heritage.h	2006-09-16 16:00:48 UTC (rev 1030)
+++ trunk/varnish-cache/bin/varnishd/heritage.h	2006-09-16 19:54:34 UTC (rev 1031)
@@ -36,6 +36,7 @@
 	unsigned		wthread_min;
 	unsigned		wthread_max;
 	unsigned		wthread_timeout;
+	unsigned		wthread_pools;
 
 	/* Memory allocation hints */
 	unsigned		mem_workspace;

Modified: trunk/varnish-cache/bin/varnishd/mgt_param.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/mgt_param.c	2006-09-16 16:00:48 UTC (rev 1030)
+++ trunk/varnish-cache/bin/varnishd/mgt_param.c	2006-09-16 19:54:34 UTC (rev 1031)
@@ -118,6 +118,18 @@
 /*--------------------------------------------------------------------*/
 
 static void
+tweak_thread_pools(struct cli *cli, struct parspec *par, const char *arg)
+{
+
+	(void)par;
+	tweak_generic_uint(cli, &params->wthread_pools, arg,
+	    1, UINT_MAX);
+}
+
+
+/*--------------------------------------------------------------------*/
+
+static void
 tweak_thread_pool_min(struct cli *cli, struct parspec *par, const char *arg)
 {
 
@@ -296,6 +308,9 @@
 		"To force an immediate effect at the expense of a total "
 		"flush of the cache use \"url.purge .\"",
 		"120", "seconds" },
+	{ "thread_pools", tweak_thread_pools,
+		"Number of thread pools.\n",
+		"1", "pools" },
 	{ "thread_pool_max", tweak_thread_pool_max,
 		"The maximum number of threads in the worker pool.\n"
 		"-1 is unlimited.\n"




More information about the varnish-commit mailing list