[master] b1dd667 Add a new mode to server instance 'b0', where it acts like "a normal webserver" and dispatches a thread for each incomming connection.

Poul-Henning Kamp phk at FreeBSD.org
Mon May 18 21:12:05 CEST 2015


commit b1dd6679db4f534ba3cd743fb0662965682ab87a
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date:   Mon May 18 19:07:52 2015 +0000

    Add a new mode to server instance 'b0', where it acts like "a normal
    webserver" and dispatches a thread for each incomming connection.
    
    This is an example use:
    
    	server s0 {
    		loop 10 {
    			rxreq
    			txresp -body "foo1"
    		}
    		rxreq
    		txresp -hdr "Connection: close" -body "foo1"
    		expect_close
    	} -dispatch
    
    Each connection will spawn a dynamically created s%d instance
    starting with s1, s2 ...
    
    Each of these will respond to 11 requests on the accepted connection,
    the last response will have "Connection: close" and they will
    expect the other end (varnish) to do that.
    
    The main limitation on using this for bulk traffic tests is the
    finite and limited size of the vtc_log which varnishtest will
    collect (half a megabyte).
    
    Test b00048 is about as much traffic as will fit in the vtc_log.

diff --git a/bin/varnishtest/tests/b00048.vtc b/bin/varnishtest/tests/b00048.vtc
new file mode 100644
index 0000000..ab9089c
--- /dev/null
+++ b/bin/varnishtest/tests/b00048.vtc
@@ -0,0 +1,49 @@
+varnishtest "Run a lot of transactions through"
+
+server s0 {
+	loop 10 {
+		rxreq
+		txresp -body "foo1"
+	}
+	rxreq
+	txresp -hdr "Connection: close" -body "foo1"
+	expect_close
+} -dispatch
+
+varnish v1 -arg "-p waiter=poll" -vcl+backend {
+	sub vcl_recv {
+		return (pass);
+	}
+	sub vcl_backend_fetch {
+		set bereq.backend = s0;
+	}
+
+} -start
+
+client c1 {
+	loop 20 {
+		txreq -url /c1
+		rxresp
+		expect resp.status == 200
+	}
+} -start
+
+client c2 {
+	loop 20 {
+		txreq -url /c2
+		rxresp
+		expect resp.status == 200
+	}
+} -start
+
+client c3 {
+	loop 20 {
+		txreq -url /c3
+		rxresp
+		expect resp.status == 200
+	}
+} -start
+
+client c1 -wait
+client c2 -wait
+client c3 -wait
diff --git a/bin/varnishtest/vtc.c b/bin/varnishtest/vtc.c
index 4b40c90..9053e73 100644
--- a/bin/varnishtest/vtc.c
+++ b/bin/varnishtest/vtc.c
@@ -643,6 +643,7 @@ exec_file(const char *fn, const char *script, const char *tmpdir,
 
 	init_macro();
 	init_sema();
+	init_server();
 
 	/* Apply extmacro definitions */
 	VTAILQ_FOREACH(m, &extmacro_list, list)
diff --git a/bin/varnishtest/vtc.h b/bin/varnishtest/vtc.h
index 7837052..e27c056 100644
--- a/bin/varnishtest/vtc.h
+++ b/bin/varnishtest/vtc.h
@@ -71,6 +71,7 @@ extern int iflg;
 extern unsigned vtc_maxdur;
 
 void init_sema(void);
+void init_server(void);
 
 int http_process(struct vtclog *vl, const char *spec, int sock, int *sfd);
 
diff --git a/bin/varnishtest/vtc_server.c b/bin/varnishtest/vtc_server.c
index 8271368..73e50ed 100644
--- a/bin/varnishtest/vtc_server.c
+++ b/bin/varnishtest/vtc_server.c
@@ -54,6 +54,7 @@ struct server {
 
 	int			depth;
 	int			sock;
+	int			fd;
 	char			listen[256];
 	char			aaddr[32];
 	char			aport[32];
@@ -61,10 +62,85 @@ struct server {
 	pthread_t		tp;
 };
 
+static pthread_mutex_t		server_mtx;
+
 static VTAILQ_HEAD(, server)	servers =
     VTAILQ_HEAD_INITIALIZER(servers);
 
 /**********************************************************************
+ * Allocate and initialize a server
+ */
+
+static struct server *
+server_new(const char *name)
+{
+	struct server *s;
+
+	AN(name);
+	ALLOC_OBJ(s, SERVER_MAGIC);
+	AN(s);
+	REPLACE(s->name, name);
+	s->vl = vtc_logopen(s->name);
+	AN(s->vl);
+
+	bprintf(s->listen, "%s", "127.0.0.1 0");
+	s->repeat = 1;
+	s->depth = 10;
+	s->sock = -1;
+	s->fd = -1;
+	AZ(pthread_mutex_lock(&server_mtx));
+	VTAILQ_INSERT_TAIL(&servers, s, list);
+	AZ(pthread_mutex_unlock(&server_mtx));
+	return (s);
+}
+
+/**********************************************************************
+ * Clean up a server
+ */
+
+static void
+server_delete(struct server *s)
+{
+
+	CHECK_OBJ_NOTNULL(s, SERVER_MAGIC);
+	macro_undef(s->vl, s->name, "addr");
+	macro_undef(s->vl, s->name, "port");
+	macro_undef(s->vl, s->name, "sock");
+	vtc_logclose(s->vl);
+	free(s->name);
+	/* XXX: MEMLEAK (?) (VSS ??) */
+	FREE_OBJ(s);
+}
+
+/**********************************************************************
+ * Server listen
+ */
+
+static void
+server_listen(struct server *s)
+{
+	const char *err;
+
+	CHECK_OBJ_NOTNULL(s, SERVER_MAGIC);
+
+	if (s->sock >= 0)
+		VTCP_close(&s->sock);
+	s->sock = VTCP_listen_on(s->listen, "0", s->depth, &err);
+	if (err != NULL)
+		vtc_log(s->vl, 0,
+		    "Server listen address (%s) cannot be resolved: %s",
+		    s->listen, err);
+	assert(s->sock > 0);
+	VTCP_myname(s->sock, s->aaddr, sizeof s->aaddr,
+	    s->aport, sizeof s->aport);
+	macro_def(s->vl, s->name, "addr", "%s", s->aaddr);
+	macro_def(s->vl, s->name, "port", "%s", s->aport);
+	macro_def(s->vl, s->name, "sock", "%s %s", s->aaddr, s->aport);
+	/* Record the actual port, and reuse it on subsequent starts */
+	bprintf(s->listen, "%s %s", s->aaddr, s->aport);
+}
+
+/**********************************************************************
  * Server thread
  */
 
@@ -83,7 +159,7 @@ server_thread(void *priv)
 
 	vl = vtc_logopen(s->name);
 
-	vtc_log(vl, 2, "Started on %s %s", s->aaddr, s->aport);
+	vtc_log(vl, 2, "Started on %s", s->listen);
 	for (i = 0; i < s->repeat; i++) {
 		if (s->repeat > 1)
 			vtc_log(vl, 3, "Iteration %d", i);
@@ -104,80 +180,92 @@ server_thread(void *priv)
 	return (NULL);
 }
 
+
 /**********************************************************************
- * Allocate and initialize a server
+ * Start the server thread
  */
 
-static struct server *
-server_new(const char *name)
+static void
+server_start(struct server *s)
 {
-	struct server *s;
-
-	AN(name);
-	ALLOC_OBJ(s, SERVER_MAGIC);
-	AN(s);
-	REPLACE(s->name, name);
-	s->vl = vtc_logopen(name);
-	AN(s->vl);
-	if (*s->name != 's')
-		vtc_log(s->vl, 0, "Server name must start with 's'");
-
-	bprintf(s->listen, "%s", "127.0.0.1 0");
-	s->repeat = 1;
-	s->depth = 10;
-	s->sock = -1;
-	VTAILQ_INSERT_TAIL(&servers, s, list);
-	return (s);
+	CHECK_OBJ_NOTNULL(s, SERVER_MAGIC);
+	vtc_log(s->vl, 2, "Starting server");
+	server_listen(s);
+	vtc_log(s->vl, 1, "Listen on %s", s->listen);
+	s->run = 1;
+	AZ(pthread_create(&s->tp, NULL, server_thread, s));
 }
 
 /**********************************************************************
- * Clean up a server
  */
 
-static void
-server_delete(struct server *s)
+static void *
+server_dispatch_wrk(void *priv)
 {
+	struct server *s;
+	struct vtclog *vl;
+	int j, fd;
 
-	CHECK_OBJ_NOTNULL(s, SERVER_MAGIC);
-	macro_undef(s->vl, s->name, "addr");
-	macro_undef(s->vl, s->name, "port");
-	macro_undef(s->vl, s->name, "sock");
-	vtc_logclose(s->vl);
-	free(s->name);
-	/* XXX: MEMLEAK (?) (VSS ??) */
-	FREE_OBJ(s);
+	CAST_OBJ_NOTNULL(s, priv, SERVER_MAGIC);
+	assert(s->sock < 0);
+
+	vl = vtc_logopen(s->name);
+
+	fd = s->fd;
+
+	vtc_log(vl, 3, "start with fd %d", fd);
+	fd = http_process(vl, s->spec, fd, &s->sock);
+	vtc_log(vl, 3, "shutting fd %d", fd);
+	j = shutdown(fd, SHUT_WR);
+	if (!VTCP_Check(j))
+		vtc_log(vl, 0, "Shutdown failed: %s", strerror(errno));
+	VTCP_close(&s->fd);
+	vtc_log(vl, 2, "Ending");
+	return (NULL);
 }
 
-/**********************************************************************
- * Start the server thread
- */
+static void *
+server_dispatch_thread(void *priv)
+{
+	struct server *s, *s2;
+	int sn = 1, fd;
+	char snbuf[8];
+	struct vtclog *vl;
+	struct sockaddr_storage addr_s;
+	struct sockaddr *addr;
+	socklen_t l;
+
+	CAST_OBJ_NOTNULL(s, priv, SERVER_MAGIC);
+	assert(s->sock >= 0);
+
+	vl = vtc_logopen(s->name);
+	vtc_log(vl, 2, "Dispatch started on %s", s->listen);
+
+	while (1) {
+		addr = (void*)&addr_s;
+		l = sizeof addr_s;
+		fd = accept(s->sock, addr, &l);
+		if (fd < 0)
+			vtc_log(vl, 0, "Accepted failed: %s", strerror(errno));
+		bprintf(snbuf, "s%d", sn++);
+		vtc_log(vl, 3, "dispatch fd %d -> %s", fd, snbuf);
+		s2 = server_new(snbuf);
+		s2->spec = s->spec;
+		strcpy(s2->listen, s->listen);
+		s2->fd = fd;
+		s2->run = 1;
+		AZ(pthread_create(&s2->tp, NULL, server_dispatch_wrk, s2));
+	}
+}
 
 static void
-server_start(struct server *s)
+server_dispatch(struct server *s)
 {
-	const char *err;
-
 	CHECK_OBJ_NOTNULL(s, SERVER_MAGIC);
-	vtc_log(s->vl, 2, "Starting server");
-	if (s->sock < 0) {
-		s->sock = VTCP_listen_on(s->listen, "0", s->depth, &err);
-		if (err != NULL)
-			vtc_log(s->vl, 0,
-			    "Server listen address (%s) cannot be resolved: %s",
-			    s->listen, err);
-		assert(s->sock > 0);
-		VTCP_myname(s->sock, s->aaddr, sizeof s->aaddr,
-		    s->aport, sizeof s->aport);
-		macro_def(s->vl, s->name, "addr", "%s", s->aaddr);
-		macro_def(s->vl, s->name, "port", "%s", s->aport);
-		macro_def(s->vl, s->name, "sock", "%s %s", s->aaddr, s->aport);
-
-		/* Record the actual port, and reuse it on subsequent starts */
-		bprintf(s->listen, "%s %s", s->aaddr, s->aport);
-	}
-	vtc_log(s->vl, 1, "Listen on %s", s->listen);
+	server_listen(s);
+	vtc_log(s->vl, 2, "Starting dispatch server");
 	s->run = 1;
-	AZ(pthread_create(&s->tp, NULL, server_thread, s));
+	AZ(pthread_create(&s->tp, NULL, server_dispatch_thread, s));
 }
 
 /**********************************************************************
@@ -207,7 +295,7 @@ server_wait(struct server *s)
 	void *res;
 
 	CHECK_OBJ_NOTNULL(s, SERVER_MAGIC);
-	vtc_log(s->vl, 2, "Waiting for server");
+	vtc_log(s->vl, 2, "Waiting for server (%d/%d)", s->sock, s->fd);
 	AZ(pthread_join(s->tp, &res));
 	if (res != NULL && !vtc_stop)
 		vtc_log(s->vl, 0, "Server returned \"%p\"",
@@ -225,11 +313,13 @@ cmd_server_genvcl(struct vsb *vsb)
 {
 	struct server *s;
 
+	AZ(pthread_mutex_lock(&server_mtx));
 	VTAILQ_FOREACH(s, &servers, list) {
 		VSB_printf(vsb,
 		    "backend %s { .host = \"%s\"; .port = \"%s\"; }\n",
 		    s->name, s->aaddr, s->aport);
 	}
+	AZ(pthread_mutex_unlock(&server_mtx));
 }
 
 
@@ -240,7 +330,7 @@ cmd_server_genvcl(struct vsb *vsb)
 void
 cmd_server(CMD_ARGS)
 {
-	struct server *s, *s2;
+	struct server *s;
 
 	(void)priv;
 	(void)cmd;
@@ -248,9 +338,15 @@ cmd_server(CMD_ARGS)
 
 	if (av == NULL) {
 		/* Reset and free */
-		VTAILQ_FOREACH_SAFE(s, &servers, list, s2) {
-			CHECK_OBJ_NOTNULL(s, SERVER_MAGIC);
-			VTAILQ_REMOVE(&servers, s, list);
+		while (1) {
+			AZ(pthread_mutex_lock(&server_mtx));
+			s = VTAILQ_FIRST(&servers);
+			CHECK_OBJ_ORNULL(s, SERVER_MAGIC);
+			if (s != NULL)
+				VTAILQ_REMOVE(&servers, s, list);
+			AZ(pthread_mutex_unlock(&server_mtx));
+			if (s == NULL)
+				break;
 			if (s->run) {
 				(void)pthread_cancel(s->tp);
 				server_wait(s);
@@ -265,9 +361,17 @@ cmd_server(CMD_ARGS)
 	AZ(strcmp(av[0], "server"));
 	av++;
 
+	if (*av[0] != 's') {
+		fprintf(stderr, "Server name must start with 's' (is: %s)\n",
+		    av[0]);
+		exit(1);
+	}
+
+	AZ(pthread_mutex_lock(&server_mtx));
 	VTAILQ_FOREACH(s, &servers, list)
 		if (!strcmp(s->name, av[0]))
 			break;
+	AZ(pthread_mutex_unlock(&server_mtx));
 	if (s == NULL)
 		s = server_new(av[0]);
 	CHECK_OBJ_NOTNULL(s, SERVER_MAGIC);
@@ -312,8 +416,23 @@ cmd_server(CMD_ARGS)
 			server_start(s);
 			continue;
 		}
+		if (!strcmp(*av, "-dispatch")) {
+			if (strcmp(s->name, "s0")) {
+				fprintf(stderr,
+				    "server -parallel only works on s0\n");
+				exit(1);
+			}
+			server_dispatch(s);
+			continue;
+		}
 		if (**av == '-')
 			vtc_log(s->vl, 0, "Unknown server argument: %s", *av);
 		s->spec = *av;
 	}
 }
+
+void
+init_server(void)
+{
+	AZ(pthread_mutex_init(&server_mtx, NULL));
+}



More information about the varnish-commit mailing list