// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.planner;

import java.util.ArrayList;
import java.util.List;
import org.apache.impala.analysis.AggregateInfo;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.ExprSubstitutionMap;
import org.apache.impala.analysis.FunctionCallExpr;
import org.apache.impala.analysis.FunctionName;
import org.apache.impala.analysis.FunctionParams;
import org.apache.impala.analysis.SlotDescriptor;
import org.apache.impala.analysis.SlotRef;
import org.apache.impala.analysis.TupleDescriptor;
import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.NotImplementedException;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TScanRangeSpec;
import org.apache.impala.thrift.TTableStats;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;

/**
* Representation of the common elements of all scan nodes.
*/
public abstract class ScanNode extends PlanNode {
// Factor capturing the worst-case deviation from a uniform distribution of scan ranges
// among nodes. The factor of 1.2 means that a particular node may have 20% more
// scan ranges than would have been estimated assuming a uniform distribution.
// Used for HDFS and Kudu Scan node estimations.
protected static final double SCAN_RANGE_SKEW_FACTOR = 1.2;
protected final TupleDescriptor desc_;
  // Total number of rows this node is expected to process.
protected long inputCardinality_ = -1;
// Scan-range specs. Populated in init().
protected TScanRangeSpec scanRangeSpecs_;
// The AggregationInfo from the query block of this scan node. Used for determining if
// the count(*) optimization can be applied.
// Count(*) aggregation optimization flow:
// The caller passes in an AggregateInfo to the constructor that this scan node uses to
// determine whether to apply the optimization or not. The produced smap must then be
// applied to the AggregateInfo in this query block. We do not apply the smap in this
// class directly to avoid side effects and make it easier to reason about.
protected AggregateInfo aggInfo_ = null;
protected static final String STATS_NUM_ROWS = "stats: num_rows";
// Should be applied to the AggregateInfo from the same query block. We cannot use the
// PlanNode.outputSmap_ for this purpose because we don't want the smap entries to be
// propagated outside the query block.
protected ExprSubstitutionMap optimizedAggSmap_;
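  // Sketch of the optimization flow from a subclass's perspective (the call sites
  // are illustrative; this base class only produces the smap):
  //
  //   if (canApplyCountStarOptimization(analyzer)) {
  //     applyCountStarOptimization(analyzer);  // populates optimizedAggSmap_
  //   }
  //   // The planner then applies optimizedAggSmap_ to this query block's
  //   // AggregateInfo, rewriting count(*) into sum_init_zero(<stats slot>).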
public ScanNode(PlanNodeId id, TupleDescriptor desc, String displayName) {
super(id, desc.getId().asList(), displayName);
desc_ = desc;
}
public TupleDescriptor getTupleDesc() { return desc_; }
/**
* Checks if this scan is supported based on the types of scanned columns and the
* underlying file formats, in particular, whether complex types are supported.
*
 * The default implementation throws if this scan would need to materialize a nested
 * field or collection. A scan of a table whose schema contains complex types is
 * allowed as long as the query does not reference them.
*
* Subclasses should override this function as appropriate.
*/
protected void checkForSupportedFileFormats() throws NotImplementedException {
Preconditions.checkNotNull(desc_);
Preconditions.checkNotNull(desc_.getTable());
for (SlotDescriptor slotDesc: desc_.getSlots()) {
if (slotDesc.getType().isComplexType() || slotDesc.getColumn() == null) {
Preconditions.checkNotNull(slotDesc.getPath());
throw new NotImplementedException(String.format(
"Scan of table '%s' is not supported because '%s' references a nested " +
"field/collection.\nComplex types are supported for these file formats: %s.",
          desc_.getAlias(), slotDesc.getPath().toString(),
Joiner.on(", ").join(HdfsFileFormat.complexTypesFormats())));
}
}
}
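  // For example (illustrative schema): with a text-format table
  // "t(id INT, addr ARRAY<STRING>)", a query like "SELECT a.item FROM t, t.addr a"
  // must materialize a collection and throws here, while "SELECT id FROM t" scans
  // the same table without referencing 'addr' and passes the check.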
protected boolean isCountStarOptimizationDescriptor(SlotDescriptor desc) {
    return STATS_NUM_ROWS.equals(desc.getLabel());
}
/**
* Adds a new slot descriptor to the tuple descriptor of this scan. The new slot will be
* used for storing the data extracted from the Kudu num rows statistic. Also adds an
* entry to 'optimizedAggSmap_' that substitutes count(*) with
* sum_init_zero(<new-slotref>). Returns the new slot descriptor.
*/
protected SlotDescriptor applyCountStarOptimization(Analyzer analyzer) {
FunctionCallExpr countFn = new FunctionCallExpr(new FunctionName("count"),
FunctionParams.createStarParam());
countFn.analyzeNoThrow(analyzer);
    // Create a slot to hold the num-rows value extracted by the scanner.
    SlotDescriptor sd = analyzer.addSlotDescriptor(getTupleDesc());
    sd.setType(Type.BIGINT);
    sd.setIsMaterialized(true);
    sd.setIsNullable(false);
    sd.setLabel(STATS_NUM_ROWS);
    // Create the sum function over the new slot.
    List<Expr> args = new ArrayList<>();
    args.add(new SlotRef(sd));
    FunctionCallExpr sumFn = new FunctionCallExpr("sum_init_zero", args);
    sumFn.analyzeNoThrow(analyzer);
optimizedAggSmap_ = new ExprSubstitutionMap();
optimizedAggSmap_.put(countFn, sumFn);
return sd;
}
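  // After applyCountStarOptimization() runs, the smap holds a single entry,
  // conceptually: count(*) -> sum_init_zero(<SlotRef of the new BIGINT slot>).
  // sum_init_zero behaves like sum() but starts from 0 instead of NULL, so an
  // empty scan still yields 0, matching count(*) semantics.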
/**
* Returns true if the count(*) optimization can be applied to the query block
* of this scan node.
*/
protected boolean canApplyCountStarOptimization(Analyzer analyzer) {
if (analyzer.getNumTableRefs() != 1) return false;
if (aggInfo_ == null || !aggInfo_.hasCountStarOnly()) return false;
if (!conjuncts_.isEmpty()) return false;
return desc_.getMaterializedSlots().isEmpty() || desc_.hasClusteringColsOnly();
}
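  // Examples (illustrative): "SELECT count(*) FROM t" qualifies, while these do not:
  //   SELECT count(*) FROM t WHERE c > 0   -- conjuncts assigned to the scan
  //   SELECT count(*), max(c) FROM t       -- not a count(*)-only aggregation
  //   SELECT count(*) FROM t1, t2          -- more than one table ref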
/**
* Returns all scan range specs.
*/
public TScanRangeSpec getScanRangeSpecs() {
Preconditions.checkNotNull(scanRangeSpecs_, "Need to call init() first.");
return scanRangeSpecs_;
}
@Override
protected String debugString() {
return Objects.toStringHelper(this)
.add("tid", desc_.getId().asInt())
.add("tblName", desc_.getTable().getFullName())
.add("keyRanges", "")
.addValue(super.debugString())
.toString();
}
/**
 * Returns the explain string for table stats to be included in this ScanNode's
* explain string. The prefix is prepended to each returned line for proper formatting
* when the string returned by this method is embedded in a query's explain plan.
*/
protected String getTableStatsExplainString(String prefix) {
TTableStats tblStats = desc_.getTable().getTTableStats();
return new StringBuilder()
.append(prefix)
.append("table: rows=")
.append(PrintUtils.printEstCardinality(tblStats.num_rows))
.toString();
}
/**
 * Returns the explain string for column stats to be included in this ScanNode's
* explain string. The prefix is prepended to each returned line for proper formatting
* when the string returned by this method is embedded in a query's explain plan.
*/
protected String getColumnStatsExplainString(String prefix) {
StringBuilder output = new StringBuilder();
List<String> columnsMissingStats = new ArrayList<>();
for (SlotDescriptor slot: desc_.getSlots()) {
if (!slot.getStats().hasStats() && slot.getColumn() != null) {
columnsMissingStats.add(slot.getColumn().getName());
}
}
output.append(prefix);
if (columnsMissingStats.isEmpty()) {
output.append("columns: all");
} else if (columnsMissingStats.size() == desc_.getSlots().size()) {
output.append("columns: unavailable");
} else {
output.append(String.format("columns missing stats: %s",
Joiner.on(", ").join(columnsMissingStats)));
}
return output.toString();
}
/**
* Combines the explain string for table and column stats.
*/
protected String getStatsExplainString(String prefix) {
StringBuilder output = new StringBuilder(prefix);
output.append("stored statistics:\n");
prefix = prefix + " ";
output.append(getTableStatsExplainString(prefix));
output.append("\n");
output.append(getColumnStatsExplainString(prefix));
return output.toString();
}
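  // Sample output of getStatsExplainString() (row count illustrative; 'prefix'
  // controls the indentation):
  //   stored statistics:
  //     table: rows=10.00K
  //     columns: all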
/**
* Returns true if the table underlying this scan is missing table stats
* or column stats relevant to this scan node.
*/
public boolean isTableMissingStats() {
return isTableMissingColumnStats() || isTableMissingTableStats();
}
public boolean isTableMissingTableStats() {
return desc_.getTable().getNumRows() == -1;
}
/**
* Returns true if the tuple descriptor references a path with a collection type.
*/
public boolean isAccessingCollectionType() {
for (Type t: desc_.getPath().getMatchedTypes()) {
if (t.isCollectionType()) return true;
}
return false;
}
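  // For example, a scan whose path resolves through a column declared as
  // ARRAY<STRUCT<...>> (e.g. scanning "t.addr") returns true here, while a scan
  // of the plain table path "t" returns false.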
/**
 * Returns true if any column referenced by this scan is missing stats;
 * complex-type columns are skipped.
*/
public boolean isTableMissingColumnStats() {
for (SlotDescriptor slot: desc_.getSlots()) {
if (slot.getColumn() != null && !slot.getStats().hasStats() &&
!slot.getColumn().getType().isComplexType()) {
return true;
}
}
return false;
}
/**
 * Returns true if the scanned table is suspected to have corrupt table stats, in
 * particular if the scan is non-empty and 'numRows' is 0 or negative (but not -1).
*/
public boolean hasCorruptTableStats() { return false; }
/**
 * Helper function to parse a "host:port" address string into a TNetworkAddress.
 * This is called with an "ip-address:port" string when doing scan range assignment.
*/
protected static TNetworkAddress addressToTNetworkAddress(String address) {
TNetworkAddress result = new TNetworkAddress();
String[] hostPort = address.split(":");
result.hostname = hostPort[0];
result.port = Integer.parseInt(hostPort[1]);
return result;
}
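  // For example, addressToTNetworkAddress("10.0.0.1:22000") yields hostname
  // "10.0.0.1" and port 22000. The split on ':' assumes a hostname or IPv4
  // address; a bracketed IPv6 literal would not parse correctly.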
@Override
public long getInputCardinality() {
if (!hasScanConjuncts() && !hasStorageLayerConjuncts() && hasLimit()) {
return getLimit();
}
return inputCardinality_;
}
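  // For example, "SELECT * FROM t LIMIT 10" with no predicates allows the scan to
  // stop early, so the input cardinality is the limit (10) rather than the table's
  // row count.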
@Override
protected String getDisplayLabelDetail() {
Preconditions.checkNotNull(desc_.getPath());
if (desc_.hasExplicitAlias()) {
return desc_.getPath().toString() + " " + desc_.getAlias();
} else {
return desc_.getPath().toString();
}
}
/**
* Helper function that returns the estimated number of scan ranges that would
* be assigned to each host based on the total number of scan ranges.
*/
protected int estimatePerHostScanRanges(long totalNumOfScanRanges) {
return (int) Math.ceil(((double) totalNumOfScanRanges / (double) numNodes_) *
SCAN_RANGE_SKEW_FACTOR);
}
/**
* Helper function that returns the max number of scanner threads that can be
* spawned by a scan node.
*/
protected int computeMaxNumberOfScannerThreads(TQueryOptions queryOptions,
int perHostScanRanges) {
    // With mt_dop >= 1 the scan runs in its fragment instance's thread, so only a
    // single scanner thread is used per instance.
if (queryOptions.getMt_dop() >= 1) {
return 1;
}
int maxScannerThreads = Math.min(perHostScanRanges,
RuntimeEnv.INSTANCE.getNumCores());
// Account for the max scanner threads query option.
if (queryOptions.isSetNum_scanner_threads() &&
queryOptions.getNum_scanner_threads() > 0) {
maxScannerThreads = Math.min(maxScannerThreads,
queryOptions.getNum_scanner_threads());
}
return maxScannerThreads;
}
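  // Worked example (mt_dop = 0): with perHostScanRanges = 12 on an 8-core host and
  // NUM_SCANNER_THREADS = 4 set, this returns min(min(12, 8), 4) = 4.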
/**
* Returns true if this node has conjuncts to be evaluated by Impala against the scan
* tuple.
*/
public boolean hasScanConjuncts() { return !getConjuncts().isEmpty(); }
/**
* Returns true if this node has conjuncts to be evaluated by the underlying storage
* engine.
*/
public boolean hasStorageLayerConjuncts() { return false; }
}