| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.planner; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.impala.analysis.Analyzer; |
| import org.apache.impala.analysis.Expr; |
| import org.apache.impala.analysis.TupleDescriptor; |
| import org.apache.impala.analysis.TupleId; |
| import org.apache.impala.analysis.SlotDescriptor; |
| import org.apache.impala.analysis.SlotRef; |
| import org.apache.impala.thrift.TExecNodePhase; |
| import org.apache.impala.thrift.TExplainLevel; |
| import org.apache.impala.thrift.TExpr; |
| import org.apache.impala.thrift.TPlanNode; |
| import org.apache.impala.thrift.TPlanNodeType; |
| import org.apache.impala.thrift.TQueryOptions; |
| import org.apache.impala.thrift.TUnionNode; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Preconditions; |
| |
| /** |
| * Node that merges the results of its child plans, Normally, this is done by |
| * materializing the corresponding result exprs into a new tuple. However, if |
| * a child has an identical tuple layout as the output of the union node, and |
| * the child only has naked SlotRefs as result exprs, then the child is marked |
| * as 'passthrough'. The rows of passthrough children are directly returned by |
| * the union node, instead of materializing the child's result exprs into new |
| * tuples. |
| */ |
| public class UnionNode extends PlanNode { |
| private final static Logger LOG = LoggerFactory.getLogger(UnionNode.class); |
| |
| // List of union result exprs of the originating UnionStmt. Used for |
| // determining passthrough-compatibility of children. |
| protected List<Expr> unionResultExprs_; |
| |
| // Expr lists corresponding to the input query stmts. |
| // The ith resultExprList belongs to the ith child. |
| // All exprs are resolved to base tables. |
| protected List<List<Expr>> resultExprLists_ = new ArrayList<>(); |
| |
| // Expr lists that originate from constant select stmts. |
| // We keep them separate from the regular expr lists to avoid null children. |
| protected List<List<Expr>> constExprLists_ = new ArrayList<>(); |
| |
| // Materialized result/const exprs corresponding to materialized slots. |
| // Set in init() and substituted against the corresponding child's output smap. |
| protected List<List<Expr>> materializedResultExprLists_ = new ArrayList<>(); |
| protected List<List<Expr>> materializedConstExprLists_ = new ArrayList<>(); |
| |
| // Indicates if this UnionNode is inside a subplan. |
| protected boolean isInSubplan_; |
| |
| // Index of the first non-passthrough child. |
| protected int firstMaterializedChildIdx_; |
| |
| protected final TupleId tupleId_; |
| |
| protected UnionNode(PlanNodeId id, TupleId tupleId) { |
| super(id, tupleId.asList(), "UNION"); |
| unionResultExprs_ = new ArrayList<>(); |
| tupleId_ = tupleId; |
| isInSubplan_ = false; |
| } |
| |
| protected UnionNode(PlanNodeId id, TupleId tupleId, |
| List<Expr> unionResultExprs, boolean isInSubplan) { |
| super(id, tupleId.asList(), "UNION"); |
| unionResultExprs_ = unionResultExprs; |
| tupleId_ = tupleId; |
| isInSubplan_ = isInSubplan; |
| } |
| |
| public void addConstExprList(List<Expr> exprs) { constExprLists_.add(exprs); } |
| |
| /** |
| * Returns true if this UnionNode has only constant exprs. |
| */ |
| public boolean isConstantUnion() { return resultExprLists_.isEmpty(); } |
| |
| /** |
| * Add a child tree plus its corresponding unresolved resultExprs. |
| */ |
| public void addChild(PlanNode node, List<Expr> resultExprs) { |
| super.addChild(node); |
| resultExprLists_.add(resultExprs); |
| } |
| |
| @Override |
| public void computeStats(Analyzer analyzer) { |
| super.computeStats(analyzer); |
| long totalChildCardinality = 0; |
| boolean haveChildWithCardinality = false; |
| for (PlanNode child: children_) { |
| // ignore missing child cardinality info in the hope it won't matter enough |
| // to change the planning outcome |
| if (child.cardinality_ >= 0) { |
| totalChildCardinality = checkedAdd(totalChildCardinality, child.cardinality_); |
| haveChildWithCardinality = true; |
| } |
| // Union fragments are scheduled on the union of hosts of all scans in the fragment |
| // and the hosts of all its input fragments. Approximate this with max(), which is |
| // correct iff the child fragments run on sets of nodes that are supersets or |
| // subsets of each other, i.e. not just partly overlapping. |
| numNodes_ = Math.max(child.getNumNodes(), numNodes_); |
| } |
| // Consider estimate valid if we have at least one child with known cardinality, or |
| // only constant values. |
| if (haveChildWithCardinality || children_.size() == 0) { |
| cardinality_ = checkedAdd(totalChildCardinality, constExprLists_.size()); |
| } else { |
| cardinality_ = -1; |
| } |
| // The number of nodes of a union node is -1 (invalid) if all the referenced tables |
| // are inline views (e.g. select 1 FROM (VALUES(1 x, 1 y)) a FULL OUTER JOIN |
| // (VALUES(1 x, 1 y)) b ON (a.x = b.y)). We need to set the correct value. |
| if (numNodes_ == -1) numNodes_ = 1; |
| cardinality_ = capCardinalityAtLimit(cardinality_); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("stats Union: cardinality=" + Long.toString(cardinality_)); |
| } |
| } |
| |
| @Override |
| public void computeNodeResourceProfile(TQueryOptions queryOptions) { |
| // The union node directly returns the rows for children marked as pass |
| // through. For others, it initializes a single row-batch which it recycles |
| // on every GetNext() call made to the child node. The memory attached to |
| // that row-batch is the only memory counted against this node and the |
| // memory allocated for materialization is counted against the parent's |
| // row-batch passed to it. Since that attached memory depends on how the |
| // nodes under it manage memory ownership, it becomes increasingly difficult |
| // to accurately estimate this node's peak mem usage. Considering that, we |
| // estimate zero bytes for it to make sure it does not affect overall |
| // estimations in any way. |
| nodeResourceProfile_ = ResourceProfile.noReservation(0); |
| } |
| |
| @Override |
| public ExecPhaseResourceProfiles computeTreeResourceProfiles( |
| TQueryOptions queryOptions) { |
| // The union executes concurrently with Open() and GetNext() on each of it's |
| // children. |
| ResourceProfile maxProfile = ResourceProfile.invalid(); |
| for (PlanNode child : children_) { |
| // Children are opened either during Open() or GetNext() of the union. |
| ExecPhaseResourceProfiles childResources = |
| child.computeTreeResourceProfiles(queryOptions); |
| maxProfile = maxProfile.max(childResources.duringOpenProfile); |
| maxProfile = maxProfile.max(childResources.postOpenProfile); |
| } |
| ResourceProfile peakResources = nodeResourceProfile_.sum(maxProfile); |
| return new ExecPhaseResourceProfiles(peakResources, peakResources); |
| } |
| |
| /** |
| * Returns true if rows from the child with 'childTupleIds' and 'childResultExprs' can |
| * be returned directly by the union node (without materialization into a new tuple). |
| */ |
| private boolean isChildPassthrough( |
| Analyzer analyzer, PlanNode childNode, List<Expr> childExprList) { |
| List<TupleId> childTupleIds = childNode.getTupleIds(); |
| // Check that if the child outputs a single tuple, then it's not nullable. Tuple |
| // nullability can be considered to be part of the physical row layout. |
| Preconditions.checkState(childTupleIds.size() != 1 || |
| !childNode.getNullableTupleIds().contains(childTupleIds.get(0))); |
| // If the Union node is inside a subplan, passthrough should be disabled to avoid |
| // performance issues by forcing tiny batches. |
| // TODO: Remove this as part of IMPALA-4179. |
| if (isInSubplan_) return false; |
| // Pass through is only done for the simple case where the row has a single tuple. One |
| // of the motivations for this is that the output of a UnionNode is a row with a |
| // single tuple. |
| if (childTupleIds.size() != 1) return false; |
| Preconditions.checkState(!unionResultExprs_.isEmpty()); |
| |
| TupleDescriptor unionTupleDescriptor = analyzer.getDescTbl().getTupleDesc(tupleId_); |
| TupleDescriptor childTupleDescriptor = |
| analyzer.getDescTbl().getTupleDesc(childTupleIds.get(0)); |
| |
| // Verify that the union tuple descriptor has one slot for every expression. |
| Preconditions.checkState( |
| unionTupleDescriptor.getSlots().size() == unionResultExprs_.size()); |
| // Verify that the union node has one slot for every child expression. |
| Preconditions.checkState( |
| unionTupleDescriptor.getSlots().size() == childExprList.size()); |
| |
| if (unionResultExprs_.size() != childTupleDescriptor.getSlots().size()) return false; |
| if (unionTupleDescriptor.getByteSize() != childTupleDescriptor.getByteSize()) { |
| return false; |
| } |
| |
| for (int i = 0; i < unionResultExprs_.size(); ++i) { |
| if (!unionTupleDescriptor.getSlots().get(i).isMaterialized()) continue; |
| SlotRef unionSlotRef = unionResultExprs_.get(i).unwrapSlotRef(false); |
| SlotRef childSlotRef = childExprList.get(i).unwrapSlotRef(false); |
| Preconditions.checkNotNull(unionSlotRef); |
| if (childSlotRef == null) return false; |
| if (!childSlotRef.getDesc().LayoutEquals(unionSlotRef.getDesc())) return false; |
| } |
| return true; |
| } |
| |
| /** |
| * Compute which children are passthrough and reorder them such that the passthrough |
| * children come before the children that need to be materialized. Also reorder |
| * 'resultExprLists_'. The children are reordered to simplify the implementation in the |
| * BE. |
| */ |
| void computePassthrough(Analyzer analyzer) { |
| List<List<Expr>> newResultExprLists = new ArrayList<>(); |
| List<PlanNode> newChildren = new ArrayList<>(); |
| for (int i = 0; i < children_.size(); i++) { |
| if (isChildPassthrough(analyzer, children_.get(i), resultExprLists_.get(i))) { |
| newResultExprLists.add(resultExprLists_.get(i)); |
| newChildren.add(children_.get(i)); |
| } |
| } |
| firstMaterializedChildIdx_ = newChildren.size(); |
| |
| for (int i = 0; i < children_.size(); i++) { |
| if (!isChildPassthrough(analyzer, children_.get(i), resultExprLists_.get(i))) { |
| newResultExprLists.add(resultExprLists_.get(i)); |
| newChildren.add(children_.get(i)); |
| } |
| } |
| |
| Preconditions.checkState(resultExprLists_.size() == newResultExprLists.size()); |
| resultExprLists_ = newResultExprLists; |
| Preconditions.checkState(children_.size() == newChildren.size()); |
| children_ = newChildren; |
| } |
| |
| /** |
| * Must be called after addChild()/addConstExprList(). Computes the materialized |
| * result/const expr lists based on the materialized slots of this UnionNode's |
| * produced tuple. The UnionNode doesn't need an smap: like a ScanNode, it |
| * materializes an original tuple. |
| * There is no need to call assignConjuncts() because all non-constant conjuncts |
| * have already been assigned to the union operands, and all constant conjuncts have |
| * been evaluated during registration to set analyzer.hasEmptyResultSet_. |
| */ |
| @Override |
| public void init(Analyzer analyzer) { |
| Preconditions.checkState(conjuncts_.isEmpty()); |
| computeMemLayout(analyzer); |
| computeStats(analyzer); |
| computePassthrough(analyzer); |
| |
| // drop resultExprs/constExprs that aren't getting materialized (= where the |
| // corresponding output slot isn't being materialized) |
| materializedResultExprLists_.clear(); |
| Preconditions.checkState(resultExprLists_.size() == children_.size()); |
| List<SlotDescriptor> slots = analyzer.getDescTbl().getTupleDesc(tupleId_).getSlots(); |
| for (int i = 0; i < resultExprLists_.size(); ++i) { |
| List<Expr> exprList = resultExprLists_.get(i); |
| List<Expr> newExprList = new ArrayList<>(); |
| Preconditions.checkState(exprList.size() == slots.size()); |
| for (int j = 0; j < exprList.size(); ++j) { |
| if (slots.get(j).isMaterialized()) newExprList.add(exprList.get(j)); |
| } |
| materializedResultExprLists_.add( |
| Expr.substituteList(newExprList, getChild(i).getOutputSmap(), analyzer, true)); |
| } |
| Preconditions.checkState( |
| materializedResultExprLists_.size() == getChildren().size()); |
| |
| materializedConstExprLists_.clear(); |
| for (List<Expr> exprList: constExprLists_) { |
| Preconditions.checkState(exprList.size() == slots.size()); |
| List<Expr> newExprList = new ArrayList<>(); |
| for (int i = 0; i < exprList.size(); ++i) { |
| if (slots.get(i).isMaterialized()) newExprList.add(exprList.get(i)); |
| } |
| materializedConstExprLists_.add(newExprList); |
| } |
| } |
| |
| @Override |
| protected void toThrift(TPlanNode msg) { |
| Preconditions.checkState(materializedResultExprLists_.size() == children_.size()); |
| List<List<TExpr>> texprLists = new ArrayList<>(); |
| for (List<Expr> exprList: materializedResultExprLists_) { |
| texprLists.add(Expr.treesToThrift(exprList)); |
| } |
| List<List<TExpr>> constTexprLists = new ArrayList<>(); |
| for (List<Expr> constTexprList: materializedConstExprLists_) { |
| constTexprLists.add(Expr.treesToThrift(constTexprList)); |
| } |
| Preconditions.checkState(firstMaterializedChildIdx_ <= children_.size()); |
| msg.union_node = new TUnionNode( |
| tupleId_.asInt(), texprLists, constTexprLists, firstMaterializedChildIdx_); |
| msg.node_type = TPlanNodeType.UNION_NODE; |
| } |
| |
| @Override |
| protected String getNodeExplainString(String prefix, String detailPrefix, |
| TExplainLevel detailLevel) { |
| StringBuilder output = new StringBuilder(); |
| output.append(String.format("%s%s:%s\n", prefix, id_.toString(), displayName_)); |
| // A UnionNode may have predicates if a union is used inside an inline view, |
| // and the enclosing select stmt has predicates referring to the inline view. |
| if (!conjuncts_.isEmpty()) { |
| output.append(detailPrefix |
| + "predicates: " + Expr.getExplainString(conjuncts_, detailLevel) + "\n"); |
| } |
| if (!constExprLists_.isEmpty()) { |
| output.append(detailPrefix + "constant-operands=" + constExprLists_.size() + "\n"); |
| } |
| if (detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) { |
| List<String> passThroughNodeIds = new ArrayList<>(); |
| for (int i = 0; i < firstMaterializedChildIdx_; ++i) { |
| passThroughNodeIds.add(children_.get(i).getId().toString()); |
| } |
| if (!passThroughNodeIds.isEmpty()) { |
| String result = detailPrefix + "pass-through-operands: "; |
| if (passThroughNodeIds.size() == children_.size()) { |
| output.append(result + "all\n"); |
| } else { |
| output.append(result + Joiner.on(",").join(passThroughNodeIds) + "\n"); |
| } |
| } |
| } |
| return output.toString(); |
| } |
| |
| @Override |
| public void computePipelineMembership() { |
| // The union streams each child's input through, so is part of every pipeline that |
| // its child is a part of. |
| pipelines_ = new ArrayList<>(); |
| for (PlanNode child: children_) { |
| child.computePipelineMembership(); |
| for (PipelineMembership childPipeline : child.getPipelines()) { |
| if (childPipeline.getPhase() == TExecNodePhase.GETNEXT) { |
| pipelines_.add(new PipelineMembership( |
| childPipeline.getId(), childPipeline.getHeight() + 1, TExecNodePhase.GETNEXT)); |
| } |
| } |
| } |
| } |
| } |