varnish-cache/bin/varnishd/waiter/cache_waiter_poll.c
0
/*-
1
 * Copyright (c) 2006 Verdens Gang AS
2
 * Copyright (c) 2006-2011 Varnish Software AS
3
 * All rights reserved.
4
 *
5
 * Author: Poul-Henning Kamp <phk@phk.freebsd.dk>
6
 *
7
 * SPDX-License-Identifier: BSD-2-Clause
8
 *
9
 * Redistribution and use in source and binary forms, with or without
10
 * modification, are permitted provided that the following conditions
11
 * are met:
12
 * 1. Redistributions of source code must retain the above copyright
13
 *    notice, this list of conditions and the following disclaimer.
14
 * 2. Redistributions in binary form must reproduce the above copyright
15
 *    notice, this list of conditions and the following disclaimer in the
16
 *    documentation and/or other materials provided with the distribution.
17
 *
18
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
19
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
22
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28
 * SUCH DAMAGE.
29
 *
30
 */
31
32
#include "config.h"
33
34
#include <fcntl.h>
35
#include <poll.h>
36
#include <stdlib.h>
37
38
#include "cache/cache_varnishd.h"
39
40
#include "waiter/waiter.h"
41
#include "waiter/waiter_priv.h"
42
#include "vtim.h"
43
44
/*
 * Private state of one poll(2) based waiter.
 *
 * pollfd[] and idx[] are parallel arrays: idx[i] is the waited object
 * whose file descriptor is stored in pollfd[i].  Slot 0 is used for the
 * read end of the wakeup pipe and has no idx[] entry.
 */
struct vwp {
        unsigned                magic;
#define VWP_MAGIC               0x4b2cc735
        /* The generic waiter this instance belongs to */
        struct waiter           *waiter;

        /* pipes[0] is polled by the thread, pipes[1] is written by clients */
        int                     pipes[2];

        /* The thread running vwp_main() */
        pthread_t               thread;
        /* Array handed to poll(2), npoll elements allocated */
        struct pollfd           *pollfd;
        /* Waited object per pollfd[] slot, npoll elements allocated */
        struct waited           **idx;
        /* Number of slots allocated in pollfd[] and idx[] */
        size_t                  npoll;
        /* Number of slots in use; slots [0, hpoll) are passed to poll(2) */
        size_t                  hpoll;
};
57
58
/*--------------------------------------------------------------------
59
 * It would make much more sense to not use two large vectors, but
60
 * the poll(2) API forces us to use at least one, so ... KISS.
61
 */
62
63
static void
64 50
vwp_extend_pollspace(struct vwp *vwp)
65
{
66
        size_t inc;
67
68 50
        if (vwp->npoll < (1<<12))
69 50
                inc = (1<<10);
70 0
        else if (vwp->npoll < (1<<14))
71 0
                inc = (1<<12);
72 0
        else if (vwp->npoll < (1<<16))
73 0
                inc = (1<<14);
74
        else
75 0
                inc = (1<<16);
76
77 100
        VSL(SLT_Debug, NO_VXID, "Acceptor poll space increased by %zu to %zu",
78 50
            inc, vwp->npoll + inc);
79
80 100
        vwp->pollfd = realloc(vwp->pollfd,
81 50
            (vwp->npoll + inc) * sizeof(*vwp->pollfd));
82 50
        AN(vwp->pollfd);
83 50
        memset(vwp->pollfd + vwp->npoll, 0, inc * sizeof(*vwp->pollfd));
84
85 50
        vwp->idx = realloc(vwp->idx, (vwp->npoll + inc) * sizeof(*vwp->idx));
86 50
        AN(vwp->idx);
87 50
        memset(vwp->idx + vwp->npoll, 0, inc * sizeof(*vwp->idx));
88
89 51250
        for (; inc > 0; inc--)
90 51200
                vwp->pollfd[vwp->npoll++].fd = -1;
91 50
}
92
93
/*--------------------------------------------------------------------*/
94
95
static void
96 582
vwp_add(struct vwp *vwp, struct waited *wp)
97
{
98
99 582
        CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
100 582
        VSL(SLT_Debug, NO_VXID, "vwp: ADD %d", wp->fd);
101 582
        CHECK_OBJ_NOTNULL(vwp, VWP_MAGIC);
102 582
        if (vwp->hpoll == vwp->npoll)
103 0
                vwp_extend_pollspace(vwp);
104 582
        assert(vwp->hpoll < vwp->npoll);
105 582
        assert(vwp->pollfd[vwp->hpoll].fd == -1);
106 582
        AZ(vwp->idx[vwp->hpoll]);
107 582
        vwp->pollfd[vwp->hpoll].fd = wp->fd;
108 582
        vwp->pollfd[vwp->hpoll].events = POLLIN;
109 582
        vwp->idx[vwp->hpoll] = wp;
110 582
        vwp->hpoll++;
111 582
        Wait_HeapInsert(vwp->waiter, wp);
112 582
}
113
114
static void
115 554
vwp_del(struct vwp *vwp, int n)
116
{
117 554
        vwp->hpoll--;
118 554
        if (n != vwp->hpoll) {
119 84
                vwp->pollfd[n] = vwp->pollfd[vwp->hpoll];
120 84
                vwp->idx[n] = vwp->idx[vwp->hpoll];
121 84
        }
122 554
        memset(&vwp->pollfd[vwp->hpoll], 0, sizeof(*vwp->pollfd));
123 554
        vwp->pollfd[vwp->hpoll].fd = -1;
124 554
        vwp->idx[vwp->hpoll] = NULL;
125 554
}
126
127
/*--------------------------------------------------------------------*/
128
129
static void
130 587
vwp_dopipe(struct vwp *vwp)
131
{
132
        struct waited *w[128];
133
        ssize_t ss;
134
        int i;
135
136 587
        ss = read(vwp->pipes[0], w, sizeof w);
137 587
        assert(ss > 0);
138 587
        i = 0;
139 1169
        while (ss) {
140 587
                if (w[i] == NULL) {
141 5
                        assert(ss == sizeof w[0]);
142 5
                        assert(vwp->hpoll == 1);
143 5
                        pthread_exit(NULL);
144
                }
145 582
                CHECK_OBJ_NOTNULL(w[i], WAITED_MAGIC);
146 582
                assert(w[i]->fd > 0);                   // no stdin
147 582
                vwp_add(vwp, w[i++]);
148 582
                ss -= sizeof w[0];
149
        }
150 582
}
151
152
/*--------------------------------------------------------------------*/
153
154
static void *
155 0
vwp_main(void *priv)
156
{
157
        int t, v;
158
        struct vwp *vwp;
159
        struct waiter *w;
160
        struct waited *wp;
161
        double now, then;
162
        size_t z;
163
164 0
        THR_SetName("cache-poll");
165 0
        THR_Init();
166 0
        CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
167 0
        w = vwp->waiter;
168
169 1119
        while (1) {
170 1119
                then = Wait_HeapDue(w, &wp);
171 1119
                if (wp == NULL)
172 316
                        t = -1;
173
                else
174 803
                        t = (int)floor(1e3 * (then - VTIM_real()));
175 1119
                assert(vwp->hpoll > 0);
176 1119
                AN(vwp->pollfd);
177 1119
                v = poll(vwp->pollfd, vwp->hpoll, t);
178 1119
                assert(v >= 0);
179 1119
                now = VTIM_real();
180 1119
                if (vwp->pollfd[0].revents)
181 587
                        v--;
182 1901
                for (z = 1; z < vwp->hpoll;) {
183 1098
                        assert(vwp->pollfd[z].fd != vwp->pipes[0]);
184 1098
                        wp = vwp->idx[z];
185 1098
                        CHECK_OBJ_NOTNULL(wp, WAITED_MAGIC);
186
187 1098
                        if (v == 0 && Wait_HeapDue(w, NULL) > now)
188 316
                                break;
189 782
                        if (vwp->pollfd[z].revents)
190 554
                                v--;
191 782
                        then = Wait_When(wp);
192 782
                        if (then <= now) {
193 0
                                AN(Wait_HeapDelete(w, wp));
194 0
                                Wait_Call(w, wp, WAITER_TIMEOUT, now);
195 0
                                vwp_del(vwp, z);
196 782
                        } else if (vwp->pollfd[z].revents & POLLIN) {
197 554
                                assert(wp->fd > 0);
198 554
                                assert(wp->fd == vwp->pollfd[z].fd);
199 554
                                AN(Wait_HeapDelete(w, wp));
200 554
                                Wait_Call(w, wp, WAITER_ACTION, now);
201 554
                                vwp_del(vwp, z);
202 554
                        } else {
203 228
                                z++;
204
                        }
205
                }
206 1119
                if (vwp->pollfd[0].revents)
207 587
                        vwp_dopipe(vwp);
208
        }
209
        NEEDLESS(return (NULL));
210
}
211
212
/*--------------------------------------------------------------------*/
213
214
static int v_matchproto_(waiter_enter_f)
215 582
vwp_enter(void *priv, struct waited *wp)
216
{
217
        struct vwp *vwp;
218
219 582
        CAST_OBJ_NOTNULL(vwp, priv, VWP_MAGIC);
220
221 582
        if (write(vwp->pipes[1], &wp, sizeof wp) != sizeof wp)
222 0
                return (-1);
223 582
        return (0);
224 582
}
225
226
/*--------------------------------------------------------------------*/
227
228
static void v_matchproto_(waiter_init_f)
229 50
vwp_init(struct waiter *w)
230
{
231
        struct vwp *vwp;
232
233 50
        CHECK_OBJ_NOTNULL(w, WAITER_MAGIC);
234 50
        vwp = w->priv;
235 50
        INIT_OBJ(vwp, VWP_MAGIC);
236 50
        vwp->waiter = w;
237 50
        AZ(pipe(vwp->pipes));
238
        // XXX: set write pipe non-blocking
239
240 50
        vwp->hpoll = 1;
241 50
        vwp_extend_pollspace(vwp);
242 50
        vwp->pollfd[0].fd = vwp->pipes[0];
243 50
        vwp->pollfd[0].events = POLLIN;
244 50
        PTOK(pthread_create(&vwp->thread, NULL, vwp_main, vwp));
245 50
}
246
247
/*--------------------------------------------------------------------
248
 * It is the caller's responsibility to trigger all fds waited on to
249
 * fail somehow.
250
 */
251
252
static void v_matchproto_(waiter_fini_f)
253 5
vwp_fini(struct waiter *w)
254
{
255
        struct vwp *vwp;
256
        void *vp;
257
258 5
        CAST_OBJ_NOTNULL(vwp, w->priv, VWP_MAGIC);
259 5
        vp = NULL;
260 5
        while (vwp->hpoll > 1)
261 0
                (void)usleep(100000);
262
        // XXX: set write pipe blocking
263 5
        assert(write(vwp->pipes[1], &vp, sizeof vp) == sizeof vp);
264 5
        PTOK(pthread_join(vwp->thread, &vp));
265 5
        closefd(&vwp->pipes[0]);
266 5
        closefd(&vwp->pipes[1]);
267 5
        free(vwp->pollfd);
268 5
        free(vwp->idx);
269 5
}
270
271
/*--------------------------------------------------------------------*/
272
273
#include "waiter/mgt_waiter.h"
274
275
const struct waiter_impl waiter_poll = {
276
        .name =         "poll",
277
        .init =         vwp_init,
278
        .fini =         vwp_fini,
279
        .enter =        vwp_enter,
280
        .size =         sizeof(struct vwp),
281
};