[master] 3ed720f74 cache_obj: Add a more generic boc extension notification facility
Nils Goroll
nils.goroll at uplex.de
Fri Jul 4 17:04:03 UTC 2025
commit 3ed720f74e50e24c561ce5bcd604785e4bf18063
Author: Nils Goroll <nils.goroll at uplex.de>
Date: Sun Sep 29 10:47:13 2024 +0200
cache_obj: Add a more generic boc extension notification facility
This commit adds a more generic busy object extension notification facility
in preparation for the asynchronous iteration facility introduced with the next
commit. It makes more sense when viewed in the context of that commit, but the
changes constitute a fairly independent part and have thus been separated.
Background
To support streaming of busy objects (delivery to a client while the body is
being fetched), the Object API provides ObjWaitExtend(), which is called by
storage iterators to learn the available amount of body data and to wait for
more if all available data has been processed (= sent to the client, usually).
The other end of the facility is ObjExtend(), which is called by the fetch side
of storage to update the available amount of body data and wake up any clients
blocking in ObjWaitExtend().
This facility was recently extended by a blocking operation in the other
direction: the writing side blocks if the amount of unsent data exceeds
the amount configured via transit_buffer.
Why this change?
The existing facility is based on the model of blocking threads. In order to
support asynchronous iterators, where a single thread may serve multiple
requests, we need a different, non-blocking model with notifications.
Implementation
The basic implementation idea is to introduce a variant of ObjWaitExtend()
which, rather than blocking on a condition variable, registers a callback
function to be called when the condition variable is signalled. This is
ObjVAIGetExtend(): it returns the updated extension, if available, _or_
registers the callback.
To implement the actual callback, we add a queue (struct vai_q_head) to struct
boc. Its elements essentially consist of the notification callback plus two
pointers passed to it: a private pointer for the caller, and vai_hdl, an
opaque handle owned by storage.
ObjExtend() and ObjSetState() now also process the list of registered callbacks.
ObjVAICancel() removes a callback when the caller is no longer interested or
needs to reclaim the queue entry.
diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index 3b01a7ac7..e92959f43 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -282,6 +282,10 @@ enum boc_state_e {
#include "tbl/boc_state.h"
};
+// VAI (asynchronous iteration) notification queue, see cache_obj.h:
+// struct vai_qe is only forward-declared here; struct vai_q_head is the
+// singly-linked list head embedded in struct boc (boc->vai_q_head).
+struct vai_qe;
+VSLIST_HEAD(vai_q_head, vai_qe);
+
struct boc {
unsigned magic;
#define BOC_MAGIC 0x70c98476
@@ -294,6 +298,7 @@ struct boc {
uint64_t fetched_so_far;
uint64_t delivered_so_far;
uint64_t transit_buffer;
+ struct vai_q_head vai_q_head;
};
/* Object core structure ---------------------------------------------
@@ -761,6 +766,15 @@ int ObjGetDouble(struct worker *, struct objcore *, enum obj_attr, double *);
int ObjGetU64(struct worker *, struct objcore *, enum obj_attr, uint64_t *);
int ObjCheckFlag(struct worker *, struct objcore *, enum obj_flags of);
+/*====================================================================
+ * ObjVAI...(): Asynchronous Iteration
+ *
+ * see comments in cache_obj.c for usage
+ */
+
+/* opaque handle owned by storage; Obj code only stores it in struct vai_qe
+ * and passes it back unchanged to the notification callback */
+typedef void *vai_hdl;
+/*
+ * notification callback, stored as the cb member of struct vai_qe
+ * (cache_obj.h). It is invoked as cb(qe->hdl, qe->priv) when the boc is
+ * extended or changes state. It runs with the boc mutex held, so it must
+ * not block.
+ */
+typedef void vai_notify_cb(vai_hdl, void *priv);
+
/* cache_req_body.c */
ssize_t VRB_Iterate(struct worker *, struct vsl_log *, struct req *,
objiterate_f *func, void *priv);
diff --git a/bin/varnishd/cache/cache_obj.c b/bin/varnishd/cache/cache_obj.c
index aa4ef2ac4..c5f2e54fc 100644
--- a/bin/varnishd/cache/cache_obj.c
+++ b/bin/varnishd/cache/cache_obj.c
@@ -231,6 +231,29 @@ obj_extend_condwait(const struct objcore *oc)
(void)Lck_CondWait(&oc->boc->cond, &oc->boc->mtx);
}
+/*
+ * Notify everyone waiting on the boc of an extension or state change,
+ * then release boc->mtx.
+ *
+ * Must be called with boc->mtx held; the lint -sem below marks this
+ * function as releasing the lock.
+ *
+ * - wakes all threads blocked on boc->cond (e.g. ObjWaitExtend(),
+ *   obj_extend_condwait())
+ * - detaches the whole vai queue and calls cb(hdl, priv) of every entry,
+ *   most recently registered first (ObjVAIGetExtend() inserts at the head)
+ *
+ * Entries are one-shot: each is unlinked and has VAI_QF_INQUEUE cleared
+ * before its callback runs. To be notified again, it must be re-registered
+ * via ObjVAIGetExtend().
+ *
+ * Callbacks run in the calling thread with boc->mtx still held. They must
+ * not block and must not call ObjVAIGetExtend() or ObjVAICancel(), which
+ * take the same mutex.
+ */
+//lint -sem(obj_boc_notify_Unlock, thread_unlock)
+
+static void
+obj_boc_notify_Unlock(struct boc *boc)
+{
+ struct vai_qe *qe, *next;
+
+ PTOK(pthread_cond_broadcast(&boc->cond));
+ // take the whole list: every queued entry is consumed by this notification
+ qe = VSLIST_FIRST(&boc->vai_q_head);
+ VSLIST_FIRST(&boc->vai_q_head) = NULL;
+ while (qe != NULL) {
+ CHECK_OBJ(qe, VAI_Q_MAGIC);
+ AN(qe->flags & VAI_QF_INQUEUE);
+ // mark as unqueued so a later ObjVAICancel() does not search the list
+ qe->flags &= ~VAI_QF_INQUEUE;
+ // save the successor and clear the link: qe is fully detached
+ // by the time its callback sees it
+ next = VSLIST_NEXT(qe, list);
+ VSLIST_NEXT(qe, list) = NULL;
+ qe->cb(qe->hdl, qe->priv);
+ qe = next;
+ }
+ Lck_Unlock(&boc->mtx);
+}
+
void
ObjExtend(struct worker *wrk, struct objcore *oc, ssize_t l, int final)
{
@@ -241,14 +264,13 @@ ObjExtend(struct worker *wrk, struct objcore *oc, ssize_t l, int final)
AN(om->objextend);
assert(l >= 0);
- Lck_Lock(&oc->boc->mtx);
if (l > 0) {
+ Lck_Lock(&oc->boc->mtx);
obj_extend_condwait(oc);
om->objextend(wrk, oc, l);
oc->boc->fetched_so_far += l;
- PTOK(pthread_cond_broadcast(&oc->boc->cond));
+ obj_boc_notify_Unlock(oc->boc);
}
- Lck_Unlock(&oc->boc->mtx);
assert(oc->boc->state < BOS_FINISHED);
if (final && om->objtrimstore != NULL)
@@ -294,6 +316,51 @@ ObjWaitExtend(const struct worker *wrk, const struct objcore *oc, uint64_t l,
*statep = state;
return (rv);
}
+
+/*
+ * Get a new extension _or_ register a notification.
+ *
+ * Non-blocking counterpart of ObjWaitExtend(). It returns the current
+ * oc->boc->fetched_so_far and, if statep is not NULL, stores the boc state
+ * that was read under the same lock.
+ *
+ * l is the amount of body data the caller has already processed. It must
+ * not exceed fetched_so_far unless the boc has failed. l is also passed to
+ * objSignalFetchLocked(), which is defined elsewhere in this file.
+ * NOTE(review): presumably this lets a fetch blocked on transit_buffer
+ * make progress; confirm.
+ *
+ * If nothing new is available (return value == l), the state is below
+ * BOS_FINISHED and qe is not already queued, then qe is queued on
+ * oc->boc->vai_q_head. qe->cb(qe->hdl, qe->priv) will then be called once,
+ * by the next ObjExtend() with l > 0 or by the next ObjSetState().
+ *
+ * The check and the registration both happen under boc->mtx, which is
+ * also held while notifying. So an extension cannot slip in between them
+ * unnoticed.
+ *
+ * The caller owns qe. It must be initialized (magic, cb, hdl, priv) and
+ * must stay valid until it has been notified or removed with
+ * ObjVAICancel().
+ *
+ * wrk is currently unused.
+ */
+uint64_t
+ObjVAIGetExtend(struct worker *wrk, const struct objcore *oc, uint64_t l,
+ enum boc_state_e *statep, struct vai_qe *qe)
+{
+ enum boc_state_e state;
+ uint64_t rv;
+
+ (void) wrk;
+ CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
+ CHECK_OBJ_NOTNULL(oc->boc, BOC_MAGIC);
+ CHECK_OBJ_NOTNULL(qe, VAI_Q_MAGIC);
+ Lck_Lock(&oc->boc->mtx);
+ rv = oc->boc->fetched_so_far;
+ assert(l <= rv || oc->boc->state == BOS_FAILED);
+ state = oc->boc->state;
+ objSignalFetchLocked(oc, l);
+ // no new data yet and more may come: arm the one-shot notification
+ if (l == rv && state < BOS_FINISHED &&
+ (qe->flags & VAI_QF_INQUEUE) == 0) {
+ qe->flags |= VAI_QF_INQUEUE;
+ VSLIST_INSERT_HEAD(&oc->boc->vai_q_head, qe, list);
+ }
+ Lck_Unlock(&oc->boc->mtx);
+ if (statep != NULL)
+ *statep = state;
+ return (rv);
+}
+
+/*
+ * Withdraw a notification registered with ObjVAIGetExtend().
+ *
+ * boc must be the boc whose queue qe was registered on (oc->boc in
+ * ObjVAIGetExtend()). Calling this for an entry that is not queued is
+ * harmless.
+ *
+ * Notification runs entirely under boc->mtx. So once this function returns,
+ * qe is neither queued on nor being called back by this boc, and the caller
+ * may reclaim it.
+ *
+ * wrk is currently unused.
+ */
+void
+ObjVAICancel(struct worker *wrk, struct boc *boc, struct vai_qe *qe)
+{
+
+ (void) wrk;
+ CHECK_OBJ_NOTNULL(boc, BOC_MAGIC);
+ CHECK_OBJ_NOTNULL(qe, VAI_Q_MAGIC);
+
+ Lck_Lock(&boc->mtx);
+ // inefficient, but should be rare
+ // (VSLIST_REMOVE walks the singly-linked list to find the predecessor)
+ if ((qe->flags & VAI_QF_INQUEUE) != 0)
+ VSLIST_REMOVE(&boc->vai_q_head, qe, vai_qe, list);
+ // resets all flags; VAI_QF_INQUEUE is currently the only one
+ qe->flags = 0;
+ Lck_Unlock(&boc->mtx);
+}
+
/*====================================================================
*/
@@ -319,8 +386,7 @@ ObjSetState(struct worker *wrk, const struct objcore *oc,
Lck_Lock(&oc->boc->mtx);
oc->boc->state = next;
- PTOK(pthread_cond_broadcast(&oc->boc->cond));
- Lck_Unlock(&oc->boc->mtx);
+ obj_boc_notify_Unlock(oc->boc);
}
/*====================================================================
diff --git a/bin/varnishd/cache/cache_obj.h b/bin/varnishd/cache/cache_obj.h
index 1f936a534..f6ee8618e 100644
--- a/bin/varnishd/cache/cache_obj.h
+++ b/bin/varnishd/cache/cache_obj.h
@@ -50,6 +50,26 @@ typedef void *objsetattr_f(struct worker *, struct objcore *,
enum obj_attr attr, ssize_t len, const void *ptr);
typedef void objtouch_f(struct worker *, struct objcore *, vtim_real now);
+/* vai_notify_cb (the cb member of struct vai_qe):
+ * called by Obj/storage to notify that the lease function (vai_lease_f) can be
+ * called again after a -EAGAIN / -ENOBUFS return value
+ * NOTE:
+ * - the callback gets executed by an arbitrary thread
+ * - WITH the boc mtx held
+ * so it should never block and be efficient
+ * - it is called at most once per registration: the entry is removed from
+ *   the queue before the call, and ObjVAIGetExtend() must be called again
+ *   to re-arm it
+ */
+
+/* notify entry added to struct boc::vai_q_head
+ *
+ * Allocated and owned by the caller of ObjVAIGetExtend(). The Obj code only
+ * links it into and out of the boc queue. flags and list are managed under
+ * the boc mtx by ObjVAIGetExtend(), ObjVAICancel() and the notification
+ * code; the caller sets magic, cb, hdl and priv.
+ */
+struct vai_qe {
+ unsigned magic;
+#define VAI_Q_MAGIC 0x573e27eb
+ unsigned flags; /* VAI_QF_* below, protected by the boc mtx */
+#define VAI_QF_INQUEUE (1U<<0) /* set while linked on a boc's vai_q_head */
+ VSLIST_ENTRY(vai_qe) list; /* linkage in struct boc::vai_q_head */
+ vai_notify_cb *cb;
+ vai_hdl hdl; /* passed as first argument to cb */
+ void *priv; /* passed as second argument to cb */
+};
+
struct obj_methods {
/* required */
objfree_f *objfree;
diff --git a/bin/varnishd/cache/cache_varnishd.h b/bin/varnishd/cache/cache_varnishd.h
index c4dc02a48..6d73c04fd 100644
--- a/bin/varnishd/cache/cache_varnishd.h
+++ b/bin/varnishd/cache/cache_varnishd.h
@@ -348,6 +348,10 @@ void *ObjSetAttr(struct worker *, struct objcore *, enum obj_attr,
int ObjCopyAttr(struct worker *, struct objcore *, struct objcore *,
enum obj_attr attr);
void ObjBocDone(struct worker *, struct objcore *, struct boc **);
+// VAI (asynchronous iteration): non-blocking notification about boc
+// extension / state change, see cache_obj.c for the contracts
+uint64_t ObjVAIGetExtend(struct worker *, const struct objcore *, uint64_t,
+ enum boc_state_e *, struct vai_qe *);
+void ObjVAICancel(struct worker *, struct boc *, struct vai_qe *);
int ObjSetDouble(struct worker *, struct objcore *, enum obj_attr, double);
int ObjSetU64(struct worker *, struct objcore *, enum obj_attr, uint64_t);
More information about the varnish-commit
mailing list