blob: 5bf3856cab65aa23c252be2e0d46cc4bb95781ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import junit.framework.AssertionFailedError;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.pig.ExecType;
import org.apache.pig.Expression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.PigServer;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.PartitionFilterExtractor;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.expression.AndExpression;
import org.apache.pig.newplan.logical.expression.BinaryExpression;
import org.apache.pig.newplan.logical.expression.CastExpression;
import org.apache.pig.newplan.logical.expression.ConstantExpression;
import org.apache.pig.newplan.logical.expression.DereferenceExpression;
import org.apache.pig.newplan.logical.expression.EqualExpression;
import org.apache.pig.newplan.logical.expression.IsNullExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.MapLookupExpression;
import org.apache.pig.newplan.logical.expression.NotEqualExpression;
import org.apache.pig.newplan.logical.expression.NotExpression;
import org.apache.pig.newplan.logical.expression.OrExpression;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.expression.ScalarExpression;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
import org.apache.pig.newplan.logical.rules.PartitionFilterOptimizer;
import org.apache.pig.newplan.optimizer.PlanOptimizer;
import org.apache.pig.newplan.optimizer.Rule;
import org.apache.pig.parser.ParserException;
import org.junit.Assert;
import org.junit.Test;
/**
* unit tests to test extracting new partition filter conditions out of the filter
* condition in the filter following a load which talks to metadata system (.i.e.
* implements {@link LoadMetadata})
*/
public class TestNewPartitionFilterPushDown {
static PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
String query = "a = load 'foo' as (srcid:int, mrkt:chararray, dstid:int, name:chararray, " +
"age:int, browser:map[], location:tuple(country:chararray, zip:int));";
String loadquery = "a = load 'foo' using "
+ TestLoader.class.getName() +
"('srcid:int, mrkt:chararray, dstid:int, name:chararray, " +
"age:int, browser:map[], location:tuple(country:chararray, zip:int)', " +
"'%s');";
String query2 = String.format(loadquery, "srcid,dstid");
String query3 = String.format(loadquery, "srcid");
/**
* test case where there is a single expression on partition columns in
* the filter expression along with an expression on non partition column
* @throws Exception
*/
@Test
public void testSimpleMixed() throws Exception {
String q = query + "b = filter a by srcid == 10 and name == 'foo';" + "store b into 'out';";
test(q, Arrays.asList("srcid"), "(srcid == 10)", "(name == 'foo')");
}
/**
* test case where filter does not contain any condition on partition cols
* @throws Exception
*/
@Test
public void testNoPartFilter() throws Exception {
String q = query + "b = filter a by age == 20 and name == 'foo';" + "store b into 'out';";
test(q, Arrays.asList("srcid"), null,
"((age == 20) and (name == 'foo'))");
}
/**
* test case where filter only contains condition on partition cols
* @throws Exception
*/
@Test
public void testOnlyPartFilter1() throws Exception {
String q = query + "b = filter a by srcid > 20 and mrkt == 'us';" + "store b into 'out';";
test(q, Arrays.asList("srcid", "mrkt"),
"((srcid > 20) and (mrkt == 'us'))", null);
}
/**
* test case where filter only contains condition on partition cols
* @throws Exception
*/
@Test
public void testPartIsNullFilter() throws Exception {
String q = query + "b = filter a by srcid is null;" + "store b into 'out';";
test(q, Arrays.asList("srcid"),
null, "(srcid is null)");
}
/**
* test case where filter only contains condition on partition cols
* @throws Exception
*/
@Test
public void testOnlyPartFilter2() throws Exception {
String q = query + "b = filter a by mrkt == 'us';" + "store b into 'out';";
test(q, Arrays.asList("srcid", "mrkt"),
"(mrkt == 'us')", null);
}
/**
* test case where filter only contains condition on partition cols
* @throws Exception
*/
@Test
public void testOnlyPartFilter3() throws Exception {
String q = query + "b = filter a by srcid == 20 or mrkt == 'us';" + "store b into 'out';";
test(q, Arrays.asList("srcid", "mrkt"),
"((srcid == 20) or (mrkt == 'us'))", null);
}
/**
* test case where filter has both conditions on partition cols and non
* partition cols and the filter condition will be split to extract the
* conditions on partition columns
*/
@Test
public void testMixed1() throws Exception {
String q = query + "b = filter a by " +
"(age < 20 and mrkt == 'us') and (srcid == 10 and " +
"name == 'foo');" + "store b into 'out';";
test(q, Arrays.asList("srcid", "mrkt"),
"((mrkt == 'us') and (srcid == 10))",
"((age < 20) and (name == 'foo'))");
}
/**
* test case where filter has both conditions on partition cols and non
* partition cols and the filter condition will be split to extract the
* conditions on partition columns
*/
@Test
public void testMixed2() throws Exception {
String q = query + "b = filter a by " +
"(age >= 20 and mrkt == 'us') and (srcid == 10 and " +
"dstid == 15);" + "store b into 'out';";
test(q, Arrays.asList("srcid", "dstid", "mrkt"),
"((mrkt == 'us') and ((srcid == 10) and (dstid == 15)))",
"(age >= 20)");
}
/**
* test case where filter has both conditions on partition cols and non
* partition cols and the filter condition will be split to extract the
* conditions on partition columns
*/
@Test
public void testMixed3() throws Exception {
String q = query + "b = filter a by " +
"age >= 20 and mrkt == 'us' and srcid == 10;" + "store b into 'out';";
test(q, Arrays.asList("srcid", "dstid", "mrkt"),
"((mrkt == 'us') and (srcid == 10))", "(age >= 20)");
}
/**
* test case where filter has both conditions on partition cols and non
* partition cols and the filter condition will be split to extract the
* conditions on partition columns - this testcase also has a condition
* based on comparison of two partition columns
*/
@Test
public void testMixed4() throws Exception {
String q = query + "b = filter a by " +
"age >= 20 and mrkt == 'us' and name == 'foo' and " +
"srcid == dstid;" + "store b into 'out';";
test(q, Arrays.asList("srcid", "dstid", "mrkt"),
"((mrkt == 'us') and (srcid == dstid))",
"((age >= 20) and (name == 'foo'))");
}
/**
* test case where filter has both conditions on partition cols and non
* partition cols and the filter condition will be split to extract the
* conditions on partition columns -
* This testcase has two partition col conditions with OR + non parition
* col conditions
*/
@Test
public void testMixed5() throws Exception {
String q = query + "b = filter a by " +
"(srcid == 10 or mrkt == 'us') and name == 'foo' and " +
"dstid == 30;" + "store b into 'out';";
test(q, Arrays.asList("srcid", "dstid", "mrkt"),
"(((srcid == 10) or (mrkt == 'us')) and (dstid == 30))",
"(name == 'foo')");
}
/**
* test case where filter has both conditions on partition cols and non
* partition cols and the filter condition will be split to extract the
* conditions on partition columns -
* This testcase has two partition col conditions with OR + non parition
* col conditions
*/
@Test
public void testMixed6() throws Exception {
String q = query + "b = filter a by " +
"dstid == 30 and (srcid == 10 or mrkt == 'us') and name == 'foo';" + "store b into 'out';";
test(q, Arrays.asList("srcid", "dstid", "mrkt"),
"((dstid == 30) and ((srcid == 10) or (mrkt == 'us')))",
"(name == 'foo')");
}
/**
* test case where filter has both conditions on partition cols and non
* partition cols and the filter condition will be split to extract the
* conditions on partition columns. This testcase also tests arithmetic
* in partition column conditions
*/
@Test
public void testMixedArith() throws Exception {
String q = query + "b = filter a by " +
"mrkt == 'us' and srcid * 10 == 150 + 20 and age != 15;" + "store b into 'out';";
test(q, Arrays.asList("srcid", "dstid", "mrkt"),
"((mrkt == 'us') and ((srcid * 10) == (150 + 20)))",
"(age != 15)");
}
/**
* test case where there is a single expression on partition columns in the
* filter expression along with an expression on non partition column of
* type map
* @throws Exception
*/
@Test
public void testMixedNonPartitionTypeMap() throws Exception {
String q = query + "b = filter a by srcid == 10 and browser#'type' == 'IE';" +
"store b into 'out';";
test(q, Arrays.asList("srcid"), "(srcid == 10)", "(browser#'type' == IE)", true);
q = query + "b = filter a by srcid == 10 and browser#'type' == 'IE' and " +
"browser#'version'#'major' == '8.0';" + "store b into 'out';";
test(q, Arrays.asList("srcid"), "(srcid == 10)", "((browser#'type' == IE) and " +
"(browser#'version'#'major' == 8.0))", true);
}
@Test
public void testMixedNonPartitionTypeMapComplex() throws Exception {
TestLoader.partFilter = null;
String q = "a = load 'foo' using "
+ TestLoader.class.getName() +
"('srcid:int, mrkt:chararray, dstid:int, name:chararray, " +
"age:int, browser:map[], location:tuple(country:chararray, zip:int)', " +
"'srcid,mrkt');" +
"b = filter a by srcid == 10 and mrkt > '1' and mrkt < '5';" +
"c = filter b by browser#'type' == 'IE';" + "store c into 'out';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
Assert.assertEquals("checking partition filter:",
"(((srcid == 10) and (mrkt > '1')) and (mrkt < '5'))",
TestLoader.partFilter.toString());
Operator op = newLogicalPlan.getSinks().get(0);
LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
String actual =
getTestExpression((LogicalExpression) filter.getFilterPlan().
getSources().get(0)).toString();
Assert.assertEquals("checking trimmed filter expression:",
"(browser#'type' == IE)", actual);
}
/**
* test case where there is a single expression on partition columns in the
* filter expression along with an expression on non partition column of
* type tuple
* @throws Exception
*/
@Test
public void testMixedNonPartitionTypeTuple() throws Exception {
String q = query + "b = filter a by srcid == 10 and location.country == 'US';" +
"store b into 'out';";
test(q, Arrays.asList("srcid"), "(srcid == 10)", "(location.$0 == US)", true);
}
@Test
public void testAndORConditionPartitionKeyCol() throws Exception {
// Case of AND and OR
String q = "b = filter a by (srcid == 10 and dstid == 5) " +
"or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7);" +
"store b into 'out';";
test(query + q, Arrays.asList("srcid", "dstid"),
"((((srcid == 10) and (dstid == 5)) " +
"or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))",
null);
// TODO fix following test after PIG-3465
//testFull(query2+ q, "((((srcid == 10) and (dstid == 5)) " +
// "or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))", null, false);
// Additional filter on non-partition key column
q = "b = filter a by ((srcid == 10 and dstid == 5) " +
"or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7)) and mrkt == 'US';" +
"store b into 'out';";
test(query + q, Arrays.asList("srcid", "dstid"),
"((((srcid == 10) and (dstid == 5)) " +
"or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))",
"(mrkt == 'US')");
testFull(query2+q, "((((srcid == 10) and (dstid == 5)) " +
"or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))",
"(mrkt == 'US')", false);
// Additional filter on partition key column that cannot be pushed
q = query +
"b = filter a by ((srcid == 10 and dstid == 5) " +
"or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7)) and srcid is null;" +
"store b into 'out';";
test(q, Arrays.asList("srcid", "dstid"),
"((((srcid == 10) and (dstid == 5)) " +
"or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))",
"(srcid is null)", true);
// partition key col but null condition which should not become part of
// the pushed down filter
q = query + "b = filter a by (srcid is null and dstid == 5) " +
"or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7);" +
"store b into 'out';";
test(q, Arrays.asList("srcid", "dstid"),
"(((dstid == 5) or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))",
"(((srcid is null) or ((srcid == 11) and (dstid == 6))) or ((srcid == 12) and (dstid == 7)))", true);
// Case of AND of ORs
q = query +
"b = filter a by (mrkt == 'US' or mrkt == 'UK') and (srcid == 11 or srcid == 10);" +
"store b into 'out';";
test(q, Arrays.asList("srcid", "mrkt"),
"(((mrkt == 'US') or (mrkt == 'UK')) and ((srcid == 11) or (srcid == 10)))", null);
test(q, Arrays.asList("srcid"),
"((srcid == 11) or (srcid == 10))", "((mrkt == 'US') or (mrkt == 'UK'))");
}
@Test
public void testAndORConditionMixedCol() throws Exception {
// Case of AND and OR with partition key and non-partition key columns
String q = "b = filter a by (srcid == 10 and dstid == 5) " +
"or (srcid == 11 and dstid == 6) or (srcid == 12 and dstid == 7) " +
"or (srcid == 13 and dstid == 8);" +
"store b into 'out';";
test(query + q, Arrays.asList("srcid"), "((((srcid == 10) or (srcid == 11)) or (srcid == 12)) or (srcid == 13))",
"(((((srcid == 10) or (srcid == 11)) or (srcid == 12)) or (dstid == 8)) " +
"and ((((((srcid == 10) or (srcid == 11)) or (dstid == 7)) and " +
"(((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) " +
"and ((dstid == 5) or (dstid == 6)))) or " +
"(srcid == 12)) and ((((srcid == 10) or (dstid == 6)) " +
"and (((dstid == 5) or (srcid == 11)) and ((dstid == 5) or (dstid == 6)))) or (dstid == 7)))) or (srcid == 13)) " +
"and (((((srcid == 10) or (srcid == 11)) or (dstid == 7)) and (((((srcid == 10) or (dstid == 6)) " +
"and (((dstid == 5) or (srcid == 11)) and ((dstid == 5) or (dstid == 6)))) or (srcid == 12)) " +
"and ((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) and ((dstid == 5) or (dstid == 6)))) or (dstid == 7)))) or (dstid == 8))))");
//
//testFull(query3 + q, "((((srcid == 10) or (srcid == 11)) or (srcid == 12)) or (srcid == 13))", "", false);
// Additional filter on a partition key column
q = query +
"b = filter a by ((srcid == 10 and dstid == 5) or (srcid == 11 and dstid == 6) " +
"or (srcid == 12 and dstid == 7)) and mrkt == 'US';" +
"store b into 'out';";
test(q, Arrays.asList("srcid", "mrkt"), "((((srcid == 10) or (srcid == 11)) or (srcid == 12)) and (mrkt == 'US'))",
"((((srcid == 10) or (srcid == 11)) or (dstid == 7)) and (((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) " +
"and ((dstid == 5) or (dstid == 6)))) or (srcid == 12)) and ((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) " +
"and ((dstid == 5) or (dstid == 6)))) or (dstid == 7))))");
q = query + "b = filter a by (mrkt == 'US' or mrkt == 'UK') and " +
"((srcid == 10 and dstid == 5) or (srcid == 11 and dstid == 6) " +
"or (srcid == 12 and dstid == 7));" +
"store b into 'out';";
test(q, Arrays.asList("srcid", "mrkt"), "(((mrkt == 'US') or (mrkt == 'UK')) and (((srcid == 10) or (srcid == 11)) or (srcid == 12)))",
"((((srcid == 10) or (srcid == 11)) or (dstid == 7)) and (((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) and ((dstid == 5) or (dstid == 6)))) or (srcid == 12)) and ((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) and ((dstid == 5) or (dstid == 6)))) or (dstid == 7))))");
// Additional filter on a non-partition key column
q = query +
"b = filter a by ((srcid == 10 and dstid == 5) or (srcid == 11 and dstid == 6) " +
"or (srcid == 12 and dstid == 7)) and mrkt == 'US';" +
"store b into 'out';";
test(q, Arrays.asList("srcid"), "(((srcid == 10) or (srcid == 11)) or (srcid == 12))",
"(((((srcid == 10) or (srcid == 11)) or (dstid == 7)) " +
"and (((((srcid == 10) or (dstid == 6)) and " +
"(((dstid == 5) or (srcid == 11)) and ((dstid == 5) or (dstid == 6)))) or (srcid == 12)) " +
"and ((((srcid == 10) or (dstid == 6)) and (((dstid == 5) or (srcid == 11)) " +
"and ((dstid == 5) or (dstid == 6)))) or (dstid == 7)))) and (mrkt == 'US'))");
// Case of OR and AND
q = query +
"b = filter a by (mrkt == 'US' or mrkt == 'UK') and " +
"(srcid == 11 or srcid == 10) and (dstid == 5 or dstid == 6);" +
"store b into 'out';";
test(q, Arrays.asList("srcid"),
"((srcid == 11) or (srcid == 10))",
"(((mrkt == 'US') or (mrkt == 'UK')) and ((dstid == 5) or (dstid == 6)))");
test(q, Arrays.asList("mrkt"),
"((mrkt == 'US') or (mrkt == 'UK'))",
"(((srcid == 11) or (srcid == 10)) and ((dstid == 5) or (dstid == 6)))");
}
private LogicalPlan migrateAndOptimizePlan(String query) throws Exception {
PigServer pigServer = new PigServer( pc );
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
optimizer.optimize();
return newLogicalPlan;
}
private void testFull(String q, String partFilter, String filterExpr, boolean unsupportedExpr) throws Exception {
TestLoader.partFilter = null;
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
if (partFilter != null) {
Assert.assertEquals("checking partition filter:",
partFilter,
TestLoader.partFilter.toString());
} else {
Assert.assertTrue(TestLoader.partFilter == null);
}
if (filterExpr != null) {
Operator op = newLogicalPlan.getSinks().get(0);
LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
if (unsupportedExpr) {
String actual = getTestExpression((LogicalExpression) filter.getFilterPlan().getSources().get(0));
Assert.assertEquals("checking trimmed filter expression:", filterExpr, actual);
} else {
String actual = new PartitionFilterExtractor(null, new ArrayList<String>())
.getExpression((LogicalExpression) filter.getFilterPlan().getSources().get(0)).toString();
Assert.assertEquals("checking trimmed filter expression:", filterExpr, actual);
}
} else {
Iterator<Operator> it = newLogicalPlan.getOperators();
while( it.hasNext() ) {
Assert.assertFalse("Checking that filter has been removed since it contained" +
" only conditions on partition cols:",
(it.next() instanceof LOFilter));
}
}
// Test that the filtered plan can be translated to physical plan (PIG-3657)
LogToPhyTranslationVisitor translator = new LogToPhyTranslationVisitor(newLogicalPlan);
translator.visit();
}
/**
* Test that pig sends correct partition column names in setPartitionFilter
* when the user has a schema in the load statement which renames partition
* columns
* @throws Exception
*/
@Test
public void testColNameMapping1() throws Exception {
String q = "a = load 'foo' using "
+ TestLoader.class.getName() +
"('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
"'srcid,mrkt') as (f1, f2, f3, f4, f5);" +
"b = filter a by " +
"(f5 >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);" +
"store b into 'out';";
testFull(q, "((mrkt == 'us') and (srcid == 10))", "((f5 >= 20) and (f3 == 15))", false);
}
/**
* Test that pig sends correct partition column names in setPartitionFilter
* when the user has a schema in the load statement which renames partition
* columns - in this test case there is no condition on partition columns
* - so setPartitionFilter() should not be called and the filter condition
* should remain as is.
* @throws Exception
*/
@Test
public void testColNameMapping2() throws Exception {
String q = "a = load 'foo' using "
+ TestLoader.class.getName() +
"('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
"'srcid') as (f1, f2, f3, f4, f5);" +
"b = filter a by " +
"f5 >= 20 and f2 == 'us' and f3 == 15;" +
"store b into 'out';";
testFull(q, null, "(((f5 >= 20) and (f2 == 'us')) and (f3 == 15))", false);
}
/**
* Test that pig sends correct partition column names in setPartitionFilter
* when the user has a schema in the load statement which renames partition
* columns - in this test case the filter only has conditions on partition
* columns
* @throws Exception
*/
@Test
public void testColNameMapping3() throws Exception {
String query = "a = load 'foo' using "
+ TestLoader.class.getName() +
"('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
"'srcid,mrkt,dstid,age') as (f1, f2, f3, f4, f5);" +
"b = filter a by " +
"(f5 >= 20 or f2 == 'us') and (f1 == 10 and f3 == 15);" +
"store b into 'out';";
testFull(query, "(((age >= 20) or (mrkt == 'us')) and ((srcid == 10) and " +
"(dstid == 15)))", null, false);
}
/**
* Test that pig sends correct partition column names in setPartitionFilter
* when the user has a schema in the load statement which renames partition
* columns - in this test case the schema in load statement is a prefix
* (with columns renamed) of the schema returned by
* {@link LoadMetadata#getSchema(String, Configuration)}
* @throws Exception
*/
@Test
public void testColNameMapping4() throws Exception {
String q = "a = load 'foo' using "
+ TestLoader.class.getName() +
"('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
"'srcid,mrkt') as (f1:int, f2:chararray, f3:int, name:chararray, age:int);" +
"b = filter a by " +
"(age >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);" + "store b into 'out';";
testFull(q, "((mrkt == 'us') and (srcid == 10))", "((age >= 20) and (f3 == 15))", false);
}
/**
* Test PIG-1267
* @throws Exception
*/
@Test
public void testColNameMapping5() throws Exception {
TestLoader.partFilter = null;
String q = "a = load 'foo' using "
+ TestLoader.class.getName() +
"('mrkt:chararray, a1:chararray, a2:chararray, srcid:int, bcookie:chararray', " +
"'srcid');" +
"b = load 'bar' using "
+ TestLoader.class.getName() +
"('dstid:int, b1:int, b2:int, srcid:int, bcookie:chararray, mrkt:chararray'," +
"'srcid');" +
"a1 = filter a by srcid == 10;" +
"b1 = filter b by srcid == 20;"+
"c = join a1 by bcookie, b1 by bcookie;" +
"d = foreach c generate $4 as bcookie:chararray, " +
"$5 as dstid:int, $0 as mrkt:chararray;" +
"store d into 'out';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
String partFilter = TestLoader.partFilter.toString();
Assert.assertTrue( "(srcid == 20)".equals( partFilter ) || "(srcid == 10)".equals( partFilter ) );
int counter = 0;
Iterator<Operator> iter = newLogicalPlan.getOperators();
while (iter.hasNext()) {
Assert.assertTrue(!(iter.next() instanceof LOFilter));
counter++;
}
Assert.assertEquals(counter, 5);
}
/**
* Test PIG-2778 Add matches operator to predicate pushdown
* @throws Exception
*/
@Test
public void testMatchOpPushDown() throws Exception {
// regexp condition on a partition col
String q = query + "b = filter a by name matches 'foo*';" + "store b into 'out';";
test(q, Arrays.asList("name"), "(name matches 'foo*')", null);
// regexp condition on a non-partition col
q = query + "b = filter a by name matches 'foo*';" + "store b into 'out';";
test(q, Arrays.asList("srcid"), null, "(name matches 'foo*')");
}
/**
* Test PIG-3395 Large filter expression makes Pig hang
* @throws Exception
*/
@Test
public void testLargeAndOrCondition() throws Exception {
String q = query + "b = filter a by " +
"(srcid == 1 and mrkt == '2' and dstid == 3) " +
"or (srcid == 4 and mrkt == '5' and dstid == 6) " +
"or (srcid == 7 and mrkt == '8' and dstid == 9) " +
"or (srcid == 10 and mrkt == '11' and dstid == 12) " +
"or (srcid == 13 and mrkt == '14' and dstid == 15) " +
"or (srcid == 16 and mrkt == '17' and dstid == 18) " +
"or (srcid == 19 and mrkt == '20' and dstid == 21) " +
"or (srcid == 22 and mrkt == '23' and dstid == 24) " +
"or (srcid == 25 and mrkt == '26' and dstid == 27) " +
"or (srcid == 28 and mrkt == '29' and dstid == 30) " +
"or (srcid == 31 and mrkt == '32' and dstid == 33) " +
"or (srcid == 34 and mrkt == '35' and dstid == 36) " +
"or (srcid == 37 and mrkt == '38' and dstid == 39) " +
"or (srcid == 40 and mrkt == '41' and dstid == 42) " +
"or (srcid == 43 and mrkt == '44' and dstid == 45) " +
"or (srcid == 46 and mrkt == '47' and dstid == 48) " +
"or (srcid == 49 and mrkt == '50' and dstid == 51) " +
"or (srcid == 52 and mrkt == '53' and dstid == 54) " +
"or (srcid == 55 and mrkt == '56' and dstid == 57) " +
"or (srcid == 58 and mrkt == '59' and dstid == 60) " +
"or (srcid == 61 and mrkt == '62' and dstid == 63) " +
"or (srcid == 64 and mrkt == '65' and dstid == 66) " +
"or (srcid == 67 and mrkt == '68' and dstid == 69);" +
"store b into 'out';";
test(q, Arrays.asList("srcid", "mrkt", "dstid"),
"(((((((((((((((((((((((((srcid == 1) and (mrkt == '2')) and (dstid == 3)) " +
"or (((srcid == 4) and (mrkt == '5')) and (dstid == 6))) " +
"or (((srcid == 7) and (mrkt == '8')) and (dstid == 9))) " +
"or (((srcid == 10) and (mrkt == '11')) and (dstid == 12))) " +
"or (((srcid == 13) and (mrkt == '14')) and (dstid == 15))) " +
"or (((srcid == 16) and (mrkt == '17')) and (dstid == 18))) " +
"or (((srcid == 19) and (mrkt == '20')) and (dstid == 21))) " +
"or (((srcid == 22) and (mrkt == '23')) and (dstid == 24))) " +
"or (((srcid == 25) and (mrkt == '26')) and (dstid == 27))) " +
"or (((srcid == 28) and (mrkt == '29')) and (dstid == 30))) " +
"or (((srcid == 31) and (mrkt == '32')) and (dstid == 33))) " +
"or (((srcid == 34) and (mrkt == '35')) and (dstid == 36))) " +
"or (((srcid == 37) and (mrkt == '38')) and (dstid == 39))) " +
"or (((srcid == 40) and (mrkt == '41')) and (dstid == 42))) " +
"or (((srcid == 43) and (mrkt == '44')) and (dstid == 45))) " +
"or (((srcid == 46) and (mrkt == '47')) and (dstid == 48))) " +
"or (((srcid == 49) and (mrkt == '50')) and (dstid == 51))) " +
"or (((srcid == 52) and (mrkt == '53')) and (dstid == 54))) " +
"or (((srcid == 55) and (mrkt == '56')) and (dstid == 57))) " +
"or (((srcid == 58) and (mrkt == '59')) and (dstid == 60))) " +
"or (((srcid == 61) and (mrkt == '62')) and (dstid == 63))) " +
"or (((srcid == 64) and (mrkt == '65')) and (dstid == 66))) " +
"or (((srcid == 67) and (mrkt == '68')) and (dstid == 69)))",
null);
}
// Or with non partition filter condition should not push projection
@Test
public void testOrWithNonPartitionCondition() throws Exception {
String q = query + "b = filter a by " +
"(srcid == 1 and mrkt == '2' and dstid == 3) " +
"or (srcid == 4 and mrkt == '5' and dstid == 6) " +
"or (srcid == 7 and mrkt == '8' and dstid == 9) " +
"or (name is null);" +
"store b into 'out';";
negativeTest(q, Arrays.asList("srcid", "mrkt", "dstid"));
}
// PIG-3657
@Test
public void testFilteredPlanWithLogToPhyTranslator() throws Exception {
String q = "a = load 'foo' using " + TestLoader.class.getName() +
"('srcid:int, mrkt:chararray', 'srcid') as (f1, f2);" +
"b = filter a by (f1 < 5 or (f1 == 10 and f2 == 'UK'));" +
"store b into 'out';";
testFull(q, "((srcid < 5) or (srcid == 10))", "((f1 < 5) or (f2 == 'UK'))", false);
}
// PIG-4940
@Test
public void testUnaryExpressions() throws Exception {
String q = query + "b = filter a by srcid == 10 and not browser#'type' is null;" +
"store b into 'out';";
test(q, Arrays.asList("srcid"), "(srcid == 10)",
"(not (browser#'type' is null))", true);
}
@Test
public void testScalarExpressions() throws Exception {
String q = "z = load '1line' as (z1:int); " +
query3 + "b = filter a by srcid != 10 and srcid == z.z1;" +
"store b into 'out';";
testFull(q, "(srcid != 10)", "(srcid == z.z1)", true);
}
@Test
public void testNegativeOrAndOr() throws Exception {
String q = query + "b = filter a by dstid != 10 OR ((dstid < 3000 and srcid == 1000) OR (dstid >= 3000 and srcid == 2000));" +
"store b into 'out';";
test(q, Arrays.asList("srcid"), null, "((dstid != 10) or (((dstidLessThan3000) and (srcid == 1000)) or ((dstidGreaterThanEqual3000) and (srcid == 2000))))", true);
}
//// helper methods ///////
private PartitionFilterExtractor test(String query, List<String> partitionCols,
String expPartFilterString, String expFilterString)
throws Exception {
return test(query, partitionCols, expPartFilterString, expFilterString, false);
}
private PartitionFilterExtractor test(String query, List<String> partitionCols,
String expPartFilterString, String expFilterString, boolean unsupportedExpression)
throws Exception {
PigServer pigServer = new PigServer( pc );
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
Operator op = newLogicalPlan.getSinks().get(0);
LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
PartitionFilterExtractor pColExtractor = new PartitionFilterExtractor(
filter.getFilterPlan(), partitionCols);
pColExtractor.visit();
if(expPartFilterString == null) {
Assert.assertEquals("Checking partition column filter:", null,
pColExtractor.getPushDownExpression());
} else {
Assert.assertEquals("Checking partition column filter:",
expPartFilterString,
pColExtractor.getPushDownExpression().toString());
}
if (expFilterString == null) {
Assert.assertTrue("Check that filter can be removed:",
pColExtractor.isFilterRemovable());
} else {
if (unsupportedExpression) {
String actual = getTestExpression((LogicalExpression)pColExtractor.getFilteredPlan().getSources().get(0)).toString();
Assert.assertEquals("checking trimmed filter expression:", expFilterString, actual);
} else {
String actual = pColExtractor.getExpression((LogicalExpression)pColExtractor.getFilteredPlan().getSources().get(0)).toString();
Assert.assertEquals("checking trimmed filter expression:", expFilterString, actual);
}
}
return pColExtractor;
}
// The filter cannot be pushed down unless it meets certain conditions. In
// that case, PColExtractor.getPColCondition() should return null.
private void negativeTest(String query, List<String> partitionCols) throws Exception {
PigServer pigServer = new PigServer( pc );
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
Operator op = newLogicalPlan.getSinks().get(0);
LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
PartitionFilterExtractor extractor = new PartitionFilterExtractor(
filter.getFilterPlan(), partitionCols);
extractor.visit();
Assert.assertFalse(extractor.canPushDown());
}
/**
* this loader is only used to test that parition column filters are given
* in the manner expected in terms of column names - hence it does not
* implement many of the methods and only implements required ones.
*/
public static class TestLoader extends LoadFunc implements LoadMetadata {
Schema schema;
String[] partCols;
static Expression partFilter = null;
public TestLoader(String schemaString, String commaSepPartitionCols)
throws ParserException {
schema = Utils.getSchemaFromString(schemaString);
partCols = commaSepPartitionCols.split(",");
}
@Override
public InputFormat getInputFormat() throws IOException {
return null;
}
@Override
public Tuple getNext() throws IOException {
return null;
}
@Override
public void prepareToRead(RecordReader reader, PigSplit split)
throws IOException {
}
@Override
public void setLocation(String location, Job job) throws IOException {
}
@Override
public String[] getPartitionKeys(String location, Job job)
throws IOException {
return partCols;
}
@Override
public ResourceSchema getSchema(String location, Job job)
throws IOException {
return new ResourceSchema(schema);
}
@Override
public ResourceStatistics getStatistics(String location,
Job job) throws IOException {
return null;
}
@Override
public void setPartitionFilter(Expression partitionFilter)
throws IOException {
partFilter = partitionFilter;
}
}
public class MyPlanOptimizer extends LogicalPlanOptimizer {
protected MyPlanOptimizer(OperatorPlan p, int iterations) {
super( p, iterations, new HashSet<String>() );
}
@Override
protected List<Set<Rule>> buildRuleSets() {
List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
Set<Rule> s = new HashSet<Rule>();
// add split filter rule
Rule r = new PartitionFilterOptimizer("PartitionFilterPushDown");
s = new HashSet<Rule>();
s.add(r);
ls.add(s);
r = new LoadTypeCastInserter( "LoadTypeCastInserter" );
s = new HashSet<Rule>();
s.add(r);
ls.add(s);
// Logical expression simplifier
// TODO enable this test after PIG-3465
/*
s = new HashSet<Rule>();
// add logical expression simplification rule
r = new LogicalExpressionSimplifier("FilterLogicExpressionSimplifier");
s.add(r);
ls.add(s);*/
return ls;
}
}
// Helper Functions
public LogicalPlan buildPlan(PigServer pigServer, String query) throws Exception {
try {
return Util.buildLp(pigServer, query);
} catch(Throwable t) {
throw new AssertionFailedError(t.getMessage());
}
}
private static String braketize(String input) {
return "(" + input + ")";
}
public static String getTestExpression(LogicalExpression op) throws FrontendException {
if(op == null) {
return null;
}
if(op instanceof ConstantExpression) {
ConstantExpression constExpr =(ConstantExpression)op ;
return String.valueOf(constExpr.getValue());
} else if (op instanceof ProjectExpression) {
ProjectExpression projExpr = (ProjectExpression)op;
String fieldName = projExpr.getFieldSchema().alias;
return fieldName;
} else {
if(op instanceof BinaryExpression) {
String lhs = getTestExpression(((BinaryExpression) op).getLhs());
String rhs = getTestExpression(((BinaryExpression) op).getRhs());
String opStr = null;
if(op instanceof EqualExpression) {
opStr = " == ";
} else if (op instanceof AndExpression) {
opStr = " and ";
} else if (op instanceof OrExpression) {
opStr = " or ";
} else if (op instanceof NotEqualExpression) {
opStr = " != ";
} else {
opStr = op.getName();
}
return braketize(lhs + opStr + rhs);
} else if (op instanceof CastExpression) {
String expr = getTestExpression(((CastExpression) op).getExpression());
return expr;
} else if(op instanceof IsNullExpression) {
String expr = getTestExpression(((IsNullExpression) op).getExpression());
return braketize(expr + " is null");
} else if(op instanceof MapLookupExpression) {
String col = getTestExpression(((MapLookupExpression)op).getMap());
String key = ((MapLookupExpression)op).getLookupKey();
return col + "#'" + key + "'";
} else if(op instanceof DereferenceExpression) {
String alias = getTestExpression(((DereferenceExpression) op).getReferredExpression());
int colind = ((DereferenceExpression) op).getBagColumns().get(0);
String column = String.valueOf(colind);
return alias + ".$" + column;
} else if (op instanceof NotExpression) {
String expr = getTestExpression(((NotExpression) op).getExpression());
return braketize("not " + expr);
} else if (op instanceof ScalarExpression) {
ScalarExpression scalar = (ScalarExpression) op;
return ((LogicalRelationalOperator)scalar.getImplicitReferencedOperator()).getAlias() +
"." + scalar.getFieldSchema().alias;
} else {
throw new FrontendException("Unsupported conversion of LogicalExpression to Expression: " + op.getName());
}
}
}
}