varnish-cache/bin/varnishtest/vtc_barrier.c
1
/*-
2
 * Copyright (c) 2005 Varnish Software AS
3
 * All rights reserved.
4
 *
5
 * Author: Dridi Boukelmoune <dridi@varnish-software.com>
6
 *
7
 * Redistribution and use in source and binary forms, with or without
8
 * modification, are permitted provided that the following conditions
9
 * are met:
10
 * 1. Redistributions of source code must retain the above copyright
11
 *    notice, this list of conditions and the following disclaimer.
12
 * 2. Redistributions in binary form must reproduce the above copyright
13
 *    notice, this list of conditions and the following disclaimer in the
14
 *    documentation and/or other materials provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
20
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26
 * SUCH DAMAGE.
27
 */
28
29
#include "config.h"
30
31
#include <errno.h>
32
#include <poll.h>
33
#include <stdio.h>
34
#include <stdlib.h>
35
#include <string.h>
36
#include <unistd.h>
37
38
#include <sys/socket.h>
39
#include <sys/time.h> /* for MUSL */
40
41
#include "vtc.h"
42
#include "vtcp.h"
43
44
enum barrier_e {
45
        BARRIER_NONE = 0,
46
        BARRIER_COND,
47
        BARRIER_SOCK,
48
};
49
50
struct barrier {
51
        unsigned                magic;
52
#define BARRIER_MAGIC           0x7b54c275
53
        char                    *name;
54
        VTAILQ_ENTRY(barrier)   list;
55
        pthread_mutex_t         mtx;
56
        pthread_cond_t          cond;
57
58
        unsigned                waiters;
59
        unsigned                expected;
60
        unsigned                cyclic;
61
62
        enum barrier_e          type;
63
        /* fields below are only for BARRIER_SOCK */
64
        pthread_t               thread;
65
        volatile unsigned       active;
66
        volatile unsigned       need_join;
67
};
68
69
static VTAILQ_HEAD(, barrier)   barriers = VTAILQ_HEAD_INITIALIZER(barriers);
70
71
static struct barrier *
72 369
barrier_new(const char *name, struct vtclog *vl)
73
{
74
        struct barrier *b;
75
76 369
        ALLOC_OBJ(b, BARRIER_MAGIC);
77 369
        AN(b);
78 369
        if (pthread_self() != vtc_thread)
79 0
                vtc_fatal(vl,
80
                    "Barrier %s can only be created on the top thread", name);
81 369
        REPLACE(b->name, name);
82
83 369
        AZ(pthread_mutex_init(&b->mtx, NULL));
84 369
        AZ(pthread_cond_init(&b->cond, NULL));
85 369
        b->waiters = 0;
86 369
        b->expected = 0;
87 369
        VTAILQ_INSERT_TAIL(&barriers, b, list);
88 369
        return (b);
89
}
90
91
/**********************************************************************
92
 * Init a barrier
93
 */
94
95
static void
96 369
barrier_expect(struct barrier *b, const char *av, struct vtclog *vl)
97
{
98
        unsigned expected;
99
100 369
        if (b->type != BARRIER_NONE)
101 0
                vtc_fatal(vl,
102
                    "Barrier(%s) use error: already initialized", b->name);
103
104 369
        AZ(b->expected);
105 369
        AZ(b->waiters);
106 369
        expected = strtoul(av, NULL, 0);
107 369
        if (expected < 2)
108 0
                vtc_fatal(vl,
109
                    "Barrier(%s) use error: wrong expectation (%u)",
110
                    b->name, expected);
111
112 369
        b->expected = expected;
113 369
}
114
115
static void
116 312
barrier_cond(struct barrier *b, const char *av, struct vtclog *vl)
117
{
118
119 312
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
120 312
        barrier_expect(b, av, vl);
121 312
        b->type = BARRIER_COND;
122 312
}
123
124
static void *
125 57
barrier_sock_thread(void *priv)
126
{
127
        struct barrier *b;
128
        struct vtclog *vl;
129
        const char *err;
130
        char abuf[16], pbuf[6];
131
        int i, sock, *conns;
132
        struct pollfd pfd[1];
133
134 57
        CAST_OBJ_NOTNULL(b, priv, BARRIER_MAGIC);
135 57
        assert(b->type == BARRIER_SOCK);
136
137 57
        AZ(pthread_mutex_lock(&b->mtx));
138
139 57
        vl = vtc_logopen(b->name);
140 57
        pthread_cleanup_push(vtc_logclose, vl);
141
142 57
        sock = VTCP_listen_on("127.0.0.1:0", NULL, b->expected, &err);
143 57
        if (sock < 0) {
144 0
                AZ(pthread_cond_signal(&b->cond));
145 0
                AZ(pthread_mutex_unlock(&b->mtx));
146 0
                vtc_fatal(vl, "Barrier(%s) %s fails: %s (errno=%d)",
147 0
                    b->name, err, strerror(errno), errno);
148
        }
149 57
        assert(sock > 0);
150 57
        (void)VTCP_nonblocking(sock);
151 57
        VTCP_myname(sock, abuf, sizeof abuf, pbuf, sizeof pbuf);
152
153 57
        macro_def(vl, b->name, "addr", "%s", abuf);
154 57
        macro_def(vl, b->name, "port", "%s", pbuf);
155 57
        macro_def(vl, b->name, "sock", "%s:%s", abuf, pbuf);
156
157 57
        AZ(pthread_cond_signal(&b->cond));
158 57
        AZ(pthread_mutex_unlock(&b->mtx));
159
160 58
        conns = calloc(b->expected, sizeof *conns);
161 58
        AN(conns);
162
163 544
        while (b->active) {
164 429
                pfd[0].fd = sock;
165 429
                pfd[0].events = POLLIN;
166
167 429
                i = poll(pfd, 1, 1000);
168 428
                if (i == 0)
169 236
                        continue;
170 192
                if (i < 0) {
171 0
                        if (errno == EINTR)
172 0
                                continue;
173 0
                        closefd(&sock);
174 0
                        vtc_fatal(vl,
175
                            "Barrier(%s) select fails: %s (errno=%d)",
176 0
                            b->name, strerror(errno), errno);
177
                }
178 192
                assert(i == 1);
179 192
                assert(b->waiters <= b->expected);
180 192
                if (b->waiters == b->expected)
181 0
                        vtc_fatal(vl,
182
                            "Barrier(%s) use error: "
183
                            "more waiters than the %u expected",
184
                            b->name, b->expected);
185
186 192
                i = accept(sock, NULL, NULL);
187 192
                if (i < 0) {
188 0
                        closefd(&sock);
189 0
                        vtc_fatal(vl,
190
                            "Barrier(%s) accept fails: %s (errno=%d)",
191 0
                            b->name, strerror(errno), errno);
192
                }
193
194
                /* NB. We don't keep track of the established connections, only
195
                 *     that connections were made to the barrier's socket.
196
                 */
197 192
                conns[b->waiters] = i;
198
199 192
                if (++b->waiters < b->expected) {
200 123
                        vtc_log(vl, 4, "Barrier(%s) wait %u of %u",
201
                            b->name, b->waiters, b->expected);
202 123
                        continue;
203
                }
204
205 69
                vtc_log(vl, 4, "Barrier(%s) wake %u", b->name, b->expected);
206 261
                for (i = 0; i < b->expected; i++)
207 192
                        closefd(&conns[i]);
208
209 69
                if (b->cyclic)
210 18
                        b->waiters = 0;
211
                else
212 51
                        b->active = 0;
213
        }
214
215 57
        macro_undef(vl, b->name, "addr");
216 57
        macro_undef(vl, b->name, "port");
217 57
        macro_undef(vl, b->name, "sock");
218 57
        closefd(&sock);
219 57
        free(conns);
220 57
        pthread_cleanup_pop(0);
221 57
        vtc_logclose(vl);
222 57
        return (NULL);
223
}
224
225
static void
226 57
barrier_sock(struct barrier *b, const char *av, struct vtclog *vl)
227
{
228
229 57
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
230 57
        barrier_expect(b, av, vl);
231 57
        b->type = BARRIER_SOCK;
232 57
        b->active = 1;
233 57
        b->need_join = 1;
234
235
        /* NB. We can use the BARRIER_COND's pthread_cond_t to wait until the
236
         *     socket is ready for convenience.
237
         */
238 57
        AZ(pthread_create(&b->thread, NULL, barrier_sock_thread, b));
239 57
        AZ(pthread_cond_wait(&b->cond, &b->mtx));
240 57
}
241
242
static void
243 12
barrier_cyclic(struct barrier *b, struct vtclog *vl)
244
{
245
246 12
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
247
248 12
        if (b->type == BARRIER_NONE)
249 0
                vtc_fatal(vl,
250
                    "Barrier(%s) use error: not initialized", b->name);
251
252 12
        if (b->waiters != 0)
253 0
                vtc_fatal(vl,
254
                    "Barrier(%s) use error: already in use", b->name);
255
256 12
        b->cyclic = 1;
257 12
}
258
259
/**********************************************************************
260
 * Sync a barrier
261
 */
262
263
static void
264 717
barrier_cond_sync(struct barrier *b, struct vtclog *vl)
265
{
266
267 717
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
268 717
        assert(b->type == BARRIER_COND);
269
270 717
        assert(b->waiters <= b->expected);
271 717
        if (b->waiters == b->expected)
272 0
                vtc_fatal(vl,
273
                    "Barrier(%s) use error: more waiters than the %u expected",
274
                    b->name, b->expected);
275
276 717
        if (++b->waiters == b->expected) {
277 318
                vtc_log(vl, 4, "Barrier(%s) wake %u", b->name, b->expected);
278 318
                AZ(pthread_cond_broadcast(&b->cond));
279
        } else {
280 399
                vtc_log(vl, 4, "Barrier(%s) wait %u of %u",
281
                    b->name, b->waiters, b->expected);
282 399
                AZ(pthread_cond_wait(&b->cond, &b->mtx));
283
        }
284
285 717
        if (b->cyclic)
286 48
                b->waiters = 0;
287 717
}
288
289
static void
290 129
barrier_sock_sync(struct barrier *b, struct vtclog *vl)
291
{
292
        struct vsb *vsb;
293
        const char *err;
294
        char buf[32];
295
        int i, sock;
296
        ssize_t sz;
297
298 129
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
299 129
        assert(b->type == BARRIER_SOCK);
300
301 129
        vsb = macro_expandf(vl, "${%s_sock}", b->name);
302 129
        vtc_log(vl, 4, "Barrier(%s) sync with socket", b->name);
303
304 129
        sock = VTCP_open(VSB_data(vsb), NULL, 0., &err);
305 129
        if (sock < 0)
306 0
                vtc_fatal(vl, "Barrier(%s) connection failed: %s",
307
                    b->name, err);
308
309 129
        VSB_delete(vsb);
310
311
        /* emulate pthread_cond_wait's behavior */
312 129
        AZ(pthread_mutex_unlock(&b->mtx));
313 129
        sz = read(sock, buf, sizeof buf); /* XXX loop with timeout? */
314 129
        AZ(pthread_mutex_lock(&b->mtx));
315
316 129
        i = errno;
317 129
        closefd(&sock);
318
319 129
        if (sz < 0)
320 0
                vtc_fatal(vl, "Barrier(%s) read failed: %s (errno=%d)",
321
                    b->name, strerror(i), i);
322 129
        if (sz > 0)
323 0
                vtc_fatal(vl, "Barrier(%s) unexpected data (%zdB)",
324
                    b->name, sz);
325 129
}
326
327
static void
328 846
barrier_sync(struct barrier *b, struct vtclog *vl)
329
{
330
331 846
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
332
333 846
        switch (b->type) {
334
        case BARRIER_NONE:
335 0
                vtc_fatal(vl,
336
                    "Barrier(%s) use error: not initialized", b->name);
337
                break;
338
        case BARRIER_COND:
339 717
                barrier_cond_sync(b, vl);
340 717
                break;
341
        case BARRIER_SOCK:
342 129
                barrier_sock_sync(b, vl);
343 129
                break;
344
        default:
345 0
                WRONG("Wrong barrier type");
346
        }
347 846
}
348
349
/* SECTION: barrier barrier
350
 *
351
 * NOTE: this can be used from the top-level as well as from client and server
352
 * specifications.
353
 *
354
 * Barriers allows you to synchronize different threads to make sure events
355
 * occur in the right order. It's even possible to use them in VCL.
356
 *
357
 * First, it's necessary to declare the barrier::
358
 *
359
 *         barrier bNAME TYPE NUMBER [-cyclic]
360
 *
361
 * With the arguments being:
362
 *
363
 * bNAME
364
 *         this is the name of the barrier, used to identify it when you'll
365
 *         create sync points. It must start with 'b'.
366
 *
367
 * TYPE
368
 *         it can be "cond" (mutex) or "sock" (socket) and sets internal
369
 *         behavior. If you don't need VCL synchronization, use cond.
370
 *
371
 * NUMBER
372
 *         number of sync point needed to go through the barrier.
373
 *
374
 * \-cyclic
375
 *         if present, the barrier will reset itself and be ready for another
376
 *         round once gotten through.
377
 *
378
 * Then, to add a sync point::
379
 *
380
 *         barrier bNAME sync
381
 *
382
 * This will block the parent thread until the number of sync points for bNAME
383
 * reaches the NUMBER given in the barrier declaration.
384
 *
385
 * If you wish to synchronize the VCL, you need to declare a "sock" barrier.
386
 * This will emit a macro definition named "bNAME_sock" that you can use in
387
 * VCL (after importing the debug vmod)::
388
 *
389
 *         debug.barrier_sync("${bNAME_sock}");
390
 *
391
 * This function returns 0 if everything went well and is the equivalent of
392
 * ``barrier bNAME sync`` at the VTC top-level.
393
 *
394
 *
395
 */
396
397
void
398 3504
cmd_barrier(CMD_ARGS)
399
{
400
        struct barrier *b, *b2;
401
        int r;
402
403
        (void)priv;
404
        (void)cmd;
405
406 3504
        if (av == NULL) {
407
                /* Reset and free */
408 2658
                VTAILQ_FOREACH_SAFE(b, &barriers, list, b2) {
409 369
                        r = pthread_mutex_trylock(&b->mtx);
410 369
                        assert(r == 0 || r == EBUSY);
411 369
                        switch (b->type) {
412
                        case BARRIER_COND:
413 312
                                break;
414
                        case BARRIER_SOCK:
415 57
                                if (b->need_join) {
416 57
                                        b->active = 0;
417 57
                                        AZ(pthread_join(b->thread, NULL));
418 57
                                        b->need_join = 0;
419
                                }
420 57
                                break;
421
                        default:
422 0
                                WRONG("Wrong barrier type");
423
                        }
424 369
                        if (r == 0)
425 369
                                AZ(pthread_mutex_unlock(&b->mtx));
426
                }
427 2289
                return;
428
        }
429
430 1215
        AZ(strcmp(av[0], "barrier"));
431 1215
        av++;
432
433 1215
        VTC_CHECK_NAME(vl, av[0], "Barrier", 'b');
434 2001
        VTAILQ_FOREACH(b, &barriers, list)
435 1632
                if (!strcmp(b->name, av[0]))
436 846
                        break;
437 1215
        if (b == NULL)
438 369
                b = barrier_new(av[0], vl);
439 1215
        av++;
440
441 1215
        AZ(pthread_mutex_lock(&b->mtx));
442 2442
        for (; *av != NULL; av++) {
443 1227
                if (!strcmp(*av, "cond")) {
444 312
                        av++;
445 312
                        AN(*av);
446 312
                        barrier_cond(b, *av, vl);
447 312
                        continue;
448
                }
449 915
                if (!strcmp(*av, "sock")) {
450 57
                        av++;
451 57
                        AN(*av);
452 57
                        barrier_sock(b, *av, vl);
453 57
                        continue;
454
                }
455 858
                if (!strcmp(*av, "sync")) {
456 846
                        barrier_sync(b, vl);
457 846
                        continue;
458
                }
459 12
                if (!strcmp(*av, "-cyclic")) {
460 12
                        barrier_cyclic(b, vl);
461 12
                        continue;
462
                }
463 0
                vtc_fatal(vl, "Unknown barrier argument: %s", *av);
464
        }
465 1215
        AZ(pthread_mutex_unlock(&b->mtx));
466
}