r2864 - trunk/varnish-cache/bin/varnishreplay

des at projects.linpro.no
Mon Jun 30 04:11:54 CEST 2008


Author: des
Date: 2008-06-30 04:11:54 +0200 (Mon, 30 Jun 2008)
New Revision: 2864

Modified:
   trunk/varnish-cache/bin/varnishreplay/varnishreplay.c
Log:
Old patch: greatly improve performance and memory usage by using per-thread
pre-allocated buffers instead of malloc().


Modified: trunk/varnish-cache/bin/varnishreplay/varnishreplay.c
===================================================================
--- trunk/varnish-cache/bin/varnishreplay/varnishreplay.c	2008-06-30 02:03:58 UTC (rev 2863)
+++ trunk/varnish-cache/bin/varnishreplay/varnishreplay.c	2008-06-30 02:11:54 UTC (rev 2864)
@@ -58,6 +58,37 @@
 static struct vss_addr *addr_info;
 static int debug;
 
+static int
+isprefix(const char *str, const char *prefix, const char **next)
+{
+	/* whole prefix must match (ignoring case), then ' ' or end of str */
+	while (*str && *prefix &&
+	    tolower((unsigned char)*str) == tolower((unsigned char)*prefix))
+		++str, ++prefix;
+	if (*prefix || (*str && *str != ' '))
+		return (0);
+	if (next) {
+		while (*str && *str == ' ')
+			++str;
+		*next = str;
+	}
+	return (1);
+}
+
+#if 0
+static int
+isequal(const char *str, const char *reference, const char *end)
+{
+
+	while (str < end && *str && *reference &&
+	    tolower((unsigned char)*str) == tolower((unsigned char)*reference))
+		++str, ++reference;
+	if (str != end || *reference)
+		return (0);
+	return (1);
+}
+#endif
+
 /*
  * mailbox toolkit
  */
@@ -69,6 +100,8 @@
 	VSTAILQ_ENTRY(message) list;
 };
 
+#define MAX_MAILBOX_SIZE 30	/* NOTE(review): not referenced by any line of this patch */
+
 struct mailbox {
 	pthread_mutex_t lock;
 	pthread_cond_t has_mail;
@@ -136,11 +169,6 @@
  * thread toolkit
  */
 
-struct thread {
-	pthread_t thread_id;
-	struct mailbox mbox;
-};
-
 static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
 
 static void
@@ -161,9 +189,56 @@
 	pthread_mutex_unlock(&log_mutex);
 }
 
+struct thread {
+	pthread_t thread_id;
+	struct mailbox mbox;
+
+	int sock;		/* backend connection, -1 when closed */
+
+	int fd;			/* original fd from logs */
+
+	char *method;		/* Request method */
+	char *proto;		/* Protocol version */
+	char *url;		/* URL and query string */
+	const char *conn;	/* Connection info (keep-alive, close) */
+	char *hdr[64];		/* Headers (more than 64 marks request bogus) */
+	int nhdr;		/* Number of headers */
+	int bogus;		/* bogus request */
+	/* fixed per-thread buffers used instead of malloc(); reset by thread_clear() */
+	char arena[4096];	/* backing store for thread_alloc() */
+	int top;		/* bytes of arena handed out so far */
+	char line[2048];	/* current line from read_line(), NUL-terminated */
+	char temp[2048];	/* read_block() scratch; contents are discarded */
+};
+
 static struct thread **threads;
 static size_t nthreads;
 
+/*
+ * Reset per-request state (arena, line/temp buffers, header list) and close sock.
+ */
+static void
+thread_clear(struct thread *thr)
+{
+	/* NOTE(review): replay_thread() calls this after every request, so sock is never reused */
+	thr->method = thr->proto = thr->url = NULL;
+	thr->conn = NULL;
+	memset(&thr->hdr, 0, sizeof thr->hdr);
+	thr->nhdr = 0;
+	thr->bogus = 0;
+	memset(&thr->arena, 0, sizeof thr->arena);
+	thr->top = 0;
+	memset(&thr->line, 0, sizeof thr->line);
+	memset(&thr->temp, 0, sizeof thr->temp);
+	if (thr->sock != -1)
+		close(thr->sock);
+	thr->sock = -1;
+}
+
+#define THREAD_FAIL ((struct thread *)-1)	/* threads[] slot: pthread_create() failed */
+
+static pthread_attr_t thread_attr;	/* pthread_create() attributes (16 KB stack) */
+
 static struct thread *
 thread_get(int fd, void *(*thread_main)(void *))
 {
@@ -185,16 +260,23 @@
 	if (threads[fd] == NULL) {
 		threads[fd] = malloc(sizeof *threads[fd]);
 		assert(threads[fd] != NULL);
+		threads[fd]->sock = -1;
+		thread_clear(threads[fd]);
 		mailbox_create(&threads[fd]->mbox);
-		if (pthread_create(&threads[fd]->thread_id, NULL,
+		if (pthread_create(&threads[fd]->thread_id, &thread_attr,
 		    thread_main, threads[fd]) != 0) {
 			thread_log(0, errno, "pthread_create()");
 			mailbox_destroy(&threads[fd]->mbox);
 			freez(threads[fd]);
+			threads[fd] = THREAD_FAIL;
+		} else {
+			threads[fd]->fd = fd;
+			thread_log(0, 0, "thread %p:%d started",
+			    (void *)threads[fd]->thread_id, fd);
 		}
-		thread_log(0, 0, "thread %p started",
-		    (void *)threads[fd]->thread_id);
 	}
+	if (threads[fd] == THREAD_FAIL)
+		return (NULL);
 	return (threads[fd]);
 }
 
@@ -202,74 +284,56 @@
 thread_close(int fd)
 {
 
-	assert(fd == 0 || fd < nthreads);
-	if (fd == 0) {
-		for (fd = 1; fd < nthreads; ++fd)
+	if (fd == -1) {
+		for (fd = 0; fd < nthreads; ++fd)
 			thread_close(fd);
 		return;
 	}
 
+	assert(fd < nthreads);
+
-	if (threads[fd] == NULL)
+	if (threads[fd] == NULL || threads[fd] == THREAD_FAIL)	/* no thread to join */
 		return;
 	mailbox_close(&threads[fd]->mbox);
 	pthread_join(threads[fd]->thread_id, NULL);
 	thread_log(0, 0, "thread %p stopped",
 	    (void *)threads[fd]->thread_id);
+	thread_clear(threads[fd]);
 	mailbox_destroy(&threads[fd]->mbox);
 	freez(threads[fd]);
 }
 
 /*
- * ...
+ * Bump-allocate len bytes from thr->arena; returns NULL if they do not fit.
  */
-
-static int
-isprefix(const char *str, const char *prefix, const char *end, const char **next)
+static void *
+thread_alloc(struct thread *thr, size_t len)
 {
+	void *ptr;
 
-	while (str < end && *str && *prefix &&
-	    tolower((int)*str) == tolower((int)*prefix))
-		++str, ++prefix;
-	if (*str && *str != ' ')
-		return (0);
-	if (next) {
-		while (str < end && *str && *str == ' ')
-			++str;
-		*next = str;
-	}
-	return (1);
+	if (sizeof thr->arena - thr->top < len)
+		return (NULL);	/* NOTE(review): RxHeader stores this in hdr[] unchecked; strlen() later */
+	ptr = thr->arena + thr->top;
+	thr->top += len;	/* released only when thread_clear() resets top */
+	return (ptr);
 }
 
-#if 0
-static int
-isequal(const char *str, const char *reference, const char *end)
-{
-
-	while (str < end && *str && *reference &&
-	    tolower((int)*str) == tolower((int)*reference))
-		++str, ++reference;
-	if (str != end || *reference)
-		return (0);
-	return (1);
-}
-#endif
-
 /*
  * Returns a copy of the entire string with leading and trailing spaces
  * trimmed.
  */
 static char *
-trimline(const char *str, const char *end)
+trimline(struct thread *thr, const char *str)
 {
 	size_t len;
 	char *p;
 
 	/* skip leading space */
-	while (str < end && *str && *str == ' ')
+	while (*str && *str == ' ')
 		++str;
 
 	/* seek to end of string */
-	for (len = 0; &str[len] < end && str[len]; ++len)
+	for (len = 0; str[len]; ++len)
 		 /* nothing */ ;
 
 	/* trim trailing space */
@@ -277,87 +341,45 @@
 		--len;
 
 	/* copy and return */
-	p = malloc(len + 1);
-	assert(p != NULL);
+	if ((p = thread_alloc(thr, len + 1)) == NULL)
+		return (NULL);
 	memcpy(p, str, len);
 	p[len] = '\0';
 	return (p);
 }
 
-/* Initiate a connection to <address> by resolving the
- * hostname and returning a struct with necessary
- * connection info.
- */
-static struct vss_addr *
-init_connection(const char *address)
-{
-	struct vss_addr **ta;
-	struct vss_addr *tap;
-	char *addr, *port;
-	int i, n;
-
-	if (VSS_parse(address, &addr, &port) != 0) {
-		thread_log(0, 0, "Invalid address");
-		exit(2);
-	}
-	n = VSS_resolve(addr, port, &ta);
-	free(addr);
-	free(port);
-	if (n == 0) {
-		thread_log(0, 0, "Could not connect to server");
-		exit(2);
-	}
-	for (i = 1; i < n; ++i) {
-		free(ta[i]);
-		ta[i] = NULL;
-	}
-	tap = ta[0];
-	free(ta);
-
-	return (tap);
-}
-
 /* Read a line from the socket and return the number of bytes read.
- * After returning, line will point to the read bytes in memory.
- * A line is terminated by \r\n
+ * The line is left NUL-terminated in thr->line; a trailing \r\n is
+ * stripped.  Stops at \r\n or EOF; returns -1 on read error or overflow.
  */
 static int
-read_line(char **line, int sock)
+read_line(struct thread *thr)
 {
-	char *buf;
-	unsigned nbuf, lbuf;
-	int i;
+	int i, len;
 
-	lbuf = 4096;
-	buf = malloc(lbuf);
-	XXXAN(buf);
-	nbuf = 0;
+	len = 0;
 	while (1) {
-		if (nbuf + 2 >= lbuf) {
-			lbuf += lbuf;
-			buf = realloc(buf, lbuf);
-			XXXAN(buf);
+		if (len + 2 > sizeof thr->line) {	/* keep room for next byte + NUL */
+			thread_log(0, 0, "overflow");
+			return (-1);
 		}
-		i = read(sock, buf + nbuf, 1);
+		i = read(thr->sock, thr->line + len, 1);
 		if (i < 0) {
 			thread_log(0, errno, "read(%d, %p, %d)",
-			    sock, buf + nbuf, 1);
-			free(buf);
+			    thr->sock, thr->line + len, 1);
 			return (-1);
 		}
-		if (i == 0) {
-			buf[nbuf] = '\0';
+		if (i == 0)
 			break;
-		}
-		nbuf += i;
-		if (nbuf >= 2 && buf[nbuf-2] == '\r' && buf[nbuf-1] == '\n') {
-			buf[nbuf-2] = '\0';
+		len += i;
+		if (len >= 2 && thr->line[len - 2] == '\r' &&
+		    thr->line[len - 1] == '\n') {
+			len -= 2;
 			break;
 		}
-
 	}
-	*line = buf;
-	return (nbuf - 2);
+	thr->line[len] = '\0';
+	return (len);
 }
 
 /* Read a block of data from the socket, and do nothing with it.
@@ -365,36 +387,31 @@
  * the number of bytes read.
  */
 static int
-read_block(int length, int sock)
+read_block(struct thread *thr, int len)
 {
-	char *buf;
-	int len, n, nbuf;
+	int n, r, tot;
 
-	buf = malloc(length);
-	nbuf = 0;
-	while (nbuf < length) {
-		len = 2048 < length - nbuf ? 2048 : length - nbuf;
-		n = read(sock, buf + nbuf, len);
-		if (n < 0) {
+	for (tot = 0; tot < len; tot += r) {
+		n = len - tot;
+		if (n > sizeof thr->temp)	/* at most one temp buffer per read() */
+			n = sizeof thr->temp;
+		r = read(thr->sock, thr->temp, n);	/* payload is discarded */
+		if (r < 0) {
 			thread_log(0, errno, "read(%d, %p, %d)",
-			    sock, buf + nbuf, len);
-			nbuf = -1;
-			break;
+			    thr->sock, thr->temp, n);
+			return (-1);
 		}
-		if (n == 0)
+		if (r == 0)	/* EOF: caller sees a short count */
 			break;
-		nbuf += n;
 	}
-	free(buf);
-	return (nbuf);
+	return (tot);
 }
 
 /* Receive the response after sending a request.
  */
 static int
-receive_response(int sock)
+receive_response(struct thread *thr)
 {
-	char *line, *end;
 	const char *next;
 	int line_len;
 	long chunk_length, content_length;
@@ -406,26 +423,22 @@
 
 	/* Read header */
 	for (;;) {
-		line_len = read_line(&line, sock);
+		line_len = read_line(thr);
 		if (line_len < 0)
 			return (-1);
-		thread_log(2, 0, "< %.*s", line_len, line);
-		end = line + line_len;
-		if (line_len == 0) {
-			freez(line);
+		thread_log(2, 0, "< %.*s", line_len, thr->line);
+		if (line_len == 0)
 			break;
-		}
-		if (strncmp(line, "HTTP", 4) == 0) {
-			sscanf(line, "%*s %d %*s\r\n", &status);
+		if (strncmp(thr->line, "HTTP", 4) == 0) {
+			sscanf(thr->line, "%*s %d %*s\r\n", &status);
 			failed = (status != 200);
-		} else if (isprefix(line, "content-length:", end, &next)) {
+		} else if (isprefix(thr->line, "content-length:", &next)) {
 			content_length = strtol(next, NULL, 10);
-		} else if (isprefix(line, "transfer-encoding:", end, &next)) {
+		} else if (isprefix(thr->line, "transfer-encoding:", &next)) {
 			chunked = (strcasecmp(next, "chunked") == 0);
-		} else if (isprefix(line, "connection:", end, &next)) {
+		} else if (isprefix(thr->line, "connection:", &next)) {
 			connclose = (strcasecmp(next, "close") == 0);
 		}
-		freez(line);
 	}
 
 	thread_log(1, 0, "status: %d", status);
@@ -435,34 +448,33 @@
 		/* Chunked encoding, read size and bytes until no more */
 		thread_log(1, 0, "chunked encoding");
 		for (;;) {
-			if ((line_len = read_line(&line, sock)) < 0)
+			line_len = read_line(thr);
+			if (line_len < 0)
 				return (-1);
-			end = line + line_len;
 			/* read_line() guarantees null-termination */
-			chunk_length = strtol(line, NULL, 16);
-			freez(line);
+			chunk_length = strtol(thr->line, NULL, 16);
 			if (chunk_length == 0)
 				break;
-			if ((n = read_block(chunk_length, sock)) < 0)
+			if ((n = read_block(thr, chunk_length)) < 0)
 				return (-1);
 			if (n < chunk_length)
 				thread_log(0, 0, "short read: %d/%ld",
 				    n, chunk_length);
 			thread_log(1, 0, "chunk length: %ld", chunk_length);
 			thread_log(1, 0, "bytes read: %d", n);
-			/* trainling CR LF */
-			if ((n = read_line(&line, sock)) < 0)
+			/* trailing CR LF */
+			if ((n = read_line(thr)) < 0)
 				return (-1);
-			freez(line);
 		}
 		/* trailing CR LF */
-		n = read_line(&line, sock);
-		freez(line);
+		n = read_line(thr);
+		if (n < 0)
+			return (-1);
 	} else if (content_length > 0) {
 		/* Fixed body size, read content_length bytes */
 		thread_log(1, 0, "fixed length");
 		thread_log(1, 0, "content length: %ld", content_length);
-		if ((n = read_block(content_length, sock)) < 0)
+		if ((n = read_block(thr, content_length)) < 0)
 			return (1);
 		if (n < content_length)
 			thread_log(0, 0, "short read: %d/%ld",
@@ -487,62 +499,49 @@
 	enum shmlogtag tag;
 	size_t len;
 	char *ptr;
-	const char *end, *next;
+	const char *next;
 
-	char *df_method = NULL;			/* Request method*/
-	char *df_proto = NULL;			/* Protocol version */
-	char *df_url = NULL;			/* URL and query string */
-	char *df_conn = NULL;			/* Connection info (keep-alive, close) */
-	char **df_hdr = NULL;			/* Headers */
-	size_t df_hdrsz = 0;			/* Size of df_hdr */
-	int df_nhdr = 0;			/* Number of headers */
-	int bogus = 0;				/* bogus request */
 	int i;
 
-	int sock = -1, reopen = 1;
+	int reopen = 1;
 
-	df_hdrsz = 16;
-	df_hdr = malloc(df_hdrsz * sizeof *df_hdr);
-
 	while ((msg = mailbox_get(&thr->mbox)) != NULL) {
 		tag = msg->tag;
 		len = msg->len;
 		ptr = msg->ptr;
-		end = ptr + len;
 
 		thread_log(2, 0, "%s(%s)", VSL_tags[tag], msg->ptr);
 
 		switch (tag) {
 		case SLT_RxRequest:
-			if (df_method != NULL)
-				bogus = 1;
+			if (thr->method != NULL)
+				thr->bogus = 1;
 			else
-				df_method = trimline(ptr, end);
+				thr->method = trimline(thr, ptr);
 			break;
 
 		case SLT_RxURL:
-			if (df_url != NULL)
-				bogus = 1;
+			if (thr->url != NULL)
+				thr->bogus = 1;
 			else
-				df_url = trimline(ptr, end);
+				thr->url = trimline(thr, ptr);
 			break;
 
 		case SLT_RxProtocol:
-			if (df_proto != NULL)
-				bogus = 1;
+			if (thr->proto != NULL)
+				thr->bogus = 1;
 			else
-				df_proto = trimline(ptr, end);
+				thr->proto = trimline(thr, ptr);
 			break;
 
 		case SLT_RxHeader:
-			while (df_hdrsz <= df_nhdr) {
-				df_hdrsz *= 2;
-				df_hdr = realloc(df_hdr, df_hdrsz * sizeof *df_hdr);
-				XXXAN(df_hdr);
+			if (thr->nhdr >= sizeof thr->hdr / sizeof *thr->hdr) {
+				thr->bogus = 1;
+			} else {
+				thr->hdr[thr->nhdr++] = trimline(thr, ptr);
+				if (isprefix(ptr, "connection:", &next))
+					thr->conn = trimline(thr, next);
 			}
-			df_hdr[df_nhdr++] = trimline(ptr, end);
-			if (isprefix(ptr, "connection:", end, &next))
-				df_conn = trimline(next, end);
 			break;
 
 		default:
@@ -555,89 +554,81 @@
 		if (tag != SLT_ReqEnd)
 			continue;
 
-		if (!df_method || !df_url || !df_proto) {
-			bogus = 1;
-		} else if (strcmp(df_method, "GET") != 0 && strcmp(df_method, "HEAD") != 0) {
-			bogus = 1;
-		} else if (strcmp(df_proto, "HTTP/1.0") == 0) {
-			reopen = !(df_conn && strcasecmp(df_conn, "keep-alive") == 0);
-		} else if (strcmp(df_proto, "HTTP/1.1") == 0) {
-			reopen = (df_conn && strcasecmp(df_conn, "close") == 0);
+		if (!thr->method || !thr->url || !thr->proto) {
+			thr->bogus = 1;
+		} else if (strcmp(thr->method, "GET") != 0 && strcmp(thr->method, "HEAD") != 0) {
+			thr->bogus = 1;
+		} else if (strcmp(thr->proto, "HTTP/1.0") == 0) {
+			reopen = !(thr->conn && strcasecmp(thr->conn, "keep-alive") == 0);
+		} else if (strcmp(thr->proto, "HTTP/1.1") == 0) {
+			reopen = (thr->conn && strcasecmp(thr->conn, "close") == 0);
 		} else {
-			bogus = 1;
+			thr->bogus = 1;
 		}
 
-		if (bogus) {
+		if (thr->bogus) {
 			thread_log(1, 0, "bogus");
-		} else {
-			if (sock == -1) {
-				thread_log(1, 0, "open");
-				sock = VSS_connect(addr_info);
-				assert(sock != -1);
+			goto clear;
+		}
+
+		if (thr->sock == -1) {
+			for (;;) {
+				thread_log(1, 0, "sleeping before connect...");
+				usleep(1000 * (thr->fd % 3001));
+				if ((thr->sock = VSS_connect(addr_info)) >= 0)
+					break;
+				thread_log(0, errno, "connect failed");
 			}
+		}
 
-			thread_log(1, 0, "%s %s %s", df_method, df_url, df_proto);
+		thread_log(1, 0, "%s %s %s", thr->method, thr->url, thr->proto);
 
-			iov[0].iov_base = df_method;
-			iov[0].iov_len = strlen(df_method);
-			iov[2].iov_base = df_url;
-			iov[2].iov_len = strlen(df_url);
-			iov[4].iov_base = df_proto;
-			iov[4].iov_len = strlen(df_proto);
-			iov[1].iov_base = iov[3].iov_base = space;
-			iov[1].iov_len = iov[3].iov_len = 1;
-			iov[5].iov_base = crlf;
-			iov[5].iov_len = 2;
-			if (writev(sock, iov, 6) == -1) {
-				thread_log(0, errno, "writev()");
-				goto close;
-			}
+		iov[0].iov_base = thr->method;
+		iov[0].iov_len = strlen(thr->method);
+		iov[2].iov_base = thr->url;
+		iov[2].iov_len = strlen(thr->url);
+		iov[4].iov_base = thr->proto;
+		iov[4].iov_len = strlen(thr->proto);
+		iov[1].iov_base = iov[3].iov_base = space;
+		iov[1].iov_len = iov[3].iov_len = 1;
+		iov[5].iov_base = crlf;
+		iov[5].iov_len = 2;
+		if (writev(thr->sock, iov, 6) == -1) {
+			thread_log(0, errno, "writev()");
+			goto close;
+		}
 
-			for (i = 0; i < df_nhdr; ++i) {
-				thread_log(2, 0, "%d %s", i, df_hdr[i]);
-				iov[0].iov_base = df_hdr[i];
-				iov[0].iov_len = strlen(df_hdr[i]);
-				iov[1].iov_base = crlf;
-				iov[1].iov_len = 2;
-				if (writev(sock, iov, 2) == -1) {
-					thread_log(0, errno, "writev()");
-					goto close;
-				}
-			}
-			if (write(sock, crlf, 2) == -1) {
+		for (i = 0; i < thr->nhdr; ++i) {
+			thread_log(2, 0, "%d %s", i, thr->hdr[i]);
+			iov[0].iov_base = thr->hdr[i];
+			iov[0].iov_len = strlen(thr->hdr[i]);
+			iov[1].iov_base = crlf;
+			iov[1].iov_len = 2;
+			if (writev(thr->sock, iov, 2) == -1) {
 				thread_log(0, errno, "writev()");
 				goto close;
 			}
-			if (receive_response(sock) || reopen) {
+		}
+		if (write(thr->sock, crlf, 2) == -1) {
+			thread_log(0, errno, "writev()");
+			goto close;
+		}
+		if (receive_response(thr) || reopen) {
 close:
-				thread_log(1, 0, "close");
-				close(sock);
-				sock = -1;
-			}
+			thread_log(1, 0, "close");
+			assert(thr->sock != -1);
+			close(thr->sock);
+			thr->sock = -1;
 		}
 
+		sleep(1);
+clear:
 		/* clean up */
-		freez(df_method);
-		freez(df_url);
-		freez(df_proto);
-		freez(df_conn);
-		while (df_nhdr) {
-			--df_nhdr;
-			freez(df_hdr[df_nhdr]);
-		}
-		bogus = 0;
+		thread_clear(thr);
 	}
 
 	/* leftovers */
-	freez(df_method);
-	freez(df_url);
-	freez(df_proto);
-	freez(df_conn);
-	while (df_nhdr) {
-		--df_nhdr;
-		freez(df_hdr[df_nhdr]);
-	}
-	freez(df_hdr);
+	thread_clear(thr);
 
 	return (0);
 }
@@ -659,6 +650,8 @@
 
 	thread_log(3, 0, "%d %s", fd, VSL_tags[tag]);
 	thr = thread_get(fd, replay_thread);
+	if (thr == NULL)
+		return (0);
 	msg = malloc(sizeof (struct message));
 	msg->tag = tag;
 	msg->len = len;
@@ -668,6 +661,39 @@
 	return (0);
 }
 
+/* Parse <address> and resolve it with VSS_resolve(); nothing is connected
+ * here.  Only the first resolved address is returned (not freed here); the
+ * rest are freed.  Exits with status 2 if parsing or resolution fails.
+ */
+static struct vss_addr *
+init_connection(const char *address)
+{
+	struct vss_addr **ta;
+	struct vss_addr *tap;
+	char *addr, *port;
+	int i, n;
+
+	if (VSS_parse(address, &addr, &port) != 0) {
+		thread_log(0, 0, "Invalid address");
+		exit(2);
+	}
+	n = VSS_resolve(addr, port, &ta);
+	free(addr);
+	free(port);
+	if (n == 0) {
+		thread_log(0, 0, "Could not connect to server");	/* NOTE(review): really a resolve failure */
+		exit(2);
+	}
+	for (i = 1; i < n; ++i) {
+		free(ta[i]);
+		ta[i] = NULL;
+	}
+	tap = ta[0];
+	free(ta);
+
+	return (tap);
+}
+
 /*--------------------------------------------------------------------*/
 
 static void
@@ -715,8 +741,11 @@
 
 	signal(SIGPIPE, SIG_IGN);
 
+	pthread_attr_init(&thread_attr);
+	pthread_attr_setstacksize(&thread_attr, 16384);
+
 	while (VSL_Dispatch(vd, gen_traffic, NULL) == 0)
 		/* nothing */ ;
-	thread_close(0);
+	thread_close(-1);
 	exit(0);
 }




More information about the varnish-commit mailing list