r1623 - trunk/varnish-cache/bin/varnishreplay

des at projects.linpro.no des at projects.linpro.no
Tue Jul 3 11:09:55 CEST 2007


Author: des
Date: 2007-07-03 11:09:55 +0200 (Tue, 03 Jul 2007)
New Revision: 1623

Modified:
   trunk/varnish-cache/bin/varnishreplay/varnishreplay.c
Log:
Reorganize the code a little, and add code to wait for all threads to finish
processing pending messages before we exit.

Note that VSL_Dispatch() reads log data as fast as it can, so when
working from a log file, varnishreplay will usually load the entire file
into memory within the first few seconds.


Modified: trunk/varnish-cache/bin/varnishreplay/varnishreplay.c
===================================================================
--- trunk/varnish-cache/bin/varnishreplay/varnishreplay.c	2007-07-03 08:50:34 UTC (rev 1622)
+++ trunk/varnish-cache/bin/varnishreplay/varnishreplay.c	2007-07-03 09:09:55 UTC (rev 1623)
@@ -32,6 +32,7 @@
 #include <errno.h>
 #include <fcntl.h>
 #include <pthread.h>
+#include <stdarg.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
@@ -42,32 +43,60 @@
 #include "varnishapi.h"
 #include "vss.h"
 
-static struct thread {
-	pthread_t thread_id;
-	struct mailbox *mbox;
-} **threads;
-	
-struct mailbox {
-	pthread_mutex_t lock;
-	pthread_cond_t has_mail;
-	STAILQ_HEAD(msgq_head, message) messages;
-};
+#ifndef HAVE_STRNDUP
+#include "compat/strndup.h"
+#endif
 
+#define freez(x) do { if (x) free(x); x = NULL; } while (0);
+
+static struct vss_addr *addr_info;
+static int debug;
+
+/*
+ * mailbox toolkit
+ */
+
 struct message {
 	enum shmlogtag tag;
+	size_t len;
 	char *ptr;
-	unsigned len;
 	STAILQ_ENTRY(message) list;
 };
 
-static size_t nthreads;
+struct mailbox {
+	pthread_mutex_t lock;
+	pthread_cond_t has_mail;
+	int open;
+	STAILQ_HEAD(msgq_head, message) messages;
+};
 
-static struct vss_addr *adr_info;
-static int debug;
+static void
+mailbox_create(struct mailbox *mbox)
+{
 
+	STAILQ_INIT(&mbox->messages);
+	pthread_mutex_init(&mbox->lock, NULL);
+	pthread_cond_init(&mbox->has_mail, NULL);
+	mbox->open = 1;
+}
+
 static void
+mailbox_destroy(struct mailbox *mbox)
+{
+	struct message *msg;
+
+	while ((msg = STAILQ_FIRST(&mbox->messages))) {
+		STAILQ_REMOVE_HEAD(&mbox->messages, list);
+		free(msg);
+	}
+	pthread_cond_destroy(&mbox->has_mail);
+	pthread_mutex_destroy(&mbox->lock);
+}
+
+static void
 mailbox_put(struct mailbox *mbox, struct message *msg)
 {
+
 	pthread_mutex_lock(&mbox->lock);
 	STAILQ_INSERT_TAIL(&mbox->messages, msg, list);
 	pthread_cond_signal(&mbox->has_mail);
@@ -78,15 +107,113 @@
 mailbox_get(struct mailbox *mbox)
 {
 	struct message *msg;
-	
+
 	pthread_mutex_lock(&mbox->lock);
-	while ((msg = STAILQ_FIRST(&mbox->messages)) == NULL)
+	while ((msg = STAILQ_FIRST(&mbox->messages)) == NULL && mbox->open)
 		pthread_cond_wait(&mbox->has_mail, &mbox->lock);
-	STAILQ_REMOVE_HEAD(&mbox->messages, list);
+	if (msg != NULL)
+		STAILQ_REMOVE_HEAD(&mbox->messages, list);
 	pthread_mutex_unlock(&mbox->lock);
 	return msg;
 }
 
+static void
+mailbox_close(struct mailbox *mbox)
+{
+	pthread_mutex_lock(&mbox->lock);
+	mbox->open = 0;
+	pthread_cond_signal(&mbox->has_mail);
+	pthread_mutex_unlock(&mbox->lock);
+}
+
+/*
+ * thread toolkit
+ */
+
+struct thread {
+	pthread_t thread_id;
+	struct mailbox mbox;
+};
+
+static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static void
+thread_log(int lvl, const char *fmt, ...)
+{
+	va_list ap;
+
+	if (lvl > debug)
+		return;
+	pthread_mutex_lock(&log_mutex);
+	fprintf(stderr, "%08x ", (unsigned int)pthread_self());
+	va_start(ap, fmt);
+	vfprintf(stderr, fmt, ap);
+	va_end(ap);
+	pthread_mutex_unlock(&log_mutex);
+}
+
+static struct thread **threads;
+static size_t nthreads;
+
+static struct thread *
+thread_get(int fd, void *(*thread_main)(void *))
+{
+
+	assert(fd != 0);
+	if (fd >= nthreads) {
+		struct thread **newthreads = threads;
+		size_t newnthreads = nthreads;
+
+		while (fd >= newnthreads)
+			newnthreads += newnthreads + 1;
+		newthreads = realloc(newthreads, newnthreads * sizeof *newthreads);
+		assert(newthreads != NULL);
+		memset(newthreads + nthreads, 0,
+		    (newnthreads - nthreads) * sizeof *newthreads);
+		threads = newthreads;
+		nthreads = newnthreads;
+	}
+	if (threads[fd] == NULL) {
+		threads[fd] = malloc(sizeof *threads[fd]);
+		assert(threads[fd] != NULL);
+		mailbox_create(&threads[fd]->mbox);
+		if (pthread_create(&threads[fd]->thread_id, NULL,
+		    thread_main, threads[fd]) != 0) {
+			thread_log(0, "thread creation failed\n");
+			mailbox_destroy(&threads[fd]->mbox);
+			freez(threads[fd]);
+		}
+		thread_log(1, "thread %08x started\n",
+		    (unsigned int)threads[fd]->thread_id);
+	}
+	return (threads[fd]);
+}
+
+static void
+thread_close(int fd)
+{
+
+	assert(fd < nthreads);
+	if (fd == 0) {
+		for (fd = 1; fd < nthreads; ++fd)
+			thread_close(fd);
+		return;
+	}
+
+	if (threads[fd] == NULL)
+		return;
+	mailbox_close(&threads[fd]->mbox);
+	pthread_join(threads[fd]->thread_id, NULL);
+	thread_log(1, "thread %08x stopped\n",
+	    (unsigned int)threads[fd]->thread_id);
+	mailbox_destroy(&threads[fd]->mbox);
+	freez(threads[fd]);
+}
+
+/*
+ * ...
+ */
+
 static int
 isprefix(const char *str, const char *prefix, const char *end, const char **next)
 {
@@ -159,14 +286,14 @@
 	int i, n;
 
 	if (VSS_parse(address, &addr, &port) != 0) {
-		fprintf(stderr, "Invalid address\n");
+		thread_log(0, "Invalid address\n");
 		exit(2);
 	}
 	n = VSS_resolve(addr, port, &ta);
 	free(addr);
 	free(port);
 	if (n == 0) {
-		fprintf(stderr, "Could not connect to server\n");
+		thread_log(0, "Could not connect to server\n");
 		exit(2);
 	}
 	for (i = 1; i < n; ++i) {
@@ -200,12 +327,11 @@
 			buf = realloc(buf, lbuf);
 			XXXAN(buf);
 		}
-		//fprintf(stderr, "start reading\n");
 		i = read(sock, buf + nbuf, 1);
 		if (i <= 0) {
-			perror("error in reading\n");
+			thread_log(0, "read(): %s\n", strerror(errno));
 			free(buf);
-			exit(1);
+			return (-1);
 		}
 		nbuf += i;
 		if (nbuf >= 2 && buf[nbuf-2] == '\r' && buf[nbuf-1] == '\n')
@@ -233,8 +359,8 @@
 		n = read(sock, buf + nbuf,
 		    (2048 < length - nbuf ? 2048 : length - nbuf));
 		if (n <= 0) {
-			perror("failed reading the block\n");
-			break;
+			thread_log(0, "failed reading the block\n");
+			return (-1);
 		}
 		nbuf += n;
 	}
@@ -282,24 +408,20 @@
 		free(line);
 	}
 
-	if (debug)
-		fprintf(stderr, "status: %d\n", status);
+	thread_log(1, "status: %d\n", status);
 
 
 	/* Read body */
 	if (content_length > 0 && !chunked) {
 		/* Fixed body size, read content_length bytes */
-		if (debug)
-			fprintf(stderr, "fixed length\n");
-		n = read_block(content_length, sock);
-		if (debug) {
-			fprintf(stderr, "size of body: %d\n", (int)content_length);
-			fprintf(stderr, "bytes read: %d\n", n);
-		}
+		thread_log(1, "fixed length\n");
+		thread_log(1, "size of body: %ld\n", content_length);
+		if ((n = read_block(content_length, sock)) < 0)
+			return (1);
+		thread_log(1, "bytes read: %d\n", n);
 	} else if (chunked) {
 		/* Chunked encoding, read size and bytes until no more */
-		if (debug)
-			fprintf(stderr, "chunked encoding\n");
+		thread_log(1, "chunked encoding\n");
 		while (1) {
 			line_len = read_line(&line, sock);
 			end = line + line_len;
@@ -308,10 +430,8 @@
 				break;
 			}
 			n = read_block(block_len, sock);
-			if (debug) {
-				fprintf(stderr, "size of body: %d\n", (int)block_len);
-				fprintf(stderr, "bytes read: %d\n", n);
-			}
+			thread_log(1, "size of body: %d\n", (int)block_len);
+			thread_log(1, "bytes read: %d\n", n);
 			free(line);
 			n = read_line(&line, sock);
 			free(line);
@@ -320,29 +440,27 @@
 		free(line);
 	} else if ((content_length <= 0 && !chunked) || req_failed) {
 		/* No body --> stop reading. */
-		if (debug)
-			fprintf(stderr, "no body\n");
+		thread_log(1, "no body\n");
+		return (1);
 	} else {
 		/* Unhandled case. */
-		fprintf(stderr, "An error occured\n");
-		exit(1);
+		thread_log(0, "An error occured\n");
+		return (1);
 	}
-	if (debug)
-		fprintf(stderr, "\n");
 
 	return close_connection;
 }
 
 static void *
-pthread_main(void *arg)
+replay_thread(void *arg)
 {
+	struct thread *thr = arg;
 	struct message *msg;
-	struct thread *th = (struct thread*)arg;
 	enum shmlogtag tag;
-	int len;
+	size_t len;
 	char *ptr;
 	const char *end, *next;
-	
+
 	char *df_H = NULL;			/* %H, Protocol version */
 	char *df_Host = NULL;			/* %{Host}i */
 	char *df_Uq = NULL;			/* %U%q, URL path and query string */
@@ -351,18 +469,15 @@
 	int bogus = 0;				/* bogus request */
 
 	int sock, reopen = 1;
-	
-	//fprintf(stderr, "thread started\n");
-	
-	do {
-		msg = mailbox_get(th->mbox);
+
+	while ((msg = mailbox_get(&thr->mbox)) != NULL) {
 		tag = msg->tag;
 		len = msg->len;
 		ptr = msg->ptr;
 		end = ptr + len;
-	
-		//fprintf(stderr, "%08x %s(%s)\n", (unsigned int)pthread_self(), VSL_tags[tag], msg->ptr);
-		
+
+		thread_log(2, "%s(%s)\n", VSL_tags[tag], msg->ptr);
+
 		switch (tag) {
 		case SLT_RxRequest:
 			if (df_m != NULL)
@@ -398,10 +513,10 @@
 
 		if (tag != SLT_ReqEnd)
 			continue;
-			
-		//fprintf(stderr, "bogus: %d %s\n", bogus, df_m);
 
-		if (!bogus) {
+		if (bogus) {
+			thread_log(1, "bogus\n");
+		} else {
 			/* If the method is supported (GET or HEAD), send the request out
 			 * on the socket. If the socket needs reopening, reopen it first.
 			 * When the request is sent, call the function for receiving
@@ -409,16 +524,11 @@
 			 */
 			if (!(strncmp(df_m, "GET", 3) && strncmp(df_m, "HEAD", 4))) {
 				if (reopen)
-					sock = VSS_connect(adr_info);
+					sock = VSS_connect(addr_info);
 				reopen = 0;
 
-				if (debug) {
-					fprintf(stderr, "%s ", df_m);
-					fprintf(stderr, "%s ", df_Uq);
-					fprintf(stderr, "%s ", df_H);
-					fprintf(stderr, "\n");
-					fprintf(stderr, "Host: ");
-				}
+				thread_log(1, "%s %s %s\n", df_m, df_Uq, df_H);
+
 				write(sock, df_m, strlen(df_m));
 				write(sock, " ", 1);
 				write(sock, df_Uq, strlen(df_Uq));
@@ -432,16 +542,12 @@
 
 				write(sock, "Host: ", 6);
 				if (df_Host) {
-					if (debug)
-						fprintf(stderr, df_Host);
+					thread_log(1, "Host: %s\n", df_Host);
 					write(sock, df_Host, strlen(df_Host));
 				}
-				if (debug)
-					fprintf(stderr, "\n");
 				write(sock, "\r\n", 2);
 				if (df_c) {
-					if (debug)
-						fprintf(stderr, "Connection: %s\n", df_c);
+					thread_log(1, "Connection: %s\n", df_c);
 					write(sock, "Connection: ", 12);
 					write(sock, df_c, strlen(df_c));
 					write(sock, "\r\n", 2);
@@ -449,7 +555,7 @@
 						reopen = 0;
 				}
 				if (debug)
-					fprintf(stderr, "\n");
+					thread_log(0, "\n");
 				write(sock, "\r\n", 2);
 				if (!reopen)
 					reopen = receive_response(sock);
@@ -459,16 +565,15 @@
 		}
 
 		/* clean up */
-#define freez(x) do { if (x) free(x); x = NULL; } while (0);
+		freez(msg->ptr);
+		freez(msg);
 		freez(df_H);
 		freez(df_Host);
 		freez(df_Uq);
 		freez(df_m);
 		freez(df_c);
-#undef freez
 		bogus = 0;
-	} while (1);		
-
+	}
 	return (0);
 }
 
@@ -477,50 +582,25 @@
 gen_traffic(void *priv, enum shmlogtag tag, unsigned fd,
     unsigned len, unsigned spec, const char *ptr)
 {
+	struct thread *thr;
 	const char *end;
 	struct message *msg;
-	int err;
-	
+
 	(void)priv;
 
 	end = ptr + len;
 
-	if (!(spec & VSL_S_CLIENT))
+	if (fd == 0 || !(spec & VSL_S_CLIENT))
 		return (0);
 
-	//fprintf(stderr, "gen_traffic\n");
-
-	if (fd >= nthreads) {
-		struct thread **newthreads = threads;
-		size_t newnthreads = nthreads;
-
-		while (fd >= newnthreads)
-			newnthreads += newnthreads + 1;
-		newthreads = realloc(newthreads, newnthreads * sizeof *newthreads);
-		assert(newthreads != NULL);
-		memset(newthreads + nthreads, 0, (newnthreads - nthreads) * sizeof *newthreads);
-		threads = newthreads;
-		nthreads = newnthreads;
-	}
-	if (threads[fd] == NULL) {
-		threads[fd] = malloc(sizeof *threads[fd]);
-		assert(threads[fd] != NULL);
-		threads[fd]->mbox = malloc(sizeof (struct mailbox));
-		STAILQ_INIT(&threads[fd]->mbox->messages);
-		pthread_mutex_init(&threads[fd]->mbox->lock, NULL);
-		pthread_cond_init(&threads[fd]->mbox->has_mail, NULL);
-		err = pthread_create(&threads[fd]->thread_id, NULL, pthread_main, threads[fd]);
-		if (err)
-			fprintf(stderr, "thread creation failed\n");
-		fprintf(stderr, "Ok, thread %08x created... %d\n", (unsigned int)threads[fd]->thread_id, err);
-	}
+	thread_log(2, "%d %s\n", fd, VSL_tags[tag]);
+	thr = thread_get(fd, replay_thread);
 	msg = malloc(sizeof (struct message));
 	msg->tag = tag;
-	msg->ptr = strdup(ptr);
 	msg->len = len;
-	mailbox_put(threads[fd]->mbox, msg);
-	//fprintf(stderr, "message put\n");
-	
+	msg->ptr = strndup(ptr, len);
+	mailbox_put(&thr->mbox, msg);
+
 	return 0;
 }
 
@@ -536,14 +616,14 @@
 	char buf[2];
 	char last = ' ';
 	int sock, reopen = 1;
-	
-	adr_info = init_connection(address);
-	sock = VSS_connect(adr_info);
+
+	addr_info = init_connection(address);
+	sock = VSS_connect(addr_info);
 	while (read(fd, buf, 1)) {
 		write(sock, buf, 1);
-		fprintf(stderr, "%s", buf);
+		thread_log(0, "%s", buf);
 		if (*buf == '\n' && last == '\n'){
-			fprintf(stderr, "receive\n");
+			thread_log(0, "receive\n");
 			reopen = receive_response(sock);
 		}
 		last = *buf;
@@ -581,7 +661,7 @@
 			address = optarg;
 			break;
 		case 'D':
-			debug = 1;
+			++debug;
 			break;
 		case 't':
 			/* This option is for testing only. The test file must contain
@@ -609,10 +689,10 @@
 	if (VSL_OpenLog(vd, NULL))
 		exit(1);
 
-	adr_info = init_connection(address);
+	addr_info = init_connection(address);
 
 	while (VSL_Dispatch(vd, gen_traffic, NULL) == 0)
 		/* nothing */ ;
-
+	thread_close(0);
 	exit(0);
 }




More information about the varnish-commit mailing list