blob: 2eb85e17aa9a85ebb694b84ff5258ca7d08d734d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.PigContext;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanPrinter;
import org.apache.pig.newplan.logical.optimizer.ProjectionPatcher;
import org.apache.pig.newplan.logical.optimizer.SchemaPatcher;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.rules.ColumnMapKeyPrune;
import org.apache.pig.newplan.logical.rules.MapKeysPruneHelper;
import org.apache.pig.newplan.optimizer.PlanOptimizer;
import org.apache.pig.newplan.optimizer.PlanTransformListener;
import org.apache.pig.newplan.optimizer.Rule;
import org.junit.Test;
public class TestNewPlanPruneMapKeys {
PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
private org.apache.pig.newplan.logical.relational.LogicalPlan migratePlan(String query) throws Exception{
PigServer pigServer = new PigServer( pc );
org.apache.pig.newplan.logical.relational.LogicalPlan newPlan =
Util.buildLp(pigServer, query);
// run filter rule
Set<Rule> s = new HashSet<Rule>();
List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
ls.add(s);
// Add the PruneMap Filter
Rule r = new ColumnMapKeyPrune("PruneMapKeys");
s.add(r);
printPlan((org.apache.pig.newplan.logical.relational.LogicalPlan)newPlan);
// Run the optimizer
MyPlanOptimizer optimizer = new MyPlanOptimizer(newPlan, ls, 3);
optimizer.addPlanTransformListener(new ProjectionPatcher());
optimizer.addPlanTransformListener(new SchemaPatcher());
optimizer.optimize();
return newPlan;
}
public class MyPlanOptimizer extends PlanOptimizer {
protected MyPlanOptimizer(OperatorPlan p, List<Set<Rule>> rs,
int iterations) {
super(p, rs, iterations);
}
public void addPlanTransformListener(PlanTransformListener listener) {
super.addPlanTransformListener(listener);
}
}
@Test
@SuppressWarnings("unchecked")
public void testSimplePlan() throws Exception {
String query = "a =load 'd.txt' as (a:map[], b:int, c:float);" +
"b = filter a by a#'name' == 'hello';" +
"store b into 'empty';";
org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(query);
List<Operator> sources = newLogicalPlan.getSources();
assertEquals( 1, sources.size() );
for( Operator source : sources ) {
Map<Long,Set<String>> annotation =
(Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
assertTrue(annotation == null || annotation.isEmpty() );
}
}
@Test
@SuppressWarnings("unchecked")
public void testSimplePlan2() throws Exception {
String query = "a =load 'd.txt' as (a:map[], b:int, c:float);" +
"b = filter a by a#'name' == 'hello';" +
"c = foreach b generate b,c;" +
"store c into 'empty';";
org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(query);
assertEquals( 1, newLogicalPlan.getSources().size() );
LOLoad load = (LOLoad) newLogicalPlan.getSources().get(0);
Map<Long,Set<String>> annotation =
(Map<Long, Set<String>>) load.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
assertNotNull(annotation);
assertEquals( 1, annotation.keySet().size() );
Integer[] keySet = annotation.keySet().toArray( new Integer[0] );
assertEquals( new Integer(0), keySet[0] );
Set<String> keys = annotation.get(0);
assertEquals( 1, keys.size() );
assertEquals( "name", keys.toArray( new String[0] )[0] );
}
@Test
@SuppressWarnings("unchecked")
public void testSimplePlan3() throws Exception {
String query = "a =load 'd.txt' as (a:map[], b:int, c:float);" +
"b = filter a by a#'name' == 'hello';" +
"c = foreach b generate a#'age',b,c;" +
"store c into 'empty';";
org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(query);
assertEquals( 1, newLogicalPlan.getSources().size() );
LOLoad load = (LOLoad) newLogicalPlan.getSources().get(0);
Map<Long,Set<String>> annotation =
(Map<Long, Set<String>>) load.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
assertNotNull(annotation);
assertEquals( 1, annotation.keySet().size() );
Integer[] keySet = annotation.keySet().toArray( new Integer[0] );
assertEquals( new Integer(0), keySet[0] );
Set<String> keys = annotation.get(0);
assertEquals( 2, keys.size() );
assertTrue( keys.contains("name") );
assertTrue( keys.contains("age"));
}
@Test
@SuppressWarnings("unchecked")
public void testSimplePlan4() throws Exception {
String query = "a =load 'd.txt' as (a:map[], b:int, c:float);" +
"b = filter a by a#'name' == 'hello';" +
"c = foreach b generate a#'age',a,b,c;" +
"store c into 'empty';";
org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(query);
List<Operator> sources = newLogicalPlan.getSources();
assertEquals( 1, sources.size() );
for( Operator source : sources ) {
Map<Long,Set<String>> annotation =
(Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
assertTrue(annotation == null || annotation.isEmpty() );
}
}
@Test
@SuppressWarnings("unchecked")
public void testSimplePlan5() throws Exception {
String query = "a =load 'd.txt' as (a:chararray, b:int, c:float);" +
"b = filter a by a == 'hello';" +
"c = foreach b generate a,b,c;" +
"store c into 'empty';";
org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(query);
List<Operator> sources = newLogicalPlan.getSources();
assertEquals( 1, sources.size() );
for( Operator source : sources ) {
Map<Long,Set<String>> annotation =
(Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
assertNull(annotation);
}
}
@Test
@SuppressWarnings("unchecked")
public void testSimplePlan6() throws Exception {
String query = "a =load 'd.txt';" +
"b = filter a by $0 == 'hello';" +
"c = foreach b generate $0,$1,$2;" +
"store c into 'empty';";
org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(query);
List<Operator> sources = newLogicalPlan.getSources();
assertEquals( 1, sources.size() );
for( Operator source : sources ) {
Map<Long,Set<String>> annotation =
(Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
assertTrue(annotation == null );
}
}
@Test
@SuppressWarnings("unchecked")
public void testSimplePlan7() throws Exception {
String query = "a =load 'd.txt';" +
"a1 = load 'b.txt' as (a:map[],b:int, c:float);" +
"b = join a by $0, a1 by a#'name';" +
"c = foreach b generate $0,$1,$2;" +
"store c into 'empty';";
org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(query);
printPlan( newLogicalPlan );
List<Operator> sources = newLogicalPlan.getSources();
assertEquals( 2, sources.size() );
for( Operator source : sources ) {
Map<Long,Set<String>> annotation =
(Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
assertTrue( annotation == null || annotation.isEmpty() );
}
}
@Test
@SuppressWarnings("unchecked")
public void testSimplePlan8() throws Exception {
String query = "a =load 'd.txt';" +
"a1 = load 'b.txt' as (a:chararray,b:int, c:float);" +
"b = join a by $0, a1 by a;" +
"c = foreach b generate $0,$1,$2;" +
"store c into 'empty';";
org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(query);
printPlan(newLogicalPlan);
List<Operator> sources = newLogicalPlan.getSources();
assertEquals( 2, sources.size() );
for( Operator source : sources ) {
Map<Long,Set<String>> annotation =
(Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
assertTrue( annotation == null || annotation.isEmpty() );
}
}
public void printPlan(org.apache.pig.newplan.logical.relational.LogicalPlan logicalPlan ) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(out);
LogicalPlanPrinter pp = new LogicalPlanPrinter(logicalPlan,ps);
pp.visit();
System.err.println(out.toString());
}
public void printPlan(PhysicalPlan physicalPlan) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(out);
physicalPlan.explain(ps, "text", true);
System.err.println(out.toString());
}
}