varnish-cache/bin/varnishtest/vtc_barrier.c
1
/*-
2
 * Copyright (c) 2005 Varnish Software AS
3
 * All rights reserved.
4
 *
5
 * Author: Dridi Boukelmoune <dridi@varnish-software.com>
6
 *
7
 * Redistribution and use in source and binary forms, with or without
8
 * modification, are permitted provided that the following conditions
9
 * are met:
10
 * 1. Redistributions of source code must retain the above copyright
11
 *    notice, this list of conditions and the following disclaimer.
12
 * 2. Redistributions in binary form must reproduce the above copyright
13
 *    notice, this list of conditions and the following disclaimer in the
14
 *    documentation and/or other materials provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
20
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26
 * SUCH DAMAGE.
27
 */
28
29
#include "config.h"
30
31
#include <errno.h>
32
#include <stdio.h>
33
#include <stdlib.h>
34
#include <string.h>
35
#include <unistd.h>
36
37
#include <sys/socket.h>
38
39
#include "vtc.h"
40
#include "vtcp.h"
41
42
/*
 * Backing mechanism of a barrier.  BARRIER_NONE is the state of a barrier
 * that has been named but not yet given a type.
 */
enum barrier_e {
        BARRIER_NONE = 0,       /* no "cond"/"sock" argument processed yet */
        BARRIER_COND = 1,       /* selected by barrier_cond() */
        BARRIER_SOCK = 2,       /* selected by barrier_sock() */
};
47
48
struct barrier {
49
        unsigned                magic;
50
#define BARRIER_MAGIC           0x7b54c275
51
        char                    *name;
52
        VTAILQ_ENTRY(barrier)   list;
53
        pthread_mutex_t         mtx;
54
        pthread_cond_t          cond;
55
56
        unsigned                waiters;
57
        unsigned                expected;
58
        unsigned                cyclic;
59
60
        enum barrier_e          type;
61
        /* fields below are only for BARRIER_SOCK */
62
        pthread_t               thread;
63
        volatile unsigned       active;
64
        volatile unsigned       need_join;
65
};
66
67
static VTAILQ_HEAD(, barrier)   barriers = VTAILQ_HEAD_INITIALIZER(barriers);
68
69
static struct barrier *
70 87
barrier_new(const char *name, struct vtclog *vl)
71
{
72
        struct barrier *b;
73
74 87
        ALLOC_OBJ(b, BARRIER_MAGIC);
75 87
        AN(b);
76 87
        if (pthread_self() != vtc_thread)
77 0
                vtc_fatal(vl,
78
                    "Barrier %s can only be created on the top thread", name);
79 87
        REPLACE(b->name, name);
80
81 87
        AZ(pthread_mutex_init(&b->mtx, NULL));
82 87
        AZ(pthread_cond_init(&b->cond, NULL));
83 87
        b->waiters = 0;
84 87
        b->expected = 0;
85 87
        VTAILQ_INSERT_TAIL(&barriers, b, list);
86 87
        return (b);
87
}
88
89
/**********************************************************************
90
 * Init a barrier
91
 */
92
93
static void
94 87
barrier_expect(struct barrier *b, const char *av, struct vtclog *vl)
95
{
96
        unsigned expected;
97
98 87
        if (b->type != BARRIER_NONE)
99 0
                vtc_fatal(vl,
100
                    "Barrier(%s) use error: already initialized", b->name);
101
102 87
        AZ(b->expected);
103 87
        AZ(b->waiters);
104 87
        expected = strtoul(av, NULL, 0);
105 87
        if (expected < 2)
106 0
                vtc_fatal(vl,
107
                    "Barrier(%s) use error: wrong expectation (%u)",
108
                    b->name, expected);
109
110 87
        b->expected = expected;
111 87
}
112
113
static void
114 78
barrier_cond(struct barrier *b, const char *av, struct vtclog *vl)
115
{
116
117 78
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
118 78
        barrier_expect(b, av, vl);
119 78
        b->type = BARRIER_COND;
120 78
}
121
122
static void *
123 9
barrier_sock_thread(void *priv)
124
{
125
        struct barrier *b;
126
        struct vtclog *vl;
127
        struct timeval tmo;
128
        const char *err;
129
        char abuf[16], pbuf[6];
130
        int i, sock, *conns;
131
        fd_set rfds;
132
133 9
        CAST_OBJ_NOTNULL(b, priv, BARRIER_MAGIC);
134 9
        assert(b->type == BARRIER_SOCK);
135
136 9
        AZ(pthread_mutex_lock(&b->mtx));
137
138 9
        vl = vtc_logopen(b->name);
139 9
        AN(vl);
140
141 9
        sock = VTCP_listen_on("127.0.0.1:0", NULL, b->expected, &err);
142 9
        if (sock < 0) {
143 0
                AZ(pthread_cond_signal(&b->cond));
144 0
                AZ(pthread_mutex_unlock(&b->mtx));
145 0
                vtc_fatal(vl, "Barrier(%s) %s fails: %s (errno=%d)",
146 0
                    b->name, err, strerror(errno), errno);
147
        }
148 9
        assert(sock > 0);
149 9
        (void)VTCP_nonblocking(sock);
150 9
        VTCP_myname(sock, abuf, sizeof abuf, pbuf, sizeof pbuf);
151
152 9
        macro_def(vl, b->name, "addr", "%s", abuf);
153 9
        macro_def(vl, b->name, "port", "%s", pbuf);
154 9
        macro_def(vl, b->name, "sock", "%s:%s", abuf, pbuf);
155
156 9
        AZ(pthread_cond_signal(&b->cond));
157 9
        AZ(pthread_mutex_unlock(&b->mtx));
158
159 9
        conns = calloc(b->expected, sizeof *conns);
160 9
        AN(conns);
161
162 78
        while (b->active) {
163 60
                FD_ZERO(&rfds);
164 60
                FD_SET(sock, &rfds);
165
166 60
                tmo.tv_sec = 1;
167 60
                tmo.tv_usec = 0;
168 60
                i = select(sock + 1, &rfds, NULL, NULL, &tmo);
169 60
                if (i == 0)
170 30
                        continue;
171 30
                if (i < 0) {
172 0
                        if (errno == EINTR)
173 0
                                continue;
174 0
                        closefd(&sock);
175 0
                        vtc_fatal(vl,
176
                            "Barrier(%s) select fails: %s (errno=%d)",
177 0
                            b->name, strerror(errno), errno);
178
                }
179 30
                assert(i == 1);
180 30
                assert(b->waiters <= b->expected);
181 30
                if (b->waiters == b->expected)
182 0
                        vtc_fatal(vl,
183
                            "Barrier(%s) use error: "
184
                            "more waiters than the %u expected",
185
                            b->name, b->expected);
186
187 30
                i = accept(sock, NULL, NULL);
188 30
                if (i < 0) {
189 0
                        closefd(&sock);
190 0
                        vtc_fatal(vl,
191
                            "Barrier(%s) accept fails: %s (errno=%d)",
192 0
                            b->name, strerror(errno), errno);
193
                }
194
195
                /* NB. We don't keep track of the established connections, only
196
                 *     that connections were made to the barrier's socket.
197
                 */
198 30
                conns[b->waiters] = i;
199
200 30
                if (++b->waiters < b->expected) {
201 20
                        vtc_log(vl, 4, "Barrier(%s) wait %u of %u",
202
                            b->name, b->waiters, b->expected);
203 20
                        continue;
204
                }
205
206 10
                vtc_log(vl, 4, "Barrier(%s) wake %u", b->name, b->expected);
207 40
                for (i = 0; i < b->expected; i++)
208 30
                        closefd(&conns[i]);
209
210 10
                if (b->cyclic)
211 2
                        b->waiters = 0;
212
                else
213 8
                        b->active = 0;
214
        }
215
216 9
        macro_undef(vl, b->name, "addr");
217 9
        macro_undef(vl, b->name, "port");
218 9
        macro_undef(vl, b->name, "sock");
219 9
        closefd(&sock);
220 9
        free(conns);
221
222 9
        return (NULL);
223
}
224
225
static void
226 9
barrier_sock(struct barrier *b, const char *av, struct vtclog *vl)
227
{
228
229 9
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
230 9
        barrier_expect(b, av, vl);
231 9
        b->type = BARRIER_SOCK;
232 9
        b->active = 1;
233 9
        b->need_join = 1;
234
235
        /* NB. We can use the BARRIER_COND's pthread_cond_t to wait until the
236
         *     socket is ready for convenience.
237
         */
238 9
        AZ(pthread_create(&b->thread, NULL, barrier_sock_thread, b));
239 9
        AZ(pthread_cond_wait(&b->cond, &b->mtx));
240 9
}
241
242
static void
243 3
barrier_cyclic(struct barrier *b, struct vtclog *vl)
244
{
245
246 3
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
247
248 3
        if (b->type == BARRIER_NONE)
249 0
                vtc_fatal(vl,
250
                    "Barrier(%s) use error: not initialized", b->name);
251
252 3
        if (b->waiters != 0)
253 0
                vtc_fatal(vl,
254
                    "Barrier(%s) use error: already in use", b->name);
255
256 3
        b->cyclic = 1;
257 3
}
258
259
/**********************************************************************
260
 * Sync a barrier
261
 */
262
263
static void
264 186
barrier_cond_sync(struct barrier *b, struct vtclog *vl)
265
{
266
267 186
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
268 186
        assert(b->type == BARRIER_COND);
269
270 186
        assert(b->waiters <= b->expected);
271 186
        if (b->waiters == b->expected)
272 0
                vtc_fatal(vl,
273
                    "Barrier(%s) use error: more waiters than the %u expected",
274
                    b->name, b->expected);
275
276 186
        if (++b->waiters == b->expected) {
277 80
                vtc_log(vl, 4, "Barrier(%s) wake %u", b->name, b->expected);
278 80
                AZ(pthread_cond_broadcast(&b->cond));
279
        }
280
        else {
281 106
                vtc_log(vl, 4, "Barrier(%s) wait %u of %u",
282
                    b->name, b->waiters, b->expected);
283 106
                AZ(pthread_cond_wait(&b->cond, &b->mtx));
284
        }
285
286 186
        if (b->cyclic)
287 16
                b->waiters = 0;
288 186
}
289
290
static void
291 24
barrier_sock_sync(struct barrier *b, struct vtclog *vl)
292
{
293
        struct vsb *vsb;
294
        const char *err;
295
        char buf[32];
296
        int i, sock;
297
        ssize_t sz;
298
299 24
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
300 24
        assert(b->type == BARRIER_SOCK);
301
302 24
        i = snprintf(buf, sizeof buf, "${%s_sock}", b->name);
303 24
        assert(i > 0 && i < sizeof buf);
304 24
        vsb = macro_expand(vl, buf);
305 24
        vtc_log(vl, 4, "Barrier(%s) sync with socket", b->name);
306
307 24
        sock = VTCP_open(VSB_data(vsb), NULL, 0., &err);
308 24
        if (sock < 0)
309 0
                vtc_fatal(vl, "Barrier(%s) connection failed: %s",
310
                    b->name, err);
311
312 24
        VSB_delete(vsb);
313
314
        /* emulate pthread_cond_wait's behavior */
315 24
        AZ(pthread_mutex_unlock(&b->mtx));
316 24
        sz = read(sock, buf, sizeof buf); /* XXX loop with timeout? */
317 24
        AZ(pthread_mutex_lock(&b->mtx));
318
319 24
        i = errno;
320 24
        closefd(&sock);
321
322 24
        if (sz < 0)
323 0
                vtc_fatal(vl, "Barrier(%s) read failed: %s (errno=%d)",
324
                    b->name, strerror(i), i);
325 24
        if (sz > 0)
326 0
                vtc_fatal(vl, "Barrier(%s) unexpected data (%zdB)",
327
                    b->name, sz);
328 24
}
329
330
static void
331 210
barrier_sync(struct barrier *b, struct vtclog *vl)
332
{
333
334 210
        CHECK_OBJ_NOTNULL(b, BARRIER_MAGIC);
335
336 210
        switch (b->type) {
337
        case BARRIER_NONE:
338 0
                vtc_fatal(vl,
339
                    "Barrier(%s) use error: not initialized", b->name);
340
                break;
341
        case BARRIER_COND:
342 186
                barrier_cond_sync(b, vl);
343 186
                break;
344
        case BARRIER_SOCK:
345 24
                barrier_sock_sync(b, vl);
346 24
                break;
347
        default:
348 0
                WRONG("Wrong barrier type");
349
        }
350 210
}
351
352
/* SECTION: barrier barrier
353
 *
354
 * NOTE: this can be used from the top-level as well as from client and server
355
 * specifications.
356
 *
357
 * Barriers allow you to synchronize different threads to make sure events
358
 * occur in the right order. It's even possible to use them in VCL.
359
 *
360
 * First, it's necessary to declare the barrier::
361
 *
362
 *         barrier bNAME TYPE NUMBER [-cyclic]
363
 *
364
 * With the arguments being:
365
 *
366
 * bNAME
367
 *         this is the name of the barrier, used to identify it when you'll
368
 *         create sync points. It must start with 'b'.
369
 *
370
 * TYPE
371
 *         it can be "cond" (mutex) or "sock" (socket) and sets internal
372
 *         behavior. If you don't need VCL synchronization, use cond.
373
 *
374
 * NUMBER
375
 *         number of sync points needed to go through the barrier.
376
 *
377
 * \-cyclic
378
 *         if present, the barrier will reset itself and be ready for another
379
 *         round once gotten through.
380
 *
381
 * Then, to add a sync point::
382
 *
383
 *         barrier bNAME sync
384
 *
385
 * This will block the parent thread until the number of sync points for bNAME
386
 * reaches the NUMBER given in the barrier declaration.
387
 *
388
 * If you wish to synchronize the VCL, you need to declare a "sock" barrier.
389
 * This will emit a macro definition named "bNAME_sock" that you can use in
390
 * VCL (after importing the debug vmod)::
391
 *
392
 *         debug.barrier_sync("${bNAME_sock}");
393
 *
394
 * This function returns 0 if everything went well and is the equivalent of
395
 * ``barrier bNAME sync`` at the VTC top-level.
396
 *
397
 *
398
 */
399
400
void
401 975
cmd_barrier(CMD_ARGS)
402
{
403
        struct barrier *b, *b2;
404
        int r;
405
406
        (void)priv;
407
        (void)cmd;
408
409 975
        if (av == NULL) {
410
                /* Reset and free */
411 765
                VTAILQ_FOREACH_SAFE(b, &barriers, list, b2) {
412 87
                        r = pthread_mutex_trylock(&b->mtx);
413 87
                        assert(r == 0 || r == EBUSY);
414 87
                        switch (b->type) {
415
                        case BARRIER_COND:
416 78
                                break;
417
                        case BARRIER_SOCK:
418 9
                                if (b->need_join) {
419 9
                                        b->active = 0;
420 9
                                        AZ(pthread_join(b->thread, NULL));
421 9
                                        b->need_join = 0;
422
                                }
423 9
                                break;
424
                        default:
425 0
                                WRONG("Wrong barrier type");
426
                        }
427 87
                        AZ(pthread_mutex_unlock(&b->mtx));
428
                }
429 1653
                return;
430
        }
431
432 297
        AZ(strcmp(av[0], "barrier"));
433 297
        av++;
434
435 297
        VTC_CHECK_NAME(vl, av[0], "Barrier", 'b');
436 477
        VTAILQ_FOREACH(b, &barriers, list)
437 390
                if (!strcmp(b->name, av[0]))
438 210
                        break;
439 297
        if (b == NULL)
440 87
                b = barrier_new(av[0], vl);
441 297
        av++;
442
443 297
        AZ(pthread_mutex_lock(&b->mtx));
444 597
        for (; *av != NULL; av++) {
445 300
                if (!strcmp(*av, "cond")) {
446 78
                        av++;
447 78
                        AN(*av);
448 78
                        barrier_cond(b, *av, vl);
449 78
                        continue;
450
                }
451 222
                if (!strcmp(*av, "sock")) {
452 9
                        av++;
453 9
                        AN(*av);
454 9
                        barrier_sock(b, *av, vl);
455 9
                        continue;
456
                }
457 213
                if (!strcmp(*av, "sync")) {
458 210
                        barrier_sync(b, vl);
459 210
                        continue;
460
                }
461 3
                if (!strcmp(*av, "-cyclic")) {
462 3
                        barrier_cyclic(b, vl);
463 3
                        continue;
464
                }
465 0
                vtc_fatal(vl, "Unknown barrier argument: %s", *av);
466
        }
467 297
        AZ(pthread_mutex_unlock(&b->mtx));
468
}