| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.planner; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.ListIterator; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.impala.analysis.AggregateInfo; |
| import org.apache.impala.analysis.AnalyticInfo; |
| import org.apache.impala.analysis.Analyzer; |
| import org.apache.impala.analysis.BaseTableRef; |
| import org.apache.impala.analysis.BinaryPredicate; |
| import org.apache.impala.analysis.BinaryPredicate.Operator; |
| import org.apache.impala.analysis.CollectionTableRef; |
| import org.apache.impala.analysis.Expr; |
| import org.apache.impala.analysis.ExprId; |
| import org.apache.impala.analysis.ExprSubstitutionMap; |
| import org.apache.impala.analysis.InlineViewRef; |
| import org.apache.impala.analysis.JoinOperator; |
| import org.apache.impala.analysis.MultiAggregateInfo; |
| import org.apache.impala.analysis.MultiAggregateInfo.AggPhase; |
| import org.apache.impala.analysis.NullLiteral; |
| import org.apache.impala.analysis.QueryStmt; |
| import org.apache.impala.analysis.SelectStmt; |
| import org.apache.impala.analysis.SingularRowSrcTableRef; |
| import org.apache.impala.analysis.SlotDescriptor; |
| import org.apache.impala.analysis.SlotId; |
| import org.apache.impala.analysis.SlotRef; |
| import org.apache.impala.analysis.SortInfo; |
| import org.apache.impala.analysis.TableRef; |
| import org.apache.impala.analysis.TupleDescriptor; |
| import org.apache.impala.analysis.TupleId; |
| import org.apache.impala.analysis.TupleIsNullPredicate; |
| import org.apache.impala.analysis.UnionStmt; |
| import org.apache.impala.analysis.UnionStmt.UnionOperand; |
| import org.apache.impala.catalog.ColumnStats; |
| import org.apache.impala.catalog.FeDataSourceTable; |
| import org.apache.impala.catalog.FeFsPartition; |
| import org.apache.impala.catalog.FeFsTable; |
| import org.apache.impala.catalog.FeHBaseTable; |
| import org.apache.impala.catalog.FeKuduTable; |
| import org.apache.impala.catalog.FeTable; |
| import org.apache.impala.catalog.HdfsFileFormat; |
| import org.apache.impala.catalog.ScalarType; |
| import org.apache.impala.common.ImpalaException; |
| import org.apache.impala.common.InternalException; |
| import org.apache.impala.common.NotImplementedException; |
| import org.apache.impala.common.Pair; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Predicate; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| |
| /** |
| * Constructs a non-executable single-node plan from an analyzed parse tree. |
| * The single-node plan does not contain data exchanges or data-reduction optimizations |
| * such as local aggregations that are important for distributed execution. |
| * The single-node plan needs to be wrapped in a plan fragment for it to be executable. |
| */ |
| public class SingleNodePlanner { |
private static final Logger LOG = LoggerFactory.getLogger(SingleNodePlanner.class);

// Planner context of the query being planned. This class reads the query statement,
// query options and timeline from it, and draws every new plan node id from it.
private final PlannerContext ctx_;

/**
 * Creates a planner that builds the single-node plan for the query held by 'ctx'.
 */
public SingleNodePlanner(PlannerContext ctx) {
  this.ctx_ = ctx;
}
| |
| /** |
| * Generates and returns the root of the single-node plan for the analyzed parse tree |
| * in the planner context. The planning process recursively walks the parse tree and |
| * performs the following actions. |
| * In the top-down phase over query statements: |
| * - Materialize the slots required for evaluating expressions of that statement. |
| * - Migrate conjuncts from parent blocks into inline views and union operands. |
| * In the bottom-up phase generate the plan tree for every query statement: |
| * - Generate the plan for the FROM-clause of a select statement: The plan trees of |
| * absolute and uncorrelated table refs are connected via JoinNodes. The relative |
| * and correlated table refs are associated with one or more SubplanNodes. |
| * - A new SubplanNode is placed on top of an existing plan node whenever the tuples |
| * materialized by that plan node enable evaluation of one or more relative or |
| * correlated table refs, i.e., SubplanNodes are placed at the lowest possible point |
| * in the plan, often right after a ScanNode materializing the (single) parent tuple. |
| * - The right-hand side of each SubplanNode is a plan tree generated by joining a |
| * SingularRowSrcTableRef with those applicable relative and correlated refs. |
| * A SingularRowSrcTableRef represents the current row being processed by the |
| * SubplanNode from its input (first child). |
| * - Connecting table ref plans via JoinNodes is done in a cost-based fashion |
| * (join-order optimization). All materialized slots, including those of tuples |
| * materialized inside a SubplanNode, must be known for an accurate estimate of row |
| * sizes needed for cost-based join ordering. |
| * - The remaining aggregate/analytic/orderby portions of a select statement are added |
| * on top of the FROM-clause plan. |
| * - Whenever a new node is added to the plan tree, assign conjuncts that can be |
| * evaluated at that node and compute the stats of that node (cardinality, etc.). |
| * - Apply combined expression substitution map of child plan nodes; if a plan node |
| * re-maps its input, set a substitution map to be applied by parents. |
| */ |
| public PlanNode createSingleNodePlan() throws ImpalaException { |
| QueryStmt queryStmt = ctx_.getQueryStmt(); |
| // Use the stmt's analyzer which is not necessarily the root analyzer |
| // to detect empty result sets. |
| Analyzer analyzer = queryStmt.getAnalyzer(); |
| analyzer.computeValueTransferGraph(); |
| ctx_.getTimeline().markEvent("Value transfer graph computed"); |
| |
| // Mark slots referenced by output exprs as materialized, prior to generating the |
| // plan tree. |
| // We need to mark the result exprs of the topmost select block as materialized, so |
| // that PlanNode.init() can compute the final mem layout of materialized tuples |
| // (the byte size of tuples is needed for cost computations). |
| // TODO: instead of materializing everything produced by the plan root, derive |
| // referenced slots from destination fragment and add a materialization node |
| // if not all output is needed by destination fragment |
| // TODO 2: should the materialization decision be cost-based? |
| if (queryStmt.getBaseTblResultExprs() != null) { |
| analyzer.materializeSlots(queryStmt.getBaseTblResultExprs()); |
| } |
| |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("desctbl: " + analyzer.getDescTbl().debugString()); |
| } |
| PlanNode singleNodePlan = createQueryPlan(queryStmt, analyzer, |
| ctx_.getQueryOptions().isDisable_outermost_topn()); |
| Preconditions.checkNotNull(singleNodePlan); |
| return singleNodePlan; |
| } |
| |
| /** |
| * Checks that the given single-node plan is executable: |
| * - It may not contain right or full outer joins with no equi-join conjuncts that |
| * are not inside the right child of a SubplanNode. |
| * Throws a NotImplementedException if plan validation fails. |
| */ |
| public void validatePlan(PlanNode planNode) throws NotImplementedException { |
| // Any join can run in a single-node plan. |
| if (ctx_.isSingleNodeExec()) return; |
| |
| if (planNode instanceof NestedLoopJoinNode) { |
| JoinNode joinNode = (JoinNode) planNode; |
| JoinOperator joinOp = joinNode.getJoinOp(); |
| if ((joinOp.isRightSemiJoin() || joinOp.isFullOuterJoin() |
| || joinOp == JoinOperator.RIGHT_OUTER_JOIN) |
| && joinNode.getEqJoinConjuncts().isEmpty()) { |
| throw new NotImplementedException(String.format("Error generating a valid " + |
| "execution plan for this query. A %s type with no equi-join " + |
| "predicates can only be executed with a single node plan.", |
| joinOp.toString())); |
| } |
| } |
| |
| if (planNode instanceof SubplanNode) { |
| // Right and full outer joins with no equi-join conjuncts are ok in the right |
| // child of a SubplanNode. |
| validatePlan(planNode.getChild(0)); |
| } else { |
| for (PlanNode child: planNode.getChildren()) { |
| validatePlan(child); |
| } |
| } |
| } |
| |
| /** |
| * Returns true if there is a join in the plan outside of the right branch of a |
| * subplan. This specific behaviour maintains compatibility with older |
| * validatePlan() logic that allowed joins with mt_dop only in this specific case |
| * (presumably by accident). |
| */ |
| public boolean hasUnsupportedMtDopJoin(PlanNode planNode) { |
| if (planNode instanceof JoinNode) return true; |
| |
| if (planNode instanceof SubplanNode) { |
| return hasUnsupportedMtDopJoin(planNode.getChild(0)); |
| } |
| |
| for (PlanNode child : planNode.getChildren()) { |
| if (hasUnsupportedMtDopJoin(child)) return true; |
| } |
| return false; |
| } |
| |
| /** |
| * Creates an EmptyNode that 'materializes' the tuples of the given stmt. |
| * Marks all collection-typed slots referenced in stmt as non-materialized because |
| * they are never unnested, and therefore the corresponding parent scan should not |
| * materialize them. |
| */ |
| private PlanNode createEmptyNode(QueryStmt stmt, Analyzer analyzer) { |
| List<TupleId> tupleIds = new ArrayList<>(); |
| stmt.getMaterializedTupleIds(tupleIds); |
| if (tupleIds.isEmpty()) { |
| // Constant selects do not have materialized tuples at this stage. |
| Preconditions.checkState(stmt instanceof SelectStmt, |
| "Only constant selects should have no materialized tuples"); |
| SelectStmt selectStmt = (SelectStmt)stmt; |
| Preconditions.checkState(selectStmt.getTableRefs().isEmpty()); |
| tupleIds.add(createResultTupleDescriptor(selectStmt, "empty", analyzer).getId()); |
| } |
| unmarkCollectionSlots(stmt); |
| EmptySetNode node = new EmptySetNode(ctx_.getNextNodeId(), tupleIds); |
| node.init(analyzer); |
| // Set the output smap to resolve exprs referencing inline views within stmt. |
| // Not needed for a UnionStmt because it materializes its input operands. |
| if (stmt instanceof SelectStmt) { |
| node.setOutputSmap(((SelectStmt) stmt).getBaseTblSmap()); |
| } |
| return node; |
| } |
| |
| /** |
| * Mark all collection-typed slots in stmt as non-materialized. |
| */ |
| private void unmarkCollectionSlots(QueryStmt stmt) { |
| List<TableRef> tblRefs = new ArrayList<>(); |
| stmt.collectFromClauseTableRefs(tblRefs); |
| for (TableRef ref: tblRefs) { |
| if (!ref.isRelative()) continue; |
| Preconditions.checkState(ref instanceof CollectionTableRef); |
| CollectionTableRef collTblRef = (CollectionTableRef) ref; |
| Expr collExpr = collTblRef.getCollectionExpr(); |
| Preconditions.checkState(collExpr instanceof SlotRef); |
| SlotRef collSlotRef = (SlotRef) collExpr; |
| collSlotRef.getDesc().setIsMaterialized(false); |
| // Re-compute the mem layout if necessary. The tuple may not have a mem layout if |
| // no plan has been generated for the TableRef (e.g. due to limit 0 or similar). |
| collSlotRef.getDesc().getParent().recomputeMemLayout(); |
| } |
| } |
| |
| /** |
| * Create plan tree for single-node execution. Generates PlanNodes for the |
| * Select/Project/Join/Union [All]/Group by/Having/Order by clauses of the query stmt. |
| */ |
| private PlanNode createQueryPlan(QueryStmt stmt, Analyzer analyzer, boolean disableTopN) |
| throws ImpalaException { |
| if (analyzer.hasEmptyResultSet()) return createEmptyNode(stmt, analyzer); |
| |
| PlanNode root; |
| if (stmt instanceof SelectStmt) { |
| SelectStmt selectStmt = (SelectStmt) stmt; |
| root = createSelectPlan(selectStmt, analyzer); |
| |
| // insert possible AnalyticEvalNode before SortNode |
| if (((SelectStmt) stmt).getAnalyticInfo() != null) { |
| AnalyticInfo analyticInfo = selectStmt.getAnalyticInfo(); |
| AnalyticPlanner analyticPlanner = |
| new AnalyticPlanner(analyticInfo, analyzer, ctx_); |
| MultiAggregateInfo multiAggInfo = selectStmt.getMultiAggInfo(); |
| List<Expr> groupingExprs; |
| if (multiAggInfo != null) { |
| groupingExprs = multiAggInfo.getSubstGroupingExprs(); |
| Preconditions.checkState(groupingExprs != null); |
| } else { |
| groupingExprs = Collections.emptyList(); |
| } |
| List<Expr> inputPartitionExprs = new ArrayList<>(); |
| root = analyticPlanner.createSingleNodePlan( |
| root, groupingExprs, inputPartitionExprs); |
| if (multiAggInfo != null && !inputPartitionExprs.isEmpty() |
| && multiAggInfo.getMaterializedAggClasses().size() == 1) { |
| // analytic computation will benefit from a partition on inputPartitionExprs |
| multiAggInfo.getMaterializedAggClass(0).setPartitionExprs(inputPartitionExprs); |
| } |
| } |
| } else { |
| Preconditions.checkState(stmt instanceof UnionStmt); |
| root = createUnionPlan((UnionStmt) stmt, analyzer); |
| } |
| |
| // Avoid adding a sort node if the sort tuple has no materialized slots. |
| boolean sortHasMaterializedSlots = false; |
| if (stmt.evaluateOrderBy()) { |
| for (SlotDescriptor sortSlotDesc: |
| stmt.getSortInfo().getSortTupleDescriptor().getSlots()) { |
| if (sortSlotDesc.isMaterialized()) { |
| sortHasMaterializedSlots = true; |
| break; |
| } |
| } |
| } |
| |
| if (stmt.evaluateOrderBy() && sortHasMaterializedSlots) { |
| root = createSortNode(analyzer, root, stmt.getSortInfo(), stmt.getLimit(), |
| stmt.getOffset(), stmt.hasLimit(), disableTopN); |
| } else { |
| root.setLimit(stmt.getLimit()); |
| root.computeStats(analyzer); |
| } |
| |
| return root; |
| } |
| |
| /** |
| * Creates and initializes either a SortNode or a TopNNode depending on various |
| * heuristics and configuration parameters. |
| */ |
| private SortNode createSortNode(Analyzer analyzer, PlanNode root, SortInfo sortInfo, |
| long limit, long offset, boolean hasLimit, boolean disableTopN) |
| throws ImpalaException { |
| SortNode sortNode; |
| long topNBytesLimit = ctx_.getQueryOptions().topn_bytes_limit; |
| |
| if (hasLimit && !disableTopN) { |
| if (topNBytesLimit <= 0) { |
| sortNode = |
| SortNode.createTopNSortNode(ctx_.getNextNodeId(), root, sortInfo, offset); |
| } else { |
| long topNCardinality = PlanNode.capCardinalityAtLimit(root.cardinality_, limit); |
| long estimatedTopNMaterializedSize = |
| sortInfo.estimateTopNMaterializedSize(topNCardinality, offset); |
| |
| if (estimatedTopNMaterializedSize < topNBytesLimit) { |
| sortNode = |
| SortNode.createTopNSortNode(ctx_.getNextNodeId(), root, sortInfo, offset); |
| } else { |
| sortNode = |
| SortNode.createTotalSortNode(ctx_.getNextNodeId(), root, sortInfo, offset); |
| } |
| } |
| } else { |
| sortNode = |
| SortNode.createTotalSortNode(ctx_.getNextNodeId(), root, sortInfo, offset); |
| } |
| Preconditions.checkState(sortNode.hasValidStats()); |
| sortNode.setLimit(limit); |
| sortNode.init(analyzer); |
| return sortNode; |
| } |
| |
| /** |
| * If there are unassigned conjuncts that are bound by tupleIds or if there are slot |
| * equivalences for tupleIds that have not yet been enforced, returns a SelectNode on |
| * top of root that evaluates those conjuncts; otherwise returns root unchanged. |
| * TODO: change this to assign the unassigned conjuncts to root itself, if that is |
| * semantically correct |
| */ |
| private PlanNode addUnassignedConjuncts( |
| Analyzer analyzer, List<TupleId> tupleIds, PlanNode root) { |
| // No point in adding SelectNode on top of an EmptyNode. |
| if (root instanceof EmptySetNode) return root; |
| Preconditions.checkNotNull(root); |
| // Gather unassigned conjuncts and generate predicates to enforce |
| // slot equivalences for each tuple id. |
| List<Expr> conjuncts = analyzer.getUnassignedConjuncts(root); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace(String.format("unassigned conjuncts for (Node %s): %s", |
| root.getDisplayLabel(), Expr.debugString(conjuncts))); |
| LOG.trace("all conjuncts: " + analyzer.conjunctAssignmentsDebugString()); |
| } |
| for (TupleId tid: tupleIds) { |
| analyzer.createEquivConjuncts(tid, conjuncts); |
| } |
| if (conjuncts.isEmpty()) return root; |
| // evaluate conjuncts in SelectNode |
| SelectNode selectNode = new SelectNode(ctx_.getNextNodeId(), root, conjuncts); |
| // init() marks conjuncts as assigned |
| selectNode.init(analyzer); |
| Preconditions.checkState(selectNode.hasValidStats()); |
| return selectNode; |
| } |
| |
| /** |
| * Return the cheapest plan that materializes the joins of all TableRefs in |
| * parentRefPlans and the subplans of all applicable TableRefs in subplanRefs. |
| * Assumes that parentRefPlans are in the order as they originally appeared in |
| * the query. |
| * For this plan: |
| * - the plan is executable, ie, all non-cross joins have equi-join predicates |
| * - the leftmost scan is over the largest of the inputs for which we can still |
| * construct an executable plan |
| * - from bottom to top, all rhs's are in increasing order of selectivity (percentage |
| * of surviving rows) |
| * - outer/cross/semi joins: rhs serialized size is < lhs serialized size; |
| * enforced via join inversion, if necessary |
| * - SubplanNodes are placed as low as possible in the plan tree - as soon as the |
| * required tuple ids of one or more TableRefs in subplanRefs are materialized |
| * Returns null if we can't create an executable plan. |
| */ |
| private PlanNode createCheapestJoinPlan(Analyzer analyzer, |
| List<Pair<TableRef, PlanNode>> parentRefPlans, List<SubplanRef> subplanRefs) |
| throws ImpalaException { |
| LOG.trace("createCheapestJoinPlan"); |
| if (parentRefPlans.size() == 1) return parentRefPlans.get(0).second; |
| |
| // collect eligible candidates for the leftmost input; list contains |
| // (plan, materialized size) |
| List<Pair<TableRef, Long>> candidates = new ArrayList<>(); |
| for (Pair<TableRef, PlanNode> entry: parentRefPlans) { |
| TableRef ref = entry.first; |
| JoinOperator joinOp = ref.getJoinOp(); |
| |
| // Avoid reordering outer/semi joins which is generally incorrect. |
| // TODO: Allow the rhs of any cross join as the leftmost table. This needs careful |
| // consideration of the joinOps that result from such a re-ordering (IMPALA-1281). |
| if (joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) continue; |
| |
| PlanNode plan = entry.second; |
| if (plan.getCardinality() == -1) { |
| // use 0 for the size to avoid it becoming the leftmost input |
| // TODO: Consider raw size of scanned partitions in the absence of stats. |
| candidates.add(new Pair<TableRef, Long>(ref, new Long(0))); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("candidate " + ref.getUniqueAlias() + ": 0"); |
| } |
| continue; |
| } |
| Preconditions.checkState(ref.isAnalyzed()); |
| long materializedSize = |
| (long) Math.ceil(plan.getAvgRowSize() * (double) plan.getCardinality()); |
| candidates.add(new Pair<TableRef, Long>(ref, new Long(materializedSize))); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace( |
| "candidate " + ref.getUniqueAlias() + ": " + Long.toString(materializedSize)); |
| } |
| } |
| if (candidates.isEmpty()) return null; |
| |
| // order candidates by descending materialized size; we want to minimize the memory |
| // consumption of the materialized hash tables required for the join sequence |
| Collections.sort(candidates, |
| new Comparator<Pair<TableRef, Long>>() { |
| @Override |
| public int compare(Pair<TableRef, Long> a, Pair<TableRef, Long> b) { |
| long diff = b.second - a.second; |
| return (diff < 0 ? -1 : (diff > 0 ? 1 : 0)); |
| } |
| }); |
| |
| for (Pair<TableRef, Long> candidate: candidates) { |
| PlanNode result = createJoinPlan(analyzer, candidate.first, parentRefPlans, subplanRefs); |
| if (result != null) return result; |
| } |
| return null; |
| } |
| |
/**
 * Returns a plan with leftmostRef's plan as its leftmost input; the joins
 * are in decreasing order of selectiveness (percentage of rows they eliminate).
 * Creates and adds subplan nodes as soon as the tuple ids required by at least one
 * subplan ref are materialized by a join node added during plan generation.
 *
 * The plan is built greedily, adding one join per iteration of the outer loop:
 * - An outer/semi join ref is only eligible once exactly the refs preceding it in
 *   'refPlans' have been joined; when it is eligible and a join node can be created
 *   for it, it is placed immediately. While it is not yet eligible, the scan over the
 *   remaining refs stops at it for the current step.
 * - Otherwise a HashJoinNode candidate beats a NestedLoopJoinNode candidate, and among
 *   candidates of the same node class the one with the lowest cardinality wins.
 * Node ids of the chosen join (and of the SubplanNode placed on top, if any) are
 * assigned after each step.
 * Returns null if at some step no join node could be chosen for the remaining refs.
 */
private PlanNode createJoinPlan(Analyzer analyzer, TableRef leftmostRef,
    List<Pair<TableRef, PlanNode>> refPlans, List<SubplanRef> subplanRefs)
    throws ImpalaException {

  if (LOG.isTraceEnabled()) {
    LOG.trace("createJoinPlan: " + leftmostRef.getUniqueAlias());
  }
  // the refs that have yet to be joined
  List<Pair<TableRef, PlanNode>> remainingRefs = new ArrayList<>();
  PlanNode root = null; // root of accumulated join plan
  for (Pair<TableRef, PlanNode> entry: refPlans) {
    if (entry.first == leftmostRef) {
      root = entry.second;
    } else {
      remainingRefs.add(entry);
    }
  }
  // leftmostRef must be one of the refs in refPlans.
  Preconditions.checkNotNull(root);

  // Maps from a TableRef in refPlans with an outer/semi join op to the set of
  // TableRefs that precede it in refPlans (i.e., in FROM-clause order).
  // The map is used to place outer/semi joins at a fixed position in the plan tree
  // (IMPALA-860), s.t. all the tables appearing to the left/right of an outer/semi
  // join in the original query still remain to the left/right after join ordering.
  // This prevents join re-ordering across outer/semi joins which is generally wrong.
  Map<TableRef, Set<TableRef>> precedingRefs = new HashMap<>();
  List<TableRef> tmpTblRefs = new ArrayList<>();
  for (Pair<TableRef, PlanNode> entry: refPlans) {
    TableRef tblRef = entry.first;
    if (tblRef.getJoinOp().isOuterJoin() || tblRef.getJoinOp().isSemiJoin()) {
      precedingRefs.put(tblRef, Sets.newHashSet(tmpTblRefs));
    }
    tmpTblRefs.add(tblRef);
  }

  // Refs that have been joined. The union of joinedRefs and the refs in remainingRefs
  // are the set of all table refs.
  Set<TableRef> joinedRefs = Sets.newHashSet(leftmostRef);
  // Running estimate of hash table inserts plus lookups; only used for trace output.
  long numOps = 0;
  // Step counter; only used for trace output.
  int i = 0;
  while (!remainingRefs.isEmpty()) {
    // We minimize the resulting cardinality at each step in the join chain,
    // which minimizes the total number of hash table lookups.
    PlanNode newRoot = null;
    Pair<TableRef, PlanNode> minEntry = null;
    for (Pair<TableRef, PlanNode> entry: remainingRefs) {
      TableRef ref = entry.first;
      JoinOperator joinOp = ref.getJoinOp();

      // Place outer/semi joins at a fixed position in the plan tree.
      Set<TableRef> requiredRefs = precedingRefs.get(ref);
      if (requiredRefs != null) {
        Preconditions.checkState(joinOp.isOuterJoin() || joinOp.isSemiJoin());
        // If the required table refs have not been placed yet, do not even consider
        // the remaining table refs to prevent incorrect re-ordering of tables across
        // outer/semi joins. Note that this requires set equality: exactly the
        // preceding refs, and no others, must have been joined.
        if (!requiredRefs.equals(joinedRefs)) break;
      }

      // Every candidate starts from the assigned-conjunct state of the current root.
      // NOTE(review): presumably createJoinNode() marks conjuncts as assigned, so this
      // keeps the candidates independent of each other -- confirm.
      analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
      PlanNode candidate = createJoinNode(root, entry.second, ref, analyzer);
      if (candidate == null) continue;
      if (LOG.isTraceEnabled()) {
        LOG.trace("cardinality=" + Long.toString(candidate.getCardinality()));
      }

      // Use 'candidate' as the new root; don't consider any other table refs at this
      // position in the plan.
      if (joinOp.isOuterJoin() || joinOp.isSemiJoin()) {
        newRoot = candidate;
        minEntry = entry;
        break;
      }

      // Always prefer Hash Join over Nested-Loop Join due to limited costing
      // infrastructure. Cardinalities are only compared between candidates of the
      // same node class.
      if (newRoot == null
          || (candidate.getClass().equals(newRoot.getClass())
              && candidate.getCardinality() < newRoot.getCardinality())
          || (candidate instanceof HashJoinNode
              && newRoot instanceof NestedLoopJoinNode)) {
        newRoot = candidate;
        minEntry = entry;
      }
    }
    if (newRoot == null) {
      // Could not generate a valid plan.
      return null;
    }

    // we need to insert every rhs row into the hash table and then look up
    // every lhs row
    long lhsCardinality = root.getCardinality();
    long rhsCardinality = minEntry.second.getCardinality();
    numOps += lhsCardinality + rhsCardinality;
    if (LOG.isTraceEnabled()) {
      LOG.trace(Integer.toString(i) + " chose " + minEntry.first.getUniqueAlias()
          + " #lhs=" + Long.toString(lhsCardinality)
          + " #rhs=" + Long.toString(rhsCardinality)
          + " #ops=" + Long.toString(numOps));
    }
    remainingRefs.remove(minEntry);
    joinedRefs.add(minEntry.first);
    root = newRoot;
    // Create a Subplan on top of the new root for all the subplan refs that can be
    // evaluated at this point.
    // TODO: Once we have stats on nested collections, we should consider the join
    // order in conjunction with the placement of SubplanNodes, i.e., move the creation
    // of SubplanNodes into the join-ordering loop above.
    root = createSubplan(root, subplanRefs, false, analyzer);
    // assign node ids after running through the possible choices in order to end up
    // with a dense sequence of node ids
    if (root instanceof SubplanNode) root.getChild(0).setId(ctx_.getNextNodeId());
    root.setId(ctx_.getNextNodeId());
    // Sync the analyzer's assigned conjuncts with the chosen root before the next step.
    analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
    ++i;
  }

  return root;
}
| |
| /** |
| * Return a plan with joins in the order of parentRefPlans (= FROM clause order). |
| * Adds coalesced SubplanNodes based on the FROM-clause order of subplanRefs. |
| */ |
| private PlanNode createFromClauseJoinPlan(Analyzer analyzer, |
| List<Pair<TableRef, PlanNode>> parentRefPlans, List<SubplanRef> subplanRefs) |
| throws ImpalaException { |
| // create left-deep sequence of binary hash joins; assign node ids as we go along |
| Preconditions.checkState(!parentRefPlans.isEmpty()); |
| PlanNode root = parentRefPlans.get(0).second; |
| for (int i = 1; i < parentRefPlans.size(); ++i) { |
| TableRef innerRef = parentRefPlans.get(i).first; |
| PlanNode innerPlan = parentRefPlans.get(i).second; |
| root = createJoinNode(root, innerPlan, innerRef, analyzer); |
| if (root != null) root = createSubplan(root, subplanRefs, false, analyzer); |
| if (root instanceof SubplanNode) root.getChild(0).setId(ctx_.getNextNodeId()); |
| root.setId(ctx_.getNextNodeId()); |
| } |
| return root; |
| } |
| |
| /** |
| * Create tree of PlanNodes that implements the Select/Project/Join/Group by/Having |
| * of the selectStmt query block. |
| */ |
| private PlanNode createSelectPlan(SelectStmt selectStmt, Analyzer analyzer) |
| throws ImpalaException { |
| // no from clause -> materialize the select's exprs with a UnionNode |
| if (selectStmt.getTableRefs().isEmpty()) { |
| return createConstantSelectPlan(selectStmt, analyzer); |
| } |
| |
| // Slot materialization: |
| // We need to mark all slots as materialized that are needed during the execution |
| // of selectStmt, and we need to do that prior to creating plans for the TableRefs |
| // (because createTableRefNode() might end up calling computeMemLayout() on one or |
| // more TupleDescriptors, at which point all referenced slots need to be marked). |
| // |
| // For non-join predicates, slots are marked as follows: |
| // - for base table scan predicates, this is done directly by ScanNode.init(), which |
| // can do a better job because it doesn't need to materialize slots that are only |
| // referenced for partition pruning, for instance |
| // - for inline views, non-join predicates are pushed down, at which point the |
| // process repeats itself. |
| selectStmt.materializeRequiredSlots(analyzer); |
| |
| List<TupleId> rowTuples = new ArrayList<>(); |
| // collect output tuples of subtrees |
| for (TableRef tblRef: selectStmt.getTableRefs()) { |
| rowTuples.addAll(tblRef.getMaterializedTupleIds()); |
| } |
| |
| // If the selectStmt's select-project-join portion returns an empty result set |
| // create a plan that feeds the aggregation of selectStmt with an empty set. |
| // Make sure the slots of the aggregation exprs and the tuples that they reference |
| // are materialized (see IMPALA-1960). Marks all collection-typed slots referenced |
| // in this select stmt as non-materialized because they are never unnested. Note that |
| // this creates extra unused space in the tuple since the mem layout has already been |
| // computed. |
| if (analyzer.hasEmptySpjResultSet()) { |
| unmarkCollectionSlots(selectStmt); |
| PlanNode emptySetNode = new EmptySetNode(ctx_.getNextNodeId(), rowTuples); |
| emptySetNode.init(analyzer); |
| emptySetNode.setOutputSmap(selectStmt.getBaseTblSmap()); |
| return createAggregationPlan(selectStmt, analyzer, emptySetNode); |
| } |
| |
| // Separate table refs into parent refs (uncorrelated or absolute) and |
| // subplan refs (correlated or relative), and generate their plan. |
| List<TableRef> parentRefs = new ArrayList<>(); |
| List<SubplanRef> subplanRefs = new ArrayList<>(); |
| computeParentAndSubplanRefs( |
| selectStmt.getTableRefs(), analyzer.isStraightJoin(), parentRefs, subplanRefs); |
| MultiAggregateInfo multiAggInfo = selectStmt.getMultiAggInfo(); |
| // Only optimize scan/agg plan if there is a single aggregation class. |
| AggregateInfo scanAggInfo = null; |
| if (multiAggInfo != null && multiAggInfo.getMaterializedAggClasses().size() == 1) { |
| scanAggInfo = multiAggInfo.getMaterializedAggClass(0); |
| } |
| PlanNode root = createTableRefsPlan(parentRefs, subplanRefs, scanAggInfo, analyzer); |
| // Add aggregation, if any. |
| if (multiAggInfo != null) { |
| // Apply substitution for optimized scan/agg plan, |
| if (scanAggInfo != null) { |
| if (root instanceof HdfsScanNode) { |
| scanAggInfo.substitute(((HdfsScanNode) root).getOptimizedAggSmap(), analyzer); |
| scanAggInfo.getMergeAggInfo().substitute( |
| ((HdfsScanNode) root).getOptimizedAggSmap(), analyzer); |
| } else if (root instanceof KuduScanNode) { |
| scanAggInfo.substitute(((KuduScanNode) root).getOptimizedAggSmap(), analyzer); |
| scanAggInfo.getMergeAggInfo().substitute( |
| ((KuduScanNode) root).getOptimizedAggSmap(), analyzer); |
| } |
| } |
| root = createAggregationPlan(selectStmt, analyzer, root); |
| } |
| |
| // All the conjuncts_ should be assigned at this point. |
| // TODO: Re-enable this check here and/or elswehere. |
| //Preconditions.checkState(!analyzer.hasUnassignedConjuncts()); |
| return root; |
| } |
| |
| /** |
| * Holds a table ref that must be evaluated inside a subplan (i.e., a relative or |
| * correlated ref), along with the materialized tuple ids and table ref ids that |
| * are required for this table ref to be correctly evaluated inside a SubplanNode. |
| * |
| * Required materialized tuple ids: |
| * These ensure that the SubplanNode evaluating this table ref is placed only once all |
| * root tuples needed by this table ref or relative refs contained in this table ref |
| * are materialized. |
| * |
| * Required table ref ids: |
| * These ensure that the SubplanNode evaluating this table ref is placed correctly |
| * with respect to join ordering, in particular, that the SubplanNode is not ordered |
| * across semi/outer joins. |
| */ |
| private static class SubplanRef { |
| // Relative or correlated table ref. |
| public final TableRef tblRef; |
| |
| // List of tuple ids that must be materialized before 'tblRef' can be |
| // correctly evaluated inside a SubplanNode. |
| public final List<TupleId> requiredTids; |
| |
| // List of table ref ids that a plan tree must contain before 'tblRef' |
| // can be correctly evaluated inside a SubplanNode. |
| public final List<TupleId> requiredTblRefIds; |
| |
| public SubplanRef(TableRef tblRef, List<TupleId> requiredTids, |
| List<TupleId> requiredTblRefIds) { |
| Preconditions.checkState(tblRef.isRelative() || tblRef.isCorrelated()); |
| this.tblRef = tblRef; |
| this.requiredTids = requiredTids; |
| this.requiredTblRefIds = requiredTblRefIds; |
| } |
| } |
| |
  /**
   * Separates tblRefs into the following two lists.
   *
   * parentRefs:
   * Uncorrelated and non-relative table refs. These are the 'regular' table refs whose
   * plans are connected by join nodes, and are not placed inside a Subplan. The returned
   * parentRefs are self-contained with respect to TableRef linking, i.e., each returned
   * TableRef has its left TableRef link set to the TableRef preceding it in parentRefs.
   *
   * subplanRefs:
   * Correlated and relative table refs. The plan of such refs must be put inside a
   * Subplan. See SubplanRef for details. The left TableRef link of the TableRefs in
   * returned SubplanRefs are set to null.
   *
   * If isStraightJoin is true, then the required table ref ids of a correlated or
   * relative ref additionally include the ids of all parent refs placed before it in
   * FROM-clause order, plus the table ref ids of the enclosing subplan's input (if any).
   * The required tuple ids do not depend on isStraightJoin.
   *
   * If this function is called when generating the right-hand side of a SubplanNode,
   * then correlated and relative table refs that require only tuples produced by the
   * SubplanNode's input are placed inside parentRefs.
   *
   * The final sanity check compares tblRefs.size() with
   * parentRefs.size() + subplanRefs.size(), so both output lists are presumably passed
   * in empty (TODO confirm at call sites). NOTE(review): parentRefs.get(0) is accessed
   * unconditionally, so at least one ref is assumed to end up in parentRefs.
   *
   * @param tblRefs table refs to separate, in FROM-clause order
   * @param isStraightJoin whether FROM-clause ordering must be enforced
   * @param parentRefs output: refs planned outside of a subplan, re-linked in order
   * @param subplanRefs output: refs that must be planned inside a subplan
   */
  private void computeParentAndSubplanRefs(List<TableRef> tblRefs,
      boolean isStraightJoin, List<TableRef> parentRefs, List<SubplanRef> subplanRefs) {
    // List of table ref ids materialized so far during plan generation, including those
    // from the subplan context, if any. We append the ids of table refs placed into
    // parentRefs to this list to satisfy the ordering requirement of subsequent
    // table refs that should also be put into parentRefs. Consider this example:
    // FROM t, (SELECT ... FROM t.c1 LEFT JOIN t.c2 ON(...) JOIN t.c3 ON (...)) v
    // Table ref t.c3 has an ordering dependency on t.c2 due to the outer join, but t.c3
    // must be placed into the subplan that materializes t.c1 and t.c2.
    List<TupleId> planTblRefIds = new ArrayList<>();

    // List of materialized tuple ids in the subplan context, if any. This list must
    // remain constant in this function because the subplan context is fixed. Any
    // relative or correlated table ref that requires a materialized tuple id produced
    // by an element in tblRefs should be placed into subplanRefs because it requires
    // a new subplan context. Otherwise, it should be placed into parentRefs.
    List<TupleId> subplanTids = Collections.emptyList();

    if (ctx_.hasSubplan()) {
      // Add all table ref ids from the subplan context.
      planTblRefIds.addAll(ctx_.getSubplan().getChild(0).getTblRefIds());
      subplanTids =
          Collections.unmodifiableList(ctx_.getSubplan().getChild(0).getTupleIds());
    }

    // Table ref representing the last outer or semi join we have seen.
    TableRef lastSemiOrOuterJoin = null;
    for (TableRef ref: tblRefs) {
      // Uncorrelated, non-relative refs always become parent refs.
      boolean isParentRef = true;
      if (ref.isRelative() || ref.isCorrelated()) {
        List<TupleId> requiredTids = new ArrayList<>();
        List<TupleId> requiredTblRefIds = new ArrayList<>();
        if (ref.isCorrelated()) {
          requiredTids.addAll(ref.getCorrelatedTupleIds());
        } else {
          // A relative (non-correlated) ref needs the root tuple of its resolved path.
          CollectionTableRef collectionTableRef = (CollectionTableRef) ref;
          requiredTids.add(collectionTableRef.getResolvedPath().getRootDesc().getId());
        }
        // Add all plan table ref ids as an ordering dependency for straight_join.
        if (isStraightJoin) requiredTblRefIds.addAll(planTblRefIds);
        if (lastSemiOrOuterJoin != null) {
          // Prevent incorrect join re-ordering across outer/semi joins by requiring all
          // table ref ids to the left and including the last outer/semi join.
          // TODO: Think about when we can allow re-ordering across semi/outer joins
          // in subplans.
          requiredTblRefIds.addAll(lastSemiOrOuterJoin.getAllTableRefIds());
        }
        // If the current subplan context already provides every required tuple, the
        // ref stays a parent ref; otherwise it needs a new subplan.
        if (!subplanTids.containsAll(requiredTids)) {
          isParentRef = false;
          // Outer and semi joins are placed at a fixed position in the join order.
          // They require that all tables to their left are materialized.
          if (ref.getJoinOp().isOuterJoin() || ref.getJoinOp().isSemiJoin()) {
            requiredTblRefIds.addAll(ref.getAllTableRefIds());
            // A ref cannot depend on itself.
            requiredTblRefIds.remove(ref.getId());
          }
          subplanRefs.add(new SubplanRef(ref, requiredTids, requiredTblRefIds));
        }
      }
      if (isParentRef) {
        parentRefs.add(ref);
        // Later refs may have an ordering dependency on this parent ref.
        planTblRefIds.add(ref.getId());
      }
      // Remember the latest outer/semi join so later relative/correlated refs are not
      // reordered across it.
      if (ref.getJoinOp().isOuterJoin() || ref.getJoinOp().isSemiJoin()) {
        lastSemiOrOuterJoin = ref;
      }
    }
    Preconditions.checkState(tblRefs.size() == parentRefs.size() + subplanRefs.size());

    // Fix the chain of parent table refs and set the left table of all subplanRefs to
    // null. This step needs to be done outside of the loop above because the left links
    // are required for getAllTableRefIds() used for determining the requiredTblRefIds.
    parentRefs.get(0).setLeftTblRef(null);
    for (int i = 1; i < parentRefs.size(); ++i) {
      parentRefs.get(i).setLeftTblRef(parentRefs.get(i - 1));
    }
    for (SubplanRef subplanRef: subplanRefs) subplanRef.tblRef.setLeftTblRef(null);
  }
| |
| /** |
| * Returns a plan tree for evaluating the given parentRefs and subplanRefs. |
| */ |
| private PlanNode createTableRefsPlan(List<TableRef> parentRefs, |
| List<SubplanRef> subplanRefs, AggregateInfo aggInfo, Analyzer analyzer) |
| throws ImpalaException { |
| // create plans for our table refs; use a list here instead of a map to |
| // maintain a deterministic order of traversing the TableRefs during join |
| // plan generation (helps with tests) |
| List<Pair<TableRef, PlanNode>> parentRefPlans = new ArrayList<>(); |
| for (TableRef ref: parentRefs) { |
| PlanNode root = createTableRefNode(ref, aggInfo, analyzer); |
| Preconditions.checkNotNull(root); |
| root = createSubplan(root, subplanRefs, true, analyzer); |
| parentRefPlans.add(new Pair<TableRef, PlanNode>(ref, root)); |
| } |
| // save state of conjunct assignment; needed for join plan generation |
| for (Pair<TableRef, PlanNode> entry: parentRefPlans) { |
| entry.second.setAssignedConjuncts(analyzer.getAssignedConjuncts()); |
| } |
| |
| PlanNode root = null; |
| if (!analyzer.isStraightJoin()) { |
| Set<ExprId> assignedConjuncts = analyzer.getAssignedConjuncts(); |
| root = createCheapestJoinPlan(analyzer, parentRefPlans, subplanRefs); |
| // If createCheapestJoinPlan() failed to produce an executable plan, then we need |
| // to restore the original state of conjunct assignment for the straight-join plan |
| // to not incorrectly miss conjuncts. |
| if (root == null) analyzer.setAssignedConjuncts(assignedConjuncts); |
| } |
| if (analyzer.isStraightJoin() || root == null) { |
| // we didn't have enough stats to do a cost-based join plan, or the STRAIGHT_JOIN |
| // keyword was in the select list: use the FROM clause order instead |
| root = createFromClauseJoinPlan(analyzer, parentRefPlans, subplanRefs); |
| Preconditions.checkNotNull(root); |
| } |
| return root; |
| } |
| |
| /** |
| * Places a SubplanNode on top of 'root' that evaluates all the subplan refs that can |
| * be correctly evaluated from 'root's materialized tuple ids. Returns 'root' if there |
| * are no applicable subplan refs. |
| * Assigns the returned SubplanNode a new node id unless assignId is false. |
| * |
| * If applicable, the SubplanNode is created as follows: |
| * - 'root' is the input of the SubplanNode (first child) |
| * - the second child is the plan tree generated from these table refs: |
| * 1. a SingularRowSrcTableRef that represents the current row being processed |
| * by the SubplanNode to be joined with |
| * 2. all applicable subplan refs |
| * - the second child plan tree is generated as usual with createTableRefsPlan() |
| * - the plans of the applicable subplan refs are generated as usual, without a |
| * SingularRowSrcTableRef |
| * - nested SubplanNodes are generated recursively inside createTableRefsPlan() by |
| * passing in the remaining subplanRefs that are not applicable after 'root'; some |
| * of those subplanRefs may become applicable inside the second child plan tree of |
| * the SubplanNode generated here |
| */ |
| private PlanNode createSubplan(PlanNode root, List<SubplanRef> subplanRefs, |
| boolean assignId, Analyzer analyzer) throws ImpalaException { |
| Preconditions.checkNotNull(root); |
| List<TableRef> applicableRefs = extractApplicableRefs(root, subplanRefs); |
| if (applicableRefs.isEmpty()) return root; |
| |
| // Prepend a SingularRowSrcTableRef representing the current row being processed |
| // by the SubplanNode from its input (first child). |
| Preconditions.checkState(applicableRefs.get(0).getLeftTblRef() == null); |
| applicableRefs.add(0, new SingularRowSrcTableRef(root)); |
| applicableRefs.get(1).setLeftTblRef(applicableRefs.get(0)); |
| |
| // Construct an incomplete SubplanNode that only knows its input so we can push it |
| // into the planner context. The subplan is set after the subplan tree has been |
| // constructed. |
| SubplanNode subplanNode = new SubplanNode(root); |
| if (assignId) subplanNode.setId(ctx_.getNextNodeId()); |
| |
| // Push the SubplanNode such that UnnestNodes and SingularRowSrcNodes can pick up |
| // their containing SubplanNode. Also, further plan generation relies on knowing |
| // whether we are in a subplan context or not (see computeParentAndSubplanRefs()). |
| ctx_.pushSubplan(subplanNode); |
| PlanNode subplan = createTableRefsPlan(applicableRefs, subplanRefs, null, analyzer); |
| ctx_.popSubplan(); |
| subplanNode.setSubplan(subplan); |
| subplanNode.init(analyzer); |
| return subplanNode; |
| } |
| |
| /** |
| * Returns a new list with all table refs from subplanRefs that can be correctly |
| * evaluated inside a SubplanNode placed after the given plan root. |
| * The returned table refs have their left-table links properly set, and the |
| * corresponding SubplanRefs are removed from subplanRefs. |
| */ |
| private List<TableRef> extractApplicableRefs(PlanNode root, |
| List<SubplanRef> subplanRefs) { |
| // List of table ref ids in 'root' as well as the table ref ids of all table refs |
| // placed in 'subplanRefs' so far. |
| List<TupleId> tblRefIds = Lists.newArrayList(root.getTblRefIds()); |
| List<TableRef> result = new ArrayList<>(); |
| Iterator<SubplanRef> subplanRefIt = subplanRefs.iterator(); |
| TableRef leftTblRef = null; |
| while (subplanRefIt.hasNext()) { |
| SubplanRef subplanRef = subplanRefIt.next(); |
| // Ensure that 'root' materializes all required tuples (first condition), and that |
| // correct join ordering is obeyed (second condition). |
| if (root.getTupleIds().containsAll(subplanRef.requiredTids) && |
| tblRefIds.containsAll(subplanRef.requiredTblRefIds)) { |
| subplanRef.tblRef.setLeftTblRef(leftTblRef); |
| result.add(subplanRef.tblRef); |
| leftTblRef = subplanRef.tblRef; |
| subplanRefIt.remove(); |
| // Add the table ref id such that other subplan refs that can be evaluated inside |
| // the same SubplanNode but only after this ref are returned as well. |
| tblRefIds.add(subplanRef.tblRef.getId()); |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * Returns a new AggregationNode that materializes the aggregation of the given stmt. |
| * Assigns conjuncts from the Having clause to the returned node. |
| */ |
| private AggregationNode createAggregationPlan( |
| SelectStmt selectStmt, Analyzer analyzer, PlanNode root) throws ImpalaException { |
| MultiAggregateInfo multiAggInfo = |
| Preconditions.checkNotNull(selectStmt.getMultiAggInfo()); |
| AggregationNode result = createAggregationPlan(root, multiAggInfo, analyzer); |
| ExprSubstitutionMap simplifiedAggSmap = multiAggInfo.getSimplifiedAggSmap(); |
| if (simplifiedAggSmap == null) return result; |
| |
| // Fix up aggregations that simplified to a single class after |
| // materializeRequiredSlots(). |
| |
| // Collect conjuncts and then re-assign them to the top-most aggregation node |
| // of the simplified plan. |
| AggregationNode dummyAgg = new AggregationNode( |
| ctx_.getNextNodeId(), result, multiAggInfo, AggPhase.TRANSPOSE); |
| dummyAgg.init(analyzer); |
| List<Expr> conjuncts = |
| Expr.substituteList(dummyAgg.getConjuncts(), simplifiedAggSmap, analyzer, true); |
| // Validate conjuncts after substitution. |
| for (Expr c : conjuncts) { |
| Preconditions.checkState(c.isBound(result.getTupleIds().get(0))); |
| } |
| result.getConjuncts().addAll(conjuncts); |
| |
| // Apply simplification substitution in ancestors. |
| result.setOutputSmap( |
| ExprSubstitutionMap.compose(result.getOutputSmap(), simplifiedAggSmap, analyzer)); |
| return result; |
| } |
| |
| private AggregationNode createAggregationPlan(PlanNode root, |
| MultiAggregateInfo multiAggInfo, Analyzer analyzer) throws InternalException { |
| Preconditions.checkNotNull(multiAggInfo); |
| AggregationNode firstPhaseAgg = |
| new AggregationNode(ctx_.getNextNodeId(), root, multiAggInfo, AggPhase.FIRST); |
| firstPhaseAgg.init(analyzer); |
| if (!multiAggInfo.hasSecondPhase()) return firstPhaseAgg; |
| |
| firstPhaseAgg.unsetNeedsFinalize(); |
| firstPhaseAgg.setIntermediateTuple(); |
| |
| AggregationNode secondPhaseAgg = new AggregationNode( |
| ctx_.getNextNodeId(), firstPhaseAgg, multiAggInfo, AggPhase.SECOND); |
| secondPhaseAgg.init(analyzer); |
| if (!multiAggInfo.hasTransposePhase()) return secondPhaseAgg; |
| |
| AggregationNode transposePhaseAgg = new AggregationNode( |
| ctx_.getNextNodeId(), secondPhaseAgg, multiAggInfo, AggPhase.TRANSPOSE); |
| transposePhaseAgg.init(analyzer); |
| return transposePhaseAgg; |
| } |
| |
| /** |
| * Returns a UnionNode that materializes the exprs of the constant selectStmt. |
| * Replaces the resultExprs of the selectStmt with SlotRefs into the materialized tuple. |
| */ |
| private PlanNode createConstantSelectPlan(SelectStmt selectStmt, Analyzer analyzer) |
| throws InternalException { |
| Preconditions.checkState(selectStmt.getTableRefs().isEmpty()); |
| List<Expr> resultExprs = selectStmt.getResultExprs(); |
| // Create tuple descriptor for materialized tuple. |
| TupleDescriptor tupleDesc = createResultTupleDescriptor(selectStmt, "union", analyzer); |
| UnionNode unionNode = new UnionNode(ctx_.getNextNodeId(), tupleDesc.getId()); |
| // Analysis guarantees that selects without a FROM clause only have constant exprs. |
| unionNode.addConstExprList(Lists.newArrayList(resultExprs)); |
| |
| // Replace the select stmt's resultExprs with SlotRefs into tupleDesc. |
| for (int i = 0; i < resultExprs.size(); ++i) { |
| SlotRef slotRef = new SlotRef(tupleDesc.getSlots().get(i)); |
| resultExprs.set(i, slotRef); |
| } |
| // UnionNode.init() needs tupleDesc to have been initialized |
| unionNode.init(analyzer); |
| return unionNode; |
| } |
| |
| /** |
| * Create tuple descriptor that can hold the results of the given SelectStmt, with one |
| * slot per result expr. |
| */ |
| private TupleDescriptor createResultTupleDescriptor(SelectStmt selectStmt, |
| String debugName, Analyzer analyzer) { |
| TupleDescriptor tupleDesc = analyzer.getDescTbl().createTupleDescriptor( |
| debugName); |
| tupleDesc.setIsMaterialized(true); |
| |
| List<Expr> resultExprs = selectStmt.getResultExprs(); |
| List<String> colLabels = selectStmt.getColLabels(); |
| for (int i = 0; i < resultExprs.size(); ++i) { |
| Expr resultExpr = resultExprs.get(i); |
| String colLabel = colLabels.get(i); |
| SlotDescriptor slotDesc = analyzer.addSlotDescriptor(tupleDesc); |
| slotDesc.setLabel(colLabel); |
| slotDesc.setSourceExpr(resultExpr); |
| slotDesc.setType(resultExpr.getType()); |
| slotDesc.setStats(ColumnStats.fromExpr(resultExpr)); |
| slotDesc.setIsMaterialized(true); |
| } |
| tupleDesc.computeMemLayout(); |
| return tupleDesc; |
| } |
| |
  /**
   * Returns plan tree for an inline view ref:
   * - predicates from the enclosing scope that can be evaluated directly within
   *   the inline-view plan are pushed down
   * - predicates that cannot be evaluated directly within the inline-view plan
   *   but only apply to the inline view are evaluated in a SelectNode placed
   *   on top of the inline view plan
   * - all slots that are referenced by predicates from the enclosing scope that cannot
   *   be pushed down are marked as materialized (so that when computeMemLayout() is
   *   called on the base table descriptors materialized by the inline view it has a
   *   complete picture)
   *
   * Constant selects (no table refs) are planned as an empty-set node or a UnionNode.
   * If the view stmt reports isRuntimeScalar(), a CardinalityCheckNode is placed on top
   * of the view's plan.
   *
   * @param analyzer analyzer of the enclosing scope
   * @param inlineViewRef the inline view to plan
   * @return root of the inline view's plan; its tbl ref ids are set to the view's id
   *         except in the early return for an empty constant select noted below
   */
  private PlanNode createInlineViewPlan(Analyzer analyzer, InlineViewRef inlineViewRef)
      throws ImpalaException {
    // If possible, "push down" view predicates; this is needed in order to ensure
    // that predicates such as "x + y = 10" are evaluated in the view's plan tree
    // rather than a SelectNode grafted on top of that plan tree.
    // This doesn't prevent predicate propagation, because predicates like
    // "x = 10" that get pushed down are still connected to equivalent slots
    // via the equality predicates created for the view's select list.
    // Include outer join conjuncts here as well because predicates from the
    // On-clause of an outer join may be pushed into the inline view as well.
    migrateConjunctsToInlineView(analyzer, inlineViewRef);

    // Turn a constant select into a UnionNode that materializes the exprs.
    // TODO: unify this with createConstantSelectPlan(), this is basically the
    // same thing
    QueryStmt viewStmt = inlineViewRef.getViewStmt();
    if (viewStmt instanceof SelectStmt) {
      SelectStmt selectStmt = (SelectStmt) viewStmt;
      if (selectStmt.getTableRefs().isEmpty()) {
        if (inlineViewRef.getAnalyzer().hasEmptyResultSet()) {
          PlanNode emptySetNode = createEmptyNode(viewStmt, inlineViewRef.getAnalyzer());
          // Still substitute exprs in parent nodes with the inline-view's smap to make
          // sure no exprs reference the non-materialized inline view slots. No wrapping
          // with TupleIsNullPredicates is necessary here because we do not migrate
          // conjuncts into outer-joined inline views, so hasEmptyResultSet() cannot be
          // true for an outer-joined inline view that has no table refs.
          Preconditions.checkState(!analyzer.isOuterJoined(inlineViewRef.getId()));
          emptySetNode.setOutputSmap(inlineViewRef.getSmap());
          // The tblRef materialized by this node is still the 'inlineViewRef'.
          emptySetNode.setTblRefIds(Lists.newArrayList(inlineViewRef.getId()));
          return emptySetNode;
        }
        // Analysis should have generated a tuple id into which to materialize the exprs.
        Preconditions.checkState(inlineViewRef.getMaterializedTupleIds().size() == 1);
        // we need to materialize all slots of our inline view tuple
        analyzer.getTupleDesc(inlineViewRef.getId()).materializeSlots();
        UnionNode unionNode = new UnionNode(ctx_.getNextNodeId(),
            inlineViewRef.getMaterializedTupleIds().get(0));
        // If the enclosing scope has an empty result set, the UnionNode is returned
        // without const exprs, tbl ref ids, or an init() call.
        if (analyzer.hasEmptyResultSet()) return unionNode;
        unionNode.setTblRefIds(Lists.newArrayList(inlineViewRef.getId()));
        unionNode.addConstExprList(selectStmt.getBaseTblResultExprs());
        unionNode.init(analyzer);
        return unionNode;
      }
    }

    // General case: plan the view's statement with the view's own analyzer.
    PlanNode rootNode =
        createQueryPlan(inlineViewRef.getViewStmt(), inlineViewRef.getAnalyzer(), false);
    // TODO: we should compute the "physical layout" of the view's descriptor, so that
    // the avg row size is available during optimization; however, that means we need to
    // select references to its resultExprs from the enclosing scope(s)
    rootNode.setTblRefIds(Lists.newArrayList(inlineViewRef.getId()));

    // The output smap is the composition of the inline view's smap and the output smap
    // of the inline view's plan root. This ensures that all downstream exprs referencing
    // the inline view are replaced with exprs referencing the physical output of the
    // inline view's plan.
    ExprSubstitutionMap outputSmap = ExprSubstitutionMap.compose(
        inlineViewRef.getSmap(), rootNode.getOutputSmap(), analyzer);
    if (analyzer.isOuterJoined(inlineViewRef.getId())) {
      // Exprs against non-matched rows of an outer join should always return NULL.
      // Make the rhs exprs of the output smap nullable, if necessary. This expr wrapping
      // must be performed on the composed smap, and not on the inline view's smap,
      // because the rhs exprs must first be resolved against the physical output of
      // 'planRoot' to correctly determine whether wrapping is necessary.
      List<Expr> nullableRhs = TupleIsNullPredicate.wrapExprs(
          outputSmap.getRhs(), rootNode.getTupleIds(), analyzer);
      outputSmap = new ExprSubstitutionMap(outputSmap.getLhs(), nullableRhs);
    }
    // Set output smap of rootNode *before* creating a SelectNode for proper resolution.
    rootNode.setOutputSmap(outputSmap);

    // Add runtime cardinality check if needed
    if (inlineViewRef.getViewStmt().isRuntimeScalar()) {
      rootNode = new CardinalityCheckNode(ctx_.getNextNodeId(), rootNode,
          inlineViewRef.getViewStmt().getOrigSqlString());
      rootNode.setOutputSmap(outputSmap);
      // NOTE(review): initialized with the root analyzer rather than 'analyzer';
      // confirm this is intended.
      rootNode.init(ctx_.getRootAnalyzer());
    }

    // If the inline view has a LIMIT/OFFSET or unassigned conjuncts due to analytic
    // functions, we may have conjuncts that need to be assigned to a SELECT node on
    // top of the current plan root node.
    //
    // TODO: This check is also repeated in migrateConjunctsToInlineView() because we
    // need to make sure that equivalences are not enforced multiple times. Consolidate
    // the assignment of conjuncts and the enforcement of equivalences into a single
    // place.
    if (!canMigrateConjuncts(inlineViewRef)) {
      rootNode = addUnassignedConjuncts(
          analyzer, inlineViewRef.getDesc().getId().asList(), rootNode);
    }
    return rootNode;
  }
| |
  /**
   * Migrates unassigned conjuncts into an inline view. Conjuncts are not
   * migrated into the inline view if the view has a LIMIT/OFFSET clause or if the
   * view's stmt computes analytic functions (see IMPALA-1243/IMPALA-1900).
   * The reason is that analytic functions compute aggregates over their entire input,
   * and applying filters from the enclosing scope *before* the aggregate computation
   * would alter the results. This is unlike regular aggregate computation, which only
   * makes the *output* of the computation visible to the enclosing scope, so that
   * filters from the enclosing scope can be safely applied (to the grouping cols, say).
   *
   * In both cases, slots referenced by conjuncts that stay in the enclosing scope are
   * marked as materialized (after substitution with the view's base table smap).
   *
   * @param analyzer analyzer of the enclosing scope; migrated conjuncts are marked
   *        assigned here
   * @param inlineViewRef the view; migrated conjuncts are registered with its analyzer
   */
  public void migrateConjunctsToInlineView(final Analyzer analyzer,
      final InlineViewRef inlineViewRef) throws ImpalaException {
    List<Expr> unassignedConjuncts =
        analyzer.getUnassignedConjuncts(inlineViewRef.getId().asList(), true);
    if (LOG. isTraceEnabled()) {
      LOG.trace("unassignedConjuncts: " + Expr.debugString(unassignedConjuncts));
    }
    if (!canMigrateConjuncts(inlineViewRef)) {
      // mark (fully resolve) slots referenced by unassigned conjuncts as
      // materialized
      List<Expr> substUnassigned = Expr.substituteList(unassignedConjuncts,
          inlineViewRef.getBaseTblSmap(), analyzer, false);
      analyzer.materializeSlots(substUnassigned);
      return;
    }

    // Select the conjuncts that can be evaluated inside the view.
    List<Expr> preds = new ArrayList<>();
    for (Expr e: unassignedConjuncts) {
      if (analyzer.canEvalPredicate(inlineViewRef.getId().asList(), e)) {
        preds.add(e);
        if (LOG. isTraceEnabled()) {
          LOG.trace(String.format("Can evaluate %s in inline view %s", e.debugString(),
              inlineViewRef.getExplicitAlias()));
        }
      }
    }
    unassignedConjuncts.removeAll(preds);
    // Migrate the conjuncts by marking the original ones as assigned. They will either
    // be ignored if they are identity predicates (e.g. a = a), or be substituted into
    // new ones (viewPredicates below). The substituted ones will be re-registered.
    analyzer.markConjunctsAssigned(preds);
    // Generate predicates to enforce equivalences among slots of the inline view
    // tuple. These predicates are also migrated into the inline view.
    analyzer.createEquivConjuncts(inlineViewRef.getId(), preds);

    // Remove unregistered predicates that finally resolved to predicates referencing
    // the same slot on both sides (e.g. a = a). Such predicates have been generated from
    // slot equivalences and may incorrectly reject rows with nulls
    // (IMPALA-1412/IMPALA-2643/IMPALA-8386).
    Predicate<Expr> isIdentityPredicate = new Predicate<Expr>() {
      @Override
      public boolean apply(Expr e) {
        // Only inferred equivalence predicates are candidates for removal.
        if (!org.apache.impala.analysis.Predicate.isEquivalencePredicate(e)
            || !((BinaryPredicate) e).isInferred()) {
          return false;
        }
        try {
          BinaryPredicate finalExpr = (BinaryPredicate) e.trySubstitute(
              inlineViewRef.getBaseTblSmap(), analyzer, false);
          boolean isIdentity = finalExpr.hasIdenticalOperands();

          // Verify that "smap[e1] == smap[e2]" => "baseTblSmap[e1] == baseTblSmap[e2]"
          // in case we have bugs in generating baseTblSmap.
          BinaryPredicate midExpr = (BinaryPredicate) e.trySubstitute(
              inlineViewRef.getSmap(), analyzer, false);
          Preconditions.checkState(!midExpr.hasIdenticalOperands() || isIdentity);

          if (LOG.isTraceEnabled() && isIdentity) {
            LOG.trace("Removed identity predicate: " + finalExpr.debugString());
          }
          return isIdentity;
        } catch (Exception ex) {
          // apply() cannot throw checked exceptions, so rethrow unchecked with cause.
          throw new IllegalStateException(
                  "Failed analysis after expr substitution.", ex);
        }
      }
    };
    Iterables.removeIf(preds, isIdentityPredicate);

    // create new predicates against the inline view's unresolved result exprs, not
    // the resolved result exprs, in order to avoid skipping scopes (and ignoring
    // limit clauses on the way)
    List<Expr> viewPredicates =
        Expr.substituteList(preds, inlineViewRef.getSmap(), analyzer, false);

    // perform any post-processing of the predicates before registering
    removeDisqualifyingInferredPreds(inlineViewRef.getAnalyzer(), viewPredicates);

    // Unset the On-clause flag of the migrated conjuncts because the migrated conjuncts
    // apply to the post-join/agg/analytic result of the inline view.
    for (Expr e: viewPredicates) e.setIsOnClauseConjunct(false);
    inlineViewRef.getAnalyzer().registerConjuncts(viewPredicates);

    // mark (fully resolve) slots referenced by remaining unassigned conjuncts as
    // materialized
    List<Expr> substUnassigned = Expr.substituteList(unassignedConjuncts,
        inlineViewRef.getBaseTblSmap(), analyzer, false);
    analyzer.materializeSlots(substUnassigned);
  }
| |
| /** |
| * Analyze the predicates in the context of the inline view for certain disqualifying |
| * conditions and remove such predicates from the input list. One such condition is |
| * the predicate is an inferred predicate AND either its left or right SlotRef |
| * references the output of an outer join. Note that although such predicates |
| * may have been detected at the time of creating the values transfer graph |
| * (in the Analyzer), we do this check here anyways as a safety in case any such |
| * predicate 'fell through' to this stage. |
| */ |
| private void removeDisqualifyingInferredPreds(Analyzer analyzer, List<Expr> preds) { |
| ListIterator<Expr> iter = preds.listIterator(); |
| while (iter.hasNext()) { |
| Expr e = iter.next(); |
| if (e instanceof BinaryPredicate && ((BinaryPredicate)e).isInferred()) { |
| BinaryPredicate p = (BinaryPredicate)e; |
| Pair<SlotId, SlotId> slots = p.getEqSlots(); |
| if (slots == null) continue; |
| TupleId leftParent = analyzer.getTupleId(slots.first); |
| TupleId rightParent = analyzer.getTupleId(slots.second); |
| // check if either the left parent or right parent is an outer joined tuple |
| // Note: strictly, we may be ok to check only for the null producing |
| // side but we are being conservative here to check both sides. With |
| // additional testing we could potentially relax this. |
| if (analyzer.isOuterJoined(leftParent) || |
| analyzer.isOuterJoined(rightParent)) { |
| iter.remove(); |
| LOG.warn("Removed inferred predicate " + p.toSql() + " from the list of " + |
| "predicates considered for inline view because either the left " + |
| "or right side is derived from an outer join output."); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Checks if conjuncts can be migrated into an inline view. |
| */ |
| private boolean canMigrateConjuncts(InlineViewRef inlineViewRef) { |
| return !inlineViewRef.getViewStmt().hasLimit() |
| && !inlineViewRef.getViewStmt().hasOffset() |
| && (!(inlineViewRef.getViewStmt() instanceof SelectStmt) |
| || !((SelectStmt) inlineViewRef.getViewStmt()).hasAnalyticInfo()); |
| } |
| |
  /**
   * Create a node to materialize the slots in the given HdfsTblRef.
   *
   * The given 'aggInfo' is used for detecting and applying optimizations that span both
   * the scan and aggregation.
   *
   * Partitions are pruned first, then the slots referenced by 'conjuncts' are
   * materialized. If the optimize_partition_key_scans query option is set, 'aggInfo'
   * has only distinct aggregates, and the tuple has clustering columns only, a UnionNode
   * over the distinct partition key values is returned. Otherwise an HdfsScanNode is
   * returned.
   *
   * @param hdfsTblRef table ref of the HDFS table to scan
   * @param aggInfo aggregation evaluated above the scan; may be null
   * @param conjuncts conjuncts to evaluate; passed to the pruner, which may remove some
   *        of them (see the comment below)
   * @param analyzer analyzer used for pruning, slot materialization and init()
   * @throws NotImplementedException if a remaining partition's file format does not
   *         support DATE and a non-clustering DATE slot is materialized
   */
  private PlanNode createHdfsScanPlan(TableRef hdfsTblRef, AggregateInfo aggInfo,
      List<Expr> conjuncts, Analyzer analyzer) throws ImpalaException {
    TupleDescriptor tupleDesc = hdfsTblRef.getDesc();

    // Do partition pruning before deciding which slots to materialize because we might
    // end up removing some predicates.
    HdfsPartitionPruner pruner = new HdfsPartitionPruner(tupleDesc);
    Pair<List<? extends FeFsPartition>, List<Expr>> pair =
        pruner.prunePartitions(analyzer, conjuncts, false);
    List<? extends FeFsPartition> partitions = pair.first;

    // Mark all slots referenced by the remaining conjuncts as materialized.
    analyzer.materializeSlots(conjuncts);

    // TODO: Remove this section, once DATE type is supported across all fileformats.
    // Check if there are any partitions for which DATE is not supported.
    FeFsPartition part = findUnsupportedDateFsPartition(partitions);
    if (part != null) {
      FeFsTable table = (FeFsTable)hdfsTblRef.getTable();
      // Only the file format of the first unsupported partition is reported.
      HdfsFileFormat ff = part.getFileFormat();
      // Throw an exception if tupleDesc contains a non-clustering, materialized
      // DATE slot.
      for (SlotDescriptor slotDesc: tupleDesc.getMaterializedSlots()) {
        if (slotDesc.getColumn() != null
            && !table.isClusteringColumn(slotDesc.getColumn())
            && slotDesc.getType() == ScalarType.DATE) {
          throw new NotImplementedException(
              "Scanning DATE values in table '" + table.getFullName() +
              "' is not supported for fileformat " + ff);
        }
      }
    }

    // For queries which contain partition columns only, we may use the metadata instead
    // of table scans. This is only feasible if all materialized aggregate expressions
    // have distinct semantics. The metadata-based plan is built in the branch below.
    boolean fastPartitionKeyScans =
        analyzer.getQueryCtx().client_request.query_options.optimize_partition_key_scans &&
        aggInfo != null && aggInfo.hasAllDistinctAgg();

    // If the optimization for partition key scans with metadata is enabled,
    // try evaluating with metadata first. If not, fall back to scanning.
    if (fastPartitionKeyScans && tupleDesc.hasClusteringColsOnly()) {
      // One entry per distinct combination of partition key values.
      Set<List<Expr>> uniqueExprs = new HashSet<>();

      for (FeFsPartition partition: partitions) {
        // Ignore empty partitions to match the behavior of the scan based approach.
        if (partition.getSize() == 0) continue;
        List<Expr> exprs = new ArrayList<>();
        for (SlotDescriptor slotDesc: tupleDesc.getSlots()) {
          // UnionNode.init() will go through all the slots in the tuple descriptor so
          // there needs to be an entry in 'exprs' for each slot. For unmaterialized
          // slots, use dummy null values. UnionNode will filter out unmaterialized slots.
          if (!slotDesc.isMaterialized()) {
            exprs.add(NullLiteral.create(slotDesc.getType()));
          } else {
            int pos = slotDesc.getColumn().getPosition();
            exprs.add(partition.getPartitionValue(pos));
          }
        }
        uniqueExprs.add(exprs);
      }

      // Create a UNION node with all unique partition keys.
      UnionNode unionNode = new UnionNode(ctx_.getNextNodeId(), tupleDesc.getId());
      for (List<Expr> exprList: uniqueExprs) {
        unionNode.addConstExprList(exprList);
      }
      unionNode.init(analyzer);
      return unionNode;
    } else {
      // pair.second is the second list returned by the pruner, passed to the scan node.
      HdfsScanNode scanNode =
          new HdfsScanNode(ctx_.getNextNodeId(), tupleDesc, conjuncts, partitions,
              hdfsTblRef, aggInfo, pair.second);
      scanNode.init(analyzer);
      return scanNode;
    }
  }
| |
| /** |
| * Looks for a filesystem-based partition in 'partitions' with no DATE support and |
| * returns the first one it finds. Right now, scanning DATE values is only supported for |
| * TEXT, PARQUET and AVRO fileformats. |
| * |
| * Returns null otherwise. |
| */ |
| private FeFsPartition findUnsupportedDateFsPartition( |
| List<? extends FeFsPartition> partitions) { |
| for (FeFsPartition part: partitions) { |
| HdfsFileFormat ff = part.getFileFormat(); |
| if (!ff.isDateTypeSupported()) return part; |
| } |
| return null; |
| } |
| |
| /** |
| * Create node for scanning all data files of a particular table. |
| * |
| * The given 'aggInfo' is used for detecting and applying optimizations that span both |
| * the scan and aggregation. Only applicable to HDFS and Kudu table refs. |
| * |
| * Throws if a PlanNode.init() failed or if planning of the given |
| * table ref is not implemented. |
| */ |
| private PlanNode createScanNode(TableRef tblRef, AggregateInfo aggInfo, |
| Analyzer analyzer) throws ImpalaException { |
| ScanNode scanNode = null; |
| |
| // Get all predicates bound by the tuple. |
| List<Expr> conjuncts = new ArrayList<>(); |
| TupleId tid = tblRef.getId(); |
| conjuncts.addAll(analyzer.getBoundPredicates(tid)); |
| |
| // Also add remaining unassigned conjuncts |
| List<Expr> unassigned = analyzer.getUnassignedConjuncts(tid.asList()); |
| conjuncts.addAll(unassigned); |
| analyzer.markConjunctsAssigned(unassigned); |
| analyzer.createEquivConjuncts(tid, conjuncts); |
| |
| // Perform constant propagation and optimization if rewriting is enabled |
| if (analyzer.getQueryCtx().client_request.query_options.enable_expr_rewrites) { |
| if (!Expr.optimizeConjuncts(conjuncts, analyzer)) { |
| // Conjuncts implied false; convert to EmptySetNode |
| EmptySetNode node = new EmptySetNode(ctx_.getNextNodeId(), tid.asList()); |
| node.init(analyzer); |
| return node; |
| } |
| } else { |
| Expr.removeDuplicates(conjuncts); |
| } |
| |
| // TODO(todd) introduce FE interfaces for DataSourceTable, HBaseTable, KuduTable |
| FeTable table = tblRef.getTable(); |
| if (table instanceof FeFsTable) { |
| return createHdfsScanPlan(tblRef, aggInfo, conjuncts, analyzer); |
| } else if (table instanceof FeDataSourceTable) { |
| scanNode = new DataSourceScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), |
| conjuncts); |
| scanNode.init(analyzer); |
| return scanNode; |
| } else if (table instanceof FeHBaseTable) { |
| // HBase table |
| scanNode = new HBaseScanNode(ctx_.getNextNodeId(), tblRef.getDesc()); |
| scanNode.addConjuncts(conjuncts); |
| scanNode.init(analyzer); |
| return scanNode; |
| } else if (tblRef.getTable() instanceof FeKuduTable) { |
| scanNode = new KuduScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), conjuncts, |
| aggInfo); |
| scanNode.init(analyzer); |
| return scanNode; |
| } else { |
| throw new NotImplementedException( |
| "Planning not implemented for table ref class: " + tblRef.getClass()); |
| } |
| } |
| |
| /** |
| * Returns all applicable conjuncts for join between two plan trees 'materializing' the |
| * given left-hand and right-hand side table ref ids. The conjuncts either come from |
| * the analyzer or are generated based on equivalence classes, if necessary. The |
| * returned conjuncts are marked as assigned. |
| * The conjuncts can be used for hash table lookups. |
| * - for inner joins, those are equi-join predicates in which one side is fully bound |
| * by lhsTblRefIds and the other by rhsTblRefIds |
| * - for outer joins: same type of conjuncts as inner joins, but only from the |
| * ON or USING clause |
| * Predicates that are redundant based on equivalence classes are intentionally |
| * returned by this function because the removal of redundant predicates and the |
| * creation of new predicates for enforcing slot equivalences go hand-in-hand |
| * (see analyzer.createEquivConjuncts()). |
| */ |
| private List<BinaryPredicate> getHashLookupJoinConjuncts( |
| List<TupleId> lhsTblRefIds, List<TupleId> rhsTblRefIds, Analyzer analyzer) { |
| List<BinaryPredicate> result = new ArrayList<>(); |
| List<Expr> candidates = analyzer.getEqJoinConjuncts(lhsTblRefIds, rhsTblRefIds); |
| Preconditions.checkNotNull(candidates); |
| for (Expr e: candidates) { |
| if (!(e instanceof BinaryPredicate)) continue; |
| BinaryPredicate normalizedJoinConjunct = |
| getNormalizedEqPred(e, lhsTblRefIds, rhsTblRefIds, analyzer); |
| if (normalizedJoinConjunct == null) continue; |
| analyzer.markConjunctAssigned(e); |
| result.add(normalizedJoinConjunct); |
| } |
| if (!result.isEmpty()) return result; |
| |
| // Construct join conjuncts derived from equivalence class membership. |
| Set<TupleId> lhsTblRefIdsHs = new HashSet<>(lhsTblRefIds); |
| for (TupleId rhsId: rhsTblRefIds) { |
| TableRef rhsTblRef = analyzer.getTableRef(rhsId); |
| Preconditions.checkNotNull(rhsTblRef); |
| for (SlotDescriptor slotDesc: rhsTblRef.getDesc().getSlots()) { |
| SlotId rhsSid = slotDesc.getId(); |
| for (SlotId lhsSid : analyzer.getEquivClass(rhsSid)) { |
| if (lhsTblRefIdsHs.contains(analyzer.getTupleId(lhsSid))) { |
| result.add(analyzer.createInferredEqPred(lhsSid, rhsSid)); |
| break; |
| } |
| } |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * Returns a normalized version of a binary equality predicate 'expr' where the lhs |
| * child expr is bound by some tuple in 'lhsTids' and the rhs child expr is bound by |
| * some tuple in 'rhsTids'. Returns 'expr' if this predicate is already normalized. |
| * Returns null in any of the following cases: |
| * 1. It is not an equality predicate |
| * 2. One of the operands is a constant |
| * 3. Both children of this predicate are the same expr |
| * 4. Cannot be normalized |
| */ |
| public static BinaryPredicate getNormalizedEqPred(Expr expr, List<TupleId> lhsTids, |
| List<TupleId> rhsTids, Analyzer analyzer) { |
| if (!(expr instanceof BinaryPredicate)) return null; |
| BinaryPredicate pred = (BinaryPredicate) expr; |
| if (!pred.getOp().isEquivalence() && pred.getOp() != Operator.NULL_MATCHING_EQ) { |
| return null; |
| } |
| if (pred.getChild(0).isConstant() || pred.getChild(1).isConstant()) return null; |
| |
| Expr lhsExpr = Expr.getFirstBoundChild(pred, lhsTids); |
| Expr rhsExpr = Expr.getFirstBoundChild(pred, rhsTids); |
| if (lhsExpr == null || rhsExpr == null || lhsExpr == rhsExpr) return null; |
| |
| BinaryPredicate result = new BinaryPredicate(pred.getOp(), lhsExpr, rhsExpr); |
| result.analyzeNoThrow(analyzer); |
| return result; |
| } |
| |
| /** |
| * Creates a new node to join outer with inner. Collects and assigns join conjunct |
| * as well as regular conjuncts. Calls init() on the new join node. |
| * Throws if the JoinNode.init() fails. |
| */ |
| private PlanNode createJoinNode(PlanNode outer, PlanNode inner, |
| TableRef innerRef, Analyzer analyzer) throws ImpalaException { |
| // get eq join predicates for the TableRefs' ids (not the PlanNodes' ids, which |
| // are materialized) |
| List<BinaryPredicate> eqJoinConjuncts = getHashLookupJoinConjuncts( |
| outer.getTblRefIds(), inner.getTblRefIds(), analyzer); |
| // Outer joins should only use On-clause predicates as eqJoinConjuncts. |
| if (!innerRef.getJoinOp().isOuterJoin()) { |
| analyzer.createEquivConjuncts(outer.getTblRefIds(), inner.getTblRefIds(), |
| eqJoinConjuncts); |
| } |
| if (!eqJoinConjuncts.isEmpty() && innerRef.getJoinOp() == JoinOperator.CROSS_JOIN) { |
| innerRef.setJoinOp(JoinOperator.INNER_JOIN); |
| } |
| |
| List<Expr> otherJoinConjuncts = new ArrayList<>(); |
| if (innerRef.getJoinOp().isOuterJoin()) { |
| // Also assign conjuncts from On clause. All remaining unassigned conjuncts |
| // that can be evaluated by this join are assigned in createSelectPlan(). |
| otherJoinConjuncts = analyzer.getUnassignedOjConjuncts(innerRef); |
| } else if (innerRef.getJoinOp().isSemiJoin()) { |
| // Unassigned conjuncts bound by the invisible tuple id of a semi join must have |
| // come from the join's On-clause, and therefore, must be added to the other join |
| // conjuncts to produce correct results. |
| // TODO This doesn't handle predicates specified in the On clause which are not |
| // bound by any tuple id (e.g. ON (true)) |
| List<TupleId> tblRefIds = Lists.newArrayList(outer.getTblRefIds()); |
| tblRefIds.addAll(inner.getTblRefIds()); |
| otherJoinConjuncts = analyzer.getUnassignedConjuncts(tblRefIds, false); |
| if (innerRef.getJoinOp().isNullAwareLeftAntiJoin()) { |
| boolean hasNullMatchingEqOperator = false; |
| // Keep only the null-matching eq conjunct in the eqJoinConjuncts and move |
| // all the others in otherJoinConjuncts. The BE relies on this |
| // separation for correct execution of the null-aware left anti join. |
| Iterator<BinaryPredicate> it = eqJoinConjuncts.iterator(); |
| while (it.hasNext()) { |
| BinaryPredicate conjunct = it.next(); |
| if (!conjunct.isNullMatchingEq()) { |
| otherJoinConjuncts.add(conjunct); |
| it.remove(); |
| } else { |
| // Only one null-matching eq conjunct is allowed |
| Preconditions.checkState(!hasNullMatchingEqOperator); |
| hasNullMatchingEqOperator = true; |
| } |
| } |
| Preconditions.checkState(hasNullMatchingEqOperator); |
| } |
| } |
| analyzer.markConjunctsAssigned(otherJoinConjuncts); |
| |
| // Use a nested-loop join if there are no equi-join conjuncts, or if the inner |
| // (build side) is a singular row src. A singular row src has a cardinality of 1, so |
| // a nested-loop join is certainly cheaper than a hash join. |
| JoinNode result = null; |
| Preconditions.checkState(!innerRef.getJoinOp().isNullAwareLeftAntiJoin() |
| || !(inner instanceof SingularRowSrcNode)); |
| if (eqJoinConjuncts.isEmpty() || inner instanceof SingularRowSrcNode) { |
| otherJoinConjuncts.addAll(eqJoinConjuncts); |
| result = new NestedLoopJoinNode(outer, inner, analyzer.isStraightJoin(), |
| innerRef.getDistributionMode(), innerRef.getJoinOp(), otherJoinConjuncts); |
| } else { |
| result = new HashJoinNode(outer, inner, analyzer.isStraightJoin(), |
| innerRef.getDistributionMode(), innerRef.getJoinOp(), eqJoinConjuncts, |
| otherJoinConjuncts); |
| } |
| result.init(analyzer); |
| return result; |
| } |
| |
| /** |
| * Create a tree of PlanNodes for the given tblRef, which can be a BaseTableRef, |
| * CollectionTableRef or an InlineViewRef. |
| * |
| * The given 'aggInfo' is used for detecting and applying optimizations that span both |
| * the scan and aggregation. Only applicable to HDFS and Kudu table refs. |
| * |
| * Throws if a PlanNode.init() failed or if planning of the given |
| * table ref is not implemented. |
| */ |
| private PlanNode createTableRefNode(TableRef tblRef, AggregateInfo aggInfo, |
| Analyzer analyzer) throws ImpalaException { |
| PlanNode result = null; |
| if (tblRef instanceof BaseTableRef) { |
| result = createScanNode(tblRef, aggInfo, analyzer); |
| } else if (tblRef instanceof CollectionTableRef) { |
| if (tblRef.isRelative()) { |
| Preconditions.checkState(ctx_.hasSubplan()); |
| result = new UnnestNode(ctx_.getNextNodeId(), ctx_.getSubplan(), |
| (CollectionTableRef) tblRef); |
| result.init(analyzer); |
| } else { |
| result = createScanNode(tblRef, null, analyzer); |
| } |
| } else if (tblRef instanceof InlineViewRef) { |
| result = createInlineViewPlan(analyzer, (InlineViewRef) tblRef); |
| } else if (tblRef instanceof SingularRowSrcTableRef) { |
| Preconditions.checkState(ctx_.hasSubplan()); |
| result = new SingularRowSrcNode(ctx_.getNextNodeId(), ctx_.getSubplan()); |
| result.init(analyzer); |
| } else { |
| throw new NotImplementedException( |
| "Planning not implemented for table ref class: " + tblRef.getClass()); |
| } |
| return result; |
| } |
| |
| /** |
| * Create a plan tree corresponding to 'unionOperands' for the given unionStmt. |
| * The individual operands' plan trees are attached to a single UnionNode. |
| * If unionDistinctPlan is not null, it is expected to contain the plan for the |
| * distinct portion of the given unionStmt. The unionDistinctPlan is then added |
| * as a child of the returned UnionNode. |
| */ |
| private UnionNode createUnionPlan( |
| Analyzer analyzer, UnionStmt unionStmt, List<UnionOperand> unionOperands, |
| PlanNode unionDistinctPlan) |
| throws ImpalaException { |
| UnionNode unionNode = new UnionNode(ctx_.getNextNodeId(), unionStmt.getTupleId(), |
| unionStmt.getUnionResultExprs(), ctx_.hasSubplan()); |
| for (UnionOperand op: unionOperands) { |
| if (op.getAnalyzer().hasEmptyResultSet()) { |
| unmarkCollectionSlots(op.getQueryStmt()); |
| continue; |
| } |
| QueryStmt queryStmt = op.getQueryStmt(); |
| if (queryStmt instanceof SelectStmt) { |
| SelectStmt selectStmt = (SelectStmt) queryStmt; |
| if (selectStmt.getTableRefs().isEmpty()) { |
| unionNode.addConstExprList(selectStmt.getResultExprs()); |
| continue; |
| } |
| } |
| PlanNode opPlan = createQueryPlan(queryStmt, op.getAnalyzer(), false); |
| // There may still be unassigned conjuncts if the operand has an order by + limit. |
| // Place them into a SelectNode on top of the operand's plan. |
| opPlan = addUnassignedConjuncts(analyzer, opPlan.getTupleIds(), opPlan); |
| if (opPlan instanceof EmptySetNode) continue; |
| unionNode.addChild(opPlan, op.getQueryStmt().getResultExprs()); |
| } |
| |
| if (unionDistinctPlan != null) { |
| Preconditions.checkState(unionStmt.hasDistinctOps()); |
| Preconditions.checkState(unionDistinctPlan instanceof AggregationNode); |
| unionNode.addChild(unionDistinctPlan, |
| unionStmt.getDistinctAggInfo().getGroupingExprs()); |
| } |
| unionNode.init(analyzer); |
| return unionNode; |
| } |
| |
| /** |
| * Returns plan tree for unionStmt: |
| * - distinctOperands' plan trees are collected in a single UnionNode |
| * and duplicates removed via distinct aggregation |
| * - the output of that plus the allOperands' plan trees are collected in |
| * another UnionNode which materializes the result of unionStmt |
| * - if any of the union operands contains analytic exprs, we avoid pushing |
| * predicates directly into the operands and instead evaluate them |
| * *after* the final UnionNode (see createInlineViewPlan() for the reasoning) |
| * TODO: optimize this by still pushing predicates into the union operands |
| * that don't contain analytic exprs and evaluating the conjuncts in Select |
| * directly above the AnalyticEvalNodes |
| * TODO: Simplify the plan of unions with empty operands using an empty set node. |
| * TODO: Simplify the plan of unions with only a single non-empty operand to not |
| * use a union node (this is tricky because a union materializes a new tuple). |
| */ |
| private PlanNode createUnionPlan(UnionStmt unionStmt, Analyzer analyzer) |
| throws ImpalaException { |
| List<Expr> conjuncts = |
| analyzer.getUnassignedConjuncts(unionStmt.getTupleId().asList(), false); |
| if (!unionStmt.hasAnalyticExprs()) { |
| // Turn unassigned predicates for unionStmt's tupleId_ into predicates for |
| // the individual operands. |
| // Do this prior to creating the operands' plan trees so they get a chance to |
| // pick up propagated predicates. |
| for (UnionOperand op: unionStmt.getOperands()) { |
| List<Expr> opConjuncts = |
| Expr.substituteList(conjuncts, op.getSmap(), analyzer, false); |
| op.getAnalyzer().registerConjuncts(opConjuncts); |
| } |
| analyzer.markConjunctsAssigned(conjuncts); |
| } else { |
| // mark slots referenced by the yet-unassigned conjuncts |
| analyzer.materializeSlots(conjuncts); |
| } |
| // mark slots after predicate propagation but prior to plan tree generation |
| unionStmt.materializeRequiredSlots(analyzer); |
| |
| PlanNode result = null; |
| // create DISTINCT tree |
| if (unionStmt.hasDistinctOps()) { |
| result = createUnionPlan( |
| analyzer, unionStmt, unionStmt.getDistinctOperands(), null); |
| result = new AggregationNode( |
| ctx_.getNextNodeId(), result, unionStmt.getDistinctAggInfo(), AggPhase.FIRST); |
| result.init(analyzer); |
| } |
| // create ALL tree |
| if (unionStmt.hasAllOps()) { |
| result = createUnionPlan(analyzer, unionStmt, unionStmt.getAllOperands(), result); |
| } |
| |
| if (unionStmt.hasAnalyticExprs()) { |
| result = addUnassignedConjuncts( |
| analyzer, unionStmt.getTupleId().asList(), result); |
| } |
| return result; |
| } |
| } |