blob: 7e827f644cb7c12aad8b179d634552c39de8df15 [file] [log] [blame]
#!/usr/bin/env perl
############################################################################
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
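# Tests for Pig streaming, run in local mode.
#
# This configuration file follows the Pig streaming functional spec:
# http://wiki.apache.org/pig/PigStreamingFunctionalSpec
# The "Section N.N" labels on the individual tests presumably refer to
# sections of that spec.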
# Test configuration: a single hash reference assigned to the package
# variable $cfg (deliberately not a lexical, so code outside this file can
# read it). It carries a few harness-level settings plus one test group,
# 'StreamingLocal', whose 18 tests each pair a Pig Latin script ('pig') with
# a SQL query ('sql').
#
# NOTE(review): the SQL presumably produces the expected result that the Pig
# output is compared against -- confirm against the driver.
# NOTE(review): tokens such as :INPATH:, :OUTPATH:, :SCRIPTHOMEPATH: and
# :FUNCPATH: look like placeholders substituted by the harness before the
# script runs; the substitution itself is not visible in this file.
#
# The Pig scripts are written with q#...#, so Perl does not interpolate $0,
# $1, $_ or $script_name inside them, and a literal '#' must never be added
# inside a script body (it would terminate the string).
$cfg = {
# Harness-level settings; their exact meaning is defined by the driver.
'driver' => 'Pig',
'nummachines' => 5,
'groups' => [
{
# This group is for local mode testing
'name' => 'StreamingLocal',
# Group-wide options applied to every test below. Semantics live in the
# harness -- presumably they control sorting/normalisation of results before
# comparison; verify against the driver.
'sortBenchmark' => 1,
'sortResults' => 1,
'floatpostprocess' => 1,
'delimiter' => ' ',
'tests' => [
{
# Section 1.1: perl script, no parameters
# The streaming command is given inline in backticks (no define clause) and
# receives the first three fields of the input.
'num' => 1,
'execonly' => 'local',
'pig' => q#
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0, $1, $2;
C = stream B through `perl :SCRIPTHOMEPATH:/PigStreaming.pl`;
store C into ':OUTPATH:';#,
'sql' => "select name, age, gpa from studenttab10k;",
},
{
# Section 1.3: define clause; perl script, with parameters
# Same pipeline as test 1, but the command is declared with define and is
# passed two '-' arguments.
# NOTE(review): '-' presumably selects stdin/stdout in PigStreaming.pl (tests
# 11 and 12 replace one of them with a file name) -- confirm in the script.
'num' => 2,
'execonly' => 'local',
'pig' => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl - -`;
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0, $1, $2;
C = stream B through CMD;
store C into ':OUTPATH:';#,
'sql' => "select name, age, gpa from studenttab10k;",
},
{
# Section 1.4: grouped data
# Groups on the first field, flattens the bags back into rows and streams
# them through GroupBy.pl (arguments: '\t' 0). This is the only test in the
# file that uses a ship() clause. Expected output is one count per name.
'num' => 3,
'execonly' => 'local',
'pig' => q#
define CMD `perl :SCRIPTHOMEPATH:/GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
A = load ':INPATH:/singlefile/studenttab10k';
B = group A by $0;
C = foreach B generate flatten(A);
D = stream C through CMD;
store D into ':OUTPATH:';#,
'sql' => "select name, count(*) from studenttab10k group by name;",
},
{
# Section 1.4: grouped and ordered data
# Like test 3, but each group is ordered by the second field inside a nested
# foreach and GroupBy.pl gets the extra argument 1. Expected output is one
# count per (name, age) pair.
'num' => 4,
'execonly' => 'local',
'pig' => q#
define CMD `perl :SCRIPTHOMEPATH:/GroupBy.pl '\t' 0 1`;
A = load ':INPATH:/singlefile/studenttab10k';
B = group A by $0;
C = foreach B {
D = order A by $1;
generate flatten(D);
};
E = stream C through CMD;
store E into ':OUTPATH:';#,
'sql' => "select name, age, count(*) from studenttab10k group by name, age;",
},
{
# Section 1.5: multiple streaming operators - adjacent - before local rearrange
# Two stream operators back to back with no grouping in between. The second
# one names an explicit input serializer (StreamingDump) and declares an
# output schema so that fields can be projected by name afterwards.
'num' => 5,
'execonly' => 'local',
'pig' => q#
register :FUNCPATH:/testudf.jar;
define CMD `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump);
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through `perl :SCRIPTHOMEPATH:/PigStreaming.pl`;
C = stream B through CMD as (name, age, gpa);
D = foreach C generate name, age;
store D into ':OUTPATH:';#,
'sql' => "select name, age from studenttab10k;",
},
{
# Section 1.5: multiple streaming operators - not adjacent - before local rearrange
# The two stream operators are separated by a filter and a projection. Note
# that the alias CMD is defined twice; the second definition (PigStreaming.pl
# with nameMap) is the one in effect for the second stream. The age filter
# compares against the string '20' in both the Pig script and the SQL.
'num' => 6,
'execonly' => 'local',
'pig' => q#
register :FUNCPATH:/testudf.jar;
A = load ':INPATH:/singlefile/studenttab10k';
define CMD `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump);
B = stream A through CMD as (name, age, gpa);
C = filter B by age < '20';
D = foreach C generate name;
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl - - :SCRIPTHOMEPATH:/nameMap`;
E = stream D through CMD;
store E into ':OUTPATH:';#,
'sql' => "select UPPER(name) from studenttab10k where age < '20';",
},
{
# Section 1.5: multiple streaming operators - adjacent - after local rearrange
# Both stream operators come after the group/order step: first GroupBy.pl,
# then PigStreamingDepend.pl with the StreamingDump input serializer.
'num' => 7,
'execonly' => 'local',
'pig' => q#
register :FUNCPATH:/testudf.jar;
define CMD1 `perl :SCRIPTHOMEPATH:/GroupBy.pl '\t' 0 1`;
define CMD2 `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump);
A = load ':INPATH:/singlefile/studenttab10k';
B = group A by $0;
C = foreach B {
D = order A by $1;
generate flatten(D);
};
E = stream C through CMD1;
F = stream E through CMD2;
store F into ':OUTPATH:';#,
'sql' => "select name, age, count(*) from studenttab10k group by name, age;",
},
{
# Section 1.5: multiple streaming operators - one before and one after local rearrange
# same alias name
# CMD2 runs before the group and CMD1 after it; the relation alias B is
# assigned twice (once per stream operator) and the second B is stored.
'num' => 8,
'execonly' => 'local',
'pig' => q#
register :FUNCPATH:/testudf.jar;
define CMD1 `perl :SCRIPTHOMEPATH:/GroupBy.pl '\t' 0`;
define CMD2 `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump);
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD2;
C = group B by $0;
D = foreach C generate flatten(B);
B = stream D through CMD1;
store B into ':OUTPATH:';#,
'sql' => "select name, count(*) from studenttab10k group by name;",
},
{
# Section 3.1: use of custom deserializer
# The output clause names PigStreaming() explicitly as the stdout
# deserializer; no input clause is given.
'num' => 9,
'execonly' => 'local',
'pig' => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl` output(stdout using PigStreaming());
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
'sql' => "select name, age, gpa from studenttab10k;",
},
{
# Section 3.1: use of custom serializer and deserializer
# Both directions use classes from org.apache.pig.test.udf.streaming
# (StreamingDump for stdin, DumpStreamer for stdout); testudf.jar is
# registered first, presumably because it provides them.
'num' => 10,
'execonly' => 'local',
'pig' => q#
register :FUNCPATH:/testudf.jar;
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump) output(stdout using org.apache.pig.test.udf.streaming.DumpStreamer);
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD as (name, age, gpa);
C = foreach B generate name, age;
store C into ':OUTPATH:';#,
'sql' => "select name, age from studenttab10k;",
},
{
# Section 3.3: streaming application reads from file rather than stdin
# input('foo') is paired with 'foo' as the command's first argument (where
# the other tests pass '-').
'num' => 11,
'execonly' => 'local',
'pig' => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl foo -` input('foo');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
'sql' => "select name, age, gpa from studenttab10k;",
},
{
# Section 3.4: streaming application writes single output to a file
# output('foo' using PigStreaming) is paired with 'foo' as the command's
# second argument; nameMap is passed as the third. Only the first field is
# streamed, and the expected result is the upper-cased name.
'num' => 12,
'execonly' => 'local',
'pig' => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl - foo :SCRIPTHOMEPATH:/nameMap` output('foo' using PigStreaming);
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0;
C = stream B through CMD;
store C into ':OUTPATH:';#,
'sql' => "select upper(name) from studenttab10k;",
},
{
# Section 3.4: streaming application writes multiple outputs to file
# Two file outputs ('sio_5_1', 'sio_5_2') are declared and passed to the
# command as its second and third arguments.
'num' => 13,
'execonly' => 'local',
'pig' => q#
register :FUNCPATH:/testudf.jar;
define CMD `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl - sio_5_1 sio_5_2` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump) output('sio_5_1', 'sio_5_2');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
'sql' => "select name, age, gpa from studenttab10k;",
},
{
# Section 3.4: streaming application writes multiple outputs: 1 to file and 1 to stdout
# Same as test 13 except that the first declared output is stdout and only
# 'sio_5_2' is a file.
'num' => 14,
'execonly' => 'local',
'pig' => q#
register :FUNCPATH:/testudf.jar;
define CMD `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl - - sio_5_2` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump) output(stdout, 'sio_5_2');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
'sql' => "select name, age, gpa from studenttab10k;",
},
{
# Section 4.3: integration with parameter substitution
# 'pig_params' supplies -p script_name='PigStreaming.pl'; the script refers to
# it as $script_name. Because the script is a q#...# literal, Perl leaves
# $script_name untouched, so the substitution has to happen on the Pig side.
'num' => 15,
'execonly' => 'local',
'pig_params' => ['-p', qq(script_name='PigStreaming.pl')],
'pig' => q#
define CMD `perl :SCRIPTHOMEPATH:/$script_name - - :SCRIPTHOMEPATH:/nameMap`;
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0;
C = stream B through CMD as (name);
D = group C by name;
E = foreach D generate group, COUNT(C);
store E into ':OUTPATH:';#,
'sql' => "select upper(name) as nm, count(*) from studenttab10k group by nm;",
},
{
# Section 5.1: load/store optimization
# The stream operator sits directly between load and store with no other
# operator in between.
'num' => 16,
'execonly' => 'local',
'pig' => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl`;
A = load ':INPATH:/singlefile/studenttab10k';
C = stream A through CMD;
store C into ':OUTPATH:';#,
'sql' => "select name, age, gpa from studenttab10k;",
},
{
# PIG-272: problem with optimization and intermediate store
# CMD1 echoes every input line to stdout and also writes a copy to STDERR; it
# is applied twice before Split.pl (argument 3, comma-delimited input).
# NOTE(review): unlike test 18, this script contains no intermediate store,
# so the heading above may have been copied from test 18 -- confirm intent.
'num' => 17,
'execonly' => 'local',
'pig' => q#
define CMD1 `perl -ne 'print $_;print STDERR "stderr $_";'`;
define CMD2 `:SCRIPTHOMEPATH:/Split.pl 3` input(stdin using PigStreaming(','));
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD1;
C = stream B through CMD1;
D = stream C through CMD2;
store D into ':OUTPATH:';#,
'sql' => "select name, age, gpa from studenttab10k;",
},
{
# PIG-272: problem with optimization and intermediate store
# B is stored to ':OUTPATH:.intermediate' and then used again: it is streamed
# further to produce D and finally joined with D on the first field, so the
# script contains two store statements.
# NOTE(review): 'notmq' is a harness flag whose meaning is not visible in
# this file -- check the driver before changing or copying it.
'num' => 18,
'execonly' => 'local',
'pig' => q#
define CMD1 `perl -ne 'print $_;'`;
define CMD2 `:SCRIPTHOMEPATH:/Split.pl 3` input(stdin using PigStreaming(','));
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD1;
store B into ':OUTPATH:.intermediate';
C = stream B through CMD1;
D = stream C through CMD2;
E = JOIN B by $0, D by $0;
store E into ':OUTPATH:';#,
'notmq' => 1,
'sql' => "select A.name, A.age, A.gpa, B.name, B.age, B.gpa from studenttab10k as A join studenttab10k as B using(name);",
},
]
},
]
}
;