[master] 3b3173449 cache_deliver_proc: Add VDPIO for VAI-based delivery processing (filters)
Nils Goroll
nils.goroll at uplex.de
Fri Jul 4 17:04:04 UTC 2025
commit 3b3173449b2b9097eb8f5d3b09cfca3306e47096
Author: Nils Goroll <nils.goroll at uplex.de>
Date: Wed Jan 8 09:43:31 2025 +0100
cache_deliver_proc: Add VDPIO for VAI-based delivery processing (filters)
Context: Please read the message of the previous commit "cache_obj: Add an
asynchronous iteration API" before continuing here.
Why?
The VAI interface laid the groundwork for asynchronous iteration, but did not
yet address filters.
The existing VDP model is a "push" design: VDP_ObjIterate() calls VDP_bytes()
with data from storage. VDP_bytes() calls the first VDP, which does its
processing, calls VDP_bytes() for the next VDP, and so on, until the last VDP
sends data out somewhere.
This is a good model for our existing "synchronous I/O from threads" design, but
it is, again, fundamentally incompatible with async I/O (see Context): Any
filter in the chain can assume that VDP_bytes(..., VDP_FLUSH) will, when it
returns, be done with the buffer, such that the filter may issue the next
VDP_bytes(), potentially on the same buffer.
For async I/O, we need a model where buffers are handed out and explicitly
returned once the consumer is done with them.
Discarded prior work
Before ending up at the model in this patch, a "push" design had been attempted
where buffers would be handed from filter to filter, with the option for each
filter to say "first I need more input" or "I have more output after this". This
model turned out to be overly complicated, so it was discarded.
How?
The model in this patch takes VDP from "push" to "pull": The top level delivery
code asks the filter chain for buffers like it would ask the ObjVAI API for
data. An example is coming up in patch "vmod_debug: Switch transport_vai to
VDPIO Upgrade". It basically looks like this:
    do {
        nbufs = vdpio_pull(req->vdc, NULL, scarab);
        send_data(scarab);
    } while ((scarab->flags & VSCARAB_F_END) == 0);
Similarly to VDP_bytes(), vdpio_pull() calls into the VDP layer, but this time
in the other direction, from last VDP to first to storage. Each VDP now returns
buffers it has ready, and when it needs more data, it calls vdpio_pull() to get
more input buffers from the next layer and ultimately from the storage engine.
Unlike VDP_bytes(), vdpio_pull() ends in a tail call to the next layer and can
be inlined, which shifts some responsibility to the filters; more on that in
the next section.
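To make the call direction concrete, here is a toy model of the pull chain. It is only a sketch: toy_scarab, toy_filter, toy_ctx, toy_pull() and the upper-casing filter are hypothetical stand-ins, not the Varnish types. Storage is an array of strings and data is copied rather than leased.

```c
#include <assert.h>
#include <ctype.h>
#include <stdio.h>
#include <string.h>

#define TOY_F_END 1u	/* stand-in for VSCARAB_F_END */
#define TOY_CAP 4	/* extents per toy scarab */

struct toy_scarab {	/* hypothetical stand-in for struct vscarab */
	unsigned flags;
	int used;
	char buf[TOY_CAP][16];
};

struct toy_ctx;
struct toy_filter;
typedef int toy_lease_f(struct toy_ctx *, struct toy_filter *,
    struct toy_scarab *);

struct toy_filter {
	toy_lease_f *lease;
	struct toy_filter *prev;	/* towards storage; NULL: storage is next */
};

struct toy_ctx {
	struct toy_filter *last;	/* filter closest to delivery */
	const char **chunks;		/* toy storage */
	size_t n, pos;
};

/* storage: one chunk per call, end flag once everything is handed out */
static int
toy_storage_lease(struct toy_ctx *ctx, struct toy_scarab *sc)
{
	int r = 0;

	if (ctx->pos < ctx->n && sc->used < TOY_CAP) {
		snprintf(sc->buf[sc->used++], sizeof sc->buf[0], "%s",
		    ctx->chunks[ctx->pos++]);
		r = 1;
	}
	if (ctx->pos == ctx->n)
		sc->flags |= TOY_F_END;
	return (r);
}

/* f == NULL: ask the last filter; otherwise ask the layer before f */
static int
toy_pull(struct toy_ctx *ctx, struct toy_filter *f, struct toy_scarab *sc)
{
	f = (f == NULL) ? ctx->last : f->prev;
	if (f != NULL)
		return (f->lease(ctx, f, sc));
	return (toy_storage_lease(ctx, sc));
}

/* example filter: pull input first, then upper-case what was added */
static int
toy_upper_lease(struct toy_ctx *ctx, struct toy_filter *self,
    struct toy_scarab *sc)
{
	int i, first = sc->used, r;
	char *p;

	r = toy_pull(ctx, self, sc);
	for (i = first; i < sc->used; i++)
		for (p = sc->buf[i]; *p != '\0'; p++)
			*p = (char)toupper((unsigned char)*p);
	return (r);
}

/* top level: same shape as the loop above, collecting into out */
static size_t
toy_deliver(struct toy_ctx *ctx, char *out, size_t outlen)
{
	struct toy_scarab sc;
	int i, n;

	assert(outlen > 0);
	out[0] = '\0';
	do {
		memset(&sc, 0, sizeof sc);
		n = toy_pull(ctx, NULL, &sc);
		for (i = 0; n > 0 && i < sc.used; i++)
			strncat(out, sc.buf[i], outlen - strlen(out) - 1);
	} while (n >= 0 && (sc.flags & TOY_F_END) == 0);
	return (strlen(out));
}
```

With a chain consisting of the single upper-casing filter, toy_deliver() calls the filter, the filter calls storage, and the data flows back up the same call stack. The real filters would not modify leased storage in place; the toy only gets away with it because it copies.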
API Usage
The VDP filter API is similar to the existing API in that it consists of an
initializer, a finalizer and a "bytes" function, which is now called "lease" to
match the lease concept introduced with VAI. The lease function is called from
vdpio_pull(). It is responsible for pulling data from the previous layer via
vdpio_pull(), processing it and putting buffers into a provided output vscarab.
The lease function return values are the same as for ObjVAIlease(): negative for
"call again later" and errors, otherwise the number of extents added.
The lease function SHOULD signal end-of-data by setting the vscarab flag
VSCARAB_F_END in the scarab which contains the last chunk of data. If it cannot
do this, it MUST set the vscarab flag VSCARAB_F_END for all subsequent calls
(for which 0 is returned).
The lease function MUST accept a partially filled vscarab. If it cannot add
anything to it, because the minimum capacity is not available, it MUST return
zero.
The lease function is now responsible for maintaining the .calls and .bytes_in
members of its struct vdp_entry.
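The rules above can be sketched with a toy lease function. This is an illustration under assumptions, not the real interface: toy_scarab, toy_entry and toy_lease() are hypothetical, and the filter is assumed to emit a one-byte static marker plus up to four bytes of input per call, so its minimum capacity is two extents.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

#define TOY_F_END 1u	/* stand-in for VSCARAB_F_END */
#define TOY_MINCAP 2	/* marker + data extent per call */
#define TOY_PIECE 4	/* max input bytes consumed per call */

struct toy_iov {
	const char *base;
	size_t len;
};

struct toy_scarab {	/* hypothetical stand-in for struct vscarab */
	unsigned flags;
	unsigned capacity;
	unsigned used;
	struct toy_iov s[8];
};

struct toy_entry {	/* hypothetical stand-in for struct vdp_entry */
	unsigned long calls;
	unsigned long bytes_in;
	const char *in;	/* unread input (toy replacement for pulling) */
	size_t left;
};

static int
toy_lease(struct toy_entry *e, struct toy_scarab *sc)
{
	size_t l;

	if (e->in == NULL)
		return (-EINVAL);	/* negative: error (or "call again later") */

	e->calls++;			/* accounting is the filter's job now */

	if (e->left == 0) {
		/* past the end: zero extents, flag set on every such call */
		sc->flags |= TOY_F_END;
		return (0);
	}

	/* partially filled scarab without room for our minimum: add nothing */
	if (sc->capacity - sc->used < TOY_MINCAP)
		return (0);

	l = e->left < TOY_PIECE ? e->left : TOY_PIECE;

	sc->s[sc->used].base = "|";	/* static data, valid for all of delivery */
	sc->s[sc->used++].len = 1;
	sc->s[sc->used].base = e->in;
	sc->s[sc->used++].len = l;

	e->in += l;
	e->left -= l;
	e->bytes_in += l;

	if (e->left == 0)
		sc->flags |= TOY_F_END;	/* preferred: flag the last chunk itself */
	return (2);			/* number of extents added */
}
```

A caller that sees a return value of 0 without the end flag would send out what the scarab already holds and call again with free capacity.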
Any data which the VDP creates needs to either be put into buffers allocated from
storage via ObjVAIbuffer(), or be guaranteed to remain valid until the end of
delivery (like static and workspace pointers) and carry the lease
VAI_LEASE_NORET. Any buffers which the VDP receives from a previous layer and
does not emit in the output vscarab need to be returned with ObjVAIreturn(). To
batch these returns, a VSCARET is kept in the VDP context and managed by these
helpers:
- vdpio_return_lease() to return a single lease to the batch
- vdpio_return_vscarab() to return a full vscarab to the batch
- vdpio_consolidate_vscarab() to return all leases with a zero size, where the
zero size marks them as consumed. This is intended to facilitate the handling
of input vscarabs.
Naturally, this API is likely to still evolve.
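The batching idea behind the three helpers can be sketched as follows. All names here (toy_scaret, toy_flush(), toy_return_lease(), ...) are hypothetical, the batch capacity of two is arbitrary, and toy_flush() merely counts where the real code would call ObjVAIreturn(). The compaction loop is a simplified take on consolidation, not a copy of the helper in this patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_RET_CAP 2	/* arbitrary batch size for the sketch */

struct toy_scaret {	/* hypothetical stand-in for struct vscaret */
	unsigned used;
	uint64_t lease[TOY_RET_CAP];
	unsigned flushes;	/* how often the batch went back to storage */
	unsigned returned;	/* total leases handed back to storage */
};

struct toy_viov {
	uint64_t lease;
	size_t len;
};

struct toy_scarab {	/* hypothetical stand-in for struct vscarab */
	unsigned used;
	struct toy_viov s[8];
};

/* stand-in for ObjVAIreturn(): hand the whole batch back and empty it */
static void
toy_flush(struct toy_scaret *r)
{
	r->returned += r->used;
	r->used = 0;
	r->flushes++;
}

/* queue one lease, flushing the batch first if it is full */
static void
toy_return_lease(struct toy_scaret *r, uint64_t lease)
{
	assert(r->used <= TOY_RET_CAP);
	if (r->used == TOY_RET_CAP)
		toy_flush(r);
	r->lease[r->used++] = lease;
}

/* queue every lease of the scarab and leave the scarab empty */
static void
toy_return_scarab(struct toy_scaret *r, struct toy_scarab *sc)
{
	unsigned i;

	for (i = 0; i < sc->used; i++)
		toy_return_lease(r, sc->s[i].lease);
	sc->used = 0;
}

/* queue consumed entries (len == 0), move the rest to the front */
static void
toy_consolidate(struct toy_scaret *r, struct toy_scarab *sc)
{
	unsigned i, keep = 0;

	for (i = 0; i < sc->used; i++) {
		if (sc->s[i].len == 0)
			toy_return_lease(r, sc->s[i].lease);
		else
			sc->s[keep++] = sc->s[i];
	}
	sc->used = keep;
}
```

A filter working through an input scarab would set the length of each fully processed extent to zero and call the consolidate helper before pulling more input, so that consumed leases go back in batches rather than one storage call per lease.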
VDPIO management & transitional interface
The basic model for VDPIO is kept from VDP: There is VDPIO_Push() to match
VDP_Push() and VDPIO_Close() to match VDP_Close(). Yet, for the time being, we
need to have VDP and VDPIO co-exist: Not all filters will be ready for VDPIO and
there will be bugs, so we will want the option to go back to the old interface.
This is what VDPIO_Upgrade() is for: It works on an already initialized VDP
filter list and returns whether it can be upgraded to VDPIO. To do so, it calls
the vdpio_upgrade function (io_upgrade in struct vdp) of each VDP. If a
vdpio_upgrade function is missing for any filter, the whole upgrade fails and
the caller is expected to fall back to traditional VDP.
VDPIO_Push() can be used to push additional VDPIO-enabled VDPs after a
successful upgrade, or if only VDPIO-enabled VDPs are used.
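The all-or-nothing upgrade and the capacity hand-over between filters can be modelled roughly like this. The sketch is self-contained and uses hypothetical names throughout (toy_vdp, toy_upgrade(), the two example upgrade functions); the real upgrade functions also receive the VRT context, the vdp_ctx and the filter's priv pointer.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

typedef int toy_upgrade_f(int capacity);

struct toy_vdp {	/* hypothetical stand-in for struct vdp */
	const char *name;
	toy_upgrade_f *io_upgrade;	/* NULL: filter only speaks classic VDP */
};

/* example: emits exactly what it receives, same capacity requirement */
static int
toy_passthrough(int capacity)
{
	return (capacity);
}

/* example: needs one extra io vector, e.g. for a framing header */
static int
toy_framing(int capacity)
{
	return (capacity + 1);
}

/*
 * returns:
 * > 0		capacity the delivery code has to provide
 * -ENOTSUP	at least one filter has no upgrade function
 * < 0		error from an upgrade function
 */
static int
toy_upgrade(const struct toy_vdp *chain, size_t n)
{
	size_t i;
	int cap = 1, r;

	/* all or nothing: check before touching any filter */
	for (i = 0; i < n; i++)
		if (chain[i].io_upgrade == NULL)
			return (-ENOTSUP);

	/* front (storage) to back (delivery), passing the requirement on */
	for (i = 0; i < n; i++) {
		r = chain[i].io_upgrade(cap);
		if (r < 0)
			return (r);
		assert(r != 0);
		cap = r;
	}
	return (cap);
}
```

A transport would treat -ENOTSUP as the signal to keep delivering through the classic VDP path, and a positive return value as the vscarab capacity it needs to provide for the pull loop.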
diff --git a/bin/varnishd/cache/cache_deliver_proc.c b/bin/varnishd/cache/cache_deliver_proc.c
index 83ee48e61..7b1fd0ab9 100644
--- a/bin/varnishd/cache/cache_deliver_proc.c
+++ b/bin/varnishd/cache/cache_deliver_proc.c
@@ -244,6 +244,139 @@ VDP_Close(struct vdp_ctx *vdc, struct objcore *oc, struct boc *boc)
/*--------------------------------------------------------------------*/
+/*
+ * Push a VDPIO vdp. This can only be used with only vdpio-enabled VDPs or
+ * after a successful upgrade
+ */
+int
+VDPIO_Push(VRT_CTX, struct vdp_ctx *vdc, struct ws *ws, const struct vdp *vdp,
+ void *priv)
+{
+ struct vdp_entry *vdpe;
+ int r;
+
+ CHECK_OBJ_NOTNULL(ctx, VRT_CTX_MAGIC);
+ CHECK_OBJ_NOTNULL(vdc, VDP_CTX_MAGIC);
+ CHECK_OBJ_ORNULL(vdc->oc, OBJCORE_MAGIC);
+ CHECK_OBJ_NOTNULL(vdc->hp, HTTP_MAGIC);
+ AN(vdc->clen);
+ assert(*vdc->clen >= -1);
+ AN(ws);
+ AN(vdp);
+ AN(vdp->name);
+
+ if (vdc->retval < 0)
+ return (vdc->retval);
+
+ AN(vdp->io_init);
+
+ // the first VDP (which leases from storage) only gets the minimum
+ // capacity requirement of 1
+ if (vdc->retval == 0) {
+ assert(VTAILQ_EMPTY(&vdc->vdp));
+ vdc->retval = 1;
+ }
+
+ if (DO_DEBUG(DBG_PROCESSORS))
+ VSLb(vdc->vsl, SLT_Debug, "VDPIO_push(%s)", vdp->name);
+
+ vdpe = WS_Alloc(ws, sizeof *vdpe);
+ if (vdpe == NULL) {
+ vdc->retval = -ENOMEM;
+ return (vdc->retval);
+ }
+ INIT_OBJ(vdpe, VDP_ENTRY_MAGIC);
+ vdpe->vdp = vdp;
+ vdpe->priv = priv;
+ VTAILQ_INSERT_TAIL(&vdc->vdp, vdpe, list);
+ vdc->nxt = VTAILQ_FIRST(&vdc->vdp);
+
+ assert(vdc->retval > 0);
+ if (vdpe->vdp->io_init != NULL) {
+ r = vdpe->vdp->io_init(ctx, vdc, &vdpe->priv, vdc->retval);
+ if (r <= 0) {
+ VTAILQ_REMOVE(&vdc->vdp, vdpe, list);
+ vdc->nxt = VTAILQ_FIRST(&vdc->vdp);
+ }
+ else
+ AN(vdp->io_lease);
+ if (r != 0)
+ vdc->retval = r;
+ }
+ vdc->oc = NULL;
+ return (vdc->retval);
+}
+
+/*
+ * upgrade an already initialized VDP filter chain to VDPIO, if possible
+ * returns:
+ * > 0 cap
+ * -ENOTSUP io_upgrade missing for at least one filter
+ * vdc->retval if < 0
+ */
+int
+VDPIO_Upgrade(VRT_CTX, struct vdp_ctx *vdc)
+{
+ struct vdp_entry *vdpe;
+ int cap, r;
+
+ CHECK_OBJ_NOTNULL(ctx, VRT_CTX_MAGIC);
+ CHECK_OBJ_NOTNULL(vdc, VDP_CTX_MAGIC);
+
+ VTAILQ_FOREACH(vdpe, &vdc->vdp, list)
+ if (vdpe->vdp->io_upgrade == NULL)
+ return (-ENOTSUP);
+
+ if (vdc->retval < 0)
+ return (vdc->retval);
+
+ // minimum capacity requirement for the first filter (after storage)
+ r = cap = 1;
+ VTAILQ_FOREACH(vdpe, &vdc->vdp, list) {
+ r = vdpe->vdp->io_upgrade(ctx, vdc, &vdpe->priv, cap);
+ if (DO_DEBUG(DBG_PROCESSORS)) {
+ VSLb(vdc->vsl, SLT_Debug, "VDPIO_Upgrade "
+ "%d = %s(cap = %d)",
+ r, vdpe->vdp->name, cap);
+ }
+ if (r < 0)
+ return ((vdc->retval = r));
+ // XXX remove if filter does not want to be pushed?
+ assert(r != 0);
+ cap = r;
+ }
+ return ((vdc->retval = r));
+}
+
+uint64_t
+VDPIO_Close(struct vdp_ctx *vdc, struct objcore *oc, struct boc *boc)
+{
+ struct vdp_entry *vdpe;
+ uint64_t rv = 0;
+
+ CHECK_OBJ_NOTNULL(vdc, VDP_CTX_MAGIC);
+ CHECK_OBJ_NOTNULL(vdc->wrk, WORKER_MAGIC);
+ CHECK_OBJ_ORNULL(oc, OBJCORE_MAGIC);
+ CHECK_OBJ_ORNULL(boc, BOC_MAGIC);
+
+ while ((vdpe = VTAILQ_FIRST(&vdc->vdp)) != NULL) {
+ CHECK_OBJ(vdpe, VDP_ENTRY_MAGIC);
+ rv = vdpe->bytes_in;
+ VSLb(vdc->vsl, SLT_VdpAcct, "%s %ju %ju", vdpe->vdp->name,
+ (uintmax_t)vdpe->calls, (uintmax_t)rv);
+ if (vdpe->vdp->io_fini != NULL)
+ vdpe->vdp->io_fini(vdc, &vdpe->priv);
+ AZ(vdpe->priv);
+ VTAILQ_REMOVE(&vdc->vdp, vdpe, list);
+ vdc->nxt = VTAILQ_FIRST(&vdc->vdp);
+ }
+
+ if (oc != NULL)
+ HSH_Cancel(vdc->wrk, oc, boc);
+ return (rv);
+}
+
+/*--------------------------------------------------------------------*/
int v_matchproto_(objiterate_f)
VDP_ObjIterate(void *priv, unsigned flush, const void *ptr, ssize_t len)
{
diff --git a/bin/varnishd/cache/cache_filter.h b/bin/varnishd/cache/cache_filter.h
index 329ab9982..ce3cdabc2 100644
--- a/bin/varnishd/cache/cache_filter.h
+++ b/bin/varnishd/cache/cache_filter.h
@@ -33,6 +33,7 @@ struct req;
struct vfp_entry;
struct vfp_ctx;
struct vdp_ctx;
+struct vdp_entry;
/* Fetch processors --------------------------------------------------*/
@@ -125,12 +126,55 @@ typedef int vdp_fini_f(struct vdp_ctx *, void **priv);
typedef int vdp_bytes_f(struct vdp_ctx *, enum vdp_action, void **priv,
const void *ptr, ssize_t len);
+/*
+ * ============================================================
+ * vdpio io-vector interface
+ */
+typedef int vdpio_init_f(VRT_CTX, struct vdp_ctx *, void **priv, int capacity);
+/*
+ * the vdpio_init_f() are called front (object iterator) to back (consumer).
+ *
+ * each init function returns the minimum number of io vectors (vscarab
+ * capacity) that it requires the next filter to accept. This capacity is
+ * passed to the next init function such that it can allocate sufficient
+ * space to fulfil the requirement of the previous filter.
+ *
+ * Return values:
+ * < 0 : Error
+ * == 0 ; NOOP, do not push this filter
+ * >= 1 : capacity requirement
+ *
+ * typedef is shared with upgrade
+ */
+
+typedef int vdpio_lease_f(struct vdp_ctx *, struct vdp_entry *, struct vscarab *scarab);
+/*
+ * vdpio_lease_f() returns leases provided by this filter layer in the vscarab
+ * provided by the caller.
+ *
+ * called via vdpio_pull(): the last filter is called first by delivery. Each
+ * filter calls the previous layer for leases. The first filter calls storage.
+ *
+ * return values are as for ObjVAIlease()
+ *
+ * Other notable differences to vdp_bytes_f:
+ * - responsible for updating (struct vdp_entry).bytes_in and .calls
+ *
+ */
+
+typedef void vdpio_fini_f(struct vdp_ctx *, void **priv);
+
struct vdp {
const char *name;
vdp_init_f *init;
vdp_bytes_f *bytes;
vdp_fini_f *fini;
const void *priv1;
+
+ vdpio_init_f *io_init;
+ vdpio_init_f *io_upgrade;
+ vdpio_lease_f *io_lease;
+ vdpio_fini_f *io_fini;
};
struct vdp_entry {
@@ -149,10 +193,10 @@ VTAILQ_HEAD(vdp_entry_s, vdp_entry);
struct vdp_ctx {
unsigned magic;
#define VDP_CTX_MAGIC 0xee501df7
- int retval;
- uint64_t bytes_done;
+ int retval; // vdpio: error or capacity
+ uint64_t bytes_done; // not used with vdpio
struct vdp_entry_s vdp;
- struct vdp_entry *nxt;
+ struct vdp_entry *nxt; // not needed for vdpio
struct worker *wrk;
struct vsl_log *vsl;
// NULL'ed after the first filter has been pushed
@@ -160,10 +204,119 @@ struct vdp_ctx {
// NULL'ed for delivery
struct http *hp;
intmax_t *clen;
+ // only for vdpio
+ vai_hdl vai_hdl;
+ struct vscaret *scaret;
};
int VDP_bytes(struct vdp_ctx *, enum vdp_action act, const void *, ssize_t);
+/*
+ * vdpe == NULL: get leases from the last layer
+ * vdpe != NULL: get leases from the previous layer or storage
+ *
+ * conversely to VDP_bytes, vdpio calls happen back (delivery) to front (storage)
+ *
+ * ends up in a tail call to the previous layer to save stack space
+ */
+static inline int
+vdpio_pull(struct vdp_ctx *vdc, struct vdp_entry *vdpe, struct vscarab *scarab)
+{
+
+ CHECK_OBJ_NOTNULL(vdc, VDP_CTX_MAGIC);
+
+ if (vdpe == NULL)
+ vdpe = VTAILQ_LAST(&vdc->vdp, vdp_entry_s);
+ else {
+ CHECK_OBJ(vdpe, VDP_ENTRY_MAGIC);
+ vdpe = VTAILQ_PREV(vdpe, vdp_entry_s, list);
+ }
+
+ if (vdpe != NULL)
+ return (vdpe->vdp->io_lease(vdc, vdpe, scarab));
+ else
+ return (ObjVAIlease(vdc->wrk, vdc->vai_hdl, scarab));
+}
+
+/*
+ * ============================================================
+ * VDPIO helpers
+ */
+
+/*
+ * l bytes have been written to buf. save these to out and checkpoint buf for
+ * the remaining free space
+ */
+static inline void
+iovec_collect(struct iovec *buf, struct iovec *out, size_t l)
+{
+ if (out->iov_base == NULL)
+ out->iov_base = buf->iov_base;
+ assert((char *)out->iov_base + out->iov_len == buf->iov_base);
+ out->iov_len += l;
+ buf->iov_base = (char *)buf->iov_base + l;
+ buf->iov_len -= l;
+}
+
+/*
+ * return a single lease via the vdc vscaret
+ */
+static inline
+void vdpio_return_lease(const struct vdp_ctx *vdc, uint64_t lease)
+{
+ struct vscaret *scaret;
+
+ CHECK_OBJ_NOTNULL(vdc, VDP_CTX_MAGIC);
+ scaret = vdc->scaret;
+ VSCARET_CHECK_NOTNULL(scaret);
+
+ if (scaret->used == scaret->capacity)
+ ObjVAIreturn(vdc->wrk, vdc->vai_hdl, scaret);
+ VSCARET_ADD(scaret, lease);
+}
+
+/*
+ * add all leases from the vscarab to the vscaret
+ */
+static inline
+void vdpio_return_vscarab(const struct vdp_ctx *vdc, struct vscarab *scarab)
+{
+ struct viov *v;
+
+ VSCARAB_CHECK_NOTNULL(scarab);
+ VSCARAB_FOREACH(v, scarab)
+ vdpio_return_lease(vdc, v->lease);
+ VSCARAB_INIT(scarab, scarab->capacity);
+}
+
+/*
+ * return used up iovs (len == 0)
+ * move remaining to the beginning of the scarab
+ */
+static inline void
+vdpio_consolidate_vscarab(const struct vdp_ctx *vdc, struct vscarab *scarab)
+{
+ struct viov *v, *f = NULL;
+
+ VSCARAB_CHECK_NOTNULL(scarab);
+ VSCARAB_FOREACH(v, scarab) {
+ if (v->iov.iov_len == 0) {
+ AN(v->iov.iov_base);
+ vdpio_return_lease(vdc, v->lease);
+ if (f == NULL)
+ f = v;
+ continue;
+ }
+ else if (f == NULL)
+ continue;
+ memmove(f, v, scarab->used - (v - scarab->s) * sizeof (*v));
+ break;
+ }
+ if (f != NULL)
+ scarab->used = f - scarab->s;
+}
+
+
void v_deprecated_ VRT_AddVDP(VRT_CTX, const struct vdp *);
void v_deprecated_ VRT_RemoveVDP(VRT_CTX, const struct vdp *);
diff --git a/bin/varnishd/cache/cache_varnishd.h b/bin/varnishd/cache/cache_varnishd.h
index 6d73c04fd..2892ef188 100644
--- a/bin/varnishd/cache/cache_varnishd.h
+++ b/bin/varnishd/cache/cache_varnishd.h
@@ -199,6 +199,11 @@ extern const struct vdp VDP_gunzip;
extern const struct vdp VDP_esi;
extern const struct vdp VDP_range;
+uint64_t VDPIO_Close(struct vdp_ctx *, struct objcore *, struct boc *);
+int VDPIO_Upgrade(VRT_CTX, struct vdp_ctx *vdc);
+int VDPIO_Push(VRT_CTX, struct vdp_ctx *, struct ws *, const struct vdp *,
+ void *priv);
+
/* cache_exp.c */
vtim_real EXP_Ttl(const struct req *, const struct objcore *);