blob: b1945bf50e678159d5f92fc41c6b35581e4aa6ed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.expression.AndExpression;
import org.apache.pig.newplan.logical.expression.ConstantExpression;
import org.apache.pig.newplan.logical.expression.EqualExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
import org.apache.pig.newplan.logical.optimizer.ProjectionPatcher;
import org.apache.pig.newplan.logical.optimizer.SchemaPatcher;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LODistinct;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LOStore;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.rules.FilterAboveForeach;
import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
import org.apache.pig.newplan.logical.rules.MergeFilter;
import org.apache.pig.newplan.logical.rules.PushUpFilter;
import org.apache.pig.newplan.logical.rules.SplitFilter;
import org.apache.pig.newplan.optimizer.PlanOptimizer;
import org.apache.pig.newplan.optimizer.PlanTransformListener;
import org.apache.pig.newplan.optimizer.Rule;
import org.junit.Assert;
import org.junit.Test;
public class TestNewPlanFilterRule {
// Pig context (local execution mode) handed to the PigServer that compiles the script-based tests.
PigContext pc = new PigContext(ExecType.LOCAL, new Properties());

// Hand-built plan and the operators inside it. prep() assigns every one of
// these; until it runs they hold the default value null.
LogicalPlan plan;
LogicalRelationalOperator load1;
LogicalRelationalOperator load2;
LogicalRelationalOperator filter;
LogicalRelationalOperator join;
LogicalRelationalOperator store;
/**
 * Builds the hand-made logical plan used by the rule tests and publishes its
 * operators through the instance fields.
 *
 * <pre>
 * load A (id, name, age)    --|
 *                             |-- join C -- filter D -- store
 * load B (id, dept, salary) --|
 * </pre>
 *
 * The filter starts out as a single equality comparison (no AND); individual
 * tests replace the filter plan when they need a richer condition.
 */
private void prep() {
    plan = new LogicalPlan();

    // First input, alias "A": fields carry uids 1..3.
    LogicalSchema schema = new LogicalSchema();
    schema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.INTEGER));
    schema.addField(new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY));
    schema.addField(new LogicalSchema.LogicalFieldSchema("age", null, DataType.INTEGER));
    schema.getField(0).uid = 1;
    schema.getField(1).uid = 2;
    schema.getField(2).uid = 3;
    LogicalRelationalOperator l1 = new LOLoad(schema, plan);
    l1.setAlias("A");
    plan.add(l1);

    // Second input, alias "B": fields carry uids 4..6.
    schema = new LogicalSchema();
    schema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.INTEGER));
    schema.addField(new LogicalSchema.LogicalFieldSchema("dept", null, DataType.INTEGER));
    schema.addField(new LogicalSchema.LogicalFieldSchema("salary", null, DataType.FLOAT));
    schema.getField(0).uid = 4;
    schema.getField(1).uid = 5;
    schema.getField(2).uid = 6;
    LogicalRelationalOperator l2 = new LOLoad(schema, plan);
    l2.setAlias("B");
    plan.add(l2);

    // Hash join "C". The join is created first with an empty key map because
    // each key projection needs the join operator as a constructor argument;
    // the map is filled in afterwards.
    MultiMap<Integer, LogicalExpressionPlan> joinPlans = new MultiMap<Integer, LogicalExpressionPlan>();
    LogicalRelationalOperator j1 = new LOJoin(plan, joinPlans, LOJoin.JOINTYPE.HASH, new boolean[]{true, true});
    LogicalExpressionPlan p1 = new LogicalExpressionPlan();
    ProjectExpression lp1 = new ProjectExpression(p1, 0, 1, j1);
    p1.add(lp1);
    joinPlans.put(0, p1);
    LogicalExpressionPlan p2 = new LogicalExpressionPlan();
    ProjectExpression lp2 = new ProjectExpression(p2, 1, 1, j1);
    p2.add(lp2);
    joinPlans.put(1, p2);
    j1.setAlias("C");
    plan.add(j1);

    // build an expression with no AND
    LogicalExpressionPlan p3 = new LogicalExpressionPlan();
    LogicalRelationalOperator f1 = new LOFilter(plan, p3);
    LogicalExpression lp3 = new ProjectExpression(p3, 0, 2, f1);
    LogicalExpression cont = new ConstantExpression(p3, Integer.valueOf(3));
    p3.add(lp3);
    p3.add(cont);
    // Created only for its effect on p3, so no local is kept.
    // NOTE(review): the comparison is never added to p3 explicitly; presumably
    // the EqualExpression constructor attaches it to the plan - confirm.
    new EqualExpression(p3, lp3, cont);
    f1.setAlias("D");
    plan.add(f1);

    LogicalRelationalOperator s1 = new LOStore(plan, null, null, null);
    plan.add(s1);

    // load --|-join - filter - store
    // load --|
    plan.connect(l1, j1);
    plan.connect(l2, j1);
    plan.connect(j1, f1);
    plan.connect(f1, s1);

    filter = f1;
    store = s1;
    join = j1;
    load1 = l1;
    load2 = l2;
}
/**
 * Runs SplitFilter, PushUpFilter and MergeFilter one after another over the
 * plan from prep(), whose filter has no AND, and checks the filter's position
 * after each pass.
 */
@Test
public void testFilterRule() throws Exception {
    prep();

    // run split filter rule
    Rule r = new SplitFilter("SplitFilter");
    Set<Rule> s = new HashSet<Rule>();
    s.add(r);
    List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
    ls.add(s);
    MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
    optimizer.optimize();

    // the filter should still sit between join and store
    Assert.assertEquals(join, plan.getPredecessors(filter).get(0));
    Assert.assertEquals(store, plan.getSuccessors(filter).get(0));

    // run push up filter rule
    r = new PushUpFilter("PushUpFilter");
    s = new HashSet<Rule>();
    s.add(r);
    ls = new ArrayList<Set<Rule>>();
    ls.add(s);
    optimizer = new MyPlanOptimizer(plan, ls, 3);
    optimizer.optimize();

    // the filter should be moved up to be after load
    Assert.assertEquals(filter, plan.getSuccessors(load1).get(0));
    Assert.assertEquals(join, plan.getSuccessors(filter).get(0));
    Assert.assertEquals(store, plan.getSuccessors(join).get(0));

    // run merge filter rule
    r = new MergeFilter("MergeFilter");
    s = new HashSet<Rule>();
    s.add(r);
    ls = new ArrayList<Set<Rule>>();
    ls.add(s);
    optimizer = new MyPlanOptimizer(plan, ls, 3);
    optimizer.optimize();

    // the filter should be the same as before, nothing to merge
    Assert.assertEquals(filter, plan.getSuccessors(load1).get(0));
    Assert.assertEquals(join, plan.getSuccessors(filter).get(0));
    Assert.assertEquals(store, plan.getSuccessors(join).get(0));
}
/**
 * Filter with one AND: SplitFilter should split it into two filters,
 * PushUpFilter should move one above each load, and MergeFilter should then
 * find nothing to merge.
 */
@Test
public void testFilterRuleWithAnd() throws Exception {
    prep();

    // build an expression with 1 AND, it should split into 2 filters
    LogicalExpressionPlan p4 = new LogicalExpressionPlan();
    LogicalExpression lp3 = new ProjectExpression(p4, 0, 2, filter);
    LogicalExpression cont = new ConstantExpression(p4, Integer.valueOf(3));
    p4.add(lp3);
    p4.add(cont);
    LogicalExpression eq = new EqualExpression(p4, lp3, cont);

    LogicalExpression lp4 = new ProjectExpression(p4, 0, 5, filter);
    LogicalExpression cont2 = new ConstantExpression(p4, Float.valueOf(100f));
    p4.add(lp4);
    p4.add(cont2);
    LogicalExpression eq2 = new EqualExpression(p4, lp4, cont2);

    // Created only for its effect on p4, so no local is kept.
    // NOTE(review): presumably the constructor attaches the AND to p4 - confirm.
    new AndExpression(p4, eq, eq2);
    ((LOFilter)filter).setFilterPlan(p4);

    // run split filter rule
    Rule r = new SplitFilter("SplitFilter");
    Set<Rule> s = new HashSet<Rule>();
    s.add(r);
    List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
    ls.add(s);
    PlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
    optimizer.optimize();

    // join -> filter -> filter -> store
    Assert.assertEquals(join, plan.getPredecessors(filter).get(0));
    Operator next = plan.getSuccessors(filter).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    next = plan.getSuccessors(next).get(0);
    Assert.assertEquals(LOStore.class, next.getClass());

    // run push up filter rule
    r = new PushUpFilter("PushUpFilter");
    s = new HashSet<Rule>();
    s.add(r);
    ls = new ArrayList<Set<Rule>>();
    ls.add(s);
    optimizer = new MyPlanOptimizer(plan, ls, 3);
    optimizer.optimize();

    // both filters should be moved up to be after each load
    next = plan.getSuccessors(load1).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    Assert.assertEquals(join, plan.getSuccessors(next).get(0));
    next = plan.getSuccessors(load2).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    Assert.assertEquals(join, plan.getSuccessors(next).get(0));
    Assert.assertEquals(store, plan.getSuccessors(join).get(0));

    // run merge filter rule
    r = new MergeFilter("MergeFilter");
    s = new HashSet<Rule>();
    s.add(r);
    ls = new ArrayList<Set<Rule>>();
    ls.add(s);
    optimizer = new MyPlanOptimizer(plan, ls, 3);
    optimizer.optimize();

    // the filters should be the same as before, nothing to merge
    next = plan.getSuccessors(load1).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    Assert.assertEquals(join, plan.getSuccessors(next).get(0));
    next = plan.getSuccessors(load2).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    Assert.assertEquals(join, plan.getSuccessors(next).get(0));
    Assert.assertEquals(store, plan.getSuccessors(join).get(0));
}
/**
 * Filter with two ANDs: SplitFilter should split it into three filters,
 * PushUpFilter should move one above each load and leave one after the join,
 * and MergeFilter should then find nothing to merge. A listener records the
 * transformed sub-plan reported by each optimizer run.
 */
@Test
public void testFilterRuleWith2And() throws Exception {
    prep();

    // build an expression with 2 AND, it should split into 3 filters
    LogicalExpressionPlan p5 = new LogicalExpressionPlan();
    LogicalExpression lp3 = new ProjectExpression(p5, 0, 2, filter);
    LogicalExpression cont = new ConstantExpression(p5, Integer.valueOf(3));
    p5.add(lp3);
    p5.add(cont);
    LogicalExpression eq = new EqualExpression(p5, lp3, cont);

    LogicalExpression lp4 = new ProjectExpression(p5, 0, 3, filter);
    LogicalExpression cont2 = new ConstantExpression(p5, Integer.valueOf(3));
    p5.add(lp4);
    p5.add(cont2);
    LogicalExpression eq2 = new EqualExpression(p5, lp4, cont2);
    LogicalExpression and1 = new AndExpression(p5, eq, eq2);

    lp3 = new ProjectExpression(p5, 0, 0, filter);
    lp4 = new ProjectExpression(p5, 0, 3, filter);
    p5.add(lp3);
    p5.add(lp4);
    eq2 = new EqualExpression(p5, lp3, lp4);

    // Outer AND, created only for its effect on p5, so no local is kept.
    // NOTE(review): presumably the constructor attaches it to p5 - confirm.
    new AndExpression(p5, and1, eq2);
    ((LOFilter)filter).setFilterPlan(p5);

    // run split filter rule
    Rule r = new SplitFilter("SplitFilter");
    Set<Rule> s = new HashSet<Rule>();
    s.add(r);
    List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
    ls.add(s);
    MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
    MyPlanTransformListener listener = new MyPlanTransformListener();
    optimizer.addPlanTransformListener(listener);
    optimizer.optimize();

    // join -> filter -> filter -> filter -> store
    Assert.assertEquals(join, plan.getPredecessors(filter).get(0));
    Operator next = plan.getSuccessors(filter).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    next = plan.getSuccessors(next).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    next = plan.getSuccessors(next).get(0);
    Assert.assertEquals(LOStore.class, next.getClass());

    OperatorPlan transformed = listener.getTransformed();
    Assert.assertEquals(3, transformed.size());

    // run push up filter rule
    r = new PushUpFilter("PushUpFilter");
    s = new HashSet<Rule>();
    s.add(r);
    ls = new ArrayList<Set<Rule>>();
    ls.add(s);
    optimizer = new MyPlanOptimizer(plan, ls, 3);
    listener = new MyPlanTransformListener();
    optimizer.addPlanTransformListener(listener);
    optimizer.optimize();

    // 2 filters should be moved up to be after each load, and one filter should remain
    next = plan.getSuccessors(load1).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    Assert.assertEquals(join, plan.getSuccessors(next).get(0));
    next = plan.getSuccessors(load2).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    Assert.assertEquals(join, plan.getSuccessors(next).get(0));
    next = plan.getSuccessors(join).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    next = plan.getSuccessors(next).get(0);
    Assert.assertEquals(LOStore.class, next.getClass());

    transformed = listener.getTransformed();
    Assert.assertEquals(7, transformed.size());

    // run merge filter rule
    r = new MergeFilter("MergeFilter");
    s = new HashSet<Rule>();
    s.add(r);
    ls = new ArrayList<Set<Rule>>();
    ls.add(s);
    optimizer = new MyPlanOptimizer(plan, ls, 3);
    listener = new MyPlanTransformListener();
    optimizer.addPlanTransformListener(listener);
    optimizer.optimize();

    // the filters should be the same as before, nothing to merge
    next = plan.getSuccessors(load1).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    Assert.assertEquals(join, plan.getSuccessors(next).get(0));
    next = plan.getSuccessors(load2).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    Assert.assertEquals(join, plan.getSuccessors(next).get(0));
    next = plan.getSuccessors(join).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    next = plan.getSuccessors(next).get(0);
    Assert.assertEquals(LOStore.class, next.getClass());

    // the listener was never notified, so it still holds null
    transformed = listener.getTransformed();
    Assert.assertNull(transformed);
}
/**
 * Filter with two ANDs where only one of the three conditions can move:
 * SplitFilter should produce three filters, PushUpFilter should move one above
 * the first load and leave two after the join, and MergeFilter should merge
 * those two back into one.
 */
@Test
public void testFilterRuleWith2And2() throws Exception {
    prep();

    // build an expression with 2 AND, it should split into 3 filters
    LogicalExpressionPlan p5 = new LogicalExpressionPlan();
    LogicalExpression lp3 = new ProjectExpression(p5, 0, 2, filter);
    LogicalExpression cont = new ConstantExpression(p5, Integer.valueOf(3));
    p5.add(lp3);
    p5.add(cont);
    LogicalExpression eq = new EqualExpression(p5, lp3, cont);

    lp3 = new ProjectExpression(p5, 0, 0, filter);
    LogicalExpression lp4 = new ProjectExpression(p5, 0, 3, filter);
    p5.add(lp4);
    p5.add(lp3);
    LogicalExpression eq2 = new EqualExpression(p5, lp3, lp4);
    LogicalExpression and1 = new AndExpression(p5, eq, eq2);

    lp3 = new ProjectExpression(p5, 0, 2, filter);
    lp4 = new ProjectExpression(p5, 0, 5, filter);
    p5.add(lp3);
    p5.add(lp4);
    eq2 = new EqualExpression(p5, lp3, lp4);

    // Outer AND, created only for its effect on p5, so no local is kept.
    // NOTE(review): presumably the constructor attaches it to p5 - confirm.
    new AndExpression(p5, and1, eq2);
    ((LOFilter)filter).setFilterPlan(p5);

    // run split filter rule
    Rule r = new SplitFilter("SplitFilter");
    Set<Rule> s = new HashSet<Rule>();
    s.add(r);
    List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
    ls.add(s);
    MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
    optimizer.optimize();

    // join -> filter -> filter -> filter -> store
    Assert.assertEquals(join, plan.getPredecessors(filter).get(0));
    Operator next = plan.getSuccessors(filter).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    next = plan.getSuccessors(next).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    next = plan.getSuccessors(next).get(0);
    Assert.assertEquals(LOStore.class, next.getClass());

    // run push up filter rule
    r = new PushUpFilter("PushUpFilter");
    s = new HashSet<Rule>();
    s.add(r);
    ls = new ArrayList<Set<Rule>>();
    ls.add(s);
    optimizer = new MyPlanOptimizer(plan, ls, 3);
    optimizer.optimize();

    // 1 filter should be moved up to be after a load, and 2 filters should remain
    next = plan.getSuccessors(load1).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    Assert.assertEquals(join, plan.getSuccessors(next).get(0));
    next = plan.getSuccessors(load2).get(0);
    Assert.assertEquals(join, next);
    next = plan.getSuccessors(join).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    next = plan.getSuccessors(next).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    next = plan.getSuccessors(next).get(0);
    Assert.assertEquals(LOStore.class, next.getClass());

    // run merge filter rule
    r = new MergeFilter("MergeFilter");
    s = new HashSet<Rule>();
    s.add(r);
    ls = new ArrayList<Set<Rule>>();
    ls.add(s);
    optimizer = new MyPlanOptimizer(plan, ls, 3);
    MyPlanTransformListener listener = new MyPlanTransformListener();
    optimizer.addPlanTransformListener(listener);
    optimizer.optimize();

    // the 2 filters after join should merge
    next = plan.getSuccessors(load1).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    Assert.assertEquals(join, plan.getSuccessors(next).get(0));
    next = plan.getSuccessors(load2).get(0);
    Assert.assertEquals(join, next);
    next = plan.getSuccessors(join).get(0);
    Assert.assertEquals(LOFilter.class, next.getClass());
    next = plan.getSuccessors(next).get(0);
    Assert.assertEquals(LOStore.class, next.getClass());

    OperatorPlan transformed = listener.getTransformed();
    Assert.assertEquals(2, transformed.size());
}
/**
 * A filter on COUNT(A) over a grouped relation must stay below the group
 * (see PIG-1639): expect load -> foreach -> cogroup -> filter -> store.
 */
@Test
public void testFilterUDFNegative() throws Exception {
    String query = "A = load 'myfile' as (name, age, gpa);" +
        "B = group A by age;"+
        "C = filter B by COUNT(A) < 18;" + "D = STORE C INTO 'empty';" ;
    LogicalPlan optimized = migrateAndOptimizePlan( query );

    // Walk the single chain from the source, checking each operator's type.
    Operator current = optimized.getSources().get( 0 );
    Assert.assertTrue( current instanceof LOLoad );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOForEach );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOCogroup );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOFilter );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOStore );
}
/**
 * Test that SAMPLE doesn't get pushed up (see PIG-2014)
 */
@Test
public void testSample() throws Exception {
    String query = "A = LOAD 'file.txt' AS (name, cuisines:bag{ t : ( cuisine ) } );" +
        "B = GROUP A by name;" +
        "C = FOREACH B GENERATE group, A;" +
        "D = SAMPLE C 0.1 ; " +
        "E = STORE D INTO 'empty';";

    // expect loload -> foreach -> cogroup -> foreach -> filter
    LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
    newLogicalPlan.explain(System.out, "text", true);

    Operator load = newLogicalPlan.getSources().get( 0 );
    Assert.assertTrue( load instanceof LOLoad );
    Operator fe1 = newLogicalPlan.getSuccessors( load ).get( 0 );
    Assert.assertTrue( fe1 instanceof LOForEach );
    Operator cg = newLogicalPlan.getSuccessors( fe1 ).get( 0 );
    Assert.assertTrue( cg instanceof LOCogroup );
    Operator fe2 = newLogicalPlan.getSuccessors( cg ).get( 0 );
    // Check the operator after the cogroup. The previous version re-checked
    // fe1 here, so this position was never actually verified.
    Assert.assertTrue( fe2 instanceof LOForEach );
    Operator filter = newLogicalPlan.getSuccessors( fe2 ).get( 0 );
    Assert.assertTrue( filter instanceof LOFilter );
}
/**
 * SAMPLE must not be pushed above DISTINCT (see PIG-2137): the filter has to
 * stay after the distinct.
 */
@Test
public void testSampleDistinct() throws Exception {
    String query = "A = LOAD 'file.txt' AS (name, cuisines:bag{ t : ( cuisine ) } );" +
        "B = DISTINCT A;" +
        "C = SAMPLE B 0.1 ; " +
        "D = STORE C INTO 'empty';";

    LogicalPlan optimized = migrateAndOptimizePlan( query );
    optimized.explain(System.out, "text", true);

    // expected chain: loload -> foreach -> distinct -> filter
    Operator current = optimized.getSources().get( 0 );
    Assert.assertTrue( current instanceof LOLoad );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOForEach );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LODistinct );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOFilter );
}
/**
 * A deterministic filter should be pushed above DISTINCT (see PIG-2137).
 */
@Test
public void testFilterAfterDistinct() throws Exception {
    String query = "A = LOAD 'file.txt' AS (name : chararray, cuisines:bag{ t : ( cuisine ) } );" +
        "B = DISTINCT A;" +
        "C = filter B by SIZE(name) > 10;" +
        "D = STORE C INTO 'long_name';";

    LogicalPlan optimized = migrateAndOptimizePlan( query );
    optimized.explain(System.out, "text", true);

    // the filter moves ahead of the distinct:
    // loload -> foreach -> filter -> distinct
    Operator current = optimized.getSources().get( 0 );
    Assert.assertTrue( current instanceof LOLoad );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOForEach );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOFilter );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LODistinct );
}
/**
 * A filter on the output of a nested DISTINCT must not be pushed above the
 * foreach that contains it (see PIG-3347).
 */
@Test
public void testFilterAfterNestedDistinct() throws Exception {
    String query = "a = LOAD 'file.txt';" +
        "a_group = group a by $0;" +
        "b = foreach a_group { a_distinct = distinct a.$0;generate group, a_distinct;}" +
        "c = filter b by SIZE(a_distinct) == 1;" +
        "store c into 'empty';";

    LogicalPlan optimized = migrateAndOptimizePlan( query );
    optimized.explain(System.out, "text", true);

    // the filter stays where it was written:
    // loload -> locogroup -> foreach -> filter
    Operator current = optimized.getSources().get( 0 );
    Assert.assertTrue( current instanceof LOLoad );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOCogroup );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOForEach );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOFilter );
}
/**
 * A filter on the output of a nested LIMIT must not be pushed above the
 * foreach that contains it (see PIG-3347).
 */
@Test
public void testFilterAfterNestedLimit() throws Exception {
    String query = "a = LOAD 'file.txt';" +
        "a_group = group a by $0;" +
        "b = foreach a_group { a_limit = limit a.$0 5;generate group, a_limit;}" +
        "c = filter b by SIZE(a_limit) == 1;" +
        "store c into 'empty';";

    LogicalPlan optimized = migrateAndOptimizePlan( query );
    optimized.explain(System.out, "text", true);

    // the filter stays below the foreach holding the nested limit:
    // loload -> locogroup -> foreach -> filter
    Operator current = optimized.getSources().get( 0 );
    Assert.assertTrue( current instanceof LOLoad );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOCogroup );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOForEach );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOFilter );
}
/**
 * A filter on the output of a nested FILTER must not be pushed above the
 * foreach that contains it (see PIG-3347).
 */
@Test
public void testFilterAfterNestedFilter() throws Exception {
    String query = "a = LOAD 'file.txt';" +
        "a_group = group a by $0;" +
        "b = foreach a_group { a_filter = filter a by $0 == 1;generate group, a_filter;}" +
        "c = filter b by SIZE(a_filter) == 1;" +
        "store c into 'empty';";

    LogicalPlan optimized = migrateAndOptimizePlan( query );
    optimized.explain(System.out, "text", true);

    // the outer filter stays below the foreach holding the nested filter:
    // loload -> locogroup -> foreach -> filter
    Operator current = optimized.getSources().get( 0 );
    Assert.assertTrue( current instanceof LOLoad );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOCogroup );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOForEach );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOFilter );
}
/**
 * A nested filter over one bag must not stop PushUpFilter/FilterAboveForeach
 * from moving an outer filter that only reads a different, untouched bag
 * (see PIG-3347).
 */
@Test
public void testFilterAfterUnrelatedNestedFilter() throws Exception {
    String query = "a = LOAD 'file.txt' as (a0:int, a1_bag:bag{(X:int)}, a2_bag:bag{(Y:int)});" +
        "b = foreach a { a1_filter = filter a1_bag by X == 1; generate a0, a1_filter, a2_bag;}" +
        "c = filter b by SIZE(a2_bag) == 1;" +
        "store c into 'empty';";

    LogicalPlan optimized = migrateAndOptimizePlan( query );
    optimized.explain(System.out, "text", true);

    // the outer filter ends up between two foreach operators:
    // loload -> foreach -> filter -> foreach
    Operator current = optimized.getSources().get( 0 );
    Assert.assertTrue( current instanceof LOLoad );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOForEach );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOFilter );

    current = optimized.getSuccessors( current ).get( 0 );
    Assert.assertTrue( current instanceof LOForEach );
}
/**
 * Builds a logical plan for the given Pig Latin script and runs
 * {@link NewPlanOptimizer} over it with an iteration limit of 3.
 *
 * @param query the Pig Latin script to compile
 * @return the plan built from the script, after the optimizer has run on it
 * @throws Exception if building or optimizing the plan fails
 */
private LogicalPlan migrateAndOptimizePlan(String query) throws Exception {
    LogicalPlan built = Util.buildLp(new PigServer(pc), query);
    new NewPlanOptimizer( built, 3 ).optimize();
    return built;
}
/**
 * Logical plan optimizer restricted to the rules the script-based tests care
 * about: LoadTypeCastInserter, PushUpFilter and FilterAboveForeach, each in
 * its own rule set and in that order.
 */
public class NewPlanOptimizer extends LogicalPlanOptimizer {
    /**
     * @param p the plan to optimize
     * @param iterations iteration limit handed to the parent optimizer
     */
    protected NewPlanOptimizer(OperatorPlan p, int iterations) {
        // NOTE(review): the empty set is presumably the list of disabled rules - confirm.
        super(p, iterations, new HashSet<String>());
    }

    /** Widens the inherited method's visibility so tests can register listeners. */
    @Override
    public void addPlanTransformListener(PlanTransformListener listener) {
        super.addPlanTransformListener(listener);
    }

    /** Replaces the default rule sets with the three single-rule sets used here. */
    @Override
    protected List<Set<Rule>> buildRuleSets() {
        List<Set<Rule>> ls = new ArrayList<Set<Rule>>();

        // add type cast inserter rule
        Set<Rule> s = new HashSet<Rule>();
        Rule r = new LoadTypeCastInserter( "TypeCastInserter" );
        s.add(r);
        ls.add(s);

        // add push up filter rule
        s = new HashSet<Rule>();
        r = new PushUpFilter( "PushUpFilter" );
        s.add(r);
        ls.add(s);

        // add filter above foreach rule, under its own name so it cannot be
        // confused with the PushUpFilter rule registered just above
        s = new HashSet<Rule>();
        r = new FilterAboveForeach( "FilterAboveForeach" );
        s.add(r);
        ls.add(s);

        return ls;
    }
}
/**
 * Plan optimizer that runs caller-supplied rule sets and always has a
 * SchemaPatcher and a ProjectionPatcher registered as transform listeners.
 */
public class MyPlanOptimizer extends PlanOptimizer {
    /**
     * @param p the plan to optimize
     * @param rs the rule sets to apply
     * @param iterations iteration limit handed to the parent optimizer
     */
    protected MyPlanOptimizer(OperatorPlan p, List<Set<Rule>> rs,
            int iterations) {
        super(p, rs, iterations);

        // Register the two patchers, schema first and projection second.
        PlanTransformListener schemaPatcher = new SchemaPatcher();
        addPlanTransformListener(schemaPatcher);
        PlanTransformListener projectionPatcher = new ProjectionPatcher();
        addPlanTransformListener(projectionPatcher);
    }

    /** Widens the inherited method's visibility so tests can register extra listeners. */
    @Override
    public void addPlanTransformListener(PlanTransformListener listener) {
        super.addPlanTransformListener(listener);
    }
}
/**
 * Listener that remembers the second argument of the most recent
 * {@code transformed} notification so that tests can inspect it.
 */
public class MyPlanTransformListener implements PlanTransformListener {
    // Second argument of the latest notification; null until the first one arrives.
    private OperatorPlan lastTransformed;

    /** Stores {@code tp}, replacing whatever an earlier notification stored. */
    @Override
    public void transformed(OperatorPlan fp, OperatorPlan tp)
            throws FrontendException {
        lastTransformed = tp;
    }

    /**
     * @return the plan stored by the latest notification, or null if none has arrived
     */
    public OperatorPlan getTransformed() {
        return lastTransformed;
    }
}
}