blob: 67a02af19aa650795979ed53bbdbcbfe986a24dc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License" + you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.pig.PigServer;
import org.apache.pig.impl.PigContext;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.rules.AddForEach;
import org.apache.pig.newplan.logical.rules.ColumnMapKeyPrune;
import org.apache.pig.newplan.logical.rules.MapKeysPruneHelper;
import org.apache.pig.newplan.optimizer.PlanOptimizer;
import org.apache.pig.newplan.optimizer.Rule;
import org.junit.Before;
import org.junit.Test;
public class TestNewPlanColumnPrune {
LogicalPlan plan = null;
PigContext pc;
@Before
public void setUp() throws Exception {
pc = new PigContext(Util.getLocalTestMode(), new Properties());
}
private LogicalPlan buildPlan(String query) throws Exception{
PigServer pigServer = new PigServer( pc );
return Util.buildLp(pigServer, query);
}
@Test
public void testNoPrune() throws Exception {
// no foreach
String query = "a = load 'd.txt' as (id, v1, v2);" +
"b = filter a by v1==NULL;" +
"store b into 'empty';";
LogicalPlan newLogicalPlan = buildPlan(query);
PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
optimizer.optimize();
query = "a = load 'd.txt' as (id, v1, v2);" +
"b = filter a by v1==NULL;" +
"store b into 'empty';";
LogicalPlan expected = buildPlan(query);
assertTrue(expected.isEqual(newLogicalPlan));
// no schema
query = "a = load 'd.txt';" +
"b = foreach a generate $0, $1;" +
"store b into 'empty';";
newLogicalPlan = buildPlan(query);
optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
optimizer.optimize();
query = "a = load 'd.txt';"+
"b = foreach a generate $0, $1;"+
"store b into 'empty';";
expected = buildPlan(query);
assertTrue(expected.isEqual(newLogicalPlan));
}
@Test
public void testPrune() throws Exception {
// only foreach
String query = "a = load 'd.txt' as (id, v1, v2);" +
"b = foreach a generate id;"+
"store b into 'empty';";
LogicalPlan newLogicalPlan = buildPlan(query);
PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
optimizer.optimize();
query = "a = load 'd.txt' as (id);" +
"b = foreach a generate id;"+
"store b into 'empty';";
LogicalPlan expected = buildPlan(query);
assertTrue(expected.isEqual(newLogicalPlan));
// with filter
query = "a = load 'd.txt' as (id, v1, v5, v3, v4, v2);"+
"b = filter a by v1 != NULL AND (v2+v3)<100;"+
"c = foreach b generate id;"+
"store c into 'empty';";
newLogicalPlan = buildPlan(query);
optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
optimizer.optimize();
query = "a = load 'd.txt' as (id, v1, v3, v2);" +
"b = filter a by v1 != NULL AND (v2+v3)<100;" +
"c = foreach b generate id;" +
"store c into 'empty';";
expected = buildPlan(query);
assertTrue(expected.isEqual(newLogicalPlan));
// with 2 foreach
query = "a = load 'd.txt' as (id, v1, v5, v3, v4, v2);" +
"b = foreach a generate v2, v5, v4;" +
"c = foreach b generate v5, v4;" +
"store c into 'empty';";
newLogicalPlan = buildPlan(query);
optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
optimizer.optimize();
query = "a = load 'd.txt' as (v5, v4);" +
"b = foreach a generate v5, v4;" +
"c = foreach b generate v5, v4;" +
"store c into 'empty';";
expected = buildPlan(query);
assertTrue(expected.isEqual(newLogicalPlan));
// with 2 foreach
query = "a = load 'd.txt' as (id, v1, v5, v3, v4, v2);" +
"b = foreach a generate id, v1, v5, v3, v4;" +
"c = foreach b generate v5, v4;" +
"store c into 'empty';";
newLogicalPlan = buildPlan(query);
optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
optimizer.optimize();
query = "a = load 'd.txt' as (v5, v4);" +
"b = foreach a generate v5, v4;" +
"c = foreach b generate v5, v4;" +
"store c into 'empty';";
expected = buildPlan(query);
assertTrue(expected.isEqual(newLogicalPlan));
// with 2 foreach and filter in between
query = "a =load 'd.txt' as (id, v1, v5, v3, v4, v2);" +
"b = foreach a generate v2, v5, v4;" +
"c = filter b by v2 != NULL;" +
"d = foreach c generate v5, v4;" +
"store d into 'empty';";
newLogicalPlan = buildPlan(query);
optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
optimizer.optimize();
query = "a =load 'd.txt' as (v5, v4, v2);" +
"b = foreach a generate v2, v5, v4;" +
"c = filter b by v2 != NULL;" +
"d = foreach c generate v5, v4;" +
"store d into 'empty';";
expected = buildPlan(query);
assertTrue(expected.isEqual(newLogicalPlan));
// with 2 foreach after join
query = "a =load 'd.txt' as (id, v1, v2, v3);" +
"b = load 'c.txt' as (id, v4, v5, v6);" +
"c = join a by id, b by id;" +
"d = foreach c generate a::id, v5, v3, v4;" +
"store d into 'empty';";
newLogicalPlan = buildPlan(query);
optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
optimizer.optimize();
query = "a =load 'd.txt' as (id, v3);" +
"b = load 'c.txt' as (id, v4, v5);" +
"c = join a by id, b by id;" +
"d = foreach c generate a::id, v5, v3, v4;" +
"store d into 'empty';";
expected = buildPlan(query);
assertTrue(expected.isEqual(newLogicalPlan));
// with BinStorage, insert foreach after load
query = "a =load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);" +
"c = filter a by v2 != NULL;" +
"d = foreach c generate v5, v4;" +
"store d into 'empty';";
newLogicalPlan = buildPlan(query);
optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
optimizer.optimize();
query = "a =load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);" +
"b = foreach a generate v5, v4, v2;" +
"c = filter b by v2 != NULL;" +
"d = foreach c generate v5, v4;" +
"store d into 'empty';";
expected = buildPlan(query);
assertTrue(expected.isEqual(newLogicalPlan));
// with BinStorage, not to insert foreach after load if there is already one
query = "a =load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);" +
"b = foreach a generate v5, v4, v2;" +
"c = filter b by v2 != NULL;" +
"d = foreach c generate v5;" +
"store d into 'empty';";
newLogicalPlan = buildPlan(query);
optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
optimizer.optimize();
query = "a =load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);" +
"b = foreach a generate v5, v2;" +
"c = filter b by v2 != NULL;" +
"d = foreach c generate v5;" +
"store d into 'empty';";
expected = buildPlan(query);
assertTrue(expected.isEqual(newLogicalPlan));
// with BinStorage, not to insert foreach after load if there is already one
query = "a =load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);" +
"b = foreach a generate v5, v4, v2, 10;" +
"c = filter b by v2 != NULL;" +
"d = foreach c generate v5;" +
"store d into 'empty';";
newLogicalPlan = buildPlan(query);
optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
optimizer.optimize();
query = "a =load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);" +
"b = foreach a generate v5, v2, 10;" +
"c = filter b by v2 != NULL;" +
"d = foreach c generate v5;" +
"store d into 'empty';";
expected = buildPlan(query);
assertTrue(expected.isEqual(newLogicalPlan));
}
@Test
@SuppressWarnings("unchecked")
public void testPruneWithMapKey() throws Exception {
// only foreach
String query = "a =load 'd.txt' as (id, v1, m:map[]);" +
"b = foreach a generate id, m#'path';" +
"store b into 'empty';";
LogicalPlan newLogicalPlan = buildPlan(query);
PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
optimizer.optimize();
query = "a =load 'd.txt' as (id, m:map[]);" +
"b = foreach a generate id, m#'path';" +
"store b into 'empty';";
LogicalPlan expected = buildPlan(query);
assertTrue(expected.isEqual(newLogicalPlan));
LOLoad op = (LOLoad)newLogicalPlan.getSources().get(0);
Map<Integer,Set<String>> annotation =
(Map<Integer, Set<String>>) op.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
assertEquals(1, annotation.size());
Set<String> s = new HashSet<String>();
s.add("path");
assertEquals(annotation.get(2), s);
// foreach with join
query = "a =load 'd.txt' as (id, v1, m:map[]);" +
"b = load 'd.txt' as (id, v1, m:map[]);" +
"c = join a by id, b by id;" +
"d = filter c by a::m#'path' != NULL;" +
"e = foreach d generate a::id, b::id, b::m#'path', a::m;" +
"store e into 'empty';";
newLogicalPlan = buildPlan(query);
optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
optimizer.optimize();
query = "a =load 'd.txt' as (id, m:map[]);" +
"b = load 'd.txt' as (id, m:map[]);" +
"c = join a by id, b by id;" +
"d = filter c by a::m#'path' != NULL;" +
"e = foreach d generate a::id, b::id, b::m#'path', a::m;" +
"store e into 'empty';";
expected = buildPlan(query);
assertTrue(expected.isEqual(newLogicalPlan));
List<Operator> ll = newLogicalPlan.getSources();
assertEquals(2, ll.size());
LOLoad loada = null;
LOLoad loadb = null;
for(Operator opp: ll) {
if (((LogicalRelationalOperator)opp).getAlias().equals("a")) {
loada = (LOLoad)opp;
continue;
}
if (((LogicalRelationalOperator)opp).getAlias().equals("b")) {
loadb = (LOLoad)opp;
continue;
}
}
annotation =
(Map<Integer, Set<String>>) loada.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
assertNull(annotation);
annotation =
(Map<Integer, Set<String>>) loadb.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
assertEquals(1, annotation.size());
s = new HashSet<String>();
s.add("path");
assertEquals(annotation.get(2), s);
}
@Test
public void testPruneWithBag() throws Exception {
// filter above foreach
String query = "a =load 'd.txt' as (id, v:bag{t:(s1,s2,s3)});" +
"b = filter a by id>10;" +
"c = foreach b generate id, FLATTEN(v);" +
"d = foreach c generate id, v::s2;" +
"store d into 'empty';";
LogicalPlan newLogicalPlan = buildPlan(query);
PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
optimizer.optimize();
query = "a =load 'd.txt' as (id, v:bag{t:(s1,s2,s3)});" +
"b = filter a by id>10;" +
"c = foreach b generate id, FLATTEN(v);" +
"d = foreach c generate id, v::s2;" +
"store d into 'empty';";
LogicalPlan expected = buildPlan(query);
assertTrue(expected.isEqual(newLogicalPlan));
}
@Test
public void testAddForeach() throws Exception {
// filter above foreach
String query = "a =load 'd.txt' as (id, v1, v2);" +
"b = filter a by v1>10;" +
"c = foreach b generate id;" +
"store c into 'empty';";
LogicalPlan newLogicalPlan = buildPlan(query);
PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
optimizer.optimize();
query = "a =load 'd.txt' as (id, v1);" +
"b = filter a by v1>10;" +
"c = foreach b generate id;" +
"store c into 'empty';";
LogicalPlan expected = buildPlan(query);
assertTrue(expected.isEqual(newLogicalPlan));
// join with foreach
query = "a =load 'd.txt' as (id, v1, v2);" +
"b = load 'd.txt' as (id, v1, v2);" +
"c = join a by id, b by id;" +
"d = filter c by a::v1>b::v1;" +
"e = foreach d generate a::id;" +
"store e into 'empty';";
newLogicalPlan = buildPlan(query);
optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
optimizer.optimize();
query = "a =load 'd.txt' as (id, v1);" +
"b = load 'd.txt' as (id, v1);" +
"c = join a by id, b by id;" +
"d = foreach c generate a::id, a::v1, b::v1;" +
"e = filter d by a::v1>b::v1;" +
"f = foreach e generate a::id;" +
"store f into 'empty';";
expected = buildPlan(query);
assertTrue(expected.isEqual(newLogicalPlan));
}
@Test
public void testPruneSubTreeForEach() throws Exception {
String query = "a =load 'd.txt' as (id, v1);" +
"b = group a by id;" +
"c = foreach b { d = a.v1; " +
" e = distinct d; " +
" generate group, e; };" +
"f = foreach c generate group ;" +
"store f into 'empty';";
LogicalPlan newLogicalPlan = buildPlan(query);
PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
try {
optimizer.optimize();
} catch (Exception e) {
//PIG-2968 throws ConcurrentModificationException
e.printStackTrace();
fail("Unexpected Exception: " + e);
}
}
@Test
public void testDistinct() throws Exception {
//Test for bug where distinct wasn't being pruned properly causing union
//to fail to get a schema since the distinct relation had an incompatible schema
//with the other relation being unioned.
String testQuery =
"a = load 'd.txt' as (id, v1, v2);" +
"b = load 'd.txt' as (id, v1, v2);" +
"c = distinct a;" +
"d = union b, c;" +
"e = foreach d generate id, v1;" +
"store e into 'empty';";
//Generate optimized plan.
LogicalPlan newLogicalPlan = buildPlan(testQuery);
PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
optimizer.optimize();
Iterator<Operator> iter = newLogicalPlan.getOperators();
while (iter.hasNext()) {
Operator o = iter.next();
LogicalRelationalOperator lro = (LogicalRelationalOperator)o;
if (lro == null || lro.getAlias() == null) continue;
if (lro.getAlias().equals("d")) {
assertNotNull(lro.getSchema());
}
}
}
@Test
public void testNoAddForeach() throws Exception {
// PIG-5055
// Need to make sure that it does not add foreach
// that drops all the fields from B2.
String query = "A = load 'd.txt' as (a0:int, a1:int, a2:int);" +
"B = load 'd.txt' as (b0:int, b1:int, b2:int);" +
"B2 = FILTER B by b0 == 0;" +
"C = join A by (1), B2 by (1) ;" +
"D = FOREACH C GENERATE A::a1, A::a2;" +
"store D into 'empty';";
LogicalPlan newLogicalPlan = buildPlan(query);
PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
optimizer.optimize();
System.err.println(newLogicalPlan);
Iterator<Operator> iter = newLogicalPlan.getOperators();
while (iter.hasNext()) {
Operator o = iter.next();
LogicalRelationalOperator lro = (LogicalRelationalOperator)o;
if (lro == null || lro.getAlias() == null) continue;
if (lro.getAlias().equals("B2")) {
assertNotNull(lro.getSchema());
}
}
}
public class MyPlanOptimizer extends LogicalPlanOptimizer {
protected MyPlanOptimizer(OperatorPlan p, int iterations) {
super(p, iterations, null);
}
protected List<Set<Rule>> buildRuleSets() {
List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
Rule r = new ColumnMapKeyPrune("ColumnMapKeyPrune");
Set<Rule> s = new HashSet<Rule>();
s.add(r);
ls.add(s);
r = new AddForEach("AddForEach");
s = new HashSet<Rule>();
s.add(r);
ls.add(s);
return ls;
}
}
}