blob: 1e1aa5577b6019ba726904390aeb08d62a6f1895 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.pig.PigRunner;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.scripting.ScriptEngine;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestScriptLanguage {
static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
cluster.shutDown();
}
@After
public void tearDown() throws Exception {
Util.deleteFile(cluster, "simple_out");
Util.deleteFile(cluster, "simple_out2");
}
private String createScript(String name, String[] script) throws Exception {
String scriptName = name + "_testScript.py";
Util.createLocalInputFile(scriptName, script);
return scriptName;
}
private Map<String, List<PigStats>> runScript(String name, String[] script) throws Exception {
String scriptName = name + "_testScript.py";
Util.createLocalInputFile(scriptName, script);
ScriptEngine scriptEngine = ScriptEngine.getInstance("jython");
PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
Map<String, List<PigStats>> statsMap = scriptEngine.run(pigServer.getPigContext(), scriptName);
return statsMap;
}
private PigStats runSimpleScript(String name, String[] script) throws Exception {
Map<String, List<PigStats>> statsMap = runScript(name, script);
assertEquals(1, statsMap.size());
Iterator<List<PigStats>> it = statsMap.values().iterator();
PigStats stats = it.next().get(0);
assertTrue("job should succeed", stats.isSuccessful());
return stats;
}
private PigStats runPigRunner(String name, String[] script, boolean shouldSucceed) throws Exception {
String scriptName = createScript(name, script);
String[] args = { "-x", cluster.getExecType().name().toLowerCase(), "-g", "jython", scriptName };
PigStats mainStats = PigRunner.run(args, new TestPigRunner.TestNotificationListener());
if (shouldSucceed) {
assertTrue(mainStats.isEmbedded());
assertTrue("job should succeed", mainStats.isSuccessful());
} else {
assertFalse("job should fail", mainStats.isSuccessful());
}
return mainStats;
}
private PigStats runPigRunner(String name, String[] script) throws Exception {
return runPigRunner(name, script, true);
}
@Test
public void firstTest() throws Exception {
String[] script = {
"#!/usr/bin/python",
"from org.apache.pig.scripting import *",
"Pig.fs(\"rmr simple_out\")",
"input = 'simple_table'",
"output = 'simple_out'",
"P = Pig.compile(\"\"\"a = load '$input';store a into '$output';\"\"\")",
"Q = P.bind({'input':input, 'output':output})",
"stats = Q.runSingle()",
"if stats.isSuccessful():",
"\tprint 'success!'",
"else:",
"\traise 'failed'"
};
String[] input = {
"1\t3",
"2\t4",
"3\t5"
};
Util.deleteFile(cluster, "simple_table");
Util.createInputFile(cluster, "simple_table", input);
PigStats stats = runSimpleScript("firstTest", script);
assertEquals(1, stats.getNumberJobs());
String name = stats.getOutputNames().get(0);
assertEquals("simple_out", name);
assertEquals(12, stats.getBytesWritten());
assertEquals(3, stats.getRecordWritten());
}
@Test
public void secondTest() throws Exception {
String[] script = {
"#!/usr/bin/python",
"from org.apache.pig.scripting import *",
"Pig.fs(\"rmr simple_out\")",
"input = 'simple_table_6'",
"output = 'simple_out'",
"P = Pig.compileFromFile(\"\"\"secondTest.pig\"\"\")",
"Q = P.bind({'input':input, 'output':output})",
"stats = Q.runSingle()",
"if stats.isSuccessful():",
"\tprint 'success!'",
"else:",
"\traise 'failed'"
};
String[] input = {
"1\t3",
"2\t4",
"3\t5"
};
String[] pigLatin = {
"-- ensure comment parsed correctly",
"a = load '$input';",
"store a into '$output';"
};
Util.createInputFile(cluster, "simple_table_6", input);
Util.createLocalInputFile( "secondTest.pig", pigLatin);
PigStats stats = runSimpleScript("secondTest", script);
assertEquals(1, stats.getNumberJobs());
String name = stats.getOutputNames().get(0);
assertEquals("simple_out", name);
assertEquals(12, stats.getBytesWritten());
assertEquals(3, stats.getRecordWritten());
}
@Test
public void firstParallelTest() throws Exception {
String[] script = {
"#!/usr/bin/python",
"from org.apache.pig.scripting import *",
"Pig.fs(\"rmr simple_out\")",
"Pig.fs(\"rmr simple_out2\")",
"input = 'simple_table_1'",
"output1 = 'simple_out'",
"output2 = 'simple_out2'",
"P = Pig.compile(\"mypipeline\", \"\"\"a = load '$input';store a into '$output';\"\"\")",
"Q = P.bind([{'input':input, 'output':output1}, {'input':input, 'output':output2}])",
"stats = Q.run()"
};
String[] input = {
"1\t3",
"2\t4",
"3\t5"
};
Util.createInputFile(cluster, "simple_table_1", input);
Map<String, List<PigStats>> statsMap = runScript("firstParallelTest", script);
assertEquals(1, statsMap.size());
assertEquals("mypipeline", statsMap.keySet().iterator().next());
List<PigStats> lst = statsMap.get("mypipeline");
assertEquals(2, lst.size());
for (PigStats stats : lst) {
assertTrue("job should succeed", stats.isSuccessful());
assertEquals(1, stats.getNumberJobs());
assertEquals(12, stats.getBytesWritten());
assertEquals(3, stats.getRecordWritten());
}
}
@Test
public void pigRunnerTest() throws Exception {
String[] script = {
"from org.apache.pig.scripting import *",
"Pig.fs(\"rmr simple_out\")",
"input = 'simple_table_2'",
"output = 'simple_out'",
"P = Pig.compile(\"\"\"a = load '$input';store a into '$output';\"\"\")",
"Q = P.bind({'input':input, 'output':output})",
"stats = Q.runSingle()",
"if stats.isSuccessful():",
"\tprint 'success!'",
"else:",
"\traise 'failed'"
};
String[] input = {
"1\t3",
"2\t4",
"3\t5"
};
Util.createInputFile(cluster, "simple_table_2", input);
PigStats mainStats = runPigRunner("pigRunnerTest", script);
Map<String, List<PigStats>> statsMap = mainStats.getAllStats();
assertEquals(1, statsMap.size());
Iterator<List<PigStats>> it = statsMap.values().iterator();
PigStats stats = it.next().get(0);
assertTrue("job should succeed", stats.isSuccessful());
assertEquals(1, stats.getNumberJobs());
String name = stats.getOutputNames().get(0);
assertEquals("simple_out", name);
assertEquals(12, stats.getBytesWritten());
assertEquals(3, stats.getRecordWritten());
String jobName = stats.getPigProperties().getProperty(PigContext.JOB_NAME);
assertTrue(jobName.contains("pigRunnerTest"));
}
@Test
public void runParallelTest() throws Exception {
String[] script = {
"#!/usr/bin/python",
"from org.apache.pig.scripting import *",
"input = 'simple_table_3'",
"Pig.fs(\"rmr simple_out\")",
"Pig.fs(\"rmr simple_out2\")",
"output1 = 'simple_out'",
"output2 = 'simple_out2'",
"P.set(\"jobName\", \"myjob\")",
"P = Pig.compile(\"mypipeline\", \"\"\"a = load '$input';store a into '$output';\"\"\")",
"Q = P.bind([{'input':input, 'output':output1}, {'input':input, 'output':output2}])",
"stats = Q.run()"
};
String[] input = {
"1\t3",
"2\t4",
"3\t5"
};
Util.createInputFile(cluster, "simple_table_3", input);
PigStats mainStats = runPigRunner("runParallelTest", script);
Map<String, List<PigStats>> statsMap = mainStats.getAllStats();
assertEquals(1, statsMap.size());
assertEquals("mypipeline", statsMap.keySet().iterator().next());
List<PigStats> lst = statsMap.get("mypipeline");
assertEquals(2, lst.size());
for (PigStats stats : lst) {
assertTrue("job should succeed", stats.isSuccessful());
assertEquals(1, stats.getNumberJobs());
assertEquals(12, stats.getBytesWritten());
assertEquals(3, stats.getRecordWritten());
String jobName = stats.getPigProperties().getProperty(PigContext.JOB_NAME);
assertTrue(jobName.contains("myjob"));
}
}
@Test
public void runParallelTest2() throws Exception {
String[] script = {
"#!/usr/bin/python",
"from org.apache.pig.scripting import *",
"input = 'simple_table_7'",
"Pig.fs(\"rmr simple_out\")",
"Pig.fs(\"rmr simple_out2\")",
"output1 = 'simple_out'",
"output2 = 'simple_out2'",
"P = Pig.compile(\"mypipeline\", \"\"\"a = load '$input';",
"b = foreach a generate $0, org.apache.pig.test.utils.UDFContextTestEvalFunc3($0);",
"store b into '$output';\"\"\")",
"Q = P.bind([{'input':input, 'output':output1}, {'input':input, 'output':output2}])",
"stats = Q.run()"
};
String[] input = {
"1\t3"
};
Util.createInputFile(cluster, "simple_table_7", input);
PigStats mainStats = runPigRunner("runParallelTest2", script);
Map<String, List<PigStats>> statsMap = mainStats.getAllStats();
assertEquals(1, statsMap.size());
assertEquals("mypipeline", statsMap.keySet().iterator().next());
List<PigStats> lst = statsMap.get("mypipeline");
assertEquals(2, lst.size());
String[] results = new String[2];
int i = 0;
for (PigStats stats : lst) {
assertTrue("job should succeed", stats.isSuccessful());
assertEquals(1, stats.getNumberJobs());
OutputStats os = stats.getOutputStats().get(0);
Tuple t = os.iterator().next();
results[i++] = t.get(1).toString();
}
assertTrue(results[0] != null);
assertTrue(results[1] != null);
assertTrue(!results[0].equals(results[1]));
}
@Test
public void runLoopTest() throws Exception {
String[] script = {
"#!/usr/bin/python",
"from org.apache.pig.scripting import *",
"Pig.fs(\"rmr simple_out\")",
"Pig.fs(\"rmr simple_out2\")",
"input = 'simple_table_4'",
"P = Pig.compile(\"mypipeline\", \"\"\"a = load '$input';store a into '$output';\"\"\")",
"for x in [\"simple_out\", \"simple_out2\"]:",
"\tQ = P.bind({'input':input, 'output':x}).run()"
};
String[] input = {
"1\t3",
"2\t4",
"3\t5"
};
Util.createInputFile(cluster, "simple_table_4", input);
PigStats mainStats = runPigRunner("runLoopTest", script);
Map<String, List<PigStats>> statsMap = mainStats.getAllStats();
assertEquals(1, statsMap.size());
assertEquals("mypipeline", statsMap.keySet().iterator().next());
List<PigStats> lst = statsMap.get("mypipeline");
assertEquals(2, lst.size());
for (PigStats stats : lst) {
assertTrue("job should succeed", stats.isSuccessful());
assertEquals(1, stats.getNumberJobs());
assertEquals(12, stats.getBytesWritten());
assertEquals(3, stats.getRecordWritten());
}
}
@Test
public void bindLocalVariableTest() throws Exception {
String[] script = {
"#!/usr/bin/python",
"from org.apache.pig.scripting import *",
"Pig.fs(\"rmr simple_out\")",
"input = 'simple_table_5'",
"output = 'simple_out'",
"testvar = 'abcd$py'",
"testvar2 = '$'",
"testvar3 = '\\\\\\\\$'",
"testvar4 = 'abcd\\$py$'",
"testvar5 = 'abcd\\$py'",
"P = Pig.compile(\"\"\"a = load '$input';store a into '$output';\"\"\")",
"Q = P.bind()",
"stats = Q.runSingle()",
"if stats.isSuccessful():",
"\tprint 'success!'",
"else:",
"\traise 'failed'"
};
String[] input = {
"1\t3",
"2\t4",
"3\t5"
};
Util.createInputFile(cluster, "simple_table_5", input);
PigStats stats = runSimpleScript("bindLocalVariableTest", script);
assertEquals(1, stats.getNumberJobs());
String name = stats.getOutputNames().get(0);
assertEquals("simple_out", name);
assertEquals(12, stats.getBytesWritten());
assertEquals(3, stats.getRecordWritten());
}
@Test
public void bindLocalVariableTest2() throws Exception {
String[] script = {
"#!/usr/bin/python",
"from org.apache.pig.scripting import *",
"Pig.fs(\"rmr simple_out\")",
"input = 'bindLocalVariableTest2'",
"output = 'simple_out'",
"separator = '$'",
"P = Pig.compile(\"\"\"a = load '$input' using PigStorage('$separator');store a into '$output';\"\"\")",
"Q = P.bind()",
"stats = Q.runSingle()",
"if stats.isSuccessful():",
"\tprint 'success!'",
"else:",
"\traise 'failed'"
};
String[] input = {
"1$3",
"2$4",
"3$5"
};
Util.createInputFile(cluster, "bindLocalVariableTest2", input);
PigStats stats = runSimpleScript("bindLocalVariableTest2", script);
assertEquals(1, stats.getNumberJobs());
String name = stats.getOutputNames().get(0);
assertEquals("simple_out", name);
assertEquals(12, stats.getBytesWritten());
assertEquals(3, stats.getRecordWritten());
}
@Test
public void bindNonStringVariableTest() throws Exception {
String[] script = {
"#!/usr/bin/python",
"from org.apache.pig.scripting import *",
"Pig.fs(\"-rmr simple_out\")",
"input = 'simple_table'",
"output = 'simple_out'",
"max = 2",
"P = Pig.compile(\"\"\"a = load '$in' as (a0:int, a1:int);" +
" b = filter a by a0 > $max;" +
" store b into '$out';\"\"\")",
"Q = P.bind({'in':input, 'out':output, 'max':max})",
"stats = Q.runSingle()",
"if stats.isSuccessful():",
"\tprint 'success!'",
"else:",
"\traise 'failed'"
};
String[] input = {
"1\t3",
"2\t4",
"3\t5"
};
Util.deleteFile(cluster, "simple_table");
Util.createInputFile(cluster, "simple_table", input);
PigStats stats = runSimpleScript("bindNonStringVariableTest", script);
assertEquals(1, stats.getNumberJobs());
String name = stats.getOutputNames().get(0);
assertEquals("simple_out", name);
assertEquals(4, stats.getBytesWritten());
assertEquals(1, stats.getRecordWritten());
}
@Test
public void fsTest() throws Exception {
String[] script = {
"#!/usr/bin/python",
"from org.apache.pig.scripting import *",
"ret = Pig.fs(\"-rmr simple_out\")",
"if ret == 0:",
"\tprint 'success!'",
"else:",
"\traise 'fs command failed'"
};
runPigRunner("fsTest", script, false);
}
@Test
public void NegativeTest() throws Exception {
String[] script = {
"#!/usr/bin/python",
" from org.apache.pig.scripting import *",
"Pig.fs(\"rmr simple_out\")"
};
PigStats stats = runPigRunner("NegativeTest", script, false);
assertTrue(stats.getErrorCode() == 1121);
assertTrue(stats.getReturnCode() == PigRunner.ReturnCode.PIG_EXCEPTION);
}
@Test
public void NegativeTest2() throws Exception {
String[] script = {
"#!/usr/bin/python",
" from org.apache.pig.scripting import *",
"raise 'This is a test'"
};
PigStats stats = runPigRunner("NegativeTest2", script, false);
assertTrue(stats.getErrorCode() == 1121);
assertTrue(stats.getReturnCode() == PigRunner.ReturnCode.PIG_EXCEPTION);
}
@Test // PIG-2056
public void NegativeTest3() throws Exception {
String[] script = {
"#!/usr/bin/python",
"from org.apache.pig.scripting import Pig",
"P = Pig.compile(\"\"\"",
" #TEST PIG COMMENTS",
" A = load 'studenttab10k' as (name, age, gpa);",
" store A into 'CompileBindRun_2.out';\"\"\")",
"result = P.bind().runSingle()"
};
PigStats stats = runPigRunner("NegativeTest3", script, false);
assertTrue(stats.getErrorCode() == 1121);
assertTrue(stats.getReturnCode() == PigRunner.ReturnCode.PIG_EXCEPTION);
String expected = "Python Error. Traceback (most recent call last):";
String msg = stats.getErrorMessage();
Util.checkErrorMessageContainsExpected(msg, expected);
}
@Test
public void testFixNonEscapedDollarSign() throws Exception {
java.lang.reflect.Method fixNonEscapedDollarSign = Class.forName(
"org.apache.pig.scripting.Pig").getDeclaredMethod(
"fixNonEscapedDollarSign", new Class[] { String.class });
fixNonEscapedDollarSign.setAccessible(true);
String s = (String)fixNonEscapedDollarSign.invoke(null, "abc$py$");
assertEquals("abc\\\\$py\\\\$", s);
s = (String)fixNonEscapedDollarSign.invoke(null, "$abc$py");
assertEquals("\\\\$abc\\\\$py", s);
s = (String)fixNonEscapedDollarSign.invoke(null, "$");
assertEquals("\\\\$", s);
s = (String)fixNonEscapedDollarSign.invoke(null, "$$abc");
assertEquals("\\\\$\\\\$abc", s);
}
// See PIG-2291
@Test
public void testDumpInScript() throws Exception{
String[] script = {
"#!/usr/bin/python",
"from org.apache.pig.scripting import *",
"Pig.fs(\"rmr simple_out\")",
"input = 'testDumpInScript_table'",
"output = 'simple_out'",
"P = Pig.compileFromFile(\"\"\"testDumpInScript.pig\"\"\")",
"Q = P.bind({'input':input, 'output':output})",
"stats = Q.runSingle()",
"if stats.isSuccessful():",
"\tprint 'success!'",
"else:",
"\tprint 'failed'"
};
String[] input = {
"1\t3",
"2\t4",
"3\t5"
};
String[] pigLatin = {
"a = load '$input' as (a0:int,a1:int);",
"store a into '$output';",
"dump a"
};
Util.createInputFile(cluster, "testDumpInScript_table", input);
Util.createLocalInputFile( "testDumpInScript.pig", pigLatin);
runSimpleScript("testDumpInScript", script);
}
// See PIG-2291
@Test
public void testIllustrateInScript() throws Exception {
Assume.assumeTrue("Skip this test for TEZ. See PIG-3993", Util.isMapredExecType(cluster.getExecType()));
String[] script = { "#!/usr/bin/python",
"from org.apache.pig.scripting import *",
"Pig.fs(\"rmr simple_out\")",
"input = 'testIllustrateInScript_table'",
"output = 'simple_out'",
"P = Pig.compileFromFile(\"\"\"testIllustrateInScript.pig\"\"\")",
"Q = P.bind({'input':input, 'output':output})",
"stats = Q.runSingle()", "if stats.isSuccessful():",
"\tprint 'success!'", "else:", "\tprint 'failed'" };
String[] input = { "1\t3", "2\t4", "3\t5" };
String[] pigLatin = { "a = load '$input' as (a0:int,a1:int);",
"store a into '$output';", "illustrate a" };
Util.createInputFile(cluster, "testIllustrateInScript_table", input);
Util.createLocalInputFile("testIllustrateInScript.pig", pigLatin);
runSimpleScript("testIllustrateInScript", script);
}
@Test
public void testPyShouldNotFailScriptIfExitCodeIs0() throws Exception {
String[] script = {
"#!/usr/bin/python",
"from org.apache.pig.scripting import *",
"import sys",
"if 1 == 2:",
" sys.exit(1)",
"else: sys.exit(0)"
};
Map<String, List<PigStats>> statsMap = runScript("testPyShouldNotFailScriptIfExitCodeIs0", script);
assertEquals(0, statsMap.size());
}
@Test
public void testSysArguments() throws Exception {
String[] script = {
"#!/usr/bin/python",
"from org.apache.pig.scripting import Pig",
"import sys",
"P = Pig.compile(\"\"\"rmf $file;\"\"\")",
"files = sys.argv[1].strip().split(',')",
"for file in files:",
" P.bind({'file':file}).runSingle()"
};
String file1 = "/tmp/sysarg_1";
String file2 = "/tmp/sysarg_2";
createEmptyFiles(file1, file2);
File scriptFile = Util.createInputFile("sysarg", ".py", script);
// ExecMode.STRING
PigStats stats = PigRunner.run(new String[] { scriptFile.getAbsolutePath(),
file1 + "," + file2 }, null);
assertEquals(null, stats.getErrorMessage());
assertFileNotExists(file1, file2);
createEmptyFiles(file1, file2);
// Clear stats from previous execution
PigStatsUtil.getEmptyPigStats();
// ExecMode.FILE
stats = PigRunner.run(new String[] { "-x", cluster.getExecType().name().toLowerCase(),
"-f", scriptFile.getAbsolutePath(), "arg0",
file1 + "," + file2 }, null);
assertEquals(null, stats.getErrorMessage());
assertFileNotExists(file1, file2);
}
private void createEmptyFiles(String... filenames) throws IOException {
for (String file : filenames) {
Util.createInputFile(cluster, file, new String[]{""});
assertTrue(cluster.getFileSystem().exists(new Path(file)));
}
}
private void assertFileNotExists(String... filenames) throws IOException {
for (String file : filenames) {
assertFalse(cluster.getFileSystem().exists(new Path(file)));
}
}
}