varnish-cache/bin/varnishd/waiter/cache_waiter_kqueue.c
1
/*-
2
 * Copyright (c) 2006 Verdens Gang AS
3
 * Copyright (c) 2006-2011 Varnish Software AS
4
 * All rights reserved.
5
 *
6
 * Author: Poul-Henning Kamp <phk@phk.freebsd.dk>
7
 *
8
 * Redistribution and use in source and binary forms, with or without
9
 * modification, are permitted provided that the following conditions
10
 * are met:
11
 * 1. Redistributions of source code must retain the above copyright
12
 *    notice, this list of conditions and the following disclaimer.
13
 * 2. Redistributions in binary form must reproduce the above copyright
14
 *    notice, this list of conditions and the following disclaimer in the
15
 *    documentation and/or other materials provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
21
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27
 * SUCH DAMAGE.
28
 *
29
 */
30
31
//lint -e{766}
32
#include "config.h"
33
34
#if defined(HAVE_KQUEUE)
35
36
#include "cache/cache_varnishd.h"
37
38
#include <sys/event.h>
39
40
#include <stdlib.h>
41
42
#include "waiter/waiter.h"
43
#include "waiter/waiter_priv.h"
44
#include "vtim.h"
45
46
#define NKEV    256
47
48
struct vwk {
49
        unsigned                magic;
50
#define VWK_MAGIC               0x1cc2acc2
51
        int                     kq;
52
        struct waiter           *waiter;
53
        pthread_t               thread;
54
        double                  next;
55
        int                     pipe[2];
56
        unsigned                nwaited;
57
        int                     die;
58
        struct lock             mtx;
59
};
60
61
/*--------------------------------------------------------------------*/
62
63
/*
 * Waiter thread.
 *
 * Each iteration first hands back, with WAITER_TIMEOUT, every entry that
 * Wait_HeapDue() reports as due at or before 'now', then sleeps in
 * kevent() until the next deadline (or 100 seconds when the heap is
 * empty) and dispatches whatever events were returned.
 *
 * The thread exits once vwk->die is set and no entries remain, closing
 * the wake-up pipe and the kqueue descriptor on the way out.
 *
 * priv is the struct vwk set up by vwk_init(); the return value is
 * always NULL.
 */
static void *
vwk_thread(void *priv)
{
        struct vwk *vwk;
        struct kevent ke[NKEV], *kp;
        int j, n;
        double now, then;
        struct timespec ts;
        struct waited *wp;
        struct waiter *w;
        char c;

        CAST_OBJ_NOTNULL(vwk, priv, VWK_MAGIC);
        w = vwk->waiter;
        CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
        THR_SetName("cache-kqueue");
        THR_Init();

        now = VTIM_real();
        while (1) {
                /*
                 * Expire overdue entries.  Both break statements leave
                 * this loop with vwk->mtx still held; it is released
                 * after the kevent() timeout has been computed.
                 */
                while (1) {
                        Lck_Lock(&vwk->mtx);
                        /*
                         * XXX: We could avoid many syscalls here if we were
                         * XXX: allowed to just close the fd's on timeout.
                         */
                        then = Wait_HeapDue(w, &wp);
                        if (wp == NULL) {
                                vwk->next = now + 100;
                                break;
                        } else if (then > now) {
                                vwk->next = then;
                                break;
                        }
                        CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
                        EV_SET(ke, wp->fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
                        AZ(kevent(vwk->kq, ke, 1, NULL, 0, NULL));
                        AN(Wait_HeapDelete(w, wp));
                        /*
                         * A timed out entry has left the waiter just like
                         * one that saw an event, so it must be uncounted
                         * too; otherwise nwaited never reaches zero again
                         * and the exit test below can never succeed.
                         */
                        vwk->nwaited--;
                        Lck_Unlock(&vwk->mtx);
                        Wait_Call(w, wp, WAITER_TIMEOUT, now);
                }

                /* Convert the relative wait into a timespec for kevent() */
                then = vwk->next - now;
                ts.tv_sec = (time_t)floor(then);
                ts.tv_nsec = (long)(1e9 * (then - ts.tv_sec));
                Lck_Unlock(&vwk->mtx);

                n = kevent(vwk->kq, NULL, 0, ke, NKEV, &ts);
                assert(n >= 0);
                assert(n <= NKEV);
                now = VTIM_real();

                for (kp = ke, j = 0; j < n; j++, kp++) {
                        assert(kp->filter == EVFILT_READ);
                        if (kp->udata == vwk) {
                                /*
                                 * Wake-up pipe (registered by vwk_init()
                                 * with udata == vwk): drain one byte.
                                 * NOTE(review): relies on assert() always
                                 * evaluating its argument -- confirm it is
                                 * never compiled out in this project.
                                 */
                                assert(read(vwk->pipe[0], &c, 1) == 1);
                                continue;
                        }
                        CAST_OBJ_NOTNULL(wp, kp->udata, WAITED_MAGIC);
                        /*
                         * nwaited is incremented under the lock in
                         * vwk_enter(), so decrement it under the lock too.
                         */
                        Lck_Lock(&vwk->mtx);
                        AN(Wait_HeapDelete(w, wp));
                        vwk->nwaited--;
                        Lck_Unlock(&vwk->mtx);
                        if (kp->flags & EV_EOF)
                                Wait_Call(w, wp, WAITER_REMCLOSE, now);
                        else
                                Wait_Call(w, wp, WAITER_ACTION, now);
                }

                if (vwk->nwaited == 0 && vwk->die)
                        break;
        }
        closefd(&vwk->pipe[0]);
        closefd(&vwk->pipe[1]);
        closefd(&vwk->kq);
        return(NULL);
}
136
137
/*--------------------------------------------------------------------*/
138
139
/*
 * Hand one waited object to the kqueue waiter.
 *
 * Under vwk->mtx the entry is counted, inserted into the waiter's heap
 * and registered in the kqueue as a one-shot read event carrying wp as
 * its udata.  Always returns 0.
 */
static int v_matchproto_(waiter_enter_f)
vwk_enter(void *priv, struct waited *wp)
{
        struct kevent kev;
        struct vwk *self;

        CAST_OBJ_NOTNULL(self, priv, VWK_MAGIC);

        Lck_Lock(&self->mtx);

        self->nwaited++;
        Wait_HeapInsert(self->waiter, wp);

        EV_SET(&kev, wp->fd, EVFILT_READ, EV_ADD | EV_ONESHOT, 0, 0, wp);
        AZ(kevent(self->kq, &kev, 1, NULL, 0, NULL));

        /*
         * When this entry is due before the time the waiter thread is
         * currently sleeping until, write a byte to the pipe so that its
         * kevent() call returns.
         */
        if (Wait_When(wp) < self->next) {
                assert(write(self->pipe[1], "X", 1) == 1);
        }

        Lck_Unlock(&self->mtx);
        return (0);
}
159
160
/*--------------------------------------------------------------------*/
161
162
/*
 * Set up a kqueue waiter instance in w->priv.
 *
 * Creates the kqueue, the lock and the wake-up pipe, registers the read
 * end of the pipe in the kqueue with the instance itself as udata, and
 * finally starts the thread running vwk_thread().  Any failure trips an
 * assertion.
 */
static void v_matchproto_(waiter_init_f)
vwk_init(struct waiter *w)
{
        struct kevent kev;
        struct vwk *self;

        CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);

        self = w->priv;
        INIT_OBJ(self, VWK_MAGIC);
        self->waiter = w;

        self->kq = kqueue();
        assert(self->kq >= 0);

        Lck_New(&self->mtx, lck_waiter);

        AZ(pipe(self->pipe));
        EV_SET(&kev, self->pipe[0], EVFILT_READ, EV_ADD, 0, 0, self);
        AZ(kevent(self->kq, &kev, 1, NULL, 0, NULL));

        /* Started last, once everything the thread uses is in place */
        AZ(pthread_create(&self->thread, NULL, vwk_thread, self));
}
182
183
/*--------------------------------------------------------------------
 * It is the caller's responsibility to trigger all fd's waited on to
 * fail somehow.
 */
187
188
static void v_matchproto_(waiter_fini_f)
189 1
vwk_fini(struct waiter *w)
190
{
191
        struct vwk *vwk;
192
        void *vp;
193
194 1
        CAST_OBJ_NOTNULL(vwk, w->priv, VWK_MAGIC);
195 1
        Lck_Lock(&vwk->mtx);
196 1
        vwk->die = 1;
197 1
        assert(write(vwk->pipe[1], "Y", 1) == 1);
198 1
        Lck_Unlock(&vwk->mtx);
199 1
        AZ(pthread_join(vwk->thread, &vp));
200 1
        Lck_Delete(&vwk->mtx);
201 1
}
202
203
/*--------------------------------------------------------------------*/
204
205
#include "waiter/mgt_waiter.h"
206
207
const struct waiter_impl waiter_kqueue = {
208
        .name =         "kqueue",
209
        .init =         vwk_init,
210
        .fini =         vwk_fini,
211
        .enter =        vwk_enter,
212
        .size =         sizeof(struct vwk),
213
};
214
215
#endif /* defined(HAVE_KQUEUE) */