| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.test; |
| |
| import static org.junit.Assert.*; |
| |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.PrintStream; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Random; |
| |
| import org.apache.pig.ExecType; |
| import org.apache.pig.PigServer; |
| import org.apache.pig.backend.executionengine.ExecException; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SampleOptimizer; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.impl.PigContext; |
| import org.apache.pig.impl.logicalLayer.LogicalPlan; |
| import org.apache.pig.test.utils.LogicalPlanTester; |
| import org.junit.Test; |
| |
| public class TestSampleOptimizer { |
| |
| static PigContext pc; |
| static{ |
| pc = new PigContext(ExecType.MAPREDUCE,MiniCluster.buildCluster().getProperties()); |
| try { |
| pc.connect(); |
| } catch (ExecException e) { |
| e.printStackTrace(); |
| } |
| } |
| |
| @Test |
| public void testOptimizerFired() throws Exception{ |
| |
| LogicalPlanTester planTester = new LogicalPlanTester() ; |
| planTester.buildPlan(" A = load 'input' using PigStorage('\t');"); |
| planTester.buildPlan(" B = order A by $0;"); |
| LogicalPlan lp = planTester.buildPlan("store B into '/tmp';"); |
| PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc); |
| MROperPlan mrPlan = Util.buildMRPlan(pp, pc); |
| |
| int count = 1; |
| MapReduceOper mrOper = mrPlan.getRoots().get(0); |
| while(mrPlan.getSuccessors(mrOper) != null) { |
| mrOper = mrPlan.getSuccessors(mrOper).get(0); |
| ++count; |
| } |
| |
| // Before optimizer visits, number of MR jobs = 3. |
| assertEquals(3,count); |
| |
| SampleOptimizer so = new SampleOptimizer(mrPlan); |
| so.visit(); |
| |
| count = 1; |
| mrOper = mrPlan.getRoots().get(0); |
| while(mrPlan.getSuccessors(mrOper) != null) { |
| mrOper = mrPlan.getSuccessors(mrOper).get(0); |
| ++count; |
| } |
| |
| // After optimizer visits, number of MR jobs = 2. |
| assertEquals(2,count); |
| |
| // Test if RandomSampleLoader got pushed to top. |
| mrOper = mrPlan.getRoots().get(0); |
| List<PhysicalOperator> phyOps = mrOper.mapPlan.getRoots(); |
| assertEquals(1, phyOps.size()); |
| assertTrue(phyOps.get(0) instanceof POLoad); |
| assertTrue(((POLoad)phyOps.get(0)).getLFile().getFuncName().equals("org.apache.pig.impl.builtin.RandomSampleLoader")); |
| |
| // Test RandomSampleLoader is not present anymore in second MR job. |
| phyOps = mrPlan.getSuccessors(mrOper).get(0).mapPlan.getRoots(); |
| assertEquals(1, phyOps.size()); |
| assertTrue(phyOps.get(0) instanceof POLoad); |
| assertFalse(((POLoad)phyOps.get(0)).getLFile().getFuncName().equals("org.apache.pig.impl.builtin.RandomSampleLoader")); |
| } |
| |
| @Test |
| public void testOptimizerNotFired() throws Exception{ |
| |
| LogicalPlanTester planTester = new LogicalPlanTester() ; |
| planTester.buildPlan(" A = load 'input' using PigStorage('\t');"); |
| planTester.buildPlan("B = group A by $0;"); |
| planTester.buildPlan(" C = order B by $0;"); |
| LogicalPlan lp = planTester.buildPlan("store C into 'output';"); |
| PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc); |
| MROperPlan mrPlan = Util.buildMRPlan(pp, pc); |
| |
| int count = 1; |
| MapReduceOper mrOper = mrPlan.getRoots().get(0); |
| while(mrPlan.getSuccessors(mrOper) != null) { |
| mrOper = mrPlan.getSuccessors(mrOper).get(0); |
| ++count; |
| } |
| // Before optimizer visits, number of MR jobs = 3. |
| assertEquals(3,count); |
| |
| SampleOptimizer so = new SampleOptimizer(mrPlan); |
| so.visit(); |
| |
| count = 1; |
| mrOper = mrPlan.getRoots().get(0); |
| while(mrPlan.getSuccessors(mrOper) != null) { |
| mrOper = mrPlan.getSuccessors(mrOper).get(0); |
| ++count; |
| } |
| |
| // After optimizer visits, number of MR jobs = 3. Since here |
| // optimizer is not fired. |
| assertEquals(3,count); |
| |
| // Test Sampler is not moved and is present in 2nd MR job. |
| mrOper = mrPlan.getRoots().get(0); |
| List<PhysicalOperator> phyOps = mrOper.mapPlan.getRoots(); |
| assertEquals(1, phyOps.size()); |
| assertTrue(phyOps.get(0) instanceof POLoad); |
| assertFalse(((POLoad)phyOps.get(0)).getLFile().getFuncName().equals("org.apache.pig.impl.builtin.RandomSampleLoader")); |
| |
| phyOps = mrPlan.getSuccessors(mrOper).get(0).mapPlan.getRoots(); |
| assertEquals(1, phyOps.size()); |
| assertTrue(phyOps.get(0) instanceof POLoad); |
| assertTrue(((POLoad)phyOps.get(0)).getLFile().getFuncName().equals("org.apache.pig.impl.builtin.RandomSampleLoader")); |
| } |
| |
| |
| // End to end is more comprehensively tested in TestOrderBy and TestOrderBy2. But since those tests are currently excluded |
| // this simple end to end test is included. |
| @Test |
| public void testEndToEnd() throws Exception{ |
| |
| PigServer pigServer = new PigServer(pc); |
| int LOOP_COUNT = 40; |
| File tmpFile = File.createTempFile("test", "txt"); |
| PrintStream ps = new PrintStream(new FileOutputStream(tmpFile)); |
| Random r = new Random(3); |
| int rand; |
| for(int i = 0; i < LOOP_COUNT; i++) { |
| rand = r.nextInt(100); |
| ps.println(rand); |
| } |
| ps.close(); |
| |
| pigServer.registerQuery("A = LOAD '" |
| + Util.generateURI(tmpFile.toString(), pigServer.getPigContext()) |
| + "' using PigStorage() AS (num:int);"); |
| pigServer.registerQuery("B = order A by num desc;"); |
| Iterator<Tuple> result = pigServer.openIterator("B"); |
| |
| Integer prevNum = null; |
| while (result.hasNext()) |
| { |
| Integer curNum = (Integer)result.next().get(0); |
| if (null != prevNum) |
| assertTrue(curNum.compareTo(prevNum) <= 0 ); |
| |
| prevNum = curNum; |
| } |
| tmpFile.delete(); |
| } |
| |
| @Test |
| public void testPoissonSampleOptimizer() throws Exception { |
| LogicalPlanTester planTester = new LogicalPlanTester() ; |
| planTester.buildPlan(" A = load 'input' using PigStorage('\t');"); |
| planTester.buildPlan("B = load 'input' using PigStorage('\t');"); |
| planTester.buildPlan(" C = join A by $0, B by $0 using \"skewed\";"); |
| LogicalPlan lp = planTester.buildPlan("store C into 'output';"); |
| PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc); |
| MROperPlan mrPlan = Util.buildMRPlan(pp, pc); |
| |
| int count = 1; |
| MapReduceOper mrOper = mrPlan.getRoots().get(0); |
| while(mrPlan.getSuccessors(mrOper) != null) { |
| mrOper = mrPlan.getSuccessors(mrOper).get(0); |
| ++count; |
| } |
| // Before optimizer visits, number of MR jobs = 3. |
| assertEquals(3,count); |
| |
| SampleOptimizer so = new SampleOptimizer(mrPlan); |
| so.visit(); |
| |
| count = 1; |
| mrOper = mrPlan.getRoots().get(0); |
| while(mrPlan.getSuccessors(mrOper) != null) { |
| mrOper = mrPlan.getSuccessors(mrOper).get(0); |
| ++count; |
| } |
| // After optimizer visits, number of MR jobs = 2 |
| assertEquals(2,count); |
| } |
| } |