blob: ec19602659b1f832adc9a01f7d41e5ecc8b5c4bf [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.planner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import org.apache.impala.analysis.AggregateInfo;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.BinaryPredicate;
import org.apache.impala.analysis.BoolLiteral;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.ExprSubstitutionMap;
import org.apache.impala.analysis.FunctionCallExpr;
import org.apache.impala.analysis.FunctionName;
import org.apache.impala.analysis.FunctionParams;
import org.apache.impala.analysis.InPredicate;
import org.apache.impala.analysis.IsNullPredicate;
import org.apache.impala.analysis.LiteralExpr;
import org.apache.impala.analysis.NumericLiteral;
import org.apache.impala.analysis.SlotDescriptor;
import org.apache.impala.analysis.SlotRef;
import org.apache.impala.analysis.StringLiteral;
import org.apache.impala.analysis.TupleDescriptor;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.KuduColumn;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TKuduScanNode;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TScanRange;
import org.apache.impala.thrift.TScanRangeLocation;
import org.apache.impala.thrift.TScanRangeLocationList;
import org.apache.impala.thrift.TScanRangeSpec;
import org.apache.impala.util.KuduUtil;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduPredicate.ComparisonOp;
import org.apache.kudu.client.KuduScanToken;
import org.apache.kudu.client.KuduScanToken.KuduScanTokenBuilder;
import org.apache.kudu.client.LocatedTablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
/**
* Scan of a single Kudu table.
*
* Extracts predicates that can be pushed down to Kudu. Currently only binary predicates
* that have a constant expression on one side and a slot ref on the other can be
* evaluated by Kudu.
*
* Uses the Kudu ScanToken API to generate a set of Kudu "scan tokens" which are used for
* scheduling and initializing the scanners. Scan tokens are opaque objects that represent
* a scan for some Kudu data on a tablet (currently one token represents one tablet), and
* it contains the tablet locations and all information needed to produce a Kudu scanner,
* including the projected columns and predicates that are pushed down.
*
* After KUDU-1065 is resolved, Kudu will also prune the tablets that don't need to be
* scanned, and only the tokens for those tablets will be returned.
*/
public class KuduScanNode extends ScanNode {
private final static Logger LOG = LoggerFactory.getLogger(KuduScanNode.class);
private final FeKuduTable kuduTable_;
// True if this scan node should use the MT implementation in the backend.
private boolean useMtScanNode_;
// Indexes for the set of hosts that will be used for the query.
// From analyzer.getHostIndex().getIndex(address)
private final Set<Integer> hostIndexSet_ = new HashSet<>();
// List of conjuncts that can be pushed down to Kudu. Used for computing stats and
// explain strings.
private final List<Expr> kuduConjuncts_ = new ArrayList<>();
// Exprs in kuduConjuncts_ converted to KuduPredicates.
private final List<KuduPredicate> kuduPredicates_ = new ArrayList<>();
// Slot that is used to record the Kudu metadata for the count(*) aggregation if
// this scan node has the count(*) optimization enabled.
private SlotDescriptor countStarSlot_ = null;
public KuduScanNode(PlanNodeId id, TupleDescriptor desc, List<Expr> conjuncts,
AggregateInfo aggInfo) {
super(id, desc, "SCAN KUDU");
kuduTable_ = (FeKuduTable) desc_.getTable();
conjuncts_ = conjuncts;
aggInfo_ = aggInfo;
}
public ExprSubstitutionMap getOptimizedAggSmap() { return optimizedAggSmap_; }
@Override
public void init(Analyzer analyzer) throws ImpalaRuntimeException {
conjuncts_ = orderConjunctsByCost(conjuncts_);
KuduClient client = KuduUtil.getKuduClient(kuduTable_.getKuduMasterHosts());
try {
org.apache.kudu.client.KuduTable rpcTable =
client.openTable(kuduTable_.getKuduTableName());
validateSchema(rpcTable);
if (canApplyCountStarOptimization(analyzer)) {
Preconditions.checkState(desc_.getPath().destTable() != null);
Preconditions.checkState(kuduConjuncts_.isEmpty());
countStarSlot_ = applyCountStarOptimization(analyzer);
}
// Extract predicates that can be evaluated by Kudu.
extractKuduConjuncts(analyzer, client, rpcTable);
// Materialize the slots of the remaining conjuncts (i.e. those not pushed to Kudu)
analyzer.materializeSlots(conjuncts_);
// Compute mem layout before the scan range locations because creation of the Kudu
// scan tokens depends on having a mem layout.
computeMemLayout(analyzer);
// Creates Kudu scan tokens and sets the scan range locations.
computeScanRangeLocations(analyzer, client, rpcTable);
} catch (Exception e) {
throw new ImpalaRuntimeException("Unable to initialize the Kudu scan node", e);
}
// Determine backend scan node implementation to use.
if (analyzer.getQueryOptions().isSetMt_dop() &&
analyzer.getQueryOptions().mt_dop > 0) {
useMtScanNode_ = true;
} else {
useMtScanNode_ = false;
}
computeStats(analyzer);
}
/**
* Validate the columns Impala expects are actually in the Kudu table.
*/
private void validateSchema(org.apache.kudu.client.KuduTable rpcTable)
throws ImpalaRuntimeException {
Schema tableSchema = rpcTable.getSchema();
for (SlotDescriptor desc: getTupleDesc().getSlots()) {
if (!desc.isScanSlot()) continue;
String colName = ((KuduColumn) desc.getColumn()).getKuduName();
Type colType = desc.getColumn().getType();
ColumnSchema kuduCol = null;
try {
kuduCol = tableSchema.getColumn(colName);
} catch (Exception e) {
throw new ImpalaRuntimeException("Column '" + colName + "' not found in kudu " +
"table " + rpcTable.getName() + ". The table metadata in Impala may be " +
"outdated and need to be refreshed.");
}
Type kuduColType =
KuduUtil.toImpalaType(kuduCol.getType(), kuduCol.getTypeAttributes());
if (!colType.equals(kuduColType)) {
throw new ImpalaRuntimeException("Column '" + colName + "' is type " +
kuduColType.toSql() + " but Impala expected " + colType.toSql() +
". The table metadata in Impala may be outdated and need to be refreshed.");
}
if (desc.getIsNullable() != kuduCol.isNullable()) {
String expected;
String actual;
if (desc.getIsNullable()) {
expected = "nullable";
actual = "not nullable";
} else {
expected = "not nullable";
actual = "nullable";
}
throw new ImpalaRuntimeException("Column '" + colName + "' is " + actual +
" but Impala expected it to be " + expected +
". The table metadata in Impala may be outdated and need to be refreshed.");
}
}
}
/**
* Compute the scan range locations for the given table using the scan tokens.
*/
private void computeScanRangeLocations(Analyzer analyzer,
KuduClient client, org.apache.kudu.client.KuduTable rpcTable)
throws ImpalaRuntimeException {
scanRangeSpecs_ = new TScanRangeSpec();
List<KuduScanToken> scanTokens = createScanTokens(client, rpcTable);
for (KuduScanToken token: scanTokens) {
LocatedTablet tablet = token.getTablet();
List<TScanRangeLocation> locations = new ArrayList<>();
if (tablet.getReplicas().isEmpty()) {
throw new ImpalaRuntimeException(String.format(
"At least one tablet does not have any replicas. Tablet ID: %s",
new String(tablet.getTabletId(), Charsets.UTF_8)));
}
for (LocatedTablet.Replica replica: tablet.getReplicas()) {
TNetworkAddress address =
new TNetworkAddress(replica.getRpcHost(), replica.getRpcPort());
// Use the network address to look up the host in the global list
Integer hostIndex = analyzer.getHostIndex().getIndex(address);
locations.add(new TScanRangeLocation(hostIndex));
hostIndexSet_.add(hostIndex);
}
TScanRange scanRange = new TScanRange();
try {
scanRange.setKudu_scan_token(token.serialize());
} catch (IOException e) {
throw new ImpalaRuntimeException("Unable to serialize Kudu scan token=" +
token.toString(), e);
}
TScanRangeLocationList locs = new TScanRangeLocationList();
locs.setScan_range(scanRange);
locs.setLocations(locations);
scanRangeSpecs_.addToConcrete_ranges(locs);
}
}
/**
* Returns KuduScanTokens for this scan given the projected columns and predicates that
* will be pushed to Kudu. The projected Kudu columns are ordered by offset in an
* Impala tuple to make the Impala and Kudu tuple layouts identical.
*/
private List<KuduScanToken> createScanTokens(KuduClient client,
org.apache.kudu.client.KuduTable rpcTable) {
List<String> projectedCols = new ArrayList<>();
for (SlotDescriptor desc: getTupleDesc().getSlotsOrderedByOffset()) {
if (!isCountStarOptimizationDescriptor(desc)) {
projectedCols.add(((KuduColumn) desc.getColumn()).getKuduName());
}
}
KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(rpcTable);
tokenBuilder.setProjectedColumnNames(projectedCols);
for (KuduPredicate predicate: kuduPredicates_) tokenBuilder.addPredicate(predicate);
return tokenBuilder.build();
}
@Override
protected double computeSelectivity() {
List<Expr> allConjuncts = Lists.newArrayList(
Iterables.concat(conjuncts_, kuduConjuncts_));
return computeCombinedSelectivity(allConjuncts);
}
@Override
protected void computeStats(Analyzer analyzer) {
super.computeStats(analyzer);
// Update the number of nodes to reflect the hosts that have relevant data.
numNodes_ = Math.max(1, hostIndexSet_.size());
// Update the cardinality
inputCardinality_ = cardinality_ = kuduTable_.getNumRows();
cardinality_ = applyConjunctsSelectivity(cardinality_);
cardinality_ = capCardinalityAtLimit(cardinality_);
if (LOG.isTraceEnabled()) {
LOG.trace("computeStats KuduScan: cardinality=" + Long.toString(cardinality_));
}
}
@Override
public void computeNodeResourceProfile(TQueryOptions queryOptions) {
// The bulk of memory used by Kudu scan node is generally utilized by the
// RowbatchQueue plus the row batches filled in by the scanner threads and
// waiting to be queued into the RowbatchQueue. Due to a number of factors
// like variable length string columns, mem pool usage pattern,
// variability of the number of scanner threads being spawned and the
// variability of the average RowbatchQueue size, it is increasingly
// difficult to precisely estimate the memory usage. Therefore, we fall back
// to a more simpler approach of using empirically derived estimates.
int numOfScanRanges = scanRangeSpecs_.getConcrete_rangesSize();
int perHostScanRanges = estimatePerHostScanRanges(numOfScanRanges);
int maxScannerThreads = computeMaxNumberOfScannerThreads(queryOptions,
perHostScanRanges);
int num_cols = desc_.getSlots().size();
long estimated_bytes_per_column_per_thread = BackendConfig.INSTANCE.getBackendCfg().
kudu_scanner_thread_estimated_bytes_per_column;
long max_estimated_bytes_per_thread = BackendConfig.INSTANCE.getBackendCfg().
kudu_scanner_thread_max_estimated_bytes;
long mem_estimate_per_thread = Math.min(num_cols *
estimated_bytes_per_column_per_thread, max_estimated_bytes_per_thread);
nodeResourceProfile_ = new ResourceProfileBuilder()
.setMemEstimateBytes(mem_estimate_per_thread * maxScannerThreads)
.setThreadReservation(useMtScanNode_ ? 0 : 1).build();
}
@Override
protected String getNodeExplainString(String prefix, String detailPrefix,
TExplainLevel detailLevel) {
StringBuilder result = new StringBuilder();
String aliasStr = desc_.hasExplicitAlias() ? " " + desc_.getAlias() : "";
result.append(String.format("%s%s:%s [%s%s]\n", prefix, id_.toString(), displayName_,
kuduTable_.getFullName(), aliasStr));
switch (detailLevel) {
case MINIMAL: break;
case STANDARD: // Fallthrough intended.
case EXTENDED: // Fallthrough intended.
case VERBOSE: {
if (!conjuncts_.isEmpty()) {
result.append(detailPrefix
+ "predicates: " + Expr.getExplainString(conjuncts_, detailLevel) + "\n");
}
if (!kuduConjuncts_.isEmpty()) {
result.append(detailPrefix + "kudu predicates: "
+ Expr.getExplainString(kuduConjuncts_, detailLevel) + "\n");
}
if (!runtimeFilters_.isEmpty()) {
result.append(detailPrefix + "runtime filters: ");
result.append(getRuntimeFilterExplainString(false, detailLevel));
}
}
}
return result.toString();
}
@Override
protected void toThrift(TPlanNode node) {
node.node_type = TPlanNodeType.KUDU_SCAN_NODE;
node.kudu_scan_node = new TKuduScanNode(desc_.getId().asInt());
node.kudu_scan_node.setUse_mt_scan_node(useMtScanNode_);
Preconditions.checkState((optimizedAggSmap_ == null) == (countStarSlot_ == null));
if (countStarSlot_ != null) {
node.kudu_scan_node.setCount_star_slot_offset(countStarSlot_.getByteOffset());
}
}
/**
* Extracts predicates from conjuncts_ that can be pushed down to Kudu. Currently only
* binary predicates that have a constant expression on one side and a slot ref on the
* other can be evaluated by Kudu. Only looks at comparisons of constants (i.e., the
* bounds of the result can be evaluated with Expr::GetValue(NULL)). If a conjunct can
* be converted into this form, the normalized expr is added to kuduConjuncts_, a
* KuduPredicate is added to kuduPredicates_, and the original expr from conjuncts_ is
* removed.
*/
private void extractKuduConjuncts(Analyzer analyzer,
KuduClient client, org.apache.kudu.client.KuduTable rpcTable) {
ListIterator<Expr> it = conjuncts_.listIterator();
while (it.hasNext()) {
Expr predicate = it.next();
if (tryConvertBinaryKuduPredicate(analyzer, rpcTable, predicate) ||
tryConvertInListKuduPredicate(analyzer, rpcTable, predicate) ||
tryConvertIsNullKuduPredicate(analyzer, rpcTable, predicate)) {
it.remove();
}
}
}
/**
* If 'expr' can be converted to a KuduPredicate, returns true and updates
* kuduPredicates_ and kuduConjuncts_.
*/
private boolean tryConvertBinaryKuduPredicate(Analyzer analyzer,
org.apache.kudu.client.KuduTable table, Expr expr) {
if (!(expr instanceof BinaryPredicate)) return false;
BinaryPredicate predicate = (BinaryPredicate) expr;
// TODO KUDU-931 look into handling implicit/explicit casts on the SlotRef.
ComparisonOp op = getKuduOperator(predicate.getOp());
if (op == null) return false;
if (!(predicate.getChild(0) instanceof SlotRef)) return false;
SlotRef ref = (SlotRef) predicate.getChild(0);
if (!(predicate.getChild(1) instanceof LiteralExpr)) return false;
LiteralExpr literal = (LiteralExpr) predicate.getChild(1);
// Cannot push predicates with null literal values (KUDU-1595).
if (Expr.IS_NULL_LITERAL.apply(literal)) return false;
String colName = ((KuduColumn) ref.getDesc().getColumn()).getKuduName();
ColumnSchema column = table.getSchema().getColumn(colName);
KuduPredicate kuduPredicate = null;
switch (literal.getType().getPrimitiveType()) {
case BOOLEAN: {
kuduPredicate = KuduPredicate.newComparisonPredicate(column, op,
((BoolLiteral)literal).getValue());
break;
}
case TINYINT:
case SMALLINT:
case INT: {
kuduPredicate = KuduPredicate.newComparisonPredicate(column, op,
((NumericLiteral)literal).getIntValue());
break;
}
case BIGINT: {
kuduPredicate = KuduPredicate.newComparisonPredicate(column, op,
((NumericLiteral)literal).getLongValue());
break;
}
case FLOAT: {
kuduPredicate = KuduPredicate.newComparisonPredicate(column, op,
(float)((NumericLiteral)literal).getDoubleValue());
break;
}
case DOUBLE: {
kuduPredicate = KuduPredicate.newComparisonPredicate(column, op,
((NumericLiteral)literal).getDoubleValue());
break;
}
case STRING:
case VARCHAR:
case CHAR: {
kuduPredicate = KuduPredicate.newComparisonPredicate(column, op,
((StringLiteral)literal).getUnescapedValue());
break;
}
case TIMESTAMP: {
try {
// TODO: Simplify when Impala supports a 64-bit TIMESTAMP type.
kuduPredicate = KuduPredicate.newComparisonPredicate(column, op,
KuduUtil.timestampToUnixTimeMicros(analyzer, literal));
} catch (Exception e) {
LOG.info("Exception converting Kudu timestamp predicate: " + expr.toSql(), e);
return false;
}
break;
}
case DECIMAL: {
kuduPredicate = KuduPredicate.newComparisonPredicate(column, op,
((NumericLiteral)literal).getValue());
break;
}
default: break;
}
if (kuduPredicate == null) return false;
kuduConjuncts_.add(predicate);
kuduPredicates_.add(kuduPredicate);
return true;
}
/**
* If the InList 'expr' can be converted to a KuduPredicate, returns true and updates
* kuduPredicates_ and kuduConjuncts_.
*/
private boolean tryConvertInListKuduPredicate(Analyzer analyzer,
org.apache.kudu.client.KuduTable table, Expr expr) {
if (!(expr instanceof InPredicate)) return false;
InPredicate predicate = (InPredicate) expr;
// Only convert IN predicates, i.e. cannot convert NOT IN.
if (predicate.isNotIn()) return false;
// Do not convert if there is an implicit cast.
if (!(predicate.getChild(0) instanceof SlotRef)) return false;
SlotRef ref = (SlotRef) predicate.getChild(0);
// KuduPredicate takes a list of values as Objects.
List<Object> values = new ArrayList<>();
for (int i = 1; i < predicate.getChildren().size(); ++i) {
if (!Expr.IS_LITERAL.apply(predicate.getChild(i))) return false;
LiteralExpr literal = (LiteralExpr) predicate.getChild(i);
// Cannot push predicates with null literal values (KUDU-1595).
if (Expr.IS_NULL_LITERAL.apply(literal)) return false;
Object value = getKuduInListValue(analyzer, literal);
if (value == null) return false;
values.add(value);
}
String colName = ((KuduColumn) ref.getDesc().getColumn()).getKuduName();
ColumnSchema column = table.getSchema().getColumn(colName);
kuduPredicates_.add(KuduPredicate.newInListPredicate(column, values));
kuduConjuncts_.add(predicate);
return true;
}
/**
* If IS NULL/IS NOT NULL 'expr' can be converted to a KuduPredicate,
* returns true and updates kuduPredicates_ and kuduConjuncts_.
*/
private boolean tryConvertIsNullKuduPredicate(Analyzer analyzer,
org.apache.kudu.client.KuduTable table, Expr expr) {
if (!(expr instanceof IsNullPredicate)) return false;
IsNullPredicate predicate = (IsNullPredicate) expr;
// Do not convert if expression is more than a SlotRef
// This is true even for casts, as certain casts can take a non-NULL
// value and produce a NULL. For example, CAST('test' as tinyint)
// is NULL.
if (!(predicate.getChild(0) instanceof SlotRef)) return false;
SlotRef ref = (SlotRef) predicate.getChild(0);
String colName = ((KuduColumn) ref.getDesc().getColumn()).getKuduName();
ColumnSchema column = table.getSchema().getColumn(colName);
KuduPredicate kuduPredicate = null;
if (predicate.isNotNull()) {
kuduPredicate = KuduPredicate.newIsNotNullPredicate(column);
} else {
kuduPredicate = KuduPredicate.newIsNullPredicate(column);
}
kuduConjuncts_.add(predicate);
kuduPredicates_.add(kuduPredicate);
return true;
}
/**
* Return the value of the InList child expression 'e' as an Object that can be
* added to a KuduPredicate. If the Expr is not supported by Kudu or the type doesn't
* match the expected PrimitiveType 'type', null is returned.
*/
private static Object getKuduInListValue(Analyzer analyzer, LiteralExpr e) {
switch (e.getType().getPrimitiveType()) {
case BOOLEAN: return ((BoolLiteral) e).getValue();
case TINYINT: return (byte) ((NumericLiteral) e).getLongValue();
case SMALLINT: return (short) ((NumericLiteral) e).getLongValue();
case INT: return (int) ((NumericLiteral) e).getLongValue();
case BIGINT: return ((NumericLiteral) e).getLongValue();
case FLOAT: return (float) ((NumericLiteral) e).getDoubleValue();
case DOUBLE: return ((NumericLiteral) e).getDoubleValue();
case STRING: return ((StringLiteral) e).getUnescapedValue();
case TIMESTAMP: {
try {
// TODO: Simplify when Impala supports a 64-bit TIMESTAMP type.
return KuduUtil.timestampToUnixTimeMicros(analyzer, e);
} catch (Exception ex) {
LOG.info("Exception converting Kudu timestamp expr: " + e.toSql(), ex);
}
break;
}
case DECIMAL: return ((NumericLiteral) e).getValue();
default:
Preconditions.checkState(false,
"Unsupported Kudu type considered for predicate: %s", e.getType().toSql());
}
return null;
}
/**
* Returns a Kudu comparison operator for the BinaryPredicate operator, or null if
* the operation is not supported by Kudu.
*/
private static KuduPredicate.ComparisonOp getKuduOperator(BinaryPredicate.Operator op) {
switch (op) {
case GT: return ComparisonOp.GREATER;
case LT: return ComparisonOp.LESS;
case GE: return ComparisonOp.GREATER_EQUAL;
case LE: return ComparisonOp.LESS_EQUAL;
case EQ: return ComparisonOp.EQUAL;
default: return null;
}
}
@Override
public boolean hasStorageLayerConjuncts() { return !kuduConjuncts_.isEmpty(); }
}