[master] 856d562 The beginnings of a TX-scheduling facility.

Poul-Henning Kamp phk at FreeBSD.org
Tue Mar 7 13:33:05 CET 2017


commit 856d5627e8a110c7121c30a925e1eda3e4200459
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date:   Tue Mar 7 12:31:38 2017 +0000

    The beginnings of a TX-scheduling facility.

diff --git a/bin/varnishd/http2/cache_http2.h b/bin/varnishd/http2/cache_http2.h
index d1de3a8..cba5676 100644
--- a/bin/varnishd/http2/cache_http2.h
+++ b/bin/varnishd/http2/cache_http2.h
@@ -122,6 +122,9 @@ struct h2_req {
 	VTAILQ_ENTRY(h2_req)		list;
 	int64_t				window;
 	struct h2h_decode		*decode;
+
+	struct worker			*tx_wrk;
+	VTAILQ_ENTRY(h2_req)		tx_list;
 };
 
 VTAILQ_HEAD(h2_req_s, h2_req);
@@ -158,6 +161,10 @@ struct h2_sess {
 	struct req			*new_req;
 	int				go_away;
 	uint32_t			go_away_last_stream;
+
+	VTAILQ_HEAD(,h2_req)		txqueue;
+
+	struct h2_req			req0[1];
 };
 
 /* http2/cache_http2_panic.c */
@@ -191,6 +198,10 @@ h2_error h2h_decode_fini(const struct h2_sess *h2, struct h2h_decode *d);
 h2_error h2h_decode_bytes(struct h2_sess *h2, struct h2h_decode *d,
     const uint8_t *ptr, size_t len);
 
+/* cache_http2_send.c */
+void H2_Send_Get(struct worker *, struct h2_sess *, struct h2_req *);
+void H2_Send_Rel(struct worker *, struct h2_sess *, struct h2_req *);
+
 h2_error H2_Send_Frame(struct worker *, const struct h2_sess *,
     h2_frame type, uint8_t flags, uint32_t len, uint32_t stream,
     const void *);
diff --git a/bin/varnishd/http2/cache_http2_deliver.c b/bin/varnishd/http2/cache_http2_deliver.c
index 9e2665e..afd088e 100644
--- a/bin/varnishd/http2/cache_http2_deliver.c
+++ b/bin/varnishd/http2/cache_http2_deliver.c
@@ -85,8 +85,10 @@ h2_bytes(struct req *req, enum vdp_action act, void **priv,
 		return (0);
 	AZ(req->vdp_nxt);	       /* always at the bottom of the pile */
 
+	H2_Send_Get(req->wrk, r2->h2sess, r2);
 	H2_Send(req->wrk, r2,
 	    act == VDP_FLUSH ? 1 : 0, H2_F_DATA, H2FF_NONE, len, ptr);
+	H2_Send_Rel(req->wrk, r2->h2sess, r2);
 
 	return (0);
 }
@@ -140,11 +142,13 @@ h2_minimal_response(struct req *req, uint16_t status)
 		req->err_code = status;
 
 	/* XXX return code checking once H2_Send returns anything but 0 */
+	H2_Send_Get(req->wrk, r2->h2sess, r2);
 	H2_Send(req->wrk, r2, 1,
 	    H2_F_HEADERS,
 	    H2FF_HEADERS_END_HEADERS |
 		(status < 200 ? 0 : H2FF_HEADERS_END_STREAM),
 	    l, buf);
+	H2_Send_Rel(req->wrk, r2->h2sess, r2);
 	return (0);
 }
 
@@ -240,8 +244,10 @@ h2_deliver(struct req *req, struct boc *boc, int sendbody)
 	sz = (char*)p - req->ws->f;
 
 	/* XXX: Optimize !sendbody case */
+	H2_Send_Get(req->wrk, r2->h2sess, r2);
 	H2_Send(req->wrk, r2, 1, H2_F_HEADERS, H2FF_HEADERS_END_HEADERS,
 	    sz, req->ws->f);
+	H2_Send_Rel(req->wrk, r2->h2sess, r2);
 
 	WS_Release(req->ws, 0);
 
@@ -254,7 +260,9 @@ h2_deliver(struct req *req, struct boc *boc, int sendbody)
 		err = VDP_DeliverObj(req);
 	/*XXX*/(void)err;
 
+	H2_Send_Get(req->wrk, r2->h2sess, r2);
 	H2_Send(req->wrk, r2, 1, H2_F_DATA, H2FF_DATA_END_STREAM, 0, NULL);
+	H2_Send_Rel(req->wrk, r2->h2sess, r2);
 
 	AZ(req->wrk->v1l);
 	VDP_close(req);
diff --git a/bin/varnishd/http2/cache_http2_proto.c b/bin/varnishd/http2/cache_http2_proto.c
index 915a2d8..f22cb44 100644
--- a/bin/varnishd/http2/cache_http2_proto.c
+++ b/bin/varnishd/http2/cache_http2_proto.c
@@ -186,10 +186,10 @@ h2_rx_ping(struct worker *wrk, struct h2_sess *h2, struct h2_req *r2)
 		return (H2CE_PROTOCOL_ERROR);
 	if (h2->rxf_flags != 0)				// We never send pings
 		return (H2SE_PROTOCOL_ERROR);
-	Lck_Lock(&h2->sess->mtx);
+	H2_Send_Get(wrk, h2, r2);
 	H2_Send_Frame(wrk, h2,
 	    H2_F_PING, H2FF_PING_ACK, 8, 0, h2->rxf_data);
-	Lck_Unlock(&h2->sess->mtx);
+	H2_Send_Rel(wrk, h2, r2);
 	return (0);
 }
 
@@ -357,10 +357,10 @@ h2_rx_settings(struct worker *wrk, struct h2_sess *h2, struct h2_req *r2)
 			if (retval)
 				return (retval);
 		}
-		Lck_Lock(&h2->sess->mtx);
+		H2_Send_Get(wrk, h2, r2);
 		H2_Send_Frame(wrk, h2,
 		    H2_F_SETTINGS, H2FF_SETTINGS_ACK, 0, 0, NULL);
-		Lck_Unlock(&h2->sess->mtx);
+		H2_Send_Rel(wrk, h2, r2);
 	}
 	return (0);
 }
@@ -636,10 +636,10 @@ h2_procframe(struct worker *wrk, struct h2_sess *h2,
 	VSLb(h2->vsl, SLT_Debug, "H2: stream %u: %s", h2->rxf_stream, h2e->txt);
 	vbe32enc(b, h2e->val);
 
-	Lck_Lock(&h2->sess->mtx);
+	H2_Send_Get(wrk, h2, r2);
 	(void)H2_Send_Frame(wrk, h2, H2_F_RST_STREAM,
 	    0, sizeof b, h2->rxf_stream, b);
-	Lck_Unlock(&h2->sess->mtx);
+	H2_Send_Rel(wrk, h2, r2);
 
 	h2_del_req(wrk, r2);
 	return (0);
@@ -732,9 +732,9 @@ h2_rxframe(struct worker *wrk, struct h2_sess *h2)
 	if (h2e) {
 		vbe32enc(b, h2->highest_stream);
 		vbe32enc(b + 4, h2e->val);
-		Lck_Lock(&h2->sess->mtx);
+		H2_Send_Get(wrk, h2, h2->req0);
 		(void)H2_Send_Frame(wrk, h2, H2_F_GOAWAY, 0, 8, 0, b);
-		Lck_Unlock(&h2->sess->mtx);
+		H2_Send_Rel(wrk, h2, h2->req0);
 	}
 	return (h2e ? 0 : 1);
 }
diff --git a/bin/varnishd/http2/cache_http2_send.c b/bin/varnishd/http2/cache_http2_send.c
index 00c553a..2acd39b 100644
--- a/bin/varnishd/http2/cache_http2_send.c
+++ b/bin/varnishd/http2/cache_http2_send.c
@@ -36,6 +36,36 @@
 
 #include "vend.h"
 
+void
+H2_Send_Get(struct worker *wrk, struct h2_sess *h2, struct h2_req *r2)
+{
+	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+	CHECK_OBJ_NOTNULL(h2, H2_SESS_MAGIC);
+	CHECK_OBJ_NOTNULL(r2, H2_REQ_MAGIC);
+
+	r2->tx_wrk = wrk;
+	Lck_Lock(&h2->sess->mtx);
+	VTAILQ_INSERT_TAIL(&h2->txqueue, r2, tx_list);
+	while (VTAILQ_FIRST(&h2->txqueue) != r2)
+		Lck_CondWait(&wrk->cond, &h2->sess->mtx, 0);
+	Lck_Unlock(&h2->sess->mtx);
+}
+
+void
+H2_Send_Rel(struct worker *wrk, struct h2_sess *h2, struct h2_req *r2)
+{
+	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+	CHECK_OBJ_NOTNULL(h2, H2_SESS_MAGIC);
+	CHECK_OBJ_NOTNULL(r2, H2_REQ_MAGIC);
+	Lck_Lock(&h2->sess->mtx);
+	assert(VTAILQ_FIRST(&h2->txqueue) == r2);
+	VTAILQ_REMOVE(&h2->txqueue, r2, tx_list);
+	r2 = VTAILQ_FIRST(&h2->txqueue);
+	if (r2 != NULL)
+		AZ(pthread_cond_signal(&r2->tx_wrk->cond));
+	Lck_Unlock(&h2->sess->mtx);
+}
+
 static void
 h2_mk_hdr(uint8_t *hdr, h2_frame ftyp, uint8_t flags,
     uint32_t len, uint32_t stream)
@@ -64,7 +94,6 @@ H2_Send_Frame(struct worker *wrk, const struct h2_sess *h2,
 	ssize_t s;
 
 	(void)wrk;
-	Lck_AssertHeld(&h2->sess->mtx);
 
 	AN(ftyp);
 	AZ(flags & ~(ftyp->flags));
@@ -74,7 +103,9 @@ H2_Send_Frame(struct worker *wrk, const struct h2_sess *h2,
 		AZ(ftyp->act_snonzero);
 
 	h2_mk_hdr(hdr, ftyp, flags, len, stream);
+	Lck_Lock(&h2->sess->mtx);
 	VSLb_bin(h2->vsl, SLT_H2TxHdr, 9, hdr);
+	Lck_Unlock(&h2->sess->mtx);
 
 	s = write(h2->sess->fd, hdr, sizeof hdr);
 	if (s != sizeof hdr)
@@ -83,7 +114,9 @@ H2_Send_Frame(struct worker *wrk, const struct h2_sess *h2,
 		s = write(h2->sess->fd, ptr, len);
 		if (s != len)
 			return (H2CE_PROTOCOL_ERROR);	// XXX Need private ?
+		Lck_Lock(&h2->sess->mtx);
 		VSLb_bin(h2->vsl, SLT_H2TxBody, len, ptr);
+		Lck_Unlock(&h2->sess->mtx);
 	}
 	return (0);
 }
@@ -106,11 +139,14 @@ H2_Send(struct worker *wrk, struct h2_req *r2, int flush,
 
 	(void)flush;
 
+	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
 	CHECK_OBJ_NOTNULL(r2, H2_REQ_MAGIC);
 	h2 = r2->h2sess;
 	CHECK_OBJ_NOTNULL(h2, H2_SESS_MAGIC);
 	assert(len == 0 || ptr != NULL);
 
+	assert(VTAILQ_FIRST(&h2->txqueue) == r2);
+
 	AN(ftyp);
 	AZ(flags & ~(ftyp->flags));
 	if (r2->stream == 0)
@@ -120,6 +156,7 @@ H2_Send(struct worker *wrk, struct h2_req *r2, int flush,
 
 	Lck_Lock(&h2->sess->mtx);
 	mfs = h2->remote_settings.max_frame_size;
+	Lck_Unlock(&h2->sess->mtx);
 	if (len < mfs) {
 		retval = H2_Send_Frame(wrk, h2,
 		    ftyp, flags, len, r2->stream, ptr);
@@ -145,6 +182,5 @@ H2_Send(struct worker *wrk, struct h2_req *r2, int flush,
 			ftyp = ftyp->continuation;
 		} while (len > 0 && retval == 0);
 	}
-	Lck_Unlock(&h2->sess->mtx);
 	return (retval);
 }
diff --git a/bin/varnishd/http2/cache_http2_session.c b/bin/varnishd/http2/cache_http2_session.c
index b33b21e..522e252 100644
--- a/bin/varnishd/http2/cache_http2_session.c
+++ b/bin/varnishd/http2/cache_http2_session.c
@@ -99,6 +99,7 @@ h2_new_sess(const struct worker *wrk, struct sess *sp, struct req *srq)
 		h2->htc->rfd = &sp->fd;
 		h2->sess = sp;
 		VTAILQ_INIT(&h2->streams);
+		VTAILQ_INIT(&h2->txqueue);
 		h2->local_settings = H2_proto_settings;
 		h2->remote_settings = H2_proto_settings;
 
@@ -108,6 +109,8 @@ h2_new_sess(const struct worker *wrk, struct sess *sp, struct req *srq)
 
 		SES_Reserve_xport_priv(sp, &up);
 		*up = (uintptr_t)h2;
+		INIT_OBJ(h2->req0, H2_REQ_MAGIC);
+		h2->req0->h2sess = h2;
 	}
 	AN(up);
 	CAST_OBJ_NOTNULL(h2, (void*)(*up), H2_SESS_MAGIC);
@@ -333,13 +336,13 @@ h2_new_session(struct worker *wrk, void *arg)
 
 	THR_SetRequest(h2->srq);
 
-	Lck_Lock(&h2->sess->mtx);
+	H2_Send_Get(wrk, h2, h2->req0);
 	H2_Send_Frame(wrk, h2,
 	    H2_F_SETTINGS, H2FF_NONE, sizeof H2_settings, 0, H2_settings);
+	H2_Send_Rel(wrk, h2, h2->req0);
 
 	/* and off we go... */
 	h2->cond = &wrk->cond;
-	Lck_Unlock(&h2->sess->mtx);
 
 	while (h2_rxframe(wrk, h2)) {
 		WS_Reset(h2->ws, wsp);



More information about the varnish-commit mailing list