| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.test; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.hadoop.fs.Path; |
| import org.apache.pig.PigRunner; |
| import org.apache.pig.PigServer; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.impl.PigContext; |
| import org.apache.pig.scripting.ScriptEngine; |
| import org.apache.pig.tools.pigstats.OutputStats; |
| import org.apache.pig.tools.pigstats.PigStats; |
| import org.apache.pig.tools.pigstats.PigStatsUtil; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Assume; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| public class TestScriptLanguage { |
| |
| static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); |
| |
| @BeforeClass |
| public static void setUpBeforeClass() throws Exception { |
| |
| } |
| |
| @AfterClass |
| public static void tearDownAfterClass() throws Exception { |
| cluster.shutDown(); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| Util.deleteFile(cluster, "simple_out"); |
| Util.deleteFile(cluster, "simple_out2"); |
| } |
| |
| private String createScript(String name, String[] script) throws Exception { |
| String scriptName = name + "_testScript.py"; |
| Util.createLocalInputFile(scriptName, script); |
| return scriptName; |
| } |
| |
| private Map<String, List<PigStats>> runScript(String name, String[] script) throws Exception { |
| String scriptName = name + "_testScript.py"; |
| Util.createLocalInputFile(scriptName, script); |
| ScriptEngine scriptEngine = ScriptEngine.getInstance("jython"); |
| PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| Map<String, List<PigStats>> statsMap = scriptEngine.run(pigServer.getPigContext(), scriptName); |
| return statsMap; |
| } |
| |
| private PigStats runSimpleScript(String name, String[] script) throws Exception { |
| Map<String, List<PigStats>> statsMap = runScript(name, script); |
| assertEquals(1, statsMap.size()); |
| Iterator<List<PigStats>> it = statsMap.values().iterator(); |
| PigStats stats = it.next().get(0); |
| assertTrue("job should succeed", stats.isSuccessful()); |
| return stats; |
| } |
| |
| private PigStats runPigRunner(String name, String[] script, boolean shouldSucceed) throws Exception { |
| String scriptName = createScript(name, script); |
| |
| String[] args = { "-x", cluster.getExecType().name().toLowerCase(), "-g", "jython", scriptName }; |
| |
| PigStats mainStats = PigRunner.run(args, new TestPigRunner.TestNotificationListener()); |
| if (shouldSucceed) { |
| assertTrue(mainStats.isEmbedded()); |
| assertTrue("job should succeed", mainStats.isSuccessful()); |
| } else { |
| assertFalse("job should fail", mainStats.isSuccessful()); |
| } |
| return mainStats; |
| } |
| |
| |
| private PigStats runPigRunner(String name, String[] script) throws Exception { |
| return runPigRunner(name, script, true); |
| } |
| |
| @Test |
| public void firstTest() throws Exception { |
| String[] script = { |
| "#!/usr/bin/python", |
| "from org.apache.pig.scripting import *", |
| "Pig.fs(\"rmr simple_out\")", |
| "input = 'simple_table'", |
| "output = 'simple_out'", |
| "P = Pig.compile(\"\"\"a = load '$input';store a into '$output';\"\"\")", |
| "Q = P.bind({'input':input, 'output':output})", |
| "stats = Q.runSingle()", |
| "if stats.isSuccessful():", |
| "\tprint 'success!'", |
| "else:", |
| "\traise 'failed'" |
| }; |
| String[] input = { |
| "1\t3", |
| "2\t4", |
| "3\t5" |
| }; |
| |
| Util.deleteFile(cluster, "simple_table"); |
| Util.createInputFile(cluster, "simple_table", input); |
| |
| PigStats stats = runSimpleScript("firstTest", script); |
| |
| assertEquals(1, stats.getNumberJobs()); |
| String name = stats.getOutputNames().get(0); |
| assertEquals("simple_out", name); |
| assertEquals(12, stats.getBytesWritten()); |
| assertEquals(3, stats.getRecordWritten()); |
| } |
| |
| @Test |
| public void secondTest() throws Exception { |
| String[] script = { |
| "#!/usr/bin/python", |
| "from org.apache.pig.scripting import *", |
| "Pig.fs(\"rmr simple_out\")", |
| "input = 'simple_table_6'", |
| "output = 'simple_out'", |
| "P = Pig.compileFromFile(\"\"\"secondTest.pig\"\"\")", |
| "Q = P.bind({'input':input, 'output':output})", |
| "stats = Q.runSingle()", |
| "if stats.isSuccessful():", |
| "\tprint 'success!'", |
| "else:", |
| "\traise 'failed'" |
| }; |
| String[] input = { |
| "1\t3", |
| "2\t4", |
| "3\t5" |
| }; |
| |
| String[] pigLatin = { |
| "-- ensure comment parsed correctly", |
| "a = load '$input';", |
| "store a into '$output';" |
| }; |
| |
| Util.createInputFile(cluster, "simple_table_6", input); |
| Util.createLocalInputFile( "secondTest.pig", pigLatin); |
| |
| PigStats stats = runSimpleScript("secondTest", script); |
| assertEquals(1, stats.getNumberJobs()); |
| String name = stats.getOutputNames().get(0); |
| assertEquals("simple_out", name); |
| assertEquals(12, stats.getBytesWritten()); |
| assertEquals(3, stats.getRecordWritten()); |
| } |
| |
| @Test |
| public void firstParallelTest() throws Exception { |
| String[] script = { |
| "#!/usr/bin/python", |
| "from org.apache.pig.scripting import *", |
| "Pig.fs(\"rmr simple_out\")", |
| "Pig.fs(\"rmr simple_out2\")", |
| "input = 'simple_table_1'", |
| "output1 = 'simple_out'", |
| "output2 = 'simple_out2'", |
| "P = Pig.compile(\"mypipeline\", \"\"\"a = load '$input';store a into '$output';\"\"\")", |
| "Q = P.bind([{'input':input, 'output':output1}, {'input':input, 'output':output2}])", |
| "stats = Q.run()" |
| }; |
| String[] input = { |
| "1\t3", |
| "2\t4", |
| "3\t5" |
| }; |
| |
| Util.createInputFile(cluster, "simple_table_1", input); |
| |
| Map<String, List<PigStats>> statsMap = runScript("firstParallelTest", script); |
| assertEquals(1, statsMap.size()); |
| assertEquals("mypipeline", statsMap.keySet().iterator().next()); |
| List<PigStats> lst = statsMap.get("mypipeline"); |
| assertEquals(2, lst.size()); |
| for (PigStats stats : lst) { |
| assertTrue("job should succeed", stats.isSuccessful()); |
| assertEquals(1, stats.getNumberJobs()); |
| assertEquals(12, stats.getBytesWritten()); |
| assertEquals(3, stats.getRecordWritten()); |
| } |
| } |
| |
| @Test |
| public void pigRunnerTest() throws Exception { |
| String[] script = { |
| "from org.apache.pig.scripting import *", |
| "Pig.fs(\"rmr simple_out\")", |
| "input = 'simple_table_2'", |
| "output = 'simple_out'", |
| "P = Pig.compile(\"\"\"a = load '$input';store a into '$output';\"\"\")", |
| "Q = P.bind({'input':input, 'output':output})", |
| "stats = Q.runSingle()", |
| "if stats.isSuccessful():", |
| "\tprint 'success!'", |
| "else:", |
| "\traise 'failed'" |
| }; |
| String[] input = { |
| "1\t3", |
| "2\t4", |
| "3\t5" |
| }; |
| |
| Util.createInputFile(cluster, "simple_table_2", input); |
| |
| PigStats mainStats = runPigRunner("pigRunnerTest", script); |
| |
| Map<String, List<PigStats>> statsMap = mainStats.getAllStats(); |
| assertEquals(1, statsMap.size()); |
| Iterator<List<PigStats>> it = statsMap.values().iterator(); |
| PigStats stats = it.next().get(0); |
| assertTrue("job should succeed", stats.isSuccessful()); |
| assertEquals(1, stats.getNumberJobs()); |
| String name = stats.getOutputNames().get(0); |
| assertEquals("simple_out", name); |
| assertEquals(12, stats.getBytesWritten()); |
| assertEquals(3, stats.getRecordWritten()); |
| String jobName = stats.getPigProperties().getProperty(PigContext.JOB_NAME); |
| assertTrue(jobName.contains("pigRunnerTest")); |
| } |
| |
| @Test |
| public void runParallelTest() throws Exception { |
| String[] script = { |
| "#!/usr/bin/python", |
| "from org.apache.pig.scripting import *", |
| "input = 'simple_table_3'", |
| "Pig.fs(\"rmr simple_out\")", |
| "Pig.fs(\"rmr simple_out2\")", |
| "output1 = 'simple_out'", |
| "output2 = 'simple_out2'", |
| "P.set(\"jobName\", \"myjob\")", |
| "P = Pig.compile(\"mypipeline\", \"\"\"a = load '$input';store a into '$output';\"\"\")", |
| "Q = P.bind([{'input':input, 'output':output1}, {'input':input, 'output':output2}])", |
| "stats = Q.run()" |
| }; |
| String[] input = { |
| "1\t3", |
| "2\t4", |
| "3\t5" |
| }; |
| |
| Util.createInputFile(cluster, "simple_table_3", input); |
| |
| PigStats mainStats = runPigRunner("runParallelTest", script); |
| |
| Map<String, List<PigStats>> statsMap = mainStats.getAllStats(); |
| assertEquals(1, statsMap.size()); |
| assertEquals("mypipeline", statsMap.keySet().iterator().next()); |
| List<PigStats> lst = statsMap.get("mypipeline"); |
| assertEquals(2, lst.size()); |
| for (PigStats stats : lst) { |
| assertTrue("job should succeed", stats.isSuccessful()); |
| assertEquals(1, stats.getNumberJobs()); |
| assertEquals(12, stats.getBytesWritten()); |
| assertEquals(3, stats.getRecordWritten()); |
| String jobName = stats.getPigProperties().getProperty(PigContext.JOB_NAME); |
| assertTrue(jobName.contains("myjob")); |
| } |
| } |
| |
| @Test |
| public void runParallelTest2() throws Exception { |
| String[] script = { |
| "#!/usr/bin/python", |
| "from org.apache.pig.scripting import *", |
| "input = 'simple_table_7'", |
| "Pig.fs(\"rmr simple_out\")", |
| "Pig.fs(\"rmr simple_out2\")", |
| "output1 = 'simple_out'", |
| "output2 = 'simple_out2'", |
| "P = Pig.compile(\"mypipeline\", \"\"\"a = load '$input';", |
| "b = foreach a generate $0, org.apache.pig.test.utils.UDFContextTestEvalFunc3($0);", |
| "store b into '$output';\"\"\")", |
| "Q = P.bind([{'input':input, 'output':output1}, {'input':input, 'output':output2}])", |
| "stats = Q.run()" |
| }; |
| String[] input = { |
| "1\t3" |
| }; |
| |
| Util.createInputFile(cluster, "simple_table_7", input); |
| PigStats mainStats = runPigRunner("runParallelTest2", script); |
| |
| Map<String, List<PigStats>> statsMap = mainStats.getAllStats(); |
| assertEquals(1, statsMap.size()); |
| assertEquals("mypipeline", statsMap.keySet().iterator().next()); |
| List<PigStats> lst = statsMap.get("mypipeline"); |
| assertEquals(2, lst.size()); |
| String[] results = new String[2]; |
| int i = 0; |
| for (PigStats stats : lst) { |
| assertTrue("job should succeed", stats.isSuccessful()); |
| assertEquals(1, stats.getNumberJobs()); |
| OutputStats os = stats.getOutputStats().get(0); |
| Tuple t = os.iterator().next(); |
| results[i++] = t.get(1).toString(); |
| } |
| assertTrue(results[0] != null); |
| assertTrue(results[1] != null); |
| assertTrue(!results[0].equals(results[1])); |
| } |
| |
| @Test |
| public void runLoopTest() throws Exception { |
| String[] script = { |
| "#!/usr/bin/python", |
| "from org.apache.pig.scripting import *", |
| "Pig.fs(\"rmr simple_out\")", |
| "Pig.fs(\"rmr simple_out2\")", |
| "input = 'simple_table_4'", |
| "P = Pig.compile(\"mypipeline\", \"\"\"a = load '$input';store a into '$output';\"\"\")", |
| "for x in [\"simple_out\", \"simple_out2\"]:", |
| "\tQ = P.bind({'input':input, 'output':x}).run()" |
| }; |
| String[] input = { |
| "1\t3", |
| "2\t4", |
| "3\t5" |
| }; |
| |
| Util.createInputFile(cluster, "simple_table_4", input); |
| |
| PigStats mainStats = runPigRunner("runLoopTest", script); |
| |
| Map<String, List<PigStats>> statsMap = mainStats.getAllStats(); |
| assertEquals(1, statsMap.size()); |
| assertEquals("mypipeline", statsMap.keySet().iterator().next()); |
| List<PigStats> lst = statsMap.get("mypipeline"); |
| assertEquals(2, lst.size()); |
| for (PigStats stats : lst) { |
| assertTrue("job should succeed", stats.isSuccessful()); |
| assertEquals(1, stats.getNumberJobs()); |
| assertEquals(12, stats.getBytesWritten()); |
| assertEquals(3, stats.getRecordWritten()); |
| } |
| } |
| |
| @Test |
| public void bindLocalVariableTest() throws Exception { |
| String[] script = { |
| "#!/usr/bin/python", |
| "from org.apache.pig.scripting import *", |
| "Pig.fs(\"rmr simple_out\")", |
| "input = 'simple_table_5'", |
| "output = 'simple_out'", |
| "testvar = 'abcd$py'", |
| "testvar2 = '$'", |
| "testvar3 = '\\\\\\\\$'", |
| "testvar4 = 'abcd\\$py$'", |
| "testvar5 = 'abcd\\$py'", |
| "P = Pig.compile(\"\"\"a = load '$input';store a into '$output';\"\"\")", |
| "Q = P.bind()", |
| "stats = Q.runSingle()", |
| "if stats.isSuccessful():", |
| "\tprint 'success!'", |
| "else:", |
| "\traise 'failed'" |
| }; |
| String[] input = { |
| "1\t3", |
| "2\t4", |
| "3\t5" |
| }; |
| |
| Util.createInputFile(cluster, "simple_table_5", input); |
| |
| PigStats stats = runSimpleScript("bindLocalVariableTest", script); |
| |
| assertEquals(1, stats.getNumberJobs()); |
| String name = stats.getOutputNames().get(0); |
| assertEquals("simple_out", name); |
| assertEquals(12, stats.getBytesWritten()); |
| assertEquals(3, stats.getRecordWritten()); |
| } |
| |
| @Test |
| public void bindLocalVariableTest2() throws Exception { |
| String[] script = { |
| "#!/usr/bin/python", |
| "from org.apache.pig.scripting import *", |
| "Pig.fs(\"rmr simple_out\")", |
| "input = 'bindLocalVariableTest2'", |
| "output = 'simple_out'", |
| "separator = '$'", |
| "P = Pig.compile(\"\"\"a = load '$input' using PigStorage('$separator');store a into '$output';\"\"\")", |
| "Q = P.bind()", |
| "stats = Q.runSingle()", |
| "if stats.isSuccessful():", |
| "\tprint 'success!'", |
| "else:", |
| "\traise 'failed'" |
| }; |
| String[] input = { |
| "1$3", |
| "2$4", |
| "3$5" |
| }; |
| |
| Util.createInputFile(cluster, "bindLocalVariableTest2", input); |
| |
| PigStats stats = runSimpleScript("bindLocalVariableTest2", script); |
| |
| assertEquals(1, stats.getNumberJobs()); |
| String name = stats.getOutputNames().get(0); |
| assertEquals("simple_out", name); |
| assertEquals(12, stats.getBytesWritten()); |
| assertEquals(3, stats.getRecordWritten()); |
| } |
| |
| @Test |
| public void bindNonStringVariableTest() throws Exception { |
| String[] script = { |
| "#!/usr/bin/python", |
| "from org.apache.pig.scripting import *", |
| "Pig.fs(\"-rmr simple_out\")", |
| "input = 'simple_table'", |
| "output = 'simple_out'", |
| "max = 2", |
| "P = Pig.compile(\"\"\"a = load '$in' as (a0:int, a1:int);" + |
| " b = filter a by a0 > $max;" + |
| " store b into '$out';\"\"\")", |
| "Q = P.bind({'in':input, 'out':output, 'max':max})", |
| "stats = Q.runSingle()", |
| "if stats.isSuccessful():", |
| "\tprint 'success!'", |
| "else:", |
| "\traise 'failed'" |
| }; |
| String[] input = { |
| "1\t3", |
| "2\t4", |
| "3\t5" |
| }; |
| |
| Util.deleteFile(cluster, "simple_table"); |
| Util.createInputFile(cluster, "simple_table", input); |
| |
| PigStats stats = runSimpleScript("bindNonStringVariableTest", script); |
| |
| assertEquals(1, stats.getNumberJobs()); |
| String name = stats.getOutputNames().get(0); |
| assertEquals("simple_out", name); |
| assertEquals(4, stats.getBytesWritten()); |
| assertEquals(1, stats.getRecordWritten()); |
| } |
| |
| @Test |
| public void fsTest() throws Exception { |
| String[] script = { |
| "#!/usr/bin/python", |
| "from org.apache.pig.scripting import *", |
| "ret = Pig.fs(\"-rmr simple_out\")", |
| "if ret == 0:", |
| "\tprint 'success!'", |
| "else:", |
| "\traise 'fs command failed'" |
| }; |
| |
| runPigRunner("fsTest", script, false); |
| |
| } |
| |
| @Test |
| public void NegativeTest() throws Exception { |
| String[] script = { |
| "#!/usr/bin/python", |
| " from org.apache.pig.scripting import *", |
| "Pig.fs(\"rmr simple_out\")" |
| }; |
| |
| PigStats stats = runPigRunner("NegativeTest", script, false); |
| |
| assertTrue(stats.getErrorCode() == 1121); |
| assertTrue(stats.getReturnCode() == PigRunner.ReturnCode.PIG_EXCEPTION); |
| } |
| |
| @Test |
| public void NegativeTest2() throws Exception { |
| String[] script = { |
| "#!/usr/bin/python", |
| " from org.apache.pig.scripting import *", |
| "raise 'This is a test'" |
| }; |
| |
| PigStats stats = runPigRunner("NegativeTest2", script, false); |
| assertTrue(stats.getErrorCode() == 1121); |
| assertTrue(stats.getReturnCode() == PigRunner.ReturnCode.PIG_EXCEPTION); |
| } |
| |
| @Test // PIG-2056 |
| public void NegativeTest3() throws Exception { |
| String[] script = { |
| "#!/usr/bin/python", |
| "from org.apache.pig.scripting import Pig", |
| "P = Pig.compile(\"\"\"", |
| " #TEST PIG COMMENTS", |
| " A = load 'studenttab10k' as (name, age, gpa);", |
| " store A into 'CompileBindRun_2.out';\"\"\")", |
| "result = P.bind().runSingle()" |
| }; |
| |
| PigStats stats = runPigRunner("NegativeTest3", script, false); |
| assertTrue(stats.getErrorCode() == 1121); |
| assertTrue(stats.getReturnCode() == PigRunner.ReturnCode.PIG_EXCEPTION); |
| |
| String expected = "Python Error. Traceback (most recent call last):"; |
| |
| String msg = stats.getErrorMessage(); |
| Util.checkErrorMessageContainsExpected(msg, expected); |
| |
| } |
| |
| @Test |
| public void testFixNonEscapedDollarSign() throws Exception { |
| java.lang.reflect.Method fixNonEscapedDollarSign = Class.forName( |
| "org.apache.pig.scripting.Pig").getDeclaredMethod( |
| "fixNonEscapedDollarSign", new Class[] { String.class }); |
| |
| fixNonEscapedDollarSign.setAccessible(true); |
| |
| String s = (String)fixNonEscapedDollarSign.invoke(null, "abc$py$"); |
| assertEquals("abc\\\\$py\\\\$", s); |
| |
| s = (String)fixNonEscapedDollarSign.invoke(null, "$abc$py"); |
| assertEquals("\\\\$abc\\\\$py", s); |
| |
| s = (String)fixNonEscapedDollarSign.invoke(null, "$"); |
| assertEquals("\\\\$", s); |
| |
| s = (String)fixNonEscapedDollarSign.invoke(null, "$$abc"); |
| assertEquals("\\\\$\\\\$abc", s); |
| } |
| |
| // See PIG-2291 |
| @Test |
| public void testDumpInScript() throws Exception{ |
| |
| String[] script = { |
| "#!/usr/bin/python", |
| "from org.apache.pig.scripting import *", |
| "Pig.fs(\"rmr simple_out\")", |
| "input = 'testDumpInScript_table'", |
| "output = 'simple_out'", |
| "P = Pig.compileFromFile(\"\"\"testDumpInScript.pig\"\"\")", |
| "Q = P.bind({'input':input, 'output':output})", |
| "stats = Q.runSingle()", |
| "if stats.isSuccessful():", |
| "\tprint 'success!'", |
| "else:", |
| "\tprint 'failed'" |
| }; |
| String[] input = { |
| "1\t3", |
| "2\t4", |
| "3\t5" |
| }; |
| |
| String[] pigLatin = { |
| "a = load '$input' as (a0:int,a1:int);", |
| "store a into '$output';", |
| "dump a" |
| }; |
| |
| Util.createInputFile(cluster, "testDumpInScript_table", input); |
| Util.createLocalInputFile( "testDumpInScript.pig", pigLatin); |
| runSimpleScript("testDumpInScript", script); |
| |
| } |
| |
| // See PIG-2291 |
| @Test |
| public void testIllustrateInScript() throws Exception { |
| Assume.assumeTrue("Skip this test for TEZ. See PIG-3993", Util.isMapredExecType(cluster.getExecType())); |
| String[] script = { "#!/usr/bin/python", |
| "from org.apache.pig.scripting import *", |
| "Pig.fs(\"rmr simple_out\")", |
| "input = 'testIllustrateInScript_table'", |
| "output = 'simple_out'", |
| "P = Pig.compileFromFile(\"\"\"testIllustrateInScript.pig\"\"\")", |
| "Q = P.bind({'input':input, 'output':output})", |
| "stats = Q.runSingle()", "if stats.isSuccessful():", |
| "\tprint 'success!'", "else:", "\tprint 'failed'" }; |
| String[] input = { "1\t3", "2\t4", "3\t5" }; |
| |
| String[] pigLatin = { "a = load '$input' as (a0:int,a1:int);", |
| "store a into '$output';", "illustrate a" }; |
| |
| Util.createInputFile(cluster, "testIllustrateInScript_table", input); |
| Util.createLocalInputFile("testIllustrateInScript.pig", pigLatin); |
| runSimpleScript("testIllustrateInScript", script); |
| } |
| |
| @Test |
| public void testPyShouldNotFailScriptIfExitCodeIs0() throws Exception { |
| String[] script = { |
| "#!/usr/bin/python", |
| "from org.apache.pig.scripting import *", |
| "import sys", |
| "if 1 == 2:", |
| " sys.exit(1)", |
| "else: sys.exit(0)" |
| }; |
| |
| Map<String, List<PigStats>> statsMap = runScript("testPyShouldNotFailScriptIfExitCodeIs0", script); |
| assertEquals(0, statsMap.size()); |
| |
| } |
| |
| @Test |
| public void testSysArguments() throws Exception { |
| String[] script = { |
| "#!/usr/bin/python", |
| "from org.apache.pig.scripting import Pig", |
| "import sys", |
| "P = Pig.compile(\"\"\"rmf $file;\"\"\")", |
| "files = sys.argv[1].strip().split(',')", |
| "for file in files:", |
| " P.bind({'file':file}).runSingle()" |
| }; |
| String file1 = "/tmp/sysarg_1"; |
| String file2 = "/tmp/sysarg_2"; |
| createEmptyFiles(file1, file2); |
| |
| File scriptFile = Util.createInputFile("sysarg", ".py", script); |
| // ExecMode.STRING |
| PigStats stats = PigRunner.run(new String[] { scriptFile.getAbsolutePath(), |
| file1 + "," + file2 }, null); |
| assertEquals(null, stats.getErrorMessage()); |
| assertFileNotExists(file1, file2); |
| |
| createEmptyFiles(file1, file2); |
| // Clear stats from previous execution |
| PigStatsUtil.getEmptyPigStats(); |
| |
| // ExecMode.FILE |
| stats = PigRunner.run(new String[] { "-x", cluster.getExecType().name().toLowerCase(), |
| "-f", scriptFile.getAbsolutePath(), "arg0", |
| file1 + "," + file2 }, null); |
| assertEquals(null, stats.getErrorMessage()); |
| assertFileNotExists(file1, file2); |
| } |
| |
| private void createEmptyFiles(String... filenames) throws IOException { |
| for (String file : filenames) { |
| Util.createInputFile(cluster, file, new String[]{""}); |
| assertTrue(cluster.getFileSystem().exists(new Path(file))); |
| } |
| } |
| |
| private void assertFileNotExists(String... filenames) throws IOException { |
| for (String file : filenames) { |
| assertFalse(cluster.getFileSystem().exists(new Path(file))); |
| } |
| } |
| |
| } |