[master] 0f03834 Give the kqueue waiter a binheap to track timeouts.

Poul-Henning Kamp phk at FreeBSD.org
Wed May 27 23:38:32 CEST 2015


commit 0f038349f7532249c56ccf12850b5ec0bd9dcd29
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date:   Wed May 27 09:08:19 2015 +0000

    Give the kqueue waiter a binheap to track timeouts.

diff --git a/bin/varnishd/waiter/cache_waiter.c b/bin/varnishd/waiter/cache_waiter.c
index 46957be..d3a33ca 100644
--- a/bin/varnishd/waiter/cache_waiter.c
+++ b/bin/varnishd/waiter/cache_waiter.c
@@ -34,11 +34,80 @@
 #include <stdio.h>
 #include <stdlib.h>
 
+#include "binary_heap.h"
+
 #include "cache/cache.h"
 
 #include "waiter/waiter_priv.h"
 #include "waiter/mgt_waiter.h"
 
+static int __match_proto__(binheap_cmp_t)
+waited_cmp(void *priv, const void *a, const void *b)
+{
+	const struct waiter *ww;
+	const struct waited *aa, *bb;
+
+	CAST_OBJ_NOTNULL(ww, priv, WAITER_MAGIC);
+	CAST_OBJ_NOTNULL(aa, a, WAITED_MAGIC);
+	CAST_OBJ_NOTNULL(bb, b, WAITED_MAGIC);
+
+	return (aa->idle + Wait_Tmo(ww, aa) < bb->idle + Wait_Tmo(ww, bb));
+}
+
+static void __match_proto__(binheap_update_t)
+waited_update(void *priv, void *p, unsigned u)
+{
+	struct waited *pp;
+
+	(void)priv;
+	CAST_OBJ_NOTNULL(pp, p, WAITED_MAGIC);
+	pp->idx = u;	/* remembered so Wait_Call() can binheap_delete() */
+}
+
+/**********************************************************************/
+
+void
+Wait_Call(const struct waiter *w,
+    struct waited *wp, enum wait_event ev, double now)
+{
+	CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
+	CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
+	CHECK_OBJ_NOTNULL(w->waitfor, WAITFOR_MAGIC);
+	AN(w->waitfor->func);
+	if (wp->idx != BINHEAP_NOIDX)
+		binheap_delete(w->heap, wp->idx);	/* drop pending timeout */
+	assert(wp->idx == BINHEAP_NOIDX);
+	w->waitfor->func(wp, ev, now);
+}
+
+/**********************************************************************/
+
+void
+Wait_HeapInsert(const struct waiter *w, struct waited *wp)
+{
+	CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
+	CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
+	assert(wp->idx == BINHEAP_NOIDX);
+	binheap_insert(w->heap, wp);
+}
+
+double
+Wait_HeapDue(const struct waiter *w, struct waited **wpp)
+{
+	struct waited *wp;
+
+	wp = binheap_root(w->heap);
+	CHECK_OBJ_ORNULL(wp, WAITED_MAGIC);
+	if (wp == NULL) {
+		*wpp = NULL;
+		return (0);	/* empty heap: no deadline, *wpp is NULL */
+	}
+	*wpp = wp;
+	return(wp->idle + Wait_Tmo(w, wp));	/* absolute deadline of root */
+}
+
+/**********************************************************************/
+
 int
 Wait_Enter(const struct waiter *w, struct waited *wp)
 {
@@ -46,6 +115,7 @@ Wait_Enter(const struct waiter *w, struct waited *wp)
 	CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
 	CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
 	assert(wp->fd > 0);			// stdin never comes here
+	wp->idx = BINHEAP_NOIDX;	/* not in the heap yet */
 	return (w->impl->enter(w->priv, wp));
 }
 
@@ -79,6 +149,7 @@ Waiter_New(struct waitfor *wf)
 	w->impl = waiter;
 	w->waitfor = wf;
 	VTAILQ_INIT(&w->waithead);
+	w->heap = binheap_new(w, waited_cmp, waited_update);	/* by idle + tmo */
 
 	waiter->init(w);
 
@@ -95,6 +166,7 @@ Waiter_Destroy(struct waiter **wp)
 	*wp = NULL;
 	CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
 
 	AN(w->impl->fini);
 	w->impl->fini(w);
+	AZ(binheap_root(w->heap));	/* only empty once fini has drained */
 	FREE_OBJ(w);
diff --git a/bin/varnishd/waiter/cache_waiter_kqueue.c b/bin/varnishd/waiter/cache_waiter_kqueue.c
index a6d4c8c..f8af96e 100644
--- a/bin/varnishd/waiter/cache_waiter_kqueue.c
+++ b/bin/varnishd/waiter/cache_waiter_kqueue.c
@@ -52,6 +52,8 @@ struct vwk {
 	int			kq;
 	struct waiter		*waiter;
 	pthread_t		thread;
+	double			next;	/* absolute time of next wakeup */
+	int			pipe[2];	/* self-pipe to wake vwk_thread */
 
 	VTAILQ_HEAD(,waited)	list;
 	struct lock		mtx;
@@ -66,58 +68,68 @@ vwk_thread(void *priv)
 	struct vwk *vwk;
 	struct kevent ke[NKEV], *kp;
 	int j, n;
-	double now, idle, last_idle;
+	double now, then;
 	struct timespec ts;
-	struct waited *wp, *wp2;
+	struct waited *wp;
+	struct waiter *w;
+	char c;
 
 	CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
+	w = vwk->waiter;
+	CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
 	THR_SetName("cache-kqueue");
 
-	last_idle = 0.0;
+	now = VTIM_real();
+	Lck_Lock(&vwk->mtx);
 	while (1) {
-		now = .3 * Wait_Tmo(vwk->waiter, NULL);
-		ts.tv_sec = (time_t)floor(now);
-		ts.tv_nsec = (long)(1e9 * (now - ts.tv_sec));
+		while (1) {
+			/*
+			 * XXX: We could avoid many syscalls here if we were
+			 * XXX: allowed to just close the fd's on timeout.
+			 */
+			then = Wait_HeapDue(w, &wp);
+			if (wp == NULL) {
+				vwk->next = now + 100;
+				break;
+			} else if (then > now) {
+				vwk->next = then;
+				break;
+			}
+			CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
+			EV_SET(ke, wp->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+			AZ(kevent(vwk->kq, ke, 1, NULL, 0, NULL));
+			VTAILQ_REMOVE(&vwk->list, wp, list);
+			Wait_Call(w, wp, WAITER_TIMEOUT, now);
+		}
+		then = vwk->next - now;
+		ts.tv_sec = (time_t)floor(then);
+		ts.tv_nsec = (long)(1e9 * (then - ts.tv_sec));
+		Lck_Unlock(&vwk->mtx);
 		n = kevent(vwk->kq, NULL, 0, ke, NKEV, &ts);
-		if (n < 0 && vwk->die)
-			break;
 		assert(n >= 0);
 		assert(n <= NKEV);
 		now = VTIM_real();
+		Lck_Lock(&vwk->mtx);
 		for (kp = ke, j = 0; j < n; j++, kp++) {
 			assert(kp->filter == EVFILT_READ);
+			if (ke[j].udata == vwk) {
+				assert(read(vwk->pipe[0], &c, 1) == 1);
+				continue;
+			}
 			CAST_OBJ_NOTNULL(wp, ke[j].udata, WAITED_MAGIC);
-			Lck_Lock(&vwk->mtx);
 			VTAILQ_REMOVE(&vwk->list, wp, list);
-			Lck_Unlock(&vwk->mtx);
 			if (kp->flags & EV_EOF)
-				Wait_Call(vwk->waiter, wp, WAITER_REMCLOSE, now);
+				Wait_Call(w, wp, WAITER_REMCLOSE, now);
 			else
-				Wait_Call(vwk->waiter, wp, WAITER_ACTION, now);
-		}
-		idle = now - Wait_Tmo(vwk->waiter, NULL);
-		if (now - last_idle < .3 * Wait_Tmo(vwk->waiter, NULL))
-			continue;
-		last_idle = now;
-		n = 0;
-		Lck_Lock(&vwk->mtx);
-		VTAILQ_FOREACH_SAFE(wp, &vwk->list, list, wp2) {
-			if (wp->idle > idle)
-				continue;
-			EV_SET(ke + n, wp->fd,
-			    EVFILT_READ, EV_DELETE, 0, 0, wp);
-			if (++n == NKEV)
-				break;
+				Wait_Call(w, wp, WAITER_ACTION, now);
 		}
-		if (n > 0)
-			AZ(kevent(vwk->kq, ke, n, NULL, 0, NULL));
-		for (j = 0; j < n; j++) {
-			CAST_OBJ_NOTNULL(wp, ke[j].udata, WAITED_MAGIC);
-			VTAILQ_REMOVE(&vwk->list, wp, list);
-			Wait_Call(vwk->waiter, wp, WAITER_TIMEOUT, now);
-		}
-		Lck_Unlock(&vwk->mtx);
+		if (VTAILQ_EMPTY(&vwk->list) && vwk->die)
+			break;
 	}
+	Lck_Unlock(&vwk->mtx);
+	AZ(close(vwk->pipe[0]));
+	AZ(close(vwk->pipe[1]));
+	AZ(close(vwk->kq));
 	return(NULL);
 }

@@ -133,7 +145,13 @@ vwk_enter(void *priv, struct waited *wp)
 	EV_SET(&ke, wp->fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, wp);
 	Lck_Lock(&vwk->mtx);
 	VTAILQ_INSERT_TAIL(&vwk->list, wp, list);
+	Wait_HeapInsert(vwk->waiter, wp);
 	AZ(kevent(vwk->kq, &ke, 1, NULL, 0, NULL));
+
+	/* If our deadline is before the thread's next wakeup, poke it via the pipe */
+	if (wp->idle + Wait_Tmo(vwk->waiter, wp) < vwk->next)
+		assert(write(vwk->pipe[1], "X", 1) == 1);
+
 	Lck_Unlock(&vwk->mtx);
 	return(0);
 }
@@ -144,6 +162,7 @@ static void __match_proto__(waiter_init_f)
 vwk_init(struct waiter *w)
 {
 	struct vwk *vwk;
+	struct kevent ke;
 
 	CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
 	vwk = w->priv;
@@ -154,6 +173,9 @@ vwk_init(struct waiter *w)
 	assert(vwk->kq >= 0);
 	VTAILQ_INIT(&vwk->list);
 	Lck_New(&vwk->mtx, lck_misc);
+	AZ(pipe(vwk->pipe));
+	EV_SET(&ke, vwk->pipe[0], EVFILT_READ, EV_ADD, 0, 0, vwk);
+	AZ(kevent(vwk->kq, &ke, 1, NULL, 0, NULL));
+
 	AZ(pthread_create(&vwk->thread, NULL, vwk_thread, vwk));
 }
@@ -168,19 +190,11 @@ vwk_fini(struct waiter *w)
 {
 	struct vwk *vwk;
 	void *vp;
-	int i;
 
 	CAST_OBJ_NOTNULL(vwk, w->priv, VWK_MAGIC);
 	Lck_Lock(&vwk->mtx);
-	while (!VTAILQ_EMPTY(&vwk->list)) {
-		Lck_Unlock(&vwk->mtx);
-		(void)usleep(100000);
-		Lck_Lock(&vwk->mtx);
-	}
 	vwk->die = 1;
-	i = vwk->kq;
-	vwk->kq = -1;
-	AZ(close(i));
+	assert(write(vwk->pipe[1], "Y", 1) == 1);
 	Lck_Unlock(&vwk->mtx);
 	AZ(pthread_join(vwk->thread, &vp));
 	Lck_Delete(&vwk->mtx);
diff --git a/bin/varnishd/waiter/waiter.h b/bin/varnishd/waiter/waiter.h
index 0e70d9e..7227fcd 100644
--- a/bin/varnishd/waiter/waiter.h
+++ b/bin/varnishd/waiter/waiter.h
@@ -65,6 +65,7 @@ struct waited {
 	unsigned		magic;
 #define WAITED_MAGIC		0x1743992d
 	int			fd;
+	unsigned		idx;	/* binheap slot, BINHEAP_NOIDX if none */
 	void			*ptr;
 	double			idle;
 	VTAILQ_ENTRY(waited)	list;
diff --git a/bin/varnishd/waiter/waiter_priv.h b/bin/varnishd/waiter/waiter_priv.h
index b4d9078..5215995 100644
--- a/bin/varnishd/waiter/waiter_priv.h
+++ b/bin/varnishd/waiter/waiter_priv.h
@@ -30,6 +30,7 @@
  */
 
 struct waited;
+struct binheap;
 
 struct waiter {
 	unsigned			magic;
@@ -41,6 +42,7 @@ struct waiter {
 	struct waitfor			*waitfor;
 
 	void				*priv;
+	struct binheap			*heap;	/* waiteds keyed on idle + tmo */
 };
 
 typedef void waiter_init_f(struct waiter *);
@@ -68,13 +70,7 @@ Wait_Tmo(const struct waiter *w, const struct waited *wp)
 	return (*w->waitfor->tmo);
 }
 
-static inline void
-Wait_Call(const struct waiter *w,
-    struct waited *wp, enum wait_event ev, double now)
-{
-	CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
-	CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
-	CHECK_OBJ_NOTNULL(w->waitfor, WAITFOR_MAGIC);
-	AN(w->waitfor->func);
-	w->waitfor->func(wp, ev, now);
-}
+void Wait_Call(const struct waiter *, struct waited *,
+    enum wait_event ev, double now);
+void Wait_HeapInsert(const struct waiter *, struct waited *);
+double Wait_HeapDue(const struct waiter *, struct waited **);



More information about the varnish-commit mailing list