// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.planner;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.impala.analysis.AggregateInfoBase;
import org.apache.impala.analysis.AnalyticExpr;
import org.apache.impala.analysis.AnalyticInfo;
import org.apache.impala.analysis.AnalyticWindow;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.ExprSubstitutionMap;
import org.apache.impala.analysis.OrderByElement;
import org.apache.impala.analysis.SlotDescriptor;
import org.apache.impala.analysis.SlotRef;
import org.apache.impala.analysis.SortInfo;
import org.apache.impala.analysis.TupleDescriptor;
import org.apache.impala.analysis.TupleId;
import org.apache.impala.analysis.TupleIsNullPredicate;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.thrift.TSortingOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
* The analytic planner adds plan nodes to an existing plan tree in order to
* implement the AnalyticInfo of a given query stmt. The resulting plan exploits
* similarities among analytic exprs with respect to partitioning, ordering and
* windowing in order to reduce data exchanges and sorts (the exchanges and sorts
* are currently not minimal). The generated plan has the following structure:
* ...
* (
*   (
*     (
*       analytic node    <-- group of analytic exprs with compatible window
*     )+                 <-- group of analytic exprs with compatible ordering
*     sort node?
*   )+                   <-- group of analytic exprs with compatible partitioning
*   hash exchange?
* )*                     <-- group of analytic exprs that have different partitioning
* input plan node
* ...
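*
* As an illustration (hypothetical query and table names), a statement such as
*   SELECT sum(x) OVER (PARTITION BY a ORDER BY b),
*          count(x) OVER (PARTITION BY a ORDER BY b
*                         ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
*          sum(y) OVER (PARTITION BY c)
*   FROM t
* would typically produce two partition groups: a hash exchange and sort on (a, b)
* feeding two analytic nodes for the first two exprs (same ordering, different
* windows), and a separate hash exchange and sort on c for the third expr.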
*/
public class AnalyticPlanner {
private static final Logger LOG = LoggerFactory.getLogger(AnalyticPlanner.class);
private final AnalyticInfo analyticInfo_;
private final Analyzer analyzer_;
private final PlannerContext ctx_;
public AnalyticPlanner(AnalyticInfo analyticInfo, Analyzer analyzer,
PlannerContext ctx) {
analyticInfo_ = analyticInfo;
analyzer_ = analyzer;
ctx_ = ctx;
}
/**
* Return plan tree that augments 'root' with plan nodes that implement single-node
* evaluation of the AnalyticExprs in analyticInfo.
* This plan takes into account a possible hash partition of its input on
* 'groupingExprs'; if 'groupingExprs' is non-null, it returns in 'inputPartitionExprs'
* a subset of the grouping exprs that should be used for the aggregate
* hash partitioning during the parallelization of 'root'.
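* The overall flow is: collect the analytic exprs into WindowGroups, merge those
* into SortGroups and PartitionGroups, order the groups to minimize repartitioning
* and sorting, and finally stack a sort plus analytic-eval subtree per sort group
* on top of 'root'.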
* TODO: when generating sort orders for the sort groups, optimize the ordering
* of the partition exprs (so that subsequent sort operations see the input sorted
* on a prefix of their required sort exprs)
* TODO: when merging sort groups, recognize equivalent exprs
* (using the equivalence classes) rather than looking for expr equality
*/
public PlanNode createSingleNodePlan(PlanNode root,
List<Expr> groupingExprs, List<Expr> inputPartitionExprs) throws ImpalaException {
List<WindowGroup> windowGroups = collectWindowGroups();
for (int i = 0; i < windowGroups.size(); ++i) {
windowGroups.get(i).init(analyzer_, "wg-" + i);
}
List<SortGroup> sortGroups = collectSortGroups(windowGroups);
mergeSortGroups(sortGroups);
for (SortGroup g: sortGroups) {
g.init();
}
List<PartitionGroup> partitionGroups = collectPartitionGroups(sortGroups);
// TODO-MT: this should maybe be the number of instances
mergePartitionGroups(partitionGroups, root.getNumNodes());
orderGroups(partitionGroups);
if (groupingExprs != null) {
Preconditions.checkNotNull(inputPartitionExprs);
computeInputPartitionExprs(
partitionGroups, groupingExprs, root.getNumNodes(), inputPartitionExprs);
}
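// Build the plan bottom-up: stack the sort + analytic nodes of each sort group on
// top of 'root'. Only the first sort group of a partition group receives the
// partition-by exprs, so the input partition requirement is set only once per
// partition group.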
for (PartitionGroup partitionGroup: partitionGroups) {
for (int i = 0; i < partitionGroup.sortGroups.size(); ++i) {
root = createSortGroupPlan(root, partitionGroup.sortGroups.get(i),
i == 0 ? partitionGroup.partitionByExprs : null);
}
}
return root;
}
/**
* Coalesce sort groups that have compatible partition-by exprs and
* have a prefix relationship.
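* For example, a sort group ordered on (a) is absorbed into one ordered on (a, b)
* when their partition exprs are compatible; the merged group keeps the longer
* sort order, which also satisfies the shorter one.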
*/
private void mergeSortGroups(List<SortGroup> sortGroups) {
boolean hasMerged = false;
do {
hasMerged = false;
for (SortGroup sg1: sortGroups) {
for (SortGroup sg2: sortGroups) {
if (sg1 != sg2 && sg1.isPrefixOf(sg2)) {
sg1.absorb(sg2);
sortGroups.remove(sg2);
hasMerged = true;
break;
}
}
if (hasMerged) break;
}
} while (hasMerged);
}
/**
* Coalesce partition groups for which the intersection of their
* partition exprs has an ndv estimate >= numNodes, so that the resulting plan
* still parallelizes across all nodes.
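* For example (illustrative numbers): with 20 nodes, groups partitioned on (a, b)
* and (a, c) are merged into a single group partitioned on (a) only if the ndv
* estimate of (a) is at least 20.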
*/
private void mergePartitionGroups(
List<PartitionGroup> partitionGroups, int numNodes) {
boolean hasMerged = false;
do {
hasMerged = false;
for (PartitionGroup pg1: partitionGroups) {
for (PartitionGroup pg2: partitionGroups) {
if (pg1 != pg2) {
long ndv = Expr.getNumDistinctValues(
Expr.intersect(pg1.partitionByExprs, pg2.partitionByExprs));
if (ndv < 0 || ndv < numNodes) {
// didn't get a usable value or the number of partitions is too small
continue;
}
pg1.merge(pg2);
partitionGroups.remove(pg2);
hasMerged = true;
break;
}
}
if (hasMerged) break;
}
} while (hasMerged);
}
/**
* Determine the partition group whose partition exprs have the largest estimated-ndv
* intersection with groupingExprs. That partition group is placed at the front of
* partitionGroups, with its partition exprs reduced to the intersection, and the
* intersecting groupingExprs are returned in inputPartitionExprs.
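* For example (illustrative): with groupingExprs (a, b) and partition groups on (a)
* and on (c, b), the intersections are (a) and (b); if ndv(a) exceeds both numNodes
* and ndv(b), the group on (a) moves to the front and (a) becomes the
* inputPartitionExprs.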
*/
private void computeInputPartitionExprs(List<PartitionGroup> partitionGroups,
List<Expr> groupingExprs, int numNodes, List<Expr> inputPartitionExprs) {
inputPartitionExprs.clear();
Preconditions.checkState(numNodes != -1);
// find partition group with maximum intersection
long maxNdv = 0;
PartitionGroup maxPg = null;
List<Expr> maxGroupingExprs = null;
for (PartitionGroup pg: partitionGroups) {
List<Expr> l1 = new ArrayList<>();
List<Expr> l2 = new ArrayList<>();
analyzer_.exprIntersect(pg.partitionByExprs, groupingExprs, l1, l2);
// TODO: also look at l2 and take the max?
long ndv = Expr.getNumDistinctValues(l1);
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Partition group: %s, intersection: %s. " +
"GroupingExprs: %s, intersection: %s. ndv: %d, numNodes: %d, maxNdv: %d.",
Expr.debugString(pg.partitionByExprs), Expr.debugString(l1),
Expr.debugString(groupingExprs), Expr.debugString(l2),
ndv, numNodes, maxNdv));
}
if (ndv < 0 || ndv < numNodes || ndv < maxNdv) continue;
// found a better partition group
maxPg = pg;
maxPg.partitionByExprs = l1;
maxGroupingExprs = l2;
maxNdv = ndv;
}
if (maxNdv > numNodes) {
Preconditions.checkNotNull(maxPg);
// we found a partition group that gives us enough parallelism;
// move it to the front
partitionGroups.remove(maxPg);
partitionGroups.add(0, maxPg);
inputPartitionExprs.addAll(maxGroupingExprs);
if (LOG.isTraceEnabled()) {
LOG.trace("Optimized partition exprs: " + Expr.debugString(inputPartitionExprs));
}
}
}
/**
* Order partition groups (and the sort groups within them) by increasing
* totalOutputTupleSize. This minimizes the total volume of data that needs to be
* repartitioned and sorted.
* Also move the non-partitioning partition group to the end.
*/
private void orderGroups(List<PartitionGroup> partitionGroups) {
// remove the non-partitioning group from partitionGroups
PartitionGroup nonPartitioning = null;
for (PartitionGroup pg: partitionGroups) {
if (Expr.allConstant(pg.partitionByExprs)) {
nonPartitioning = pg;
break;
}
}
if (nonPartitioning != null) partitionGroups.remove(nonPartitioning);
// order by ascending combined output tuple size
Collections.sort(partitionGroups,
new Comparator<PartitionGroup>() {
@Override
public int compare(PartitionGroup pg1, PartitionGroup pg2) {
Preconditions.checkState(pg1.totalOutputTupleSize > 0);
Preconditions.checkState(pg2.totalOutputTupleSize > 0);
int diff = pg1.totalOutputTupleSize - pg2.totalOutputTupleSize;
return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
}
});
if (nonPartitioning != null) partitionGroups.add(nonPartitioning);
for (PartitionGroup pg: partitionGroups) {
pg.orderSortGroups();
}
}
/**
* Create SortInfo, including sort tuple, to sort entire input row
* on sortExprs.
*/
private SortInfo createSortInfo(PlanNode input, List<Expr> sortExprs,
List<Boolean> isAsc, List<Boolean> nullsFirst) {
return createSortInfo(input, sortExprs, isAsc, nullsFirst, TSortingOrder.LEXICAL);
}
/**
* Same as above, but with an extra parameter specifying the sorting order.
*/
private SortInfo createSortInfo(PlanNode input, List<Expr> sortExprs,
List<Boolean> isAsc, List<Boolean> nullsFirst, TSortingOrder sortingOrder) {
List<Expr> inputSlotRefs = new ArrayList<>();
for (TupleId tid: input.getTupleIds()) {
TupleDescriptor tupleDesc = analyzer_.getTupleDesc(tid);
for (SlotDescriptor inputSlotDesc: tupleDesc.getSlots()) {
if (!inputSlotDesc.isMaterialized()) continue;
if (inputSlotDesc.getType().isComplexType()) {
// Project out collection slots since they won't be used anymore and may cause
// issues like IMPALA-8718. They won't be used because outputs of the analytic
// node must appear in the select list of the block containing the analytic, and
// collection types can neither be returned from a select block nor be passed to
// any builtin or UDF function.
if (LOG.isTraceEnabled()) {
LOG.trace("Project out collection slot in sort tuple of analytic: slot={}",
inputSlotDesc.debugString());
}
continue;
}
inputSlotRefs.add(new SlotRef(inputSlotDesc));
}
}
// The decision to materialize ordering exprs should be based on exprs that are
// fully resolved against our input (IMPALA-5270).
ExprSubstitutionMap inputSmap = input.getOutputSmap();
List<Expr> resolvedSortExprs =
Expr.substituteList(sortExprs, inputSmap, analyzer_, true);
SortInfo sortInfo = new SortInfo(resolvedSortExprs, isAsc, nullsFirst,
sortingOrder);
sortInfo.createSortTupleInfo(inputSlotRefs, analyzer_);
// Lhs exprs to be substituted in ancestor plan nodes could have a rhs that contains
// TupleIsNullPredicates. TupleIsNullPredicates require specific tuple ids for
// evaluation. Since this sort materializes a new tuple, it is impossible to evaluate
// TupleIsNullPredicates referring to this sort's input after the sort. To preserve
// the information of whether an input tuple was null or not before this sort node,
// we materialize those rhs TupleIsNullPredicates; ancestor nodes then substitute
// them with a SlotRef into the sort's tuple (IMPALA-1519).
if (inputSmap != null) {
List<TupleIsNullPredicate> tupleIsNullPreds = new ArrayList<>();
for (Expr rhsExpr: inputSmap.getRhs()) {
// Ignore substitutions that are irrelevant at this plan node and its ancestors.
if (!rhsExpr.isBoundByTupleIds(input.getTupleIds())) continue;
rhsExpr.collect(TupleIsNullPredicate.class, tupleIsNullPreds);
}
Expr.removeDuplicates(tupleIsNullPreds);
sortInfo.addMaterializedExprs(tupleIsNullPreds, analyzer_);
}
sortInfo.getSortTupleDescriptor().materializeSlots();
return sortInfo;
}
/**
* Create plan tree for the entire sort group, including all contained window groups.
* Marks the SortNode as requiring its input to be partitioned if partitionExprs
* is not null (partitionExprs represent the data partition of the entire partition
* group of which this sort group is a part).
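* For example (illustrative), a sort group with PARTITION BY a ORDER BY b yields a
* total sort on (a, b) (hash-partitioned on the partition group's exprs if
* partitionExprs is non-null), followed by one AnalyticEvalNode per window group.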
*/
private PlanNode createSortGroupPlan(PlanNode root, SortGroup sortGroup,
List<Expr> partitionExprs) throws ImpalaException {
List<Expr> partitionByExprs = sortGroup.partitionByExprs;
List<OrderByElement> orderByElements = sortGroup.orderByElements;
boolean hasActivePartition = !Expr.allConstant(partitionByExprs);
// IMPALA-8069: Ignore something like ORDER BY 0
boolean isConstSort = true;
for (OrderByElement elmt : orderByElements) {
isConstSort = isConstSort && elmt.getExpr().isConstant();
}
// sort on partition by (pb) + order by (ob) exprs and create pb/ob predicates
if (hasActivePartition || !isConstSort) {
// first sort on partitionExprs (direction doesn't matter)
List<Expr> sortExprs = Lists.newArrayList(partitionByExprs);
List<Boolean> isAsc =
Lists.newArrayList(Collections.nCopies(sortExprs.size(), Boolean.TRUE));
// TODO: pick a direction and nulls first/last ordering that benefits
// subsequent sort groups
List<Boolean> nullsFirst =
Lists.newArrayList(Collections.nCopies(sortExprs.size(), Boolean.TRUE));
// then sort on orderByExprs
for (OrderByElement orderByElement: sortGroup.orderByElements) {
// If an expr appears in both the PARTITION BY and the ORDER BY, it is already in
// 'sortExprs' and does not need to be added again.
if (!sortExprs.contains(orderByElement.getExpr())) {
sortExprs.add(orderByElement.getExpr());
isAsc.add(orderByElement.isAsc());
nullsFirst.add(orderByElement.getNullsFirstParam());
}
}
SortInfo sortInfo = createSortInfo(root, sortExprs, isAsc, nullsFirst);
SortNode sortNode =
SortNode.createTotalSortNode(ctx_.getNextNodeId(), root, sortInfo, 0);
// if this sort group does not have partitioning exprs, we want the sort
// to be executed like a regular distributed sort
if (hasActivePartition) sortNode.setIsAnalyticSort(true);
if (partitionExprs != null) {
// create required input partition
DataPartition inputPartition = DataPartition.UNPARTITIONED;
if (hasActivePartition) {
inputPartition = DataPartition.hashPartitioned(partitionExprs);
}
sortNode.setInputPartition(inputPartition);
}
root = sortNode;
root.init(analyzer_);
}
// create one AnalyticEvalNode per window group
for (WindowGroup windowGroup: sortGroup.windowGroups) {
root = new AnalyticEvalNode(ctx_.getNextNodeId(), root,
windowGroup.analyticFnCalls, windowGroup.partitionByExprs,
windowGroup.orderByElements, windowGroup.window,
windowGroup.physicalIntermediateTuple, windowGroup.physicalOutputTuple,
windowGroup.logicalToPhysicalSmap);
root.init(analyzer_);
}
return root;
}
/**
* Collection of AnalyticExprs that share the same partition-by/order-by/window
* specification. The AnalyticExprs are stored broken up into their constituent parts.
*/
private static class WindowGroup {
public final List<Expr> partitionByExprs;
public final List<OrderByElement> orderByElements;
public final AnalyticWindow window; // may be null
// Analytic exprs belonging to this window group and their corresponding logical
// intermediate and output slots from AnalyticInfo.intermediateTupleDesc_
// and AnalyticInfo.outputTupleDesc_.
public final List<AnalyticExpr> analyticExprs = new ArrayList<>();
// Result of getFnCall() for every analytic expr.
public final List<Expr> analyticFnCalls = new ArrayList<>();
public final List<SlotDescriptor> logicalOutputSlots = new ArrayList<>();
public final List<SlotDescriptor> logicalIntermediateSlots = new ArrayList<>();
// Physical output and intermediate tuples as well as an smap that maps the
// corresponding logical output slots to their physical slots in physicalOutputTuple.
// Set in init().
public TupleDescriptor physicalOutputTuple;
public TupleDescriptor physicalIntermediateTuple;
public final ExprSubstitutionMap logicalToPhysicalSmap = new ExprSubstitutionMap();
public WindowGroup(AnalyticExpr analyticExpr, SlotDescriptor logicalOutputSlot,
SlotDescriptor logicalIntermediateSlot) {
partitionByExprs = analyticExpr.getPartitionExprs();
orderByElements = analyticExpr.getOrderByElements();
window = analyticExpr.getWindow();
analyticExprs.add(analyticExpr);
analyticFnCalls.add(analyticExpr.getFnCall());
logicalOutputSlots.add(logicalOutputSlot);
logicalIntermediateSlots.add(logicalIntermediateSlot);
}
/**
* True if this analytic function must be evaluated in its own WindowGroup.
*/
private static boolean requiresIndependentEval(AnalyticExpr analyticExpr) {
return analyticExpr.getFnCall().getFnName().getFunction().equals(
AnalyticExpr.FIRST_VALUE_REWRITE);
}
/**
* True if the partition exprs, ordering elements, and window of analyticExpr
* match ours.
*/
public boolean isCompatible(AnalyticExpr analyticExpr) {
if (requiresIndependentEval(analyticExprs.get(0)) ||
requiresIndependentEval(analyticExpr)) {
return false;
}
if (!Expr.equalSets(analyticExpr.getPartitionExprs(), partitionByExprs)) {
return false;
}
if (!analyticExpr.getOrderByElements().equals(orderByElements)) return false;
if ((window == null) != (analyticExpr.getWindow() == null)) return false;
if (window == null) return true;
return analyticExpr.getWindow().equals(window);
}
/**
* Adds the given analytic expr and its logical slots to this window group.
* Assumes the corresponding analyticExpr is compatible with 'this'.
*/
public void add(AnalyticExpr analyticExpr, SlotDescriptor logicalOutputSlot,
SlotDescriptor logicalIntermediateSlot) {
Preconditions.checkState(isCompatible(analyticExpr));
analyticExprs.add(analyticExpr);
analyticFnCalls.add(analyticExpr.getFnCall());
logicalOutputSlots.add(logicalOutputSlot);
logicalIntermediateSlots.add(logicalIntermediateSlot);
}
/**
* Creates the physical output and intermediate tuples as well as the logical to
* physical smap for this window group. Computes the mem layout for the tuple
* descriptors.
*/
public void init(Analyzer analyzer, String tupleName) {
Preconditions.checkState(physicalOutputTuple == null);
Preconditions.checkState(physicalIntermediateTuple == null);
Preconditions.checkState(analyticFnCalls.size() == analyticExprs.size());
// If needed, create the intermediate tuple first to maintain
// intermediateTupleId < outputTupleId for debugging purposes and consistency with
// tuple creation for aggregations.
boolean requiresIntermediateTuple =
AggregateInfoBase.requiresIntermediateTuple(analyticFnCalls);
if (requiresIntermediateTuple) {
physicalIntermediateTuple =
analyzer.getDescTbl().createTupleDescriptor(tupleName + "intermed");
physicalOutputTuple =
analyzer.getDescTbl().createTupleDescriptor(tupleName + "out");
} else {
physicalOutputTuple =
analyzer.getDescTbl().createTupleDescriptor(tupleName + "out");
physicalIntermediateTuple = physicalOutputTuple;
}
Preconditions.checkState(analyticExprs.size() == logicalIntermediateSlots.size());
Preconditions.checkState(analyticExprs.size() == logicalOutputSlots.size());
for (int i = 0; i < analyticExprs.size(); ++i) {
SlotDescriptor logicalOutputSlot = logicalOutputSlots.get(i);
SlotDescriptor physicalOutputSlot =
analyzer.copySlotDescriptor(logicalOutputSlot, physicalOutputTuple);
physicalOutputSlot.setIsMaterialized(true);
if (requiresIntermediateTuple) {
SlotDescriptor logicalIntermediateSlot = logicalIntermediateSlots.get(i);
SlotDescriptor physicalIntermediateSlot = analyzer.copySlotDescriptor(
logicalIntermediateSlot, physicalIntermediateTuple);
physicalIntermediateSlot.setIsMaterialized(true);
}
logicalToPhysicalSmap.put(
new SlotRef(logicalOutputSlot), new SlotRef(physicalOutputSlot));
}
physicalOutputTuple.computeMemLayout();
if (requiresIntermediateTuple) physicalIntermediateTuple.computeMemLayout();
}
}
/**
* Extract a minimal set of WindowGroups from analyticExprs.
*/
private List<WindowGroup> collectWindowGroups() {
List<AnalyticExpr> analyticExprs = analyticInfo_.getAnalyticExprs();
List<WindowGroup> groups = new ArrayList<>();
for (int i = 0; i < analyticExprs.size(); ++i) {
AnalyticExpr analyticExpr = analyticExprs.get(i);
// Do not generate the plan for non-materialized analytic exprs.
if (!analyticInfo_.getOutputTupleDesc().getSlots().get(i).isMaterialized()) {
continue;
}
boolean match = false;
for (WindowGroup group: groups) {
if (group.isCompatible(analyticExpr)) {
group.add(analyticInfo_.getAnalyticExprs().get(i),
analyticInfo_.getOutputTupleDesc().getSlots().get(i),
analyticInfo_.getIntermediateTupleDesc().getSlots().get(i));
match = true;
break;
}
}
if (!match) {
groups.add(new WindowGroup(
analyticInfo_.getAnalyticExprs().get(i),
analyticInfo_.getOutputTupleDesc().getSlots().get(i),
analyticInfo_.getIntermediateTupleDesc().getSlots().get(i)));
}
}
return groups;
}
/**
* Collection of WindowGroups that share the same partition-by/order-by
* specification.
*/
private static class SortGroup {
public List<Expr> partitionByExprs;
public List<OrderByElement> orderByElements;
public List<WindowGroup> windowGroups = new ArrayList<>();
// sum of windowGroups.physicalOutputTuple.getByteSize()
public int totalOutputTupleSize = -1;
public SortGroup(WindowGroup windowGroup) {
partitionByExprs = windowGroup.partitionByExprs;
orderByElements = windowGroup.orderByElements;
windowGroups.add(windowGroup);
}
/**
* True if the partition and ordering exprs of windowGroup match ours.
*/
public boolean isCompatible(WindowGroup windowGroup) {
return Expr.equalSets(windowGroup.partitionByExprs, partitionByExprs)
&& windowGroup.orderByElements.equals(orderByElements);
}
public void add(WindowGroup windowGroup) {
Preconditions.checkState(isCompatible(windowGroup));
windowGroups.add(windowGroup);
}
/**
* Return true if 'this' and 'other' have compatible partition exprs and other's
* orderByElements are a prefix of ours, i.e., our (longer) sort order also
* satisfies other's; e.g., a group ordered on (a, b) can absorb one ordered on (a).
*/
public boolean isPrefixOf(SortGroup other) {
if (other.orderByElements.size() > orderByElements.size()) return false;
if (!Expr.equalSets(partitionByExprs, other.partitionByExprs)) return false;
for (int i = 0; i < other.orderByElements.size(); ++i) {
OrderByElement ob = orderByElements.get(i);
OrderByElement otherOb = other.orderByElements.get(i);
// TODO: compare equiv classes by comparing each equiv class's placeholder
// slotref
if (!ob.getExpr().equals(otherOb.getExpr())) return false;
if (ob.isAsc() != otherOb.isAsc()) return false;
if (ob.nullsFirst() != otherOb.nullsFirst()) return false;
}
return true;
}
/**
* Adds other's window groups to ours, assuming that other's sort order is a prefix
* of ours.
*/
public void absorb(SortGroup other) {
Preconditions.checkState(isPrefixOf(other));
windowGroups.addAll(other.windowGroups);
}
/**
* Compute totalOutputTupleSize.
*/
public void init() {
totalOutputTupleSize = 0;
for (WindowGroup g: windowGroups) {
TupleDescriptor outputTuple = g.physicalOutputTuple;
Preconditions.checkState(outputTuple.isMaterialized());
Preconditions.checkState(outputTuple.getByteSize() != -1);
totalOutputTupleSize += outputTuple.getByteSize();
}
}
private static class SizeLt implements Comparator<WindowGroup> {
@Override
public int compare(WindowGroup wg1, WindowGroup wg2) {
Preconditions.checkState(wg1.physicalOutputTuple != null
&& wg1.physicalOutputTuple.getByteSize() != -1);
Preconditions.checkState(wg2.physicalOutputTuple != null
&& wg2.physicalOutputTuple.getByteSize() != -1);
int diff = wg1.physicalOutputTuple.getByteSize()
- wg2.physicalOutputTuple.getByteSize();
return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
}
}
private static final SizeLt SIZE_LT = new SizeLt();
/**
* Order window groups by increasing size of the output tuple. This minimizes
* the total volume of data that needs to be buffered.
*/
public void orderWindowGroups() {
Collections.sort(windowGroups, SIZE_LT);
}
}
/**
* Partitions the windowGroups into SortGroups based on compatible partition-by and
* order-by exprs.
*/
private List<SortGroup> collectSortGroups(List<WindowGroup> windowGroups) {
List<SortGroup> sortGroups = new ArrayList<>();
for (WindowGroup windowGroup: windowGroups) {
boolean match = false;
for (SortGroup sortGroup: sortGroups) {
if (sortGroup.isCompatible(windowGroup)) {
sortGroup.add(windowGroup);
match = true;
break;
}
}
if (!match) sortGroups.add(new SortGroup(windowGroup));
}
return sortGroups;
}
/**
* Collection of SortGroups that have compatible partition-by specifications.
*/
private static class PartitionGroup {
public List<Expr> partitionByExprs;
public List<SortGroup> sortGroups = new ArrayList<>();
// sum of sortGroups.windowGroups.physicalOutputTuple.getByteSize()
public int totalOutputTupleSize = -1;
public PartitionGroup(SortGroup sortGroup) {
partitionByExprs = sortGroup.partitionByExprs;
sortGroups.add(sortGroup);
totalOutputTupleSize = sortGroup.totalOutputTupleSize;
}
/**
* True if the partition exprs of sortGroup are compatible with ours.
* For now that means equality.
*/
public boolean isCompatible(SortGroup sortGroup) {
return Expr.equalSets(sortGroup.partitionByExprs, partitionByExprs);
}
public void add(SortGroup sortGroup) {
Preconditions.checkState(isCompatible(sortGroup));
sortGroups.add(sortGroup);
totalOutputTupleSize += sortGroup.totalOutputTupleSize;
}
/**
* Merge 'other' into 'this'
* - partitionByExprs is the intersection of the two
* - sortGroups becomes the union
*/
public void merge(PartitionGroup other) {
partitionByExprs = Expr.intersect(partitionByExprs, other.partitionByExprs);
Preconditions.checkState(Expr.getNumDistinctValues(partitionByExprs) >= 0);
sortGroups.addAll(other.sortGroups);
}
/**
* Order sort groups by increasing totalOutputTupleSize. This minimizes the total
* volume of data that needs to be sorted.
*/
public void orderSortGroups() {
Collections.sort(sortGroups,
new Comparator<SortGroup>() {
@Override
public int compare(SortGroup sg1, SortGroup sg2) {
Preconditions.checkState(sg1.totalOutputTupleSize > 0);
Preconditions.checkState(sg2.totalOutputTupleSize > 0);
int diff = sg1.totalOutputTupleSize - sg2.totalOutputTupleSize;
return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
}
});
for (SortGroup sortGroup: sortGroups) {
sortGroup.orderWindowGroups();
}
}
}
/**
* Extract a minimal set of PartitionGroups from sortGroups.
*/
private List<PartitionGroup> collectPartitionGroups(List<SortGroup> sortGroups) {
List<PartitionGroup> partitionGroups = new ArrayList<>();
for (SortGroup sortGroup: sortGroups) {
boolean match = false;
for (PartitionGroup partitionGroup: partitionGroups) {
if (partitionGroup.isCompatible(sortGroup)) {
partitionGroup.add(sortGroup);
match = true;
break;
}
}
if (!match) partitionGroups.add(new PartitionGroup(sortGroup));
}
return partitionGroups;
}
}