[master] e93ea04 Add a facility to schedule a task on the thread pools in round-robin order.

Poul-Henning Kamp phk at FreeBSD.org
Sun May 31 01:14:34 CEST 2015


commit e93ea04a93d5b3136d96791d88d4b4698caa41e3
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date:   Sat May 30 23:13:38 2015 +0000

    Add a facility to schedule a task on the thread pools in round-robin
    order.
    
    Use it to health-poke backends, rather than wasting a thread per
    backend+poll.
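
For illustration only, a minimal sketch of how a caller could use the
new facility.  Pool_Task_Any() and POOL_QUEUE_FRONT come from this
commit; everything named example_* is hypothetical:

	/* A one-shot task; any function matching task_func_t will do. */
	static void __match_proto__(task_func_t)
	example_task(struct worker *wrk, void *priv)
	{
		CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
		(void)priv;
		/* ...work that runs once, on some pool's worker thread... */
	}

	static struct pool_task example_pt;

	static void
	example_schedule(void)
	{
		example_pt.func = example_task;
		example_pt.priv = NULL;
		/* A non-zero return means no pool would accept the task. */
		if (Pool_Task_Any(&example_pt, POOL_QUEUE_FRONT))
			/* caller must cope: retry later or drop */ ;
	}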

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index 0265717..3c38a04 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -953,6 +953,7 @@ int Pool_Task_Arg(struct worker *, task_func_t *,
 void Pool_Sumstat(struct worker *w);
 int Pool_TrySumstat(struct worker *wrk);
 void Pool_PurgeStat(unsigned nobj);
+int Pool_Task_Any(struct pool_task *task, enum pool_how how);
 
 #define V1L_IsReleased(w)	((w)->v1l == NULL)
 void V1L_Chunked(const struct worker *w);
diff --git a/bin/varnishd/cache/cache_backend.c b/bin/varnishd/cache/cache_backend.c
index e84ff06..2df74c2 100644
--- a/bin/varnishd/cache/cache_backend.c
+++ b/bin/varnishd/cache/cache_backend.c
@@ -352,10 +352,10 @@ VRT_event_vbe(VRT_CTX, enum vcl_event_e ev, const struct director *d,
 	}
 
 	if (be->probe != NULL && ev == VCL_EVENT_WARM)
-		VBP_Control(be, 0);
+		VBP_Control(be, 1);
 
 	if (be->probe != NULL && ev == VCL_EVENT_COLD)
-		VBP_Control(be, 1);
+		VBP_Control(be, 0);
 
 	if (ev == VCL_EVENT_COLD) {
 		VSM_Free(be->vsc);
diff --git a/bin/varnishd/cache/cache_backend.h b/bin/varnishd/cache/cache_backend.h
index 11f14bd..d7c9aac 100644
--- a/bin/varnishd/cache/cache_backend.h
+++ b/bin/varnishd/cache/cache_backend.h
@@ -114,6 +114,7 @@ void VBP_Insert(struct backend *b, struct vrt_backend_probe const *p,
 void VBP_Remove(struct backend *b);
 void VBP_Control(const struct backend *b, int stop);
 void VBP_Status(struct cli *cli, const struct backend *, int details);
+void VBP_Init(void);
 
 struct tcp_pool *VBT_Ref(const struct suckaddr *ip4,
     const struct suckaddr *ip6);
diff --git a/bin/varnishd/cache/cache_backend_poll.c b/bin/varnishd/cache/cache_backend_poll.c
index 2a9a863..8928d38 100644
--- a/bin/varnishd/cache/cache_backend_poll.c
+++ b/bin/varnishd/cache/cache_backend_poll.c
@@ -41,6 +41,8 @@
 #include <stdio.h>
 #include <stdlib.h>
 
+#include "binary_heap.h"
+
 #include "cache.h"
 
 #include "cache_backend.h"
@@ -57,9 +59,6 @@ struct vbp_target {
 	unsigned			magic;
 #define VBP_TARGET_MAGIC		0x6b7cb656
 
-	struct lock			mtx;
-	int				disable;
-	int				stop;
 	struct backend			*backend;
 
 	struct tcp_pool			*tcp_pool;
@@ -81,12 +80,15 @@ struct vbp_target {
 	double				avg;
 	double				rate;
 
-	VTAILQ_ENTRY(vbp_target)	list;
-	pthread_t			thread;
+	double				due;
+	int				running;
+	int				heap_idx;
+	struct pool_task		task;
 };
 
-static VTAILQ_HEAD(, vbp_target)	vbp_list =
-    VTAILQ_HEAD_INITIALIZER(vbp_list);
+static struct lock			vbp_mtx;
+static pthread_cond_t			vbp_cond;
+static struct binheap			*vbp_heap;
 
 /*--------------------------------------------------------------------
  * Poke one backend, once, but possibly at both IPv4 and IPv6 addresses.
@@ -238,7 +240,7 @@ vbp_has_poked(struct vbp_target *vt)
 	}
 	vt->good = j;
 
-	Lck_Lock(&vt->mtx);
+	Lck_Lock(&vbp_mtx);
 	if (vt->backend != NULL) {
 		if (vt->good >= vt->probe.threshold) {
 			if (vt->backend->healthy)
@@ -260,48 +262,85 @@ vbp_has_poked(struct vbp_target *vt)
 		    vt->backend->display_name, logmsg, bits,
 		    vt->good, vt->probe.threshold, vt->probe.window,
 		    vt->last, vt->avg, vt->resp_buf);
-		if (!vt->disable) {
-			AN(vt->backend->vsc);
+		if (vt->backend != NULL && vt->backend->vsc != NULL)
 			vt->backend->vsc->happy = vt->happy;
-		}
 	}
-	Lck_Unlock(&vt->mtx);
+	Lck_Unlock(&vbp_mtx);
 }
 
 /*--------------------------------------------------------------------
- * One thread per backend to be poked.
  */
 
-static void *
-vbp_wrk_poll_backend(void *priv)
+static void __match_proto__(task_func_t)
+vbp_task(struct worker *wrk, void *priv)
 {
 	struct vbp_target *vt;
 
-	THR_SetName("backend poll");
-
+	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
 	CAST_OBJ_NOTNULL(vt, priv, VBP_TARGET_MAGIC);
 
-	while (!vt->stop) {
-		AN(vt->req);
-		assert(vt->req_len > 0);
+	AN(vt->req);
+	assert(vt->req_len > 0);
 
-		if (!vt->disable) {
-			vbp_start_poke(vt);
-			vbp_poke(vt);
-			vbp_has_poked(vt);
-		}
+	vbp_start_poke(vt);
+	vbp_poke(vt);
+	vbp_has_poked(vt);
+
+	Lck_Lock(&vbp_mtx);
+	if (vt->running < 0) {
+		VBT_Rel(&vt->tcp_pool);
+		free(vt->req);
+		FREE_OBJ(vt);
+	} else {
+		vt->running = 0;
+	}
+	Lck_Unlock(&vbp_mtx);
+}
+/*--------------------------------------------------------------------
+ */
+
+static void * __match_proto__()
+vbp_thread(struct worker *wrk, void *priv)
+{
+	double now, nxt;
+	struct vbp_target *vt;
 
-		if (!vt->stop)
-			VTIM_sleep(vt->probe.interval);
+	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+	AZ(priv);
+	while (1) {
+		Lck_Lock(&vbp_mtx);
+		while (1) {
+			now = VTIM_real();
+			vt = binheap_root(vbp_heap);
+			if (vt == NULL) {
+				nxt = 8.192 + now;
+			} else if (vt->due > now) {
+				nxt = vt->due;
+				vt = NULL;
+			} else {
+				binheap_delete(vbp_heap, vt->heap_idx);
+				vt->running = 1;
+				vt->due = now + vt->probe.interval;
+				binheap_insert(vbp_heap, vt);
+				nxt = 0.0;
+				break;
+			}
+			(void)Lck_CondWait(&vbp_cond, &vbp_mtx, nxt);
+		}
+		Lck_Unlock(&vbp_mtx);
+		vt->task.func = vbp_task;
+		vt->task.priv = vt;
+
+		if (Pool_Task_Any(&vt->task, POOL_QUEUE_FRONT)) {
+			Lck_Lock(&vbp_mtx);
+			vt->running = 0;
+			Lck_Unlock(&vbp_mtx);
+			// XXX: ehh... ?
+		}
 	}
-	Lck_Delete(&vt->mtx);
-	VTAILQ_REMOVE(&vbp_list, vt, list);
-	VBT_Rel(&vt->tcp_pool);
-	free(vt->req);
-	FREE_OBJ(vt);
-	return (NULL);
 }
 
+
 /*--------------------------------------------------------------------
  * Cli functions
  */
@@ -420,7 +459,7 @@ vbp_set_defaults(struct vbp_target *vt)
  */
 
 void
-VBP_Control(const struct backend *be, int stop)
+VBP_Control(const struct backend *be, int enable)
 {
 	struct vbp_target *vt;
 
@@ -429,17 +468,18 @@ VBP_Control(const struct backend *be, int stop)
 	vt = be->probe;
 	CHECK_OBJ_NOTNULL(vt, VBP_TARGET_MAGIC);
 
-VSL(SLT_Debug, 0, "VBP_CONTROL %d", stop);
-	Lck_Lock(&vt->mtx);
-	if (vt->disable == -1 && !stop) {
-		vt->disable = stop;
-		AZ(pthread_create(&vt->thread, NULL, vbp_wrk_poll_backend, vt));
-		AZ(pthread_detach(vt->thread));
+VSL(SLT_Debug, 0, "VBP_CONTROL %d", enable);
+	Lck_Lock(&vbp_mtx);
+	if (enable) {
+		assert(vt->heap_idx == BINHEAP_NOIDX);
+		vt->due = VTIM_real();
+		binheap_insert(vbp_heap, vt);
+		AZ(pthread_cond_signal(&vbp_cond));
 	} else {
-		assert(vt->disable != -1);
-		vt->disable = stop;
+		assert(vt->heap_idx != BINHEAP_NOIDX);
+		binheap_delete(vbp_heap, vt->heap_idx);
 	}
-	Lck_Unlock(&vt->mtx);
+	Lck_Unlock(&vbp_mtx);
 }
 
 /*--------------------------------------------------------------------
@@ -461,14 +501,12 @@ VBP_Insert(struct backend *b, const struct vrt_backend_probe *p,
 
 	ALLOC_OBJ(vt, VBP_TARGET_MAGIC);
 	XXXAN(vt);
-	VTAILQ_INSERT_TAIL(&vbp_list, vt, list);
-	Lck_New(&vt->mtx, lck_backend);
-	vt->disable = -1;
 
 	vt->tcp_pool = VBT_Ref(b->ipv4, b->ipv6);
 	AN(vt->tcp_pool);
 
 	vt->probe = *p;
+	vt->backend = b;
 
 	vbp_set_defaults(vt);
 	vbp_build_req(vt, hosthdr);
@@ -480,7 +518,6 @@ VBP_Insert(struct backend *b, const struct vrt_backend_probe *p,
 		vt->happy |= 1;
 		vbp_has_poked(vt);
 	}
-	vt->backend = b;
 	b->probe = vt;
 	vbp_has_poked(vt);
 }
@@ -495,11 +532,57 @@ VBP_Remove(struct backend *be)
 	vt = be->probe;
 	CHECK_OBJ_NOTNULL(vt, VBP_TARGET_MAGIC);
 
-	Lck_Lock(&vt->mtx);
-	vt->stop = 1;
-	vt->backend = NULL;
-	Lck_Unlock(&vt->mtx);
-
+	Lck_Lock(&vbp_mtx);
 	be->healthy = 1;
 	be->probe = NULL;
+	vt->backend = NULL;
+	if (vt->running) {
+		vt->running = -1;
+		vt = NULL;
+	}
+	Lck_Unlock(&vbp_mtx);
+	if (vt != NULL) {
+		VBT_Rel(&vt->tcp_pool);
+		free(vt->req);
+		FREE_OBJ(vt);
+	}
+}
+/*--------------------------------------------------------------------
+ */
+
+static int __match_proto__(binheap_cmp_t)
+vbp_cmp(void *priv, const void *a, const void *b)
+{
+	const struct vbp_target *aa, *bb;
+
+	AZ(priv);
+	CAST_OBJ_NOTNULL(aa, a, VBP_TARGET_MAGIC);
+	CAST_OBJ_NOTNULL(bb, b, VBP_TARGET_MAGIC);
+
+	return (aa->due < bb->due);
+}
+
+static void __match_proto__(binheap_update_t)
+vbp_update(void *priv, void *p, unsigned u)
+{
+	struct vbp_target *vt;
+
+	AZ(priv);
+	CAST_OBJ_NOTNULL(vt, p, VBP_TARGET_MAGIC);
+	vt->heap_idx = u;
+}
+
+/*--------------------------------------------------------------------
+ */
+
+void
+VBP_Init(void)
+{
+	pthread_t thr;
+
+	Lck_New(&vbp_mtx, lck_backend);
+	vbp_heap = binheap_new(NULL, vbp_cmp, vbp_update);
+	AN(vbp_heap);
+	AZ(pthread_cond_init(&vbp_cond, NULL));
+	WRK_BgThread(&thr, "Backend poller", vbp_thread, NULL);
 }
diff --git a/bin/varnishd/cache/cache_main.c b/bin/varnishd/cache/cache_main.c
index 00db49d..4c5370f 100644
--- a/bin/varnishd/cache/cache_main.c
+++ b/bin/varnishd/cache/cache_main.c
@@ -33,6 +33,7 @@
 #include <stdlib.h>
 
 #include "cache.h"
+#include "cache_backend.h"
 #include "common/heritage.h"
 
 #include "vcli_priv.h"
@@ -217,6 +218,7 @@ child_main(void)
 	HTTP_Init();
 
 	VBO_Init();
+	VBP_Init();
 	VBE_InitCfg();
 	Pool_Init();
 	V1P_Init();
diff --git a/bin/varnishd/cache/cache_pool.c b/bin/varnishd/cache/cache_pool.c
index 628b18c..56f11bf 100644
--- a/bin/varnishd/cache/cache_pool.c
+++ b/bin/varnishd/cache/cache_pool.c
@@ -45,6 +45,7 @@ static pthread_t		thr_pool_herder;
 
 static struct lock		wstat_mtx;
 struct lock			pool_mtx;
+static VTAILQ_HEAD(,pool)	pools = VTAILQ_HEAD_INITIALIZER(pools);
 
 /*--------------------------------------------------------------------
  * Summing of stats into global stats counters
@@ -86,6 +87,28 @@ Pool_TrySumstat(struct worker *wrk)
 }
 
 /*--------------------------------------------------------------------
+ * Facility for scheduling a task on any convenient pool.
+ */
+
+int
+Pool_Task_Any(struct pool_task *task, enum pool_how how)
+{
+	struct pool *pp;
+
+	Lck_Lock(&pool_mtx);
+	pp = VTAILQ_FIRST(&pools);
+	if (pp != NULL) {
+		VTAILQ_REMOVE(&pools, pp, list);
+		VTAILQ_INSERT_TAIL(&pools, pp, list);
+	}
+	Lck_Unlock(&pool_mtx);
+	if (pp == NULL)
+		return (-1);
+	// NB: When we remove pools, is there a race here ?
+	return (Pool_Task(pp, task, how));
+}
+
+/*--------------------------------------------------------------------
  * Helper function to update stats for purges under lock
  */
 
@@ -163,7 +186,6 @@ static void *
 pool_poolherder(void *priv)
 {
 	unsigned nwq;
-	VTAILQ_HEAD(,pool)	pools = VTAILQ_HEAD_INITIALIZER(pools);
 	struct pool *pp;
 	uint64_t u;
 
@@ -175,7 +197,9 @@ pool_poolherder(void *priv)
 		if (nwq < cache_param->wthread_pools) {
 			pp = pool_mkpool(nwq);
 			if (pp != NULL) {
+				Lck_Lock(&pool_mtx);
 				VTAILQ_INSERT_TAIL(&pools, pp, list);
+				Lck_Unlock(&pool_mtx);
 				VSC_C_main->pools++;
 				nwq++;
 				continue;
@@ -183,7 +207,10 @@ pool_poolherder(void *priv)
 		}
 		/* XXX: remove pools */
 		if (0) {
+			Lck_Lock(&pool_mtx);
 			pp = VTAILQ_FIRST(&pools);
+			VTAILQ_REMOVE(&pools, pp, list);
+			Lck_Unlock(&pool_mtx);
 			AN(pp);
 			MPL_Destroy(&pp->mpl_sess);
 			MPL_Destroy(&pp->mpl_req);
@@ -191,8 +218,10 @@ pool_poolherder(void *priv)
 		}
 		(void)sleep(1);
 		u = 0;
+		Lck_Lock(&pool_mtx);
 		VTAILQ_FOREACH(pp, &pools, list)
 			u += pp->lqueue;
+		Lck_Unlock(&pool_mtx);
 		VSC_C_main->thread_queue_len = u;
 	}
 	NEEDLESS_RETURN(NULL);
@@ -207,4 +236,6 @@ Pool_Init(void)
 	Lck_New(&wstat_mtx, lck_wstat);
 	Lck_New(&pool_mtx, lck_wq);
 	AZ(pthread_create(&thr_pool_herder, NULL, pool_poolherder, NULL));
+	while (!VSC_C_main->pools)
+		(void)usleep(10000);
 }
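
The round-robin behaviour lives entirely in Pool_Task_Any() above: the
pool at the head of the list gets the task and is rotated to the tail,
so successive calls cycle through all pools.  A self-contained model of
just that rotation, using plain sys/queue.h instead of Varnish's VTAILQ
wrappers (the pick_pool() name is made up for this sketch):

	#include <stdio.h>
	#include <sys/queue.h>

	struct pool {
		int id;
		TAILQ_ENTRY(pool) list;
	};

	static TAILQ_HEAD(, pool) pools = TAILQ_HEAD_INITIALIZER(pools);

	/* Rotate head to tail, the same idiom as Pool_Task_Any(). */
	static struct pool *
	pick_pool(void)
	{
		struct pool *pp;

		pp = TAILQ_FIRST(&pools);
		if (pp != NULL) {
			TAILQ_REMOVE(&pools, pp, list);
			TAILQ_INSERT_TAIL(&pools, pp, list);
		}
		return (pp);
	}

	int
	main(void)
	{
		struct pool a = { .id = 1 }, b = { .id = 2 }, c = { .id = 3 };
		int i;

		TAILQ_INSERT_TAIL(&pools, &a, list);
		TAILQ_INSERT_TAIL(&pools, &b, list);
		TAILQ_INSERT_TAIL(&pools, &c, list);
		for (i = 0; i < 5; i++)
			printf("%d ", pick_pool()->id);
		printf("\n");	/* prints: 1 2 3 1 2 */
		return (0);
	}

On the probe side, the single vbp_thread replaces the old
thread-per-backend pollers: targets sit in vbp_heap ordered by vt->due,
the thread sleeps on vbp_cond until the earliest one falls due, and each
probe then runs as a pool task.  vt->running tracks the handshake with
VBP_Remove(): 0 while idle in the heap, 1 while a probe task is in
flight, and -1 if the backend was removed mid-flight, in which case
vbp_task() performs the final teardown.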


