#!/usr/bin/perl -w
use strict;
use MIME::Base64;

#
# <@LICENSE>
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to you under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at:
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# </@LICENSE>

sub aidbg;

sub usage {
  my $status = shift;

  my $out = $status ? \*STDERR : \*STDOUT;
  print $out <<EOF;
usage: mass-check [options] target ...

  -c=file       set configuration/rules directory
  -p=dir        set user-prefs directory
  -f=file       read list of targets from <file>
  -j=jobs       specify the number of processes to run simultaneously
  --net         turn on network checks
  --mid         report Message-ID from each message
  --debug[=LIST] report debugging information (default is all facilities, LIST
                is a comma-separated list of facilities)
  --rewrite=OUT save rewritten message to OUT (default is /tmp/out)
  --rules=RE    Only test rules matching the given regexp RE
  --restart=N   restart all of the children after processing N messages
  --deencap=RE  Extract SpamAssassin-encapsulated spam mails only if they
                were encapsulated by servers matching the regexp RE
                (default = extract all SpamAssassin-encapsulated mails)
  --lint        check rules for syntax before running
  --cf='config line'  Additional line of configuration
  --pre='config line' Additional line of ".pre" (prepended to configuration)
  --run_post_scan='command'  Run the named command after the 'scan' stage,
                before starting the 'run' stage

  verbosity options
  --progress    show progress updates during check
  --noisy       show noisier progress updates during check
  --showdots    print a dot for each scanned message

  client/server mode options
  --server=host:port
                use server mode, running on the given hostname and port
  --client=host:port
  		use client mode, connecting to the given hostname and port
  --cs_conn_retries=N
		only used in client mode. set the number of times to retry
		the initial connection to the server, while waiting 60
		seconds between connection attempts, default is 60 retries
  --cs_max=N
  		at most, only ever request (client)/give out (server) a
		maximum of N messages (defaults to 1000)
  --cs_timeout=N
  		in client mode, try to connect to the server every N seconds
		defaults to 120
		in server mode, timeout messages after N seconds
		defaults to 300
  --cs_paths_only
		only used in client mode.  when making requests of the
		server, only ask for paths to the messages and not the
		messages themselves.  useful when the client and server
		have the same paths to the corpus data.
  --cs_ssl      use SSL to encrypt on-the-wire client-server traffic
                (requires IO::Socket::SSL, see
                http://wiki.apache.org/spamassassin/SslMassCheck for more)
  --cs_verbose  Log network bandwidth utilization figures and other statistics
  --cs_schedule_cache
                Distribute messages so that they are checked on clients who
                have the messages in their local message cache, implies
                --cs_cache; if --cs_schedule_cache is not enabled, but
                --cs_cache is, clients running in --cs_paths_only mode will
                opportunistically use the messages from their local cache
  --cs_cache	in client mode, enable the local message cache
                in server mode, allows the clients to use cached messages
                and/or add to their local caches
  --cs_cachedir=dir
                write cache info for --cs_cache in this directory tree
  --cs_max_tries=N
                maximum number of attempts to have a client scan a message
                defaults to 3

  log options
  -o            write all logs to stdout
  --loghits     log the text hit for patterns (useful for debugging)
  --loguris	log the URIs found
  --logmem	log the memory delta (only on Linux)
  --hamlog=log  use <log> as ham log ('ham.log' is default)
  --spamlog=log use <log> as spam log ('spam.log' is default)

  message selection options
  -n            no date sorting or spam/ham interleaving
  --cache	use cache information when selecting messages
  --cachedir=dir write cache info for --cache in this directory tree
  --all         don't skip big messages

  message selection options, can be specified for each target
  --after=N     only test mails received after time_t N (negative values
                are an offset from current time, e.g. -86400 = last day)
                or after date as parsed by Time::ParseDate (e.g. '-6 months')
  --before=N    same as --after, except received times are before time_t N
  --scanprob=N  probability of scanning a message, range 0.0 - 1.0 (default: 1.0)

  message selection options, can be specified for each target class
  --head=N      only check first N ham and N spam (N messages if -n used)
  --tail=N      only check last N ham and N spam (N messages if -n used)

  simple target options (implies -o and no ham/spam classification)
  --dir         subsequent targets are directories
  --file        subsequent targets are files in RFC 822 format
  --mbox        subsequent targets are mbox files
  --mbx         subsequent targets are mbx files

  Just left over functions we should remove at some point:
  --bayes       report score from Bayesian classifier

  options used during score generation process
  --learn=N     learn N% of messages as spam or ham
  --reuse       reuse network checks if X-Spam-Status: is present in messages
                (note: both clients and servers in c/s mode need this)

  non-option arguments are used as target names (mail files and folders),
  the target format is: <class>:<format>:<location>
  <class>       is "spam" or "ham"
  <format>      is "dir", "file", "mbx", "mbox", or "detect"
  <location>    is a file or directory name.  globbing of ~ and * is supported

(see http://wiki.apache.org/spamassassin/MassCheck for more details.)

EOF
  exit($status);
}

###########################################################################

our ($opt_c, $opt_p, $opt_f, $opt_j, $opt_n, $opt_o, $opt_all, $opt_bayes,
     $opt_debug, $opt_format, $opt_hamlog, $opt_head, $opt_loghits,
     $opt_mid, $opt_net, $opt_nosort, $opt_progress, $opt_showdots,
     $opt_spamlog, $opt_tail, $opt_rules, $opt_restart, $opt_loguris,
     $opt_logmem, $opt_after, $opt_before, $opt_rewrite, $opt_deencap,
     $opt_learn, $opt_reuse, $opt_lint, $opt_cache, $opt_noisy, $opt_cf,
     $total_messages, $statusevery, $opt_cachedir, $opt_scanprob,
     $opt_client, $opt_cs_conn_retries, $opt_cs_max, $opt_cs_timeout,
     $opt_cs_paths_only, $opt_server, %postdata, %real, $svn_revision,
     $opt_cs_ssl, $opt_run_post_scan, $opt_cs_verbose, %client_caches,
     %server_caches, @cache_tmp_files, %min_other_caches,
     %unique_cache_completed, $opt_cs_schedule_cache, $opt_cs_cache,
     $opt_cs_cachedir, $opt_cs_max_tries, $opt_pre,
     $tmpfd);

use FindBin;

# use "blib" so that we can use e.g. @@LOCAL_STATE_DIR@@; "lib" doesn't
# have that stuff substituted :(  use lib too, though, as a backup,
# since some users might be running mass-check without "make" first
use lib "$FindBin::Bin/../lib";
use lib "$FindBin::Bin/../blib/lib";

use IO::Select;
use IO::Socket;
use Socket qw();
use Mail::SpamAssassin::ArchiveIterator;
use Mail::SpamAssassin;
use Mail::SpamAssassin::Logger;
use File::Copy;
use File::Spec;
use Getopt::Long;
use POSIX qw(strftime);
use Fcntl qw(O_RDWR O_CREAT);;
use Config; 

use constant HAS_TIME_PARSEDATE => eval { require Time::ParseDate; };
use constant HAS_IO_ZLIB => eval { require IO::Zlib; };
use constant HAS_IO_SOCKET_SSL => eval { require IO::Socket::SSL; };
use constant HAS_TIME_HI_RES => eval { require Time::HiRes; };
use constant HAS_SDBM_FILE => eval { require SDBM_File; };


# default settings
$opt_c = "$FindBin::Bin/../rules";
$opt_p = "$FindBin::Bin/spamassassin";
$opt_j = 1;
$opt_head = 0;
$opt_tail = 0;
$opt_net = 0;
$opt_hamlog = "ham.log";
$opt_spamlog = "spam.log";
$opt_learn = 0;
$opt_cf = [];
$opt_pre = [];

my $rcvd_bytes = 0;
my $sent_bytes = 0;
my $t_first_msg = 0;
my $t_last_msg = 0;
my $msgs_processed = 0;
my $failed_msgs = 0;
my $cache_hits = 0;
my $client_id = 0;

my @ORIG_ARGV = @ARGV;
GetOptions("c=s", "p=s", "f=s", "j=i", "n", "o", "all", "bayes", "debug:s",
	   "hamlog=s", "head=i", "loghits", "mh", "mid", "ms", "net",
	   "progress!", "rewrite:s", "showdots", "spamlog=s", "tail=i",
	   "rules=s", "restart=i", "loguris", "run_post_scan=s",
	   "deencap=s", "logmem", "learn=i", "reuse", "lint", "cache",
           "cachedir=s", "noisy", "scanprob=f",
	   "server=s", "cs_max=i", "cs_timeout=i", "cs_conn_retries=i",
	   "cs_paths_only", "client=s", "cs_ssl", "cs_verbose",
           "cs_schedule_cache", "cs_cache", "cs_cachedir=s", "cs_max_tries=i",
	   "before=s" => \&deal_with_before_after,
	   "after=s" => \&deal_with_before_after,
	   "cf=s" => \@{$opt_cf},
	   "pre=s" => \@{$opt_pre},
	   "dir" => sub { $opt_format = "dir"; },
	   "file" => sub { $opt_format = "file"; },
	   "mbox" => sub { $opt_format = "mbox"; },
	   "mbx" => sub { $opt_format = "mbx"; },
	   "help" => sub { usage(0); },
	   '<>' => \&target) or usage(1);

# We need IO::Zlib for client-server mode!
if ( ($opt_client || $opt_server) && ! HAS_IO_ZLIB ) {
  die "IO::Zlib required for client/server mode!\n";
}

if ($opt_cs_ssl && ! HAS_IO_SOCKET_SSL ) {
  die "IO::Socket::SSL required for --cs_ssl!\n";
}

if ($opt_noisy) {
  $opt_progress = 1;        # implies --progress
}

$opt_debug ||= 'all' if defined $opt_debug;

if ($opt_cs_schedule_cache) {
  $opt_cs_cache = 1;        # implies --cs_cache
}

if ($opt_client && $opt_cs_cache && !$opt_cs_cachedir) {
  warn "You must specify a local message cache directory with --cs_cachedir\n".
       "when using the --cs_cache option.\n";
  exit;
}  

if ($opt_server && $opt_cs_schedule_cache && !$opt_n) {
  warn '*'x74 ."\n";
  warn '*'. ' 'x72 ."*\n";
  warn "*   Corpus will be run un-sorted but with date stamp loggging which is   *\n";
  warn "*   needed for score generation log selection but is not available when  *\n";
  warn "*   using the -n option.  This may affect the results of any bayes and   *\n";
  warn "*   AWL tests run during this mass-check.                                *\n";
  warn '*'. ' 'x72 ."*\n";
  warn '*'x74 ."\n";
}

if ($opt_cs_schedule_cache && !HAS_SDBM_FILE) {
  warn "--cs_schedule_cache requires the Perl module SDBM_File.\n";
  exit;
}

# --lint
if ($opt_lint) {
  # In theory we could probably use the same spamtest object as below,
  # but since it's probably not expecting that, and we don't want
  # strange things happening, create a local object.
  my $spamlint = create_spamtest();
  $spamlint->debug_diagnostics();
  my $res = $spamlint->lint_rules();
  $spamlint->finish();
  if ($res) {
    warn "lint: $res issues detected, ".
        "please rerun with debug enabled for more information\n";
    exit 1;
  }
}

# test messages for the mass-check
my @targets;
if (!$opt_client) {
  if ($opt_f) {
    open(F, $opt_f) || die "cannot read target $opt_f: $!";
    push(@targets, map { chomp; $_ } <F>);
    close(F);
  }
  usage(1) if !@targets;
}

if ($opt_reuse) {
  # if we have --reuse, don't bother testing DNS; we shouldn't be hitting
  # the wire at all, and in fact we may be running without a net connection
  push @{$opt_cf}, "dns_available yes\n";

  # need to load M:SA:Plugin:Reuse
  push @{$opt_pre}, "loadplugin Mail::SpamAssassin::Plugin::Reuse\n";

}

my $user_prefs = "$opt_p/user_prefs";

sub create_spamtest {
  return new Mail::SpamAssassin ({
    'debug'              		=> $opt_debug,
    'rules_filename'     		=> $opt_c,
    'site_rules_filename'		=> "$opt_p/local.cf",
    'userprefs_filename'		=> $user_prefs,
    'userstate_dir'     		=> $opt_p,
    'save_pattern_hits'  		=> $opt_loghits,
    'dont_copy_prefs'   		=> 1,
    'local_tests_only'   		=> $opt_net ? 0 : 1,
    'only_these_rules'   		=> $opt_rules,
    'ignore_safety_expire_timeout'	=> 1,
    'pre_config_text'                   => join("\n", @{$opt_pre})."\n",
    'post_config_text'                  => join("\n", @{$opt_cf})."\n",
    PREFIX				=> '',
    DEF_RULES_DIR        		=> $opt_c,
    LOCAL_RULES_DIR      		=> '',
  });
}

my $spamtest = create_spamtest();
$spamtest->compile_now(0);      # 0 since we will be reading more configs
$spamtest->read_scoreonly_config("$FindBin::Bin/mass-check.cf");
$spamtest->read_scoreonly_config($user_prefs);
$spamtest->call_plugins("prefork_init");  # since SA 3.4.0

my $who = `id -un 2>/dev/null`;
my $where = `uname -n 2>/dev/null`;
my $when = `date -u`;
my $host = $ENV{'HOSTNAME'} || $ENV{'HOST'} || $where || `hostname` || 'localhost';
chomp $who;
chomp $where;
chomp $when;
chomp $host;
$svn_revision = get_current_svn_revision();

# when displaying the commandline, quote any arguments which have
# "questionable" characters such as spaces, pipes, etc.
my $cmdline = join(' ',map { m@[^A-Za-z0-9_/\\.-]@ ? qq/"$_"/ : $_ } @ORIG_ARGV); $cmdline =~ s/\s+/ /gs;
my $isowhen = strftime("%Y%m%dT%H%M%SZ", gmtime(time)); # better

my $log_header = "# mass-check results from $who\@$where, on $when\n" .
		 "# M:SA version ".$spamtest->Version()."\n" .
		 "# SVN revision: $svn_revision\n" .
                 "# Date: $isowhen\n" .
		 "# Perl version: $] on $Config{archname}\n" .
                 "# Switches: '$cmdline'\n";

my $updates = ($opt_noisy ? 100 : 10);
my $total_count = 0;
my $spam_count = 0;
my $ham_count = 0;
my $init_results = 0;

my $showdots_active = ($opt_showdots || $opt_noisy);
my $showdots_counter = 0;
my $showdots_every = ($opt_showdots ? 1 : 20);

my $AIopts = {
	'opt_all' => $opt_all,
        'opt_skip_empty_messages' => 1,
      };

if (!$opt_client) {
  # Deal with --rewrite
  if (defined $opt_rewrite) {
    my $rewrite = ($opt_rewrite ? $opt_rewrite : "/tmp/out");
    open(REWRITE, "> $rewrite") || die "open of $rewrite failed: $!";
  }

  # ArchiveIterator options for non-client mode
  $AIopts->{'opt_scanprob'} = $opt_scanprob;
  $AIopts->{'opt_cache'} = $opt_cache;
  $AIopts->{'opt_cachedir'} = $opt_cachedir;
  $AIopts->{'opt_after'} = $opt_after;
  $AIopts->{'opt_before'} = $opt_before;
  $AIopts->{'scan_progress_sub'} = \&showdots_blip;
  $AIopts->{'opt_want_date'} = ! $opt_n;

  # ensure that scanprob stuff is predictable and reproducable
  if (defined $opt_scanprob && $opt_scanprob < 1.0) {
    srand(1);
  }
}
else {
  # ArchiveIterator options for client mode -- tends to be simple
  $opt_n = 1;
  $AIopts->{'opt_want_date'} = 0;
}

###########################################################################
## SCAN MODE

my $iter = new Mail::SpamAssassin::ArchiveIterator($AIopts);

# setup the AI functions
if ($opt_client) {
  $iter->set_functions(\&wanted, \&result_client);
}
elsif ($opt_server) {
  $iter->set_functions(\&wanted_server, \&result);
}
else {
  $iter->set_functions(\&wanted, \&result);
}

#AFTER SETTING ALL OPTS, RUN _set_default_message_selection_opts() TO GET SANE DEFAULTS
$iter->_set_default_message_selection_opts();

my $messages;

# normal mode as well as a server do scan mode and get a temp file
if (!$opt_client) {
  status('starting scan stage') if ($opt_progress);

  # Make a temp file and delete it
  my $tmpf;
  ($tmpf, $tmpfd) = Mail::SpamAssassin::Util::secure_tmpfile();
  die 'mass-check: failed to create temp file' unless $tmpf;
  unlink $tmpf or die "mass-check: unlink '$tmpf': $!";

  # having opt_j or server mode means do scan in a separate process
  if ($opt_server || $opt_j) {
    if ($tmpf = fork()) {
      # parent
      waitpid($tmpf, 0);
    }
    elsif (defined $tmpf) {
      # child -- process using message_array
      generate_queue(\@targets, $tmpfd);
      exit;
    }
    else {
      die "mass-check: cannot fork: $!";
    }
  }
  else {
    # we get here if opt_j == 0, so scan in this process
    generate_queue(\@targets, $tmpfd);
  }

  # we now have a temporary file with the messages to process
  seek($tmpfd, 0, 0);
  # the first line is the number of messages
  $total_messages = read_line($tmpfd);

  if (!$total_messages) {
    die "mass-check: no messages to process\n";
  }

  if ($opt_cs_schedule_cache) {
    # create a tied hash database for the server to use for matching against
    # client hashes

    # while we're at it, count the total number of messages for ourself
    # I suspect that AI gets it wrong sometimes, somehow
    $total_messages = 0;

    # get a temp file for the server database on messages to process
    my ($dbpath, $dbfd) = Mail::SpamAssassin::Util::secure_tmpfile();
    close $dbfd;
    unlink $dbpath;
    push @cache_tmp_files, $dbpath.'.pag';
    push @cache_tmp_files, $dbpath.'.dir';

    tie %{$server_caches{'to_process'}}, "SDBM_File", $dbpath,
      O_RDWR|O_CREAT, 0600 || die "Cannot tie hash to file $dbpath: $!";

    # get a temp file for the server DB of messages that are not cached anywhere
    ($dbpath, $dbfd) = Mail::SpamAssassin::Util::secure_tmpfile();
    close $dbfd;
    unlink $dbpath;
    push @cache_tmp_files, $dbpath.'.pag';
    push @cache_tmp_files, $dbpath.'.dir';
  
    tie %{$server_caches{'not_cached'}}, "SDBM_File", $dbpath,
      O_RDWR|O_CREAT, 0600 || die "Cannot tie hash to file $dbpath: $!";

    # get a temp file for the server database of cache hit counts per message
    ($dbpath, $dbfd) = Mail::SpamAssassin::Util::secure_tmpfile();
    close $dbfd;
    unlink $dbpath;
    push @cache_tmp_files, $dbpath.'.pag';
    push @cache_tmp_files, $dbpath.'.dir';

    tie %{$server_caches{'cache_count'}}, "SDBM_File", $dbpath,
      O_RDWR|O_CREAT, 0600 || die "Cannot tie hash to file $dbpath: $!";

    # dump the server's list of messages to process to the DB_HASHes
    while (my $msg = read_line($tmpfd)) {
      my @d = Mail::SpamAssassin::ArchiveIterator::_index_unpack($msg);
      $server_caches{'to_process'}{$d[3]} = $msg;
      $server_caches{'not_cached'}{$d[3]} = undef;
      $total_messages++;
    }
    # and back to the first message (although we currently never use it again)
    seek($tmpfd, 0, 0);
    read_line($tmpfd);
  }

  showdots_finish();
  status("completed scan stage, $total_messages messages") if ($opt_progress);
}

###########################################################################
## RUN MODE

run_post_scan();

if ($opt_client) {
  client_mode();
}
else {
  status('starting run stage') if ($opt_progress);

  if ($opt_server) {
    server_mode();
  }
  else {
    $t_first_msg = time;
    run_through_messages();
    $t_last_msg = time;
  }

  status('completed run stage') if ($opt_progress);
}

# Even though we're about to exit, let's clean up after ourselves
close($tmpfd) if ($tmpfd);
showdots_finish();

if (defined $opt_rewrite) {
  close(REWRITE);
}

$spamtest->finish();

if ($opt_cs_verbose) {
  if ($opt_client || $opt_server) {
    warn "network I/O: sent=$sent_bytes, received=$rcvd_bytes\n";
  }
  $t_last_msg++ if $t_last_msg == $t_first_msg;
  warn "processed $msgs_processed messages (failed to scan $failed_msgs) in ".($t_last_msg - $t_first_msg).
       ' seconds ('.(sprintf("%.1f", ($msgs_processed / ($t_last_msg - $t_first_msg) * 3.6))).
       " kmsgs/hr)\n";
  $msgs_processed ||= 1;
  warn "cache hits: $cache_hits ("
    .(sprintf("%.1f", ($cache_hits / $msgs_processed * 100)))."%)\n" if $opt_cs_cache;
}

# exit status: did we check at least one message correctly?
exit(!($ham_count || $spam_count));

###########################################################################

sub target  {
  my ($target) = @_;

  # message-selection options; these can now be specified separately
  # for each target
  my %selopts = (
    opt_scanprob => $opt_scanprob,
    opt_after => $opt_after,
    opt_before => $opt_before
  );

  if (!defined($opt_format)) {
    push(@targets, { %selopts, target => $target });
  }
  else {
    $opt_o = 1;
    push(@targets, { %selopts, target => "spam:$opt_format:$target" });
  }
}

###########################################################################

sub init_results {
  $init_results = 1;

  showdots_finish();

  # now, showdots only happens if --showdots was used
  $showdots_active = $opt_showdots;

  if ($opt_progress) {
    # round up since 100% will be caught at end already
    $statusevery = int($total_messages / $updates + 1);

    # if $messages < $updates, just give a status line per msg.
    $statusevery ||= 1;
  }

  return if $opt_client;

  if ($opt_o) {
    autoflush STDOUT 1;
    print STDOUT $log_header;
  }
  else {
    open(HAM, "> $opt_hamlog") || die "open of $opt_hamlog failed: $!";
    open(SPAM, "> $opt_spamlog") || die "open of $opt_spamlog failed: $!";
    autoflush HAM 1;
    autoflush SPAM 1;
    print HAM $log_header;
    print SPAM $log_header;
  }
}

sub result {
  my ($class, $result, $time) = @_;

  # don't open results files until we get here to avoid overwriting files
  init_results() if !$init_results;

  if ($class eq "s") {
    if ($opt_o) { print STDOUT $result; } else { print SPAM $result; }
    $spam_count++;
  }
  elsif ($class eq "h") {
    if ($opt_o) { print STDOUT $result; } else { print HAM $result; }
    $ham_count++;
  }

  $total_count++;

  if ($opt_progress) {
    progress($time);
  }
}

sub wanted {
  my ($class, $id, $time, $dataref, $format) = @_;
  my $out = '';

  # if origid is defined, it'll be the message number from server mode
  my $origid;

  # client mode is a little crazy because we need to kluge around the fact
  # that the information needed to do the run is different than the
  # information that goes into the results.
  if ($opt_client) {
    # change the format and id to the real version, make sure to remember
    # the server's message number
    # note: pop/push makes it work regardless of any changes to M::SA::AI
    $origid = pop @{$real{$id}};
    $format = $real{$id}->[2];
    $id = $real{$id}->[3];
  }

  memory_track_start() if ($opt_logmem);
  $spamtest->timer_reset;

  # parse the message, and force it to complete
  my $ma = $spamtest->parse($dataref, 1);

  # get X-Spam-Status: header for rule hit resue
  my $x_spam_status;
  my $reusing;
  if ($opt_reuse) {
    $x_spam_status = $ma->get_header("X-Spam-Status");
    $x_spam_status and $x_spam_status =~ s/,\s+/,/gs;
  }

  # remove SpamAssassin markup, if present and the mail was spam
  my $header = $ma->get_header("Received");
  if ($header && $header =~ /\bwith SpamAssassin\b/) {
    if (!$opt_deencap || message_should_be_deencapped($ma)) {
      my $new_ma = $spamtest->parse($spamtest->remove_spamassassin_markup($ma), 1);
      $ma->finish();
      $ma = $new_ma;
    }
  }

  if ($opt_reuse) {
    if ($x_spam_status 
        && $x_spam_status =~ m/tests=(\S*)/
        && $x_spam_status !~ /\bshortcircuit=(?:ham|spam|default)\b/)
    {
      my @previous = split(/,/, $1);
      # Bug 7709
      # Amavis X-Spam-Status rules include score and are enclosed in []
      # Amavis: [RULENAME=0.01,RULENAME_2=0.01]
      # Spamassassin: RULENAME,RULENAME_2
      s/[\[\]]//, s/=.*// foreach (@previous);
      $ma->{metadata}->{reuse_tests_hit} = { map {$_ => 1} @previous };
      $reusing = 1;
    }
  }

  # plugin hook to cause us to skip messages
  my $skip = $spamtest->call_plugins("mass_check_skip_message", {
        class => $class,
        'time' => $time,
        'id' => $id,
        msg => $ma
      });
  if ($skip) {
    $ma->finish();
    return;
  }

  # log-uris support
  my $status;
  my @uris;
  my $before;
  my $after;
  if ($opt_loguris) {
    my $pms = Mail::SpamAssassin::PerMsgStatus->new($spamtest, $ma);
    @uris = $pms->get_uri_list();
    $pms->finish();

  } else {
    $before = time;
    $status = $spamtest->check($ma);
    $after = time;
  }

  my @extra;

  # sample-based learning
  if ($opt_learn > 0) {
    my $spam;
    # spam learned as ham = 0.05%
    if ($class eq 's' && rand(100) < 0.05) {
      $spam = 0;
    }
    # ham learned as spam = 0.01%
    elsif ($class eq 'h' && rand(100) < 0.01) {
      $spam = 1;
    }
    # spam/ham learned correctly
    elsif (rand(100) < $opt_learn) {
      if ($class eq 's') {
	$spam = 1;
      }
      elsif ($class eq 'h') {
	$spam = 0;
      }
      else {
	die "unknown class, learning failed";
      }
    }
    if (defined $spam) {
      my $result = ($spam ? "spam" : "ham");
      my $status = $spamtest->learn($ma, undef, $spam, 0);
      my $learned = $status->did_learn();
      $result = "undef" if !defined $learned;
      push(@extra, "learn=".$result);
    }
  }

  if (defined($time)) {
    push(@extra, "time=".$time);
  }
  if ($status && defined $status->{bayes_score}) {
    push(@extra, "bayes=".sprintf("%06f", $status->{bayes_score}));
  }
  if ($opt_mid) {
    my $mid = $ma->get_header("Message-Id");
    if ($mid) {     # message contains a Message-Id:
      while($mid =~ s/\([^\(\)]*\)//s) {};   # remove comments and
      $mid =~ s/^\s+|\s+$//sg;               # leading and trailing spaces
      $mid =~ s/\s.*$//s;                    # keep only the first token
    }
    else {          # it doesn't have a Message-Id:
      $mid = $id;             # so build one from the id
      $mid =~ s,^.*/,,;       # remove the path
      $mid = "<$mid\@$host.masses.spamassassin.org>";  # and put it together
    }
    $mid =~ tr/-A-Za-z0-9_!#%&=~<@>/./c;     # replace dangerous chars with . (so regexp search just works)
    push(@extra, "mid=$mid");
  }
  push(@extra, "scantime=" . ($after - $before));
  push(@extra, "format=$format");

  if ($opt_logmem) { 
    my $mem = memory_track_finish();
    if ($mem) {
      push(@extra, $mem);
    }
  }

  push(@extra, "reuse=" . ($reusing ? "yes" : "no"));

  # log the scoreset we're in
  {
    my $set = 0;
    if ($opt_net) { $set |= 1; }
    if ($status && defined $status->{bayes_score}) { $set |= 2; }
    push(@extra, "set=".$set);
  }

  if ($opt_client) {
    push(@extra, "host=$where");
  }

  my $yorn;
  my $score;
  my $tests;
  my $extra;

  if ($opt_loguris) {
    $yorn = '.';
    $score = 0;
    $tests = join(" ", sort @uris);
    $extra = '';
  } else {
    $yorn = $status->is_spam() ? 'Y' : '.';
    # don't bother adjusting scores for reuse
    $score = $status->get_score();
    # list of tests hit
    my @tests;
    push @tests, split(/,/, $status->get_names_of_tests_hit());
    push @tests, split(/,/, $status->get_names_of_subtests_hit());

    $tests = join(",", sort(@tests));
    $extra = join(",", @extra);
  }

  if (defined $opt_rewrite) {
    print REWRITE $status->rewrite_mail();
  }

  $id =~ s/\s/_/g;

  # if we have an origid set, it'll be the server mode's message number, so
  # attach it to our result appropriately.
  if (defined $origid) {
    $out = "$origid ";
  }

  $out .= sprintf("%s %2d %s %s %s\n", $yorn, $score, $id, $tests, $extra);

  if ($tests =~ /MICROSOFT_EXECUTABLE|MIME_SUSPECT_NAME/) {
    $out .= logkilled($ma, $id, "possible virus");
  }

  if ($opt_loghits) {
    my $log = '';
    foreach my $t (sort keys %{$status->{pattern_hits}}) {
      $_ = $status->{pattern_hits}->{$t};
      $_ ||= '';
      s/\r/\\r/gs;      # fix unprintables
      s/\n/\\n/gs;
      $log .= "$t=\"$_\" ";
    }
    if ($log) {
      chomp $log;
      $out .= "# $log\n";
    }
  }

  if (defined $status) { $status->finish(); }
  $ma->finish();
  undef $ma;		# clean 'em up
  undef $status;

  showdots_blip();
#  print ">>>> out = $out\n";
  return $out;
}

sub showdots_blip {
  return unless ($showdots_active);

  $showdots_counter++;
  if ($showdots_counter % $showdots_every == 0) {
    print STDERR '.';
    if ($showdots_counter % (60 * $showdots_every) == 0) {
      print STDERR "\n";
    }
  }
}

sub showdots_finish {
  print STDERR "\n" if ($showdots_active);
  $showdots_counter = 0;
}

# ick.  We have to go grovelling through the body parts to see if a message
# is a report_safe-marked-up message, because a local scanner will overwrite
# any remote scanner's X-Spam-Checker-Version header.
#
sub message_should_be_deencapped {
  my ($ma) = @_;

  # not sure why this is undefined, but it is sometimes
  if (defined $ma->{body_parts} && scalar @{$ma->{body_parts}} > 0) {
    my $firstpart = $ma->{body_parts}->[0];
    if (!$firstpart->{headers}->{'content-type'}
        || $firstpart->{headers}->{'content-type'} ne 'text/plain')
    {
      return 0;     # not a 'report_safe' encapsulation
    }

    if (scalar @{$firstpart->{raw}} < 3) { return 0; } # too short to be a report

    # grab first 2 lines
    my $text = $firstpart->{raw}->[0] . $firstpart->{raw}->[1];
    $text =~ s/\s+/ /gs;
    if ($text =~ /^Spam detection software, running on the system \"(\S+)\"/) {
      my $hname = $1;
      if ($hname =~ /$opt_deencap/io) {
        return 1;
      }
    }
  }

  return 0;     # a different host marked it up.  pass it through!
}

sub logkilled {
  my ($ma, $id, $reason) = @_;

  my $from = $ma->get_header("From")       || 'undef';
  my $to   = $ma->get_header("To")         || 'undef';
  my $subj = $ma->get_header("Subject")    || 'undef';
  my $mid  = $ma->get_header("Message-Id") || 'undef';
  chomp ($from);
  chomp ($to);
  chomp ($subj);
  chomp ($mid);
  return "# skipped killfiled message ($reason): from=$from to=$to subj=$subj mid=$mid id=$id\n";
}

sub progress {
  my ($time) = @_;
  $time ||= 0;

  # Are we at the end or otherwise at a point we should print status?  Then do it.
  if ($total_messages == $total_count || $total_count % $statusevery == 0) {
    my $time = strftime("%Y-%m-%d", localtime($time));
    status(sprintf("%3d%% ham: %-6d spam: %-6d date: %s",
	int(($total_count / $total_messages) * 100), $ham_count, $spam_count, $time));
  }
}

sub status {
  my($str) = @_;
  my $now = strftime("%Y-%m-%d %X", localtime(time));
  printf STDERR "status: %-48s now: %s\n", $str, $now;
}

###########################################################################

our ($mem_size, $mem_rss, $mem_shared);

sub memory_track_start {
  if ($^O =~ /linux/i) {
    if (open (IN, "</proc/$$/statm")) {
      my $statm = <IN>;
      close IN;
      if ($statm =~ /^(\d+) (\d+) (\d+) /) {
        $mem_size = $1;
        $mem_rss = $2;
        $mem_shared = $3;
      }
    }
  }
}

sub memory_track_finish {
  my $str = '';

  if ($^O =~ /linux/i) {
    if (open (IN, "</proc/$$/statm")) {
      my $statm = <IN>;
      close IN;
      if ($statm =~ /^(\d+) (\d+) (\d+) /) {
        my $size = $1;
        my $rss = $2;
        my $shared = $3;

        $str = sprintf ("memsz=%d,memrss=%d,memshr=%d",
                ($size - $mem_size),
                ($rss - $mem_rss),
                ($shared - $mem_shared));
      }
    }
  }
  return $str;
}

sub get_current_svn_revision {
  my $revision;

  # this is usually "${TOPDIR}/masses"
  my $dir = $FindBin::Bin || ".";

  if (-d "$dir/../.svn" || -f "$dir/svninfo.tmp") {
    if (-f "$dir/svninfo.tmp") {
      # created by build/automc/buildbot_ready for chrooted mass-checks
      open (SVNINFO, "< $dir/svninfo.tmp");
    } 
    else {
      # note, ".." since we want to pick up changes outside 'masses'
      # too!
      open (SVNINFO, "( svn info --non-interactive $dir/.. || svn info $dir/.. ) 2>&1 |");
    }

    while (<SVNINFO>) {
      # Revision: 383822
      next unless /^Revision: (\d+)/;
      $revision = $1;
      last;
    }
    close SVNINFO;
    return $revision if $revision;
  }

  # this probably will never work due to Rules Project changes TODO
  if (open(TESTING, "$opt_c/70_testing.cf")) {
    chomp($revision = <TESTING>);
    $revision =~ s/.*\$Rev:\s*(\S+).*/$1/;
    close(TESTING);
    return $revision if $revision;
  }

  return $revision || "unknown";
}

############################################################################

## children processors, start and process, used when opt_j > 1

sub start_children {
  my ($count, $child, $pid, $socket) = @_;

  my $io = IO::Socket->new();
  my $parent;

  # create children
  for (my $i = 0; $i < $count; $i++) {
    ($child->[$i],$parent) = $io->socketpair(AF_UNIX,SOCK_STREAM,PF_UNSPEC)
	or die "mass-check: socketpair failed: $!";
    if ($pid->[$i] = fork) {
      close $parent;

      # disable caching for parent<->child relations
      my ($old) = select($child->[$i]);
      $|++;
      select($old);

      $socket->add($child->[$i]);
      aidbg "mass-check: starting new child $i (pid ".$pid->[$i].")\n";
      next;
    }
    elsif (defined $pid->[$i]) {
      my $result;
      my $line;

      close $tmpfd if defined $tmpfd;

      close $child->[$i];
      select($parent);
      $| = 1;	# print to parent by default, turn off buffering
      send_line($parent,"START");
      while ($line = read_line($parent)) {
	if ($line eq "exit") {
	  close $parent;
	  exit;
	}

	my($class, $format, $date, $where, $result) = $iter->_run_message($line);
	$result ||= '';

	# If determine_receive_date is not set, the original input date
	# wasn't calculated, but run_message would have done so, so reset
	# the packed version if possible ...  use defined for date since
	# it could == 0.
        if (!$iter->{determine_receive_date} && $class && $format && defined $date && $where) {
	  $line = Mail::SpamAssassin::ArchiveIterator::_index_pack($date, $class, $format, $where);
        }

	$result = encode_base64($result);
	send_line($parent,"$result\0RESULT $line");
      }
      exit;
    }
    else {
      die "mass-check: cannot fork: $!";
    }
  }
}

## handling killing off the children

sub reap_children {
  my ($count, $socket, $pid) = @_;

  # If the child died, sending it the exit will generate a SIGPIPE, but we
  # don't really care since the readline will go undef (which is fine),
  # then we do the waitpid which will finish it off.  So we end up in the
  # right state, in theory.
  local $SIG{'PIPE'} = 'IGNORE';

  for (my $i = 0; $i < $count; $i++) {
    aidbg "mass-check: killing child $i (pid ",$pid->[$i],")\n";
    send_line($socket->[$i],"exit"); # tell the child to die.
    close $socket->[$i];
    waitpid($pid->[$i], 0); # wait for the signal ...
  }
}

# in server mode, this gets called to read in the HTTP request from a given
# socket, then return the information the client sent to us.
sub handle_http_request {
  my $socket = shift;

  my $headers = {};
  my $postdata = {};

  # read in the request (POST / HTTP/1.0)
  my $line = $socket->getline();
  $line ||= '';
  $rcvd_bytes += length($line);
  $line =~ s/\r\n$//;

  my ($type, $URI, $VERS) = $line =~ /^([a-zA-Z]+)\s+(\S+)(?:\s*(\S+))/;
  unless ($type && $URI && $VERS) {
    $type ||= '';
    $URI  ||= '';

    return ($type, $URI, $headers, $postdata);
  }

  $type = uc $type;

  # read in headers, "key: value" up to a blank line
  do {
    $line = $socket->getline();
    last unless defined $line;
    $rcvd_bytes += length($line);
    $line =~ s/\r\n$//;

    if ($line) {
      my ($k,$v) = split(/:\s*/, $line, 2);
      $headers->{lc $k} = $v;
    }
  } while ($line !~ /^$/);

  # if this is a POST request w/ content-length, there'll be a payload, deal
  # with it.  we only support compressed payloads.
  my $postheader;
  if ($type eq 'POST' && $headers->{'content-length'}) {
    $rcvd_bytes += $headers->{'content-length'};
    my $pd = '';
    if ($headers->{'content-encoding'} eq 'x-gzip') {
      # assign an id to the client if it doesn't already have one
      $headers->{'client-id'} ||= ++$client_id;

      my ($gzpath, $gzfd) = Mail::SpamAssassin::Util::secure_tmpfile();
      die "Can't make tempfile, exiting" unless $gzpath;

      # TODO: don't read in the entire thing at once to avoid memory bloat
      my $rd;
      $socket->read($rd, $headers->{'content-length'}) || die "mass-check: error reading in data from client\n";
      print $gzfd $rd;
      $gzfd->close;

      my $fd = IO::Zlib->new($gzpath, "rb");
      die "Can't open temp result file: $!" unless $fd;

      if ($headers->{'action'}) {
        # different types of POST contents are packed different ways
        if ($headers->{'action'} eq 'sending cache') {

          if ($opt_cs_schedule_cache) {
            # save the client cache to a tied hash

            # get a temp file for the client cache database
            my ($dbpath, $dbfd) = Mail::SpamAssassin::Util::secure_tmpfile();
            close $dbfd;
            unlink $dbpath;
            push @cache_tmp_files, $dbpath.'.pag';
            push @cache_tmp_files, $dbpath.'.dir';

            tie %{$client_caches{$headers->{'client-id'}}}, "SDBM_File", $dbpath,
              O_RDWR|O_CREAT, 0600 || die "Cannot tie hash to file $dbpath: $!\n";

            # dump the client's list of cached message paths to the DB_HASH
            my $client_cache = 0;
            my $client_cache_hits = 0;
            while (my $msg = read_line($fd)) {
              my @d = Mail::SpamAssassin::ArchiveIterator::_index_unpack($msg);
              $client_cache++;
              if (exists $server_caches{'to_process'}{$d[3]}) {
                $client_caches{$headers->{'client-id'}}{$d[3]} = undef;
                $server_caches{'cache_count'}{$d[3]}++;
                delete $server_caches{'not_cached'}{$d[3]};
                # log this client's cache hit rate
                $client_cache_hits++;
              }
            }
            status("client $headers->{'client-id'} has $client_cache msgs cached ($client_cache_hits usable)");
            status("client $headers->{'client-id'} has ".(sprintf("%.1f", ($client_cache_hits / $total_messages * 100))).'% of required messages');
          }
        }
        elsif ($headers->{'action'} eq 'sending results') {
          # process the results
          $pd = read_line($fd);

          # key1=value1&key2=value2...
          %{$postdata} = map {
            my($k,$v) = split(/=/, $_, 2);

            # we need to decode the key and value
            $k =~ s/\%([0-9a-fA-F]{2})/sprintf "%c", hex($1)/eg;
            $v =~ s/\%([0-9a-fA-F]{2})/sprintf "%c", hex($1)/eg;

            $k => $v;
          } split(/\&/, $pd);

        }
      }

      $fd->close;
      undef $fd;
      $gzfd->close;
      unlink $gzpath;
    }
  }
  return($type, $URI, $headers, $postdata);
}

# in server mode, generate a gzip compressed data stream with the messages and
# return the path to the compressed file which the server will read and pass
# to the client.
#
# Input:
#  - Number of messages to generate (scalar)
#  - Hash of Arrays of outstanding requests (reference to hash of array refs)
#     timestamp# -> [ num1, num2, ... ]
#     Used to quickly find outstanding/timed out messages to send to client.
#  - Hash of outstanding messages and associated data (ref to hash of hash refs)
#     num1 -> { data => 'binary data from scan mode', timestamp => timestamp# }
#     Used later on to specify the timestamp entry to remove the entry from.
#  - Paths only?  If true, just include the original message data in the gzip
#    file.  Otherwise, include the message data.  Useful if the client has the
#    corpus available via the same paths as originally specified.
#
# Returns: scalar path to gzip file
#
sub generate_messages {
  my($msgs, $timestamps, $msgsout, $paths_only, $client_id) = @_;

  # Hold the message numbers we'll be sending out
  my @tosend = ();
  my @sent = ();

  # Find out if any of the messages we sent out before need to be sent out
  # again because we haven't seen a response within the timeout.
  my $tooold = time - $opt_cs_timeout;
  foreach (sort { $a <=> $b } keys %{$timestamps}) {
    # since we're going in numeric order, if the current entry is newer than
    # the timeout value, the rest will be too, so stop looking.
    last if ($_ > $tooold);

    # messages that might be eligible for retry
    my @toretry = ();

    # how many messages do we still need to fulfill the request?
    my $wanted = $msgs - @tosend;

    if (@{$timestamps->{$_}} > $wanted) {
      # there are more entries in the timestamp list than we want, so just
      # grab that many off the list.
      push(@toretry, splice @{$timestamps->{$_}}, 0, $wanted);
    }
    else {
      # there are just enough, or not enough entries on the timestamp list to
      # satisfy our request, so take them all and we'll loop around.
      push(@toretry, @{$timestamps->{$_}});
      delete $timestamps->{$_};
    }

    # limit retries
    foreach my $num (@toretry) {
      if ($msgsout->{$num}->{'count'} < $opt_cs_max_tries) {
        $msgsout->{$num}->{'count'}++;
        push @tosend, $num;
      } else {
        warn "skipping message num $num after $opt_cs_max_tries attempts, index: ".
          (Mail::SpamAssassin::ArchiveIterator::_index_unpack($msgsout->{$num}->{'data'}))[3]."\n";
        delete $msgsout->{$num};
        $failed_msgs++;
      }
    }

    # Ok, we have enough messages so we can stop now.
    last if (@tosend == $msgs);
  }

  if (!$opt_cs_schedule_cache) {
    # if we still have the temp file with the input messages open, we'll fillup
    # out message output queue with messages from there.
    if ($tmpfd) {
      while (@tosend < $msgs) {
        my $msg = read_line($tmpfd);

        # no more messages from the temp file, close it out
        unless ($msg) {
          delete $msgsout->{'curnum'};
          close $tmpfd;
          undef $tmpfd;
          last;
        }

        # we got a result, so assign it a number (curnum) and store the data
        # appropriately, then add the new number to the queue.
        my $num = ++$msgsout->{'curnum'};
        $msgsout->{$num}->{'data'} = $msg;
        $msgsout->{$num}->{'count'}++;
        push(@tosend, $num);
      }
    }
  }
  else {
    if (($msgs_processed + $failed_msgs) != $total_messages) {
      # select messages based on what the client*s* have cached
 
      # if the client hasn't sent cache data, fake it
      unless (exists $client_caches{$client_id}) {
        %{$client_caches{$client_id}} = ();
        $unique_cache_completed{$client_id} = 1;
      }

      # first: select messages that only the current client has cached
      MESSAGE: while (!$unique_cache_completed{$client_id} &&
                 @tosend < $msgs &&
                 (my($path, undef) = each %{$client_caches{$client_id}})) {
        # check that no other clients have it cached
        next MESSAGE if $server_caches{'cache_count'}{$path} > 1; 

        # we got a result, so assign it a number (curnum) and store the data
        # appropriately, then add the new number to the queue.
        my $num = ++$msgsout->{'curnum'};
        $msgsout->{$num}->{'data'} = $server_caches{'to_process'}{$path};
        $msgsout->{$num}->{'count'}++;
        push(@tosend, [ $num, 1 ]);
        delete $server_caches{'to_process'}{$path};
        delete $server_caches{'cache_count'}{$path};
        delete $client_caches{$client_id}{$path};
        $cache_hits++;
      }
      if (@tosend < $msgs) {
        $unique_cache_completed{$client_id} = 1;
      }

      # second: hand out messages that no clients have cached
      while (@tosend < $msgs && 
             (my($path, undef) = each %{$server_caches{'not_cached'}})) {
        # we got a result, so assign it a number (curnum) and store the data
        # appropriately, then add the new number to the queue.
        my $num = ++$msgsout->{'curnum'};
        $msgsout->{$num}->{'data'} = $server_caches{'to_process'}{$path};
        $msgsout->{$num}->{'count'}++;
        push(@tosend, [ $num, 0 ]);
        delete $server_caches{'to_process'}{$path};
        delete $server_caches{'cache_count'}{$path};
        delete $server_caches{'not_cached'}{$path};
      }

      # third: hand out messages in the client's cache regardless of how many
      #        other clients have them cached; smart scheduling takes too long
      while (@tosend < $msgs &&
             (my($path, undef) = each %{$client_caches{$client_id}})) {
        # we got a result, so assign it a number (curnum) and store the data
        # appropriately, then add the new number to the queue.
        my $num = ++$msgsout->{'curnum'};
        $msgsout->{$num}->{'data'} = $server_caches{'to_process'}{$path};
        $msgsout->{$num}->{'count'}++;
        push(@tosend, [ $num, 1 ]);
        delete $server_caches{'to_process'}{$path};
        delete $server_caches{'cache_count'}{$path};
        $cache_hits++;

        # testing with 5 clients, this seems to be faster (in msgs/hr) than
        # deleting from each other clients' cache when we go to get their
        # messages; of course it's more I/O on the server this way
        my $cache_count = $server_caches{'cache_count'}{$path};
        foreach my $cc (keys %client_caches) {
          delete $client_caches{$cc}{$path};
          last unless --$cache_count;
        }     
      }

      # fourth: hand out messages that other clients have cached without
      #         regard for how many clients have cached them; smart
      #         scheduling takes too long (at least when using DBM)
      while (@tosend < $msgs &&
             (my($path, $msg) = each %{$server_caches{'to_process'}})) {
        # we got a result, so assign it a number (curnum) and store the data
        # appropriately, then add the new number to the queue.
        my $num = ++$msgsout->{'curnum'};
        $msgsout->{$num}->{'data'} = $server_caches{'to_process'}{$path};
        $msgsout->{$num}->{'count'}++;
        push(@tosend, [ $num, 0 ]);
        delete $server_caches{'to_process'}{$path};
        delete $server_caches{'cache_count'}{$path};

        my $cache_count = $server_caches{'cache_count'}{$path};
        foreach my $cc (keys %client_caches) {
          delete $client_caches{$cc}{$path};
          last unless --$cache_count;
        }
      }
    }
    else {
      # close the tmpfd, etc so that the main loop knows we're done
      delete $msgsout->{'curnum'};
      close $tmpfd;
      undef $tmpfd;
    }
  }

  # ok, at this point, @tosend ought to have a list of numbers, pointers into
  # %{$msgsout}.  turn that into a tar file.
  return '' unless @tosend;

  my($gzpath, $gzfd) = Mail::SpamAssassin::Util::secure_tmpfile();
  die "Can't make tempfile, exiting" unless $gzpath;
  close($gzfd);

  $gzfd = IO::Zlib->new($gzpath, 'wb') || die "Can't create temp gzip file: $!";

  # first line is the number of messages included in the file
  send_line($gzfd, scalar @tosend) || die "mass-check: error when writing to gz temp file\n";

  # Generate an archive in the temp file
  foreach my $num (@tosend) {
    my $in_cache = 0;
    if (ref($num) eq 'ARRAY') {
      $in_cache = $num->[1];
      $num = $num->[0];
    }
    # Archive format, gzip compressed file w/ 4 parts per message:
    # 1- server message number in text format
    # 2- server index string, binary packed format
    # 3- a 1 if the message is included, 0 otherwise -- unless paths_only
    # 4- message content -- unless paths_only or #3 is 0
    my $data = $msgsout->{$num}->{'data'};
    if (!$paths_only) {
      unless ($in_cache) {
        my $msg = ($iter->_run_message($data))[4];
        if ($msg) {
          send_line($gzfd, $num) || die "mass-check: error when writing to gz temp file\n";
          send_line($gzfd, $data) || die "mass-check: error when writing to gz temp file\n";
          send_line($gzfd, '1') || die "mass-check: error when writing to gz temp file\n";
          send_line($gzfd, (defined $msg ? join('', @{$msg}) : '')) ||
            die "mass-check: error when writing to gz temp file\n";
          push @sent, $num;
        } else {
          # if the message has an error (probably due to it being removed sometime during the
          # run) we send a message number of 0 to indicate to the client that the server had an
          # error retrieving the message; we need to do this since we have already added the
          # line telling the client how many messages to expect
          my $filename = (Mail::SpamAssassin::ArchiveIterator::_index_unpack($data))[3];
          warn qq/mass-check: ArchiveIterator returned error for "$filename"\n/;
          send_line($gzfd, 0) || die "mass-check: error when writing to gz temp file\n";
          delete $msgsout->{$num};
          $failed_msgs++;
        }
      } else {
        send_line($gzfd, $num) || die "mass-check: error when writing to gz temp file\n";
        send_line($gzfd, $data) || die "mass-check: error when writing to gz temp file\n";
        send_line($gzfd, '0') || die "mass-check: error when writing to gz temp file\n";
        push @sent, $num;
      }
    } else {
      send_line($gzfd, $num) || die "mass-check: error when writing to gz temp file\n";
      send_line($gzfd, $data) || die "mass-check: error when writing to gz temp file\n";
      # the client deals with missing messages on its own (sort of)
      push @sent, $num;
    }
  }

  $gzfd->close;

  # update timestamp entries
  my $ts = time;

  # make sure the timestamp is unique!  without Time::HiRes it is trivial and
  # common to have two reissueings of timedout messages in the same second
  # with Time::HiRes we'll check anyway since we'll waste less time checking
  # for uniqueness than one of us will waste debugging a report of mass-check
  # not completing due to someone's wacky clock (some (non-para-)virtualized
  # servers will have their clocks go forward and backward)
  while (exists $timestamps->{$ts}) {
    if (HAS_TIME_HI_RES) {
      $ts = Time::HiRes::time();
    } else {
      $ts += 0.01;
    }
  }

  foreach (@sent) {
    $msgsout->{$_}->{'timestamp'} = $ts;
  }

  # conveniently, this list should be the only thing sent out w/ this
  # timestamp, so just set the reference appropriately. :)
  $timestamps->{$ts} = \@sent;

  if ($opt_noisy) {
    status('sent '.scalar(@sent).' of '.scalar(@tosend).' intended messages');
  }

  return $gzpath;
}

# we've gotten results posted, so clean up msgsout and timestamp hashes and
# process result...
sub handle_post_results {
  my($postdata, $timestamps, $msgsout) = @_;

  # local version to batch the removals
  my %timestamps = ();

  # $msgsout->{num}->{data|timestamp}
  # $timestamp{num} = [ msgout_nums ... ]
  # $postdata{num} = result_string

  while( my($k,$v) = each %{$postdata} ) {
    # message run results will be \d+ => log entry
    next if ($k !~ /^\d+$/);

    # if we've been waiting for this result, process it, otherwise throw it on
    # the ground.  multiple clients could have been given the same messages to
    # process, and we take whatever the first responder sends us.
    if (exists $msgsout->{$k}) {
      # the result_sub will need parts of the message data, so get it ready
      my @d = Mail::SpamAssassin::ArchiveIterator::_index_unpack($msgsout->{$k}->{'data'});

      # go ahead and do the result
      &{$iter->{result_sub}}($d[1], $v, $d[0]);

      # prep to get rid of the cached entries
      $timestamps{$msgsout->{$k}->{'timestamp'}}->{$k} = 1;
      delete $msgsout->{$k};

      $msgs_processed++;
    }
  }

  # if we got any results, clean out the results from the timestamp arrays
  while ( my($k,$v) = each %timestamps ) {
    # trim out the result list from the timestamp sent list
    my @temp = grep(!exists $v->{$_}, @{$timestamps->{$k}});

    # if there are results left for a specific timestamp, update the array
    # pointer.  otherwise, delete the timestamp entry since it's empty.
    if (@temp) {
      $timestamps->{$k} = \@temp;
    }
    else {
      delete $timestamps->{$k};
    }
  }
}

# This function reads from $tmpfd and processes the message as appropriate wrt
# $opt_j, $opt_restart, etc.
#
sub run_through_messages {
  # do everything in one process
  if ($opt_j <= 1 && !defined $opt_restart) {
    my $message;
    my $messages;
    my $total_count = 0;

    while (($total_messages > $total_count) && ($message = read_line($tmpfd))) {
      my($class, undef, $date, undef, $result) = $iter->_run_message($message);
      if ($result) {
        &{$iter->{result_sub}}($class, $result, $date);
      }
      $total_count++;
    }
    $msgs_processed += $total_count;
  }
  # more than one process or one process with restarts
  else {
    my $select = IO::Select->new();

    my $total_count = 0;
    my $needs_restart = 0;
    my @child = ();
    my @pid = ();
    my $messages;

    # start children processes
    start_children($opt_j, \@child, \@pid, $select);

    # feed childen, make them work for it, repeat
    while ($select->count()) {
      foreach my $socket ($select->can_read()) {
        my $line = read_line($socket);

        # some error happened during the read!
        if (!defined $line) {
          $needs_restart = 1;
          warn "mass-check: readline failed, attempting to recover\n";
          $select->remove($socket);
        }
        elsif ($line =~ /^([^\0]*)\0RESULT (.+)$/s) {
	  my $result = decode_base64($1);
	  my ($date,$class,$type) = Mail::SpamAssassin::ArchiveIterator::_index_unpack($2);
	  aidbg "mass-check: $class, $type, $date\n";

	  if (defined $opt_restart && ($total_count % $opt_restart) == 0) {
	    $needs_restart = 1;
	  }

	  # if messages remain, and we don't need to restart, send message
	  if (($total_messages > $total_count) && !$needs_restart) {
	    my $line = read_line($tmpfd);
	    unless ($line) {
	      warn "mass-check: found short message list ($total_messages, $total_count)\n";
	      $select->remove($socket);
	      next;
	    }

            send_line($socket, $line);
	    $total_count++;
	    aidbg "mass-check: $total_messages $total_count\n";
	  }
	  else {
	    # stop listening on this child since we're done with it
	    aidbg "mass-check: $needs_restart $total_messages $total_count\n";
	    $select->remove($socket);
	  }

	  # deal with the result we received
	  if ($result) {
	    &{$iter->{result_sub}}($class, $result, $date);
	  }
        }
        elsif ($line eq "START") {
	  if ($total_messages > $total_count) {
	    # we still have messages, send one to child
	    send_line($socket, read_line($tmpfd));
	    $total_count++;
	    aidbg "mass-check: $total_messages $total_count\n";
	  }
	  else {
	    # no more messages, so stop listening on this child
	    aidbg "mass-check: $needs_restart $total_messages $total_count\n";
	    $select->remove($socket);
	  }
        }
        else {
          $needs_restart = 1;
          warn "mass-check: bad line from readline: $line\n";
          $select->remove($socket);
        }
      }

      aidbg "mass-check: out of loop, $total_messages $total_count $needs_restart ".$select->count()."\n";

      # If there are still messages to process, and we need to restart
      # the children, and all of the children are idle, let's go ahead.
      if ($needs_restart && $select->count == 0 && $total_messages > $total_count) {
        $needs_restart = 0;

        aidbg "mass-check: needs restart, $total_messages total, $total_count done\n";
        reap_children($opt_j, \@child, \@pid);
        @child=();
        @pid=();
        start_children($opt_j, \@child, \@pid, $select);
      }
    }
    $msgs_processed += $total_count;

    # reap children
    reap_children($opt_j, \@child, \@pid);
  }
}

# send an HTTP response to a socket based on the input result, headers, and
# data values.
sub http_response {
  my($socket, $result, $headers, $data) = @_;

  $headers->{'Content-Length'} = length $data;
  $headers->{'Cache-Usage'} = $opt_cs_cache ? 'allowed' : 'disallowed';

  print $socket
    "HTTP/1.0 $result\r\n",
    "Pragma: no-cache\r\n",
    "Server: mass-check/$svn_revision\r\n",
    (map { "$_: ".$headers->{$_}."\r\n" } keys %{$headers}), "\r\n";
  print $socket $data;

  if ($opt_cs_verbose) {
    $sent_bytes += length(
      "HTTP/1.0 $result\r\n".
      "Pragma: no-cache\r\n".
      "Server: mass-check/$svn_revision\r\n");
    # length of a map output isn't what the other end claims to see
    while (my($f,$v) = each %{$headers}) {
      $sent_bytes += length($f) + length($v) + 4;
    }
    $sent_bytes += 2 + length($data);
  }
}

# the client needs to make a request to the server on a given socket.
sub http_make_request {
  my($socket, $type, $uri, $headers, $data) = @_;

  $headers->{'Content-Length'} = length $data;

  print $socket
    "$type $uri HTTP/1.0\r\n",
    "User-Agent: mass-check/$svn_revision\r\n",
    (map { "$_: ".$headers->{$_}."\r\n" } keys %{$headers}), "\r\n";
  print $socket $data;

  if ($opt_cs_verbose) {
    $sent_bytes += length(
      "$type $uri HTTP/1.0\r\n".
      "User-Agent: mass-check/$svn_revision\r\n");
    # length of a map output isn't what the other end claims to see
    while (my($f,$v) = each %{$headers}) {
      $sent_bytes += length($f) + length($v) + 4;
    }
    $sent_bytes += 2 + length($data);
  }

  # parse the response that the server sends us
  my $line = $socket->getline() || '';
  $rcvd_bytes += length($line);
  my(undef, $code, $string) = split(/\s+/, $line, 3);
  return unless $code == 200;

  my %headers = ();
  do {
    $line = $socket->getline();
    last unless defined $line;
    $rcvd_bytes += length($line);
    $line =~ s/\r\n$//;

    if ($line) {
      my ($k,$v) = split(/:\s*/, $line, 2);
      $headers{lc $k} = $v;
    }
  } while ($line !~ /^$/);
  $rcvd_bytes += $headers{'content-length'} if $headers{'content-length'};
  $client_id ||= $headers{'client-id'} if $headers{'client-id'};

  # disable cache-usage if the server disallows cache usage
  $opt_cs_cache = 0 if $headers{'cache-usage'} eq 'disallowed';

  # the server has sent us notification that it's going to exit, so let's
  # follow suit.
  return 'finished' if ($headers{'finished'});

  my $gzpath = '';
  if ($headers{'content-type'} eq 'application/x-gzip') {
    my $gzfd;
    ($gzpath, $gzfd) = Mail::SpamAssassin::Util::secure_tmpfile();
    die "Can't make tempfile, exiting" unless $gzpath;

    my $rd;
    $socket->read($rd, $headers{'content-length'}) || die "mass-check: error reading in data from server\n";
    print $gzfd $rd;
    close $gzfd;
  }

  $socket->close();
  return $gzpath;
}

# Be conservative -- encode most things.
# we could encode spaces to plusses, then decode that later, but...
sub post_encode {
  my $string = shift;
  $string =~  s/([^a-zA-Z0-9_,.\/\\-])/sprintf "%%%02x",unpack("C",$1)/egx;
  return $string;
}

# remove all of the files in a given directory, non-recursive
sub clean_dir {
  my $dir = shift;

  unless (opendir(DIR, $dir)) {
    warn "error: can't opendir $dir: $!\n";
    return;
  }
  while(my $file = readdir(DIR)) {
    $file =~ /^(.+)$/;       # untaint
    $file = $1;

    my $path = File::Spec->catfile($dir, $file);
    next unless (-f $path);

    if (!unlink $path) {
      warn "error: can't remove file $path: $!\n";
      closedir(DIR);
      return;
    }
  }
  closedir(DIR);
  return 1;
}

############################################################################

# four bytes in network/vax format (little endian) as length of message
# the rest is the actual message

sub read_line {
  my $fd = shift;
  my($length,$msg);

  # read in the 4 byte length and unpack
  $fd->read($length, 4) || return;

  $length = unpack("V", $length);
  return unless $length;

  # read in the rest of the single message
  $fd->read($msg, $length) || return;

  return $msg;
}

sub send_line {
  my $fd = shift;
  foreach ( @_ ) {
    my $length = pack("V", length $_);
    $fd->print($length.$_) || return 0;
  }

  return 1;
}

############################################################################

# this is the function that implemented server mode.  basically, sit and wait
# for connections to come in.  when a client sends in a request, deal with any
# results that the client sent, then generate a response and send it back,
# and then go back to waiting.  lather, rinse, repeat.
sub server_mode {
  $opt_cs_max ||= 1000;
  $opt_cs_timeout ||= 60 * 5;
  $opt_cs_max_tries ||= 3;

  # IO::Socket::SSL isn't that smart
  $opt_server =~ s/:(\d+)$//;
  my $port = $1;

  # ::SSL needs resolved hostnames, at least on Solaris. note: not IPv6-aware
  my $localaddr = (gethostbyname($opt_server))[4] || $opt_server;
  $localaddr = Socket::inet_ntoa($localaddr);

  my $serv_socket;
  if ($opt_cs_ssl) {
    $serv_socket = IO::Socket::SSL->new(
            LocalAddr => $localaddr,
            LocalPort => $port,
            ReuseAddr => 1,
            Listen => 5,
            SSL_verify_mode => 0x02,
            SSL_use_cert => 1,
            SSL_version => "TLSv1",
            SSL_key_file => "spamassassin/server-key.pem",
            SSL_cert_file => "spamassassin/server-cert.pem",
          );
  }
  else {
    $serv_socket = IO::Socket::INET->new(
            LocalAddr => $localaddr,
            LocalPort => $port,
            ReuseAddr => 1,
            Listen => 5,
          );
  }

  die "Could not create socket: $!\n" unless $serv_socket;

  if ($opt_progress) {
    status('server ready for connections');
  }

  # Setup out "what messages have been sent out" hashes
  my $timestamps = {};
  my $msgsout = { 'curnum' => 0 };

  # Generate an IO::Select object and put the server socket on the queue
  my $select = IO::Select->new( $serv_socket );

  # We'll keep looping while there's something to pay attention to
  while ($select->count()) {
    # Sit and block until there's something for us to read from
    foreach my $socket ($select->can_read()) {
      if ($socket == $serv_socket) {
        # it's the server socket, go ahead and accept the connection and add
	# it to the queue.
        $select->add($serv_socket->accept);
      }
      else {
	# it's some client, so deal with the request
	my($type, $URI, $headers, $postdata) = handle_http_request($socket);

	# we don't do GET, so just send something back
	if ($type eq 'GET') {
	  if ($opt_noisy) {
	    status('GET request from '.$socket->peerhost);
	  }

	  http_response($socket, "200 OK", {
	      'Content-type' => 'text/plain',
	    },
	    "Your GET request came from IP Address: ".$socket->peerhost."\n");
	}
        elsif ($type eq 'POST') {
	  # ooh, POST.  deal with any results that the client sent
	  handle_post_results($postdata, $timestamps, $msgsout);

	  if ($opt_noisy) {
	    status('POST request from '.$socket->peerhost);
	  }

          # based on the number of messages that the client requested,
	  # generate a gzip file with the appropriate data in it
	  my $messages = '';
	  if ($postdata->{'max_messages'}) {
	    my $msgnum = $postdata->{'max_messages'};
	    if ($msgnum > $opt_cs_max || $msgnum < 1) {
	      $msgnum = $opt_cs_max;
	    }

	    if ($opt_noisy) {
	      status('client requested '.$postdata->{'max_messages'}.' messages');
	    }

	    $messages = generate_messages($msgnum, $timestamps, $msgsout, $postdata->{'paths_only'}, $headers->{'client-id'});
	  }

          # $messages will contain the path to the gzip file if there are
	  # messages to send out.
          if ($messages && open(MSG, $messages)) {
	    $t_first_msg = time unless $t_first_msg;
	    binmode(MSG);
	    local $/ = undef;  # go go slurp mode

	    # send the response
	    http_response($socket, "200 OK", {
	      'Content-Type' => 'application/x-gzip',
	      'Content-Encoding' => 'x-gzip',
              'Client-ID'    => $headers->{'client-id'},
	      },
	      scalar <MSG>);

	    close(MSG);

	    # we don't need the file anymore, so get rid of it
	    unlink $messages;
          }
	  elsif (!keys %{$msgsout} && !defined $tmpfd) {
	    # we have no more outstanding messages and our original queue of
	    # messages to process is empty, so tell the client to exit.
	    http_response($socket, "200 OK", {
              "Content-type" => "text/plain",
              'Client-ID'    => $headers->{'client-id'},
	      "Finished" => 1,
	      },
	      'We are all done');
            $t_last_msg = time;
	  }
	  else {
	    # when in doubt, treat this like a GET
	    http_response($socket, "200 OK", {
              "Content-type" => "text/plain",
              'Client-ID'    => $headers->{'client-id'},
	      },
              "Your POST request (sans max_messages) came from IP Address: ".$socket->peerhost."\n");
	  }
	}
	else {
          # for error, "501 Not Implemented"
	  http_response($socket, '501 Not Implemented', {}, '');
	}
      
	# ok, we don't do keepalive, so get rid of the socket
        $select->remove($socket);
	$socket->close;
      }
    }

    if ($opt_noisy) {
      status((exists $msgsout->{'curnum'} ? (scalar(keys %{$msgsout})-1) :
              scalar(keys %{$msgsout})).' messages outstanding');
    }

#print "msgs waiting: ".join(" ", keys %{$msgsout})."\n";
#print "tmpfd defined? ".(defined $tmpfd ? "yes" : "no")."\n";

    # we're not awaiting responses and we've exhausted the input file, so
    # drop the server socket. :)
    $select->remove($serv_socket) if (!keys %{$msgsout} && !defined $tmpfd);
  }

  # remove the server and client message cache temp files
  foreach my $tied_hash (keys %server_caches) {
    undef $tied_hash;
  }
  foreach my $tied_hash (keys %client_caches) {
    undef $tied_hash;
  }
  foreach my $tmp_file (@cache_tmp_files) {
    unlink $tmp_file;
  }
}

# this is the function that implements client mode.  generally, in a loop:
#  make a request of the server for some max number of messages, and send our
#  results back at the same time.  based on the results of that request, put
#  messages into a temp dir and process them.  prep the results and loop.
#  lather, rinse, repeat.
sub client_mode {
  $opt_cs_max ||= 1000;
  $opt_cs_timeout ||= 60 * 2;
  $opt_cs_conn_retries ||= 60;            # 1 hour

  my($host, $port, $uri);

  if ($opt_client =~ /^http:\/\/([^\/]+)(\/.*)?/) {
    ($host, $uri) = ($1,$2);
  } else {
    $host = $opt_client;
    if ($host =~ /^:/) {
      $host = 'localhost'.$host;
    }
  }
  ($host, $port) = split(/:/, $host);

  die "No host found in opt_client" unless $host;
  $uri ||= "/";

  # ::SSL needs resolved hostnames, at least on Solaris. note: not IPv6-aware
  $host = Socket::inet_ntoa(Socket::inet_aton($host));

  # use this to track how many messages we ought to be requesting
  # start at 100 to get warmed up
  my $msgnum = $opt_cs_max > 100 ? 100 : $opt_cs_max;

  my $tmpdir;

  # if we're not doing paths_only, create a temp dir where we'll put the
  # incoming messages to process.
  if (!$opt_cs_paths_only) {
    $tmpdir = Mail::SpamAssassin::Util::secure_tmpdir();
    die "Can't create tempdir" unless $tmpdir;
  }

  my $made_conn_once = 0;

  # keep going until something stops us.
  while (1) {
    # send cache data if this is the first connect
    # do this before creating the connection so we don't waste all the other
    # clients' time, once we've connected, by monopolizing the single server
    my ($gzpath, $gzfd, $action);
    if (!$made_conn_once && $opt_cs_schedule_cache) {
      # we need to find out what messages we have in our client cache so that we
      # can tell the server what we've got so that it can optimize our cache hit
      # rate by requesting us to scan messages that are already in our cache
      $action = 'sending cache';
      $gzpath = scan_client_cache();
    }
    else {
      $action = 'sending results';
      # if the number of messages to request is too much, bring it down
      $msgnum = $opt_cs_max if ($msgnum > $opt_cs_max);

      # prep the POST request
      $postdata{'max_messages'} = $msgnum;
      $postdata{'paths_only'} = 1 if ($opt_cs_paths_only);

      # compress the results
      ($gzpath, $gzfd) = Mail::SpamAssassin::Util::secure_tmpfile();
      die "Can't make tempfile, exiting" unless $gzpath;
      $gzfd->close;
      $gzfd = IO::Zlib->new($gzpath, 'wb') || die "Can't create temp gzip file: $!";

      # the actual POST data string
      send_line($gzfd, join('&', map { post_encode($_) . '=' . post_encode($postdata{$_}) } keys %postdata)) || 
        die "mass-check: error when writing to gz temp file\n";
      $gzfd->close;
      %postdata = ();
    }

    # connect to server
    my $socket;
    if ($opt_cs_ssl) {
      $socket = IO::Socket::SSL->new(
		PeerAddr => $host,
		PeerPort => $port,
		SSL_version => "TLSv1",
		SSL_use_cert => 1,
		SSL_key_file => "spamassassin/client-key.pem",
		SSL_cert_file => "spamassassin/client-cert.pem",
	      );
    } else {
      $socket = IO::Socket::INET->new(
		PeerAddr => $host,
		PeerPort => $port
              );
    }

    if (!$socket) {
      unlink $gzpath;
      undef $gzfd;
      # if we haven't yet made a connection, keep retrying;
      # this is probably the server still in "scan stage"
      if (!$made_conn_once) {
        if ($opt_cs_conn_retries-- > 0) {
          status('failed to connect, sleeping for retry') if ($opt_noisy);
          sleep 60;
          next;
        } else {
          status('failed to connect, giving up') if ($opt_noisy);
          last;
        }
      }

      # last if connection fails after scanning something
      last;
    }

    $made_conn_once = 1;
    status("requesting $msgnum messages from server") if ($opt_noisy);

    # make request, include and then drop results if there are any
    my $POSTDATA = '';
    if ($gzpath) {
      if (open(RESULTS, $gzpath)) {
        binmode(RESULTS);
        {
          # slurp here, rather than into an anonymous variable in the
          # http_make_request call so that we don't end up slurping the
          # response from the server too
          local $/ = undef; # slurp
          $POSTDATA = scalar <RESULTS>;
        }
        close(RESULTS);
        unlink $gzpath;
        undef $gzfd;
      } else {
        die "Can't open tempfile, exiting" unless $gzpath;
      }
    }

    my $result = http_make_request($socket, 'POST', $uri, {
      'Host'		=> $host,
      'Content-Type'	=> 'application/x-www-form-urlencoded',
      'Content-Encoding' => 'x-gzip',
      'Action'		=> $action,
      'Client-ID'	=> ($client_id || '')
      },
      $POSTDATA
    );
    undef $POSTDATA;

    $t_last_msg = time;

    # If we received messages to run through, go ahead and do it.
    # otherwise, just sleep for the timeout length and try again
    if (!defined $result) {
      # we got an error?!?  abort!
      last;
    }
    elsif ($result eq 'finished') {
      # the server said that we're done
      status('server has no more work, exiting.') if ($opt_noisy);
      last;
    }
    elsif ($result eq '') {
      # no messages means the server may give us more work down the road.
      # sleep for client_timeout seconds and try the request again
      status("Received no messages, waiting $opt_cs_timeout seconds") if ($opt_noisy);
      sleep $opt_cs_timeout;
    }
    else {
      # we got messages, so deal with them.
      my $time_start = time;
      $t_first_msg = $time_start unless $t_first_msg;

      # postdata will hold our results, real will hold the original message
      # data from the server's scan mode.
      %postdata = ();
      %real = ();
      $init_results = $total_count = $spam_count = $ham_count = 0;

      # we got a result, so do things with it!
      my $gzfd = IO::Zlib->new($result, "rb");
      die "Can't open temp result file: $!" unless $gzfd;

      # used for the temp queue file
      my $tmppath;
      ($tmppath, $tmpfd) = Mail::SpamAssassin::Util::secure_tmpfile();
      die "Can't make tempfile, exiting" unless $tmppath;
      unlink $tmppath;

      # if we have a temp directory, clean it out for this run
      clean_dir($tmpdir) if ($tmpdir);

      # Archive format, gzip compressed file w/ 4 parts per message:
      # 1- server message number in text format
      # 2- server index string, binary packed format
      # 3- a 1 if the message is included, 0 otherwise -- unless paths_only
      # 4- message content -- unless paths_only or #3 is 0

      # number of messages
      $msgnum = $total_messages = read_line($gzfd) || die "mass-check: error reading from gzip message file\n";

      status("server says it gave us $total_messages messages") if ($opt_progress);
      my $actual_messages = 0;

      # loop through and prep all of the messages the server sent
      for(my $i = 0 ; $i < $total_messages; $i++ ) {
        my $num = read_line($gzfd);
	last unless defined $num;

        next unless $num; # a message number of 0 indicates a msg-error on the server

        my $index = read_line($gzfd);
	last unless defined $index;

	my @d = Mail::SpamAssassin::ArchiveIterator::_index_unpack($index);

	# if we're doing paths_only, there'll be no message content
	if (!$opt_cs_paths_only) {
          my $msg_included = read_line($gzfd);
          last unless defined $msg_included;

	  my $msg;
          if ($msg_included == 1) {
            $msg = read_line($gzfd);
            last unless defined $msg;
          }

          # permanently cache the message on the client
          my $cache_success = 0;
          if ($opt_cs_cache) {
            unless (-f "$opt_cs_cachedir/$d[3]") {
              umask 0077; # prevent others on the system from reading corpus messages
                          # TODO: the entire mass-check script should probably use umask 0077;

              mkdir $opt_cs_cachedir; # mkdir won't bitch if it already exists, that's ok
              if (-d $opt_cs_cachedir && $d[3] =~ m{^/?(.*(?![^\\]\\)/)?(.+)$}) {
                my ($path, $filename) = ($1, $2);
                my $dir = '';
                $path ||= '';
                while ($path =~ m#((?![^\\]\\)[^/]+/)#gc) {
                  $dir .= $1;
                  mkdir "$opt_cs_cachedir/$dir";
                }
                if (-d "$opt_cs_cachedir/$dir") {
                  if (open(OUT, ">$opt_cs_cachedir/$dir$filename")) {
                    print OUT $msg;
                    close $msg;

                    $cache_success = 1;
                  }
                  else {
                    warn "Can't create/write message cache file $opt_cs_cachedir/$dir$filename: $!";
                  }
                }
              }
            }
            else {
              $cache_success = 1;
              $cache_hits++;
            }
            if ($cache_success) {
              # this is a little tricky -- we need to process the files in the
              # path and format we've created, but the original data is needed
              # to create a proper result later, so deal with that here.
              $real{"$opt_cs_cachedir/$d[3]"} = \@d;
              push @{$real{"$opt_cs_cachedir/$d[3]"}}, $num;
              send_line($tmpfd,
                Mail::SpamAssassin::ArchiveIterator::_index_pack($d[0], $d[1], 'f', "$opt_cs_cachedir/$d[3]")) ||
                  die "mass-check: error writing out temp file in client mode\n";
              $actual_messages++;
            }
          }

          # if the cache failed try a temp file instead
          if (!$opt_cs_cache || !$cache_success) {
            next unless defined $msg;
 	    # it's going to be a dir of file formatted messages
	    if (open(OUT, ">$tmpdir/$num")) {
	      print OUT $msg;
	      close(OUT);

              # this is a little tricky -- we need to process the files in the
              # path and format we've created, but the original data is needed
              # to create a proper result later, so deal with that here.
              $real{"$tmpdir/$num"} = \@d;
              push @{$real{"$tmpdir/$num"}}, $num;
              send_line($tmpfd,
                Mail::SpamAssassin::ArchiveIterator::_index_pack($d[0], $d[1], 'f', "$tmpdir/$num")) ||
                  die "mass-check: error writing out temp file in client mode\n";
              $actual_messages++;
	    }
	    else {
	      warn "Can't create/write message temp file $tmpdir/$num: $!";
	    }
          }
	}
        # paths_only_mode
	else {
          # permanently cache the message on the client
          my $cache_success = 0;
          if ($opt_cs_cache) {
            unless (-f "$opt_cs_cachedir/$d[3]") {
              umask 0077; # prevent others on the system from reading corpus messages
                          # TODO: the entire mass-check script should probably use umask 0077;

              mkdir $opt_cs_cachedir; # mkdir won't bitch if it already exists, that's ok
              if (-d $opt_cs_cachedir && $d[3] =~ m{^/?(.*(?![^\\]\\)/)?(.+)$}) {
                my ($path, $filename) = ($1, $2);
                my $dir = '';
                $path ||= '';
                while ($path =~ m#((?![^\\]\\)[^/]+/)#gc) {
                  $dir .= $1;
                  mkdir "$opt_cs_cachedir/$dir";
                }
                if (-d "$opt_cs_cachedir/$dir") {
                  # copy the file to the local cache for use next (and this) time
                  if (copy($d[3], "$opt_cs_cachedir/$d[3]")) {
                    $cache_success = 1;
                  }
                }
              }
            }
            else {
              $cache_success = 1;
              $cache_hits++;
            }
            if ($cache_success) {
              # this is a little tricky -- we need to process the files in the
              # path and format we've created, but the original data is needed
              # to create a proper result later, so deal with that here.
              $real{"$opt_cs_cachedir/$d[3]"} = \@d;
              push @{$real{"$opt_cs_cachedir/$d[3]"}}, $num;
              send_line($tmpfd,
                Mail::SpamAssassin::ArchiveIterator::_index_pack($d[0], $d[1], 'f', "$opt_cs_cachedir/$d[3]")) ||
                  die "mass-check: error writing out temp file in client mode\n";
              $actual_messages++;
            }
          }

          # if the messages isn't in our cache use the supplied path instead
          if (!$opt_cs_cache || !$cache_success) {
	    # in paths_only mode, there's no kluging between formats since we're
	    # reading the same corpus, however we do still need to track server
	    # message number to message data so our results will be useable.
            $real{$d[3]} = \@d;
            push @{$real{$d[3]}}, $num;
	    send_line($tmpfd, $index) ||
	        die "mass-check: error writing out temp file in client mode\n";
            $actual_messages++;
          }
	}
      }

      $gzfd->close;
      unlink $result;

      # if the server tries to give us messages, but some are errored,
      # we need to know that so we don't try to process them all.
      if ($total_messages != $actual_messages) {
        status("server actually gave us $actual_messages messages") if ($opt_progress);
        $total_messages = $actual_messages;
      }

      if ($opt_progress) {
        status('starting run stage');
      }

      # we're about to start running, so go back to the start of the file
      seek $tmpfd, 0, 0;

      run_through_messages();

      # we're done with the temp file -- bye bye
      close($tmpfd);

      # figure out new max messages, try keeping ~cs_timeout between runs
      my $time_end = time;

      # if we only requested a small number of messages, it may take <1s to
      # run through them, so fake it and say it took 1s.
      if ($time_end == $time_start) {
        $time_end++;
      }

      if ($opt_progress) {
        status('completed run stage');
      }

      status('completed run in '.($time_end-$time_start).' seconds') if ($opt_noisy);
      $msgnum = int($msgnum * $opt_cs_timeout / ($time_end-$time_start)) || 1;
    }
  }

  # if we were using a temp dir, clean it out and then remove it
  if ($tmpdir) {
    clean_dir($tmpdir);
    rmdir $tmpdir;
  }
}

# scan the client's cache and return a path to a gzip archive of AI output
sub scan_client_cache {
  status('starting cache scan stage') if ($opt_progress);

  # make sure the cachedir exists so that AI doesn't bail out
  mkdir $opt_cs_cachedir unless -e $opt_cs_cachedir;

  # Make a temp file and delete it
  my $tmpf;
  ($tmpf, $tmpfd) = Mail::SpamAssassin::Util::secure_tmpfile();
  die 'mass-check: failed to create temp file' unless $tmpf;
  unlink $tmpf or die "mass-check: unlink '$tmpf': $!";

  my @targets = ("h:dir:$opt_cs_cachedir");
  generate_queue(\@targets, $tmpfd);

  # we now have a temporary file with the messages to process
  seek($tmpfd, 0, 0);
  # the first line is the number of messages
  $total_messages = read_line($tmpfd);
  showdots_finish();
  status("completed cache scan stage, $total_messages messages") if ($opt_progress);

  unless ($tmpfd) {
    status('cache scan failed');
    return;
  }
  status('compressing cache data') if ($opt_progress);

  # create a temp file for compression
  my($gzpath, $gzfd) = Mail::SpamAssassin::Util::secure_tmpfile();
  die "Can't make tempfile, exiting" unless $gzpath;
  $gzfd->close;
  $gzfd = IO::Zlib->new($gzpath, 'wb') || die "Can't create temp gzip file: $!";

  while (my $msg = read_line($tmpfd)) {
    $msg =~ s/\r?\n$//;
    my @d = Mail::SpamAssassin::ArchiveIterator::_index_unpack($msg);
    $d[3] =~ s/^$opt_cs_cachedir//;

    send_line($gzfd, Mail::SpamAssassin::ArchiveIterator::_index_pack($d[0], $d[1], $d[2], $d[3])) ||
      die "mass-check: error writing out temp file in client mode\n";
  }
  close $tmpfd;
  undef $tmpfd;
  $gzfd->close;
  status('cache data compressed to '.(-s $gzpath).' bytes') if ($opt_progress);

  return $gzpath;
}

############################################################################

# in server mode, just return the ref to the message data
sub wanted_server {
  my ($class, $id, $time, $dataref, $format) = @_;
  return $dataref;
}

# very similar to result() except the result has the message number at the
# front, so strip it off and then set the POST data appropriately.
sub result_client {
  my ($class, $result, $time) = @_;

  # don't open results files until we get here to avoid overwriting files
  init_results() if !$init_results;

  if ($class eq "s") {
    $spam_count++;
  }
  elsif ($class eq "h") {
    $ham_count++;
  }

  $total_count++;

  if ($opt_progress) {
    progress($time);
  }

  if ($result =~ s/^(\d+)\s+//m) {
    $postdata{$1} = $result;
  }
  else {
    warn ">> WTH!?  result is not in the correct format: $result\n";
    # 20071114: bit of a hack
    # prevent malformed cs_client message result lines from preventing the
    # cs_server running in cs_schedule cache mode from completing
    # TODO: find out how result lines get malformed (malformed result lines
    # will still hang any cs_server not running with cs_schedule_cache)
    $failed_msgs++;
  }
}

sub aidbg {
  if (would_log("dbg", "mass-check") == 2) {
    dbg (@_);
  }
}

sub deal_with_before_after {
  my($which, $time) = @_;

  if ($time && $time =~ /^-\d+$/) {
    $time = time + $time;
  }
  elsif ($time && $time !~ /^-?\d+$/) {
    if (HAS_TIME_PARSEDATE) {
      $time = Time::ParseDate::parsedate($time, GMT => 1, PREFER_PAST => 1);
    }
    else { 
      die "You need Time::ParseDate if you use either the --before or --after option.";
    }
  }
  
  if ($which eq 'before') {
    $opt_before = $time;
  }
  else {
    $opt_after = $time;
  }

  if ($opt_before && $opt_after && $opt_after >= $opt_before) {
    die "--before ($opt_before) <= --after ($opt_after) -- conflict!";
  }
}

sub generate_queue {
  my ($targets, $tmpfd) = @_;

  # scan the targets and get the number and list of messages
  $iter->_scan_targets($targets,
    sub {
      my($self, $date, $class, $format, $mail) = @_;
      push(@{$self->{$class}}, Mail::SpamAssassin::ArchiveIterator::_index_pack($date, $class, $format, $mail));
    }
  );

  # deal with opt_head and opt_tail
  top_and_tail_messages($iter->{h});
  top_and_tail_messages($iter->{s});

  my $messages;
  if ($opt_n) {
    # OPT_N == 1 means don't bother sorting on message receive date

    # for ease of memory, we'll play with pointers
    $messages = $iter->{s};
    undef $iter->{s};
    if ($iter->{h}) {
      push(@{$messages}, @{$iter->{h}});
      undef $iter->{h};
    }
  }
  else {
    # OPT_N == 0 means sort on message receive date

    # Sort the spam and ham groups by date
    my @s = @{$iter->{s}};
    undef $iter->{s};
    my @h = @{$iter->{h}};
    undef $iter->{h};

    # interleave ordered spam and ham
    if (@s && @h) {
      my $ratio = @s / @h;
      while (@s && @h) {
	push @{$messages}, (@s / @h > $ratio) ? (shift @s) : (shift @h);
      }
    }
    # push the rest onto the end
    push @{$messages}, @s, @h;
  }

  # head or tail < 0 means crop the total list, negate the value appropriately
  if ($opt_tail < 0) {
    splice(@{$messages}, 0, $opt_tail);
  }
  if ($opt_head < 0) {
    splice(@{$messages}, -$opt_head);
  }

  my $num = $Mail::SpamAssassin::ArchiveIterator::MESSAGES = scalar(@{$messages});

  # Dump out the number of messages and the message index info to
  # the temp file
  send_line($tmpfd, $num, @{$messages});
}

sub top_and_tail_messages {
  my ($ary) = @_;

  if ($opt_n) {
    # OPT_N == 1 means don't bother sorting on message receive date

    # head or tail > 0 means crop each list
    if ($opt_tail > 0) {
      splice(@{$ary}, 0, -$opt_tail);
    }
    if ($opt_head > 0) {
      splice(@{$ary}, min ($opt_head, scalar @{$ary}));
    }
  }
  else {
    # OPT_N == 0 means sort on message receive date

    # Sort the spam and ham groups by date
    my @s = sort { $a cmp $b } @{$ary};

    # head or tail > 0 means crop each list
    if ($opt_tail > 0) {
      splice(@s, 0, -$opt_tail);
    }
    if ($opt_head > 0) {
      splice(@s, min ($opt_head, scalar @s));
    }

    @{$ary} = @s;
  }
}

sub min {
  return ($_[0] < $_[1] ? $_[0] : $_[1]);
}

############################################################################

sub run_post_scan {
  return unless $opt_run_post_scan;
  system($opt_run_post_scan);
  if ($? >> 8 != 0) {
    warn "$opt_run_post_scan failed";
  }
}

