// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.planner;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.BetweenPredicate;
import org.apache.impala.analysis.BinaryPredicate;
import org.apache.impala.analysis.BinaryPredicate.Operator;
import org.apache.impala.analysis.CompoundPredicate;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.InPredicate;
import org.apache.impala.analysis.IsNullPredicate;
import org.apache.impala.analysis.LiteralExpr;
import org.apache.impala.analysis.NullLiteral;
import org.apache.impala.analysis.SlotId;
import org.apache.impala.analysis.SlotRef;
import org.apache.impala.analysis.TupleDescriptor;
import org.apache.impala.catalog.FeFsPartition;
import org.apache.impala.catalog.FeFsTable;
import org.apache.impala.catalog.PrunablePartition;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.Pair;
import org.apache.impala.rewrite.BetweenToCompoundRule;
import org.apache.impala.rewrite.ExprRewriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

/**
* The HDFS partition pruner provides a mechanism to filter out partitions of an HDFS
* table based on the conjuncts provided by the caller.
*
* The pruner is initialized with a TupleDescriptor for the slots being materialized.
* The prunePartitions() method is the external interface exposed by this class. It
* takes a list of conjuncts, loops through all the partitions and prunes them based
* on applicable conjuncts. It returns a list of partitions left after applying all
* the conjuncts and also removes the conjuncts which have been fully evaluated with
* the partition columns.
*
* The pruner does not update referenced partitions in the DescriptorTable because
* not all users of this class require the resulting partitions to be serialized
* (e.g., DDL commands do not). It is up to the caller of this class to mark
* referenced partitions as needed.
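*
* Example usage (a sketch; actual call sites, e.g. in the planner, may differ):
*   HdfsPartitionPruner pruner = new HdfsPartitionPruner(tupleDesc);
*   Pair<List<? extends FeFsPartition>, List<Expr>> pruned =
*       pruner.prunePartitions(analyzer, conjuncts, true);
*   List<? extends FeFsPartition> partitions = pruned.first;
*   List<Expr> partitionConjuncts = pruned.second;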
*/
public class HdfsPartitionPruner {
private final static Logger LOG = LoggerFactory.getLogger(HdfsPartitionPruner.class);
// Partition batch size used during partition pruning.
private final static int PARTITION_PRUNING_BATCH_SIZE = 1024;
private final FeFsTable tbl_;
private final List<SlotId> partitionSlots_;
// For converting BetweenPredicates to CompoundPredicates so they can be
// executed in the BE.
private final ExprRewriter exprRewriter_ =
new ExprRewriter(BetweenToCompoundRule.INSTANCE);
public HdfsPartitionPruner(TupleDescriptor tupleDesc) {
Preconditions.checkState(tupleDesc.getTable() instanceof FeFsTable);
tbl_ = (FeFsTable)tupleDesc.getTable();
partitionSlots_ = tupleDesc.getPartitionSlots();
}

/**
* Return a list of partitions left after applying the conjuncts.
* Conjuncts used for filtering will be removed from the list 'conjuncts' and
* returned as the second item in the returned Pair. These expressions can be
* shown in the EXPLAIN output.
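*
* For example, for a table partitioned by (year, month), both "year = 2024"
* (evaluable directly from the partition key values) and "month + 1 = 7"
* (which must be evaluated in the BE) are removed from 'conjuncts' and
* returned in the second element of the Pair.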
*
* If 'allowEmpty' is false, partitions without any file descriptors are not returned.
*/
public Pair<List<? extends FeFsPartition>, List<Expr>> prunePartitions(
Analyzer analyzer, List<Expr> conjuncts, boolean allowEmpty)
throws ImpalaException {
// Start by creating a collection of partition filters for the applicable conjuncts.
List<HdfsPartitionFilter> partitionFilters = new ArrayList<>();
// Conjuncts that can be evaluated from the partition key values.
List<Expr> simpleFilterConjuncts = new ArrayList<>();
List<Expr> partitionConjuncts = new ArrayList<>();
// Simple predicates (e.g. binary predicates of the form
// <SlotRef> <op> <LiteralExpr>) can be used to derive lists
// of matching partition ids directly from the partition key values.
// Split conjuncts among those that can be evaluated from partition
// key values and those that need to be evaluated in the BE.
Iterator<Expr> it = conjuncts.iterator();
while (it.hasNext()) {
Expr conjunct = it.next();
if (conjunct.isBoundBySlotIds(partitionSlots_) &&
!conjunct.contains(Expr.IS_NONDETERMINISTIC_BUILTIN_FN_PREDICATE)) {
// Check if the conjunct can be evaluated from the partition metadata.
// Use a cloned conjunct to rewrite BetweenPredicates and allow
// canEvalUsingPartitionMd() to fold constant expressions without modifying
// the original expr.
Expr clonedConjunct = exprRewriter_.rewrite(conjunct.clone(), analyzer);
if (canEvalUsingPartitionMd(clonedConjunct, analyzer)) {
simpleFilterConjuncts.add(Expr.pushNegationToOperands(clonedConjunct));
} else {
partitionFilters.add(new HdfsPartitionFilter(clonedConjunct, tbl_, analyzer));
}
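// Remember the original (un-rewritten) conjunct so it can be shown in the
// EXPLAIN output, and remove it from the caller's list.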
partitionConjuncts.add(conjunct);
it.remove();
}
}
// Set of matching partition ids, i.e. partitions that pass all filters
Set<Long> matchingPartitionIds = null;
// Evaluate the partition filters from the partition key values.
// The result is the intersection of the associated partition id sets.
for (Expr filter: simpleFilterConjuncts) {
// Evaluate the filter
Set<Long> matchingIds = evalSlotBindingFilter(filter);
if (matchingPartitionIds == null) {
matchingPartitionIds = matchingIds;
} else {
matchingPartitionIds.retainAll(matchingIds);
}
}
// If no simple filters were evaluated above, initialize the set of valid
// partition ids to all partition ids of the table.
if (simpleFilterConjuncts.size() == 0) {
Preconditions.checkState(matchingPartitionIds == null);
matchingPartitionIds = Sets.newHashSet(tbl_.getPartitionIds());
}
// Evaluate the 'complex' partition filters in the BE.
evalPartitionFiltersInBe(partitionFilters, matchingPartitionIds, analyzer);
// Load the partitions that passed all filters; optionally drop partitions
// without any data files.
List<? extends FeFsPartition> results = tbl_.loadPartitions(
matchingPartitionIds);
if (!allowEmpty) {
results = Lists.newArrayList(Iterables.filter(results,
new Predicate<FeFsPartition>() {
@Override
public boolean apply(FeFsPartition partition) {
return partition.hasFileDescriptors();
}
}));
}
return new Pair<>(results, partitionConjuncts);
}

/**
* Recursive function that checks if a given partition expr can be evaluated
* directly from the partition key values. If 'expr' contains any constant expressions,
* they are evaluated in the BE and are replaced by their corresponding results, as
* LiteralExprs.
*/
private boolean canEvalUsingPartitionMd(Expr expr, Analyzer analyzer) {
Preconditions.checkNotNull(expr);
Preconditions.checkState(!(expr instanceof BetweenPredicate));
if (expr instanceof BinaryPredicate) {
// Evaluate any constant expression in the BE
try {
// TODO: Analyzer should already have done constant folding
// and rewrite -- unless this is a copy of an expression taken
// before analysis, which would introduce its own issues.
expr = analyzer.getConstantFolder().rewrite(expr, analyzer);
Preconditions.checkState(expr instanceof BinaryPredicate);
} catch (AnalysisException e) {
LOG.error("Error evaluating constant expressions in the BE: " + e.getMessage());
return false;
}
BinaryPredicate bp = (BinaryPredicate)expr;
if (bp.getChild(0).isImplicitCast()) return false;
SlotRef slot = bp.getBoundSlot();
if (slot == null) return false;
Expr bindingExpr = bp.getSlotBinding(slot.getSlotId());
if (bindingExpr == null || !Expr.IS_LITERAL.apply(bindingExpr)) return false;
return true;
} else if (expr instanceof CompoundPredicate) {
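// NOT has a single child; AND/OR have two children and both must be evaluable.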
boolean res = canEvalUsingPartitionMd(expr.getChild(0), analyzer);
if (expr.getChild(1) != null) {
res &= canEvalUsingPartitionMd(expr.getChild(1), analyzer);
}
return res;
} else if (expr instanceof IsNullPredicate) {
// Check for SlotRef IS [NOT] NULL case
IsNullPredicate nullPredicate = (IsNullPredicate)expr;
return nullPredicate.getBoundSlot() != null;
} else if (expr instanceof InPredicate) {
// Evaluate any constant expressions in the BE
try {
analyzer.getConstantFolder().rewrite(expr, analyzer);
} catch (AnalysisException e) {
LOG.error("Error evaluating constant expressions in the BE: " + e.getMessage());
return false;
}
// Check for SlotRef [NOT] IN (Literal, ... Literal) case
SlotRef slot = ((InPredicate)expr).getBoundSlot();
if (slot == null) return false;
for (int i = 1; i < expr.getChildren().size(); ++i) {
if (!Expr.IS_LITERAL.apply(expr.getChild(i))) return false;
}
return true;
}
return false;
}

/**
* Evaluate a BinaryPredicate filter on a partition column and return the
* ids of the matching partitions. An empty set is returned if there
* are no matching partitions.
*/
private Set<Long> evalBinaryPredicate(Expr expr) {
Preconditions.checkNotNull(expr);
Preconditions.checkState(expr instanceof BinaryPredicate);
// TODO: Note that rewrite rules should have ensured that the slot
// is on the left.
boolean isSlotOnLeft = true;
if (Expr.IS_LITERAL.apply(expr.getChild(0))) isSlotOnLeft = false;
// Get the operands
BinaryPredicate bp = (BinaryPredicate)expr;
SlotRef slot = bp.getBoundSlot();
Preconditions.checkNotNull(slot);
Expr bindingExpr = bp.getSlotBinding(slot.getSlotId());
Preconditions.checkNotNull(bindingExpr);
Preconditions.checkState(Expr.IS_LITERAL.apply(bindingExpr));
LiteralExpr literal = (LiteralExpr)bindingExpr;
Operator op = bp.getOp();
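// A comparison with a NULL literal can only match partitions via
// IS [NOT] DISTINCT FROM; any other operator never evaluates to true for NULL.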
if (Expr.IS_NULL_LITERAL.apply(literal) && (op != Operator.NOT_DISTINCT)
&& (op != Operator.DISTINCT_FROM)) {
return new HashSet<>();
}
// Get the partition column position and retrieve the associated partition
// value metadata.
int partitionPos = slot.getDesc().getColumn().getPosition();
TreeMap<LiteralExpr, Set<Long>> partitionValueMap =
tbl_.getPartitionValueMap(partitionPos);
if (partitionValueMap.isEmpty()) return new HashSet<>();
Set<Long> matchingIds = new HashSet<>();
// Compute the matching partition ids
if (op == Operator.NOT_DISTINCT) {
// Case: SlotRef <=> Literal
if (Expr.IS_NULL_LITERAL.apply(literal)) {
Set<Long> ids = tbl_.getNullPartitionIds(partitionPos);
if (ids != null) matchingIds.addAll(ids);
return matchingIds;
}
// Punt to equality case:
op = Operator.EQ;
}
if (op == Operator.EQ) {
// Case: SlotRef = Literal
Set<Long> ids = partitionValueMap.get(literal);
if (ids != null) matchingIds.addAll(ids);
return matchingIds;
}
if (op == Operator.DISTINCT_FROM) {
// Case: SlotRef IS DISTINCT FROM Literal
matchingIds.addAll(tbl_.getPartitionIds());
if (Expr.IS_NULL_LITERAL.apply(literal)) {
Set<Long> nullIds = tbl_.getNullPartitionIds(partitionPos);
matchingIds.removeAll(nullIds);
} else {
Set<Long> ids = partitionValueMap.get(literal);
if (ids != null) matchingIds.removeAll(ids);
}
return matchingIds;
}
if (op == Operator.NE) {
// Case: SlotRef != Literal
matchingIds.addAll(tbl_.getPartitionIds());
Set<Long> nullIds = tbl_.getNullPartitionIds(partitionPos);
matchingIds.removeAll(nullIds);
Set<Long> ids = partitionValueMap.get(literal);
if (ids != null) matchingIds.removeAll(ids);
return matchingIds;
}
// Determine the partition key value range of this predicate.
NavigableMap<LiteralExpr, Set<Long>> rangeValueMap = null;
LiteralExpr firstKey = partitionValueMap.firstKey();
LiteralExpr lastKey = partitionValueMap.lastKey();
boolean upperInclusive = false;
boolean lowerInclusive = false;
LiteralExpr upperBoundKey = null;
LiteralExpr lowerBoundKey = null;
if (((op == Operator.LE || op == Operator.LT) && isSlotOnLeft) ||
((op == Operator.GE || op == Operator.GT) && !isSlotOnLeft)) {
// Cases: SlotRef <[=] Literal, or Literal >[=] SlotRef
if (literal.compareTo(firstKey) < 0) return new HashSet<>();
if (op == Operator.LE || op == Operator.GE) upperInclusive = true;
if (literal.compareTo(lastKey) <= 0) {
upperBoundKey = literal;
} else {
upperBoundKey = lastKey;
upperInclusive = true;
}
lowerBoundKey = firstKey;
lowerInclusive = true;
} else {
// Cases: SlotRef >[=] Literal, or Literal <[=] SlotRef
if (literal.compareTo(lastKey) > 0) return new HashSet<>();
if (op == Operator.GE || op == Operator.LE) lowerInclusive = true;
if (literal.compareTo(firstKey) >= 0) {
lowerBoundKey = literal;
} else {
lowerBoundKey = firstKey;
lowerInclusive = true;
}
upperBoundKey = lastKey;
upperInclusive = true;
}
// Retrieve the submap that corresponds to the computed partition key
// value range.
rangeValueMap = partitionValueMap.subMap(lowerBoundKey, lowerInclusive,
upperBoundKey, upperInclusive);
// Compute the matching partition ids
for (Set<Long> idSet: rangeValueMap.values()) {
if (idSet != null) matchingIds.addAll(idSet);
}
return matchingIds;
}

/**
* Evaluate an InPredicate filter on a partition column and return the ids of
* the matching partitions.
*/
private Set<Long> evalInPredicate(Expr expr) {
Preconditions.checkNotNull(expr);
Preconditions.checkState(expr instanceof InPredicate);
InPredicate inPredicate = (InPredicate)expr;
Set<Long> matchingIds = new HashSet<>();
SlotRef slot = inPredicate.getBoundSlot();
Preconditions.checkNotNull(slot);
int partitionPos = slot.getDesc().getColumn().getPosition();
TreeMap<LiteralExpr, Set<Long>> partitionValueMap =
tbl_.getPartitionValueMap(partitionPos);
if (inPredicate.isNotIn()) {
// Case: SlotRef NOT IN (Literal, ..., Literal)
// If the value list contains a NullLiteral, return an empty set: NOT IN with
// a NULL operand never evaluates to true.
List<Expr> nullLiterals = new ArrayList<>();
inPredicate.collectAll(Predicates.instanceOf(NullLiteral.class), nullLiterals);
if (!nullLiterals.isEmpty()) return matchingIds;
matchingIds.addAll(tbl_.getPartitionIds());
// Exclude partitions with null partition column values
Set<Long> nullIds = tbl_.getNullPartitionIds(partitionPos);
matchingIds.removeAll(nullIds);
}
// Compute the matching partition ids
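// Child 0 of the InPredicate is the partition slot; children 1..n are the literals.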
for (int i = 1; i < inPredicate.getChildren().size(); ++i) {
LiteralExpr literal = (LiteralExpr)inPredicate.getChild(i);
Set<Long> idSet = partitionValueMap.get(literal);
if (idSet != null) {
if (inPredicate.isNotIn()) {
matchingIds.removeAll(idSet);
} else {
matchingIds.addAll(idSet);
}
}
}
return matchingIds;
}

/**
* Evaluate an IsNullPredicate on a partition column and return the ids of the
* matching partitions.
*/
private Set<Long> evalIsNullPredicate(Expr expr) {
Preconditions.checkNotNull(expr);
Preconditions.checkState(expr instanceof IsNullPredicate);
Set<Long> matchingIds = new HashSet<>();
IsNullPredicate nullPredicate = (IsNullPredicate)expr;
SlotRef slot = nullPredicate.getBoundSlot();
Preconditions.checkNotNull(slot);
int partitionPos = slot.getDesc().getColumn().getPosition();
Set<Long> nullPartitionIds = tbl_.getNullPartitionIds(partitionPos);
if (nullPredicate.isNotNull()) {
matchingIds.addAll(tbl_.getPartitionIds());
matchingIds.removeAll(nullPartitionIds);
} else {
matchingIds.addAll(nullPartitionIds);
}
return matchingIds;
}

/**
* Evaluate a slot binding predicate on a partition key using the partition
* key values; return the matching partition ids. An empty set is returned
* if there are no matching partitions. This function can evaluate the following
* types of predicates: BinaryPredicate, CompoundPredicate, IsNullPredicate,
* InPredicate.
*/
private Set<Long> evalSlotBindingFilter(Expr expr) {
Preconditions.checkNotNull(expr);
Preconditions.checkState(!(expr instanceof BetweenPredicate));
if (expr instanceof BinaryPredicate) {
return evalBinaryPredicate(expr);
} else if (expr instanceof CompoundPredicate) {
Set<Long> leftChildIds = evalSlotBindingFilter(expr.getChild(0));
CompoundPredicate cp = (CompoundPredicate)expr;
// NOT operators have been eliminated by Expr.pushNegationToOperands().
Preconditions.checkState(cp.getOp() != CompoundPredicate.Operator.NOT);
if (cp.getOp() == CompoundPredicate.Operator.AND) {
Set<Long> rightChildIds = evalSlotBindingFilter(expr.getChild(1));
leftChildIds.retainAll(rightChildIds);
} else if (cp.getOp() == CompoundPredicate.Operator.OR) {
Set<Long> rightChildIds = evalSlotBindingFilter(expr.getChild(1));
leftChildIds.addAll(rightChildIds);
}
return leftChildIds;
} else if (expr instanceof InPredicate) {
return evalInPredicate(expr);
} else if (expr instanceof IsNullPredicate) {
return evalIsNullPredicate(expr);
}
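// Not reachable for exprs accepted by canEvalUsingPartitionMd().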
return null;
}

/**
* Evaluate a list of HdfsPartitionFilters in the BE. These are 'complex'
* filters that could not be evaluated from the partition key values.
*/
private void evalPartitionFiltersInBe(List<HdfsPartitionFilter> filters,
Set<Long> matchingPartitionIds, Analyzer analyzer) throws ImpalaException {
Map<Long, ? extends PrunablePartition> partitionMap = tbl_.getPartitionMap();
// Set of partition ids that pass a filter
Set<Long> matchingIds = new HashSet<>();
// Batch of partitions
List<PrunablePartition> partitionBatch = new ArrayList<>();
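// Candidate partitions are evaluated against each filter in batches of
// PARTITION_PRUNING_BATCH_SIZE.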
// Identify the partitions that pass all filters.
for (HdfsPartitionFilter filter: filters) {
// Iterate through the currently valid partitions
for (Long id: matchingPartitionIds) {
PrunablePartition p = partitionMap.get(id);
Preconditions.checkState(
p.getPartitionValues().size() == tbl_.getNumClusteringCols());
// Add the partition to the current batch
partitionBatch.add(p);
if (partitionBatch.size() == PARTITION_PRUNING_BATCH_SIZE) {
// Batch is full. Evaluate the predicates of this batch in the BE.
matchingIds.addAll(filter.getMatchingPartitionIds(partitionBatch, analyzer));
partitionBatch.clear();
}
}
// Check if there are any unprocessed partitions.
if (!partitionBatch.isEmpty()) {
matchingIds.addAll(filter.getMatchingPartitionIds(partitionBatch, analyzer));
partitionBatch.clear();
}
// Prune the partition ids that didn't pass the filter.
matchingPartitionIds.retainAll(matchingIds);
matchingIds.clear();
}
}
}