[PATCH 01/25] Functionality for a worker to grab another worker to perform a background task. Will be used for streamed fetching.

Martin Blix Grydeland martin at varnish-software.com
Sun Jan 22 18:53:07 CET 2012


Thread grabbing statistics
---
 bin/varnishd/cache/cache.h         |   16 ++++++
 bin/varnishd/cache/cache_pool.c    |  106 +++++++++++++++++++++++++++++++++++-
 bin/varnishd/cache/cache_session.c |   21 +++++++
 bin/varnishd/cache/cache_wrk.c     |   18 ++++++
 include/tbl/vsc_f_main.h           |   15 +++++
 5 files changed, 174 insertions(+), 2 deletions(-)

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index 817b7c7..11f0f6a 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -294,8 +294,11 @@ enum e_do_what {
 	pool_do_accept,
 	pool_do_nothing,
 	pool_do_die,
+	pool_do_task,
 };
 
+typedef void taskfunc(struct worker *, void *priv);
+
 struct worker {
 	unsigned		magic;
 #define WORKER_MAGIC		0x6391adcf
@@ -331,6 +334,15 @@ struct worker {
 	/* Stream state */
 	struct stream_ctx	*sctx;
 
+	/* Task */
+	taskfunc		*taskfunc;
+	void			*taskpriv;
+
+	/* ESI delivery stuff */
+	int			gzip_resp;
+	ssize_t			l_crc;
+	uint32_t		crc;
+
 	/* Timeouts */
 	double			connect_timeout;
 	double			first_byte_timeout;
@@ -894,6 +906,8 @@ void PipeSession(struct sess *sp);
 void Pool_Init(void);
 void Pool_Work_Thread(void *priv, struct worker *w);
 int Pool_Schedule(struct pool *pp, struct sess *sp);
+struct worker *Pool_GrabWorker(struct pool *pp, struct worker *wrk,
+			       unsigned timeout);
 
 #define WRW_IsReleased(w)	((w)->wrw.wfd == NULL)
 int WRW_Error(const struct worker *w);
@@ -920,6 +934,7 @@ int SES_Schedule(struct sess *sp);
 void SES_Handle(struct sess *sp, double now);
 void SES_GetReq(struct sess *sp);
 void SES_ReleaseReq(struct sess *sp);
+struct worker *SES_GrabWorker(struct sess *sp, unsigned timeout);
 
 /* cache_shmlog.c */
 extern struct VSC_C_main *VSC_C_main;
@@ -1007,6 +1022,7 @@ void *WRK_thread(void *priv);
 typedef void *bgthread_t(struct sess *, void *priv);
 void WRK_BgThread(pthread_t *thr, const char *name, bgthread_t *func,
     void *priv);
+void WRK_DoTask(struct worker *wrk, taskfunc *func, void *priv);
 
 /* cache_ws.c */
 
diff --git a/bin/varnishd/cache/cache_pool.c b/bin/varnishd/cache/cache_pool.c
index 84a6cfd..89c96ec 100644
--- a/bin/varnishd/cache/cache_pool.c
+++ b/bin/varnishd/cache/cache_pool.c
@@ -89,6 +89,14 @@ struct poolsock {
 	struct listen_sock		*lsock;
 };
 
+struct grabber {
+	unsigned			magic;
+#define GRABBER_MAGIC			0x1f6f53f5
+	VTAILQ_ENTRY(grabber)		list;
+	struct worker			*wrk_waiting;
+	struct worker			*wrk_grabbed;
+};
+
 /* Number of work requests queued in excess of worker threads available */
 
 struct pool {
@@ -104,6 +112,7 @@ struct pool {
 	struct workerhead	idle;
 	VTAILQ_HEAD(, sess)	queue;
 	VTAILQ_HEAD(, poolsock)	socks;
+	VTAILQ_HEAD(, grabber)	grabqueue;
 	unsigned		nthr;
 	unsigned		lqueue;
 	unsigned		last_lqueue;
@@ -183,6 +192,7 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
 	struct pool *pp;
 	int stats_clean, i;
 	struct poolsock *ps;
+	struct grabber *grabber;
 
 	CAST_OBJ_NOTNULL(pp, priv, POOL_MAGIC);
 	wrk->pool = pp;
@@ -197,8 +207,22 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
 
 		WS_Reset(wrk->ws, NULL);
 
-		wrk->sp = VTAILQ_FIRST(&pp->queue);
-		if (wrk->sp != NULL) {
+		grabber = VTAILQ_FIRST(&pp->grabqueue);
+		if (grabber == NULL)
+			wrk->sp = VTAILQ_FIRST(&pp->queue);
+		if (grabber != NULL) {
+			/* We've been grabbed */
+			assert(pp->lqueue > 0);
+			VTAILQ_REMOVE(&pp->grabqueue, grabber, list);
+			pp->lqueue--;
+			CHECK_OBJ_NOTNULL(grabber, GRABBER_MAGIC);
+			CHECK_OBJ_NOTNULL(grabber->wrk_waiting, WORKER_MAGIC);
+			AZ(grabber->wrk_grabbed);
+			grabber->wrk_grabbed = wrk;
+			AZ(pthread_cond_signal(&grabber->wrk_waiting->cond));
+			/* Wait for work to be assigned to us */
+			(void)Lck_CondWait(&wrk->cond, &pp->mtx, NULL);
+		} else if (wrk->sp != NULL) {
 			/* Process queued requests, if any */
 			assert(pp->lqueue > 0);
 			VTAILQ_REMOVE(&pp->queue, wrk->sp, list);
@@ -271,6 +295,18 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
 				if (wrk->vcl != NULL)
 					VCL_Rel(&wrk->vcl);
 			}
+		} else if (wrk->do_what == pool_do_task) {
+			stats_clean = 0;
+			wrk->lastused = NAN;
+
+			AZ(wrk->sp);
+			AN(wrk->taskfunc);
+			wrk->taskfunc(wrk, wrk->taskpriv);
+			wrk->taskfunc = NULL;
+			wrk->taskpriv = NULL;
+
+			WS_Assert(wrk->ws);
+			AZ(wrk->busyobj);
 		} else if (wrk->do_what == pool_do_nothing) {
 			/* we already did */
 		} else {
@@ -334,6 +370,71 @@ Pool_Schedule(struct pool *pp, struct sess *sp)
 }
 
 /*--------------------------------------------------------------------
+ * Grab an idle worker to do a task, waiting up to timeout
+ * milliseconds for one to become idle.
+ * Returns:
+ *   pointer to an idle worker
+ *   NULL on failure to acquire idle worker within timeout
+ */
+
+struct worker *
+Pool_GrabWorker(struct pool *pp, struct worker *wrk, unsigned timeout)
+{
+	struct grabber grabber;
+	struct timespec ts;
+
+	CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
+	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+
+	memset(&grabber, 0, sizeof grabber);
+	grabber.magic = GRABBER_MAGIC;
+
+	Lck_Lock(&pp->mtx);
+
+	grabber.wrk_grabbed = VTAILQ_FIRST(&pp->idle);
+	if (grabber.wrk_grabbed != NULL) {
+		VTAILQ_REMOVE(&pp->idle, grabber.wrk_grabbed, list);
+		VSC_C_main->threadgrab_idle++;
+	}
+
+	if (grabber.wrk_grabbed == NULL && timeout > 0 &&
+	    pp->lqueue < (cache_param->queue_max * pp->nthr) / 100) {
+		/* Wait up to timeout milliseconds for a worker to
+		 * become idle */
+		AZ(clock_gettime(CLOCK_REALTIME, &ts));
+		ts.tv_sec += timeout / 1000;
+		ts.tv_nsec += (timeout % 1000) *  1000000;
+		if (ts.tv_nsec >= 1000000000) {
+			ts.tv_sec++;
+			ts.tv_nsec -= 1000000000;
+		}
+		grabber.wrk_waiting = wrk;
+		VTAILQ_INSERT_TAIL(&pp->grabqueue, &grabber, list);
+		pp->lqueue++;
+		AZ(pthread_cond_signal(&pp->herder_cond));
+		(void)Lck_CondWait(&wrk->cond, &pp->mtx, &ts);
+		if (grabber.wrk_grabbed != NULL) {
+			VSC_C_main->threadgrab_waiting++;
+		} else {
+			VTAILQ_REMOVE(&pp->grabqueue, &grabber, list);
+			pp->lqueue--;
+		}
+	}
+
+	if (grabber.wrk_grabbed != NULL) {
+		CHECK_OBJ_NOTNULL(grabber.wrk_grabbed, WORKER_MAGIC);
+		AZ(grabber.wrk_grabbed->taskfunc);
+		AZ(grabber.wrk_grabbed->taskpriv);
+	} else {
+		VSC_C_main->threadgrab_failed++;
+	}
+
+	Lck_Unlock(&pp->mtx);
+
+	return (grabber.wrk_grabbed);
+}
+
+/*--------------------------------------------------------------------
  * Create another thread, if necessary & possible
  */
 
@@ -480,6 +581,7 @@ pool_mkpool(unsigned pool_no)
 	XXXAN(pp);
 	Lck_New(&pp->mtx, lck_wq);
 
+	VTAILQ_INIT(&pp->grabqueue);
 	VTAILQ_INIT(&pp->queue);
 	VTAILQ_INIT(&pp->idle);
 	VTAILQ_INIT(&pp->socks);
diff --git a/bin/varnishd/cache/cache_session.c b/bin/varnishd/cache/cache_session.c
index d1dbaba..ca93bb5 100644
--- a/bin/varnishd/cache/cache_session.c
+++ b/bin/varnishd/cache/cache_session.c
@@ -159,6 +159,27 @@ SES_Schedule(struct sess *sp)
 }
 
 /*--------------------------------------------------------------------
+ * Grab an idle worker to do a task, waiting up to timeout
+ * milliseconds for one to become idle.
+ * Returns:
+ *   pointer to an idle woker
+ *   NULL on failure to acquire idle worker within timeout
+ */
+
+struct worker *
+SES_GrabWorker(struct sess *sp, unsigned timeout)
+{
+	struct sesspool *pp;
+
+	CHECK_OBJ_NOTNULL(sp, SESS_MAGIC);
+	pp = sp->sesspool;
+	CHECK_OBJ_NOTNULL(pp, SESSPOOL_MAGIC);
+	AN(pp->pool);
+
+	return (Pool_GrabWorker(pp->pool, sp->wrk, timeout));
+}
+
+/*--------------------------------------------------------------------
  * Handle a session (from waiter)
  */
 
diff --git a/bin/varnishd/cache/cache_wrk.c b/bin/varnishd/cache/cache_wrk.c
index 6087991..e866aa1 100644
--- a/bin/varnishd/cache/cache_wrk.c
+++ b/bin/varnishd/cache/cache_wrk.c
@@ -128,6 +128,24 @@ WRK_BgThread(pthread_t *thr, const char *name, bgthread_t *func, void *priv)
 	AZ(pthread_create(thr, NULL, wrk_bgthread, bt));
 }
 
+/*--------------------------------------------------------------------
+ * Make the referenced worker execute the given task
+ */
+
+void
+WRK_DoTask(struct worker *wrk, taskfunc *func, void *priv)
+{
+	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+	AN(func);
+
+	AZ(wrk->taskfunc);
+	AZ(wrk->taskpriv);
+	wrk->taskfunc = func;
+	wrk->taskpriv = priv;
+	wrk->do_what = pool_do_task;
+	AZ(pthread_cond_signal(&wrk->cond));
+}
+
 /*--------------------------------------------------------------------*/
 
 static void *
diff --git a/include/tbl/vsc_f_main.h b/include/tbl/vsc_f_main.h
index df7984a..b6b8dc9 100644
--- a/include/tbl/vsc_f_main.h
+++ b/include/tbl/vsc_f_main.h
@@ -193,6 +193,21 @@ VSC_F(thread_queue_len,		uint64_t, 0, 'g',
 	"  See also param queue_max."
 )
 
+VSC_F(threadgrab_idle,		uint64_t, 0, 'c',
+    "Threads grabbed while idle",
+        "Number of times an idle thread was grabbed."
+)
+
+VSC_F(threadgrab_waiting,	uint64_t, 0, 'c',
+    "Threads grabbed while waiting",
+        "Number of times an idle thread was grabbed while waiting."
+)
+
+VSC_F(threadgrab_failed,	uint64_t, 0, 'c',
+    "Thread grab failed",
+        "Number of times failed to grab a thread within the timeout."
+)
+
 VSC_F(sess_queued,		uint64_t, 0, 'c',
     "Sessions queued for thread",
 	"Number of times session was queued waiting for a thread."
-- 
1.7.4.1




More information about the varnish-dev mailing list