[experimental-ims] edf5144 Add the new pool-task API so Martin can see it.

Geoff Simmons geoff at varnish-cache.org
Tue Jan 24 18:30:27 CET 2012


commit edf5144fb9c87869638f3b3dc56ea6572e0d83cb
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date:   Mon Jan 23 10:06:46 2012 +0000

    Add the new pool-task API so Martin can see it.
    
    Presently unused and untested.

diff --git a/bin/varnishd/cache/cache.h b/bin/varnishd/cache/cache.h
index 817b7c7..463517c 100644
--- a/bin/varnishd/cache/cache.h
+++ b/bin/varnishd/cache/cache.h
@@ -286,6 +286,22 @@ struct wrk_accept {
 	struct listen_sock	*acceptlsock;
 };
 
+/* Worker pool stuff -------------------------------------------------*/
+
+typedef void pool_func_t(struct pool *pp, void *priv);	/* task callback */
+
+struct pool_task {
+	VTAILQ_ENTRY(pool_task)		list;	/* linkage on a pool queue */
+	pool_func_t			*func;	/* invoked as func(pp, priv) */
+	void				*priv;	/* opaque argument for func */
+};
+
+enum pool_how {
+	POOL_NO_QUEUE,		/* no idle worker: Pool_Task returns -1 */
+	POOL_QUEUE_FRONT,	/* no idle worker: append to front_queue */
+	POOL_QUEUE_BACK		/* no idle worker: append to back_queue */
+};
+
 /*--------------------------------------------------------------------*/
 
 enum e_do_what {
@@ -307,6 +323,10 @@ struct worker {
 	void			*nhashpriv;
 	struct dstat		stats;
 
+	/* New Pool stuff */
+	pool_func_t		*pool_func;
+	void			*pool_priv;
+
 	/* Pool stuff */
 	enum e_do_what		do_what;
 
@@ -648,6 +668,7 @@ struct sess {
 	struct worker		*wrk;
 	struct req		*req;
 
+	struct pool_task	task;
 	VTAILQ_ENTRY(sess)	list;
 
 	/* Session related fields ------------------------------------*/
@@ -894,6 +915,7 @@ void PipeSession(struct sess *sp);
 void Pool_Init(void);
 void Pool_Work_Thread(void *priv, struct worker *w);
 int Pool_Schedule(struct pool *pp, struct sess *sp);
+int Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how);
 
 #define WRW_IsReleased(w)	((w)->wrw.wfd == NULL)
 int WRW_Error(const struct worker *w);
diff --git a/bin/varnishd/cache/cache_pool.c b/bin/varnishd/cache/cache_pool.c
index 0f7c9e4..cc81b59 100644
--- a/bin/varnishd/cache/cache_pool.c
+++ b/bin/varnishd/cache/cache_pool.c
@@ -92,24 +92,26 @@ struct poolsock {
 /* Number of work requests queued in excess of worker threads available */
 
 struct pool {
-	unsigned		magic;
-#define POOL_MAGIC		0x606658fa
-	VTAILQ_ENTRY(pool)	list;
-
-	pthread_cond_t		herder_cond;
-	struct lock		herder_mtx;
-	pthread_t		herder_thr;
-
-	struct lock		mtx;
-	struct workerhead	idle;
-	VTAILQ_HEAD(, sess)	queue;
-	VTAILQ_HEAD(, poolsock)	socks;
-	unsigned		nthr;
-	unsigned		lqueue;
-	unsigned		last_lqueue;
-	uintmax_t		ndropped;
-	uintmax_t		nqueued;
-	struct sesspool		*sesspool;
+	unsigned			magic;
+#define POOL_MAGIC			0x606658fa
+	VTAILQ_ENTRY(pool)		list;
+
+	pthread_cond_t			herder_cond;
+	struct lock			herder_mtx;
+	pthread_t			herder_thr;
+
+	struct lock			mtx;
+	struct workerhead		idle;
+	VTAILQ_HEAD(, pool_task)	front_queue;	/* tasks taken first */
+	VTAILQ_HEAD(, pool_task)	back_queue;	/* taken if front is empty */
+	VTAILQ_HEAD(, sess)		queue;
+	VTAILQ_HEAD(, poolsock)		socks;
+	unsigned			nthr;
+	unsigned			lqueue;
+	unsigned			last_lqueue;
+	uintmax_t			ndropped;
+	uintmax_t			nqueued;
+	struct sesspool			*sesspool;
 };
 
 static struct lock		pool_mtx;
@@ -174,6 +176,51 @@ pool_accept(struct pool *pp, struct worker *wrk, const struct poolsock *ps)
 }
 
 /*--------------------------------------------------------------------
+ * Enter a new task to be done
+ */
+
+int
+Pool_Task(struct pool *pp, struct pool_task *task, enum pool_how how)
+{
+	struct worker *wrk;
+	int retval = 0;
+
+	CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
+	AN(task);
+
+	Lck_Lock(&pp->mtx);
+	/*
+	 * The common case first:  Take an idle thread, do it.  Fill in its
+	 * func/priv while still holding pp->mtx, not after releasing it.
+	 */
+	wrk = VTAILQ_FIRST(&pp->idle);
+	if (wrk != NULL) {
+		VTAILQ_REMOVE(&pp->idle, wrk, list);
+		wrk->pool_func = task->func;
+		wrk->pool_priv = task->priv;
+		Lck_Unlock(&pp->mtx);
+		AZ(pthread_cond_signal(&wrk->cond));
+		return (0);
+	}
+
+	switch (how) {
+	case POOL_NO_QUEUE:
+		retval = -1;
+		break;
+	case POOL_QUEUE_FRONT:
+		VTAILQ_INSERT_TAIL(&pp->front_queue, task, list);
+		break;
+	case POOL_QUEUE_BACK:
+		VTAILQ_INSERT_TAIL(&pp->back_queue, task, list);
+		break;
+	default:
+		WRONG("Unknown enum pool_how");
+	}
+	Lck_Unlock(&pp->mtx);
+	return (retval);
+}
+
+/*--------------------------------------------------------------------
  * This is the work function for worker threads in the pool.
  */
 
@@ -183,6 +230,7 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
 	struct pool *pp;
 	int stats_clean, i;
 	struct poolsock *ps;
+	struct pool_task *tp;
 
 	CAST_OBJ_NOTNULL(pp, priv, POOL_MAGIC);
 	wrk->pool = pp;
@@ -197,6 +245,24 @@ Pool_Work_Thread(void *priv, struct worker *wrk)
 
 		WS_Reset(wrk->ws, NULL);
 
+		tp = VTAILQ_FIRST(&pp->front_queue);
+		if (tp != NULL)
+			VTAILQ_REMOVE(&pp->front_queue, tp, list);
+
+		if (tp == NULL) {
+			tp = VTAILQ_FIRST(&pp->back_queue);
+			if (tp != NULL)
+				VTAILQ_REMOVE(&pp->back_queue, tp, list);
+		}
+
+		if (tp != NULL) {
+			Lck_Unlock(&pp->mtx);
+			tp->func(pp, tp->priv);
+			stats_clean = WRK_TrySumStat(wrk);
+			Lck_Lock(&pp->mtx);
+			continue;
+		}
+
 		wrk->sp = VTAILQ_FIRST(&pp->queue);
 		if (wrk->sp != NULL) {
 			/* Process queued requests, if any */
@@ -483,6 +549,8 @@ pool_mkpool(unsigned pool_no)
 	VTAILQ_INIT(&pp->queue);
 	VTAILQ_INIT(&pp->idle);
 	VTAILQ_INIT(&pp->socks);
+	VTAILQ_INIT(&pp->front_queue);
+	VTAILQ_INIT(&pp->back_queue);
 	pp->sesspool = SES_NewPool(pp, pool_no);
 	AN(pp->sesspool);
 



More information about the varnish-commit mailing list