| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.tez; |
| |
| import static org.junit.Assert.assertEquals; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.PrintStream; |
| import java.util.Properties; |
| import java.util.Random; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.OutputFormat; |
| import org.apache.hadoop.mapreduce.RecordWriter; |
| import org.apache.pig.PigConfiguration; |
| import org.apache.pig.PigServer; |
| import org.apache.pig.StoreFunc; |
| import org.apache.pig.backend.executionengine.ExecException; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; |
| import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher; |
| import org.apache.pig.backend.hadoop.executionengine.tez.TezLocalExecType; |
| import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer; |
| import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerPrinter; |
| import org.apache.pig.builtin.OrcStorage; |
| import org.apache.pig.builtin.PigStorage; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.impl.PigContext; |
| import org.apache.pig.impl.io.FileLocalizer; |
| import org.apache.pig.impl.plan.NodeIdGenerator; |
| import org.apache.pig.test.TestMultiQueryBasic.DummyStoreWithOutputFormat; |
| import org.apache.pig.test.Util; |
| import org.apache.pig.test.utils.TestHelper; |
| import org.junit.AfterClass; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| /** |
| * Test cases to test the TezCompiler. VERY IMPORTANT NOTE: The tests here |
| * compare results with a "golden" set of outputs. In each test case here, the |
| * operators generated have a random operator key which uses Java's Random |
 * class. So if a code change alters the number of operators
 * created in a plan, the "golden" file for that test case
 * needs to be changed.
| */ |
| |
| public class TestTezCompiler { |
| private static PigContext pc; |
| private static PigServer pigServer; |
| private static final int MAX_SIZE = 100000; |
| |
| // If for some reason, the golden files need to be regenerated, set this to |
| // true - THIS WILL OVERWRITE THE GOLDEN FILES - So use this with caution |
| // and only for the test cases you need and are sure of. |
| private boolean generate = false; |
| |
| @BeforeClass |
| public static void setUpBeforeClass() throws Exception { |
| resetFileLocalizer(); |
| pc = new PigContext(new TezLocalExecType(), new Properties()); |
| FileUtils.deleteDirectory(new File("/tmp/pigoutput")); |
| } |
| |
/**
 * One-time teardown: calls resetFileLocalizer() so FileLocalizer temp files
 * are deleted and its fixed-seed state is restored for whatever runs next.
 */
@AfterClass
public static void tearDownAfterClass() throws Exception {
    resetFileLocalizer();
}
| |
| @Before |
| public void setUp() throws ExecException { |
| resetScope(); |
| pc.getProperties().remove(PigConfiguration.PIG_OPT_MULTIQUERY); |
| pc.getProperties().remove(PigConfiguration.PIG_TEZ_OPT_UNION); |
| pc.getProperties().remove(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY); |
| pc.getProperties().remove(PigConfiguration.PIG_BLOOMJOIN_STRATEGY); |
| pigServer = new PigServer(pc); |
| } |
| |
/**
 * Restarts the global id/scope counters kept by NodeIdGenerator, PigServer and
 * TezPlanContainer. Presumably this makes the operator keys printed in the
 * compiled plan reproducible, so they line up with the golden files.
 */
private void resetScope() {
    NodeIdGenerator.reset();
    PigServer.resetScope();
    TezPlanContainer.resetScope();
}
| |
/**
 * Deletes FileLocalizer's temp files, marks it uninitialized, and installs a
 * fixed-seed Random so that generated temporary paths are the same on every run.
 */
private static void resetFileLocalizer() {
    FileLocalizer.deleteTempFiles();
    FileLocalizer.setInitialized(false);
    // Set random seed to generate deterministic temporary paths
    FileLocalizer.setR(new Random(1331L));
}
| |
| @Test |
| public void testStoreLoad() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:int);" + |
| "store a into 'file:///tmp/pigoutput';" + |
| "b = load 'file:///tmp/pigoutput' as (x:int, y:int);" + |
| "store b into 'file:///tmp/pigoutput1';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-1.gld"); |
| } |
| |
| @Test |
| public void testStoreLoadMultiple() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input';" + |
| "store a into 'file:///tmp/pigoutput/Dir1';" + |
| "a = load 'file:///tmp/pigoutput/Dir1';" + |
| "store a into 'file:///tmp/pigoutput/Dir2' using BinStorage();" + |
| "a = load 'file:///tmp/pigoutput/Dir1';" + |
| "store a into 'file:///tmp/pigoutput/Dir3';" + |
| "a = load 'file:///tmp/pigoutput/Dir2' using BinStorage();" + |
| "store a into 'file:///tmp/pigoutput/Dir4';" + |
| "a = load 'file:///tmp/pigoutput/Dir3';" + |
| "b = load 'file:///tmp/pigoutput/Dir2' using BinStorage();" + |
| "c = load 'file:///tmp/pigoutput/Dir1';" + |
| "d = cogroup a by $0, b by $0, c by $0;" + |
| "store d into 'file:///tmp/pigoutput/Dir5';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2.gld"); |
| } |
| |
| @Test |
| public void testStoreLoadJoinMultiple() throws Exception { |
| // Case where different store load statements are used in a single join |
| String query = |
| "a = load 'file:///tmp/pigoutput/Dir1';" + |
| "b = filter a by $0 == 1;" + |
| "c = filter a by $0 == 2;" + |
| "store b into 'file:///tmp/pigoutput/Dir2';" + |
| "store c into 'file:///tmp/pigoutput/Dir3';" + |
| "d = load 'file:///tmp/pigoutput/Dir2';" + |
| "e = load 'file:///tmp/pigoutput/Dir3';" + |
| "f = join d by $0, e by $0;" + |
| "store f into 'file:///tmp/pigoutput/Dir5';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-3.gld"); |
| |
| resetScope(); |
| query = |
| "a = load 'file:///tmp/pigoutput/Dir1';" + |
| "b = distinct a;" + |
| "c = group a by $0;" + |
| "store b into 'file:///tmp/pigoutput/Dir2';" + |
| "store c into 'file:///tmp/pigoutput/Dir3';" + |
| "d = load 'file:///tmp/pigoutput/Dir2';" + |
| "e = load 'file:///tmp/pigoutput/Dir3';" + |
| "f = load 'file:///tmp/pigoutput/Dir4';" + |
| "g = join d by $0, f by $0 using 'repl';" + |
| "h = join e by $0, f by $0 using 'repl';" + |
| "store g into 'file:///tmp/pigoutput/Dir4';" + |
| "store h into 'file:///tmp/pigoutput/Dir5';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-4.gld"); |
| } |
| |
| @Test |
| public void testStoreLoadSplit() throws Exception { |
| // Cases where segmenting into two DAGs is not straight forward due to Split. |
| // The Split operator is required in both the segments. |
| |
| resetFileLocalizer(); |
| // Split operator as root vertex |
| String query = |
| "a = load 'file:///tmp/input';" + |
| "a1 = filter a by $0 == 5;" + |
| "store a1 into 'file:///tmp/pigoutput/Dir1';" + |
| "b = load 'file:///tmp/pigoutput/Dir1';" + |
| "c = join a by $0, b by $0;" + |
| "store c into 'file:///tmp/pigoutput/Dir2';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-5.gld"); |
| |
| // Split operator as intermediate vertex |
| query = |
| "a = load 'file:///tmp/input';" + |
| "a = distinct a;" + |
| "store a into 'file:///tmp/pigoutput/Dir1';" + |
| "b = load 'file:///tmp/pigoutput/Dir1';" + |
| "c = join a by $0, b by $0;" + |
| "store c into 'file:///tmp/pigoutput/Dir2';"; |
| |
| resetScope(); |
| resetFileLocalizer(); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-6.gld"); |
| |
| // Three levels of splits - a, a1 and a2. |
| // One split above and one split below a1 which is the split to be replaced with tmp store. |
| query = |
| "a = load 'file:///tmp/input';" + |
| "store a into 'file:///tmp/pigoutput/Dir0';" + |
| "a1 = filter a by $0 == 5;" + |
| "store a1 into 'file:///tmp/pigoutput/Dir1';" + |
| "a2 = distinct a1;" + |
| "store a2 into 'file:///tmp/pigoutput/Dir2';" + |
| "a3 = group a2 by $0;" + |
| "store a3 into 'file:///tmp/pigoutput/Dir3';" + |
| "b = load 'file:///tmp/pigoutput/Dir3';" + |
| "c = join a1 by $0, b by $0;" + |
| "store c into 'file:///tmp/pigoutput/Dir4';"; |
| |
| resetScope(); |
| resetFileLocalizer(); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-7.gld"); |
| } |
| |
| @Test |
| public void testNative() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:int);" + |
| "b = native 'hadoop-examples.jar' Store a into '/tmp/table_testNativeMRJobSimple_input' Load '/tmp/table_testNativeMRJobSimple_output' `wordcount /tmp/table_testNativeMRJobSimple_input /tmp/table_testNativeMRJobSimple_output`;" + |
| "store b into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Native-1.gld"); |
| } |
| |
| @Test |
| public void testFilter() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:int);" + |
| "b = filter a by x > 0;" + |
| "c = foreach b generate y;" + |
| "store c into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Filter-1.gld"); |
| } |
| |
| @Test |
| public void testGroupBy() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:int);" + |
| "b = group a by x;" + |
| "c = foreach b generate group, COUNT(a.x);" + |
| "store c into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Group-1.gld"); |
| } |
| |
| @Test |
| public void testJoin() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = load 'file:///tmp/input2' as (x:int, z:int);" + |
| "c = join a by x, b by x;" + |
| "d = foreach c generate a::x as x, y, z;" + |
| "store d into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Join-1.gld"); |
| } |
| |
| @Test |
| public void testBloomJoin() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x, y:int);" + |
| "b = load 'file:///tmp/input2' as (x, z:int);" + |
| "c = load 'file:///tmp/input2' as (x, w:int);" + |
| "d = join b by x, a by x, c by x using 'bloom';" + |
| "e = foreach d generate a::x as x, y, z, w;" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld"); |
| } |
| |
| @Test |
| public void testBloomJoinLeftOuter() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:chararray, y:int);" + |
| "b = load 'file:///tmp/input2' as (x:chararray, z:int);" + |
| "d = join a by x left, b by x using 'bloom';" + |
| "e = foreach d generate a::x as x, y, z;" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld"); |
| } |
| |
| @Test |
| public void testBloomJoinUnion() throws Exception { |
| // Left input from a union |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = load 'file:///tmp/input2' as (x:int, z:int);" + |
| "c = load 'file:///tmp/input3' as (x:int, z:int);" + |
| "b = union b, c;" + |
| "d = join a by x, b by x using 'bloom';" + |
| "e = foreach d generate a::x as x, y, z;" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld"); |
| setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, null); |
| |
| resetScope(); |
| // Right input from a union |
| query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = load 'file:///tmp/input2' as (x:int, z:int);" + |
| "c = load 'file:///tmp/input3' as (x:int, z:int);" + |
| "b = union b, c;" + |
| "d = join b by x, a by x using 'bloom';" + |
| "e = foreach d generate a::x as x, y, z;" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| // Needs shared edges and PIG-3856 to be a more optimial plan |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld"); |
| } |
| |
| @Test |
| public void testBloomJoinSplit() throws Exception { |
| // Left input from a split |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = load 'file:///tmp/input2' as (x:int, z:int);" + |
| "a1 = filter a by x == 3;" + |
| "a2 = filter a by x == 4;" + |
| "d = join a1 by x, a2 by x, b by x using 'bloom';" + |
| "e = foreach d generate a1::x as x, a1::y as y1, a2::y as y2, z;" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld"); |
| setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, null); |
| |
| resetScope(); |
| // Right input from a split |
| query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = load 'file:///tmp/input2' as (x:int, z:int);" + |
| "a1 = filter a by x == 3;" + |
| "a2 = filter a by x == 4;" + |
| "d = join b by x, a1 by x using 'bloom';" + |
| "e = foreach d generate a1::x as x, y, z;" + |
| "store a2 into 'file:///tmp/pigoutput/a2';" + |
| "store e into 'file:///tmp/pigoutput/e';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld"); |
| } |
| |
| @Test |
| public void testBloomSelfJoin() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = filter a by x < 5;" + |
| "c = filter a by x == 10;" + |
| "d = filter a by x > 10;" + |
| "e = join b by x, c by x, d by x using 'bloom';" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld"); |
| } |
| |
| @Test |
| public void testSelfJoin() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = filter a by x < 5;" + |
| "c = filter a by x == 10;" + |
| "d = filter a by x > 10;" + |
| "e = join b by x, c by x, d by x;" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-1.gld"); |
| } |
| |
| @Test |
| public void testSelfJoinSkewed() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = filter a by x < 5;" + |
| "c = filter a by x == 10;" + |
| "d = join b by x, c by x using 'skewed';" + |
| "store d into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld"); |
| } |
| |
| @Test |
| public void testSelfJoinReplicated() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = filter a by x < 5;" + |
| "c = filter a by x == 10;" + |
| "d = filter a by x > 10;" + |
| "e = join b by x, c by x, d by x using 'replicated';" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-3.gld"); |
| } |
| |
| @Test |
| public void testSelfJoinUnionReplicated() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = load 'file:///tmp/input2' as (x:int, z:int);" + |
| "c = union a, b;" + |
| "d = join b by x, c by x using 'replicated';" + |
| "store d into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld"); |
| } |
| |
| @Test |
| public void testSelfJoinUnion() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "a1 = filter a by x > 5;" + |
| "a2 = filter a by x < 2;" + |
| "b = union a1, a2;" + |
| "c = join b by x, a by x;" + |
| "store c into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld"); |
| } |
| |
| @Test |
| public void testSelfJoinUnionDifferentMembers() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "a1 = filter a by x > 5;" + |
| "a2 = filter a by x < 2;" + |
| "a3 = filter a by y == 10;" + |
| "a4 = join a2 by x, a3 by x;" + |
| "a5 = foreach a4 generate a2::x as x, a3::y as y;" + |
| "b = union a1, a5;" + |
| "c = join b by x, a by x;" + |
| "store c into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld"); |
| } |
| |
| @Test |
| public void testCross() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = load 'file:///tmp/input2' as (x:int, z:int);" + |
| "c = cross a, b;" + |
| "d = foreach c generate a::x as x, y, z;" + |
| "store d into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-1.gld"); |
| } |
| |
| @Test |
| public void testSelfCross() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = filter a by x < 5;" + |
| "c = filter a by x == 10;" + |
| "d = cross b, c;" + |
| "store d into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-2.gld"); |
| } |
| |
| @Test |
| public void testCrossScalarSplit() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = load 'file:///tmp/input2' as (x:int, z:int);" + |
| "c = cross b, a;" + |
| "d = foreach c generate a.x, a.y, z;" + //Scalar |
| "store d into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld"); |
| } |
| |
| @Test |
| public void testSkewedJoin() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = load 'file:///tmp/input2' as (x:int, z:int);" + |
| "c = join a by x, b by x using 'skewed';" + |
| "d = foreach c generate a::x as x, y, z;" + |
| "store d into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld"); |
| } |
| |
| @Test |
| public void testSkewedJoinFilter() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "a = filter a by x == 1;" + |
| "b = load 'file:///tmp/input2' as (x:int, z:int);" + |
| "c = join a by x, b by x using 'skewed';" + |
| "d = foreach c generate a::x as x, y, z;" + |
| "store d into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld"); |
| } |
| |
| @Test |
| public void testLimit() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:int);" + |
| "b = limit a 10;" + |
| "c = foreach b generate y;" + |
| "store c into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-1.gld"); |
| } |
| |
| @Test |
| public void testLimitOrderby() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:int);" + |
| "b = order a by x, y;" + |
| "c = limit b 10;" + |
| "store c into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld"); |
| } |
| |
| @Test |
| public void testLimitScalarOrderby() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:int);" + |
| "b = order a by x, y;" + |
| "g = group a all;" + |
| "h = foreach g generate COUNT(a) as sum;" + |
| "c = limit b h.sum/2;" + |
| "store c into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld"); |
| } |
| |
| @Test |
| public void testLimitReplJoin() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:int);" + |
| "b = load 'file:///tmp/input' as (x:int, y:int);" + |
| "c = limit a 1;" + |
| "d = join c by x, b by x using 'replicated';" + |
| "store a into 'file:///tmp/pigoutput/a';" + |
| "store d into 'file:///tmp/pigoutput/d';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-4.gld"); |
| } |
| |
| @Test |
| public void testDistinct() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:int);" + |
| "b = distinct a;" + |
| "c = foreach b generate y;" + |
| "store c into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld"); |
| } |
| |
| @Test |
| public void testDistinctAlgebraicUdfCombiner() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:int);" + |
| "b = group a by x;" + |
| "c = foreach b { d = distinct a; generate COUNT(d); };" + |
| "store c into 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-2.gld"); |
| } |
| |
| |
| |
| @Test |
| public void testReplicatedJoinInMapper() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = load 'file:///tmp/input2' as (x:int, z:int);" + |
| "c = load 'file:///tmp/input3' as (x:int, z:int);" + |
| "d = join a by x, b by x, c by x using 'replicated';" + |
| "store d into 'file:///tmp/pigoutput/d';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-1.gld"); |
| } |
| |
| @Test |
| public void testReplicatedJoinInReducer() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = group a by x;" + |
| "b1 = foreach b generate group, COUNT(a.y);" + |
| "c = load 'file:///tmp/input2' as (x:int, z:int);" + |
| "d = join b1 by group, c by x using 'replicated';" + |
| "store d into 'file:///tmp/pigoutput/e';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-2.gld"); |
| } |
| |
| @Test |
| public void testStream() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" + |
| "b = stream a through `stream.pl -n 5`;" + |
| "STORE b INTO 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Stream-1.gld"); |
| } |
| |
| @Test |
| public void testSecondaryKeySort() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int, z:int);" + |
| "b = group a by $0;" + |
| "c = foreach b { d = limit a 10; e = order d by $1; f = order e by $0; generate group, f;};"+ |
| "store c INTO 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SecKeySort-1.gld"); |
| |
| // With optimization turned off |
| setProperty(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY, "true"); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SecKeySort-2.gld"); |
| } |
| |
| @Test |
| public void testOrderBy() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" + |
| "b = order a by x;" + |
| "STORE b INTO 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld"); |
| } |
| |
| @Test |
| public void testOrderByWithFilter() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" + |
| "b = filter a by x == 1;" + |
| "c = order b by x;" + |
| "STORE c INTO 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld"); |
| } |
| |
| @Test |
| public void testOrderByReadOnceLoadFunc() throws Exception { |
| setProperty("pig.sort.readonce.loadfuncs","org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage"); |
| String query = |
| "a = load 'file:///tmp/input' using org.apache.pig.backend.hadoop.hbase.HBaseStorage(',') as (x:int, y:int);" + |
| "b = order a by x;" + |
| "STORE b INTO 'file:///tmp/pigoutput';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-3.gld"); |
| setProperty("pig.sort.readonce.loadfuncs", null); |
| } |
| |
| // PIG-3759, PIG-3781 |
| // Combiner should not be added in case of co-group |
| @Test |
| public void testCogroupWithAlgebraiceUDF() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = load 'file:///tmp/input2' as (x:int, z:int);" + |
| "c = cogroup a by x, b by x;" + |
| "d = foreach c generate group, COUNT(a.y), COUNT(b.z);" + |
| "store d into 'file:///tmp/pigoutput/d';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cogroup-1.gld"); |
| } |
| |
| @Test |
| public void testMulitQueryWithSplitSingleVertex() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:int);" + |
| "split a into b if x <= 5, c if x <= 10, d if x >10;" + |
| "store b into 'file:///tmp/pigoutput/b';" + |
| "store c into 'file:///tmp/pigoutput/c';" + |
| "store d into 'file:///tmp/pigoutput/d';"; |
| |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld"); |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testMulitQueryWithSplitMultiVertex() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:int);" + |
| "split a into b if x <= 5, c if x <= 10, d if x >10;" + |
| "split b into e if x < 3, f if x >= 3;" + |
| // No Combiner on the edge to b1/b2 vertex as both b1 and b2 are stored |
| "b1 = group b by x;" + |
| "b2 = foreach b1 generate group, SUM(b.x);" + |
| // Case of two outputs within a split going to same edge as input |
| "c1 = join c by x, b by x;" + |
| "c2 = group c by x;" + |
| // Combiner on the edge to c3 vertex |
| "c3 = foreach c2 generate group, SUM(c.x);" + |
| "d1 = filter d by x == 5;" + |
| "e1 = order e by x;" + |
| // TODO: Physical plan has extra split for f1 - 1-2: Split - scope-80 |
| // POSplit has only 1 sub plan. Optimized and removed in MR plan. |
| // Needs to be removed in Tez plan as well. |
| "f1 = limit f 1;" + |
| "f2 = union d1, f1;" + |
| "store b1 into 'file:///tmp/pigoutput/b1';" + |
| "store b2 into 'file:///tmp/pigoutput/b2';" + |
| "store c1 into 'file:///tmp/pigoutput/c1';" + |
| "store c3 into 'file:///tmp/pigoutput/c1';" + |
| "store d1 into 'file:///tmp/pigoutput/d1';" + |
| "store e1 into 'file:///tmp/pigoutput/e1';" + |
| "store f1 into 'file:///tmp/pigoutput/f1';" + |
| "store f2 into 'file:///tmp/pigoutput/f2';"; |
| |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld"); |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testMultiQueryWithGroupBy() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:int);" + |
| "b = group a by x;" + |
| "b = foreach b generate group, COUNT(a.x);" + |
| "c = group a by (x,y);" + |
| "c = foreach c generate group, COUNT(a.y);" + |
| "store b into 'file:///tmp/pigoutput/b';" + |
| "store c into 'file:///tmp/pigoutput/c';"; |
| |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3.gld"); |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testMultiQueryWithJoin() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = load 'file:///tmp/input2' as (x:int, z:int);" + |
| "c = join a by x, b by x;" + |
| "d = foreach c generate $0, $1, $3;" + |
| "e = foreach c generate $0, $1, $2, $3;" + |
| "store c into 'file:///tmp/pigoutput/c';" + |
| "store d into 'file:///tmp/pigoutput/d';" + |
| "store e into 'file:///tmp/pigoutput/e';"; |
| |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4.gld"); |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testMultiQueryWithNestedSplit() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:int);" + |
| "b = group a by x;" + //b: {group: int,a: {(x: int,y: int)}} |
| "store b into 'file:///tmp/pigoutput/b';" + |
| "c = foreach b generate a.x, a.y;" + //c: {{(x: int)},{(y: int)}} |
| "store c into 'file:///tmp/pigoutput/c';" + |
| "d = foreach b GENERATE FLATTEN(a);" + //d: {a::x: int,a::y: int} |
| "store d into 'file:///tmp/pigoutput/d';" + |
| "e = foreach d GENERATE a::x, a::y;" + |
| "store e into 'file:///tmp/pigoutput/e';"; |
| |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5.gld"); |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testMultiQueryScalar() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:int);" + |
| "b = group a by x;" + |
| "c = foreach b generate group, COUNT(a) as cnt;" + |
| "SPLIT a into d if (2 * c.cnt) < y, e OTHERWISE;" + |
| "store d into 'file:///tmp/pigoutput1';" + |
| "store e into 'file:///tmp/pigoutput2';"; |
| |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-6.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-6-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testMultiQueryMultipleReplicateJoin() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:int);" + |
| "b = load 'file:///tmp/input' as (x:int, y:int);" + |
| "c = join a by $0, b by $0 using 'replicated';" + |
| "d = join a by $1, b by $1 using 'replicated';" + |
| "e = union c,d;" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-7.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-7-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testMultiQueryMultipleScalar() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = filter a by x == 5;" + |
| "b = foreach b generate $0 as b1;" + |
| "c = filter a by x == 10;" + |
| "c = foreach c generate $0 as c1;" + |
| "d = group a by x;" + |
| "e = foreach d generate group, b.b1, c.c1;" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testMultiQueryMultipleReplicateJoinWithUnion() throws Exception { |
| // Replicate joins are from a split |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = load 'file:///tmp/input2' as (x:int, y:int);" + |
| "c = load 'file:///tmp/input3' as (x:int, y:int);" + |
| "d = union a, b;" + |
| "e = filter c by y < 2;" + |
| "f = filter c by y > 5;" + |
| "g = join d by x, e by x using 'replicated';" + |
| "h = join g by d::x, f by x using 'replicated';" + |
| "store h into 'file:///tmp/pigoutput';"; |
| |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9-OPTOFF.gld"); |
| |
| // Union is also from a split |
| query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = filter a by x == 2;" + |
| "c = load 'file:///tmp/input3' as (x:int, y:int);" + |
| "d = union a, b;" + |
| "e = filter c by y < 2;" + |
| "f = filter c by y > 5;" + |
| "g = join d by x, e by x using 'replicated';" + |
| "h = join g by d::x, f by x using 'replicated';" + |
| "store h into 'file:///tmp/pigoutput';"; |
| |
| resetScope(); |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testUnionStore() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = load 'file:///tmp/input' as (y:chararray, x:int);" + |
| "c = union onschema a, b;" + |
| "store c into 'file:///tmp/pigoutput';"; |
| |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = load 'file:///tmp/input' as (y:chararray, x:int);" + |
| "c = union onschema a, b PARALLEL 15;" + |
| "store c into 'file:///tmp/pigoutput';"; |
| // Union optimization should be turned off if PARALLEL clause is specified |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testUnionUnSupportedStore() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = load 'file:///tmp/input' as (y:chararray, x:int);" + |
| "c = union onschema a, b;" + |
| "store c into 'file:///tmp/pigoutput';"; |
| |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| String oldSupported = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS); |
| String oldUnSupported = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName()); |
| // Plan should not have union optimization applied as PigStorage is unsupported |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld"); |
| |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, null); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, OrcStorage.class.getName()); |
| query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = load 'file:///tmp/input' as (y:chararray, x:int);" + |
| "c = union onschema a, b;" + |
| "store c into 'file:///tmp/pigoutput' using " + DummyStoreWithOutputFormat.class.getName() + "();"; |
| // Plan should not have union optimization applied as only ORC is supported |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld"); |
| |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, null); |
| query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = load 'file:///tmp/input' as (y:chararray, x:int);" + |
| "c = union onschema a, b;" + |
| "store c into 'file:///tmp/pigoutput' using " + TestDummyStoreFunc.class.getName() + "();"; |
| // Plan should not have union optimization applied as supportsParallelWriteToStoreLocation returns false |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld"); |
| |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName()); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, null); |
| query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "split a into b if x > 5, c if x == 7, d if x == 8, e otherwise;" + |
| "u1 = union onschema b, c;" + |
| "store u1 into 'file:///tmp/pigoutput/u1';" + |
| //TODO: multiple levels of split not merged |
| "u2 = union onschema a, b, c;" + |
| "store u2 into 'file:///tmp/pigoutput/u2';" + |
| "u3 = union onschema d, e;" + |
| "store u3 into 'file:///tmp/pigoutput/u3';" + |
| "j1 = join d by x, a by x using 'replicated';" + |
| "j1 = foreach j1 generate d::x as x, d::y as y;" + |
| "u4 = union onschema j1, a;" + |
| "store u4 into 'file:///tmp/pigoutput/u4';"; |
| |
| // Plan should have union optimization applied even for unsupported storefunc |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld"); |
| |
| // Restore the value |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, oldSupported); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, oldUnSupported); |
| } |
| |
| @Test |
| public void testUnionGroupBy() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:int);" + |
| "b = load 'file:///tmp/input' as (y:int, x:int);" + |
| "c = union onschema a, b;" + |
| "d = group c by x;" + |
| "e = foreach d generate group, SUM(c.y);" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld"); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testUnionJoin() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = load 'file:///tmp/input' as (y:chararray, x:int);" + |
| "c = union onschema a, b;" + |
| "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + |
| "e = join c by x, d by x;" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld"); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld"); |
| } |
| |
| |
| @Test |
| public void testUnionReplicateJoin() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = load 'file:///tmp/input' as (y:chararray, x:int);" + |
| "c = union onschema a, b;" + |
| "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + |
| "e = join c by x, d by x using 'replicated';" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| //TODO: PIG-3856 Not optimized |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld"); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld"); |
| |
| query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = load 'file:///tmp/input' as (y:chararray, x:int);" + |
| "c = union onschema a, b;" + |
| "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + |
| "e = join d by x, c by x using 'replicated';" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| // Optimized |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld"); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testUnionSkewedJoin() throws Exception { |
| // TODO: PIG-4574 optimization needs to be done for this as well. |
| // Requires changes in UnionOptimizer |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = load 'file:///tmp/input' as (y:chararray, x:int);" + |
| "c = union onschema a, b;" + |
| "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + |
| "e = join c by x, d by x using 'skewed';" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld"); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testUnionOrderby() throws Exception { |
| // TODO: PIG-4574 optimization needs to be done for this as well. |
| // Requires changes in UnionOptimizer |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = load 'file:///tmp/input' as (y:chararray, x:int);" + |
| "c = union onschema a, b;" + |
| "d = order c by x;" + |
| "store d into 'file:///tmp/pigoutput';"; |
| |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld"); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld"); |
| } |
| |
| //TODO: PIG-3854 Limit is too convoluted and can be simplified. |
| @Test |
| public void testUnionLimit() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = load 'file:///tmp/input' as (y:chararray, x:int);" + |
| "c = union onschema a, b;" + |
| "d = limit c 1;" + |
| "store d into 'file:///tmp/pigoutput';"; |
| |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld"); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testUnionSplit() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = load 'file:///tmp/input' as (y:chararray, x:int);" + |
| "split a into a1 if x > 100, a2 otherwise;" + |
| "c = union onschema a1, a2, b;" + |
| "split c into d if x > 500, e otherwise;" + |
| "store a2 into 'file:///tmp/pigoutput/a2';" + |
| "store d into 'file:///tmp/pigoutput/d';" + |
| "store e into 'file:///tmp/pigoutput/e';"; |
| |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testUnionUnion() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = load 'file:///tmp/input' as (y:chararray, x:int);" + |
| "c = union onschema a, b;" + |
| "d = load 'file:///tmp/input1' as (x:int, y:chararray);" + |
| "e = union onschema c, d;" + |
| "f = group e by x;" + |
| "store e into 'file:///tmp/pigoutput1';" + |
| "store f into 'file:///tmp/pigoutput2';"; |
| |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10-OPTOFF.gld"); |
| |
| } |
| |
| @Test |
| public void testUnionUnionStore() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = load 'file:///tmp/input' as (y:chararray, x:int);" + |
| "c = union onschema a, b;" + |
| "d = load 'file:///tmp/input1' as (x:int, y:chararray);" + |
| "e = union onschema c, d;" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-11.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-11-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testMultipleUnionSplitJoin() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = filter a by x == 2;" + |
| "b1 = foreach b generate *;" + |
| "b2 = foreach b generate *;" + |
| "b3 = union onschema b1, b2;" + |
| "c = filter a by x == 3;" + |
| "c1 = foreach c generate y, x;" + |
| "c2 = foreach c generate y, x;" + |
| "c3 = union c1, c2;" + |
| "a1 = union onschema b3, c3;" + |
| "store a1 into 'file:///tmp/pigoutput1';" + |
| "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + |
| "e = join a1 by x, d by x using 'skewed';" + |
| "store e into 'file:///tmp/pigoutput2';"; |
| |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-12.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-12-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testUnionSplitReplicateJoin() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = filter a by x == 2;" + |
| "c = union onschema a, b;" + |
| "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + |
| "e = join c by x, d by x using 'replicated';" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-13.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-13-OPTOFF.gld"); |
| |
| query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = filter a by x == 2;" + |
| "c = union onschema a, b;" + |
| "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + |
| "e = join d by x, c by x using 'replicated';" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-14.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-14-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testUnionSplitSkewedJoin() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = filter a by x == 2;" + |
| "c = union onschema a, b;" + |
| "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + |
| "e = join c by x, d by x using 'skewed';" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld"); |
| |
| query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = filter a by x == 2;" + |
| "c = union onschema a, b;" + |
| "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + |
| "e = join d by x, c by x using 'skewed';" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testUnionScalar() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = load 'file:///tmp/input' as (y:chararray, x:int);" + |
| "c = union onschema a, b;" + |
| "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + |
| "e = filter c by x == d.x;" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-17.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-17-OPTOFF.gld"); |
| |
| query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = load 'file:///tmp/input' as (y:chararray, x:int);" + |
| "c = union onschema a, b;" + |
| "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + |
| "e = filter d by x == c.x;" + |
| "store e into 'file:///tmp/pigoutput';"; |
| |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-18.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-18-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testUnionSplitUnionStore() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input' as (x:int, y:chararray);" + |
| "b = load 'file:///tmp/input1' as (y:chararray, x:int);" + |
| "c = union onschema a, b;" + |
| "split c into d if x <= 5, e if x <= 10, f if x >10, g if y == '6';" + |
| "h = union onschema d, e;" + |
| "i = union onschema f, g;" + |
| "store h into 'file:///tmp/pigoutput/1';" + |
| "store i into 'file:///tmp/pigoutput/2';"; |
| |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-19.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-19-OPTOFF.gld"); |
| |
| // With a join in between |
| query = |
| "a = load 'file:///tmp/input' as (x:chararray);" + |
| "b = load 'file:///tmp/input' as (x:chararray);" + |
| "c = load 'file:///tmp/input' as (y:chararray);" + |
| "u1 = union onschema a, b;" + |
| "SPLIT u1 INTO r IF x != '', s OTHERWISE;" + |
| "d = JOIN r BY x LEFT, c BY y;" + |
| "u2 = UNION ONSCHEMA d, s;" + |
| "e = FILTER u2 BY x == '';" + |
| "f = FILTER u2 BY x == 'm';" + |
| "u3 = UNION ONSCHEMA e, f;" + |
| "store u3 into 'file:///tmp/pigoutput';"; |
| |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-20.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-20-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testUnionSplitUnionLimitStore() throws Exception { |
| // Similar to previous testcase but a LIMIT at the end to test a non-store vertex group |
| String query = |
| "a = load 'file:///tmp/input' as (x:chararray);" + |
| "b = load 'file:///tmp/input' as (x:chararray);" + |
| "c = load 'file:///tmp/input' as (y:chararray);" + |
| "u1 = union onschema a, b;" + |
| "SPLIT u1 INTO r IF x != '', s OTHERWISE;" + |
| "d = JOIN r BY x LEFT, c BY y;" + |
| "u2 = UNION ONSCHEMA d, s;" + |
| "e = FILTER u2 BY x == '';" + |
| "f = FILTER u2 BY x == 'm';" + |
| "u3 = UNION ONSCHEMA e, f;" + |
| "SPLIT u3 INTO t if x != '', u OTHERWISE;" + |
| "v = LIMIT t 10;" + |
| "store v into 'file:///tmp/pigoutput';"; |
| |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-21.gld"); |
| resetScope(); |
| setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-21-OPTOFF.gld"); |
| } |
| |
| @Test |
| public void testRank() throws Exception { |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = rank a;" + |
| "store b into 'file:///tmp/pigoutput/d';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-1.gld"); |
| } |
| |
| @Test |
| public void testRankBy() throws Exception { |
| //TODO: Physical plan (affects both MR and Tez) has extra job before order by. Does not look right. |
| String query = |
| "a = load 'file:///tmp/input1' as (x:int, y:int);" + |
| "b = rank a by x;" + |
| "store b into 'file:///tmp/pigoutput/d';"; |
| |
| run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld"); |
| } |
| |
| private String getProperty(String property) { |
| return pigServer.getPigContext().getProperties().getProperty(property); |
| } |
| |
| private void setProperty(String property, String value) { |
| if (value == null) { |
| pigServer.getPigContext().getProperties().remove(property); |
| } else { |
| pigServer.getPigContext().getProperties().setProperty(property, value); |
| } |
| } |
| |
| private void run(String query, String expectedFile) throws Exception { |
| PhysicalPlan pp = Util.buildPp(pigServer, query); |
| TezLauncher launcher = new TezLauncher(); |
| pc.inExplain = true; |
| TezPlanContainer tezPlanContainer = launcher.compile(pp, pc); |
| |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| PrintStream ps = new PrintStream(baos); |
| TezPlanContainerPrinter printer = new TezPlanContainerPrinter(ps, tezPlanContainer); |
| printer.visit(); |
| String compiledPlan = baos.toString(); |
| System.out.println(); |
| System.out.println("<<<" + compiledPlan + ">>>"); |
| |
| if (generate) { |
| FileOutputStream fos = new FileOutputStream(expectedFile); |
| fos.write(baos.toByteArray()); |
| fos.close(); |
| return; |
| } |
| FileInputStream fis = new FileInputStream(expectedFile); |
| byte[] b = new byte[MAX_SIZE]; |
| int len = fis.read(b); |
| fis.close(); |
| String goldenPlan = new String(b, 0, len); |
| if (goldenPlan.charAt(len-1) == '\n') { |
| goldenPlan = goldenPlan.substring(0, len-1); |
| } |
| |
| System.out.println("-------------"); |
| System.out.println("Golden"); |
| System.out.println("<<<" + goldenPlan + ">>>"); |
| System.out.println("-------------"); |
| |
| String goldenPlanClean = Util.standardizeNewline(goldenPlan).trim(); |
| String compiledPlanClean = Util.standardizeNewline(compiledPlan).trim(); |
| assertEquals(TestHelper.sortUDFs(Util.removeSignature(goldenPlanClean)), |
| TestHelper.sortUDFs(Util.removeSignature(compiledPlanClean))); |
| } |
| |
| public static class TestDummyStoreFunc extends StoreFunc { |
| |
| @Override |
| public OutputFormat getOutputFormat() throws IOException { |
| return null; |
| } |
| |
| @Override |
| public void setStoreLocation(String location, Job job) |
| throws IOException { |
| } |
| |
| @Override |
| public void prepareToWrite(RecordWriter writer) throws IOException { |
| } |
| |
| @Override |
| public void putNext(Tuple t) throws IOException { |
| } |
| |
| @Override |
| public Boolean supportsParallelWriteToStoreLocation() { |
| return false; |
| } |
| |
| } |
| } |
| |