// blob: c4f65734bf2bee7279e4521374430efbeca8c9e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.data.Tuple;
import org.apache.pig.tools.pigstats.PigStats;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestNativeMapReduce {
//NOTE:
// Testing NativeMapReduce in LOCAL mode from unit test setup is not easy.
// (ie current WordCount.jar does not work as-is).
// the presence of build/classes/hadoop-site.xml created by MiniCluster
// in the class path makes the MR job try to contact the MiniCluster.
// if the MiniCluster shutdown is changed to delete the file, the other
// test cases fail because the file in classpath does not exist
// the jar has been created using the source at
// http://svn.apache.org/repos/asf/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/WordCount.java:816822
private String jarFileName = "test//org/apache/pig/test/data/TestWordCount.jar";
private String exp_msg_prefix = "Check if expected results contains: ";
final static String INPUT_FILE = "TestNMapReduceInputFile";
/**
*stop word file - used to test distributed cache usage, words in this
* file if specified will be skipped by the wordcount udf
*/
final static String STOPWORD_FILE = "TestNMapReduceStopwFile";
static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
private PigServer pigServer = null;
/**
* TODO - Move to runtime jar creation approach
private void createWordCountJar() {
}*/
@BeforeClass
public static void oneTimeSetup() throws Exception{
String[] input = {
"one",
"two",
"three",
"three",
"two",
"three"
};
//for stop word file
String[] stopw = {
"one"
};
Util.createInputFile(cluster, INPUT_FILE, input);
Util.createLocalInputFile(STOPWORD_FILE, stopw);
}
// createWordCountJar(){
// // its a manual process
// javac -cp build/ivy/lib/Pig/hadoop-core-0.20.2.jar:build/ivy/lib/Pig/commons-cli-1.2.jar test/org/apache/pig/test/utils/WordCount.java
// cd test/
// jar -cf WordCount.jar org/apache/pig/test/utils/WordCount*class
// mv WordCount.jar org/apache/pig/test/data/TestWordCount.jar
//
//
//}
@Before
public void setUp() throws Exception{
pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
//createWordCountJar();
}
@AfterClass
public static void oneTimeTearDown() throws Exception {
Util.deleteFile(cluster, INPUT_FILE);
new File(STOPWORD_FILE).delete();
cluster.shutDown();
}
// See PIG-506
@Test
public void testNativeMRJobSimple() throws Exception{
try{
Collection<String> results = new HashSet<String>();
results.add("(two,2)");
results.add("(three,3)");
pigServer.setBatchOn();
pigServer.registerQuery("A = load '" + INPUT_FILE + "';");
//also test distributed cache using the stopwords file for udf
pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
"Store A into 'table_testNativeMRJobSimple_input' "+
"Load 'table_testNativeMRJobSimple_output' "+
"`org.apache.pig.test.utils.WordCount " +
" -Dmapred.child.java.opts='-Xmx1536m -Xms128m' " +
" -files " + STOPWORD_FILE +
" table_testNativeMRJobSimple_input table_testNativeMRJobSimple_output " +
STOPWORD_FILE + "`;");
pigServer.registerQuery("Store B into 'table_testNativeMRJobSimpleDir';");
List<ExecJob> execJobs = pigServer.executeBatch();
assertEquals("num of jobs", execJobs.size(), 1);
boolean foundNativeFeature = false;
for(ExecJob job : execJobs){
assertEquals("job status", job.getStatus(),JOB_STATUS.COMPLETED);
if(job.getStatistics().getFeatures().contains("NATIVE")){
foundNativeFeature = true;
}
}
assertTrue("foundNativeFeature", foundNativeFeature);
// Check the output
pigServer.registerQuery("C = load 'table_testNativeMRJobSimpleDir';");
Iterator<Tuple> iter = pigServer.openIterator("C");
Tuple t;
assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertFalse(iter.hasNext());
// check in interactive mode
iter = pigServer.openIterator("B");
assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertFalse(iter.hasNext());
}
finally{
// We have to manually delete intermediate mapreduce files
Util.deleteFile(cluster,"table_testNativeMRJobSimple_input");
Util.deleteFile(cluster,"table_testNativeMRJobSimple_output");
Util.deleteFile(cluster,"table_testNativeMRJobSimpleDir");
}
}
@Test
public void testNativeMRJobSimpleFailure() throws Exception{
try{
//test if correct return code is obtained when query fails
// the native MR is writing to an exisiting and should fail
Collection<String> results = new HashSet<String>();
results.add("(one,1)");
results.add("(two,2)");
results.add("(three,3)");
pigServer.setBatchOn();
pigServer.registerQuery("A = load '" + INPUT_FILE + "';");
pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
"Store A into 'table_testNativeMRJobSimple_input' "+
"Load 'table_testNativeMRJobSimple_output' "+
"`org.apache.pig.test.utils.WordCount table_testNativeMRJobSimple_input " + INPUT_FILE + "`;");
pigServer.registerQuery("Store B into 'table_testNativeMRJobSimpleDir';");
pigServer.executeBatch();
assertTrue("job failed", PigStats.get().getReturnCode() != 0);
} catch (JobCreationException e) {
// Running in Tez mode throw exception
assertTrue(e.getCause() instanceof FileAlreadyExistsException);
}
finally{
// We have to manually delete intermediate mapreduce files
Util.deleteFile(cluster, "table_testNativeMRJobSimple_input");
Util.deleteFile(cluster, "table_testNativeMRJobSimpleDir");
}
}
// See PIG-506
@Test
public void testNativeMRJobMultiStoreOnPred() throws Exception{
try{
Collection<String> results = new HashSet<String>();
results.add("(one,1)");
results.add("(two,2)");
results.add("(three,3)");
pigServer.setBatchOn();
pigServer.registerQuery("A = load '" + INPUT_FILE + "';");
pigServer.registerQuery("Store A into 'testNativeMRJobMultiStoreOnPredTemp';");
pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
"Store A into 'table_testNativeMRJobMultiStoreOnPred_input' "+
"Load 'table_testNativeMRJobMultiStoreOnPred_output' "+
"`org.apache.pig.test.utils.WordCount table_testNativeMRJobMultiStoreOnPred_input table_testNativeMRJobMultiStoreOnPred_output`;");
pigServer.registerQuery("Store B into 'table_testNativeMRJobMultiStoreOnPredDir';");
pigServer.executeBatch();
// Check the output
pigServer.registerQuery("C = load 'table_testNativeMRJobMultiStoreOnPredDir';");
Iterator<Tuple> iter = pigServer.openIterator("C");
Tuple t;
assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertFalse(iter.hasNext());
// check in interactive mode
iter = pigServer.openIterator("B");
assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertFalse(iter.hasNext());
}
finally{
Util.deleteFile(cluster,"testNativeMRJobMultiStoreOnPredTemp");
Util.deleteFile(cluster,"table_testNativeMRJobMultiStoreOnPred_input");
Util.deleteFile(cluster,"table_testNativeMRJobMultiStoreOnPred_output");
Util.deleteFile(cluster,"table_testNativeMRJobMultiStoreOnPredDir");
}
}
// See PIG-506
@Test
public void testNativeMRJobMultiQueryOpt() throws Exception{
try{
Collection<String> results = new HashSet<String>();
results.add("(one,1)");
results.add("(two,2)");
results.add("(three,3)");
pigServer.registerQuery("A = load '" + INPUT_FILE + "';");
pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
"Store A into 'table_testNativeMRJobMultiQueryOpt_inputB' "+
"Load 'table_testNativeMRJobMultiQueryOpt_outputB' "+
"`org.apache.pig.test.utils.WordCount table_testNativeMRJobMultiQueryOpt_inputB table_testNativeMRJobMultiQueryOpt_outputB`;");
pigServer.registerQuery("C = mapreduce '" + jarFileName + "' " +
"Store A into 'table_testNativeMRJobMultiQueryOpt_inputC' "+
"Load 'table_testNativeMRJobMultiQueryOpt_outputC' "+
"`org.apache.pig.test.utils.WordCount table_testNativeMRJobMultiQueryOpt_inputC table_testNativeMRJobMultiQueryOpt_outputC`;");
Iterator<Tuple> iter = pigServer.openIterator("C");
Tuple t;
assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertFalse(iter.hasNext());
iter = pigServer.openIterator("B");
assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertFalse(iter.hasNext());
}finally{
Util.deleteFile(cluster,"table_testNativeMRJobMultiQueryOpt_inputB");
Util.deleteFile(cluster,"table_testNativeMRJobMultiQueryOpt_outputB");
Util.deleteFile(cluster,"table_testNativeMRJobMultiQueryOpt_inputC");
Util.deleteFile(cluster,"table_testNativeMRJobMultiQueryOpt_outputC");
}
}
// See PIG-506
@Test
public void testNativeMRJobTypeCastInserter() throws Exception{
try{
Collection<String> results = new HashSet<String>();
results.add("(2)");
results.add("(3)");
results.add("(4)");
pigServer.registerQuery("A = load '" + INPUT_FILE + "';");
pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
"Store A into 'table_testNativeMRJobTypeCastInserter_input' "+
"Load 'table_testNativeMRJobTypeCastInserter_output' as (name:chararray, count: int)"+
"`org.apache.pig.test.utils.WordCount table_testNativeMRJobTypeCastInserter_input table_testNativeMRJobTypeCastInserter_output`;");
pigServer.registerQuery("C = foreach B generate count+1;");
Iterator<Tuple> iter = pigServer.openIterator("C");
Tuple t;
assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertFalse(iter.hasNext());
}finally{
Util.deleteFile(cluster,"table_testNativeMRJobTypeCastInserter_input");
Util.deleteFile(cluster,"table_testNativeMRJobTypeCastInserter_output");
}
}
}