[master] 53ee8e5 VSLQ_Dispatch now returns sooner.
Martin Blix Grydeland
martin at varnish-cache.org
Wed Oct 9 16:03:03 CEST 2013
commit 53ee8e5c6baa2c4cfb6b9c948c209cebc2cfa77c
Author: Martin Blix Grydeland <martin at varnish-software.com>
Date: Wed Oct 9 12:53:42 2013 +0200
VSLQ_Dispatch now returns sooner.
Change VSLQ_Dispatch to always return after processing one record or
one batch of records. It will return 1 if it should be called again
without sleeping.
This fixes the situation where a calling program is lagging behind,
and VSLQ_Dispatch never returns because there is a constant stream of
new log records.
diff --git a/include/vapi/vsl.h b/include/vapi/vsl.h
index cee9307..f88e20f 100644
--- a/include/vapi/vsl.h
+++ b/include/vapi/vsl.h
@@ -423,8 +423,9 @@ int VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv);
* priv: An argument passed to func
*
* Return values:
+ * 1: Call again
* 0: No more log records available
- * !=0: The return value from either VSL_Next() or func
+ * !=0: The error code from VSL_Next() or func returned non-zero
*/
int VSLQ_Flush(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv);
diff --git a/lib/libvarnishapi/vsl_dispatch.c b/lib/libvarnishapi/vsl_dispatch.c
index a47d018..6912943 100644
--- a/lib/libvarnishapi/vsl_dispatch.c
+++ b/lib/libvarnishapi/vsl_dispatch.c
@@ -65,9 +65,7 @@ struct vslc_raw {
struct VSL_cursor cursor;
- const uint32_t *start;
- ssize_t len;
- const uint32_t *next;
+ const uint32_t *ptr;
};
struct synth {
@@ -130,7 +128,7 @@ struct vtx {
unsigned magic;
#define VTX_MAGIC 0xACC21D09
VTAILQ_ENTRY(vtx) list_child;
- VTAILQ_ENTRY(vtx) list_incomplete;
+ VTAILQ_ENTRY(vtx) list_vtx;
double t_start;
unsigned flags;
@@ -170,15 +168,24 @@ struct VSLQ {
enum VSL_grouping_e grouping;
+ /* Structured mode */
struct vtx_tree tree;
-
+ VTAILQ_HEAD(,vtx) ready;
VTAILQ_HEAD(,vtx) incomplete;
unsigned n_incomplete;
-
struct chunkhead shmrefs;
-
VTAILQ_HEAD(,vtx) cache;
unsigned n_cache;
+
+ /* Raw mode */
+ struct {
+ struct vslc_raw c;
+ struct VSL_transaction trans;
+ struct VSL_transaction *ptrans[2];
+ struct VSLC_ptr start;
+ ssize_t len;
+ size_t offset;
+ } raw;
};
static void vtx_synth_rec(struct vtx *vtx, unsigned tag, const char *fmt, ...);
@@ -209,14 +216,14 @@ vslc_raw_next(struct VSL_cursor *cursor)
CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_RAW_MAGIC);
assert(&c->cursor == cursor);
- assert(c->next >= c->start);
- assert(c->next <= c->start + c->len);
- if (c->next < c->start + c->len) {
- c->cursor.rec.ptr = c->next;
- c->next = VSL_NEXT(c->next);
+ AN(c->ptr);
+ if (c->cursor.rec.ptr == NULL) {
+ c->cursor.rec.ptr = c->ptr;
return (1);
+ } else {
+ c->cursor.rec.ptr = NULL;
+ return (0);
}
- return (0);
}
static int
@@ -227,9 +234,7 @@ vslc_raw_reset(struct VSL_cursor *cursor)
CAST_OBJ_NOTNULL(c, cursor->priv_data, VSLC_RAW_MAGIC);
assert(&c->cursor == cursor);
- assert(c->next >= c->start);
- assert(c->next <= c->start + c->len);
- c->next = c->start;
+ AN(c->ptr);
c->cursor.rec.ptr = NULL;
return (0);
@@ -592,14 +597,14 @@ vtx_add(struct VSLQ *vslq, unsigned vxid)
AN(vtx);
vtx->key.vxid = vxid;
AZ(VRB_INSERT(vtx_tree, &vslq->tree, &vtx->key));
- VTAILQ_INSERT_TAIL(&vslq->incomplete, vtx, list_incomplete);
+ VTAILQ_INSERT_TAIL(&vslq->incomplete, vtx, list_vtx);
vslq->n_incomplete++;
return (vtx);
}
/* Mark a vtx complete, update child counters and if possible push it or
its top parent to the ready state */
-static struct vtx *
+static void
vtx_mark_complete(struct VSLQ *vslq, struct vtx *vtx)
{
@@ -611,7 +616,7 @@ vtx_mark_complete(struct VSLQ *vslq, struct vtx *vtx)
vtx_diag(vtx, "vtx of unknown type marked complete");
vtx->flags |= VTX_F_COMPLETE;
- VTAILQ_REMOVE(&vslq->incomplete, vtx, list_incomplete);
+ VTAILQ_REMOVE(&vslq->incomplete, vtx, list_vtx);
AN(vslq->n_incomplete);
vslq->n_incomplete--;
@@ -621,20 +626,16 @@ vtx_mark_complete(struct VSLQ *vslq, struct vtx *vtx)
vtx->n_child == vtx->n_childready)
vtx->flags |= VTX_F_READY;
else
- return (NULL);
+ return;
if (vtx->parent == NULL) {
/* Top level vtx ready */
- return (vtx);
+ VTAILQ_INSERT_TAIL(&vslq->ready, vtx, list_vtx);
+ return;
}
vtx = vtx->parent;
vtx->n_childready++;
assert(vtx->n_child >= vtx->n_childready);
}
-
- if (vtx->flags & VTX_F_READY && vtx->parent == NULL)
- return (vtx);
-
- return (NULL);
}
/* Add a child to a parent, and update child counters */
@@ -809,14 +810,14 @@ vtx_scan_link(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
/* Scan the records of a vtx, performing processing actions on specific
records */
-static struct vtx *
+static void
vtx_scan(struct VSLQ *vslq, struct vtx *vtx)
{
const uint32_t *ptr;
enum VSL_tag_e tag;
if (vtx->flags & VTX_F_END)
- return (NULL);
+ return;
while (vslc_vtx_next(&vtx->c.cursor) == 1) {
ptr = vtx->c.cursor.rec.ptr;
@@ -841,28 +842,25 @@ vtx_scan(struct VSLQ *vslq, struct vtx *vtx)
break;
}
}
-
- return (NULL);
}
/* Force a vtx into complete status by synthing the necessary outstanding
records */
-static struct vtx *
+static void
vtx_force(struct VSLQ *vslq, struct vtx *vtx, const char *reason)
{
- struct vtx *ret;
AZ(vtx->flags & VTX_F_COMPLETE);
AZ(vtx->flags & VTX_F_READY);
+ vtx_scan(vslq, vtx);
if (!(vtx->flags & VTX_F_BEGIN))
vtx_synth_rec(vtx, SLT_Begin, "%s %u synth",
vsl_t_names[vtx->type], vtx->key.vxid);
vtx_diag(vtx, reason);
if (!(vtx->flags & VTX_F_END))
vtx_synth_rec(vtx, SLT_End, "synth");
- ret = vtx_scan(vslq, vtx);
+ vtx_scan(vslq, vtx);
AN(vtx->flags & VTX_F_COMPLETE);
- return (ret);
}
/* Build transaction array, do the query and callback. Returns 0 or the
@@ -880,9 +878,8 @@ vslq_callback(const struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
AN(vslq);
CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
AN(vtx->flags & VTX_F_READY);
+ AN(func);
- if (func == NULL)
- return (0);
if (vslq->grouping == VSL_g_session &&
vtx->type != VSL_t_sess)
return (0);
@@ -1030,11 +1027,24 @@ VSLQ_New(struct VSL_data *vsl, struct VSL_cursor **cp,
*cp = NULL;
vslq->grouping = grouping;
vslq->query = query;
+
+ /* Setup normal mode */
VRB_INIT(&vslq->tree);
+ VTAILQ_INIT(&vslq->ready);
VTAILQ_INIT(&vslq->incomplete);
VTAILQ_INIT(&vslq->shmrefs);
VTAILQ_INIT(&vslq->cache);
+ /* Setup raw mode */
+ vslq->raw.c.magic = VSLC_RAW_MAGIC;
+ vslq->raw.c.cursor.priv_tbl = &vslc_raw_tbl;
+ vslq->raw.c.cursor.priv_data = &vslq->raw.c;
+ vslq->raw.trans.level = 0;
+ vslq->raw.trans.type = VSL_t_raw;
+ vslq->raw.trans.c = &vslq->raw.c.cursor;
+ vslq->raw.ptrans[0] = &vslq->raw.trans;
+ vslq->raw.ptrans[1] = NULL;
+
return (vslq);
}
@@ -1051,6 +1061,15 @@ VSLQ_Delete(struct VSLQ **pvslq)
(void)VSLQ_Flush(vslq, NULL, NULL);
AZ(vslq->n_incomplete);
+
+ while (!VTAILQ_EMPTY(&vslq->ready)) {
+ vtx = VTAILQ_FIRST(&vslq->ready);
+ CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
+ VTAILQ_REMOVE(&vslq->ready, vtx, list_vtx);
+ AN(vtx->flags & VTX_F_READY);
+ vtx_retire(vslq, &vtx);
+ }
+
VSL_DeleteCursor(vslq->c);
vslq->c = NULL;
@@ -1072,49 +1091,43 @@ VSLQ_Delete(struct VSLQ **pvslq)
/* Regard each log line as a single transaction, feed it through the query
and do the callback */
static int
-vslq_raw(const struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
+vslq_raw(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
{
- struct vslc_raw rawc;
- struct VSL_transaction trans;
- struct VSL_transaction *ptrans[2];
- struct VSL_cursor *c;
- int i;
+ int i = 1;
+ int r;
assert(vslq->grouping == VSL_g_raw);
- c = vslq->c;
- memset(&rawc, 0, sizeof rawc);
- rawc.magic = VSLC_RAW_MAGIC;
- rawc.cursor.priv_tbl = &vslc_raw_tbl;
- rawc.cursor.priv_data = &rawc;
- trans.level = 0;
- trans.type = VSL_t_raw;
- trans.c = &rawc.cursor;
- ptrans[0] = &trans;
- ptrans[1] = NULL;
+ assert(vslq->raw.offset <= vslq->raw.len);
+ do {
+ if (vslq->raw.offset == vslq->raw.len) {
+ i = VSL_Next(vslq->c);
+ if (i <= 0)
+ return (i);
+ AN(vslq->c->rec.ptr);
+ vslq->raw.start = vslq->c->rec;
+ vslq->raw.len = VSL_NEXT(vslq->raw.start.ptr) -
+ vslq->raw.start.ptr;
+ assert(vslq->raw.len > 0);
+ vslq->raw.offset = 0;
+ }
- while (1) {
- i = VSL_Next(c);
- if (i <= 0)
- break;
- AN(c->rec.ptr);
- if (func == NULL)
- continue;
- rawc.start = c->rec.ptr;
- rawc.len = VSL_NEXT(c->rec.ptr) - c->rec.ptr;
- rawc.next = rawc.start;
- rawc.cursor.rec.ptr = NULL;
- trans.vxid = VSL_ID(c->rec.ptr);
-
- /* Query check goes here */
- if (vslq->query != NULL && !vslq_runquery(vslq->query, ptrans))
- continue;
-
- /* Callback */
- i = (func)(vslq->vsl, ptrans, priv);
- if (i)
- break;
- }
+ vslq->raw.c.ptr = vslq->raw.start.ptr + vslq->raw.offset;
+ vslq->raw.c.cursor.rec.ptr = NULL;
+ vslq->raw.trans.vxid = VSL_ID(vslq->raw.c.ptr);
+ vslq->raw.offset += VSL_NEXT(vslq->raw.c.ptr) - vslq->raw.c.ptr;
+ } while (VSL_TAG(vslq->raw.c.ptr) == SLT__Batch);
+
+ if (func == NULL)
+ return (i);
+
+ if (vslq->query != NULL &&
+ !vslq_runquery(vslq->query, vslq->raw.ptrans))
+ return (i);
+
+ r = (func)(vslq->vsl, vslq->raw.ptrans, priv);
+ if (r)
+ return (r);
return (i);
}
@@ -1151,10 +1164,9 @@ vslq_shmref_check(struct VSLQ *vslq)
return (0);
}
-/* Process the input cursor, calling the callback function on matching
- transaction sets */
-int
-VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
+/* Process next input record */
+static int
+vslq_next(struct VSLQ *vslq)
{
struct VSL_cursor *c;
int i, batch;
@@ -1162,98 +1174,120 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
ssize_t len;
unsigned vxid;
struct vtx *vtx;
- double now;
- CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
+ c = vslq->c;
+ i = VSL_Next(c);
+ if (i != 1)
+ return (i);
- if (vslq->grouping == VSL_g_raw)
- return (vslq_raw(vslq, func, priv));
+ tag = VSL_TAG(c->rec.ptr);
+ if (tag == SLT__Batch) {
+ batch = 1;
+ vxid = VSL_BATCHID(c->rec.ptr);
+ len = VSL_END(c->rec.ptr, VSL_BATCHLEN(c->rec.ptr)) -
+ c->rec.ptr;
+ if (len == 0)
+ return (i);
+ tag = VSL_TAG(VSL_NEXT(c->rec.ptr));
+ } else {
+ batch = 0;
+ vxid = VSL_ID(c->rec.ptr);
+ len = VSL_NEXT(c->rec.ptr) - c->rec.ptr;
+ }
+ assert(len > 0);
+ if (vxid == 0)
+ /* Skip non-transactional records */
+ return (i);
- c = vslq->c;
- while (1) {
- /* Check shmref list */
- i = vslq_shmref_check(vslq);
- if (i)
- break;
+ vtx = vtx_lookup(vslq, vxid);
+ if (vtx == NULL && tag == SLT_Begin) {
+ vtx = vtx_add(vslq, vxid);
+ AN(vtx);
+ }
+ if (vtx != NULL) {
+ vtx_append(vslq, vtx, &c->rec, len);
+ vtx_scan(vslq, vtx);
+ }
+ if (batch)
+ AZ(vsl_skip(c, VSL_WORDS(VSL_BATCHLEN(c->rec.ptr))));
- i = VSL_Next(c);
- if (i != 1)
- break;
- tag = VSL_TAG(c->rec.ptr);
- if (tag == SLT__Batch) {
- batch = 1;
- vxid = VSL_BATCHID(c->rec.ptr);
- len = VSL_END(c->rec.ptr, VSL_BATCHLEN(c->rec.ptr)) -
- c->rec.ptr;
- if (len == 0)
- continue;
- tag = VSL_TAG(VSL_NEXT(c->rec.ptr));
- } else {
- batch = 0;
- vxid = VSL_ID(c->rec.ptr);
- len = VSL_NEXT(c->rec.ptr) - c->rec.ptr;
- }
+ return (i);
+}
- assert(len > 0);
- if (vxid == 0)
- /* Skip non-transactional records */
- continue;
+/* Test query and report any ready transactions */
+static int
+vslq_process_ready(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
+{
+ struct vtx *vtx;
+ int i = 0;
- vtx = vtx_lookup(vslq, vxid);
- if (vtx == NULL && tag == SLT_Begin) {
- vtx = vtx_add(vslq, vxid);
- AN(vtx);
- }
- if (vtx != NULL) {
- vtx_append(vslq, vtx, &c->rec, len);
- vtx = vtx_scan(vslq, vtx);
- }
- if (batch)
- AZ(vsl_skip(c, VSL_WORDS(VSL_BATCHLEN(c->rec.ptr))));
- if (vtx) {
- AN(vtx->flags & VTX_F_READY);
+ AN(vslq);
+
+ while (!VTAILQ_EMPTY(&vslq->ready)) {
+ vtx = VTAILQ_FIRST(&vslq->ready);
+ CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
+ VTAILQ_REMOVE(&vslq->ready, vtx, list_vtx);
+ AN(vtx->flags & VTX_F_READY);
+ if (func != NULL)
i = vslq_callback(vslq, vtx, func, priv);
- vtx_retire(vslq, &vtx);
- AZ(vtx);
- if (i)
- break;
- }
+ vtx_retire(vslq, &vtx);
+ AZ(vtx);
+ if (i)
+ return (i);
}
+
+ return (0);
+}
+
+/* Process the input cursor, calling the callback function on matching
+ transaction sets */
+int
+VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
+{
+ int i, r;
+ double now;
+ struct vtx *vtx;
+
+ CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
+
+ if (vslq->grouping == VSL_g_raw)
+ return (vslq_raw(vslq, func, priv));
+
+ /* Check shmref list and buffer if necessary */
+ r = vslq_shmref_check(vslq);
+ if (r)
+ return (r);
+
+ /* Process next cursor input */
+ i = vslq_next(vslq);
if (i)
return (i);
+ /* Check vtx timeout */
now = VTIM_mono();
- while ((vtx = VTAILQ_FIRST(&vslq->incomplete)) &&
- now - vtx->t_start > vslq->vsl->T_opt) {
- AZ(vtx->flags & VTX_F_COMPLETE);
- vtx = vtx_force(vslq, vtx, "incomplete - timeout");
- if (vtx) {
- AN(vtx->flags & VTX_F_READY);
- i = vslq_callback(vslq, vtx, func, priv);
- vtx_retire(vslq, &vtx);
- AZ(vtx);
- if (i)
- break;
- }
+ while (!VTAILQ_EMPTY(&vslq->incomplete)) {
+ vtx = VTAILQ_FIRST(&vslq->incomplete);
+ CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
+ if (now - vtx->t_start < vslq->vsl->T_opt)
+ break;
+ vtx_force(vslq, vtx, "timeout");
+ AN(vtx->flags & VTX_F_COMPLETE);
}
- if (i)
- return (i);
+ /* Check store limit */
while (vslq->n_incomplete > vslq->vsl->L_opt) {
vtx = VTAILQ_FIRST(&vslq->incomplete);
- AN(vtx);
- AZ(vtx->flags & VTX_F_COMPLETE);
- vtx = vtx_force(vslq, vtx, "incomplete - store overflow");
- if (vtx) {
- AN(vtx->flags & VTX_F_READY);
- i = vslq_callback(vslq, vtx, func, priv);
- vtx_retire(vslq, &vtx);
- AZ(vtx);
- if (i)
- break;
- }
+ CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
+ vtx_force(vslq, vtx, "store overflow");
+ AN(vtx->flags & VTX_F_COMPLETE);
}
+ /* Check ready list */
+ if (!VTAILQ_EMPTY(&vslq->ready))
+ r = vslq_process_ready(vslq, func, priv);
+ if (r)
+ return (r);
+
return (i);
}
@@ -1263,23 +1297,15 @@ int
VSLQ_Flush(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
{
struct vtx *vtx;
- int i = 0;
CHECK_OBJ_NOTNULL(vslq, VSLQ_MAGIC);
while (vslq->n_incomplete) {
vtx = VTAILQ_FIRST(&vslq->incomplete);
- AN(vtx);
+ CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
AZ(vtx->flags & VTX_F_COMPLETE);
- vtx = vtx_force(vslq, vtx, "incomplete - flushing");
- if (vtx) {
- AN(vtx->flags & VTX_F_READY);
- i = vslq_callback(vslq, vtx, func, priv);
- vtx_retire(vslq, &vtx);
- AZ(vtx);
- if (i)
- break;
- }
+ vtx_force(vslq, vtx, "flush");
}
- return (i);
+
+ return (vslq_process_ready(vslq, func, priv));
}
diff --git a/lib/libvarnishtools/vut.c b/lib/libvarnishtools/vut.c
index 7583c77..5e24af4 100644
--- a/lib/libvarnishtools/vut.c
+++ b/lib/libvarnishtools/vut.c
@@ -307,14 +307,16 @@ VUT_Main(VSLQ_dispatch_f *func, void *priv)
}
i = VSLQ_Dispatch(VUT.vslq, func, priv);
- if (i == 0) {
+ if (i == 1)
+ /* Call again */
+ continue;
+ else if (i == 0) {
/* Nothing to do but wait */
if (VUT.fo)
fflush(VUT.fo);
VTIM_sleep(0.01);
continue;
- }
- if (i == -1) {
+ } else if (i == -1) {
/* EOF */
break;
}
More information about the varnish-commit
mailing list