| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.planner; |
| |
| import java.util.List; |
| import java.util.Set; |
| |
| import org.apache.impala.analysis.Analyzer; |
| import org.apache.impala.analysis.Expr; |
| import org.apache.impala.analysis.TupleId; |
| import org.apache.impala.common.AnalysisException; |
| import org.apache.impala.common.InternalException; |
| import org.apache.impala.common.PrintUtils; |
| import org.apache.impala.common.TreeNode; |
| import org.apache.impala.planner.JoinNode.DistributionMode; |
| import org.apache.impala.planner.PlanNode.ExecPhaseResourceProfiles; |
| import org.apache.impala.thrift.TExplainLevel; |
| import org.apache.impala.thrift.TPartitionType; |
| import org.apache.impala.thrift.TPlanFragment; |
| import org.apache.impala.thrift.TPlanFragmentTree; |
| import org.apache.impala.thrift.TQueryOptions; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| |
| /** |
| * PlanFragments form a tree structure via their ExchangeNodes. A tree of fragments |
| * connected in that way forms a plan. The output of a plan is produced by the root |
| * fragment and is either the result of the query or an intermediate result |
| * needed by a different plan (such as a hash table). |
| * |
| * Plans are grouped into cohorts based on the consumer of their output: all |
| * plans that materialize intermediate results for a particular consumer plan |
| * are grouped into a single cohort. |
| * |
| * A PlanFragment encapsulates the specific tree of execution nodes that |
| * are used to produce the output of the plan fragment, as well as output exprs, |
| * destination node, etc. If there are no output exprs, the full row that is |
| * is produced by the plan root is marked as materialized. |
| * |
| * A plan fragment can have one or many instances, each of which in turn is executed by |
| * an individual node and the output sent to a specific instance of the destination |
| * fragment (or, in the case of the root fragment, is materialized in some form). |
| * |
| * A hash-partitioned plan fragment is the result of one or more hash-partitioning data |
| * streams being received by plan nodes in this fragment. In the future, a fragment's |
| * data partition could also be hash partitioned based on a scan node that is reading |
| * from a physically hash-partitioned table. |
| * |
| * The sequence of calls is: |
| * - c'tor |
| * - assemble with getters, etc. |
| * - finalize() |
| * - toThrift() |
| * |
| * TODO: the tree of PlanNodes is connected across fragment boundaries, which makes |
| * it impossible search for things within a fragment (using TreeNode functions); |
| * fix that |
| */ |
| public class PlanFragment extends TreeNode<PlanFragment> { |
| private final static Logger LOG = LoggerFactory.getLogger(PlanFragment.class); |
| |
| private final PlanFragmentId fragmentId_; |
| private PlanId planId_; |
| private CohortId cohortId_; |
| |
| // root of plan tree executed by this fragment |
| private PlanNode planRoot_; |
| |
| // exchange node to which this fragment sends its output |
| private ExchangeNode destNode_; |
| |
| // if null, outputs the entire row produced by planRoot_ |
| private List<Expr> outputExprs_; |
| |
| // created in finalize() or set in setSink() |
| private DataSink sink_; |
| |
| // specification of the partition of the input of this fragment; |
| // an UNPARTITIONED fragment is executed on only a single node |
| // TODO: improve this comment, "input" is a bit misleading |
| private DataPartition dataPartition_; |
| |
| // specification of how the output of this fragment is partitioned (i.e., how |
| // it's sent to its destination); |
| // if the output is UNPARTITIONED, it is being broadcast |
| private DataPartition outputPartition_; |
| |
| // Resource requirements and estimates for an instance of this plan fragment. |
| // Initialized with a dummy value. Gets set correctly in |
| // computeResourceProfile(). |
| private ResourceProfile resourceProfile_ = ResourceProfile.invalid(); |
| |
| // The total of initial reservations (in bytes) that will be claimed over the lifetime |
| // of this fragment. Computed in computeResourceProfile(). |
| private long initialReservationTotalClaims_ = -1; |
| |
| /** |
| * C'tor for fragment with specific partition; the output is by default broadcast. |
| */ |
| public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition) { |
| fragmentId_ = id; |
| planRoot_ = root; |
| dataPartition_ = partition; |
| outputPartition_ = DataPartition.UNPARTITIONED; |
| setFragmentInPlanTree(planRoot_); |
| } |
| |
| /** |
| * Assigns 'this' as fragment of all PlanNodes in the plan tree rooted at node. |
| * Does not traverse the children of ExchangeNodes because those must belong to a |
| * different fragment. |
| */ |
| public void setFragmentInPlanTree(PlanNode node) { |
| if (node == null) return; |
| node.setFragment(this); |
| if (node instanceof ExchangeNode) return; |
| for (PlanNode child : node.getChildren()) setFragmentInPlanTree(child); |
| } |
| |
| /** |
| * Collect and return all PlanNodes that belong to the exec tree of this fragment. |
| */ |
| public List<PlanNode> collectPlanNodes() { |
| List<PlanNode> nodes = Lists.newArrayList(); |
| collectPlanNodesHelper(planRoot_, nodes); |
| return nodes; |
| } |
| |
| private void collectPlanNodesHelper(PlanNode root, List<PlanNode> nodes) { |
| if (root == null) return; |
| nodes.add(root); |
| if (root instanceof ExchangeNode) return; |
| for (PlanNode child: root.getChildren()) collectPlanNodesHelper(child, nodes); |
| } |
| |
| public void setOutputExprs(List<Expr> outputExprs) { |
| outputExprs_ = Expr.cloneList(outputExprs); |
| } |
| public List<Expr> getOutputExprs() { return outputExprs_; } |
| |
| /** |
| * Do any final work to set up the ExchangeNodes and DataStreamSinks for this fragment. |
| * If this fragment has partitioned joins, ensures that the corresponding partition |
| * exprs of all hash-partitioning senders are cast to appropriate types. |
| * Otherwise, the hashes generated for identical partition values may differ |
| * among senders if the partition-expr types are not identical. |
| */ |
| public void finalizeExchanges(Analyzer analyzer) throws InternalException { |
| if (destNode_ != null) { |
| Preconditions.checkState(sink_ == null); |
| // we're streaming to an exchange node |
| DataStreamSink streamSink = new DataStreamSink(destNode_, outputPartition_); |
| streamSink.setFragment(this); |
| sink_ = streamSink; |
| } |
| |
| // Must be called regardless of this fragment's data partition. This fragment might |
| // be RANDOM partitioned due to a union. The union could still have partitioned joins |
| // in its child subtrees for which casts on the exchange senders are needed. |
| castPartitionedJoinExchanges(planRoot_, analyzer); |
| } |
| |
| /** |
| * Recursively traverses the plan tree rooted at 'node' and casts the partition exprs |
| * of all senders feeding into a series of partitioned joins to compatible types. |
| */ |
| private void castPartitionedJoinExchanges(PlanNode node, Analyzer analyzer) { |
| if (node instanceof HashJoinNode |
| && ((JoinNode) node).getDistributionMode() == DistributionMode.PARTITIONED) { |
| // Contains all exchange nodes in this fragment below the current join node. |
| List<ExchangeNode> exchNodes = Lists.newArrayList(); |
| node.collect(ExchangeNode.class, exchNodes); |
| |
| // Contains partition-expr lists of all hash-partitioning sender fragments. |
| List<List<Expr>> senderPartitionExprs = Lists.newArrayList(); |
| for (ExchangeNode exchNode: exchNodes) { |
| Preconditions.checkState(!exchNode.getChildren().isEmpty()); |
| PlanFragment senderFragment = exchNode.getChild(0).getFragment(); |
| Preconditions.checkNotNull(senderFragment); |
| if (!senderFragment.getOutputPartition().isHashPartitioned()) continue; |
| List<Expr> partExprs = senderFragment.getOutputPartition().getPartitionExprs(); |
| senderPartitionExprs.add(partExprs); |
| } |
| |
| // Cast partition exprs of all hash-partitioning senders to their compatible types. |
| try { |
| analyzer.castToUnionCompatibleTypes(senderPartitionExprs); |
| } catch (AnalysisException e) { |
| // Should never happen. Analysis should have ensured type compatibility already. |
| throw new IllegalStateException(e); |
| } |
| } else { |
| // Recursively traverse plan nodes in this fragment. |
| for (PlanNode child: node.getChildren()) { |
| if (child.getFragment() == this) castPartitionedJoinExchanges(child, analyzer); |
| } |
| } |
| } |
| |
| /** |
| * Compute the peak resource profile for an instance of this fragment. Must |
| * be called after all the plan nodes and sinks are added to the fragment and resource |
| * profiles of all children fragments are computed. |
| */ |
| public void computeResourceProfile(Analyzer analyzer) { |
| // Compute resource profiles for all plan nodes and sinks in the fragment. |
| sink_.computeResourceProfile(analyzer.getQueryOptions()); |
| for (PlanNode node: collectPlanNodes()) { |
| node.computeNodeResourceProfile(analyzer.getQueryOptions()); |
| } |
| |
| if (sink_ instanceof JoinBuildSink) { |
| // Resource consumption of fragments with join build sinks is included in the |
| // parent fragment because the join node blocks waiting for the join build to |
| // finish - see JoinNode.computeTreeResourceProfiles(). |
| resourceProfile_ = ResourceProfile.invalid(); |
| return; |
| } |
| |
| ExecPhaseResourceProfiles planTreeProfile = |
| planRoot_.computeTreeResourceProfiles(analyzer.getQueryOptions()); |
| // The sink is opened after the plan tree. |
| ResourceProfile fInstancePostOpenProfile = |
| planTreeProfile.postOpenProfile.sum(sink_.getResourceProfile()); |
| resourceProfile_ = |
| planTreeProfile.duringOpenProfile.max(fInstancePostOpenProfile); |
| |
| initialReservationTotalClaims_ = sink_.getResourceProfile().getMinReservationBytes(); |
| for (PlanNode node: collectPlanNodes()) { |
| initialReservationTotalClaims_ += |
| node.getNodeResourceProfile().getMinReservationBytes(); |
| } |
| } |
| |
| public ResourceProfile getResourceProfile() { return resourceProfile_; } |
| |
| /** |
| * Return the number of nodes on which the plan fragment will execute. |
| * invalid: -1 |
| */ |
| public int getNumNodes() { |
| return dataPartition_ == DataPartition.UNPARTITIONED ? 1 : planRoot_.getNumNodes(); |
| } |
| |
| /** |
| * Return the number of instances of this fragment per host that it executes on. |
| * invalid: -1 |
| */ |
| public int getNumInstancesPerHost(int mt_dop) { |
| Preconditions.checkState(mt_dop >= 0); |
| if (dataPartition_ == DataPartition.UNPARTITIONED) return 1; |
| return mt_dop == 0 ? 1 : mt_dop; |
| } |
| |
| /** |
| * Return the total number of instances of this fragment across all hosts. |
| * invalid: -1 |
| */ |
| public int getNumInstances(int mt_dop) { |
| if (dataPartition_ == DataPartition.UNPARTITIONED) return 1; |
| int numNodes = planRoot_.getNumNodes(); |
| if (numNodes == -1) return -1; |
| return getNumInstancesPerHost(mt_dop) * numNodes; |
| } |
| |
| /** |
| * Estimates the number of distinct values of exprs per fragment instance based on the |
| * data partition of this fragment, the number of nodes, and the degree of parallelism. |
| * Returns -1 for an invalid estimate, e.g., because getNumDistinctValues() failed on |
| * one of the exprs. |
| */ |
| public long getPerInstanceNdv(int mt_dop, List<Expr> exprs) { |
| Preconditions.checkNotNull(dataPartition_); |
| long result = 1; |
| int numInstances = getNumInstances(mt_dop); |
| Preconditions.checkState(numInstances >= 0); |
| // The number of nodes is zero for empty tables. |
| if (numInstances == 0) return 0; |
| for (Expr expr: exprs) { |
| long numDistinct = expr.getNumDistinctValues(); |
| if (numDistinct == -1) { |
| result = -1; |
| break; |
| } |
| if (dataPartition_.getPartitionExprs().contains(expr)) { |
| numDistinct = (long)Math.max((double) numDistinct / (double) numInstances, 1L); |
| } |
| result = PlanNode.checkedMultiply(result, numDistinct); |
| } |
| return result; |
| } |
| |
| public TPlanFragment toThrift() { |
| TPlanFragment result = new TPlanFragment(); |
| result.setDisplay_name(fragmentId_.toString()); |
| if (planRoot_ != null) result.setPlan(planRoot_.treeToThrift()); |
| if (outputExprs_ != null) { |
| result.setOutput_exprs(Expr.treesToThrift(outputExprs_)); |
| } |
| if (sink_ != null) result.setOutput_sink(sink_.toThrift()); |
| result.setPartition(dataPartition_.toThrift()); |
| if (resourceProfile_.isValid()) { |
| Preconditions.checkArgument(initialReservationTotalClaims_ > -1); |
| result.setMin_reservation_bytes(resourceProfile_.getMinReservationBytes()); |
| result.setInitial_reservation_total_claims(initialReservationTotalClaims_); |
| } else { |
| result.setMin_reservation_bytes(0); |
| result.setInitial_reservation_total_claims(0); |
| } |
| return result; |
| } |
| |
| public TPlanFragmentTree treeToThrift() { |
| TPlanFragmentTree result = new TPlanFragmentTree(); |
| treeToThriftHelper(result); |
| return result; |
| } |
| |
| private void treeToThriftHelper(TPlanFragmentTree plan) { |
| plan.addToFragments(toThrift()); |
| for (PlanFragment child: children_) { |
| child.treeToThriftHelper(plan); |
| } |
| } |
| |
| public String getExplainString(TQueryOptions queryOptions, TExplainLevel detailLevel) { |
| return getExplainString("", "", queryOptions, detailLevel); |
| } |
| |
| /** |
| * The root of the output tree will be prefixed by rootPrefix and the remaining plan |
| * output will be prefixed by prefix. |
| */ |
| protected final String getExplainString(String rootPrefix, String prefix, |
| TQueryOptions queryOptions, TExplainLevel detailLevel) { |
| StringBuilder str = new StringBuilder(); |
| Preconditions.checkState(dataPartition_ != null); |
| String detailPrefix = prefix + "| "; // sink detail |
| if (detailLevel == TExplainLevel.VERBOSE) { |
| // we're printing a new tree, start over with the indentation |
| prefix = " "; |
| rootPrefix = " "; |
| detailPrefix = prefix + "| "; |
| str.append(getFragmentHeaderString("", "", queryOptions.getMt_dop())); |
| if (sink_ != null && sink_ instanceof DataStreamSink) { |
| str.append( |
| sink_.getExplainString(rootPrefix, detailPrefix, queryOptions, detailLevel)); |
| } |
| } else if (detailLevel == TExplainLevel.EXTENDED) { |
| // Print a fragment prefix displaying the # nodes and # instances |
| str.append( |
| getFragmentHeaderString(rootPrefix, detailPrefix, queryOptions.getMt_dop())); |
| rootPrefix = prefix; |
| } |
| |
| String planRootPrefix = rootPrefix; |
| // Always print sinks other than DataStreamSinks. |
| if (sink_ != null && !(sink_ instanceof DataStreamSink)) { |
| str.append( |
| sink_.getExplainString(rootPrefix, detailPrefix, queryOptions, detailLevel)); |
| if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) { |
| str.append(prefix + "|\n"); |
| } |
| // we already used the root prefix for the sink |
| planRootPrefix = prefix; |
| } |
| if (planRoot_ != null) { |
| str.append( |
| planRoot_.getExplainString(planRootPrefix, prefix, queryOptions, detailLevel)); |
| } |
| return str.toString(); |
| } |
| |
| /** |
| * Get a header string for a fragment in an explain plan. |
| */ |
| public String getFragmentHeaderString(String firstLinePrefix, String detailPrefix, |
| int mt_dop) { |
| StringBuilder builder = new StringBuilder(); |
| builder.append(String.format("%s%s:PLAN FRAGMENT [%s]", firstLinePrefix, |
| fragmentId_.toString(), dataPartition_.getExplainString())); |
| builder.append(PrintUtils.printNumHosts(" ", getNumNodes())); |
| builder.append(PrintUtils.printNumInstances(" ", getNumInstances(mt_dop))); |
| builder.append("\n"); |
| builder.append(detailPrefix); |
| builder.append("Per-Host Resources: "); |
| if (sink_ instanceof JoinBuildSink) { |
| builder.append("included in parent fragment"); |
| } else { |
| builder.append(resourceProfile_.multiply(getNumInstancesPerHost(mt_dop)) |
| .getExplainString()); |
| } |
| builder.append("\n"); |
| return builder.toString(); |
| } |
| |
| /** Returns true if this fragment is partitioned. */ |
| public boolean isPartitioned() { |
| return (dataPartition_.getType() != TPartitionType.UNPARTITIONED); |
| } |
| |
| public PlanFragmentId getId() { return fragmentId_; } |
| public PlanId getPlanId() { return planId_; } |
| public void setPlanId(PlanId id) { planId_ = id; } |
| public CohortId getCohortId() { return cohortId_; } |
| public void setCohortId(CohortId id) { cohortId_ = id; } |
| public PlanFragment getDestFragment() { |
| if (destNode_ == null) return null; |
| return destNode_.getFragment(); |
| } |
| public ExchangeNode getDestNode() { return destNode_; } |
| public DataPartition getDataPartition() { return dataPartition_; } |
| public void setDataPartition(DataPartition dataPartition) { |
| this.dataPartition_ = dataPartition; |
| } |
| public DataPartition getOutputPartition() { return outputPartition_; } |
| public void setOutputPartition(DataPartition outputPartition) { |
| this.outputPartition_ = outputPartition; |
| } |
| public PlanNode getPlanRoot() { return planRoot_; } |
| public void setPlanRoot(PlanNode root) { |
| planRoot_ = root; |
| setFragmentInPlanTree(planRoot_); |
| } |
| |
| public void setDestination(ExchangeNode destNode) { |
| destNode_ = destNode; |
| PlanFragment dest = getDestFragment(); |
| Preconditions.checkNotNull(dest); |
| dest.addChild(this); |
| } |
| |
| public boolean hasSink() { return sink_ != null; } |
| public DataSink getSink() { return sink_; } |
| public void setSink(DataSink sink) { |
| Preconditions.checkState(this.sink_ == null); |
| Preconditions.checkNotNull(sink); |
| sink.setFragment(this); |
| this.sink_ = sink; |
| } |
| |
| /** |
| * Adds a node as the new root to the plan tree. Connects the existing |
| * root as the child of newRoot. |
| */ |
| public void addPlanRoot(PlanNode newRoot) { |
| Preconditions.checkState(newRoot.getChildren().size() == 1); |
| newRoot.setChild(0, planRoot_); |
| planRoot_ = newRoot; |
| planRoot_.setFragment(this); |
| } |
| |
| /** |
| * Verify that the tree of PlanFragments and their contained tree of |
| * PlanNodes is constructed correctly. |
| */ |
| public void verifyTree() { |
| // PlanNode.fragment_ is set correctly |
| List<PlanNode> nodes = collectPlanNodes(); |
| List<PlanNode> exchNodes = Lists.newArrayList(); |
| for (PlanNode node: nodes) { |
| if (node instanceof ExchangeNode) exchNodes.add(node); |
| Preconditions.checkState(node.getFragment() == this); |
| } |
| |
| // all ExchangeNodes have registered input fragments |
| Preconditions.checkState(exchNodes.size() == getChildren().size()); |
| List<PlanFragment> childFragments = Lists.newArrayList(); |
| for (PlanNode exchNode: exchNodes) { |
| PlanFragment childFragment = exchNode.getChild(0).getFragment(); |
| Preconditions.checkState(!childFragments.contains(childFragment)); |
| childFragments.add(childFragment); |
| Preconditions.checkState(childFragment.getDestNode() == exchNode); |
| } |
| // all registered children are accounted for |
| Preconditions.checkState(getChildren().containsAll(childFragments)); |
| |
| for (PlanFragment child: getChildren()) child.verifyTree(); |
| } |
| |
| /** |
| * Returns true if 'exprs' reference a tuple that is made nullable in this fragment, |
| * but not in any of its input fragments. |
| */ |
| public boolean refsNullableTupleId(List<Expr> exprs) { |
| Preconditions.checkNotNull(planRoot_); |
| List<TupleId> tids = Lists.newArrayList(); |
| for (Expr e: exprs) e.getIds(tids, null); |
| Set<TupleId> nullableTids = Sets.newHashSet(planRoot_.getNullableTupleIds()); |
| // Remove all tuple ids that were made nullable in an input fragment. |
| List<ExchangeNode> exchNodes = Lists.newArrayList(); |
| planRoot_.collect(ExchangeNode.class, exchNodes); |
| for (ExchangeNode exchNode: exchNodes) { |
| nullableTids.removeAll(exchNode.getNullableTupleIds()); |
| } |
| for (TupleId tid: tids) if (nullableTids.contains(tid)) return true; |
| return false; |
| } |
| } |