// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.planner;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.BinaryPredicate;
import org.apache.impala.analysis.BinaryPredicate.Operator;
import org.apache.impala.analysis.CastExpr;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.ExprSubstitutionMap;
import org.apache.impala.analysis.FunctionCallExpr;
import org.apache.impala.analysis.IsNullPredicate;
import org.apache.impala.analysis.Predicate;
import org.apache.impala.analysis.SlotDescriptor;
import org.apache.impala.analysis.SlotId;
import org.apache.impala.analysis.SlotRef;
import org.apache.impala.analysis.TupleDescriptor;
import org.apache.impala.analysis.TupleId;
import org.apache.impala.analysis.TupleIsNullPredicate;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.KuduColumn;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.IdGenerator;
import org.apache.impala.common.InternalException;
import org.apache.impala.planner.JoinNode.DistributionMode;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.FeSupport;
import org.apache.impala.thrift.TEnabledRuntimeFilterTypes;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TRuntimeFilterDesc;
import org.apache.impala.thrift.TRuntimeFilterMode;
import org.apache.impala.thrift.TRuntimeFilterTargetDesc;
import org.apache.impala.thrift.TRuntimeFilterType;
import org.apache.impala.util.BitUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
* Class used for generating and assigning runtime filters to a query plan using
* runtime filter propagation. Runtime filter propagation is an optimization technique
* used to filter scanned tuples or scan ranges based on information collected at
* runtime. A runtime filter is constructed during the build phase of a join node, and is
* applied at, potentially, multiple scan nodes on the probe side of that join node.
* Runtime filters are generated from equi-join predicates but they do not replace the
* original predicates.
*
* MinMax filters are of a fixed size (except for those used for string type) and
* therefore only sizes for bloom filters need to be calculated. These calculations are
* based on the NDV estimates of the associated table columns, the min buffer size that
* can be allocated by the buffer pool, and the query options. Moreover, the computed
* size is also bounded by the MIN/MAX_BLOOM_FILTER_SIZE limits, which are enforced on
* the query options before this phase of planning.
*
* Example: select * from T1, T2 where T1.a = T2.b and T2.c = '1';
* Assuming that T1 is a fact table and T2 is a significantly smaller dimension table, a
* runtime filter is constructed at the join node between tables T1 and T2 while building
* the hash table on the values of T2.b (rhs of the join condition) from the tuples of T2
* that satisfy predicate T2.c = '1'. The runtime filter is subsequently sent to the
* scan node of table T1 and is applied on the values of T1.a (lhs of the join condition)
* to prune tuples of T1 that cannot be part of the join result.
*
* TODO: Consider combining multiple filters, that are applied to the same scan node,
* into a single filter.
*/
public final class RuntimeFilterGenerator {
private final static Logger LOG =
LoggerFactory.getLogger(RuntimeFilterGenerator.class);
// Should be in sync with corresponding values in runtime-filter-bank.cc.
private static final long MIN_BLOOM_FILTER_SIZE = 4 * 1024;
private static final long MAX_BLOOM_FILTER_SIZE = 512 * 1024 * 1024;
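// I.e., 4 KB and 512 MB; the runtime filter size query options are validated
// against these limits before this phase of planning.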
// Map of base table tuple ids to a list of runtime filters that
// can be applied at the corresponding scan nodes.
private final Map<TupleId, List<RuntimeFilter>> runtimeFiltersByTid_ =
new HashMap<>();
// Generator for filter ids
private final IdGenerator<RuntimeFilterId> filterIdGenerator =
RuntimeFilterId.createGenerator();
/**
* Internal class that encapsulates the max, min and default sizes used for creating
* bloom filter objects.
*/
private class FilterSizeLimits {
// Maximum filter size, in bytes, rounded up to a power of two.
public final long maxVal;
// Minimum filter size, in bytes, rounded up to a power of two.
public final long minVal;
// Pre-computed default filter size, in bytes, rounded up to a power of two.
public final long defaultVal;
// Target false positive probability, between 0 and 1 exclusive.
public final double targetFpp;
public FilterSizeLimits(TQueryOptions tQueryOptions) {
// Round up all limits to a power of two and make sure filter size is more
// than the min buffer size that can be allocated by the buffer pool.
long maxLimit = tQueryOptions.getRuntime_filter_max_size();
long minBufferSize = BackendConfig.INSTANCE.getMinBufferSize();
maxVal = BitUtil.roundUpToPowerOf2(Math.max(maxLimit, minBufferSize));
long minLimit = tQueryOptions.getRuntime_filter_min_size();
minLimit = Math.max(minLimit, minBufferSize);
// Make sure minVal <= defaultVal <= maxVal
minVal = BitUtil.roundUpToPowerOf2(Math.min(minLimit, maxVal));
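// Illustrative example (assuming an 8 KB min buffer size): a
// runtime_filter_min_size of 5000 bytes is first raised to the 8192-byte buffer
// minimum and, already being a power of two, yields minVal = 8 KB.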
long defaultValue = tQueryOptions.getRuntime_bloom_filter_size();
defaultValue = Math.max(defaultValue, minVal);
defaultVal = BitUtil.roundUpToPowerOf2(Math.min(defaultValue, maxVal));
// Target FPP is determined by the runtime_filter_error_rate query option or, if
// that is not set, by --max_filter_error_rate, the legacy option that previously
// controlled this and also had other effects.
targetFpp = tQueryOptions.isSetRuntime_filter_error_rate() ?
tQueryOptions.getRuntime_filter_error_rate() :
BackendConfig.INSTANCE.getMaxFilterErrorRate();
}
}
// Contains size limits for bloom filters.
private FilterSizeLimits bloomFilterSizeLimits_;
private RuntimeFilterGenerator(TQueryOptions tQueryOptions) {
bloomFilterSizeLimits_ = new FilterSizeLimits(tQueryOptions);
}
/**
* Internal representation of a runtime filter. A runtime filter is generated from
* an equi-join predicate of the form <lhs_expr> = <rhs_expr>, where lhs_expr is the
* expr on which the filter is applied and must be bound by a single tuple id from
* the left plan subtree of the associated join node, while rhs_expr is the expr on
* which the filter is built and can be bound by any number of tuple ids from the
* right plan subtree. Every runtime filter must record the join node that constructs
* the filter and the scan nodes that apply the filter (destination nodes).
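* For example, given the predicate T1.a = T2.b at a join with T1 in the left
* subtree, the filter is built on T2.b (srcExpr_) and applied to T1.a
* (origTargetExpr_).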
*/
public static class RuntimeFilter {
// Identifier of the filter (unique within a query)
private final RuntimeFilterId id_;
// Join node that builds the filter
private final JoinNode src_;
// Expr (rhs of join predicate) on which the filter is built
private final Expr srcExpr_;
// Expr (lhs of join predicate) from which the targetExprs_ are generated.
private final Expr origTargetExpr_;
// The operator comparing 'srcExpr_' and 'origTargetExpr_'.
private final Operator exprCmpOp_;
// Runtime filter targets
private final List<RuntimeFilterTarget> targets_ = new ArrayList<>();
// Slots from base table tuples that have value transfer from the slots
// of 'origTargetExpr_'. The slots are grouped by tuple id.
private final Map<TupleId, List<SlotId>> targetSlotsByTid_;
// If true, the join node building this filter is executed using a broadcast join;
// set in the DistributedPlanner.createHashJoinFragment()
private boolean isBroadcastJoin_;
// Estimate of the number of distinct values that will be inserted into this filter,
// globally across all instances of the source node. Used to compute an optimal size
// for the filter. A value of -1 means no estimate is available, and default filter
// parameters should be used.
private long ndvEstimate_ = -1;
// Size of the filter (in bytes). Should be greater than zero for bloom filters.
private long filterSizeBytes_ = 0;
// If true, the filter is produced by a broadcast join and there is at least one
// destination scan node which is in the same fragment as the join; set in
// DistributedPlanner.createHashJoinFragment().
private boolean hasLocalTargets_ = false;
// If true, there is at least one destination scan node which is not in the same
// fragment as the join that produced the filter; set in
// DistributedPlanner.createHashJoinFragment().
private boolean hasRemoteTargets_ = false;
// If set, indicates that the filter can't be assigned to another scan node.
// Once set, it can't be unset.
private boolean finalized_ = false;
// The type of filter to build.
private final TRuntimeFilterType type_;
// If true, the filter targets a Kudu scan node and is built on the source
// timestamp truncated to unix micros (see the utc_to_unix_micros conversion below).
private boolean isTimestampTruncation_ = false;
/**
* Internal representation of a runtime filter target.
*/
private static class RuntimeFilterTarget {
// Scan node that applies the filter
public ScanNode node;
// Expr on which the filter is applied
public Expr expr;
// Indicates if 'expr' is bound only by partition columns
public final boolean isBoundByPartitionColumns;
// Indicates if 'node' is in the same fragment as the join that produces the filter
public final boolean isLocalTarget;
public RuntimeFilterTarget(ScanNode targetNode, Expr targetExpr,
boolean isBoundByPartitionColumns, boolean isLocalTarget) {
Preconditions.checkState(targetExpr.isBoundByTupleIds(targetNode.getTupleIds()));
node = targetNode;
expr = targetExpr;
this.isBoundByPartitionColumns = isBoundByPartitionColumns;
this.isLocalTarget = isLocalTarget;
}
public TRuntimeFilterTargetDesc toThrift() {
TRuntimeFilterTargetDesc tFilterTarget = new TRuntimeFilterTargetDesc();
tFilterTarget.setNode_id(node.getId().asInt());
tFilterTarget.setTarget_expr(expr.treeToThrift());
List<SlotId> sids = new ArrayList<>();
expr.getIds(null, sids);
List<Integer> tSlotIds = Lists.newArrayListWithCapacity(sids.size());
for (SlotId sid: sids) tSlotIds.add(sid.asInt());
tFilterTarget.setTarget_expr_slotids(tSlotIds);
tFilterTarget.setIs_bound_by_partition_columns(isBoundByPartitionColumns);
tFilterTarget.setIs_local_target(isLocalTarget);
if (node instanceof KuduScanNode) {
// assignRuntimeFilters() only assigns KuduScanNode targets if the target expr
// is a slot ref, possibly with an implicit cast, pointing to a column.
SlotRef slotRef = expr.unwrapSlotRef(true);
KuduColumn col = (KuduColumn) slotRef.getDesc().getColumn();
tFilterTarget.setKudu_col_name(col.getKuduName());
tFilterTarget.setKudu_col_type(col.getType().toThrift());
}
return tFilterTarget;
}
@Override
public String toString() {
StringBuilder output = new StringBuilder();
return output.append("Target Id: " + node.getId() + " ")
.append("Target expr: " + expr.debugString() + " ")
.append("Partition columns: " + isBoundByPartitionColumns)
.append("Is local: " + isLocalTarget)
.toString();
}
}
private RuntimeFilter(RuntimeFilterId filterId, JoinNode filterSrcNode, Expr srcExpr,
Expr origTargetExpr, Operator exprCmpOp, Map<TupleId, List<SlotId>> targetSlots,
TRuntimeFilterType type, FilterSizeLimits filterSizeLimits,
boolean isTimestampTruncation) {
id_ = filterId;
src_ = filterSrcNode;
srcExpr_ = srcExpr;
origTargetExpr_ = origTargetExpr;
exprCmpOp_ = exprCmpOp;
targetSlotsByTid_ = targetSlots;
type_ = type;
isTimestampTruncation_ = isTimestampTruncation;
computeNdvEstimate();
calculateFilterSize(filterSizeLimits);
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof RuntimeFilter)) return false;
return ((RuntimeFilter) obj).id_.equals(id_);
}
@Override
public int hashCode() { return id_.hashCode(); }
public void markFinalized() { finalized_ = true; }
public boolean isFinalized() { return finalized_; }
/**
* Serializes a runtime filter to Thrift.
*/
public TRuntimeFilterDesc toThrift() {
TRuntimeFilterDesc tFilter = new TRuntimeFilterDesc();
tFilter.setFilter_id(id_.asInt());
tFilter.setSrc_expr(srcExpr_.treeToThrift());
tFilter.setSrc_node_id(src_.getId().asInt());
tFilter.setIs_broadcast_join(isBroadcastJoin_);
tFilter.setNdv_estimate(ndvEstimate_);
tFilter.setHas_local_targets(hasLocalTargets_);
tFilter.setHas_remote_targets(hasRemoteTargets_);
boolean appliedOnPartitionColumns = true;
for (int i = 0; i < targets_.size(); ++i) {
RuntimeFilterTarget target = targets_.get(i);
tFilter.addToTargets(target.toThrift());
tFilter.putToPlanid_to_target_ndx(target.node.getId().asInt(), i);
appliedOnPartitionColumns =
appliedOnPartitionColumns && target.isBoundByPartitionColumns;
}
tFilter.setApplied_on_partition_columns(appliedOnPartitionColumns);
tFilter.setType(type_);
tFilter.setFilter_size_bytes(filterSizeBytes_);
return tFilter;
}
/**
* Static function to create a RuntimeFilter from 'joinPredicate' that is assigned
* to the join node 'filterSrcNode'. Returns an instance of RuntimeFilter
* or null if a runtime filter cannot be generated from the specified predicate.
*/
public static RuntimeFilter create(IdGenerator<RuntimeFilterId> idGen,
Analyzer analyzer, Expr joinPredicate, JoinNode filterSrcNode,
TRuntimeFilterType type, FilterSizeLimits filterSizeLimits,
boolean isTimestampTruncation) {
Preconditions.checkNotNull(idGen);
Preconditions.checkNotNull(joinPredicate);
Preconditions.checkNotNull(filterSrcNode);
// Only consider binary equality predicates
if (!Predicate.isEquivalencePredicate(joinPredicate)) return null;
BinaryPredicate normalizedJoinConjunct =
SingleNodePlanner.getNormalizedEqPred(joinPredicate,
filterSrcNode.getChild(0).getTupleIds(),
filterSrcNode.getChild(1).getTupleIds(), analyzer);
if (normalizedJoinConjunct == null) return null;
// Ensure that the target expr does not contain TupleIsNull predicates as these
// can't be evaluated at a scan node.
Expr targetExpr =
TupleIsNullPredicate.unwrapExpr(normalizedJoinConjunct.getChild(0).clone());
Expr srcExpr = normalizedJoinConjunct.getChild(1);
if (isTimestampTruncation) {
Preconditions.checkArgument(srcExpr.isAnalyzed());
Preconditions.checkArgument(srcExpr.getType() == Type.TIMESTAMP);
Expr toUnixTimeExpr =
new FunctionCallExpr("utc_to_unix_micros", Lists.newArrayList(srcExpr));
try {
toUnixTimeExpr.analyze(analyzer);
} catch (AnalysisException e) {
// Expr analysis failed. Skip this runtime filter since we cannot serialize
// it to thrift.
LOG.warn("Skipping runtime filter because analysis failed: "
+ toUnixTimeExpr.toSql(),
e);
return null;
}
srcExpr = toUnixTimeExpr;
}
Map<TupleId, List<SlotId>> targetSlots = getTargetSlots(analyzer, targetExpr);
Preconditions.checkNotNull(targetSlots);
if (targetSlots.isEmpty()) return null;
if (LOG.isTraceEnabled()) {
LOG.trace("Generating runtime filter from predicate " + joinPredicate);
}
return new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, targetExpr,
normalizedJoinConjunct.getOp(), targetSlots, type, filterSizeLimits,
isTimestampTruncation);
}
/**
* Returns the ids of base table tuple slots on which a runtime filter expr can be
* applied. Due to the existence of equivalence classes, a filter expr may be
* applicable at multiple scan nodes. The returned slot ids are grouped by tuple id.
* Returns an empty collection if the filter expr cannot be applied at a base table
* or if applying the filter might lead to incorrect results.
*/
private static Map<TupleId, List<SlotId>> getTargetSlots(Analyzer analyzer,
Expr expr) {
// 'expr' is not a SlotRef and may contain multiple SlotRefs
List<TupleId> tids = new ArrayList<>();
List<SlotId> sids = new ArrayList<>();
expr.getIds(tids, sids);
// IMPALA-6286: If the target expression evaluates to a non-NULL value for
// outer-join non-matches, then assigning the filter below the nullable side of
// an outer join may produce incorrect query results.
// This check is conservative but correct to keep the code simple. In particular,
// it would otherwise be difficult to identify incorrect runtime filter assignments
// through outer-joined inline views because the 'expr' has already been fully
// resolved. We rely on the value-transfer graph to check whether 'expr' could
// potentially be assigned below an outer-joined inline view.
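// For example, an expr like coalesce(col, 0) evaluates to a non-NULL value for
// outer-join non-matches, so a filter on it cannot safely be assigned below the
// nullable side of the join.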
if (analyzer.hasOuterJoinedValueTransferTarget(sids)) {
Expr isNotNullPred = new IsNullPredicate(expr, true);
isNotNullPred.analyzeNoThrow(analyzer);
try {
if (analyzer.isTrueWithNullSlots(isNotNullPred)) return Collections.emptyMap();
} catch (InternalException e) {
// Expr evaluation failed in the backend. Skip this runtime filter since we
// cannot determine whether it is safe to assign it.
LOG.warn("Skipping runtime filter because backend evaluation failed: "
+ isNotNullPred.toSql(), e);
return Collections.emptyMap();
}
}
Map<TupleId, List<SlotId>> slotsByTid = new HashMap<>();
// We need to iterate over all the slots of 'expr' and check if they have
// equivalent slots that are bound by the same base table tuple(s).
for (SlotId slotId: sids) {
Map<TupleId, List<SlotId>> currSlotsByTid =
getBaseTblEquivSlots(analyzer, slotId);
if (currSlotsByTid.isEmpty()) return Collections.emptyMap();
if (slotsByTid.isEmpty()) {
slotsByTid.putAll(currSlotsByTid);
continue;
}
// Compute the intersection between tuple ids from 'slotsByTid' and
// 'currSlotsByTid'. If the intersection is empty, an empty collection
// is returned.
Iterator<Map.Entry<TupleId, List<SlotId>>> iter =
slotsByTid.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<TupleId, List<SlotId>> entry = iter.next();
List<SlotId> slotIds = currSlotsByTid.get(entry.getKey());
if (slotIds == null) {
iter.remove();
} else {
entry.getValue().addAll(slotIds);
}
}
if (slotsByTid.isEmpty()) return Collections.emptyMap();
}
return slotsByTid;
}
/**
* Static function that returns the ids of slots bound by base table tuples for which
* there is a value transfer from 'srcSid'. The slots are grouped by tuple id.
*/
private static Map<TupleId, List<SlotId>> getBaseTblEquivSlots(Analyzer analyzer,
SlotId srcSid) {
Map<TupleId, List<SlotId>> slotsByTid = new HashMap<>();
for (SlotId targetSid: analyzer.getValueTransferTargets(srcSid)) {
TupleDescriptor tupleDesc = analyzer.getSlotDesc(targetSid).getParent();
if (tupleDesc.getTable() == null) continue;
List<SlotId> sids = slotsByTid.get(tupleDesc.getId());
if (sids == null) {
sids = new ArrayList<>();
slotsByTid.put(tupleDesc.getId(), sids);
}
sids.add(targetSid);
}
return slotsByTid;
}
public Expr getTargetExpr(PlanNodeId targetPlanNodeId) {
for (RuntimeFilterTarget target: targets_) {
if (target.node.getId() != targetPlanNodeId) continue;
return target.expr;
}
return null;
}
public List<RuntimeFilterTarget> getTargets() { return targets_; }
public boolean hasTargets() { return !targets_.isEmpty(); }
public Expr getSrcExpr() { return srcExpr_; }
public Expr getOrigTargetExpr() { return origTargetExpr_; }
public Map<TupleId, List<SlotId>> getTargetSlots() { return targetSlotsByTid_; }
public RuntimeFilterId getFilterId() { return id_; }
public TRuntimeFilterType getType() { return type_; }
public Operator getExprCompOp() { return exprCmpOp_; }
public long getFilterSize() { return filterSizeBytes_; }
public boolean isTimestampTruncation() { return isTimestampTruncation_; }
/**
* Returns TIMESTAMP if isTimestampTruncation_ is true so that the source expr type
* matches the target expr type when assigning a bloom filter to a target scan node.
*/
public Type getSrcExprType() {
if (!isTimestampTruncation_) {
return srcExpr_.getType();
} else {
return Type.TIMESTAMP;
}
}
/**
* Estimates the selectivity of a runtime filter as the cardinality of the
* associated source join node over the cardinality of that join node's left
* child.
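* E.g., a join cardinality of 1K over a probe-side cardinality of 1M yields a
* selectivity of 0.001; lower values indicate more selective filters, and -1
* means the selectivity is unknown.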
*/
public double getSelectivity() {
if (src_.getCardinality() == -1
|| src_.getChild(0).getCardinality() == -1
|| src_.getChild(0).getCardinality() == 0) {
return -1;
}
return src_.getCardinality() / (double) src_.getChild(0).getCardinality();
}
public void addTarget(RuntimeFilterTarget target) { targets_.add(target); }
public void setIsBroadcast(boolean isBroadcast) { isBroadcastJoin_ = isBroadcast; }
public void computeNdvEstimate() { ndvEstimate_ = src_.getChild(1).getCardinality(); }
public void computeHasLocalTargets() {
Preconditions.checkNotNull(src_.getFragment());
Preconditions.checkState(hasTargets());
for (RuntimeFilterTarget target: targets_) {
Preconditions.checkNotNull(target.node.getFragment());
hasLocalTargets_ = hasLocalTargets_ || target.isLocalTarget;
hasRemoteTargets_ = hasRemoteTargets_ || !target.isLocalTarget;
}
}
/**
* Sets the filter size (in bytes) required for a bloom filter to achieve the
* configured maximum false-positive rate based on the expected NDV. Also bounds the
* filter size between the max and minimum filter sizes supplied to it by
* 'filterSizeLimits'.
*/
private void calculateFilterSize(FilterSizeLimits filterSizeLimits) {
if (type_ == TRuntimeFilterType.MIN_MAX) return;
if (ndvEstimate_ == -1) {
filterSizeBytes_ = filterSizeLimits.defaultVal;
return;
}
double targetFpp = filterSizeLimits.targetFpp;
int logFilterSize = FeSupport.GetMinLogSpaceForBloomFilter(ndvEstimate_, targetFpp);
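// The filter occupies 2^logFilterSize bytes; e.g., logFilterSize = 20 gives a
// 1 MB filter, before clamping to [minVal, maxVal] below.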
filterSizeBytes_ = 1L << logFilterSize;
filterSizeBytes_ = Math.max(filterSizeBytes_, filterSizeLimits.minVal);
filterSizeBytes_ = Math.min(filterSizeBytes_, filterSizeLimits.maxVal);
}
/**
* Assigns this runtime filter to the corresponding plan nodes.
*/
public void assignToPlanNodes() {
Preconditions.checkState(hasTargets());
src_.addRuntimeFilter(this);
for (RuntimeFilterTarget target: targets_) target.node.addRuntimeFilter(this);
}
public String debugString() {
StringBuilder output = new StringBuilder();
return output.append("FilterID: " + id_ + " ")
.append("Source: " + src_.getId() + " ")
.append("SrcExpr: " + getSrcExpr().debugString() + " ")
.append("Target(s): ")
.append(Joiner.on(", ").join(targets_) + " ")
.append("Selectivity: " + getSelectivity()).toString();
}
}
/**
* Generates and assigns runtime filters to a query plan tree.
*/
public static void generateRuntimeFilters(PlannerContext ctx, PlanNode plan) {
Preconditions.checkNotNull(ctx);
Preconditions.checkNotNull(ctx.getQueryOptions());
int maxNumBloomFilters = ctx.getQueryOptions().getMax_num_runtime_filters();
Preconditions.checkState(maxNumBloomFilters >= 0);
RuntimeFilterGenerator filterGenerator = new RuntimeFilterGenerator(
ctx.getQueryOptions());
filterGenerator.generateFilters(ctx, plan);
List<RuntimeFilter> filters = Lists.newArrayList(filterGenerator.getRuntimeFilters());
if (filters.size() > maxNumBloomFilters) {
// If more than 'maxNumBloomFilters' were generated, sort them by increasing
// selectivity and keep the 'maxNumBloomFilters' most selective bloom filters.
Collections.sort(filters, new Comparator<RuntimeFilter>() {
@Override
public int compare(RuntimeFilter a, RuntimeFilter b) {
double aSelectivity =
a.getSelectivity() == -1 ? Double.MAX_VALUE : a.getSelectivity();
double bSelectivity =
b.getSelectivity() == -1 ? Double.MAX_VALUE : b.getSelectivity();
return Double.compare(aSelectivity, bSelectivity);
}
}
);
}
// We only enforce a limit on the number of bloom filters as they are much more
// heavy-weight than the other filter types.
int numBloomFilters = 0;
for (RuntimeFilter filter : filters) {
if (filter.getType() == TRuntimeFilterType.BLOOM) {
if (numBloomFilters >= maxNumBloomFilters) continue;
++numBloomFilters;
}
filter.setIsBroadcast(
filter.src_.getDistributionMode() == DistributionMode.BROADCAST);
filter.computeHasLocalTargets();
if (LOG.isTraceEnabled()) LOG.trace("Runtime filter: " + filter.debugString());
filter.assignToPlanNodes();
}
}
/**
* Returns a list of all the registered runtime filters, ordered by filter ID.
*/
public List<RuntimeFilter> getRuntimeFilters() {
Set<RuntimeFilter> resultSet = new HashSet<>();
for (List<RuntimeFilter> filters: runtimeFiltersByTid_.values()) {
resultSet.addAll(filters);
}
List<RuntimeFilter> resultList = Lists.newArrayList(resultSet);
Collections.sort(resultList, new Comparator<RuntimeFilter>() {
@Override
public int compare(RuntimeFilter a, RuntimeFilter b) {
return a.getFilterId().compareTo(b.getFilterId());
}
}
);
return resultList;
}
/**
* Generates the runtime filters for a query by recursively traversing the distributed
* plan tree rooted at 'root'. In the top-down traversal of the plan tree, candidate
* runtime filters are generated from equi-join predicates assigned to hash-join nodes.
* In the bottom-up traversal of the plan tree, the filters are assigned to destination
* (scan) nodes. Filters that cannot be assigned to a scan node are discarded.
*/
private void generateFilters(PlannerContext ctx, PlanNode root) {
if (root instanceof HashJoinNode) {
HashJoinNode joinNode = (HashJoinNode) root;
List<Expr> joinConjuncts = new ArrayList<>();
if (!joinNode.getJoinOp().isLeftOuterJoin()
&& !joinNode.getJoinOp().isFullOuterJoin()
&& !joinNode.getJoinOp().isAntiJoin()) {
// It's not correct to push runtime filters to the left side of a left outer,
// full outer or anti join if the filter corresponds to an equi-join predicate
// from the ON clause.
joinConjuncts.addAll(joinNode.getEqJoinConjuncts());
}
joinConjuncts.addAll(joinNode.getConjuncts());
List<RuntimeFilter> filters = new ArrayList<>();
for (TRuntimeFilterType filterType : TRuntimeFilterType.values()) {
for (Expr conjunct : joinConjuncts) {
RuntimeFilter filter =
RuntimeFilter.create(filterIdGenerator, ctx.getRootAnalyzer(), conjunct,
joinNode, filterType, bloomFilterSizeLimits_,
/* isTimestampTruncation */ false);
if (filter != null) {
registerRuntimeFilter(filter);
filters.add(filter);
}
// For timestamp bloom filters, we also generate a RuntimeFilter with the
// src timestamp truncated for Kudu scan node targets.
if (filterType == TRuntimeFilterType.BLOOM
&& Predicate.isEquivalencePredicate(conjunct)
&& conjunct.getChild(0).getType().isTimestamp()
&& conjunct.getChild(1).getType().isTimestamp()) {
RuntimeFilter filter2 =
RuntimeFilter.create(filterIdGenerator, ctx.getRootAnalyzer(), conjunct,
joinNode, filterType, bloomFilterSizeLimits_,
/* isTimestampTruncation */ true);
if (filter2 == null) continue;
registerRuntimeFilter(filter2);
filters.add(filter2);
}
}
}
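// Traverse the probe (left) subtree first so that the filters generated at this
// join can be assigned to scan nodes below it.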
generateFilters(ctx, root.getChild(0));
// Finalize every runtime filter of that join. This is to ensure that we don't
// assign a filter to a scan node from the right subtree of joinNode or ancestor
// join nodes in case we don't find a destination node in the left subtree.
for (RuntimeFilter runtimeFilter: filters) finalizeRuntimeFilter(runtimeFilter);
generateFilters(ctx, root.getChild(1));
} else if (root instanceof ScanNode) {
assignRuntimeFilters(ctx, (ScanNode) root);
} else {
for (PlanNode childNode: root.getChildren()) {
generateFilters(ctx, childNode);
}
}
}
/**
* Registers a runtime filter with the tuple id of every scan node that is a candidate
* destination node for that filter.
*/
private void registerRuntimeFilter(RuntimeFilter filter) {
Map<TupleId, List<SlotId>> targetSlotsByTid = filter.getTargetSlots();
Preconditions.checkState(targetSlotsByTid != null && !targetSlotsByTid.isEmpty());
for (TupleId tupleId: targetSlotsByTid.keySet()) {
registerRuntimeFilter(filter, tupleId);
}
}
/**
* Registers a runtime filter with a specific target tuple id.
*/
private void registerRuntimeFilter(RuntimeFilter filter, TupleId targetTid) {
Preconditions.checkState(filter.getTargetSlots().containsKey(targetTid));
List<RuntimeFilter> filters = runtimeFiltersByTid_.get(targetTid);
if (filters == null) {
filters = new ArrayList<>();
runtimeFiltersByTid_.put(targetTid, filters);
}
Preconditions.checkState(!filter.isFinalized());
filters.add(filter);
}
/**
* Finalizes a runtime filter by disassociating it from all the candidate target scan
* nodes that haven't been used as destinations for that filter. Also sets the
* finalized_ flag of that filter so that it can't be assigned to any other scan nodes.
*/
private void finalizeRuntimeFilter(RuntimeFilter runtimeFilter) {
Set<TupleId> targetTupleIds = new HashSet<>();
for (RuntimeFilter.RuntimeFilterTarget target: runtimeFilter.getTargets()) {
targetTupleIds.addAll(target.node.getTupleIds());
}
for (TupleId tupleId: runtimeFilter.getTargetSlots().keySet()) {
if (!targetTupleIds.contains(tupleId)) {
runtimeFiltersByTid_.get(tupleId).remove(runtimeFilter);
}
}
runtimeFilter.markFinalized();
}
/**
* Assigns runtime filters to a specific scan node 'scanNode'.
* The assigned filters are the ones for which 'scanNode' can be used as a destination
* node. The following constraints are enforced when assigning filters to 'scanNode':
* 1. If the DISABLE_ROW_RUNTIME_FILTERING query option is set, a filter is only
* assigned to 'scanNode' if the filter target expression is bound by partition
* columns.
* 2. If the RUNTIME_FILTER_MODE query option is set to LOCAL, a filter is only assigned
* to 'scanNode' if the filter is produced within the same fragment that contains the
* scan node.
* 3. Only Hdfs and Kudu scan nodes are supported:
* a. If the target is an HdfsScanNode, the filter must be type BLOOM.
* b. If the target is a KuduScanNode, the filter may be of type MIN_MAX or BLOOM;
* the target must be a slot ref on a column, and the comparison op cannot
* be 'not distinct'.
* A scan node may be used as a destination node for multiple runtime filters.
*/
private void assignRuntimeFilters(PlannerContext ctx, ScanNode scanNode) {
if (!(scanNode instanceof HdfsScanNode || scanNode instanceof KuduScanNode)) return;
TupleId tid = scanNode.getTupleIds().get(0);
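// A scan node materializes a single tuple; its id keys the candidate filters.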
if (!runtimeFiltersByTid_.containsKey(tid)) return;
Analyzer analyzer = ctx.getRootAnalyzer();
boolean disableRowRuntimeFiltering =
ctx.getQueryOptions().isDisable_row_runtime_filtering();
TRuntimeFilterMode runtimeFilterMode = ctx.getQueryOptions().getRuntime_filter_mode();
TEnabledRuntimeFilterTypes enabledRuntimeFilterTypes =
ctx.getQueryOptions().getEnabled_runtime_filter_types();
for (RuntimeFilter filter: runtimeFiltersByTid_.get(tid)) {
if (filter.isFinalized()) continue;
Expr targetExpr = computeTargetExpr(filter, tid, analyzer);
if (targetExpr == null) continue;
boolean isBoundByPartitionColumns = isBoundByPartitionColumns(analyzer, targetExpr,
scanNode);
if (disableRowRuntimeFiltering && !isBoundByPartitionColumns) continue;
boolean isLocalTarget = isLocalTarget(filter, scanNode);
if (runtimeFilterMode == TRuntimeFilterMode.LOCAL && !isLocalTarget) continue;
// Check that the scan node supports applying filters of this type and targetExpr.
if (scanNode instanceof HdfsScanNode) {
if (filter.getType() != TRuntimeFilterType.BLOOM
|| filter.isTimestampTruncation()) {
continue;
}
} else {
Preconditions.checkState(scanNode instanceof KuduScanNode);
if (filter.getType() == TRuntimeFilterType.BLOOM) {
if (enabledRuntimeFilterTypes != TEnabledRuntimeFilterTypes.BLOOM
&& enabledRuntimeFilterTypes != TEnabledRuntimeFilterTypes.ALL) {
continue;
}
// TODO: Support Kudu VARCHAR Bloom Filter
if (targetExpr.getType().isVarchar()) continue;
// Kudu only supports targeting a single column, not general exprs, so the
// target must be a SlotRef pointing to a column without casting.
// For timestamp bloom filter, assign it to Kudu if it has src timestamp
// truncation.
if (!(targetExpr instanceof SlotRef)
|| filter.getExprCompOp() == Operator.NOT_DISTINCT
|| (targetExpr.getType().isTimestamp()
&& !filter.isTimestampTruncation())) {
continue;
}
SlotRef slotRef = (SlotRef) targetExpr;
if (slotRef.getDesc().getColumn() == null) continue;
} else {
Preconditions.checkState(filter.getType() == TRuntimeFilterType.MIN_MAX);
if (enabledRuntimeFilterTypes != TEnabledRuntimeFilterTypes.MIN_MAX
&& enabledRuntimeFilterTypes != TEnabledRuntimeFilterTypes.ALL) {
continue;
}
// TODO: IMPALA-9580: Support Kudu VARCHAR Min/Max Filters
if (targetExpr.getType().isVarchar()) continue;
SlotRef slotRef = targetExpr.unwrapSlotRef(true);
// Kudu only supports targeting a single column, not general exprs, so the
// target must be a SlotRef pointing to a column. We can allow implicit
// integer casts by casting the min/max values before sending them to Kudu.
// Kudu also cannot currently return nulls if a filter is applied, so it
// does not work with "is not distinct".
if (slotRef == null || slotRef.getDesc().getColumn() == null
|| (targetExpr instanceof CastExpr && !targetExpr.getType().isIntegerType())
|| filter.getExprCompOp() == Operator.NOT_DISTINCT) {
continue;
}
}
}
RuntimeFilter.RuntimeFilterTarget target = new RuntimeFilter.RuntimeFilterTarget(
scanNode, targetExpr, isBoundByPartitionColumns, isLocalTarget);
filter.addTarget(target);
}
}
/**
* Check if 'targetNode' is local to the source node of 'filter'.
*/
static private boolean isLocalTarget(RuntimeFilter filter, ScanNode targetNode) {
return targetNode.getFragment().getId().equals(filter.src_.getFragment().getId());
}
/**
* Check if all the slots of 'targetExpr' are bound by partition columns.
*/
static private boolean isBoundByPartitionColumns(Analyzer analyzer, Expr targetExpr,
ScanNode targetNode) {
Preconditions.checkState(targetExpr.isBoundByTupleIds(targetNode.getTupleIds()));
TupleDescriptor baseTblDesc = targetNode.getTupleDesc();
FeTable tbl = baseTblDesc.getTable();
if (tbl.getNumClusteringCols() == 0) return false;
List<SlotId> sids = new ArrayList<>();
targetExpr.getIds(null, sids);
for (SlotId sid : sids) {
SlotDescriptor slotDesc = analyzer.getSlotDesc(sid);
if (slotDesc.getColumn() == null
|| slotDesc.getColumn().getPosition() >= tbl.getNumClusteringCols()) {
return false;
}
}
return true;
}
/**
* Computes the target expr for a specified runtime filter 'filter' to be applied at
* the scan node with target tuple id 'targetTid'.
*/
private Expr computeTargetExpr(RuntimeFilter filter, TupleId targetTid,
Analyzer analyzer) {
Expr targetExpr = filter.getOrigTargetExpr();
if (!targetExpr.isBound(targetTid)) {
Preconditions.checkState(filter.getTargetSlots().containsKey(targetTid));
// Modify the filter target expr using the equivalent slots from the scan node
// on which the filter will be applied.
ExprSubstitutionMap smap = new ExprSubstitutionMap();
List<SlotRef> exprSlots = new ArrayList<>();
targetExpr.collect(SlotRef.class, exprSlots);
List<SlotId> sids = filter.getTargetSlots().get(targetTid);
for (SlotRef slotRef: exprSlots) {
for (SlotId sid: sids) {
if (analyzer.hasValueTransfer(slotRef.getSlotId(), sid)) {
SlotRef newSlotRef = new SlotRef(analyzer.getSlotDesc(sid));
newSlotRef.analyzeNoThrow(analyzer);
smap.put(slotRef, newSlotRef);
break;
}
}
}
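// Every slot of the original target expr must have been mapped to an equivalent
// slot; otherwise the substitution below would produce an incomplete expr.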
Preconditions.checkState(exprSlots.size() == smap.size());
try {
targetExpr = targetExpr.substitute(smap, analyzer, false);
} catch (Exception e) {
return null;
}
}
Type srcType = filter.getSrcExprType();
// Types of targetExpr and srcExpr must be exactly the same since runtime filters are
// based on hashing.
if (!targetExpr.getType().equals(srcType)) {
try {
targetExpr = targetExpr.castTo(srcType);
} catch (Exception e) {
return null;
}
}
return targetExpr;
}
}