blob: fe19d13e5763e3fc3fe7d98d23fe19a974e3c2a6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.apache.pig.newplan.logical.relational.LOTestHelper.newLOLoad;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.HashSet;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.newplan.logical.expression.AndExpression;
import org.apache.pig.newplan.logical.expression.ConstantExpression;
import org.apache.pig.newplan.logical.expression.EqualExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
import org.junit.Before;
import org.junit.Test;
/**
 * Test end to end logical optimizations.
 *
 * <p>Each test hand-builds a {@link LogicalPlan}, runs {@link LogicalPlanOptimizer}
 * over it, and compares the result with a second hand-built plan using
 * {@link LogicalPlan#isEqual}.
 */
public class TestNewPlanLogicalOptimizer {

    /** Configuration passed to every load operator; rebuilt in {@link #setUp()}. */
    Configuration conf = null;

    /**
     * Connects a {@link PigContext} created with {@link ExecType#LOCAL} and derives
     * {@link #conf} from the configuration of its file system.
     *
     * @throws Exception if the context cannot be connected
     */
    @Before
    public void setUp() throws Exception {
        PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
        pc.connect();
        conf = new Configuration(
                ConfigurationUtil.toConfiguration(pc.getFs().getConfiguration()));
    }

    /**
     * Optimizes a load/load/join/filter plan and checks that the result equals a plan
     * in which the single-input conjuncts of the filter sit directly after the loads.
     *
     * @throws IOException if building, optimizing, or comparing the plans fails
     */
    @Test
    public void testFilterPushDown() throws IOException {
        LogicalPlan lp = buildFilterAfterJoinPlan();
        System.out.println(lp);

        // NOTE(review): 500 is presumably the optimizer's iteration cap and null means
        // "no rules turned off" -- confirm against LogicalPlanOptimizer.
        LogicalPlanOptimizer optimizer = new LogicalPlanOptimizer(lp, 500, null);
        optimizer.optimize();

        // The expected plan is deliberately built after optimization, as in the
        // original test, so operator creation order is unchanged.
        LogicalPlan expected = buildExpectedPushedDownPlan();

        // Reset the schemas of both plans before comparing them.
        SchemaResetter schemaResetter = new SchemaResetter(lp);
        schemaResetter.visit();
        schemaResetter = new SchemaResetter(expected);
        schemaResetter.visit();

        assertTrue("optimized plan does not match the expected filter push-down plan",
                lp.isEqual(expected));
    }

    /**
     * Builds the unoptimized plan for:
     * <pre>
     * A = load 'bla' as (x, y);
     * B = load 'morebla' as (a, b);
     * C = join A on x, B on a;
     * D = filter C by x = a and x = 0 and b = 1 and y = b;
     * </pre>
     *
     * @return the plan, with the filter D as the successor of the join C
     * @throws IOException if an operator cannot be created
     */
    private LogicalPlan buildFilterAfterJoinPlan() throws IOException {
        LogicalPlan lp = new LogicalPlan();

        // A = load
        LogicalSchema aschema = bytearraySchema("x", "y");
        LOLoad A = newLOLoad(new FileSpec("bla", new FuncSpec("PigStorage", "\t")), aschema, lp, conf);
        A.setAlias("A");
        lp.add(A);

        // B = load
        LogicalSchema bschema = bytearraySchema("a", "b");
        LOLoad B = newLOLoad(new FileSpec("morebla", new FuncSpec("PigStorage", "\t")), bschema, lp, conf);
        B.setAlias("B");
        lp.add(B);

        // C = join: input 0 is keyed on its column 0 (x), input 1 on its column 0 (a).
        MultiMap<Integer, LogicalExpressionPlan> mm =
                new MultiMap<Integer, LogicalExpressionPlan>();
        LogicalExpressionPlan aprojplan = new LogicalExpressionPlan();
        LOJoin C = new LOJoin(lp, mm, JOINTYPE.HASH, new boolean[] {true, true});
        new ProjectExpression(aprojplan, 0, 0, C);
        LogicalExpressionPlan bprojplan = new LogicalExpressionPlan();
        new ProjectExpression(bprojplan, 1, 0, C);
        mm.put(0, aprojplan);
        mm.put(1, bprojplan);
        C.setAlias("C");
        lp.add(C);
        lp.connect(A, C);
        lp.connect(B, C);

        // D = filter: (((x == 0) and (x == a)) and (b == 1)) and (y == b),
        // using columns 0..3 of the join output for x, y, a, b.
        LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
        LOFilter D = new LOFilter(lp, filterPlan);
        ProjectExpression fx = new ProjectExpression(filterPlan, 0, 0, D);
        ConstantExpression fc0 = new ConstantExpression(filterPlan, Integer.valueOf(0));
        EqualExpression eq1 = new EqualExpression(filterPlan, fx, fc0);
        ProjectExpression fanotherx = new ProjectExpression(filterPlan, 0, 0, D);
        ProjectExpression fa = new ProjectExpression(filterPlan, 0, 2, D);
        EqualExpression eq2 = new EqualExpression(filterPlan, fanotherx, fa);
        AndExpression and1 = new AndExpression(filterPlan, eq1, eq2);
        ProjectExpression fb = new ProjectExpression(filterPlan, 0, 3, D);
        ConstantExpression fc1 = new ConstantExpression(filterPlan, Integer.valueOf(1));
        EqualExpression eq3 = new EqualExpression(filterPlan, fb, fc1);
        AndExpression and2 = new AndExpression(filterPlan, and1, eq3);
        ProjectExpression fanotherb = new ProjectExpression(filterPlan, 0, 3, D);
        ProjectExpression fy = new ProjectExpression(filterPlan, 0, 1, D);
        EqualExpression eq4 = new EqualExpression(filterPlan, fy, fanotherb);
        new AndExpression(filterPlan, and2, eq4);
        D.setAlias("D");

        // Before optimization the filter D sits directly after the join C.
        lp.add(D);
        lp.connect(C, D);

        return lp;
    }

    /**
     * Builds the plan the optimizer is expected to produce: {@code x == 0} is filtered
     * right after load A, {@code b == 1} right after load B, each filter is followed by
     * a foreach carrying the original alias, and only {@code x == a and y == b} remains
     * in the filter after the join.
     *
     * @return the expected plan
     * @throws IOException if an operator cannot be created
     */
    private LogicalPlan buildExpectedPushedDownPlan() throws IOException {
        LogicalPlan expected = new LogicalPlan();

        // A = load
        LogicalSchema aschema = bytearraySchema("x", "y");
        LOLoad A = newLOLoad(new FileSpec("bla", new FuncSpec("PigStorage", "\t")), aschema, expected, conf);
        expected.add(A);

        // DA = filter A by x == 0
        LogicalExpressionPlan DAfilterPlan = new LogicalExpressionPlan();
        LOFilter DA = new LOFilter(expected, DAfilterPlan);
        ProjectExpression fx = new ProjectExpression(DAfilterPlan, 0, 0, DA);
        fx.neverUseForRealSetFieldSchema(anonymousBytearrayField());
        ConstantExpression fc0 = new ConstantExpression(DAfilterPlan, Integer.valueOf(0));
        new EqualExpression(DAfilterPlan, fx, fc0);
        DA.neverUseForRealSetSchema(aschema);
        expected.add(DA);
        expected.connect(A, DA);

        // A = foreach placed after DA; it carries the alias "A".
        LOForEach foreachA = org.apache.pig.newplan.logical.Util.addForEachAfter(
                expected, DA, 0, new HashSet<Integer>());
        foreachA.setAlias("A");
        foreachA.neverUseForRealSetSchema(aschema);

        // B = load
        LogicalSchema bschema = bytearraySchema("a", "b");
        LOLoad B = newLOLoad(new FileSpec("morebla", new FuncSpec("PigStorage", "\t")), bschema, expected, conf);
        expected.add(B);

        // DB = filter B by b == 1
        LogicalExpressionPlan DBfilterPlan = new LogicalExpressionPlan();
        LOFilter DB = new LOFilter(expected, DBfilterPlan);
        ProjectExpression fb = new ProjectExpression(DBfilterPlan, 0, 1, DB);
        fb.neverUseForRealSetFieldSchema(anonymousBytearrayField());
        ConstantExpression fc1 = new ConstantExpression(DBfilterPlan, Integer.valueOf(1));
        new EqualExpression(DBfilterPlan, fb, fc1);
        DB.neverUseForRealSetSchema(bschema);
        expected.add(DB);
        expected.connect(B, DB);

        // B = foreach placed after DB; it carries the alias "B".
        LOForEach foreachB = org.apache.pig.newplan.logical.Util.addForEachAfter(
                expected, DB, 0, new HashSet<Integer>());
        foreachB.setAlias("B");
        foreachB.neverUseForRealSetSchema(bschema);

        // C = join; its schema is set explicitly, with uids 1..4 assigned by hand.
        LogicalSchema cschema = bytearraySchema("D::x", "D::y", "null::a", "null::b");
        cschema.getField(0).uid = 1;
        cschema.getField(1).uid = 2;
        cschema.getField(2).uid = 3;
        cschema.getField(3).uid = 4;
        LogicalExpressionPlan aprojplan = new LogicalExpressionPlan();
        MultiMap<Integer, LogicalExpressionPlan> mm =
                new MultiMap<Integer, LogicalExpressionPlan>();
        LOJoin C = new LOJoin(expected, mm, JOINTYPE.HASH, new boolean[] {true, true});
        ProjectExpression x = new ProjectExpression(aprojplan, 0, 0, C);
        x.neverUseForRealSetFieldSchema(anonymousBytearrayField());
        LogicalExpressionPlan bprojplan = new LogicalExpressionPlan();
        ProjectExpression y = new ProjectExpression(bprojplan, 1, 0, C);
        y.neverUseForRealSetFieldSchema(anonymousBytearrayField());
        mm.put(0, aprojplan);
        mm.put(1, bprojplan);
        C.neverUseForRealSetSchema(cschema);
        expected.add(C);
        expected.connect(foreachA, C);
        expected.connect(foreachB, C);

        // D = filter C by (x == a) and (y == b): the conjuncts that reference both inputs.
        LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
        LOFilter D = new LOFilter(expected, filterPlan);
        ProjectExpression fanotherx = new ProjectExpression(filterPlan, 0, 0, D);
        fanotherx.neverUseForRealSetFieldSchema(anonymousBytearrayField());
        ProjectExpression fa = new ProjectExpression(filterPlan, 0, 2, D);
        fa.neverUseForRealSetFieldSchema(anonymousBytearrayField());
        EqualExpression eq2 = new EqualExpression(filterPlan, fanotherx, fa);
        ProjectExpression fanotherb = new ProjectExpression(filterPlan, 0, 3, D);
        fanotherb.neverUseForRealSetFieldSchema(anonymousBytearrayField());
        ProjectExpression fy = new ProjectExpression(filterPlan, 0, 1, D);
        fy.neverUseForRealSetFieldSchema(anonymousBytearrayField());
        EqualExpression eq4 = new EqualExpression(filterPlan, fy, fanotherb);
        new AndExpression(filterPlan, eq2, eq4);
        D.neverUseForRealSetSchema(cschema);
        expected.add(D);
        expected.connect(C, D);

        return expected;
    }

    /**
     * Creates a schema with one {@link DataType#BYTEARRAY} field per alias, in the
     * order given.
     *
     * @param aliases field aliases, in schema order
     * @return the new schema
     */
    private static LogicalSchema bytearraySchema(String... aliases) {
        LogicalSchema schema = new LogicalSchema();
        for (String alias : aliases) {
            schema.addField(new LogicalFieldSchema(alias, null, DataType.BYTEARRAY));
        }
        return schema;
    }

    /**
     * Creates a fresh {@link DataType#BYTEARRAY} field schema with no alias and no
     * nested schema, as assigned to the project expressions of the expected plan.
     *
     * @return a new field schema instance
     */
    private static LogicalFieldSchema anonymousBytearrayField() {
        return new LogicalFieldSchema(null, null, DataType.BYTEARRAY);
    }
}