[master] b04b83b Move the accept sockets into the new pool-task API and GC all the cruft now not needed.
Poul-Henning Kamp
phk at varnish-cache.org
Mon Jan 23 14:09:58 CET 2012
commit b04b83baca73fff2fec268a16946f12665099215
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date: Mon Jan 23 13:09:21 2012 +0000
Move the accept sockets into the new pool-task API and GC all the
cruft now not needed.
diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index fffb777..3c20777 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -304,14 +304,6 @@ enum pool_how {
/*--------------------------------------------------------------------*/
-enum e_do_what {
- pool_do_inval = 0,
- pool_do_sess,
- pool_do_accept,
- pool_do_nothing,
- pool_do_die,
-};
-
struct worker {
unsigned magic;
#define WORKER_MAGIC 0x6391adcf
@@ -323,12 +315,8 @@ struct worker {
void *nhashpriv;
struct dstat stats;
- /* New Pool stuff */
struct pool_task task;
- /* Pool stuff */
- enum e_do_what do_what;
-
double lastused;
struct wrw wrw;
@@ -938,7 +926,6 @@ int SES_Schedule(struct sess *sp);
void SES_Handle(struct sess *sp, double now);
void SES_GetReq(struct sess *sp);
void SES_ReleaseReq(struct sess *sp);
-pool_func_t SES_pool_task;
pool_func_t SES_pool_accept_task;
/* cache_shmlog.c */
diff --git a/bin/varnishd/cache/cache_pool.c b/bin/varnishd/cache/cache_pool.c
index 28ae2b1..dab5b11 100644
--- a/bin/varnishd/cache/cache_pool.c
+++ b/bin/varnishd/cache/cache_pool.c
@@ -85,8 +85,8 @@ VTAILQ_HEAD(taskhead, pool_task);
struct poolsock {
unsigned magic;
#define POOLSOCK_MAGIC 0x1b0a2d38
- VTAILQ_ENTRY(poolsock) list;
struct listen_sock *lsock;
+ struct pool_task task;
};
/* Number of work requests queued in excess of worker threads available */
@@ -104,7 +104,6 @@ struct pool {
struct taskhead idle_queue;
struct taskhead front_queue;
struct taskhead back_queue;
- VTAILQ_HEAD(, poolsock) socks;
unsigned nthr;
unsigned lqueue;
unsigned last_lqueue;
@@ -126,6 +125,7 @@ pool_getidleworker(const struct pool *pp, int back)
struct worker *wrk;
CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
+ Lck_AssertHeld(&pp->mtx);
if (back)
pt = VTAILQ_LAST(&pp->idle_queue, taskhead);
else
@@ -141,28 +141,27 @@ pool_getidleworker(const struct pool *pp, int back)
* Nobody is accepting on this socket, so we do.
*
* As long as we can stick the accepted connection to another thread
- * we do so, otherwise we return and handle it ourselves.
- *
- * Notice calling convention: Called locked and returns locked, but
- * works lock in the meantime.
+ * we do so, otherwise we put the socket back on the "BACK" queue
+ * and handle the new connection ourselves.
*
- * We store data about the accept in reserved workspace, it is only used
- * for a brief moment and it takes up around 144 bytes.
+ * We store data about the accept in reserved workspace on the reserved
+ * worker workspace. SES_pool_accept_task() knows about this.
*/
-static int
-pool_accept(struct pool *pp, struct worker *wrk, const struct poolsock *ps)
+static void
+pool_accept(struct worker *wrk, void *arg)
{
struct worker *wrk2;
struct wrk_accept *wa, *wa2;
+ struct pool *pp;
+ struct poolsock *ps;
- CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
- CHECK_OBJ_NOTNULL(ps, POOLSOCK_MAGIC);
+ pp = wrk->pool;
+ CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
+ CAST_OBJ_NOTNULL(ps, arg, POOLSOCK_MAGIC);
CHECK_OBJ_NOTNULL(ps->lsock, LISTEN_SOCK_MAGIC);
- Lck_AssertHeld(&pp->mtx);
- Lck_Unlock(&pp->mtx);
assert(sizeof *wa == WS_Reserve(wrk->ws, sizeof *wa));
wa = (void*)wrk->ws->f;
while (1) {
@@ -171,8 +170,9 @@ pool_accept(struct pool *pp, struct worker *wrk, const struct poolsock *ps)
if (ps->lsock->sock < 0) {
/* Socket Shutdown */
- Lck_Lock(&pp->mtx);
- return (-1);
+ FREE_OBJ(ps);
+ WS_Release(wrk->ws, 0);
+ return;
}
if (VCA_Accept(ps->lsock, wa) < 0) {
wrk->stats.sess_fail++;
@@ -183,8 +183,13 @@ pool_accept(struct pool *pp, struct worker *wrk, const struct poolsock *ps)
Lck_Lock(&pp->mtx);
wrk2 = pool_getidleworker(pp, 0);
- if (wrk2 == NULL)
- return (0);
+ if (wrk2 == NULL) {
+ /* No idle threads, do it ourselves */
+ Lck_Unlock(&pp->mtx);
+ AZ(Pool_Task(pp, &ps->task, POOL_QUEUE_BACK));
+ SES_pool_accept_task(wrk, pp->sesspool);
+ return;
+ }
VTAILQ_REMOVE(&pp->idle_queue, &wrk2->task, list);
Lck_Unlock(&pp->mtx);
assert(sizeof *wa2 == WS_Reserve(wrk2->ws, sizeof *wa2));
@@ -248,7 +253,7 @@ Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how)
WRONG("Unknown enum pool_how");
}
Lck_Unlock(&pp->mtx);
- if (how == POOL_QUEUE_FRONT && !retval)
+ if (retval)
AZ(pthread_cond_signal(&pp->herder_cond));
return (retval);
}
@@ -261,18 +266,14 @@ void
Pool_Work_Thread(void *priv, struct worker *wrk)
{
struct pool *pp;
- int stats_clean, i;
- struct poolsock *ps;
+ int stats_clean;
struct pool_task *tp;
CAST_OBJ_NOTNULL(pp, priv, POOL_MAGIC);
wrk->pool = pp;
- Lck_Lock(&pp->mtx);
stats_clean = 1;
while (1) {
-
- Lck_AssertHeld(&pp->mtx);
- wrk->do_what = pool_do_inval;
+ Lck_Lock(&pp->mtx);
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
@@ -282,39 +283,13 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
if (tp != NULL) {
pp->lqueue--;
VTAILQ_REMOVE(&pp->front_queue, tp, list);
- }
-
- if (tp == NULL) {
+ } else {
tp = VTAILQ_FIRST(&pp->back_queue);
if (tp != NULL)
VTAILQ_REMOVE(&pp->back_queue, tp, list);
}
- if (tp != NULL) {
- Lck_Unlock(&pp->mtx);
- AN(tp->func);
- assert(wrk->pool == pp);
- tp->func(wrk, tp->priv);
- stats_clean = WRK_TrySumStat(wrk);
- Lck_Lock(&pp->mtx);
- continue;
- }
-
- if (!VTAILQ_EMPTY(&pp->socks)) {
- /* Accept on a socket */
- ps = VTAILQ_FIRST(&pp->socks);
- VTAILQ_REMOVE(&pp->socks, ps, list);
- i = pool_accept(pp, wrk, ps);
- Lck_AssertHeld(&pp->mtx);
- if (i < 0) {
- /* Socket Shutdown */
- FREE_OBJ(ps);
- WS_Release(wrk->ws, 0);
- continue;
- }
- VTAILQ_INSERT_TAIL(&pp->socks, ps, list);
- wrk->do_what = pool_do_accept;
- } else {
+ if (tp == NULL) {
/* Nothing to do: To sleep, perchance to dream ... */
if (isnan(wrk->lastused))
wrk->lastused = VTIM_real();
@@ -324,45 +299,17 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
if (!stats_clean)
WRK_SumStat(wrk);
(void)Lck_CondWait(&wrk->cond, &pp->mtx, NULL);
+ tp = &wrk->task;
}
-
- if (wrk->task.func != NULL) {
- Lck_Unlock(&pp->mtx);
- assert(wrk->pool == pp);
- wrk->task.func(wrk, wrk->task.priv);
- wrk->task.func = NULL;
- wrk->task.priv = NULL;
- stats_clean = WRK_TrySumStat(wrk);
- Lck_Lock(&pp->mtx);
- continue;
- }
-
- if (wrk->do_what == pool_do_die)
- break;
-
Lck_Unlock(&pp->mtx);
- if (wrk->do_what == pool_do_accept) {
- SES_pool_accept_task(wrk, pp->sesspool);
- if (wrk->sp == NULL)
- wrk->do_what = pool_do_nothing;
- else
- wrk->do_what = pool_do_sess;
- }
+ if (tp->func == NULL)
+ break;
- if (wrk->do_what == pool_do_sess) {
- stats_clean = 0;
- assert(wrk->pool == pp);
- SES_pool_task(wrk, wrk->sp);
- } else if (wrk->do_what == pool_do_nothing) {
- /* we already did */
- } else {
- WRONG("Invalid wrk->do_what");
- }
+ assert(wrk->pool == pp);
+ tp->func(wrk, tp->priv);
stats_clean = WRK_TrySumStat(wrk);
- Lck_Lock(&pp->mtx);
}
- Lck_Unlock(&pp->mtx);
wrk->pool = NULL;
}
@@ -474,11 +421,10 @@ pool_herder(void *priv)
VSC_C_main->sess_dropped += pp->ndropped;
pp->nqueued = pp->ndropped = 0;
wrk = pool_getidleworker(pp, 1);
- if (wrk != NULL &&
- (wrk->lastused < t_idle ||
+ if (wrk != NULL && (wrk->lastused < t_idle ||
pp->nthr > cache_param->wthread_max)) {
VTAILQ_REMOVE(&pp->idle_queue, &wrk->task, list);
- } else
+ } else
wrk = NULL;
Lck_Unlock(&pp->mtx);
@@ -490,7 +436,8 @@ pool_herder(void *priv)
VSC_C_main->threads_destroyed++;
Lck_Unlock(&pool_mtx);
AZ(wrk->sp);
- wrk->do_what = pool_do_die;
+ wrk->task.func = NULL;
+ wrk->task.priv = NULL;
AZ(pthread_cond_signal(&wrk->cond));
}
}
@@ -514,7 +461,6 @@ pool_mkpool(unsigned pool_no)
Lck_New(&pp->mtx, lck_wq);
VTAILQ_INIT(&pp->idle_queue);
- VTAILQ_INIT(&pp->socks);
VTAILQ_INIT(&pp->front_queue);
VTAILQ_INIT(&pp->back_queue);
pp->sesspool = SES_NewPool(pp, pool_no);
@@ -526,7 +472,9 @@ pool_mkpool(unsigned pool_no)
ALLOC_OBJ(ps, POOLSOCK_MAGIC);
XXXAN(ps);
ps->lsock = ls;
- VTAILQ_INSERT_TAIL(&pp->socks, ps, list);
+ ps->task.func = pool_accept;
+ ps->task.priv = ps;
+ AZ(Pool_Task(pp, &ps->task, POOL_QUEUE_BACK));
}
AZ(pthread_condattr_init(&cv_attr));
diff --git a/bin/varnishd/cache/cache_session.c b/bin/varnishd/cache/cache_session.c
index 1079bb9..3e144c1 100644
--- a/bin/varnishd/cache/cache_session.c
+++ b/bin/varnishd/cache/cache_session.c
@@ -127,40 +127,12 @@ SES_Alloc(void)
return (sp);
}
- /*--------------------------------------------------------------------
- * The pool-task for a newly accepted session
- */
-
-void
-SES_pool_accept_task(struct worker *wrk, void *arg)
-{
- struct sesspool *pp;
-
- CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
- CAST_OBJ_NOTNULL(pp, arg, SESSPOOL_MAGIC);
-
- /* Turn accepted socket into a session */
- AZ(wrk->sp);
- AN(wrk->ws->r);
- wrk->sp = ses_new(pp);
- if (wrk->sp == NULL) {
- VCA_FailSess(wrk);
- return;
- }
- VCA_SetupSess(wrk);
- wrk->sp->step = STP_FIRST;
- WS_Release(wrk->ws, 0);
- SES_pool_task(wrk, wrk->sp);
-}
-
-
-
/*--------------------------------------------------------------------
* The pool-task function for sessions
*/
-void
-SES_pool_task(struct worker *wrk, void *arg)
+static void
+ses_pool_task(struct worker *wrk, void *arg)
{
struct sess *sp;
@@ -192,6 +164,32 @@ SES_pool_task(struct worker *wrk, void *arg)
}
/*--------------------------------------------------------------------
+ * The pool-task for a newly accepted session
+ */
+
+void
+SES_pool_accept_task(struct worker *wrk, void *arg)
+{
+ struct sesspool *pp;
+
+ CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+ CAST_OBJ_NOTNULL(pp, arg, SESSPOOL_MAGIC);
+
+ /* Turn accepted socket into a session */
+ AZ(wrk->sp);
+ AN(wrk->ws->r);
+ wrk->sp = ses_new(pp);
+ if (wrk->sp == NULL) {
+ VCA_FailSess(wrk);
+ return;
+ }
+ VCA_SetupSess(wrk);
+ wrk->sp->step = STP_FIRST;
+ WS_Release(wrk->ws, 0);
+ ses_pool_task(wrk, wrk->sp);
+}
+
+/*--------------------------------------------------------------------
* Schedule a session back on a work-thread from its pool
*/
@@ -207,7 +205,7 @@ SES_Schedule(struct sess *sp)
AN(pp->pool);
AZ(sp->wrk);
- sp->task.func = SES_pool_task;
+ sp->task.func = ses_pool_task;
sp->task.priv = sp;
if (Pool_Task(pp->pool, &sp->task, POOL_QUEUE_FRONT)) {
More information about the varnish-commit
mailing list