[master] 7e25234 Overhaul the thread-pool herding code.
Poul-Henning Kamp
phk at varnish-cache.org
Wed Jun 27 11:54:59 CEST 2012
commit 7e25234d6f25cf1dd622e4d17e70902c99e63b8b
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date: Wed Jun 27 09:51:12 2012 +0000
Overhaul the thread-pool herding code.
NB: Changes to parameter defaults & units included in this commit!
I tried to apply some queuing theory to this problem and as much as
I admire Agner Krarup Erlang, his math isn't much help when you don't
know any of the relevant metrics for your queue.
Instead I took a much simpler approach: "If we fail to get a thread,
we probably need more threads", and have rewritten the herder to
react faster and more reliably to such events.
I went over the parameters for thread-pools and normalized timeouts
to seconds rather than milliseconds (beware!) and polished descriptions
etc.
diff --git a/bin/varnishd/cache/cache_pool.c b/bin/varnishd/cache/cache_pool.c
index a723a9e..fa1f401 100644
--- a/bin/varnishd/cache/cache_pool.c
+++ b/bin/varnishd/cache/cache_pool.c
@@ -31,13 +31,6 @@
* Pools can be added on the fly, as a means to mitigate lock contention,
* but can only be removed again by a restart. (XXX: we could fix that)
*
- * Two threads herd the pools, one eliminates idle threads and aggregates
- * statistics for all the pools, the other thread creates new threads
- * on demand, subject to various numerical constraints.
- *
- * The algorithm for when to create threads needs to be reactive enough
- * to handle startup spikes, but sufficiently attenuated to not cause
- * thread pileups. This remains subject for improvement.
*/
#include "config.h"
@@ -50,36 +43,6 @@
#include "vtim.h"
-/*--------------------------------------------------------------------
- * MAC OS/X is incredibly moronic when it comes to time and such...
- */
-
-#ifndef CLOCK_MONOTONIC
-#define CLOCK_MONOTONIC 0
-
-#include <sys/time.h>
-
-static int
-clock_gettime(int foo, struct timespec *ts)
-{
- struct timeval tv;
-
- (void)foo;
- gettimeofday(&tv, NULL);
- ts->tv_sec = tv.tv_sec;
- ts->tv_nsec = tv.tv_usec * 1000;
- return (0);
-}
-
-static int
-pthread_condattr_setclock(pthread_condattr_t *attr, int foo)
-{
- (void)attr;
- (void)foo;
- return (0);
-}
-#endif /* !CLOCK_MONOTONIC */
-
VTAILQ_HEAD(taskhead, pool_task);
struct poolsock {
@@ -97,7 +60,6 @@ struct pool {
VTAILQ_ENTRY(pool) list;
pthread_cond_t herder_cond;
- struct lock herder_mtx;
pthread_t herder_thr;
struct vxid vxid;
@@ -107,8 +69,8 @@ struct pool {
struct taskhead front_queue;
struct taskhead back_queue;
unsigned nthr;
+ unsigned dry;
unsigned lqueue;
- unsigned last_lqueue;
uintmax_t ndropped;
uintmax_t nqueued;
struct sesspool *sesspool;
@@ -121,19 +83,21 @@ static pthread_t thr_pool_herder;
*/
static struct worker *
-pool_getidleworker(const struct pool *pp, int back)
+pool_getidleworker(struct pool *pp)
{
struct pool_task *pt;
struct worker *wrk;
CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
Lck_AssertHeld(&pp->mtx);
- if (back)
- pt = VTAILQ_LAST(&pp->idle_queue, taskhead);
- else
- pt = VTAILQ_FIRST(&pp->idle_queue);
- if (pt == NULL)
+ pt = VTAILQ_FIRST(&pp->idle_queue);
+ if (pt == NULL) {
+ if (pp->nthr < cache_param->wthread_max) {
+ pp->dry++;
+ AZ(pthread_cond_signal(&pp->herder_cond));
+ }
return (NULL);
+ }
AZ(pt->func);
CAST_OBJ_NOTNULL(wrk, pt->priv, WORKER_MAGIC);
return (wrk);
@@ -185,7 +149,7 @@ pool_accept(struct worker *wrk, void *arg)
Lck_Lock(&pp->mtx);
wa->vxid = VXID_Get(&pp->vxid);
- wrk2 = pool_getidleworker(pp, 0);
+ wrk2 = pool_getidleworker(pp);
if (wrk2 == NULL) {
/* No idle threads, do it ourselves */
Lck_Unlock(&pp->mtx);
@@ -225,7 +189,7 @@ Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how)
* The common case first: Take an idle thread, do it.
*/
- wrk = pool_getidleworker(pp, 0);
+ wrk = pool_getidleworker(pp);
if (wrk != NULL) {
VTAILQ_REMOVE(&pp->idle_queue, &wrk->task, list);
AZ(wrk->task.func);
@@ -242,7 +206,7 @@ Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how)
break;
case POOL_QUEUE_FRONT:
/* If we have too much in the queue already, refuse. */
- if (pp->lqueue > (cache_param->queue_max * pp->nthr) / 100) {
+ if (pp->lqueue > cache_param->wthread_queue_limit) {
pp->ndropped++;
retval = -1;
} else {
@@ -258,8 +222,6 @@ Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how)
WRONG("Unknown enum pool_how");
}
Lck_Unlock(&pp->mtx);
- if (retval)
- AZ(pthread_cond_signal(&pp->herder_cond));
return (retval);
}
@@ -320,7 +282,7 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
}
/*--------------------------------------------------------------------
- * Create another thread, if necessary & possible
+ * Create another thread.
*/
static void
@@ -328,35 +290,23 @@ pool_breed(struct pool *qp, const pthread_attr_t *tp_attr)
{
pthread_t tp;
- /*
- * If we need more threads, and have space, create
- * one more thread.
- */
- if (qp->nthr < cache_param->wthread_min || /* Not enough threads yet */
- (qp->lqueue > cache_param->wthread_add_threshold && /* need more */
- qp->lqueue > qp->last_lqueue)) { /* not getting better since last */
- if (qp->nthr > cache_param->wthread_max) {
- Lck_Lock(&pool_mtx);
- VSC_C_main->threads_limited++;
- Lck_Unlock(&pool_mtx);
- } else if (pthread_create(&tp, tp_attr, WRK_thread, qp)) {
- VSL(SLT_Debug, 0, "Create worker thread failed %d %s",
- errno, strerror(errno));
- Lck_Lock(&pool_mtx);
- VSC_C_main->threads_failed++;
- Lck_Unlock(&pool_mtx);
- VTIM_sleep(cache_param->wthread_fail_delay * 1e-3);
- } else {
- AZ(pthread_detach(tp));
- VTIM_sleep(cache_param->wthread_add_delay * 1e-3);
- qp->nthr++;
- Lck_Lock(&pool_mtx);
- VSC_C_main->threads++;
- VSC_C_main->threads_created++;
- Lck_Unlock(&pool_mtx);
- }
+ if (pthread_create(&tp, tp_attr, WRK_thread, qp)) {
+ VSL(SLT_Debug, 0, "Create worker thread failed %d %s",
+ errno, strerror(errno));
+ Lck_Lock(&pool_mtx);
+ VSC_C_main->threads_failed++;
+ Lck_Unlock(&pool_mtx);
+ VTIM_sleep(cache_param->wthread_fail_delay);
+ } else {
+ AZ(pthread_detach(tp));
+ qp->dry = 0;
+ qp->nthr++;
+ Lck_Lock(&pool_mtx);
+ VSC_C_main->threads++;
+ VSC_C_main->threads_created++;
+ Lck_Unlock(&pool_mtx);
+ VTIM_sleep(cache_param->wthread_add_delay);
}
- qp->last_lqueue = qp->lqueue;
}
/*--------------------------------------------------------------------
@@ -378,11 +328,10 @@ static void*
pool_herder(void *priv)
{
struct pool *pp;
+ struct pool_task *pt;
pthread_attr_t tp_attr;
- struct timespec ts;
double t_idle;
struct worker *wrk;
- int i;
CAST_OBJ_NOTNULL(pp, priv, POOL_MAGIC);
AZ(pthread_attr_init(&tp_attr));
@@ -397,55 +346,56 @@ pool_herder(void *priv)
AZ(pthread_attr_init(&tp_attr));
}
- pool_breed(pp, &tp_attr);
-
- if (pp->nthr < cache_param->wthread_min)
+ /* Make more threads if needed and allowed */
+ if (pp->nthr < cache_param->wthread_min ||
+ (pp->dry && pp->nthr < cache_param->wthread_max)) {
+ pool_breed(pp, &tp_attr);
continue;
-
- AZ(clock_gettime(CLOCK_MONOTONIC, &ts));
- ts.tv_sec += cache_param->wthread_purge_delay / 1000;
- ts.tv_nsec +=
- (cache_param->wthread_purge_delay % 1000) * 1000000;
- if (ts.tv_nsec >= 1000000000) {
- ts.tv_sec++;
- ts.tv_nsec -= 1000000000;
}
- Lck_Lock(&pp->herder_mtx);
- i = Lck_CondWait(&pp->herder_cond, &pp->herder_mtx, &ts);
- Lck_Unlock(&pp->herder_mtx);
- if (!i)
- continue;
+ if (pp->nthr > cache_param->wthread_min) {
- if (pp->nthr <= cache_param->wthread_min)
- continue;
+ t_idle = VTIM_real() - cache_param->wthread_timeout;
- t_idle = VTIM_real() - cache_param->wthread_timeout;
+ Lck_Lock(&pp->mtx);
+ /* XXX: unsafe counters */
+ VSC_C_main->sess_queued += pp->nqueued;
+ VSC_C_main->sess_dropped += pp->ndropped;
+ pp->nqueued = pp->ndropped = 0;
- Lck_Lock(&pp->mtx);
- VSC_C_main->sess_queued += pp->nqueued;
- VSC_C_main->sess_dropped += pp->ndropped;
- pp->nqueued = pp->ndropped = 0;
- wrk = pool_getidleworker(pp, 1);
- if (wrk != NULL && (wrk->lastused < t_idle ||
- pp->nthr > cache_param->wthread_max)) {
- VTAILQ_REMOVE(&pp->idle_queue, &wrk->task, list);
- AZ(wrk->task.func);
- } else
wrk = NULL;
- Lck_Unlock(&pp->mtx);
+ pt = VTAILQ_LAST(&pp->idle_queue, taskhead);
+ if (pt != NULL) {
+ AZ(pt->func);
+ CAST_OBJ_NOTNULL(wrk, pt->priv, WORKER_MAGIC);
+
+ if (wrk->lastused < t_idle ||
+ pp->nthr > cache_param->wthread_max)
+ VTAILQ_REMOVE(&pp->idle_queue,
+ &wrk->task, list);
+ else
+ wrk = NULL;
+ }
+ Lck_Unlock(&pp->mtx);
- /* And give it a kiss on the cheek... */
- if (wrk != NULL) {
- pp->nthr--;
- Lck_Lock(&pool_mtx);
- VSC_C_main->threads--;
- VSC_C_main->threads_destroyed++;
- Lck_Unlock(&pool_mtx);
- wrk->task.func = NULL;
- wrk->task.priv = NULL;
- AZ(pthread_cond_signal(&wrk->cond));
+ /* And give it a kiss on the cheek... */
+ if (wrk != NULL) {
+ pp->nthr--;
+ Lck_Lock(&pool_mtx);
+ VSC_C_main->threads--;
+ VSC_C_main->threads_destroyed++;
+ Lck_Unlock(&pool_mtx);
+ wrk->task.func = NULL;
+ wrk->task.priv = NULL;
+ VTIM_sleep(cache_param->wthread_destroy_delay);
+ continue;
+ }
}
+
+ Lck_Lock(&pp->mtx);
+ if (!pp->dry)
+ (void)Lck_CondWait(&pp->herder_cond, &pp->mtx, NULL);
+ Lck_Unlock(&pp->mtx);
}
NEEDLESS_RETURN(NULL);
}
@@ -460,10 +410,10 @@ pool_mkpool(unsigned pool_no)
struct pool *pp;
struct listen_sock *ls;
struct poolsock *ps;
- pthread_condattr_t cv_attr;
ALLOC_OBJ(pp, POOL_MAGIC);
- XXXAN(pp);
+ if (pp == NULL)
+ return (NULL);
Lck_New(&pp->mtx, lck_wq);
VTAILQ_INIT(&pp->idle_queue);
@@ -483,11 +433,7 @@ pool_mkpool(unsigned pool_no)
AZ(Pool_Task(pp, &ps->task, POOL_QUEUE_BACK));
}
- AZ(pthread_condattr_init(&cv_attr));
- AZ(pthread_condattr_setclock(&cv_attr, CLOCK_MONOTONIC));
- AZ(pthread_cond_init(&pp->herder_cond, &cv_attr));
- AZ(pthread_condattr_destroy(&cv_attr));
- Lck_New(&pp->herder_mtx, lck_herder);
+ AZ(pthread_cond_init(&pp->herder_cond, NULL));
AZ(pthread_create(&pp->herder_thr, NULL, pool_herder, pp));
return (pp);
diff --git a/bin/varnishd/common/params.h b/bin/varnishd/common/params.h
index 2e8bf37..a66bcf8 100644
--- a/bin/varnishd/common/params.h
+++ b/bin/varnishd/common/params.h
@@ -62,16 +62,15 @@ struct params {
/* Worker threads and pool */
unsigned wthread_min;
unsigned wthread_max;
- unsigned wthread_timeout;
+ double wthread_timeout;
unsigned wthread_pools;
unsigned wthread_add_threshold;
- unsigned wthread_add_delay;
- unsigned wthread_fail_delay;
- unsigned wthread_purge_delay;
- unsigned wthread_stats_rate;
+ double wthread_add_delay;
+ double wthread_fail_delay;
+ double wthread_destroy_delay;
+ double wthread_stats_rate;
ssize_t wthread_stacksize;
-
- unsigned queue_max;
+ unsigned wthread_queue_limit;
/* Memory allocation hints */
unsigned workspace_client;
diff --git a/bin/varnishd/mgt/mgt_param.c b/bin/varnishd/mgt/mgt_param.c
index c425882..859ef14 100644
--- a/bin/varnishd/mgt/mgt_param.c
+++ b/bin/varnishd/mgt/mgt_param.c
@@ -93,7 +93,7 @@ tweak_generic_timeout(struct cli *cli, volatile unsigned *dst, const char *arg)
/*--------------------------------------------------------------------*/
-void
+static void
tweak_timeout(struct cli *cli, const struct parspec *par, const char *arg)
{
volatile unsigned *dest;
@@ -137,7 +137,7 @@ tweak_generic_timeout_double(struct cli *cli, volatile double *dest,
return (0);
}
-static void
+void
tweak_timeout_double(struct cli *cli, const struct parspec *par,
const char *arg)
{
diff --git a/bin/varnishd/mgt/mgt_param.h b/bin/varnishd/mgt/mgt_param.h
index b4538d2..ee1ad52 100644
--- a/bin/varnishd/mgt/mgt_param.h
+++ b/bin/varnishd/mgt/mgt_param.h
@@ -53,7 +53,7 @@ struct parspec {
int tweak_generic_uint(struct cli *cli,
volatile unsigned *dest, const char *arg, unsigned min, unsigned max);
void tweak_uint(struct cli *cli, const struct parspec *par, const char *arg);
-void tweak_timeout(struct cli *cli,
+void tweak_timeout_double(struct cli *cli,
const struct parspec *par, const char *arg);
void tweak_bytes(struct cli *cli, const struct parspec *par, const char *arg);
diff --git a/bin/varnishd/mgt/mgt_pool.c b/bin/varnishd/mgt/mgt_pool.c
index 986d405..385a500 100644
--- a/bin/varnishd/mgt/mgt_pool.c
+++ b/bin/varnishd/mgt/mgt_pool.c
@@ -97,7 +97,8 @@ tweak_thread_pool_max(struct cli *cli, const struct parspec *par,
/*--------------------------------------------------------------------*/
const struct parspec WRK_parspec[] = {
- { "thread_pools", tweak_uint, &mgt_param.wthread_pools, 1, UINT_MAX,
+ { "thread_pools", tweak_uint, &mgt_param.wthread_pools,
+ 1, UINT_MAX,
"Number of worker thread pools.\n"
"\n"
"Increasing number of worker pools decreases lock "
@@ -110,71 +111,69 @@ const struct parspec WRK_parspec[] = {
"restart to take effect.",
EXPERIMENTAL | DELAYED_EFFECT,
"2", "pools" },
- { "thread_pool_max", tweak_thread_pool_max, NULL, 1, 0,
+ { "thread_pool_max", tweak_thread_pool_max, NULL, 10, 0,
"The maximum number of worker threads in each pool.\n"
"\n"
"Do not set this higher than you have to, since excess "
"worker threads soak up RAM and CPU and generally just get "
- "in the way of getting work done.\n",
- EXPERIMENTAL | DELAYED_EFFECT,
- "500", "threads" },
- { "thread_pool_min", tweak_thread_pool_min, NULL, 2, 0,
+ "in the way of getting work done.\n"
+ "\n"
+ "Minimum is 10 threads.",
+ DELAYED_EFFECT,
+ "5000", "threads" },
+ { "thread_pool_min", tweak_thread_pool_min, NULL, 10, 0,
"The minimum number of worker threads in each pool.\n"
"\n"
"Increasing this may help ramp up faster from low load "
- "situations where threads have expired.\n"
- "\n"
- "Minimum is 2 threads.",
- EXPERIMENTAL | DELAYED_EFFECT,
- "5", "threads" },
- { "thread_pool_timeout", tweak_timeout, &mgt_param.wthread_timeout,
- 1, 0,
+ "situations or when threads have expired.\n"
+ "\n"
+ "Minimum is 10 threads.",
+ DELAYED_EFFECT,
+ "100", "threads" },
+ { "thread_pool_timeout",
+ tweak_timeout_double, &mgt_param.wthread_timeout,
+ 10, UINT_MAX,
"Thread idle threshold.\n"
"\n"
"Threads in excess of thread_pool_min, which have been idle "
- "for at least this long are candidates for purging.\n"
+ "for at least this long, will be destroyed.\n"
"\n"
- "Minimum is 1 second.",
+ "Minimum is 10 seconds.",
EXPERIMENTAL | DELAYED_EFFECT,
"300", "seconds" },
- { "thread_pool_purge_delay",
- tweak_timeout, &mgt_param.wthread_purge_delay, 100, 0,
- "Wait this long between purging threads.\n"
+ { "thread_pool_destroy_delay",
+ tweak_timeout_double, &mgt_param.wthread_destroy_delay,
+ 0.01, UINT_MAX,
+ "Wait this long after destroying a thread.\n"
"\n"
"This controls the decay of thread pools when idle(-ish).\n"
"\n"
- "Minimum is 100 milliseconds.",
+ "Minimum is 0.01 second.",
EXPERIMENTAL | DELAYED_EFFECT,
- "1000", "milliseconds" },
- { "thread_pool_add_threshold",
- tweak_uint, &mgt_param.wthread_add_threshold, 0, UINT_MAX,
- "Overflow threshold for worker thread creation.\n"
- "\n"
- "Setting this too low, will result in excess worker threads, "
- "which is generally a bad idea.\n"
- "\n"
- "Setting it too high results in insuffient worker threads.\n",
- EXPERIMENTAL,
- "2", "requests" },
+ "1", "seconds" },
{ "thread_pool_add_delay",
- tweak_timeout, &mgt_param.wthread_add_delay, 0, UINT_MAX,
- "Wait at least this long between creating threads.\n"
+ tweak_timeout_double, &mgt_param.wthread_add_delay,
+ 0, UINT_MAX,
+ "Wait at least this long after creating a thread.\n"
"\n"
- "Setting this too long results in insuffient worker threads.\n"
+ "Some (buggy) systems may need a short (sub-second) "
+ "delay between creating threads.\n"
+ "Set this to a few milliseconds if you see the "
+ "'threads_failed' counter grow too much.\n"
"\n"
- "Setting this too short increases the risk of worker "
- "thread pile-up.\n",
- 0,
- "2", "milliseconds" },
+ "Setting this too high results in insuffient worker threads.\n",
+ EXPERIMENTAL,
+ "0", "seconds" },
{ "thread_pool_fail_delay",
- tweak_timeout, &mgt_param.wthread_fail_delay, 100, UINT_MAX,
+ tweak_timeout_double, &mgt_param.wthread_fail_delay,
+ 10e-3, UINT_MAX,
"Wait at least this long after a failed thread creation "
"before trying to create another thread.\n"
"\n"
"Failure to create a worker thread is often a sign that "
" the end is near, because the process is running out of "
- "RAM resources for thread stacks.\n"
- "This delay tries to not rush it on needlessly.\n"
+ "some resource. "
+ "This delay tries to not rush the end on needlessly.\n"
"\n"
"If thread creation failures are a problem, check that "
"thread_pool_max is not too high.\n"
@@ -183,7 +182,7 @@ const struct parspec WRK_parspec[] = {
"thread_pool_min, to reduce the rate at which treads are "
"destroyed and later recreated.\n",
EXPERIMENTAL,
- "200", "milliseconds" },
+ "0.2", "seconds" },
{ "thread_stats_rate",
tweak_uint, &mgt_param.wthread_stats_rate, 0, UINT_MAX,
"Worker threads accumulate statistics, and dump these into "
@@ -194,13 +193,15 @@ const struct parspec WRK_parspec[] = {
"its accumulated stats into the global counters.\n",
EXPERIMENTAL,
"10", "requests" },
- { "queue_max", tweak_uint, &mgt_param.queue_max, 0, UINT_MAX,
- "Percentage permitted queue length.\n"
+ { "thread_queue_limit", tweak_uint, &mgt_param.wthread_queue_limit,
+ 0, UINT_MAX,
+ "Permitted queue length per thread-pool.\n"
"\n"
- "This sets the ratio of queued requests to worker threads, "
- "above which sessions will be dropped instead of queued.\n",
+ "This sets the number of requests we will queue, waiting "
+ "for an available thread. Above this limit sessions will "
+ "be dropped instead of queued.\n",
EXPERIMENTAL,
- "100", "%" },
+ "20", "" },
{ "rush_exponent", tweak_uint, &mgt_param.rush_exponent, 2, UINT_MAX,
"How many parked request we start for each completed "
"request on the object.\n"
diff --git a/bin/varnishtest/tests/c00002.vtc b/bin/varnishtest/tests/c00002.vtc
index 6914736..5351fec 100644
--- a/bin/varnishtest/tests/c00002.vtc
+++ b/bin/varnishtest/tests/c00002.vtc
@@ -5,7 +5,7 @@ server s1 {
txresp -hdr "Connection: close" -body "012345\n"
} -start
-varnish v1 -arg "-p thread_pool_min=2 -p thread_pool_max=8 -p thread_pools=4 -p thread_pool_purge_delay=100 -p thread_pool_timeout=1 -p thread_pool_add_delay=100"
+varnish v1 -arg "-p thread_pool_min=10 -p thread_pool_max=10 -p thread_pools=2"
varnish v1 -vcl+backend {} -start
@@ -16,4 +16,4 @@ client c1 {
expect resp.status == 200
} -run
-varnish v1 -expect threads == 8
+varnish v1 -expect threads == 20
More information about the varnish-commit
mailing list