| #!/usr/bin/perl -w |
| use strict; |
| use MIME::Base64; |
| |
| # |
| # <@LICENSE> |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to you under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at: |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # </@LICENSE> |
| |
| sub aidbg; |
| |
# Print the command-line help text and terminate the program.
#
#   $status - used verbatim as the process exit code.  A true status is
#             treated as an error and the text is written to STDERR; a
#             false status (e.g. from --help) writes it to STDOUT.
#
# This sub never returns: it always ends with exit($status).
sub usage {
  my ($status) = @_;

  # errors go to STDERR, requested help goes to STDOUT
  my $out;
  if ($status) {
    $out = \*STDERR;
  }
  else {
    $out = \*STDOUT;
  }

  my $help_text = <<EOF;
usage: mass-check [options] target ...

-c=file set configuration/rules directory
-p=dir set user-prefs directory
-f=file read list of targets from <file>
-j=jobs specify the number of processes to run simultaneously
--net turn on network checks
--mid report Message-ID from each message
--debug[=LIST] report debugging information (default is all facilities, LIST
is a comma-separated list of facilities)
--rewrite=OUT save rewritten message to OUT (default is /tmp/out)
--rules=RE Only test rules matching the given regexp RE
--restart=N restart all of the children after processing N messages
--deencap=RE Extract SpamAssassin-encapsulated spam mails only if they
were encapsulated by servers matching the regexp RE
(default = extract all SpamAssassin-encapsulated mails)
--lint check rules for syntax before running
--cf='config line' Additional line of configuration
--pre='config line' Additional line of ".pre" (prepended to configuration)
--run_post_scan='command' Run the named command after the 'scan' stage,
before starting the 'run' stage

verbosity options
--progress show progress updates during check
--noisy show noisier progress updates during check
--showdots print a dot for each scanned message

client/server mode options
--server=host:port
use server mode, running on the given hostname and port
--client=host:port
use client mode, connecting to the given hostname and port
--cs_conn_retries=N
only used in client mode. set the number of times to retry
the initial connection to the server, while waiting 60
seconds between connection attempts, default is 60 retries
--cs_max=N
at most, only ever request (client)/give out (server) a
maximum of N messages (defaults to 1000)
--cs_timeout=N
in client mode, try to connect to the server every N seconds
defaults to 120
in server mode, timeout messages after N seconds
defaults to 300
--cs_paths_only
only used in client mode. when making requests of the
server, only ask for paths to the messages and not the
messages themselves. useful when the client and server
have the same paths to the corpus data.
--cs_ssl use SSL to encrypt on-the-wire client-server traffic
(requires IO::Socket::SSL, see
http://wiki.apache.org/spamassassin/SslMassCheck for more)
--cs_verbose Log network bandwidth utilization figures and other statistics
--cs_schedule_cache
Distribute messages so that they are checked on clients who
have the messages in their local message cache, implies
--cs_cache; if --cs_schedule_cache is not enabled, but
--cs_cache is, clients running in --cs_paths_only mode will
opportunistically use the messages from their local cache
--cs_cache in client mode, enable the local message cache
in server mode, allows the clients to use cached messages
and/or add to their local caches
--cs_cachedir=dir
write cache info for --cs_cache in this directory tree
--cs_max_tries=N
maximum number of attempts to have a client scan a message
defaults to 3

log options
-o write all logs to stdout
--loghits log the text hit for patterns (useful for debugging)
--loguris log the URIs found
--logmem log the memory delta (only on Linux)
--hamlog=log use <log> as ham log ('ham.log' is default)
--spamlog=log use <log> as spam log ('spam.log' is default)

message selection options
-n no date sorting or spam/ham interleaving
--cache use cache information when selecting messages
--cachedir=dir write cache info for --cache in this directory tree
--all don't skip big messages

message selection options, can be specified for each target
--after=N only test mails received after time_t N (negative values
are an offset from current time, e.g. -86400 = last day)
or after date as parsed by Time::ParseDate (e.g. '-6 months')
--before=N same as --after, except received times are before time_t N
--scanprob=N probability of scanning a message, range 0.0 - 1.0 (default: 1.0)

message selection options, can be specified for each target class
--head=N only check first N ham and N spam (N messages if -n used)
--tail=N only check last N ham and N spam (N messages if -n used)

simple target options (implies -o and no ham/spam classification)
--dir subsequent targets are directories
--file subsequent targets are files in RFC 822 format
--mbox subsequent targets are mbox files
--mbx subsequent targets are mbx files

Just left over functions we should remove at some point:
--bayes report score from Bayesian classifier

options used during score generation process
--learn=N learn N% of messages as spam or ham
--reuse reuse network checks if X-Spam-Status: is present in messages
(note: both clients and servers in c/s mode need this)

non-option arguments are used as target names (mail files and folders),
the target format is: <class>:<format>:<location>
<class> is "spam" or "ham"
<format> is "dir", "file", "mbx", "mbox", or "detect"
<location> is a file or directory name. globbing of ~ and * is supported

(see http://wiki.apache.org/spamassassin/MassCheck for more details.)

EOF

  print {$out} $help_text;
  exit($status);
}
| |
| ########################################################################### |
| |
# Package globals shared by the whole script.  The $opt_* scalars are the
# variables Getopt::Long fills in for the matching command-line switch.

# single-letter switches
our ($opt_c, $opt_p, $opt_f, $opt_j, $opt_n, $opt_o);

# general switches
our ($opt_all, $opt_bayes, $opt_debug, $opt_format, $opt_mid, $opt_net,
     $opt_nosort, $opt_rules, $opt_restart, $opt_rewrite, $opt_deencap,
     $opt_learn, $opt_reuse, $opt_lint, $opt_cf, $opt_pre,
     $opt_run_post_scan);

# logging / verbosity switches
our ($opt_hamlog, $opt_spamlog, $opt_loghits, $opt_loguris, $opt_logmem,
     $opt_progress, $opt_showdots, $opt_noisy);

# message selection switches
our ($opt_head, $opt_tail, $opt_after, $opt_before, $opt_cache,
     $opt_cachedir, $opt_scanprob);

# client/server switches
our ($opt_client, $opt_server, $opt_cs_conn_retries, $opt_cs_max,
     $opt_cs_timeout, $opt_cs_paths_only, $opt_cs_ssl, $opt_cs_verbose,
     $opt_cs_schedule_cache, $opt_cs_cache, $opt_cs_cachedir,
     $opt_cs_max_tries);

# run-time state shared between the scan/run stages and the c/s code
our ($total_messages, $statusevery, $svn_revision, $tmpfd);
our (%postdata, %real, %client_caches, %server_caches, %min_other_caches,
     %unique_cache_completed);
our (@cache_tmp_files);
| |
| use FindBin; |
| |
| # use "blib" so that we can use e.g. @@LOCAL_STATE_DIR@@; "lib" doesn't |
| # have that stuff substituted :( use lib too, though, as a backup, |
| # since some users might be running mass-check without "make" first |
| use lib "$FindBin::Bin/../lib"; |
| use lib "$FindBin::Bin/../blib/lib"; |
| |
| use IO::Select; |
| use IO::Socket; |
| use Socket qw(); |
| use Mail::SpamAssassin::ArchiveIterator; |
| use Mail::SpamAssassin; |
| use Mail::SpamAssassin::Logger; |
| use File::Copy; |
| use File::Spec; |
| use Getopt::Long; |
| use POSIX qw(strftime); |
| use Fcntl qw(O_RDWR O_CREAT);; |
| use Config; |
| |
| use constant HAS_TIME_PARSEDATE => eval { require Time::ParseDate; }; |
| use constant HAS_IO_ZLIB => eval { require IO::Zlib; }; |
| use constant HAS_IO_SOCKET_SSL => eval { require IO::Socket::SSL; }; |
| use constant HAS_TIME_HI_RES => eval { require Time::HiRes; }; |
| use constant HAS_SDBM_FILE => eval { require SDBM_File; }; |
| |
| |
# Option defaults, command-line parsing and up-front validation.

# default settings
$opt_c = "$FindBin::Bin/../rules";
$opt_p = "$FindBin::Bin/spamassassin";
$opt_j = 1;
$opt_head = 0;
$opt_tail = 0;
$opt_net = 0;
$opt_hamlog = "ham.log";
$opt_spamlog = "spam.log";
$opt_learn = 0;
$opt_cf = [];
$opt_pre = [];

# run statistics; printed at exit when --cs_verbose is given
my $rcvd_bytes = 0;
my $sent_bytes = 0;
my $t_first_msg = 0;
my $t_last_msg = 0;
my $msgs_processed = 0;
my $failed_msgs = 0;
my $cache_hits = 0;
my $client_id = 0;

# keep the untouched command line for the log header; GetOptions consumes @ARGV
my @ORIG_ARGV = @ARGV;
GetOptions("c=s", "p=s", "f=s", "j=i", "n", "o", "all", "bayes", "debug:s",
           "hamlog=s", "head=i", "loghits", "mh", "mid", "ms", "net",
           "progress!", "rewrite:s", "showdots", "spamlog=s", "tail=i",
           "rules=s", "restart=i", "loguris", "run_post_scan=s",
           "deencap=s", "logmem", "learn=i", "reuse", "lint", "cache",
           "cachedir=s", "noisy", "scanprob=f",
           "server=s", "cs_max=i", "cs_timeout=i", "cs_conn_retries=i",
           "cs_paths_only", "client=s", "cs_ssl", "cs_verbose",
           "cs_schedule_cache", "cs_cache", "cs_cachedir=s", "cs_max_tries=i",
           "before=s" => \&deal_with_before_after,
           "after=s" => \&deal_with_before_after,
           "cf=s" => \@{$opt_cf},
           "pre=s" => \@{$opt_pre},
           "dir" => sub { $opt_format = "dir"; },
           "file" => sub { $opt_format = "file"; },
           "mbox" => sub { $opt_format = "mbox"; },
           "mbx" => sub { $opt_format = "mbx"; },
           "help" => sub { usage(0); },
           '<>' => \&target) or usage(1);

# We need IO::Zlib for client-server mode!
if ( ($opt_client || $opt_server) && ! HAS_IO_ZLIB ) {
  die "IO::Zlib required for client/server mode!\n";
}

if ($opt_cs_ssl && ! HAS_IO_SOCKET_SSL ) {
  die "IO::Socket::SSL required for --cs_ssl!\n";
}

if ($opt_noisy) {
  $opt_progress = 1; # implies --progress
}

# a bare --debug (empty value) means "all facilities"
$opt_debug ||= 'all' if defined $opt_debug;

if ($opt_cs_schedule_cache) {
  $opt_cs_cache = 1; # implies --cs_cache
}

if ($opt_client && $opt_cs_cache && !$opt_cs_cachedir) {
  warn "You must specify a local message cache directory with --cs_cachedir\n".
       "when using the --cs_cache option.\n";
  # this is a fatal misconfiguration, so do not report success to the caller
  exit 1;
}

if ($opt_server && $opt_cs_schedule_cache && !$opt_n) {
  warn '*'x74 ."\n";
  warn '*'. ' 'x72 ."*\n";
  warn "* Corpus will be run un-sorted but with date stamp loggging which is *\n";
  warn "* needed for score generation log selection but is not available when *\n";
  warn "* using the -n option. This may affect the results of any bayes and *\n";
  warn "* AWL tests run during this mass-check. *\n";
  warn '*'. ' 'x72 ."*\n";
  warn '*'x74 ."\n";
}

if ($opt_cs_schedule_cache && !HAS_SDBM_FILE) {
  warn "--cs_schedule_cache requires the Perl module SDBM_File.\n";
  # fatal as well: exit non-zero
  exit 1;
}

# --lint
if ($opt_lint) {
  # In theory we could probably use the same spamtest object as below,
  # but since it's probably not expecting that, and we don't want
  # strange things happening, create a local object.
  my $spamlint = create_spamtest();
  $spamlint->debug_diagnostics();
  my $res = $spamlint->lint_rules();
  $spamlint->finish();
  if ($res) {
    warn "lint: $res issues detected, ".
         "please rerun with debug enabled for more information\n";
    exit 1;
  }
}

# test messages for the mass-check
#
# NOTE: keep this a bare declaration with no initializer.  target() closes
# over this variable and has already pushed the command-line targets into it
# while GetOptions() ran above; assigning "= ()" here would throw them away.
my @targets;
if (!$opt_client) {
  if ($opt_f) {
    # "-f -" keeps meaning "read the list from stdin", as it did with the
    # previous two-argument open
    my $target_fh;
    if ($opt_f eq '-') {
      $target_fh = \*STDIN;
    }
    else {
      open($target_fh, '<', $opt_f) or die "cannot read target $opt_f: $!";
    }
    while (defined(my $line = <$target_fh>)) {
      chomp $line;
      push(@targets, $line);
    }
    close($target_fh);
  }
  usage(1) if !@targets;
}

if ($opt_reuse) {
  # if we have --reuse, don't bother testing DNS; we shouldn't be hitting
  # the wire at all, and in fact we may be running without a net connection
  push @{$opt_cf}, "dns_available yes\n";

  # need to load M:SA:Plugin:Reuse
  push @{$opt_pre}, "loadplugin Mail::SpamAssassin::Plugin::Reuse\n";

}

my $user_prefs = "$opt_p/user_prefs";
| |
# Build a Mail::SpamAssassin object configured from the command-line options.
#
# Reads the $opt_* globals and $user_prefs; takes no arguments.  Returns
# whatever Mail::SpamAssassin->new() returns.  Each call creates a new
# object (used once for --lint and once for the real run).
sub create_spamtest {
  # extra ".pre" and config lines given with --pre / --cf
  my $pre_text  = join("\n", @{$opt_pre})."\n";
  my $post_text = join("\n", @{$opt_cf})."\n";

  my %sa_args = (
    'debug'                        => $opt_debug,
    'rules_filename'               => $opt_c,
    'site_rules_filename'          => "$opt_p/local.cf",
    'userprefs_filename'           => $user_prefs,
    'userstate_dir'                => $opt_p,
    'save_pattern_hits'            => $opt_loghits,
    'dont_copy_prefs'              => 1,
    # network tests only run when --net was given
    'local_tests_only'             => ($opt_net ? 0 : 1),
    'only_these_rules'             => $opt_rules,
    'ignore_safety_expire_timeout' => 1,
    'pre_config_text'              => $pre_text,
    'post_config_text'             => $post_text,
    PREFIX                         => '',
    DEF_RULES_DIR                  => $opt_c,
    LOCAL_RULES_DIR                => '',
  );

  return Mail::SpamAssassin->new(\%sa_args);
}
| |
# Create the scanner used for the real run, then gather the facts that go
# into the header written at the top of every result log.

my $spamtest = create_spamtest();
$spamtest->compile_now(0); # 0 since we will be reading more configs
$spamtest->read_scoreonly_config("$FindBin::Bin/mass-check.cf");
$spamtest->read_scoreonly_config($user_prefs);
$spamtest->call_plugins("prefork_init"); # since SA 3.4.0

# who/where/when, taken from the usual shell tools
my $who = `id -un 2>/dev/null`;
my $where = `uname -n 2>/dev/null`;
my $when = `date -u`;
my $host = $ENV{'HOSTNAME'} || $ENV{'HOST'} || $where || `hostname` || 'localhost';
chomp($who, $where, $when, $host);
$svn_revision = get_current_svn_revision();

# when displaying the commandline, quote any arguments which have
# "questionable" characters such as spaces, pipes, etc.
my $cmdline = join(' ',
                   map { m@[^A-Za-z0-9_/\\.-]@ ? qq/"$_"/ : $_ } @ORIG_ARGV);
$cmdline =~ s/\s+/ /gs;
my $isowhen = strftime("%Y%m%dT%H%M%SZ", gmtime(time)); # better

my $log_header = join('',
    "# mass-check results from $who\@$where, on $when\n",
    "# M:SA version ".$spamtest->Version()."\n",
    "# SVN revision: $svn_revision\n",
    "# Date: $isowhen\n",
    "# Perl version: $] on $Config{archname}\n",
    "# Switches: '$cmdline'\n");

# progress bookkeeping
my $updates = ($opt_noisy ? 100 : 10);
my $total_count = 0;
my $spam_count = 0;
my $ham_count = 0;
my $init_results = 0;

# dots are shown for --showdots (every message) or --noisy (every 20th)
my $showdots_active = ($opt_showdots || $opt_noisy);
my $showdots_counter = 0;
my $showdots_every = ($opt_showdots ? 1 : 20);

# options handed to Mail::SpamAssassin::ArchiveIterator->new()
my $AIopts = {
  'opt_all' => $opt_all,
  'opt_skip_empty_messages' => 1,
};

if ($opt_client) {
  # ArchiveIterator options for client mode -- tends to be simple
  $opt_n = 1;
  $AIopts->{'opt_want_date'} = 0;
}
else {
  # Deal with --rewrite
  if (defined $opt_rewrite) {
    my $rewrite = $opt_rewrite || "/tmp/out";
    open(REWRITE, "> $rewrite") || die "open of $rewrite failed: $!";
  }

  # ArchiveIterator options for non-client mode
  @{$AIopts}{qw(opt_scanprob opt_cache opt_cachedir opt_after opt_before)} =
      ($opt_scanprob, $opt_cache, $opt_cachedir, $opt_after, $opt_before);
  $AIopts->{'scan_progress_sub'} = \&showdots_blip;
  $AIopts->{'opt_want_date'} = ! $opt_n;

  # ensure that scanprob stuff is predictable and reproducible
  srand(1) if (defined $opt_scanprob && $opt_scanprob < 1.0);
}
| |
| ########################################################################### |
| ## SCAN MODE |
| |
# Build the archive iterator and register the per-message callbacks.
my $iter = Mail::SpamAssassin::ArchiveIterator->new($AIopts);

# setup the AI functions: (scan callback, result callback) depend on the mode
$iter->set_functions(
    $opt_client ? (\&wanted,        \&result_client)
  : $opt_server ? (\&wanted_server, \&result)
  :               (\&wanted,        \&result)
);

# After setting all options, run _set_default_message_selection_opts() to
# get sane defaults.
$iter->_set_default_message_selection_opts();
| |
my $messages;

# Scan stage: build the list of messages to process.
#
# Normal mode as well as a server run the scan and receive the list in an
# already-unlinked temp file, read back through $tmpfd.  Clients skip this
# stage entirely.
if (!$opt_client) {
  status('starting scan stage') if ($opt_progress);

  # Make a temp file and delete it; only the open handle is kept
  my $tmpf;
  ($tmpf, $tmpfd) = Mail::SpamAssassin::Util::secure_tmpfile();
  die 'mass-check: failed to create temp file' unless $tmpf;
  unlink $tmpf or die "mass-check: unlink '$tmpf': $!";

  # having opt_j or server mode means do scan in a separate process
  if ($opt_server || $opt_j) {
    my $scan_pid = fork();
    die "mass-check: cannot fork: $!" unless defined $scan_pid;

    if ($scan_pid) {
      # parent
      waitpid($scan_pid, 0);
    }
    else {
      # child -- process using message_array
      generate_queue(\@targets, $tmpfd);
      exit;
    }
  }
  else {
    # we get here if opt_j == 0, so scan in this process
    generate_queue(\@targets, $tmpfd);
  }

  # we now have a temporary file with the messages to process
  seek($tmpfd, 0, 0);
  # the first line is the number of messages
  $total_messages = read_line($tmpfd);

  if (!$total_messages) {
    die "mass-check: no messages to process\n";
  }

  if ($opt_cs_schedule_cache) {
    # create tied hash databases for the server to use for matching against
    # client hashes

    # while we're at it, count the total number of messages for ourself
    # I suspect that AI gets it wrong sometimes, somehow
    $total_messages = 0;

    # One SDBM database per purpose:
    #   to_process  - messages the server still has to hand out
    #   not_cached  - messages that are not cached anywhere
    #   cache_count - cache hit counts per message
    for my $cache_name (qw(to_process not_cached cache_count)) {
      # secure_tmpfile() is only used to obtain a unique path; SDBM creates
      # its own <path>.pag / <path>.dir files, remembered for later cleanup
      my ($dbpath, $dbfd) = Mail::SpamAssassin::Util::secure_tmpfile();
      die "mass-check: failed to create temp file for $cache_name cache"
        unless $dbpath;
      close $dbfd;
      unlink $dbpath;
      push @cache_tmp_files, $dbpath.'.pag';
      push @cache_tmp_files, $dbpath.'.dir';

      # "or", not "||": "||" would bind to the 0600 mode argument and the
      # die could never fire
      tie(%{$server_caches{$cache_name}}, "SDBM_File", $dbpath,
          O_RDWR|O_CREAT, 0600)
        or die "Cannot tie hash to file $dbpath: $!";
    }

    # dump the server's list of messages to process to the DB_HASHes
    while (my $msg = read_line($tmpfd)) {
      my @d = Mail::SpamAssassin::ArchiveIterator::_index_unpack($msg);
      $server_caches{'to_process'}{$d[3]} = $msg;
      $server_caches{'not_cached'}{$d[3]} = undef;
      $total_messages++;
    }
    # and back to the first message (although we currently never use it again)
    seek($tmpfd, 0, 0);
    read_line($tmpfd);
  }

  showdots_finish();
  status("completed scan stage, $total_messages messages") if ($opt_progress);
}
| |
| ########################################################################### |
| ## RUN MODE |
| |
# Run stage: hand the messages to the scanner (locally, as a server, or as a
# client), then clean up, print optional statistics and exit.

run_post_scan();

if ($opt_client) {
  client_mode();
}
elsif ($opt_server) {
  status('starting run stage') if ($opt_progress);
  server_mode();
  status('completed run stage') if ($opt_progress);
}
else {
  status('starting run stage') if ($opt_progress);

  # stand-alone run: time the whole pass for the --cs_verbose statistics
  $t_first_msg = time;
  run_through_messages();
  $t_last_msg = time;

  status('completed run stage') if ($opt_progress);
}

# Even though we're about to exit, let's clean up after ourselves
close($tmpfd) if ($tmpfd);
showdots_finish();

close(REWRITE) if (defined $opt_rewrite);

$spamtest->finish();

if ($opt_cs_verbose) {
  warn "network I/O: sent=$sent_bytes, received=$rcvd_bytes\n"
    if ($opt_client || $opt_server);

  # avoid a division by zero when everything happened within one second
  $t_last_msg++ if $t_last_msg == $t_first_msg;
  my $elapsed = $t_last_msg - $t_first_msg;
  my $rate = sprintf("%.1f", ($msgs_processed / $elapsed * 3.6));
  warn "processed $msgs_processed messages (failed to scan $failed_msgs) in ".
       $elapsed.' seconds ('.$rate." kmsgs/hr)\n";

  # likewise keep the hit-rate division below safe
  $msgs_processed ||= 1;
  if ($opt_cs_cache) {
    my $hit_pct = sprintf("%.1f", ($cache_hits / $msgs_processed * 100));
    warn "cache hits: $cache_hits (".$hit_pct."%)\n";
  }
}

# exit status: did we check at least one message correctly?
exit(($ham_count || $spam_count) ? 0 : 1);
| |
| ########################################################################### |
| |
# Getopt::Long '<>' callback: record one non-option argument as a target.
#
#   $target - the argument as given on the command line
#
# Each target is stored in @targets as a hash carrying the message-selection
# options (--scanprob/--after/--before) in force when it was seen, so these
# can be specified separately for each target.  If one of the simple format
# switches (--dir/--file/--mbox/--mbx) was seen earlier, the target is
# rewritten to "spam:<format>:<target>" and -o is switched on.
sub target {
  my ($target) = @_;

  my %entry = (
    opt_scanprob => $opt_scanprob,
    opt_after    => $opt_after,
    opt_before   => $opt_before,
  );

  if (defined($opt_format)) {
    $opt_o = 1;
    $entry{target} = "spam:$opt_format:$target";
  }
  else {
    $entry{target} = $target;
  }

  push(@targets, \%entry);
}
| |
| ########################################################################### |
| |
# One-time setup done just before the first result is written.
#
# Marks results as initialised, ends the scan-stage dot line, works out how
# often progress lines are printed, and (except in client mode) opens the
# output: STDOUT with -o, otherwise the ham and spam log files.  Every
# output gets the log header and is set to autoflush.
sub init_results {
  $init_results = 1;

  showdots_finish();

  # now, showdots only happens if --showdots was used
  $showdots_active = $opt_showdots;

  if ($opt_progress) {
    # round up since 100% will be caught at end already; never let the
    # interval be zero
    $statusevery = int($total_messages / $updates + 1) || 1;
  }

  return if $opt_client;

  unless ($opt_o) {
    open(HAM, "> $opt_hamlog") || die "open of $opt_hamlog failed: $!";
    open(SPAM, "> $opt_spamlog") || die "open of $opt_spamlog failed: $!";
    HAM->autoflush(1);
    SPAM->autoflush(1);
    print HAM $log_header;
    print SPAM $log_header;
    return;
  }

  STDOUT->autoflush(1);
  print STDOUT $log_header;
}
| |
# ArchiveIterator result callback for normal and server mode.
#
#   $class  - "s" for spam, "h" for ham; any other value is counted in the
#             total but not written anywhere
#   $result - the log text to write
#   $time   - passed on to progress() when --progress is active
sub result {
  my ($class, $result, $time) = @_;

  # don't open results files until we get here to avoid overwriting files
  init_results() unless $init_results;

  my $is_spam = ($class eq "s");
  my $is_ham  = (!$is_spam && $class eq "h");

  if ($is_spam) {
    if ($opt_o) {
      print STDOUT $result;
    }
    else {
      print SPAM $result;
    }
    $spam_count++;
  }
  elsif ($is_ham) {
    if ($opt_o) {
      print STDOUT $result;
    }
    else {
      print HAM $result;
    }
    $ham_count++;
  }

  $total_count++;

  progress($time) if ($opt_progress);
}
| |
# ArchiveIterator "wanted" callback: scan one message and build its log text.
#
#   $class   - "s" (spam) or "h" (ham)
#   $id      - message id as known to the iterator; in client mode it is a
#              key into %real, which supplies the real id and format
#   $time    - logged as "time=" when defined
#   $dataref - message data, handed to $spamtest->parse()
#   $format  - logged as "format="
#
# Returns the log text for the message (one result line, optionally followed
# by "# ..." comment lines), or nothing when a plugin asked for the message
# to be skipped.
#
# With --loguris the message is not scored: no status object exists, the
# "tests" column holds the URIs found and the extras column is empty.
sub wanted {
  my ($class, $id, $time, $dataref, $format) = @_;
  my $out = '';

  # if origid is defined, it'll be the message number from server mode
  my $origid;

  # client mode is a little crazy because we need to kluge around the fact
  # that the information needed to do the run is different than the
  # information that goes into the results.
  if ($opt_client) {
    # change the format and id to the real version, make sure to remember
    # the server's message number
    # note: pop/push makes it work regardless of any changes to M::SA::AI
    $origid = pop @{$real{$id}};
    $format = $real{$id}->[2];
    $id = $real{$id}->[3];
  }

  memory_track_start() if ($opt_logmem);
  $spamtest->timer_reset;

  # parse the message, and force it to complete
  my $ma = $spamtest->parse($dataref, 1);

  # get X-Spam-Status: header for rule hit reuse
  my $x_spam_status;
  my $reusing;
  if ($opt_reuse) {
    $x_spam_status = $ma->get_header("X-Spam-Status");
    # join a folded test list back into one token
    $x_spam_status and $x_spam_status =~ s/,\s+/,/gs;
  }

  # remove SpamAssassin markup, if present and the mail was spam
  my $header = $ma->get_header("Received");
  if ($header && $header =~ /\bwith SpamAssassin\b/) {
    if (!$opt_deencap || message_should_be_deencapped($ma)) {
      my $new_ma = $spamtest->parse($spamtest->remove_spamassassin_markup($ma), 1);
      $ma->finish();
      $ma = $new_ma;
    }
  }

  # The capturing match is deliberately the last condition so that $1 is
  # read straight after the match that set it.
  if ($opt_reuse
      && $x_spam_status
      && $x_spam_status !~ /\bshortcircuit=(?:ham|spam|default)\b/
      && $x_spam_status =~ m/tests=(\S*)/)
  {
    my @previous = split(/,/, $1);
    # Bug 7709
    # Amavis X-Spam-Status rules include score and are enclosed in []
    # Amavis: [RULENAME=0.01,RULENAME_2=0.01]
    # Spamassassin: RULENAME,RULENAME_2
    foreach my $rule (@previous) {
      $rule =~ s/[\[\]]//;
      $rule =~ s/=.*//;
    }
    $ma->{metadata}->{reuse_tests_hit} = { map {$_ => 1} @previous };
    $reusing = 1;
  }

  # plugin hook to cause us to skip messages
  my $skip = $spamtest->call_plugins("mass_check_skip_message", {
    class => $class,
    'time' => $time,
    'id' => $id,
    msg => $ma
  });
  if ($skip) {
    $ma->finish();
    return;
  }

  # log-uris support: either collect URIs or do a real scan, never both.
  # $status stays undefined in the --loguris case.
  my $status;
  my @uris;
  my $before;
  my $after;
  if ($opt_loguris) {
    my $pms = Mail::SpamAssassin::PerMsgStatus->new($spamtest, $ma);
    @uris = $pms->get_uri_list();
    $pms->finish();
  }
  else {
    $before = time;
    $status = $spamtest->check($ma);
    $after = time;
  }

  my @extra;

  # sample-based learning
  if ($opt_learn > 0) {
    my $spam;
    # spam learned as ham = 0.05%
    if ($class eq 's' && rand(100) < 0.05) {
      $spam = 0;
    }
    # ham learned as spam = 0.01%
    elsif ($class eq 'h' && rand(100) < 0.01) {
      $spam = 1;
    }
    # spam/ham learned correctly
    elsif (rand(100) < $opt_learn) {
      if ($class eq 's') {
        $spam = 1;
      }
      elsif ($class eq 'h') {
        $spam = 0;
      }
      else {
        die "unknown class, learning failed";
      }
    }
    if (defined $spam) {
      my $result = ($spam ? "spam" : "ham");
      my $learn_status = $spamtest->learn($ma, undef, $spam, 0);
      my $learned = $learn_status->did_learn();
      $result = "undef" if !defined $learned;
      push(@extra, "learn=".$result);
    }
  }

  if (defined($time)) {
    push(@extra, "time=".$time);
  }
  if ($status && defined $status->{bayes_score}) {
    push(@extra, "bayes=".sprintf("%06f", $status->{bayes_score}));
  }
  if ($opt_mid) {
    my $mid = $ma->get_header("Message-Id");
    if ($mid) { # message contains a Message-Id:
      while($mid =~ s/\([^\(\)]*\)//s) {}; # remove comments and
      $mid =~ s/^\s+|\s+$//sg; # leading and trailing spaces
      $mid =~ s/\s.*$//s; # keep only the first token
    }
    else { # it doesn't have a Message-Id:
      $mid = $id; # so build one from the id
      $mid =~ s,^.*/,,; # remove the path
      $mid = "<$mid\@$host.masses.spamassassin.org>"; # and put it together
    }
    $mid =~ tr/-A-Za-z0-9_!#%&=~<@>/./c; # replace dangerous chars with . (so regexp search just works)
    push(@extra, "mid=$mid");
  }

  # no scan was timed in --loguris mode; report 0 there instead of doing
  # arithmetic on undefined values
  my $scantime = (defined $before && defined $after) ? ($after - $before) : 0;
  push(@extra, "scantime=" . $scantime);
  push(@extra, "format=$format");

  if ($opt_logmem) {
    my $mem = memory_track_finish();
    if ($mem) {
      push(@extra, $mem);
    }
  }

  push(@extra, "reuse=" . ($reusing ? "yes" : "no"));

  # log the scoreset we're in
  {
    my $set = 0;
    if ($opt_net) { $set |= 1; }
    if ($status && defined $status->{bayes_score}) { $set |= 2; }
    push(@extra, "set=".$set);
  }

  if ($opt_client) {
    push(@extra, "host=$where");
  }

  my $yorn;
  my $score;
  my $tests;
  my $extra;

  if ($opt_loguris) {
    $yorn = '.';
    $score = 0;
    $tests = join(" ", sort @uris);
    $extra = '';
  }
  else {
    $yorn = $status->is_spam() ? 'Y' : '.';
    # don't bother adjusting scores for reuse
    $score = $status->get_score();
    # list of tests hit
    my @tests;
    push @tests, split(/,/, $status->get_names_of_tests_hit());
    push @tests, split(/,/, $status->get_names_of_subtests_hit());

    $tests = join(",", sort(@tests));
    $extra = join(",", @extra);
  }

  # there is nothing to rewrite when the message was not scanned (--loguris)
  if (defined $opt_rewrite && $status) {
    print REWRITE $status->rewrite_mail();
  }

  $id =~ s/\s/_/g;

  # if we have an origid set, it'll be the server mode's message number, so
  # attach it to our result appropriately.
  if (defined $origid) {
    $out = "$origid ";
  }

  $out .= sprintf("%s %2d %s %s %s\n", $yorn, $score, $id, $tests, $extra);

  if ($tests =~ /MICROSOFT_EXECUTABLE|MIME_SUSPECT_NAME/) {
    $out .= logkilled($ma, $id, "possible virus");
  }

  # Pattern hits only exist on a real status object.  Without the $status
  # guard, --loguris would autovivify $status into a plain hash here and the
  # finish() call below would die on it.
  if ($opt_loghits && $status) {
    my $log = '';
    foreach my $t (sort keys %{$status->{pattern_hits}}) {
      my $hit = $status->{pattern_hits}->{$t} || '';
      $hit =~ s/\r/\\r/gs; # fix unprintables
      $hit =~ s/\n/\\n/gs;
      $log .= "$t=\"$hit\" ";
    }
    if ($log) {
      $out .= "# $log\n";
    }
  }

  if (defined $status) { $status->finish(); }
  $ma->finish();
  undef $ma; # clean 'em up
  undef $status;

  showdots_blip();
  return $out;
}
| |
# Progress ticker: count one processed message and, every $showdots_every
# messages, print a dot to STDERR.  A newline follows every 60th dot.
# Does nothing unless dots are currently active.
sub showdots_blip {
  return if !$showdots_active;

  my $seen = ++$showdots_counter;

  # not yet time for the next dot
  return if $seen % $showdots_every != 0;

  print STDERR '.';
  print STDERR "\n" if $seen % (60 * $showdots_every) == 0;
}
| |
# End the current line of progress dots (if dots are active) and restart
# the dot counter.
sub showdots_finish {
  if ($showdots_active) {
    print STDERR "\n";
  }
  $showdots_counter = 0;
}
| |
# Decide whether a SpamAssassin-encapsulated message was marked up by a host
# matching the --deencap regexp.
#
# We have to go grovelling through the body parts to see if a message is a
# report_safe-marked-up message, because a local scanner will overwrite any
# remote scanner's X-Spam-Checker-Version header.
#
#   $ma - the parsed message object; its body_parts are inspected directly
#
# Returns 1 if the first body part is a text/plain report naming a host that
# matches $opt_deencap (case-insensitively), 0 in every other case.
sub message_should_be_deencapped {
  my ($ma) = @_;

  # not sure why this is undefined, but it is sometimes
  my $parts = $ma->{body_parts};
  return 0 unless (defined $parts && scalar @{$parts} > 0);

  my $firstpart = $parts->[0];

  # a 'report_safe' encapsulation always starts with a text/plain part
  my $ctype = $firstpart->{headers}->{'content-type'};
  return 0 if (!$ctype || $ctype ne 'text/plain');

  # too short to be a report
  return 0 if (scalar @{$firstpart->{raw}} < 3);

  # grab first 2 lines and squeeze the whitespace
  my $text = $firstpart->{raw}->[0] . $firstpart->{raw}->[1];
  $text =~ s/\s+/ /gs;

  my ($hname) =
    $text =~ /^Spam detection software, running on the system \"(\S+)\"/
      or return 0;

  # a different host marked it up? then pass it through!
  return ($hname =~ /$opt_deencap/io) ? 1 : 0;
}
| |
# Build the "# skipped killfiled message" comment line for a message.
#
#   $ma     - message object; only its get_header() method is used
#   $id     - message id to report
#   $reason - short reason text, shown in parentheses
#
# Missing (or otherwise false) headers are shown as the string 'undef'.
# Returns the complete line including its trailing newline.
sub logkilled {
  my ($ma, $id, $reason) = @_;

  my %value_of;
  foreach my $name ("From", "To", "Subject", "Message-Id") {
    my $value = $ma->get_header($name) || 'undef';
    chomp ($value);
    $value_of{$name} = $value;
  }

  my ($from, $to, $subj, $mid) = @value_of{"From", "To", "Subject", "Message-Id"};
  return "# skipped killfiled message ($reason): from=$from to=$to subj=$subj mid=$mid id=$id\n";
}
| |
# Print a progress status line when one is due.
#
#   $time - timestamp of the message just handled (0 is used when it is
#           missing); shown as the "date:" field
#
# A line is due when the last message has been reached or when the running
# total is a multiple of $statusevery.
sub progress {
  my ($time) = @_;
  $time ||= 0;

  my $due = ($total_messages == $total_count
             || $total_count % $statusevery == 0);
  return unless $due;

  my $date = strftime("%Y-%m-%d", localtime($time));
  my $percent = int(($total_count / $total_messages) * 100);
  status(sprintf("%3d%% ham: %-6d spam: %-6d date: %s",
                 $percent, $ham_count, $spam_count, $date));
}
| |
# Write one timestamped status line to STDERR.
#
#   $str - the status text (left-justified, padded to 48 characters)
sub status {
  my ($str) = @_;

  my @now_parts = localtime(time);
  my $line = sprintf("status: %-48s now: %s\n",
                     $str, strftime("%Y-%m-%d %X", @now_parts));
  print STDERR $line;
}
| |
| ########################################################################### |
| |
# Baseline taken by memory_track_start(): the first three fields of
# /proc/<pid>/statm.  All three are undef when no baseline is available.
our ($mem_size, $mem_rss, $mem_shared);

# Read the first three numeric fields of /proc/$$/statm.
# Returns them as a list, or an empty list when not on Linux or when the
# file cannot be opened, read or parsed.
sub _read_statm {
  return unless $^O =~ /linux/i;

  open(my $statm_fh, '<', "/proc/$$/statm") or return;
  my $statm = <$statm_fh>;
  close($statm_fh);

  return unless defined $statm;
  return unless $statm =~ /^(\d+) (\d+) (\d+) /;
  return ($1, $2, $3);
}

# Record the current memory figures as the baseline for the next
# memory_track_finish() call.  If they cannot be read the baseline is
# cleared, so a stale one from an earlier message is never reused.
sub memory_track_start {
  ($mem_size, $mem_rss, $mem_shared) = _read_statm();
}

# Return the change since memory_track_start() as
# "memsz=<d>,memrss=<d>,memshr=<d>" (differences of the raw statm fields),
# or '' when either the baseline or the current figures are unavailable.
sub memory_track_finish {
  my ($size, $rss, $shared) = _read_statm();

  return '' unless defined $size;
  return '' unless defined $mem_size && defined $mem_rss && defined $mem_shared;

  return sprintf ("memsz=%d,memrss=%d,memshr=%d",
                  ($size - $mem_size),
                  ($rss - $mem_rss),
                  ($shared - $mem_shared));
}
| |
| # Work out the SVN revision of the checkout this script is running from. |
| # Sources, in order: |
| #   1. "$dir/svninfo.tmp", or the output of "svn info" on "$dir/..", where |
| #      $dir is $FindBin::Bin (or "." if that is unset); the first |
| #      "Revision: N" line wins |
| #   2. a '$Rev: N' tag on the first line of "$opt_c/70_testing.cf" |
| # Returns the revision, or the string "unknown" if neither source yields one. |
| sub get_current_svn_revision { |
|   my $revision; |
| |
|   # this is usually "${TOPDIR}/masses" |
|   my $dir = $FindBin::Bin || "."; |
| |
|   if (-d "$dir/../.svn" || -f "$dir/svninfo.tmp") { |
|     my $svninfo; |
|     my $opened; |
|     if (-f "$dir/svninfo.tmp") { |
|       # created by build/automc/buildbot_ready for chrooted mass-checks |
|       $opened = open ($svninfo, '<', "$dir/svninfo.tmp"); |
|     } |
|     else { |
|       # note, ".." since we want to pick up changes outside 'masses' |
|       # too!  The path is single-quoted for the shell so that directories |
|       # containing spaces or metacharacters still work. |
|       (my $topdir = "$dir/..") =~ s/'/'\\''/g; |
|       $opened = open ($svninfo, '-|', |
|         "( svn info --non-interactive '$topdir' || svn info '$topdir' ) 2>&1"); |
|     } |
| |
|     # if the open failed we simply fall through to the next source |
|     if ($opened) { |
|       while (my $line = <$svninfo>) { |
|         # Revision: 383822 |
|         next unless $line =~ /^Revision: (\d+)/; |
|         $revision = $1; |
|         last; |
|       } |
|       close $svninfo; |
|     } |
|     return $revision if $revision; |
|   } |
| |
|   # this probably will never work due to Rules Project changes TODO |
|   if (open (my $testing, '<', "$opt_c/70_testing.cf")) { |
|     my $first = <$testing>; |
|     close ($testing); |
| |
|     # only accept the line if it really carries a $Rev: tag; an untagged |
|     # first line must not be handed back as the "revision" |
|     if (defined $first && $first =~ /.*\$Rev:\s*(\S+)/) { |
|       $revision = $1; |
|       return $revision if $revision; |
|     } |
|   } |
| |
|   return $revision || "unknown"; |
| } |
| |
| ############################################################################ |
| |
| ## children processors, start and process, used when opt_j > 1 |
| |
| # Fork $count worker children, each connected to this process by its own |
| # socketpair. |
| #   $count  - number of children to create |
| #   $child  - array ref, filled with the parent-side socket of each child |
| #   $pid    - array ref, filled with the process id of each child |
| #   $socket - object whose add() method receives each parent-side socket |
| #             (run_through_messages passes an IO::Select) |
| # Each child announces itself with "START", then loops: read an index line, |
| # run it through $iter->_run_message(), and answer with |
| # "<base64 result>\0RESULT <index line>".  The line "exit", or a read that |
| # returns nothing, ends the child.  Dies if socketpair or fork fails. |
| sub start_children { |
|   my ($count, $child, $pid, $socket) = @_; |
| |
|   my $io = IO::Socket->new(); |
|   my $parent; |
| |
|   # create children |
|   for (my $i = 0; $i < $count; $i++) { |
|     ($child->[$i],$parent) = $io->socketpair(AF_UNIX,SOCK_STREAM,PF_UNSPEC) |
|       or die "mass-check: socketpair failed: $!"; |
|     if ($pid->[$i] = fork) { |
|       # parent: keep our end, drop the child's end |
|       close $parent; |
| |
|       # disable caching for parent<->child relations |
|       my ($old) = select($child->[$i]); |
|       $|++; |
|       select($old); |
| |
|       $socket->add($child->[$i]); |
|       aidbg "mass-check: starting new child $i (pid ".$pid->[$i].")\n"; |
|       next; |
|     } |
|     elsif (defined $pid->[$i]) { |
|       # child: the message list handle belongs to the parent |
|       close $tmpfd if defined $tmpfd; |
| |
|       close $child->[$i]; |
|       select($parent); |
|       $| = 1; # print to parent by default, turn off buffering |
|       send_line($parent,"START"); |
|       while (my $line = read_line($parent)) { |
|         if ($line eq "exit") { |
|           close $parent; |
|           exit; |
|         } |
| |
|         my($class, $format, $date, $where, $result) = $iter->_run_message($line); |
|         $result ||= ''; |
| |
|         # If determine_receive_date is not set, the original input date |
|         # wasn't calculated, but run_message would have done so, so reset |
|         # the packed version if possible ... use defined for date since |
|         # it could == 0. |
|         if (!$iter->{determine_receive_date} && $class && $format && defined $date && $where) { |
|           $line = Mail::SpamAssassin::ArchiveIterator::_index_pack($date, $class, $format, $where); |
|         } |
| |
|         $result = encode_base64($result); |
|         send_line($parent,"$result\0RESULT $line"); |
|       } |
|       exit; |
|     } |
|     else { |
|       die "mass-check: cannot fork: $!"; |
|     } |
|   } |
| } |
| |
| ## handling killing off the children |
| |
| # Tell every child to exit and wait for each of them in turn. |
| #   $count  - number of children to reap |
| #   $socket - array ref of the parent-side sockets, indexed like $pid |
| #   $pid    - array ref of the children's process ids |
| sub reap_children { |
|   my ($count, $socket, $pid) = @_; |
| |
|   # Writing "exit" to a child that already died raises SIGPIPE.  That is |
|   # harmless here: the waitpid below collects the child either way, so the |
|   # signal is ignored for the duration of this sub. |
|   local $SIG{'PIPE'} = 'IGNORE'; |
| |
|   foreach my $i (0 .. $count - 1) { |
|     aidbg "mass-check: killing child $i (pid ",$pid->[$i],")\n"; |
|     send_line($socket->[$i],"exit");     # ask the child to quit ... |
|     close $socket->[$i]; |
|     waitpid($pid->[$i], 0);              # ... and wait until it has |
|   } |
| } |
| |
| # in server mode, this gets called to read in the HTTP request from a given |
| # socket, then return the information the client sent to us. |
| #   $socket - client connection to read the request from |
| # Returns ($type, $URI, $headers, $postdata): |
| #   $type     - request method, upper-cased ('' if the request line is bad) |
| #   $URI      - request URI ('' if the request line is bad) |
| #   $headers  - hash ref of request headers, keys lower-cased |
| #   $postdata - hash ref of decoded key/value pairs from a |
| #               'sending results' POST; empty otherwise |
| # Side effects: adds to $rcvd_bytes, may assign a new client-id, and for a |
| # 'sending cache' POST fills %client_caches and updates %server_caches. |
| # Dies on temp-file, read or tie failures. |
| sub handle_http_request { |
|   my $socket = shift; |
| |
|   my $headers = {}; |
|   my $postdata = {}; |
| |
|   # read in the request (POST / HTTP/1.0) |
|   my $line = $socket->getline(); |
|   $line ||= ''; |
|   $rcvd_bytes += length($line); |
|   $line =~ s/\r\n$//; |
| |
|   my ($type, $URI, $VERS) = $line =~ /^([a-zA-Z]+)\s+(\S+)(?:\s*(\S+))/; |
|   unless ($type && $URI && $VERS) { |
|     $type ||= ''; |
|     $URI ||= ''; |
| |
|     return ($type, $URI, $headers, $postdata); |
|   } |
| |
|   $type = uc $type; |
| |
|   # read in headers, "key: value" up to a blank line.  This is a real while |
|   # loop on purpose: "last" inside a do {} while block does not leave the |
|   # block, it would leave whatever loop the *caller* happens to be in. |
|   while (defined($line = $socket->getline())) { |
|     $rcvd_bytes += length($line); |
|     $line =~ s/\r\n$//; |
|     last if $line =~ /^$/; |
| |
|     if ($line) { |
|       my ($k,$v) = split(/:\s*/, $line, 2); |
|       $headers->{lc $k} = $v; |
|     } |
|   } |
| |
|   # if this is a POST request w/ content-length, there'll be a payload, deal |
|   # with it. we only support compressed payloads. |
|   if ($type eq 'POST' && $headers->{'content-length'}) { |
|     $rcvd_bytes += $headers->{'content-length'}; |
|     my $pd = ''; |
|     my $encoding = $headers->{'content-encoding'}; |
|     if (defined $encoding && $encoding eq 'x-gzip') { |
|       # assign an id to the client if it doesn't already have one |
|       $headers->{'client-id'} ||= ++$client_id; |
| |
|       my ($gzpath, $gzfd) = Mail::SpamAssassin::Util::secure_tmpfile(); |
|       die "Can't make tempfile, exiting" unless $gzpath; |
| |
|       # TODO: don't read in the entire thing at once to avoid memory bloat |
|       my $rd; |
|       $socket->read($rd, $headers->{'content-length'}) || die "mass-check: error reading in data from client\n"; |
|       print $gzfd $rd; |
|       $gzfd->close; |
| |
|       my $fd = IO::Zlib->new($gzpath, "rb"); |
|       die "Can't open temp result file: $!" unless $fd; |
| |
|       if ($headers->{'action'}) { |
|         # different types of POST contents are packed different ways |
|         if ($headers->{'action'} eq 'sending cache') { |
| |
|           if ($opt_cs_schedule_cache) { |
|             # save the client cache to a tied hash |
| |
|             # get a temp file for the client cache database |
|             my ($dbpath, $dbfd) = Mail::SpamAssassin::Util::secure_tmpfile(); |
|             close $dbfd; |
|             unlink $dbpath; |
|             push @cache_tmp_files, $dbpath.'.pag'; |
|             push @cache_tmp_files, $dbpath.'.dir'; |
| |
|             # "or", not "||": with "||" the die was attached to the constant |
|             # 0600 and a failed tie went unnoticed |
|             tie (%{$client_caches{$headers->{'client-id'}}}, "SDBM_File", $dbpath, |
|                  O_RDWR|O_CREAT, 0600) |
|               or die "Cannot tie hash to file $dbpath: $!\n"; |
| |
|             # dump the client's list of cached message paths to the DB_HASH |
|             my $client_cache = 0; |
|             my $client_cache_hits = 0; |
|             while (my $msg = read_line($fd)) { |
|               my @d = Mail::SpamAssassin::ArchiveIterator::_index_unpack($msg); |
|               $client_cache++; |
|               if (exists $server_caches{'to_process'}{$d[3]}) { |
|                 $client_caches{$headers->{'client-id'}}{$d[3]} = undef; |
|                 $server_caches{'cache_count'}{$d[3]}++; |
|                 delete $server_caches{'not_cached'}{$d[3]}; |
|                 # log this client's cache hit rate |
|                 $client_cache_hits++; |
|               } |
|             } |
|             # avoid dividing by zero when there are no messages at all |
|             my $hit_pct = $total_messages ? ($client_cache_hits / $total_messages * 100) : 0; |
|             status("client $headers->{'client-id'} has $client_cache msgs cached ($client_cache_hits usable)"); |
|             status("client $headers->{'client-id'} has ".(sprintf("%.1f", $hit_pct)).'% of required messages'); |
|           } |
|         } |
|         elsif ($headers->{'action'} eq 'sending results') { |
|           # process the results |
|           $pd = read_line($fd); |
|           $pd = '' unless defined $pd; |
| |
|           # key1=value1&key2=value2... |
|           %{$postdata} = map { |
|             my($k,$v) = split(/=/, $_, 2); |
| |
|             # we need to decode the key and value |
|             $k =~ s/\%([0-9a-fA-F]{2})/sprintf "%c", hex($1)/eg; |
|             $v =~ s/\%([0-9a-fA-F]{2})/sprintf "%c", hex($1)/eg; |
| |
|             $k => $v; |
|           } split(/\&/, $pd); |
| |
|         } |
|       } |
| |
|       # $gzfd was already closed above, once the payload had been written |
|       $fd->close; |
|       undef $fd; |
|       unlink $gzpath; |
|     } |
|   } |
|   return($type, $URI, $headers, $postdata); |
| } |
| |
| # in server mode, generate a gzip compressed data stream with the messages and |
| # return the path to the compressed file which the server will read and pass |
| # to the client. |
| # |
| # Input: |
| # - Number of messages to generate (scalar) |
| # - Hash of Arrays of outstanding requests (reference to hash of array refs) |
| # timestamp# -> [ num1, num2, ... ] |
| # Used to quickly find outstanding/timed out messages to send to client. |
| # - Hash of outstanding messages and associated data (ref to hash of hash refs) |
| # num1 -> { data => 'binary data from scan mode', timestamp => timestamp# } |
| # Used later on to specify the timestamp entry to remove the entry from. |
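| # - Paths only? If true, include only the message number and index string |
| # for each message in the gzip file. Otherwise, also include the message |
| # content. Useful if the client has the corpus available via the same |
| # paths as originally specified. |
| # - Client ID (scalar); selects which client's cache is consulted when |
| # cache-based scheduling is enabled. |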
| # |
| # Returns: scalar path to gzip file |
| # |
| sub generate_messages { |
| # $msgs = number of messages wanted, $timestamps / $msgsout = bookkeeping |
| # hashes shared with the caller, $paths_only = omit message content, |
| # $client_id = key into %client_caches for cache-based scheduling. |
| # Returns the path of the gzip file, or '' when nothing is to be sent. |
| my($msgs, $timestamps, $msgsout, $paths_only, $client_id) = @_; |
| |
| # Hold the message numbers we'll be sending out |
| # (entries are either a plain number, or [ number, in_client_cache ] when |
| # they come from the cache-scheduling branch below) |
| my @tosend = (); |
| my @sent = (); |
| |
| # Find out if any of the messages we sent out before need to be sent out |
| # again because we haven't seen a response within the timeout. |
| my $tooold = time - $opt_cs_timeout; |
| foreach (sort { $a <=> $b } keys %{$timestamps}) { |
| # since we're going in numeric order, if the current entry is newer than |
| # the timeout value, the rest will be too, so stop looking. |
| last if ($_ > $tooold); |
| |
| # messages that might be eligible for retry |
| my @toretry = (); |
| |
| # how many messages do we still need to fulfill the request? |
| my $wanted = $msgs - @tosend; |
| |
| if (@{$timestamps->{$_}} > $wanted) { |
| # there are more entries in the timestamp list than we want, so just |
| # grab that many off the list. |
| push(@toretry, splice @{$timestamps->{$_}}, 0, $wanted); |
| } |
| else { |
| # there are just enough, or not enough entries on the timestamp list to |
| # satisfy our request, so take them all and we'll loop around. |
| push(@toretry, @{$timestamps->{$_}}); |
| delete $timestamps->{$_}; |
| } |
| |
| # limit retries |
| foreach my $num (@toretry) { |
| if ($msgsout->{$num}->{'count'} < $opt_cs_max_tries) { |
| $msgsout->{$num}->{'count'}++; |
| push @tosend, $num; |
| } else { |
| # give up on this message: drop it and count it as failed |
| warn "skipping message num $num after $opt_cs_max_tries attempts, index: ". |
| (Mail::SpamAssassin::ArchiveIterator::_index_unpack($msgsout->{$num}->{'data'}))[3]."\n"; |
| delete $msgsout->{$num}; |
| $failed_msgs++; |
| } |
| } |
| |
| # Ok, we have enough messages so we can stop now. |
| last if (@tosend == $msgs); |
| } |
| |
| if (!$opt_cs_schedule_cache) { |
| # if we still have the temp file with the input messages open, we'll fill up |
| # our message output queue with messages from there. |
| if ($tmpfd) { |
| while (@tosend < $msgs) { |
| my $msg = read_line($tmpfd); |
| |
| # no more messages from the temp file, close it out |
| unless ($msg) { |
| delete $msgsout->{'curnum'}; |
| close $tmpfd; |
| undef $tmpfd; |
| last; |
| } |
| |
| # we got a result, so assign it a number (curnum) and store the data |
| # appropriately, then add the new number to the queue. |
| my $num = ++$msgsout->{'curnum'}; |
| $msgsout->{$num}->{'data'} = $msg; |
| $msgsout->{$num}->{'count'}++; |
| push(@tosend, $num); |
| } |
| } |
| } |
| else { |
| if (($msgs_processed + $failed_msgs) != $total_messages) { |
| # select messages based on what the client*s* have cached |
| |
| # if the client hasn't sent cache data, fake it |
| unless (exists $client_caches{$client_id}) { |
| %{$client_caches{$client_id}} = (); |
| $unique_cache_completed{$client_id} = 1; |
| } |
| |
| # first: select messages that only the current client has cached |
| MESSAGE: while (!$unique_cache_completed{$client_id} && |
| @tosend < $msgs && |
| (my($path, undef) = each %{$client_caches{$client_id}})) { |
| # check that no other clients have it cached |
| next MESSAGE if $server_caches{'cache_count'}{$path} > 1; |
| |
| # we got a result, so assign it a number (curnum) and store the data |
| # appropriately, then add the new number to the queue. |
| my $num = ++$msgsout->{'curnum'}; |
| $msgsout->{$num}->{'data'} = $server_caches{'to_process'}{$path}; |
| $msgsout->{$num}->{'count'}++; |
| push(@tosend, [ $num, 1 ]); |
| delete $server_caches{'to_process'}{$path}; |
| delete $server_caches{'cache_count'}{$path}; |
| delete $client_caches{$client_id}{$path}; |
| $cache_hits++; |
| } |
| # the loop above ended without filling the request, so remember not to |
| # run the "unique to this client" pass again for this client |
| if (@tosend < $msgs) { |
| $unique_cache_completed{$client_id} = 1; |
| } |
| |
| # second: hand out messages that no clients have cached |
| while (@tosend < $msgs && |
| (my($path, undef) = each %{$server_caches{'not_cached'}})) { |
| # we got a result, so assign it a number (curnum) and store the data |
| # appropriately, then add the new number to the queue. |
| my $num = ++$msgsout->{'curnum'}; |
| $msgsout->{$num}->{'data'} = $server_caches{'to_process'}{$path}; |
| $msgsout->{$num}->{'count'}++; |
| push(@tosend, [ $num, 0 ]); |
| delete $server_caches{'to_process'}{$path}; |
| delete $server_caches{'cache_count'}{$path}; |
| delete $server_caches{'not_cached'}{$path}; |
| } |
| |
| # third: hand out messages in the client's cache regardless of how many |
| # other clients have them cached; smart scheduling takes too long |
| while (@tosend < $msgs && |
| (my($path, undef) = each %{$client_caches{$client_id}})) { |
| # we got a result, so assign it a number (curnum) and store the data |
| # appropriately, then add the new number to the queue. |
| my $num = ++$msgsout->{'curnum'}; |
| $msgsout->{$num}->{'data'} = $server_caches{'to_process'}{$path}; |
| $msgsout->{$num}->{'count'}++; |
| push(@tosend, [ $num, 1 ]); |
| delete $server_caches{'to_process'}{$path}; |
| delete $server_caches{'cache_count'}{$path}; |
| $cache_hits++; |
| |
| # testing with 5 clients, this seems to be faster (in msgs/hr) than |
| # deleting from each other clients' cache when we go to get their |
| # messages; of course it's more I/O on the server this way |
| # NOTE(review): the cache_count entry for $path was deleted a few lines |
| # above, so $cache_count is undef here and the "last unless" below |
| # presumably never ends the loop early -- confirm whether the count |
| # should be read before the delete. |
| my $cache_count = $server_caches{'cache_count'}{$path}; |
| foreach my $cc (keys %client_caches) { |
| delete $client_caches{$cc}{$path}; |
| last unless --$cache_count; |
| } |
| } |
| |
| # fourth: hand out messages that other clients have cached without |
| # regard for how many clients have cached them; smart |
| # scheduling takes too long (at least when using DBM) |
| while (@tosend < $msgs && |
| (my($path, $msg) = each %{$server_caches{'to_process'}})) { |
| # we got a result, so assign it a number (curnum) and store the data |
| # appropriately, then add the new number to the queue. |
| my $num = ++$msgsout->{'curnum'}; |
| $msgsout->{$num}->{'data'} = $server_caches{'to_process'}{$path}; |
| $msgsout->{$num}->{'count'}++; |
| push(@tosend, [ $num, 0 ]); |
| delete $server_caches{'to_process'}{$path}; |
| delete $server_caches{'cache_count'}{$path}; |
| |
| # NOTE(review): same pattern as in the third pass -- the count is read |
| # after its entry has been deleted, so it is undef here; confirm intent. |
| my $cache_count = $server_caches{'cache_count'}{$path}; |
| foreach my $cc (keys %client_caches) { |
| delete $client_caches{$cc}{$path}; |
| last unless --$cache_count; |
| } |
| } |
| } |
| else { |
| # close the tmpfd, etc so that the main loop knows we're done |
| delete $msgsout->{'curnum'}; |
| close $tmpfd; |
| undef $tmpfd; |
| } |
| } |
| |
| # ok, at this point, @tosend ought to have a list of numbers, pointers into |
| # %{$msgsout}. turn that into a gzip-compressed archive file. |
| return '' unless @tosend; |
| |
| my($gzpath, $gzfd) = Mail::SpamAssassin::Util::secure_tmpfile(); |
| die "Can't make tempfile, exiting" unless $gzpath; |
| close($gzfd); |
| |
| # reopen the temp file by path as a gzip writer |
| $gzfd = IO::Zlib->new($gzpath, 'wb') || die "Can't create temp gzip file: $!"; |
| |
| # first line is the number of messages included in the file |
| send_line($gzfd, scalar @tosend) || die "mass-check: error when writing to gz temp file\n"; |
| |
| # Generate an archive in the temp file |
| foreach my $num (@tosend) { |
| my $in_cache = 0; |
| # unpack the [ number, in_client_cache ] form used by cache scheduling |
| if (ref($num) eq 'ARRAY') { |
| $in_cache = $num->[1]; |
| $num = $num->[0]; |
| } |
| # Archive format, gzip compressed file w/ 4 parts per message: |
| # 1- server message number in text format |
| # 2- server index string, binary packed format |
| # 3- a 1 if the message is included, 0 otherwise -- unless paths_only |
| # 4- message content -- unless paths_only or #3 is 0 |
| my $data = $msgsout->{$num}->{'data'}; |
| if (!$paths_only) { |
| unless ($in_cache) { |
| # fifth return value of _run_message is used as the message content |
| my $msg = ($iter->_run_message($data))[4]; |
| if ($msg) { |
| send_line($gzfd, $num) || die "mass-check: error when writing to gz temp file\n"; |
| send_line($gzfd, $data) || die "mass-check: error when writing to gz temp file\n"; |
| send_line($gzfd, '1') || die "mass-check: error when writing to gz temp file\n"; |
| send_line($gzfd, (defined $msg ? join('', @{$msg}) : '')) || |
| die "mass-check: error when writing to gz temp file\n"; |
| push @sent, $num; |
| } else { |
| # if the message has an error (probably due to it being removed sometime during the |
| # run) we send a message number of 0 to indicate to the client that the server had an |
| # error retrieving the message; we need to do this since we have already added the |
| # line telling the client how many messages to expect |
| my $filename = (Mail::SpamAssassin::ArchiveIterator::_index_unpack($data))[3]; |
| warn qq/mass-check: ArchiveIterator returned error for "$filename"\n/; |
| send_line($gzfd, 0) || die "mass-check: error when writing to gz temp file\n"; |
| delete $msgsout->{$num}; |
| $failed_msgs++; |
| } |
| } else { |
| # the client has this message cached: send number and index only, |
| # flagged with '0' for "content not included" |
| send_line($gzfd, $num) || die "mass-check: error when writing to gz temp file\n"; |
| send_line($gzfd, $data) || die "mass-check: error when writing to gz temp file\n"; |
| send_line($gzfd, '0') || die "mass-check: error when writing to gz temp file\n"; |
| push @sent, $num; |
| } |
| } else { |
| send_line($gzfd, $num) || die "mass-check: error when writing to gz temp file\n"; |
| send_line($gzfd, $data) || die "mass-check: error when writing to gz temp file\n"; |
| # the client deals with missing messages on its own (sort of) |
| push @sent, $num; |
| } |
| } |
| |
| $gzfd->close; |
| |
| # update timestamp entries |
| my $ts = time; |
| |
| # make sure the timestamp is unique! without Time::HiRes it is trivial and |
| # common to have two reissuings of timed-out messages in the same second; |
| # with Time::HiRes we'll check anyway since we'll waste less time checking |
| # for uniqueness than one of us will waste debugging a report of mass-check |
| # not completing due to someone's wacky clock (some (non-para-)virtualized |
| # servers will have their clocks go forward and backward) |
| while (exists $timestamps->{$ts}) { |
| if (HAS_TIME_HI_RES) { |
| $ts = Time::HiRes::time(); |
| } else { |
| $ts += 0.01; |
| } |
| } |
| |
| foreach (@sent) { |
| $msgsout->{$_}->{'timestamp'} = $ts; |
| } |
| |
| # conveniently, this list should be the only thing sent out w/ this |
| # timestamp, so just set the reference appropriately. :) |
| $timestamps->{$ts} = \@sent; |
| |
| if ($opt_noisy) { |
| status('sent '.scalar(@sent).' of '.scalar(@tosend).' intended messages'); |
| } |
| |
| return $gzpath; |
| } |
| |
| # we've gotten results posted, so clean up msgsout and timestamp hashes and |
| # process result... |
| #   $postdata   - hash ref of what the client posted; entries whose key is |
| #                 all digits are message-number => result pairs |
| #   $timestamps - hash ref, timestamp => array ref of outstanding numbers |
| #   $msgsout    - hash ref, message number => { data, timestamp, ... } |
| # Every result we were still waiting for is passed to $iter->{result_sub}, |
| # removed from $msgsout and from its timestamp list, and counted in |
| # $msgs_processed. |
| sub handle_post_results { |
|   my($postdata, $timestamps, $msgsout) = @_; |
| |
|   # timestamp => { message number => 1 } for everything handled in this |
|   # call, so each timestamp list is trimmed only once |
|   my %answered = (); |
| |
|   while ( my($num, $result) = each %{$postdata} ) { |
|     # message run results will be \d+ => log entry |
|     next unless $num =~ /^\d+$/; |
| |
|     # not (or no longer) outstanding: ignore it.  Multiple clients could |
|     # have been given the same message; the first responder wins. |
|     next unless exists $msgsout->{$num}; |
| |
|     # the result_sub will need parts of the message data, so get it ready |
|     my @d = Mail::SpamAssassin::ArchiveIterator::_index_unpack($msgsout->{$num}->{'data'}); |
| |
|     # go ahead and do the result |
|     $iter->{result_sub}->($d[1], $result, $d[0]); |
| |
|     # prep to get rid of the cached entries |
|     $answered{$msgsout->{$num}->{'timestamp'}}->{$num} = 1; |
|     delete $msgsout->{$num}; |
| |
|     $msgs_processed++; |
|   } |
| |
|   # if we got any results, clean out the results from the timestamp arrays |
|   foreach my $ts (keys %answered) { |
|     my $done = $answered{$ts}; |
| |
|     # numbers sent under this timestamp that are still unanswered |
|     my @pending = grep(!exists $done->{$_}, @{$timestamps->{$ts}}); |
| |
|     # keep the trimmed list, or drop the timestamp entry once it is empty |
|     if (@pending) { |
|       $timestamps->{$ts} = \@pending; |
|     } |
|     else { |
|       delete $timestamps->{$ts}; |
|     } |
|   } |
| } |
| |
| # This function reads from $tmpfd and processes the message as appropriate wrt |
| # $opt_j, $opt_restart, etc. |
| # |
| # Takes no arguments.  Index lines are read from $tmpfd until |
| # $total_messages of them have been handed out; every non-empty result is |
| # passed to $iter->{result_sub}, and the number of messages handed out is |
| # added to $msgs_processed. |
| sub run_through_messages { |
|   # do everything in one process |
|   if ($opt_j <= 1 && !defined $opt_restart) { |
|     my $message; |
|     my $total_count = 0; |
| |
|     while (($total_messages > $total_count) && ($message = read_line($tmpfd))) { |
|       my($class, undef, $date, undef, $result) = $iter->_run_message($message); |
|       if ($result) { |
|         &{$iter->{result_sub}}($class, $result, $date); |
|       } |
|       $total_count++; |
|     } |
|     $msgs_processed += $total_count; |
|   } |
|   # more than one process or one process with restarts |
|   else { |
|     my $select = IO::Select->new(); |
| |
|     my $total_count = 0; |
|     my $needs_restart = 0; |
|     my @child = (); |
|     my @pid = (); |
| |
|     # start children processes |
|     start_children($opt_j, \@child, \@pid, $select); |
| |
|     # feed children, make them work for it, repeat |
|     while ($select->count()) { |
|       foreach my $socket ($select->can_read()) { |
|         my $line = read_line($socket); |
| |
|         # some error happened during the read! |
|         if (!defined $line) { |
|           $needs_restart = 1; |
|           warn "mass-check: readline failed, attempting to recover\n"; |
|           $select->remove($socket); |
|         } |
|         elsif ($line =~ /^([^\0]*)\0RESULT (.+)$/s) { |
|           my $result = decode_base64($1); |
|           my ($date,$class,$type) = Mail::SpamAssassin::ArchiveIterator::_index_unpack($2); |
|           aidbg "mass-check: $class, $type, $date\n"; |
| |
|           if (defined $opt_restart && ($total_count % $opt_restart) == 0) { |
|             $needs_restart = 1; |
|           } |
| |
|           # if messages remain, and we don't need to restart, send message |
|           if (($total_messages > $total_count) && !$needs_restart) { |
|             my $line = read_line($tmpfd); |
|             unless ($line) { |
|               warn "mass-check: found short message list ($total_messages, $total_count)\n"; |
|               $select->remove($socket); |
|               next; |
|             } |
| |
|             send_line($socket, $line); |
|             $total_count++; |
|             aidbg "mass-check: $total_messages $total_count\n"; |
|           } |
|           else { |
|             # stop listening on this child since we're done with it |
|             aidbg "mass-check: $needs_restart $total_messages $total_count\n"; |
|             $select->remove($socket); |
|           } |
| |
|           # deal with the result we received |
|           if ($result) { |
|             &{$iter->{result_sub}}($class, $result, $date); |
|           } |
|         } |
|         elsif ($line eq "START") { |
|           if ($total_messages > $total_count) { |
|             # we still have messages, send one to child.  As in the RESULT |
|             # branch, a list that ends early must not be forwarded: nothing |
|             # would be sent and we would then wait on this child for an |
|             # answer it was never asked for. |
|             my $next = read_line($tmpfd); |
|             unless ($next) { |
|               warn "mass-check: found short message list ($total_messages, $total_count)\n"; |
|               $select->remove($socket); |
|               next; |
|             } |
| |
|             send_line($socket, $next); |
|             $total_count++; |
|             aidbg "mass-check: $total_messages $total_count\n"; |
|           } |
|           else { |
|             # no more messages, so stop listening on this child |
|             aidbg "mass-check: $needs_restart $total_messages $total_count\n"; |
|             $select->remove($socket); |
|           } |
|         } |
|         else { |
|           $needs_restart = 1; |
|           warn "mass-check: bad line from readline: $line\n"; |
|           $select->remove($socket); |
|         } |
|       } |
| |
|       aidbg "mass-check: out of loop, $total_messages $total_count $needs_restart ".$select->count()."\n"; |
| |
|       # If there are still messages to process, and we need to restart |
|       # the children, and all of the children are idle, let's go ahead. |
|       if ($needs_restart && $select->count == 0 && $total_messages > $total_count) { |
|         $needs_restart = 0; |
| |
|         aidbg "mass-check: needs restart, $total_messages total, $total_count done\n"; |
|         reap_children($opt_j, \@child, \@pid); |
|         @child=(); |
|         @pid=(); |
|         start_children($opt_j, \@child, \@pid, $select); |
|       } |
|     } |
|     $msgs_processed += $total_count; |
| |
|     # reap children |
|     reap_children($opt_j, \@child, \@pid); |
|   } |
| } |
| |
| # send an HTTP response to a socket based on the input result, headers, and |
| # data values. |
| #   $socket  - handle to write the response to |
| #   $result  - status text placed after "HTTP/1.0 ", e.g. "200 OK" |
| #   $headers - hash ref of extra headers; Content-Length and Cache-Usage |
| #              are set in it here |
| #   $data    - response body |
| # When $opt_cs_verbose is set, the bytes written are added to $sent_bytes. |
| sub http_response { |
|   my($socket, $result, $headers, $data) = @_; |
| |
|   $headers->{'Content-Length'} = length $data; |
|   $headers->{'Cache-Usage'} = $opt_cs_cache ? 'allowed' : 'disallowed'; |
| |
|   # the fixed part of the response head, kept as a list so it can be both |
|   # printed and measured |
|   my @fixed = ( |
|     "HTTP/1.0 $result\r\n", |
|     "Pragma: no-cache\r\n", |
|     "Server: mass-check/$svn_revision\r\n", |
|   ); |
|   my @fields = map { "$_: ".$headers->{$_}."\r\n" } keys %{$headers}; |
| |
|   print $socket @fixed, @fields, "\r\n"; |
|   print $socket $data; |
| |
|   if ($opt_cs_verbose) { |
|     $sent_bytes += length(join('', @fixed)); |
|     # each header line costs name + value + ": " + "\r\n" |
|     foreach my $name (keys %{$headers}) { |
|       $sent_bytes += length($name) + length($headers->{$name}) + 4; |
|     } |
|     # blank line ending the head, then the body |
|     $sent_bytes += 2 + length($data); |
|   } |
| } |
| |
| # the client needs to make a request to the server on a given socket. |
| #   $socket  - connection to the server |
| #   $type    - request method, e.g. 'POST' |
| #   $uri     - request URI |
| #   $headers - hash ref of request headers; Content-Length is set in it here |
| #   $data    - request body |
| # Returns: |
| #   - nothing if the response status is not 200 |
| #   - 'finished' if the server sent a true "Finished" header |
| #   - otherwise the path of a temp file holding the gzip payload, or '' when |
| #     the response is not of type application/x-gzip |
| # Updates $sent_bytes (only with $opt_cs_verbose), $rcvd_bytes, $client_id |
| # and $opt_cs_cache.  Dies on temp-file or read failures. |
| sub http_make_request { |
|   my($socket, $type, $uri, $headers, $data) = @_; |
| |
|   $headers->{'Content-Length'} = length $data; |
| |
|   print $socket |
|     "$type $uri HTTP/1.0\r\n", |
|     "User-Agent: mass-check/$svn_revision\r\n", |
|     (map { "$_: ".$headers->{$_}."\r\n" } keys %{$headers}), "\r\n"; |
|   print $socket $data; |
| |
|   if ($opt_cs_verbose) { |
|     $sent_bytes += length( |
|       "$type $uri HTTP/1.0\r\n". |
|       "User-Agent: mass-check/$svn_revision\r\n"); |
|     # length of a map output isn't what the other end claims to see |
|     while (my($f,$v) = each %{$headers}) { |
|       $sent_bytes += length($f) + length($v) + 4; |
|     } |
|     $sent_bytes += 2 + length($data); |
|   } |
| |
|   # parse the response that the server sends us |
|   my $line = $socket->getline() || ''; |
|   $rcvd_bytes += length($line); |
|   my(undef, $code) = split(/\s+/, $line, 3); |
|   # an empty or malformed status line leaves $code undefined |
|   return unless defined $code && $code == 200; |
| |
|   # read the response headers up to the blank line.  This is a real while |
|   # loop on purpose: "last" inside a do {} while block does not leave the |
|   # block, it would leave whatever loop the *caller* happens to be in. |
|   my %headers = (); |
|   while (defined($line = $socket->getline())) { |
|     $rcvd_bytes += length($line); |
|     $line =~ s/\r\n$//; |
|     last if $line =~ /^$/; |
| |
|     if ($line) { |
|       my ($k,$v) = split(/:\s*/, $line, 2); |
|       $headers{lc $k} = $v; |
|     } |
|   } |
|   $rcvd_bytes += $headers{'content-length'} if $headers{'content-length'}; |
|   $client_id ||= $headers{'client-id'} if $headers{'client-id'}; |
| |
|   # disable cache-usage if the server disallows cache usage |
|   $opt_cs_cache = 0 |
|     if defined $headers{'cache-usage'} && $headers{'cache-usage'} eq 'disallowed'; |
| |
|   # the server has sent us notification that it's going to exit, so let's |
|   # follow suit. |
|   return 'finished' if ($headers{'finished'}); |
| |
|   my $gzpath = ''; |
|   if (defined $headers{'content-type'} && $headers{'content-type'} eq 'application/x-gzip') { |
|     my $gzfd; |
|     ($gzpath, $gzfd) = Mail::SpamAssassin::Util::secure_tmpfile(); |
|     die "Can't make tempfile, exiting" unless $gzpath; |
| |
|     my $rd; |
|     $socket->read($rd, $headers{'content-length'}) || die "mass-check: error reading in data from server\n"; |
|     print $gzfd $rd; |
|     close $gzfd; |
|   } |
| |
|   $socket->close(); |
|   return $gzpath; |
| } |
| |
| # Be conservative -- encode most things. |
| # we could encode spaces to plusses, then decode that later, but... |
| # Percent-encode a string for use in POST data. |
| #   $raw - string to encode |
| # Letters, digits and the characters _ , . / \ - are kept; every other |
| # character becomes %xx (two lower-case hex digits).  Returns the encoded |
| # copy. |
| sub post_encode { |
|   my ($raw) = @_; |
|   (my $encoded = $raw) =~ |
|     s{([^a-zA-Z0-9_,.\/\\-])}{sprintf "%%%02x",unpack("C",$1)}egx; |
|   return $encoded; |
| } |
| |
| # remove all of the files in a given directory, non-recursive |
| #   $dir - directory to empty |
| # Only plain files directly inside $dir are removed; subdirectories and |
| # other entries are left alone. |
| # Returns 1 on success.  Returns nothing, after a warning, if the directory |
| # cannot be opened or a file cannot be removed (the remaining files are then |
| # left in place). |
| sub clean_dir { |
|   my $dir = shift; |
| |
|   # lexical handle: no global bareword DIR shared with other code |
|   my $dh; |
|   unless (opendir($dh, $dir)) { |
|     warn "error: can't opendir $dir: $!\n"; |
|     return; |
|   } |
|   while (defined(my $file = readdir($dh))) { |
|     # untaint; skip the entry if the match fails rather than reusing a |
|     # stale $1 left over from an earlier iteration |
|     next unless $file =~ /\A(.+)\z/s; |
|     $file = $1; |
| |
|     my $path = File::Spec->catfile($dir, $file); |
|     next unless (-f $path); |
| |
|     if (!unlink $path) { |
|       warn "error: can't remove file $path: $!\n"; |
|       closedir($dh); |
|       return; |
|     } |
|   } |
|   closedir($dh); |
|   return 1; |
| } |
| |
| ############################################################################ |
| |
| # four bytes in VAX (little-endian) byte order -- pack("V"), not network |
| # byte order -- as length of message; the rest is the actual message |
| |
| # Read one length-prefixed message from a handle. |
| #   $fd - handle object providing read() |
| # Returns the message.  Returns nothing at end of input, on a read error, |
| # for a zero-length message, or when the length prefix or the message body |
| # is cut short. |
| sub read_line { |
|   my $fd = shift; |
|   my($length,$msg); |
| |
|   # read in the 4 byte length and unpack |
|   my $got = $fd->read($length, 4); |
|   return unless $got && $got == 4; |
| |
|   $length = unpack("V", $length); |
|   return unless $length; |
| |
|   # read in the rest of the single message; a short read means the frame is |
|   # truncated, so do not hand back a partial message as if it were complete |
|   $got = $fd->read($msg, $length); |
|   return unless $got && $got == $length; |
| |
|   return $msg; |
| } |
| |
| # Write each given string to a handle as a length-prefixed message: a |
| # 4-byte pack("V") length followed by the string itself. |
| #   $fd - handle object providing print() |
| #   ... - strings to send, one message each |
| # Returns 1 when everything was written, 0 as soon as a print fails. |
| sub send_line { |
|   my ($fd, @messages) = @_; |
| |
|   for my $payload (@messages) { |
|     my $prefix = pack("V", length $payload); |
|     return 0 unless $fd->print($prefix.$payload); |
|   } |
| |
|   return 1; |
| } |
| |
| ############################################################################ |
| |
| # this is the function that implements server mode. basically, sit and wait |
| # for connections to come in. when a client sends in a request, deal with any |
| # results that the client sent, then generate a response and send it back, |
| # and then go back to waiting. lather, rinse, repeat. |
| sub server_mode { |
| $opt_cs_max ||= 1000; |
| $opt_cs_timeout ||= 60 * 5; |
| $opt_cs_max_tries ||= 3; |
| |
| # IO::Socket::SSL isn't that smart |
| $opt_server =~ s/:(\d+)$//; |
| my $port = $1; |
| |
| # ::SSL needs resolved hostnames, at least on Solaris. note: not IPv6-aware |
| my $localaddr = (gethostbyname($opt_server))[4] || $opt_server; |
| $localaddr = Socket::inet_ntoa($localaddr); |
| |
| my $serv_socket; |
| if ($opt_cs_ssl) { |
| $serv_socket = IO::Socket::SSL->new( |
| LocalAddr => $localaddr, |
| LocalPort => $port, |
| ReuseAddr => 1, |
| Listen => 5, |
| SSL_verify_mode => 0x02, |
| SSL_use_cert => 1, |
| SSL_version => "TLSv1", |
| SSL_key_file => "spamassassin/server-key.pem", |
| SSL_cert_file => "spamassassin/server-cert.pem", |
| ); |
| } |
| else { |
| $serv_socket = IO::Socket::INET->new( |
| LocalAddr => $localaddr, |
| LocalPort => $port, |
| ReuseAddr => 1, |
| Listen => 5, |
| ); |
| } |
| |
| die "Could not create socket: $!\n" unless $serv_socket; |
| |
| if ($opt_progress) { |
| status('server ready for connections'); |
| } |
| |
| # Setup out "what messages have been sent out" hashes |
| my $timestamps = {}; |
| my $msgsout = { 'curnum' => 0 }; |
| |
| # Generate an IO::Select object and put the server socket on the queue |
| my $select = IO::Select->new( $serv_socket ); |
| |
| # We'll keep looping while there's something to pay attention to |
| while ($select->count()) { |
| # Sit and block until there's something for us to read from |
| foreach my $socket ($select->can_read()) { |
| if ($socket == $serv_socket) { |
| # it's the server socket, go ahead and accept the connection and add |
| # it to the queue. |
| $select->add($serv_socket->accept); |
| } |
| else { |
| # it's some client, so deal with the request |
| my($type, $URI, $headers, $postdata) = handle_http_request($socket); |
| |
| # we don't do GET, so just send something back |
| if ($type eq 'GET') { |
| if ($opt_noisy) { |
| status('GET request from '.$socket->peerhost); |
| } |
| |
| http_response($socket, "200 OK", { |
| 'Content-type' => 'text/plain', |
| }, |
| "Your GET request came from IP Address: ".$socket->peerhost."\n"); |
| } |
| elsif ($type eq 'POST') { |
| # ooh, POST. deal with any results that the client sent |
| handle_post_results($postdata, $timestamps, $msgsout); |
| |
| if ($opt_noisy) { |
| status('POST request from '.$socket->peerhost); |
| } |
| |
| # based on the number of messages that the client requested, |
| # generate a gzip file with the appropriate data in it |
| my $messages = ''; |
| if ($postdata->{'max_messages'}) { |
| my $msgnum = $postdata->{'max_messages'}; |
| if ($msgnum > $opt_cs_max || $msgnum < 1) { |
| $msgnum = $opt_cs_max; |
| } |
| |
| if ($opt_noisy) { |
| status('client requested '.$postdata->{'max_messages'}.' messages'); |
| } |
| |
| $messages = generate_messages($msgnum, $timestamps, $msgsout, $postdata->{'paths_only'}, $headers->{'client-id'}); |
| } |
| |
| # $messages will contain the path to the gzip file if there are |
| # messages to send out. |
| if ($messages && open(MSG, $messages)) { |
| $t_first_msg = time unless $t_first_msg; |
| binmode(MSG); |
| local $/ = undef; # go go slurp mode |
| |
| # send the response |
| http_response($socket, "200 OK", { |
| 'Content-Type' => 'application/x-gzip', |
| 'Content-Encoding' => 'x-gzip', |
| 'Client-ID' => $headers->{'client-id'}, |
| }, |
| scalar <MSG>); |
| |
| close(MSG); |
| |
| # we don't need the file anymore, so get rid of it |
| unlink $messages; |
| } |
| elsif (!keys %{$msgsout} && !defined $tmpfd) { |
| # we have no more outstanding messages and our original queue of |
| # messages to process is empty, so tell the client to exit. |
| http_response($socket, "200 OK", { |
| "Content-type" => "text/plain", |
| 'Client-ID' => $headers->{'client-id'}, |
| "Finished" => 1, |
| }, |
| 'We are all done'); |
| $t_last_msg = time; |
| } |
| else { |
| # when in doubt, treat this like a GET |
| http_response($socket, "200 OK", { |
| "Content-type" => "text/plain", |
| 'Client-ID' => $headers->{'client-id'}, |
| }, |
| "Your POST request (sans max_messages) came from IP Address: ".$socket->peerhost."\n"); |
| } |
| } |
| else { |
| # for error, "501 Not Implemented" |
| http_response($socket, '501 Not Implemented', {}, ''); |
| } |
| |
| # ok, we don't do keepalive, so get rid of the socket |
| $select->remove($socket); |
| $socket->close; |
| } |
| } |
| |
| if ($opt_noisy) { |
| status((exists $msgsout->{'curnum'} ? (scalar(keys %{$msgsout})-1) : |
| scalar(keys %{$msgsout})).' messages outstanding'); |
| } |
| |
| #print "msgs waiting: ".join(" ", keys %{$msgsout})."\n"; |
| #print "tmpfd defined? ".(defined $tmpfd ? "yes" : "no")."\n"; |
| |
| # we're not awaiting responses and we've exhausted the input file, so |
| # drop the server socket. :) |
| $select->remove($serv_socket) if (!keys %{$msgsout} && !defined $tmpfd); |
| } |
| |
| # remove the server and client message cache temp files |
| foreach my $tied_hash (keys %server_caches) { |
| undef $tied_hash; |
| } |
| foreach my $tied_hash (keys %client_caches) { |
| undef $tied_hash; |
| } |
| foreach my $tmp_file (@cache_tmp_files) { |
| unlink $tmp_file; |
| } |
| } |
| |
# Work out where the message with server-side path $relpath lives in the
# client's permanent cache, creating $opt_cs_cachedir and any intermediate
# directories on the way.  Returns the full path of the cache file, or nothing
# if the path is unusable or the directories could not be created.
# NOTE(review): $relpath is supplied by the server and is not checked for
# ".." components -- confirm the server is always trusted.
sub _cs_cache_file_for {
  my ($relpath) = @_;

  umask 0077; # prevent others on the system from reading corpus messages
  # TODO: the entire mass-check script should probably use umask 0077;

  mkdir $opt_cs_cachedir; # failing because it already exists is fine
  return unless -d $opt_cs_cachedir;
  return unless $relpath =~ m{^/?(.*(?![^\\]\\)/)?(.+)$};

  my ($path, $filename) = ($1, $2);
  my $dir = '';
  $path ||= '';

  # create the directory tree one component at a time
  while ($path =~ m#((?![^\\]\\)[^/]+/)#gc) {
    $dir .= $1;
    mkdir "$opt_cs_cachedir/$dir";
  }
  return unless -d "$opt_cs_cachedir/$dir";

  return "$opt_cs_cachedir/$dir$filename";
}

# Write $content to a new file at $path.  Returns true only if the file was
# opened, written and closed without error; otherwise returns false with the
# reason left in $!.
sub _cs_write_message_file {
  my ($path, $content) = @_;

  open(my $out, '>', $path) or return 0;
  return 0 unless print {$out} $content;
  return close($out) ? 1 : 0;
}

# this is the function that implements client mode.  generally, in a loop:
# make a request of the server for some max number of messages, and send our
# results back at the same time.  based on the results of that request, put
# messages into a temp dir (or the permanent client cache) and process them.
# prep the results and loop.  lather, rinse, repeat.
#
# Takes no arguments; it is driven by $opt_client and the $opt_cs_* options.
# Returns when the server says it is finished, when a request fails, or when
# no connection could be made (after $opt_cs_conn_retries attempts if we never
# connected at all).  Dies on local failures such as temp file or gzip errors
# and an unresolvable host name.
sub client_mode {
  $opt_cs_max ||= 1000;
  $opt_cs_timeout ||= 60 * 2;
  $opt_cs_conn_retries ||= 60; # 1 hour

  my($host, $port, $uri);

  # $opt_client is either an http://host[:port][/uri] URL or a plain
  # [host]:port, where a leading ":" means localhost
  if ($opt_client =~ /^http:\/\/([^\/]+)(\/.*)?/) {
    ($host, $uri) = ($1,$2);
  } else {
    $host = $opt_client;
    if ($host =~ /^:/) {
      $host = 'localhost'.$host;
    }
  }
  ($host, $port) = split(/:/, $host);

  die "No host found in opt_client" unless $host;
  $uri ||= "/";

  # ::SSL needs resolved hostnames, at least on Solaris.  note: not IPv6-aware
  my $packed_addr = Socket::inet_aton($host);
  die "Can't resolve host '$host'" unless defined $packed_addr;
  $host = Socket::inet_ntoa($packed_addr);

  # use this to track how many messages we ought to be requesting
  # start at 100 to get warmed up
  my $msgnum = $opt_cs_max > 100 ? 100 : $opt_cs_max;

  my $tmpdir;

  # if we're not doing paths_only, create a temp dir where we'll put the
  # incoming messages to process.
  if (!$opt_cs_paths_only) {
    $tmpdir = Mail::SpamAssassin::Util::secure_tmpdir();
    die "Can't create tempdir" unless $tmpdir;
  }

  my $made_conn_once = 0;

  # keep going until something stops us.
  while (1) {
    # send cache data if this is the first connect
    # do this before creating the connection so we don't waste all the other
    # clients' time, once we've connected, by monopolizing the single server
    my ($gzpath, $gzfd, $action);
    if (!$made_conn_once && $opt_cs_schedule_cache) {
      # we need to find out what messages we have in our client cache so that we
      # can tell the server what we've got so that it can optimize our cache hit
      # rate by requesting us to scan messages that are already in our cache
      $action = 'sending cache';
      $gzpath = scan_client_cache();
    }
    else {
      $action = 'sending results';
      # if the number of messages to request is too much, bring it down
      $msgnum = $opt_cs_max if ($msgnum > $opt_cs_max);

      # prep the POST request
      $postdata{'max_messages'} = $msgnum;
      $postdata{'paths_only'} = 1 if ($opt_cs_paths_only);

      # compress the results: reopen the secure temp file through IO::Zlib
      ($gzpath, $gzfd) = Mail::SpamAssassin::Util::secure_tmpfile();
      die "Can't make tempfile, exiting" unless $gzpath;
      $gzfd->close;
      $gzfd = IO::Zlib->new($gzpath, 'wb') || die "Can't create temp gzip file: $!";

      # the actual POST data string
      send_line($gzfd, join('&', map { post_encode($_) . '=' . post_encode($postdata{$_}) } keys %postdata)) ||
        die "mass-check: error when writing to gz temp file\n";
      $gzfd->close;
      %postdata = ();
    }

    # connect to server
    my $socket;
    if ($opt_cs_ssl) {
      $socket = IO::Socket::SSL->new(
        PeerAddr => $host,
        PeerPort => $port,
        SSL_version => "TLSv1",
        SSL_use_cert => 1,
        SSL_key_file => "spamassassin/client-key.pem",
        SSL_cert_file => "spamassassin/client-cert.pem",
      );
    } else {
      $socket = IO::Socket::INET->new(
        PeerAddr => $host,
        PeerPort => $port
      );
    }

    if (!$socket) {
      # $gzpath is undefined when the cache scan produced nothing
      unlink $gzpath if defined $gzpath;
      undef $gzfd;
      # if we haven't yet made a connection, keep retrying;
      # this is probably the server still in "scan stage"
      if (!$made_conn_once) {
        if ($opt_cs_conn_retries-- > 0) {
          status('failed to connect, sleeping for retry') if ($opt_noisy);
          sleep 60;
          next;
        } else {
          status('failed to connect, giving up') if ($opt_noisy);
          last;
        }
      }

      # last if connection fails after scanning something
      last;
    }

    $made_conn_once = 1;
    status("requesting $msgnum messages from server") if ($opt_noisy);

    # make request, include and then drop results if there are any
    my $POSTDATA = '';
    if ($gzpath) {
      open(my $results, '<', $gzpath) or
        die "Can't open tempfile, exiting: $!";
      binmode($results);
      {
        # slurp here, rather than into an anonymous variable in the
        # http_make_request call so that we don't end up slurping the
        # response from the server too
        local $/ = undef; # slurp
        $POSTDATA = <$results>;
      }
      close($results);
      unlink $gzpath;
      undef $gzfd;
    }

    my $result = http_make_request($socket, 'POST', $uri, {
        'Host' => $host,
        'Content-Type' => 'application/x-www-form-urlencoded',
        'Content-Encoding' => 'x-gzip',
        'Action' => $action,
        'Client-ID' => ($client_id || '')
      },
      $POSTDATA
    );
    undef $POSTDATA;

    $t_last_msg = time;

    # If we received messages to run through, go ahead and do it.
    # otherwise, just sleep for the timeout length and try again
    if (!defined $result) {
      # we got an error?!? abort!
      last;
    }
    elsif ($result eq 'finished') {
      # the server said that we're done
      status('server has no more work, exiting.') if ($opt_noisy);
      last;
    }
    elsif ($result eq '') {
      # no messages means the server may give us more work down the road.
      # sleep for client_timeout seconds and try the request again
      status("Received no messages, waiting $opt_cs_timeout seconds") if ($opt_noisy);
      sleep $opt_cs_timeout;
    }
    else {
      # we got messages, so deal with them.
      my $time_start = time;
      $t_first_msg = $time_start unless $t_first_msg;

      # postdata will hold our results, real will hold the original message
      # data from the server's scan mode.
      %postdata = ();
      %real = ();
      $init_results = $total_count = $spam_count = $ham_count = 0;

      # we got a result (the path of a gzip archive), so do things with it!
      my $resfd = IO::Zlib->new($result, "rb");
      die "Can't open temp result file: $!" unless $resfd;

      # used for the temp queue file
      my $tmppath;
      ($tmppath, $tmpfd) = Mail::SpamAssassin::Util::secure_tmpfile();
      die "Can't make tempfile, exiting" unless $tmppath;
      unlink $tmppath;

      # if we have a temp directory, clean it out for this run
      clean_dir($tmpdir) if ($tmpdir);

      # Archive format, gzip compressed file w/ 4 parts per message:
      # 1- server message number in text format
      # 2- server index string, binary packed format
      # 3- a 1 if the message is included, 0 otherwise -- unless paths_only
      # 4- message content -- unless paths_only or #3 is 0

      # number of messages
      $msgnum = $total_messages = read_line($resfd) || die "mass-check: error reading from gzip message file\n";

      status("server says it gave us $total_messages messages") if ($opt_progress);
      my $actual_messages = 0;

      # this is a little tricky -- we need to process the files in the
      # path and format we've created, but the original data is needed
      # to create a proper result later, so remember it in %real (with the
      # server message number appended) and add the file to the temp queue.
      # If no packed index is passed in, one is built for $local_path.
      my $queue_message = sub {
        my ($local_path, $d, $num, $packed_index) = @_;

        $real{$local_path} = $d;
        push @{$d}, $num;
        $packed_index = Mail::SpamAssassin::ArchiveIterator::_index_pack($d->[0], $d->[1], 'f', $local_path)
          unless defined $packed_index;
        send_line($tmpfd, $packed_index) ||
          die "mass-check: error writing out temp file in client mode\n";
        $actual_messages++;
      };

      # loop through and prep all of the messages the server sent
      for (my $i = 0; $i < $total_messages; $i++) {
        my $num = read_line($resfd);
        last unless defined $num;

        next unless $num; # a message number of 0 indicates a msg-error on the server

        my $index = read_line($resfd);
        last unless defined $index;

        my @d = Mail::SpamAssassin::ArchiveIterator::_index_unpack($index);

        # if we're doing paths_only, there'll be no message content
        if (!$opt_cs_paths_only) {
          my $msg_included = read_line($resfd);
          last unless defined $msg_included;

          my $msg;
          if ($msg_included == 1) {
            $msg = read_line($resfd);
            last unless defined $msg;
          }

          # permanently cache the message on the client
          my $cache_success = 0;
          if ($opt_cs_cache) {
            if (-f "$opt_cs_cachedir/$d[3]") {
              $cache_success = 1;
              $cache_hits++;
            }
            elsif (!defined $msg) {
              # the server left the content out but we don't have it cached;
              # never write an empty file into the permanent cache
              warn "mass-check: server omitted content of $d[3] but it is not in the client cache, skipping\n";
            }
            else {
              my $cache_file = _cs_cache_file_for($d[3]);
              if (defined $cache_file) {
                if (_cs_write_message_file($cache_file, $msg)) {
                  $cache_success = 1;
                }
                else {
                  warn "Can't create/write message cache file $cache_file: $!";
                  # don't leave a truncated file behind to be taken for a hit
                  unlink $cache_file;
                }
              }
            }

            $queue_message->("$opt_cs_cachedir/$d[3]", \@d, $num) if $cache_success;
          }

          # if the cache failed (or is off) try a temp file instead
          if (!$cache_success) {
            next unless defined $msg;
            # it's going to be a dir of file formatted messages
            if (_cs_write_message_file("$tmpdir/$num", $msg)) {
              $queue_message->("$tmpdir/$num", \@d, $num);
            }
            else {
              warn "Can't create/write message temp file $tmpdir/$num: $!";
            }
          }
        }
        # paths_only_mode
        else {
          # permanently cache the message on the client
          my $cache_success = 0;
          if ($opt_cs_cache) {
            if (-f "$opt_cs_cachedir/$d[3]") {
              $cache_success = 1;
              $cache_hits++;
            }
            elsif (defined _cs_cache_file_for($d[3])) {
              # copy the file to the local cache for use next (and this) time
              $cache_success = 1 if copy($d[3], "$opt_cs_cachedir/$d[3]");
            }

            $queue_message->("$opt_cs_cachedir/$d[3]", \@d, $num) if $cache_success;
          }

          # if the message isn't in our cache use the supplied path instead
          if (!$cache_success) {
            # in paths_only mode, there's no kluging between formats since we're
            # reading the same corpus, however we do still need to track server
            # message number to message data so our results will be useable.
            $queue_message->($d[3], \@d, $num, $index);
          }
        }
      }

      $resfd->close;
      unlink $result;

      # if the server tries to give us messages, but some are errored,
      # we need to know that so we don't try to process them all.
      if ($total_messages != $actual_messages) {
        status("server actually gave us $actual_messages messages") if ($opt_progress);
        $total_messages = $actual_messages;
      }

      if ($opt_progress) {
        status('starting run stage');
      }

      # we're about to start running, so go back to the start of the file
      seek $tmpfd, 0, 0;

      run_through_messages();

      # we're done with the temp file -- bye bye
      close($tmpfd);

      # figure out new max messages, try keeping ~cs_timeout between runs
      my $time_end = time;

      # if we only requested a small number of messages, it may take <1s to
      # run through them, so fake it and say it took 1s.
      if ($time_end == $time_start) {
        $time_end++;
      }

      if ($opt_progress) {
        status('completed run stage');
      }

      status('completed run in '.($time_end-$time_start).' seconds') if ($opt_noisy);
      $msgnum = int($msgnum * $opt_cs_timeout / ($time_end-$time_start)) || 1;
    }
  }

  # if we were using a temp dir, clean it out and then remove it
  if ($tmpdir) {
    clean_dir($tmpdir);
    rmdir $tmpdir;
  }
}
| |
# scan the client's cache and return a path to a gzip archive of AI output
#
# Scans $opt_cs_cachedir as a ham directory target, strips the cache directory
# prefix from each message path, and writes the re-packed index lines to a
# gzip temp file.  Sets the global $total_messages to the count read back from
# the scan and leaves the global $tmpfd undefined when done.
# Returns the path of the gzip file; the caller is responsible for removing
# it.  Returns nothing if no temp file handle could be obtained.  Dies on
# temp file or write errors.
sub scan_client_cache {
  status('starting cache scan stage') if ($opt_progress);

  # make sure the cachedir exists so that AI doesn't bail out
  mkdir $opt_cs_cachedir unless -e $opt_cs_cachedir;

  # Make a temp file and delete it; we keep working through the open handle
  my $tmpf;
  ($tmpf, $tmpfd) = Mail::SpamAssassin::Util::secure_tmpfile();
  die 'mass-check: failed to create temp file' unless $tmpf;
  unlink $tmpf or die "mass-check: unlink '$tmpf': $!";

  # check the handle before it is used rather than after
  unless ($tmpfd) {
    status('cache scan failed');
    return;
  }

  my @targets = ("h:dir:$opt_cs_cachedir");
  generate_queue(\@targets, $tmpfd);

  # we now have a temporary file with the messages to process
  seek($tmpfd, 0, 0);
  # the first line is the number of messages
  $total_messages = read_line($tmpfd);
  showdots_finish();
  status("completed cache scan stage, $total_messages messages") if ($opt_progress);

  status('compressing cache data') if ($opt_progress);

  # create a temp file for compression and reopen it through IO::Zlib
  my($gzpath, $gzfd) = Mail::SpamAssassin::Util::secure_tmpfile();
  die "Can't make tempfile, exiting" unless $gzpath;
  $gzfd->close;
  $gzfd = IO::Zlib->new($gzpath, 'wb') || die "Can't create temp gzip file: $!";

  while (my $msg = read_line($tmpfd)) {
    $msg =~ s/\r?\n$//;
    my @d = Mail::SpamAssassin::ArchiveIterator::_index_unpack($msg);

    # strip the cache directory prefix; \Q...\E makes the directory name match
    # literally even if it contains regexp metacharacters
    $d[3] =~ s/^\Q$opt_cs_cachedir\E//;

    send_line($gzfd, Mail::SpamAssassin::ArchiveIterator::_index_pack($d[0], $d[1], $d[2], $d[3])) ||
      die "mass-check: error writing out temp file in client mode\n";
  }
  close $tmpfd;
  undef $tmpfd;
  $gzfd->close;
  status('cache data compressed to '.(-s $gzpath).' bytes') if ($opt_progress);

  return $gzpath;
}
| |
| ############################################################################ |
| |
# in server mode nothing is scanned: hand the message data reference (the
# fourth argument) straight back to the caller
sub wanted_server {
  my (undef, undef, undef, $dataref) = @_;
  return $dataref;
}
| |
# client-mode counterpart of result(): each result starts with the server's
# message number, which is stripped off and used as the key under which the
# rest of the result is stored in %postdata.
#   $class  - "s" (spam) or "h" (ham), used for the running counters
#   $result - result text, expected to begin with "<number><whitespace>"
#   $time   - passed on to progress() when progress reporting is on
sub result_client {
  my ($class, $result, $time) = @_;

  # don't open results files until we get here to avoid overwriting files
  init_results() unless $init_results;

  $spam_count++ if $class eq "s";
  $ham_count++ if $class eq "h";
  $total_count++;

  progress($time) if $opt_progress;

  my $had_msgnum = ($result =~ s/^(\d+)\s+//m);
  if (!$had_msgnum) {
    warn ">> WTH!? result is not in the correct format: $result\n";
    # 20071114: bit of a hack
    # prevent malformed cs_client message result lines from preventing the
    # cs_server running in cs_schedule cache mode from completing
    # TODO: find out how result lines get malformed (malformed result lines
    # will still hang any cs_server not running with cs_schedule_cache)
    $failed_msgs++;
  }
  else {
    $postdata{$1} = $result;
  }
}
| |
# pass the arguments on to dbg(), but only when would_log() returns 2 for the
# "mass-check" debug facility
sub aidbg {
  my $level = would_log("dbg", "mass-check");
  dbg(@_) if $level == 2;
}
| |
# Store the value of a --before or --after option in $opt_before/$opt_after.
#   $which - 'before' selects $opt_before; anything else selects $opt_after
#   $time  - a negative integer (seconds relative to now), a non-negative
#            integer (used as-is), or any other string, which is handed to
#            Time::ParseDate when that module is available
# Dies if a date string is given without Time::ParseDate, if the date string
# cannot be parsed, or if the resulting --after is not earlier than --before.
sub deal_with_before_after {
  my($which, $time) = @_;

  if ($time && $time =~ /^-\d+$/) {
    # relative: that many seconds before now
    $time = time + $time;
  }
  elsif ($time && $time !~ /^-?\d+$/) {
    if (HAS_TIME_PARSEDATE) {
      my $parsed = Time::ParseDate::parsedate($time, GMT => 1, PREFER_PAST => 1);
      # an unparseable date used to silently switch the option off
      die "Can't parse date '$time' given to --".($which eq 'before' ? 'before' : 'after')
        unless defined $parsed;
      $time = $parsed;
    }
    else {
      die "You need Time::ParseDate if you use either the --before or --after option.";
    }
  }

  if ($which eq 'before') {
    $opt_before = $time;
  }
  else {
    $opt_after = $time;
  }

  if ($opt_before && $opt_after && $opt_after >= $opt_before) {
    die "--before ($opt_before) <= --after ($opt_after) -- conflict!";
  }
}
| |
# Scan the given targets and write the resulting message queue to a file.
#   $targets - array ref of target strings, passed to $iter->_scan_targets()
#   $tmpfd   - open file handle that receives the queue
# The queue is the number of messages followed by one packed index entry per
# message.  Also sets $Mail::SpamAssassin::ArchiveIterator::MESSAGES to that
# number.  The per-class lists on $iter ({s} and {h}) are left undefined.
sub generate_queue {
  my ($targets, $tmpfd) = @_;

  # scan the targets and get the number and list of messages
  $iter->_scan_targets($targets,
    sub {
      my($self, $date, $class, $format, $mail) = @_;
      push(@{$self->{$class}}, Mail::SpamAssassin::ArchiveIterator::_index_pack($date, $class, $format, $mail));
    }
  );

  # make sure both lists exist, even if empty, so that none of the array
  # dereferences below can die on an undefined value
  $iter->{h} ||= [];
  $iter->{s} ||= [];

  # deal with opt_head and opt_tail
  top_and_tail_messages($iter->{h});
  top_and_tail_messages($iter->{s});

  my $messages;
  if ($opt_n) {
    # OPT_N == 1 means don't bother sorting on message receive date

    # for ease of memory, we'll play with pointers
    $messages = $iter->{s};
    undef $iter->{s};
    push(@{$messages}, @{$iter->{h}});
    undef $iter->{h};
  }
  else {
    # OPT_N == 0 means sort on message receive date

    # the spam and ham groups were sorted by top_and_tail_messages()
    my @s = @{$iter->{s}};
    undef $iter->{s};
    my @h = @{$iter->{h}};
    undef $iter->{h};

    $messages = [];

    # interleave ordered spam and ham, keeping roughly the overall
    # spam/ham ratio throughout the queue
    if (@s && @h) {
      my $ratio = @s / @h;
      while (@s && @h) {
        push @{$messages}, (@s / @h > $ratio) ? (shift @s) : (shift @h);
      }
    }
    # push the rest onto the end
    push @{$messages}, @s, @h;
  }

  # head or tail < 0 means crop the total list, negate the value appropriately
  if ($opt_tail < 0) {
    # a negative length keeps that many elements at the end
    splice(@{$messages}, 0, $opt_tail);
  }
  if ($opt_head < 0) {
    # clamp the offset so that asking for more than we have is not an
    # out-of-range splice
    splice(@{$messages}, min(-$opt_head, scalar @{$messages}));
  }

  my $num = $Mail::SpamAssassin::ArchiveIterator::MESSAGES = scalar(@{$messages});

  # Dump out the number of messages and the message index info to
  # the temp file
  send_line($tmpfd, $num, @{$messages});
}
| |
# Apply positive --head/--tail values to one list of packed index entries,
# modifying the list in place.
#   $ary - array ref of index entries
# With $opt_n set the list is cropped as it stands; otherwise it is first
# sorted (string comparison of the entries, which the original author uses to
# order on message receive date) and then cropped.
sub top_and_tail_messages {
  my ($ary) = @_;

  # OPT_N == 1 means don't bother sorting on message receive date, so work
  # directly on the caller's list; OPT_N == 0 works on a sorted copy
  my @by_date;
  my $list = $ary;
  if (!$opt_n) {
    @by_date = sort { $a cmp $b } @{$ary};
    $list = \@by_date;
  }

  # head or tail > 0 means crop each list: tail keeps the last N entries,
  # head keeps the first N of whatever is left
  splice(@{$list}, 0, -$opt_tail) if $opt_tail > 0;
  splice(@{$list}, min($opt_head, scalar @{$list})) if $opt_head > 0;

  # hand the sorted, cropped copy back to the caller
  @{$ary} = @by_date if !$opt_n;
}
| |
# numeric minimum of two values; the second is returned when they are equal
sub min {
  my ($first, $second) = @_;
  return $second unless $first < $second;
  return $first;
}
| |
| ############################################################################ |
| |
# Run the command given in $opt_run_post_scan, if any, through system() and
# warn if it could not be started, was killed by a signal, or exited with a
# non-zero status.  A failure is only reported, never fatal.
sub run_post_scan {
  return unless $opt_run_post_scan;

  system($opt_run_post_scan);

  if ($? == -1) {
    # the command could not be started at all
    warn "$opt_run_post_scan failed to execute: $!";
  }
  elsif ($? & 127) {
    # the low bits of $? hold the signal; the exit status alone would be 0
    warn "$opt_run_post_scan failed: died with signal ".($? & 127);
  }
  elsif ($? >> 8 != 0) {
    warn "$opt_run_post_scan failed";
  }
}
| |