[master] 3dc7838 Start generalizing the pipe-passing stuff for waiters

Poul-Henning Kamp phk at FreeBSD.org
Wed Jan 14 11:33:14 CET 2015


commit 3dc7838f6fa944233e8a3e101839fed21f5b0335
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date:   Wed Jan 14 10:32:57 2015 +0000

    Start generalizing the pipe-passing stuff for waiters

diff --git a/bin/varnishd/waiter/cache_waiter.c b/bin/varnishd/waiter/cache_waiter.c
index 5f22f38..78e901c 100644
--- a/bin/varnishd/waiter/cache_waiter.c
+++ b/bin/varnishd/waiter/cache_waiter.c
@@ -37,9 +37,13 @@
 
 #include "cache/cache.h"
 
+#include "vfil.h"
+
 #include "waiter/waiter.h"
 #include "waiter/waiter_priv.h"
 
+#define NEV 8192
+
 const char *
 WAIT_GetName(void)
 {
@@ -66,6 +70,30 @@ WAIT_Init(waiter_handle_f *func, volatile double *tmo)
 	return (w);
 }
 
+/*
+ * Create the waiter's pipe, run both ends through VFIL_nonblocking(),
+ * publish the write end in w->pfd and pass the read end, wrapped in a
+ * struct waited, to the waiter implementation's inject method.
+ */
+void
+WAIT_UsePipe(struct waiter *w)
+{
+	CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
+
+	AN(waiter->inject);
+	AZ(pipe(w->pipes));
+	AZ(VFIL_nonblocking(w->pipes[0]));
+	AZ(VFIL_nonblocking(w->pipes[1]));
+	w->pfd = w->pipes[1];
+	ALLOC_OBJ(w->pipe_w, WAITED_MAGIC);
+	/* Fail on a missing object instead of dereferencing it below */
+	AN(w->pipe_w);
+	w->pipe_w->fd = w->pipes[0];
+	/* NOTE(review): 9e99 presumably means "never expires" - confirm */
+	w->pipe_w->deadline = 9e99;
+	waiter->inject(w, w->pipe_w);
+}
+
 int
 WAIT_Enter(const struct waiter *w, struct waited *wp)
 {
@@ -89,9 +117,35 @@ WAIT_Enter(const struct waiter *w, struct waited *wp)
 void
 WAIT_handle(struct waiter *w, struct waited *wp, enum wait_event ev, double now)
 {
+	struct waited *ss[NEV];
+	int i, j, n;
+
 	CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
 	CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
 
+	if (wp == w->pipe_w) {
+		/* The pipe carries struct waited pointers; take a batch */
+		i = read(w->pipes[0], ss, sizeof ss);
+		if (i == -1 && errno == EAGAIN)
+			return;
+		/*
+		 * Any other read failure must stop here: compared against
+		 * sizeof, a negative count converts to a huge unsigned
+		 * value and the loop would walk uninitialized ss[] slots.
+		 */
+		assert(i >= 0);
+		/* Only whole pointers are expected */
+		AZ(i % (int)sizeof ss[0]);
+		n = i / (int)sizeof ss[0];
+		for (j = 0; j < n; j++) {
+			CHECK_OBJ_NOTNULL(ss[j], WAITED_MAGIC);
+			assert(ss[j]->fd >= 0);
+			VTAILQ_INSERT_TAIL(&w->sesshead, ss[j], list);
+			w->impl->inject(w, ss[j]);
+		}
+		return;
+	}
+
 	VTAILQ_REMOVE(&w->sesshead, wp, list);
 	w->func(wp, ev, now);
 }
diff --git a/bin/varnishd/waiter/cache_waiter_kqueue.c b/bin/varnishd/waiter/cache_waiter_kqueue.c
index d5017a4..c721ab7 100644
--- a/bin/varnishd/waiter/cache_waiter_kqueue.c
+++ b/bin/varnishd/waiter/cache_waiter_kqueue.c
@@ -46,7 +46,6 @@
 #include "waiter/waiter.h"
 #include "waiter/waiter_priv.h"
 #include "vtim.h"
-#include "vfil.h"
 
 #define NKEV	100
 
@@ -58,7 +57,6 @@ struct vwk {
 	waiter_handle_f		*func;
 	volatile double		*tmo;
 	pthread_t		thread;
-	int			pipes[2];
 	int			kq;
 	struct kevent		ki[NKEV];
 	unsigned		nki;
@@ -92,26 +90,15 @@ vwk_kq_sess(struct vwk *vwk, struct waited *sp, short arm)
 /*--------------------------------------------------------------------*/
 
 static void
-vwk_pipe_ev(struct vwk *vwk, const struct kevent *kp)
+vwk_inject(const struct waiter *w, struct waited *wp)
 {
-	int i, j;
-	struct waited *ss[NKEV];
+	struct vwk *vwk;
 
-	AN(kp->udata);
-	assert(kp->udata == vwk->pipes);
-	j = 0;
-	i = read(vwk->pipes[0], ss, sizeof ss);
-	if (i == -1 && errno == EAGAIN)
-		return;
-	while (i >= sizeof ss[0]) {
-		CHECK_OBJ_NOTNULL(ss[j], WAITED_MAGIC);
-		assert(ss[j]->fd >= 0);
-		VTAILQ_INSERT_TAIL(&vwk->waiter->sesshead, ss[j], list);
-		vwk_kq_sess(vwk, ss[j], EV_ADD | EV_ONESHOT);
-		j++;
-		i -= sizeof ss[0];
-	}
-	AZ(i);
+	CAST_OBJ_NOTNULL(vwk, w->priv, VWK_MAGIC);
+	if (wp == w->pipe_w)
+		vwk_kq_sess(vwk, wp, EV_ADD);
+	else
+		vwk_kq_sess(vwk, wp, EV_ADD | EV_ONESHOT);
 }
 
 /*--------------------------------------------------------------------*/
@@ -122,7 +109,6 @@ vwk_sess_ev(struct vwk *vwk, const struct kevent *kp, double now)
 	struct waited *sp;
 
 	AN(kp->udata);
-	assert(kp->udata != vwk->pipes);
 	CAST_OBJ_NOTNULL(sp, kp->udata, WAITED_MAGIC);
 
 	if (kp->data > 0) {
@@ -153,12 +139,7 @@ vwk_thread(void *priv)
 	vwk->kq = kqueue();
 	assert(vwk->kq >= 0);
 
-	j = 0;
-	EV_SET(&ke[j], 0, EVFILT_TIMER, EV_ADD, 0, 100, NULL);
-	j++;
-	EV_SET(&ke[j], vwk->pipes[0], EVFILT_READ, EV_ADD, 0, 0, vwk->pipes);
-	j++;
-	AZ(kevent(vwk->kq, ke, j, NULL, 0, NULL));
+	vwk_kq_flush(vwk);
 
 	vwk->nki = 0;
 	while (1) {
@@ -175,9 +156,6 @@ vwk_thread(void *priv)
 		for (kp = ke, j = 0; j < n; j++, kp++) {
 			if (kp->filter == EVFILT_TIMER) {
 				dotimer = 1;
-			} else if (kp->filter == EVFILT_READ &&
-			    kp->udata == vwk->pipes) {
-				vwk_pipe_ev(vwk, kp);
 			} else {
 				assert(kp->filter == EVFILT_READ);
 				vwk_sess_ev(vwk, kp, now);
@@ -219,17 +197,17 @@ vwk_init(waiter_handle_f *func, volatile double *tmo)
 	AN(vwk);
 
 	INIT_OBJ(vwk->waiter, WAITER_MAGIC);
+	VTAILQ_INIT(&vwk->waiter->sesshead);
+	vwk->waiter->priv = vwk;
+	
+	EV_SET(&vwk->ki[vwk->nki], 0, EVFILT_TIMER, EV_ADD, 0, 100, NULL);
+	vwk->nki++;
+
+	WAIT_UsePipe(vwk->waiter);
 
 	vwk->func = func;
 	vwk->tmo = tmo;
 
-	VTAILQ_INIT(&vwk->waiter->sesshead);
-	AZ(pipe(vwk->pipes));
-
-	AZ(VFIL_nonblocking(vwk->pipes[0]));
-	AZ(VFIL_nonblocking(vwk->pipes[1]));
-	vwk->waiter->pfd = vwk->pipes[1];
-
 	AZ(pthread_create(&vwk->thread, NULL, vwk_thread, vwk));
 	return (vwk->waiter);
 }
@@ -239,6 +217,7 @@ vwk_init(waiter_handle_f *func, volatile double *tmo)
 const struct waiter_impl waiter_kqueue = {
 	.name =		"kqueue",
 	.init =		vwk_init,
+	.inject =	vwk_inject,
 };
 
 #endif /* defined(HAVE_KQUEUE) */
diff --git a/bin/varnishd/waiter/waiter_priv.h b/bin/varnishd/waiter/waiter_priv.h
index 9b90f1c..59691f0 100644
--- a/bin/varnishd/waiter/waiter_priv.h
+++ b/bin/varnishd/waiter/waiter_priv.h
@@ -37,6 +37,9 @@ struct waiter {
 	const struct waiter_impl	*impl;
 	waiter_handle_f *		func;
 
+	int				pipes[2];
+	struct waited			*pipe_w;
+
 	volatile double			*tmo;
 	VTAILQ_HEAD(,waited)		sesshead;
 
@@ -46,15 +49,18 @@ struct waiter {
 
 typedef struct waiter* waiter_init_f(waiter_handle_f *, volatile double *);
 typedef int waiter_pass_f(void *priv, struct waited *);
+typedef void waiter_inject_f(const struct waiter *, struct waited *);
 
 struct waiter_impl {
 	const char		*name;
 	waiter_init_f		*init;
 	waiter_pass_f		*pass;
+	waiter_inject_f		*inject;
 };
 
 /* cache_waiter.c */
 void WAIT_handle(struct waiter *, struct waited *, enum wait_event, double now);
+void WAIT_UsePipe(struct waiter *w);
 
 /* mgt_waiter.c */
 extern struct waiter_impl const * waiter;



More information about the varnish-commit mailing list