| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.planner; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| |
| import org.apache.impala.analysis.Analyzer; |
| import org.apache.impala.analysis.Expr; |
| import org.apache.impala.analysis.ExprId; |
| import org.apache.impala.analysis.ExprSubstitutionMap; |
| import org.apache.impala.analysis.ToSqlOptions; |
| import org.apache.impala.analysis.TupleDescriptor; |
| import org.apache.impala.analysis.TupleId; |
| import org.apache.impala.common.ImpalaException; |
| import org.apache.impala.common.PrintUtils; |
| import org.apache.impala.common.TreeNode; |
| import org.apache.impala.planner.RuntimeFilterGenerator.RuntimeFilter; |
| import org.apache.impala.thrift.TExecNodePhase; |
| import org.apache.impala.thrift.TExecStats; |
| import org.apache.impala.thrift.TExplainLevel; |
| import org.apache.impala.thrift.TPlan; |
| import org.apache.impala.thrift.TPlanNode; |
| import org.apache.impala.thrift.TQueryOptions; |
| import org.apache.impala.thrift.TSortingOrder; |
| import org.apache.impala.util.BitUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| import com.google.common.math.LongMath; |
| |
| /** |
| * Each PlanNode represents a single relational operator |
| * and encapsulates the information needed by the planner to |
| * make optimization decisions. |
| * |
| * finalize(): Computes internal state, such as keys for scan nodes; gets called once on |
| * the root of the plan tree before the call to toThrift(). Also finalizes the set |
| * of conjuncts, such that each remaining one requires all of its referenced slots to |
| * be materialized (ie, can be evaluated by calling GetValue(), rather than being |
| * implicitly evaluated as part of a scan key). |
| * |
| * conjuncts_: Each node has a list of conjuncts that can be executed in the context of |
| * this node, ie, they only reference tuples materialized by this node or one of |
| * its children (= are bound by tupleIds_). |
| */ |
| abstract public class PlanNode extends TreeNode<PlanNode> { |
  private final static Logger LOG = LoggerFactory.getLogger(PlanNode.class);

  // The default row batch size used if the BATCH_SIZE query option is not set
  // or is less than 1. Must be in sync with QueryState::DEFAULT_BATCH_SIZE.
  protected final static int DEFAULT_ROWBATCH_SIZE = 1024;

  // Max memory that a row batch can accumulate before it is considered at capacity.
  // This is a soft capacity: row batches may exceed the capacity, preferably only by a
  // row's worth of data. Must be in sync with RowBatch::AT_CAPACITY_MEM_USAGE.
  protected final static int ROWBATCH_MAX_MEM_USAGE = 8 * 1024 * 1024;

  // String used for this node in getExplainString().
  protected String displayName_;

  // unique w/in plan tree; assigned by planner, and not necessarily in c'tor
  protected PlanNodeId id_;

  protected long limit_; // max. # of rows to be returned; -1: no limit (see hasLimit())

  // ids materialized by the tree rooted at this node
  protected List<TupleId> tupleIds_;

  // ids of the TblRefs "materialized" by this node; identical with tupleIds_
  // if the tree rooted at this node only materializes BaseTblRefs;
  // useful during plan generation
  protected List<TupleId> tblRefIds_;

  // A set of nullable TupleId produced by this node. It is a subset of tupleIds_.
  // A tuple is nullable within a particular plan tree if it's the "nullable" side of
  // an outer join, which has nothing to do with the schema.
  protected Set<TupleId> nullableTupleIds_ = new HashSet<>();

  // Conjuncts evaluated by this node; they reference only tuples materialized by this
  // node or one of its children (see class comment).
  protected List<Expr> conjuncts_ = new ArrayList<>();

  // Fragment that this PlanNode is executed in. Valid only after this PlanNode has been
  // assigned to a fragment. Set and maintained by enclosing PlanFragment.
  protected PlanFragment fragment_;

  // If set, needs to be applied by parent node to reference this node's output. The
  // entries need to be propagated all the way to the root node.
  protected ExprSubstitutionMap outputSmap_;

  // global state of planning wrt conjunct assignment; used by planner as a shortcut
  // to avoid having to pass assigned conjuncts back and forth
  // (the planner uses this to save and reset the global state in between join tree
  // alternatives)
  // TODO for 2.3: Save this state in the PlannerContext instead.
  protected Set<ExprId> assignedConjuncts_;

  // estimate of the output cardinality of this node; set in computeStats();
  // invalid: -1
  protected long cardinality_;

  // number of nodes on which the plan tree rooted at this node would execute;
  // set in computeStats(); invalid: -1
  protected int numNodes_;

  // resource requirements and estimates for this plan node.
  // Initialized with a dummy value. Gets set correctly in
  // computeResourceProfile().
  protected ResourceProfile nodeResourceProfile_ = ResourceProfile.invalid();

  // List of query execution pipelines that this node executes as a part of. Remains
  // null until computePipelineMembership() (or setPipelinesRecursive()) is called.
  protected List<PipelineMembership> pipelines_;

  // sum of tupleIds_' avgSerializedSizes; set in computeStats()
  protected float avgRowSize_;

  // If true, disable codegen for this plan node.
  protected boolean disableCodegen_;

  // Runtime filters assigned to this node.
  protected List<RuntimeFilter> runtimeFilters_ = new ArrayList<>();
| |
  /**
   * C'tor that seeds tupleIds_ and tblRefIds_ with the same 'tupleIds'
   * (appropriate when this node materializes only BaseTblRefs).
   */
  protected PlanNode(PlanNodeId id, List<TupleId> tupleIds, String displayName) {
    this(id, displayName);
    tupleIds_.addAll(tupleIds);
    tblRefIds_.addAll(tupleIds);
  }

  /**
   * Deferred id_ assignment. The id must be set later via setId().
   */
  protected PlanNode(String displayName) {
    this(null, displayName);
  }

  /**
   * Base c'tor: initializes empty tuple-id lists and marks limit, cardinality and
   * numNodes as unset/invalid (-1).
   */
  protected PlanNode(PlanNodeId id, String displayName) {
    id_ = id;
    limit_ = -1;
    tupleIds_ = new ArrayList<>();
    tblRefIds_ = new ArrayList<>();
    cardinality_ = -1;
    numNodes_ = -1;
    displayName_ = displayName;
    disableCodegen_ = false;
  }

  /**
   * Copy c'tor. Also passes in new id_. Copies tuple ids, conjuncts and the codegen
   * flag from 'node', but resets the stats (cardinality_, numNodes_) to invalid.
   */
  protected PlanNode(PlanNodeId id, PlanNode node, String displayName) {
    id_ = id;
    limit_ = node.limit_;
    tupleIds_ = Lists.newArrayList(node.tupleIds_);
    tblRefIds_ = Lists.newArrayList(node.tblRefIds_);
    nullableTupleIds_ = Sets.newHashSet(node.nullableTupleIds_);
    conjuncts_ = Expr.cloneList(node.conjuncts_);
    cardinality_ = -1;
    numNodes_ = -1;
    displayName_ = displayName;
    disableCodegen_ = node.disableCodegen_;
  }
| |
  /**
   * Sets tblRefIds_, tupleIds_, and nullableTupleIds_.
   * The default implementation is a no-op; it only checks that a node with children
   * already has its tuple ids populated.
   */
  public void computeTupleIds() {
    Preconditions.checkState(children_.isEmpty() || !tupleIds_.isEmpty());
  }
| |
| /** |
| * Clears tblRefIds_, tupleIds_, and nullableTupleIds_. |
| */ |
| protected void clearTupleIds() { |
| tblRefIds_.clear(); |
| tupleIds_.clear(); |
| nullableTupleIds_.clear(); |
| } |
| |
  public PlanNodeId getId() { return id_; }
  public List<PipelineMembership> getPipelines() { return pipelines_; }
  // id_ may be assigned only once (deferred assignment after the id-less c'tor).
  public void setId(PlanNodeId id) {
    Preconditions.checkState(id_ == null);
    id_ = id;
  }
  public long getLimit() { return limit_; }
  // -1 encodes "no limit"; any value > -1 is a real limit.
  public boolean hasLimit() { return limit_ > -1; }
  public long getCardinality() { return cardinality_; }
  public int getNumNodes() { return numNodes_; }
  public ResourceProfile getNodeResourceProfile() { return nodeResourceProfile_; }
  public float getAvgRowSize() { return avgRowSize_; }
  public void setFragment(PlanFragment fragment) { fragment_ = fragment; }
  public PlanFragment getFragment() { return fragment_; }
  public List<Expr> getConjuncts() { return conjuncts_; }
  public ExprSubstitutionMap getOutputSmap() { return outputSmap_; }
  public void setOutputSmap(ExprSubstitutionMap smap) { outputSmap_ = smap; }
  public Set<ExprId> getAssignedConjuncts() { return assignedConjuncts_; }
  public void setAssignedConjuncts(Set<ExprId> conjuncts) {
    assignedConjuncts_ = conjuncts;
  }
| |
| /** |
| * Set the limit_ to the given limit_ only if the limit_ hasn't been set, or the new limit_ |
| * is lower. |
| */ |
| public void setLimit(long limit) { |
| if (limit_ == -1 || (limit != -1 && limit_ > limit)) limit_ = limit; |
| } |
| |
| public void unsetLimit() { limit_ = -1; } |
| |
  public List<TupleId> getTupleIds() {
    // tupleIds_ is initialized in the c'tor; null here would be a programming error.
    Preconditions.checkState(tupleIds_ != null);
    return tupleIds_;
  }

  public List<TupleId> getTblRefIds() { return tblRefIds_; }
  public void setTblRefIds(List<TupleId> ids) { tblRefIds_ = ids; }

  public Set<TupleId> getNullableTupleIds() {
    // nullableTupleIds_ is initialized at declaration; null would be a programming error.
    Preconditions.checkState(nullableTupleIds_ != null);
    return nullableTupleIds_;
  }
| |
| public void addConjuncts(List<Expr> conjuncts) { |
| if (conjuncts == null) return; |
| conjuncts_.addAll(conjuncts); |
| } |
| |
| public void transferConjuncts(PlanNode recipient) { |
| recipient.conjuncts_.addAll(conjuncts_); |
| conjuncts_.clear(); |
| } |
| |
  /** Returns the explain string for the whole subtree at VERBOSE level. */
  public String getExplainString(TQueryOptions queryOptions) {
    return getExplainString("", "", queryOptions, TExplainLevel.VERBOSE);
  }

  protected void setDisplayName(String s) { displayName_ = s; }

  // Header label for this node in the explain output: "<id>:<displayName>".
  final protected String getDisplayLabel() {
    return String.format("%s:%s", id_.toString(), displayName_);
  }

  /**
   * Subclasses can override to provide a node specific detail string that
   * is displayed to the user.
   * e.g. scan can return the table name.
   */
  protected String getDisplayLabelDetail() { return ""; }
| |
  /**
   * Generate the explain plan tree. The plan will be in the form of:
   *
   * root
   * |
   * |----child 3
   * |      limit:1
   * |
   * |----child 2
   * |      limit:2
   * |
   * child 1
   *
   * The root node header line will be prefixed by rootPrefix and the remaining plan
   * output will be prefixed by prefix.
   */
  protected final String getExplainString(String rootPrefix, String prefix,
      TQueryOptions queryOptions, TExplainLevel detailLevel) {
    StringBuilder expBuilder = new StringBuilder();
    String detailPrefix = prefix;
    String filler;
    // Filler lines ("|") between siblings appear at STANDARD level and above.
    boolean printFiller = (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal());

    // Do not traverse into the children of an Exchange node to avoid crossing
    // fragment boundaries.
    boolean traverseChildren = !children_.isEmpty() &&
        !(this instanceof ExchangeNode && detailLevel == TExplainLevel.VERBOSE);

    // Indent detail lines differently depending on whether a child subtree follows.
    if (traverseChildren) {
      detailPrefix += "| ";
      filler = prefix + "|";
    } else {
      detailPrefix += " ";
      filler = prefix;
    }

    // Print the current node
    // The plan node header line will be prefixed by rootPrefix and the remaining details
    // will be prefixed by detailPrefix.
    expBuilder.append(getNodeExplainString(rootPrefix, detailPrefix, detailLevel));

    // SortNode prints its own limit (together with its offset), so skip it here.
    if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal() &&
        !(this instanceof SortNode)) {
      if (limit_ != -1) expBuilder.append(detailPrefix + "limit: " + limit_ + "\n");
      expBuilder.append(getOffsetExplainString(detailPrefix));
    }

    // Output cardinality, cost estimates and tuple Ids only when explain plan level
    // is extended or above.
    boolean displayCardinality = displayCardinality(detailLevel);
    if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
      // Print resource profile.
      expBuilder.append(detailPrefix);
      expBuilder.append(nodeResourceProfile_.getExplainString());
      expBuilder.append("\n");

      // Print tuple ids, row size and cardinality. Nullable tuples get an "N" suffix.
      expBuilder.append(detailPrefix + "tuple-ids=");
      for (int i = 0; i < tupleIds_.size(); ++i) {
        TupleId tupleId = tupleIds_.get(i);
        String nullIndicator = nullableTupleIds_.contains(tupleId) ? "N" : "";
        expBuilder.append(tupleId.asInt() + nullIndicator);
        if (i + 1 != tupleIds_.size()) expBuilder.append(",");
      }
      expBuilder.append(displayCardinality ? " " : "\n");
    }
    // Output cardinality: in standard and above levels.
    // In standard, on a line by itself (if wanted). In extended, on
    // a line with tuple ids.
    if (displayCardinality) {
      if (detailLevel == TExplainLevel.STANDARD) expBuilder.append(detailPrefix);
      expBuilder.append("row-size=")
          .append(PrintUtils.printBytes(Math.round(avgRowSize_)))
          .append(" cardinality=")
          .append(PrintUtils.printEstCardinality(cardinality_))
          .append("\n");
    }

    // Print pipeline membership; "<not computed>" if computePipelineMembership()
    // has not run yet.
    if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
      expBuilder.append(detailPrefix);
      expBuilder.append("in pipelines: ");
      if (pipelines_ != null) {
        List<String> pipelines = new ArrayList<>();
        for (PipelineMembership pipe: pipelines_) {
          pipelines.add(pipe.getExplainString());
        }
        if (pipelines.isEmpty()) expBuilder.append("<none>");
        else expBuilder.append(Joiner.on(", ").join(pipelines));
        expBuilder.append("\n");
      } else {
        expBuilder.append("<not computed>");
      }
    }

    // Print the children. Do not traverse into the children of an Exchange node to
    // avoid crossing fragment boundaries.
    if (traverseChildren) {
      if (printFiller) expBuilder.append(filler + "\n");
      String childHeadlinePrefix = prefix + "|--";
      String childDetailPrefix = prefix + "| ";
      // Children other than the first are printed top-down (last child first) with
      // the "|--" connector; the first child continues the left spine below.
      for (int i = children_.size() - 1; i >= 1; --i) {
        PlanNode child = getChild(i);
        if (fragment_ != child.fragment_) {
          // we're crossing a fragment boundary
          expBuilder.append(
              child.fragment_.getExplainString(
                  childHeadlinePrefix, childDetailPrefix, queryOptions, detailLevel));
        } else {
          expBuilder.append(child.getExplainString(childHeadlinePrefix,
              childDetailPrefix, queryOptions, detailLevel));
        }
        if (printFiller) expBuilder.append(filler + "\n");
      }
      PlanFragment childFragment = children_.get(0).fragment_;
      if (fragment_ != childFragment && detailLevel == TExplainLevel.EXTENDED) {
        // we're crossing a fragment boundary - print the fragment header.
        expBuilder.append(childFragment.getFragmentHeaderString(prefix, prefix,
            queryOptions.getMt_dop()));
      }
      expBuilder.append(
          children_.get(0).getExplainString(prefix, prefix, queryOptions, detailLevel));
    }
    return expBuilder.toString();
  }
| |
  /**
   * Per-node setting whether to include cardinality in the node overview.
   * Some nodes omit cardinality because either a) it is not needed
   * (Empty set, Exchange), or b) it is printed by the node itself (HDFS scan.)
   * Default: shown at STANDARD level and above.
   * @return true if cardinality should be included in the generic
   * node details, false if it should be omitted.
   */
  protected boolean displayCardinality(TExplainLevel detailLevel) {
    return detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal();
  }

  /**
   * Return the node-specific details.
   * Subclass should override this function.
   * Each line should be prefixed by detailPrefix.
   * Default implementation returns the empty string.
   */
  protected String getNodeExplainString(String rootPrefix, String detailPrefix,
      TExplainLevel detailLevel) {
    return "";
  }

  /**
   * Return the offset_ details, if applicable. This is available separately from
   * 'getNodeExplainString' because we want to output 'limit: ...' (which can be printed
   * from PlanNode) before 'offset: ...', which is only printed from SortNodes right
   * now. Default implementation returns the empty string.
   */
  protected String getOffsetExplainString(String prefix) {
    return "";
  }
| |
  // Convert this plan node, including all children, to its Thrift representation.
  public TPlan treeToThrift() {
    TPlan result = new TPlan();
    treeToThriftHelper(result);
    return result;
  }

  // Append a flattened version of this plan node, including all children, to
  // 'container'. This node is appended before its children (pre-order).
  private void treeToThriftHelper(TPlan container) {
    TPlanNode msg = new TPlanNode();
    msg.node_id = id_.asInt();
    msg.limit = limit_;

    // Estimated stats shown in runtime profiles: cardinality and memory estimate.
    TExecStats estimatedStats = new TExecStats();
    estimatedStats.setCardinality(cardinality_);
    estimatedStats.setMemory_used(nodeResourceProfile_.getMemEstimateBytes());
    msg.setLabel(getDisplayLabel());
    msg.setLabel_detail(getDisplayLabelDetail());
    msg.setEstimated_stats(estimatedStats);

    // Every node must materialize at least one tuple.
    Preconditions.checkState(tupleIds_.size() > 0);
    msg.setRow_tuples(Lists.<Integer>newArrayListWithCapacity(tupleIds_.size()));
    msg.setNullable_tuples(Lists.<Boolean>newArrayListWithCapacity(tupleIds_.size()));
    for (TupleId tid: tupleIds_) {
      msg.addToRow_tuples(tid.asInt());
      msg.addToNullable_tuples(nullableTupleIds_.contains(tid));
    }
    for (Expr e: conjuncts_) {
      msg.addToConjuncts(e.treeToThrift());
    }
    // Serialize any runtime filters
    for (RuntimeFilter filter : runtimeFilters_) {
      msg.addToRuntime_filters(filter.toThrift());
    }
    msg.setDisable_codegen(disableCodegen_);
    // Resource profile must have been computed (computeNodeResourceProfile()).
    Preconditions.checkState(nodeResourceProfile_.isValid());
    msg.resource_profile = nodeResourceProfile_.toThrift();
    msg.pipelines = new ArrayList<>();
    for (PipelineMembership pipe : pipelines_) {
      msg.pipelines.add(pipe.toThrift());
    }
    // Subclass fills in the node type and node-specific fields.
    toThrift(msg);
    container.addToNodes(msg);
    // For the purpose of the BE consider ExchangeNodes to have no children.
    if (this instanceof ExchangeNode) {
      msg.num_children = 0;
      return;
    } else {
      msg.num_children = children_.size();
      for (PlanNode child: children_) {
        child.treeToThriftHelper(container);
      }
    }
  }
| |
  /**
   * Computes the full internal state, including smap and planner-relevant statistics
   * (calls computeStats()), marks all slots referenced by this node as materialized
   * and computes the mem layout of all materialized tuples (with the assumption that
   * slots that are needed by ancestor PlanNodes have already been marked).
   * Also performs final expr substitution with childrens' smaps and computes internal
   * state required for toThrift(). This is called directly after construction.
   * Conjuncts are assigned before stats are computed so the stats can reflect them.
   * Throws if an expr substitution or evaluation fails.
   */
  public void init(Analyzer analyzer) throws ImpalaException {
    assignConjuncts(analyzer);
    computeStats(analyzer);
    createDefaultSmap(analyzer);
  }

  /**
   * Assign remaining unassigned conjuncts: pulls all conjuncts the analyzer considers
   * evaluable by this node, adds them to conjuncts_ and marks them assigned globally.
   */
  protected void assignConjuncts(Analyzer analyzer) {
    List<Expr> unassigned = analyzer.getUnassignedConjuncts(this);
    conjuncts_.addAll(unassigned);
    analyzer.markConjunctsAssigned(unassigned);
  }
| |
| /** |
| * Returns an smap that combines the childrens' smaps. |
| */ |
| protected ExprSubstitutionMap getCombinedChildSmap() { |
| if (getChildren().size() == 0) return new ExprSubstitutionMap(); |
| if (getChildren().size() == 1) return getChild(0).getOutputSmap(); |
| ExprSubstitutionMap result = ExprSubstitutionMap.combine( |
| getChild(0).getOutputSmap(), getChild(1).getOutputSmap()); |
| for (int i = 2; i < getChildren().size(); ++i) { |
| result = ExprSubstitutionMap.combine(result, getChild(i).getOutputSmap()); |
| } |
| return result; |
| } |
| |
  /**
   * Sets outputSmap_ to compose(existing smap, combined child smap). Also
   * substitutes conjuncts_ using the combined child smap.
   */
  protected void createDefaultSmap(Analyzer analyzer) {
    ExprSubstitutionMap combinedChildSmap = getCombinedChildSmap();
    outputSmap_ =
        ExprSubstitutionMap.compose(outputSmap_, combinedChildSmap, analyzer);
    conjuncts_ = Expr.substituteList(conjuncts_, outputSmap_, analyzer, false);
  }

  /**
   * Computes planner statistics: avgRowSize_, numNodes_, cardinality_.
   * Subclasses need to override this.
   * Assumes that it has already been called on all children.
   * and that DescriptorTable.computePhysMemLayout() has been called.
   * This is broken out of init() so that it can be called separately
   * from init() (to facilitate inserting additional nodes during plan
   * partitioning w/o the need to call init() recursively on the whole tree again).
   * Base implementation: avgRowSize_ is the sum of the avg serialized sizes of all
   * materialized tuples; numNodes_ is inherited from the first child (if any).
   */
  protected void computeStats(Analyzer analyzer) {
    avgRowSize_ = 0.0F;
    for (TupleId tid: tupleIds_) {
      TupleDescriptor desc = analyzer.getTupleDesc(tid);
      avgRowSize_ += desc.getAvgSerializedSize();
    }
    if (!children_.isEmpty()) numNodes_ = getChild(0).numNodes_;
  }
| |
| protected long capCardinalityAtLimit(long cardinality) { |
| if (hasLimit()) { |
| return capCardinalityAtLimit(cardinality, limit_); |
| } |
| return cardinality; |
| } |
| |
| static long capCardinalityAtLimit(long cardinality, long limit) { |
| return cardinality == -1 ? limit : Math.min(cardinality, limit); |
| } |
| |
  /**
   * Call computeMemLayout() for all materialized tuples (every tuple in tupleIds_).
   */
  protected void computeMemLayout(Analyzer analyzer) {
    for (TupleId id: tupleIds_) {
      analyzer.getDescTbl().getTupleDesc(id).computeMemLayout();
    }
  }
| |
| /** |
| * Returns the estimated combined selectivity of all conjuncts. Uses heuristics to |
| * address the following estimation challenges: |
| * 1. The individual selectivities of conjuncts may be unknown. |
| * 2. Two selectivities, whether known or unknown, could be correlated. Assuming |
| * independence can lead to significant underestimation. |
| * |
| * The first issue is addressed by using a single default selectivity that is |
| * representative of all conjuncts with unknown selectivities. |
| * The second issue is addressed by an exponential backoff when multiplying each |
| * additional selectivity into the final result. |
| */ |
| static protected double computeCombinedSelectivity(List<Expr> conjuncts) { |
| // Collect all estimated selectivities. |
| List<Double> selectivities = new ArrayList<>(); |
| for (Expr e: conjuncts) { |
| if (e.hasSelectivity()) selectivities.add(e.getSelectivity()); |
| } |
| if (selectivities.size() != conjuncts.size()) { |
| // Some conjuncts have no estimated selectivity. Use a single default |
| // representative selectivity for all those conjuncts. |
| selectivities.add(Expr.DEFAULT_SELECTIVITY); |
| } |
| // Sort the selectivities to get a consistent estimate, regardless of the original |
| // conjunct order. Sort in ascending order such that the most selective conjunct |
| // is fully applied. |
| Collections.sort(selectivities); |
| double result = 1.0; |
| for (int i = 0; i < selectivities.size(); ++i) { |
| // Exponential backoff for each selectivity multiplied into the final result. |
| result *= Math.pow(selectivities.get(i), 1.0 / (double) (i + 1)); |
| } |
| // Bound result in [0, 1] |
| return Math.max(0.0, Math.min(1.0, result)); |
| } |
| |
  // Returns the combined selectivity of this node's own conjuncts.
  protected double computeSelectivity() {
    return computeCombinedSelectivity(conjuncts_);
  }

  // Compute the cardinality after applying conjuncts_ based on 'preConjunctCardinality'.
  protected long applyConjunctsSelectivity(long preConjunctCardinality) {
    return applySelectivity(preConjunctCardinality, computeSelectivity());
  }
| |
| // Compute the cardinality after applying conjuncts with 'selectivity', based on |
| // 'preConjunctCardinality'. |
| protected long applySelectivity(long preConjunctCardinality, double selectivity) { |
| long cardinality = (long) Math.round(preConjunctCardinality * selectivity); |
| // IMPALA-8647: don't round cardinality down to zero for safety. |
| if (cardinality == 0 && preConjunctCardinality > 0) return 1; |
| return cardinality; |
| } |
| |
  // Convert this plan node into msg (excluding children), which requires setting
  // the node type and the node-specific field. Called from treeToThriftHelper()
  // after the common fields have been filled in.
  protected abstract void toThrift(TPlanNode msg);
| |
| protected String debugString() { |
| // not using Objects.toStrHelper because |
| // PlanNode.debugString() is embedded by debug strings of the subclasses |
| StringBuilder output = new StringBuilder(); |
| output.append("preds=" + Expr.debugString(conjuncts_)); |
| output.append(" limit=" + Long.toString(limit_)); |
| return output.toString(); |
| } |
| |
| protected String getExplainString( |
| List<? extends Expr> exprs, TExplainLevel detailLevel) { |
| if (exprs == null) return ""; |
| ToSqlOptions toSqlOptions = |
| detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal() ? |
| ToSqlOptions.SHOW_IMPLICIT_CASTS : |
| ToSqlOptions.DEFAULT; |
| StringBuilder output = new StringBuilder(); |
| for (int i = 0; i < exprs.size(); ++i) { |
| if (i > 0) output.append(", "); |
| output.append(exprs.get(i).toSql(toSqlOptions)); |
| } |
| return output.toString(); |
| } |
| |
  /**
   * Returns the explain string for a list of sort exprs, terminated by a newline.
   * LEXICAL: "<expr> ASC|DESC[ NULLS FIRST| NULLS LAST], ..."; the NULLS clause is
   * printed only when the corresponding nullsFirstParams entry is non-null.
   * ZORDER: "ZORDER: <expr>, ..." (asc/desc and nulls params do not apply).
   */
  protected String getSortingOrderExplainString(List<? extends Expr> exprs,
      List<Boolean> isAscOrder, List<Boolean> nullsFirstParams,
      TSortingOrder sortingOrder) {
    StringBuilder output = new StringBuilder();
    switch (sortingOrder) {
      case LEXICAL:
        for (int i = 0; i < exprs.size(); ++i) {
          if (i > 0) output.append(", ");
          output.append(exprs.get(i).toSql() + " ");
          output.append(isAscOrder.get(i) ? "ASC" : "DESC");

          Boolean nullsFirstParam = nullsFirstParams.get(i);
          if (nullsFirstParam != null) {
            output.append(nullsFirstParam ? " NULLS FIRST" : " NULLS LAST");
          }
        }
        break;
      case ZORDER:
        output.append("ZORDER: ");
        for (int i = 0; i < exprs.size(); ++i) {
          if (i > 0) output.append(", ");
          output.append(exprs.get(i).toSql());
        }
        break;
    }
    output.append("\n");
    return output.toString();
  }
| |
| /** |
| * Returns true if stats-related variables are valid. |
| */ |
| protected boolean hasValidStats() { |
| return (numNodes_ == -1 || numNodes_ >= 0) && |
| (cardinality_ == -1 || cardinality_ >= 0); |
| } |
| |
| /** |
| * Computes and returns the sum of two long values. If an overflow occurs, |
| * the maximum Long value is returned (Long.MAX_VALUE). |
| */ |
| public static long checkedAdd(long a, long b) { |
| try { |
| return LongMath.checkedAdd(a, b); |
| } catch (ArithmeticException e) { |
| LOG.warn("overflow when adding longs: " + a + ", " + b); |
| return Long.MAX_VALUE; |
| } |
| } |
| |
| /** |
| * Computes and returns the product of two cardinalities. If an overflow |
| * occurs, the maximum Long value is returned (Long.MAX_VALUE). |
| */ |
| public static long checkedMultiply(long a, long b) { |
| try { |
| return LongMath.checkedMultiply(a, b); |
| } catch (ArithmeticException e) { |
| LOG.warn("overflow when multiplying longs: " + a + ", " + b); |
| return Long.MAX_VALUE; |
| } |
| } |
| |
  /**
   * Returns true if this plan node can output its first row only after consuming
   * all rows of all its children. This method is used to group plan nodes
   * into pipelined units for resource estimation. Default: false (streaming).
   */
  public boolean isBlockingNode() { return false; }

  /**
   * Fills in 'pipelines_' with the pipelines that this PlanNode is a member of.
   *
   * This is called only for nodes that are not part of the right branch of a
   * subplan. All nodes in the right branch of a subplan belong to the same
   * pipeline as the GETNEXT phase of the subplan.
   *
   * The default implementation handles leaves and single-child nodes; nodes with
   * more than one child must override (enforced by the precondition below).
   */
  public void computePipelineMembership() {
    Preconditions.checkState(
        children_.size() <= 1, "Plan nodes with > 1 child must override");
    if (children_.size() == 0) {
      // Leaf node, e.g. SCAN.
      pipelines_ = Arrays.asList(
          new PipelineMembership(id_, 0, TExecNodePhase.GETNEXT));
      return;
    }
    // Compute the child's membership first; this node's is derived from it.
    children_.get(0).computePipelineMembership();
    // Default behaviour for simple blocking or streaming nodes.
    if (isBlockingNode()) {
      // Executes as root of pipelines that child belongs to and leaf of another
      // pipeline: the child's GETNEXT pipelines end in this node's OPEN phase.
      pipelines_ = Lists.newArrayList(
          new PipelineMembership(id_, 0, TExecNodePhase.GETNEXT));
      for (PipelineMembership childPipeline : children_.get(0).getPipelines()) {
        if (childPipeline.getPhase() == TExecNodePhase.GETNEXT) {
          pipelines_.add(new PipelineMembership(
              childPipeline.getId(), childPipeline.getHeight() + 1, TExecNodePhase.OPEN));
        }
      }
    } else {
      // Streaming with child, e.g. SELECT. Executes as part of all pipelines the child
      // belongs to.
      pipelines_ = new ArrayList<>();
      for (PipelineMembership childPipeline : children_.get(0).getPipelines()) {
        if (childPipeline.getPhase() == TExecNodePhase.GETNEXT) {
          pipelines_.add(new PipelineMembership(
              childPipeline.getId(), childPipeline.getHeight() + 1, TExecNodePhase.GETNEXT));
        }
      }
    }
  }

  /**
   * Called from SubplanNode to set the pipeline of this entire subtree to 'pipelines'.
   */
  protected void setPipelinesRecursive(List<PipelineMembership> pipelines) {
    pipelines_ = pipelines;
    for (PlanNode child: children_) {
      child.setPipelinesRecursive(pipelines);
    }
  }
| |
  /**
   * Compute peak resources consumed when executing this PlanNode, initializing
   * 'nodeResourceProfile_'. May only be called after this PlanNode has been placed in
   * a PlanFragment because the cost computation is dependent on the enclosing fragment's
   * data partition.
   */
  public abstract void computeNodeResourceProfile(TQueryOptions queryOptions);

  /**
   * Wrapper class to represent resource profiles during different phases of execution.
   * Immutable value holder used by computeTreeResourceProfiles().
   */
  public static class ExecPhaseResourceProfiles {
    public ExecPhaseResourceProfiles(
        ResourceProfile duringOpenProfile, ResourceProfile postOpenProfile) {
      this.duringOpenProfile = duringOpenProfile;
      this.postOpenProfile = postOpenProfile;
    }

    /** Peak resources consumed while Open() is executing for this subtree */
    public final ResourceProfile duringOpenProfile;

    /**
     * Peak resources consumed for this subtree from the time when ExecNode::Open()
     * returns until the time when ExecNode::Close() returns.
     */
    public final ResourceProfile postOpenProfile;
  }
| |
  /**
   * Recursive function used to compute the peak resources consumed by this subtree of
   * the plan within a fragment instance. The default implementation of this function
   * is correct for streaming and blocking PlanNodes with a single child. PlanNodes
   * that don't meet this description must override this function.
   *
   * Not called for PlanNodes inside a subplan: the root SubplanNode is responsible for
   * computing the peak resources for the entire subplan.
   *
   * computeNodeResourceProfile() must be called on all plan nodes in this subtree before
   * calling this function.
   */
  public ExecPhaseResourceProfiles computeTreeResourceProfiles(
      TQueryOptions queryOptions) {
    Preconditions.checkState(
        children_.size() <= 1, "Plan nodes with > 1 child must override");
    if (children_.isEmpty()) {
      // Leaf node: this node's own profile applies in both phases.
      return new ExecPhaseResourceProfiles(nodeResourceProfile_, nodeResourceProfile_);
    }
    ExecPhaseResourceProfiles childResources =
        getChild(0).computeTreeResourceProfiles(queryOptions);
    if (isBlockingNode()) {
      // This does not consume resources until after child's Open() returns. The child is
      // then closed before Open() of this node returns.
      ResourceProfile duringOpenProfile = childResources.duringOpenProfile.max(
          childResources.postOpenProfile.sum(nodeResourceProfile_));
      return new ExecPhaseResourceProfiles(duringOpenProfile, nodeResourceProfile_);
    } else {
      // Streaming node: this node, child and ancestor execute concurrently.
      return new ExecPhaseResourceProfiles(
          childResources.duringOpenProfile.sum(nodeResourceProfile_),
          childResources.postOpenProfile.sum(nodeResourceProfile_));
    }
  }
| |
| /** |
| * Compute the buffer size that will be used to fit the maximum-sized row - either the |
| * default buffer size provided or the smallest buffer that fits a maximum-sized row. |
| */ |
| protected static long computeMaxSpillableBufferSize(long defaultBufferSize, |
| long maxRowSize) { |
| return Math.max(defaultBufferSize, BitUtil.roundUpToPowerOf2(maxRowSize)); |
| } |
| |
| /** |
| * The input cardinality is the sum of output cardinalities of its children. |
| * For scan nodes the input cardinality is the expected number of rows scanned. |
| */ |
| public long getInputCardinality() { |
| long sum = 0; |
| for(PlanNode p : children_) { |
| long tmp = p.getCardinality(); |
| if (tmp == -1) return -1; |
| sum = checkedAdd(sum, tmp); |
| } |
| return sum; |
| } |
| |
| protected void addRuntimeFilter(RuntimeFilter filter) { runtimeFilters_.add(filter); } |
| |
| protected Collection<RuntimeFilter> getRuntimeFilters() { return runtimeFilters_; } |
| |
| protected String getRuntimeFilterExplainString( |
| boolean isBuildNode, TExplainLevel detailLevel) { |
| if (runtimeFilters_.isEmpty()) return ""; |
| List<String> filtersStr = new ArrayList<>(); |
| for (RuntimeFilter filter: runtimeFilters_) { |
| StringBuilder filterStr = new StringBuilder(); |
| filterStr.append(filter.getFilterId()); |
| if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) { |
| filterStr.append("["); |
| filterStr.append(filter.getType().toString().toLowerCase()); |
| filterStr.append("]"); |
| } |
| if (isBuildNode) { |
| filterStr.append(" <- "); |
| filterStr.append(filter.getSrcExpr().toSql()); |
| } else { |
| filterStr.append(" -> "); |
| filterStr.append(filter.getTargetExpr(getId()).toSql()); |
| } |
| filtersStr.add(filterStr.toString()); |
| } |
| return Joiner.on(", ").join(filtersStr) + "\n"; |
| } |
| |
| /** |
| * Sort a list of conjuncts into an estimated cheapest order to evaluate them in, based |
| * on estimates of the cost to evaluate and selectivity of the expressions. Should be |
| * called during PlanNode.init for any PlanNode that could have a conjunct list. |
| * |
| * The conjuncts are sorted by repeatedly iterating over them and choosing the conjunct |
| * that would result in the least total estimated work were it to be applied before the |
| * remaining conjuncts. |
| * |
| * As in computeCombinedSelecivity, the selectivities are exponentially backed off over |
| * the iterations, to reflect the possibility that the conjuncts may be correlated, and |
| * Exprs without selectivity estimates are given a reasonable default. |
| */ |
| public static <T extends Expr> List<T> orderConjunctsByCost(List<T> conjuncts) { |
| if (conjuncts.size() <= 1) return conjuncts; |
| |
| float totalCost = 0; |
| int numWithoutSel = 0; |
| List<T> remaining = Lists.newArrayListWithCapacity(conjuncts.size()); |
| for (T e : conjuncts) { |
| if (!e.hasCost()) { |
| // Avoid toSql() calls for each call |
| Preconditions.checkState(false, e.toSql()); |
| } |
| totalCost += e.getCost(); |
| remaining.add(e); |
| if (!e.hasSelectivity()) { |
| ++numWithoutSel; |
| } |
| } |
| |
| // We distribute the DEFAULT_SELECTIVITY over the conjuncts without a selectivity |
| // estimate so that their combined selectivities equal DEFAULT_SELECTIVITY, i.e. |
| // Math.pow(defaultSel, numWithoutSel) = Expr.DEFAULT_SELECTIVITY |
| double defaultSel = Expr.DEFAULT_SELECTIVITY; |
| if (numWithoutSel != 0) { |
| defaultSel = Math.pow(Math.E, Math.log(Expr.DEFAULT_SELECTIVITY) / numWithoutSel); |
| } |
| |
| List<T> sortedConjuncts = Lists.newArrayListWithCapacity(conjuncts.size()); |
| while (!remaining.isEmpty()) { |
| double smallestCost = Float.MAX_VALUE; |
| T bestConjunct = null; |
| double backoffExp = 1.0 / (double) (sortedConjuncts.size() + 1); |
| for (T e : remaining) { |
| double sel = Math.pow(e.hasSelectivity() ? e.getSelectivity() : defaultSel, |
| backoffExp); |
| |
| // The cost of evaluating this conjunct first is estimated as the cost of |
| // applying this conjunct to all rows plus the cost of applying all the |
| // remaining conjuncts to the number of rows we expect to remain given |
| // this conjunct's selectivity, exponentially backed off. |
| double cost = e.getCost() + (totalCost - e.getCost()) * sel; |
| if (cost < smallestCost) { |
| smallestCost = cost; |
| bestConjunct = e; |
| } else if (cost == smallestCost) { |
| // Break ties based on toSql() to get a consistent display in explain plans. |
| if (e.toSql().compareTo(bestConjunct.toSql()) < 0) { |
| smallestCost = cost; |
| bestConjunct = e; |
| } |
| } |
| } |
| |
| sortedConjuncts.add(bestConjunct); |
| remaining.remove(bestConjunct); |
| totalCost -= bestConjunct.getCost(); |
| } |
| |
| return sortedConjuncts; |
| } |
| |
| public void setDisableCodegen(boolean disableCodegen) { |
| disableCodegen_ = disableCodegen; |
| } |
| } |