blob: 8d0d0dc9d0243a8d0ddf850c231935e38a1ae7af [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import java.io.File;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.pig.ExecType;
import org.apache.pig.FilterFunc;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LOCross;
import org.apache.pig.newplan.logical.relational.LODistinct;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
import org.apache.pig.newplan.logical.relational.LOStore;
import org.apache.pig.newplan.logical.relational.LOStream;
import org.apache.pig.newplan.logical.relational.LOUnion;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
import org.apache.pig.newplan.logical.rules.PushUpFilter;
import org.apache.pig.newplan.optimizer.PlanOptimizer;
import org.apache.pig.newplan.optimizer.PlanTransformListener;
import org.apache.pig.newplan.optimizer.Rule;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests where the logical plan optimizer places filter operators (filter push-up)
* for a variety of query shapes.
*/
public class TestNewPlanPushUpFilter {
// Pig context created for local execution with an empty property set.
PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
/**
 * One-time fixture: removes a file named "dummy" (relative path) if present.
 * The boolean result of the delete is ignored.
 */
@BeforeClass
public static void oneTimeSetup() throws Exception {
    File staleOutput = new File("dummy");
    staleOutput.delete();
}
// Per-test hook with an empty body.
// NOTE(review): the method is named tearDown but carries @Before, so JUnit
// runs it before each test rather than after it — confirm which was intended.
@Before
public void tearDown() {
}
/**
 * Minimal filter UDF for the tests: it rejects every tuple.
 */
public static class MyFilterFunc extends FilterFunc {
    /**
     * Ignores the input tuple.
     *
     * @param input the tuple to evaluate (unused)
     * @return always {@code false}
     */
    @Override
    public Boolean exec(Tuple input) {
        return Boolean.FALSE;
    }
}
@Test
// An empty script must produce a plan that contains no operators.
public void testErrorEmptyInput() throws Exception {
    LogicalPlan plan = migrateAndOptimizePlan("");
    boolean hasAnyOperator = plan.getOperators().hasNext();
    Assert.assertFalse(hasAnyOperator);
}
@Test
// A script without any filter: the plan must be load -> foreach -> store,
// and the store must have no successors.
public void testErrorNonFilterInput() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);store A into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    Class<?>[] expectedChain = { LOLoad.class, LOForEach.class, LOStore.class };
    Operator current = plan.getSources().get(0);
    Assert.assertTrue(expectedChain[0].isInstance(current));
    for (int i = 1; i < expectedChain.length; i++) {
        current = plan.getSuccessors(current).get(0);
        Assert.assertTrue(expectedChain[i].isInstance(current));
    }
    // 'current' is now the store operator.
    Assert.assertNull(plan.getSuccessors(current));
}
@Test
// A filter directly after the load: expected chain is
// load -> foreach -> filter -> store.
public void testFilterLoad() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = filter A by $1 < 18;" +
        "store B into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    Class<?>[] expectedChain = { LOLoad.class, LOForEach.class, LOFilter.class, LOStore.class };
    Operator current = plan.getSources().get(0);
    Assert.assertTrue(expectedChain[0].isInstance(current));
    for (int i = 1; i < expectedChain.length; i++) {
        current = plan.getSuccessors(current).get(0);
        Assert.assertTrue(expectedChain[i].isInstance(current));
    }
}
@Test
// A filter after a stream operator stays after it: expected chain is
// load -> foreach -> stream -> filter.
public void testFilterStreaming() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = stream A through `" + "ps -u" + "`;" +
        "C = filter B by $1 < 18;" +
        "D = STORE C into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    Class<?>[] expectedChain = { LOLoad.class, LOForEach.class, LOStream.class, LOFilter.class };
    Operator current = plan.getSources().get(0);
    Assert.assertTrue(expectedChain[0].isInstance(current));
    for (int i = 1; i < expectedChain.length; i++) {
        current = plan.getSuccessors(current).get(0);
        Assert.assertTrue(expectedChain[i].isInstance(current));
    }
}
@Test
// A filter written after an order-by ends up before the sort: expected chain is
// load -> foreach -> filter -> sort -> store.
public void testFilterSort() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = order A by $1, $2;" +
        "C = filter B by $1 < 18;" +
        "D = STORE C into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    Class<?>[] expectedChain = {
        LOLoad.class, LOForEach.class, LOFilter.class, LOSort.class, LOStore.class
    };
    Operator current = plan.getSources().get(0);
    Assert.assertTrue(expectedChain[0].isInstance(current));
    for (int i = 1; i < expectedChain.length; i++) {
        current = plan.getSuccessors(current).get(0);
        Assert.assertTrue(expectedChain[i].isInstance(current));
    }
}
@Test
// Same as the sort case but with a constant filter condition (1 == 1):
// expected chain is load -> foreach -> filter -> sort -> store.
public void testFilterConstantConditionSort() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = order A by $1, $2;" +
        "C = filter B by 1 == 1;" +
        "D = STORE C into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    Class<?>[] expectedChain = {
        LOLoad.class, LOForEach.class, LOFilter.class, LOSort.class, LOStore.class
    };
    Operator current = plan.getSources().get(0);
    Assert.assertTrue(expectedChain[0].isInstance(current));
    for (int i = 1; i < expectedChain.length; i++) {
        current = plan.getSuccessors(current).get(0);
        Assert.assertTrue(expectedChain[i].isInstance(current));
    }
}
@Test
// Same as the sort case but the filter condition is a UDF call on $1:
// expected chain is load -> foreach -> filter -> sort -> store.
public void testFilterUDFSort() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = order A by $1, $2;" +
        "C = filter B by " + MyFilterFunc.class.getName() + "($1) ;" +
        "D = STORE C into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    Class<?>[] expectedChain = {
        LOLoad.class, LOForEach.class, LOFilter.class, LOSort.class, LOStore.class
    };
    Operator current = plan.getSources().get(0);
    Assert.assertTrue(expectedChain[0].isInstance(current));
    for (int i = 1; i < expectedChain.length; i++) {
        current = plan.getSuccessors(current).get(0);
        Assert.assertTrue(expectedChain[i].isInstance(current));
    }
}
@Test
// A filter written after a distinct ends up before it: expected chain is
// load -> foreach -> filter -> distinct -> store.
public void testFilterDistinct() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = distinct A;" +
        "C = filter B by $1 < 18;" +
        "D = STORE C into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    Class<?>[] expectedChain = {
        LOLoad.class, LOForEach.class, LOFilter.class, LODistinct.class, LOStore.class
    };
    Operator current = plan.getSources().get(0);
    Assert.assertTrue(expectedChain[0].isInstance(current));
    for (int i = 1; i < expectedChain.length; i++) {
        current = plan.getSuccessors(current).get(0);
        Assert.assertTrue(expectedChain[i].isInstance(current));
    }
}
@Test
// Distinct followed by a constant-condition filter (1 == 1): expected chain is
// load -> foreach -> filter -> distinct -> store.
public void testFilterConstantConditionDistinct() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = distinct A;" +
        "C = filter B by 1 == 1;" +
        "D = STORE C into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    Class<?>[] expectedChain = {
        LOLoad.class, LOForEach.class, LOFilter.class, LODistinct.class, LOStore.class
    };
    Operator current = plan.getSources().get(0);
    Assert.assertTrue(expectedChain[0].isInstance(current));
    for (int i = 1; i < expectedChain.length; i++) {
        current = plan.getSuccessors(current).get(0);
        Assert.assertTrue(expectedChain[i].isInstance(current));
    }
}
@Test
// Distinct followed by a UDF filter on $1: expected chain is
// load -> foreach -> filter -> distinct -> store.
public void testFilterUDFDistinct() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = distinct A;" +
        "C = filter B by " + MyFilterFunc.class.getName() + "($1) ;" +
        "D = STORE C into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    Class<?>[] expectedChain = {
        LOLoad.class, LOForEach.class, LOFilter.class, LODistinct.class, LOStore.class
    };
    Operator current = plan.getSources().get(0);
    Assert.assertTrue(expectedChain[0].isInstance(current));
    for (int i = 1; i < expectedChain.length; i++) {
        current = plan.getSuccessors(current).get(0);
        Assert.assertTrue(expectedChain[i].isInstance(current));
    }
}
@Test
// Two consecutive filters keep their relative order: the filter aliased "B"
// comes first, then the one aliased "C", then the store.
public void testFilterFilter() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = filter A by $0 != 'name';" +
        "C = filter B by $1 < 18;" +
        "D = STORE C into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    Operator load = plan.getSources().get(0);
    Assert.assertTrue(load instanceof LOLoad);

    Operator foreach = plan.getSuccessors(load).get(0);
    Assert.assertTrue(foreach instanceof LOForEach);

    Operator firstFilter = plan.getSuccessors(foreach).get(0);
    Assert.assertTrue(firstFilter instanceof LOFilter);
    String firstAlias = ((LOFilter) firstFilter).getAlias();
    Assert.assertTrue(firstAlias.equals("B"));

    Operator secondFilter = plan.getSuccessors(firstFilter).get(0);
    Assert.assertTrue(secondFilter instanceof LOFilter);
    String secondAlias = ((LOFilter) secondFilter).getAlias();
    Assert.assertTrue(secondAlias.equals("C"));

    Operator store = plan.getSuccessors(secondFilter).get(0);
    Assert.assertTrue(store instanceof LOStore);
}
@Test
// A filter on one output of a split stays after that split output:
// load -> foreach -> split -> split output -> filter -> store.
public void testFilterSplitOutput() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "split A into B if $1 < 18, C if $1 >= 18;" +
        "C = filter B by $1 < 10;" +
        "D = STORE C into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    Class<?>[] expectedChain = {
        LOLoad.class, LOForEach.class, LOSplit.class,
        LOSplitOutput.class, LOFilter.class, LOStore.class
    };
    Operator current = plan.getSources().get(0);
    Assert.assertTrue(expectedChain[0].isInstance(current));
    for (int i = 1; i < expectedChain.length; i++) {
        current = plan.getSuccessors(current).get(0);
        Assert.assertTrue(expectedChain[i].isInstance(current));
    }
}
@Test
// A filter after a limit stays after it: expected chain is
// load -> foreach -> limit -> filter -> store.
public void testFilterLimit() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = limit A 10;" +
        "C = filter B by $1 < 18;" +
        "D = STORE C into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    Class<?>[] expectedChain = {
        LOLoad.class, LOForEach.class, LOLimit.class, LOFilter.class, LOStore.class
    };
    Operator current = plan.getSources().get(0);
    Assert.assertTrue(expectedChain[0].isInstance(current));
    for (int i = 1; i < expectedChain.length; i++) {
        current = plan.getSuccessors(current).get(0);
        Assert.assertTrue(expectedChain[i].isInstance(current));
    }
}
@Test
// A filter after a union: each input branch must be
// load -> foreach -> filter, both feeding the same union, followed by the store.
public void testFilterUnion() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = load 'anotherfile' as (name, age, preference);" +
        "C = union A, B;" +
        "D = filter C by $1 < 18;" +
        "E = STORE D into'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    List<Operator> sources = plan.getSources();
    Assert.assertEquals(2, sources.size());
    Assert.assertTrue(sources.get(0) instanceof LOLoad);
    Assert.assertTrue(sources.get(1) instanceof LOLoad);

    // Handle either source order by locating each load through its alias.
    int indexOfA = ((LOLoad) sources.get(0)).getAlias().equals("A") ? 0 : 1;
    Operator loadA = sources.get(indexOfA);
    Operator loadB = sources.get(1 - indexOfA);

    Operator foreachA = plan.getSuccessors(loadA).get(0);
    Assert.assertTrue(foreachA instanceof LOForEach);
    Operator filterA = plan.getSuccessors(foreachA).get(0);
    Assert.assertTrue(filterA instanceof LOFilter);

    Operator foreachB = plan.getSuccessors(loadB).get(0);
    Assert.assertTrue(foreachB instanceof LOForEach);
    Operator filterB = plan.getSuccessors(foreachB).get(0);
    Assert.assertTrue(filterB instanceof LOFilter);

    Operator unionViaA = plan.getSuccessors(filterA).get(0);
    Assert.assertTrue(unionViaA instanceof LOUnion);
    Operator unionViaB = plan.getSuccessors(filterB).get(0);
    Assert.assertTrue(unionViaB instanceof LOUnion);
    Assert.assertSame(unionViaA, unionViaB);

    Operator store = plan.getSuccessors(unionViaA).get(0);
    Assert.assertTrue(store instanceof LOStore);
}
@Test
// Union followed by a constant-condition filter (1 == 1): each input branch must
// be load -> foreach -> filter, both feeding the same union, then the store.
public void testFilterConstantConditionUnion() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = load 'anotherfile' as (name, age, preference);" +
        "C = union A, B;" +
        "D = filter C by 1 == 1;" +
        "E = STORE D into'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    List<Operator> sources = plan.getSources();
    Assert.assertEquals(2, sources.size());
    Assert.assertTrue(sources.get(0) instanceof LOLoad);
    Assert.assertTrue(sources.get(1) instanceof LOLoad);

    // Handle either source order by locating each load through its alias.
    int indexOfA = ((LOLoad) sources.get(0)).getAlias().equals("A") ? 0 : 1;
    Operator loadA = sources.get(indexOfA);
    Operator loadB = sources.get(1 - indexOfA);

    Operator foreachA = plan.getSuccessors(loadA).get(0);
    Assert.assertTrue(foreachA instanceof LOForEach);
    Operator filterA = plan.getSuccessors(foreachA).get(0);
    Assert.assertTrue(filterA instanceof LOFilter);

    Operator foreachB = plan.getSuccessors(loadB).get(0);
    Assert.assertTrue(foreachB instanceof LOForEach);
    Operator filterB = plan.getSuccessors(foreachB).get(0);
    Assert.assertTrue(filterB instanceof LOFilter);

    Operator unionViaA = plan.getSuccessors(filterA).get(0);
    Assert.assertTrue(unionViaA instanceof LOUnion);
    Operator unionViaB = plan.getSuccessors(filterB).get(0);
    Assert.assertTrue(unionViaB instanceof LOUnion);
    Assert.assertSame(unionViaA, unionViaB);

    Operator store = plan.getSuccessors(unionViaA).get(0);
    Assert.assertTrue(store instanceof LOStore);
}
@Test
// Union followed by a filter that calls a UDF with no arguments: each input
// branch must be load -> foreach -> filter, both feeding the same union,
// then the store.
public void testFilterUDFUnion() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = load 'anotherfile' as (name, age, preference);" +
        "C = union A, B;" +
        "D = filter C by " + MyFilterFunc.class.getName() + "() ;" +
        "E = STORE D into'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    List<Operator> sources = plan.getSources();
    Assert.assertEquals(2, sources.size());
    Assert.assertTrue(sources.get(0) instanceof LOLoad);
    Assert.assertTrue(sources.get(1) instanceof LOLoad);

    // Handle either source order by locating each load through its alias.
    int indexOfA = ((LOLoad) sources.get(0)).getAlias().equals("A") ? 0 : 1;
    Operator loadA = sources.get(indexOfA);
    Operator loadB = sources.get(1 - indexOfA);

    Operator foreachA = plan.getSuccessors(loadA).get(0);
    Assert.assertTrue(foreachA instanceof LOForEach);
    Operator filterA = plan.getSuccessors(foreachA).get(0);
    Assert.assertTrue(filterA instanceof LOFilter);

    Operator foreachB = plan.getSuccessors(loadB).get(0);
    Assert.assertTrue(foreachB instanceof LOForEach);
    Operator filterB = plan.getSuccessors(foreachB).get(0);
    Assert.assertTrue(filterB instanceof LOFilter);

    Operator unionViaA = plan.getSuccessors(filterA).get(0);
    Assert.assertTrue(unionViaA instanceof LOUnion);
    Operator unionViaB = plan.getSuccessors(filterB).get(0);
    Assert.assertTrue(unionViaB instanceof LOUnion);
    Assert.assertSame(unionViaA, unionViaB);

    Operator store = plan.getSuccessors(unionViaA).get(0);
    Assert.assertTrue(store instanceof LOStore);
}
@Test
// Cross followed by a filter on $5 and a limit: starting from load B the chain
// must be foreach -> filter -> cross -> limit.
public void testFilterCross() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = load 'anotherfile' as (name, age, preference);" +
        "C = cross A, B;" +
        "D = filter C by $5 < 18;" +
        "E = limit D 10;" +
        "F = STORE E into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    List<Operator> sources = plan.getSources();
    Assert.assertEquals(2, sources.size());
    Assert.assertTrue(sources.get(0) instanceof LOLoad);
    Assert.assertTrue(sources.get(1) instanceof LOLoad);

    // Start the walk at the load aliased "B", whichever position it is in.
    Operator current = ((LOLoad) sources.get(0)).getAlias().equals("B")
        ? sources.get(0)
        : sources.get(1);

    Class<?>[] expectedChain = {
        LOForEach.class, LOFilter.class, LOCross.class, LOLimit.class
    };
    for (Class<?> expectedType : expectedChain) {
        current = plan.getSuccessors(current).get(0);
        Assert.assertTrue(expectedType.isInstance(current));
    }
}
@Test
// Cross followed by a filter on $1: starting from load A the chain must be
// foreach -> filter -> cross -> store.
public void testFilterCross1() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = load 'anotherfile' as (name, age, preference);" +
        "C = cross A, B;" +
        "D = filter C by $1 < 18;" +
        "E = STORE D into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    List<Operator> sources = plan.getSources();
    Assert.assertEquals(2, sources.size());
    Assert.assertTrue(sources.get(0) instanceof LOLoad);
    Assert.assertTrue(sources.get(1) instanceof LOLoad);

    // Start the walk at the load aliased "A", whichever position it is in.
    Operator current = ((LOLoad) sources.get(0)).getAlias().equals("A")
        ? sources.get(0)
        : sources.get(1);

    Class<?>[] expectedChain = {
        LOForEach.class, LOFilter.class, LOCross.class, LOStore.class
    };
    for (Class<?> expectedType : expectedChain) {
        current = plan.getSuccessors(current).get(0);
        Assert.assertTrue(expectedType.isInstance(current));
    }
}
@Test
// Cross followed by a filter that references columns $1 and $5: both branches
// must be load -> foreach -> cross, and the filter must follow the cross.
public void testFilterCross2() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = load 'anotherfile' as (name, age, preference);" +
        "C = cross A, B;" +
        "D = filter C by $1 < 18 and $5 < 18;" +
        "E = STORE D into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    List<Operator> sources = plan.getSources();
    Assert.assertEquals(2, sources.size());
    Assert.assertTrue(sources.get(0) instanceof LOLoad);
    Assert.assertTrue(sources.get(1) instanceof LOLoad);

    // Handle either source order by locating each load through its alias.
    int indexOfA = ((LOLoad) sources.get(0)).getAlias().equals("A") ? 0 : 1;
    Operator loadA = sources.get(indexOfA);
    Operator loadB = sources.get(1 - indexOfA);

    Operator foreachA = plan.getSuccessors(loadA).get(0);
    Assert.assertTrue(foreachA instanceof LOForEach);
    Operator crossViaA = plan.getSuccessors(foreachA).get(0);
    Assert.assertTrue(crossViaA instanceof LOCross);

    Operator foreachB = plan.getSuccessors(loadB).get(0);
    Assert.assertTrue(foreachB instanceof LOForEach);
    Operator crossViaB = plan.getSuccessors(foreachB).get(0);
    Assert.assertTrue(crossViaB instanceof LOCross);

    Operator filter = plan.getSuccessors(crossViaB).get(0);
    Assert.assertTrue(filter instanceof LOFilter);
}
@Test
// Cross followed by a constant-condition filter (1 == 1): branch A must be
// load -> foreach -> filter and branch B load -> foreach -> filter -> cross.
public void testFilterConstantConditionCross() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = load 'anotherfile' as (name, age, preference);" +
        "C = cross A, B;" +
        "D = filter C by 1 == 1;" +
        "E = STORE D into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    List<Operator> sources = plan.getSources();
    Assert.assertEquals(2, sources.size());
    Assert.assertTrue(sources.get(0) instanceof LOLoad);
    Assert.assertTrue(sources.get(1) instanceof LOLoad);

    // Handle either source order by locating each load through its alias.
    int indexOfA = ((LOLoad) sources.get(0)).getAlias().equals("A") ? 0 : 1;
    Operator loadA = sources.get(indexOfA);
    Operator loadB = sources.get(1 - indexOfA);

    Class<?>[] expectedAfterA = { LOForEach.class, LOFilter.class };
    Operator current = loadA;
    for (Class<?> expectedType : expectedAfterA) {
        current = plan.getSuccessors(current).get(0);
        Assert.assertTrue(expectedType.isInstance(current));
    }

    Class<?>[] expectedAfterB = { LOForEach.class, LOFilter.class, LOCross.class };
    current = loadB;
    for (Class<?> expectedType : expectedAfterB) {
        current = plan.getSuccessors(current).get(0);
        Assert.assertTrue(expectedType.isInstance(current));
    }
}
@Test
// Cross followed by a UDF filter on $0: starting from load A the chain must be
// foreach -> filter -> cross -> store.
public void testFilterUDFCross() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = load 'anotherfile' as (name, age, preference);" +
        "C = cross A, B;" +
        "D = filter C by " + MyFilterFunc.class.getName() + "($0) ;" +
        "E = STORE D into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    List<Operator> sources = plan.getSources();
    Assert.assertEquals(2, sources.size());
    Assert.assertTrue(sources.get(0) instanceof LOLoad);
    Assert.assertTrue(sources.get(1) instanceof LOLoad);

    // Start the walk at the load aliased "A", whichever position it is in.
    Operator current = ((LOLoad) sources.get(0)).getAlias().equals("A")
        ? sources.get(0)
        : sources.get(1);

    Class<?>[] expectedChain = {
        LOForEach.class, LOFilter.class, LOCross.class, LOStore.class
    };
    for (Class<?> expectedType : expectedChain) {
        current = plan.getSuccessors(current).get(0);
        Assert.assertTrue(expectedType.isInstance(current));
    }
}
@Test
// Cogroup followed by a filter on $0: each input branch must be
// load -> foreach -> filter, both feeding the same cogroup, then the store.
public void testFilterCogroup() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = load 'anotherfile' as (name, age, preference);" +
        "C = cogroup A by $0, B by $0;" +
        "D = filter C by $0 < 'name';" +
        "E = STORE D into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    List<Operator> sources = plan.getSources();
    Assert.assertEquals(2, sources.size());
    Assert.assertTrue(sources.get(0) instanceof LOLoad);
    Assert.assertTrue(sources.get(1) instanceof LOLoad);

    // Handle either source order by locating each load through its alias.
    int indexOfA = ((LOLoad) sources.get(0)).getAlias().equals("A") ? 0 : 1;
    Operator loadA = sources.get(indexOfA);
    Operator loadB = sources.get(1 - indexOfA);

    Operator foreachA = plan.getSuccessors(loadA).get(0);
    Assert.assertTrue(foreachA instanceof LOForEach);
    Operator filterA = plan.getSuccessors(foreachA).get(0);
    Assert.assertTrue(filterA instanceof LOFilter);

    Operator foreachB = plan.getSuccessors(loadB).get(0);
    Assert.assertTrue(foreachB instanceof LOForEach);
    Operator filterB = plan.getSuccessors(foreachB).get(0);
    Assert.assertTrue(filterB instanceof LOFilter);

    Operator cogroupViaA = plan.getSuccessors(filterA).get(0);
    Assert.assertTrue(cogroupViaA instanceof LOCogroup);
    Operator cogroupViaB = plan.getSuccessors(filterB).get(0);
    Assert.assertTrue(cogroupViaB instanceof LOCogroup);
    Assert.assertSame(cogroupViaA, cogroupViaB);

    Operator store = plan.getSuccessors(cogroupViaA).get(0);
    Assert.assertTrue(store instanceof LOStore);
}
@Test
// Cogroup followed by a constant-condition filter (1 == 1): each input branch
// must be load -> foreach -> filter, both feeding the same cogroup, then the store.
public void testFilterConstantConditionCogroup() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = load 'anotherfile' as (name, age, preference);" +
        "C = cogroup A by $0, B by $0;" +
        "D = filter C by 1 == 1;" +
        "E = STORE D into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    List<Operator> sources = plan.getSources();
    Assert.assertEquals(2, sources.size());
    Assert.assertTrue(sources.get(0) instanceof LOLoad);
    Assert.assertTrue(sources.get(1) instanceof LOLoad);

    // Handle either source order by locating each load through its alias.
    int indexOfA = ((LOLoad) sources.get(0)).getAlias().equals("A") ? 0 : 1;
    Operator loadA = sources.get(indexOfA);
    Operator loadB = sources.get(1 - indexOfA);

    Operator foreachA = plan.getSuccessors(loadA).get(0);
    Assert.assertTrue(foreachA instanceof LOForEach);
    Operator filterA = plan.getSuccessors(foreachA).get(0);
    Assert.assertTrue(filterA instanceof LOFilter);

    Operator foreachB = plan.getSuccessors(loadB).get(0);
    Assert.assertTrue(foreachB instanceof LOForEach);
    Operator filterB = plan.getSuccessors(foreachB).get(0);
    Assert.assertTrue(filterB instanceof LOFilter);

    Operator cogroupViaA = plan.getSuccessors(filterA).get(0);
    Assert.assertTrue(cogroupViaA instanceof LOCogroup);
    Operator cogroupViaB = plan.getSuccessors(filterB).get(0);
    Assert.assertTrue(cogroupViaB instanceof LOCogroup);
    Assert.assertSame(cogroupViaA, cogroupViaB);

    Operator store = plan.getSuccessors(cogroupViaA).get(0);
    Assert.assertTrue(store instanceof LOStore);
}
@Test
// Cogroup followed by a UDF filter on $1: both branches must be
// load -> foreach -> cogroup (the same cogroup), and the filter must follow
// the cogroup, followed by the store.
public void testFilterUDFCogroup() throws Exception {
    String query = "A = load 'myfile' as (name, age, gpa);" +
        "B = load 'anotherfile' as (name, age, preference);" +
        "C = cogroup A by $0, B by $0;" +
        "D = filter C by " + MyFilterFunc.class.getName() + "($1) ;" +
        "E = STORE D into 'dummy';";
    LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );

    List<Operator> loads = newLogicalPlan.getSources();
    Assert.assertTrue( loads.size() == 2 );
    Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
    Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );

    // Handle either source order by locating each load through its alias.
    Operator loadA = null;
    Operator loadB = null;
    if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) ) {
        loadA = loads.get( 0 );
        loadB = loads.get( 1 );
    } else {
        loadA = loads.get( 1 );
        loadB = loads.get( 0 );
    }

    Operator foreachA = newLogicalPlan.getSuccessors(loadA).get( 0 );
    Assert.assertTrue( foreachA instanceof LOForEach );
    Operator cogroupA = newLogicalPlan.getSuccessors(foreachA).get( 0 );
    Assert.assertTrue( cogroupA instanceof LOCogroup );

    Operator foreachB = newLogicalPlan.getSuccessors(loadB).get( 0 );
    Assert.assertTrue( foreachB instanceof LOForEach );
    Operator cogroupB = newLogicalPlan.getSuccessors(foreachB).get( 0 );
    Assert.assertTrue( cogroupB instanceof LOCogroup );

    Operator filter = newLogicalPlan.getSuccessors( cogroupA ).get( 0 );
    Assert.assertTrue( filter instanceof LOFilter );
    filter = newLogicalPlan.getSuccessors( cogroupB ).get( 0 );
    // Check the operator just fetched via branch B; the previous version
    // re-asserted cogroupB here and left this successor unverified.
    Assert.assertTrue( filter instanceof LOFilter );
    Assert.assertTrue( cogroupB == cogroupA );

    Operator store = newLogicalPlan.getSuccessors(filter).get( 0 );
    Assert.assertTrue( store instanceof LOStore );
}
@Test
// Cogroup with the "outer" keyword on B, followed by a filter on $0: each input
// branch must be load -> foreach -> filter, both feeding the same cogroup,
// then the store.
public void testFilterCogroupOuter() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = load 'anotherfile' as (name, age, preference);" +
        "C = cogroup A by $0, B by $0 outer;" +
        "D = filter C by $0 < 'name';" +
        "E = STORE D into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    List<Operator> sources = plan.getSources();
    Assert.assertEquals(2, sources.size());
    Assert.assertTrue(sources.get(0) instanceof LOLoad);
    Assert.assertTrue(sources.get(1) instanceof LOLoad);

    // Handle either source order by locating each load through its alias.
    int indexOfA = ((LOLoad) sources.get(0)).getAlias().equals("A") ? 0 : 1;
    Operator loadA = sources.get(indexOfA);
    Operator loadB = sources.get(1 - indexOfA);

    Operator foreachA = plan.getSuccessors(loadA).get(0);
    Assert.assertTrue(foreachA instanceof LOForEach);
    Operator filterA = plan.getSuccessors(foreachA).get(0);
    Assert.assertTrue(filterA instanceof LOFilter);

    Operator foreachB = plan.getSuccessors(loadB).get(0);
    Assert.assertTrue(foreachB instanceof LOForEach);
    Operator filterB = plan.getSuccessors(foreachB).get(0);
    Assert.assertTrue(filterB instanceof LOFilter);

    Operator cogroupViaA = plan.getSuccessors(filterA).get(0);
    Assert.assertTrue(cogroupViaA instanceof LOCogroup);
    Operator cogroupViaB = plan.getSuccessors(filterB).get(0);
    Assert.assertTrue(cogroupViaB instanceof LOCogroup);
    Assert.assertSame(cogroupViaA, cogroupViaB);

    Operator store = plan.getSuccessors(cogroupViaA).get(0);
    Assert.assertTrue(store instanceof LOStore);
}
@Test
// Cogroup with the "outer" keyword on B, followed by a constant-condition filter
// (1 == 1): each input branch must be load -> foreach -> filter, both feeding
// the same cogroup, then the store.
public void testFilterConstantConditionCogroupOuter() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = load 'anotherfile' as (name, age, preference);" +
        "C = cogroup A by $0, B by $0 outer;" +
        "D = filter C by 1 == 1;" +
        "E = STORE D into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    List<Operator> sources = plan.getSources();
    Assert.assertEquals(2, sources.size());
    Assert.assertTrue(sources.get(0) instanceof LOLoad);
    Assert.assertTrue(sources.get(1) instanceof LOLoad);

    // Handle either source order by locating each load through its alias.
    int indexOfA = ((LOLoad) sources.get(0)).getAlias().equals("A") ? 0 : 1;
    Operator loadA = sources.get(indexOfA);
    Operator loadB = sources.get(1 - indexOfA);

    Operator foreachA = plan.getSuccessors(loadA).get(0);
    Assert.assertTrue(foreachA instanceof LOForEach);
    Operator filterA = plan.getSuccessors(foreachA).get(0);
    Assert.assertTrue(filterA instanceof LOFilter);

    Operator foreachB = plan.getSuccessors(loadB).get(0);
    Assert.assertTrue(foreachB instanceof LOForEach);
    Operator filterB = plan.getSuccessors(foreachB).get(0);
    Assert.assertTrue(filterB instanceof LOFilter);

    Operator cogroupViaA = plan.getSuccessors(filterA).get(0);
    Assert.assertTrue(cogroupViaA instanceof LOCogroup);
    Operator cogroupViaB = plan.getSuccessors(filterB).get(0);
    Assert.assertTrue(cogroupViaB instanceof LOCogroup);
    Assert.assertSame(cogroupViaA, cogroupViaB);

    Operator store = plan.getSuccessors(cogroupViaA).get(0);
    Assert.assertTrue(store instanceof LOStore);
}
@Test
// Cogroup with the "outer" keyword on B, followed by a filter that calls a UDF
// with no arguments: both branches must be load -> foreach -> cogroup (the same
// cogroup), with the filter after the cogroup and then the store.
public void testFilterUDFCogroupOuter() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);" +
        "B = load 'anotherfile' as (name, age, preference);" +
        "C = cogroup A by $0, B by $0 outer;" +
        "D = filter C by " + MyFilterFunc.class.getName() + "() ;" +
        "E = STORE D into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan(script);

    List<Operator> sources = plan.getSources();
    Assert.assertEquals(2, sources.size());
    Assert.assertTrue(sources.get(0) instanceof LOLoad);
    Assert.assertTrue(sources.get(1) instanceof LOLoad);

    // Handle either source order by locating each load through its alias.
    int indexOfA = ((LOLoad) sources.get(0)).getAlias().equals("A") ? 0 : 1;
    Operator loadA = sources.get(indexOfA);
    Operator loadB = sources.get(1 - indexOfA);

    Operator foreachA = plan.getSuccessors(loadA).get(0);
    Assert.assertTrue(foreachA instanceof LOForEach);
    Operator cogroupViaA = plan.getSuccessors(foreachA).get(0);
    Assert.assertTrue(cogroupViaA instanceof LOCogroup);

    Operator foreachB = plan.getSuccessors(loadB).get(0);
    Assert.assertTrue(foreachB instanceof LOForEach);
    Operator cogroupViaB = plan.getSuccessors(foreachB).get(0);
    Assert.assertTrue(cogroupViaB instanceof LOCogroup);

    Operator filterViaA = plan.getSuccessors(cogroupViaA).get(0);
    Assert.assertTrue(filterViaA instanceof LOFilter);
    Operator filterViaB = plan.getSuccessors(cogroupViaB).get(0);
    Assert.assertTrue(filterViaB instanceof LOFilter);
    Assert.assertSame(cogroupViaA, cogroupViaB);

    Operator store = plan.getSuccessors(filterViaB).get(0);
    Assert.assertTrue(store instanceof LOStore);
}
/**
 * Filter on the group key ($0) of a single-input group.
 * Asserted operator chain: load -> foreach -> filter -> cogroup -> store.
 */
@Test
public void testFilterGroupBy() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);"
        + "B = group A by $0;"
        + "C = filter B by $0 < 'name';"
        + "D = STORE C into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan( script );

    Operator load = plan.getSources().get( 0 );
    Assert.assertTrue( load instanceof LOLoad );

    Operator forEach = plan.getSuccessors( load ).get( 0 );
    Assert.assertTrue( forEach instanceof LOForEach );

    Operator filter = plan.getSuccessors( forEach ).get( 0 );
    Assert.assertTrue( filter instanceof LOFilter );

    Operator group = plan.getSuccessors( filter ).get( 0 );
    Assert.assertTrue( group instanceof LOCogroup );

    Operator store = plan.getSuccessors( group ).get( 0 );
    Assert.assertTrue( store instanceof LOStore );
}
/**
 * Filter with a constant condition (1 == 1) on top of a single-input group.
 * Asserted operator chain: load -> foreach -> filter -> cogroup -> store.
 */
@Test
public void testFilterConstantConditionGroupBy() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);"
        + "B = group A by $0;"
        + "C = filter B by 1 == 1;"
        + "D = STORE C into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan( script );

    Operator load = plan.getSources().get( 0 );
    Assert.assertTrue( load instanceof LOLoad );

    Operator forEach = plan.getSuccessors( load ).get( 0 );
    Assert.assertTrue( forEach instanceof LOForEach );

    Operator filter = plan.getSuccessors( forEach ).get( 0 );
    Assert.assertTrue( filter instanceof LOFilter );

    Operator group = plan.getSuccessors( filter ).get( 0 );
    Assert.assertTrue( group instanceof LOCogroup );

    Operator store = plan.getSuccessors( group ).get( 0 );
    Assert.assertTrue( store instanceof LOStore );
}
/**
 * Filter applying a UDF to $1 of the group output.
 * Asserted operator chain: load -> foreach -> cogroup -> filter -> store,
 * i.e. the filter is still a successor of the cogroup.
 */
@Test
public void testFilterUDFGroupBy() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);"
        + "B = group A by $0;"
        + "C = filter B by " + MyFilterFunc.class.getName() + "($1) ;"
        + "D = STORE C into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan( script );

    Operator load = plan.getSources().get( 0 );
    Assert.assertTrue( load instanceof LOLoad );

    Operator forEach = plan.getSuccessors( load ).get( 0 );
    Assert.assertTrue( forEach instanceof LOForEach );

    Operator group = plan.getSuccessors( forEach ).get( 0 );
    Assert.assertTrue( group instanceof LOCogroup );

    Operator filter = plan.getSuccessors( group ).get( 0 );
    Assert.assertTrue( filter instanceof LOFilter );

    Operator store = plan.getSuccessors( filter ).get( 0 );
    Assert.assertTrue( store instanceof LOStore );
}
/**
 * Filter on the group key ($0) of a single-input group declared "outer".
 * Asserted operator chain: load -> foreach -> filter -> cogroup -> store.
 */
@Test
public void testFilterGroupByOuter() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);"
        + "B = group A by $0 outer;"
        + "C = filter B by $0 < 'name';"
        + "D = STORE C into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan( script );

    Operator load = plan.getSources().get( 0 );
    Assert.assertTrue( load instanceof LOLoad );

    Operator forEach = plan.getSuccessors( load ).get( 0 );
    Assert.assertTrue( forEach instanceof LOForEach );

    Operator filter = plan.getSuccessors( forEach ).get( 0 );
    Assert.assertTrue( filter instanceof LOFilter );

    Operator group = plan.getSuccessors( filter ).get( 0 );
    Assert.assertTrue( group instanceof LOCogroup );

    Operator store = plan.getSuccessors( group ).get( 0 );
    Assert.assertTrue( store instanceof LOStore );
}
/**
 * Filter with a constant condition (1 == 1) on top of a single-input group
 * declared "outer".
 * Asserted operator chain: load -> foreach -> filter -> cogroup -> store.
 */
@Test
public void testFilterConstantConditionGroupByOuter() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);"
        + "B = group A by $0 outer;"
        + "C = filter B by 1 == 1;"
        + "D = STORE C into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan( script );

    Operator load = plan.getSources().get( 0 );
    Assert.assertTrue( load instanceof LOLoad );

    Operator forEach = plan.getSuccessors( load ).get( 0 );
    Assert.assertTrue( forEach instanceof LOForEach );

    Operator filter = plan.getSuccessors( forEach ).get( 0 );
    Assert.assertTrue( filter instanceof LOFilter );

    Operator group = plan.getSuccessors( filter ).get( 0 );
    Assert.assertTrue( group instanceof LOCogroup );

    Operator store = plan.getSuccessors( group ).get( 0 );
    Assert.assertTrue( store instanceof LOStore );
}
/**
 * Filter applying a UDF to $1 of the output of a group declared "outer".
 * Asserted operator chain: load -> foreach -> cogroup -> filter -> store,
 * i.e. the filter is still a successor of the cogroup.
 */
@Test
public void testFilterUDFGroupByOuter() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);"
        + "B = group A by $0 outer;"
        + "C = filter B by " + MyFilterFunc.class.getName() + "($1) ;"
        + "D = STORE C into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan( script );

    Operator load = plan.getSources().get( 0 );
    Assert.assertTrue( load instanceof LOLoad );

    Operator forEach = plan.getSuccessors( load ).get( 0 );
    Assert.assertTrue( forEach instanceof LOForEach );

    Operator group = plan.getSuccessors( forEach ).get( 0 );
    Assert.assertTrue( group instanceof LOCogroup );

    Operator filter = plan.getSuccessors( group ).get( 0 );
    Assert.assertTrue( filter instanceof LOFilter );

    Operator store = plan.getSuccessors( filter ).get( 0 );
    Assert.assertTrue( store instanceof LOStore );
}
/**
 * Replicated join followed by a filter on $0 (a column coming from A) and a limit.
 * Asserted chain starting at load A: load -> foreach -> filter -> join -> limit.
 *
 * @throws Exception propagated from building or optimizing the plan
 */
@Test
public void testFilterFRJoin() throws Exception {
    String query = "A = load 'myfile' as (name, age, gpa);"
        + "B = load 'anotherfile' as (name, age, preference);"
        + "C = join A by $0, B by $0 using 'replicated';"
        + "D = filter C by $0 < 'name';"
        + "E = limit D 10;"
        + "F = STORE E into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan( query );

    List<Operator> loads = plan.getSources();
    // assertEquals reports the actual count on failure, unlike assertTrue( a == b ).
    Assert.assertEquals( 2, loads.size() );
    Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
    Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );

    // The order of the sources is not assumed; load A is located by alias.
    Operator loadA = ((LOLoad) loads.get( 0 )).getAlias().equals( "A" )
        ? loads.get( 0 )
        : loads.get( 1 );

    Operator forEach = plan.getSuccessors( loadA ).get( 0 );
    Assert.assertTrue( forEach instanceof LOForEach );
    Operator filter = plan.getSuccessors( forEach ).get( 0 );
    Assert.assertTrue( filter instanceof LOFilter );
    Operator join = plan.getSuccessors( filter ).get( 0 );
    Assert.assertTrue( join instanceof LOJoin );
    Operator limit = plan.getSuccessors( join ).get( 0 );
    Assert.assertTrue( limit instanceof LOLimit );
}
/**
 * Replicated join followed by a filter on $4 (a column coming from B, since A
 * contributes $0-$2) and a limit.
 * Asserted chain starting at load B: load -> foreach -> filter -> join -> limit.
 *
 * @throws Exception propagated from building or optimizing the plan
 */
@Test
public void testFilterFRJoin1() throws Exception {
    String query = "A = load 'myfile' as (name, age, gpa);"
        + "B = load 'anotherfile' as (name, age, preference);"
        + "C = join A by $0, B by $0 using 'replicated';"
        + "D = filter C by $4 < 'name';"
        + "E = limit D 10;"
        + "F = STORE E into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan( query );

    List<Operator> loads = plan.getSources();
    // assertEquals reports the actual count on failure, unlike assertTrue( a == b ).
    Assert.assertEquals( 2, loads.size() );
    Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
    Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );

    // The order of the sources is not assumed; load B is located by alias.
    Operator loadB = ((LOLoad) loads.get( 0 )).getAlias().equals( "B" )
        ? loads.get( 0 )
        : loads.get( 1 );

    Operator forEach = plan.getSuccessors( loadB ).get( 0 );
    Assert.assertTrue( forEach instanceof LOForEach );
    Operator filter = plan.getSuccessors( forEach ).get( 0 );
    Assert.assertTrue( filter instanceof LOFilter );
    Operator join = plan.getSuccessors( filter ).get( 0 );
    Assert.assertTrue( join instanceof LOJoin );
    Operator limit = plan.getSuccessors( join ).get( 0 );
    Assert.assertTrue( limit instanceof LOLimit );
}
/**
 * Constant filter condition on top of a replicated join: the filter is expected
 * to be pushed up to the first branch of the join.
 * Asserted chain starting at load A: load -> foreach -> filter -> join -> store.
 *
 * @throws Exception propagated from building or optimizing the plan
 */
@Test
public void testFilterConstantConditionFRJoin() throws Exception {
    String query = "A = load 'myfile' as (name, age, gpa);"
        + "B = load 'anotherfile' as (name, age, preference);"
        + "C = join A by $0, B by $0 using 'replicated';"
        + "D = filter C by 1 == 1;"
        + "E = STORE D into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan( query );

    List<Operator> loads = plan.getSources();
    // assertEquals reports the actual count on failure, unlike assertTrue( a == b ).
    Assert.assertEquals( 2, loads.size() );
    Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
    Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );

    // The order of the sources is not assumed; load A is located by alias.
    Operator loadA = ((LOLoad) loads.get( 0 )).getAlias().equals( "A" )
        ? loads.get( 0 )
        : loads.get( 1 );

    Operator forEach = plan.getSuccessors( loadA ).get( 0 );
    Assert.assertTrue( forEach instanceof LOForEach );
    Operator filter = plan.getSuccessors( forEach ).get( 0 );
    Assert.assertTrue( filter instanceof LOFilter );
    Operator join = plan.getSuccessors( filter ).get( 0 );
    Assert.assertTrue( join instanceof LOJoin );
    Operator store = plan.getSuccessors( join ).get( 0 );
    Assert.assertTrue( store instanceof LOStore );
}
/**
 * The UDF takes no argument, so the condition is constant. As a result, the
 * filter is expected to be pushed up to the first branch of the replicated join.
 * Asserted chain starting at load A: load -> foreach -> filter -> join -> store.
 *
 * @throws Exception propagated from building or optimizing the plan
 */
@Test
public void testFilterUDFFRJoin() throws Exception {
    String query = "A = load 'myfile' as (name, age, gpa);"
        + "B = load 'anotherfile' as (name, age, preference);"
        + "C = join A by $0, B by $0 using 'replicated';"
        + "D = filter C by " + MyFilterFunc.class.getName() + "();"
        + "E = STORE D into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan( query );

    List<Operator> loads = plan.getSources();
    // assertEquals reports the actual count on failure, unlike assertTrue( a == b ).
    Assert.assertEquals( 2, loads.size() );
    Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
    Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );

    // The order of the sources is not assumed; load A is located by alias.
    Operator loadA = ((LOLoad) loads.get( 0 )).getAlias().equals( "A" )
        ? loads.get( 0 )
        : loads.get( 1 );

    Operator forEach = plan.getSuccessors( loadA ).get( 0 );
    Assert.assertTrue( forEach instanceof LOForEach );
    Operator filter = plan.getSuccessors( forEach ).get( 0 );
    Assert.assertTrue( filter instanceof LOFilter );
    Operator join = plan.getSuccessors( filter ).get( 0 );
    Assert.assertTrue( join instanceof LOJoin );
    Operator store = plan.getSuccessors( join ).get( 0 );
    Assert.assertTrue( store instanceof LOStore );
}
/**
 * The UDF takes all input columns, so the filter cannot be pushed up.
 * Asserted shape: each load -> foreach -> join, and the join is followed by the
 * filter.
 *
 * @throws Exception propagated from building or optimizing the plan
 */
@Test
public void testFilterUDFFRJoin1() throws Exception {
    String query = "A = load 'myfile' as (name, age, gpa);"
        + "B = load 'anotherfile' as (name, age, preference);"
        + "C = join A by $0, B by $0 using 'replicated';"
        + "D = filter C by TupleSize(*) > 5;"
        + "E = STORE D into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan( query );

    List<Operator> loads = plan.getSources();
    // assertEquals reports the actual count on failure, unlike assertTrue( a == b ).
    Assert.assertEquals( 2, loads.size() );
    Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
    Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );

    // Branch of the first source.
    Operator firstForEach = plan.getSuccessors( loads.get( 0 ) ).get( 0 );
    Assert.assertTrue( firstForEach instanceof LOForEach );
    Operator firstJoin = plan.getSuccessors( firstForEach ).get( 0 );
    Assert.assertTrue( firstJoin instanceof LOJoin );

    // Branch of the second source.
    Operator secondForEach = plan.getSuccessors( loads.get( 1 ) ).get( 0 );
    Assert.assertTrue( secondForEach instanceof LOForEach );
    Operator secondJoin = plan.getSuccessors( secondForEach ).get( 0 );
    Assert.assertTrue( secondJoin instanceof LOJoin );

    // The filter must still come after the join.
    Operator filter = plan.getSuccessors( secondJoin ).get( 0 );
    Assert.assertTrue( filter instanceof LOFilter );
}
/**
 * Inner join followed by a filter on $0 (a column coming from A) and a limit.
 * Asserted chain starting at load A: load -> foreach -> filter -> join -> limit.
 *
 * @throws Exception propagated from building or optimizing the plan
 */
@Test
public void testFilterInnerJoin() throws Exception {
    String query = "A = load 'myfile' as (name, age, gpa);"
        + "B = load 'anotherfile' as (name, age, preference);"
        + "C = join A by $0, B by $0;"
        + "D = filter C by $0 < 'name';"
        + "E = limit D 10;"
        + "F = STORE E into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan( query );

    List<Operator> loads = plan.getSources();
    // assertEquals reports the actual count on failure, unlike assertTrue( a == b ).
    Assert.assertEquals( 2, loads.size() );
    Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
    Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );

    // The order of the sources is not assumed; load A is located by alias.
    Operator loadA = ((LOLoad) loads.get( 0 )).getAlias().equals( "A" )
        ? loads.get( 0 )
        : loads.get( 1 );

    Operator forEach = plan.getSuccessors( loadA ).get( 0 );
    Assert.assertTrue( forEach instanceof LOForEach );
    Operator filter = plan.getSuccessors( forEach ).get( 0 );
    Assert.assertTrue( filter instanceof LOFilter );
    Operator join = plan.getSuccessors( filter ).get( 0 );
    Assert.assertTrue( join instanceof LOJoin );
    Operator limit = plan.getSuccessors( join ).get( 0 );
    Assert.assertTrue( limit instanceof LOLimit );
}
/**
 * Inner join followed by a filter on $4 (a column coming from B, since A
 * contributes $0-$2) and a limit.
 * Asserted chain starting at load B: load -> foreach -> filter -> join -> limit.
 *
 * @throws Exception propagated from building or optimizing the plan
 */
@Test
public void testFilterInnerJoin1() throws Exception {
    String query = "A = load 'myfile' as (name, age, gpa);"
        + "B = load 'anotherfile' as (name, age, preference);"
        + "C = join A by $0, B by $0;"
        + "D = filter C by $4 < 'name';"
        + "E = limit D 10;"
        + "F = STORE E into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan( query );

    List<Operator> loads = plan.getSources();
    // assertEquals reports the actual count on failure, unlike assertTrue( a == b ).
    Assert.assertEquals( 2, loads.size() );
    Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
    Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );

    // The order of the sources is not assumed; load B is located by alias.
    Operator loadB = ((LOLoad) loads.get( 0 )).getAlias().equals( "B" )
        ? loads.get( 0 )
        : loads.get( 1 );

    Operator forEach = plan.getSuccessors( loadB ).get( 0 );
    Assert.assertTrue( forEach instanceof LOForEach );
    Operator filter = plan.getSuccessors( forEach ).get( 0 );
    Assert.assertTrue( filter instanceof LOFilter );
    Operator join = plan.getSuccessors( filter ).get( 0 );
    Assert.assertTrue( join instanceof LOJoin );
    Operator limit = plan.getSuccessors( join ).get( 0 );
    Assert.assertTrue( limit instanceof LOLimit );
}
/**
 * Inner join followed by a filter whose OR-ed conditions reference only $0 and
 * $1 (both columns coming from A), then a limit.
 * Asserted chain starting at load A: load -> foreach -> filter -> join -> limit.
 *
 * @throws Exception propagated from building or optimizing the plan
 */
@Test
public void testFilterInnerJoin2() throws Exception {
    String query = "A = load 'myfile' as (name, age, gpa);"
        + "B = load 'anotherfile' as (name, age, preference);"
        + "C = join A by $0, B by $0;"
        + "D = filter C by $0 < 'jonh' OR $1 > 50;"
        + "E = limit D 10;"
        + "F = STORE E into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan( query );

    List<Operator> loads = plan.getSources();
    // assertEquals reports the actual count on failure, unlike assertTrue( a == b ).
    Assert.assertEquals( 2, loads.size() );
    Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
    Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );

    // The order of the sources is not assumed; load A is located by alias.
    Operator loadA = ((LOLoad) loads.get( 0 )).getAlias().equals( "A" )
        ? loads.get( 0 )
        : loads.get( 1 );

    Operator forEach = plan.getSuccessors( loadA ).get( 0 );
    Assert.assertTrue( forEach instanceof LOForEach );
    Operator filter = plan.getSuccessors( forEach ).get( 0 );
    Assert.assertTrue( filter instanceof LOFilter );
    Operator join = plan.getSuccessors( filter ).get( 0 );
    Assert.assertTrue( join instanceof LOJoin );
    Operator limit = plan.getSuccessors( join ).get( 0 );
    Assert.assertTrue( limit instanceof LOLimit );
}
/**
 * Negative case: the filter condition references columns from both join inputs
 * ($4 from B and $0 from A), and the assertions expect the filter to stay after
 * the join.
 * Asserted shape: each load -> foreach -> join, and the join is followed by the
 * filter.
 *
 * @throws Exception propagated from building or optimizing the plan
 */
@Test
public void testFilterInnerJoinNegative() throws Exception {
    String query = "A = load 'myfile' as (name, age, gpa);"
        + "B = load 'anotherfile' as (name, age, preference);"
        + "C = join A by $0, B by $0;"
        + "D = filter C by $4 < 'name' AND $0 == 'joe';"
        + "E = limit D 10;"
        + "F = STORE E into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan( query );

    List<Operator> loads = plan.getSources();
    // assertEquals reports the actual count on failure, unlike assertTrue( a == b ).
    Assert.assertEquals( 2, loads.size() );
    Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
    Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );

    // Branch of the first source.
    Operator firstForEach = plan.getSuccessors( loads.get( 0 ) ).get( 0 );
    Assert.assertTrue( firstForEach instanceof LOForEach );
    Operator firstJoin = plan.getSuccessors( firstForEach ).get( 0 );
    Assert.assertTrue( firstJoin instanceof LOJoin );

    // Branch of the second source.
    Operator secondForEach = plan.getSuccessors( loads.get( 1 ) ).get( 0 );
    Assert.assertTrue( secondForEach instanceof LOForEach );
    Operator secondJoin = plan.getSuccessors( secondForEach ).get( 0 );
    Assert.assertTrue( secondJoin instanceof LOJoin );

    // The filter must still come after the join.
    Operator filter = plan.getSuccessors( secondJoin ).get( 0 );
    Assert.assertTrue( filter instanceof LOFilter );
}
/**
 * Inner join followed by a filter that calls a UDF with no arguments.
 * Asserted chain starting at load A: load -> foreach -> filter -> join -> store.
 *
 * @throws Exception propagated from building or optimizing the plan
 */
@Test
public void testFilterUDFInnerJoin() throws Exception {
    String query = "A = load 'myfile' as (name, age, gpa);"
        + "B = load 'anotherfile' as (name, age, preference);"
        + "C = join A by $0, B by $0;"
        + "D = filter C by " + MyFilterFunc.class.getName() + "() ;"
        + "E = STORE D into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan( query );

    List<Operator> loads = plan.getSources();
    // assertEquals reports the actual count on failure, unlike assertTrue( a == b ).
    Assert.assertEquals( 2, loads.size() );
    Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
    Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );

    // The order of the sources is not assumed; load A is located by alias.
    Operator loadA = ((LOLoad) loads.get( 0 )).getAlias().equals( "A" )
        ? loads.get( 0 )
        : loads.get( 1 );

    Operator forEach = plan.getSuccessors( loadA ).get( 0 );
    Assert.assertTrue( forEach instanceof LOForEach );
    Operator filter = plan.getSuccessors( forEach ).get( 0 );
    Assert.assertTrue( filter instanceof LOFilter );
    Operator join = plan.getSuccessors( filter ).get( 0 );
    Assert.assertTrue( join instanceof LOJoin );
    Operator store = plan.getSuccessors( join ).get( 0 );
    Assert.assertTrue( store instanceof LOStore );
}
/**
 * See PIG-1289. Left outer join followed by an "is null" filter on a column of
 * the right-hand input. The assertions require the filter to still be the direct
 * predecessor of the store, i.e. it has not been moved above the join.
 */
@Test
public void testOutJoin() throws Exception {
    String script = "A = load 'myfile' as (name, age, gpa);"
        + "B = load 'anotherfile' as (name);"
        + "C = join A by name LEFT OUTER, B by name;"
        + "D = filter C by B::name is null;"
        + "store D into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan( script );

    // Walk backwards from the sink.
    Operator store = plan.getSinks().get( 0 );
    Assert.assertTrue( store instanceof LOStore );

    Operator beforeStore = plan.getPredecessors( store ).get( 0 );
    Assert.assertTrue( beforeStore instanceof LOFilter );
}
/**
 * See PIG-1507. Full outer join followed by an "is null" filter on d2. The
 * assertions require the filter to still be the direct predecessor of the store,
 * i.e. it has not been moved above the join.
 */
@Test
public void testFullOutJoin() throws Exception {
    String script = "A = load 'myfile' as (d1:int);"
        + "B = load 'anotherfile' as (d2:int);"
        + "c = join A by d1 full outer, B by d2;"
        + "d = filter c by d2 is null;"
        + "store d into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan( script );

    // Walk backwards from the sink.
    Operator store = plan.getSinks().get( 0 );
    Assert.assertTrue( store instanceof LOStore );

    Operator beforeStore = plan.getPredecessors( store ).get( 0 );
    Assert.assertTrue( beforeStore instanceof LOFilter );
}
/**
 * Cascading joins. The first join is a full outer join and the second an inner
 * join; the filter should be pushed above the second join but stay below the
 * first one.
 * Asserted shape, walking back from the single sink: store <- join, and one of
 * that join's two predecessors is the filter.
 *
 * @throws Exception propagated from building or optimizing the plan
 */
@Test
public void testFullOutJoin1() throws Exception {
    String query = "A = load 'myfile' as (d1:int);"
        + "B = load 'anotherfile' as (d2:int);"
        + "C = join A by d1 full outer, B by d2;"
        + "D = load 'xxx' as (d3:int);"
        + "E = join C by d1, D by d3;"
        + "F = filter E by d1 > 5;"
        + "G = store F into 'dummy';";
    LogicalPlan plan = migrateAndOptimizePlan( query );

    List<Operator> sinks = plan.getSinks();
    // assertEquals reports the actual count on failure and matches the
    // predecessor-count check further down.
    Assert.assertEquals( 1, sinks.size() );
    Operator store = sinks.get( 0 );
    Assert.assertTrue( store instanceof LOStore );

    Operator join = plan.getPredecessors( store ).get( 0 );
    Assert.assertTrue( join instanceof LOJoin );

    List<Operator> joinInputs = plan.getPredecessors( join );
    Assert.assertEquals( 2, joinInputs.size() );
    // Which side carries the filter is not fixed by the test.
    Assert.assertTrue( joinInputs.get( 0 ) instanceof LOFilter || joinInputs.get( 1 ) instanceof LOFilter );
}
/**
 * Builds the logical plan for the given Pig Latin script and runs
 * {@link MyPlanOptimizer} over it with an iteration count of 3.
 *
 * @param query the Pig Latin script to compile
 * @return the plan returned by {@code Util.buildLp}, after the optimizer has run on it
 * @throws Exception propagated from plan construction or optimization
 */
private LogicalPlan migrateAndOptimizePlan(String query) throws Exception {
    LogicalPlan plan = Util.buildLp( new PigServer( pc ), query );
    PlanOptimizer planOptimizer = new MyPlanOptimizer( plan, 3 );
    planOptimizer.optimize();
    return plan;
}
/**
 * Optimizer used by these tests. It registers exactly two rule sets, in order:
 * one holding {@code LoadTypeCastInserter} and one holding {@code PushUpFilter}.
 */
public class MyPlanOptimizer extends LogicalPlanOptimizer {
    /**
     * @param p the plan to optimize
     * @param iterations iteration count forwarded to the super constructor
     */
    protected MyPlanOptimizer(OperatorPlan p, int iterations) {
        // The third argument is an empty string set.
        // NOTE(review): presumably the set of rules to disable; confirm against LogicalPlanOptimizer.
        super( p, iterations, new HashSet<String>() );
    }

    /** Delegates unchanged to the superclass implementation. */
    @Override
    public void addPlanTransformListener(PlanTransformListener listener) {
        super.addPlanTransformListener( listener );
    }

    /**
     * @return a two-element list: the type-cast-inserter rule set followed by
     *         the push-up-filter rule set
     */
    @Override
    protected List<Set<Rule>> buildRuleSets() {
        // First rule set: load type cast insertion.
        Set<Rule> typeCastRules = new HashSet<Rule>();
        typeCastRules.add( new LoadTypeCastInserter( "TypeCastInserter" ) );

        // Second rule set: the rule under test.
        Set<Rule> pushUpFilterRules = new HashSet<Rule>();
        pushUpFilterRules.add( new PushUpFilter( "PushUpFilter" ) );

        List<Set<Rule>> ruleSets = new ArrayList<Set<Rule>>();
        ruleSets.add( typeCastRules );
        ruleSets.add( pushUpFilterRules );
        return ruleSets;
    }
}
}