[master] e93ea04 Add a facility to schedule a task on the thread pools in round-robin order.
Poul-Henning Kamp
phk at FreeBSD.org
Sun May 31 01:14:34 CEST 2015
commit e93ea04a93d5b3136d96791d88d4b4698caa41e3
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date: Sat May 30 23:13:38 2015 +0000
Add a facility to schedule a task on the thread pools in round-robin
order.
Use it to health poke backends, rather than wasting a thread per
backend+poll.
diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index 0265717..3c38a04 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -953,6 +953,7 @@ int Pool_Task_Arg(struct worker *, task_func_t *,
void Pool_Sumstat(struct worker *w);
int Pool_TrySumstat(struct worker *wrk);
void Pool_PurgeStat(unsigned nobj);
+int Pool_Task_Any(struct pool_task *task, enum pool_how how);
#define V1L_IsReleased(w) ((w)->v1l == NULL)
void V1L_Chunked(const struct worker *w);
diff --git a/bin/varnishd/cache/cache_backend.c b/bin/varnishd/cache/cache_backend.c
index e84ff06..2df74c2 100644
--- a/bin/varnishd/cache/cache_backend.c
+++ b/bin/varnishd/cache/cache_backend.c
@@ -352,10 +352,10 @@ VRT_event_vbe(VRT_CTX, enum vcl_event_e ev, const struct director *d,
}
if (be->probe != NULL && ev == VCL_EVENT_WARM)
- VBP_Control(be, 0);
+ VBP_Control(be, 1);
if (be->probe != NULL && ev == VCL_EVENT_COLD)
- VBP_Control(be, 1);
+ VBP_Control(be, 0);
if (ev == VCL_EVENT_COLD) {
VSM_Free(be->vsc);
diff --git a/bin/varnishd/cache/cache_backend.h b/bin/varnishd/cache/cache_backend.h
index 11f14bd..d7c9aac 100644
--- a/bin/varnishd/cache/cache_backend.h
+++ b/bin/varnishd/cache/cache_backend.h
@@ -114,6 +114,7 @@ void VBP_Insert(struct backend *b, struct vrt_backend_probe const *p,
void VBP_Remove(struct backend *b);
void VBP_Control(const struct backend *b, int stop);
void VBP_Status(struct cli *cli, const struct backend *, int details);
+void VBP_Init(void);
struct tcp_pool *VBT_Ref(const struct suckaddr *ip4,
const struct suckaddr *ip6);
diff --git a/bin/varnishd/cache/cache_backend_poll.c b/bin/varnishd/cache/cache_backend_poll.c
index 2a9a863..8928d38 100644
--- a/bin/varnishd/cache/cache_backend_poll.c
+++ b/bin/varnishd/cache/cache_backend_poll.c
@@ -41,6 +41,8 @@
#include <stdio.h>
#include <stdlib.h>
+#include "binary_heap.h"
+
#include "cache.h"
#include "cache_backend.h"
@@ -57,9 +59,6 @@ struct vbp_target {
unsigned magic;
#define VBP_TARGET_MAGIC 0x6b7cb656
- struct lock mtx;
- int disable;
- int stop;
struct backend *backend;
struct tcp_pool *tcp_pool;
@@ -81,12 +80,15 @@ struct vbp_target {
double avg;
double rate;
- VTAILQ_ENTRY(vbp_target) list;
- pthread_t thread;
+ double due;
+ int running;
+ int heap_idx;
+ struct pool_task task;
};
-static VTAILQ_HEAD(, vbp_target) vbp_list =
- VTAILQ_HEAD_INITIALIZER(vbp_list);
+static struct lock vbp_mtx;
+static pthread_cond_t vbp_cond;
+static struct binheap *vbp_heap;
/*--------------------------------------------------------------------
* Poke one backend, once, but possibly at both IPv4 and IPv6 addresses.
@@ -238,7 +240,7 @@ vbp_has_poked(struct vbp_target *vt)
}
vt->good = j;
- Lck_Lock(&vt->mtx);
+ Lck_Lock(&vbp_mtx);
if (vt->backend != NULL) {
if (vt->good >= vt->probe.threshold) {
if (vt->backend->healthy)
@@ -260,48 +262,85 @@ vbp_has_poked(struct vbp_target *vt)
vt->backend->display_name, logmsg, bits,
vt->good, vt->probe.threshold, vt->probe.window,
vt->last, vt->avg, vt->resp_buf);
- if (!vt->disable) {
- AN(vt->backend->vsc);
+ if (vt->backend != NULL && vt->backend->vsc != NULL)
vt->backend->vsc->happy = vt->happy;
- }
}
- Lck_Unlock(&vt->mtx);
+ Lck_Unlock(&vbp_mtx);
}
/*--------------------------------------------------------------------
- * One thread per backend to be poked.
*/
-static void *
-vbp_wrk_poll_backend(void *priv)
+static void __match_proto__(task_func_t)
+vbp_task(struct worker *wrk, void *priv)
{
struct vbp_target *vt;
- THR_SetName("backend poll");
-
+ CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CAST_OBJ_NOTNULL(vt, priv, VBP_TARGET_MAGIC);
- while (!vt->stop) {
- AN(vt->req);
- assert(vt->req_len > 0);
+ AN(vt->req);
+ assert(vt->req_len > 0);
- if (!vt->disable) {
- vbp_start_poke(vt);
- vbp_poke(vt);
- vbp_has_poked(vt);
- }
+ vbp_start_poke(vt);
+ vbp_poke(vt);
+ vbp_has_poked(vt);
+
+ Lck_Lock(&vbp_mtx);
+ if (vt->running < 0) {
+ VBT_Rel(&vt->tcp_pool);
+ free(vt->req);
+ FREE_OBJ(vt);
+ } else {
+ vt->running = 0;
+ }
+ Lck_Unlock(&vbp_mtx);
+}
+/*--------------------------------------------------------------------
+ */
+
+static void * __match_proto__()
+vbp_thread(struct worker *wrk, void *priv)
+{
+ double now, nxt;
+ struct vbp_target *vt;
- if (!vt->stop)
- VTIM_sleep(vt->probe.interval);
+ CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+ AZ(priv);
+ while (1) {
+ Lck_Lock(&vbp_mtx);
+ while (1) {
+ now = VTIM_real();
+ vt = binheap_root(vbp_heap);
+ if (vt == NULL) {
+ nxt = 8.192 + now;
+ } else if (vt->due > now) {
+ nxt = vt->due;
+ vt = NULL;
+ } else {
+ binheap_delete(vbp_heap, vt->heap_idx);
+ vt->running = 1;
+ vt->due = now + vt->probe.interval;
+ binheap_insert(vbp_heap, vt);
+ nxt = 0.0;
+ break;
+ }
+ (void)Lck_CondWait(&vbp_cond, &vbp_mtx, nxt);
+ }
+ Lck_Unlock(&vbp_mtx);
+ vt->task.func = vbp_task;
+ vt->task.priv = vt;
+
+ if (Pool_Task_Any(&vt->task, POOL_QUEUE_FRONT)) {
+ Lck_Lock(&vbp_mtx);
+ vt->running = 0;
+ Lck_Unlock(&vbp_mtx);
+ // XXX: ehh... ?
+ }
}
- Lck_Delete(&vt->mtx);
- VTAILQ_REMOVE(&vbp_list, vt, list);
- VBT_Rel(&vt->tcp_pool);
- free(vt->req);
- FREE_OBJ(vt);
- return (NULL);
}
+
/*--------------------------------------------------------------------
* Cli functions
*/
@@ -420,7 +459,7 @@ vbp_set_defaults(struct vbp_target *vt)
*/
void
-VBP_Control(const struct backend *be, int stop)
+VBP_Control(const struct backend *be, int enable)
{
struct vbp_target *vt;
@@ -429,17 +468,18 @@ VBP_Control(const struct backend *be, int stop)
vt = be->probe;
CHECK_OBJ_NOTNULL(vt, VBP_TARGET_MAGIC);
-VSL(SLT_Debug, 0, "VBP_CONTROL %d", stop);
- Lck_Lock(&vt->mtx);
- if (vt->disable == -1 && !stop) {
- vt->disable = stop;
- AZ(pthread_create(&vt->thread, NULL, vbp_wrk_poll_backend, vt));
- AZ(pthread_detach(vt->thread));
+VSL(SLT_Debug, 0, "VBP_CONTROL %d", enable);
+ Lck_Lock(&vbp_mtx);
+ if (enable) {
+ assert(vt->heap_idx == BINHEAP_NOIDX);
+ vt->due = VTIM_real();
+ binheap_insert(vbp_heap, vt);
+ AZ(pthread_cond_signal(&vbp_cond));
} else {
- assert(vt->disable != -1);
- vt->disable = stop;
+ assert(vt->heap_idx != BINHEAP_NOIDX);
+ binheap_delete(vbp_heap, vt->heap_idx);
}
- Lck_Unlock(&vt->mtx);
+ Lck_Unlock(&vbp_mtx);
}
/*--------------------------------------------------------------------
@@ -461,14 +501,12 @@ VBP_Insert(struct backend *b, const struct vrt_backend_probe *p,
ALLOC_OBJ(vt, VBP_TARGET_MAGIC);
XXXAN(vt);
- VTAILQ_INSERT_TAIL(&vbp_list, vt, list);
- Lck_New(&vt->mtx, lck_backend);
- vt->disable = -1;
vt->tcp_pool = VBT_Ref(b->ipv4, b->ipv6);
AN(vt->tcp_pool);
vt->probe = *p;
+ vt->backend = b;
vbp_set_defaults(vt);
vbp_build_req(vt, hosthdr);
@@ -480,7 +518,6 @@ VBP_Insert(struct backend *b, const struct vrt_backend_probe *p,
vt->happy |= 1;
vbp_has_poked(vt);
}
- vt->backend = b;
b->probe = vt;
vbp_has_poked(vt);
}
@@ -495,11 +532,57 @@ VBP_Remove(struct backend *be)
vt = be->probe;
CHECK_OBJ_NOTNULL(vt, VBP_TARGET_MAGIC);
- Lck_Lock(&vt->mtx);
- vt->stop = 1;
- vt->backend = NULL;
- Lck_Unlock(&vt->mtx);
-
+ Lck_Lock(&vbp_mtx);
be->healthy = 1;
be->probe = NULL;
+ vt->backend = NULL;
+ if (vt->running) {
+ vt->running = -1;
+ vt = NULL;
+ }
+ Lck_Unlock(&vbp_mtx);
+ if (vt != NULL) {
+ VBT_Rel(&vt->tcp_pool);
+ free(vt->req);
+ FREE_OBJ(vt);
+ }
+}
+/*--------------------------------------------------------------------
+ */
+
+static int __match_proto__(binheap_cmp_t)
+vbp_cmp(void *priv, const void *a, const void *b)
+{
+ const struct vbp_target *aa, *bb;
+
+ AZ(priv);
+ CAST_OBJ_NOTNULL(aa, a, VBP_TARGET_MAGIC);
+ CAST_OBJ_NOTNULL(bb, b, VBP_TARGET_MAGIC);
+
+ return (aa->due < bb->due);
+}
+
+static void __match_proto__(binheap_update_t)
+vbp_update(void *priv, void *p, unsigned u)
+{
+ struct vbp_target *vt;
+
+ AZ(priv);
+ CAST_OBJ_NOTNULL(vt, p, VBP_TARGET_MAGIC);
+ vt->heap_idx = u;
+}
+
+/*--------------------------------------------------------------------
+ */
+
+void
+VBP_Init(void)
+{
+ pthread_t thr;
+
+ Lck_New(&vbp_mtx, lck_backend);
+ vbp_heap = binheap_new(NULL, vbp_cmp, vbp_update);
+ AN(vbp_heap);
+ AZ(pthread_cond_init(&vbp_cond, NULL));
+ WRK_BgThread(&thr, "Backend poller", vbp_thread, NULL);
}
diff --git a/bin/varnishd/cache/cache_main.c b/bin/varnishd/cache/cache_main.c
index 00db49d..4c5370f 100644
--- a/bin/varnishd/cache/cache_main.c
+++ b/bin/varnishd/cache/cache_main.c
@@ -33,6 +33,7 @@
#include <stdlib.h>
#include "cache.h"
+#include "cache_backend.h"
#include "common/heritage.h"
#include "vcli_priv.h"
@@ -217,6 +218,7 @@ child_main(void)
HTTP_Init();
VBO_Init();
+ VBP_Init();
VBE_InitCfg();
Pool_Init();
V1P_Init();
diff --git a/bin/varnishd/cache/cache_pool.c b/bin/varnishd/cache/cache_pool.c
index 628b18c..56f11bf 100644
--- a/bin/varnishd/cache/cache_pool.c
+++ b/bin/varnishd/cache/cache_pool.c
@@ -45,6 +45,7 @@ static pthread_t thr_pool_herder;
static struct lock wstat_mtx;
struct lock pool_mtx;
+static VTAILQ_HEAD(,pool) pools = VTAILQ_HEAD_INITIALIZER(pools);
/*--------------------------------------------------------------------
* Summing of stats into global stats counters
@@ -86,6 +87,28 @@ Pool_TrySumstat(struct worker *wrk)
}
/*--------------------------------------------------------------------
+ * Facility for scheduling a task on any convenient pool.
+ */
+
+int
+Pool_Task_Any(struct pool_task *task, enum pool_how how)
+{
+ struct pool *pp;
+
+ Lck_Lock(&pool_mtx);
+ pp = VTAILQ_FIRST(&pools);
+ if (pp != NULL) {
+ VTAILQ_REMOVE(&pools, pp, list);
+ VTAILQ_INSERT_TAIL(&pools, pp, list);
+ }
+ Lck_Unlock(&pool_mtx);
+ if (pp == NULL)
+ return (-1);
+ // NB: When we remove pools, is there a race here ?
+ return (Pool_Task(pp, task, how));
+}
+
+/*--------------------------------------------------------------------
* Helper function to update stats for purges under lock
*/
@@ -163,7 +186,6 @@ static void *
pool_poolherder(void *priv)
{
unsigned nwq;
- VTAILQ_HEAD(,pool) pools = VTAILQ_HEAD_INITIALIZER(pools);
struct pool *pp;
uint64_t u;
@@ -175,7 +197,9 @@ pool_poolherder(void *priv)
if (nwq < cache_param->wthread_pools) {
pp = pool_mkpool(nwq);
if (pp != NULL) {
+ Lck_Lock(&pool_mtx);
VTAILQ_INSERT_TAIL(&pools, pp, list);
+ Lck_Unlock(&pool_mtx);
VSC_C_main->pools++;
nwq++;
continue;
@@ -183,7 +207,10 @@ pool_poolherder(void *priv)
}
/* XXX: remove pools */
if (0) {
+ Lck_Lock(&pool_mtx);
pp = VTAILQ_FIRST(&pools);
+ VTAILQ_REMOVE(&pools, pp, list);
+ Lck_Unlock(&pool_mtx);
AN(pp);
MPL_Destroy(&pp->mpl_sess);
MPL_Destroy(&pp->mpl_req);
@@ -191,8 +218,10 @@ pool_poolherder(void *priv)
}
(void)sleep(1);
u = 0;
+ Lck_Lock(&pool_mtx);
VTAILQ_FOREACH(pp, &pools, list)
u += pp->lqueue;
+ Lck_Unlock(&pool_mtx);
VSC_C_main->thread_queue_len = u;
}
NEEDLESS_RETURN(NULL);
@@ -207,4 +236,6 @@ Pool_Init(void)
Lck_New(&wstat_mtx, lck_wstat);
Lck_New(&pool_mtx, lck_wq);
AZ(pthread_create(&thr_pool_herder, NULL, pool_poolherder, NULL));
+ while (!VSC_C_main->pools)
+ (void)usleep(10000);
}
More information about the varnish-commit
mailing list