[master] ef6918d Attempt to implement waiter destruction in kqueue and poll waiters.
Poul-Henning Kamp
phk at FreeBSD.org
Wed Jan 21 10:35:26 CET 2015
commit ef6918d6ece269e10fc65a242afe8c17f272fc24
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date: Wed Jan 21 09:35:04 2015 +0000
Attempt to implement waiter destruction in kqueue and poll waiters.
diff --git a/bin/varnishd/cache/cache_session.c b/bin/varnishd/cache/cache_session.c
index 822c0a2..96ae0a3 100644
--- a/bin/varnishd/cache/cache_session.c
+++ b/bin/varnishd/cache/cache_session.c
@@ -272,6 +272,8 @@ ses_handle(struct waited *wp, enum wait_event ev, double now)
if (Pool_Task(pp->pool, &sp->task, POOL_QUEUE_FRONT))
SES_Delete(sp, SC_OVERLOAD, now);
break;
+ case WAITER_CLOSE:
+ WRONG("Should not see WAITER_CLOSE on client side");
default:
WRONG("Wrong event in ses_handle");
}
diff --git a/bin/varnishd/waiter/cache_waiter.c b/bin/varnishd/waiter/cache_waiter.c
index 1629625..db0ef8a 100644
--- a/bin/varnishd/waiter/cache_waiter.c
+++ b/bin/varnishd/waiter/cache_waiter.c
@@ -124,13 +124,42 @@ void
Wait_Destroy(struct waiter **wp)
{
struct waiter *w;
+ struct waited *wx = NULL; /* starts NULL: its value is what gets written to the pipe below */
+ int written;
+ double now;
AN(wp);
w = *wp;
*wp = NULL;
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
+
+ Lck_Lock(&wait_mtx);
+ VTAILQ_REMOVE(&waiters, w, list);
+ w->dismantle = 1; /* waiter threads loop on !dismantle; Wait_Enter() asserts it is clear */
+ Lck_Unlock(&wait_mtx);
+
+ if (w->pipes[1] >= 0) {
+ while (1) {
+ written = write(w->pipes[1], &wx, sizeof wx); /* send a NULL pointer; Wait_Handle() accepts NULL only while dismantling */
+ if (written == sizeof wx)
+ break;
+ (void)usleep(10000); /* retry after 10 ms until the whole pointer has been written */
+ }
+ }
AN(w->impl->fini);
w->impl->fini(w);
+ now = VTIM_real();
+ while (1) { /* drain whatever is still queued on waithead */
+ wx = VTAILQ_FIRST(&w->waithead);
+ if (wx == NULL)
+ break;
+ VTAILQ_REMOVE(&w->waithead, wx, list);
+ if (wx == w->pipe_w)
+ FREE_OBJ(wx); /* the pipe_w entry is freed directly, not reported */
+ else
+ w->func(wx, WAITER_CLOSE, now); /* every other entry goes to the callback as WAITER_CLOSE */
+ }
+ FREE_OBJ(w);
}
void
@@ -157,6 +186,7 @@ Wait_Enter(const struct waiter *w, struct waited *wp)
CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
assert(wp->fd >= 0);
+ AZ(w->dismantle);
if (w->impl->pass != NULL)
return (w->impl->pass(w->priv, wp));
@@ -218,11 +248,13 @@ Wait_Handle(struct waiter *w, struct waited *wp, enum wait_event ev, double now)
CHECK_OBJ_NOTNULL(ss[j], WAITED_MAGIC);
if (ss[j] == w->pipe_w) {
dotimer = 1;
- continue;
+ } else if (ss[j] == NULL) {
+ AN(w->dismantle);
+ } else {
+ assert(ss[j]->fd >= 0);
+ VTAILQ_INSERT_TAIL(&w->waithead, ss[j], list);
+ w->impl->inject(w, ss[j]);
}
- assert(ss[j]->fd >= 0);
- VTAILQ_INSERT_TAIL(&w->waithead, ss[j], list);
- w->impl->inject(w, ss[j]);
}
AZ(i);
diff --git a/bin/varnishd/waiter/cache_waiter_epoll.c b/bin/varnishd/waiter/cache_waiter_epoll.c
index 9bb7295..7ded624 100644
--- a/bin/varnishd/waiter/cache_waiter_epoll.c
+++ b/bin/varnishd/waiter/cache_waiter_epoll.c
@@ -57,7 +57,7 @@ struct vwe {
#define VWE_MAGIC 0x6bd73424
struct waiter *waiter;
- pthread_t epoll_thread;
+ pthread_t thread;
int epfd;
};
@@ -113,7 +113,7 @@ vwe_thread(void *priv)
THR_SetName("cache-epoll");
- while (1) {
+ while (!vwe->waiter->dismantle) { /* leave the loop once Wait_Destroy() sets dismantle */
n = epoll_wait(vwe->epfd, ev, NEEV, -1);
now = VTIM_real();
for (ep = ev, i = 0; i < n; i++, ep++)
@@ -139,7 +139,21 @@ vwe_init(struct waiter *w)
Wait_UsePipe(w);
- AZ(pthread_create(&vwe->epoll_thread, NULL, vwe_thread, vwe));
+ AZ(pthread_create(&vwe->thread, NULL, vwe_thread, vwe));
+}
+
+/*--------------------------------------------------------------------*/
+
+static void __match_proto__(waiter_fini_f)
+vwe_fini(struct waiter *w)
+{
+ struct vwe *vwe;
+ void *vp;
+
+ CAST_OBJ_NOTNULL(vwe, w->priv, VWE_MAGIC);
+
+ AZ(pthread_join(vwe->thread, &vp)); /* wait for vwe_thread, which loops until w->dismantle is set */
+ WRONG("Not Yet Implemented"); /* NOTE(review): no epoll cleanup yet; epfd is not closed here */
}
/*--------------------------------------------------------------------*/
@@ -147,6 +161,7 @@ vwe_init(struct waiter *w)
const struct waiter_impl waiter_epoll = {
.name = "epoll",
.init = vwe_init,
+ .fini = vwe_fini, /* NOTE(review): vwe_fini still ends in WRONG("Not Yet Implemented") */
.inject = vwe_inject,
.size = sizeof(struct vwe),
};
diff --git a/bin/varnishd/waiter/cache_waiter_kqueue.c b/bin/varnishd/waiter/cache_waiter_kqueue.c
index 8609048..d371d7e 100644
--- a/bin/varnishd/waiter/cache_waiter_kqueue.c
+++ b/bin/varnishd/waiter/cache_waiter_kqueue.c
@@ -133,13 +133,10 @@ vwk_thread(void *priv)
CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
THR_SetName("cache-kqueue");
- vwk->kq = kqueue();
- assert(vwk->kq >= 0);
-
vwk_kq_flush(vwk);
vwk->nki = 0;
- while (1) {
+ while (!vwk->waiter->dismantle) {
n = kevent(vwk->kq, vwk->ki, vwk->nki, ke, NKEV, NULL);
assert(n <= NKEV);
if (n == 0) {
@@ -168,6 +165,9 @@ vwk_init(struct waiter *w)
INIT_OBJ(vwk, VWK_MAGIC);
vwk->waiter = w;
+ vwk->kq = kqueue();
+ assert(vwk->kq >= 0);
+
Wait_UsePipe(w);
AZ(pthread_create(&vwk->thread, NULL, vwk_thread, vwk));
@@ -175,9 +175,23 @@ vwk_init(struct waiter *w)
/*--------------------------------------------------------------------*/
+static void __match_proto__(waiter_fini_f)
+vwk_fini(struct waiter *w)
+{
+ struct vwk *vwk;
+ void *vp;
+
+ CAST_OBJ_NOTNULL(vwk, w->priv, VWK_MAGIC);
+ AZ(pthread_join(vwk->thread, &vp)); /* vwk_thread leaves its loop once w->dismantle is set */
+ AZ(close(vwk->kq)); /* kq is opened in vwk_init by this change, so it is closed here after the join */
+}
+
+/*--------------------------------------------------------------------*/
+
const struct waiter_impl waiter_kqueue = {
.name = "kqueue",
.init = vwk_init,
+ .fini = vwk_fini, /* invoked from Wait_Destroy() via w->impl->fini */
.inject = vwk_inject,
.size = sizeof(struct vwk),
};
diff --git a/bin/varnishd/waiter/cache_waiter_poll.c b/bin/varnishd/waiter/cache_waiter_poll.c
index 48c3cd7..81763a1 100644
--- a/bin/varnishd/waiter/cache_waiter_poll.c
+++ b/bin/varnishd/waiter/cache_waiter_poll.c
@@ -45,7 +45,7 @@ struct vwp {
#define VWP_MAGIC 0x4b2cc735
struct waiter *waiter;
- pthread_t poll_thread;
+ pthread_t thread;
struct pollfd *pollfd;
unsigned npoll;
unsigned hpoll;
@@ -137,7 +137,7 @@ vwp_main(void *priv)
CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
THR_SetName("cache-poll");
- while (1) {
+ while (!vwp->waiter->dismantle) {
assert(vwp->hpoll < vwp->npoll);
while (vwp->hpoll > 0 && vwp->pollfd[vwp->hpoll].fd == -1)
vwp->hpoll--;
@@ -172,7 +172,7 @@ vwp_main(void *priv)
/*--------------------------------------------------------------------*/
static void __match_proto__(waiter_init_f)
-vwp_poll_init(struct waiter *w)
+vwp_init(struct waiter *w)
{
struct vwp *vwp;
@@ -183,14 +183,28 @@ vwp_poll_init(struct waiter *w)
vwp_pollspace(vwp, 256);
Wait_UsePipe(w);
- AZ(pthread_create(&vwp->poll_thread, NULL, vwp_main, vwp));
+ AZ(pthread_create(&vwp->thread, NULL, vwp_main, vwp));
+}
+
+/*--------------------------------------------------------------------*/
+
+static void __match_proto__(waiter_fini_f)
+vwp_fini(struct waiter *w)
+{
+ struct vwp *vwp;
+ void *vp;
+
+ CAST_OBJ_NOTNULL(vwp, w->priv, VWP_MAGIC);
+ AZ(pthread_join(vwp->thread, &vp)); /* vwp_main leaves its loop once w->dismantle is set */
+ free(vwp->pollfd); /* presumably the array sized by vwp_pollspace(); freed only after the thread is gone */
}
/*--------------------------------------------------------------------*/
const struct waiter_impl waiter_poll = {
.name = "poll",
- .init = vwp_poll_init,
+ .init = vwp_init,
+ .fini = vwp_fini,
.inject = vwp_inject,
.evict = vwp_evict,
.size = sizeof(struct vwp),
diff --git a/bin/varnishd/waiter/cache_waiter_ports.c b/bin/varnishd/waiter/cache_waiter_ports.c
index 158a624..5ba8a8a 100644
--- a/bin/varnishd/waiter/cache_waiter_ports.c
+++ b/bin/varnishd/waiter/cache_waiter_ports.c
@@ -53,7 +53,7 @@ struct vws {
#define VWS_MAGIC 0x0b771473
struct waiter *waiter;
- pthread_t ports_thread;
+ pthread_t thread;
int dport;
};
@@ -150,7 +150,7 @@ vws_thread(void *priv)
timeout = &max_ts;
- while (1) {
+ while (!vws->waiter->dismantle) {
port_event_t ev[MAX_EVENTS];
u_int nevents;
int ei, ret;
@@ -259,7 +259,20 @@ vws_init(struct waiter *w)
INIT_OBJ(vws, VWS_MAGIC);
vws->waiter = w;
- AZ(pthread_create(&vws->ports_thread, NULL, vws_thread, vws));
+ AZ(pthread_create(&vws->thread, NULL, vws_thread, vws));
+}
+
+/*--------------------------------------------------------------------*/
+
+static void __match_proto__(waiter_fini_f)
+vws_fini(struct waiter *w)
+{
+ struct vws *vws; /* ports waiter state, taken from w->priv */
+ void *vp;
+
+ CAST_OBJ_NOTNULL(vws, w->priv, VWS_MAGIC);
+ AZ(pthread_join(vws->thread, &vp)); /* wait for vws_thread, which loops until w->dismantle is set */
+ WRONG("Not Yet Implemented"); /* NOTE(review): no cleanup beyond the join; dport is not released here */
}
/*--------------------------------------------------------------------*/
@@ -267,6 +280,7 @@ vws_init(struct waiter *w)
const struct waiter_impl waiter_ports = {
.name = "ports",
.init = vws_init,
+ .fini = vws_fini, /* NOTE(review): vws_fini still ends in WRONG("Not Yet Implemented") */
.pass = vws_pass,
.size = sizeof(struct vws),
};
diff --git a/bin/varnishd/waiter/waiter.h b/bin/varnishd/waiter/waiter.h
index 8b52b6b..953df8e 100644
--- a/bin/varnishd/waiter/waiter.h
+++ b/bin/varnishd/waiter/waiter.h
@@ -48,7 +48,8 @@ struct waiter;
enum wait_event {
WAITER_REMCLOSE,
WAITER_TIMEOUT,
- WAITER_ACTION
+ WAITER_ACTION,
+ WAITER_CLOSE /* passed by Wait_Destroy() to the callback for entries still queued at teardown */
};
#define WAITER_DEFAULT "platform dependent"
diff --git a/bin/varnishd/waiter/waiter_priv.h b/bin/varnishd/waiter/waiter_priv.h
index f18dd7e..d83d369 100644
--- a/bin/varnishd/waiter/waiter_priv.h
+++ b/bin/varnishd/waiter/waiter_priv.h
@@ -36,6 +36,7 @@ struct waiter {
#define WAITER_MAGIC 0x17c399db
const struct waiter_impl *impl;
VTAILQ_ENTRY(waiter) list;
+ int dismantle;
waiter_handle_f * func;
More information about the varnish-commit
mailing list