varnish-cache/bin/varnishd/cache/cache_wrk.c

/*-
 * Copyright (c) 2006 Verdens Gang AS
 * Copyright (c) 2006-2011 Varnish Software AS
 * All rights reserved.
 *
 * Author: Poul-Henning Kamp <phk@phk.freebsd.dk>
 *
 * SPDX-License-Identifier: BSD-2-Clause
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * Worker thread stuff unrelated to the worker thread pools.
 *
 * --
 * signaling_note:
 *
 * note on worker wakeup signaling through the wrk condition variable (cv)
 *
 * In the general case, a cv needs to be signaled while holding the
 * corresponding mutex, otherwise the signal may be posted before the waiting
 * thread could register itself on the cv and, consequently, the signal may be
 * missed.
 *
 * In our case, any worker thread which we wake up comes from the idle queue,
 * where it put itself under the mutex, releasing that mutex implicitly via
 * Lck_CondWaitUntil() (which calls some variant of pthread_cond_wait). So we
 * avoid additional mutex contention knowing that any worker thread on the
 * idle queue is blocking on the cv.
 *
 * Except -- when it isn't, because it woke up for releasing its VCL
 * Reference. To account for this case, we check if the task function has been
 * set in the meantime, which in turn requires all of the task preparation to
 * be done holding the pool mutex. (see also #2719)
 */
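
/*
 * Illustrative sketch of the hand-off described above, as implemented by
 * Pool_Task() / Pool_Task_Arg() and Pool_Work_Thread() below:
 *
 *   1. worker: puts wrk->task on pp->idle_queue and blocks in
 *      Lck_CondWaitUntil(&wrk->cond, &pp->mtx, tmo), releasing pp->mtx.
 *   2. scheduler: under pp->mtx, pops the worker off the idle queue and
 *      fills in wrk->task->func / wrk->task->priv.
 *   3. scheduler: unlocks pp->mtx, then pthread_cond_signal(&wrk->cond).
 *   4. worker: on wakeup (signal or timeout), re-checks wrk->task->func;
 *      a non-NULL func means a task was handed over even if the signal
 *      itself arrived while the worker was not waiting.
 *
 * Because the task is published under the mutex and re-checked by the woken
 * worker, signaling after the unlock cannot lose a task.
 */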

#include "config.h"

#include <stdlib.h>
#include <sched.h>

#include "cache_varnishd.h"
#include "cache_pool.h"

#include "vcli_serve.h"
#include "vtim.h"

#include "hash/hash_slinger.h"

static void Pool_Work_Thread(struct pool *pp, struct worker *wrk);

static uintmax_t reqpoolfail;

/*--------------------------------------------------------------------
 * Create and start a back-ground thread which has its own worker and
 * session data structures;
 */

struct bgthread {
        unsigned        magic;
#define BGTHREAD_MAGIC  0x23b5152b
        const char      *name;
        bgthread_t      *func;
        void            *priv;
};

static void *
wrk_bgthread(void *arg)
{
        struct bgthread *bt;
        struct worker wrk;
        struct worker_priv wpriv[1];
        struct VSC_main_wrk ds;
        void *r;

        CAST_OBJ_NOTNULL(bt, arg, BGTHREAD_MAGIC);
        THR_SetName(bt->name);
        THR_Init();
        INIT_OBJ(&wrk, WORKER_MAGIC);
        INIT_OBJ(wpriv, WORKER_PRIV_MAGIC);
        wrk.wpriv = wpriv;
        // bgthreads do not have a vpi member
        memset(&ds, 0, sizeof ds);
        wrk.stats = &ds;

        r = bt->func(&wrk, bt->priv);
        HSH_Cleanup(&wrk);
        Pool_Sumstat(&wrk);
        return (r);
}

void
WRK_BgThread(pthread_t *thr, const char *name, bgthread_t *func, void *priv)
{
        struct bgthread *bt;

        ALLOC_OBJ(bt, BGTHREAD_MAGIC);
        AN(bt);

        bt->name = name;
        bt->func = func;
        bt->priv = priv;
        PTOK(pthread_create(thr, NULL, wrk_bgthread, bt));
}
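
/*
 * Usage sketch with hypothetical names (not taken from this file): a
 * subsystem that wants a long-lived thread with its own worker plumbing
 * would do something like
 *
 *        static void *
 *        my_bg_func(struct worker *wrk, void *priv)
 *        {
 *                (void)priv;
 *                // ... periodic work using wrk ...
 *                return (NULL);
 *        }
 *
 *        pthread_t thr;
 *        WRK_BgThread(&thr, "my-bg-thread", my_bg_func, NULL);
 *
 * wrk_bgthread() names the thread, sets up a private struct worker and
 * stats block for it, and flushes those stats when the function returns.
 */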

/*--------------------------------------------------------------------*/

static void
WRK_Thread(struct pool *qp, size_t stacksize, unsigned thread_workspace)
{
        // child_signal_handler stack overflow check uses struct worker addr
        struct worker *w, ww;
        struct VSC_main_wrk ds;
        unsigned char ws[thread_workspace];
        struct worker_priv wpriv[1];
        unsigned char vpi[vpi_wrk_len];

        AN(qp);
        AN(stacksize);
        AN(thread_workspace);

        THR_SetName("cache-worker");
        w = &ww;
        INIT_OBJ(w, WORKER_MAGIC);
        INIT_OBJ(wpriv, WORKER_PRIV_MAGIC);
        w->wpriv = wpriv;
        w->lastused = NAN;
        memset(&ds, 0, sizeof ds);
        w->stats = &ds;
        THR_SetWorker(w);
        PTOK(pthread_cond_init(&w->cond, NULL));

        WS_Init(w->aws, "wrk", ws, thread_workspace);
        VPI_wrk_init(w, vpi, sizeof vpi);
        AN(w->vpi);

        VSL(SLT_WorkThread, NO_VXID, "%p start", w);

        Pool_Work_Thread(qp, w);
        AZ(w->pool);

        VSL(SLT_WorkThread, NO_VXID, "%p end", w);
        if (w->wpriv->vcl != NULL)
                VCL_Rel(&w->wpriv->vcl);
        PTOK(pthread_cond_destroy(&w->cond));
        HSH_Cleanup(w);
        Pool_Sumstat(w);
}

/*--------------------------------------------------------------------
 * Summing of stats into pool counters
 */

static unsigned
wrk_addstat(const struct worker *wrk, const struct pool_task *tp, unsigned locked)
{
        struct pool *pp;

        CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
        pp = wrk->pool;
        CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
        if (locked)
                Lck_AssertHeld(&pp->mtx);

        if ((tp == NULL && wrk->stats->summs > 0) ||
            (wrk->stats->summs >= cache_param->wthread_stats_rate)) {
                if (!locked)
                        Lck_Lock(&pp->mtx);

                pp->a_stat->summs++;
                VSC_main_Summ_wrk_wrk(pp->a_stat, wrk->stats);
                memset(wrk->stats, 0, sizeof *wrk->stats);

                if (!locked)
                        Lck_Unlock(&pp->mtx);
        }

        return (tp != NULL);
}

void
WRK_AddStat(const struct worker *wrk)
{

        (void)wrk_addstat(wrk, wrk->task, 0);
        wrk->stats->summs++;
}
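
/*
 * How the stats levels relate (see also Pool_Work_Thread() below): each
 * worker accumulates into its private wrk->stats; wrk_addstat() folds those
 * into the pool's pp->a_stat under pp->mtx, either when the worker is about
 * to go idle (tp == NULL) or after wthread_stats_rate completed tasks; a
 * pool_stat_summ task then pushes pp->a_stat into the global counters, with
 * pp->a_stat/pp->b_stat acting as a double buffer so accumulation can
 * continue while that push is in flight. E.g. with wthread_stats_rate = 10,
 * a continuously busy worker touches pp->mtx for stats only once per ten
 * completed tasks.
 */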

/*--------------------------------------------------------------------
 * Pool reserve calculation
 */

static unsigned
pool_reserve(void)
{
        unsigned lim;

        if (cache_param->wthread_reserve == 0) {
                lim = cache_param->wthread_min / 20 + 1;
        } else {
                lim = cache_param->wthread_min * 950 / 1000;
                if (cache_param->wthread_reserve < lim)
                        lim = cache_param->wthread_reserve;
        }
        if (lim < TASK_QUEUE_RESERVE)
                return (TASK_QUEUE_RESERVE);
        return (lim);
}
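
/*
 * Worked example: with wthread_min = 100 and wthread_reserve = 0 the
 * reserve is 100 / 20 + 1 = 6 threads, i.e. roughly 5% of the minimum pool.
 * A non-zero wthread_reserve is used as-is, but capped at 95% of
 * wthread_min. The result is never below TASK_QUEUE_RESERVE, which keeps
 * the per-priority scaling in pool_getidleworker() meaningful.
 */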

/*--------------------------------------------------------------------*/

static struct worker *
pool_getidleworker(struct pool *pp, enum task_prio prio)
{
        struct pool_task *pt = NULL;
        struct worker *wrk;

        CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
        Lck_AssertHeld(&pp->mtx);
        if (pp->nidle > (pool_reserve() * prio / TASK_QUEUE_RESERVE)) {
                pt = VTAILQ_FIRST(&pp->idle_queue);
                if (pt == NULL)
                        AZ(pp->nidle);
        }

        if (pt == NULL)
                return (NULL);

        AZ(pt->func);
        CAST_OBJ_NOTNULL(wrk, pt->priv, WORKER_MAGIC);

        AN(pp->nidle);
        VTAILQ_REMOVE(&pp->idle_queue, wrk->task, list);
        pp->nidle--;

        return (wrk);
}
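
/*
 * The nidle check above scales the reserve by priority: a task at the
 * highest priority (prio == 0) may take the very last idle worker, while a
 * task at priority prio only gets one if strictly more than
 * reserve * prio / TASK_QUEUE_RESERVE workers are currently idle.
 * Continuing the pool_reserve() example (reserve == 6), the lowest
 * priorities need most of those six workers to still be idle, so they
 * cannot drain the reserve needed by more important work.
 */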

/*--------------------------------------------------------------------
 * Special scheduling: If no thread can be found, the current thread
 * will be prepared for rescheduling instead.
 * The selected thread's workspace is reserved and the argument put there.
 * Return one if another thread was scheduled, otherwise zero.
 */

int
Pool_Task_Arg(struct worker *wrk, enum task_prio prio, task_func_t *func,
    const void *arg, size_t arg_len)
{
        struct pool *pp;
        struct worker *wrk2;
        int retval;

        CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
        AN(arg);
        AN(arg_len);
        pp = wrk->pool;
        CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);

        Lck_Lock(&pp->mtx);
        wrk2 = pool_getidleworker(pp, prio);
        if (wrk2 != NULL)
                retval = 1;
        else {
                wrk2 = wrk;
                retval = 0;
        }
        AZ(wrk2->task->func);
        assert(arg_len <= WS_ReserveSize(wrk2->aws, arg_len));
        memcpy(WS_Reservation(wrk2->aws), arg, arg_len);
        wrk2->task->func = func;
        wrk2->task->priv = WS_Reservation(wrk2->aws);
        Lck_Unlock(&pp->mtx);
        // see signaling_note at the top for explanation
        if (retval)
                PTOK(pthread_cond_signal(&wrk2->cond));
        return (retval);
}
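
/*
 * Usage sketch with hypothetical names: Pool_Task_Arg() is meant for
 * callers that only have a small argument blob to pass along, e.g. handing
 * a freshly accepted connection over to another worker:
 *
 *        struct my_arg a;
 *        // ... fill in a ...
 *        if (!Pool_Task_Arg(wrk, TASK_QUEUE_REQ, my_task_func, &a, sizeof a)) {
 *                // No idle worker was available: the calling worker's own
 *                // task was set up instead and it will run my_task_func
 *                // itself when it next returns to its task loop.
 *        }
 *
 * The argument is copied into the chosen worker's aws workspace, so the
 * caller's copy may go out of scope as soon as this returns.
 */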

/*--------------------------------------------------------------------
 * Enter a new task to be done
 */

int
Pool_Task(struct pool *pp, struct pool_task *task, enum task_prio prio)
{
        struct worker *wrk;
        int retval = 0;
        CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
        AN(task);
        AN(task->func);
        assert(prio < TASK_QUEUE__END);

        if (prio == TASK_QUEUE_REQ && reqpoolfail) {
                retval = reqpoolfail & 1;
                reqpoolfail >>= 1;
                if (retval) {
                        VSL(SLT_Debug, NO_VXID,
                            "Failing due to reqpoolfail (next= 0x%jx)",
                            reqpoolfail);
                        return (retval);
                }
        }

        Lck_Lock(&pp->mtx);

        /* The common case first: Take an idle thread, do it. */

        wrk = pool_getidleworker(pp, prio);
        if (wrk != NULL) {
                AZ(wrk->task->func);
                wrk->task->func = task->func;
                wrk->task->priv = task->priv;
                Lck_Unlock(&pp->mtx);
                // see signaling_note at the top for explanation
                PTOK(pthread_cond_signal(&wrk->cond));
                return (0);
        }

        /* Vital work is always queued. Only priority classes that can
         * fit under the reserve capacity are eligible for queueing.
         */
        if (prio >= TASK_QUEUE_RESERVE) {
                retval = -1;
        } else if (!TASK_QUEUE_LIMITED(prio) ||
            pp->lqueue + pp->nthr < cache_param->wthread_max +
            cache_param->wthread_queue_limit) {
                pp->stats->sess_queued++;
                pp->lqueue++;
                VTAILQ_INSERT_TAIL(&pp->queues[prio], task, list);
                PTOK(pthread_cond_signal(&pp->herder_cond));
        } else {
                /* NB: This is counter-intuitive but when we drop a REQ
                 * task, it is an HTTP/1 request and we effectively drop
                 * the whole session. It is otherwise an h2 stream with
                 * STR priority in which case we are dropping a request.
                 */
                if (prio == TASK_QUEUE_REQ)
                        pp->stats->sess_dropped++;
                else
                        pp->stats->req_dropped++;
                retval = -1;
        }
        Lck_Unlock(&pp->mtx);
        return (retval);
}
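
/*
 * Usage sketch with hypothetical names: the pool_task is typically embedded
 * in the object being worked on, so nothing is allocated here:
 *
 *        bo->task->func = my_fetch_func;
 *        bo->task->priv = bo;
 *        if (Pool_Task(wrk->pool, bo->task, TASK_QUEUE_BO))
 *                // neither handed to an idle worker nor queued: the caller
 *                // must fail the operation or do the work itself.
 *
 * Priorities at or above TASK_QUEUE_RESERVE are never queued; priorities
 * subject to TASK_QUEUE_LIMITED() only queue while pp->lqueue + pp->nthr
 * stays below wthread_max + wthread_queue_limit, beyond which the task is
 * dropped and counted in sess_dropped or req_dropped.
 */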

/*--------------------------------------------------------------------
 * Empty function used as a pointer value for the thread exit condition.
 */

static void v_matchproto_(task_func_t)
pool_kiss_of_death(struct worker *wrk, void *priv)
{
        (void)wrk;
        (void)priv;
}


/*--------------------------------------------------------------------
 * This is the work function for worker threads in the pool.
 */

static void
Pool_Work_Thread(struct pool *pp, struct worker *wrk)
{
        struct pool_task *tp;
        struct pool_task tpx, tps;
        vtim_real tmo, now;
        unsigned i, reserve;

        CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
        wrk->pool = pp;
        while (1) {
                CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
                tp = NULL;

                WS_Rollback(wrk->aws, 0);
                AZ(wrk->vsl);

                Lck_Lock(&pp->mtx);
                reserve = pool_reserve();
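
                /*
                 * Scan the queues from highest to lowest priority, but only
                 * serve queue i if enough idle workers remain for the more
                 * important queues: the threshold
                 * reserve * i / TASK_QUEUE_RESERVE grows with i, so the last
                 * idle workers are spent on the highest priorities only
                 * (the same scaling as in pool_getidleworker()).
                 */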

                for (i = 0; i < TASK_QUEUE_RESERVE; i++) {
                        if (pp->nidle < (reserve * i / TASK_QUEUE_RESERVE))
                                break;
                        tp = VTAILQ_FIRST(&pp->queues[i]);
                        if (tp != NULL) {
                                pp->lqueue--;
                                pp->ndequeued--;
                                VTAILQ_REMOVE(&pp->queues[i], tp, list);
                                break;
                        }
                }

                if (wrk_addstat(wrk, tp, 1)) {
                        wrk->stats->summs++;
                        AN(tp);
                } else if (pp->b_stat != NULL && pp->a_stat->summs) {
                        /* Nothing to do, push pool stats into global pool */
                        tps.func = pool_stat_summ;
                        tps.priv = pp->a_stat;
                        pp->a_stat = pp->b_stat;
                        pp->b_stat = NULL;
                        tp = &tps;
                } else {
                        /* Nothing to do: To sleep, perchance to dream ... */
                        if (isnan(wrk->lastused))
                                wrk->lastused = VTIM_real();
                        wrk->task->func = NULL;
                        wrk->task->priv = wrk;
                        VTAILQ_INSERT_HEAD(&pp->idle_queue, wrk->task, list);
                        pp->nidle++;
                        now = wrk->lastused;
                        do {
                                // see signaling_note at the top for explanation
                                if (DO_DEBUG(DBG_VCLREL) &&
                                    pp->b_stat == NULL && pp->a_stat->summs)
                                        /* We've released the VCL, but
                                         * there are pool stats not pushed
                                         * to the global stats and some
                                         * thread is busy pushing
                                         * stats. Set a 1 second timeout
                                         * so that we'll wake up and get a
                                         * chance to push stats. */
                                        tmo = now + 1.;
                                else if (wrk->wpriv->vcl == NULL)
                                        tmo = INFINITY;
                                else if (DO_DEBUG(DBG_VTC_MODE))
                                        tmo = now + 1.;
                                else
                                        tmo = now + 60.;
                                (void)Lck_CondWaitUntil(
                                    &wrk->cond, &pp->mtx, tmo);
                                if (wrk->task->func != NULL) {
                                        /* We have been handed a new task */
                                        tpx = *wrk->task;
                                        tp = &tpx;
                                        wrk->stats->summs++;
                                } else if (pp->b_stat != NULL &&
                                    pp->a_stat->summs) {
                                        /* Woken up to release the VCL,
                                         * and noticing that there are
                                         * pool stats not pushed to the
                                         * global stats and no active
                                         * thread currently doing
                                         * it. Remove ourself from the
                                         * idle queue and take on the
                                         * task. */
                                        assert(pp->nidle > 0);
                                        VTAILQ_REMOVE(&pp->idle_queue,
                                            wrk->task, list);
                                        pp->nidle--;
                                        tps.func = pool_stat_summ;
                                        tps.priv = pp->a_stat;
                                        pp->a_stat = pp->b_stat;
                                        pp->b_stat = NULL;
                                        tp = &tps;
                                } else {
                                        // Presumably ETIMEDOUT but we do not
                                        // assert this because pthread condvars
                                        // are not airtight.
                                        if (wrk->wpriv->vcl)
                                                VCL_Rel(&wrk->wpriv->vcl);
                                        now = VTIM_real();
                                }
                        } while (tp == NULL);
                }
                Lck_Unlock(&pp->mtx);

                if (tp->func == pool_kiss_of_death)
                        break;
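
                /*
                 * Run the task. A task function may hand this worker a
                 * follow-up by filling in wrk->task before returning (cf.
                 * Pool_Task_Arg() when no idle worker is available), in
                 * which case the loop below runs that task too without
                 * going back through the scheduler.
                 */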

                do {
                        memset(wrk->task, 0, sizeof wrk->task);
                        assert(wrk->pool == pp);
                        AN(tp->func);
                        tp->func(wrk, tp->priv);
                        if (DO_DEBUG(DBG_VCLREL) && wrk->wpriv->vcl != NULL)
                                VCL_Rel(&wrk->wpriv->vcl);
                        tpx = *wrk->task;
                        tp = &tpx;
                } while (tp->func != NULL);

                if (WS_Overflowed(wrk->aws))
                        wrk->stats->ws_thread_overflow++;
                /* cleanup for next task */
                wrk->seen_methods = 0;
        }
        wrk->pool = NULL;
}

/*--------------------------------------------------------------------
 * Create another worker thread.
 */

struct pool_info {
        unsigned        magic;
#define POOL_INFO_MAGIC 0x4e4442d3
        size_t          stacksize;
        struct pool     *qp;
};

static void *
pool_thread(void *priv)
{
        struct pool_info *pi;

        CAST_OBJ_NOTNULL(pi, priv, POOL_INFO_MAGIC);
        THR_Init();
        WRK_Thread(pi->qp, pi->stacksize, cache_param->workspace_thread);
        FREE_OBJ(pi);
        return (NULL);
}

static void
pool_breed(struct pool *qp)
{
        pthread_t tp;
        pthread_attr_t tp_attr;
        struct pool_info *pi;

        PTOK(pthread_attr_init(&tp_attr));
        PTOK(pthread_attr_setdetachstate(&tp_attr, PTHREAD_CREATE_DETACHED));

        /* Set the stacksize for worker threads we create */
        if (cache_param->wthread_stacksize != UINT_MAX)
                PTOK(pthread_attr_setstacksize(&tp_attr, cache_param->wthread_stacksize));

        ALLOC_OBJ(pi, POOL_INFO_MAGIC);
        AN(pi);
        PTOK(pthread_attr_getstacksize(&tp_attr, &pi->stacksize));
        pi->qp = qp;

        errno = pthread_create(&tp, &tp_attr, pool_thread, pi);
        if (errno) {
                FREE_OBJ(pi);
                VSL(SLT_Debug, NO_VXID, "Create worker thread failed %d %s",
                    errno, VAS_errtxt(errno));
                Lck_Lock(&pool_mtx);
                VSC_C_main->threads_failed++;
                Lck_Unlock(&pool_mtx);
                VTIM_sleep(cache_param->wthread_fail_delay);
        } else {
                qp->nthr++;
                Lck_Lock(&pool_mtx);
                VSC_C_main->threads++;
                VSC_C_main->threads_created++;
                Lck_Unlock(&pool_mtx);
                if (cache_param->wthread_add_delay > 0.0)
                        VTIM_sleep(cache_param->wthread_add_delay);
                else
                        (void)sched_yield();
        }

        PTOK(pthread_attr_destroy(&tp_attr));
}

/*--------------------------------------------------------------------
 * Herd a single pool
 *
 * This thread wakes up every thread_pool_timeout seconds, whenever a task
 * is queued in the pool and when threads need to be destroyed
 *
 * The trick here is to not be too aggressive about creating threads. In
 * pool_breed(), we sleep whenever we create a thread and a little while
 * longer whenever we fail to, hopefully missing a lot of cond_signals in
 * the meantime.
 *
 * Idle threads are destroyed at a rate determined by wthread_destroy_delay
 *
 * XXX: probably need a lot more work.
 *
 */

void*
pool_herder(void *priv)
{
        struct pool *pp;
        struct pool_task *pt;
        double t_idle;
        struct worker *wrk;
        double delay;
        unsigned wthread_min;
        uintmax_t dq = (1ULL << 31);
        vtim_mono dqt = 0;
        int r = 0;

        CAST_OBJ_NOTNULL(pp, priv, POOL_MAGIC);

        THR_SetName("pool_herder");
        THR_Init();

        while (!pp->die || pp->nthr > 0) {
                /*
                 * If the worker pool is configured too small, we can
                 * end up deadlocking it (see #2418 for details).
                 *
                 * Recovering from this would require a lot of complicated
                 * code, and fundamentally, either people configured their
                 * pools wrong, in which case we want them to notice, or
                 * they are under DoS, in which case recovering gracefully
                 * is unlikely to be a major improvement.
                 *
                 * Instead we implement a watchdog and kill the worker
                 * process if nothing has been dequeued for that long.
                 */
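                /*
                 * Watchdog bookkeeping below: while the top-priority queue
                 * is empty the watchdog is disarmed (dq is parked one ahead
                 * of pp->ndequeued); once the queue is non-empty, dqt is
                 * restarted every time pp->ndequeued moves, and if it does
                 * not move for thread_pool_watchdog seconds the child
                 * panics via WRONG().
                 */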
                if (VTAILQ_EMPTY(&pp->queues[TASK_QUEUE_HIGHEST_PRIORITY])) {
                        /* Watchdog only applies to no movement on the
                         * highest priority queue (TASK_QUEUE_BO) */
                        dq = pp->ndequeued + 1;
                } else if (dq != pp->ndequeued) {
                        dq = pp->ndequeued;
                        dqt = VTIM_mono();
                } else if (VTIM_mono() - dqt > cache_param->wthread_watchdog) {
                        VSL(SLT_Error, NO_VXID,
                            "Pool Herder: Queue does not move ql=%u dt=%f",
                            pp->lqueue, VTIM_mono() - dqt);
                        WRONG("Worker Pool Queue does not move"
                            " - see thread_pool_watchdog parameter");
                }
                wthread_min = cache_param->wthread_min;
                if (pp->die)
                        wthread_min = 0;

                /* Make more threads if needed and allowed */
                if (pp->nthr < wthread_min ||
                    (pp->lqueue > 0 && pp->nthr < cache_param->wthread_max)) {
                        pool_breed(pp);
                        continue;
                }

                delay = cache_param->wthread_timeout;
                assert(pp->nthr >= wthread_min);

                if (pp->nthr > wthread_min) {

                        t_idle = VTIM_real() - cache_param->wthread_timeout;

                        Lck_Lock(&pp->mtx);
                        wrk = NULL;
                        pt = VTAILQ_LAST(&pp->idle_queue, taskhead);
                        if (pt != NULL) {
                                AN(pp->nidle);
                                AZ(pt->func);
                                CAST_OBJ_NOTNULL(wrk, pt->priv, WORKER_MAGIC);

                                if (pp->die || wrk->lastused < t_idle ||
                                    pp->nthr > cache_param->wthread_max) {
                                        /* Give it a kiss on the cheek... */
                                        VTAILQ_REMOVE(&pp->idle_queue,
                                            wrk->task, list);
                                        pp->nidle--;
                                        wrk->task->func = pool_kiss_of_death;
                                        PTOK(pthread_cond_signal(&wrk->cond));
                                } else {
                                        delay = wrk->lastused - t_idle;
                                        wrk = NULL;
                                }
                        }
                        Lck_Unlock(&pp->mtx);

                        if (wrk != NULL) {
                                pp->nthr--;
                                Lck_Lock(&pool_mtx);
                                VSC_C_main->threads--;
                                VSC_C_main->threads_destroyed++;
                                Lck_Unlock(&pool_mtx);
                                delay = cache_param->wthread_destroy_delay;
                        } else
                                delay = vmax(delay,
                                    cache_param->wthread_destroy_delay);
                }

                if (pp->die) {
                        if (delay < 2)
                                delay = .01;
                        else
                                delay = 1;
                        VTIM_sleep(delay);
                        continue;
                }
                Lck_Lock(&pp->mtx);
                if (pp->lqueue == 0) {
                        if (DO_DEBUG(DBG_VTC_MODE))
                                delay = 0.5;
                        r = Lck_CondWaitTimeout(
                            &pp->herder_cond, &pp->mtx, delay);
                } else if (pp->nthr >= cache_param->wthread_max) {
                        /* XXX: unsafe counters */
                        if (r != ETIMEDOUT)
                                VSC_C_main->threads_limited++;
                        r = Lck_CondWaitTimeout(
                            &pp->herder_cond, &pp->mtx, 1.0);
                }
                Lck_Unlock(&pp->mtx);
        }
        return (NULL);
}

/*--------------------------------------------------------------------
 * Debugging aids
 */
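
/*
 * The debug.reqpoolfail CLI command (CLICMD_DEBUG_REQPOOLFAIL, intended for
 * test cases) turns its argument into a bitmask consumed by Pool_Task():
 * the first character controls the next TASK_QUEUE_REQ task submitted, the
 * second the one after that, and so on, with 'f' or 'F' meaning "fail it".
 * For example
 *
 *        debug.reqpoolfail fxf
 *
 * fails the first and third upcoming request tasks and lets the second one
 * through.
 */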

static void v_matchproto_(cli_func_t)
debug_reqpoolfail(struct cli *cli, const char * const *av, void *priv)
{
        uintmax_t u = 1;
        const char *p;

        (void)priv;
        (void)cli;
        reqpoolfail = 0;
        for (p = av[2]; *p != '\0'; p++) {
                if (*p == 'F' || *p == 'f')
                        reqpoolfail |= u;
                u <<= 1;
        }
}

static struct cli_proto debug_cmds[] = {
        { CLICMD_DEBUG_REQPOOLFAIL, "d", debug_reqpoolfail },
        { NULL }
};

void
WRK_Log(enum VSL_tag_e tag, const char *fmt, ...)
{
        struct worker *wrk;
        va_list ap;

        AN(fmt);

        wrk = THR_GetWorker();
        CHECK_OBJ_ORNULL(wrk, WORKER_MAGIC);

        va_start(ap, fmt);
        if (wrk != NULL && wrk->vsl != NULL)
                VSLbv(wrk->vsl, tag, fmt, ap);
        else
                VSLv(tag, NO_VXID, fmt, ap);
        va_end(ap);
}

/*--------------------------------------------------------------------
 *
 */

void
WRK_Init(void)
{
        assert(cache_param->wthread_min >= TASK_QUEUE_RESERVE);
        CLI_AddFuncs(debug_cmds);
}