// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.planner;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.impala.analysis.AggregateInfo;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.CaseExpr;
import org.apache.impala.analysis.CaseWhenClause;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.FunctionCallExpr;
import org.apache.impala.analysis.MultiAggregateInfo;
import org.apache.impala.analysis.MultiAggregateInfo.AggPhase;
import org.apache.impala.analysis.NumericLiteral;
import org.apache.impala.analysis.TupleId;
import org.apache.impala.analysis.ValidTupleIdExpr;
import org.apache.impala.common.InternalException;
import org.apache.impala.thrift.TAggregationNode;
import org.apache.impala.thrift.TAggregator;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TExpr;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.util.BitUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

/**
 * Aggregation computation.
 */
public class AggregationNode extends PlanNode {
private static final Logger LOG = LoggerFactory.getLogger(AggregationNode.class);
// Default per-instance memory requirement used if no valid stats are available.
// TODO: Come up with a more useful heuristic.
private static final long DEFAULT_PER_INSTANCE_MEM = 128L * 1024L * 1024L;
// Conservative minimum size of hash table for low-cardinality aggregations.
private static final long MIN_HASH_TBL_MEM = 10L * 1024L * 1024L;
// Default skew factor to account for data skew among fragment instances.
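// Illustrative arithmetic: 1,000,000 input rows over 4 instances gives a per-instance
// budget of ceil(250,000 * 1.5) = 375,000 rows.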
private static final double DEFAULT_SKEW_FACTOR = 1.5;
private final MultiAggregateInfo multiAggInfo_;
private final AggPhase aggPhase_;
// Aggregation-class infos derived from 'multiAggInfo_' and 'aggPhase_' in c'tor.
private final List<AggregateInfo> aggInfos_;
// If true, this node produces intermediate aggregation tuples.
private boolean useIntermediateTuple_ = false;
// If true, this node performs the finalize step.
private boolean needsFinalize_ = false;
// If true, this node uses streaming preaggregation. Invalid if this is a merge agg.
private boolean useStreamingPreagg_ = false;
// Resource profiles for each aggregation class.
private List<ResourceProfile> resourceProfiles_;
public AggregationNode(
PlanNodeId id, PlanNode input, MultiAggregateInfo multiAggInfo, AggPhase aggPhase) {
super(id, "AGGREGATE");
children_.add(input);
multiAggInfo_ = multiAggInfo;
aggInfos_ = multiAggInfo_.getMaterializedAggInfos(aggPhase);
aggPhase_ = aggPhase;
needsFinalize_ = true;
computeTupleIds();
}
/**
* Copy c'tor used in clone().
*/
private AggregationNode(PlanNodeId id, AggregationNode src) {
super(id, src, "AGGREGATE");
multiAggInfo_ = src.multiAggInfo_;
aggPhase_ = src.aggPhase_;
aggInfos_ = src.aggInfos_;
needsFinalize_ = src.needsFinalize_;
useIntermediateTuple_ = src.useIntermediateTuple_;
}
@Override
public void computeTupleIds() {
clearTupleIds();
for (AggregateInfo aggInfo : aggInfos_) {
TupleId aggClassTupleId = null;
if (useIntermediateTuple_) {
aggClassTupleId = aggInfo.getIntermediateTupleId();
} else {
aggClassTupleId = aggInfo.getOutputTupleId();
}
tupleIds_.add(aggClassTupleId);
tblRefIds_.add(aggClassTupleId);
// Nullable tuples are only required to distinguish between multiple
// aggregation classes.
if (aggInfos_.size() > 1) {
nullableTupleIds_.add(aggClassTupleId);
}
}
}
/**
 * Marks this node as a preaggregation. Enables streaming preaggregation unless the
 * query options disable it or no aggregation class has grouping exprs.
 */
public void setIsPreagg(PlannerContext ctx) {
if (ctx.getQueryOptions().disable_streaming_preaggregations) {
useStreamingPreagg_ = false;
return;
}
for (AggregateInfo aggInfo : aggInfos_) {
if (aggInfo.getGroupingExprs().size() > 0) {
useStreamingPreagg_ = true;
return;
}
}
}
/**
* Unsets this node as requiring finalize. Only valid to call this if it is
* currently marked as needing finalize.
*/
public void unsetNeedsFinalize() {
Preconditions.checkState(needsFinalize_);
needsFinalize_ = false;
}
public void setIntermediateTuple() {
useIntermediateTuple_ = true;
computeTupleIds();
}
public MultiAggregateInfo getMultiAggInfo() { return multiAggInfo_; }
public AggPhase getAggPhase() { return aggPhase_; }
public boolean hasGrouping() { return multiAggInfo_.hasGrouping(); }
public boolean isSingleClassAgg() { return aggInfos_.size() == 1; }
public boolean isDistinctAgg() {
for (AggregateInfo aggInfo : aggInfos_) {
if (aggInfo.isDistinctAgg()) return true;
}
return false;
}
@Override
public boolean isBlockingNode() { return !useStreamingPreagg_; }
@Override
public void init(Analyzer analyzer) throws InternalException {
Preconditions.checkState(tupleIds_.size() == aggInfos_.size());
// Assign conjuncts to the top-most agg in the single-node plan. They are transferred
// to the proper place in the distributed plan via transferConjuncts().
if (aggPhase_ == multiAggInfo_.getConjunctAssignmentPhase()) {
conjuncts_.clear();
// TODO: If this is the transposition phase, then we can push conjuncts that
// reference a single aggregation class down into the aggregators of the
// previous phase.
conjuncts_.addAll(multiAggInfo_.collectConjuncts(analyzer, true));
conjuncts_ = orderConjunctsByCost(conjuncts_);
}
// Compute the mem layout for both tuples here for simplicity.
for (AggregateInfo aggInfo : aggInfos_) {
aggInfo.getOutputTupleDesc().computeMemLayout();
aggInfo.getIntermediateTupleDesc().computeMemLayout();
}
// Do at the end so it can take all conjuncts into account
computeStats(analyzer);
// Don't call createDefaultSMap(): it would point our conjuncts (= the HAVING clause)
// at our input. Our conjuncts don't get substituted because they already
// refer to our output.
outputSmap_ = getCombinedChildSmap();
// Substitute exprs and check consistency.
// All of the AggregationNodes corresponding to a MultiAggregateInfo will have the
// same outputSmap_, so just substitute it once.
if (aggPhase_ == AggPhase.FIRST) multiAggInfo_.substitute(outputSmap_, analyzer);
for (AggregateInfo aggInfo : aggInfos_) {
aggInfo.substitute(outputSmap_, analyzer);
aggInfo.checkConsistency();
}
}
@Override
public void computeStats(Analyzer analyzer) {
super.computeStats(analyzer);
// TODO: IMPALA-2945: this doesn't correctly take into account duplicate keys on
// multiple nodes in a pre-aggregation.
cardinality_ = 0;
for (AggregateInfo aggInfo : aggInfos_) {
// Compute the cardinality for this set of grouping exprs.
long numGroups = estimateNumGroups(aggInfo);
Preconditions.checkState(numGroups >= -1, numGroups);
if (numGroups == -1) {
// No estimate of the number of groups is possible, can't even make a
// conservative estimate.
cardinality_ = -1;
break;
}
cardinality_ = checkedAdd(cardinality_, numGroups);
}
// Take conjuncts into account.
if (cardinality_ > 0) {
cardinality_ = applyConjunctsSelectivity(cardinality_);
}
cardinality_ = capCardinalityAtLimit(cardinality_);
}
/**
* Estimate the number of groups that will be present for the aggregation class
* described by 'aggInfo'.
* Returns -1 if a reasonable cardinality estimate cannot be produced.
*/
private long estimateNumGroups(AggregateInfo aggInfo) {
// This is prone to overestimation, because we keep multiplying cardinalities,
// even if the grouping exprs are functionally dependent (example:
// group by the primary key of a table plus a number of other columns from that
// same table). We limit the estimate to the estimated number of input rows to
// limit the potential overestimation. We could, in the future, improve this further
// by recognizing functional dependencies.
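// For example (hypothetical columns), GROUP BY pk, other_col yields
// NDV(pk) * NDV(other_col), which can far exceed the table's row count; the cap
// against the input cardinality below bounds that overestimation.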
List<Expr> groupingExprs = aggInfo.getGroupingExprs();
if (groupingExprs.isEmpty()) {
// Non-grouping aggregation class - always results in one group even if there are
// zero input rows.
return 1;
}
long numGroups = Expr.getNumDistinctValues(groupingExprs);
// Sanity-check numGroups against the input cardinality.
long aggInputCardinality = getAggInputCardinality();
if (LOG.isTraceEnabled()) {
LOG.trace("Node " + id_ + " numGroups= " + numGroups + " aggInputCardinality=" +
aggInputCardinality + " for agg class " + aggInfo.debugString());
}
if (numGroups == -1) {
// A worst-case estimate (the input cardinality) is better than an unknown
// cardinality. Note that this will still be -1 if the child's cardinality is unknown.
return aggInputCardinality;
}
// We have a valid estimate of the number of groups. Cap it at the number of input
// rows because an aggregation cannot increase the cardinality.
if (aggInputCardinality >= 0) {
numGroups = Math.min(aggInputCardinality, numGroups);
}
return numGroups;
}
/**
* Compute the input cardinality to the distributed aggregation. If this is a
* merge aggregation, we need to find the cardinality of the input to the
* preaggregation.
* Return -1 if unknown.
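* For example, in a distributed plan shaped "merge agg <- exchange <- preagg <- scan",
* we return the cardinality of the preaggregation's input, i.e. the scan.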
*/
private long getAggInputCardinality() {
PlanNode child = getChild(0);
if (!aggPhase_.isMerge()) return child.getCardinality();
PlanNode preAgg;
if (child instanceof ExchangeNode) {
preAgg = child.getChild(0);
} else {
preAgg = child;
}
Preconditions.checkState(preAgg instanceof AggregationNode);
return preAgg.getChild(0).getCardinality();
}
/**
* Returns a list of exprs suitable for hash partitioning the output of this node
* before the merge aggregation step. Only valid to call if this node is not a merge
* or transposing aggregation. The returned exprs are bound by the intermediate tuples.
* Takes the SHUFFLE_DISTINCT_EXPRS query option into account.
*
* For single-class aggregations the returned exprs are typically the grouping exprs.
* With SHUFFLE_DISTINCT_EXPRS=true the distinct exprs are also included if this is the
* non-merge first-phase aggregation of a distinct aggregation.
*
* For multi-class aggregations the returned exprs are a list of CASE exprs which you
* can think of as a "union" of the merge partition exprs of each class. Each CASE
* switches on the valid tuple id of an input row to determine the aggregation class,
* and selects the corresponding partition expr.
* The main challenges with crafting these exprs are:
* 1. Different aggregation classes can have a different number of distinct exprs.
* Solution: The returned list is maximally wide to accommodate the widest
* aggregation class. For classes that have fewer than the max distinct exprs we add
* constant dummy exprs in the corresponding branch of the CASE.
* 2. A CASE expr must return a single output type, but different aggregation classes
* may have incompatible distinct exprs, so selecting the distinct exprs directly
* in the CASE branches would not always work (unless we cast everything to STRING,
* which we try to avoid). Instead, we call MURMUR_HASH() on the exprs to produce
* a hash value. That way, all branches of a CASE return the same type.
* Considering that the backend will do another round of hashing, there's an
* unnecessary double hashing here that we deemed acceptable for now and has
* potential for cleanup (maybe the FE should always add a MURMUR_HASH()).
* The handling of SHUFFLE_DISTINCT_EXPRS is analogous to the single-class case.
*
* Example:
* SELECT COUNT(DISTINCT a,b), COUNT(DISTINCT c) FROM t GROUP BY d
* Suppose the two aggregation classes have intermediate tuple ids 0 and 1.
*
* Challenges explained on this example:
* 1. First class has distinct exprs a,b and second class has c. We need to accommodate
* the widest class (a,b) and also hash on the grouping expr (d), so there will be
* three cases.
* 2. The types of a and c might be incompatible.
*
* The first-phase partition exprs are a list of the following 3 exprs:
* CASE valid_tid()
* WHEN 0 THEN murmur_hash(d) <-- d SlotRef into tuple 0
* WHEN 1 THEN murmur_hash(d) <-- d SlotRef into tuple 1
* END,
* CASE valid_tid()
* WHEN 0 THEN murmur_hash(a)
* WHEN 1 THEN murmur_hash(c)
* END,
* CASE valid_tid()
* WHEN 0 THEN murmur_hash(b)
* WHEN 1 THEN 0 <-- dummy constant integer
* END
*/
public List<Expr> getMergePartitionExprs(Analyzer analyzer) {
Preconditions.checkState(!tupleIds_.isEmpty());
Preconditions.checkState(!aggPhase_.isMerge() && !aggPhase_.isTranspose());
boolean shuffleDistinctExprs = analyzer.getQueryOptions().shuffle_distinct_exprs;
if (aggInfos_.size() == 1) {
AggregateInfo aggInfo = aggInfos_.get(0);
List<Expr> groupingExprs = null;
if (aggPhase_.isFirstPhase() && hasGrouping() && !shuffleDistinctExprs) {
groupingExprs = multiAggInfo_.getSubstGroupingExprs();
} else {
groupingExprs = aggInfo.getPartitionExprs();
if (groupingExprs == null) groupingExprs = aggInfo.getGroupingExprs();
}
return Expr.substituteList(
groupingExprs, aggInfo.getIntermediateSmap(), analyzer, false);
}
int maxNumExprs = 0;
for (AggregateInfo aggInfo : aggInfos_) {
if (aggInfo.getGroupingExprs() == null) continue;
maxNumExprs = Math.max(maxNumExprs, aggInfo.getGroupingExprs().size());
}
if (maxNumExprs == 0) return Collections.emptyList();
List<Expr> result = new ArrayList<>();
for (int i = 0; i < maxNumExprs; ++i) {
List<CaseWhenClause> caseWhenClauses = new ArrayList<>();
for (AggregateInfo aggInfo : aggInfos_) {
TupleId tid;
if (aggInfo.isDistinctAgg()) {
tid = aggInfo.getOutputTupleId();
} else {
tid = aggInfo.getIntermediateTupleId();
}
List<Expr> groupingExprs = aggInfo.getGroupingExprs();
if (aggPhase_.isFirstPhase() && hasGrouping() && !shuffleDistinctExprs) {
groupingExprs = multiAggInfo_.getSubstGroupingExprs();
}
Expr whenExpr = NumericLiteral.create(tid.asInt());
Expr thenExpr;
if (groupingExprs == null || i >= groupingExprs.size()) {
thenExpr = NumericLiteral.create(0);
} else {
thenExpr = new FunctionCallExpr(
"murmur_hash", Lists.newArrayList(groupingExprs.get(i).clone()));
thenExpr.analyzeNoThrow(analyzer);
thenExpr = thenExpr.substitute(aggInfo.getIntermediateSmap(), analyzer, true);
}
caseWhenClauses.add(new CaseWhenClause(whenExpr, thenExpr));
}
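// Assemble CASE valid_tid() WHEN <tid> THEN murmur_hash(<expr>) END for position i,
// as described in the method comment.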
CaseExpr caseExpr =
new CaseExpr(new ValidTupleIdExpr(tupleIds_), caseWhenClauses, null);
caseExpr.analyzeNoThrow(analyzer);
result.add(caseExpr);
}
return result;
}
@Override
protected void toThrift(TPlanNode msg) {
msg.agg_node = new TAggregationNode();
msg.node_type = TPlanNodeType.AGGREGATION_NODE;
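// replicate_input tells the backend to feed every input row to each aggregator,
// which is required when multiple first-phase aggregation classes share the input.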
boolean replicateInput = aggPhase_ == AggPhase.FIRST && aggInfos_.size() > 1;
msg.agg_node.setReplicate_input(replicateInput);
msg.agg_node.setEstimated_input_cardinality(getChild(0).getCardinality());
for (int i = 0; i < aggInfos_.size(); ++i) {
AggregateInfo aggInfo = aggInfos_.get(i);
List<TExpr> aggregateFunctions = new ArrayList<>();
for (FunctionCallExpr e : aggInfo.getMaterializedAggregateExprs()) {
aggregateFunctions.add(e.treeToThrift());
}
TAggregator taggregator = new TAggregator(aggregateFunctions,
aggInfo.getIntermediateTupleId().asInt(), aggInfo.getOutputTupleId().asInt(),
needsFinalize_, useStreamingPreagg_, resourceProfiles_.get(i).toThrift());
List<Expr> groupingExprs = aggInfo.getGroupingExprs();
if (!groupingExprs.isEmpty()) {
taggregator.setGrouping_exprs(Expr.treesToThrift(groupingExprs));
}
msg.agg_node.addToAggregators(taggregator);
}
}
@Override
protected String getDisplayLabelDetail() {
if (useStreamingPreagg_) return "STREAMING";
if (needsFinalize_) return "FINALIZE";
return null;
}
@Override
protected String getNodeExplainString(String prefix, String detailPrefix,
TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
String nameDetail = getDisplayLabelDetail();
output.append(String.format("%s%s", prefix, getDisplayLabel()));
if (nameDetail != null) output.append(" [" + nameDetail + "]");
output.append("\n");
if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
if (aggInfos_.size() == 1) {
output.append(
getAggInfoExplainString(detailPrefix, aggInfos_.get(0), detailLevel));
} else {
for (int i = 0; i < aggInfos_.size(); ++i) {
AggregateInfo aggInfo = aggInfos_.get(i);
output.append(String.format("%sClass %d\n", detailPrefix, i));
output.append(
getAggInfoExplainString(detailPrefix + " ", aggInfo, detailLevel));
}
}
if (!conjuncts_.isEmpty()) {
output.append(detailPrefix)
.append("having: ")
.append(Expr.getExplainString(conjuncts_, detailLevel))
.append("\n");
}
}
return output.toString();
}
private StringBuilder getAggInfoExplainString(
String prefix, AggregateInfo aggInfo, TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
List<FunctionCallExpr> aggExprs = aggInfo.getMaterializedAggregateExprs();
List<Expr> groupingExprs = aggInfo.getGroupingExprs();
if (!aggExprs.isEmpty()) {
output.append(prefix)
.append("output: ")
.append(Expr.getExplainString(aggExprs, detailLevel))
.append("\n");
}
if (!groupingExprs.isEmpty()) {
output.append(prefix)
.append("group by: ")
.append(Expr.getExplainString(groupingExprs, detailLevel))
.append("\n");
}
return output;
}
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
resourceProfiles_ = Lists.newArrayListWithCapacity(aggInfos_.size());
for (AggregateInfo aggInfo : aggInfos_) {
resourceProfiles_.add(computeAggClassResourceProfile(queryOptions, aggInfo));
}
if (aggInfos_.size() == 1) {
nodeResourceProfile_ = resourceProfiles_.get(0);
} else {
nodeResourceProfile_ = ResourceProfile.noReservation(0);
for (ResourceProfile aggProfile : resourceProfiles_) {
nodeResourceProfile_ = nodeResourceProfile_.sum(aggProfile);
}
}
}
private ResourceProfile computeAggClassResourceProfile(
TQueryOptions queryOptions, AggregateInfo aggInfo) {
Preconditions.checkNotNull(
fragment_, "PlanNode must be placed into a fragment before calling this method.");
long perInstanceCardinality =
fragment_.getPerInstanceNdv(queryOptions.getMt_dop(), aggInfo.getGroupingExprs());
long perInstanceMemEstimate;
long perInstanceDataBytes = -1;
if (perInstanceCardinality == -1) {
perInstanceMemEstimate = DEFAULT_PER_INSTANCE_MEM;
} else {
// Per-instance cardinality cannot be greater than the total input cardinality.
long inputCardinality = getChild(0).getCardinality();
if (inputCardinality != -1) {
// Calculate the input cardinality distributed across fragment instances.
long numInstances = fragment_.getNumInstances(queryOptions.getMt_dop());
long perInstanceInputCardinality;
if (numInstances > 1) {
perInstanceInputCardinality =
(long) Math.ceil(((double) inputCardinality / numInstances) * DEFAULT_SKEW_FACTOR);
} else {
// When numInstances is 1 or unknown (-1), perInstanceInputCardinality is the
// same as inputCardinality.
perInstanceInputCardinality = inputCardinality;
}
perInstanceCardinality =
Math.min(perInstanceCardinality, perInstanceInputCardinality);
}
perInstanceDataBytes = (long)Math.ceil(perInstanceCardinality * avgRowSize_);
perInstanceMemEstimate = (long)Math.max(perInstanceDataBytes *
PlannerContext.HASH_TBL_SPACE_OVERHEAD, MIN_HASH_TBL_MEM);
}
// Must be kept in sync with GroupingAggregator::MinReservation() in the backend.
long perInstanceMinMemReservation;
long bufferSize = queryOptions.getDefault_spillable_buffer_size();
long maxRowBufferSize =
computeMaxSpillableBufferSize(bufferSize, queryOptions.getMax_row_size());
if (aggInfo.getGroupingExprs().isEmpty()) {
perInstanceMinMemReservation = 0;
} else {
// This is a grouping pre-aggregation or merge aggregation.
final int PARTITION_FANOUT = 16;
if (perInstanceDataBytes != -1) {
long bytesPerPartition = perInstanceDataBytes / PARTITION_FANOUT;
// Scale down the buffer size if we think there will be excess free space with the
// default buffer size, e.g. with small dimension tables.
bufferSize = Math.min(bufferSize, Math.max(
queryOptions.getMin_spillable_buffer_size(),
BitUtil.roundUpToPowerOf2(bytesPerPartition)));
// Recompute the max row buffer size with the smaller buffer.
maxRowBufferSize =
computeMaxSpillableBufferSize(bufferSize, queryOptions.getMax_row_size());
}
if (useStreamingPreagg_) {
// We can execute a streaming preagg without any buffers by passing through rows,
// but that is a very low performance mode of execution if the aggregation reduces
// its input significantly. Instead reserve memory for one buffer per partition
// and at least 64kb for hash tables per partition. We must reserve at least one
// full buffer for hash tables for the suballocator to subdivide. We don't need to
// reserve memory for large rows since they can be passed through if needed.
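// Illustrative arithmetic, assuming the default 2 MB spillable buffer size:
// 16 * 2 MB + max(16 * 64 KB, 2 MB) = 32 MB + 2 MB = 34 MB per instance.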
perInstanceMinMemReservation = bufferSize * PARTITION_FANOUT +
Math.max(64 * 1024 * PARTITION_FANOUT, bufferSize);
} else {
long minBuffers = PARTITION_FANOUT + 1 + (aggInfo.needsSerialize() ? 1 : 0);
// Two of the buffers need to be buffers large enough to hold the maximum-sized
// row to serve as input and output buffers while repartitioning.
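// Illustrative arithmetic, assuming 2 MB buffers, a 2 MB max-row buffer and no
// serialize step (minBuffers = 17): 15 * 2 MB + 2 * 2 MB = 34 MB per instance.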
perInstanceMinMemReservation = bufferSize * (minBuffers - 2) + maxRowBufferSize * 2;
}
}
return new ResourceProfileBuilder()
.setMemEstimateBytes(perInstanceMemEstimate)
.setMinMemReservationBytes(perInstanceMinMemReservation)
.setSpillableBufferBytes(bufferSize)
.setMaxRowBufferBytes(maxRowBufferSize)
.build();
}
}