varnish-cache/bin/varnishd/waiter/cache_waiter_kqueue.c
0
/*-
1
 * Copyright (c) 2006 Verdens Gang AS
2
 * Copyright (c) 2006-2011 Varnish Software AS
3
 * All rights reserved.
4
 *
5
 * Author: Poul-Henning Kamp <phk@phk.freebsd.dk>
6
 *
7
 * SPDX-License-Identifier: BSD-2-Clause
8
 *
9
 * Redistribution and use in source and binary forms, with or without
10
 * modification, are permitted provided that the following conditions
11
 * are met:
12
 * 1. Redistributions of source code must retain the above copyright
13
 *    notice, this list of conditions and the following disclaimer.
14
 * 2. Redistributions in binary form must reproduce the above copyright
15
 *    notice, this list of conditions and the following disclaimer in the
16
 *    documentation and/or other materials provided with the distribution.
17
 *
18
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
19
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
22
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28
 * SUCH DAMAGE.
29
 *
30
 */
31
32
#include "config.h"
33
34
#if defined(HAVE_KQUEUE)
35
36
#include "cache/cache_varnishd.h"
37
38
#include <sys/event.h>
39
40
#include <stdlib.h>
41
42
#include "waiter/waiter.h"
43
#include "waiter/waiter_priv.h"
44
#include "vtim.h"
45
46
/* Capacity of the kevent array the waiter thread hands to kevent() per call */
#define NKEV    256
47
48
struct vwk {
49
        unsigned                magic;
50
#define VWK_MAGIC               0x1cc2acc2
51
        int                     kq;
52
        struct waiter           *waiter;
53
        pthread_t               thread;
54
        double                  next;
55
        int                     pipe[2];
56
        unsigned                nwaited;
57
        int                     die;
58
        struct lock             mtx;
59
};
60
61
/*--------------------------------------------------------------------*/
62
63
static void *
64 40
vwk_thread(void *priv)
65
{
66
        struct vwk *vwk;
67
        struct kevent ke[NKEV], *kp;
68
        int j, n;
69
        double now, then;
70
        struct timespec ts;
71
        struct waited *wp;
72
        struct waiter *w;
73
        char c;
74
75 40
        CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
76 40
        w = vwk->waiter;
77 40
        CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
78 40
        THR_SetName("cache-kqueue");
79 40
        THR_Init();
80
81 40
        now = VTIM_real();
82 180789
        while (1) {
83 181268
                while (1) {
84 181268
                        Lck_Lock(&vwk->mtx);
85
                        /*
86
                         * XXX: We could avoid many syscalls here if we were
87
                         * XXX: allowed to just close the fd's on timeout.
88
                         */
89 181268
                        then = Wait_HeapDue(w, &wp);
90 181268
                        if (wp == NULL) {
91 143009
                                vwk->next = now + 100;
92 143009
                                break;
93 38259
                        } else if (then > now) {
94 37780
                                vwk->next = then;
95 37780
                                break;
96
                        }
97 479
                        CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
98 479
                        EV_SET(ke, wp->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
99 479
                        AZ(kevent(vwk->kq, ke, 1, NULL, 0, NULL));
100 479
                        AN(Wait_HeapDelete(w, wp));
101 479
                        Lck_Unlock(&vwk->mtx);
102 479
                        Wait_Call(w, wp, WAITER_TIMEOUT, now);
103
                }
104 180789
                then = vwk->next - now;
105 180789
                ts = VTIM_timespec(then);
106 180789
                Lck_Unlock(&vwk->mtx);
107 180789
                n = kevent(vwk->kq, NULL, 0, ke, NKEV, &ts);
108 180789
                assert(n >= 0);
109 180789
                assert(n <= NKEV);
110 180789
                now = VTIM_real();
111 319676
                for (kp = ke, j = 0; j < n; j++, kp++) {
112 138887
                        assert(kp->filter == EVFILT_READ);
113 138887
                        if ((uintptr_t)ke[j].udata == (uintptr_t)vwk) {
114 69362
                                assert(read(vwk->pipe[0], &c, 1) == 1);
115 69362
                                continue;
116
                        }
117 69525
                        CAST_OBJ_NOTNULL(wp, (void*)ke[j].udata, WAITED_MAGIC);
118 69525
                        Lck_Lock(&vwk->mtx);
119 69525
                        AN(Wait_HeapDelete(w, wp));
120 69525
                        Lck_Unlock(&vwk->mtx);
121 69525
                        vwk->nwaited--;
122 69525
                        if (kp->flags & EV_EOF &&
123 44930
                            recv(wp->fd, &c, 1, MSG_PEEK) == 0)
124 37775
                                Wait_Call(w, wp, WAITER_REMCLOSE, now);
125
                        else
126 31750
                                Wait_Call(w, wp, WAITER_ACTION, now);
127 69525
                }
128 180789
                if (vwk->nwaited == 0 && vwk->die)
129 40
                        break;
130
        }
131 40
        closefd(&vwk->pipe[0]);
132 40
        closefd(&vwk->pipe[1]);
133 40
        closefd(&vwk->kq);
134 40
        return (NULL);
135
}
136
137
/*--------------------------------------------------------------------*/
138
139
static int v_matchproto_(waiter_enter_f)
140 70803
vwk_enter(void *priv, struct waited *wp)
141
{
142
        struct vwk *vwk;
143
        struct kevent ke;
144
145 70803
        CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
146 70803
        EV_SET(&ke, wp->fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, wp);
147 70803
        Lck_Lock(&vwk->mtx);
148 70803
        vwk->nwaited++;
149 70803
        Wait_HeapInsert(vwk->waiter, wp);
150 70803
        AZ(kevent(vwk->kq, &ke, 1, NULL, 0, NULL));
151
152
        /* If the kqueue isn't due before our timeout, poke it via the pipe */
153 70803
        if (Wait_When(wp) < vwk->next)
154 69322
                assert(write(vwk->pipe[1], "X", 1) == 1);
155
156 70803
        Lck_Unlock(&vwk->mtx);
157 70803
        return (0);
158
}
159
160
/*--------------------------------------------------------------------*/
161
162
static void v_matchproto_(waiter_init_f)
163 72057
vwk_init(struct waiter *w)
164
{
165
        struct vwk *vwk;
166
        struct kevent ke;
167
168 72057
        CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
169 72057
        vwk = w->priv;
170 72057
        INIT_OBJ(vwk, VWK_MAGIC);
171 72057
        vwk->waiter = w;
172
173 72057
        vwk->kq = kqueue();
174 72057
        assert(vwk->kq >= 0);
175 72057
        Lck_New(&vwk->mtx, lck_waiter);
176 72057
        AZ(pipe(vwk->pipe));
177 72057
        EV_SET(&ke, vwk->pipe[0], EVFILT_READ, EV_ADD, 0, 0, vwk);
178 72057
        AZ(kevent(vwk->kq, &ke, 1, NULL, 0, NULL));
179
180 72057
        PTOK(pthread_create(&vwk->thread, NULL, vwk_thread, vwk));
181 72057
}
182
183
/*--------------------------------------------------------------------
184
 * It is the caller's responsibility to trigger all fds waited on to
185
 * fail somehow.
186
 */
187
188
static void v_matchproto_(waiter_fini_f)
189 40
vwk_fini(struct waiter *w)
190
{
191
        struct vwk *vwk;
192
        void *vp;
193
194 40
        CAST_OBJ_NOTNULL(vwk, w->priv, VWK_MAGIC);
195 40
        Lck_Lock(&vwk->mtx);
196 40
        vwk->die = 1;
197 40
        assert(write(vwk->pipe[1], "Y", 1) == 1);
198 40
        Lck_Unlock(&vwk->mtx);
199 40
        PTOK(pthread_join(vwk->thread, &vp));
200 40
        Lck_Delete(&vwk->mtx);
201 40
}
202
203
/*--------------------------------------------------------------------*/
204
205
#include "waiter/mgt_waiter.h"
206
207
const struct waiter_impl waiter_kqueue = {
208
        .name =         "kqueue",
209
        .init =         vwk_init,
210
        .fini =         vwk_fini,
211
        .enter =        vwk_enter,
212
        .size =         sizeof(struct vwk),
213
};
214
215
#endif /* defined(HAVE_KQUEUE) */