#!/usr/bin/perl -w

require 'sys/syscall.ph';
require 'sys/resource.ph';

my $maxdelay = 10; # a transaction must be emitted within this many seconds

# Array ref of peer records, one hash per server started by start_n():
# port, id, file (captured stdout), pid, and dead (set by kill_test()).
my $peers;
# Server and client program paths, taken from the command line in main().
my $server;
my $client;
# Every transaction message submitted since the last start_n(), in order.
my @msgs;
# Maps a child pid to "server" or "client" so waitx() can name a crash.
my %pidtype;

# Autoflush the selected output handle, so the test labels that main()
# prints without a trailing newline appear immediately.
$| = 1;

# Start $n server processes that all know about each other.
#
# Resets the submitted-message list and the peer table, gives every peer a
# random port and id, then launches each one through forkexec() as
#   $server <id> <port> [127.0.0.1 <other-port>]...
# with its standard output captured in a per-peer file under /tmp.
# Each peer record ends up with the keys port, id, file and pid.
sub start_n {
    my($n) = @_;

    @msgs = ();
    $peers = [ ];

    # Draw ports without repeats.  Two peers handed the same port would
    # presumably be unable to both listen on it, failing the run for a
    # reason that has nothing to do with the code under test.
    my %port_taken;
    for(my $i = 0; $i < $n; $i++){
        my $port;
        do {
            $port = 40000 + int(rand() * 20000);
        } while($port_taken{$port}++);
        my $id = int(rand() * 2000000);
        $peers->[$i] = { port => $port,
                         id => $id };
    }

    for(my $i = 0; $i < $n; $i++){
        # Every other peer is passed as a "127.0.0.1 <port>" argument pair.
        my @args;
        for(my $j = 0; $j < $n; $j++){
            next if $i == $j;
            push(@args, "127.0.0.1", $peers->[$j]{port});
        }

        my $of = "/tmp/tt-$$-$<-$i";
        $peers->[$i]{file} = $of;

        my $pid = forkexec($of, $server, $peers->[$i]{id}, $peers->[$i]{port},
                           @args);
        $peers->[$i]{pid} = $pid;
        $pidtype{$pid} = "server";
    }
}

# Fork a child that runs the command @args with standard input read from
# /dev/null and standard output redirected to the file $of.
#
# Returns the child's pid in the parent.  Dies in the parent if fork
# fails; the child dies if it cannot redirect its handles or exec.
sub forkexec {
    my($of, @args) = @_;

    # Lexical pid: the result must not leak into a package global.
    my $pid = fork;
    die "test-ticker: fork failed: $!\n" if !defined($pid);
    return $pid if $pid;

    # Child from here on.
    # Request a zero core-file limit (four zero ints packed as the rlimit
    # buffer).  The result of the syscall is not checked.
    my $rl = pack("IIII", 0, 0, 0, 0);
    syscall(&SYS_setrlimit, &RLIMIT_CORE, $rl);

    close(STDIN);
    open(STDIN, '<', "/dev/null")
        or die "test-ticker: cannot open /dev/null: $!\n";
    close(STDOUT);
    open(STDOUT, '>', $of)
        or die "test-ticker: cannot open $of: $!\n";

    # exec only returns if it failed.
    exec @args;
    die "test-ticker: cannot exec $args[0]\n";
}

# Reap every child process that has already exited, without blocking.
#
# For each reaped child whose wait status has the core-dump bit (0200)
# set, a "crashed" line naming its type from %pidtype goes to STDERR.
sub waitx {
    for(;;){
        my $reaped = waitpid(-1, 1); # flag 1: do not block
        last unless defined($reaped) && $reaped > 0;
        next unless $? & 0200;
        printf(STDERR "test-ticker: %s crashed\n",
               $pidtype{$reaped});
    }
}

# Submit one new random transaction through a randomly chosen live server.
#
# The message ("t<number>x") is appended to @msgs and handed to a freshly
# forked client process as: $client 127.0.0.1 <port> <message>.
# Dies if every server has been marked dead.
sub submit_one {
    # Choose among live peers only.  Picking any peer at random and
    # retrying on a dead one would spin forever once none is left alive.
    my @live = grep { !defined($_->{dead}) } @{$peers};
    die "test-ticker: no live server to submit to\n" if !@live;

    my $port = $live[int(rand(scalar(@live)))]{port};
    my $msg = "t" . int(rand() * 2000000) . "x";
    push(@msgs, $msg);
    my $pid = forkexec("/dev/null", $client, "127.0.0.1", $port, $msg);
    $pidtype{$pid} = "client";
}

# Submit $n transactions back to back, then reap any children that have
# already finished.
sub submit_n {
    my($n) = @_;

    my $sent = 0;
    until($sent >= $n){
        submit_one();
        $sent++;
    }
    waitx();
}

# Check the output of every live server against the submitted messages.
#
# Just looks at the first $nm messages of @msgs; output lines naming any
# other message are ignored.  Each peer's output file is scanned for a
# "t<digits>x" token (the first one on each line); lines without one are
# reported on STDERR.
#
# Returns "" if ok, otherwise an error message describing the first
# problem found: a wrong number of transactions, servers disagreeing on
# the order, or output that is not exactly the submitted set.
# Dies if an output file cannot be opened.
sub analyze1 {
    my($nm) = @_;
    my @ml = @msgs[0..($nm-1)];

    # Hash of the expected messages, for constant-time membership tests.
    my %expected = map { ($_ => 1) } @ml;

    # Collect, per peer, the expected messages in the order they appear.
    my @seen;
    for my $peer (@{$peers}){
        my $f = $peer->{file};
        open(my $fh, '<', $f) or die "cannot open $f: $!";
        my @found;
        while(my $line = <$fh>){
            if($line =~ /(t[0-9]+x)/){
                push(@found, $1) if $expected{$1};
            } else {
                print STDERR "test-ticker.pl: unexpected line $line\n";
            }
        }
        close($fh);
        push(@seen, \@found);
    }

    # Only peers not marked dead are held to the checks below.
    my @live = grep { !defined($peers->[$_]{dead}) } 0 .. $#{$peers};

    # Did the servers produce the right number of output transactions?
    my @reference;
    for my $i (@live){
        my @out = @{$seen[$i]};
        return "too few transactions in output" if @out < @ml;
        return "too many transactions in output" if @out > @ml;
        @reference = @out;
    }

    # Did the servers produce the same orders?
    for my $i (@live){
        my @out = @{$seen[$i]};
        for my $j (0 .. $#reference){
            if($out[$j] ne $reference[$j]){
                return "servers produced different orders";
            }
        }
    }

    # Did the servers produce each submitted transaction exactly once?
    # (@reference is empty when no peer is live, hence the defined test.)
    @reference = sort @reference;
    @ml = sort @ml;
    for my $j (0 .. $#ml){
        if(!defined($reference[$j]) || $reference[$j] ne $ml[$j]){
            return "submitted and output transactions differ";
        }
    }

    return "";
}

# Run analyze1() over every message submitted so far and return its result.
sub analyze {
    return analyze1(scalar(@msgs));
}

# Kill (signal 9) every peer that has a recorded pid and delete its output
# file, then pause for a second and reap the exited children.
sub killall {
    foreach my $peer (@{$peers}){
        next if !defined($peer->{pid});
        kill(9, $peer->{pid});
        unlink($peer->{file});
    }
    sleep(1);
    waitx();
}

# Make sure no dead server has produced more than $n lines of output.
#
# For every peer marked dead, counts the lines of its output file whose
# first "t<digits>x" token is one of the submitted messages in @msgs.
# Returns "" if every count is at most $n, otherwise an error message.
# Dies if an output file cannot be opened.
sub check_dead {
    my($n) = @_;

    # Hash of submitted messages, for constant-time membership tests.
    my %submitted = map { ($_ => 1) } @msgs;

    foreach my $p (@{$peers}){
        next if !defined($p->{dead});

        my $f = $p->{file};
        open(my $fh, '<', $f) or die "cannot open $f: $!";
        my $count = 0;
        while(my $line = <$fh>){
            $count++ if $line =~ /(t[0-9]+x)/ && $submitted{$1};
        }
        close($fh);

        return "dead server produced output" if $count > $n;
    }
    return "";
}

# Failure scenario: kill the first half of the servers while transactions
# keep arriving at $persec per second.
#
# Phases: 3 rounds with everyone alive; mark the victims dead so nothing
# new is submitted through them and run $maxdelay more rounds; kill the
# victims; run 2 * $maxdelay further rounds.
# Returns "" if ok, otherwise the error from check_dead() or analyze1().
sub kill_test {
    my($persec) = @_;
    my $np = $#{$peers} + 1;

    # One round = submit $persec transactions, then wait a second.
    my $run_rounds = sub {
        my($rounds) = @_;
        for(my $r = 0; $r < $rounds; $r++){
            submit_n($persec);
            sleep(1);
        }
    };

    # Victims are the peers whose index is below $np/2.
    my @victims = grep { $_ < $np/2 } 0 .. $np - 1;

    $run_rounds->(3);

    # Decide which to kill, stop submitting through them.
    $peers->[$_]{dead} = 1 for @victims;

    # Give the dead ones a chance to drain submissions in progress.
    $run_rounds->($maxdelay);

    # Now actually kill them.
    kill(9, $peers->[$_]{pid}) for @victims;

    # Give everybody time to detect the dead ones,
    # and an extra round.
    $run_rounds->(2 * $maxdelay);

    my $res = check_dead($persec * (3 + $maxdelay));
    return $res if $res ne "";

    return analyze1($persec * (3 + 2 * $maxdelay));
}

# Submit $persec transactions per second for $sec seconds, checking along
# the way that older transactions have been emitted.
#
# In every round $i after round $maxdelay, the first
# $persec * ($i - $maxdelay) messages are analyzed.
# Returns "" if every check passed, otherwise the first error message.
sub running_test {
    my($sec, $persec) = @_;

    for(my $i = 0; $i < $sec; $i++){
        submit_n($persec);
        if($i > $maxdelay){
            my $res = analyze1($persec * ($i - $maxdelay));
            return $res if $res ne "";
        }
        sleep(1);
    }

    # Explicit success value: the value of a sub that falls off the end
    # of a loop is unspecified, and the caller compares it with "".
    return "";
}

# Submit $sec * $persec transactions, then give the servers
# plenty of time to finish them all.
#
# After submitting, re-checks once a second for up to $maxdelay seconds.
# Returns "" as soon as a check passes, otherwise the last error message.
#
# The check covers every message in @msgs, not just $sec * $persec of
# them: main() runs several all_test() rounds against the same servers,
# so @msgs may already hold messages from an earlier round, and looking
# only at a prefix would leave the newest submissions unverified.
sub all_test {
    my($sec, $persec) = @_;

    for(my $i = 0; $i < $sec; $i++){
        submit_n($persec);
        sleep(1);
    }

    my $res;
    for(my $i = 0; $i < $maxdelay; $i++){
        sleep(1);
        $res = analyze1(scalar(@msgs));
        return $res if $res eq "";
    }

    # With a zero $maxdelay the loop never ran; still check once so the
    # caller always gets a defined verdict.
    $res = analyze1(scalar(@msgs)) if !defined($res);
    return $res;
}

# Report the outcome of one test on standard output.
#
# An empty $res means success and prints "passed".  Anything else is
# printed as the failure reason, after which all servers are killed and
# the script exits with status 1.
sub check_res {
    my($res) = @_;

    if($res ne ""){
        print "failed ($res)\n";
        killall();
        exit(1);
    }
    print "passed\n";
}

# Entry point: run the whole scripted sequence of ticker tests.
#
# Expects exactly two command-line arguments, the server and the client
# programs; otherwise prints a usage message and exits with status 1.
# Each scenario prints its label, optionally starts a fresh set of servers
# and lets them settle, runs one test, reports through check_res() (which
# exits on failure) and optionally kills the servers afterwards.
sub main {
    if(scalar(@ARGV) != 2){
        print STDERR "Usage: test-ticker.pl server client\n";
        exit(1);
    }
    ($server, $client) = @ARGV;

    # servers: how many to start (absent = keep using the running ones)
    # settle:  seconds to sleep after starting them
    # test:    the check to run; returns "" on success
    # stop:    whether to kill the servers once the check has passed
    my @scenarios = (
        { label => "One server, one transaction: ",
          servers => 1, settle => 1,
          test => sub { all_test(1, 1) }, stop => 1 },
        { label => "Two servers, one transaction: ",
          servers => 2, settle => 5,
          test => sub { all_test(1, 1) }, stop => 0 },
        { label => "Two servers, two transactions: ",
          test => sub { all_test(2, 1) }, stop => 0 },
        { label => "Two servers, ten concurrent transactions: ",
          test => sub { all_test(1, 10) }, stop => 1 },
        { label => "Five servers, continuous transactions: ",
          servers => 5, settle => 5,
          test => sub { running_test($maxdelay + 10, 10) }, stop => 1 },
        { label => "One of two servers fail: ",
          servers => 2, settle => 5,
          test => sub { kill_test(10) }, stop => 1 },
        { label => "Three of six servers fail: ",
          servers => 6, settle => 5,
          test => sub { kill_test(10) }, stop => 1 },
    );

    foreach my $sc (@scenarios){
        print $sc->{label};
        if(exists($sc->{servers})){
            start_n($sc->{servers});
            sleep($sc->{settle});
        }
        # Scalar assignment keeps the test call in scalar context.
        my $res = $sc->{test}->();
        check_res($res);
        killall() if $sc->{stop};
    }
}

# Run the test sequence when the script is executed.
main();
