[PATCH 01/25] Functionality for a worker to grab another worker to perform a background task. Will be used for streamed fetching.
Martin Blix Grydeland
martin at varnish-software.com
Sun Jan 22 18:53:07 CET 2012
Thread grabbing statistics
---
bin/varnishd/cache/cache.h | 16 ++++++
bin/varnishd/cache/cache_pool.c | 106 +++++++++++++++++++++++++++++++++++-
bin/varnishd/cache/cache_session.c | 21 +++++++
bin/varnishd/cache/cache_wrk.c | 18 ++++++
include/tbl/vsc_f_main.h | 15 +++++
5 files changed, 174 insertions(+), 2 deletions(-)
diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index 817b7c7..11f0f6a 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -294,8 +294,11 @@ enum e_do_what {
pool_do_accept,
pool_do_nothing,
pool_do_die,
+ pool_do_task,
};
+typedef void taskfunc(struct worker *, void *priv);
+
struct worker {
unsigned magic;
#define WORKER_MAGIC 0x6391adcf
@@ -331,6 +334,15 @@ struct worker {
/* Stream state */
struct stream_ctx *sctx;
+ /* Task */
+ taskfunc *taskfunc;
+ void *taskpriv;
+
+ /* ESI delivery stuff */
+ int gzip_resp;
+ ssize_t l_crc;
+ uint32_t crc;
+
/* Timeouts */
double connect_timeout;
double first_byte_timeout;
@@ -894,6 +906,8 @@ void PipeSession(struct sess *sp);
void Pool_Init(void);
void Pool_Work_Thread(void *priv, struct worker *w);
int Pool_Schedule(struct pool *pp, struct sess *sp);
+struct worker *Pool_GrabWorker(struct pool *pp, struct worker *wrk,
+ unsigned timeout);
#define WRW_IsReleased(w) ((w)->wrw.wfd == NULL)
int WRW_Error(const struct worker *w);
@@ -920,6 +934,7 @@ int SES_Schedule(struct sess *sp);
void SES_Handle(struct sess *sp, double now);
void SES_GetReq(struct sess *sp);
void SES_ReleaseReq(struct sess *sp);
+struct worker *SES_GrabWorker(struct sess *sp, unsigned timeout);
/* cache_shmlog.c */
extern struct VSC_C_main *VSC_C_main;
@@ -1007,6 +1022,7 @@ void *WRK_thread(void *priv);
typedef void *bgthread_t(struct sess *, void *priv);
void WRK_BgThread(pthread_t *thr, const char *name, bgthread_t *func,
void *priv);
+void WRK_DoTask(struct worker *wrk, taskfunc *func, void *priv);
/* cache_ws.c */
diff --git a/bin/varnishd/cache/cache_pool.c b/bin/varnishd/cache/cache_pool.c
index 84a6cfd..89c96ec 100644
--- a/bin/varnishd/cache/cache_pool.c
+++ b/bin/varnishd/cache/cache_pool.c
@@ -89,6 +89,14 @@ struct poolsock {
struct listen_sock *lsock;
};
+struct grabber {
+ unsigned magic;
+#define GRABBER_MAGIC 0x1f6f53f5
+ VTAILQ_ENTRY(grabber) list;
+ struct worker *wrk_waiting;
+ struct worker *wrk_grabbed;
+};
+
/* Number of work requests queued in excess of worker threads available */
struct pool {
@@ -104,6 +112,7 @@ struct pool {
struct workerhead idle;
VTAILQ_HEAD(, sess) queue;
VTAILQ_HEAD(, poolsock) socks;
+ VTAILQ_HEAD(, grabber) grabqueue;
unsigned nthr;
unsigned lqueue;
unsigned last_lqueue;
@@ -183,6 +192,7 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
struct pool *pp;
int stats_clean, i;
struct poolsock *ps;
+ struct grabber *grabber;
CAST_OBJ_NOTNULL(pp, priv, POOL_MAGIC);
wrk->pool = pp;
@@ -197,8 +207,22 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
WS_Reset(wrk->ws, NULL);
- wrk->sp = VTAILQ_FIRST(&pp->queue);
- if (wrk->sp != NULL) {
+ grabber = VTAILQ_FIRST(&pp->grabqueue);
+ if (grabber == NULL)
+ wrk->sp = VTAILQ_FIRST(&pp->queue);
+ if (grabber != NULL) {
+ /* We've been grabbed */
+ assert(pp->lqueue > 0);
+ VTAILQ_REMOVE(&pp->grabqueue, grabber, list);
+ pp->lqueue--;
+ CHECK_OBJ_NOTNULL(grabber, GRABBER_MAGIC);
+ CHECK_OBJ_NOTNULL(grabber->wrk_waiting, WORKER_MAGIC);
+ AZ(grabber->wrk_grabbed);
+ grabber->wrk_grabbed = wrk;
+ AZ(pthread_cond_signal(&grabber->wrk_waiting->cond));
+ /* Wait for work to be assigned to us */
+ (void)Lck_CondWait(&wrk->cond, &pp->mtx, NULL);
+ } else if (wrk->sp != NULL) {
/* Process queued requests, if any */
assert(pp->lqueue > 0);
VTAILQ_REMOVE(&pp->queue, wrk->sp, list);
@@ -271,6 +295,18 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
if (wrk->vcl != NULL)
VCL_Rel(&wrk->vcl);
}
+ } else if (wrk->do_what == pool_do_task) {
+ stats_clean = 0;
+ wrk->lastused = NAN;
+
+ AZ(wrk->sp);
+ AN(wrk->taskfunc);
+ wrk->taskfunc(wrk, wrk->taskpriv);
+ wrk->taskfunc = NULL;
+ wrk->taskpriv = NULL;
+
+ WS_Assert(wrk->ws);
+ AZ(wrk->busyobj);
} else if (wrk->do_what == pool_do_nothing) {
/* we already did */
} else {
@@ -334,6 +370,71 @@ Pool_Schedule(struct pool *pp, struct sess *sp)
}
/*--------------------------------------------------------------------
+ * Grab an idle worker to do a task, waiting up to timeout
+ * milliseconds for one to become idle.
+ * Returns:
+ * pointer to an idle worker
+ * NULL on failure to acquire idle worker within timeout
+ */
+
+struct worker *
+Pool_GrabWorker(struct pool *pp, struct worker *wrk, unsigned timeout)
+{
+ struct grabber grabber;
+ struct timespec ts;
+
+ CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
+ CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+
+ memset(&grabber, 0, sizeof grabber);
+ grabber.magic = GRABBER_MAGIC;
+
+ Lck_Lock(&pp->mtx);
+
+ grabber.wrk_grabbed = VTAILQ_FIRST(&pp->idle);
+ if (grabber.wrk_grabbed != NULL) {
+ VTAILQ_REMOVE(&pp->idle, grabber.wrk_grabbed, list);
+ VSC_C_main->threadgrab_idle++;
+ }
+
+ if (grabber.wrk_grabbed == NULL && timeout > 0 &&
+ pp->lqueue < (cache_param->queue_max * pp->nthr) / 100) {
+ /* Wait up to timeout milliseconds for a worker to
+ * become idle */
+ AZ(clock_gettime(CLOCK_REALTIME, &ts));
+ ts.tv_sec += timeout / 1000;
+ ts.tv_nsec += (timeout % 1000) * 1000000;
+ if (ts.tv_nsec >= 1000000000) {
+ ts.tv_sec++;
+ ts.tv_nsec -= 1000000000;
+ }
+ grabber.wrk_waiting = wrk;
+ VTAILQ_INSERT_TAIL(&pp->grabqueue, &grabber, list);
+ pp->lqueue++;
+ AZ(pthread_cond_signal(&pp->herder_cond));
+ (void)Lck_CondWait(&wrk->cond, &pp->mtx, &ts);
+ if (grabber.wrk_grabbed != NULL) {
+ VSC_C_main->threadgrab_waiting++;
+ } else {
+ VTAILQ_REMOVE(&pp->grabqueue, &grabber, list);
+ pp->lqueue--;
+ }
+ }
+
+ if (grabber.wrk_grabbed != NULL) {
+ CHECK_OBJ_NOTNULL(grabber.wrk_grabbed, WORKER_MAGIC);
+ AZ(grabber.wrk_grabbed->taskfunc);
+ AZ(grabber.wrk_grabbed->taskpriv);
+ } else {
+ VSC_C_main->threadgrab_failed++;
+ }
+
+ Lck_Unlock(&pp->mtx);
+
+ return (grabber.wrk_grabbed);
+}
+
+/*--------------------------------------------------------------------
* Create another thread, if necessary & possible
*/
@@ -480,6 +581,7 @@ pool_mkpool(unsigned pool_no)
XXXAN(pp);
Lck_New(&pp->mtx, lck_wq);
+ VTAILQ_INIT(&pp->grabqueue);
VTAILQ_INIT(&pp->queue);
VTAILQ_INIT(&pp->idle);
VTAILQ_INIT(&pp->socks);
diff --git a/bin/varnishd/cache/cache_session.c b/bin/varnishd/cache/cache_session.c
index d1dbaba..ca93bb5 100644
--- a/bin/varnishd/cache/cache_session.c
+++ b/bin/varnishd/cache/cache_session.c
@@ -159,6 +159,27 @@ SES_Schedule(struct sess *sp)
}
/*--------------------------------------------------------------------
+ * Grab an idle worker to do a task, waiting up to timeout
+ * milliseconds for one to become idle.
+ * Returns:
+ * pointer to an idle woker
+ * NULL on failure to acquire idle worker within timeout
+ */
+
+struct worker *
+SES_GrabWorker(struct sess *sp, unsigned timeout)
+{
+ struct sesspool *pp;
+
+ CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+ pp = sp->sesspool;
+ CHECK_OBJ_NOTNULL(pp, SESSPOOL_MAGIC);
+ AN(pp->pool);
+
+ return (Pool_GrabWorker(pp->pool, sp->wrk, timeout));
+}
+
+/*--------------------------------------------------------------------
* Handle a session (from waiter)
*/
diff --git a/bin/varnishd/cache/cache_wrk.c b/bin/varnishd/cache/cache_wrk.c
index 6087991..e866aa1 100644
--- a/bin/varnishd/cache/cache_wrk.c
+++ b/bin/varnishd/cache/cache_wrk.c
@@ -128,6 +128,24 @@ WRK_BgThread(pthread_t *thr, const char *name, bgthread_t *func, void *priv)
AZ(pthread_create(thr, NULL, wrk_bgthread, bt));
}
+/*--------------------------------------------------------------------
+ * Make the referenced worker execute the given task
+ */
+
+void
+WRK_DoTask(struct worker *wrk, taskfunc *func, void *priv)
+{
+ CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+ AN(func);
+
+ AZ(wrk->taskfunc);
+ AZ(wrk->taskpriv);
+ wrk->taskfunc = func;
+ wrk->taskpriv = priv;
+ wrk->do_what = pool_do_task;
+ AZ(pthread_cond_signal(&wrk->cond));
+}
+
/*--------------------------------------------------------------------*/
static void *
diff --git a/include/tbl/vsc_f_main.h b/include/tbl/vsc_f_main.h
index df7984a..b6b8dc9 100644
--- a/include/tbl/vsc_f_main.h
+++ b/include/tbl/vsc_f_main.h
@@ -193,6 +193,21 @@ VSC_F(thread_queue_len, uint64_t, 0, 'g',
" See also param queue_max."
)
+VSC_F(threadgrab_idle, uint64_t, 0, 'c',
+ "Threads grabbed while idle",
+ "Number of times an idle thread was grabbed."
+)
+
+VSC_F(threadgrab_waiting, uint64_t, 0, 'c',
+ "Threads grabbed while waiting",
+ "Number of times an idle thread was grabbed while waiting."
+)
+
+VSC_F(threadgrab_failed, uint64_t, 0, 'c',
+ "Thread grab failed",
+ "Number of times failed to grab a thread within the timeout."
+)
+
VSC_F(sess_queued, uint64_t, 0, 'c',
"Sessions queued for thread",
"Number of times session was queued waiting for a thread."
--
1.7.4.1
More information about the varnish-dev
mailing list