| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.planner; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.impala.analysis.Analyzer; |
| import org.apache.impala.analysis.BinaryPredicate; |
| import org.apache.impala.analysis.BoolLiteral; |
| import org.apache.impala.analysis.CompoundPredicate; |
| import org.apache.impala.analysis.Expr; |
| import org.apache.impala.analysis.LiteralExpr; |
| import org.apache.impala.analysis.NumericLiteral; |
| import org.apache.impala.analysis.SlotRef; |
| import org.apache.impala.analysis.StringLiteral; |
| import org.apache.impala.analysis.TupleDescriptor; |
| import org.apache.impala.catalog.DataSource; |
| import org.apache.impala.catalog.FeDataSourceTable; |
| import org.apache.impala.common.ImpalaException; |
| import org.apache.impala.common.InternalException; |
| import org.apache.impala.extdatasource.ExternalDataSourceExecutor; |
| import org.apache.impala.extdatasource.thrift.TBinaryPredicate; |
| import org.apache.impala.extdatasource.thrift.TColumnDesc; |
| import org.apache.impala.extdatasource.thrift.TComparisonOp; |
| import org.apache.impala.extdatasource.thrift.TPrepareParams; |
| import org.apache.impala.extdatasource.thrift.TPrepareResult; |
| import org.apache.impala.service.FeSupport; |
| import org.apache.impala.thrift.TCacheJarResult; |
| import org.apache.impala.thrift.TColumnValue; |
| import org.apache.impala.thrift.TDataSourceScanNode; |
| import org.apache.impala.thrift.TErrorCode; |
| import org.apache.impala.thrift.TExplainLevel; |
| import org.apache.impala.thrift.TNetworkAddress; |
| import org.apache.impala.thrift.TPlanNode; |
| import org.apache.impala.thrift.TPlanNodeType; |
| import org.apache.impala.thrift.TQueryOptions; |
| import org.apache.impala.thrift.TScanRange; |
| import org.apache.impala.thrift.TScanRangeLocation; |
| import org.apache.impala.thrift.TScanRangeLocationList; |
| import org.apache.impala.thrift.TScanRangeSpec; |
| import org.apache.impala.thrift.TStatus; |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Objects; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Lists; |
| |
| /** |
| * Scan of a table provided by an external data source. |
| */ |
| public class DataSourceScanNode extends ScanNode { |
| private final static Logger LOG = LoggerFactory.getLogger(DataSourceScanNode.class); |
| private final TupleDescriptor desc_; |
| private final FeDataSourceTable table_; |
| |
| // The converted conjuncts_ that were accepted by the data source. A conjunct can |
| // be converted if it contains only disjunctive predicates of the form |
| // <slotref> <op> <constant>. |
| private List<List<TBinaryPredicate>> acceptedPredicates_; |
| |
| // The conjuncts that were accepted by the data source and removed from conjuncts_ in |
| // removeAcceptedConjuncts(). Only used in getNodeExplainString() to print the |
| // conjuncts applied by the data source. |
| private List<Expr> acceptedConjuncts_; |
| |
| // The number of rows estimate as returned by prepare(). |
| private long numRowsEstimate_; |
| |
/**
 * Creates a scan node for the data source table referenced by 'desc'. The given
 * 'conjuncts' list is stored as-is (not copied) as this node's conjuncts_.
 */
public DataSourceScanNode(PlanNodeId id, TupleDescriptor desc, List<Expr> conjuncts) {
  super(id, desc, "SCAN DATA SOURCE");
  this.desc_ = desc;
  this.table_ = (FeDataSourceTable) desc.getTable();
  this.conjuncts_ = conjuncts;
  // Both remain unset until prepareDataSource() runs as part of init().
  this.acceptedPredicates_ = null;
  this.acceptedConjuncts_ = null;
}
| |
| @Override |
| public void init(Analyzer analyzer) throws ImpalaException { |
| checkForSupportedFileFormats(); |
| prepareDataSource(); |
| conjuncts_ = orderConjunctsByCost(conjuncts_); |
| computeStats(analyzer); |
| // materialize slots in remaining conjuncts_ |
| analyzer.materializeSlots(conjuncts_); |
| computeMemLayout(analyzer); |
| computeScanRangeLocations(analyzer); |
| } |
| |
| /** |
| * Returns a thrift TColumnValue representing the literal from a binary |
| * predicate, or null if the type cannot be represented. |
| */ |
| public static TColumnValue literalToColumnValue(LiteralExpr expr) { |
| switch (expr.getType().getPrimitiveType()) { |
| case BOOLEAN: |
| return new TColumnValue().setBool_val(((BoolLiteral) expr).getValue()); |
| case TINYINT: |
| return new TColumnValue().setByte_val( |
| (byte) ((NumericLiteral) expr).getLongValue()); |
| case SMALLINT: |
| return new TColumnValue().setShort_val( |
| (short) ((NumericLiteral) expr).getLongValue()); |
| case INT: |
| return new TColumnValue().setInt_val( |
| (int) ((NumericLiteral) expr).getLongValue()); |
| case BIGINT: |
| return new TColumnValue().setLong_val(((NumericLiteral) expr).getLongValue()); |
| case FLOAT: |
| case DOUBLE: |
| return new TColumnValue().setDouble_val( |
| ((NumericLiteral) expr).getDoubleValue()); |
| case STRING: |
| return new TColumnValue().setString_val( |
| ((StringLiteral) expr).getUnescapedValue()); |
| case DECIMAL: |
| case DATE: |
| case DATETIME: |
| case TIMESTAMP: |
| // TODO: we support DECIMAL, TIMESTAMP and DATE but no way to specify it in SQL. |
| return null; |
| default: |
| Preconditions.checkState(false); |
| return null; |
| } |
| } |
| |
| /** |
| * Calls prepare() on the data source to determine accepted predicates and get |
| * stats. The accepted predicates are moved from conjuncts_ into acceptedConjuncts_ |
| * and the associated TBinaryPredicates are set in acceptedPredicates_. |
| */ |
| private void prepareDataSource() throws InternalException { |
| // Binary predicates that will be offered to the data source. |
| List<List<TBinaryPredicate>> offeredPredicates = new ArrayList<>(); |
| // The index into conjuncts_ for each element in offeredPredicates. |
| List<Integer> conjunctsIdx = new ArrayList<>(); |
| for (int i = 0; i < conjuncts_.size(); ++i) { |
| Expr conjunct = conjuncts_.get(i); |
| List<TBinaryPredicate> disjuncts = getDisjuncts(conjunct); |
| if (disjuncts != null) { |
| offeredPredicates.add(disjuncts); |
| conjunctsIdx.add(i); |
| } |
| } |
| |
| String hdfsLocation = table_.getDataSource().getHdfs_location(); |
| TCacheJarResult cacheResult = FeSupport.CacheJar(hdfsLocation); |
| TStatus cacheJarStatus = cacheResult.getStatus(); |
| if (cacheJarStatus.getStatus_code() != TErrorCode.OK) { |
| throw new InternalException(String.format( |
| "Unable to cache data source library at location '%s'. Check that the file " + |
| "exists and is readable. Message: %s", |
| hdfsLocation, Joiner.on("\n").join(cacheJarStatus.getError_msgs()))); |
| } |
| String localPath = cacheResult.getLocal_path(); |
| String className = table_.getDataSource().getClass_name(); |
| String apiVersion = table_.getDataSource().getApi_version(); |
| TPrepareResult prepareResult; |
| TStatus prepareStatus; |
| try { |
| ExternalDataSourceExecutor executor = new ExternalDataSourceExecutor( |
| localPath, className, apiVersion, table_.getInitString()); |
| TPrepareParams prepareParams = new TPrepareParams(); |
| prepareParams.setInit_string(table_.getInitString()); |
| prepareParams.setPredicates(offeredPredicates); |
| // TODO: Include DB (i.e. getFullName())? |
| prepareParams.setTable_name(table_.getName()); |
| prepareResult = executor.prepare(prepareParams); |
| prepareStatus = prepareResult.getStatus(); |
| } catch (Exception e) { |
| throw new InternalException(String.format( |
| "Error calling prepare() on data source %s", |
| DataSource.debugString(table_.getDataSource())), e); |
| } |
| if (prepareStatus.getStatus_code() != TErrorCode.OK) { |
| throw new InternalException(String.format( |
| "Data source %s returned an error from prepare(): %s", |
| DataSource.debugString(table_.getDataSource()), |
| Joiner.on("\n").join(prepareStatus.getError_msgs()))); |
| } |
| |
| numRowsEstimate_ = prepareResult.getNum_rows_estimate(); |
| acceptedPredicates_ = new ArrayList<>(); |
| List<Integer> acceptedPredicatesIdx = prepareResult.isSetAccepted_conjuncts() ? |
| prepareResult.getAccepted_conjuncts() : ImmutableList.<Integer>of(); |
| for (Integer acceptedIdx: acceptedPredicatesIdx) { |
| acceptedPredicates_.add(offeredPredicates.get(acceptedIdx)); |
| } |
| removeAcceptedConjuncts(acceptedPredicatesIdx, conjunctsIdx); |
| } |
| |
| /** |
| * Converts the conjunct to a list of TBinaryPredicates if it contains only |
| * disjunctive predicates of the form {slotref} {op} {constant} that can be represented |
| * by TBinaryPredicates. If the Expr cannot be converted, null is returned. |
| * TODO: Move this to Expr. |
| */ |
| private List<TBinaryPredicate> getDisjuncts(Expr conjunct) { |
| List<TBinaryPredicate> disjuncts = new ArrayList<>(); |
| if (getDisjunctsHelper(conjunct, disjuncts)) return disjuncts; |
| return null; |
| } |
| |
| // Recursive helper method for getDisjuncts(). |
| private boolean getDisjunctsHelper(Expr conjunct, |
| List<TBinaryPredicate> predicates) { |
| if (conjunct instanceof BinaryPredicate) { |
| if (conjunct.getChildren().size() != 2) return false; |
| SlotRef slotRef = null; |
| LiteralExpr literalExpr = null; |
| TComparisonOp op = null; |
| if ((conjunct.getChild(0).unwrapSlotRef(true) instanceof SlotRef) && |
| (conjunct.getChild(1) instanceof LiteralExpr)) { |
| slotRef = conjunct.getChild(0).unwrapSlotRef(true); |
| literalExpr = (LiteralExpr) conjunct.getChild(1); |
| op = ((BinaryPredicate) conjunct).getOp().getThriftOp(); |
| } else if ((conjunct.getChild(1).unwrapSlotRef(true) instanceof SlotRef) && |
| (conjunct.getChild(0) instanceof LiteralExpr)) { |
| slotRef = conjunct.getChild(1).unwrapSlotRef(true); |
| literalExpr = (LiteralExpr) conjunct.getChild(0); |
| op = ((BinaryPredicate) conjunct).getOp().converse().getThriftOp(); |
| } else { |
| return false; |
| } |
| |
| TColumnValue val = literalToColumnValue(literalExpr); |
| if (val == null) return false; // false if unsupported type, e.g. |
| |
| String colName = Joiner.on(".").join(slotRef.getResolvedPath().getRawPath()); |
| TColumnDesc col = new TColumnDesc().setName(colName).setType( |
| slotRef.getType().toThrift()); |
| predicates.add(new TBinaryPredicate().setCol(col).setOp(op).setValue(val)); |
| return true; |
| } else if (conjunct instanceof CompoundPredicate) { |
| CompoundPredicate compoundPredicate = ((CompoundPredicate) conjunct); |
| if (compoundPredicate.getOp() != CompoundPredicate.Operator.OR) return false; |
| if (!getDisjunctsHelper(conjunct.getChild(0), predicates)) return false; |
| if (!getDisjunctsHelper(conjunct.getChild(1), predicates)) return false; |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| @Override |
| public void computeStats(Analyzer analyzer) { |
| super.computeStats(analyzer); |
| inputCardinality_ = numRowsEstimate_; |
| cardinality_ = numRowsEstimate_; |
| cardinality_ = applyConjunctsSelectivity(cardinality_); |
| cardinality_ = Math.max(1, cardinality_); |
| cardinality_ = capCardinalityAtLimit(cardinality_); |
| |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("computeStats DataSourceScan: cardinality=" + Long.toString(cardinality_)); |
| } |
| |
| numNodes_ = table_.getNumNodes(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("computeStats DataSourceScan: #nodes=" + Integer.toString(numNodes_)); |
| } |
| } |
| |
| @Override |
| protected String debugString() { |
| return Objects.toStringHelper(this) |
| .add("tid", desc_.getId().asInt()) |
| .add("tblName", table_.getFullName()) |
| .add("dataSource", DataSource.debugString(table_.getDataSource())) |
| .add("initString", table_.getInitString()) |
| .addValue(super.debugString()) |
| .toString(); |
| } |
| |
| /** |
| * Removes the predicates from conjuncts_ that were accepted by the data source. |
| * Stores the accepted conjuncts in acceptedConjuncts_. |
| */ |
| private void removeAcceptedConjuncts(List<Integer> acceptedPredicatesIdx, |
| List<Integer> conjunctsIdx) { |
| acceptedConjuncts_ = new ArrayList<>(); |
| // Because conjuncts_ is modified in place using positional indexes from |
| // conjunctsIdx, we remove the accepted predicates in reverse order. |
| for (int i = acceptedPredicatesIdx.size() - 1; i >= 0; --i) { |
| int acceptedPredIdx = acceptedPredicatesIdx.get(i); |
| int conjunctIdx = conjunctsIdx.get(acceptedPredIdx); |
| acceptedConjuncts_.add(conjuncts_.remove(conjunctIdx)); |
| } |
| // Returns a view of the list in the original order as we will print these |
| // in the explain string and it's convenient to have predicates printed |
| // in the same order that they're specified. |
| acceptedConjuncts_ = Lists.reverse(acceptedConjuncts_); |
| } |
| |
| @Override |
| protected void toThrift(TPlanNode msg) { |
| Preconditions.checkNotNull(acceptedPredicates_); |
| msg.node_type = TPlanNodeType.DATA_SOURCE_NODE; |
| msg.data_source_node = new TDataSourceScanNode(desc_.getId().asInt(), |
| table_.getDataSource(), table_.getInitString(), acceptedPredicates_); |
| } |
| |
| /** |
| * Create a single scan range for the localhost. |
| */ |
| private void computeScanRangeLocations(Analyzer analyzer) { |
| // TODO: Does the port matter? |
| TNetworkAddress networkAddress = addressToTNetworkAddress("localhost:12345"); |
| Integer hostIndex = analyzer.getHostIndex().getIndex(networkAddress); |
| scanRangeSpecs_ = new TScanRangeSpec(); |
| scanRangeSpecs_.addToConcrete_ranges(new TScanRangeLocationList( |
| new TScanRange(), Lists.newArrayList(new TScanRangeLocation(hostIndex)))); |
| } |
| |
| @Override |
| public void computeNodeResourceProfile(TQueryOptions queryOptions) { |
| // This node fetches a thrift representation of the rows from the data |
| // source. The memory used by it is unfortunately allocated on the heap and |
| // is never accounted for against its associated mem tracker. Therefore this |
| // node will never show any memory usage on the query summary. However, |
| // since it does contribute to untracked memory, retaining the conservative |
| // estimate of 1 GB to account for the thrift row-batch structure would |
| // ensure it does not change the current behavior. |
| nodeResourceProfile_ = ResourceProfile.noReservation(1024L * 1024L * 1024L); |
| } |
| |
| @Override |
| protected String getNodeExplainString(String prefix, String detailPrefix, |
| TExplainLevel detailLevel) { |
| StringBuilder output = new StringBuilder(); |
| String aliasStr = ""; |
| if (!table_.getFullName().equalsIgnoreCase(desc_.getAlias()) && |
| !table_.getName().equalsIgnoreCase(desc_.getAlias())) { |
| aliasStr = " " + desc_.getAlias(); |
| } |
| output.append(String.format("%s%s:%s [%s%s]\n", prefix, id_.toString(), |
| displayName_, table_.getFullName(), aliasStr)); |
| |
| if (!acceptedConjuncts_.isEmpty()) { |
| output.append(prefix + "data source predicates: " |
| + Expr.getExplainString(acceptedConjuncts_, detailLevel) + "\n"); |
| } |
| if (!conjuncts_.isEmpty()) { |
| output.append(prefix + "predicates: " + |
| Expr.getExplainString(conjuncts_, detailLevel) + "\n"); |
| } |
| |
| // Add table and column stats in verbose mode. |
| if (detailLevel == TExplainLevel.VERBOSE) { |
| output.append(getStatsExplainString(prefix)); |
| output.append("\n"); |
| } |
| return output.toString(); |
| } |
| |
| @Override |
| public boolean hasStorageLayerConjuncts() { return !acceptedConjuncts_.isEmpty(); } |
| } |