[master] 3dc7838 Start generalizing the pipe-passing stuff for waiters
Poul-Henning Kamp
phk at FreeBSD.org
Wed Jan 14 11:33:14 CET 2015
commit 3dc7838f6fa944233e8a3e101839fed21f5b0335
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date: Wed Jan 14 10:32:57 2015 +0000
Start generalizing the pipe-passing stuff for waiters
diff --git a/bin/varnishd/waiter/cache_waiter.c b/bin/varnishd/waiter/cache_waiter.c
index 5f22f38..78e901c 100644
--- a/bin/varnishd/waiter/cache_waiter.c
+++ b/bin/varnishd/waiter/cache_waiter.c
@@ -37,9 +37,13 @@
#include "cache/cache.h"
+#include "vfil.h"
+
#include "waiter/waiter.h"
#include "waiter/waiter_priv.h"
+#define NEV 8192
+
const char *
WAIT_GetName(void)
{
@@ -66,6 +70,22 @@ WAIT_Init(waiter_handle_f *func, volatile double *tmo)
return (w);
}
+/*
+ * Give waiter `w` a pipe and hand the pipe's read end to the waiter
+ * implementation as a waited object (w->pipe_w).
+ *
+ * Both pipe ends are passed through VFIL_nonblocking().  The write end
+ * is published as w->pfd; WAIT_handle() reads `struct waited` pointers
+ * from the read end when w->pipe_w is reported.
+ */
+void
+WAIT_UsePipe(struct waiter *w)
+{
+	CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
+
+	/*
+	 * NOTE(review): this uses the global `waiter` implementation
+	 * pointer rather than w->impl -- presumably because w->impl is
+	 * not yet assigned when an init function calls us; confirm.
+	 */
+	AN(waiter->inject);
+	AZ(pipe(w->pipes));
+	AZ(VFIL_nonblocking(w->pipes[0]));
+	AZ(VFIL_nonblocking(w->pipes[1]));
+	w->pfd = w->pipes[1];
+	ALLOC_OBJ(w->pipe_w, WAITED_MAGIC);
+	/* ALLOC_OBJ can leave the pointer NULL; fail loudly, not by deref */
+	AN(w->pipe_w);
+	w->pipe_w->fd = w->pipes[0];
+	/* presumably "never times out" -- TODO confirm deadline semantics */
+	w->pipe_w->deadline = 9e99;
+	waiter->inject(w, w->pipe_w);
+}
+
int
WAIT_Enter(const struct waiter *w, struct waited *wp)
{
@@ -89,9 +109,26 @@ WAIT_Enter(const struct waiter *w, struct waited *wp)
+/*
+ * Dispatch one event reported by a waiter implementation.
+ *
+ * If `wp` is the waiter's own pipe object, drain the pipe: every
+ * `struct waited` pointer read from it is appended to w->sesshead and
+ * handed to the implementation's inject function.  `ev` and `now` are
+ * not used on that path.
+ *
+ * Otherwise `wp` is removed from w->sesshead and passed to the
+ * waiter's handler function together with `ev` and `now`.
+ */
void
WAIT_handle(struct waiter *w, struct waited *wp, enum wait_event ev, double now)
{
+	struct waited *ss[NEV];
+	size_t j, n;
+	int i;
+
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
+	if (wp == w->pipe_w) {
+		/* Retry if a signal interrupted the read. */
+		do {
+			i = read(w->pipes[0], ss, sizeof ss);
+		} while (i < 0 && errno == EINTR);
+		if (i < 0) {
+			/*
+			 * An empty non-blocking pipe is the only expected
+			 * failure.  Anything else must not fall through:
+			 * a negative count compared against sizeof would
+			 * be converted to a huge unsigned value and the
+			 * loop would walk uninitialized entries of ss[].
+			 */
+			assert(errno == EAGAIN);
+			return;
+		}
+		/* Account for the bytes in an unsigned type from here on. */
+		n = (size_t)i;
+		for (j = 0; n >= sizeof ss[0]; j++, n -= sizeof ss[0]) {
+			CHECK_OBJ_NOTNULL(ss[j], WAITED_MAGIC);
+			assert(ss[j]->fd >= 0);
+			VTAILQ_INSERT_TAIL(&w->sesshead, ss[j], list);
+			w->impl->inject(w, ss[j]);
+		}
+		/* A partial pointer would mean the stream is out of sync. */
+		AZ(n);
+		return;
+	}
+
VTAILQ_REMOVE(&w->sesshead, wp, list);
w->func(wp, ev, now);
}
diff --git a/bin/varnishd/waiter/cache_waiter_kqueue.c b/bin/varnishd/waiter/cache_waiter_kqueue.c
index d5017a4..c721ab7 100644
--- a/bin/varnishd/waiter/cache_waiter_kqueue.c
+++ b/bin/varnishd/waiter/cache_waiter_kqueue.c
@@ -46,7 +46,6 @@
#include "waiter/waiter.h"
#include "waiter/waiter_priv.h"
#include "vtim.h"
-#include "vfil.h"
#define NKEV 100
@@ -58,7 +57,6 @@ struct vwk {
waiter_handle_f *func;
volatile double *tmo;
pthread_t thread;
- int pipes[2];
int kq;
struct kevent ki[NKEV];
unsigned nki;
@@ -92,26 +90,15 @@ vwk_kq_sess(struct vwk *vwk, struct waited *sp, short arm)
/*--------------------------------------------------------------------*/
+/*
+ * Inject callback of the kqueue waiter: register `wp` with the kqueue
+ * of the struct vwk found in w->priv, by way of vwk_kq_sess().
+ */
static void
-vwk_pipe_ev(struct vwk *vwk, const struct kevent *kp)
+vwk_inject(const struct waiter *w, struct waited *wp)
{
- int i, j;
- struct waited *ss[NKEV];
+ struct vwk *vwk;
- AN(kp->udata);
- assert(kp->udata == vwk->pipes);
- j = 0;
- i = read(vwk->pipes[0], ss, sizeof ss);
- if (i == -1 && errno == EAGAIN)
- return;
- while (i >= sizeof ss[0]) {
- CHECK_OBJ_NOTNULL(ss[j], WAITED_MAGIC);
- assert(ss[j]->fd >= 0);
- VTAILQ_INSERT_TAIL(&vwk->waiter->sesshead, ss[j], list);
- vwk_kq_sess(vwk, ss[j], EV_ADD | EV_ONESHOT);
- j++;
- i -= sizeof ss[0];
- }
- AZ(i);
+ CAST_OBJ_NOTNULL(vwk, w->priv, VWK_MAGIC);
+ /*
+  * The waiter's own pipe object is added without EV_ONESHOT; every
+  * other object is added with it.  Presumably this keeps the pipe
+  * registered across events while sessions fire once -- confirm
+  * against vwk_kq_sess().
+  */
+ if (wp == w->pipe_w)
+ vwk_kq_sess(vwk, wp, EV_ADD);
+ else
+ vwk_kq_sess(vwk, wp, EV_ADD | EV_ONESHOT);
}
/*--------------------------------------------------------------------*/
@@ -122,7 +109,6 @@ vwk_sess_ev(struct vwk *vwk, const struct kevent *kp, double now)
struct waited *sp;
AN(kp->udata);
- assert(kp->udata != vwk->pipes);
CAST_OBJ_NOTNULL(sp, kp->udata, WAITED_MAGIC);
if (kp->data > 0) {
@@ -153,12 +139,7 @@ vwk_thread(void *priv)
vwk->kq = kqueue();
assert(vwk->kq >= 0);
- j = 0;
- EV_SET(&ke[j], 0, EVFILT_TIMER, EV_ADD, 0, 100, NULL);
- j++;
- EV_SET(&ke[j], vwk->pipes[0], EVFILT_READ, EV_ADD, 0, 0, vwk->pipes);
- j++;
- AZ(kevent(vwk->kq, ke, j, NULL, 0, NULL));
+ vwk_kq_flush(vwk);
vwk->nki = 0;
while (1) {
@@ -175,9 +156,6 @@ vwk_thread(void *priv)
for (kp = ke, j = 0; j < n; j++, kp++) {
if (kp->filter == EVFILT_TIMER) {
dotimer = 1;
- } else if (kp->filter == EVFILT_READ &&
- kp->udata == vwk->pipes) {
- vwk_pipe_ev(vwk, kp);
} else {
assert(kp->filter == EVFILT_READ);
vwk_sess_ev(vwk, kp, now);
@@ -219,17 +197,17 @@ vwk_init(waiter_handle_f *func, volatile double *tmo)
AN(vwk);
INIT_OBJ(vwk->waiter, WAITER_MAGIC);
+ VTAILQ_INIT(&vwk->waiter->sesshead);
+ vwk->waiter->priv = vwk;
+
+ EV_SET(&vwk->ki[vwk->nki], 0, EVFILT_TIMER, EV_ADD, 0, 100, NULL);
+ vwk->nki++;
+
+ WAIT_UsePipe(vwk->waiter);
vwk->func = func;
vwk->tmo = tmo;
- VTAILQ_INIT(&vwk->waiter->sesshead);
- AZ(pipe(vwk->pipes));
-
- AZ(VFIL_nonblocking(vwk->pipes[0]));
- AZ(VFIL_nonblocking(vwk->pipes[1]));
- vwk->waiter->pfd = vwk->pipes[1];
-
AZ(pthread_create(&vwk->thread, NULL, vwk_thread, vwk));
return (vwk->waiter);
}
@@ -239,6 +217,7 @@ vwk_init(waiter_handle_f *func, volatile double *tmo)
+/*
+ * Implementation descriptor for the kqueue waiter.  Only name, init and
+ * inject are set here; the pass member is left zero-initialized.
+ */
const struct waiter_impl waiter_kqueue = {
.name = "kqueue",
.init = vwk_init,
+ .inject = vwk_inject,
};
#endif /* defined(HAVE_KQUEUE) */
diff --git a/bin/varnishd/waiter/waiter_priv.h b/bin/varnishd/waiter/waiter_priv.h
index 9b90f1c..59691f0 100644
--- a/bin/varnishd/waiter/waiter_priv.h
+++ b/bin/varnishd/waiter/waiter_priv.h
@@ -37,6 +37,9 @@ struct waiter {
const struct waiter_impl *impl;
waiter_handle_f * func;
+ int pipes[2];
+ struct waited *pipe_w;
+
volatile double *tmo;
VTAILQ_HEAD(,waited) sesshead;
@@ -46,15 +49,18 @@ struct waiter {
typedef struct waiter* waiter_init_f(waiter_handle_f *, volatile double *);
typedef int waiter_pass_f(void *priv, struct waited *);
+/*
+ * Inject callback: called with a waiter and a waited object.
+ * WAIT_UsePipe() calls it for the pipe object and WAIT_handle() calls
+ * it for every waited pointer read from the pipe.
+ */
+typedef void waiter_inject_f(const struct waiter *, struct waited *);
struct waiter_impl {
const char *name;
waiter_init_f *init;
waiter_pass_f *pass;
+ /* Required (asserted non-NULL) by WAIT_UsePipe() */
+ waiter_inject_f *inject;
};
/* cache_waiter.c */
void WAIT_handle(struct waiter *, struct waited *, enum wait_event, double now);
+void WAIT_UsePipe(struct waiter *w);
/* mgt_waiter.c */
extern struct waiter_impl const * waiter;
More information about the varnish-commit
mailing list