| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.test; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.WritableComparable; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.mapreduce.OutputCommitter; |
| import org.apache.hadoop.mapreduce.OutputFormat; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.pig.LoadFunc; |
| import org.apache.pig.PigConfiguration; |
| import org.apache.pig.PigServer; |
| import org.apache.pig.ResourceSchema; |
| import org.apache.pig.StoreFunc; |
| import org.apache.pig.backend.executionengine.ExecJob; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; |
| import org.apache.pig.data.DataBag; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.impl.PigContext; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| |
| @RunWith(JUnit4.class) |
| public class TestMultiQueryBasic { |
| |
| private static PigServer myPig; |
| |
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        // Stage the two passwd fixture files into the working directory so the
        // Pig scripts in these tests can load them via relative paths.
        Util.copyFromLocalToLocal(
                "test/org/apache/pig/test/data/passwd", "passwd");
        Util.copyFromLocalToLocal(
                "test/org/apache/pig/test/data/passwd2", "passwd2");
        Properties props = new Properties();
        // Explicitly enable multi-query optimization -- it is the feature
        // exercised by every test in this class.
        props.setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, ""+true);
        // Small sort buffer; presumably to keep local Tez runs lightweight --
        // harmless for other exec modes. TODO confirm intent.
        props.setProperty("tez.runtime.io.sort.mb", "10");
        myPig = new PigServer(Util.getLocalTestMode(), props);
    }
| |
| @AfterClass |
| public static void tearDownAfterClass() throws Exception { |
| Util.deleteFile(new PigContext(Util.getLocalTestMode(), new Properties()), "passwd"); |
| Util.deleteFile(new PigContext(Util.getLocalTestMode(), new Properties()), "passwd2"); |
| deleteOutputFiles(); |
| } |
| |
    @Before
    public void setUp() throws Exception {
        // Each test writes to the shared output1..output5 directories; clear
        // them before every run so stale results cannot mask failures.
        deleteOutputFiles();
    }
| |
    @After
    public void tearDown() throws Exception {
        // Intentionally empty: cleanup happens in setUp() before the next test
        // and in tearDownAfterClass() at the end of the suite.
    }
| |
| |
| @Test |
| public void testMultiQueryWithTwoStores2() throws Exception { |
| |
| System.out.println("===== multi-query with 2 stores (2) ====="); |
| |
| myPig.setBatchOn(); |
| |
| myPig.registerQuery("a = load 'passwd' " + |
| "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); |
| myPig.registerQuery("b = filter a by uid > 5;"); |
| myPig.registerQuery("store b into 'output1';"); |
| myPig.registerQuery("c = group b by gid;"); |
| myPig.registerQuery("store c into 'output2';"); |
| |
| List<ExecJob> jobs = myPig.executeBatch(); |
| assertTrue(jobs.size() == 2); |
| |
| for (ExecJob job : jobs) { |
| assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); |
| } |
| } |
| |
| @Test |
| public void testMultiQueryWithTwoLoads2() throws Exception { |
| |
| System.out.println("===== multi-query with two loads (2) ====="); |
| |
| myPig.setBatchOn(); |
| |
| myPig.registerQuery("a = load 'passwd' " + |
| "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); |
| myPig.registerQuery("b = load 'passwd2' " + |
| "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); |
| myPig.registerQuery("c = filter a by uid > 5;"); |
| myPig.registerQuery("d = filter b by uid > 10;"); |
| myPig.registerQuery("store c into 'output1';"); |
| myPig.registerQuery("store d into 'output2';"); |
| myPig.registerQuery("e = cogroup c by uid, d by uid;"); |
| myPig.registerQuery("store e into 'output3';"); |
| |
| myPig.executeBatch(); |
| myPig.discardBatch(); |
| } |
| |
| @Test |
| public void testMultiQueryPhase3BaseCase2() throws Exception { |
| |
| System.out.println("===== multi-query phase 3 base case (2) ====="); |
| |
| myPig.setBatchOn(); |
| |
| myPig.registerQuery("a = load 'passwd' " + |
| "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); |
| myPig.registerQuery("b = filter a by uid < 5;"); |
| myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;"); |
| myPig.registerQuery("d = filter a by uid >= 10;"); |
| myPig.registerQuery("b1 = group b by gid;"); |
| myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);"); |
| myPig.registerQuery("b3 = filter b2 by $1 > 5;"); |
| myPig.registerQuery("store b3 into 'output1';"); |
| myPig.registerQuery("c1 = group c by gid;"); |
| myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);"); |
| myPig.registerQuery("store c2 into 'output2';"); |
| myPig.registerQuery("d1 = group d by gid;"); |
| myPig.registerQuery("d2 = foreach d1 generate group, AVG(d.uid);"); |
| myPig.registerQuery("store d2 into 'output3';"); |
| |
| List<ExecJob> jobs = myPig.executeBatch(); |
| |
| for (ExecJob job : jobs) { |
| assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); |
| } |
| } |
| |
| @Test |
| public void testMultiQueryPhase3WithoutCombiner2() throws Exception { |
| |
| System.out.println("===== multi-query phase 3 without combiner (2) ====="); |
| |
| myPig.setBatchOn(); |
| |
| myPig.registerQuery("a = load 'passwd' " + |
| "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); |
| myPig.registerQuery("b = filter a by uid < 5;"); |
| myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;"); |
| myPig.registerQuery("d = filter a by uid >= 10;"); |
| myPig.registerQuery("b1 = group b by gid;"); |
| myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid) + SUM(b.uid);"); |
| myPig.registerQuery("b3 = filter b2 by $1 > 5;"); |
| myPig.registerQuery("store b3 into 'output1';"); |
| myPig.registerQuery("c1 = group c by gid;"); |
| myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid) - COUNT(c.uid);"); |
| myPig.registerQuery("store c2 into 'output2';"); |
| myPig.registerQuery("d1 = group d by gid;"); |
| myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);"); |
| myPig.registerQuery("store d2 into 'output3';"); |
| |
| List<ExecJob> jobs = myPig.executeBatch(); |
| |
| for (ExecJob job : jobs) { |
| assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); |
| } |
| } |
| |
| @Test |
| public void testMultiQueryPhase3WithMixedCombiner2() throws Exception { |
| |
| System.out.println("===== multi-query phase 3 with mixed combiner (2) ====="); |
| |
| myPig.setBatchOn(); |
| |
| myPig.registerQuery("a = load 'passwd' " + |
| "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); |
| myPig.registerQuery("b = filter a by uid < 5;"); |
| myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;"); |
| myPig.registerQuery("d = filter a by uid >= 10;"); |
| myPig.registerQuery("b1 = group b by gid;"); |
| myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);"); |
| myPig.registerQuery("b3 = filter b2 by $1 > 5;"); |
| myPig.registerQuery("store b3 into 'output1';"); |
| myPig.registerQuery("c1 = group c by gid;"); |
| myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);"); |
| myPig.registerQuery("store c2 into 'output2';"); |
| myPig.registerQuery("d1 = group d by gid;"); |
| myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);"); |
| myPig.registerQuery("store d2 into 'output3';"); |
| |
| List<ExecJob> jobs = myPig.executeBatch(); |
| assertEquals(3, jobs.size()); |
| |
| for (ExecJob job : jobs) { |
| assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); |
| } |
| } |
| |
| @Test |
| public void testMultiQueryPhase3WithDifferentMapDataTypes2() throws Exception { |
| |
| System.out.println("===== multi-query phase 3 with different map datatypes (2) ====="); |
| |
| myPig.setBatchOn(); |
| |
| myPig.registerQuery("a = load 'passwd' " + |
| "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); |
| myPig.registerQuery("b = filter a by uid < 5;"); |
| myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;"); |
| myPig.registerQuery("d = filter a by uid >= 10;"); |
| myPig.registerQuery("b1 = group b by gid;"); |
| myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);"); |
| myPig.registerQuery("b3 = filter b2 by $1 > 5;"); |
| myPig.registerQuery("store b3 into 'output1';"); |
| myPig.registerQuery("c1 = group c by $1;"); |
| myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);"); |
| myPig.registerQuery("store c2 into 'output2';"); |
| myPig.registerQuery("d1 = group d by $1;"); |
| myPig.registerQuery("d2 = foreach d1 generate group, COUNT(d.uid);"); |
| myPig.registerQuery("store d2 into 'output3';"); |
| |
| List<ExecJob> jobs = myPig.executeBatch(); |
| assertEquals(3, jobs.size()); |
| |
| for (ExecJob job : jobs) { |
| assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); |
| } |
| } |
| |
    @Test
    public void testMultiQueryPhase3WithDifferentMapDataTypes3() throws Exception {

        System.out.println("===== multi-query phase 3 with different map datatypes (3) =====");

        myPig.setBatchOn();
        // Tab-delimited rows: name, age, gpa.
        String[] inputData = {"john\t20\t3.4",
                "john\t25\t3.4" ,
                "henry\t23\t3.9" ,
                "adam\t54\t2.9" ,
                "henry\t21\t3.9"};
        Util.createLocalInputFile("queryInput.txt", inputData);

        // Branch 1 groups on 'all' (chararray key); branch 2 groups on a
        // (name, gpa) tuple key -- different map output key types.
        myPig.registerQuery("a = load 'queryInput.txt' " +
                "as (name:chararray, age:int, gpa:double);");
        myPig.registerQuery("b = group a all;");
        myPig.registerQuery("c = foreach b generate group, COUNT(a);");
        myPig.registerQuery("store c into 'output1';");
        myPig.registerQuery("d = group a by (name, gpa);");
        myPig.registerQuery("e = foreach d generate flatten(group), MIN(a.age);");
        myPig.registerQuery("store e into 'output2';");

        myPig.executeBatch();

        // Verify output1: a single ('all', 5) tuple -- 5 input rows in total.
        myPig.registerQuery("a = load 'output1' as (grp:chararray, cnt:long) ;");
        Iterator<Tuple> it = myPig.openIterator("a");
        assertEquals(Util.getPigConstant("('all', 5l)"), it.next());
        assertFalse(it.hasNext());

        // Verify output2: minimum age per (name, gpa), keyed by name here
        // since each name has a single gpa in the fixture data.
        myPig.registerQuery("a = load 'output2' as (name:chararray, gpa:double, age:int);");
        it = myPig.openIterator("a");
        int i = 0;
        Map<String, Tuple> expectedResults = new HashMap<String, Tuple>();
        expectedResults.put("john", (Tuple) Util.getPigConstant("('john',3.4,20)"));
        expectedResults.put("adam", (Tuple) Util.getPigConstant("('adam',2.9,54)"));
        expectedResults.put("henry", (Tuple) Util.getPigConstant("('henry',3.9,21)"));
        while(it.hasNext()) {
            Tuple t = it.next();
            i++;
            assertEquals(expectedResults.get(t.get(0)), t);
        }
        // Exactly three distinct (name, gpa) groups expected.
        assertEquals(3, i);
    }
| |
| @Test |
| public void testMultiQueryPhase3StreamingInReducer2() throws Exception { |
| |
| System.out.println("===== multi-query phase 3 with streaming in reducer (2) ====="); |
| |
| myPig.setBatchOn(); |
| |
| myPig.registerQuery("A = load 'passwd';"); |
| myPig.registerQuery("Split A into A1 if $2 > 5, A2 if $2 >= 5;"); |
| myPig.registerQuery("Split A1 into A3 if $0 > 'm', A4 if $0 >= 'm';"); |
| myPig.registerQuery("B = group A3 by $2;"); |
| myPig.registerQuery("C = foreach B generate flatten(A3);"); |
| myPig.registerQuery("D = stream B through `cat`;"); |
| myPig.registerQuery("store D into 'output1';"); |
| myPig.registerQuery("E = group A4 by $2;"); |
| myPig.registerQuery("F = foreach E generate group, COUNT(A4);"); |
| myPig.registerQuery("store F into 'output2';"); |
| myPig.registerQuery("G = group A1 by $2;"); |
| myPig.registerQuery("H = foreach G generate group, COUNT(A1);"); |
| myPig.registerQuery("store H into 'output3';"); |
| |
| List<ExecJob> jobs = myPig.executeBatch(); |
| assertEquals(3, jobs.size()); |
| |
| for (ExecJob job : jobs) { |
| assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); |
| } |
| } |
| |
| @Test |
| public void testMultiQueryWithPigMixL12_2() throws Exception { |
| |
| System.out.println("===== multi-query with PigMix L12 (2) ====="); |
| |
| myPig.setBatchOn(); |
| |
| myPig.registerQuery("a = load 'passwd' " + |
| "using PigStorage(':') as (uname, passwd, uid, gid);"); |
| myPig.registerQuery("b = foreach a generate uname, passwd, uid, gid;"); |
| myPig.registerQuery("split b into c1 if uid > 5, c2 if uid <= 5 ;"); |
| myPig.registerQuery("split c1 into d1 if gid < 5, d2 if gid >= 5;"); |
| myPig.registerQuery("e = group d1 by uname;"); |
| myPig.registerQuery("e1 = foreach e generate group, MAX(d1.uid);"); |
| myPig.registerQuery("store e1 into 'output1';"); |
| myPig.registerQuery("f = group c2 by uname;"); |
| myPig.registerQuery("f1 = foreach f generate group, SUM(c2.gid);"); |
| myPig.registerQuery("store f1 into 'output2';"); |
| myPig.registerQuery("g = group d2 by uname;"); |
| myPig.registerQuery("g1 = foreach g generate group, COUNT(d2);"); |
| myPig.registerQuery("store g1 into 'output3';"); |
| |
| List<ExecJob> jobs = myPig.executeBatch(); |
| assertEquals(3, jobs.size()); |
| |
| for (ExecJob job : jobs) { |
| assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); |
| } |
| } |
| |
| @Test |
| public void testMultiQueryWithCoGroup_2() throws Exception { |
| |
| System.out.println("===== multi-query with CoGroup (2) ====="); |
| |
| myPig.setBatchOn(); |
| |
| myPig.registerQuery("a = load 'passwd' " + |
| "using PigStorage(':') as (uname, passwd, uid, gid);"); |
| myPig.registerQuery("store a into 'output1' using BinStorage();"); |
| myPig.registerQuery("b = load 'output1' using BinStorage() as (uname, passwd, uid, gid);"); |
| myPig.registerQuery("c = load 'passwd2' " + |
| "using PigStorage(':') as (uname, passwd, uid, gid);"); |
| myPig.registerQuery("d = cogroup b by (uname, uid) inner, c by (uname, uid) inner;"); |
| myPig.registerQuery("e = foreach d generate flatten(b), flatten(c);"); |
| myPig.registerQuery("store e into 'output2';"); |
| |
| List<ExecJob> jobs = myPig.executeBatch(); |
| assertTrue(jobs.size() == 2); |
| |
| for (ExecJob job : jobs) { |
| assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); |
| } |
| } |
| |
| @Test |
| public void testMultiQueryWithFJ_2() throws Exception { |
| |
| System.out.println("===== multi-query with FJ (2) ====="); |
| |
| myPig.setBatchOn(); |
| |
| myPig.registerQuery("a = load 'passwd' " + |
| "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); |
| myPig.registerQuery("b = load 'passwd' " + |
| "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); |
| myPig.registerQuery("c = filter a by uid > 5;"); |
| myPig.registerQuery("store c into 'output1';"); |
| myPig.registerQuery("d = filter b by gid > 10;"); |
| myPig.registerQuery("store d into 'output2';"); |
| myPig.registerQuery("e = join c by gid, d by gid using \'repl\';"); |
| myPig.registerQuery("store e into 'output3';"); |
| |
| List<ExecJob> jobs = myPig.executeBatch(); |
| assertEquals(3, jobs.size()); |
| |
| for (ExecJob job : jobs) { |
| assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); |
| } |
| } |
| |
| @Test |
| public void testMultiQueryWithIntermediateStores_2() throws Exception { |
| |
| System.out.println("===== multi-query with intermediate stores (2) ====="); |
| |
| myPig.setBatchOn(); |
| |
| myPig.registerQuery("a = load 'passwd' " + |
| "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); |
| myPig.registerQuery("store a into 'output1';"); |
| myPig.registerQuery("b = load 'output1' using PigStorage(':'); "); |
| myPig.registerQuery("store b into 'output2';"); |
| |
| List<ExecJob> jobs = myPig.executeBatch(); |
| assertTrue(jobs.size() == 2); |
| |
| for (ExecJob job : jobs) { |
| assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); |
| } |
| } |
| |
| @Test |
| public void testMultiQueryWithSplitInMapAndMultiMerge() throws Exception { |
| |
| // clean up any existing dirs/files |
| String[] toClean = {"tmwsimam-input.txt", "foo1", "foo2", "foo3", "foo4" }; |
| for (int j = 0; j < toClean.length; j++) { |
| Util.deleteFile(new PigContext(Util.getLocalTestMode(), new Properties()), toClean[j]); |
| } |
| |
| // the data below is tab delimited |
| String[] inputData = { |
| "1 a b e f i j m n", |
| "2 a b e f i j m n", |
| "3 c d g h k l o p", |
| "4 c d g h k l o p" }; |
| Util.createLocalInputFile("tmwsimam-input.txt", inputData); |
| String query = |
| "A = LOAD 'tmwsimam-input.txt' " + |
| "as (f0:chararray, f1:chararray, f2:chararray, f3:chararray, " + |
| "f4:chararray, f5:chararray, f6:chararray, f7:chararray, f8:chararray); " + |
| "B = FOREACH A GENERATE f0, f1, f2, f3, f4;" + |
| "B1 = foreach B generate f0, f1, f2;" + |
| "C = GROUP B1 BY (f1, f2);" + |
| "STORE C into 'foo1' using BinStorage();" + |
| "B2 = FOREACH B GENERATE f0, f3, f4;" + |
| "E = GROUP B2 BY (f3, f4);" + |
| "STORE E into 'foo2' using BinStorage();" + |
| "F = FOREACH A GENERATE f0, f5, f6, f7, f8;" + |
| "F1 = FOREACH F GENERATE f0, f5, f6;" + |
| "G = GROUP F1 BY (f5, f6);" + |
| "STORE G into 'foo3' using BinStorage();" + |
| "F2 = FOREACH F GENERATE f0, f7, f8;" + |
| "I = GROUP F2 BY (f7, f8);" + |
| "STORE I into 'foo4' using BinStorage();" + |
| "explain;"; |
| myPig.setBatchOn(); |
| Util.registerMultiLineQuery(myPig, query); |
| myPig.executeBatch(); |
| |
| String templateLoad = "a = load 'foo' using BinStorage();"; |
| |
| Map<Tuple, DataBag> expectedResults = new HashMap<Tuple, DataBag>(); |
| expectedResults.put((Tuple)Util.getPigConstant("('a','b')"), |
| (DataBag)Util.getPigConstant("{('1','a','b'),('2','a','b')}")); |
| expectedResults.put((Tuple)Util.getPigConstant("('c','d')"), |
| (DataBag)Util.getPigConstant("{('3','c','d'),('4','c','d')}")); |
| expectedResults.put((Tuple)Util.getPigConstant("('e','f')"), |
| (DataBag)Util.getPigConstant("{('1','e','f'),('2','e','f')}")); |
| expectedResults.put((Tuple)Util.getPigConstant("('g','h')"), |
| (DataBag)Util.getPigConstant("{('3','g','h'),('4','g','h')}")); |
| expectedResults.put((Tuple)Util.getPigConstant("('i','j')"), |
| (DataBag)Util.getPigConstant("{('1','i','j'),('2','i','j')}")); |
| expectedResults.put((Tuple)Util.getPigConstant("('k','l')"), |
| (DataBag)Util.getPigConstant("{('3','k','l'),('4','k','l')}")); |
| expectedResults.put((Tuple)Util.getPigConstant("('m','n')"), |
| (DataBag)Util.getPigConstant("{('1','m','n'),('2','m','n')}")); |
| expectedResults.put((Tuple)Util.getPigConstant("('o','p')"), |
| (DataBag)Util.getPigConstant("{('3','o','p'),('4','o','p')}")); |
| String[] outputDirs = { "foo1", "foo2", "foo3", "foo4" }; |
| for(int k = 0; k < outputDirs.length; k++) { |
| myPig.registerQuery(templateLoad.replace("foo", outputDirs[k])); |
| Iterator<Tuple> it = myPig.openIterator("a"); |
| int numTuples = 0; |
| while(it.hasNext()) { |
| Tuple t = it.next(); |
| assertEquals(expectedResults.get(t.get(0)), t.get(1)); |
| numTuples++; |
| } |
| assertEquals(numTuples, 2); |
| } |
| // cleanup |
| for (int j = 0; j < toClean.length; j++) { |
| Util.deleteFile(new PigContext(Util.getLocalTestMode(), new Properties()), toClean[j]); |
| } |
| |
| } |
| |
    @Test
    public void testMultiQueryWithTwoStores2Execs() throws Exception {

        System.out.println("===== multi-query with 2 stores execs =====");

        myPig.setBatchOn();

        myPig.registerQuery("a = load 'passwd' " +
                "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
        myPig.registerQuery("b = filter a by uid > 5;");
        // Intermediate executeBatch() calls: the first has nothing to store,
        // the second runs only the first store. Verifies that incremental
        // batch execution does not break later statements.
        myPig.executeBatch();
        myPig.registerQuery("store b into 'output1';");
        myPig.executeBatch();
        myPig.registerQuery("c = group b by gid;");
        myPig.registerQuery("store c into 'output2';");

        myPig.executeBatch();
        myPig.discardBatch();
    }
| |
| @Test |
| public void testMultiQueryWithThreeStores2() throws Exception { |
| |
| System.out.println("===== multi-query with 3 stores (2) ====="); |
| |
| myPig.setBatchOn(); |
| |
| myPig.registerQuery("a = load 'passwd' " + |
| "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); |
| myPig.registerQuery("b = filter a by uid > 5;"); |
| myPig.registerQuery("store b into 'output1';"); |
| myPig.registerQuery("c = filter b by uid > 10;"); |
| myPig.registerQuery("store c into 'output2';"); |
| myPig.registerQuery("d = filter c by uid > 15;"); |
| myPig.registerQuery("store d into 'output3';"); |
| |
| myPig.executeBatch(); |
| myPig.discardBatch(); |
| } |
| |
| /** |
| * Test that pig calls checkOutputSpecs() method of the OutputFormat (if the |
| * StoreFunc defines an OutputFormat as the return value of |
| * {@link StoreFunc#getStorePreparationClass()} |
| * @throws IOException |
| */ |
| @Test |
| public void testMultiStoreWithOutputFormat() throws Exception { |
| Util.createLocalInputFile("input.txt", new String[] {"hello", "bye"}); |
| String query = "a = load 'input.txt';" + |
| "b = filter a by $0 < 10;" + |
| "store b into 'output1' using "+DUMMY_STORE_WITH_OUTPUTFORMAT_CLASS+"();" + |
| "c = group a by $0;" + |
| "d = foreach c generate group, COUNT(a.$0);" + |
| "store d into 'output2' using "+DUMMY_STORE_WITH_OUTPUTFORMAT_CLASS+"();" ; |
| myPig.setBatchOn(); |
| Util.registerMultiLineQuery(myPig, query); |
| myPig.executeBatch(); |
| |
| // check that files were created as a result of the |
| // checkOutputSpecs() method of the OutputFormat being called |
| FileSystem fs = FileSystem.getLocal(new Configuration()); |
| assertEquals(true, fs.exists(new Path("output1_checkOutputSpec_test"))); |
| assertEquals(true, fs.exists(new Path("output2_checkOutputSpec_test"))); |
| |
| Util.deleteFile(new PigContext(Util.getLocalTestMode(), new Properties()), "output1_checkOutputSpec_test"); |
| Util.deleteFile(new PigContext(Util.getLocalTestMode(), new Properties()), "output2_checkOutputSpec_test"); |
| } |
| |
    /**
     * Test that MultiQuery optimization won't use user's output for
     * ScalarExpression (and get empty output) at the end.
     */
    @Test
    public void testMultiQueryWithScalarExpression() throws Exception {

        System.out.println("===== multi-query with ScalarExpression =====");

        String[] inputData = {"john","henry", "adam"};
        Util.createLocalInputFile("queryInput.txt", inputData);

        myPig.setBatchOn();

        myPig.registerQuery("a = load 'queryInput.txt' using PigStorage() as (uname:chararray);");
        myPig.registerQuery("b = group a ALL;");
        myPig.registerQuery("c = foreach b generate COUNT(a) as count;");
        myPig.registerQuery("store c into 'output1';");
        myPig.registerQuery("z = load 'queryInput.txt' using PigStorage() as (uname:chararray);");
        // c.count is a scalar expression referencing the stored relation c;
        // multi-query must not read the user-facing 'output1' for it.
        myPig.registerQuery("y = foreach z generate uname, c.count;");
        myPig.registerQuery("store y into 'output2';");

        List<ExecJob> jobs = myPig.executeBatch();

        for (ExecJob job : jobs) {
            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
        }
        // Every row of output2 must carry the scalar count (3 input rows).
        myPig.registerQuery("aa = load 'output2' as (uname:chararray, cnt:int) ;");
        Iterator<Tuple> it = myPig.openIterator("aa");
        int i = 0;
        while(it.hasNext()) {
            Tuple t = it.next();
            i++;
            assertEquals(3, t.get(1));
        }
        assertEquals(3, i);
    }
| |
    // Fully-qualified name of the nested store func, spliced into Pig Latin
    // scripts. The "\\$" produces a literal backslash-dollar in the query
    // string -- presumably so Pig's parser does not read the nested-class '$'
    // as a positional-parameter reference. TODO confirm against Pig's lexer.
    private static final String DUMMY_STORE_WITH_OUTPUTFORMAT_CLASS
        = "org.apache.pig.test.TestMultiQueryBasic\\$DummyStoreWithOutputFormat";
| |
    /**
     * Minimal {@link StoreFunc} that writes nothing; it exists only to hand
     * Pig a {@link DummyOutputFormat} so the test can observe that
     * checkOutputSpecs() was invoked.
     */
    public static class DummyStoreWithOutputFormat extends StoreFunc {

        public DummyStoreWithOutputFormat() {
        }

        @Override
        public void putNext(Tuple f) throws IOException {
            // No-op: tuples are deliberately discarded.
        }

        @Override
        public void checkSchema(ResourceSchema s) throws IOException {
            // No-op: any schema is accepted.
        }

        @Override
        public org.apache.hadoop.mapreduce.OutputFormat getOutputFormat()
        throws IOException {
            return new DummyOutputFormat();
        }

        @Override
        public void prepareToWrite(
                org.apache.hadoop.mapreduce.RecordWriter writer)
        throws IOException {
            // No-op: no writer state is needed since putNext writes nothing.
        }

        @Override
        public String relToAbsPathForStoreLocation(String location, Path curDir)
        throws IOException {
            // Delegate to the standard LoadFunc path resolution.
            return LoadFunc.getAbsolutePath(location, curDir);
        }

        @Override
        public void setStoreLocation(String location, Job job)
        throws IOException {
            // Record the target dir so DummyOutputFormat.checkOutputSpecs can
            // derive its marker-file path from it.
            Configuration conf = job.getConfiguration();
            conf.set(MRConfiguration.OUTPUT_DIR, location);

        }

        @Override
        public void setStoreFuncUDFContextSignature(String signature) {
            // No-op: no UDF context state is kept.
        }

    }
| |
    /**
     * OutputFormat whose only observable behavior is creating a
     * "&lt;outputdir&gt;_checkOutputSpec_test" marker file in checkOutputSpecs(),
     * letting testMultiStoreWithOutputFormat assert that Pig called it.
     */
    @SuppressWarnings({ "unchecked" })
    public static class DummyOutputFormat
    extends OutputFormat<WritableComparable, Tuple> {

        public DummyOutputFormat() {

        }
        @Override
        public void checkOutputSpecs(JobContext context) throws IOException,
        InterruptedException {
            Configuration conf = context.getConfiguration();
            FileSystem fs = FileSystem.get(conf);
            // create a file to test that this method got called
            fs.create(new Path(conf.get(MRConfiguration.OUTPUT_DIR) + "_checkOutputSpec_test"));
        }

        @Override
        public OutputCommitter getOutputCommitter(TaskAttemptContext context)
        throws IOException, InterruptedException {
            // Returning null is sufficient for this test; presumably the
            // jobs never reach the commit phase with real data -- confirm
            // before reusing this class elsewhere.
            return null;
        }
        @Override
        public org.apache.hadoop.mapreduce.RecordWriter<WritableComparable, Tuple> getRecordWriter(
                TaskAttemptContext context) throws IOException,
                InterruptedException {
            // Null writer: no records are ever written by this format.
            return null;
        }

    }
| |
| // -------------------------------------------------------------------------- |
| // Helper methods |
| |
| private static void deleteOutputFiles() { |
| Util.deleteDirectory(new File("output1")); |
| Util.deleteDirectory(new File("output2")); |
| Util.deleteDirectory(new File("output3")); |
| Util.deleteDirectory(new File("output4")); |
| Util.deleteDirectory(new File("output5")); |
| } |
| } |