[master] db395f8 Implement socket barriers (shared between processes)

Dridi Boukelmoune dridi.boukelmoune at gmail.com
Tue Mar 29 14:20:05 CEST 2016


commit db395f8d7edeea6935310eedce55bc88160f7b48
Author: Dridi Boukelmoune <dridi.boukelmoune at gmail.com>
Date:   Wed Dec 23 17:29:24 2015 +0100

    Implement socket barriers (shared between processes)
    
    When varnishtest creates a socket barrier, it will bind a socket and
    listen for incoming connections. Once the expected number of connections
    has been accepted, all of them are closed.
    
    Barrier users only need to connect to the socket, read "nothing" and
    block until the connection is closed. It allows virtually any process
    to sync with varnishtest. The barrier will provide macros with its
    socket information.

diff --git a/bin/varnishtest/tests/a00013.vtc b/bin/varnishtest/tests/a00013.vtc
new file mode 100644
index 0000000..91bfce0
--- /dev/null
+++ b/bin/varnishtest/tests/a00013.vtc
@@ -0,0 +1,64 @@
+varnishtest "Barrier operations"
+
+# Same as a00008.vtc, with socket barriers instead.
+# Each barrier expects 4 waiters: three servers or clients, plus this script.
+# bs -> servers, bc -> clients, bb -> both (cyclic, synced in two rounds)
+barrier bs sock 4
+barrier bc sock 4
+barrier bb sock 4 -cyclic
+
+server s1 {
+	rxreq
+	barrier bs sync
+	barrier bb sync
+	delay .9
+	txresp
+} -start
+
+server s2 {
+	rxreq
+	barrier bs sync
+	barrier bb sync
+	delay .6
+	txresp
+} -start
+
+server s3 {
+	rxreq
+	barrier bs sync
+	barrier bb sync
+	delay .2
+	txresp
+} -start
+
+client c1 -connect ${s1_sock} {
+	delay .2
+	txreq
+	rxresp
+	barrier bc sync
+	barrier bb sync
+} -start
+
+client c2 -connect ${s2_sock} {
+	delay .6
+	txreq
+	rxresp
+	barrier bc sync
+	barrier bb sync
+} -start
+
+client c3 -connect ${s3_sock} {
+	delay .9
+	txreq
+	rxresp
+	barrier bc sync
+	barrier bb sync
+} -start
+
+# Wait for all servers to have received requests
+barrier bs sync
+barrier bb sync
+
+# Wait for all clients to have received responses
+barrier bc sync
+barrier bb sync
diff --git a/bin/varnishtest/vtc_barrier.c b/bin/varnishtest/vtc_barrier.c
index ba80cfb..47ac4dc 100644
--- a/bin/varnishtest/vtc_barrier.c
+++ b/bin/varnishtest/vtc_barrier.c
@@ -28,13 +28,19 @@
 
 #include "config.h"
 
+#include <errno.h>
 #include <pthread.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
 
+#include <sys/select.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
 #include "vtc.h"
+#include "vtcp.h"
 
 enum barrier_e {
 	BARRIER_NONE = 0,
@@ -55,6 +61,9 @@ struct barrier {
 	unsigned		cyclic;
 
 	enum barrier_e		type;
+	/* fields below are only for BARRIER_SOCK */
+	pthread_t		thread;
+	volatile unsigned	active;
 };
 
 static pthread_mutex_t		barrier_mtx;
@@ -116,6 +125,108 @@ barrier_cond(struct barrier *b, const char *av, struct vtclog *vl)
 	b->type = BARRIER_COND;
 }
 
+/*
+ * Thread body of a BARRIER_SOCK barrier: listens on a loopback socket,
+ * publishes it as the <name>_{addr,port,sock} macros, then accepts
+ * connections and closes them all once b->expected of them are open.
+ * priv is the struct barrier to serve; the return value is always NULL.
+ */
+static void *
+barrier_sock_thread(void *priv)
+{
+	struct barrier *b;
+	struct vtclog *vl;
+	struct timeval tmo;
+	const char *err;
+	char abuf[16], pbuf[6];
+	int e, i, sock, *conns;
+	fd_set rfds;
+
+	CAST_OBJ_NOTNULL(b, priv, BARRIER_MAGIC);
+	assert(b->type == BARRIER_SOCK);
+
+	AZ(pthread_mutex_lock(&b->mtx));
+	vl = vtc_logopen(b->name);
+	AN(vl);
+
+	sock = VTCP_listen_on("127.0.0.1:0", NULL, b->expected, &err);
+	if (sock < 0) {
+		e = errno;	/* the calls below may clobber errno */
+		pthread_cond_signal(&b->cond);
+		AZ(pthread_mutex_unlock(&b->mtx));
+		vtc_log(vl, 0, "Barrier(%s) %s fails: %s (errno=%d)",
+		    b->name, err, strerror(e), e);
+	}
+	assert(sock > 0);
+	(void)VTCP_nonblocking(sock);
+	VTCP_myname(sock, abuf, sizeof abuf, pbuf, sizeof pbuf);
+
+	macro_def(vl, b->name, "addr", "%s", abuf);
+	macro_def(vl, b->name, "port", "%s", pbuf);
+	macro_def(vl, b->name, "sock", "%s:%s", abuf, pbuf);
+	/* The macros are defined: wake up the thread waiting on b->cond. */
+	pthread_cond_signal(&b->cond);
+	AZ(pthread_mutex_unlock(&b->mtx));
+
+	conns = calloc(b->expected, sizeof *conns);
+	AN(conns);
+	while (b->active) {
+		FD_ZERO(&rfds);
+		FD_SET(sock, &rfds);
+		tmo.tv_sec = 1;		/* re-check b->active every second */
+		tmo.tv_usec = 0;
+		i = select(sock + 1, &rfds, NULL, NULL, &tmo);
+		if (i == 0)
+			continue;
+		if (i < 0) {
+			if (errno == EINTR)
+				continue;
+			e = errno;	/* close() may clobber errno */
+			AZ(close(sock));
+			vtc_log(vl, 0,
+			    "Barrier(%s) select fails: %s (errno=%d)",
+			    b->name, strerror(e), e);
+		}
+		assert(i == 1);
+		assert(b->waiters <= b->expected);
+		if (b->waiters == b->expected)
+			vtc_log(vl, 0, "Barrier(%s) use error: "
+			    "more waiters than the %u expected",
+			    b->name, b->expected);
+
+		i = accept(sock, NULL, NULL);
+		if (i < 0) {
+			e = errno;	/* close() may clobber errno */
+			AZ(close(sock));
+			vtc_log(vl, 0,
+			    "Barrier(%s) accept fails: %s (errno=%d)",
+			    b->name, strerror(e), e);
+		}
+		/* Only the descriptor is kept, so it can be closed on wake. */
+		conns[b->waiters] = i;
+		if (++b->waiters < b->expected) {
+			vtc_log(vl, 4, "Barrier(%s) wait %u of %u",
+			    b->name, b->waiters, b->expected);
+			continue;
+		}
+
+		vtc_log(vl, 4, "Barrier(%s) wake %u", b->name, b->expected);
+		for (i = 0; i < b->expected; i++)
+			AZ(close(conns[i]));
+		if (b->cyclic)
+			b->waiters = 0;
+		else
+			b->active = 0;
+	}
+
+	macro_undef(vl, b->name, "addr");
+	macro_undef(vl, b->name, "port");
+	macro_undef(vl, b->name, "sock");
+	AZ(close(sock));
+	free(conns);
+	return (NULL);
+}
+
 static void
 barrier_sock(struct barrier *b, const char *av, struct vtclog *vl)
 {
@@ -123,7 +234,13 @@ barrier_sock(struct barrier *b, const char *av, struct vtclog *vl)
 	CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
 	barrier_expect(b, av, vl);
 	b->type = BARRIER_SOCK;
-	INCOMPL();
+	b->active = 1;
+
+	/* NB. For convenience, we reuse the BARRIER_COND's pthread_cond_t
+	 *     to wait until the socket is ready.
+	 */
+	AZ(pthread_create(&b->thread, NULL, barrier_sock_thread, b));
+	AZ(pthread_cond_wait(&b->cond, &b->mtx));
 }
 
 static void
@@ -175,6 +292,46 @@ barrier_cond_sync(struct barrier *b, struct vtclog *vl)
 }
 
 static void
+barrier_sock_sync(struct barrier *b, struct vtclog *vl)
+{
+	struct vsb *vsb;
+	const char *err;
+	char buf[32];
+	int i, sock;
+	ssize_t sz;
+
+	CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
+	assert(b->type == BARRIER_SOCK);
+
+	i = snprintf(buf, sizeof buf, "${%s_sock}", b->name);
+	assert(i > 0 && i < sizeof buf);
+	vsb = macro_expand(vl, buf);
+	vtc_log(vl, 4, "Barrier(%s) sync with socket", b->name);
+
+	sock = VTCP_open(VSB_data(vsb), NULL, 0., &err);
+	if (sock < 0)
+		vtc_log(vl, 0, "Barrier(%s) connection failed: %s",
+		    b->name, err);
+
+	VSB_delete(vsb);
+
+	/* emulate pthread_cond_wait's behavior */
+	AZ(pthread_mutex_unlock(&b->mtx));
+	sz = read(sock, buf, sizeof buf); /* XXX loop with timeout? */
+	AZ(pthread_mutex_lock(&b->mtx));
+
+	i = errno;
+	AZ(close(sock));
+
+	if (sz < 0)
+		vtc_log(vl, 0, "Barrier(%s) connection failed: %s (errno=%d)",
+		    b->name, strerror(i), i);
+	if (sz > 0)
+		vtc_log(vl, 0, "Barrier(%s) unexpected data (%ldB)",
+		    b->name, sz);
+}
+
+static void
 barrier_sync(struct barrier *b, struct vtclog *vl)
 {
 
@@ -188,7 +345,7 @@ barrier_sync(struct barrier *b, struct vtclog *vl)
 		barrier_cond_sync(b, vl);
 		break;
 	case BARRIER_SOCK:
-		INCOMPL();
+		barrier_sock_sync(b, vl);
 		break;
 	default:
 		WRONG("Wrong barrier type");
@@ -212,11 +369,20 @@ cmd_barrier(CMD_ARGS)
 		/* Reset and free */
 		VTAILQ_FOREACH_SAFE(b, &barriers, list, b2) {
 			AZ(pthread_mutex_lock(&b->mtx));
-			assert(b->type != BARRIER_NONE);
-			if (b->cyclic)
-				AZ(b->waiters);
-			else
-				assert(b->waiters == b->expected);
+			switch (b->type) {
+			case BARRIER_COND:
+				if (b->cyclic)
+					AZ(b->waiters);
+				else
+					assert(b->waiters == b->expected);
+				break;
+			case BARRIER_SOCK:
+				b->active = 0;
+				AZ(pthread_join(b->thread, NULL));
+				break;
+			default:
+				WRONG("Wrong barrier type");
+			}
 			AZ(pthread_mutex_unlock(&b->mtx));
 		}
 		AZ(pthread_mutex_unlock(&barrier_mtx));



More information about the varnish-commit mailing list