varnish-cache/bin/varnishtest/vtc_barrier.c
1
/*-
2
 * Copyright (c) 2005 Varnish Software AS
3
 * All rights reserved.
4
 *
5
 * Author: Dridi Boukelmoune <dridi@varnish-software.com>
6
 *
7
 * Redistribution and use in source and binary forms, with or without
8
 * modification, are permitted provided that the following conditions
9
 * are met:
10
 * 1. Redistributions of source code must retain the above copyright
11
 *    notice, this list of conditions and the following disclaimer.
12
 * 2. Redistributions in binary form must reproduce the above copyright
13
 *    notice, this list of conditions and the following disclaimer in the
14
 *    documentation and/or other materials provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
20
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26
 * SUCH DAMAGE.
27
 */
28
29
#include "config.h"
30
31
#include <poll.h>
32
#include <stdio.h>
33
#include <stdlib.h>
34
#include <string.h>
35
#include <unistd.h>
36
37
#include <sys/socket.h>
38
#include <sys/time.h> /* for MUSL */
39
40
#include "vtc.h"
41
#include "vtcp.h"
42
43
enum barrier_e {
44
        BARRIER_NONE = 0,
45
        BARRIER_COND,
46
        BARRIER_SOCK,
47
};
48
49
struct barrier {
50
        unsigned                magic;
51
#define BARRIER_MAGIC           0x7b54c275
52
        char                    *name;
53
        VTAILQ_ENTRY(barrier)   list;
54
        pthread_mutex_t         mtx;
55
        pthread_cond_t          cond;
56
57
        int                     waiters;
58
        int                     expected;
59
        int                     cyclic;
60
61
        enum barrier_e          type;
62
        /* fields below are only for BARRIER_SOCK */
63
        pthread_t               thread;
64
        volatile unsigned       active;
65
        volatile unsigned       need_join;
66
};
67
68
static VTAILQ_HEAD(, barrier)   barriers = VTAILQ_HEAD_INITIALIZER(barriers);
69
70
static struct barrier *
71 2352
barrier_new(const char *name, struct vtclog *vl)
72
{
73
        struct barrier *b;
74
75 2352
        ALLOC_OBJ(b, BARRIER_MAGIC);
76 2352
        AN(b);
77 2352
        if (pthread_self() != vtc_thread)
78 0
                vtc_fatal(vl,
79 0
                    "Barrier %s can only be created on the top thread", name);
80 2352
        REPLACE(b->name, name);
81
82 2352
        AZ(pthread_mutex_init(&b->mtx, NULL));
83 2352
        AZ(pthread_cond_init(&b->cond, NULL));
84 2352
        b->waiters = 0;
85 2352
        b->expected = 0;
86 2352
        VTAILQ_INSERT_TAIL(&barriers, b, list);
87 2352
        return (b);
88
}
89
90
/**********************************************************************
91
 * Init a barrier
92
 */
93
94
static void
95 2352
barrier_expect(struct barrier *b, const char *av, struct vtclog *vl)
96
{
97
        unsigned expected;
98
99 2352
        if (b->type != BARRIER_NONE)
100 0
                vtc_fatal(vl,
101 0
                    "Barrier(%s) use error: already initialized", b->name);
102
103 2352
        AZ(b->expected);
104 2352
        AZ(b->waiters);
105 2352
        expected = strtoul(av, NULL, 0);
106 2352
        if (expected < 2)
107 0
                vtc_fatal(vl,
108
                    "Barrier(%s) use error: wrong expectation (%u)",
109 0
                    b->name, expected);
110
111 2352
        b->expected = expected;
112 2352
}
113
114
static void
115 1952
barrier_cond(struct barrier *b, const char *av, struct vtclog *vl)
116
{
117
118 1952
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
119 1952
        barrier_expect(b, av, vl);
120 1952
        b->type = BARRIER_COND;
121 1952
}
122
123
static void *
124 400
barrier_sock_thread(void *priv)
125
{
126
        struct barrier *b;
127
        struct vtclog *vl;
128
        const char *err;
129
        char abuf[16], pbuf[6];
130
        int i, sock, *conns;
131
        struct pollfd pfd[1];
132
133 400
        CAST_OBJ_NOTNULL(b, priv, BARRIER_MAGIC);
134 400
        assert(b->type == BARRIER_SOCK);
135
136 400
        AZ(pthread_mutex_lock(&b->mtx));
137
138 400
        vl = vtc_logopen(b->name);
139 400
        pthread_cleanup_push(vtc_logclose, vl);
140
141 400
        sock = VTCP_listen_on("127.0.0.1:0", NULL, b->expected, &err);
142 400
        if (sock < 0) {
143 0
                AZ(pthread_cond_signal(&b->cond));
144 0
                AZ(pthread_mutex_unlock(&b->mtx));
145 0
                vtc_fatal(vl, "Barrier(%s) %s fails: %s (errno=%d)",
146 0
                    b->name, err, strerror(errno), errno);
147
        }
148 400
        assert(sock > 0);
149 400
        VTCP_nonblocking(sock);
150 400
        VTCP_myname(sock, abuf, sizeof abuf, pbuf, sizeof pbuf);
151
152 400
        macro_def(vl, b->name, "addr", "%s", abuf);
153 400
        macro_def(vl, b->name, "port", "%s", pbuf);
154 400
        macro_def(vl, b->name, "sock", "%s:%s", abuf, pbuf);
155
156 400
        AZ(pthread_cond_signal(&b->cond));
157 400
        AZ(pthread_mutex_unlock(&b->mtx));
158
159 400
        conns = calloc(b->expected, sizeof *conns);
160 400
        AN(conns);
161
162 2474
        while (b->active) {
163 2072
                pfd[0].fd = sock;
164 2072
                pfd[0].events = POLLIN;
165
166 2072
                i = poll(pfd, 1, 1000);
167 2072
                if (i == 0)
168 799
                        continue;
169 1264
                if (i < 0) {
170 0
                        if (errno == EINTR)
171 0
                                continue;
172 0
                        closefd(&sock);
173 0
                        vtc_fatal(vl,
174
                            "Barrier(%s) select fails: %s (errno=%d)",
175 0
                            b->name, strerror(errno), errno);
176
                }
177 1264
                assert(i == 1);
178 1264
                assert(b->waiters <= b->expected);
179 1264
                if (b->waiters == b->expected)
180 0
                        vtc_fatal(vl,
181
                            "Barrier(%s) use error: "
182
                            "more waiters than the %u expected",
183 0
                            b->name, b->expected);
184
185 1264
                i = accept(sock, NULL, NULL);
186 1264
                if (i < 0) {
187 0
                        closefd(&sock);
188 0
                        vtc_fatal(vl,
189
                            "Barrier(%s) accept fails: %s (errno=%d)",
190 0
                            b->name, strerror(errno), errno);
191
                }
192
193
                /* NB. We don't keep track of the established connections, only
194
                 *     that connections were made to the barrier's socket.
195
                 */
196 1264
                conns[b->waiters] = i;
197
198 1264
                if (++b->waiters < b->expected) {
199 1600
                        vtc_log(vl, 4, "Barrier(%s) wait %u of %u",
200 800
                            b->name, b->waiters, b->expected);
201 800
                        continue;
202
                }
203
204 464
                vtc_log(vl, 4, "Barrier(%s) wake %u", b->name, b->expected);
205 1728
                for (i = 0; i < b->expected; i++)
206 1264
                        closefd(&conns[i]);
207
208 464
                if (b->cyclic)
209 96
                        b->waiters = 0;
210
                else
211 368
                        b->active = 0;
212
        }
213
214 400
        macro_undef(vl, b->name, "addr");
215 400
        macro_undef(vl, b->name, "port");
216 400
        macro_undef(vl, b->name, "sock");
217 400
        closefd(&sock);
218 400
        free(conns);
219 400
        pthread_cleanup_pop(0);
220 400
        vtc_logclose(vl);
221 400
        return (NULL);
222
}
223
224
static void
225 400
barrier_sock(struct barrier *b, const char *av, struct vtclog *vl)
226
{
227
228 400
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
229 400
        barrier_expect(b, av, vl);
230 400
        b->type = BARRIER_SOCK;
231 400
        b->active = 1;
232 400
        b->need_join = 1;
233
234
        /* NB. We can use the BARRIER_COND's pthread_cond_t to wait until the
235
         *     socket is ready for convenience.
236
         */
237 400
        AZ(pthread_create(&b->thread, NULL, barrier_sock_thread, b));
238 400
        AZ(pthread_cond_wait(&b->cond, &b->mtx));
239 400
}
240
241
static void
242 64
barrier_cyclic(struct barrier *b, struct vtclog *vl)
243
{
244
245 64
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
246
247 64
        if (b->type == BARRIER_NONE)
248 0
                vtc_fatal(vl,
249 0
                    "Barrier(%s) use error: not initialized", b->name);
250
251 64
        if (b->waiters != 0)
252 0
                vtc_fatal(vl,
253 0
                    "Barrier(%s) use error: already in use", b->name);
254
255 64
        b->cyclic = 1;
256 64
}
257
258
/**********************************************************************
259
 * Sync a barrier
260
 */
261
262
static void
263 4592
barrier_cond_sync(struct barrier *b, struct vtclog *vl)
264
{
265
266 4592
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
267 4592
        assert(b->type == BARRIER_COND);
268
269 4592
        assert(b->waiters <= b->expected);
270 4592
        if (b->waiters == b->expected)
271 0
                vtc_fatal(vl,
272
                    "Barrier(%s) use error: more waiters than the %u expected",
273 0
                    b->name, b->expected);
274
275 4592
        if (++b->waiters == b->expected) {
276 1984
                vtc_log(vl, 4, "Barrier(%s) wake %u", b->name, b->expected);
277 1984
                if (b->cyclic)
278 64
                        b->waiters = 0;
279 1984
                AZ(pthread_cond_broadcast(&b->cond));
280 1984
        } else {
281 5216
                vtc_log(vl, 4, "Barrier(%s) wait %u of %u",
282 2608
                    b->name, b->waiters, b->expected);
283 2608
                AZ(pthread_cond_wait(&b->cond, &b->mtx));
284
        }
285 4592
}
286
287
static void
288 848
barrier_sock_sync(struct barrier *b, struct vtclog *vl)
289
{
290
        struct vsb *vsb;
291
        const char *err;
292
        char buf[32];
293
        int i, sock;
294
        ssize_t sz;
295
296 848
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
297 848
        assert(b->type == BARRIER_SOCK);
298
299 848
        vsb = macro_expandf(vl, "${%s_sock}", b->name);
300 848
        vtc_log(vl, 4, "Barrier(%s) sync with socket", b->name);
301
302 848
        sock = VTCP_open(VSB_data(vsb), NULL, 0., &err);
303 848
        if (sock < 0)
304 0
                vtc_fatal(vl, "Barrier(%s) connection failed: %s",
305 0
                    b->name, err);
306
307 848
        VSB_delete(vsb);
308
309
        /* emulate pthread_cond_wait's behavior */
310 848
        AZ(pthread_mutex_unlock(&b->mtx));
311 848
        sz = read(sock, buf, sizeof buf); /* XXX loop with timeout? */
312 848
        AZ(pthread_mutex_lock(&b->mtx));
313
314 848
        i = errno;
315 848
        closefd(&sock);
316
317 848
        if (sz < 0)
318 0
                vtc_fatal(vl, "Barrier(%s) read failed: %s (errno=%d)",
319 0
                    b->name, strerror(i), i);
320 848
        if (sz > 0)
321 0
                vtc_fatal(vl, "Barrier(%s) unexpected data (%zdB)",
322 0
                    b->name, sz);
323 848
}
324
325
static void
326 5440
barrier_sync(struct barrier *b, struct vtclog *vl)
327
{
328
329 5440
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
330
331 5440
        switch (b->type) {
332
        case BARRIER_NONE:
333 0
                vtc_fatal(vl,
334 0
                    "Barrier(%s) use error: not initialized", b->name);
335
                break;
336
        case BARRIER_COND:
337 4592
                barrier_cond_sync(b, vl);
338 4592
                break;
339
        case BARRIER_SOCK:
340 848
                barrier_sock_sync(b, vl);
341 848
                break;
342
        default:
343 0
                WRONG("Wrong barrier type");
344 0
        }
345 5440
}
346
347
/* SECTION: barrier barrier
348
 *
349
 * NOTE: This command is available everywhere commands are given.
350
 *
351
 * Barriers allows you to synchronize different threads to make sure events
352
 * occur in the right order. It's even possible to use them in VCL.
353
 *
354
 * First, it's necessary to declare the barrier::
355
 *
356
 *         barrier bNAME TYPE NUMBER [-cyclic]
357
 *
358
 * With the arguments being:
359
 *
360
 * bNAME
361
 *         this is the name of the barrier, used to identify it when you'll
362
 *         create sync points. It must start with 'b'.
363
 *
364
 * TYPE
365
 *         it can be "cond" (mutex) or "sock" (socket) and sets internal
366
 *         behavior. If you don't need VCL synchronization, use cond.
367
 *
368
 * NUMBER
369
 *         number of sync point needed to go through the barrier.
370
 *
371
 * \-cyclic
372
 *         if present, the barrier will reset itself and be ready for another
373
 *         round once gotten through.
374
 *
375
 * Then, to add a sync point::
376
 *
377
 *         barrier bNAME sync
378
 *
379
 * This will block the parent thread until the number of sync points for bNAME
380
 * reaches the NUMBER given in the barrier declaration.
381
 *
382
 * If you wish to synchronize the VCL, you need to declare a "sock" barrier.
383
 * This will emit a macro definition named "bNAME_sock" that you can use in
384
 * VCL (after importing the debug vmod)::
385
 *
386
 *         debug.barrier_sync("${bNAME_sock}");
387
 *
388
 * This function returns 0 if everything went well and is the equivalent of
389
 * ``barrier bNAME sync`` at the VTC top-level.
390
 *
391
 *
392
 */
393
394
void
395 20592
cmd_barrier(CMD_ARGS)
396
{
397
        struct barrier *b, *b2;
398
        int r;
399
400 20592
        (void)priv;
401 20592
        (void)cmd;
402
403 20592
        if (av == NULL) {
404
                /* Reset and free */
405 15152
                VTAILQ_FOREACH_SAFE(b, &barriers, list, b2) {
406 2352
                        r = pthread_mutex_trylock(&b->mtx);
407 2352
                        assert(r == 0 || r == EBUSY);
408 2352
                        switch (b->type) {
409
                        case BARRIER_COND:
410 1952
                                break;
411
                        case BARRIER_SOCK:
412 400
                                if (b->need_join) {
413 400
                                        b->active = 0;
414 400
                                        AZ(pthread_join(b->thread, NULL));
415 400
                                        b->need_join = 0;
416 400
                                }
417 400
                                break;
418
                        default:
419 0
                                WRONG("Wrong barrier type");
420 0
                        }
421 2352
                        if (r == 0)
422 2349
                                AZ(pthread_mutex_unlock(&b->mtx));
423 2352
                }
424 12800
                return;
425
        }
426
427 7792
        AZ(strcmp(av[0], "barrier"));
428 7792
        av++;
429
430 7792
        VTC_CHECK_NAME(vl, av[0], "Barrier", 'b');
431 14654
        VTAILQ_FOREACH(b, &barriers, list)
432 12303
                if (!strcmp(b->name, av[0]))
433 5439
                        break;
434 7791
        if (b == NULL)
435 2352
                b = barrier_new(av[0], vl);
436 7791
        av++;
437
438 7791
        AZ(pthread_mutex_lock(&b->mtx));
439 15648
        for (; *av != NULL; av++) {
440 7856
                if (!strcmp(*av, "cond")) {
441 1952
                        av++;
442 1952
                        AN(*av);
443 1952
                        barrier_cond(b, *av, vl);
444 1952
                        continue;
445
                }
446 5904
                if (!strcmp(*av, "sock")) {
447 400
                        av++;
448 400
                        AN(*av);
449 400
                        barrier_sock(b, *av, vl);
450 400
                        continue;
451
                }
452 5504
                if (!strcmp(*av, "sync")) {
453 5440
                        barrier_sync(b, vl);
454 5440
                        continue;
455
                }
456 64
                if (!strcmp(*av, "-cyclic")) {
457 64
                        barrier_cyclic(b, vl);
458 64
                        continue;
459
                }
460 0
                vtc_fatal(vl, "Unknown barrier argument: %s", *av);
461
        }
462 7792
        AZ(pthread_mutex_unlock(&b->mtx));
463 20591
}