varnish-cache/bin/varnishd/cache/cache_wrk.c
/*-
 * Copyright (c) 2006 Verdens Gang AS
 * Copyright (c) 2006-2011 Varnish Software AS
 * All rights reserved.
 *
 * Author: Poul-Henning Kamp <phk@phk.freebsd.dk>
 *
 * SPDX-License-Identifier: BSD-2-Clause
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * Worker thread stuff unrelated to the worker thread pools.
 *
 * --
 * signaling_note:
 *
 * Note on worker wakeup signaling through the wrk condition variable (cv)
 *
 * In the general case, a cv needs to be signaled while holding the
 * corresponding mutex, otherwise the signal may be posted before the waiting
 * thread could register itself on the cv and, consequently, the signal may be
 * missed.
 *
 * In our case, any worker thread which we wake up comes from the idle queue,
 * where it put itself under the mutex, releasing that mutex implicitly via
 * Lck_CondWaitUntil() (which calls some variant of pthread_cond_wait). So we
 * avoid additional mutex contention knowing that any worker thread on the
 * idle queue is blocking on the cv.
 *
 * Except -- when it isn't, because it woke up to release its VCL
 * reference. To account for this case, we check if the task function has been
 * set in the meantime, which in turn requires all of the task preparation to
 * be done while holding the pool mutex. (see also #2719)
 */
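
/*
 * Illustrative sketch, not part of the build: the textbook-safe pattern
 * signals with the mutex held, e.g.
 *
 *	Lck_Lock(&pp->mtx);
 *	wrk->task->func = func;
 *	PTOK(pthread_cond_signal(&wrk->cond));
 *	Lck_Unlock(&pp->mtx);
 *
 * Here, instead, the task is prepared under pp->mtx and the signal is
 * posted after unlocking, relying on the invariant that any worker on the
 * idle queue is already waiting on (or about to re-check) its cv.
 */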

#include "config.h"

#include <stdlib.h>
#include <sched.h>

#include "cache_varnishd.h"
#include "cache_pool.h"

#include "vcli_serve.h"
#include "vtim.h"

#include "hash/hash_slinger.h"

static void Pool_Work_Thread(struct pool *pp, struct worker *wrk);

static uintmax_t reqpoolfail;

/*--------------------------------------------------------------------
 * Create and start a background thread which has its own worker and
 * session data structures.
 */

struct bgthread {
	unsigned	magic;
#define BGTHREAD_MAGIC	0x23b5152b
	const char	*name;
	bgthread_t	*func;
	void		*priv;
};

static void *
wrk_bgthread(void *arg)
{
	struct bgthread *bt;
	struct worker wrk;
	struct worker_priv wpriv[1];
	struct VSC_main_wrk ds;
	void *r;

	CAST_OBJ_NOTNULL(bt, arg, BGTHREAD_MAGIC);
	THR_SetName(bt->name);
	THR_Init();
	INIT_OBJ(&wrk, WORKER_MAGIC);
	INIT_OBJ(wpriv, WORKER_PRIV_MAGIC);
	wrk.wpriv = wpriv;
	// bgthreads do not have a vpi member
	memset(&ds, 0, sizeof ds);
	wrk.stats = &ds;

	r = bt->func(&wrk, bt->priv);
	HSH_Cleanup(&wrk);
	Pool_Sumstat(&wrk);
	return (r);
}

void
WRK_BgThread(pthread_t *thr, const char *name, bgthread_t *func, void *priv)
{
	struct bgthread *bt;

	ALLOC_OBJ(bt, BGTHREAD_MAGIC);
	AN(bt);

	bt->name = name;
	bt->func = func;
	bt->priv = priv;
	PTOK(pthread_create(thr, NULL, wrk_bgthread, bt));
}

/*--------------------------------------------------------------------*/

static void
WRK_Thread(struct pool *qp, size_t stacksize, unsigned thread_workspace)
{
	// child_signal_handler stack overflow check uses struct worker addr
	struct worker *w, ww;
	struct VSC_main_wrk ds;
	unsigned char ws[thread_workspace];
	struct worker_priv wpriv[1];
	unsigned char vpi[vpi_wrk_len];

	AN(qp);
	AN(stacksize);
	AN(thread_workspace);

	THR_SetName("cache-worker");
	w = &ww;
	INIT_OBJ(w, WORKER_MAGIC);
	INIT_OBJ(wpriv, WORKER_PRIV_MAGIC);
	w->wpriv = wpriv;
	w->lastused = NAN;
	memset(&ds, 0, sizeof ds);
	w->stats = &ds;
	THR_SetWorker(w);
	PTOK(pthread_cond_init(&w->cond, NULL));

	WS_Init(w->aws, "wrk", ws, thread_workspace);
	VPI_wrk_init(w, vpi, sizeof vpi);
	AN(w->vpi);

	VSL(SLT_WorkThread, NO_VXID, "%p start", w);

	Pool_Work_Thread(qp, w);
	AZ(w->pool);

	VSL(SLT_WorkThread, NO_VXID, "%p end", w);
	if (w->wpriv->vcl != NULL)
		VCL_Rel(&w->wpriv->vcl);
	PTOK(pthread_cond_destroy(&w->cond));
	HSH_Cleanup(w);
	Pool_Sumstat(w);
}

/*--------------------------------------------------------------------
 * Summing of stats into pool counters
 */

static unsigned
wrk_addstat(const struct worker *wrk, const struct pool_task *tp,
    unsigned locked)
{
	struct pool *pp;

	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
	pp = wrk->pool;
	CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
	if (locked)
		Lck_AssertHeld(&pp->mtx);

	if ((tp == NULL && wrk->stats->summs > 0) ||
	    (wrk->stats->summs >= cache_param->wthread_stats_rate)) {
		if (!locked)
			Lck_Lock(&pp->mtx);

		pp->a_stat->summs++;
		VSC_main_Summ_wrk_wrk(pp->a_stat, wrk->stats);
		memset(wrk->stats, 0, sizeof *wrk->stats);

		if (!locked)
			Lck_Unlock(&pp->mtx);
	}

	return (tp != NULL);
}
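
/*
 * Commentary (not in the original source): worker stats are folded into
 * the pool counters in batches -- either when the worker has nothing to
 * do (tp == NULL) or after wthread_stats_rate task completions.  A busy
 * worker thus touches pp->mtx for stats accounting only once per batch,
 * e.g. once per 10 tasks when thread_stats_rate is 10.
 */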

void
WRK_AddStat(const struct worker *wrk)
{

	(void)wrk_addstat(wrk, wrk->task, 0);
	wrk->stats->summs++;
}

/*--------------------------------------------------------------------
 * Pool reserve calculation
 */

static unsigned
pool_reserve(void)
{
	unsigned lim;

	if (cache_param->wthread_reserve == 0) {
		lim = cache_param->wthread_min / 20 + 1;
	} else {
		lim = cache_param->wthread_min * 950 / 1000;
		if (cache_param->wthread_reserve < lim)
			lim = cache_param->wthread_reserve;
	}
	if (lim < TASK_QUEUE_RESERVE)
		return (TASK_QUEUE_RESERVE);
	return (lim);
}
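
/*
 * Worked example (commentary only): with thread_pool_reserve=0 and
 * thread_pool_min=100, lim = 100/20 + 1 = 6.  With thread_pool_reserve=300
 * and thread_pool_min=200, the 95% cap gives lim = 200*950/1000 = 190.
 * Either way the result is clamped to at least TASK_QUEUE_RESERVE.
 */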

/*--------------------------------------------------------------------*/

static struct worker *
pool_getidleworker(struct pool *pp, enum task_prio prio)
{
	struct pool_task *pt = NULL;
	struct worker *wrk;

	CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
	Lck_AssertHeld(&pp->mtx);
	if (pp->nidle > (pool_reserve() * prio / TASK_QUEUE_RESERVE)) {
		pt = VTAILQ_FIRST(&pp->idle_queue);
		if (pt == NULL)
			AZ(pp->nidle);
	}

	if (pt == NULL)
		return (NULL);

	AZ(pt->func);
	CAST_OBJ_NOTNULL(wrk, pt->priv, WORKER_MAGIC);

	AN(pp->nidle);
	VTAILQ_REMOVE(&pp->idle_queue, wrk->task, list);
	pp->nidle--;

	return (wrk);
}
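
/*
 * Commentary: the nidle check above hands out idle threads on a sliding
 * scale.  A task at priority prio only gets a thread while more than
 * reserve * prio / TASK_QUEUE_RESERVE threads are idle, so priority 0
 * (TASK_QUEUE_BO) may take the very last idle thread, while numerically
 * higher (less important) priorities are refused first as the pool drains.
 */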

/*--------------------------------------------------------------------
 * Special scheduling:  If no thread can be found, the current thread
 * will be prepared for rescheduling instead.
 * The selected thread's workspace is reserved and the argument put there.
 * Return one if another thread was scheduled, otherwise zero.
 */

int
Pool_Task_Arg(struct worker *wrk, enum task_prio prio, task_func_t *func,
    const void *arg, size_t arg_len)
{
	struct pool *pp;
	struct worker *wrk2;
	int retval;

	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
	AN(arg);
	AN(arg_len);
	pp = wrk->pool;
	CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);

	Lck_Lock(&pp->mtx);
	wrk2 = pool_getidleworker(pp, prio);
	if (wrk2 != NULL)
		retval = 1;
	else {
		wrk2 = wrk;
		retval = 0;
	}
	AZ(wrk2->task->func);
	assert(arg_len <= WS_ReserveSize(wrk2->aws, arg_len));
	memcpy(WS_Reservation(wrk2->aws), arg, arg_len);
	wrk2->task->func = func;
	wrk2->task->priv = WS_Reservation(wrk2->aws);
	Lck_Unlock(&pp->mtx);
	// see signaling_note at the top for explanation
	if (retval)
		PTOK(pthread_cond_signal(&wrk2->cond));
	return (retval);
}
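
/*
 * Commentary: on a zero return the task was parked on the current worker
 * (wrk2 == wrk), so the caller is expected to return from its own task
 * function; the do-loop at the bottom of Pool_Work_Thread() then sees
 * wrk->task->func set and runs the parked task on this same thread.
 */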

/*--------------------------------------------------------------------
 * Enter a new task to be done
 */

int
Pool_Task(struct pool *pp, struct pool_task *task, enum task_prio prio)
{
	struct worker *wrk;
	int retval = 0;
	CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
	AN(task);
	AN(task->func);
	assert(prio < TASK_QUEUE__END);

	if (prio == TASK_QUEUE_REQ && reqpoolfail) {
		retval = reqpoolfail & 1;
		reqpoolfail >>= 1;
		if (retval) {
			VSL(SLT_Debug, NO_VXID,
			    "Failing due to reqpoolfail (next= 0x%jx)",
			    reqpoolfail);
			return (retval);
		}
	}

	Lck_Lock(&pp->mtx);

	/* The common case first:  Take an idle thread, do it. */

	wrk = pool_getidleworker(pp, prio);
	if (wrk != NULL) {
		AZ(wrk->task->func);
		wrk->task->func = task->func;
		wrk->task->priv = task->priv;
		Lck_Unlock(&pp->mtx);
		// see signaling_note at the top for explanation
		PTOK(pthread_cond_signal(&wrk->cond));
		return (0);
	}

	/* Vital work is always queued. Only priority classes that can
	 * fit under the reserve capacity are eligible for queueing.
	 */
	if (prio >= TASK_QUEUE_RESERVE) {
		retval = -1;
	} else if (!TASK_QUEUE_LIMITED(prio) ||
	    pp->lqueue + pp->nthr < cache_param->wthread_max +
	    cache_param->wthread_queue_limit) {
		pp->stats->sess_queued++;
		pp->lqueue++;
		VTAILQ_INSERT_TAIL(&pp->queues[prio], task, list);
		PTOK(pthread_cond_signal(&pp->herder_cond));
	} else {
		/* NB: This is counter-intuitive but when we drop a REQ
		 * task, it is an HTTP/1 request and we effectively drop
		 * the whole session. It is otherwise an h2 stream with
		 * STR priority in which case we are dropping a request.
		 */
		if (prio == TASK_QUEUE_REQ)
			pp->stats->sess_dropped++;
		else
			pp->stats->req_dropped++;
		retval = -1;
	}
	Lck_Unlock(&pp->mtx);
	return (retval);
}
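
/*
 * Commentary: the return value is 0 when the task was handed to an idle
 * thread or queued, and -1 when it was refused -- either because prio is
 * at or above TASK_QUEUE_RESERVE (such work is never queued) or because
 * the queue limit for a TASK_QUEUE_LIMITED() priority was reached.
 */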

/*--------------------------------------------------------------------
 * Empty function used as a pointer value for the thread exit condition.
 */

static void v_matchproto_(task_func_t)
pool_kiss_of_death(struct worker *wrk, void *priv)
{
	(void)wrk;
	(void)priv;
}

/*--------------------------------------------------------------------
 * This is the work function for worker threads in the pool.
 */

static void
Pool_Work_Thread(struct pool *pp, struct worker *wrk)
{
	struct pool_task *tp;
	struct pool_task tpx, tps;
	vtim_real tmo, now;
	unsigned i, reserve;

	CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
	wrk->pool = pp;
	while (1) {
		CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
		tp = NULL;

		WS_Rollback(wrk->aws, 0);
		AZ(wrk->vsl);

		Lck_Lock(&pp->mtx);
		reserve = pool_reserve();

		for (i = 0; i < TASK_QUEUE_RESERVE; i++) {
			if (pp->nidle < (reserve * i / TASK_QUEUE_RESERVE))
				break;
			tp = VTAILQ_FIRST(&pp->queues[i]);
			if (tp != NULL) {
				pp->lqueue--;
				pp->ndequeued--;
				VTAILQ_REMOVE(&pp->queues[i], tp, list);
				break;
			}
		}

		if (wrk_addstat(wrk, tp, 1)) {
			wrk->stats->summs++;
			AN(tp);
		} else if (pp->b_stat != NULL && pp->a_stat->summs) {
			/* Nothing to do, push pool stats into global pool */
			tps.func = pool_stat_summ;
			tps.priv = pp->a_stat;
			pp->a_stat = pp->b_stat;
			pp->b_stat = NULL;
			tp = &tps;
		} else {
			/* Nothing to do: To sleep, perchance to dream ... */
			if (isnan(wrk->lastused))
				wrk->lastused = VTIM_real();
			wrk->task->func = NULL;
			wrk->task->priv = wrk;
			VTAILQ_INSERT_HEAD(&pp->idle_queue, wrk->task, list);
			pp->nidle++;
			now = wrk->lastused;
			do {
				// see signaling_note at the top for explanation
				if (DO_DEBUG(DBG_VCLREL) &&
				    pp->b_stat == NULL && pp->a_stat->summs)
					/* We've released the VCL, but
					 * there are pool stats not pushed
					 * to the global stats and some
					 * thread is busy pushing
					 * stats. Set a 1 second timeout
					 * so that we'll wake up and get a
					 * chance to push stats. */
					tmo = now + 1.;
				else if (wrk->wpriv->vcl == NULL)
					tmo = INFINITY;
				else if (DO_DEBUG(DBG_VTC_MODE))
					tmo = now + 1.;
				else
					tmo = now + 60.;
				(void)Lck_CondWaitUntil(
				    &wrk->cond, &pp->mtx, tmo);
				if (wrk->task->func != NULL) {
					/* We have been handed a new task */
					tpx = *wrk->task;
					tp = &tpx;
					wrk->stats->summs++;
				} else if (pp->b_stat != NULL &&
				    pp->a_stat->summs) {
					/* Woken up to release the VCL,
					 * and noticing that there are
					 * pool stats not pushed to the
					 * global stats and no active
					 * thread currently doing
					 * it. Remove ourself from the
					 * idle queue and take on the
					 * task. */
					assert(pp->nidle > 0);
					VTAILQ_REMOVE(&pp->idle_queue,
					    wrk->task, list);
					pp->nidle--;
					tps.func = pool_stat_summ;
					tps.priv = pp->a_stat;
					pp->a_stat = pp->b_stat;
					pp->b_stat = NULL;
					tp = &tps;
				} else {
					// Presumably ETIMEDOUT but we do not
					// assert this because pthread condvars
					// are not airtight.
					if (wrk->wpriv->vcl)
						VCL_Rel(&wrk->wpriv->vcl);
					now = VTIM_real();
				}
			} while (tp == NULL);
		}
		Lck_Unlock(&pp->mtx);

		if (tp->func == pool_kiss_of_death)
			break;

		do {
			memset(wrk->task, 0, sizeof wrk->task);
			assert(wrk->pool == pp);
			AN(tp->func);
			tp->func(wrk, tp->priv);
			if (DO_DEBUG(DBG_VCLREL) && wrk->wpriv->vcl != NULL)
				VCL_Rel(&wrk->wpriv->vcl);
			tpx = *wrk->task;
			tp = &tpx;
		} while (tp->func != NULL);

		if (WS_Overflowed(wrk->aws))
			wrk->stats->ws_thread_overflow++;
		/* cleanup for next task */
		wrk->seen_methods = 0;
	}
	wrk->pool = NULL;
}

/*--------------------------------------------------------------------
 * Create another worker thread.
 */

struct pool_info {
	unsigned		magic;
#define POOL_INFO_MAGIC		0x4e4442d3
	size_t			stacksize;
	struct pool		*qp;
};

static void *
pool_thread(void *priv)
{
	struct pool_info *pi;

	CAST_OBJ_NOTNULL(pi, priv, POOL_INFO_MAGIC);
	THR_Init();
	WRK_Thread(pi->qp, pi->stacksize, cache_param->workspace_thread);
	FREE_OBJ(pi);
	return (NULL);
}

static void
pool_breed(struct pool *qp)
{
	pthread_t tp;
	pthread_attr_t tp_attr;
	struct pool_info *pi;

	PTOK(pthread_attr_init(&tp_attr));
	PTOK(pthread_attr_setdetachstate(&tp_attr, PTHREAD_CREATE_DETACHED));

	/* Set the stacksize for worker threads we create */
	if (cache_param->wthread_stacksize != UINT_MAX)
		PTOK(pthread_attr_setstacksize(&tp_attr,
		    cache_param->wthread_stacksize));

	ALLOC_OBJ(pi, POOL_INFO_MAGIC);
	AN(pi);
	PTOK(pthread_attr_getstacksize(&tp_attr, &pi->stacksize));
	pi->qp = qp;

	errno = pthread_create(&tp, &tp_attr, pool_thread, pi);
	if (errno) {
		FREE_OBJ(pi);
		VSL(SLT_Debug, NO_VXID, "Create worker thread failed %d %s",
		    errno, VAS_errtxt(errno));
		Lck_Lock(&pool_mtx);
		VSC_C_main->threads_failed++;
		Lck_Unlock(&pool_mtx);
		VTIM_sleep(cache_param->wthread_fail_delay);
	} else {
		qp->nthr++;
		Lck_Lock(&pool_mtx);
		VSC_C_main->threads++;
		VSC_C_main->threads_created++;
		Lck_Unlock(&pool_mtx);
		if (cache_param->wthread_add_delay > 0.0)
			VTIM_sleep(cache_param->wthread_add_delay);
		else
			(void)sched_yield();
	}

	PTOK(pthread_attr_destroy(&tp_attr));
}

/*--------------------------------------------------------------------
 * Herd a single pool
 *
 * This thread wakes up every thread_pool_timeout seconds, whenever work is
 * queued in the pool, and whenever threads need to be destroyed.
 *
 * The trick here is to not be too aggressive about creating threads.  In
 * pool_breed(), we sleep whenever we create a thread and a little while
 * longer whenever we fail to, hopefully missing a lot of cond_signals in
 * the meantime.
 *
 * Idle threads are destroyed at a rate determined by wthread_destroy_delay.
 *
 * XXX: probably need a lot more work.
 *
 */

void *
pool_herder(void *priv)
{
	struct pool *pp;
	struct pool_task *pt;
	double t_idle;
	struct worker *wrk;
	double delay;
	unsigned wthread_min;
	uintmax_t dq = (1ULL << 31);
	vtim_mono dqt = 0;
	int r = 0;

	CAST_OBJ_NOTNULL(pp, priv, POOL_MAGIC);

	THR_SetName("pool_herder");
	THR_Init();

	while (!pp->die || pp->nthr > 0) {
		/*
		 * If the worker pool is configured too small, we can
		 * end up deadlocking it (see #2418 for details).
		 *
		 * Recovering from this would require a lot of complicated
		 * code, and fundamentally, either people configured their
		 * pools wrong, in which case we want them to notice, or
		 * they are under DoS, in which case recovering gracefully
		 * is unlikely to be a major improvement.
		 *
		 * Instead we implement a watchdog and kill the worker if
		 * nothing has been dequeued for that long.
		 */
		if (VTAILQ_EMPTY(&pp->queues[TASK_QUEUE_HIGHEST_PRIORITY])) {
			/* Watchdog only applies to no movement on the
			 * highest priority queue (TASK_QUEUE_BO) */
			dq = pp->ndequeued + 1;
		} else if (dq != pp->ndequeued) {
			dq = pp->ndequeued;
			dqt = VTIM_mono();
		} else if (VTIM_mono() - dqt > cache_param->wthread_watchdog) {
			VSL(SLT_Error, NO_VXID,
			    "Pool Herder: Queue does not move ql=%u dt=%f",
			    pp->lqueue, VTIM_mono() - dqt);
			WRONG("Worker Pool Queue does not move"
			      " - see thread_pool_watchdog parameter");
		}
		wthread_min = cache_param->wthread_min;
		if (pp->die)
			wthread_min = 0;

		/* Make more threads if needed and allowed */
		if (pp->nthr < wthread_min ||
		    (pp->lqueue > 0 && pp->nthr < cache_param->wthread_max)) {
			pool_breed(pp);
			continue;
		}

		delay = cache_param->wthread_timeout;
		assert(pp->nthr >= wthread_min);

		if (pp->nthr > wthread_min) {

			t_idle = VTIM_real() - cache_param->wthread_timeout;

			Lck_Lock(&pp->mtx);
			wrk = NULL;
			pt = VTAILQ_LAST(&pp->idle_queue, taskhead);
			if (pt != NULL) {
				AN(pp->nidle);
				AZ(pt->func);
				CAST_OBJ_NOTNULL(wrk, pt->priv, WORKER_MAGIC);

				if (pp->die || wrk->lastused < t_idle ||
				    pp->nthr > cache_param->wthread_max) {
					/* Give it a kiss on the cheek... */
					VTAILQ_REMOVE(&pp->idle_queue,
					    wrk->task, list);
					pp->nidle--;
					wrk->task->func = pool_kiss_of_death;
					PTOK(pthread_cond_signal(&wrk->cond));
				} else {
					delay = wrk->lastused - t_idle;
					wrk = NULL;
				}
			}
			Lck_Unlock(&pp->mtx);

			if (wrk != NULL) {
				pp->nthr--;
				Lck_Lock(&pool_mtx);
				VSC_C_main->threads--;
				VSC_C_main->threads_destroyed++;
				Lck_Unlock(&pool_mtx);
				delay = cache_param->wthread_destroy_delay;
			} else
				delay = vmax(delay,
				    cache_param->wthread_destroy_delay);
		}

		if (pp->die) {
			if (delay < 2)
				delay = .01;
			else
				delay = 1;
			VTIM_sleep(delay);
			continue;
		}
		Lck_Lock(&pp->mtx);
		if (pp->lqueue == 0) {
			if (DO_DEBUG(DBG_VTC_MODE))
				delay = 0.5;
			r = Lck_CondWaitTimeout(
			    &pp->herder_cond, &pp->mtx, delay);
		} else if (pp->nthr >= cache_param->wthread_max) {
			/* XXX: unsafe counters */
			if (r != ETIMEDOUT)
				VSC_C_main->threads_limited++;
			r = Lck_CondWaitTimeout(
			    &pp->herder_cond, &pp->mtx, 1.0);
		}
		Lck_Unlock(&pp->mtx);
	}
	return (NULL);
}

/*--------------------------------------------------------------------
 * Debugging aids
 */

static void v_matchproto_(cli_func_t)
debug_reqpoolfail(struct cli *cli, const char * const *av, void *priv)
{
	uintmax_t u = 1;
	const char *p;

	(void)priv;
	(void)cli;
	reqpoolfail = 0;
	for (p = av[2]; *p != '\0'; p++) {
		if (*p == 'F' || *p == 'f')
			reqpoolfail |= u;
		u <<= 1;
	}
}
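
/*
 * Commentary: each 'f' or 'F' in the argument sets one bit of
 * reqpoolfail, consumed low bit first by Pool_Task(), so e.g. "fxf"
 * makes the first and third TASK_QUEUE_REQ submissions fail while the
 * second goes through.
 */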

static struct cli_proto debug_cmds[] = {
	{ CLICMD_DEBUG_REQPOOLFAIL,		"d", debug_reqpoolfail },
	{ NULL }
};

void
WRK_Log(enum VSL_tag_e tag, const char *fmt, ...)
{
	struct worker *wrk;
	va_list ap;

	AN(fmt);

	wrk = THR_GetWorker();
	CHECK_OBJ_ORNULL(wrk, WORKER_MAGIC);

	va_start(ap, fmt);
	if (wrk != NULL && wrk->vsl != NULL)
		VSLbv(wrk->vsl, tag, fmt, ap);
	else
		VSLv(tag, NO_VXID, fmt, ap);
	va_end(ap);
}

/*--------------------------------------------------------------------
 *
 */

void
WRK_Init(void)
{
	assert(cache_param->wthread_min >= TASK_QUEUE_RESERVE);
	CLI_AddFuncs(debug_cmds);
}