varnish-cache/bin/varnishd/storage/storage_persistent.c
1
/*-
2
 * Copyright (c) 2008-2011 Varnish Software AS
3
 * All rights reserved.
4
 *
5
 * Author: Poul-Henning Kamp <phk@phk.freebsd.dk>
6
 *
7
 * Redistribution and use in source and binary forms, with or without
8
 * modification, are permitted provided that the following conditions
9
 * are met:
10
 * 1. Redistributions of source code must retain the above copyright
11
 *    notice, this list of conditions and the following disclaimer.
12
 * 2. Redistributions in binary form must reproduce the above copyright
13
 *    notice, this list of conditions and the following disclaimer in the
14
 *    documentation and/or other materials provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
20
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26
 * SUCH DAMAGE.
27
 *
28
 * Persistent storage method
29
 *
30
 * XXX: Before we start the client or maybe after it stops, we should give the
31
 * XXX: stevedores a chance to examine their storage for consistency.
32
 *
33
 * XXX: Do we ever free the LRU-lists ?
34
 */
35
36
#include "config.h"
37
38
#include "cache/cache_varnishd.h"
39
40
#include <sys/mman.h>
41
42
#include <stdio.h>
43
#include <stdlib.h>
44
45
#include "cache/cache_obj.h"
46
#include "cache/cache_objhead.h"
47
#include "storage/storage.h"
48
#include "storage/storage_simple.h"
49
50
#include "vcli_serve.h"
51
#include "vsha256.h"
52
#include "vtim.h"
53
54
#include "storage/storage_persistent.h"
55
56
/* Object method table used for persistent objects; filled in by
 * smp_init() from SML_methods with persistent-specific overrides. */
static struct obj_methods smp_oc_realmethods;

/* Lock class for the per-silo mutexes (VSC lock statistics). */
static struct VSC_lck *lck_smp;

static void smp_init(void);

/*--------------------------------------------------------------------*/

/*
 * silos is unlocked, it only changes during startup when we are
 * single-threaded
 */
static VTAILQ_HEAD(,smp_sc)     silos = VTAILQ_HEAD_INITIALIZER(silos);
69
70
/*--------------------------------------------------------------------
 * Add bans to silos
 */

/*
 * Append a serialized ban of 'len' bytes to the signspace 'spc'.
 * Returns 0 on success, -1 if the signspace has no room for it.
 */
static int
smp_appendban(const struct smp_sc *sc, struct smp_signspace *spc,
    uint32_t len, const uint8_t *ban)
{
	(void)sc;

	if (SIGNSPACE_FREE(spc) >= len) {
		memcpy(SIGNSPACE_FRONT(spc), ban, len);
		smp_append_signspace(spc, len);
		return (0);
	}
	return (-1);
}
88
89
/* Trust that cache_ban.c takes care of locking */
90
91
static int
92 70
smp_baninfo(const struct stevedore *stv, enum baninfo event,
93
            const uint8_t *ban, unsigned len)
94
{
95
        struct smp_sc *sc;
96 70
        int r = 0;
97
98 70
        CAST_OBJ_NOTNULL(sc, stv->priv, SMP_SC_MAGIC);
99
100 70
        switch (event) {
101
        case BI_NEW:
102 48
                r |= smp_appendban(sc, &sc->ban1, len, ban);
103 48
                r |= smp_appendban(sc, &sc->ban2, len, ban);
104 48
                break;
105
        default:
106
                /* Ignored */
107 22
                break;
108
        }
109
110 70
        return (r);
111
}
112
113
/*
 * Replace the contents of a ban signspace with the exported ban list
 * 'bans' (len bytes) and write out a fresh signature.
 */
static void
smp_banexport_spc(struct smp_signspace *spc, const uint8_t *bans, unsigned len)
{
	smp_reset_signspace(spc);
	/* A freshly reset signspace must be able to hold the export */
	assert(SIGNSPACE_FREE(spc) >= len);
	memcpy(SIGNSPACE_DATA(spc), bans, len);
	smp_append_signspace(spc, len);
	smp_sync_sign(&spc->ctx);
}
122
123
/*
 * Ban-export callback: overwrite both ban signspaces with the complete
 * exported ban list, keeping the two redundant copies consistent.
 */
static void
smp_banexport(const struct stevedore *stv, const uint8_t *bans, unsigned len)
{
	struct smp_sc *sc;

	CAST_OBJ_NOTNULL(sc, stv->priv, SMP_SC_MAGIC);
	smp_banexport_spc(&sc->ban1, bans, len);
	smp_banexport_spc(&sc->ban2, bans, len);
}
132
133
/*--------------------------------------------------------------------
 * Attempt to open and read in a ban list
 */

/*
 * Verify the ban signspace and feed its contents to BAN_Reload().
 * Returns 0 on success, or the non-zero result of the integrity check.
 */
static int
smp_open_bans(const struct smp_sc *sc, struct smp_signspace *spc)
{
	uint8_t *data, *front;
	int retval;

	ASSERT_CLI();
	(void)sc;

	retval = smp_chk_signspace(spc);
	if (retval != 0)
		return (retval);

	data = SIGNSPACE_DATA(spc);
	front = SIGNSPACE_FRONT(spc);
	BAN_Reload(data, front - data);

	return (0);
}
155
156
/*--------------------------------------------------------------------
 * Attempt to open and read in a segment list
 */

/*
 * Read the segment list from signspace 'spc', locate (or create, by
 * dropping old segments) the free reserve, and build an in-core
 * smp_seg for every surviving segment.  Returns 0 on success, or the
 * non-zero result of the signspace integrity check.
 */
static int
smp_open_segs(struct smp_sc *sc, struct smp_signspace *spc)
{
	uint64_t length, l;
	struct smp_segptr *ss, *se;
	struct smp_seg *sg, *sg1, *sg2;
	int i, n = 0;	/* n counts segments dropped for the reserve */

	ASSERT_CLI();
	i = smp_chk_signspace(spc);
	if (i)
		return (i);

	ss = SIGNSPACE_DATA(spc);
	length = SIGNSPACE_LEN(spc);

	if (length == 0) {
		/* No segments */
		sc->free_offset = sc->ident->stuff[SMP_SPC_STUFF];
		return (0);
	}
	/* ss..se spans the segment descriptors (se = last entry) */
	se = ss + length / sizeof *ss;
	se--;
	assert(ss <= se);

	/*
	 * Locate the free reserve, there are only two basic cases,
	 * but once we start dropping segments, things gets more complicated.
	 */

	sc->free_offset = se->offset + se->length;
	l = sc->mediasize - sc->free_offset;
	if (se->offset > ss->offset && l >= sc->free_reserve) {
		/*
		 * [__xxxxyyyyzzzz___]
		 * Plenty of space at tail, do nothing.
		 */
	} else if (ss->offset > se->offset) {
		/*
		 * [zzzz____xxxxyyyy_]
		 * (make) space between ends
		 * We might nuke the entire tail end without getting
		 * enough space, in which case we fall through to the
		 * last check.
		 */
		while (ss < se && ss->offset > se->offset) {
			l = ss->offset - (se->offset + se->length);
			if (l > sc->free_reserve)
				break;
			/* Drop the oldest segment and keep looking */
			ss++;
			n++;
		}
	}

	if (l < sc->free_reserve) {
		/*
		 * [__xxxxyyyyzzzz___]
		 * (make) space at front
		 */
		sc->free_offset = sc->ident->stuff[SMP_SPC_STUFF];
		while (ss < se) {
			l = ss->offset - sc->free_offset;
			if (l > sc->free_reserve)
				break;
			ss++;
			n++;
		}
	}

	assert(l >= sc->free_reserve);


	/* sg1 = first loaded segment, sg2 = previous segment; used only
	 * for the adjacency/overlap sanity asserts below. */
	sg1 = NULL;
	sg2 = NULL;
	for (; ss <= se; ss++) {
		ALLOC_OBJ(sg, SMP_SEG_MAGIC);
		AN(sg);
		VTAILQ_INIT(&sg->objcores);
		sg->p = *ss;

		sg->flags |= SMP_SEG_MUSTLOAD;

		/*
		 * HACK: prevent save_segs from nuking segment until we have
		 * HACK: loaded it.
		 */
		sg->nobj = 1;
		if (sg1 != NULL) {
			assert(sg1->p.offset != sg->p.offset);
			if (sg1->p.offset < sg->p.offset)
				assert(smp_segend(sg1) <= sg->p.offset);
			else
				assert(smp_segend(sg) <= sg1->p.offset);
		}
		if (sg2 != NULL) {
			assert(sg2->p.offset != sg->p.offset);
			if (sg2->p.offset < sg->p.offset)
				assert(smp_segend(sg2) <= sg->p.offset);
			else
				assert(smp_segend(sg) <= sg2->p.offset);
		}

		/* XXX: check that they are inside silo */
		/* XXX: check that they don't overlap */
		/* XXX: check that they are serial */
		sg->sc = sc;
		VTAILQ_INSERT_TAIL(&sc->segments, sg, list);
		sg2 = sg;
		if (sg1 == NULL)
			sg1 = sg;
	}
	printf("Dropped %d segments to make free_reserve\n", n);
	return (0);
}
274
275
/*--------------------------------------------------------------------
 * Silo worker thread
 */

/*
 * Background thread per silo: first loads all objects from segments
 * flagged SMP_SEG_MUSTLOAD, then loops saving the segment list
 * whenever the oldest segment has become empty, until SMP_SC_STOP is
 * set by smp_close().  Does a final save before exiting.
 */
static void * v_matchproto_(bgthread_t)
smp_thread(struct worker *wrk, void *priv)
{
	struct smp_sc	*sc;
	struct smp_seg *sg;

	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
	CAST_OBJ_NOTNULL(sc, priv, SMP_SC_MAGIC);
	sc->thread = pthread_self();

	/* First, load all the objects from all segments */
	VTAILQ_FOREACH(sg, &sc->segments, list)
		if (sg->flags & SMP_SEG_MUSTLOAD)
			smp_load_seg(wrk, sc, sg);

	sc->flags |= SMP_SC_LOADED;
	/* Release the ban reference taken in smp_open(); the proto-bans
	 * no longer need protecting now that all objects are in. */
	BAN_Release();
	printf("Silo completely loaded\n");

	/* Housekeeping loop */
	Lck_Lock(&sc->mtx);
	while (!(sc->flags & SMP_SC_STOP)) {
		/* Save the segment list when the oldest (non-current)
		 * segment has been emptied of objects. */
		sg = VTAILQ_FIRST(&sc->segments);
		if (sg != NULL && sg != sc->cur_seg && sg->nobj == 0)
			smp_save_segs(sc);

		Lck_Unlock(&sc->mtx);
		/* ~1.14s nap between housekeeping passes */
		VTIM_sleep(3.14159265359 - 2);
		Lck_Lock(&sc->mtx);
	}

	/* Final save on shutdown */
	smp_save_segs(sc);

	Lck_Unlock(&sc->mtx);
	pthread_exit(0);

	NEEDLESS(return NULL);
}
317
318
/*--------------------------------------------------------------------
 * Open a silo in the worker process
 */

/*
 * Worker-side silo open: validate the silo, recover the ban and
 * segment lists (each kept in two redundant copies), open a fresh
 * segment for writing and start the background loader thread.
 * Called once per silo; the first call also runs global smp_init().
 */
static void v_matchproto_(storage_open_f)
smp_open(struct stevedore *st)
{
	struct smp_sc	*sc;

	ASSERT_CLI();

	/* Lazy one-time global initialization */
	if (VTAILQ_EMPTY(&silos))
		smp_init();

	CAST_OBJ_NOTNULL(sc, st->priv, SMP_SC_MAGIC);

	Lck_New(&sc->mtx, lck_smp);
	Lck_Lock(&sc->mtx);

	sc->stevedore = st;

	/* We trust the parent to give us a valid silo, for good measure: */
	AZ(smp_valid_silo(sc));

	/* Write-protect the first page (silo identification block) */
	AZ(mprotect((void*)sc->base, 4096, PROT_READ));

	sc->ident = SIGN_DATA(&sc->idn);

	/* Check ban lists */
	if (smp_chk_signspace(&sc->ban1)) {
		/* Ban list 1 is broken, use ban2 */
		AZ(smp_chk_signspace(&sc->ban2));
		smp_copy_signspace(&sc->ban1, &sc->ban2);
		smp_sync_sign(&sc->ban1.ctx);
	} else {
		/* Ban1 is OK, copy to ban2 for consistency */
		smp_copy_signspace(&sc->ban2, &sc->ban1);
		smp_sync_sign(&sc->ban2.ctx);
	}
	AZ(smp_open_bans(sc, &sc->ban1));

	/* We attempt seg1 first, and if that fails, try seg2 */
	if (smp_open_segs(sc, &sc->seg1))
		AZ(smp_open_segs(sc, &sc->seg2));

	/*
	 * Grab a reference to the tail of the ban list, until the thread
	 * has loaded all objects, so we can be sure that all of our
	 * proto-bans survive until then.  Released by smp_thread().
	 */
	BAN_Hold();

	/* XXX: save segments to ensure consistency between seg1 & seg2 ? */

	/* XXX: abandon early segments to make sure we have free space ? */

	(void)ObjSubscribeEvents(smp_oc_event, st,
	    OEV_BANCHG|OEV_TTLCHG|OEV_INSERT);

	/* Open a new segment, so we are ready to write */
	smp_new_seg(sc);

	/* Start the silo worker thread, it will load the objects */
	WRK_BgThread(&sc->bgthread, "persistence", smp_thread, sc);

	VTAILQ_INSERT_TAIL(&silos, sc, list);
	Lck_Unlock(&sc->mtx);
}
386
387
/*--------------------------------------------------------------------
388
 * Close a silo
389
 */
390
391
static void v_matchproto_(storage_close_f)
392 74
smp_close(const struct stevedore *st, int warn)
393
{
394
        struct smp_sc   *sc;
395
        void *status;
396
397 74
        ASSERT_CLI();
398
399 74
        CAST_OBJ_NOTNULL(sc, st->priv, SMP_SC_MAGIC);
400 74
        if (warn) {
401 37
                Lck_Lock(&sc->mtx);
402 37
                if (sc->cur_seg != NULL)
403 37
                        smp_close_seg(sc, sc->cur_seg);
404 37
                AZ(sc->cur_seg);
405 37
                sc->flags |= SMP_SC_STOP;
406 37
                Lck_Unlock(&sc->mtx);
407
        } else {
408 37
                AZ(pthread_join(sc->bgthread, &status));
409 37
                AZ(status);
410
        }
411 74
}
412
413
/*--------------------------------------------------------------------
 * Allocate a bite.
 *
 * Allocate [min_size...max_size] space from the bottom of the segment,
 * as is convenient.
 *
 * If 'so' + 'idx' is given, also allocate a smp_object from the top
 * of the segment.
 *
 * Return the segment in 'ssg' if given.
 *
 * Returns NULL if not even min_size (plus overhead) could be found,
 * even after opening a new segment.
 */

static struct storage *
smp_allocx(const struct stevedore *st, size_t min_size, size_t max_size,
    struct smp_object **so, unsigned *idx, struct smp_seg **ssg)
{
	struct smp_sc *sc;
	struct storage *ss;
	struct smp_seg *sg;
	uint64_t left, extra;

	CAST_OBJ_NOTNULL(sc, st->priv, SMP_SC_MAGIC);
	assert(min_size <= max_size);

	/* Round both bounds up to the silo's allocation granularity */
	max_size = IRNUP(sc, max_size);
	min_size = IRNUP(sc, min_size);

	/* Overhead: the struct storage header, plus an smp_object slot
	 * at the top of the segment when the caller wants one. */
	extra = IRNUP(sc, sizeof(*ss));
	if (so != NULL) {
		extra += sizeof(**so);
		AN(idx);
	}

	Lck_Lock(&sc->mtx);
	sg = NULL;
	ss = NULL;

	left = 0;
	if (sc->cur_seg != NULL)
		left = smp_spaceleft(sc, sc->cur_seg);
	if (left < extra + min_size) {
		/* Current segment can't fit the minimum: seal it and
		 * try to open a fresh one. */
		if (sc->cur_seg != NULL)
			smp_close_seg(sc, sc->cur_seg);
		smp_new_seg(sc);
		if (sc->cur_seg != NULL)
			left = smp_spaceleft(sc, sc->cur_seg);
		else
			left = 0;
	}

	if (left >= extra + min_size)  {
		AN(sc->cur_seg);
		/* Shrink the request to whatever actually fits */
		if (left < extra + max_size)
			max_size = IRNDN(sc, left - extra);

		/* Carve the storage from the bottom of the segment */
		sg = sc->cur_seg;
		ss = (void*)(sc->base + sc->next_bot);
		sc->next_bot += max_size + IRNUP(sc, sizeof(*ss));
		sg->nalloc++;
		if (so != NULL) {
			/* Carve the smp_object from the top */
			sc->next_top -= sizeof(**so);
			*so = (void*)(sc->base + sc->next_top);
			/* Render this smp_object mostly harmless */
			EXP_ZERO((*so));
			(*so)->ban = 0.;
			(*so)->ptr = 0;
			sg->objs = *so;
			*idx = ++sg->p.lobjlist;
		}
		(void)smp_spaceleft(sc, sg);	/* for the assert */
	}
	Lck_Unlock(&sc->mtx);

	if (ss == NULL)
		return (ss);
	AN(sg);
	assert(max_size >= min_size);

	/* Fill the storage structure */
	INIT_OBJ(ss, STORAGE_MAGIC);
	ss->ptr = PRNUP(sc, ss + 1);
	ss->space = max_size;
	ss->priv = sc;
	if (ssg != NULL)
		*ssg = sg;
	return (ss);
}
500
501
/*--------------------------------------------------------------------
 * Allocate an object
 */

/*
 * Stevedore allocobj callback: allocate silo space for an object of
 * workspace size 'wsl', nuking LRU objects if needed, and wire the
 * objcore up to its segment.  Returns 1 on success, 0 on failure or
 * when the object is already effectively dead.
 */
static int v_matchproto_(storage_allocobj_f)
smp_allocobj(struct worker *wrk, const struct stevedore *stv,
    struct objcore *oc, unsigned wsl)
{
	struct object *o;
	struct storage *st;
	struct smp_sc	*sc;
	struct smp_seg *sg;
	struct smp_object *so;
	unsigned objidx;
	unsigned ltot;

	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
	CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
	CAST_OBJ_NOTNULL(sc, stv->priv, SMP_SC_MAGIC);

	/* Don't entertain already dead objects */
	if (oc->flags & OC_F_DYING)
		return (0);
	if (oc->t_origin <= 0.)
		return (0);
	if (oc->ttl + oc->grace + oc->keep <= 0.)
		return (0);

	/* Total need: object struct plus workspace, silo-rounded */
	ltot = sizeof(struct object) + PRNDUP(wsl);
	ltot = IRNUP(sc, ltot);

	st = NULL;
	sg = NULL;
	so = NULL;
	objidx = 0;

	/* Keep trying after nuking an LRU object until we get space
	 * or there is nothing left to nuke. */
	do {
		st = smp_allocx(stv, ltot, ltot, &so, &objidx, &sg);
		if (st != NULL && st->space < ltot) {
			stv->sml_free(st);		// NOP
			st = NULL;
		}
	} while (st == NULL && LRU_NukeOne(wrk, stv->lru));
	if (st == NULL)
		return (0);

	AN(st);
	AN(sg);
	AN(so);
	assert(st->space >= ltot);

	o = SML_MkObject(stv, oc, st->ptr);
	AN(oc->stobj->stevedore);
	assert(oc->stobj->stevedore == stv);
	CHECK_OBJ_NOTNULL(o, OBJECT_MAGIC);
	o->objstore = st;
	st->len = sizeof(*o);

	Lck_Lock(&sc->mtx);
	sg->nfixed++;
	sg->nobj++;

	/* We have to do this somewhere, might as well be here... */
	assert(sizeof so->hash == DIGEST_LEN);
	memcpy(so->hash, oc->objhead->digest, DIGEST_LEN);
	EXP_COPY(so, oc);
	/* Store the object's silo-relative position */
	so->ptr = (uint8_t*)o - sc->base;
	so->ban = BAN_Time(oc->ban);

	smp_init_oc(oc, sg, objidx);

	VTAILQ_INSERT_TAIL(&sg->objcores, oc, lru_list);
	Lck_Unlock(&sc->mtx);
	return (1);
}
576
577
/*--------------------------------------------------------------------
578
 * Allocate a bite
579
 */
580
581
static struct storage * v_matchproto_(sml_alloc_f)
582 5
smp_alloc(const struct stevedore *st, size_t size)
583
{
584
585 5
        return (smp_allocx(st,
586
            size > 4096 ? 4096 : size, size, NULL, NULL, NULL));
587
}
588
589
/*--------------------------------------------------------------------*/

/*
 * Stevedore method table for the (deprecated) persistent storage.
 * Object bodies are handled by the simple-storage (SML) routines;
 * the smp_* functions layer silo bookkeeping on top.
 */
const struct stevedore smp_stevedore = {
	.magic		= STEVEDORE_MAGIC,
	.name		= "deprecated_persistent",
	.init		= smp_mgt_init,
	.open		= smp_open,
	.close		= smp_close,
	.allocobj	= smp_allocobj,
	.baninfo	= smp_baninfo,
	.banexport	= smp_banexport,
	.methods	= &smp_oc_realmethods,

	.sml_alloc	= smp_alloc,
	.sml_free	= NULL,
	.sml_getobj	= smp_sml_getobj,
};
606
607
/*--------------------------------------------------------------------
 * Persistence is a bear to test unadulterated, so we cheat by adding
 * a cli command we can use to make it do tricks for us.
 */

/* Print a silo's segment layout and allocation counters to the CLI. */
static void
debug_report_silo(struct cli *cli, const struct smp_sc *sc)
{
	struct smp_seg *sg;

	VCLI_Out(cli, "Silo: %s (%s)\n",
	    sc->stevedore->ident, sc->filename);
	VTAILQ_FOREACH(sg, &sc->segments, list) {
		VCLI_Out(cli, "  Seg: [0x%jx ... +0x%jx]\n",
		   (uintmax_t)sg->p.offset, (uintmax_t)sg->p.length);
		/* Only the current segment has a live allocation window */
		if (sg == sc->cur_seg)
			VCLI_Out(cli,
			   "    Alloc: [0x%jx ... 0x%jx] = 0x%jx free\n",
			   (uintmax_t)(sc->next_bot),
			   (uintmax_t)(sc->next_top),
			   (uintmax_t)(sc->next_top - sc->next_bot));
		VCLI_Out(cli, "    %u nobj, %u alloc, %u lobjlist, %u fixed\n",
		    sg->nobj, sg->nalloc, sg->p.lobjlist, sg->nfixed);
	}
}
632
633
static void v_matchproto_(cli_func_t)
634 6
debug_persistent(struct cli *cli, const char * const * av, void *priv)
635
{
636
        struct smp_sc *sc;
637
638
        (void)priv;
639
640 6
        if (av[2] == NULL) {
641 0
                VTAILQ_FOREACH(sc, &silos, list)
642 0
                        debug_report_silo(cli, sc);
643 0
                return;
644
        }
645 6
        VTAILQ_FOREACH(sc, &silos, list)
646 6
                if (!strcmp(av[2], sc->stevedore->ident))
647 6
                        break;
648 6
        if (sc == NULL) {
649 0
                VCLI_Out(cli, "Silo <%s> not found\n", av[2]);
650 0
                VCLI_SetResult(cli, CLIS_PARAM);
651 0
                return;
652
        }
653 6
        if (av[3] == NULL) {
654 0
                debug_report_silo(cli, sc);
655 0
                return;
656
        }
657 6
        Lck_Lock(&sc->mtx);
658 6
        if (!strcmp(av[3], "sync")) {
659 3
                if (sc->cur_seg != NULL)
660 3
                        smp_close_seg(sc, sc->cur_seg);
661 3
                smp_new_seg(sc);
662 3
        } else if (!strcmp(av[3], "dump")) {
663 3
                debug_report_silo(cli, sc);
664
        } else {
665 0
                VCLI_Out(cli, "Unknown operation\n");
666 0
                VCLI_SetResult(cli, CLIS_PARAM);
667
        }
668 6
        Lck_Unlock(&sc->mtx);
669
}
670
671
/* CLI command table for silo debugging; registered in smp_init(). */
static struct cli_proto debug_cmds[] = {
	{ CLICMD_DEBUG_PERSISTENT,		"d", debug_persistent },
	{ NULL }
};
675
676
/*--------------------------------------------------------------------
 */

/*
 * One-time global initialization shared by all silos: create the lock
 * class, register the CLI debug commands and build the persistent
 * object method table.  Called from smp_open() for the first silo.
 */
static void
smp_init(void)
{
	lck_smp = Lck_CreateClass("smp");
	CLI_AddFuncs(debug_cmds);
	/* Start from the simple-storage methods; persistent objects get
	 * no objtouch and a silo-aware objfree. */
	smp_oc_realmethods = SML_methods;
	smp_oc_realmethods.objtouch = NULL;
	smp_oc_realmethods.objfree = smp_oc_objfree;
}
688
689
/*--------------------------------------------------------------------
690
 * Pause until all silos have loaded.
691
 */
692
693
void
694 24
SMP_Ready(void)
695
{
696
        struct smp_sc *sc;
697
698 24
        ASSERT_CLI();
699
        do {
700 57
                VTAILQ_FOREACH(sc, &silos, list)
701 33
                        if (!(sc->flags & SMP_SC_LOADED))
702 0
                                break;
703 24
                if (sc != NULL)
704 0
                        (void)sleep(1);
705 24
        } while (sc != NULL);
706 24
}