[master] 9f65997fa storage_simple: Implement asynchronous iteration and use it for the iterator
Nils Goroll
nils.goroll at uplex.de
Fri Jul 4 17:04:04 UTC 2025
commit 9f65997fa11aaa1d3287856749a6b856f8efba62
Author: Nils Goroll <nils.goroll at uplex.de>
Date: Mon Jan 6 22:02:36 2025 +0100
storage_simple: Implement asynchronous iteration and use it for the iterator
This commit implements, for the simple storage layer, the asynchronous
iteration API defined and described in the previous commits, and it
reimplements the synchronous iterator on top of it.
This commit message does not provide background information; please refer to
the two previous commits.
Implementation
sml_ai_init() initializes the handle and chooses either a simple or a more
elaborate "boc" lease function, depending on whether or not a streaming fetch
is ongoing (boc present).
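In outline (a condensed sketch of sml_ai_init() from the patch below, not a
verbatim excerpt), the choice boils down to:

    /* condensed from sml_ai_init() below */
    hdl->preamble.vai_lease = sml_ai_lease_simple;
    hdl->boc = HSH_RefBoc(oc);
    if (hdl->boc != NULL) {
            /* streaming fetch ongoing: use the elaborate variant and,
             * for transient objects, the "free behind" return path */
            hdl->preamble.vai_lease = sml_ai_lease_boc;
            if ((oc->flags & OC_F_TRANSIENT) != 0)
                    hdl->preamble.vai_return = sml_ai_return;
    }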
sml_ai_lease_simple() is just that: dead simple. It iterates the storage
segment list and fills the VSCARAB provided by the caller, making it a good
starting point for reading the implementation.
sml_ai_lease_boc() handles the busy case and is more elaborate due to the
nature of streaming fetches. It first calls ObjVAIGetExtend() to get the
current extent. If no data is available yet, it returns -ENOBUFS when a
transit buffer is in use and -EAGAIN otherwise; if the fetch has failed, it
returns -EPIPE.
Other than that, it basically does the same thing as sml_ai_lease_simple(),
with these exceptions: it also needs to return partial extents ("fragments"),
and it needs to handle the case where the last available segment is reached,
in which case there is no successor to store for the next invocation.
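The distinction between full segments and fragments is made when filling each
viov (condensed from sml_ai_lease_boc() below):

    /* condensed from sml_ai_lease_boc() below */
    if (hdl->st_off + l == hdl->st->space) {
            /* segment complete: hand out a real lease and advance */
            viov->lease = (uintptr_t)hdl->st;
            hdl->st_off = 0;
            hdl->st = next; /* NULL if there is no successor yet */
    } else {
            /* fragment: magic lease, ignored by sml_ai_return() */
            viov->lease = VAI_LEASE_NORET;
            hdl->st_off += l;
    }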
sml_ai_return() is only used for the "boc" case. It removes returned full
segments from the list and then frees them outside the boc mtx. It also
special-cases the last segment, which sml_ai_lease_boc() still needs in order
to resume; this segment is retained on the VSCARET.
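Schematically, a return happens in three phases (condensed from
sml_ai_return() below):

    /* condensed from sml_ai_return() below */
    VSCARET_FOREACH(p, scaret) {    /* 1: filter NORET and "last" */
            if (*p == VAI_LEASE_NORET)
                    continue;
            CAST_OBJ_NOTNULL(st, (void *)*p, STORAGE_MAGIC);
            if (st != hdl->last)
                    VSCARET_ADD(todo, *p);
    }
    Lck_Lock(&hdl->boc->mtx);       /* 2: unlink under the boc mtx */
    VSCARET_FOREACH(p, todo) {
            CAST_OBJ_NOTNULL(st, (void *)*p, STORAGE_MAGIC);
            VTAILQ_REMOVE(&hdl->obj->list, st, list);
    }
    Lck_Unlock(&hdl->boc->mtx);
    VSCARET_FOREACH(p, todo) {      /* 3: free outside the mtx */
            CAST_OBJ_NOTNULL(st, (void *)*p, STORAGE_MAGIC);
            sml_stv_free(hdl->stv, st);
    }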
sml_ai_fini() is straightforward and should not need explanation.
Implementation of sml_iterator() using the new API
To reimplement the existing synchronous iterator on top of the new API, we
first need a little facility to block waiting for a notification. This is
struct sml_notify with the four sml_notify* functions: sml_notify() is the
callback, and sml_notify_wait() blocks until a notification arrives.
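The lifecycle is (condensed from sml_iterator() below):

    /* condensed from sml_iterator() below */
    struct sml_notify sn;

    sml_notify_init(&sn);
    hdl = ObjVAIinit(wrk, oc, NULL, sml_notify, &sn);
    /* ... lease/deliver loop; sml_notify_wait(&sn) blocks whenever
     * ObjVAIlease() signals that no data is available yet ... */
    ObjVAIfini(wrk, &hdl);
    sml_notify_fini(&sn);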
Until it runs out of work, the iterator performs these steps:
ObjVAIlease() is called repeatedly until either the VSCARAB is full or a
negative value is returned. This lets the rest of the code react to the
upcoming wait or end condition appropriately, by sending OBJ_ITER_FLUSH with
the last lease only.
Calling func() on each extent is trivial; the complications come only from
handling OBJ_ITER_FLUSH, "just in time" lease returns, and error handling.
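Put together, the main loop follows this skeleton (condensed from
sml_iterator() below; the flush logic and error paths are elided):

    /* condensed from sml_iterator() below */
    do {
            /* fill the scarab with leases */
            do {
                    nn = ObjVAIlease(wrk, hdl, scarab);
            } while (nn > 0 && !(scarab->flags & VSCARAB_F_END) &&
                scarab->used < scarab->capacity);

            /* deliver each extent, collecting leases for return;
             * u carries the OBJ_ITER_* flags, see below */
            VSCARAB_FOREACH(vio, scarab) {
                    r = func(priv, u, vio->iov.iov_base, vio->iov.iov_len);
                    if (r != 0)
                            break;
                    VSCARET_ADD(scaret, vio->lease);
            }
            VSCARAB_INIT(scarab, scarab->capacity);

            /* return leases, then block if we must wait for more data */
            if (scaret->used > 0)
                    ObjVAIreturn(wrk, hdl, scaret);
            if (r == 0 && (nn == -ENOBUFS || nn == -EAGAIN))
                    sml_notify_wait(&sn);
    } while (nn != 0 && r == 0);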
diff --git a/bin/varnishd/storage/storage_persistent.c b/bin/varnishd/storage/storage_persistent.c
index f193ae976..5eceb7e32 100644
--- a/bin/varnishd/storage/storage_persistent.c
+++ b/bin/varnishd/storage/storage_persistent.c
@@ -696,6 +696,7 @@ smp_init(void)
smp_oc_realmethods.objsetattr = SML_methods.objsetattr;
smp_oc_realmethods.objtouch = LRU_Touch;
smp_oc_realmethods.objfree = smp_oc_objfree;
+ smp_oc_realmethods.vai_init = SML_methods.vai_init;
}
/*--------------------------------------------------------------------
diff --git a/bin/varnishd/storage/storage_simple.c b/bin/varnishd/storage/storage_simple.c
index 8c81c85ee..ce185fd1b 100644
--- a/bin/varnishd/storage/storage_simple.c
+++ b/bin/varnishd/storage/storage_simple.c
@@ -31,6 +31,8 @@
#include "config.h"
+#include <stdlib.h>
+
#include "cache/cache_varnishd.h"
#include "cache/cache_obj.h"
@@ -306,130 +308,434 @@ sml_objfree(struct worker *wrk, struct objcore *oc)
wrk->stats->n_object--;
}
+// kept for reviewers - XXX remove later
+#undef VAI_DBG
+
+struct sml_hdl {
+ struct vai_hdl_preamble preamble;
+#define SML_HDL_MAGIC 0x37dfd996
+ struct vai_qe qe;
+ struct ws *ws; // NULL means malloc()
+ struct objcore *oc;
+ struct object *obj;
+ const struct stevedore *stv;
+ struct boc *boc;
+
+ struct storage *st; // updated by _lease()
+
+ // only for _lease_boc()
+ uint64_t st_off; // already returned fragment of current st
+ uint64_t avail, returned;
+ struct storage *last; // to resume, held back by _return()
+};
+
+static inline void
+sml_ai_viov_fill(struct viov *viov, struct storage *st)
+{
+ viov->iov.iov_base = st->ptr;
+ viov->iov.iov_len = st->len;
+ viov->lease = (uintptr_t)st;
+ VAI_ASSERT_LEASE(viov->lease);
+}
+
+static int
+sml_ai_lease_simple(struct worker *wrk, vai_hdl vhdl, struct vscarab *scarab)
+{
+ struct storage *st;
+ struct sml_hdl *hdl;
+ struct viov *viov;
+ int r = 0;
+
+ (void) wrk;
+ CAST_VAI_HDL_NOTNULL(hdl, vhdl, SML_HDL_MAGIC);
+ VSCARAB_CHECK_NOTNULL(scarab);
+
+ AZ(hdl->st_off);
+ st = hdl->st;
+ while (st != NULL && (viov = VSCARAB_GET(scarab)) != NULL) {
+ CHECK_OBJ(st, STORAGE_MAGIC);
+ sml_ai_viov_fill(viov, st);
+ r++;
+ st = VTAILQ_PREV(st, storagehead, list);
+ }
+ hdl->st = st;
+ if (st == NULL)
+ scarab->flags |= VSCARAB_F_END;
+ return (r);
+}
+
+/*
+ * on leases while streaming (with a boc):
+ *
+ * SML uses the lease return facility to implement the "free behind" for
+ * OC_F_TRANSIENT objects. When streaming, we also return leases on
+ * fragments of sts, but we must only "free behind" when we are done with the
+ * last fragment.
+ *
+ * So we use a magic lease to signal "this is only a fragment", which we ignore
+ * on returns.
+ */
+
+static int
+sml_ai_lease_boc(struct worker *wrk, vai_hdl vhdl, struct vscarab *scarab)
+{
+ enum boc_state_e state = BOS_INVALID;
+ struct storage *next;
+ struct sml_hdl *hdl;
+ struct viov *viov;
+ int r = 0;
+
+ CAST_VAI_HDL_NOTNULL(hdl, vhdl, SML_HDL_MAGIC);
+ VSCARAB_CHECK_NOTNULL(scarab);
+
+ if (hdl->avail == hdl->returned) {
+ hdl->avail = ObjVAIGetExtend(wrk, hdl->oc, hdl->returned,
+ &state, &hdl->qe);
+ if (state == BOS_FAILED) {
+ hdl->last = NULL;
+ return (-EPIPE);
+ }
+ else if (state == BOS_FINISHED)
+ (void)0;
+ else if (hdl->avail == hdl->returned) {
+ // ObjVAIGetExtend() has scheduled a notification
+ if (hdl->boc->transit_buffer > 0)
+ return (-ENOBUFS);
+ else
+ return (-EAGAIN);
+ }
+ else
+ assert(state < BOS_FINISHED);
+ }
+ Lck_Lock(&hdl->boc->mtx);
+ if (hdl->st == NULL && hdl->last != NULL) {
+ /* when the "last" st completed, we did not yet have a next, so
+ * resume from there. Because "last" might have been returned and
+ * deleted, we can not just use the pointer, but rather need to
+ * iterate the st list.
+ * if we can not find "last", it also has been returned and
+ * deleted, and the current write head (VTAILQ_LAST) is our next
+ * st, which can also be null if we are done.
+ */
+ VTAILQ_FOREACH_REVERSE(next, &hdl->obj->list, storagehead, list) {
+ if (next == hdl->last) {
+ hdl->st = VTAILQ_PREV(next, storagehead, list);
+ break;
+ }
+ }
+ }
+ hdl->last = NULL;
+ if (hdl->st == NULL) {
+ assert(hdl->returned == 0 || hdl->avail == hdl->returned);
+ hdl->st = VTAILQ_LAST(&hdl->obj->list, storagehead);
+ }
+ if (hdl->st == NULL)
+ assert(hdl->avail == hdl->returned);
+
+ while (hdl->avail > hdl->returned && (viov = VSCARAB_GET(scarab)) != NULL) {
+ CHECK_OBJ_NOTNULL(hdl->st, STORAGE_MAGIC); // ObjVAIGetExtend ensures
+ assert(hdl->st_off <= hdl->st->space);
+ size_t av = hdl->avail - hdl->returned;
+ size_t l = hdl->st->space - hdl->st_off;
+ AN(l);
+ if (l > av)
+ l = av;
+ viov->iov.iov_base = hdl->st->ptr + hdl->st_off;
+ viov->iov.iov_len = l;
+ if (hdl->st_off + l == hdl->st->space) {
+ next = VTAILQ_PREV(hdl->st, storagehead, list);
+ AZ(hdl->last);
+ if (next == NULL)
+ hdl->last = hdl->st;
+ else
+ CHECK_OBJ(next, STORAGE_MAGIC);
+#ifdef VAI_DBG
+ VSLb(wrk->vsl, SLT_Debug, "off %zu + l %zu == space st %p next st %p stvprv %p",
+ hdl->st_off, l, hdl->st, next, hdl->boc->stevedore_priv);
+#endif
+ viov->lease = (uintptr_t)hdl->st;
+ hdl->st_off = 0;
+ hdl->st = next;
+ }
+ else {
+ viov->lease = VAI_LEASE_NORET;
+ hdl->st_off += l;
+ }
+ hdl->returned += l;
+ VAI_ASSERT_LEASE(viov->lease);
+ r++;
+ }
+
+ Lck_Unlock(&hdl->boc->mtx);
+ if (state != BOS_FINISHED && hdl->avail == hdl->returned) {
+ hdl->avail = ObjVAIGetExtend(wrk, hdl->oc, hdl->returned,
+ &state, &hdl->qe);
+ }
+ if (state == BOS_FINISHED && hdl->avail == hdl->returned)
+ scarab->flags |= VSCARAB_F_END;
+ return (r);
+}
+
+static void v_matchproto_(vai_return_f)
+sml_ai_return(struct worker *wrk, vai_hdl vhdl, struct vscaret *scaret)
+{
+ struct storage *st;
+ struct sml_hdl *hdl;
+ uint64_t *p;
+
+ (void) wrk;
+ CAST_VAI_HDL_NOTNULL(hdl, vhdl, SML_HDL_MAGIC);
+ VSCARET_CHECK_NOTNULL(scaret);
+ if (scaret->used == 0)
+ return;
+
+ // callback is only registered if needed
+ assert(hdl->boc != NULL && (hdl->oc->flags & OC_F_TRANSIENT) != 0);
+
+ // filter noret and last
+ VSCARET_LOCAL(todo, scaret->used);
+ VSCARET_FOREACH(p, scaret) {
+ if (*p == VAI_LEASE_NORET)
+ continue;
+ CAST_OBJ_NOTNULL(st, (void *)*p, STORAGE_MAGIC);
+ if (st == hdl->last)
+ continue;
+ VSCARET_ADD(todo, *p);
+ }
+ VSCARET_INIT(scaret, scaret->capacity);
+
+ Lck_Lock(&hdl->boc->mtx);
+ VSCARET_FOREACH(p, todo) {
+ CAST_OBJ_NOTNULL(st, (void *)*p, STORAGE_MAGIC);
+ VTAILQ_REMOVE(&hdl->obj->list, st, list);
+ if (st == hdl->boc->stevedore_priv)
+ hdl->boc->stevedore_priv = trim_once;
+ }
+ Lck_Unlock(&hdl->boc->mtx);
+
+ VSCARET_FOREACH(p, todo) {
+ CAST_OBJ_NOTNULL(st, (void *)*p, STORAGE_MAGIC);
+ sml_stv_free(hdl->stv, st);
+ }
+}
+
+static void v_matchproto_(vai_fini_f)
+sml_ai_fini(struct worker *wrk, vai_hdl *vai_hdlp)
+{
+ struct sml_hdl *hdl;
+
+ AN(vai_hdlp);
+ CAST_VAI_HDL_NOTNULL(hdl, *vai_hdlp, SML_HDL_MAGIC);
+ *vai_hdlp = NULL;
+
+ if (hdl->boc != NULL) {
+ ObjVAICancel(wrk, hdl->boc, &hdl->qe);
+ HSH_DerefBoc(wrk, hdl->oc);
+ hdl->boc = NULL;
+ }
+
+ if (hdl->ws != NULL)
+ WS_Release(hdl->ws, 0);
+ else
+ free(hdl);
+}
+
+static vai_hdl v_matchproto_(vai_init_f)
+sml_ai_init(struct worker *wrk, struct objcore *oc, struct ws *ws,
+ vai_notify_cb *notify, void *notify_priv)
+{
+ struct sml_hdl *hdl;
+ const size_t sz = sizeof *hdl;
+
+ if (ws != NULL && WS_ReserveSize(ws, (unsigned)sz))
+ hdl = WS_Reservation(ws);
+ else {
+ hdl = malloc(sz);
+ ws = NULL;
+ }
+
+ AN(hdl);
+ INIT_VAI_HDL(hdl, SML_HDL_MAGIC);
+ hdl->preamble.vai_lease = sml_ai_lease_simple;
+ hdl->preamble.vai_fini = sml_ai_fini;
+ hdl->ws = ws;
+
+ hdl->oc = oc;
+ hdl->obj = sml_getobj(wrk, oc);
+ CHECK_OBJ_NOTNULL(hdl->obj, OBJECT_MAGIC);
+ hdl->stv = oc->stobj->stevedore;
+ CHECK_OBJ_NOTNULL(hdl->stv, STEVEDORE_MAGIC);
+
+ hdl->st = VTAILQ_LAST(&hdl->obj->list, storagehead);
+ CHECK_OBJ_ORNULL(hdl->st, STORAGE_MAGIC);
+
+ hdl->boc = HSH_RefBoc(oc);
+ if (hdl->boc == NULL)
+ return (hdl);
+ /* we only initialize notifications if we have a boc, so
+ * any wrong attempt triggers magic checks.
+ */
+ hdl->preamble.vai_lease = sml_ai_lease_boc;
+ if ((hdl->oc->flags & OC_F_TRANSIENT) != 0)
+ hdl->preamble.vai_return = sml_ai_return;
+ hdl->qe.magic = VAI_Q_MAGIC;
+ hdl->qe.cb = notify;
+ hdl->qe.hdl = hdl;
+ hdl->qe.priv = notify_priv;
+ return (hdl);
+}
+
+/*
+ * trivial notification to allow the iterator to simply block
+ */
+struct sml_notify {
+ unsigned magic;
+#define SML_NOTIFY_MAGIC 0x4589af31
+ unsigned hasmore;
+ pthread_mutex_t mtx;
+ pthread_cond_t cond;
+};
+
+static void
+sml_notify_init(struct sml_notify *sn)
+{
+
+ INIT_OBJ(sn, SML_NOTIFY_MAGIC);
+ AZ(pthread_mutex_init(&sn->mtx, NULL));
+ AZ(pthread_cond_init(&sn->cond, NULL));
+}
+
+static void
+sml_notify_fini(struct sml_notify *sn)
+{
+
+ CHECK_OBJ_NOTNULL(sn, SML_NOTIFY_MAGIC);
+ AZ(pthread_mutex_destroy(&sn->mtx));
+ AZ(pthread_cond_destroy(&sn->cond));
+}
+
+static void v_matchproto_(vai_notify_cb)
+sml_notify(vai_hdl hdl, void *priv)
+{
+ struct sml_notify *sn;
+
+ (void) hdl;
+ CAST_OBJ_NOTNULL(sn, priv, SML_NOTIFY_MAGIC);
+ AZ(pthread_mutex_lock(&sn->mtx));
+ sn->hasmore = 1;
+ AZ(pthread_cond_signal(&sn->cond));
+ AZ(pthread_mutex_unlock(&sn->mtx));
+
+}
+
+static void
+sml_notify_wait(struct sml_notify *sn)
+{
+
+ CHECK_OBJ_NOTNULL(sn, SML_NOTIFY_MAGIC);
+ AZ(pthread_mutex_lock(&sn->mtx));
+ while (sn->hasmore == 0)
+ AZ(pthread_cond_wait(&sn->cond, &sn->mtx));
+ AN(sn->hasmore);
+ sn->hasmore = 0;
+ AZ(pthread_mutex_unlock(&sn->mtx));
+}
+
static int v_matchproto_(objiterator_f)
sml_iterator(struct worker *wrk, struct objcore *oc,
void *priv, objiterate_f *func, int final)
{
- struct boc *boc;
- enum boc_state_e state;
- struct object *obj;
- struct storage *st;
- struct storage *checkpoint = NULL;
- const struct stevedore *stv;
- ssize_t checkpoint_len = 0;
- ssize_t len = 0;
- int ret = 0, ret2;
- ssize_t ol;
- ssize_t nl;
- ssize_t sl;
- void *p;
- ssize_t l;
- unsigned u;
-
- obj = sml_getobj(wrk, oc);
- CHECK_OBJ_NOTNULL(obj, OBJECT_MAGIC);
- stv = oc->stobj->stevedore;
- CHECK_OBJ_NOTNULL(stv, STEVEDORE_MAGIC);
+ struct sml_notify sn;
+ struct viov *vio, *last;
+ unsigned u, uu;
+ vai_hdl hdl;
+ int nn, r, r2, islast;
- boc = HSH_RefBoc(oc);
+ VSCARAB_LOCAL(scarab, 16);
+ VSCARET_LOCAL(scaret, 16);
- if (boc == NULL) {
- VTAILQ_FOREACH_REVERSE_SAFE(
- st, &obj->list, storagehead, list, checkpoint) {
+ (void) final; // phase out?
+ sml_notify_init(&sn);
+ hdl = ObjVAIinit(wrk, oc, NULL, sml_notify, &sn);
+ AN(hdl);
- u = 0;
- if (VTAILQ_PREV(st, storagehead, list) == NULL)
+ r = u = 0;
+
+ do {
+ do {
+ nn = ObjVAIlease(wrk, hdl, scarab);
+ if (nn <= 0 || scarab->flags & VSCARAB_F_END)
+ break;
+ } while (scarab->used < scarab->capacity);
+
+ /*
+ * nn is the wait/return action or 0;
+ * it tells us whether to flush
+ */
+ uu = u;
+ last = VSCARAB_LAST(scarab);
+ VSCARAB_FOREACH(vio, scarab) {
+ islast = vio == last;
+ AZ(u & OBJ_ITER_END);
+ if (islast && scarab->flags & VSCARAB_F_END)
u |= OBJ_ITER_END;
- if (final)
- u |= OBJ_ITER_FLUSH;
- if (ret == 0 && st->len > 0)
- ret = func(priv, u, st->ptr, st->len);
- if (final) {
- VTAILQ_REMOVE(&obj->list, st, list);
- sml_stv_free(stv, st);
- } else if (ret)
+
+ // flush if it is the scarab's last IOV and we will block next
+ // or if we need space in the return leases array
+ uu = u;
+ if ((islast && nn < 0) || scaret->used == scaret->capacity - 1)
+ uu |= OBJ_ITER_FLUSH;
+ r = func(priv, uu, vio->iov.iov_base, vio->iov.iov_len);
+ if (r != 0)
break;
- }
- return (ret);
- }
- p = NULL;
- l = 0;
+ // sufficient space ensured by capacity check above
+ VSCARET_ADD(scaret, vio->lease);
- u = 0;
- if (boc->fetched_so_far == 0) {
- ret = func(priv, OBJ_ITER_FLUSH, NULL, 0);
- if (ret)
- return (ret);
- }
- while (1) {
- ol = len;
- nl = ObjWaitExtend(wrk, oc, ol, &state);
- if (state == BOS_FAILED) {
- ret = -1;
- break;
+ // whenever we have flushed, return leases
+ if ((uu & OBJ_ITER_FLUSH) && scaret->used > 0)
+ ObjVAIreturn(wrk, hdl, scaret);
}
- if (nl == ol) {
- assert(state == BOS_FINISHED);
- break;
+
+ // return leases which we did not use if error (break)
+ VSCARAB_FOREACH_RESUME(vio, scarab) {
+ if (scaret->used == scaret->capacity)
+ ObjVAIreturn(wrk, hdl, scaret);
+ VSCARET_ADD(scaret, vio->lease);
}
- assert(nl > ol);
- Lck_Lock(&boc->mtx);
- AZ(VTAILQ_EMPTY(&obj->list));
- if (checkpoint == NULL) {
- st = VTAILQ_LAST(&obj->list, storagehead);
- sl = 0;
- } else {
- st = checkpoint;
- sl = checkpoint_len;
- ol -= checkpoint_len;
+
+ // we have now completed the scarab
+ VSCARAB_INIT(scarab, scarab->capacity);
+
+ // flush before blocking if we did not already
+ if (r == 0 && (nn == -ENOBUFS || nn == -EAGAIN) &&
+ (uu & OBJ_ITER_FLUSH) == 0) {
+ r = func(priv, OBJ_ITER_FLUSH, NULL, 0);
+ if (scaret->used > 0)
+ ObjVAIreturn(wrk, hdl, scaret);
}
- while (st != NULL) {
- if (st->len > ol) {
- p = st->ptr + ol;
- l = st->len - ol;
- len += l;
- break;
- }
- ol -= st->len;
- assert(ol >= 0);
- nl -= st->len;
- assert(nl > 0);
- sl += st->len;
- st = VTAILQ_PREV(st, storagehead, list);
- if (final && checkpoint != NULL) {
- if (checkpoint == boc->stevedore_priv)
- boc->stevedore_priv = trim_once;
- else
- VTAILQ_REMOVE(&obj->list, checkpoint, list);
- sml_stv_free(stv, checkpoint);
- }
- checkpoint = st;
- checkpoint_len = sl;
+
+ if (r == 0 && (nn == -ENOBUFS || nn == -EAGAIN)) {
+ assert(scaret->used <= 1);
+ sml_notify_wait(&sn);
}
- CHECK_OBJ_NOTNULL(obj, OBJECT_MAGIC);
- CHECK_OBJ_NOTNULL(st, STORAGE_MAGIC);
- st = VTAILQ_PREV(st, storagehead, list);
- if (st != NULL && st->len == 0)
- st = NULL;
- Lck_Unlock(&boc->mtx);
- assert(l > 0 || state == BOS_FINISHED);
- u = 0;
- if (st == NULL || final)
- u |= OBJ_ITER_FLUSH;
- if (st == NULL && state == BOS_FINISHED)
- u |= OBJ_ITER_END;
- ret = func(priv, u, p, l);
- if (ret)
- break;
- }
- HSH_DerefBoc(wrk, oc);
+ else if (r == 0 && nn < 0)
+ r = -1;
+ } while (nn != 0 && r == 0);
+
if ((u & OBJ_ITER_END) == 0) {
- ret2 = func(priv, OBJ_ITER_END, NULL, 0);
- if (ret == 0)
- ret = ret2;
+ r2 = func(priv, OBJ_ITER_END, NULL, 0);
+ if (r == 0)
+ r = r2;
}
- return (ret);
+
+ if (scaret->used > 0)
+ ObjVAIreturn(wrk, hdl, scaret);
+
+ ObjVAIfini(wrk, &hdl);
+ sml_notify_fini(&sn);
+
+ return (r);
}
/*--------------------------------------------------------------------
@@ -736,6 +1042,7 @@ const struct obj_methods SML_methods = {
.objgetattr = sml_getattr,
.objsetattr = sml_setattr,
.objtouch = LRU_Touch,
+ .vai_init = sml_ai_init
};
static void
diff --git a/bin/varnishtest/tests/c00111.vtc b/bin/varnishtest/tests/c00111.vtc
index 706ee7041..996d5d258 100644
--- a/bin/varnishtest/tests/c00111.vtc
+++ b/bin/varnishtest/tests/c00111.vtc
@@ -15,7 +15,8 @@ client c1 {
} -run
varnish v1 -vsl_catchup
-varnish v1 -expect fetch_failed == 1
+# with vai, this no longer fails systematically (which is good)
+varnish v1 -expect fetch_failed <= 1
varnish v1 -cliok "param.set transit_buffer 4k"
@@ -26,4 +27,4 @@ client c2 {
varnish v1 -vsl_catchup
varnish v1 -expect s_fetch == 2
-varnish v1 -expect fetch_failed == 1
+varnish v1 -expect fetch_failed <= 1