varnish-cache/bin/varnishd/waiter/cache_waiter_kqueue.c
0
/*-
1
 * Copyright (c) 2006 Verdens Gang AS
2
 * Copyright (c) 2006-2011 Varnish Software AS
3
 * All rights reserved.
4
 *
5
 * Author: Poul-Henning Kamp <phk@phk.freebsd.dk>
6
 *
7
 * SPDX-License-Identifier: BSD-2-Clause
8
 *
9
 * Redistribution and use in source and binary forms, with or without
10
 * modification, are permitted provided that the following conditions
11
 * are met:
12
 * 1. Redistributions of source code must retain the above copyright
13
 *    notice, this list of conditions and the following disclaimer.
14
 * 2. Redistributions in binary form must reproduce the above copyright
15
 *    notice, this list of conditions and the following disclaimer in the
16
 *    documentation and/or other materials provided with the distribution.
17
 *
18
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
19
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
22
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28
 * SUCH DAMAGE.
29
 *
30
 */
31
32
#include "config.h"
33
34
#if defined(HAVE_KQUEUE)
35
36
#include "cache/cache_varnishd.h"
37
38
#include <sys/event.h>
39
40
#include <stdlib.h>
41
42
#include "waiter/waiter.h"
43
#include "waiter/waiter_priv.h"
44
#include "vtim.h"
45
46
#define NKEV    256
47
48
struct vwk {
49
        unsigned                magic;
50
#define VWK_MAGIC               0x1cc2acc2
51
        int                     kq;
52
        struct waiter           *waiter;
53
        pthread_t               thread;
54
        double                  next;
55
        int                     pipe[2];
56
        unsigned                nwaited;
57
        int                     die;
58
        struct lock             mtx;
59
};
60
61
/*--------------------------------------------------------------------*/
62
63
static void *
64 1
vwk_thread(void *priv)
65
{
66
        struct vwk *vwk;
67
        struct kevent ke[NKEV], *kp;
68
        int j, n;
69
        double now, then;
70
        struct timespec ts;
71
        struct waited *wp;
72
        struct waiter *w;
73
        char c;
74
75 1
        CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
76 1
        w = vwk->waiter;
77 1
        CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
78 1
        THR_SetName("cache-kqueue");
79 1
        THR_Init();
80
81 1
        now = VTIM_real();
82 4615
        while (1) {
83 4628
                while (1) {
84 4628
                        Lck_Lock(&vwk->mtx);
85
                        /*
86
                         * XXX: We could avoid many syscalls here if we were
87
                         * XXX: allowed to just close the fd's on timeout.
88
                         */
89 4628
                        then = Wait_HeapDue(w, &wp);
90 4628
                        if (wp == NULL) {
91 3650
                                vwk->next = now + 100;
92 3650
                                break;
93 978
                        } else if (then > now) {
94 965
                                vwk->next = then;
95 965
                                break;
96
                        }
97 13
                        CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
98 13
                        EV_SET(ke, wp->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
99 13
                        AZ(kevent(vwk->kq, ke, 1, NULL, 0, NULL));
100 13
                        AN(Wait_HeapDelete(w, wp));
101 13
                        Lck_Unlock(&vwk->mtx);
102 13
                        Wait_Call(w, wp, WAITER_TIMEOUT, now);
103
                }
104 4615
                then = vwk->next - now;
105 4615
                ts = VTIM_timespec(then);
106 4615
                Lck_Unlock(&vwk->mtx);
107 4615
                n = kevent(vwk->kq, NULL, 0, ke, NKEV, &ts);
108 4615
                assert(n >= 0);
109 4615
                assert(n <= NKEV);
110 4615
                now = VTIM_real();
111 8159
                for (kp = ke, j = 0; j < n; j++, kp++) {
112 3544
                        assert(kp->filter == EVFILT_READ);
113 3544
                        if ((uintptr_t)ke[j].udata == (uintptr_t)vwk) {
114 1769
                                assert(read(vwk->pipe[0], &c, 1) == 1);
115 1769
                                continue;
116
                        }
117 1775
                        CAST_OBJ_NOTNULL(wp, (void*)ke[j].udata, WAITED_MAGIC);
118 1775
                        Lck_Lock(&vwk->mtx);
119 1775
                        AN(Wait_HeapDelete(w, wp));
120 1775
                        Lck_Unlock(&vwk->mtx);
121 1775
                        vwk->nwaited--;
122 1775
                        if (kp->flags & EV_EOF &&
123 1132
                            recv(wp->fd, &c, 1, MSG_PEEK) == 0)
124 960
                                Wait_Call(w, wp, WAITER_REMCLOSE, now);
125
                        else
126 815
                                Wait_Call(w, wp, WAITER_ACTION, now);
127 1775
                }
128 4615
                if (vwk->nwaited == 0 && vwk->die)
129 1
                        break;
130
        }
131 1
        closefd(&vwk->pipe[0]);
132 1
        closefd(&vwk->pipe[1]);
133 1
        closefd(&vwk->kq);
134 1
        return (NULL);
135
}
136
137
/*--------------------------------------------------------------------*/
138
139
static int v_matchproto_(waiter_enter_f)
140 1809
vwk_enter(void *priv, struct waited *wp)
141
{
142
        struct vwk *vwk;
143
        struct kevent ke;
144
145 1809
        CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
146 1809
        EV_SET(&ke, wp->fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, wp);
147 1809
        Lck_Lock(&vwk->mtx);
148 1809
        vwk->nwaited++;
149 1809
        Wait_HeapInsert(vwk->waiter, wp);
150 1809
        AZ(kevent(vwk->kq, &ke, 1, NULL, 0, NULL));
151
152
        /* If the kqueue isn't due before our timeout, poke it via the pipe */
153 1809
        if (Wait_When(wp) < vwk->next)
154 1768
                assert(write(vwk->pipe[1], "X", 1) == 1);
155
156 1809
        Lck_Unlock(&vwk->mtx);
157 1809
        return (0);
158
}
159
160
/*--------------------------------------------------------------------*/
161
162
static void v_matchproto_(waiter_init_f)
163 1840
vwk_init(struct waiter *w)
164
{
165
        struct vwk *vwk;
166
        struct kevent ke;
167
168 1840
        CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
169 1840
        vwk = w->priv;
170 1840
        INIT_OBJ(vwk, VWK_MAGIC);
171 1840
        vwk->waiter = w;
172
173 1840
        vwk->kq = kqueue();
174 1840
        assert(vwk->kq >= 0);
175 1840
        Lck_New(&vwk->mtx, lck_waiter);
176 1840
        AZ(pipe(vwk->pipe));
177 1840
        EV_SET(&ke, vwk->pipe[0], EVFILT_READ, EV_ADD, 0, 0, vwk);
178 1840
        AZ(kevent(vwk->kq, &ke, 1, NULL, 0, NULL));
179
180 1840
        PTOK(pthread_create(&vwk->thread, NULL, vwk_thread, vwk));
181 1840
}
182
183
/*--------------------------------------------------------------------
184
 * It is the callers responsibility to trigger all fd's waited on to
185
 * fail somehow.
186
 */
187
188
static void v_matchproto_(waiter_fini_f)
189 1
vwk_fini(struct waiter *w)
190
{
191
        struct vwk *vwk;
192
        void *vp;
193
194 1
        CAST_OBJ_NOTNULL(vwk, w->priv, VWK_MAGIC);
195 1
        Lck_Lock(&vwk->mtx);
196 1
        vwk->die = 1;
197 1
        assert(write(vwk->pipe[1], "Y", 1) == 1);
198 1
        Lck_Unlock(&vwk->mtx);
199 1
        PTOK(pthread_join(vwk->thread, &vp));
200 1
        Lck_Delete(&vwk->mtx);
201 1
}
202
203
/*--------------------------------------------------------------------*/
204
205
#include "waiter/mgt_waiter.h"
206
207
const struct waiter_impl waiter_kqueue = {
208
        .name =         "kqueue",
209
        .init =         vwk_init,
210
        .fini =         vwk_fini,
211
        .enter =        vwk_enter,
212
        .size =         sizeof(struct vwk),
213
};
214
215
#endif /* defined(HAVE_KQUEUE) */