[experimental-ims] 12bf085 Get rid of the single acceptor thread.
Geoff Simmons
geoff at varnish-cache.org
Mon Jan 9 21:51:56 CET 2012
commit 12bf085b48f787b4744be61197136cd6ee983b5d
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date: Sun Sep 18 07:19:44 2011 +0000
Get rid of the single acceptor thread.
Instead of a single thread which accepts all sockets with a poll/accept
sequence, each thread-pool will have a thread accepting on each socket.
If no threads are available, the sockets will not be accepted on.
CAVEATS:
This commit undoubtedly leaves a number of minor issues dangling,
they will be cleaned up as we find them.
Please notice that there are changes to stats counters (some don't
work right now, and more changes will be coming).
Changing certain acceptor-related params on the fly may not work.
Testing would be very welcome.
diff --git a/bin/varnishd/cache.h b/bin/varnishd/cache.h
index 71cf6b3..49ca3a4 100644
--- a/bin/varnishd/cache.h
+++ b/bin/varnishd/cache.h
@@ -305,7 +305,9 @@ struct worker {
/* Accept stuff */
struct sockaddr_storage acceptaddr;
+ socklen_t acceptaddrlen;
int acceptsock;
+ struct listen_sock *acceptlsock;
struct wrw wrw;
@@ -641,7 +643,7 @@ void VCA_Prep(struct sess *sp);
void VCA_Init(void);
void VCA_Shutdown(void);
int VCA_Accept(int sock, socklen_t *slp, struct sockaddr_storage *sap);
-extern pthread_t VCA_thread;
+void VCA_SetupSess(struct worker *w);
/* cache_backend.c */
void VBE_UseHealth(const struct director *vdi);
@@ -849,12 +851,13 @@ void WRW_Sendfile(struct worker *w, int fd, off_t off, unsigned len);
/* cache_session.c [SES] */
-void SES_Init(void);
-struct sess *SES_New(struct sesspool *pp);
+struct sess *SES_New(struct worker *wrk, struct sesspool *pp);
struct sess *SES_Alloc(void);
void SES_Close(struct sess *sp, const char *reason);
void SES_Delete(struct sess *sp, const char *reason);
void SES_Charge(struct sess *sp);
+struct sesspool *SES_NewPool(void);
+
/* cache_shmlog.c */
void VSL_Init(void);
diff --git a/bin/varnishd/cache_acceptor.c b/bin/varnishd/cache_acceptor.c
index 37cc857..b2a8015 100644
--- a/bin/varnishd/cache_acceptor.c
+++ b/bin/varnishd/cache_acceptor.c
@@ -33,6 +33,7 @@
#include <errno.h>
#include <poll.h>
#include <stdlib.h>
+#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
@@ -41,7 +42,7 @@
#include "cli_priv.h"
#include "cache.h"
-pthread_t VCA_thread;
+static pthread_t VCA_thread;
static struct timeval tv_sndtimeo;
static struct timeval tv_rcvtimeo;
@@ -178,6 +179,7 @@ vca_pace_check(void)
static void
vca_pace_bad(void)
{
+
Lck_Lock(&pace_mtx);
vca_pace += params->acceptor_sleep_incr;
if (vca_pace > params->acceptor_sleep_max)
@@ -202,13 +204,19 @@ vca_pace_good(void)
* Accept on a listen socket, and handle error returns.
*/
+static int hack_ready;
+
int
VCA_Accept(int sock, socklen_t *slp, struct sockaddr_storage *sap)
{
int i;
+ assert(sock >= 0);
vca_pace_check();
+ while(!hack_ready)
+ (void)usleep(100*1000);
+
*slp = sizeof *sap;
i = accept(sock, (void*)sap, slp);
@@ -234,45 +242,67 @@ VCA_Accept(int sock, socklen_t *slp, struct sockaddr_storage *sap)
/*--------------------------------------------------------------------*/
+/*
+ * Initialise the session in w->sp from the connection this worker has
+ * just accepted (w->acceptsock, w->acceptaddr/acceptaddrlen, w->acceptlsock).
+ *
+ * On the normal path the session takes over the file descriptor,
+ * w->acceptsock is reset to -1 and the worker's client_conn counter is
+ * bumped.  If w->sp is NULL the socket is closed, the drop is counted
+ * and the acceptor is paced down; that path then hits INCOMPL().
+ */
+void
+VCA_SetupSess(struct worker *w)
+{
+ struct sess *sp;
+
+ sp = w->sp;
+ if (sp == NULL) {
+ /* No session available: drop the connection. */
+ AZ(close(w->acceptsock));
+ w->acceptsock = -1;
+ VSC_C_main->client_drop++;
+ /* XXX: 50x Reply ? */
+ vca_pace_bad();
+ INCOMPL();
+ }
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ sp->fd = w->acceptsock;
+ sp->id = w->acceptsock;
+ w->acceptsock = -1;
+ sp->t_open = TIM_real();
+ /*
+ * Was "sp->t_end = sp->t_end", a no-op self-assignment.  The single
+ * acceptor thread this replaces set t_open and t_end to the same
+ * timestamp, so do the same here.
+ */
+ sp->t_end = sp->t_open;
+ sp->mylsock = w->acceptlsock;
+ /* The accepted address must fit in the session's address buffer. */
+ assert(w->acceptaddrlen <= sp->sockaddrlen);
+ memcpy(sp->sockaddr, &w->acceptaddr, w->acceptaddrlen);
+ sp->sockaddrlen = w->acceptaddrlen;
+ sp->step = STP_FIRST;
+ vca_pace_good();
+ w->sp = sp;
+ w->stats.client_conn++;
+}
+
+/*--------------------------------------------------------------------*/
+
static void *
vca_acct(void *arg)
{
- struct sess *sp;
- socklen_t l;
- struct sockaddr_storage addr_s;
- struct sockaddr *addr;
#ifdef SO_RCVTIMEO_WORKS
double sess_timeout = 0;
#endif
#ifdef SO_SNDTIMEO_WORKS
double send_timeout = 0;
#endif
- int i;
- struct pollfd *pfd;
struct listen_sock *ls;
- unsigned u;
double t0, now;
THR_SetName("cache-acceptor");
(void)arg;
- /* Set up the poll argument */
- pfd = calloc(sizeof *pfd, heritage.nsocks);
- AN(pfd);
- i = 0;
VTAILQ_FOREACH(ls, &heritage.socks, list) {
if (ls->sock < 0)
continue;
AZ(listen(ls->sock, params->listen_depth));
AZ(setsockopt(ls->sock, SOL_SOCKET, SO_LINGER,
&linger, sizeof linger));
- pfd[i].events = POLLIN;
- pfd[i++].fd = ls->sock;
}
+ hack_ready = 1;
+
need_test = 1;
t0 = TIM_real();
while (1) {
+ (void)sleep(1);
#ifdef SO_SNDTIMEO_WORKS
if (params->send_timeout != send_timeout) {
need_test = 1;
@@ -301,45 +331,8 @@ vca_acct(void *arg)
}
}
#endif
- i = poll(pfd, heritage.nsocks, 1000);
now = TIM_real();
VSC_C_main->uptime = (uint64_t)(now - t0);
- u = 0;
- VTAILQ_FOREACH(ls, &heritage.socks, list) {
- if (ls->sock < 0)
- continue;
- if (pfd[u++].revents == 0)
- continue;
- VSC_C_main->client_conn++;
- l = sizeof addr_s;
- addr = (void*)&addr_s;
- i = VCA_Accept(ls->sock, &l, &addr_s);
- if (i < 0)
- continue;
- sp = SES_New(NULL);
- if (sp == NULL) {
- AZ(close(i));
- VSC_C_main->client_drop++;
- vca_pace_bad();
- continue;
- }
- sp->fd = i;
- sp->id = i;
- sp->t_open = now;
- sp->t_end = now;
- sp->mylsock = ls;
- assert(l < sp->sockaddrlen);
- memcpy(sp->sockaddr, addr, l);
- sp->sockaddrlen = l;
-
- sp->step = STP_FIRST;
- if (Pool_QueueSession(sp)) {
- VSC_C_main->client_drop++;
- vca_pace_bad();
- } else {
- vca_pace_good();
- }
- }
}
NEEDLESS_RETURN(NULL);
}
diff --git a/bin/varnishd/cache_center.c b/bin/varnishd/cache_center.c
index b561c84..db51927 100644
--- a/bin/varnishd/cache_center.c
+++ b/bin/varnishd/cache_center.c
@@ -369,7 +369,6 @@ cnt_done(struct sess *sp)
if (sp->fd < 0) {
sp->wrk->stats.sess_closed++;
- sp->wrk = NULL;
SES_Delete(sp, NULL);
return (1);
}
diff --git a/bin/varnishd/cache_hash.c b/bin/varnishd/cache_hash.c
index 16e5cf0..8e0fd4e 100644
--- a/bin/varnishd/cache_hash.c
+++ b/bin/varnishd/cache_hash.c
@@ -509,7 +509,6 @@ hsh_rush(struct objhead *oh)
* We could not schedule the session, leave the
* rest on the busy list.
*/
- VSC_C_main->client_drop_late++;
break;
}
}
diff --git a/bin/varnishd/cache_main.c b/bin/varnishd/cache_main.c
index 3489530..5660893 100644
--- a/bin/varnishd/cache_main.c
+++ b/bin/varnishd/cache_main.c
@@ -113,7 +113,6 @@ child_main(void)
VCL_Init();
HTTP_Init();
- SES_Init();
VBE_Init();
VBP_Init();
diff --git a/bin/varnishd/cache_pool.c b/bin/varnishd/cache_pool.c
index 05ce529..1bfc6b4 100644
--- a/bin/varnishd/cache_pool.c
+++ b/bin/varnishd/cache_pool.c
@@ -66,6 +66,7 @@ struct poolsock {
unsigned magic;
#define POOLSOCK_MAGIC 0x1b0a2d38
VTAILQ_ENTRY(poolsock) list;
+ struct listen_sock *lsock;
int sock;
};
@@ -83,6 +84,7 @@ struct pool {
unsigned last_lqueue;
uintmax_t ndrop;
uintmax_t nqueue;
+ struct sesspool *sesspool;
};
static struct pool **wq;
@@ -95,11 +97,46 @@ static struct lock herder_mtx;
/*--------------------------------------------------------------------*/
+static void
+pool_accept(struct pool *pp, struct worker *w, const struct poolsock *ps)
+{
+ struct worker *w2;
+ /*
+ * Accept connections on ps->sock on behalf of pool pp.  Each accepted
+ * connection is handed to an idle worker if there is one; when the
+ * idle list is empty the function returns and the connection stays
+ * in w (acceptsock, acceptaddr, acceptaddrlen, acceptlsock).
+ */
+ CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
+ CHECK_OBJ_NOTNULL(w, WORKER_MAGIC);
+ CHECK_OBJ_NOTNULL(ps, POOLSOCK_MAGIC);
+ /* Called with pp->mtx held; it is released while accepting. */
+ assert(ps->sock >= 0);
+ Lck_AssertHeld(&pp->mtx);
+ Lck_Unlock(&pp->mtx);
+ while (1) {
+ w->acceptsock =
+ VCA_Accept(ps->sock, &w->acceptaddrlen, &w->acceptaddr);
+ if (w->acceptsock == -1)
+ continue; /* no connection obtained: try again */
+ w->acceptlsock = ps->lsock;
+ Lck_Lock(&pp->mtx);
+ if (VTAILQ_EMPTY(&pp->idle))
+ return; /* nobody to hand off to: return with pp->mtx held */
+ w2 = VTAILQ_FIRST(&pp->idle);
+ VTAILQ_REMOVE(&pp->idle, w2, list);
+ Lck_Unlock(&pp->mtx);
+ w2->acceptaddr = w->acceptaddr; /* copy the accepted connection to w2 */
+ w2->acceptaddrlen = w->acceptaddrlen;
+ w2->acceptsock = w->acceptsock;
+ w2->acceptlsock = w->acceptlsock;
+ AZ(pthread_cond_signal(&w2->cond)); /* wake w2, then accept again */
+ }
+}
+
+/*--------------------------------------------------------------------*/
+
void
Pool_Work_Thread(void *priv, struct worker *w)
{
struct pool *qp;
int stats_clean;
+ struct poolsock *ps;
CAST_OBJ_NOTNULL(qp, priv, POOL_MAGIC);
w->pool = qp;
@@ -107,6 +144,9 @@ Pool_Work_Thread(void *priv, struct worker *w)
qp->nthr++;
stats_clean = 1;
while (1) {
+
+ Lck_AssertHeld(&qp->mtx);
+
CHECK_OBJ_NOTNULL(w->bereq, HTTP_MAGIC);
CHECK_OBJ_NOTNULL(w->beresp, HTTP_MAGIC);
CHECK_OBJ_NOTNULL(w->resp, HTTP_MAGIC);
@@ -117,17 +157,29 @@ Pool_Work_Thread(void *priv, struct worker *w)
if (w->sp != NULL) {
VTAILQ_REMOVE(&qp->queue, w->sp, poollist);
qp->lqueue--;
- } else {
+ } else if (VTAILQ_EMPTY(&qp->socks)) {
if (isnan(w->lastused))
w->lastused = TIM_real();
VTAILQ_INSERT_HEAD(&qp->idle, w, list);
if (!stats_clean)
WRK_SumStat(w);
Lck_CondWait(&w->cond, &qp->mtx);
+ } else {
+ ps = VTAILQ_FIRST(&qp->socks);
+ VTAILQ_REMOVE(&qp->socks, ps, list);
+ pool_accept(qp, w, ps);
+ Lck_AssertHeld(&qp->mtx);
+ VTAILQ_INSERT_TAIL(&qp->socks, ps, list);
}
- if (w->sp == NULL)
+ if (w->sp == NULL && w->acceptsock == -1)
break;
Lck_Unlock(&qp->mtx);
+ if (w->sp == NULL) {
+ w->sp = SES_New(w, qp->sesspool);
+ VCA_SetupSess(w);
+ }
+ AN(w->sp);
+ assert(w->acceptsock == -1);
stats_clean = 0;
w->lastused = NAN;
WS_Reset(w->ws, NULL);
@@ -221,7 +273,9 @@ Pool_QueueSession(struct sess *sp)
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
AZ(sp->wrk);
if (WRK_Queue(sp) == 0)
- return (0);
+ return(0);
+
+ VSC_C_main->client_drop_late++;
/*
* Couldn't queue it -- kill it.
@@ -258,11 +312,16 @@ pool_mkpool(void)
VTAILQ_INIT(&pp->queue);
VTAILQ_INIT(&pp->idle);
VTAILQ_INIT(&pp->socks);
+ pp->sesspool = SES_NewPool();
+ AN(pp->sesspool);
VTAILQ_FOREACH(ls, &heritage.socks, list) {
+ if (ls->sock < 0)
+ continue;
ALLOC_OBJ(ps, POOLSOCK_MAGIC);
XXXAN(ps);
ps->sock = ls->sock;
+ ps->lsock = ls;
VTAILQ_INSERT_TAIL(&pp->socks, ps, list);
}
return (pp);
diff --git a/bin/varnishd/cache_session.c b/bin/varnishd/cache_session.c
index a930c07..4c01f34 100644
--- a/bin/varnishd/cache_session.c
+++ b/bin/varnishd/cache_session.c
@@ -28,6 +28,9 @@
*
* Session management
*
+ * This is a little bit of a mixed bag, containing both memory management
+ * and various state-change functions.
+ *
*/
#include "config.h"
@@ -39,7 +42,6 @@
#include <sys/socket.h>
#include "cache.h"
-#include "cache_backend.h"
#include "cache_waiter.h"
/*--------------------------------------------------------------------*/
@@ -63,16 +65,12 @@ struct sesspool {
VTAILQ_HEAD(,sessmem) freelist;
struct lock mtx;
unsigned nsess;
- unsigned maxsess;
+ unsigned dly_free_cnt;
};
-static struct sesspool *sesspool;
-
-/*--------------------------------------------------------------------*/
-
-static struct lock stat_mtx;
-
-/*--------------------------------------------------------------------*/
+/*--------------------------------------------------------------------
+ * Charge statistics from worker to request and session.
+ */
void
SES_Charge(struct sess *sp)
@@ -112,16 +110,12 @@ ses_sm_alloc(void)
hl = HTTP_estimate(nhttp);
l = sizeof *sm + nws + 2 * hl;
+ VSC_C_main->g_sessmem_size = l;
p = malloc(l);
if (p == NULL)
return (NULL);
q = p + l;
- /* XXX Stats */
- Lck_Lock(&stat_mtx);
- VSC_C_main->n_sess_mem++;
- Lck_Unlock(&stat_mtx);
-
/* Don't waste time zeroing the workspace */
memset(p, 0, l - nws);
@@ -179,38 +173,42 @@ ses_setup(struct sessmem *sm)
*/
struct sess *
-SES_New(struct sesspool *pp)
+SES_New(struct worker *wrk, struct sesspool *pp) /* wrk receives the sessmem stats counts */
{
struct sessmem *sm;
struct sess *sp;
- int do_alloc = 0;
+ int do_alloc;
- if (pp == NULL)
- pp = sesspool;
CHECK_OBJ_NOTNULL(pp, SESSPOOL_MAGIC);
+ do_alloc = 0;
Lck_Lock(&pp->mtx);
sm = VTAILQ_FIRST(&pp->freelist);
if (sm != NULL) {
VTAILQ_REMOVE(&pp->freelist, sm, list);
- } else if (pp->nsess < pp->maxsess) {
+ } else if (pp->nsess < params->max_sess) { /* limit is read from params on every call */
pp->nsess++;
do_alloc = 1;
}
+ wrk->stats.c_sessmem_free += pp->dly_free_cnt; /* frees SES_Delete() counted while it had no worker */
+ pp->dly_free_cnt = 0;
Lck_Unlock(&pp->mtx);
if (do_alloc) {
sm = ses_sm_alloc();
if (sm != NULL) {
+ wrk->stats.c_sessmem_alloc++;
sm->pool = pp;
ses_setup(sm);
+ } else {
+ wrk->stats.c_sessmem_fail++; /* ses_sm_alloc() returned NULL */
}
+ } else if (sm == NULL) {
+ wrk->stats.c_sessmem_limit++; /* freelist empty and nsess not below params->max_sess */
}
if (sm == NULL)
return (NULL);
sp = &sm->sess;
CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
- /* XXX Stats */
- VSC_C_main->n_sess++; /* XXX: locking ? */
return (sp);
}
@@ -251,8 +249,7 @@ SES_Handle(struct sess *sp, int status)
break;
case 1:
sp->step = STP_START;
- if (Pool_QueueSession(sp))
- VSC_C_main->client_drop_late++;
+ (void)Pool_QueueSession(sp);
break;
default:
WRONG("Unexpected return from HTC_Rx()");
@@ -276,16 +273,21 @@ SES_Close(struct sess *sp, const char *reason)
}
/*--------------------------------------------------------------------
- * (Close &) Recycle a session. If the workspace has changed, deleted it,
- * otherwise wash it, and put it up for adoption.
+ * (Close &) Free or Recycle a session.
+ *
+ * If the workspace has changed, delete it; otherwise wash it and put
+ * it up for adoption.
+ *
+ * XXX: We should also check nhttp
*/
void
SES_Delete(struct sess *sp, const char *reason)
{
- struct acct *b = &sp->acct_ses;
+ struct acct *b;
struct sessmem *sm;
static char noaddr[] = "-";
+ struct worker *wrk;
struct sesspool *pp;
@@ -294,6 +296,9 @@ SES_Delete(struct sess *sp, const char *reason)
CHECK_OBJ_NOTNULL(sm, SESSMEM_MAGIC);
pp = sm->pool;
CHECK_OBJ_NOTNULL(pp, SESSPOOL_MAGIC);
+ wrk = sp->wrk;
+ CHECK_OBJ_ORNULL(wrk, WORKER_MAGIC);
+
if (reason != NULL)
SES_Close(sp, reason);
@@ -301,54 +306,55 @@ SES_Delete(struct sess *sp, const char *reason)
AZ(sp->obj);
AZ(sp->vcl);
- VSC_C_main->n_sess--; /* XXX: locking ? */
- assert(!isnan(b->first));
- assert(!isnan(sp->t_end));
if (sp->addr == NULL)
sp->addr = noaddr;
if (sp->port == NULL)
sp->port = noaddr;
+
+ b = &sp->acct_ses;
+ assert(!isnan(b->first));
+ assert(!isnan(sp->t_end));
+
VSL(SLT_StatSess, sp->id, "%s %s %.0f %ju %ju %ju %ju %ju %ju %ju",
sp->addr, sp->port, sp->t_end - b->first,
b->sess, b->req, b->pipe, b->pass,
b->fetch, b->hdrbytes, b->bodybytes);
- if (sm->workspace != params->sess_workspace) {
- Lck_Lock(&stat_mtx);
- VSC_C_main->n_sess_mem--;
- Lck_Unlock(&stat_mtx);
+ if (sm->workspace != params->sess_workspace ||
+ pp->nsess > params->max_sess) {
free(sm);
Lck_Lock(&pp->mtx);
+ if (wrk != NULL)
+ wrk->stats.c_sessmem_free++;
+ else
+ pp->dly_free_cnt++;
pp->nsess--;
Lck_Unlock(&pp->mtx);
} else {
/* Clean and prepare for reuse */
ses_setup(sm);
Lck_Lock(&pp->mtx);
+ if (wrk != NULL) {
+ wrk->stats.c_sessmem_free += pp->dly_free_cnt;
+ pp->dly_free_cnt = 0;
+ }
VTAILQ_INSERT_HEAD(&pp->freelist, sm, list);
Lck_Unlock(&pp->mtx);
}
}
-/*--------------------------------------------------------------------*/
+/*--------------------------------------------------------------------
+ * Create a new pool to allocate from
+ *
+ * Allocates a sesspool object, asserts that the allocation succeeded,
+ * and initialises its freelist and its mutex before returning it.
+ * The pool no longer carries its own session limit (the maxsess
+ * argument and field are removed).
+ */
-static struct sesspool *
-SES_NewPool(unsigned maxsess)
+struct sesspool *
+SES_NewPool(void)
{
struct sesspool *sp;
ALLOC_OBJ(sp, SESSPOOL_MAGIC);
+ AN(sp);
VTAILQ_INIT(&sp->freelist);
Lck_New(&sp->mtx, lck_sessmem);
- sp->maxsess = maxsess;
return (sp);
}
-
-
-void
-SES_Init()
-{
-
- sesspool = SES_NewPool(params->max_sess);
- Lck_New(&stat_mtx, lck_stat);
-}
diff --git a/bin/varnishd/cache_wrk.c b/bin/varnishd/cache_wrk.c
index 7b731ba..631342f 100644
--- a/bin/varnishd/cache_wrk.c
+++ b/bin/varnishd/cache_wrk.c
@@ -165,6 +165,7 @@ wrk_thread_real(void *priv, unsigned shm_workspace, unsigned sess_workspace,
w->bereq = HTTP_create(http0, nhttp);
w->beresp = HTTP_create(http1, nhttp);
w->resp = HTTP_create(http2, nhttp);
+ w->acceptsock = -1;
w->wrw.iov = iov;
w->wrw.siov = siov;
w->wrw.ciov = siov;
diff --git a/bin/varnishd/locks.h b/bin/varnishd/locks.h
index 6cbf91f..f1b634b 100644
--- a/bin/varnishd/locks.h
+++ b/bin/varnishd/locks.h
@@ -36,7 +36,6 @@ LOCK(hsl)
LOCK(hcb)
LOCK(hcl)
LOCK(vcl)
-LOCK(stat)
LOCK(sessmem)
LOCK(wstat)
LOCK(herder)
diff --git a/bin/varnishd/mgt_child.c b/bin/varnishd/mgt_child.c
index 9d6966a..4181a43 100644
--- a/bin/varnishd/mgt_child.c
+++ b/bin/varnishd/mgt_child.c
@@ -241,7 +241,6 @@ open_sockets(void)
* closes before we call accept(2) and nobody else are in
* the listen queue to release us.
*/
- (void)VTCP_nonblocking(ls->sock);
(void)VTCP_filter_http(ls->sock);
good++;
}
diff --git a/include/vsc_fields.h b/include/vsc_fields.h
index c35acec..e337321 100644
--- a/include/vsc_fields.h
+++ b/include/vsc_fields.h
@@ -41,7 +41,7 @@
#ifdef VSC_DO_MAIN
-VSC_F(client_conn, uint64_t, 0, 'a', "Client connections accepted")
+VSC_F(client_conn, uint64_t, 1, 'a', "Client connections accepted")
VSC_F(client_drop, uint64_t, 0, 'a',
"Connection dropped, no sess/wrk")
VSC_F(client_req, uint64_t, 1, 'a', "Client requests received")
@@ -72,6 +72,12 @@ VSC_F(fetch_1xx, uint64_t, 1, 'a', "Fetch no body (1xx)")
VSC_F(fetch_204, uint64_t, 1, 'a', "Fetch no body (204)")
VSC_F(fetch_304, uint64_t, 1, 'a', "Fetch no body (304)")
+/* Sessmem cache_session.c */
+VSC_F(g_sessmem_size, uint64_t, 1, 'i', "Session mem size")
+VSC_F(c_sessmem_alloc, uint64_t, 1, 'a', "Session mem allocated")
+VSC_F(c_sessmem_free, uint64_t, 1, 'a', "Session mem freed")
+VSC_F(c_sessmem_fail, uint64_t, 1, 'a', "Session mem alloc failed")
+VSC_F(c_sessmem_limit, uint64_t, 1, 'a', "Session mem alloc limited")
VSC_F(n_sess_mem, uint64_t, 0, 'i', "N struct sess_mem")
VSC_F(n_sess, uint64_t, 0, 'i', "N struct sess")
More information about the varnish-commit
mailing list