| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.test; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.BufferedReader; |
| import java.io.ByteArrayInputStream; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileReader; |
| import java.io.FileWriter; |
| import java.io.FilenameFilter; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.io.PrintWriter; |
| import java.io.StringReader; |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Properties; |
| |
| import org.apache.log4j.Appender; |
| import org.apache.log4j.FileAppender; |
| import org.apache.log4j.Level; |
| import org.apache.log4j.PatternLayout; |
| import org.apache.pig.EvalFunc; |
| import org.apache.pig.ExecType; |
| import org.apache.pig.PigConfiguration; |
| import org.apache.pig.PigException; |
| import org.apache.pig.PigServer; |
| import org.apache.pig.backend.executionengine.ExecException; |
| import org.apache.pig.backend.executionengine.ExecJob; |
| import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.impl.PigContext; |
| import org.apache.pig.impl.io.FileLocalizer; |
| import org.apache.pig.impl.logicalLayer.FrontendException; |
| import org.apache.pig.impl.util.JavaCompilerHelper; |
| import org.apache.pig.test.Util.ProcessReturnInfo; |
| import org.apache.pig.tools.grunt.Grunt; |
| import org.apache.pig.tools.pigscript.parser.ParseException; |
| import org.apache.pig.tools.pigstats.JobStats; |
| import org.apache.pig.tools.pigstats.JobStats.JobState; |
| import org.apache.pig.tools.pigstats.PigStats; |
| import org.apache.pig.tools.pigstats.PigStats.JobGraph; |
| import org.junit.AfterClass; |
| import org.junit.Assume; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| public class TestGrunt { |
| |
// Mini cluster shared by the tests in this class; built once when the class is
// initialised and shut down in oneTimeTearDown().
| static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); |
// Relative directory that the tests prepend to script and parameter-file names.
| private String basedir = "test/org/apache/pig/test/data"; |
| |
| @BeforeClass |
| public static void oneTimeSetup() throws Exception { |
| cluster.setProperty(PigConfiguration.PIG_OPT_MULTIQUERY,"true"); |
| } |
| |
| @AfterClass |
| public static void oneTimeTearDown() throws Exception { |
| cluster.shutDown(); |
| } |
| |
| @Before |
| public void setup() { |
| Util.resetStateForExecModeSwitch(); |
| } |
| |
| |
| @Test |
| public void testCopyFromLocal() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "copyFromLocal README.txt sh_copy ;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| |
| @Test |
| public void testDefine() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "define myudf org.apache.pig.builtin.AVG();\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| try { |
| grunt.exec(); |
| } catch (Exception e) { |
| assertTrue(e.getMessage().contains("Encountered \"define\"")); |
| } |
| assertTrue(null != context.getFuncSpecFromAlias("myudf")); |
| } |
| |
| @Test |
| public void testBagSchema() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'input1' as (b: bag{t:(i: int, c:chararray, f: float)});\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testBagSchemaFail() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'input1'as (b: bag{t:(i: int, c:chararray, f: float)});\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| try { |
| grunt.exec(); |
| fail( "Test case is supposed to fail." ); |
| } catch (Exception e) { |
| assertTrue(e.getMessage().contains( |
| "<line 1, column 62> mismatched input ';' expecting RIGHT_PAREN")); |
| } |
| } |
| |
| @Test |
| public void testBagConstant() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'input1'; b = foreach a generate {(1, '1', 0.4f),(2, '2', 0.45)};\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testBagConstantWithSchema() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'input1'; b = foreach a generate " |
| + "{(1, '1', 0.4f),(2, '2', 0.45)} as " |
| + "b: bag{t:(i: int, c:chararray, d: double)};\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testBagConstantInForeachBlock() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'input1'; " |
| + "b = foreach a {generate {(1, '1', 0.4f),(2, '2', 0.45)};};\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testBagConstantWithSchemaInForeachBlock() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'input1'; " |
| + "b = foreach a {generate {(1, '1', 0.4f),(2, '2', 0.45)} " |
| + "as b: bag{t:(i: int, c:chararray, d: double)};};\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testParsingAsInForeachBlock() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'foo' as (foo, fast); " |
| + "b = group a by foo; c = foreach b " |
| + "{generate SUM(a.fast) as fast;};\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testParsingAsInForeachWithOutBlock() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'foo' as (foo, fast); " |
| + "b = group a by foo; c = foreach b generate SUM(a.fast) as fast;\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testParsingWordWithAsInForeachBlock() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'foo' as (foo, fast); " |
| + "b = group a by foo; c = foreach b {generate SUM(a.fast);};\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testParsingWordWithAsInForeachWithOutBlock() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'foo' as (foo, fast); " |
| + "b = group a by foo; c = foreach b generate SUM(a.fast);\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testParsingWordWithAsInForeachWithOutBlock2() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "cash = load 'foo' as (foo, fast); " |
| + "b = foreach cash generate fast * 2.0;\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| |
| @Test |
| public void testParsingGenerateInForeachBlock() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'foo' as (foo, fast, regenerate); " |
| + "b = group a by foo; c = foreach b {generate a.regenerate;};\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testParsingGenerateInForeachWithOutBlock() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'foo' as (foo, fast, regenerate); " |
| + "b = group a by foo; c = foreach b generate a.regenerate;\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testParsingAsGenerateInForeachBlock() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'foo' as (foo, fast, regenerate); " |
| + "b = group a by foo; c = foreach b {generate " |
| + "{(1, '1', 0.4f),(2, '2', 0.45)} " |
| + "as b: bag{t:(i: int, cease:chararray, degenerate: double)}, " |
| + "SUM(a.fast) as fast, a.regenerate as degenerated;};\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testParsingAsGenerateInForeachWithOutBlock() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'foo' as (foo, fast, regenerate); " |
| + "b = group a by foo; c = foreach b generate " |
| + "{(1, '1', 0.4f),(2, '2', 0.45)} " |
| + "as b: bag{t:(i: int, cease:chararray, degenerate: double)}, " |
| + "SUM(a.fast) as fast, a.regenerate as degenerated;\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testRunStatment() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'foo' as (foo, fast, regenerate);" |
| + " run -param LIMIT=5 -param_file " + basedir |
| + "/test_broken.ppf " + basedir + "/testsub.pig; explain bar"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testExecStatment() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| boolean caught = false; |
| |
| String strCmd = "a = load 'foo' as (foo, fast, regenerate);" |
| + " exec -param LIMIT=5 -param FUNCTION=COUNT " |
| + "-param FILE=foo " + basedir + "/testsub.pig; explain bar;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| try { |
| grunt.exec(); |
| } catch (Exception e) { |
| caught = true; |
| assertTrue(e.getMessage().contains("alias bar")); |
| } |
| assertTrue(caught); |
| } |
| |
| @Test |
| public void testRunStatmentNested() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'foo' as (foo, fast, regenerate); run " |
| + basedir + "/testsubnested_run.pig; explain bar"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testExecStatmentNested() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| boolean caught = false; |
| |
| String strCmd = "a = load 'foo' as (foo, fast, regenerate); exec " |
| + basedir + "/testsubnested_exec.pig; explain bar"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| try { |
| grunt.exec(); |
| } catch (Exception e) { |
| caught = true; |
| assertTrue(e.getMessage().contains("alias bar")); |
| } |
| assertTrue(caught); |
| } |
| |
| @Test |
| public void testErrorLineNumber() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "A = load 'x' as ( u:int, v:chararray );\n" + |
| "sh ls\n" + |
| "B = foreach A generate u , v; C = distinct 'xxx';\n" + |
| "store C into 'y';"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| try { |
| grunt.exec(); |
| } catch (Exception e) { |
| assertTrue(e.getMessage().contains( |
| "<line 3, column 42> Syntax error, unexpected symbol at or near ''xxx''")); |
| return; |
| } |
| fail( "Test case is supposed to fail." ); |
| } |
| |
| @Test |
| public void testExplainEmpty() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'foo' as (foo, fast, regenerate); run " |
| + basedir + "/testsubnested_run.pig; explain"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testExplainScript() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'foo' as (foo, fast, regenerate); explain -script " |
| + basedir + "/testsubnested_run.pig;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| /** |
| * PIG-2084 |
| * Check if only statements used in query are validated, in non-interactive |
| * /non-check mode. There is an 'unused' statement in query that would otherise |
| * fail the validation. |
| * Primary purpose of test is to verify that check not happening for |
| * every statement. |
| * @throws Throwable |
| */ |
| @Test |
| public void testExplainScriptIsEachStatementValidated() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'foo' as (foo, fast, regenerate);" + |
| "b = foreach a generate foo + 'x' + 1;" + |
| "c = foreach a generate foo, fast;" + |
| "explain c; "; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testIllustrateScript() throws Throwable { |
| PigServer server = new PigServer(ExecType.LOCAL, new Properties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "illustrate -script " |
| + basedir + "/illustrate.pig;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testIllustrateScript2() throws Throwable { |
| PigServer server = new PigServer(ExecType.LOCAL, new Properties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "illustrate -script " |
| + basedir + "/illustrate2.pig;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testIllustrateScript3() throws Throwable { |
| PigServer server = new PigServer(ExecType.LOCAL, new Properties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "illustrate -script " |
| + basedir + "/illustrate3.pig;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testIllustrateScript4() throws Throwable { |
| // empty line/field test |
| PigServer server = new PigServer(ExecType.LOCAL, new Properties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "illustrate -script " |
| + basedir + "/illustrate4.pig;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testIllustrateScript5() throws Throwable { |
| // empty line/field test |
| PigServer server = new PigServer(ExecType.LOCAL, new Properties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "illustrate -script " |
| + basedir + "/illustrate5.pig;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testIllustrateScript6() throws Throwable { |
| // empty line/field test |
| PigServer server = new PigServer(ExecType.LOCAL, new Properties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "illustrate -script " |
| + basedir + "/illustrate6.pig;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testIllustrateScript7() throws Throwable { |
| // empty line/field test |
| PigServer server = new PigServer(ExecType.LOCAL, new Properties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "illustrate -script " |
| + basedir + "/illustrate7.pig;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| /** |
| * verify that grunt commands are ignored in explain -script mode |
| */ |
| @Test |
| public void testExplainScript2() throws Throwable { |
| |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "explain -script " |
| + basedir + "/explainScript.pig;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| String logMessagesFile = "TestGrunt-testExplainScript2-stderr.txt"; |
| // add a file based appender to the root logger so we can parse the |
| // messages logged by grunt and verify that grunt commands are ignored |
| // in explain -script mode |
| Appender fileAppender = new FileAppender(new PatternLayout(), logMessagesFile); |
| |
| try { |
| org.apache.log4j.LogManager.getRootLogger().addAppender(fileAppender); |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| grunt.exec(); |
| BufferedReader in = new BufferedReader(new FileReader(logMessagesFile)); |
| String gruntLoggingContents = ""; |
| //read file into a string |
| String line; |
| while ( (line = in.readLine()) != null) { |
| gruntLoggingContents += line + "\n"; |
| } |
| in.close(); |
| String[] cmds = new String[] { "'rm/rmf'", "'cp'", "'cat'", "'cd'", "'pwd'", |
| "'copyFromLocal'", "'copyToLocal'", "'describe'", "'ls'", |
| "'mkdir'", "'illustrate'", "'run/exec'", "'fs'", "'aliases'", |
| "'mv'", "'dump'" }; |
| for (String c : cmds) { |
| String expected = c + " statement is ignored while processing " + |
| "'explain -script' or '-check'"; |
| assertTrue("Checking if " + gruntLoggingContents + " contains " + |
| expected, gruntLoggingContents.contains(expected)); |
| } |
| } finally { |
| org.apache.log4j.LogManager.getRootLogger().removeAppender(fileAppender); |
| new File(logMessagesFile).delete(); |
| } |
| } |
| |
| @Test |
| public void testExplainBrief() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'foo' as (foo, fast, regenerate); explain -brief -script " |
| + basedir + "/testsubnested_run.pig;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testExplainDot() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'foo' as (foo, fast, regenerate); explain -dot -script " |
| + basedir + "/testsubnested_run.pig;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testExplainOut() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "a = load 'foo' as (foo, fast, regenerate); explain -out /tmp -script " |
| + basedir + "/testsubnested_run.pig;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testPartialExecution() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| FileLocalizer.setInitialized(false); |
| |
| String strCmd = "rmf bar; rmf baz; " |
| + "a = load '" |
| + Util.generateURI("file:test/org/apache/pig/test/data/passwd", |
| context) |
| + "';" |
| + "store a into 'bar'; exec; a = load 'bar'; store a into 'baz';\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testFileCmds() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = |
| "rmf bar; rmf baz;" |
| +"a = load '" |
| + Util.generateURI("file:test/org/apache/pig/test/data/passwd", context) + "';" |
| +"store a into 'bar';" |
| +"cp bar baz;" |
| +"rm bar; rm baz;" |
| +"store a into 'baz';" |
| +"store a into 'bar';" |
| +"rm baz; rm bar;" |
| +"store a into 'baz';" |
| +"mv baz bar;" |
| +"b = load 'bar';" |
| +"store b into 'baz';" |
| +"cat baz;" |
| +"rm baz;" |
| +"rm bar;\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testCD() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = |
| "mkdir /tmp;" |
| +"mkdir /tmp/foo;" |
| +"cd /tmp;" |
| +"rmf bar; rmf foo/baz;" |
| +"copyFromLocal test/org/apache/pig/test/data/passwd bar;" |
| +"a = load 'bar';" |
| +"cd foo;" |
| +"store a into 'baz';" |
| +"cd /;" |
| +"rm /tmp/bar; rm /tmp/foo/baz;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testDump() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = |
| "rmf bla;" |
| +"a = load '" |
| + Util.generateURI("file:test/org/apache/pig/test/data/passwd", context) + "';" |
| +"e = group a by $0;" |
| +"f = foreach e generate group, COUNT($1);" |
| +"store f into 'bla';" |
| +"f1 = load 'bla';" |
| +"g = order f1 by $1;" |
| +"dump g;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testIllustrate() throws Throwable { |
| Assume.assumeTrue("Skip this test for TEZ. See PIG-3993", Util.isMapredExecType(cluster.getExecType())); |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = |
| "rmf bla;" |
| +"a = load '" |
| + Util.generateURI("file:test/org/apache/pig/test/data/passwd", context) + "';" |
| +"e = group a by $0;" |
| +"f = foreach e generate group, COUNT($1);" |
| +"store f into 'bla';" |
| +"f1 = load 'bla' as (f:chararray);" |
| +"g = order f1 by $0;" |
| +"illustrate g;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testKeepGoing() throws Throwable { |
| Assume.assumeTrue("Skip this test for TEZ", Util.isMapredExecType(cluster.getExecType())); |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| |
| PigContext context = server.getPigContext(); |
| |
| String filename = |
| Util.generateURI("file:test/org/apache/pig/test/data/passwd", context); |
| String strCmd = |
| "rmf bar;" |
| +"rmf foo;" |
| +"rmf baz;" |
| +"A = load '" + filename + "';" |
| +"B = foreach A generate 1;" |
| +"C = foreach A generate 0/0;" |
| +"store B into 'foo';" |
| +"store C into 'bar';" |
| +"A = load '" + filename + "';" |
| +"B = stream A through `false`;" |
| +"store B into 'baz';" |
| +"cat bar;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| } |
| |
| @Test |
| public void testKeepGoigFailed() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd", "passwd"); |
| String strCmd = |
| "rmf bar;" |
| +"rmf foo;" |
| +"rmf baz;" |
| +"A = load 'passwd';" |
| +"B = foreach A generate 1;" |
| +"C = foreach A generate 0/0;" |
| +"store B into 'foo';" |
| +"store C into 'bar';" |
| +"A = load 'passwd';" |
| +"B = stream A through `false`;" |
| +"store B into 'baz';" |
| +"cat baz;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| boolean caught = false; |
| try { |
| grunt.exec(); |
| } catch (Exception e) { |
| caught = true; |
| assertTrue(e.getMessage().contains("baz does not exist")); |
| } |
| assertTrue(caught); |
| } |
| |
| @Test |
| public void testInvalidParam() throws Throwable { |
| PigServer server = new PigServer(ExecType.LOCAL, cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = |
| "run -param -param;"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| boolean caught = false; |
| try { |
| grunt.exec(); |
| } catch (ParseException e) { |
| caught = true; |
| assertTrue(e.getMessage().contains("Encountered")); |
| } |
| assertTrue(caught); |
| } |
| |
| @Test |
| public void testStopOnFailure() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| context.getProperties().setProperty("stop.on.failure", ""+true); |
| context.getProperties().setProperty("mapreduce.map.maxattempts", "2"); |
| |
| String strCmd = |
| "rmf bar;\n" |
| +"rmf foo;\n" |
| +"rmf baz;\n" |
| +"copyFromLocal test/org/apache/pig/test/data/passwd pre;\n" |
| +"A = load '" |
| + Util.generateURI("file:test/org/apache/pig/test/data/passwd", context) + "';\n" |
| +"B = stream A through `false`;\n" |
| +"store B into 'bar' using BinStorage();\n" |
| +"A1 = load '" |
| + Util.generateURI("file:test/org/apache/pig/test/data/passwd", context) + "';\n" |
| + "A1 = foreach A1 generate org.apache.pig.test.TestGrunt$SleepUDF();\n" |
| +"store A1 into 'bar1' using BinStorage();\n" |
| +"A = load 'bar';\n" |
| +"store A into 'foo';\n" |
| +"cp pre done;\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| boolean caught = false; |
| try { |
| grunt.exec(); |
| } catch (PigException e) { |
| caught = true; |
| assertTrue(e.getErrorCode() == 6017); |
| } |
| |
| if (Util.isMapredExecType(cluster.getExecType())) { |
| JobGraph jobGraph = PigStats.get().getJobGraph(); |
| List<JobStats> failedJobs = jobGraph.getFailedJobs(); |
| assertEquals(2, failedJobs.size()); |
| for (JobStats stats : failedJobs) { |
| if (stats.getAlias().equals("A,B")) { |
| // Job with alias A,B should have failed because of streaming error |
| assertTrue(stats.getException().getMessage().contains( |
| "Received Error while processing the map plan: " |
| + "'false (stdin-org.apache.pig.builtin.PigStreaming/stdout-org.apache.pig.builtin.PigStreaming)'" |
| + " failed with exit status: 1")); |
| } else { |
| // Job with alias A1 with sleep should be killed as a result of stop on failure |
| assertTrue(stats.getErrorMessage().startsWith("Failing running job for -stop_on_failure")); |
| } |
| } |
| |
| // Third job which is dependent on alias A,B should not have started |
| assertEquals(1, getUnknownJobs(jobGraph).size()); |
| } else { |
| // Tez |
| JobGraph jobGraph = PigStats.get().getJobGraph(); |
| List<JobStats> failedJobs = jobGraph.getFailedJobs(); |
| assertEquals(1, failedJobs.size()); |
| // First job should have failed because of streaming error. Sleep is also part of same job |
| assertTrue(failedJobs.get(0).getException().getMessage().contains( |
| "Received Error while processing the map plan: " |
| + "'false (stdin-org.apache.pig.builtin.PigStreaming/stdout-org.apache.pig.builtin.PigStreaming)'" |
| + " failed with exit status: 1")); |
| // Second job which is dependent on first should not have started |
| assertEquals(1, getUnknownJobs(jobGraph).size()); |
| } |
| |
| // Parallel job (sleep udf) should have not succeeded and been killed |
| assertFalse(server.existsFile("bar1")); |
| // fs cp command at the end should not have been executed |
| assertFalse(server.existsFile("done")); |
| assertTrue(caught); |
| } |
| |
| private ArrayList<JobStats> getUnknownJobs(JobGraph jobGraph) { |
| ArrayList<JobStats> unknown = new ArrayList<JobStats>(); |
| Iterator<JobStats> iter = jobGraph.iterator(); |
| while (iter.hasNext()) { |
| JobStats js = iter.next(); |
| if (js.getState() == JobState.UNKNOWN) { |
| unknown.add(js); |
| } |
| } |
| return unknown; |
| } |
| |
| public static class SleepUDF extends EvalFunc<Integer> { |
| |
| @Override |
| public Integer exec(Tuple input) throws IOException { |
| try { |
| Thread.sleep(50000); |
| } catch (InterruptedException e) { |
| throw new IOException(e); |
| } |
| return null; |
| } |
| } |
| |
| @Test |
| public void testFsCommand() throws Throwable { |
| |
| PigServer server = new PigServer(cluster.getExecType(),cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = |
| "fs -ls /;" |
| +"fs -mkdir /fstmp;" |
| +"fs -mkdir /fstmp/foo;" |
| +"cd /fstmp;" |
| +"fs -copyFromLocal test/org/apache/pig/test/data/passwd bar;" |
| +"a = load 'bar';" |
| +"cd foo;" |
| +"store a into 'baz';" |
| +"cd /;" |
| +"fs -ls .;" |
| +"fs -rmr /fstmp/foo/baz;" |
| +"cd"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| grunt.exec(); |
| |
| } |
| |
| @Test |
| public void testShellCommand(){ |
| |
| try { |
| PigServer server = new PigServer(cluster.getExecType(),cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strRemoveFile = "rm"; |
| String strRemoveDir = "rmdir"; |
| |
| if (Util.WINDOWS) |
| { |
| strRemoveFile = "del"; |
| strRemoveDir = "rd"; |
| } |
| |
| String strCmd = "sh mkdir test_shell_tmp;"; |
| |
| // Create a temp directory via command and make sure it exists |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| grunt.exec(); |
| assertTrue(new File("test_shell_tmp").exists()); |
| |
| // Remove the temp directory via shell and make sure it is gone |
| strCmd = "sh " + strRemoveDir + " test_shell_tmp;"; |
| |
| cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| reader = new InputStreamReader(cmd); |
| grunt = new Grunt(new BufferedReader(reader), context); |
| grunt.exec(); |
| assertFalse(new File("test_shell_tmp").exists()); |
| |
| // Verify pipes are usable in the command context by piping data to a file |
| strCmd = "sh echo hello world > tempShFileToTestShCommand"; |
| |
| cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| reader = new InputStreamReader(cmd); |
| grunt = new Grunt(new BufferedReader(reader), context); |
| grunt.exec(); |
| BufferedReader fileReader = null; |
| fileReader = new BufferedReader(new FileReader("tempShFileToTestShCommand")); |
| assertTrue(fileReader.readLine().trim().equals("hello world")); |
| |
| fileReader.close(); |
| |
| // Remove the file via cmd and make sure it is gone |
| strCmd = "sh " + strRemoveFile + " tempShFileToTestShCommand"; |
| cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| reader = new InputStreamReader(cmd); |
| grunt = new Grunt(new BufferedReader(reader), context); |
| grunt.exec(); |
| assertFalse(new File("tempShFileToTestShCommand").exists()); |
| |
| if (Util.WINDOWS) { |
| strCmd = "sh echo foo > TouchedFileInsideGrunt_61 && dir /B | findstr TouchedFileInsideGrunt_61 > fileContainingTouchedFileInsideGruntShell_71"; |
| } |
| else { |
| strCmd = "sh touch TouchedFileInsideGrunt_61 && ls | grep TouchedFileInsideGrunt_61 > fileContainingTouchedFileInsideGruntShell_71"; |
| } |
| |
| cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| reader = new InputStreamReader(cmd); |
| grunt = new Grunt(new BufferedReader(reader), context); |
| grunt.exec(); |
| fileReader = new BufferedReader(new FileReader("fileContainingTouchedFileInsideGruntShell_71")); |
| assertTrue(fileReader.readLine().trim().equals("TouchedFileInsideGrunt_61")); |
| |
| fileReader.close(); |
| strCmd = "sh " + strRemoveFile+" fileContainingTouchedFileInsideGruntShell_71"; |
| |
| cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| reader = new InputStreamReader(cmd); |
| grunt = new Grunt(new BufferedReader(reader), context); |
| grunt.exec(); |
| assertFalse(new File("fileContainingTouchedFileInsideGruntShell_71").exists()); |
| strCmd = "sh " + strRemoveFile + " TouchedFileInsideGrunt_61"; |
| cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| reader = new InputStreamReader(cmd); |
| grunt = new Grunt(new BufferedReader(reader), context); |
| grunt.exec(); |
| assertFalse(new File("TouchedFileInsideGrunt_61").exists()); |
| |
| |
| } catch (ExecException e) { |
| e.printStackTrace(); |
| fail(); |
| } catch (Throwable e) { |
| e.printStackTrace(); |
| fail(); |
| } |
| } |
| |
| // See PIG-2497 |
| @Test |
| public void testShellCommandOrder() throws Throwable { |
| PigServer server = new PigServer(ExecType.LOCAL, new Properties()); |
| |
| String strRemove = "rm"; |
| |
| if (Util.WINDOWS) |
| { |
| strRemove = "del"; |
| } |
| |
| File inputFile = File.createTempFile("testInputFile", ".txt"); |
| PrintWriter pwInput = new PrintWriter(new FileWriter(inputFile)); |
| pwInput.println("1"); |
| pwInput.close(); |
| |
| File inputScript = File.createTempFile("testInputScript", ""); |
| File outputFile = File.createTempFile("testOutputFile", ".txt"); |
| outputFile.delete(); |
| PrintWriter pwScript = new PrintWriter(new FileWriter(inputScript)); |
| pwScript.println("a = load '" + Util.encodeEscape(inputFile.getAbsolutePath()) + "';"); |
| pwScript.println("store a into '" + Util.encodeEscape(outputFile.getAbsolutePath()) + "';"); |
| pwScript.println("sh " + strRemove + " " + Util.encodeEscape(inputFile.getAbsolutePath())); |
| pwScript.close(); |
| |
| InputStream inputStream = new FileInputStream(inputScript.getAbsoluteFile()); |
| server.setBatchOn(); |
| server.registerScript(inputStream); |
| List<ExecJob> execJobs = server.executeBatch(); |
| assertTrue(execJobs.get(0).getStatus() == JOB_STATUS.COMPLETED); |
| } |
| |
| @Test |
| public void testSetPriority() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "set job.priority high\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| assertEquals("high", context.getProperties().getProperty(PigContext.JOB_PRIORITY)); |
| } |
| |
| @Test |
| public void testSetWithQuotes() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "set job.priority 'high'\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| assertEquals("high", context.getProperties().getProperty(PigContext.JOB_PRIORITY)); |
| } |
| |
| @Test |
| public void testRegisterWithQuotes() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| String jarName = Util.findPigJarName(); |
| |
| String strCmd = "register '" + jarName + "'\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| assertEquals(context.extraJars+ " of size 1", 1, context.extraJars.size()); |
| assertTrue(context.extraJars.get(0)+" ends with /" + jarName, context.extraJars.get(0).toString().endsWith("/" + jarName)); |
| } |
| |
| @Test |
| public void testRegisterWithoutQuotes() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| String jarName = Util.findPigJarName(); |
| |
| String strCmd = "register " + jarName + "\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| assertEquals(context.extraJars+ " of size 1", 1, context.extraJars.size()); |
| assertTrue(context.extraJars.get(0)+" ends with /" + jarName, context.extraJars.get(0).toString().endsWith("/" + jarName)); |
| } |
| |
| @Test |
| public void testRegisterScripts() throws Throwable { |
| String[] script = { |
| "#!/usr/bin/python", |
| "@outputSchema(\"x:{t:(num:long)}\")", |
| "def square(number):" , |
| "\treturn (number * number)" |
| }; |
| |
| Util.createLocalInputFile( "testRegisterScripts.py", script); |
| |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "register testRegisterScripts.py using jython as pig\n"; |
| |
| ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| grunt.exec(); |
| assertTrue(context.getFuncSpecFromAlias("pig.square") != null); |
| |
| } |
| |
| @Test |
| public void testScriptMissingLastNewLine() throws Throwable { |
| PigServer server = new PigServer(ExecType.LOCAL); |
| PigContext context = server.getPigContext(); |
| |
| String strCmd = "A = load 'bar';\nB = foreach A generate $0;"; |
| |
| BufferedReader reader = new BufferedReader(new StringReader(strCmd)); |
| String substituted = context.doParamSubstitution(reader, null, null); |
| BufferedReader pigInput = new BufferedReader(new StringReader(substituted)); |
| |
| Grunt grunt = new Grunt(pigInput, context); |
| int results[] = grunt.exec(); |
| for (int i=0; i<results.length; i++) { |
| assertEquals(0, results[i]); |
| } |
| } |
| |
| |
| // Test case for PIG-740 to report an error near the double quotes rather |
| // than an unrelated EOF error message |
| @Test |
| public void testBlockErrMessage() throws Throwable { |
| PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); |
| PigContext context = server.getPigContext(); |
| |
| String script = "A = load 'inputdata' using PigStorage() as ( curr_searchQuery );\n" + |
| "B = foreach A { domain = CONCAT(curr_searchQuery,\"^www\\.\");\n" + |
| " generate domain; };\n"; |
| ByteArrayInputStream cmd = new ByteArrayInputStream(script.getBytes()); |
| InputStreamReader reader = new InputStreamReader(cmd); |
| |
| Grunt grunt = new Grunt(new BufferedReader(reader), context); |
| |
| try { |
| grunt.exec(); |
| } catch(FrontendException e) { |
| e.printStackTrace(); |
| assertTrue(e.getMessage().contains("Error during parsing. <line 2, column 49> Unexpected character '\"'")); |
| } |
| } |
| |
| @Test |
| public void testCheckScript() throws Throwable { |
| // a query which has grunt commands intermixed with pig statements - this |
| // should pass through successfully with the check and all the grunt commands |
| // should be ignored during the check. |
| String query = "rmf input-copy.txt; cat 'foo'; a = load '1.txt' ; " + |
| "aliases;illustrate a; copyFromLocal foo bar; copyToLocal foo bar; " + |
| "describe a; mkdir foo; run bar.pig; exec bar.pig; cp foo bar; " + |
| "explain a;cd 'bar'; pwd; ls ; fs -ls ; fs -rmr foo; mv foo bar; " + |
| "dump a;store a into 'input-copy.txt' ; a = load '2.txt' as (b);" + |
| "explain a; rm foo; store a into 'bar';"; |
| |
| String[] cmds = new String[] { "'rm/rmf'", "'cp'", "'cat'", "'cd'", "'pwd'", |
| "'copyFromLocal'", "'copyToLocal'", "'describe'", "'ls'", |
| "'mkdir'", "'illustrate'", "'run/exec'", "'fs'", "'aliases'", |
| "'mv'", "'dump'" }; |
| ArrayList<String> msgs = new ArrayList<String>(); |
| for (String c : cmds) { |
| msgs.add(c + " statement is ignored while processing " + |
| "'explain -script' or '-check'"); |
| } |
| validate(query, true, msgs.toArray(new String[0])); |
| } |
| |
| @Test |
| public void testCheckScriptSyntaxErr() throws Throwable { |
| // a query which has grunt commands intermixed with pig statements - this |
| // should fail with the -check option with a syntax error |
| |
| // the query has a typo - chararay instead of chararray |
| String query = "a = load '1.txt' ; fs -rmr foo; mv foo bar; dump a;" + |
| "store a into 'input-copy.txt' ; dump a; a = load '2.txt' as " + |
| "(b:chararay);explain a; rm foo; store a into 'bar';"; |
| |
| String[] cmds = new String[] { "'fs'", "'mv'", "'dump'" }; |
| ArrayList<String> msgs = new ArrayList<String>(); |
| for (String c : cmds) { |
| msgs.add(c + " statement is ignored while processing " + |
| "'explain -script' or '-check'"); |
| } |
| msgs.add("Syntax error"); |
| validate(query, false, msgs.toArray(new String[0])); |
| } |
| |
| @Test |
| public void testCheckScriptSyntaxWithSemiColonUDFErr() throws Throwable { |
| // Should able to handle semicolons in udf |
| String query = "a = load 'i1' as (f1:chararray);" + |
| "c = foreach a generate REGEX_EXTRACT(f1, '.;' ,1); dump c;"; |
| |
| ArrayList<String> msgs = new ArrayList<String>(); // |
| validate(query, true, msgs.toArray(new String[0])); |
| } |
| |
| @Test |
| public void testCheckScriptTypeCheckErr() throws Throwable { |
| // a query which has grunt commands intermixed with pig statements - this |
| // should fail with the -check option with a type checking error |
| |
| // the query has incompatible types in bincond |
| String query = "a = load 'foo.pig' as (s:chararray); dump a; explain a; " + |
| "store a into 'foobar'; b = foreach a generate " + |
| "(s == 2 ? 1 : 2.0); store b into 'bar';"; |
| |
| String[] cmds = new String[] { "'dump'" }; |
| ArrayList<String> msgs = new ArrayList<String>(); |
| for (String c : cmds) { |
| msgs.add(c + " statement is ignored while processing " + |
| "'explain -script' or '-check'"); |
| } |
| msgs.add("incompatible types in Equal Operator"); |
| validate(query, false, msgs.toArray(new String[0])); |
| } |
| |
| @Test |
| public void testWithInlineOp() throws Throwable { |
| // specifying schema inside inline-op makes PigScriptParser.jj to read |
| // to the end of file |
| String query = "a = load 'i1' as (f1:chararray);" + |
| "b = foreach (foreach a generate f1 as b1) generate b1; " + |
| "dump b; "; |
| |
| ArrayList<String> msgs = new ArrayList<String>(); // |
| validate(query, true, msgs.toArray(new String[0])); |
| } |
| |
| /* |
| * The following test currently fails. Instead of making further changes to |
| * PigScriptParser.jj, we are leaving it until we move off javacc in PIG-2597. |
| |
| @Test |
| public void testWithInlineOpWithNestedForeach() throws Throwable { |
| // This one currently fails because "{}" is treated as |
| // IN_BLOCK in PigScriptParser.jj which jumps to PIG_END and ignore |
| // ") generate *; " part of the code. |
| // In order to support this test, we need to add parenthesis matching |
| // everywhere in PigScriptParser.jj (or stop using it) |
| // |
| String query = "a = load 'i1' as (f1:chararray);" + |
| "b = group a ALL; " + |
| "c = foreach ( foreach b {b1 = limit a 3; generate 1, b1;} ) generate *; " + |
| "dump c;"; |
| ArrayList<String> msgs = new ArrayList<String>(); // |
| validate(query, true, msgs.toArray(new String[0])); |
| } |
| */ |
| |
| |
| |
| private void validate(String query, boolean syntaxOk, |
| String[] logMessagesToCheck) throws Throwable { |
| File scriptFile = Util.createFile(new String[] { query}); |
| String scriptFileName = scriptFile.getAbsolutePath(); |
| String cmd = "java -cp " + System.getProperty("java.class.path") + |
| " org.apache.pig.Main -x local -c " + scriptFileName; |
| |
| ProcessReturnInfo pri = Util.executeJavaCommandAndReturnInfo(cmd); |
| for (String msg : logMessagesToCheck) { |
| assertTrue("Checking if " + pri.stderrContents + " contains " + |
| msg, pri.stderrContents.contains(msg)); |
| } |
| if(syntaxOk) { |
| assertTrue("Checking that the syntax OK message was printed on " + |
| "stderr <" + pri.stderrContents + ">", |
| pri.stderrContents.contains("syntax OK")); |
| } else { |
| assertFalse("Checking that the syntax OK message was NOT printed on " + |
| "stderr <" + pri.stderrContents + ">", |
| pri.stderrContents.contains("syntax OK")); |
| } |
| } |
| |
| @Test |
| public void testSet() throws Throwable { |
| |
| String strCmd = "set my.arbitrary.key my.arbitrary.value\n"; |
| PigContext pc = new PigServer(ExecType.LOCAL).getPigContext(); |
| InputStreamReader reader = new InputStreamReader(new ByteArrayInputStream(strCmd.getBytes())); |
| new Grunt(new BufferedReader(reader), pc).exec(); |
| |
| assertEquals("my.arbitrary.value", pc.getProperties().getProperty("my.arbitrary.key")); |
| } |
| |
| @Test |
| public void testCheckScriptTypeCheckErrNoStoreDump() throws Throwable { |
| //the query has not store or dump, but in when -check is used |
| // all statements should be validated |
| String query = "a = load 'foo.pig' as (s:chararray); " + |
| "b = foreach a generate $1;"; |
| |
| String msg = "Trying to access non-existent column"; |
| validateGruntCheckFail(query, msg); |
| } |
| |
| private void validateGruntCheckFail(String piglatin, String errMsg) throws Throwable{ |
| String scriptFile = "myscript.pig"; |
| try { |
| BufferedReader br = new BufferedReader(new StringReader(piglatin)); |
| Grunt grunt = new Grunt(br, new PigContext(ExecType.LOCAL, new Properties())); |
| String [] inp = {piglatin}; |
| Util.createLocalInputFile(scriptFile, inp); |
| |
| grunt.checkScript(scriptFile); |
| |
| fail("Expected exception isn't thrown"); |
| } catch (FrontendException e) { |
| Util.checkMessageInException(e, errMsg); |
| } |
| } |
| |
| @Test |
| public void testScriptWithSingleQuoteInsideCommentInGenerate() throws Throwable { |
| //the query has not store or dump, but in when -check is used |
| // all statements should be validated |
| String query = "a = load 'foo.pig' as (s1:chararray, s2:chararray);\n" + |
| "b = foreach a generate s1,\n" + |
| "--comment should be ignored even it has single quote ' in the line \n" + |
| " s2;\n"; |
| ArrayList<String> msgs = new ArrayList<String>(); // |
| validate(query, true, msgs.toArray(new String[0])); |
| query = "a = load 'foo.pig' as (s1:chararray, s2:chararray);\n" + |
| "b = foreach a generate s1,\n" + |
| "/* comment should be ignored even it has single quote ' in the line \n" + |
| "*/ \n" + |
| " s2;\n"; |
| validate(query, true, msgs.toArray(new String[0])); |
| } |
| |
| @Test |
| public void testDebugOn() throws Throwable { |
| |
| String strCmd = "set debug on\n"; |
| PigContext pc = new PigServer(ExecType.LOCAL).getPigContext(); |
| InputStreamReader reader = new InputStreamReader(new ByteArrayInputStream(strCmd.getBytes())); |
| new Grunt(new BufferedReader(reader), pc).exec(); |
| |
| assertEquals(Level.DEBUG.toString(), pc.getLog4jProperties().getProperty("log4j.logger.org.apache.pig")); |
| } |
| |
| @Test |
| public void testDebugOff() throws Throwable { |
| |
| String strCmd = "set debug off\n"; |
| PigContext pc = new PigServer(ExecType.LOCAL).getPigContext(); |
| InputStreamReader reader = new InputStreamReader(new ByteArrayInputStream(strCmd.getBytes())); |
| new Grunt(new BufferedReader(reader), pc).exec(); |
| |
| assertEquals(Level.INFO.toString(), pc.getLog4jProperties().getProperty("log4j.logger.org.apache.pig")); |
| } |
| |
| @Test |
| public void testAutoShipUDFContainingJar() throws Throwable { |
| |
| String FILE_SEPARATOR = System.getProperty("file.separator"); |
| File tmpDir = File.createTempFile("test", ""); |
| tmpDir.delete(); |
| tmpDir.mkdir(); |
| |
| File udfDir = new File(tmpDir.getAbsolutePath() + FILE_SEPARATOR + "com" + FILE_SEPARATOR |
| + "xxx" + FILE_SEPARATOR + "udf"); |
| udfDir.mkdirs(); |
| |
| String udfSrc = new String("package com.xxx.udf;\n" + |
| "import java.io.IOException;\n" + |
| "import org.apache.pig.EvalFunc;\n" + |
| "import org.apache.pig.data.Tuple;\n" + |
| "public class TestUDF extends EvalFunc<Integer>{\n" + |
| "public Integer exec(Tuple input) throws IOException {\n" + |
| "return 1;}\n" + |
| "}"); |
| |
| // compile |
| JavaCompilerHelper javaCompilerHelper = new JavaCompilerHelper(); |
| javaCompilerHelper.compile(tmpDir.getAbsolutePath(), |
| new JavaCompilerHelper.JavaSourceFromString("com.xxx.udf.TestUDF", udfSrc)); |
| |
| String jarName = "TestUDFJar.jar"; |
| String jarFile = tmpDir.getAbsolutePath() + FILE_SEPARATOR + jarName; |
| int status = Util.executeJavaCommand("jar -cf " + jarFile + |
| " -C " + tmpDir.getAbsolutePath() + " " + "com"); |
| assertEquals(0, status); |
| |
| Util.createInputFile(cluster, "table_testAutoShipUDFContainingJar", new String[] { "1" }); |
| File scriptFile = Util.createFile(new String[] { |
| "a = load 'table_testAutoShipUDFContainingJar' as (a0:int);" + |
| "b = foreach a generate com.xxx.udf.TestUDF(a0);" + |
| "store b into 'output_testAutoShipUDFContainingJar';" |
| }); |
| String scriptFileName = scriptFile.getAbsolutePath(); |
| String execTypeOptions = "-x " + cluster.getExecType() + " "; |
| String cmd = "java -cp " + System.getProperty("java.class.path") + File.pathSeparator + jarFile + |
| " org.apache.pig.Main " + execTypeOptions + scriptFileName; |
| ProcessReturnInfo pri = Util.executeJavaCommandAndReturnInfo(cmd); |
| assertEquals(pri.exitCode, 0); |
| String[] lines = pri.stderrContents.split("\n"); |
| boolean found = false; |
| for (String line : lines) { |
| if (line.matches(".*Added jar .*" + jarName + ".*")) { |
| // MR mode |
| found = true; |
| } else if (line.matches(".*Local resource.*" + jarName + ".*")) { |
| // Tez mode |
| found = true; |
| } |
| } |
| assertTrue(found); |
| } |
| |
| @Test |
| public void testGruntUtf8() throws Throwable { |
| String command = "mkdir 测试\n" + |
| "quit\n"; |
| System.setProperty("jline.WindowsTerminal.directConsole", "false"); |
| System.setIn(new ByteArrayInputStream(command.getBytes())); |
| org.apache.pig.PigRunner.run(new String[] {"-x", "local"}, null); |
| File[] partFiles = new File(".").listFiles(new FilenameFilter() { |
| @Override |
| public boolean accept(File dir, String name) { |
| return name.equals("测试"); |
| } |
| }); |
| assertEquals(partFiles.length, 1); |
| new File("测试").delete(); |
| } |
| } |