varnish-cache/bin/varnishd/waiter/cache_waiter_poll.c
0
/*-
1
 * Copyright (c) 2006 Verdens Gang AS
2
 * Copyright (c) 2006-2011 Varnish Software AS
3
 * All rights reserved.
4
 *
5
 * Author: Poul-Henning Kamp <phk@phk.freebsd.dk>
6
 *
7
 * SPDX-License-Identifier: BSD-2-Clause
8
 *
9
 * Redistribution and use in source and binary forms, with or without
10
 * modification, are permitted provided that the following conditions
11
 * are met:
12
 * 1. Redistributions of source code must retain the above copyright
13
 *    notice, this list of conditions and the following disclaimer.
14
 * 2. Redistributions in binary form must reproduce the above copyright
15
 *    notice, this list of conditions and the following disclaimer in the
16
 *    documentation and/or other materials provided with the distribution.
17
 *
18
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
19
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
22
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28
 * SUCH DAMAGE.
29
 *
30
 */
31
32
#include "config.h"
33
34
#include <fcntl.h>
35
#include <poll.h>
36
#include <stdlib.h>
37
38
#include "cache/cache_varnishd.h"
39
40
#include "waiter/waiter.h"
41
#include "waiter/waiter_priv.h"
42
#include "vtim.h"
43
44
struct vwp {
45
        unsigned                magic;
46
#define VWP_MAGIC               0x4b2cc735
47
        struct waiter           *waiter;
48
49
        int                     pipes[2];
50
51
        pthread_t               thread;
52
        struct pollfd           *pollfd;
53
        struct waited           **idx;
54
        size_t                  npoll;
55
        size_t                  hpoll;
56
};
57
58
/*--------------------------------------------------------------------
59
 * It would make much more sense to not use two large vectors, but
60
 * the poll(2) API forces us to use at least one, so ... KISS.
61
 */
62
63
static void
64 250
vwp_extend_pollspace(struct vwp *vwp)
65
{
66
        size_t inc;
67
68 250
        if (vwp->npoll < (1<<12))
69 250
                inc = (1<<10);
70 0
        else if (vwp->npoll < (1<<14))
71 0
                inc = (1<<12);
72 0
        else if (vwp->npoll < (1<<16))
73 0
                inc = (1<<14);
74
        else
75 0
                inc = (1<<16);
76
77 500
        VSL(SLT_Debug, NO_VXID, "Acceptor poll space increased by %zu to %zu",
78 250
            inc, vwp->npoll + inc);
79
80 500
        vwp->pollfd = realloc(vwp->pollfd,
81 250
            (vwp->npoll + inc) * sizeof(*vwp->pollfd));
82 250
        AN(vwp->pollfd);
83 250
        memset(vwp->pollfd + vwp->npoll, 0, inc * sizeof(*vwp->pollfd));
84
85 250
        vwp->idx = realloc(vwp->idx, (vwp->npoll + inc) * sizeof(*vwp->idx));
86 250
        AN(vwp->idx);
87 250
        memset(vwp->idx + vwp->npoll, 0, inc * sizeof(*vwp->idx));
88
89 256250
        for (; inc > 0; inc--)
90 256000
                vwp->pollfd[vwp->npoll++].fd = -1;
91 250
}
92
93
/*--------------------------------------------------------------------*/
94
95
static void
96 2919
vwp_add(struct vwp *vwp, struct waited *wp)
97
{
98
99 2919
        CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
100 2919
        VSL(SLT_Debug, NO_VXID, "vwp: ADD %d", wp->fd);
101 2919
        CHECK_OBJ_NOTNULL(vwp, VWP_MAGIC);
102 2919
        if (vwp->hpoll == vwp->npoll)
103 0
                vwp_extend_pollspace(vwp);
104 2919
        assert(vwp->hpoll < vwp->npoll);
105 2919
        assert(vwp->pollfd[vwp->hpoll].fd == -1);
106 2919
        AZ(vwp->idx[vwp->hpoll]);
107 2919
        vwp->pollfd[vwp->hpoll].fd = wp->fd;
108 2919
        vwp->pollfd[vwp->hpoll].events = POLLIN;
109 2919
        vwp->idx[vwp->hpoll] = wp;
110 2919
        vwp->hpoll++;
111 2919
        Wait_HeapInsert(vwp->waiter, wp);
112 2919
}
113
114
static void
115 2772
vwp_del(struct vwp *vwp, int n)
116
{
117 2772
        vwp->hpoll--;
118 2772
        if (n != vwp->hpoll) {
119 691
                vwp->pollfd[n] = vwp->pollfd[vwp->hpoll];
120 691
                vwp->idx[n] = vwp->idx[vwp->hpoll];
121 691
        }
122 2772
        memset(&vwp->pollfd[vwp->hpoll], 0, sizeof(*vwp->pollfd));
123 2772
        vwp->pollfd[vwp->hpoll].fd = -1;
124 2772
        vwp->idx[vwp->hpoll] = NULL;
125 2772
}
126
127
/*--------------------------------------------------------------------*/
128
129
static void
130 2944
vwp_dopipe(struct vwp *vwp)
131
{
132
        struct waited *w[128];
133
        ssize_t ss;
134
        int i;
135
136 2944
        ss = read(vwp->pipes[0], w, sizeof w);
137 2944
        assert(ss > 0);
138 2944
        i = 0;
139 5863
        while (ss) {
140 2944
                if (w[i] == NULL) {
141 25
                        assert(ss == sizeof w[0]);
142 25
                        assert(vwp->hpoll == 1);
143 25
                        pthread_exit(NULL);
144
                }
145 2919
                CHECK_OBJ_NOTNULL(w[i], WAITED_MAGIC);
146 2919
                assert(w[i]->fd > 0);                   // no stdin
147 2919
                vwp_add(vwp, w[i++]);
148 2919
                ss -= sizeof w[0];
149
        }
150 2919
}
151
152
/*--------------------------------------------------------------------*/
153
154
static void *
155 0
vwp_main(void *priv)
156
{
157
        int t, v;
158
        struct vwp *vwp;
159
        struct waiter *w;
160
        struct waited *wp;
161
        double now, then;
162
        size_t z;
163
164 0
        THR_SetName("cache-poll");
165 0
        THR_Init();
166 0
        CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
167 0
        w = vwp->waiter;
168
169 5506
        while (1) {
170 5506
                then = Wait_HeapDue(w, &wp);
171 5506
                if (wp == NULL)
172 1517
                        t = -1;
173
                else
174 3989
                        t = (int)floor(1e3 * (then - VTIM_real()));
175 5506
                assert(vwp->hpoll > 0);
176 5506
                AN(vwp->pollfd);
177 5506
                v = poll(vwp->pollfd, vwp->hpoll, t);
178 5506
                assert(v >= 0);
179 5506
                now = VTIM_real();
180 5506
                if (vwp->pollfd[0].revents)
181 2944
                        v--;
182 9117
                for (z = 1; z < vwp->hpoll;) {
183 5437
                        assert(vwp->pollfd[z].fd != vwp->pipes[0]);
184 5437
                        wp = vwp->idx[z];
185 5437
                        CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
186
187 5437
                        if (v == 0 && Wait_HeapDue(w, NULL) > now)
188 1826
                                break;
189 3611
                        if (vwp->pollfd[z].revents)
190 2772
                                v--;
191 3611
                        then = Wait_When(wp);
192 3611
                        if (then <= now) {
193 0
                                AN(Wait_HeapDelete(w, wp));
194 0
                                Wait_Call(w, wp, WAITER_TIMEOUT, now);
195 0
                                vwp_del(vwp, z);
196 3611
                        } else if (vwp->pollfd[z].revents & POLLIN) {
197 2771
                                assert(wp->fd > 0);
198 2771
                                assert(wp->fd == vwp->pollfd[z].fd);
199 2771
                                AN(Wait_HeapDelete(w, wp));
200 2771
                                Wait_Call(w, wp, WAITER_ACTION, now);
201 2771
                                vwp_del(vwp, z);
202 2771
                        } else {
203 840
                                z++;
204
                        }
205
                }
206 5506
                if (vwp->pollfd[0].revents)
207 2944
                        vwp_dopipe(vwp);
208
        }
209
        NEEDLESS(return (NULL));
210
}
211
212
/*--------------------------------------------------------------------*/
213
214
static int v_matchproto_(waiter_enter_f)
215 2919
vwp_enter(void *priv, struct waited *wp)
216
{
217
        struct vwp *vwp;
218
219 2919
        CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
220
221 2919
        if (write(vwp->pipes[1], &wp, sizeof wp) != sizeof wp)
222 0
                return (-1);
223 2919
        return (0);
224 2919
}
225
226
/*--------------------------------------------------------------------*/
227
228
static void v_matchproto_(waiter_init_f)
229 250
vwp_init(struct waiter *w)
230
{
231
        struct vwp *vwp;
232
233 250
        CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
234 250
        vwp = w->priv;
235 250
        INIT_OBJ(vwp, VWP_MAGIC);
236 250
        vwp->waiter = w;
237 250
        AZ(pipe(vwp->pipes));
238
        // XXX: set write pipe non-blocking
239
240 250
        vwp->hpoll = 1;
241 250
        vwp_extend_pollspace(vwp);
242 250
        vwp->pollfd[0].fd = vwp->pipes[0];
243 250
        vwp->pollfd[0].events = POLLIN;
244 250
        PTOK(pthread_create(&vwp->thread, NULL, vwp_main, vwp));
245 250
}
246
247
/*--------------------------------------------------------------------
248
 * It is the callers responsibility to trigger all fd's waited on to
249
 * fail somehow.
250
 */
251
252
static void v_matchproto_(waiter_fini_f)
253 25
vwp_fini(struct waiter *w)
254
{
255
        struct vwp *vwp;
256
        void *vp;
257
258 25
        CAST_OBJ_NOTNULL(vwp, w->priv, VWP_MAGIC);
259 25
        vp = NULL;
260 25
        while (vwp->hpoll > 1)
261 0
                (void)usleep(100000);
262
        // XXX: set write pipe blocking
263 25
        assert(write(vwp->pipes[1], &vp, sizeof vp) == sizeof vp);
264 25
        PTOK(pthread_join(vwp->thread, &vp));
265 25
        closefd(&vwp->pipes[0]);
266 25
        closefd(&vwp->pipes[1]);
267 25
        free(vwp->pollfd);
268 25
        free(vwp->idx);
269 25
}
270
271
/*--------------------------------------------------------------------*/
272
273
#include "waiter/mgt_waiter.h"
274
275
const struct waiter_impl waiter_poll = {
276
        .name =         "poll",
277
        .init =         vwp_init,
278
        .fini =         vwp_fini,
279
        .enter =        vwp_enter,
280
        .size =         sizeof(struct vwp),
281
};