blob: 80969895c4c266b3244c65fec283df018e271cbc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical.rules;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.optimizer.Transformer;
/**
* This rule prunes columns and map keys and sets the result on the loader. It
* depends on MapKeysPruneHelper to calculate which map keys are required for a
* loader, and on ColumnPruneHelper to calculate the required columns for a
* loader. It then combines the map-key and column information and sets it on
* the loader.
*/
public class ColumnMapKeyPrune extends WholePlanRule {

    // Set by the first call to check(); every later check() then returns false.
    // Lives on the rule, so it is shared by all transformers this rule creates.
    private boolean hasRun;

    /**
     * Creates the rule.
     *
     * @param n rule name; passed to the WholePlanRule constructor together
     *          with {@code false}
     */
    public ColumnMapKeyPrune(String n) {
        super(n, false);
        hasRun = false;
    }

    /**
     * @return a fresh {@link ColumnMapKeyPruneTransformer} bound to this rule
     */
    @Override
    public Transformer getNewTransformer() {
        return new ColumnMapKeyPruneTransformer();
    }

    /**
     * Transformer that asks MapKeysPruneHelper and ColumnPruneHelper whether
     * anything can be pruned, merges the annotations they leave on the plan
     * sources, and hands the merged result to a ColumnPruneVisitor.
     */
    public class ColumnMapKeyPruneTransformer extends Transformer {

        private MapKeysPruneHelper mapKeyHelper;
        private ColumnPruneHelper columnHelper;

        // Results of the two helper checks, recorded in check().
        private boolean columnPrune;
        private boolean mapKeyPrune;

        /*
         * Required map keys and required columns for each LOLoad:
         *   first  --> Map<Integer, Set<String>>  (required map keys)
         *   second --> Set<Integer>               (required columns)
         *
         * The integers are column indexes.
         */
        private Map<LOLoad, Pair<Map<Integer, Set<String>>, Set<Integer>>> requiredItems =
            new HashMap<LOLoad, Pair<Map<Integer, Set<String>>, Set<Integer>>>();

        /**
         * Runs both prune helpers against the plan, at most once per rule.
         *
         * @param matched plan handed to both helpers
         * @return {@code false} if the rule has already run; otherwise
         *         {@code true} when at least one helper check succeeds
         * @throws FrontendException propagated from the helpers
         */
        @Override
        public boolean check(OperatorPlan matched) throws FrontendException {
            if (hasRun) {
                // This rule is only ever evaluated once.
                return false;
            }
            hasRun = true;

            mapKeyHelper = new MapKeysPruneHelper(matched);
            columnHelper = new ColumnPruneHelper(matched);

            // Both checks always run; neither result short-circuits the other.
            mapKeyPrune = mapKeyHelper.check();
            columnPrune = columnHelper.check();

            return columnPrune || mapKeyPrune;
        }

        /**
         * @return the plan held by the enclosing rule
         */
        @Override
        public OperatorPlan reportChanges() {
            return currentPlan;
        }

        /**
         * Combines the map-key and column annotations found on every source of
         * the current plan and records the outcome in {@code requiredItems}.
         * Sources carrying neither annotation are skipped.
         *
         * @throws FrontendException if the schema of a source cannot be obtained
         */
        @SuppressWarnings("unchecked")
        private void merge() throws FrontendException {
            for (Operator load : currentPlan.getSources()) {
                Map<Integer, Set<String>> keysByColumn =
                    (Map<Integer, Set<String>>) load.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
                Set<Integer> annotatedColumns =
                    (Set<Integer>) load.getAnnotation(ColumnPruneHelper.REQUIREDCOLS);

                // Nothing is known about this source, so leave it alone.
                if (keysByColumn == null && annotatedColumns == null) {
                    continue;
                }

                Set<Integer> columns;
                if (annotatedColumns == null) {
                    // Only map keys can be pruned and no column can, so every
                    // column not already covered by the map keys is required.
                    columns = columnsWithoutMapKeys(load, keysByColumn);
                } else {
                    // Work on a copy so the annotation itself is not modified.
                    columns = new HashSet<Integer>(annotatedColumns);
                    if (keysByColumn != null) {
                        // Drop the columns already marked by the map keys.
                        columns.removeAll(keysByColumn.keySet());
                    }
                }

                requiredItems.put((LOLoad) load,
                    new Pair<Map<Integer, Set<String>>, Set<Integer>>(keysByColumn, columns));
            }
        }

        /**
         * Collects every column index of the source's schema that is not a key
         * of {@code mapKeys}.
         *
         * @param source  operator whose schema supplies the column count
         * @param mapKeys required map keys per column index; must not be null
         * @return a new set holding the remaining column indexes
         * @throws FrontendException if the schema cannot be obtained
         */
        private Set<Integer> columnsWithoutMapKeys(Operator source, Map<Integer, Set<String>> mapKeys)
                throws FrontendException {
            Set<Integer> columns = new HashSet<Integer>();
            int columnCount = ((LogicalRelationalOperator) source).getSchema().size();
            for (int index = 0; index < columnCount; index++) {
                if (!mapKeys.containsKey(index)) {
                    columns.add(index);
                }
            }
            return columns;
        }

        /**
         * Merges the source annotations, then runs a ColumnPruneVisitor over the
         * current plan with the merged requirements and the column-prune flag.
         *
         * @param matched not read by this method; the enclosing rule's plan is used
         * @throws FrontendException propagated from the merge or the visitor
         */
        @Override
        public void transform(OperatorPlan matched) throws FrontendException {
            merge();
            ColumnPruneVisitor pruner = new ColumnPruneVisitor(currentPlan, requiredItems, columnPrune);
            pruner.visit();
        }
    }
}