// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.planner;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.impala.analysis.AggregateInfo;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.FunctionCallExpr;
import org.apache.impala.analysis.SlotId;
import org.apache.impala.common.InternalException;
import org.apache.impala.thrift.TAggregationNode;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TExpr;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.util.BitUtil;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**
 * Aggregation computation. Computes the aggregate functions in aggInfo_ over the
 * rows produced by its child, optionally grouped by the grouping exprs. Also used
 * for the pre-aggregation and merge phases of distributed aggregations.
 */
public class AggregationNode extends PlanNode {
  private static final Logger LOG = LoggerFactory.getLogger(AggregationNode.class);

  // Default per-instance memory requirement used if no valid stats are available.
  // TODO: Come up with a more useful heuristic.
  private static final long DEFAULT_PER_INSTANCE_MEM = 128L * 1024L * 1024L;

  // Conservative minimum size of hash table for low-cardinality aggregations.
  private static final long MIN_HASH_TBL_MEM = 10L * 1024L * 1024L;
private final AggregateInfo aggInfo_;
  // Set to true if this aggregation node needs to run the Finalize step, i.e., this
  // node is the root of a distributed aggregation.
private boolean needsFinalize_;
// If true, use streaming preaggregation algorithm. Not valid if this is a merge agg.
private boolean useStreamingPreagg_;
/**
* Create an agg node from aggInfo.
*/
public AggregationNode(PlanNodeId id, PlanNode input, AggregateInfo aggInfo) {
super(id, aggInfo.getOutputTupleId().asList(), "AGGREGATE");
aggInfo_ = aggInfo;
children_.add(input);
needsFinalize_ = true;
}
/**
* Copy c'tor used in clone().
*/
private AggregationNode(PlanNodeId id, AggregationNode src) {
super(id, src, "AGGREGATE");
aggInfo_ = src.aggInfo_;
needsFinalize_ = src.needsFinalize_;
}
public AggregateInfo getAggInfo() { return aggInfo_; }
/**
* Unsets this node as requiring finalize. Only valid to call this if it is
* currently marked as needing finalize.
*/
public void unsetNeedsFinalize() {
Preconditions.checkState(needsFinalize_);
needsFinalize_ = false;
}
  /**
   * Sets this node as a preaggregation. Only valid to call this if it is not already
   * marked as a preaggregation. Enables the streaming preaggregation algorithm unless
   * it is disabled via the DISABLE_STREAMING_PREAGGREGATIONS query option or this
   * aggregation has no grouping exprs.
   */
  public void setIsPreagg(PlannerContext ctx) {
    TQueryOptions queryOptions = ctx.getQueryOptions();
    useStreamingPreagg_ = !queryOptions.disable_streaming_preaggregations
        && !aggInfo_.getGroupingExprs().isEmpty();
  }
/**
* Have this node materialize the aggregation's intermediate tuple instead of
* the output tuple.
*/
public void setIntermediateTuple() {
Preconditions.checkState(!tupleIds_.isEmpty());
Preconditions.checkState(tupleIds_.get(0).equals(aggInfo_.getOutputTupleId()));
tupleIds_.clear();
tupleIds_.add(aggInfo_.getIntermediateTupleId());
}
@Override
public boolean isBlockingNode() { return !useStreamingPreagg_; }
@Override
public void init(Analyzer analyzer) throws InternalException {
// Assign predicates to the top-most agg in the single-node plan that can evaluate
// them, as follows: For non-distinct aggs place them in the 1st phase agg node. For
// distinct aggs place them in the 2nd phase agg node. The conjuncts are
// transferred to the proper place in the multi-node plan via transferConjuncts().
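    // Illustrative example (query is hypothetical): for
    // SELECT c, COUNT(DISTINCT d) FROM t GROUP BY c HAVING COUNT(DISTINCT d) > 1,
    // the aggregation is planned in two phases, and the HAVING conjunct can only be
    // evaluated by the 2nd-phase agg, which produces the final result tuple.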
if (tupleIds_.get(0).equals(aggInfo_.getResultTupleId()) && !aggInfo_.isMerge()) {
// Ignore predicates bound by a grouping slot produced by a SlotRef grouping expr.
// Those predicates are already evaluated below this agg node (e.g., in a scan),
// because the grouping slot must be in the same equivalence class as another slot
// below this agg node. We must not ignore other grouping slots in order to retain
// conjuncts bound by those grouping slots in createEquivConjuncts() (IMPALA-2089).
// Those conjuncts cannot be redundant because our equivalence classes do not
// capture dependencies with non-SlotRef exprs.
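      // Illustrative example (hypothetical query): with "WHERE c = 1 GROUP BY c", the
      // grouping slot for c is equivalent to the scan's slot for c, so "c = 1" was
      // already applied below this node. With "GROUP BY c + 1", no such equivalence
      // exists, so a conjunct bound by that grouping slot must be retained here.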
Set<SlotId> groupBySlots = Sets.newHashSet();
for (int i = 0; i < aggInfo_.getGroupingExprs().size(); ++i) {
if (aggInfo_.getGroupingExprs().get(i).unwrapSlotRef(true) == null) continue;
groupBySlots.add(aggInfo_.getOutputTupleDesc().getSlots().get(i).getId());
}
ArrayList<Expr> bindingPredicates =
analyzer.getBoundPredicates(tupleIds_.get(0), groupBySlots, true);
conjuncts_.addAll(bindingPredicates);
      // Also assign any remaining unassigned conjuncts.
assignConjuncts(analyzer);
analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_, groupBySlots);
}
conjuncts_ = orderConjunctsByCost(conjuncts_);
// Compute the mem layout for both tuples here for simplicity.
aggInfo_.getOutputTupleDesc().computeMemLayout();
aggInfo_.getIntermediateTupleDesc().computeMemLayout();
// do this at the end so it can take all conjuncts into account
computeStats(analyzer);
    // Don't call createDefaultSMap(): it would point our conjuncts (the HAVING
    // clause) at our input. Our conjuncts don't get substituted because they
    // already refer to our output.
outputSmap_ = getCombinedChildSmap();
aggInfo_.substitute(outputSmap_, analyzer);
// assert consistent aggregate expr and slot materialization
aggInfo_.checkConsistency();
}
@Override
public void computeStats(Analyzer analyzer) {
super.computeStats(analyzer);
// This is prone to overflow, because we keep multiplying cardinalities,
// even if the grouping exprs are functionally dependent (example:
// group by the primary key of a table plus a number of other columns from that
// same table)
// TODO: try to recognize functional dependencies
// TODO: as a shortcut, instead of recognizing functional dependencies,
// limit the contribution of a single table to the number of rows
// of that table (so that when we're grouping by the primary key col plus
// some others, the estimate doesn't overshoot dramatically)
// cardinality: product of # of distinct values produced by grouping exprs
// Any non-grouping aggregation has at least one distinct value
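    // Illustrative example of the overflow risk: grouping by a primary key with
    // NDV 10^6 plus two other columns with NDV 10^3 each multiplies out to 10^12,
    // even though the true output cardinality is bounded by the table's row count.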
cardinality_ = aggInfo_.getGroupingExprs().isEmpty() ? 1 :
Expr.getNumDistinctValues(aggInfo_.getGroupingExprs());
    if (LOG.isTraceEnabled()) {
      LOG.trace("Agg: cardinality=" + Long.toString(cardinality_));
    }
    // Take the HAVING predicate into account.
    if (cardinality_ > 0) {
      double selectivity = computeSelectivity();
      cardinality_ = Math.round((double) cardinality_ * selectivity);
      if (LOG.isTraceEnabled()) {
        LOG.trace("sel=" + Double.toString(selectivity));
      }
    }
// if we ended up with an overflow, the estimate is certain to be wrong
if (cardinality_ < 0) cardinality_ = -1;
// Sanity check the cardinality_ based on the input cardinality_.
if (getChild(0).getCardinality() != -1) {
if (cardinality_ == -1) {
// A worst-case cardinality_ is better than an unknown cardinality_.
cardinality_ = getChild(0).getCardinality();
} else {
// An AggregationNode cannot increase the cardinality_.
cardinality_ = Math.min(getChild(0).getCardinality(), cardinality_);
}
}
cardinality_ = capAtLimit(cardinality_);
if (LOG.isTraceEnabled()) {
LOG.trace("stats Agg: cardinality=" + Long.toString(cardinality_));
}
}
@Override
protected String debugString() {
return Objects.toStringHelper(this)
.add("aggInfo", aggInfo_.debugString())
.addValue(super.debugString())
.toString();
}
@Override
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.AGGREGATION_NODE;
List<TExpr> aggregateFunctions = Lists.newArrayList();
// only serialize agg exprs that are being materialized
for (FunctionCallExpr e: aggInfo_.getMaterializedAggregateExprs()) {
aggregateFunctions.add(e.treeToThrift());
}
aggInfo_.checkConsistency();
msg.agg_node = new TAggregationNode(
aggregateFunctions,
aggInfo_.getIntermediateTupleId().asInt(),
aggInfo_.getOutputTupleId().asInt(), needsFinalize_,
useStreamingPreagg_,
getChild(0).getCardinality());
List<Expr> groupingExprs = aggInfo_.getGroupingExprs();
if (groupingExprs != null) {
msg.agg_node.setGrouping_exprs(Expr.treesToThrift(groupingExprs));
}
}
@Override
protected String getDisplayLabelDetail() {
if (useStreamingPreagg_) return "STREAMING";
if (needsFinalize_) return "FINALIZE";
return null;
}
@Override
protected String getNodeExplainString(String prefix, String detailPrefix,
TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
String nameDetail = getDisplayLabelDetail();
output.append(String.format("%s%s", prefix, getDisplayLabel()));
if (nameDetail != null) output.append(" [" + nameDetail + "]");
output.append("\n");
if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
ArrayList<FunctionCallExpr> aggExprs = aggInfo_.getMaterializedAggregateExprs();
if (!aggExprs.isEmpty()) {
output.append(detailPrefix + "output: ")
.append(getExplainString(aggExprs) + "\n");
}
      // TODO: Is this the best way to display this? It currently shows
      // DISTINCT_PC(DISTINCT_PC(col)) for the merge phase, which is not very obvious
      // unless you already know what it means.
      // TODO: The group by list can be very long. Break it into multiple lines.
if (!aggInfo_.getGroupingExprs().isEmpty()) {
output.append(detailPrefix + "group by: ")
.append(getExplainString(aggInfo_.getGroupingExprs()) + "\n");
}
if (!conjuncts_.isEmpty()) {
output.append(detailPrefix + "having: ")
.append(getExplainString(conjuncts_) + "\n");
}
}
return output.toString();
}
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
Preconditions.checkNotNull(
fragment_, "PlanNode must be placed into a fragment before calling this method.");
long perInstanceCardinality = fragment_.getPerInstanceNdv(
queryOptions.getMt_dop(), aggInfo_.getGroupingExprs());
long perInstanceMemEstimate;
long perInstanceDataBytes = -1;
if (perInstanceCardinality == -1) {
perInstanceMemEstimate = DEFAULT_PER_INSTANCE_MEM;
} else {
// Per-instance cardinality cannot be greater than the total output cardinality.
if (cardinality_ != -1) {
perInstanceCardinality = Math.min(perInstanceCardinality, cardinality_);
}
perInstanceDataBytes = (long)Math.ceil(perInstanceCardinality * avgRowSize_);
perInstanceMemEstimate = (long)Math.max(perInstanceDataBytes *
PlannerContext.HASH_TBL_SPACE_OVERHEAD, MIN_HASH_TBL_MEM);
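      // Illustrative arithmetic (hypothetical numbers): 1M distinct groups at an avg
      // row size of 40 bytes gives perInstanceDataBytes = 40MB; the estimate is that
      // value scaled by the hash table space overhead, floored at MIN_HASH_TBL_MEM.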
}
// Must be kept in sync with PartitionedAggregationNode::MinReservation() in be.
long perInstanceMinReservation;
long bufferSize = queryOptions.getDefault_spillable_buffer_size();
long maxRowBufferSize =
computeMaxSpillableBufferSize(bufferSize, queryOptions.getMax_row_size());
if (aggInfo_.getGroupingExprs().isEmpty()) {
perInstanceMinReservation = 0;
} else {
// This is a grouping pre-aggregation or merge aggregation.
final int PARTITION_FANOUT = 16;
if (perInstanceDataBytes != -1) {
long bytesPerPartition = perInstanceDataBytes / PARTITION_FANOUT;
// Scale down the buffer size if we think there will be excess free space with the
// default buffer size, e.g. with small dimension tables.
bufferSize = Math.min(bufferSize, Math.max(
queryOptions.getMin_spillable_buffer_size(),
BitUtil.roundUpToPowerOf2(bytesPerPartition)));
// Recompute the max row buffer size with the smaller buffer.
maxRowBufferSize =
computeMaxSpillableBufferSize(bufferSize, queryOptions.getMax_row_size());
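        // Illustrative arithmetic (hypothetical numbers): perInstanceDataBytes = 4MB
        // gives bytesPerPartition = 256KB, so with a 2MB default buffer the buffer is
        // scaled down to roundUpToPowerOf2(256KB) = 256KB, unless the minimum
        // spillable buffer size is larger.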
}
if (useStreamingPreagg_) {
// We can execute a streaming preagg without any buffers by passing through rows,
// but that is a very low performance mode of execution if the aggregation reduces
        // its input significantly. Instead reserve memory for one buffer and 64KB of hash
// tables per partition. We don't need to reserve memory for large rows since they
// can be passed through if needed.
perInstanceMinReservation = (bufferSize + 64 * 1024) * PARTITION_FANOUT;
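        // Illustrative arithmetic: with a hypothetical 2MB buffer, this reserves
        // (2MB + 64KB) * 16 = 33MB.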
} else {
long minBuffers = PARTITION_FANOUT + 1 + (aggInfo_.needsSerialize() ? 1 : 0);
// Two of the buffers need to be buffers large enough to hold the maximum-sized
// row to serve as input and output buffers while repartitioning.
perInstanceMinReservation = bufferSize * (minBuffers - 2) + maxRowBufferSize * 2;
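        // Illustrative arithmetic: if a serialize step is needed, minBuffers is
        // 16 + 1 + 1 = 18; with hypothetical 2MB buffers and a 2MB max-row buffer,
        // this reserves 2MB * 16 + 2MB * 2 = 36MB.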
}
}
nodeResourceProfile_ = new ResourceProfileBuilder()
.setMemEstimateBytes(perInstanceMemEstimate)
.setMinReservationBytes(perInstanceMinReservation)
.setSpillableBufferBytes(bufferSize)
.setMaxRowBufferBytes(maxRowBufferSize).build();
}
}