blob: e5bcc009188792382edb9a0e5baa39ff96118c32 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.apache.pig.newplan.logical.relational.LOTestHelper.newLOLoad;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.BaseOperatorPlan;
import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.DepthFirstWalker;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.PlanEdge;
import org.apache.pig.newplan.PlanVisitor;
import org.apache.pig.newplan.PlanWalker;
import org.apache.pig.newplan.ReverseDependencyOrderWalker;
import org.apache.pig.newplan.logical.expression.AndExpression;
import org.apache.pig.newplan.logical.expression.ConstantExpression;
import org.apache.pig.newplan.logical.expression.EqualExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.junit.Before;
import org.junit.Test;
public class TestNewPlanOperatorPlan {
public static class FooLoad extends PigStorage {
public FooLoad(String[] params) {
}
}
private static class SillyPlan extends BaseOperatorPlan {
SillyPlan() {
super();
}
}
static public class DummyLoad extends PigStorage {
public DummyLoad(String a, String b) {
}
public DummyLoad(String a) {
}
}
private static class SillyOperator extends Operator {
private String name;
SillyOperator(String n, SillyPlan p) {
super(n, p);
name = n;
}
@Override
public void accept(PlanVisitor v) {
if (v instanceof SillyVisitor) {
((SillyVisitor)v).visitSillyOperator(this);
}
}
@Override
public boolean isEqual(Operator operator) {
return (name.compareTo(operator.getName()) == 0);
}
}
private static class SillyVisitor extends PlanVisitor {
StringBuffer buf;
protected SillyVisitor(OperatorPlan plan, PlanWalker walker) {
super(plan, walker);
buf = new StringBuffer();
}
public void visitSillyOperator(SillyOperator so) {
buf.append(so.getName());
}
public String getVisitPattern() {
return buf.toString();
}
}
Configuration conf = null;
@Before
public void setUp() throws Exception {
PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
pc.connect();
conf = new Configuration(
ConfigurationUtil.toConfiguration(pc.getFs().getConfiguration())
);
}
// Tests for PlanEdge
@Test
public void testPlanEdgeInsert() {
SillyPlan plan = new SillyPlan();
SillyOperator fred = new SillyOperator("fred", plan);
SillyOperator joe = new SillyOperator("joe", plan);
PlanEdge edges = new PlanEdge();
// Test initial entry
edges.put(fred, joe, 0);
Collection<Operator> c = edges.get(fred);
assertEquals(1, c.size());
Operator[] a = new Operator[1];
Operator[] b = c.toArray(a);
assertEquals(joe, b[0]);
// Test entry with no position
SillyOperator bob = new SillyOperator("bob", plan);
edges.put(fred, bob);
c = edges.get(fred);
assertEquals(2, c.size());
a = new Operator[2];
b = c.toArray(a);
assertEquals(joe, b[0]);
assertEquals(bob, b[1]);
// Test entry with position
SillyOperator jill = new SillyOperator("jill", plan);
edges.put(fred, jill, 1);
c = edges.get(fred);
assertEquals(3, c.size());
a = new Operator[3];
b = c.toArray(a);
assertEquals(joe, b[0]);
assertEquals(jill, b[1]);
assertEquals(bob, b[2]);
}
// Test that entry with invalid position cannot be made.
@Test
public void testPlanEdgeInsertFirstIndexBad() {
SillyPlan plan = new SillyPlan();
SillyOperator fred = new SillyOperator("fred", plan);
SillyOperator joe = new SillyOperator("joe", plan);
PlanEdge edges = new PlanEdge();
boolean caught = false;
try {
edges.put(fred, joe, 1);
} catch (IndexOutOfBoundsException e) {
caught = true;
}
assertTrue(caught);
caught = false;
edges.put(fred, joe);
SillyOperator bob = new SillyOperator("bob", plan);
try {
edges.put(fred, bob, 2);
} catch (IndexOutOfBoundsException e) {
caught = true;
}
assertTrue(caught);
}
// Test OperatorPlan
@Test
public void testOperatorPlan() throws FrontendException {
SillyPlan plan = new SillyPlan();
SillyOperator fred = new SillyOperator("fred", plan);
SillyOperator joe = new SillyOperator("joe", plan);
SillyOperator bob = new SillyOperator("bob", plan);
SillyOperator jim = new SillyOperator("jim", plan);
SillyOperator sam = new SillyOperator("sam", plan);
// Test that roots and leaves are empty when there are no operators in
// plan.
List<Operator> list = plan.getSources();
assertEquals(0, list.size());
list = plan.getSinks();
assertEquals(0, list.size());
plan.add(fred);
plan.add(joe);
plan.add(bob);
plan.add(jim);
plan.add(sam);
// Test that when not connected all nodes are roots and leaves.
list = plan.getSources();
assertEquals(5, list.size());
list = plan.getSinks();
assertEquals(5, list.size());
// Connect them up
plan.connect(fred, bob);
plan.connect(joe, bob);
plan.connect(bob, jim);
plan.connect(bob, sam);
// Check that the roots and leaves came out right
list = plan.getSources();
assertEquals(2, list.size());
for (Operator op : list) {
assertTrue(fred.isEqual(op) || joe.isEqual(op));
}
list = plan.getSinks();
assertEquals(2, list.size());
for (Operator op : list) {
assertTrue(jim.isEqual(op) || sam.isEqual(op));
}
// Check each of their successors and predecessors
list = plan.getSuccessors(fred);
assertEquals(1, list.size());
assertEquals(bob, list.get(0));
list = plan.getSuccessors(joe);
assertEquals(1, list.size());
assertEquals(bob, list.get(0));
list = plan.getPredecessors(jim);
assertEquals(1, list.size());
assertEquals(bob, list.get(0));
list = plan.getPredecessors(sam);
assertEquals(1, list.size());
assertEquals(bob, list.get(0));
list = plan.getPredecessors(bob);
assertEquals(2, list.size());
assertEquals(fred, list.get(0));
assertEquals(joe, list.get(1));
list = plan.getSuccessors(bob);
assertEquals(2, list.size());
assertEquals(jim, list.get(0));
assertEquals(sam, list.get(1));
// Now try swapping two, and check that all comes out as planned
Pair<Integer, Integer> p1 = plan.disconnect(bob, jim);
Pair<Integer, Integer> p2 = plan.disconnect(fred, bob);
plan.connect(bob, p1.first, fred, p1.second);
plan.connect(jim, p2.first, bob, p2.second);
// Check that the roots and leaves came out right
list = plan.getSources();
assertEquals(2, list.size());
for (Operator op : list) {
assertTrue(jim.isEqual(op) || joe.isEqual(op));
}
list = plan.getSinks();
assertEquals(2, list.size());
for (Operator op : list) {
assertTrue(fred.isEqual(op) || sam.isEqual(op));
}
// Check each of their successors and predecessors
list = plan.getSuccessors(jim);
assertEquals(1, list.size());
assertEquals(bob, list.get(0));
list = plan.getSuccessors(joe);
assertEquals(1, list.size());
assertEquals(bob, list.get(0));
list = plan.getPredecessors(fred);
assertEquals(1, list.size());
assertEquals(bob, list.get(0));
list = plan.getPredecessors(sam);
assertEquals(1, list.size());
assertEquals(bob, list.get(0));
list = plan.getPredecessors(bob);
assertEquals(2, list.size());
assertEquals(jim, list.get(0));
assertEquals(joe, list.get(1));
list = plan.getSuccessors(bob);
assertEquals(2, list.size());
assertEquals(fred, list.get(0));
assertEquals(sam, list.get(1));
}
@Test
public void testDisconnectAndRemove() throws FrontendException {
SillyPlan plan = new SillyPlan();
SillyOperator fred = new SillyOperator("fred", plan);
SillyOperator joe = new SillyOperator("joe", plan);
SillyOperator bob = new SillyOperator("bob", plan);
plan.add(fred);
plan.add(joe);
plan.add(bob);
plan.connect(fred, joe);
plan.remove(bob);
plan.disconnect(fred, joe);
List<Operator> list = plan.getSources();
assertEquals(2, list.size());
list = plan.getSinks();
assertEquals(2, list.size());
plan.remove(fred);
plan.remove(joe);
assertEquals(0, plan.size());
list = plan.getSources();
assertEquals(0, list.size());
list = plan.getSinks();
assertEquals(0, list.size());
}
// Test bad remove
@Test
public void testRemoveNegative() {
SillyPlan plan = new SillyPlan();
SillyOperator fred = new SillyOperator("fred", plan);
SillyOperator joe = new SillyOperator("joe", plan);
plan.add(fred);
plan.add(joe);
plan.connect(fred, joe);
boolean caught = false;
try {
plan.remove(fred);
} catch (FrontendException e) {
caught = true;
}
assertTrue(caught);
caught = false;
try {
plan.remove(joe);
} catch (FrontendException e) {
caught = true;
}
assertTrue(caught);
}
@Test
public void testDisconnectNegative() {
SillyPlan plan = new SillyPlan();
SillyOperator fred = new SillyOperator("fred", plan);
SillyOperator joe = new SillyOperator("joe", plan);
plan.add(fred);
plan.add(joe);
boolean caught = false;
try {
plan.disconnect(fred, joe);
} catch (FrontendException e) {
caught = true;
}
assertTrue(caught);
}
// Tests for DependencyOrderWalker
@Test
public void testDependencyOrderWalkerLinear() throws FrontendException {
SillyPlan plan = new SillyPlan();
SillyOperator fred = new SillyOperator("fred", plan);
SillyOperator joe = new SillyOperator("joe", plan);
SillyOperator bob = new SillyOperator("bob", plan);
plan.add(fred);
plan.add(joe);
plan.add(bob);
plan.connect(fred, joe);
plan.connect(joe, bob);
SillyVisitor v =
new SillyVisitor(plan, new DependencyOrderWalker(plan));
v.visit();
String s = v.getVisitPattern();
assertEquals("fredjoebob", s);
}
@Test
public void testDependencyOrderWalkerTree() throws FrontendException {
SillyPlan plan = new SillyPlan();
SillyOperator fred = new SillyOperator("fred", plan);
SillyOperator joe = new SillyOperator("joe", plan);
SillyOperator bob = new SillyOperator("bob", plan);
SillyOperator jill = new SillyOperator("jill", plan);
SillyOperator jane = new SillyOperator("jane", plan);
plan.add(fred);
plan.add(joe);
plan.add(bob);
plan.add(jill);
plan.add(jane);
plan.connect(fred, bob);
plan.connect(joe, bob);
plan.connect(bob, jill);
plan.connect(jane, jill);
SillyVisitor v =
new SillyVisitor(plan, new DependencyOrderWalker(plan));
v.visit();
String s = v.getVisitPattern();
if (!s.equals("fredjoebobjanejill") &&
!s.equals("joefredbobjanejill") &&
!s.equals("janefredjoebobjill") &&
!s.equals("janejoefredbobjill")) {
System.out.println("Invalid order " + s);
fail();
}
}
@Test
public void testDependencyOrderWalkerGraph() throws FrontendException {
SillyPlan plan = new SillyPlan();
SillyOperator fred = new SillyOperator("fred", plan);
SillyOperator joe = new SillyOperator("joe", plan);
SillyOperator bob = new SillyOperator("bob", plan);
SillyOperator jill = new SillyOperator("jill", plan);
SillyOperator jane = new SillyOperator("jane", plan);
plan.add(fred);
plan.add(joe);
plan.add(bob);
plan.add(jill);
plan.add(jane);
plan.connect(fred, bob);
plan.connect(joe, bob);
plan.connect(bob, jill);
plan.connect(bob, jane);
SillyVisitor v =
new SillyVisitor(plan, new DependencyOrderWalker(plan));
v.visit();
String s = v.getVisitPattern();
if (!s.equals("fredjoebobjanejill") &&
!s.equals("joefredbobjanejill") &&
!s.equals("fredjoebobjilljane") &&
!s.equals("joefredbobjilljane")) {
System.out.println("Invalid order " + s);
fail();
}
}
// Tests for DepthFirstWalker
@Test
public void testDepthFirstWalkerLinear() throws FrontendException {
SillyPlan plan = new SillyPlan();
SillyOperator fred = new SillyOperator("fred", plan);
SillyOperator joe = new SillyOperator("joe", plan);
SillyOperator bob = new SillyOperator("bob", plan);
plan.add(fred);
plan.add(joe);
plan.add(bob);
plan.connect(fred, joe);
plan.connect(joe, bob);
SillyVisitor v =
new SillyVisitor(plan, new DepthFirstWalker(plan));
v.visit();
String s = v.getVisitPattern();
assertEquals("fredjoebob", s);
}
@Test
public void testDepthFirstWalkerTree() throws FrontendException {
SillyPlan plan = new SillyPlan();
SillyOperator fred = new SillyOperator("fred", plan);
SillyOperator joe = new SillyOperator("joe", plan);
SillyOperator bob = new SillyOperator("bob", plan);
SillyOperator jill = new SillyOperator("jill", plan);
SillyOperator jane = new SillyOperator("jane", plan);
plan.add(fred);
plan.add(joe);
plan.add(bob);
plan.add(jill);
plan.add(jane);
plan.connect(fred, bob);
plan.connect(fred, joe);
plan.connect(joe, jill);
plan.connect(joe, jane);
SillyVisitor v =
new SillyVisitor(plan, new DepthFirstWalker(plan));
v.visit();
String s = v.getVisitPattern();
assertEquals("fredbobjoejilljane", s);
}
@Test
public void testDepthFirstWalkerGraph() throws FrontendException {
SillyPlan plan = new SillyPlan();
SillyOperator fred = new SillyOperator("fred", plan);
SillyOperator joe = new SillyOperator("joe", plan);
SillyOperator bob = new SillyOperator("bob", plan);
SillyOperator jill = new SillyOperator("jill", plan);
SillyOperator jane = new SillyOperator("jane", plan);
plan.add(fred);
plan.add(joe);
plan.add(bob);
plan.add(jill);
plan.add(jane);
plan.connect(fred, bob);
plan.connect(joe, bob);
plan.connect(bob, jill);
plan.connect(bob, jane);
SillyVisitor v =
new SillyVisitor(plan, new DepthFirstWalker(plan));
v.visit();
String s = v.getVisitPattern();
if (!s.equals("fredbobjilljanejoe") &&
!s.equals("joebobjilljanefred")) {
System.out.println("Invalid order " + s);
fail();
}
}
// Tests for ReverseDependencyOrderWalker
@Test
public void testReverseDependencyOrderWalkerLinear() throws FrontendException {
SillyPlan plan = new SillyPlan();
SillyOperator fred = new SillyOperator("fred", plan);
SillyOperator joe = new SillyOperator("joe", plan);
SillyOperator bob = new SillyOperator("bob", plan);
plan.add(fred);
plan.add(joe);
plan.add(bob);
plan.connect(fred, joe);
plan.connect(joe, bob);
SillyVisitor v =
new SillyVisitor(plan, new ReverseDependencyOrderWalker(plan));
v.visit();
String s = v.getVisitPattern();
assertEquals("bobjoefred", s);
}
@Test
public void testReverseDependencyOrderWalkerTree() throws FrontendException {
SillyPlan plan = new SillyPlan();
SillyOperator fred = new SillyOperator("fred", plan);
SillyOperator joe = new SillyOperator("joe", plan);
SillyOperator bob = new SillyOperator("bob", plan);
SillyOperator jill = new SillyOperator("jill", plan);
SillyOperator jane = new SillyOperator("jane", plan);
plan.add(fred);
plan.add(joe);
plan.add(bob);
plan.add(jill);
plan.add(jane);
plan.connect(fred, bob);
plan.connect(joe, bob);
plan.connect(bob, jill);
plan.connect(jane, jill);
SillyVisitor v =
new SillyVisitor(plan, new ReverseDependencyOrderWalker(plan));
v.visit();
String s = v.getVisitPattern();
if (!s.equals("jilljanebobjoefred") &&
!s.equals("jilljanebobfredjoe") &&
!s.equals("jillbobjoefredjane") &&
!s.equals("jillbobjoejanefred") &&
!s.equals("jillbobfredjoejane") &&
!s.equals("jillbobfredjanejoe") &&
!s.equals("jillbobjanejoefred") &&
!s.equals("jillbobjanefredjoe")) {
System.out.println("Invalid order " + s);
fail();
}
}
@Test
public void testReverseDependencyOrderWalkerGraph() throws FrontendException {
SillyPlan plan = new SillyPlan();
SillyOperator fred = new SillyOperator("fred", plan);
SillyOperator joe = new SillyOperator("joe", plan);
SillyOperator bob = new SillyOperator("bob", plan);
SillyOperator jill = new SillyOperator("jill", plan);
SillyOperator jane = new SillyOperator("jane", plan);
plan.add(fred);
plan.add(joe);
plan.add(bob);
plan.add(jill);
plan.add(jane);
plan.connect(fred, bob);
plan.connect(joe, bob);
plan.connect(bob, jill);
plan.connect(bob, jane);
SillyVisitor v =
new SillyVisitor(plan, new ReverseDependencyOrderWalker(plan));
v.visit();
String s = v.getVisitPattern();
if (!s.equals("jilljanebobjoefred") &&
!s.equals("jilljanebobfredjoe") &&
!s.equals("janejillbobjoefred") &&
!s.equals("janejillbobfredjoe")) {
System.out.println("Invalid order " + s);
fail();
}
}
private static class TestLogicalVisitor extends LogicalRelationalNodesVisitor {
StringBuffer bf = new StringBuffer();
protected TestLogicalVisitor(OperatorPlan plan) throws FrontendException {
super(plan, new DepthFirstWalker(plan));
}
@Override
public void visit(LOLoad load) {
bf.append("load ");
}
String getVisitPlan() {
return bf.toString();
}
}
@Test
public void testLogicalPlanVisitor() throws FrontendException {
LogicalPlan lp = new LogicalPlan();
LOLoad load = newLOLoad(null, null, lp, conf);
/*
* lp.add((LogicalRelationalOperator)null, load,
* (LogicalRelationalOperator)null);
*/
lp.add(load);
TestLogicalVisitor v = new TestLogicalVisitor(lp);
v.visit();
assertEquals("load ", v.getVisitPlan());
}
@Test
public void testBinaryOperatorOrder() throws FrontendException {
LogicalExpressionPlan ep = new LogicalExpressionPlan();
ConstantExpression c = new ConstantExpression(ep, new Integer(5));
ProjectExpression p = new ProjectExpression(ep, 0, 0, null);
EqualExpression e = new EqualExpression(ep, p, c);
assertEquals(p, e.getLhs());
assertEquals(c, e.getRhs());
}
private static class TestExpressionVisitor extends LogicalExpressionVisitor {
StringBuffer bf = new StringBuffer();
protected TestExpressionVisitor(OperatorPlan plan) throws FrontendException {
super(plan, new DepthFirstWalker(plan));
}
@Override
public void visit(AndExpression andExpr) {
bf.append("and ");
}
@Override
public void visit(EqualExpression equal) {
bf.append("equal ");
}
@Override
public void visit(ProjectExpression project) {
bf.append("project ");
}
@Override
public void visit(ConstantExpression constant) {
bf.append("constant ");
}
String getVisitPlan() {
return bf.toString();
}
}
@Test
public void testExpressionPlanVisitor() throws FrontendException {
LogicalExpressionPlan ep = new LogicalExpressionPlan();
ConstantExpression c = new ConstantExpression(ep, new Integer(5));
ProjectExpression p = new ProjectExpression(ep, 0, 0, null);
EqualExpression e = new EqualExpression(ep, p, c);
ConstantExpression c2 = new ConstantExpression(ep, new Boolean("true"));
new AndExpression(ep, e, c2);
TestExpressionVisitor v = new TestExpressionVisitor(ep);
v.visit();
assertEquals("and equal project constant constant ", v.getVisitPlan());
}
@Test
public void testExpressionEquality() throws FrontendException {
LogicalExpressionPlan ep1 = new LogicalExpressionPlan();
ConstantExpression c1 = new ConstantExpression(ep1, new Integer(5));
ProjectExpression p1 = new ProjectExpression(ep1, 0, 0, null);
EqualExpression e1 = new EqualExpression(ep1, p1, c1);
ConstantExpression ca1 = new ConstantExpression(ep1, new Boolean("true"));
AndExpression a1 = new AndExpression(ep1, e1, ca1);
LogicalExpressionPlan ep2 = new LogicalExpressionPlan();
ConstantExpression c2 = new ConstantExpression(ep2, new Integer(5));
ProjectExpression p2 = new ProjectExpression(ep2, 0, 0, null);
EqualExpression e2 = new EqualExpression(ep2, p2, c2);
ConstantExpression ca2 = new ConstantExpression(ep2, new Boolean("true"));
AndExpression a2 = new AndExpression(ep2, e2, ca2);
assertTrue(ep1.isEqual(ep2));
assertTrue(c1.isEqual(c2));
assertTrue(p1.isEqual(p2));
assertTrue(e1.isEqual(e2));
assertTrue(ca1.isEqual(ca2));
assertTrue(a1.isEqual(a2));
LogicalExpressionPlan ep3 = new LogicalExpressionPlan();
ConstantExpression c3 = new ConstantExpression(ep3, new Integer(3));
ProjectExpression p3 = new ProjectExpression(ep3, 0, 1, null);
EqualExpression e3 = new EqualExpression(ep3, p3, c3);
ConstantExpression ca3 = new ConstantExpression(ep3, "true");
AndExpression a3 = new AndExpression(ep3, e3, ca3);
assertFalse(ep1.isEqual(ep3));
assertFalse(c1.isEqual(c3));
assertFalse(p1.isEqual(p3));
assertFalse(e1.isEqual(e3));
assertFalse(ca1.isEqual(ca3));
assertFalse(a1.isEqual(a3));
LogicalExpressionPlan ep4 = new LogicalExpressionPlan();
ProjectExpression p4 = new ProjectExpression(ep4, 1, 0, null);
assertFalse(ep1.isEqual(ep4));
assertFalse(p1.isEqual(p4));
}
@Test
public void testRelationalEquality() throws FrontendException {
// Build a plan that is the logical plan for
// A = load 'bla' as (x);
// B = load 'morebla' as (y);
// C = join A on x, B on y;
// D = filter C by y > 0;
// A = load
LogicalPlan lp = new LogicalPlan();
{
LogicalSchema aschema = new LogicalSchema();
aschema.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
LOLoad A = newLOLoad(new FileSpec("/abc",
new FuncSpec(DummyLoad.class.getName(), new String[] { "x", "y" })), aschema,
lp, conf);
lp.add(A);
// B = load
LogicalSchema bschema = new LogicalSchema();
bschema.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LOLoad B = newLOLoad(new FileSpec("/def",
new FuncSpec("PigStorage", "\t")), bschema, lp, conf);
lp.add(B);
// C = join
LogicalSchema cschema = new LogicalSchema();
cschema.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
cschema.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LogicalExpressionPlan aprojplan = new LogicalExpressionPlan();
new ProjectExpression(aprojplan, 0, 0, null);
LogicalExpressionPlan bprojplan = new LogicalExpressionPlan();
new ProjectExpression(bprojplan, 1, 0, null);
MultiMap<Integer, LogicalExpressionPlan> mm =
new MultiMap<Integer, LogicalExpressionPlan>();
mm.put(0, aprojplan);
mm.put(1, bprojplan);
LOJoin C = new LOJoin(lp, mm, JOINTYPE.HASH, new boolean[] { true, true });
C.neverUseForRealSetSchema(cschema);
lp.add(C);
lp.connect(A, C);
lp.connect(B, C);
// D = filter
LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
ProjectExpression fy = new ProjectExpression(filterPlan, 0, 1, null);
ConstantExpression fc = new ConstantExpression(filterPlan, new Integer(0));
new EqualExpression(filterPlan, fy, fc);
LOFilter D = new LOFilter(lp, filterPlan);
D.neverUseForRealSetSchema(cschema);
lp.add(D);
lp.connect(C, D);
}
// Build a second similar plan to test equality
// A = load
LogicalPlan lp1 = new LogicalPlan();
{
LogicalSchema aschema = new LogicalSchema();
aschema.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
LOLoad A = newLOLoad(new FileSpec("/abc",
new FuncSpec(DummyLoad.class.getName(), new String[] { "x", "y" })), aschema,
lp1, conf);
lp1.add(A);
// B = load
LogicalSchema bschema = new LogicalSchema();
bschema.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LOLoad B = newLOLoad(new FileSpec("/def",
new FuncSpec("PigStorage", "\t")), bschema, lp1, conf);
lp1.add(B);
// C = join
LogicalSchema cschema = new LogicalSchema();
cschema.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
cschema.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LogicalExpressionPlan aprojplan = new LogicalExpressionPlan();
new ProjectExpression(aprojplan, 0, 0, null);
LogicalExpressionPlan bprojplan = new LogicalExpressionPlan();
new ProjectExpression(bprojplan, 1, 0, null);
MultiMap<Integer, LogicalExpressionPlan> mm =
new MultiMap<Integer, LogicalExpressionPlan>();
mm.put(0, aprojplan);
mm.put(1, bprojplan);
LOJoin C = new LOJoin(lp1, mm, JOINTYPE.HASH, new boolean[] { true, true });
C.neverUseForRealSetSchema(cschema);
lp1.add(C);
lp1.connect(A, C);
lp1.connect(B, C);
// D = filter
LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
ProjectExpression fy = new ProjectExpression(filterPlan, 0, 1, null);
ConstantExpression fc = new ConstantExpression(filterPlan, new Integer(0));
new EqualExpression(filterPlan, fy, fc);
LOFilter D = new LOFilter(lp1, filterPlan);
D.neverUseForRealSetSchema(cschema);
lp1.add(D);
lp1.connect(C, D);
}
assertTrue(lp.isEqual(lp1));
}
@Test
public void testLoadEqualityDifferentFuncSpecCtorArgs() throws FrontendException {
LogicalPlan lp = new LogicalPlan();
LogicalSchema aschema1 = new LogicalSchema();
aschema1.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
LOLoad load1 = newLOLoad(new FileSpec("/abc",
new FuncSpec(DummyLoad.class.getName(), new String[] { "x", "y" })), aschema1, lp,
conf);
lp.add(load1);
LOLoad load2 = newLOLoad(new FileSpec("/abc",
new FuncSpec(DummyLoad.class.getName(), new String[] { "x", "z" })), aschema1, lp,
conf);
lp.add(load2);
assertFalse(load1.isEqual(load2));
}
@Test
public void testLoadEqualityDifferentNumFuncSpecCstorArgs() throws FrontendException {
LogicalPlan lp = new LogicalPlan();
LogicalSchema aschema1 = new LogicalSchema();
aschema1.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
LOLoad load1 = newLOLoad(new FileSpec("/abc",
new FuncSpec(DummyLoad.class.getName(), new String[] { "x", "y" })), aschema1, lp,
conf);
lp.add(load1);
LOLoad load3 = newLOLoad(new FileSpec("/abc",
new FuncSpec(DummyLoad.class.getName(), "x")), aschema1, lp, conf);
lp.add(load3);
assertFalse(load1.isEqual(load3));
}
@Test
public void testLoadEqualityDifferentFunctionNames() throws FrontendException {
LogicalPlan lp = new LogicalPlan();
LogicalSchema aschema1 = new LogicalSchema();
aschema1.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
LOLoad load1 = newLOLoad(new FileSpec("/abc",
new FuncSpec(DummyLoad.class.getName(), new String[] { "x", "y" })), aschema1, lp,
conf);
lp.add(load1);
// Different function names in FuncSpec
LOLoad load4 = newLOLoad(new FileSpec("/abc",
new FuncSpec(DummyLoad.class.getName(), new String[] { "x", "z" })), aschema1, lp,
conf);
lp.add(load4);
assertFalse(load1.isEqual(load4));
}
@Test
public void testLoadEqualityDifferentFileName() throws FrontendException {
LogicalPlan lp = new LogicalPlan();
LogicalSchema aschema1 = new LogicalSchema();
aschema1.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
LOLoad load1 = newLOLoad(new FileSpec("/abc",
new FuncSpec(DummyLoad.class.getName(), new String[] { "x", "y" })), aschema1, lp,
conf);
lp.add(load1);
// Different file name
LOLoad load5 = newLOLoad(new FileSpec("/def",
new FuncSpec(DummyLoad.class.getName(), new String[] { "x", "z" })), aschema1, lp,
conf);
lp.add(load5);
assertFalse(load1.isEqual(load5));
}
@Test
public void testRelationalEqualityDifferentSchema() throws FrontendException {
LogicalPlan lp = new LogicalPlan();
LogicalSchema aschema1 = new LogicalSchema();
aschema1.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
LOLoad load1 = newLOLoad(new FileSpec("/abc",
new FuncSpec(DummyLoad.class.getName(), new String[] { "x", "y" })), aschema1, lp,
conf);
lp.add(load1);
// Different schema
LogicalSchema aschema2 = new LogicalSchema();
aschema2.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.CHARARRAY));
LOLoad load6 = newLOLoad(new FileSpec("/abc",
new FuncSpec(DummyLoad.class.getName(), new String[] { "x", "z" })), aschema2, lp,
conf);
lp.add(load6);
assertFalse(load1.isEqual(load6));
}
@Test
public void testRelationalEqualityNullSchemas() throws FrontendException {
LogicalPlan lp = new LogicalPlan();
// Test that two loads with no schema are still equal
LOLoad load7 = newLOLoad(new FileSpec("/abc",
new FuncSpec(DummyLoad.class.getName(), new String[] { "x", "y" })), null, lp, conf);
lp.add(load7);
LOLoad load8 = newLOLoad(new FileSpec("/abc",
new FuncSpec(DummyLoad.class.getName(), new String[] { "x", "y" })), null, lp, conf);
lp.add(load8);
assertTrue(load7.isEqual(load8));
}
@Test
public void testRelationalEqualityOneNullOneNotNullSchema() throws FrontendException {
LogicalPlan lp = new LogicalPlan();
LogicalSchema aschema1 = new LogicalSchema();
aschema1.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
LOLoad load1 = newLOLoad(new FileSpec("/abc",
new FuncSpec(DummyLoad.class.getName(), new String[] { "x", "y" })), aschema1, lp,
conf);
lp.add(load1);
// Test that one with schema and one without breaks equality
LOLoad load9 = newLOLoad(new FileSpec("/abc",
new FuncSpec(DummyLoad.class.getName(), new String[] { "x", "z" })), null, lp, conf);
lp.add(load9);
assertFalse(load1.isEqual(load9));
}
@Test
public void testFilterDifferentPredicates() throws FrontendException {
LogicalPlan lp = new LogicalPlan();
LogicalExpressionPlan fp1 = new LogicalExpressionPlan();
ProjectExpression fy1 = new ProjectExpression(fp1, 0, 1, null);
ConstantExpression fc1 = new ConstantExpression(fp1,
new Integer(0));
new EqualExpression(fp1, fy1, fc1);
LOFilter D1 = new LOFilter(lp, fp1);
LogicalSchema cschema = new LogicalSchema();
cschema.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
cschema.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
D1.neverUseForRealSetSchema(cschema);
lp.add(D1);
LogicalExpressionPlan fp2 = new LogicalExpressionPlan();
ProjectExpression fy2 = new ProjectExpression(fp2, 0, 1, null);
ConstantExpression fc2 = new ConstantExpression(fp2,
new Integer(1));
new EqualExpression(fp2, fy2, fc2);
LOFilter D2 = new LOFilter(lp, fp2);
D2.neverUseForRealSetSchema(cschema);
lp.add(D2);
assertFalse(D1.isEqual(D2));
}
// No tests for LOStore because it tries to actually instantiate the store
// func, and I don't want to mess with that here.
@Test
public void testJoinDifferentJoinTypes() throws FrontendException {
LogicalPlan lp = new LogicalPlan();
LogicalSchema jaschema1 = new LogicalSchema();
jaschema1.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
LOLoad A1 = newLOLoad(new FileSpec("/abc",
new FuncSpec("org.apache.pig.test.TestNewPlanOperatorPlan$FooLoad", new String[] {
"x", "y" })), jaschema1, lp, conf);
lp.add(A1);
// B = load
LogicalSchema jbschema1 = new LogicalSchema();
jbschema1.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LOLoad B1 = newLOLoad(new FileSpec("/def",
new FuncSpec("PigStorage", "\t")), jbschema1, lp, conf);
lp.add(B1);
// C = join
LogicalSchema jcschema1 = new LogicalSchema();
jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LogicalExpressionPlan aprojplan1 = new LogicalExpressionPlan();
new ProjectExpression(aprojplan1, 0, 0, null);
LogicalExpressionPlan bprojplan1 = new LogicalExpressionPlan();
new ProjectExpression(bprojplan1, 1, 0, null);
MultiMap<Integer, LogicalExpressionPlan> mm1 =
new MultiMap<Integer, LogicalExpressionPlan>();
mm1.put(0, aprojplan1);
mm1.put(1, bprojplan1);
LOJoin C1 = new LOJoin(lp, mm1, JOINTYPE.HASH, new boolean[] { true, true });
C1.neverUseForRealSetSchema(jcschema1);
lp.add(C1);
lp.connect(A1, C1);
lp.connect(B1, C1);
// A = load
LogicalSchema jaschema2 = new LogicalSchema();
jaschema2.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
LOLoad A2 = newLOLoad(new FileSpec("/abc",
new FuncSpec("org.apache.pig.test.TestNewPlanOperatorPlan$FooLoad", new String[] {
"x", "y" })), jaschema2, lp, conf);
lp.add(A2);
// B = load
LogicalSchema jbschema2 = new LogicalSchema();
jbschema2.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LOLoad B2 = newLOLoad(new FileSpec("/def",
new FuncSpec("PigStorage", "\t")), jbschema2, lp, conf);
lp.add(B2);
// C = join
LogicalSchema jcschema2 = new LogicalSchema();
jcschema2.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
jcschema2.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LogicalExpressionPlan aprojplan2 = new LogicalExpressionPlan();
new ProjectExpression(aprojplan2, 0, 0, null);
LogicalExpressionPlan bprojplan2 = new LogicalExpressionPlan();
new ProjectExpression(bprojplan2, 1, 0, null);
MultiMap<Integer, LogicalExpressionPlan> mm2 =
new MultiMap<Integer, LogicalExpressionPlan>();
mm2.put(0, aprojplan2);
mm2.put(1, bprojplan2);
LOJoin C2 = new LOJoin(lp, mm2, JOINTYPE.SKEWED, new boolean[] { true, true });
C2.neverUseForRealSetSchema(jcschema2);
lp.add(C2);
lp.connect(A2, C2);
lp.connect(B2, C2);
assertFalse(C1.isEqual(C2));
}
@Test
public void testJoinDifferentInner() throws FrontendException {
LogicalPlan lp = new LogicalPlan();
LogicalSchema jaschema1 = new LogicalSchema();
jaschema1.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
LOLoad A1 = newLOLoad(new FileSpec("/abc",
new FuncSpec("org.apache.pig.test.TestNewPlanOperatorPlan$FooLoad", new String[] {
"x", "y" })), jaschema1, lp, conf);
lp.add(A1);
// B = load
LogicalSchema jbschema1 = new LogicalSchema();
jbschema1.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LOLoad B1 = newLOLoad(new FileSpec("/def",
new FuncSpec("PigStorage", "\t")), jbschema1, lp, conf);
lp.add(B1);
// C = join
LogicalSchema jcschema1 = new LogicalSchema();
jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LogicalExpressionPlan aprojplan1 = new LogicalExpressionPlan();
new ProjectExpression(aprojplan1, 0, 0, null);
LogicalExpressionPlan bprojplan1 = new LogicalExpressionPlan();
new ProjectExpression(bprojplan1, 1, 0, null);
MultiMap<Integer, LogicalExpressionPlan> mm1 =
new MultiMap<Integer, LogicalExpressionPlan>();
mm1.put(0, aprojplan1);
mm1.put(1, bprojplan1);
LOJoin C1 = new LOJoin(lp, mm1, JOINTYPE.HASH, new boolean[] { true, true });
C1.neverUseForRealSetSchema(jcschema1);
lp.add(C1);
lp.connect(A1, C1);
lp.connect(B1, C1);
// Test different inner status
// A = load
LogicalSchema jaschema3 = new LogicalSchema();
jaschema3.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
LOLoad A3 = newLOLoad(new FileSpec("/abc",
new FuncSpec("org.apache.pig.test.TestNewPlanOperatorPlan$FooLoad", new String[] {
"x", "y" })), jaschema3, lp, conf);
lp.add(A3);
// B = load
LogicalSchema jbschema3 = new LogicalSchema();
jbschema3.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LOLoad B3 = newLOLoad(new FileSpec("/def",
new FuncSpec("PigStorage", "\t")), jbschema3, lp, conf);
lp.add(B3);
// C = join
LogicalSchema jcschema3 = new LogicalSchema();
jcschema3.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
jcschema3.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LogicalExpressionPlan aprojplan3 = new LogicalExpressionPlan();
new ProjectExpression(aprojplan3, 0, 0, null);
LogicalExpressionPlan bprojplan3 = new LogicalExpressionPlan();
new ProjectExpression(bprojplan3, 1, 0, null);
MultiMap<Integer, LogicalExpressionPlan> mm3 =
new MultiMap<Integer, LogicalExpressionPlan>();
mm3.put(0, aprojplan3);
mm3.put(1, bprojplan3);
LOJoin C3 = new LOJoin(lp, mm3, JOINTYPE.HASH, new boolean[] { true, false });
C3.neverUseForRealSetSchema(jcschema3);
lp.add(C3);
lp.connect(A3, C3);
lp.connect(B3, C3);
assertFalse(C1.isEqual(C3));
}
@Test
public void testJoinDifferentNumInputs() throws FrontendException {
LogicalPlan lp = new LogicalPlan();
LogicalSchema jaschema1 = new LogicalSchema();
jaschema1.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
LOLoad A1 = newLOLoad(new FileSpec("/abc",
new FuncSpec("org.apache.pig.test.TestNewPlanOperatorPlan$FooLoad", new String[] {
"x", "y" })), jaschema1, lp, conf);
lp.add(A1);
// B = load
LogicalSchema jbschema1 = new LogicalSchema();
jbschema1.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LOLoad B1 = newLOLoad(new FileSpec("/def",
new FuncSpec("PigStorage", "\t")), jbschema1, lp, conf);
lp.add(B1);
// C = join
LogicalSchema jcschema1 = new LogicalSchema();
jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LogicalExpressionPlan aprojplan1 = new LogicalExpressionPlan();
new ProjectExpression(aprojplan1, 0, 0, null);
LogicalExpressionPlan bprojplan1 = new LogicalExpressionPlan();
new ProjectExpression(bprojplan1, 1, 0, null);
MultiMap<Integer, LogicalExpressionPlan> mm1 =
new MultiMap<Integer, LogicalExpressionPlan>();
mm1.put(0, aprojplan1);
mm1.put(1, bprojplan1);
LOJoin C1 = new LOJoin(lp, mm1, JOINTYPE.HASH, new boolean[] { true, true });
C1.neverUseForRealSetSchema(jcschema1);
lp.add(C1);
lp.connect(A1, C1);
lp.connect(B1, C1);
// A = load
LogicalSchema jaschema5 = new LogicalSchema();
jaschema5.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
LOLoad A5 = newLOLoad(new FileSpec("/abc",
new FuncSpec("org.apache.pig.test.TestNewPlanOperatorPlan$FooLoad", new String[] {
"x", "y" })), jaschema5, lp, conf);
lp.add(A5);
// B = load
LogicalSchema jbschema5 = new LogicalSchema();
jbschema5.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LOLoad B5 = newLOLoad(new FileSpec("/def",
new FuncSpec("PigStorage", "\t")), jbschema5, lp, conf);
lp.add(B5);
// Beta = load
LogicalSchema jbetaschema5 = new LogicalSchema();
jbetaschema5.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LOLoad Beta5 = newLOLoad(new FileSpec("/ghi",
new FuncSpec("PigStorage", "\t")), jbetaschema5, lp, conf);
lp.add(Beta5);
// C = join
LogicalSchema jcschema5 = new LogicalSchema();
jcschema5.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
jcschema5.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LogicalExpressionPlan aprojplan5 = new LogicalExpressionPlan();
new ProjectExpression(aprojplan5, 0, 0, null);
LogicalExpressionPlan bprojplan5 = new LogicalExpressionPlan();
new ProjectExpression(bprojplan5, 1, 0, null);
LogicalExpressionPlan betaprojplan5 = new LogicalExpressionPlan();
new ProjectExpression(betaprojplan5, 1, 0, null);
MultiMap<Integer, LogicalExpressionPlan> mm5 =
new MultiMap<Integer, LogicalExpressionPlan>();
mm5.put(0, aprojplan5);
mm5.put(1, bprojplan5);
mm5.put(2, betaprojplan5);
LOJoin C5 = new LOJoin(lp, mm5, JOINTYPE.HASH, new boolean[] { true, true });
C5.neverUseForRealSetSchema(jcschema5);
lp.add(C5);
lp.connect(A5, C5);
lp.connect(B5, C5);
lp.connect(Beta5, C5);
assertFalse(C1.isEqual(C5));
}
@Test
public void testJoinDifferentJoinKeys() throws FrontendException {
LogicalPlan lp = new LogicalPlan();
// Test different join keys
LogicalSchema jaschema6 = new LogicalSchema();
jaschema6.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
LOLoad A6 = newLOLoad(new FileSpec("/abc",
new FuncSpec("org.apache.pig.test.TestNewPlanOperatorPlan$FooLoad", new String[] {
"x", "y" })), jaschema6, lp, conf);
lp.add(A6);
// B = load
LogicalSchema jbschema6 = new LogicalSchema();
jbschema6.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
jbschema6.addField(new LogicalSchema.LogicalFieldSchema(
"z", null, DataType.LONG));
LOLoad B6 = newLOLoad(new FileSpec("/def",
new FuncSpec("PigStorage", "\t")), jbschema6, lp, conf);
lp.add(B6);
// C = join
LogicalSchema jcschema6 = new LogicalSchema();
jcschema6.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
jcschema6.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LogicalExpressionPlan aprojplan6 = new LogicalExpressionPlan();
new ProjectExpression(aprojplan6, 0, 0, null);
LogicalExpressionPlan bprojplan6 = new LogicalExpressionPlan();
new ProjectExpression(bprojplan6, 1, 0, null);
LogicalExpressionPlan b2projplan6 = new LogicalExpressionPlan();
new ProjectExpression(b2projplan6, 1, 1, null);
MultiMap<Integer, LogicalExpressionPlan> mm6 =
new MultiMap<Integer, LogicalExpressionPlan>();
mm6.put(0, aprojplan6);
mm6.put(1, bprojplan6);
mm6.put(1, b2projplan6);
LOJoin C6 = new LOJoin(lp, mm6, JOINTYPE.HASH, new boolean[] { true, true });
C6.neverUseForRealSetSchema(jcschema6);
lp.add(C6);
lp.connect(A6, C6);
lp.connect(B6, C6);
LogicalSchema jaschema7 = new LogicalSchema();
jaschema7.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
LOLoad A7 = newLOLoad(new FileSpec("/abc",
new FuncSpec("org.apache.pig.test.TestNewPlanOperatorPlan$FooLoad", new String[] {
"x", "y" })), jaschema7, lp, conf);
lp.add(A7);
// B = load
LogicalSchema jbschema7 = new LogicalSchema();
jbschema7.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
jbschema7.addField(new LogicalSchema.LogicalFieldSchema(
"z", null, DataType.LONG));
LOLoad B7 = newLOLoad(new FileSpec("/def",
new FuncSpec("PigStorage", "\t")), jbschema7, lp, conf);
lp.add(B7);
// C = join
LogicalSchema jcschema7 = new LogicalSchema();
jcschema7.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
jcschema7.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LogicalExpressionPlan aprojplan7 = new LogicalExpressionPlan();
new ProjectExpression(aprojplan7, 0, 0, null);
LogicalExpressionPlan bprojplan7 = new LogicalExpressionPlan();
new ProjectExpression(bprojplan7, 1, 1, null);
LogicalExpressionPlan b2projplan7 = new LogicalExpressionPlan();
new ProjectExpression(b2projplan7, 1, 0, null);
MultiMap<Integer, LogicalExpressionPlan> mm7 =
new MultiMap<Integer, LogicalExpressionPlan>();
mm7.put(0, aprojplan7);
mm7.put(1, bprojplan7);
mm7.put(1, b2projplan7);
LOJoin C7 = new LOJoin(lp, mm7, JOINTYPE.HASH, new boolean[] { true, true });
C7.neverUseForRealSetSchema(jcschema7);
lp.add(C7);
lp.connect(A7, C7);
lp.connect(B7, C7);
assertFalse(C6.isEqual(C7));
}
@Test
public void testJoinDifferentNumJoinKeys() throws FrontendException {
LogicalPlan lp = new LogicalPlan();
// Test different join keys
LogicalSchema jaschema6 = new LogicalSchema();
jaschema6.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
LOLoad A6 = newLOLoad(new FileSpec("/abc",
new FuncSpec("org.apache.pig.test.TestNewPlanOperatorPlan$FooLoad", new String[] {
"x", "y" })), jaschema6, lp, conf);
lp.add(A6);
// B = load
LogicalSchema jbschema6 = new LogicalSchema();
jbschema6.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
jbschema6.addField(new LogicalSchema.LogicalFieldSchema(
"z", null, DataType.LONG));
LOLoad B6 = newLOLoad(new FileSpec("/def",
new FuncSpec("PigStorage", "\t")), jbschema6, lp, conf);
lp.add(B6);
// C = join
LogicalSchema jcschema6 = new LogicalSchema();
jcschema6.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
jcschema6.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LogicalExpressionPlan aprojplan6 = new LogicalExpressionPlan();
new ProjectExpression(aprojplan6, 0, 0, null);
LogicalExpressionPlan bprojplan6 = new LogicalExpressionPlan();
new ProjectExpression(bprojplan6, 1, 0, null);
LogicalExpressionPlan b2projplan6 = new LogicalExpressionPlan();
new ProjectExpression(b2projplan6, 1, 1, null);
MultiMap<Integer, LogicalExpressionPlan> mm6 =
new MultiMap<Integer, LogicalExpressionPlan>();
mm6.put(0, aprojplan6);
mm6.put(1, bprojplan6);
mm6.put(1, b2projplan6);
LOJoin C6 = new LOJoin(lp, mm6, JOINTYPE.HASH, new boolean[] { true, true });
C6.neverUseForRealSetSchema(jcschema6);
lp.add(C6);
lp.connect(A6, C6);
lp.connect(B6, C6);
// Test different different number of join keys
LogicalSchema jaschema8 = new LogicalSchema();
jaschema8.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
LOLoad A8 = newLOLoad(new FileSpec("/abc",
new FuncSpec("org.apache.pig.test.TestNewPlanOperatorPlan$FooLoad", new String[] {
"x", "y" })), jaschema8, lp, conf);
lp.add(A8);
// B = load
LogicalSchema jbschema8 = new LogicalSchema();
jbschema8.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
jbschema8.addField(new LogicalSchema.LogicalFieldSchema(
"z", null, DataType.LONG));
LOLoad B8 = newLOLoad(new FileSpec("/def",
new FuncSpec("PigStorage", "\t")), jbschema8, lp, conf);
lp.add(B8);
// C = join
LogicalSchema jcschema8 = new LogicalSchema();
jcschema8.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
jcschema8.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
LogicalExpressionPlan aprojplan8 = new LogicalExpressionPlan();
new ProjectExpression(aprojplan8, 0, 0, null);
LogicalExpressionPlan bprojplan8 = new LogicalExpressionPlan();
new ProjectExpression(bprojplan8, 1, 0, null);
MultiMap<Integer, LogicalExpressionPlan> mm8 =
new MultiMap<Integer, LogicalExpressionPlan>();
mm8.put(0, aprojplan8);
mm8.put(1, bprojplan8);
LOJoin C8 = new LOJoin(lp, mm8, JOINTYPE.HASH, new boolean[] { true, true });
C8.neverUseForRealSetSchema(jcschema8);
lp.add(C8);
lp.connect(A8, C8);
lp.connect(B8, C8);
assertFalse(C6.isEqual(C8));
}
@Test
public void testRelationalSameOpDifferentPreds() throws FrontendException {
LogicalPlan lp1 = new LogicalPlan();
LogicalSchema aschema1 = new LogicalSchema();
aschema1.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
LOLoad A1 = newLOLoad(new FileSpec("/abc",
new FuncSpec(DummyLoad.class.getName(), new String[] { "x", "y" })), aschema1, lp1,
conf);
lp1.add(A1);
LogicalExpressionPlan fp1 = new LogicalExpressionPlan();
ProjectExpression fy1 = new ProjectExpression(fp1, 0, 0, null);
ConstantExpression fc1 = new ConstantExpression(fp1,
new Integer(0));
new EqualExpression(fp1, fy1, fc1);
LOFilter D1 = new LOFilter(lp1, fp1);
LogicalSchema cschema = new LogicalSchema();
cschema.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
D1.neverUseForRealSetSchema(cschema);
lp1.add(D1);
lp1.connect(A1, D1);
LogicalPlan lp2 = new LogicalPlan();
LOLoad A2 = newLOLoad(new FileSpec("/abc",
new FuncSpec(DummyLoad.class.getName(), new String[] { "x", "z" })), null, lp2,
conf);
lp2.add(A2);
LogicalExpressionPlan fp2 = new LogicalExpressionPlan();
ProjectExpression fy2 = new ProjectExpression(fp2, 0, 0, null);
ConstantExpression fc2 = new ConstantExpression(fp2,
new Integer(0));
new EqualExpression(fp2, fy2, fc2);
LOFilter D2 = new LOFilter(lp2, fp2);
D2.neverUseForRealSetSchema(cschema);
lp2.add(D2);
lp2.connect(A2, D2);
assertTrue(D1.isEqual(D2));
}
@Test
public void testReplace1() throws FrontendException {
// has multiple inputs
SillyPlan plan = new SillyPlan();
Operator load1 = new SillyOperator("load1", plan);
Operator load2 = new SillyOperator("load2", plan);
Operator filter1 = new SillyOperator("filter1", plan);
Operator join1 = new SillyOperator("join1", plan);
Operator filter2 = new SillyOperator("filter2", plan);
plan.add(load1);
plan.add(load2);
plan.add(filter1);
plan.add(filter2);
plan.add(join1);
plan.connect(load1, join1);
plan.connect(load2, filter1);
plan.connect(filter1, join1);
plan.connect(join1, filter2);
Operator join2 = new SillyOperator("join2", plan);
plan.replace(join1, join2);
List<Operator> preds = plan.getPredecessors(join2);
assertEquals(2, preds.size());
assertTrue(preds.contains(load1));
assertTrue(preds.contains(filter1));
List<Operator> succs = plan.getSuccessors(join2);
assertEquals(1, succs.size());
assertTrue(succs.contains(filter2));
}
@Test
public void testReplace2() throws FrontendException {
// has multiple outputs
SillyPlan plan = new SillyPlan();
Operator load1 = new SillyOperator("load1", plan);
Operator split1 = new SillyOperator("split1", plan);
Operator filter1 = new SillyOperator("filter1", plan);
Operator filter2 = new SillyOperator("filter2", plan);
plan.add(load1);
plan.add(split1);
plan.add(filter1);
plan.add(filter2);
plan.connect(load1, split1);
plan.connect(split1, filter1);
plan.connect(split1, filter2);
Operator split2 = new SillyOperator("split2", plan);
plan.replace(split1, split2);
List<Operator> preds = plan.getPredecessors(split2);
assertEquals(1, preds.size());
assertTrue(preds.contains(load1));
List<Operator> succs = plan.getSuccessors(split2);
assertEquals(2, succs.size());
assertTrue(succs.contains(filter1));
assertTrue(succs.contains(filter2));
}
@Test
public void testReplace3() throws FrontendException {
// single input/output
SillyPlan plan = new SillyPlan();
Operator load1 = new SillyOperator("load1", plan);
Operator filter1 = new SillyOperator("filter1", plan);
Operator filter2 = new SillyOperator("filter2", plan);
plan.add(load1);
plan.add(filter1);
plan.add(filter2);
plan.connect(load1, filter1);
plan.connect(filter1, filter2);
Operator filter3 = new SillyOperator("filter3", plan);
plan.replace(filter1, filter3);
List<Operator> preds = plan.getPredecessors(filter3);
assertEquals(1, preds.size());
assertTrue(preds.contains(load1));
List<Operator> succs = plan.getSuccessors(filter3);
assertEquals(1, succs.size());
assertTrue(succs.contains(filter2));
}
@Test
public void testReplace4() throws FrontendException {
// output is null
SillyPlan plan = new SillyPlan();
Operator load1 = new SillyOperator("load1", plan);
Operator filter1 = new SillyOperator("filter1", plan);
Operator filter2 = new SillyOperator("filter2", plan);
plan.add(load1);
plan.add(filter1);
plan.add(filter2);
plan.connect(load1, filter1);
plan.connect(filter1, filter2);
Operator filter3 = new SillyOperator("filter3", plan);
plan.replace(filter2, filter3);
List<Operator> preds = plan.getPredecessors(filter3);
assertEquals(1, preds.size());
assertTrue(preds.contains(filter1));
List<Operator> succs = plan.getSuccessors(filter3);
assertNull(succs);
}
@Test
public void testReplace5() throws FrontendException {
// input is null
SillyPlan plan = new SillyPlan();
Operator load1 = new SillyOperator("load1", plan);
Operator filter1 = new SillyOperator("filter1", plan);
Operator filter2 = new SillyOperator("filter2", plan);
plan.add(load1);
plan.add(filter1);
plan.add(filter2);
plan.connect(load1, filter1);
plan.connect(filter1, filter2);
Operator load2 = new SillyOperator("load2", plan);
plan.replace(load1, load2);
List<Operator> preds = plan.getPredecessors(load2);
assertNull(preds);
List<Operator> succs = plan.getSuccessors(load2);
assertEquals(1, succs.size());
assertTrue(succs.contains(filter1));
}
@Test
public void testReplace6() throws FrontendException {
// has multiple inputs/outputs
SillyPlan plan = new SillyPlan();
Operator load1 = new SillyOperator("load1", plan);
Operator load2 = new SillyOperator("load2", plan);
Operator filter1 = new SillyOperator("filter1", plan);
// fake operator to take multiple inputs/outputs
Operator fake1 = new SillyOperator("fake1", plan);
Operator filter2 = new SillyOperator("filter2", plan);
Operator filter3 = new SillyOperator("filter3", plan);
plan.add(load1);
plan.add(load2);
plan.add(filter1);
plan.add(filter2);
plan.add(filter3);
plan.add(fake1);
plan.connect(load1, fake1);
plan.connect(load2, filter1);
plan.connect(filter1, fake1);
plan.connect(fake1, filter2);
plan.connect(fake1, filter3);
Operator fake2 = new SillyOperator("fake2", plan);
plan.replace(fake1, fake2);
List<Operator> preds = plan.getPredecessors(fake2);
assertEquals(2, preds.size());
assertTrue(preds.contains(load1));
assertTrue(preds.contains(filter1));
List<Operator> succs = plan.getSuccessors(fake2);
assertEquals(2, succs.size());
assertTrue(succs.contains(filter2));
assertTrue(succs.contains(filter3));
}
@Test
public void testRemove1() throws FrontendException {
// single input/output
SillyPlan plan = new SillyPlan();
Operator load1 = new SillyOperator("load1", plan);
Operator load2 = new SillyOperator("load2", plan);
Operator filter1 = new SillyOperator("filter1", plan);
Operator join1 = new SillyOperator("join1", plan);
Operator filter2 = new SillyOperator("filter2", plan);
plan.add(load1);
plan.add(load2);
plan.add(filter1);
plan.add(filter2);
plan.add(join1);
plan.connect(load1, join1);
plan.connect(load2, filter1);
plan.connect(filter1, join1);
plan.connect(join1, filter2);
plan.removeAndReconnect(filter1);
List<Operator> preds = plan.getPredecessors(join1);
assertEquals(2, preds.size());
assertTrue(preds.contains(load2));
}
@Test
public void testRemove2() throws FrontendException {
// input is null
SillyPlan plan = new SillyPlan();
Operator load1 = new SillyOperator("load1", plan);
Operator load2 = new SillyOperator("load2", plan);
Operator filter1 = new SillyOperator("filter1", plan);
Operator join1 = new SillyOperator("join1", plan);
Operator filter2 = new SillyOperator("filter2", plan);
plan.add(load1);
plan.add(load2);
plan.add(filter1);
plan.add(filter2);
plan.add(join1);
plan.connect(load1, join1);
plan.connect(load2, filter1);
plan.connect(filter1, join1);
plan.connect(join1, filter2);
plan.removeAndReconnect(load1);
List<Operator> preds = plan.getPredecessors(join1);
assertEquals(1, preds.size());
assertTrue(preds.contains(filter1));
plan.removeAndReconnect(filter1);
preds = plan.getPredecessors(join1);
assertEquals(1, preds.size());
assertTrue(preds.contains(load2));
}
@Test
public void testRemove3() throws FrontendException {
// output is null
SillyPlan plan = new SillyPlan();
Operator load1 = new SillyOperator("load1", plan);
Operator filter1 = new SillyOperator("filter1", plan);
Operator filter2 = new SillyOperator("filter2", plan);
plan.add(load1);
plan.add(filter1);
plan.add(filter2);
plan.connect(load1, filter1);
plan.connect(filter1, filter2);
plan.removeAndReconnect(filter2);
List<Operator> succs = plan.getSuccessors(filter2);
assertNull(succs);
}
@Test
public void testRemove4() throws FrontendException {
// has multiple inputs
SillyPlan plan = new SillyPlan();
Operator load1 = new SillyOperator("load1", plan);
Operator load2 = new SillyOperator("load2", plan);
Operator filter1 = new SillyOperator("filter1", plan);
Operator join1 = new SillyOperator("join1", plan);
Operator fake1 = new SillyOperator("fake1", plan);
plan.add(load1);
plan.add(load2);
plan.add(filter1);
plan.add(join1);
plan.connect(load1, join1);
plan.connect(load2, filter1);
plan.connect(filter1, join1);
plan.connect(join1, fake1);
plan.removeAndReconnect(join1);
List<Operator> preds = plan.getPredecessors(fake1);
assertEquals(2, preds.size());
assertTrue(preds.contains(load1));
assertTrue(preds.contains(filter1));
}
@Test
public void testRemove5() throws FrontendException {
// has multiple outputs
SillyPlan plan = new SillyPlan();
Operator load1 = new SillyOperator("load1", plan);
Operator split1 = new SillyOperator("split1", plan);
Operator split2 = new SillyOperator("split2", plan);
Operator filter1 = new SillyOperator("filter1", plan);
Operator filter2 = new SillyOperator("filter2", plan);
plan.add(load1);
plan.add(split1);
plan.add(filter1);
plan.add(filter2);
plan.connect(load1, split1);
plan.connect(split1, split2);
plan.connect(split2, filter1);
plan.connect(split2, filter2);
plan.removeAndReconnect(split2);
List<Operator> succs = plan.getSuccessors(split1);
assertEquals(2, succs.size());
assertTrue(succs.contains(filter1));
assertTrue(succs.contains(filter2));
}
@Test(expected = FrontendException.class)
public void testRemove6() throws FrontendException {
// has multiple inputs/outputs
SillyPlan plan = new SillyPlan();
Operator load1 = new SillyOperator("load1", plan);
Operator load2 = new SillyOperator("load2", plan);
Operator fake1 = new SillyOperator("fake1", plan);
Operator filter1 = new SillyOperator("filter1", plan);
Operator filter2 = new SillyOperator("filter2", plan);
plan.add(load1);
plan.add(load2);
plan.add(fake1);
plan.add(filter1);
plan.add(filter2);
plan.connect(load1, fake1);
plan.connect(load2, fake1);
plan.connect(fake1, filter1);
plan.connect(fake1, filter2);
try {
plan.removeAndReconnect(fake1);
} catch (FrontendException e) {
assertEquals(2256, e.getErrorCode());
throw e;
}
}
@Test
public void testInsertBetween1() throws FrontendException {
// single input
SillyPlan plan = new SillyPlan();
Operator load1 = new SillyOperator("load1", plan);
Operator filter1 = new SillyOperator("filter1", plan);
plan.add(load1);
plan.add(filter1);
plan.connect(load1, filter1);
Operator filter2 = new SillyOperator("filter2", plan);
plan.insertBetween(load1, filter2, filter1);
List<Operator> succs = plan.getSuccessors(filter2);
assertEquals(1, succs.size());
assertTrue(succs.contains(filter1));
List<Operator> preds = plan.getPredecessors(filter2);
assertEquals(1, preds.size());
assertTrue(preds.contains(load1));
}
}