blob: 08f585840cc5fdd95221c2c7f15f62fc046c5b00 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pig.ExecType;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.PropertiesUtil;
import org.apache.pig.impl.util.Utils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
public class TestPigServer {
private static Properties properties;
private static MiniGenericCluster cluster;
private File tempDir;
public void setUp() throws Exception{
tempDir = Files.createTempDir();
public void tearDown() throws Exception{
public static void oneTimeSetup() {
cluster = MiniGenericCluster.buildCluster();
properties = cluster.getProperties();
public static void oneTimeTearDown() throws Exception {
private final static String FILE_SEPARATOR = System.getProperty("file.separator");
// make sure that name is included or not (depending on flag "included")
// in the given list of stings
private static void verifyStringContained(List<URL> list, String name, boolean included) {
int count = 0;
for (URL url : list) {
if (url.toString().contains(name)) {
if (!included) {
fail("Included is false, but url ["+url+"] contains name ["+name+"]");
assertEquals("Too many urls contain name: " + name, 1, ++count);
if (included) {
assertEquals("Number of urls that contain name [" + name + "] != 1", 1, count);
// creates an empty jar file
private static void createFakeJarFile(String location, String name)
throws IOException {
createFakeJarFile(location, name,
// creates an empty jar file
private static void createFakeJarFile(String location, String name, FileSystem fs)
throws IOException {
System.err. println("Location: " + location + " name: " + name);
Path dir = new Path(location);
assertTrue(fs.createNewFile(new Path(dir, name)));
// dynamically add more resources to the system class loader
private static void registerNewResource(String file) throws Exception {
URL urlToAdd = new File(file).toURI().toURL();
ClassLoader sysLoader = ClassLoader.getSystemClassLoader();
// Find the class loader below bootstrap class loader
// It is either the system class loader (first invocation), or the new URLClassLoader added below
while (sysLoader.getParent().getParent() != null) {
sysLoader = sysLoader.getParent();
// Check if this class loader is instance of URLClassLoader
// On Java 8 and before it is, add resources via addURL method
// On Java 11 and after it isn't, add a new URLClassLoader with the new resources above it
if (sysLoader instanceof URLClassLoader) {
Method addMethod = URLClassLoader.class.
getDeclaredMethod("addURL", new Class[]{URL.class});
addMethod.invoke(sysLoader, new Object[]{urlToAdd});
} else {
Field parent = ClassLoader.class.getDeclaredField("parent");
ClassLoader urlClassLoader = new URLClassLoader(new URL[]{urlToAdd}, ClassLoader.getSystemClassLoader().getParent());
parent.set(ClassLoader.getSystemClassLoader(), urlClassLoader);
* The jar file to register is not present
public void testRegisterJarFileNotPresent() throws Throwable {
// resister a jar file that does not exist
String jarName = "BadFileNameTestJarNotPresent.jar";
// jar name is not present to start with
PigServer pig = new PigServer(cluster.getExecType(), properties);
verifyStringContained(pig.getPigContext().extraJars, jarName, false);
boolean raisedException = false;
try {
} catch (IOException e) {
raisedException = true;
assertTrue("registerJar on jarName ["+jarName+"] should have raised an exception", raisedException);
verifyStringContained(pig.getPigContext().extraJars, jarName, false);
* Jar file to register is not present in the system resources
* in this case name of jar file is relative to current working dir
public void testRegisterJarLocalDir() throws Throwable {
String dir1 = "test1_register_jar_local";
String dir2 = "test2_register_jar_local";
String jarLocation = dir1 + FILE_SEPARATOR +
String jarName = "TestRegisterJarLocal.jar";
createFakeJarFile(jarLocation, jarName);
PigServer pig = new PigServer(cluster.getExecType(), properties);
verifyStringContained(pig.getPigContext().extraJars, jarName, false);
pig.registerJar(jarLocation + jarName);
verifyStringContained(pig.getPigContext().extraJars, jarName, true);
// clean-up
assertTrue((new File(jarLocation + jarName)).delete());
(new File(dir1 + FILE_SEPARATOR + dir2)).delete();
(new File(dir1)).delete();
* Jar file is located via system resources
* Test verifies that even with multiple resources matching,
* only one of them is registered.
public void testRegisterJarFromResources () throws Throwable {
String dir = "test_register_jar_res_dir";
String subDir1 = "test_register_jar_res_sub_dir1";
String subDir2 = "test_register_jar_res_sub_dir2";
String jarName = "TestRegisterJarFromRes.jar";
String jarLocation1 = dir + FILE_SEPARATOR + subDir1 + FILE_SEPARATOR;
String jarLocation2 = dir + FILE_SEPARATOR + subDir2 + FILE_SEPARATOR;
createFakeJarFile(jarLocation1, jarName);
createFakeJarFile(jarLocation2, jarName);
PigServer pig = new PigServer(cluster.getExecType(), properties);
verifyStringContained(pig.getPigContext().extraJars, jarName, false);
verifyStringContained(pig.getPigContext().extraJars, jarName, true);
// clean-up
assertTrue((new File(jarLocation1 + jarName)).delete());
assertTrue((new File(jarLocation2 + jarName)).delete());
(new File(jarLocation1)).delete();
(new File(jarLocation2)).delete();
(new File(dir)).delete();
* Use a resource inside a jar file.
* Verify that the containing jar file is registered correctly.
* @throws Exception
public void testRegisterJarResourceInJar() throws Throwable {
String dir = "test_register_jar_res_in_jar";
String subDir = "sub_dir";
String jarName = "TestRegisterJarNonEmpty.jar";
String className = "TestRegisterJar";
String javaSrc = "package " + subDir + "; class " + className + " { }";
// create dirs
(new File(dir + FILE_SEPARATOR + subDir)).mkdirs();
// generate java file
FileOutputStream outStream =
new FileOutputStream(new File(dir + FILE_SEPARATOR + subDir +
FILE_SEPARATOR + className + ".java"));
OutputStreamWriter outWriter = new OutputStreamWriter(outStream);
// compile
int status;
status = Util.executeJavaCommand("javac " + dir + FILE_SEPARATOR + subDir +
FILE_SEPARATOR + className + ".java");
assertEquals(0, status);
// remove src file
(new File(dir + FILE_SEPARATOR + subDir +
FILE_SEPARATOR + className + ".java")).delete();
// generate jar file
status = Util.executeJavaCommand("jar -cf " + dir + FILE_SEPARATOR + jarName + " " +
"-C " + dir + " " + subDir);
assertEquals(0, status);
// remove class file and sub_dir
(new File(dir + FILE_SEPARATOR + subDir +
FILE_SEPARATOR + className + ".class")).delete();
(new File(dir + FILE_SEPARATOR + subDir)).delete();
// register resource
registerNewResource(dir + FILE_SEPARATOR + jarName);
// load the specific resource
boolean exceptionRaised = false;
PigServer pig = new PigServer(cluster.getExecType(), properties);
try {
catch (IOException e) {
exceptionRaised = true;
// verify proper jar file is located
verifyStringContained(pig.getPigContext().extraJars, jarName, true);
// clean up Jar file and test dir
(new File(dir + FILE_SEPARATOR + jarName)).delete();
(new File(dir)).delete();
public void testRegisterJarGlobbingRelative() throws Throwable {
String dir = "test1_register_jar_globbing_relative";
String jarLocation = dir + FILE_SEPARATOR;
String jar1Name = "TestRegisterJarGlobbing1.jar";
String jar2Name = "TestRegisterJarGlobbing2.jar";
createFakeJarFile(jarLocation, jar1Name);
createFakeJarFile(jarLocation, jar2Name);
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerJar(jarLocation + "TestRegisterJarGlobbing*.jar");
verifyStringContained(pig.getPigContext().extraJars, jar1Name, true);
verifyStringContained(pig.getPigContext().extraJars, jar2Name, true);
// clean-up
assertTrue((new File(jarLocation + jar1Name)).delete());
assertTrue((new File(jarLocation + jar2Name)).delete());
(new File(dir)).delete();
public void testRegisterJarGlobbingAbsolute() throws Throwable {
String dir = "test1_register_jar_globbing_absolute";
String jarLocation = dir + FILE_SEPARATOR;
String jar1Name = "TestRegisterJarGlobbing1.jar";
String jar2Name = "TestRegisterJarGlobbing2.jar";
createFakeJarFile(jarLocation, jar1Name);
createFakeJarFile(jarLocation, jar2Name);
String currentDir = System.getProperty("user.dir");
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerJar(new File(currentDir, dir) + FILE_SEPARATOR + "TestRegisterJarGlobbing*.jar");
verifyStringContained(pig.getPigContext().extraJars, jar1Name, true);
verifyStringContained(pig.getPigContext().extraJars, jar2Name, true);
// clean-up
assertTrue((new File(jarLocation + jar1Name)).delete());
assertTrue((new File(jarLocation + jar2Name)).delete());
(new File(dir)).delete();
public void testRegisterRemoteGlobbingJar() throws Throwable {
String dir = "test1_register_remote_jar_globbing";
String jarLocation = dir + FILE_SEPARATOR;
String jar1Name = "TestRegisterRemoteJarGlobbing1.jar";
String jar2Name = "TestRegisterRemoteJarGlobbing2.jar";
FileSystem fs = cluster.getFileSystem();
createFakeJarFile(jarLocation, jar1Name, fs);
createFakeJarFile(jarLocation, jar2Name, fs);
// find the absolute path for the directory so that it does not
// depend on configuration
String absPath = fs.getFileStatus(new Path(jarLocation)).getPath().toString();
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerJar(absPath + FILE_SEPARATOR + "TestRegister{Remote}Jar*.jar");
verifyStringContained(pig.getPigContext().extraJars, jar1Name, true);
verifyStringContained(pig.getPigContext().extraJars, jar2Name, true);
// clean-up
assertTrue(fs.delete(new Path(jarLocation), true));
public void testDescribeLoad() throws Throwable {
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
Schema dumpedSchema = pig.dumpSchema("a") ;
Schema expectedSchema = Utils.getSchemaFromString("field1: int,field2: float,field3: chararray");
assertEquals(expectedSchema, dumpedSchema);
public void testDescribeFilter() throws Throwable {
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = filter a by field1 > 10;") ;
Schema dumpedSchema = pig.dumpSchema("b") ;
Schema expectedSchema = Utils.getSchemaFromString("field1: int,field2: float,field3: chararray");
assertEquals(expectedSchema, dumpedSchema);
public void testDescribeDistinct() throws Throwable {
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = distinct a ;") ;
Schema dumpedSchema = pig.dumpSchema("b") ;
Schema expectedSchema = Utils.getSchemaFromString("field1: int,field2: float,field3: chararray");
assertEquals(expectedSchema, dumpedSchema);
public void testDescribeSort() throws Throwable {
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = order a by * desc;") ;
Schema dumpedSchema = pig.dumpSchema("b") ;
Schema expectedSchema = Utils.getSchemaFromString("field1: int,field2: float,field3: chararray");
assertEquals(expectedSchema, dumpedSchema);
public void testDescribeLimit() throws Throwable {
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = limit a 10;") ;
Schema dumpedSchema = pig.dumpSchema("b") ;
Schema expectedSchema = Utils.getSchemaFromString("field1: int,field2: float,field3: chararray");
assertEquals(expectedSchema, dumpedSchema);
public void testDescribeForeach() throws Throwable {
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = foreach a generate field1 + 10;") ;
Schema dumpedSchema = pig.dumpSchema("b") ;
Schema expectedSchema = new Schema(new Schema.FieldSchema(null, DataType.INTEGER));
assertEquals(expectedSchema, dumpedSchema);
public void testDescribeForeachFail() throws Throwable {
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = foreach a generate field1 + 10;") ;
try {
pig.dumpSchema("c") ;
fail("Error expected");
} catch (Exception e) {
assertTrue(e.getMessage().contains("Unable to describe schema for alias c"));
public void testDescribeForeachNoSchema() throws Throwable {
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' ;") ;
pig.registerQuery("b = foreach a generate *;") ;
Schema dumpedSchema = pig.dumpSchema("b") ;
public void testDescribeCogroup() throws Throwable {
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = load 'b' as (field4, field5: double, field6: chararray );") ;
pig.registerQuery("c = cogroup a by field1, b by field4;") ;
Schema dumpedSchema = pig.dumpSchema("c") ;
Schema expectedSchema = Utils.getSchemaFromString("group:int,a:{(field1:int,field2:float,field3:chararray)},b:{(field4:bytearray,field5:double,field6:chararray)}");
assertEquals(expectedSchema, dumpedSchema);
public void testDescribeCross() throws Throwable {
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = load 'b' as (field4, field5: double, field6: chararray );") ;
pig.registerQuery("c = cross a, b;") ;
Schema dumpedSchema = pig.dumpSchema("c") ;
Schema expectedSchema = Utils.getSchemaFromString("a::field1: int,a::field2: float,a::field3: chararray,b::field4: bytearray,b::field5: double,b::field6: chararray");
assertEquals(expectedSchema, dumpedSchema);
public void testDescribeJoin() throws Throwable {
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = load 'b' as (field4, field5: double, field6: chararray );") ;
pig.registerQuery("c = join a by field1, b by field4;") ;
Schema dumpedSchema = pig.dumpSchema("c");
Schema expectedSchema = Utils.getSchemaFromString("a::field1: int,a::field2: float,a::field3: chararray,b::field4: bytearray,b::field5: double,b::field6: chararray");
assertEquals(expectedSchema, dumpedSchema);
public void testDescribeUnion() throws Throwable {
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: float, field3: chararray );") ;
pig.registerQuery("b = load 'b' as (field4, field5: double, field6: chararray );") ;
pig.registerQuery("c = union a, b;") ;
Schema dumpedSchema = pig.dumpSchema("c") ;
Schema expectedSchema = Utils.getSchemaFromString("field1: int,field2: double,field3: chararray");
assertEquals(expectedSchema, dumpedSchema);
public void testDescribeTuple2Elem() throws Throwable {
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int, field2: int, field3: int );") ;
pig.registerQuery("b = foreach a generate field1, (field2, field3);") ;
Schema dumpedSchema = pig.dumpSchema("b") ;
public void testDescribeComplex() throws Throwable {
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (site: chararray, count: int, itemCounts: bag { itemCountsTuple: tuple (type: chararray, typeCount: int, f: float, m: map[]) } ) ;") ;
pig.registerQuery("b = foreach a generate site, count, FLATTEN(itemCounts);") ;
Schema dumpedSchema = pig.dumpSchema("b") ;
Schema expectedSchema = Utils.getSchemaFromString(
"site: chararray,count: int," +
"itemCounts::type: chararray,itemCounts::typeCount: int," +
"itemCounts::f: float,itemCounts::m: map[ ]");
assertEquals(expectedSchema, dumpedSchema);
// PIG-5243
public void testDescribeAsClause() throws Throwable {
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (field1: int);");
pig.registerQuery("b = FOREACH a generate field1 as (new_field:chararray);") ;
Schema dumpedSchema = pig.dumpSchema("b") ;
Schema expectedSchema = Utils.getSchemaFromString("new_field: chararray");
assertEquals(expectedSchema, dumpedSchema);
private void registerScalarScript(boolean useScalar, String expectedSchemaStr) throws IOException {
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("A = load 'adata' AS (a: int, b: int);");
pig.registerQuery("C = FOREACH A GENERATE *;");
String overrideScalar = useScalar ? "C = FILTER A BY b % 2 == 0; " : "";
pig.registerQuery("B = FOREACH (GROUP A BY a) { " +
overrideScalar +
"D = FILTER A BY b % 2 == 1;" +
"GENERATE group AS a, A.b AS every, C.b AS even, D.b AS odd;" +
Schema dumpedSchema = pig.dumpSchema("B");
Schema expectedSchema = Utils.getSchemaFromString(
assertEquals(expectedSchema, dumpedSchema);
// PIG-3581
public void testScalarPrecedence() throws Throwable {
registerScalarScript(true, "a: int,every: {(b: int)},even: {(b: int)},odd: {(b: int)}");
// PIG-3581
public void testScalarResolution() throws Throwable {
registerScalarScript(false, "a: int,every: {(b: int)},even: int,odd: {(b: int)}");
public void testExplainXmlComplex() throws Throwable {
// TODO: Explain XML output is not supported in non-MR mode. Remove the
// following condition once it's implemented in Tez.
String execType = cluster.getExecType().toString().toLowerCase();
Assume.assumeTrue("Skip this test for TEZ", Util.isMapredExecType(cluster.getExecType()) || Util.isSparkExecType(cluster.getExecType()));
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a' as (site: chararray, count: int, itemCounts: bag { itemCountsTuple: tuple (type: chararray, typeCount: int, f: float, m: map[]) } ) ;") ;
pig.registerQuery("b = foreach a generate site, count, FLATTEN(itemCounts);") ;
pig.registerQuery("c = group b by site;");
pig.registerQuery("d = foreach c generate FLATTEN($1);");
pig.registerQuery("e = group d by $2;");
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
pig.explain("e", "xml", true, false, ps, ps, null, null);
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
Document doc = dBuilder.parse(bais);
//Verify Logical and Physical Plans aren't supported.
NodeList logicalPlan = doc.getElementsByTagName("logicalPlan");
assertEquals(1, logicalPlan.getLength());
assertTrue(logicalPlan.item(0).getTextContent().contains("Not Supported"));
NodeList physicalPlan = doc.getElementsByTagName("physicalPlan");
assertEquals(1, physicalPlan.getLength());
assertTrue(physicalPlan.item(0).getTextContent().contains("Not Supported"));
if (execType.equals({
} else if (execType.equals(MiniGenericCluster.EXECTYPE_SPARK)){
private void verifyExplainXmlComplexSpark(Document doc) {
NodeList stores = doc.getElementsByTagName("POStore");
assertEquals(1, stores.getLength());
NodeList groups = doc.getElementsByTagName("POJoinGroupSpark");
assertEquals(2, groups.getLength());
Node innerGroup = groups.item(1);
NodeList groupChildren = innerGroup.getChildNodes();
int foreachCount = 0;
int castCount = 0;
int loadCount = 0;
for (int i = 0; i < groupChildren.getLength(); i++) {
Node node = groupChildren.item(i);
if (node.getNodeName().equals("POForEach")){
NodeList foreachNodes = node.getChildNodes();
for (int j = 0; j < foreachNodes.getLength(); j++) {
Node innerNode = foreachNodes.item(j);
if (innerNode.getNodeName().equals("alias")){
}else if (innerNode.getNodeName().equals("POCast")){
}else if (innerNode.getNodeName().equals("POLoad")) {
private void verifyExplainXmlComplexMR(Document doc) {
//Verify we have two loads and one is temporary
NodeList loads = doc.getElementsByTagName("POLoad");
assertEquals(2, loads.getLength());
boolean sawTempLoad = false;
boolean sawNonTempLoad = false;
for (int i = 0; i < loads.getLength(); i++) {
Boolean isTempLoad = null;
boolean hasAlias = false;
Node poLoad = loads.item(i);
NodeList children = poLoad.getChildNodes();
for (int j = 0; j < children.getLength(); j++) {
Node child = children.item(j);
if (child.getNodeName().equals("alias")) {
hasAlias = true;
if (child.getNodeName().equals("isTmpLoad")) {
if (child.getTextContent().equals("false")) {
isTempLoad = false;
} else if (child.getTextContent().equals("true")) {
isTempLoad = true;
if (isTempLoad == null) {
fail("POLoad elements should have isTmpLoad child node.");
} else if (isTempLoad && hasAlias) {
fail("Temp loads should not have aliases");
} else if (!isTempLoad && !hasAlias) {
fail("Non temporary loads should be associated with alias.");
sawTempLoad = sawTempLoad || isTempLoad;
sawNonTempLoad = sawNonTempLoad || !isTempLoad;
assertTrue(sawTempLoad && sawNonTempLoad);
public void testRegisterRemoteScript() throws Throwable {
String scriptName = "";
File scriptFile = File.createTempFile("tmp", "");
PrintWriter pw = new PrintWriter(new FileWriter(scriptFile));
pw.println("@outputSchema(\"word:chararray\")\ndef helloworld():\n return 'Hello, World'");
FileSystem fs = cluster.getFileSystem();
fs.copyFromLocalFile(new Path(scriptFile.getAbsolutePath()), new Path(scriptName));
// find the absolute path for the directory so that it does not
// depend on configuration
String absPath = fs.getFileStatus(new Path(scriptName)).getPath().toString();
Util.createInputFile(cluster, "testRegisterRemoteScript_input", new String[]{"1", "2"});
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerCode(absPath, "jython", "pig");
pig.registerQuery("a = load 'testRegisterRemoteScript_input';");
pig.registerQuery("b = foreach a generate pig.helloworld($0);");
Iterator<Tuple> iter = pig.openIterator("b");
Tuple t =;
assertTrue(t.size() > 0);
assertEquals("Hello, World", t.get(0));
t =;
assertTrue(t.size() > 0);
assertEquals("Hello, World", t.get(0));
public void testPigProperties() throws Throwable {
File propertyFile = new File(tempDir, "");
File cliPropertyFile = new File("");
Properties properties = PropertiesUtil.loadDefaultProperties();
assertEquals("40000000", properties.getProperty("pig.spill.gc.activation.size"));
PrintWriter out = new PrintWriter(new FileWriter(propertyFile));
properties = PropertiesUtil.loadDefaultProperties();
assertEquals("properties", properties.getProperty("test123"));
out = new PrintWriter(new FileWriter(cliPropertyFile));
properties = PropertiesUtil.loadDefaultProperties();
assertEquals("cli_properties", properties.getProperty("test123"));
public void testPigTempDir() throws Throwable {
Properties properties = PropertiesUtil.loadDefaultProperties();
File pigTempDir = new File(tempDir, FILE_SEPARATOR + "tmp" + FILE_SEPARATOR + "test");
properties.put("pig.temp.dir", pigTempDir.getPath());
PigContext pigContext=new PigContext(ExecType.LOCAL, properties);
String tempPath= FileLocalizer.getTemporaryPath(pigContext).toString();
Path path = new Path(tempPath);
FileSystem fs = FileSystem.get(path.toUri(),
FileStatus status = fs.getFileStatus(path.getParent());
// Temporary root dir should have 700 as permission
assertEquals("rwx------", status.getPermission().toString());
public void testUniquePigTempDir() throws Throwable {
Properties properties = PropertiesUtil.loadDefaultProperties();
File pigTempDir = new File(tempDir, FILE_SEPARATOR + "tmp" + FILE_SEPARATOR + "test");
properties.put("pig.temp.dir", pigTempDir.getPath());
PigContext pigContext = new PigContext(ExecType.LOCAL, properties);
Random r = new Random(5);
String tempPath1 = FileLocalizer.getTemporaryPath(pigContext).toString();
r = new Random(5);
String tempPath2 = FileLocalizer.getTemporaryPath(pigContext).toString();
// cleanup
public void testDescribeForEachFlatten() throws Throwable {
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("a = load 'a';") ;
pig.registerQuery("b = group a by $0;") ;
pig.registerQuery("c = foreach b generate flatten(a);") ;
Schema s = pig.dumpSchema("c") ;
@Test // PIG-2059
public void test1() throws Throwable {
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery("A = load 'x' as (u, v);") ;
try {
pig.registerQuery("B = foreach A generate $2;") ;
fail("Query is supposed to fail.");
} catch(FrontendException ex) {
String msg = "Out of bound access. " +
"Trying to access non-existent column: 2";
Util.checkMessageInException(ex, msg);
public void testDefaultPigProperties() throws Throwable {
//Test with PigServer
PigServer pigServer = new PigServer(cluster.getExecType());
Properties properties = pigServer.getPigContext().getProperties();
assertEquals("999", properties.getProperty("pig.exec.reducers.max"));
assertEquals("true", properties.getProperty("aggregate.warning"));
assertEquals("true", properties.getProperty(PigConfiguration.PIG_OPT_MULTIQUERY));
assertEquals("false", properties.getProperty("stop.on.failure"));
//Test with properties file
File propertyFile = new File(tempDir, "");
properties = PropertiesUtil.loadDefaultProperties();
assertEquals("999", properties.getProperty("pig.exec.reducers.max"));
assertEquals("true", properties.getProperty("aggregate.warning"));
assertEquals("true", properties.getProperty(PigConfiguration.PIG_OPT_MULTIQUERY));
assertEquals("false", properties.getProperty("stop.on.failure"));
PrintWriter out = new PrintWriter(new FileWriter(propertyFile));
properties = PropertiesUtil.loadDefaultProperties();
assertEquals("false", properties.getProperty("aggregate.warning"));
assertEquals("false", properties.getProperty(PigConfiguration.PIG_OPT_MULTIQUERY));
assertEquals("true", properties.getProperty("stop.on.failure"));
// See PIG-4109
public void testRegisterJarRemoteScript() throws Throwable {
if (Util.WINDOWS) {
properties.setProperty("", "true");
String jarName = JarManager.findContainingJar(org.codehaus.jackson.JsonParser.class);
PigServer pig = new PigServer(cluster.getExecType(), properties);