[master] cef9da1 Make Dispatch use VSL_Check to handle SHM ptrs.
Martin Blix Grydeland
martin at varnish-cache.org
Wed May 15 14:46:14 CEST 2013
commit cef9da177a5445d04306cbde5a8756734f8ab8cc
Author: Martin Blix Grydeland <martin at varnish-software.com>
Date: Sun May 12 23:55:39 2013 +0200
Make Dispatch use VSL_Check to handle SHM ptrs.
This allows Dispatch to run completely in SHM with no copying for most
cases, also when transactions come in multiple batches.
diff --git a/include/vapi/vsl_int.h b/include/vapi/vsl_int.h
index f65dc9f..8765b84 100644
--- a/include/vapi/vsl_int.h
+++ b/include/vapi/vsl_int.h
@@ -89,6 +89,8 @@ struct VSL_head {
#define VSL_BACKEND(ptr) (((ptr)[1]) & VSL_BACKENDMARKER)
#define VSL_DATA(ptr) ((char*)((ptr)+2))
#define VSL_CDATA(ptr) ((const char*)((ptr)+2))
+#define VSL_BATCHLEN(ptr) ((ptr)[1])
+#define VSL_BATCHID(ptr) (VSL_ID((ptr) + 2))
#define VSL_ENDMARKER (((uint32_t)SLT__Reserved << 24) | 0x454545) /* "EEE" */
#define VSL_WRAPMARKER (((uint32_t)SLT__Reserved << 24) | 0x575757) /* "WWW" */
diff --git a/lib/libvarnishapi/vsl_dispatch.c b/lib/libvarnishapi/vsl_dispatch.c
index 265e1e5..8c3bfe2 100644
--- a/lib/libvarnishapi/vsl_dispatch.c
+++ b/lib/libvarnishapi/vsl_dispatch.c
@@ -46,6 +46,9 @@
#define VTX_CACHE 10
#define VTX_BUFSIZE_MIN 64
+#define VTX_CHUNKS 3
+
+struct vtx;
enum vtx_type_e {
vtx_t_unknown,
@@ -62,6 +65,33 @@ enum vtx_link_e {
vtx_l_bereq,
};
+struct vslc_raw {
+ struct vslc c;
+ unsigned magic;
+#define VSLC_RAW_MAGIC 0x247EBD44
+
+ const uint32_t *start;
+ ssize_t len;
+ const uint32_t *next;
+};
+
+struct vtx_chunk {
+ struct VSLC_ptr start;
+ ssize_t len;
+ ssize_t offset;
+};
+
+struct vslc_vtx {
+ struct vslc c;
+ unsigned magic;
+#define VSLC_VTX_MAGIC 0x74C6523F
+
+ struct vtx *vtx;
+
+ unsigned chunk; /* Current chunk */
+ ssize_t offset; /* Offset of next record */
+};
+
struct vtx_key {
unsigned vxid;
VRB_ENTRY(vtx_key) entry;
@@ -74,12 +104,12 @@ struct vtx {
#define VTX_MAGIC 0xACC21D09
VTAILQ_ENTRY(vtx) list_child;
VTAILQ_ENTRY(vtx) list_incomplete;
+ VTAILQ_ENTRY(vtx) list_shm;
double t_start;
unsigned flags;
-#define VTX_F_SHM 0x1
-#define VTX_F_COMPLETE 0x2
-#define VTX_F_READY 0x4
+#define VTX_F_COMPLETE 0x1
+#define VTX_F_READY 0x2
enum vtx_type_e type;
@@ -89,33 +119,16 @@ struct vtx {
unsigned n_childready;
unsigned n_descend;
- const uint32_t *start;
+ struct vslc_vtx c;
ssize_t len;
- unsigned index;
+
+ struct vtx_chunk chunk[VTX_CHUNKS];
+ unsigned n_chunk;
uint32_t *buf;
ssize_t bufsize;
};
-struct vslc_raw {
- struct vslc c;
- unsigned magic;
-#define VSLC_RAW_MAGIC 0x247EBD44
-
- const uint32_t *start;
- ssize_t len;
- const uint32_t *next;
-};
-
-struct vslc_vtx {
- struct vslc c;
- unsigned magic;
-#define VSLC_VTX_MAGIC 0x74C6523F
-
- struct vtx *vtx;
- const uint32_t *next;
-};
-
struct VSLQ {
unsigned magic;
#define VSLQ_MAGIC 0x23A8BE97
@@ -131,6 +144,8 @@ struct VSLQ {
VTAILQ_HEAD(,vtx) incomplete;
unsigned n_incomplete;
+ VTAILQ_HEAD(,vtx) shmlist;
+
VTAILQ_HEAD(,vtx) cache;
unsigned n_cache;
};
@@ -215,17 +230,37 @@ static int
vslc_vtx_next(void *cursor)
{
struct vslc_vtx *c;
+ struct vtx_chunk *chunk;
CAST_OBJ_NOTNULL(c, cursor, VSLC_VTX_MAGIC);
- assert(c->next >= c->vtx->start);
- assert(c->next <= c->vtx->start + c->vtx->len);
- if (c->next < c->vtx->start + c->vtx->len) {
- c->c.c.rec.ptr = c->next;
- c->next = VSL_NEXT(c->next);
+ assert (c->offset <= c->vtx->len);
+ if (c->offset == c->vtx->len)
+ return (0);
+
+ if (c->vtx->n_chunk == 0) {
+ /* Buffer */
+ AN(c->vtx->buf);
+ assert(c->offset < c->vtx->bufsize);
+ c->c.c.rec.ptr = c->vtx->buf + c->offset;
+ c->offset += VSL_NEXT(c->c.c.rec.ptr) - c->c.c.rec.ptr;
return (1);
}
- return (0);
+
+ /* Shmptr chunks */
+ assert(c->chunk < c->vtx->n_chunk);
+ chunk = &c->vtx->chunk[c->chunk];
+ assert(c->offset >= chunk->offset);
+ assert(c->offset <= chunk->offset + chunk->len);
+ if (c->offset == chunk->offset + chunk->len) {
+ c->chunk++;
+ chunk = &c->vtx->chunk[c->chunk];
+ }
+ AN(chunk->start.ptr);
+ c->c.c.rec.ptr = chunk->start.ptr + c->offset - chunk->offset;
+ c->offset += VSL_NEXT(c->c.c.rec.ptr) - c->c.c.rec.ptr;
+
+ return (1);
}
static int
@@ -234,10 +269,8 @@ vslc_vtx_reset(void *cursor)
struct vslc_vtx *c;
CAST_OBJ_NOTNULL(c, cursor, VSLC_VTX_MAGIC);
-
- assert(c->next >= c->vtx->start);
- assert(c->next <= c->vtx->start + c->vtx->len);
- c->next = c->vtx->start;
+ c->chunk = 0;
+ c->offset = 0;
c->c.c.rec.ptr = NULL;
return (0);
@@ -251,23 +284,6 @@ static struct vslc_tbl vslc_vtx_tbl = {
.check = NULL,
};
-static void
-vslc_vtx_setup(struct vslc_vtx *c, struct vtx *vtx, unsigned level)
-{
- AN(c);
- AN(vtx);
-
- memset(c, 0, sizeof *c);
- c->c.c.vxid = vtx->key.vxid;
- c->c.c.level = level;
- c->c.magic = VSLC_MAGIC;
- c->c.tbl = &vslc_vtx_tbl;
-
- c->magic = VSLC_VTX_MAGIC;
- c->vtx = vtx;
- c->next = c->vtx->start;
-}
-
static struct vtx *
vtx_new(struct VSLQ *vslq)
{
@@ -282,6 +298,10 @@ vtx_new(struct VSLQ *vslq)
} else {
ALLOC_OBJ(vtx, VTX_MAGIC);
AN(vtx);
+ vtx->c.c.magic = VSLC_MAGIC;
+ vtx->c.c.tbl = &vslc_vtx_tbl;
+ vtx->c.magic = VSLC_VTX_MAGIC;
+ vtx->c.vtx = vtx;
}
vtx->key.vxid = 0;
@@ -293,9 +313,10 @@ vtx_new(struct VSLQ *vslq)
vtx->n_child = 0;
vtx->n_childready = 0;
vtx->n_descend = 0;
- vtx->start = vtx->buf;
+ (void)vslc_vtx_reset(&vtx->c);
vtx->len = 0;
- vtx->index = 0;
+ memset(&vtx->chunk, 0, sizeof vtx->chunk);
+ vtx->n_chunk = 0;
VTAILQ_INSERT_TAIL(&vslq->incomplete, vtx, list_incomplete);
vslq->n_incomplete++;
@@ -349,6 +370,9 @@ vtx_retire(struct VSLQ *vslq, struct vtx **pvtx)
AZ(vtx->n_descend);
AN(VRB_REMOVE(vtx_tree, &vslq->tree, &vtx->key));
+ if (vtx->n_chunk)
+ VTAILQ_REMOVE(&vslq->shmlist, vtx, list_shm);
+
if (vslq->n_cache < VTX_CACHE) {
VTAILQ_INSERT_HEAD(&vslq->cache, vtx, list_child);
vslq->n_cache++;
@@ -375,52 +399,81 @@ vtx_lori(struct VSLQ *vslq, unsigned vxid)
vtx = vtx_new(vslq);
AN(vtx);
vtx->key.vxid = vxid;
+ vtx->c.c.c.vxid = vxid;
AZ(VRB_INSERT(vtx_tree, &vslq->tree, &vtx->key));
return (vtx);
}
static void
-vtx_append(struct vtx *vtx, const uint32_t *ptr, ssize_t len, int shmptr_ok)
+vtx_set_bufsize(struct vtx *vtx, ssize_t len)
{
- ssize_t bufsize;
- const uint32_t *ptr2;
- ssize_t len2;
AN(vtx);
+ assert(len >= 0);
+ if (vtx->bufsize >= len)
+ return;
+ if (vtx->bufsize == 0)
+ vtx->bufsize = VTX_BUFSIZE_MIN;
+ while (vtx->bufsize < len)
+ vtx->bufsize *= 2;
+ vtx->buf = realloc(vtx->buf, sizeof (uint32_t) * vtx->bufsize);
+}
- if (vtx->flags & VTX_F_SHM) {
- assert(vtx->start != vtx->buf);
- ptr2 = vtx->start;
- vtx->start = vtx->buf;
- len2 = vtx->len;
- vtx->len = 0;
- vtx->flags &= ~VTX_F_SHM;
- vtx_append(vtx, ptr2, len2, 0);
- }
+static void
+vtx_buffer(struct VSLQ *vslq, struct vtx *vtx)
+{
+ int i;
+
+ AN(vtx->n_chunk);
+ AN(vtx->len);
+
+ vtx_set_bufsize(vtx, vtx->len);
+ AN(vtx->buf);
+ assert(vtx->bufsize >= vtx->len);
+
+ for (i = 0; i < vtx->n_chunk; i++)
+ memcpy(vtx->buf + vtx->chunk[i].offset, vtx->chunk[i].start.ptr,
+ sizeof (uint32_t) * vtx->chunk[i].len);
+
+ memset(&vtx->chunk, 0, sizeof vtx->chunk);
+ VTAILQ_REMOVE(&vslq->shmlist, vtx, list_shm);
+ vtx->n_chunk = 0;
+}
+static void
+vtx_append(struct VSLQ *vslq, struct vtx *vtx, const struct VSLC_ptr *start,
+ ssize_t len, int copy)
+{
+
+ AN(vtx);
if (len == 0)
return;
- AN(ptr);
-
- if (shmptr_ok && vtx->len == 0) {
- vtx->start = ptr;
- vtx->len = len;
- vtx->flags |= VTX_F_SHM;
+ AN(start);
+
+ if (vtx->len > 0 && vtx->n_chunk == 0)
+ /* Can't mix buffer and shmptr */
+ copy = 1;
+
+ if (!copy && vtx->n_chunk < VTX_CHUNKS) {
+ /* Add shmptr chunk */
+ AZ(vtx->chunk[vtx->n_chunk].len);
+ vtx->chunk[vtx->n_chunk].start = *start;
+ vtx->chunk[vtx->n_chunk].len = len;
+ vtx->chunk[vtx->n_chunk].offset = vtx->len;
+ vtx->len += len;
+ if (vtx->n_chunk == 0)
+ VTAILQ_INSERT_TAIL(&vslq->shmlist, vtx, list_shm);
+ vtx->n_chunk++;
return;
}
- bufsize = vtx->bufsize;
- if (bufsize == 0)
- bufsize = VTX_BUFSIZE_MIN;
- while (vtx->len + len > bufsize)
- bufsize *= 2;
- if (bufsize != vtx->bufsize) {
- vtx->buf = realloc(vtx->buf, 4 * bufsize);
- AN(vtx->buf);
- vtx->bufsize = bufsize;
- vtx->start = vtx->buf;
- }
- memcpy(&vtx->buf[vtx->len], ptr, 4 * len);
+ /* Append to buffer */
+ vtx_set_bufsize(vtx, vtx->len + len);
+ if (vtx->n_chunk)
+ vtx_buffer(vslq, vtx);
+ AZ(vtx->n_chunk);
+ AN(vtx->buf);
+ memcpy(vtx->buf + vtx->len, start->ptr, sizeof (uint32_t) * len);
vtx->len += len;
}
@@ -454,13 +507,6 @@ vtx_check_ready(struct VSLQ *vslq, struct vtx *vtx)
/* Top level vtx ready */
return (ready);
- if (vtx->flags & VTX_F_SHM) {
- /* Not ready, append zero to make sure it's not a shm
- reference */
- vtx_append(vtx, NULL, 0, 0);
- AZ(vtx->flags & VTX_F_SHM);
- }
-
return (NULL);
}
@@ -518,7 +564,7 @@ static int
vtx_scan_begintag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
{
int i;
- enum vtx_type_e p_type;
+ enum vtx_type_e type;
unsigned p_vxid;
struct vtx *p_vtx;
@@ -527,25 +573,31 @@ vtx_scan_begintag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
if (vtx->flags & VTX_F_READY)
return (vtx_diag_tag(vtx, ptr, "link too late"));
- i = vtx_parsetag_bl(VSL_CDATA(ptr), VSL_LEN(ptr), &p_type, &p_vxid);
+ i = vtx_parsetag_bl(VSL_CDATA(ptr), VSL_LEN(ptr), &type, &p_vxid);
if (i < 1)
return (vtx_diag_tag(vtx, ptr, "parse error"));
/* Check/set vtx type */
- assert(p_type != vtx_t_unknown);
- if (vtx->type != vtx_t_unknown && vtx->type != p_type)
+ assert(type != vtx_t_unknown);
+ if (vtx->type != vtx_t_unknown && vtx->type != type)
return (vtx_diag_tag(vtx, ptr, "type mismatch"));
- vtx->type = p_type;
+ vtx->type = type;
if (i == 1 || p_vxid == 0)
return (0);
+ if (vslq->grouping == VSL_g_vxid)
+ return (0); /* No links */
+ if (vslq->grouping == VSL_g_request && vtx->type == vtx_t_req)
+ return (0); /* No links */
+
/* Lookup and check parent vtx */
p_vtx = vtx_lori(vslq, p_vxid);
AN(p_vtx);
if (vtx->parent == p_vtx)
/* Link already exists */
return (0);
+
if (vtx->parent != NULL)
return (vtx_diag_tag(vtx, ptr, "duplicate link"));
if (p_vtx->flags & VTX_F_READY)
@@ -574,6 +626,11 @@ vtx_scan_linktag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
return (vtx_diag_tag(vtx, ptr, "parse error"));
assert(i == 2);
+ if (vslq->grouping == VSL_g_vxid)
+ return (0); /* No links */
+ if (vslq->grouping == VSL_g_request && vtx->type == vtx_t_sess)
+ return (0); /* No links */
+
/* Lookup and check child vtx */
c_vtx = vtx_lori(vslq, c_vxid);
AN(c_vtx);
@@ -584,7 +641,6 @@ vtx_scan_linktag(struct VSLQ *vslq, struct vtx *vtx, const uint32_t *ptr)
return (vtx_diag_tag(vtx, ptr, "duplicate link"));
if (c_vtx->flags & VTX_F_READY)
return (vtx_diag_tag(vtx, ptr, "link too late"));
- assert(c_type != vtx_t_unknown);
if (c_vtx->type != vtx_t_unknown && c_vtx->type != c_type)
return (vtx_diag_tag(vtx, ptr, "type mismatch"));
c_vtx->type = c_type;
@@ -599,16 +655,17 @@ vtx_scan(struct VSLQ *vslq, struct vtx *vtx)
{
const uint32_t *ptr;
enum VSL_tag_e tag;
- int complete;
+ struct vtx *ret = NULL;
- complete = (vtx->flags & VTX_F_COMPLETE ? 1 : 0);
- ptr = vtx->start + vtx->index;
- assert(ptr <= vtx->start + vtx->len);
- for (; ptr < vtx->start + vtx->len; ptr = VSL_NEXT(ptr)) {
+ while (vslc_vtx_next(&vtx->c) == 1) {
+ ptr = vtx->c.c.c.rec.ptr;
tag = VSL_TAG(ptr);
- if (complete) {
- vtx_diag(vtx, "late log rec");
+ if (tag == SLT__Batch)
+ continue;
+
+ if (vtx->flags & VTX_F_COMPLETE) {
+ vtx_diag_tag(vtx, ptr, "late log rec");
continue;
}
@@ -625,31 +682,21 @@ vtx_scan(struct VSLQ *vslq, struct vtx *vtx)
break;
case SLT_End:
- complete = 1;
+ AZ(vtx->flags & VTX_F_COMPLETE);
+ AZ(ret);
+ VTAILQ_REMOVE(&vslq->incomplete, vtx, list_incomplete);
+ vtx->flags |= VTX_F_COMPLETE;
+ AN(vslq->n_incomplete);
+ vslq->n_incomplete--;
+ ret = vtx_check_ready(vslq, vtx);
break;
default:
break;
}
}
- vtx->index = ptr - vtx->start;
- assert(vtx->index <= vtx->len);
-
- if (!complete && vtx->flags & VTX_F_SHM) {
- /* Append zero to make sure it's not a shm reference */
- vtx_append(vtx, NULL, 0, 0);
- AZ(vtx->flags & VTX_F_SHM);
- }
- if (complete) {
- VTAILQ_REMOVE(&vslq->incomplete, vtx, list_incomplete);
- vtx->flags |= VTX_F_COMPLETE;
- AN(vslq->n_incomplete);
- vslq->n_incomplete--;
- return (vtx_check_ready(vslq, vtx));
- }
-
- return (NULL);
+ return (ret);
}
static struct vtx *
@@ -672,9 +719,9 @@ vslq_callback(struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
void *priv)
{
unsigned n = vtx->n_descend + 1;
+ struct vslc_vtx *cp[n];
+ struct VSL_cursor *CP[n + 1];
unsigned i, j;
- struct vslc_vtx c[n];
- struct VSL_cursor *cp[n + 1];
AN(vslq);
CHECK_OBJ_NOTNULL(vtx, VTX_MAGIC);
@@ -689,13 +736,17 @@ vslq_callback(struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
return (0);
i = j = 0;
- vslc_vtx_setup(&c[i], vtx, 0);
+ cp[i] = &vtx->c;
+ vslc_vtx_reset(cp[i]);
+ cp[i]->c.c.level = 0;
i++;
while (j < i) {
- vtx = VTAILQ_FIRST(&c[j].vtx->child);
+ vtx = VTAILQ_FIRST(&cp[j]->vtx->child);
while (vtx) {
assert(i < n);
- vslc_vtx_setup(&c[i], vtx, c[j].c.c.level + 1);
+ cp[i] = &vtx->c;
+ vslc_vtx_reset(cp[i]);
+ cp[i]->c.c.level = cp[j]->c.c.level + 1;
i++;
vtx = VTAILQ_NEXT(vtx, list_child);
}
@@ -705,12 +756,12 @@ vslq_callback(struct VSLQ *vslq, struct vtx *vtx, VSLQ_dispatch_f *func,
/* Reverse order */
for (i = 0; i < n; i++)
- cp[i] = &c[n - i - 1].c.c;
- cp[i] = NULL;
+ CP[i] = &cp[n - i - 1]->c.c;
+ CP[i] = NULL;
/* Query test goes here */
- if (vslq->query == NULL ? 1 : vslq_runquery(vslq->query, cp))
- return ((func)(vslq->vsl, cp, priv));
+ if (vslq->query == NULL ? 1 : vslq_runquery(vslq->query, CP))
+ return ((func)(vslq->vsl, CP, priv));
else
return (0);
}
@@ -744,6 +795,7 @@ VSLQ_New(struct VSL_data *vsl, struct VSL_cursor **cp,
vslq->query = query;
VRB_INIT(&vslq->tree);
VTAILQ_INIT(&vslq->incomplete);
+ VTAILQ_INIT(&vslq->shmlist);
VTAILQ_INIT(&vslq->cache);
return (vslq);
@@ -829,7 +881,6 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
struct VSL_cursor *c;
int i;
enum VSL_tag_e tag;
- const uint32_t *ptr;
ssize_t len;
unsigned vxid;
struct vtx *vtx;
@@ -842,24 +893,38 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
c = vslq->c;
while (1) {
+ /* Check shmlist and buffer on warning */
+ while ((vtx = VTAILQ_FIRST(&vslq->shmlist))) {
+ AN(vtx->n_chunk);
+ i = VSL_Check(c, &vtx->chunk[0].start);
+ if (i == 2)
+ break;
+ else if (i == 1)
+ vtx_buffer(vslq, vtx);
+ else
+ /* Too late */
+ return (-3);
+ }
+
i = VSL_Next(c);
if (i != 1)
- break;
+ return (i);
tag = VSL_TAG(c->rec.ptr);
if (tag == SLT__Batch) {
- ptr = VSL_NEXT(c->rec.ptr);
- len = VSL_WORDS(c->rec.ptr[1]);
- AZ(vsl_skip(c, len));
+ vxid = VSL_BATCHID(c->rec.ptr);
+ len = VSL_END(c->rec.ptr, VSL_BATCHLEN(c->rec.ptr)) -
+ c->rec.ptr;
} else {
- ptr = c->rec.ptr;
- len = VSL_NEXT(ptr) - ptr;
+ vxid = VSL_ID(c->rec.ptr);
+ len = VSL_NEXT(c->rec.ptr) - c->rec.ptr;
}
- vxid = VSL_ID(ptr);
if (vxid == 0)
continue;
vtx = vtx_lori(vslq, vxid);
AN(vtx);
- vtx_append(vtx, ptr, len, c->shmptr_ok);
+ vtx_append(vslq, vtx, &c->rec, len, VSL_Check(c, &c->rec) != 2);
+ if (tag == SLT__Batch)
+ AZ(vsl_skip(c, VSL_WORDS(VSL_BATCHLEN(c->rec.ptr))));
vtx = vtx_scan(vslq, vtx);
if (vtx) {
AN(vtx->flags & VTX_F_READY);
@@ -867,7 +932,7 @@ VSLQ_Dispatch(struct VSLQ *vslq, VSLQ_dispatch_f *func, void *priv)
vtx_retire(vslq, &vtx);
AZ(vtx);
if (i)
- break;
+ return (i);
}
}
if (i)
More information about the varnish-commit
mailing list