[master] 64fbc76 Bring back the kqueue waiter.

Poul-Henning Kamp phk at FreeBSD.org
Thu May 21 18:48:51 CEST 2015


commit 64fbc76108d77a601957165317dba1163d70fa6b
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date:   Thu May 21 16:41:26 2015 +0000

    Bring back the kqueue waiter.

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index 89d4cd9..1a0dc03 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -376,6 +376,7 @@ struct waited {
 	int			fd;
 	void			*ptr;
 	double			idle;
+	VTAILQ_ENTRY(waited)	list;
 };
 
 /* Stored object -----------------------------------------------------
diff --git a/bin/varnishd/waiter/cache_waiter_kqueue.c b/bin/varnishd/waiter/cache_waiter_kqueue.c
index c28b2de..f5452ac 100644
--- a/bin/varnishd/waiter/cache_waiter_kqueue.c
+++ b/bin/varnishd/waiter/cache_waiter_kqueue.c
@@ -26,12 +26,8 @@
  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  *
- * XXX: We need to pass sessions back into the event engine when they are
- * reused.  Not sure what the most efficient way is for that.  For now
- * write the session pointer to a pipe which the event engine monitors.
  */
 
-#if 0
 #include "config.h"
 
 #if defined(HAVE_KQUEUE)
@@ -39,6 +35,7 @@
 #include <sys/types.h>
 #include <sys/event.h>
 
+#include <errno.h>
 #include <stdlib.h>
 
 #include "cache/cache.h"
@@ -47,95 +44,18 @@
 #include "waiter/waiter_priv.h"
 #include "vtim.h"
 
-#define NKEV	100
+#define NKEV	256
 
 struct vwk {
 	unsigned		magic;
 #define VWK_MAGIC		0x1cc2acc2
+	int			kq;
 	struct waiter		*waiter;
-
 	pthread_t		thread;
-	int			kq;
-	struct kevent		ki[NKEV];
-	unsigned		nki;
-};
-
-/*--------------------------------------------------------------------*/
-
-static void
-vwk_kq_flush(struct vwk *vwk)
-{
-	int i;
-
-	if (vwk->nki == 0)
-		return;
-	i = kevent(vwk->kq, vwk->ki, vwk->nki, NULL, 0, NULL);
-	AZ(i);
-	vwk->nki = 0;
-}
-
-static void
-vwk_kq_sess(struct vwk *vwk, struct waited *sp, short arm)
-{
-
-	CHECK_OBJ_NOTNULL(sp, WAITED_MAGIC);
-	assert(sp->fd >= 0);
-	EV_SET(&vwk->ki[vwk->nki], sp->fd, EVFILT_READ, arm, 0, 0, sp);
-	if (++vwk->nki == NKEV)
-		vwk_kq_flush(vwk);
-}
-
-/*--------------------------------------------------------------------*/
 
-static void
-vwk_inject(const struct waiter *w, struct waited *wp)
-{
-	struct vwk *vwk;
-
-	CAST_OBJ_NOTNULL(vwk, w->priv, VWK_MAGIC);
-	if (wp == w->pipe_w)
-		vwk_kq_sess(vwk, wp, EV_ADD);
-	else
-		vwk_kq_sess(vwk, wp, EV_ADD | EV_ONESHOT);
-}
-
-#if 0
-static void
-vwk_evict(const struct waiter *w, struct waited *wp)
-{
-	struct vwk *vwk;
-
-	CAST_OBJ_NOTNULL(vwk, w->priv, VWK_MAGIC);
-	vwk_kq_sess(vwk, wp, EV_DELETE);
-}
-#endif
-
-/*--------------------------------------------------------------------*/
-
-static void
-vwk_sess_ev(const struct vwk *vwk, const struct kevent *kp, double now)
-{
-	struct waited *sp;
-	double idle;
-
-	AN(kp->udata);
-	CAST_OBJ_NOTNULL(sp, kp->udata, WAITED_MAGIC);
-
-	idle = now - *vwk->waiter->tmo;
-
-	if (sp->idle <= idle) {
-		Wait_Handle(vwk->waiter, sp, WAITER_TIMEOUT, now);
-	} else if (kp->flags & EV_EOF) {
-		Wait_Handle(vwk->waiter, sp, WAITER_REMCLOSE, now);
-	} else {
-		if (kp->data == 0)
-			VSL(SLT_Debug, 0,
-			    "KQR d %ju filter %d data %jd flags 0x%x idle %g",
-			    (uintmax_t)kp->ident, kp->filter,
-			    (intmax_t)kp->data, kp->flags, sp->idle - idle);
-		Wait_Handle(vwk->waiter, sp, WAITER_ACTION, now);
-	}
-}
+	VTAILQ_HEAD(,waited)	list;
+	struct lock		mtx;
+};
 
 /*--------------------------------------------------------------------*/
 
@@ -145,30 +65,77 @@ vwk_thread(void *priv)
 	struct vwk *vwk;
 	struct kevent ke[NKEV], *kp;
 	int j, n;
-	double now;
+	double now, idle, last_idle;
+	struct timespec ts;
+	struct waited *wp, *wp2;
 
 	CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
 	THR_SetName("cache-kqueue");
 
-	vwk_kq_flush(vwk);
-
-	vwk->nki = 0;
-	while (!vwk->waiter->dismantle) {
-		n = kevent(vwk->kq, vwk->ki, vwk->nki, ke, NKEV, NULL);
+	last_idle = 0.0;
+	while (1) {
+		now = .3 * *vwk->waiter->tmo;
+		ts.tv_sec = (time_t)floor(now);
+		ts.tv_nsec = (long)(1e9 * (now - ts.tv_sec));
+		n = kevent(vwk->kq, NULL, 0, ke, NKEV, &ts);
+		if (n < 0 && errno == EBADF)
+			break;
 		assert(n <= NKEV);
-		if (n == 0) {
-			/* This happens on OSX in m00011.vtc */
-			(void)usleep(10000);
-		}
-		vwk->nki = 0;
 		now = VTIM_real();
+		idle = now - *vwk->waiter->tmo;
 		for (kp = ke, j = 0; j < n; j++, kp++) {
 			assert(kp->filter == EVFILT_READ);
-			vwk_sess_ev(vwk, kp, now);
+			CAST_OBJ_NOTNULL(wp, ke[j].udata, WAITED_MAGIC);
+			Lck_Lock(&vwk->mtx);
+			VTAILQ_REMOVE(&vwk->list, wp, list);
+			Lck_Unlock(&vwk->mtx);
+			if (wp->idle <= idle)
+				vwk->waiter->func(wp, WAITER_TIMEOUT, now);
+			else if (kp->flags & EV_EOF)
+				vwk->waiter->func(wp, WAITER_REMCLOSE, now);
+			else
+				vwk->waiter->func(wp, WAITER_ACTION, now);
+		}
+		if (now - last_idle > .3 * *vwk->waiter->tmo) {
+			last_idle = now;
+			n = 0;
+			Lck_Lock(&vwk->mtx);
+			VTAILQ_FOREACH_SAFE(wp, &vwk->list, list, wp2) {
+				if (wp->idle > idle)
+					continue;
+				EV_SET(ke + n, wp->fd,
+				    EVFILT_READ, EV_DELETE, 0, 0, wp);
+				if (++n == NKEV)
+					break;
+			}
+			if (n > 0)
+				AZ(kevent(vwk->kq, ke, n, NULL, 0, NULL));
+			for (j = 0; j < n; j++) {
+				CAST_OBJ_NOTNULL(wp, ke[j].udata, WAITED_MAGIC);
+				VTAILQ_REMOVE(&vwk->list, wp, list);
+				vwk->waiter->func(wp, WAITER_TIMEOUT, now);
+			}
+			Lck_Unlock(&vwk->mtx);
 		}
-		Wait_Handle(vwk->waiter, NULL, WAITER_ACTION, now);
 	}
-	NEEDLESS_RETURN(NULL);
+	return(NULL);
+}
+
+/*--------------------------------------------------------------------*/
+
+static int __match_proto__(waiter_enter_f)
+vwk_enter(void *priv, struct waited *wp)
+{
+	struct vwk *vwk;
+	struct kevent ke;
+
+	CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
+	EV_SET(&ke, wp->fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, wp);
+	Lck_Lock(&vwk->mtx);
+	VTAILQ_INSERT_TAIL(&vwk->list, wp, list);
+	AZ(kevent(vwk->kq, &ke, 1, NULL, 0, NULL));
+	Lck_Unlock(&vwk->mtx);
+	return(0);
 }
 
 /*--------------------------------------------------------------------*/
@@ -185,13 +152,16 @@ vwk_init(struct waiter *w)
 
 	vwk->kq = kqueue();
 	assert(vwk->kq >= 0);
-
-	Wait_UsePipe(w);
+	VTAILQ_INIT(&vwk->list);
+	Lck_New(&vwk->mtx, lck_misc);
 
 	AZ(pthread_create(&vwk->thread, NULL, vwk_thread, vwk));
 }
 
-/*--------------------------------------------------------------------*/
+/*--------------------------------------------------------------------
+ * It is the caller's responsibility to trigger all fds waited on to
+ * fail somehow.
+ */
 
 static void __match_proto__(waiter_fini_f)
 vwk_fini(struct waiter *w)
@@ -200,8 +170,16 @@ vwk_fini(struct waiter *w)
 	void *vp;
 
 	CAST_OBJ_NOTNULL(vwk, w->priv, VWK_MAGIC);
-	AZ(pthread_join(vwk->thread, &vp));
+	Lck_Lock(&vwk->mtx);
+	while (!VTAILQ_EMPTY(&vwk->list)) {
+		Lck_Unlock(&vwk->mtx);
+		(void)usleep(100000);
+		Lck_Lock(&vwk->mtx);
+	}
 	AZ(close(vwk->kq));
+	vwk->kq = -1;
+	Lck_Unlock(&vwk->mtx);
+	AZ(pthread_join(vwk->thread, &vp));
 }
 
 /*--------------------------------------------------------------------*/
@@ -210,9 +188,8 @@ const struct waiter_impl waiter_kqueue = {
 	.name =		"kqueue",
 	.init =		vwk_init,
 	.fini =		vwk_fini,
-	.inject =	vwk_inject,
+	.enter =	vwk_enter,
 	.size =		sizeof(struct vwk),
 };
 
 #endif /* defined(HAVE_KQUEUE) */
-#endif
diff --git a/bin/varnishd/waiter/mgt_waiter.c b/bin/varnishd/waiter/mgt_waiter.c
index a97425a..52a380f 100644
--- a/bin/varnishd/waiter/mgt_waiter.c
+++ b/bin/varnishd/waiter/mgt_waiter.c
@@ -39,10 +39,10 @@
 #include "waiter/waiter_priv.h"
 
 static const struct waiter_impl *const waiter_impls[] = {
-#if 0
     #if defined(HAVE_KQUEUE)
 	&waiter_kqueue,
     #endif
+#if 0
     #if defined(HAVE_EPOLL_CTL)
 	&waiter_epoll,
     #endif



More information about the varnish-commit mailing list