r875 - trunk/varnish-cache/bin/varnishd

phk at projects.linpro.no phk at projects.linpro.no
Mon Aug 21 20:55:24 CEST 2006


Author: phk
Date: 2006-08-21 20:55:24 +0200 (Mon, 21 Aug 2006)
New Revision: 875

Modified:
   trunk/varnish-cache/bin/varnishd/cache.h
   trunk/varnish-cache/bin/varnishd/cache_acceptor.c
   trunk/varnish-cache/bin/varnishd/cache_acceptor.h
   trunk/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c
Log:
Overhaul kqueue acceptor in light of today's learnings.

Use the pipe trick to inject sessions into the system; as far as I
can tell it is cheaper because of the low rate at which it happens and
the high rate of mutex operations avoided.

Ignore the timer event, but purge the list every time we wake up
to reduce lumpiness of session timeouts.

Centralize the polling of a session so we don't have the same two
messages spread out all over the place.

Centralize the acceptor thread and send things directly to the worker
thread, leaving only the session-herder in the split-out files.

poll & epoll not yet updated accordingly.



Modified: trunk/varnish-cache/bin/varnishd/cache.h
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache.h	2006-08-21 17:49:39 UTC (rev 874)
+++ trunk/varnish-cache/bin/varnishd/cache.h	2006-08-21 18:55:24 UTC (rev 875)
@@ -272,7 +272,6 @@
 
 	struct workreq		workreq;
 	struct acct		acct;
-	unsigned		kqa;
 };
 
 struct backend {

Modified: trunk/varnish-cache/bin/varnishd/cache_acceptor.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_acceptor.c	2006-08-21 17:49:39 UTC (rev 874)
+++ trunk/varnish-cache/bin/varnishd/cache_acceptor.c	2006-08-21 18:55:24 UTC (rev 875)
@@ -40,6 +40,7 @@
 };
 
 static unsigned		xids;
+static pthread_t 	vca_thread_acct;
 
 struct sess *
 vca_accept_sess(int fd)
@@ -128,6 +129,23 @@
 
 /*--------------------------------------------------------------------*/
 
+int
+vca_pollsession(struct sess *sp)
+{
+	int i;
+
+	i = http_RecvSome(sp->fd, sp->http);
+	if (i < 1)
+		return (i);
+	if (i == 1)
+		vca_close_session(sp, "overflow");
+	else if (i == 2)
+		vca_close_session(sp, "no request");
+	return (1);
+}
+
+/*--------------------------------------------------------------------*/
+
 void
 vca_close_session(struct sess *sp, const char *why)
 {
@@ -143,11 +161,35 @@
 {
 
 	CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+	if (sp->fd >= 0) {
+		VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
+		(void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
+		if (http_RecvPrepAgain(sp->http))
+			vca_handover(sp, 0);
+	}
 	vca_acceptors[0]->recycle(sp);
 }
 
 /*--------------------------------------------------------------------*/
 
+static void *
+vca_acct(void *arg)
+{
+	struct sess *sp;
+
+	(void)arg;
+	while (1) {
+		sp = vca_accept_sess(heritage.socket);
+		if (sp == NULL)
+			continue;
+		http_RecvPrep(sp->http);
+		vca_handfirst(sp);
+	}
+}
+
+
+/*--------------------------------------------------------------------*/
+
 void
 VCA_Init(void)
 {
@@ -161,4 +203,5 @@
 		exit (2);
 	}
 	vca_acceptors[0]->init();
+	AZ(pthread_create(&vca_thread_acct, NULL, vca_acct, NULL));
 }

Modified: trunk/varnish-cache/bin/varnishd/cache_acceptor.h
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_acceptor.h	2006-08-21 17:49:39 UTC (rev 874)
+++ trunk/varnish-cache/bin/varnishd/cache_acceptor.h	2006-08-21 18:55:24 UTC (rev 875)
@@ -29,4 +29,5 @@
 struct sess *vca_accept_sess(int fd);
 void vca_handover(struct sess *sp, int bad);
 void vca_handfirst(struct sess *sp);
+int vca_pollsession(struct sess *sp);
 

Modified: trunk/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c
===================================================================
--- trunk/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c	2006-08-21 17:49:39 UTC (rev 874)
+++ trunk/varnish-cache/bin/varnishd/cache_acceptor_kqueue.c	2006-08-21 18:55:24 UTC (rev 875)
@@ -19,109 +19,82 @@
 #include <sys/socket.h>
 #include <sys/event.h>
 
-#ifndef HAVE_SRANDOMDEV
-#include "compat/srandomdev.h"
-#endif
-
 #include "heritage.h"
 #include "shmlog.h"
 #include "cache.h"
 #include "cache_acceptor.h"
 
-static pthread_t vca_kqueue_thread1;
-static pthread_t vca_kqueue_thread2;
+static pthread_t vca_kqueue_thread;
 static int kq = -1;
 
+static TAILQ_HEAD(,sess) sesshead = TAILQ_HEAD_INITIALIZER(sesshead);
+static int pipes[2];
+
 #define NKEV	100
 
 static void
 vca_kq_sess(struct sess *sp, int arm)
 {
-	struct kevent ke[2];
-	int i, j, arm2;
+	struct kevent ke;
+	int i;
 
 	CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
-	memset(ke, 0, sizeof ke);
-	if (arm == EV_ADD || arm == EV_ENABLE) {
-		assert(sp->kqa == 0);
-		sp->kqa = 1;
-		arm2 = EV_ADD;
-	} else  {
-		assert(sp->kqa == 1);
-		sp->kqa = 0;
-		arm2 = EV_DELETE;
-	}
-	j = 0;
-	EV_SET(&ke[j++], sp->id, EVFILT_TIMER, arm2,
-	    0, params->sess_timeout * 1000, sp);
-	if (sp->fd >= 0)
-		EV_SET(&ke[j++], sp->fd, EVFILT_READ, arm, 0, 0, sp);
-
-	i = kevent(kq, ke, j, NULL, 0, NULL);
+	if (sp->fd < 0)
+		return;
+	EV_SET(&ke, sp->fd, EVFILT_READ, arm, 0, 0, sp);
+	i = kevent(kq, &ke, 1, NULL, 0, NULL);
 	assert(i == 0);
 }
 
-static struct sess *
+static void
 vca_kev(struct kevent *kp)
 {
 	int i;
 	struct sess *sp;
 
-	if (kp->udata == NULL) {
-		VSL(SLT_Debug, 0,
-		    "KQ RACE %s flags %x fflags %x data %x",
-		    kp->filter == EVFILT_READ ? "R" : "T",
-		    kp->flags, kp->fflags, kp->data);
-		return (NULL);
+	assert(kp->udata != NULL);
+	if (kp->udata == pipes) {
+		while (kp->data > 0) {
+			i = read(pipes[0], &sp, sizeof sp);
+			assert(i == sizeof sp);
+			kp->data -= i;
+			CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+			TAILQ_INSERT_TAIL(&sesshead, sp, list);
+			vca_kq_sess(sp, EV_ADD);
+		}
+		return;
 	}
 	CAST_OBJ_NOTNULL(sp, kp->udata, SESS_MAGIC);
-	if (sp->kqa == 0) {
-		VSL(SLT_Debug, sp->id,
-		    "KQ %s flags %x fflags %x data %x",
-		    kp->filter == EVFILT_READ ? "R" : "T",
-		    kp->flags, kp->fflags, kp->data);
-		return (NULL);
-	}
-	if (kp->filter == EVFILT_READ) {
-		if (kp->data > 0) {
-			i = http_RecvSome(sp->fd, sp->http);
-			switch (i) {
-			case -1:
-				return (NULL);
-			case 0:
-				vca_kq_sess(sp, EV_DISABLE);
-				vca_handover(sp, i);
-				return (NULL);	 /* ?? */
-			case 1:
-				vca_close_session(sp, "overflow");
-				break;
-			case 2:
-				vca_close_session(sp, "no request");
-				break;
-			default:
-				INCOMPL();
-			}
-			return (sp);
+	if (kp->data > 0) {
+		i = vca_pollsession(sp);
+		if (i == -1)
+			return;
+		TAILQ_REMOVE(&sesshead, sp, list);
+		if (i == 0) {
+			vca_kq_sess(sp, EV_DELETE);
+			vca_handover(sp, i);
+		} else {
+			SES_Delete(sp);
 		}
-		if (kp->flags == EV_EOF) {
-			vca_close_session(sp, "EOF");
-			return (sp);
-		}
-		INCOMPL();
+		return;
 	}
-	if (kp->filter == EVFILT_TIMER) {
-		vca_close_session(sp, "timeout");
-		return (sp);
+	if (kp->flags == EV_EOF) {
+		TAILQ_REMOVE(&sesshead, sp, list);
+		vca_close_session(sp, "EOF");
+		SES_Delete(sp);
+		return;
 	}
 	INCOMPL();
 }
 
+/*--------------------------------------------------------------------*/
 
 static void *
 vca_kqueue_main(void *arg)
 {
 	struct kevent ke[NKEV], *kp;
 	int i, j, n;
+	struct timespec ts;
 	struct sess *sp;
 
 	(void)arg;
@@ -129,62 +102,57 @@
 	kq = kqueue();
 	assert(kq >= 0);
 
+	j = 0;
+	EV_SET(&ke[j++], 0, EVFILT_TIMER, EV_ADD, 0, 100, NULL);
+	EV_SET(&ke[j++], pipes[0], EVFILT_READ, EV_ADD, 0, 0, pipes);
+	i = kevent(kq, ke, j, NULL, 0, NULL);
+	assert(i == 0);
+
 	while (1) {
 		n = kevent(kq, NULL, 0, ke, NKEV, NULL);
 		assert(n >= 1 && n <= NKEV);
 		for (kp = ke, j = 0; j < n; j++, kp++) {
-			sp = vca_kev(kp);
-			if (sp != NULL) {
-				vca_kq_sess(sp, EV_DELETE);
-				SES_Delete(sp);
-				for (i = j; i < n; i++)
-					if (ke[i].udata == sp)
-						ke[i].udata = NULL;
-			}
+			if (kp->filter == EVFILT_TIMER)
+				continue; 
+			assert(kp->filter == EVFILT_READ);
+			vca_kev(kp);
 		}
+		clock_gettime(CLOCK_REALTIME, &ts);
+		ts.tv_sec -= params->sess_timeout;
+		for (;;) {
+			sp = TAILQ_FIRST(&sesshead);
+			if (sp == NULL)
+				break;
+			if (sp->t_open.tv_sec > ts.tv_sec) 
+				break;
+			if (sp->t_open.tv_sec == ts.tv_sec &&
+			    sp->t_open.tv_nsec > ts.tv_nsec)
+				break;
+			TAILQ_REMOVE(&sesshead, sp, list);
+			vca_close_session(sp, "timeout");
+			SES_Delete(sp);
+		}
 	}
-	INCOMPL();
 }
 
-static void *
-vca_kqueue_acct(void *arg)
-{
-	struct sess *sp;
-
-	(void)arg;
-	while (1) {
-		sp = vca_accept_sess(heritage.socket);
-		if (sp == NULL)
-			continue;
-		clock_gettime(CLOCK_MONOTONIC, &sp->t_idle);
-		http_RecvPrep(sp->http);
-		vca_kq_sess(sp, EV_ADD);
-	}
-}
-
 /*--------------------------------------------------------------------*/
 
 static void
 vca_kqueue_recycle(struct sess *sp)
 {
 
-	if (sp->fd < 0) {
+	if (sp->fd < 0)
 		SES_Delete(sp);
-		return;
-	}
-	(void)clock_gettime(CLOCK_REALTIME, &sp->t_open);
-	VSL(SLT_SessionReuse, sp->fd, "%s %s", sp->addr, sp->port);
-	if (http_RecvPrepAgain(sp->http))
-		vca_handover(sp, 0);
-	else 
-		vca_kq_sess(sp, EV_ENABLE);
+	else
+		assert(write(pipes[1], &sp, sizeof sp) == sizeof sp);
 }
 
 static void
 vca_kqueue_init(void)
 {
-	AZ(pthread_create(&vca_kqueue_thread1, NULL, vca_kqueue_main, NULL));
-	AZ(pthread_create(&vca_kqueue_thread2, NULL, vca_kqueue_acct, NULL));
+
+	AZ(pipe(pipes));
+	AZ(pthread_create(&vca_kqueue_thread, NULL, vca_kqueue_main, NULL));
 }
 
 struct acceptor acceptor_kqueue = {




More information about the varnish-commit mailing list