/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| package org.apache.pig.newplan.logical.rules; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.pig.Expression; |
| import org.apache.pig.Expression.BinaryExpression; |
| import org.apache.pig.Expression.Column; |
| import org.apache.pig.LoadFunc; |
| import org.apache.pig.LoadMetadata; |
| import org.apache.pig.impl.logicalLayer.FrontendException; |
| import org.apache.pig.newplan.FilterExtractor; |
| import org.apache.pig.newplan.Operator; |
| import org.apache.pig.newplan.OperatorPlan; |
| import org.apache.pig.newplan.OperatorSubPlan; |
| import org.apache.pig.newplan.PartitionFilterExtractor; |
| import org.apache.pig.newplan.logical.relational.LOFilter; |
| import org.apache.pig.newplan.logical.relational.LOLoad; |
| import org.apache.pig.newplan.logical.relational.LogicalPlan; |
| import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator; |
| import org.apache.pig.newplan.logical.relational.LogicalSchema; |
| import org.apache.pig.newplan.optimizer.Rule; |
| import org.apache.pig.newplan.optimizer.Transformer; |
| |
| public class PartitionFilterOptimizer extends Rule { |
| |
| private static final Log LOG = LogFactory.getLog(PartitionFilterOptimizer.class); |
| private String[] partitionKeys; |
| |
| /** |
| * a reference to the LoadMetada implementation |
| */ |
| private LoadMetadata loadMetadata; |
| |
| /** |
| * a reference to the LoadFunc implementation |
| */ |
| private LoadFunc loadFunc; |
| |
| private LOLoad loLoad; |
| private LOFilter loFilter; |
| |
| /** |
| * a map between column names as reported in |
| * {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)} |
| * and as present in {@link LOLoad#getSchema()}. The two will be different |
| * when the user has provided a schema in the load statement |
| */ |
| private Map<String, String> colNameMap = new HashMap<String, String>(); |
| |
| /** |
| * a map between column nameas as present in {@link LOLoad#getSchema()} and |
| * as reported in |
| * {@link LoadMetadata#getSchema(String, org.apache.hadoop.conf.Configuration)}. |
| * The two will be different when the user has provided a schema in the |
| * load statement. |
| */ |
| private Map<String, String> reverseColNameMap = new HashMap<String, String>(); |
| |
| public PartitionFilterOptimizer(String name) { |
| super( name, false ); |
| } |
| |
| @Override |
| protected OperatorPlan buildPattern() { |
| // match each foreach. |
| LogicalPlan plan = new LogicalPlan(); |
| LogicalRelationalOperator load = new LOLoad (null, plan); |
| plan.add( load ); |
| // LogicalRelationalOperator filter = new LOFilter( plan ); |
| // plan.add( filter ); |
| // plan.connect( load, filter ); |
| return plan; |
| } |
| |
| @Override |
| public Transformer getNewTransformer() { |
| return new PartitionFilterPushDownTransformer(); |
| } |
| |
| public class PartitionFilterPushDownTransformer extends Transformer { |
| protected OperatorSubPlan subPlan; |
| private boolean planChanged; |
| |
| @Override |
| public boolean check(OperatorPlan matched) throws FrontendException { |
| loLoad = (LOLoad)matched.getSources().get(0); |
| // Match filter. |
| List<Operator> succeds = currentPlan.getSuccessors( loLoad ); |
| if( succeds == null || succeds.size() == 0 || !( succeds.get(0) instanceof LOFilter ) ) |
| return false; |
| loFilter = (LOFilter)succeds.get(0); |
| |
| // we have to check more only if LoadFunc implements LoadMetada |
| loadFunc = loLoad.getLoadFunc(); |
| if(!( loadFunc instanceof LoadMetadata ) ) { |
| return false; |
| } |
| |
| loadMetadata = (LoadMetadata)loadFunc; |
| try { |
| partitionKeys = loadMetadata.getPartitionKeys( |
| loLoad.getFileSpec().getFileName(), new Job( loLoad.getConfiguration() ) ); |
| } catch (IOException e) { |
| throw new FrontendException( e ); |
| } |
| if( partitionKeys == null || partitionKeys.length == 0 ) { |
| return false; |
| } |
| |
| return true; |
| } |
| |
| @Override |
| public OperatorPlan reportChanges() { |
| // Return null in case there is no partition filter extracted |
| // which means the plan hasn't changed. |
| // If not return the modified plan which has filters removed. |
| return planChanged ? subPlan : null; |
| } |
| |
| @Override |
| public void transform(OperatorPlan matched) throws FrontendException { |
| subPlan = new OperatorSubPlan( currentPlan ); |
| |
| setupColNameMaps(); |
| |
| FilterExtractor filterFinder = new PartitionFilterExtractor(loFilter.getFilterPlan(), |
| getMappedKeys(partitionKeys)); |
| filterFinder.visit(); |
| LOG.info("Partition keys are " + Arrays.asList(partitionKeys)); |
| Expression partitionFilter = filterFinder.getPushDownExpression(); |
| |
| if(partitionFilter != null) { |
| // the column names in the filter may be the ones provided by |
| // the user in the schema in the load statement - we may need |
| // to replace them with partition column names as given by |
| // LoadFunc.getSchema() |
| updateMappedColNames(partitionFilter); |
| try { |
| LOG.info("Setting partition filter [" + partitionFilter + "] on loader " + loadMetadata); |
| loadMetadata.setPartitionFilter(partitionFilter); |
| planChanged = true; |
| } catch (IOException e) { |
| throw new FrontendException( e ); |
| } |
| if(filterFinder.isFilterRemovable()) { |
| currentPlan.removeAndReconnect( loFilter ); |
| } else { |
| loFilter.setFilterPlan(filterFinder.getFilteredPlan()); |
| } |
| } |
| } |
| |
| protected void updateMappedColNames(Expression expr) { |
| if(expr instanceof BinaryExpression) { |
| updateMappedColNames(((BinaryExpression) expr).getLhs()); |
| updateMappedColNames(((BinaryExpression) expr).getRhs()); |
| } else if (expr instanceof Column) { |
| Column col = (Column) expr; |
| col.setName(reverseColNameMap.get(col.getName())); |
| } |
| } |
| |
| /** |
| * The partition keys in the argument are as reported by |
| * {@link LoadMetadata#getPartitionKeys(String, org.apache.hadoop.conf.Configuration)}. |
| * The user may have renamed these by providing a schema with different names |
| * in the load statement - this method will replace the former names with |
| * the latter names. |
| * @param partitionKeys |
| * @return |
| */ |
| protected List<String> getMappedKeys(String[] partitionKeys) { |
| List<String> mappedKeys = new ArrayList<String>(partitionKeys.length); |
| for (int i = 0; i < partitionKeys.length; i++) { |
| mappedKeys.add(colNameMap.get(partitionKeys[i])); |
| } |
| return mappedKeys; |
| } |
| |
| protected void setupColNameMaps() throws FrontendException { |
| LogicalSchema loLoadSchema = loLoad.getSchema(); |
| LogicalSchema loadFuncSchema = loLoad.getDeterminedSchema(); |
| for(int i = 0; i < loadFuncSchema.size(); i++) { |
| colNameMap.put(loadFuncSchema.getField(i).alias, |
| (i < loLoadSchema.size() ? loLoadSchema.getField(i).alias : |
| loadFuncSchema.getField(i).alias)); |
| |
| reverseColNameMap.put((i < loLoadSchema.size() ? loLoadSchema.getField(i).alias : |
| loadFuncSchema.getField(i).alias), |
| loadFuncSchema.getField(i).alias); |
| } |
| } |
| |
| } |
| |
| } |