varnish-cache/bin/varnishd/http1/cache_http1_pipe.c
1
/*-
2
 * Copyright (c) 2006 Verdens Gang AS
3
 * Copyright (c) 2006-2011 Varnish Software AS
4
 * All rights reserved.
5
 *
6
 * Author: Poul-Henning Kamp <phk@phk.freebsd.dk>
7
 *
8
 * Redistribution and use in source and binary forms, with or without
9
 * modification, are permitted provided that the following conditions
10
 * are met:
11
 * 1. Redistributions of source code must retain the above copyright
12
 *    notice, this list of conditions and the following disclaimer.
13
 * 2. Redistributions in binary form must reproduce the above copyright
14
 *    notice, this list of conditions and the following disclaimer in the
15
 *    documentation and/or other materials provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
21
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27
 * SUCH DAMAGE.
28
 *
29
 * XXX: charge bytes to srcaddr
30
 */
31
32
#include "config.h"
33
34
#include "cache/cache_varnishd.h"
35
36
#include <poll.h>
37
#include <stdio.h>
38
39
#include "cache_http1.h"
40
41
#include "VSC_vbe.h"
42
43
/*
 * Held by V1P_Charge() while it adds a finished pipe's byte counts to the
 * shared counters; created by V1P_Init().
 */
static struct lock pipestat_mtx;
44
45
/*
 * Relay one chunk of data from fd0 to fd1.
 *
 * Does a single read() of at most BUFSIZ bytes from fd0 and then writes
 * that chunk to fd1, looping over short writes.  Every byte successfully
 * written is added to *pcnt, including bytes written before a later
 * write fails.
 *
 * Returns 0 when the whole chunk was relayed, 1 when the read returned
 * EOF or an error, or when a write failed or made no progress.
 */
static int
rdf(int fd0, int fd1, uint64_t *pcnt)
{
        /* read()/write() return ssize_t; keep the full width. */
        ssize_t i, j;
        char buf[BUFSIZ], *p;

        i = read(fd0, buf, sizeof buf);
        if (i <= 0)
                return (1);
        for (p = buf; i > 0; i -= j, p += j) {
                j = write(fd1, p, (size_t)i);
                if (j <= 0)
                        return (1);
                *pcnt += (uint64_t)j;
                /* Short write: pause before pushing the remainder. */
                if (i != j)
                        (void)usleep(100000);           /* XXX hack */
        }
        return (0);
}
64
65
void
66 10
V1P_Charge(struct req *req, const struct v1p_acct *a, struct VSC_vbe *b)
67
{
68
69 10
        AN(b);
70 10
        VSLb(req->vsl, SLT_PipeAcct, "%ju %ju %ju %ju",
71
            (uintmax_t)a->req,
72
            (uintmax_t)a->bereq,
73
            (uintmax_t)a->in,
74
            (uintmax_t)a->out);
75
76 10
        Lck_Lock(&pipestat_mtx);
77 10
        VSC_C_main->s_pipe_hdrbytes += a->req;
78 10
        VSC_C_main->s_pipe_in += a->in;
79 10
        VSC_C_main->s_pipe_out += a->out;
80 10
        b->pipe_hdrbytes += a->bereq;
81 10
        b->pipe_out += a->in;
82 10
        b->pipe_in += a->out;
83 10
        Lck_Unlock(&pipestat_mtx);
84 10
}
85
86
void
87 9
V1P_Process(struct req *req, int fd, struct v1p_acct *v1a)
88
{
89
        struct pollfd fds[2];
90
        int i, j;
91
92 9
        CHECK_OBJ_NOTNULL(req, REQ_MAGIC);
93 9
        CHECK_OBJ_NOTNULL(req->sp, SESS_MAGIC);
94 9
        assert(fd > 0);
95
96 9
        if (req->htc->pipeline_b != NULL) {
97 3
                j = write(fd,  req->htc->pipeline_b,
98 3
                    req->htc->pipeline_e - req->htc->pipeline_b);
99 3
                if (j < 0)
100 9
                        return;
101 3
                req->htc->pipeline_b = NULL;
102 3
                req->htc->pipeline_e = NULL;
103 3
                v1a->in += j;
104
        }
105 9
        memset(fds, 0, sizeof fds);
106 9
        fds[0].fd = fd;
107 9
        fds[0].events = POLLIN | POLLERR;
108 9
        fds[1].fd = req->sp->fd;
109 9
        fds[1].events = POLLIN | POLLERR;
110
111 40
        while (fds[0].fd > -1 || fds[1].fd > -1) {
112 31
                fds[0].revents = 0;
113 31
                fds[1].revents = 0;
114 31
                i = poll(fds, 2,
115 31
                    (int)(cache_param->pipe_timeout * 1e3));
116 31
                if (i < 1)
117 0
                        break;
118 51
                if (fds[0].revents &&
119 20
                    rdf(fd, req->sp->fd, &v1a->out)) {
120 9
                        if (fds[1].fd == -1)
121 0
                                break;
122 9
                        (void)shutdown(fd, SHUT_RD);
123 9
                        (void)shutdown(req->sp->fd, SHUT_WR);
124 9
                        fds[0].events = 0;
125 9
                        fds[0].fd = -1;
126
                }
127 42
                if (fds[1].revents &&
128 11
                    rdf(req->sp->fd, fd, &v1a->in)) {
129 9
                        if (fds[0].fd == -1)
130 9
                                break;
131 0
                        (void)shutdown(req->sp->fd, SHUT_RD);
132 0
                        (void)shutdown(fd, SHUT_WR);
133 0
                        fds[1].events = 0;
134 0
                        fds[1].fd = -1;
135
                }
136
        }
137
}
138
139
/*--------------------------------------------------------------------*/
140
141
void
142 614
V1P_Init(void)
143
{
144
145 614
        Lck_New(&pipestat_mtx, lck_pipestat);
146 614
}