blob: f50eac3d849fd827fdd43fde440fb187d1459a9c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.apache.pig.builtin.mock.Storage.resetData;
import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.pig.PigServer;
import org.apache.pig.ResourceSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.PropertiesUtil;
import org.apache.pig.scripting.ScriptEngine;
import org.apache.pig.tools.grunt.Grunt;
import org.apache.pig.tools.grunt.GruntParser;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.ScriptState;
import org.junit.Before;
import org.junit.Test;
import com.google.common.io.Files;
/**
 * Tests for {@link PigServer} that run against the local execution type returned by
 * {@code Util.getLocalTestMode()}. Some expectations differ for Tez and Spark modes;
 * see the individual tests.
 */
public class TestPigServerLocal {
    private static final String PASSWD = "test/org/apache/pig/test/data/passwd";
    private static final String PASSWD2 = "test/org/apache/pig/test/data/passwd2";

    /** Fresh scratch directory for each test; also appended to the system class loader. */
    private File tempDir;

    @Before
    public void setUp() throws Exception {
        tempDir = Files.createTempDir();
        // NOTE: File.deleteOnExit() only removes a directory that is empty when the JVM exits,
        // so directories that tests write outputs/scripts into are left behind.
        tempDir.deleteOnExit();
        registerNewResource(tempDir.getAbsolutePath());
    }

    /**
     * Dynamically adds {@code file} to the search path of the system class loader so that
     * resources placed there can be found through it.
     *
     * <p>Calls the protected {@code URLClassLoader.addURL} via reflection; the cast below throws
     * {@link ClassCastException} if the system class loader is not a {@link URLClassLoader}.
     *
     * @param file path of the file or directory to add
     */
    private static void registerNewResource(String file) throws Exception {
        URL urlToAdd = new File(file).toURI().toURL();
        URLClassLoader sysLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
        Method addMethod = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
        addMethod.setAccessible(true);
        addMethod.invoke(sysLoader, urlToAdd);
    }

    /**
     * Asserts that {@code actual} yields exactly {@code expected.size()} tuples and that the
     * first field of each one equals (by {@code toString()}) the first field of the expected
     * tuple at the same position.
     */
    private static void assertFirstFieldsMatch(List<Tuple> expected, Iterator<Tuple> actual)
            throws ExecException {
        int index = 0;
        while (actual.hasNext()) {
            Tuple tuple = actual.next();
            assertTrue("More tuples returned than the " + expected.size() + " expected",
                    index < expected.size());
            assertEquals(expected.get(index).get(0).toString(), tuple.get(0).toString());
            index++;
        }
        // Guards against the loop above passing vacuously on an empty/short result.
        assertEquals("Unexpected number of tuples", expected.size(), index);
    }

    /**
     * Checks that {@code $input} in a registered script file is substituted from a parameter
     * map, from a parameter file, and that the map value overrides the parameter file.
     */
    @Test
    public void testParamSubstitution() throws Exception {
        // using params map
        PigServer pig = new PigServer(Util.getLocalTestMode());
        Map<String, String> params = new HashMap<String, String>();
        params.put("input", PASSWD);
        File scriptFile = Util.createFile(new String[]{"a = load '$input' using PigStorage(':');"});
        pig.registerScript(scriptFile.getAbsolutePath(), params);
        Iterator<Tuple> iter = pig.openIterator("a");
        assertFirstFieldsMatch(Util.readFile2TupleList(PASSWD, ":"), iter);

        // using param file
        pig = new PigServer(Util.getLocalTestMode());
        List<String> paramFile = new ArrayList<String>();
        paramFile.add(Util.createFile(new String[]{"input=" + PASSWD2}).getAbsolutePath());
        pig.registerScript(scriptFile.getAbsolutePath(), paramFile);
        iter = pig.openIterator("a");
        assertFirstFieldsMatch(Util.readFile2TupleList(PASSWD2, ":"), iter);

        // using both param value and param file, param value should override param file
        pig = new PigServer(Util.getLocalTestMode());
        pig.registerScript(scriptFile.getAbsolutePath(), params, paramFile);
        iter = pig.openIterator("a");
        assertFirstFieldsMatch(Util.readFile2TupleList(PASSWD, ":"), iter);
    }

    // PIG-3469
    @Test
    public void testNonExistingSecondDirectoryInSkewJoin() throws Exception {
        PigServer pig = new PigServer(Util.getLocalTestMode());
        // Store into this test's temp dir rather than a fixed /tmp path so runs cannot collide
        // and the test does not depend on a Unix-style /tmp.
        String output = Util.generateURI(tempDir.toString(), pig.getPigContext())
                + "/testNonExistingSecondDirectoryInSkewJoin";
        String script =
            "exists = LOAD 'test/org/apache/pig/test/data/InputFiles/jsTst1.txt' AS (x:chararray, a:long);" +
            "missing = LOAD '/non/existing/directory' AS (a:long);" +
            "missing = FOREACH ( GROUP missing BY a ) GENERATE $0 AS a, COUNT_STAR($1);" +
            "joined = JOIN exists BY a, missing BY a USING 'skewed';" +
            "STORE joined INTO '" + output + "';";
        if (Util.getLocalTestMode().toString().startsWith("TEZ")) {
            try {
                pig.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")));
                fail("Expect front exception");
            } catch (Exception ex) {
                // Expected cause chain:
                // FrontendException -> VisitorException -> ExecException -> InvalidInputException
                Throwable excp = ex;
                assertTrue("Unexpected exception: " + excp, excp instanceof FrontendException);
                excp = excp.getCause();
                assertTrue("Unexpected cause: " + excp, excp instanceof VisitorException);
                excp = excp.getCause();
                assertTrue("Unexpected cause: " + excp, excp instanceof ExecException);
                excp = excp.getCause();
                assertTrue("Unexpected cause: " + excp, excp instanceof InvalidInputException);
            }
        } else {
            // Execution of the script should fail, but without throwing any exceptions (such as
            // NPE). Any exception propagates and fails the test with its full stack trace.
            pig.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")));
        }
    }

    /**
     * Same checks as {@link #testParamSubstitution()}, but the script is built in memory and
     * registered from a {@link ByteArrayInputStream}.
     */
    @Test
    public void testRegisterScriptFromStream() throws Exception {
        // using params map
        PigServer pig = new PigServer(Util.getLocalTestMode());
        Map<String, String> params = new HashMap<String, String>();
        params.put("input", PASSWD);
        String script = "a = load '$input' using PigStorage(':');";
        pig.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")), params);
        assertEquals("ScriptState contains different script", script, ScriptState.get().getScript());
        Iterator<Tuple> iter = pig.openIterator("a");
        assertFirstFieldsMatch(Util.readFile2TupleList(PASSWD, ":"), iter);

        // using param file
        pig = new PigServer(Util.getLocalTestMode());
        List<String> paramFile = new ArrayList<String>();
        paramFile.add(Util.createFile(new String[]{"input=" + PASSWD2}).getAbsolutePath());
        pig.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")), paramFile);
        iter = pig.openIterator("a");
        assertFirstFieldsMatch(Util.readFile2TupleList(PASSWD2, ":"), iter);

        // using both param value and param file, param value should override param file
        pig = new PigServer(Util.getLocalTestMode());
        pig.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")), params, paramFile);
        iter = pig.openIterator("a");
        assertFirstFieldsMatch(Util.readFile2TupleList(PASSWD, ":"), iter);
    }

    @Test
    public void testSecondarySort() throws Exception {
        PigServer pigServer = new PigServer(Util.getLocalTestMode());
        Data data = resetData(pigServer);
        data.set("foo",
                tuple("a", 1, "b"),
                tuple("b", 2, "c"),
                tuple("c", 3, "d"));
        pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage() AS (f1:chararray,f2:int,f3:chararray);");
        pigServer.registerQuery("B = order A by f1,f2,f3 DESC;");
        pigServer.registerQuery("STORE B INTO 'bar' USING mock.Storage();");
        List<Tuple> out = data.get("bar");
        assertTrue("No output was stored to 'bar'", out != null);
        assertEquals(3, out.size());
        assertEquals(tuple("a", 1, "b"), out.get(0));
        assertEquals(tuple("b", 2, "c"), out.get(1));
        assertEquals(tuple("c", 3, "d"), out.get(2));
    }

    @Test(expected = RuntimeException.class)
    public void testLocationStrictCheck() throws Exception {
        Properties properties = PropertiesUtil.loadDefaultProperties();
        properties.setProperty("pig.location.check.strict", "true");
        PigServer pigServer = new PigServer(Util.getLocalTestMode(), properties);
        Data data = resetData(pigServer);
        data.set("foo",
                tuple("a", 1, "b"),
                tuple("b", 2, "c"),
                tuple("c", 3, "d"));
        pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage() AS (f1:chararray,f2:int,f3:chararray);");
        pigServer.registerQuery("B = order A by f1,f2,f3 DESC;");
        pigServer.registerQuery("C = order A by f1,f2,f3;");
        // Storing to same location 'bar' should throw a RuntimeException. Nothing may follow
        // these statements: any later assertion would be unreachable when the test passes.
        pigServer.registerQuery("STORE B INTO 'bar' USING mock.Storage();");
        pigServer.registerQuery("STORE C INTO 'bar' USING mock.Storage();");
    }

    @Test
    public void testSkipParseInRegisterForBatch() throws Throwable {
        String mode = Util.getLocalTestMode().toString();
        if (mode.startsWith("TEZ")) {
            _testSkipParseInRegisterForBatch(false, 8, 4);
            _testSkipParseInRegisterForBatch(true, 5, 1);
            _testParseBatchWithScripting(5, 1);
        } else if (mode.startsWith("SPARK")) {
            // 6 = 4 (Once per registerQuery) + 2 (SortConverter , PigRecordReader)
            // 4 (Once per registerQuery)
            _testSkipParseInRegisterForBatch(false, 6, 4);
            // 3 = 1 (registerQuery) + 2 (SortConverter, PigRecordReader)
            // 1 (registerQuery)
            _testSkipParseInRegisterForBatch(true, 3, 1);
            _testParseBatchWithScripting(3, 1);
        } else {
            // numTimesInitiated = 10. 4 (once per registerQuery) + 6 (launchPlan->RandomSampleLoader,
            // InputSizeReducerEstimator, getSplits->RandomSampleLoader,
            // createRecordReader->RandomSampleLoader, getSplits, createRecordReader)
            // numTimesSchemaCalled = 4 (once per registerQuery)
            _testSkipParseInRegisterForBatch(false, 10, 4);
            // numTimesInitiated = 7 (parseAndBuild, launchPlan->RandomSampleLoader,
            // InputSizeReducerEstimator, getSplits->RandomSampleLoader,
            // createRecordReader->RandomSampleLoader, getSplits, createRecordReader)
            // numTimesSchemaCalled = 1 (parseAndBuild)
            _testSkipParseInRegisterForBatch(true, 7, 1);
            _testParseBatchWithScripting(7, 1);
        }
    }

    @Test
    // See PIG-3967
    public void testGruntValidation() throws Exception {
        PigServer pigServer = new PigServer(Util.getLocalTestMode());
        Data data = resetData(pigServer);
        data.set("foo",
                tuple("a", 1, "b"),
                tuple("b", 2, "c"),
                tuple("c", 3, "d"));
        pigServer.setValidateEachStatement(true);
        pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage() AS (f1:chararray,f2:int,f3:chararray);");
        String outputDir = Util.generateURI(tempDir.toString(), pigServer.getPigContext());
        pigServer.registerQuery("store A into '" + outputDir + "/testGruntValidation1';");
        pigServer.registerQuery("B = LOAD 'foo' USING mock.Storage() AS (f1:chararray,f2:int,f3:chararray);");
        pigServer.registerQuery("store B into '" + outputDir + "/testGruntValidation2';"); // This should pass
        try {
            // This should fail due to output validation
            pigServer.registerQuery("store A into '" + outputDir + "/testGruntValidation1';");
            fail("Expected a FrontendException from output validation");
        } catch (FrontendException expected) {
            // expected: testGruntValidation1 already exists
        }
    }

    /**
     * Asserts that {@code out} contains exactly the first two rows produced by the
     * ORDER ... LIMIT 2 queries used in the batch tests below.
     */
    private static void assertLimitedSortedOutput(List<Tuple> out) throws Exception {
        assertTrue("No output was stored to 'bar'", out != null);
        assertEquals(2, out.size());
        assertEquals(tuple("a", 1, "b"), out.get(0));
        assertEquals(tuple("b", 2, "c"), out.get(1));
    }

    /**
     * Runs a LOAD/ORDER/LIMIT/STORE script through Grunt and checks how often
     * {@link MockTrackingStorage} was constructed and asked for its schema.
     *
     * @param skipParseInRegisterForBatch if true, run via {@code Grunt.exec()} (batch);
     *        otherwise via a non-interactive {@link GruntParser} over a {@link PigServer}
     * @param numTimesInitiated expected number of {@code MockTrackingStorage} constructions
     * @param numTimesSchemaCalled expected number of {@code getSchema} calls
     */
    private void _testSkipParseInRegisterForBatch(boolean skipParseInRegisterForBatch,
            int numTimesInitiated, int numTimesSchemaCalled) throws Throwable {
        MockTrackingStorage.numTimesInitiated = 0;
        MockTrackingStorage.numTimesSchemaCalled = 0;
        String query = "A = LOAD 'foo' USING " + MockTrackingStorage.class.getName() + "();\n" +
                "B = order A by $0,$1,$2;\n" +
                "C = LIMIT B 2;\n" +
                "STORE C INTO 'bar' USING mock.Storage();\n";
        BufferedReader in = new BufferedReader(new StringReader(query));
        Properties properties = new Properties();
        properties.setProperty("io.sort.mb", "2");
        PigContext pigContext = new PigContext(Util.getLocalTestMode(), properties);
        Data data;
        if (skipParseInRegisterForBatch) {
            data = resetData(pigContext);
            data.set("foo", tuple("a", 1, "b"), tuple("b", 2, "c"), tuple("c", 3, "d"));
            Grunt grunt = new Grunt(in, pigContext);
            grunt.exec(); // Calls grunt.parseStopOnError(); which executes as batch
        } else {
            PigServer pigServer = new PigServer(pigContext);
            data = resetData(pigServer);
            data.set("foo", tuple("a", 1, "b"), tuple("b", 2, "c"), tuple("c", 3, "d"));
            GruntParser grunt = new GruntParser(in, pigServer);
            grunt.setInteractive(false);
            grunt.parseStopOnError(true); //not batch
        }
        assertEquals(numTimesInitiated, MockTrackingStorage.numTimesInitiated);
        assertEquals(numTimesSchemaCalled, MockTrackingStorage.numTimesSchemaCalled);
        assertLimitedSortedOutput(data.get("bar"));
    }

    /**
     * Runs the same query from a Jython script (Pig.compile / bind / runSingle) and checks the
     * {@link MockTrackingStorage} counters, the returned stats and the stored output.
     *
     * @param numTimesInitiated expected number of {@code MockTrackingStorage} constructions
     * @param numTimesSchemaCalled expected number of {@code getSchema} calls
     */
    private void _testParseBatchWithScripting(int numTimesInitiated, int numTimesSchemaCalled) throws Throwable {
        MockTrackingStorage.numTimesInitiated = 0;
        MockTrackingStorage.numTimesSchemaCalled = 0;
        String[] script = {
                "#!/usr/bin/python",
                "from org.apache.pig.scripting import *",
                "P = Pig.compile(\"\"\"" +
                "A = load 'foo' USING org.apache.pig.test.TestPigServerLocal\\$MockTrackingStorage();" +
                "B = order A by $0,$1,$2;" +
                "C = LIMIT B 2;" +
                "store C into 'bar' USING mock.Storage();" +
                "\"\"\")",
                "Q = P.bind()",
                "stats = Q.runSingle()",
                "if stats.isSuccessful():",
                "\tprint 'success!'",
                "else:",
                // String exceptions are rejected by Python 2.6+/Jython 2.7; raise a real exception.
                "\traise Exception('failed')"
        };
        Properties properties = new Properties();
        properties.setProperty("io.sort.mb", "2");
        PigContext pigContext = new PigContext(Util.getLocalTestMode(), properties);
        PigServer pigServer = new PigServer(pigContext);
        Data data = resetData(pigContext);
        data.set("foo", tuple("a", 1, "b"), tuple("b", 2, "c"), tuple("c", 3, "d"));
        String scriptFile = tempDir + File.separator + "_testParseBatchWithScripting.py";
        Util.createLocalInputFile(scriptFile, script);
        ScriptEngine scriptEngine = ScriptEngine.getInstance("jython");
        Map<String, List<PigStats>> statsMap = scriptEngine.run(pigServer.getPigContext(), scriptFile);
        for (List<PigStats> stats : statsMap.values()) {
            for (PigStats s : stats) {
                assertTrue(s.isSuccessful());
            }
        }
        assertEquals(numTimesInitiated, MockTrackingStorage.numTimesInitiated);
        assertEquals(numTimesSchemaCalled, MockTrackingStorage.numTimesSchemaCalled);
        assertLimitedSortedOutput(data.get("bar"));
    }

    /**
     * {@link Storage} that counts, in static fields, how many instances are constructed and how
     * often {@link #getSchema(String, Job)} is called. Callers reset both counters before use.
     */
    public static class MockTrackingStorage extends Storage {
        public static int numTimesInitiated = 0;
        public static int numTimesSchemaCalled = 0;

        public MockTrackingStorage() {
            super();
            numTimesInitiated++;
        }

        @Override
        public ResourceSchema getSchema(String location, Job job) throws IOException {
            numTimesSchemaCalled++;
            return super.getSchema(location, job);
        }
    }
}