| #!/usr/bin/env perl |
| |
| ############################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| ############################################################################### |
| # Tests for pig streaming. |
| # |
# This configuration file follows the Pig streaming functional spec:
# http://wiki.apache.org/pig/PigStreamingFunctionalSpec
| |
# Streaming test definitions for the Pig end-to-end harness.
#
# This block assigns the package variable $cfg (deliberately without `my`);
# presumably the harness loads this file and reads $cfg afterwards -- confirm
# with the harness before making it lexical.
#
# Layout: $cfg->{groups} is a list of groups.  Each group carries a name,
# group-wide result-comparison settings, and a 'tests' list.  Every test in
# this file has a 'num', an 'execonly' restriction, a Pig Latin script under
# 'pig', and an SQL query under 'sql' (presumably used to derive the expected
# output -- confirm).  Test 15 adds 'pig_params'; test 18 adds 'notmq'.
#
# Tokens such as :INPATH:, :OUTPATH:, :SCRIPTHOMEPATH: and :FUNCPATH: are kept
# verbatim here; presumably the harness substitutes them before running.
#
# Every Pig script is quoted with the non-interpolating q#...# operator, so
# '$0', '$_' and '$script_name' reach Pig literally rather than being
# expanded by Perl.
$cfg = {
    driver      => 'Pig',
    nummachines => 5,

    groups => [
        {
            # Every test in this group is restricted to local mode
            # (execonly => 'local').
            name             => 'StreamingLocal',
            sortBenchmark    => 1,
            sortResults      => 1,
            floatpostprocess => 1,
            # NOTE(review): this is a single space, whereas the GroupBy.pl
            # commands below are told the field separator is '\t'.  Confirm a
            # tab was not lost to whitespace normalization.
            delimiter        => ' ',
            tests            => [
                {
                    # Section 1.1: Perl script streamed inline, no parameters.
                    num      => 1,
                    execonly => 'local',
                    pig      => q#
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0, $1, $2;
C = stream B through `perl :SCRIPTHOMEPATH:/PigStreaming.pl`;
store C into ':OUTPATH:';#,
                    sql      => "select name, age, gpa from studenttab10k;",
                },
                {
                    # Section 1.3: command declared with DEFINE; the Perl
                    # script takes parameters.
                    num      => 2,
                    execonly => 'local',
                    pig      => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl - -`;
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0, $1, $2;
C = stream B through CMD;
store C into ':OUTPATH:';#,
                    sql      => "select name, age, gpa from studenttab10k;",
                },
                {
                    # Section 1.4: grouped data; GroupBy.pl is shipped
                    # explicitly via the ship() clause.
                    num      => 3,
                    execonly => 'local',
                    pig      => q#
define CMD `perl :SCRIPTHOMEPATH:/GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
A = load ':INPATH:/singlefile/studenttab10k';
B = group A by $0;
C = foreach B generate flatten(A);
D = stream C through CMD;
store D into ':OUTPATH:';#,
                    sql      => "select name, count(*) from studenttab10k group by name;",
                },
                {
                    # Section 1.4: grouped data, ordered inside each group
                    # before it is streamed.
                    num      => 4,
                    execonly => 'local',
                    pig      => q#
define CMD `perl :SCRIPTHOMEPATH:/GroupBy.pl '\t' 0 1`;
A = load ':INPATH:/singlefile/studenttab10k';
B = group A by $0;
C = foreach B {
D = order A by $1;
generate flatten(D);
};
E = stream C through CMD;
store E into ':OUTPATH:';#,
                    sql      => "select name, age, count(*) from studenttab10k group by name, age;",
                },
                {
                    # Section 1.5: two adjacent streaming operators before
                    # the local rearrange.
                    num      => 5,
                    execonly => 'local',
                    pig      => q#
register :FUNCPATH:/testudf.jar;
define CMD `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump);
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through `perl :SCRIPTHOMEPATH:/PigStreaming.pl`;
C = stream B through CMD as (name, age, gpa);
D = foreach C generate name, age;
store D into ':OUTPATH:';#,
                    sql      => "select name, age from studenttab10k;",
                },
                {
                    # Section 1.5: two non-adjacent streaming operators
                    # before the local rearrange.  CMD is defined twice in
                    # this script; the second DEFINE follows the first STREAM.
                    num      => 6,
                    execonly => 'local',
                    pig      => q#
register :FUNCPATH:/testudf.jar;
A = load ':INPATH:/singlefile/studenttab10k';
define CMD `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump);
B = stream A through CMD as (name, age, gpa);
C = filter B by age < '20';
D = foreach C generate name;
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl - - :SCRIPTHOMEPATH:/nameMap`;
E = stream D through CMD;
store E into ':OUTPATH:';#,
                    sql      => "select UPPER(name) from studenttab10k where age < '20';",
                },
                {
                    # Section 1.5: two adjacent streaming operators after
                    # the local rearrange.
                    num      => 7,
                    execonly => 'local',
                    pig      => q#
register :FUNCPATH:/testudf.jar;
define CMD1 `perl :SCRIPTHOMEPATH:/GroupBy.pl '\t' 0 1`;
define CMD2 `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump);
A = load ':INPATH:/singlefile/studenttab10k';
B = group A by $0;
C = foreach B {
D = order A by $1;
generate flatten(D);
};
E = stream C through CMD1;
F = stream E through CMD2;
store F into ':OUTPATH:';#,
                    sql      => "select name, age, count(*) from studenttab10k group by name, age;",
                },
                {
                    # Section 1.5: one streaming operator before and one
                    # after the local rearrange; the alias B is reused for
                    # the final stream.
                    num      => 8,
                    execonly => 'local',
                    pig      => q#
register :FUNCPATH:/testudf.jar;
define CMD1 `perl :SCRIPTHOMEPATH:/GroupBy.pl '\t' 0`;
define CMD2 `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump);
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD2;
C = group B by $0;
D = foreach C generate flatten(B);
B = stream D through CMD1;
store B into ':OUTPATH:';#,
                    sql      => "select name, count(*) from studenttab10k group by name;",
                },
                {
                    # Section 3.1: custom deserializer (output clause only).
                    num      => 9,
                    execonly => 'local',
                    pig      => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl` output(stdout using PigStreaming());
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
                    sql      => "select name, age, gpa from studenttab10k;",
                },
                {
                    # Section 3.1: custom serializer and custom deserializer.
                    num      => 10,
                    execonly => 'local',
                    pig      => q#
register :FUNCPATH:/testudf.jar;
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump) output(stdout using org.apache.pig.test.udf.streaming.DumpStreamer);
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD as (name, age, gpa);
C = foreach B generate name, age;
store C into ':OUTPATH:';#,
                    sql      => "select name, age from studenttab10k;",
                },
                {
                    # Section 3.3: the streaming application reads its input
                    # from a file ('foo') instead of stdin.
                    num      => 11,
                    execonly => 'local',
                    pig      => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl foo -` input('foo');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
                    sql      => "select name, age, gpa from studenttab10k;",
                },
                {
                    # Section 3.4: the streaming application writes its
                    # single output to a file ('foo').
                    num      => 12,
                    execonly => 'local',
                    pig      => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl - foo :SCRIPTHOMEPATH:/nameMap` output('foo' using PigStreaming);
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0;
C = stream B through CMD;
store C into ':OUTPATH:';#,
                    sql      => "select upper(name) from studenttab10k;",
                },
                {
                    # Section 3.4: the streaming application writes two
                    # outputs, both to files.
                    num      => 13,
                    execonly => 'local',
                    pig      => q#
register :FUNCPATH:/testudf.jar;
define CMD `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl - sio_5_1 sio_5_2` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump) output('sio_5_1', 'sio_5_2');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
                    sql      => "select name, age, gpa from studenttab10k;",
                },
                {
                    # Section 3.4: the streaming application writes two
                    # outputs, one to stdout and one to a file.
                    num      => 14,
                    execonly => 'local',
                    pig      => q#
register :FUNCPATH:/testudf.jar;
define CMD `perl :SCRIPTHOMEPATH:/PigStreamingDepend.pl - - sio_5_2` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump) output(stdout, 'sio_5_2');
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';#,
                    sql      => "select name, age, gpa from studenttab10k;",
                },
                {
                    # Section 4.3: integration with parameter substitution;
                    # $script_name in the script is supplied via pig_params.
                    num        => 15,
                    execonly   => 'local',
                    pig_params => ['-p', qq(script_name='PigStreaming.pl')],
                    pig        => q#
define CMD `perl :SCRIPTHOMEPATH:/$script_name - - :SCRIPTHOMEPATH:/nameMap`;
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0;
C = stream B through CMD as (name);
D = group C by name;
E = foreach D generate group, COUNT(C);
store E into ':OUTPATH:';#,
                    sql        => "select upper(name) as nm, count(*) from studenttab10k group by nm;",
                },
                {
                    # Section 5.1: load/store optimization.
                    num      => 16,
                    execonly => 'local',
                    pig      => q#
define CMD `perl :SCRIPTHOMEPATH:/PigStreaming.pl`;
A = load ':INPATH:/singlefile/studenttab10k';
C = stream A through CMD;
store C into ':OUTPATH:';#,
                    sql      => "select name, age, gpa from studenttab10k;",
                },
                {
                    # PIG-272: problem with optimization and intermediate
                    # store.  CMD1 echoes each input line to stdout and also
                    # writes a copy prefixed with "stderr " to stderr.
                    num      => 17,
                    execonly => 'local',
                    pig      => q#
define CMD1 `perl -ne 'print $_;print STDERR "stderr $_";'`;
define CMD2 `:SCRIPTHOMEPATH:/Split.pl 3` input(stdin using PigStreaming(','));
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD1;
C = stream B through CMD1;
D = stream C through CMD2;
store D into ':OUTPATH:';#,
                    sql      => "select name, age, gpa from studenttab10k;",
                },
                {
                    # PIG-272: problem with optimization and intermediate
                    # store.  B is stored to :OUTPATH:.intermediate and is
                    # also reused downstream in the join with D.
                    num      => 18,
                    execonly => 'local',
                    pig      => q#
define CMD1 `perl -ne 'print $_;'`;
define CMD2 `:SCRIPTHOMEPATH:/Split.pl 3` input(stdin using PigStreaming(','));
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD1;
store B into ':OUTPATH:.intermediate';
C = stream B through CMD1;
D = stream C through CMD2;
E = JOIN B by $0, D by $0;
store E into ':OUTPATH:';#,
                    notmq    => 1,
                    sql      => "select A.name, A.age, A.gpa, B.name, B.age, B.gpa from studenttab10k as A join studenttab10k as B using(name);",
                },
            ],
        },
    ],
};
| |