varnish-cache/lib/libvarnishapi/vsl_dispatch.c
1
/*-
2
 * Copyright (c) 2006 Verdens Gang AS
3
 * Copyright (c) 2006-2015 Varnish Software AS
4
 * All rights reserved.
5
 *
6
 * Author: Martin Blix Grydeland <martin@varnish-software.com>
7
 *
8
 * Redistribution and use in source and binary forms, with or without
9
 * modification, are permitted provided that the following conditions
10
 * are met:
11
 * 1. Redistributions of source code must retain the above copyright
12
 *    notice, this list of conditions and the following disclaimer.
13
 * 2. Redistributions in binary form must reproduce the above copyright
14
 *    notice, this list of conditions and the following disclaimer in the
15
 *    documentation and/or other materials provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
21
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27
 * SUCH DAMAGE.
28
 *
29
 */
30
31
#include "config.h"
32
33
#include <stdarg.h>
34
#include <stdint.h>
35
#include <stdio.h>
36
#include <stdlib.h>
37
#include <string.h>
38
39
#include "vdef.h"
40
#include "vas.h"
41
#include "miniobj.h"
42
43
#include "vqueue.h"
44
#include "vre.h"
45
#include "vtim.h"
46
#include "vtree.h"
47
48
#include "vapi/vsl.h"
49
50
#include "vsl_api.h"
51
52
/* Number of retired vtx structures kept on the VSLQ cache for reuse */
#define VTX_CACHE 10
/* Initial allocation (in 32-bit words) of a buf chunk's data buffer */
#define VTX_BUFSIZE_MIN 64
/* Number of preallocated shm-reference chunks embedded in each vtx */
#define VTX_SHMCHUNKS 3
55
56
/* Textual names of transaction types, as they appear in Begin/Link
   record payloads (see vtx_parse_link()) */
static const char * const vsl_t_names[VSL_t__MAX] = {
	[VSL_t_unknown]	= "unknown",
	[VSL_t_sess]	= "sess",
	[VSL_t_req]	= "req",
	[VSL_t_bereq]	= "bereq",
	[VSL_t_raw]	= "raw",
};
63
64
/* Textual names of transaction reasons, as they appear in Begin/Link
   record payloads (see vtx_parse_link()) */
static const char * const vsl_r_names[VSL_r__MAX] = {
	[VSL_r_unknown]	= "unknown",
	[VSL_r_http_1]	= "HTTP/1",
	[VSL_r_rxreq]	= "rxreq",
	[VSL_r_esi]	= "esi",
	[VSL_r_restart]	= "restart",
	[VSL_r_pass]	= "pass",
	[VSL_r_fetch]	= "fetch",
	[VSL_r_bgfetch]	= "bgfetch",
	[VSL_r_pipe]	= "pipe",
};
75
76
struct vtx;
77
78
struct vslc_raw {
79
        unsigned                magic;
80
#define VSLC_RAW_MAGIC          0x247EBD44
81
82
        struct VSL_cursor       cursor;
83
84
        const uint32_t          *ptr;
85
};
86
87
struct synth {
88
        unsigned                magic;
89
#define SYNTH_MAGIC             0xC654479F
90
91
        VTAILQ_ENTRY(synth)     list;
92
        size_t                  offset;
93
        uint32_t                data[2 + 64 / sizeof (uint32_t)];
94
};
95
VTAILQ_HEAD(synthhead, synth);
96
97
/* Storage class of a chunk of log records */
enum chunk_t {
	chunk_t__unassigned,
	chunk_t_shm,	/* Records referenced in place in shared memory */
	chunk_t_buf,	/* Records copied into a malloc'ed buffer */
};
102
103
struct chunk {
104
        unsigned                                magic;
105
#define CHUNK_MAGIC                             0x48DC0194
106
        enum chunk_t                            type;
107
        union {
108
                struct {
109
                        struct VSLC_ptr         start;
110
                        VTAILQ_ENTRY(chunk)     shmref;
111
                } shm;
112
                struct {
113
                        uint32_t                *data;
114
                        size_t                  space;
115
                } buf;
116
        };
117
        size_t                                  len;
118
        struct vtx                              *vtx;
119
        VTAILQ_ENTRY(chunk)                     list;
120
};
121
VTAILQ_HEAD(chunkhead, chunk);
122
123
struct vslc_vtx {
124
        unsigned                magic;
125
#define VSLC_VTX_MAGIC          0x74C6523F
126
127
        struct VSL_cursor       cursor;
128
129
        struct vtx              *vtx;
130
        struct synth            *synth;
131
        struct chunk            *chunk;
132
        size_t                  chunkstart;
133
        size_t                  offset;
134
};
135
136
struct vtx_key {
137
        unsigned                vxid;
138
        VRB_ENTRY(vtx_key)      entry;
139
};
140
VRB_HEAD(vtx_tree, vtx_key);
141
142
struct vtx {
143
        struct vtx_key          key;
144
        unsigned                magic;
145
#define VTX_MAGIC               0xACC21D09
146
        VTAILQ_ENTRY(vtx)       list_child;
147
        VTAILQ_ENTRY(vtx)       list_vtx;
148
149
        double                  t_start;
150
        unsigned                flags;
151
#define VTX_F_BEGIN             0x1 /* Begin record processed */
152
#define VTX_F_END               0x2 /* End record processed */
153
#define VTX_F_COMPLETE          0x4 /* Marked complete. No new children
154
                                       should be appended */
155
#define VTX_F_READY             0x8 /* This vtx and all it's children are
156
                                       complete */
157
158
        enum VSL_transaction_e  type;
159
        enum VSL_reason_e       reason;
160
161
        struct vtx              *parent;
162
        VTAILQ_HEAD(,vtx)       child;
163
        unsigned                n_child;
164
        unsigned                n_childready;
165
        unsigned                n_descend;
166
167
        VTAILQ_HEAD(,synth)     synth;
168
169
        struct chunk            shmchunks[VTX_SHMCHUNKS];
170
        struct chunkhead        shmchunks_free;
171
172
        struct chunkhead        chunks;
173
        size_t                  len;
174
175
        struct vslc_vtx         c;
176
};
177
178
struct VSLQ {
179
        unsigned                magic;
180
#define VSLQ_MAGIC              0x23A8BE97
181
182
        struct VSL_data         *vsl;
183
        struct VSL_cursor       *c;
184
        struct vslq_query       *query;
185
186
        enum VSL_grouping_e     grouping;
187
188
        /* Structured mode */
189
        struct vtx_tree         tree;
190
        VTAILQ_HEAD(,vtx)       ready;
191
        VTAILQ_HEAD(,vtx)       incomplete;
192
        unsigned                n_outstanding;
193
        struct chunkhead        shmrefs;
194
        VTAILQ_HEAD(,vtx)       cache;
195
        unsigned                n_cache;
196
197
        /* Raw mode */
198
        struct {
199
                struct vslc_raw         c;
200
                struct VSL_transaction  trans;
201
                struct VSL_transaction  *ptrans[2];
202
                struct VSLC_ptr         start;
203
                ssize_t                 len;
204
                size_t                  offset;
205
        } raw;
206
};
207
208
/* Forward declarations of diagnostics helpers used before definition */
static void vtx_synth_rec(struct vtx *vtx, unsigned tag, const char *fmt, ...);
/*lint -esym(534, vtx_diag) */
static int vtx_diag(struct vtx *vtx, const char *msg);
/*lint -esym(534, vtx_diag_tag) */
static int vtx_diag_tag(struct vtx *vtx, const uint32_t *ptr,
    const char *reason);
215
static inline int
216 55142
vtx_keycmp(const struct vtx_key *a, const struct vtx_key *b)
217
{
218 55142
        if (a->vxid < b->vxid)
219 1019
                return (-1);
220 54123
        if (a->vxid > b->vxid)
221 27293
                return (1);
222 26830
        return (0);
223
}
224
225
VRB_PROTOTYPE_STATIC(vtx_tree, vtx_key, entry, vtx_keycmp)
226 65380
VRB_GENERATE_STATIC(vtx_tree, vtx_key, entry, vtx_keycmp)
227
228
static enum vsl_status v_matchproto_(vslc_next_f)
229 87301
vslc_raw_next(const struct VSL_cursor *cursor)
230
{
231
        struct vslc_raw *c;
232
233 87301
        CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_RAW_MAGIC);
234 87301
        assert(&c->cursor == cursor);
235
236 87301
        AN(c->ptr);
237 87301
        if (c->cursor.rec.ptr == NULL) {
238 43981
                c->cursor.rec.ptr = c->ptr;
239 43981
                return (vsl_more);
240
        } else {
241 43320
                c->cursor.rec.ptr = NULL;
242 43320
                return (vsl_end);
243
        }
244
}
245
246
static enum vsl_status v_matchproto_(vslc_reset_f)
247 3408
vslc_raw_reset(const struct VSL_cursor *cursor)
248
{
249
        struct vslc_raw *c;
250
251 3408
        CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_RAW_MAGIC);
252 3408
        assert(&c->cursor == cursor);
253
254 3408
        AN(c->ptr);
255 3408
        c->cursor.rec.ptr = NULL;
256
257 3408
        return (vsl_end);
258
}
259
260
static const struct vslc_tbl vslc_raw_tbl = {
261
        .magic  = VSLC_TBL_MAGIC,
262
        .delete = NULL,
263
        .next   = vslc_raw_next,
264
        .reset  = vslc_raw_reset,
265
        .check  = NULL,
266
};
267
268
static enum vsl_status v_matchproto_(vslc_next_f)
269 473967
vslc_vtx_next(const struct VSL_cursor *cursor)
270
{
271
        struct vslc_vtx *c;
272
        const uint32_t *ptr;
273
274 473967
        CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VTX_MAGIC);
275 473967
        assert(&c->cursor == cursor);
276 473967
        CHECK_OBJ_NOTNULL(c->vtx, VTX_MAGIC);
277
278
        do {
279 508986
                CHECK_OBJ_ORNULL(c->synth, SYNTH_MAGIC);
280 508986
                if (c->synth != NULL && c->synth->offset == c->offset) {
281
                        /* We're at the offset of the next synth record,
282
                           point to it and advance the pointer */
283 2472
                        c->cursor.rec.ptr = c->synth->data;
284 2472
                        c->synth = VTAILQ_NEXT(c->synth, list);
285
                } else {
286 506514
                        assert(c->offset <= c->vtx->len);
287 506514
                        if (c->offset == c->vtx->len)
288 33412
                                return (vsl_end);
289
290
                        /* Advance chunk pointer */
291 473102
                        if (c->chunk == NULL) {
292 16742
                                c->chunk = VTAILQ_FIRST(&c->vtx->chunks);
293 16742
                                c->chunkstart = 0;
294
                        }
295 473102
                        CHECK_OBJ_NOTNULL(c->chunk, CHUNK_MAGIC);
296 961442
                        while (c->offset >= c->chunkstart + c->chunk->len) {
297 15238
                                c->chunkstart += c->chunk->len;
298 15238
                                c->chunk = VTAILQ_NEXT(c->chunk, list);
299 15238
                                CHECK_OBJ_NOTNULL(c->chunk, CHUNK_MAGIC);
300
                        }
301
302
                        /* Point to the next stored record */
303 473102
                        if (c->chunk->type == chunk_t_shm)
304 418431
                                ptr = c->chunk->shm.start.ptr;
305
                        else {
306 54671
                                assert(c->chunk->type == chunk_t_buf);
307 54671
                                ptr = c->chunk->buf.data;
308
                        }
309 473102
                        c->cursor.rec.ptr = ptr + c->offset - c->chunkstart;
310 473102
                        c->offset += VSL_NEXT(c->cursor.rec.ptr) -
311
                            c->cursor.rec.ptr;
312
                }
313 475574
        } while (VSL_TAG(c->cursor.rec.ptr) == SLT__Batch);
314
315 440555
        return (vsl_more);
316
}
317
318
static enum vsl_status v_matchproto_(vslc_reset_f)
319 23961
vslc_vtx_reset(const struct VSL_cursor *cursor)
320
{
321
        struct vslc_vtx *c;
322
323 23961
        CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VTX_MAGIC);
324 23961
        assert(&c->cursor == cursor);
325 23961
        CHECK_OBJ_NOTNULL(c->vtx, VTX_MAGIC);
326 23961
        c->synth = VTAILQ_FIRST(&c->vtx->synth);
327 23961
        c->chunk = NULL;
328 23961
        c->chunkstart = 0;
329 23961
        c->offset = 0;
330 23961
        c->cursor.rec.ptr = NULL;
331
332 23961
        return (vsl_end);
333
}
334
335
static const struct vslc_tbl vslc_vtx_tbl = {
336
        .magic  = VSLC_TBL_MAGIC,
337
        .delete = NULL,
338
        .next   = vslc_vtx_next,
339
        .reset  = vslc_vtx_reset,
340
        .check  = NULL,
341
};
342
343
/* Create a buf chunk */
344
static struct chunk *
345 2472
chunk_newbuf(struct vtx *vtx, const uint32_t *ptr, size_t len)
346
{
347
        struct chunk *chunk;
348
349 2472
        ALLOC_OBJ(chunk, CHUNK_MAGIC);
350 2472
        XXXAN(chunk);
351 2472
        chunk->type = chunk_t_buf;
352 2472
        chunk->vtx = vtx;
353 2472
        chunk->buf.space = VTX_BUFSIZE_MIN;
354 4944
        while (chunk->buf.space < len)
355 0
                chunk->buf.space *= 2;
356 2472
        chunk->buf.data = malloc(sizeof (uint32_t) * chunk->buf.space);
357 2472
        AN(chunk->buf.data);
358 2472
        memcpy(chunk->buf.data, ptr, sizeof (uint32_t) * len);
359 2472
        chunk->len = len;
360 2472
        return (chunk);
361
}
362
363
/* Free a buf chunk */
364
static void
365 2472
chunk_freebuf(struct chunk **pchunk)
366
{
367
368 2472
        CHECK_OBJ_NOTNULL(*pchunk, CHUNK_MAGIC);
369 2472
        assert((*pchunk)->type == chunk_t_buf);
370 2472
        free((*pchunk)->buf.data);
371 2472
        FREE_OBJ(*pchunk);
372 2471
        *pchunk = NULL;
373 2471
}
374
375
/* Append a set of records to a chunk */
376
static void
377 16134
chunk_appendbuf(struct chunk *chunk, const uint32_t *ptr, size_t len)
378
{
379
380 16134
        CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
381 16134
        assert(chunk->type == chunk_t_buf);
382 16134
        if (chunk->buf.space < chunk->len + len) {
383 3549
                while (chunk->buf.space < chunk->len + len)
384 1183
                        chunk->buf.space *= 2;
385 1183
                chunk->buf.data = realloc(chunk->buf.data,
386 1183
                    sizeof (uint32_t) * chunk->buf.space);
387
        }
388 16134
        memcpy(chunk->buf.data + chunk->len, ptr, sizeof (uint32_t) * len);
389 16134
        chunk->len += len;
390 16134
}
391
392
/* Transform a shm chunk to a buf chunk */
393
static void
394 0
chunk_shm_to_buf(struct VSLQ *vslq, struct chunk *chunk)
395
{
396
        struct vtx *vtx;
397
        struct chunk *buf;
398
399 0
        CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
400 0
        assert(chunk->type == chunk_t_shm);
401 0
        vtx = chunk->vtx;
402 0
        CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
403
404 0
        buf = VTAILQ_PREV(chunk, chunkhead, list);
405 0
        if (buf != NULL && buf->type == chunk_t_buf)
406
                /* Previous is a buf chunk, append to it */
407 0
                chunk_appendbuf(buf, chunk->shm.start.ptr, chunk->len);
408
        else {
409
                /* Create a new buf chunk and insert it before this */
410 0
                buf = chunk_newbuf(vtx, chunk->shm.start.ptr, chunk->len);
411 0
                AN(buf);
412 0
                VTAILQ_INSERT_BEFORE(chunk, buf, list);
413
        }
414
415
        /* Reset cursor chunk pointer, vslc_vtx_next will set it correctly */
416 0
        vtx->c.chunk = NULL;
417
418
        /* Remove from the shmref list and vtx, and put chunk back
419
           on the free list */
420 0
        VTAILQ_REMOVE(&vslq->shmrefs, chunk, shm.shmref);
421 0
        VTAILQ_REMOVE(&vtx->chunks, chunk, list);
422 0
        VTAILQ_INSERT_HEAD(&vtx->shmchunks_free, chunk, list);
423 0
}
424
425
/* Append a set of records to a vtx structure */
426
static void
427 34073
vtx_append(struct VSLQ *vslq, struct vtx *vtx, const struct VSLC_ptr *start,
428
    size_t len)
429
{
430
        struct chunk *chunk;
431
        enum vsl_check i;
432
433 34073
        AN(vtx);
434 34073
        if (len == 0)
435 0
                return;
436 34073
        AN(start);
437
438 34073
        i = VSL_Check(vslq->c, start);
439 34078
        if (i == vsl_check_valid && !VTAILQ_EMPTY(&vtx->shmchunks_free)) {
440
                /* Shmref it */
441 15472
                chunk = VTAILQ_FIRST(&vtx->shmchunks_free);
442 15472
                CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
443 15472
                assert(chunk->type == chunk_t_shm);
444 15472
                assert(chunk->vtx == vtx);
445 15472
                VTAILQ_REMOVE(&vtx->shmchunks_free, chunk, list);
446 15472
                chunk->shm.start = *start;
447 15472
                chunk->len = len;
448 15472
                VTAILQ_INSERT_TAIL(&vtx->chunks, chunk, list);
449
450
                /* Append to shmref list */
451 15472
                VTAILQ_INSERT_TAIL(&vslq->shmrefs, chunk, shm.shmref);
452
        } else {
453 18606
                assert(i != vsl_check_e_inval);
454
                /* Buffer it */
455 18606
                chunk = VTAILQ_LAST(&vtx->chunks, chunkhead);
456 18606
                CHECK_OBJ_ORNULL(chunk, CHUNK_MAGIC);
457 18606
                if (chunk != NULL && chunk->type == chunk_t_buf) {
458
                        /* Tail is a buf chunk, append to that */
459 16134
                        chunk_appendbuf(chunk, start->ptr, len);
460
                } else {
461
                        /* Append new buf chunk */
462 2472
                        chunk = chunk_newbuf(vtx, start->ptr, len);
463 2472
                        AN(chunk);
464 2472
                        VTAILQ_INSERT_TAIL(&vtx->chunks, chunk, list);
465
                }
466
        }
467 34063
        vtx->len += len;
468
}
469
470
/* Allocate a new vtx structure */
471
static struct vtx *
472 8695
vtx_new(struct VSLQ *vslq)
473
{
474
        struct vtx *vtx;
475
        int i;
476
477 8695
        AN(vslq);
478 8695
        if (vslq->n_cache) {
479 4949
                AZ(VTAILQ_EMPTY(&vslq->cache));
480 4949
                vtx = VTAILQ_FIRST(&vslq->cache);
481 4949
                VTAILQ_REMOVE(&vslq->cache, vtx, list_child);
482 4949
                vslq->n_cache--;
483
        } else {
484 3746
                ALLOC_OBJ(vtx, VTX_MAGIC);
485 3746
                AN(vtx);
486
487 3746
                VTAILQ_INIT(&vtx->child);
488 3746
                VTAILQ_INIT(&vtx->shmchunks_free);
489 14984
                for (i = 0; i < VTX_SHMCHUNKS; i++) {
490 11238
                        vtx->shmchunks[i].magic = CHUNK_MAGIC;
491 11238
                        vtx->shmchunks[i].type = chunk_t_shm;
492 11238
                        vtx->shmchunks[i].vtx = vtx;
493 11238
                        VTAILQ_INSERT_TAIL(&vtx->shmchunks_free,
494
                            &vtx->shmchunks[i], list);
495
                }
496 3746
                VTAILQ_INIT(&vtx->chunks);
497 3746
                VTAILQ_INIT(&vtx->synth);
498 3746
                vtx->c.magic = VSLC_VTX_MAGIC;
499 3746
                vtx->c.vtx = vtx;
500 3746
                vtx->c.cursor.priv_tbl = &vslc_vtx_tbl;
501 3746
                vtx->c.cursor.priv_data = &vtx->c;
502
        }
503
504 8695
        CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
505 8695
        vtx->key.vxid = 0;
506 8695
        vtx->t_start = VTIM_mono();
507 8695
        vtx->flags = 0;
508 8695
        vtx->type = VSL_t_unknown;
509 8695
        vtx->reason = VSL_r_unknown;
510 8695
        vtx->parent = NULL;
511 8695
        vtx->n_child = 0;
512 8695
        vtx->n_childready = 0;
513 8695
        vtx->n_descend = 0;
514 8695
        vtx->len = 0;
515 8695
        AN(vslc_vtx_reset(&vtx->c.cursor) == vsl_end);
516
517 8694
        return (vtx);
518
}
519
520
/* Disuse a vtx and all it's children, freeing any resources held. Free or
521
   cache the vtx for later use */
522
static void
523 8695
vtx_retire(struct VSLQ *vslq, struct vtx **pvtx)
524
{
525
        struct vtx *vtx;
526
        struct vtx *child;
527
        struct synth *synth;
528
        struct chunk *chunk;
529
530 8695
        AN(vslq);
531 8695
        TAKE_OBJ_NOTNULL(vtx, pvtx, VTX_MAGIC);
532
533 8695
        AN(vtx->flags & VTX_F_COMPLETE);
534 8695
        AN(vtx->flags & VTX_F_READY);
535 8695
        AZ(vtx->parent);
536
537 18218
        while (!VTAILQ_EMPTY(&vtx->child)) {
538 828
                child = VTAILQ_FIRST(&vtx->child);
539 828
                assert(child->parent == vtx);
540 828
                AN(vtx->n_child);
541 828
                assert(vtx->n_descend >= child->n_descend + 1);
542 828
                VTAILQ_REMOVE(&vtx->child, child, list_child);
543 828
                child->parent = NULL;
544 828
                vtx->n_child--;
545 828
                vtx->n_descend -= child->n_descend + 1;
546 828
                vtx_retire(vslq, &child);
547 828
                AZ(child);
548
        }
549 8695
        AZ(vtx->n_child);
550 8695
        AZ(vtx->n_descend);
551 8695
        vtx->n_childready = 0;
552 8695
        AN(VRB_REMOVE(vtx_tree, &vslq->tree, &vtx->key));
553 8692
        vtx->key.vxid = 0;
554 8692
        vtx->flags = 0;
555
556 19808
        while (!VTAILQ_EMPTY(&vtx->synth)) {
557 2424
                synth = VTAILQ_FIRST(&vtx->synth);
558 2424
                CHECK_OBJ_NOTNULL(synth, SYNTH_MAGIC);
559 2424
                VTAILQ_REMOVE(&vtx->synth, synth, list);
560 2424
                FREE_OBJ(synth);
561
        }
562
563 35328
        while (!VTAILQ_EMPTY(&vtx->chunks)) {
564 17942
                chunk = VTAILQ_FIRST(&vtx->chunks);
565 17942
                CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
566 17945
                VTAILQ_REMOVE(&vtx->chunks, chunk, list);
567 17945
                if (chunk->type == chunk_t_shm) {
568 15473
                        VTAILQ_REMOVE(&vslq->shmrefs, chunk, shm.shmref);
569 15473
                        VTAILQ_INSERT_HEAD(&vtx->shmchunks_free, chunk, list);
570
                } else {
571 2472
                        assert(chunk->type == chunk_t_buf);
572 2472
                        chunk_freebuf(&chunk);
573 2471
                        AZ(chunk);
574
                }
575
        }
576 8694
        vtx->len = 0;
577 8694
        AN(vslq->n_outstanding);
578 8694
        vslq->n_outstanding--;
579
580 8694
        if (vslq->n_cache < VTX_CACHE) {
581 8694
                VTAILQ_INSERT_HEAD(&vslq->cache, vtx, list_child);
582 8694
                vslq->n_cache++;
583
        } else {
584 0
                FREE_OBJ(vtx);
585
        }
586 8694
}
587
588
/* Lookup a vtx by vxid from the managed list */
589
static struct vtx *
590 35519
vtx_lookup(const struct VSLQ *vslq, unsigned vxid)
591
{
592
        struct vtx_key lkey, *key;
593
        struct vtx *vtx;
594
595 35519
        AN(vslq);
596 35519
        lkey.vxid = vxid;
597 35519
        key = VRB_FIND(vtx_tree, &vslq->tree, &lkey);
598 35520
        if (key == NULL)
599 8694
                return (NULL);
600 26826
        CAST_OBJ_NOTNULL(vtx, (void *)key, VTX_MAGIC);
601 26826
        return (vtx);
602
}
603
604
/* Insert a new vtx into the managed list */
605
static struct vtx *
606 8694
vtx_add(struct VSLQ *vslq, unsigned vxid)
607
{
608
        struct vtx *vtx;
609
610 8694
        AN(vslq);
611 8694
        vtx = vtx_new(vslq);
612 8694
        AN(vtx);
613 8694
        vtx->key.vxid = vxid;
614 8694
        AZ(VRB_INSERT(vtx_tree, &vslq->tree, &vtx->key));
615 8694
        VTAILQ_INSERT_TAIL(&vslq->incomplete, vtx, list_vtx);
616 8694
        vslq->n_outstanding++;
617 8694
        return (vtx);
618
}
619
620
/* Mark a vtx complete, update child counters and if possible push it or
621
   it's top parent to the ready state */
622
static void
623 8695
vtx_mark_complete(struct VSLQ *vslq, struct vtx *vtx)
624
{
625
626 8695
        AN(vslq);
627 8695
        AN(vtx->flags & VTX_F_END);
628 8695
        AZ(vtx->flags & VTX_F_COMPLETE);
629
630 8695
        if (vtx->type == VSL_t_unknown)
631 0
                vtx_diag(vtx, "vtx of unknown type marked complete");
632
633 8695
        vtx->flags |= VTX_F_COMPLETE;
634 8695
        VTAILQ_REMOVE(&vslq->incomplete, vtx, list_vtx);
635
636
        while (1) {
637 10351
                AZ(vtx->flags & VTX_F_READY);
638 18299
                if (vtx->flags & VTX_F_COMPLETE &&
639 8776
                    vtx->n_child == vtx->n_childready)
640 8695
                        vtx->flags |= VTX_F_READY;
641
                else
642 828
                        return;
643 8695
                if (vtx->parent == NULL) {
644
                        /* Top level vtx ready */
645 7867
                        VTAILQ_INSERT_TAIL(&vslq->ready, vtx, list_vtx);
646 7867
                        return;
647
                }
648 828
                vtx = vtx->parent;
649 828
                vtx->n_childready++;
650 828
                assert(vtx->n_child >= vtx->n_childready);
651
        }
652
}
653
654
/* Add a child to a parent, and update child counters */
655
static void
656 828
vtx_set_parent(struct vtx *parent, struct vtx *child)
657
{
658
659 828
        CHECK_OBJ_NOTNULL(parent, VTX_MAGIC);
660 828
        CHECK_OBJ_NOTNULL(child, VTX_MAGIC);
661 828
        assert(parent != child);
662 828
        AZ(parent->flags & VTX_F_COMPLETE);
663 828
        AZ(child->flags & VTX_F_COMPLETE);
664 828
        AZ(child->parent);
665 828
        child->parent = parent;
666 828
        VTAILQ_INSERT_TAIL(&parent->child, child, list_child);
667 828
        parent->n_child++;
668
        do
669 1038
                parent->n_descend += 1 + child->n_descend;
670 1038
        while ((parent = parent->parent) != NULL);
671 828
}
672
673
/* Parse a begin or link record. Returns the number of elements that was
674
   successfully parsed. */
675
static int
676 14442
vtx_parse_link(const char *str, enum VSL_transaction_e *ptype,
677
    unsigned *pvxid, enum VSL_reason_e *preason)
678
{
679
        char type[16], reason[16];
680
        unsigned vxid;
681
        int i;
682
        enum VSL_transaction_e et;
683
        enum VSL_reason_e er;
684
685 14442
        AN(str);
686 14442
        AN(ptype);
687 14442
        AN(pvxid);
688 14442
        AN(preason);
689
690 14442
        i = sscanf(str, "%15s %u %15s", type, &vxid, reason);
691 14442
        if (i < 1)
692 0
                return (0);
693
694
        /* transaction type */
695 45420
        for (et = VSL_t_unknown; et < VSL_t__MAX; et++)
696 45419
                if (!strcmp(type, vsl_t_names[et]))
697 14441
                        break;
698 14442
        if (et >= VSL_t__MAX)
699 0
                et = VSL_t_unknown;
700 14442
        *ptype = et;
701 14442
        if (i == 1)
702 0
                return (1);
703
704
        /* vxid */
705 14442
        assert((vxid & ~VSL_IDENTMASK) == 0);
706 14442
        *pvxid = vxid;
707 14442
        if (i == 2)
708 0
                return (2);
709
710
        /* transaction reason */
711 63585
        for (er = VSL_r_unknown; er < VSL_r__MAX; er++)
712 63220
                if (!strcmp(reason, vsl_r_names[er]))
713 14077
                        break;
714 14442
        if (er >= VSL_r__MAX)
715 360
                er = VSL_r_unknown;
716 14442
        *preason = er;
717 14442
        return (3);
718
}
719
720
/* Parse and process a begin record */
721
static int
722 8695
vtx_scan_begin(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
723
{
724
        int i;
725
        enum VSL_transaction_e type;
726
        enum VSL_reason_e reason;
727
        unsigned p_vxid;
728
        struct vtx *p_vtx;
729
730 8695
        assert(VSL_TAG(ptr) == SLT_Begin);
731
732 8695
        AZ(vtx->flags & VTX_F_READY);
733
734 8695
        i = vtx_parse_link(VSL_CDATA(ptr), &type, &p_vxid, &reason);
735 8695
        if (i != 3)
736 0
                return (vtx_diag_tag(vtx, ptr, "parse error"));
737 8695
        if (type == VSL_t_unknown)
738 0
                (void)vtx_diag_tag(vtx, ptr, "unknown vxid type");
739
740
        /* Check/set vtx type */
741 8695
        if (vtx->type != VSL_t_unknown && vtx->type != type)
742
                /* Type not matching the one previously set by a link
743
                   record */
744 0
                (void)vtx_diag_tag(vtx, ptr, "type mismatch");
745 8695
        vtx->type = type;
746 8695
        vtx->reason = reason;
747
748 8695
        if (p_vxid == 0)
749
                /* Zero means no parent */
750 2976
                return (0);
751 5719
        if (p_vxid == vtx->key.vxid)
752 0
                return (vtx_diag_tag(vtx, ptr, "link to self"));
753
754 5719
        if (vslq->grouping == VSL_g_vxid)
755 4603
                return (0);     /* No links */
756 1536
        if (vslq->grouping == VSL_g_request && vtx->type == VSL_t_req &&
757 420
            vtx->reason == VSL_r_rxreq)
758 288
                return (0);     /* No links */
759
760 828
        if (vtx->parent != NULL) {
761 210
                if (vtx->parent->key.vxid != p_vxid) {
762
                        /* This vtx already belongs to a different
763
                           parent */
764 0
                        return (vtx_diag_tag(vtx, ptr, "link mismatch"));
765
                } else
766
                        /* Link already exists */
767 210
                        return (0);
768
        }
769
770 618
        p_vtx = vtx_lookup(vslq, p_vxid);
771 618
        if (p_vtx == NULL) {
772
                /* Not seen parent yet. Create it. */
773 330
                p_vtx = vtx_add(vslq, p_vxid);
774 330
                AN(p_vtx);
775
        } else {
776 288
                CHECK_OBJ_NOTNULL(p_vtx, VTX_MAGIC);
777 288
                if (p_vtx->flags & VTX_F_COMPLETE)
778 0
                        return (vtx_diag_tag(vtx, ptr, "link too late"));
779
        }
780
781
        /* Create link */
782 618
        vtx_set_parent(p_vtx, vtx);
783
784 618
        return (0);
785
}
786
787
/* Parse and process a link record */
788
static int
789 5748
vtx_scan_link(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
790
{
791
        int i;
792
        enum VSL_transaction_e c_type;
793
        enum VSL_reason_e c_reason;
794
        unsigned c_vxid;
795
        struct vtx *c_vtx;
796
797 5748
        assert(VSL_TAG(ptr) == SLT_Link);
798
799 5748
        AZ(vtx->flags & VTX_F_READY);
800
801 5748
        i = vtx_parse_link(VSL_CDATA(ptr), &c_type, &c_vxid, &c_reason);
802 5748
        if (i != 3)
803 0
                return (vtx_diag_tag(vtx, ptr, "parse error"));
804 5748
        if (c_type == VSL_t_unknown)
805 0
                (void)vtx_diag_tag(vtx, ptr, "unknown vxid type");
806
807 5748
        if (vslq->grouping == VSL_g_vxid)
808 4632
                return (0);     /* No links */
809 1116
        if (vslq->grouping == VSL_g_request && vtx->type == VSL_t_sess)
810 288
                return (0);     /* No links */
811
812 828
        if (c_vxid == 0)
813 0
                return (vtx_diag_tag(vtx, ptr, "illegal link vxid"));
814 828
        if (c_vxid == vtx->key.vxid)
815 0
                return (vtx_diag_tag(vtx, ptr, "link to self"));
816
817
        /* Lookup and check child vtx */
818 828
        c_vtx = vtx_lookup(vslq, c_vxid);
819 828
        if (c_vtx == NULL) {
820
                /* Child not seen before. Insert it and create link */
821 210
                c_vtx = vtx_add(vslq, c_vxid);
822 210
                AN(c_vtx);
823 210
                AZ(c_vtx->parent);
824 210
                c_vtx->type = c_type;
825 210
                c_vtx->reason = c_reason;
826 210
                vtx_set_parent(vtx, c_vtx);
827 210
                return (0);
828
        }
829
830 618
        CHECK_OBJ_NOTNULL(c_vtx, VTX_MAGIC);
831 618
        if (c_vtx->parent == vtx)
832
                /* Link already exists */
833 618
                return (0);
834 0
        if (c_vtx->parent != NULL && c_vtx->parent != vtx)
835 0
                return (vtx_diag_tag(vtx, ptr, "duplicate link"));
836 0
        if (c_vtx->flags & VTX_F_COMPLETE)
837 0
                return (vtx_diag_tag(vtx, ptr, "link too late"));
838 0
        if (c_vtx->type != VSL_t_unknown && c_vtx->type != c_type)
839 0
                (void)vtx_diag_tag(vtx, ptr, "type mismatch");
840
841 0
        c_vtx->type = c_type;
842 0
        c_vtx->reason = c_reason;
843 0
        vtx_set_parent(vtx, c_vtx);
844 0
        return (0);
845
}
846
847
/* Scan the records of a vtx, performing processing actions on specific
848
   records */
849
static void
850 36483
vtx_scan(struct VSLQ *vslq, struct vtx *vtx)
851
{
852
        const uint32_t *ptr;
853
        enum VSL_tag_e tag;
854
855 544593
        while (!(vtx->flags & VTX_F_COMPLETE) &&
856 249647
            vslc_vtx_next(&vtx->c.cursor) == 1) {
857 221869
                ptr = vtx->c.cursor.rec.ptr;
858 221869
                if (VSL_ID(ptr) != vtx->key.vxid) {
859 -130
                        (void)vtx_diag_tag(vtx, ptr, "vxid mismatch");
860 0
                        continue;
861
                }
862
863 221999
                tag = VSL_TAG(ptr);
864 221999
                assert(tag != SLT__Batch);
865
866 221999
                switch (tag) {
867
                case SLT_Begin:
868 8695
                        if (vtx->flags & VTX_F_BEGIN)
869 0
                                (void)vtx_diag_tag(vtx, ptr, "duplicate begin");
870
                        else {
871 8695
                                (void)vtx_scan_begin(vslq, vtx, ptr);
872 8695
                                vtx->flags |= VTX_F_BEGIN;
873
                        }
874 8695
                        break;
875
876
                case SLT_Link:
877 5748
                        (void)vtx_scan_link(vslq, vtx, ptr);
878 5747
                        break;
879
880
                case SLT_End:
881 8695
                        AZ(vtx->flags & VTX_F_END);
882 8695
                        vtx->flags |= VTX_F_END;
883 8695
                        vtx_mark_complete(vslq, vtx);
884 8695
                        break;
885
886
                default:
887 198861
                        break;
888
                }
889
        }
890 36594
}
891
892
/* Force a vtx into complete status by synthing the necessary outstanding
893
   records */
894
static void
895 1212
vtx_force(struct VSLQ *vslq, struct vtx *vtx, const char *reason)
896
{
897
898 1212
        AZ(vtx->flags & VTX_F_COMPLETE);
899 1212
        AZ(vtx->flags & VTX_F_READY);
900 1212
        vtx_scan(vslq, vtx);
901 1212
        if (!(vtx->flags & VTX_F_BEGIN))
902 0
                vtx_synth_rec(vtx, SLT_Begin, "%s %u synth",
903 0
                    vsl_t_names[vtx->type], 0);
904 1212
        vtx_diag(vtx, reason);
905 1212
        if (!(vtx->flags & VTX_F_END))
906 1212
                vtx_synth_rec(vtx, SLT_End, "synth");
907 1212
        vtx_scan(vslq, vtx);
908 1212
        AN(vtx->flags & VTX_F_COMPLETE);
909 1212
}
910
911
/* Build transaction array, do the query and callback. Returns 0 or the
   return value from func */
static int
vslq_callback(const struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
    void *priv)
{
	/* One slot for the root plus one per descendant; VLAs are sized
	   from the precomputed n_descend count. ptrans carries an extra
	   slot for its NULL terminator. */
	unsigned n = vtx->n_descend + 1;
	struct vtx *vtxs[n];
	struct VSL_transaction trans[n];
	struct VSL_transaction *ptrans[n + 1];
	unsigned i, j;

	AN(vslq);
	CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
	AN(vtx->flags & VTX_F_READY);
	AN(func);

	/* In session/request grouping, only root transactions of the
	   matching type are reported; other roots are silently dropped. */
	if (vslq->grouping == VSL_g_session &&
	    vtx->type != VSL_t_sess)
		return (0);
	if (vslq->grouping == VSL_g_request &&
	    vtx->type != VSL_t_req)
		return (0);

	/* Build transaction array. Breadth-first walk over the child
	   lists: slot 0 is the root, i is the next free slot, j is the
	   next node whose children have not yet been appended. */
	AN(vslc_vtx_reset(&vtx->c.cursor) == vsl_end);
	vtxs[0] = vtx;
	trans[0].level = 1;
	trans[0].vxid = vtx->key.vxid;
	trans[0].vxid_parent = 0;
	trans[0].type = vtx->type;
	trans[0].reason = vtx->reason;
	trans[0].c = &vtx->c.cursor;
	i = 1;
	j = 0;
	while (j < i) {
		VTAILQ_FOREACH(vtx, &vtxs[j]->child, list_child) {
			assert(i < n);
			AN(vslc_vtx_reset(&vtx->c.cursor) == vsl_end);
			vtxs[i] = vtx;
			if (vtx->reason == VSL_r_restart)
				/* Restarts stay at the same level as parent */
				trans[i].level = trans[j].level;
			else
				trans[i].level = trans[j].level + 1;
			trans[i].vxid = vtx->key.vxid;
			trans[i].vxid_parent = trans[j].vxid;
			trans[i].type = vtx->type;
			trans[i].reason = vtx->reason;
			trans[i].c = &vtx->c.cursor;
			i++;
		}
		j++;
	}
	assert(i == n);

	/* Build NULL-terminated pointer array for query and callback */
	for (i = 0; i < n; i++)
		ptrans[i] = &trans[i];
	ptrans[i] = NULL;

	/* Query test goes here */
	if (vslq->query != NULL && !vslq_runquery(vslq->query, ptrans))
		return (0);

	/* Callback */
	return ((func)(vslq->vsl, ptrans, priv));
}
979
980
/* Create a synthetic log record. The record will be inserted at the
   current cursor offset */
static void
vtx_synth_rec(struct vtx *vtx, unsigned tag, const char *fmt, ...)
{
	struct synth *synth, *it;
	va_list ap;
	char *buf;
	int l, buflen;

	ALLOC_OBJ(synth, SYNTH_MAGIC);
	AN(synth);

	/* data[0] and data[1] hold the record header words (tag/length
	   and vxid); the formatted payload starts at data[2]. */
	buf = (char *)&synth->data[2];
	buflen = sizeof (synth->data) - 2 * sizeof (uint32_t);
	va_start(ap, fmt);
	l = vsnprintf(buf, buflen, fmt, ap);
	assert(l >= 0);
	va_end(ap);
	if (l > buflen - 1)
		l = buflen - 1;	/* clamp to the truncated length */
	buf[l++] = '\0';	/* NUL-terminated */
	synth->data[1] = vtx->key.vxid;
	switch (vtx->type) {
	case VSL_t_req:
		synth->data[1] |= VSL_CLIENTMARKER;
		break;
	case VSL_t_bereq:
		synth->data[1] |= VSL_BACKENDMARKER;
		break;
	default:
		/* raw/unknown transactions carry no side marker */
		break;
	}
	/* Header word: tag in the top byte, payload length (incl. NUL)
	   in the low bits */
	synth->data[0] = (((tag & 0xff) << 24) | l);
	synth->offset = vtx->c.offset;

	VTAILQ_FOREACH_REVERSE(it, &vtx->synth, synthhead, list) {
		/* Make sure the synth list is sorted on offset */
		CHECK_OBJ_NOTNULL(it, SYNTH_MAGIC);
		if (synth->offset >= it->offset)
			break;
	}
	if (it != NULL)
		VTAILQ_INSERT_AFTER(&vtx->synth, it, synth, list);
	else
		VTAILQ_INSERT_HEAD(&vtx->synth, synth, list);

	/* Update cursor so it points at the earliest pending synth */
	CHECK_OBJ_ORNULL(vtx->c.synth, SYNTH_MAGIC);
	if (vtx->c.synth == NULL || vtx->c.synth->offset > synth->offset)
		vtx->c.synth = synth;
}
1032
1033
/* Add a diagnostic SLT_VSL synth record to the vtx. */
1034
static int
1035 1212
vtx_diag(struct vtx *vtx, const char *msg)
1036
{
1037
1038 1212
        vtx_synth_rec(vtx, SLT_VSL, msg);
1039 1212
        return (-1);
1040
}
1041
1042
/* Add a SLT_VSL diag synth record to the vtx. Takes an offending record
1043
   that will be included in the log record */
1044
static int
1045 0
vtx_diag_tag(struct vtx *vtx, const uint32_t *ptr, const char *reason)
1046
{
1047
1048 0
        vtx_synth_rec(vtx, SLT_VSL, "%s (%u:%s \"%.*s\")", reason, VSL_ID(ptr),
1049 0
            VSL_tags[VSL_TAG(ptr)], (int)VSL_LEN(ptr), VSL_CDATA(ptr));
1050 0
        return (-1);
1051
}
1052
1053
struct VSLQ *
1054 2424
VSLQ_New(struct VSL_data *vsl, struct VSL_cursor **cp,
1055
    enum VSL_grouping_e grouping, const char *querystring)
1056
{
1057
        struct vslq_query *query;
1058
        struct VSLQ *vslq;
1059
1060 2424
        CHECK_OBJ_NOTNULL(vsl, VSL_MAGIC);
1061 2424
        if (grouping > VSL_g_session) {
1062 0
                (void)vsl_diag(vsl, "Illegal query grouping");
1063 0
                return (NULL);
1064
        }
1065 2424
        if (querystring != NULL) {
1066 780
                query = vslq_newquery(vsl, grouping, querystring);
1067 780
                if (query == NULL)
1068 108
                        return (NULL);
1069
        } else
1070 1644
                query = NULL;
1071
1072 2316
        ALLOC_OBJ(vslq, VSLQ_MAGIC);
1073 2316
        AN(vslq);
1074 2316
        vslq->vsl = vsl;
1075 2316
        if (cp != NULL) {
1076 1908
                vslq->c = *cp;
1077 1908
                *cp = NULL;
1078
        }
1079 2316
        vslq->grouping = grouping;
1080 2316
        vslq->query = query;
1081
1082
        /* Setup normal mode */
1083 2316
        VRB_INIT(&vslq->tree);
1084 2316
        VTAILQ_INIT(&vslq->ready);
1085 2316
        VTAILQ_INIT(&vslq->incomplete);
1086 2316
        VTAILQ_INIT(&vslq->shmrefs);
1087 2316
        VTAILQ_INIT(&vslq->cache);
1088
1089
        /* Setup raw mode */
1090 2316
        vslq->raw.c.magic = VSLC_RAW_MAGIC;
1091 2316
        vslq->raw.c.cursor.priv_tbl = &vslc_raw_tbl;
1092 2316
        vslq->raw.c.cursor.priv_data = &vslq->raw.c;
1093 2316
        vslq->raw.trans.level = 0;
1094 2316
        vslq->raw.trans.type = VSL_t_raw;
1095 2316
        vslq->raw.trans.reason = VSL_r_unknown;
1096 2316
        vslq->raw.trans.c = &vslq->raw.c.cursor;
1097 2316
        vslq->raw.ptrans[0] = &vslq->raw.trans;
1098 2316
        vslq->raw.ptrans[1] = NULL;
1099
1100 2316
        return (vslq);
1101
}
1102
1103
void
1104 2280
VSLQ_Delete(struct VSLQ **pvslq)
1105
{
1106
        struct VSLQ *vslq;
1107
        struct vtx *vtx;
1108
1109 2280
        TAKE_OBJ_NOTNULL(vslq, pvslq, VSLQ_MAGIC);
1110
1111 2280
        (void)VSLQ_Flush(vslq, NULL, NULL);
1112 2280
        AZ(vslq->n_outstanding);
1113
1114 2280
        if (vslq->c != NULL) {
1115 2280
                VSL_DeleteCursor(vslq->c);
1116 2280
                vslq->c = NULL;
1117
        }
1118
1119 2280
        if (vslq->query != NULL)
1120 672
                vslq_deletequery(&vslq->query);
1121 2280
        AZ(vslq->query);
1122
1123 8307
        while (!VTAILQ_EMPTY(&vslq->cache)) {
1124 3747
                AN(vslq->n_cache);
1125 3747
                vtx = VTAILQ_FIRST(&vslq->cache);
1126 3747
                VTAILQ_REMOVE(&vslq->cache, vtx, list_child);
1127 3747
                vslq->n_cache--;
1128 3747
                FREE_OBJ(vtx);
1129
        }
1130
1131 2280
        FREE_OBJ(vslq);
1132 2280
}
1133
1134
void
1135 372
VSLQ_SetCursor(struct VSLQ *vslq, struct VSL_cursor **cp)
1136
{
1137
1138 372
        CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
1139
1140 372
        if (vslq->c != NULL) {
1141 0
                (void)VSLQ_Flush(vslq, NULL, NULL);
1142 0
                AZ(vslq->n_outstanding);
1143 0
                VSL_DeleteCursor(vslq->c);
1144 0
                vslq->c = NULL;
1145
        }
1146
1147 372
        if (cp != NULL) {
1148 372
                AN(*cp);
1149 372
                vslq->c = *cp;
1150 372
                *cp = NULL;
1151
        }
1152 372
}
1153
1154
/* Regard each log line as a single transaction, feed it through the query
   and do the callback */
static int
vslq_raw(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
{
	enum vsl_status r = vsl_more;
	int i;

	assert(vslq->grouping == VSL_g_raw);

	/* raw.start/len/offset track the position inside the current
	   (possibly batched) cursor record */
	assert(vslq->raw.offset <= vslq->raw.len);
	do {
		if (vslq->raw.offset == vslq->raw.len) {
			/* Current record exhausted; fetch the next one */
			r = VSL_Next(vslq->c);
			if (r != vsl_more)
				return (r);
			AN(vslq->c->rec.ptr);
			vslq->raw.start = vslq->c->rec;
			if (VSL_TAG(vslq->c->rec.ptr) == SLT__Batch)
				vslq->raw.len = VSL_END(vslq->c->rec.ptr,
				    VSL_BATCHLEN(vslq->c->rec.ptr)) -
				    vslq->c->rec.ptr;
			else
				vslq->raw.len = VSL_NEXT(vslq->raw.start.ptr) -
				    vslq->raw.start.ptr;
			assert(vslq->raw.len > 0);
			vslq->raw.offset = 0;
		}

		/* Point the raw cursor at the next sub-record and step
		   the offset past it; batch header records themselves
		   are skipped by the loop condition below */
		vslq->raw.c.ptr = vslq->raw.start.ptr + vslq->raw.offset;
		vslq->raw.c.cursor.rec.ptr = NULL;
		vslq->raw.trans.vxid = VSL_ID(vslq->raw.c.ptr);
		vslq->raw.offset += VSL_NEXT(vslq->raw.c.ptr) - vslq->raw.c.ptr;
	} while (VSL_TAG(vslq->raw.c.ptr) == SLT__Batch);

	assert (r == vsl_more);

	if (func == NULL)
		return (r);

	/* Query filter: record is dropped, but cursor state advanced */
	if (vslq->query != NULL &&
	    !vslq_runquery(vslq->query, vslq->raw.ptrans))
		return (r);

	i = (func)(vslq->vsl, vslq->raw.ptrans, priv);
	if (i)
		/* Non-zero user return code is propagated to the caller */
		return (i);

	return (r);
}
1204
1205
/* Check the beginning of the shmref list, and buffer refs that are at
1206
 * warning level.
1207
 */
1208
static enum vsl_status
1209 50081
vslq_shmref_check(struct VSLQ *vslq)
1210
{
1211
        struct chunk *chunk;
1212
        enum vsl_check i;
1213
1214 100162
        while ((chunk = VTAILQ_FIRST(&vslq->shmrefs)) != NULL) {
1215 34337
                CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
1216 34329
                assert(chunk->type == chunk_t_shm);
1217 34329
                i = VSL_Check(vslq->c, &chunk->shm.start);
1218 34319
                switch (i) {
1219
                case vsl_check_valid:
1220
                        /* First on list is OK, refs behind it must also
1221
                           be OK */
1222 34319
                        return (vsl_more);
1223
                case vsl_check_warn:
1224
                        /* Buffer this chunk */
1225 0
                        chunk_shm_to_buf(vslq, chunk);
1226 0
                        break;
1227
                default:
1228
                        /* Too late to buffer */
1229 0
                        return (vsl_e_overrun);
1230
                }
1231
        }
1232
1233 15744
        return (vsl_more);
1234
}
1235
1236
/* Process next input record */
static enum vsl_status
vslq_next(struct VSLQ *vslq)
{
	struct VSL_cursor *c;
	enum vsl_status r;
	enum VSL_tag_e tag;
	ssize_t len;
	unsigned vxid;
	struct vtx *vtx;

	c = vslq->c;
	r = VSL_Next(c);
	if (r != vsl_more)
		return (r);

	assert (r == vsl_more);

	tag = (enum VSL_tag_e)VSL_TAG(c->rec.ptr);
	if (tag == SLT__Batch) {
		/* Batch record: vxid and length come from the batch
		   header, and we peek at the first contained record's
		   tag to decide whether a new vtx may be opened */
		vxid = VSL_BATCHID(c->rec.ptr);
		len = VSL_END(c->rec.ptr, VSL_BATCHLEN(c->rec.ptr)) -
		    c->rec.ptr;
		if (len == 0)
			return (r);
		tag = (enum VSL_tag_e)VSL_TAG(VSL_NEXT(c->rec.ptr));
	} else {
		vxid = VSL_ID(c->rec.ptr);
		len = VSL_NEXT(c->rec.ptr) - c->rec.ptr;
	}
	assert(len > 0);
	if (vxid == 0)
		/* Skip non-transactional records */
		return (r);

	/* A new vtx is only opened on SLT_Begin; records for an unknown
	   vxid without a Begin are dropped */
	vtx = vtx_lookup(vslq, vxid);
	if (vtx == NULL && tag == SLT_Begin) {
		vtx = vtx_add(vslq, vxid);
		AN(vtx);
	}
	if (vtx != NULL) {
		vtx_append(vslq, vtx, &c->rec, len);
		vtx_scan(vslq, vtx);
	}

	return (r);
}
1283
1284
/* Test query and report any ready transactions */
1285
static int
1286 8959
vslq_process_ready(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
1287
{
1288
        struct vtx *vtx;
1289 8959
        int i = 0;
1290
1291 8959
        AN(vslq);
1292
1293 24561
        while (!VTAILQ_EMPTY(&vslq->ready)) {
1294 7867
                vtx = VTAILQ_FIRST(&vslq->ready);
1295 7867
                CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
1296 7867
                VTAILQ_REMOVE(&vslq->ready, vtx, list_vtx);
1297 7867
                AN(vtx->flags & VTX_F_READY);
1298 7867
                if (func != NULL)
1299 6679
                        i = vslq_callback(vslq, vtx, func, priv);
1300 7866
                vtx_retire(vslq, &vtx);
1301 7867
                AZ(vtx);
1302 7867
                if (i)
1303 1224
                        return (i);
1304
        }
1305
1306 7735
        return (0);
1307
}
1308
1309
/* Process the input cursor, calling the callback function on matching
   transaction sets */
int
VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
{
	enum vsl_status r;
	int i;
	double now;
	struct vtx *vtx;

	CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);

	/* Check that we have a cursor */
	if (vslq->c == NULL)
		return (vsl_e_abandon);

	/* Raw grouping bypasses the vtx machinery entirely */
	if (vslq->grouping == VSL_g_raw)
		return (vslq_raw(vslq, func, priv));

	/* Process next cursor input */
	r = vslq_next(vslq);
	if (r != vsl_more)
		/* At end of log or cursor reports error condition */
		return (r);

	/* Check shmref list and buffer if necessary */
	r = vslq_shmref_check(vslq);
	if (r != vsl_more)
		/* Buffering of shm ref failed */
		return (r);

	assert (r == vsl_more);

	/* Check vtx timeout: the incomplete list is walked from the
	   front, stopping at the first vtx still within T_opt seconds
	   of its start; older ones are forced complete */
	now = VTIM_mono();
	while (!VTAILQ_EMPTY(&vslq->incomplete)) {
		vtx = VTAILQ_FIRST(&vslq->incomplete);
		CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
		if (now - vtx->t_start < vslq->vsl->T_opt)
			break;
		vtx_force(vslq, vtx, "timeout");
		AN(vtx->flags & VTX_F_COMPLETE);
	}

	/* Check store limit: force out incomplete vtxs from the front of
	   the list until the outstanding record count is under L_opt */
	while (vslq->n_outstanding > vslq->vsl->L_opt &&
	    !(VTAILQ_EMPTY(&vslq->incomplete))) {
		vtx = VTAILQ_FIRST(&vslq->incomplete);
		CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
		vtx_force(vslq, vtx, "store overflow");
		AN(vtx->flags & VTX_F_COMPLETE);
		i = vslq_process_ready(vslq, func, priv);
		if (i)
			/* User return code */
			return (i);
	}

	/* Check ready list */
	if (!VTAILQ_EMPTY(&vslq->ready)) {
		i = vslq_process_ready(vslq, func, priv);
		if (i)
			/* User return code */
			return (i);
	}

	return (vsl_more);
}
1376
1377
/* Flush any incomplete vtx held on to. Do callbacks if func != NULL */
1378
int
1379 2280
VSLQ_Flush(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
1380
{
1381
        struct vtx *vtx;
1382
1383 2280
        CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
1384
1385 5748
        while (!VTAILQ_EMPTY(&vslq->incomplete)) {
1386 1188
                vtx = VTAILQ_FIRST(&vslq->incomplete);
1387 1188
                CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
1388 1188
                AZ(vtx->flags & VTX_F_COMPLETE);
1389 1188
                vtx_force(vslq, vtx, "flush");
1390
        }
1391
1392 2280
        return (vslq_process_ready(vslq, func, priv));
1393
}