varnish-cache/bin/varnishd/waiter/cache_waiter_poll.c
0
/*-
1
 * Copyright (c) 2006 Verdens Gang AS
2
 * Copyright (c) 2006-2011 Varnish Software AS
3
 * All rights reserved.
4
 *
5
 * Author: Poul-Henning Kamp <phk@phk.freebsd.dk>
6
 *
7
 * SPDX-License-Identifier: BSD-2-Clause
8
 *
9
 * Redistribution and use in source and binary forms, with or without
10
 * modification, are permitted provided that the following conditions
11
 * are met:
12
 * 1. Redistributions of source code must retain the above copyright
13
 *    notice, this list of conditions and the following disclaimer.
14
 * 2. Redistributions in binary form must reproduce the above copyright
15
 *    notice, this list of conditions and the following disclaimer in the
16
 *    documentation and/or other materials provided with the distribution.
17
 *
18
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
19
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
22
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28
 * SUCH DAMAGE.
29
 *
30
 */
31
32
#include "config.h"
33
34
#include <fcntl.h>
35
#include <poll.h>
36
#include <stdlib.h>
37
38
#include "cache/cache_varnishd.h"
39
40
#include "waiter/waiter.h"
41
#include "waiter/waiter_priv.h"
42
#include "vtim.h"
43
44
struct vwp {
45
        unsigned                magic;
46
#define VWP_MAGIC               0x4b2cc735
47
        struct waiter           *waiter;
48
49
        int                     pipes[2];
50
51
        pthread_t               thread;
52
        struct pollfd           *pollfd;
53
        struct waited           **idx;
54
        size_t                  npoll;
55
        size_t                  hpoll;
56
};
57
58
/*--------------------------------------------------------------------
59
 * It would make much more sense to not use two large vectors, but
60
 * the poll(2) API forces us to use at least one, so ... KISS.
61
 */
62
63
static void
64 400
vwp_extend_pollspace(struct vwp *vwp)
65
{
66
        size_t inc;
67
68 400
        if (vwp->npoll < (1<<12))
69 400
                inc = (1<<10);
70 0
        else if (vwp->npoll < (1<<14))
71 0
                inc = (1<<12);
72 0
        else if (vwp->npoll < (1<<16))
73 0
                inc = (1<<14);
74
        else
75 0
                inc = (1<<16);
76
77 800
        VSL(SLT_Debug, NO_VXID, "Acceptor poll space increased by %zu to %zu",
78 400
            inc, vwp->npoll + inc);
79
80 800
        vwp->pollfd = realloc(vwp->pollfd,
81 400
            (vwp->npoll + inc) * sizeof(*vwp->pollfd));
82 400
        AN(vwp->pollfd);
83 400
        memset(vwp->pollfd + vwp->npoll, 0, inc * sizeof(*vwp->pollfd));
84
85 400
        vwp->idx = realloc(vwp->idx, (vwp->npoll + inc) * sizeof(*vwp->idx));
86 400
        AN(vwp->idx);
87 400
        memset(vwp->idx + vwp->npoll, 0, inc * sizeof(*vwp->idx));
88
89 410000
        for (; inc > 0; inc--)
90 409600
                vwp->pollfd[vwp->npoll++].fd = -1;
91 400
}
92
93
/*--------------------------------------------------------------------*/
94
95
static void
96 4678
vwp_add(struct vwp *vwp, struct waited *wp)
97
{
98
99 4678
        CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
100 4678
        VSL(SLT_Debug, NO_VXID, "vwp: ADD %d", wp->fd);
101 4678
        CHECK_OBJ_NOTNULL(vwp, VWP_MAGIC);
102 4678
        if (vwp->hpoll == vwp->npoll)
103 0
                vwp_extend_pollspace(vwp);
104 4678
        assert(vwp->hpoll < vwp->npoll);
105 4678
        assert(vwp->pollfd[vwp->hpoll].fd == -1);
106 4678
        AZ(vwp->idx[vwp->hpoll]);
107 4678
        vwp->pollfd[vwp->hpoll].fd = wp->fd;
108 4678
        vwp->pollfd[vwp->hpoll].events = POLLIN;
109 4678
        vwp->idx[vwp->hpoll] = wp;
110 4678
        vwp->hpoll++;
111 4678
        Wait_HeapInsert(vwp->waiter, wp);
112 4678
}
113
114
static void
115 4440
vwp_del(struct vwp *vwp, int n)
116
{
117 4440
        vwp->hpoll--;
118 4440
        if (n != vwp->hpoll) {
119 668
                vwp->pollfd[n] = vwp->pollfd[vwp->hpoll];
120 668
                vwp->idx[n] = vwp->idx[vwp->hpoll];
121 668
        }
122 4440
        memset(&vwp->pollfd[vwp->hpoll], 0, sizeof(*vwp->pollfd));
123 4440
        vwp->pollfd[vwp->hpoll].fd = -1;
124 4440
        vwp->idx[vwp->hpoll] = NULL;
125 4440
}
126
127
/*--------------------------------------------------------------------*/
128
129
static void
130 4718
vwp_dopipe(struct vwp *vwp)
131
{
132
        struct waited *w[128];
133
        ssize_t ss;
134
        int i;
135
136 4718
        ss = read(vwp->pipes[0], w, sizeof w);
137 4718
        assert(ss > 0);
138 4718
        i = 0;
139 9396
        while (ss) {
140 4718
                if (w[i] == NULL) {
141 40
                        assert(ss == sizeof w[0]);
142 40
                        assert(vwp->hpoll == 1);
143 40
                        pthread_exit(NULL);
144
                }
145 4678
                CHECK_OBJ_NOTNULL(w[i], WAITED_MAGIC);
146 4678
                assert(w[i]->fd > 0);                   // no stdin
147 4678
                vwp_add(vwp, w[i++]);
148 4678
                ss -= sizeof w[0];
149
        }
150 4678
}
151
152
/*--------------------------------------------------------------------*/
153
154
static void *
155 0
vwp_main(void *priv)
156
{
157
        int t, v;
158
        struct vwp *vwp;
159
        struct waiter *w;
160
        struct waited *wp;
161
        double now, then;
162
        size_t z;
163
164 0
        THR_SetName("cache-poll");
165 0
        THR_Init();
166 0
        CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
167 0
        w = vwp->waiter;
168
169 8936
        while (1) {
170 8936
                then = Wait_HeapDue(w, &wp);
171 8936
                if (wp == NULL)
172 2407
                        t = -1;
173
                else
174 6529
                        t = (int)floor(1e3 * (then - VTIM_real()));
175 8936
                assert(vwp->hpoll > 0);
176 8936
                AN(vwp->pollfd);
177 8936
                v = poll(vwp->pollfd, vwp->hpoll, t);
178 8936
                assert(v >= 0);
179 8936
                now = VTIM_real();
180 8936
                if (vwp->pollfd[0].revents)
181 4718
                        v--;
182 15399
                for (z = 1; z < vwp->hpoll;) {
183 9098
                        assert(vwp->pollfd[z].fd != vwp->pipes[0]);
184 9098
                        wp = vwp->idx[z];
185 9098
                        CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
186
187 9098
                        if (v == 0 && Wait_HeapDue(w, NULL) > now)
188 2635
                                break;
189 6463
                        if (vwp->pollfd[z].revents)
190 4439
                                v--;
191 6463
                        then = Wait_When(wp);
192 6463
                        if (then <= now) {
193 0
                                AN(Wait_HeapDelete(w, wp));
194 0
                                Wait_Call(w, wp, WAITER_TIMEOUT, now);
195 0
                                vwp_del(vwp, z);
196 6463
                        } else if (vwp->pollfd[z].revents & POLLIN) {
197 4438
                                assert(wp->fd > 0);
198 4438
                                assert(wp->fd == vwp->pollfd[z].fd);
199 4438
                                AN(Wait_HeapDelete(w, wp));
200 4438
                                Wait_Call(w, wp, WAITER_ACTION, now);
201 4438
                                vwp_del(vwp, z);
202 4438
                        } else {
203 2025
                                z++;
204
                        }
205
                }
206 8936
                if (vwp->pollfd[0].revents)
207 4718
                        vwp_dopipe(vwp);
208
        }
209
        NEEDLESS(return (NULL));
210
}
211
212
/*--------------------------------------------------------------------*/
213
214
static int v_matchproto_(waiter_enter_f)
215 4678
vwp_enter(void *priv, struct waited *wp)
216
{
217
        struct vwp *vwp;
218
219 4678
        CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
220
221 4678
        if (write(vwp->pipes[1], &wp, sizeof wp) != sizeof wp)
222 0
                return (-1);
223 4678
        return (0);
224 4678
}
225
226
/*--------------------------------------------------------------------*/
227
228
static void v_matchproto_(waiter_init_f)
229 400
vwp_init(struct waiter *w)
230
{
231
        struct vwp *vwp;
232
233 400
        CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
234 400
        vwp = w->priv;
235 400
        INIT_OBJ(vwp, VWP_MAGIC);
236 400
        vwp->waiter = w;
237 400
        AZ(pipe(vwp->pipes));
238
        // XXX: set write pipe non-blocking
239
240 400
        vwp->hpoll = 1;
241 400
        vwp_extend_pollspace(vwp);
242 400
        vwp->pollfd[0].fd = vwp->pipes[0];
243 400
        vwp->pollfd[0].events = POLLIN;
244 400
        PTOK(pthread_create(&vwp->thread, NULL, vwp_main, vwp));
245 400
}
246
247
/*--------------------------------------------------------------------
248
 * It is the callers responsibility to trigger all fd's waited on to
249
 * fail somehow.
250
 */
251
252
static void v_matchproto_(waiter_fini_f)
253 40
vwp_fini(struct waiter *w)
254
{
255
        struct vwp *vwp;
256
        void *vp;
257
258 40
        CAST_OBJ_NOTNULL(vwp, w->priv, VWP_MAGIC);
259 40
        vp = NULL;
260 40
        while (vwp->hpoll > 1)
261 0
                (void)usleep(100000);
262
        // XXX: set write pipe blocking
263 40
        assert(write(vwp->pipes[1], &vp, sizeof vp) == sizeof vp);
264 40
        PTOK(pthread_join(vwp->thread, &vp));
265 40
        closefd(&vwp->pipes[0]);
266 40
        closefd(&vwp->pipes[1]);
267 40
        free(vwp->pollfd);
268 40
        free(vwp->idx);
269 40
}
270
271
/*--------------------------------------------------------------------*/
272
273
#include "waiter/mgt_waiter.h"
274
275
const struct waiter_impl waiter_poll = {
276
        .name =         "poll",
277
        .init =         vwp_init,
278
        .fini =         vwp_fini,
279
        .enter =        vwp_enter,
280
        .size =         sizeof(struct vwp),
281
};