[master] cef9da1 Make Dispatch use VSL_Check to handle SHM ptrs.

Martin Blix Grydeland martin at varnish-cache.org
Wed May 15 14:46:14 CEST 2013


commit cef9da177a5445d04306cbde5a8756734f8ab8cc
Author: Martin Blix Grydeland <martin at varnish-software.com>
Date:   Sun May 12 23:55:39 2013 +0200

    Make Dispatch use VSL_Check to handle SHM ptrs.
    
    This allows Dispatch to run completely in SHM with no copying for most
    cases, also when transactions come in multiple batches.

diff --git a/include/vapi/vsl_int.h b/include/vapi/vsl_int.h
index f65dc9f..8765b84 100644
--- a/include/vapi/vsl_int.h
+++ b/include/vapi/vsl_int.h
@@ -89,6 +89,8 @@ struct VSL_head {
 #define VSL_BACKEND(ptr)	(((ptr)[1]) & VSL_BACKENDMARKER)
 #define VSL_DATA(ptr)		((char*)((ptr)+2))
 #define VSL_CDATA(ptr)		((const char*)((ptr)+2))
+#define VSL_BATCHLEN(ptr)	((ptr)[1])
+#define VSL_BATCHID(ptr)	(VSL_ID((ptr) + 2))
 
 #define VSL_ENDMARKER	(((uint32_t)SLT__Reserved << 24) | 0x454545) /* "EEE" */
 #define VSL_WRAPMARKER	(((uint32_t)SLT__Reserved << 24) | 0x575757) /* "WWW" */
diff --git a/lib/libvarnishapi/vsl_dispatch.c b/lib/libvarnishapi/vsl_dispatch.c
index 265e1e5..8c3bfe2 100644
--- a/lib/libvarnishapi/vsl_dispatch.c
+++ b/lib/libvarnishapi/vsl_dispatch.c
@@ -46,6 +46,9 @@
 
 #define VTX_CACHE 10
 #define VTX_BUFSIZE_MIN 64
+#define VTX_CHUNKS 3
+
+struct vtx;
 
 enum vtx_type_e {
 	vtx_t_unknown,
@@ -62,6 +65,33 @@ enum vtx_link_e {
 	vtx_l_bereq,
 };
 
+struct vslc_raw {
+	struct vslc		c;
+	unsigned		magic;
+#define VSLC_RAW_MAGIC		0x247EBD44
+
+	const uint32_t		*start;
+	ssize_t			len;
+	const uint32_t		*next;
+};
+
+struct vtx_chunk {
+	struct VSLC_ptr		start;
+	ssize_t			len;
+	ssize_t			offset;
+};
+
+struct vslc_vtx {
+	struct vslc		c;
+	unsigned		magic;
+#define VSLC_VTX_MAGIC		0x74C6523F
+
+	struct vtx		*vtx;
+
+	unsigned		chunk;	/* Current chunk */
+	ssize_t			offset; /* Offset of next record */
+};
+
 struct vtx_key {
 	unsigned		vxid;
 	VRB_ENTRY(vtx_key)	entry;
@@ -74,12 +104,12 @@ struct vtx {
 #define VTX_MAGIC		0xACC21D09
 	VTAILQ_ENTRY(vtx)	list_child;
 	VTAILQ_ENTRY(vtx)	list_incomplete;
+	VTAILQ_ENTRY(vtx)	list_shm;
 
 	double			t_start;
 	unsigned		flags;
-#define VTX_F_SHM		0x1
-#define VTX_F_COMPLETE		0x2
-#define VTX_F_READY		0x4
+#define VTX_F_COMPLETE		0x1
+#define VTX_F_READY		0x2
 
 	enum vtx_type_e		type;
 
@@ -89,33 +119,16 @@ struct vtx {
 	unsigned		n_childready;
 	unsigned		n_descend;
 
-	const uint32_t		*start;
+	struct vslc_vtx		c;
 	ssize_t			len;
-	unsigned		index;
+
+	struct vtx_chunk	chunk[VTX_CHUNKS];
+	unsigned		n_chunk;
 
 	uint32_t		*buf;
 	ssize_t			bufsize;
 };
 
-struct vslc_raw {
-	struct vslc		c;
-	unsigned		magic;
-#define VSLC_RAW_MAGIC		0x247EBD44
-
-	const uint32_t		*start;
-	ssize_t			len;
-	const uint32_t		*next;
-};
-
-struct vslc_vtx {
-	struct vslc		c;
-	unsigned		magic;
-#define VSLC_VTX_MAGIC		0x74C6523F
-
-	struct vtx		*vtx;
-	const uint32_t		*next;
-};
-
 struct VSLQ {
 	unsigned		magic;
 #define VSLQ_MAGIC		0x23A8BE97
@@ -131,6 +144,8 @@ struct VSLQ {
 	VTAILQ_HEAD(,vtx)	incomplete;
 	unsigned		n_incomplete;
 
+	VTAILQ_HEAD(,vtx)	shmlist;
+
 	VTAILQ_HEAD(,vtx)	cache;
 	unsigned		n_cache;
 };
@@ -215,17 +230,37 @@ static int
 vslc_vtx_next(void *cursor)
 {
 	struct vslc_vtx *c;
+	struct vtx_chunk *chunk;
 
 	CAST_OBJ_NOTNULL(c, cursor, VSLC_VTX_MAGIC);
 
-	assert(c->next >= c->vtx->start);
-	assert(c->next <= c->vtx->start + c->vtx->len);
-	if (c->next < c->vtx->start + c->vtx->len) {
-		c->c.c.rec.ptr = c->next;
-		c->next = VSL_NEXT(c->next);
+	assert (c->offset <= c->vtx->len);
+	if (c->offset == c->vtx->len)
+		return (0);
+
+	if (c->vtx->n_chunk == 0) {
+		/* Buffer */
+		AN(c->vtx->buf);
+		assert(c->offset < c->vtx->bufsize);
+		c->c.c.rec.ptr = c->vtx->buf + c->offset;
+		c->offset += VSL_NEXT(c->c.c.rec.ptr) - c->c.c.rec.ptr;
 		return (1);
 	}
-	return (0);
+
+	/* Shmptr chunks */
+	assert(c->chunk < c->vtx->n_chunk);
+	chunk = &c->vtx->chunk[c->chunk];
+	assert(c->offset >= chunk->offset);
+	assert(c->offset <= chunk->offset + chunk->len);
+	if (c->offset == chunk->offset + chunk->len) {
+		c->chunk++;
+		chunk = &c->vtx->chunk[c->chunk];
+	}
+	AN(chunk->start.ptr);
+	c->c.c.rec.ptr = chunk->start.ptr + c->offset - chunk->offset;
+	c->offset += VSL_NEXT(c->c.c.rec.ptr) - c->c.c.rec.ptr;
+
+	return (1);
 }
 
 static int
@@ -234,10 +269,8 @@ vslc_vtx_reset(void *cursor)
 	struct vslc_vtx *c;
 
 	CAST_OBJ_NOTNULL(c, cursor, VSLC_VTX_MAGIC);
-
-	assert(c->next >= c->vtx->start);
-	assert(c->next <= c->vtx->start + c->vtx->len);
-	c->next = c->vtx->start;
+	c->chunk = 0;
+	c->offset = 0;
 	c->c.c.rec.ptr = NULL;
 
 	return (0);
@@ -251,23 +284,6 @@ static struct vslc_tbl vslc_vtx_tbl = {
 	.check	= NULL,
 };
 
-static void
-vslc_vtx_setup(struct vslc_vtx *c, struct vtx *vtx, unsigned level)
-{
-	AN(c);
-	AN(vtx);
-
-	memset(c, 0, sizeof *c);
-	c->c.c.vxid = vtx->key.vxid;
-	c->c.c.level = level;
-	c->c.magic = VSLC_MAGIC;
-	c->c.tbl = &vslc_vtx_tbl;
-
-	c->magic = VSLC_VTX_MAGIC;
-	c->vtx = vtx;
-	c->next = c->vtx->start;
-}
-
 static struct vtx *
 vtx_new(struct VSLQ *vslq)
 {
@@ -282,6 +298,10 @@ vtx_new(struct VSLQ *vslq)
 	} else {
 		ALLOC_OBJ(vtx, VTX_MAGIC);
 		AN(vtx);
+		vtx->c.c.magic = VSLC_MAGIC;
+		vtx->c.c.tbl = &vslc_vtx_tbl;
+		vtx->c.magic = VSLC_VTX_MAGIC;
+		vtx->c.vtx = vtx;
 	}
 
 	vtx->key.vxid = 0;
@@ -293,9 +313,10 @@ vtx_new(struct VSLQ *vslq)
 	vtx->n_child = 0;
 	vtx->n_childready = 0;
 	vtx->n_descend = 0;
-	vtx->start = vtx->buf;
+	(void)vslc_vtx_reset(&vtx->c);
 	vtx->len = 0;
-	vtx->index = 0;
+	memset(&vtx->chunk, 0, sizeof vtx->chunk);
+	vtx->n_chunk = 0;
 
 	VTAILQ_INSERT_TAIL(&vslq->incomplete, vtx, list_incomplete);
 	vslq->n_incomplete++;
@@ -349,6 +370,9 @@ vtx_retire(struct VSLQ *vslq, struct vtx **pvtx)
 	AZ(vtx->n_descend);
 	AN(VRB_REMOVE(vtx_tree, &vslq->tree, &vtx->key));
 
+	if (vtx->n_chunk)
+		VTAILQ_REMOVE(&vslq->shmlist, vtx, list_shm);
+
 	if (vslq->n_cache < VTX_CACHE) {
 		VTAILQ_INSERT_HEAD(&vslq->cache, vtx, list_child);
 		vslq->n_cache++;
@@ -375,52 +399,81 @@ vtx_lori(struct VSLQ *vslq, unsigned vxid)
 	vtx = vtx_new(vslq);
 	AN(vtx);
 	vtx->key.vxid = vxid;
+	vtx->c.c.c.vxid = vxid;
 	AZ(VRB_INSERT(vtx_tree, &vslq->tree, &vtx->key));
 	return (vtx);
 }
 
 static void
-vtx_append(struct vtx *vtx, const uint32_t *ptr, ssize_t len, int shmptr_ok)
+vtx_set_bufsize(struct vtx *vtx, ssize_t len)
 {
-	ssize_t bufsize;
-	const uint32_t *ptr2;
-	ssize_t len2;
 
 	AN(vtx);
+	assert(len >= 0);
+	if (vtx->bufsize >= len)
+		return;
+	if (vtx->bufsize == 0)
+		vtx->bufsize = VTX_BUFSIZE_MIN;
+	while (vtx->bufsize < len)
+		vtx->bufsize *= 2;
+	vtx->buf = realloc(vtx->buf, sizeof (uint32_t) * vtx->bufsize);
+}
 
-	if (vtx->flags & VTX_F_SHM) {
-		assert(vtx->start != vtx->buf);
-		ptr2 = vtx->start;
-		vtx->start = vtx->buf;
-		len2 = vtx->len;
-		vtx->len = 0;
-		vtx->flags &= ~VTX_F_SHM;
-		vtx_append(vtx, ptr2, len2, 0);
-	}
+static void
+vtx_buffer(struct VSLQ *vslq, struct vtx *vtx)
+{
+	int i;
+
+	AN(vtx->n_chunk);
+	AN(vtx->len);
+
+	vtx_set_bufsize(vtx, vtx->len);
+	AN(vtx->buf);
+	assert(vtx->bufsize >= vtx->len);
+
+	for (i = 0; i < vtx->n_chunk; i++)
+		memcpy(vtx->buf + vtx->chunk[i].offset, vtx->chunk[i].start.ptr,
+		    sizeof (uint32_t) * vtx->chunk[i].len);
+
+	memset(&vtx->chunk, 0, sizeof vtx->chunk);
+	VTAILQ_REMOVE(&vslq->shmlist, vtx, list_shm);
+	vtx->n_chunk = 0;
+}
 
+static void
+vtx_append(struct VSLQ *vslq, struct vtx *vtx, const struct VSLC_ptr *start,
+    ssize_t len, int copy)
+{
+
+	AN(vtx);
 	if (len == 0)
 		return;
-	AN(ptr);
-
-	if (shmptr_ok && vtx->len == 0) {
-		vtx->start = ptr;
-		vtx->len = len;
-		vtx->flags |= VTX_F_SHM;
+	AN(start);
+
+	if (vtx->len > 0 && vtx->n_chunk == 0)
+		/* Can't mix buffer and shmptr */
+		copy = 1;
+
+	if (!copy && vtx->n_chunk < VTX_CHUNKS) {
+		/* Add shmptr chunk */
+		AZ(vtx->chunk[vtx->n_chunk].len);
+		vtx->chunk[vtx->n_chunk].start = *start;
+		vtx->chunk[vtx->n_chunk].len = len;
+		vtx->chunk[vtx->n_chunk].offset = vtx->len;
+		vtx->len += len;
+		if (vtx->n_chunk == 0)
+			VTAILQ_INSERT_TAIL(&vslq->shmlist, vtx, list_shm);
+		vtx->n_chunk++;
 		return;
 	}
 
-	bufsize = vtx->bufsize;
-	if (bufsize == 0)
-		bufsize = VTX_BUFSIZE_MIN;
-	while (vtx->len + len > bufsize)
-		bufsize *= 2;
-	if (bufsize != vtx->bufsize) {
-		vtx->buf = realloc(vtx->buf, 4 * bufsize);
-		AN(vtx->buf);
-		vtx->bufsize = bufsize;
-		vtx->start = vtx->buf;
-	}
-	memcpy(&vtx->buf[vtx->len], ptr, 4 * len);
+	/* Append to buffer */
+	vtx_set_bufsize(vtx, vtx->len + len);
+	if (vtx->n_chunk)
+		vtx_buffer(vslq, vtx);
+	AZ(vtx->n_chunk);
+	AN(vtx->buf);
+	memcpy(vtx->buf + vtx->len, start->ptr, sizeof (uint32_t) * len);
 	vtx->len += len;
 }
 
@@ -454,13 +507,6 @@ vtx_check_ready(struct VSLQ *vslq, struct vtx *vtx)
 		/* Top level vtx ready */
 		return (ready);
 
-	if (vtx->flags & VTX_F_SHM) {
-		/* Not ready, append zero to make sure it's not a shm
-		   reference */
-		vtx_append(vtx, NULL, 0, 0);
-		AZ(vtx->flags & VTX_F_SHM);
-	}
-
 	return (NULL);
 }
 
@@ -518,7 +564,7 @@ static int
 vtx_scan_begintag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
 {
 	int i;
-	enum vtx_type_e p_type;
+	enum vtx_type_e type;
 	unsigned p_vxid;
 	struct vtx *p_vtx;
 
@@ -527,25 +573,31 @@ vtx_scan_begintag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
 	if (vtx->flags & VTX_F_READY)
 		return (vtx_diag_tag(vtx, ptr, "link too late"));
 
-	i = vtx_parsetag_bl(VSL_CDATA(ptr), VSL_LEN(ptr), &p_type, &p_vxid);
+	i = vtx_parsetag_bl(VSL_CDATA(ptr), VSL_LEN(ptr), &type, &p_vxid);
 	if (i < 1)
 		return (vtx_diag_tag(vtx, ptr, "parse error"));
 
 	/* Check/set vtx type */
-	assert(p_type != vtx_t_unknown);
-	if (vtx->type != vtx_t_unknown && vtx->type != p_type)
+	assert(type != vtx_t_unknown);
+	if (vtx->type != vtx_t_unknown && vtx->type != type)
 		return (vtx_diag_tag(vtx, ptr, "type mismatch"));
-	vtx->type = p_type;
+	vtx->type = type;
 
 	if (i == 1 || p_vxid == 0)
 		return (0);
 
+	if (vslq->grouping == VSL_g_vxid)
+		return (0);	/* No links */
+	if (vslq->grouping == VSL_g_request && vtx->type == vtx_t_req)
+		return (0);	/* No links */
+
 	/* Lookup and check parent vtx */
 	p_vtx = vtx_lori(vslq, p_vxid);
 	AN(p_vtx);
 	if (vtx->parent == p_vtx)
 		/* Link already exists */
 		return (0);
+
 	if (vtx->parent != NULL)
 		return (vtx_diag_tag(vtx, ptr, "duplicate link"));
 	if (p_vtx->flags & VTX_F_READY)
@@ -574,6 +626,11 @@ vtx_scan_linktag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
 		return (vtx_diag_tag(vtx, ptr, "parse error"));
 	assert(i == 2);
 
+	if (vslq->grouping == VSL_g_vxid)
+		return (0);	/* No links */
+	if (vslq->grouping == VSL_g_request && vtx->type == vtx_t_sess)
+		return (0);	/* No links */
+
 	/* Lookup and check child vtx */
 	c_vtx = vtx_lori(vslq, c_vxid);
 	AN(c_vtx);
@@ -584,7 +641,6 @@ vtx_scan_linktag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
 		return (vtx_diag_tag(vtx, ptr, "duplicate link"));
 	if (c_vtx->flags & VTX_F_READY)
 		return (vtx_diag_tag(vtx, ptr, "link too late"));
-	assert(c_type != vtx_t_unknown);
 	if (c_vtx->type != vtx_t_unknown && c_vtx->type != c_type)
 		return (vtx_diag_tag(vtx, ptr, "type mismatch"));
 	c_vtx->type = c_type;
@@ -599,16 +655,17 @@ vtx_scan(struct VSLQ *vslq, struct vtx *vtx)
 {
 	const uint32_t *ptr;
 	enum VSL_tag_e tag;
-	int complete;
+	struct vtx *ret = NULL;
 
-	complete = (vtx->flags & VTX_F_COMPLETE ? 1 : 0);
-	ptr = vtx->start + vtx->index;
-	assert(ptr <= vtx->start + vtx->len);
-	for (; ptr < vtx->start + vtx->len; ptr = VSL_NEXT(ptr)) {
+	while (vslc_vtx_next(&vtx->c) == 1) {
+		ptr = vtx->c.c.c.rec.ptr;
 		tag = VSL_TAG(ptr);
 
-		if (complete) {
-			vtx_diag(vtx, "late log rec");
+		if (tag == SLT__Batch)
+			continue;
+
+		if (vtx->flags & VTX_F_COMPLETE) {
+			vtx_diag_tag(vtx, ptr, "late log rec");
 			continue;
 		}
 
@@ -625,31 +682,21 @@ vtx_scan(struct VSLQ *vslq, struct vtx *vtx)
 			break;
 
 		case SLT_End:
-			complete = 1;
+			AZ(vtx->flags & VTX_F_COMPLETE);
+			AZ(ret);
+			VTAILQ_REMOVE(&vslq->incomplete, vtx, list_incomplete);
+			vtx->flags |= VTX_F_COMPLETE;
+			AN(vslq->n_incomplete);
+			vslq->n_incomplete--;
+			ret = vtx_check_ready(vslq, vtx);
 			break;
 
 		default:
 			break;
 		}
 	}
-	vtx->index = ptr - vtx->start;
-	assert(vtx->index <= vtx->len);
-
-	if (!complete && vtx->flags & VTX_F_SHM) {
-		/* Append zero to make sure it's not a shm reference */
-		vtx_append(vtx, NULL, 0, 0);
-		AZ(vtx->flags & VTX_F_SHM);
-	}
 
-	if (complete) {
-		VTAILQ_REMOVE(&vslq->incomplete, vtx, list_incomplete);
-		vtx->flags |= VTX_F_COMPLETE;
-		AN(vslq->n_incomplete);
-		vslq->n_incomplete--;
-		return (vtx_check_ready(vslq, vtx));
-	}
-
-	return (NULL);
+	return (ret);
 }
 
 static struct vtx *
@@ -672,9 +719,9 @@ vslq_callback(struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
     void *priv)
 {
 	unsigned n = vtx->n_descend + 1;
+	struct vslc_vtx *cp[n];
+	struct VSL_cursor *CP[n + 1];
 	unsigned i, j;
-	struct vslc_vtx c[n];
-	struct VSL_cursor *cp[n + 1];
 
 	AN(vslq);
 	CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
@@ -689,13 +736,17 @@ vslq_callback(struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
 		return (0);
 
 	i = j = 0;
-	vslc_vtx_setup(&c[i], vtx, 0);
+	cp[i] = &vtx->c;
+	vslc_vtx_reset(cp[i]);
+	cp[i]->c.c.level = 0;
 	i++;
 	while (j < i) {
-		vtx = VTAILQ_FIRST(&c[j].vtx->child);
+		vtx = VTAILQ_FIRST(&cp[j]->vtx->child);
 		while (vtx) {
 			assert(i < n);
-			vslc_vtx_setup(&c[i], vtx, c[j].c.c.level + 1);
+			cp[i] = &vtx->c;
+			vslc_vtx_reset(cp[i]);
+			cp[i]->c.c.level = cp[j]->c.c.level + 1;
 			i++;
 			vtx = VTAILQ_NEXT(vtx, list_child);
 		}
@@ -705,12 +756,12 @@ vslq_callback(struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
 
 	/* Reverse order */
 	for (i = 0; i < n; i++)
-		cp[i] = &c[n - i - 1].c.c;
-	cp[i] = NULL;
+		CP[i] = &cp[n - i - 1]->c.c;
+	CP[i] = NULL;
 
 	/* Query test goes here */
-	if (vslq->query == NULL ? 1 : vslq_runquery(vslq->query, cp))
-		return ((func)(vslq->vsl, cp, priv));
+	if (vslq->query == NULL ? 1 : vslq_runquery(vslq->query, CP))
+		return ((func)(vslq->vsl, CP, priv));
 	else
 		return (0);
 }
@@ -744,6 +795,7 @@ VSLQ_New(struct VSL_data *vsl, struct VSL_cursor **cp,
 	vslq->query = query;
 	VRB_INIT(&vslq->tree);
 	VTAILQ_INIT(&vslq->incomplete);
+	VTAILQ_INIT(&vslq->shmlist);
 	VTAILQ_INIT(&vslq->cache);
 
 	return (vslq);
@@ -829,7 +881,6 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
 	struct VSL_cursor *c;
 	int i;
 	enum VSL_tag_e tag;
-	const uint32_t *ptr;
 	ssize_t len;
 	unsigned vxid;
 	struct vtx *vtx;
@@ -842,24 +893,38 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
 
 	c = vslq->c;
 	while (1) {
+		/* Check shmlist and buffer on warning */
+		while ((vtx = VTAILQ_FIRST(&vslq->shmlist))) {
+			AN(vtx->n_chunk);
+			i = VSL_Check(c, &vtx->chunk[0].start);
+			if (i == 2)
+				break;
+			else if (i == 1)
+				vtx_buffer(vslq, vtx);
+			else
+				/* Too late */
+				return (-3);
+		}
+
 		i = VSL_Next(c);
 		if (i != 1)
-			break;
+			return (i);
 		tag = VSL_TAG(c->rec.ptr);
 		if (tag == SLT__Batch) {
-			ptr = VSL_NEXT(c->rec.ptr);
-			len = VSL_WORDS(c->rec.ptr[1]);
-			AZ(vsl_skip(c, len));
+			vxid = VSL_BATCHID(c->rec.ptr);
+			len = VSL_END(c->rec.ptr, VSL_BATCHLEN(c->rec.ptr)) -
+			    c->rec.ptr;
 		} else {
-			ptr = c->rec.ptr;
-			len = VSL_NEXT(ptr) - ptr;
+			vxid = VSL_ID(c->rec.ptr);
+			len = VSL_NEXT(c->rec.ptr) - c->rec.ptr;
 		}
-		vxid = VSL_ID(ptr);
 		if (vxid == 0)
 			continue;
 		vtx = vtx_lori(vslq, vxid);
 		AN(vtx);
-		vtx_append(vtx, ptr, len, c->shmptr_ok);
+		vtx_append(vslq, vtx, &c->rec, len, VSL_Check(c, &c->rec) != 2);
+		if (tag == SLT__Batch)
+			AZ(vsl_skip(c, VSL_WORDS(VSL_BATCHLEN(c->rec.ptr))));
 		vtx = vtx_scan(vslq, vtx);
 		if (vtx) {
 			AN(vtx->flags & VTX_F_READY);
@@ -867,7 +932,7 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
 			vtx_retire(vslq, &vtx);
 			AZ(vtx);
 			if (i)
-				break;
+				return (i);
 		}
 	}
 	if (i)



More information about the varnish-commit mailing list