[master] 19a5542 Rework VSL_Dispatch

Martin Blix Grydeland martin at varnish-cache.org
Wed Oct 9 16:03:03 CEST 2013


commit 19a5542d7b1795e3fcfd6f0697ecace41ddba4d2
Author: Martin Blix Grydeland <martin at varnish-software.com>
Date:   Tue Oct 8 10:22:12 2013 +0200

    Rework VSL_Dispatch
    
    Highlights:
    
    * Improved buffer handling. A vtx can now have a mix of buffered and
      shm chunks. And as shm refs are used and later buffered, they can be
      reused on the same vtx. This benefits long running transactions.
    
    * Synth records are now returned by the cursor at the position the
      cursor was at when it was created. This solves a problem where,
      when writing to a file and a vtx timed out, reading that file
      back would also incur the timeout delay.
    
    * Begin records are now strictly honored, and any records before a
      Begin record are now ignored.
    
    * More asserts on the structures
    
    * More comments, should be easier to understand now
    
    * Various other changes

diff --git a/lib/libvarnishapi/vsl_dispatch.c b/lib/libvarnishapi/vsl_dispatch.c
index db92c7f..a47d018 100644
--- a/lib/libvarnishapi/vsl_dispatch.c
+++ b/lib/libvarnishapi/vsl_dispatch.c
@@ -46,7 +46,16 @@
 
 #define VTX_CACHE 10
 #define VTX_BUFSIZE_MIN 64
-#define VTX_CHUNKS 3
+#define VTX_SHMCHUNKS 3
+
+static const char * const vsl_t_names[VSL_t__MAX] = {
+	[VSL_t_unknown]	= "unknown",
+	[VSL_t_sess]	= "sess",
+	[VSL_t_req]	= "req",
+	[VSL_t_esireq]	= "esireq",
+	[VSL_t_bereq]	= "bereq",
+	[VSL_t_raw]	= "raw",
+};
 
 struct vtx;
 
@@ -61,20 +70,41 @@ struct vslc_raw {
 	const uint32_t		*next;
 };
 
-struct vtx_chunk {
-	struct VSLC_ptr		start;
-	ssize_t			len;
-	ssize_t			offset;
-};
-
-struct vtx_diag {
+struct synth {
 	unsigned		magic;
-#define VTX_DIAG_MAGIC		0xC654479F
+#define SYNTH_MAGIC		0xC654479F
 
-	VTAILQ_ENTRY(vtx_diag)	list;
-	uint32_t		chunk[2 + 256 / sizeof (uint32_t)];
+	VTAILQ_ENTRY(synth)	list;
+	size_t			offset;
+	uint32_t		data[2 + 64 / sizeof (uint32_t)];
 };
+VTAILQ_HEAD(synthhead, synth);
 
+enum chunk_t {
+	chunk_t__unassigned,
+	chunk_t_shm,
+	chunk_t_buf,
+};
+
+struct chunk {
+	unsigned				magic;
+#define CHUNK_MAGIC				0x48DC0194
+	enum chunk_t				type;
+	union {
+		struct {
+			struct VSLC_ptr		start;
+			VTAILQ_ENTRY(chunk)	shmref;
+		} shm;
+		struct {
+			uint32_t		*data;
+			size_t			space;
+		} buf;
+	};
+	size_t					len;
+	struct vtx				*vtx;
+	VTAILQ_ENTRY(chunk)			list;
+};
+VTAILQ_HEAD(chunkhead, chunk);
 
 struct vslc_vtx {
 	unsigned		magic;
@@ -83,11 +113,10 @@ struct vslc_vtx {
 	struct VSL_cursor	cursor;
 
 	struct vtx		*vtx;
-
-	struct vtx_diag		*diag;	/* Current diag message pointer */
-
-	unsigned		chunk;	/* Current chunk */
-	ssize_t			offset; /* Offset of next record */
+	struct synth		*synth;
+	struct chunk		*chunk;
+	size_t			chunkstart;
+	size_t			offset;
 };
 
 struct vtx_key {
@@ -102,12 +131,15 @@ struct vtx {
 #define VTX_MAGIC		0xACC21D09
 	VTAILQ_ENTRY(vtx)	list_child;
 	VTAILQ_ENTRY(vtx)	list_incomplete;
-	VTAILQ_ENTRY(vtx)	list_shm;
 
 	double			t_start;
 	unsigned		flags;
-#define VTX_F_COMPLETE		0x1
-#define VTX_F_READY		0x2
+#define VTX_F_BEGIN		0x1 /* Begin record processed */
+#define VTX_F_END		0x2 /* End record processed */
+#define VTX_F_COMPLETE		0x4 /* Marked complete. No new children
+				       should be appended */
+#define VTX_F_READY		0x8 /* This vtx and all its children are
+				       complete */
 
 	enum VSL_transaction_e	type;
 
@@ -117,17 +149,15 @@ struct vtx {
 	unsigned		n_childready;
 	unsigned		n_descend;
 
-	struct vslc_vtx		c;
-
-	VTAILQ_HEAD(,vtx_diag)	diag;
+	VTAILQ_HEAD(,synth)	synth;
 
-	struct vtx_chunk	chunk[VTX_CHUNKS];
-	unsigned		n_chunk;
+	struct chunk		shmchunks[VTX_SHMCHUNKS];
+	struct chunkhead	shmchunks_free;
 
-	uint32_t		*buf;
-	ssize_t			bufsize;
+	struct chunkhead	chunks;
+	size_t			len;
 
-	ssize_t			len;
+	struct vslc_vtx		c;
 };
 
 struct VSLQ {
@@ -145,14 +175,15 @@ struct VSLQ {
 	VTAILQ_HEAD(,vtx)	incomplete;
 	unsigned		n_incomplete;
 
-	VTAILQ_HEAD(,vtx)	shmlist;
+	struct chunkhead	shmrefs;
 
 	VTAILQ_HEAD(,vtx)	cache;
 	unsigned		n_cache;
 };
 
+static void vtx_synth_rec(struct vtx *vtx, unsigned tag, const char *fmt, ...);
 /*lint -esym(534, vtx_diag) */
-static int vtx_diag(struct vtx *vtx, const char *fmt, ...);
+static int vtx_diag(struct vtx *vtx, const char *msg);
 /*lint -esym(534, vtx_diag_tag) */
 static int vtx_diag_tag(struct vtx *vtx, const uint32_t *ptr,
     const char *reason);
@@ -217,49 +248,49 @@ static int
 vslc_vtx_next(struct VSL_cursor *cursor)
 {
 	struct vslc_vtx *c;
-	struct vtx_chunk *chunk;
+	const uint32_t *ptr;
 
 	CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VTX_MAGIC);
 	assert(&c->cursor == cursor);
 	CHECK_OBJ_NOTNULL(c->vtx, VTX_MAGIC);
 
-	if (c->diag == NULL && VTAILQ_FIRST(&c->vtx->diag) != NULL) {
-		/* Send first diag msg */
-		c->diag = VTAILQ_FIRST(&c->vtx->diag);
-		c->cursor.rec.ptr = c->diag->chunk;
-		return (1);
-	} else if (c->diag != NULL && VTAILQ_NEXT(c->diag, list) != NULL) {
-		/* Send next diag msg */
-		c->diag = VTAILQ_NEXT(c->diag, list);
-		c->cursor.rec.ptr = c->diag->chunk;
-		return (1);
-	}
-
-	assert (c->offset <= c->vtx->len);
-	if (c->offset == c->vtx->len)
-		return (0);
-
-	if (c->vtx->n_chunk == 0) {
-		/* Buffer */
-		AN(c->vtx->buf);
-		assert(c->offset < c->vtx->bufsize);
-		c->cursor.rec.ptr = c->vtx->buf + c->offset;
-		c->offset += VSL_NEXT(c->cursor.rec.ptr) - c->cursor.rec.ptr;
-		return (1);
-	}
-
-	/* Shmptr chunks */
-	assert(c->chunk < c->vtx->n_chunk);
-	chunk = &c->vtx->chunk[c->chunk];
-	assert(c->offset >= chunk->offset);
-	assert(c->offset <= chunk->offset + chunk->len);
-	if (c->offset == chunk->offset + chunk->len) {
-		c->chunk++;
-		chunk = &c->vtx->chunk[c->chunk];
-	}
-	AN(chunk->start.ptr);
-	c->cursor.rec.ptr = chunk->start.ptr + c->offset - chunk->offset;
-	c->offset += VSL_NEXT(c->cursor.rec.ptr) - c->cursor.rec.ptr;
+	do {
+		CHECK_OBJ_ORNULL(c->synth, SYNTH_MAGIC);
+		if (c->synth != NULL && c->synth->offset == c->offset) {
+			/* We're at the offset of the next synth record,
+			   point to it and advance the pointer */
+			c->cursor.rec.ptr = c->synth->data;
+			c->synth = VTAILQ_NEXT(c->synth, list);
+		} else {
+			assert(c->offset <= c->vtx->len);
+			if (c->offset == c->vtx->len)
+				/* End of cursor */
+				return (0);
+
+			/* Advance chunk pointer */
+			if (c->chunk == NULL) {
+				c->chunk = VTAILQ_FIRST(&c->vtx->chunks);
+				c->chunkstart = 0;
+			}
+			CHECK_OBJ_NOTNULL(c->chunk, CHUNK_MAGIC);
+			while (c->offset >= c->chunkstart + c->chunk->len) {
+				c->chunkstart += c->chunk->len;
+				c->chunk = VTAILQ_NEXT(c->chunk, list);
+				CHECK_OBJ_NOTNULL(c->chunk, CHUNK_MAGIC);
+			}
+
+			/* Point to the next stored record */
+			if (c->chunk->type == chunk_t_shm)
+				ptr = c->chunk->shm.start.ptr;
+			else {
+				assert(c->chunk->type == chunk_t_buf);
+				ptr = c->chunk->buf.data;
+			}
+			c->cursor.rec.ptr = ptr + c->offset - c->chunkstart;
+			c->offset += VSL_NEXT(c->cursor.rec.ptr) -
+			    c->cursor.rec.ptr;
+		}
+	} while (VSL_TAG(c->cursor.rec.ptr) == SLT__Batch);
 
 	return (1);
 }
@@ -271,8 +302,10 @@ vslc_vtx_reset(struct VSL_cursor *cursor)
 
 	CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_VTX_MAGIC);
 	assert(&c->cursor == cursor);
-	c->diag = NULL;
-	c->chunk = 0;
+	CHECK_OBJ_NOTNULL(c->vtx, VTX_MAGIC);
+	c->synth = VTAILQ_FIRST(&c->vtx->synth);
+	c->chunk = NULL;
+	c->chunkstart = 0;
 	c->offset = 0;
 	c->cursor.rec.ptr = NULL;
 
@@ -288,10 +321,136 @@ static const struct vslc_tbl vslc_vtx_tbl = {
 	.check	= NULL,
 };
 
+/* Create a buf chunk */
+static struct chunk *
+chunk_newbuf(struct vtx *vtx, const uint32_t *ptr, size_t len)
+{
+	struct chunk *chunk;
+
+	ALLOC_OBJ(chunk, CHUNK_MAGIC);
+	chunk->type = chunk_t_buf;
+	chunk->vtx = vtx;
+	chunk->buf.space = VTX_BUFSIZE_MIN;
+	while (chunk->buf.space < len)
+		chunk->buf.space *= 2;
+	chunk->buf.data = malloc(sizeof (uint32_t) * chunk->buf.space);
+	AN(chunk->buf.data);
+	memcpy(chunk->buf.data, ptr, sizeof (uint32_t) * len);
+	chunk->len = len;
+	return (chunk);
+}
+
+/* Free a buf chunk */
+static void
+chunk_freebuf(struct chunk **pchunk)
+{
+
+	CHECK_OBJ_NOTNULL(*pchunk, CHUNK_MAGIC);
+	assert((*pchunk)->type == chunk_t_buf);
+	free((*pchunk)->buf.data);
+	FREE_OBJ(*pchunk);
+	*pchunk = NULL;
+}
+
+/* Append a set of records to a chunk */
+static void
+chunk_appendbuf(struct chunk *chunk, const uint32_t *ptr, size_t len)
+{
+
+	CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
+	assert(chunk->type == chunk_t_buf);
+	if (chunk->buf.space < chunk->len + len) {
+		while (chunk->buf.space < chunk->len + len)
+			chunk->buf.space *= 2;
+		chunk->buf.data = realloc(chunk->buf.data,
+		    sizeof (uint32_t) * chunk->buf.space);
+	}
+	memcpy(chunk->buf.data + chunk->len, ptr, sizeof (uint32_t) * len);
+	chunk->len += len;
+}
+
+/* Transform a shm chunk to a buf chunk */
+static void
+chunk_shm_to_buf(struct VSLQ *vslq, struct chunk *chunk)
+{
+	struct vtx *vtx;
+	struct chunk *buf;
+
+	CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
+	assert(chunk->type == chunk_t_shm);
+	vtx = chunk->vtx;
+	CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
+
+	buf = VTAILQ_PREV(chunk, chunkhead, list);
+	if (buf != NULL && buf->type == chunk_t_buf)
+		/* Previous is a buf chunk, append to it */
+		chunk_appendbuf(buf, chunk->shm.start.ptr, chunk->len);
+	else {
+		/* Create a new buf chunk and insert it before this */
+		buf = chunk_newbuf(vtx, chunk->shm.start.ptr, chunk->len);
+		AN(buf);
+		VTAILQ_INSERT_BEFORE(chunk, buf, list);
+	}
+
+	/* Reset cursor chunk pointer, vslc_vtx_next will set it correctly */
+	vtx->c.chunk = NULL;
+
+	/* Remove from the shmref list and vtx, and put chunk back
+	   on the free list */
+	VTAILQ_REMOVE(&vslq->shmrefs, chunk, shm.shmref);
+	VTAILQ_REMOVE(&vtx->chunks, chunk, list);
+	VTAILQ_INSERT_HEAD(&vtx->shmchunks_free, chunk, list);
+}
+
+/* Append a set of records to a vtx structure */
+static void
+vtx_append(struct VSLQ *vslq, struct vtx *vtx, const struct VSLC_ptr *start,
+    size_t len)
+{
+	struct chunk *chunk;
+
+	AN(vtx);
+	if (len == 0)
+		return;
+	AN(start);
+
+	if (VSL_Check(vslq->c, start) == 2 &&
+	    !VTAILQ_EMPTY(&vtx->shmchunks_free)) {
+		/* Shmref it */
+		chunk = VTAILQ_FIRST(&vtx->shmchunks_free);
+		CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
+		assert(chunk->type == chunk_t_shm);
+		assert(chunk->vtx == vtx);
+		VTAILQ_REMOVE(&vtx->shmchunks_free, chunk, list);
+		chunk->shm.start = *start;
+		chunk->len = len;
+		VTAILQ_INSERT_TAIL(&vtx->chunks, chunk, list);
+
+		/* Append to shmref list */
+		VTAILQ_INSERT_TAIL(&vslq->shmrefs, chunk, shm.shmref);
+	} else {
+		/* Buffer it */
+		chunk = VTAILQ_LAST(&vtx->chunks, chunkhead);
+		CHECK_OBJ_ORNULL(chunk, CHUNK_MAGIC);
+		if (chunk != NULL && chunk->type == chunk_t_buf) {
+			/* Tail is a buf chunk, append to that */
+			chunk_appendbuf(chunk, start->ptr, len);
+		} else {
+			/* Append new buf chunk */
+			chunk = chunk_newbuf(vtx, start->ptr, len);
+			AN(chunk);
+			VTAILQ_INSERT_TAIL(&vtx->chunks, chunk, list);
+		}
+	}
+	vtx->len += len;
+}
+
+/* Allocate a new vtx structure */
 static struct vtx *
 vtx_new(struct VSLQ *vslq)
 {
 	struct vtx *vtx;
+	int i;
 
 	AN(vslq);
 	if (vslq->n_cache) {
@@ -302,52 +461,48 @@ vtx_new(struct VSLQ *vslq)
 	} else {
 		ALLOC_OBJ(vtx, VTX_MAGIC);
 		AN(vtx);
+
+		VTAILQ_INIT(&vtx->child);
+		VTAILQ_INIT(&vtx->shmchunks_free);
+		for (i = 0; i < VTX_SHMCHUNKS; i++) {
+			vtx->shmchunks[i].magic = CHUNK_MAGIC;
+			vtx->shmchunks[i].type = chunk_t_shm;
+			vtx->shmchunks[i].vtx = vtx;
+			VTAILQ_INSERT_TAIL(&vtx->shmchunks_free,
+			    &vtx->shmchunks[i], list);
+		}
+		VTAILQ_INIT(&vtx->chunks);
+		VTAILQ_INIT(&vtx->synth);
 		vtx->c.magic = VSLC_VTX_MAGIC;
 		vtx->c.vtx = vtx;
 		vtx->c.cursor.priv_tbl = &vslc_vtx_tbl;
 		vtx->c.cursor.priv_data = &vtx->c;
 	}
 
+	CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
 	vtx->key.vxid = 0;
 	vtx->t_start = VTIM_mono();
 	vtx->flags = 0;
 	vtx->type = VSL_t_unknown;
 	vtx->parent = NULL;
-	VTAILQ_INIT(&vtx->child);
 	vtx->n_child = 0;
 	vtx->n_childready = 0;
 	vtx->n_descend = 0;
-	VTAILQ_INIT(&vtx->diag);
-	memset(vtx->chunk, 0, sizeof vtx->chunk);
-	vtx->n_chunk = 0;
 	vtx->len = 0;
 	(void)vslc_vtx_reset(&vtx->c.cursor);
 
-	VTAILQ_INSERT_TAIL(&vslq->incomplete, vtx, list_incomplete);
-	vslq->n_incomplete++;
-
 	return (vtx);
 }
 
-static void
-vtx_free(struct vtx **pvtx)
-{
-	struct vtx *vtx;
-
-	AN(pvtx);
-	vtx = *pvtx;
-	*pvtx = NULL;
-
-	free(vtx->buf);
-	FREE_OBJ(vtx);
-}
-
+/* Disuse a vtx and all its children, freeing any resources held. Free or
+   cache the vtx for later use */
 static void
 vtx_retire(struct VSLQ *vslq, struct vtx **pvtx)
 {
 	struct vtx *vtx;
 	struct vtx *child;
-	struct vtx_diag *diag;
+	struct synth *synth;
+	struct chunk *chunk;
 
 	AN(vslq);
 	AN(pvtx);
@@ -374,177 +529,161 @@ vtx_retire(struct VSLQ *vslq, struct vtx **pvtx)
 	}
 	AZ(vtx->n_child);
 	AZ(vtx->n_descend);
+	vtx->n_childready = 0;
 	AN(VRB_REMOVE(vtx_tree, &vslq->tree, &vtx->key));
+	vtx->key.vxid = 0;
+	vtx->flags = 0;
 
-	while (!VTAILQ_EMPTY(&vtx->diag)) {
-		diag = VTAILQ_FIRST(&vtx->diag);
-		VTAILQ_REMOVE(&vtx->diag, diag, list);
-		FREE_OBJ(diag);
+	while (!VTAILQ_EMPTY(&vtx->synth)) {
+		synth = VTAILQ_FIRST(&vtx->synth);
+		CHECK_OBJ_NOTNULL(synth, SYNTH_MAGIC);
+		VTAILQ_REMOVE(&vtx->synth, synth, list);
+		FREE_OBJ(synth);
 	}
 
-	if (vtx->n_chunk)
-		VTAILQ_REMOVE(&vslq->shmlist, vtx, list_shm);
+	while (!VTAILQ_EMPTY(&vtx->chunks)) {
+		chunk = VTAILQ_FIRST(&vtx->chunks);
+		CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
+		VTAILQ_REMOVE(&vtx->chunks, chunk, list);
+		if (chunk->type == chunk_t_shm) {
+			VTAILQ_REMOVE(&vslq->shmrefs, chunk, shm.shmref);
+			VTAILQ_INSERT_HEAD(&vtx->shmchunks_free, chunk, list);
+		} else {
+			assert(chunk->type == chunk_t_buf);
+			chunk_freebuf(&chunk);
+			AZ(chunk);
+		}
+	}
+	vtx->len = 0;
 
 	if (vslq->n_cache < VTX_CACHE) {
 		VTAILQ_INSERT_HEAD(&vslq->cache, vtx, list_child);
 		vslq->n_cache++;
 	} else {
-		vtx_free(&vtx);
-		AZ(vtx);
+		FREE_OBJ(vtx);
+		vtx = NULL;
 	}
 }
 
+/* Lookup a vtx by vxid from the managed list */
 static struct vtx *
-vtx_lori(struct VSLQ *vslq, unsigned vxid)
+vtx_lookup(struct VSLQ *vslq, unsigned vxid)
 {
-	struct vtx *vtx;
 	struct vtx_key lkey, *key;
+	struct vtx *vtx;
 
 	AN(vslq);
 	lkey.vxid = vxid;
 	key = VRB_FIND(vtx_tree, &vslq->tree, &lkey);
-	if (key != NULL) {
-		CAST_OBJ_NOTNULL(vtx, (void *)key, VTX_MAGIC);
-		return (vtx);
-	}
-
-	vtx = vtx_new(vslq);
-	AN(vtx);
-	vtx->key.vxid = vxid;
-	AZ(VRB_INSERT(vtx_tree, &vslq->tree, &vtx->key));
+	if (key == NULL)
+		return (NULL);
+	CAST_OBJ_NOTNULL(vtx, (void *)key, VTX_MAGIC);
 	return (vtx);
 }
 
-static void
-vtx_set_bufsize(struct vtx *vtx, ssize_t len)
-{
-
-	AN(vtx);
-	assert(len >= 0);
-	if (vtx->bufsize >= len)
-		return;
-	if (vtx->bufsize == 0)
-		vtx->bufsize = VTX_BUFSIZE_MIN;
-	while (vtx->bufsize < len)
-		vtx->bufsize *= 2;
-	vtx->buf = realloc(vtx->buf, sizeof (uint32_t) * vtx->bufsize);
-	AN(vtx->buf);
-}
-
-static void
-vtx_buffer(struct VSLQ *vslq, struct vtx *vtx)
-{
-	int i;
-
-	AN(vtx->n_chunk);
-	AN(vtx->len);
-
-	vtx_set_bufsize(vtx, vtx->len);
-	AN(vtx->buf);
-	assert(vtx->bufsize >= vtx->len);
-
-	for (i = 0; i < vtx->n_chunk; i++)
-		memcpy(vtx->buf + vtx->chunk[i].offset, vtx->chunk[i].start.ptr,
-		    sizeof (uint32_t) * vtx->chunk[i].len);
-
-	memset(vtx->chunk, 0, sizeof vtx->chunk);
-	VTAILQ_REMOVE(&vslq->shmlist, vtx, list_shm);
-	vtx->n_chunk = 0;
-}
-
-static void
-vtx_append(struct VSLQ *vslq, struct vtx *vtx, const struct VSLC_ptr *start,
-    ssize_t len, int copy)
+/* Insert a new vtx into the managed list */
+static struct vtx *
+vtx_add(struct VSLQ *vslq, unsigned vxid)
 {
+	struct vtx *vtx;
 
+	AN(vslq);
+	vtx = vtx_new(vslq);
 	AN(vtx);
-	if (len == 0)
-		return;
-	AN(start);
-
-	if (vtx->len > 0 && vtx->n_chunk == 0)
-		/* Can't mix buffer and shmptr */
-		copy = 1;
-
-	if (!copy && vtx->n_chunk < VTX_CHUNKS) {
-		/* Add shmptr chunk */
-		AZ(vtx->chunk[vtx->n_chunk].len);
-		vtx->chunk[vtx->n_chunk].start = *start;
-		vtx->chunk[vtx->n_chunk].len = len;
-		vtx->chunk[vtx->n_chunk].offset = vtx->len;
-		vtx->len += len;
-		if (vtx->n_chunk == 0)
-			VTAILQ_INSERT_TAIL(&vslq->shmlist, vtx, list_shm);
-		vtx->n_chunk++;
-		return;
-	}
-
-	/* Append to buffer */
-	vtx_set_bufsize(vtx, vtx->len + len);
-	if (vtx->n_chunk)
-		vtx_buffer(vslq, vtx);
-	AZ(vtx->n_chunk);
-	AN(vtx->buf);
-	memcpy(vtx->buf + vtx->len, start->ptr, sizeof (uint32_t) * len);
-	vtx->len += len;
+	vtx->key.vxid = vxid;
+	AZ(VRB_INSERT(vtx_tree, &vslq->tree, &vtx->key));
+	VTAILQ_INSERT_TAIL(&vslq->incomplete, vtx, list_incomplete);
+	vslq->n_incomplete++;
+	return (vtx);
 }
 
+/* Mark a vtx complete, update child counters and if possible push it or
+   its top parent to the ready state */
 static struct vtx *
-vtx_check_ready(struct VSLQ *vslq, struct vtx *vtx)
+vtx_mark_complete(struct VSLQ *vslq, struct vtx *vtx)
 {
-	struct vtx *ready;
 
 	AN(vslq);
-	AN(vtx->flags & VTX_F_COMPLETE);
-	AZ(vtx->flags & VTX_F_READY);
+	AN(vtx->flags & VTX_F_END);
+	AZ(vtx->flags & VTX_F_COMPLETE);
 
 	if (vtx->type == VSL_t_unknown)
 		vtx_diag(vtx, "vtx of unknown type marked complete");
 
-	ready = vtx;
+	vtx->flags |= VTX_F_COMPLETE;
+	VTAILQ_REMOVE(&vslq->incomplete, vtx, list_incomplete);
+	AN(vslq->n_incomplete);
+	vslq->n_incomplete--;
+
 	while (1) {
-		if (ready->flags & VTX_F_COMPLETE &&
-		    ready->n_child == ready->n_childready)
-			ready->flags |= VTX_F_READY;
+		AZ(vtx->flags & VTX_F_READY);
+		if (vtx->flags & VTX_F_COMPLETE &&
+		    vtx->n_child == vtx->n_childready)
+			vtx->flags |= VTX_F_READY;
 		else
-			break;
-		if (ready->parent == NULL)
-			break;
-		ready = ready->parent;
-		ready->n_childready++;
-		assert(ready->n_child >= ready->n_childready);
+			return (NULL);
+		if (vtx->parent == NULL) {
+			/* Top level vtx ready */
+			return (vtx);
+		}
+		vtx = vtx->parent;
+		vtx->n_childready++;
+		assert(vtx->n_child >= vtx->n_childready);
 	}
 
-	if (ready->flags & VTX_F_READY && ready->parent == NULL)
-		/* Top level vtx ready */
-		return (ready);
+	if (vtx->flags & VTX_F_READY && vtx->parent == NULL)
+		return (vtx);
 
 	return (NULL);
 }
 
+/* Add a child to a parent, and update child counters */
+static void
+vtx_set_parent(struct vtx *parent, struct vtx *child)
+{
+
+	CHECK_OBJ_NOTNULL(parent, VTX_MAGIC);
+	CHECK_OBJ_NOTNULL(child, VTX_MAGIC);
+	AZ(parent->flags & VTX_F_COMPLETE);
+	AZ(child->flags & VTX_F_COMPLETE);
+	AZ(child->parent);
+	child->parent = parent;
+	VTAILQ_INSERT_TAIL(&parent->child, child, list_child);
+	parent->n_child++;
+	do
+		parent->n_descend += 1 + child->n_descend;
+	while ((parent = parent->parent));
+}
+
+/* Parse a begin or link record. Returns the number of elements that was
+   successfully parsed. */
 static int
-vtx_parsetag_bl(const char *str, enum VSL_transaction_e *ptype,
+vtx_parse_beginlink(const char *str, enum VSL_transaction_e *ptype,
     unsigned *pvxid)
 {
-	char buf[7];
+	char buf[8];
 	unsigned vxid;
-	int i;
+	int i, j;
 	enum VSL_transaction_e type = VSL_t_unknown;
 
 	AN(str);
-	i = sscanf(str, "%6s %u", buf, &vxid);
+	i = sscanf(str, "%7s %u", buf, &vxid);
 	if (i < 1)
 		return (-1);
-	if (!strcmp(buf, "sess"))
-		type = VSL_t_sess;
-	else if (!strcmp(buf, "req"))
-		type = VSL_t_req;
-	else if (!strcmp(buf, "esireq"))
-		type = VSL_t_esireq;
-	else if (!strcmp(buf, "bereq"))
-		type = VSL_t_bereq;
-	else
+	for (j = 0; j < VSL_t__MAX; j++)
+		if (!strcmp(buf, vsl_t_names[j]))
+			break;
+	switch (j) {
+	case VSL_t_sess:
+	case VSL_t_req:
+	case VSL_t_esireq:
+	case VSL_t_bereq:
+		/* Valid types */
+		type = j;
+		break;
+	default:
 		return (-1);
+	}
 	if (i == 1)
 		vxid = 0;
 	if (ptype)
@@ -554,23 +693,9 @@ vtx_parsetag_bl(const char *str, enum VSL_transaction_e *ptype,
 	return (i);
 }
 
-static void
-vtx_set_parent(struct vtx *parent, struct vtx *child)
-{
-
-	AN(parent);
-	AN(child);
-	AZ(child->parent);
-	child->parent = parent;
-	VTAILQ_INSERT_TAIL(&parent->child, child, list_child);
-	parent->n_child++;
-	do
-		parent->n_descend += 1 + child->n_descend;
-	while ((parent = parent->parent));
-}
-
+/* Parse and process a begin record */
 static int
-vtx_scan_begintag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
+vtx_scan_begin(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
 {
 	int i;
 	enum VSL_transaction_e type;
@@ -579,17 +704,19 @@ vtx_scan_begintag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
 
 	assert(VSL_TAG(ptr) == SLT_Begin);
 
-	if (vtx->flags & VTX_F_READY)
-		return (vtx_diag_tag(vtx, ptr, "link too late"));
+	AZ(vtx->flags & VTX_F_READY);
 
-	i = vtx_parsetag_bl(VSL_CDATA(ptr), &type, &p_vxid);
+	i = vtx_parse_beginlink(VSL_CDATA(ptr), &type, &p_vxid);
 	if (i < 1)
 		return (vtx_diag_tag(vtx, ptr, "parse error"));
 
 	/* Check/set vtx type */
 	assert(type != VSL_t_unknown);
-	if (vtx->type != VSL_t_unknown && vtx->type != type)
+	if (vtx->type != VSL_t_unknown && vtx->type != type) {
+		/* Type not matching the one previously set by a link
+		   record */
 		return (vtx_diag_tag(vtx, ptr, "type mismatch"));
+	}
 	vtx->type = type;
 
 	if (i == 1 || p_vxid == 0)
@@ -600,25 +727,38 @@ vtx_scan_begintag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
 	if (vslq->grouping == VSL_g_request && vtx->type == VSL_t_req)
 		return (0);	/* No links */
 
-	/* Lookup and check parent vtx */
-	p_vtx = vtx_lori(vslq, p_vxid);
-	AN(p_vtx);
-	if (vtx->parent == p_vtx)
-		/* Link already exists */
+	if (vtx->parent != NULL) {
+		if (vtx->parent->key.vxid != p_vxid) {
+			/* This vtx already belongs to a different
+			   parent */
+			return (vtx_diag_tag(vtx, ptr, "link mismatch"));
+		} else
+			/* Link already exists */
+			return (0);
+	}
+
+	p_vtx = vtx_lookup(vslq, p_vxid);
+	if (p_vtx == NULL) {
+		/* Not seen parent yet. Insert it and create link. */
+		p_vtx = vtx_add(vslq, p_vxid);
+		AN(p_vtx);
+		vtx_set_parent(p_vtx, vtx);
 		return (0);
+	}
 
-	if (vtx->parent != NULL)
-		return (vtx_diag_tag(vtx, ptr, "duplicate link"));
-	if (p_vtx->flags & VTX_F_READY)
+	CHECK_OBJ_NOTNULL(p_vtx, VTX_MAGIC);
+	if (p_vtx->flags & VTX_F_COMPLETE)
 		return (vtx_diag_tag(vtx, ptr, "link too late"));
 
+	/* Create link */
 	vtx_set_parent(p_vtx, vtx);
 
 	return (0);
 }
 
+/* Parse and process a link record */
 static int
-vtx_scan_linktag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
+vtx_scan_link(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
 {
 	int i;
 	enum VSL_transaction_e c_type;
@@ -627,13 +767,13 @@ vtx_scan_linktag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
 
 	assert(VSL_TAG(ptr) == SLT_Link);
 
-	if (vtx->flags & VTX_F_READY)
-		return (vtx_diag_tag(vtx, ptr, "link too late"));
+	AZ(vtx->flags & VTX_F_READY);
 
-	i = vtx_parsetag_bl(VSL_CDATA(ptr), &c_type, &c_vxid);
+	i = vtx_parse_beginlink(VSL_CDATA(ptr), &c_type, &c_vxid);
 	if (i < 2)
 		return (vtx_diag_tag(vtx, ptr, "parse error"));
 	assert(i == 2);
+	assert(c_type != VSL_t_unknown);
 
 	if (vslq->grouping == VSL_g_vxid)
 		return (0);	/* No links */
@@ -641,88 +781,92 @@ vtx_scan_linktag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
 		return (0);	/* No links */
 
 	/* Lookup and check child vtx */
-	c_vtx = vtx_lori(vslq, c_vxid);
-	AN(c_vtx);
+	c_vtx = vtx_lookup(vslq, c_vxid);
+	if (c_vtx == NULL) {
+		/* Child not seen before. Insert it and create link */
+		c_vtx = vtx_add(vslq, c_vxid);
+		AN(c_vtx);
+		c_vtx->type = c_type;
+		vtx_set_parent(vtx, c_vtx);
+		return (0);
+	}
+
+	CHECK_OBJ_NOTNULL(c_vtx, VTX_MAGIC);
 	if (c_vtx->parent == vtx)
 		/* Link already exists */
 		return (0);
-	if (c_vtx->parent != NULL)
+	if (c_vtx->parent != vtx)
 		return (vtx_diag_tag(vtx, ptr, "duplicate link"));
-	if (c_vtx->flags & VTX_F_READY)
+	if (c_vtx->flags & VTX_F_COMPLETE)
 		return (vtx_diag_tag(vtx, ptr, "link too late"));
 	if (c_vtx->type != VSL_t_unknown && c_vtx->type != c_type)
 		return (vtx_diag_tag(vtx, ptr, "type mismatch"));
-	c_vtx->type = c_type;
 
+	c_vtx->type = c_type;
 	vtx_set_parent(vtx, c_vtx);
-
 	return (0);
 }
 
+/* Scan the records of a vtx, performing processing actions on specific
+   records */
 static struct vtx *
 vtx_scan(struct VSLQ *vslq, struct vtx *vtx)
 {
 	const uint32_t *ptr;
 	enum VSL_tag_e tag;
-	struct vtx *ret = NULL;
+
+	if (vtx->flags & VTX_F_END)
+		return (NULL);
 
 	while (vslc_vtx_next(&vtx->c.cursor) == 1) {
 		ptr = vtx->c.cursor.rec.ptr;
 		tag = VSL_TAG(ptr);
-
-		if (tag == SLT__Batch || tag == SLT_VSL)
-			continue;
-
-		if (vtx->flags & VTX_F_COMPLETE) {
-			vtx_diag_tag(vtx, ptr, "late log rec");
-			continue;
-		}
-
-		if (vtx->type == VSL_t_unknown && tag != SLT_Begin)
-			vtx_diag_tag(vtx, ptr, "early log rec");
+		assert(tag != SLT__Batch);
 
 		switch (tag) {
 		case SLT_Begin:
-			(void)vtx_scan_begintag(vslq, vtx, ptr);
+			(void)vtx_scan_begin(vslq, vtx, ptr);
+			vtx->flags |= VTX_F_BEGIN;
 			break;
 
 		case SLT_Link:
-			(void)vtx_scan_linktag(vslq, vtx, ptr);
+			(void)vtx_scan_link(vslq, vtx, ptr);
 			break;
 
 		case SLT_End:
-			AZ(vtx->flags & VTX_F_COMPLETE);
-			AZ(ret);
-			VTAILQ_REMOVE(&vslq->incomplete, vtx, list_incomplete);
-			vtx->flags |= VTX_F_COMPLETE;
-			AN(vslq->n_incomplete);
-			vslq->n_incomplete--;
-			ret = vtx_check_ready(vslq, vtx);
-			break;
+			vtx->flags |= VTX_F_END;
+			return (vtx_mark_complete(vslq, vtx));
 
 		default:
 			break;
 		}
 	}
 
-	return (ret);
+	return (NULL);
 }
 
+/* Force a vtx into complete status by synthing the necessary outstanding
+   records */
 static struct vtx *
 vtx_force(struct VSLQ *vslq, struct vtx *vtx, const char *reason)
 {
+	struct vtx *ret;
+
 	AZ(vtx->flags & VTX_F_COMPLETE);
 	AZ(vtx->flags & VTX_F_READY);
+	if (!(vtx->flags & VTX_F_BEGIN))
+		vtx_synth_rec(vtx, SLT_Begin, "%s %u synth",
+		    vsl_t_names[vtx->type], vtx->key.vxid);
 	vtx_diag(vtx, reason);
-
-	VTAILQ_REMOVE(&vslq->incomplete, vtx, list_incomplete);
-	vtx->flags |= VTX_F_COMPLETE;
-	AN(vslq->n_incomplete);
-	vslq->n_incomplete--;
-
-	return (vtx_check_ready(vslq, vtx));
+	if (!(vtx->flags & VTX_F_END))
+		vtx_synth_rec(vtx, SLT_End, "synth");
+	ret = vtx_scan(vslq, vtx);
+	AN(vtx->flags & VTX_F_COMPLETE);
+	return (ret);
 }
 
+/* Build transaction array, do the query and callback. Returns 0 or the
+   return value from func */
 static int
 vslq_callback(const struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
     void *priv)
@@ -735,6 +879,7 @@ vslq_callback(const struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
 
 	AN(vslq);
 	CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
+	AN(vtx->flags & VTX_F_READY);
 
 	if (func == NULL)
 		return (0);
@@ -784,19 +929,21 @@ vslq_callback(const struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
 	return ((func)(vslq->vsl, ptrans, priv));
 }
 
-static int
-vtx_diag(struct vtx *vtx, const char *fmt, ...)
+/* Create a synthetic log record. The record will be inserted at the
+   current cursor offset */
+static void
+vtx_synth_rec(struct vtx *vtx, unsigned tag, const char *fmt, ...)
 {
-	struct vtx_diag *diag;
+	struct synth *synth, *it;
 	va_list ap;
 	char *buf;
 	int l, buflen;
 
-	ALLOC_OBJ(diag, VTX_DIAG_MAGIC);
-	AN(diag);
+	ALLOC_OBJ(synth, SYNTH_MAGIC);
+	AN(synth);
 
-	buf = (char *)&diag->chunk[2];
-	buflen = sizeof (diag->chunk) - 2 * sizeof (uint32_t);
+	buf = (char *)&synth->data[2];
+	buflen = sizeof (synth->data) - 2 * sizeof (uint32_t);
 	va_start(ap, fmt);
 	l = vsnprintf(buf, buflen, fmt, ap);
 	assert(l >= 0);
@@ -804,29 +951,56 @@ vtx_diag(struct vtx *vtx, const char *fmt, ...)
 	if (l > buflen - 1)
 		l = buflen - 1;
 	buf[l++] = '\0';	/* NUL-terminated */
-	diag->chunk[1] = vtx->key.vxid;
+	synth->data[1] = vtx->key.vxid;
 	switch (vtx->type) {
 	case VSL_t_req:
 	case VSL_t_esireq:
-		diag->chunk[1] |= VSL_CLIENTMARKER;
+		synth->data[1] |= VSL_CLIENTMARKER;
 		break;
 	case VSL_t_bereq:
-		diag->chunk[1] |= VSL_BACKENDMARKER;
+		synth->data[1] |= VSL_BACKENDMARKER;
 		break;
 	default:
 		break;
 	}
-	diag->chunk[0] = ((((unsigned)SLT_VSL & 0xff) << 24) | l);
-	VTAILQ_INSERT_TAIL(&vtx->diag, diag, list);
+	synth->data[0] = (((tag & 0xff) << 24) | l);
+	synth->offset = vtx->c.offset;
+
+	VTAILQ_FOREACH_REVERSE(it, &vtx->synth, synthhead, list) {
+		/* Make sure the synth list is sorted on offset */
+		CHECK_OBJ_NOTNULL(it, SYNTH_MAGIC);
+		if (synth->offset >= it->offset)
+			break;
+	}
+	if (it != NULL)
+		VTAILQ_INSERT_AFTER(&vtx->synth, it, synth, list);
+	else
+		VTAILQ_INSERT_HEAD(&vtx->synth, synth, list);
+
+	/* Update cursor */
+	CHECK_OBJ_ORNULL(vtx->c.synth, SYNTH_MAGIC);
+	if (vtx->c.synth == NULL || vtx->c.synth->offset > synth->offset)
+		vtx->c.synth = synth;
+}
+
+/* Add a diagnostic SLT_VSL synth record to the vtx. */
+static int
+vtx_diag(struct vtx *vtx, const char *msg)
+{
 
+	vtx_synth_rec(vtx, SLT_VSL, msg);
 	return (-1);
 }
 
+/* Add a SLT_VSL diag synth record to the vtx. Takes an offending record
+   that will be included in the log record */
 static int
 vtx_diag_tag(struct vtx *vtx, const uint32_t *ptr, const char *reason)
 {
-	return (vtx_diag(vtx, "%s (%s: %.*s)", reason,
-		VSL_tags[VSL_TAG(ptr)],	(int)VSL_LEN(ptr), VSL_CDATA(ptr)));
+
+	vtx_synth_rec(vtx, SLT_VSL, "%s (%s: %.*s)", reason,
+	    VSL_tags[VSL_TAG(ptr)], (int)VSL_LEN(ptr), VSL_CDATA(ptr));
+	return (-1);
 }
 
 struct VSLQ *
@@ -858,7 +1032,7 @@ VSLQ_New(struct VSL_data *vsl, struct VSL_cursor **cp,
 	vslq->query = query;
 	VRB_INIT(&vslq->tree);
 	VTAILQ_INIT(&vslq->incomplete);
-	VTAILQ_INIT(&vslq->shmlist);
+	VTAILQ_INIT(&vslq->shmrefs);
 	VTAILQ_INIT(&vslq->cache);
 
 	return (vslq);
@@ -889,13 +1063,14 @@ VSLQ_Delete(struct VSLQ **pvslq)
 		vtx = VTAILQ_FIRST(&vslq->cache);
 		VTAILQ_REMOVE(&vslq->cache, vtx, list_child);
 		vslq->n_cache--;
-		vtx_free(&vtx);
-		AZ(vtx);
+		FREE_OBJ(vtx);
 	}
 
 	FREE_OBJ(vslq);
 }
 
+/* Regard each log line as a single transaction, feed it through the query
+   and do the callback */
 static int
 vslq_raw(const struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
 {
@@ -944,11 +1119,45 @@ vslq_raw(const struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
 	return (i);
 }
 
+/* Check the beginning of the shmref list, and buffer refs that are at
+ * warning level.
+ *
+ * Returns:
+ *    0:	OK
+ *   -3:	Failure
+ */
+static int
+vslq_shmref_check(struct VSLQ *vslq)
+{
+	struct chunk *chunk;
+	int i;
+
+	while ((chunk = VTAILQ_FIRST(&vslq->shmrefs))) {
+		CHECK_OBJ_NOTNULL(chunk, CHUNK_MAGIC);
+		assert(chunk->type == chunk_t_shm);
+		i = VSL_Check(vslq->c, &chunk->shm.start);
+		if (i == 2)
+			/* First on list is OK, refs behind it must also
+			   be OK */
+			return (0);
+		else if (i == 1)
+			/* Warning level. Buffer this chunk */
+			chunk_shm_to_buf(vslq, chunk);
+		else
+			/* Too late to buffer */
+			return (-3);
+	}
+
+	return (0);
+}
+
+/* Process the input cursor, calling the callback function on matching
+   transaction sets */
 int
 VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
 {
 	struct VSL_cursor *c;
-	int i;
+	int i, batch;
 	enum VSL_tag_e tag;
 	ssize_t len;
 	unsigned vxid;
@@ -962,39 +1171,45 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
 
 	c = vslq->c;
 	while (1) {
-		/* Check shmlist and buffer on warning */
-		while ((vtx = VTAILQ_FIRST(&vslq->shmlist))) {
-			AN(vtx->n_chunk);
-			i = VSL_Check(c, &vtx->chunk[0].start);
-			if (i == 2)
-				break;
-			else if (i == 1)
-				vtx_buffer(vslq, vtx);
-			else
-				/* Too late */
-				return (-3);
-		}
+		/* Check shmref list */
+		i = vslq_shmref_check(vslq);
+		if (i)
+			break;
 
 		i = VSL_Next(c);
 		if (i != 1)
 			break;
 		tag = VSL_TAG(c->rec.ptr);
 		if (tag == SLT__Batch) {
+			batch = 1;
 			vxid = VSL_BATCHID(c->rec.ptr);
 			len = VSL_END(c->rec.ptr, VSL_BATCHLEN(c->rec.ptr)) -
 			    c->rec.ptr;
+			if (len == 0)
+				continue;
+			tag = VSL_TAG(VSL_NEXT(c->rec.ptr));
 		} else {
+			batch = 0;
 			vxid = VSL_ID(c->rec.ptr);
 			len = VSL_NEXT(c->rec.ptr) - c->rec.ptr;
 		}
+
+		assert(len > 0);
 		if (vxid == 0)
+			/* Skip non-transactional records */
 			continue;
-		vtx = vtx_lori(vslq, vxid);
-		AN(vtx);
-		vtx_append(vslq, vtx, &c->rec, len, VSL_Check(c, &c->rec) != 2);
-		if (tag == SLT__Batch)
+
+		vtx = vtx_lookup(vslq, vxid);
+		if (vtx == NULL && tag == SLT_Begin) {
+			vtx = vtx_add(vslq, vxid);
+			AN(vtx);
+		}
+		if (vtx != NULL) {
+			vtx_append(vslq, vtx, &c->rec, len);
+			vtx = vtx_scan(vslq, vtx);
+		}
+		if (batch)
 			AZ(vsl_skip(c, VSL_WORDS(VSL_BATCHLEN(c->rec.ptr))));
-		vtx = vtx_scan(vslq, vtx);
 		if (vtx) {
 			AN(vtx->flags & VTX_F_READY);
 			i = vslq_callback(vslq, vtx, func, priv);
@@ -1042,6 +1257,8 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
 	return (i);
 }
 
+/* Flush any incomplete vtx held on to. Do callbacks if func !=
+   NULL */
 int
 VSLQ_Flush(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
 {



More information about the varnish-commit mailing list