/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.CollectableLoadFunc;
import org.apache.pig.ExecType;
import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
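/**
 * Tests for map-side (merge) COGROUP, exercised through dummy loaders that
 * implement {@link CollectableLoadFunc} and {@link IndexableLoadFunc}.
 */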
public class TestMapSideCogroup {
private static final String INPUT_FILE1 = "testCogrpInput1.txt";
private static final String INPUT_FILE2 = "testCogrpInput2.txt";
private static final String INPUT_FILE3 = "testCogrpInput3.txt";
private static final String INPUT_FILE4 = "testCogrpInput4.txt";
private static final String INPUT_FILE5 = "testCogrpInput5.txt";
private static final String EMPTY_FILE = "empty.txt";
private static final String DATA_WITH_NULL_KEYS = "null.txt";
private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
@Before
public void setUp() throws Exception {
Util.deleteFile(cluster, INPUT_FILE1);
Util.deleteFile(cluster, INPUT_FILE2);
Util.deleteFile(cluster, INPUT_FILE3);
Util.deleteFile(cluster, INPUT_FILE4);
Util.deleteFile(cluster, INPUT_FILE5);
Util.deleteFile(cluster, EMPTY_FILE);
Util.deleteFile(cluster, DATA_WITH_NULL_KEYS);
int LOOP_SIZE = 3;
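// input: keys 1..LOOP_SIZE, each paired with values 1..LOOP_SIZE (tab-separated key\tvalue).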
String[] input = new String[LOOP_SIZE*LOOP_SIZE];
int k = 0;
for(int i = 1; i <= LOOP_SIZE; i++) {
String si = i + "";
for(int j=1;j<=LOOP_SIZE;j++)
input[k++] = si + "\t" + j;
}
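// input2: keys LOOP_SIZE+1..3*LOOP_SIZE (4..9), disjoint from input.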
String[] input2 = new String[LOOP_SIZE*2*LOOP_SIZE];
k = 0;
for(int i = LOOP_SIZE + 1; i <= 3*LOOP_SIZE ; i++) {
String si = i + "";
for(int j=1;j<=LOOP_SIZE;j++)
input2[k++] = si + "\t" + j;
}
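// input3: keys LOOP_SIZE..2*LOOP_SIZE-1 (3..5).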
String[] input3 = new String[LOOP_SIZE*LOOP_SIZE];
k = 0;
for(int i = LOOP_SIZE ; i < 2*LOOP_SIZE ; i++) {
String si = i + "";
for(int j=1;j<=LOOP_SIZE;j++)
input3[k++] = si + "\t" + j;
}
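// dataWithNullKeys: like input, but records for i == 1 have an empty key field, producing null keys.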
String[] dataWithNullKeys = new String[LOOP_SIZE*LOOP_SIZE];
k = 0;
for(int i = 1; i <= LOOP_SIZE ; i++) {
String si;
if(i == 1)
si = "";
else
si = i + "";
for(int j=1;j<=LOOP_SIZE;j++){
dataWithNullKeys[k++] = si + "\t" + j;
}
}
Util.createInputFile(cluster, INPUT_FILE1, input);
Util.createInputFile(cluster, INPUT_FILE2, input);
Util.createInputFile(cluster, INPUT_FILE3, input);
Util.createInputFile(cluster, INPUT_FILE4, input2);
Util.createInputFile(cluster, INPUT_FILE5, input3);
Util.createInputFile(cluster, EMPTY_FILE, new String[]{});
Util.createInputFile(cluster, DATA_WITH_NULL_KEYS, dataWithNullKeys);
}
@After
public void tearDown() throws Exception {
Util.deleteFile(cluster, INPUT_FILE1);
Util.deleteFile(cluster, INPUT_FILE2);
Util.deleteFile(cluster, INPUT_FILE3);
Util.deleteFile(cluster, INPUT_FILE4);
Util.deleteFile(cluster, INPUT_FILE5);
Util.deleteFile(cluster, EMPTY_FILE);
Util.deleteFile(cluster, DATA_WITH_NULL_KEYS);
}
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
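// Merge cogroup should compile into a POMergeCogroup physical operator and a
// two-job MR plan: one map-only job plus one job with both map and reduce phases.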
@Test
public void testCompilation(){
try{
PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
String query = "A = LOAD 'data1' using "+ DummyCollectableLoader.class.getName() +"() as (id, name, grade);" +
"B = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);" +
"D = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);" +
"C = cogroup A by id, B by id, D by id using 'merge';" +
"store C into 'output';";
LogicalPlan lp = Util.buildLp(pigServer, query);
Operator op = lp.getSinks().get(0);
LOCogroup cogrp = (LOCogroup)lp.getPredecessors(op).get(0);
assertEquals(LOCogroup.GROUPTYPE.MERGE, cogrp.getGroupType());
PigContext pc = new PigContext(ExecType.MAPREDUCE,cluster.getProperties());
pc.connect();
PhysicalPlan phyP = Util.buildPp(pigServer, query);
PhysicalOperator phyOp = phyP.getLeaves().get(0);
assertTrue(phyOp instanceof POStore);
phyOp = phyOp.getInputs().get(0);
assertTrue(phyOp instanceof POMergeCogroup);
MROperPlan mrPlan = Util.buildMRPlan(phyP,pc);
assertEquals(2,mrPlan.size());
Iterator<MapReduceOper> itr = mrPlan.iterator();
List<MapReduceOper> opers = new ArrayList<MapReduceOper>();
opers.add(itr.next());
opers.add(itr.next());
// Iteration order of entrySet is not guaranteed on JDK 1.7, so sort the operators first.
Collections.sort(opers);
assertTrue(opers.get(0).reducePlan.isEmpty());
assertFalse(opers.get(0).mapPlan.isEmpty());
assertFalse(opers.get(1).reducePlan.isEmpty());
assertFalse(opers.get(1).mapPlan.isEmpty());
} catch(Exception e){
e.printStackTrace();
fail("Compilation of merged cogroup failed.");
}
}
// @Test // PIG-2018
// public void testFailure1() throws Exception{
// PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
// String query = "A = LOAD 'data1' using "+ DummyCollectableLoader.class.getName() +"() as (id, name, grade);" +
// "E = group A by id;" +
// "B = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);" +
// "D = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);" +
// "C = cogroup E by A.id, B by id, D by id using 'merge';" +
// "store C into 'output';";
// LogicalPlan lp = Util.buildLp(pigServer, query);
// Operator op = lp.getSinks().get(0);
// LOCogroup cogrp = (LOCogroup)lp.getPredecessors(op).get(0);
// assertEquals(LOCogroup.GROUPTYPE.MERGE, cogrp.getGroupType());
//
// PigContext pc = new PigContext(ExecType.MAPREDUCE,cluster.getProperties());
// pc.connect();
// boolean exceptionCaught = false;
// try{
// Util.buildPp(pigServer, query);
// }catch (java.lang.reflect.InvocationTargetException e){
// FrontendException ex = (FrontendException)e.getTargetException();
// assertEquals(1103,ex.getErrorCode());
// exceptionCaught = true;
// }
// assertTrue(exceptionCaught);
// }
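// 'inner' is not supported on the inputs of a merge cogroup; building the
// physical plan should fail with a FrontendException.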
@Test
public void testFailure2() throws Exception{
PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
String query = "A = LOAD 'data1' using "+ DummyCollectableLoader.class.getName() +"() as (id, name, grade);" +
"B = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);" +
"D = LOAD 'data2' using "+ DummyIndexableLoader.class.getName() +"() as (id, name, grade);" +
"C = cogroup A by id inner, B by id, D by id inner using 'merge';" +
"store C into 'output';";
LogicalPlan lp = Util.buildLp(pigServer, query);
Operator op = lp.getSinks().get(0);
LOCogroup cogrp = (LOCogroup)lp.getPredecessors(op).get(0);
assertEquals(LOCogroup.GROUPTYPE.MERGE, cogrp.getGroupType());
PigContext pc = new PigContext(ExecType.MAPREDUCE,cluster.getProperties());
pc.connect();
boolean exceptionCaught = false;
try{
Util.buildPp(pigServer, query);
}catch (FrontendException e){
exceptionCaught = true;
}
assertTrue(exceptionCaught);
}
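// Two-way merge cogroup over identical inputs: each key 1..3 groups three
// tuples from each side.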
@Test
public void testSimple() throws Exception{
PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' using "+ DummyCollectableLoader.class.getName() +"() as (c1:chararray,c2:int);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' using "+ DummyIndexableLoader.class.getName() +"() as (c1:chararray,c2:int);");
DataBag dbMergeCogrp = BagFactory.getInstance().newDefaultBag();
pigServer.registerQuery("C = cogroup A by c1, B by c1 using 'merge';");
Iterator<Tuple> iter = pigServer.openIterator("C");
while(iter.hasNext()) {
Tuple t = iter.next();
dbMergeCogrp.add(t);
}
String[] results = new String[]{
"(1,{(1,1),(1,2),(1,3)},{(1,1),(1,2),(1,3)})",
"(2,{(2,2),(2,1),(2,3)},{(2,1),(2,2),(2,3)})",
"(3,{(3,3),(3,2),(3,1)},{(3,1),(3,2),(3,3)})"
};
assertEquals(3, dbMergeCogrp.size());
Iterator<Tuple> itr = dbMergeCogrp.iterator();
for(int i=0; i<3; i++){
assertEquals(results[i], itr.next().toString());
}
assertFalse(itr.hasNext());
}
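// Three-way merge cogroup: one CollectableLoadFunc input followed by two
// IndexableLoadFunc inputs.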
@Test
public void test3Way() throws Exception{
PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' using "+ DummyCollectableLoader.class.getName() +"() as (c1:chararray,c2:int);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' using "+ DummyIndexableLoader.class.getName() +"() as (c1:chararray,c2:int);");
pigServer.registerQuery("E = LOAD '" + INPUT_FILE3 + "' using "+ DummyIndexableLoader.class.getName() +"() as (c1:chararray,c2:int);");
DataBag dbMergeCogrp = BagFactory.getInstance().newDefaultBag();
pigServer.registerQuery("C = cogroup A by c1, B by c1, E by c1 using 'merge';");
Iterator<Tuple> iter = pigServer.openIterator("C");
while(iter.hasNext()) {
Tuple t = iter.next();
dbMergeCogrp.add(t);
}
String[] results = new String[]{
"(1,{(1,1),(1,2),(1,3)},{(1,1),(1,2),(1,3)},{(1,1),(1,2),(1,3)})",
"(2,{(2,2),(2,1),(2,3)},{(2,1),(2,2),(2,3)},{(2,1),(2,2),(2,3)})",
"(3,{(3,2),(3,3),(3,1)},{(3,1),(3,2),(3,3)},{(3,1),(3,2),(3,3)})"
};
assertEquals(3, dbMergeCogrp.size());
Iterator<Tuple> itr = dbMergeCogrp.iterator();
for(int i=0; i<3; i++){
assertEquals(results[i], itr.next().toString());
}
assertFalse(itr.hasNext());
}
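// The left side loads two files (hence multiple splits); the right side only
// covers keys 3..5, so every other group gets an empty right bag.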
@Test
public void testMultiSplits() throws Exception{
PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "," + INPUT_FILE4 + "' using "+ DummyCollectableLoader.class.getName() +"() as (c1:chararray,c2:int);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' using "+ DummyIndexableLoader.class.getName() +"() as (c1:chararray,c2:int);");
List<Tuple> dbMergeCogrp = new ArrayList<Tuple>();
pigServer.registerQuery("C = cogroup A by c1, B by c1 using 'merge';");
Iterator<Tuple> iter = pigServer.openIterator("C");
while(iter.hasNext()) {
Tuple t = iter.next();
dbMergeCogrp.add(t);
}
String[] results = new String[]{
"(4,{(4,1),(4,2),(4,3)},{(4,1),(4,2),(4,3)})",
"(5,{(5,2),(5,1),(5,3)},{(5,1),(5,2),(5,3)})",
"(6,{(6,1),(6,2),(6,3)},{})",
"(7,{(7,1),(7,2),(7,3)},{})",
"(8,{(8,1),(8,2),(8,3)},{})",
"(9,{(9,1),(9,2),(9,3)},{})",
"(1,{(1,1),(1,2),(1,3)},{})",
"(2,{(2,1),(2,2),(2,3)},{})",
"(3,{(3,3),(3,2),(3,1)},{(3,1),(3,2),(3,3)})"
};
List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(results);
// Sort dbMergeCogrp (and the bags inside each tuple) because tuple order differs
// between Spark and other execution modes when multiple files are loaded
// (LOAD 'INPUT_FILE1,INPUT_FILE4').
for (Tuple t : dbMergeCogrp) {
Util.convertBagToSortedBag(t);
}
for (Tuple t : expected) {
Util.convertBagToSortedBag(t);
}
Collections.sort(dbMergeCogrp);
Collections.sort(expected);
assertEquals(expected.size(), dbMergeCogrp.size());
// DummyIndexableLoader.getNext() does not apply the declared schema to each
// input tuple, so Util#checkQueryOutputsAfterSortRecursive cannot be used here.
// The schema of C is (int,{(chararray,int),...},{(chararray,int),...}), but the
// tuples in dbMergeCogrp carry (chararray,chararray) fields in the second bag,
// so compare string representations instead.
Iterator<Tuple> itr = dbMergeCogrp.iterator();
for (int i = 0; i < dbMergeCogrp.size(); i++) {
assertEquals(expected.get(i).toString(), itr.next().toString());
}
}
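// Merge cogroup on a compound key (c1,c2): each (key,value) pair forms its own group.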
@Test
public void testCogrpOnMultiKeys() throws Exception{
PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' using "+ DummyCollectableLoader.class.getName() +"() as (c1:chararray,c2:chararray);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' using "+ DummyIndexableLoader.class.getName() +"() as (c1:chararray,c2:chararray);");
DataBag dbMergeCogrp = BagFactory.getInstance().newDefaultBag();
pigServer.registerQuery("C = cogroup A by (c1,c2) , B by (c1,c2) using 'merge' ;");
Iterator<Tuple> iter = pigServer.openIterator("C");
while(iter.hasNext()) {
Tuple t = iter.next();
dbMergeCogrp.add(t);
}
String[] results = new String[]{
"((1,1),{(1,1)},{(1,1)})",
"((1,2),{(1,2)},{(1,2)})",
"((1,3),{(1,3)},{(1,3)})",
"((2,1),{(2,1)},{(2,1)})",
"((2,2),{(2,2)},{(2,2)})",
"((2,3),{(2,3)},{(2,3)})",
"((3,1),{(3,1)},{(3,1)})",
"((3,2),{(3,2)},{(3,2)})",
"((3,3),{(3,3)},{(3,3)})"
};
assertEquals(9, dbMergeCogrp.size());
Iterator<Tuple> itr = dbMergeCogrp.iterator();
for(int i=0; i<9; i++){
assertEquals(results[i], itr.next().toString());
}
assertFalse(itr.hasNext());
}
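// The IndexableLoadFunc side is an empty file, so every group gets an empty right bag.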
@Test
public void testEmptyDeltaFile() throws Exception{
PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' using "+ DummyCollectableLoader.class.getName() +"() as (c1:chararray,c2:int);");
pigServer.registerQuery("B = LOAD '" + EMPTY_FILE + "' using "+ DummyIndexableLoader.class.getName() +"() as (c1:chararray,c2:int);");
DataBag dbMergeCogrp = BagFactory.getInstance().newDefaultBag();
pigServer.registerQuery("C = cogroup A by c1, B by c1 using 'merge';");
Iterator<Tuple> iter = pigServer.openIterator("C");
while(iter.hasNext()) {
Tuple t = iter.next();
dbMergeCogrp.add(t);
}
String[] results = new String[]{
"(1,{(1,1),(1,2),(1,3)},{})",
"(2,{(2,1),(2,2),(2,3)},{})",
"(3,{(3,1),(3,2),(3,3)},{})"
};
assertEquals(3, dbMergeCogrp.size());
Iterator<Tuple> itr = dbMergeCogrp.iterator();
for(int i=0; i<3; i++){
assertEquals(results[i], itr.next().toString());
}
assertFalse(itr.hasNext());
}
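// Null keys must not be grouped across inputs: each side's null-key tuples
// form their own group.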
@Test
public void testDataWithNullKeys() throws Exception{
PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("A = LOAD '" + DATA_WITH_NULL_KEYS + "' using "+ DummyCollectableLoader.class.getName() +"() as (c1:chararray,c2:int);");
pigServer.registerQuery("B = LOAD '" + DATA_WITH_NULL_KEYS + "' using "+ DummyIndexableLoader.class.getName() +"() as (c1:chararray,c2:int);");
String[] results = new String[]{
"(,{(,1),(,2),(,3)},{})",
"(,{},{(,1),(,2),(,3)})",
"(2,{(2,3),(2,1),(2,2)},{(2,1),(2,2),(2,3)})",
"(3,{(3,3),(3,1),(3,2)},{(3,1),(3,2),(3,3)})"
};
DataBag dbMergeCogrp = BagFactory.getInstance().newDefaultBag();
pigServer.registerQuery("C = cogroup A by c1, B by c1 using 'merge';");
Iterator<Tuple> iter = pigServer.openIterator("C");
while(iter.hasNext()) {
Tuple t = iter.next();
dbMergeCogrp.add(t);
}
assertEquals(4, dbMergeCogrp.size());
Iterator<Tuple> itr = dbMergeCogrp.iterator();
for(int i=0; i<4; i++){
assertEquals(results[i], itr.next().toString());
}
assertFalse(itr.hasNext());
}
/**
* A dummy loader which implements {@link CollectableLoadFunc}
*/
public static class DummyCollectableLoader extends PigStorage implements CollectableLoadFunc{
public DummyCollectableLoader() {
}
@Override
public void ensureAllKeyInstancesInSameSplit() throws IOException {
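// No-op for this dummy loader.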
}
}
/**
* A dummy loader which implements {@link IndexableLoadFunc}
*/
public static class DummyIndexableLoader extends PigStorage implements IndexableLoadFunc {
private String loc;
private FSDataInputStream is;
public DummyIndexableLoader() {
}
@Override
public void close() throws IOException {
if (is != null)
is.close();
}
@Override
public void seekNear(Tuple keys) throws IOException {
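// No-op: this dummy loader simply reads the file sequentially from the start.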
}
@Override
public void initialize(Configuration conf) throws IOException {
is = FileSystem.get(new Path(loc).toUri(), conf).open(new Path(loc));
}
@Override
public void setLocation(String location, Job job) throws IOException {
super.setLocation(location, job);
loc = location;
}
@Override
public Tuple getNext() throws IOException {
String line = is.readLine();
if (line == null)
return null;
// Split the raw line on tabs and append each field as a plain string;
// the declared schema is deliberately not applied to these tuples.
String[] members = line.split("\t");
DefaultTuple tuple = new DefaultTuple();
for (String member : members)
tuple.append(member);
return tuple;
}
}
}