blob: b107b42123bf55de6ec6c7916bb78d62f004fdcb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.apache.pig.builtin.mock.Storage.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;

import junit.framework.Assert;

import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.ExecType;
import org.apache.pig.Expression;
import org.apache.pig.LoadMetadata;
import org.apache.pig.PigServer;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.logical.expression.CastExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOStore;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
import org.apache.pig.parser.ParserException;
import org.junit.BeforeClass;
import org.junit.Test;
/**
 * Plan-generation tests.
 *
 * <p>Each test parses a Pig Latin query into a logical plan, runs the new
 * logical-plan optimizer, and checks the resulting plan shape and schemas.
 * Some tests also build the physical and MapReduce plans. The {@code testAs*}
 * tests execute queries end to end against {@code mock.Storage}.
 */
public class TestPlanGeneration {
    /** Pig context shared by all tests; connected once in {@link #setUp()}. */
    private static PigContext pc;
    /** Pig server in the local test mode, shared by all tests. */
    private static PigServer ps;

    /**
     * Creates a single {@link PigServer} in the local test mode and connects
     * its context. Both are reused by every test in this class.
     */
    @BeforeClass
    public static void setUp() throws Exception {
        ps = new PigServer(Util.getLocalTestMode());
        pc = ps.getPigContext();
        pc.connect();
    }

    /**
     * {@code generate *} over a load without a declared schema leaves the
     * ForEach without a schema.
     */
    @Test
    public void testGenerateStar() throws Exception {
        String query = "a = load 'x';" +
                "b = foreach a generate *;" +
                "store b into '111';";
        LogicalPlan lp = Util.parseAndPreprocess(query, pc);
        Util.optimizeNewLP(lp);
        LOStore loStore = (LOStore)lp.getSinks().get(0);
        LOForEach loForEach = (LOForEach)lp.getPredecessors(loStore).get(0);
        assertNull(loForEach.getSchema());
    }

    /**
     * Dereferencing a column of a bag with an empty inner schema yields a bag
     * whose tuple holds exactly one bytearray field.
     */
    @Test
    public void testEmptyBagDereference() throws Exception {
        String query = "A = load 'x' as ( u:bag{} );" +
                "B = foreach A generate u.$100;" +
                "store B into '111';";
        LogicalPlan lp = Util.parseAndPreprocess(query, pc);
        Util.optimizeNewLP(lp);
        LOStore loStore = (LOStore)lp.getSinks().get(0);
        LOForEach loForEach = (LOForEach)lp.getPredecessors(loStore).get(0);
        LogicalSchema schema = loForEach.getSchema();
        assertEquals(1, schema.size());
        LogicalFieldSchema bagFieldSchema = schema.getField(0);
        assertEquals(DataType.BAG, bagFieldSchema.type);
        LogicalFieldSchema tupleFieldSchema = bagFieldSchema.schema.getField(0);
        assertEquals(1, tupleFieldSchema.schema.size());
        assertEquals(DataType.BYTEARRAY, tupleFieldSchema.schema.getField(0).type);
    }

    /**
     * Dereferencing a column of a tuple with an empty schema yields a single
     * bytearray field.
     */
    @Test
    public void testEmptyTupleDereference() throws Exception {
        String query = "A = load 'x' as ( u:tuple() );" +
                "B = foreach A generate u.$100;" +
                "store B into '111';";
        LogicalPlan lp = Util.parseAndPreprocess(query, pc);
        Util.optimizeNewLP(lp);
        LOStore loStore = (LOStore)lp.getSinks().get(0);
        LOForEach loForEach = (LOForEach)lp.getPredecessors(loStore).get(0);
        LogicalSchema schema = loForEach.getSchema();
        assertEquals(1, schema.size());
        assertEquals(DataType.BYTEARRAY, schema.getField(0).type);
    }

    /**
     * A nested filter over a bag with an empty schema produces a bag field
     * whose inner tuple has no schema.
     */
    @Test
    public void testEmptyBagInnerPlan() throws Exception {
        String query = "A = load 'x' as ( u:bag{} );" +
                "B = foreach A { B1 = filter u by $1==0; generate B1;};" +
                "store B into '111';";
        LogicalPlan lp = Util.parseAndPreprocess(query, pc);
        Util.optimizeNewLP(lp);
        LOStore loStore = (LOStore)lp.getSinks().get(0);
        LOForEach loForEach = (LOForEach)lp.getPredecessors(loStore).get(0);
        LogicalSchema schema = loForEach.getSchema();
        assertEquals(1, schema.size());
        LogicalFieldSchema bagFieldSchema = schema.getField(0);
        assertEquals(DataType.BAG, bagFieldSchema.type);
        LogicalFieldSchema tupleFieldSchema = bagFieldSchema.schema.getField(0);
        assertNull(tupleFieldSchema.schema);
    }

    /**
     * {@code order by *} without an input schema.
     *
     * <p>The logical sort-key expression has no field schema. The projection
     * in the physical sort plan still has result type tuple.
     */
    @Test
    public void testOrderByNullFieldSchema() throws Exception {
        String query = "A = load 'x';" +
                "B = order A by *;" +
                "store B into '111';";
        LogicalPlan lp = Util.parseAndPreprocess(query, pc);
        Util.optimizeNewLP(lp);
        LOStore loStore = (LOStore)lp.getSinks().get(0);
        LOSort loSort = (LOSort)lp.getPredecessors(loStore).get(0);
        Operator sortPlanLeaf = loSort.getSortColPlans().get(0).getSources().get(0);
        LogicalFieldSchema sortPlanFS = ((LogicalExpression)sortPlanLeaf).getFieldSchema();
        assertNull(sortPlanFS);

        PhysicalPlan pp = Util.buildPhysicalPlanFromNewLP(lp, pc);
        POStore poStore = (POStore)pp.getLeaves().get(0);
        POSort poSort = (POSort)pp.getPredecessors(poStore).get(0);
        POProject poProject = (POProject)poSort.getSortPlans().get(0).getLeaves().get(0);
        assertEquals(DataType.TUPLE, poProject.getResultType());
    }

    /**
     * {@code group by *} without an input schema: the group key is a tuple
     * field with no inner schema.
     */
    @Test
    public void testGroupByNullFieldSchema() throws Exception {
        String query = "A = load 'x';" +
                "B = group A by *;" +
                "store B into '111';";
        LogicalPlan lp = Util.parseAndPreprocess(query, pc);
        Util.optimizeNewLP(lp);
        LOStore loStore = (LOStore)lp.getSinks().get(0);
        LOCogroup loCoGroup = (LOCogroup)lp.getPredecessors(loStore).get(0);
        LogicalFieldSchema groupFieldSchema = loCoGroup.getSchema().getField(0);
        assertEquals(DataType.TUPLE, groupFieldSchema.type);
        assertNull(groupFieldSchema.schema);
    }

    /**
     * The alias of the stored relation ("B") is carried on the store operator
     * in the logical, physical and MapReduce plans.
     */
    @Test
    public void testStoreAlias() throws Exception {
        String query = "A = load 'data' as (a0, a1);" +
                "B = filter A by a0 > 1;" +
                "store B into 'output';";
        LogicalPlan lp = Util.parse(query, pc);
        Util.optimizeNewLP(lp);
        LOStore loStore = (LOStore)lp.getSinks().get(0);
        assertEquals("B", loStore.getAlias());

        PhysicalPlan pp = Util.buildPhysicalPlanFromNewLP(lp, pc);
        POStore poStore = (POStore)pp.getLeaves().get(0);
        assertEquals("B", poStore.getAlias());

        MROperPlan mrp = Util.buildMRPlanWithOptimizer(pp, pc);
        MapReduceOper mrOper = mrp.getLeaves().get(0);
        // The store is read from the leaf of the last MR operator's map plan.
        poStore = (POStore)mrOper.mapPlan.getLeaves().get(0);
        assertEquals("B", poStore.getAlias());
    }

    /**
     * PIG-2119: the nested relations {@code d} and {@code e} are never used by
     * the generate statement. Parsing and optimizing such a plan must not
     * throw. The test has no other assertions.
     */
    @Test
    public void testDanglingNestedNode() throws Exception {
        String query = "a = load 'b.txt' AS (id:chararray, num:int); " +
                "b = group a by id;" +
                "c = foreach b {" +
                " d = order a by num DESC;" +
                " n = COUNT(a);" +
                " e = limit d 1;" +
                " generate n;" +
                "};";
        LogicalPlan lp = Util.parse(query, pc);
        Util.optimizeNewLP(lp);
    }

    /**
     * {@link PigStorage} variant that reports a fixed schema through
     * {@link LoadMetadata}. The schema is parsed from the constructor argument.
     * The loader reports no statistics and no partition keys, and ignores
     * partition filters.
     */
    public static class SchemaLoader extends PigStorage implements LoadMetadata {
        Schema schema;

        /**
         * @param schemaString schema text, parsed with {@link Utils#getSchemaFromString}
         * @throws ParserException if the schema text cannot be parsed
         */
        public SchemaLoader(String schemaString) throws ParserException {
            schema = Utils.getSchemaFromString(schemaString);
        }

        @Override
        public ResourceSchema getSchema(String location, Job job)
                throws IOException {
            return new ResourceSchema(schema);
        }

        @Override
        public ResourceStatistics getStatistics(String location, Job job)
                throws IOException {
            return null;
        }

        @Override
        public String[] getPartitionKeys(String location, Job job)
                throws IOException {
            return null;
        }

        @Override
        public void setPartitionFilter(Expression partitionFilter)
                throws IOException {
            // No partition keys are reported, so there is nothing to filter.
        }
    }

    /**
     * A loader that supplies its own schema yields a plain
     * load -> filter -> store plan, with the store as the final operator.
     */
    @Test
    public void testLoaderWithSchema() throws Exception {
        String query = "a = load 'foo' using " + SchemaLoader.class.getName()
                + "('name,age,gpa');\n"
                + "b = filter a by age==20;"
                + "store b into 'output';";
        LogicalPlan lp = Util.parse(query, pc);
        Util.optimizeNewLP(lp);
        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
        LOFilter loFilter = (LOFilter)lp.getSuccessors(loLoad).get(0);
        LOStore loStore = (LOStore)lp.getSuccessors(loFilter).get(0);
        assertNull(lp.getSuccessors(loStore));
    }

    /**
     * {@link PigStorage} variant that reports a fixed schema and the given
     * partition columns. It records whatever partition filter is passed to
     * {@link #setPartitionFilter}.
     *
     * <p>The filter is held in a static field. It is therefore shared by all
     * instances and persists across tests, so tests that inspect it should
     * reset it first. NOTE(review): presumably static because the loader
     * instance that receives the filter may differ from the one the test
     * inspects; confirm.
     */
    public static class PartitionedLoader extends PigStorage implements LoadMetadata {
        Schema schema;
        String[] partCols;
        static Expression partFilter = null;

        /**
         * @param schemaString schema text, parsed with {@link Utils#getSchemaFromString}
         * @param commaSepPartitionCols partition column names separated by ','
         * @throws ParserException if the schema text cannot be parsed
         */
        public PartitionedLoader(String schemaString, String commaSepPartitionCols)
                throws ParserException {
            schema = Utils.getSchemaFromString(schemaString);
            partCols = commaSepPartitionCols.split(",");
        }

        @Override
        public ResourceSchema getSchema(String location, Job job)
                throws IOException {
            return new ResourceSchema(schema);
        }

        @Override
        public ResourceStatistics getStatistics(String location,
                Job job) throws IOException {
            return null;
        }

        @Override
        public void setPartitionFilter(Expression partitionFilter)
                throws IOException {
            partFilter = partitionFilter;
        }

        @Override
        public String[] getPartitionKeys(String location, Job job)
                throws IOException {
            return partCols;
        }

        /** @return the last partition filter recorded by any instance, or null */
        public Expression getPartFilter() {
            return partFilter;
        }
    }

    /**
     * PIG-2339: a filter on a partition column is handed to the loader.
     *
     * <p>After optimization the load feeds the store directly, the loader has
     * received a partition filter, and the store's alias is "b".
     */
    @Test
    public void testPartitionFilterOptimizer() throws Exception {
        // partFilter is static. Clear it so a value left by an earlier run
        // cannot make the assertNotNull below pass vacuously.
        PartitionedLoader.partFilter = null;
        String query = "a = load 'foo' using " + PartitionedLoader.class.getName() +
                "('name:chararray, dt:chararray', 'dt');\n" +
                "b = filter a by dt=='2011';\n" +
                "store b into 'output';";
        LogicalPlan lp = Util.parse(query, pc);
        Util.optimizeNewLP(lp);
        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
        LOStore loStore = (LOStore)lp.getSuccessors(loLoad).get(0);
        assertNotNull(((PartitionedLoader)loLoad.getLoadFunc()).getPartFilter());
        assertEquals("b", loStore.getAlias());
    }

    /**
     * PIG-2315: user-declared types in {@code generate ... as} cause an extra
     * cast ForEach to be inserted after the original one.
     */
    @Test
    public void testForEachWithCast1() throws Exception {
        // A cast ForEach is inserted to take care of the user schema
        String query = "A = load 'foo' as (a, b:int);\n" +
                "B = foreach A generate a as a0:chararray, b as b:int;\n" +
                "store B into 'output';";
        LogicalPlan lp = Util.parse(query, pc);
        Util.optimizeNewLP(lp);
        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
        LOForEach loForEach1 = (LOForEach)lp.getSuccessors(loLoad).get(0);
        LOForEach loForEach2 = (LOForEach)lp.getSuccessors(loForEach1).get(0);
        // before a0 is typecasted to chararray, it should be bytearray
        assertEquals(DataType.BYTEARRAY, loForEach1.getSchema().getField(0).type);
        // type of b should stay as int
        assertEquals(DataType.INTEGER, loForEach1.getSchema().getField(1).type);
        assertEquals("B", loForEach2.getAlias());
        LOGenerate generate = (LOGenerate)loForEach2.getInnerPlan().getSinks().get(0);
        CastExpression cast = (CastExpression)generate.getOutputPlans().get(0).getSources().get(0);
        assertEquals(DataType.CHARARRAY, cast.getType());
        assertEquals("a0", loForEach2.getSchema().getField(0).alias);
        assertTrue(lp.getSuccessors(loForEach2).get(0) instanceof LOStore);
    }

    /**
     * PIG-2315: an explicit cast that already matches the declared type needs
     * no extra ForEach, but the alias from the user schema is applied.
     */
    @Test
    public void testForEachWithCast2() throws Exception {
        // No additional cast ForEach will be inserted, but schema should match
        String query = "A = load 'foo' as (a, b);\n" +
                "B = foreach A generate (chararray)a as a0:chararray;\n" +
                "store B into 'output';";
        LogicalPlan lp = Util.parse(query, pc);
        Util.optimizeNewLP(lp);
        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
        LOForEach loForEach = (LOForEach)lp.getSuccessors(loLoad).get(0);
        assertEquals("a0", loForEach.getSchema().getField(0).alias);
        assertTrue(lp.getSuccessors(loForEach).get(0) instanceof LOStore);
    }

    /**
     * PIG-2315: an explicit cast to chararray followed by a declared int type
     * gives two ForEach operators. The first casts to chararray and the second
     * casts to int.
     */
    @Test
    public void testForEachWithCast3() throws Exception {
        String query = "A = load 'foo' as (a, b);\n" +
                "B = foreach A generate (chararray)a as a0:int;\n" +
                "store B into 'output';";
        LogicalPlan lp = Util.parse(query, pc);
        Util.optimizeNewLP(lp);
        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
        LOForEach loForEach1 = (LOForEach)lp.getSuccessors(loLoad).get(0);
        LOGenerate generate1 = (LOGenerate)loForEach1.getInnerPlan().getSinks().get(0);
        CastExpression cast1 = (CastExpression)generate1.getOutputPlans().get(0).getSources().get(0);
        assertEquals(DataType.CHARARRAY, cast1.getType());
        // before a0 is typecasted to int, it should be chararray
        assertEquals(DataType.CHARARRAY, loForEach1.getSchema().getField(0).type);
        LOForEach loForEach2 = (LOForEach)lp.getSuccessors(loForEach1).get(0);
        LOGenerate generate2 = (LOGenerate)loForEach2.getInnerPlan().getSinks().get(0);
        CastExpression cast2 = (CastExpression)generate2.getOutputPlans().get(0).getSources().get(0);
        assertEquals(DataType.INTEGER, cast2.getType());
        assertTrue(lp.getSuccessors(loForEach2).get(0) instanceof LOStore);
    }

    /**
     * PIG-2315: flattening with untyped aliases needs no cast ForEach.
     */
    @Test
    public void testForEachWithCast4() throws Exception {
        // No additional cast ForEach will be inserted
        String query = "a = load 'foo' as (nb1:bag{}, nb2:chararray);\n" +
                "b = foreach a generate flatten(nb1) as (year, name), nb2;\n" +
                "store b into 'output';";
        LogicalPlan lp = Util.parse(query, pc);
        Util.optimizeNewLP(lp);
        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
        LOForEach loForEach = (LOForEach)lp.getSuccessors(loLoad).get(0);
        assertTrue(lp.getSuccessors(loForEach).get(0) instanceof LOStore);
    }

    /**
     * PIG-2315: a typed alias inside a flatten gets a cast ForEach. Only the
     * typed, flattened column is cast; the other outputs are plain projections.
     */
    @Test
    public void testForEachWithCast5() throws Exception {
        // cast ForEach will be inserted
        String query = "a = load 'foo' as (nb1:bag{}, nb2:chararray);\n" +
                "b = foreach a generate flatten(nb1) as (year, name:chararray), nb2 as nb2:chararray;\n" +
                "store b into 'output';";
        LogicalPlan lp = Util.parse(query, pc);
        Util.optimizeNewLP(lp);
        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
        LOForEach loForEach1 = (LOForEach)lp.getSuccessors(loLoad).get(0);
        // flattened "name" field should be bytearray before typecasted to chararray
        assertEquals(DataType.BYTEARRAY, loForEach1.getSchema().getField(1).type);
        LOForEach loForEach2 = (LOForEach)lp.getSuccessors(loForEach1).get(0);
        LOGenerate generate = (LOGenerate)loForEach2.getInnerPlan().getSinks().get(0);
        assertTrue(generate.getOutputPlans().get(0).getSources().get(0) instanceof ProjectExpression);
        CastExpression cast = (CastExpression)generate.getOutputPlans().get(1).getSources().get(0);
        assertEquals(DataType.CHARARRAY, cast.getType());
        assertTrue(generate.getOutputPlans().get(2).getSources().get(0) instanceof ProjectExpression);
    }

    /**
     * PIG-2315: renaming flattened columns of a bag with a known inner schema
     * needs no cast ForEach, and the new alias is applied.
     */
    @Test
    public void testForEachWithCast6() throws Exception {
        // no cast ForEach will be inserted
        String query = "a = load 'foo' as (nb1:bag{(year,name)}, nb2);\n" +
                "b = foreach a generate flatten(nb1) as (year, name2), nb2;\n" +
                "store b into 'output';";
        LogicalPlan lp = Util.parse(query, pc);
        Util.optimizeNewLP(lp);
        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
        LOForEach loForEach = (LOForEach)lp.getSuccessors(loLoad).get(0);
        assertEquals("name2", loForEach.getSchema().getField(1).alias);
        assertTrue(lp.getSuccessors(loForEach).get(0) instanceof LOStore);
    }

    /**
     * PIG-2315: when an earlier flatten has an unknown output width, no cast
     * ForEach is inserted.
     */
    @Test
    public void testForEachWithCast7() throws Exception {
        // no cast ForEach will be inserted, since we don't know the size of outputs
        // in first inner plan
        String query = "a = load 'foo' as (nb1:bag{}, nb2:bag{});\n" +
                "b = foreach a generate flatten(nb1), flatten(nb2) as (year, name);\n" +
                "store b into 'output';";
        LogicalPlan lp = Util.parse(query, pc);
        Util.optimizeNewLP(lp);
        LOLoad loLoad = (LOLoad)lp.getSources().get(0);
        LOForEach loForEach = (LOForEach)lp.getSuccessors(loLoad).get(0);
        assertTrue(lp.getSuccessors(loForEach).get(0) instanceof LOStore);
    }

    /**
     * PIG-2315 end to end: {@code as (a2:int)} on a double column really
     * casts the values to int.
     */
    @Test
    public void testAsType1() throws Exception {
        Data data = Storage.resetData(ps);
        data.set("input", tuple(0.1), tuple(1.2), tuple(2.3));
        String query =
                "A = load 'input' USING mock.Storage() as (a1:double);\n"
                + "B = FOREACH A GENERATE a1 as (a2:int);\n"
                + "store B into 'out' using mock.Storage;" ;
        Util.registerMultiLineQuery(ps, query);
        List<Tuple> list = data.get("out");
        // Without PIG-2315, this failed with (0.1), (1.2), (2.3)
        List<Tuple> expectedRes =
                Util.getTuplesFromConstantTupleStrings(
                        new String[] {"(0)", "(1)", "(2)"});
        Util.checkQueryOutputsAfterSort(list, expectedRes);
    }

    /**
     * PIG-2315 end to end: an int constant declared as chararray and a
     * chararray constant with the same text collapse to one row under
     * {@code distinct}.
     */
    @Test
    public void testAsType2() throws Exception {
        Data data = Storage.resetData(ps);
        data.set("input", tuple("a"), tuple("b"), tuple("c"));
        // B2 reads from B (previously A, leaving B unused). Both load the
        // same input, so the expected result is unchanged.
        String query =
                "A = load 'input' USING mock.Storage(); \n"
                + "A2 = FOREACH A GENERATE 12345 as (a2:chararray); \n"
                + "B = load 'input' USING mock.Storage(); \n"
                + "B2 = FOREACH B GENERATE '12345' as (b2:chararray); \n"
                + "C = union A2, B2;\n"
                + "D = distinct C;\n"
                + "store D into 'out' using mock.Storage;" ;
        Util.registerMultiLineQuery(ps, query);
        List<Tuple> list = data.get("out");
        // Without PIG-2315, this produced TWO 12345.
        // One by chararray and another by int.
        List<Tuple> expectedRes =
                Util.getTuplesFromConstantTupleStrings(
                        new String[] {"('12345')"});
        Util.checkQueryOutputsAfterSort(list, expectedRes);
    }

    /**
     * PIG-4933: casting a tuple with a (bytearray, float) schema.
     *
     * <p>The bytearray field must pass through unchanged and the float field
     * must be converted, with one output row per input row.
     */
    @Test
    public void testAsWithByteArrayCast() throws Exception {
        Data data = Storage.resetData(ps);
        data.set("input_testAsWithByteArrayCast", "t1:(f1:bytearray, f2:bytearray), f3:chararray",
                tuple(tuple(1,5), "a"),
                tuple(tuple(2,4), "b"),
                tuple(tuple(3,3), "c") );
        String query =
                "A = load 'input_testAsWithByteArrayCast' USING mock.Storage();\n"
                + "B = FOREACH A GENERATE t1 as (t2:(newf1, newf2:float)), f3;\n"
                + "store B into 'out' using mock.Storage;" ;
        // This will call typecast of (bytearray,float) on a tuple
        // bytearray2bytearray should be no-op.
        // Without pig-4933 patch on POCast,
        // this typecast was producing empty results
        Util.registerMultiLineQuery(ps, query);
        List<Tuple> list = data.get("out");
        String[] expectedRes =
                new String[] {"((1,5.0),a)","((2,4.0),b)","((3,3.0),c)"};
        // Check the row count first. Otherwise an empty result (the very
        // regression this test guards against) would pass the loop vacuously.
        assertNotNull(list);
        assertEquals(expectedRes.length, list.size());
        for (int i = 0; i < expectedRes.length; i++) {
            assertEquals(expectedRes[i], list.get(i).toString());
        }
    }
}