[master] 64fbc76 Bring back the kqueue waiter.
Poul-Henning Kamp
phk at FreeBSD.org
Thu May 21 18:48:51 CEST 2015
commit 64fbc76108d77a601957165317dba1163d70fa6b
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date: Thu May 21 16:41:26 2015 +0000
Bring back the kqueue waiter.
diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index 89d4cd9..1a0dc03 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -376,6 +376,7 @@ struct waited {
int fd;
void *ptr;
double idle;
+ VTAILQ_ENTRY(waited) list;
};
/* Stored object -----------------------------------------------------
diff --git a/bin/varnishd/waiter/cache_waiter_kqueue.c b/bin/varnishd/waiter/cache_waiter_kqueue.c
index c28b2de..f5452ac 100644
--- a/bin/varnishd/waiter/cache_waiter_kqueue.c
+++ b/bin/varnishd/waiter/cache_waiter_kqueue.c
@@ -26,12 +26,8 @@
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
- * XXX: We need to pass sessions back into the event engine when they are
- * reused. Not sure what the most efficient way is for that. For now
- * write the session pointer to a pipe which the event engine monitors.
*/
-#if 0
#include "config.h"
#if defined(HAVE_KQUEUE)
@@ -39,6 +35,7 @@
#include <sys/types.h>
#include <sys/event.h>
+#include <errno.h>
#include <stdlib.h>
#include "cache/cache.h"
@@ -47,95 +44,18 @@
#include "waiter/waiter_priv.h"
#include "vtim.h"
-#define NKEV 100
+#define NKEV 256
struct vwk {
unsigned magic;
#define VWK_MAGIC 0x1cc2acc2
+ int kq;
struct waiter *waiter;
-
pthread_t thread;
- int kq;
- struct kevent ki[NKEV];
- unsigned nki;
-};
-
-/*--------------------------------------------------------------------*/
-
-static void
-vwk_kq_flush(struct vwk *vwk)
-{
- int i;
-
- if (vwk->nki == 0)
- return;
- i = kevent(vwk->kq, vwk->ki, vwk->nki, NULL, 0, NULL);
- AZ(i);
- vwk->nki = 0;
-}
-
-static void
-vwk_kq_sess(struct vwk *vwk, struct waited *sp, short arm)
-{
-
- CHECK_OBJ_NOTNULL(sp, WAITED_MAGIC);
- assert(sp->fd >= 0);
- EV_SET(&vwk->ki[vwk->nki], sp->fd, EVFILT_READ, arm, 0, 0, sp);
- if (++vwk->nki == NKEV)
- vwk_kq_flush(vwk);
-}
-
-/*--------------------------------------------------------------------*/
-static void
-vwk_inject(const struct waiter *w, struct waited *wp)
-{
- struct vwk *vwk;
-
- CAST_OBJ_NOTNULL(vwk, w->priv, VWK_MAGIC);
- if (wp == w->pipe_w)
- vwk_kq_sess(vwk, wp, EV_ADD);
- else
- vwk_kq_sess(vwk, wp, EV_ADD | EV_ONESHOT);
-}
-
-#if 0
-static void
-vwk_evict(const struct waiter *w, struct waited *wp)
-{
- struct vwk *vwk;
-
- CAST_OBJ_NOTNULL(vwk, w->priv, VWK_MAGIC);
- vwk_kq_sess(vwk, wp, EV_DELETE);
-}
-#endif
-
-/*--------------------------------------------------------------------*/
-
-static void
-vwk_sess_ev(const struct vwk *vwk, const struct kevent *kp, double now)
-{
- struct waited *sp;
- double idle;
-
- AN(kp->udata);
- CAST_OBJ_NOTNULL(sp, kp->udata, WAITED_MAGIC);
-
- idle = now - *vwk->waiter->tmo;
-
- if (sp->idle <= idle) {
- Wait_Handle(vwk->waiter, sp, WAITER_TIMEOUT, now);
- } else if (kp->flags & EV_EOF) {
- Wait_Handle(vwk->waiter, sp, WAITER_REMCLOSE, now);
- } else {
- if (kp->data == 0)
- VSL(SLT_Debug, 0,
- "KQR d %ju filter %d data %jd flags 0x%x idle %g",
- (uintmax_t)kp->ident, kp->filter,
- (intmax_t)kp->data, kp->flags, sp->idle - idle);
- Wait_Handle(vwk->waiter, sp, WAITER_ACTION, now);
- }
-}
+ VTAILQ_HEAD(,waited) list;
+ struct lock mtx;
+};
/*--------------------------------------------------------------------*/
@@ -145,30 +65,77 @@ vwk_thread(void *priv)
struct vwk *vwk;
struct kevent ke[NKEV], *kp;
int j, n;
- double now;
+ double now, idle, last_idle;
+ struct timespec ts;
+ struct waited *wp, *wp2;
CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
THR_SetName("cache-kqueue");
- vwk_kq_flush(vwk);
-
- vwk->nki = 0;
- while (!vwk->waiter->dismantle) {
- n = kevent(vwk->kq, vwk->ki, vwk->nki, ke, NKEV, NULL);
+ last_idle = 0.0;
+ while (1) {
+ now = .3 * *vwk->waiter->tmo;
+ ts.tv_sec = (time_t)floor(now);
+ ts.tv_nsec = (long)(1e9 * (now - ts.tv_sec));
+ n = kevent(vwk->kq, NULL, 0, ke, NKEV, &ts);
+ if (n < 0 && errno == EBADF)
+ break;
assert(n <= NKEV);
- if (n == 0) {
- /* This happens on OSX in m00011.vtc */
- (void)usleep(10000);
- }
- vwk->nki = 0;
now = VTIM_real();
+ idle = now - *vwk->waiter->tmo;
for (kp = ke, j = 0; j < n; j++, kp++) {
assert(kp->filter == EVFILT_READ);
- vwk_sess_ev(vwk, kp, now);
+ CAST_OBJ_NOTNULL(wp, ke[j].udata, WAITED_MAGIC);
+ Lck_Lock(&vwk->mtx);
+ VTAILQ_REMOVE(&vwk->list, wp, list);
+ Lck_Unlock(&vwk->mtx);
+ if (wp->idle <= idle)
+ vwk->waiter->func(wp, WAITER_TIMEOUT, now);
+ else if (kp->flags & EV_EOF)
+ vwk->waiter->func(wp, WAITER_REMCLOSE, now);
+ else
+ vwk->waiter->func(wp, WAITER_ACTION, now);
+ }
+ if (now - last_idle > .3 * *vwk->waiter->tmo) {
+ last_idle = now;
+ n = 0;
+ Lck_Lock(&vwk->mtx);
+ VTAILQ_FOREACH_SAFE(wp, &vwk->list, list, wp2) {
+ if (wp->idle > idle)
+ continue;
+ EV_SET(ke + n, wp->fd,
+ EVFILT_READ, EV_DELETE, 0, 0, wp);
+ if (++n == NKEV)
+ break;
+ }
+ if (n > 0)
+ AZ(kevent(vwk->kq, ke, n, NULL, 0, NULL));
+ for (j = 0; j < n; j++) {
+ CAST_OBJ_NOTNULL(wp, ke[j].udata, WAITED_MAGIC);
+ VTAILQ_REMOVE(&vwk->list, wp, list);
+ vwk->waiter->func(wp, WAITER_TIMEOUT, now);
+ }
+ Lck_Unlock(&vwk->mtx);
}
- Wait_Handle(vwk->waiter, NULL, WAITER_ACTION, now);
}
- NEEDLESS_RETURN(NULL);
+ return(NULL);
+}
+
+/*--------------------------------------------------------------------*/
+
+static int __match_proto__(waiter_enter_f)
+vwk_enter(void *priv, struct waited *wp)
+{
+ struct vwk *vwk;
+ struct kevent ke;
+
+ CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
+ EV_SET(&ke, wp->fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, wp);
+ Lck_Lock(&vwk->mtx);
+ VTAILQ_INSERT_TAIL(&vwk->list, wp, list);
+ AZ(kevent(vwk->kq, &ke, 1, NULL, 0, NULL));
+ Lck_Unlock(&vwk->mtx);
+ return(0);
}
/*--------------------------------------------------------------------*/
@@ -185,13 +152,16 @@ vwk_init(struct waiter *w)
vwk->kq = kqueue();
assert(vwk->kq >= 0);
-
- Wait_UsePipe(w);
+ VTAILQ_INIT(&vwk->list);
+ Lck_New(&vwk->mtx, lck_misc);
AZ(pthread_create(&vwk->thread, NULL, vwk_thread, vwk));
}
-/*--------------------------------------------------------------------*/
+/*--------------------------------------------------------------------
+ * It is the caller's responsibility to trigger all fds waited on to
+ * fail somehow.
+ */
static void __match_proto__(waiter_fini_f)
vwk_fini(struct waiter *w)
@@ -200,8 +170,16 @@ vwk_fini(struct waiter *w)
void *vp;
CAST_OBJ_NOTNULL(vwk, w->priv, VWK_MAGIC);
- AZ(pthread_join(vwk->thread, &vp));
+ Lck_Lock(&vwk->mtx);
+ while (!VTAILQ_EMPTY(&vwk->list)) {
+ Lck_Unlock(&vwk->mtx);
+ (void)usleep(100000);
+ Lck_Lock(&vwk->mtx);
+ }
AZ(close(vwk->kq));
+ vwk->kq = -1;
+ Lck_Unlock(&vwk->mtx);
+ AZ(pthread_join(vwk->thread, &vp));
}
/*--------------------------------------------------------------------*/
@@ -210,9 +188,8 @@ const struct waiter_impl waiter_kqueue = {
.name = "kqueue",
.init = vwk_init,
.fini = vwk_fini,
- .inject = vwk_inject,
+ .enter = vwk_enter,
.size = sizeof(struct vwk),
};
#endif /* defined(HAVE_KQUEUE) */
-#endif
diff --git a/bin/varnishd/waiter/mgt_waiter.c b/bin/varnishd/waiter/mgt_waiter.c
index a97425a..52a380f 100644
--- a/bin/varnishd/waiter/mgt_waiter.c
+++ b/bin/varnishd/waiter/mgt_waiter.c
@@ -39,10 +39,10 @@
#include "waiter/waiter_priv.h"
static const struct waiter_impl *const waiter_impls[] = {
-#if 0
#if defined(HAVE_KQUEUE)
&waiter_kqueue,
#endif
+#if 0
#if defined(HAVE_EPOLL_CTL)
&waiter_epoll,
#endif
More information about the varnish-commit
mailing list