varnish-cache/lib/libvarnishapi/vsl_dispatch.c
1
/*-
2
 * Copyright (c) 2006 Verdens Gang AS
3
 * Copyright (c) 2006-2015 Varnish Software AS
4
 * All rights reserved.
5
 *
6
 * Author: Martin Blix Grydeland <martin@varnish-software.com>
7
 *
8
 * Redistribution and use in source and binary forms, with or without
9
 * modification, are permitted provided that the following conditions
10
 * are met:
11
 * 1. Redistributions of source code must retain the above copyright
12
 *    notice, this list of conditions and the following disclaimer.
13
 * 2. Redistributions in binary form must reproduce the above copyright
14
 *    notice, this list of conditions and the following disclaimer in the
15
 *    documentation and/or other materials provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
21
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27
 * SUCH DAMAGE.
28
 *
29
 */
30
31
#include "config.h"
32
33
#include <stdarg.h>
34
#include <stdint.h>
35
#include <stdio.h>
36
#include <stdlib.h>
37
#include <string.h>
38
39
#include "vdef.h"
40
#include "vas.h"
41
#include "miniobj.h"
42
43
#include "vqueue.h"
44
#include "vre.h"
45
#include "vtim.h"
46
#include "vtree.h"
47
48
#include "vapi/vsl.h"
49
50
#include "vsl_api.h"
51
52
/* Number of retired vtx structures kept on the VSLQ cache list for reuse */
#define VTX_CACHE 10
/* Initial allocation of a buf chunk, in 32-bit words (doubled as needed) */
#define VTX_BUFSIZE_MIN 64
/* Number of preallocated shm chunks embedded in each vtx */
#define VTX_SHMCHUNKS 3

/* Printable names of the transaction types, indexed by enum VSL_transaction_e */
static const char * const vsl_t_names[VSL_t__MAX] = {
        [VSL_t_unknown] = "unknown",
        [VSL_t_sess]    = "sess",
        [VSL_t_req]     = "req",
        [VSL_t_bereq]   = "bereq",
        [VSL_t_raw]     = "raw",
};

/* Printable names of the transaction reasons, indexed by enum VSL_reason_e */
static const char * const vsl_r_names[VSL_r__MAX] = {
        [VSL_r_unknown] = "unknown",
        [VSL_r_http_1]  = "HTTP/1",
        [VSL_r_rxreq]   = "rxreq",
        [VSL_r_esi]     = "esi",
        [VSL_r_restart] = "restart",
        [VSL_r_pass]    = "pass",
        [VSL_r_fetch]   = "fetch",
        [VSL_r_bgfetch] = "bgfetch",
        [VSL_r_pipe]    = "pipe",
};
75
76
struct vtx;

/* Cursor state for raw mode: exposes exactly one record at a time */
struct vslc_raw {
        unsigned                magic;
#define VSLC_RAW_MAGIC          0x247EBD44

        struct VSL_cursor       cursor;

        const uint32_t          *ptr;   /* The single record to deliver */
};

/* A synthetic (diagnostic) record spliced into a vtx's record stream */
struct synth {
        unsigned                magic;
#define SYNTH_MAGIC             0xC654479F

        VTAILQ_ENTRY(synth)     list;
        size_t                  offset; /* Record offset in the vtx where
                                           this synth record is emitted */
        uint32_t                data[2 + 64 / sizeof (uint32_t)];
};
VTAILQ_HEAD(synthhead, synth);

/* Kind of storage backing a chunk of records */
enum chunk_t {
        chunk_t__unassigned,
        chunk_t_shm,    /* Records referenced in place in shared memory */
        chunk_t_buf,    /* Records copied into a malloc'ed buffer */
};
102
103
/* A contiguous run of log records belonging to one vtx. Either points
   at the records in shared memory (shm) or owns a private copy (buf). */
struct chunk {
        unsigned                                magic;
#define CHUNK_MAGIC                             0x48DC0194
        enum chunk_t                            type;
        union {
                struct {
                        struct VSLC_ptr         start;  /* First record in shm */
                        VTAILQ_ENTRY(chunk)     shmref; /* On VSLQ shmrefs list */
                } shm;
                struct {
                        uint32_t                *data;  /* malloc'ed copy */
                        size_t                  space;  /* Allocated, in words */
                } buf;
        };
        size_t                                  len;    /* Used length, in words */
        struct vtx                              *vtx;   /* Owning transaction */
        VTAILQ_ENTRY(chunk)                     list;   /* On vtx's chunks list */
};
VTAILQ_HEAD(chunkhead, chunk);
122
123
/* Cursor state for iterating all records of a single vtx, merging in
   synth records at their recorded offsets */
struct vslc_vtx {
        unsigned                magic;
#define VSLC_VTX_MAGIC          0x74C6523F

        struct VSL_cursor       cursor;

        struct vtx              *vtx;           /* The vtx being iterated */
        struct synth            *synth;         /* Next synth record to emit */
        struct chunk            *chunk;         /* Current chunk, NULL = re-seek */
        size_t                  chunkstart;     /* vtx offset of current chunk */
        size_t                  offset;         /* Current offset into the vtx */
};

/* Lookup-tree key. NOTE: embedded as the FIRST member of struct vtx, so a
   vtx_key pointer from the tree can be cast back to its vtx (see vtx_lookup) */
struct vtx_key {
        unsigned                vxid;
        VRB_ENTRY(vtx_key)      entry;
};
VRB_HEAD(vtx_tree, vtx_key);
141
142
/* One log transaction being assembled. Lives in the VSLQ tree from
   creation until retirement. */
struct vtx {
        struct vtx_key          key;            /* Tree key; must stay first */
        unsigned                magic;
#define VTX_MAGIC               0xACC21D09
        VTAILQ_ENTRY(vtx)       list_child;     /* On parent's child list (or
                                                   the VSLQ cache list) */
        VTAILQ_ENTRY(vtx)       list_vtx;       /* On incomplete/ready list */

        double                  t_start;        /* From VTIM_mono() at (re)init */
        unsigned                flags;
#define VTX_F_BEGIN             0x1 /* Begin record processed */
#define VTX_F_END               0x2 /* End record processed */
#define VTX_F_COMPLETE          0x4 /* Marked complete. No new children
                                       should be appended */
#define VTX_F_READY             0x8 /* This vtx and all it's children are
                                       complete */

        enum VSL_transaction_e  type;
        enum VSL_reason_e       reason;

        struct vtx              *parent;        /* NULL for top level vtx */
        VTAILQ_HEAD(,vtx)       child;          /* Direct children */
        unsigned                n_child;        /* Count of direct children */
        unsigned                n_childready;   /* Children in READY state */
        unsigned                n_descend;      /* Count of all descendants */

        VTAILQ_HEAD(,synth)     synth;          /* Synth records, offset order */

        struct chunk            shmchunks[VTX_SHMCHUNKS]; /* Preallocated */
        struct chunkhead        shmchunks_free; /* Currently unused shm chunks */

        struct chunkhead        chunks;         /* Record storage, in order */
        size_t                  len;            /* Total record length, words */

        struct vslc_vtx         c;              /* Cursor over this vtx */
};
177
178
/* Top level dispatch state: groups raw log records into transactions */
struct VSLQ {
        unsigned                magic;
#define VSLQ_MAGIC              0x23A8BE97

        struct VSL_data         *vsl;
        struct VSL_cursor       *c;             /* Input record cursor */
        struct vslq_query       *query;         /* NOTE(review): presumably the
                                                   compiled query filter; its
                                                   use is outside this chunk */

        enum VSL_grouping_e     grouping;

        /* Structured mode */
        struct vtx_tree         tree;           /* vxid -> vtx lookup tree */
        VTAILQ_HEAD(,vtx)       ready;          /* Complete, ready to report */
        VTAILQ_HEAD(,vtx)       incomplete;     /* Still gathering records */
        unsigned                n_outstanding;  /* vtx'es not yet retired */
        struct chunkhead        shmrefs;        /* All shm chunks in use */
        VTAILQ_HEAD(,vtx)       cache;          /* Retired vtx'es for reuse */
        unsigned                n_cache;        /* Length of cache list */

        /* Raw mode */
        struct {
                struct vslc_raw         c;
                struct VSL_transaction  trans;
                struct VSL_transaction  *ptrans[2];
                struct VSLC_ptr         start;
                ssize_t                 len;
                size_t                  offset;
        } raw;
};

/* Forward declarations of the diagnostics helpers defined further down */
static void vtx_synth_rec(struct vtx *vtx, unsigned tag, const char *fmt, ...);
/*lint -esym(534, vtx_diag) */
static int vtx_diag(struct vtx *vtx, const char *msg);
/*lint -esym(534, vtx_diag_tag) */
static int vtx_diag_tag(struct vtx *vtx, const uint32_t *ptr,
    const char *reason);
214
215
static inline int
216 4484
vtx_keycmp(const struct vtx_key *a, const struct vtx_key *b)
217
{
218 4484
        if (a->vxid < b->vxid)
219 82
                return (-1);
220 4402
        if (a->vxid > b->vxid)
221 2265
                return (1);
222 2137
        return (0);
223
}
224
225
VRB_PROTOTYPE_STATIC(vtx_tree, vtx_key, entry, vtx_keycmp)
226 5094
VRB_GENERATE_STATIC(vtx_tree, vtx_key, entry, vtx_keycmp)
227
228
static int
229 5239
vslc_raw_next(const struct VSL_cursor *cursor)
230
{
231
        struct vslc_raw *c;
232
233 5239
        CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_RAW_MAGIC);
234 5239
        assert(&c->cursor == cursor);
235
236 5239
        AN(c->ptr);
237 5239
        if (c->cursor.rec.ptr == NULL) {
238 2641
                c->cursor.rec.ptr = c->ptr;
239 2641
                return (1);
240
        } else {
241 2598
                c->cursor.rec.ptr = NULL;
242 2598
                return (0);
243
        }
244
}
245
246
static int
247 298
vslc_raw_reset(const struct VSL_cursor *cursor)
248
{
249
        struct vslc_raw *c;
250
251 298
        CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_RAW_MAGIC);
252 298
        assert(&c->cursor == cursor);
253
254 298
        AN(c->ptr);
255 298
        c->cursor.rec.ptr = NULL;
256
257 298
        return (0);
258
}
259
260
/* Method table for raw mode cursors (no delete, no check method) */
static const struct vslc_tbl vslc_raw_tbl = {
        .magic  = VSLC_TBL_MAGIC,
        .delete = NULL,
        .next   = vslc_raw_next,
        .reset  = vslc_raw_reset,
        .check  = NULL,
};
267
268
/* Vtx cursor next: deliver the next record of the transaction, merging in
   synth records at their offsets and walking the chunk list as needed.
   SLT__Batch container records are skipped. Returns 1 with a record in
   cursor->rec.ptr, or 0 at end of the currently stored records. */
static int
vslc_vtx_next(const struct VSL_cursor *cursor)
{
        struct vslc_vtx *c;
        const uint32_t *ptr;

        CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VTX_MAGIC);
        assert(&c->cursor == cursor);
        CHECK_OBJ_NOTNULL(c->vtx, VTX_MAGIC);

        do {
                CHECK_OBJ_ORNULL(c->synth, SYNTH_MAGIC);
                if (c->synth != NULL && c->synth->offset == c->offset) {
                        /* We're at the offset of the next synth record,
                           point to it and advance the pointer */
                        c->cursor.rec.ptr = c->synth->data;
                        c->synth = VTAILQ_NEXT(c->synth, list);
                } else {
                        assert(c->offset <= c->vtx->len);
                        if (c->offset == c->vtx->len)
                                /* End of cursor */
                                return (0);

                        /* Advance chunk pointer */
                        if (c->chunk == NULL) {
                                /* Re-seek from the first chunk (set to NULL
                                   by reset or by chunk_shm_to_buf) */
                                c->chunk = VTAILQ_FIRST(&c->vtx->chunks);
                                c->chunkstart = 0;
                        }
                        CHECK_OBJ_NOTNULL(c->chunk, CHUNK_MAGIC);
                        while (c->offset >= c->chunkstart + c->chunk->len) {
                                c->chunkstart += c->chunk->len;
                                c->chunk = VTAILQ_NEXT(c->chunk, list);
                                CHECK_OBJ_NOTNULL(c->chunk, CHUNK_MAGIC);
                        }

                        /* Point to the next stored record */
                        if (c->chunk->type == chunk_t_shm)
                                ptr = c->chunk->shm.start.ptr;
                        else {
                                assert(c->chunk->type == chunk_t_buf);
                                ptr = c->chunk->buf.data;
                        }
                        c->cursor.rec.ptr = ptr + c->offset - c->chunkstart;
                        /* Advance the offset by this record's full length */
                        c->offset += VSL_NEXT(c->cursor.rec.ptr) -
                            c->cursor.rec.ptr;
                }
        } while (VSL_TAG(c->cursor.rec.ptr) == SLT__Batch);

        return (1);
}
318
319
static int
320 1856
vslc_vtx_reset(const struct VSL_cursor *cursor)
321
{
322
        struct vslc_vtx *c;
323
324 1856
        CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VTX_MAGIC);
325 1856
        assert(&c->cursor == cursor);
326 1856
        CHECK_OBJ_NOTNULL(c->vtx, VTX_MAGIC);
327 1856
        c->synth = VTAILQ_FIRST(&c->vtx->synth);
328 1856
        c->chunk = NULL;
329 1856
        c->chunkstart = 0;
330 1856
        c->offset = 0;
331 1856
        c->cursor.rec.ptr = NULL;
332
333 1856
        return (0);
334
}
335
336
/* Method table for vtx cursors (no delete, no check method) */
static const struct vslc_tbl vslc_vtx_tbl = {
        .magic  = VSLC_TBL_MAGIC,
        .delete = NULL,
        .next   = vslc_vtx_next,
        .reset  = vslc_vtx_reset,
        .check  = NULL,
};
343
344
/* Create a buf chunk */
345
static struct chunk *
346 174
chunk_newbuf(struct vtx *vtx, const uint32_t *ptr, size_t len)
347
{
348
        struct chunk *chunk;
349
350 174
        ALLOC_OBJ(chunk, CHUNK_MAGIC);
351 174
        XXXAN(chunk);
352 174
        chunk->type = chunk_t_buf;
353 174
        chunk->vtx = vtx;
354 174
        chunk->buf.space = VTX_BUFSIZE_MIN;
355 348
        while (chunk->buf.space < len)
356 0
                chunk->buf.space *= 2;
357 174
        chunk->buf.data = malloc(sizeof (uint32_t) * chunk->buf.space);
358 174
        AN(chunk->buf.data);
359 174
        memcpy(chunk->buf.data, ptr, sizeof (uint32_t) * len);
360 174
        chunk->len = len;
361 174
        return (chunk);
362
}
363
364
/* Free a buf chunk */
365
static void
366 174
chunk_freebuf(struct chunk **pchunk)
367
{
368
369 174
        CHECK_OBJ_NOTNULL(*pchunk, CHUNK_MAGIC);
370 174
        assert((*pchunk)->type == chunk_t_buf);
371 174
        free((*pchunk)->buf.data);
372 174
        FREE_OBJ(*pchunk);
373 174
        *pchunk = NULL;
374 174
}
375
376
/* Append a set of records to a chunk */
377
static void
378 1354
chunk_appendbuf(struct chunk *chunk, const uint32_t *ptr, size_t len)
379
{
380
381 1354
        CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
382 1354
        assert(chunk->type == chunk_t_buf);
383 1354
        if (chunk->buf.space < chunk->len + len) {
384 294
                while (chunk->buf.space < chunk->len + len)
385 98
                        chunk->buf.space *= 2;
386 98
                chunk->buf.data = realloc(chunk->buf.data,
387 98
                    sizeof (uint32_t) * chunk->buf.space);
388
        }
389 1354
        memcpy(chunk->buf.data + chunk->len, ptr, sizeof (uint32_t) * len);
390 1354
        chunk->len += len;
391 1354
}
392
393
/* Transform a shm chunk to a buf chunk: copy the referenced records out
   of shared memory (merging into an adjacent buf chunk when possible) and
   return the shm chunk to the vtx's free pool */
static void
chunk_shm_to_buf(struct VSLQ *vslq, struct chunk *chunk)
{
        struct vtx *vtx;
        struct chunk *buf;

        CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
        assert(chunk->type == chunk_t_shm);
        vtx = chunk->vtx;
        CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);

        buf = VTAILQ_PREV(chunk, chunkhead, list);
        if (buf != NULL && buf->type == chunk_t_buf)
                /* Previous is a buf chunk, append to it */
                chunk_appendbuf(buf, chunk->shm.start.ptr, chunk->len);
        else {
                /* Create a new buf chunk and insert it before this */
                buf = chunk_newbuf(vtx, chunk->shm.start.ptr, chunk->len);
                AN(buf);
                VTAILQ_INSERT_BEFORE(chunk, buf, list);
        }

        /* Reset cursor chunk pointer, vslc_vtx_next will set it correctly */
        vtx->c.chunk = NULL;

        /* Remove from the shmref list and vtx, and put chunk back
           on the free list */
        VTAILQ_REMOVE(&vslq->shmrefs, chunk, shm.shmref);
        VTAILQ_REMOVE(&vtx->chunks, chunk, list);
        VTAILQ_INSERT_HEAD(&vtx->shmchunks_free, chunk, list);
}
425
426
/* Append a set of records to a vtx structure. Records are referenced in
   shared memory (cheap) while the segment is stable and a free shm chunk
   is available; otherwise they are copied into a buf chunk. len is in
   32-bit words. */
static void
vtx_append(struct VSLQ *vslq, struct vtx *vtx, const struct VSLC_ptr *start,
    size_t len)
{
        struct chunk *chunk;

        AN(vtx);
        if (len == 0)
                return;
        AN(start);

        /* VSL_Check() == 2 means the shm region is safe to reference */
        if (VSL_Check(vslq->c, start) == 2 &&
            !VTAILQ_EMPTY(&vtx->shmchunks_free)) {
                /* Shmref it */
                chunk = VTAILQ_FIRST(&vtx->shmchunks_free);
                CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
                assert(chunk->type == chunk_t_shm);
                assert(chunk->vtx == vtx);
                VTAILQ_REMOVE(&vtx->shmchunks_free, chunk, list);
                chunk->shm.start = *start;
                chunk->len = len;
                VTAILQ_INSERT_TAIL(&vtx->chunks, chunk, list);

                /* Append to shmref list */
                VTAILQ_INSERT_TAIL(&vslq->shmrefs, chunk, shm.shmref);
        } else {
                /* Buffer it */
                chunk = VTAILQ_LAST(&vtx->chunks, chunkhead);
                CHECK_OBJ_ORNULL(chunk, CHUNK_MAGIC);
                if (chunk != NULL && chunk->type == chunk_t_buf) {
                        /* Tail is a buf chunk, append to that */
                        chunk_appendbuf(chunk, start->ptr, len);
                } else {
                        /* Append new buf chunk */
                        chunk = chunk_newbuf(vtx, start->ptr, len);
                        AN(chunk);
                        VTAILQ_INSERT_TAIL(&vtx->chunks, chunk, list);
                }
        }
        vtx->len += len;
}
468
469
/* Allocate a new vtx structure */
470
static struct vtx *
471 657
vtx_new(struct VSLQ *vslq)
472
{
473
        struct vtx *vtx;
474
        int i;
475
476 657
        AN(vslq);
477 657
        if (vslq->n_cache) {
478 379
                AZ(VTAILQ_EMPTY(&vslq->cache));
479 379
                vtx = VTAILQ_FIRST(&vslq->cache);
480 379
                VTAILQ_REMOVE(&vslq->cache, vtx, list_child);
481 379
                vslq->n_cache--;
482
        } else {
483 278
                ALLOC_OBJ(vtx, VTX_MAGIC);
484 278
                AN(vtx);
485
486 278
                VTAILQ_INIT(&vtx->child);
487 278
                VTAILQ_INIT(&vtx->shmchunks_free);
488 1112
                for (i = 0; i < VTX_SHMCHUNKS; i++) {
489 834
                        vtx->shmchunks[i].magic = CHUNK_MAGIC;
490 834
                        vtx->shmchunks[i].type = chunk_t_shm;
491 834
                        vtx->shmchunks[i].vtx = vtx;
492 834
                        VTAILQ_INSERT_TAIL(&vtx->shmchunks_free,
493
                            &vtx->shmchunks[i], list);
494
                }
495 278
                VTAILQ_INIT(&vtx->chunks);
496 278
                VTAILQ_INIT(&vtx->synth);
497 278
                vtx->c.magic = VSLC_VTX_MAGIC;
498 278
                vtx->c.vtx = vtx;
499 278
                vtx->c.cursor.priv_tbl = &vslc_vtx_tbl;
500 278
                vtx->c.cursor.priv_data = &vtx->c;
501
        }
502
503 657
        CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
504 657
        vtx->key.vxid = 0;
505 657
        vtx->t_start = VTIM_mono();
506 657
        vtx->flags = 0;
507 657
        vtx->type = VSL_t_unknown;
508 657
        vtx->reason = VSL_r_unknown;
509 657
        vtx->parent = NULL;
510 657
        vtx->n_child = 0;
511 657
        vtx->n_childready = 0;
512 657
        vtx->n_descend = 0;
513 657
        vtx->len = 0;
514 657
        (void)vslc_vtx_reset(&vtx->c.cursor);
515
516 657
        return (vtx);
517
}
518
519
/* Disuse a vtx and all it's children, freeing any resources held. Free or
   cache the vtx for later use. Only a complete, ready, top level (no
   parent) vtx may be retired; children are detached and retired
   recursively first. */
static void
vtx_retire(struct VSLQ *vslq, struct vtx **pvtx)
{
        struct vtx *vtx;
        struct vtx *child;
        struct synth *synth;
        struct chunk *chunk;

        AN(vslq);
        TAKE_OBJ_NOTNULL(vtx, pvtx, VTX_MAGIC);

        AN(vtx->flags & VTX_F_COMPLETE);
        AN(vtx->flags & VTX_F_READY);
        AZ(vtx->parent);

        /* Detach and retire each child; counters must stay consistent */
        while (!VTAILQ_EMPTY(&vtx->child)) {
                child = VTAILQ_FIRST(&vtx->child);
                assert(child->parent == vtx);
                AN(vtx->n_child);
                assert(vtx->n_descend >= child->n_descend + 1);
                VTAILQ_REMOVE(&vtx->child, child, list_child);
                child->parent = NULL;
                vtx->n_child--;
                vtx->n_descend -= child->n_descend + 1;
                vtx_retire(vslq, &child);
                AZ(child);
        }
        AZ(vtx->n_child);
        AZ(vtx->n_descend);
        vtx->n_childready = 0;
        /* Must have been in the lookup tree */
        AN(VRB_REMOVE(vtx_tree, &vslq->tree, &vtx->key));
        vtx->key.vxid = 0;
        vtx->flags = 0;

        /* Free any synth records */
        while (!VTAILQ_EMPTY(&vtx->synth)) {
                synth = VTAILQ_FIRST(&vtx->synth);
                CHECK_OBJ_NOTNULL(synth, SYNTH_MAGIC);
                VTAILQ_REMOVE(&vtx->synth, synth, list);
                FREE_OBJ(synth);
        }

        /* Release record storage: shm chunks go back to the vtx's free
           pool (and off the global shmref list), buf chunks are freed */
        while (!VTAILQ_EMPTY(&vtx->chunks)) {
                chunk = VTAILQ_FIRST(&vtx->chunks);
                CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
                VTAILQ_REMOVE(&vtx->chunks, chunk, list);
                if (chunk->type == chunk_t_shm) {
                        VTAILQ_REMOVE(&vslq->shmrefs, chunk, shm.shmref);
                        VTAILQ_INSERT_HEAD(&vtx->shmchunks_free, chunk, list);
                } else {
                        assert(chunk->type == chunk_t_buf);
                        chunk_freebuf(&chunk);
                        AZ(chunk);
                }
        }
        vtx->len = 0;
        AN(vslq->n_outstanding);
        vslq->n_outstanding--;

        /* Keep up to VTX_CACHE retired vtx'es around for reuse */
        if (vslq->n_cache < VTX_CACHE) {
                VTAILQ_INSERT_HEAD(&vslq->cache, vtx, list_child);
                vslq->n_cache++;
        } else {
                FREE_OBJ(vtx);
                vtx = NULL;
        }
}
587
588
/* Lookup a vtx by vxid from the managed list */
589
static struct vtx *
590 2851
vtx_lookup(const struct VSLQ *vslq, unsigned vxid)
591
{
592
        struct vtx_key lkey, *key;
593
        struct vtx *vtx;
594
595 2851
        AN(vslq);
596 2851
        lkey.vxid = vxid;
597 2851
        key = VRB_FIND(vtx_tree, &vslq->tree, &lkey);
598 2848
        if (key == NULL)
599 713
                return (NULL);
600 2135
        CAST_OBJ_NOTNULL(vtx, (void *)key, VTX_MAGIC);
601 2135
        return (vtx);
602
}
603
604
/* Insert a new vtx into the managed list */
605
static struct vtx *
606 657
vtx_add(struct VSLQ *vslq, unsigned vxid)
607
{
608
        struct vtx *vtx;
609
610 657
        AN(vslq);
611 657
        vtx = vtx_new(vslq);
612 657
        AN(vtx);
613 657
        vtx->key.vxid = vxid;
614 657
        AZ(VRB_INSERT(vtx_tree, &vslq->tree, &vtx->key));
615 657
        VTAILQ_INSERT_TAIL(&vslq->incomplete, vtx, list_vtx);
616 657
        vslq->n_outstanding++;
617 657
        return (vtx);
618
}
619
620
/* Mark a vtx complete, update child counters and if possible push it or
   it's top parent to the ready state. Readiness propagates upwards: a vtx
   becomes ready when it is complete and all its children are ready, and a
   top level vtx that becomes ready is queued on the VSLQ ready list. */
static void
vtx_mark_complete(struct VSLQ *vslq, struct vtx *vtx)
{

        AN(vslq);
        AN(vtx->flags & VTX_F_END);
        AZ(vtx->flags & VTX_F_COMPLETE);

        if (vtx->type == VSL_t_unknown)
                vtx_diag(vtx, "vtx of unknown type marked complete");

        vtx->flags |= VTX_F_COMPLETE;
        VTAILQ_REMOVE(&vslq->incomplete, vtx, list_vtx);

        /* Walk up the parent chain, promoting each vtx that has become
           ready, until one isn't ready or the top level is reached */
        while (1) {
                AZ(vtx->flags & VTX_F_READY);
                if (vtx->flags & VTX_F_COMPLETE &&
                    vtx->n_child == vtx->n_childready)
                        vtx->flags |= VTX_F_READY;
                else
                        return;
                if (vtx->parent == NULL) {
                        /* Top level vtx ready */
                        VTAILQ_INSERT_TAIL(&vslq->ready, vtx, list_vtx);
                        return;
                }
                vtx = vtx->parent;
                vtx->n_childready++;
                assert(vtx->n_child >= vtx->n_childready);
        }
}
653
654
/* Add a child to a parent, and update child counters */
655
static void
656 65
vtx_set_parent(struct vtx *parent, struct vtx *child)
657
{
658
659 65
        CHECK_OBJ_NOTNULL(parent, VTX_MAGIC);
660 65
        CHECK_OBJ_NOTNULL(child, VTX_MAGIC);
661 65
        assert(parent != child);
662 65
        AZ(parent->flags & VTX_F_COMPLETE);
663 65
        AZ(child->flags & VTX_F_COMPLETE);
664 65
        AZ(child->parent);
665 65
        child->parent = parent;
666 65
        VTAILQ_INSERT_TAIL(&parent->child, child, list_child);
667 65
        parent->n_child++;
668
        do
669 80
                parent->n_descend += 1 + child->n_descend;
670 80
        while ((parent = parent->parent) != NULL);
671 65
}
672
673
/* Parse a begin or link record. Returns the number of elements that was
674
   successfully parsed. */
675
static int
676 1103
vtx_parse_link(const char *str, enum VSL_transaction_e *ptype,
677
    unsigned *pvxid, enum VSL_reason_e *preason)
678
{
679
        char type[16], reason[16];
680
        unsigned vxid;
681
        int i;
682
        enum VSL_transaction_e et;
683
        enum VSL_reason_e er;
684
685 1103
        AN(str);
686 1103
        AN(ptype);
687 1103
        AN(pvxid);
688 1103
        AN(preason);
689
690 1103
        i = sscanf(str, "%15s %u %15s", type, &vxid, reason);
691 1103
        if (i < 1)
692 0
                return (0);
693
694
        /* transaction type */
695 3486
        for (et = 0; et < VSL_t__MAX; et++)
696 3486
                if (!strcmp(type, vsl_t_names[et]))
697 1103
                        break;
698 1103
        if (et >= VSL_t__MAX)
699 0
                et = VSL_t_unknown;
700 1103
        *ptype = et;
701 1103
        if (i == 1)
702 0
                return (1);
703
704
        /* vxid */
705 1103
        assert((vxid & ~VSL_IDENTMASK) == 0);
706 1103
        *pvxid = vxid;
707 1103
        if (i == 2)
708 0
                return (2);
709
710
        /* transaction reason */
711 4742
        for (er = 0; er < VSL_r__MAX; er++)
712 4730
                if (!strcmp(reason, vsl_r_names[er]))
713 1091
                        break;
714 1103
        if (er >= VSL_r__MAX)
715 10
                er = VSL_r_unknown;
716 1103
        *preason = er;
717 1103
        return (3);
718
}
719
720
/* Parse and process a begin record: set the vtx's type and reason, and
   (depending on the grouping mode) link it to its parent transaction,
   creating the parent vtx if it hasn't been seen yet. Returns 0, or the
   result of vtx_diag_tag() on malformed/inconsistent input. */
static int
vtx_scan_begin(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
{
        int i;
        enum VSL_transaction_e type;
        enum VSL_reason_e reason;
        unsigned p_vxid;
        struct vtx *p_vtx;

        assert(VSL_TAG(ptr) == SLT_Begin);

        AZ(vtx->flags & VTX_F_READY);

        i = vtx_parse_link(VSL_CDATA(ptr), &type, &p_vxid, &reason);
        if (i != 3)
                return (vtx_diag_tag(vtx, ptr, "parse error"));
        if (type == VSL_t_unknown)
                (void)vtx_diag_tag(vtx, ptr, "unknown vxid type");

        /* Check/set vtx type */
        if (vtx->type != VSL_t_unknown && vtx->type != type)
                /* Type not matching the one previously set by a link
                   record */
                (void)vtx_diag_tag(vtx, ptr, "type mismatch");
        vtx->type = type;
        vtx->reason = reason;

        if (p_vxid == 0)
                /* Zero means no parent */
                return (0);
        if (p_vxid == vtx->key.vxid)
                return (vtx_diag_tag(vtx, ptr, "link to self"));

        /* In vxid grouping no links are made; in request grouping a
           client request (rxreq) is itself a grouping root */
        if (vslq->grouping == VSL_g_vxid)
                return (0);     /* No links */
        if (vslq->grouping == VSL_g_request && vtx->type == VSL_t_req &&
            vtx->reason == VSL_r_rxreq)
                return (0);     /* No links */

        if (vtx->parent != NULL) {
                if (vtx->parent->key.vxid != p_vxid) {
                        /* This vtx already belongs to a different
                           parent */
                        return (vtx_diag_tag(vtx, ptr, "link mismatch"));
                } else
                        /* Link already exists */
                        return (0);
        }

        p_vtx = vtx_lookup(vslq, p_vxid);
        if (p_vtx == NULL) {
                /* Not seen parent yet. Create it. */
                p_vtx = vtx_add(vslq, p_vxid);
                AN(p_vtx);
        } else {
                CHECK_OBJ_NOTNULL(p_vtx, VTX_MAGIC);
                if (p_vtx->flags & VTX_F_COMPLETE)
                        return (vtx_diag_tag(vtx, ptr, "link too late"));
        }

        /* Create link */
        vtx_set_parent(p_vtx, vtx);

        return (0);
}
786
787
/* Parse and process a link record: attach the referenced child
   transaction to this vtx (depending on the grouping mode), creating the
   child vtx if it hasn't been seen yet. Returns 0, or the result of
   vtx_diag_tag() on malformed/inconsistent input. */
static int
vtx_scan_link(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
{
        int i;
        enum VSL_transaction_e c_type;
        enum VSL_reason_e c_reason;
        unsigned c_vxid;
        struct vtx *c_vtx;

        assert(VSL_TAG(ptr) == SLT_Link);

        AZ(vtx->flags & VTX_F_READY);

        i = vtx_parse_link(VSL_CDATA(ptr), &c_type, &c_vxid, &c_reason);
        if (i != 3)
                return (vtx_diag_tag(vtx, ptr, "parse error"));
        if (c_type == VSL_t_unknown)
                (void)vtx_diag_tag(vtx, ptr, "unknown vxid type");

        /* In vxid grouping no links are made; in request grouping
           session links are ignored */
        if (vslq->grouping == VSL_g_vxid)
                return (0);     /* No links */
        if (vslq->grouping == VSL_g_request && vtx->type == VSL_t_sess)
                return (0);     /* No links */

        if (c_vxid == 0)
                return (vtx_diag_tag(vtx, ptr, "illegal link vxid"));
        if (c_vxid == vtx->key.vxid)
                return (vtx_diag_tag(vtx, ptr, "link to self"));

        /* Lookup and check child vtx */
        c_vtx = vtx_lookup(vslq, c_vxid);
        if (c_vtx == NULL) {
                /* Child not seen before. Insert it and create link */
                c_vtx = vtx_add(vslq, c_vxid);
                AN(c_vtx);
                AZ(c_vtx->parent);
                c_vtx->type = c_type;
                c_vtx->reason = c_reason;
                vtx_set_parent(vtx, c_vtx);
                return (0);
        }

        CHECK_OBJ_NOTNULL(c_vtx, VTX_MAGIC);
        if (c_vtx->parent == vtx)
                /* Link already exists */
                return (0);
        if (c_vtx->parent != NULL && c_vtx->parent != vtx)
                return (vtx_diag_tag(vtx, ptr, "duplicate link"));
        if (c_vtx->flags & VTX_F_COMPLETE)
                return (vtx_diag_tag(vtx, ptr, "link too late"));
        if (c_vtx->type != VSL_t_unknown && c_vtx->type != c_type)
                (void)vtx_diag_tag(vtx, ptr, "type mismatch");

        c_vtx->type = c_type;
        c_vtx->reason = c_reason;
        vtx_set_parent(vtx, c_vtx);
        return (0);
}
846
847
/* Scan the records of a vtx, performing processing actions on specific
   records: Begin sets type/reason and parent links, Link creates child
   links, End marks the vtx complete. Stops when the vtx is complete or
   the stored records are exhausted. */
static void
vtx_scan(struct VSLQ *vslq, struct vtx *vtx)
{
        const uint32_t *ptr;
        enum VSL_tag_e tag;

        while (!(vtx->flags & VTX_F_COMPLETE) &&
            vslc_vtx_next(&vtx->c.cursor) == 1) {
                ptr = vtx->c.cursor.rec.ptr;
                /* A record with a foreign vxid is diagnosed and skipped */
                if (VSL_ID(ptr) != vtx->key.vxid) {
                        (void)vtx_diag_tag(vtx, ptr, "vxid mismatch");
                        continue;
                }

                tag = VSL_TAG(ptr);
                /* Batch records are filtered out by vslc_vtx_next() */
                assert(tag != SLT__Batch);

                switch (tag) {
                case SLT_Begin:
                        if (vtx->flags & VTX_F_BEGIN)
                                (void)vtx_diag_tag(vtx, ptr, "duplicate begin");
                        else {
                                (void)vtx_scan_begin(vslq, vtx, ptr);
                                vtx->flags |= VTX_F_BEGIN;
                        }
                        break;

                case SLT_Link:
                        (void)vtx_scan_link(vslq, vtx, ptr);
                        break;

                case SLT_End:
                        AZ(vtx->flags & VTX_F_END);
                        vtx->flags |= VTX_F_END;
                        vtx_mark_complete(vslq, vtx);
                        break;

                default:
                        break;
                }
        }
}
891
892
/* Force a vtx into complete status by synthing the necessary outstanding
   records */
static void
vtx_force(struct VSLQ *vslq, struct vtx *vtx, const char *reason)
{

	AZ(vtx->flags & VTX_F_COMPLETE);
	AZ(vtx->flags & VTX_F_READY);
	/* Consume anything still pending before synthing */
	vtx_scan(vslq, vtx);
	if (!(vtx->flags & VTX_F_BEGIN))
		/* Never saw a Begin record: synthesize one */
		vtx_synth_rec(vtx, SLT_Begin, "%s %u synth",
		    vsl_t_names[vtx->type], 0);
	/* Log why the vtx was forced (e.g. "timeout", "flush") */
	vtx_diag(vtx, reason);
	if (!(vtx->flags & VTX_F_END))
		/* Never saw an End record: synthesize one */
		vtx_synth_rec(vtx, SLT_End, "synth");
	/* Re-scan to pick up the synthetic records and complete the vtx */
	vtx_scan(vslq, vtx);
	AN(vtx->flags & VTX_F_COMPLETE);
}
910
911
/* Build transaction array, do the query and callback. Returns 0 or the
   return value from func */
static int
vslq_callback(const struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
    void *priv)
{
	/* One slot per transaction in this tree (root + descendants) */
	unsigned n = vtx->n_descend + 1;
	struct vtx *vtxs[n];
	struct VSL_transaction trans[n];
	struct VSL_transaction *ptrans[n + 1];
	unsigned i, j;

	AN(vslq);
	CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
	AN(vtx->flags & VTX_F_READY);
	AN(func);

	/* Only report trees rooted at the grouping's top-level type */
	if (vslq->grouping == VSL_g_session &&
	    vtx->type != VSL_t_sess)
		return (0);
	if (vslq->grouping == VSL_g_request &&
	    vtx->type != VSL_t_req)
		return (0);

	/* Build transaction array: breadth-first walk over the child
	   tree, using vtxs[] as the BFS queue (j consumes, i produces) */
	(void)vslc_vtx_reset(&vtx->c.cursor);
	vtxs[0] = vtx;
	trans[0].level = 1;
	trans[0].vxid = vtx->key.vxid;
	trans[0].vxid_parent = 0;
	trans[0].type = vtx->type;
	trans[0].reason = vtx->reason;
	trans[0].c = &vtx->c.cursor;
	i = 1;
	j = 0;
	while (j < i) {
		VTAILQ_FOREACH(vtx, &vtxs[j]->child, list_child) {
			assert(i < n);
			(void)vslc_vtx_reset(&vtx->c.cursor);
			vtxs[i] = vtx;
			if (vtx->reason == VSL_r_restart)
				/* Restarts stay at the same level as parent */
				trans[i].level = trans[j].level;
			else
				trans[i].level = trans[j].level + 1;
			trans[i].vxid = vtx->key.vxid;
			trans[i].vxid_parent = trans[j].vxid;
			trans[i].type = vtx->type;
			trans[i].reason = vtx->reason;
			trans[i].c = &vtx->c.cursor;
			i++;
		}
		j++;
	}
	assert(i == n);

	/* Build NULL-terminated pointer array for the callback/query API */
	for (i = 0; i < n; i++)
		ptrans[i] = &trans[i];
	ptrans[i] = NULL;

	/* Query test goes here */
	if (vslq->query != NULL && !vslq_runquery(vslq->query, ptrans))
		return (0);

	/* Callback */
	return ((func)(vslq->vsl, ptrans, priv));
}
979
980
/* Create a synthetic log record. The record will be inserted at the
   current cursor offset */
static void
vtx_synth_rec(struct vtx *vtx, unsigned tag, const char *fmt, ...)
{
	struct synth *synth, *it;
	va_list ap;
	char *buf;
	int l, buflen;

	ALLOC_OBJ(synth, SYNTH_MAGIC);
	AN(synth);

	/* Record layout: data[0] = tag/length word, data[1] = vxid word,
	   payload starts at data[2] */
	buf = (char *)&synth->data[2];
	buflen = sizeof (synth->data) - 2 * sizeof (uint32_t);
	va_start(ap, fmt);
	l = vsnprintf(buf, buflen, fmt, ap);
	assert(l >= 0);
	va_end(ap);
	if (l > buflen - 1)
		/* Output was truncated by vsnprintf; clamp the length */
		l = buflen - 1;
	buf[l++] = '\0';	/* NUL-terminated */
	synth->data[1] = vtx->key.vxid;
	switch (vtx->type) {
	case VSL_t_req:
		synth->data[1] |= VSL_CLIENTMARKER;
		break;
	case VSL_t_bereq:
		synth->data[1] |= VSL_BACKENDMARKER;
		break;
	default:
		break;
	}
	synth->data[0] = (((tag & 0xff) << 24) | l);
	synth->offset = vtx->c.offset;

	VTAILQ_FOREACH_REVERSE(it, &vtx->synth, synthhead, list) {
		/* Make sure the synth list is sorted on offset */
		CHECK_OBJ_NOTNULL(it, SYNTH_MAGIC);
		if (synth->offset >= it->offset)
			break;
	}
	if (it != NULL)
		VTAILQ_INSERT_AFTER(&vtx->synth, it, synth, list);
	else
		VTAILQ_INSERT_HEAD(&vtx->synth, synth, list);

	/* Update cursor: point it at the new record if it comes before
	   the currently pending synth record */
	CHECK_OBJ_ORNULL(vtx->c.synth, SYNTH_MAGIC);
	if (vtx->c.synth == NULL || vtx->c.synth->offset > synth->offset)
		vtx->c.synth = synth;
}
1032
1033
/* Add a diagnostic SLT_VSL synth record to the vtx. */
1034
static int
1035 99
vtx_diag(struct vtx *vtx, const char *msg)
1036
{
1037
1038 99
        vtx_synth_rec(vtx, SLT_VSL, msg);
1039 99
        return (-1);
1040
}
1041
1042
/* Add a SLT_VSL diag synth record to the vtx. Takes an offending record
   that will be included in the log record */
static int
vtx_diag_tag(struct vtx *vtx, const uint32_t *ptr, const char *reason)
{

	/* Include vxid, tag name and payload of the offending record */
	vtx_synth_rec(vtx, SLT_VSL, "%s (%u:%s \"%.*s\")", reason, VSL_ID(ptr),
	    VSL_tags[VSL_TAG(ptr)], (int)VSL_LEN(ptr), VSL_CDATA(ptr));
	/* Always -1 so callers can propagate an error directly */
	return (-1);
}
1052
1053
/* Create a new dispatch context. On success takes ownership of the
   cursor *cp (setting *cp to NULL). Returns NULL on illegal grouping or
   if the query string fails to compile. */
struct VSLQ *
VSLQ_New(struct VSL_data *vsl, struct VSL_cursor **cp,
    enum VSL_grouping_e grouping, const char *querystring)
{
	struct vslq_query *query;
	struct VSLQ *vslq;

	CHECK_OBJ_NOTNULL(vsl, VSL_MAGIC);
	if (grouping > VSL_g_session) {
		(void)vsl_diag(vsl, "Illegal query grouping");
		return (NULL);
	}
	if (querystring != NULL) {
		/* Compile the query up front; diag is set on failure */
		query = vslq_newquery(vsl, grouping, querystring);
		if (query == NULL)
			return (NULL);
	} else
		query = NULL;

	ALLOC_OBJ(vslq, VSLQ_MAGIC);
	AN(vslq);
	vslq->vsl = vsl;
	if (cp != NULL) {
		/* Take ownership of the caller's cursor */
		vslq->c = *cp;
		*cp = NULL;
	}
	vslq->grouping = grouping;
	vslq->query = query;

	/* Setup normal mode */
	VRB_INIT(&vslq->tree);
	VTAILQ_INIT(&vslq->ready);
	VTAILQ_INIT(&vslq->incomplete);
	VTAILQ_INIT(&vslq->shmrefs);
	VTAILQ_INIT(&vslq->cache);

	/* Setup raw mode */
	vslq->raw.c.magic = VSLC_RAW_MAGIC;
	vslq->raw.c.cursor.priv_tbl = &vslc_raw_tbl;
	vslq->raw.c.cursor.priv_data = &vslq->raw.c;
	vslq->raw.trans.level = 0;
	vslq->raw.trans.type = VSL_t_raw;
	vslq->raw.trans.reason = VSL_r_unknown;
	vslq->raw.trans.c = &vslq->raw.c.cursor;
	vslq->raw.ptrans[0] = &vslq->raw.trans;
	vslq->raw.ptrans[1] = NULL;

	return (vslq);
}
1102
1103
void
1104 158
VSLQ_Delete(struct VSLQ **pvslq)
1105
{
1106
        struct VSLQ *vslq;
1107
        struct vtx *vtx;
1108
1109 158
        TAKE_OBJ_NOTNULL(vslq, pvslq, VSLQ_MAGIC);
1110
1111 158
        (void)VSLQ_Flush(vslq, NULL, NULL);
1112 158
        AZ(vslq->n_outstanding);
1113
1114 158
        if (vslq->c != NULL) {
1115 158
                VSL_DeleteCursor(vslq->c);
1116 158
                vslq->c = NULL;
1117
        }
1118
1119 158
        if (vslq->query != NULL)
1120 54
                vslq_deletequery(&vslq->query);
1121 158
        AZ(vslq->query);
1122
1123 594
        while (!VTAILQ_EMPTY(&vslq->cache)) {
1124 278
                AN(vslq->n_cache);
1125 278
                vtx = VTAILQ_FIRST(&vslq->cache);
1126 278
                VTAILQ_REMOVE(&vslq->cache, vtx, list_child);
1127 278
                vslq->n_cache--;
1128 278
                FREE_OBJ(vtx);
1129
        }
1130
1131 158
        FREE_OBJ(vslq);
1132 158
}
1133
1134
void
1135 17
VSLQ_SetCursor(struct VSLQ *vslq, struct VSL_cursor **cp)
1136
{
1137
1138 17
        CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
1139
1140 17
        if (vslq->c != NULL) {
1141 0
                (void)VSLQ_Flush(vslq, NULL, NULL);
1142 0
                AZ(vslq->n_outstanding);
1143 0
                VSL_DeleteCursor(vslq->c);
1144 0
                vslq->c = NULL;
1145
        }
1146
1147 17
        if (cp != NULL) {
1148 17
                AN(*cp);
1149 17
                vslq->c = *cp;
1150 17
                *cp = NULL;
1151
        }
1152 17
}
1153
1154
/* Regard each log line as a single transaction, feed it through the query
   and do the callback */
static int
vslq_raw(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
{
	int i = 1;
	int r;

	assert(vslq->grouping == VSL_g_raw);

	assert(vslq->raw.offset <= vslq->raw.len);
	do {
		if (vslq->raw.offset == vslq->raw.len) {
			/* Current input consumed: fetch the next record */
			i = VSL_Next(vslq->c);
			if (i <= 0)
				return (i);
			AN(vslq->c->rec.ptr);
			vslq->raw.start = vslq->c->rec;
			if (VSL_TAG(vslq->c->rec.ptr) == SLT__Batch)
				/* Batch record: span covers its payload,
				   which the loop below iterates over */
				vslq->raw.len = VSL_END(vslq->c->rec.ptr,
				    VSL_BATCHLEN(vslq->c->rec.ptr)) -
				    vslq->c->rec.ptr;
			else
				vslq->raw.len = VSL_NEXT(vslq->raw.start.ptr) -
				    vslq->raw.start.ptr;
			assert(vslq->raw.len > 0);
			vslq->raw.offset = 0;
		}

		/* Position the raw cursor on the next single record */
		vslq->raw.c.ptr = vslq->raw.start.ptr + vslq->raw.offset;
		vslq->raw.c.cursor.rec.ptr = NULL;
		vslq->raw.trans.vxid = VSL_ID(vslq->raw.c.ptr);
		vslq->raw.offset += VSL_NEXT(vslq->raw.c.ptr) - vslq->raw.c.ptr;
	} while (VSL_TAG(vslq->raw.c.ptr) == SLT__Batch);

	if (func == NULL)
		return (i);

	if (vslq->query != NULL &&
	    !vslq_runquery(vslq->query, vslq->raw.ptrans))
		/* Query did not match this record */
		return (i);

	r = (func)(vslq->vsl, vslq->raw.ptrans, priv);
	if (r)
		/* Non-zero callback return takes precedence */
		return (r);

	return (i);
}
1202
1203
/* Check the beginning of the shmref list, and buffer refs that are at
1204
 * warning level.
1205
 *
1206
 * Returns:
1207
 *    0:        OK
1208
 *   -3:        Failure
1209
 */
1210
static int
1211 3819
vslq_shmref_check(struct VSLQ *vslq)
1212
{
1213
        struct chunk *chunk;
1214
        int i;
1215
1216 7638
        while ((chunk = VTAILQ_FIRST(&vslq->shmrefs)) != NULL) {
1217 2675
                CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
1218 2676
                assert(chunk->type == chunk_t_shm);
1219 2676
                i = VSL_Check(vslq->c, &chunk->shm.start);
1220 2678
                if (i == 2)
1221
                        /* First on list is OK, refs behind it must also
1222
                           be OK */
1223 2678
                        return (0);
1224 0
                else if (i == 1)
1225
                        /* Warning level. Buffer this chunk */
1226 0
                        chunk_shm_to_buf(vslq, chunk);
1227
                else
1228
                        /* Too late to buffer */
1229 0
                        return (-3);
1230
        }
1231
1232 1144
        return (0);
1233
}
1234
1235
/* Process next input record */
static int
vslq_next(struct VSLQ *vslq)
{
	struct VSL_cursor *c;
	int i;
	enum VSL_tag_e tag;
	ssize_t len;
	unsigned vxid;
	struct vtx *vtx;

	c = vslq->c;
	i = VSL_Next(c);
	if (i != 1)
		return (i);

	tag = (enum VSL_tag_e)VSL_TAG(c->rec.ptr);
	if (tag == SLT__Batch) {
		/* Batch header carries the vxid; peek at the first
		   payload record for the effective tag */
		vxid = VSL_BATCHID(c->rec.ptr);
		len = VSL_END(c->rec.ptr, VSL_BATCHLEN(c->rec.ptr)) -
		    c->rec.ptr;
		if (len == 0)
			return (i);
		tag = (enum VSL_tag_e)VSL_TAG(VSL_NEXT(c->rec.ptr));
	} else {
		vxid = VSL_ID(c->rec.ptr);
		len = VSL_NEXT(c->rec.ptr) - c->rec.ptr;
	}
	assert(len > 0);
	if (vxid == 0)
		/* Skip non-transactional records */
		return (i);

	vtx = vtx_lookup(vslq, vxid);
	if (vtx == NULL && tag == SLT_Begin) {
		/* First record of a new transaction: create its vtx */
		vtx = vtx_add(vslq, vxid);
		AN(vtx);
	}
	if (vtx != NULL) {
		/* Append the input to the vtx buffer and process it */
		vtx_append(vslq, vtx, &c->rec, len);
		vtx_scan(vslq, vtx);
	}

	return (i);
}
1280
1281
/* Test query and report any ready transactions */
1282
static int
1283 653
vslq_process_ready(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
1284
{
1285
        struct vtx *vtx;
1286 653
        int i = 0;
1287
1288 653
        AN(vslq);
1289
1290 1801
        while (!VTAILQ_EMPTY(&vslq->ready)) {
1291 592
                vtx = VTAILQ_FIRST(&vslq->ready);
1292 592
                CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
1293 592
                VTAILQ_REMOVE(&vslq->ready, vtx, list_vtx);
1294 592
                AN(vtx->flags & VTX_F_READY);
1295 592
                if (func != NULL)
1296 495
                        i = vslq_callback(vslq, vtx, func, priv);
1297 592
                vtx_retire(vslq, &vtx);
1298 592
                AZ(vtx);
1299 592
                if (i)
1300 97
                        return (i);
1301
        }
1302
1303 556
        return (0);
1304
}
1305
1306
/* Process the input cursor, calling the callback function on matching
   transaction sets */
int
VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
{
	int i, r;
	double now;
	struct vtx *vtx;

	CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);

	/* Check that we have a cursor */
	if (vslq->c == NULL)
		return (-2);

	if (vslq->grouping == VSL_g_raw)
		/* Raw mode bypasses the transaction machinery entirely */
		return (vslq_raw(vslq, func, priv));

	/* Process next cursor input */
	i = vslq_next(vslq);
	if (i <= 0)
		/* At end of log or cursor reports error condition */
		return (i);

	/* Check shmref list and buffer if necessary */
	r = vslq_shmref_check(vslq);
	if (r)
		/* Buffering of shm ref failed */
		return (r);

	/* Check vtx timeout: force out incomplete vtxs from the front of
	   the list until one young enough is found */
	now = VTIM_mono();
	while (!VTAILQ_EMPTY(&vslq->incomplete)) {
		vtx = VTAILQ_FIRST(&vslq->incomplete);
		CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
		if (now - vtx->t_start < vslq->vsl->T_opt)
			break;
		vtx_force(vslq, vtx, "timeout");
		AN(vtx->flags & VTX_F_COMPLETE);
	}

	/* Check store limit: force out incomplete vtxs while over it */
	while (vslq->n_outstanding > vslq->vsl->L_opt &&
	    !(VTAILQ_EMPTY(&vslq->incomplete))) {
		vtx = VTAILQ_FIRST(&vslq->incomplete);
		CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
		vtx_force(vslq, vtx, "store overflow");
		AN(vtx->flags & VTX_F_COMPLETE);
		r = vslq_process_ready(vslq, func, priv);
		if (r)
			/* User return code */
			return (r);
	}

	/* Check ready list */
	if (!VTAILQ_EMPTY(&vslq->ready)) {
		r = vslq_process_ready(vslq, func, priv);
		if (r)
			/* User return code */
			return (r);
	}

	/* Return cursor return value */
	return (i);
}
1371
1372
/* Flush any incomplete vtx held on to. Do callbacks if func != NULL */
1373
int
1374 158
VSLQ_Flush(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
1375
{
1376
        struct vtx *vtx;
1377
1378 158
        CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
1379
1380 413
        while (!VTAILQ_EMPTY(&vslq->incomplete)) {
1381 97
                vtx = VTAILQ_FIRST(&vslq->incomplete);
1382 97
                CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
1383 97
                AZ(vtx->flags & VTX_F_COMPLETE);
1384 97
                vtx_force(vslq, vtx, "flush");
1385
        }
1386
1387 158
        return (vslq_process_ready(vslq, func, priv));
1388
}