[master] cbc829f Scrape back the waiter API, and rewrite the poll waiter to handle sparse fd-sets efficiently.

Poul-Henning Kamp phk at FreeBSD.org
Thu May 21 18:48:51 CEST 2015


commit cbc829fef3f9fa65ce1c945ce1d18f507adf8611
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date:   Thu May 21 09:22:17 2015 +0000

    Scrape back the waiter API, and rewrite the poll waiter to handle
    sparse fd-sets efficiently.

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index cad336e..89d4cd9 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -374,7 +374,6 @@ struct waited {
 	unsigned		magic;
 #define WAITED_MAGIC		0x1743992d
 	int			fd;
-	VTAILQ_ENTRY(waited)	list;
 	void			*ptr;
 	double			idle;
 };
diff --git a/bin/varnishd/cache/cache_main.c b/bin/varnishd/cache/cache_main.c
index 7baee82..00db49d 100644
--- a/bin/varnishd/cache/cache_main.c
+++ b/bin/varnishd/cache/cache_main.c
@@ -37,7 +37,6 @@
 
 #include "vcli_priv.h"
 #include "vrnd.h"
-#include "waiter/waiter.h"
 
 #include "hash/hash_slinger.h"
 
@@ -213,8 +212,6 @@ child_main(void)
 	PAN_Init();
 	VFP_Init();
 
-	Waiter_Init();
-
 	VCL_Init();
 
 	HTTP_Init();
diff --git a/bin/varnishd/waiter/cache_waiter.c b/bin/varnishd/waiter/cache_waiter.c
index c1a03fa..3373d0c 100644
--- a/bin/varnishd/waiter/cache_waiter.c
+++ b/bin/varnishd/waiter/cache_waiter.c
@@ -36,174 +36,17 @@
 
 #include "cache/cache.h"
 
-#include "vfil.h"
-#include "vtim.h"
-
 #include "waiter/waiter.h"
 #include "waiter/waiter_priv.h"
 
-#define NEV 8192
-
-static VTAILQ_HEAD(, waiter)	waiters = VTAILQ_HEAD_INITIALIZER(waiters);
-static int			nwaiters;
-static struct lock		wait_mtx;
-static pthread_t		wait_thr;
-
-static void *
-wait_poker_thread(void *arg)
-{
-	struct waiter *w;
-	double now;
-
-	(void)arg;
-	THR_SetName("Waiter timer");
-	while (1) {
-		/* Avoid thundering herds and resonances */
-		(void)usleep(990013/nwaiters);
-
-		now = VTIM_real();
-
-		Lck_Lock(&wait_mtx);
-		w = VTAILQ_FIRST(&waiters);
-		VTAILQ_REMOVE(&waiters, w, list);
-		VTAILQ_INSERT_TAIL(&waiters, w, list);
-		assert(w->pipes[1] >= 0);
-
-		if (w->next_idle + *w->tmo < now)
-			(void)write(w->pipes[1], &w->pipe_w, sizeof w->pipe_w);
-		Lck_Unlock(&wait_mtx);
-	}
-	NEEDLESS_RETURN(NULL);
-}
-
-void
-Wait_UsePipe(struct waiter *w)
-{
-	CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
-
-	AN(waiter->inject);
-	AZ(pipe(w->pipes));
-	AZ(VFIL_nonblocking(w->pipes[0]));
-	AZ(VFIL_nonblocking(w->pipes[1]));
-	ALLOC_OBJ(w->pipe_w, WAITED_MAGIC);
-	w->pipe_w->fd = w->pipes[0];
-	w->pipe_w->idle = 0;
-	VTAILQ_INSERT_HEAD(&w->waithead, w->pipe_w, list);
-	waiter->inject(w, w->pipe_w);
-}
-
 int
 Wait_Enter(const struct waiter *w, struct waited *wp)
 {
-	ssize_t written;
-	uintptr_t up;
 
 	CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
 	CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
-	assert(wp->fd > 0);		// stdin never comes here
-	AZ(w->dismantle);
-
-	if (w->impl->pass != NULL)
-		return (w->impl->pass(w->priv, wp));
-
-	assert(w->pipes[1] > 0);
-
-	up = (uintptr_t)wp;
-	written = write(w->pipes[1], &up, sizeof up);
-	if (written != sizeof up && (errno == EAGAIN || errno == EWOULDBLOCK))
-		return (-1);
-	assert (written == sizeof up);
-	return (0);
-}
-
-static void
-wait_updidle(struct waiter *w, double now)
-{
-	struct waited *wp;
-
-	wp = VTAILQ_FIRST(&w->waithead);
-	if (wp == NULL)
-		return;
-	if (wp == w->pipe_w) {
-		VTAILQ_REMOVE(&w->waithead, wp, list);
-		VTAILQ_INSERT_TAIL(&w->waithead, wp, list);
-		wp->idle = now;
-		wp = VTAILQ_FIRST(&w->waithead);
-	}
-	w->next_idle = wp->idle;
-}
-
-void
-Wait_Handle(struct waiter *w, struct waited *wp, enum wait_event ev, double now)
-{
-	uintptr_t ss[NEV];
-	struct waited *wp2;
-	int i, j, dotimer = 0;
-
-	CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
-	CHECK_OBJ_ORNULL(wp, WAITED_MAGIC);
-
-	if (wp != NULL) {
-		if (wp == w->pipe_w) {
-			w->do_pipe = 1;
-			VTAILQ_REMOVE(&w->waithead, w->pipe_w, list);
-			wp->idle = now;
-			VTAILQ_INSERT_TAIL(&w->waithead, w->pipe_w, list);
-		} else {
-			if (w->impl->evict != NULL)
-				w->impl->evict(w, wp);
-
-			VTAILQ_REMOVE(&w->waithead, wp, list);
-			w->func(wp, ev, now);
-			wait_updidle(w, now);
-		}
-		return;
-	}
-
-	AZ(wp);
-
-	if (!w->do_pipe)
-		return;
-
-	w->do_pipe = 0;
-
-	i = read(w->pipes[0], ss, sizeof ss);
-	if (i == -1 && errno == EAGAIN)
-		return;
-
-	for (j = 0; i >= sizeof ss[0]; j++, i -= sizeof ss[0]) {
-		if (ss[j] == 0) {
-			AN(w->dismantle);
-			continue;
-		}
-		ss[j] &= ~1;
-		CAST_OBJ_NOTNULL(wp2, (void*)ss[j], WAITED_MAGIC);
-		if (wp2 == w->pipe_w) {
-			dotimer = 1;
-		} else {
-			assert(wp2->fd >= 0);
-			VTAILQ_INSERT_TAIL(&w->waithead, wp2, list);
-			w->impl->inject(w, wp2);
-		}
-	}
-	AZ(i);
-
-	wait_updidle(w, now);
-
-	if (!dotimer)
-		return;
-
-	VTAILQ_FOREACH_SAFE(wp, &w->waithead, list, wp2) {
-		if (wp == w->pipe_w)
-			continue;
-		if (wp->idle + *w->tmo > now)
-			break;
-		if (w->impl->evict != NULL)
-			w->impl->evict(w, wp);
-		VTAILQ_REMOVE(&w->waithead, wp, list);
-		w->func(wp, WAITER_TIMEOUT, now);
-	}
-	wait_updidle(w, now);
+	assert(wp->fd > 0);			// stdin never comes here
+	return (w->impl->enter(w->priv, wp));
 }
 
 /**********************************************************************/
@@ -215,7 +58,7 @@ Waiter_GetName(void)
 	if (waiter != NULL)
 		return (waiter->name);
 	else
-		return ("no_waiter");
+		return ("(No Waiter?)");
 }
 
 struct waiter *
@@ -226,6 +69,8 @@ Waiter_New(waiter_handle_f *func, volatile double *tmo)
 	AN(waiter);
 	AN(waiter->name);
 	AN(waiter->init);
+	AN(waiter->enter);
+	AN(waiter->fini);
 
 	w = calloc(1, sizeof (struct waiter) + waiter->size);
 	AN(w);
@@ -234,20 +79,10 @@ Waiter_New(waiter_handle_f *func, volatile double *tmo)
 	w->impl = waiter;
 	w->func = func;
 	w->tmo = tmo;
-	w->pipes[0] = w->pipes[1] = -1;
 	VTAILQ_INIT(&w->waithead);
 
 	waiter->init(w);
-	AN(w->impl->pass || w->pipes[1] >= 0);
-
-	Lck_Lock(&wait_mtx);
-	VTAILQ_INSERT_TAIL(&waiters, w, list);
-	nwaiters++;
 
-	/* We assume all waiters either use pipes or don't use pipes */
-	if (w->pipes[1] >= 0 && nwaiters == 1)
-		AZ(pthread_create(&wait_thr, NULL, wait_poker_thread, NULL));
-	Lck_Unlock(&wait_mtx);
 	return (w);
 }
 
@@ -255,47 +90,13 @@ void
 Waiter_Destroy(struct waiter **wp)
 {
 	struct waiter *w;
-	struct waited *wx = NULL;
-	int written;
-	double now;
 
 	AN(wp);
 	w = *wp;
 	*wp = NULL;
 	CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
 
-	Lck_Lock(&wait_mtx);
-	VTAILQ_REMOVE(&waiters, w, list);
-	w->dismantle = 1;
-	Lck_Unlock(&wait_mtx);
-
-	if (w->pipes[1] >= 0) {
-		while (1) {
-			written = write(w->pipes[1], &wx, sizeof wx);
-			if (written == sizeof wx)
-				break;
-			(void)usleep(10000);
-		}
-	}
 	AN(w->impl->fini);
 	w->impl->fini(w);
-	now = VTIM_real();
-	while (1) {
-		wx = VTAILQ_FIRST(&w->waithead);
-		if (wx == NULL)
-			break;
-		VTAILQ_REMOVE(&w->waithead, wx, list);
-		if (wx == w->pipe_w)
-			FREE_OBJ(wx);
-		else
-			w->func(wx, WAITER_CLOSE, now);
-	}
 	FREE_OBJ(w);
 }
-
-void
-Waiter_Init(void)
-{
-
-	Lck_New(&wait_mtx, lck_misc);
-}
diff --git a/bin/varnishd/waiter/cache_waiter_poll.c b/bin/varnishd/waiter/cache_waiter_poll.c
index d3cbe4e..467f73f 100644
--- a/bin/varnishd/waiter/cache_waiter_poll.c
+++ b/bin/varnishd/waiter/cache_waiter_poll.c
@@ -45,80 +45,88 @@ struct vwp {
 #define VWP_MAGIC		0x4b2cc735
 	struct waiter		*waiter;
 
+	int			pipes[2];
+
 	pthread_t		thread;
 	struct pollfd		*pollfd;
-	unsigned		npoll;
-	unsigned		hpoll;
+	struct waited		**idx;
+	size_t			npoll;
+	size_t			hpoll;
 };
 
-/*--------------------------------------------------------------------*/
+/*--------------------------------------------------------------------
+ * It would make much more sense to not use two large vectors, but
+ * the poll(2) API forces us to use at least one, so ... KISS.
+ */
 
 static void
-vwp_pollspace(struct vwp *vwp, unsigned fd)
+vwp_extend_pollspace(struct vwp *vwp)
 {
-	struct pollfd *newpollfd = vwp->pollfd;
-	unsigned newnpoll;
-
-	if (fd < vwp->npoll)
-		return;
-	newnpoll = vwp->npoll;
-	if (newnpoll == 0)
-		newnpoll = 1;
-	while (fd >= newnpoll)
-		newnpoll = newnpoll * 2;
-	VSL(SLT_Debug, 0, "Acceptor poll space increased to %u", newnpoll);
-	newpollfd = realloc(newpollfd, newnpoll * sizeof *newpollfd);
-	XXXAN(newpollfd);
-	memset(newpollfd + vwp->npoll, 0,
-	    (newnpoll - vwp->npoll) * sizeof *newpollfd);
-	vwp->pollfd = newpollfd;
-	while (vwp->npoll < newnpoll)
+	size_t inc = (1<<16);
+
+	VSL(SLT_Debug, 0, "Acceptor poll space increased by %zu to %zu",
+	    inc, vwp->npoll + inc);
+
+	vwp->pollfd = realloc(vwp->pollfd,
+	    (vwp->npoll + inc) * sizeof(*vwp->pollfd));
+	AN(vwp->pollfd);
+	memset(vwp->pollfd + vwp->npoll, 0, inc * sizeof(*vwp->pollfd));
+
+	vwp->idx = realloc(vwp->idx, (vwp->npoll + inc) * sizeof(*vwp->idx));
+	AN(vwp->idx);
+	memset(vwp->idx + vwp->npoll, 0, inc * sizeof(*vwp->idx));
+
+	for (; inc > 0; inc--)
 		vwp->pollfd[vwp->npoll++].fd = -1;
-	assert(fd < vwp->npoll);
 }
 
 /*--------------------------------------------------------------------*/
 
-static void __match_proto__(waiter_inject_f)
-vwp_inject(const struct waiter *w, struct waited *wp)
+static void
+vwp_add(struct vwp *vwp, struct waited *w)
 {
-	struct vwp *vwp;
-	int fd;
 
-	CAST_OBJ_NOTNULL(vwp, w->priv, VWP_MAGIC);
-	CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
-	fd = wp->fd;
-VSL(SLT_Debug, 0, "POLL Inject %d", fd);
-	assert(fd >= 0);
-	vwp_pollspace(vwp, (unsigned)fd);
-	assert(fd < vwp->npoll);
-	if (vwp->hpoll < fd)
-		vwp->hpoll = fd;
-
-	assert(vwp->pollfd[fd].fd == -1);
-	AZ(vwp->pollfd[fd].events);
-	AZ(vwp->pollfd[fd].revents);
-
-	vwp->pollfd[fd].fd = fd;
-	vwp->pollfd[fd].events = POLLIN;
+	if (vwp->hpoll == vwp->npoll)
+		vwp_extend_pollspace(vwp);
+	assert(vwp->hpoll < vwp->npoll);
+	assert(vwp->pollfd[vwp->hpoll].fd == -1);
+	AZ(vwp->idx[vwp->hpoll]);
+	vwp->pollfd[vwp->hpoll].fd = w->fd;
+	vwp->pollfd[vwp->hpoll].events = POLLIN;
+	vwp->idx[vwp->hpoll] = w;
+	vwp->hpoll++;
 }
 
-static void __match_proto__(waiter_evict_f)
-vwp_evict(const struct waiter *w, struct waited *wp)
+static void	/* Remove slot n from the poll set by moving the last used slot into it */
+vwp_del(struct vwp *vwp, int n)
 {
-	struct vwp *vwp;
-	int fd;
+	vwp->hpoll--;				// hpoll now indexes the last used slot
+	if (n != vwp->hpoll) {			// n is not the last slot: fill the hole
+		vwp->pollfd[n] = vwp->pollfd[vwp->hpoll];
+		vwp->idx[n] = vwp->idx[vwp->hpoll];
+	}
+	memset(&vwp->pollfd[vwp->hpoll], 0, sizeof(*vwp->pollfd));
+	vwp->pollfd[vwp->hpoll].fd = -1;	// -1 marks the vacated slot as unused
+	vwp->idx[vwp->hpoll] = NULL;
+}
 
-	CAST_OBJ_NOTNULL(vwp, w->priv, VWP_MAGIC);
-	CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
-	fd = wp->fd;
-VSL(SLT_Debug, 0, "POLL Evict %d", fd);
-	assert(fd >= 0);
-	assert(fd < vwp->npoll);
-	vwp_pollspace(vwp, (unsigned)fd);
-
-	vwp->pollfd[fd].fd = -1;
-	vwp->pollfd[fd].events = 0;
+/*--------------------------------------------------------------------*/
+
+static void
+vwp_dopipe(struct vwp *vwp)
+{
+	struct waited *w[128];
+	ssize_t ss;
+	int i = 0;
+	/* Drain up to 128 pointers queued by vwp_enter() and add each one to the poll set */
+	ss = read(vwp->pipes[0], w, sizeof w);
+	assert(ss > 0);
+	while (ss > 0) {
+		CHECK_OBJ_NOTNULL(w[i], WAITED_MAGIC);	// NOTE(review): vwp_fini() writes a NULL pointer into this pipe
+		assert(w[i]->fd > 0);			// no stdin
+		vwp_add(vwp, w[i++]);
+		ss -= (ssize_t)sizeof w[0];		// one pointer consumed; without this the loop never ends
+	}
+}
 
 /*--------------------------------------------------------------------*/
@@ -128,50 +136,66 @@ vwp_main(void *priv)
 {
 	int v, v2;
 	struct vwp *vwp;
-	struct waited *sp, *sp2;
+	struct waited *wp;
 	double now, idle;
-	int fd;
+	int i;
 
 	CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
 	THR_SetName("cache-poll");
 
-	while (!vwp->waiter->dismantle) {
+	while (1) {
+		// Try to keep the high point as low as possible
 		assert(vwp->hpoll < vwp->npoll);
 		while (vwp->hpoll > 0 && vwp->pollfd[vwp->hpoll].fd == -1)
 			vwp->hpoll--;
+
+		// XXX: sleep on ->tmo
 		v = poll(vwp->pollfd, vwp->hpoll + 1, -1);
 		assert(v >= 0);
 		v2 = v;
 		now = VTIM_real();
 		idle = now - *vwp->waiter->tmo;
-		VTAILQ_FOREACH_SAFE(sp, &vwp->waiter->waithead, list, sp2) {
-			if (v != 0 && v2 == 0)
-				break;
-			CHECK_OBJ_NOTNULL(sp, WAITED_MAGIC);
-			fd = sp->fd;
-			VSL(SLT_Debug, 0,
-			    "POLL Handle %d %x", fd, vwp->pollfd[fd].revents);
-			assert(fd >= 0);
-			assert(fd <= vwp->hpoll);
-			assert(fd < vwp->npoll);
-			assert(vwp->pollfd[fd].fd == fd);
-			if (vwp->pollfd[fd].revents) {
+		i = 1;
+		while (v2 > 0 && i < vwp->hpoll) {
+			wp = vwp->idx[i];
+			CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
+			if (vwp->pollfd[i].revents != 0) {
 				v2--;
-				vwp->pollfd[fd].revents = 0;
-				Wait_Handle(vwp->waiter, sp, WAITER_ACTION,
-				    now);
-			} else if (sp->idle <= idle) {
-				Wait_Handle(vwp->waiter, sp, WAITER_TIMEOUT,
-				    now);
+				assert(wp->fd > 0);
+				assert(wp->fd == vwp->pollfd[i].fd);
+				VSL(SLT_Debug, wp->fd, "POLL Handle %d %x",
+				    wp->fd, vwp->pollfd[i].revents);
+				vwp_del(vwp, i);
+				vwp->waiter->func(wp, WAITER_ACTION, now);
+			} else if (wp->idle <= idle) {
+				vwp_del(vwp, i);
+				vwp->waiter->func(wp, WAITER_TIMEOUT, now);
+			} else {
+				i++;
 			}
 		}
-		Wait_Handle(vwp->waiter, NULL, WAITER_ACTION, now);
+		if (vwp->pollfd[0].revents)
+			vwp_dopipe(vwp);
 	}
 	NEEDLESS_RETURN(NULL);
 }
 
 /*--------------------------------------------------------------------*/
 
+static int __match_proto__(waiter_enter_f)
+vwp_enter(void *priv, struct waited *wp)
+{
+	struct vwp *vwp;
+
+	CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
+	/* Queue wp for the poll thread: the pointer value itself is written to the pipe */
+	if (write(vwp->pipes[1], &wp, sizeof wp) != sizeof wp)
+		return (-1);		// write failed or was short
+	return (0);
+}
+
+/*--------------------------------------------------------------------*/
+
 static void __match_proto__(waiter_init_f)
 vwp_init(struct waiter *w)
 {
@@ -181,9 +205,13 @@ vwp_init(struct waiter *w)
 	vwp = w->priv;
 	INIT_OBJ(vwp, VWP_MAGIC);
 	vwp->waiter = w;
+	AZ(pipe(vwp->pipes));
+	// XXX: set write pipe non-blocking
 
-	vwp_pollspace(vwp, 256);
-	Wait_UsePipe(w);
+	vwp_extend_pollspace(vwp);
+	vwp->pollfd[0].fd = vwp->pipes[0];
+	vwp->pollfd[0].events = POLLIN;
+	vwp->hpoll = 1;
 	AZ(pthread_create(&vwp->thread, NULL, vwp_main, vwp));
 }
 
@@ -196,6 +224,9 @@ vwp_fini(struct waiter *w)
 	void *vp;
 
 	CAST_OBJ_NOTNULL(vwp, w->priv, VWP_MAGIC);
+	vp = NULL;
+	// XXX: set write pipe blocking
+	assert(write(vwp->pipes[1], &vp, sizeof vp) == sizeof vp);
 	AZ(pthread_join(vwp->thread, &vp));
 	free(vwp->pollfd);
 }
@@ -206,7 +237,6 @@ const struct waiter_impl waiter_poll = {
 	.name =		"poll",
 	.init =		vwp_init,
 	.fini =		vwp_fini,
-	.inject =	vwp_inject,
-	.evict =	vwp_evict,
+	.enter =	vwp_enter,
 	.size =		sizeof(struct vwp),
 };
diff --git a/bin/varnishd/waiter/waiter.h b/bin/varnishd/waiter/waiter.h
index db69e26..5b5620b 100644
--- a/bin/varnishd/waiter/waiter.h
+++ b/bin/varnishd/waiter/waiter.h
@@ -61,7 +61,6 @@ int Wait_Enter(const struct waiter *, struct waited *);
 struct waiter *Waiter_New(waiter_handle_f *, volatile double *timeout);
 void Waiter_Destroy(struct waiter **);
 const char *Waiter_GetName(void);
-void Waiter_Init(void);
 
 /* mgt_waiter.c */
 int Wait_Argument(struct vsb *vsb, const char *arg);
diff --git a/bin/varnishd/waiter/waiter_priv.h b/bin/varnishd/waiter/waiter_priv.h
index f21e0ae..ed369c4 100644
--- a/bin/varnishd/waiter/waiter_priv.h
+++ b/bin/varnishd/waiter/waiter_priv.h
@@ -33,27 +33,25 @@ struct waited;
 
 struct waiter {
 	unsigned			magic;
-	#define WAITER_MAGIC		0x17c399db
+#define WAITER_MAGIC			0x17c399db
 	const struct waiter_impl	*impl;
 	VTAILQ_ENTRY(waiter)		list;
+	VTAILQ_HEAD(,waited)		waithead;
+
 	int				dismantle;
 
 	waiter_handle_f *		func;
 
-	int				pipes[2];
-	struct waited			*pipe_w;
 	double				next_idle;
-	int				do_pipe;
 
 	volatile double			*tmo;
-	VTAILQ_HEAD(,waited)		waithead;
 
 	void				*priv;
 };
 
 typedef void waiter_init_f(struct waiter *);
 typedef void waiter_fini_f(struct waiter *);
-typedef int waiter_pass_f(void *priv, struct waited *);
+typedef int waiter_enter_f(void *priv, struct waited *);
 typedef void waiter_inject_f(const struct waiter *, struct waited *);
 typedef void waiter_evict_f(const struct waiter *, struct waited *);
 
@@ -61,16 +59,11 @@ struct waiter_impl {
 	const char		*name;
 	waiter_init_f		*init;
 	waiter_fini_f		*fini;
-	waiter_pass_f		*pass;
+	waiter_enter_f		*enter;
 	waiter_inject_f		*inject;
-	waiter_evict_f		*evict;
 	size_t			size;
 };
 
-/* cache_waiter.c */
-void Wait_Handle(struct waiter *, struct waited *, enum wait_event, double now);
-void Wait_UsePipe(struct waiter *w);
-
 /* mgt_waiter.c */
 extern struct waiter_impl const * waiter;
 



More information about the varnish-commit mailing list