| #!/usr/bin/perl -w |
| use strict; |
| use Getopt::Long qw(GetOptions :config no_ignore_case); |
| use Time::HiRes qw(gettimeofday tv_interval); |
| |
# Command-line defaults; each can be overridden by the matching option.
my %opt = (
    host => 'localhost',    # spamd host                 (--host, -h)
    port => 30783,          # spamd port                 (--port, -p)
    conc => 2,              # connections open at once   (--conc, --concurrency, -c)
    max  => 0,              # message limit, 0 = no limit (--max, -m)
);

my $usage = "usage:\n\t$0 list of mboxes\n";

# GetOptions warns and returns false on an unknown or malformed option;
# stop there instead of running with a half-parsed command line.
GetOptions(\%opt, qw(host|h=s port|p=i conc|concurrency|c=i max|m=i))
    or die $usage;
die $usage unless @ARGV;

# With fewer than one connection nothing would ever be sent.
die "concurrency must be at least 1\n" if $opt{conc} < 1;

# Everything left in @ARGV is an mbox path. $curr_mbox and $mbox_fh stay
# undefined until next_message() opens the first mbox.
my @mboxes = @ARGV;
my ($curr_mbox, $mbox_fh);
| |
| #my $all_all = 0; |
| #for my $f (@ARGV) { |
| # my $mbox = Mail::MboxParser->new($f) or die; |
| # $mbox->make_index; |
| # push @mboxes, $mbox; |
| # print 'mbox ' . $mbox->nmsgs() . "\t$f\n"; |
| # $all_all += $mbox->nmsgs; |
| #} |
| |
| use IO::Socket::INET6; |
| use IO::Multiplex; |
| |
# Not referenced anywhere in the visible code; kept as declared.
my @sockets;

# Constructor arguments shared by every connection to spamd.
my %conn = (PeerAddr => $opt{host}, PeerPort => $opt{port});

# The main package is registered as the default callback object; new_conn()
# installs a dedicated Spamc object for each socket it adds.
my $mux = IO::Multiplex->new;
$mux->set_callback_object(__PACKAGE__);

my $msgs = 0;                  # number of requests started so far
my $tempfoo;                   # only used by commented-out debugging code
my $start = [gettimeofday];    # beginning of the timed run

# Keep opening connections until $opt{conc} handles are registered or
# new_conn() reports that there is nothing left to send, then hand control
# to the multiplexer.
until ($mux->handles >= $opt{conc}) {
    last unless new_conn();
    die if $mux->handles > $opt{conc};
}
$mux->loop;
| |
# Final report: number of requests started, elapsed wall-clock time as
# hh:mm:ss and as raw seconds, and throughput per second, minute and hour.
my $howlong = tv_interval($start);
my $hour    = int($howlong / 3600);
my $min     = int(($howlong % 3600) / 60);
my $sec     = $howlong % 60;

# A run that had nothing to send can finish within the clock's resolution;
# dividing by a zero interval would kill the script before it reports.
my @rates =
    $howlong > 0
    ? ($msgs / $howlong, $msgs * 60 / $howlong, $msgs * 60 * 60 / $howlong)
    : (0, 0, 0);

printf
    "parsed %d messages in %02d:%02d:%02d (%s s), %.4f msgs/s (%.0f msgs/min, %.0f msgs/h)\n",
    $msgs, $hour, $min, $sec, $howlong, @rates;
| |
| #sleep 1; |
| |
# Start one more spamd request if there is work left.
#
# Takes the next message from the mbox queue, opens a socket using %conn,
# registers it with the multiplexer under a fresh Spamc callback object with
# a 20 second timeout, and queues a "SYMBOLS" request carrying the message.
#
# Returns 1 when a request was started. Returns nothing when the --max limit
# has been reached or no messages are left. Dies when more than $opt{conc}
# handles are already registered or when the socket cannot be set up.
sub new_conn {
    # Check the limit first so that no message is read from the mbox only
    # to be thrown away.
    return if $opt{max} && $msgs >= $opt{max};

    my $message = next_message() or return;
    die 'handles: ' . $mux->handles if $mux->handles > $opt{conc};

    ++$msgs;

    # IO::Socket constructors leave the failure reason in $@.
    my $s = IO::Socket::INET6->new(%conn)
        or die "connect to $conn{PeerAddr} port $conn{PeerPort}: $@\n";
    $mux->add($s) or die "cannot add socket for message $msgs to multiplexer\n";

    # Responses for this socket are delivered to its own Spamc object.
    my $spamc = Spamc->new(id => $msgs, s => $s, start => [gettimeofday]);
    $mux->set_callback_object($spamc, $s);
    $mux->set_timeout($s, 20);

    my $request =
          "SYMBOLS SPAMC/1.9\r\n"
        . 'Content-length: '
        . length($$message)
        . "\r\n\r\n"
        . $$message;
    $mux->write($s, $request)
        or die "cannot queue request for message $msgs\n";

    return 1;
}
| |
# Return a reference to the next message from the queued mbox files, or
# nothing once every mbox has been read to the end.
#
# Messages are read as records terminated by "\nFrom ", so the separator
# that introduces message N+1 is consumed as the tail of message N. The
# returned text has that tail (or the final line ending) removed and always
# starts with its own "From " line. Dies if an mbox cannot be opened.
sub next_message {
    local $/ = "\nFrom ";

    # Advance to the next mbox that still has data; empty files are skipped.
    until (defined $curr_mbox && !eof($mbox_fh)) {
        return unless @mboxes;
        $curr_mbox = shift @mboxes;
        close $mbox_fh if $mbox_fh;
        open $mbox_fh, '<', $curr_mbox or die "open $curr_mbox: $!";
    }

    # Only the record at offset 0 still carries its leading "From "; for
    # every later record it was swallowed by the previous read and has to be
    # put back.
    my $at_start = tell($mbox_fh) == 0;
    my $msg      = <$mbox_fh>;
    $msg = 'From ' . $msg unless $at_start;

    $msg =~ s/\r?\n(?:From )?$//;    # (?:...) is for last message
    return \$msg;
}
| |
package Spamc;

# One Spamc object follows a single request/response exchange with spamd.
# It is installed as the IO::Multiplex callback object for its socket and
# parses the response incrementally: status line ("banner"), headers, body.
#
# Fields set by the caller of new(): id, s (the socket), start (a
# [gettimeofday] pair). Fields filled in while parsing: sver, rcode, rmsg,
# banner, headers (hash ref, keys lower-cased with "-" turned into "_"),
# body.

# Multiplexer callback for incoming data. $in is whatever the multiplexer
# passes as input (a scalar or a reference to one); parsed text is removed
# from it. Once the response is complete or has failed, a replacement
# request is started, the result is reported and the socket is closed.
# Dies if the Spam header of a complete response has an unexpected format.
sub mux_input {
    my ($self, $mux, $fh, $in) = @_;

    my $ret = $self->parse($in);
    return unless defined $ret;    # response not complete yet

    main::new_conn();
    if ($ret) {                    # ok
        (my $body = $self->{body}) =~ y/\r\n/ /s;
        $self->{headers}->{spam} =~
            /^([TF])\S+\s*;\s*(-?[\d.]+)\s*\/\s*([\d.]+)\b/
            or die "bad Spam header: '$self->{headers}->{spam}'";
        printf "%-8s %5s %1s %4s/%3s %s\n",
            Time::HiRes::tv_interval($self->{start}),
            ($self->{id} ? $self->{id} : '(wtf)'), $1, $2, $3, $body;
    }
    else {
        # rcode/rmsg are unset when the status line itself was unparseable;
        # print them as empty rather than tripping "uninitialized" warnings.
        my $rcode = defined $self->{rcode} ? $self->{rcode} : '';
        my $rmsg  = defined $self->{rmsg}  ? $self->{rmsg}  : '';
        warn 'fail for ', ($self->{id} ? $self->{id} : '(wtf)'),
            ": $rcode $rmsg\n";
        $mux->kill_output($fh);
    }
    $fh->close;    # are both needed?
    $mux->close($fh);
}

# Multiplexer callback for a socket that hit its timeout. Closes the socket
# and, like mux_input, starts a replacement request so that a timeout does
# not permanently reduce the number of connections in flight.
sub mux_timeout {
    my ($self, $mux) = @_;
    warn "timeout for $self->{id}\n";
    $mux->close($self->{s});
    main::new_conn();
}

# Constructor: blesses the given key/value pairs as the object's fields.
sub new {
    my $class = shift;
    return bless {@_}, $class;
}

# Feed input to the parser. Accepts a string or a reference to one and
# removes what it consumes from it.
# Returns 1 when the response is complete, 0 when it is invalid or spamd
# reported an error, and undef while more input is needed.
sub parse {
    my $self = shift;
    my $in = ref $_[0] ? $_[0] : \$_[0];

    # Keep going while there is a complete line to look at or, once the
    # headers are done, while there is body data (or an empty body is
    # expected).
    while (
        $$in =~ /\n/
        or (defined($self->{body})
            && (length($$in) || $self->{headers}->{content_length} == 0))
      )
    {
        my $ret =
              !defined $self->{banner} ? $self->banner($in)
            : !defined $self->{body}   ? $self->headers($in)
            :                            $self->body($in);
        return $ret if defined $ret;
    }
    return;
}

# Parse the status line "SPAMD/<version> <code> <message>" from the front of
# $$in, storing sver, rcode and rmsg.
# Returns 0 if the line is unparseable or the code is non-zero, undef to
# continue with the headers.
sub banner {
    my ($self, $in) = @_;

    unless ($$in =~ s/^SPAMD\/(\d\.\d)\s+(\d+)\s+([^\r\n]+)\r?\n//) {
        warn "unparseable input from spamd: '$$in'";
        return 0;
    }
    @{$self}{qw(sver rcode rmsg)} = ($1, $2, $3);
    $self->{banner}++;

    return 0 if $self->{rcode} != 0;
    return;
}

# Parse as many complete "Name: value" header lines as are available. On
# the blank line that ends the headers the body is initialised to ''.
# Returns 0 on a malformed header line or when the Spam or Content-length
# header is missing at the end of the headers, undef otherwise.
# Dies if called before the banner was parsed or if Content-length is not
# numeric.
sub headers {
    my ($self, $in) = @_;
    die "blah" unless $self->{banner};

    while ($$in =~ s/^([a-z\d_-]+):\s+([^\r\n]+)\r?\n//i) {
        my ($name, $value) = ($1, $2);
        $name =~ y/A-Z-/a-z_/;
        $self->{headers}->{$name} = $value;
    }
    die "content-length not numeric"
        if defined $self->{headers}->{content_length}
        && $self->{headers}->{content_length} !~ /^\d+$/;

    if ($$in =~ s/^\r?\n//) {
        $self->{body} = '';
        unless ($self->{headers}->{spam}) {
            warn "no Spam header", keys %{ $self->{headers} };
            return 0;
        }
        unless (defined $self->{headers}->{content_length}) {
            warn "Content-length is required";
            return 0;
        }
    }
    elsif ($$in =~ /\n/) {
        # A complete line is waiting but it is not a header.
        warn "bad header '$$in'";
        return 0;
    }
    return;
}

# Append all pending input to the body.
# Returns 1 when the body has exactly Content-length characters, 0 when it
# is longer, undef while it is still short. Without a Content-length the
# body counts as complete as soon as it contains a newline.
# Dies if called before banner and headers were parsed.
sub body {
    my ($self, $in) = @_;
    die "fubar"
        unless $self->{banner} && $self->{headers} && defined $self->{body};

    $self->{body} .= $$in;
    $$in = '';

    my $expected = $self->{headers}->{content_length};
    unless (defined $expected) {
        return 1 if $self->{body} =~ /\n/;    # only good for one line output
        return;
    }

    my $got = length $self->{body};
    return 1 if $got == $expected;
    if ($got > $expected) {
        warn "body too long";
        return 0;
    }
    return;
}

#sub DESTROY { my $self = shift; warn "DESTROY $self->{id}"; }
1;