blob: 99ab31b04fe29e9f78407478aea7f24dc829c299 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.log4j.Appender;
import org.apache.log4j.FileAppender;
import org.apache.log4j.Level;
import org.apache.log4j.PatternLayout;
import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.JavaCompilerHelper;
import org.apache.pig.test.Util.ProcessReturnInfo;
import org.apache.pig.tools.grunt.Grunt;
import org.apache.pig.tools.pigscript.parser.ParseException;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.JobStats.JobState;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestGrunt {
static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
private String basedir = "test/org/apache/pig/test/data";
@BeforeClass
public static void oneTimeSetup() throws Exception {
cluster.setProperty(PigConfiguration.PIG_OPT_MULTIQUERY,"true");
}
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
@Before
public void setup() {
Util.resetStateForExecModeSwitch();
}
@Test
public void testCopyFromLocal() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "copyFromLocal README.txt sh_copy ;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testDefine() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "define myudf org.apache.pig.builtin.AVG();\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
try {
grunt.exec();
} catch (Exception e) {
assertTrue(e.getMessage().contains("Encountered \"define\""));
}
assertTrue(null != context.getFuncSpecFromAlias("myudf"));
}
@Test
public void testBagSchema() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'input1' as (b: bag{t:(i: int, c:chararray, f: float)});\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testBagSchemaFail() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'input1'as (b: bag{t:(i: int, c:chararray, f: float)});\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
try {
grunt.exec();
fail( "Test case is supposed to fail." );
} catch (Exception e) {
assertTrue(e.getMessage().contains(
"<line 1, column 62> mismatched input ';' expecting RIGHT_PAREN"));
}
}
@Test
public void testBagConstant() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'input1'; b = foreach a generate {(1, '1', 0.4f),(2, '2', 0.45)};\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testBagConstantWithSchema() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'input1'; b = foreach a generate "
+ "{(1, '1', 0.4f),(2, '2', 0.45)} as "
+ "b: bag{t:(i: int, c:chararray, d: double)};\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testBagConstantInForeachBlock() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'input1'; "
+ "b = foreach a {generate {(1, '1', 0.4f),(2, '2', 0.45)};};\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testBagConstantWithSchemaInForeachBlock() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'input1'; "
+ "b = foreach a {generate {(1, '1', 0.4f),(2, '2', 0.45)} "
+ "as b: bag{t:(i: int, c:chararray, d: double)};};\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testParsingAsInForeachBlock() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast); "
+ "b = group a by foo; c = foreach b "
+ "{generate SUM(a.fast) as fast;};\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testParsingAsInForeachWithOutBlock() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast); "
+ "b = group a by foo; c = foreach b generate SUM(a.fast) as fast;\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testParsingWordWithAsInForeachBlock() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast); "
+ "b = group a by foo; c = foreach b {generate SUM(a.fast);};\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testParsingWordWithAsInForeachWithOutBlock() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast); "
+ "b = group a by foo; c = foreach b generate SUM(a.fast);\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testParsingWordWithAsInForeachWithOutBlock2() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "cash = load 'foo' as (foo, fast); "
+ "b = foreach cash generate fast * 2.0;\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testParsingGenerateInForeachBlock() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast, regenerate); "
+ "b = group a by foo; c = foreach b {generate a.regenerate;};\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testParsingGenerateInForeachWithOutBlock() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast, regenerate); "
+ "b = group a by foo; c = foreach b generate a.regenerate;\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testParsingAsGenerateInForeachBlock() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast, regenerate); "
+ "b = group a by foo; c = foreach b {generate "
+ "{(1, '1', 0.4f),(2, '2', 0.45)} "
+ "as b: bag{t:(i: int, cease:chararray, degenerate: double)}, "
+ "SUM(a.fast) as fast, a.regenerate as degenerated;};\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testParsingAsGenerateInForeachWithOutBlock() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast, regenerate); "
+ "b = group a by foo; c = foreach b generate "
+ "{(1, '1', 0.4f),(2, '2', 0.45)} "
+ "as b: bag{t:(i: int, cease:chararray, degenerate: double)}, "
+ "SUM(a.fast) as fast, a.regenerate as degenerated;\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testRunStatment() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast, regenerate);"
+ " run -param LIMIT=5 -param_file " + basedir
+ "/test_broken.ppf " + basedir + "/testsub.pig; explain bar";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testExecStatment() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
boolean caught = false;
String strCmd = "a = load 'foo' as (foo, fast, regenerate);"
+ " exec -param LIMIT=5 -param FUNCTION=COUNT "
+ "-param FILE=foo " + basedir + "/testsub.pig; explain bar;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
try {
grunt.exec();
} catch (Exception e) {
caught = true;
assertTrue(e.getMessage().contains("alias bar"));
}
assertTrue(caught);
}
@Test
public void testRunStatmentNested() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast, regenerate); run "
+ basedir + "/testsubnested_run.pig; explain bar";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testExecStatmentNested() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
boolean caught = false;
String strCmd = "a = load 'foo' as (foo, fast, regenerate); exec "
+ basedir + "/testsubnested_exec.pig; explain bar";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
try {
grunt.exec();
} catch (Exception e) {
caught = true;
assertTrue(e.getMessage().contains("alias bar"));
}
assertTrue(caught);
}
@Test
public void testErrorLineNumber() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "A = load 'x' as ( u:int, v:chararray );\n" +
"sh ls\n" +
"B = foreach A generate u , v; C = distinct 'xxx';\n" +
"store C into 'y';";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
try {
grunt.exec();
} catch (Exception e) {
assertTrue(e.getMessage().contains(
"<line 3, column 42> Syntax error, unexpected symbol at or near ''xxx''"));
return;
}
fail( "Test case is supposed to fail." );
}
@Test
public void testExplainEmpty() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast, regenerate); run "
+ basedir + "/testsubnested_run.pig; explain";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testExplainScript() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast, regenerate); explain -script "
+ basedir + "/testsubnested_run.pig;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
/**
* PIG-2084
* Check if only statements used in query are validated, in non-interactive
* /non-check mode. There is an 'unused' statement in query that would otherise
* fail the validation.
* Primary purpose of test is to verify that check not happening for
* every statement.
* @throws Throwable
*/
@Test
public void testExplainScriptIsEachStatementValidated() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast, regenerate);" +
"b = foreach a generate foo + 'x' + 1;" +
"c = foreach a generate foo, fast;" +
"explain c; ";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testIllustrateScript() throws Throwable {
PigServer server = new PigServer(ExecType.LOCAL, new Properties());
PigContext context = server.getPigContext();
String strCmd = "illustrate -script "
+ basedir + "/illustrate.pig;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testIllustrateScript2() throws Throwable {
PigServer server = new PigServer(ExecType.LOCAL, new Properties());
PigContext context = server.getPigContext();
String strCmd = "illustrate -script "
+ basedir + "/illustrate2.pig;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testIllustrateScript3() throws Throwable {
PigServer server = new PigServer(ExecType.LOCAL, new Properties());
PigContext context = server.getPigContext();
String strCmd = "illustrate -script "
+ basedir + "/illustrate3.pig;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testIllustrateScript4() throws Throwable {
// empty line/field test
PigServer server = new PigServer(ExecType.LOCAL, new Properties());
PigContext context = server.getPigContext();
String strCmd = "illustrate -script "
+ basedir + "/illustrate4.pig;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testIllustrateScript5() throws Throwable {
// empty line/field test
PigServer server = new PigServer(ExecType.LOCAL, new Properties());
PigContext context = server.getPigContext();
String strCmd = "illustrate -script "
+ basedir + "/illustrate5.pig;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testIllustrateScript6() throws Throwable {
// empty line/field test
PigServer server = new PigServer(ExecType.LOCAL, new Properties());
PigContext context = server.getPigContext();
String strCmd = "illustrate -script "
+ basedir + "/illustrate6.pig;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testIllustrateScript7() throws Throwable {
// empty line/field test
PigServer server = new PigServer(ExecType.LOCAL, new Properties());
PigContext context = server.getPigContext();
String strCmd = "illustrate -script "
+ basedir + "/illustrate7.pig;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
/**
* verify that grunt commands are ignored in explain -script mode
*/
@Test
public void testExplainScript2() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "explain -script "
+ basedir + "/explainScript.pig;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
String logMessagesFile = "TestGrunt-testExplainScript2-stderr.txt";
// add a file based appender to the root logger so we can parse the
// messages logged by grunt and verify that grunt commands are ignored
// in explain -script mode
Appender fileAppender = new FileAppender(new PatternLayout(), logMessagesFile);
try {
org.apache.log4j.LogManager.getRootLogger().addAppender(fileAppender);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
BufferedReader in = new BufferedReader(new FileReader(logMessagesFile));
String gruntLoggingContents = "";
//read file into a string
String line;
while ( (line = in.readLine()) != null) {
gruntLoggingContents += line + "\n";
}
in.close();
String[] cmds = new String[] { "'rm/rmf'", "'cp'", "'cat'", "'cd'", "'pwd'",
"'copyFromLocal'", "'copyToLocal'", "'describe'", "'ls'",
"'mkdir'", "'illustrate'", "'run/exec'", "'fs'", "'aliases'",
"'mv'", "'dump'" };
for (String c : cmds) {
String expected = c + " statement is ignored while processing " +
"'explain -script' or '-check'";
assertTrue("Checking if " + gruntLoggingContents + " contains " +
expected, gruntLoggingContents.contains(expected));
}
} finally {
org.apache.log4j.LogManager.getRootLogger().removeAppender(fileAppender);
new File(logMessagesFile).delete();
}
}
@Test
public void testExplainBrief() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast, regenerate); explain -brief -script "
+ basedir + "/testsubnested_run.pig;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testExplainDot() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast, regenerate); explain -dot -script "
+ basedir + "/testsubnested_run.pig;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testExplainOut() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "a = load 'foo' as (foo, fast, regenerate); explain -out /tmp -script "
+ basedir + "/testsubnested_run.pig;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testPartialExecution() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
FileLocalizer.setInitialized(false);
String strCmd = "rmf bar; rmf baz; "
+ "a = load '"
+ Util.generateURI("file:test/org/apache/pig/test/data/passwd",
context)
+ "';"
+ "store a into 'bar'; exec; a = load 'bar'; store a into 'baz';\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testFileCmds() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd =
"rmf bar; rmf baz;"
+"a = load '"
+ Util.generateURI("file:test/org/apache/pig/test/data/passwd", context) + "';"
+"store a into 'bar';"
+"cp bar baz;"
+"rm bar; rm baz;"
+"store a into 'baz';"
+"store a into 'bar';"
+"rm baz; rm bar;"
+"store a into 'baz';"
+"mv baz bar;"
+"b = load 'bar';"
+"store b into 'baz';"
+"cat baz;"
+"rm baz;"
+"rm bar;\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testCD() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd =
"mkdir /tmp;"
+"mkdir /tmp/foo;"
+"cd /tmp;"
+"rmf bar; rmf foo/baz;"
+"copyFromLocal test/org/apache/pig/test/data/passwd bar;"
+"a = load 'bar';"
+"cd foo;"
+"store a into 'baz';"
+"cd /;"
+"rm /tmp/bar; rm /tmp/foo/baz;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testDump() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd =
"rmf bla;"
+"a = load '"
+ Util.generateURI("file:test/org/apache/pig/test/data/passwd", context) + "';"
+"e = group a by $0;"
+"f = foreach e generate group, COUNT($1);"
+"store f into 'bla';"
+"f1 = load 'bla';"
+"g = order f1 by $1;"
+"dump g;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testIllustrate() throws Throwable {
Assume.assumeTrue("Skip this test for TEZ. See PIG-3993", Util.isMapredExecType(cluster.getExecType()));
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd =
"rmf bla;"
+"a = load '"
+ Util.generateURI("file:test/org/apache/pig/test/data/passwd", context) + "';"
+"e = group a by $0;"
+"f = foreach e generate group, COUNT($1);"
+"store f into 'bla';"
+"f1 = load 'bla' as (f:chararray);"
+"g = order f1 by $0;"
+"illustrate g;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testKeepGoing() throws Throwable {
Assume.assumeTrue("Skip this test for TEZ", Util.isMapredExecType(cluster.getExecType()));
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String filename =
Util.generateURI("file:test/org/apache/pig/test/data/passwd", context);
String strCmd =
"rmf bar;"
+"rmf foo;"
+"rmf baz;"
+"A = load '" + filename + "';"
+"B = foreach A generate 1;"
+"C = foreach A generate 0/0;"
+"store B into 'foo';"
+"store C into 'bar';"
+"A = load '" + filename + "';"
+"B = stream A through `false`;"
+"store B into 'baz';"
+"cat bar;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testKeepGoigFailed() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd", "passwd");
String strCmd =
"rmf bar;"
+"rmf foo;"
+"rmf baz;"
+"A = load 'passwd';"
+"B = foreach A generate 1;"
+"C = foreach A generate 0/0;"
+"store B into 'foo';"
+"store C into 'bar';"
+"A = load 'passwd';"
+"B = stream A through `false`;"
+"store B into 'baz';"
+"cat baz;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
boolean caught = false;
try {
grunt.exec();
} catch (Exception e) {
caught = true;
assertTrue(e.getMessage().contains("baz does not exist"));
}
assertTrue(caught);
}
@Test
public void testInvalidParam() throws Throwable {
PigServer server = new PigServer(ExecType.LOCAL, cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd =
"run -param -param;";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
boolean caught = false;
try {
grunt.exec();
} catch (ParseException e) {
caught = true;
assertTrue(e.getMessage().contains("Encountered"));
}
assertTrue(caught);
}
@Test
public void testStopOnFailure() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
context.getProperties().setProperty("stop.on.failure", ""+true);
context.getProperties().setProperty("mapreduce.map.maxattempts", "2");
String strCmd =
"rmf bar;\n"
+"rmf foo;\n"
+"rmf baz;\n"
+"copyFromLocal test/org/apache/pig/test/data/passwd pre;\n"
+"A = load '"
+ Util.generateURI("file:test/org/apache/pig/test/data/passwd", context) + "';\n"
+"B = stream A through `false`;\n"
+"store B into 'bar' using BinStorage();\n"
+"A1 = load '"
+ Util.generateURI("file:test/org/apache/pig/test/data/passwd", context) + "';\n"
+ "A1 = foreach A1 generate org.apache.pig.test.TestGrunt$SleepUDF();\n"
+"store A1 into 'bar1' using BinStorage();\n"
+"A = load 'bar';\n"
+"store A into 'foo';\n"
+"cp pre done;\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
boolean caught = false;
try {
grunt.exec();
} catch (PigException e) {
caught = true;
assertTrue(e.getErrorCode() == 6017);
}
if (Util.isMapredExecType(cluster.getExecType())) {
JobGraph jobGraph = PigStats.get().getJobGraph();
List<JobStats> failedJobs = jobGraph.getFailedJobs();
assertEquals(2, failedJobs.size());
for (JobStats stats : failedJobs) {
if (stats.getAlias().equals("A,B")) {
// Job with alias A,B should have failed because of streaming error
assertTrue(stats.getException().getMessage().contains(
"Received Error while processing the map plan: "
+ "'false (stdin-org.apache.pig.builtin.PigStreaming/stdout-org.apache.pig.builtin.PigStreaming)'"
+ " failed with exit status: 1"));
} else {
// Job with alias A1 with sleep should be killed as a result of stop on failure
assertTrue(stats.getErrorMessage().startsWith("Failing running job for -stop_on_failure"));
}
}
// Third job which is dependent on alias A,B should not have started
assertEquals(1, getUnknownJobs(jobGraph).size());
} else {
// Tez
JobGraph jobGraph = PigStats.get().getJobGraph();
List<JobStats> failedJobs = jobGraph.getFailedJobs();
assertEquals(1, failedJobs.size());
// First job should have failed because of streaming error. Sleep is also part of same job
assertTrue(failedJobs.get(0).getException().getMessage().contains(
"Received Error while processing the map plan: "
+ "'false (stdin-org.apache.pig.builtin.PigStreaming/stdout-org.apache.pig.builtin.PigStreaming)'"
+ " failed with exit status: 1"));
// Second job which is dependent on first should not have started
assertEquals(1, getUnknownJobs(jobGraph).size());
}
// Parallel job (sleep udf) should have not succeeded and been killed
assertFalse(server.existsFile("bar1"));
// fs cp command at the end should not have been executed
assertFalse(server.existsFile("done"));
assertTrue(caught);
}
private ArrayList<JobStats> getUnknownJobs(JobGraph jobGraph) {
ArrayList<JobStats> unknown = new ArrayList<JobStats>();
Iterator<JobStats> iter = jobGraph.iterator();
while (iter.hasNext()) {
JobStats js = iter.next();
if (js.getState() == JobState.UNKNOWN) {
unknown.add(js);
}
}
return unknown;
}
public static class SleepUDF extends EvalFunc<Integer> {
@Override
public Integer exec(Tuple input) throws IOException {
try {
Thread.sleep(50000);
} catch (InterruptedException e) {
throw new IOException(e);
}
return null;
}
}
@Test
public void testFsCommand() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(),cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd =
"fs -ls /;"
+"fs -mkdir /fstmp;"
+"fs -mkdir /fstmp/foo;"
+"cd /fstmp;"
+"fs -copyFromLocal test/org/apache/pig/test/data/passwd bar;"
+"a = load 'bar';"
+"cd foo;"
+"store a into 'baz';"
+"cd /;"
+"fs -ls .;"
+"fs -rmr /fstmp/foo/baz;"
+"cd";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
}
@Test
public void testShellCommand(){
try {
PigServer server = new PigServer(cluster.getExecType(),cluster.getProperties());
PigContext context = server.getPigContext();
String strRemoveFile = "rm";
String strRemoveDir = "rmdir";
if (Util.WINDOWS)
{
strRemoveFile = "del";
strRemoveDir = "rd";
}
String strCmd = "sh mkdir test_shell_tmp;";
// Create a temp directory via command and make sure it exists
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
assertTrue(new File("test_shell_tmp").exists());
// Remove the temp directory via shell and make sure it is gone
strCmd = "sh " + strRemoveDir + " test_shell_tmp;";
cmd = new ByteArrayInputStream(strCmd.getBytes());
reader = new InputStreamReader(cmd);
grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
assertFalse(new File("test_shell_tmp").exists());
// Verify pipes are usable in the command context by piping data to a file
strCmd = "sh echo hello world > tempShFileToTestShCommand";
cmd = new ByteArrayInputStream(strCmd.getBytes());
reader = new InputStreamReader(cmd);
grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
BufferedReader fileReader = null;
fileReader = new BufferedReader(new FileReader("tempShFileToTestShCommand"));
assertTrue(fileReader.readLine().trim().equals("hello world"));
fileReader.close();
// Remove the file via cmd and make sure it is gone
strCmd = "sh " + strRemoveFile + " tempShFileToTestShCommand";
cmd = new ByteArrayInputStream(strCmd.getBytes());
reader = new InputStreamReader(cmd);
grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
assertFalse(new File("tempShFileToTestShCommand").exists());
if (Util.WINDOWS) {
strCmd = "sh echo foo > TouchedFileInsideGrunt_61 && dir /B | findstr TouchedFileInsideGrunt_61 > fileContainingTouchedFileInsideGruntShell_71";
}
else {
strCmd = "sh touch TouchedFileInsideGrunt_61 && ls | grep TouchedFileInsideGrunt_61 > fileContainingTouchedFileInsideGruntShell_71";
}
cmd = new ByteArrayInputStream(strCmd.getBytes());
reader = new InputStreamReader(cmd);
grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
fileReader = new BufferedReader(new FileReader("fileContainingTouchedFileInsideGruntShell_71"));
assertTrue(fileReader.readLine().trim().equals("TouchedFileInsideGrunt_61"));
fileReader.close();
strCmd = "sh " + strRemoveFile+" fileContainingTouchedFileInsideGruntShell_71";
cmd = new ByteArrayInputStream(strCmd.getBytes());
reader = new InputStreamReader(cmd);
grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
assertFalse(new File("fileContainingTouchedFileInsideGruntShell_71").exists());
strCmd = "sh " + strRemoveFile + " TouchedFileInsideGrunt_61";
cmd = new ByteArrayInputStream(strCmd.getBytes());
reader = new InputStreamReader(cmd);
grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
assertFalse(new File("TouchedFileInsideGrunt_61").exists());
} catch (ExecException e) {
e.printStackTrace();
fail();
} catch (Throwable e) {
e.printStackTrace();
fail();
}
}
// See PIG-2497
@Test
public void testShellCommandOrder() throws Throwable {
PigServer server = new PigServer(ExecType.LOCAL, new Properties());
String strRemove = "rm";
if (Util.WINDOWS)
{
strRemove = "del";
}
File inputFile = File.createTempFile("testInputFile", ".txt");
PrintWriter pwInput = new PrintWriter(new FileWriter(inputFile));
pwInput.println("1");
pwInput.close();
File inputScript = File.createTempFile("testInputScript", "");
File outputFile = File.createTempFile("testOutputFile", ".txt");
outputFile.delete();
PrintWriter pwScript = new PrintWriter(new FileWriter(inputScript));
pwScript.println("a = load '" + Util.encodeEscape(inputFile.getAbsolutePath()) + "';");
pwScript.println("store a into '" + Util.encodeEscape(outputFile.getAbsolutePath()) + "';");
pwScript.println("sh " + strRemove + " " + Util.encodeEscape(inputFile.getAbsolutePath()));
pwScript.close();
InputStream inputStream = new FileInputStream(inputScript.getAbsoluteFile());
server.setBatchOn();
server.registerScript(inputStream);
List<ExecJob> execJobs = server.executeBatch();
assertTrue(execJobs.get(0).getStatus() == JOB_STATUS.COMPLETED);
}
@Test
public void testSetPriority() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "set job.priority high\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
assertEquals("high", context.getProperties().getProperty(PigContext.JOB_PRIORITY));
}
@Test
public void testSetWithQuotes() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "set job.priority 'high'\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
assertEquals("high", context.getProperties().getProperty(PigContext.JOB_PRIORITY));
}
@Test
public void testRegisterWithQuotes() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String jarName = Util.findPigJarName();
String strCmd = "register '" + jarName + "'\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
assertEquals(context.extraJars+ " of size 1", 1, context.extraJars.size());
assertTrue(context.extraJars.get(0)+" ends with /" + jarName, context.extraJars.get(0).toString().endsWith("/" + jarName));
}
@Test
public void testRegisterWithoutQuotes() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String jarName = Util.findPigJarName();
String strCmd = "register " + jarName + "\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
assertEquals(context.extraJars+ " of size 1", 1, context.extraJars.size());
assertTrue(context.extraJars.get(0)+" ends with /" + jarName, context.extraJars.get(0).toString().endsWith("/" + jarName));
}
@Test
public void testRegisterScripts() throws Throwable {
String[] script = {
"#!/usr/bin/python",
"@outputSchema(\"x:{t:(num:long)}\")",
"def square(number):" ,
"\treturn (number * number)"
};
Util.createLocalInputFile( "testRegisterScripts.py", script);
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String strCmd = "register testRegisterScripts.py using jython as pig\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
grunt.exec();
assertTrue(context.getFuncSpecFromAlias("pig.square") != null);
}
@Test
public void testScriptMissingLastNewLine() throws Throwable {
PigServer server = new PigServer(ExecType.LOCAL);
PigContext context = server.getPigContext();
String strCmd = "A = load 'bar';\nB = foreach A generate $0;";
BufferedReader reader = new BufferedReader(new StringReader(strCmd));
String substituted = context.doParamSubstitution(reader, null, null);
BufferedReader pigInput = new BufferedReader(new StringReader(substituted));
Grunt grunt = new Grunt(pigInput, context);
int results[] = grunt.exec();
for (int i=0; i<results.length; i++) {
assertEquals(0, results[i]);
}
}
// Test case for PIG-740 to report an error near the double quotes rather
// than an unrelated EOF error message
@Test
public void testBlockErrMessage() throws Throwable {
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
String script = "A = load 'inputdata' using PigStorage() as ( curr_searchQuery );\n" +
"B = foreach A { domain = CONCAT(curr_searchQuery,\"^www\\.\");\n" +
" generate domain; };\n";
ByteArrayInputStream cmd = new ByteArrayInputStream(script.getBytes());
InputStreamReader reader = new InputStreamReader(cmd);
Grunt grunt = new Grunt(new BufferedReader(reader), context);
try {
grunt.exec();
} catch(FrontendException e) {
e.printStackTrace();
assertTrue(e.getMessage().contains("Error during parsing. <line 2, column 49> Unexpected character '\"'"));
}
}
@Test
public void testCheckScript() throws Throwable {
// a query which has grunt commands intermixed with pig statements - this
// should pass through successfully with the check and all the grunt commands
// should be ignored during the check.
String query = "rmf input-copy.txt; cat 'foo'; a = load '1.txt' ; " +
"aliases;illustrate a; copyFromLocal foo bar; copyToLocal foo bar; " +
"describe a; mkdir foo; run bar.pig; exec bar.pig; cp foo bar; " +
"explain a;cd 'bar'; pwd; ls ; fs -ls ; fs -rmr foo; mv foo bar; " +
"dump a;store a into 'input-copy.txt' ; a = load '2.txt' as (b);" +
"explain a; rm foo; store a into 'bar';";
String[] cmds = new String[] { "'rm/rmf'", "'cp'", "'cat'", "'cd'", "'pwd'",
"'copyFromLocal'", "'copyToLocal'", "'describe'", "'ls'",
"'mkdir'", "'illustrate'", "'run/exec'", "'fs'", "'aliases'",
"'mv'", "'dump'" };
ArrayList<String> msgs = new ArrayList<String>();
for (String c : cmds) {
msgs.add(c + " statement is ignored while processing " +
"'explain -script' or '-check'");
}
validate(query, true, msgs.toArray(new String[0]));
}
@Test
public void testCheckScriptSyntaxErr() throws Throwable {
// a query which has grunt commands intermixed with pig statements - this
// should fail with the -check option with a syntax error
// the query has a typo - chararay instead of chararray
String query = "a = load '1.txt' ; fs -rmr foo; mv foo bar; dump a;" +
"store a into 'input-copy.txt' ; dump a; a = load '2.txt' as " +
"(b:chararay);explain a; rm foo; store a into 'bar';";
String[] cmds = new String[] { "'fs'", "'mv'", "'dump'" };
ArrayList<String> msgs = new ArrayList<String>();
for (String c : cmds) {
msgs.add(c + " statement is ignored while processing " +
"'explain -script' or '-check'");
}
msgs.add("Syntax error");
validate(query, false, msgs.toArray(new String[0]));
}
@Test
public void testCheckScriptSyntaxWithSemiColonUDFErr() throws Throwable {
// Should able to handle semicolons in udf
String query = "a = load 'i1' as (f1:chararray);" +
"c = foreach a generate REGEX_EXTRACT(f1, '.;' ,1); dump c;";
ArrayList<String> msgs = new ArrayList<String>(); //
validate(query, true, msgs.toArray(new String[0]));
}
@Test
public void testCheckScriptTypeCheckErr() throws Throwable {
// a query which has grunt commands intermixed with pig statements - this
// should fail with the -check option with a type checking error
// the query has incompatible types in bincond
String query = "a = load 'foo.pig' as (s:chararray); dump a; explain a; " +
"store a into 'foobar'; b = foreach a generate " +
"(s == 2 ? 1 : 2.0); store b into 'bar';";
String[] cmds = new String[] { "'dump'" };
ArrayList<String> msgs = new ArrayList<String>();
for (String c : cmds) {
msgs.add(c + " statement is ignored while processing " +
"'explain -script' or '-check'");
}
msgs.add("incompatible types in Equal Operator");
validate(query, false, msgs.toArray(new String[0]));
}
@Test
public void testWithInlineOp() throws Throwable {
// specifying schema inside inline-op makes PigScriptParser.jj to read
// to the end of file
String query = "a = load 'i1' as (f1:chararray);" +
"b = foreach (foreach a generate f1 as b1) generate b1; " +
"dump b; ";
ArrayList<String> msgs = new ArrayList<String>(); //
validate(query, true, msgs.toArray(new String[0]));
}
/*
* Following test currently fails. Insead of making further changes to
* PigScriptParser.jj, leaving it till we move out of javacc in PIG-2597
@Test
public void testWithInlineOpWithNestedForeach() throws Throwable {
// This one currently fails because "{}" is treated as
// IN_BLOCK in PigScriptParser.jj which jumps to PIG_END and ignore
// ") generate *; " part of the code.
// In order to support this test, we need to add parenthesis matching
// everywhere in PigScriptParser.jj (or stop using it)
//
String query = "a = load 'i1' as (f1:chararray);" +
"b = group a ALL; " +
"c = foreach ( foreach b {b1 = limit a 3; generate 1, b1;} ) generate *; " +
"dump c;";
ArrayList<String> msgs = new ArrayList<String>(); //
validate(query, true, msgs.toArray(new String[0]));
}
*/
private void validate(String query, boolean syntaxOk,
String[] logMessagesToCheck) throws Throwable {
File scriptFile = Util.createFile(new String[] { query});
String scriptFileName = scriptFile.getAbsolutePath();
String cmd = "java -cp " + System.getProperty("java.class.path") +
" org.apache.pig.Main -x local -c " + scriptFileName;
ProcessReturnInfo pri = Util.executeJavaCommandAndReturnInfo(cmd);
for (String msg : logMessagesToCheck) {
assertTrue("Checking if " + pri.stderrContents + " contains " +
msg, pri.stderrContents.contains(msg));
}
if(syntaxOk) {
assertTrue("Checking that the syntax OK message was printed on " +
"stderr <" + pri.stderrContents + ">",
pri.stderrContents.contains("syntax OK"));
} else {
assertFalse("Checking that the syntax OK message was NOT printed on " +
"stderr <" + pri.stderrContents + ">",
pri.stderrContents.contains("syntax OK"));
}
}
@Test
public void testSet() throws Throwable {
String strCmd = "set my.arbitrary.key my.arbitrary.value\n";
PigContext pc = new PigServer(ExecType.LOCAL).getPigContext();
InputStreamReader reader = new InputStreamReader(new ByteArrayInputStream(strCmd.getBytes()));
new Grunt(new BufferedReader(reader), pc).exec();
assertEquals("my.arbitrary.value", pc.getProperties().getProperty("my.arbitrary.key"));
}
@Test
public void testCheckScriptTypeCheckErrNoStoreDump() throws Throwable {
//the query has not store or dump, but in when -check is used
// all statements should be validated
String query = "a = load 'foo.pig' as (s:chararray); " +
"b = foreach a generate $1;";
String msg = "Trying to access non-existent column";
validateGruntCheckFail(query, msg);
}
private void validateGruntCheckFail(String piglatin, String errMsg) throws Throwable{
String scriptFile = "myscript.pig";
try {
BufferedReader br = new BufferedReader(new StringReader(piglatin));
Grunt grunt = new Grunt(br, new PigContext(ExecType.LOCAL, new Properties()));
String [] inp = {piglatin};
Util.createLocalInputFile(scriptFile, inp);
grunt.checkScript(scriptFile);
fail("Expected exception isn't thrown");
} catch (FrontendException e) {
Util.checkMessageInException(e, errMsg);
}
}
@Test
public void testScriptWithSingleQuoteInsideCommentInGenerate() throws Throwable {
//the query has not store or dump, but in when -check is used
// all statements should be validated
String query = "a = load 'foo.pig' as (s1:chararray, s2:chararray);\n" +
"b = foreach a generate s1,\n" +
"--comment should be ignored even it has single quote ' in the line \n" +
" s2;\n";
ArrayList<String> msgs = new ArrayList<String>(); //
validate(query, true, msgs.toArray(new String[0]));
query = "a = load 'foo.pig' as (s1:chararray, s2:chararray);\n" +
"b = foreach a generate s1,\n" +
"/* comment should be ignored even it has single quote ' in the line \n" +
"*/ \n" +
" s2;\n";
validate(query, true, msgs.toArray(new String[0]));
}
@Test
public void testDebugOn() throws Throwable {
String strCmd = "set debug on\n";
PigContext pc = new PigServer(ExecType.LOCAL).getPigContext();
InputStreamReader reader = new InputStreamReader(new ByteArrayInputStream(strCmd.getBytes()));
new Grunt(new BufferedReader(reader), pc).exec();
assertEquals(Level.DEBUG.toString(), pc.getLog4jProperties().getProperty("log4j.logger.org.apache.pig"));
}
@Test
public void testDebugOff() throws Throwable {
String strCmd = "set debug off\n";
PigContext pc = new PigServer(ExecType.LOCAL).getPigContext();
InputStreamReader reader = new InputStreamReader(new ByteArrayInputStream(strCmd.getBytes()));
new Grunt(new BufferedReader(reader), pc).exec();
assertEquals(Level.INFO.toString(), pc.getLog4jProperties().getProperty("log4j.logger.org.apache.pig"));
}
@Test
public void testAutoShipUDFContainingJar() throws Throwable {
String FILE_SEPARATOR = System.getProperty("file.separator");
File tmpDir = File.createTempFile("test", "");
tmpDir.delete();
tmpDir.mkdir();
File udfDir = new File(tmpDir.getAbsolutePath() + FILE_SEPARATOR + "com" + FILE_SEPARATOR
+ "xxx" + FILE_SEPARATOR + "udf");
udfDir.mkdirs();
String udfSrc = new String("package com.xxx.udf;\n" +
"import java.io.IOException;\n" +
"import org.apache.pig.EvalFunc;\n" +
"import org.apache.pig.data.Tuple;\n" +
"public class TestUDF extends EvalFunc<Integer>{\n" +
"public Integer exec(Tuple input) throws IOException {\n" +
"return 1;}\n" +
"}");
// compile
JavaCompilerHelper javaCompilerHelper = new JavaCompilerHelper();
javaCompilerHelper.compile(tmpDir.getAbsolutePath(),
new JavaCompilerHelper.JavaSourceFromString("com.xxx.udf.TestUDF", udfSrc));
String jarName = "TestUDFJar.jar";
String jarFile = tmpDir.getAbsolutePath() + FILE_SEPARATOR + jarName;
int status = Util.executeJavaCommand("jar -cf " + jarFile +
" -C " + tmpDir.getAbsolutePath() + " " + "com");
assertEquals(0, status);
Util.createInputFile(cluster, "table_testAutoShipUDFContainingJar", new String[] { "1" });
File scriptFile = Util.createFile(new String[] {
"a = load 'table_testAutoShipUDFContainingJar' as (a0:int);" +
"b = foreach a generate com.xxx.udf.TestUDF(a0);" +
"store b into 'output_testAutoShipUDFContainingJar';"
});
String scriptFileName = scriptFile.getAbsolutePath();
String execTypeOptions = "-x " + cluster.getExecType() + " ";
String cmd = "java -cp " + System.getProperty("java.class.path") + File.pathSeparator + jarFile +
" org.apache.pig.Main " + execTypeOptions + scriptFileName;
ProcessReturnInfo pri = Util.executeJavaCommandAndReturnInfo(cmd);
assertEquals(pri.exitCode, 0);
String[] lines = pri.stderrContents.split("\n");
boolean found = false;
for (String line : lines) {
if (line.matches(".*Added jar .*" + jarName + ".*")) {
// MR mode
found = true;
} else if (line.matches(".*Local resource.*" + jarName + ".*")) {
// Tez mode
found = true;
}
}
assertTrue(found);
}
@Test
public void testGruntUtf8() throws Throwable {
String command = "mkdir 测试\n" +
"quit\n";
System.setProperty("jline.WindowsTerminal.directConsole", "false");
System.setIn(new ByteArrayInputStream(command.getBytes()));
org.apache.pig.PigRunner.run(new String[] {"-x", "local"}, null);
File[] partFiles = new File(".").listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.equals("测试");
}
});
assertEquals(partFiles.length, 1);
new File("测试").delete();
}
}