blob: 15473207cc291a3d7e9d025da120f9f8e1064dbb [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.planner;
import java.util.ArrayList;
import java.util.List;
import org.apache.impala.analysis.AggregateInfo;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.ExprSubstitutionMap;
import org.apache.impala.analysis.FunctionCallExpr;
import org.apache.impala.analysis.FunctionName;
import org.apache.impala.analysis.FunctionParams;
import org.apache.impala.analysis.SlotDescriptor;
import org.apache.impala.analysis.SlotRef;
import org.apache.impala.analysis.TupleDescriptor;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.NotImplementedException;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TScanRangeSpec;
import org.apache.impala.thrift.TTableStats;
/**
 * Representation of the common elements of all scan nodes.
 */
abstract public class ScanNode extends PlanNode {
// Factor capturing the worst-case deviation from a uniform distribution of scan ranges
// among nodes. The factor of 1.2 means that a particular node may have 20% more
// scan ranges than would have been estimated assuming a uniform distribution.
// Used for HDFS and Kudu Scan node estimations.
protected static final double SCAN_RANGE_SKEW_FACTOR = 1.2;
// Descriptor of the tuple produced by this scan. Assigned once in the constructor.
protected final TupleDescriptor desc_;
// Total number of rows this node is expected to process. Initialized to -1.
protected long inputCardinality_ = -1;
// Scan-range specs. Populated in init().
protected TScanRangeSpec scanRangeSpecs_;
// The AggregateInfo from the query block of this scan node. Used for determining if
// the count(*) optimization can be applied.
// Count(*) aggregation optimization flow:
// The caller passes in an AggregateInfo to the constructor that this scan node uses to
// determine whether to apply the optimization or not. The produced smap must then be
// applied to the AggregateInfo in this query block. We do not apply the smap in this
// class directly to avoid side effects and make it easier to reason about.
protected AggregateInfo aggInfo_ = null;
// Slot label that isCountStarOptimizationDescriptor() compares against to recognize the
// slot belonging to the count(*) optimization.
protected static final String STATS_NUM_ROWS = "stats: num_rows";
// Substitution map created by applyCountStarOptimization(), mapping count(*) to
// sum_init_zero(<new-slotref>).
// Should be applied to the AggregateInfo from the same query block. We cannot use the
// PlanNode.outputSmap_ for this purpose because we don't want the smap entries to be
// propagated outside the query block.
protected ExprSubstitutionMap optimizedAggSmap_;
// Creates a scan node for the tuple described by 'desc'. The id of 'desc' (as a list)
// and 'displayName' are forwarded to the PlanNode constructor, and 'desc' is kept in
// 'desc_'.
public ScanNode(PlanNodeId id, TupleDescriptor desc, String displayName) {
super(id, desc.getId().asList(), displayName);
desc_ = desc;
// Accessor for the tuple descriptor that this scan node was constructed with.
public TupleDescriptor getTupleDesc() {
  return desc_;
}
/**
 * Checks if this scan is supported based on the types of scanned columns and the
 * underlying file formats, in particular, whether complex types are supported.
 * The default implementation throws if this scan would need to materialize a nested
 * field or collection. The scan is ok if the table schema contains complex types, as
 * long as the query does not reference them.
 * Subclasses should override this function as appropriate.
 */
protected void checkForSupportedFileFormats() throws NotImplementedException {
// Inspect every slot of the scan tuple; the first offending slot aborts the check.
for (SlotDescriptor slotDesc: desc_.getSlots()) {
// A slot is rejected if its type is complex or if it has no column.
if (slotDesc.getType().isComplexType() || slotDesc.getColumn() == null) {
throw new NotImplementedException(String.format(
"Scan of table '%s' is not supported because '%s' references a nested " +
"field/collection.\nComplex types are supported for these file formats: %s.",
// NOTE(review): the first '%s' (worded as the table) receives the slot path and the
// second receives the tuple alias -- confirm this argument order is intended.
slotDesc.getPath().toString(), desc_.getAlias(),
Joiner.on(", ").join(HdfsFileFormat.complexTypesFormats())));
// Returns true if the label of 'desc' equals STATS_NUM_ROWS.
// NOTE(review): this dereferences desc.getLabel() directly, so it presumably expects
// the label to be non-null -- confirm against SlotDescriptor.
protected boolean isCountStarOptimizationDescriptor(SlotDescriptor desc) {
return desc.getLabel().equals(STATS_NUM_ROWS);
/**
 * Adds a new slot descriptor to the tuple descriptor of this scan. The new slot will be
 * used for storing the data extracted from the Kudu num rows statistic. Also adds an
 * entry to 'optimizedAggSmap_' that substitutes count(*) with
 * sum_init_zero(<new-slotref>). Returns the new slot descriptor.
 */
protected SlotDescriptor applyCountStarOptimization(Analyzer analyzer) {
// The count function that serves as the key of the substitution map.
// NOTE(review): the argument list of this constructor call is cut off in this view --
// confirm against the complete file.
FunctionCallExpr countFn = new FunctionCallExpr(new FunctionName("count"),
// Create the sum function.
// The new slot is registered on this scan's tuple descriptor.
SlotDescriptor sd = analyzer.addSlotDescriptor(getTupleDesc());
// A reference to the new slot is the only argument of sum_init_zero().
List<Expr> args = new ArrayList<>();
args.add(new SlotRef(sd));
FunctionCallExpr sumFn = new FunctionCallExpr("sum_init_zero", args);
// Start from a fresh map holding just the count -> sum_init_zero entry.
optimizedAggSmap_ = new ExprSubstitutionMap();
optimizedAggSmap_.put(countFn, sumFn);
return sd;
/**
 * Returns true if the count(*) optimization can be applied to the query block
 * of this scan node.
 */
protected boolean canApplyCountStarOptimization(Analyzer analyzer) {
// The analyzer must report exactly one table ref.
if (analyzer.getNumTableRefs() != 1) return false;
// An AggregateInfo must be present and hasCountStarOnly() must hold for it.
if (aggInfo_ == null || !aggInfo_.hasCountStarOnly()) return false;
// Any conjunct on this node rules the optimization out.
if (!conjuncts_.isEmpty()) return false;
// Finally, either no slot is materialized or the tuple has clustering columns only.
return desc_.getMaterializedSlots().isEmpty() || desc_.hasClusteringColsOnly();
/**
 * Returns all scan range specs.
 */
public TScanRangeSpec getScanRangeSpecs() {
// 'scanRangeSpecs_' must have been set before this accessor is used; otherwise the
// precondition fails with the message below.
Preconditions.checkNotNull(scanRangeSpecs_, "Need to call init() first.");
return scanRangeSpecs_;
// Builds a debug representation containing the tuple id ("tid"), the full table name
// ("tblName") and an empty "keyRanges" entry.
// NOTE(review): the remainder of this builder chain is not visible in this view --
// confirm against the complete file.
protected String debugString() {
return Objects.toStringHelper(this)
.add("tid", desc_.getId().asInt())
.add("tblName", desc_.getTable().getFullName())
.add("keyRanges", "")
/**
 * Returns the explain string for table stats to be included into this ScanNode's
 * explain string. The prefix is prepended to each returned line for proper formatting
 * when the string returned by this method is embedded in a query's explain plan.
 */
protected String getTableStatsExplainString(String prefix) {
// Table-level stats are read from the table of the scan tuple.
TTableStats tblStats = desc_.getTable().getTTableStats();
// NOTE(review): only the start of the builder chain is visible in this view; how
// 'prefix' and 'tblStats' are used cannot be confirmed from here.
return new StringBuilder()
.append("table: rows=")
/**
 * Returns the explain string for column stats to be included into this ScanNode's
 * explain string. The prefix is prepended to each returned line for proper formatting
 * when the string returned by this method is embedded in a query's explain plan.
 */
protected String getColumnStatsExplainString(String prefix) {
StringBuilder output = new StringBuilder();
// Collects the columns of this tuple's slots that lack stats.
List<String> columnsMissingStats = new ArrayList<>();
for (SlotDescriptor slot: desc_.getSlots()) {
// Only slots that have a column and no stats are of interest here.
// NOTE(review): the statement that records the column, and the end of the loop, are
// not visible in this view -- confirm against the complete file.
if (!slot.getStats().hasStats() && slot.getColumn() != null) {
// Nothing was recorded as missing: stats are available for all columns.
if (columnsMissingStats.isEmpty()) {
output.append("columns: all");
// As many entries as the tuple has slots: no column stats are available.
} else if (columnsMissingStats.size() == desc_.getSlots().size()) {
output.append("columns: unavailable");
} else {
// Otherwise list the recorded entries, separated by ", ".
output.append(String.format("columns missing stats: %s",
Joiner.on(", ").join(columnsMissingStats)));
return output.toString();
/**
 * Combines the explain string for table and column stats.
 */
protected String getStatsExplainString(String prefix) {
// The header line is emitted with the caller's prefix.
StringBuilder output = new StringBuilder(prefix);
output.append("stored statistics:\n");
// Widen the prefix for whatever is appended after the header.
// NOTE(review): the appends that would use the widened prefix are not visible in this
// view -- confirm against the complete file.
prefix = prefix + " ";
return output.toString();
/**
 * Returns true if the table underlying this scan is missing table stats
 * or column stats relevant to this scan node.
 */
public boolean isTableMissingStats() {
// Column stats are checked first; table stats are only consulted if no column stats
// are missing (short-circuit evaluation).
return isTableMissingColumnStats() || isTableMissingTableStats();
// Returns true if the row count reported by the scanned table is -1.
public boolean isTableMissingTableStats() {
return desc_.getTable().getNumRows() == -1;
/**
 * Returns true if the tuple descriptor references a path with a collection type.
 */
public boolean isAccessingCollectionType() {
// Walk the types matched along the tuple descriptor's path; a single collection type
// is enough to answer true.
for (Type t: desc_.getPath().getMatchedTypes()) {
if (t.isCollectionType()) return true;
return false;
/**
 * Returns true if any slot backed by a column of non-complex type has no column stats.
 * Slots without a column and columns of complex type are skipped.
 */
public boolean isTableMissingColumnStats() {
for (SlotDescriptor slot: desc_.getSlots()) {
// A slot counts as missing stats only if it has a column, has no stats, and the
// column's type is not complex.
if (slot.getColumn() != null && !slot.getStats().hasStats() &&
!slot.getColumn().getType().isComplexType()) {
return true;
return false;
/**
 * Returns true if the scanned table is suspected to have corrupt table stats,
 * in particular, if the scan is non-empty and 'numRows' is 0 or negative (but not -1).
 * This base implementation always returns false.
 */
public boolean hasCorruptTableStats() {
  // This implementation performs no inspection of the stats and never reports
  // corruption.
  return false;
}
/**
 * Helper function to parse a "host:port" address string into a TNetworkAddress.
 * This is called with ipaddress:port when doing scan range assignment.
 */
protected static TNetworkAddress addressToTNetworkAddress(String address) {
TNetworkAddress result = new TNetworkAddress();
// Split on every ':'; element 0 is taken as the host and element 1 as the port.
// NOTE(review): an address without ':' makes hostPort[1] throw
// ArrayIndexOutOfBoundsException, a non-numeric port makes parseInt throw
// NumberFormatException, and an address with more than one ':' (e.g. an IPv6
// literal) would be split at the first ':' -- confirm callers never pass such input.
String[] hostPort = address.split(":");
result.hostname = hostPort[0];
result.port = Integer.parseInt(hostPort[1]);
return result;
// Returns the number of rows this node is expected to process.
public long getInputCardinality() {
// With a limit and no scan or storage-layer conjuncts, the limit is returned instead
// of 'inputCardinality_'.
if (!hasScanConjuncts() && !hasStorageLayerConjuncts() && hasLimit()) {
return getLimit();
return inputCardinality_;
// Returns the tuple descriptor's path, followed by a space and the alias when the
// descriptor has an explicit alias.
protected String getDisplayLabelDetail() {
// NOTE(review): 'table' and 'path' are not used by any line visible in this view --
// confirm against the complete file whether intervening statements use them.
FeTable table = desc_.getTable();
List<String> path = new ArrayList<>();
if (desc_.hasExplicitAlias()) {
return desc_.getPath().toString() + " " + desc_.getAlias();
} else {
return desc_.getPath().toString();
/**
 * Helper function that returns the estimated number of scan ranges that would
 * be assigned to each host based on the total number of scan ranges.
 */
protected int estimatePerHostScanRanges(long totalNumOfScanRanges) {
// Floating-point division of the total by 'numNodes_', rounded up after scaling.
// NOTE(review): the right-hand operand of the multiplication is cut off in this view
// -- confirm against the complete file.
return (int) Math.ceil(((double) totalNumOfScanRanges / (double) numNodes_) *
/**
 * Helper function that returns the max number of scanner threads that can be
 * spawned by a scan node.
 */
protected int computeMaxNumberOfScannerThreads(TQueryOptions queryOptions,
int perHostScanRanges) {
// The non-MT scan node requires at least one scanner thread.
// When mt_dop is 1 or higher, a single scanner thread is reported.
if (queryOptions.getMt_dop() >= 1) {
return 1;
// Otherwise the thread count is capped by the number of scan ranges per host.
// NOTE(review): the second argument of this Math.min call is cut off in this view --
// confirm against the complete file.
int maxScannerThreads = Math.min(perHostScanRanges,
// Account for the max scanner threads query option.
// The option only applies when it is set and positive.
if (queryOptions.isSetNum_scanner_threads() &&
queryOptions.getNum_scanner_threads() > 0) {
// NOTE(review): the second argument of this Math.min call is cut off in this view.
maxScannerThreads = Math.min(maxScannerThreads,
return maxScannerThreads;
/**
 * Returns true if this node has conjuncts to be evaluated by Impala against the scan
 * tuple.
 */
public boolean hasScanConjuncts() {
  // The node has scan conjuncts exactly when its conjunct list is non-empty.
  final boolean noConjuncts = getConjuncts().isEmpty();
  return !noConjuncts;
}
/**
 * Returns true if this node has conjuncts to be evaluated by the underlying storage
 * engine.
 */
public boolean hasStorageLayerConjuncts() {
  // This implementation reports no storage-layer conjuncts.
  return false;
}