[master] 684467d Move all the acceptor related stuff out of cache_pool.c and into cache_acceptor.c where it belongs.
Poul-Henning Kamp
phk at FreeBSD.org
Tue Mar 17 10:50:22 CET 2015
commit 684467d905366baa6b68f8e071ced2ecbbe6f44c
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date: Tue Mar 17 09:49:53 2015 +0000
Move all the acceptor related stuff out of cache_pool.c and
into cache_acceptor.c where it belongs.
diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index 7317a74..da5673e 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -704,9 +704,9 @@ struct sess {
/* cache_acceptor.c */
void VCA_Init(void);
void VCA_Shutdown(void);
-int VCA_Accept(struct listen_sock *ls, struct wrk_accept *wa);
const char *VCA_SetupSess(struct worker *w, struct sess *sp);
void VCA_FailSess(struct worker *w);
+void VCA_New_SessPool(struct pool *pp, struct sesspool *sp);
/* cache_backend_cfg.c */
void VBE_InitCfg(void);
@@ -987,10 +987,12 @@ const char *sess_close_2str(enum sess_close sc, int want_desc);
/* cache_pool.c */
void Pool_Init(void);
-void Pool_Accept(void);
void Pool_Work_Thread(struct pool *, struct worker *w);
int Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how);
+int Pool_Task_Arg(struct worker *, task_func_t *,
+ const void *arg, size_t arg_len);
void Pool_Sumstat(struct worker *w);
+int Pool_TrySumstat(struct worker *wrk);
void Pool_PurgeStat(unsigned nobj);
#define V1L_IsReleased(w) ((w)->v1l == NULL)
diff --git a/bin/varnishd/cache/cache_acceptor.c b/bin/varnishd/cache/cache_acceptor.c
index b05c43f..05a1207 100644
--- a/bin/varnishd/cache/cache_acceptor.c
+++ b/bin/varnishd/cache/cache_acceptor.c
@@ -29,9 +29,6 @@
* This source file has the various trickery surrounding the accept/listen
* sockets.
*
- * The actual acceptance is done from cache_pool.c, by calling
- * into VCA_Accept() in this file.
- *
* Once the session is allocated we move into it with a call to
* VCA_SetupSess().
*
@@ -56,9 +53,17 @@
#include "vtim.h"
static pthread_t VCA_thread;
-static int hack_ready;
static double vca_pace = 0.0;
static struct lock pace_mtx;
+static unsigned pool_accepting;
+
+struct poolsock {
+ unsigned magic;
+#define POOLSOCK_MAGIC 0x1b0a2d38
+ struct listen_sock *lsock;
+ struct pool_task task;
+ struct sesspool *sesspool;
+};
/*--------------------------------------------------------------------
* TCP options we want to control
@@ -269,49 +274,6 @@ vca_pace_good(void)
}
/*--------------------------------------------------------------------
- * Accept on a listen socket, and handle error returns.
- *
- * Called from a worker thread from a pool
- */
-
-int
-VCA_Accept(struct listen_sock *ls, struct wrk_accept *wa)
-{
- int i;
-
- CHECK_OBJ_NOTNULL(ls, LISTEN_SOCK_MAGIC);
- vca_pace_check();
-
- while(!hack_ready)
- (void)usleep(100*1000);
-
- wa->acceptaddrlen = sizeof wa->acceptaddr;
- do {
- i = accept(ls->sock, (void*)&wa->acceptaddr,
- &wa->acceptaddrlen);
- } while (i < 0 && errno == EAGAIN);
-
- if (i < 0) {
- switch (errno) {
- case ECONNABORTED:
- break;
- case EMFILE:
- VSL(SLT_Debug, ls->sock, "Too many open files");
- vca_pace_bad();
- break;
- default:
- VSL(SLT_Debug, ls->sock, "Accept failed: %s",
- strerror(errno));
- vca_pace_bad();
- break;
- }
- }
- wa->acceptlsock = ls;
- wa->acceptsock = i;
- return (i);
-}
-
-/*--------------------------------------------------------------------
* Fail a session
*
* This happens if we accept the socket, but cannot get a session
@@ -365,6 +327,103 @@ VCA_SetupSess(struct worker *wrk, struct sess *sp)
return (retval);
}
+/*--------------------------------------------------------------------
+ * Nobody is accepting on this socket, so we do.
+ *
+ * As long as we can stick the accepted connection to another thread
+ * we do so, otherwise we put the socket back on the "BACK" queue
+ * and handle the new connection ourselves.
+ *
+ * We store data about the accept in space reserved on the workspace of
+ * the worker that gets the task. SES_pool_accept_task() knows about this.
+ */
+
+static void __match_proto__(task_func_t)
+vca_accept_task(struct worker *wrk, void *arg)
+{
+ struct wrk_accept wa;
+ struct poolsock *ps;
+ struct listen_sock *ls;
+ int i;
+
+ CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+ CAST_OBJ_NOTNULL(ps, arg, POOLSOCK_MAGIC);
+ ls = ps->lsock;
+ CHECK_OBJ_NOTNULL(ls, LISTEN_SOCK_MAGIC);
+
+ /* Delay until we are ready (flag is set when all
+ * initialization has finished) */
+ while (!pool_accepting)
+ VTIM_sleep(.1);
+
+ while (1) {
+ INIT_OBJ(&wa, WRK_ACCEPT_MAGIC);
+ wa.sesspool = ps->sesspool;
+ wa.acceptlsock = ls;
+
+ assert(ls->sock > 0); // We know where stdin is
+
+ vca_pace_check();
+
+ wa.acceptaddrlen = sizeof wa.acceptaddr;
+ do {
+ i = accept(ls->sock, (void*)&wa.acceptaddr,
+ &wa.acceptaddrlen);
+ } while (i < 0 && errno == EAGAIN);
+
+ if (i < 0) {
+ switch (errno) {
+ case ECONNABORTED:
+ break;
+ case EMFILE:
+ VSL(SLT_Debug, ls->sock, "Too many open files");
+ vca_pace_bad();
+ break;
+ default:
+ VSL(SLT_Debug, ls->sock, "Accept failed: %s",
+ strerror(errno));
+ vca_pace_bad();
+ break;
+ }
+ wrk->stats->sess_fail++;
+ (void)Pool_TrySumstat(wrk);
+ continue;
+ }
+
+ wa.acceptsock = i;
+
+ if (!Pool_Task_Arg(wrk, SES_pool_accept_task, &wa, sizeof wa)) {
+ AZ(Pool_Task(wrk->pool, &ps->task, POOL_QUEUE_BACK));
+ return;
+ }
+
+ /*
+ * We were able to hand off, so release this thread's VCL
+ * reference (if any) so we don't hold on to discarded VCLs.
+ */
+ if (wrk->vcl != NULL)
+ VCL_Rel(&wrk->vcl);
+ }
+}
+
+void
+VCA_New_SessPool(struct pool *pp, struct sesspool *sp)
+{
+ struct listen_sock *ls;
+ struct poolsock *ps;
+
+ VTAILQ_FOREACH(ls, &heritage.socks, list) {
+ assert(ls->sock > 0); // We know where stdin is
+ ALLOC_OBJ(ps, POOLSOCK_MAGIC);
+ AN(ps);
+ ps->lsock = ls;
+ ps->task.func = vca_accept_task;
+ ps->task.priv = ps;
+ ps->sesspool = sp;
+ AZ(Pool_Task(pp, &ps->task, POOL_QUEUE_BACK));
+ }
+}
+
/*--------------------------------------------------------------------*/
static void *
@@ -380,8 +439,7 @@ vca_acct(void *arg)
(void)vca_tcp_opt_init();
VTAILQ_FOREACH(ls, &heritage.socks, list) {
- if (ls->sock < 0)
- continue;
+ assert (ls->sock > 0); // We know where stdin is
AZ(listen(ls->sock, cache_param->listen_depth));
vca_tcp_opt_set(ls->sock, 1);
if (cache_param->accept_filter) {
@@ -393,18 +451,15 @@ vca_acct(void *arg)
}
}
- hack_ready = 1;
+ pool_accepting = 1;
need_test = 1;
t0 = VTIM_real();
while (1) {
(void)sleep(1);
if (vca_tcp_opt_init()) {
- VTAILQ_FOREACH(ls, &heritage.socks, list) {
- if (ls->sock < 0)
- continue;
+ VTAILQ_FOREACH(ls, &heritage.socks, list)
vca_tcp_opt_set(ls->sock, 1);
- }
}
now = VTIM_real();
VSC_C_main->uptime = (uint64_t)(now - t0);
@@ -412,7 +467,6 @@ vca_acct(void *arg)
NEEDLESS_RETURN(NULL);
}
-
/*--------------------------------------------------------------------*/
static void
@@ -444,7 +498,7 @@ ccf_listen_address(struct cli *cli, const char * const *av, void *priv)
* a race where varnishtest::client would attempt to connect(2)
* before listen(2) has been called.
*/
- while(!hack_ready)
+ while(!pool_accepting)
(void)usleep(100*1000);
VTAILQ_FOREACH(ls, &heritage.socks, list) {
diff --git a/bin/varnishd/cache/cache_main.c b/bin/varnishd/cache/cache_main.c
index 1f2f48f..bfe99e3 100644
--- a/bin/varnishd/cache/cache_main.c
+++ b/bin/varnishd/cache/cache_main.c
@@ -245,8 +245,6 @@ child_main(void)
if (FEATURE(FEATURE_WAIT_SILO))
SMP_Ready();
- Pool_Accept();
-
CLI_Run();
BAN_Shutdown();
diff --git a/bin/varnishd/cache/cache_pool.c b/bin/varnishd/cache/cache_pool.c
index 689baf7..762685b 100644
--- a/bin/varnishd/cache/cache_pool.c
+++ b/bin/varnishd/cache/cache_pool.c
@@ -38,20 +38,11 @@
#include <stdlib.h>
#include "cache.h"
-#include "common/heritage.h"
#include "vtim.h"
VTAILQ_HEAD(taskhead, pool_task);
-struct poolsock {
- unsigned magic;
-#define POOLSOCK_MAGIC 0x1b0a2d38
- struct listen_sock *lsock;
- struct pool_task task;
- struct sesspool *sesspool;
-};
-
/* Number of work requests queued in excess of worker threads available */
struct pool {
@@ -78,7 +69,6 @@ struct pool {
static struct lock pool_mtx;
static pthread_t thr_pool_herder;
-static unsigned pool_accepting = 0;
static struct lock wstat_mtx;
@@ -110,7 +100,7 @@ Pool_Sumstat(struct worker *wrk)
memset(wrk->stats, 0, sizeof *wrk->stats);
}
-static int
+int
Pool_TrySumstat(struct worker *wrk)
{
if (Lck_Trylock(&wstat_mtx))
@@ -185,8 +175,9 @@ pool_getidleworker(struct pool *pp)
* Return one if another thread was scheduled, otherwise zero.
*/
-static int
-Pool_Task_Arg(struct worker *wrk, const void *arg, size_t arg_len)
+int
+Pool_Task_Arg(struct worker *wrk, task_func_t *func,
+ const void *arg, size_t arg_len)
{
struct pool *pp;
struct worker *wrk2;
@@ -212,7 +203,7 @@ Pool_Task_Arg(struct worker *wrk, const void *arg, size_t arg_len)
assert(arg_len == WS_Reserve(wrk2->aws, arg_len));
memcpy(wrk2->aws->f, arg, arg_len);
- wrk2->task.func = SES_pool_accept_task;
+ wrk2->task.func = func;
wrk2->task.priv = wrk2->aws->f;
if (retval)
AZ(pthread_cond_signal(&wrk2->cond));
@@ -220,60 +211,6 @@ Pool_Task_Arg(struct worker *wrk, const void *arg, size_t arg_len)
}
/*--------------------------------------------------------------------
- * Nobody is accepting on this socket, so we do.
- *
- * As long as we can stick the accepted connection to another thread
- * we do so, otherwise we put the socket back on the "BACK" queue
- * and handle the new connection ourselves.
- *
- * We store data about the accept in reserved workspace on the reserved
- * worker workspace. SES_pool_accept_task() knows about this.
- */
-
-static void __match_proto__(task_func_t)
-pool_accept(struct worker *wrk, void *arg)
-{
- struct wrk_accept wa;
- struct poolsock *ps;
-
- CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
- CAST_OBJ_NOTNULL(ps, arg, POOLSOCK_MAGIC);
-
- CHECK_OBJ_NOTNULL(ps->lsock, LISTEN_SOCK_MAGIC);
-
- /* Delay until we are ready (flag is set when all
- * initialization has finished) */
- while (!pool_accepting)
- VTIM_sleep(.1);
-
- while (1) {
- INIT_OBJ(&wa, WRK_ACCEPT_MAGIC);
- wa.sesspool = ps->sesspool;
-
- assert(ps->lsock->sock > 0); // We know where stdin is
-
- if (VCA_Accept(ps->lsock, &wa) < 0) {
- wrk->stats->sess_fail++;
- /* We're going to pace in vca anyway... */
- (void)Pool_TrySumstat(wrk);
- continue;
- }
-
- if (!Pool_Task_Arg(wrk, &wa, sizeof wa)) {
- AZ(Pool_Task(wrk->pool, &ps->task, POOL_QUEUE_BACK));
- return;
- }
-
- /*
- * We were able to hand off, so release this threads VCL
- * reference (if any) so we don't hold on to discarded VCLs.
- */
- if (wrk->vcl != NULL)
- VCL_Rel(&wrk->vcl);
- }
-}
-
-/*--------------------------------------------------------------------
* Enter a new task to be done
*/
@@ -596,8 +533,6 @@ static struct pool *
pool_mkpool(unsigned pool_no)
{
struct pool *pp;
- struct listen_sock *ls;
- struct poolsock *ps;
ALLOC_OBJ(pp, POOL_MAGIC);
if (pp == NULL)
@@ -617,17 +552,6 @@ pool_mkpool(unsigned pool_no)
pp->sesspool = SES_NewPool(pp, pool_no);
AN(pp->sesspool);
- VTAILQ_FOREACH(ls, &heritage.socks, list) {
- assert(ls->sock > 0); // We know where stdin is
- ALLOC_OBJ(ps, POOLSOCK_MAGIC);
- AN(ps);
- ps->lsock = ls;
- ps->task.func = pool_accept;
- ps->task.priv = ps;
- ps->sesspool = pp->sesspool;
- AZ(Pool_Task(pp, &ps->task, POOL_QUEUE_BACK));
- }
-
return (pp);
}
@@ -673,14 +597,6 @@ pool_poolherder(void *priv)
/*--------------------------------------------------------------------*/
void
-Pool_Accept(void)
-{
-
- ASSERT_CLI();
- pool_accepting = 1;
-}
-
-void
Pool_Init(void)
{
diff --git a/bin/varnishd/cache/cache_session.c b/bin/varnishd/cache/cache_session.c
index 56d760a..943e412 100644
--- a/bin/varnishd/cache/cache_session.c
+++ b/bin/varnishd/cache/cache_session.c
@@ -505,6 +505,8 @@ SES_NewPool(struct pool *wp, unsigned pool_no)
pp->mpl_sess = MPL_New(nb, &cache_param->sess_pool,
&cache_param->workspace_session);
pp->http1_waiter = Wait_New(ses_handle, &cache_param->timeout_idle);
+
+ VCA_New_SessPool(wp, pp);
return (pp);
}
More information about the varnish-commit
mailing list