[master] cbc829f Scrape back the waiter API, and rewrite the poll waiter to handle sparse fd-sets efficiently.
Poul-Henning Kamp
phk at FreeBSD.org
Thu May 21 18:48:51 CEST 2015
commit cbc829fef3f9fa65ce1c945ce1d18f507adf8611
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date: Thu May 21 09:22:17 2015 +0000
Scrape back the waiter API, and rewrite the poll waiter to handle
sparse fd-sets efficiently.
diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index cad336e..89d4cd9 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -374,7 +374,6 @@ struct waited {
unsigned magic;
#define WAITED_MAGIC 0x1743992d
int fd;
- VTAILQ_ENTRY(waited) list;
void *ptr;
double idle;
};
diff --git a/bin/varnishd/cache/cache_main.c b/bin/varnishd/cache/cache_main.c
index 7baee82..00db49d 100644
--- a/bin/varnishd/cache/cache_main.c
+++ b/bin/varnishd/cache/cache_main.c
@@ -37,7 +37,6 @@
#include "vcli_priv.h"
#include "vrnd.h"
-#include "waiter/waiter.h"
#include "hash/hash_slinger.h"
@@ -213,8 +212,6 @@ child_main(void)
PAN_Init();
VFP_Init();
- Waiter_Init();
-
VCL_Init();
HTTP_Init();
diff --git a/bin/varnishd/waiter/cache_waiter.c b/bin/varnishd/waiter/cache_waiter.c
index c1a03fa..3373d0c 100644
--- a/bin/varnishd/waiter/cache_waiter.c
+++ b/bin/varnishd/waiter/cache_waiter.c
@@ -36,174 +36,17 @@
#include "cache/cache.h"
-#include "vfil.h"
-#include "vtim.h"
-
#include "waiter/waiter.h"
#include "waiter/waiter_priv.h"
-#define NEV 8192
-
-static VTAILQ_HEAD(, waiter) waiters = VTAILQ_HEAD_INITIALIZER(waiters);
-static int nwaiters;
-static struct lock wait_mtx;
-static pthread_t wait_thr;
-
-static void *
-wait_poker_thread(void *arg)
-{
- struct waiter *w;
- double now;
-
- (void)arg;
- THR_SetName("Waiter timer");
- while (1) {
- /* Avoid thundering herds and resonances */
- (void)usleep(990013/nwaiters);
-
- now = VTIM_real();
-
- Lck_Lock(&wait_mtx);
- w = VTAILQ_FIRST(&waiters);
- VTAILQ_REMOVE(&waiters, w, list);
- VTAILQ_INSERT_TAIL(&waiters, w, list);
- assert(w->pipes[1] >= 0);
-
- if (w->next_idle + *w->tmo < now)
- (void)write(w->pipes[1], &w->pipe_w, sizeof w->pipe_w);
- Lck_Unlock(&wait_mtx);
- }
- NEEDLESS_RETURN(NULL);
-}
-
-void
-Wait_UsePipe(struct waiter *w)
-{
- CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
-
- AN(waiter->inject);
- AZ(pipe(w->pipes));
- AZ(VFIL_nonblocking(w->pipes[0]));
- AZ(VFIL_nonblocking(w->pipes[1]));
- ALLOC_OBJ(w->pipe_w, WAITED_MAGIC);
- w->pipe_w->fd = w->pipes[0];
- w->pipe_w->idle = 0;
- VTAILQ_INSERT_HEAD(&w->waithead, w->pipe_w, list);
- waiter->inject(w, w->pipe_w);
-}
-
int
Wait_Enter(const struct waiter *w, struct waited *wp)
{
- ssize_t written;
- uintptr_t up;
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
- assert(wp->fd > 0); // stdin never comes here
- AZ(w->dismantle);
-
- if (w->impl->pass != NULL)
- return (w->impl->pass(w->priv, wp));
-
- assert(w->pipes[1] > 0);
-
- up = (uintptr_t)wp;
- written = write(w->pipes[1], &up, sizeof up);
- if (written != sizeof up && (errno == EAGAIN || errno == EWOULDBLOCK))
- return (-1);
- assert (written == sizeof up);
- return (0);
-}
-
-static void
-wait_updidle(struct waiter *w, double now)
-{
- struct waited *wp;
-
- wp = VTAILQ_FIRST(&w->waithead);
- if (wp == NULL)
- return;
- if (wp == w->pipe_w) {
- VTAILQ_REMOVE(&w->waithead, wp, list);
- VTAILQ_INSERT_TAIL(&w->waithead, wp, list);
- wp->idle = now;
- wp = VTAILQ_FIRST(&w->waithead);
- }
- w->next_idle = wp->idle;
-}
-
-void
-Wait_Handle(struct waiter *w, struct waited *wp, enum wait_event ev, double now)
-{
- uintptr_t ss[NEV];
- struct waited *wp2;
- int i, j, dotimer = 0;
-
- CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
- CHECK_OBJ_ORNULL(wp, WAITED_MAGIC);
-
- if (wp != NULL) {
- if (wp == w->pipe_w) {
- w->do_pipe = 1;
- VTAILQ_REMOVE(&w->waithead, w->pipe_w, list);
- wp->idle = now;
- VTAILQ_INSERT_TAIL(&w->waithead, w->pipe_w, list);
- } else {
- if (w->impl->evict != NULL)
- w->impl->evict(w, wp);
-
- VTAILQ_REMOVE(&w->waithead, wp, list);
- w->func(wp, ev, now);
- wait_updidle(w, now);
- }
- return;
- }
-
- AZ(wp);
-
- if (!w->do_pipe)
- return;
-
- w->do_pipe = 0;
-
- i = read(w->pipes[0], ss, sizeof ss);
- if (i == -1 && errno == EAGAIN)
- return;
-
- for (j = 0; i >= sizeof ss[0]; j++, i -= sizeof ss[0]) {
- if (ss[j] == 0) {
- AN(w->dismantle);
- continue;
- }
- ss[j] &= ~1;
- CAST_OBJ_NOTNULL(wp2, (void*)ss[j], WAITED_MAGIC);
- if (wp2 == w->pipe_w) {
- dotimer = 1;
- } else {
- assert(wp2->fd >= 0);
- VTAILQ_INSERT_TAIL(&w->waithead, wp2, list);
- w->impl->inject(w, wp2);
- }
- }
- AZ(i);
-
- wait_updidle(w, now);
-
- if (!dotimer)
- return;
-
- VTAILQ_FOREACH_SAFE(wp, &w->waithead, list, wp2) {
- if (wp == w->pipe_w)
- continue;
- if (wp->idle + *w->tmo > now)
- break;
- if (w->impl->evict != NULL)
- w->impl->evict(w, wp);
- VTAILQ_REMOVE(&w->waithead, wp, list);
- w->func(wp, WAITER_TIMEOUT, now);
- }
- wait_updidle(w, now);
+ assert(wp->fd > 0); // stdin never comes here
+ return (w->impl->enter(w->priv, wp));
}
/**********************************************************************/
@@ -215,7 +58,7 @@ Waiter_GetName(void)
if (waiter != NULL)
return (waiter->name);
else
- return ("no_waiter");
+ return ("(No Waiter?)");
}
struct waiter *
@@ -226,6 +69,8 @@ Waiter_New(waiter_handle_f *func, volatile double *tmo)
AN(waiter);
AN(waiter->name);
AN(waiter->init);
+ AN(waiter->enter);
+ AN(waiter->fini);
w = calloc(1, sizeof (struct waiter) + waiter->size);
AN(w);
@@ -234,20 +79,10 @@ Waiter_New(waiter_handle_f *func, volatile double *tmo)
w->impl = waiter;
w->func = func;
w->tmo = tmo;
- w->pipes[0] = w->pipes[1] = -1;
VTAILQ_INIT(&w->waithead);
waiter->init(w);
- AN(w->impl->pass || w->pipes[1] >= 0);
-
- Lck_Lock(&wait_mtx);
- VTAILQ_INSERT_TAIL(&waiters, w, list);
- nwaiters++;
- /* We assume all waiters either use pipes or don't use pipes */
- if (w->pipes[1] >= 0 && nwaiters == 1)
- AZ(pthread_create(&wait_thr, NULL, wait_poker_thread, NULL));
- Lck_Unlock(&wait_mtx);
return (w);
}
@@ -255,47 +90,13 @@ void
Waiter_Destroy(struct waiter **wp)
{
struct waiter *w;
- struct waited *wx = NULL;
- int written;
- double now;
AN(wp);
w = *wp;
*wp = NULL;
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
- Lck_Lock(&wait_mtx);
- VTAILQ_REMOVE(&waiters, w, list);
- w->dismantle = 1;
- Lck_Unlock(&wait_mtx);
-
- if (w->pipes[1] >= 0) {
- while (1) {
- written = write(w->pipes[1], &wx, sizeof wx);
- if (written == sizeof wx)
- break;
- (void)usleep(10000);
- }
- }
AN(w->impl->fini);
w->impl->fini(w);
- now = VTIM_real();
- while (1) {
- wx = VTAILQ_FIRST(&w->waithead);
- if (wx == NULL)
- break;
- VTAILQ_REMOVE(&w->waithead, wx, list);
- if (wx == w->pipe_w)
- FREE_OBJ(wx);
- else
- w->func(wx, WAITER_CLOSE, now);
- }
FREE_OBJ(w);
}
-
-void
-Waiter_Init(void)
-{
-
- Lck_New(&wait_mtx, lck_misc);
-}
diff --git a/bin/varnishd/waiter/cache_waiter_poll.c b/bin/varnishd/waiter/cache_waiter_poll.c
index d3cbe4e..467f73f 100644
--- a/bin/varnishd/waiter/cache_waiter_poll.c
+++ b/bin/varnishd/waiter/cache_waiter_poll.c
@@ -45,80 +45,88 @@ struct vwp {
#define VWP_MAGIC 0x4b2cc735
struct waiter *waiter;
+ int pipes[2];
+
pthread_t thread;
struct pollfd *pollfd;
- unsigned npoll;
- unsigned hpoll;
+ struct waited **idx;
+ size_t npoll;
+ size_t hpoll;
};
-/*--------------------------------------------------------------------*/
+/*--------------------------------------------------------------------
+ * It would make much more sense to not use two large vectors, but
+ * the poll(2) API forces us to use at least one, so ... KISS.
+ */
static void
-vwp_pollspace(struct vwp *vwp, unsigned fd)
+vwp_extend_pollspace(struct vwp *vwp)
{
- struct pollfd *newpollfd = vwp->pollfd;
- unsigned newnpoll;
-
- if (fd < vwp->npoll)
- return;
- newnpoll = vwp->npoll;
- if (newnpoll == 0)
- newnpoll = 1;
- while (fd >= newnpoll)
- newnpoll = newnpoll * 2;
- VSL(SLT_Debug, 0, "Acceptor poll space increased to %u", newnpoll);
- newpollfd = realloc(newpollfd, newnpoll * sizeof *newpollfd);
- XXXAN(newpollfd);
- memset(newpollfd + vwp->npoll, 0,
- (newnpoll - vwp->npoll) * sizeof *newpollfd);
- vwp->pollfd = newpollfd;
- while (vwp->npoll < newnpoll)
+ size_t inc = (1<<16);
+
+ VSL(SLT_Debug, 0, "Acceptor poll space increased by %zu to %zu",
+ inc, vwp->npoll + inc);
+
+ vwp->pollfd = realloc(vwp->pollfd,
+ (vwp->npoll + inc) * sizeof(*vwp->pollfd));
+ AN(vwp->pollfd);
+ memset(vwp->pollfd + vwp->npoll, 0, inc * sizeof(*vwp->pollfd));
+
+ vwp->idx = realloc(vwp->idx, (vwp->npoll + inc) * sizeof(*vwp->idx));
+ AN(vwp->idx);
+ memset(vwp->idx + vwp->npoll, 0, inc * sizeof(*vwp->idx));
+
+ for (; inc > 0; inc--)
vwp->pollfd[vwp->npoll++].fd = -1;
- assert(fd < vwp->npoll);
}
/*--------------------------------------------------------------------*/
-static void __match_proto__(waiter_inject_f)
-vwp_inject(const struct waiter *w, struct waited *wp)
+static void
+vwp_add(struct vwp *vwp, struct waited *w)
{
- struct vwp *vwp;
- int fd;
- CAST_OBJ_NOTNULL(vwp, w->priv, VWP_MAGIC);
- CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
- fd = wp->fd;
-VSL(SLT_Debug, 0, "POLL Inject %d", fd);
- assert(fd >= 0);
- vwp_pollspace(vwp, (unsigned)fd);
- assert(fd < vwp->npoll);
- if (vwp->hpoll < fd)
- vwp->hpoll = fd;
-
- assert(vwp->pollfd[fd].fd == -1);
- AZ(vwp->pollfd[fd].events);
- AZ(vwp->pollfd[fd].revents);
-
- vwp->pollfd[fd].fd = fd;
- vwp->pollfd[fd].events = POLLIN;
+ if (vwp->hpoll == vwp->npoll)
+ vwp_extend_pollspace(vwp);
+ assert(vwp->hpoll < vwp->npoll);
+ assert(vwp->pollfd[vwp->hpoll].fd == -1);
+ AZ(vwp->idx[vwp->hpoll]);
+ vwp->pollfd[vwp->hpoll].fd = w->fd;
+ vwp->pollfd[vwp->hpoll].events = POLLIN;
+ vwp->idx[vwp->hpoll] = w;
+ vwp->hpoll++;
}
-static void __match_proto__(waiter_evict_f)
-vwp_evict(const struct waiter *w, struct waited *wp)
+static void
+vwp_del(struct vwp *vwp, int n)
{
- struct vwp *vwp;
- int fd;
+ vwp->hpoll--;
+ if (n != vwp->hpoll) {
+ vwp->pollfd[n] = vwp->pollfd[vwp->hpoll];
+ vwp->idx[n] = vwp->idx[vwp->hpoll];
+ }
+ memset(&vwp->pollfd[vwp->hpoll], 0, sizeof(*vwp->pollfd));
+ vwp->pollfd[vwp->hpoll].fd = -1;
+ vwp->idx[vwp->hpoll] = NULL;
+}
- CAST_OBJ_NOTNULL(vwp, w->priv, VWP_MAGIC);
- CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
- fd = wp->fd;
-VSL(SLT_Debug, 0, "POLL Evict %d", fd);
- assert(fd >= 0);
- assert(fd < vwp->npoll);
- vwp_pollspace(vwp, (unsigned)fd);
-
- vwp->pollfd[fd].fd = -1;
- vwp->pollfd[fd].events = 0;
+/*--------------------------------------------------------------------*/
+
+/*
+ * Drain the wakeup pipe and register every waited object found on it.
+ *
+ * Each message on the pipe is one pointer-sized value: vwp_enter() writes
+ * a pointer to the struct waited that should be polled, and vwp_fini()
+ * writes a NULL pointer before joining the poll thread.  At most 128
+ * messages are consumed per call.
+ *
+ * Runs on the poll thread (called from vwp_main()); a NULL message
+ * terminates that thread.
+ */
+static void
+vwp_dopipe(struct vwp *vwp)
+{
+	struct waited *w[128];
+	ssize_t ss;
+	size_t i, n;
+
+	ss = read(vwp->pipes[0], w, sizeof w);
+	assert(ss > 0);
+	/* Writers only ever send whole pointers, never fragments. */
+	assert((size_t)ss % sizeof w[0] == 0);
+	n = (size_t)ss / sizeof w[0];
+
+	/* Walk exactly the entries that were read, never past them. */
+	for (i = 0; i < n; i++) {
+		if (w[i] == NULL) {
+			/* vwp_fini() is waiting in pthread_join(): stop. */
+			pthread_exit(NULL);
+		}
+		CHECK_OBJ_NOTNULL(w[i], WAITED_MAGIC);
+		assert(w[i]->fd > 0);		// no stdin
+		vwp_add(vwp, w[i]);
+	}
+}
/*--------------------------------------------------------------------*/
@@ -128,50 +136,66 @@ vwp_main(void *priv)
{
int v, v2;
struct vwp *vwp;
- struct waited *sp, *sp2;
+ struct waited *wp;
double now, idle;
- int fd;
+ int i;
CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
THR_SetName("cache-poll");
- while (!vwp->waiter->dismantle) {
+ while (1) {
+ // Try to keep the high point as low as possible
assert(vwp->hpoll < vwp->npoll);
while (vwp->hpoll > 0 && vwp->pollfd[vwp->hpoll].fd == -1)
vwp->hpoll--;
+
+ // XXX: sleep on ->tmo
v = poll(vwp->pollfd, vwp->hpoll + 1, -1);
assert(v >= 0);
v2 = v;
now = VTIM_real();
idle = now - *vwp->waiter->tmo;
- VTAILQ_FOREACH_SAFE(sp, &vwp->waiter->waithead, list, sp2) {
- if (v != 0 && v2 == 0)
- break;
- CHECK_OBJ_NOTNULL(sp, WAITED_MAGIC);
- fd = sp->fd;
- VSL(SLT_Debug, 0,
- "POLL Handle %d %x", fd, vwp->pollfd[fd].revents);
- assert(fd >= 0);
- assert(fd <= vwp->hpoll);
- assert(fd < vwp->npoll);
- assert(vwp->pollfd[fd].fd == fd);
- if (vwp->pollfd[fd].revents) {
+ i = 1;
+ while (v2 > 0 && i < vwp->hpoll) {
+ wp = vwp->idx[i];
+ CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
+ if (vwp->pollfd[i].revents != 0) {
v2--;
- vwp->pollfd[fd].revents = 0;
- Wait_Handle(vwp->waiter, sp, WAITER_ACTION,
- now);
- } else if (sp->idle <= idle) {
- Wait_Handle(vwp->waiter, sp, WAITER_TIMEOUT,
- now);
+ assert(wp->fd > 0);
+ assert(wp->fd == vwp->pollfd[i].fd);
+ VSL(SLT_Debug, wp->fd, "POLL Handle %d %x",
+ wp->fd, vwp->pollfd[i].revents);
+ vwp_del(vwp, i);
+ vwp->waiter->func(wp, WAITER_ACTION, now);
+ } else if (wp->idle <= idle) {
+ vwp_del(vwp, i);
+ vwp->waiter->func(wp, WAITER_TIMEOUT, now);
+ } else {
+ i++;
}
}
- Wait_Handle(vwp->waiter, NULL, WAITER_ACTION, now);
+ if (vwp->pollfd[0].revents)
+ vwp_dopipe(vwp);
}
NEEDLESS_RETURN(NULL);
}
/*--------------------------------------------------------------------*/
+/*
+ * Queue a waited object for the poll thread by writing its pointer
+ * value to the write end of the wakeup pipe.
+ *
+ * Returns 0 if the complete pointer was written, -1 otherwise.
+ */
+static int __match_proto__(waiter_enter_f)
+vwp_enter(void *priv, struct waited *wp)
+{
+	struct vwp *vwp;
+	ssize_t sent;
+
+	CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
+
+	/* The pointer itself is the message; vwp_dopipe() reads it back. */
+	sent = write(vwp->pipes[1], &wp, sizeof wp);
+	return (sent == (ssize_t)sizeof wp ? 0 : -1);
+}
+
+/*--------------------------------------------------------------------*/
+
static void __match_proto__(waiter_init_f)
vwp_init(struct waiter *w)
{
@@ -181,9 +205,13 @@ vwp_init(struct waiter *w)
vwp = w->priv;
INIT_OBJ(vwp, VWP_MAGIC);
vwp->waiter = w;
+ AZ(pipe(vwp->pipes));
+ // XXX: set write pipe non-blocking
- vwp_pollspace(vwp, 256);
- Wait_UsePipe(w);
+ vwp_extend_pollspace(vwp);
+ vwp->pollfd[0].fd = vwp->pipes[0];
+ vwp->pollfd[0].events = POLLIN;
+ vwp->hpoll = 1;
AZ(pthread_create(&vwp->thread, NULL, vwp_main, vwp));
}
@@ -196,6 +224,9 @@ vwp_fini(struct waiter *w)
void *vp;
CAST_OBJ_NOTNULL(vwp, w->priv, VWP_MAGIC);
+ vp = NULL;
+ // XXX: set write pipe blocking
+ assert(write(vwp->pipes[1], &vp, sizeof vp) == sizeof vp);
AZ(pthread_join(vwp->thread, &vp));
free(vwp->pollfd);
}
@@ -206,7 +237,6 @@ const struct waiter_impl waiter_poll = {
.name = "poll",
.init = vwp_init,
.fini = vwp_fini,
- .inject = vwp_inject,
- .evict = vwp_evict,
+ .enter = vwp_enter,
.size = sizeof(struct vwp),
};
diff --git a/bin/varnishd/waiter/waiter.h b/bin/varnishd/waiter/waiter.h
index db69e26..5b5620b 100644
--- a/bin/varnishd/waiter/waiter.h
+++ b/bin/varnishd/waiter/waiter.h
@@ -61,7 +61,6 @@ int Wait_Enter(const struct waiter *, struct waited *);
struct waiter *Waiter_New(waiter_handle_f *, volatile double *timeout);
void Waiter_Destroy(struct waiter **);
const char *Waiter_GetName(void);
-void Waiter_Init(void);
/* mgt_waiter.c */
int Wait_Argument(struct vsb *vsb, const char *arg);
diff --git a/bin/varnishd/waiter/waiter_priv.h b/bin/varnishd/waiter/waiter_priv.h
index f21e0ae..ed369c4 100644
--- a/bin/varnishd/waiter/waiter_priv.h
+++ b/bin/varnishd/waiter/waiter_priv.h
@@ -33,27 +33,25 @@ struct waited;
struct waiter {
unsigned magic;
- #define WAITER_MAGIC 0x17c399db
+#define WAITER_MAGIC 0x17c399db
const struct waiter_impl *impl;
VTAILQ_ENTRY(waiter) list;
+ VTAILQ_HEAD(,waited) waithead;
+
int dismantle;
waiter_handle_f * func;
- int pipes[2];
- struct waited *pipe_w;
double next_idle;
- int do_pipe;
volatile double *tmo;
- VTAILQ_HEAD(,waited) waithead;
void *priv;
};
typedef void waiter_init_f(struct waiter *);
typedef void waiter_fini_f(struct waiter *);
-typedef int waiter_pass_f(void *priv, struct waited *);
+typedef int waiter_enter_f(void *priv, struct waited *);
typedef void waiter_inject_f(const struct waiter *, struct waited *);
typedef void waiter_evict_f(const struct waiter *, struct waited *);
@@ -61,16 +59,11 @@ struct waiter_impl {
const char *name;
waiter_init_f *init;
waiter_fini_f *fini;
- waiter_pass_f *pass;
+ waiter_enter_f *enter;
waiter_inject_f *inject;
- waiter_evict_f *evict;
size_t size;
};
-/* cache_waiter.c */
-void Wait_Handle(struct waiter *, struct waited *, enum wait_event, double now);
-void Wait_UsePipe(struct waiter *w);
-
/* mgt_waiter.c */
extern struct waiter_impl const * waiter;
More information about the varnish-commit
mailing list