[master] 0f03834 Give the kqueue waiter a binheap to track timeouts.
Poul-Henning Kamp
phk at FreeBSD.org
Wed May 27 23:38:32 CEST 2015
commit 0f038349f7532249c56ccf12850b5ec0bd9dcd29
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date: Wed May 27 09:08:19 2015 +0000
Give the kqueue waiter a binheap to track timeouts.
diff --git a/bin/varnishd/waiter/cache_waiter.c b/bin/varnishd/waiter/cache_waiter.c
index 46957be..d3a33ca 100644
--- a/bin/varnishd/waiter/cache_waiter.c
+++ b/bin/varnishd/waiter/cache_waiter.c
@@ -34,11 +34,80 @@
#include <stdio.h>
#include <stdlib.h>
+#include "binary_heap.h"
+
#include "cache/cache.h"
#include "waiter/waiter_priv.h"
#include "waiter/mgt_waiter.h"
+static int __match_proto__(binheap_cmp_t) /* comparison callback handed to binheap_new() in Waiter_New() */
+waited_cmp(void *priv, const void *a, const void *b)
+{
+ const struct waiter *ww;
+ const struct waited *aa, *bb;
+
+ CAST_OBJ_NOTNULL(ww, priv, WAITER_MAGIC); /* priv must be a struct waiter */
+ CAST_OBJ_NOTNULL(aa, a, WAITED_MAGIC); /* a and b must be struct waited */
+ CAST_OBJ_NOTNULL(bb, b, WAITED_MAGIC);
+ /* Nonzero when a's deadline (idle + Wait_Tmo) is strictly earlier than b's. */
+ return (aa->idle + Wait_Tmo(ww, aa) < bb->idle + Wait_Tmo(ww, bb));
+}
+
+static void __match_proto__(binheap_update_t) /* update callback handed to binheap_new() in Waiter_New() */
+waited_update(void *priv, void *p, unsigned u)
+{
+ struct waited *pp;
+
+ (void)priv; /* not needed here */
+ CAST_OBJ_NOTNULL(pp, p, WAITED_MAGIC);
+ pp->idx = u; /* stored so Wait_Call() can hand it to binheap_delete() */
+}
+
+/**********************************************************************/
+
+void /* Hand event ev for wp to the waitfor callback, taking wp off the heap first if it is on it. */
+Wait_Call(const struct waiter *w,
+ struct waited *wp, enum wait_event ev, double now)
+{
+ CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
+ CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
+ CHECK_OBJ_NOTNULL(w->waitfor, WAITFOR_MAGIC);
+ AN(w->waitfor->func);
+ if (wp->idx != BINHEAP_NOIDX) /* idx other than BINHEAP_NOIDX means wp is on the heap */
+ binheap_delete(w->heap, wp->idx);
+ assert(wp->idx == BINHEAP_NOIDX); /* expects the delete to have reset idx, presumably via waited_update() -- TODO confirm */
+ w->waitfor->func(wp, ev, now);
+}
+
+/**********************************************************************/
+
+void /* Insert wp into the waiter's heap. */
+Wait_HeapInsert(const struct waiter *w, struct waited *wp)
+{
+ CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
+ CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
+ assert(wp->idx == BINHEAP_NOIDX); /* wp must not already be on the heap; Wait_Enter() sets idx to BINHEAP_NOIDX */
+ binheap_insert(w->heap, wp);
+}
+
+double /* Store the heap root in *wpp and return its deadline; with no root, store NULL and return 0. */
+Wait_HeapDue(const struct waiter *w, struct waited **wpp)
+{
+ struct waited *wp;
+
+ wp = binheap_root(w->heap); /* NOTE(review): w and wpp are not checked here, unlike Wait_HeapInsert() */
+ CHECK_OBJ_ORNULL(wp, WAITED_MAGIC);
+ if (wp == NULL) { /* binheap_root() returned NULL */
+ *wpp = NULL;
+ return (0);
+ }
+ *wpp = wp;
+ return(wp->idle + Wait_Tmo(w, wp)); /* deadline = idle + timeout */
+}
+
+/**********************************************************************/
+
int
Wait_Enter(const struct waiter *w, struct waited *wp)
{
@@ -46,6 +115,7 @@ Wait_Enter(const struct waiter *w, struct waited *wp)
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
assert(wp->fd > 0); // stdin never comes here
+ wp->idx = BINHEAP_NOIDX;
return (w->impl->enter(w->priv, wp));
}
@@ -79,6 +149,7 @@ Waiter_New(struct waitfor *wf)
w->impl = waiter;
w->waitfor = wf;
VTAILQ_INIT(&w->waithead);
+ w->heap = binheap_new(w, waited_cmp, waited_update);
waiter->init(w);
@@ -95,6 +166,7 @@ Waiter_Destroy(struct waiter **wp)
*wp = NULL;
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
+ AZ(binheap_root(w->heap));
AN(w->impl->fini);
w->impl->fini(w);
FREE_OBJ(w);
diff --git a/bin/varnishd/waiter/cache_waiter_kqueue.c b/bin/varnishd/waiter/cache_waiter_kqueue.c
index a6d4c8c..f8af96e 100644
--- a/bin/varnishd/waiter/cache_waiter_kqueue.c
+++ b/bin/varnishd/waiter/cache_waiter_kqueue.c
@@ -52,6 +52,8 @@ struct vwk {
int kq;
struct waiter *waiter;
pthread_t thread;
+ double next;
+ int pipe[2];
VTAILQ_HEAD(,waited) list;
struct lock mtx;
@@ -66,58 +68,67 @@ vwk_thread(void *priv)
struct vwk *vwk;
struct kevent ke[NKEV], *kp;
int j, n;
- double now, idle, last_idle;
+ double now, then;
struct timespec ts;
- struct waited *wp, *wp2;
+ struct waited *wp;
+ struct waiter *w;
+ char c;
CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
+ w = vwk->waiter;
+ CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
THR_SetName("cache-kqueue");
- last_idle = 0.0;
+ now = VTIM_real();
+ Lck_Lock(&vwk->mtx);
while (1) {
- now = .3 * Wait_Tmo(vwk->waiter, NULL);
- ts.tv_sec = (time_t)floor(now);
- ts.tv_nsec = (long)(1e9 * (now - ts.tv_sec));
+ while (1) {
+ /*
+ * XXX: We could avoid many syscalls here if we were
+ * XXX: allowed to just close the fd's on timeout.
+ */
+ then = Wait_HeapDue(w, &wp);
+ if (wp == NULL) {
+ vwk->next = now + 100;
+ break;
+ } else if (then > now) {
+ vwk->next = then;
+ break;
+ }
+ CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
+ EV_SET(ke, wp->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+ AZ(kevent(vwk->kq, ke, 1, NULL, 0, NULL));
+ Wait_Call(w, wp, WAITER_TIMEOUT, now);
+ }
+ then = vwk->next - now;
+ ts.tv_sec = (time_t)floor(then);
+ ts.tv_nsec = (long)(1e9 * (then - ts.tv_sec));
+ Lck_Unlock(&vwk->mtx);
n = kevent(vwk->kq, NULL, 0, ke, NKEV, &ts);
- if (n < 0 && vwk->die)
- break;
assert(n >= 0);
assert(n <= NKEV);
now = VTIM_real();
+ Lck_Lock(&vwk->mtx);
for (kp = ke, j = 0; j < n; j++, kp++) {
assert(kp->filter == EVFILT_READ);
+ if (ke[j].udata == vwk) {
+ assert(read(vwk->pipe[0], &c, 1) == 1);
+ continue;
+ }
CAST_OBJ_NOTNULL(wp, ke[j].udata, WAITED_MAGIC);
- Lck_Lock(&vwk->mtx);
VTAILQ_REMOVE(&vwk->list, wp, list);
- Lck_Unlock(&vwk->mtx);
if (kp->flags & EV_EOF)
- Wait_Call(vwk->waiter, wp, WAITER_REMCLOSE, now);
+ Wait_Call(w, wp, WAITER_REMCLOSE, now);
else
- Wait_Call(vwk->waiter, wp, WAITER_ACTION, now);
- }
- idle = now - Wait_Tmo(vwk->waiter, NULL);
- if (now - last_idle < .3 * Wait_Tmo(vwk->waiter, NULL))
- continue;
- last_idle = now;
- n = 0;
- Lck_Lock(&vwk->mtx);
- VTAILQ_FOREACH_SAFE(wp, &vwk->list, list, wp2) {
- if (wp->idle > idle)
- continue;
- EV_SET(ke + n, wp->fd,
- EVFILT_READ, EV_DELETE, 0, 0, wp);
- if (++n == NKEV)
- break;
+ Wait_Call(w, wp, WAITER_ACTION, now);
}
- if (n > 0)
- AZ(kevent(vwk->kq, ke, n, NULL, 0, NULL));
- for (j = 0; j < n; j++) {
- CAST_OBJ_NOTNULL(wp, ke[j].udata, WAITED_MAGIC);
- VTAILQ_REMOVE(&vwk->list, wp, list);
- Wait_Call(vwk->waiter, wp, WAITER_TIMEOUT, now);
- }
- Lck_Unlock(&vwk->mtx);
+ if (VTAILQ_EMPTY(&vwk->list) && vwk->die)
+ break;
}
+ Lck_Unlock(&vwk->mtx);
+ AZ(close(vwk->pipe[0]));
+ AZ(close(vwk->pipe[1]));
+ AZ(close(vwk->kq));
return(NULL);
}
@@ -133,7 +144,13 @@ vwk_enter(void *priv, struct waited *wp)
EV_SET(&ke, wp->fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, wp);
Lck_Lock(&vwk->mtx);
VTAILQ_INSERT_TAIL(&vwk->list, wp, list);
+ Wait_HeapInsert(vwk->waiter, wp);
AZ(kevent(vwk->kq, &ke, 1, NULL, 0, NULL));
+
+ /* If the kqueue isn't due before our timeout, poke it via the pipe */
+ if (Wait_Tmo(vwk->waiter, wp) < vwk->next)
+ assert(write(vwk->pipe[1], "X", 1) == 1);
+
Lck_Unlock(&vwk->mtx);
return(0);
}
@@ -144,6 +161,7 @@ static void __match_proto__(waiter_init_f)
vwk_init(struct waiter *w)
{
struct vwk *vwk;
+ struct kevent ke;
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
vwk = w->priv;
@@ -154,6 +172,9 @@ vwk_init(struct waiter *w)
assert(vwk->kq >= 0);
VTAILQ_INIT(&vwk->list);
Lck_New(&vwk->mtx, lck_misc);
+ AZ(pipe(vwk->pipe));
+ EV_SET(&ke, vwk->pipe[0], EVFILT_READ, EV_ADD, 0, 0, vwk);
+ AZ(kevent(vwk->kq, &ke, 1, NULL, 0, NULL));
AZ(pthread_create(&vwk->thread, NULL, vwk_thread, vwk));
}
@@ -168,19 +189,11 @@ vwk_fini(struct waiter *w)
{
struct vwk *vwk;
void *vp;
- int i;
CAST_OBJ_NOTNULL(vwk, w->priv, VWK_MAGIC);
Lck_Lock(&vwk->mtx);
- while (!VTAILQ_EMPTY(&vwk->list)) {
- Lck_Unlock(&vwk->mtx);
- (void)usleep(100000);
- Lck_Lock(&vwk->mtx);
- }
vwk->die = 1;
- i = vwk->kq;
- vwk->kq = -1;
- AZ(close(i));
+ assert(write(vwk->pipe[1], "Y", 1) == 1);
Lck_Unlock(&vwk->mtx);
AZ(pthread_join(vwk->thread, &vp));
Lck_Delete(&vwk->mtx);
diff --git a/bin/varnishd/waiter/waiter.h b/bin/varnishd/waiter/waiter.h
index 0e70d9e..7227fcd 100644
--- a/bin/varnishd/waiter/waiter.h
+++ b/bin/varnishd/waiter/waiter.h
@@ -65,6 +65,7 @@ struct waited {
unsigned magic;
#define WAITED_MAGIC 0x1743992d
int fd;
+ unsigned idx;
void *ptr;
double idle;
VTAILQ_ENTRY(waited) list;
diff --git a/bin/varnishd/waiter/waiter_priv.h b/bin/varnishd/waiter/waiter_priv.h
index b4d9078..5215995 100644
--- a/bin/varnishd/waiter/waiter_priv.h
+++ b/bin/varnishd/waiter/waiter_priv.h
@@ -30,6 +30,7 @@
*/
struct waited;
+struct binheap;
struct waiter {
unsigned magic;
@@ -41,6 +42,7 @@ struct waiter {
struct waitfor *waitfor;
void *priv;
+ struct binheap *heap;
};
typedef void waiter_init_f(struct waiter *);
@@ -68,13 +70,7 @@ Wait_Tmo(const struct waiter *w, const struct waited *wp)
return (*w->waitfor->tmo);
}
-static inline void
-Wait_Call(const struct waiter *w,
- struct waited *wp, enum wait_event ev, double now)
-{
- CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
- CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
- CHECK_OBJ_NOTNULL(w->waitfor, WAITFOR_MAGIC);
- AN(w->waitfor->func);
- w->waitfor->func(wp, ev, now);
-}
+void Wait_Call(const struct waiter *, struct waited *, /* no longer inline; defined in cache_waiter.c */
+ enum wait_event ev, double now);
+void Wait_HeapInsert(const struct waiter *, struct waited *); /* inserts the waited object into the waiter's heap */
+double Wait_HeapDue(const struct waiter *, struct waited **); /* returns the heap root's idle + timeout, or 0 if there is none */
More information about the varnish-commit
mailing list