blob: 8379b4e63b4265af684f8904faa17621d1b30b4e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.List;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.newplan.DepthFirstWalker;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.expression.AndExpression;
import org.apache.pig.newplan.logical.expression.ConstantExpression;
import org.apache.pig.newplan.logical.expression.EqualExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor;
import org.apache.pig.newplan.logical.optimizer.AllSameRalationalNodesVisitor;
import org.apache.pig.newplan.logical.optimizer.ProjectionPatcher;
import org.apache.pig.newplan.logical.optimizer.SchemaPatcher;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
import org.junit.Before;
import org.junit.Test;
/**
* Tests for PlanTransformListerns
*
*/
public class TestNewPlanListener {
private LogicalPlan lp;
private LogicalPlan changedPlan;
/* (non-Javadoc)
* @see junit.framework.TestCase#setUp()
*/
@Before
public void setUp() throws Exception {
// Build a plan that looks like it has just been transformed
// It is roughly the logical plan for
// A = load 'bla' as (x);
// B = load 'morebla' as (y);
// C = join A on x, B on y;
// D = filter C by y > 0;
// The plan is built with the filter having been pushed above the join
// but the listners not yet having been called.
// A = load
lp = new LogicalPlan();
LogicalSchema aschema = new LogicalSchema();
aschema.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
aschema.getField(0).uid = 1;
LOLoad A = new LOLoad(null, lp);
A.neverUseForRealSetSchema(aschema);
lp.add(A);
// B = load
LogicalSchema bschema = new LogicalSchema();
bschema.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
bschema.getField(0).uid = 2;
LOLoad B = new LOLoad(null, lp);
B.neverUseForRealSetSchema(bschema);
lp.add(B);
// C = join
LogicalSchema cschema = new LogicalSchema();
cschema.addField(new LogicalSchema.LogicalFieldSchema(
"x", null, DataType.INTEGER));
cschema.addField(new LogicalSchema.LogicalFieldSchema(
"y", null, DataType.INTEGER));
cschema.getField(0).uid = 1;
cschema.getField(1).uid = 2;
MultiMap<Integer, LogicalExpressionPlan> mm =
new MultiMap<Integer, LogicalExpressionPlan>();
LOJoin C = new LOJoin(lp, mm, JOINTYPE.HASH, new boolean[] {true, true});
LogicalExpressionPlan aprojplan = new LogicalExpressionPlan();
ProjectExpression x = new ProjectExpression(aprojplan, 0, 0, C);
x.neverUseForRealSetFieldSchema(new LogicalFieldSchema(null, null,
DataType.INTEGER, 1));
LogicalExpressionPlan bprojplan = new LogicalExpressionPlan();
ProjectExpression y = new ProjectExpression(bprojplan, 1, 0, C);
y.neverUseForRealSetFieldSchema(new LogicalFieldSchema(null, null,
DataType.INTEGER, 2));
mm.put(0, aprojplan);
mm.put(1, bprojplan);
C.neverUseForRealSetSchema(cschema);
// Don't add it to the plan quite yet
// D = filter
LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
LOFilter D = new LOFilter(lp, filterPlan);
ProjectExpression fy = new ProjectExpression(filterPlan, 0, 1, D);
fy.neverUseForRealSetFieldSchema(new LogicalFieldSchema(null, null,
DataType.INTEGER, 2));
ConstantExpression fc = new ConstantExpression(filterPlan, new Integer(0));
new EqualExpression(filterPlan, fy, fc);
D.neverUseForRealSetSchema(cschema);
// Connect D to B, since the transform has happened.
lp.add(D);
lp.connect(B, D);
// Now add in C, connected to A and D.
lp.add(C);
lp.connect(A, C);
lp.connect(D, C);
changedPlan = new LogicalPlan();
changedPlan.add(D);
changedPlan.add(C);
changedPlan.connect(D, C);
}
private static class SillySameVisitor extends AllSameRalationalNodesVisitor {
StringBuffer buf = new StringBuffer();
SillySameVisitor(OperatorPlan plan) throws FrontendException {
super(plan, new DepthFirstWalker(plan));
}
@Override
protected void execute(LogicalRelationalOperator op) throws FrontendException {
buf.append(op.getName());
buf.append(" ");
}
@Override
public String toString() {
return buf.toString();
}
}
// Test that the AllSameVisitor calls execute on every node
// in the plan.
@Test
public void testAllSameVisitor() throws FrontendException {
SillySameVisitor v = new SillySameVisitor(lp);
v.visit();
assertTrue("LOLoad LOJoin(HASH) LOLoad LOFilter ".equals(v.toString()) ||
"LOLoad LOFilter LOJoin(HASH) LOLoad ".equals(v.toString()));
}
private static class SillyExpressionVisitor extends LogicalExpressionVisitor {
StringBuffer buf;
protected SillyExpressionVisitor(OperatorPlan p, StringBuffer b) throws FrontendException {
super(p, new DepthFirstWalker(p));
buf = b;
}
@Override
public void visit(AndExpression andExpr) throws FrontendException {
buf.append("and ");
}
@Override
public void visit(EqualExpression equal) throws FrontendException {
buf.append("equal ");
}
@Override
public void visit(ProjectExpression p) throws FrontendException {
buf.append("proj ");
}
@Override
public void visit(ConstantExpression c) throws FrontendException {
buf.append("const ");
}
}
private static class SillyAllExpressionVisitor extends AllExpressionVisitor {
StringBuffer buf = new StringBuffer();
public SillyAllExpressionVisitor(OperatorPlan plan) throws FrontendException {
super(plan, new DepthFirstWalker(plan));
}
@Override
protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr) throws FrontendException {
return new SillyExpressionVisitor(expr, buf);
}
@Override
public String toString() {
return buf.toString();
}
}
// Test that the AllExpressionVisitor executes on every
// expression in the plan
@Test
public void testAllExpressionVisitor() throws FrontendException {
SillyAllExpressionVisitor v = new SillyAllExpressionVisitor(lp);
v.visit();
assertTrue("proj proj equal proj const ".equals(v.toString()) ||
"equal proj const proj proj ".equals(v.toString()));
}
// Test that schemas are patched up after a transform
@Test
public void testSchemaPatcher() throws FrontendException {
SchemaPatcher patcher = new SchemaPatcher();
patcher.transformed(lp, changedPlan);
// Check that the filter now has the proper schema.
List<Operator> roots = changedPlan.getSources();
assertEquals(1, roots.size());
LOFilter D = (LOFilter)roots.get(0);
assertNotNull(D);
LogicalSchema dschema = D.getSchema();
assertEquals(1, dschema.size());
LogicalSchema.LogicalFieldSchema y = dschema.getField(0);
assertEquals("y", y.alias);
assertEquals(2, y.uid);
}
// Test that projections are patched up after a transform
@Test
public void testProjectionPatcher() throws FrontendException {
ProjectionPatcher patcher = new ProjectionPatcher();
patcher.transformed(lp, changedPlan);
// Check that the projections in filter are now set properly
List<Operator> roots = changedPlan.getSources();
assertEquals(1, roots.size());
LOFilter D = (LOFilter)roots.get(0);
assertNotNull(D);
LogicalExpressionPlan filterPlan = D.getFilterPlan();
List<Operator> leaves = filterPlan.getSinks();
assertEquals(2, leaves.size());
ProjectExpression proj = null;
for (Operator leaf : leaves) {
if (leaf instanceof ProjectExpression) {
proj = (ProjectExpression)leaf;
break;
}
}
assertNotNull(proj);
assertEquals(0, proj.getInputNum());
assertEquals(0, proj.getColNum());
}
}