blob: c6fd45d721574abc8a92d86f8be93fe774fdd55d [file] [log] [blame]
#!/usr/bin/perl
#
# mk-corpus-link-farm - distribute a bunch of mail tidily into a set of corpora
# (see EOF for an example/testcase)
#
# Note: creates symbolic links only; renaming/moving the originals will
# cause breakage.
#
# <@LICENSE>
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to you under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# </@LICENSE>
use strict;
use warnings;
# Print the command-line synopsis and abort.
#
# Never returns: the text is raised with die().  The message ends in a
# newline so perl does not append "at FILE line N" to it.
sub usage {
  die "
usage: mk-corpus-link-farm [options] [dest] src [...]
dest:
-dest outputdir [-num num]
options:
-most_recent: select the most recent messages (default)
-after=N only test mails received after time_t N (negative values
are an offset from current time, e.g. -86400 = last day)
or after date as parsed by Time::ParseDate (e.g. '-6 months')
-before=N only test mails received before time_t N (accepts the same
formats as -after)
";
}
use Time::ParseDate;
use Time::Local;
use Cwd;
use File::Path;
use File::Find;
use File::Basename;
use Data::Dumper;
use SDBM_File;
use Fcntl;
# Debug switch: give it a true value to make dbg() print its messages.
my $DEBUG;    # $DEBUG = 1;

# Message classes; sources and destinations hold one subdirectory per class.
my @classes = ('ham', 'spam');

# Source and destination descriptors, filled in from the command line.
my $srcs  = [];
my $dests = [];

# Parent directory for the deletion list and the extracted mbox messages.
my $mbox_tmpdir = $ENV{TMPDIR} || "/tmp";

# Forward declaration, so dbg can be called without parentheses below.
sub dbg;

use Getopt::Long;

# Option values written by GetOptions.
our $opt_most_recent = 0;
our ($opt_after, $opt_before);

# Fill the timezone and month lookup tables used by the date parser.
tz_init();

# The destination named by the most recent -dest switch.
my $curdest;
# Parse the command line.  Each -dest starts a new destination record and a
# following -num sets how many messages per class that destination wants.
# Everything left in @ARGV afterwards is a source directory.
GetOptions(
  'dest=s' => sub {
    my ($switch, $dir) = @_;
    $curdest = {
      ham => { },
      spam => { },
      dir => $dir
    };
    push (@$dests, $curdest);
  },
  'num=i' => sub {
    my ($switch, $num) = @_;
    # Without a preceding -dest there is nothing to attach the count to;
    # previously the value was stored in a throw-away hash and lost.
    # Getopt::Long catches this die, reports it, and makes GetOptions
    # return false, so usage() is shown.
    die "-num must follow the -dest it applies to\n" unless $curdest;
    $curdest->{num_msgs} = $num;
  },
  'most_recent' => \$opt_most_recent,
  'before=s' => \$opt_before,
  'after=s' => \$opt_after,
) or usage();

foreach my $arg (@ARGV) {
  push (@$srcs, { dir => $arg });
}
# Deal with --before and --after: turn each supplied value into an absolute
# time_t.  The loop variable aliases $opt_before / $opt_after, so assigning
# to it updates the option itself.
foreach my $time ($opt_before, $opt_after) {
  next unless $time;                    # option not given (or 0)
  if ($time =~ /^-\d+$/) {
    # negative number: offset in seconds from now
    $time = time + $time;
  }
  elsif ($time !~ /^-?\d+$/) {
    # anything non-numeric is handed to Time::ParseDate
    my $parsed = Time::ParseDate::parsedate($time, GMT => 1, PREFER_PAST => 1);
    # An unparsable date used to become undef, which silently switched
    # the filter off; refuse to continue instead.
    defined $parsed or die "cannot parse date '$time'\n";
    $time = $parsed;
  }
}
# test data: $srcs = [ { dir => "/src1", ham => { dests => [ ], dir =>
# "/src1/ham", num => 100 }, spam => { dests => [ ], dir => "/src1/spam", num
# => 100 }, }, { dir => "/src2", ham => { dests => [ ], dir => "/src2/ham", num
# => 300 }, spam => { dests => [ ], dir => "/src2/spam", num => 300 }, }, { dir
# => "/src3", ham => { dests => [ ], dir => "/src3/ham", num => 500 }, spam =>
# { dests => [ ], dir => "/src3/spam", num => 500 }, } ];
# Directory the script was started from; used to root relative link targets.
my $cwd = cwd();

# Files that may need deleting are tracked in an on-disk SDBM hash rather
# than in memory -- this list can get pretty big!
my $poss_del_path = "$mbox_tmpdir/dels.$$";
my $poss_delete = do {
  my %on_disk;
  tie(%on_disk, 'SDBM_File', $poss_del_path, O_RDWR|O_CREAT, 0600)
    or die "Couldn't tie SDBM file $poss_del_path: $!; aborting";
  \%on_disk;
};

# Work area for messages extracted from mboxes.  Anything already in it is a
# deletion candidate until this run claims it again.
my $mbox_work = "$mbox_tmpdir/mboxes.d";
mark_for_poss_deletion($mbox_work) if -d $mbox_work;

main();

# and clean up again: remove the SDBM file under each name it may have.
unlink $poss_del_path, map { "$poss_del_path.$_" } qw(dir pag);
exit;
# Run the four phases in order: count the sources, share them out between
# the destinations, create the links, then delete whatever was not reclaimed.
sub main {
  my @phases = (
    \&find_srcs,
    \&dist_across_dests,
    \&make_links_in_dests,
    \&perform_poss_deletion,
  );
  # a lexical loop variable is used because the phases overwrite $_
  foreach my $phase (@phases) {
    $phase->();
  }
}
# Count the messages available in every source.
#
# For each entry of @$srcs this sets $src->{ham} and $src->{spam} to
# { num => N, dests => [ ] } and, when the class directory exists, also
# records it as {subdir}.  Plain readable files count as one message each;
# files whose name contains ".mbox" are counted with mbox_count().
sub find_srcs {
  foreach my $src (@$srcs) {
    # both classes start at zero so the summary line always has numbers
    foreach my $class (qw(ham spam)) {
      $src->{$class} = { num => 0, dests => [ ] };
    }

    foreach my $class (qw(ham spam)) {
      my $try_dir = "$src->{dir}/$class";
      next unless -d $try_dir;

      my $num_files = 0;
      my @mboxes = ();
      my $tally = sub {
        return unless (-f $_ && -r _);
        if ($_ =~ /\.mbox/i) {
          push @mboxes, $File::Find::name;
        } else {
          $num_files++;
        }
      };
      File::Find::find({ wanted => $tally, follow => 1 }, $try_dir);

      # mboxes are counted once the directory walk is over
      foreach my $mbox (@mboxes) {
        $num_files += mbox_count($mbox);
      }

      $src->{$class}{subdir} = $try_dir;
      $src->{$class}{num} = $num_files;
    }

    print "$src->{dir}: found $src->{ham}{num} ham, $src->{spam}{num} spam\n";
  }
}
# Decide how many messages each destination takes from each source.
#
# Destinations are served in command-line order.  Each one wants
# $dest->{num_msgs} messages per class (effectively unlimited when unset)
# and draws on the sources in @srcorder via allocate().  A summary is
# printed per destination, with a warning for any class left short.
sub dist_across_dests {
  my @srcorder = @$srcs;
  foreach my $dest (@$dests) {
    my %want = ();
    my $wantnum = $dest->{num_msgs} || 99999999;
    foreach my $class (@classes) {
      $want{$class} = $wantnum;
    }
    $dest->{srcs} = [ ];
    print "\n$dest->{dir}: want $wantnum messages\n";

    foreach my $class (@classes) {
      foreach my $src (@srcorder) {
        last unless ($want{$class} > 0);
        allocate ($src, $dest, \$want{$class}, $class);
      }
    }

    foreach my $class (@classes) {
      print "$class:";
      ($class eq 'ham') and print " ";
      my $added = 0;
      foreach my $src (@{$dest->{$class}{srcs}}) {
        print " $src->{num} of $src->{from}{$class}{subdir}";
        $added += $src->{num};
      }
      print "\n";
      if ($want{$class} > 0) {
        warn " WARNING: failed to fill $dest->{dir}/$class: ".
            "only $added, wanted $want{$class} more\n";
      }
    }

    # for the next dest, try to take some more entries from
    # other sources as well. do this by moving the source that's
    # currently at the head of the list, to the end.
    # With no sources at all there is nothing to rotate; shifting an
    # empty list would put undef into @srcorder and hand it to allocate().
    push (@srcorder, shift @srcorder) if @srcorder;
  }
}
# Prepare every destination's class directory, then create the links.
#
# A class directory that already exists has its contents marked as deletion
# candidates; a missing one is created.  After that, each source links its
# allocated messages into place through _mklink().
sub make_links_in_dests {
  foreach my $class (@classes) {
    foreach my $dest (@$dests) {
      my $dir = $dest->{dir}.'/'.$class;
      if (!-d $dir) {
        mkpath($dir) or warn "cannot mkdir $dir: $!";
        next;
      }
      mark_for_poss_deletion($dir);
    }

    # a lexical loop variable is needed here: code reached from _mklink
    # overwrites $_
    foreach my $src (@$srcs) {
      _mklink($class, $src);
    }
  }
}
# Link one source's messages of one class into the destinations that were
# allocated a share of them.
#
#   $class - 'ham' or 'spam'
#   $src   - source record; uses $src->{$class}{subdir} and
#            $src->{$class}{dests} (a list of { dest => ..., num => ... })
#
# Candidate files are gathered from the class directory, ordered youngest
# first by mtime, and dealt out to the destinations in allocation order.
# Files whose name contains ".mbox" are replaced by the per-message files
# that mbox_extract_all() produces.
sub _mklink {
  my ($class, $src) = @_;

  my $srcdir = $src->{$class}{subdir};
  if (!$srcdir) {
    # name the directory; interpolating $src itself only printed HASH(0x...)
    dbg("no srcdir, skipping $src->{dir}");
    return;
  }
  if (!-d $srcdir) {
    warn "cannot read $srcdir, ignoring: $!";
    return;
  }
  dbg("linking from $srcdir");

  # create a hash of modtime -> filepath, so we can be sure we pick up
  # "new" files first if so desired.
  my %files = ();
  my @mboxes = ();              # [ mtime, path ] pairs, extracted below

  File::Find::find({ follow => 1, wanted => sub {
        return unless (-f $_ && -r _);      # not a file
        my @stat = stat _;
        my $mtime = $stat[9];
        return unless message_is_useful_by_date($mtime);

        if ($_ =~ /\.mbox/i) {
          push(@mboxes, [ $mtime, $File::Find::name ]);
        } else {
          push(@{$files{$mtime}}, $File::Find::name);
        }
      } }, $srcdir);

  # Extract mboxes only after the walk, and by their full find path.
  # Passing $_ (the basename) from inside the callback made mboxes with the
  # same file name in different directories share one extraction directory,
  # and used a different directory than the one mbox_count() was given.
  # It also let the extraction code overwrite $_ in mid-walk.
  foreach my $mbox (@mboxes) {
    my ($mtime, $path) = @$mbox;
    push(@{$files{$mtime}}, mbox_extract_all($path));
  }

  my @files = ();
  foreach my $key (sort { $b <=> $a } keys %files) {
    push (@files, @{$files{$key}});
  }
  undef %files;         # no longer need that

  # @files is now sorted with the "youngest" files first. check:
  # (-M is the file's age in days, so larger numbers mean older.)
  if (scalar @files && $files[0] && $files[1] && -M $files[0] > -M $files[-1])
  {
    warn "oops! files out of order, should be youngest first: ".
        join(' ',@files);
  }

  foreach my $destobj (@{$src->{$class}{dests}}) {
    my $dest = $destobj->{dest};
    my $num = $destobj->{num};
    my $destdir = $dest->{dir};
    dbg(" linking $num into $destdir");

    for (my $i = 0; $i < $num; $i++)
    {
      my $srcname = shift @files;
      if (!$srcname) {
        # ran out of files for this source; give up quietly
        last;
      }

      # the link name is the source path flattened to one safe component
      my $dstname = $srcname;
      $dstname =~ s/[^-_\.A-Za-z0-9]/_/gs;
      $dstname =~ s/_+/_/gs;
      $dstname =~ s/^_//gs;
      $dstname = $destdir."/".$class."/".$dstname;

      if ($srcname !~ m,^/,) {  # unrooted. root it
        $srcname = $cwd.'/'.$srcname;
      }

      # this link is wanted, so it must survive the final clean-up
      remove_from_poss_delete($dstname);

      if (-l $dstname) {
        my $link = readlink($dstname);
        if (defined $link && $link eq $srcname) {
          dbg(" $srcname already linked to $dstname");
          next;
        }
        unlink $dstname;
      }

      if (symlink($srcname, $dstname)) {
        dbg(" $srcname -> $dstname");
      } else {
        warn "symlink $srcname -> $dstname failed: $!";
      }
    }
  }
}
# Give one destination as many messages of a class as a source can supply.
#
#   $src      - source record; $src->{$class}{num} is what it still has left
#   $dest     - destination record
#   $want_ref - reference to the number the destination still wants;
#               reduced by the amount taken
#   $class    - 'ham' or 'spam'
#
# The amount taken is recorded on both sides: in $dest->{$class}{srcs} and
# in $src->{$class}{dests}.
sub allocate {
  my ($src, $dest, $want_ref, $class) = @_;

  my $available = $src->{$class}{num};
  dbg("$class nsrc=$available nwanted=$$want_ref");

  if ($available == 0) {
    dbg("already exhausted src");
    return;
  }

  # take everything when the source cannot cover the request,
  # otherwise take exactly what is still wanted
  my $drains_src = ($available <= $$want_ref);
  my $take = $drains_src ? $available : $$want_ref;
  dbg($drains_src ? "exhausted src" : "filled dest, some left in src");

  push (@{$dest->{$class}{srcs}}, { from => $src, num => $take });
  push (@{$src->{$class}{dests}}, { dest => $dest, num => $take });

  $$want_ref -= $take;
  $src->{$class}{num} -= $take;
}
# Record everything below $dir that is not a directory as a candidate for
# deletion.  Files whose name ends in "mboxcountcache" are left alone.
sub mark_for_poss_deletion {
  my ($dir) = @_;

  my $mark = sub {
    return if (/mboxcountcache$/);
    # TODO: delete dirs? for now, leave 'em behind
    return if -d $_;

    my $path = $File::Find::name;
    $poss_delete->{$path} = 1;
    dbg("marked as deleteable: $path");
  };

  File::Find::find({ follow => 1, wanted => $mark }, $dir);
}
# Delete every file still listed in the deletion-candidate hash, warning
# about each one that cannot be removed.
sub perform_poss_deletion {
  for my $stale (keys %$poss_delete) {
    next if unlink($stale);
    warn "cannot unlink $stale";
  }
}
# Take $fname off the deletion-candidate list.
# Returns 1 if it was listed, 0 if it was not.
sub remove_from_poss_delete {
  my ($fname) = @_;
  return 0 unless exists $poss_delete->{$fname};
  delete $poss_delete->{$fname};
  return 1;
}
# Announce and count the messages in the mbox at the given path.
# Delegates to _mbox_extract_all() in count-only mode.
sub mbox_count {
  my $path = shift;
  print "counting mbox: $path\n";
  return _mbox_extract_all($path, 1);
}
# Announce and extract the messages in the mbox at the given path.
# Delegates to _mbox_extract_all() in extract mode and passes its result on.
sub mbox_extract_all {
  my $path = shift;
  print "extracting mbox: $path\n";
  return _mbox_extract_all($path, 0);
}
# Walk the mbox file at $mboxpath, message by message.
#
#   $mboxpath  - path of the mbox file to read
#   $justcount - true: only count messages; false: write each message to
#                its own file under $mbox_work
#
# Only messages whose "From " line passes mbox_new_enough() are counted or
# extracted.  Returns the count when $justcount is true, otherwise the list
# of per-message file paths.  Dies if the work directory cannot be created
# or the mbox / an output file cannot be opened.
sub _mbox_extract_all {
my ($mboxpath, $justcount) = @_;
# create an area to hold extracted mbox files
# this cannot use $$, it must remain the same between runs
if (!-d $mbox_work) {
mkdir $mbox_work or die "cannot create tmpdir: $mbox_work";
# fatal error, could be an attack
}
# The count cache sits next to the extracted files: same path as the
# offset-0 message, with the "OFF0" component replaced.
my $countcache = get_mbox_name ($mboxpath, 0);
$countcache =~ s/OFF\d+$/mboxcountcache/gs;
# Use the cached count when the cache file was modified more recently than
# the mbox (-M is age in days, so smaller means newer).
if ($justcount && -f $countcache && -M $countcache < -M $mboxpath) {
# NOTE(review): the result of this open is not checked.
open (CACHE, "<$countcache");
my $count = <CACHE> + 0;
close CACHE;
return $count;
}
my $counter = 0;
my @created_files = ();
open (INPUT, "<$mboxpath") or die "cannot read $mboxpath";
binmode INPUT;
# get stat details for the input mbox
my ($dev,$ino,$mode,$nlink,$uid,$gid,$rdev,$size,
$atime,$mtime,$ctime) = stat INPUT;
# assumption: get_mbox_name() uses the same dir for all offsets
my $newname = get_mbox_name ($mboxpath, 0);
my $dir = dirname ($newname);
if (!-d $dir) {
mkdir $dir or die "cannot mkdir $dir";
}
chmod 0755, $dir or warn "cannot chmod $dir";
my $start = 0; # start of a message
my $where = 0; # current byte offset
my $in_header = 0; # are in we a header? (set below, never read in this sub)
my $fromline;
while (!eof INPUT) {
my $offset = $start; # byte offset of this message
# read forward to the next "From " separator line
while (<INPUT>) {
# Re-entry point for the goto further down, used when the separator of
# the next message has already been read into $_.
# NOTE(review): this is a goto into a loop body, which perl documents as
# deprecated -- confirm it is accepted by the perl versions in use.
nextfrom:
last unless defined($_);
if (substr($_,0,5) eq "From ") {
$in_header = 1;
$start = $where;
$where = tell INPUT;
$fromline = $_;
last;
}
}
last unless defined($_);
# dbg "mbox From: $counter $start $where $fromline";
if ($fromline && mbox_new_enough($fromline))
{
$counter++;
if (!$justcount) {
# the per-message file is named after the value held in $offset
$newname = get_mbox_name ($mboxpath, $offset);
# An existing file counts as fresh when its age is at least the age of
# the mbox, i.e. it is not newer than the mbox.
# NOTE(review): confirm this is the intended direction; a file older than
# a since-modified mbox is also accepted.
if (-f $newname && (-M _ >= -M INPUT)) {
# no need to recreate it, it's fresh
# Skip the message: read up to the next "From " line.  The first
# line read is never tested ($past is still false).
my $past = 0;
while (<INPUT>) {
if ($past) {
last if (!defined($_) || substr($_,0,5) eq "From ");
} else {
$past = 1;
}
}
}
else {
# copy the lines after the separator, up to the next one, to $newname
seek (INPUT, $where, 0);
open (OUTPUT, ">$newname") or die "cannot write to $newname";
binmode OUTPUT;
my $past = 0;
while (<INPUT>) {
if ($past) {
last if (!defined($_) || substr($_,0,5) eq "From ");
} else {
$past = 1;
}
print OUTPUT;
}
close OUTPUT or die "failed to write to $newname";
chmod 0644, $newname or warn "cannot chmod $newname";
# give the extracted file the mbox's own access and modification times
utime $atime, $mtime, $newname
or warn "failed to touch $newname";
}
push @created_files, $newname;
# the file is in use by this run, so it must survive the final clean-up
remove_from_poss_delete($newname);
$where = tell INPUT;
$offset = $where;
# we've already read the next "From " line, parse it now
goto nextfrom;
}
}
}
close INPUT;
if ($justcount) {
# remember the count for later runs
# NOTE(review): the result of this open is not checked.
open (CACHE, ">$countcache");
print CACHE $counter;
close CACHE;
return $counter;
}
else {
print "extracted: $mboxpath: $counter files\n";
return @created_files;
}
}
# Build the path used for a message extracted from an mbox:
#   $mbox_work/<flattened mbox path>/OFF<$where>
# The mbox path is flattened by turning every character outside
# [-_.A-Za-z0-9] into "_", collapsing runs of "_" and dropping a leading one.
sub get_mbox_name {
  my ($mboxpath, $where) = @_;

  (my $flat = $mboxpath) =~ s/[^-_\.A-Za-z0-9]/_/gs;
  $flat =~ s/_+/_/gs;
  $flat =~ s/^_//gs;

  return join("/", $mbox_work, $flat, "OFF".$where);
}
# Decide from an mbox separator line whether the message falls inside the
# requested date range.
#
# The line looks like:
#   From xscludshmkjgc@yahoo.com Thu Apr 29 20:02:18 2004
# Returns nothing when the line is empty or not in that shape; otherwise
# the verdict of message_is_useful_by_date() on the parsed date.
sub mbox_new_enough {
  my ($fromline) = @_;

  return unless ($fromline && $fromline =~ /^From \S+ +(.*)$/);
  my $stamp = $1;

  # a stamp without any timezone is taken to be in the local zone
  if ($stamp !~ /(?:[-+]\d{4}|\b[A-Z]{2,4}\b)/) {
    $stamp .= " ".local_tz();
  }

  my $when = first_date($stamp);
  return message_is_useful_by_date($when);
}
# Tell whether a message date lies inside the window requested with
# -after / -before.
#
#   $date - time_t of the message; undef or 0 is treated as unusable
#
# Returns 1 when the date is acceptable, 0 otherwise.  A bound that was not
# given is not applied: the old code compared against an undefined
# $opt_after when only -before was set, raising an "uninitialized value"
# warning for every message.  Both bounds are exclusive.
sub message_is_useful_by_date {
  my ($date) = @_;

  return 0 unless $date;        # undef or 0 date = unusable

  return 0 if ($opt_after && $date <= $opt_after);
  return 0 if ($opt_before && $date >= $opt_before);
  return 1;
}
# Emit a debug message on stderr, but only while $DEBUG is true.
# All arguments are concatenated and prefixed with "debug: ".
sub dbg {
  my @parts = @_;
  $DEBUG or return;
  warn "debug: ".join("", @parts)."\n";
}
# Return the time of the first argument that parse_rfc822_date() can turn
# into a non-zero time, or undef when none of them parses.
sub first_date {
  for my $candidate (@_) {
    my $epoch = parse_rfc822_date($candidate);
    next unless defined($epoch) && $epoch;
    return $epoch;
  }
  return undef;
}
###########################################################################
# Lookup tables for the date parser, filled by tz_init(), plus the cached
# local timezone offset maintained by local_tz().
my (%TZ, %MONTH);
my $LOCALTZ;

# Fill %TZ (timezone abbreviation -> numeric offset) and %MONTH (lower-case
# three-letter month name -> month number 1..12).
sub tz_init {
  # timezone mappings: in case of conflicts, use RFC 2822, then most
  # common and least conflicting mapping
  %TZ = (
    # standard
    UT   => '+0000', UTC  => '+0000',

    # US and Canada
    NDT  => '-0230', AST  => '-0400', ADT  => '-0300', NST  => '-0330',
    EST  => '-0500', EDT  => '-0400', CST  => '-0600', CDT  => '-0500',
    MST  => '-0700', MDT  => '-0600', PST  => '-0800', PDT  => '-0700',
    HST  => '-1000', AKST => '-0900', AKDT => '-0800', HADT => '-0900',
    HAST => '-1000',

    # Europe
    GMT  => '+0000', BST  => '+0100', IST  => '+0100', WET  => '+0000',
    WEST => '+0100', CET  => '+0100', CEST => '+0200', EET  => '+0200',
    EEST => '+0300', MSK  => '+0300', MSD  => '+0400', MET  => '+0100',
    MEZ  => '+0100', MEST => '+0200', MESZ => '+0200',

    # South America
    BRST => '-0200', BRT  => '-0300',

    # Australia
    AEST => '+1000', AEDT => '+1100', ACST => '+0930', ACDT => '+1030',
    AWST => '+0800',

    # New Zealand
    NZST => '+1200', NZDT => '+1300',

    # Asia
    JST  => '+0900', KST  => '+0900', HKT  => '+0800', SGT  => '+0800',
    PHT  => '+0800',

    # Middle East
    IDT  => '+0300',
  );

  # month mappings: names in calendar order, numbered from 1
  my $number = 0;
  %MONTH = map { ( $_ => ++$number ) }
           qw(jan feb mar apr may jun jul aug sep oct nov dec);
}
# Return the local timezone as a numeric offset string such as "+0100" or
# "-0330".  The value is computed once and cached in $LOCALTZ.
sub local_tz {
  return $LOCALTZ if defined($LOCALTZ);

  # Treating the broken-down local time as if it were UTC and converting it
  # back gives "now plus the zone offset"; the difference to the real time
  # is the offset.  This stays correct across day and year boundaries,
  # where subtracting the yday and year fields by hand went wrong.
  my $time = time;
  my $z = int((timegm(localtime($time)) - $time) / 60);    # minutes

  # Format sign and magnitude separately: with a negative value, "%" and
  # the truncating "%d" disagreed for offsets such as -0315 or -0045.
  my $sign = ($z < 0) ? '-' : '+';
  $z = abs($z);
  $LOCALTZ = sprintf("%s%02d%02d", $sign, int($z / 60), $z % 60);
  return $LOCALTZ;
}
# Parse a date in RFC 822/2822 style, or in the "Mon DD HH:MM:SS YYYY"
# layout of an mbox separator line, into a time_t.
#
#   $date - the date text
#
# Returns the time in seconds since the epoch, adjusted by the timezone
# found in the text (numeric, or an abbreviation known to %TZ; anything
# else counts as "-0000").  Returns undef when no day/month/year can be
# found or when timegm() rejects the values.
sub parse_rfc822_date {
  my ($date) = @_;
  local ($_);
  my ($yyyy, $mmm, $dd, $hh, $mm, $ss, $mon, $tzoff);

  # make it a bit easier to match
  $_ = " $date "; s/, */ /gs; s/\s+/ /gs;

  # now match it in parts. Date part first:
  if (s/ (\d+) (Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec) (\d{4}) / /i) {
    $dd = $1; $mon = lc($2); $yyyy = $3;
  } elsif (s/ (Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec) +(\d+) \d+:\d+:\d+ (\d{4}) / /i) {
    $dd = $2; $mon = lc($1); $yyyy = $3;
  } elsif (s/ (\d+) (Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec) (\d{2,3}) / /i) {
    $dd = $1; $mon = lc($2); $yyyy = $3;
  } else {
    dbg("util: time cannot be parsed: $date");
    return undef;
  }

  # handle two and three digit dates as specified by RFC 2822
  if (defined $yyyy) {
    if (length($yyyy) == 2 && $yyyy < 50) {
      $yyyy += 2000;
    }
    elsif (length($yyyy) != 4) {
      # three digit years and two digit years with values between 50 and 99
      $yyyy += 1900;
    }
  }

  # hh:mm:ss
  if (s/ (\d?\d):(\d\d)(:(\d\d))? / /) {
    $hh = $1; $mm = $2; $ss = $4 || 0;
  }

  # numeric timezones
  if (s/ ([-+]\d{4}) / /) {
    $tzoff = $1;
  }
  # common timezones
  elsif (s/\b([A-Z]{2,4}(?:-DST)?)\b/ / && exists $TZ{$1}) {
    $tzoff = $TZ{$1};
  }
  # all other timezones are considered equivalent to "-0000"
  $tzoff ||= '-0000';

  # months
  if (exists $MONTH{$mon}) {
    $mmm = $MONTH{$mon};
  }

  $hh ||= 0; $mm ||= 0; $ss ||= 0; $dd ||= 0; $mmm ||= 0; $yyyy ||= 0;

  # Time::Local (v1.10 at least) throws warnings when the dates cause
  # a 32-bit overflow. So force a min/max for year.
  if ($yyyy > 2037) {
    dbg("util: date after supported range, forcing year to 2037: $date");
    $yyyy = 2037;
  }
  elsif ($yyyy < 1970) {
    # the message used to claim 1970 while the code set 1971
    dbg("util: date before supported range, forcing year to 1971: $date");
    $yyyy = 1971;
  }

  my $time;
  eval {                # could croak
    $time = timegm($ss, $mm, $hh, $dd, $mmm-1, $yyyy);
  };

  if ($@) {
    dbg("util: time cannot be parsed: $date, $yyyy-$mmm-$dd $hh:$mm:$ss: $@");
    return undef;
  }

  if ($tzoff =~ /([-+])(\d\d)(\d\d)$/)  # convert to seconds difference
  {
    $tzoff = (($2 * 60) + $3) * 60;
    # a zone behind UTC means the UTC time is later, and vice versa
    if ($1 eq '-') {
      $time += $tzoff;
    } else {
      $time -= $tzoff;
    }
  }

  return $time;
}
# Format a time_t as an RFC 822 date in the local timezone, for example
# "Thu, 29 Apr 2004 20:02:18 +0100".  A false or missing argument means now.
sub time_to_rfc822_date {
  my ($time) = @_;

  my @day_names = qw/Sun Mon Tue Wed Thu Fri Sat/;
  my @month_names = qw/Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec/;

  my ($sec, $min, $hour, $mday, $mon, $year, $wday) = localtime($time || time);

  return sprintf("%s, %02d %s %4d %02d:%02d:%02d %s",
                 $day_names[$wday], $mday, $month_names[$mon], $year + 1900,
                 $hour, $min, $sec, local_tz());
}
###########################################################################
__DATA__
Quick test/demo. Given the following input structure:
src1/{ham,spam}/{1,2,3}
src2/{ham,spam}/{1,2}
src3/{ham,spam}/1
and this command:
../mk-corpus-link-farm \
-dest ./out1 -num 1 -dest ./out2 -num 2 -dest ./out3 -num 5 \
src*
we want:
out1/{ham,spam}/1
out2/{ham,spam}/{1,2}
out3/{ham,spam}/{1,2,3}
[and a warning that we exhausted the sources, because we actually
asked for 5 mails in each class of out3.]
test commands:
mkdir t_splitcorpus; cd t_splitcorpus; mkdir -p src{1,2,3}/{ham,spam}
for f in src1/{ham,spam}/{1,2,3} src2/{ham,spam}/{1,2} src3/{ham,spam}/1
do echo > $f ; done;
../mk-corpus-link-farm \
-dest ./out1 -num 1 -dest ./out2 -num 2 -dest ./out3 -num 5 \
src*
../mk-corpus-link-farm \
-dest ./out1 -num 1 -dest ./out2 -num 2 -dest ./out3 -num 5 \
src1/*.mbox src2 src3