varnish-cache/bin/varnishd/cache/cache_wrk.c
1
/*-
2
 * Copyright (c) 2006 Verdens Gang AS
3
 * Copyright (c) 2006-2011 Varnish Software AS
4
 * All rights reserved.
5
 *
6
 * Author: Poul-Henning Kamp <phk@phk.freebsd.dk>
7
 *
8
 * Redistribution and use in source and binary forms, with or without
9
 * modification, are permitted provided that the following conditions
10
 * are met:
11
 * 1. Redistributions of source code must retain the above copyright
12
 *    notice, this list of conditions and the following disclaimer.
13
 * 2. Redistributions in binary form must reproduce the above copyright
14
 *    notice, this list of conditions and the following disclaimer in the
15
 *    documentation and/or other materials provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
21
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27
 * SUCH DAMAGE.
28
 *
29
 * Worker thread stuff unrelated to the worker thread pools.
30
 */
31
32
#include "config.h"
33
34
#include <errno.h>
35
#include <stdlib.h>
36
37
#include "cache_varnishd.h"
38
#include "cache_pool.h"
39
40
#include "vtim.h"
41
42
#include "hash/hash_slinger.h"
43
44
static void Pool_Work_Thread(struct pool *pp, struct worker *wrk);
45
46
/*--------------------------------------------------------------------
47
 * Create and start a back-ground thread which as its own worker and
48
 * session data structures;
49
 */
50
51
/* Description of a background thread; handed to wrk_bgthread() as its
 * pthread argument by WRK_BgThread(). */
struct bgthread {
        unsigned        magic;
#define BGTHREAD_MAGIC  0x23b5152b
        const char      *name;          /* thread name, for THR_SetName() */
        bgthread_t      *func;          /* function the thread runs */
        void            *priv;          /* opaque argument passed to func */
};
58
59
static void *
60 8328
wrk_bgthread(void *arg)
61
{
62
        struct bgthread *bt;
63
        struct worker wrk;
64
        struct VSC_main ds;
65
66 8328
        CAST_OBJ_NOTNULL(bt, arg, BGTHREAD_MAGIC);
67 8328
        THR_SetName(bt->name);
68 8328
        THR_Init();
69 8328
        INIT_OBJ(&wrk, WORKER_MAGIC);
70 8328
        memset(&ds, 0, sizeof ds);
71 8328
        wrk.stats = &ds;
72
73 8328
        (void)bt->func(&wrk, bt->priv);
74
75 0
        WRONG("BgThread terminated");
76
77
        NEEDLESS(return NULL);
78
}
79
80
void
81 8328
WRK_BgThread(pthread_t *thr, const char *name, bgthread_t *func, void *priv)
82
{
83
        struct bgthread *bt;
84
85 8328
        ALLOC_OBJ(bt, BGTHREAD_MAGIC);
86 8328
        AN(bt);
87
88 8328
        bt->name = name;
89 8328
        bt->func = func;
90 8328
        bt->priv = priv;
91 8328
        AZ(pthread_create(thr, NULL, wrk_bgthread, bt));
92 8328
}
93
94
/*--------------------------------------------------------------------*/
95
96
/*
 * Body of a pool worker thread: build the worker state and workspace
 * on this thread's own stack, run the pool task loop until handed the
 * kiss of death, then tear down and flush stats into the pool.
 */
static void
WRK_Thread(struct pool *qp, size_t stacksize, unsigned thread_workspace)
{
        // child_signal_handler stack overflow check uses struct worker addr
        struct worker *w, ww;
        struct VSC_main ds;
        unsigned char ws[thread_workspace];    /* VLA: per-thread workspace */

        AN(qp);
        AN(stacksize);          /* only sanity-checked here; stack size was
                                 * set by the creator via pthread attrs */
        AN(thread_workspace);

        THR_SetName("cache-worker");
        w = &ww;
        INIT_OBJ(w, WORKER_MAGIC);
        w->lastused = NAN;      /* NAN == "never been idle yet" */
        memset(&ds, 0, sizeof ds);
        w->stats = &ds;         /* thread-local stats, summed into pool */
        AZ(pthread_cond_init(&w->cond, NULL));

        WS_Init(w->aws, "wrk", ws, thread_workspace);

        VSL(SLT_WorkThread, 0, "%p start", w);

        /* Does not return until this worker is told to die */
        Pool_Work_Thread(qp, w);
        AZ(w->pool);

        VSL(SLT_WorkThread, 0, "%p end", w);
        if (w->vcl != NULL)
                VCL_Rel(&w->vcl);
        AZ(pthread_cond_destroy(&w->cond));
        HSH_Cleanup(w);
        Pool_Sumstat(w);
}
130
131
/*--------------------------------------------------------------------
132
 * Summing of stats into pool counters
133
 */
134
135
/*
 * Fold a worker's thread-local stats into the pool accumulator and
 * zero the source so the worker starts counting afresh.  Called from
 * Pool_Work_Thread() with the pool mutex held.
 */
static void
pool_addstat(struct VSC_main *dst, struct VSC_main *src)
{

        dst->summs++;
        VSC_main_Summ(dst, src);
        memset(src, 0, sizeof *src);
}
143
144
static inline int
145 72205
pool_reserve(void)
146
{
147
        unsigned lim;
148
149 72205
        if (cache_param->wthread_reserve == 0)
150 71893
                return (cache_param->wthread_min / 20 + 1);
151 312
        lim = cache_param->wthread_min * 950 / 1000;
152 312
        if (cache_param->wthread_reserve > lim)
153 0
                return (lim);
154 312
        return (cache_param->wthread_reserve);
155
}
156
157
/*--------------------------------------------------------------------*/
158
159
/*
 * Pick an idle worker thread from the pool, honouring the reserve:
 * priorities at or below TASK_QUEUE_RESERVE may always take one,
 * anything else only while more than pool_reserve() threads sit idle.
 * If none can be had, wake the herder (when below wthread_max) and
 * return NULL.  Caller must hold pp->mtx.
 */
static struct worker *
pool_getidleworker(struct pool *pp, enum task_prio prio)
{
        struct pool_task *pt = NULL;
        struct worker *wrk;

        CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
        Lck_AssertHeld(&pp->mtx);
        if (prio <= TASK_QUEUE_RESERVE || pp->nidle > pool_reserve()) {
                pt = VTAILQ_FIRST(&pp->idle_queue);
                if (pt == NULL)
                        AZ(pp->nidle);  /* empty queue implies counter zero */
        }

        if (pt == NULL) {
                /* Dry: signal the herder to breed another thread */
                if (pp->nthr < cache_param->wthread_max) {
                        pp->dry++;
                        AZ(pthread_cond_signal(&pp->herder_cond));
                }
                return (NULL);
        }
        AZ(pt->func);
        /* Idle workers park their task struct with priv pointing at
         * themselves (see Pool_Work_Thread) */
        CAST_OBJ_NOTNULL(wrk, pt->priv, WORKER_MAGIC);
        return (wrk);
}
184
185
/*--------------------------------------------------------------------
186
 * Special scheduling:  If no thread can be found, the current thread
187
 * will be prepared for rescheduling instead.
188
 * The selected threads workspace is reserved and the argument put there.
189
 * Return one if another thread was scheduled, otherwise zero.
190
 */
191
192
/*
 * Schedule func on an idle worker if one is available, otherwise
 * prepare the calling worker itself to run it.  The argument is
 * copied into the chosen worker's reserved workspace, so arg may be
 * transient in the caller.  Returns 1 if another thread was
 * dispatched, 0 if the caller was set up to run the task itself.
 */
int
Pool_Task_Arg(struct worker *wrk, enum task_prio prio, task_func_t *func,
    const void *arg, size_t arg_len)
{
        struct pool *pp;
        struct worker *wrk2;
        int retval;

        CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
        AN(arg);
        AN(arg_len);
        pp = wrk->pool;
        CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);

        Lck_Lock(&pp->mtx);
        wrk2 = pool_getidleworker(pp, prio);
        if (wrk2 != NULL) {
                /* Got an idle thread: remove it from the idle queue */
                AN(pp->nidle);
                VTAILQ_REMOVE(&pp->idle_queue, &wrk2->task, list);
                pp->nidle--;
                retval = 1;
        } else {
                /* None available: the caller will be rescheduled */
                wrk2 = wrk;
                retval = 0;
        }
        /* Stash a copy of the argument in the target's workspace and
         * point its task at it */
        AZ(wrk2->task.func);
        assert(arg_len <= WS_Reserve(wrk2->aws, arg_len));
        memcpy(wrk2->aws->f, arg, arg_len);
        wrk2->task.func = func;
        wrk2->task.priv = wrk2->aws->f;
        Lck_Unlock(&pp->mtx);
        if (retval)
                AZ(pthread_cond_signal(&wrk2->cond));
        return (retval);
}
227
228
/*--------------------------------------------------------------------
229
 * Enter a new task to be done
230
 */
231
232
/*
 * Enter a new task to be done: hand it to an idle worker if possible,
 * otherwise queue it (subject to the queue limit for client work).
 * Returns 0 on success, -1 if the task was dropped.
 */
int
Pool_Task(struct pool *pp, struct pool_task *task, enum task_prio prio)
{
        struct worker *wrk;
        int retval = 0;
        CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
        AN(task);
        AN(task->func);
        assert(prio < TASK_QUEUE_END);

        Lck_Lock(&pp->mtx);

        /* The common case first:  Take an idle thread, do it. */

        wrk = pool_getidleworker(pp, prio);
        if (wrk != NULL) {
                AN(pp->nidle);
                VTAILQ_REMOVE(&pp->idle_queue, &wrk->task, list);
                pp->nidle--;
                AZ(wrk->task.func);
                wrk->task.func = task->func;
                wrk->task.priv = task->priv;
                Lck_Unlock(&pp->mtx);
                /* Wake the worker after dropping the lock */
                AZ(pthread_cond_signal(&wrk->cond));
                return (0);
        }

        /*
         * queue limits only apply to client threads - all other
         * work is vital and needs do be done at the earliest
         */
        if (!TASK_QUEUE_CLIENT(prio) ||
            pp->lqueue + pp->nthr < cache_param->wthread_max +
            cache_param->wthread_queue_limit) {
                pp->nqueued++;
                pp->lqueue++;
                VTAILQ_INSERT_TAIL(&pp->queues[prio], task, list);
        } else {
                /* Over the limit: count the drop by task class */
                if (prio == TASK_QUEUE_REQ)
                        pp->sdropped++;
                else
                        pp->rdropped++;
                retval = -1;
        }
        Lck_Unlock(&pp->mtx);
        return (retval);
}
279
280
/*--------------------------------------------------------------------
281
 * Empty function used as a pointer value for the thread exit condition.
282
 */
283
284
static void v_matchproto_(task_func_t)
pool_kiss_of_death(struct worker *wrk, void *priv)
{
        /* The function pointer value itself is the thread-exit
         * sentinel (compared in Pool_Work_Thread); the body is
         * intentionally empty. */
        (void)wrk;
        (void)priv;
}
290
291
292
/*--------------------------------------------------------------------
293
 * This is the work function for worker threads in the pool.
294
 */
295
296
/*
 * Work loop for a pool worker thread:  Grab the highest-priority
 * queued task, or if none, either push accumulated pool stats to the
 * global pool or go to sleep on the idle queue until signalled with a
 * task.  Runs until handed pool_kiss_of_death.
 */
static void
Pool_Work_Thread(struct pool *pp, struct worker *wrk)
{
        struct pool_task *tp = NULL;
        struct pool_task tpx, tps;
        int i, prio_lim;

        CHECK_OBJ_NOTNULL(pp, POOL_MAGIC);
        wrk->pool = pp;
        while (1) {
                Lck_Lock(&pp->mtx);

                CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);

                WS_Reset(wrk->aws, 0);
                AZ(wrk->vsl);

                /* When idle threads run short, only serve priorities
                 * up to the reserve cutoff */
                if (pp->nidle < pool_reserve())
                        prio_lim = TASK_QUEUE_RESERVE + 1;
                else
                        prio_lim = TASK_QUEUE_END;

                /* Scan the queues, highest priority first */
                for (i = 0; i < prio_lim; i++) {
                        tp = VTAILQ_FIRST(&pp->queues[i]);
                        if (tp != NULL) {
                                pp->lqueue--;
                                VTAILQ_REMOVE(&pp->queues[i], tp, list);
                                break;
                        }
                }

                /* Flush thread-local stats into the pool when idle or
                 * when the per-thread accumulation limit is reached */
                if ((tp == NULL && wrk->stats->summs > 0) ||
                    (wrk->stats->summs >= cache_param->wthread_stats_rate))
                        pool_addstat(pp->a_stat, wrk->stats);

                if (tp != NULL) {
                        wrk->stats->summs++;
                } else if (pp->b_stat != NULL && pp->a_stat->summs) {
                        /* Nothing to do, push pool stats into global pool */
                        tps.func = pool_stat_summ;
                        tps.priv = pp->a_stat;
                        /* Swap in the spare accumulator while the old
                         * one is being summed */
                        pp->a_stat = pp->b_stat;
                        pp->b_stat = NULL;
                        tp = &tps;
                } else {
                        /* Nothing to do: To sleep, perchance to dream ... */
                        if (isnan(wrk->lastused))
                                wrk->lastused = VTIM_real();
                        /* Park our own task struct on the idle queue,
                         * priv points back at this worker */
                        wrk->task.func = NULL;
                        wrk->task.priv = wrk;
                        VTAILQ_INSERT_HEAD(&pp->idle_queue, &wrk->task, list);
                        pp->nidle++;
                        do {
                                /* Timed wait only while holding a VCL
                                 * reference, so it can be released */
                                i = Lck_CondWait(&wrk->cond, &pp->mtx,
                                    wrk->vcl == NULL ?  0 : wrk->lastused+60.);
                                if (i == ETIMEDOUT)
                                        VCL_Rel(&wrk->vcl);
                        } while (wrk->task.func == NULL);
                        /* Copy the task before it is clobbered below */
                        tpx = wrk->task;
                        tp = &tpx;
                        wrk->stats->summs++;
                }
                Lck_Unlock(&pp->mtx);

                if (tp->func == pool_kiss_of_death)
                        break;

                /* Run the task; a task may leave a follow-up task in
                 * wrk->task, which we then run in the same thread */
                do {
                        memset(&wrk->task, 0, sizeof wrk->task);
                        assert(wrk->pool == pp);
                        tp->func(wrk, tp->priv);
                        if (DO_DEBUG(DBG_VCLREL) && wrk->vcl != NULL)
                                VCL_Rel(&wrk->vcl);
                        tpx = wrk->task;
                        tp = &tpx;
                } while (tp->func != NULL);

                /* cleanup for next task */
                wrk->seen_methods = 0;
        }
        wrk->pool = NULL;
}
378
379
/*--------------------------------------------------------------------
380
 * Create another worker thread.
381
 */
382
383
/* Startup arguments handed by pool_breed() to each new worker thread;
 * owned (and freed) by the thread itself. */
struct pool_info {
        unsigned                magic;
#define POOL_INFO_MAGIC         0x4e4442d3
        size_t                  stacksize;      /* stack size the thread
                                                 * was created with */
        struct pool             *qp;            /* pool it belongs to */
};
389
390
static void *
391 41322
pool_thread(void *priv)
392
{
393
        struct pool_info *pi;
394
395 41322
        CAST_OBJ_NOTNULL(pi, priv, POOL_INFO_MAGIC);
396 41322
        THR_Init();
397 41328
        WRK_Thread(pi->qp, pi->stacksize, cache_param->workspace_thread);
398 69
        FREE_OBJ(pi);
399 69
        return (NULL);
400
}
401
402
/*
 * Create one more worker thread for the pool, updating the global
 * thread counters, and sleep briefly afterwards (longer on failure)
 * to dampen the creation rate.
 */
static void
pool_breed(struct pool *qp)
{
        pthread_t tp;
        pthread_attr_t tp_attr;
        struct pool_info *pi;

        AZ(pthread_attr_init(&tp_attr));
        AZ(pthread_attr_setdetachstate(&tp_attr, PTHREAD_CREATE_DETACHED));

        /* Set the stacksize for worker threads we create */
        if (cache_param->wthread_stacksize != UINT_MAX)
                AZ(pthread_attr_setstacksize(&tp_attr,
                    cache_param->wthread_stacksize));

        ALLOC_OBJ(pi, POOL_INFO_MAGIC);
        AN(pi);
        /* Record the effective stack size, for the overflow check */
        AZ(pthread_attr_getstacksize(&tp_attr, &pi->stacksize));
        pi->qp = qp;

        if (pthread_create(&tp, &tp_attr, pool_thread, pi)) {
                VSL(SLT_Debug, 0, "Create worker thread failed %d %s",
                    errno, strerror(errno));
                Lck_Lock(&pool_mtx);
                VSC_C_main->threads_failed++;
                Lck_Unlock(&pool_mtx);
                /* Back off harder after a failed create */
                VTIM_sleep(cache_param->wthread_fail_delay);
        } else {
                qp->dry = 0;
                qp->nthr++;
                Lck_Lock(&pool_mtx);
                VSC_C_main->threads++;
                VSC_C_main->threads_created++;
                Lck_Unlock(&pool_mtx);
                VTIM_sleep(cache_param->wthread_add_delay);
        }

        AZ(pthread_attr_destroy(&tp_attr));
}
441
442
/*--------------------------------------------------------------------
443
 * Herd a single pool
444
 *
445
 * This thread wakes up every thread_pool_timeout seconds, whenever a pool
446
 * queues and when threads need to be destroyed
447
 *
448
 * The trick here is to not be too aggressive about creating threads.  In
449
 * pool_breed(), we sleep whenever we create a thread and a little while longer
450
 * whenever we fail to, hopefully missing a lot of cond_signals in the meantime.
451
 *
452
 * Idle threads are destroyed at a rate determined by wthread_destroy_delay
453
 *
454
 * XXX: probably need a lot more work.
455
 *
456
 */
457
458
/*
 * Herder thread for one pool: breeds new worker threads when the pool
 * is below wthread_min or dry, culls the longest-idle worker when
 * above the minimum and idle too long (or above wthread_max), and
 * otherwise sleeps on herder_cond.  Exits only after the pool is
 * marked to die and all its threads are gone.
 */
void*
pool_herder(void *priv)
{
        struct pool *pp;
        struct pool_task *pt;
        double t_idle;
        struct worker *wrk;
        double delay;
        int wthread_min;

        CAST_OBJ_NOTNULL(pp, priv, POOL_MAGIC);

        THR_SetName("pool_herder");
        THR_Init();

        while (!pp->die || pp->nthr > 0) {
                wthread_min = cache_param->wthread_min;
                if (pp->die)
                        wthread_min = 0;        /* drain all threads */

                /* Make more threads if needed and allowed */
                if (pp->nthr < wthread_min ||
                    (pp->dry && pp->nthr < cache_param->wthread_max)) {
                        pool_breed(pp);
                        continue;
                }

                delay = cache_param->wthread_timeout;
                assert(pp->nthr >= wthread_min);

                if (pp->nthr > wthread_min) {

                        /* Threads idle since before t_idle are culled */
                        t_idle = VTIM_real() - cache_param->wthread_timeout;

                        Lck_Lock(&pp->mtx);
                        /* XXX: unsafe counters */
                        VSC_C_main->sess_queued += pp->nqueued;
                        VSC_C_main->sess_dropped += pp->sdropped;
                        VSC_C_main->req_dropped += pp->rdropped;
                        pp->nqueued = pp->sdropped = pp->rdropped = 0;

                        /* The tail of the idle queue is the thread
                         * which has been idle the longest */
                        wrk = NULL;
                        pt = VTAILQ_LAST(&pp->idle_queue, taskhead);
                        if (pt != NULL) {
                                AN(pp->nidle);
                                AZ(pt->func);
                                CAST_OBJ_NOTNULL(wrk, pt->priv, WORKER_MAGIC);

                                if (pp->die || wrk->lastused < t_idle ||
                                    pp->nthr > cache_param->wthread_max) {
                                        /* Give it a kiss on the cheek... */
                                        VTAILQ_REMOVE(&pp->idle_queue,
                                            &wrk->task, list);
                                        pp->nidle--;
                                        wrk->task.func = pool_kiss_of_death;
                                        AZ(pthread_cond_signal(&wrk->cond));
                                } else {
                                        /* Not idle long enough yet:
                                         * sleep until it would be */
                                        delay = wrk->lastused - t_idle;
                                        wrk = NULL;
                                }
                        }
                        Lck_Unlock(&pp->mtx);

                        if (wrk != NULL) {
                                /* A thread was told to die; account
                                 * for it now */
                                pp->nthr--;
                                Lck_Lock(&pool_mtx);
                                VSC_C_main->threads--;
                                VSC_C_main->threads_destroyed++;
                                Lck_Unlock(&pool_mtx);
                                delay = cache_param->wthread_destroy_delay;
                        } else if (delay < cache_param->wthread_destroy_delay)
                                delay = cache_param->wthread_destroy_delay;
                }

                if (pp->die) {
                        /* Dying: poll quickly while threads remain */
                        if (delay < 2)
                                delay = 10e-3;
                        else
                                delay = 1;
                        VTIM_sleep(delay);
                        continue;
                }
                Lck_Lock(&pp->mtx);
                if (!pp->dry) {
                        if (DO_DEBUG(DBG_VTC_MODE))
                                delay = 0.5;
                        /* Wait for a dry signal or the next deadline */
                        (void)Lck_CondWait(&pp->herder_cond, &pp->mtx,
                                VTIM_real() + delay);
                } else {
                        /* XXX: unsafe counters */
                        VSC_C_main->threads_limited++;
                        pp->dry = 0;
                }
                Lck_Unlock(&pp->mtx);
        }
        return (NULL);
}