blob: f8645ab01b3b1696706246760ad238dd2127f659 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.planner;
import java.util.ArrayList;
import java.util.List;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.ExprSubstitutionMap;
import org.apache.impala.analysis.SlotDescriptor;
import org.apache.impala.analysis.SlotRef;
import org.apache.impala.analysis.SortInfo;
import org.apache.impala.common.InternalException;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TSortInfo;
import org.apache.impala.thrift.TSortNode;
import org.apache.impala.thrift.TSortType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Node that implements various types of sorts:
 * - TOTAL: uses SortNode in the BE.
 * - TOPN: uses TopNNode in the BE. Must have a limit.
 * - PARTIAL: uses PartialSortNode in the BE. Cannot have a limit or offset.
 *
 * Will always materialize the new tuple info_.sortTupleDesc_.
 */
public class SortNode extends PlanNode {
private static final Logger LOG = LoggerFactory.getLogger(SortNode.class);
// Memory limit for partial sorts, specified in bytes. TODO: determine the value for
// this, consider making it configurable, enforce it in the BE. (IMPALA-5669)
// Declared static: the value is a compile-time constant shared by every SortNode, so
// there is no reason to store a copy of it in each instance.
private static final long PARTIAL_SORT_MEM_LIMIT = 128L * 1024 * 1024;
// Sort metadata for this node. The methods of this class read the sort tuple
// descriptor, the materialized exprs and the ordering exprs from it.
private final SortInfo info_;
// if set, this SortNode requires its input to have this data partition
private DataPartition inputPartition_;
// if true, the output of this node feeds an AnalyticNode
private boolean isAnalyticSort_;
// if this is an analytic sort, this points to the corresponding
// analytic eval node otherwise null
private AnalyticEvalNode analyticEvalNode_;
// set only for the analytic sort node
private List<Expr> partitioningExprs_;
// info_.sortTupleSlotExprs_ substituted with the outputSmap_ for materialized slots
// in init().
private List<Expr> resolvedTupleExprs_;
// The offset of the first row to return.
protected long offset_;
// The type of sort. Determines the exec node used in the BE.
private TSortType type_;
// Estimated bytes of input that will go into this sort node across all backends.
// Used for sorter spill estimation in backend code.
// Stays at -1 until computeNodeResourceProfile() assigns it, and is set back to -1
// there when the computed input size is negative.
private long estimatedFullInputSize_ = -1;
/**
 * Creates a new SortNode that implements a partial sort.
 */
public static SortNode createPartialSortNode(
PlanNodeId id, PlanNode input, SortInfo info) {
// No offset parameter is accepted here: the node is always built with offset 0 and
// type PARTIAL.
return new SortNode(id, input, info, 0, TSortType.PARTIAL);
/**
 * Creates a new SortNode with a limit that is executed with TopNNode in the BE.
 */
public static SortNode createTopNSortNode(
PlanNodeId id, PlanNode input, SortInfo info, long offset) {
// The limit is not a parameter of this factory; toThrift() later asserts that a TOPN
// node has one.
return new SortNode(id, input, info, offset, TSortType.TOPN);
/**
 * Creates a new SortNode that does a total sort, possibly with a limit.
 */
public static SortNode createTotalSortNode(
PlanNodeId id, PlanNode input, SortInfo info, long offset) {
// Same arguments as the Top-N factory; only the TSortType passed on differs.
return new SortNode(id, input, info, offset, TSortType.TOTAL);
// Private constructor; the static create*SortNode() factories are the callers visible
// in this file.
private SortNode(
PlanNodeId id, PlanNode input, SortInfo info, long offset, TSortType type) {
// Hands the sort tuple's id (as a list) and the type-specific display name to PlanNode.
super(id, info.getSortTupleDescriptor().getId().asList(), getDisplayName(type));
info_ = info;
// NOTE(review): 'input' is not referenced in the lines visible here — confirm where
// the child node is attached.
offset_ = offset;
type_ = type;
/** Returns the offset of the first row to return. */
public long getOffset() {
  return this.offset_;
}
/** Replaces the offset of the first row to return with {@code offset}. */
public void setOffset(long offset) {
  this.offset_ = offset;
}
/** Returns true when the stored offset is strictly positive. */
public boolean hasOffset() {
  return 0 < offset_;
}
/** Returns true when this node's sort type is TOPN. */
public boolean isTypeTopN() {
  return TSortType.TOPN == type_;
}
/** Returns the SortInfo this node was constructed with. */
public SortInfo getSortInfo() {
  return this.info_;
}
/** Records the data partition that this node requires its input to have. */
public void setInputPartition(DataPartition inputPartition) {
inputPartition_ = inputPartition;
/** Returns the data partition required of this node's input; null if never set. */
public DataPartition getInputPartition() {
  return this.inputPartition_;
}
/** Returns whether this node has been flagged as an analytic sort. */
public boolean isAnalyticSort() {
  return this.isAnalyticSort_;
}
/** Sets the flag that marks this node as an analytic sort. */
public void setIsAnalyticSort(boolean v) {
  this.isAnalyticSort_ = v;
}
/** Stores the analytic eval node that corresponds to this sort. */
public void setAnalyticEvalNode(AnalyticEvalNode n) {
  this.analyticEvalNode_ = n;
}
/** Returns the stored analytic eval node; null if none has been set. */
public AnalyticEvalNode getAnalyticEvalNode() {
  return this.analyticEvalNode_;
}
/**
 * Under special cases, the planner may decide to convert a total sort into a
 * TopN sort with a limit.
 */
public void convertToTopN(long limit, List<Expr> partitioningExprs,
Analyzer analyzer) {
// Only a TOTAL sort may be converted; any other current type fails this check.
Preconditions.checkArgument(type_ == TSortType.TOTAL);
type_ = TSortType.TOPN;
// Keep the display name in sync with the new type.
displayName_ = getDisplayName(type_);
partitioningExprs_ = partitioningExprs;
// NOTE(review): 'limit' and 'analyzer' are not used in the lines visible here —
// confirm where the limit is applied.
/** Returns the partitioning exprs stored on this node; null if none were set. */
public List<Expr> getPartitioningExprs() {
  return this.partitioningExprs_;
}
/**
 * Returns true for an analytic sort that has a limit; in every other case the answer
 * comes from the superclass implementation.
 */
public boolean allowPartitioned() {
if (isAnalyticSort_ && hasLimit()) return true;
return super.allowPartitioned();
/** Returns true for every sort type except PARTIAL. */
public boolean isBlockingNode() {
  return TSortType.PARTIAL != type_;
}
/**
 * Builds outputSmap_ for the sort tuple, substitutes resolvedTupleExprs_ through the
 * combined child smap, and remaps the sort exprs in info_ onto the composed smap.
 *
 * @param analyzer passed through to the expr substitution and smap composition calls
 * @throws InternalException declared by the signature; the throwing call is not
 *     identifiable from the lines visible here
 */
public void init(Analyzer analyzer) throws InternalException {
// Do not assignConjuncts() here, so that conjuncts bound by this SortNode's tuple id
// can be placed in a downstream SelectNode. A SortNode cannot evaluate conjuncts.
// Compute the memory layout for the generated tuple.
// populate resolvedTupleExprs_ and outputSmap_
List<SlotDescriptor> sortTupleSlots = info_.getSortTupleDescriptor().getSlots();
Preconditions.checkState(sortTupleSlots.size() > 0,
"empty sort tuple descriptor");
List<Expr> slotExprs = info_.getMaterializedExprs();
resolvedTupleExprs_ = new ArrayList<>();
outputSmap_ = new ExprSubstitutionMap();
// assumes slotExprs and sortTupleSlots are index-aligned (same 'i' indexes both) —
// TODO confirm against SortInfo.
for (int i = 0; i < slotExprs.size(); ++i) {
// Slots that are not materialized get no mapping.
if (!sortTupleSlots.get(i).isMaterialized()) continue;
// Map the materialized expr to a reference to its slot in the sort tuple.
outputSmap_.put(slotExprs.get(i), new SlotRef(sortTupleSlots.get(i)));
// NOTE(review): nothing in the lines visible here adds to resolvedTupleExprs_ before
// it is substituted below — confirm where it is populated.
ExprSubstitutionMap childSmap = getCombinedChildSmap();
// Preserve type as resolvedTupleExprs_ will be used to materialize the tuple and the
// layout is already calculated.
resolvedTupleExprs_ =
Expr.substituteList(resolvedTupleExprs_, childSmap, analyzer, true);
// Remap the ordering exprs to the tuple materialized by this sort node. The mapping
// is a composition of the childSmap and the outputSmap_ because the child node may
// have also remapped its input (e.g., as in a series of (sort->analytic)* nodes).
// Parent nodes have to do the same so set the composition as the outputSmap_.
outputSmap_ = ExprSubstitutionMap.compose(childSmap, outputSmap_, analyzer);
info_.substituteSortExprs(outputSmap_, analyzer);
// The debug strings are only built when trace logging is enabled.
if (LOG.isTraceEnabled()) {
LOG.trace("sort id " + tupleIds_.get(0).toString() + " smap: "
+ outputSmap_.debugString());
LOG.trace("sort input exprs: " + Expr.debugString(resolvedTupleExprs_));
/**
 * Sets cardinality_ to the first child's cardinality after passing it through
 * capCardinalityAtLimit(), and logs the result at trace level.
 * 'analyzer' is not used in the lines visible here.
 */
protected void computeStats(Analyzer analyzer) {
cardinality_ = capCardinalityAtLimit(getChild(0).cardinality_);
if (LOG.isTraceEnabled()) {
LOG.trace("stats Sort: cardinality=" + Long.toString(cardinality_));
/**
 * Builds a debug representation listing the sort type, the ordering exprs, the
 * per-expr direction flags, the nulls-first flags and the offset.
 */
protected String debugString() {
List<String> strings = new ArrayList<>();
// One letter per ordering expr: "a" for ascending, "d" for descending.
for (Boolean isAsc : info_.getIsAscOrder()) {
strings.add(isAsc ? "a" : "d");
return MoreObjects.toStringHelper(this)
.add("type_", type_)
.add("ordering_exprs", Expr.debugString(info_.getSortExprs()))
.add("is_asc", "[" + Joiner.on(" ").join(strings) + "]")
.add("nulls_first", "[" + Joiner.on(" ").join(info_.getNullsFirst()) + "]")
.add("offset_", offset_)
/**
 * Fills 'msg' with this node's Thrift form: node type SORT_NODE plus a TSortNode that
 * carries the sort info and the sort type.
 */
protected void toThrift(TPlanNode msg) {
// A TOPN node without a limit is rejected, as is a negative offset.
Preconditions.checkState(!isTypeTopN() || hasLimit(), "Top-N must have limit");
Preconditions.checkState(offset_ >= 0);
// SORT_NODE is assigned whatever type_ is; type_ itself travels inside the TSortNode.
msg.node_type = TPlanNodeType.SORT_NODE;
TSortInfo sort_info = new TSortInfo(Expr.treesToThrift(info_.getSortExprs()),
info_.getIsAscOrder(), info_.getNullsFirst(), info_.getSortingOrder());
Preconditions.checkState(tupleIds_.size() == 1,
"Incorrect size for tupleIds_ in SortNode");
TSortNode sort_node = new TSortNode(sort_info, type_);
msg.sort_node = sort_node;
/**
 * Builds this node's explain text. The header line "<prefix><id>:<displayName><detail>"
 * is always written; an "order by: " section is added from STANDARD level up and a
 * "materialized: " section from EXTENDED level up.
 */
protected String getNodeExplainString(String prefix, String detailPrefix,
TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
output.append(String.format("%s%s:%s%s\n", prefix, id_.toString(),
displayName_, getNodeExplainDetail(detailLevel)));
// Explain levels are compared by enum ordinal.
if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
output.append(detailPrefix + "order by: ");
// NOTE(review): the next line reads as the tail of a call whose opening is not
// visible here.
info_.getIsAscOrder(), info_.getNullsFirstParams(), info_.getSortingOrder()));
if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
List<Expr> nonSlotRefExprs = new ArrayList<>();
for (Expr e: info_.getMaterializedExprs()) {
// Plain slot references are skipped.
if (e instanceof SlotRef) continue;
if (!nonSlotRefExprs.isEmpty()) {
output.append(detailPrefix + "materialized: ");
for (int i = 0; i < nonSlotRefExprs.size(); ++i) {
// Separator goes before every entry except the first.
if (i > 0) output.append(", ");
return output.toString();
/**
 * Returns the bracketed suffix for the explain header line: "" when the node has no
 * limit, " [LIMIT=n OFFSET=m]" when it also has an offset, otherwise " [LIMIT=n]".
 * 'detailLevel' is not consulted in the lines visible here.
 */
private String getNodeExplainDetail(TExplainLevel detailLevel) {
if (!hasLimit()) return "";
if (hasOffset()) {
return String.format(" [LIMIT=%s OFFSET=%s]", limit_, offset_);
} else {
return String.format(" [LIMIT=%s]", limit_);
/**
 * Returns "<prefix>offset: <offset_>" followed by a newline when offset_ is non-zero,
 * otherwise the empty string.
 */
protected String getOffsetExplainString(String prefix) {
return offset_ != 0 ? prefix + "offset: " + Long.toString(offset_) + "\n" : "";
/**
 * Computes nodeResourceProfile_ for this sort. A TOPN node gets a no-reservation
 * profile sized from the SortInfo estimate; the other types derive a per-instance
 * memory estimate and minimum reservation from the estimated input size and the
 * spillable buffer size taken from 'queryOptions'.
 */
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
if (type_ == TSortType.TOPN) {
nodeResourceProfile_ = ResourceProfile.noReservation(
getSortInfo().estimateTopNMaterializedSize(cardinality_, offset_));
// NOTE(review): no return/else is visible after the TOPN branch in this view —
// confirm that the code below is skipped for TOPN.
// For an external sort, set the memory cost to be what is required for a 2-phase
// sort. If the input to be sorted would take up N blocks in memory, then the
// memory required for a 2-phase sort is sqrt(N) blocks. A single run would be of
// size sqrt(N) blocks, and we could merge sqrt(N) such runs with sqrt(N) blocks
// of memory.
double fullInputSize = getChild(0).cardinality_ * avgRowSize_;
// A negative product is recorded as -1; otherwise the size is rounded up.
estimatedFullInputSize_ = fullInputSize < 0 ? -1 : (long) Math.ceil(fullInputSize);
// True once any materialized slot of the sort tuple has a non-fixed-length type.
boolean usesVarLenBlocks = false;
for (SlotDescriptor slotDesc: info_.getSortTupleDescriptor().getSlots()) {
if (slotDesc.isMaterialized() && !slotDesc.getType().isFixedLengthType()) {
usesVarLenBlocks = true;
// Sort uses a single buffer size - either the default spillable buffer size or the
// smallest buffer size required to fit the maximum row size.
long bufferSize = computeMaxSpillableBufferSize(
queryOptions.getDefault_spillable_buffer_size(), queryOptions.getMax_row_size());
// The external sorter writes fixed-len and var-len data in separate sequences of
// pages on disk and reads from both sequences when merging. This effectively
// doubles the number of pages required when there are var-len columns present.
// Must be kept in sync with ComputeMinReservation() in Sorter in be.
int pageMultiplier = usesVarLenBlocks ? 2 : 1;
long perInstanceMemEstimate;
long perInstanceMinMemReservation;
if (type_ == TSortType.PARTIAL) {
// The memory limit cannot be less than the size of the required blocks.
long mem_limit = Math.max(PARTIAL_SORT_MEM_LIMIT, bufferSize * pageMultiplier);
// 'fullInputSize' will be negative if stats are missing, just use the limit.
perInstanceMemEstimate = fullInputSize < 0 ?
mem_limit :
Math.min((long) Math.ceil(fullInputSize), mem_limit);
perInstanceMinMemReservation = bufferSize * pageMultiplier;
} else {
// Estimate is bufferSize * ceil(sqrt(number of input blocks)), following the 2-phase
// sort reasoning above; the minimum reservation is three buffers per page sequence.
double numInputBlocks = Math.ceil(fullInputSize / (bufferSize * pageMultiplier));
perInstanceMemEstimate =
bufferSize * (long) Math.ceil(Math.sqrt(numInputBlocks));
perInstanceMinMemReservation = 3 * bufferSize * pageMultiplier;
nodeResourceProfile_ = new ResourceProfileBuilder()
/**
 * Maps a sort type to its display name: TOPN gives "TOP-N", PARTIAL gives
 * "PARTIAL SORT" and TOTAL gives "SORT". Any other value fails the checkState.
 */
private static String getDisplayName(TSortType type) {
if (type == TSortType.TOPN) {
return "TOP-N";
} else if (type == TSortType.PARTIAL) {
return "PARTIAL SORT";
} else {
// Only TOTAL is expected to reach this branch.
Preconditions.checkState(type == TSortType.TOTAL);
return "SORT";