blob: de7fa1ec3d1eecb6f3d64cdc7dc1f0fca98a6710 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import junit.framework.AssertionFailedError;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.pig.ExecType;
import org.apache.pig.Expression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.PigServer;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.rules.PartitionFilterOptimizer;
import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.OperatorSubPlan;
import org.apache.pig.newplan.PColFilterExtractor;
import org.apache.pig.newplan.optimizer.PlanOptimizer;
import org.apache.pig.newplan.optimizer.Rule;
import org.apache.pig.newplan.optimizer.Transformer;
import org.apache.pig.parser.ParserException;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.impl.util.Utils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* unit tests to test extracting partition filter conditions out of the filter
* condition in the filter following a load which talks to metadata system (.i.e.
* implements {@link LoadMetadata})
*/
public class TestPartitionFilterPushDown {
static PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
String query = "a = load 'foo' as (srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int);";
@BeforeClass
public static void setup() throws Exception {
}
@AfterClass
public static void tearDown() {
}
/**
* test case where there is a single expression on partition columns in
* the filter expression along with an expression on non partition column
* @throws Exception
*/
@Test
public void testSimpleMixed() throws Exception {
String q = query + "b = filter a by srcid == 10 and name == 'foo';" + "store b into 'out';";
test(q, Arrays.asList("srcid"), "(srcid == 10)", "(name == 'foo')");
}
/**
* test case where filter does not contain any condition on partition cols
* @throws Exception
*/
@Test
public void testNoPartFilter() throws Exception {
String q = query + "b = filter a by age == 20 and name == 'foo';" + "store b into 'out';";
test(q, Arrays.asList("srcid"), null,
"((age == 20) and (name == 'foo'))");
}
/**
* test case where filter only contains condition on partition cols
* @throws Exception
*/
@Test
public void testOnlyPartFilter1() throws Exception {
String q = query + "b = filter a by srcid > 20 and mrkt == 'us';" + "store b into 'out';";
test(q, Arrays.asList("srcid", "mrkt"),
"((srcid > 20) and (mrkt == 'us'))", null);
}
/**
* test case where filter only contains condition on partition cols
* @throws Exception
*/
@Test
public void testOnlyPartFilter2() throws Exception {
String q = query + "b = filter a by mrkt == 'us';" + "store b into 'out';";
test(q, Arrays.asList("srcid", "mrkt"),
"(mrkt == 'us')", null);
}
/**
* test case where filter only contains condition on partition cols
* @throws Exception
*/
@Test
public void testOnlyPartFilter3() throws Exception {
String q = query + "b = filter a by srcid == 20 or mrkt == 'us';" + "store b into 'out';";
test(q, Arrays.asList("srcid", "mrkt"),
"((srcid == 20) or (mrkt == 'us'))", null);
}
/**
* test case where filter has both conditions on partition cols and non
* partition cols and the filter condition will be split to extract the
* conditions on partition columns
*/
@Test
public void testMixed1() throws Exception {
String q = query + "b = filter a by " +
"(age < 20 and mrkt == 'us') and (srcid == 10 and " +
"name == 'foo');" + "store b into 'out';";
test(q, Arrays.asList("srcid", "mrkt"),
"((mrkt == 'us') and (srcid == 10))",
"((age < 20) and (name == 'foo'))");
}
/**
* test case where filter has both conditions on partition cols and non
* partition cols and the filter condition will be split to extract the
* conditions on partition columns
*/
@Test
public void testMixed2() throws Exception {
String q = query + "b = filter a by " +
"(age >= 20 and mrkt == 'us') and (srcid == 10 and " +
"dstid == 15);" + "store b into 'out';";
test(q, Arrays.asList("srcid", "dstid", "mrkt"),
"((mrkt == 'us') and ((srcid == 10) and (dstid == 15)))",
"(age >= 20)");
}
/**
* test case where filter has both conditions on partition cols and non
* partition cols and the filter condition will be split to extract the
* conditions on partition columns
*/
@Test
public void testMixed3() throws Exception {
String q = query + "b = filter a by " +
"age >= 20 and mrkt == 'us' and srcid == 10;" + "store b into 'out';";
test(q, Arrays.asList("srcid", "dstid", "mrkt"),
"((mrkt == 'us') and (srcid == 10))", "(age >= 20)");
}
/**
* test case where filter has both conditions on partition cols and non
* partition cols and the filter condition will be split to extract the
* conditions on partition columns - this testcase also has a condition
* based on comparison of two partition columns
*/
@Test
public void testMixed4() throws Exception {
String q = query + "b = filter a by " +
"age >= 20 and mrkt == 'us' and name == 'foo' and " +
"srcid == dstid;" + "store b into 'out';";
test(q, Arrays.asList("srcid", "dstid", "mrkt"),
"((mrkt == 'us') and (srcid == dstid))",
"((age >= 20) and (name == 'foo'))");
}
/**
* test case where filter has both conditions on partition cols and non
* partition cols and the filter condition will be split to extract the
* conditions on partition columns -
* This testcase has two partition col conditions with OR + non parition
* col conditions
*/
@Test
public void testMixed5() throws Exception {
String q = query + "b = filter a by " +
"(srcid == 10 or mrkt == 'us') and name == 'foo' and " +
"dstid == 30;" + "store b into 'out';";
test(q, Arrays.asList("srcid", "dstid", "mrkt"),
"(((srcid == 10) or (mrkt == 'us')) and (dstid == 30))",
"(name == 'foo')");
}
/**
* test case where filter has both conditions on partition cols and non
* partition cols and the filter condition will be split to extract the
* conditions on partition columns -
* This testcase has two partition col conditions with OR + non parition
* col conditions
*/
@Test
public void testMixed6() throws Exception {
String q = query + "b = filter a by " +
"dstid == 30 and (srcid == 10 or mrkt == 'us') and name == 'foo';" + "store b into 'out';";
test(q, Arrays.asList("srcid", "dstid", "mrkt"),
"((dstid == 30) and ((srcid == 10) or (mrkt == 'us')))",
"(name == 'foo')");
}
@Test
public void test7() throws Exception {
String query = "a = load 'foo' using " + TestLoader.class.getName() +
"('srcid, mrkt, dstid, name, age', 'srcid, name');" +
"b = filter a by (srcid < 20 and age < 30) or (name == 'foo' and age > 40);" +
"store b into 'output';";
LogicalPlan plan = buildPlan(new PigServer(pc), query);
Rule rule = new PartitionFilterOptimizer("test");
List<OperatorPlan> matches = rule.match(plan);
if (matches != null) {
Transformer transformer = rule.getNewTransformer();
for (OperatorPlan m : matches) {
if (transformer.check(m)) {
transformer.transform(m);
}
}
OperatorSubPlan newPlan = (OperatorSubPlan)transformer.reportChanges();
Assert.assertTrue(newPlan.getBasePlan().isEqual(plan));
}
}
@Test
public void test8() throws Exception {
String query = "a = load 'foo' using " + TestLoader.class.getName() +
"('srcid, mrkt, dstid, name, age', 'srcid,name');" +
"b = filter a by (srcid < 20) or (name == 'foo');" +
"store b into 'output';";
LogicalPlan plan = Util.buildLp(new PigServer(pc), query);
Rule rule = new PartitionFilterOptimizer("test");
List<OperatorPlan> matches = rule.match(plan);
if (matches != null) {
Transformer transformer = rule.getNewTransformer();
for (OperatorPlan m : matches) {
if (transformer.check(m)) {
transformer.transform(m);
}
}
OperatorSubPlan newPlan = (OperatorSubPlan)transformer.reportChanges();
Assert.assertTrue(newPlan.getBasePlan().size() == 3);
}
}
/**
* test case where filter has both conditions on partition cols and non
* partition cols and the filter condition will be split to extract the
* conditions on partition columns. This testcase also tests arithmetic
* in partition column conditions
*/
@Test
public void testMixedArith() throws Exception {
String q = query + "b = filter a by " +
"mrkt == 'us' and srcid * 10 == 150 + 20 and age != 15;" + "store b into 'out';";
test(q, Arrays.asList("srcid", "dstid", "mrkt"),
"((mrkt == 'us') and ((srcid * 10) == (150 + 20)))",
"(age != 15)");
}
@Test
public void testNegPColConditionWithNonPCol() throws Exception {
// use of partition column condition and non partition column in
// same condition should fail
String q = query + "b = filter a by " +
"srcid > age;" + "store b into 'out';";
negativeTest(q, Arrays.asList("srcid"), 1111);
q = query + "b = filter a by " +
"srcid + age == 20;" + "store b into 'out';";
negativeTest(q, Arrays.asList("srcid"), 1111);
// OR of partition column condition and non partiton col condition
// should fail
q = query + "b = filter a by " +
"srcid > 10 or name == 'foo';" +
"store b into 'out';";
negativeTest(q, Arrays.asList("srcid"), 1111);
}
@Test
public void testNegPColInWrongPlaces() throws Exception {
int expectedErrCode = 1112;
String q = query + "b = filter a by " +
"(srcid > 10 and name == 'foo') or dstid == 10;" + "store b into 'out';";
negativeTest(q, Arrays.asList("srcid", "dstid"), expectedErrCode);
expectedErrCode = 1110;
q = query + "b = filter a by " +
"CONCAT(mrkt, '_10') == 'US_10' and age == 20;" + "store b into 'out';";
negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
q = query + "b = filter a by " +
"mrkt matches '.*us.*' and age < 15;" + "store b into 'out';";
negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
q = query + "b = filter a by " +
"(int)mrkt == 10 and name matches '.*foo.*';" + "store b into 'out';";
negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"),expectedErrCode);
q = query + "b = filter a by " +
"(mrkt == 'us' ? age : age + 10) == 40 and name matches '.*foo.*';" + "store b into 'out';";
negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
q = query + "b = filter a by " +
"(mrkt is null) and name matches '.*foo.*';" + "store b into 'out';";
negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
q = query + "b = filter a by " +
"(mrkt is not null) and name matches '.*foo.*';" + "store b into 'out';";
negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
}
// @Test
// public void testNegPColInWrongPlaces2() throws Exception {
//
// LogicalPlanTester tester = new LogicalPlanTester(pc);
// tester.buildPlan("a = load 'foo' using " + TestLoader.class.getName()
// + "('srcid, mrkt, dstid, name, age', 'srcid,dstid,mrkt');");
//
// org.apache.pig.impl.logicalLayer.LogicalPlan lp = tester
// .buildPlan("b = filter a by "
// + "(srcid > 10 and name == 'foo') or dstid == 10;");
// negativeTest(lp);
//
// lp = tester.buildPlan("b = filter a by " +
// "CONCAT(mrkt, '_10') == 'US_10' and age == 20;");
// negativeTest(lp);
//
// lp = tester.buildPlan("b = filter a by " +
// "mrkt matches '.*us.*' and age < 15;");
// negativeTest(lp);
//
// lp = tester.buildPlan("b = filter a by " +
// "(int)mrkt == 10 and name matches '.*foo.*';");
// negativeTest(lp);
//
// lp = tester.buildPlan("b = filter a by " +
// "(mrkt == 'us' ? age : age + 10) == 40 and name matches '.*foo.*';");
// negativeTest(lp);
//
// lp = tester.buildPlan("b = filter a by " +
// "(mrkt is null) and name matches '.*foo.*';");
// negativeTest(lp);
//
// lp = tester.buildPlan("b = filter a by " +
// "(mrkt is not null) and name matches '.*foo.*';");
// negativeTest(lp);
// }
/**
* Test that pig sends correct partition column names in setPartitionFilter
* when the user has a schema in the load statement which renames partition
* columns
* @throws Exception
*/
@Test
public void testColNameMapping1() throws Exception {
TestLoader.partFilter = null;
String q = "a = load 'foo' using "
+ TestLoader.class.getName() +
"('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
"'srcid,mrkt') as (f1, f2, f3, f4, f5);" +
"b = filter a by " +
"(f5 >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);" +
"store b into 'out';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
Assert.assertEquals("checking partition filter:",
"((mrkt == 'us') and (srcid == 10))",
TestLoader.partFilter.toString());
Operator op = newLogicalPlan.getSinks().get(0);
LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
PColFilterExtractor extractor = new PColFilterExtractor(filter.getFilterPlan(), new ArrayList<String>());
String actual = extractor.getExpression(
(LogicalExpression)filter.getFilterPlan().getSources().get(0)).
toString().toLowerCase();
Assert.assertEquals("checking trimmed filter expression:",
"((f5 >= 20) and (f3 == 15))", actual);
}
private LogicalPlan migrateAndOptimizePlan(String query) throws Exception {
PigServer pigServer = new PigServer( pc );
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
optimizer.optimize();
return newLogicalPlan;
}
/**
* Test that pig sends correct partition column names in setPartitionFilter
* when the user has a schema in the load statement which renames partition
* columns - in this test case there is no condition on partition columns
* - so setPartitionFilter() should not be called and the filter condition
* should remain as is.
* @throws Exception
*/
@Test
public void testColNameMapping2() throws Exception {
TestLoader.partFilter = null;
String q = "a = load 'foo' using "
+ TestLoader.class.getName() +
"('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
"'srcid') as (f1, f2, f3, f4, f5);" +
"b = filter a by " +
"f5 >= 20 and f2 == 'us' and f3 == 15;" +
"store b into 'out';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
Assert.assertEquals("checking partition filter:",
null,
TestLoader.partFilter);
Operator op = newLogicalPlan.getSinks().get(0);
LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
PColFilterExtractor extractor = new PColFilterExtractor(filter.getFilterPlan(), new ArrayList<String>());
String actual = extractor.getExpression(
(LogicalExpression) filter.getFilterPlan().
getSources().get(0)).
toString().toLowerCase();
Assert.assertEquals("checking trimmed filter expression:",
"(((f5 >= 20) and (f2 == 'us')) and (f3 == 15))", actual);
}
/**
* Test that pig sends correct partition column names in setPartitionFilter
* when the user has a schema in the load statement which renames partition
* columns - in this test case the filter only has conditions on partition
* columns
* @throws Exception
*/
@Test
public void testColNameMapping3() throws Exception {
TestLoader.partFilter = null;
String query = "a = load 'foo' using "
+ TestLoader.class.getName() +
"('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
"'srcid,mrkt,dstid,age') as (f1, f2, f3, f4, f5);" +
"b = filter a by " +
"(f5 >= 20 or f2 == 'us') and (f1 == 10 and f3 == 15);" +
"store b into 'out';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
Assert.assertEquals("checking partition filter:",
"(((age >= 20) or (mrkt == 'us')) and ((srcid == 10) and " +
"(dstid == 15)))",
TestLoader.partFilter.toString());
Iterator<Operator> it = newLogicalPlan.getOperators();
while( it.hasNext() ) {
Assert.assertFalse("Checking that filter has been removed since it contained" +
" only conditions on partition cols:",
(it.next() instanceof LOFilter));
}
}
/**
* Test that pig sends correct partition column names in setPartitionFilter
* when the user has a schema in the load statement which renames partition
* columns - in this test case the schema in load statement is a prefix
* (with columns renamed) of the schema returned by
* {@link LoadMetadata#getSchema(String, Configuration)}
* @throws Exception
*/
@Test
public void testColNameMapping4() throws Exception {
TestLoader.partFilter = null;
String q = "a = load 'foo' using "
+ TestLoader.class.getName() +
"('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
"'srcid,mrkt') as (f1:int, f2:chararray, f3:int, name:chararray, age:int);" +
"b = filter a by " +
"(age >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);" + "store b into 'out';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
Assert.assertEquals("checking partition filter:",
"((mrkt == 'us') and (srcid == 10))",
TestLoader.partFilter.toString());
Operator op = newLogicalPlan.getSinks().get(0);
LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
PColFilterExtractor extractor = new PColFilterExtractor(filter.getFilterPlan(), new ArrayList<String>());
String actual = extractor.getExpression(
(LogicalExpression) filter.getFilterPlan().getSources().get(0)).
toString().toLowerCase();
Assert.assertEquals("checking trimmed filter expression:",
"((age >= 20) and (f3 == 15))", actual);
}
/**
* Test PIG-1267
* @throws Exception
*/
@Test
public void testColNameMapping5() throws Exception {
TestLoader.partFilter = null;
String q = "a = load 'foo' using "
+ TestLoader.class.getName() +
"('mrkt:chararray, a1:chararray, a2:chararray, srcid:int, bcookie:chararray', " +
"'srcid');" +
"b = load 'bar' using "
+ TestLoader.class.getName() +
"('dstid:int, b1:int, b2:int, srcid:int, bcookie:chararray, mrkt:chararray'," +
"'srcid');" +
"a1 = filter a by srcid == 10;" +
"b1 = filter b by srcid == 20;"+
"c = join a1 by bcookie, b1 by bcookie;" +
"d = foreach c generate $4 as bcookie:chararray, " +
"$5 as dstid:int, $0 as mrkt:chararray;" +
"store d into 'out';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
String partFilter = TestLoader.partFilter.toString();
Assert.assertTrue( "(srcid == 20)".equals( partFilter ) || "(srcid == 10)".equals( partFilter ) );
int counter = 0;
Iterator<Operator> iter = newLogicalPlan.getOperators();
while (iter.hasNext()) {
Assert.assertTrue(!(iter.next() instanceof LOFilter));
counter++;
}
Assert.assertEquals(counter, 5);
}
//// helper methods ///////
private PColFilterExtractor test(String query, List<String> partitionCols,
String expPartFilterString, String expFilterString)
throws Exception {
PigServer pigServer = new PigServer( pc );
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
Operator op = newLogicalPlan.getSinks().get(0);
LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
PColFilterExtractor pColExtractor = new PColFilterExtractor(
filter.getFilterPlan(), partitionCols);
pColExtractor.visit();
if(expPartFilterString == null) {
Assert.assertEquals("Checking partition column filter:", null,
pColExtractor.getPColCondition());
} else {
Assert.assertEquals("Checking partition column filter:",
expPartFilterString.toLowerCase(),
pColExtractor.getPColCondition().toString().toLowerCase());
}
if(expFilterString == null) {
Assert.assertTrue("Check that filter can be removed:",
pColExtractor.isFilterRemovable());
} else {
String actual = pColExtractor.getExpression(
(LogicalExpression)filter.getFilterPlan().getSources().get(0)).
toString().toLowerCase();
Assert.assertEquals("checking trimmed filter expression:", expFilterString,
actual);
}
return pColExtractor;
}
private void negativeTest(String query, List<String> partitionCols,
int expectedErrorCode) throws Exception {
PigServer pigServer = new PigServer( pc );
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
Operator op = newLogicalPlan.getSinks().get(0);
LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
PColFilterExtractor pColExtractor = new PColFilterExtractor(
filter.getFilterPlan(), partitionCols);
try {
pColExtractor.visit();
} catch(Exception e) {
Assert.assertEquals("Checking if exception has right error code",
expectedErrorCode, LogUtils.getPigException(e).getErrorCode());
return;
}
}
/**
* this loader is only used to test that parition column filters are given
* in the manner expected in terms of column names - hence it does not
* implement many of the methods and only implements required ones.
*/
public static class TestLoader extends LoadFunc implements LoadMetadata {
Schema schema;
String[] partCols;
static Expression partFilter = null;
public TestLoader(String schemaString, String commaSepPartitionCols)
throws ParserException {
schema = Utils.getSchemaFromString(schemaString);
partCols = commaSepPartitionCols.split(",");
}
@Override
public InputFormat getInputFormat() throws IOException {
return null;
}
@Override
public Tuple getNext() throws IOException {
return null;
}
@Override
public void prepareToRead(RecordReader reader, PigSplit split)
throws IOException {
}
@Override
public void setLocation(String location, Job job) throws IOException {
}
@Override
public String[] getPartitionKeys(String location, Job job)
throws IOException {
return partCols;
}
@Override
public ResourceSchema getSchema(String location, Job job)
throws IOException {
return new ResourceSchema(schema);
}
@Override
public ResourceStatistics getStatistics(String location,
Job job) throws IOException {
return null;
}
@Override
public void setPartitionFilter(Expression partitionFilter)
throws IOException {
partFilter = partitionFilter;
}
}
public class MyPlanOptimizer extends LogicalPlanOptimizer {
protected MyPlanOptimizer(OperatorPlan p, int iterations) {
super( p, iterations, new HashSet<String>() );
}
protected List<Set<Rule>> buildRuleSets() {
List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
Set<Rule> s = new HashSet<Rule>();
// add split filter rule
Rule r = new PartitionFilterOptimizer("PartitionFilterPushDown");
s = new HashSet<Rule>();
s.add(r);
ls.add(s);
r = new LoadTypeCastInserter( "LoadTypeCastInserter" );
s = new HashSet<Rule>();
s.add(r);
ls.add(s);
return ls;
}
}
// Helper Functions
public LogicalPlan buildPlan(PigServer pigServer, String query) throws Exception {
try {
return Util.buildLp(pigServer, query);
} catch(Throwable t) {
throw new AssertionFailedError(t.getMessage());
}
}
}