blob: 7069bb57889e541967469285c0b8561ed821b7a7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.analysis;
import java.util.ArrayList;
import java.util.List;
import org.apache.impala.analysis.MultiAggregateInfo.AggPhase;
import org.apache.impala.catalog.AggregateFunction;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.InternalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
* Encapsulates all the information needed to compute a list of aggregate functions with
* compatible grouping including their distributed execution.
*
* Each SELECT block containing aggregates will have a single MultiAggregateInfo which
* will contain one AggregateInfo per unique list of DISTINCT expressions. If there is
* only a single DISTINCT grouping, a single AggregateInfo will be created which will
* represent that grouping and any non-DISTINCT aggregates. If there is more than one
* DISTINCT grouping, the non-DISTINCT aggregates will be grouped together in their own
* AggregateInfo.
*
* Execution is modeled as a tree of AggregateInfo objects which express the local and
* merging aggregate computations. The tree structure looks as follows:
* - for non-distinct aggregation:
* - aggInfo: contains the original aggregation functions and grouping exprs
* - aggInfo.mergeAggInfo: contains the merging aggregation functions (grouping
* exprs are identical)
* - for distinct aggregation (also see createDistinctAggInfo()):
* - aggInfo: contains the phase 1 aggregate functions and grouping exprs
* - aggInfo.secondPhaseDistinctAggInfo: contains the phase 2 aggregate functions and
* grouping exprs, and the merging aggregate functions for any non-distinct aggs
* - aggInfo.mergeAggInfo: contains the merging aggregate functions for the phase 1
* computation (grouping exprs are identical to aggInfo)
* - aggInfo.secondPhaseDistinctAggInfo.mergeAggInfo: contains the merging aggregate
* functions for the phase 2 computation (grouping exprs are identical to
* aggInfo.secondPhaseDistinctAggInfo)
*
* Merging aggregate computations are idempotent. In other words,
* aggInfo.mergeAggInfo == aggInfo.mergeAggInfo.mergeAggInfo.
*/
public class AggregateInfo extends AggregateInfoBase {
  private final static Logger LOG = LoggerFactory.getLogger(AggregateInfo.class);

  // Created by createMergeAggInfo(). Shares tuple descriptors, smaps and the
  // materialized slot list with 'this'. Null on merge infos themselves.
  private AggregateInfo mergeAggInfo_;

  // Created by createDistinctAggInfo(); non-null iff this is a distinct aggregation.
  private AggregateInfo secondPhaseDistinctAggInfo_;

  private final AggPhase aggPhase_;

  // Map from all grouping and aggregate exprs to a SlotRef referencing the corresp. slot
  // in the intermediate tuple. Identical to outputTupleSmap_ if no aggregateExpr has an
  // output type that is different from its intermediate type.
  protected ExprSubstitutionMap intermediateTupleSmap_ = new ExprSubstitutionMap();

  // Map from all grouping and aggregate exprs to a SlotRef referencing the corresp. slot
  // in the output tuple.
  protected ExprSubstitutionMap outputTupleSmap_ = new ExprSubstitutionMap();

  // if set, a subset of groupingExprs_; set and used during planning
  private List<Expr> partitionExprs_;

  // C'tor creates copies of groupingExprs and aggExprs.
  private AggregateInfo(
      List<Expr> groupingExprs, List<FunctionCallExpr> aggExprs, AggPhase aggPhase) {
    super(groupingExprs, aggExprs);
    aggPhase_ = aggPhase;
  }

  /**
   * C'tor for cloning. Clones the merge and second-phase infos, the smaps and the
   * partition exprs. Afterwards the cloned merge info is re-linked to this clone's
   * smaps and materialized slot list. This keeps the sharing that createMergeAggInfo()
   * sets up, so that substitute() and materializeRequiredSlots() on the clone are also
   * visible through its merge info, just as they are for the original.
   */
  private AggregateInfo(AggregateInfo other) {
    super(other);
    if (other.mergeAggInfo_ != null) {
      mergeAggInfo_ = other.mergeAggInfo_.clone();
    }
    if (other.secondPhaseDistinctAggInfo_ != null) {
      secondPhaseDistinctAggInfo_ = other.secondPhaseDistinctAggInfo_.clone();
    }
    aggPhase_ = other.aggPhase_;
    outputTupleSmap_ = other.outputTupleSmap_.clone();
    if (other.requiresIntermediateTuple()) {
      intermediateTupleSmap_ = other.intermediateTupleSmap_.clone();
    } else {
      Preconditions.checkState(other.intermediateTupleDesc_ == other.outputTupleDesc_);
      intermediateTupleSmap_ = outputTupleSmap_;
    }
    if (mergeAggInfo_ != null) {
      // createMergeAggInfo() makes the merge info alias these members of its input.
      // The independent copies made above would break that aliasing, so point the
      // cloned merge info back at this clone's state.
      mergeAggInfo_.intermediateTupleSmap_ = intermediateTupleSmap_;
      mergeAggInfo_.outputTupleSmap_ = outputTupleSmap_;
      mergeAggInfo_.materializedSlots_ = materializedSlots_;
    }
    partitionExprs_ =
        (other.partitionExprs_ != null) ? Expr.cloneList(other.partitionExprs_) : null;
  }

  public List<Expr> getPartitionExprs() { return partitionExprs_; }
  public void setPartitionExprs(List<Expr> exprs) { partitionExprs_ = exprs; }

  /**
   * Creates a complete first-phase AggregateInfo for 'groupingExprs' and 'aggExprs'.
   *
   * If none of 'aggExprs' is DISTINCT:
   * - if 'tupleDesc' is null, new intermediate/output tuple descriptors and smaps are
   *   created;
   * - otherwise 'tupleDesc' is used as both the intermediate and the output tuple
   *   descriptor. This is only allowed when 'aggExprs' is null (UNION DISTINCT), and no
   *   smaps are populated in that case.
   * In both cases the merge info is created as well.
   *
   * If some of 'aggExprs' are DISTINCT, 'tupleDesc' must be null. In that case
   * createDistinctAggInfo() builds the phase-1 info, its merge info, and the second
   * phase info.
   *
   * When smaps are created and an intermediate tuple is required, createSmaps()
   * registers auxiliary equivalence predicates between the output and intermediate
   * grouping slots with 'analyzer'.
   */
  static public AggregateInfo create(List<Expr> groupingExprs,
      List<FunctionCallExpr> aggExprs, TupleDescriptor tupleDesc, Analyzer analyzer)
      throws AnalysisException {
    Preconditions.checkState(
        (groupingExprs != null && !groupingExprs.isEmpty())
        || (aggExprs != null && !aggExprs.isEmpty()));
    AggregateInfo result = new AggregateInfo(groupingExprs, aggExprs, AggPhase.FIRST);

    // collect agg exprs with DISTINCT clause
    List<FunctionCallExpr> distinctAggExprs = new ArrayList<>();
    if (aggExprs != null) {
      for (FunctionCallExpr aggExpr: aggExprs) {
        if (aggExpr.isDistinct()) distinctAggExprs.add(aggExpr);
      }
    }

    if (distinctAggExprs.isEmpty()) {
      if (tupleDesc == null) {
        result.createTupleDescs(analyzer);
        result.createSmaps(analyzer);
      } else {
        // A tupleDesc should only be given for UNION DISTINCT.
        Preconditions.checkState(aggExprs == null);
        result.outputTupleDesc_ = tupleDesc;
        result.intermediateTupleDesc_ = tupleDesc;
      }
      result.createMergeAggInfo(analyzer);
    } else {
      // we don't allow you to pass in a descriptor for distinct aggregation
      // (we need two descriptors)
      Preconditions.checkState(tupleDesc == null);
      result.createDistinctAggInfo(groupingExprs, distinctAggExprs, analyzer);
    }
    if (LOG.isTraceEnabled()) LOG.trace("agg info:\n" + result.debugString());
    return result;
  }

  /** Same as create(groupingExprs, aggExprs, null, analyzer). */
  public static AggregateInfo create(List<Expr> groupingExprs,
      List<FunctionCallExpr> aggExprs, Analyzer analyzer) throws AnalysisException {
    return create(groupingExprs, aggExprs, null, analyzer);
  }

  /**
   * Creates the aggregate info for a single distinct grouping. All distinct aggregate
   * functions in 'distinctAggExprs' must be applied to the same set of exprs (ignoring
   * top-level implicit casts).
   * This creates:
   * - the phase 1 intermediate/output tuple descriptors and smaps
   * - mergeAggInfo (merging the phase 1 computation)
   * - a complete secondPhaseDistinctAggInfo
   *
   * Aggregation happens in two successive phases:
   * - the first phase aggregates by all grouping exprs plus all parameter exprs
   *   of DISTINCT aggregate functions
   * - the second phase is created in createSecondPhaseAggInfo()
   *
   * Example:
   *   SELECT a, COUNT(DISTINCT b, c), MIN(d), COUNT(*) FROM T GROUP BY a
   * - 1st phase grouping exprs: a, b, c
   * - 1st phase agg exprs: MIN(d), COUNT(*)
   * - 2nd phase grouping exprs: a
   * - 2nd phase agg exprs: COUNT(*), MIN(<MIN(d) from 1st phase>),
   *     SUM(<COUNT(*) from 1st phase>)
   */
  private void createDistinctAggInfo(List<Expr> origGroupingExprs,
      List<FunctionCallExpr> distinctAggExprs, Analyzer analyzer)
      throws AnalysisException {
    Preconditions.checkState(!distinctAggExprs.isEmpty());
    // make sure that all DISTINCT params are the same;
    // ignore top-level implicit casts in the comparison, we might have inserted
    // those during analysis
    List<Expr> expr0Children =
        AggregateFunction.getCanonicalDistinctAggChildren(distinctAggExprs.get(0));
    for (int i = 1; i < distinctAggExprs.size(); ++i) {
      List<Expr> exprIChildren =
          AggregateFunction.getCanonicalDistinctAggChildren(distinctAggExprs.get(i));
      Preconditions.checkState(Expr.equalLists(expr0Children, exprIChildren));
    }

    // add DISTINCT parameters to grouping exprs
    groupingExprs_.addAll(expr0Children);

    // remove DISTINCT aggregate functions from aggExprs
    aggregateExprs_.removeAll(distinctAggExprs);

    createTupleDescs(analyzer);
    createSmaps(analyzer);
    createMergeAggInfo(analyzer);
    createSecondPhaseAggInfo(origGroupingExprs, distinctAggExprs, analyzer);
  }

  public AggregateInfo getMergeAggInfo() { return mergeAggInfo_; }
  public AggregateInfo getSecondPhaseDistinctAggInfo() {
    return secondPhaseDistinctAggInfo_;
  }
  public boolean isMerge() { return aggPhase_.isMerge(); }
  public boolean isDistinctAgg() { return secondPhaseDistinctAggInfo_ != null; }
  public ExprSubstitutionMap getIntermediateSmap() { return intermediateTupleSmap_; }
  public ExprSubstitutionMap getOutputSmap() { return outputTupleSmap_; }

  /**
   * Returns true if this info or its second phase distinct info (if any) has at least
   * one aggregate expr.
   */
  public boolean hasAggregateExprs() {
    return !aggregateExprs_.isEmpty() ||
        (secondPhaseDistinctAggInfo_ != null &&
         !secondPhaseDistinctAggInfo_.getAggregateExprs().isEmpty());
  }

  /**
   * Return the tuple id produced in the final aggregation step.
   */
  public TupleId getResultTupleId() {
    if (isDistinctAgg()) return secondPhaseDistinctAggInfo_.getOutputTupleId();
    return getOutputTupleId();
  }

  /**
   * Return the tuple descriptor produced in the final aggregation step.
   */
  public TupleDescriptor getResultTupleDesc() {
    if (isDistinctAgg()) return secondPhaseDistinctAggInfo_.getOutputTupleDesc();
    return getOutputTupleDesc();
  }

  /**
   * Return the output smap of the final aggregation step.
   */
  public ExprSubstitutionMap getResultSmap() {
    if (isDistinctAgg()) return secondPhaseDistinctAggInfo_.getOutputSmap();
    return getOutputSmap();
  }

  /**
   * Returns a new list with the aggregate exprs whose indexes are recorded in
   * materializedSlots_, in that order.
   */
  public List<FunctionCallExpr> getMaterializedAggregateExprs() {
    List<FunctionCallExpr> result = new ArrayList<>();
    for (Integer i: materializedSlots_) {
      result.add(aggregateExprs_.get(i));
    }
    return result;
  }

  /**
   * Substitute all the expressions (grouping expr, aggregate expr) and update our
   * substitution maps according to the given substitution map:
   * - smap typically maps from tuple t1 to tuple t2 (example: the smap of an
   *   inline view maps the virtual table ref t1 into a base table ref t2)
   * - our grouping and aggregate exprs need to be substituted with the given
   *   smap so that they also reference t2
   * - the output and intermediate smaps need to be recomputed to map exprs based on
   *   t2 onto our tuple descriptors (ie, their left-hand sides are substituted with
   *   smap)
   * - mergeAggInfo: this is not affected, because
   *   * its grouping and aggregate exprs only reference our intermediate tuple
   *   * its smaps are the same objects as ours
   * - 2ndPhaseDistinctAggInfo:
   *   * its grouping and aggregate exprs also only reference our intermediate tuple
   *     and are therefore not affected
   *   * its smaps need to be recomputed to map exprs based on t2 to its own
   *     tuple descriptors
   */
  public void substitute(ExprSubstitutionMap smap, Analyzer analyzer)
      throws InternalException {
    // Preserve the root type for NULL literals.
    groupingExprs_ = Expr.substituteList(groupingExprs_, smap, analyzer, true);
    if (LOG.isTraceEnabled()) {
      LOG.trace("AggInfo: grouping_exprs=" + Expr.debugString(groupingExprs_));
    }

    // The smap in this case should not substitute the aggs themselves, only
    // their subexpressions.
    List<Expr> substitutedAggs =
        Expr.substituteList(aggregateExprs_, smap, analyzer, false);
    aggregateExprs_.clear();
    for (Expr substitutedAgg: substitutedAggs) {
      aggregateExprs_.add((FunctionCallExpr) substitutedAgg);
    }

    if (LOG.isTraceEnabled()) {
      LOG.trace("AggInfo: agg_exprs=" + Expr.debugString(aggregateExprs_));
    }
    outputTupleSmap_.substituteLhs(smap, analyzer);
    intermediateTupleSmap_.substituteLhs(smap, analyzer);
    if (secondPhaseDistinctAggInfo_ != null) {
      secondPhaseDistinctAggInfo_.substitute(smap, analyzer);
    }
  }

  /**
   * Creates mergeAggInfo_, the info for an aggregation node that merges the
   * pre-aggregated results computed by 'this':
   * - tuple descs and smaps are the same objects as those of 'this' (we're
   *   materializing the same logical tuple)
   * - grouping exprs: slotrefs to the grouping slots of our intermediate tuple
   * - aggregate exprs: merge aggregation of the aggregate slots of our intermediate
   *   tuple
   * - the materialized slot list is also shared with 'this'
   *
   * createTupleDescs() must not be called on the resulting info. No merge info is
   * created for the merge info itself.
   */
  private void createMergeAggInfo(Analyzer analyzer) {
    Preconditions.checkState(mergeAggInfo_ == null);
    TupleDescriptor inputDesc = intermediateTupleDesc_;
    // construct grouping exprs
    List<Expr> groupingExprs = new ArrayList<>();
    for (int i = 0; i < getGroupingExprs().size(); ++i) {
      SlotRef slotRef = new SlotRef(inputDesc.getSlots().get(i));
      groupingExprs.add(slotRef);
    }

    // construct agg exprs
    List<FunctionCallExpr> aggExprs = new ArrayList<>();
    for (int i = 0; i < getAggregateExprs().size(); ++i) {
      FunctionCallExpr inputExpr = getAggregateExprs().get(i);
      Preconditions.checkState(inputExpr.isAggregateFunction());
      Expr aggExprParam =
          new SlotRef(inputDesc.getSlots().get(i + getGroupingExprs().size()));
      FunctionCallExpr aggExpr = FunctionCallExpr.createMergeAggCall(
          inputExpr, Lists.newArrayList(aggExprParam));
      aggExpr.analyzeNoThrow(analyzer);
      aggExprs.add(aggExpr);
    }

    AggPhase aggPhase =
        (aggPhase_ == AggPhase.FIRST) ? AggPhase.FIRST_MERGE : AggPhase.SECOND_MERGE;
    mergeAggInfo_ = new AggregateInfo(groupingExprs, aggExprs, aggPhase);
    mergeAggInfo_.intermediateTupleDesc_ = intermediateTupleDesc_;
    mergeAggInfo_.outputTupleDesc_ = outputTupleDesc_;
    mergeAggInfo_.intermediateTupleSmap_ = intermediateTupleSmap_;
    mergeAggInfo_.outputTupleSmap_ = outputTupleSmap_;
    mergeAggInfo_.materializedSlots_ = materializedSlots_;
  }

  /**
   * Creates an IF function call that returns NULL if any of the slots
   * at indexes [firstIdx, lastIdx] return NULL.
   * For example, the resulting IF function would look like this for 3 slots:
   * IF(IsNull(slot1), NULL, IF(IsNull(slot2), NULL, slot3))
   * Returns null if firstIdx is greater than lastIdx.
   * Returns a SlotRef to the last slot if there is only one slot in range.
   */
  private Expr createCountDistinctAggExprParam(int firstIdx, int lastIdx,
      List<SlotDescriptor> slots) {
    if (firstIdx > lastIdx) return null;

    Expr elseExpr = new SlotRef(slots.get(lastIdx));
    if (firstIdx == lastIdx) return elseExpr;

    for (int i = lastIdx - 1; i >= firstIdx; --i) {
      List<Expr> ifArgs = new ArrayList<>();
      SlotRef slotRef = new SlotRef(slots.get(i));
      // Build expr: IF(IsNull(slotRef), NULL, elseExpr)
      Expr isNullPred = new IsNullPredicate(slotRef, false);
      ifArgs.add(isNullPred);
      ifArgs.add(new NullLiteral());
      ifArgs.add(elseExpr);
      elseExpr = new FunctionCallExpr("if", ifArgs);
    }
    return elseExpr;
  }

  /**
   * Returns true if 'aggExpr' calls group_concat (case-insensitive). Shared by
   * createSecondPhaseAggInfo() and createSecondPhaseAggSMap() so that both agree on
   * whether the optional separator argument must be excluded from the distinct params.
   */
  private static boolean isGroupConcat(FunctionCallExpr aggExpr) {
    return aggExpr.getFnName().getFunction().equalsIgnoreCase("group_concat");
  }

  /**
   * Create the info for an aggregation node that computes the second phase of
   * DISTINCT aggregate functions.
   * (Refer to createDistinctAggInfo() for an explanation of the phases.)
   * - 'this' is the phase 1 aggregation
   * - grouping exprs are those of the original query (param origGroupingExprs),
   *   substituted with the phase 1 intermediate smap
   * - aggregate exprs for the DISTINCT agg fns: these are aggregating the grouping
   *   slots that were added to the original grouping slots in phase 1:
   *   * COUNT(DISTINCT ...) becomes COUNT(<nested IF>) over those slots, which yields
   *     NULL (and is thus not counted) if any of the distinct slots is NULL
   *   * GROUP_CONCAT(DISTINCT expr [, sep]) becomes GROUP_CONCAT(<slot> [, sep])
   *   * any other fn (e.g. SUM) is applied to the first added grouping slot
   * - other aggregate exprs: same as the non-DISTINCT merge case
   *
   * This call also creates the tuple descriptors, smaps and merge info for the second
   * phase AggregateInfo.
   */
  private void createSecondPhaseAggInfo(List<Expr> origGroupingExprs,
      List<FunctionCallExpr> distinctAggExprs, Analyzer analyzer)
      throws AnalysisException {
    Preconditions.checkState(secondPhaseDistinctAggInfo_ == null);
    Preconditions.checkState(!distinctAggExprs.isEmpty());

    // The output of the 1st phase agg is the 1st phase intermediate.
    TupleDescriptor inputDesc = intermediateTupleDesc_;

    // construct agg exprs for original DISTINCT aggregate functions
    // (these aren't part of aggExprs_)
    List<FunctionCallExpr> secondPhaseAggExprs = new ArrayList<>();
    for (FunctionCallExpr inputExpr: distinctAggExprs) {
      Preconditions.checkState(inputExpr.isAggregateFunction());
      FunctionCallExpr aggExpr = null;
      if (inputExpr.getFnName().getFunction().equals("count")) {
        // COUNT(DISTINCT ...) ->
        // COUNT(IF(IsNull(<agg slot 1>), NULL, IF(IsNull(<agg slot 2>), NULL, ...)))
        // We need the nested IF to make sure that we do not count
        // column-value combinations if any of the distinct columns are NULL.
        // This behavior is consistent with MySQL.
        Expr ifExpr = createCountDistinctAggExprParam(origGroupingExprs.size(),
            origGroupingExprs.size() + inputExpr.getChildren().size() - 1,
            inputDesc.getSlots());
        Preconditions.checkNotNull(ifExpr);
        ifExpr.analyzeNoThrow(analyzer);
        aggExpr = new FunctionCallExpr("count", Lists.newArrayList(ifExpr));
      } else if (isGroupConcat(inputExpr)) {
        // Syntax: GROUP_CONCAT([DISTINCT] expression [, separator])
        List<Expr> exprList = new ArrayList<>();
        // Add "expression" parameter. Need to get it from the inputDesc's slots so the
        // tuple reference is correct.
        exprList.add(new SlotRef(inputDesc.getSlots().get(origGroupingExprs.size())));
        // Check if user provided a custom separator
        if (inputExpr.getChildren().size() == 2) exprList.add(inputExpr.getChild(1));
        aggExpr = new FunctionCallExpr(inputExpr.getFnName(), exprList);
      } else {
        // SUM(DISTINCT <expr>) -> SUM(<last grouping slot>);
        // (MIN(DISTINCT ...) and MAX(DISTINCT ...) have their DISTINCT turned
        // off during analysis, and AVG() is changed to SUM()/COUNT())
        Expr aggExprParam =
            new SlotRef(inputDesc.getSlots().get(origGroupingExprs.size()));
        aggExpr = new FunctionCallExpr(inputExpr.getFnName(),
            Lists.newArrayList(aggExprParam));
      }
      secondPhaseAggExprs.add(aggExpr);
    }

    // map all the remaining agg fns
    for (int i = 0; i < aggregateExprs_.size(); ++i) {
      FunctionCallExpr inputExpr = aggregateExprs_.get(i);
      Preconditions.checkState(inputExpr.isAggregateFunction());
      // we're aggregating an intermediate slot of the 1st agg phase
      Expr aggExprParam =
          new SlotRef(inputDesc.getSlots().get(i + getGroupingExprs().size()));
      FunctionCallExpr aggExpr = FunctionCallExpr.createMergeAggCall(
          inputExpr, Lists.newArrayList(aggExprParam));
      secondPhaseAggExprs.add(aggExpr);
    }
    Preconditions.checkState(
        secondPhaseAggExprs.size() == aggregateExprs_.size() + distinctAggExprs.size());

    for (FunctionCallExpr aggExpr: secondPhaseAggExprs) {
      aggExpr.analyzeNoThrow(analyzer);
      Preconditions.checkState(aggExpr.isAggregateFunction());
    }

    List<Expr> substGroupingExprs =
        Expr.substituteList(origGroupingExprs, intermediateTupleSmap_, analyzer, false);
    secondPhaseDistinctAggInfo_ =
        new AggregateInfo(substGroupingExprs, secondPhaseAggExprs, AggPhase.SECOND);
    secondPhaseDistinctAggInfo_.createTupleDescs(analyzer);
    secondPhaseDistinctAggInfo_.createSecondPhaseAggSMap(this, distinctAggExprs);
    secondPhaseDistinctAggInfo_.createMergeAggInfo(analyzer);
  }

  /**
   * Populates the intermediate and output smaps of a second phase info. The output
   * smap maps the original grouping and aggregate exprs onto the final output of the
   * second phase distinct aggregation. 'inputAggInfo' is the phase 1 info.
   */
  private void createSecondPhaseAggSMap(
      AggregateInfo inputAggInfo, List<FunctionCallExpr> distinctAggExprs) {
    outputTupleSmap_.clear();
    int slotIdx = 0;
    List<SlotDescriptor> slotDescs = outputTupleDesc_.getSlots();

    int numDistinctParams = distinctAggExprs.get(0).getChildren().size();
    // If we are counting distinct params of group_concat, we cannot include the custom
    // separator since it is not a distinct param.
    if (isGroupConcat(distinctAggExprs.get(0)) && numDistinctParams == 2) {
      --numDistinctParams;
    }
    int numOrigGroupingExprs =
        inputAggInfo.getGroupingExprs().size() - numDistinctParams;
    Preconditions.checkState(slotDescs.size() ==
        numOrigGroupingExprs + distinctAggExprs.size() +
        inputAggInfo.getAggregateExprs().size());

    // original grouping exprs -> first m slots
    for (int i = 0; i < numOrigGroupingExprs; ++i, ++slotIdx) {
      Expr groupingExpr = inputAggInfo.getGroupingExprs().get(i);
      outputTupleSmap_.put(
          groupingExpr.clone(), new SlotRef(slotDescs.get(slotIdx)));
    }

    // distinct agg exprs -> next n slots
    for (int i = 0; i < distinctAggExprs.size(); ++i, ++slotIdx) {
      Expr aggExpr = distinctAggExprs.get(i);
      outputTupleSmap_.put(
          aggExpr.clone(), (new SlotRef(slotDescs.get(slotIdx))));
    }

    // remaining agg exprs -> remaining slots
    for (int i = 0; i < inputAggInfo.getAggregateExprs().size(); ++i, ++slotIdx) {
      Expr aggExpr = inputAggInfo.getAggregateExprs().get(i);
      outputTupleSmap_.put(aggExpr.clone(), new SlotRef(slotDescs.get(slotIdx)));
    }

    // Populate intermediate smap even if this step does not require an
    // intermediate tuple. The intermediate smap is always different from the
    // output smap because the latter maps from original input exprs, whereas
    // the former maps from the input exprs of this second phase aggregation.
    List<Expr> exprs =
        Lists.newArrayListWithCapacity(groupingExprs_.size() + aggregateExprs_.size());
    exprs.addAll(groupingExprs_);
    exprs.addAll(aggregateExprs_);
    for (int i = 0; i < exprs.size(); ++i) {
      intermediateTupleSmap_.put(
          exprs.get(i).clone(), new SlotRef(intermediateTupleDesc_.getSlots().get(i)));
    }
  }

  /**
   * Populates the output and intermediate smaps based on the output and intermediate
   * tuples that are assumed to be set. The i-th grouping/aggregate expr is mapped to
   * the i-th slot. If an intermediate tuple is required, also populates the
   * intermediate smap and registers auxiliary equivalence predicates between the
   * grouping slots of the two tuples. Otherwise the intermediate smap becomes the same
   * object as the output smap.
   */
  public void createSmaps(Analyzer analyzer) {
    Preconditions.checkNotNull(outputTupleDesc_);
    Preconditions.checkNotNull(intermediateTupleDesc_);

    List<Expr> exprs = Lists.newArrayListWithCapacity(
        groupingExprs_.size() + aggregateExprs_.size());
    exprs.addAll(groupingExprs_);
    exprs.addAll(aggregateExprs_);
    for (int i = 0; i < exprs.size(); ++i) {
      outputTupleSmap_.put(exprs.get(i).clone(),
          new SlotRef(outputTupleDesc_.getSlots().get(i)));
      if (!requiresIntermediateTuple()) continue;
      intermediateTupleSmap_.put(exprs.get(i).clone(),
          new SlotRef(intermediateTupleDesc_.getSlots().get(i)));
      if (i < groupingExprs_.size()) {
        analyzer.createAuxEqPredicate(
            new SlotRef(outputTupleDesc_.getSlots().get(i)),
            new SlotRef(intermediateTupleDesc_.getSlots().get(i)));
      }
    }
    if (!requiresIntermediateTuple()) intermediateTupleSmap_ = outputTupleSmap_;

    if (LOG.isTraceEnabled()) {
      LOG.trace("output smap=" + outputTupleSmap_.debugString());
      LOG.trace("intermediate smap=" + intermediateTupleSmap_.debugString());
    }
  }

  /**
   * Mark slots required for this aggregation as materialized:
   * - all grouping output slots as well as grouping exprs
   * - for non-distinct aggregation: the aggregate exprs of materialized aggregate slots;
   *   this assumes that the output slots corresponding to aggregate exprs have already
   *   been marked by the consumer of this select block
   * - for distinct aggregation, we mark all aggregate output slots in order to keep
   *   things simple
   * Also recomputes materializedSlots_ (indexes into aggregateExprs_ of the
   * materialized aggregates). For distinct aggregation, recurses into the second
   * phase info with a null smap.
   * This call must be idempotent because it may be called more than once for Union stmt.
   */
  @Override
  public void materializeRequiredSlots(Analyzer analyzer, ExprSubstitutionMap smap) {
    for (int i = 0; i < groupingExprs_.size(); ++i) {
      outputTupleDesc_.getSlots().get(i).setIsMaterialized(true);
      intermediateTupleDesc_.getSlots().get(i).setIsMaterialized(true);
    }

    // collect input exprs: grouping exprs plus aggregate exprs that need to be
    // materialized
    materializedSlots_.clear();
    List<Expr> exprs = new ArrayList<>();
    exprs.addAll(groupingExprs_);
    for (int i = 0; i < aggregateExprs_.size(); ++i) {
      SlotDescriptor slotDesc =
          outputTupleDesc_.getSlots().get(groupingExprs_.size() + i);
      SlotDescriptor intermediateSlotDesc =
          intermediateTupleDesc_.getSlots().get(groupingExprs_.size() + i);
      if (isDistinctAgg()) {
        slotDesc.setIsMaterialized(true);
        intermediateSlotDesc.setIsMaterialized(true);
      }
      if (!slotDesc.isMaterialized()) continue;
      intermediateSlotDesc.setIsMaterialized(true);
      exprs.add(aggregateExprs_.get(i));
      materializedSlots_.add(i);
    }
    List<Expr> resolvedExprs = Expr.substituteList(exprs, smap, analyzer, false);
    analyzer.materializeSlots(resolvedExprs);

    if (isDistinctAgg()) {
      secondPhaseDistinctAggInfo_.materializeRequiredSlots(analyzer, null);
    }
  }

  /**
   * Checks if all materialized aggregate expressions have distinct semantics.
   * It returns true if either of the following is true:
   * (1) all materialized aggregate expressions have distinct semantics
   *     (e.g. MIN, MAX, NDV). In other words, this optimization will work
   *     for COUNT(DISTINCT c) but not COUNT(c).
   * (2) there are no aggregate expressions but only grouping expressions.
   */
  public boolean hasAllDistinctAgg() {
    if (hasAggregateExprs()) {
      for (FunctionCallExpr aggExpr : getMaterializedAggregateExprs()) {
        if (!aggExpr.isDistinct() && !aggExpr.ignoresDistinct()) return false;
      }
    } else {
      Preconditions.checkState(!groupingExprs_.isEmpty());
    }
    return true;
  }

  /**
   * Returns true if there is a single count(*) materialized aggregate expression and
   * this is not a distinct aggregation.
   */
  public boolean hasCountStarOnly() {
    // Cheap check first; also avoids building the materialized list more than once.
    if (isDistinctAgg()) return false;
    List<FunctionCallExpr> materializedAggExprs = getMaterializedAggregateExprs();
    if (materializedAggExprs.size() != 1) return false;
    FunctionCallExpr origExpr = materializedAggExprs.get(0);
    if (!origExpr.getFnName().getFunction().equalsIgnoreCase("count")) return false;
    return origExpr.getParams().isStar();
  }

  /**
   * Validates the internal state of this agg info: Checks that the number of
   * materialized slots of the output tuple corresponds to the number of materialized
   * aggregate functions plus the number of grouping exprs. Also checks that the return
   * types of the aggregate and grouping exprs correspond to the slots in the output
   * tuple and that the input types stored in the merge aggregation are consistent
   * with the input exprs.
   */
  public void checkConsistency() {
    List<SlotDescriptor> slots = outputTupleDesc_.getSlots();

    // Check materialized slots.
    int numMaterializedSlots = 0;
    for (SlotDescriptor slotDesc: slots) {
      if (slotDesc.isMaterialized()) ++numMaterializedSlots;
    }
    Preconditions.checkState(numMaterializedSlots ==
        materializedSlots_.size() + groupingExprs_.size());

    // Check that grouping expr return types match the slot descriptors.
    int slotIdx = 0;
    for (int i = 0; i < groupingExprs_.size(); ++i) {
      Expr groupingExpr = groupingExprs_.get(i);
      Type slotType = slots.get(slotIdx).getType();
      Preconditions.checkState(groupingExpr.getType().equals(slotType),
          String.format("Grouping expr %s returns type %s but its output tuple " +
              "slot has type %s", groupingExpr.toSql(),
              groupingExpr.getType().toString(), slotType.toString()));
      ++slotIdx;
    }
    // Check that aggregate expr return types match the slot descriptors.
    for (int i = 0; i < aggregateExprs_.size(); ++i) {
      Expr aggExpr = aggregateExprs_.get(i);
      Type slotType = slots.get(slotIdx).getType();
      Preconditions.checkState(aggExpr.getType().equals(slotType),
          String.format("Agg expr %s returns type %s but its output tuple " +
              "slot has type %s", aggExpr.toSql(), aggExpr.getType().toString(),
              slotType.toString()));
      ++slotIdx;
    }
    if (mergeAggInfo_ != null) {
      // Check that the argument types in mergeAggInfo_ are consistent with input exprs.
      for (int i = 0; i < aggregateExprs_.size(); ++i) {
        FunctionCallExpr mergeAggExpr = mergeAggInfo_.aggregateExprs_.get(i);
        mergeAggExpr.validateMergeAggFn(aggregateExprs_.get(i));
      }
    }
  }

  /**
   * Returns true if any aggregate function has a serialize function.
   * Only valid to call once analyzed.
   */
  public boolean needsSerialize() {
    for (FunctionCallExpr aggregateExpr: aggregateExprs_) {
      Preconditions.checkState(aggregateExpr.isAnalyzed());
      AggregateFunction fn = (AggregateFunction)aggregateExpr.getFn();
      if (fn.getSerializeFnSymbol() != null) return true;
    }
    return false;
  }

  @Override
  public String debugString() {
    StringBuilder out = new StringBuilder(super.debugString());
    out.append(Objects.toStringHelper(this)
        .add("phase", aggPhase_)
        .add("intermediate_smap", intermediateTupleSmap_.debugString())
        .add("output_smap", outputTupleSmap_.debugString())
        .toString());
    if (mergeAggInfo_ != this && mergeAggInfo_ != null) {
      out.append("\nmergeAggInfo:\n" + mergeAggInfo_.debugString());
    }
    if (secondPhaseDistinctAggInfo_ != null) {
      out.append("\nsecondPhaseDistinctAggInfo:\n"
          + secondPhaseDistinctAggInfo_.debugString());
    }
    return out.toString();
  }

  @Override
  protected String tupleDebugName() { return "agg-tuple"; }

  @Override
  public AggregateInfo clone() { return new AggregateInfo(this); }
}