[master] 385930c Rework the backend/waiter stuff.
Poul-Henning Kamp
phk at FreeBSD.org
Thu May 21 18:48:51 CEST 2015
commit 385930c383eae465e661673f165bd22d6836ace5
Author: Poul-Henning Kamp <phk at FreeBSD.org>
Date: Thu May 21 15:04:00 2015 +0000
Rework the backend/waiter stuff.
Instead of trying to steal vbcs away from the waiter, send the
request and wait for the waiter to hand the vbc back to us.
All waiters except poll are still disabled; the others are coming up next.
diff --git a/bin/varnishd/cache/cache_backend.c b/bin/varnishd/cache/cache_backend.c
index 858ade7..d451943 100644
--- a/bin/varnishd/cache/cache_backend.c
+++ b/bin/varnishd/cache/cache_backend.c
@@ -78,7 +78,7 @@ VBE_Healthy(const struct backend *backend, double *changed)
*/
static int __match_proto__(vdi_getfd_f)
-vbe_dir_getfd(const struct director *d, struct busyobj *bo)
+vbe_dir_getfd(struct worker *wrk, const struct director *d, struct busyobj *bo)
{
struct vbc *vc;
struct backend *bp;
@@ -87,6 +87,7 @@ vbe_dir_getfd(const struct director *d, struct busyobj *bo)
char abuf1[VTCP_ADDRBUFSIZE], abuf2[VTCP_ADDRBUFSIZE];
char pbuf1[VTCP_PORTBUFSIZE], pbuf2[VTCP_PORTBUFSIZE];
+ CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
CHECK_OBJ_NOTNULL(d, DIRECTOR_MAGIC);
CAST_OBJ_NOTNULL(bp, d->priv, BACKEND_MAGIC);
@@ -112,7 +113,7 @@ vbe_dir_getfd(const struct director *d, struct busyobj *bo)
return (-1);
FIND_TMO(connect_timeout, tmod, bo, vrt);
- vc = VBT_Get(bp->tcp_pool, tmod);
+ vc = VBT_Get(bp->tcp_pool, tmod, bp, wrk);
if (vc == NULL) {
// XXX: Per backend stats ?
VSC_C_main->backend_fail++;
@@ -197,7 +198,7 @@ static int __match_proto__(vdi_gethdrs_f)
vbe_dir_gethdrs(const struct director *d, struct worker *wrk,
struct busyobj *bo)
{
- int i;
+ int i, extrachance = 0;
const struct vrt_backend *vrt;
CHECK_OBJ_NOTNULL(d, DIRECTOR_MAGIC);
@@ -205,12 +206,14 @@ vbe_dir_gethdrs(const struct director *d, struct worker *wrk,
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
CAST_OBJ_NOTNULL(vrt, d->priv2, VRT_BACKEND_MAGIC);
- i = vbe_dir_getfd(d, bo);
+ i = vbe_dir_getfd(wrk, d, bo);
if (i < 0) {
VSLb(bo->vsl, SLT_FetchError, "no backend connection");
return (-1);
}
AN(bo->htc);
+ if (bo->htc->vbc->state == VBC_STATE_STOLEN)
+ extrachance = 1;
i = V1F_fetch_hdr(wrk, bo, vrt->hosthdr);
/*
@@ -218,12 +221,12 @@ vbe_dir_gethdrs(const struct director *d, struct worker *wrk,
* that the backend closed it before we get a request to it.
* Do a single retry in that case.
*/
- if (i == 1 && bo->htc->vbc->recycled) {
+ if (i == 1 && extrachance) {
vbe_dir_finish(d, wrk, bo);
AZ(bo->htc);
VSC_C_main->backend_retry++;
bo->doclose = SC_NULL;
- i = vbe_dir_getfd(d, bo);
+ i = vbe_dir_getfd(wrk, d, bo);
if (i < 0) {
VSLb(bo->vsl, SLT_FetchError, "no backend connection");
bo->htc = NULL;
@@ -281,13 +284,15 @@ vbe_dir_http1pipe(const struct director *d, struct req *req, struct busyobj *bo)
CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
- i = vbe_dir_getfd(d, bo);
+ i = vbe_dir_getfd(req->wrk, d, bo);
if (i < 0) {
VSLb(bo->vsl, SLT_FetchError, "no backend connection");
+ SES_Close(req->sp, SC_RX_TIMEOUT);
return;
+ } else {
+ V1P_Process(req, bo, i);
+ vbe_dir_finish(d, req->wrk, bo);
}
- V1P_Process(req, bo, i);
- vbe_dir_finish(d, bo->wrk, bo);
}
/*--------------------------------------------------------------------*/
diff --git a/bin/varnishd/cache/cache_backend.h b/bin/varnishd/cache/cache_backend.h
index 38a979f..7fdc55b 100644
--- a/bin/varnishd/cache/cache_backend.h
+++ b/bin/varnishd/cache/cache_backend.h
@@ -88,21 +88,19 @@ struct backend {
struct vbc {
unsigned magic;
#define VBC_MAGIC 0x0c5e6592
- VTAILQ_ENTRY(vbc) list;
int fd;
+ VTAILQ_ENTRY(vbc) list;
const struct suckaddr *addr;
- uint8_t recycled;
uint8_t state;
#define VBC_STATE_AVAIL (1<<0)
#define VBC_STATE_USED (1<<1)
#define VBC_STATE_STOLEN (1<<2)
#define VBC_STATE_CLEANUP (1<<3)
- uint8_t in_waiter;
- uint8_t have_been_in_waiter;
struct waited waited[1];
struct tcp_pool *tcp_pool;
struct backend *backend;
+ struct worker *wrk;
};
/* cache_backend_cfg.c */
@@ -123,6 +121,7 @@ void VBT_Rel(struct tcp_pool **tpp);
int VBT_Open(const struct tcp_pool *tp, double tmo, const struct suckaddr **sa);
void VBT_Recycle(struct tcp_pool *tp, struct vbc **vbc);
void VBT_Close(struct tcp_pool *tp, struct vbc **vbc);
-struct vbc *VBT_Get(struct tcp_pool *tp, double tmo);
-
+struct vbc *VBT_Get(struct tcp_pool *, double tmo, struct backend *,
+ struct worker *);
+void VBT_Wait(struct worker *wrk, const struct vbc *vbc);
diff --git a/bin/varnishd/cache/cache_backend_tcp.c b/bin/varnishd/cache/cache_backend_tcp.c
index ef037c9..44b6351 100644
--- a/bin/varnishd/cache/cache_backend_tcp.c
+++ b/bin/varnishd/cache/cache_backend_tcp.c
@@ -44,7 +44,6 @@
#include "waiter/waiter.h"
#include "vtim.h"
-
struct tcp_pool {
unsigned magic;
#define TCP_POOL_MAGIC 0x28b0e42a
@@ -89,24 +88,14 @@ tcp_handle(struct waited *w, enum wait_event ev, double now)
tp = vbc->tcp_pool;
Lck_Lock(&tp->mtx);
- VSL(SLT_Debug, 0,
- "------> Handler fd %d in_w %d state 0x%x ev %d have_been %d",
- vbc->fd, vbc->in_waiter, vbc->state, ev, vbc->have_been_in_waiter);
- AN(vbc->in_waiter);
switch(vbc->state) {
case VBC_STATE_STOLEN:
- vbc->state = VBC_STATE_AVAIL;
+ vbc->state = VBC_STATE_USED;
VTAILQ_REMOVE(&tp->connlist, vbc, list);
- if (Wait_Enter(tp->waiter, vbc->waited)) {
- VSL(SLT_Debug, 0,
- "------> Handler stolen -> re-wait failed");
- VTCP_close(&vbc->fd);
- tp->n_conn--;
- FREE_OBJ(vbc);
- } else {
- VTAILQ_INSERT_HEAD(&tp->connlist, vbc, list);
- }
+ CHECK_OBJ_NOTNULL(vbc->backend, BACKEND_MAGIC);
+ CHECK_OBJ_NOTNULL(vbc->wrk, WORKER_MAGIC);
+ AZ(pthread_cond_signal(&vbc->wrk->cond));
break;
case VBC_STATE_AVAIL:
VTCP_close(&vbc->fd);
@@ -114,10 +103,6 @@ tcp_handle(struct waited *w, enum wait_event ev, double now)
tp->n_conn--;
FREE_OBJ(vbc);
break;
- case VBC_STATE_USED:
- vbc->in_waiter = 0;
- vbc->have_been_in_waiter = 1;
- break;
case VBC_STATE_CLEANUP:
VTCP_close(&vbc->fd);
tp->n_kill--;
@@ -208,16 +193,11 @@ VBT_Rel(struct tcp_pool **tpp)
VTAILQ_FOREACH_SAFE(vbc, &tp->connlist, list, vbc2) {
VTAILQ_REMOVE(&tp->connlist, vbc, list);
tp->n_conn--;
- if (vbc->in_waiter) {
- vbc->state = VBC_STATE_CLEANUP;
- shutdown(vbc->fd, SHUT_WR);
- VTAILQ_INSERT_TAIL(&tp->killlist, vbc, list);
- tp->n_kill++;
- } else {
- VTCP_close(&vbc->fd);
- memset(vbc, 0x22, sizeof *vbc);
- free(vbc);
- }
+ assert(vbc->state == VBC_STATE_AVAIL);
+ vbc->state = VBC_STATE_CLEANUP;
+ (void)shutdown(vbc->fd, SHUT_WR);
+ VTAILQ_INSERT_TAIL(&tp->killlist, vbc, list);
+ tp->n_kill++;
}
while (tp->n_kill) {
Lck_Unlock(&tp->mtx);
@@ -279,39 +259,30 @@ VBT_Recycle(struct tcp_pool *tp, struct vbc **vbcp)
assert(vbc->state == VBC_STATE_USED);
assert(vbc->fd > 0);
+ AZ(vbc->backend);
Lck_Lock(&tp->mtx);
tp->n_used--;
- VSL(SLT_Debug, 0, "------> Recycle fd %d in_w %d",
- vbc->fd, vbc->in_waiter);
-
- if (!vbc->in_waiter) {
- vbc->in_waiter = 1;
- vbc->waited->ptr = vbc;
- vbc->waited->fd = vbc->fd;
- vbc->waited->idle = VTIM_real();
- vbc->state = VBC_STATE_AVAIL;
- VSL(SLT_Debug, 0, "------> Recycle fd %d Wait_Enter", vbc->fd);
- if (Wait_Enter(tp->waiter, vbc->waited)) {
- VTCP_close(&vbc->fd);
- memset(vbc, 0x33, sizeof *vbc);
- free(vbc);
- vbc = NULL;
- } else {
- VTAILQ_INSERT_HEAD(&tp->connlist, vbc, list);
- }
- i = 1;
+ vbc->waited->ptr = vbc;
+ vbc->waited->fd = vbc->fd;
+ vbc->waited->idle = VTIM_real();
+ vbc->state = VBC_STATE_AVAIL;
+ if (Wait_Enter(tp->waiter, vbc->waited)) {
+ VTCP_close(&vbc->fd);
+ memset(vbc, 0x33, sizeof *vbc);
+ free(vbc);
+ // XXX: stats
+ vbc = NULL;
} else {
- vbc->state = VBC_STATE_STOLEN;
- VTAILQ_INSERT_TAIL(&tp->connlist, vbc, list);
+ VTAILQ_INSERT_HEAD(&tp->connlist, vbc, list);
+ i++;
}
- if (vbc != NULL) {
+ if (vbc != NULL)
tp->n_conn++;
- vbc->recycled = 1;
- }
Lck_Unlock(&tp->mtx);
+
if (i && DO_DEBUG(DBG_VTC_MODE)) {
/*
* In varnishtest we do not have the luxury of using
@@ -346,17 +317,17 @@ VBT_Close(struct tcp_pool *tp, struct vbc **vbcp)
assert(vbc->state == VBC_STATE_USED);
assert(vbc->fd > 0);
- VSL(SLT_Debug, 0, "------> Close fd %d in_w %d",
- vbc->fd, vbc->in_waiter);
+ AZ(vbc->backend);
Lck_Lock(&tp->mtx);
tp->n_used--;
- if (vbc->in_waiter) {
- shutdown(vbc->fd, SHUT_WR);
+ if (vbc->state == VBC_STATE_STOLEN) {
+ (void)shutdown(vbc->fd, SHUT_WR);
vbc->state = VBC_STATE_CLEANUP;
VTAILQ_INSERT_HEAD(&tp->killlist, vbc, list);
tp->n_kill++;
} else {
+ assert(vbc->state == VBC_STATE_USED);
VTCP_close(&vbc->fd);
memset(vbc, 0x44, sizeof *vbc);
free(vbc);
@@ -369,27 +340,29 @@ VBT_Close(struct tcp_pool *tp, struct vbc **vbcp)
*/
struct vbc *
-VBT_Get(struct tcp_pool *tp, double tmo)
+VBT_Get(struct tcp_pool *tp, double tmo, struct backend *be, struct worker *wrk)
{
struct vbc *vbc;
CHECK_OBJ_NOTNULL(tp, TCP_POOL_MAGIC);
+ CHECK_OBJ_NOTNULL(be, BACKEND_MAGIC);
+ CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
Lck_Lock(&tp->mtx);
vbc = VTAILQ_FIRST(&tp->connlist);
- if (vbc != NULL) {
- CHECK_OBJ_NOTNULL(vbc, VBC_MAGIC);
-
- VSL(SLT_Debug, 0, "------> Steal fd %d state 0x%x",
- vbc->fd, vbc->state);
- assert(vbc->state == VBC_STATE_AVAIL ||
- vbc->state == VBC_STATE_STOLEN);
-
+ CHECK_OBJ_ORNULL(vbc, VBC_MAGIC);
+ if (vbc == NULL || vbc->backend != NULL)
+ vbc = NULL;
+ else {
+ assert(vbc->tcp_pool == tp);
+ assert(vbc->state == VBC_STATE_AVAIL);
VTAILQ_REMOVE(&tp->connlist, vbc, list);
+ VTAILQ_INSERT_TAIL(&tp->connlist, vbc, list);
tp->n_conn--;
VSC_C_main->backend_reuse += 1;
- vbc->state = VBC_STATE_USED;
- assert(vbc->tcp_pool == tp);
+ vbc->state = VBC_STATE_STOLEN;
+ vbc->backend = be;
+ vbc->wrk = wrk;
}
tp->n_used++; // Opening mostly works
Lck_Unlock(&tp->mtx);
@@ -402,15 +375,33 @@ VBT_Get(struct tcp_pool *tp, double tmo)
INIT_OBJ(vbc->waited, WAITED_MAGIC);
vbc->state = VBC_STATE_USED;
vbc->tcp_pool = tp;
+ vbc->backend = be;
vbc->fd = VBT_Open(tp, tmo, &vbc->addr);
if (vbc->fd < 0)
FREE_OBJ(vbc);
if (vbc == NULL) {
- VSL(SLT_Debug, 0, "------> No new fd");
Lck_Lock(&tp->mtx);
tp->n_used--; // Nope, didn't work after all.
Lck_Unlock(&tp->mtx);
- } else
- VSL(SLT_Debug, 0, "------> New fd %d", vbc->fd);
+ }
return (vbc);
}
+
+/*--------------------------------------------------------------------
+ */
+
+void
+VBT_Wait(struct worker *wrk, const struct vbc *vbc)
+{
+ struct tcp_pool *tp;
+
+ CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
+ CHECK_OBJ_NOTNULL(vbc, VBC_MAGIC);
+ tp = vbc->tcp_pool;
+ CHECK_OBJ_NOTNULL(tp, TCP_POOL_MAGIC);
+ assert(vbc->wrk == wrk);
+ Lck_Lock(&tp->mtx);
+ while (vbc->state == VBC_STATE_STOLEN)
+ AZ(Lck_CondWait(&wrk->cond, &tp->mtx, 0));
+ Lck_Unlock(&tp->mtx);
+}
diff --git a/bin/varnishd/http1/cache_http1_fetch.c b/bin/varnishd/http1/cache_http1_fetch.c
index 7b108d2..23fc6b9 100644
--- a/bin/varnishd/http1/cache_http1_fetch.c
+++ b/bin/varnishd/http1/cache_http1_fetch.c
@@ -37,6 +37,7 @@
#include "hash/hash_slinger.h"
+#include "cache/cache_backend.h"
#include "cache/cache_director.h"
#include "vcli_priv.h"
#include "vtcp.h"
@@ -144,6 +145,11 @@ V1F_fetch_hdr(struct worker *wrk, struct busyobj *bo, const char *def_host)
/* Receive response */
+ if (htc->vbc->state != VBC_STATE_USED)
+ VBT_Wait(wrk, htc->vbc);
+
+ assert(htc->vbc->state == VBC_STATE_USED);
+
SES_RxInit(htc, bo->ws, cache_param->http_resp_size,
cache_param->http_resp_hdr_len);
CHECK_OBJ_NOTNULL(htc, HTTP_CONN_MAGIC);
diff --git a/bin/varnishd/http1/cache_http1_pipe.c b/bin/varnishd/http1/cache_http1_pipe.c
index ac1fb6e..818f88f 100644
--- a/bin/varnishd/http1/cache_http1_pipe.c
+++ b/bin/varnishd/http1/cache_http1_pipe.c
@@ -105,6 +105,7 @@ V1P_Process(struct req *req, struct busyobj *bo, int fd)
wrk = req->wrk;
CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
CHECK_OBJ_NOTNULL(bo, BUSYOBJ_MAGIC);
+ assert(fd > 0);
req->res_mode = RES_PIPE;
@@ -112,12 +113,8 @@ V1P_Process(struct req *req, struct busyobj *bo, int fd)
acct_pipe.req = req->acct.req_hdrbytes;
req->acct.req_hdrbytes = 0;
- if (fd < 0) {
- pipecharge(req, &acct_pipe, NULL);
- SES_Close(req->sp, SC_OVERLOAD);
- return;
- }
CHECK_OBJ_NOTNULL(bo->htc, HTTP_CONN_MAGIC);
+ CHECK_OBJ_NOTNULL(bo->htc->vbc, VBC_MAGIC);
bo->wrk = req->wrk;
bo->director_state = DIR_S_BODY;
(void)VTCP_blocking(fd);
@@ -134,6 +131,9 @@ V1P_Process(struct req *req, struct busyobj *bo, int fd)
VSLb_ts_req(req, "Pipe", W_TIM_real(wrk));
if (i == 0) {
+ if (bo->htc->vbc->state == VBC_STATE_STOLEN)
+ VBT_Wait(req->wrk, bo->htc->vbc);
+
memset(fds, 0, sizeof fds);
fds[0].fd = fd;
fds[0].events = POLLIN | POLLERR;
diff --git a/bin/varnishd/waiter/cache_waiter_poll.c b/bin/varnishd/waiter/cache_waiter_poll.c
index 467f73f..bb87b17 100644
--- a/bin/varnishd/waiter/cache_waiter_poll.c
+++ b/bin/varnishd/waiter/cache_waiter_poll.c
@@ -62,7 +62,16 @@ struct vwp {
static void
vwp_extend_pollspace(struct vwp *vwp)
{
- size_t inc = (1<<16);
+ size_t inc;
+
+ if (vwp->npoll < (1<<12))
+ inc = (1<<10);
+ else if (vwp->npoll < (1<<14))
+ inc = (1<<12);
+ else if (vwp->npoll < (1<<16))
+ inc = (1<<14);
+ else
+ inc = (1<<16);
VSL(SLT_Debug, 0, "Acceptor poll space increased by %zu to %zu",
inc, vwp->npoll + inc);
@@ -86,6 +95,8 @@ static void
vwp_add(struct vwp *vwp, struct waited *w)
{
+ CHECK_OBJ_NOTNULL(w, WAITED_MAGIC);
+ CHECK_OBJ_NOTNULL(vwp, VWP_MAGIC);
if (vwp->hpoll == vwp->npoll)
vwp_extend_pollspace(vwp);
assert(vwp->hpoll < vwp->npoll);
@@ -123,9 +134,15 @@ vwp_dopipe(struct vwp *vwp)
assert(ss > 0);
i = 0;
while (ss) {
+ if (w[i] == NULL) {
+ assert(ss == sizeof w[0]);
+ assert(vwp->hpoll == 1);
+ pthread_exit(NULL);
+ }
CHECK_OBJ_NOTNULL(w[i], WAITED_MAGIC);
assert(w[i]->fd > 0); // no stdin
vwp_add(vwp, w[i++]);
+ ss -= sizeof w[0];
}
}
@@ -134,47 +151,49 @@ vwp_dopipe(struct vwp *vwp)
static void *
vwp_main(void *priv)
{
- int v, v2;
+ int v;
struct vwp *vwp;
struct waited *wp;
double now, idle;
- int i;
+ int i, dopipe;
- CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
THR_SetName("cache-poll");
+ CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
while (1) {
- // Try to keep the high point as low as possible
- assert(vwp->hpoll < vwp->npoll);
- while (vwp->hpoll > 0 && vwp->pollfd[vwp->hpoll].fd == -1)
- vwp->hpoll--;
-
- // XXX: sleep on ->tmo
- v = poll(vwp->pollfd, vwp->hpoll + 1, -1);
+ v = poll(vwp->pollfd, vwp->hpoll,
+ (int)floor(1e3 * *vwp->waiter->tmo));
assert(v >= 0);
- v2 = v;
+ if (v == 0)
+ v = vwp->hpoll;
now = VTIM_real();
idle = now - *vwp->waiter->tmo;
- i = 1;
- while (v2 > 0 && i < vwp->hpoll) {
+ i = 0;
+ dopipe = 0;
+ while (v > 0 && i < vwp->hpoll) {
+ if (vwp->pollfd[i].revents)
+ v--;
+ if (vwp->pollfd[i].fd == vwp->pipes[0]) {
+ if (vwp->pollfd[i].revents)
+ dopipe = 1;
+ i++;
+ continue;
+ }
wp = vwp->idx[i];
CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
- if (vwp->pollfd[i].revents != 0) {
- v2--;
+ if (wp->idle <= idle) {
+ vwp->waiter->func(wp, WAITER_TIMEOUT, now);
+ vwp_del(vwp, i);
+ } else if (vwp->pollfd[i].revents & POLLIN) {
assert(wp->fd > 0);
assert(wp->fd == vwp->pollfd[i].fd);
- VSL(SLT_Debug, wp->fd, "POLL Handle %d %x",
- wp->fd, vwp->pollfd[i].revents);
- vwp_del(vwp, i);
vwp->waiter->func(wp, WAITER_ACTION, now);
- } else if (wp->idle <= idle) {
vwp_del(vwp, i);
- vwp->waiter->func(wp, WAITER_TIMEOUT, now);
} else {
i++;
}
}
- if (vwp->pollfd[0].revents)
+ if (dopipe)
vwp_dopipe(vwp);
}
NEEDLESS_RETURN(NULL);
@@ -208,10 +227,10 @@ vwp_init(struct waiter *w)
AZ(pipe(vwp->pipes));
// XXX: set write pipe non-blocking
+ vwp->hpoll = 1;
vwp_extend_pollspace(vwp);
vwp->pollfd[0].fd = vwp->pipes[0];
vwp->pollfd[0].events = POLLIN;
- vwp->hpoll = 1;
AZ(pthread_create(&vwp->thread, NULL, vwp_main, vwp));
}
@@ -228,7 +247,10 @@ vwp_fini(struct waiter *w)
// XXX: set write pipe blocking
assert(write(vwp->pipes[1], &vp, sizeof vp) == sizeof vp);
AZ(pthread_join(vwp->thread, &vp));
+ AZ(close(vwp->pipes[0]));
+ AZ(close(vwp->pipes[1]));
free(vwp->pollfd);
+ free(vwp->idx);
}
/*--------------------------------------------------------------------*/
More information about the varnish-commit
mailing list