r1620 - trunk/varnish-cache/bin/varnishreplay

cecilihf at projects.linpro.no cecilihf at projects.linpro.no
Tue Jul 3 10:07:09 CEST 2007


Author: cecilihf
Date: 2007-07-03 10:07:09 +0200 (Tue, 03 Jul 2007)
New Revision: 1620

Modified:
   trunk/varnish-cache/bin/varnishreplay/Makefile.am
   trunk/varnish-cache/bin/varnishreplay/varnishreplay.c
Log:
Parallelising varnishreplay. Work in progress.


Modified: trunk/varnish-cache/bin/varnishreplay/Makefile.am
===================================================================
--- trunk/varnish-cache/bin/varnishreplay/Makefile.am	2007-07-02 17:30:03 UTC (rev 1619)
+++ trunk/varnish-cache/bin/varnishreplay/Makefile.am	2007-07-03 08:07:09 UTC (rev 1620)
@@ -14,4 +14,6 @@
 varnishreplay_LDADD = \
 	$(top_builddir)/lib/libvarnish/libvarnish.la \
 	$(top_builddir)/lib/libcompat/libcompat.a \
-	$(top_builddir)/lib/libvarnishapi/libvarnishapi.la
+	$(top_builddir)/lib/libvarnishapi/libvarnishapi.la \
+	${PTHREAD_LIBS}
+

Modified: trunk/varnish-cache/bin/varnishreplay/varnishreplay.c
===================================================================
--- trunk/varnish-cache/bin/varnishreplay/varnishreplay.c	2007-07-02 17:30:03 UTC (rev 1619)
+++ trunk/varnish-cache/bin/varnishreplay/varnishreplay.c	2007-07-03 08:07:09 UTC (rev 1620)
@@ -30,32 +30,63 @@
 
 #include <ctype.h>
 #include <errno.h>
+#include <fcntl.h>
+#include <pthread.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
-#include <fcntl.h>
 
 #include "libvarnish.h"
+#include "queue.h"
 #include "varnishapi.h"
 #include "vss.h"
 
-static struct request {
-	char *df_H;			/* %H, Protocol version */
-	char *df_Host;			/* %{Host}i */
-	char *df_Uq;			/* %U%q, URL path and query string */
-	char *df_m;			/* %m, Request method*/
-	char *df_c;			/* Connection info (keep-alive, close) */
-	int bogus;			/* bogus request */
-} **req;
+static struct thread {
+	pthread_t thread_id;
+	struct mailbox *mbox;
+} **threads;
+	
+struct mailbox {
+	pthread_mutex_t lock;
+	pthread_cond_t has_mail;
+	STAILQ_HEAD(msgq_head, message) messages;
+};
 
-static size_t nreq;
+struct message {
+	enum shmlogtag tag;
+	char *ptr;
+	unsigned len;
+	STAILQ_ENTRY(message) list;
+};
 
+static size_t nthreads;
+
 static struct vss_addr *adr_info;
-static int sock;
-static int reopen;
 static int debug;
 
+static void
+mailbox_put(struct mailbox *mbox, struct message *msg)
+{
+	pthread_mutex_lock(&mbox->lock);
+	STAILQ_INSERT_TAIL(&mbox->messages, msg, list);
+	pthread_cond_signal(&mbox->has_mail);
+	pthread_mutex_unlock(&mbox->lock);
+}
+
+static struct message *
+mailbox_get(struct mailbox *mbox)
+{
+	struct message *msg;
+	
+	pthread_mutex_lock(&mbox->lock);
+	while ((msg = STAILQ_FIRST(&mbox->messages)) == NULL)
+		pthread_cond_wait(&mbox->has_mail, &mbox->lock);
+	STAILQ_REMOVE_HEAD(&mbox->messages, list);
+	pthread_mutex_unlock(&mbox->lock);
+	return msg;
+}
+
 static int
 isprefix(const char *str, const char *prefix, const char *end, const char **next)
 {
@@ -153,7 +184,7 @@
  * A line is terminated by \r\n
  */
 static int
-read_line(char **line)
+read_line(char **line, int sock)
 {
 	char *buf;
 	unsigned nbuf, lbuf;
@@ -191,7 +222,7 @@
  * the number of bytes read.
  */
 static int
-read_block(int length)
+read_block(int length, int sock)
 {
 	char *buf;
 	int n, nbuf;
@@ -214,7 +245,7 @@
 /* Receive the response after sending a request.
  */
 static int
-receive_response(void)
+receive_response(int sock)
 {
 	char *line, *end;
 	const char *next;
@@ -229,7 +260,7 @@
 
 	/* Read header */
 	while (1) {
-		line_len = read_line(&line);
+		line_len = read_line(&line, sock);
 		end = line + line_len;
 
 		if (*line == '\r' && *(line + 1) == '\n') {
@@ -260,7 +291,7 @@
 		/* Fixed body size, read content_length bytes */
 		if (debug)
 			fprintf(stderr, "fixed length\n");
-		n = read_block(content_length);
+		n = read_block(content_length, sock);
 		if (debug) {
 			fprintf(stderr, "size of body: %d\n", (int)content_length);
 			fprintf(stderr, "bytes read: %d\n", n);
@@ -270,22 +301,22 @@
 		if (debug)
 			fprintf(stderr, "chunked encoding\n");
 		while (1) {
-			line_len = read_line(&line);
+			line_len = read_line(&line, sock);
 			end = line + line_len;
 			block_len = strtol(line, &end, 16);
 			if (block_len == 0) {
 				break;
 			}
-			n = read_block(block_len);
+			n = read_block(block_len, sock);
 			if (debug) {
 				fprintf(stderr, "size of body: %d\n", (int)block_len);
 				fprintf(stderr, "bytes read: %d\n", n);
 			}
 			free(line);
-			n = read_line(&line);
+			n = read_line(&line, sock);
 			free(line);
 		}
-		n = read_line(&line);
+		n = read_line(&line, sock);
 		free(line);
 	} else if ((content_length <= 0 && !chunked) || req_failed) {
 		/* No body --> stop reading. */
@@ -302,155 +333,198 @@
 	return close_connection;
 }
 
-
-static int
-gen_traffic(void *priv, enum shmlogtag tag, unsigned fd,
-    unsigned len, unsigned spec, const char *ptr)
+static void *
+pthread_main(void *arg)
 {
+	struct message *msg;
+	struct thread *th = (struct thread*)arg;
+	enum shmlogtag tag;
+	int len;
+	char *ptr;
 	const char *end, *next;
-	FILE *fo;
-	struct request *rp;
+	
+	char *df_H = NULL;			/* %H, Protocol version */
+	char *df_Host = NULL;			/* %{Host}i */
+	char *df_Uq = NULL;			/* %U%q, URL path and query string */
+	char *df_m = NULL;			/* %m, Request method*/
+	char *df_c = NULL;			/* Connection info (keep-alive, close) */
+	int bogus = 0;				/* bogus request */
 
-	end = ptr + len;
+	int sock, reopen = 1;
+	
+	//fprintf(stderr, "thread started\n");
+	
+	do {
+		msg = mailbox_get(th->mbox);
+		tag = msg->tag;
+		len = msg->len;
+		ptr = msg->ptr;
+		end = ptr + len;
+	
+		//fprintf(stderr, "%08x %s(%s)\n", (unsigned int)pthread_self(), VSL_tags[tag], msg->ptr);
+		
+		switch (tag) {
+		case SLT_RxRequest:
+			if (df_m != NULL)
+				bogus = 1;
+			else
+				df_m = trimline(ptr, end);
+			break;
 
-	if (!(spec & VSL_S_CLIENT))
-		return (0);
+		case SLT_RxURL:
+			if (df_Uq != NULL)
+				bogus = 1;
+			else
+				df_Uq = trimline(ptr, end);
+			break;
 
-	if (fd >= nreq) {
-		struct request **newreq = req;
-		size_t newnreq = nreq;
-
-		while (fd >= newnreq)
-			newnreq += newnreq + 1;
-		newreq = realloc(newreq, newnreq * sizeof *newreq);
-		assert(newreq != NULL);
-		memset(newreq + nreq, 0, (newnreq - nreq) * sizeof *newreq);
-		req = newreq;
-		nreq = newnreq;
-	}
-	if (req[fd] == NULL) {
-		req[fd] = calloc(sizeof *req[fd], 1);
-		assert(req[fd] != NULL);
-	}
-	rp = req[fd];
-
-	switch (tag) {
-	case SLT_RxRequest:
-		if (tag == SLT_RxRequest && (spec & VSL_S_BACKEND))
+		case SLT_RxProtocol:
+			if (df_H != NULL)
+				bogus = 1;
+			else
+				df_H = trimline(ptr, end);
 			break;
 
-		if (rp->df_m != NULL)
-			rp->bogus = 1;
-		else
-			rp->df_m = trimline(ptr, end);
-		break;
-
-	case SLT_RxURL:
-		if (tag == SLT_RxURL && (spec & VSL_S_BACKEND))
+		case SLT_RxHeader:
+			if (isprefix(ptr, "host:", end, &next))
+				df_Host = trimline(next, end);
+			if (isprefix(ptr, "connection:", end, &next))
+				df_c = trimline(next, end);
 			break;
 
-		if (rp->df_Uq != NULL)
-			rp->bogus = 1;
-		else
-			rp->df_Uq = trimline(ptr, end);
-		break;
-
-	case SLT_RxProtocol:
-		if (tag == SLT_RxProtocol && (spec & VSL_S_BACKEND))
+		default:
 			break;
+		}
 
-		if (rp->df_H != NULL)
-			rp->bogus = 1;
-		else
-			rp->df_H = trimline(ptr, end);
-		break;
+		if (tag != SLT_ReqEnd)
+			continue;
+			
+		//fprintf(stderr, "bogus: %d %s\n", bogus, df_m);
 
-	case SLT_RxHeader:
-		if (isprefix(ptr, "host:", end, &next))
-			rp->df_Host = trimline(next, end);
-		if (isprefix(ptr, "connection:", end, &next))
-			rp->df_c = trimline(next, end);
-		break;
+		if (!bogus) {
+			/* If the method is supported (GET or HEAD), send the request out
+			 * on the socket. If the socket needs reopening, reopen it first.
+			 * When the request is sent, call the function for receiving
+			 * the answer.
+			 */
+			if (!(strncmp(df_m, "GET", 3) && strncmp(df_m, "HEAD", 4))) {
+				if (reopen)
+					sock = VSS_connect(adr_info);
+				reopen = 0;
 
-	default:
-		break;
-	}
+				if (debug) {
+					fprintf(stderr, "%s ", df_m);
+					fprintf(stderr, "%s ", df_Uq);
+					fprintf(stderr, "%s ", df_H);
+					fprintf(stderr, "\n");
+					fprintf(stderr, "Host: ");
+				}
+				write(sock, df_m, strlen(df_m));
+				write(sock, " ", 1);
+				write(sock, df_Uq, strlen(df_Uq));
+				write(sock, " ", 1);
+				write(sock, df_H, strlen(df_H));
+				write(sock, " ", 1);
+				write(sock, "\r\n", 2);
 
-	if ((spec & VSL_S_CLIENT) && tag != SLT_ReqEnd)
-		return (0);
+				if (strncmp(df_H, "HTTP/1.0", 8))
+					reopen = 1;
 
-	if (!rp->bogus) {
-		fo = priv;
-
-		/* If the method is supported (GET or HEAD), send the request out
-		 * on the socket. If the socket needs reopening, reopen it first.
-		 * When the request is sent, call the function for receiving
-		 * the answer.
-		 */
-		if (!(strncmp(rp->df_m, "GET", 3) && strncmp(rp->df_m, "HEAD", 4))) {
-			if (reopen)
-				sock = VSS_connect(adr_info);
-			reopen = 0;
-
-			if (debug) {
-				fprintf(fo, "%s ", rp->df_m);
-				fprintf(fo, "%s ", rp->df_Uq);
-				fprintf(fo, "%s ", rp->df_H);
-				fprintf(fo, "\n");
-				fprintf(fo, "Host: ");
-			}
-			write(sock, rp->df_m, strlen(rp->df_m));
-			write(sock, " ", 1);
-			write(sock, rp->df_Uq, strlen(rp->df_Uq));
-			write(sock, " ", 1);
-			write(sock, rp->df_H, strlen(rp->df_H));
-			write(sock, " ", 1);
-			write(sock, "\r\n", 2);
-
-			if (strncmp(rp->df_H, "HTTP/1.0", 8))
-				reopen = 1;
-
-			write(sock, "Host: ", 6);
-			if (rp->df_Host) {
+				write(sock, "Host: ", 6);
+				if (df_Host) {
+					if (debug)
+						fprintf(stderr, df_Host);
+					write(sock, df_Host, strlen(df_Host));
+				}
 				if (debug)
-					fprintf(fo, rp->df_Host);
-				write(sock, rp->df_Host, strlen(rp->df_Host));
-			}
-			if (debug)
-				fprintf(fo, "\n");
-			write(sock, "\r\n", 2);
-			if (rp->df_c) {
+					fprintf(stderr, "\n");
+				write(sock, "\r\n", 2);
+				if (df_c) {
+					if (debug)
+						fprintf(stderr, "Connection: %s\n", df_c);
+					write(sock, "Connection: ", 12);
+					write(sock, df_c, strlen(df_c));
+					write(sock, "\r\n", 2);
+					if (isequal(df_c, "keep-alive", df_c + strlen(df_c)))
+						reopen = 0;
+				}
 				if (debug)
-					fprintf(fo, "Connection: %s\n", rp->df_c);
-				write(sock, "Connection: ", 12);
-				write(sock, rp->df_c, strlen(rp->df_c));
+					fprintf(stderr, "\n");
 				write(sock, "\r\n", 2);
-				if (isequal(rp->df_c, "keep-alive", rp->df_c + strlen(rp->df_c)))
-					reopen = 0;
+				if (!reopen)
+					reopen = receive_response(sock);
+				if (reopen)
+					close(sock);
 			}
-			if (debug)
-				fprintf(fo, "\n");
-			write(sock, "\r\n", 2);
-			if (!reopen)
-				reopen = receive_response();
-			if (reopen)
-				close(sock);
 		}
-	}
 
-	/* clean up */
+		/* clean up */
 #define freez(x) do { if (x) free(x); x = NULL; } while (0);
-	freez(rp->df_H);
-	freez(rp->df_Host);
-	freez(rp->df_Uq);
-	freez(rp->df_m);
-	freez(rp->df_c);
+		freez(df_H);
+		freez(df_Host);
+		freez(df_Uq);
+		freez(df_m);
+		freez(df_c);
 #undef freez
-	rp->bogus = 0;
+		bogus = 0;
+	} while (1);		
 
 	return (0);
 }
 
+
+static int
+gen_traffic(void *priv, enum shmlogtag tag, unsigned fd,
+    unsigned len, unsigned spec, const char *ptr)
+{
+	const char *end;
+	struct message *msg;
+	int err;
+	
+	(void)priv;
+
+	end = ptr + len;
+
+	if (!(spec & VSL_S_CLIENT))
+		return (0);
+
+	//fprintf(stderr, "gen_traffic\n");
+
+	if (fd >= nthreads) {
+		struct thread **newthreads = threads;
+		size_t newnthreads = nthreads;
+
+		while (fd >= newnthreads)
+			newnthreads += newnthreads + 1;
+		newthreads = realloc(newthreads, newnthreads * sizeof *newthreads);
+		assert(newthreads != NULL);
+		memset(newthreads + nthreads, 0, (newnthreads - nthreads) * sizeof *newthreads);
+		threads = newthreads;
+		nthreads = newnthreads;
+	}
+	if (threads[fd] == NULL) {
+		threads[fd] = malloc(sizeof *threads[fd]);
+		assert(threads[fd] != NULL);
+		threads[fd]->mbox = malloc(sizeof (struct mailbox));
+		STAILQ_INIT(&threads[fd]->mbox->messages);
+		pthread_mutex_init(&threads[fd]->mbox->lock, NULL);
+		pthread_cond_init(&threads[fd]->mbox->has_mail, NULL);
+		err = pthread_create(&threads[fd]->thread_id, NULL, pthread_main, threads[fd]);
+		if (err)
+			fprintf(stderr, "thread creation failed\n");
+		fprintf(stderr, "Ok, thread %08x created... %d\n", (unsigned int)threads[fd]->thread_id, err);
+	}
+	msg = malloc(sizeof (struct message));
+	msg->tag = tag;
+	msg->ptr = strdup(ptr);
+	msg->len = len;
+	mailbox_put(threads[fd]->mbox, msg);
+	//fprintf(stderr, "message put\n");
+	
+	return 0;
+}
+
+
 /* This function is for testing only, and only sends
  * the raw data from the file to the address.
  * The receive function is called for each blank line.
@@ -461,6 +535,8 @@
 	int fd = open(file, O_RDONLY);
 	char buf[2];
 	char last = ' ';
+	int sock, reopen = 1;
+	
 	adr_info = init_connection(address);
 	sock = VSS_connect(adr_info);
 	while (read(fd, buf, 1)) {
@@ -468,7 +544,7 @@
 		fprintf(stderr, "%s", buf);
 		if (*buf == '\n' && last == '\n'){
 			fprintf(stderr, "receive\n");
-			reopen = receive_response();
+			reopen = receive_response(sock);
 		}
 		last = *buf;
 	}
@@ -491,15 +567,14 @@
 {
 	int c;
 	struct VSL_data *vd;
-	const char *ofn = NULL;
 	const char *address = NULL;
-	FILE *of;
 
 	char *test_file = NULL;
 
 	vd = VSL_New();
 	debug = 0;
 
+	VSL_Arg(vd, 'c', NULL);
 	while ((c = getopt(argc, argv, "a:Dr:t:")) != -1) {
 		switch (c) {
 		case 'a':
@@ -534,18 +609,10 @@
 	if (VSL_OpenLog(vd, NULL))
 		exit(1);
 
-	ofn = "stdout";
-	of = stdout;
-
 	adr_info = init_connection(address);
-	reopen = 1;
 
-	while (VSL_Dispatch(vd, gen_traffic, of) == 0) {
-		if (fflush(of) != 0) {
-			perror(ofn);
-			exit(1);
-		}
-	}
+	while (VSL_Dispatch(vd, gen_traffic, NULL) == 0)
+		/* nothing */ ;
 
 	exit(0);
 }




More information about the varnish-commit mailing list