#!/usr/bin/env perl | |
############################################################################ | |
# Licensed to the Apache Software Foundation (ASF) under one or more | |
# contributor license agreements. See the NOTICE file distributed with | |
# this work for additional information regarding copyright ownership. | |
# The ASF licenses this file to You under the Apache License, Version 2.0 | |
# (the "License"); you may not use this file except in compliance with | |
# the License. You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
#################################################################### | |
# SUB: Multiquery | |
# Test groups for Pig multi-query scripts (multiple stores, splits, streaming, unions and self joins): | |
# MultiQuery_MapSplitee | |
# - _TEST_ The first example; one that is defined in the bug with one split | |
# - in the map phase | |
# - _TEST_ Multiple side files, all in map phase. | |
# - _TEST_ Two loads and two stores in map phase. | |
# - _TEST_ One split added in reduce phase and map-only splitee. | |
# - _TEST_ Pig-976: Multi-query optimization throws ClassCastException | |
# - _TEST_ Pig-976: Multi-query optimization throws ClassCastException | |
# - _TEST_ Pig-976: Multi-query optimization throws ClassCastException | |
# MultiQuery_MapReduceSplitee | |
# - _TEST_ One split added in reduce phase and one map-reduce splitee | |
# - _TEST_ One split in reduce phase and two Map-Reduce splitees. | |
# - _TEST_ Two loads and two stores in reduce phase | |
# - _TEST_ Implicit split with multiple side files. | |
# - _TEST_ Script with intermediate stores. | |
# - _TEST_ Implicit split with order by and multiple side files. | |
# - _TEST_ Self join using fragment replicate join with multiple side files. | |
# - _TEST_ One split in map phase and two Map-Reduce splitees with mixed combiners. | |
# - _TEST_ One split in map phase and two Map-Reduce splitees without combiners. | |
# - _TEST_ Pig-983: multi-query optimization on multiple group bys following a join or cogroup | |
# MultiQuery_ExplicitSplit | |
# - _TEST_ Explicit split with two side files. | |
# - _TEST_ Explicit split with order by and two side files. | |
# - _TEST_ Splittees with different map key types and nested splits. | |
# - _TEST_ Splittees with different map key type. | |
# - _TEST_ PigMix Test Case L12. | |
# - _TEST_ PigMix Test Case L12 version 2 | |
# - _TEST_ PigMix Test Case L12 version 3 (modified to have different map key types in inner split) | |
# MultiQuery_Streaming | |
# - _TEST_ Streaming with multiple stores. | |
# - _TEST_ Streaming in demux. | |
# - _TEST_ Streaming in nested demux. | |
# MultiQuery_Union (Also refer Union in nightly.conf) | |
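# - _TEST_ Union + Groupby + Combiner | |
# - _TEST_ Union + Groupby + Combiner + POPartialAgg | |
# - _TEST_ Union + Replicate Join left outer + Stream + Group by + Secondary Key Partitioner | |
# - _TEST_ Union + Replicate Join inner + Order by | |
# - _TEST_ Union + Replicate Join right input | |
# - _TEST_ Union + Left outer Join | |
# - _TEST_ Multiple levels of union + Skewed join Right outer | |
# - _TEST_ Union + Skewed Join right input | |
# - _TEST_ Union + CROSS | |
# - _TEST_ Union + Rank | |
# - _TEST_ Union + Rank dense | |
# - _TEST_ Union + Split + Two replicate join | |
# - _TEST_ Multiple Union + Multiple Split + Single store | |
# - _TEST_ PIG-5082: similar to the previous test but for non-store vertex group | |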
# MultiQuery_Self | |
# - _TEST_ Self cross | |
# - _TEST_ Self cogroup | |
# - _TEST_ Three way join (two self) | |
# - _TEST_ Self replicate join | |
# - _TEST_ Self skewed join | |
$cfg = { | |
'driver' => 'Pig', | |
'nummachines' => 5, | |
'groups' => [ | |
{ | |
'name' => 'MultiQuery_MapSplitee', | |
'floatpostprocess' => 1, | |
'delimiter' => ' ', | |
'tests' => [ | |
{ | |
# The first example; one that is defined in the bug with one split | |
# in the map phase | |
'num' => 1, | |
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); | |
b = filter a by age < 22; store b into ':OUTPATH:.1'; | |
c = group b by age; | |
d = foreach c generate group, SUM(b.gpa); | |
store d into ':OUTPATH:.2'; #, | |
'sql' => "select name, age, gpa from studenttab10k where age < 22; | |
select age, sum(gpa) from studenttab10k where age < 22 group by age;", | |
}, | |
{ | |
# Multiple side files, all in map phase. | |
'num' => 2, | |
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); | |
b = filter a by age < 22; | |
store b into ':OUTPATH:.1'; | |
c = filter b by gpa > 3.0; | |
store c into ':OUTPATH:.2'; | |
d = filter c by name < 'm'; | |
store d into ':OUTPATH:.3'; #, | |
'sql' => "select name, age, gpa from studenttab10k where age < 22; | |
select name, age, gpa from studenttab10k where age < 22 and gpa > 3.0; | |
select name, age, gpa from studenttab10k where age < 22 and gpa > 3.0 and name < 'm';", | |
}, | |
{ | |
# Two loads and two stores in map phase. | |
'num' => 3, | |
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); | |
b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); | |
c = filter a by age < 20; | |
d = filter b by age < 20; | |
store c into ':OUTPATH:.1'; | |
store d into ':OUTPATH:.2'; | |
e = cogroup c by name, d by name; | |
f = foreach e generate flatten(c), flatten(d); | |
store f into ':OUTPATH:.3'; #, | |
'sql' => "select name, age, gpa from studenttab10k where age < 20; | |
select name, age, registration, contributions from votertab10k where age < 20; | |
select a.name, a.age, a.gpa, b.name, b.age, b.registration, b.contributions | |
from studenttab10k as a full outer join votertab10k as b using(name) | |
where a.age < 20 and b.age < 20;", | |
}, | |
{ | |
# One split added in reduce phase and map-only splitee. | |
'num' => 4, | |
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); | |
b = filter a by gpa < 3.0; | |
c = group b by age; | |
d = foreach c generate group, AVG(b.gpa); | |
store d into ':OUTPATH:.1'; | |
e = filter d by $1 > 1.5; | |
store e into ':OUTPATH:.2'; #, | |
'sql' => "select age, avg(gpa) from studenttab10k where gpa < 3.0 group by age; | |
select age, avg(gpa) from studenttab10k where gpa < 3.0 group by age having avg(gpa) > 1.5;", | |
}, | |
# Pig-976: Multi-query optimization throws ClassCastException | |
{ | |
'num' => 5, | |
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float); | |
b = group a by name; | |
c = group a by age; | |
d = foreach b generate MAX(a.age); | |
e = foreach c generate group, SUM(a.gpa); | |
store d into ':OUTPATH:.1'; | |
store e into ':OUTPATH:.2'; #, | |
'sql' => "select max(age) from studenttab10k group by name; | |
select age, sum(gpa) from studenttab10k group by age;", | |
}, | |
# Pig-976: Multi-query optimization throws ClassCastException | |
{ | |
'num' => 6, | |
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float); | |
b = group a all; | |
c = group a by age; | |
d = foreach b generate COUNT(a), MAX(a.age); | |
e = foreach c generate group, SUM(a.gpa); | |
store d into ':OUTPATH:.1'; | |
store e into ':OUTPATH:.2'; #, | |
'sql' => "select count(*), max(age) from studenttab10k; | |
select age, sum(gpa) from studenttab10k group by age;", | |
}, | |
# Pig-976: Multi-query optimization throws ClassCastException | |
{ | |
'num' => 7, | |
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float); | |
b = group a by name; | |
c = group a by age; | |
d = foreach b generate MAX(a.age), group; | |
e = foreach c generate group, SUM(a.gpa); | |
store d into ':OUTPATH:.1'; | |
store e into ':OUTPATH:.2'; #, | |
'sql' => "select max(age), name from studenttab10k group by name; | |
select age, sum(gpa) from studenttab10k group by age;", | |
}, | |
] | |
}, | |
{ | |
'name' => 'MultiQuery_MapReduceSplitee', | |
'floatpostprocess' => 1, | |
'delimiter' => ' ', | |
'tests' => [ | |
{ | |
# One split added in reduce phase and one map-reduce splitee | |
'num' => 1, | |
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); | |
b = filter a by gpa < 3.0; | |
c = group b by age; | |
d = foreach c generate group, AVG(b.gpa); | |
store d into ':OUTPATH:.1'; | |
e = filter d by $1 > 1.5; | |
f = group e by $1; | |
g = foreach f generate group, SUM(e.$0); | |
store g into ':OUTPATH:.2'; #, | |
'sql' => "select age, avg(gpa) from studenttab10k where gpa < 3.0 group by age; | |
select t.b, sum(t.a) from (select age as a, avg(gpa) as b from studenttab10k | |
where gpa < 3.0 group by age having avg(gpa) > 1.5) as t group by t.b;", | |
}, | |
{ | |
# One split in reduce phase and two Map-Reduce splitees. | |
'num' => 2, | |
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); | |
b = filter a by gpa < 3.0; | |
c = group b by age; | |
d = foreach c generate group, AVG(b.gpa); | |
e = filter d by $1 > 1.5; | |
e1= group e by $1; | |
e2 = foreach e1 generate group, SUM(e.$0); | |
store e2 into ':OUTPATH:.1'; | |
f = filter d by $1 <= 1.5; | |
f1 = group f by $1; | |
f2 = foreach f1 generate group, COUNT(f.$0); | |
store f2 into ':OUTPATH:.2'; #, | |
'sql' => "select t.c1, sum(t.c0) from (select age as c0, avg(gpa) as c1 from studenttab10k | |
where gpa < 3.0 group by age having avg(gpa) > 1.5) as t group by t.c1; | |
select t.c1, count(t.c0) from (select age as c0, avg(gpa) as c1 from studenttab10k | |
where gpa < 3.0 group by age having avg(gpa) <= 1.5) as t group by t.c1;", | |
}, | |
{ | |
# Two loads and two stores in reduce phase | |
'num' => 3, | |
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); | |
b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); | |
c = filter a by age < 20; | |
d = filter b by age < 20; | |
e = cogroup c by name, d by name; | |
f = foreach e generate flatten(c), flatten(d); | |
g = group f by d::age; | |
h = foreach g generate group, SUM(f.gpa); | |
store h into ':OUTPATH:.1'; | |
e = filter f by c::gpa < 3.0; | |
store e into ':OUTPATH:.2'; #, | |
'sql' => "select c5, sum(c3) from (select a.name as c1, a.age as c2, a.gpa as c3, b.name as c4, | |
b.age as c5, b.registration as c6, b.contributions as c7 | |
from studenttab10k as a full outer join votertab10k as b using(name) | |
where a.age < 20 and b.age < 20) as t group by t.c5; | |
select a.name, a.age, a.gpa, b.name, b.age, b.registration, b.contributions | |
from studenttab10k as a full outer join votertab10k as b using(name) | |
where a.age < 20 and b.age < 20 and a.gpa < 3.0;", | |
}, | |
{ | |
# Implicit split with multiple side files. | |
'num'=> 4, | |
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float); | |
b = filter a by age > 50; | |
c = filter a by gpa > 3.0; | |
store c into ':OUTPATH:.1'; | |
d = cogroup b by name, c by name; | |
e = foreach d generate flatten(b), flatten(c); | |
store e into ':OUTPATH:.2'; | |
f = filter e by b::age < 75; | |
store f into ':OUTPATH:.3'; #, | |
'sql' => "select name, age, gpa from studenttab10k where gpa > 3.0; | |
select A.name, A.age, A.gpa, B.name, B.age, B.gpa | |
from (select * from studenttab10k where age > 50) as A | |
join (select * from studenttab10k where gpa > 3.0) as B using (name); | |
select A.name, A.age, A.gpa, B.name, B.age, B.gpa | |
from (select * from studenttab10k where age > 50) as A | |
join (select * from studenttab10k where gpa > 3.0) as B using (name) where A.age < 75;", | |
}, | |
{ | |
# With intermediate store | |
'num' => 5, | |
'pig' => q# A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); | |
store A into ':OUTPATH:.1'; | |
B = load ':OUTPATH:.1'; | |
store B into ':OUTPATH:.2'; #, | |
'sql' => "select name, age, gpa from studenttab10k; | |
select name, age, gpa from studenttab10k;", | |
}, | |
{ | |
# Implicit split with order by and multiple side files. | |
'num'=> 6, | |
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float); | |
b = filter a by age > 50; | |
c = filter a by gpa > 3.0; | |
store c into ':OUTPATH:.1'; | |
d = cogroup b by name, c by name; | |
e = foreach d generate flatten(b), flatten(c); | |
f = order e by b::name; | |
store e into ':OUTPATH:.2'; | |
f = filter e by b::age < 75; | |
store f into ':OUTPATH:.3'; #, | |
'sql' => "select name, age, gpa from studenttab10k where gpa > 3.0; | |
select A.name, A.age, A.gpa, B.name, B.age, B.gpa | |
from (select * from studenttab10k where age > 50) as A | |
join (select * from studenttab10k where gpa > 3.0) as B using (name) | |
order by A.name; | |
select A.name, A.age, A.gpa, B.name, B.age, B.gpa | |
from (select * from studenttab10k where age > 50) as A | |
join (select * from studenttab10k where gpa > 3.0) as B using (name) | |
where A.age < 75 order by A.name;", | |
}, | |
# Self join using fragment replicate join with multiple side files | |
{ | |
'num' => 7, | |
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double); | |
b = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double); | |
c = filter a by age > 50; | |
store c into ':OUTPATH:.1'; | |
d = filter b by gpa > 3.0; | |
store d into ':OUTPATH:.2'; | |
e = join c by gpa, d by gpa using 'repl'; | |
store e into ':OUTPATH:.3'; #, | |
'sql' => "select name, age, gpa from studenttab10k where age > 50; | |
select name, age, gpa from studenttab10k where gpa > 3.0; | |
select a.name, a.age, a.gpa, b.name, b.age, b.gpa | |
from studenttab10k as a join studenttab10k as b using(gpa) | |
where a.age > 50 and b.gpa > 3.0;", | |
}, | |
# One split in map phase and two Map-Reduce splitees with mixed combiner. | |
{ | |
'num' => 8, | |
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); | |
b = filter a by gpa < 3.0; | |
c = filter a by gpa >= 3.0; | |
d = group b by age; | |
e = foreach d generate group, AVG(b.gpa); | |
store e into ':OUTPATH:.1'; | |
f = group c by age; | |
g = foreach f generate group, MAX(c.gpa) - MIN(c.gpa); | |
store g into ':OUTPATH:.2'; #, | |
'sql' => "select age, avg(gpa) from studenttab10k where gpa < 3.0 group by age; | |
select age, max(gpa) - min(gpa) from studenttab10k where gpa >= 3.0 group by age;", | |
}, | |
# One split in map phase and two Map-Reduce splitees without combiner. | |
{ | |
'num' => 9, | |
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); | |
b = filter a by gpa < 3.0; | |
c = filter a by gpa >= 3.0; | |
d = group b by age; | |
e = foreach d generate group, MAX(b.gpa) + MIN(b.gpa); | |
store e into ':OUTPATH:.1'; | |
f = group c by age; | |
g = foreach f generate group, MAX(c.gpa) - MIN(c.gpa); | |
store g into ':OUTPATH:.2'; #, | |
'sql' => "select age, max(gpa) + min(gpa) from studenttab10k where gpa < 3.0 group by age; | |
select age, max(gpa) - min(gpa) from studenttab10k where gpa >= 3.0 group by age;", | |
}, | |
# Pig-983: multi-query optimization on multiple group bys following a join or cogroup | |
{ | |
'num' => 10, | |
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float); | |
b = load ':INPATH:/singlefile/votertab10k' as (name:chararray, age:int, registration, contributions:double); | |
c = join a by name, b by name; | |
d = group c by a::age; | |
e = group c by b::age; | |
d1 = foreach d generate group, COUNT(c), MAX(c.a::gpa); | |
e1 = foreach e generate group, SUM(c.b::contributions); | |
store d1 into ':OUTPATH:.1'; | |
store e1 into ':OUTPATH:.2'; #, | |
'sql' => "select a.age, count(*), max(a.gpa) from studenttab10k as a inner join votertab10k as b on (a.name = b.name) group by a.age; | |
select b.age, sum(b.contributions) from studenttab10k as a inner join votertab10k as b on (a.name = b.name) group by b.age;", | |
}, | |
] | |
}, | |
{ | |
'name' => 'MultiQuery_ExplicitSplit', | |
'floatpostprocess' => 1, | |
'delimiter' => ' ', | |
'tests' => [ | |
{ | |
# Explicit split with two side files. | |
'num'=> 1, | |
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); | |
split a into a1 if name > 'm', a2 if name <= 'm'; | |
store a1 into ':OUTPATH:.1'; | |
store a2 into ':OUTPATH:.2'; | |
b = cogroup a1 by age, a2 by age; | |
c = foreach b generate flatten(a1), flatten(a2); | |
store c into ':OUTPATH:.3'; #, | |
'sql' => "select name, age, gpa from studenttab10k where name > 'm'; | |
select name, age, gpa from studenttab10k where name <= 'm'; | |
select A.name, A.age, A.gpa, B.name, B.age, B.gpa | |
from (select * from studenttab10k where name > 'm') as A | |
join (select * from studenttab10k where name <= 'm') as B using (age);", | |
}, | |
{ | |
# Explicit split with order by and two side files. | |
'num'=> 2, | |
'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float); | |
split a into a1 if age > 50, a2 if name < 'm'; | |
b2 = distinct a2; | |
b1 = order a1 by name; | |
store b2 into ':OUTPATH:.2'; | |
store b1 into ':OUTPATH:.1'; | |
c = cogroup b2 by name, b1 by name; | |
d = foreach c generate flatten(group), COUNT($1), COUNT($2); | |
store d into ':OUTPATH:.3'; #, | |
'sql' => "select name, age, gpa from studenttab10k where age > 50 order by name; | |
select distinct name, age, gpa from studenttab10k where name < 'm'; | |
select name, count(A.name), count(B.name) | |
from (select distinct name from studenttab10k where name < 'm') as A | |
join (select name from studenttab10k where age > 50) as B using (name) group by name;", | |
'verify_with_pig' => 1, | |
'verify_pig_version' => 'old', | |
}, | |
# Splittees with different map key types and nested splits | |
{ | |
'num' => 3, | |
'pig' => q# a = load ':INPATH:/singlefile/votertab10k' as (name: chararray, age:int, registration, contributions:double); | |
b = foreach a generate name, age, contributions; | |
split b into c1 if age > 10, c2 if age <= 60; | |
split c1 into d1 if name < 'y', d2 if name >= 'c'; | |
e = group c2 by name parallel 2; | |
e1 = foreach e generate group, SUM(c2.contributions); | |
store e1 into ':OUTPATH:.1'; | |
f = group d1 by name parallel 3; | |
f1 = foreach f generate group, MAX(d1.contributions); | |
store f1 into ':OUTPATH:.2'; | |
g = group d2 by age parallel 4; | |
g1 = foreach g generate group, COUNT(d2); | |
store g1 into ':OUTPATH:.3'; #, | |
'sql' => "select name, sum(contributions) from votertab10k where age <= 60 group by name; | |
select name, max(contributions) from votertab10k where (age > 10 and name < 'y') group by name; | |
select age, count(*) from votertab10k where (age > 10 and name >= 'c') group by age;", | |
}, | |
# Splittees with different map key types | |
{ | |
'num' => 4, | |
'pig' => q# a = load ':INPATH:/singlefile/votertab10k' as (name: chararray, age:int, registration, contributions:double); | |
b = foreach a generate name, age, contributions; | |
split b into c1 if age > 50, c2 if age <= 50; | |
e = group c2 by name; | |
e1 = foreach e generate group, SUM(c2.contributions); | |
store e1 into ':OUTPATH:.1'; | |
f = group c1 by age; | |
f1 = foreach f generate group, MAX(c1.contributions); | |
store f1 into ':OUTPATH:.2'; #, | |
'sql' => "select name, sum(contributions) from votertab10k where age <= 50 group by name; | |
select age, max(contributions) from votertab10k where age > 50 group by age;", | |
}, | |
# PigMix Test Case L12 | |
{ | |
'num' => 5, | |
'pig' => q# a = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); | |
b = foreach a generate name, age, contributions; | |
split b into c1 if age > 50, c2 if age <= 50; | |
split c1 into d1 if name < 'm', d2 if name >= 'm'; | |
e = group c2 by name; | |
e1 = foreach e generate group, SUM(c2.contributions); | |
store e1 into ':OUTPATH:.1'; | |
f = group d1 by name; | |
f1 = foreach f generate group, MAX(d1.contributions); | |
store f1 into ':OUTPATH:.2'; | |
g = group d2 by name; | |
g1 = foreach g generate group, COUNT(d2); | |
store g1 into ':OUTPATH:.3'; #, | |
'sql' => "select name, sum(contributions) from votertab10k where age <= 50 group by name; | |
select name, max(contributions) from votertab10k where (age > 50 and name < 'm') group by name; | |
select name, count(*) from votertab10k where (age > 50 and name >= 'm') group by name;", | |
}, | |
# PigMix Test Case L12 version 2 | |
{ | |
'num' => 6, | |
'pig' => q# a = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); | |
b = foreach a generate name, age, contributions; | |
split b into c1 if age > 50, c2 if age <= 50; | |
split c1 into d1 if name < 'm', d2 if name >= 'm'; | |
e = group c2 by (name, age); | |
e1 = foreach e generate flatten(group), SUM(c2.contributions); | |
store e1 into ':OUTPATH:.1'; | |
f = group d1 by (name, age); | |
f1 = foreach f generate flatten(group), MAX(d1.contributions); | |
store f1 into ':OUTPATH:.2'; | |
g = group d2 by (name, age); | |
g1 = foreach g generate flatten(group), COUNT(d2); | |
store g1 into ':OUTPATH:.3'; #, | |
'sql' => "select name, age, sum(contributions) from votertab10k where age <= 50 group by name, age; | |
select name, age, max(contributions) from votertab10k where (age > 50 and name < 'm') group by name, age; | |
select name, age, count(*) from votertab10k where (age > 50 and name >= 'm') group by name, age;", | |
}, | |
# PigMix Test Case L12 version 3 (modified to have different map key types in inner split) | |
{ | |
'num' => 7, | |
'pig' => q# a = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); | |
b = foreach a generate name, age, contributions; | |
split b into c1 if age > 50, c2 if age <= 50; | |
split c1 into d1 if name < 'm', d2 if name >= 'm'; | |
f = group d1 by name; | |
f1 = foreach f generate flatten(group), MAX(d1.contributions); | |
store f1 into ':OUTPATH:.1'; | |
g = group d2 by (name, age); | |
g1 = foreach g generate flatten(group), COUNT(d2); | |
store g1 into ':OUTPATH:.2'; | |
e = group c2 by (name, age); | |
e1 = foreach e generate flatten(group), SUM(c2.contributions); | |
store e1 into ':OUTPATH:.3'; #, | |
'sql' => "select name, max(contributions) from votertab10k where (age > 50 and name < 'm') group by name; | |
select name, age, count(*) from votertab10k where (age > 50 and name >= 'm') group by name, age; | |
select name, age, sum(contributions) from votertab10k where age <= 50 group by name, age;", | |
}, | |
] | |
}, | |
{ | |
'name' => 'MultiQuery_Streaming', | |
'floatpostprocess' => 1, | |
'delimiter' => ' ', | |
'tests' => [ | |
{ | |
# Streaming with multiple stores | |
'num' => 1, | |
'pig' => q# define CMD1 `perl -ne 'print $_;'`; | |
define CMD2 `perl -ne 'print $_;'`; | |
A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); | |
B = stream A through CMD1 as (name, age, gpa); | |
store B into ':OUTPATH:.1'; | |
C = stream B through CMD2 as (name, age, gpa); | |
D = JOIN B by name, C by name; | |
store D into ':OUTPATH:.2'; #, | |
'pig_win' => q# define CMD1 `perl -ne "print $_;"`; | |
define CMD2 `perl -ne "print $_;"`; | |
A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); | |
B = stream A through CMD1 as (name, age, gpa); | |
store B into ':OUTPATH:.1'; | |
C = stream B through CMD2 as (name, age, gpa); | |
D = JOIN B by name, C by name; | |
store D into ':OUTPATH:.2'; #, | |
'sql' => "select name, age, gpa from studenttab10k; | |
select A.name, A.age, A.gpa, B.name, B.age, B.gpa | |
from studenttab10k as A join studenttab10k as B using(name);", | |
}, | |
# Streaming in demux | |
{ | |
'num' => 2, | |
'execonly' => 'mapred,tez,spark', | |
'pig' => q# | |
define CMD `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl'); | |
A = load ':INPATH:/singlefile/studenttab10k'; | |
split A into A1 if $0 < 'm', A2 if $0 >= 'm'; | |
B = group A1 by $0; | |
C = foreach B generate flatten(A1); | |
D = stream C through CMD; | |
store D into ':OUTPATH:.1'; | |
E = group A2 by $0; | |
F = foreach E generate group, COUNT(A2); | |
store F into ':OUTPATH:.2';#, | |
'sql' => "select name, count(*) from studenttab10k where name < 'm' group by name; | |
select name, count(*) from studenttab10k where name >= 'm' group by name;", | |
}, | |
# Streaming in nested demux | |
{ | |
'num' => 3, | |
'execonly' => 'mapred,tez,spark', | |
'pig' => q# | |
define CMD `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl'); | |
A = load ':INPATH:/singlefile/studenttab10k'; | |
split A into A1 if $0 < 'm', A2 if $0 >= 'm'; | |
split A1 into A3 if $1 < 30, A4 if $1 >= 30; | |
B = group A3 by $0; | |
C = foreach B generate flatten(A3); | |
D = stream C through CMD; | |
store D into ':OUTPATH:.1'; | |
E = group A2 by $0; | |
F = foreach E generate group, COUNT(A2); | |
store F into ':OUTPATH:.2'; | |
G = group A4 by $0; | |
H = foreach G generate group, COUNT(A4); | |
store H into ':OUTPATH:.3';#, | |
'sql' => "select name, count(*) from studenttab10k where name < 'm' and age < 30 group by name; | |
select name, count(*) from studenttab10k where name >= 'm' group by name; | |
select name, count(*) from studenttab10k where name < 'm' and age >= 30 group by name;", | |
}, | |
] # end of tests | |
}, | |
{ | |
'name' => 'MultiQuery_Union', | |
'tests' => [ | |
{ | |
# Union + Groupby + Combiner | |
'num' => 1, | |
'floatpostprocess' => 1, | |
'java_params' => ['-Dpig.exec.mapPartAgg=false'], | |
'delimiter' => ' ', | |
'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); | |
a1 = filter a by gpa >= 3.9; | |
a2 = filter a by gpa < 2; | |
c = union a1, a2; | |
d = group c by name; | |
e = foreach d generate group, SUM(c.age); | |
store e into ':OUTPATH:';\, | |
}, | |
{ | |
# Union + Groupby + Combiner + POPartialAgg | |
'num' => 2, | |
'floatpostprocess' => 1, | |
'java_params' => ['-Dpig.exec.mapPartAgg=true'], | |
'delimiter' => ' ', | |
'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); | |
a1 = filter a by gpa >= 3.9; | |
a2 = filter a by gpa < 2; | |
c = union a1, a2; | |
d = group c by name; | |
e = foreach d generate group, SUM(c.age); | |
store e into ':OUTPATH:';\, | |
}, | |
{ | |
# Union + Replicate Join left outer + Stream + Group by + Secondary Key Partitioner | |
'num' => 3, | |
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa:float); | |
a1 = filter a by gpa is null or gpa >= 3.9; | |
a2 = filter a by gpa < 2; | |
b = union a1, a2; | |
c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); | |
d = join b by name left outer, c by name using 'replicated'; | |
e = stream d through `cat` as (name, age, gpa, name1, age1, registration, contributions); | |
f = foreach e generate name, age, gpa, registration, contributions; | |
g = group f by name; | |
g1 = group f by name; -- Two separate groupbys to ensure secondary key partitioner | |
h = foreach g { | |
inner1 = order f by age, gpa, registration, contributions; | |
inner2 = limit inner1 1; | |
generate inner2, SUM(f.age); }; | |
i = foreach g1 { | |
inner1 = order f by age asc, gpa desc, registration asc, contributions desc; | |
inner2 = limit inner1 1; | |
generate inner2, SUM(f.age); }; | |
store h into ':OUTPATH:.1'; | |
store i into ':OUTPATH:.2';\, | |
}, | |
{ | |
# Union + Replicate Join inner + Order by | |
'num' => 4, | |
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa:float); | |
a1 = filter a by gpa is null or gpa >= 3.9; | |
a2 = filter a by gpa < 1; | |
b = union a1, a2; | |
b1 = filter b by age < 30; | |
b2 = foreach b generate name, age, FLOOR(gpa) as gpa; | |
c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); | |
d = join b2 by name, c by name using 'replicated'; | |
e = foreach d generate b2::name as name, b2::age as age, gpa, registration, contributions; | |
f = order e by name, age DESC; | |
store f into ':OUTPATH:';\, | |
'sortArgs' => ['-t', ' ', '-k', '1,1', '-k', '2,2nr'], | |
}, | |
{ | |
# Union + Replicate Join right input | |
'num' => 5, | |
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); | |
a1 = filter a by gpa is null or gpa <= 3.9; | |
a2 = filter a by gpa < 2; | |
b = union a1, a2; | |
c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); | |
d = join c by name, b by name using 'replicated'; | |
store d into ':OUTPATH:';\, | |
}, | |
{ | |
# Union + Left outer Join | |
'num' => 6, | |
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float); | |
a1 = filter a by gpa is null or gpa >= 3.9; | |
a2 = filter a by gpa < 1; | |
b = union a1, a2; | |
c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); | |
d = join b by name left outer, c by name; | |
e = foreach d generate b::name as name, b::age as age, gpa, registration, contributions; | |
store e into ':OUTPATH:';\, | |
}, | |
{ | |
# Multiple levels of union + Skewed join Right outer | |
'num' => 7, | |
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float); | |
b = filter a by gpa >= 3.9; | |
b1 = foreach b generate *; | |
b2 = foreach b generate *; | |
b3 = union onschema b1, b2; | |
c = filter a by gpa < 2; | |
c1 = foreach c generate *; | |
c2 = foreach c generate *; | |
c3 = union onschema c1, c2; | |
a1 = union onschema b3, c3; | |
store a1 into ':OUTPATH:.1'; | |
d = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); | |
e = join a1 by name right outer, d by name using 'skewed' PARALLEL 3; | |
store e into ':OUTPATH:.2';\, | |
}, | |
{ | |
# Union + Skewed Join right input | |
'num' => 8, | |
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); | |
a1 = filter a by gpa >= 3.9; | |
a2 = filter a by gpa < 2; | |
b = union a1, a2; | |
c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); | |
d = join c by name, b by name using 'skewed' PARALLEL 3; | |
store d into ':OUTPATH:';\, | |
}, | |
{ | |
# Union + CROSS | |
'num' => 9, | |
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float); | |
a1 = filter a by gpa == 0.00; | |
a2 = filter a by gpa == 4.00; | |
b = union a1, a2; | |
c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); | |
d = CROSS b, c; | |
store d into ':OUTPATH:';\, | |
}, | |
{ | |
# Union + Rank | |
'num' => 10, | |
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float); | |
a1 = filter a by gpa is null or gpa >= 3.9; | |
a2 = filter a by gpa < 1; | |
b = union a1, a2; | |
c = rank b; | |
-- Ordering is not guaranteed with union and ranking will differ. So just test rank and column separately | |
d = foreach c generate $0; | |
e = foreach c generate $1, $2, $3; | |
store d into ':OUTPATH:.1'; | |
store e into ':OUTPATH:.2';\, | |
}, | |
{ | |
# Union + Rank dense | |
'num' => 11, | |
'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float); | |
a1 = filter a by gpa is null or gpa >= 3.9; | |
a2 = filter a by gpa < 1; | |
b = union a1, a2; | |
c = rank b by name ASC, age DESC DENSE; | |
store c into ':OUTPATH:';\, | |
}, | |
{
    # Union + Split + Two replicate join
    # b is the union of two filters over the student load. The voter load c
    # feeds two filters (c1: age < 30, c2: age > 50). b is joined with c1
    # using 'replicated', and that result is joined with c2 (on b::name),
    # again using 'replicated'. Only the second join result is stored.
    # NOTE(review): a1 uses "gpa <= 3.9" while the rank tests use ">= 3.9";
    # with "<=", every row of a2 (gpa < 2) is also in a1 -- confirm this is
    # intentional before changing it, since expected results depend on it.
    'num' => 12,
    'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
a1 = filter a by gpa is null or gpa <= 3.9;
a2 = filter a by gpa < 2;
b = union a1, a2;
c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
c1 = filter c by age < 30;
c2 = filter c by age > 50;
d = join b by name, c1 by name using 'replicated';
e = join d by b::name, c2 by name using 'replicated';
store e into ':OUTPATH:';\,
},
{
    # Multiple Union + Multiple Split + Single store
    # Pipeline: two student loads -> UNION ONSCHEMA (u1) -> SPLIT on age < 30
    # into r / s -> r LEFT JOINed with the voter load -> UNION ONSCHEMA with
    # s (u2) -> u2 filtered two ways (e: name == 'nick miller', f: age > 70)
    # -> UNION ONSCHEMA (u3) -> single store of u3.
    'num' => 13,
    'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
b = load ':INPATH:/singlefile/studenttab10k' as (name, age:int, gpa);
u1 = union onschema a, b;
SPLIT u1 INTO r IF age < 30, s OTHERWISE;
c = load ':INPATH:/singlefile/voternulltab10k' as (votername, voterage, registration, contributions);
d = JOIN r BY name LEFT, c BY votername;
u2 = UNION ONSCHEMA d, s;
e = FILTER u2 BY name == 'nick miller';
f = FILTER u2 BY age > 70 ;
u3 = UNION ONSCHEMA e, f;
store u3 into ':OUTPATH:';\,
},
{
    # PIG-5082. Similar to MultiQuery_Union_13 but for non-store vertex group
    # The script is identical to test 13 up to u3. Instead of storing u3
    # directly, u3 is SPLIT again (t: age > 75, u: OTHERWISE) and t is LEFT
    # JOINed with the voter load c; that join result (v) is what gets stored,
    # so the final union feeds a join rather than a store.
    'num' => 14,
    'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
b = load ':INPATH:/singlefile/studenttab10k' as (name, age:int, gpa);
u1 = union onschema a, b;
SPLIT u1 INTO r IF age < 30, s OTHERWISE;
c = load ':INPATH:/singlefile/voternulltab10k' as (votername, voterage, registration, contributions);
d = JOIN r BY name LEFT, c BY votername;
u2 = UNION ONSCHEMA d, s;
e = FILTER u2 BY name == 'nick miller';
f = FILTER u2 BY age > 70 ;
u3 = UNION ONSCHEMA e, f;
SPLIT u3 INTO t if age > 75, u OTHERWISE;
v = JOIN t BY name LEFT, c BY votername;
store v into ':OUTPATH:';\,
}
] # end of tests | |
}, | |
{
    # MultiQuery_Self
    # Every script in this group loads a student file once as alias `a`,
    # derives two or more branches from that single load (filters, splits,
    # unions) and then recombines those branches with each other via CROSS,
    # COGROUP or some flavour of JOIN.
    # Pig script lines inside q\...\ start at column 0 because they are
    # string content, not Perl code.
    'name' => 'MultiQuery_Self',
    'tests' => [
        {
            # Self cross
            # Four gpa-range filters over one load; b x c runs with
            # PARALLEL 3 and d x e with PARALLEL 4. Each cross product is
            # stored to its own output (.1 and .2).
            'num' => 1,
            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
b = filter a by gpa >= 3.9;
c = filter a by gpa <= 0.5;
d = filter a by gpa >= 3.5 and gpa < 3.9;
e = filter a by gpa > 0.5 and gpa < 1;
f = CROSS b, c PARALLEL 3;
g = CROSS d, e PARALLEL 4;
store f into ':OUTPATH:.1';
store g into ':OUTPATH:.2';\,
        },
        {
            # Self cogroup
            # Cogroups the low-gpa branch (c) and the high-gpa branch (b) by
            # name, then flattens both bags.
            'num' => 2,
            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
b = filter a by gpa >= 3;
c = filter a by gpa < 2;
d = cogroup c by name, b by name;
e = foreach d generate flatten(c), flatten(b);
store e into ':OUTPATH:';\,
        },
        {
            # Three way join (two self)
            # b and c come from the same student load; d is a separate voter
            # load. All three are joined on name with PARALLEL 2.
            'num' => 3,
            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
b = filter a by gpa >= 3;
c = filter a by gpa < 2;
d = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
e = join b by name, c by name, d by name PARALLEL 2;
store e into ':OUTPATH:';\,
        },
        {
            # Self join replicated
            # Joins the two filtered branches on name using 'replicated'.
            'num' => 4,
            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
b = filter a by gpa >= 3;
c = filter a by gpa < 2;
d = join c by name, b by name using 'replicated';
store d into ':OUTPATH:';\,
        },
        {
            # Self join skewed
            # Same branches as the replicated test, joined using 'skewed'
            # with PARALLEL 2.
            'num' => 5,
            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
b = filter a by gpa >= 3;
c = filter a by gpa < 2;
d = join c by name, b by name using 'skewed' PARALLEL 2;
store d into ':OUTPATH:';\,
        },
        {
            # Self join left outer
            'num' => 6,
            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
b = filter a by gpa >= 3;
c = filter a by gpa < 2;
d = join c by name left outer, b by name PARALLEL 2;
store d into ':OUTPATH:';\,
        },
        {
            # Self join right outer
            'num' => 7,
            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
b = filter a by gpa >= 3;
c = filter a by gpa < 2;
d = join c by name right outer, b by name PARALLEL 2;
store d into ':OUTPATH:';\,
        },
        {
            # Self join full outer
            # Unlike the left/right outer tests, the load schema here
            # declares name as chararray.
            'num' => 8,
            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
b = filter a by gpa >= 3;
c = filter a by gpa < 2;
d = join c by name full outer, b by name PARALLEL 2;
store d into ':OUTPATH:';\,
        },
        {
            # Self join union replicated
            # b is the union of two equality filters on a (gpa == 0.00 and
            # gpa == 4.00); the unfiltered a is then joined with b on name
            # using 'replicated'.
            'num' => 9,
            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
a1 = filter a by gpa == 0.00;
a2 = filter a by gpa == 4.00;
b = union a1, a2;
c = JOIN a by name, b by name using 'replicated';
store c into ':OUTPATH:';\,
        },
        {
            # Self join union
            # Same union as the replicated variant, but a is LEFT joined
            # with b and no join strategy is named.
            'num' => 10,
            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
a1 = filter a by gpa == 0.00;
a2 = filter a by gpa == 4.00;
b = union a1, a2;
c = JOIN a by name left, b by name;
store c into ':OUTPATH:';\,
        },
        {
            # Complex self join
            # a is SPLIT by age into b (> 40) and c (<= 40). Each half is
            # divided again by gpa (> 3 / <= 3), LEFT joined on name,
            # projected back to the left side's columns and DISTINCTed
            # (i for the c half, n for the b half).
            # The aliases m and n are then deliberately re-assigned:
            # m becomes UNION e, i, j, n and n becomes the join of the
            # original load a with that union, which is what gets stored.
            'num' => 11,
            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
SPLIT a INTO b IF age > 40,
c IF age <= 40;
d = FOREACH c GENERATE name, age, gpa;
e = FILTER d BY gpa > 3;
f = FILTER d BY gpa <= 3;
g = JOIN e BY name LEFT, f BY name;
h = FOREACH g GENERATE e::name as name, e::age as age, e::gpa as gpa;
i = DISTINCT h;
j = FILTER b BY gpa > 3;
k = FILTER b by gpa <= 3;
l = JOIN j BY name LEFT, k BY name;
m = FOREACH l generate j::name as name, j::age as age, j::gpa as gpa;
n = DISTINCT m;
m = UNION e, i, j, n;
n = JOIN a BY name, m BY name;
store n into ':OUTPATH:';\,
        },
        {
            # Self join bloom left outer
            # 'verify_pig_script' is the same script with the
            # "using 'bloom'" clause removed, i.e. a plain left outer join.
            # NOTE(review): 'execonly' => 'tez' presumably restricts this
            # test to the Tez execution engine -- confirm against the
            # harness.
            'num' => 12,
            'execonly' => 'tez',
            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
b = filter a by gpa >= 3.9;
c = filter a by gpa > 3;
d = join b by name left outer, c by name using 'bloom';
store d into ':OUTPATH:';\,
            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
b = filter a by gpa >= 3.9;
c = filter a by gpa > 3;
d = join b by name left outer, c by name;
store d into ':OUTPATH:';\,
        },
        {
            # Self join bloom left outer with strategy as reduce
            # Identical scripts to test 12; the only difference is the
            # extra JVM property -Dpig.bloomjoin.strategy=reduce passed via
            # 'java_params'.
            'num' => 13,
            'execonly' => 'tez',
            'java_params' => ['-Dpig.bloomjoin.strategy=reduce'],
            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
b = filter a by gpa >= 3.9;
c = filter a by gpa > 3;
d = join b by name left outer, c by name using 'bloom';
store d into ':OUTPATH:';\,
            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
b = filter a by gpa >= 3.9;
c = filter a by gpa > 3;
d = join b by name left outer, c by name;
store d into ':OUTPATH:';\,
        },
    ] # end of tests
},
] # end of groups | |
} | |
; |