r409 - in trunk/varnish-cache: bin/varnishd bin/varnishlog include

phk at projects.linpro.no phk at projects.linpro.no
Mon Jul 10 16:52:05 CEST 2006


Author: phk
Date: 2006-07-10 16:52:04 +0200 (Mon, 10 Jul 2006)
New Revision: 409

Modified:
   trunk/varnish-cache/bin/varnishd/cache.h
   trunk/varnish-cache/bin/varnishd/cache_acceptor.c
   trunk/varnish-cache/bin/varnishd/cache_main.c
   trunk/varnish-cache/bin/varnishd/cache_pool.c
   trunk/varnish-cache/bin/varnishd/heritage.h
   trunk/varnish-cache/bin/varnishd/varnishd.c
   trunk/varnish-cache/bin/varnishlog/varnishlog.c
   trunk/varnish-cache/include/shmlog_tags.h
   trunk/varnish-cache/include/stat_field.h
Log:
Rewrite the worker thread pool code.

Assign prefix WRK to the worker pool.

Introduce a struct workreq since the prefetcher (when it happens) will
not have a session to pass in.

The worker threads get a cond_var each and are hung from a list in
most recently used order.

When a request is queued and the worker thread list is not empty,
tickle the cond_var of the first one.

If no threads were available and the max number of threads is not reached,
try to start another worker thread.

If the max was reached or the start failed (likely due to out of memory),
indicate overflow and let the existing pool deal with it.

Create only the minimum requested number of threads initially.

Allow the timeout before a dynamic worker thread commits
suicide to be specified with -w.

Default parameters are -w1,UINT_MAX,10 {min, max, timeout}




Modified: trunk/varnish-cache/bin/varnishd/cache.h
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache.h	2006-07-10 13:59:13 UTC (rev 408)
+++ trunk/varnish-cache/bin/varnishd/cache.h	2006-07-10 14:52:04 UTC (rev 409)
@@ -52,8 +52,17 @@
 	struct sbuf		*sb;
 	struct objhead		*nobjhead;
 	struct object		*nobj;
+
+	unsigned		nbr;
+	pthread_cond_t		cv;
+	TAILQ_ENTRY(worker)	list;
 };
 
+struct workreq {
+	TAILQ_ENTRY(workreq)	list;
+	struct sess		*sess;
+};
+
 #include "hash_slinger.h"
 
 /* Backend Connection ------------------------------------------------*/
@@ -150,6 +159,8 @@
 	/* Various internal stuff */
 	struct sessmem		*mem;
 	time_t			t0;
+
+	struct workreq		workreq;
 };
 
 struct backend {
@@ -238,8 +249,8 @@
 void PipeSession(struct worker *w, struct sess *sp);
 
 /* cache_pool.c */
-void CacheInitPool(void);
-void DealWithSession(void *arg);
+void WRK_Init(void);
+void WRK_QueueSession(struct sess *sp);
 
 /* cache_shmlog.c */
 

Modified: trunk/varnish-cache/bin/varnishd/cache_acceptor.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_acceptor.c	2006-07-10 13:59:13 UTC (rev 408)
+++ trunk/varnish-cache/bin/varnishd/cache_acceptor.c	2006-07-10 14:52:04 UTC (rev 409)
@@ -190,7 +190,7 @@
 		vca_return_session(sp);
 		return;
 	}
-	DealWithSession(sp);
+	WRK_QueueSession(sp);
 }
 
 static void

Modified: trunk/varnish-cache/bin/varnishd/cache_main.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_main.c	2006-07-10 13:59:13 UTC (rev 408)
+++ trunk/varnish-cache/bin/varnishd/cache_main.c	2006-07-10 14:52:04 UTC (rev 409)
@@ -109,7 +109,7 @@
 	AZ(pthread_mutex_init(&sessmtx, NULL));
 	VBE_Init();
 	VSL_Init();
-	CacheInitPool();
+	WRK_Init();
 
 	VCA_Init();
 	EXP_Init();

Modified: trunk/varnish-cache/bin/varnishd/cache_pool.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_pool.c	2006-07-10 13:59:13 UTC (rev 408)
+++ trunk/varnish-cache/bin/varnishd/cache_pool.c	2006-07-10 14:52:04 UTC (rev 409)
@@ -5,6 +5,7 @@
  */
 
 #include <stdio.h>
+#include <errno.h>
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
@@ -16,11 +17,15 @@
 #include "vcl.h"
 #include "cache.h"
 
-static TAILQ_HEAD(, sess) shd = TAILQ_HEAD_INITIALIZER(shd);
-
-static pthread_cond_t	shdcnd;
+static pthread_mutex_t wrk_mtx;
 static unsigned		xids;
 
+/* Number of work requests queued in excess of worker threads available */
+static unsigned		wrk_overflow;
+
+static TAILQ_HEAD(, worker) wrk_head = TAILQ_HEAD_INITIALIZER(wrk_head);
+static TAILQ_HEAD(, workreq) wrk_reqhead = TAILQ_HEAD_INITIALIZER(wrk_reqhead);
+
 /*--------------------------------------------------------------------*/
 
 static int
@@ -52,114 +57,202 @@
 	return (1);
 }
 
-static void *
-CacheWorker(void *priv)
+static void
+wrk_WorkSession(struct worker *w, struct sess *sp)
 {
-	struct sess *sp;
-	struct worker w;
 	int done;
 	char *b;
 
-	memset(&w, 0, sizeof w);
-	w.eb = event_init();
-	assert(w.eb != NULL);
-	w.sb = sbuf_new(NULL, NULL, 0, SBUF_AUTOEXTEND);
-	assert(w.sb != NULL);
-	
-	(void)priv;
+	time(&sp->t0);
 	AZ(pthread_mutex_lock(&sessmtx));
-	while (1) {
-		while (1) {
-			sp = TAILQ_FIRST(&shd);
-			if (sp != NULL)
-				break;
-			AZ(pthread_cond_wait(&shdcnd, &sessmtx));
-		}
-		TAILQ_REMOVE(&shd, sp, list);
-		time(&sp->t0);
-		sp->vcl = GetVCL();
-		AZ(pthread_mutex_unlock(&sessmtx));
+	sp->vcl = GetVCL();
+	AZ(pthread_mutex_unlock(&sessmtx));
 
-		done = http_Dissect(sp->http, sp->fd, 1);
-		if (done != 0) {
-			RES_Error(&w, sp, done, NULL);
-			goto out;
+	done = http_Dissect(sp->http, sp->fd, 1);
+	if (done != 0) {
+		RES_Error(w, sp, done, NULL);
+		goto out;
+	}
+
+	sp->backend = sp->vcl->backend[0];
+
+	VCL_recv_method(sp);
+
+	for (done = 0; !done; ) {
+		switch(sp->handling) {
+		case VCL_RET_LOOKUP:
+			done = LookupSession(w, sp);
+			break;
+		case VCL_RET_FETCH:
+			done = FetchSession(w, sp);
+			break;
+		case VCL_RET_DELIVER:
+			done = DeliverSession(w, sp);
+			break;
+		case VCL_RET_PIPE:
+			PipeSession(w, sp);
+			done = 1;
+			break;
+		case VCL_RET_PASS:
+			PassSession(w, sp);
+			done = 1;
+			break;
+		default:
+			INCOMPL();
 		}
+	}
+	if (http_GetHdr(sp->http, "Connection", &b) &&
+	    !strcmp(b, "close")) {
+		vca_close_session(sp, "Connection header");
+	} else if (http_GetProto(sp->http, &b) &&
+	    strcmp(b, "HTTP/1.1")) {
+		vca_close_session(sp, "not HTTP/1.1");
+	}
 
-		sp->backend = sp->vcl->backend[0];
+out:
+	AZ(pthread_mutex_lock(&sessmtx));
+	RelVCL(sp->vcl);
+	AZ(pthread_mutex_unlock(&sessmtx));
+	sp->vcl = NULL;
+	vca_return_session(sp);
+}
 
-		VCL_recv_method(sp);
+/*--------------------------------------------------------------------*/
 
-		for (done = 0; !done; ) {
-			switch(sp->handling) {
-			case VCL_RET_LOOKUP:
-				done = LookupSession(&w, sp);
-				break;
-			case VCL_RET_FETCH:
-				done = FetchSession(&w, sp);
-				break;
-			case VCL_RET_DELIVER:
-				done = DeliverSession(&w, sp);
-				break;
-			case VCL_RET_PIPE:
-				PipeSession(&w, sp);
-				done = 1;
-				break;
-			case VCL_RET_PASS:
-				PassSession(&w, sp);
-				done = 1;
-				break;
-			default:
-				INCOMPL();
-			}
+static void *
+wrk_thread(void *priv)
+{
+	struct worker *w, ww;
+	struct workreq *wrq;
+	struct timespec ts;
+
+	w = &ww;
+	memset(w, 0, sizeof w);
+
+	AZ(pthread_cond_init(&w->cv, NULL));
+
+	w->eb = event_init();
+	assert(w->eb != NULL);
+
+	w->sb = sbuf_new(NULL, NULL, 0, SBUF_AUTOEXTEND);
+	assert(w->sb != NULL);
+	
+	AZ(pthread_mutex_lock(&wrk_mtx));
+	VSL_stats->n_wrk++;
+	w->nbr = VSL_stats->n_wrk;
+	if (priv == NULL)
+		VSL(SLT_WorkThread, 0, "%u born dynamic", w->nbr);
+	else
+		VSL(SLT_WorkThread, 0, "%u born permanent", w->nbr);
+	TAILQ_INSERT_HEAD(&wrk_head, w, list);
+	while (1) {
+		wrq = TAILQ_FIRST(&wrk_reqhead);
+		if (wrq != NULL) {
+			VSL_stats->n_wrkbusy++;
+			TAILQ_REMOVE(&wrk_head, w, list);
+			TAILQ_REMOVE(&wrk_reqhead, wrq, list);
+			AZ(pthread_mutex_unlock(&wrk_mtx));
+			assert(wrq->sess != NULL);
+			wrk_WorkSession(w, wrq->sess);
+			AZ(pthread_mutex_lock(&wrk_mtx));
+			VSL_stats->n_wrkbusy--;
+			TAILQ_INSERT_HEAD(&wrk_head, w, list);
 		}
-		if (http_GetHdr(sp->http, "Connection", &b) &&
-		    !strcmp(b, "close")) {
-			vca_close_session(sp, "Connection header");
-		} else if (http_GetProto(sp->http, &b) &&
-		    strcmp(b, "HTTP/1.1")) {
-			vca_close_session(sp, "not HTTP/1.1");
+		if (wrk_overflow > 0) {
+			wrk_overflow--;
+			continue;
 		}
 
-out:
-		AZ(pthread_mutex_lock(&sessmtx));
-		RelVCL(sp->vcl);
-		sp->vcl = NULL;
-		vca_return_session(sp);
+		/* If we are a reserved thread we don't die */
+		if (priv != NULL) {
+			AZ(pthread_cond_wait(&w->cv, &wrk_mtx));
+			continue;
+		}
+
+		/* If we are a dynamic thread, time out and die */
+		clock_gettime(CLOCK_REALTIME, &ts);
+		ts.tv_sec += heritage.wthread_timeout;
+		if (pthread_cond_timedwait(&w->cv, &wrk_mtx, &ts)) {
+			VSL_stats->n_wrk--;
+			TAILQ_REMOVE(&wrk_head, w, list);
+			AZ(pthread_mutex_unlock(&wrk_mtx));
+			VSL(SLT_WorkThread, 0, "%u suicide", w->nbr);
+			sbuf_delete(w->sb);
+			event_base_free(w->eb);
+			AZ(pthread_cond_destroy(&w->cv));
+			return (NULL);
+		}
 	}
 }
 
+/*--------------------------------------------------------------------*/
+
 void
-DealWithSession(void *arg)
+WRK_QueueSession(struct sess *sp)
 {
-	struct sess *sp = arg;
+	struct worker *w;
+	pthread_t tp;
 
 	time(&sp->t_req);
 
 	/*
 	 * No locking necessary, we're serialized in the acceptor thread
+	 * XXX: still ?
 	 */
 	sp->xid = xids++;
 	VSL(SLT_XID, sp->fd, "%u", sp->xid);
 
+	sp->workreq.sess = sp;
 	VSL_stats->client_req++;
-	AZ(pthread_mutex_lock(&sessmtx));
-	TAILQ_INSERT_TAIL(&shd, sp, list);
-	AZ(pthread_mutex_unlock(&sessmtx));
-	AZ(pthread_cond_signal(&shdcnd));
+
+	AZ(pthread_mutex_lock(&wrk_mtx));
+	TAILQ_INSERT_TAIL(&wrk_reqhead, &sp->workreq, list);
+
+	/* If there are idle threads, we tickle the first one into action */
+	w = TAILQ_FIRST(&wrk_head);
+	if (w != NULL) {
+		AZ(pthread_cond_signal(&w->cv));
+		AZ(pthread_mutex_unlock(&wrk_mtx));
+		return;
+	}
+	
+	/* Register overflow if max threads reached */
+	if (VSL_stats->n_wrk >= heritage.wthread_max) {
+		wrk_overflow++;
+		AZ(pthread_mutex_unlock(&wrk_mtx));
+		return;
+	}
+
+	/* Try to create a thread */
+	AZ(pthread_mutex_unlock(&wrk_mtx));
+	if (!pthread_create(&tp, NULL, wrk_thread, NULL)) {
+		AZ(pthread_detach(tp));
+		return;
+	}
+
+	VSL(SLT_Debug, 0, "Create worker thread failed %d %s",
+	    errno, strerror(errno));
+
+	/* Register overflow */
+	AZ(pthread_mutex_lock(&wrk_mtx));
+	wrk_overflow++;
+	AZ(pthread_mutex_unlock(&wrk_mtx));
 }
+	
 
+/*--------------------------------------------------------------------*/
+
 void
-CacheInitPool(void)
+WRK_Init(void)
 {
 	pthread_t tp;
 	int i;
 
-	AZ(pthread_cond_init(&shdcnd, NULL));
+	AZ(pthread_mutex_init(&wrk_mtx, NULL));
 
 	VSL(SLT_Debug, 0, "Starting %u worker threads", heritage.wthread_min);
 	for (i = 0; i < heritage.wthread_min; i++) {
-		AZ(pthread_create(&tp, NULL, CacheWorker, NULL));
+		AZ(pthread_create(&tp, NULL, wrk_thread, &i));
 		AZ(pthread_detach(tp));
 	}
 	srandomdev();

Modified: trunk/varnish-cache/bin/varnishd/heritage.h
===================================================================
--- trunk/varnish-cache/bin/varnishd/heritage.h	2006-07-10 13:59:13 UTC (rev 408)
+++ trunk/varnish-cache/bin/varnishd/heritage.h	2006-07-10 14:52:04 UTC (rev 409)
@@ -37,7 +37,9 @@
 	unsigned		default_ttl;
 
 	/* Worker threads */
-	unsigned		wthread_min, wthread_max;
+	unsigned		wthread_min;
+	unsigned		wthread_max;
+	unsigned		wthread_timeout;
 
 	/* Memory allocation hints */
 	unsigned		mem_http_headerspace;

Modified: trunk/varnish-cache/bin/varnishd/varnishd.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/varnishd.c	2006-07-10 13:59:13 UTC (rev 408)
+++ trunk/varnish-cache/bin/varnishd/varnishd.c	2006-07-10 14:52:04 UTC (rev 409)
@@ -8,6 +8,7 @@
 #include <string.h>
 #include <errno.h>
 #include <fcntl.h>
+#include <limits.h>
 #include <signal.h>
 #include <stdarg.h>
 #include <stdio.h>
@@ -419,8 +420,8 @@
 	fprintf(stderr, "    %-28s # %s\n",
 	    "-s kind[,storageoptions]", "Backend storage specification");
 	fprintf(stderr, "    %-28s # %s\n", "-t", "Default TTL");
-	fprintf(stderr, "    %-28s # %s\n", "-w int[,int]",
-	    "Number of worker threads (fixed/{min,max})");
+	fprintf(stderr, "    %-28s # %s\n", "-w int[,int[,int]]",
+	    "Number of worker threads (fixed/{min,max}/{min/max/timeout})");
 #if 0
 	-c clusterid at cluster_controller
 	-m memory_limit
@@ -435,6 +436,28 @@
 
 /*--------------------------------------------------------------------*/
 
+static void
+tackle_warg(const char *argv)
+{
+	int i;
+	unsigned ua, ub, uc;
+
+	i = sscanf(argv, "%u,%u,%u", &ua, &ub, &uc);
+	if (i == 0)
+		usage();
+	if (ua < 1)
+		usage();
+	heritage.wthread_min = ua;
+	heritage.wthread_max = ua;
+	heritage.wthread_timeout = 10;
+	if (i >= 2)
+		heritage.wthread_max = ub;
+	if (i >= 3)
+		heritage.wthread_timeout = uc;
+}
+
+/*--------------------------------------------------------------------*/
+
 /* for development purposes */
 #include <printf.h>
 #include <err.h>
@@ -442,8 +465,7 @@
 int
 main(int argc, char *argv[])
 {
-	int o, i;
-	unsigned ua, ub;
+	int o;
 	const char *portnumber = "8080";
 	unsigned dflag = 1;	/* XXX: debug=on for now */
 	const char *bflag = NULL;
@@ -456,8 +478,9 @@
 	VCC_InitCompile();
 
 	heritage.default_ttl = 120;
-	heritage.wthread_min = 5;
-	heritage.wthread_max = 5;
+	heritage.wthread_min = 1;
+	heritage.wthread_max = UINT_MAX;
+	heritage.wthread_timeout = 10;
 	heritage.mem_http_headerspace= 4096;
 	heritage.mem_http_headers= 32;
 	heritage.mem_workspace = 0;
@@ -486,13 +509,7 @@
 			heritage.default_ttl = strtoul(optarg, NULL, 0);
 			break;
 		case 'w':
-			i = sscanf(optarg, "%u,%u", &ua, &ub);
-			if (i == 0)
-				usage();
-			heritage.wthread_min = ua;
-			heritage.wthread_max = ua;
-			if (i == 2)
-				heritage.wthread_max = ub;
+			tackle_warg(optarg);
 			break;
 		default:
 			usage();

Modified: trunk/varnish-cache/bin/varnishlog/varnishlog.c
===================================================================
--- trunk/varnish-cache/bin/varnishlog/varnishlog.c	2006-07-10 13:59:13 UTC (rev 408)
+++ trunk/varnish-cache/bin/varnishlog/varnishlog.c	2006-07-10 14:52:04 UTC (rev 409)
@@ -182,6 +182,7 @@
 	case SLT_SessionClose:
 	case SLT_SessionReuse:
 	case SLT_BackendClose:
+	case SLT_BackendReuse:
 		sbuf_finish(ob[u]);
 		if ((hc[u] != 4 || h_opt == 0) && sbuf_len(ob[u]) > 1)
 			printf("%s\n", sbuf_data(ob[u]));

Modified: trunk/varnish-cache/include/shmlog_tags.h
===================================================================
--- trunk/varnish-cache/include/shmlog_tags.h	2006-07-10 13:59:13 UTC (rev 408)
+++ trunk/varnish-cache/include/shmlog_tags.h	2006-07-10 14:52:04 UTC (rev 409)
@@ -36,3 +36,4 @@
 SLTM(ExpBan)
 SLTM(ExpPick)
 SLTM(ExpKill)
+SLTM(WorkThread)

Modified: trunk/varnish-cache/include/stat_field.h
===================================================================
--- trunk/varnish-cache/include/stat_field.h	2006-07-10 13:59:13 UTC (rev 408)
+++ trunk/varnish-cache/include/stat_field.h	2006-07-10 14:52:04 UTC (rev 409)
@@ -16,5 +16,7 @@
 MAC_STAT(n_smf,			uint64_t, "u", "N struct smf");
 MAC_STAT(n_vbe,			uint64_t, "u", "N struct vbe");
 MAC_STAT(n_vbe_conn,		uint64_t, "u", "N struct vbe_conn");
+MAC_STAT(n_wrk,			uint64_t, "u", "N worker threads");
+MAC_STAT(n_wrkbusy,		uint64_t, "u", "N busy worker threads");
 
 




More information about the varnish-commit mailing list