[master] 53ee8e5 VSLQ_Dispatch now returns sooner.

Martin Blix Grydeland martin at varnish-cache.org
Wed Oct 9 16:03:03 CEST 2013


commit 53ee8e5c6baa2c4cfb6b9c948c209cebc2cfa77c
Author: Martin Blix Grydeland <martin at varnish-software.com>
Date:   Wed Oct 9 12:53:42 2013 +0200

    VSLQ_Dispatch now returns sooner.
    
    Change VSLQ_Dispatch to always return after processing one record or
    one batch of records. It will return 1 if it should be called again
    without sleeping.
    
    This fixes the situation where a calling program is lagging behind,
    and VSLQ_Dispatch never returns because there is a constant stream of
    new log records.
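
    A minimal sketch of the resulting caller loop (it mirrors the vut.c
    change at the end of this commit, which is where VTIM_sleep() and
    the 0.01s poll interval come from; vslq, func and priv are assumed
    to be set up already):

        int i;

        while (1) {
                i = VSLQ_Dispatch(vslq, func, priv);
                if (i == 1)
                        continue;       /* more records: call again */
                if (i == 0) {
                        /* caught up: back off before polling again */
                        VTIM_sleep(0.01);
                        continue;
                }
                break;          /* error, EOF, or non-zero from func */
        }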

diff --git a/include/vapi/vsl.h b/include/vapi/vsl.h
index cee9307..f88e20f 100644
--- a/include/vapi/vsl.h
+++ b/include/vapi/vsl.h
@@ -423,8 +423,9 @@ int VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv);
 	 *  priv: An argument passed to func
 	 *
 	 * Return values:
+	 *     1: Call again
 	 *     0: No more log records available
-	 *   !=0: The return value from either VSL_Next() or func
+	 *   !=0: The error code from VSL_Next(), or the non-zero return value from func
 	 */
 
 int VSLQ_Flush(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv);
diff --git a/lib/libvarnishapi/vsl_dispatch.c b/lib/libvarnishapi/vsl_dispatch.c
index a47d018..6912943 100644
--- a/lib/libvarnishapi/vsl_dispatch.c
+++ b/lib/libvarnishapi/vsl_dispatch.c
@@ -65,9 +65,7 @@ struct vslc_raw {
 
 	struct VSL_cursor	cursor;
 
-	const uint32_t		*start;
-	ssize_t			len;
-	const uint32_t		*next;
+	const uint32_t		*ptr;
 };
 
 struct synth {
@@ -130,7 +128,7 @@ struct vtx {
 	unsigned		magic;
 #define VTX_MAGIC		0xACC21D09
 	VTAILQ_ENTRY(vtx)	list_child;
-	VTAILQ_ENTRY(vtx)	list_incomplete;
+	VTAILQ_ENTRY(vtx)	list_vtx;
 
 	double			t_start;
 	unsigned		flags;
@@ -170,15 +168,24 @@ struct VSLQ {
 
 	enum VSL_grouping_e	grouping;
 
+	/* Structured mode */
 	struct vtx_tree		tree;
-
+	VTAILQ_HEAD(,vtx)	ready;
 	VTAILQ_HEAD(,vtx)	incomplete;
 	unsigned		n_incomplete;
-
 	struct chunkhead	shmrefs;
-
 	VTAILQ_HEAD(,vtx)	cache;
 	unsigned		n_cache;
+
+	/* Raw mode */
+	struct {
+		struct vslc_raw		c;
+		struct VSL_transaction	trans;
+		struct VSL_transaction	*ptrans[2];
+		struct VSLC_ptr		start;
+		ssize_t			len;
+		size_t			offset;
+	} raw;
 };
 
 static void vtx_synth_rec(struct vtx *vtx, unsigned tag, const char *fmt, ...);
@@ -209,14 +216,14 @@ vslc_raw_next(struct VSL_cursor *cursor)
 	CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_RAW_MAGIC);
 	assert(&c->cursor == cursor);
 
-	assert(c->next >= c->start);
-	assert(c->next <= c->start + c->len);
-	if (c->next < c->start + c->len) {
-		c->cursor.rec.ptr = c->next;
-		c->next = VSL_NEXT(c->next);
+	AN(c->ptr);
+	if (c->cursor.rec.ptr == NULL) {
+		c->cursor.rec.ptr = c->ptr;
 		return (1);
+	} else {
+		c->cursor.rec.ptr = NULL;
+		return (0);
 	}
-	return (0);
 }
 
 static int
@@ -227,9 +234,7 @@ vslc_raw_reset(struct VSL_cursor *cursor)
 	CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_RAW_MAGIC);
 	assert(&c->cursor == cursor);
 
-	assert(c->next >= c->start);
-	assert(c->next <= c->start + c->len);
-	c->next = c->start;
+	AN(c->ptr);
 	c->cursor.rec.ptr = NULL;
 
 	return (0);
@@ -592,14 +597,14 @@ vtx_add(struct VSLQ *vslq, unsigned vxid)
 	AN(vtx);
 	vtx->key.vxid = vxid;
 	AZ(VRB_INSERT(vtx_tree, &vslq->tree, &vtx->key));
-	VTAILQ_INSERT_TAIL(&vslq->incomplete, vtx, list_incomplete);
+	VTAILQ_INSERT_TAIL(&vslq->incomplete, vtx, list_vtx);
 	vslq->n_incomplete++;
 	return (vtx);
 }
 
 /* Mark a vtx complete, update child counters and if possible push it or
    its top parent to the ready state */
-static struct vtx *
+static void
 vtx_mark_complete(struct VSLQ *vslq, struct vtx *vtx)
 {
 
@@ -611,7 +616,7 @@ vtx_mark_complete(struct VSLQ *vslq, struct vtx *vtx)
 		vtx_diag(vtx, "vtx of unknown type marked complete");
 
 	vtx->flags |= VTX_F_COMPLETE;
-	VTAILQ_REMOVE(&vslq->incomplete, vtx, list_incomplete);
+	VTAILQ_REMOVE(&vslq->incomplete, vtx, list_vtx);
 	AN(vslq->n_incomplete);
 	vslq->n_incomplete--;
 
@@ -621,20 +626,16 @@ vtx_mark_complete(struct VSLQ *vslq, struct vtx *vtx)
 		    vtx->n_child == vtx->n_childready)
 			vtx->flags |= VTX_F_READY;
 		else
-			return (NULL);
+			return;
 		if (vtx->parent == NULL) {
 			/* Top level vtx ready */
-			return (vtx);
+			VTAILQ_INSERT_TAIL(&vslq->ready, vtx, list_vtx);
+			return;
 		}
 		vtx = vtx->parent;
 		vtx->n_childready++;
 		assert(vtx->n_child >= vtx->n_childready);
 	}
-
-	if (vtx->flags & VTX_F_READY && vtx->parent == NULL)
-		return (vtx);
-
-	return (NULL);
 }
 
 /* Add a child to a parent, and update child counters */
@@ -809,14 +810,14 @@ vtx_scan_link(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
 
 /* Scan the records of a vtx, performing processing actions on specific
    records */
-static struct vtx *
+static void
 vtx_scan(struct VSLQ *vslq, struct vtx *vtx)
 {
 	const uint32_t *ptr;
 	enum VSL_tag_e tag;
 
 	if (vtx->flags & VTX_F_END)
-		return (NULL);
+		return;
 
 	while (vslc_vtx_next(&vtx->c.cursor) == 1) {
 		ptr = vtx->c.cursor.rec.ptr;
@@ -841,28 +842,25 @@ vtx_scan(struct VSLQ *vslq, struct vtx *vtx)
 			break;
 		}
 	}
-
-	return (NULL);
 }
 
 /* Force a vtx into complete status by synthing the necessary outstanding
    records */
-static struct vtx *
+static void
 vtx_force(struct VSLQ *vslq, struct vtx *vtx, const char *reason)
 {
-	struct vtx *ret;
 
 	AZ(vtx->flags & VTX_F_COMPLETE);
 	AZ(vtx->flags & VTX_F_READY);
+	vtx_scan(vslq, vtx);
 	if (!(vtx->flags & VTX_F_BEGIN))
 		vtx_synth_rec(vtx, SLT_Begin, "%s %u synth",
 		    vsl_t_names[vtx->type], vtx->key.vxid);
 	vtx_diag(vtx, reason);
 	if (!(vtx->flags & VTX_F_END))
 		vtx_synth_rec(vtx, SLT_End, "synth");
-	ret = vtx_scan(vslq, vtx);
+	vtx_scan(vslq, vtx);
 	AN(vtx->flags & VTX_F_COMPLETE);
-	return (ret);
 }
 
 /* Build transaction array, do the query and callback. Returns 0 or the
@@ -880,9 +878,8 @@ vslq_callback(const struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
 	AN(vslq);
 	CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
 	AN(vtx->flags & VTX_F_READY);
+	AN(func);
 
-	if (func == NULL)
-		return (0);
 	if (vslq->grouping == VSL_g_session &&
 	    vtx->type != VSL_t_sess)
 		return (0);
@@ -1030,11 +1027,24 @@ VSLQ_New(struct VSL_data *vsl, struct VSL_cursor **cp,
 	*cp = NULL;
 	vslq->grouping = grouping;
 	vslq->query = query;
+
+	/* Setup normal mode */
 	VRB_INIT(&vslq->tree);
+	VTAILQ_INIT(&vslq->ready);
 	VTAILQ_INIT(&vslq->incomplete);
 	VTAILQ_INIT(&vslq->shmrefs);
 	VTAILQ_INIT(&vslq->cache);
 
+	/* Setup raw mode */
+	vslq->raw.c.magic = VSLC_RAW_MAGIC;
+	vslq->raw.c.cursor.priv_tbl = &vslc_raw_tbl;
+	vslq->raw.c.cursor.priv_data = &vslq->raw.c;
+	vslq->raw.trans.level = 0;
+	vslq->raw.trans.type = VSL_t_raw;
+	vslq->raw.trans.c = &vslq->raw.c.cursor;
+	vslq->raw.ptrans[0] = &vslq->raw.trans;
+	vslq->raw.ptrans[1] = NULL;
+
 	return (vslq);
 }
 
@@ -1051,6 +1061,15 @@ VSLQ_Delete(struct VSLQ **pvslq)
 
 	(void)VSLQ_Flush(vslq, NULL, NULL);
 	AZ(vslq->n_incomplete);
+
+	while (!VTAILQ_EMPTY(&vslq->ready)) {
+		vtx = VTAILQ_FIRST(&vslq->ready);
+		CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
+		VTAILQ_REMOVE(&vslq->ready, vtx, list_vtx);
+		AN(vtx->flags & VTX_F_READY);
+		vtx_retire(vslq, &vtx);
+	}
+
 	VSL_DeleteCursor(vslq->c);
 	vslq->c = NULL;
 
@@ -1072,49 +1091,43 @@ VSLQ_Delete(struct VSLQ **pvslq)
 /* Regard each log line as a single transaction, feed it through the query
    and do the callback */
 static int
-vslq_raw(const struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
+vslq_raw(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
 {
-	struct vslc_raw rawc;
-	struct VSL_transaction trans;
-	struct VSL_transaction *ptrans[2];
-	struct VSL_cursor *c;
-	int i;
+	int i = 1;
+	int r;
 
 	assert(vslq->grouping == VSL_g_raw);
-	c = vslq->c;
 
-	memset(&rawc, 0, sizeof rawc);
-	rawc.magic = VSLC_RAW_MAGIC;
-	rawc.cursor.priv_tbl = &vslc_raw_tbl;
-	rawc.cursor.priv_data = &rawc;
-	trans.level = 0;
-	trans.type = VSL_t_raw;
-	trans.c = &rawc.cursor;
-	ptrans[0] = &trans;
-	ptrans[1] = NULL;
+	assert(vslq->raw.offset <= vslq->raw.len);
+	do {
+		if (vslq->raw.offset == vslq->raw.len) {
+			i = VSL_Next(vslq->c);
+			if (i <= 0)
+				return (i);
+			AN(vslq->c->rec.ptr);
+			vslq->raw.start = vslq->c->rec;
+			vslq->raw.len = VSL_NEXT(vslq->raw.start.ptr) -
+			    vslq->raw.start.ptr;
+			assert(vslq->raw.len > 0);
+			vslq->raw.offset = 0;
+		}
 
-	while (1) {
-		i = VSL_Next(c);
-		if (i <= 0)
-			break;
-		AN(c->rec.ptr);
-		if (func == NULL)
-			continue;
-		rawc.start = c->rec.ptr;
-		rawc.len = VSL_NEXT(c->rec.ptr) - c->rec.ptr;
-		rawc.next = rawc.start;
-		rawc.cursor.rec.ptr = NULL;
-		trans.vxid = VSL_ID(c->rec.ptr);
-
-		/* Query check goes here */
-		if (vslq->query != NULL && !vslq_runquery(vslq->query, ptrans))
-			continue;
-
-		/* Callback */
-		i = (func)(vslq->vsl, ptrans, priv);
-		if (i)
-			break;
-	}
+		vslq->raw.c.ptr = vslq->raw.start.ptr + vslq->raw.offset;
+		vslq->raw.c.cursor.rec.ptr = NULL;
+		vslq->raw.trans.vxid = VSL_ID(vslq->raw.c.ptr);
+		vslq->raw.offset += VSL_NEXT(vslq->raw.c.ptr) - vslq->raw.c.ptr;
+	} while (VSL_TAG(vslq->raw.c.ptr) == SLT__Batch);
+
+	if (func == NULL)
+		return (i);
+
+	if (vslq->query != NULL &&
+	    !vslq_runquery(vslq->query, vslq->raw.ptrans))
+		return (i);
+
+	r = (func)(vslq->vsl, vslq->raw.ptrans, priv);
+	if (r)
+		return (r);
 
 	return (i);
 }
@@ -1151,10 +1164,9 @@ vslq_shmref_check(struct VSLQ *vslq)
 	return (0);
 }
 
-/* Process the input cursor, calling the callback function on matching
-   transaction sets */
-int
-VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
+/* Process next input record */
+static int
+vslq_next(struct VSLQ *vslq)
 {
 	struct VSL_cursor *c;
 	int i, batch;
@@ -1162,98 +1174,120 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
 	ssize_t len;
 	unsigned vxid;
 	struct vtx *vtx;
-	double now;
 
-	CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
+	c = vslq->c;
+	i = VSL_Next(c);
+	if (i != 1)
+		return (i);
 
-	if (vslq->grouping == VSL_g_raw)
-		return (vslq_raw(vslq, func, priv));
+	tag = VSL_TAG(c->rec.ptr);
+	if (tag == SLT__Batch) {
+		batch = 1;
+		vxid = VSL_BATCHID(c->rec.ptr);
+		len = VSL_END(c->rec.ptr, VSL_BATCHLEN(c->rec.ptr)) -
+		    c->rec.ptr;
+		if (len == 0)
+			return (i);
+		tag = VSL_TAG(VSL_NEXT(c->rec.ptr));
+	} else {
+		batch = 0;
+		vxid = VSL_ID(c->rec.ptr);
+		len = VSL_NEXT(c->rec.ptr) - c->rec.ptr;
+	}
+	assert(len > 0);
+	if (vxid == 0)
+		/* Skip non-transactional records */
+		return (i);
 
-	c = vslq->c;
-	while (1) {
-		/* Check shmref list */
-		i = vslq_shmref_check(vslq);
-		if (i)
-			break;
+	vtx = vtx_lookup(vslq, vxid);
+	if (vtx == NULL && tag == SLT_Begin) {
+		vtx = vtx_add(vslq, vxid);
+		AN(vtx);
+	}
+	if (vtx != NULL) {
+		vtx_append(vslq, vtx, &c->rec, len);
+		vtx_scan(vslq, vtx);
+	}
+	if (batch)
+		AZ(vsl_skip(c, VSL_WORDS(VSL_BATCHLEN(c->rec.ptr))));
 
-		i = VSL_Next(c);
-		if (i != 1)
-			break;
-		tag = VSL_TAG(c->rec.ptr);
-		if (tag == SLT__Batch) {
-			batch = 1;
-			vxid = VSL_BATCHID(c->rec.ptr);
-			len = VSL_END(c->rec.ptr, VSL_BATCHLEN(c->rec.ptr)) -
-			    c->rec.ptr;
-			if (len == 0)
-				continue;
-			tag = VSL_TAG(VSL_NEXT(c->rec.ptr));
-		} else {
-			batch = 0;
-			vxid = VSL_ID(c->rec.ptr);
-			len = VSL_NEXT(c->rec.ptr) - c->rec.ptr;
-		}
+	return (i);
+}
 
-		assert(len > 0);
-		if (vxid == 0)
-			/* Skip non-transactional records */
-			continue;
+/* Test query and report any ready transactions */
+static int
+vslq_process_ready(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
+{
+	struct vtx *vtx;
+	int i = 0;
 
-		vtx = vtx_lookup(vslq, vxid);
-		if (vtx == NULL && tag == SLT_Begin) {
-			vtx = vtx_add(vslq, vxid);
-			AN(vtx);
-		}
-		if (vtx != NULL) {
-			vtx_append(vslq, vtx, &c->rec, len);
-			vtx = vtx_scan(vslq, vtx);
-		}
-		if (batch)
-			AZ(vsl_skip(c, VSL_WORDS(VSL_BATCHLEN(c->rec.ptr))));
-		if (vtx) {
-			AN(vtx->flags & VTX_F_READY);
+	AN(vslq);
+
+	while (!VTAILQ_EMPTY(&vslq->ready)) {
+		vtx = VTAILQ_FIRST(&vslq->ready);
+		CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
+		VTAILQ_REMOVE(&vslq->ready, vtx, list_vtx);
+		AN(vtx->flags & VTX_F_READY);
+		if (func != NULL)
 			i = vslq_callback(vslq, vtx, func, priv);
-			vtx_retire(vslq, &vtx);
-			AZ(vtx);
-			if (i)
-				break;
-		}
+		vtx_retire(vslq, &vtx);
+		AZ(vtx);
+		if (i)
+			return (i);
 	}
+
+	return (0);
+}
+
+/* Process the input cursor, calling the callback function on matching
+   transaction sets */
+int
+VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
+{
+	int i, r;
+	double now;
+	struct vtx *vtx;
+
+	CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
+
+	if (vslq->grouping == VSL_g_raw)
+		return (vslq_raw(vslq, func, priv));
+
+	/* Check shmref list and buffer if necessary */
+	r = vslq_shmref_check(vslq);
+	if (r)
+		return (r);
+
+	/* Process next cursor input */
+	i = vslq_next(vslq);
 	if (i)
 		return (i);
 
+	/* Check vtx timeout */
 	now = VTIM_mono();
-	while ((vtx = VTAILQ_FIRST(&vslq->incomplete)) &&
-	    now - vtx->t_start > vslq->vsl->T_opt) {
-		AZ(vtx->flags & VTX_F_COMPLETE);
-		vtx = vtx_force(vslq, vtx, "incomplete - timeout");
-		if (vtx) {
-			AN(vtx->flags & VTX_F_READY);
-			i = vslq_callback(vslq, vtx, func, priv);
-			vtx_retire(vslq, &vtx);
-			AZ(vtx);
-			if (i)
-				break;
-		}
+	while (!VTAILQ_EMPTY(&vslq->incomplete)) {
+		vtx = VTAILQ_FIRST(&vslq->incomplete);
+		CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
+		if (now - vtx->t_start < vslq->vsl->T_opt)
+			break;
+		vtx_force(vslq, vtx, "timeout");
+		AN(vtx->flags & VTX_F_COMPLETE);
 	}
-	if (i)
-		return (i);
 
+	/* Check store limit */
 	while (vslq->n_incomplete > vslq->vsl->L_opt) {
 		vtx = VTAILQ_FIRST(&vslq->incomplete);
-		AN(vtx);
-		AZ(vtx->flags & VTX_F_COMPLETE);
-		vtx = vtx_force(vslq, vtx, "incomplete - store overflow");
-		if (vtx) {
-			AN(vtx->flags & VTX_F_READY);
-			i = vslq_callback(vslq, vtx, func, priv);
-			vtx_retire(vslq, &vtx);
-			AZ(vtx);
-			if (i)
-				break;
-		}
+		CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
+		vtx_force(vslq, vtx, "store overflow");
+		AN(vtx->flags & VTX_F_COMPLETE);
 	}
 
+	/* Check ready list */
+	if (!VTAILQ_EMPTY(&vslq->ready))
+		r = vslq_process_ready(vslq, func, priv);
+	if (r)
+		return (r);
+
 	return (i);
 }
 
@@ -1263,23 +1297,15 @@ int
 VSLQ_Flush(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
 {
 	struct vtx *vtx;
-	int i = 0;
 
 	CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
 
 	while (vslq->n_incomplete) {
 		vtx = VTAILQ_FIRST(&vslq->incomplete);
-		AN(vtx);
+		CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
 		AZ(vtx->flags & VTX_F_COMPLETE);
-		vtx = vtx_force(vslq, vtx, "incomplete - flushing");
-		if (vtx) {
-			AN(vtx->flags & VTX_F_READY);
-			i = vslq_callback(vslq, vtx, func, priv);
-			vtx_retire(vslq, &vtx);
-			AZ(vtx);
-			if (i)
-				break;
-		}
+		vtx_force(vslq, vtx, "flush");
 	}
-	return (i);
+
+	return (vslq_process_ready(vslq, func, priv));
 }
diff --git a/lib/libvarnishtools/vut.c b/lib/libvarnishtools/vut.c
index 7583c77..5e24af4 100644
--- a/lib/libvarnishtools/vut.c
+++ b/lib/libvarnishtools/vut.c
@@ -307,14 +307,16 @@ VUT_Main(VSLQ_dispatch_f *func, void *priv)
 		}
 
 		i = VSLQ_Dispatch(VUT.vslq, func, priv);
-		if (i == 0) {
+		if (i == 1)
+			/* Call again */
+			continue;
+		else if (i == 0) {
 			/* Nothing to do but wait */
 			if (VUT.fo)
 				fflush(VUT.fo);
 			VTIM_sleep(0.01);
 			continue;
-		}
-		if (i == -1) {
+		} else if (i == -1) {
 			/* EOF */
 			break;
 		}
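
For reference, the transaction array handed to the dispatch callback is
NULL-terminated and each element carries its own record cursor (see the
raw mode setup in VSLQ_New() above). A minimal callback sketch (the
name my_cb is hypothetical; the inner loop only marks where a tool
would format each record):

	static int
	my_cb(struct VSL_data *vsl, struct VSL_transaction * const ptrans[],
	    void *priv)
	{
		struct VSL_transaction *t;
		int i;

		(void)vsl;
		(void)priv;
		for (i = 0; (t = ptrans[i]) != NULL; i++) {
			while (VSL_Next(t->c) == 1) {
				/* t->c->rec.ptr is the current record */
			}
		}
		return (0);	/* non-zero stops VSLQ_Dispatch */
	}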


