[master] b9d8796 Centralise the timer/poker function of pipe-based waiters, so that we can avoid thundering-herd/resonance phenomena when we have multiple waiters.
Poul-Henning Kamp
phk at FreeBSD.org
Thu Jan 15 12:47:28 CET 2015
commit b9d87969b5955d21713d437cec22af547b7663b1
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date: Thu Jan 15 11:46:18 2015 +0000
Centralise the timer/poker function of pipe-based waiters, so that
we can avoid thundering-herd/resonance phenomena when we have multiple
waiters.
diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index be6bc54..fed05d1 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -397,7 +397,7 @@ struct waited {
VTAILQ_ENTRY(waited) list;
int fd;
void *ptr;
- double deadline;
+ double idle;
#if defined(HAVE_EPOLL_CTL)
struct epoll_event ev;
#endif
diff --git a/bin/varnishd/cache/cache_main.c b/bin/varnishd/cache/cache_main.c
index cfce5cc..bc55e99 100644
--- a/bin/varnishd/cache/cache_main.c
+++ b/bin/varnishd/cache/cache_main.c
@@ -39,6 +39,7 @@
#include "vcli_priv.h"
#include "vrnd.h"
+#include "waiter/waiter.h"
#include "hash/hash_slinger.h"
@@ -214,6 +215,8 @@ child_main(void)
CLI_Init();
VFP_Init();
+ Wait_Init();
+
VCL_Init();
HTTP_Init();
diff --git a/bin/varnishd/cache/cache_session.c b/bin/varnishd/cache/cache_session.c
index 814005a..822c0a2 100644
--- a/bin/varnishd/cache/cache_session.c
+++ b/bin/varnishd/cache/cache_session.c
@@ -295,7 +295,7 @@ SES_Wait(struct sess *sp)
sp->waited.magic = WAITED_MAGIC;
sp->waited.fd = sp->fd;
sp->waited.ptr = sp;
- sp->waited.deadline = sp->t_idle;
+ sp->waited.idle = sp->t_idle;
if (Wait_Enter(pp->http1_waiter, &sp->waited)) {
VSC_C_main->sess_pipe_overflow++;
SES_Delete(sp, SC_SESS_PIPE_OVERFLOW, NAN);
diff --git a/bin/varnishd/waiter/cache_waiter.c b/bin/varnishd/waiter/cache_waiter.c
index 4b8fd6e..6d88566 100644
--- a/bin/varnishd/waiter/cache_waiter.c
+++ b/bin/varnishd/waiter/cache_waiter.c
@@ -38,12 +38,53 @@
#include "cache/cache.h"
#include "vfil.h"
+#include "vtim.h"
#include "waiter/waiter.h"
#include "waiter/waiter_priv.h"
#define NEV 8192
+static VTAILQ_HEAD(, waiter) waiters = VTAILQ_HEAD_INITIALIZER(waiters);
+static int nwaiters;
+static struct lock wait_mtx;
+static pthread_t wait_thr;
+
+static void *
+wait_poker_thread(void *arg)
+{
+ struct waiter *w;
+ struct waited *wp;
+ double now;
+
+ (void)arg;
+ THR_SetName("Waiter timer");
+ while (1) {
+ /* Avoid thundering herds and resonances */
+ (void)usleep(990013/nwaiters);
+
+ now = VTIM_real();
+
+ Lck_Lock(&wait_mtx);
+ w = VTAILQ_FIRST(&waiters);
+ VTAILQ_REMOVE(&waiters, w, list);
+ VTAILQ_INSERT_TAIL(&waiters, w, list);
+ assert(w->pipes[1] >= 0);
+
+ wp = VTAILQ_FIRST(&w->waithead);
+ CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
+ if (wp == w->pipe_w) {
+ VTAILQ_REMOVE(&w->waithead, wp, list);
+ VTAILQ_INSERT_TAIL(&w->waithead, wp, list);
+ wp = VTAILQ_FIRST(&w->waithead);
+ }
+ if (wp->idle + *w->tmo < now)
+ (void)write(w->pipes[1], &w->pipe_w, sizeof w->pipe_w);
+ Lck_Unlock(&wait_mtx);
+ }
+ NEEDLESS_RETURN(NULL);
+}
+
const char *
Wait_GetName(void)
{
@@ -70,10 +111,21 @@ Wait_New(waiter_handle_f *func, volatile double *tmo)
w->impl = waiter;
w->func = func;
w->tmo = tmo;
+ w->pipes[0] = w->pipes[1] = -1;
VTAILQ_INIT(&w->waithead);
waiter->init(w);
- AN(w->impl->pass || w->pfd > 0);
+ AN(w->impl->pass || w->pipes[1] >= 0);
+
+ if (w->pipes[1] >= 0 && VTAILQ_EMPTY(&waiters)) {
+ /* Start timer poker thread */
+ AZ(pthread_create(&wait_thr, NULL, wait_poker_thread, NULL));
+ }
+
+ Lck_Lock(&wait_mtx);
+ VTAILQ_INSERT_TAIL(&waiters, w, list);
+ nwaiters++;
+ Lck_Unlock(&wait_mtx);
return (w);
}
@@ -86,10 +138,9 @@ Wait_UsePipe(struct waiter *w)
AZ(pipe(w->pipes));
AZ(VFIL_nonblocking(w->pipes[0]));
AZ(VFIL_nonblocking(w->pipes[1]));
- w->pfd = w->pipes[1];
ALLOC_OBJ(w->pipe_w, WAITED_MAGIC);
w->pipe_w->fd = w->pipes[0];
- w->pipe_w->deadline = 9e99;
+ w->pipe_w->idle = 9e99;
VTAILQ_INSERT_HEAD(&w->waithead, w->pipe_w, list);
waiter->inject(w, w->pipe_w);
}
@@ -106,9 +157,9 @@ Wait_Enter(const struct waiter *w, struct waited *wp)
if (w->impl->pass != NULL)
return (w->impl->pass(w->priv, wp));
- assert(w->pfd >= 0);
+ assert(w->pipes[1] > 0);
- written = write(w->pfd, &wp, sizeof wp);
+ written = write(w->pipes[1], &wp, sizeof wp);
if (written != sizeof wp && (errno == EAGAIN || errno == EWOULDBLOCK))
return (-1);
assert (written == sizeof wp);
@@ -118,28 +169,55 @@ Wait_Enter(const struct waiter *w, struct waited *wp)
void
Wait_Handle(struct waiter *w, struct waited *wp, enum wait_event ev, double now)
{
- struct waited *ss[NEV];
- int i, j;
+ struct waited *ss[NEV], *wp2;
+ int i, j, dotimer = 0;
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
- if (wp == w->pipe_w) {
- i = read(w->pipes[0], ss, sizeof ss);
- if (i == -1 && errno == EAGAIN)
- return;
- for (j = 0; i >= sizeof ss[0]; j++, i -= sizeof ss[0]) {
- CHECK_OBJ_NOTNULL(ss[j], WAITED_MAGIC);
- assert(ss[j]->fd >= 0);
- VTAILQ_INSERT_TAIL(&w->waithead, ss[j], list);
- w->impl->inject(w, ss[j]);
+ if (wp != w->pipe_w) {
+ if (w->impl->evict != NULL)
+ w->impl->evict(w, wp);
+
+ VTAILQ_REMOVE(&w->waithead, wp, list);
+ w->func(wp, ev, now);
+ return;
+ }
+
+ i = read(w->pipes[0], ss, sizeof ss);
+ if (i == -1 && errno == EAGAIN)
+ return;
+
+ for (j = 0; i >= sizeof ss[0]; j++, i -= sizeof ss[0]) {
+ CHECK_OBJ_NOTNULL(ss[j], WAITED_MAGIC);
+ if (ss[j] == w->pipe_w) {
+ dotimer = 1;
+ continue;
}
- AZ(i);
+ assert(ss[j]->fd >= 0);
+ VTAILQ_INSERT_TAIL(&w->waithead, ss[j], list);
+ w->impl->inject(w, ss[j]);
+ }
+ AZ(i);
+
+ if (!dotimer)
return;
+
+ VTAILQ_FOREACH_SAFE(wp, &w->waithead, list, wp2) {
+ if (wp == w->pipe_w)
+ continue;
+ if (wp->idle + *w->tmo > now)
+ break;
+ if (w->impl->evict != NULL)
+ w->impl->evict(w, wp);
+ VTAILQ_REMOVE(&w->waithead, wp, list);
+ w->func(wp, WAITER_TIMEOUT, now);
}
- if (w->impl->evict != NULL)
- w->impl->evict(w, wp);
+}
+
+void
+Wait_Init(void)
+{
- VTAILQ_REMOVE(&w->waithead, wp, list);
- w->func(wp, ev, now);
+ Lck_New(&wait_mtx, lck_misc);
}
diff --git a/bin/varnishd/waiter/cache_waiter_epoll.c b/bin/varnishd/waiter/cache_waiter_epoll.c
index a281f6b..8f63c78 100644
--- a/bin/varnishd/waiter/cache_waiter_epoll.c
+++ b/bin/varnishd/waiter/cache_waiter_epoll.c
@@ -110,7 +110,7 @@ vwe_thread(void *priv)
struct epoll_event ev[NEEV], *ep;
struct waited *sp, *sp2;
char junk;
- double now, deadline;
+ double now, idle;
int dotimer, i, n;
struct vwe *vwe;
@@ -135,9 +135,9 @@ vwe_thread(void *priv)
continue;
/* check for timeouts */
- deadline = now - *vwe->waiter->tmo;
+ idle = now - *vwe->waiter->tmo;
VTAILQ_FOREACH_SAFE(sp, &vwe->waiter->waithead, list, sp2) {
- if (sp->deadline < deadline)
+ if (sp->idle < idle)
Wait_Handle(vwe->waiter, sp,
WAITER_TIMEOUT, now);
}
diff --git a/bin/varnishd/waiter/cache_waiter_kqueue.c b/bin/varnishd/waiter/cache_waiter_kqueue.c
index 422dabe..8609048 100644
--- a/bin/varnishd/waiter/cache_waiter_kqueue.c
+++ b/bin/varnishd/waiter/cache_waiter_kqueue.c
@@ -127,9 +127,8 @@ vwk_thread(void *priv)
{
struct vwk *vwk;
struct kevent ke[NKEV], *kp;
- int j, n, dotimer;
- double now, deadline;
- struct waited *sp;
+ int j, n;
+ double now;
CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
THR_SetName("cache-kqueue");
@@ -141,42 +140,17 @@ vwk_thread(void *priv)
vwk->nki = 0;
while (1) {
- dotimer = 0;
n = kevent(vwk->kq, vwk->ki, vwk->nki, ke, NKEV, NULL);
- now = VTIM_real();
assert(n <= NKEV);
if (n == 0) {
/* This happens on OSX in m00011.vtc */
- dotimer = 1;
(void)usleep(10000);
}
vwk->nki = 0;
+ now = VTIM_real();
for (kp = ke, j = 0; j < n; j++, kp++) {
- if (kp->filter == EVFILT_TIMER) {
- dotimer = 1;
- } else {
- assert(kp->filter == EVFILT_READ);
- vwk_sess_ev(vwk, kp, now);
- }
- }
- if (!dotimer)
- continue;
- /*
- * Make sure we have no pending changes for the fd's
- * we are about to close, in case the accept(2) in the
- * other thread creates new fd's betwen our close and
- * the kevent(2) at the top of this loop, the kernel
- * would not know we meant "the old fd of this number".
- */
- vwk_kq_flush(vwk);
- deadline = now - *vwk->waiter->tmo;
- for (;;) {
- sp = VTAILQ_FIRST(&vwk->waiter->waithead);
- if (sp == NULL)
- break;
- if (sp->deadline > deadline)
- break;
- Wait_Handle(vwk->waiter, sp, WAITER_TIMEOUT, now);
+ assert(kp->filter == EVFILT_READ);
+ vwk_sess_ev(vwk, kp, now);
}
}
NEEDLESS_RETURN(NULL);
@@ -194,9 +168,6 @@ vwk_init(struct waiter *w)
INIT_OBJ(vwk, VWK_MAGIC);
vwk->waiter = w;
- EV_SET(&vwk->ki[vwk->nki], 0, EVFILT_TIMER, EV_ADD, 0, 100, NULL);
- vwk->nki++;
-
Wait_UsePipe(w);
AZ(pthread_create(&vwk->thread, NULL, vwk_thread, vwk));
diff --git a/bin/varnishd/waiter/cache_waiter_poll.c b/bin/varnishd/waiter/cache_waiter_poll.c
index dfaac80..48c3cd7 100644
--- a/bin/varnishd/waiter/cache_waiter_poll.c
+++ b/bin/varnishd/waiter/cache_waiter_poll.c
@@ -131,7 +131,7 @@ vwp_main(void *priv)
int v, v2;
struct vwp *vwp;
struct waited *sp, *sp2;
- double now, deadline;
+ double now, idle;
int fd;
CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
@@ -141,11 +141,11 @@ vwp_main(void *priv)
assert(vwp->hpoll < vwp->npoll);
while (vwp->hpoll > 0 && vwp->pollfd[vwp->hpoll].fd == -1)
vwp->hpoll--;
- v = poll(vwp->pollfd, vwp->hpoll + 1, 100);
+ v = poll(vwp->pollfd, vwp->hpoll + 1, -1);
assert(v >= 0);
v2 = v;
now = VTIM_real();
- deadline = now - *vwp->waiter->tmo;
+ idle = now - *vwp->waiter->tmo;
VTAILQ_FOREACH_SAFE(sp, &vwp->waiter->waithead, list, sp2) {
if (v != 0 && v2 == 0)
break;
@@ -160,7 +160,7 @@ vwp_main(void *priv)
vwp->pollfd[fd].revents = 0;
Wait_Handle(vwp->waiter, sp, WAITER_ACTION,
now);
- } else if (sp->deadline <= deadline) {
+ } else if (sp->idle <= idle) {
Wait_Handle(vwp->waiter, sp, WAITER_TIMEOUT,
now);
}
diff --git a/bin/varnishd/waiter/cache_waiter_ports.c b/bin/varnishd/waiter/cache_waiter_ports.c
index 76dad22..158a624 100644
--- a/bin/varnishd/waiter/cache_waiter_ports.c
+++ b/bin/varnishd/waiter/cache_waiter_ports.c
@@ -154,7 +154,7 @@ vws_thread(void *priv)
port_event_t ev[MAX_EVENTS];
u_int nevents;
int ei, ret;
- double now, deadline;
+ double now, idle;
/*
* XXX Do we want to scale this up dynamically to increase
@@ -189,7 +189,7 @@ vws_thread(void *priv)
vws_port_ev(vws, ev + ei, now);
/* check for timeouts */
- deadline = now - *vws->waiter->tmo;
+ idle = now - *vws->waiter->tmo;
/*
* This loop assumes that the oldest sessions are always at the
@@ -202,7 +202,7 @@ vws_thread(void *priv)
sp = VTAILQ_FIRST(&vws->waiter->waithead);
if (sp == NULL)
break;
- if (sp->deadline > deadline) {
+ if (sp->idle > idle) {
break;
}
vws_del(vws, sp->fd);
@@ -214,7 +214,7 @@ vws_thread(void *priv)
*/
if (sp) {
- double tmo = (sp->deadline + *vws->waiter->tmo) - now;
+ double tmo = (sp->idle + *vws->waiter->tmo) - now;
if (tmo < min_t) {
timeout = &min_ts;
diff --git a/bin/varnishd/waiter/waiter.h b/bin/varnishd/waiter/waiter.h
index 4c41001..ee4b6bc 100644
--- a/bin/varnishd/waiter/waiter.h
+++ b/bin/varnishd/waiter/waiter.h
@@ -59,6 +59,7 @@ typedef void waiter_handle_f(struct waited *, enum wait_event, double now);
int Wait_Enter(const struct waiter *, struct waited *);
struct waiter *Wait_New(waiter_handle_f *, volatile double *timeout);
const char *Wait_GetName(void);
+void Wait_Init(void);
/* mgt_waiter.c */
int Wait_Argument(struct vsb *vsb, const char *arg);
diff --git a/bin/varnishd/waiter/waiter_priv.h b/bin/varnishd/waiter/waiter_priv.h
index 336328d..ba0d0e5 100644
--- a/bin/varnishd/waiter/waiter_priv.h
+++ b/bin/varnishd/waiter/waiter_priv.h
@@ -35,6 +35,8 @@ struct waiter {
unsigned magic;
#define WAITER_MAGIC 0x17c399db
const struct waiter_impl *impl;
+ VTAILQ_ENTRY(waiter) list;
+
waiter_handle_f * func;
int pipes[2];
@@ -44,7 +46,6 @@ struct waiter {
VTAILQ_HEAD(,waited) waithead;
void *priv;
- int pfd;
};
typedef void waiter_init_f(struct waiter *);
diff --git a/include/tbl/locks.h b/include/tbl/locks.h
index 1a01782..e4b4914 100644
--- a/include/tbl/locks.h
+++ b/include/tbl/locks.h
@@ -54,4 +54,5 @@ LOCK(busyobj)
LOCK(mempool)
LOCK(vxid)
LOCK(pipestat)
+LOCK(misc)
/*lint -restore */
More information about the varnish-commit mailing list