blob: e378254be2bfe62134759977f0f89acf96cddeb7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.test.utils.Identity;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class TestEvalPipeline2 {
static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
private PigServer pigServer;
TupleFactory mTf = TupleFactory.getInstance();
BagFactory mBf = BagFactory.getInstance();
@Before
public void setUp() throws Exception{
pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
}
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
@Test
public void testUdfInputOrder() throws IOException {
String[] input = {
"(123)",
"((123)",
"(123123123123)",
"(asdf)"
};
Util.createInputFile(cluster, "table_udfInp", input);
pigServer.registerQuery("a = load 'table_udfInp' as (i:int);");
pigServer.registerQuery("b = foreach a {dec = 'hello'; str1 = " + Identity.class.getName() +
"(dec,'abc','def');" +
"generate dec,str1; };");
Iterator<Tuple> it = pigServer.openIterator("b");
Tuple tup=null;
//tuple 1
tup = it.next();
Tuple out = (Tuple)tup.get(1);
Assert.assertEquals( out.get(0).toString(), "hello");
Assert.assertEquals(out.get(1).toString(), "abc");
Assert.assertEquals(out.get(2).toString(), "def");
Util.deleteFile(cluster, "table_udfInp");
}
@Test
public void testUDFwithStarInput() throws Exception {
int LOOP_COUNT = 10;
File tmpFile = Util.createTempFileDelOnExit("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
for(int i = 0; i < LOOP_COUNT; i++) {
for(int j=0;j<LOOP_COUNT;j+=2){
ps.println(i+"\t"+j);
ps.println(i+"\t"+j);
}
}
ps.close();
pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigServer
.getPigContext()) + "';");
pigServer.registerQuery("B = group A by $0;");
String query = "C = foreach B {"
+ "generate " + Identity.class.getName() +"(*);"
+ "};";
pigServer.registerQuery(query);
Iterator<Tuple> iter = pigServer.openIterator("C");
if(!iter.hasNext()) Assert.fail("No output found");
List<Tuple> actualResList = new ArrayList<Tuple>();
while(iter.hasNext()){
actualResList.add(iter.next());
}
Util.sortQueryOutputsIfNeed(actualResList,Util.isSparkExecType(cluster.getExecType()));
int numIdentity = 0;
for (Tuple tuple : actualResList) {
Tuple t = (Tuple)tuple.get(0);
Assert.assertEquals(DataByteArray.class, t.get(0).getClass());
int group = Integer.parseInt(new String(((DataByteArray)t.get(0)).get()));
Assert.assertEquals(numIdentity, group);
Assert.assertTrue(t.get(1) instanceof DataBag);
DataBag bag = (DataBag)t.get(1);
Assert.assertEquals(10, bag.size());
Assert.assertEquals(2, t.size());
++numIdentity;
}
Assert.assertEquals(LOOP_COUNT, numIdentity);
}
@Test
public void testBinStorageByteArrayCastsSimple() throws IOException {
// Test for PIG-544 fix
// Tries to read data in BinStorage bytearrays as other pig types,
// should return null if the conversion fails.
// This test case does not use a practical example , it just tests
// if the conversion happens when minimum conditions for conversion
// such as expected number of bytes are met.
String[] input = {
"asdf\t12\t1.1\t231\t234\t3024123\t3.2492",
"sa\t1231\t123.4\t12345678\t1234.567\t5081123453\t9.181817",
"asdff\t1232123\t1.45345\t123456789\t123456789.9\t1234567\t1.234567"
};
Util.createInputFile(cluster, "table_bs_ac", input);
// test with BinStorage
pigServer.registerQuery("a = load 'table_bs_ac';");
String output = "/pig/out/TestEvalPipeline2_BinStorageByteArrayCasts";
pigServer.deleteFile(output);
pigServer.store("a", output, BinStorage.class.getName());
pigServer.registerQuery("b = load '" + output + "' using BinStorage('Utf8StorageConverter') "
+ "as (name: int, age: int, gpa: float, lage: long, dgpa: double, bi:biginteger, bd:bigdecimal);");
Iterator<Tuple> it = pigServer.openIterator("b");
Tuple tup=null;
// I have separately verified only few of the successful conversions,
// assuming the rest are correct.
// It is primarily testing if null is being returned when conversions
// are expected to fail
//tuple 1
tup = it.next();
Assert.assertTrue((Integer)tup.get(0) == null);
Assert.assertTrue((Integer)tup.get(1) == 12);
Assert.assertTrue((Float)tup.get(2) == 1.1F);
Assert.assertTrue((Long)tup.get(3) == 231L);
Assert.assertTrue((Double)tup.get(4) == 234.0);
Assert.assertEquals((BigInteger)tup.get(5), new BigInteger("3024123"));
Assert.assertEquals((BigDecimal)tup.get(6), new BigDecimal("3.2492"));
//tuple 2
tup = it.next();
Assert.assertTrue(tup.get(0) == null);
Assert.assertTrue((Integer)tup.get(1) == 1231);
Assert.assertTrue((Float)tup.get(2) == 123.4F);
Assert.assertTrue((Long)tup.get(3) == 12345678L);
Assert.assertTrue((Double)tup.get(4) == 1234.567);
Assert.assertEquals((BigInteger)tup.get(5), new BigInteger("5081123453"));
Assert.assertEquals((BigDecimal)tup.get(6), new BigDecimal("9.181817"));
//tuple 3
tup = it.next();
Assert.assertTrue(tup.get(0) == null);
Assert.assertTrue((Integer)tup.get(1) == 1232123);
Assert.assertTrue((Float)tup.get(2) == 1.45345F);
Assert.assertTrue((Long)tup.get(3) == 123456789L);
Assert.assertTrue((Double)tup.get(4) == 1.234567899E8);
Assert.assertEquals((BigInteger)tup.get(5), new BigInteger("1234567"));
Assert.assertEquals((BigDecimal)tup.get(6), new BigDecimal("1.234567"));
Util.deleteFile(cluster, "table");
}
@Test
public void testBinStorageByteArrayCastsComplexBag() throws IOException {
// Test for PIG-544 fix
// Tries to read data in BinStorage bytearrays as other pig bags,
// should return null if the conversion fails.
String[] input = {
"{(asdf)}",
"{(2344)}",
"{(2344}",
"{(323423423423434)}",
"{(323423423423434L)}",
"{(asdff)}"
};
Util.createInputFile(cluster, "table_bs_ac_clx", input);
// test with BinStorage
pigServer.registerQuery("a = load 'table_bs_ac_clx' as (f1);");
pigServer.registerQuery("b = foreach a generate (bag{tuple(int)})f1;");
Iterator<Tuple> it = pigServer.openIterator("b");
Tuple tup=null;
//tuple 1
tup = it.next();
Assert.assertTrue(tup.get(0) != null);
//tuple 2
tup = it.next();
Assert.assertTrue(tup.get(0) != null);
//tuple 3 - malformed
tup = it.next();
Assert.assertTrue(tup.get(0) == null);
//tuple 4 - integer exceeds size limit
tup = it.next();
Assert.assertTrue(tup.get(0) instanceof DataBag);
DataBag db = (DataBag)tup.get(0);
Assert.assertTrue(db.iterator().hasNext());
Tuple innerTuple = db.iterator().next();
Assert.assertTrue(innerTuple.get(0)==null);
//tuple 5
tup = it.next();
Assert.assertTrue(tup.get(0) != null);
//tuple 6
tup = it.next();
Assert.assertTrue(tup.get(0) != null);
Util.deleteFile(cluster, "table_bs_ac_clx");
}
@Test
public void testBinStorageByteArrayCastsComplexTuple() throws IOException {
// Test for PIG-544 fix
// Tries to read data in BinStorage bytearrays as other pig bags,
// should return null if the conversion fails.
String[] input = {
"(123)",
"((123)",
"(123123123123)",
"(asdf)"
};
Util.createInputFile(cluster, "table_bs_ac_clxt", input);
// test with BinStorage
pigServer.registerQuery("a = load 'table_bs_ac_clxt' as (t:tuple(t:tuple(i:int)));");
Iterator<Tuple> it = pigServer.openIterator("a");
Tuple tup=null;
//tuple 1
tup = it.next();
Assert.assertTrue(tup.get(0) == null);
//tuple 2 -malformed tuple
tup = it.next();
Assert.assertTrue(tup.get(0) == null);
//tuple 3 - integer exceeds size limit
tup = it.next();
Assert.assertTrue(tup.get(0) == null);
//tuple 4
tup = it.next();
Assert.assertTrue(tup.get(0) == null);
Util.deleteFile(cluster, "table_bs_ac_clxt");
}
@Test
public void testPigStorageWithCtrlChars() throws Exception {
String[] inputData = { "hello\u0001world", "good\u0001morning", "nice\u0001day" };
Util.createInputFile(cluster, "testPigStorageWithCtrlCharsInput.txt", inputData);
String script = "a = load 'testPigStorageWithCtrlCharsInput.txt' using PigStorage('\u0001');" +
"b = foreach a generate $0, CONCAT($0, '\u0005'), $1; " +
"store b into 'testPigStorageWithCtrlCharsOutput.txt' using PigStorage('\u0001');" +
"c = load 'testPigStorageWithCtrlCharsOutput.txt' using PigStorage('\u0001') as (f1:chararray, f2:chararray, f3:chararray);";
Util.registerMultiLineQuery(pigServer, script);
Iterator<Tuple> it = pigServer.openIterator("c");
HashMap<String, Tuple> expectedResults = new HashMap<String, Tuple>();
expectedResults.put("hello", (Tuple) Util.getPigConstant("('hello','hello\u0005','world')"));
expectedResults.put("good", (Tuple) Util.getPigConstant("('good','good\u0005','morning')"));
expectedResults.put("nice", (Tuple) Util.getPigConstant("('nice','nice\u0005','day')"));
HashMap<String, Boolean> seen = new HashMap<String, Boolean>();
int numRows = 0;
while(it.hasNext()) {
Tuple t = it.next();
String firstCol = (String) t.get(0);
Assert.assertFalse(seen.containsKey(firstCol));
seen.put(firstCol, true);
Assert.assertEquals(expectedResults.get(firstCol), t);
numRows++;
}
Assert.assertEquals(3, numRows);
Util.deleteFile(cluster, "testPigStorageWithCtrlCharsInput.txt");
}
@Test
// Test case added for PIG-850
public void testLimitedSortWithDump() throws Exception{
int LOOP_COUNT = 40;
File tmpFile = Util.createTempFileDelOnExit("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
Random r = new Random(1);
int rand;
for(int i = 0; i < LOOP_COUNT; i++) {
rand = r.nextInt(100);
ps.println(rand);
}
ps.close();
pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigServer
.getPigContext()) + "' AS (num:int);");
pigServer.registerQuery("B = order A by num parallel 2;");
pigServer.registerQuery("C = limit B 10;");
Iterator<Tuple> result = pigServer.openIterator("C");
int numIdentity = 0;
while (result.hasNext())
{
result.next();
++numIdentity;
}
Assert.assertEquals(10, numIdentity);
}
@Test
public void testLimitAfterSort() throws Exception{
int LOOP_COUNT = 40;
File tmpFile = Util.createTempFileDelOnExit("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
Random r = new Random(1);
int rand;
for(int i = 0; i < LOOP_COUNT; i++) {
rand = r.nextInt(100);
ps.println(rand);
}
ps.close();
pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigServer
.getPigContext()) + "' AS (num:int);");
pigServer.registerQuery("B = order A by num parallel 2;");
pigServer.registerQuery("C = limit B 10;");
Iterator<Tuple> iter = pigServer.openIterator("C");
if(!iter.hasNext()) Assert.fail("No output found");
int numIdentity = 0;
int oldNum = Integer.MIN_VALUE;
int newNum;
while(iter.hasNext()){
Tuple t = iter.next();
newNum = (Integer)t.get(0);
Assert.assertTrue(newNum>=oldNum);
oldNum = newNum;
++numIdentity;
}
Assert.assertEquals(10, numIdentity);
}
@Test
public void testLimitAfterSortDesc() throws Exception{
int LOOP_COUNT = 40;
File tmpFile = Util.createTempFileDelOnExit("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
Random r = new Random(1);
int rand;
for(int i = 0; i < LOOP_COUNT; i++) {
rand = r.nextInt(100);
ps.println(rand);
}
ps.close();
pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigServer
.getPigContext()) + "' AS (num:int);");
pigServer.registerQuery("B = order A by num desc parallel 2;");
pigServer.registerQuery("C = limit B 10;");
Iterator<Tuple> iter = pigServer.openIterator("C");
if(!iter.hasNext()) Assert.fail("No output found");
int numIdentity = 0;
int oldNum = Integer.MAX_VALUE;
int newNum;
while(iter.hasNext()){
Tuple t = iter.next();
newNum = (Integer)t.get(0);
Assert.assertTrue(newNum<=oldNum);
oldNum = newNum;
++numIdentity;
}
Assert.assertEquals(10, numIdentity);
}
@Test
// See PIG-894
public void testEmptySort() throws Exception{
File tmpFile = Util.createTempFileDelOnExit("test", "txt");
pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigServer
.getPigContext()) + "';");
pigServer.registerQuery("B = order A by $0;");
Iterator<Tuple> iter = pigServer.openIterator("B");
Assert.assertTrue(iter.hasNext()==false);
}
// See PIG-761
@Test
public void testLimitPOPackageAnnotator() throws Exception{
File tmpFile1 = Util.createTempFileDelOnExit("test1", "txt");
PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));
ps1.println("1\t2\t3");
ps1.println("2\t5\t2");
ps1.close();
File tmpFile2 = Util.createTempFileDelOnExit("test2", "txt");
PrintStream ps2 = new PrintStream(new FileOutputStream(tmpFile2));
ps2.println("1\t1");
ps2.println("2\t2");
ps2.close();
pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = LOAD '" + Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
pigServer.registerQuery("C = LIMIT B 100;");
pigServer.registerQuery("D = COGROUP C BY b0, A BY a0 PARALLEL 2;");
Iterator<Tuple> iter = pigServer.openIterator("D");
if (Util.isSparkExecType(cluster.getExecType())) {
String[] expectedResults =
new String[] {"(2,{(2,2)},{(2,5,2)})", "(1,{(1,1)},{(1,2,3)})" };
Util.checkQueryOutputs(iter, expectedResults,
org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")), Util.isSparkExecType(cluster.getExecType()));
} else {
Assert.assertTrue(iter.hasNext());
Tuple t = iter.next();
Assert.assertTrue(t.toString().equals("(2,{(2,2)},{(2,5,2)})"));
Assert.assertTrue(iter.hasNext());
t = iter.next();
Assert.assertTrue(t.toString().equals("(1,{(1,1)},{(1,2,3)})"));
Assert.assertFalse(iter.hasNext());
}
}
// See PIG-1195
@Test
public void testNestedDescSort() throws Exception{
Util.createInputFile(cluster, "table_testNestedDescSort", new String[]{"3","4"});
pigServer.registerQuery("A = LOAD 'table_testNestedDescSort' as (a0:int);");
pigServer.registerQuery("B = group A ALL;");
pigServer.registerQuery("C = foreach B { D = order A by a0 desc;generate D;};");
Iterator<Tuple> iter = pigServer.openIterator("C");
Assert.assertTrue(iter.hasNext());
Tuple t = iter.next();
Assert.assertTrue(t.toString().equals("({(4),(3)})"));
Assert.assertFalse(iter.hasNext());
Util.deleteFile(cluster, "table_testNestedDescSort");
}
// See PIG-972
@Test
public void testDescribeNestedAlias() throws Exception{
String[] input = {
"1\t3",
"2\t4",
"3\t5"
};
Util.createInputFile(cluster, "table_testDescribeNestedAlias", input);
pigServer.registerQuery("A = LOAD 'table_testDescribeNestedAlias' as (a0, a1);");
pigServer.registerQuery("P = GROUP A by a1;");
// Test RelationalOperator
pigServer.registerQuery("B = FOREACH P { D = ORDER A by $0; generate group, D.$0; };");
// Test ExpressionOperator - negative test case
pigServer.registerQuery("C = FOREACH A { D = a0/a1; E=a1/a0; generate E as newcol; };");
Schema schema = pigServer.dumpSchemaNested("B", "D");
Assert.assertTrue(schema.toString().equalsIgnoreCase("{a0: bytearray,a1: bytearray}"));
try {
schema = pigServer.dumpSchemaNested("C", "E");
} catch (FrontendException e) {
Assert.assertTrue(e.getErrorCode() == 1113);
}
}
// See PIG-1484
@Test
public void testBinStorageCommaSeperatedPath() throws Exception{
String[] input = {
"1\t3",
"2\t4",
"3\t5"
};
Util.createInputFile(cluster, "table_simple1", input);
pigServer.setBatchOn();
pigServer.registerQuery("A = LOAD 'table_simple1' as (a0, a1);");
pigServer.registerQuery("store A into 'table_simple1.bin' using BinStorage();");
pigServer.registerQuery("store A into 'table_simple2.bin' using BinStorage();");
pigServer.executeBatch();
pigServer.registerQuery("A = LOAD 'table_simple1.bin,table_simple2.bin' using BinStorage();");
Iterator<Tuple> iter = pigServer.openIterator("A");
Tuple t = iter.next();
Assert.assertTrue(t.toString().equals("(1,3)"));
t = iter.next();
Assert.assertTrue(t.toString().equals("(2,4)"));
t = iter.next();
Assert.assertTrue(t.toString().equals("(3,5)"));
t = iter.next();
Assert.assertTrue(t.toString().equals("(1,3)"));
t = iter.next();
Assert.assertTrue(t.toString().equals("(2,4)"));
t = iter.next();
Assert.assertTrue(t.toString().equals("(3,5)"));
Assert.assertFalse(iter.hasNext());
}
// See PIG-1543
@Test
public void testEmptyBagIterator() throws Exception{
String[] input1 = {
"1",
"1",
"1"
};
String[] input2 = {
"2",
"2"
};
Util.createInputFile(cluster, "input1", input1);
Util.createInputFile(cluster, "input2", input2);
pigServer.registerQuery("A = load 'input1' as (a1:int);");
pigServer.registerQuery("B = load 'input2' as (b1:int);");
pigServer.registerQuery("C = COGROUP A by a1, B by b1;");
pigServer.registerQuery("C1 = foreach C { Alim = limit A 1; Blim = limit B 1; generate Alim, Blim; };");
pigServer.registerQuery("D1 = FOREACH C1 generate Alim,Blim, (IsEmpty(Alim)? 0:1), (IsEmpty(Blim)? 0:1), COUNT(Alim), COUNT(Blim);");
Iterator<Tuple> iter = pigServer.openIterator("D1");
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
new String[] {
"({(1)},{},1,0,1L,0L)",
"({},{(2)},0,1,0L,1L)" });
Util.checkQueryOutputsAfterSort(iter, expectedResults);
}
// See PIG-1669
@Test
public void testPushUpFilterScalar() throws Exception{
String[] input1 = {
"jason\t14\t4.7",
"jack\t18\t4.6"
};
String[] input2 = {
"jason\t14",
"jack\t18"
};
Util.createInputFile(cluster, "table_PushUpFilterScalar1", input1);
Util.createInputFile(cluster, "table_PushUpFilterScalar2", input2);
pigServer.registerQuery("A = load 'table_PushUpFilterScalar1' as (name, age, gpa);");
pigServer.registerQuery("B = load 'table_PushUpFilterScalar2' as (name, age);");
pigServer.registerQuery("C = filter A by age < 20;");
pigServer.registerQuery("D = filter B by age < 20;");
pigServer.registerQuery("simple_scalar = limit D 1;");
pigServer.registerQuery("E = join C by name, D by name;");
pigServer.registerQuery("F = filter E by C::age==(int)simple_scalar.age;");
Iterator<Tuple> iter = pigServer.openIterator("F");
Tuple t = iter.next();
Assert.assertTrue(t.toString().equals("(jason,14,4.7,jason,14)"));
Assert.assertFalse(iter.hasNext());
}
// See PIG-1683
@Test
public void testDuplicateReferenceInnerPlan() throws Exception{
String[] input1 = {
"1\t1\t1",
};
String[] input2 = {
"1\t1",
"2\t2"
};
Util.createInputFile(cluster, "table_testDuplicateReferenceInnerPlan1", input1);
Util.createInputFile(cluster, "table_testDuplicateReferenceInnerPlan2", input2);
pigServer.registerQuery("a = load 'table_testDuplicateReferenceInnerPlan1' as (a0, a1, a2);");
pigServer.registerQuery("b = load 'table_testDuplicateReferenceInnerPlan2' as (b0, b1);");
pigServer.registerQuery("c = join a by a0, b by b0;");
pigServer.registerQuery("d = foreach c {d0 = a::a1;d1 = a::a2;generate ((d0 is not null)? d0 : d1);};");
Iterator<Tuple> iter = pigServer.openIterator("d");
Tuple t = iter.next();
Assert.assertTrue(t.toString().equals("(1)"));
Assert.assertFalse(iter.hasNext());
}
// See PIG-1719
@Test
public void testBinCondSchema() throws IOException {
String[] inputData = new String[] {"hello world\t2"};
Util.createInputFile(cluster, "table_testSchemaSerialization.txt", inputData);
pigServer.registerQuery("a = load 'table_testSchemaSerialization.txt' as (a0:chararray, a1:int);");
pigServer.registerQuery("b = foreach a generate FLATTEN((a1<=1?{('null')}:TOKENIZE(a0)));");
pigServer.registerQuery("c = foreach b generate UPPER($0);");
Iterator<Tuple> it = pigServer.openIterator("c");
Tuple t = it.next();
Assert.assertTrue(t.get(0).equals("HELLO"));
t = it.next();
Assert.assertTrue(t.get(0).equals("WORLD"));
}
// See PIG-1721
@Test
public void testDuplicateInnerAlias() throws Exception{
String[] input1 = {
"1\t[key1#5]", "1\t[key2#5]", "2\t[key1#3]"
};
Util.createInputFile(cluster, "table_testDuplicateInnerAlias", input1);
pigServer.registerQuery("a = load 'table_testDuplicateInnerAlias' as (a0:int, a1:map[]);");
pigServer.registerQuery("b = filter a by a0==1;");
pigServer.registerQuery("c = foreach b { b0 = a1#'key1'; generate ((b0 is null or b0 == '')?1:0);};");
Iterator<Tuple> iter = pigServer.openIterator("c");
Tuple t = iter.next();
Assert.assertTrue((Integer)t.get(0)==0);
t = iter.next();
Assert.assertTrue((Integer)t.get(0)==1);
Assert.assertFalse(iter.hasNext());
}
// See PIG-3379
@Test
public void testNestedOperatorReuse() throws Exception{
String[] input1 = {
"60000\tdv1\txuaHeartBeat",
"70000\tdv2\txuaHeartBeat",
"80000\tdv1\txuaPowerOff",
"90000\tdv1\txuaPowerOn",
"110000\tdv2\txuaHeartBeat",
"120000\tdv2\txuaPowerOff",
"140000\tdv2\txuaPowerOn",
"150000\tdv1\txuaHeartBeat",
"160000\tdv2\txuaHeartBeat",
"250000\tdv1\txuaHeartBeat",
"310000\tdv2\txuaPowerOff",
"360000\tdv1\txuaPowerOn",
"420000\tdv3\txuaHeartBeat",
"450000\tdv3\txuaHeartBeat",
"540000\tdv4\txuaPowerOn",
"550000\tdv3\txuaHeartBeat",
"560000\tdv5\txuaHeartBeat" };
Util.createInputFile( cluster, "table_testNestedOperatorReuse", input1 );
String query = "Events = LOAD 'table_testNestedOperatorReuse' AS (eventTime:long, deviceId:chararray, eventName:chararray);" +
"Events = FOREACH Events GENERATE eventTime, deviceId, eventName;" +
"EventsPerMinute = GROUP Events BY (eventTime / 60000);" +
"EventsPerMinute = FOREACH EventsPerMinute {" +
" DistinctDevices = DISTINCT Events.deviceId;" +
" nbDevices = SIZE(DistinctDevices);" +
" DistinctDevices = FILTER Events BY eventName == 'xuaHeartBeat';" +
" nbDevicesWatching = SIZE(DistinctDevices);" +
" GENERATE $0*60000 as timeStamp, nbDevices as nbDevices, nbDevicesWatching as nbDevicesWatching;" +
"}" +
"EventsPerMinute = FILTER EventsPerMinute BY timeStamp >= 0 AND timeStamp < 300000;";
pigServer.registerQuery(query);
Iterator<Tuple> iter = pigServer.openIterator("EventsPerMinute");
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
new String[]{"(60000L,2L,3L)", "(120000L,2L,2L)", "(240000L,1L,1L)"});
Util.checkQueryOutputs(iter, expectedResults, Util.isSparkExecType(cluster.getExecType()));
}
// See PIG-1729
@Test
public void testDereferenceInnerPlan() throws Exception{
String[] input1 = {
"1\t2\t3"
};
String[] input2 = {
"1\t1"
};
Util.createInputFile(cluster, "table_testDereferenceInnerPlan1", input1);
Util.createInputFile(cluster, "table_testDereferenceInnerPlan2", input2);
pigServer.registerQuery("a = load 'table_testDereferenceInnerPlan1' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("b = load 'table_testDereferenceInnerPlan2' as (b0:int, b1:int);");
pigServer.registerQuery("c = cogroup a by a0, b by b0;");
pigServer.registerQuery("d = foreach c generate ((COUNT(a)==0L)?null : a.a0) as d0;");
pigServer.registerQuery("e = foreach d generate flatten(d0);");
pigServer.registerQuery("f = group e all;");
Iterator<Tuple> iter = pigServer.openIterator("f");
Tuple t = iter.next();
Assert.assertTrue(t.toString().equals("(all,{(1)})"));
Assert.assertFalse(iter.hasNext());
}
@Test
// See PIG-1725
public void testLOGenerateSchema() throws Exception{
String[] input1 = {
"1\t2\t{(1)}",
};
Util.createInputFile(cluster, "table_testLOGenerateSchema", input1);
pigServer.registerQuery("a = load 'table_testLOGenerateSchema' as (a0:int, a1, a2:bag{});");
pigServer.registerQuery("b = foreach a generate a0 as b0, a1 as b1, flatten(a2) as b2:int;");
pigServer.registerQuery("c = filter b by b0==1;");
pigServer.registerQuery("d = foreach c generate b0+1, b2;");
Iterator<Tuple> iter = pigServer.openIterator("d");
Tuple t = iter.next();
Assert.assertTrue(t.toString().equals("(2,1)"));
Assert.assertFalse(iter.hasNext());
}
// See PIG-1737
@Test
public void testMergeSchemaErrorMessage() throws IOException {
try {
pigServer.registerQuery("a = load '1.txt' as (a0, a1, a2);");
pigServer.registerQuery("b = group a by (a0, a1);");
pigServer.registerQuery("c = foreach b generate flatten(group) as c0;");
pigServer.openIterator("c");
} catch (Exception e) {
PigException pe = LogUtils.getPigException(e);
Util.checkStrContainsSubStr(pe.getMessage(), "Incompatible schema");
return;
}
Assert.fail();
}
// See PIG-1732
@Test
public void testForEachDupColumn() throws Exception{
String[] input1 = {
"1\t2",
};
String[] input2 = {
"1\t1\t3",
"2\t4\t2"
};
Util.createInputFile(cluster, "table_testForEachDupColumn1", input1);
Util.createInputFile(cluster, "table_testForEachDupColumn2", input2);
pigServer.registerQuery("a = load 'table_testForEachDupColumn1' as (a0, a1:int);");
pigServer.registerQuery("b = load 'table_testForEachDupColumn2' as (b0, b1:int, b2);");
pigServer.registerQuery("c = foreach a generate a0, a1, a1 as a2;");
pigServer.registerQuery("d = union b, c;");
pigServer.registerQuery("e = foreach d generate $1;");
Iterator<Tuple> iter = pigServer.openIterator("e");
Map<Object, Object> expected = new HashMap<Object, Object>(3);
expected.put(1, null);
expected.put(2, null);
expected.put(4, null);
Tuple t = iter.next();
Assert.assertTrue(t.size()==1);
Assert.assertTrue(expected.containsKey(t.get(0)));
t = iter.next();
Assert.assertTrue(t.size()==1);
Assert.assertTrue(expected.containsKey(t.get(0)));
t = iter.next();
Assert.assertTrue(t.size()==1);
Assert.assertTrue(expected.containsKey(t.get(0)));
Assert.assertFalse(iter.hasNext());
}
// See PIG-1745
@Test
public void testBinStorageByteCast() throws Exception{
String[] input1 = {
"1\t2\t3",
};
Util.createInputFile(cluster, "table_testBinStorageByteCast", input1);
pigServer.registerQuery("a = load 'table_testBinStorageByteCast' as (a0, a1, a2);");
pigServer.store("a", "table_testBinStorageByteCast.temp", BinStorage.class.getName());
pigServer.registerQuery("a = load 'table_testBinStorageByteCast.temp' using BinStorage() as (a0, a1, a2);");
pigServer.registerQuery("b = foreach a generate (long)a0;");
try {
pigServer.openIterator("b");
} catch (Exception e) {
PigException pe = LogUtils.getPigException(e);
//This changes in hadoop 23, we get error code 2997
//Assert.assertTrue(pe.getErrorCode()==1118);
return;
}
Assert.fail();
}
// See PIG-1761
@Test
public void testBagDereferenceInMiddle1() throws Exception{
String[] input1 = {
"foo@apache#44",
};
Util.createInputFile(cluster, "table_testBagDereferenceInMiddle1", input1);
pigServer.registerQuery("a = load 'table_testBagDereferenceInMiddle1' as (a0:chararray);");
pigServer.registerQuery("b = foreach a generate UPPER(REGEX_EXTRACT_ALL(a0, '.*@(.*)#.*').$0);");
Iterator<Tuple> iter = pigServer.openIterator("b");
Tuple t = iter.next();
Assert.assertTrue(t.size()==1);
Assert.assertTrue(t.get(0).equals("APACHE"));
}
// See PIG-1843
@Test
public void testBagDereferenceInMiddle2() throws Exception{
String[] input1 = {
"foo apache",
};
Util.createInputFile(cluster, "table_testBagDereferenceInMiddle2", input1);
pigServer.registerQuery("a = load 'table_testBagDereferenceInMiddle2' as (a0:chararray);");
pigServer.registerQuery("b = foreach a generate " + MapGenerate.class.getName() + " (STRSPLIT(a0).$0);");
Iterator<Tuple> iter = pigServer.openIterator("b");
Tuple t = iter.next();
Assert.assertTrue(t.size()==1);
Assert.assertTrue(t.toString().equals("([key#1])"));
}
// See PIG-1766
@Test
public void testForEachSameOriginColumn1() throws Exception {
String[] input1 = {
"1\t2",
"1\t3",
"2\t4",
"2\t5",
};
String[] input2 = {
"1\tone",
"2\ttwo",
};
Util.createInputFile(cluster, "table_testForEachSameOriginColumn1_1", input1);
Util.createInputFile(cluster, "table_testForEachSameOriginColumn1_2", input2);
pigServer.registerQuery("A = load 'table_testForEachSameOriginColumn1_1' AS (a0:int, a1:int);");
pigServer.registerQuery("B = load 'table_testForEachSameOriginColumn1_2' AS (b0:int, b1:chararray);");
pigServer.registerQuery("C = join A by a0, B by b0;");
pigServer.registerQuery("D = foreach B generate b0 as d0, b1 as d1;");
pigServer.registerQuery("E = join C by a1, D by d0;");
pigServer.registerQuery("F = foreach E generate b1, d1;");
Iterator<Tuple> iter = pigServer.openIterator("F");
Tuple t = iter.next();
Assert.assertTrue(t.size()==2);
Assert.assertTrue(t.get(0).equals("one"));
Assert.assertTrue(t.get(1).equals("two"));
}
// See PIG-1771
@Test
public void testLoadWithDifferentSchema() throws Exception{
String[] input1 = {
"hello\thello\t(hello)\t[key#value]",
};
Util.createInputFile(cluster, "table_testLoadWithDifferentSchema1", input1);
pigServer.registerQuery("a = load 'table_testLoadWithDifferentSchema1' as (a0:chararray, a1:chararray, a2, a3:map[]);");
pigServer.store("a", "table_testLoadWithDifferentSchema1.bin", "org.apache.pig.builtin.BinStorage");
pigServer.registerQuery("b = load 'table_testLoadWithDifferentSchema1.bin' USING BinStorage('Utf8StorageConverter') AS (b0:chararray, b1:chararray, b2:tuple(), b3:map[]);");
Iterator<Tuple> iter = pigServer.openIterator("b");
Tuple t = iter.next();
Assert.assertTrue(t.size()==4);
Assert.assertTrue(t.toString().equals("(hello,hello,(hello),[key#value])"));
}
static public class MapGenerate extends EvalFunc<Map<String, Integer>> {
@Override
public Map<String, Integer> exec(Tuple input) throws IOException {
Map<String, Integer> m = new HashMap<String, Integer>();
m.put("key", new Integer(input.size()));
return m;
}
@Override
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(getSchemaName("parselong", input), DataType.MAP));
}
}
// See PIG-1277
@Test
public void testWrappingUnknownKey1() throws Exception{
String[] input1 = {
"1",
};
Util.createInputFile(cluster, "table_testWrappingUnknownKey1", input1);
pigServer.registerQuery("a = load 'table_testWrappingUnknownKey1' as (a0);");
pigServer.registerQuery("b = foreach a generate a0, "+ MapGenerate.class.getName() + "(*) as m:map[];");
pigServer.registerQuery("c = foreach b generate a0, m#'key' as key;");
pigServer.registerQuery("d = group c by key;");
Iterator<Tuple> iter = pigServer.openIterator("d");
Tuple t = iter.next();
Assert.assertTrue(t.size()==2);
Assert.assertTrue(t.toString().equals("(1,{(1,1)})"));
Assert.assertFalse(iter.hasNext());
}
// See PIG-999
@Test
public void testWrappingUnknownKey2() throws Exception{
String[] input1 = {
"1",
};
Util.createInputFile(cluster, "table_testWrappingUnknownKey2", input1);
pigServer.registerQuery("a = load 'table_testWrappingUnknownKey2' as (a0);");
pigServer.registerQuery("b = foreach a generate a0, "+ MapGenerate.class.getName() + "(*) as m:map[];");
pigServer.registerQuery("c = foreach b generate a0, m#'key' as key;");
pigServer.registerQuery("d = order c by key;");
Iterator<Tuple> iter = pigServer.openIterator("d");
Tuple t = iter.next();
Assert.assertTrue(t.size()==2);
Assert.assertTrue(t.toString().equals("(1,1)"));
Assert.assertFalse(iter.hasNext());
}
// See PIG-1065
@Test
public void testWrappingUnknownKey3() throws Exception{
String[] input1 = {
"1\t2",
"2\t3"
};
String[] input2 = {
"1",
};
Util.createInputFile(cluster, "table_testWrappingUnknownKey3_1", input1);
Util.createInputFile(cluster, "table_testWrappingUnknownKey3_2", input2);
pigServer.registerQuery("a = load 'table_testWrappingUnknownKey3_1' as (a0:chararray, a1:chararray);");
pigServer.registerQuery("b = load 'table_testWrappingUnknownKey3_2' as (b0:chararray);");
pigServer.registerQuery("c = union a, b;");
pigServer.registerQuery("d = order c by $0;");
Collection<String> results = new HashSet<String>();
results.add("(1,2)");
results.add("(1)");
results.add("(2,3)");
Iterator<Tuple> iter = pigServer.openIterator("d");
Tuple t = iter.next();
Assert.assertTrue(results.contains(t.toString()));
t = iter.next();
Assert.assertTrue(results.contains(t.toString()));
t = iter.next();
Assert.assertTrue(results.contains(t.toString()));
Assert.assertFalse(iter.hasNext());
}
// See PIG-1787
@Test
public void testOrderByLimitJoin() throws Exception{
String[] input1 = {
"1\t1",
"1\t2"
};
Util.createInputFile(cluster, "table_testOrderByLimitJoin", input1);
pigServer.registerQuery("a = load 'table_testOrderByLimitJoin' as (a0, a1);");
pigServer.registerQuery("b = group a by a0;");
pigServer.registerQuery("c = foreach b generate group as c0, COUNT(a) as c1;");
pigServer.registerQuery("d = order c by c1 parallel 2;");
pigServer.registerQuery("e = limit d 10;");
pigServer.registerQuery("f = join e by c0, a by a0;");
Iterator<Tuple> iter = pigServer.openIterator("f");
String[] expected = new String[] {"(1,2,1,1)", "(1,2,1,2)"};
Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("f")));
}
// See PIG-1785
@Test
public void testForEachSameOriginColumn2() throws Exception{
String[] input1 = {
"{(1,2),(2,3)}",
};
Util.createInputFile(cluster, "table_testForEachSameOriginColumn2", input1);
pigServer.registerQuery("a = load 'table_testForEachSameOriginColumn2' as (a0:bag{t:tuple(i0:int, i1:int)});");
pigServer.registerQuery("b = foreach a generate flatten(a0) as (b0, b1), flatten(a0) as (b2, b3);");
pigServer.registerQuery("c = filter b by b0>b2;");
Iterator<Tuple> iter = pigServer.openIterator("c");
Tuple t = iter.next();
Assert.assertTrue(t.toString().contains("(2,3,1,2)"));
Assert.assertFalse(iter.hasNext());
}
// See PIG-1785
@Test
public void testForEachSameOriginColumn3() throws Exception{
String[] input1 = {
"1\t1\t2",
"1\t2\t3",
};
Util.createInputFile(cluster, "table_testForEachSameOriginColumn3", input1);
pigServer.registerQuery("a = load 'table_testForEachSameOriginColumn3' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("b = group a by a0;");
pigServer.registerQuery("c = foreach b generate flatten(a.(a1,a2)) as (b0, b1), flatten(a.(a1,a2)) as (b2, b3);");
pigServer.registerQuery("d = filter c by b0>b2;");
Iterator<Tuple> iter = pigServer.openIterator("d");
Tuple t = iter.next();
Assert.assertTrue(t.toString().contains("(2,3,1,2)"));
Assert.assertFalse(iter.hasNext());
}
// See PIG-1785
@Test
public void testAddingTwoBag() {
try {
pigServer.registerQuery("a = load '1.txt' as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("b = group a by name;");
pigServer.registerQuery("c = foreach b generate group, SUM(a.age*a.gpa);");
pigServer.openIterator("c");
} catch (Exception e) {
PigException pe = LogUtils.getPigException(e);
Assert.assertTrue(pe.getErrorCode()==1039);
Assert.assertTrue(pe.getMessage().contains("incompatible types"));
return;
}
Assert.fail();
}
public static class BagGenerateNoSchema extends EvalFunc<DataBag> {
@Override
public DataBag exec(Tuple input) throws IOException {
DataBag bg = DefaultBagFactory.getInstance().newDefaultBag();
bg.add(input);
return bg;
}
}
// See PIG-1813
@Test
public void testUDFNoSchemaPropagate1() throws Exception{
String[] input1 = {
"[key#1,key2#2]",
"[key#2,key2#3]",
};
Util.createInputFile(cluster, "table_testUDFNoSchemaPropagate1", input1);
pigServer.registerQuery("a = load 'table_testUDFNoSchemaPropagate1' as (a0:map[]);");
pigServer.registerQuery("b = foreach a generate " + BagGenerateNoSchema.class.getName() + "(*) as b0;");
pigServer.registerQuery("c = foreach b generate flatten(IdentityColumn(b0));");
pigServer.registerQuery("d = foreach c generate $0#'key';");
Iterator<Tuple> iter = pigServer.openIterator("d");
Tuple t = iter.next();
Assert.assertTrue(t.toString().contains("(1)"));
t = iter.next();
Assert.assertTrue(t.toString().contains("(2)"));
Assert.assertFalse(iter.hasNext());
}
// See PIG-1813
@Test
public void testUDFNoSchemaPropagate2() throws Exception{
String[] input1 = {
"[key#1,key2#2]",
"[key#2,key2#3]",
};
Util.createInputFile(cluster, "table_testUDFNoSchemaPropagate2", input1);
pigServer.registerQuery("a = load 'table_testUDFNoSchemaPropagate2' as (a0:map[]);");
pigServer.registerQuery("b = foreach a generate flatten(" + BagGenerateNoSchema.class.getName() + "(*)) as b0;");
pigServer.registerQuery("c = foreach b generate IdentityColumn(b0);");
pigServer.registerQuery("d = foreach c generate $0#'key';");
Iterator<Tuple> iter = pigServer.openIterator("d");
Tuple t = iter.next();
Assert.assertTrue(t.toString().contains("(1)"));
t = iter.next();
Assert.assertTrue(t.toString().contains("(2)"));
Assert.assertFalse(iter.hasNext());
}
// See PIG-1812
@Test
public void testLocalRearrangeInReducer() throws Exception{
String[] input1 = {
"1\t1",
"1\t1",
"1\t2",
};
String[] input2 = {
"1\t1",
};
Util.createInputFile(cluster, "table_testLocalRearrangeInReducer1", input1);
Util.createInputFile(cluster, "table_testLocalRearrangeInReducer2", input2);
pigServer.registerQuery("a = load 'table_testLocalRearrangeInReducer1' as (a0, a1);");
pigServer.registerQuery("b = distinct a;");
pigServer.registerQuery("c = load 'table_testLocalRearrangeInReducer2' as (c0, c1);");
pigServer.registerQuery("d = cogroup b by a0, c by c0;");
pigServer.registerQuery("e = foreach d { e1 = order c by c1; generate e1;};");
Iterator<Tuple> iter = pigServer.openIterator("e");
Tuple t = iter.next();
Assert.assertTrue(t.toString().contains("({(1,1)})"));
Assert.assertFalse(iter.hasNext());
}
// See PIG-1850
@Test
public void testProjectNullSchema() throws Exception{
String[] input = {
"0\t1",
};
Util.createInputFile(cluster, "table_testProjectNullSchema", input);
pigServer.registerQuery("a = load 'table_testProjectNullSchema';");
pigServer.registerQuery("b = foreach a generate ASIN($0), $1;");
pigServer.registerQuery("c = order b by $0;");
Iterator<Tuple> iter = pigServer.openIterator("c");
Tuple t = iter.next();
Assert.assertTrue(t.toString().contains("(0.0,1)"));
Assert.assertFalse(iter.hasNext());
}
// See PIG-1188
@Test
public void testSchemaDataNotMatch() throws Exception{
String[] input = {
"0\t1\t2",
"3\t4",
"5"
};
Util.createInputFile(cluster, "table_testSchemaDataNotMatch", input);
pigServer.registerQuery("a = load 'table_testSchemaDataNotMatch' as (a0, a1);");
Iterator<Tuple> iter = pigServer.openIterator("a");
Tuple t = iter.next();
Assert.assertTrue(t.size()==2);
Assert.assertTrue(t.get(0).toString().equals("0"));
Assert.assertTrue(t.get(1).toString().equals("1"));
t = iter.next();
Assert.assertTrue(t.size()==2);
Assert.assertTrue(t.get(0).toString().equals("3"));
Assert.assertTrue(t.get(1).toString().equals("4"));
t = iter.next();
Assert.assertTrue(t.size()==2);
Assert.assertTrue(t.get(0).toString().equals("5"));
Assert.assertTrue(t.get(1)==null);
Assert.assertFalse(iter.hasNext());
}
// See PIG-1912
@Test
public void testDuplicateLoadFuncSignature() throws Exception{
String[] input = {
"0\t1\ta",
};
Util.createInputFile(cluster, "table_testDuplicateLoadFuncSignature", input);
pigServer.setBatchOn();
pigServer.registerQuery("a = load 'table_testDuplicateLoadFuncSignature' as (a0, a1, a2);");
pigServer.registerQuery("b = foreach a generate a0, a1;");
pigServer.registerQuery("a = load 'table_testDuplicateLoadFuncSignature' as (a0, a1, a2);");
pigServer.registerQuery("c = foreach a generate a0, a2;");
pigServer.registerQuery("store b into 'testDuplicateLoadFuncSignatureOutput1';");
pigServer.registerQuery("store c into 'testDuplicateLoadFuncSignatureOutput2';");
pigServer.executeBatch();
pigServer.registerQuery("a = load 'testDuplicateLoadFuncSignatureOutput1';");
Iterator<Tuple> iter = pigServer.openIterator("a");
Tuple t = iter.next();
Assert.assertTrue(t.toString().equals("(0,1)"));
Assert.assertFalse(iter.hasNext());
pigServer.registerQuery("a = load 'testDuplicateLoadFuncSignatureOutput2';");
iter = pigServer.openIterator("a");
t = iter.next();
Assert.assertTrue(t.toString().equals("(0,a)"));
Assert.assertFalse(iter.hasNext());
}
// See PIG-1927
@Test
public void testDereferencePartialAlias() throws Exception{
pigServer.registerQuery("a = load '1.txt' as (a0:int, a1);");
pigServer.registerQuery("b = group a by a0;");
pigServer.registerQuery("c = foreach b generate flatten(a);");
pigServer.registerQuery("d = cogroup c by (a0);");
pigServer.registerQuery("e = foreach d generate c.a0 as e0;");
pigServer.registerQuery("f = foreach e generate e0;");
// Shall not throw exception
pigServer.explain("f", System.out);
}
// See PIG-1866
@Test
public void testProjBagInTuple() throws Exception{
String[] input = {
"(1,{(one),(two)})",
};
Util.createInputFile(cluster, "table_testProjBagInTuple", input);
pigServer.registerQuery("a = load 'table_testProjBagInTuple' as (t : tuple(i: int, b1: bag { b_tuple : tuple ( b_str: chararray) }));");
pigServer.registerQuery("b = foreach a generate t.b1;");
Iterator<Tuple> iter = pigServer.openIterator("b");
Tuple t = iter.next();
Assert.assertTrue(t.toString().equals("({(one),(two)})"));
Assert.assertFalse(iter.hasNext());
}
//See PIG-1974
@Test
public void testCastMap() throws Exception{
String[] input = {
"([key#1])",
"([key#2])",
};
Util.createInputFile(cluster, "table_testCastMap", input);
pigServer.registerQuery("a = load 'table_testCastMap' as (m:map[]);");
pigServer.registerQuery("b = foreach a generate (map[int])m;");
pigServer.registerQuery("c = foreach b generate m#'key' + 1;");
Iterator<Tuple> iter = pigServer.openIterator("c");
Tuple t = iter.next();
Assert.assertEquals(t.get(0), 2);
t = iter.next();
Assert.assertEquals(t.get(0), 3);
Assert.assertFalse(iter.hasNext());
}
// See PIG-1979
@Test
public void testDereferenceUidBug() throws Exception{
String[] input1 = {
"0\t0\t{(1,2)}\t1",
};
String[] input2 = {
"0\t0",
};
Util.createInputFile(cluster, "table_testDereferenceUidBug1", input1);
Util.createInputFile(cluster, "table_testDereferenceUidBug2", input2);
pigServer.registerQuery("a = load 'table_testDereferenceUidBug1' as (a0:int, a1:int, a2:{t:(i0:int, i1:int)}, a3:int);");
pigServer.registerQuery("b = foreach a generate a0, a1, a0+a1 as sum, a2.i0 as a2, a3;");
pigServer.registerQuery("c = filter b by sum==0;");
pigServer.registerQuery("d = load 'table_testDereferenceUidBug2' as (d0:int, d1:int);");
pigServer.registerQuery("e = join c by a0, d by d0;");
pigServer.registerQuery("f = foreach e generate c::a2;");
Iterator<Tuple> iter = pigServer.openIterator("f");
Tuple t = iter.next();
Assert.assertTrue(t.toString().equals("({(1)})"));
Assert.assertFalse(iter.hasNext());
}
static public class UDFWithNonStandardType extends EvalFunc<Tuple>{
@Override
public Tuple exec(Tuple input) throws IOException {
Tuple t = TupleFactory.getInstance().newTuple();
t.append(new ArrayList<Integer>());
return t;
}
}
// See PIG-1826
@Test
public void testNonStandardData() throws Exception{
String[] input1 = {
"0",
};
Util.createInputFile(cluster, "table_testNonStandardData", input1);
pigServer.registerQuery("a = load 'table_testNonStandardData' as (a0);");
pigServer.registerQuery("b = foreach a generate " + UDFWithNonStandardType.class.getName() + "(a0);");
try {
pigServer.openIterator("b");
Assert.fail();
} catch (Exception e) {
// Tez does not construct exceptions from stacktrace as it will have multiple ones.
// So e.getCause().getCause() will be null
Throwable cause = e.getCause().getCause() == null ? e.getCause() : e.getCause().getCause();
String message = cause.getMessage();
Assert.assertTrue(message.contains(ArrayList.class.getName()));
}
}
// See PIG-1826
@Test
public void testNonStandardDataWithoutFetch() throws Exception{
Properties props = pigServer.getPigContext().getProperties();
props.setProperty(PigConfiguration.PIG_OPT_FETCH, "false");
String[] input1 = {
"0",
};
try {
Util.createInputFile(cluster, "table_testNonStandardDataWithoutFetch", input1);
pigServer.registerQuery("a = load 'table_testNonStandardDataWithoutFetch' as (a0);");
pigServer.registerQuery("b = foreach a generate " + UDFWithNonStandardType.class.getName() + "(a0);");
try {
pigServer.openIterator("b");
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains(ArrayList.class.getName()));
}
}
finally {
props.setProperty(PigConfiguration.PIG_OPT_FETCH, "true");
}
}
// See PIG-2078
@Test
public void testProjectNullBag() throws Exception{
String[] input1 = {
"{(1)}\t2",
"\t3"
};
HashSet<String> optimizerRules = new HashSet<String>();
optimizerRules.add("MergeForEach");
pigServer.getPigContext().getProperties().setProperty(
PigImplConstants.PIG_OPTIMIZER_RULES_KEY,
ObjectSerializer.serialize(optimizerRules));
Util.createInputFile(cluster, "table_testProjectNullBag", input1);
pigServer.registerQuery("a = load 'table_testProjectNullBag' as (a0:bag{}, a1:int);");
pigServer.registerQuery("b = foreach a generate a0;");
Iterator<Tuple> iter = pigServer.openIterator("b");
Tuple t = iter.next();
Assert.assertTrue(t.toString().equals("({(1)})"));
t = iter.next();
Assert.assertTrue(t.toString().equals("()"));
Assert.assertFalse(iter.hasNext());
pigServer.getPigContext().getProperties().remove(PigImplConstants.PIG_OPTIMIZER_RULES_KEY);
}
// See PIG-2159
@Test
public void testUnionOnSchemaUidGeneration() throws Exception{
String[] input1 = {
"100,101,102,103,104,105",
"110,111,112,113,114,115"
};
String[] input2 = {
"200,201,202,203,204,205",
"210,211,212,213,214,215"
};
String[] input0 = {
"200,201,202,203,204,205",
"210,211,212,213,214,215"
};
Util.createInputFile(cluster, "table_testUnionOnSchemaUidGeneration1", input1);
Util.createInputFile(cluster, "table_testUnionOnSchemaUidGeneration2", input2);
Util.createInputFile(cluster, "table_testUnionOnSchemaUidGeneration0", input0);
pigServer.registerQuery("A = load 'table_testUnionOnSchemaUidGeneration1' using PigStorage(',') as (f1:int,f2:int,f3:int,f4:long,f5:double);");
pigServer.registerQuery("B = load 'table_testUnionOnSchemaUidGeneration2' using PigStorage(',') as (f1:int,f2:int,f3:int,f4:long,f5:double);");
pigServer.registerQuery("C = load 'table_testUnionOnSchemaUidGeneration0' using PigStorage(',') as (f1:int,f2:int,f3:int);");
pigServer.registerQuery("U = UNION ONSCHEMA A,B;");
pigServer.registerQuery("J = join C by (f1,f2,f3) LEFT OUTER, U by (f1,f2,f3);");
pigServer.registerQuery("Porj = foreach J generate C::f1 as f1 ,C::f2 as f2,C::f3 as f3,U::f4 as f4,U::f5 as f5;");
pigServer.registerQuery("G = GROUP Porj by (f1,f2,f3,f5);");
pigServer.registerQuery("Final = foreach G generate SUM(Porj.f4) as total;");
Iterator<Tuple> iter = pigServer.openIterator("Final");
Tuple t = iter.next();
Assert.assertTrue(t.toString().equals("(203)"));
t = iter.next();
Assert.assertTrue(t.toString().equals("(213)"));
Assert.assertFalse(iter.hasNext());
}
// See PIG-2185
@Test
public void testProjectEmptyBag() throws Exception{
String[] input = {
"{(12)}",
"{(23)}",
""
};
Util.createInputFile(cluster, "table_testProjectEmptyBag", input);
pigServer.registerQuery("A = load 'table_testProjectEmptyBag' as (bg:bag{});");
pigServer.registerQuery("B = FOREACH A { x = FILTER bg BY $0 == '12'; GENERATE x; };");
Iterator<Tuple> iter = pigServer.openIterator("B");
Tuple t = iter.next();
Assert.assertTrue(t.toString().equals("({(12)})"));
t = iter.next();
Assert.assertTrue(t.toString().equals("({})"));
Assert.assertTrue(iter.hasNext());
t = iter.next();
Assert.assertTrue(t.toString().equals("({})"));
Assert.assertFalse(iter.hasNext());
}
// See PIG-2231
@Test
public void testLimitFlatten() throws Exception{
String[] input = {
"1\tA",
"1\tB",
"2\tC",
"3\tD",
"3\tE",
"3\tF"
};
Util.createInputFile(cluster, "table_testLimitFlatten", input);
pigServer.registerQuery("data = load 'table_testLimitFlatten' as (k,v);");
pigServer.registerQuery("grouped = GROUP data BY k;");
if (Util.isSparkExecType(cluster.getExecType())) {
pigServer.registerQuery("grouped = ORDER grouped BY group;");
}
pigServer.registerQuery("selected = LIMIT grouped 2;");
pigServer.registerQuery("flattened = FOREACH selected GENERATE FLATTEN (data);");
Iterator<Tuple> iter = pigServer.openIterator("flattened");
String[] expected = new String[] {"(1,A)", "(1,B)", "(2,C)"};
Util.checkQueryOutputsAfterSortRecursive(iter, expected,
org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("flattened")));
}
// See PIG-2237
@Test
public void testLimitAutoReducer() throws Exception{
String[] input = {
"1\tA",
"4\tB",
"2\tC",
"3\tD",
"6\tE",
"5\tF"
};
Util.createInputFile(cluster, "table_testLimitAutoReducer", input);
pigServer.getPigContext().getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "16");
pigServer.registerQuery("A = load 'table_testLimitAutoReducer';");
pigServer.registerQuery("B = order A by $0;");
pigServer.registerQuery("C = limit B 2;");
Iterator<Tuple> iter = pigServer.openIterator("C");
Tuple t = iter.next();
Assert.assertTrue(t.toString().equals("(1,A)"));
t = iter.next();
Assert.assertTrue(t.toString().equals("(2,C)"));
Assert.assertFalse(iter.hasNext());
}
@SuppressWarnings("unchecked")
@Test
public void testCrossAfterGroupAll() throws Exception{
String[] input = {
"1\tA",
"2\tB",
"3\tC",
"4\tD",
};
Util.createInputFile(cluster, "table_testCrossAfterGroupAll", input);
try {
pigServer.getPigContext().getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "40");
pigServer.registerQuery("A = load 'table_testCrossAfterGroupAll' as (a0:int, a1:chararray);");
pigServer.registerQuery("B = group A all;");
pigServer.registerQuery("C = foreach B generate COUNT(A);");
pigServer.registerQuery("D = cross A, C;");
Path output = FileLocalizer.getTemporaryPath(pigServer.getPigContext());
ExecJob job = pigServer.store("D", output.toString());
FileSystem fs = output.getFileSystem(cluster.getConfiguration());
FileStatus[] partFiles = fs.listStatus(output, new PathFilter() {
@Override
public boolean accept(Path path) {
if (path.getName().startsWith("part")) {
return true;
}
return false;
}
});
if (Util.isSparkExecType(cluster.getExecType())) {
// TODO: Fix this when we implement auto-parallelism in Spark
Assert.assertTrue(partFiles.length == 1);
} else {
// auto-parallelism is 2 in MR, 20 in Tez, so check >=2
Assert.assertTrue(partFiles.length >= 2);
}
// Check the output
Iterator<Tuple> iter = job.getResults();
List<Tuple> results = new ArrayList<Tuple>();
while (iter.hasNext()) {
results.add(iter.next());
}
Collections.sort(results);
Assert.assertEquals(4, results.size());
Assert.assertEquals("(1,A,4)", results.get(0).toString());
Assert.assertEquals("(2,B,4)", results.get(1).toString());
Assert.assertEquals("(3,C,4)", results.get(2).toString());
Assert.assertEquals("(4,D,4)", results.get(3).toString());
} finally {
pigServer.getPigContext().getProperties().remove("pig.exec.reducers.bytes.per.reducer");
}
}
// see PIG-4392
@Test
public void testRankWithEmptyReduce() throws Exception {
Util.createInputFile(cluster, "table_testRankWithEmptyReduce", new String[]{"1\t2\t3", "4\t5\t6", "7\t8\t9"});
pigServer.setDefaultParallel(4);
pigServer.registerQuery("d = load 'table_testRankWithEmptyReduce' as (a:int, b:int, c:int);");
pigServer.registerQuery("e = rank d by a parallel 4;");
Iterator<Tuple> iter = pigServer.openIterator("e");
Collection<String> results = new HashSet<String>();
results.add("(1,1,2,3)");
results.add("(2,4,5,6)");
results.add("(3,7,8,9)");
Assert.assertTrue(results.contains(iter.next().toString()));
Assert.assertTrue(results.contains(iter.next().toString()));
Assert.assertTrue(results.contains(iter.next().toString()));
Assert.assertFalse(iter.hasNext());
}
}