varnish-cache/bin/varnishd/cache/cache_wrk.c
/*-
 * Copyright (c) 2006 Verdens Gang AS
 * Copyright (c) 2006-2011 Varnish Software AS
 * All rights reserved.
 *
 * Author: Poul-Henning Kamp <phk@phk.freebsd.dk>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * Worker thread stuff unrelated to the worker thread pools.
 */

#include "config.h"

#include <errno.h>
#include <stdlib.h>

#include "cache_varnishd.h"
#include "cache_pool.h"

#include "vtim.h"

#include "hash/hash_slinger.h"

static void Pool_Work_Thread(struct pool *pp, struct worker *wrk);

/*--------------------------------------------------------------------
 * Create and start a background thread which has its own worker and
 * session data structures.
 */

struct bgthread {
        unsigned        magic;
#define BGTHREAD_MAGIC  0x23b5152b
        const char      *name;
        bgthread_t      *func;
        void            *priv;
};

static void *
wrk_bgthread(void *arg)
{
        struct bgthread *bt;
        struct worker wrk;
        struct VSC_main ds;

        CAST_OBJ_NOTNULL(bt, arg, BGTHREAD_MAGIC);
        THR_SetName(bt->name);
        THR_Init();
        INIT_OBJ(&wrk, WORKER_MAGIC);
        memset(&ds, 0, sizeof ds);
        wrk.stats = &ds;

        (void)bt->func(&wrk, bt->priv);

        WRONG("BgThread terminated");

        NEEDLESS(return NULL);
}

void
WRK_BgThread(pthread_t *thr, const char *name, bgthread_t *func, void *priv)
{
        struct bgthread *bt;

        ALLOC_OBJ(bt, BGTHREAD_MAGIC);
        AN(bt);

        bt->name = name;
        bt->func = func;
        bt->priv = priv;
        AZ(pthread_create(thr, NULL, wrk_bgthread, bt));
}
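
/*
 * Usage sketch (hypothetical caller, not part of this file): start a
 * long-lived background thread.  The function handed to WRK_BgThread()
 * must never return, since wrk_bgthread() above treats a return as
 * fatal via WRONG().  The signature below assumes the bgthread_t
 * prototype (struct worker *, void *priv).
 *
 *      static void *
 *      my_bg_func(struct worker *wrk, void *priv)
 *      {
 *              (void)priv;
 *              while (1) {
 *                      // ... periodic housekeeping, charging wrk->stats ...
 *                      VTIM_sleep(1.0);
 *              }
 *      }
 *
 *      pthread_t thr;
 *      WRK_BgThread(&thr, "my-bg-thread", my_bg_func, NULL);
 */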

/*--------------------------------------------------------------------*/

static void
WRK_Thread(struct pool *qp, size_t stacksize, unsigned thread_workspace)
{
        struct worker *w, ww;
        struct VSC_main ds;
        unsigned char ws[thread_workspace];

        AN(qp);
        AN(stacksize);
        AN(thread_workspace);

        THR_SetName("cache-worker");
        w = &ww;
        INIT_OBJ(w, WORKER_MAGIC);
        w->lastused = NAN;
        memset(&ds, 0, sizeof ds);
        w->stats = &ds;
        AZ(pthread_cond_init(&w->cond, NULL));

        WS_Init(w->aws, "wrk", ws, thread_workspace);

        VSL(SLT_WorkThread, 0, "%p start", w);

        Pool_Work_Thread(qp, w);
        AZ(w->pool);

        VSL(SLT_WorkThread, 0, "%p end", w);
        if (w->vcl != NULL)
                VCL_Rel(&w->vcl);
        AZ(pthread_cond_destroy(&w->cond));
        HSH_Cleanup(w);
        Pool_Sumstat(w);
}

/*--------------------------------------------------------------------
 * Summing of stats into pool counters
 */

static void
pool_addstat(struct VSC_main *dst, struct VSC_main *src)
{

        dst->summs++;
        VSC_main_Summ(dst, src);
        memset(src, 0, sizeof *src);
}

static inline int
pool_reserve(void)
{
        unsigned lim;

        if (cache_param->wthread_reserve == 0)
                return (cache_param->wthread_min / 20 + 1);
        lim = cache_param->wthread_min * 950 / 1000;
        if (cache_param->wthread_reserve > lim)
                return (lim);
        return (cache_param->wthread_reserve);
}
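
/*
 * Worked example: if wthread_reserve is 0 and wthread_min is 100
 * (illustrative values, not a claim about defaults), the reserve is
 * 100 / 20 + 1 = 6 threads.  A non-zero wthread_reserve is capped at
 * 95% of wthread_min (the 950 / 1000 above), so wthread_reserve = 98
 * with wthread_min = 100 would be clamped to 95.
 */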

/*--------------------------------------------------------------------*/

static struct worker *
pool_getidleworker(struct pool *pp, enum task_prio prio)
{
        struct pool_task *pt = NULL;
        struct worker *wrk;

        CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
        Lck_AssertHeld(&pp->mtx);
        if (prio <= TASK_QUEUE_RESERVE || pp->nidle > pool_reserve()) {
                pt = VTAILQ_FIRST(&pp->idle_queue);
                if (pt == NULL)
                        AZ(pp->nidle);
        }

        if (pt == NULL) {
                if (pp->nthr < cache_param->wthread_max) {
                        pp->dry++;
                        AZ(pthread_cond_signal(&pp->herder_cond));
                }
                return (NULL);
        }
        AZ(pt->func);
        CAST_OBJ_NOTNULL(wrk, pt->priv, WORKER_MAGIC);
        return (wrk);
}

/*--------------------------------------------------------------------
 * Special scheduling:  If no idle thread can be found, the current
 * thread is prepared for rescheduling instead.
 * The selected thread's workspace is reserved and the argument is
 * copied into it.
 * Returns one if another thread was scheduled, otherwise zero.
 */

int
Pool_Task_Arg(struct worker *wrk, enum task_prio prio, task_func_t *func,
    const void *arg, size_t arg_len)
{
        struct pool *pp;
        struct worker *wrk2;
        int retval;

        CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
        AN(arg);
        AN(arg_len);
        pp = wrk->pool;
        CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);

        Lck_Lock(&pp->mtx);
        wrk2 = pool_getidleworker(pp, prio);
        if (wrk2 != NULL) {
                AN(pp->nidle);
                VTAILQ_REMOVE(&pp->idle_queue, &wrk2->task, list);
                pp->nidle--;
                retval = 1;
        } else {
                wrk2 = wrk;
                retval = 0;
        }
        Lck_Unlock(&pp->mtx);
        AZ(wrk2->task.func);

        assert(arg_len <= WS_Reserve(wrk2->aws, arg_len));
        memcpy(wrk2->aws->f, arg, arg_len);
        wrk2->task.func = func;
        wrk2->task.priv = wrk2->aws->f;
        if (retval)
                AZ(pthread_cond_signal(&wrk2->cond));
        return (retval);
}
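
/*
 * Usage sketch (hypothetical caller): hand work off to an idle thread,
 * or fall back to running it on the current one.  On a zero return the
 * current worker's own task slot has been loaded, so the caller must
 * unwind back into Pool_Work_Thread(), whose inner loop then invokes
 * the function on this thread.
 *
 *      if (Pool_Task_Arg(wrk, TASK_QUEUE_REQ, my_task_func,
 *          &my_arg, sizeof my_arg) == 0)
 *              return;         // this worker runs my_task_func next
 */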

/*--------------------------------------------------------------------
 * Enter a new task to be done
 */

int
Pool_Task(struct pool *pp, struct pool_task *task, enum task_prio prio)
{
        struct worker *wrk;
        int retval = 0;
        CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
        AN(task);
        AN(task->func);
        assert(prio < TASK_QUEUE_END);

        Lck_Lock(&pp->mtx);

        /* The common case first:  Take an idle thread, do it. */

        wrk = pool_getidleworker(pp, prio);
        if (wrk != NULL) {
                AN(pp->nidle);
                VTAILQ_REMOVE(&pp->idle_queue, &wrk->task, list);
                pp->nidle--;
                AZ(wrk->task.func);
                wrk->task.func = task->func;
                wrk->task.priv = task->priv;
                Lck_Unlock(&pp->mtx);
                AZ(pthread_cond_signal(&wrk->cond));
                return (0);
        }

        /*
         * Queue limits only apply to client threads - all other
         * work is vital and needs to be done at the earliest.
         */
        if (!TASK_QUEUE_CLIENT(prio) ||
            pp->lqueue + pp->nthr < cache_param->wthread_max +
            cache_param->wthread_queue_limit) {
                pp->nqueued++;
                pp->lqueue++;
                VTAILQ_INSERT_TAIL(&pp->queues[prio], task, list);
        } else {
                if (prio == TASK_QUEUE_REQ)
                        pp->sdropped++;
                else
                        pp->rdropped++;
                retval = -1;
        }
        Lck_Unlock(&pp->mtx);
        return (retval);
}
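
/*
 * Usage sketch (hypothetical caller): the pool_task must remain valid
 * until the task function has run, so it is typically embedded in the
 * object being handed over.  A -1 return means the queue limit was hit
 * and the task was dropped.
 *
 *      sp->task.func = my_task_func;   // hypothetical task function
 *      sp->task.priv = sp;
 *      if (Pool_Task(pp, &sp->task, TASK_QUEUE_REQ) == -1)
 *              my_drop(sp);            // hypothetical cleanup on drop
 */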

/*--------------------------------------------------------------------
 * Empty function used as a pointer value for the thread exit condition.
 */

static void v_matchproto_(task_func_t)
pool_kiss_of_death(struct worker *wrk, void *priv)
{
        (void)wrk;
        (void)priv;
}


/*--------------------------------------------------------------------
 * This is the work function for worker threads in the pool.
 */

static void
Pool_Work_Thread(struct pool *pp, struct worker *wrk)
{
        struct pool_task *tp = NULL;
        struct pool_task tpx, tps;
        int i, prio_lim;

        CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
        wrk->pool = pp;
        while (1) {
                Lck_Lock(&pp->mtx);

                CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);

                WS_Reset(wrk->aws, 0);
                AZ(wrk->vsl);

                if (pp->nidle < pool_reserve())
                        prio_lim = TASK_QUEUE_RESERVE + 1;
                else
                        prio_lim = TASK_QUEUE_END;

                for (i = 0; i < prio_lim; i++) {
                        tp = VTAILQ_FIRST(&pp->queues[i]);
                        if (tp != NULL) {
                                pp->lqueue--;
                                VTAILQ_REMOVE(&pp->queues[i], tp, list);
                                break;
                        }
                }

                if ((tp == NULL && wrk->stats->summs > 0) ||
                    (wrk->stats->summs >= cache_param->wthread_stats_rate))
                        pool_addstat(pp->a_stat, wrk->stats);

                if (tp != NULL) {
                        wrk->stats->summs++;
                } else if (pp->b_stat != NULL && pp->a_stat->summs) {
                        /* Nothing to do, push pool stats into global pool */
                        tps.func = pool_stat_summ;
                        tps.priv = pp->a_stat;
                        pp->a_stat = pp->b_stat;
                        pp->b_stat = NULL;
                        tp = &tps;
                } else {
                        /* Nothing to do: To sleep, perchance to dream ... */
                        if (isnan(wrk->lastused))
                                wrk->lastused = VTIM_real();
                        wrk->task.func = NULL;
                        wrk->task.priv = wrk;
                        VTAILQ_INSERT_HEAD(&pp->idle_queue, &wrk->task, list);
                        pp->nidle++;
                        do {
                                i = Lck_CondWait(&wrk->cond, &pp->mtx,
                                    wrk->vcl == NULL ? 0 : wrk->lastused + 60.);
                                if (i == ETIMEDOUT)
                                        VCL_Rel(&wrk->vcl);
                        } while (wrk->task.func == NULL);
                        tpx = wrk->task;
                        tp = &tpx;
                        wrk->stats->summs++;
                }
                Lck_Unlock(&pp->mtx);

                if (tp->func == pool_kiss_of_death)
                        break;

                do {
                        memset(&wrk->task, 0, sizeof wrk->task);
                        assert(wrk->pool == pp);
                        tp->func(wrk, tp->priv);
                        if (DO_DEBUG(DBG_VCLREL) && wrk->vcl != NULL)
                                VCL_Rel(&wrk->vcl);
                        tpx = wrk->task;
                        tp = &tpx;
                } while (tp->func != NULL);

                /* cleanup for next task */
                wrk->seen_methods = 0;
        }
        wrk->pool = NULL;
}

/*--------------------------------------------------------------------
 * Create another worker thread.
 */

struct pool_info {
        unsigned                magic;
#define POOL_INFO_MAGIC         0x4e4442d3
        size_t                  stacksize;
        struct pool             *qp;
};

static void *
pool_thread(void *priv)
{
        struct pool_info *pi;

        CAST_OBJ_NOTNULL(pi, priv, POOL_INFO_MAGIC);
        THR_Init();
        WRK_Thread(pi->qp, pi->stacksize, cache_param->workspace_thread);
        FREE_OBJ(pi);
        return (NULL);
}

static void
pool_breed(struct pool *qp)
{
        pthread_t tp;
        pthread_attr_t tp_attr;
        struct pool_info *pi;

        AZ(pthread_attr_init(&tp_attr));
        AZ(pthread_attr_setdetachstate(&tp_attr, PTHREAD_CREATE_DETACHED));

        /* Set the stacksize for worker threads we create */
        if (cache_param->wthread_stacksize != UINT_MAX)
                AZ(pthread_attr_setstacksize(&tp_attr,
                    cache_param->wthread_stacksize));

        ALLOC_OBJ(pi, POOL_INFO_MAGIC);
        AN(pi);
        AZ(pthread_attr_getstacksize(&tp_attr, &pi->stacksize));
        pi->qp = qp;

        if (pthread_create(&tp, &tp_attr, pool_thread, pi)) {
                VSL(SLT_Debug, 0, "Create worker thread failed %d %s",
                    errno, strerror(errno));
                Lck_Lock(&pool_mtx);
                VSC_C_main->threads_failed++;
                Lck_Unlock(&pool_mtx);
                VTIM_sleep(cache_param->wthread_fail_delay);
        } else {
                qp->dry = 0;
                qp->nthr++;
                Lck_Lock(&pool_mtx);
                VSC_C_main->threads++;
                VSC_C_main->threads_created++;
                Lck_Unlock(&pool_mtx);
                VTIM_sleep(cache_param->wthread_add_delay);
        }

        AZ(pthread_attr_destroy(&tp_attr));
}

/*--------------------------------------------------------------------
 * Herd a single pool
 *
 * This thread wakes up every thread_pool_timeout seconds, whenever work
 * queues up in a pool and whenever threads need to be destroyed.
 *
 * The trick here is to not be too aggressive about creating threads.  In
 * pool_breed(), we sleep whenever we create a thread and a little while
 * longer whenever we fail to, hopefully missing a lot of cond_signals in
 * the meantime.
 *
 * Idle threads are destroyed at a rate determined by wthread_destroy_delay.
 *
 * XXX: probably needs a lot more work.
 */

void *
pool_herder(void *priv)
{
        struct pool *pp;
        struct pool_task *pt;
        double t_idle;
        struct worker *wrk;
        double delay;
        int wthread_min;

        CAST_OBJ_NOTNULL(pp, priv, POOL_MAGIC);

        THR_SetName("pool_herder");
        THR_Init();

        while (!pp->die || pp->nthr > 0) {
                wthread_min = cache_param->wthread_min;
                if (pp->die)
                        wthread_min = 0;

                /* Make more threads if needed and allowed */
                if (pp->nthr < wthread_min ||
                    (pp->dry && pp->nthr < cache_param->wthread_max)) {
                        pool_breed(pp);
                        continue;
                }

                delay = cache_param->wthread_timeout;
                assert(pp->nthr >= wthread_min);

                if (pp->nthr > wthread_min) {

                        t_idle = VTIM_real() - cache_param->wthread_timeout;

                        Lck_Lock(&pp->mtx);
                        /* XXX: unsafe counters */
                        VSC_C_main->sess_queued += pp->nqueued;
                        VSC_C_main->sess_dropped += pp->sdropped;
                        VSC_C_main->req_dropped += pp->rdropped;
                        pp->nqueued = pp->sdropped = pp->rdropped = 0;

                        wrk = NULL;
                        pt = VTAILQ_LAST(&pp->idle_queue, taskhead);
                        if (pt != NULL) {
                                AN(pp->nidle);
                                AZ(pt->func);
                                CAST_OBJ_NOTNULL(wrk, pt->priv, WORKER_MAGIC);

                                if (pp->die || wrk->lastused < t_idle ||
                                    pp->nthr > cache_param->wthread_max) {
                                        /* Give it a kiss on the cheek... */
                                        VTAILQ_REMOVE(&pp->idle_queue,
                                            &wrk->task, list);
                                        pp->nidle--;
                                        wrk->task.func = pool_kiss_of_death;
                                        AZ(pthread_cond_signal(&wrk->cond));
                                } else {
                                        delay = wrk->lastused - t_idle;
                                        wrk = NULL;
                                }
                        }
                        Lck_Unlock(&pp->mtx);

                        if (wrk != NULL) {
                                pp->nthr--;
                                Lck_Lock(&pool_mtx);
                                VSC_C_main->threads--;
                                VSC_C_main->threads_destroyed++;
                                Lck_Unlock(&pool_mtx);
                                delay = cache_param->wthread_destroy_delay;
                        } else if (delay < cache_param->wthread_destroy_delay)
                                delay = cache_param->wthread_destroy_delay;
                }

                if (pp->die) {
                        if (delay < 2)
                                delay = 10e-3;
                        else
                                delay = 1;
                        VTIM_sleep(delay);
                        continue;
                }
                Lck_Lock(&pp->mtx);
                if (!pp->dry) {
                        if (DO_DEBUG(DBG_VTC_MODE))
                                delay = 0.5;
                        (void)Lck_CondWait(&pp->herder_cond, &pp->mtx,
                                VTIM_real() + delay);
                } else {
                        /* XXX: unsafe counters */
                        VSC_C_main->threads_limited++;
                        pp->dry = 0;
                }
                Lck_Unlock(&pp->mtx);
        }
        return (NULL);
}