[master] 19a5542 Rework VSL_Dispatch
Martin Blix Grydeland
martin at varnish-cache.org
Wed Oct 9 16:03:03 CEST 2013
commit 19a5542d7b1795e3fcfd6f0697ecace41ddba4d2
Author: Martin Blix Grydeland <martin at varnish-software.com>
Date: Tue Oct 8 10:22:12 2013 +0200
Rework VSL_Dispatch
Highlights:
* Improved buffer handling. A vtx can now have a mix of buffered and
shm chunks. And as shm refs are used and later buffered, they can be
reused on the same vtx. This benefits long running transactions.
* Synth records are now returned by the cursor at the position the
cursor was at when the synth record was created. This solves a problem
where, if a vtx timed out while writing to a file, reading that file
back would also incur a timeout delay.
* Begin records are now strictly honored, and any records before a
Begin record are now ignored.
* More asserts on the structures
* More comments, should be easier to understand now
* Various other changes
diff --git a/lib/libvarnishapi/vsl_dispatch.c b/lib/libvarnishapi/vsl_dispatch.c
index db92c7f..a47d018 100644
--- a/lib/libvarnishapi/vsl_dispatch.c
+++ b/lib/libvarnishapi/vsl_dispatch.c
@@ -46,7 +46,16 @@
#define VTX_CACHE 10
#define VTX_BUFSIZE_MIN 64
-#define VTX_CHUNKS 3
+#define VTX_SHMCHUNKS 3
+
+static const char * const vsl_t_names[VSL_t__MAX] = { /* Transaction type names, indexed by the VSL_t_* type constants */
+ [VSL_t_unknown] = "unknown",
+ [VSL_t_sess] = "sess",
+ [VSL_t_req] = "req",
+ [VSL_t_esireq] = "esireq",
+ [VSL_t_bereq] = "bereq",
+ [VSL_t_raw] = "raw",
+};
struct vtx;
@@ -61,20 +70,41 @@ struct vslc_raw {
const uint32_t *next;
};
-struct vtx_chunk {
- struct VSLC_ptr start;
- ssize_t len;
- ssize_t offset;
-};
-
-struct vtx_diag {
+struct synth {
unsigned magic;
-#define VTX_DIAG_MAGIC 0xC654479F
+#define SYNTH_MAGIC 0xC654479F
- VTAILQ_ENTRY(vtx_diag) list;
- uint32_t chunk[2 + 256 / sizeof (uint32_t)];
+ VTAILQ_ENTRY(synth) list;
+ size_t offset;
+ uint32_t data[2 + 64 / sizeof (uint32_t)];
};
+VTAILQ_HEAD(synthhead, synth);
+enum chunk_t { /* Selects which member of the union in struct chunk is in use */
+ chunk_t__unassigned, /* First enumerator, i.e. the zero value */
+ chunk_t_shm, /* Records are referenced in place through a struct VSLC_ptr */
+ chunk_t_buf, /* Records are held as a copy in a malloc'ed buffer */
+};
+
+struct chunk { /* One run of log records belonging to a vtx */
+ unsigned magic;
+#define CHUNK_MAGIC 0x48DC0194
+ enum chunk_t type; /* Checked before either union member is accessed */
+ union {
+ struct {
+ struct VSLC_ptr start; /* Where the referenced records begin */
+ VTAILQ_ENTRY(chunk) shmref; /* Linkage on the VSLQ shmrefs list */
+ } shm;
+ struct {
+ uint32_t *data; /* malloc'ed copy of the records */
+ size_t space; /* Allocated size of data, in uint32_t words */
+ } buf;
+ };
+ size_t len; /* Length of the records, in uint32_t words */
+ struct vtx *vtx; /* The vtx this chunk belongs to */
+ VTAILQ_ENTRY(chunk) list; /* Linkage on the vtx chunks or shmchunks_free list */
+};
+VTAILQ_HEAD(chunkhead, chunk);
struct vslc_vtx {
unsigned magic;
@@ -83,11 +113,10 @@ struct vslc_vtx {
struct VSL_cursor cursor;
struct vtx *vtx;
-
- struct vtx_diag *diag; /* Current diag message pointer */
-
- unsigned chunk; /* Current chunk */
- ssize_t offset; /* Offset of next record */
+ struct synth *synth;
+ struct chunk *chunk;
+ size_t chunkstart;
+ size_t offset;
};
struct vtx_key {
@@ -102,12 +131,15 @@ struct vtx {
#define VTX_MAGIC 0xACC21D09
VTAILQ_ENTRY(vtx) list_child;
VTAILQ_ENTRY(vtx) list_incomplete;
- VTAILQ_ENTRY(vtx) list_shm;
double t_start;
unsigned flags;
-#define VTX_F_COMPLETE 0x1
-#define VTX_F_READY 0x2
+#define VTX_F_BEGIN 0x1 /* Begin record processed */
+#define VTX_F_END 0x2 /* End record processed */
+#define VTX_F_COMPLETE 0x4 /* Marked complete. No new children
+ should be appended */
+#define VTX_F_READY 0x8 /* This vtx and all its children are
+ complete */
enum VSL_transaction_e type;
@@ -117,17 +149,15 @@ struct vtx {
unsigned n_childready;
unsigned n_descend;
- struct vslc_vtx c;
-
- VTAILQ_HEAD(,vtx_diag) diag;
+ VTAILQ_HEAD(,synth) synth;
- struct vtx_chunk chunk[VTX_CHUNKS];
- unsigned n_chunk;
+ struct chunk shmchunks[VTX_SHMCHUNKS];
+ struct chunkhead shmchunks_free;
- uint32_t *buf;
- ssize_t bufsize;
+ struct chunkhead chunks;
+ size_t len;
- ssize_t len;
+ struct vslc_vtx c;
};
struct VSLQ {
@@ -145,14 +175,15 @@ struct VSLQ {
VTAILQ_HEAD(,vtx) incomplete;
unsigned n_incomplete;
- VTAILQ_HEAD(,vtx) shmlist;
+ struct chunkhead shmrefs;
VTAILQ_HEAD(,vtx) cache;
unsigned n_cache;
};
+static void vtx_synth_rec(struct vtx *vtx, unsigned tag, const char *fmt, ...);
/*lint -esym(534, vtx_diag) */
-static int vtx_diag(struct vtx *vtx, const char *fmt, ...);
+static int vtx_diag(struct vtx *vtx, const char *msg);
/*lint -esym(534, vtx_diag_tag) */
static int vtx_diag_tag(struct vtx *vtx, const uint32_t *ptr,
const char *reason);
@@ -217,49 +248,49 @@ static int
vslc_vtx_next(struct VSL_cursor *cursor)
{
struct vslc_vtx *c;
- struct vtx_chunk *chunk;
+ const uint32_t *ptr;
CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VTX_MAGIC);
assert(&c->cursor == cursor);
CHECK_OBJ_NOTNULL(c->vtx, VTX_MAGIC);
- if (c->diag == NULL && VTAILQ_FIRST(&c->vtx->diag) != NULL) {
- /* Send first diag msg */
- c->diag = VTAILQ_FIRST(&c->vtx->diag);
- c->cursor.rec.ptr = c->diag->chunk;
- return (1);
- } else if (c->diag != NULL && VTAILQ_NEXT(c->diag, list) != NULL) {
- /* Send next diag msg */
- c->diag = VTAILQ_NEXT(c->diag, list);
- c->cursor.rec.ptr = c->diag->chunk;
- return (1);
- }
-
- assert (c->offset <= c->vtx->len);
- if (c->offset == c->vtx->len)
- return (0);
-
- if (c->vtx->n_chunk == 0) {
- /* Buffer */
- AN(c->vtx->buf);
- assert(c->offset < c->vtx->bufsize);
- c->cursor.rec.ptr = c->vtx->buf + c->offset;
- c->offset += VSL_NEXT(c->cursor.rec.ptr) - c->cursor.rec.ptr;
- return (1);
- }
-
- /* Shmptr chunks */
- assert(c->chunk < c->vtx->n_chunk);
- chunk = &c->vtx->chunk[c->chunk];
- assert(c->offset >= chunk->offset);
- assert(c->offset <= chunk->offset + chunk->len);
- if (c->offset == chunk->offset + chunk->len) {
- c->chunk++;
- chunk = &c->vtx->chunk[c->chunk];
- }
- AN(chunk->start.ptr);
- c->cursor.rec.ptr = chunk->start.ptr + c->offset - chunk->offset;
- c->offset += VSL_NEXT(c->cursor.rec.ptr) - c->cursor.rec.ptr;
+ do {
+ CHECK_OBJ_ORNULL(c->synth, SYNTH_MAGIC);
+ if (c->synth != NULL && c->synth->offset == c->offset) {
+ /* We're at the offset of the next synth record,
+ point to it and advance the pointer */
+ c->cursor.rec.ptr = c->synth->data;
+ c->synth = VTAILQ_NEXT(c->synth, list);
+ } else {
+ assert(c->offset <= c->vtx->len);
+ if (c->offset == c->vtx->len)
+ /* End of cursor */
+ return (0);
+
+ /* Advance chunk pointer */
+ if (c->chunk == NULL) {
+ c->chunk = VTAILQ_FIRST(&c->vtx->chunks);
+ c->chunkstart = 0;
+ }
+ CHECK_OBJ_NOTNULL(c->chunk, CHUNK_MAGIC);
+ while (c->offset >= c->chunkstart + c->chunk->len) {
+ c->chunkstart += c->chunk->len;
+ c->chunk = VTAILQ_NEXT(c->chunk, list);
+ CHECK_OBJ_NOTNULL(c->chunk, CHUNK_MAGIC);
+ }
+
+ /* Point to the next stored record */
+ if (c->chunk->type == chunk_t_shm)
+ ptr = c->chunk->shm.start.ptr;
+ else {
+ assert(c->chunk->type == chunk_t_buf);
+ ptr = c->chunk->buf.data;
+ }
+ c->cursor.rec.ptr = ptr + c->offset - c->chunkstart;
+ c->offset += VSL_NEXT(c->cursor.rec.ptr) -
+ c->cursor.rec.ptr;
+ }
+ } while (VSL_TAG(c->cursor.rec.ptr) == SLT__Batch);
return (1);
}
@@ -271,8 +302,10 @@ vslc_vtx_reset(struct VSL_cursor *cursor)
CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VTX_MAGIC);
assert(&c->cursor == cursor);
- c->diag = NULL;
- c->chunk = 0;
+ CHECK_OBJ_NOTNULL(c->vtx, VTX_MAGIC);
+ c->synth = VTAILQ_FIRST(&c->vtx->synth);
+ c->chunk = NULL;
+ c->chunkstart = 0;
c->offset = 0;
c->cursor.rec.ptr = NULL;
@@ -288,10 +321,136 @@ static const struct vslc_tbl vslc_vtx_tbl = {
.check = NULL,
};
+/*
+ * Create a buf chunk holding a copy of the len 32-bit words at ptr.
+ *
+ * The data buffer starts at VTX_BUFSIZE_MIN words and is doubled until
+ * it can hold len words. Both allocations are asserted to succeed. The
+ * returned chunk is not linked on any list; that is left to the caller.
+ */
+static struct chunk *
+chunk_newbuf(struct vtx *vtx, const uint32_t *ptr, size_t len)
+{
+	struct chunk *chunk;
+
+	ALLOC_OBJ(chunk, CHUNK_MAGIC);
+	/* Check before dereferencing, as is done for the vtx and synth
+	   allocations elsewhere in this file */
+	AN(chunk);
+	chunk->type = chunk_t_buf;
+	chunk->vtx = vtx;
+	chunk->buf.space = VTX_BUFSIZE_MIN;
+	while (chunk->buf.space < len)
+		chunk->buf.space *= 2;
+	chunk->buf.data = malloc(sizeof (uint32_t) * chunk->buf.space);
+	AN(chunk->buf.data);
+	memcpy(chunk->buf.data, ptr, sizeof (uint32_t) * len);
+	chunk->len = len;
+	return (chunk);
+}
+
+/* Release a buf chunk together with its data buffer, and clear the
+   caller's pointer to it */
+static void
+chunk_freebuf(struct chunk **pchunk)
+{
+	struct chunk *chunk;
+
+	chunk = *pchunk;
+	CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
+	assert(chunk->type == chunk_t_buf);
+	free(chunk->buf.data);
+	FREE_OBJ(chunk);
+	*pchunk = NULL;
+}
+
+/*
+ * Append a copy of the len 32-bit words at ptr to a buf chunk.
+ *
+ * The data buffer is doubled until the existing contents plus the new
+ * words fit. A failed reallocation trips an assert, matching the check
+ * made on the initial malloc() in chunk_newbuf().
+ */
+static void
+chunk_appendbuf(struct chunk *chunk, const uint32_t *ptr, size_t len)
+{
+
+	CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
+	assert(chunk->type == chunk_t_buf);
+	if (chunk->buf.space < chunk->len + len) {
+		while (chunk->buf.space < chunk->len + len)
+			chunk->buf.space *= 2;
+		chunk->buf.data = realloc(chunk->buf.data,
+		    sizeof (uint32_t) * chunk->buf.space);
+		/* Without this check a NULL return would be handed
+		   straight to memcpy() below */
+		AN(chunk->buf.data);
+	}
+	memcpy(chunk->buf.data + chunk->len, ptr, sizeof (uint32_t) * len);
+	chunk->len += len;
+}
+
+/* Transform a shm chunk to a buf chunk
+ *
+ * The records the shm chunk refers to are copied either onto the end of
+ * the preceding chunk, when that one is a buf chunk, or into a new buf
+ * chunk inserted in front of it. The shm chunk itself is then unlinked
+ * from vslq->shmrefs and from the vtx chunk list, and handed back to the
+ * vtx's free list of shm chunks.
+ */
+static void
+chunk_shm_to_buf(struct VSLQ *vslq, struct chunk *chunk)
+{
+ struct vtx *vtx;
+ struct chunk *buf;
+
+ CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
+ assert(chunk->type == chunk_t_shm);
+ vtx = chunk->vtx;
+ CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
+
+ buf = VTAILQ_PREV(chunk, chunkhead, list);
+ if (buf != NULL && buf->type == chunk_t_buf)
+ /* Previous is a buf chunk, append to it */
+ chunk_appendbuf(buf, chunk->shm.start.ptr, chunk->len);
+ else {
+ /* Create a new buf chunk and insert it before this */
+ buf = chunk_newbuf(vtx, chunk->shm.start.ptr, chunk->len);
+ AN(buf);
+ VTAILQ_INSERT_BEFORE(chunk, buf, list);
+ }
+
+ /* Reset cursor chunk pointer, vslc_vtx_next will set it correctly
+    (on a NULL chunk pointer it restarts from the first chunk with
+    chunkstart set to 0) */
+ vtx->c.chunk = NULL;
+
+ /* Remove from the shmref list and vtx, and put chunk back
+ on the free list */
+ VTAILQ_REMOVE(&vslq->shmrefs, chunk, shm.shmref);
+ VTAILQ_REMOVE(&vtx->chunks, chunk, list);
+ VTAILQ_INSERT_HEAD(&vtx->shmchunks_free, chunk, list);
+}
+
+/* Append a set of records to a vtx structure
+ *
+ * start points at the records and len is their length in 32-bit words;
+ * a len of 0 is a no-op. When VSL_Check() returns 2 for start and the
+ * vtx still has a free shm chunk, the records are only referenced and
+ * the chunk is also queued on vslq->shmrefs. Otherwise the records are
+ * copied, either onto the tail chunk if that is a buf chunk or into a
+ * newly created buf chunk. In both cases vtx->len grows by len.
+ */
+static void
+vtx_append(struct VSLQ *vslq, struct vtx *vtx, const struct VSLC_ptr *start,
+ size_t len)
+{
+ struct chunk *chunk;
+
+ AN(vtx);
+ if (len == 0)
+ return;
+ AN(start);
+
+ if (VSL_Check(vslq->c, start) == 2 &&
+ !VTAILQ_EMPTY(&vtx->shmchunks_free)) {
+ /* Shmref it */
+ chunk = VTAILQ_FIRST(&vtx->shmchunks_free);
+ CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
+ assert(chunk->type == chunk_t_shm);
+ assert(chunk->vtx == vtx);
+ VTAILQ_REMOVE(&vtx->shmchunks_free, chunk, list);
+ chunk->shm.start = *start;
+ chunk->len = len;
+ VTAILQ_INSERT_TAIL(&vtx->chunks, chunk, list);
+
+ /* Append to shmref list */
+ VTAILQ_INSERT_TAIL(&vslq->shmrefs, chunk, shm.shmref);
+ } else {
+ /* Buffer it */
+ chunk = VTAILQ_LAST(&vtx->chunks, chunkhead);
+ CHECK_OBJ_ORNULL(chunk, CHUNK_MAGIC);
+ if (chunk != NULL && chunk->type == chunk_t_buf) {
+ /* Tail is a buf chunk, append to that */
+ chunk_appendbuf(chunk, start->ptr, len);
+ } else {
+ /* Append new buf chunk */
+ chunk = chunk_newbuf(vtx, start->ptr, len);
+ AN(chunk);
+ VTAILQ_INSERT_TAIL(&vtx->chunks, chunk, list);
+ }
+ }
+ vtx->len += len;
+}
+
+/* Allocate a new vtx structure */
static struct vtx *
vtx_new(struct VSLQ *vslq)
{
struct vtx *vtx;
+ int i;
AN(vslq);
if (vslq->n_cache) {
@@ -302,52 +461,48 @@ vtx_new(struct VSLQ *vslq)
} else {
ALLOC_OBJ(vtx, VTX_MAGIC);
AN(vtx);
+
+ VTAILQ_INIT(&vtx->child);
+ VTAILQ_INIT(&vtx->shmchunks_free);
+ for (i = 0; i < VTX_SHMCHUNKS; i++) {
+ vtx->shmchunks[i].magic = CHUNK_MAGIC;
+ vtx->shmchunks[i].type = chunk_t_shm;
+ vtx->shmchunks[i].vtx = vtx;
+ VTAILQ_INSERT_TAIL(&vtx->shmchunks_free,
+ &vtx->shmchunks[i], list);
+ }
+ VTAILQ_INIT(&vtx->chunks);
+ VTAILQ_INIT(&vtx->synth);
vtx->c.magic = VSLC_VTX_MAGIC;
vtx->c.vtx = vtx;
vtx->c.cursor.priv_tbl = &vslc_vtx_tbl;
vtx->c.cursor.priv_data = &vtx->c;
}
+ CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
vtx->key.vxid = 0;
vtx->t_start = VTIM_mono();
vtx->flags = 0;
vtx->type = VSL_t_unknown;
vtx->parent = NULL;
- VTAILQ_INIT(&vtx->child);
vtx->n_child = 0;
vtx->n_childready = 0;
vtx->n_descend = 0;
- VTAILQ_INIT(&vtx->diag);
- memset(vtx->chunk, 0, sizeof vtx->chunk);
- vtx->n_chunk = 0;
vtx->len = 0;
(void)vslc_vtx_reset(&vtx->c.cursor);
- VTAILQ_INSERT_TAIL(&vslq->incomplete, vtx, list_incomplete);
- vslq->n_incomplete++;
-
return (vtx);
}
-static void
-vtx_free(struct vtx **pvtx)
-{
- struct vtx *vtx;
-
- AN(pvtx);
- vtx = *pvtx;
- *pvtx = NULL;
-
- free(vtx->buf);
- FREE_OBJ(vtx);
-}
-
+/* Disuse a vtx and all its children, freeing any resources held. Free or
+ cache the vtx for later use */
static void
vtx_retire(struct VSLQ *vslq, struct vtx **pvtx)
{
struct vtx *vtx;
struct vtx *child;
- struct vtx_diag *diag;
+ struct synth *synth;
+ struct chunk *chunk;
AN(vslq);
AN(pvtx);
@@ -374,177 +529,161 @@ vtx_retire(struct VSLQ *vslq, struct vtx **pvtx)
}
AZ(vtx->n_child);
AZ(vtx->n_descend);
+ vtx->n_childready = 0;
AN(VRB_REMOVE(vtx_tree, &vslq->tree, &vtx->key));
+ vtx->key.vxid = 0;
+ vtx->flags = 0;
- while (!VTAILQ_EMPTY(&vtx->diag)) {
- diag = VTAILQ_FIRST(&vtx->diag);
- VTAILQ_REMOVE(&vtx->diag, diag, list);
- FREE_OBJ(diag);
+ while (!VTAILQ_EMPTY(&vtx->synth)) {
+ synth = VTAILQ_FIRST(&vtx->synth);
+ CHECK_OBJ_NOTNULL(synth, SYNTH_MAGIC);
+ VTAILQ_REMOVE(&vtx->synth, synth, list);
+ FREE_OBJ(synth);
}
- if (vtx->n_chunk)
- VTAILQ_REMOVE(&vslq->shmlist, vtx, list_shm);
+ while (!VTAILQ_EMPTY(&vtx->chunks)) {
+ chunk = VTAILQ_FIRST(&vtx->chunks);
+ CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
+ VTAILQ_REMOVE(&vtx->chunks, chunk, list);
+ if (chunk->type == chunk_t_shm) {
+ VTAILQ_REMOVE(&vslq->shmrefs, chunk, shm.shmref);
+ VTAILQ_INSERT_HEAD(&vtx->shmchunks_free, chunk, list);
+ } else {
+ assert(chunk->type == chunk_t_buf);
+ chunk_freebuf(&chunk);
+ AZ(chunk);
+ }
+ }
+ vtx->len = 0;
if (vslq->n_cache < VTX_CACHE) {
VTAILQ_INSERT_HEAD(&vslq->cache, vtx, list_child);
vslq->n_cache++;
} else {
- vtx_free(&vtx);
- AZ(vtx);
+ FREE_OBJ(vtx);
+ vtx = NULL;
}
}
+/* Lookup a vtx by vxid from the managed list */
static struct vtx *
-vtx_lori(struct VSLQ *vslq, unsigned vxid)
+vtx_lookup(struct VSLQ *vslq, unsigned vxid)
{
- struct vtx *vtx;
struct vtx_key lkey, *key;
+ struct vtx *vtx;
AN(vslq);
lkey.vxid = vxid;
key = VRB_FIND(vtx_tree, &vslq->tree, &lkey);
- if (key != NULL) {
- CAST_OBJ_NOTNULL(vtx, (void *)key, VTX_MAGIC);
- return (vtx);
- }
-
- vtx = vtx_new(vslq);
- AN(vtx);
- vtx->key.vxid = vxid;
- AZ(VRB_INSERT(vtx_tree, &vslq->tree, &vtx->key));
+ if (key == NULL)
+ return (NULL);
+ CAST_OBJ_NOTNULL(vtx, (void *)key, VTX_MAGIC);
return (vtx);
}
-static void
-vtx_set_bufsize(struct vtx *vtx, ssize_t len)
-{
-
- AN(vtx);
- assert(len >= 0);
- if (vtx->bufsize >= len)
- return;
- if (vtx->bufsize == 0)
- vtx->bufsize = VTX_BUFSIZE_MIN;
- while (vtx->bufsize < len)
- vtx->bufsize *= 2;
- vtx->buf = realloc(vtx->buf, sizeof (uint32_t) * vtx->bufsize);
- AN(vtx->buf);
-}
-
-static void
-vtx_buffer(struct VSLQ *vslq, struct vtx *vtx)
-{
- int i;
-
- AN(vtx->n_chunk);
- AN(vtx->len);
-
- vtx_set_bufsize(vtx, vtx->len);
- AN(vtx->buf);
- assert(vtx->bufsize >= vtx->len);
-
- for (i = 0; i < vtx->n_chunk; i++)
- memcpy(vtx->buf + vtx->chunk[i].offset, vtx->chunk[i].start.ptr,
- sizeof (uint32_t) * vtx->chunk[i].len);
-
- memset(vtx->chunk, 0, sizeof vtx->chunk);
- VTAILQ_REMOVE(&vslq->shmlist, vtx, list_shm);
- vtx->n_chunk = 0;
-}
-
-static void
-vtx_append(struct VSLQ *vslq, struct vtx *vtx, const struct VSLC_ptr *start,
- ssize_t len, int copy)
+/* Insert a new vtx into the managed list */
+static struct vtx *
+vtx_add(struct VSLQ *vslq, unsigned vxid)
{
+ struct vtx *vtx;
+ AN(vslq);
+ vtx = vtx_new(vslq);
AN(vtx);
- if (len == 0)
- return;
- AN(start);
-
- if (vtx->len > 0 && vtx->n_chunk == 0)
- /* Can't mix buffer and shmptr */
- copy = 1;
-
- if (!copy && vtx->n_chunk < VTX_CHUNKS) {
- /* Add shmptr chunk */
- AZ(vtx->chunk[vtx->n_chunk].len);
- vtx->chunk[vtx->n_chunk].start = *start;
- vtx->chunk[vtx->n_chunk].len = len;
- vtx->chunk[vtx->n_chunk].offset = vtx->len;
- vtx->len += len;
- if (vtx->n_chunk == 0)
- VTAILQ_INSERT_TAIL(&vslq->shmlist, vtx, list_shm);
- vtx->n_chunk++;
- return;
- }
-
- /* Append to buffer */
- vtx_set_bufsize(vtx, vtx->len + len);
- if (vtx->n_chunk)
- vtx_buffer(vslq, vtx);
- AZ(vtx->n_chunk);
- AN(vtx->buf);
- memcpy(vtx->buf + vtx->len, start->ptr, sizeof (uint32_t) * len);
- vtx->len += len;
+ vtx->key.vxid = vxid;
+ AZ(VRB_INSERT(vtx_tree, &vslq->tree, &vtx->key));
+ VTAILQ_INSERT_TAIL(&vslq->incomplete, vtx, list_incomplete);
+ vslq->n_incomplete++;
+ return (vtx);
}
+/* Mark a vtx complete, update child counters and if possible push it or
+ its top parent to the ready state */
static struct vtx *
-vtx_check_ready(struct VSLQ *vslq, struct vtx *vtx)
+vtx_mark_complete(struct VSLQ *vslq, struct vtx *vtx)
{
- struct vtx *ready;
AN(vslq);
- AN(vtx->flags & VTX_F_COMPLETE);
- AZ(vtx->flags & VTX_F_READY);
+ AN(vtx->flags & VTX_F_END);
+ AZ(vtx->flags & VTX_F_COMPLETE);
if (vtx->type == VSL_t_unknown)
vtx_diag(vtx, "vtx of unknown type marked complete");
- ready = vtx;
+ vtx->flags |= VTX_F_COMPLETE;
+ VTAILQ_REMOVE(&vslq->incomplete, vtx, list_incomplete);
+ AN(vslq->n_incomplete);
+ vslq->n_incomplete--;
+
while (1) {
- if (ready->flags & VTX_F_COMPLETE &&
- ready->n_child == ready->n_childready)
- ready->flags |= VTX_F_READY;
+ AZ(vtx->flags & VTX_F_READY);
+ if (vtx->flags & VTX_F_COMPLETE &&
+ vtx->n_child == vtx->n_childready)
+ vtx->flags |= VTX_F_READY;
else
- break;
- if (ready->parent == NULL)
- break;
- ready = ready->parent;
- ready->n_childready++;
- assert(ready->n_child >= ready->n_childready);
+ return (NULL);
+ if (vtx->parent == NULL) {
+ /* Top level vtx ready */
+ return (vtx);
+ }
+ vtx = vtx->parent;
+ vtx->n_childready++;
+ assert(vtx->n_child >= vtx->n_childready);
}
- if (ready->flags & VTX_F_READY && ready->parent == NULL)
- /* Top level vtx ready */
- return (ready);
+ if (vtx->flags & VTX_F_READY && vtx->parent == NULL)
+ return (vtx);
return (NULL);
}
+/* Link child under parent, then add the child and all of its
+   descendants to the descendant count of parent and of every ancestor
+   above it */
+static void
+vtx_set_parent(struct vtx *parent, struct vtx *child)
+{
+	struct vtx *ancestor;
+	unsigned added;
+
+	CHECK_OBJ_NOTNULL(parent, VTX_MAGIC);
+	CHECK_OBJ_NOTNULL(child, VTX_MAGIC);
+	AZ(parent->flags & VTX_F_COMPLETE);
+	AZ(child->flags & VTX_F_COMPLETE);
+	AZ(child->parent);
+
+	child->parent = parent;
+	VTAILQ_INSERT_TAIL(&parent->child, child, list_child);
+	parent->n_child++;
+
+	/* The child itself plus everything already below it */
+	added = 1 + child->n_descend;
+	for (ancestor = parent; ancestor != NULL; ancestor = ancestor->parent)
+		ancestor->n_descend += added;
+}
+
+/* Parse a begin or link record. Returns the number of elements that were
+ successfully parsed, or -1 on a parse error. */
static int
-vtx_parsetag_bl(const char *str, enum VSL_transaction_e *ptype,
+vtx_parse_beginlink(const char *str, enum VSL_transaction_e *ptype,
unsigned *pvxid)
{
- char buf[7];
+ char buf[8];
unsigned vxid;
- int i;
+ int i, j;
enum VSL_transaction_e type = VSL_t_unknown;
AN(str);
- i = sscanf(str, "%6s %u", buf, &vxid);
+ i = sscanf(str, "%7s %u", buf, &vxid);
if (i < 1)
return (-1);
- if (!strcmp(buf, "sess"))
- type = VSL_t_sess;
- else if (!strcmp(buf, "req"))
- type = VSL_t_req;
- else if (!strcmp(buf, "esireq"))
- type = VSL_t_esireq;
- else if (!strcmp(buf, "bereq"))
- type = VSL_t_bereq;
- else
+ for (j = 0; j < VSL_t__MAX; j++)
+ if (!strcmp(buf, vsl_t_names[j]))
+ break;
+ switch (j) {
+ case VSL_t_sess:
+ case VSL_t_req:
+ case VSL_t_esireq:
+ case VSL_t_bereq:
+ /* Valid types */
+ type = j;
+ break;
+ default:
return (-1);
+ }
if (i == 1)
vxid = 0;
if (ptype)
@@ -554,23 +693,9 @@ vtx_parsetag_bl(const char *str, enum VSL_transaction_e *ptype,
return (i);
}
-static void
-vtx_set_parent(struct vtx *parent, struct vtx *child)
-{
-
- AN(parent);
- AN(child);
- AZ(child->parent);
- child->parent = parent;
- VTAILQ_INSERT_TAIL(&parent->child, child, list_child);
- parent->n_child++;
- do
- parent->n_descend += 1 + child->n_descend;
- while ((parent = parent->parent));
-}
-
+/* Parse and process a begin record */
static int
-vtx_scan_begintag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
+vtx_scan_begin(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
{
int i;
enum VSL_transaction_e type;
@@ -579,17 +704,19 @@ vtx_scan_begintag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
assert(VSL_TAG(ptr) == SLT_Begin);
- if (vtx->flags & VTX_F_READY)
- return (vtx_diag_tag(vtx, ptr, "link too late"));
+ AZ(vtx->flags & VTX_F_READY);
- i = vtx_parsetag_bl(VSL_CDATA(ptr), &type, &p_vxid);
+ i = vtx_parse_beginlink(VSL_CDATA(ptr), &type, &p_vxid);
if (i < 1)
return (vtx_diag_tag(vtx, ptr, "parse error"));
/* Check/set vtx type */
assert(type != VSL_t_unknown);
- if (vtx->type != VSL_t_unknown && vtx->type != type)
+ if (vtx->type != VSL_t_unknown && vtx->type != type) {
+ /* Type not matching the one previously set by a link
+ record */
return (vtx_diag_tag(vtx, ptr, "type mismatch"));
+ }
vtx->type = type;
if (i == 1 || p_vxid == 0)
@@ -600,25 +727,38 @@ vtx_scan_begintag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
if (vslq->grouping == VSL_g_request && vtx->type == VSL_t_req)
return (0); /* No links */
- /* Lookup and check parent vtx */
- p_vtx = vtx_lori(vslq, p_vxid);
- AN(p_vtx);
- if (vtx->parent == p_vtx)
- /* Link already exists */
+ if (vtx->parent != NULL) {
+ if (vtx->parent->key.vxid != p_vxid) {
+ /* This vtx already belongs to a different
+ parent */
+ return (vtx_diag_tag(vtx, ptr, "link mismatch"));
+ } else
+ /* Link already exists */
+ return (0);
+ }
+
+ p_vtx = vtx_lookup(vslq, p_vxid);
+ if (p_vtx == NULL) {
+ /* Not seen parent yet. Insert it and create link. */
+ p_vtx = vtx_add(vslq, p_vxid);
+ AN(p_vtx);
+ vtx_set_parent(p_vtx, vtx);
return (0);
+ }
- if (vtx->parent != NULL)
- return (vtx_diag_tag(vtx, ptr, "duplicate link"));
- if (p_vtx->flags & VTX_F_READY)
+ CHECK_OBJ_NOTNULL(p_vtx, VTX_MAGIC);
+ if (p_vtx->flags & VTX_F_COMPLETE)
return (vtx_diag_tag(vtx, ptr, "link too late"));
+ /* Create link */
vtx_set_parent(p_vtx, vtx);
return (0);
}
+/* Parse and process a link record */
static int
-vtx_scan_linktag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
+vtx_scan_link(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
{
int i;
enum VSL_transaction_e c_type;
@@ -627,13 +767,13 @@ vtx_scan_linktag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
assert(VSL_TAG(ptr) == SLT_Link);
- if (vtx->flags & VTX_F_READY)
- return (vtx_diag_tag(vtx, ptr, "link too late"));
+ AZ(vtx->flags & VTX_F_READY);
- i = vtx_parsetag_bl(VSL_CDATA(ptr), &c_type, &c_vxid);
+ i = vtx_parse_beginlink(VSL_CDATA(ptr), &c_type, &c_vxid);
if (i < 2)
return (vtx_diag_tag(vtx, ptr, "parse error"));
assert(i == 2);
+ assert(c_type != VSL_t_unknown);
if (vslq->grouping == VSL_g_vxid)
return (0); /* No links */
@@ -641,88 +781,92 @@ vtx_scan_linktag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
return (0); /* No links */
/* Lookup and check child vtx */
- c_vtx = vtx_lori(vslq, c_vxid);
- AN(c_vtx);
+ c_vtx = vtx_lookup(vslq, c_vxid);
+ if (c_vtx == NULL) {
+ /* Child not seen before. Insert it and create link */
+ c_vtx = vtx_add(vslq, c_vxid);
+ AN(c_vtx);
+ c_vtx->type = c_type;
+ vtx_set_parent(vtx, c_vtx);
+ return (0);
+ }
+
+ CHECK_OBJ_NOTNULL(c_vtx, VTX_MAGIC);
if (c_vtx->parent == vtx)
/* Link already exists */
return (0);
- if (c_vtx->parent != NULL)
+ if (c_vtx->parent != vtx)
return (vtx_diag_tag(vtx, ptr, "duplicate link"));
- if (c_vtx->flags & VTX_F_READY)
+ if (c_vtx->flags & VTX_F_COMPLETE)
return (vtx_diag_tag(vtx, ptr, "link too late"));
if (c_vtx->type != VSL_t_unknown && c_vtx->type != c_type)
return (vtx_diag_tag(vtx, ptr, "type mismatch"));
- c_vtx->type = c_type;
+ c_vtx->type = c_type;
vtx_set_parent(vtx, c_vtx);
-
return (0);
}
+/* Scan the records of a vtx, performing processing actions on specific
+ records */
static struct vtx *
vtx_scan(struct VSLQ *vslq, struct vtx *vtx)
{
const uint32_t *ptr;
enum VSL_tag_e tag;
- struct vtx *ret = NULL;
+
+ if (vtx->flags & VTX_F_END)
+ return (NULL);
while (vslc_vtx_next(&vtx->c.cursor) == 1) {
ptr = vtx->c.cursor.rec.ptr;
tag = VSL_TAG(ptr);
-
- if (tag == SLT__Batch || tag == SLT_VSL)
- continue;
-
- if (vtx->flags & VTX_F_COMPLETE) {
- vtx_diag_tag(vtx, ptr, "late log rec");
- continue;
- }
-
- if (vtx->type == VSL_t_unknown && tag != SLT_Begin)
- vtx_diag_tag(vtx, ptr, "early log rec");
+ assert(tag != SLT__Batch);
switch (tag) {
case SLT_Begin:
- (void)vtx_scan_begintag(vslq, vtx, ptr);
+ (void)vtx_scan_begin(vslq, vtx, ptr);
+ vtx->flags |= VTX_F_BEGIN;
break;
case SLT_Link:
- (void)vtx_scan_linktag(vslq, vtx, ptr);
+ (void)vtx_scan_link(vslq, vtx, ptr);
break;
case SLT_End:
- AZ(vtx->flags & VTX_F_COMPLETE);
- AZ(ret);
- VTAILQ_REMOVE(&vslq->incomplete, vtx, list_incomplete);
- vtx->flags |= VTX_F_COMPLETE;
- AN(vslq->n_incomplete);
- vslq->n_incomplete--;
- ret = vtx_check_ready(vslq, vtx);
- break;
+ vtx->flags |= VTX_F_END;
+ return (vtx_mark_complete(vslq, vtx));
default:
break;
}
}
- return (ret);
+ return (NULL);
}
+/* Force a vtx into complete status by synthing the necessary outstanding
+ records */
static struct vtx *
vtx_force(struct VSLQ *vslq, struct vtx *vtx, const char *reason)
{
+ struct vtx *ret;
+
AZ(vtx->flags & VTX_F_COMPLETE);
AZ(vtx->flags & VTX_F_READY);
+ if (!(vtx->flags & VTX_F_BEGIN))
+ vtx_synth_rec(vtx, SLT_Begin, "%s %u synth",
+ vsl_t_names[vtx->type], vtx->key.vxid);
vtx_diag(vtx, reason);
-
- VTAILQ_REMOVE(&vslq->incomplete, vtx, list_incomplete);
- vtx->flags |= VTX_F_COMPLETE;
- AN(vslq->n_incomplete);
- vslq->n_incomplete--;
-
- return (vtx_check_ready(vslq, vtx));
+ if (!(vtx->flags & VTX_F_END))
+ vtx_synth_rec(vtx, SLT_End, "synth");
+ ret = vtx_scan(vslq, vtx);
+ AN(vtx->flags & VTX_F_COMPLETE);
+ return (ret);
}
+/* Build transaction array, do the query and callback. Returns 0 or the
+ return value from func */
static int
vslq_callback(const struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
void *priv)
@@ -735,6 +879,7 @@ vslq_callback(const struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
AN(vslq);
CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
+ AN(vtx->flags & VTX_F_READY);
if (func == NULL)
return (0);
@@ -784,19 +929,21 @@ vslq_callback(const struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
return ((func)(vslq->vsl, ptrans, priv));
}
-static int
-vtx_diag(struct vtx *vtx, const char *fmt, ...)
+/* Create a synthetic log record. The record will be inserted at the
+ current cursor offset */
+static void
+vtx_synth_rec(struct vtx *vtx, unsigned tag, const char *fmt, ...)
{
- struct vtx_diag *diag;
+ struct synth *synth, *it;
va_list ap;
char *buf;
int l, buflen;
- ALLOC_OBJ(diag, VTX_DIAG_MAGIC);
- AN(diag);
+ ALLOC_OBJ(synth, SYNTH_MAGIC);
+ AN(synth);
- buf = (char *)&diag->chunk[2];
- buflen = sizeof (diag->chunk) - 2 * sizeof (uint32_t);
+ buf = (char *)&synth->data[2];
+ buflen = sizeof (synth->data) - 2 * sizeof (uint32_t);
va_start(ap, fmt);
l = vsnprintf(buf, buflen, fmt, ap);
assert(l >= 0);
@@ -804,29 +951,56 @@ vtx_diag(struct vtx *vtx, const char *fmt, ...)
if (l > buflen - 1)
l = buflen - 1;
buf[l++] = '\0'; /* NUL-terminated */
- diag->chunk[1] = vtx->key.vxid;
+ synth->data[1] = vtx->key.vxid;
switch (vtx->type) {
case VSL_t_req:
case VSL_t_esireq:
- diag->chunk[1] |= VSL_CLIENTMARKER;
+ synth->data[1] |= VSL_CLIENTMARKER;
break;
case VSL_t_bereq:
- diag->chunk[1] |= VSL_BACKENDMARKER;
+ synth->data[1] |= VSL_BACKENDMARKER;
break;
default:
break;
}
- diag->chunk[0] = ((((unsigned)SLT_VSL & 0xff) << 24) | l);
- VTAILQ_INSERT_TAIL(&vtx->diag, diag, list);
+ synth->data[0] = (((tag & 0xff) << 24) | l);
+ synth->offset = vtx->c.offset;
+
+ VTAILQ_FOREACH_REVERSE(it, &vtx->synth, synthhead, list) {
+ /* Make sure the synth list is sorted on offset */
+ CHECK_OBJ_NOTNULL(it, SYNTH_MAGIC);
+ if (synth->offset >= it->offset)
+ break;
+ }
+ if (it != NULL)
+ VTAILQ_INSERT_AFTER(&vtx->synth, it, synth, list);
+ else
+ VTAILQ_INSERT_HEAD(&vtx->synth, synth, list);
+
+ /* Update cursor */
+ CHECK_OBJ_ORNULL(vtx->c.synth, SYNTH_MAGIC);
+ if (vtx->c.synth == NULL || vtx->c.synth->offset > synth->offset)
+ vtx->c.synth = synth;
+}
+
+/* Add a diagnostic SLT_VSL synth record to the vtx. msg is recorded
+   verbatim: it is passed as the argument to a "%s" format so that any
+   '%' it contains is not interpreted as a conversion. Always returns -1. */
+static int
+vtx_diag(struct vtx *vtx, const char *msg)
+{
+	vtx_synth_rec(vtx, SLT_VSL, "%s", msg);
return (-1);
}
+/* Add a SLT_VSL diag synth record to the vtx. Takes an offending record
+ that will be included in the log record */
static int
vtx_diag_tag(struct vtx *vtx, const uint32_t *ptr, const char *reason)
{
- return (vtx_diag(vtx, "%s (%s: %.*s)", reason,
- VSL_tags[VSL_TAG(ptr)], (int)VSL_LEN(ptr), VSL_CDATA(ptr)));
+
+ vtx_synth_rec(vtx, SLT_VSL, "%s (%s: %.*s)", reason,
+ VSL_tags[VSL_TAG(ptr)], (int)VSL_LEN(ptr), VSL_CDATA(ptr));
+ return (-1);
}
struct VSLQ *
@@ -858,7 +1032,7 @@ VSLQ_New(struct VSL_data *vsl, struct VSL_cursor **cp,
vslq->query = query;
VRB_INIT(&vslq->tree);
VTAILQ_INIT(&vslq->incomplete);
- VTAILQ_INIT(&vslq->shmlist);
+ VTAILQ_INIT(&vslq->shmrefs);
VTAILQ_INIT(&vslq->cache);
return (vslq);
@@ -889,13 +1063,14 @@ VSLQ_Delete(struct VSLQ **pvslq)
vtx = VTAILQ_FIRST(&vslq->cache);
VTAILQ_REMOVE(&vslq->cache, vtx, list_child);
vslq->n_cache--;
- vtx_free(&vtx);
- AZ(vtx);
+ FREE_OBJ(vtx);
}
FREE_OBJ(vslq);
}
+/* Regard each log line as a single transaction, feed it through the query
+ and do the callback */
static int
vslq_raw(const struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
{
@@ -944,11 +1119,45 @@ vslq_raw(const struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
return (i);
}
+/* Walk the shmref list from its head, buffering every chunk whose shm
+ * reference VSL_Check() reports at warning level. The walk stops at the
+ * first reference that is reported OK.
+ *
+ * Returns:
+ *	0: OK
+ *	-3: Failure
+ */
+static int
+vslq_shmref_check(struct VSLQ *vslq)
+{
+	struct chunk *head;
+
+	for (;;) {
+		head = VTAILQ_FIRST(&vslq->shmrefs);
+		if (head == NULL)
+			break;
+		CHECK_OBJ_NOTNULL(head, CHUNK_MAGIC);
+		assert(head->type == chunk_t_shm);
+
+		switch (VSL_Check(vslq->c, &head->shm.start)) {
+		case 2:
+			/* Head of the list is OK, so the refs queued
+			   behind it must be OK as well */
+			return (0);
+		case 1:
+			/* Warning level. Buffer this chunk, which also
+			   takes it off the shmref list */
+			chunk_shm_to_buf(vslq, head);
+			break;
+		default:
+			/* Too late to buffer */
+			return (-3);
+		}
+	}
+
+	return (0);
+}
+
+/* Process the input cursor, calling the callback function on matching
+ transaction sets */
int
VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
{
struct VSL_cursor *c;
- int i;
+ int i, batch;
enum VSL_tag_e tag;
ssize_t len;
unsigned vxid;
@@ -962,39 +1171,45 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
c = vslq->c;
while (1) {
- /* Check shmlist and buffer on warning */
- while ((vtx = VTAILQ_FIRST(&vslq->shmlist))) {
- AN(vtx->n_chunk);
- i = VSL_Check(c, &vtx->chunk[0].start);
- if (i == 2)
- break;
- else if (i == 1)
- vtx_buffer(vslq, vtx);
- else
- /* Too late */
- return (-3);
- }
+ /* Check shmref list */
+ i = vslq_shmref_check(vslq);
+ if (i)
+ break;
i = VSL_Next(c);
if (i != 1)
break;
tag = VSL_TAG(c->rec.ptr);
if (tag == SLT__Batch) {
+ batch = 1;
vxid = VSL_BATCHID(c->rec.ptr);
len = VSL_END(c->rec.ptr, VSL_BATCHLEN(c->rec.ptr)) -
c->rec.ptr;
+ if (len == 0)
+ continue;
+ tag = VSL_TAG(VSL_NEXT(c->rec.ptr));
} else {
+ batch = 0;
vxid = VSL_ID(c->rec.ptr);
len = VSL_NEXT(c->rec.ptr) - c->rec.ptr;
}
+
+ assert(len > 0);
if (vxid == 0)
+ /* Skip non-transactional records */
continue;
- vtx = vtx_lori(vslq, vxid);
- AN(vtx);
- vtx_append(vslq, vtx, &c->rec, len, VSL_Check(c, &c->rec) != 2);
- if (tag == SLT__Batch)
+
+ vtx = vtx_lookup(vslq, vxid);
+ if (vtx == NULL && tag == SLT_Begin) {
+ vtx = vtx_add(vslq, vxid);
+ AN(vtx);
+ }
+ if (vtx != NULL) {
+ vtx_append(vslq, vtx, &c->rec, len);
+ vtx = vtx_scan(vslq, vtx);
+ }
+ if (batch)
AZ(vsl_skip(c, VSL_WORDS(VSL_BATCHLEN(c->rec.ptr))));
- vtx = vtx_scan(vslq, vtx);
if (vtx) {
AN(vtx->flags & VTX_F_READY);
i = vslq_callback(vslq, vtx, func, priv);
@@ -1042,6 +1257,8 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
return (i);
}
+/* Flush any incomplete vtx held on to. Do callbacks if func !=
+ NULL */
int
VSLQ_Flush(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
{
More information about the varnish-commit
mailing list