blob: 9d910691dc712734a5865313593eb28329c8153b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.log4j.FileAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.SimpleLayout;
import org.apache.pig.FilterFunc;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadPushDown;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.newplan.logical.rules.ColumnPruneVisitor;
import org.apache.pig.test.utils.TestHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestPruneColumn {
private PigServer pigServer;
File tmpFile1;
File tmpFile2;
File tmpFile3;
File tmpFile4;
File tmpFile5;
File tmpFile6;
File tmpFile7;
File tmpFile8;
File tmpFile9;
File tmpFile10;
File tmpFile11;
File tmpFile12;
File tmpFile13;
File logFile;
private static final String simpleEchoStreamingCommand;
static {
String quote = "'";
if (Util.WINDOWS) {
quote = "\"";
}
simpleEchoStreamingCommand = "perl -ne " + quote + "print $_" + quote;
}
static public class MyFilterFunc extends FilterFunc {
@Override
public Boolean exec(Tuple input) {
return true;
}
}
@Before
public void setUp() throws Exception{
Logger logger = Logger.getLogger(ColumnPruneVisitor.class);
logger.removeAllAppenders();
logger.setLevel(Level.INFO);
SimpleLayout layout = new SimpleLayout();
logFile = File.createTempFile("log", "");
FileAppender appender = new FileAppender(layout, logFile.toString(), false, false, 0);
logger.addAppender(appender);
pigServer = new PigServer(Util.getLocalTestMode());
tmpFile1 = File.createTempFile("prune", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile1));
ps.println("1\t2\t3");
ps.println("2\t5\t2");
ps.close();
tmpFile2 = File.createTempFile("prune", "txt");
ps = new PrintStream(new FileOutputStream(tmpFile2));
ps.println("1\t1");
ps.println("2\t2");
ps.close();
tmpFile3 = File.createTempFile("prune", "txt");
ps = new PrintStream(new FileOutputStream(tmpFile3));
ps.println("1\t[key1#1,key2#2]");
ps.println("2\t[key1#2,key2#4]");
ps.close();
tmpFile4 = File.createTempFile("prune", "txt");
ps = new PrintStream(new FileOutputStream(tmpFile4));
ps.println("1\t2\t3");
ps.println("1\t2\t3");
ps.close();
tmpFile5 = File.createTempFile("prune", "txt");
ps = new PrintStream(new FileOutputStream(tmpFile5));
ps.println("1\t2\t3\t4");
ps.println("2\t3\t4\t5");
ps.close();
tmpFile6 = File.createTempFile("prune", "txt");
ps = new PrintStream(new FileOutputStream(tmpFile6));
ps.println("\t2\t3");
ps.println("2\t3\t4");
ps.close();
tmpFile7 = File.createTempFile("prune", "txt");
ps = new PrintStream(new FileOutputStream(tmpFile7));
ps.println("1\t1\t1");
ps.println("2\t2\t2");
ps.close();
tmpFile8 = File.createTempFile("prune", "txt");
ps = new PrintStream(new FileOutputStream(tmpFile8));
ps.println("1\t2\t3\t4");
ps.println("2\t5\t2\t3");
ps.close();
tmpFile9 = File.createTempFile("prune", "txt");
ps = new PrintStream(new FileOutputStream(tmpFile9));
ps.println("1\t[key1#1,key2#2]\t[key3#8,key4#9]");
ps.println("2\t[key1#2,key2#4]\t[key3#8,key4#9]");
ps.close();
tmpFile10 = File.createTempFile("prune", "txt");
ps = new PrintStream(new FileOutputStream(tmpFile10));
ps.println("1\t[1#1,2#1]\t2");
ps.close();
tmpFile11 = File.createTempFile("prune", "txt");
ps = new PrintStream(new FileOutputStream(tmpFile11));
ps.println("1\t2\t3");
ps.println("1\t3\t2");
ps.println("2\t5\t2");
ps.close();
tmpFile12 = File.createTempFile("prune", "txt");
ps = new PrintStream(new FileOutputStream(tmpFile12));
ps.println("[key1#1,key2#2,cond#1]");
ps.println("[key1#2,key2#3,cond#1]");
ps.close();
tmpFile13 = File.createTempFile("prune", "txt");
ps = new PrintStream(new FileOutputStream(tmpFile13));
ps.println("3\ti");
ps.println("3\ti");
ps.println("1\ti");
ps.println("2\ti");
ps.println("2\ti");
ps.println("3\ti");
ps.close();
}
@After
public void tearDown() throws Exception{
tmpFile1.delete();
tmpFile2.delete();
tmpFile3.delete();
tmpFile4.delete();
tmpFile5.delete();
tmpFile6.delete();
tmpFile7.delete();
tmpFile8.delete();
tmpFile9.delete();
tmpFile10.delete();
tmpFile11.delete();
tmpFile12.delete();
logFile.delete();
}
public boolean checkLogFileMessage(String[] messages) {
BufferedReader reader = null;
try {
reader = new BufferedReader(new FileReader(logFile));
List<String> logMessages=new ArrayList<String>();
String line;
while ((line=reader.readLine())!=null)
{
logMessages.add(line);
}
if (logMessages.size() > 0) {
logMessages = TestHelper.sortSubFields(logMessages);
}
// Check if all messages appear in the log
for (int i=0;i<messages.length;i++)
{
boolean found = false;
for (int j=0;j<logMessages.size();j++)
if (logMessages.get(j).contains(messages[i])) {
found = true;
break;
}
if (!found)
return false;
}
// Check no other log besides messages
for (int i=0;i<logMessages.size();i++) {
boolean found = false;
for (int j=0;j<messages.length;j++) {
if (logMessages.get(i).contains(messages[j])) {
found = true;
break;
}
}
if (!found) {
if (logMessages.get(i).contains("Columns pruned for")||
logMessages.get(i).contains("Map key required for")) {
return false;
}
}
}
return true;
} catch (IOException e) {
return false;
}
}
public boolean emptyLogFileMessage()
{
if (!logFile.exists())
return true;
BufferedReader reader = null;
String line;
try {
reader = new BufferedReader(new FileReader(logFile));
while ((line=reader.readLine())!=null)
{
if (line!=null && !line.equals(""))
return false;
}
return true;
}
catch (IOException e) {
return false;
}
}
@Test
public void testLoadForEach1() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a1, a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertEquals(2, t.get(0));
assertEquals(3, t.get(1));
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertEquals(5, t.get(0));
assertEquals(2, t.get(1));
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0"}));
}
@Test
public void testLoadForEach2() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a0, a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertEquals(1, t.get(0));
assertEquals(3, t.get(1));
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertEquals(2, t.get(0));
assertEquals(2, t.get(1));
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
}
@Test
public void testLoadForEach3() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a0, a1;");
Iterator<Tuple> iter = pigServer.openIterator("B");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertEquals(1, t.get(0));
assertEquals(2, t.get(1));
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertEquals(2, t.get(0));
assertEquals(5, t.get(1));
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $2"}));
}
@Test
public void testJoin1() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
pigServer.registerQuery("C = join A by a1, B by b1;");
pigServer.registerQuery("D = foreach C generate a1, a2, b0, b1;");
Iterator<Tuple> iter = pigServer.openIterator("D");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(4, t.size());
assertEquals(2, t.get(0));
assertEquals(3, t.get(1));
assertEquals(2, t.get(2));
assertEquals(2, t.get(3));
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0"}));
}
@Test
public void testJoin2() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
pigServer.registerQuery("C = join A by a1, B by b1;");
pigServer.registerQuery("D = foreach C generate a1, a2, b1;");
Iterator<Tuple> iter = pigServer.openIterator("D");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(3, t.size());
assertEquals(2, t.get(0));
assertEquals(3, t.get(1));
assertEquals(2, t.get(2));
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0",
"Columns pruned for B: $0"}));
}
@Test
public void testForEachFilter() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = filter A by a2==3;");
pigServer.registerQuery("C = foreach B generate a0, a1;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertEquals(1, t.get(0));
assertEquals(2, t.get(1));
assertFalse(iter.hasNext());
assertTrue(emptyLogFileMessage());
}
@Test
public void testForEach1() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a0, a1+a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertEquals(1, t.get(0));
assertEquals(5, t.get(1));
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertEquals(2, t.get(0));
assertEquals(7, t.get(1));
assertFalse(iter.hasNext());
assertTrue(emptyLogFileMessage());
}
@Test
public void testForEach2() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a0 as b0, *;");
Iterator<Tuple> iter = pigServer.openIterator("B");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(4, t.size());
assertEquals(1, t.get(0));
assertEquals(1, t.get(1));
assertEquals(2, t.get(2));
assertEquals(3, t.get(3));
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(4, t.size());
assertEquals(2, t.get(0));
assertEquals(2, t.get(1));
assertEquals(5, t.get(2));
assertEquals(2, t.get(3));
assertFalse(iter.hasNext());
assertTrue(emptyLogFileMessage());
}
@Test
public void testSplit1() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.registerQuery("split A into B if $0<=1, C if $0>1;");
pigServer.registerQuery("D = foreach B generate $1;");
Iterator<Tuple> iter = pigServer.openIterator("D");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertEquals("2", t.get(0).toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $2"}));
}
@Test
public void testSplit2() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.registerQuery("split A into B if $0<=1, C if $0>1;");
pigServer.registerQuery("D = foreach B generate $1;");
Iterator<Tuple> iter = pigServer.openIterator("D");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertEquals("2", t.get(0).toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $2"}));
}
@Test
public void testForeachNoSchema1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "';");
pigServer.registerQuery("B = foreach A generate $1, $2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertEquals("2", t.get(0).toString());
assertEquals("3", t.get(1).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertEquals("5", t.get(0).toString());
assertEquals("2", t.get(1).toString());
assertTrue(emptyLogFileMessage());
}
@Test
public void testForeachNoSchema2() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "';");
pigServer.registerQuery("B = foreach A generate $1, 'aoeuaoeu';");
Iterator<Tuple> iter = pigServer.openIterator("B");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertEquals("2", t.get(0).toString());
assertEquals("aoeuaoeu", t.get(1).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertEquals("5", t.get(0).toString());
assertEquals("aoeuaoeu", t.get(1).toString());
assertTrue(emptyLogFileMessage());
}
@Test
public void testCoGroup1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1:int);");
pigServer.registerQuery("C = cogroup A by $1, B by $1;");
pigServer.registerQuery("D = foreach C generate AVG($1.$1);");
Iterator<Tuple> iter = pigServer.openIterator("D");
String[] expectedRes = new String[]{"()","(2.0)","(5.0)"};
Schema s = pigServer.dumpSchema("D");
Util.checkQueryOutputsAfterSortRecursive(iter,expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s));
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for B: $0"}));
}
@Test
public void testCoGroup2() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
pigServer.registerQuery("B = group A all;");
pigServer.registerQuery("C = foreach B generate $1;");
Iterator<Tuple> iter = pigServer.openIterator("C");
String[] expected = new String[] {
"({(1,2,3),(2,5,2)})"
};
assertTrue(iter.hasNext());
Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")));
assertTrue(emptyLogFileMessage());
}
@Test
public void testCoGroup3() throws Exception {
pigServer.registerQuery("A = load '" + Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
pigServer.registerQuery("B = group A by $1;");
pigServer.registerQuery("C = foreach B generate $1, '1';");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
String[] expected = new String[] {
"({(1,2,3)},1)",
"({(2,5,2)},1)"
};
Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")),
Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(emptyLogFileMessage());
}
@Test
public void testCoGroup4() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1:int);");
pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
pigServer.registerQuery("D = foreach C generate $1.$1, $2.$1;");
Iterator<Tuple> iter = pigServer.openIterator("D");
assertTrue(iter.hasNext());
String[] expected = new String[] {
"({},{(1)})",
"({(2)},{(2)})",
"({(5)},{})"
};
Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")),
Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(emptyLogFileMessage());
}
@Test
public void testCoGroup5() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = group A by (a0, a1);");
pigServer.registerQuery("C = foreach B generate flatten(group);");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertEquals("1", t.get(0).toString());
assertEquals("2", t.get(1).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertEquals("2", t.get(0).toString());
assertEquals("5", t.get(1).toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $2"}));
}
@Test
public void testDistinct1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile4.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = distinct A;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertEquals("1", t.get(0).toString());
assertFalse(iter.hasNext());
assertTrue(emptyLogFileMessage());
}
@Test
public void testStream1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = stream A through `" + simpleEchoStreamingCommand + "`;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertEquals("1", t.get(0).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertEquals("2", t.get(0).toString());
assertFalse(iter.hasNext());
assertTrue(emptyLogFileMessage());
}
@Test
public void testBinCond1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile5.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);");
pigServer.registerQuery("B = foreach A generate ($1 == '2'? $2 : $3);");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertEquals("3", t.get(0).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertEquals("5", t.get(0).toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0"}));
}
@Test
public void testCoGroup6() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
pigServer.registerQuery("D = foreach C generate A, flatten(B.($0, $1));");
Iterator<Tuple> iter = pigServer.openIterator("D");
assertTrue(iter.hasNext());
String[] expected = new String[] {
"({},1,1)",
"({(1,2,3)},2,2)"
};
Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")));
assertTrue(emptyLogFileMessage());
}
@Test
public void testCoGroup7() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
pigServer.registerQuery("D = foreach C {B = order B by $0;generate FLATTEN(A), B.($1);};");
Iterator<Tuple> iter = pigServer.openIterator("D");
assertTrue(iter.hasNext());
String[] expected = new String[] {
"(1,2,3,{(2)})",
"(2,5,2,{})"
};
Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")),
Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(emptyLogFileMessage());
}
@Test
public void testCross1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
pigServer.registerQuery("C = cross A, B;");
pigServer.registerQuery("D = foreach C generate $0, $3;");
Iterator<Tuple> iter = pigServer.openIterator("D");
Collection<String> results = new HashSet<String>();
results.add("(1,1)");
results.add("(2,1)");
results.add("(1,2)");
results.add("(2,2)");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertTrue(results.contains(t.toString()));
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertTrue(results.contains(t.toString()));
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertTrue(results.contains(t.toString()));
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertTrue(results.contains(t.toString()));
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2",
"Columns pruned for B: $1"}));
}
@Test
public void testUnion1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile4.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
pigServer.registerQuery("C = union A, B;");
pigServer.registerQuery("D = foreach C generate $0, $2;");
Iterator<Tuple> iter = pigServer.openIterator("D");
Collection<String> results = new HashSet<String>();
results.add("(1,3)");
results.add("(2,2)");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
results.contains(t.toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
results.contains(t.toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
results.contains(t.toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
results.contains(t.toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1",
"Columns pruned for B: $1"}));
}
@Test
public void testFRJoin1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
pigServer.registerQuery("C = join A by $0, B by $0 using 'replicated';");
pigServer.registerQuery("D = foreach C generate $0, $3;");
Iterator<Tuple> iter = pigServer.openIterator("D");
assertTrue(iter.hasNext());
String[] expected = new String[] {
"(1,1)",
"(2,2)"
};
Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")),
Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2",
"Columns pruned for B: $1"}));
}
@Test
public void testFilter1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = order A by a1;");
pigServer.registerQuery("C = limit B 10;");
pigServer.registerQuery("D = foreach C generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("D");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertEquals("1", t.get(0).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertEquals("2", t.get(0).toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $2"}));
}
@Test
public void testFilter2() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = filter A by a0+a2 == 4;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertEquals("1", t.get(0).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertEquals("2", t.get(0).toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
}
@Test
public void testOrderBy1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = order A by $0;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertEquals("1", t.get(0).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertEquals("2", t.get(0).toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2"}));
}
@Test
public void testOrderBy2() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = order A by *;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertEquals("1", t.get(0).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertEquals("2", t.get(0).toString());
assertFalse(iter.hasNext());
assertTrue(emptyLogFileMessage());
}
@Test
public void testCogroup8() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = group A by *;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
String[] expected = new String[] {
"((1,2,3))",
"((2,5,2))"
};
Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")),
Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(emptyLogFileMessage());
}
@Test
public void testJoin3() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile4.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
pigServer.registerQuery("C = join A by *, B by * using 'replicated';");
pigServer.registerQuery("D = foreach C generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("D");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertEquals("1", t.get(0).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertEquals("1", t.get(0).toString());
assertFalse(iter.hasNext());
assertTrue(emptyLogFileMessage());
}
@Test
public void testLoadForEach4() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate *;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertEquals("1", t.get(0).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertEquals("2", t.get(0).toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2"}));
}
@Test
public void testForEachUDF() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
pigServer.registerQuery("B = foreach A generate StringSize(*);");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertEquals("1", t.get(0).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertEquals("1", t.get(0).toString());
assertFalse(iter.hasNext());
assertTrue(emptyLogFileMessage());
}
@Test
public void testOutJoin1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile6.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
pigServer.registerQuery("C = join A by $0 left, B by $0;");
pigServer.registerQuery("D = foreach C generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("D");
Collection<String> results = new HashSet<String>();
results.add("(2)");
results.add("()");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertTrue(results.contains(t.toString()));
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertTrue(results.contains(t.toString()));
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2",
"Columns pruned for B: $1, $2"}));
}
@Test
public void testFilter3() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = filter A by " + MyFilterFunc.class.getName() + "(*) ;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertEquals("1", t.get(0).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertEquals("2", t.get(0).toString());
assertFalse(iter.hasNext());
assertTrue(emptyLogFileMessage());
}
@Test
public void testMapKey1() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = foreach A generate a0, a1#'key1';");
Iterator<Tuple> iter = pigServer.openIterator("B");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertEquals(1, t.get(0));
assertEquals("1", t.get(1).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertEquals(2, t.get(0));
assertEquals("2", t.get(1).toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Map key required for A: $1->[key1]"}));
}
@Test
public void testMapKey2() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = foreach A generate a1, a1#'key1';");
pigServer.registerQuery("C = foreach B generate $0#'key2', $1;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertEquals("2", t.get(0).toString());
assertEquals("1", t.get(1).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertEquals("4", t.get(0).toString());
assertEquals("2", t.get(1).toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0",
"Map key required for A: $1->[key1, key2]"}));
}
@Test
public void testMapKey3() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = foreach A generate a1, a1#'key1';");
pigServer.registerQuery("C = group B all;");
Iterator<Tuple> iter = pigServer.openIterator("C");
String[] expected = new String[] {
"(all,{([key2#2,key1#1],1),([key2#4,key1#2],2)})"
};
Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")));
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0"}));
}
@Test
public void testMapKey4() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = limit A 10;");
pigServer.registerQuery("C = foreach B generate $0, $1#'key1';");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertEquals("1", t.get(0).toString());
assertEquals("1", t.get(1).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertEquals("2", t.get(0).toString());
assertEquals("2", t.get(1).toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Map key required for A: $1->[key1]"}));
}
@Test
public void testMapKey5() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = foreach A generate $0, $1#'key1';");
pigServer.registerQuery("C = stream B through `" + simpleEchoStreamingCommand + "`;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertEquals("1", t.get(0).toString());
assertEquals("1", t.get(1).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertEquals("2", t.get(0).toString());
assertEquals("2", t.get(1).toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Map key required for A: $1->[key1]"}));
}
@Test
public void testMapKeyInSplit1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile12.toString(), pigServer.getPigContext()) + "' as (m:map[]);");
pigServer.registerQuery("B = foreach A generate m#'key1' as key1;");
pigServer.registerQuery("C = foreach A generate m#'key2' as key2;");
pigServer.registerQuery("D = join B by key1, C by key2;");
Iterator<Tuple> iter = pigServer.openIterator("D");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertEquals("2", t.get(0).toString());
assertEquals("2", t.get(1).toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Map key required for A: $0->[key1, key2]"}));
}
@SuppressWarnings("rawtypes")
@Test
public void testMapKeyInSplit2() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile12.toString(), pigServer.getPigContext()) + "' as (m:map[]);");
pigServer.registerQuery("B = filter A by m#'cond'==1;");
pigServer.registerQuery("C = filter B by m#'key1'==1;");
pigServer.registerQuery("D = filter B by m#'key2'==2;");
pigServer.registerQuery("E = join C by m#'key1', D by m#'key1';");
Iterator<Tuple> iter = pigServer.openIterator("E");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertEquals("1", ((Map)t.get(0)).get("key1").toString());
assertEquals("2", ((Map)t.get(0)).get("key2").toString());
assertEquals("1", ((Map)t.get(0)).get("cond").toString());
assertEquals("1", ((Map)t.get(1)).get("key1").toString());
assertEquals("2", ((Map)t.get(1)).get("key2").toString());
assertEquals("1", ((Map)t.get(1)).get("cond").toString());
assertFalse(iter.hasNext());
assertTrue(this.emptyLogFileMessage());
}
@Test
public void testConstantPlan() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate 1, a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertEquals("1", t.get(0).toString());
assertEquals("3", t.get(1).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertEquals("1", t.get(0).toString());
assertEquals("2", t.get(1).toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, $1"}));
}
@Test
public void testPlainPlan() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.registerQuery("B = order A by $0;");
Iterator<Tuple> iter = pigServer.openIterator("B");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(3, t.size());
assertEquals("1", t.get(0).toString());
assertEquals("2", t.get(1).toString());
assertEquals("3", t.get(2).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(3, t.size());
assertEquals("2", t.get(0).toString());
assertEquals("5", t.get(1).toString());
assertEquals("2", t.get(2).toString());
assertFalse(iter.hasNext());
assertTrue(emptyLogFileMessage());
}
@Test
public void testBinStorage1() throws Exception {
// get a temp intermediate filename
File intermediateFile = File.createTempFile("intemediate", "txt");
intermediateFile.delete(); // delete since we don't want the file to be present
String clusterPath = Util.removeColon(intermediateFile.getAbsolutePath());
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.store("A", clusterPath, "BinStorage()");
pigServer.registerQuery("A = load '"+ Util.encodeEscape(clusterPath)
+ "' using BinStorage() as (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate a0;");
Iterator<Tuple> iter = pigServer.openIterator("B");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertEquals("1", t.get(0).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertEquals("2", t.get(0).toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2"}));
}
@Test
public void testBinStorage2() throws Exception {
File intermediateFile = File.createTempFile("intemediate", "txt");
intermediateFile.delete(); // delete since we don't want the file to be present
String clusterPath = Util.removeColon(intermediateFile.getAbsolutePath());
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.store("A", clusterPath, "BinStorage()");
pigServer.registerQuery("A = load '"+ Util.encodeEscape(clusterPath)
+ "' using BinStorage() as (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate a2, a0, a1;");
pigServer.registerQuery("C = foreach B generate a0, a2;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertEquals("1", t.get(0).toString());
assertEquals("3", t.get(1).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertEquals("2", t.get(0).toString());
assertEquals("2", t.get(0).toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
}
@Test
public void testProjectCastKeyLookup() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext())
+ "' as (a0, a1);");
pigServer.registerQuery("B = foreach A generate a1#'key1';");
Iterator<Tuple> iter = pigServer.openIterator("B");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertEquals("1", t.get(0).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertEquals("2", t.get(0).toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0",
"Map key required for A: $1->[key1]"}));
}
@Test
public void testFlattenMapCantPruneKeys() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext())
+ "' as (a0, a1:map[int]);");
pigServer.registerQuery("B = foreach A generate flatten(a1);");
pigServer.registerQuery("B1 = filter B by a1::key == 'key1';");
pigServer.registerQuery("C = foreach B1 generate a1::value;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertEquals("1", t.get(0).toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertEquals("2", t.get(0).toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0"}));
}
@Test
public void testCrossAtLeastOneColumnOneInput() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
pigServer.registerQuery("C = cross A, B;");
pigServer.registerQuery("D = foreach C generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("D");
Collection<String> results = new HashSet<String>();
results.add("(1)");
results.add("(2)");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertTrue(results.contains(t.toString()));
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertTrue(results.contains(t.toString()));
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertTrue(results.contains(t.toString()));
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertTrue(results.contains(t.toString()));
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2",
"Columns pruned for B: $1"}));
}
@Test
public void testComplex1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile7.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile8.toString(), pigServer.getPigContext()) + "' as (b0, b1, b2, b3);");
pigServer.registerQuery("B1 = foreach B generate b2, b0+b3;");
pigServer.registerQuery("C = join A by $0, B1 by $0;");
pigServer.registerQuery("D = order C by $4;");
pigServer.registerQuery("E = foreach D generate $0, $2;");
pigServer.registerQuery("F = filter E by $1<10;");
pigServer.registerQuery("G = group F by $0;");
pigServer.registerQuery("H = foreach G generate $1;");
Iterator<Tuple> iter = pigServer.openIterator("H");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertEquals("{(2,2)}", t.get(0).toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1",
"Columns pruned for B: $1"}));
}
@Test
public void testComplex2() throws Exception {
HashSet<String> optimizerRules = new HashSet<String>();
optimizerRules.add("PushUpFilter");
pigServer.getPigContext().getProperties().setProperty(
PigImplConstants.PIG_OPTIMIZER_RULES_KEY,
ObjectSerializer.serialize(optimizerRules));
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile13.toString(), pigServer.getPigContext()) + "' as (a:int, b:chararray);");
pigServer.registerQuery("B = FOREACH A generate a;");
pigServer.registerQuery("C = GROUP B by a;");
pigServer.registerQuery("D = filter C by group > 0 and group < 100;");
pigServer.registerQuery("E = FOREACH D {F = LIMIT B 1 ;GENERATE B.a as mya, FLATTEN(F.a) as setting;}");
pigServer.registerQuery("G = FOREACH E GENERATE mya, setting as setting;");
Iterator<Tuple> iter = pigServer.openIterator("G");
assertTrue(iter.hasNext());
String[] expected = new String[] {
"({(1)},1)",
"({(2),(2)},2)",
"({(3),(3),(3)},3)"
};
Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("G")),
Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
pigServer.getPigContext().getProperties().remove(PigImplConstants.PIG_OPTIMIZER_RULES_KEY);
}
@Test
public void testCoGroup8() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1:int);");
pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
pigServer.registerQuery("D = foreach C generate $0, $1;");
Iterator<Tuple> iter = pigServer.openIterator("D");
assertTrue(iter.hasNext());
String[] expected = new String[] {
"(1,{})",
"(2,{(1,2,3)})",
"(5,{(2,5,2)})"
};
Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D"))
, Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for B: $0"}));
}
// See PIG-1128
@Test
public void testUserDefinedSchema() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS ( c1 : chararray, c2 : int);");
pigServer.registerQuery("B = foreach A generate c1 as c1 : chararray, c2 as c2 : int, 'CA' as state : chararray;");
pigServer.registerQuery("C = foreach B generate c1 as c1 : chararray;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals("(1)", t.toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals("(2)", t.toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
}
// See PIG-1127
@Test
public void testSharedSchemaObject() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile10.toString(), pigServer.getPigContext()) + "' AS (a0, a1:map[], a2);");
pigServer.registerQuery("B = foreach A generate a1;");
pigServer.registerQuery("C = limit B 10;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals("([1#1, 2#1])", TestHelper.sortString("\\[(.*)\\]", t.toString(), ","));
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, $2"}));
}
// See PIG-1142
@Test
public void testJoin4() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
pigServer.registerQuery("C = join A by a2, B by b2;");
pigServer.registerQuery("D = foreach C generate $0, $1, $2;");
Iterator<Tuple> iter = pigServer.openIterator("D");
Collection<String> results = new HashSet<String>();
results.add("(1,2,3)");
results.add("(2,5,2)");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(results.contains(t.toString()));
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(results.contains(t.toString()));
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for B: $0, $1"}));
}
@Test
public void testFilter4() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);");
pigServer.registerQuery("B = filter A by a2==3;");
pigServer.registerQuery("C = foreach B generate $2;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals("(3)", t.toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, $1"}));
}
@Test
public void testSplit3() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);");
pigServer.registerQuery("split A into B if a2==3, C if a2<3;");
pigServer.registerQuery("C = foreach B generate $2;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals("(3)", t.toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, $1"}));
}
@Test
public void testOrderBy3() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = order A by a2;");
pigServer.registerQuery("C = foreach B generate a2;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertEquals("(2)", t.toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertEquals("(3)", t.toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, $1"}));
}
@Test
public void testCogroup9() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
pigServer.registerQuery("C = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (c0, c1, c2);");
pigServer.registerQuery("D = cogroup A by a2, B by b2, C by c2;");
pigServer.registerQuery("E = foreach D generate $1, $2;");
Iterator<Tuple> iter = pigServer.openIterator("E");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertEquals("({(2,5,2)},{(2,5,2)})", t.toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertEquals("({(1,2,3)},{(1,2,3)})", t.toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for C: $0, $1"}));
}
// See PIG-1165
@Test
public void testOrderbyWrongSignature() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
pigServer.registerQuery("C = order A by a1;");
pigServer.registerQuery("D = join C by a1, B by b0;");
pigServer.registerQuery("E = foreach D generate a1, b0, b1;");
Iterator<Tuple> iter = pigServer.openIterator("E");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(3, t.size());
assertEquals("(2,2,2)", t.toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, $2"}));
}
// See PIG-1146
@Test
public void testUnionMixedPruning() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:chararray, a2);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b2);");
pigServer.registerQuery("C = foreach B generate b0, 'hello', b2;");
pigServer.registerQuery("D = union A, C;");
pigServer.registerQuery("E = foreach D generate $0, $2;");
Iterator<Tuple> iter = pigServer.openIterator("E");
Collection<String> results = new HashSet<String>();
results.add("(1,3)");
results.add("(2,2)");
results.add("(1,1)");
results.add("(2,2)");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertTrue(results.contains(t.toString()));
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertTrue(results.contains(t.toString()));
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertTrue(results.contains(t.toString()));
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(2, t.size());
assertTrue(results.contains(t.toString()));
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
}
// See PIG-1176
@Test
public void testUnionMixedSchemaPruning() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate a0;;");
pigServer.registerQuery("C = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "';");
pigServer.registerQuery("D = foreach C generate $0;");
pigServer.registerQuery("E = union B, D;");
Iterator<Tuple> iter = pigServer.openIterator("E");
Collection<String> results = new HashSet<String>();
results.add("(1)");
results.add("(2)");
results.add("(1)");
results.add("(2)");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(1, t.size());
assertTrue(results.contains(t.toString()));
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertTrue(results.contains(t.toString()));
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertTrue(results.contains(t.toString()));
assertTrue(iter.hasNext());
t = iter.next();
assertEquals(1, t.size());
assertTrue(results.contains(t.toString()));
assertFalse(iter.hasNext());
assertTrue(emptyLogFileMessage());
}
// See PIG-1184
@Test
public void testForEachFlatten() throws Exception {
File inputFile = Util.createInputFile("table_testForEachFlatten", "", new String[]{"oiue\tM\t{(3),(4)}\t{(toronto),(montreal)}"});
pigServer.registerQuery("A = load '"+Util.encodeEscape(inputFile.toString())+"' as (a0:chararray, a1:chararray, a2:bag{t:tuple(id:chararray)}, a3:bag{t:tuple(loc:chararray)});");
pigServer.registerQuery("B = foreach A generate a0, a1, flatten(a2), flatten(a3), 10;");
pigServer.registerQuery("C = foreach B generate a0, $4;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals("(oiue,10)", t.toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals("(oiue,10)", t.toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals("(oiue,10)", t.toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals("(oiue,10)", t.toString());
assertFalse(iter.hasNext());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
}
// See PIG-1210
@Test
public void testFieldsToReadDuplicatedEntry() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate a0+a0, a1, a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals("(2.0,2,3)", t.toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals("(4.0,5,2)", t.toString());
assertFalse(iter.hasNext());
assertTrue(emptyLogFileMessage());
}
// See PIG-1272
@Test
public void testSplit4() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate a0;");
pigServer.registerQuery("C = join A by a0, B by a0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
String[] expected = new String[] {
"(1,2,3,1)",
"(2,5,2,2)"
};
Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")),
Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(emptyLogFileMessage());
}
@Test
public void testSplit5() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile11.toString(), pigServer.getPigContext()) + "' AS (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a0, a1;");
pigServer.registerQuery("C = join A by a0, B by a0;");
pigServer.registerQuery("D = filter C by A::a1>=B::a1;");
Iterator<Tuple> iter = pigServer.openIterator("D");
String [] expected = new String[] {
"(1,2,3,1,2)",
"(1,3,2,1,2)",
"(1,3,2,1,3)",
"(2,5,2,2,5)"
};
Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")));
assertTrue(emptyLogFileMessage());
}
// See PIG-1493
@Test
public void testInconsistentPruning() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2);");
pigServer.registerQuery("B = foreach A generate CONCAT(a0,a1) as b0, a0, a2;");
pigServer.registerQuery("C = foreach B generate a0, a2;");
Iterator<Tuple> iter = pigServer.openIterator("C");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals("(1,3)", t.toString());
assertTrue(iter.hasNext());
t = iter.next();
assertEquals("(2,2)", t.toString());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
}
// See PIG-1644
@Test
public void testSplitOutputWithForEach() throws Exception {
Path output1 = FileLocalizer.getTemporaryPath(pigServer.getPigContext());
Path output2 = FileLocalizer.getTemporaryPath(pigServer.getPigContext());
pigServer.setBatchOn();
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile5.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);");
pigServer.registerQuery("B = foreach A generate a0, a1, a2;");
pigServer.registerQuery("store B into '" + Util.generateURI(output1.toString(), pigServer.getPigContext()) + "';");
pigServer.registerQuery("C = order B by a2;");
pigServer.registerQuery("D = foreach C generate a2;");
pigServer.registerQuery("store D into '" + Util.generateURI(output2.toString(), pigServer.getPigContext()) + "';");
pigServer.executeBatch();
BufferedReader reader1 = new BufferedReader(new InputStreamReader(FileLocalizer.openDFSFile(output1.toString(), pigServer.getPigContext().getProperties())));
String line = reader1.readLine();
assertEquals("1\t2\t3", line);
line = reader1.readLine();
assertEquals("2\t3\t4", line);
assertNull(reader1.readLine());
BufferedReader reader2 = new BufferedReader(new InputStreamReader(FileLocalizer.openDFSFile(output2.toString(), pigServer.getPigContext().getProperties())));
line = reader2.readLine();
assertEquals("3", line);
line = reader2.readLine();
assertEquals("4", line);
assertNull(reader2.readLine());
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $3"}));
reader1.close();
reader2.close();
}
static public class PruneColumnEvalFunc extends LoadFunc implements LoadPushDown {
String[] aliases;
String signature;
public PruneColumnEvalFunc() {}
@Override
public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException {
aliases = new String[requiredFieldList.getFields().size()];
for (int i=0; i<requiredFieldList.getFields().size(); i++) {
RequiredField fs = requiredFieldList.getFields().get(i);
aliases[i] = fs.getAlias();
}
try {
UDFContext.getUDFContext().getUDFProperties(this.getClass()).setProperty(signature, ObjectSerializer.serialize(aliases));
} catch (IOException e) {
throw new FrontendException(e);
}
return new RequiredFieldResponse(true);
}
@Override
public List<OperatorSet> getFeatures() {
return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
}
@Override
public void setLocation(String location, Job job) throws IOException {
FileInputFormat.setInputPaths(job, location);
}
@Override
public InputFormat getInputFormat() throws IOException {
return new PigTextInputFormat();
}
@Override
public void prepareToRead(RecordReader reader, PigSplit split)
throws IOException {
}
@Override
public void setUDFContextSignature(String signature) {
this.signature = signature;
}
@Override
public Tuple getNext() throws IOException {
if (aliases==null) {
aliases = (String[])ObjectSerializer.deserialize(UDFContext.getUDFContext().getUDFProperties(this.getClass()).getProperty(signature));
Tuple t = TupleFactory.getInstance().newTuple();
for (String s : aliases)
t.append(s);
return t;
}
return null;
}
}
@Test
public void testAliasInRequiredFieldList() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' using "
+ PruneColumnEvalFunc.class.getName() +"() as (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate a1, a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertEquals(2, t.size());
assertEquals("a1", t.get(0));
assertEquals("a2", t.get(1));
assertFalse(iter.hasNext());
}
@Test
public void testCogroup10() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (a0, a1:double);");
pigServer.registerQuery("B = foreach A generate a0, a1, 0 as joinField;");
pigServer.registerQuery("C = group B all;");
pigServer.registerQuery("D = foreach C generate 0 as joinField, SUM(B.a1) as total;");
pigServer.registerQuery("E = join B by joinField, D by joinField;");
pigServer.registerQuery("F = foreach E generate a0;");
Iterator<Tuple> iter = pigServer.openIterator("F");
String[] expected = new String[] {"(1)", "(2)"};
Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("F")));
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
}
// See PIG-2534
@Test
public void testStream2() throws Exception {
File input1 = File.createTempFile("tmp", "");
input1.delete();
File input2 = File.createTempFile("tmp", "");
input2.delete();
Util.createLocalInputFile(input1.getAbsolutePath(), new String[]
{"[key1#0,key2#5,key3#val3,key4#val4,key5#val5]"});
Util.createLocalInputFile(input2.getAbsolutePath(), new String[]
{"[key1#0,key2#5,key3#val3,key4#val4,key5#val5]"});
pigServer.registerQuery("event_serve = LOAD '" + Util.encodeEscape(input1.getAbsolutePath()) +
"' AS (s, m, l);");
pigServer.registerQuery("cm_data_raw = LOAD '" + Util.encodeEscape(input2.getAbsolutePath()) +
"' AS (s, m, l);");
pigServer.registerQuery("cm_serve = FOREACH cm_data_raw GENERATE s#'key3' AS f1, s#'key4' AS f2, s#'key5' AS f3 ;");
pigServer.registerQuery("cm_serve_lowercase = stream cm_serve through `tr '[:upper:]' '[:lower:]'`;");
pigServer.registerQuery("cm_serve_final = FOREACH cm_serve_lowercase GENERATE $0 AS cm_event_guid, $1 AS cm_receive_time, $2 AS cm_ctx_url;");
pigServer.registerQuery("event_serve_project = FOREACH event_serve GENERATE s#'key3' AS event_guid, s#'key4' AS receive_time;");
pigServer.registerQuery("event_serve_join = join cm_serve_final by (cm_event_guid), event_serve_project by (event_guid);");
Iterator<Tuple> iter = pigServer.openIterator("event_serve_join");
String[] expected = new String[] {"(val3,val4,val5,val3,val4)"};
Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("event_serve_join")));
assertTrue(checkLogFileMessage(new String[]{"Map key required for event_serve: $0->[key3, key4]",
"Map key required for cm_data_raw: $0->[key3, key4, key5]"}));
}
// See PIG-2535
@Test
public void testStream3() throws Exception {
pigServer.registerQuery("event_serve = LOAD 'input1' AS (s, m, l);");
pigServer.registerQuery("raw = LOAD 'input2' AS (s, m, l);");
pigServer.registerQuery("SPLIT raw INTO " +
"serve_raw IF (( (chararray) (s#'type') == '0') AND ( (chararray) (s#'source') == '5'))," +
"cm_click_raw IF (( (chararray) (s#'type') == '1') AND ( (chararray) (s#'source') == '5'));");
pigServer.registerQuery("cm_serve = FOREACH serve_raw GENERATE s#'cm_serve_id' AS cm_event_guid, s#'cm_serve_timestamp_ms' AS cm_receive_time, s#'p_url' AS ctx ;");
pigServer.registerQuery("cm_serve_lowercase = stream cm_serve through `tr '[:upper:]' '[:lower:]'`;");
pigServer.registerQuery("cm_serve_final = FOREACH cm_serve_lowercase GENERATE $0 AS cm_event_guid, $1 AS cm_receive_time, $2 AS ctx;");
pigServer.registerQuery("filtered = FILTER event_serve BY (chararray) (s#'filter_key') neq 'xxxx' AND (chararray) (s#'filter_key') neq 'yyyy';");
pigServer.registerQuery("event_serve_project = FOREACH filtered GENERATE s#'event_guid' AS event_guid, s#'receive_time' AS receive_time;");
pigServer.registerQuery("event_serve_join = join cm_serve_final by (cm_event_guid), event_serve_project by (event_guid);");
pigServer.explain("event_serve_join", System.out);
assertTrue(checkLogFileMessage(new String[]{"Map key required for event_serve: $0->[event_guid, filter_key, receive_time]",
"Map key required for raw: $0->[cm_serve_id, cm_serve_timestamp_ms, p_url, source, type]"}));
}
}