varnish-cache/bin/varnishd/waiter/cache_waiter_poll.c
1
/*-
2
 * Copyright (c) 2006 Verdens Gang AS
3
 * Copyright (c) 2006-2011 Varnish Software AS
4
 * All rights reserved.
5
 *
6
 * Author: Poul-Henning Kamp <phk@phk.freebsd.dk>
7
 *
8
 * Redistribution and use in source and binary forms, with or without
9
 * modification, are permitted provided that the following conditions
10
 * are met:
11
 * 1. Redistributions of source code must retain the above copyright
12
 *    notice, this list of conditions and the following disclaimer.
13
 * 2. Redistributions in binary form must reproduce the above copyright
14
 *    notice, this list of conditions and the following disclaimer in the
15
 *    documentation and/or other materials provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
21
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27
 * SUCH DAMAGE.
28
 *
29
 */
30
31
#include "config.h"
32
33
#include <fcntl.h>
34
#include <poll.h>
35
#include <stdlib.h>
36
37
#include "cache/cache_varnishd.h"
38
39
#include "waiter/waiter.h"
40
#include "waiter/waiter_priv.h"
41
#include "vtim.h"
42
43
/*
 * Private state of one poll(2) waiter instance.
 *
 * pollfd[] and idx[] are parallel arrays: idx[i] is the waited object
 * whose file descriptor sits in pollfd[i].  Slot 0 is reserved for the
 * read end of the wakeup pipe and has no idx[] entry.
 */
#define VWP_MAGIC               0x4b2cc735
struct vwp {
        unsigned                magic;          /* VWP_MAGIC */
        struct waiter           *waiter;        /* owning waiter */

        int                     pipes[2];       /* [0] polled/read by the
                                                 * thread, [1] written by
                                                 * enter/fini */

        pthread_t               thread;         /* runs vwp_main() */
        struct pollfd           *pollfd;        /* vector handed to poll(2) */
        struct waited           **idx;          /* parallel to pollfd[] */
        size_t                  npoll;          /* slots allocated */
        size_t                  hpoll;          /* slots in use, incl. slot 0 */
};
56
57
/*--------------------------------------------------------------------
58
 * It would make much more sense to not use two large vectors, but
59
 * the poll(2) API forces us to use at least one, so ... KISS.
60
 */
61
62
static void
63 16
vwp_extend_pollspace(struct vwp *vwp)
64
{
65
        size_t inc;
66
67 16
        if (vwp->npoll < (1<<12))
68 16
                inc = (1<<10);
69 0
        else if (vwp->npoll < (1<<14))
70 0
                inc = (1<<12);
71 0
        else if (vwp->npoll < (1<<16))
72 0
                inc = (1<<14);
73
        else
74 0
                inc = (1<<16);
75
76 16
        VSL(SLT_Debug, 0, "Acceptor poll space increased by %zu to %zu",
77 16
            inc, vwp->npoll + inc);
78
79 16
        vwp->pollfd = realloc(vwp->pollfd,
80 16
            (vwp->npoll + inc) * sizeof(*vwp->pollfd));
81 16
        AN(vwp->pollfd);
82 16
        memset(vwp->pollfd + vwp->npoll, 0, inc * sizeof(*vwp->pollfd));
83
84 16
        vwp->idx = realloc(vwp->idx, (vwp->npoll + inc) * sizeof(*vwp->idx));
85 16
        AN(vwp->idx);
86 16
        memset(vwp->idx + vwp->npoll, 0, inc * sizeof(*vwp->idx));
87
88 16400
        for (; inc > 0; inc--)
89 16384
                vwp->pollfd[vwp->npoll++].fd = -1;
90 16
}
91
92
/*--------------------------------------------------------------------*/
93
94
static void
95 120
vwp_add(struct vwp *vwp, struct waited *wp)
96
{
97
98 120
VSL(SLT_Debug, wp->fd, "ADD");
99 120
        CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
100 120
        CHECK_OBJ_NOTNULL(vwp, VWP_MAGIC);
101 120
        if (vwp->hpoll == vwp->npoll)
102 0
                vwp_extend_pollspace(vwp);
103 120
        assert(vwp->hpoll < vwp->npoll);
104 120
        assert(vwp->pollfd[vwp->hpoll].fd == -1);
105 120
        AZ(vwp->idx[vwp->hpoll]);
106 120
        vwp->pollfd[vwp->hpoll].fd = wp->fd;
107 120
        vwp->pollfd[vwp->hpoll].events = POLLIN;
108 120
        vwp->idx[vwp->hpoll] = wp;
109 120
        vwp->hpoll++;
110 120
        Wait_HeapInsert(vwp->waiter, wp);
111 120
}
112
113
static void
114 114
vwp_del(struct vwp *vwp, int n)
115
{
116 114
        vwp->hpoll--;
117 114
        if (n != vwp->hpoll) {
118 23
                vwp->pollfd[n] = vwp->pollfd[vwp->hpoll];
119 23
                vwp->idx[n] = vwp->idx[vwp->hpoll];
120
        }
121 114
VSL(SLT_Debug, vwp->pollfd[vwp->hpoll].fd, "DEL");
122 114
        memset(&vwp->pollfd[vwp->hpoll], 0, sizeof(*vwp->pollfd));
123 114
        vwp->pollfd[vwp->hpoll].fd = -1;
124 114
        vwp->idx[vwp->hpoll] = NULL;
125 114
}
126
127
/*--------------------------------------------------------------------*/
128
129
static void
130 122
vwp_dopipe(struct vwp *vwp)
131
{
132
        struct waited *w[128];
133
        ssize_t ss;
134
        int i;
135
136 122
        ss = read(vwp->pipes[0], w, sizeof w);
137 122
        assert(ss > 0);
138 122
        i = 0;
139 364
        while (ss) {
140 122
                if (w[i] == NULL) {
141 2
                        assert(ss == sizeof w[0]);
142 2
                        assert(vwp->hpoll == 1);
143 2
                        pthread_exit(NULL);
144
                }
145 120
                CHECK_OBJ_NOTNULL(w[i], WAITED_MAGIC);
146 120
                assert(w[i]->fd > 0);                   // no stdin
147 120
                vwp_add(vwp, w[i++]);
148 120
                ss -= sizeof w[0];
149
        }
150 120
}
151
152
/*--------------------------------------------------------------------*/
153
154
static void *
155 16
vwp_main(void *priv)
156
{
157
        int v;
158
        struct vwp *vwp;
159
        struct waiter *w;
160
        struct waited *wp;
161
        double now, then;
162
        int i;
163
164 16
        THR_SetName("cache-poll");
165 16
        THR_Init();
166 16
        CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
167 16
        w = vwp->waiter;
168
169
        while (1) {
170 233
                then = Wait_HeapDue(w, &wp);
171 233
                if (wp == NULL)
172 82
                        i = -1;
173
                else
174 151
                        i = (int)floor(1e3 * (then - VTIM_real()));
175 233
                assert(vwp->hpoll > 0);
176 233
                AN(vwp->pollfd);
177 233
                v = poll(vwp->pollfd, vwp->hpoll, i);
178 219
                assert(v >= 0);
179 219
                now = VTIM_real();
180 219
                if (vwp->pollfd[0].revents)
181 122
                        v--;
182 572
                for (i = 1; i < vwp->hpoll;) {
183 190
                        assert(vwp->pollfd[i].fd != vwp->pipes[0]);
184 190
                        wp = vwp->idx[i];
185 190
                        CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
186
187 190
                        if (v == 0 && Wait_HeapDue(w, NULL) > now)
188 56
                                break;
189 134
                        if (vwp->pollfd[i].revents)
190 114
                                v--;
191 134
                        then = Wait_When(wp);
192 133
                        if (then <= now) {
193 0
                                AN(Wait_HeapDelete(w, wp));
194 0
                                Wait_Call(w, wp, WAITER_TIMEOUT, now);
195 0
                                vwp_del(vwp, i);
196 133
                        } else if (vwp->pollfd[i].revents & POLLIN) {
197 113
                                assert(wp->fd > 0);
198 113
                                assert(wp->fd == vwp->pollfd[i].fd);
199 113
                                AN(Wait_HeapDelete(w, wp));
200 114
                                Wait_Call(w, wp, WAITER_ACTION, now);
201 114
                                vwp_del(vwp, i);
202
                        } else {
203 20
                                i++;
204
                        }
205
                }
206 219
                if (vwp->pollfd[0].revents)
207 122
                        vwp_dopipe(vwp);
208 217
        }
209
        NEEDLESS(return NULL);
210
}
211
212
/*--------------------------------------------------------------------*/
213
214
static int v_matchproto_(waiter_enter_f)
215 120
vwp_enter(void *priv, struct waited *wp)
216
{
217
        struct vwp *vwp;
218
219 120
        CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
220
221 120
        if (write(vwp->pipes[1], &wp, sizeof wp) != sizeof wp)
222 0
                return (-1);
223 120
        return (0);
224
}
225
226
/*--------------------------------------------------------------------*/
227
228
static void v_matchproto_(waiter_init_f)
229 16
vwp_init(struct waiter *w)
230
{
231
        struct vwp *vwp;
232
233 16
        CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
234 16
        vwp = w->priv;
235 16
        INIT_OBJ(vwp, VWP_MAGIC);
236 16
        vwp->waiter = w;
237 16
        AZ(pipe(vwp->pipes));
238
        // XXX: set write pipe non-blocking
239
240 16
        vwp->hpoll = 1;
241 16
        vwp_extend_pollspace(vwp);
242 16
        vwp->pollfd[0].fd = vwp->pipes[0];
243 16
        vwp->pollfd[0].events = POLLIN;
244 16
        AZ(pthread_create(&vwp->thread, NULL, vwp_main, vwp));
245 16
}
246
247
/*--------------------------------------------------------------------
248
 * It is the caller's responsibility to trigger all fd's waited on to
249
 * fail somehow.
250
 */
251
252
static void v_matchproto_(waiter_fini_f)
253 2
vwp_fini(struct waiter *w)
254
{
255
        struct vwp *vwp;
256
        void *vp;
257
258 2
        CAST_OBJ_NOTNULL(vwp, w->priv, VWP_MAGIC);
259 2
        vp = NULL;
260 4
        while (vwp->hpoll > 1)
261 0
                (void)usleep(100000);
262
        // XXX: set write pipe blocking
263 2
        assert(write(vwp->pipes[1], &vp, sizeof vp) == sizeof vp);
264 2
        AZ(pthread_join(vwp->thread, &vp));
265 2
        closefd(&vwp->pipes[0]);
266 2
        closefd(&vwp->pipes[1]);
267 2
        free(vwp->pollfd);
268 2
        free(vwp->idx);
269 2
}
270
271
/*--------------------------------------------------------------------*/
272
273
#include "waiter/mgt_waiter.h"
274
275
const struct waiter_impl waiter_poll = {
276
        .name =         "poll",
277
        .init =         vwp_init,
278
        .fini =         vwp_fini,
279
        .enter =        vwp_enter,
280
        .size =         sizeof(struct vwp),
281
};