[master] e1e52264a varnishtest: New tunnel command

Dridi Boukelmoune dridi.boukelmoune at gmail.com
Fri May 28 08:59:04 UTC 2021


commit e1e52264a3638bb3cea3a1551966e0702eefb351
Author: Dridi Boukelmoune <dridi.boukelmoune at gmail.com>
Date:   Fri Jul 31 18:14:26 2020 +0200

    varnishtest: New tunnel command
    
    The tunnel acts like a client and a server and forwards bytes between
    two parties. It can then pause in the middle of a session and control
    how much data can go in either direction. Combined with barriers it can
    be used to trigger socket timeouts, possibly in the middle of protocol
    frames.
    
    A tunnel works with two threads: one to execute its spec and one to poll
    both parties and transfer data accordingly. The two threads synchronize
    via basic pthread primitives and follow a very simplistic state machine:
    
        ACCEPT -> RUNNING <-> PAUSED -> SPEC DONE -> POLL DONE -> STOPPED
    
    Despite being simplistic, the coordination of two threads would likely
    complicate the implementation of a `-repeat` action, but it is still
    possible to `start` a tunnel again as shown in the c106 test case that
    exercises basic coverage involving a varnish instance.
    
    Usage is documented in the vtc(7) manual.

diff --git a/bin/varnishtest/Makefile.am b/bin/varnishtest/Makefile.am
index 30cea03ad..fae47f4a5 100644
--- a/bin/varnishtest/Makefile.am
+++ b/bin/varnishtest/Makefile.am
@@ -48,6 +48,7 @@ varnishtest_SOURCES = \
 		vtc_sess.c \
 		vtc_subr.c \
 		vtc_syslog.c \
+		vtc_tunnel.c \
 		vtc_varnish.c
 
 varnishtest_LDADD = \
diff --git a/bin/varnishtest/cmds.h b/bin/varnishtest/cmds.h
index 17a64408c..dccdcf5bb 100644
--- a/bin/varnishtest/cmds.h
+++ b/bin/varnishtest/cmds.h
@@ -53,6 +53,7 @@ CMD_TOP(process)
 CMD_TOP(server)
 CMD_TOP(setenv)
 CMD_TOP(syslog)
+CMD_TOP(tunnel)
 #ifdef VTEST_WITH_VTC_VARNISH
 CMD_TOP(varnish)
 #endif
diff --git a/bin/varnishtest/tests/a00021.vtc b/bin/varnishtest/tests/a00021.vtc
new file mode 100644
index 000000000..c7c175e79
--- /dev/null
+++ b/bin/varnishtest/tests/a00021.vtc
@@ -0,0 +1,32 @@
+varnishtest "tunnel basic coverage"
+
+barrier b1 cond 2
+barrier b2 cond 2
+barrier b3 cond 2
+
+server s1 {
+	rxreq
+	barrier b2 sync
+	barrier b3 sync
+	txresp
+} -start
+
+tunnel t1 -connect "${s1_sock}" {
+	pause
+	barrier b1 sync
+	send 10
+	resume
+	barrier b2 sync
+	pause
+	barrier b3 sync
+	recv 10
+	resume
+} -start
+
+client c1 -connect "${t1_sock}" {
+	barrier b1 sync
+	txreq
+	rxresp
+} -run
+
+tunnel t1 -wait
diff --git a/bin/varnishtest/tests/c00106.vtc b/bin/varnishtest/tests/c00106.vtc
new file mode 100644
index 000000000..0461e04f4
--- /dev/null
+++ b/bin/varnishtest/tests/c00106.vtc
@@ -0,0 +1,49 @@
+varnishtest "tunnel basic coverage"
+
+barrier b1 cond 2 -cyclic
+barrier b2 sock 2 -cyclic
+barrier b3 sock 2 -cyclic
+
+server s1 {
+	rxreq
+	txresp
+} -start
+
+varnish v1 -vcl+backend {
+	import vtc;
+	sub vcl_recv {
+		vtc.barrier_sync("${b2_sock}");
+		vtc.barrier_sync("${b3_sock}");
+	}
+} -start
+
+tunnel t1 {
+	pause
+	barrier b1 sync
+	send 10
+	delay 0.1
+	send 15
+	resume
+	barrier b2 sync
+	pause
+	barrier b3 sync
+	recv 10
+	delay 0.1
+	recv 15
+	# automatic resumption here
+} -start
+
+client c1 -connect "${t1_sock}" {
+	barrier b1 sync
+	txreq
+	rxresp
+	expect resp.status == 200
+} -run
+
+tunnel t1 -wait
+
+# Same scenario, but wait for c1 _after_ t1
+tunnel t1 -start
+client c1 -start
+tunnel t1 -wait
+client c1 -wait
diff --git a/bin/varnishtest/vtc.c b/bin/varnishtest/vtc.c
index f6d696471..7319d4d43 100644
--- a/bin/varnishtest/vtc.c
+++ b/bin/varnishtest/vtc.c
@@ -533,6 +533,7 @@ exec_file(const char *fn, const char *script, const char *tmpdir,
 	init_macro();
 	init_server();
 	init_syslog();
+	init_tunnel();
 
 	vsb = VSB_new_auto();
 	AN(vsb);
diff --git a/bin/varnishtest/vtc.h b/bin/varnishtest/vtc.h
index 046df86d0..d2b449359 100644
--- a/bin/varnishtest/vtc.h
+++ b/bin/varnishtest/vtc.h
@@ -85,6 +85,7 @@ extern const char *default_listen_addr;
 
 void init_server(void);
 void init_syslog(void);
+void init_tunnel(void);
 
 /* Sessions */
 struct vtc_sess *Sess_New(struct vtclog *vl, const char *name);
@@ -147,6 +148,8 @@ void start_h2(struct http *hp);
 void stop_h2(struct http *hp);
 void b64_settings(const struct http *hp, const char *s);
 
+cmd_f cmd_tunnel;
+
 /* vtc_gzip.c */
 void vtc_gzip(struct http *, const char *, char **, long *);
 void vtc_gunzip(struct http *, char *, long *);
diff --git a/bin/varnishtest/vtc_tunnel.c b/bin/varnishtest/vtc_tunnel.c
new file mode 100644
index 000000000..41ce3c775
--- /dev/null
+++ b/bin/varnishtest/vtc_tunnel.c
@@ -0,0 +1,758 @@
+/*-
+ * Copyright (c) 2020 Varnish Software
+ * All rights reserved.
+ *
+ * Author: Dridi Boukelmoune <dridi.boukelmoune at gmail.com>
+ *
+ * SPDX-License-Identifier: BSD-2-Clause
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "config.h"
+
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include <errno.h>
+#include <poll.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "vtc.h"
+
+#include "vtcp.h"
+
+/* SECTION: tunnel tunnel
+ *
+ * The goal of a tunnel is to help control the data transfer between two
+ * parties, for example to trigger socket timeouts in the middle of protocol
+ * frames, without the need to change how both parties are implemented.
+ *
+ * A tunnel accepts a connection and then connects on behalf of the source to
+ * the desired destination. Once both connections are established the tunnel
+ * will transfer bytes unchanged between the source and destination. Transfer
+ * can be interrupted, usually with the help of synchronization methods like
+ * barriers. Once the transfer is paused, it is possible to let a specific
+ * amount of bytes move in either direction.
+ *
+ * SECTION: tunnel.args Arguments
+ *
+ * \-start
+ *        Start the tunnel in background, processing the last given
+ *        specification.
+ *
+ * \-wait
+ *        Block until both tunnel threads (spec and poll) finish.
+ *
+ * \-listen STRING
+ *        Dictate the listening socket for the tunnel. STRING is of the form
+ *        "IP PORT", or "HOST PORT".
+ *
+ *        Listens by default on a random local port.
+ *
+ * \-connect STRING
+ *        Indicate the server to connect to. STRING is also of the form
+ *        "IP PORT", or "HOST PORT".
+ *
+ *        Connects by default to a varnish instance called ``v1``.
+ *
+ * SECTION: tunnel.spec Specification
+ *
+ * The specification contains a list of tunnel commands that can be combined
+ * with barriers and delays. For example::
+ *
+ *     tunnel t1 {
+ *         barrier b1 sync
+ *         pause
+ *         delay 1
+ *         send 42
+ *         barrier b2 sync
+ *         resume
+ *     } -start
+ *
+ * If one end of the tunnel is closed before the end of the specification
+ * the test case will fail. A specification that ends in a paused state will
+ * implicitly resume the tunnel.
+ */
+
+enum tunnel_state_e {
+	TUNNEL_ACCEPT,
+	TUNNEL_RUNNING,
+	TUNNEL_PAUSED,
+	TUNNEL_SPEC_DONE,
+	TUNNEL_POLL_DONE,
+	TUNNEL_STOPPED,
+};
+
+struct tunnel_lane {
+	char			buf[1024];
+	ssize_t			buf_len;
+	size_t			wrk_len;
+	int			*rfd;
+	int			*wfd;
+};
+
+struct tunnel {
+	unsigned		magic;
+#define TUNNEL_MAGIC		0x55286619
+	char			*name;
+	struct vtclog		*vl;
+	VTAILQ_ENTRY(tunnel)	list;
+	enum tunnel_state_e	state;
+
+	char			*spec;
+
+	char			connect[256];
+	int			csock;
+	const char		*caddr;
+
+	char			listen[256];
+	int			lsock;
+	char			laddr[32];
+	char			lport[32];
+
+	int			asock;
+
+	struct tunnel_lane	send_lane[1];
+	struct tunnel_lane	recv_lane[1];
+
+	pthread_mutex_t		mtx;		/* state and lanes->*_len */
+	pthread_cond_t		cond;
+	pthread_t		tspec;
+	pthread_t		tpoll;
+};
+
+static pthread_mutex_t		tunnel_mtx;
+
+static VTAILQ_HEAD(, tunnel)	tunnels = VTAILQ_HEAD_INITIALIZER(tunnels);
+
+/**********************************************************************
+ * Is the tunnel still operating?
+ */
+
+static unsigned
+tunnel_is_open(struct tunnel *t)
+{
+	unsigned is_open;
+
+	AZ(pthread_mutex_lock(&t->mtx));
+	is_open = (t->send_lane->buf_len >= 0 && t->recv_lane->buf_len >= 0);
+	AZ(pthread_mutex_unlock(&t->mtx));
+	return (is_open);
+}
+
+/**********************************************************************
+ * SECTION: tunnel.spec.pause
+ *
+ * pause
+ *         Wait for in-flight bytes to be transferred and pause the tunnel.
+ *
+ *         The tunnel must be running.
+ */
+
+static void
+cmd_tunnel_pause(CMD_ARGS)
+{
+	struct tunnel *t;
+
+	CAST_OBJ_NOTNULL(t, priv, TUNNEL_MAGIC);
+	AZ(av[1]);
+
+	if (!tunnel_is_open(t))
+		vtc_fatal(vl, "Tunnel already closed");
+
+	AZ(pthread_mutex_lock(&t->mtx));
+	if (t->state == TUNNEL_PAUSED) {
+		AZ(pthread_mutex_unlock(&t->mtx));
+		vtc_fatal(vl, "Tunnel already paused");
+		WRONG("unreachable");
+	}
+	assert(t->state == TUNNEL_RUNNING);
+	t->state = TUNNEL_PAUSED;
+	AZ(pthread_cond_signal(&t->cond));
+	AZ(pthread_cond_wait(&t->cond, &t->mtx));
+	AZ(pthread_mutex_unlock(&t->mtx));
+}
+
+/**********************************************************************
+ * SECTION: tunnel.spec.send
+ *
+ * send NUMBER
+ *         Wait until NUMBER bytes are transferred from source to
+ *         destination.
+ *
+ *         The tunnel must be paused, it remains paused afterwards.
+ */
+
+static void
+cmd_tunnel_send(CMD_ARGS)
+{
+	struct tunnel *t;
+	unsigned len;
+
+	CAST_OBJ_NOTNULL(t, priv, TUNNEL_MAGIC);
+	AN(av[1]);
+	AZ(av[2]);
+
+	len = atoi(av[1]);
+
+	if (!tunnel_is_open(t))
+		vtc_fatal(vl, "Tunnel already closed");
+
+	AZ(pthread_mutex_lock(&t->mtx));
+	if (t->state == TUNNEL_RUNNING) {
+		AZ(pthread_mutex_unlock(&t->mtx));
+		vtc_fatal(vl, "Tunnel still running");
+		WRONG("unreachable");
+	}
+	assert(t->state == TUNNEL_PAUSED);
+	AZ(t->send_lane->wrk_len);
+	AZ(t->recv_lane->wrk_len);
+	if (!strcmp(av[0], "send"))
+		t->send_lane->wrk_len = len;
+	else
+		t->recv_lane->wrk_len = len;
+	AZ(pthread_cond_signal(&t->cond));
+	AZ(pthread_cond_wait(&t->cond, &t->mtx));
+	AZ(pthread_mutex_unlock(&t->mtx));
+}
+
+/**********************************************************************
+ * SECTION: tunnel.spec.recv
+ *
+ * recv NUMBER
+ *         Wait until NUMBER bytes are transferred from destination to
+ *         source.
+ *
+ *         The tunnel must be paused, it remains paused afterwards.
+ */
+
+static void
+cmd_tunnel_recv(CMD_ARGS)
+{
+
+	cmd_tunnel_send(av, priv, vl);
+}
+
+/**********************************************************************
+ * SECTION: tunnel.spec.resume
+ *
+ * resume
+ *         Resume the transfer of bytes in both directions.
+ *
+ *         The tunnel must be paused.
+ */
+
+static void
+cmd_tunnel_resume(CMD_ARGS)
+{
+	struct tunnel *t;
+
+	CAST_OBJ_NOTNULL(t, priv, TUNNEL_MAGIC);
+	AZ(av[1]);
+
+	if (!tunnel_is_open(t))
+		vtc_fatal(vl, "Tunnel already closed");
+
+	AZ(pthread_mutex_lock(&t->mtx));
+	if (t->state == TUNNEL_RUNNING) {
+		AZ(pthread_mutex_unlock(&t->mtx));
+		vtc_fatal(vl, "Tunnel already running");
+		WRONG("unreachable");
+	}
+	assert(t->state == TUNNEL_PAUSED);
+	t->state = TUNNEL_RUNNING;
+	AZ(pthread_cond_signal(&t->cond));
+	AZ(pthread_mutex_unlock(&t->mtx));
+}
+
+const struct cmds tunnel_cmds[] = {
+#define CMD_TUNNEL(n) { #n, cmd_tunnel_##n },
+	CMD_TUNNEL(pause)
+	CMD_TUNNEL(send)
+	CMD_TUNNEL(recv)
+	CMD_TUNNEL(resume)
+#undef CMD_TUNNEL
+	{ NULL, NULL }
+};
+
+/**********************************************************************
+ * Tunnel poll thread
+ */
+
+static void
+tunnel_read(struct tunnel *t, struct vtclog *vl, struct pollfd *pfd,
+    struct tunnel_lane *lane)
+{
+	size_t len;
+	ssize_t res;
+	enum tunnel_state_e state;
+
+	assert(pfd->fd == *lane->rfd);
+	if (!(pfd->revents & POLLIN))
+		return;
+
+	AZ(pthread_mutex_lock(&t->mtx));
+	AZ(lane->buf_len);
+	len = lane->wrk_len;
+	state = t->state;
+	AZ(pthread_mutex_unlock(&t->mtx));
+
+	if (len == 0 && state == TUNNEL_PAUSED)
+		return;
+
+	if (len == 0 || len > sizeof lane->buf)
+		len = sizeof lane->buf;
+
+	res = read(pfd->fd, lane->buf, len);
+	if (res < 0)
+		vtc_fatal(vl, "Read failed: %s", strerror(errno));
+
+	AZ(pthread_mutex_lock(&t->mtx));
+	lane->buf_len = (res == 0) ? -1 : res;
+	AZ(pthread_mutex_unlock(&t->mtx));
+}
+
+static void
+tunnel_write(struct tunnel *t, struct vtclog *vl, struct tunnel_lane *lane,
+    const char *action)
+{
+	const char *p;
+	ssize_t res, l;
+
+	p = lane->buf;
+	AZ(pthread_mutex_lock(&t->mtx));
+	l = lane->buf_len;
+	AZ(pthread_mutex_unlock(&t->mtx));
+
+	if (l > 0)
+		vtc_log(vl, 3, "%s %zd bytes", action, l);
+	while (l > 0) {
+		res = write(*lane->wfd, p, l);
+		if (res <= 0)
+			vtc_fatal(vl, "Write failed: %s", strerror(errno));
+		l -= res;
+		p += res;
+	}
+
+	AZ(pthread_mutex_lock(&t->mtx));
+	if (lane->wrk_len > 0 && lane->buf_len != -1) {
+		assert(lane->buf_len >= 0);
+		assert(lane->wrk_len >= lane->buf_len);
+		lane->wrk_len -= lane->buf_len;
+	}
+	lane->buf_len = l;
+	AZ(pthread_mutex_unlock(&t->mtx));
+}
+
+static void *
+tunnel_poll_thread(void *priv)
+{
+	struct tunnel *t;
+	struct vtclog *vl;
+	struct pollfd pfd[2];
+	enum tunnel_state_e state;
+	int res;
+
+	CAST_OBJ_NOTNULL(t, priv, TUNNEL_MAGIC);
+
+	vl = vtc_logopen("%s", t->name);
+	pthread_cleanup_push(vtc_logclose, vl);
+
+	while (tunnel_is_open(t) && !vtc_stop) {
+		AZ(pthread_mutex_lock(&t->mtx));
+		/* NB: can be woken up by `tunnel tX -wait` */
+		while (t->state == TUNNEL_ACCEPT)
+			AZ(pthread_cond_wait(&t->cond, &t->mtx));
+		state = t->state;
+		AZ(pthread_mutex_unlock(&t->mtx));
+
+		assert(state < TUNNEL_POLL_DONE);
+
+		memset(pfd, 0, sizeof pfd);
+		pfd[0].fd = *t->send_lane->rfd;
+		pfd[1].fd = *t->recv_lane->rfd;
+		pfd[0].events = POLLIN;
+		pfd[1].events = POLLIN;
+		res = poll(pfd, 2, 100);
+		if (res == -1)
+			vtc_fatal(vl, "Poll failed: %s", strerror(errno));
+
+		tunnel_read(t, vl, &pfd[0], t->send_lane);
+		tunnel_read(t, vl, &pfd[1], t->recv_lane);
+
+		AZ(pthread_mutex_lock(&t->mtx));
+		if (t->state == TUNNEL_PAUSED && t->send_lane->wrk_len == 0 &&
+		    t->recv_lane->wrk_len == 0) {
+			AZ(t->send_lane->buf_len);
+			AZ(t->recv_lane->buf_len);
+			AZ(pthread_cond_signal(&t->cond));
+			AZ(pthread_cond_wait(&t->cond, &t->mtx));
+		}
+		AZ(pthread_mutex_unlock(&t->mtx));
+
+		tunnel_write(t, vl, t->send_lane, "Sending");
+		tunnel_write(t, vl, t->recv_lane, "Receiving");
+	}
+
+	AZ(pthread_mutex_lock(&t->mtx));
+	if (t->state != TUNNEL_SPEC_DONE && !vtc_stop) {
+		AZ(pthread_cond_signal(&t->cond));
+		AZ(pthread_cond_wait(&t->cond, &t->mtx));
+	}
+	AZ(pthread_mutex_unlock(&t->mtx));
+
+	pthread_cleanup_pop(0);
+	vtc_logclose(vl);
+	t->state = TUNNEL_POLL_DONE;
+	return (NULL);
+}
+
+/**********************************************************************
+ * Tunnel spec thread
+ */
+
+static void
+tunnel_accept(struct tunnel *t, struct vtclog *vl)
+{
+	struct vsb *vsb;
+	const char *addr, *err;
+	int afd, cfd;
+
+	CHECK_OBJ_NOTNULL(t, TUNNEL_MAGIC);
+	assert(t->lsock >= 0);
+	assert(t->asock < 0);
+	assert(t->csock < 0);
+	assert(t->state == TUNNEL_ACCEPT);
+
+	vtc_log(vl, 4, "Accepting");
+	afd = accept(t->lsock, NULL, NULL);
+	if (afd < 0)
+		vtc_fatal(vl, "Accept failed: %s", strerror(errno));
+	vtc_log(vl, 3, "Accepted socket fd is %d", afd);
+
+	vsb = macro_expand(vl, t->connect);
+	AN(vsb);
+	addr = VSB_data(vsb);
+
+	cfd = VTCP_open(addr, NULL, 10., &err);
+	if (cfd < 0)
+		vtc_fatal(vl, "Failed to open %s: %s", addr, err);
+	vtc_log(vl, 3, "Connected socket fd is %d", cfd);
+	VSB_destroy(&vsb);
+
+	VTCP_blocking(afd);
+	VTCP_blocking(cfd);
+
+	AZ(pthread_mutex_lock(&t->mtx));
+	t->asock = afd;
+	t->csock = cfd;
+	t->send_lane->buf_len = 0;
+	t->send_lane->wrk_len = 0;
+	t->recv_lane->buf_len = 0;
+	t->recv_lane->wrk_len = 0;
+	t->state = TUNNEL_RUNNING;
+	AZ(pthread_cond_signal(&t->cond));
+	AZ(pthread_mutex_unlock(&t->mtx));
+}
+
+static void *
+tunnel_spec_thread(void *priv)
+{
+	struct tunnel *t;
+	struct vtclog *vl;
+	enum tunnel_state_e state;
+
+	CAST_OBJ_NOTNULL(t, priv, TUNNEL_MAGIC);
+	AN(*t->connect);
+
+	vl = vtc_logopen("%s", t->name);
+	vtc_log_set_cmd(vl, tunnel_cmds);
+	pthread_cleanup_push(vtc_logclose, vl);
+
+	tunnel_accept(t, vl);
+	parse_string(vl, t, t->spec);
+
+	AZ(pthread_mutex_lock(&t->mtx));
+	state = t->state;
+	AZ(pthread_mutex_unlock(&t->mtx));
+
+	if (state == TUNNEL_PAUSED && !vtc_stop)
+		parse_string(vl, t, "resume");
+
+	AZ(pthread_mutex_lock(&t->mtx));
+	t->state = TUNNEL_SPEC_DONE;
+	AZ(pthread_cond_signal(&t->cond));
+	AZ(pthread_mutex_unlock(&t->mtx));
+
+	vtc_log(vl, 2, "Ending");
+	pthread_cleanup_pop(0);
+	vtc_logclose(vl);
+	return (NULL);
+}
+
+/**********************************************************************
+ * Tunnel management
+ */
+
+static struct tunnel *
+tunnel_new(const char *name)
+{
+	struct tunnel *t;
+
+	ALLOC_OBJ(t, TUNNEL_MAGIC);
+	AN(t);
+	REPLACE(t->name, name);
+	t->vl = vtc_logopen("%s", name);
+	AN(t->vl);
+
+	t->state = TUNNEL_STOPPED;
+	bprintf(t->connect, "%s", "${v1_sock}");
+	bprintf(t->listen, "%s", "127.0.0.1 0");
+	t->csock = -1;
+	t->lsock = -1;
+	t->asock = -1;
+	t->send_lane->rfd = &t->asock;
+	t->send_lane->wfd = &t->csock;
+	t->recv_lane->rfd = &t->csock;
+	t->recv_lane->wfd = &t->asock;
+	AZ(pthread_mutex_init(&t->mtx, NULL));
+	AZ(pthread_cond_init(&t->cond, NULL));
+	AZ(pthread_mutex_lock(&tunnel_mtx));
+	VTAILQ_INSERT_TAIL(&tunnels, t, list);
+	AZ(pthread_mutex_unlock(&tunnel_mtx));
+	return (t);
+}
+
+static void
+tunnel_delete(struct tunnel *t)
+{
+
+	CHECK_OBJ_NOTNULL(t, TUNNEL_MAGIC);
+	assert(t->asock < 0);
+	assert(t->csock < 0);
+	if (t->lsock >= 0)
+		VTCP_close(&t->lsock);
+	macro_undef(t->vl, t->name, "addr");
+	macro_undef(t->vl, t->name, "port");
+	macro_undef(t->vl, t->name, "sock");
+	vtc_logclose(t->vl);
+	(void)pthread_mutex_destroy(&t->mtx);
+	(void)pthread_cond_destroy(&t->cond);
+	free(t->name);
+	FREE_OBJ(t);
+}
+
+/**********************************************************************
+ * Tunnel listen
+ */
+
+static void
+tunnel_listen(struct tunnel *t)
+{
+	const char *err;
+
+	if (t->lsock >= 0)
+		VTCP_close(&t->lsock);
+	t->lsock = VTCP_listen_on(t->listen, "0", 1, &err);
+	if (err != NULL)
+		vtc_fatal(t->vl,
+		    "Tunnel listen address (%s) cannot be resolved: %s",
+		    t->listen, err);
+	assert(t->lsock > 0);
+	VTCP_myname(t->lsock, t->laddr, sizeof t->laddr,
+	    t->lport, sizeof t->lport);
+	macro_def(t->vl, t->name, "addr", "%s", t->laddr);
+	macro_def(t->vl, t->name, "port", "%s", t->lport);
+	macro_def(t->vl, t->name, "sock", "%s %s", t->laddr, t->lport);
+	/* Record the actual port, and reuse it on subsequent starts */
+	bprintf(t->listen, "%s %s", t->laddr, t->lport);
+}
+
+/**********************************************************************
+ * Start the tunnel threads
+ */
+
+static void
+tunnel_start(struct tunnel *t)
+{
+
+	CHECK_OBJ_NOTNULL(t, TUNNEL_MAGIC);
+	vtc_log(t->vl, 2, "Starting tunnel");
+	tunnel_listen(t);
+	vtc_log(t->vl, 1, "Listen on %s", t->listen);
+	assert(t->state == TUNNEL_STOPPED);
+	t->state = TUNNEL_ACCEPT;
+	t->send_lane->buf_len = 0;
+	t->send_lane->wrk_len = 0;
+	t->recv_lane->buf_len = 0;
+	t->recv_lane->wrk_len = 0;
+	AZ(pthread_create(&t->tpoll, NULL, tunnel_poll_thread, t));
+	AZ(pthread_create(&t->tspec, NULL, tunnel_spec_thread, t));
+}
+
+/**********************************************************************
+ * Wait for the tunnel threads to stop
+ */
+
+static void
+tunnel_wait(struct tunnel *t)
+{
+	void *res;
+
+	CHECK_OBJ_NOTNULL(t, TUNNEL_MAGIC);
+	vtc_log(t->vl, 2, "Waiting for tunnel");
+
+	AZ(pthread_cond_signal(&t->cond));
+
+	AZ(pthread_join(t->tspec, &res));
+	if (res == PTHREAD_CANCELED && !vtc_stop)
+		vtc_fatal(t->vl, "Tunnel spec canceled");
+	if (res != NULL && !vtc_stop)
+		vtc_fatal(t->vl, "Tunnel spec returned \"%p\"", res);
+
+	AZ(pthread_join(t->tpoll, &res));
+	if (res == PTHREAD_CANCELED && !vtc_stop)
+		vtc_fatal(t->vl, "Tunnel poll canceled");
+	if (res != NULL && !vtc_stop)
+		vtc_fatal(t->vl, "Tunnel poll returned \"%p\"", res);
+
+	if (t->csock >= 0)
+		VTCP_close(&t->csock);
+	if (t->asock >= 0)
+		VTCP_close(&t->asock);
+	t->tspec = 0;
+	t->tpoll = 0;
+	t->state = TUNNEL_STOPPED;
+}
+
+/**********************************************************************
+ * Reap tunnel
+ */
+
+static void
+tunnel_reset(void)
+{
+	struct tunnel *t;
+	enum tunnel_state_e state;
+
+	while (1) {
+		AZ(pthread_mutex_lock(&tunnel_mtx));
+		t = VTAILQ_FIRST(&tunnels);
+		CHECK_OBJ_ORNULL(t, TUNNEL_MAGIC);
+		if (t != NULL)
+			VTAILQ_REMOVE(&tunnels, t, list);
+		AZ(pthread_mutex_unlock(&tunnel_mtx));
+		if (t == NULL)
+			break;
+
+		AZ(pthread_mutex_lock(&t->mtx));
+		state = t->state;
+		if (state < TUNNEL_POLL_DONE)
+			(void)pthread_cancel(t->tpoll);
+		if (state < TUNNEL_SPEC_DONE)
+			(void)pthread_cancel(t->tspec);
+		AZ(pthread_mutex_unlock(&t->mtx));
+
+		if (state != TUNNEL_STOPPED)
+			tunnel_wait(t);
+		tunnel_delete(t);
+	}
+}
+
+/**********************************************************************
+ * Tunnel command dispatch
+ */
+
+void
+cmd_tunnel(CMD_ARGS)
+{
+	struct tunnel *t;
+
+	(void)priv;
+
+	if (av == NULL) {
+		/* Reset and free */
+		tunnel_reset();
+		return;
+	}
+
+	AZ(strcmp(av[0], "tunnel"));
+	av++;
+
+	VTC_CHECK_NAME(vl, av[0], "Tunnel", 't');
+
+	AZ(pthread_mutex_lock(&tunnel_mtx));
+	VTAILQ_FOREACH(t, &tunnels, list)
+		if (!strcmp(t->name, av[0]))
+			break;
+	AZ(pthread_mutex_unlock(&tunnel_mtx));
+	if (t == NULL)
+		t = tunnel_new(av[0]);
+	CHECK_OBJ_NOTNULL(t, TUNNEL_MAGIC);
+	av++;
+
+	for (; *av != NULL; av++) {
+		if (vtc_error)
+			break;
+		if (!strcmp(*av, "-wait")) {
+			if (t->state == TUNNEL_STOPPED)
+				vtc_fatal(t->vl, "Tunnel not -started");
+			tunnel_wait(t);
+			continue;
+		}
+
+		/* Don't mess with a running tunnel */
+		if (t->state != TUNNEL_STOPPED)
+			tunnel_wait(t);
+
+		assert(t->state == TUNNEL_STOPPED);
+		if (!strcmp(*av, "-connect")) {
+			bprintf(t->connect, "%s", av[1]);
+			av++;
+			continue;
+		}
+		if (!strcmp(*av, "-listen")) {
+			bprintf(t->listen, "%s", av[1]);
+			av++;
+			continue;
+		}
+		if (!strcmp(*av, "-start")) {
+			tunnel_start(t);
+			continue;
+		}
+		if (**av == '-')
+			vtc_fatal(t->vl, "Unknown tunnel argument: %s", *av);
+		t->spec = *av;
+	}
+}
+
+void
+init_tunnel(void)
+{
+
+	AZ(pthread_mutex_init(&tunnel_mtx, NULL));
+}
diff --git a/doc/sphinx/Makefile.am b/doc/sphinx/Makefile.am
index a85815071..8327b478f 100644
--- a/doc/sphinx/Makefile.am
+++ b/doc/sphinx/Makefile.am
@@ -157,16 +157,17 @@ include/vsl-tags.rst: $(top_builddir)/lib/libvarnishapi/vsl2rst
 	mv -f ${@}_ ${@}
 BUILT_SOURCES += include/vsl-tags.rst
 
-VTCSYN_SRC = $(top_srcdir)/bin/varnishtest/vtc.c \
-	     $(top_srcdir)/bin/varnishtest/vtc_barrier.c \
-	     $(top_srcdir)/bin/varnishtest/vtc_haproxy.c \
-	     $(top_srcdir)/bin/varnishtest/vtc_http.c \
-	     $(top_srcdir)/bin/varnishtest/vtc_http2.c \
-	     $(top_srcdir)/bin/varnishtest/vtc_logexp.c \
-	     $(top_srcdir)/bin/varnishtest/vtc_misc.c \
-	     $(top_srcdir)/bin/varnishtest/vtc_process.c \
-	     $(top_srcdir)/bin/varnishtest/vtc_syslog.c \
-	     $(top_srcdir)/bin/varnishtest/vtc_varnish.c
+VTCSYN_SRC = \
+	$(top_srcdir)/bin/varnishtest/vtc_barrier.c \
+	$(top_srcdir)/bin/varnishtest/vtc_haproxy.c \
+	$(top_srcdir)/bin/varnishtest/vtc_http.c \
+	$(top_srcdir)/bin/varnishtest/vtc_http2.c \
+	$(top_srcdir)/bin/varnishtest/vtc_logexp.c \
+	$(top_srcdir)/bin/varnishtest/vtc_misc.c \
+	$(top_srcdir)/bin/varnishtest/vtc_process.c \
+	$(top_srcdir)/bin/varnishtest/vtc_syslog.c \
+	$(top_srcdir)/bin/varnishtest/vtc_tunnel.c \
+	$(top_srcdir)/bin/varnishtest/vtc_varnish.c
 include/vtc-syntax.rst: vtc-syntax.py $(VTCSYN_SRC)
 	$(AM_V_GEN) $(PYTHON) $(top_srcdir)/doc/sphinx/vtc-syntax.py $(VTCSYN_SRC) > ${@}_
 	@mv -f ${@}_ ${@}


More information about the varnish-commit mailing list