blob: 5c2b76baf4f7f3ff6385ceb45546d7b82ed4e7e3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.CollectableLoadFunc;
import org.apache.pig.ComparisonFunc;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.fetch.FetchOptimizer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LimitAdjuster;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.builtin.AVG;
import org.apache.pig.builtin.COUNT;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.builtin.SUM;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.builtin.GFCross;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.test.junit.OrderedJUnit4Runner;
import org.apache.pig.test.junit.OrderedJUnit4Runner.TestOrder;
import org.apache.pig.test.utils.GenPhyOp;
import org.apache.pig.test.utils.TestHelper;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
/**
* Test cases to test the MRCompiler.
* VERY IMPORTANT NOTE: The tests here compare results with a
* "golden" set of outputs. In each testcase here, the operators
* generated have a random operator key which uses Java's Random
* class. So if there is a code change which changes the number of
* operators created in a plan, then not only will the "golden" file
* for that test case need to be changed, but also for the tests
* that follow it since the operator keys that will be generated through
* Random will be different.
*/
@RunWith(OrderedJUnit4Runner.class)
@TestOrder({
"testRun1",
"testRun2",
"testSpl1",
"testSpl2",
"testSpl3",
"testSim1",
"testSim2",
"testSim3",
"testSim4",
"testSim5",
"testSim6",
"testSim7",
"testSim8",
"testSim9",
"testSortUDF1",
"testDistinct1",
"testLimit",
"testMRCompilerErr",
"testMRCompilerErr1",
"testNumReducersInLimit",
"testNumReducersInLimitWithParallel",
"testUDFInJoin",
"testMergeJoin",
"testMergeJoinWithIndexableLoadFunc",
"testCastFuncShipped",
"testLimitAdjusterFuncShipped",
"testSortedDistinctInForeach",
"testUDFInMergedCoGroup",
"testUDFInMergedJoin",
"testSchemaInStoreForDistinctLimit",
"testStorerLimit",
"testFetchOptimizerSideEffect"})
public class TestMRCompiler {
static MiniGenericCluster cluster;
static PigContext pc;
static PigContext pcMR;
static final int MAX_SIZE = 100000;
static final long SEED = 1013;
static final Random r = new Random(SEED);
PigServer pigServer = null;
PigServer pigServerMR = null;
// if for some reason, the golden files need
// to be regenerated, set this to true - THIS
// WILL OVERWRITE THE GOLDEN FILES - So use this
// with caution and only for the testcases you need
// and are sure of
private boolean generate = false;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
cluster = MiniGenericCluster.buildCluster();
pc = new PigContext(ExecType.LOCAL, new Properties());
pcMR = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
pc.connect();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
cluster.shutDown();
}
@Before
public void setUp() throws ExecException {
GenPhyOp.setR(r);
GenPhyOp.setPc(pc);
// Set random seed to generate deterministic temporary paths
FileLocalizer.setR(new Random(1331L));
NodeIdGenerator.reset("");
pigServer = new PigServer(pc);
pigServerMR = new PigServer(pcMR);
}
@After
public void shutdown() {
pigServer.shutdown();
}
@Test
public void testRun1() throws Exception {
PhysicalPlan php = new PhysicalPlan();
PhysicalPlan part1 = new PhysicalPlan();
POLoad lC = GenPhyOp.topLoadOp();
POFilter fC = GenPhyOp.topFilterOp();
fC.setRequestedParallelism(20);
POLocalRearrange lrC = GenPhyOp.topLocalRearrangeOp();
lrC.setRequestedParallelism(10);
POGlobalRearrange grC = GenPhyOp.topGlobalRearrangeOp();
POPackage pkC = GenPhyOp.topPackageOp();
part1.add(lC);
part1.add(fC);
part1.connect(lC, fC);
part1.add(lrC);
part1.connect(fC, lrC);
part1.add(grC);
part1.connect(lrC, grC);
part1.add(pkC);
part1.connect(grC, pkC);
POPackage pkD = GenPhyOp.topPackageOp();
pkD.setRequestedParallelism(20);
POLocalRearrange lrD = GenPhyOp.topLocalRearrangeOp();
lrD.setRequestedParallelism(30);
POGlobalRearrange grD = GenPhyOp.topGlobalRearrangeOp();
POLoad lD = GenPhyOp.topLoadOp();
part1.add(lD);
part1.add(lrD);
part1.connect(lD, lrD);
part1.add(grD);
part1.connect(lrD, grD);
part1.add(pkD);
part1.connect(grD, pkD);
POPackage pkCD = GenPhyOp.topPackageOp();
POLocalRearrange lrCD1 = GenPhyOp.topLocalRearrangeOp();
POLocalRearrange lrCD2 = GenPhyOp.topLocalRearrangeOp();
POGlobalRearrange grCD = GenPhyOp.topGlobalRearrangeOp();
part1.add(lrCD1);
part1.add(lrCD2);
part1.connect(pkC, lrCD1);
part1.connect(pkD, lrCD2);
part1.add(grCD);
part1.connect(lrCD1, grCD);
part1.connect(lrCD2, grCD);
part1.add(pkCD);
part1.connect(grCD, pkCD);
POLoad lA = GenPhyOp.topLoadOp();
POLoad lB = GenPhyOp.topLoadOp();
// POLoad lC = lA;
POFilter fA = GenPhyOp.topFilterOp();
POLocalRearrange lrA = GenPhyOp.topLocalRearrangeOp();
POLocalRearrange lrB = GenPhyOp.topLocalRearrangeOp();
POGlobalRearrange grAB = GenPhyOp.topGlobalRearrangeOp();
POPackage pkAB = GenPhyOp.topPackageOp();
POFilter fAB = GenPhyOp.topFilterOp();
POUnion unABC = GenPhyOp.topUnionOp();
php.add(lA);
php.add(lB);
php.add(fA);
php.connect(lA, fA);
php.add(lrA);
php.add(lrB);
php.connect(fA, lrA);
php.connect(lB, lrB);
php.add(grAB);
php.connect(lrA, grAB);
php.connect(lrB, grAB);
php.add(pkAB);
php.connect(grAB, pkAB);
php.add(fAB);
php.connect(pkAB, fAB);
php.merge(part1);
List<PhysicalOperator> leaves = new ArrayList<PhysicalOperator>();
for (PhysicalOperator phyOp : php.getLeaves()) {
leaves.add(phyOp);
}
php.add(unABC);
for (PhysicalOperator physicalOperator : leaves) {
php.connect(physicalOperator, unABC);
}
POStore st = GenPhyOp.topStoreOp();
php.add(st);
php.connect(unABC, st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC10.gld");
}
@Test
public void testRun2() throws Exception {
PhysicalPlan php = new PhysicalPlan();
PhysicalPlan part1 = new PhysicalPlan();
POLoad lC = GenPhyOp.topLoadOp();
POFilter fC = GenPhyOp.topFilterOp();
POLocalRearrange lrC = GenPhyOp.topLocalRearrangeOp();
POGlobalRearrange grC = GenPhyOp.topGlobalRearrangeOp();
POPackage pkC = GenPhyOp.topPackageOp();
part1.add(lC);
part1.add(fC);
part1.connect(lC, fC);
part1.add(lrC);
part1.connect(fC, lrC);
part1.add(grC);
part1.connect(lrC, grC);
part1.add(pkC);
part1.connect(grC, pkC);
POPackage pkD = GenPhyOp.topPackageOp();
POLocalRearrange lrD = GenPhyOp.topLocalRearrangeOp();
POGlobalRearrange grD = GenPhyOp.topGlobalRearrangeOp();
POLoad lD = GenPhyOp.topLoadOp();
part1.add(lD);
part1.add(lrD);
part1.connect(lD, lrD);
part1.add(grD);
part1.connect(lrD, grD);
part1.add(pkD);
part1.connect(grD, pkD);
part1.connect(pkD, grC);
POLoad lA = GenPhyOp.topLoadOp();
POLoad lB = GenPhyOp.topLoadOp();
// POLoad lC = lA;
POFilter fA = GenPhyOp.topFilterOp();
POLocalRearrange lrA = GenPhyOp.topLocalRearrangeOp();
POLocalRearrange lrB = GenPhyOp.topLocalRearrangeOp();
POGlobalRearrange grAB = GenPhyOp.topGlobalRearrangeOp();
POPackage pkAB = GenPhyOp.topPackageOp();
POFilter fAB = GenPhyOp.topFilterOp();
POUnion unABC = GenPhyOp.topUnionOp();
php.add(lA);
php.add(lB);
php.add(fA);
php.connect(lA, fA);
php.add(lrA);
php.add(lrB);
php.connect(fA, lrA);
php.connect(lB, lrB);
php.add(grAB);
php.connect(lrA, grAB);
php.connect(lrB, grAB);
php.add(pkAB);
php.connect(grAB, pkAB);
php.add(fAB);
php.connect(pkAB, fAB);
php.merge(part1);
List<PhysicalOperator> leaves = new ArrayList<PhysicalOperator>();
for (PhysicalOperator phyOp : php.getLeaves()) {
leaves.add(phyOp);
}
php.add(unABC);
for (PhysicalOperator physicalOperator : leaves) {
php.connect(physicalOperator, unABC);
}
POStore st = GenPhyOp.topStoreOp();
php.add(st);
php.connect(unABC, st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC11.gld");
}
@Test
public void testSpl1() throws Exception {
PhysicalPlan php = new PhysicalPlan();
POLoad lA = GenPhyOp.topLoadOp();
POSplit spl = GenPhyOp.topSplitOp();
php.add(lA);
php.add(spl);
php.connect(lA, spl);
POFilter fl1 = GenPhyOp.topFilterOp();
POFilter fl2 = GenPhyOp.topFilterOp();
php.add(fl1);
php.add(fl2);
php.connect(spl, fl1);
php.connect(spl, fl2);
POLocalRearrange lr1 = GenPhyOp.topLocalRearrangeOp();
POLocalRearrange lr2 = GenPhyOp.topLocalRearrangeOp();
php.add(lr1);
php.add(lr2);
php.connect(fl1, lr1);
php.connect(fl2, lr2);
POGlobalRearrange gr = GenPhyOp.topGlobalRearrangeOp();
php.add(gr);
php.connect(lr1, gr);
php.connect(lr2, gr);
POPackage pk = GenPhyOp.topPackageOp();
php.add(pk);
php.connect(gr, pk);
POStore st = GenPhyOp.topStoreOp();
php.add(st);
php.connect(pk, st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC12.gld");
}
@Test
public void testSpl2() throws Exception {
PhysicalPlan php = new PhysicalPlan();
POLoad lA = GenPhyOp.topLoadOp();
POSplit spl = GenPhyOp.topSplitOp();
php.add(lA);
php.add(spl);
php.connect(lA, spl);
POFilter fl1 = GenPhyOp.topFilterOp();
POFilter fl2 = GenPhyOp.topFilterOp();
php.add(fl1);
php.add(fl2);
php.connect(spl, fl1);
php.connect(spl, fl2);
POLocalRearrange lr1 = GenPhyOp.topLocalRearrangeOp();
POLocalRearrange lr2 = GenPhyOp.topLocalRearrangeOp();
php.add(lr1);
php.add(lr2);
php.connect(fl1, lr1);
php.connect(fl2, lr2);
POGlobalRearrange gr = GenPhyOp.topGlobalRearrangeOp();
php.addAsLeaf(gr);
POPackage pk = GenPhyOp.topPackageOp();
php.addAsLeaf(pk);
POSplit sp2 = GenPhyOp.topSplitOp();
php.addAsLeaf(sp2);
POFilter fl3 = GenPhyOp.topFilterOp();
POFilter fl4 = GenPhyOp.topFilterOp();
php.add(fl3);
php.add(fl4);
php.connect(sp2, fl3);
php.connect(sp2, fl4);
POUnion un = GenPhyOp.topUnionOp();
php.addAsLeaf(un);
POStore st = GenPhyOp.topStoreOp();
php.addAsLeaf(st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC13.gld");
}
@Test
public void testSpl3() throws Exception {
PhysicalPlan php = new PhysicalPlan();
POLoad lA = GenPhyOp.topLoadOp();
POSplit spl = GenPhyOp.topSplitOp();
php.add(lA);
php.add(spl);
php.connect(lA, spl);
POFilter fl1 = GenPhyOp.topFilterOp();
fl1.setRequestedParallelism(10);
POFilter fl2 = GenPhyOp.topFilterOp();
fl2.setRequestedParallelism(20);
php.add(fl1);
php.add(fl2);
php.connect(spl, fl1);
php.connect(spl, fl2);
POSplit sp11 = GenPhyOp.topSplitOp();
POSplit sp21 = GenPhyOp.topSplitOp();
php.add(sp11);
php.add(sp21);
php.connect(fl1, sp11);
php.connect(fl2, sp21);
POFilter fl11 = GenPhyOp.topFilterOp();
fl11.setRequestedParallelism(10);
POFilter fl21 = GenPhyOp.topFilterOp();
fl21.setRequestedParallelism(20);
POFilter fl22 = GenPhyOp.topFilterOp();
fl22.setRequestedParallelism(30);
php.add(fl11);
php.add(fl21);
php.add(fl22);
php.connect(sp11, fl11);
php.connect(sp21, fl21);
php.connect(sp21, fl22);
POLocalRearrange lr1 = GenPhyOp.topLocalRearrangeOp();
lr1.setRequestedParallelism(40);
POLocalRearrange lr21 = GenPhyOp.topLocalRearrangeOp();
lr21.setRequestedParallelism(15);
POLocalRearrange lr22 = GenPhyOp.topLocalRearrangeOp();
lr22.setRequestedParallelism(35);
php.add(lr1);
php.add(lr21);
php.add(lr22);
php.connect(fl11, lr1);
php.connect(fl21, lr21);
php.connect(fl22, lr22);
POGlobalRearrange gr = GenPhyOp.topGlobalRearrangeOp();
php.addAsLeaf(gr);
POPackage pk = GenPhyOp.topPackageOp();
pk.setRequestedParallelism(25);
php.addAsLeaf(pk);
POSplit sp2 = GenPhyOp.topSplitOp();
php.addAsLeaf(sp2);
POFilter fl3 = GenPhyOp.topFilterOp();
fl3.setRequestedParallelism(100);
POFilter fl4 = GenPhyOp.topFilterOp();
fl4.setRequestedParallelism(80);
php.add(fl3);
php.add(fl4);
php.connect(sp2, fl3);
php.connect(sp2, fl4);
POUnion un = GenPhyOp.topUnionOp();
php.addAsLeaf(un);
POStore st = GenPhyOp.topStoreOp();
php.addAsLeaf(st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC14.gld");
}
// Tests Single input case for both blocking and non-blocking
// with both map and reduce phases
@Test
public void testSim1() throws Exception {
PhysicalPlan php = new PhysicalPlan();
POLoad ld = GenPhyOp.topLoadOp();
php.add(ld);
PhysicalPlan grpChain1 = GenPhyOp.grpChain();
php.merge(grpChain1);
php.connect(ld, grpChain1.getRoots().get(0));
PhysicalOperator leaf = php.getLeaves().get(0);
PhysicalPlan grpChain2 = GenPhyOp.grpChain();
php.merge(grpChain2);
php.connect(leaf, grpChain2.getRoots().get(0));
leaf = php.getLeaves().get(0);
POFilter fl = GenPhyOp.topFilterOp();
php.add(fl);
php.connect(leaf, fl);
POStore st = GenPhyOp.topStoreOp();
php.add(st);
php.connect(fl, st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC1.gld");
}
@Test
public void testSim2() throws Exception {
PhysicalPlan php = new PhysicalPlan();
PhysicalPlan ldGrpChain1 = GenPhyOp.loadedGrpChain();
PhysicalPlan ldGrpChain2 = GenPhyOp.loadedGrpChain();
php.merge(ldGrpChain1);
php.merge(ldGrpChain2);
POUnion un = GenPhyOp.topUnionOp();
php.addAsLeaf(un);
POStore st = GenPhyOp.topStoreOp();
php.add(st);
php.connect(un, st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC2.gld");
}
@Test
public void testSim3() throws Exception {
PhysicalPlan php = new PhysicalPlan();
PhysicalPlan ldGrpChain1 = GenPhyOp.loadedGrpChain();
PhysicalPlan ldGrpChain2 = GenPhyOp.loadedGrpChain();
php.merge(ldGrpChain1);
php.merge(ldGrpChain2);
POUnion un = GenPhyOp.topUnionOp();
php.addAsLeaf(un);
PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
php.merge(ldFil1);
php.connect(ldFil1.getLeaves().get(0), un);
POStore st = GenPhyOp.topStoreOp();
php.add(st);
php.connect(un, st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC3.gld");
}
@Test
public void testSim4() throws Exception {
PhysicalPlan php = new PhysicalPlan();
PhysicalPlan ldGrpChain1 = GenPhyOp.loadedGrpChain();
PhysicalPlan ldGrpChain2 = GenPhyOp.loadedGrpChain();
php.merge(ldGrpChain1);
php.merge(ldGrpChain2);
POUnion un = GenPhyOp.topUnionOp();
php.addAsLeaf(un);
PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
PhysicalPlan ldFil2 = GenPhyOp.loadedFilter();
php.merge(ldFil1);
php.connect(ldFil1.getLeaves().get(0), un);
php.merge(ldFil2);
php.connect(ldFil2.getLeaves().get(0), un);
POStore st = GenPhyOp.topStoreOp();
php.add(st);
php.connect(un, st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC4.gld");
}
@Test
public void testSim5() throws Exception {
PhysicalPlan php = new PhysicalPlan();
PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
PhysicalPlan ldFil2 = GenPhyOp.loadedFilter();
php.merge(ldFil1);
php.merge(ldFil2);
POUnion un = GenPhyOp.topUnionOp();
php.addAsLeaf(un);
POStore st = GenPhyOp.topStoreOp();
php.add(st);
php.connect(un, st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC5.gld");
}
@Test
public void testSim6() throws Exception {
PhysicalPlan php = new PhysicalPlan();
PhysicalPlan ldGrpChain1 = GenPhyOp.loadedGrpChain();
PhysicalPlan ldGrpChain2 = GenPhyOp.loadedGrpChain();
POLocalRearrange lr1 = GenPhyOp.topLocalRearrangeOp();
POLocalRearrange lr2 = GenPhyOp.topLocalRearrangeOp();
ldGrpChain1.addAsLeaf(lr1);
ldGrpChain2.addAsLeaf(lr2);
php.merge(ldGrpChain1);
php.merge(ldGrpChain2);
POGlobalRearrange gr = GenPhyOp.topGlobalRearrangeOp();
php.addAsLeaf(gr);
POPackage pk = GenPhyOp.topPackageOp();
php.addAsLeaf(pk);
POStore st = GenPhyOp.topStoreOp();
php.addAsLeaf(st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC6.gld");
}
@Test
public void testSim7() throws Exception {
PhysicalPlan php = new PhysicalPlan();
PhysicalPlan ldGrpChain1 = GenPhyOp.loadedGrpChain();
PhysicalPlan ldGrpChain2 = GenPhyOp.loadedGrpChain();
POLocalRearrange lr1 = GenPhyOp.topLocalRearrangeOp();
POLocalRearrange lr2 = GenPhyOp.topLocalRearrangeOp();
ldGrpChain1.addAsLeaf(lr1);
ldGrpChain2.addAsLeaf(lr2);
php.merge(ldGrpChain1);
php.merge(ldGrpChain2);
POGlobalRearrange gr = GenPhyOp.topGlobalRearrangeOp();
php.addAsLeaf(gr);
PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
php.merge(ldFil1);
php.connect(ldFil1.getLeaves().get(0), gr);
POPackage pk = GenPhyOp.topPackageOp();
php.addAsLeaf(pk);
POStore st = GenPhyOp.topStoreOp();
php.addAsLeaf(st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC7.gld");
}
@Test
public void testSim8() throws Exception {
PhysicalPlan php = new PhysicalPlan();
PhysicalPlan ldGrpChain1 = GenPhyOp.loadedGrpChain();
PhysicalPlan ldGrpChain2 = GenPhyOp.loadedGrpChain();
POLocalRearrange lr1 = GenPhyOp.topLocalRearrangeOp();
POLocalRearrange lr2 = GenPhyOp.topLocalRearrangeOp();
ldGrpChain1.addAsLeaf(lr1);
ldGrpChain2.addAsLeaf(lr2);
php.merge(ldGrpChain1);
php.merge(ldGrpChain2);
POGlobalRearrange gr = GenPhyOp.topGlobalRearrangeOp();
php.addAsLeaf(gr);
PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
PhysicalPlan ldFil2 = GenPhyOp.loadedFilter();
php.merge(ldFil1);
php.connect(ldFil1.getLeaves().get(0), gr);
php.merge(ldFil2);
php.connect(ldFil2.getLeaves().get(0), gr);
POPackage pk = GenPhyOp.topPackageOp();
php.addAsLeaf(pk);
POStore st = GenPhyOp.topStoreOp();
php.addAsLeaf(st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC8.gld");
}
@Test
public void testSim9() throws Exception {
PhysicalPlan php = new PhysicalPlan();
POGlobalRearrange gr = GenPhyOp.topGlobalRearrangeOp();
php.addAsLeaf(gr);
PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
PhysicalPlan ldFil2 = GenPhyOp.loadedFilter();
php.merge(ldFil1);
php.connect(ldFil1.getLeaves().get(0), gr);
php.merge(ldFil2);
php.connect(ldFil2.getLeaves().get(0), gr);
POPackage pk = GenPhyOp.topPackageOp();
php.addAsLeaf(pk);
POStore st = GenPhyOp.topStoreOp();
php.addAsLeaf(st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC9.gld");
}
@Test
public void testSortUDF1() throws Exception {
PhysicalPlan php = new PhysicalPlan();
PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
php.merge(ldFil1);
// set up order by *
String funcName = WeirdComparator.class.getName();
POUserComparisonFunc comparator = new POUserComparisonFunc(
new OperatorKey("", r.nextLong()), -1, null, new FuncSpec(funcName));
POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, ldFil1.getLeaves(),
null, new ArrayList<Boolean>(), comparator);
sort.setRequestedParallelism(20);
PhysicalPlan nesSortPlan = new PhysicalPlan();
POProject topPrj = new POProject(new OperatorKey("", r.nextLong()));
topPrj.setColumn(1);
topPrj.setOverloaded(true);
topPrj.setResultType(DataType.TUPLE);
nesSortPlan.add(topPrj);
POProject prjStar2 = new POProject(new OperatorKey("", r.nextLong()));
prjStar2.setResultType(DataType.TUPLE);
prjStar2.setStar(true);
nesSortPlan.add(prjStar2);
nesSortPlan.connect(topPrj, prjStar2);
List<PhysicalPlan> nesSortPlanLst = new ArrayList<PhysicalPlan>();
nesSortPlanLst.add(nesSortPlan);
sort.setSortPlans(nesSortPlanLst);
php.add(sort);
php.connect(ldFil1.getLeaves().get(0), sort);
// have a foreach which takes the sort output
// and send it two two udfs
List<String> udfs = new ArrayList<String>();
udfs.add(COUNT.class.getName());
udfs.add(SUM.class.getName());
POForEach fe3 = GenPhyOp.topForEachOPWithUDF(udfs);
php.add(fe3);
php.connect(sort, fe3);
// add a group above the foreach
PhysicalPlan grpChain1 = GenPhyOp.grpChain();
php.merge(grpChain1);
php.connect(fe3,grpChain1.getRoots().get(0));
udfs.clear();
udfs.add(AVG.class.getName());
POForEach fe4 = GenPhyOp.topForEachOPWithUDF(udfs);
php.addAsLeaf(fe4);
PhysicalPlan grpChain2 = GenPhyOp.grpChain();
php.merge(grpChain2);
php.connect(fe4,grpChain2.getRoots().get(0));
udfs.clear();
udfs.add(GFCross.class.getName() + "('1')");
POForEach fe5 = GenPhyOp.topForEachOPWithUDF(udfs);
php.addAsLeaf(fe5);
POStore st = GenPhyOp.topStoreOp();
php.addAsLeaf(st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC15.gld");
}
@Test
public void testDistinct1() throws Exception {
PhysicalPlan php = new PhysicalPlan();
PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
php.merge(ldFil1);
PODistinct op = new PODistinct(new OperatorKey("", r.nextLong()),
-1, null);
php.addAsLeaf(op);
PhysicalPlan grpChain1 = GenPhyOp.grpChain();
php.merge(grpChain1);
php.connect(op,grpChain1.getRoots().get(0));
PODistinct op1 = new PODistinct(new OperatorKey("", r.nextLong()),
-1, null);
php.addAsLeaf(op1);
POStore st = GenPhyOp.topStoreOp();
php.addAsLeaf(st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC16.gld");
}
@Test
public void testLimit() throws Exception {
PhysicalPlan php = new PhysicalPlan();
POLoad lC = GenPhyOp.topLoadOp();
php.add(lC);
POLimit op = new POLimit(new OperatorKey("", r.nextLong()),
-1, null);
php.add(op);
php.connect(lC, op);
POStore st = GenPhyOp.topStoreOp();
php.addAsLeaf(st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC17.gld");
}
@Test(expected = MRCompilerException.class)
public void testMRCompilerErr() throws Exception {
String query = "a = load 'input';" +
"b = filter a by $0 > 5;" +
"store b into 'output';";
PhysicalPlan pp = Util.buildPp(pigServer, query);
pp.remove(pp.getRoots().get(0));
try {
Util.buildMRPlan(new PhysicalPlan(), pc);
} catch (MRCompilerException mrce) {
assertEquals(2053, mrce.getErrorCode());
throw mrce;
}
}
@Test(expected = MRCompilerException.class)
public void testMRCompilerErr1() throws Exception {
PhysicalPlan pp = new PhysicalPlan();
PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
pp.merge(ldFil1);
POSplit op = GenPhyOp.topSplitOp();
pp.addAsLeaf(op);
try {
Util.buildMRPlan(pp, pc);
} catch (MRCompilerException mrce) {
assertEquals(2025, mrce.getErrorCode());
throw mrce;
}
}
/**
* Test to ensure that the order by without parallel followed by a limit, i.e., top k
* always produces the correct number of map reduce jobs. In the testcase below since
* we are running the unit test locally, we will get reduce parallelism as 1. So we will
* NOT introduce the extra MR job to do a final limit
*/
@Test
public void testNumReducersInLimit() throws Exception {
String query = "a = load 'input';" +
"b = order a by $0;" +
"c = limit b 10;" +
"store c into 'output';";
PhysicalPlan pp = Util.buildPp(pigServer, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
MapReduceOper mrOper = mrPlan.getRoots().get(0);
int count = 1;
while(mrPlan.getSuccessors(mrOper) != null) {
mrOper = mrPlan.getSuccessors(mrOper).get(0);
++count;
}
assertEquals(3, count);
}
/**
* Test to ensure that the order by with parallel followed by a limit, i.e., top k
* always produces the correct number of map reduce jobs
*/
@Test
public void testNumReducersInLimitWithParallel() throws Exception {
String query = "a = load 'input';" +
"b = order a by $0 parallel 2;" +
"c = limit b 10;" + "store c into 'output';";
PhysicalPlan pp = Util.buildPp(pigServerMR, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
LimitAdjuster la = new LimitAdjuster(mrPlan, pc);
la.visit();
la.adjust();
MapReduceOper mrOper = mrPlan.getRoots().get(0);
int count = 1;
while(mrPlan.getSuccessors(mrOper) != null) {
mrOper = mrPlan.getSuccessors(mrOper).get(0);
++count;
}
assertEquals(4, count);
}
@Test
public void testUDFInJoin() throws Exception {
String query = "a = load 'input1' using BinStorage();" +
"b = load 'input2';" +
"c = join a by $0, b by $0;" + "store c into 'output';";
PhysicalPlan pp = Util.buildPp(pigServer, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
MapReduceOper mrOper = mrPlan.getRoots().get(0);
assertEquals(2, mrOper.UDFs.size());
assertEquals(2, mrOper.UDFs.size());
assertTrue(mrOper.UDFs.contains("BinStorage"));
assertTrue(mrOper.UDFs.contains("org.apache.pig.builtin.PigStorage"));
}
@Test
public void testMergeJoin() throws Exception {
String query = "a = load '/tmp/input1';" +
"b = load '/tmp/input2';" +
"c = join a by $0, b by $0 using 'merge';" +
"store c into '/tmp/output1';";
PhysicalPlan pp = Util.buildPp(pigServer, query);
MRCompiler comp = new MRCompiler(pp, pc);
comp.compile();
MROperPlan mrp = comp.getMRPlan();
assertTrue(mrp.size()==2);
MapReduceOper mrOp0 = mrp.getRoots().get(0);
assertTrue(mrOp0.mapPlan.size()==2);
PhysicalOperator load0 = mrOp0.mapPlan.getRoots().get(0);
MergeJoinIndexer func = (MergeJoinIndexer)PigContext.instantiateFuncFromSpec(((POLoad)load0).getLFile().getFuncSpec());
Field lrField = MergeJoinIndexer.class.getDeclaredField("lr");
lrField.setAccessible(true);
POLocalRearrange lr = (POLocalRearrange)lrField.get(func);
List<PhysicalPlan> innerPlans = lr.getPlans();
PhysicalOperator localrearrange0 = mrOp0.mapPlan.getSuccessors(load0).get(0);
assertTrue(localrearrange0 instanceof POLocalRearrange);
assertTrue(mrOp0.reducePlan.size()==3);
PhysicalOperator pack0 = mrOp0.reducePlan.getRoots().get(0);
assertTrue(pack0 instanceof POPackage);
PhysicalOperator foreach0 = mrOp0.reducePlan.getSuccessors(pack0).get(0);
assertTrue(foreach0 instanceof POForEach);
PhysicalOperator store0 = mrOp0.reducePlan.getSuccessors(foreach0).get(0);
assertTrue(store0 instanceof POStore);
assertTrue(innerPlans.size()==1);
PhysicalPlan innerPlan = innerPlans.get(0);
assertTrue(innerPlan.size()==1);
PhysicalOperator project = innerPlan.getRoots().get(0);
assertTrue(project instanceof POProject);
assertTrue(((POProject)project).getColumn()==0);
MapReduceOper mrOp1 = mrp.getSuccessors(mrOp0).get(0);
assertTrue(mrOp1.mapPlan.size()==3);
PhysicalOperator load1 = mrOp1.mapPlan.getRoots().get(0);
assertTrue(load1 instanceof POLoad);
PhysicalOperator mergejoin1 = mrOp1.mapPlan.getSuccessors(load1).get(0);
assertTrue(mergejoin1 instanceof POMergeJoin);
PhysicalOperator store1 = mrOp1.mapPlan.getSuccessors(mergejoin1).get(0);
assertTrue(store1 instanceof POStore);
assertTrue(mrOp1.reducePlan.isEmpty());
}
public static class WeirdComparator extends ComparisonFunc {
@Override
public int compare(Tuple t1, Tuple t2) {
int result = 0;
try {
int i1 = (Integer) t1.get(1);
int i2 = (Integer) t2.get(1);
result = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
} catch (ExecException e) {
throw new RuntimeException(e);
}
return result;
}
}
@Test
public void testMergeJoinWithIndexableLoadFunc() throws Exception{
String query = "a = load 'input1';" +
"b = load 'input2' using " +
TestMergeJoin.DummyIndexableLoader.class.getName() + ";" +
"c = join a by $0, b by $0 using 'merge';" + "store c into 'output';";
PhysicalPlan pp = Util.buildPp(pigServer, query);
MROperPlan mp = Util.buildMRPlan(pp, pc);
assertEquals("Checking number of MR Jobs for merge join with " +
"IndexableLoadFunc:", 1, mp.size());
}
@Test
public void testCastFuncShipped() throws Exception{
String query = "a = load 'input1' using " + PigStorageNoDefCtor.class.getName() +
"('\t') as (a0, a1, a2);" +
"b = group a by a0;" +
"c = foreach b generate flatten(a);" +
"d = order c by a0;" +
"e = foreach d generate a1+a2;" +
"store e into 'output';";
PhysicalPlan pp = Util.buildPp(pigServer, query);
MROperPlan mp = Util.buildMRPlan(pp, pc);
MapReduceOper op = mp.getLeaves().get(0);
assertTrue(op.UDFs.contains(new FuncSpec(PigStorageNoDefCtor.class.getName())+"('\t')"));
}
@Test
public void testLimitAdjusterFuncShipped() throws Exception{
String query = "a = load 'input';" +
"b = order a by $0 parallel 2;" +
"c = limit b 7;" + "store c into 'output' using "
+ PigStorageNoDefCtor.class.getName() + "('\t');";
PhysicalPlan pp = Util.buildPp(pigServerMR, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
LimitAdjuster la = new LimitAdjuster(mrPlan, pc);
la.visit();
la.adjust();
MapReduceOper mrOper = mrPlan.getRoots().get(0);
int count = 1;
while(mrPlan.getSuccessors(mrOper) != null) {
mrOper = mrPlan.getSuccessors(mrOper).get(0);
++count;
}
assertEquals(4, count);
MapReduceOper op = mrPlan.getLeaves().get(0);
assertTrue(op.UDFs.contains(new FuncSpec(PigStorageNoDefCtor.class.getName())+"('\t')"));
}
/**
* Test that POSortedDistinct gets printed as POSortedDistinct
* @throws Exception
*/
@Test
public void testSortedDistinctInForeach() throws Exception {
PhysicalPlan php = new PhysicalPlan();
PhysicalPlan grpChain1 = GenPhyOp.loadedGrpChain();
php.merge(grpChain1);
List<PhysicalPlan> inputs = new LinkedList<PhysicalPlan>();
PhysicalPlan inplan = new PhysicalPlan();
PODistinct op1 = new POSortedDistinct(new OperatorKey("", r.nextLong()),
-1, null);
inplan.addAsLeaf(op1);
inputs.add(inplan);
List<Boolean> toFlattens = new ArrayList<Boolean>();
toFlattens.add(false);
POForEach pofe = new POForEach(new OperatorKey("", r.nextLong()), 1,
inputs, toFlattens);
php.addAsLeaf(pofe);
POStore st = GenPhyOp.topStoreOp();
php.addAsLeaf(st);
run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC19.gld");
}
private void run(PhysicalPlan pp, String expectedFile) throws Exception {
String compiledPlan, goldenPlan = null;
int MAX_SIZE = 100000;
MRCompiler comp = new MRCompiler(pp, pc);
comp.compile();
MROperPlan mrp = comp.getMRPlan();
PlanPrinter ppp = new PlanPrinter(mrp);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ppp.print(baos);
compiledPlan = baos.toString();
if(generate){
FileOutputStream fos = new FileOutputStream(expectedFile);
fos.write(baos.toByteArray());
return;
}
FileInputStream fis = new FileInputStream(expectedFile);
byte[] b = new byte[MAX_SIZE];
int len = fis.read(b);
goldenPlan = new String(b, 0, len);
if (goldenPlan.charAt(len-1) == '\n')
goldenPlan = goldenPlan.substring(0, len-1);
pp.explain(System.out);
System.out.println();
System.out.println("<<<" + compiledPlan + ">>>");
System.out.println("-------------");
System.out.println("Golden");
System.out.println("<<<" + goldenPlan + ">>>");
System.out.println("-------------");
String goldenPlanClean = Util.standardizeNewline(goldenPlan);
String compiledPlanClean = Util.standardizeNewline(compiledPlan);
assertEquals(TestHelper.sortUDFs(Util.removeSignature(goldenPlanClean)),
TestHelper.sortUDFs(Util.removeSignature(compiledPlanClean)));
}
public static class TestCollectableLoadFunc extends PigStorage implements CollectableLoadFunc {
@Override
public void ensureAllKeyInstancesInSameSplit() throws IOException {
}
}
public static class TestIndexableLoadFunc extends PigStorage implements IndexableLoadFunc {
@Override
public void initialize(Configuration conf) throws IOException {
}
@Override
public void seekNear(Tuple keys) throws IOException {
}
@Override
public void close() throws IOException {
}
}
@Test
public void testUDFInMergedCoGroup() throws Exception {
String query = "a = load 'input1' using " + TestCollectableLoadFunc.class.getName() + "();" +
"b = load 'input2' using " + TestIndexableLoadFunc.class.getName() + "();" +
"c = cogroup a by $0, b by $0 using 'merge';" +
"store c into 'output';";
PhysicalPlan pp = Util.buildPp(pigServer, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
MapReduceOper mrOper = mrPlan.getRoots().get(0);
assertTrue(mrOper.UDFs.contains(TestCollectableLoadFunc.class.getName()));
mrOper = mrPlan.getSuccessors(mrOper).get(0);
assertTrue(mrOper.UDFs.contains(TestCollectableLoadFunc.class.getName()));
assertTrue(mrOper.UDFs.contains(TestIndexableLoadFunc.class.getName()));
}
@Test
public void testUDFInMergedJoin() throws Exception {
String query = "a = load 'input1';" +
"b = load 'input2' using " + TestIndexableLoadFunc.class.getName() + "();" +
"c = join a by $0, b by $0 using 'merge';" +
"store c into 'output';";
PhysicalPlan pp = Util.buildPp(pigServer, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
MapReduceOper mrOper = mrPlan.getRoots().get(0);
assertTrue(mrOper.UDFs.contains(TestIndexableLoadFunc.class.getName()));
}
//PIG-2146
@Test
public void testSchemaInStoreForDistinctLimit() throws Exception {
//test if the POStore in the 2nd mr plan (that stores the actual output)
// has a schema
String query = "a = load 'input1' as (a : int,b :float ,c : int);" +
"b = distinct a;" +
"c = limit b 10;" +
"store c into 'output';";
PhysicalPlan pp = Util.buildPp(pigServer, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
MapReduceOper secondMrOper = mrPlan.getLeaves().get(0);
POStore store = (POStore)secondMrOper.reducePlan.getLeaves().get(0);
assertEquals(
"compare load and store schema",
store.getSchema(),
Utils.getSchemaFromString("a : int,b :float ,c : int")
);
}
//PIG-2146
@Test
public void testStorerLimit() throws Exception {
// test if the POStore in the 1st mr plan
// use the right StoreFunc
String query = "a = load 'input1';" +
"b = limit a 10;" +
"store b into 'output' using " + PigStorageNoDefCtor.class.getName() + "(',');";
PhysicalPlan pp = Util.buildPp(pigServer, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
LimitAdjuster la = new LimitAdjuster(mrPlan, pc);
la.visit();
la.adjust();
MapReduceOper firstMrOper = mrPlan.getRoots().get(0);
POStore store = (POStore)firstMrOper.reducePlan.getLeaves().get(0);
assertEquals(store.getStoreFunc().getClass().getName(), "org.apache.pig.impl.io.InterStorage");
}
// See PIG-4538
@Test
public void testFetchOptimizerSideEffect() throws Exception{
String query = "in1 = LOAD 'data.txt' AS (ident:chararray);" +
"in2 = LOAD 'data.txt' AS (ident:chararray);" +
"in3 = LOAD 'data.txt';" +
"joined = JOIN in1 BY ident LEFT OUTER, in2 BY ident;" +
"store joined into 'output';";
PhysicalPlan pp = Util.buildPp(pigServer, query);
MROperPlan mp = Util.buildMRPlan(pp, pc);
// isPlanFetchable should not bring side effect:
// set parentPlan for operators
FetchOptimizer.isPlanFetchable(pc, pp);
MapReduceOper op = mp.getLeaves().get(0);
PhysicalOperator store = op.reducePlan.getLeaves().get(0);
POForEach foreach = (POForEach)op.reducePlan.getPredecessors(store).get(0);
PhysicalOperator project = foreach.getInputPlans().get(0).getRoots().get(0);
Field parentPlan = PhysicalOperator.class.getDeclaredField("parentPlan");
parentPlan.setAccessible(true);
assertTrue(parentPlan.get(project)==null);
}
}