r1641 - trunk/varnish-tools/fetcher

des at projects.linpro.no des at projects.linpro.no
Wed Jul 4 15:15:32 CEST 2007


Author: des
Date: 2007-07-04 15:15:32 +0200 (Wed, 04 Jul 2007)
New Revision: 1641

Added:
   trunk/varnish-tools/fetcher/fetcher.pl
Removed:
   trunk/varnish-tools/fetcher/fetcher.c
Modified:
   trunk/varnish-tools/fetcher/Makefile
Log:
Replace the old C fetcher with a Perl script which recursively retrieves the
specified URLs using multiple parallel clients.


Modified: trunk/varnish-tools/fetcher/Makefile
===================================================================
--- trunk/varnish-tools/fetcher/Makefile	2007-07-03 22:24:09 UTC (rev 1640)
+++ trunk/varnish-tools/fetcher/Makefile	2007-07-04 13:15:32 UTC (rev 1641)
@@ -2,8 +2,10 @@
 # $Id$
 #
 
-PROG	 = fetcher
-WARNS	?= 6
-MAN	 =
+PREFIX?=	${HOME}
+BINDIR?=	${DESTDIR}${PREFIX}/bin
 
-.include <bsd.prog.mk>
+all:
+
+install:
+	install -m 0755 fetcher.pl ${BINDIR}/fetcher

Deleted: trunk/varnish-tools/fetcher/fetcher.c
===================================================================
--- trunk/varnish-tools/fetcher/fetcher.c	2007-07-03 22:24:09 UTC (rev 1640)
+++ trunk/varnish-tools/fetcher/fetcher.c	2007-07-04 13:15:32 UTC (rev 1641)
@@ -1,220 +0,0 @@
-/*
- * $Id$
- */
-
-#include <sys/param.h>
-#include <sys/socket.h>
-#include <sys/time.h>
-
-#include <err.h>
-#include <netdb.h>
-#include <stdio.h>
-#include <fetch.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-
-static int random_order;
-static int verbose;
-
-static char data[8192];
-
-static const char *
-read_line(FILE *f)
-{
-	static char *buf;
-	static size_t bufsz;
-	const char *line;
-	size_t len;
-
-	if ((line = fgetln(f, &len)) == NULL)
-		return (NULL);
-	while (len && (line[len - 1] == '\r' || line[len - 1] == '\n'))
-		--len;
-	if (bufsz < len + 1) {
-		bufsz = len * 2;
-		if ((buf = realloc(buf, bufsz)) == NULL)
-			err(1, "realloc()");
-	}
-	memcpy(buf, line, len);
-	buf[len] = '\0';
-	if (verbose)
-		fprintf(stderr, "<<< [%s]\n", buf);
-	return (buf);
-}
-
-static int
-open_socket(const char *host, const char *port)
-{
-	struct addrinfo hints, *res;
-	int error, sd;
-
-	/* connect to accelerator */
-	memset(&hints, 0, sizeof(hints));
-	hints.ai_family = AF_INET;
-	hints.ai_socktype = SOCK_STREAM;
-	if ((error = getaddrinfo(host, port, &hints, &res)) != 0)
-		errx(1, "%s", gai_strerror(error));
-	if ((sd = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) < 0)
-		err(1, "socket()");
-	if (connect(sd, res->ai_addr, res->ai_addrlen) < 0)
-		err(1, "connect()");
-	return (sd);
-}
-
-static const char HEAD[] = "HEAD";
-static const char GET[] = "GET";
-
-static int reqcount;
-
-static void
-send_request(FILE *f, const char *method, const char *host, const char *url)
-{
-	static const char *req_pattern =
-	    "%s %s HTTP/1.1\r\n"
-	    "Host: %s\r\n"
-	    "Connection: Keep-Alive\r\n"
-	    "\r\n";
-
-	reqcount++;
-
-	/* send request */
-	if (fprintf(f, req_pattern, method, url, host) < 0)
-		errx(1, "fprintf()");
-	if (verbose)
-		fprintf(stderr, req_pattern, method, url, host);
-}
-
-static int respcount;
-
-static void
-receive_response(FILE *f, const char *method)
-{
-	const char *line;
-	size_t clen, rlen;
-	int code;
-
-	respcount++;
-
-	/* get response header */
-	if ((line = read_line(f)) == NULL)
-		errx(1, "protocol error");
-	if (sscanf(line, "HTTP/%*d.%*d %d %*s", &code) != 1)
-		errx(1, "protocol error");
-	if (code != 200)
-		errx(1, "code %d", code);
-
-	/* get content-length */
-	clen = 0;
-	for (;;) {
-		if ((line = read_line(f)) == NULL)
-			errx(1, "protocol error");
-		if (line[0] == '\0')
-			break;
-		sscanf(line, "Content-Length: %zu\n", &clen);
-	}
-
-	/* eat contents */
-	if (method == HEAD)
-		return;
-	while (clen > 0) {
-		rlen = clen > sizeof(data) ? sizeof(data) : clen;
-		rlen = fread(data, 1, rlen, f);
-		if (rlen == 0)
-			err(1, "fread()");
-		if (verbose)
-			fprintf(stderr, "read %zu bytes\n", rlen);
-		clen -= rlen;
-	}
-}
-
-static volatile sig_atomic_t got_sig;
-
-static void
-handler(int sig)
-{
-	got_sig = sig;
-}
-
-static void
-usage(void)
-{
-	fprintf(stderr, "usage: fetcher [-h]\n");
-	exit(1);
-}
-
-#define MAX_CTR 100000
-
-int
-main(int argc, char *argv[])
-{
-	struct timeval start, stop;
-	double elapsed;
-	char url[PATH_MAX];
-	int opt, sd;
-	FILE *f;
-
-	const char *method = GET;
-	const char *host = "varnish-test-1.linpro.no";
-	const char *url_pattern = "/cgi-bin/recursor.pl?foo=%d";
-	int ctr = MAX_CTR * 10;
-	int depth = 1;
-
-	while ((opt = getopt(argc, argv, "c:d:hrv")) != -1)
-		switch (opt) {
-		case 'c':
-			ctr = atoi(optarg);
-			break;
-		case 'd':
-			depth = atoi(optarg);
-			break;
-		case 'h':
-			method = HEAD;
-			break;
-		case 'r':
-			random_order++;
-			break;
-		case 'v':
-			verbose++;
-			break;
-		default:
-			usage();
-		}
-
-	argc -= optind;
-	argv += optind;
-
-	if (argc != 0)
-		usage();
-
-	if (random_order)
-		srandomdev();
-
-	sd = open_socket("varnish-test-2.linpro.no", "8080");
-	if ((f = fdopen(sd, "w+")) == NULL)
-		err(1, "fdopen()");
-
-	got_sig = 0;
-	signal(SIGINT, handler);
-	signal(SIGTERM, handler);
-	gettimeofday(&start, NULL);
-	while (respcount < ctr && !got_sig) {
-		while (reqcount < ctr && reqcount - respcount < depth && !got_sig) {
-			int serial = (random_order ? random() : reqcount) % MAX_CTR;
-			if (!verbose && (random_order || (reqcount % 29) == 0))
-				fprintf(stderr, "\r%d ", serial);
-			snprintf(url, sizeof url, url_pattern, serial);
-			send_request(f, method, host, url);
-		}
-		receive_response(f, method);
-	}
-	gettimeofday(&stop, NULL);
-	fclose(f);
-
-	elapsed = (stop.tv_sec * 1000000.0 + stop.tv_usec) -
-	    (start.tv_sec * 1000000.0 + start.tv_usec);
-	fprintf(stderr, "%d requests in %.3f seconds (%d rps)\n",
-	    reqcount, elapsed / 1000000, (int)(reqcount / (elapsed / 1000000)));
-
-	exit(got_sig);
-}

Added: trunk/varnish-tools/fetcher/fetcher.pl
===================================================================
--- trunk/varnish-tools/fetcher/fetcher.pl	                        (rev 0)
+++ trunk/varnish-tools/fetcher/fetcher.pl	2007-07-04 13:15:32 UTC (rev 1641)
@@ -0,0 +1,199 @@
+#!/usr/bin/perl -w
+#-
+# Copyright (c) 2007 Linpro AS
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+#    notice, this list of conditions and the following disclaimer
+#    in this position and unchanged.
+# 2. Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
+#
+# $Id$
+#
+
+package Varnish::Fetcher;
+
+use strict;
+use Getopt::Long;
+use IO::Handle;
+use IO::Multiplex;
+use LWP::UserAgent;
+use Socket;
+use URI;
+
+our %TODO;
+our %DONE;
+our %CHILD;
+our $BUSY;
+
+sub new($$) {
+    my ($this, $mux, $fh) = @_;
+    my $class = ref($this) || $this;
+    # Per-child state: the multiplexer, our end of the socket, and the URL in flight.
+    bless {
+	'mux' => $mux,
+	'fh' => $fh,
+	'url' => undef,
+    }, $class;
+}
+
+sub run($$) {
+    my ($self, $s) = @_;
+
+    my $ua = new LWP::UserAgent();
+    for (;;) {
+	$0 = "[fetcher] idle";
+	my $url = <$s>;
+	exit(0)
+	    unless defined($url);
+	chomp($url);
+	die "no more work\n"
+	    if $url eq "done";
+	$0 = "[fetcher] requesting $url";
+	print STDERR "Retrieving $url\n";
+	my $resp = $ua->get($url);
+	$0 = "[fetcher] checking $url";
+	if ($resp->header('Content-Type') =~ m/^text\//) {
+	    my %urls = map { $_ => 1 }
+	    ($resp->content =~ m/\b(?:href|src)=[\'\"](.+?)[\'\"]/g);
+	    foreach (keys(%urls)) {
+		$s->write("add $_\n");
+	    }
+	}
+	$0 = "[fetcher] ready";
+	$s->write("ready\n");
+    }
+}
+
+sub send_url($) {
+    my ($child) = @_;
+
+    die "child busy\n"
+	if $child->{'url'};
+    return undef
+	unless (keys(%TODO));
+    my $url = (keys(%TODO))[0];
+    delete $TODO{$url};
+    $DONE{$url} = 1;
+    $child->{'url'} = $url;
+    $child->{'mux'}->write($child->{'fh'}, "$url\n");
+    ++$BUSY;
+}
+
+sub get_url($$) {
+    my ($child, $url) = @_;
+    # Queue $url (a link reported by $child) unless it is off-site or already known.
+    die "child not busy\n" unless $child->{'url'};
+    # Resolve against the page being fetched; use the argument, not the caller's $1.
+    my $uri = URI->new_abs($url, $child->{'url'});
+    $url = $uri->canonical;
+    if ($uri->scheme() ne 'http' ||
+	$uri->host_port() ne URI->new($child->{'url'})->host_port()) {
+	print STDERR "Rejected $url\n";
+	return;
+    }
+    return if $TODO{$url} || $DONE{$url};
+    $TODO{$url} = 1;
+}
+
+sub mux_input($$$$) {
+    my ($child, $mux, $fh, $input) = @_;
+
+    die "unknown child\n"
+	unless $child;
+
+    while ($$input =~ s/^(.*?)\n//) {
+	my $line = $1;
+	if ($line eq "ready") {
+	    $$child{'url'} = '';
+	    --$BUSY;
+	    $mux->endloop();
+	} elsif ($line =~ m/^add (.*?)$/) {
+	    get_url($child, $1);
+	} else {
+	    die "can't grok [$line]\n";
+	}
+    }
+}
+
+sub fetcher($@) {
+    my ($n, @urls) = @_;
+
+    my $mux = new IO::Multiplex;
+
+    # prepare work queue
+    foreach my $url (@urls) {
+	$TODO{URI->new($url)->canonical} = 1;
+    }
+
+    # start children
+    $BUSY = 0;
+    for (my $i = 0; $i < $n; ++$i) {
+	my ($s1, $s2);
+	socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, PF_UNSPEC);
+	$s1->autoflush(1);
+	$s2->autoflush(1);
+	my $child = __PACKAGE__->new($mux, $s1);
+	my $pid = fork();
+	last unless defined($pid);
+	if ($pid == 0) {
+	    close($s1);
+	    $child->run($s2);
+	    die "not reachable";
+	} else {
+	    close($s2);
+	    $CHILD{$i} = $child;
+	    $mux->add($s1);
+	    $mux->set_callback_object($child, $s1);
+	}
+    }
+
+    # main loop
+    for (;;) {
+	foreach my $child (values(%CHILD)) {
+	    $child->send_url()
+		unless $child->{'url'};
+	}
+	last unless $BUSY;
+	$mux->loop();
+    }
+
+    # done
+    foreach my $child (values(%CHILD)) {
+	$mux->close($$child{'fh'});
+    }
+}
+
+sub usage() {
+
+    print STDERR "usage: $0 [-j n] URL ...\n";
+    exit(1);
+}
+
+MAIN:{
+    my $jobs = 1;
+    GetOptions("j|jobs=i" => \$jobs)
+	or usage();
+    $jobs > 0
+	or usage();
+    @ARGV
+	or usage();
+    fetcher($jobs, @ARGV);
+}


Property changes on: trunk/varnish-tools/fetcher/fetcher.pl
___________________________________________________________________
Name: svn:executable
   + *
Name: svn:keywords
   + Id




More information about the varnish-commit mailing list