varnish-cache/lib/libvarnishapi/vsl_dispatch.c
0
/*-
1
 * Copyright (c) 2006 Verdens Gang AS
2
 * Copyright (c) 2006-2015 Varnish Software AS
3
 * All rights reserved.
4
 *
5
 * Author: Martin Blix Grydeland <martin@varnish-software.com>
6
 *
7
 * SPDX-License-Identifier: BSD-2-Clause
8
 *
9
 * Redistribution and use in source and binary forms, with or without
10
 * modification, are permitted provided that the following conditions
11
 * are met:
12
 * 1. Redistributions of source code must retain the above copyright
13
 *    notice, this list of conditions and the following disclaimer.
14
 * 2. Redistributions in binary form must reproduce the above copyright
15
 *    notice, this list of conditions and the following disclaimer in the
16
 *    documentation and/or other materials provided with the distribution.
17
 *
18
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
19
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
22
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28
 * SUCH DAMAGE.
29
 *
30
 */
31
32
#include "config.h"
33
34
#include <stdarg.h>
35
#include <stdint.h>
36
#include <stdio.h>
37
#include <stdlib.h>
38
#include <string.h>
39
40
#include "vdef.h"
41
#include "vas.h"
42
#include "miniobj.h"
43
44
#include "vqueue.h"
45
#include "vre.h"
46
#include "vtim.h"
47
#include "vtree.h"
48
49
#include "vapi/vsl.h"
50
51
#include "vsl_api.h"
52
53
#define VTX_CACHE 10
54
#define VTX_BUFSIZE_MIN 64
55
#define VTX_SHMCHUNKS 3
56
57
static const char * const vsl_t_names[VSL_t__MAX] = {
58
        [VSL_t_unknown] = "unknown",
59
        [VSL_t_sess]    = "sess",
60
        [VSL_t_req]     = "req",
61
        [VSL_t_bereq]   = "bereq",
62
        [VSL_t_raw]     = "raw",
63
};
64
65
static const char * const vsl_r_names[VSL_r__MAX] = {
66
        [VSL_r_unknown] = "unknown",
67
        [VSL_r_http_1]  = "HTTP/1",
68
        [VSL_r_rxreq]   = "rxreq",
69
        [VSL_r_esi]     = "esi",
70
        [VSL_r_restart] = "restart",
71
        [VSL_r_pass]    = "pass",
72
        [VSL_r_fetch]   = "fetch",
73
        [VSL_r_bgfetch] = "bgfetch",
74
        [VSL_r_pipe]    = "pipe",
75
};
76
77
struct vtx;
78
VTAILQ_HEAD(vtxhead, vtx);
79
80
struct vslc_raw {
81
        unsigned                magic;
82
#define VSLC_RAW_MAGIC          0x247EBD44
83
84
        struct VSL_cursor       cursor;
85
86
        const uint32_t          *ptr;
87
};
88
89
struct synth {
90
        unsigned                magic;
91
#define SYNTH_MAGIC             0xC654479F
92
93
        VTAILQ_ENTRY(synth)     list;
94
        size_t                  offset;
95
        uint32_t                data[VSL_OVERHEAD + VSL_WORDS(64)];
96
};
97
VTAILQ_HEAD(synthhead, synth);
98
99
enum chunk_t {
100
        chunk_t__unassigned,
101
        chunk_t_shm,
102
        chunk_t_buf,
103
};
104
105
struct chunk {
106
        unsigned                                magic;
107
#define CHUNK_MAGIC                             0x48DC0194
108
        enum chunk_t                            type;
109
        union {
110
                struct {
111
                        struct VSLC_ptr         start;
112
                        VTAILQ_ENTRY(chunk)     shmref;
113
                } shm;
114
                struct {
115
                        uint32_t                *data;
116
                        size_t                  space;
117
                } buf;
118
        };
119
        size_t                                  len;
120
        struct vtx                              *vtx;
121
        VTAILQ_ENTRY(chunk)                     list;
122
};
123
VTAILQ_HEAD(chunkhead, chunk);
124
125
struct vslc_vtx {
126
        unsigned                magic;
127
#define VSLC_VTX_MAGIC          0x74C6523F
128
129
        struct VSL_cursor       cursor;
130
131
        struct vtx              *vtx;
132
        struct synth            *synth;
133
        struct chunk            *chunk;
134
        size_t                  chunkstart;
135
        size_t                  offset;
136
};
137
138
struct vtx_key {
139
        uint64_t                vxid;
140
        VRBT_ENTRY(vtx_key)     entry;
141
};
142
VRBT_HEAD(vtx_tree, vtx_key);
143
144
struct vtx {
145
        struct vtx_key          key;
146
        unsigned                magic;
147
#define VTX_MAGIC               0xACC21D09
148
        VTAILQ_ENTRY(vtx)       list_child;
149
        VTAILQ_ENTRY(vtx)       list_vtx;
150
151
        double                  t_start;
152
        unsigned                flags;
153
#define VTX_F_BEGIN             0x1 /* Begin record processed */
154
#define VTX_F_END               0x2 /* End record processed */
155
#define VTX_F_COMPLETE          0x4 /* Marked complete. No new children
156
                                       should be appended */
157
#define VTX_F_READY             0x8 /* This vtx and all it's children are
158
                                       complete */
159
160
        enum VSL_transaction_e  type;
161
        enum VSL_reason_e       reason;
162
163
        struct vtx              *parent;
164
        struct vtxhead          child;
165
        unsigned                n_child;
166
        unsigned                n_childready;
167
        unsigned                n_descend;
168
169
        struct synthhead        synth;
170
171
        struct chunk            shmchunks[VTX_SHMCHUNKS];
172
        struct chunkhead        shmchunks_free;
173
174
        struct chunkhead        chunks;
175
        size_t                  len;
176
177
        struct vslc_vtx         c;
178
};
179
180
struct VSLQ {
181
        unsigned                magic;
182
#define VSLQ_MAGIC              0x23A8BE97
183
184
        struct VSL_data         *vsl;
185
        struct VSL_cursor       *c;
186
        struct vslq_query       *query;
187
188
        enum VSL_grouping_e     grouping;
189
190
        /* Structured mode */
191
        struct vtx_tree         tree;
192
        struct vtxhead          ready;
193
        struct vtxhead          incomplete;
194
        int                     n_outstanding;
195
        struct chunkhead        shmrefs;
196
        struct vtxhead          cache;
197
        unsigned                n_cache;
198
199
        /* Rate limiting */
200
        double                  credits;
201
        vtim_mono               last_use;
202
203
        /* Raw mode */
204
        struct {
205
                struct vslc_raw         c;
206
                struct VSL_transaction  trans;
207
                struct VSL_transaction  *ptrans[2];
208
                struct VSLC_ptr         start;
209
                ssize_t                 len;
210
                ssize_t                 offset;
211
        } raw;
212
};
213
214
static void vtx_synth_rec(struct vtx *vtx, unsigned tag, const char *fmt, ...);
215
/*lint -esym(534, vtx_diag) */
216
static int vtx_diag(struct vtx *vtx, const char *msg);
217
/*lint -esym(534, vtx_diag_tag) */
218
static int vtx_diag_tag(struct vtx *vtx, const uint32_t *ptr,
219
    const char *reason);
220
221
static inline int
222 504419
vtx_keycmp(const struct vtx_key *a, const struct vtx_key *b)
223
{
224 504419
        if (a->vxid < b->vxid)
225 14381
                return (-1);
226 490038
        if (a->vxid > b->vxid)
227 198240
                return (1);
228 291798
        return (0);
229 504419
}
230
231 33834
VRBT_GENERATE_REMOVE_COLOR(vtx_tree, vtx_key, entry, static)
232 83065
VRBT_GENERATE_REMOVE(vtx_tree, vtx_key, entry, static)
233 41397
VRBT_GENERATE_INSERT_COLOR(vtx_tree, vtx_key, entry, static)
234 50703
VRBT_GENERATE_INSERT_FINISH(vtx_tree, vtx_key, entry, static)
235 93429
VRBT_GENERATE_INSERT(vtx_tree, vtx_key, entry, vtx_keycmp, static)
236 537251
VRBT_GENERATE_FIND(vtx_tree, vtx_key, entry, vtx_keycmp, static)
237
238
static enum vsl_status v_matchproto_(vslc_next_f)
239 907817
vslc_raw_next(const struct VSL_cursor *cursor)
240
{
241
        struct vslc_raw *c;
242
243 907817
        CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_RAW_MAGIC);
244 907817
        assert(&c->cursor == cursor);
245
246 907817
        AN(c->ptr);
247 907817
        if (c->cursor.rec.ptr == NULL) {
248 456625
                c->cursor.rec.ptr = c->ptr;
249 456625
                return (vsl_more);
250
        } else {
251 451192
                c->cursor.rec.ptr = NULL;
252 451192
                return (vsl_end);
253
        }
254 907817
}
255
256
static enum vsl_status v_matchproto_(vslc_reset_f)
257 131960
vslc_raw_reset(const struct VSL_cursor *cursor)
258
{
259
        struct vslc_raw *c;
260
261 131960
        CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_RAW_MAGIC);
262 131960
        assert(&c->cursor == cursor);
263
264 131960
        AN(c->ptr);
265 131960
        c->cursor.rec.ptr = NULL;
266
267 131960
        return (vsl_end);
268
}
269
270
static const struct vslc_tbl vslc_raw_tbl = {
271
        .magic  = VSLC_TBL_MAGIC,
272
        .delete = NULL,
273
        .next   = vslc_raw_next,
274
        .reset  = vslc_raw_reset,
275
        .check  = NULL,
276
};
277
278
static enum vsl_status v_matchproto_(vslc_next_f)
279 3442768
vslc_vtx_next(const struct VSL_cursor *cursor)
280
{
281
        struct vslc_vtx *c;
282
        const uint32_t *ptr;
283
        unsigned overrun;
284
285 3442768
        CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VTX_MAGIC);
286 3442768
        assert(&c->cursor == cursor);
287 3442768
        CHECK_OBJ_NOTNULL(c->vtx, VTX_MAGIC);
288
289 3442768
        do {
290 3701817
                CHECK_OBJ_ORNULL(c->synth, SYNTH_MAGIC);
291 3701817
                if (c->synth != NULL && c->synth->offset == c->offset) {
292
                        /* We're at the offset of the next synth record,
293
                           point to it and advance the pointer */
294 11922
                        c->cursor.rec.ptr = c->synth->data;
295 11922
                        c->synth = VTAILQ_NEXT(c->synth, list);
296 11922
                } else {
297 3689895
                        overrun = c->offset > c->vtx->len;
298 3689895
                        AZ(overrun);
299 3689895
                        if (c->offset == c->vtx->len)
300 329059
                                return (vsl_end);
301
302
                        /* Advance chunk pointer */
303 3360836
                        if (c->chunk == NULL) {
304 99380
                                c->chunk = VTAILQ_FIRST(&c->vtx->chunks);
305 99380
                                c->chunkstart = 0;
306 99380
                        }
307 3360836
                        CHECK_OBJ_NOTNULL(c->chunk, CHUNK_MAGIC);
308 3450238
                        while (c->offset >= c->chunkstart + c->chunk->len) {
309 89402
                                c->chunkstart += c->chunk->len;
310 89402
                                c->chunk = VTAILQ_NEXT(c->chunk, list);
311 89402
                                CHECK_OBJ_NOTNULL(c->chunk, CHUNK_MAGIC);
312
                        }
313
314
                        /* Point to the next stored record */
315 3360836
                        if (c->chunk->type == chunk_t_shm)
316 2773809
                                ptr = c->chunk->shm.start.ptr;
317
                        else {
318 587027
                                assert(c->chunk->type == chunk_t_buf);
319 587027
                                ptr = c->chunk->buf.data;
320
                        }
321 3360836
                        c->cursor.rec.ptr = ptr + c->offset - c->chunkstart;
322 6721672
                        c->offset += VSL_NEXT(c->cursor.rec.ptr) -
323 3360836
                            c->cursor.rec.ptr;
324
                }
325 3372758
        } while (VSL_TAG(c->cursor.rec.ptr) == SLT__Batch);
326
327 3113709
        return (vsl_more);
328 3442768
}
329
330
static enum vsl_status v_matchproto_(vslc_reset_f)
331 139770
vslc_vtx_reset(const struct VSL_cursor *cursor)
332
{
333
        struct vslc_vtx *c;
334
335 139770
        CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VTX_MAGIC);
336 139770
        assert(&c->cursor == cursor);
337 139770
        CHECK_OBJ_NOTNULL(c->vtx, VTX_MAGIC);
338 139770
        c->synth = VTAILQ_FIRST(&c->vtx->synth);
339 139770
        c->chunk = NULL;
340 139770
        c->chunkstart = 0;
341 139770
        c->offset = 0;
342 139770
        c->cursor.rec.ptr = NULL;
343
344 139770
        return (vsl_end);
345
}
346
347
static const struct vslc_tbl vslc_vtx_tbl = {
348
        .magic  = VSLC_TBL_MAGIC,
349
        .delete = NULL,
350
        .next   = vslc_vtx_next,
351
        .reset  = vslc_vtx_reset,
352
        .check  = NULL,
353
};
354
355
/* Create a buf chunk */
356
static struct chunk *
357 17157
chunk_newbuf(struct vtx *vtx, const uint32_t *ptr, size_t len)
358
{
359
        struct chunk *chunk;
360
361 17157
        ALLOC_OBJ(chunk, CHUNK_MAGIC);
362 17157
        XXXAN(chunk);
363 17157
        chunk->type = chunk_t_buf;
364 17157
        chunk->vtx = vtx;
365 17157
        chunk->buf.space = VTX_BUFSIZE_MIN;
366 17157
        while (chunk->buf.space < len)
367 0
                chunk->buf.space *= 2;
368 17157
        chunk->buf.data = malloc(sizeof (uint32_t) * chunk->buf.space);
369 17157
        AN(chunk->buf.data);
370 17157
        memcpy(chunk->buf.data, ptr, sizeof (uint32_t) * len);
371 17157
        chunk->len = len;
372 17157
        return (chunk);
373
}
374
375
/* Free a buf chunk */
376
static void
377 17152
chunk_freebuf(struct chunk **pchunk)
378
{
379
        struct chunk *chunk;
380
381 17152
        TAKE_OBJ_NOTNULL(chunk, pchunk, CHUNK_MAGIC);
382 17152
        assert(chunk->type == chunk_t_buf);
383 17152
        free(chunk->buf.data);
384 17152
        FREE_OBJ(chunk);
385 17152
}
386
387
/* Append a set of records to a chunk */
388
static void
389 230067
chunk_appendbuf(struct chunk *chunk, const uint32_t *ptr, size_t len)
390
{
391
392 230067
        CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
393 230067
        assert(chunk->type == chunk_t_buf);
394 230067
        if (chunk->buf.space < chunk->len + len) {
395 72560
                while (chunk->buf.space < chunk->len + len)
396 36280
                        chunk->buf.space *= 2;
397 72560
                chunk->buf.data = realloc(chunk->buf.data,
398 36280
                    sizeof (uint32_t) * chunk->buf.space);
399 36280
        }
400 230067
        memcpy(chunk->buf.data + chunk->len, ptr, sizeof (uint32_t) * len);
401 230067
        chunk->len += len;
402 230067
}
403
404
/* Transform a shm chunk to a buf chunk */
405
static void
406 0
chunk_shm_to_buf(struct VSLQ *vslq, struct chunk *chunk)
407
{
408
        struct vtx *vtx;
409
        struct chunk *buf;
410
411 0
        CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
412 0
        assert(chunk->type == chunk_t_shm);
413 0
        vtx = chunk->vtx;
414 0
        CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
415
416 0
        buf = VTAILQ_PREV(chunk, chunkhead, list);
417 0
        if (buf != NULL && buf->type == chunk_t_buf)
418
                /* Previous is a buf chunk, append to it */
419 0
                chunk_appendbuf(buf, chunk->shm.start.ptr, chunk->len);
420
        else {
421
                /* Create a new buf chunk and insert it before this */
422 0
                buf = chunk_newbuf(vtx, chunk->shm.start.ptr, chunk->len);
423 0
                AN(buf);
424 0
                VTAILQ_INSERT_BEFORE(chunk, buf, list);
425
        }
426
427
        /* Reset cursor chunk pointer, vslc_vtx_next will set it correctly */
428 0
        vtx->c.chunk = NULL;
429
430
        /* Remove from the shmref list and vtx, and put chunk back
431
           on the free list */
432 0
        VTAILQ_REMOVE(&vslq->shmrefs, chunk, shm.shmref);
433 0
        VTAILQ_REMOVE(&vtx->chunks, chunk, list);
434 0
        VTAILQ_INSERT_HEAD(&vtx->shmchunks_free, chunk, list);
435 0
}
436
437
/* Append a set of records to a vtx structure */
438
static enum vsl_status
439 333971
vtx_append(struct VSLQ *vslq, struct vtx *vtx, const struct VSLC_ptr *start,
440
    size_t len)
441
{
442
        struct chunk *chunk;
443
        enum vsl_check i;
444
445 333971
        AN(vtx);
446 333971
        AN(len);
447 333971
        AN(start);
448
449 333971
        i = VSL_Check(vslq->c, start);
450 333971
        if (i == vsl_check_e_inval)
451 0
                return (vsl_e_overrun);
452
453 333971
        if (i == vsl_check_valid && !VTAILQ_EMPTY(&vtx->shmchunks_free)) {
454
                /* Shmref it */
455 86771
                chunk = VTAILQ_FIRST(&vtx->shmchunks_free);
456 86771
                CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
457 86771
                assert(chunk->type == chunk_t_shm);
458 86771
                assert(chunk->vtx == vtx);
459 86771
                VTAILQ_REMOVE(&vtx->shmchunks_free, chunk, list);
460 86771
                chunk->shm.start = *start;
461 86771
                chunk->len = len;
462 86771
                VTAILQ_INSERT_TAIL(&vtx->chunks, chunk, list);
463
464
                /* Append to shmref list */
465 86771
                VTAILQ_INSERT_TAIL(&vslq->shmrefs, chunk, shm.shmref);
466 86771
        } else {
467
                /* Buffer it */
468 247200
                chunk = VTAILQ_LAST(&vtx->chunks, chunkhead);
469 247200
                CHECK_OBJ_ORNULL(chunk, CHUNK_MAGIC);
470 247194
                if (chunk != NULL && chunk->type == chunk_t_buf) {
471
                        /* Tail is a buf chunk, append to that */
472 230043
                        chunk_appendbuf(chunk, start->ptr, len);
473 230043
                } else {
474
                        /* Append new buf chunk */
475 17151
                        chunk = chunk_newbuf(vtx, start->ptr, len);
476 17151
                        AN(chunk);
477 17151
                        VTAILQ_INSERT_TAIL(&vtx->chunks, chunk, list);
478
                }
479
        }
480 333965
        vtx->len += len;
481 333965
        return (vsl_more);
482 333965
}
483
484
/* Allocate a new vtx structure */
485
static struct vtx *
486 50714
vtx_new(struct VSLQ *vslq)
487
{
488
        struct vtx *vtx;
489
        int i;
490
491 50714
        AN(vslq);
492 50714
        if (vslq->n_cache) {
493 27459
                AZ(VTAILQ_EMPTY(&vslq->cache));
494 27459
                vtx = VTAILQ_FIRST(&vslq->cache);
495 27459
                VTAILQ_REMOVE(&vslq->cache, vtx, list_child);
496 27459
                vslq->n_cache--;
497 27459
        } else {
498 23255
                ALLOC_OBJ(vtx, VTX_MAGIC);
499 23251
                AN(vtx);
500
501 23251
                VTAILQ_INIT(&vtx->child);
502 23251
                VTAILQ_INIT(&vtx->shmchunks_free);
503 92996
                for (i = 0; i < VTX_SHMCHUNKS; i++) {
504 69745
                        vtx->shmchunks[i].magic = CHUNK_MAGIC;
505 69745
                        vtx->shmchunks[i].type = chunk_t_shm;
506 69745
                        vtx->shmchunks[i].vtx = vtx;
507 69745
                        VTAILQ_INSERT_TAIL(&vtx->shmchunks_free,
508
                            &vtx->shmchunks[i], list);
509 69745
                }
510 23251
                VTAILQ_INIT(&vtx->chunks);
511 23251
                VTAILQ_INIT(&vtx->synth);
512 23251
                vtx->c.magic = VSLC_VTX_MAGIC;
513 23251
                vtx->c.vtx = vtx;
514 23251
                vtx->c.cursor.priv_tbl = &vslc_vtx_tbl;
515 23251
                vtx->c.cursor.priv_data = &vtx->c;
516
        }
517
518 50710
        CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
519 50710
        vtx->key.vxid = 0;
520 50710
        vtx->t_start = VTIM_mono();
521 50710
        vtx->flags = 0;
522 50710
        vtx->type = VSL_t_unknown;
523 50710
        vtx->reason = VSL_r_unknown;
524 50710
        vtx->parent = NULL;
525 50710
        vtx->n_child = 0;
526 50710
        vtx->n_childready = 0;
527 50710
        vtx->n_descend = 0;
528 50710
        vtx->len = 0;
529 50710
        AN(vslc_vtx_reset(&vtx->c.cursor) == vsl_end);
530
531 50710
        return (vtx);
532
}
533
534
/* Disuse a vtx and all it's children, freeing any resources held. Free or
535
   cache the vtx for later use */
536
static void
537 50700
vtx_retire(struct VSLQ *vslq, struct vtx **pvtx)
538
{
539
        struct vtx *vtx;
540
        struct vtx *child;
541
        struct synth *synth;
542
        struct chunk *chunk;
543
544 50700
        AN(vslq);
545 50700
        TAKE_OBJ_NOTNULL(vtx, pvtx, VTX_MAGIC);
546
547 50700
        AN(vtx->flags & VTX_F_COMPLETE);
548 50700
        AN(vtx->flags & VTX_F_READY);
549 50700
        AZ(vtx->parent);
550
551 56117
        while (!VTAILQ_EMPTY(&vtx->child)) {
552 5417
                child = VTAILQ_FIRST(&vtx->child);
553 5417
                assert(child->parent == vtx);
554 5417
                AN(vtx->n_child);
555 5417
                assert(vtx->n_descend >= child->n_descend + 1);
556 5417
                VTAILQ_REMOVE(&vtx->child, child, list_child);
557 5417
                child->parent = NULL;
558 5417
                vtx->n_child--;
559 5417
                vtx->n_descend -= child->n_descend + 1;
560 5417
                vtx_retire(vslq, &child);
561 5417
                AZ(child);
562
        }
563 50700
        AZ(vtx->n_child);
564 50700
        AZ(vtx->n_descend);
565 50700
        vtx->n_childready = 0;
566
        // remove rval is no way to check if element was present
567 50700
        (void)VRBT_REMOVE(vtx_tree, &vslq->tree, &vtx->key);
568 50700
        vtx->key.vxid = 0;
569 50700
        vtx->flags = 0;
570
571 62462
        while (!VTAILQ_EMPTY(&vtx->synth)) {
572 11762
                synth = VTAILQ_FIRST(&vtx->synth);
573 11762
                CHECK_OBJ_NOTNULL(synth, SYNTH_MAGIC);
574 11762
                VTAILQ_REMOVE(&vtx->synth, synth, list);
575 11762
                FREE_OBJ(synth);
576
        }
577
578 154601
        while (!VTAILQ_EMPTY(&vtx->chunks)) {
579 103901
                chunk = VTAILQ_FIRST(&vtx->chunks);
580 103901
                CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
581 103901
                VTAILQ_REMOVE(&vtx->chunks, chunk, list);
582 103901
                if (chunk->type == chunk_t_shm) {
583 86753
                        VTAILQ_REMOVE(&vslq->shmrefs, chunk, shm.shmref);
584 86753
                        VTAILQ_INSERT_HEAD(&vtx->shmchunks_free, chunk, list);
585 86753
                } else {
586 17148
                        assert(chunk->type == chunk_t_buf);
587 17148
                        chunk_freebuf(&chunk);
588 17148
                        AZ(chunk);
589
                }
590
        }
591 50700
        vtx->len = 0;
592 50700
        AN(vslq->n_outstanding);
593 50700
        vslq->n_outstanding--;
594
595 50700
        if (vslq->n_cache < VTX_CACHE) {
596 49848
                VTAILQ_INSERT_HEAD(&vslq->cache, vtx, list_child);
597 49848
                vslq->n_cache++;
598 49848
        } else
599 852
                FREE_OBJ(vtx);
600
601 50700
}
602
603
/* Lookup a vtx by vxid from the managed list */
604
static struct vtx *
605 367358
vtx_lookup(const struct VSLQ *vslq, uint64_t vxid)
606
{
607
        struct vtx_key lkey, *key;
608
        struct vtx *vtx;
609
610 367358
        AN(vslq);
611 367358
        lkey.vxid = vxid;
612 367358
        key = VRBT_FIND(vtx_tree, &vslq->tree, &lkey);
613 367358
        if (key == NULL)
614 75565
                return (NULL);
615 291793
        CAST_OBJ_NOTNULL(vtx, (void *)key, VTX_MAGIC);
616 291793
        return (vtx);
617 367358
}
618
619
/* Insert a new vtx into the managed list */
620
static struct vtx *
621 50701
vtx_add(struct VSLQ *vslq, uint64_t vxid)
622
{
623
        struct vtx *vtx;
624
625 50701
        AN(vslq);
626 50701
        vtx = vtx_new(vslq);
627 50701
        AN(vtx);
628 50701
        vtx->key.vxid = vxid;
629 50701
        AZ(VRBT_INSERT(vtx_tree, &vslq->tree, &vtx->key));
630 50701
        VTAILQ_INSERT_TAIL(&vslq->incomplete, vtx, list_vtx);
631 50701
        vslq->n_outstanding++;
632 50701
        return (vtx);
633
}
634
635
/* Mark a vtx complete, update child counters and if possible push it or
636
   it's top parent to the ready state */
637
static void
638 50697
vtx_mark_complete(struct VSLQ *vslq, struct vtx *vtx)
639
{
640
641 50697
        AN(vslq);
642 50697
        AN(vtx->flags & VTX_F_END);
643 50697
        AZ(vtx->flags & VTX_F_COMPLETE);
644
645 50697
        if (vtx->type == VSL_t_unknown)
646 0
                vtx_diag(vtx, "vtx of unknown type marked complete");
647
648 50697
        vtx->flags |= VTX_F_COMPLETE;
649 50697
        VTAILQ_REMOVE(&vslq->incomplete, vtx, list_vtx);
650
651 56103
        while (1) {
652 56103
                AZ(vtx->flags & VTX_F_READY);
653 56103
                if (vtx->flags & VTX_F_COMPLETE &&
654 50938
                    vtx->n_child == vtx->n_childready)
655 50694
                        vtx->flags |= VTX_F_READY;
656
                else
657 5409
                        return;
658 50694
                if (vtx->parent == NULL) {
659
                        /* Top level vtx ready */
660 45288
                        VTAILQ_INSERT_TAIL(&vslq->ready, vtx, list_vtx);
661 45288
                        return;
662
                }
663 5406
                vtx = vtx->parent;
664 5406
                vtx->n_childready++;
665 5406
                assert(vtx->n_child >= vtx->n_childready);
666
        }
667 50697
}
668
669
/* Add a child to a parent, and update child counters */
670
static void
671 5406
vtx_set_parent(struct vtx *parent, struct vtx *child)
672
{
673
674 5406
        CHECK_OBJ_NOTNULL(parent, VTX_MAGIC);
675 5406
        CHECK_OBJ_NOTNULL(child, VTX_MAGIC);
676 5406
        assert(parent != child);
677 5406
        AZ(parent->flags & VTX_F_COMPLETE);
678 5406
        AZ(child->flags & VTX_F_COMPLETE);
679 5406
        AZ(child->parent);
680 5406
        child->parent = parent;
681 5406
        VTAILQ_INSERT_TAIL(&parent->child, child, list_child);
682 5406
        parent->n_child++;
683 5406
        do
684 6452
                parent->n_descend += 1 + child->n_descend;
685 6452
        while ((parent = parent->parent) != NULL);
686 5406
}
687
688
/* Parse a begin or link record. Returns the number of elements that was
689
   successfully parsed. */
690
static int
691 135915
vtx_parse_link(const char *str, enum VSL_transaction_e *ptype,
692
    uint64_t *pvxid, enum VSL_reason_e *preason, uint64_t *psub)
693
{
694
        char type[16], reason[16];
695
        uintmax_t vxid, sub;
696
        int i;
697
        enum VSL_transaction_e et;
698
        enum VSL_reason_e er;
699
700 135915
        AN(str);
701 135915
        AN(ptype);
702 135915
        AN(pvxid);
703 135915
        AN(preason);
704
705 135915
        i = sscanf(str, "%15s %ju %15s %ju", type, &vxid, reason, &sub);
706 135915
        if (i < 1)
707 0
                return (0);
708
709
        /* transaction type */
710 418753
        for (et = VSL_t_unknown; et < VSL_t__MAX; et++)
711 418728
                if (!strcmp(type, vsl_t_names[et]))
712 135890
                        break;
713 135915
        if (et >= VSL_t__MAX)
714 0
                et = VSL_t_unknown;
715 135877
        *ptype = et;
716 135877
        if (i == 1)
717 0
                return (1);
718
719
        /* vxid */
720 135877
        assert((vxid & ~VSL_IDENTMASK) == 0);
721 135877
        *pvxid = vxid;
722 135877
        if (i == 2)
723 0
                return (2);
724
725
        /* transaction reason */
726 564794
        for (er = VSL_r_unknown; er < VSL_r__MAX; er++)
727 561915
                if (!strcmp(reason, vsl_r_names[er]))
728 132998
                        break;
729 135877
        if (er >= VSL_r__MAX)
730 2880
                er = VSL_r_unknown;
731 135877
        *preason = er;
732 135877
        if (i == 3)
733 129797
                return (3);
734
735
        /* request sub-level */
736 6080
        if (psub != NULL)
737 2200
                *psub = sub;
738 6080
        return (4);
739 135877
}
740
741
/* Parse and process a begin record */
742
static int
743 50712
vtx_scan_begin(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
744
{
745
        int i;
746
        enum VSL_transaction_e type;
747
        enum VSL_reason_e reason;
748
        uint64_t p_vxid;
749
        struct vtx *p_vtx;
750
751 50712
        assert(VSL_TAG(ptr) == SLT_Begin);
752
753 50712
        AZ(vtx->flags & VTX_F_READY);
754
755 50712
        i = vtx_parse_link(VSL_CDATA(ptr), &type, &p_vxid, &reason, NULL);
756 50712
        if (i < 3)
757 0
                return (vtx_diag_tag(vtx, ptr, "parse error"));
758 50712
        if (type == VSL_t_unknown)
759 0
                (void)vtx_diag_tag(vtx, ptr, "unknown vxid type");
760
761
        /* Check/set vtx type */
762 50712
        if (vtx->type != VSL_t_unknown && vtx->type != type)
763
                /* Type not matching the one previously set by a link
764
                   record */
765 0
                (void)vtx_diag_tag(vtx, ptr, "type mismatch");
766 50712
        vtx->type = type;
767 50712
        vtx->reason = reason;
768
769 50712
        if (p_vxid == 0)
770
                /* Zero means no parent */
771 14637
                return (0);
772 36075
        if (p_vxid == vtx->key.vxid)
773 0
                return (vtx_diag_tag(vtx, ptr, "link to self"));
774
775 36075
        if (vslq->grouping == VSL_g_vxid)
776 29537
                return (0);     /* No links */
777 6538
        if (vslq->grouping == VSL_g_request && vtx->type == VSL_t_req &&
778 1960
            vtx->reason == VSL_r_rxreq)
779 1120
                return (0);     /* No links */
780
781 5418
        if (vtx->parent != NULL) {
782 2504
                if (vtx->parent->key.vxid != p_vxid) {
783
                        /* This vtx already belongs to a different
784
                           parent */
785 0
                        return (vtx_diag_tag(vtx, ptr, "link mismatch"));
786
                } else
787
                        /* Link already exists */
788 2504
                        return (0);
789
        }
790
791 2914
        p_vtx = vtx_lookup(vslq, p_vxid);
792 2914
        if (p_vtx == NULL) {
793
                /* Not seen parent yet. Create it. */
794 1234
                p_vtx = vtx_add(vslq, p_vxid);
795 1234
                AN(p_vtx);
796 1234
        } else {
797 1680
                CHECK_OBJ_NOTNULL(p_vtx, VTX_MAGIC);
798 1680
                if (p_vtx->flags & VTX_F_COMPLETE)
799 0
                        return (vtx_diag_tag(vtx, ptr, "link too late"));
800
        }
801
802
        /* Create link */
803 2914
        vtx_set_parent(p_vtx, vtx);
804
805 2914
        return (0);
806 50712
}
807
808
/* Parse and process a link record */
809
static int
810 37369
vtx_scan_link(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
811
{
812
        int i;
813
        enum VSL_transaction_e c_type;
814
        enum VSL_reason_e c_reason;
815
        uint64_t c_vxid;
816
        struct vtx *c_vtx;
817
818 37369
        assert(VSL_TAG(ptr) == SLT_Link);
819
820 37369
        AZ(vtx->flags & VTX_F_READY);
821
822 37369
        i = vtx_parse_link(VSL_CDATA(ptr), &c_type, &c_vxid, &c_reason, NULL);
823 37369
        if (i < 3)
824 0
                return (vtx_diag_tag(vtx, ptr, "parse error"));
825 37369
        if (c_type == VSL_t_unknown)
826 0
                (void)vtx_diag_tag(vtx, ptr, "unknown vxid type");
827
828 37369
        if (vslq->grouping == VSL_g_vxid)
829 31965
                return (0);     /* No links */
830 5404
        if (vslq->grouping == VSL_g_request && vtx->type == VSL_t_sess)
831 0
                return (0);     /* No links */
832
833 5404
        if (c_vxid == 0)
834 0
                return (vtx_diag_tag(vtx, ptr, "illegal link vxid"));
835 5404
        if (c_vxid == vtx->key.vxid)
836 0
                return (vtx_diag_tag(vtx, ptr, "link to self"));
837
838
        /* Lookup and check child vtx */
839 5404
        c_vtx = vtx_lookup(vslq, c_vxid);
840 5404
        if (c_vtx == NULL) {
841
                /* Child not seen before. Insert it and create link */
842 2490
                c_vtx = vtx_add(vslq, c_vxid);
843 2490
                AN(c_vtx);
844 2490
                AZ(c_vtx->parent);
845 2490
                c_vtx->type = c_type;
846 2490
                c_vtx->reason = c_reason;
847 2490
                vtx_set_parent(vtx, c_vtx);
848 2490
                return (0);
849
        }
850
851 2914
        CHECK_OBJ_NOTNULL(c_vtx, VTX_MAGIC);
852 2914
        if (c_vtx->parent == vtx)
853
                /* Link already exists */
854 2914
                return (0);
855 0
        if (c_vtx->parent != NULL && c_vtx->parent != vtx)
856 0
                return (vtx_diag_tag(vtx, ptr, "duplicate link"));
857 0
        if (c_vtx->flags & VTX_F_COMPLETE)
858 0
                return (vtx_diag_tag(vtx, ptr, "link too late"));
859 0
        if (c_vtx->type != VSL_t_unknown && c_vtx->type != c_type)
860 0
                (void)vtx_diag_tag(vtx, ptr, "type mismatch");
861
862 0
        c_vtx->type = c_type;
863 0
        c_vtx->reason = c_reason;
864 0
        vtx_set_parent(vtx, c_vtx);
865 0
        return (0);
866 37369
}
867
868
/* Scan the records of a vtx, performing processing actions on specific
869
   records */
870
static void
871 345299
vtx_scan(struct VSLQ *vslq, struct vtx *vtx)
872
{
873
        const uint32_t *ptr;
874
        enum VSL_tag_e tag;
875
876 3876363
        while (!(vtx->flags & VTX_F_COMPLETE) &&
877 1911547
            vslc_vtx_next(&vtx->c.cursor) == 1) {
878 1619517
                ptr = vtx->c.cursor.rec.ptr;
879 1619517
                if (VSL_ID(ptr) != vtx->key.vxid) {
880 0
                        (void)vtx_diag_tag(vtx, ptr, "vxid mismatch");
881 0
                        continue;
882
                }
883
884 1619517
                tag = VSL_TAG(ptr);
885 1619517
                assert(tag != SLT__Batch);
886
887 1619517
                switch (tag) {
888
                case SLT_Begin:
889 50704
                        if (vtx->flags & VTX_F_BEGIN)
890 0
                                (void)vtx_diag_tag(vtx, ptr, "duplicate begin");
891
                        else {
892 50704
                                (void)vtx_scan_begin(vslq, vtx, ptr);
893 50704
                                vtx->flags |= VTX_F_BEGIN;
894
                        }
895 50704
                        break;
896
897
                case SLT_Link:
898 37357
                        (void)vtx_scan_link(vslq, vtx, ptr);
899 37357
                        break;
900
901
                case SLT_End:
902 50693
                        AZ(vtx->flags & VTX_F_END);
903 50693
                        vtx->flags |= VTX_F_END;
904 50693
                        vtx_mark_complete(vslq, vtx);
905 50693
                        break;
906
907
                default:
908 1480763
                        break;
909
                }
910
        }
911 345299
}
912
913
/* Force a vtx into complete status by synthing the necessary outstanding
914
   records */
915
static void
916 5881
vtx_force(struct VSLQ *vslq, struct vtx *vtx, const char *reason)
917
{
918
919 5881
        AZ(vtx->flags & VTX_F_COMPLETE);
920 5881
        AZ(vtx->flags & VTX_F_READY);
921 5881
        vtx_scan(vslq, vtx);
922 5881
        if (!(vtx->flags & VTX_F_BEGIN))
923 0
                vtx_synth_rec(vtx, SLT_Begin, "%s %u synth",
924 0
                    vsl_t_names[vtx->type], 0);
925 5881
        vtx_diag(vtx, reason);
926 5881
        if (!(vtx->flags & VTX_F_END))
927 5881
                vtx_synth_rec(vtx, SLT_End, "synth");
928 5881
        vtx_scan(vslq, vtx);
929 5881
        AN(vtx->flags & VTX_F_COMPLETE);
930 5881
}
931
932
static int
933 440
vslq_ratelimit(struct VSLQ *vslq)
934
{
935
        vtim_mono now;
936
        vtim_dur delta;
937
938 440
        CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
939 440
        CHECK_OBJ_NOTNULL(vslq->vsl, VSL_MAGIC);
940
941 440
        now = VTIM_mono();
942 440
        delta = now - vslq->last_use;
943 440
        vslq->credits += (delta / vslq->vsl->R_opt_p) * vslq->vsl->R_opt_l;
944 440
        vslq->credits = vmin_t(double, vslq->credits, vslq->vsl->R_opt_l);
945 440
        vslq->last_use = now;
946
947 440
        if (vslq->credits < 1.0)
948 0
                return (0);
949
950 440
        vslq->credits -= 1.0;
951 440
        return (1);
952 440
}
953
954
/* Build transaction array, do the query and callback. Returns 0 or the
955
   return value from func */
956
static int
957 39489
vslq_callback(struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
958
    void *priv)
959
{
960 39489
        unsigned n = vtx->n_descend + 1;
961 39489
        struct vtx *vtxs[n];
962 39489
        struct VSL_transaction trans[n];
963 39489
        struct VSL_transaction *ptrans[n + 1];
964
        unsigned i, j;
965
966 39489
        AN(vslq);
967 39489
        CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
968 39489
        AN(vtx->flags & VTX_F_READY);
969 39489
        AN(func);
970
971 39489
        if (vslq->grouping == VSL_g_session &&
972 876
            vtx->type != VSL_t_sess)
973 0
                return (0);
974 39489
        if (vslq->grouping == VSL_g_request &&
975 1120
            vtx->type != VSL_t_req)
976 0
                return (0);
977
978
        /* Build transaction array */
979 39489
        AN(vslc_vtx_reset(&vtx->c.cursor) == vsl_end);
980 39489
        vtxs[0] = vtx;
981 39489
        trans[0].level = 1;
982 39489
        trans[0].vxid = vtx->key.vxid;
983 39489
        trans[0].vxid_parent = 0;
984 39489
        trans[0].type = vtx->type;
985 39489
        trans[0].reason = vtx->reason;
986 39489
        trans[0].c = &vtx->c.cursor;
987 39489
        i = 1;
988 39489
        j = 0;
989 83773
        while (j < i) {
990 49080
                VTAILQ_FOREACH(vtx, &vtxs[j]->child, list_child) {
991 4796
                        assert(i < n);
992 4796
                        AN(vslc_vtx_reset(&vtx->c.cursor) == vsl_end);
993 4796
                        vtxs[i] = vtx;
994 4796
                        if (vtx->reason == VSL_r_restart)
995
                                /* Restarts stay at the same level as parent */
996 80
                                trans[i].level = trans[j].level;
997
                        else
998 4716
                                trans[i].level = trans[j].level + 1;
999 4796
                        trans[i].vxid = vtx->key.vxid;
1000 4796
                        trans[i].vxid_parent = trans[j].vxid;
1001 4796
                        trans[i].type = vtx->type;
1002 4796
                        trans[i].reason = vtx->reason;
1003 4796
                        trans[i].c = &vtx->c.cursor;
1004 4796
                        i++;
1005 4796
                }
1006 44284
                j++;
1007
        }
1008 39489
        assert(i == n);
1009
1010
        /* Build pointer array */
1011 83788
        for (i = 0; i < n; i++)
1012 44299
                ptrans[i] = &trans[i];
1013 39489
        ptrans[i] = NULL;
1014
1015
        /* Query test goes here */
1016 39489
        if (vslq->query != NULL && !vslq_runquery(vslq->query, ptrans))
1017 14089
                return (0);
1018
1019 25400
        if (vslq->vsl->R_opt_l != 0 && !vslq_ratelimit(vslq))
1020 0
                return (0);
1021
1022
        /* Callback */
1023 25400
        return ((func)(vslq->vsl, ptrans, priv));
1024 39489
}
1025
1026
/* Create a synthetic log record. The record will be inserted at the
1027
   current cursor offset */
1028
static void
1029 11762
vtx_synth_rec(struct vtx *vtx, unsigned tag, const char *fmt, ...)
1030
{
1031
        struct synth *synth, *it;
1032
        va_list ap;
1033
        char *buf;
1034
        int l, buflen;
1035
        uint64_t vxid;
1036
1037 11762
        ALLOC_OBJ(synth, SYNTH_MAGIC);
1038 11762
        AN(synth);
1039
1040 11762
        buf = VSL_DATA(synth->data);
1041 11762
        buflen = sizeof(synth->data) - VSL_BYTES(VSL_OVERHEAD);
1042 11762
        va_start(ap, fmt);
1043 11762
        l = vsnprintf(buf, buflen, fmt, ap);
1044 11762
        assert(l >= 0);
1045 11762
        va_end(ap);
1046 11762
        if (l > buflen - 1)
1047 0
                l = buflen - 1;
1048 11762
        buf[l++] = '\0';        /* NUL-terminated */
1049 11762
        vxid = vtx->key.vxid;
1050 11762
        switch (vtx->type) {
1051
        case VSL_t_req:
1052 640
                vxid |= VSL_CLIENTMARKER;
1053 640
                break;
1054
        case VSL_t_bereq:
1055 80
                vxid |= VSL_BACKENDMARKER;
1056 80
                break;
1057
        default:
1058 11042
                break;
1059
        }
1060 11762
        synth->data[2] = vxid >> 32;
1061 11762
        synth->data[1] = vxid;
1062 23524
        synth->data[0] = (((tag & VSL_IDMASK) << VSL_IDSHIFT) |
1063 11762
            (VSL_VERSION_3 << VSL_VERSHIFT) | l);
1064 11762
        synth->offset = vtx->c.offset;
1065
1066 11762
        VTAILQ_FOREACH_REVERSE(it, &vtx->synth, synthhead, list) {
1067
                /* Make sure the synth list is sorted on offset */
1068 5881
                CHECK_OBJ_NOTNULL(it, SYNTH_MAGIC);
1069 5881
                if (synth->offset >= it->offset)
1070 5881
                        break;
1071 0
        }
1072 11762
        if (it != NULL)
1073 5881
                VTAILQ_INSERT_AFTER(&vtx->synth, it, synth, list);
1074
        else
1075 5881
                VTAILQ_INSERT_HEAD(&vtx->synth, synth, list);
1076
1077
        /* Update cursor */
1078 11762
        CHECK_OBJ_ORNULL(vtx->c.synth, SYNTH_MAGIC);
1079 11762
        if (vtx->c.synth == NULL || vtx->c.synth->offset > synth->offset)
1080 5881
                vtx->c.synth = synth;
1081 11762
}
1082
1083
/* Add a diagnostic SLT_VSL synth record to the vtx. */
1084
static int
1085 5881
vtx_diag(struct vtx *vtx, const char *msg)
1086
{
1087
1088 5881
        vtx_synth_rec(vtx, SLT_VSL, msg);
1089 5881
        return (-1);
1090
}
1091
1092
/* Add a SLT_VSL diag synth record to the vtx. Takes an offending record
1093
   that will be included in the log record */
1094
static int
1095 0
vtx_diag_tag(struct vtx *vtx, const uint32_t *ptr, const char *reason)
1096
{
1097
1098 0
        vtx_synth_rec(vtx, SLT_VSL, "%s (%ju:%s \"%.*s\")", reason, VSL_ID(ptr),
1099 0
            VSL_tags[VSL_TAG(ptr)], (int)VSL_LEN(ptr), VSL_CDATA(ptr));
1100 0
        return (-1);
1101
}
1102
1103
struct VSLQ *
1104 15360
VSLQ_New(struct VSL_data *vsl, struct VSL_cursor **cp,
1105
    enum VSL_grouping_e grouping, const char *querystring)
1106
{
1107
        struct vslq_query *query;
1108
        struct VSLQ *vslq;
1109
1110 15360
        CHECK_OBJ_NOTNULL(vsl, VSL_MAGIC);
1111 15360
        if (grouping >= VSL_g__MAX) {
1112 0
                (void)vsl_diag(vsl, "Illegal query grouping");
1113 0
                return (NULL);
1114
        }
1115 15360
        if (querystring != NULL) {
1116 7000
                query = vslq_newquery(vsl, grouping, querystring);
1117 7000
                if (query == NULL)
1118 1280
                        return (NULL);
1119 5720
        } else
1120 8360
                query = NULL;
1121
1122 14080
        ALLOC_OBJ(vslq, VSLQ_MAGIC);
1123 14080
        AN(vslq);
1124 14080
        vslq->vsl = vsl;
1125 14080
        if (cp != NULL) {
1126 10680
                vslq->c = *cp;
1127 10680
                *cp = NULL;
1128 10680
        }
1129 14080
        vslq->grouping = grouping;
1130 14080
        vslq->query = query;
1131 14080
        if (vslq->vsl->R_opt_l != 0) {
1132 80
                vslq->last_use = VTIM_mono();
1133 80
                vslq->credits = 1;
1134 80
        }
1135
1136
        /* Setup normal mode */
1137 14080
        VRBT_INIT(&vslq->tree);
1138 14080
        VTAILQ_INIT(&vslq->ready);
1139 14080
        VTAILQ_INIT(&vslq->incomplete);
1140 14080
        VTAILQ_INIT(&vslq->shmrefs);
1141 14080
        VTAILQ_INIT(&vslq->cache);
1142
1143
        /* Setup raw mode */
1144 14080
        vslq->raw.c.magic = VSLC_RAW_MAGIC;
1145 14080
        vslq->raw.c.cursor.priv_tbl = &vslc_raw_tbl;
1146 14080
        vslq->raw.c.cursor.priv_data = &vslq->raw.c;
1147 14080
        vslq->raw.trans.level = 0;
1148 14080
        vslq->raw.trans.type = VSL_t_raw;
1149 14080
        vslq->raw.trans.reason = VSL_r_unknown;
1150 14080
        vslq->raw.trans.c = &vslq->raw.c.cursor;
1151 14080
        vslq->raw.ptrans[0] = &vslq->raw.trans;
1152 14080
        vslq->raw.ptrans[1] = NULL;
1153
1154 14080
        return (vslq);
1155 15360
}
1156
1157
void
1158 13840
VSLQ_Delete(struct VSLQ **pvslq)
1159
{
1160
        struct VSLQ *vslq;
1161
        struct vtx *vtx;
1162
1163 13840
        TAKE_OBJ_NOTNULL(vslq, pvslq, VSLQ_MAGIC);
1164
1165 13840
        (void)VSLQ_Flush(vslq, NULL, NULL);
1166 13840
        AZ(vslq->n_outstanding);
1167
1168 13840
        if (vslq->c != NULL) {
1169 13840
                VSL_DeleteCursor(vslq->c);
1170 13840
                vslq->c = NULL;
1171 13840
        }
1172
1173 13840
        if (vslq->query != NULL)
1174 5720
                vslq_deletequery(&vslq->query);
1175 13840
        AZ(vslq->query);
1176
1177 36245
        while (!VTAILQ_EMPTY(&vslq->cache)) {
1178 22405
                AN(vslq->n_cache);
1179 22405
                vtx = VTAILQ_FIRST(&vslq->cache);
1180 22405
                CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
1181 22405
                VTAILQ_REMOVE(&vslq->cache, vtx, list_child);
1182 22405
                vslq->n_cache--;
1183 22405
                FREE_OBJ(vtx);
1184
        }
1185
1186 13840
        FREE_OBJ(vslq);
1187 13840
}
1188
1189
void
1190 3160
VSLQ_SetCursor(struct VSLQ *vslq, struct VSL_cursor **cp)
1191
{
1192
1193 3160
        CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
1194
1195 3160
        if (vslq->c != NULL) {
1196 0
                (void)VSLQ_Flush(vslq, NULL, NULL);
1197 0
                AZ(vslq->n_outstanding);
1198 0
                VSL_DeleteCursor(vslq->c);
1199 0
                vslq->c = NULL;
1200 0
        }
1201
1202 3160
        if (cp != NULL) {
1203 3160
                AN(*cp);
1204 3160
                vslq->c = *cp;
1205 3160
                *cp = NULL;
1206 3160
        }
1207 3160
}
1208
1209
/* Regard each log line as a single transaction, feed it through the query
1210
   and do the callback */
1211
static int
1212 703808
vslq_raw(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
1213
{
1214 703808
        enum vsl_status r = vsl_more;
1215
        int i;
1216
1217 703808
        assert(vslq->grouping == VSL_g_raw);
1218
1219 703808
        assert(vslq->raw.offset <= vslq->raw.len);
1220 703808
        do {
1221 772440
                if (vslq->raw.offset == vslq->raw.len) {
1222 412445
                        r = VSL_Next(vslq->c);
1223 412445
                        if (r != vsl_more)
1224 221819
                                return (r);
1225 190626
                        AN(vslq->c->rec.ptr);
1226 190626
                        vslq->raw.start = vslq->c->rec;
1227 190626
                        if (VSL_TAG(vslq->c->rec.ptr) == SLT__Batch)
1228 137284
                                vslq->raw.len = VSL_END(vslq->c->rec.ptr,
1229 68642
                                    VSL_BATCHLEN(vslq->c->rec.ptr)) -
1230 68642
                                    vslq->c->rec.ptr;
1231
                        else
1232 243968
                                vslq->raw.len = VSL_NEXT(vslq->raw.start.ptr) -
1233 121984
                                    vslq->raw.start.ptr;
1234 190626
                        assert(vslq->raw.len > 0);
1235 190626
                        vslq->raw.offset = 0;
1236 190626
                }
1237
1238 550621
                vslq->raw.c.ptr = vslq->raw.start.ptr + vslq->raw.offset;
1239 550621
                vslq->raw.c.cursor.rec.ptr = NULL;
1240 550621
                vslq->raw.trans.vxid = VSL_ID(vslq->raw.c.ptr);
1241 550621
                vslq->raw.offset += VSL_NEXT(vslq->raw.c.ptr) - vslq->raw.c.ptr;
1242 550621
        } while (VSL_TAG(vslq->raw.c.ptr) == SLT__Batch);
1243
1244 481989
        assert (r == vsl_more);
1245
1246 481989
        if (func == NULL)
1247 0
                return (r);
1248
1249 481989
        if (vslq->query != NULL &&
1250 81100
            !vslq_runquery(vslq->query, vslq->raw.ptrans))
1251 76220
                return (r);
1252
1253 405769
        if (vslq->vsl->R_opt_l != 0 && !vslq_ratelimit(vslq))
1254 0
                return (r);
1255
1256 405769
        i = (func)(vslq->vsl, vslq->raw.ptrans, priv);
1257 405769
        if (i)
1258 3720
                return (i);
1259
1260 402049
        return (r);
1261 703808
}
1262
1263
/* Check the beginning of the shmref list, and buffer refs that are at
1264
 * warning level.
1265
 */
1266
static enum vsl_status
1267 460417
vslq_shmref_check(struct VSLQ *vslq)
1268
{
1269
        struct chunk *chunk;
1270
        enum vsl_check i;
1271
1272 460417
        while ((chunk = VTAILQ_FIRST(&vslq->shmrefs)) != NULL) {
1273 339623
                CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
1274 339623
                assert(chunk->type == chunk_t_shm);
1275 339623
                i = VSL_Check(vslq->c, &chunk->shm.start);
1276 339623
                switch (i) {
1277
                case vsl_check_valid:
1278
                        /* First on list is OK, refs behind it must also
1279
                           be OK */
1280 339623
                        return (vsl_more);
1281
                case vsl_check_warn:
1282
                        /* Buffer this chunk */
1283 0
                        chunk_shm_to_buf(vslq, chunk);
1284 0
                        break;
1285
                default:
1286
                        /* Too late to buffer */
1287 0
                        return (vsl_e_overrun);
1288
                }
1289
        }
1290
1291 120794
        return (vsl_more);
1292 460417
}
1293
1294
static unsigned
1295 56292
vslq_candidate(struct VSLQ *vslq, const uint32_t *ptr)
1296
{
1297
        enum VSL_transaction_e type;
1298
        enum VSL_reason_e reason;
1299
        struct VSL_data *vsl;
1300
        enum VSL_tag_e tag;
1301
        uint64_t p_vxid, sub;
1302
        int i;
1303
1304 56292
        CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
1305 56292
        AN(ptr);
1306
1307 56292
        assert(vslq->grouping != VSL_g_raw);
1308 56292
        if (vslq->grouping == VSL_g_session)
1309 4284
                return (1); /* All are needed */
1310
1311 52008
        vsl = vslq->vsl;
1312 52008
        CHECK_OBJ_NOTNULL(vsl, VSL_MAGIC);
1313 52008
        if (vslq->grouping == VSL_g_vxid) {
1314 47847
                if (!vsl->c_opt && !vsl->b_opt)
1315 35207
                        AZ(vsl->E_opt);
1316 12640
                else if (!vsl->b_opt && !VSL_CLIENT(ptr))
1317 2840
                        return (0);
1318 9800
                else if (!vsl->c_opt && !VSL_BACKEND(ptr))
1319 1360
                        return (0);
1320
                /* Need to parse the Begin tag - fallthrough to below */
1321 43647
        }
1322
1323 47808
        tag = VSL_TAG(ptr);
1324 47808
        assert(tag == SLT_Begin);
1325 47808
        i = vtx_parse_link(VSL_CDATA(ptr), &type, &p_vxid, &reason, &sub);
1326 47808
        if (i < 3 || type == VSL_t_unknown)
1327 4
                return (0);
1328
1329 47806
        if (vslq->grouping == VSL_g_request && type == VSL_t_sess)
1330 800
                return (0);
1331
1332 47006
        if (vslq->grouping == VSL_g_vxid && i > 3 && sub > 0 && !vsl->E_opt)
1333 600
                return (0);
1334
1335 46406
        return (1);
1336 56290
}
1337
1338
/* Process next input record */
1339
static enum vsl_status
1340 861775
vslq_next(struct VSLQ *vslq)
1341
{
1342
        const uint32_t *ptr;
1343
        struct VSL_cursor *c;
1344
        enum vsl_status r;
1345
        enum VSL_tag_e tag;
1346
        ssize_t len;
1347
        uint64_t vxid;
1348
        unsigned keep;
1349
        struct vtx *vtx;
1350
1351 861775
        c = vslq->c;
1352 861775
        r = VSL_Next(c);
1353 861775
        if (r != vsl_more)
1354 400236
                return (r);
1355
1356 461539
        assert (r == vsl_more);
1357
1358 461539
        tag = (enum VSL_tag_e)VSL_TAG(c->rec.ptr);
1359 461539
        if (tag == SLT__Batch) {
1360 145377
                vxid = VSL_BATCHID(c->rec.ptr);
1361 290754
                len = VSL_END(c->rec.ptr, VSL_BATCHLEN(c->rec.ptr)) -
1362 145377
                    c->rec.ptr;
1363 145377
                if (len == 0)
1364 0
                        return (r);
1365 145377
                ptr = VSL_NEXT(c->rec.ptr);
1366 145377
                tag = (enum VSL_tag_e)VSL_TAG(ptr);
1367 145377
        } else {
1368 316162
                vxid = VSL_ID(c->rec.ptr);
1369 316162
                len = VSL_NEXT(c->rec.ptr) - c->rec.ptr;
1370 316162
                ptr = c->rec.ptr;
1371
        }
1372 461539
        assert(len > 0);
1373 461539
        if (vxid == 0)
1374
                /* Skip non-transactional records */
1375 102488
                return (r);
1376
1377 359051
        vtx = vtx_lookup(vslq, vxid);
1378 359051
        keep = tag != SLT_Begin || vslq_candidate(vslq, ptr);
1379 359051
        if (vtx == NULL && tag == SLT_Begin && keep) {
1380 46974
                vtx = vtx_add(vslq, vxid);
1381 46974
                AN(vtx);
1382 46974
        }
1383 359051
        if (vtx != NULL) {
1384 334088
                AN(keep);
1385 334088
                r = vtx_append(vslq, vtx, &c->rec, len);
1386 334088
                if (r == vsl_more)
1387 333624
                        vtx_scan(vslq, vtx);
1388 334096
        }
1389
1390 359059
        return (r);
1391 861783
}
1392
1393
/* Test query and report any ready transactions */
1394
static int
1395 53329
vslq_process_ready(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
1396
{
1397
        struct vtx *vtx;
1398 53329
        int i = 0;
1399
1400 53329
        AN(vslq);
1401
1402 91621
        while (!VTAILQ_EMPTY(&vslq->ready)) {
1403 45292
                vtx = VTAILQ_FIRST(&vslq->ready);
1404 45292
                CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
1405 45292
                VTAILQ_REMOVE(&vslq->ready, vtx, list_vtx);
1406 45292
                AN(vtx->flags & VTX_F_READY);
1407 45292
                if (func != NULL)
1408 39488
                        i = vslq_callback(vslq, vtx, func, priv);
1409 45292
                vtx_retire(vslq, &vtx);
1410 45292
                AZ(vtx);
1411 45292
                if (i)
1412 7000
                        return (i);
1413
        }
1414
1415 46329
        return (0);
1416 53329
}
1417
1418
/* Process the input cursor, calling the callback function on matching
1419
   transaction sets */
1420
int
1421 1565375
VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
1422
{
1423
        enum vsl_status r;
1424
        int i;
1425
        double now;
1426
        struct vtx *vtx;
1427
1428 1565375
        CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
1429
1430
        /* Check that we have a cursor */
1431 1565375
        if (vslq->c == NULL)
1432 0
                return (vsl_e_abandon);
1433
1434 1565375
        if (vslq->grouping == VSL_g_raw)
1435 703808
                return (vslq_raw(vslq, func, priv));
1436
1437
        /* Process next cursor input */
1438 861567
        r = vslq_next(vslq);
1439 861567
        if (r != vsl_more)
1440
                /* At end of log or cursor reports error condition */
1441 399903
                return (r);
1442
1443
        /* Check shmref list and buffer if necessary */
1444 461664
        r = vslq_shmref_check(vslq);
1445 461664
        if (r != vsl_more)
1446
                /* Buffering of shm ref failed */
1447 0
                return (r);
1448
1449 461664
        assert (r == vsl_more);
1450
1451
        /* Check vtx timeout */
1452 461664
        now = VTIM_mono();
1453 461744
        while (!VTAILQ_EMPTY(&vslq->incomplete)) {
1454 333772
                vtx = VTAILQ_FIRST(&vslq->incomplete);
1455 333772
                CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
1456 333772
                if (now - vtx->t_start < vslq->vsl->T_opt)
1457 333692
                        break;
1458 80
                vtx_force(vslq, vtx, "timeout");
1459 80
                AN(vtx->flags & VTX_F_COMPLETE);
1460
        }
1461
1462
        /* Check store limit */
1463 461664
        while (vslq->n_outstanding > vslq->vsl->L_opt &&
1464 0
            !(VTAILQ_EMPTY(&vslq->incomplete))) {
1465 0
                vtx = VTAILQ_FIRST(&vslq->incomplete);
1466 0
                CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
1467 0
                vtx_force(vslq, vtx, "store overflow");
1468 0
                AN(vtx->flags & VTX_F_COMPLETE);
1469 0
                i = vslq_process_ready(vslq, func, priv);
1470 0
                if (i)
1471
                        /* User return code */
1472 0
                        return (i);
1473
        }
1474
1475
        /* Check ready list */
1476 461664
        if (!VTAILQ_EMPTY(&vslq->ready)) {
1477 39488
                i = vslq_process_ready(vslq, func, priv);
1478 39488
                if (i)
1479
                        /* User return code */
1480 7000
                        return (i);
1481 32488
        }
1482
1483 454664
        return (vsl_more);
1484 1565375
}
1485
1486
/* Flush any incomplete vtx held on to. Do callbacks if func != NULL */
1487
int
1488 13840
VSLQ_Flush(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
1489
{
1490
        struct vtx *vtx;
1491
1492 13840
        CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
1493
1494 19641
        while (!VTAILQ_EMPTY(&vslq->incomplete)) {
1495 5801
                vtx = VTAILQ_FIRST(&vslq->incomplete);
1496 5801
                CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
1497 5801
                AZ(vtx->flags & VTX_F_COMPLETE);
1498 5801
                vtx_force(vslq, vtx, "flush");
1499
        }
1500
1501 13840
        return (vslq_process_ready(vslq, func, priv));
1502
}