blob: 934d983b2c0471fedc8933dc0adccbdd0c340087 [file] [log] [blame]
#!/usr/bin/env perl
############################################################################
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################
# SUB: Multiquery
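# Tests for Pig's multi-query optimization: scripts with multiple stores,
# implicit and explicit splits, streaming, unions and self joins that share
# a common input.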
# MultiQuery_MapSplitee
# - _TEST_ The first example; one that is defined in the bug with one split
# - in the map phase
# - _TEST_ Multiple side files, all in map phase.
# - _TEST_ Two loads and two stores in map phase.
# - _TEST_ One split added in reduce phase and map-only splitee.
# - _TEST_ Pig-976: Multi-query optimization throws ClassCastException
# - _TEST_ Pig-976: Multi-query optimization throws ClassCastException
# - _TEST_ Pig-976: Multi-query optimization throws ClassCastException
# MultiQuery_MapReduceSplitee
# - _TEST_ One split added in reduce phase and one map-reduce splitee
# - _TEST_ One split in reduce phase and two Map-Reduce splitees.
# - _TEST_ Two loads and two stores in reduce phase
# - _TEST_ Implicit split with multiple side files.
# - _TEST_ Script with intermediate stores.
# - _TEST_ Implicit split with order by and multiple side files.
# - _TEST_ Self join using fragment replicate join with multiple side files.
# - _TEST_ One split in map phase and two Map-Reduce splitees with mixed combiners.
# - _TEST_ One split in map phase and two Map-Reduce splitees without combiners.
# - _TEST_ Pig-983: multi-query optimization on multiple group bys following a join or cogroup
# MultiQuery_ExplicitSplit
# - _TEST_ Explicit split with two side files.
# - _TEST_ Explicit split with order by and two side files.
# - _TEST_ Splittees with different map key types and nested splits.
# - _TEST_ Splittees with different map key type.
# - _TEST_ PigMix Test Case L12.
# - _TEST_ PigMix Test Case L12 version 2
# - _TEST_ PigMix Test Case L12 version 3 (modified to have different map key types in inner split)
# MultiQuery_Streaming
# - _TEST_ Streaming with multiple stores.
# - _TEST_ Streaming in demux.
# - _TEST_ Streaming in nested demux.
# MultiQuery_Union (see also the Union tests in nightly.conf)
# - _TEST_ Multiple levels of union with join
# - _TEST_ Union with replicate join left table part of split
# - _TEST_ Union with replicate join right table part of split
# - _TEST_ Union with skewed join left table part of split
# - _TEST_ Union with skewed join right table part of split
# - _TEST_ Union with group by + combiner
# - _TEST_ Union with group by + secondary key partitioner
# - _TEST_ Union with order by
# MultiQuery_Self
# - _TEST_ Self cross
# - _TEST_ Self cogroup
# - _TEST_ Three way join (two self)
# - _TEST_ Self replicate join
# - _TEST_ Self skewed join
# Multi-query test configuration.
#
# $cfg is a hash with:
#   'driver'      - the test driver name ('Pig'),
#   'nummachines' - a machine count,
#   'groups'      - a list of test groups, each with a 'name' and a list of
#                   'tests'.
# Each test has:
#   'num' - its number within the group,
#   'pig' - a Pig Latin script,
#   'sql' - (for some tests) a SQL query producing the expected output of
#           each store, in store order.
#
# Placeholders such as :INPATH:, :OUTPATH: and :SCRIPTHOMEPATH: are
# presumably substituted by the test driver before running the script
# (confirm against the harness).
#
# Scripts are quoted with q#...# or q\...\ so they can contain both quote
# characters. Consequently a script must not contain its own delimiter
# character ('#' or '\').
#
# $cfg is assigned without 'my', so it stays a package global; presumably
# the harness reads it after loading this file.
$cfg = {
'driver' => 'Pig',
'nummachines' => 5,
'groups' => [
{
'name' => 'MultiQuery_MapSplitee',
'floatpostprocess' => 1,
'delimiter' => ' ',
'tests' => [
{
# The first example; one that is defined in the bug with one split
# in the map phase
'num' => 1,
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float);
b = filter a by age < 22; store b into ':OUTPATH:.1';
c = group b by age;
d = foreach c generate group, SUM(b.gpa);
store d into ':OUTPATH:.2'; #,
'sql' => "select name, age, gpa from studenttab10k where age < 22;
select age, sum(gpa) from studenttab10k where age < 22 group by age;",
},
{
# Multiple side files, all in map phase.
'num' => 2,
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float);
b = filter a by age < 22;
store b into ':OUTPATH:.1';
c = filter b by gpa > 3.0;
store c into ':OUTPATH:.2';
d = filter c by name < 'm';
store d into ':OUTPATH:.3'; #,
'sql' => "select name, age, gpa from studenttab10k where age < 22;
select name, age, gpa from studenttab10k where age < 22 and gpa > 3.0;
select name, age, gpa from studenttab10k where age < 22 and gpa > 3.0 and name < 'm';",
},
{
# Two loads and two stores in map phase.
'num' => 3,
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
c = filter a by age < 20;
d = filter b by age < 20;
store c into ':OUTPATH:.1';
store d into ':OUTPATH:.2';
e = cogroup c by name, d by name;
f = foreach e generate flatten(c), flatten(d);
store f into ':OUTPATH:.3'; #,
'sql' => "select name, age, gpa from studenttab10k where age < 20;
select name, age, registration, contributions from votertab10k where age < 20;
select a.name, a.age, a.gpa, b.name, b.age, b.registration, b.contributions
from studenttab10k as a full outer join votertab10k as b using(name)
where a.age < 20 and b.age < 20;",
},
{
# One split added in reduce phase and map-only splitee.
'num' => 4,
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float);
b = filter a by gpa < 3.0;
c = group b by age;
d = foreach c generate group, AVG(b.gpa);
store d into ':OUTPATH:.1';
e = filter d by $1 > 1.5;
store e into ':OUTPATH:.2'; #,
'sql' => "select age, avg(gpa) from studenttab10k where gpa < 3.0 group by age;
select age, avg(gpa) from studenttab10k where gpa < 3.0 group by age having avg(gpa) > 1.5;",
},
# Pig-976: Multi-query optimization throws ClassCastException
{
'num' => 5,
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
b = group a by name;
c = group a by age;
d = foreach b generate MAX(a.age);
e = foreach c generate group, SUM(a.gpa);
store d into ':OUTPATH:.1';
store e into ':OUTPATH:.2'; #,
'sql' => "select max(age) from studenttab10k group by name;
select age, sum(gpa) from studenttab10k group by age;",
},
# Pig-976: Multi-query optimization throws ClassCastException
{
'num' => 6,
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
b = group a all;
c = group a by age;
d = foreach b generate COUNT(a), MAX(a.age);
e = foreach c generate group, SUM(a.gpa);
store d into ':OUTPATH:.1';
store e into ':OUTPATH:.2'; #,
'sql' => "select count(*), max(age) from studenttab10k;
select age, sum(gpa) from studenttab10k group by age;",
},
# Pig-976: Multi-query optimization throws ClassCastException
{
'num' => 7,
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
b = group a by name;
c = group a by age;
d = foreach b generate MAX(a.age), group;
e = foreach c generate group, SUM(a.gpa);
store d into ':OUTPATH:.1';
store e into ':OUTPATH:.2'; #,
'sql' => "select max(age), name from studenttab10k group by name;
select age, sum(gpa) from studenttab10k group by age;",
},
]
},
{
'name' => 'MultiQuery_MapReduceSplitee',
'floatpostprocess' => 1,
'delimiter' => ' ',
'tests' => [
{
# One split added in reduce phase and one map-reduce splitee
'num' => 1,
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float);
b = filter a by gpa < 3.0;
c = group b by age;
d = foreach c generate group, AVG(b.gpa);
store d into ':OUTPATH:.1';
e = filter d by $1 > 1.5;
f = group e by $1;
g = foreach f generate group, SUM(e.$0);
store g into ':OUTPATH:.2'; #,
'sql' => "select age, avg(gpa) from studenttab10k where gpa < 3.0 group by age;
select t.b, sum(t.a) from (select age as a, avg(gpa) as b from studenttab10k
where gpa < 3.0 group by age having avg(gpa) > 1.5) as t group by t.b;",
},
{
# One split in reduce phase and two Map-Reduce splitees.
'num' => 2,
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float);
b = filter a by gpa < 3.0;
c = group b by age;
d = foreach c generate group, AVG(b.gpa);
e = filter d by $1 > 1.5;
e1= group e by $1;
e2 = foreach e1 generate group, SUM(e.$0);
store e2 into ':OUTPATH:.1';
f = filter d by $1 <= 1.5;
f1 = group f by $1;
f2 = foreach f1 generate group, COUNT(f.$0);
store f2 into ':OUTPATH:.2'; #,
'sql' => "select t.c1, sum(t.c0) from (select age as c0, avg(gpa) as c1 from studenttab10k
where gpa < 3.0 group by age having avg(gpa) > 1.5) as t group by t.c1;
select t.c1, count(t.c0) from (select age as c0, avg(gpa) as c1 from studenttab10k
where gpa < 3.0 group by age having avg(gpa) <= 1.5) as t group by t.c1;",
},
{
# Two loads and two stores in reduce phase
'num' => 3,
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
c = filter a by age < 20;
d = filter b by age < 20;
e = cogroup c by name, d by name;
f = foreach e generate flatten(c), flatten(d);
g = group f by d::age;
h = foreach g generate group, SUM(f.gpa);
store h into ':OUTPATH:.1';
e = filter f by c::gpa < 3.0;
store e into ':OUTPATH:.2'; #,
'sql' => "select c5, sum(c3) from (select a.name as c1, a.age as c2, a.gpa as c3, b.name as c4,
b.age as c5, b.registration as c6, b.contributions as c7
from studenttab10k as a full outer join votertab10k as b using(name)
where a.age < 20 and b.age < 20) as t group by t.c5;
select a.name, a.age, a.gpa, b.name, b.age, b.registration, b.contributions
from studenttab10k as a full outer join votertab10k as b using(name)
where a.age < 20 and b.age < 20 and a.gpa < 3.0;",
},
{
# Implicit split with multiple side files.
'num'=> 4,
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
b = filter a by age > 50;
c = filter a by gpa > 3.0;
store c into ':OUTPATH:.1';
d = cogroup b by name, c by name;
e = foreach d generate flatten(b), flatten(c);
store e into ':OUTPATH:.2';
f = filter e by b::age < 75;
store f into ':OUTPATH:.3'; #,
'sql' => "select name, age, gpa from studenttab10k where gpa > 3.0;
select A.name, A.age, A.gpa, B.name, B.age, B.gpa
from (select * from studenttab10k where age > 50) as A
join (select * from studenttab10k where gpa > 3.0) as B using (name);
select A.name, A.age, A.gpa, B.name, B.age, B.gpa
from (select * from studenttab10k where age > 50) as A
join (select * from studenttab10k where gpa > 3.0) as B using (name) where A.age < 75;",
},
{
# With intermediate store
'num' => 5,
'pig' => q# A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
store A into ':OUTPATH:.1';
B = load ':OUTPATH:.1';
store B into ':OUTPATH:.2'; #,
'sql' => "select name, age, gpa from studenttab10k;
select name, age, gpa from studenttab10k;",
},
{
# Implicit split with order by and multiple side files.
'num'=> 6,
# Output .2 is the ordered relation f (the SQL expects "order by A.name").
# Storing e here, as this test previously did, left the order by as dead code.
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
b = filter a by age > 50;
c = filter a by gpa > 3.0;
store c into ':OUTPATH:.1';
d = cogroup b by name, c by name;
e = foreach d generate flatten(b), flatten(c);
f = order e by b::name;
store f into ':OUTPATH:.2';
f = filter e by b::age < 75;
store f into ':OUTPATH:.3'; #,
'sql' => "select name, age, gpa from studenttab10k where gpa > 3.0;
select A.name, A.age, A.gpa, B.name, B.age, B.gpa
from (select * from studenttab10k where age > 50) as A
join (select * from studenttab10k where gpa > 3.0) as B using (name)
order by A.name;
select A.name, A.age, A.gpa, B.name, B.age, B.gpa
from (select * from studenttab10k where age > 50) as A
join (select * from studenttab10k where gpa > 3.0) as B using (name)
where A.age < 75 order by A.name;",
},
# Self join using fragment replicate join with multiple side files
{
'num' => 7,
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
b = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
c = filter a by age > 50;
store c into ':OUTPATH:.1';
d = filter b by gpa > 3.0;
store d into ':OUTPATH:.2';
e = join c by gpa, d by gpa using 'repl';
store e into ':OUTPATH:.3'; #,
'sql' => "select name, age, gpa from studenttab10k where age > 50;
select name, age, gpa from studenttab10k where gpa > 3.0;
select a.name, a.age, a.gpa, b.name, b.age, b.gpa
from studenttab10k as a join studenttab10k as b using(gpa)
where a.age > 50 and b.gpa > 3.0;",
},
# One split in map phase and two Map-Reduce splitees with mixed combiner.
{
'num' => 8,
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float);
b = filter a by gpa < 3.0;
c = filter a by gpa >= 3.0;
d = group b by age;
e = foreach d generate group, AVG(b.gpa);
store e into ':OUTPATH:.1';
f = group c by age;
g = foreach f generate group, MAX(c.gpa) - MIN(c.gpa);
store g into ':OUTPATH:.2'; #,
'sql' => "select age, avg(gpa) from studenttab10k where gpa < 3.0 group by age;
select age, max(gpa) - min(gpa) from studenttab10k where gpa >= 3.0 group by age;",
},
# One split in map phase and two Map-Reduce splitees without combiner.
{
'num' => 9,
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float);
b = filter a by gpa < 3.0;
c = filter a by gpa >= 3.0;
d = group b by age;
e = foreach d generate group, MAX(b.gpa) + MIN(b.gpa);
store e into ':OUTPATH:.1';
f = group c by age;
g = foreach f generate group, MAX(c.gpa) - MIN(c.gpa);
store g into ':OUTPATH:.2'; #,
'sql' => "select age, max(gpa) + min(gpa) from studenttab10k where gpa < 3.0 group by age;
select age, max(gpa) - min(gpa) from studenttab10k where gpa >= 3.0 group by age;",
},
# Pig-983: multi-query optimization on multiple group bys following a join or cogroup
{
'num' => 10,
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
b = load ':INPATH:/singlefile/votertab10k' as (name:chararray, age:int, registration, contributions:double);
c = join a by name, b by name;
d = group c by a::age;
e = group c by b::age;
d1 = foreach d generate group, COUNT(c), MAX(c.a::gpa);
e1 = foreach e generate group, SUM(c.b::contributions);
store d1 into ':OUTPATH:.1';
store e1 into ':OUTPATH:.2'; #,
'sql' => "select a.age, count(*), max(a.gpa) from studenttab10k as a inner join votertab10k as b on (a.name = b.name) group by a.age;
select b.age, sum(b.contributions) from studenttab10k as a inner join votertab10k as b on (a.name = b.name) group by b.age;",
},
]
},
{
'name' => 'MultiQuery_ExplicitSplit',
'floatpostprocess' => 1,
'delimiter' => ' ',
'tests' => [
{
# Explicit split with two side files.
'num'=> 1,
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
split a into a1 if name > 'm', a2 if name <= 'm';
store a1 into ':OUTPATH:.1';
store a2 into ':OUTPATH:.2';
b = cogroup a1 by age, a2 by age;
c = foreach b generate flatten(a1), flatten(a2);
store c into ':OUTPATH:.3'; #,
'sql' => "select name, age, gpa from studenttab10k where name > 'm';
select name, age, gpa from studenttab10k where name <= 'm';
select A.name, A.age, A.gpa, B.name, B.age, B.gpa
from (select * from studenttab10k where name > 'm') as A
join (select * from studenttab10k where name <= 'm') as B using (age);",
},
{
# Explicit split with order by and two side files.
'num'=> 2,
# NOTE(review): the third SQL query below is not equivalent to the cogroup.
# It is an inner join, so its counts are products and keys present on only
# one side are dropped. Presumably that is why 'verify_with_pig' is set, so
# the output is checked against an older Pig run instead (confirm).
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
split a into a1 if age > 50, a2 if name < 'm';
b2 = distinct a2;
b1 = order a1 by name;
store b2 into ':OUTPATH:.2';
store b1 into ':OUTPATH:.1';
c = cogroup b2 by name, b1 by name;
d = foreach c generate flatten(group), COUNT($1), COUNT($2);
store d into ':OUTPATH:.3'; #,
'sql' => "select name, age, gpa from studenttab10k where age > 50 order by name;
select distinct name, age, gpa from studenttab10k where name < 'm';
select name, count(A.name), count(B.name)
from (select distinct name from studenttab10k where name < 'm') as A
join (select name from studenttab10k where age > 50) as B using (name) group by name;",
'verify_with_pig' => 1,
'verify_pig_version' => 'old',
},
# Splittees with different map key types and nested splits
{
'num' => 3,
'pig' => q# a = load ':INPATH:/singlefile/votertab10k' as (name: chararray, age:int, registration, contributions:double);
b = foreach a generate name, age, contributions;
split b into c1 if age > 10, c2 if age <= 60;
split c1 into d1 if name < 'y', d2 if name >= 'c';
e = group c2 by name parallel 2;
e1 = foreach e generate group, SUM(c2.contributions);
store e1 into ':OUTPATH:.1';
f = group d1 by name parallel 3;
f1 = foreach f generate group, MAX(d1.contributions);
store f1 into ':OUTPATH:.2';
g = group d2 by age parallel 4;
g1 = foreach g generate group, COUNT(d2);
store g1 into ':OUTPATH:.3'; #,
'sql' => "select name, sum(contributions) from votertab10k where age <= 60 group by name;
select name, max(contributions) from votertab10k where (age > 10 and name < 'y') group by name;
select age, count(*) from votertab10k where (age > 10 and name >= 'c') group by age;",
},
# Splittees with different map key types
{
'num' => 4,
'pig' => q# a = load ':INPATH:/singlefile/votertab10k' as (name: chararray, age:int, registration, contributions:double);
b = foreach a generate name, age, contributions;
split b into c1 if age > 50, c2 if age <= 50;
e = group c2 by name;
e1 = foreach e generate group, SUM(c2.contributions);
store e1 into ':OUTPATH:.1';
f = group c1 by age;
f1 = foreach f generate group, MAX(c1.contributions);
store f1 into ':OUTPATH:.2'; #,
'sql' => "select name, sum(contributions) from votertab10k where age <= 50 group by name;
select age, max(contributions) from votertab10k where age > 50 group by age;",
},
# PigMix Test Case L12
{
'num' => 5,
'pig' => q# a = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
b = foreach a generate name, age, contributions;
split b into c1 if age > 50, c2 if age <= 50;
split c1 into d1 if name < 'm', d2 if name >= 'm';
e = group c2 by name;
e1 = foreach e generate group, SUM(c2.contributions);
store e1 into ':OUTPATH:.1';
f = group d1 by name;
f1 = foreach f generate group, MAX(d1.contributions);
store f1 into ':OUTPATH:.2';
g = group d2 by name;
g1 = foreach g generate group, COUNT(d2);
store g1 into ':OUTPATH:.3'; #,
'sql' => "select name, sum(contributions) from votertab10k where age <= 50 group by name;
select name, max(contributions) from votertab10k where (age > 50 and name < 'm') group by name;
select name, count(*) from votertab10k where (age > 50 and name >= 'm') group by name;",
},
# PigMix Test Case L12 version 2
{
'num' => 6,
'pig' => q# a = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
b = foreach a generate name, age, contributions;
split b into c1 if age > 50, c2 if age <= 50;
split c1 into d1 if name < 'm', d2 if name >= 'm';
e = group c2 by (name, age);
e1 = foreach e generate flatten(group), SUM(c2.contributions);
store e1 into ':OUTPATH:.1';
f = group d1 by (name, age);
f1 = foreach f generate flatten(group), MAX(d1.contributions);
store f1 into ':OUTPATH:.2';
g = group d2 by (name, age);
g1 = foreach g generate flatten(group), COUNT(d2);
store g1 into ':OUTPATH:.3'; #,
'sql' => "select name, age, sum(contributions) from votertab10k where age <= 50 group by name, age;
select name, age, max(contributions) from votertab10k where (age > 50 and name < 'm') group by name, age;
select name, age, count(*) from votertab10k where (age > 50 and name >= 'm') group by name, age;",
},
# PigMix Test Case L12 version 3 (modified to have different map key types in inner split)
{
'num' => 7,
'pig' => q# a = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
b = foreach a generate name, age, contributions;
split b into c1 if age > 50, c2 if age <= 50;
split c1 into d1 if name < 'm', d2 if name >= 'm';
f = group d1 by name;
f1 = foreach f generate flatten(group), MAX(d1.contributions);
store f1 into ':OUTPATH:.1';
g = group d2 by (name, age);
g1 = foreach g generate flatten(group), COUNT(d2);
store g1 into ':OUTPATH:.2';
e = group c2 by (name, age);
e1 = foreach e generate flatten(group), SUM(c2.contributions);
store e1 into ':OUTPATH:.3'; #,
'sql' => "select name, max(contributions) from votertab10k where (age > 50 and name < 'm') group by name;
select name, age, count(*) from votertab10k where (age > 50 and name >= 'm') group by name, age;
select name, age, sum(contributions) from votertab10k where age <= 50 group by name, age;",
},
]
},
{
'name' => 'MultiQuery_Streaming',
'floatpostprocess' => 1,
'delimiter' => ' ',
'tests' => [
{
# Streaming with multiple stores
'num' => 1,
'pig' => q# define CMD1 `perl -ne 'print $_;'`;
define CMD2 `perl -ne 'print $_;'`;
A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
B = stream A through CMD1 as (name, age, gpa);
store B into ':OUTPATH:.1';
C = stream B through CMD2 as (name, age, gpa);
D = JOIN B by name, C by name;
store D into ':OUTPATH:.2'; #,
'pig_win' => q# define CMD1 `perl -ne "print $_;"`;
define CMD2 `perl -ne "print $_;"`;
A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
B = stream A through CMD1 as (name, age, gpa);
store B into ':OUTPATH:.1';
C = stream B through CMD2 as (name, age, gpa);
D = JOIN B by name, C by name;
store D into ':OUTPATH:.2'; #,
'sql' => "select name, age, gpa from studenttab10k;
select A.name, A.age, A.gpa, B.name, B.age, B.gpa
from studenttab10k as A join studenttab10k as B using(name);",
},
# Streaming in demux
{
'num' => 2,
'execonly' => 'mapred,tez',
'pig' => q#
define CMD `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
A = load ':INPATH:/singlefile/studenttab10k';
split A into A1 if $0 < 'm', A2 if $0 >= 'm';
B = group A1 by $0;
C = foreach B generate flatten(A1);
D = stream C through CMD;
store D into ':OUTPATH:.1';
E = group A2 by $0;
F = foreach E generate group, COUNT(A2);
store F into ':OUTPATH:.2';#,
'sql' => "select name, count(*) from studenttab10k where name < 'm' group by name;
select name, count(*) from studenttab10k where name >= 'm' group by name;",
},
# Streaming in nested demux
{
'num' => 3,
'execonly' => 'mapred,tez',
'pig' => q#
define CMD `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
A = load ':INPATH:/singlefile/studenttab10k';
split A into A1 if $0 < 'm', A2 if $0 >= 'm';
split A1 into A3 if $1 < 30, A4 if $1 >= 30;
B = group A3 by $0;
C = foreach B generate flatten(A3);
D = stream C through CMD;
store D into ':OUTPATH:.1';
E = group A2 by $0;
F = foreach E generate group, COUNT(A2);
store F into ':OUTPATH:.2';
G = group A4 by $0;
H = foreach G generate group, COUNT(A4);
store H into ':OUTPATH:.3';#,
'sql' => "select name, count(*) from studenttab10k where name < 'm' and age < 30 group by name;
select name, count(*) from studenttab10k where name >= 'm' group by name;
select name, count(*) from studenttab10k where name < 'm' and age >= 30 group by name;",
},
] # end of tests
},
{
'name' => 'MultiQuery_Union',
'tests' => [
{
# Union + Groupby + Combiner
'num' => 1,
'floatpostprocess' => 1,
'java_params' => ['-Dpig.exec.mapPartAgg=false'],
'delimiter' => ' ',
'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
a1 = filter a by gpa >= 3.9;
a2 = filter a by gpa < 2;
c = union a1, a2;
d = group c by name;
e = foreach d generate group, SUM(c.age);
store e into ':OUTPATH:';\,
},
{
# Union + Groupby + Combiner + POPartialAgg
'num' => 2,
'floatpostprocess' => 1,
'java_params' => ['-Dpig.exec.mapPartAgg=true'],
'delimiter' => ' ',
'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
a1 = filter a by gpa >= 3.9;
a2 = filter a by gpa < 2;
c = union a1, a2;
d = group c by name;
e = foreach d generate group, SUM(c.age);
store e into ':OUTPATH:';\,
},
{
# Union + Replicate Join left outer + Stream + Group by + Secondary Key Partitioner
'num' => 3,
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa:float);
a1 = filter a by gpa is null or gpa >= 3.9;
a2 = filter a by gpa < 2;
b = union a1, a2;
c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
d = join b by name left outer, c by name using 'replicated';
e = stream d through `cat` as (name, age, gpa, name1, age1, registration, contributions);
f = foreach e generate name, age, gpa, registration, contributions;
g = group f by name;
g1 = group f by name; -- Two separate groupbys to ensure secondary key partitioner
h = foreach g {
inner1 = order f by age, gpa, registration, contributions;
inner2 = limit inner1 1;
generate inner2, SUM(f.age); };
i = foreach g1 {
inner1 = order f by age asc, gpa desc, registration asc, contributions desc;
inner2 = limit inner1 1;
generate inner2, SUM(f.age); };
store h into ':OUTPATH:.1';
store i into ':OUTPATH:.2';\,
},
{
# Union + Replicate Join inner + Order by
'num' => 4,
# NOTE(review): alias b1 is defined but never used (b2 is derived from b,
# not b1). Confirm whether b2 was meant to read from b1. Changing it would
# also change the expected output.
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa:float);
a1 = filter a by gpa is null or gpa >= 3.9;
a2 = filter a by gpa < 1;
b = union a1, a2;
b1 = filter b by age < 30;
b2 = foreach b generate name, age, FLOOR(gpa) as gpa;
c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
d = join b2 by name, c by name using 'replicated';
e = foreach d generate b2::name as name, b2::age as age, gpa, registration, contributions;
f = order e by name, age DESC;
store f into ':OUTPATH:';\,
'sortArgs' => ['-t', ' ', '-k', '1,1', '-k', '2,2nr'],
},
{
# Union + Replicate Join right input
'num' => 5,
# NOTE(review): unlike the other union tests, a1 uses "gpa <= 3.9", so every
# row of a2 ("gpa < 2") is also in a1 and the union contains duplicates.
# Confirm this is intentional rather than a typo for ">= 3.9".
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
a1 = filter a by gpa is null or gpa <= 3.9;
a2 = filter a by gpa < 2;
b = union a1, a2;
c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
d = join c by name, b by name using 'replicated';
store d into ':OUTPATH:';\,
},
{
# Union + Left outer Join
'num' => 6,
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
a1 = filter a by gpa is null or gpa >= 3.9;
a2 = filter a by gpa < 1;
b = union a1, a2;
c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
d = join b by name left outer, c by name;
e = foreach d generate b::name as name, b::age as age, gpa, registration, contributions;
store e into ':OUTPATH:';\,
},
{
# Multiple levels of union + Skewed join Right outer
'num' => 7,
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
b = filter a by gpa >= 3.9;
b1 = foreach b generate *;
b2 = foreach b generate *;
b3 = union onschema b1, b2;
c = filter a by gpa < 2;
c1 = foreach c generate *;
c2 = foreach c generate *;
c3 = union onschema c1, c2;
a1 = union onschema b3, c3;
store a1 into ':OUTPATH:.1';
d = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
e = join a1 by name right outer, d by name using 'skewed' PARALLEL 3;
store e into ':OUTPATH:.2';\,
},
{
# Union + Skewed Join right input
'num' => 8,
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
a1 = filter a by gpa >= 3.9;
a2 = filter a by gpa < 2;
b = union a1, a2;
c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
d = join c by name, b by name using 'skewed' PARALLEL 3;
store d into ':OUTPATH:';\,
},
{
# Union + CROSS
'num' => 9,
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
a1 = filter a by gpa == 0.00;
a2 = filter a by gpa == 4.00;
b = union a1, a2;
c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
d = CROSS b, c;
store d into ':OUTPATH:';\,
},
{
# Union + Rank
'num' => 10,
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
a1 = filter a by gpa is null or gpa >= 3.9;
a2 = filter a by gpa < 1;
b = union a1, a2;
c = rank b;
-- Ordering is not guaranteed with union and ranking will differ. So just test rank and column separately
d = foreach c generate $0;
e = foreach c generate $1, $2, $3;
store d into ':OUTPATH:.1';
store e into ':OUTPATH:.2';\,
},
{
# Union + Rank dense
'num' => 11,
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
a1 = filter a by gpa is null or gpa >= 3.9;
a2 = filter a by gpa < 1;
b = union a1, a2;
c = rank b by name ASC, age DESC DENSE;
store c into ':OUTPATH:';\,
},
] # end of tests
},
{
'name' => 'MultiQuery_Self',
'tests' => [
# Self cross
{
'num' => 1,
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
b = filter a by gpa >= 3.9;
c = filter a by gpa <= 0.5;
d = filter a by gpa >= 3.5 and gpa < 3.9;
e = filter a by gpa > 0.5 and gpa < 1;
f = CROSS b, c PARALLEL 3;
g = CROSS d, e PARALLEL 4;
store f into ':OUTPATH:.1';
store g into ':OUTPATH:.2';\,
},
{
# Self cogroup
'num' => 2,
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
b = filter a by gpa >= 3;
c = filter a by gpa < 2;
d = cogroup c by name, b by name;
e = foreach d generate flatten(c), flatten(b);
store e into ':OUTPATH:';\,
},
{
# Three way join (two self)
'num' => 3,
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
b = filter a by gpa >= 3;
c = filter a by gpa < 2;
d = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
e = join b by name, c by name, d by name PARALLEL 2;
store e into ':OUTPATH:';\,
},
{
# Self join replicated
'num' => 4,
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
b = filter a by gpa >= 3;
c = filter a by gpa < 2;
d = join c by name, b by name using 'replicated';
store d into ':OUTPATH:';\,
},
{
# Self join skewed
'num' => 5,
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
b = filter a by gpa >= 3;
c = filter a by gpa < 2;
d = join c by name, b by name using 'skewed' PARALLEL 2;
store d into ':OUTPATH:';\,
},
{
# Self join left outer
'num' => 6,
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
b = filter a by gpa >= 3;
c = filter a by gpa < 2;
d = join c by name left outer, b by name PARALLEL 2;
store d into ':OUTPATH:';\,
},
{
# Self join right outer
'num' => 7,
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
b = filter a by gpa >= 3;
c = filter a by gpa < 2;
d = join c by name right outer, b by name PARALLEL 2;
store d into ':OUTPATH:';\,
},
{
# Self join full outer
'num' => 8,
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
b = filter a by gpa >= 3;
c = filter a by gpa < 2;
d = join c by name full outer, b by name PARALLEL 2;
store d into ':OUTPATH:';\,
},
{
# Self join union replicated
'num' => 9,
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
a1 = filter a by gpa == 0.00;
a2 = filter a by gpa == 4.00;
b = union a1, a2;
c = JOIN a by name, b by name using 'replicated';
store c into ':OUTPATH:';\,
},
{
# Self join union
'num' => 10,
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
a1 = filter a by gpa == 0.00;
a2 = filter a by gpa == 4.00;
b = union a1, a2;
c = JOIN a by name left, b by name;
store c into ':OUTPATH:';\,
},
{
# Complex self join
'num' => 11,
'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
SPLIT a INTO b IF age > 40,
c IF age <= 40;
d = FOREACH c GENERATE name, age, gpa;
e = FILTER d BY gpa > 3;
f = FILTER d BY gpa <= 3;
g = JOIN e BY name LEFT, f BY name;
h = FOREACH g GENERATE e::name as name, e::age as age, e::gpa as gpa;
i = DISTINCT h;
j = FILTER b BY gpa > 3;
k = FILTER b by gpa <= 3;
l = JOIN j BY name LEFT, k BY name;
m = FOREACH l generate j::name as name, j::age as age, j::gpa as gpa;
n = DISTINCT m;
m = UNION e, i, j, n;
n = JOIN a BY name, m BY name;
store n into ':OUTPATH:';\,
}
] # end of tests
},
] # end of groups
}
;