[master] eff28c6 Generalize the "special" way we use to schedule accepted sockets onto a worker thread (falling back to the current (=acceptor) thread if the pool is empty).
Poul-Henning Kamp
phk at FreeBSD.org
Tue Mar 17 10:10:01 CET 2015
commit eff28c6dd923394a86ceb0429df6ed538028c9f3
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date: Tue Mar 17 09:09:20 2015 +0000
Generalize the "special" way we use to schedule accepted sockets
onto a worker thread (falling back to the current (=acceptor) thread
if the pool is empty).
diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index d176ef6..7317a74 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -322,6 +322,7 @@ struct wrk_accept {
socklen_t acceptaddrlen;
int acceptsock;
struct listen_sock *acceptlsock;
+ struct sesspool *sesspool;
};
/* Worker pool stuff -------------------------------------------------*/
diff --git a/bin/varnishd/cache/cache_pool.c b/bin/varnishd/cache/cache_pool.c
index d1023f3..689baf7 100644
--- a/bin/varnishd/cache/cache_pool.c
+++ b/bin/varnishd/cache/cache_pool.c
@@ -49,6 +49,7 @@ struct poolsock {
#define POOLSOCK_MAGIC 0x1b0a2d38
struct listen_sock *lsock;
struct pool_task task;
+ struct sesspool *sesspool;
};
/* Number of work requests queued in excess of worker threads available */
@@ -178,6 +179,47 @@ pool_getidleworker(struct pool *pp)
}
/*--------------------------------------------------------------------
+ * Special scheduling: If no thread can be found, the current thread
+ * will be prepared for rescheduling instead.
+ * The selected thread's workspace is reserved and the argument is copied there.
+ * Return one if another thread was scheduled, otherwise zero.
+ */
+
+static int
+Pool_Task_Arg(struct worker *wrk, const void *arg, size_t arg_len)
+{
+ struct pool *pp;
+ struct worker *wrk2;
+ int retval;
+
+ CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+ AN(arg);
+ AN(arg_len);
+ pp = wrk->pool;
+ CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
+
+ Lck_Lock(&pp->mtx);
+ wrk2 = pool_getidleworker(pp);
+ if (wrk2 != NULL) {
+ VTAILQ_REMOVE(&pp->idle_queue, &wrk2->task, list);
+ retval = 1;
+ } else {
+ wrk2 = wrk;
+ retval = 0;
+ }
+ Lck_Unlock(&pp->mtx);
+ AZ(wrk2->task.func);
+
+ assert(arg_len == WS_Reserve(wrk2->aws, arg_len));
+ memcpy(wrk2->aws->f, arg, arg_len);
+ wrk2->task.func = SES_pool_accept_task;
+ wrk2->task.priv = wrk2->aws->f;
+ if (retval)
+ AZ(pthread_cond_signal(&wrk2->cond));
+ return (retval);
+}
+
+/*--------------------------------------------------------------------
* Nobody is accepting on this socket, so we do.
*
* As long as we can stick the accepted connection to another thread
@@ -191,19 +233,13 @@ pool_getidleworker(struct pool *pp)
static void __match_proto__(task_func_t)
pool_accept(struct worker *wrk, void *arg)
{
- struct worker *wrk2;
- struct wrk_accept *wa, *wa2;
- struct pool *pp;
+ struct wrk_accept wa;
struct poolsock *ps;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
- pp = wrk->pool;
- CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
CAST_OBJ_NOTNULL(ps, arg, POOLSOCK_MAGIC);
CHECK_OBJ_NOTNULL(ps->lsock, LISTEN_SOCK_MAGIC);
- assert(sizeof *wa == WS_Reserve(wrk->aws, sizeof *wa));
- wa = (void*)wrk->aws->f;
/* Delay until we are ready (flag is set when all
* initialization has finished) */
@@ -211,36 +247,22 @@ pool_accept(struct worker *wrk, void *arg)
VTIM_sleep(.1);
while (1) {
- INIT_OBJ(wa, WRK_ACCEPT_MAGIC);
+ INIT_OBJ(&wa, WRK_ACCEPT_MAGIC);
+ wa.sesspool = ps->sesspool;
assert(ps->lsock->sock > 0); // We know where stdin is
- if (VCA_Accept(ps->lsock, wa) < 0) {
+ if (VCA_Accept(ps->lsock, &wa) < 0) {
wrk->stats->sess_fail++;
/* We're going to pace in vca anyway... */
(void)Pool_TrySumstat(wrk);
continue;
}
- Lck_Lock(&pp->mtx);
- wrk2 = pool_getidleworker(pp);
- if (wrk2 == NULL) {
- /* No idle threads, do it ourselves */
- Lck_Unlock(&pp->mtx);
- AZ(Pool_Task(pp, &ps->task, POOL_QUEUE_BACK));
- wrk->task.func = SES_pool_accept_task;
- wrk->task.priv = pp->sesspool;
+ if (!Pool_Task_Arg(wrk, &wa, sizeof wa)) {
+ AZ(Pool_Task(wrk->pool, &ps->task, POOL_QUEUE_BACK));
return;
}
- VTAILQ_REMOVE(&pp->idle_queue, &wrk2->task, list);
- AZ(wrk2->task.func);
- assert(sizeof *wa2 == WS_Reserve(wrk2->aws, sizeof *wa2));
- wa2 = (void*)wrk2->aws->f;
- memcpy(wa2, wa, sizeof *wa);
- wrk2->task.func = SES_pool_accept_task;
- wrk2->task.priv = pp->sesspool;
- Lck_Unlock(&pp->mtx);
- AZ(pthread_cond_signal(&wrk2->cond));
/*
* We were able to hand off, so release this threads VCL
@@ -589,11 +611,12 @@ pool_mkpool(unsigned pool_no)
VTAILQ_INIT(&pp->idle_queue);
VTAILQ_INIT(&pp->front_queue);
VTAILQ_INIT(&pp->back_queue);
- pp->sesspool = SES_NewPool(pp, pool_no);
- AN(pp->sesspool);
AZ(pthread_cond_init(&pp->herder_cond, NULL));
AZ(pthread_create(&pp->herder_thr, NULL, pool_herder, pp));
+ pp->sesspool = SES_NewPool(pp, pool_no);
+ AN(pp->sesspool);
+
VTAILQ_FOREACH(ls, &heritage.socks, list) {
assert(ls->sock > 0); // We know where stdin is
ALLOC_OBJ(ps, POOLSOCK_MAGIC);
@@ -601,6 +624,7 @@ pool_mkpool(unsigned pool_no)
ps->lsock = ls;
ps->task.func = pool_accept;
ps->task.priv = ps;
+ ps->sesspool = pp->sesspool;
AZ(Pool_Task(pp, &ps->task, POOL_QUEUE_BACK));
}
diff --git a/bin/varnishd/cache/cache_session.c b/bin/varnishd/cache/cache_session.c
index 7542b0c..56d760a 100644
--- a/bin/varnishd/cache/cache_session.c
+++ b/bin/varnishd/cache/cache_session.c
@@ -196,9 +196,11 @@ SES_pool_accept_task(struct worker *wrk, void *arg)
struct sesspool *pp;
struct sess *sp;
const char *lsockname;
+ struct wrk_accept *wa;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
- CAST_OBJ_NOTNULL(pp, arg, SESSPOOL_MAGIC);
+ CAST_OBJ_NOTNULL(wa, arg, WRK_ACCEPT_MAGIC);
+ pp = wa->sesspool;
/* Turn accepted socket into a session */
AN(wrk->aws->r);
More information about the varnish-commit
mailing list