blob: 117e753ec7c3d98a51f7ff6de54d0ac26c2c0df1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Add;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Divide;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GreaterThanExpr;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Mod;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Multiply;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONegative;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Subtract;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.expression.AddExpression;
import org.apache.pig.newplan.logical.expression.BinCondExpression;
import org.apache.pig.newplan.logical.expression.DereferenceExpression;
import org.apache.pig.newplan.logical.expression.DivideExpression;
import org.apache.pig.newplan.logical.expression.IsNullExpression;
import org.apache.pig.newplan.logical.expression.LessThanExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.ModExpression;
import org.apache.pig.newplan.logical.expression.MultiplyExpression;
import org.apache.pig.newplan.logical.expression.NegativeExpression;
import org.apache.pig.newplan.logical.expression.NotExpression;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.expression.SubtractExpression;
import org.apache.pig.newplan.logical.expression.UserFuncExpression;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
import org.junit.Before;
import org.junit.Test;
public class TestNewPlanLogToPhyTranslationVisitor {
// Local-mode PigContext (built from empty Properties) shared by every test;
// buildPlan wraps it in a new PigServer for each query it parses.
PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
/**
 * Runs the new-plan {@link LogToPhyTranslationVisitor} over {@code plan}.
 *
 * @param plan logical plan to translate
 * @return the physical plan built by the visitor
 * @throws IOException propagated from the visitor while it walks the plan
 */
private PhysicalPlan translatePlan(OperatorPlan plan) throws IOException {
    LogToPhyTranslationVisitor translator = new LogToPhyTranslationVisitor(plan);
    translator.visit();
    PhysicalPlan physical = translator.getPhysicalPlan();
    return physical;
}
/**
 * Parses a Pig Latin script into a new-style logical plan, using a fresh
 * {@link PigServer} bound to the shared {@link #pc} context.
 *
 * @param query the Pig Latin script to parse
 * @return the logical plan produced by {@code Util.buildLp}
 */
private LogicalPlan buildPlan(String query) throws Exception {
    return Util.buildLp(new PigServer(pc), query);
}
/**
 * Resets the logical-expression uid counter before each test. Several tests
 * assert concrete uid values (1, 2, 4, 5), and those only hold if numbering
 * restarts for every test.
 */
@Before
public void setUp() throws Exception {
    LogicalExpression.resetNextUid();
}
/**
 * Translates {@code load -> filter($0 == NULL) -> store}. Checks that the
 * physical plan is a straight three-operator chain whose filter predicate is
 * an {@link EqualToExpr} over a projection of column 0 and a null constant.
 */
@Test
public void testSimplePlan() throws Exception {
    String script = "a = load 'd.txt';"
            + "b = filter a by $0==NULL;"
            + "store b into 'empty';";
    PhysicalPlan physical = translatePlan(buildPlan(script));

    // Overall shape: three operators, one root, one leaf.
    assertEquals( 3, physical.size() );
    assertEquals( 1, physical.getRoots().size() );
    assertEquals( 1, physical.getLeaves().size() );

    // The root is the loader for d.txt.
    PhysicalOperator loader = physical.getRoots().get(0);
    assertEquals( POLoad.class, loader.getClass() );
    assertTrue( ((POLoad) loader).getLFile().getFileName().contains("d.txt") );

    // The loader feeds the filter.
    PhysicalOperator filter = physical.getSuccessors(loader).get(0);
    assertEquals( POFilter.class, filter.getClass() );
    PhysicalPlan predicate = ((POFilter) filter).getPlan();

    // The order of the predicate roots is not guaranteed on JDK 7+
    // (entrySet ordering), so sort them before making positional checks.
    List<PhysicalOperator> operands = predicate.getRoots();
    Collections.sort(operands);
    assertEquals( 2, operands.size() );
    assertEquals( 1, predicate.getLeaves().size() );
    assertEquals( EqualToExpr.class, predicate.getLeaves().get(0).getClass() );

    PhysicalOperator column = operands.get(0);
    assertEquals( POProject.class, column.getClass() );
    assertEquals( 0, ((POProject) column).getColumn() );

    PhysicalOperator literal = operands.get(1);
    assertEquals( ConstantExpression.class, literal.getClass() );
    assertEquals( null, ((ConstantExpression) literal).getValue() );

    // The filter feeds the store into 'empty'.
    PhysicalOperator sink = physical.getSuccessors(filter).get(0);
    assertEquals( POStore.class, sink.getClass() );
    assertTrue( ((POStore) sink).getSFile().getFileName().contains("empty") );
}
/**
 * Translates {@code join a by id, b by c} followed by a null-check filter and
 * verifies the physical pipeline: two loads, each feeding a
 * {@link POLocalRearrange}, one shared {@link POGlobalRearrange}, then
 * {@link POPackage} -> {@link POForEach} -> {@link POFilter} -> {@link POStore}.
 */
@Test
public void testJoinPlan() throws Exception {
    String query = ("a = load 'd1.txt' as (id, c);" +
            "b = load 'd2.txt'as (id, c);" +
            "c = join a by id, b by c;" +
            "d = filter c by a::id==NULL AND b::c==NULL;" +
            "store d into 'empty';");
    // check basics
    LogicalPlan newPlan = buildPlan(query);
    PhysicalPlan physicalPlan = translatePlan(newPlan);
    assertEquals(9, physicalPlan.size());
    List<PhysicalOperator> roots = physicalPlan.getRoots();
    // Root order is not guaranteed, so sort before making positional checks.
    Collections.sort(roots);
    assertEquals(2, roots.size());

    // Left input: load d1.txt -> local rearrange keyed on column 0.
    PhysicalOperator leftRearrange = physicalPlan.getSuccessors(roots.get(0)).get(0);
    assertEquals( POLocalRearrange.class, leftRearrange.getClass() );
    POLocalRearrange leftLr = (POLocalRearrange) leftRearrange;
    PhysicalOperator leftKey = leftLr.getPlans().get(0).getLeaves().get(0);
    assertEquals( POProject.class, leftKey.getClass() );
    assertEquals(0, ((POProject) leftKey).getColumn() );
    PhysicalOperator leftLoad = leftLr.getInputs().get(0);
    assertEquals( POLoad.class, leftLoad.getClass() );
    assertTrue( ((POLoad) leftLoad).getLFile().getFileName().contains("d1.txt") );

    // Right input: load d2.txt -> local rearrange keyed on column 1.
    PhysicalOperator rightRearrange = physicalPlan.getSuccessors(roots.get(1)).get(0);
    assertEquals( POLocalRearrange.class, rightRearrange.getClass() );
    POLocalRearrange rightLr = (POLocalRearrange) rightRearrange;
    PhysicalOperator rightKey = rightLr.getPlans().get(0).getLeaves().get(0);
    assertEquals( POProject.class, rightKey.getClass() );
    assertEquals(1, ((POProject) rightKey).getColumn() );
    PhysicalOperator rightLoad = rightLr.getInputs().get(0);
    assertEquals( POLoad.class, rightLoad.getClass() );
    assertTrue( ((POLoad) rightLoad).getLFile().getFileName().contains("d2.txt") );

    // Both local rearranges must feed the same global rearrange, then a package.
    PhysicalOperator globalRearrange = physicalPlan.getSuccessors(leftRearrange).get(0);
    assertEquals( POGlobalRearrange.class, globalRearrange.getClass() );
    assertEquals( globalRearrange, physicalPlan.getSuccessors(rightRearrange).get(0) );
    PhysicalOperator pack = physicalPlan.getSuccessors(globalRearrange).get(0);
    assertEquals( POPackage.class, pack.getClass() );

    // ForEach after the package projects its columns 1 and 2.
    PhysicalOperator forEach = physicalPlan.getSuccessors(pack).get(0);
    assertEquals( POForEach.class, forEach.getClass() );
    PhysicalOperator firstProjection = ((POForEach) forEach).getInputPlans().get(0).getLeaves().get(0);
    assertEquals( POProject.class, firstProjection.getClass() );
    assertEquals( 1, ((POProject) firstProjection).getColumn() );
    PhysicalOperator secondProjection = ((POForEach) forEach).getInputPlans().get(1).getLeaves().get(0);
    assertEquals( POProject.class, secondProjection.getClass() );
    assertEquals( 2, ((POProject) secondProjection).getColumn() );

    // Filter: two (projection == null constant) comparisons, over columns 0 and 3.
    PhysicalOperator filter = physicalPlan.getSuccessors(forEach).get(0);
    assertEquals( POFilter.class, filter.getClass() );
    PhysicalPlan filterPlan = ((POFilter) filter).getPlan();
    List<PhysicalOperator> filterRoots = filterPlan.getRoots();
    assertEquals(4, filterRoots.size());
    Collections.sort(filterRoots);
    assertEquals( ConstantExpression.class, filterRoots.get(1).getClass() );
    ConstantExpression firstNull = (ConstantExpression) filterRoots.get(1);
    assertEquals( null, firstNull.getValue() );
    assertEquals( ConstantExpression.class, filterRoots.get(3).getClass() );
    ConstantExpression secondNull = (ConstantExpression) filterRoots.get(3);
    assertEquals( null, secondNull.getValue() );
    assertEquals( POProject.class, filterRoots.get(0).getClass() );
    POProject aIdProjection = (POProject) filterRoots.get(0);
    assertEquals( 0, aIdProjection.getColumn() );
    assertEquals( POProject.class, filterRoots.get(2).getClass() );
    POProject bCProjection = (POProject) filterRoots.get(2);
    assertEquals( 3, bCProjection.getColumn() );

    // Store into 'empty'.
    PhysicalOperator store = physicalPlan.getSuccessors(filter).get(0);
    assertEquals( POStore.class, store.getClass() );
    assertTrue( ((POStore) store).getSFile().getFileName().contains("empty") );
}
/**
 * Translates two chained joins, each followed by a null-check filter. Verifies
 * every load / local-rearrange / global-rearrange / package / foreach / filter
 * stage and the single store into {@code empty2}.
 */
@Test
public void testMultiStore() throws Exception {
    String query = ("a = load 'd1.txt' as (id, c);" +
            "b = load 'd2.txt'as (id, c);" +
            "c = load 'd3.txt' as (id, c);" +
            "d = join a by id, b by c;" +
            "e = filter d by a::id==NULL AND b::c==NULL;" +
            "f = join e by b::c, c by id;" +
            "g = filter f by b::id==NULL AND c::c==NULL;" +
            "store g into 'empty2';");
    // check basics
    LogicalPlan newLogicalPlan = buildPlan(query);
    PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
    assertEquals(16, phyPlan.size());
    List<PhysicalOperator> phyPlanRoots = phyPlan.getRoots();
    // Root order is not guaranteed, so sort before making positional checks.
    Collections.sort(phyPlanRoots);
    assertEquals(3, phyPlanRoots.size());
    assertEquals(1, phyPlan.getLeaves().size());

    // Load d3.txt -> local rearrange keyed on column 0 (input to the second join).
    PhysicalOperator d3Rearrange = phyPlan.getSuccessors(phyPlanRoots.get(2)).get(0);
    assertEquals( POLocalRearrange.class, d3Rearrange.getClass() );
    POLocalRearrange d3Lr = (POLocalRearrange) d3Rearrange;
    PhysicalOperator d3Key = d3Lr.getPlans().get(0).getLeaves().get(0);
    assertEquals( POProject.class, d3Key.getClass() );
    assertEquals(0, ((POProject) d3Key).getColumn() );
    PhysicalOperator d3Load = d3Lr.getInputs().get(0);
    assertEquals( POLoad.class, d3Load.getClass() );
    assertTrue( ((POLoad) d3Load).getLFile().getFileName().contains("d3.txt") );

    // Load d1.txt -> local rearrange keyed on column 0 (input to the first join).
    PhysicalOperator d1Rearrange = phyPlan.getSuccessors(phyPlanRoots.get(0)).get(0);
    assertEquals( POLocalRearrange.class, d1Rearrange.getClass() );
    POLocalRearrange d1Lr = (POLocalRearrange) d1Rearrange;
    PhysicalOperator d1Key = d1Lr.getPlans().get(0).getLeaves().get(0);
    assertEquals( POProject.class, d1Key.getClass() );
    assertEquals(0, ((POProject) d1Key).getColumn() );
    PhysicalOperator d1Load = d1Lr.getInputs().get(0);
    assertEquals( POLoad.class, d1Load.getClass() );
    assertTrue( ((POLoad) d1Load).getLFile().getFileName().contains("d1.txt") );

    // Second join: global rearrange -> package.
    PhysicalOperator secondGr = phyPlan.getSuccessors(d3Rearrange).get(0);
    assertEquals( POGlobalRearrange.class, secondGr.getClass() );
    PhysicalOperator secondPack = phyPlan.getSuccessors(secondGr).get(0);
    assertEquals( POPackage.class, secondPack.getClass() );

    // Load d2.txt -> local rearrange keyed on column 1 (input to the first join).
    PhysicalOperator d2Rearrange = phyPlan.getSuccessors(phyPlanRoots.get(1)).get(0);
    assertEquals( POLocalRearrange.class, d2Rearrange.getClass() );
    POLocalRearrange d2Lr = (POLocalRearrange) d2Rearrange;
    PhysicalOperator d2Key = d2Lr.getPlans().get(0).getLeaves().get(0);
    assertEquals( POProject.class, d2Key.getClass() );
    assertEquals(1, ((POProject) d2Key).getColumn() );
    PhysicalOperator d2Load = d2Lr.getInputs().get(0);
    assertEquals( POLoad.class, d2Load.getClass() );
    assertTrue( ((POLoad) d2Load).getLFile().getFileName().contains("d2.txt") );

    // First join: both d1 and d2 rearranges feed one global rearrange -> package.
    PhysicalOperator firstGr = phyPlan.getSuccessors(d2Rearrange).get(0);
    assertEquals( POGlobalRearrange.class, firstGr.getClass() );
    assertEquals( firstGr, phyPlan.getSuccessors(d1Rearrange).get(0) );
    PhysicalOperator firstPack = phyPlan.getSuccessors(firstGr).get(0);
    assertEquals( POPackage.class, firstPack.getClass() );

    // ForEach after each package projects package columns 1 and 2.
    PhysicalOperator secondForEach = phyPlan.getSuccessors(secondPack).get(0);
    assertEquals( POForEach.class, secondForEach.getClass() );
    PhysicalOperator secondFeCol1 = ((POForEach) secondForEach).getInputPlans().get(0).getLeaves().get(0);
    assertEquals( POProject.class, secondFeCol1.getClass() );
    assertEquals( 1, ((POProject) secondFeCol1).getColumn() );
    PhysicalOperator secondFeCol2 = ((POForEach) secondForEach).getInputPlans().get(1).getLeaves().get(0);
    assertEquals( POProject.class, secondFeCol2.getClass() );
    assertEquals( 2, ((POProject) secondFeCol2).getColumn() );
    PhysicalOperator firstForEach = phyPlan.getSuccessors(firstPack).get(0);
    assertEquals( POForEach.class, firstForEach.getClass() );
    PhysicalOperator firstFeCol1 = ((POForEach) firstForEach).getInputPlans().get(0).getLeaves().get(0);
    assertEquals( POProject.class, firstFeCol1.getClass() );
    assertEquals( 1, ((POProject) firstFeCol1).getColumn() );
    PhysicalOperator firstFeCol2 = ((POForEach) firstForEach).getInputPlans().get(1).getLeaves().get(0);
    assertEquals( POProject.class, firstFeCol2.getClass() );
    assertEquals( 2, ((POProject) firstFeCol2).getColumn() );

    // Filter g (after the second join): null checks on columns 2 and 5.
    PhysicalOperator secondFilter = phyPlan.getSuccessors(secondForEach).get(0);
    assertEquals( POFilter.class, secondFilter.getClass() );
    PhysicalPlan secondFilterPlan = ((POFilter) secondFilter).getPlan();
    List<PhysicalOperator> secondFilterRoots = secondFilterPlan.getRoots();
    assertEquals(4, secondFilterRoots.size());
    Collections.sort(secondFilterRoots);
    assertEquals( ConstantExpression.class, secondFilterRoots.get(1).getClass() );
    ConstantExpression ce1 = (ConstantExpression) secondFilterRoots.get(1);
    assertEquals( null, ce1.getValue() );
    assertEquals( ConstantExpression.class, secondFilterRoots.get(3).getClass() );
    ConstantExpression ce2 = (ConstantExpression) secondFilterRoots.get(3);
    assertEquals( null, ce2.getValue() );
    assertEquals( POProject.class, secondFilterRoots.get(2).getClass() );
    POProject cCProjection = (POProject) secondFilterRoots.get(2);
    assertEquals( 5, cCProjection.getColumn() );
    assertEquals( POProject.class, secondFilterRoots.get(0).getClass() );
    POProject bIdProjection = (POProject) secondFilterRoots.get(0);
    assertEquals( 2, bIdProjection.getColumn() );

    // Filter e (after the first join) feeds the second join's other local
    // rearrange, keyed on column 3.
    PhysicalOperator firstFilter = phyPlan.getSuccessors(firstForEach).get(0);
    assertEquals( POFilter.class, firstFilter.getClass() );
    PhysicalOperator eRearrange = phyPlan.getSuccessors(firstFilter).get(0);
    assertEquals( POLocalRearrange.class, eRearrange.getClass() );
    POLocalRearrange eLr = (POLocalRearrange) eRearrange;
    PhysicalOperator eKey = eLr.getPlans().get(0).getLeaves().get(0);
    assertEquals( POProject.class, eKey.getClass() );
    assertEquals(3, ((POProject) eKey).getColumn() );
    assertEquals( secondGr, phyPlan.getSuccessors(eRearrange).get(0) );
    PhysicalPlan firstFilterPlan = ((POFilter) firstFilter).getPlan();
    List<PhysicalOperator> firstFilterRoots = firstFilterPlan.getRoots();
    assertEquals(4, firstFilterRoots.size());
    Collections.sort(firstFilterRoots);
    assertEquals( ConstantExpression.class, firstFilterRoots.get(1).getClass() );
    ConstantExpression ce3 = (ConstantExpression) firstFilterRoots.get(1);
    assertEquals( null, ce3.getValue() );
    assertEquals( ConstantExpression.class, firstFilterRoots.get(3).getClass() );
    ConstantExpression ce4 = (ConstantExpression) firstFilterRoots.get(3);
    assertEquals( null, ce4.getValue() );
    assertEquals( POProject.class, firstFilterRoots.get(2).getClass() );
    POProject bCProjection = (POProject) firstFilterRoots.get(2);
    assertEquals( 3, bCProjection.getColumn() );
    assertEquals( POProject.class, firstFilterRoots.get(0).getClass() );
    POProject aIdProjection = (POProject) firstFilterRoots.get(0);
    assertEquals( 0, aIdProjection.getColumn() );

    // The only leaf is the store fed by filter g; it must write to 'empty2'.
    PhysicalOperator store = phyPlan.getLeaves().get(0);
    assertEquals( store, phyPlan.getSuccessors(secondFilter).get(0));
    assertEquals( POStore.class, store.getClass() );
    assertTrue( ((POStore) store).getSFile().getFileName().contains("empty2") );
}
/**
 * Translates {@code filter a by (int)id==10}. Checks that the filter predicate
 * is an {@link EqualToExpr} whose left side is a {@link POCast} to int of
 * projected column 0 and whose right side is the constant 10.
 */
@Test
public void testPlanWithCast() throws Exception {
    String query = ("a = load 'd.txt' as (id, c);" +
            "b = filter a by (int)id==10;" +
            "store b into 'empty';");
    // check basics
    LogicalPlan newLogicalPlan = buildPlan(query);
    PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
    assertEquals(3, phyPlan.size());
    assertEquals(1, phyPlan.getRoots().size());
    assertEquals(1, phyPlan.getLeaves().size());
    PhysicalOperator load = phyPlan.getRoots().get(0);
    assertEquals( POLoad.class, load.getClass() );
    assertTrue( ((POLoad)load).getLFile().getFileName().contains("d.txt"));
    PhysicalOperator fil = phyPlan.getSuccessors(load).get(0);
    assertEquals( POFilter.class, fil.getClass() );
    PhysicalPlan filPlan = ((POFilter)fil).getPlan();
    // The predicate's leaf is the boolean comparison itself.
    PhysicalOperator equal = filPlan.getLeaves().get(0);
    assertEquals( EqualToExpr.class, equal.getClass() );
    assertEquals( DataType.BOOLEAN, ((EqualToExpr)equal).getResultType() );
    PhysicalOperator constExpr = ((EqualToExpr)equal).getRhs();
    assertEquals( ConstantExpression.class, constExpr.getClass() );
    assertEquals( 10, ((ConstantExpression)constExpr).getValue() );
    PhysicalOperator castExpr = ((EqualToExpr)equal).getLhs();
    assertEquals( POCast.class, castExpr.getClass() );
    assertEquals( DataType.INTEGER, ((POCast)castExpr).getResultType() );
    PhysicalOperator prj = ((POCast)castExpr).getInputs().get(0);
    assertEquals( POProject.class, prj.getClass() );
    assertEquals( 0, ((POProject)prj).getColumn() );
    PhysicalOperator stor = phyPlan.getLeaves().get(0);
    assertEquals( POStore.class, stor.getClass() );
    assertTrue( ((POStore)stor).getSFile().getFileName().contains( "empty" ) );
}
/**
 * Translates {@code filter a by (int)id>10}. Checks that the filter predicate
 * is a {@link GreaterThanExpr} whose left side is a {@link POCast} to int of
 * projected column 0 and whose right side is the constant 10.
 */
@Test
public void testPlanWithGreaterThan() throws Exception {
    String query = ("a = load 'd.txt' as (id, c);" +
            "b = filter a by (int)id>10;" +
            "store b into 'empty';");
    // check basics
    LogicalPlan newLogicalPlan = buildPlan(query);
    PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
    assertEquals(3, phyPlan.size());
    assertEquals(1, phyPlan.getRoots().size());
    assertEquals(1, phyPlan.getLeaves().size());
    PhysicalOperator load = phyPlan.getRoots().get(0);
    assertEquals( POLoad.class, load.getClass() );
    assertTrue( ((POLoad)load).getLFile().getFileName().contains("d.txt"));
    PhysicalOperator fil = phyPlan.getSuccessors(load).get(0);
    assertEquals( POFilter.class, fil.getClass() );
    PhysicalPlan filPlan = ((POFilter)fil).getPlan();
    // The predicate's leaf is the boolean comparison itself.
    PhysicalOperator greaterThan = filPlan.getLeaves().get(0);
    assertEquals( GreaterThanExpr.class, greaterThan.getClass() );
    assertEquals( DataType.BOOLEAN, ((GreaterThanExpr)greaterThan).getResultType() );
    PhysicalOperator constExpr = ((GreaterThanExpr)greaterThan).getRhs();
    assertEquals( ConstantExpression.class, constExpr.getClass() );
    assertEquals( 10, ((ConstantExpression)constExpr).getValue() );
    PhysicalOperator castExpr = ((GreaterThanExpr)greaterThan).getLhs();
    assertEquals( POCast.class, castExpr.getClass() );
    assertEquals( DataType.INTEGER, ((POCast)castExpr).getResultType() );
    PhysicalOperator prj = ((POCast)castExpr).getInputs().get(0);
    assertEquals( POProject.class, prj.getClass() );
    assertEquals( 0, ((POProject)prj).getColumn() );
    PhysicalOperator stor = phyPlan.getLeaves().get(0);
    assertEquals( POStore.class, stor.getClass() );
    assertTrue( ((POStore)stor).getSFile().getFileName().contains( "empty" ) );
}
/**
 * Translates {@code filter a by (int)id<10}. Checks that the filter predicate
 * is a {@link LessThanExpr} whose left side is a {@link POCast} to int of
 * projected column 0 and whose right side is the constant 10.
 */
@Test
public void testPlanWithLessThan() throws Exception {
    String query = ("a = load 'd.txt' as (id, c);" +
            "b = filter a by (int)id<10;" +
            "store b into 'empty';");
    // check basics
    LogicalPlan newLogicalPlan = buildPlan(query);
    PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
    assertEquals(3, phyPlan.size());
    assertEquals(1, phyPlan.getRoots().size());
    assertEquals(1, phyPlan.getLeaves().size());
    PhysicalOperator load = phyPlan.getRoots().get(0);
    assertEquals( POLoad.class, load.getClass() );
    assertTrue( ((POLoad)load).getLFile().getFileName().contains("d.txt"));
    PhysicalOperator fil = phyPlan.getSuccessors(load).get(0);
    assertEquals( POFilter.class, fil.getClass() );
    PhysicalPlan filPlan = ((POFilter)fil).getPlan();
    // The predicate's leaf is the boolean comparison itself.
    PhysicalOperator lessThan = filPlan.getLeaves().get(0);
    assertEquals( LessThanExpr.class, lessThan.getClass() );
    assertEquals( DataType.BOOLEAN, ((LessThanExpr)lessThan).getResultType() );
    PhysicalOperator constExpr = ((LessThanExpr)lessThan).getRhs();
    assertEquals( ConstantExpression.class, constExpr.getClass() );
    assertEquals( 10, ((ConstantExpression)constExpr).getValue() );
    PhysicalOperator castExpr = ((LessThanExpr)lessThan).getLhs();
    assertEquals( POCast.class, castExpr.getClass() );
    assertEquals( DataType.INTEGER, ((POCast)castExpr).getResultType() );
    PhysicalOperator prj = ((POCast)castExpr).getInputs().get(0);
    assertEquals( POProject.class, prj.getClass() );
    assertEquals( 0, ((POProject)prj).getColumn() );
    PhysicalOperator stor = phyPlan.getLeaves().get(0);
    assertEquals( POStore.class, stor.getClass() );
    assertTrue( ((POStore)stor).getSFile().getFileName().contains( "empty" ) );
}
/**
 * Translates {@code foreach a generate id, c}. Checks that the physical
 * foreach has one single-projection inner plan per generated field (columns 0
 * and 1), neither of which is flattened.
 */
@Test
public void testForeachPlan() throws Exception {
    String query = ("a = load 'd.txt' as (id, c);" +
            "b = foreach a generate id, c;" +
            "store b into 'empty';");
    LogicalPlan newLogicalPlan = buildPlan(query);
    PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
    assertEquals(3, phyPlan.size());
    assertEquals(POStore.class, phyPlan.getLeaves().get(0).getClass());
    POForEach foreach = (POForEach)phyPlan.getSuccessors(phyPlan.getRoots().get(0)).get(0);
    assertEquals(2, foreach.getInputPlans().size());
    // Each inner plan is a lone projection with no inputs of its own.
    PhysicalPlan inner = foreach.getInputPlans().get(0);
    assertEquals(1, inner.size());
    POProject prj = (POProject)inner.getRoots().get(0);
    assertEquals(0, prj.getColumn());
    assertEquals(null, prj.getInputs());
    inner = foreach.getInputPlans().get(1);
    assertEquals(1, inner.size());
    prj = (POProject)inner.getRoots().get(0);
    assertEquals(1, prj.getColumn());
    assertEquals(null, prj.getInputs());
    // One flatten flag per inner plan; check the length before indexing.
    Boolean[] flat = foreach.getToBeFlattened().toArray(new Boolean[0]);
    assertEquals(2, flat.length);
    assertFalse(flat[0]);
    assertFalse(flat[1]);
}
/**
 * Translates {@code foreach a generate id, flatten(c)} where {@code c} is
 * declared as {@code bag{t:(s,v)}}. Checks the logical output schema
 * (id, s, v) and its uids. Also checks that the physical foreach projects
 * columns 0 and 1 and flattens only the second.
 */
@Test
public void testForeachPlan2() throws Exception {
    String query = ("a = load 'd.txt' as (id, c:bag{t:(s,v)});" +
            "b = foreach a generate id, flatten(c);" +
            "store b into 'empty';");
    LogicalPlan newLogicalPlan = buildPlan(query);
    LogicalRelationalOperator ld = (LogicalRelationalOperator)newLogicalPlan.getSources().get(0);
    LogicalRelationalOperator fe = (LogicalRelationalOperator)newLogicalPlan.getSuccessors(ld).get(0);
    LogicalSchema ls = fe.getSchema();
    // Uid values rely on setUp() resetting the uid counter.
    assertEquals(1, ls.getField(0).uid);
    assertEquals(4, ls.getField(1).uid);
    assertEquals(5, ls.getField(2).uid);
    LogicalSchema expected = new LogicalSchema();
    expected.addField(new LogicalFieldSchema("id", null, DataType.BYTEARRAY));
    expected.addField(new LogicalFieldSchema("s", null, DataType.BYTEARRAY));
    expected.addField(new LogicalFieldSchema("v", null, DataType.BYTEARRAY));
    assertTrue(expected.isEqual(ls));
    PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
    assertEquals(3, phyPlan.size());
    assertEquals(POStore.class, phyPlan.getLeaves().get(0).getClass());
    POForEach foreach = (POForEach)phyPlan.getSuccessors(phyPlan.getRoots().get(0)).get(0);
    assertEquals(2, foreach.getInputPlans().size());
    PhysicalPlan inner = foreach.getInputPlans().get(0);
    assertEquals(1, inner.size());
    POProject prj = (POProject)inner.getRoots().get(0);
    assertEquals(0, prj.getColumn());
    assertEquals(null, prj.getInputs());
    inner = foreach.getInputPlans().get(1);
    assertEquals(1, inner.size());
    prj = (POProject)inner.getRoots().get(0);
    assertEquals(1, prj.getColumn());
    assertEquals(null, prj.getInputs());
    // One flatten flag per inner plan; only the bag column is flattened.
    Boolean[] flat = foreach.getToBeFlattened().toArray(new Boolean[0]);
    assertEquals(2, flat.length);
    assertFalse(flat[0]);
    assertTrue(flat[1]);
}
/**
 * Checks that {@code a+b} becomes an {@link AddExpression} over the two loaded
 * fields in the logical plan, with a uid distinct from both operands. Also
 * checks that it becomes an {@link Add} of two {@link POProject}s in the
 * translated physical plan.
 */
@Test
public void testPlanwithPlus() throws Exception {
    String script = "a = load 'd.txt' as (a:int, b:int);"
            + "b = foreach a generate a+b;"
            + "store b into 'empty';";
    LogicalPlan logical = buildPlan(script);

    // The single source must be the loader.
    LogicalRelationalOperator source =
            (LogicalRelationalOperator) logical.getSources().get(0);
    assertEquals( LOLoad.class, source.getClass() );
    LOLoad loader = (LOLoad) source;
    LogicalSchema loadSchema = loader.getSchema();

    // Physical side: the root's successor is the foreach; keep its first inner plan.
    PhysicalPlan physical = translatePlan(logical);
    PhysicalOperator physicalForEach =
            physical.getSuccessors( physical.getRoots().get(0) ).get(0);
    assertEquals( POForEach.class, physicalForEach.getClass() );
    PhysicalPlan exprPlan = ((POForEach) physicalForEach).getInputPlans().get(0);

    assertEquals(1, loadSchema.getField(0).uid);
    assertEquals(2, loadSchema.getField(1).uid);

    // Logical side: load -> foreach whose inner plan ends in a single generate.
    LogicalRelationalOperator next =
            (LogicalRelationalOperator) logical.getSuccessors(loader).get(0);
    assertEquals( LOForEach.class, next.getClass() );
    LogicalPlan nested = ((LOForEach) next).getInnerPlan();
    assertEquals( 1, nested.getSinks().size() );
    assertEquals( LOGenerate.class, nested.getSinks().get(0).getClass() );
    LOGenerate generate = (LOGenerate) nested.getSinks().get(0);
    assertEquals( 1, generate.getOutputPlans().size() );
    LogicalExpressionPlan outputPlan = generate.getOutputPlans().get(0);
    assertEquals( 1, outputPlan.getSources().size() );

    // The generated expression adds the two loaded fields and has its own uid.
    assertEquals( AddExpression.class, outputPlan.getSources().get(0).getClass() );
    AddExpression sum = (AddExpression) outputPlan.getSources().get(0);
    assertEquals( loadSchema.getField(0).uid, sum.getLhs().getFieldSchema().uid );
    assertEquals( loadSchema.getField(1).uid, sum.getRhs().getFieldSchema().uid );
    assertTrue( loadSchema.getField(0).uid != sum.getFieldSchema().uid );
    assertTrue( loadSchema.getField(1).uid != sum.getFieldSchema().uid );

    // The physical expression plan is Add(project, project).
    assertEquals( 1, exprPlan.getLeaves().size() );
    assertEquals( Add.class, exprPlan.getLeaves().get(0).getClass() );
    Add physicalAdd = (Add) exprPlan.getLeaves().get(0);
    assertEquals( 2, exprPlan.getRoots().size() );
    assertEquals( POProject.class, physicalAdd.getLhs().getClass() );
    assertEquals( POProject.class, physicalAdd.getRhs().getClass() );
}
/**
 * Checks that {@code a-b} becomes a {@link SubtractExpression} over the two
 * loaded fields in the logical plan, with a uid distinct from both operands.
 * Also checks that it becomes a {@link Subtract} of two {@link POProject}s in
 * the translated physical plan.
 */
@Test
public void testPlanwithSubtract() throws Exception {
    String query = ("a = load 'd.txt' as (a:int, b:int);" +
            "b = foreach a generate a-b;" +
            "store b into 'empty';");
    LogicalPlan newLogicalPlan = buildPlan(query);
    LogicalRelationalOperator ld = (LogicalRelationalOperator)newLogicalPlan.getSources().get(0);
    assertEquals( LOLoad.class, ld.getClass() );
    LOLoad load = (LOLoad)ld;
    LogicalSchema ls = load.getSchema();
    PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
    PhysicalOperator pFE = phyPlan.getSuccessors( phyPlan.getRoots().get(0) ).get(0);
    assertEquals( POForEach.class, pFE.getClass() );
    POForEach pForEach = (POForEach)pFE;
    PhysicalPlan inputPln = pForEach.getInputPlans().get(0);
    assertEquals(1, ls.getField(0).uid);
    assertEquals(2, ls.getField(1).uid);
    LogicalRelationalOperator fe =
            (LogicalRelationalOperator) newLogicalPlan.getSuccessors(load).get(0);
    assertEquals( LOForEach.class, fe.getClass() );
    LOForEach forEach = (LOForEach)fe;
    LogicalPlan innerPlan = forEach.getInnerPlan();
    assertEquals( 1, innerPlan.getSinks().size() );
    assertEquals( LOGenerate.class, innerPlan.getSinks().get(0).getClass() );
    LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0);
    assertEquals( 1, gen.getOutputPlans().size() );
    LogicalExpressionPlan genExp = gen.getOutputPlans().get(0);
    assertEquals( 1, genExp.getSources().size() );
    // Main Tests start here
    assertEquals( SubtractExpression.class, genExp.getSources().get(0).getClass() );
    SubtractExpression sub = (SubtractExpression) genExp.getSources().get(0);
    // Operands keep the loaded fields' uids; the result gets a uid of its own.
    assertEquals( ls.getField(0).uid, sub.getLhs().getFieldSchema().uid );
    assertEquals( ls.getField(1).uid, sub.getRhs().getFieldSchema().uid );
    assertTrue( ls.getField(0).uid != sub.getFieldSchema().uid );
    assertTrue( ls.getField(1).uid != sub.getFieldSchema().uid );
    assertEquals( 1, inputPln.getLeaves().size() );
    assertEquals( Subtract.class, inputPln.getLeaves().get(0).getClass() );
    Subtract pSubtract = (Subtract) inputPln.getLeaves().get(0);
    assertEquals( 2, inputPln.getRoots().size() );
    assertEquals( POProject.class, pSubtract.getLhs().getClass() );
    assertEquals( POProject.class, pSubtract.getRhs().getClass() );
}
/**
 * Checks that {@code a*b} becomes a {@link MultiplyExpression} over the two
 * loaded fields in the logical plan, with a uid distinct from both operands.
 * Also checks that it becomes a {@link Multiply} of two {@link POProject}s in
 * the translated physical plan.
 */
@Test
public void testPlanwithMultiply() throws Exception {
    String query = ("a = load 'd.txt' as (a:int, b:int);" +
            "b = foreach a generate a*b;" +
            "store b into 'empty';");
    LogicalPlan newLogicalPlan = buildPlan(query);
    LogicalRelationalOperator ld = (LogicalRelationalOperator)newLogicalPlan.getSources().get(0);
    assertEquals( LOLoad.class, ld.getClass() );
    LOLoad load = (LOLoad)ld;
    LogicalSchema ls = load.getSchema();
    PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
    PhysicalOperator pFE = phyPlan.getSuccessors( phyPlan.getRoots().get(0) ).get(0);
    assertEquals( POForEach.class, pFE.getClass() );
    POForEach pForEach = (POForEach)pFE;
    PhysicalPlan inputPln = pForEach.getInputPlans().get(0);
    assertEquals(1, ls.getField(0).uid);
    assertEquals(2, ls.getField(1).uid);
    LogicalRelationalOperator fe =
            (LogicalRelationalOperator) newLogicalPlan.getSuccessors(load).get(0);
    assertEquals( LOForEach.class, fe.getClass() );
    LOForEach forEach = (LOForEach)fe;
    LogicalPlan innerPlan = forEach.getInnerPlan();
    assertEquals( 1, innerPlan.getSinks().size() );
    assertEquals( LOGenerate.class, innerPlan.getSinks().get(0).getClass() );
    LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0);
    assertEquals( 1, gen.getOutputPlans().size() );
    LogicalExpressionPlan genExp = gen.getOutputPlans().get(0);
    assertEquals( 1, genExp.getSources().size() );
    // Main Tests start here
    assertEquals( MultiplyExpression.class, genExp.getSources().get(0).getClass() );
    MultiplyExpression product = (MultiplyExpression) genExp.getSources().get(0);
    // Operands keep the loaded fields' uids; the result gets a uid of its own.
    assertEquals( ls.getField(0).uid, product.getLhs().getFieldSchema().uid );
    assertEquals( ls.getField(1).uid, product.getRhs().getFieldSchema().uid );
    assertTrue( ls.getField(0).uid != product.getFieldSchema().uid );
    assertTrue( ls.getField(1).uid != product.getFieldSchema().uid );
    assertEquals( 1, inputPln.getLeaves().size() );
    assertEquals( Multiply.class, inputPln.getLeaves().get(0).getClass() );
    Multiply pMultiply = (Multiply) inputPln.getLeaves().get(0);
    assertEquals( 2, inputPln.getRoots().size() );
    assertEquals( POProject.class, pMultiply.getLhs().getClass() );
    assertEquals( POProject.class, pMultiply.getRhs().getClass() );
}
/**
 * Verifies how {@code a/b} inside a foreach is planned. Logically the generate holds a
 * DivideExpression whose operands project both load fields and whose own uid differs
 * from both of them; physically the foreach input plan ends in a single Divide leaf
 * whose operands are POProject operators.
 */
@Test
public void testPlanwithDivide() throws Exception {
    String query = "a = load 'd.txt' as (a:int, b:int);"
            + "b = foreach a generate a/b;"
            + "store b into 'empty';";
    LogicalPlan logicalPlan = buildPlan(query);

    // The first source of the logical plan must be the load.
    LogicalRelationalOperator firstSource =
            (LogicalRelationalOperator) logicalPlan.getSources().get(0);
    assertEquals(LOLoad.class, firstSource.getClass());
    LOLoad loadOp = (LOLoad) firstSource;
    LogicalSchema loadSchema = loadOp.getSchema();

    // Physical side: the operator right after the root must be a foreach.
    PhysicalPlan physicalPlan = translatePlan(logicalPlan);
    PhysicalOperator afterRoot =
            physicalPlan.getSuccessors(physicalPlan.getRoots().get(0)).get(0);
    assertEquals(POForEach.class, afterRoot.getClass());
    PhysicalPlan exprPlan = ((POForEach) afterRoot).getInputPlans().get(0);

    // The two loaded columns carry uids 1 and 2.
    assertEquals(1, loadSchema.getField(0).uid);
    assertEquals(2, loadSchema.getField(1).uid);

    // Logical side: load -> foreach whose inner plan ends in a single generate.
    LogicalRelationalOperator loadSuccessor =
            (LogicalRelationalOperator) logicalPlan.getSuccessors(loadOp).get(0);
    assertEquals(LOForEach.class, loadSuccessor.getClass());
    LogicalPlan nestedPlan = ((LOForEach) loadSuccessor).getInnerPlan();
    assertEquals(1, nestedPlan.getSinks().size());
    assertEquals(LOGenerate.class, nestedPlan.getSinks().get(0).getClass());
    LOGenerate generate = (LOGenerate) nestedPlan.getSinks().get(0);
    assertEquals(1, generate.getOutputPlans().size());
    LogicalExpressionPlan outputPlan = generate.getOutputPlans().get(0);
    assertEquals(1, outputPlan.getSources().size());

    // Main checks: the logical division expression...
    assertEquals(DivideExpression.class, outputPlan.getSources().get(0).getClass());
    DivideExpression divide = (DivideExpression) outputPlan.getSources().get(0);
    assertEquals(loadSchema.getField(0).uid, divide.getLhs().getFieldSchema().uid);
    assertEquals(loadSchema.getField(1).uid, divide.getRhs().getFieldSchema().uid);
    assertTrue(loadSchema.getField(0).uid != divide.getFieldSchema().uid);
    assertTrue(loadSchema.getField(1).uid != divide.getFieldSchema().uid);

    // ...and its physical counterpart.
    assertEquals(1, exprPlan.getLeaves().size());
    assertEquals(Divide.class, exprPlan.getLeaves().get(0).getClass());
    Divide divideOp = (Divide) exprPlan.getLeaves().get(0);
    assertEquals(2, exprPlan.getRoots().size());
    assertEquals(POProject.class, divideOp.getLhs().getClass());
    assertEquals(POProject.class, divideOp.getRhs().getClass());
}
/**
 * Verifies how {@code a%b} inside a foreach is planned. Logically the generate holds a
 * ModExpression whose operands project both load fields and whose own uid differs from
 * both of them; physically the foreach input plan ends in a single Mod leaf whose
 * operands are POProject operators.
 */
@Test
public void testPlanwithMod() throws Exception {
    String query = "a = load 'd.txt' as (a:int, b:int);"
            + "b = foreach a generate a%b;"
            + "store b into 'empty';";
    LogicalPlan logicalPlan = buildPlan(query);

    // The first source of the logical plan must be the load.
    LogicalRelationalOperator firstSource =
            (LogicalRelationalOperator) logicalPlan.getSources().get(0);
    assertEquals(LOLoad.class, firstSource.getClass());
    LOLoad loadOp = (LOLoad) firstSource;
    LogicalSchema loadSchema = loadOp.getSchema();

    // Physical side: the operator right after the root must be a foreach.
    PhysicalPlan physicalPlan = translatePlan(logicalPlan);
    PhysicalOperator afterRoot =
            physicalPlan.getSuccessors(physicalPlan.getRoots().get(0)).get(0);
    assertEquals(POForEach.class, afterRoot.getClass());
    PhysicalPlan exprPlan = ((POForEach) afterRoot).getInputPlans().get(0);

    // The two loaded columns carry uids 1 and 2.
    assertEquals(1, loadSchema.getField(0).uid);
    assertEquals(2, loadSchema.getField(1).uid);

    // Logical side: load -> foreach whose inner plan ends in a single generate.
    LogicalRelationalOperator loadSuccessor =
            (LogicalRelationalOperator) logicalPlan.getSuccessors(loadOp).get(0);
    assertEquals(LOForEach.class, loadSuccessor.getClass());
    LogicalPlan nestedPlan = ((LOForEach) loadSuccessor).getInnerPlan();
    assertEquals(1, nestedPlan.getSinks().size());
    assertEquals(LOGenerate.class, nestedPlan.getSinks().get(0).getClass());
    LOGenerate generate = (LOGenerate) nestedPlan.getSinks().get(0);
    assertEquals(1, generate.getOutputPlans().size());
    LogicalExpressionPlan outputPlan = generate.getOutputPlans().get(0);
    assertEquals(1, outputPlan.getSources().size());

    // Main checks: the logical modulo expression...
    assertEquals(ModExpression.class, outputPlan.getSources().get(0).getClass());
    ModExpression modulo = (ModExpression) outputPlan.getSources().get(0);
    assertEquals(loadSchema.getField(0).uid, modulo.getLhs().getFieldSchema().uid);
    assertEquals(loadSchema.getField(1).uid, modulo.getRhs().getFieldSchema().uid);
    assertTrue(loadSchema.getField(0).uid != modulo.getFieldSchema().uid);
    assertTrue(loadSchema.getField(1).uid != modulo.getFieldSchema().uid);

    // ...and its physical counterpart.
    assertEquals(1, exprPlan.getLeaves().size());
    assertEquals(Mod.class, exprPlan.getLeaves().get(0).getClass());
    Mod modOp = (Mod) exprPlan.getLeaves().get(0);
    assertEquals(2, exprPlan.getRoots().size());
    assertEquals(POProject.class, modOp.getLhs().getClass());
    assertEquals(POProject.class, modOp.getRhs().getClass());
}
/**
 * Verifies how unary minus ({@code -a}) inside a foreach is planned. Logically the
 * generate holds a NegativeExpression over a projection of field a, with a uid distinct
 * from both load fields; physically the foreach input plan has one root and ends in a
 * PONegative leaf whose first input is a POProject.
 */
@Test
public void testPlanwithNegative() throws Exception {
    String query = "a = load 'd.txt' as (a:int, b:int);"
            + "b = foreach a generate -a;"
            + "store b into 'empty';";
    LogicalPlan logicalPlan = buildPlan(query);

    // The first source of the logical plan must be the load.
    LogicalRelationalOperator firstSource =
            (LogicalRelationalOperator) logicalPlan.getSources().get(0);
    assertEquals(LOLoad.class, firstSource.getClass());
    LOLoad loadOp = (LOLoad) firstSource;
    LogicalSchema loadSchema = loadOp.getSchema();

    // Physical side: the operator right after the root must be a foreach.
    PhysicalPlan physicalPlan = translatePlan(logicalPlan);
    PhysicalOperator afterRoot =
            physicalPlan.getSuccessors(physicalPlan.getRoots().get(0)).get(0);
    assertEquals(POForEach.class, afterRoot.getClass());
    PhysicalPlan exprPlan = ((POForEach) afterRoot).getInputPlans().get(0);

    // The two loaded columns carry uids 1 and 2.
    assertEquals(1, loadSchema.getField(0).uid);
    assertEquals(2, loadSchema.getField(1).uid);

    // Logical side: load -> foreach whose inner plan ends in a single generate.
    LogicalRelationalOperator loadSuccessor =
            (LogicalRelationalOperator) logicalPlan.getSuccessors(loadOp).get(0);
    assertEquals(LOForEach.class, loadSuccessor.getClass());
    LogicalPlan nestedPlan = ((LOForEach) loadSuccessor).getInnerPlan();
    assertEquals(1, nestedPlan.getSinks().size());
    assertEquals(LOGenerate.class, nestedPlan.getSinks().get(0).getClass());
    LOGenerate generate = (LOGenerate) nestedPlan.getSinks().get(0);
    assertEquals(1, generate.getOutputPlans().size());
    LogicalExpressionPlan outputPlan = generate.getOutputPlans().get(0);
    assertEquals(1, outputPlan.getSources().size());

    // Main checks: the logical negation...
    assertEquals(NegativeExpression.class, outputPlan.getSources().get(0).getClass());
    NegativeExpression negation = (NegativeExpression) outputPlan.getSources().get(0);
    assertEquals(loadSchema.getField(0).uid, negation.getExpression().getFieldSchema().uid);
    assertTrue(loadSchema.getField(0).uid != negation.getFieldSchema().uid);
    assertTrue(loadSchema.getField(1).uid != negation.getFieldSchema().uid);

    // ...and its physical counterpart.
    assertEquals(1, exprPlan.getLeaves().size());
    assertEquals(PONegative.class, exprPlan.getLeaves().get(0).getClass());
    PONegative negateOp = (PONegative) exprPlan.getLeaves().get(0);
    assertEquals(1, exprPlan.getRoots().size());
    assertEquals(POProject.class, negateOp.getInputs().get(0).getClass());
}
/**
 * Verifies the logical plan for {@code filter a by a is null}. The filter plan has
 * size 2, a single source and a single sink. Its source is an IsNullExpression (uid
 * distinct from both load fields) over a projection of the first load field.
 */
@Test
public void testPlanwithisNull() throws Exception {
    String query = "a = load 'd.txt' as (a:int, b:int);"
            + "b = filter a by a is null;"
            + "store b into 'empty';";
    LogicalPlan logicalPlan = buildPlan(query);

    // The first source of the logical plan must be the load.
    LogicalRelationalOperator firstSource =
            (LogicalRelationalOperator) logicalPlan.getSources().get(0);
    assertEquals(LOLoad.class, firstSource.getClass());
    LOLoad loadOp = (LOLoad) firstSource;
    LogicalSchema loadSchema = loadOp.getSchema();
    assertEquals(1, loadSchema.getField(0).uid);
    assertEquals(2, loadSchema.getField(1).uid);

    // The load feeds straight into the filter.
    LogicalRelationalOperator loadSuccessor =
            (LogicalRelationalOperator) logicalPlan.getSuccessors(loadOp).get(0);
    assertEquals(LOFilter.class, loadSuccessor.getClass());
    LogicalExpressionPlan condition = ((LOFilter) loadSuccessor).getFilterPlan();
    assertEquals(1, condition.getSources().size());
    assertEquals(2, condition.size());
    assertEquals(1, condition.getSinks().size());

    // Root of the condition: IS NULL over a projection of field a.
    assertEquals(IsNullExpression.class, condition.getSources().get(0).getClass());
    IsNullExpression nullCheck = (IsNullExpression) condition.getSources().get(0);
    assertTrue(loadSchema.getField(0).uid != nullCheck.getFieldSchema().uid);
    assertTrue(loadSchema.getField(1).uid != nullCheck.getFieldSchema().uid);
    assertEquals(ProjectExpression.class, nullCheck.getExpression().getClass());
    ProjectExpression projected = (ProjectExpression) nullCheck.getExpression();
    assertEquals(loadSchema.getField(0).uid, projected.getFieldSchema().uid);
}
/**
 * Verifies the logical plan for {@code filter a by a is not null}. The filter plan has
 * size 3, a single source and a single sink. Its source is a NotExpression wrapping an
 * IsNullExpression, both with uids distinct from the load fields. The IsNullExpression
 * projects the first load field.
 */
@Test
public void testPlanwithisNotNull() throws Exception {
    String query = "a = load 'd.txt' as (a:int, b:int);"
            + "b = filter a by a is not null;"
            + "store b into 'empty';";
    LogicalPlan logicalPlan = buildPlan(query);

    // The first source of the logical plan must be the load.
    LogicalRelationalOperator firstSource =
            (LogicalRelationalOperator) logicalPlan.getSources().get(0);
    assertEquals(LOLoad.class, firstSource.getClass());
    LOLoad loadOp = (LOLoad) firstSource;
    LogicalSchema loadSchema = loadOp.getSchema();
    assertEquals(1, loadSchema.getField(0).uid);
    assertEquals(2, loadSchema.getField(1).uid);

    // The load feeds straight into the filter.
    LogicalRelationalOperator loadSuccessor =
            (LogicalRelationalOperator) logicalPlan.getSuccessors(loadOp).get(0);
    assertEquals(LOFilter.class, loadSuccessor.getClass());
    LogicalExpressionPlan condition = ((LOFilter) loadSuccessor).getFilterPlan();
    assertEquals(1, condition.getSources().size());
    assertEquals(3, condition.size());
    assertEquals(1, condition.getSinks().size());

    // Root of the condition: NOT(...).
    assertEquals(NotExpression.class, condition.getSources().get(0).getClass());
    NotExpression negated = (NotExpression) condition.getSources().get(0);
    assertTrue(loadSchema.getField(0).uid != negated.getFieldSchema().uid);
    assertTrue(loadSchema.getField(1).uid != negated.getFieldSchema().uid);

    // Child of NOT: IS NULL over a projection of field a.
    assertEquals(IsNullExpression.class, negated.getExpression().getClass());
    IsNullExpression nullCheck = (IsNullExpression) negated.getExpression();
    assertTrue(loadSchema.getField(0).uid != nullCheck.getFieldSchema().uid);
    assertTrue(loadSchema.getField(1).uid != nullCheck.getFieldSchema().uid);
    assertEquals(ProjectExpression.class, nullCheck.getExpression().getClass());
    ProjectExpression projected = (ProjectExpression) nullCheck.getExpression();
    assertEquals(loadSchema.getField(0).uid, projected.getFieldSchema().uid);
}
/**
 * Verifies how {@code ( a < b ? b : a )} inside a foreach is planned.
 * Logically the generate holds a BinCondExpression:
 * - its condition is a LessThanExpression projecting (a, b);
 * - its lhs projects b and its rhs projects a.
 * Physically the foreach input plan has four POProject roots and ends in a POBinCond:
 * - lhs/rhs project columns 1/0;
 * - the LessThanExpr condition projects columns 0/1.
 * Every cast below is preceded by an assertion on the operand's class, so a wrong
 * operator type surfaces as an assertion failure rather than a ClassCastException.
 */
@Test
public void testPlanwithBinCond() throws Exception {
    String query = ("a = load 'd.txt' as (a:int, b:int);" +
            "b = foreach a generate ( a < b ? b : a );" +
            "store b into 'empty';");
    LogicalPlan newLogicalPlan = buildPlan(query);
    LogicalRelationalOperator ld = (LogicalRelationalOperator)newLogicalPlan.getSources().get(0);
    assertEquals( LOLoad.class, ld.getClass() );
    LOLoad load = (LOLoad)ld;
    LogicalSchema ls = load.getSchema();
    PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
    PhysicalOperator pFE = phyPlan.getSuccessors( phyPlan.getRoots().get(0) ).get(0);
    assertEquals( POForEach.class, pFE.getClass() );
    POForEach pForEach = (POForEach)pFE;
    PhysicalPlan inputPln = pForEach.getInputPlans().get(0);
    assertEquals(1, ls.getField(0).uid);
    assertEquals(2, ls.getField(1).uid);
    LogicalRelationalOperator fe =
            (LogicalRelationalOperator) newLogicalPlan.getSuccessors(load).get(0);
    assertEquals( LOForEach.class, fe.getClass() );
    LOForEach forEach = (LOForEach)fe;
    LogicalPlan innerPlan = forEach.getInnerPlan();
    assertEquals( 1, innerPlan.getSinks().size() );
    assertEquals( LOGenerate.class, innerPlan.getSinks().get(0).getClass() );
    LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0);
    assertEquals( 1, gen.getOutputPlans().size() );
    LogicalExpressionPlan genExp = gen.getOutputPlans().get(0);
    assertEquals( 1, genExp.getSources().size() );

    // Main Tests start here: logical bincond.
    assertEquals( BinCondExpression.class, genExp.getSources().get(0).getClass() );
    BinCondExpression logicalBinCond = (BinCondExpression) genExp.getSources().get(0);

    // Condition: a < b
    assertEquals( LessThanExpression.class, logicalBinCond.getCondition().getClass() );
    LessThanExpression lessThan = (LessThanExpression) logicalBinCond.getCondition();
    assertEquals( ProjectExpression.class, lessThan.getLhs().getClass() );
    ProjectExpression prj1 = (ProjectExpression) lessThan.getLhs();
    assertEquals( ls.getField(0).uid, prj1.getFieldSchema().uid );
    // Assert the class before casting so a mismatch is reported as a test failure.
    assertEquals( ProjectExpression.class, lessThan.getRhs().getClass() );
    ProjectExpression prj2 = (ProjectExpression) lessThan.getRhs();
    assertEquals( ls.getField(1).uid, prj2.getFieldSchema().uid );

    // Branches: true -> b, false -> a
    assertEquals( ProjectExpression.class, logicalBinCond.getLhs().getClass() );
    ProjectExpression prj3 = (ProjectExpression) logicalBinCond.getLhs();
    assertEquals( ls.getField(1).uid, prj3.getFieldSchema().uid );
    assertEquals( ProjectExpression.class, logicalBinCond.getRhs().getClass() );
    ProjectExpression prj4 = (ProjectExpression) logicalBinCond.getRhs();
    assertEquals( ls.getField(0).uid, prj4.getFieldSchema().uid );

    // Physical bincond: four projection roots feeding a single POBinCond leaf.
    assertEquals( 4, inputPln.getRoots().size() );
    for( PhysicalOperator p : inputPln.getRoots() ) {
        assertEquals( POProject.class, p.getClass() );
    }
    assertEquals( 1, inputPln.getLeaves().size() );
    assertEquals( POBinCond.class, inputPln.getLeaves().get(0).getClass() );
    POBinCond physicalBinCond = (POBinCond) inputPln.getLeaves().get(0);
    assertEquals( POProject.class, physicalBinCond.getLhs().getClass() );
    POProject prj_1 = (POProject) physicalBinCond.getLhs();
    assertEquals( 1, prj_1.getColumn() );
    assertEquals( POProject.class, physicalBinCond.getRhs().getClass() );
    POProject prj_2 = (POProject) physicalBinCond.getRhs();
    assertEquals( 0, prj_2.getColumn() );
    assertEquals( LessThanExpr.class, physicalBinCond.getCond().getClass() );
    LessThanExpr lessThan_p = (LessThanExpr) physicalBinCond.getCond();
    assertEquals( POProject.class, lessThan_p.getLhs().getClass() );
    POProject prj_3 = (POProject) lessThan_p.getLhs();
    assertEquals( 0, prj_3.getColumn() );
    assertEquals( POProject.class, lessThan_p.getRhs().getClass() );
    POProject prj_4 = (POProject) lessThan_p.getRhs();
    assertEquals( 1, prj_4.getColumn() );
}
/**
 * Verifies how {@code generate a, COUNT(b)} is planned. Logically the generate carries
 * two output plans:
 * - a bare projection of field a;
 * - a plan whose source is a UserFuncExpression and whose sink projects bag b.
 * Physically the foreach's two input plans are:
 * - a lone POProject of column 0;
 * - a COUNT POUserFunc leaf rooted at a POProject of column 1.
 */
@Test
public void testPlanwithUserFunc() throws Exception {
    String query = "a = load 'd.txt' as (a:int, b:bag{t:tuple(b_a:int,b_b:int)});"
            + "b = foreach a generate a,COUNT(b);"
            + "store b into 'empty';";
    LogicalPlan logicalPlan = buildPlan(query);

    // The first source of the logical plan must be the load.
    LogicalRelationalOperator firstSource =
            (LogicalRelationalOperator) logicalPlan.getSources().get(0);
    assertEquals(LOLoad.class, firstSource.getClass());
    LOLoad loadOp = (LOLoad) firstSource;
    LogicalSchema loadSchema = loadOp.getSchema();

    // Physical side: the operator right after the root must be a foreach.
    PhysicalPlan physicalPlan = translatePlan(logicalPlan);
    PhysicalOperator afterRoot =
            physicalPlan.getSuccessors(physicalPlan.getRoots().get(0)).get(0);
    assertEquals(POForEach.class, afterRoot.getClass());
    POForEach physicalForEach = (POForEach) afterRoot;
    PhysicalPlan projectionPlan = physicalForEach.getInputPlans().get(0);

    // The two loaded columns carry uids 1 and 2.
    assertEquals(1, loadSchema.getField(0).uid);
    assertEquals(2, loadSchema.getField(1).uid);

    // Logical side: load -> foreach whose inner plan ends in a single generate.
    LogicalRelationalOperator loadSuccessor =
            (LogicalRelationalOperator) logicalPlan.getSuccessors(loadOp).get(0);
    assertEquals(LOForEach.class, loadSuccessor.getClass());
    LogicalPlan nestedPlan = ((LOForEach) loadSuccessor).getInnerPlan();
    assertEquals(1, nestedPlan.getSinks().size());
    assertEquals(LOGenerate.class, nestedPlan.getSinks().get(0).getClass());
    LOGenerate generate = (LOGenerate) nestedPlan.getSinks().get(0);
    assertEquals(2, generate.getOutputPlans().size());

    // First output: a bare projection of field a.
    LogicalExpressionPlan firstOutput = generate.getOutputPlans().get(0);
    assertEquals(1, firstOutput.getSources().size());
    assertEquals(ProjectExpression.class, firstOutput.getSources().get(0).getClass());
    ProjectExpression projectA = (ProjectExpression) firstOutput.getSources().get(0);
    assertEquals(loadSchema.getField(0).uid, projectA.getFieldSchema().uid);

    // Second output: a UDF call whose sink projects bag b.
    LogicalExpressionPlan secondOutput = generate.getOutputPlans().get(1);
    assertEquals(UserFuncExpression.class, secondOutput.getSources().get(0).getClass());
    assertEquals(ProjectExpression.class, secondOutput.getSinks().get(0).getClass());
    ProjectExpression projectB = (ProjectExpression) secondOutput.getSinks().get(0);
    assertEquals(loadSchema.getField(1).uid, projectB.getFieldSchema().uid);

    // Physical input plan 0: a single projection of column 0.
    assertEquals(1, projectionPlan.getLeaves().size());
    assertEquals(1, projectionPlan.getRoots().size());
    assertEquals(POProject.class, projectionPlan.getLeaves().get(0).getClass());
    assertEquals(0, ((POProject) projectionPlan.getLeaves().get(0)).getColumn());

    // Physical input plan 1: COUNT over a projection of column 1.
    PhysicalPlan countPlan = physicalForEach.getInputPlans().get(1);
    assertEquals(POUserFunc.class, countPlan.getLeaves().get(0).getClass());
    assertEquals("org.apache.pig.builtin.COUNT",
            ((POUserFunc) countPlan.getLeaves().get(0)).getFuncSpec().getClassName());
    assertEquals(POProject.class, countPlan.getRoots().get(0).getClass());
    assertEquals(1, ((POProject) countPlan.getRoots().get(0)).getColumn());
}
/**
 * Verifies how {@code generate a, COUNT(b.b_a)} is planned. Projecting column b_a out of
 * bag b puts a DereferenceExpression (bag column 0) directly above the projection of b
 * in the UDF's logical expression plan. Physically the COUNT input plan is rooted at a
 * POProject of column 1, whose successor is a POProject of column 0.
 */
@Test
public void testPlanwithUserFunc2() throws Exception {
    String query = "a = load 'd.txt' as (a:int, b:bag{t:tuple(b_a:int,b_b:int)});"
            + "b = foreach a generate a,COUNT(b.b_a);"
            + "store b into 'empty';";
    LogicalPlan logicalPlan = buildPlan(query);

    // The first source of the logical plan must be the load.
    LogicalRelationalOperator firstSource =
            (LogicalRelationalOperator) logicalPlan.getSources().get(0);
    assertEquals(LOLoad.class, firstSource.getClass());
    LOLoad loadOp = (LOLoad) firstSource;
    LogicalSchema loadSchema = loadOp.getSchema();

    // Physical side: the operator right after the root must be a foreach.
    PhysicalPlan physicalPlan = translatePlan(logicalPlan);
    PhysicalOperator afterRoot =
            physicalPlan.getSuccessors(physicalPlan.getRoots().get(0)).get(0);
    assertEquals(POForEach.class, afterRoot.getClass());
    POForEach physicalForEach = (POForEach) afterRoot;
    PhysicalPlan projectionPlan = physicalForEach.getInputPlans().get(0);

    // The two loaded columns carry uids 1 and 2.
    assertEquals(1, loadSchema.getField(0).uid);
    assertEquals(2, loadSchema.getField(1).uid);

    // Logical side: load -> foreach whose inner plan ends in a single generate.
    LogicalRelationalOperator loadSuccessor =
            (LogicalRelationalOperator) logicalPlan.getSuccessors(loadOp).get(0);
    assertEquals(LOForEach.class, loadSuccessor.getClass());
    LogicalPlan nestedPlan = ((LOForEach) loadSuccessor).getInnerPlan();
    assertEquals(1, nestedPlan.getSinks().size());
    assertEquals(LOGenerate.class, nestedPlan.getSinks().get(0).getClass());
    LOGenerate generate = (LOGenerate) nestedPlan.getSinks().get(0);
    assertEquals(2, generate.getOutputPlans().size());

    // First output: a bare projection of field a.
    LogicalExpressionPlan firstOutput = generate.getOutputPlans().get(0);
    assertEquals(1, firstOutput.getSources().size());
    assertEquals(ProjectExpression.class, firstOutput.getSources().get(0).getClass());
    ProjectExpression projectA = (ProjectExpression) firstOutput.getSources().get(0);
    assertEquals(loadSchema.getField(0).uid, projectA.getFieldSchema().uid);

    // Second output: UDF call -> ... -> dereference(bag column 0) -> projection of b.
    LogicalExpressionPlan secondOutput = generate.getOutputPlans().get(1);
    assertEquals(UserFuncExpression.class, secondOutput.getSources().get(0).getClass());
    assertEquals(ProjectExpression.class, secondOutput.getSinks().get(0).getClass());
    ProjectExpression projectB = (ProjectExpression) secondOutput.getSinks().get(0);
    assertEquals(loadSchema.getField(1).uid, projectB.getFieldSchema().uid);
    assertEquals(DereferenceExpression.class,
            secondOutput.getPredecessors(projectB).get(0).getClass());
    assertEquals(0, (int) ((DereferenceExpression) secondOutput.getPredecessors(projectB).get(0))
            .getBagColumns().get(0));

    // Physical input plan 0: a single projection root of column 0.
    assertEquals(1, projectionPlan.getRoots().size());
    assertEquals(POProject.class, projectionPlan.getRoots().get(0).getClass());
    assertEquals(0, ((POProject) projectionPlan.getRoots().get(0)).getColumn());

    // Physical input plan 1: project column 1, then column 0 of it, then COUNT.
    PhysicalPlan countPlan = physicalForEach.getInputPlans().get(1);
    assertEquals(1, countPlan.getRoots().size());
    assertEquals(POProject.class, countPlan.getRoots().get(0).getClass());
    assertEquals(1, ((POProject) countPlan.getRoots().get(0)).getColumn());
    assertEquals(POUserFunc.class, countPlan.getLeaves().get(0).getClass());
    assertEquals("org.apache.pig.builtin.COUNT",
            ((POUserFunc) countPlan.getLeaves().get(0)).getFuncSpec().getClassName());
    POProject bagProject = (POProject) countPlan.getRoots().get(0);
    assertEquals(POProject.class, countPlan.getSuccessors(bagProject).get(0).getClass());
    assertEquals(0, ((POProject) countPlan.getSuccessors(bagProject).get(0)).getColumn());
}
/**
 * Verifies the physical shape of {@code group a by name}:
 * - a single POLoad root;
 * - then a POLocalRearrange with one input and one key plan (a chararray POProject of
 *   column 0);
 * - then a POGlobalRearrange and a POPackage, both with result type TUPLE.
 */
@Test
public void testCogroup() throws Exception {
    String query = "a = load 'd.txt' as (name:chararray, age:int, gpa:float);"
            + "b = group a by name;"
            + "store b into 'empty';";
    PhysicalPlan physicalPlan = translatePlan(buildPlan(query));

    // Exactly one root: the load.
    assertEquals(1, physicalPlan.getRoots().size());
    assertEquals(POLoad.class, physicalPlan.getRoots().get(0).getClass());
    POLoad loadOp = (POLoad) physicalPlan.getRoots().get(0);

    // The load feeds a local rearrange keyed on the group column.
    assertEquals(POLocalRearrange.class,
            physicalPlan.getSuccessors(loadOp).get(0).getClass());
    POLocalRearrange localRearrange =
            (POLocalRearrange) physicalPlan.getSuccessors(loadOp).get(0);
    assertEquals(1, localRearrange.getInputs().size());
    assertEquals(1, localRearrange.getPlans().size());
    PhysicalPlan keyPlan = localRearrange.getPlans().get(0);
    assertEquals(1, keyPlan.getLeaves().size());
    assertEquals(POProject.class, keyPlan.getLeaves().get(0).getClass());
    POProject keyProject = (POProject) keyPlan.getLeaves().get(0);
    assertEquals(0, keyProject.getColumn());
    assertEquals(DataType.CHARARRAY, keyProject.getResultType());

    // Local rearrange -> global rearrange -> package.
    assertEquals(POGlobalRearrange.class,
            physicalPlan.getSuccessors(localRearrange).get(0).getClass());
    POGlobalRearrange globalRearrange =
            (POGlobalRearrange) physicalPlan.getSuccessors(localRearrange).get(0);
    assertEquals(DataType.TUPLE, globalRearrange.getResultType());
    assertEquals(POPackage.class,
            physicalPlan.getSuccessors(globalRearrange).get(0).getClass());
    POPackage packageOp = (POPackage) physicalPlan.getSuccessors(globalRearrange).get(0);
    assertEquals(DataType.TUPLE, packageOp.getResultType());
}
/**
 * Verifies the physical shape of {@code group a by ( name, age )}:
 * - a single POLoad root feeding a POLocalRearrange with two key plans (a chararray
 *   projection of column 0 and an int projection of column 1);
 * - then a POGlobalRearrange and a POPackage, both with result type TUPLE.
 * Both key plans get the same leaf-count and leaf-class checks before their leaf is
 * cast, so a malformed plan fails with an assertion rather than an exception.
 */
@Test
public void testCogroup2() throws Exception {
    String query = ("a = load 'd.txt' as (name:chararray, age:int, gpa:float);" +
            "b = group a by ( name, age );" +
            "store b into 'empty';");
    LogicalPlan newLogicalPlan = buildPlan(query);
    PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
    assertEquals( 1, phyPlan.getRoots().size() );
    assertEquals( POLoad.class, phyPlan.getRoots().get(0).getClass() );
    POLoad load = (POLoad)phyPlan.getRoots().get(0);
    assertEquals( POLocalRearrange.class, phyPlan.getSuccessors(load).get(0).getClass() );
    POLocalRearrange localR = (POLocalRearrange)phyPlan.getSuccessors(load).get(0);
    assertEquals( 1, localR.getInputs().size() );
    assertEquals( 2, localR.getPlans().size() );

    // Key component 0: name (chararray, column 0).
    PhysicalPlan cogroupPlan = localR.getPlans().get(0);
    assertEquals( 1, cogroupPlan.getLeaves().size() );
    assertEquals( POProject.class, cogroupPlan.getLeaves().get(0).getClass() );
    POProject prj = (POProject)cogroupPlan.getLeaves().get(0);
    assertEquals( 0, prj.getColumn() );
    assertEquals( DataType.CHARARRAY, prj.getResultType() );

    // Key component 1: age (int, column 1).
    PhysicalPlan cogroupPlan2 = localR.getPlans().get(1);
    assertEquals( 1, cogroupPlan2.getLeaves().size() );
    assertEquals( POProject.class, cogroupPlan2.getLeaves().get(0).getClass() );
    POProject prj2 = (POProject)cogroupPlan2.getLeaves().get(0);
    assertEquals( 1, prj2.getColumn() );
    assertEquals( DataType.INTEGER, prj2.getResultType() );

    assertEquals( POGlobalRearrange.class, phyPlan.getSuccessors(localR).get(0).getClass() );
    POGlobalRearrange globalR = (POGlobalRearrange)phyPlan.getSuccessors(localR).get(0);
    assertEquals( DataType.TUPLE, globalR.getResultType() );
    assertEquals( POPackage.class, phyPlan.getSuccessors(globalR).get(0).getClass() );
    POPackage pack = (POPackage)phyPlan.getSuccessors(globalR).get(0);
    assertEquals( DataType.TUPLE, pack.getResultType() );
}
/**
 * Verifies the physical shape of a two-input cogroup, {@code group a by name, b by name}:
 * - there are two POLoad roots;
 * - each feeds a POLocalRearrange whose single key plan is a chararray projection of
 *   column 0;
 * - both local rearranges feed the same POGlobalRearrange, followed by a POPackage
 *   (result type TUPLE).
 */
@Test
public void testCogroup3() throws Exception {
    String query = "a = load 'd.txt' as (name:chararray, age:int, gpa:float);" +
            "b = load 'e.txt' as (name:chararray, age:int, gpa:float);" +
            "c = group a by name, b by name;" +
            "store c into 'empty';";
    LogicalPlan newLogicalPlan = buildPlan(query);
    PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
    assertEquals( 2, phyPlan.getRoots().size() );

    // First input.
    assertEquals( POLoad.class, phyPlan.getRoots().get(0).getClass() );
    POLoad load = (POLoad)phyPlan.getRoots().get(0);
    assertEquals( POLocalRearrange.class, phyPlan.getSuccessors(load).get(0).getClass() );
    POLocalRearrange localR = (POLocalRearrange)phyPlan.getSuccessors(load).get(0);
    assertEquals( 1, localR.getPlans().size() );
    PhysicalPlan cogroupPlan = localR.getPlans().get(0);
    assertEquals( 1, cogroupPlan.getLeaves().size() );
    assertEquals( POProject.class, cogroupPlan.getLeaves().get(0).getClass() );
    POProject prj = (POProject)cogroupPlan.getLeaves().get(0);
    assertEquals( 0, prj.getColumn() );
    assertEquals( DataType.CHARARRAY, prj.getResultType() );
    assertEquals( POGlobalRearrange.class, phyPlan.getSuccessors(localR).get(0).getClass() );
    POGlobalRearrange globalR = (POGlobalRearrange)phyPlan.getSuccessors(localR).get(0);
    assertEquals( DataType.TUPLE, globalR.getResultType() );

    // Second input: root index 1, so that the second branch is actually inspected
    // instead of re-checking the first one.
    assertEquals( POLoad.class, phyPlan.getRoots().get(1).getClass() );
    POLoad load2 = (POLoad)phyPlan.getRoots().get(1);
    assertFalse( load == load2 );
    assertEquals( POLocalRearrange.class, phyPlan.getSuccessors(load2).get(0).getClass() );
    POLocalRearrange localR2 = (POLocalRearrange)phyPlan.getSuccessors(load2).get(0);
    assertEquals( 1, localR2.getPlans().size() );
    PhysicalPlan cogroupPlan2 = localR2.getPlans().get(0);
    assertEquals( 1, cogroupPlan2.getLeaves().size() );
    assertEquals( POProject.class, cogroupPlan2.getLeaves().get(0).getClass() );
    POProject prj2 = (POProject)cogroupPlan2.getLeaves().get(0);
    assertEquals( 0, prj2.getColumn() );
    assertEquals( DataType.CHARARRAY, prj2.getResultType() );
    // Both inputs must be shuffled through the same global rearrange.
    assertEquals( globalR, phyPlan.getSuccessors(localR2).get(0) );

    assertEquals( POPackage.class, phyPlan.getSuccessors(globalR).get(0).getClass() );
    POPackage pack = (POPackage)phyPlan.getSuccessors(globalR).get(0);
    assertEquals( DataType.TUPLE, pack.getResultType() );
}
/**
 * Verifies the physical shape of a two-input cogroup on a composite key,
 * {@code group a by ( name, age ), b by ( name, age )}:
 * - there are two POLoad roots;
 * - each feeds a POLocalRearrange with two key plans (chararray column 0, int column 1);
 * - both local rearranges feed the same POGlobalRearrange, followed by a POPackage
 *   (result type TUPLE).
 */
@Test
public void testCogroup4() throws Exception {
    String query = "a = load 'd.txt' as (name:chararray, age:int, gpa:float);" +
            "b = load 'e.txt' as (name:chararray, age:int, gpa:float);" +
            "c = group a by ( name, age ), b by ( name, age );" +
            "store c into 'empty';";
    LogicalPlan newLogicalPlan = buildPlan(query);
    PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
    assertEquals( 2, phyPlan.getRoots().size() );

    // First input.
    assertEquals( POLoad.class, phyPlan.getRoots().get(0).getClass() );
    POLoad load = (POLoad)phyPlan.getRoots().get(0);
    assertEquals( POLocalRearrange.class, phyPlan.getSuccessors(load).get(0).getClass() );
    POLocalRearrange localR = (POLocalRearrange)phyPlan.getSuccessors(load).get(0);
    assertEquals( 2, localR.getPlans().size() );
    PhysicalPlan cogroupPlan = localR.getPlans().get(0);
    assertEquals( 1, cogroupPlan.getLeaves().size() );
    assertEquals( POProject.class, cogroupPlan.getLeaves().get(0).getClass() );
    POProject prj = (POProject)cogroupPlan.getLeaves().get(0);
    assertEquals( 0, prj.getColumn() );
    assertEquals( DataType.CHARARRAY, prj.getResultType() );
    PhysicalPlan cogroupPlan2 = localR.getPlans().get(1);
    assertEquals( 1, cogroupPlan2.getLeaves().size() );
    assertEquals( POProject.class, cogroupPlan2.getLeaves().get(0).getClass() );
    POProject prj2 = (POProject)cogroupPlan2.getLeaves().get(0);
    assertEquals( 1, prj2.getColumn() );
    assertEquals( DataType.INTEGER, prj2.getResultType() );
    assertEquals( POGlobalRearrange.class, phyPlan.getSuccessors(localR).get(0).getClass() );
    POGlobalRearrange globalR = (POGlobalRearrange)phyPlan.getSuccessors(localR).get(0);
    assertEquals( DataType.TUPLE, globalR.getResultType() );

    // Second input: root index 1, so that the second branch is actually inspected
    // instead of re-checking the first one.
    assertEquals( POLoad.class, phyPlan.getRoots().get(1).getClass() );
    POLoad load2 = (POLoad)phyPlan.getRoots().get(1);
    assertFalse( load == load2 );
    assertEquals( POLocalRearrange.class, phyPlan.getSuccessors(load2).get(0).getClass() );
    POLocalRearrange localR2 = (POLocalRearrange)phyPlan.getSuccessors(load2).get(0);
    assertEquals( 2, localR2.getPlans().size() );
    PhysicalPlan cogroupPlan3 = localR2.getPlans().get(0);
    assertEquals( 1, cogroupPlan3.getLeaves().size() );
    assertEquals( POProject.class, cogroupPlan3.getLeaves().get(0).getClass() );
    POProject prj3 = (POProject)cogroupPlan3.getLeaves().get(0);
    assertEquals( 0, prj3.getColumn() );
    assertEquals( DataType.CHARARRAY, prj3.getResultType() );
    PhysicalPlan cogroupPlan4 = localR2.getPlans().get(1);
    assertEquals( 1, cogroupPlan4.getLeaves().size() );
    assertEquals( POProject.class, cogroupPlan4.getLeaves().get(0).getClass() );
    POProject prj4 = (POProject)cogroupPlan4.getLeaves().get(0);
    assertEquals( 1, prj4.getColumn() );
    assertEquals( DataType.INTEGER, prj4.getResultType() );
    // Both inputs must be shuffled through the same global rearrange.
    assertEquals( globalR, phyPlan.getSuccessors(localR2).get(0) );

    assertEquals( POPackage.class, phyPlan.getSuccessors(globalR).get(0).getClass() );
    POPackage pack = (POPackage)phyPlan.getSuccessors(globalR).get(0);
    assertEquals( DataType.TUPLE, pack.getResultType() );
}
/**
 * Verifies that user-supplied aliases in {@code generate $0 as a0, $1 as a1} (on a
 * schemaless load) end up in the foreach schema: two fields named a0 and a1.
 * Uses assertEquals so that a mismatch reports the actual value (and a null alias
 * fails as an assertion instead of an NPE).
 */
@Test
public void testUserDefinedForEachSchema1() throws Exception {
    String query = ("a = load 'a.txt';" +
            "b = foreach a generate $0 as a0, $1 as a1;" +
            "store b into 'empty';");
    LogicalPlan newLogicalPlan = buildPlan(query);
    Operator store = newLogicalPlan.getSinks().get(0);
    LOForEach foreach = (LOForEach)newLogicalPlan.getPredecessors(store).get(0);
    LogicalSchema schema = foreach.getSchema();
    assertEquals( 2, schema.size() );
    assertEquals( "a0", schema.getField(0).alias );
    assertEquals( "a1", schema.getField(1).alias );
}
/**
 * Verifies that aliases supplied for a flattened bag,
 * {@code generate flatten($0) as (a0, a1)}, end up in the foreach schema as two fields
 * named a0 and a1. Uses assertEquals so that a mismatch reports the actual value (and a
 * null alias fails as an assertion instead of an NPE).
 */
@Test
public void testUserDefinedForEachSchema2() throws Exception {
    String query = ("a = load 'a.txt' as (b:bag{});" +
            "b = foreach a generate flatten($0) as (a0, a1);" +
            "store b into 'empty';");
    LogicalPlan newLogicalPlan = buildPlan(query);
    Operator store = newLogicalPlan.getSinks().get(0);
    LOForEach foreach = (LOForEach)newLogicalPlan.getPredecessors(store).get(0);
    LogicalSchema schema = foreach.getSchema();
    assertEquals( 2, schema.size() );
    assertEquals( "a0", schema.getField(0).alias );
    assertEquals( "a1", schema.getField(1).alias );
}
/**
 * Verifies the cogroup schema of {@code group a by a0} (see PIG-767):
 * - field 1 is a bag aliased {@code a};
 * - the bag's single field is an unaliased tuple with two fields.
 * Assertions use JUnit's (expected, actual) argument order so failure messages report
 * the values the right way round.
 */
@Test
public void testCogroupSchema1() throws Exception {
    String query = ("a = load '1.txt' as (a0, a1);" +
            "b = group a by a0;" +
            "store b into 'empty';");
    LogicalPlan newLogicalPlan = buildPlan(query);
    Operator store = newLogicalPlan.getSinks().get(0);
    LOCogroup cogroup = (LOCogroup)newLogicalPlan.getPredecessors(store).get(0);
    LogicalSchema cogroupSchema = cogroup.getSchema();
    assertEquals(DataType.BAG, cogroupSchema.getField(1).type);
    assertEquals("a", cogroupSchema.getField(1).alias);
    LogicalSchema bagSchema = cogroupSchema.getField(1).schema;
    assertEquals(DataType.TUPLE, bagSchema.getField(0).type);
    assertEquals(null, bagSchema.getField(0).alias);
    LogicalSchema tupleSchema = bagSchema.getField(0).schema;
    assertEquals(2, tupleSchema.size());
}
/**
 * Verifies the schema of {@code foreach b generate a.a1} after {@code group a by a0}
 * (see PIG-767):
 * - the output's first field is a bag;
 * - the bag's single field is an unaliased tuple;
 * - that tuple has exactly one field, aliased a1.
 * Assertions use JUnit's (expected, actual) argument order so failure messages report
 * the values the right way round.
 */
@Test
public void testCogroupSchema2() throws Exception {
    String query = "a = load '1.txt' as (a0, a1);" +
            "b = group a by a0;" +
            "c = foreach b generate a.a1;" +
            "store c into 'empty';";
    LogicalPlan newLogicalPlan = buildPlan(query);
    Operator store = newLogicalPlan.getSinks().get(0);
    LOForEach foreach = (LOForEach)newLogicalPlan.getPredecessors(store).get(0);
    LogicalSchema foreachSchema = foreach.getSchema();
    assertEquals(DataType.BAG, foreachSchema.getField(0).type);
    LogicalSchema bagSchema = foreachSchema.getField(0).schema;
    assertEquals(DataType.TUPLE, bagSchema.getField(0).type);
    assertEquals(null, bagSchema.getField(0).alias);
    LogicalSchema tupleSchema = bagSchema.getField(0).schema;
    assertEquals(1, tupleSchema.size());
    assertEquals("a1", tupleSchema.getField(0).alias);
}
}