r597 - trunk/varnish-cache/bin/varnishd

phk at projects.linpro.no phk at projects.linpro.no
Wed Aug 2 11:34:40 CEST 2006


Author: phk
Date: 2006-08-02 11:34:40 +0200 (Wed, 02 Aug 2006)
New Revision: 597

Modified:
   trunk/varnish-cache/bin/varnishd/cache.h
   trunk/varnish-cache/bin/varnishd/cache_acceptor.c
   trunk/varnish-cache/bin/varnishd/cache_http.c
Log:
Bite the bullet and write an alternate acceptor which uses kqueue
directly instead of libevent.

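Degeneralize the header-reading code in cache_http.c: the libevent-driven
http_RecvHeadEv() and its callback are replaced by the plain functions
http_RecvPrep(), http_RecvPrepAgain() and http_RecvSome(), which seems to
be cleaner anyway.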

A #define at the top of cache_acceptor.c (ACCEPTOR_USE_KQUEUE or
ACCEPTOR_USE_LIBEVENT, tested with #ifdef further down) selects which
implementation you want: libevent or kqueue.



Modified: trunk/varnish-cache/bin/varnishd/cache.h
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache.h	2006-08-02 07:23:45 UTC (rev 596)
+++ trunk/varnish-cache/bin/varnishd/cache.h	2006-08-02 09:34:40 UTC (rev 597)
@@ -46,8 +46,6 @@
  * RSN: struct worker and struct session will have one of these embedded.
  */
 
-typedef void http_callback_f(void *, int bad);
-
 struct http_hdr {
 	char			*b;
 	char			*e;
@@ -56,9 +54,6 @@
 struct http {
 	unsigned		magic;
 #define HTTP_MAGIC		0x6428b5c9
-	struct event		ev;
-	http_callback_f		*callback;
-	void			*arg;
 
 	char			*s;		/* (S)tart of buffer */
 	char			*t;		/* start of (T)railing data */
@@ -230,6 +225,7 @@
 	int			id;
 	unsigned		xid;
 
+	struct event		ev;
 	struct worker		*wrk;
 
 	unsigned		sockaddrlen;
@@ -346,7 +342,9 @@
 int http_HdrIs(struct http *hp, const char *hdr, const char *val);
 int http_GetTail(struct http *hp, unsigned len, char **b, char **e);
 int http_Read(struct http *hp, int fd, void *b, unsigned len);
-void http_RecvHeadEv(struct http *hp, int fd, struct event_base *eb, http_callback_f *func, void *arg);
+void http_RecvPrep(struct http *hp);
+int http_RecvPrepAgain(struct http *hp);
+int http_RecvSome(int fd, struct http *hp);
 int http_RecvHead(struct http *hp, int fd);
 int http_DissectRequest(struct http *sp, int fd);
 int http_DissectResponse(struct http *sp, int fd);

Modified: trunk/varnish-cache/bin/varnishd/cache_acceptor.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_acceptor.c	2006-08-02 07:23:45 UTC (rev 596)
+++ trunk/varnish-cache/bin/varnishd/cache_acceptor.c	2006-08-02 09:34:40 UTC (rev 597)
@@ -6,6 +6,9 @@
  * write the session pointer to a pipe which the event engine monitors.
  */
 
+#define ACCEPTOR_USE_KQUEUE
+#undef ACCEPTOR_USE_LIBEVENT
+
 #include <stdio.h>
 #include <errno.h>
 #include <string.h>
@@ -16,14 +19,73 @@
 #include <sys/types.h>
 #include <sys/socket.h>
 
-#include <netdb.h>
-
 #include "config.h"
 #include "libvarnish.h"
 #include "heritage.h"
 #include "shmlog.h"
 #include "cache.h"
 
+static pthread_t vca_thread;
+static unsigned		xids;
+
+static struct sess *
+vca_accept_sess(int fd)
+{
+	socklen_t l;
+	struct sockaddr addr[2];	/* XXX: IPv6 hack */
+	struct sess *sp;
+	int i;
+	struct linger linger;
+
+	VSL_stats->client_conn++;
+
+	l = sizeof addr;
+	i = accept(fd, addr, &l);
+	if (i < 0) {
+		VSL(SLT_Debug, fd, "Accept failed errno=%d", errno);
+		/* XXX: stats ? */
+		return (NULL);
+	}
+	sp = SES_New(addr, l);
+	assert(sp != NULL);	/* XXX handle */
+
+	sp->fd = i;
+	sp->id = i;
+
+#ifdef SO_NOSIGPIPE /* XXX Linux */
+	i = 1;
+	AZ(setsockopt(sp->fd, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof i));
+#endif
+#ifdef SO_LINGER /* XXX Linux*/
+	linger.l_onoff = 0;
+	linger.l_linger = 0;
+	AZ(setsockopt(sp->fd, SOL_SOCKET, SO_LINGER, &linger, sizeof linger));
+#endif
+
+	TCP_name(addr, l, sp->addr, sizeof sp->addr, sp->port, sizeof sp->port);
+	VSL(SLT_SessionOpen, sp->fd, "%s %s", sp->addr, sp->port);
+	return (sp);
+}
+
+static void
+vca_handover(struct sess *sp, int bad)
+{
+
+	if (bad) {
+		vca_close_session(sp,
+		    bad == 1 ? "overflow" : "no request");
+		vca_return_session(sp);
+		return;
+	}
+	sp->step = STP_RECV;
+	VSL_stats->client_req++;
+	sp->xid = xids++;
+	VSL(SLT_XID, sp->fd, "%u", sp->xid);
+	WRK_QueueSession(sp);
+}
+
+#ifdef ACCEPTOR_USE_LIBEVENT
+
 static struct event_base *evb;
 static struct event pipe_e;
 static int pipes[2];
@@ -31,12 +93,11 @@
 static struct event tick_e;
 static struct timeval tick_rate;
 
-static pthread_t vca_thread;
-static unsigned		xids;
-
 static struct event accept_e[2 * HERITAGE_NSOCKS];
 static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
 
+/*--------------------------------------------------------------------*/
+
 static void
 vca_tick(int a, short b, void *c)
 {
@@ -59,28 +120,36 @@
 }
 
 static void
-vca_callback(void *arg, int bad)
+vca_rcvhd_f(int fd, short event, void *arg)
 {
 	struct sess *sp;
+	int i;
 
+	(void)event;
+
 	CAST_OBJ_NOTNULL(sp, arg, SESS_MAGIC);
+	i = http_RecvSome(fd, sp->http);
+	if (i < 0)
+		return;
+
+	event_del(&sp->ev);
 	TAILQ_REMOVE(&sesshead, sp, list);
-	if (bad) {
-		if (bad == 1)
-			vca_close_session(sp, "overflow");
-		else
-			vca_close_session(sp, "no request");
-		vca_return_session(sp);
-		return;
-	}
-	sp->step = STP_RECV;
-	VSL_stats->client_req++;
-	sp->xid = xids++;
-	VSL(SLT_XID, sp->fd, "%u", sp->xid);
-	WRK_QueueSession(sp);
+	vca_handover(sp, i);
 }
 
 static void
+vca_rcvhdev(struct sess *sp)
+{
+
+	CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+	clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
+	TAILQ_INSERT_TAIL(&sesshead, sp, list);
+	event_set(&sp->ev, sp->fd, EV_READ | EV_PERSIST, vca_rcvhd_f, sp);
+	AZ(event_base_set(evb, &sp->ev));
+	AZ(event_add(&sp->ev, NULL));      /* XXX: timeout */
+}
+
+static void
 pipe_f(int fd, short event, void *arg)
 {
 	struct sess *sp;
@@ -90,53 +159,27 @@
 	(void)arg;
 	i = read(fd, &sp, sizeof sp);
 	assert(i == sizeof sp);
-	clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
-	TAILQ_INSERT_TAIL(&sesshead, sp, list);
-	http_RecvHeadEv(sp->http, sp->fd, evb, vca_callback, sp);
+	if (http_RecvPrepAgain(sp->http)) {
+		vca_handover(sp, 0);
+		return;
+	}
+	vca_rcvhdev(sp);
 }
 
 static void
 accept_f(int fd, short event, void *arg)
 {
-	socklen_t l;
-	struct sockaddr addr[2];	/* XXX: IPv6 hack */
 	struct sess *sp;
-	int i;
-	struct linger linger;
 
 	(void)event;
 	(void)arg;
-	VSL_stats->client_conn++;
 
-
-	l = sizeof addr;
-	i = accept(fd, addr, &l);
-	if (i < 0) {
-		VSL(SLT_Debug, fd, "Accept failed errno=%d", errno);
-		/* XXX: stats ? */
+	sp = vca_accept_sess(fd);
+	if (sp == NULL)
 		return;
-	}
-	sp = SES_New(addr, l);
-	assert(sp != NULL);	/* XXX handle */
 
-	sp->fd = i;
-	sp->id = i;
-
-#ifdef SO_NOSIGPIPE /* XXX Linux */
-	i = 1;
-	AZ(setsockopt(sp->fd, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof i));
-#endif
-#ifdef SO_LINGER /* XXX Linux*/
-	linger.l_onoff = 0;
-	linger.l_linger = 0;
-	AZ(setsockopt(sp->fd, SOL_SOCKET, SO_LINGER, &linger, sizeof linger));
-#endif
-
-	TCP_name(addr, l, sp->addr, sizeof sp->addr, sp->port, sizeof sp->port);
-	VSL(SLT_SessionOpen, sp->fd, "%s %s", sp->addr, sp->port);
-	clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
-	TAILQ_INSERT_TAIL(&sesshead, sp, list);
-	http_RecvHeadEv(sp->http, sp->fd, evb, vca_callback, sp);
+	http_RecvPrep(sp->http);
+	vca_rcvhdev(sp);
 }
 
 static void *
@@ -147,6 +190,8 @@
 
 	(void)arg;
 
+	tick_rate.tv_sec = 1;
+	tick_rate.tv_usec = 0;
 	AZ(pipe(pipes));
 	evb = event_init();
 	assert(evb != NULL);
@@ -187,37 +232,153 @@
 /*--------------------------------------------------------------------*/
 
 void
-vca_close_session(struct sess *sp, const char *why)
+vca_return_session(struct sess *sp)
 {
 
-	VSL(SLT_SessionClose, sp->fd, why);
-	if (sp->fd >= 0)
-		AZ(close(sp->fd));
-	sp->fd = -1;
+	if (sp->fd < 0) {
+		SES_Delete(sp);
+		return;
+	}
+	VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
+	assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
 }
 
+#endif /* ACCEPTOR_USE_LIBEVENT */
+
+#ifdef ACCEPTOR_USE_KQUEUE
+#include <sys/event.h>
+
+static int kq = -1;
+
+static void
+vca_kq_sess(struct sess *sp, int arm)
+{
+	struct kevent ke[2];
+
+	assert(arm == EV_ADD || arm == EV_DELETE);
+	CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+	memset(ke, 0, sizeof ke);
+	EV_SET(&ke[0], sp->fd, EVFILT_READ, arm, 0, 0, sp);
+	EV_SET(&ke[1], sp->fd, EVFILT_TIMER, arm , 0, 5000, sp);
+	AZ(kevent(kq, ke, 2, NULL, 0, NULL));
+}
+
+static void
+accept_f(int fd)
+{
+	struct sess *sp;
+
+	sp = vca_accept_sess(fd);
+	if (sp == NULL)
+		return;
+	clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
+	http_RecvPrep(sp->http);
+	vca_kq_sess(sp, EV_ADD);
+}
+
+static void *
+vca_main(void *arg)
+{
+	unsigned u;
+	struct kevent ke;
+	int i;
+	struct sess *sp;
+
+	(void)arg;
+
+	kq = kqueue();
+	assert(kq >= 0);
+
+
+	for (u = 0; u < HERITAGE_NSOCKS; u++) {
+		if (heritage.sock_local[u] >= 0) {
+			memset(&ke, 0, sizeof ke);
+			EV_SET(&ke, heritage.sock_local[u],
+			    EVFILT_READ, EV_ADD, 0, 0, accept_f);
+			AZ(kevent(kq, &ke, 1, NULL, 0, NULL));
+		}
+		if (heritage.sock_remote[u] >= 0) {
+			memset(&ke, 0, sizeof ke);
+			EV_SET(&ke, heritage.sock_remote[u],
+			    EVFILT_READ, EV_ADD, 0, 0, accept_f);
+			AZ(kevent(kq, &ke, 1, NULL, 0, NULL));
+		}
+	}
+
+	while (1) {
+		i = kevent(kq, NULL, 0, &ke, 1, NULL);
+		assert(i == 1);
+#if 0
+		printf("i = %d\n", i);
+		printf("ke.ident = %ju\n", (uintmax_t)ke.ident);
+		printf("ke.filter = %u\n", ke.filter);
+		printf("ke.flags = %u\n", ke.flags);
+		printf("ke.fflags = %u\n", ke.fflags);
+		printf("ke.data = %jd\n", (intmax_t)ke.data);
+		printf("ke.udata = %p\n", ke.udata);
+#endif
+		if (ke.udata == accept_f) {
+			accept_f(ke.ident);
+			continue;
+		}
+		CAST_OBJ_NOTNULL(sp, ke.udata, SESS_MAGIC);
+		if (ke.filter == EVFILT_READ) {
+			i = http_RecvSome(sp->fd, sp->http);
+			if (i == -1)
+				continue;
+			vca_kq_sess(sp, EV_DELETE);
+			vca_handover(sp, i);
+			continue;
+		}
+		if (ke.filter == EVFILT_TIMER) {
+			vca_kq_sess(sp, EV_DELETE);
+			vca_close_session(sp, "timeout");
+			vca_return_session(sp);
+			continue;
+		} 
+		INCOMPL();
+	}
+
+	INCOMPL();
+}
+
 /*--------------------------------------------------------------------*/
 
 void
 vca_return_session(struct sess *sp)
 {
 
-	if (sp->fd >= 0) {
-		VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
-		assert(sizeof sp == write(pipes[1], &sp, sizeof sp));
-	} else {
+	if (sp->fd < 0) {
 		SES_Delete(sp);
+		return;
 	}
+	VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
+	if (http_RecvPrepAgain(sp->http))
+		vca_handover(sp, 0);
+	else
+		vca_kq_sess(sp, EV_ADD);
 }
 
+#endif /* ACCEPTOR_USE_KQUEUE */
+
 /*--------------------------------------------------------------------*/
 
 void
+vca_close_session(struct sess *sp, const char *why)
+{
+
+	VSL(SLT_SessionClose, sp->fd, why);
+	if (sp->fd >= 0)
+		AZ(close(sp->fd));
+	sp->fd = -1;
+}
+
+/*--------------------------------------------------------------------*/
+
+void
 VCA_Init(void)
 {
 
-	tick_rate.tv_sec = 1;
-	tick_rate.tv_usec = 0;
 	AZ(pthread_create(&vca_thread, NULL, vca_main, NULL));
 	srandomdev();
 	xids = random();

Modified: trunk/varnish-cache/bin/varnishd/cache_http.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_http.c	2006-08-02 07:23:45 UTC (rev 596)
+++ trunk/varnish-cache/bin/varnishd/cache_http.c	2006-08-02 09:34:40 UTC (rev 597)
@@ -436,14 +436,15 @@
 	if (++p > hp->v)
 		return (0);
 	hp->t = p;
+	assert(hp->t > hp->s);
 	assert(hp->t <= hp->v);
 	return (1);
 }
 
 /*--------------------------------------------------------------------*/
 
-static void
-http_preprecv(struct http *hp)
+void
+http_RecvPrep(struct http *hp)
 {
 	unsigned l;
 
@@ -462,10 +463,19 @@
 	}
 }
 
+int
+http_RecvPrepAgain(struct http *hp)
+{
+	http_RecvPrep(hp);
+	if (hp->v == hp->s)
+		return (0);
+	return (http_header_complete(hp));
+}
+
 /*--------------------------------------------------------------------*/
 
-static int
-http_read_hdr(int fd, struct http *hp)
+int
+http_RecvSome(int fd, struct http *hp)
 {
 	unsigned l;
 	int i;
@@ -507,55 +517,15 @@
 
 /*--------------------------------------------------------------------*/
 
-static void
-http_read_f(int fd, short event, void *arg)
-{
-	struct http *hp;
-	int i;
-
-	(void)event;
-
-	CAST_OBJ_NOTNULL(hp, arg, HTTP_MAGIC);
-	i = http_read_hdr(fd, hp);
-	if (i < 0)
-		return;
-
-	event_del(&hp->ev);
-	if (hp->callback != NULL)
-		hp->callback(hp->arg, i);
-}
-
-
-void
-http_RecvHeadEv(struct http *hp, int fd, struct event_base *eb, http_callback_f *func, void *arg)
-{
-
-	CHECK_OBJ_NOTNULL(hp, HTTP_MAGIC);
-	assert(func != NULL);
-	http_preprecv(hp);
-	if (hp->v != hp->s && http_header_complete(hp)) {
-		func(arg, 0);
-		return;
-	}
-	hp->callback = func;
-	hp->arg = arg;
-	event_set(&hp->ev, fd, EV_READ | EV_PERSIST, http_read_f, hp);
-	AZ(event_base_set(eb, &hp->ev));
-	AZ(event_add(&hp->ev, NULL));      /* XXX: timeout */
-	return;
-}
-
-/*--------------------------------------------------------------------*/
-
 int
 http_RecvHead(struct http *hp, int fd)
 {
 	int i;
 
 	CHECK_OBJ_NOTNULL(hp, HTTP_MAGIC);
-	http_preprecv(hp);
+	http_RecvPrep(hp);
 	do 
-		i = http_read_hdr(fd, hp);
+		i = http_RecvSome(fd, hp);
 	while (i == -1);
 	return (i);
 }




More information about the varnish-commit mailing list