[master] 552f71c Move the waiters into their own subdirectory where VMODs won't see them.
Poul-Henning Kamp
phk at varnish-cache.org
Wed Oct 12 18:03:46 CEST 2011
commit 552f71c17ee8fb8d745869d1fce65e8f9521b629
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date: Wed Oct 12 16:03:18 2011 +0000
Move the waiters into their own subdirectory where VMODs won't see them.
diff --git a/bin/varnishd/Makefile.am b/bin/varnishd/Makefile.am
index 5260427..cdec084 100644
--- a/bin/varnishd/Makefile.am
+++ b/bin/varnishd/Makefile.am
@@ -44,11 +44,11 @@ varnishd_SOURCES = \
cache_vrt_re.c \
cache_vrt_var.c \
cache_vrt_vmod.c \
- cache_waiter.c \
- cache_waiter_epoll.c \
- cache_waiter_kqueue.c \
- cache_waiter_poll.c \
- cache_waiter_ports.c \
+ waiter/cache_waiter.c \
+ waiter/cache_waiter_epoll.c \
+ waiter/cache_waiter_kqueue.c \
+ waiter/cache_waiter_poll.c \
+ waiter/cache_waiter_ports.c \
cache_wrk.c \
cache_wrw.c \
cache_ws.c \
@@ -81,7 +81,7 @@ noinst_HEADERS = \
cache.h \
cache_backend.h \
cache_esi.h \
- cache_waiter.h \
+ waiter/cache_waiter.h \
common.h \
default_vcl.h \
hash_slinger.h \
diff --git a/bin/varnishd/cache_main.c b/bin/varnishd/cache_main.c
index 6e62fd2..2d00ee2 100644
--- a/bin/varnishd/cache_main.c
+++ b/bin/varnishd/cache_main.c
@@ -34,7 +34,7 @@
#include "cache.h"
-#include "cache_waiter.h"
+#include "waiter/cache_waiter.h"
#include "hash_slinger.h"
/*--------------------------------------------------------------------
diff --git a/bin/varnishd/cache_panic.c b/bin/varnishd/cache_panic.c
index 482708d..76ea866 100644
--- a/bin/varnishd/cache_panic.c
+++ b/bin/varnishd/cache_panic.c
@@ -43,7 +43,7 @@
#include "vapi/vsm_int.h"
#include "cache_backend.h"
-#include "cache_waiter.h"
+#include "waiter/cache_waiter.h"
#include "libvcl.h"
#include "vcl.h"
diff --git a/bin/varnishd/cache_pool.c b/bin/varnishd/cache_pool.c
index 00eaef5..55ad333 100644
--- a/bin/varnishd/cache_pool.c
+++ b/bin/varnishd/cache_pool.c
@@ -47,7 +47,7 @@
#include "cache.h"
-#include "cache_waiter.h"
+#include "waiter/cache_waiter.h"
#include "hash_slinger.h"
#include "vtcp.h"
#include "vtim.h"
diff --git a/bin/varnishd/cache_session.c b/bin/varnishd/cache_session.c
index d55bae0..5db418c 100644
--- a/bin/varnishd/cache_session.c
+++ b/bin/varnishd/cache_session.c
@@ -40,7 +40,7 @@
#include "cache.h"
-#include "cache_waiter.h"
+#include "waiter/cache_waiter.h"
/*--------------------------------------------------------------------*/
diff --git a/bin/varnishd/cache_waiter.c b/bin/varnishd/cache_waiter.c
deleted file mode 100644
index 9c30edf..0000000
--- a/bin/varnishd/cache_waiter.c
+++ /dev/null
@@ -1,110 +0,0 @@
-/*-
- * Copyright (c) 2006 Verdens Gang AS
- * Copyright (c) 2006-2011 Varnish Software AS
- * All rights reserved.
- *
- * Author: Poul-Henning Kamp <phk at phk.freebsd.dk>
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- * 1. Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
- * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
- * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
- * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
- * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
- * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
- * SUCH DAMAGE.
- *
- */
-
-#include "config.h"
-
-#include "cache.h"
-
-#include "cache_waiter.h"
-#include "vcli.h"
-#include "vcli_priv.h"
-
-static const struct waiter * const vca_waiters[] = {
- #if defined(HAVE_KQUEUE)
- &waiter_kqueue,
- #endif
- #if defined(HAVE_EPOLL_CTL)
- &waiter_epoll,
- #endif
- #if defined(HAVE_PORT_CREATE)
- &waiter_ports,
- #endif
- &waiter_poll,
- NULL,
-};
-
-struct waiter const * waiter;
-
-const char *
-WAIT_GetName(void)
-{
-
- if (waiter != NULL)
- return (waiter->name);
- else
- return ("no_waiter");
-}
-
-void
-WAIT_tweak_waiter(struct cli *cli, const char *arg)
-{
- int i;
-
- ASSERT_MGT();
-
- if (arg == NULL) {
- if (waiter == NULL)
- VCLI_Out(cli, "default");
- else
- VCLI_Out(cli, "%s", waiter->name);
-
- VCLI_Out(cli, " (");
- for (i = 0; vca_waiters[i] != NULL; i++)
- VCLI_Out(cli, "%s%s", i == 0 ? "" : ", ",
- vca_waiters[i]->name);
- VCLI_Out(cli, ")");
- return;
- }
- if (!strcmp(arg, "default")) {
- waiter = NULL;
- return;
- }
- for (i = 0; vca_waiters[i]; i++) {
- if (!strcmp(arg, vca_waiters[i]->name)) {
- waiter = vca_waiters[i];
- return;
- }
- }
- VCLI_Out(cli, "Unknown waiter");
- VCLI_SetResult(cli, CLIS_PARAM);
-}
-
-void
-WAIT_Init(void)
-{
-
- if (waiter == NULL)
- waiter = vca_waiters[0];
-
- AN(waiter);
- AN(waiter->name);
- AN(waiter->init);
- AN(waiter->pass);
-}
diff --git a/bin/varnishd/cache_waiter.h b/bin/varnishd/cache_waiter.h
deleted file mode 100644
index 28bc39d..0000000
--- a/bin/varnishd/cache_waiter.h
+++ /dev/null
@@ -1,64 +0,0 @@
-/*-
- * Copyright (c) 2006 Verdens Gang AS
- * Copyright (c) 2006-2009 Varnish Software AS
- * All rights reserved.
- *
- * Author: Poul-Henning Kamp <phk at phk.freebsd.dk>
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- * 1. Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
- * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
- * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
- * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
- * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
- * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
- * SUCH DAMAGE.
- *
- */
-
-struct sess;
-
-typedef void* waiter_init_f(void);
-typedef void waiter_pass_f(void *priv, const struct sess *);
-
-struct waiter {
- const char *name;
- waiter_init_f *init;
- waiter_pass_f *pass;
-};
-
-extern struct waiter const * waiter;
-
-#if defined(HAVE_EPOLL_CTL)
-extern const struct waiter waiter_epoll;
-#endif
-
-#if defined(HAVE_KQUEUE)
-extern const struct waiter waiter_kqueue;
-#endif
-
-#if defined(HAVE_PORT_CREATE)
-extern const struct waiter waiter_ports;
-#endif
-
-
-/* cache_session.c */
-void SES_Handle(struct sess *sp, int status);
-
-/* cache_waiter.c */
-extern const struct waiter waiter_poll;
-const char *WAIT_GetName(void);
-void WAIT_tweak_waiter(struct cli *cli, const char *arg);
-void WAIT_Init(void);
diff --git a/bin/varnishd/cache_waiter_epoll.c b/bin/varnishd/cache_waiter_epoll.c
deleted file mode 100644
index 301fcf2..0000000
--- a/bin/varnishd/cache_waiter_epoll.c
+++ /dev/null
@@ -1,276 +0,0 @@
-/*-
- * Copyright (c) 2006 Verdens Gang AS
- * Copyright (c) 2006-2010 Varnish Software AS
- * All rights reserved.
- *
- * Author: Rogerio Carvalho Schneider <stockrt at gmail.com>
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- * 1. Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
- * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
- * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
- * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
- * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
- * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
- * SUCH DAMAGE.
- *
- * XXX: We need to pass sessions back into the event engine when they are
- * reused. Not sure what the most efficient way is for that. For now
- * write the session pointer to a pipe which the event engine monitors.
- */
-
-#include "config.h"
-
-#if defined(HAVE_EPOLL_CTL)
-
-#include <sys/epoll.h>
-
-#include <fcntl.h>
-#include <stdlib.h>
-
-#include "cache.h"
-
-#include "cache_waiter.h"
-#include "vtim.h"
-
-#ifndef EPOLLRDHUP
-# define EPOLLRDHUP 0
-#endif
-
-#define NEEV 100
-
-struct vwe {
- unsigned magic;
-#define VWE_MAGIC 0x6bd73424
-
- pthread_t epoll_thread;
- pthread_t timer_thread;
- int epfd;
-
- VTAILQ_HEAD(,sess) sesshead;
- int pipes[2];
- int timer_pipes[2];
-};
-
-static void
-vwe_modadd(struct vwe *vwe, int fd, void *data, short arm)
-{
-
- /* XXX: EPOLLET (edge triggered) can cause rather Bad Things to
- * XXX: happen: If NEEV+1 threads get stuck in write(), all threads
- * XXX: will hang. See #644.
- */
- assert(fd >= 0);
- if (data == vwe->pipes || data == vwe->timer_pipes) {
- struct epoll_event ev = {
- EPOLLIN | EPOLLPRI , { data }
- };
- AZ(epoll_ctl(vwe->epfd, arm, fd, &ev));
- } else {
- struct sess *sp = (struct sess *)data;
- CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
- sp->ev.data.ptr = data;
- sp->ev.events = EPOLLIN | EPOLLPRI | EPOLLONESHOT | EPOLLRDHUP;
- AZ(epoll_ctl(vwe->epfd, arm, fd, &sp->ev));
- }
-}
-
-static void
-vwe_cond_modadd(struct vwe *vwe, int fd, void *data)
-{
- struct sess *sp = (struct sess *)data;
-
- assert(fd >= 0);
- CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
- if (sp->ev.data.ptr)
- AZ(epoll_ctl(vwe->epfd, EPOLL_CTL_MOD, fd, &sp->ev));
- else {
- sp->ev.data.ptr = data;
- sp->ev.events = EPOLLIN | EPOLLPRI | EPOLLONESHOT | EPOLLRDHUP;
- AZ(epoll_ctl(vwe->epfd, EPOLL_CTL_ADD, fd, &sp->ev));
- }
-}
-
-static void
-vwe_eev(struct vwe *vwe, const struct epoll_event *ep)
-{
- struct sess *ss[NEEV], *sp;
- int i, j;
-
- AN(ep->data.ptr);
- if (ep->data.ptr == vwe->pipes) {
- if (ep->events & EPOLLIN || ep->events & EPOLLPRI) {
- j = 0;
- i = read(vwe->pipes[0], ss, sizeof ss);
- if (i == -1 && errno == EAGAIN)
- return;
- while (i >= sizeof ss[0]) {
- CHECK_OBJ_NOTNULL(ss[j], SESS_MAGIC);
- assert(ss[j]->fd >= 0);
- AZ(ss[j]->obj);
- VTAILQ_INSERT_TAIL(&vwe->sesshead, ss[j], list);
- vwe_cond_modadd(vwe, ss[j]->fd, ss[j]);
- j++;
- i -= sizeof ss[0];
- }
- assert(i == 0);
- }
- } else {
- CAST_OBJ_NOTNULL(sp, ep->data.ptr, SESS_MAGIC);
- if (ep->events & EPOLLIN || ep->events & EPOLLPRI) {
- i = HTC_Rx(sp->htc);
- if (i == 0) {
- vwe_modadd(vwe, sp->fd, sp, EPOLL_CTL_MOD);
- return; /* more needed */
- }
- VTAILQ_REMOVE(&vwe->sesshead, sp, list);
- SES_Handle(sp, i);
- } else if (ep->events & EPOLLERR) {
- VTAILQ_REMOVE(&vwe->sesshead, sp, list);
- SES_Delete(sp, "ERR");
- } else if (ep->events & EPOLLHUP) {
- VTAILQ_REMOVE(&vwe->sesshead, sp, list);
- SES_Delete(sp, "HUP");
- } else if (ep->events & EPOLLRDHUP) {
- VTAILQ_REMOVE(&vwe->sesshead, sp, list);
- SES_Delete(sp, "RHUP");
- }
- }
-}
-
-/*--------------------------------------------------------------------*/
-
-static void *
-vwe_thread(void *priv)
-{
- struct epoll_event ev[NEEV], *ep;
- struct sess *sp;
- char junk;
- double deadline;
- int dotimer, i, n;
- struct vwe *vwe;
-
- CAST_OBJ_NOTNULL(vwe, priv, VWE_MAGIC);
-
- THR_SetName("cache-epoll");
-
- vwe->epfd = epoll_create(1);
- assert(vwe->epfd >= 0);
-
- vwe_modadd(vwe, vwe->pipes[0], vwe->pipes, EPOLL_CTL_ADD);
- vwe_modadd(vwe, vwe->timer_pipes[0], vwe->timer_pipes, EPOLL_CTL_ADD);
-
- while (1) {
- dotimer = 0;
- n = epoll_wait(vwe->epfd, ev, NEEV, -1);
- for (ep = ev, i = 0; i < n; i++, ep++) {
- if (ep->data.ptr == vwe->timer_pipes &&
- (ep->events == EPOLLIN || ep->events == EPOLLPRI))
- {
- assert(read(vwe->timer_pipes[0], &junk, 1));
- dotimer = 1;
- } else
- vwe_eev(vwe, ep);
- }
- if (!dotimer)
- continue;
-
- /* check for timeouts */
- deadline = VTIM_real() - params->sess_timeout;
- for (;;) {
- sp = VTAILQ_FIRST(&vwe->sesshead);
- if (sp == NULL)
- break;
- if (sp->t_open > deadline)
- break;
- VTAILQ_REMOVE(&vwe->sesshead, sp, list);
- // XXX: not yet VTCP_linger(sp->fd, 0);
- SES_Delete(sp, "timeout");
- }
- }
- return NULL;
-}
-
-/*--------------------------------------------------------------------*/
-
-static void *
-vwe_sess_timeout_ticker(void *priv)
-{
- char ticker = 'R';
- struct vwe *vwe;
-
- CAST_OBJ_NOTNULL(vwe, priv, VWE_MAGIC);
- THR_SetName("cache-epoll-sess_timeout_ticker");
-
- while (1) {
- /* ticking */
- assert(write(vwe->timer_pipes[1], &ticker, 1));
- VTIM_sleep(100 * 1e-3);
- }
- return NULL;
-}
-
-/*--------------------------------------------------------------------*/
-
-static void
-vwe_pass(void *priv, const struct sess *sp)
-{
- struct vwe *vwe;
-
- CAST_OBJ_NOTNULL(vwe, priv, VWE_MAGIC);
- assert(sizeof sp == write(vwe->pipes[1], &sp, sizeof sp));
-}
-
-/*--------------------------------------------------------------------*/
-
-static void *
-vwe_init(void)
-{
- int i;
- struct vwe *vwe;
-
- ALLOC_OBJ(vwe, VWE_MAGIC);
- AN(vwe);
- VTAILQ_INIT(&vwe->sesshead);
- AZ(pipe(vwe->pipes));
- AZ(pipe(vwe->timer_pipes));
-
- i = fcntl(vwe->pipes[0], F_GETFL);
- assert(i != -1);
- i |= O_NONBLOCK;
- i = fcntl(vwe->pipes[0], F_SETFL, i);
- assert(i != -1);
-
- i = fcntl(vwe->timer_pipes[0], F_GETFL);
- assert(i != -1);
- i |= O_NONBLOCK;
- i = fcntl(vwe->timer_pipes[0], F_SETFL, i);
- assert(i != -1);
-
- AZ(pthread_create(&vwe->timer_thread,
- NULL, vwe_sess_timeout_ticker, vwe));
- AZ(pthread_create(&vwe->epoll_thread, NULL, vwe_thread, vwe));
- return(vwe);
-}
-
-/*--------------------------------------------------------------------*/
-
-const struct waiter waiter_epoll = {
- .name = "epoll",
- .init = vwe_init,
- .pass = vwe_pass,
-};
-
-#endif /* defined(HAVE_EPOLL_CTL) */
diff --git a/bin/varnishd/cache_waiter_kqueue.c b/bin/varnishd/cache_waiter_kqueue.c
deleted file mode 100644
index 8bee1c9..0000000
--- a/bin/varnishd/cache_waiter_kqueue.c
+++ /dev/null
@@ -1,246 +0,0 @@
-/*-
- * Copyright (c) 2006 Verdens Gang AS
- * Copyright (c) 2006-2009 Varnish Software AS
- * All rights reserved.
- *
- * Author: Poul-Henning Kamp <phk at phk.freebsd.dk>
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- * 1. Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
- * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
- * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
- * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
- * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
- * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
- * SUCH DAMAGE.
- *
- * XXX: We need to pass sessions back into the event engine when they are
- * reused. Not sure what the most efficient way is for that. For now
- * write the session pointer to a pipe which the event engine monitors.
- */
-
-#include "config.h"
-
-#if defined(HAVE_KQUEUE)
-
-#include <sys/types.h>
-#include <sys/event.h>
-
-#include <fcntl.h>
-#include <stdlib.h>
-#include <unistd.h>
-
-#include "cache.h"
-
-#include "cache_waiter.h"
-#include "vtim.h"
-
-#define NKEV 100
-
-struct vwk {
- unsigned magic;
-#define VWK_MAGIC 0x1cc2acc2
- pthread_t thread;
- int pipes[2];
- int kq;
- struct kevent ki[NKEV];
- unsigned nki;
- VTAILQ_HEAD(,sess) sesshead;
-};
-
-/*--------------------------------------------------------------------*/
-
-static void
-vwk_kq_flush(struct vwk *vwk)
-{
- int i;
-
- if (vwk->nki == 0)
- return;
- i = kevent(vwk->kq, vwk->ki, vwk->nki, NULL, 0, NULL);
- assert(i == 0);
- vwk->nki = 0;
-}
-
-static void
-vwk_kq_sess(struct vwk *vwk, struct sess *sp, short arm)
-{
-
- CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
- assert(sp->fd >= 0);
- DSL(0x04, SLT_Debug, sp->vsl_id, "KQ: EV_SET sp %p arm %x", sp, arm);
- EV_SET(&vwk->ki[vwk->nki], sp->fd, EVFILT_READ, arm, 0, 0, sp);
- if (++vwk->nki == NKEV)
- vwk_kq_flush(vwk);
-}
-
-static void
-vwk_kev(struct vwk *vwk, const struct kevent *kp)
-{
- int i, j;
- struct sess *sp;
- struct sess *ss[NKEV];
-
- AN(kp->udata);
- if (kp->udata == vwk->pipes) {
- j = 0;
- i = read(vwk->pipes[0], ss, sizeof ss);
- if (i == -1 && errno == EAGAIN)
- return;
- while (i >= sizeof ss[0]) {
- CHECK_OBJ_NOTNULL(ss[j], SESS_MAGIC);
- assert(ss[j]->fd >= 0);
- AZ(ss[j]->obj);
- VTAILQ_INSERT_TAIL(&vwk->sesshead, ss[j], list);
- vwk_kq_sess(vwk, ss[j], EV_ADD | EV_ONESHOT);
- j++;
- i -= sizeof ss[0];
- }
- assert(i == 0);
- return;
- }
- CAST_OBJ_NOTNULL(sp, kp->udata, SESS_MAGIC);
- DSL(0x04, SLT_Debug, sp->vsl_id, "KQ: sp %p kev data %lu flags 0x%x%s",
- sp, (unsigned long)kp->data, kp->flags,
- (kp->flags & EV_EOF) ? " EOF" : "");
-
- assert((sp->vsl_id & VSL_IDENTMASK) == kp->ident);
- assert((sp->vsl_id & VSL_IDENTMASK) == sp->fd);
- if (kp->data > 0) {
- i = HTC_Rx(sp->htc);
- if (i == 0) {
- vwk_kq_sess(vwk, sp, EV_ADD | EV_ONESHOT);
- return; /* more needed */
- }
- VTAILQ_REMOVE(&vwk->sesshead, sp, list);
- SES_Handle(sp, i);
- return;
- } else if (kp->flags & EV_EOF) {
- VTAILQ_REMOVE(&vwk->sesshead, sp, list);
- SES_Delete(sp, "EOF");
- return;
- } else {
- VSL(SLT_Debug, sp->vsl_id, "KQ: sp %p kev data %lu flags 0x%x%s",
- sp, (unsigned long)kp->data, kp->flags,
- (kp->flags & EV_EOF) ? " EOF" : "");
- }
-}
-
-/*--------------------------------------------------------------------*/
-
-static void *
-vwk_thread(void *priv)
-{
- struct vwk *vwk;
- struct kevent ke[NKEV], *kp;
- int j, n, dotimer;
- double deadline;
- struct sess *sp;
-
- CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
- THR_SetName("cache-kqueue");
-
- vwk->kq = kqueue();
- assert(vwk->kq >= 0);
-
- j = 0;
- EV_SET(&ke[j], 0, EVFILT_TIMER, EV_ADD, 0, 100, NULL);
- j++;
- EV_SET(&ke[j], vwk->pipes[0], EVFILT_READ, EV_ADD, 0, 0, vwk->pipes);
- j++;
- AZ(kevent(vwk->kq, ke, j, NULL, 0, NULL));
-
- vwk->nki = 0;
- while (1) {
- dotimer = 0;
- n = kevent(vwk->kq, vwk->ki, vwk->nki, ke, NKEV, NULL);
- assert(n >= 1 && n <= NKEV);
- vwk->nki = 0;
- for (kp = ke, j = 0; j < n; j++, kp++) {
- if (kp->filter == EVFILT_TIMER) {
- dotimer = 1;
- continue;
- }
- assert(kp->filter == EVFILT_READ);
- vwk_kev(vwk, kp);
- }
- if (!dotimer)
- continue;
- /*
- * Make sure we have no pending changes for the fd's
- * we are about to close, in case the accept(2) in the
- * other thread creates new fd's betwen our close and
- * the kevent(2) at the top of this loop, the kernel
- * would not know we meant "the old fd of this number".
- */
- vwk_kq_flush(vwk);
- deadline = VTIM_real() - params->sess_timeout;
- for (;;) {
- sp = VTAILQ_FIRST(&vwk->sesshead);
- if (sp == NULL)
- break;
- if (sp->t_open > deadline)
- break;
- VTAILQ_REMOVE(&vwk->sesshead, sp, list);
- // XXX: not yet (void)VTCP_linger(sp->fd, 0);
- SES_Delete(sp, "timeout");
- }
- }
-}
-
-/*--------------------------------------------------------------------*/
-
-static void
-vwk_pass(void *priv, const struct sess *sp)
-{
- struct vwk *vwk;
-
- CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
- assert(sizeof sp == write(vwk->pipes[1], &sp, sizeof sp));
-}
-
-/*--------------------------------------------------------------------*/
-
-static void *
-vwk_init(void)
-{
- int i;
- struct vwk *vwk;
-
- ALLOC_OBJ(vwk, VWK_MAGIC);
- AN(vwk);
-
- VTAILQ_INIT(&vwk->sesshead);
- AZ(pipe(vwk->pipes));
-
- i = fcntl(vwk->pipes[0], F_GETFL);
- assert(i != -1);
- i |= O_NONBLOCK;
- i = fcntl(vwk->pipes[0], F_SETFL, i);
- assert(i != -1);
-
- AZ(pthread_create(&vwk->thread, NULL, vwk_thread, vwk));
- return (vwk);
-}
-
-/*--------------------------------------------------------------------*/
-
-const struct waiter waiter_kqueue = {
- .name = "kqueue",
- .init = vwk_init,
- .pass = vwk_pass,
-};
-
-#endif /* defined(HAVE_KQUEUE) */
diff --git a/bin/varnishd/cache_waiter_poll.c b/bin/varnishd/cache_waiter_poll.c
deleted file mode 100644
index eb48870..0000000
--- a/bin/varnishd/cache_waiter_poll.c
+++ /dev/null
@@ -1,233 +0,0 @@
-/*-
- * Copyright (c) 2006 Verdens Gang AS
- * Copyright (c) 2006-2010 Varnish Software AS
- * All rights reserved.
- *
- * Author: Poul-Henning Kamp <phk at phk.freebsd.dk>
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- * 1. Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
- * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
- * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
- * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
- * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
- * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
- * SUCH DAMAGE.
- *
- */
-
-#include "config.h"
-
-#include <poll.h>
-#include <stdlib.h>
-
-#include "cache.h"
-
-#include "cache_waiter.h"
-#include "vtim.h"
-
-#define NEEV 128
-
-struct vwp {
- unsigned magic;
-#define VWP_MAGIC 0x4b2cc735
- int pipes[2];
- pthread_t poll_thread;
- struct pollfd *pollfd;
- unsigned npoll;
- unsigned hpoll;
-
- VTAILQ_HEAD(,sess) sesshead;
-};
-
-/*--------------------------------------------------------------------*/
-
-static void
-vwp_pollspace(struct vwp *vwp, unsigned fd)
-{
- struct pollfd *newpollfd = vwp->pollfd;
- unsigned newnpoll;
-
- if (fd < vwp->npoll)
- return;
- newnpoll = vwp->npoll;
- if (newnpoll == 0)
- newnpoll = 1;
- while (fd >= newnpoll)
- newnpoll = newnpoll * 2;
- VSL(SLT_Debug, 0, "Acceptor poll space increased to %u", newnpoll);
- newpollfd = realloc(newpollfd, newnpoll * sizeof *newpollfd);
- XXXAN(newpollfd);
- memset(newpollfd + vwp->npoll, 0,
- (newnpoll - vwp->npoll) * sizeof *newpollfd);
- vwp->pollfd = newpollfd;
- while (vwp->npoll < newnpoll)
- vwp->pollfd[vwp->npoll++].fd = -1;
- assert(fd < vwp->npoll);
-}
-
-/*--------------------------------------------------------------------*/
-
-static void
-vwp_poll(struct vwp *vwp, int fd)
-{
-
- assert(fd >= 0);
- vwp_pollspace(vwp, (unsigned)fd);
- assert(fd < vwp->npoll);
-
- if (vwp->hpoll < fd)
- vwp->hpoll = fd;
-
- assert(vwp->pollfd[fd].fd == -1);
- assert(vwp->pollfd[fd].events == 0);
- assert(vwp->pollfd[fd].revents == 0);
-
- vwp->pollfd[fd].fd = fd;
- vwp->pollfd[fd].events = POLLIN;
-}
-
-static void
-vwp_unpoll(struct vwp *vwp, int fd)
-{
-
- assert(fd >= 0);
- assert(fd < vwp->npoll);
- vwp_pollspace(vwp, (unsigned)fd);
-
- assert(vwp->pollfd[fd].fd == fd);
- assert(vwp->pollfd[fd].events == POLLIN);
- assert(vwp->pollfd[fd].revents == 0);
-
- vwp->pollfd[fd].fd = -1;
- vwp->pollfd[fd].events = 0;
-}
-
-/*--------------------------------------------------------------------*/
-
-static void *
-vwp_main(void *priv)
-{
- int v, v2;
- struct vwp *vwp;
- struct sess *ss[NEEV], *sp, *sp2;
- double deadline;
- int i, j, fd;
-
- CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
- THR_SetName("cache-poll");
-
- vwp_poll(vwp, vwp->pipes[0]);
-
- while (1) {
- assert(vwp->hpoll < vwp->npoll);
- while (vwp->hpoll > 0 && vwp->pollfd[vwp->hpoll].fd == -1)
- vwp->hpoll--;
- assert(vwp->pipes[0] <= vwp->hpoll);
- assert(vwp->pollfd[vwp->pipes[0]].fd == vwp->pipes[0]);
- assert(vwp->pollfd[vwp->pipes[1]].fd == -1);
- v = poll(vwp->pollfd, vwp->hpoll + 1, 100);
- assert(v >= 0);
- deadline = VTIM_real() - params->sess_timeout;
- v2 = v;
- VTAILQ_FOREACH_SAFE(sp, &vwp->sesshead, list, sp2) {
- if (v != 0 && v2 == 0)
- break;
- CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
- fd = sp->fd;
- assert(fd >= 0);
- assert(fd <= vwp->hpoll);
- assert(fd < vwp->npoll);
- assert(vwp->pollfd[fd].fd == fd);
- if (vwp->pollfd[fd].revents) {
- v2--;
- i = HTC_Rx(sp->htc);
- if (vwp->pollfd[fd].revents != POLLIN)
- VSL(SLT_Debug, fd, "Poll: %x / %d",
- vwp->pollfd[fd].revents, i);
- vwp->pollfd[fd].revents = 0;
- VTAILQ_REMOVE(&vwp->sesshead, sp, list);
- if (i == 0) {
- /* Mov to front of list for speed */
- VTAILQ_INSERT_HEAD(&vwp->sesshead, sp, list);
- } else {
- vwp_unpoll(vwp, fd);
- SES_Handle(sp, i);
- }
- } else if (sp->t_open <= deadline) {
- VTAILQ_REMOVE(&vwp->sesshead, sp, list);
- vwp_unpoll(vwp, fd);
- // XXX: not yet (void)VTCP_linger(sp->fd, 0);
- SES_Delete(sp, "timeout");
- }
- }
- if (v2 && vwp->pollfd[vwp->pipes[0]].revents) {
-
- if (vwp->pollfd[vwp->pipes[0]].revents != POLLIN)
- VSL(SLT_Debug, 0, "pipe.revents= 0x%x",
- vwp->pollfd[vwp->pipes[0]].revents);
- assert(vwp->pollfd[vwp->pipes[0]].revents == POLLIN);
- vwp->pollfd[vwp->pipes[0]].revents = 0;
- v2--;
- i = read(vwp->pipes[0], ss, sizeof ss);
- assert(i >= 0);
- assert(((unsigned)i % sizeof ss[0]) == 0);
- for (j = 0; j * sizeof ss[0] < i; j++) {
- CHECK_OBJ_NOTNULL(ss[j], SESS_MAGIC);
- assert(ss[j]->fd >= 0);
- VTAILQ_INSERT_TAIL(&vwp->sesshead, ss[j], list);
- vwp_poll(vwp, ss[j]->fd);
- }
- }
- assert(v2 == 0);
- }
- NEEDLESS_RETURN(NULL);
-}
-
-/*--------------------------------------------------------------------*/
-
-static void
-vwp_poll_pass(void *priv, const struct sess *sp)
-{
- struct vwp *vwp;
-
- CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
-
- assert(sizeof sp == write(vwp->pipes[1], &sp, sizeof sp));
-}
-
-/*--------------------------------------------------------------------*/
-
-static void *
-vwp_poll_init(void)
-{
- struct vwp *vwp;
-
- ALLOC_OBJ(vwp, VWP_MAGIC);
- AN(vwp);
- VTAILQ_INIT(&vwp->sesshead);
- AZ(pipe(vwp->pipes));
- vwp_pollspace(vwp, 256);
- AZ(pthread_create(&vwp->poll_thread, NULL, vwp_main, vwp));
- return (vwp);
-}
-
-/*--------------------------------------------------------------------*/
-
-const struct waiter waiter_poll = {
- .name = "poll",
- .init = vwp_poll_init,
- .pass = vwp_poll_pass,
-};
diff --git a/bin/varnishd/cache_waiter_ports.c b/bin/varnishd/cache_waiter_ports.c
deleted file mode 100644
index 821ebc3..0000000
--- a/bin/varnishd/cache_waiter_ports.c
+++ /dev/null
@@ -1,283 +0,0 @@
-/*-
- * Copyright (c) 2006 Verdens Gang AS
- * Copyright (c) 2006 Varnish Software AS
- * Copyright (c) 2007 OmniTI Computer Consulting, Inc.
- * Copyright (c) 2007 Theo Schlossnagle
- * Copyright (c) 2010 UPLEX, Nils Goroll
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- * 1. Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
- * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
- * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
- * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
- * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
- * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
- * SUCH DAMAGE.
- *
- */
-
-#include "config.h"
-
-#if defined(HAVE_PORT_CREATE)
-
-#include <sys/time.h>
-
-#include <math.h>
-#include <port.h>
-#include <stdio.h>
-#include <stdlib.h>
-
-#include "cache.h"
-
-#include "cache_waiter.h"
-#include "vtim.h"
-
-#define MAX_EVENTS 256
-
-struct vws {
- unsigned magic;
-#define VWS_MAGIC 0x0b771473
- pthread_t ports_thread;
- int dport;
- VTAILQ_HEAD(,sess) sesshead;
-};
-
-static inline void
-vws_add(struct vws *vws, int fd, void *data)
-{
- /*
- * POLLIN should be all we need here
- *
- */
- AZ(port_associate(vws->dport, PORT_SOURCE_FD, fd, POLLIN, data));
-}
-
-static inline void
-vws_del(struct vws *vws, int fd)
-{
- port_dissociate(vws->dport, PORT_SOURCE_FD, fd);
-}
-
-static inline void
-vws_port_ev(struct vws *vws, port_event_t *ev) {
- struct sess *sp;
- if(ev->portev_source == PORT_SOURCE_USER) {
- CAST_OBJ_NOTNULL(sp, ev->portev_user, SESS_MAGIC);
- assert(sp->fd >= 0);
- AZ(sp->obj);
- VTAILQ_INSERT_TAIL(&vws->sesshead, sp, list);
- vws_add(vws, sp->fd, sp);
- } else {
- int i;
- assert(ev->portev_source == PORT_SOURCE_FD);
- CAST_OBJ_NOTNULL(sp, ev->portev_user, SESS_MAGIC);
- assert(sp->fd >= 0);
- if(ev->portev_events & POLLERR) {
- vws_del(vws, sp->fd);
- VTAILQ_REMOVE(&vws->sesshead, sp, list);
- SES_Delete(sp, "EOF");
- return;
- }
- i = HTC_Rx(sp->htc);
-
- if (i == 0) {
- /* incomplete header, wait for more data */
- vws_add(vws, sp->fd, sp);
- return;
- }
-
- /*
- * note: the original man page for port_associate(3C) states:
- *
- * When an event for a PORT_SOURCE_FD object is retrieved,
- * the object no longer has an association with the port.
- *
- * This can be read along the lines of sparing the
- * port_dissociate after port_getn(), but in fact,
- * port_dissociate should be used
- *
- * Ref: http://opensolaris.org/jive/thread.jspa?threadID=129476&tstart=0
- */
- vws_del(vws, sp->fd);
- VTAILQ_REMOVE(&vws->sesshead, sp, list);
-
- /* SES_Handle will also handle errors */
- SES_Handle(sp, i);
- }
- return;
-}
-
-static void *
-vws_thread(void *priv)
-{
- struct sess *sp;
- struct vws *vws;
-
- CAST_OBJ_NOTNULL(vws, priv, VWS_MAGIC);
- /*
- * timeouts:
- *
- * min_ts : Minimum timeout for port_getn
- * min_t : ^ equivalent in floating point representation
- *
- * max_ts : Maximum timeout for port_getn
- * max_t : ^ equivalent in floating point representation
- *
- * with (nevents == 1), we should always choose the correct port_getn
- * timeout to check session timeouts, so max is just a safety measure
- * (if this implementation is correct, it could be set to an "infinte"
- * value)
- *
- * with (nevents > 1), min and max define the acceptable range for
- * - additional latency of keep-alive connections and
- * - additional tolerance for handling session timeouts
- *
- */
- static struct timespec min_ts = {0L, 100L /*ms*/ * 1000L /*us*/ * 1000L /*ns*/};
- static double min_t = 0.1; /* 100 ms*/
- static struct timespec max_ts = {1L, 0L}; /* 1 second */
- static double max_t = 1.0; /* 1 second */
-
- /* XXX: These should probably go in vws ? */
- struct timespec ts;
- struct timespec *timeout;
-
- vws->dport = port_create();
- assert(vws->dport >= 0);
-
- timeout = &max_ts;
-
- while (1) {
- port_event_t ev[MAX_EVENTS];
- int nevents, ei, ret;
- double now, deadline;
-
- /*
- * XXX Do we want to scale this up dynamically to increase
- * efficiency in high throughput situations? - would need to
- * start with one to keep latency low at any rate
- *
- * Note: when increasing nevents, we must lower min_ts
- * and max_ts
- */
- nevents = 1;
-
- /*
- * see disucssion in
- * - https://issues.apache.org/bugzilla/show_bug.cgi?id=47645
- * - http://mail.opensolaris.org/pipermail/networking-discuss/2009-August/011979.html
- *
- * comment from apr/poll/unix/port.c :
- *
- * This confusing API can return an event at the same time
- * that it reports EINTR or ETIME.
- *
- */
-
- ret = port_getn(vws->dport, ev, MAX_EVENTS, &nevents, timeout);
-
- if (ret < 0)
- assert((errno == EINTR) || (errno == ETIME));
-
- for (ei = 0; ei < nevents; ei++)
- vws_port_ev(vws, ev + ei);
-
- /* check for timeouts */
- now = VTIM_real();
- deadline = now - params->sess_timeout;
-
- /*
- * This loop assumes that the oldest sessions are always at the
- * beginning of the list (which is the case if we guarantee to
- * enqueue at the tail only
- *
- */
-
- for (;;) {
- sp = VTAILQ_FIRST(&vws->sesshead);
- if (sp == NULL)
- break;
- if (sp->t_open > deadline) {
- break;
- }
- VTAILQ_REMOVE(&vws->sesshead, sp, list);
- if(sp->fd != -1) {
- vws_del(vws, sp->fd);
- }
- SES_Delete(sp, "timeout");
- }
-
- /*
- * Calculate the timeout for the next get_portn
- */
-
- if (sp) {
- double tmo = (sp->t_open + params->sess_timeout) - now;
-
- /* we should have removed all sps whose timeout has passed */
- assert(tmo > 0.0);
-
- if (tmo < min_t) {
- timeout = &min_ts;
- } else if (tmo > max_t) {
- timeout = &max_ts;
- } else {
- ts = VTIM_timespec(tmo);
- timeout = &ts;
- }
- } else {
- timeout = &max_ts;
- }
- }
-}
-
-/*--------------------------------------------------------------------*/
-
-static void
-vws_pass(void *priv, const struct sess *sp)
-{
- int r;
- struct vws *vws;
-
- CAST_OBJ_NOTNULL(vws, priv, VWS_MAGIC);
- while((r = port_send(vws->dport, 0, TRUST_ME(sp))) == -1 &&
- errno == EAGAIN);
- AZ(r);
-}
-
-/*--------------------------------------------------------------------*/
-
-static void *
-vws_init(void)
-{
- struct vws *vws;
-
- ALLOC_OBJ(vws, VWS_MAGIC);
- AN(vws);
- VTAILQ_INIT(&vws->sesshead);
- AZ(pthread_create(&vws->ports_thread, NULL, vws_thread, vws));
- return (vws);
-}
-
-/*--------------------------------------------------------------------*/
-
-const struct waiter waiter_ports = {
- .name = "ports",
- .init = vws_init,
- .pass = vws_pass
-};
-
-#endif /* defined(HAVE_PORT_CREATE) */
diff --git a/bin/varnishd/flint.sh b/bin/varnishd/flint.sh
index 86d4ad0..357d280 100755
--- a/bin/varnishd/flint.sh
+++ b/bin/varnishd/flint.sh
@@ -17,6 +17,7 @@ flexelint \
-DVARNISH_STATE_DIR=\"foo\" \
*.c \
storage/*.c \
+ waiter/*.c \
../../lib/libvarnish/*.c \
../../lib/libvarnishcompat/execinfo.c \
../../lib/libvcl/*.c \
diff --git a/bin/varnishd/mgt_param.c b/bin/varnishd/mgt_param.c
index 9ded195..43749b6 100644
--- a/bin/varnishd/mgt_param.c
+++ b/bin/varnishd/mgt_param.c
@@ -39,7 +39,7 @@
#include "mgt.h"
-#include "cache_waiter.h"
+#include "waiter/cache_waiter.h"
#include "heritage.h"
#include "vav.h"
#include "vcli.h"
diff --git a/bin/varnishd/waiter/cache_waiter.c b/bin/varnishd/waiter/cache_waiter.c
new file mode 100644
index 0000000..e6654fd
--- /dev/null
+++ b/bin/varnishd/waiter/cache_waiter.c
@@ -0,0 +1,110 @@
+/*-
+ * Copyright (c) 2006 Verdens Gang AS
+ * Copyright (c) 2006-2011 Varnish Software AS
+ * All rights reserved.
+ *
+ * Author: Poul-Henning Kamp <phk at phk.freebsd.dk>
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ */
+
+#include "config.h"
+
+#include "cache.h"
+
+#include "waiter/cache_waiter.h"
+#include "vcli.h"
+#include "vcli_priv.h"
+
+static const struct waiter * const vca_waiters[] = {
+ #if defined(HAVE_KQUEUE)
+ &waiter_kqueue,
+ #endif
+ #if defined(HAVE_EPOLL_CTL)
+ &waiter_epoll,
+ #endif
+ #if defined(HAVE_PORT_CREATE)
+ &waiter_ports,
+ #endif
+ &waiter_poll,
+ NULL,
+};
+
+struct waiter const * waiter;
+
+const char *
+WAIT_GetName(void)
+{
+
+ if (waiter != NULL)
+ return (waiter->name);
+ else
+ return ("no_waiter");
+}
+
+void
+WAIT_tweak_waiter(struct cli *cli, const char *arg)
+{
+ int i;
+
+ ASSERT_MGT();
+
+ if (arg == NULL) {
+ if (waiter == NULL)
+ VCLI_Out(cli, "default");
+ else
+ VCLI_Out(cli, "%s", waiter->name);
+
+ VCLI_Out(cli, " (");
+ for (i = 0; vca_waiters[i] != NULL; i++)
+ VCLI_Out(cli, "%s%s", i == 0 ? "" : ", ",
+ vca_waiters[i]->name);
+ VCLI_Out(cli, ")");
+ return;
+ }
+ if (!strcmp(arg, "default")) {
+ waiter = NULL;
+ return;
+ }
+ for (i = 0; vca_waiters[i]; i++) {
+ if (!strcmp(arg, vca_waiters[i]->name)) {
+ waiter = vca_waiters[i];
+ return;
+ }
+ }
+ VCLI_Out(cli, "Unknown waiter");
+ VCLI_SetResult(cli, CLIS_PARAM);
+}
+
+void
+WAIT_Init(void)
+{
+
+ if (waiter == NULL)
+ waiter = vca_waiters[0];
+
+ AN(waiter);
+ AN(waiter->name);
+ AN(waiter->init);
+ AN(waiter->pass);
+}
diff --git a/bin/varnishd/waiter/cache_waiter.h b/bin/varnishd/waiter/cache_waiter.h
new file mode 100644
index 0000000..28bc39d
--- /dev/null
+++ b/bin/varnishd/waiter/cache_waiter.h
@@ -0,0 +1,64 @@
+/*-
+ * Copyright (c) 2006 Verdens Gang AS
+ * Copyright (c) 2006-2009 Varnish Software AS
+ * All rights reserved.
+ *
+ * Author: Poul-Henning Kamp <phk at phk.freebsd.dk>
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ */
+
+struct sess;
+
+typedef void* waiter_init_f(void);
+typedef void waiter_pass_f(void *priv, const struct sess *);
+
+struct waiter {
+ const char *name;
+ waiter_init_f *init;
+ waiter_pass_f *pass;
+};
+
+extern struct waiter const * waiter;
+
+#if defined(HAVE_EPOLL_CTL)
+extern const struct waiter waiter_epoll;
+#endif
+
+#if defined(HAVE_KQUEUE)
+extern const struct waiter waiter_kqueue;
+#endif
+
+#if defined(HAVE_PORT_CREATE)
+extern const struct waiter waiter_ports;
+#endif
+
+
+/* cache_session.c */
+void SES_Handle(struct sess *sp, int status);
+
+/* cache_waiter.c */
+extern const struct waiter waiter_poll;
+const char *WAIT_GetName(void);
+void WAIT_tweak_waiter(struct cli *cli, const char *arg);
+void WAIT_Init(void);
diff --git a/bin/varnishd/waiter/cache_waiter_epoll.c b/bin/varnishd/waiter/cache_waiter_epoll.c
new file mode 100644
index 0000000..e700676
--- /dev/null
+++ b/bin/varnishd/waiter/cache_waiter_epoll.c
@@ -0,0 +1,276 @@
+/*-
+ * Copyright (c) 2006 Verdens Gang AS
+ * Copyright (c) 2006-2010 Varnish Software AS
+ * All rights reserved.
+ *
+ * Author: Rogerio Carvalho Schneider <stockrt at gmail.com>
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ * XXX: We need to pass sessions back into the event engine when they are
+ * reused. Not sure what the most efficient way is for that. For now
+ * write the session pointer to a pipe which the event engine monitors.
+ */
+
+#include "config.h"
+
+#if defined(HAVE_EPOLL_CTL)
+
+#include <sys/epoll.h>
+
+#include <fcntl.h>
+#include <stdlib.h>
+
+#include "cache.h"
+
+#include "waiter/cache_waiter.h"
+#include "vtim.h"
+
+#ifndef EPOLLRDHUP
+# define EPOLLRDHUP 0
+#endif
+
+#define NEEV 100
+
+struct vwe {
+ unsigned magic;
+#define VWE_MAGIC 0x6bd73424
+
+ pthread_t epoll_thread;
+ pthread_t timer_thread;
+ int epfd;
+
+ VTAILQ_HEAD(,sess) sesshead;
+ int pipes[2];
+ int timer_pipes[2];
+};
+
+static void
+vwe_modadd(struct vwe *vwe, int fd, void *data, short arm)
+{
+
+ /* XXX: EPOLLET (edge triggered) can cause rather Bad Things to
+ * XXX: happen: If NEEV+1 threads get stuck in write(), all threads
+ * XXX: will hang. See #644.
+ */
+ assert(fd >= 0);
+ if (data == vwe->pipes || data == vwe->timer_pipes) {
+ struct epoll_event ev = {
+ EPOLLIN | EPOLLPRI , { data }
+ };
+ AZ(epoll_ctl(vwe->epfd, arm, fd, &ev));
+ } else {
+ struct sess *sp = (struct sess *)data;
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ sp->ev.data.ptr = data;
+ sp->ev.events = EPOLLIN | EPOLLPRI | EPOLLONESHOT | EPOLLRDHUP;
+ AZ(epoll_ctl(vwe->epfd, arm, fd, &sp->ev));
+ }
+}
+
+static void
+vwe_cond_modadd(struct vwe *vwe, int fd, void *data)
+{
+ struct sess *sp = (struct sess *)data;
+
+ assert(fd >= 0);
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ if (sp->ev.data.ptr)
+ AZ(epoll_ctl(vwe->epfd, EPOLL_CTL_MOD, fd, &sp->ev));
+ else {
+ sp->ev.data.ptr = data;
+ sp->ev.events = EPOLLIN | EPOLLPRI | EPOLLONESHOT | EPOLLRDHUP;
+ AZ(epoll_ctl(vwe->epfd, EPOLL_CTL_ADD, fd, &sp->ev));
+ }
+}
+
+static void
+vwe_eev(struct vwe *vwe, const struct epoll_event *ep)
+{
+ struct sess *ss[NEEV], *sp;
+ int i, j;
+
+ AN(ep->data.ptr);
+ if (ep->data.ptr == vwe->pipes) {
+ if (ep->events & EPOLLIN || ep->events & EPOLLPRI) {
+ j = 0;
+ i = read(vwe->pipes[0], ss, sizeof ss);
+ if (i == -1 && errno == EAGAIN)
+ return;
+ while (i >= sizeof ss[0]) {
+ CHECK_OBJ_NOTNULL(ss[j], SESS_MAGIC);
+ assert(ss[j]->fd >= 0);
+ AZ(ss[j]->obj);
+ VTAILQ_INSERT_TAIL(&vwe->sesshead, ss[j], list);
+ vwe_cond_modadd(vwe, ss[j]->fd, ss[j]);
+ j++;
+ i -= sizeof ss[0];
+ }
+ assert(i == 0);
+ }
+ } else {
+ CAST_OBJ_NOTNULL(sp, ep->data.ptr, SESS_MAGIC);
+ if (ep->events & EPOLLIN || ep->events & EPOLLPRI) {
+ i = HTC_Rx(sp->htc);
+ if (i == 0) {
+ vwe_modadd(vwe, sp->fd, sp, EPOLL_CTL_MOD);
+ return; /* more needed */
+ }
+ VTAILQ_REMOVE(&vwe->sesshead, sp, list);
+ SES_Handle(sp, i);
+ } else if (ep->events & EPOLLERR) {
+ VTAILQ_REMOVE(&vwe->sesshead, sp, list);
+ SES_Delete(sp, "ERR");
+ } else if (ep->events & EPOLLHUP) {
+ VTAILQ_REMOVE(&vwe->sesshead, sp, list);
+ SES_Delete(sp, "HUP");
+ } else if (ep->events & EPOLLRDHUP) {
+ VTAILQ_REMOVE(&vwe->sesshead, sp, list);
+ SES_Delete(sp, "RHUP");
+ }
+ }
+}
+
+/*--------------------------------------------------------------------*/
+
+static void *
+vwe_thread(void *priv)
+{
+ struct epoll_event ev[NEEV], *ep;
+ struct sess *sp;
+ char junk;
+ double deadline;
+ int dotimer, i, n;
+ struct vwe *vwe;
+
+ CAST_OBJ_NOTNULL(vwe, priv, VWE_MAGIC);
+
+ THR_SetName("cache-epoll");
+
+ vwe->epfd = epoll_create(1);
+ assert(vwe->epfd >= 0);
+
+ vwe_modadd(vwe, vwe->pipes[0], vwe->pipes, EPOLL_CTL_ADD);
+ vwe_modadd(vwe, vwe->timer_pipes[0], vwe->timer_pipes, EPOLL_CTL_ADD);
+
+ while (1) {
+ dotimer = 0;
+ n = epoll_wait(vwe->epfd, ev, NEEV, -1);
+ for (ep = ev, i = 0; i < n; i++, ep++) {
+ if (ep->data.ptr == vwe->timer_pipes &&
+ (ep->events == EPOLLIN || ep->events == EPOLLPRI))
+ {
+ assert(read(vwe->timer_pipes[0], &junk, 1));
+ dotimer = 1;
+ } else
+ vwe_eev(vwe, ep);
+ }
+ if (!dotimer)
+ continue;
+
+ /* check for timeouts */
+ deadline = VTIM_real() - params->sess_timeout;
+ for (;;) {
+ sp = VTAILQ_FIRST(&vwe->sesshead);
+ if (sp == NULL)
+ break;
+ if (sp->t_open > deadline)
+ break;
+ VTAILQ_REMOVE(&vwe->sesshead, sp, list);
+ // XXX: not yet VTCP_linger(sp->fd, 0);
+ SES_Delete(sp, "timeout");
+ }
+ }
+ return NULL;
+}
+
+/*--------------------------------------------------------------------*/
+
+static void *
+vwe_sess_timeout_ticker(void *priv)
+{
+ char ticker = 'R';
+ struct vwe *vwe;
+
+ CAST_OBJ_NOTNULL(vwe, priv, VWE_MAGIC);
+ THR_SetName("cache-epoll-sess_timeout_ticker");
+
+ while (1) {
+ /* ticking */
+ assert(write(vwe->timer_pipes[1], &ticker, 1));
+ VTIM_sleep(100 * 1e-3);
+ }
+ return NULL;
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+vwe_pass(void *priv, const struct sess *sp)
+{
+ struct vwe *vwe;
+
+ CAST_OBJ_NOTNULL(vwe, priv, VWE_MAGIC);
+ assert(sizeof sp == write(vwe->pipes[1], &sp, sizeof sp));
+}
+
+/*--------------------------------------------------------------------*/
+
+static void *
+vwe_init(void)
+{
+ int i;
+ struct vwe *vwe;
+
+ ALLOC_OBJ(vwe, VWE_MAGIC);
+ AN(vwe);
+ VTAILQ_INIT(&vwe->sesshead);
+ AZ(pipe(vwe->pipes));
+ AZ(pipe(vwe->timer_pipes));
+
+ i = fcntl(vwe->pipes[0], F_GETFL);
+ assert(i != -1);
+ i |= O_NONBLOCK;
+ i = fcntl(vwe->pipes[0], F_SETFL, i);
+ assert(i != -1);
+
+ i = fcntl(vwe->timer_pipes[0], F_GETFL);
+ assert(i != -1);
+ i |= O_NONBLOCK;
+ i = fcntl(vwe->timer_pipes[0], F_SETFL, i);
+ assert(i != -1);
+
+ AZ(pthread_create(&vwe->timer_thread,
+ NULL, vwe_sess_timeout_ticker, vwe));
+ AZ(pthread_create(&vwe->epoll_thread, NULL, vwe_thread, vwe));
+ return(vwe);
+}
+
+/*--------------------------------------------------------------------*/
+
+const struct waiter waiter_epoll = {
+ .name = "epoll",
+ .init = vwe_init,
+ .pass = vwe_pass,
+};
+
+#endif /* defined(HAVE_EPOLL_CTL) */
diff --git a/bin/varnishd/waiter/cache_waiter_kqueue.c b/bin/varnishd/waiter/cache_waiter_kqueue.c
new file mode 100644
index 0000000..b300fae
--- /dev/null
+++ b/bin/varnishd/waiter/cache_waiter_kqueue.c
@@ -0,0 +1,246 @@
+/*-
+ * Copyright (c) 2006 Verdens Gang AS
+ * Copyright (c) 2006-2009 Varnish Software AS
+ * All rights reserved.
+ *
+ * Author: Poul-Henning Kamp <phk at phk.freebsd.dk>
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ * XXX: We need to pass sessions back into the event engine when they are
+ * reused. Not sure what the most efficient way is for that. For now
+ * write the session pointer to a pipe which the event engine monitors.
+ */
+
+#include "config.h"
+
+#if defined(HAVE_KQUEUE)
+
+#include <sys/types.h>
+#include <sys/event.h>
+
+#include <fcntl.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "cache.h"
+
+#include "waiter/cache_waiter.h"
+#include "vtim.h"
+
+#define NKEV 100
+
+struct vwk {
+ unsigned magic;
+#define VWK_MAGIC 0x1cc2acc2
+ pthread_t thread;
+ int pipes[2];
+ int kq;
+ struct kevent ki[NKEV];
+ unsigned nki;
+ VTAILQ_HEAD(,sess) sesshead;
+};
+
+/*--------------------------------------------------------------------*/
+
+static void
+vwk_kq_flush(struct vwk *vwk)
+{
+ int i;
+
+ if (vwk->nki == 0)
+ return;
+ i = kevent(vwk->kq, vwk->ki, vwk->nki, NULL, 0, NULL);
+ assert(i == 0);
+ vwk->nki = 0;
+}
+
+static void
+vwk_kq_sess(struct vwk *vwk, struct sess *sp, short arm)
+{
+
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ assert(sp->fd >= 0);
+ DSL(0x04, SLT_Debug, sp->vsl_id, "KQ: EV_SET sp %p arm %x", sp, arm);
+ EV_SET(&vwk->ki[vwk->nki], sp->fd, EVFILT_READ, arm, 0, 0, sp);
+ if (++vwk->nki == NKEV)
+ vwk_kq_flush(vwk);
+}
+
+static void
+vwk_kev(struct vwk *vwk, const struct kevent *kp)
+{
+ int i, j;
+ struct sess *sp;
+ struct sess *ss[NKEV];
+
+ AN(kp->udata);
+ if (kp->udata == vwk->pipes) {
+ j = 0;
+ i = read(vwk->pipes[0], ss, sizeof ss);
+ if (i == -1 && errno == EAGAIN)
+ return;
+ while (i >= sizeof ss[0]) {
+ CHECK_OBJ_NOTNULL(ss[j], SESS_MAGIC);
+ assert(ss[j]->fd >= 0);
+ AZ(ss[j]->obj);
+ VTAILQ_INSERT_TAIL(&vwk->sesshead, ss[j], list);
+ vwk_kq_sess(vwk, ss[j], EV_ADD | EV_ONESHOT);
+ j++;
+ i -= sizeof ss[0];
+ }
+ assert(i == 0);
+ return;
+ }
+ CAST_OBJ_NOTNULL(sp, kp->udata, SESS_MAGIC);
+ DSL(0x04, SLT_Debug, sp->vsl_id, "KQ: sp %p kev data %lu flags 0x%x%s",
+ sp, (unsigned long)kp->data, kp->flags,
+ (kp->flags & EV_EOF) ? " EOF" : "");
+
+ assert((sp->vsl_id & VSL_IDENTMASK) == kp->ident);
+ assert((sp->vsl_id & VSL_IDENTMASK) == sp->fd);
+ if (kp->data > 0) {
+ i = HTC_Rx(sp->htc);
+ if (i == 0) {
+ vwk_kq_sess(vwk, sp, EV_ADD | EV_ONESHOT);
+ return; /* more needed */
+ }
+ VTAILQ_REMOVE(&vwk->sesshead, sp, list);
+ SES_Handle(sp, i);
+ return;
+ } else if (kp->flags & EV_EOF) {
+ VTAILQ_REMOVE(&vwk->sesshead, sp, list);
+ SES_Delete(sp, "EOF");
+ return;
+ } else {
+ VSL(SLT_Debug, sp->vsl_id, "KQ: sp %p kev data %lu flags 0x%x%s",
+ sp, (unsigned long)kp->data, kp->flags,
+ (kp->flags & EV_EOF) ? " EOF" : "");
+ }
+}
+
+/*--------------------------------------------------------------------*/
+
+static void *
+vwk_thread(void *priv)
+{
+ struct vwk *vwk;
+ struct kevent ke[NKEV], *kp;
+ int j, n, dotimer;
+ double deadline;
+ struct sess *sp;
+
+ CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
+ THR_SetName("cache-kqueue");
+
+ vwk->kq = kqueue();
+ assert(vwk->kq >= 0);
+
+ j = 0;
+ EV_SET(&ke[j], 0, EVFILT_TIMER, EV_ADD, 0, 100, NULL);
+ j++;
+ EV_SET(&ke[j], vwk->pipes[0], EVFILT_READ, EV_ADD, 0, 0, vwk->pipes);
+ j++;
+ AZ(kevent(vwk->kq, ke, j, NULL, 0, NULL));
+
+ vwk->nki = 0;
+ while (1) {
+ dotimer = 0;
+ n = kevent(vwk->kq, vwk->ki, vwk->nki, ke, NKEV, NULL);
+ assert(n >= 1 && n <= NKEV);
+ vwk->nki = 0;
+ for (kp = ke, j = 0; j < n; j++, kp++) {
+ if (kp->filter == EVFILT_TIMER) {
+ dotimer = 1;
+ continue;
+ }
+ assert(kp->filter == EVFILT_READ);
+ vwk_kev(vwk, kp);
+ }
+ if (!dotimer)
+ continue;
+ /*
+ * Make sure we have no pending changes for the fd's
+ * we are about to close, in case the accept(2) in the
+ * other thread creates new fd's between our close and
+ * the kevent(2) at the top of this loop, the kernel
+ * would not know we meant "the old fd of this number".
+ */
+ vwk_kq_flush(vwk);
+ deadline = VTIM_real() - params->sess_timeout;
+ for (;;) {
+ sp = VTAILQ_FIRST(&vwk->sesshead);
+ if (sp == NULL)
+ break;
+ if (sp->t_open > deadline)
+ break;
+ VTAILQ_REMOVE(&vwk->sesshead, sp, list);
+ // XXX: not yet (void)VTCP_linger(sp->fd, 0);
+ SES_Delete(sp, "timeout");
+ }
+ }
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+vwk_pass(void *priv, const struct sess *sp)
+{
+ struct vwk *vwk;
+
+ CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
+ assert(sizeof sp == write(vwk->pipes[1], &sp, sizeof sp));
+}
+
+/*--------------------------------------------------------------------*/
+
+static void *
+vwk_init(void)
+{
+ int i;
+ struct vwk *vwk;
+
+ ALLOC_OBJ(vwk, VWK_MAGIC);
+ AN(vwk);
+
+ VTAILQ_INIT(&vwk->sesshead);
+ AZ(pipe(vwk->pipes));
+
+ i = fcntl(vwk->pipes[0], F_GETFL);
+ assert(i != -1);
+ i |= O_NONBLOCK;
+ i = fcntl(vwk->pipes[0], F_SETFL, i);
+ assert(i != -1);
+
+ AZ(pthread_create(&vwk->thread, NULL, vwk_thread, vwk));
+ return (vwk);
+}
+
+/*--------------------------------------------------------------------*/
+
+const struct waiter waiter_kqueue = {
+ .name = "kqueue",
+ .init = vwk_init,
+ .pass = vwk_pass,
+};
+
+#endif /* defined(HAVE_KQUEUE) */
diff --git a/bin/varnishd/waiter/cache_waiter_poll.c b/bin/varnishd/waiter/cache_waiter_poll.c
new file mode 100644
index 0000000..2617365
--- /dev/null
+++ b/bin/varnishd/waiter/cache_waiter_poll.c
@@ -0,0 +1,233 @@
+/*-
+ * Copyright (c) 2006 Verdens Gang AS
+ * Copyright (c) 2006-2010 Varnish Software AS
+ * All rights reserved.
+ *
+ * Author: Poul-Henning Kamp <phk at phk.freebsd.dk>
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ */
+
+#include "config.h"
+
+#include <poll.h>
+#include <stdlib.h>
+
+#include "cache.h"
+
+#include "waiter/cache_waiter.h"
+#include "vtim.h"
+
+#define NEEV 128
+
+struct vwp {
+ unsigned magic;
+#define VWP_MAGIC 0x4b2cc735
+ int pipes[2];
+ pthread_t poll_thread;
+ struct pollfd *pollfd;
+ unsigned npoll;
+ unsigned hpoll;
+
+ VTAILQ_HEAD(,sess) sesshead;
+};
+
+/*--------------------------------------------------------------------*/
+
+static void
+vwp_pollspace(struct vwp *vwp, unsigned fd)
+{
+ struct pollfd *newpollfd = vwp->pollfd;
+ unsigned newnpoll;
+
+ if (fd < vwp->npoll)
+ return;
+ newnpoll = vwp->npoll;
+ if (newnpoll == 0)
+ newnpoll = 1;
+ while (fd >= newnpoll)
+ newnpoll = newnpoll * 2;
+ VSL(SLT_Debug, 0, "Acceptor poll space increased to %u", newnpoll);
+ newpollfd = realloc(newpollfd, newnpoll * sizeof *newpollfd);
+ XXXAN(newpollfd);
+ memset(newpollfd + vwp->npoll, 0,
+ (newnpoll - vwp->npoll) * sizeof *newpollfd);
+ vwp->pollfd = newpollfd;
+ while (vwp->npoll < newnpoll)
+ vwp->pollfd[vwp->npoll++].fd = -1;
+ assert(fd < vwp->npoll);
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+vwp_poll(struct vwp *vwp, int fd)
+{
+
+ assert(fd >= 0);
+ vwp_pollspace(vwp, (unsigned)fd);
+ assert(fd < vwp->npoll);
+
+ if (vwp->hpoll < fd)
+ vwp->hpoll = fd;
+
+ assert(vwp->pollfd[fd].fd == -1);
+ assert(vwp->pollfd[fd].events == 0);
+ assert(vwp->pollfd[fd].revents == 0);
+
+ vwp->pollfd[fd].fd = fd;
+ vwp->pollfd[fd].events = POLLIN;
+}
+
+static void
+vwp_unpoll(struct vwp *vwp, int fd)
+{
+
+ assert(fd >= 0);
+ assert(fd < vwp->npoll);
+ vwp_pollspace(vwp, (unsigned)fd);
+
+ assert(vwp->pollfd[fd].fd == fd);
+ assert(vwp->pollfd[fd].events == POLLIN);
+ assert(vwp->pollfd[fd].revents == 0);
+
+ vwp->pollfd[fd].fd = -1;
+ vwp->pollfd[fd].events = 0;
+}
+
+/*--------------------------------------------------------------------*/
+
+static void *
+vwp_main(void *priv)
+{
+ int v, v2;
+ struct vwp *vwp;
+ struct sess *ss[NEEV], *sp, *sp2;
+ double deadline;
+ int i, j, fd;
+
+ CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
+ THR_SetName("cache-poll");
+
+ vwp_poll(vwp, vwp->pipes[0]);
+
+ while (1) {
+ assert(vwp->hpoll < vwp->npoll);
+ while (vwp->hpoll > 0 && vwp->pollfd[vwp->hpoll].fd == -1)
+ vwp->hpoll--;
+ assert(vwp->pipes[0] <= vwp->hpoll);
+ assert(vwp->pollfd[vwp->pipes[0]].fd == vwp->pipes[0]);
+ assert(vwp->pollfd[vwp->pipes[1]].fd == -1);
+ v = poll(vwp->pollfd, vwp->hpoll + 1, 100);
+ assert(v >= 0);
+ deadline = VTIM_real() - params->sess_timeout;
+ v2 = v;
+ VTAILQ_FOREACH_SAFE(sp, &vwp->sesshead, list, sp2) {
+ if (v != 0 && v2 == 0)
+ break;
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ fd = sp->fd;
+ assert(fd >= 0);
+ assert(fd <= vwp->hpoll);
+ assert(fd < vwp->npoll);
+ assert(vwp->pollfd[fd].fd == fd);
+ if (vwp->pollfd[fd].revents) {
+ v2--;
+ i = HTC_Rx(sp->htc);
+ if (vwp->pollfd[fd].revents != POLLIN)
+ VSL(SLT_Debug, fd, "Poll: %x / %d",
+ vwp->pollfd[fd].revents, i);
+ vwp->pollfd[fd].revents = 0;
+ VTAILQ_REMOVE(&vwp->sesshead, sp, list);
+ if (i == 0) {
+ /* Move to front of list for speed */
+ VTAILQ_INSERT_HEAD(&vwp->sesshead, sp, list);
+ } else {
+ vwp_unpoll(vwp, fd);
+ SES_Handle(sp, i);
+ }
+ } else if (sp->t_open <= deadline) {
+ VTAILQ_REMOVE(&vwp->sesshead, sp, list);
+ vwp_unpoll(vwp, fd);
+ // XXX: not yet (void)VTCP_linger(sp->fd, 0);
+ SES_Delete(sp, "timeout");
+ }
+ }
+ if (v2 && vwp->pollfd[vwp->pipes[0]].revents) {
+
+ if (vwp->pollfd[vwp->pipes[0]].revents != POLLIN)
+ VSL(SLT_Debug, 0, "pipe.revents= 0x%x",
+ vwp->pollfd[vwp->pipes[0]].revents);
+ assert(vwp->pollfd[vwp->pipes[0]].revents == POLLIN);
+ vwp->pollfd[vwp->pipes[0]].revents = 0;
+ v2--;
+ i = read(vwp->pipes[0], ss, sizeof ss);
+ assert(i >= 0);
+ assert(((unsigned)i % sizeof ss[0]) == 0);
+ for (j = 0; j * sizeof ss[0] < i; j++) {
+ CHECK_OBJ_NOTNULL(ss[j], SESS_MAGIC);
+ assert(ss[j]->fd >= 0);
+ VTAILQ_INSERT_TAIL(&vwp->sesshead, ss[j], list);
+ vwp_poll(vwp, ss[j]->fd);
+ }
+ }
+ assert(v2 == 0);
+ }
+ NEEDLESS_RETURN(NULL);
+}
+
+/*--------------------------------------------------------------------*/
+
+static void
+vwp_poll_pass(void *priv, const struct sess *sp)
+{
+ struct vwp *vwp;
+
+ CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
+
+ assert(sizeof sp == write(vwp->pipes[1], &sp, sizeof sp));
+}
+
+/*--------------------------------------------------------------------*/
+
+static void *
+vwp_poll_init(void)
+{
+ struct vwp *vwp;
+
+ ALLOC_OBJ(vwp, VWP_MAGIC);
+ AN(vwp);
+ VTAILQ_INIT(&vwp->sesshead);
+ AZ(pipe(vwp->pipes));
+ vwp_pollspace(vwp, 256);
+ AZ(pthread_create(&vwp->poll_thread, NULL, vwp_main, vwp));
+ return (vwp);
+}
+
+/*--------------------------------------------------------------------*/
+
+const struct waiter waiter_poll = {
+ .name = "poll",
+ .init = vwp_poll_init,
+ .pass = vwp_poll_pass,
+};
diff --git a/bin/varnishd/waiter/cache_waiter_ports.c b/bin/varnishd/waiter/cache_waiter_ports.c
new file mode 100644
index 0000000..022131b
--- /dev/null
+++ b/bin/varnishd/waiter/cache_waiter_ports.c
@@ -0,0 +1,283 @@
+/*-
+ * Copyright (c) 2006 Verdens Gang AS
+ * Copyright (c) 2006 Varnish Software AS
+ * Copyright (c) 2007 OmniTI Computer Consulting, Inc.
+ * Copyright (c) 2007 Theo Schlossnagle
+ * Copyright (c) 2010 UPLEX, Nils Goroll
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ */
+
+#include "config.h"
+
+#if defined(HAVE_PORT_CREATE)
+
+#include <sys/time.h>
+
+#include <math.h>
+#include <port.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "cache.h"
+
+#include "waiter/cache_waiter.h"
+#include "vtim.h"
+
+#define MAX_EVENTS 256
+
+struct vws {
+ unsigned magic; /* VWS_MAGIC; verified by CAST_OBJ_NOTNULL() */
+#define VWS_MAGIC 0x0b771473
+ pthread_t ports_thread; /* thread running vws_thread() */
+ int dport; /* event port descriptor obtained from port_create() */
+ VTAILQ_HEAD(,sess) sesshead; /* sessions being waited on; new ones are appended at the tail */
+};
+
+ static inline void
+ vws_add(struct vws *vws, int fd, void *data)
+{
+ /*
+ * Associate fd with the event port; data is handed back as portev_user.
+ * POLLIN should be all we need here
+ */
+ AZ(port_associate(vws->dport, PORT_SOURCE_FD, fd, POLLIN, data));
+}
+
+ static inline void
+ vws_del(struct vws *vws, int fd) /* drop the association of fd with the event port */
+{
+ port_dissociate(vws->dport, PORT_SOURCE_FD, fd); /* result is not checked; the fd may already be dissociated once its event was retrieved */
+}
+
+ static inline void
+ vws_port_ev(struct vws *vws, port_event_t *ev) { /* handle one event returned by port_getn() */
+ struct sess *sp;
+ if(ev->portev_source == PORT_SOURCE_USER) { /* session handed over by vws_pass() via port_send() */
+ CAST_OBJ_NOTNULL(sp, ev->portev_user, SESS_MAGIC);
+ assert(sp->fd >= 0);
+ AZ(sp->obj);
+ VTAILQ_INSERT_TAIL(&vws->sesshead, sp, list); /* tail insert keeps the list ordered for the timeout scan */
+ vws_add(vws, sp->fd, sp);
+ } else {
+ int i;
+ assert(ev->portev_source == PORT_SOURCE_FD);
+ CAST_OBJ_NOTNULL(sp, ev->portev_user, SESS_MAGIC); /* the pointer given to vws_add() */
+ assert(sp->fd >= 0);
+ if(ev->portev_events & POLLERR) { /* error on the fd: stop waiting and delete the session */
+ vws_del(vws, sp->fd);
+ VTAILQ_REMOVE(&vws->sesshead, sp, list);
+ SES_Delete(sp, "EOF");
+ return;
+ }
+ i = HTC_Rx(sp->htc);
+
+ if (i == 0) {
+ /* incomplete header, wait for more data */
+ vws_add(vws, sp->fd, sp);
+ return;
+ }
+
+ /*
+ * note: the original man page for port_associate(3C) states:
+ *
+ * When an event for a PORT_SOURCE_FD object is retrieved,
+ * the object no longer has an association with the port.
+ *
+ * This can be read along the lines of sparing the
+ * port_dissociate after port_getn(), but in fact,
+ * port_dissociate should be used
+ *
+ * Ref: http://opensolaris.org/jive/thread.jspa?threadID=129476&tstart=0
+ */
+ vws_del(vws, sp->fd);
+ VTAILQ_REMOVE(&vws->sesshead, sp, list);
+
+ /* SES_Handle will also handle errors */
+ SES_Handle(sp, i);
+ }
+ return;
+}
+
+ static void *
+ vws_thread(void *priv) /* waiter thread: dispatches port events and expires idle sessions */
+{
+ struct sess *sp;
+ struct vws *vws;
+
+ CAST_OBJ_NOTNULL(vws, priv, VWS_MAGIC);
+ /*
+ * timeouts:
+ *
+ * min_ts : Minimum timeout for port_getn
+ * min_t : ^ equivalent in floating point representation
+ *
+ * max_ts : Maximum timeout for port_getn
+ * max_t : ^ equivalent in floating point representation
+ *
+ * with (nevents == 1), we should always choose the correct port_getn
+ * timeout to check session timeouts, so max is just a safety measure
+ * (if this implementation is correct, it could be set to an "infinite"
+ * value)
+ *
+ * with (nevents > 1), min and max define the acceptable range for
+ * - additional latency of keep-alive connections and
+ * - additional tolerance for handling session timeouts
+ *
+ */
+ static struct timespec min_ts = {0L, 100L /*ms*/ * 1000L /*us*/ * 1000L /*ns*/};
+ static double min_t = 0.1; /* 100 ms*/
+ static struct timespec max_ts = {1L, 0L}; /* 1 second */
+ static double max_t = 1.0; /* 1 second */
+
+ /* XXX: These should probably go in vws ? */
+ struct timespec ts;
+ struct timespec *timeout;
+
+ timeout = &max_ts; /* vws->dport already exists: vws_init() creates it before starting this thread */
+
+ while (1) {
+ port_event_t ev[MAX_EVENTS];
+ int nevents, ei, ret;
+ double now, deadline;
+
+ /*
+ * XXX Do we want to scale this up dynamically to increase
+ * efficiency in high throughput situations? - would need to
+ * start with one to keep latency low at any rate
+ *
+ * Note: when increasing nevents, we must lower min_ts
+ * and max_ts
+ */
+ nevents = 1; /* in: events to wait for; out: events actually returned */
+
+ /*
+ * see discussion in
+ * - https://issues.apache.org/bugzilla/show_bug.cgi?id=47645
+ * - http://mail.opensolaris.org/pipermail/networking-discuss/2009-August/011979.html
+ *
+ * comment from apr/poll/unix/port.c :
+ *
+ * This confusing API can return an event at the same time
+ * that it reports EINTR or ETIME.
+ *
+ */
+
+ ret = port_getn(vws->dport, ev, MAX_EVENTS, &nevents, timeout);
+
+ if (ret < 0)
+ assert((errno == EINTR) || (errno == ETIME));
+
+ for (ei = 0; ei < nevents; ei++) /* runs even when ret < 0, see the comment above */
+ vws_port_ev(vws, ev + ei);
+
+ /* check for timeouts */
+ now = VTIM_real();
+ deadline = now - params->sess_timeout;
+
+ /*
+ * This loop assumes that the oldest sessions are always at the
+ * beginning of the list (which is the case if we guarantee to
+ * enqueue at the tail only)
+ *
+ */
+
+ for (;;) {
+ sp = VTAILQ_FIRST(&vws->sesshead);
+ if (sp == NULL)
+ break;
+ if (sp->t_open > deadline) {
+ break;
+ }
+ VTAILQ_REMOVE(&vws->sesshead, sp, list);
+ if(sp->fd != -1) {
+ vws_del(vws, sp->fd);
+ }
+ SES_Delete(sp, "timeout");
+ }
+
+ /*
+ * Calculate the timeout for the next port_getn
+ */
+
+ if (sp) { /* oldest session that has not timed out yet */
+ double tmo = (sp->t_open + params->sess_timeout) - now;
+
+ /* we should have removed all sps whose timeout has passed */
+ assert(tmo > 0.0);
+
+ if (tmo < min_t) {
+ timeout = &min_ts;
+ } else if (tmo > max_t) {
+ timeout = &max_ts;
+ } else {
+ ts = VTIM_timespec(tmo);
+ timeout = &ts;
+ }
+ } else {
+ timeout = &max_ts;
+ }
+ }
+}
+
+/*--------------------------------------------------------------------*/
+
+ static void
+ vws_pass(void *priv, const struct sess *sp) /* hand sp to the waiter thread as a PORT_SOURCE_USER event */
+{
+ int r;
+ struct vws *vws;
+
+ CAST_OBJ_NOTNULL(vws, priv, VWS_MAGIC);
+ while((r = port_send(vws->dport, 0, TRUST_ME(sp))) == -1 &&
+ errno == EAGAIN); /* empty body: simply retry on EAGAIN */
+ AZ(r);
+}
+
+/*--------------------------------------------------------------------*/
+
+ static void *
+ vws_init(void) /* allocate the waiter state, create its event port and start its thread */
+{
+ struct vws *vws;
+
+ ALLOC_OBJ(vws, VWS_MAGIC);
+ AN(vws);
+ VTAILQ_INIT(&vws->sesshead);
+ /* Create the port before starting the thread, so vws_pass() cannot see an unset dport */
+ vws->dport = port_create();
+ assert(vws->dport >= 0);
+ AZ(pthread_create(&vws->ports_thread, NULL, vws_thread, vws));
+ return (vws);
+}
+
+/*--------------------------------------------------------------------*/
+
+const struct waiter waiter_ports = { /* method table for the "ports" waiter */
+ .name = "ports",
+ .init = vws_init, /* allocates state and starts vws_thread() */
+ .pass = vws_pass /* hands a session over with port_send() */
+};
+
+#endif /* defined(HAVE_PORT_CREATE) */
More information about the varnish-commit
mailing list