varnish-cache/bin/varnishtest/vtc_tunnel.c
0
/*-
1
 * Copyright (c) 2020 Varnish Software
2
 * All rights reserved.
3
 *
4
 * Author: Dridi Boukelmoune <dridi.boukelmoune@gmail.com>
5
 *
6
 * SPDX-License-Identifier: BSD-2-Clause
7
 *
8
 * Redistribution and use in source and binary forms, with or without
9
 * modification, are permitted provided that the following conditions
10
 * are met:
11
 * 1. Redistributions of source code must retain the above copyright
12
 *    notice, this list of conditions and the following disclaimer.
13
 * 2. Redistributions in binary form must reproduce the above copyright
14
 *    notice, this list of conditions and the following disclaimer in the
15
 *    documentation and/or other materials provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
21
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27
 * SUCH DAMAGE.
28
 */
29
30
#include "config.h"
31
32
#include <sys/socket.h>
33
#include <sys/types.h>
34
#include <sys/stat.h>
35
36
#include <errno.h>
37
#include <poll.h>
38
#include <stdio.h>
39
#include <stdlib.h>
40
#include <string.h>
41
#include <unistd.h>
42
43
#include "vtc.h"
44
45
#include "vsa.h"
46
#include "vtcp.h"
47
48
/* SECTION: tunnel tunnel
49
 *
50
 * The goal of a tunnel is to help control the data transfer between two
51
 * parties, for example to trigger socket timeouts in the middle of protocol
52
 * frames, without the need to change how both parties are implemented.
53
 *
54
 * A tunnel accepts a connection and then connects on behalf of the source to
55
 * the desired destination. Once both connections are established the tunnel
56
 * will transfer bytes unchanged between the source and destination. Transfer
57
 * can be interrupted, usually with the help of synchronization methods like
58
 * barriers. Once the transfer is paused, it is possible to let a specific
59
 * amount of bytes move in either direction.
60
 *
61
 * SECTION: tunnel.args Arguments
62
 *
63
 * \-start
64
 *        Start the tunnel in background, processing the last given
65
 *        specification.
66
 *
67
 * \-start+pause
68
 *        Start the tunnel, but already paused.
69
 *
70
 * \-wait
71
 *        Block until the thread finishes.
72
 *
73
 * \-listen STRING
74
 *        Dictate the listening socket for the server. STRING is of the form
75
 *        "IP PORT", or "HOST PORT".
76
 *
77
 *        Listens by default to a local random port.
78
 *
79
 * \-connect STRING
80
 *        Indicate the server to connect to. STRING is also of the form
81
 *        "IP PORT", or "HOST PORT".
82
 *
83
 *        Connects by default to a varnish instance called ``v1``.
84
 *
85
 * SECTION: tunnel.spec Specification
86
 *
87
 * The specification contains a list of tunnel commands that can be combined
88
 * with barriers and delays. For example::
89
 *
90
 *     tunnel t1 {
91
 *         barrier b1 sync
92
 *         pause
93
 *         delay 1
94
 *         send 42
95
 *         barrier b2 sync
96
 *         resume
97
 *     } -start
98
 *
99
 * If one end of the tunnel is closed before the end of the specification
100
 * the test case will fail. A specification that ends in a paused state will
101
 * implicitly resume the tunnel.
102
 */
103
104
/*
 * Tunnel life cycle.
 *
 * The declaration order matters: the poll thread asserts that the state
 * it observes compares lower than TUNNEL_POLL_DONE.
 */
enum tunnel_state_e {
	TUNNEL_ACCEPT,		/* started, connections not set up yet */
	TUNNEL_RUNNING,		/* bytes flow in both directions */
	TUNNEL_PAUSED,		/* only send/recv budgets are transferred */
	TUNNEL_SPEC_DONE,	/* the spec thread ran the whole spec */
	TUNNEL_POLL_DONE,	/* the poll thread is about to return */
	TUNNEL_STOPPED,		/* initial state, and after tunnel_wait() */
};
112
113
/*
 * One direction of the tunnel: bytes are read from *rfd into buf and
 * then written to *wfd.
 */
struct tunnel_lane {
	char			buf[1024];	/* bytes read, not yet written */
	ssize_t			buf_len;	/* bytes in buf, -1 once read()
						 * returned zero */
	size_t			wrk_len;	/* bytes left to move for a
						 * send/recv command */
	int			*rfd;		/* socket to read from */
	int			*wfd;		/* socket to write to */
};
120
121
struct tunnel {
122
        unsigned                magic;
123
#define TUNNEL_MAGIC            0x7f59913d
124
        char                    *name;
125
        struct vtclog           *vl;
126
        VTAILQ_ENTRY(tunnel)    list;
127
        enum tunnel_state_e     state;
128
        unsigned                start_paused;
129
130
        char                    *spec;
131
132
        char                    connect[256];
133
        int                     csock;
134
135
        char                    listen[256];
136
        int                     lsock;
137
        char                    laddr[VTCP_ADDRBUFSIZE];
138
        char                    lport[VTCP_PORTBUFSIZE];
139
140
        int                     asock;
141
142
        struct tunnel_lane      send_lane[1];
143
        struct tunnel_lane      recv_lane[1];
144
145
        pthread_mutex_t         mtx;            /* state and lanes->*_len */
146
        pthread_cond_t          cond;
147
        pthread_t               tspec;
148
        pthread_t               tpoll;
149
};
150
151
static pthread_mutex_t          tunnel_mtx;
152
153
static VTAILQ_HEAD(, tunnel)    tunnels = VTAILQ_HEAD_INITIALIZER(tunnels);
154
155
/**********************************************************************
156
 * Is the tunnel still operating?
157
 */
158
159
static unsigned
160 2621
tunnel_is_open(struct tunnel *t)
161
{
162
        unsigned is_open;
163
164 2621
        PTOK(pthread_mutex_lock(&t->mtx));
165 2621
        is_open = (t->send_lane->buf_len >= 0 && t->recv_lane->buf_len >= 0);
166 2621
        PTOK(pthread_mutex_unlock(&t->mtx));
167 2621
        return (is_open);
168
}
169
170
/**********************************************************************
171
 * SECTION: tunnel.spec.pause
172
 *
173
 * pause
174
 *         Wait for in-flight bytes to be transferred and pause the tunnel.
175
 *
176
 *         The tunnel must be running.
177
 */
178
179
static void
180 240
cmd_tunnel_pause(CMD_ARGS)
181
{
182
        struct tunnel *t;
183
184 240
        CAST_OBJ_NOTNULL(t, priv, TUNNEL_MAGIC);
185 240
        AZ(av[1]);
186
187 240
        if (!tunnel_is_open(t))
188 0
                vtc_fatal(vl, "Tunnel already closed");
189
190 240
        PTOK(pthread_mutex_lock(&t->mtx));
191 240
        if (t->state == TUNNEL_PAUSED) {
192 0
                PTOK(pthread_mutex_unlock(&t->mtx));
193 0
                vtc_fatal(vl, "Tunnel already paused");
194
        }
195 240
        assert(t->state == TUNNEL_RUNNING);
196 240
        t->state = TUNNEL_PAUSED;
197 240
        PTOK(pthread_cond_signal(&t->cond));
198 240
        PTOK(pthread_cond_wait(&t->cond, &t->mtx));
199 240
        PTOK(pthread_mutex_unlock(&t->mtx));
200 240
}
201
202
/**********************************************************************
203
 * SECTION: tunnel.spec.send
204
 *
205
 * send NUMBER
206
 *         Wait until NUMBER bytes are transferred from source to
207
 *         destination.
208
 *
209
 *         The tunnel must be paused, it remains paused afterwards.
210
 */
211
212
static void
213 400
cmd_tunnel_send(CMD_ARGS)
214
{
215
        struct tunnel *t;
216
        unsigned len;
217
218 400
        CAST_OBJ_NOTNULL(t, priv, TUNNEL_MAGIC);
219 400
        AN(av[1]);
220 400
        AZ(av[2]);
221
222 400
        len = atoi(av[1]);
223
224 400
        if (!tunnel_is_open(t))
225 0
                vtc_fatal(vl, "Tunnel already closed");
226
227 400
        PTOK(pthread_mutex_lock(&t->mtx));
228 400
        if (t->state == TUNNEL_RUNNING) {
229 0
                PTOK(pthread_mutex_unlock(&t->mtx));
230 0
                vtc_fatal(vl, "Tunnel still running");
231
        }
232 400
        assert(t->state == TUNNEL_PAUSED);
233 400
        AZ(t->send_lane->wrk_len);
234 400
        AZ(t->recv_lane->wrk_len);
235 400
        if (!strcmp(av[0], "send"))
236 200
                t->send_lane->wrk_len = len;
237
        else
238 200
                t->recv_lane->wrk_len = len;
239 400
        PTOK(pthread_cond_signal(&t->cond));
240 400
        PTOK(pthread_cond_wait(&t->cond, &t->mtx));
241 400
        PTOK(pthread_mutex_unlock(&t->mtx));
242 400
}
243
244
/**********************************************************************
245
 * SECTION: tunnel.spec.recv
246
 *
247
 * recv NUMBER
248
 *         Wait until NUMBER bytes are transferred from destination to
249
 *         source.
250
 *
251
 *         The tunnel must be paused, it remains paused afterwards.
252
 */
253
254
static void
255 200
cmd_tunnel_recv(CMD_ARGS)
256
{
257
258 200
        cmd_tunnel_send(av, priv, vl);
259 200
}
260
261
/**********************************************************************
262
 * SECTION: tunnel.spec.resume
263
 *
264
 * resume
265
 *         Resume the transfer of bytes in both directions.
266
 *
267
 *         The tunnel must be paused.
268
 */
269
270
static void
271 280
cmd_tunnel_resume(CMD_ARGS)
272
{
273
        struct tunnel *t;
274
275 280
        CAST_OBJ_NOTNULL(t, priv, TUNNEL_MAGIC);
276 280
        AZ(av[1]);
277
278 280
        if (!tunnel_is_open(t))
279 0
                vtc_fatal(vl, "Tunnel already closed");
280
281 280
        PTOK(pthread_mutex_lock(&t->mtx));
282 280
        if (t->state == TUNNEL_RUNNING) {
283 0
                PTOK(pthread_mutex_unlock(&t->mtx));
284 0
                vtc_fatal(vl, "Tunnel already running");
285
        }
286 280
        assert(t->state == TUNNEL_PAUSED);
287 280
        t->state = TUNNEL_RUNNING;
288 280
        PTOK(pthread_cond_signal(&t->cond));
289 280
        PTOK(pthread_mutex_unlock(&t->mtx));
290 280
}
291
292
static const struct cmds tunnel_cmds[] = {
293
#define CMD_TUNNEL(n) { #n, cmd_tunnel_##n },
294
        CMD_TUNNEL(pause)
295
        CMD_TUNNEL(send)
296
        CMD_TUNNEL(recv)
297
        CMD_TUNNEL(resume)
298
#undef CMD_TUNNEL
299
        { NULL, NULL }
300
};
301
302
/**********************************************************************
303
 * Tunnel poll thread
304
 */
305
306
static void
307 3082
tunnel_read(struct tunnel *t, struct vtclog *vl, const struct pollfd *pfd,
308
    struct tunnel_lane *lane)
309
{
310
        size_t len;
311
        ssize_t res;
312
        enum tunnel_state_e state;
313
314 3082
        assert(pfd->fd == *lane->rfd);
315 3082
        if (!(pfd->revents & POLLIN))
316 1802
                return;
317
318 1280
        PTOK(pthread_mutex_lock(&t->mtx));
319 1280
        AZ(lane->buf_len);
320 1280
        len = lane->wrk_len;
321 1280
        state = t->state;
322 1280
        PTOK(pthread_mutex_unlock(&t->mtx));
323
324 1280
        if (len == 0 && state == TUNNEL_PAUSED)
325 400
                return;
326
327 880
        if (len == 0 || len > sizeof lane->buf)
328 480
                len = sizeof lane->buf;
329
330 880
        res = read(pfd->fd, lane->buf, len);
331 880
        if (res < 0)
332 0
                vtc_fatal(vl, "Read failed: %s", strerror(errno));
333
334 880
        PTOK(pthread_mutex_lock(&t->mtx));
335 880
        lane->buf_len = (res == 0) ? -1 : res;
336 880
        PTOK(pthread_mutex_unlock(&t->mtx));
337 3082
}
338
339
static void
340 3082
tunnel_write(struct tunnel *t, struct vtclog *vl, struct tunnel_lane *lane,
341
    const char *action)
342
{
343
        const char *p;
344
        ssize_t res, l;
345
346 3082
        p = lane->buf;
347 3082
        l = lane->buf_len;
348
349 3082
        if (l > 0)
350 720
                vtc_log(vl, 3, "%s %zd bytes", action, l);
351 3802
        while (l > 0) {
352 720
                res = write(*lane->wfd, p, l);
353 720
                if (res <= 0)
354 0
                        vtc_fatal(vl, "Write failed: %s", strerror(errno));
355 720
                l -= res;
356 720
                p += res;
357
        }
358
359 3082
        PTOK(pthread_mutex_lock(&t->mtx));
360 3082
        if (lane->wrk_len > 0 && lane->buf_len != -1) {
361 800
                assert(lane->buf_len >= 0);
362 800
                assert(lane->wrk_len >= (size_t)lane->buf_len);
363 800
                lane->wrk_len -= lane->buf_len;
364 800
        }
365 3082
        lane->buf_len = l;
366 3082
        PTOK(pthread_mutex_unlock(&t->mtx));
367 3082
}
368
369
static void *
370 160
tunnel_poll_thread(void *priv)
371
{
372
        struct tunnel *t;
373
        struct vtclog *vl;
374
        struct pollfd pfd[2];
375
        enum tunnel_state_e state;
376
        int res;
377
378 160
        CAST_OBJ_NOTNULL(t, priv, TUNNEL_MAGIC);
379
380 160
        vl = vtc_logopen("%s", t->name);
381 160
        pthread_cleanup_push(vtc_logclose, vl);
382
383 1701
        while (tunnel_is_open(t) && !vtc_stop) {
384 1541
                PTOK(pthread_mutex_lock(&t->mtx));
385
                /* NB: can be woken up by `tunnel tX -wait` */
386 1718
                while (t->state == TUNNEL_ACCEPT && !vtc_stop)
387 177
                        PTOK(pthread_cond_wait(&t->cond, &t->mtx));
388 1541
                state = t->state;
389 1541
                PTOK(pthread_mutex_unlock(&t->mtx));
390
391 1541
                if (vtc_stop)
392 0
                        break;
393
394 1541
                assert(state < TUNNEL_POLL_DONE);
395
396 1541
                memset(pfd, 0, sizeof pfd);
397 1541
                pfd[0].fd = *t->send_lane->rfd;
398 1541
                pfd[1].fd = *t->recv_lane->rfd;
399 1541
                pfd[0].events = POLLIN;
400 1541
                pfd[1].events = POLLIN;
401 1541
                res = poll(pfd, 2, 100);
402 1541
                if (res == -1)
403 0
                        vtc_fatal(vl, "Poll failed: %s", strerror(errno));
404
405 1541
                tunnel_read(t, vl, &pfd[0], t->send_lane);
406 1541
                tunnel_read(t, vl, &pfd[1], t->recv_lane);
407
408 1541
                PTOK(pthread_mutex_lock(&t->mtx));
409 1541
                if (t->state == TUNNEL_PAUSED && t->send_lane->wrk_len == 0 &&
410 840
                    t->recv_lane->wrk_len == 0) {
411 640
                        AZ(t->send_lane->buf_len);
412 640
                        AZ(t->recv_lane->buf_len);
413 640
                        PTOK(pthread_cond_signal(&t->cond));
414 640
                        PTOK(pthread_cond_wait(&t->cond, &t->mtx));
415 640
                }
416 1541
                PTOK(pthread_mutex_unlock(&t->mtx));
417
418 1541
                if (vtc_stop)
419 0
                        break;
420
421 1541
                tunnel_write(t, vl, t->send_lane, "Sending");
422 1541
                tunnel_write(t, vl, t->recv_lane, "Receiving");
423
        }
424
425 160
        PTOK(pthread_mutex_lock(&t->mtx));
426 160
        if (t->state != TUNNEL_SPEC_DONE && !vtc_stop) {
427 0
                PTOK(pthread_cond_signal(&t->cond));
428 0
                PTOK(pthread_cond_wait(&t->cond, &t->mtx));
429 0
        }
430 160
        PTOK(pthread_mutex_unlock(&t->mtx));
431
432 160
        pthread_cleanup_pop(0);
433 160
        vtc_logclose(vl);
434 160
        t->state = TUNNEL_POLL_DONE;
435 160
        return (NULL);
436
}
437
438
/**********************************************************************
439
 * Tunnel spec thread
440
 */
441
442
static void
443 160
tunnel_accept(struct tunnel *t, struct vtclog *vl)
444
{
445
        struct vsb *vsb;
446
        const char *addr, *err;
447
        int afd, cfd;
448
449 160
        CHECK_OBJ_NOTNULL(t, TUNNEL_MAGIC);
450 160
        assert(t->lsock >= 0);
451 160
        assert(t->asock < 0);
452 160
        assert(t->csock < 0);
453 160
        assert(t->state == TUNNEL_ACCEPT);
454
455 160
        vtc_log(vl, 4, "Accepting");
456 160
        afd = accept(t->lsock, NULL, NULL);
457 160
        if (afd < 0)
458 0
                vtc_fatal(vl, "Accept failed: %s", strerror(errno));
459 160
        vtc_log(vl, 3, "Accepted socket fd is %d", afd);
460
461 160
        vsb = macro_expand(vl, t->connect);
462 160
        AN(vsb);
463 160
        addr = VSB_data(vsb);
464
465 160
        cfd = VTCP_open(addr, NULL, 10., &err);
466 160
        if (cfd < 0)
467 0
                vtc_fatal(vl, "Failed to open %s: %s", addr, err);
468 160
        vtc_log(vl, 3, "Connected socket fd is %d", cfd);
469 160
        VSB_destroy(&vsb);
470
471 160
        VTCP_blocking(afd);
472 160
        VTCP_blocking(cfd);
473
474 160
        PTOK(pthread_mutex_lock(&t->mtx));
475 160
        t->asock = afd;
476 160
        t->csock = cfd;
477 160
        t->send_lane->buf_len = 0;
478 160
        t->send_lane->wrk_len = 0;
479 160
        t->recv_lane->buf_len = 0;
480 160
        t->recv_lane->wrk_len = 0;
481 160
        if (t->start_paused) {
482 40
                t->state = TUNNEL_PAUSED;
483 40
                t->start_paused = 0;
484 40
        } else
485 120
                t->state = TUNNEL_RUNNING;
486 160
        PTOK(pthread_cond_signal(&t->cond));
487 160
        PTOK(pthread_mutex_unlock(&t->mtx));
488 160
}
489
490
static void *
491 160
tunnel_spec_thread(void *priv)
492
{
493
        struct tunnel *t;
494
        struct vtclog *vl;
495
        enum tunnel_state_e state;
496
497 160
        CAST_OBJ_NOTNULL(t, priv, TUNNEL_MAGIC);
498 160
        AN(*t->connect);
499
500 160
        vl = vtc_logopen("%s", t->name);
501 160
        vtc_log_set_cmd(vl, tunnel_cmds);
502 160
        pthread_cleanup_push(vtc_logclose, vl);
503
504 160
        tunnel_accept(t, vl);
505 160
        parse_string(vl, t, t->spec);
506
507 160
        PTOK(pthread_mutex_lock(&t->mtx));
508 160
        state = t->state;
509 160
        PTOK(pthread_mutex_unlock(&t->mtx));
510
511 160
        if (state == TUNNEL_PAUSED && !vtc_stop)
512 80
                parse_string(vl, t, "resume");
513
514 160
        PTOK(pthread_mutex_lock(&t->mtx));
515 160
        t->state = TUNNEL_SPEC_DONE;
516 160
        PTOK(pthread_cond_signal(&t->cond));
517 160
        PTOK(pthread_mutex_unlock(&t->mtx));
518
519 160
        vtc_log(vl, 2, "Ending");
520 160
        pthread_cleanup_pop(0);
521 160
        vtc_logclose(vl);
522 160
        return (NULL);
523
}
524
525
/**********************************************************************
526
 * Tunnel management
527
 */
528
529
static struct tunnel *
530 120
tunnel_new(const char *name)
531
{
532
        struct tunnel *t;
533
534 120
        ALLOC_OBJ(t, TUNNEL_MAGIC);
535 120
        AN(t);
536 120
        REPLACE(t->name, name);
537 120
        t->vl = vtc_logopen("%s", name);
538 120
        AN(t->vl);
539
540 120
        t->state = TUNNEL_STOPPED;
541 120
        bprintf(t->connect, "%s", "${v1_sock}");
542 120
        bprintf(t->listen, "%s", default_listen_addr);
543 120
        t->csock = -1;
544 120
        t->lsock = -1;
545 120
        t->asock = -1;
546 120
        t->send_lane->rfd = &t->asock;
547 120
        t->send_lane->wfd = &t->csock;
548 120
        t->recv_lane->rfd = &t->csock;
549 120
        t->recv_lane->wfd = &t->asock;
550 120
        PTOK(pthread_mutex_init(&t->mtx, NULL));
551 120
        PTOK(pthread_cond_init(&t->cond, NULL));
552 120
        PTOK(pthread_mutex_lock(&tunnel_mtx));
553 120
        VTAILQ_INSERT_TAIL(&tunnels, t, list);
554 120
        PTOK(pthread_mutex_unlock(&tunnel_mtx));
555 120
        return (t);
556
}
557
558
static void
559 120
tunnel_delete(struct tunnel *t)
560
{
561
562 120
        CHECK_OBJ_NOTNULL(t, TUNNEL_MAGIC);
563 120
        assert(t->asock < 0);
564 120
        assert(t->csock < 0);
565 120
        if (t->lsock >= 0)
566 120
                VTCP_close(&t->lsock);
567 120
        macro_undef(t->vl, t->name, "addr");
568 120
        macro_undef(t->vl, t->name, "port");
569 120
        macro_undef(t->vl, t->name, "sock");
570 120
        vtc_logclose(t->vl);
571 120
        (void)pthread_mutex_destroy(&t->mtx);
572 120
        (void)pthread_cond_destroy(&t->cond);
573 120
        free(t->name);
574 120
        FREE_OBJ(t);
575 120
}
576
577
/**********************************************************************
578
 * Tunnel listen
579
 */
580
581
static void
582 160
tunnel_listen(struct tunnel *t)
583
{
584 160
        char buf[vsa_suckaddr_len];
585
        const struct suckaddr *sua;
586
        const char *err;
587
588 160
        if (t->lsock >= 0)
589 40
                VTCP_close(&t->lsock);
590 160
        t->lsock = VTCP_listen_on(t->listen, "0", 1, &err);
591 160
        if (err != NULL)
592 0
                vtc_fatal(t->vl,
593
                    "Tunnel listen address (%s) cannot be resolved: %s",
594 0
                    t->listen, err);
595 160
        assert(t->lsock > 0);
596 160
        sua = VSA_getsockname(t->lsock, buf, sizeof buf);
597 160
        AN(sua);
598 160
        VTCP_name(sua, t->laddr, sizeof t->laddr, t->lport, sizeof t->lport);
599
600
        /* Record the actual port, and reuse it on subsequent starts */
601 160
        if (VSA_Get_Proto(sua) == AF_INET)
602 160
                bprintf(t->listen, "%s:%s", t->laddr, t->lport);
603
        else
604 0
                bprintf(t->listen, "[%s]:%s", t->laddr, t->lport);
605
606 160
        macro_def(t->vl, t->name, "addr", "%s", t->laddr);
607 160
        macro_def(t->vl, t->name, "port", "%s", t->lport);
608 160
        macro_def(t->vl, t->name, "sock", "%s %s", t->laddr, t->lport);
609 160
}
610
611
/**********************************************************************
612
 * Start the tunnel thread
613
 */
614
615
static void
616 160
tunnel_start(struct tunnel *t)
617
{
618
619 160
        CHECK_OBJ_NOTNULL(t, TUNNEL_MAGIC);
620 160
        vtc_log(t->vl, 2, "Starting tunnel");
621 160
        tunnel_listen(t);
622 160
        vtc_log(t->vl, 1, "Listen on %s", t->listen);
623 160
        assert(t->state == TUNNEL_STOPPED);
624 160
        t->state = TUNNEL_ACCEPT;
625 160
        t->send_lane->buf_len = 0;
626 160
        t->send_lane->wrk_len = 0;
627 160
        t->recv_lane->buf_len = 0;
628 160
        t->recv_lane->wrk_len = 0;
629 160
        PTOK(pthread_create(&t->tpoll, NULL, tunnel_poll_thread, t));
630 160
        PTOK(pthread_create(&t->tspec, NULL, tunnel_spec_thread, t));
631 160
}
632
633
static void
634 40
tunnel_start_pause(struct tunnel *t)
635
{
636
637 40
        CHECK_OBJ_NOTNULL(t, TUNNEL_MAGIC);
638 40
        t->start_paused = 1;
639 40
        tunnel_start(t);
640 40
}
641
642
/**********************************************************************
643
 * Wait for tunnel thread to stop
644
 */
645
646
static void
647 160
tunnel_wait(struct tunnel *t)
648
{
649
        void *res;
650
651 160
        CHECK_OBJ_NOTNULL(t, TUNNEL_MAGIC);
652 160
        vtc_log(t->vl, 2, "Waiting for tunnel");
653
654 160
        PTOK(pthread_cond_signal(&t->cond));
655
656 160
        PTOK(pthread_join(t->tspec, &res));
657 160
        if (res != NULL && !vtc_stop)
658 0
                vtc_fatal(t->vl, "Tunnel spec returned \"%p\"", res);
659
660 160
        PTOK(pthread_join(t->tpoll, &res));
661 160
        if (res != NULL && !vtc_stop)
662 0
                vtc_fatal(t->vl, "Tunnel poll returned \"%p\"", res);
663
664 160
        if (t->csock >= 0)
665 160
                VTCP_close(&t->csock);
666 160
        if (t->asock >= 0)
667 160
                VTCP_close(&t->asock);
668 160
        t->tspec = 0;
669 160
        t->tpoll = 0;
670 160
        t->state = TUNNEL_STOPPED;
671 160
}
672
673
/**********************************************************************
674
 * Reap tunnel
675
 */
676
677
static void
678 39640
tunnel_reset(void)
679
{
680
        struct tunnel *t;
681
682 39760
        while (1) {
683 39760
                PTOK(pthread_mutex_lock(&tunnel_mtx));
684 39760
                t = VTAILQ_FIRST(&tunnels);
685 39760
                CHECK_OBJ_ORNULL(t, TUNNEL_MAGIC);
686 39760
                if (t != NULL)
687 120
                        VTAILQ_REMOVE(&tunnels, t, list);
688 39760
                PTOK(pthread_mutex_unlock(&tunnel_mtx));
689 39760
                if (t == NULL)
690 39640
                        break;
691
692 120
                if (t->state != TUNNEL_STOPPED)
693 40
                        tunnel_wait(t);
694 120
                tunnel_delete(t);
695
        }
696 39640
}
697
698
/**********************************************************************
699
 * Tunnel command dispatch
700
 */
701
702
void
703 39920
cmd_tunnel(CMD_ARGS)
704
{
705
        struct tunnel *t;
706
707 39920
        (void)priv;
708
709 39920
        if (av == NULL) {
710
                /* Reset and free */
711 39640
                tunnel_reset();
712 39640
                return;
713
        }
714
715 280
        AZ(strcmp(av[0], "tunnel"));
716 280
        av++;
717
718 280
        VTC_CHECK_NAME(vl, av[0], "Tunnel", 't');
719
720 280
        PTOK(pthread_mutex_lock(&tunnel_mtx));
721 320
        VTAILQ_FOREACH(t, &tunnels, list)
722 200
                if (!strcmp(t->name, av[0]))
723 160
                        break;
724 280
        PTOK(pthread_mutex_unlock(&tunnel_mtx));
725 280
        if (t == NULL)
726 120
                t = tunnel_new(av[0]);
727 280
        CHECK_OBJ_NOTNULL(t, TUNNEL_MAGIC);
728 280
        av++;
729
730 760
        for (; *av != NULL; av++) {
731 480
                if (vtc_error)
732 0
                        break;
733 480
                if (!strcmp(*av, "-wait")) {
734 120
                        if (t->state == TUNNEL_STOPPED)
735 0
                                vtc_fatal(t->vl, "Tunnel not -started");
736 120
                        tunnel_wait(t);
737 120
                        continue;
738
                }
739
740
                /* Don't mess with a running tunnel */
741 360
                if (t->state != TUNNEL_STOPPED)
742 0
                        tunnel_wait(t);
743
744 360
                assert(t->state == TUNNEL_STOPPED);
745 360
                if (!strcmp(*av, "-connect")) {
746 80
                        bprintf(t->connect, "%s", av[1]);
747 80
                        av++;
748 80
                        continue;
749
                }
750 280
                if (!strcmp(*av, "-listen")) {
751 0
                        bprintf(t->listen, "%s", av[1]);
752 0
                        av++;
753 0
                        continue;
754
                }
755 280
                if (!strcmp(*av, "-start")) {
756 120
                        tunnel_start(t);
757 120
                        continue;
758
                }
759 160
                if (!strcmp(*av, "-start+pause")) {
760 40
                        tunnel_start_pause(t);
761 40
                        continue;
762
                }
763 120
                if (**av == '-')
764 0
                        vtc_fatal(t->vl, "Unknown tunnel argument: %s", *av);
765 120
                t->spec = *av;
766 120
        }
767 39920
}
768
769
void
770 39640
init_tunnel(void)
771
{
772
773 39640
        PTOK(pthread_mutex_init(&tunnel_mtx, NULL));
774 39640
}