r409 - in trunk/varnish-cache: bin/varnishd bin/varnishlog include
phk at projects.linpro.no
phk at projects.linpro.no
Mon Jul 10 16:52:05 CEST 2006
Author: phk
Date: 2006-07-10 16:52:04 +0200 (Mon, 10 Jul 2006)
New Revision: 409
Modified:
trunk/varnish-cache/bin/varnishd/cache.h
trunk/varnish-cache/bin/varnishd/cache_acceptor.c
trunk/varnish-cache/bin/varnishd/cache_main.c
trunk/varnish-cache/bin/varnishd/cache_pool.c
trunk/varnish-cache/bin/varnishd/heritage.h
trunk/varnish-cache/bin/varnishd/varnishd.c
trunk/varnish-cache/bin/varnishlog/varnishlog.c
trunk/varnish-cache/include/shmlog_tags.h
trunk/varnish-cache/include/stat_field.h
Log:
Rewrite the worker thread pool code.
Assign prefix WRK to the worker pool.
Introduce a struct workreq since the prefetcher (when it happens) will
not have a session to pass in.
The worker threads get a cond_var each and are hung from a list in
most recently used order.
When a request is queued and the worker thread list is not empty,
tickle the cond_var of the first one.
If no threads were available and the max number of threads is not reached,
try to start another worker thread.
If the max was reached or the start failed (likely due to out of memory)
indicate overflow and let the existing pool deal with it.
Create only the minimum requested number of threads initially.
Allow the timeout before a dynamic worker thread commits
suicide to be specified with -w.
Default parameters are -w1,UINT_MAX,10 {min, max, timeout}
Modified: trunk/varnish-cache/bin/varnishd/cache.h
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache.h 2006-07-10 13:59:13 UTC (rev 408)
+++ trunk/varnish-cache/bin/varnishd/cache.h 2006-07-10 14:52:04 UTC (rev 409)
@@ -52,8 +52,17 @@
struct sbuf *sb;
struct objhead *nobjhead;
struct object *nobj;
+
+ unsigned nbr;
+ pthread_cond_t cv;
+ TAILQ_ENTRY(worker) list;
};
+struct workreq {
+ TAILQ_ENTRY(workreq) list;
+ struct sess *sess;
+};
+
#include "hash_slinger.h"
/* Backend Connection ------------------------------------------------*/
@@ -150,6 +159,8 @@
/* Various internal stuff */
struct sessmem *mem;
time_t t0;
+
+ struct workreq workreq;
};
struct backend {
@@ -238,8 +249,8 @@
void PipeSession(struct worker *w, struct sess *sp);
/* cache_pool.c */
-void CacheInitPool(void);
-void DealWithSession(void *arg);
+void WRK_Init(void);
+void WRK_QueueSession(struct sess *sp);
/* cache_shmlog.c */
Modified: trunk/varnish-cache/bin/varnishd/cache_acceptor.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_acceptor.c 2006-07-10 13:59:13 UTC (rev 408)
+++ trunk/varnish-cache/bin/varnishd/cache_acceptor.c 2006-07-10 14:52:04 UTC (rev 409)
@@ -190,7 +190,7 @@
vca_return_session(sp);
return;
}
- DealWithSession(sp);
+ WRK_QueueSession(sp);
}
static void
Modified: trunk/varnish-cache/bin/varnishd/cache_main.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_main.c 2006-07-10 13:59:13 UTC (rev 408)
+++ trunk/varnish-cache/bin/varnishd/cache_main.c 2006-07-10 14:52:04 UTC (rev 409)
@@ -109,7 +109,7 @@
AZ(pthread_mutex_init(&sessmtx, NULL));
VBE_Init();
VSL_Init();
- CacheInitPool();
+ WRK_Init();
VCA_Init();
EXP_Init();
Modified: trunk/varnish-cache/bin/varnishd/cache_pool.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_pool.c 2006-07-10 13:59:13 UTC (rev 408)
+++ trunk/varnish-cache/bin/varnishd/cache_pool.c 2006-07-10 14:52:04 UTC (rev 409)
@@ -5,6 +5,7 @@
*/
#include <stdio.h>
+#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
@@ -16,11 +17,15 @@
#include "vcl.h"
#include "cache.h"
-static TAILQ_HEAD(, sess) shd = TAILQ_HEAD_INITIALIZER(shd);
-
-static pthread_cond_t shdcnd;
+static pthread_mutex_t wrk_mtx;
static unsigned xids;
+/* Number of work requests queued in excess of worker threads available */
+static unsigned wrk_overflow;
+
+static TAILQ_HEAD(, worker) wrk_head = TAILQ_HEAD_INITIALIZER(wrk_head);
+static TAILQ_HEAD(, workreq) wrk_reqhead = TAILQ_HEAD_INITIALIZER(wrk_reqhead);
+
/*--------------------------------------------------------------------*/
static int
@@ -52,114 +57,202 @@
return (1);
}
-static void *
-CacheWorker(void *priv)
+static void
+wrk_WorkSession(struct worker *w, struct sess *sp)
{
- struct sess *sp;
- struct worker w;
int done;
char *b;
- memset(&w, 0, sizeof w);
- w.eb = event_init();
- assert(w.eb != NULL);
- w.sb = sbuf_new(NULL, NULL, 0, SBUF_AUTOEXTEND);
- assert(w.sb != NULL);
-
- (void)priv;
+ time(&sp->t0);
AZ(pthread_mutex_lock(&sessmtx));
- while (1) {
- while (1) {
- sp = TAILQ_FIRST(&shd);
- if (sp != NULL)
- break;
- AZ(pthread_cond_wait(&shdcnd, &sessmtx));
- }
- TAILQ_REMOVE(&shd, sp, list);
- time(&sp->t0);
- sp->vcl = GetVCL();
- AZ(pthread_mutex_unlock(&sessmtx));
+ sp->vcl = GetVCL();
+ AZ(pthread_mutex_unlock(&sessmtx));
- done = http_Dissect(sp->http, sp->fd, 1);
- if (done != 0) {
- RES_Error(&w, sp, done, NULL);
- goto out;
+ done = http_Dissect(sp->http, sp->fd, 1);
+ if (done != 0) {
+ RES_Error(w, sp, done, NULL);
+ goto out;
+ }
+
+ sp->backend = sp->vcl->backend[0];
+
+ VCL_recv_method(sp);
+
+ for (done = 0; !done; ) {
+ switch(sp->handling) {
+ case VCL_RET_LOOKUP:
+ done = LookupSession(w, sp);
+ break;
+ case VCL_RET_FETCH:
+ done = FetchSession(w, sp);
+ break;
+ case VCL_RET_DELIVER:
+ done = DeliverSession(w, sp);
+ break;
+ case VCL_RET_PIPE:
+ PipeSession(w, sp);
+ done = 1;
+ break;
+ case VCL_RET_PASS:
+ PassSession(w, sp);
+ done = 1;
+ break;
+ default:
+ INCOMPL();
}
+ }
+ if (http_GetHdr(sp->http, "Connection", &b) &&
+ !strcmp(b, "close")) {
+ vca_close_session(sp, "Connection header");
+ } else if (http_GetProto(sp->http, &b) &&
+ strcmp(b, "HTTP/1.1")) {
+ vca_close_session(sp, "not HTTP/1.1");
+ }
- sp->backend = sp->vcl->backend[0];
+out:
+ AZ(pthread_mutex_lock(&sessmtx));
+ RelVCL(sp->vcl);
+ AZ(pthread_mutex_unlock(&sessmtx));
+ sp->vcl = NULL;
+ vca_return_session(sp);
+}
- VCL_recv_method(sp);
+/*--------------------------------------------------------------------*/
- for (done = 0; !done; ) {
- switch(sp->handling) {
- case VCL_RET_LOOKUP:
- done = LookupSession(&w, sp);
- break;
- case VCL_RET_FETCH:
- done = FetchSession(&w, sp);
- break;
- case VCL_RET_DELIVER:
- done = DeliverSession(&w, sp);
- break;
- case VCL_RET_PIPE:
- PipeSession(&w, sp);
- done = 1;
- break;
- case VCL_RET_PASS:
- PassSession(&w, sp);
- done = 1;
- break;
- default:
- INCOMPL();
- }
+static void *
+wrk_thread(void *priv)
+{
+ struct worker *w, ww;
+ struct workreq *wrq;
+ struct timespec ts;
+
+ w = &ww;
+ memset(w, 0, sizeof w);
+
+ AZ(pthread_cond_init(&w->cv, NULL));
+
+ w->eb = event_init();
+ assert(w->eb != NULL);
+
+ w->sb = sbuf_new(NULL, NULL, 0, SBUF_AUTOEXTEND);
+ assert(w->sb != NULL);
+
+ AZ(pthread_mutex_lock(&wrk_mtx));
+ VSL_stats->n_wrk++;
+ w->nbr = VSL_stats->n_wrk;
+ if (priv == NULL)
+ VSL(SLT_WorkThread, 0, "%u born dynamic", w->nbr);
+ else
+ VSL(SLT_WorkThread, 0, "%u born permanent", w->nbr);
+ TAILQ_INSERT_HEAD(&wrk_head, w, list);
+ while (1) {
+ wrq = TAILQ_FIRST(&wrk_reqhead);
+ if (wrq != NULL) {
+ VSL_stats->n_wrkbusy++;
+ TAILQ_REMOVE(&wrk_head, w, list);
+ TAILQ_REMOVE(&wrk_reqhead, wrq, list);
+ AZ(pthread_mutex_unlock(&wrk_mtx));
+ assert(wrq->sess != NULL);
+ wrk_WorkSession(w, wrq->sess);
+ AZ(pthread_mutex_lock(&wrk_mtx));
+ VSL_stats->n_wrkbusy--;
+ TAILQ_INSERT_HEAD(&wrk_head, w, list);
}
- if (http_GetHdr(sp->http, "Connection", &b) &&
- !strcmp(b, "close")) {
- vca_close_session(sp, "Connection header");
- } else if (http_GetProto(sp->http, &b) &&
- strcmp(b, "HTTP/1.1")) {
- vca_close_session(sp, "not HTTP/1.1");
+ if (wrk_overflow > 0) {
+ wrk_overflow--;
+ continue;
}
-out:
- AZ(pthread_mutex_lock(&sessmtx));
- RelVCL(sp->vcl);
- sp->vcl = NULL;
- vca_return_session(sp);
+ /* If we are a reserved thread we don't die */
+ if (priv != NULL) {
+ AZ(pthread_cond_wait(&w->cv, &wrk_mtx));
+ continue;
+ }
+
+ /* If we are a dynamic thread, time out and die */
+ clock_gettime(CLOCK_REALTIME, &ts);
+ ts.tv_sec += heritage.wthread_timeout;
+ if (pthread_cond_timedwait(&w->cv, &wrk_mtx, &ts)) {
+ VSL_stats->n_wrk--;
+ TAILQ_REMOVE(&wrk_head, w, list);
+ AZ(pthread_mutex_unlock(&wrk_mtx));
+ VSL(SLT_WorkThread, 0, "%u suicide", w->nbr);
+ sbuf_delete(w->sb);
+ event_base_free(w->eb);
+ AZ(pthread_cond_destroy(&w->cv));
+ return (NULL);
+ }
}
}
+/*--------------------------------------------------------------------*/
+
void
-DealWithSession(void *arg)
+WRK_QueueSession(struct sess *sp)
{
- struct sess *sp = arg;
+ struct worker *w;
+ pthread_t tp;
time(&sp->t_req);
/*
* No locking necessary, we're serialized in the acceptor thread
+ * XXX: still ?
*/
sp->xid = xids++;
VSL(SLT_XID, sp->fd, "%u", sp->xid);
+ sp->workreq.sess = sp;
VSL_stats->client_req++;
- AZ(pthread_mutex_lock(&sessmtx));
- TAILQ_INSERT_TAIL(&shd, sp, list);
- AZ(pthread_mutex_unlock(&sessmtx));
- AZ(pthread_cond_signal(&shdcnd));
+
+ AZ(pthread_mutex_lock(&wrk_mtx));
+ TAILQ_INSERT_TAIL(&wrk_reqhead, &sp->workreq, list);
+
+ /* If there are idle threads, we tickle the first one into action */
+ w = TAILQ_FIRST(&wrk_head);
+ if (w != NULL) {
+ AZ(pthread_cond_signal(&w->cv));
+ AZ(pthread_mutex_unlock(&wrk_mtx));
+ return;
+ }
+
+ /* Register overflow if max threads reached */
+ if (VSL_stats->n_wrk >= heritage.wthread_max) {
+ wrk_overflow++;
+ AZ(pthread_mutex_unlock(&wrk_mtx));
+ return;
+ }
+
+ /* Try to create a thread */
+ AZ(pthread_mutex_unlock(&wrk_mtx));
+ if (!pthread_create(&tp, NULL, wrk_thread, NULL)) {
+ AZ(pthread_detach(tp));
+ return;
+ }
+
+ VSL(SLT_Debug, 0, "Create worker thread failed %d %s",
+ errno, strerror(errno));
+
+ /* Register overflow */
+ AZ(pthread_mutex_lock(&wrk_mtx));
+ wrk_overflow++;
+ AZ(pthread_mutex_unlock(&wrk_mtx));
}
+
+/*--------------------------------------------------------------------*/
+
void
-CacheInitPool(void)
+WRK_Init(void)
{
pthread_t tp;
int i;
- AZ(pthread_cond_init(&shdcnd, NULL));
+ AZ(pthread_mutex_init(&wrk_mtx, NULL));
VSL(SLT_Debug, 0, "Starting %u worker threads", heritage.wthread_min);
for (i = 0; i < heritage.wthread_min; i++) {
- AZ(pthread_create(&tp, NULL, CacheWorker, NULL));
+ AZ(pthread_create(&tp, NULL, wrk_thread, &i));
AZ(pthread_detach(tp));
}
srandomdev();
Modified: trunk/varnish-cache/bin/varnishd/heritage.h
===================================================================
--- trunk/varnish-cache/bin/varnishd/heritage.h 2006-07-10 13:59:13 UTC (rev 408)
+++ trunk/varnish-cache/bin/varnishd/heritage.h 2006-07-10 14:52:04 UTC (rev 409)
@@ -37,7 +37,9 @@
unsigned default_ttl;
/* Worker threads */
- unsigned wthread_min, wthread_max;
+ unsigned wthread_min;
+ unsigned wthread_max;
+ unsigned wthread_timeout;
/* Memory allocation hints */
unsigned mem_http_headerspace;
Modified: trunk/varnish-cache/bin/varnishd/varnishd.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/varnishd.c 2006-07-10 13:59:13 UTC (rev 408)
+++ trunk/varnish-cache/bin/varnishd/varnishd.c 2006-07-10 14:52:04 UTC (rev 409)
@@ -8,6 +8,7 @@
#include <string.h>
#include <errno.h>
#include <fcntl.h>
+#include <limits.h>
#include <signal.h>
#include <stdarg.h>
#include <stdio.h>
@@ -419,8 +420,8 @@
fprintf(stderr, " %-28s # %s\n",
"-s kind[,storageoptions]", "Backend storage specification");
fprintf(stderr, " %-28s # %s\n", "-t", "Default TTL");
- fprintf(stderr, " %-28s # %s\n", "-w int[,int]",
- "Number of worker threads (fixed/{min,max})");
+ fprintf(stderr, " %-28s # %s\n", "-w int[,int[,int]]",
+ "Number of worker threads (fixed/{min,max}/{min/max/timeout})");
#if 0
-c clusterid at cluster_controller
-m memory_limit
@@ -435,6 +436,28 @@
/*--------------------------------------------------------------------*/
+static void
+tackle_warg(const char *argv)
+{
+ int i;
+ unsigned ua, ub, uc;
+
+ i = sscanf(argv, "%u,%u,%u", &ua, &ub, &uc);
+ if (i == 0)
+ usage();
+ if (ua < 1)
+ usage();
+ heritage.wthread_min = ua;
+ heritage.wthread_max = ua;
+ heritage.wthread_timeout = 10;
+ if (i >= 2)
+ heritage.wthread_max = ub;
+ if (i >= 3)
+ heritage.wthread_timeout = uc;
+}
+
+/*--------------------------------------------------------------------*/
+
/* for development purposes */
#include <printf.h>
#include <err.h>
@@ -442,8 +465,7 @@
int
main(int argc, char *argv[])
{
- int o, i;
- unsigned ua, ub;
+ int o;
const char *portnumber = "8080";
unsigned dflag = 1; /* XXX: debug=on for now */
const char *bflag = NULL;
@@ -456,8 +478,9 @@
VCC_InitCompile();
heritage.default_ttl = 120;
- heritage.wthread_min = 5;
- heritage.wthread_max = 5;
+ heritage.wthread_min = 1;
+ heritage.wthread_max = UINT_MAX;
+ heritage.wthread_timeout = 10;
heritage.mem_http_headerspace= 4096;
heritage.mem_http_headers= 32;
heritage.mem_workspace = 0;
@@ -486,13 +509,7 @@
heritage.default_ttl = strtoul(optarg, NULL, 0);
break;
case 'w':
- i = sscanf(optarg, "%u,%u", &ua, &ub);
- if (i == 0)
- usage();
- heritage.wthread_min = ua;
- heritage.wthread_max = ua;
- if (i == 2)
- heritage.wthread_max = ub;
+ tackle_warg(optarg);
break;
default:
usage();
Modified: trunk/varnish-cache/bin/varnishlog/varnishlog.c
===================================================================
--- trunk/varnish-cache/bin/varnishlog/varnishlog.c 2006-07-10 13:59:13 UTC (rev 408)
+++ trunk/varnish-cache/bin/varnishlog/varnishlog.c 2006-07-10 14:52:04 UTC (rev 409)
@@ -182,6 +182,7 @@
case SLT_SessionClose:
case SLT_SessionReuse:
case SLT_BackendClose:
+ case SLT_BackendReuse:
sbuf_finish(ob[u]);
if ((hc[u] != 4 || h_opt == 0) && sbuf_len(ob[u]) > 1)
printf("%s\n", sbuf_data(ob[u]));
Modified: trunk/varnish-cache/include/shmlog_tags.h
===================================================================
--- trunk/varnish-cache/include/shmlog_tags.h 2006-07-10 13:59:13 UTC (rev 408)
+++ trunk/varnish-cache/include/shmlog_tags.h 2006-07-10 14:52:04 UTC (rev 409)
@@ -36,3 +36,4 @@
SLTM(ExpBan)
SLTM(ExpPick)
SLTM(ExpKill)
+SLTM(WorkThread)
Modified: trunk/varnish-cache/include/stat_field.h
===================================================================
--- trunk/varnish-cache/include/stat_field.h 2006-07-10 13:59:13 UTC (rev 408)
+++ trunk/varnish-cache/include/stat_field.h 2006-07-10 14:52:04 UTC (rev 409)
@@ -16,5 +16,7 @@
MAC_STAT(n_smf, uint64_t, "u", "N struct smf");
MAC_STAT(n_vbe, uint64_t, "u", "N struct vbe");
MAC_STAT(n_vbe_conn, uint64_t, "u", "N struct vbe_conn");
+MAC_STAT(n_wrk, uint64_t, "u", "N worker threads");
+MAC_STAT(n_wrkbusy, uint64_t, "u", "N busy worker threads");
More information about the varnish-commit
mailing list