// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.planner;
import java.util.ArrayList;
import java.util.List;
import org.apache.impala.analysis.AnalysisContext;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.InsertStmt;
import org.apache.impala.analysis.JoinOperator;
import org.apache.impala.analysis.MultiAggregateInfo.AggPhase;
import org.apache.impala.analysis.QueryStmt;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.InternalException;
import org.apache.impala.planner.JoinNode.DistributionMode;
import org.apache.impala.util.KuduUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
 * The distributed planner is responsible for turning a single-node plan into an
 * executable, distributed plan that can be sent to the backend.
*/
public class DistributedPlanner {
  private static final Logger LOG = LoggerFactory.getLogger(DistributedPlanner.class);
private final PlannerContext ctx_;
public DistributedPlanner(PlannerContext ctx) {
ctx_ = ctx;
}
/**
* Create plan fragments for a single-node plan considering a set of execution options.
   * The fragments are returned in a list such that element i of that list can
   * only consume output of the preceding fragments j < i.
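   * For example, the list for a simple two-table hash join might be
   * [scan(t1), scan(t2), join], where the join fragment consumes the output of
   * the two scan fragments preceding it.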
*
* TODO: take data partition of the plan fragments into account; in particular,
* coordinate between hash partitioning for aggregation and hash partitioning
* for analytic computation more generally than what createQueryPlan() does
* right now (the coordination only happens if the same select block does both
* the aggregation and analytic computation).
*/
public List<PlanFragment> createPlanFragments(
PlanNode singleNodePlan) throws ImpalaException {
Preconditions.checkState(!ctx_.isSingleNodeExec());
AnalysisContext.AnalysisResult analysisResult = ctx_.getAnalysisResult();
QueryStmt queryStmt = ctx_.getQueryStmt();
List<PlanFragment> fragments = new ArrayList<>();
    // For inserts, CTAS, updates, and deletes, unless there is a limit, leave the
    // root fragment partitioned; otherwise merge everything into a single
    // coordinator fragment so we can pass the result back to the client.
boolean isPartitioned = false;
if ((analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt()
|| analysisResult.isUpdateStmt() || analysisResult.isDeleteStmt())
&& !singleNodePlan.hasLimit()) {
Preconditions.checkState(!queryStmt.hasOffset());
isPartitioned = true;
}
createPlanFragments(singleNodePlan, isPartitioned, fragments);
return fragments;
}
/**
* Return plan fragment that produces result of 'root'; recursively creates
* all input fragments to the returned fragment.
* If a new fragment is created, it is appended to 'fragments', so that
* each fragment is preceded by those from which it consumes the output.
* If 'isPartitioned' is false, the returned fragment is unpartitioned;
* otherwise it may be partitioned, depending on whether its inputs are
* partitioned; the partition function is derived from the inputs.
*/
private PlanFragment createPlanFragments(
PlanNode root, boolean isPartitioned, List<PlanFragment> fragments)
throws ImpalaException {
List<PlanFragment> childFragments = new ArrayList<>();
for (PlanNode child: root.getChildren()) {
// allow child fragments to be partitioned, unless they contain a limit clause
// (the result set with the limit constraint needs to be computed centrally);
// merge later if needed
boolean childIsPartitioned = !child.hasLimit();
// Do not fragment the subplan of a SubplanNode since it is executed locally.
if (root instanceof SubplanNode && child == root.getChild(1)) continue;
childFragments.add(createPlanFragments(child, childIsPartitioned, fragments));
}
PlanFragment result = null;
if (root instanceof ScanNode) {
result = createScanFragment(root);
fragments.add(result);
} else if (root instanceof HashJoinNode) {
Preconditions.checkState(childFragments.size() == 2);
result = createHashJoinFragment((HashJoinNode) root,
childFragments.get(1), childFragments.get(0), fragments);
} else if (root instanceof NestedLoopJoinNode) {
Preconditions.checkState(childFragments.size() == 2);
result = createNestedLoopJoinFragment((NestedLoopJoinNode) root,
childFragments.get(1), childFragments.get(0), fragments);
} else if (root instanceof SubplanNode) {
Preconditions.checkState(childFragments.size() == 1);
result = createSubplanNodeFragment((SubplanNode) root, childFragments.get(0));
} else if (root instanceof SelectNode) {
result = createSelectNodeFragment((SelectNode) root, childFragments);
} else if (root instanceof UnionNode) {
result = createUnionNodeFragment((UnionNode) root, childFragments, fragments);
} else if (root instanceof AggregationNode) {
result = createAggregationFragment(
(AggregationNode) root, childFragments.get(0), fragments);
} else if (root instanceof SortNode) {
if (((SortNode) root).isAnalyticSort()) {
// don't parallelize this like a regular SortNode
result = createAnalyticFragment(
root, childFragments.get(0), fragments);
} else {
result = createOrderByFragment(
(SortNode) root, childFragments.get(0), fragments);
}
} else if (root instanceof AnalyticEvalNode) {
result = createAnalyticFragment(root, childFragments.get(0), fragments);
} else if (root instanceof EmptySetNode) {
result = new PlanFragment(
ctx_.getNextFragmentId(), root, DataPartition.UNPARTITIONED);
} else if (root instanceof CardinalityCheckNode) {
      result = createCardinalityCheckNodeFragment(
          (CardinalityCheckNode) root, childFragments);
} else {
throw new InternalException("Cannot create plan fragment for this node type: "
+ root.getExplainString(ctx_.getQueryOptions()));
}
// move 'result' to end, it depends on all of its children
fragments.remove(result);
fragments.add(result);
if (!isPartitioned && result.isPartitioned()) {
result = createMergeFragment(result);
fragments.add(result);
}
return result;
}
/**
* Returns the product of the distinct value estimates of the individual exprs
* or -1 if any of them doesn't have a distinct value estimate.
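   * For example (hypothetical stats): exprs (year, month) with distinct value
   * estimates of 10 and 12 yield 120; if either estimate is missing, -1 is
   * returned.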
*/
private long getNumDistinctValues(List<Expr> exprs) {
long result = 1;
for (Expr expr: exprs) {
result *= expr.getNumDistinctValues();
if (result < 0) return -1;
}
return result;
}
/**
* Decides whether to repartition the output of 'inputFragment' before feeding its
* data into the table sink of the given 'insertStmt'. The decision obeys the
* shuffle/noshuffle plan hints if present. Otherwise, returns a plan fragment that
* partitions the output of 'inputFragment' on the partition exprs of 'insertStmt',
* unless the expected number of partitions is less than the number of nodes on which
* inputFragment runs, or the target table is unpartitioned.
* For inserts into unpartitioned tables or inserts with only constant partition exprs,
* the shuffle hint leads to a plan that merges all rows at the coordinator where
* the table sink is executed.
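   * For example (hypothetical stats): if the partition exprs (year, month) have
   * ~24 distinct combinations but 'inputFragment' runs on 50 nodes, repartitioning
   * would leave most nodes idle, so 'inputFragment' is returned unchanged.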
   * If this function ends up creating a new fragment, appends it to 'fragments'.
*/
public PlanFragment createInsertFragment(
PlanFragment inputFragment, InsertStmt insertStmt, Analyzer analyzer,
List<PlanFragment> fragments)
throws ImpalaException {
if (insertStmt.hasNoShuffleHint()) return inputFragment;
List<Expr> partitionExprs = Lists.newArrayList(insertStmt.getPartitionKeyExprs());
// Ignore constants for the sake of partitioning.
Expr.removeConstants(partitionExprs);
    // Do nothing if the input fragment is already appropriately partitioned.
    // TODO: Handle Kudu tables here (IMPALA-5254).
DataPartition inputPartition = inputFragment.getDataPartition();
if (!partitionExprs.isEmpty()
&& analyzer.setsHaveValueTransfer(inputPartition.getPartitionExprs(),
partitionExprs, true)
&& !(insertStmt.getTargetTable() instanceof FeKuduTable)) {
return inputFragment;
}
// Make a cost-based decision only if no user hint was supplied.
if (!insertStmt.hasShuffleHint()) {
if (insertStmt.getTargetTable() instanceof FeKuduTable) {
// If the table is unpartitioned or all of the partition exprs are constants,
// don't insert the exchange.
// TODO: make a more sophisticated decision here for partitioned tables and when
// we have info about tablet locations.
if (partitionExprs.isEmpty()) return inputFragment;
} else {
        // If the existing partition exprs are a subset of the table partition
        // exprs, check whether the input is already distributed across all nodes.
        // If so, don't repartition.
if (Expr.isSubset(inputPartition.getPartitionExprs(), partitionExprs)) {
long numPartitions = getNumDistinctValues(inputPartition.getPartitionExprs());
if (numPartitions >= inputFragment.getNumNodes()) {
return inputFragment;
}
}
// Don't repartition if we know we have fewer partitions than nodes
        // (i.e., default to repartitioning if col stats are missing).
// TODO: We want to repartition if the resulting files would otherwise
// be very small (less than some reasonable multiple of the recommended block
// size). In order to do that, we need to come up with an estimate of the avg row
// size in the particular file format of the output table/partition.
// We should always know on how many nodes our input is running.
long numPartitions = getNumDistinctValues(partitionExprs);
Preconditions.checkState(inputFragment.getNumNodes() != -1);
if (numPartitions > 0 && numPartitions <= inputFragment.getNumNodes()) {
return inputFragment;
}
}
}
ExchangeNode exchNode =
new ExchangeNode(ctx_.getNextNodeId(), inputFragment.getPlanRoot());
exchNode.init(analyzer);
Preconditions.checkState(exchNode.hasValidStats());
DataPartition partition;
if (partitionExprs.isEmpty()) {
partition = DataPartition.UNPARTITIONED;
} else if (insertStmt.getTargetTable() instanceof FeKuduTable) {
partition = DataPartition.kuduPartitioned(
KuduUtil.createPartitionExpr(insertStmt, ctx_.getRootAnalyzer()));
} else {
partition = DataPartition.hashPartitioned(partitionExprs);
}
PlanFragment fragment =
new PlanFragment(ctx_.getNextFragmentId(), exchNode, partition);
inputFragment.setDestination(exchNode);
inputFragment.setOutputPartition(partition);
fragments.add(fragment);
return fragment;
}
/**
* Return unpartitioned fragment that merges the input fragment's output via
* an ExchangeNode.
* Requires that input fragment be partitioned.
*/
private PlanFragment createMergeFragment(PlanFragment inputFragment)
throws ImpalaException {
Preconditions.checkState(inputFragment.isPartitioned());
ExchangeNode mergePlan =
new ExchangeNode(ctx_.getNextNodeId(), inputFragment.getPlanRoot());
mergePlan.init(ctx_.getRootAnalyzer());
Preconditions.checkState(mergePlan.hasValidStats());
PlanFragment fragment = new PlanFragment(ctx_.getNextFragmentId(), mergePlan,
DataPartition.UNPARTITIONED);
inputFragment.setDestination(mergePlan);
return fragment;
}
/**
   * Create a new randomly-partitioned fragment containing a single scan node.
* TODO: take bucketing into account to produce a naturally hash-partitioned
* fragment
* TODO: hbase scans are range-partitioned on the row key
*/
private PlanFragment createScanFragment(PlanNode node) {
return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.RANDOM);
}
/**
* Adds the SubplanNode as the new plan root to the child fragment and returns
* the child fragment.
*/
private PlanFragment createSubplanNodeFragment(SubplanNode node,
PlanFragment childFragment) {
node.setChild(0, childFragment.getPlanRoot());
childFragment.setPlanRoot(node);
return childFragment;
}
/**
   * Modifies the leftChildFragment to execute a broadcast nested loop join. The
   * right child input is provided by an ExchangeNode, which is the destination of
   * the rightChildFragment's output.
*/
private PlanFragment createNestedLoopJoinFragment(NestedLoopJoinNode node,
PlanFragment rightChildFragment, PlanFragment leftChildFragment,
List<PlanFragment> fragments) throws ImpalaException {
node.setDistributionMode(DistributionMode.BROADCAST);
node.setChild(0, leftChildFragment.getPlanRoot());
connectChildFragment(node, 1, leftChildFragment, rightChildFragment);
leftChildFragment.setPlanRoot(node);
return leftChildFragment;
}
/**
* Helper function to produce a partitioning hash-join fragment
*/
private PlanFragment createPartitionedHashJoinFragment(HashJoinNode node,
Analyzer analyzer, boolean lhsHasCompatPartition, boolean rhsHasCompatPartition,
PlanFragment leftChildFragment, PlanFragment rightChildFragment,
List<Expr> lhsJoinExprs, List<Expr> rhsJoinExprs,
List<PlanFragment> fragments) throws ImpalaException {
Preconditions.checkState(node.getDistributionMode() == DistributionMode.PARTITIONED);
// The lhs and rhs input fragments are already partitioned on the join exprs.
// Combine the lhs/rhs input fragments into leftChildFragment by placing the join
// node into leftChildFragment and setting its lhs/rhs children to the plan root of
// the lhs/rhs child fragment, respectively. No new child fragments or exchanges
// are created, and the rhs fragment is removed.
// TODO: Relax the isCompatPartition() check below. The check is conservative and
// may reject partitions that could be made physically compatible. Fix this by
    // removing equivalent duplicates from partition exprs and imposing a canonical
    // order on partition exprs (both using the canonical equivalence class
    // representatives).
if (lhsHasCompatPartition
&& rhsHasCompatPartition
&& isCompatPartition(
leftChildFragment.getDataPartition(),
rightChildFragment.getDataPartition(),
lhsJoinExprs, rhsJoinExprs, analyzer)) {
node.setChild(0, leftChildFragment.getPlanRoot());
node.setChild(1, rightChildFragment.getPlanRoot());
// fix up PlanNode.fragment_ for the migrated PlanNode tree of the rhs child
leftChildFragment.setFragmentInPlanTree(node.getChild(1));
// Relocate input fragments of rightChildFragment to leftChildFragment.
for (PlanFragment rhsInput: rightChildFragment.getChildren()) {
leftChildFragment.getChildren().add(rhsInput);
}
// Remove right fragment because its plan tree has been merged into leftFragment.
fragments.remove(rightChildFragment);
leftChildFragment.setPlanRoot(node);
return leftChildFragment;
}
// The lhs input fragment is already partitioned on the join exprs.
// Make the HashJoin the new root of leftChildFragment and set the join's
// first child to the lhs plan root. The second child of the join is an
// ExchangeNode that is fed by the rhsInputFragment whose sink repartitions
// its data by the rhs join exprs.
DataPartition rhsJoinPartition = null;
if (lhsHasCompatPartition) {
rhsJoinPartition = getCompatPartition(lhsJoinExprs,
leftChildFragment.getDataPartition(), rhsJoinExprs, analyzer);
if (rhsJoinPartition != null) {
node.setChild(0, leftChildFragment.getPlanRoot());
connectChildFragment(node, 1, leftChildFragment, rightChildFragment);
rightChildFragment.setOutputPartition(rhsJoinPartition);
leftChildFragment.setPlanRoot(node);
return leftChildFragment;
}
}
// Same as above but with rhs and lhs reversed.
DataPartition lhsJoinPartition = null;
if (rhsHasCompatPartition) {
lhsJoinPartition = getCompatPartition(rhsJoinExprs,
rightChildFragment.getDataPartition(), lhsJoinExprs, analyzer);
if (lhsJoinPartition != null) {
node.setChild(1, rightChildFragment.getPlanRoot());
connectChildFragment(node, 0, rightChildFragment, leftChildFragment);
leftChildFragment.setOutputPartition(lhsJoinPartition);
rightChildFragment.setPlanRoot(node);
return rightChildFragment;
}
}
Preconditions.checkState(lhsJoinPartition == null);
Preconditions.checkState(rhsJoinPartition == null);
lhsJoinPartition = DataPartition.hashPartitioned(Expr.cloneList(lhsJoinExprs));
rhsJoinPartition = DataPartition.hashPartitioned(Expr.cloneList(rhsJoinExprs));
// Neither lhs nor rhs are already partitioned on the join exprs.
// Create a new parent fragment containing a HashJoin node with two
// ExchangeNodes as inputs; the latter are the destinations of the
// left- and rightChildFragments, which now partition their output
// on their respective join exprs.
// The new fragment is hash-partitioned on the lhs input join exprs.
ExchangeNode lhsExchange =
new ExchangeNode(ctx_.getNextNodeId(), leftChildFragment.getPlanRoot());
lhsExchange.computeStats(ctx_.getRootAnalyzer());
node.setChild(0, lhsExchange);
ExchangeNode rhsExchange =
new ExchangeNode(ctx_.getNextNodeId(), rightChildFragment.getPlanRoot());
rhsExchange.computeStats(ctx_.getRootAnalyzer());
node.setChild(1, rhsExchange);
// Connect the child fragments in a new fragment, and set the data partition
// of the new fragment and its child fragments.
DataPartition outputPartition;
switch(node.getJoinOp()) {
      // For full outer joins the null values of the lhs/rhs join exprs are not
      // partitioned, so a random partition is the best we can do for now.
case FULL_OUTER_JOIN:
outputPartition = DataPartition.RANDOM;
break;
      // For right anti and semi joins the lhs join slots do not appear in the output.
case RIGHT_ANTI_JOIN:
case RIGHT_SEMI_JOIN:
// For right outer joins the null values of the lhs join expr are not partitioned.
case RIGHT_OUTER_JOIN:
outputPartition = rhsJoinPartition;
break;
// Otherwise we're good to use the lhs partition.
default:
outputPartition = lhsJoinPartition;
}
PlanFragment joinFragment =
new PlanFragment(ctx_.getNextFragmentId(), node, outputPartition);
leftChildFragment.setDestination(lhsExchange);
leftChildFragment.setOutputPartition(lhsJoinPartition);
rightChildFragment.setDestination(rhsExchange);
rightChildFragment.setOutputPartition(rhsJoinPartition);
return joinFragment;
}
/**
* Creates either a broadcast join or a repartitioning join depending on the expected
* cost and various constraints. See computeDistributionMode() for more details.
* TODO: don't create a broadcast join if we already anticipate that this will
* exceed the query's memory budget.
*/
private PlanFragment createHashJoinFragment(
HashJoinNode node, PlanFragment rightChildFragment,
PlanFragment leftChildFragment, List<PlanFragment> fragments)
throws ImpalaException {
// For both join types, the total cost is calculated as the amount of data
// sent over the network, plus the amount of data inserted into the hash table.
// broadcast: send the rightChildFragment's output to each node executing
// the leftChildFragment, and build a hash table with it on each node.
Analyzer analyzer = ctx_.getRootAnalyzer();
PlanNode rhsTree = rightChildFragment.getPlanRoot();
long rhsDataSize = -1;
long broadcastCost = -1;
// TODO: IMPALA-4224: update this once we can share the broadcast join data between
// finstances.
int mt_dop = ctx_.getQueryOptions().mt_dop;
int leftChildInstances = leftChildFragment.getNumInstances(mt_dop);
if (rhsTree.getCardinality() != -1) {
rhsDataSize = Math.round(
rhsTree.getCardinality() * ExchangeNode.getAvgSerializedRowSize(rhsTree));
if (leftChildInstances != -1) {
broadcastCost = 2 * rhsDataSize * leftChildInstances;
}
}
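    // Example with hypothetical stats: an rhs of 1M rows at 50 bytes/row gives
    // rhsDataSize ~= 50MB; with 10 left-side instances, broadcastCost is
    // 2 * 50MB * 10 = 1GB (every instance receives the data and builds its own
    // hash table from it).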
if (LOG.isTraceEnabled()) {
LOG.trace("broadcast: cost=" + Long.toString(broadcastCost));
LOG.trace("card=" + Long.toString(rhsTree.getCardinality()) + " row_size="
+ Float.toString(rhsTree.getAvgRowSize()) + " #instances="
+ Integer.toString(leftChildInstances));
}
// repartition: both left- and rightChildFragment are partitioned on the
// join exprs, and a hash table is built with the rightChildFragment's output.
PlanNode lhsTree = leftChildFragment.getPlanRoot();
List<Expr> lhsJoinExprs = new ArrayList<>();
List<Expr> rhsJoinExprs = new ArrayList<>();
for (Expr joinConjunct: node.getEqJoinConjuncts()) {
// no remapping necessary
lhsJoinExprs.add(joinConjunct.getChild(0).clone());
rhsJoinExprs.add(joinConjunct.getChild(1).clone());
}
boolean lhsHasCompatPartition = false;
boolean rhsHasCompatPartition = false;
long partitionCost = -1;
if (lhsTree.getCardinality() != -1 && rhsTree.getCardinality() != -1) {
lhsHasCompatPartition = analyzer.setsHaveValueTransfer(
          leftChildFragment.getDataPartition().getPartitionExprs(), lhsJoinExprs, false);
rhsHasCompatPartition = analyzer.setsHaveValueTransfer(
rightChildFragment.getDataPartition().getPartitionExprs(), rhsJoinExprs, false);
Preconditions.checkState(rhsDataSize != -1);
double lhsNetworkCost = (lhsHasCompatPartition) ? 0.0 :
Math.round(
lhsTree.getCardinality() * ExchangeNode.getAvgSerializedRowSize(lhsTree));
double rhsNetworkCost = (rhsHasCompatPartition) ? 0.0 : rhsDataSize;
partitionCost = Math.round(lhsNetworkCost + rhsNetworkCost + rhsDataSize);
}
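    // Example continued: an lhs of 10M rows at 20 bytes/row with no compatible
    // pre-existing partitions gives partitionCost ~= 200MB + 50MB + 50MB = 300MB
    // (both sides shipped once, plus the rhs hash-table build), so partitioning
    // would beat the 1GB broadcast above.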
if (LOG.isTraceEnabled()) {
LOG.trace("partition: cost=" + Long.toString(partitionCost));
LOG.trace("lhs card=" + Long.toString(lhsTree.getCardinality()) + " row_size="
+ Float.toString(lhsTree.getAvgRowSize()));
LOG.trace("rhs card=" + Long.toString(rhsTree.getCardinality()) + " row_size="
+ Float.toString(rhsTree.getAvgRowSize()));
LOG.trace(rhsTree.getExplainString(ctx_.getQueryOptions()));
}
DistributionMode distrMode = computeJoinDistributionMode(
node, broadcastCost, partitionCost, rhsDataSize);
node.setDistributionMode(distrMode);
PlanFragment hjFragment = null;
if (distrMode == DistributionMode.BROADCAST) {
// Doesn't create a new fragment, but modifies leftChildFragment to execute
// the join; the build input is provided by an ExchangeNode, which is the
// destination of the rightChildFragment's output
node.setChild(0, leftChildFragment.getPlanRoot());
connectChildFragment(node, 1, leftChildFragment, rightChildFragment);
leftChildFragment.setPlanRoot(node);
hjFragment = leftChildFragment;
} else {
hjFragment = createPartitionedHashJoinFragment(node, analyzer,
lhsHasCompatPartition, rhsHasCompatPartition, leftChildFragment,
rightChildFragment, lhsJoinExprs, rhsJoinExprs, fragments);
}
return hjFragment;
}
/**
   * Determines and returns the distribution mode for the given join based on the
   * expected costs and the right-hand side data size. Considers the following:
* - Some join types require a specific distribution strategy to run correctly.
* - Checks for join hints.
* - Uses the default join strategy (query option) when the costs are unknown or tied.
* - Returns broadcast if it is cheaper than partitioned and the expected hash table
* size is within the query mem limit.
* - Otherwise, returns partitioned.
* For 'broadcastCost', 'partitionCost', and 'rhsDataSize' a value of -1 indicates
* unknown, e.g., due to missing stats.
*/
private DistributionMode computeJoinDistributionMode(JoinNode node,
long broadcastCost, long partitionCost, long rhsDataSize) {
// Check join types that require a specific distribution strategy to run correctly.
JoinOperator op = node.getJoinOp();
if (op == JoinOperator.RIGHT_OUTER_JOIN || op == JoinOperator.RIGHT_SEMI_JOIN
|| op == JoinOperator.RIGHT_ANTI_JOIN || op == JoinOperator.FULL_OUTER_JOIN) {
return DistributionMode.PARTITIONED;
}
if (op == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) return DistributionMode.BROADCAST;
// Check join hints.
if (node.getDistributionModeHint() != DistributionMode.NONE) {
return node.getDistributionModeHint();
}
// Use the default mode when the costs are unknown or tied.
if (broadcastCost == -1 || partitionCost == -1 || broadcastCost == partitionCost) {
return DistributionMode.fromThrift(
ctx_.getQueryOptions().getDefault_join_distribution_mode());
}
// Decide the distribution mode based on the estimated costs and the mem limit.
int mt_dop = ctx_.getQueryOptions().mt_dop;
long htSize = Math.round(rhsDataSize * PlannerContext.HASH_TBL_SPACE_OVERHEAD);
// TODO: IMPALA-4224: update this once we can share the broadcast join data between
// finstances.
if (mt_dop > 1) htSize *= mt_dop;
long memLimit = ctx_.getQueryOptions().mem_limit;
if (broadcastCost <= partitionCost && (memLimit == 0 || htSize <= memLimit)) {
return DistributionMode.BROADCAST;
}
// Partitioned was cheaper or the broadcast HT would not fit within the mem limit.
return DistributionMode.PARTITIONED;
}
/**
* Returns true if the lhs and rhs partitions are physically compatible for executing
* a partitioned join with the given lhs/rhs join exprs. Physical compatibility means
* that lhs/rhs exchange nodes hashing on exactly those partition expressions are
* guaranteed to send two rows with identical partition-expr values to the same node.
* The requirements for physical compatibility are:
* 1. Number of exprs must be the same
* 2. The lhs partition exprs are identical to the lhs join exprs and the rhs partition
* exprs are identical to the rhs join exprs
* 3. Or for each expr in the lhs partition, there must be an equivalent expr in the
* rhs partition at the same ordinal position within the expr list
* (4. The expr types must be identical, but that is enforced later in PlanFragment)
* Conditions 2 and 3 are similar but not the same due to outer joins, e.g., for full
* outer joins condition 3 can never be met, but condition 2 can.
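   * For example (hypothetical exprs): hash(a.id) and hash(b.id) are compatible for
   * the join predicate a.id = b.id, whereas hash(a.x, a.y) and hash(b.y, b.x) are
   * rejected unless the exprs are pairwise equivalent at the same positions.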
* TODO: Move parts of this function into DataPartition as appropriate.
*/
private boolean isCompatPartition(DataPartition lhsPartition,
DataPartition rhsPartition, List<Expr> lhsJoinExprs, List<Expr> rhsJoinExprs,
Analyzer analyzer) {
List<Expr> lhsPartExprs = lhsPartition.getPartitionExprs();
List<Expr> rhsPartExprs = rhsPartition.getPartitionExprs();
// 1. Sizes must be equal.
if (lhsPartExprs.size() != rhsPartExprs.size()) return false;
// 2. Lhs/rhs join exprs are identical to lhs/rhs partition exprs.
Preconditions.checkState(lhsJoinExprs.size() == rhsJoinExprs.size());
if (lhsJoinExprs.size() == lhsPartExprs.size()) {
if (lhsJoinExprs.equals(lhsPartExprs) && rhsJoinExprs.equals(rhsPartExprs)) {
return true;
}
}
// 3. Each lhs part expr must have an equivalent expr at the same position
// in the rhs part exprs.
for (int i = 0; i < lhsPartExprs.size(); ++i) {
if (!analyzer.exprsHaveValueTransfer(lhsPartExprs.get(i), rhsPartExprs.get(i),
true)) {
return false;
}
}
return true;
}
/**
* Returns a new data partition that is suitable for creating an exchange node to feed
* a partitioned hash join. The hash join is assumed to be placed in a fragment with an
* existing data partition that is compatible with either the lhs or rhs join exprs
* (srcPartition belongs to the fragment and srcJoinExprs are the compatible exprs).
* The returned partition uses the given joinExprs which are assumed to be the lhs or
* rhs join exprs, whichever srcJoinExprs are not.
* The returned data partition has two important properties to ensure correctness:
* 1. It has exactly the same number of hash exprs as the srcPartition (IMPALA-1307),
* possibly by removing redundant exprs from joinExprs or adding some joinExprs
* multiple times to match the srcPartition
* 2. The hash exprs are ordered based on their corresponding 'matches' in
* the existing srcPartition (IMPALA-1324).
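   * For example (hypothetical exprs): for srcPartition hash(a.y, a.x) with
   * srcJoinExprs [a.x, a.y] and joinExprs [b.x, b.y], the returned partition is
   * hash(b.y, b.x), mirroring the order of the source partition exprs.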
* Returns null if no compatible data partition could be constructed.
* TODO: Move parts of this function into DataPartition as appropriate.
* TODO: Make comment less operational and more semantic.
*/
private DataPartition getCompatPartition(List<Expr> srcJoinExprs,
DataPartition srcPartition, List<Expr> joinExprs, Analyzer analyzer) {
Preconditions.checkState(srcPartition.isHashPartitioned());
List<Expr> srcPartExprs = srcPartition.getPartitionExprs();
List<Expr> resultPartExprs = new ArrayList<>();
for (Expr srcPartExpr : srcPartExprs) {
for (int j = 0; j < srcJoinExprs.size(); ++j) {
if (analyzer.exprsHaveValueTransfer(srcPartExpr, srcJoinExprs.get(j), false)) {
resultPartExprs.add(joinExprs.get(j).clone());
break;
}
}
}
if (resultPartExprs.size() != srcPartExprs.size()) return null;
return DataPartition.hashPartitioned(resultPartExprs);
}
/**
* Returns a new fragment with a UnionNode as its root. The data partition of the
* returned fragment and how the data of the child fragments is consumed depends on the
* data partitions of the child fragments:
* - All child fragments are unpartitioned or partitioned: The returned fragment has an
* UNPARTITIONED or RANDOM data partition, respectively. The UnionNode absorbs the
* plan trees of all child fragments.
* - Mixed partitioned/unpartitioned child fragments: The returned fragment is
* RANDOM partitioned. The plan trees of all partitioned child fragments are absorbed
* into the UnionNode. All unpartitioned child fragments are connected to the
* UnionNode via a RANDOM exchange, and remain unchanged otherwise.
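   * For example, a UNION ALL of a partitioned scan and a subquery with a LIMIT
   * (whose fragment is unpartitioned) yields a RANDOM-partitioned fragment: the
   * scan's plan tree is absorbed into the UnionNode while the limited subquery is
   * fed through a RANDOM exchange.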
*/
private PlanFragment createUnionNodeFragment(UnionNode unionNode,
List<PlanFragment> childFragments, List<PlanFragment> fragments)
throws ImpalaException {
Preconditions.checkState(unionNode.getChildren().size() == childFragments.size());
    // A UnionNode could be left with no children or only constant selects if all
    // of its operands were dropped because of constant predicates that evaluated
    // to false.
if (unionNode.getChildren().isEmpty()) {
return new PlanFragment(
ctx_.getNextFragmentId(), unionNode, DataPartition.UNPARTITIONED);
}
Preconditions.checkState(!childFragments.isEmpty());
int numUnpartitionedChildFragments = 0;
for (int i = 0; i < childFragments.size(); ++i) {
if (!childFragments.get(i).isPartitioned()) ++numUnpartitionedChildFragments;
}
// remove all children to avoid them being tagged with the wrong
// fragment (in the PlanFragment c'tor; we haven't created ExchangeNodes yet)
unionNode.clearChildren();
// If all child fragments are unpartitioned, return a single unpartitioned fragment
// with a UnionNode that merges all child fragments.
if (numUnpartitionedChildFragments == childFragments.size()) {
PlanFragment unionFragment = new PlanFragment(ctx_.getNextFragmentId(),
unionNode, DataPartition.UNPARTITIONED);
// Absorb the plan trees of all childFragments into unionNode
// and fix up the fragment tree in the process.
for (int i = 0; i < childFragments.size(); ++i) {
unionNode.addChild(childFragments.get(i).getPlanRoot());
unionFragment.setFragmentInPlanTree(unionNode.getChild(i));
unionFragment.addChildren(childFragments.get(i).getChildren());
}
unionNode.init(ctx_.getRootAnalyzer());
// All child fragments have been absorbed into unionFragment.
fragments.removeAll(childFragments);
return unionFragment;
}
// There is at least one partitioned child fragment.
PlanFragment unionFragment = new PlanFragment(
ctx_.getNextFragmentId(), unionNode, DataPartition.RANDOM);
for (int i = 0; i < childFragments.size(); ++i) {
PlanFragment childFragment = childFragments.get(i);
if (childFragment.isPartitioned()) {
// absorb the plan trees of all partitioned child fragments into unionNode
unionNode.addChild(childFragment.getPlanRoot());
unionFragment.setFragmentInPlanTree(unionNode.getChild(i));
unionFragment.addChildren(childFragment.getChildren());
fragments.remove(childFragment);
} else {
// dummy entry for subsequent addition of the ExchangeNode
unionNode.addChild(null);
// Connect the unpartitioned child fragments to unionNode via a random exchange.
connectChildFragment(unionNode, i, unionFragment, childFragment);
childFragment.setOutputPartition(DataPartition.RANDOM);
}
}
unionNode.init(ctx_.getRootAnalyzer());
return unionFragment;
}
/**
* Adds the SelectNode as the new plan root to the child fragment and returns
* the child fragment.
*/
private PlanFragment createSelectNodeFragment(SelectNode selectNode,
List<PlanFragment> childFragments) {
Preconditions.checkState(selectNode.getChildren().size() == childFragments.size());
PlanFragment childFragment = childFragments.get(0);
// set the child explicitly, an ExchangeNode might have been inserted
// (whereas selectNode.child[0] would point to the original child)
selectNode.setChild(0, childFragment.getPlanRoot());
childFragment.setPlanRoot(selectNode);
return childFragment;
}
/**
* Adds the CardinalityCheckNode as the new plan root to the child fragment and returns
* the child fragment.
*/
private PlanFragment createCardinalityCheckNodeFragment(
CardinalityCheckNode cardinalityCheckNode,
List<PlanFragment> childFragments) throws ImpalaException {
PlanFragment childFragment = childFragments.get(0);
// The cardinality check must execute on a single node.
if (childFragment.getOutputPartition().isPartitioned()) {
childFragment = createMergeFragment(childFragment);
}
// Set the child explicitly, an ExchangeNode might have been inserted
// (whereas cardinalityCheckNode.child[0] would point to the original child)
cardinalityCheckNode.setChild(0, childFragment.getPlanRoot());
childFragment.setPlanRoot(cardinalityCheckNode);
return childFragment;
}
/**
* Replace node's child at index childIdx with an ExchangeNode that receives its
   * input from childFragment. 'parentFragment' contains 'node' and the new
   * ExchangeNode.
*/
private void connectChildFragment(PlanNode node, int childIdx,
PlanFragment parentFragment, PlanFragment childFragment) throws ImpalaException {
ExchangeNode exchangeNode =
new ExchangeNode(ctx_.getNextNodeId(), childFragment.getPlanRoot());
exchangeNode.init(ctx_.getRootAnalyzer());
exchangeNode.setFragment(parentFragment);
node.setChild(childIdx, exchangeNode);
childFragment.setDestination(exchangeNode);
}
/**
* Create a new fragment containing a single ExchangeNode that consumes the output
* of childFragment, set the destination of childFragment to the new parent
* and the output partition of childFragment to that of the new parent.
* TODO: the output partition of a child isn't necessarily the same as the data
* partition of the receiving parent (if there is more materialization happening
* in the parent, such as during distinct aggregation). Do we care about the data
* partition of the parent being applicable to the *output* of the parent (it's
   * correct for the input)?
*/
private PlanFragment createParentFragment(
PlanFragment childFragment, DataPartition parentPartition)
throws ImpalaException {
ExchangeNode exchangeNode =
new ExchangeNode(ctx_.getNextNodeId(), childFragment.getPlanRoot());
exchangeNode.init(ctx_.getRootAnalyzer());
PlanFragment parentFragment = new PlanFragment(ctx_.getNextFragmentId(),
exchangeNode, parentPartition);
childFragment.setDestination(exchangeNode);
childFragment.setOutputPartition(parentPartition);
return parentFragment;
}
/**
* Returns a fragment that materializes the aggregation result of 'node'.
* If the child fragment is partitioned, the result fragment will be partitioned on
* the grouping exprs of 'node'.
* If 'node' is phase 1 of a 2-phase DISTINCT aggregation or the 'transpose' phase of a
* multiple-distinct aggregation, this will simply add 'node' to the child fragment and
   * return the child fragment; the new fragment will be created by the subsequent
   * call to createAggregationFragment() for the phase 2 AggregationNode.
*/
private PlanFragment createAggregationFragment(AggregationNode node,
PlanFragment childFragment, List<PlanFragment> fragments)
throws ImpalaException {
if (!childFragment.isPartitioned() || node.getAggPhase() == AggPhase.TRANSPOSE) {
// nothing to distribute; do full aggregation directly within childFragment
childFragment.addPlanRoot(node);
return childFragment;
}
if (node.isDistinctAgg()) {
// 'node' is phase 1 of a DISTINCT aggregation; the actual agg fragment
// will get created in the next createAggregationFragment() call
// for the parent AggregationNode
childFragment.addPlanRoot(node);
return childFragment;
}
// Check if 'node' is phase 2 of a DISTINCT aggregation.
boolean isDistinct = node.getChild(0) instanceof AggregationNode
&& ((AggregationNode) (node.getChild(0))).isDistinctAgg();
if (isDistinct) {
return createPhase2DistinctAggregationFragment(node, childFragment, fragments);
} else {
return createMergeAggregationFragment(node, childFragment);
}
}
/**
* Returns a fragment that materializes the final result of an aggregation where
* 'childFragment' is a partitioned fragment and 'node' is not part of a distinct
* aggregation.
*/
private PlanFragment createMergeAggregationFragment(
AggregationNode node, PlanFragment childFragment) throws ImpalaException {
Preconditions.checkArgument(childFragment.isPartitioned());
List<Expr> partitionExprs = node.getMergePartitionExprs(ctx_.getRootAnalyzer());
boolean hasGrouping = !partitionExprs.isEmpty();
DataPartition parentPartition = null;
if (hasGrouping) {
boolean childHasCompatPartition = node.isSingleClassAgg()
&& ctx_.getRootAnalyzer().setsHaveValueTransfer(partitionExprs,
childFragment.getDataPartition().getPartitionExprs(), true);
if (childHasCompatPartition) {
// The data is already partitioned on the required expressions. We can do the
// aggregation in the child fragment without an extra merge step.
// An exchange+merge step is required if the grouping exprs reference a tuple
// that is made nullable in 'childFragment' to bring NULLs from outer-join
// non-matches together.
childFragment.addPlanRoot(node);
return childFragment;
}
parentPartition = DataPartition.hashPartitioned(partitionExprs);
} else {
parentPartition = DataPartition.UNPARTITIONED;
}
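    // For example, a grouped agg such as count(*) with GROUP BY x hash-partitions
    // the merge step on x, while a non-grouping agg merges in a single
    // unpartitioned fragment.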
// the original aggregation materializes the intermediate agg tuple and goes
// into the child fragment; merge aggregation materializes the output agg tuple
// and goes into a parent fragment
childFragment.addPlanRoot(node);
node.setIntermediateTuple();
node.setIsPreagg(ctx_);
// if there is a limit, we need to transfer it from the pre-aggregation
// node in the child fragment to the merge aggregation node in the parent
long limit = node.getLimit();
node.unsetLimit();
node.unsetNeedsFinalize();
// place a merge aggregation step in a new fragment
PlanFragment mergeFragment = createParentFragment(childFragment, parentPartition);
AggregationNode mergeAggNode = new AggregationNode(ctx_.getNextNodeId(),
mergeFragment.getPlanRoot(), node.getMultiAggInfo(), AggPhase.FIRST_MERGE);
mergeAggNode.init(ctx_.getRootAnalyzer());
mergeAggNode.setLimit(limit);
    // The merge of a non-grouping agg only processes one tuple per Impala daemon -
    // codegen would cost more than it benefits.
if (!hasGrouping) {
mergeFragment.getPlanRoot().setDisableCodegen(true);
mergeAggNode.setDisableCodegen(true);
}
// HAVING predicates can only be evaluated after the merge agg step
node.transferConjuncts(mergeAggNode);
// Recompute stats after transferring the conjuncts_ (order is important).
node.computeStats(ctx_.getRootAnalyzer());
mergeFragment.getPlanRoot().computeStats(ctx_.getRootAnalyzer());
mergeAggNode.computeStats(ctx_.getRootAnalyzer());
// Set new plan root after updating stats.
mergeFragment.addPlanRoot(mergeAggNode);
return mergeFragment;
}
/**
   * Returns a fragment that materializes the final result of a distinct aggregation
* where 'childFragment' is a partitioned fragment with the phase-1 aggregation
* as its root.
*/
private PlanFragment createPhase2DistinctAggregationFragment(
AggregationNode phase2AggNode, PlanFragment childFragment,
List<PlanFragment> fragments) throws ImpalaException {
// The phase-1 aggregation node is already in the child fragment.
Preconditions.checkState(phase2AggNode.getChild(0) == childFragment.getPlanRoot());
// When a query has both grouping and distinct exprs, Impala can optionally include
// the distinct exprs in the hash exchange of the first aggregation phase to spread
// the data among more nodes. However, this plan requires another hash exchange on
// the grouping exprs in the second phase which is not required when omitting the
// distinct exprs in the first phase. Shuffling by both is better if the grouping
// exprs have low NDVs.
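    // For example, SELECT x, count(DISTINCT y) FROM t GROUP BY x can shuffle
    // phase 1 on (x, y), spreading work when x has few distinct values, at the
    // cost of a second shuffle on x alone for phase 2.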
boolean shuffleDistinctExprs = ctx_.getQueryOptions().shuffle_distinct_exprs;
boolean hasGrouping = phase2AggNode.hasGrouping();
AggregationNode phase1AggNode = ((AggregationNode) phase2AggNode.getChild(0));
// With grouping, the output partition exprs of the child are the (input) grouping
// exprs of the parent. The grouping exprs reference the output tuple of phase-1
// but the partitioning happens on the intermediate tuple of the phase-1.
List<Expr> phase1PartitionExprs =
phase1AggNode.getMergePartitionExprs(ctx_.getRootAnalyzer());
PlanFragment firstMergeFragment;
boolean childHasCompatPartition = phase1AggNode.isSingleClassAgg()
&& ctx_.getRootAnalyzer().setsHaveValueTransfer(phase1PartitionExprs,
childFragment.getDataPartition().getPartitionExprs(), true);
if (childHasCompatPartition) {
// The data is already partitioned on the required expressions, we can skip the
// phase-1 merge step.
childFragment.addPlanRoot(phase2AggNode);
firstMergeFragment = childFragment;
} else {
phase1AggNode.setIntermediateTuple();
phase1AggNode.setIsPreagg(ctx_);
DataPartition phase1MergePartition =
DataPartition.hashPartitioned(phase1PartitionExprs);
// place phase-1 merge aggregation step in a new fragment
firstMergeFragment = createParentFragment(childFragment, phase1MergePartition);
AggregationNode phase1MergeAggNode = new AggregationNode(ctx_.getNextNodeId(),
phase1AggNode, phase1AggNode.getMultiAggInfo(), AggPhase.FIRST_MERGE);
phase1MergeAggNode.init(ctx_.getRootAnalyzer());
phase1MergeAggNode.unsetNeedsFinalize();
phase1MergeAggNode.setIntermediateTuple();
firstMergeFragment.addPlanRoot(phase1MergeAggNode);
// the phase-2 aggregation consumes the output of the phase-1 merge agg;
      // if there is a limit, it has already been placed with the phase-2
      // aggregation step (which is where it should be)
firstMergeFragment.addPlanRoot(phase2AggNode);
if (shuffleDistinctExprs || !hasGrouping) fragments.add(firstMergeFragment);
}
if (!shuffleDistinctExprs && hasGrouping) return firstMergeFragment;
phase2AggNode.unsetNeedsFinalize();
phase2AggNode.setIntermediateTuple();
// Limit should be applied at the final merge aggregation node
long limit = phase2AggNode.getLimit();
phase2AggNode.unsetLimit();
DataPartition phase2MergePartition;
List<Expr> phase2PartitionExprs =
phase2AggNode.getMergePartitionExprs(ctx_.getRootAnalyzer());
if (phase2PartitionExprs.isEmpty()) {
phase2MergePartition = DataPartition.UNPARTITIONED;
} else {
phase2AggNode.setIsPreagg(ctx_);
phase2MergePartition = DataPartition.hashPartitioned(phase2PartitionExprs);
}
PlanFragment secondMergeFragment =
createParentFragment(firstMergeFragment, phase2MergePartition);
AggregationNode phase2MergeAggNode = new AggregationNode(ctx_.getNextNodeId(),
phase2AggNode, phase2AggNode.getMultiAggInfo(), AggPhase.SECOND_MERGE);
phase2MergeAggNode.init(ctx_.getRootAnalyzer());
phase2MergeAggNode.setLimit(limit);
// Transfer having predicates to final merge agg node
phase2AggNode.transferConjuncts(phase2MergeAggNode);
secondMergeFragment.addPlanRoot(phase2MergeAggNode);
return secondMergeFragment;
}
/**
* Returns a fragment that produces the output of either an AnalyticEvalNode
* or of the SortNode that provides the input to an AnalyticEvalNode.
* ('node' can be either an AnalyticEvalNode or a SortNode).
* The returned fragment is either partitioned on the Partition By exprs or
* unpartitioned in the absence of such exprs.
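   * For example, row_number() OVER (PARTITION BY c ORDER BY d) is evaluated in a
   * fragment hash-partitioned on c, while an analytic function with an empty
   * OVER() clause runs in a single unpartitioned fragment.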
*/
private PlanFragment createAnalyticFragment(PlanNode node,
PlanFragment childFragment, List<PlanFragment> fragments)
throws ImpalaException {
Preconditions.checkState(
node instanceof SortNode || node instanceof AnalyticEvalNode);
if (node instanceof AnalyticEvalNode) {
AnalyticEvalNode analyticNode = (AnalyticEvalNode) node;
if (analyticNode.getPartitionExprs().isEmpty()
&& analyticNode.getOrderByElements().isEmpty()) {
        // no Partition-By/Order-By exprs: compute analytic exprs in a single
        // unpartitioned fragment
PlanFragment fragment = childFragment;
if (childFragment.isPartitioned()) {
fragment = createParentFragment(childFragment, DataPartition.UNPARTITIONED);
}
fragment.addPlanRoot(analyticNode);
return fragment;
} else {
childFragment.addPlanRoot(analyticNode);
return childFragment;
}
}
SortNode sortNode = (SortNode) node;
Preconditions.checkState(sortNode.isAnalyticSort());
PlanFragment analyticFragment = childFragment;
if (sortNode.getInputPartition() != null) {
sortNode.getInputPartition().substitute(
childFragment.getPlanRoot().getOutputSmap(), ctx_.getRootAnalyzer());
// Make sure the childFragment's output is partitioned as required by the sortNode.
DataPartition sortPartition = sortNode.getInputPartition();
if (!childFragment.getDataPartition().equals(sortPartition)) {
analyticFragment = createParentFragment(childFragment, sortPartition);
}
}
analyticFragment.addPlanRoot(sortNode);
return analyticFragment;
}
/**
   * Returns an unpartitioned fragment that materializes the result of the given
   * SortNode. If the child fragment is unpartitioned, adds the SortNode to it and
   * returns it. Otherwise, returns a new fragment with a sort-merging exchange
   * that merges the results of the partitioned sorts.
* The offset and limit are adjusted in the child and parent plan nodes to produce
* the correct result.
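   * For example, ORDER BY c LIMIT 10 OFFSET 5 makes each partitioned sort return
   * its top 15 rows with no offset; the merging exchange then applies the offset
   * of 5 and the limit of 10.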
*/
private PlanFragment createOrderByFragment(SortNode node,
PlanFragment childFragment, List<PlanFragment> fragments)
throws ImpalaException {
node.setChild(0, childFragment.getPlanRoot());
childFragment.addPlanRoot(node);
if (!childFragment.isPartitioned()) return childFragment;
// Remember original offset and limit.
boolean hasLimit = node.hasLimit();
long limit = node.getLimit();
long offset = node.getOffset();
// Create a new fragment for a sort-merging exchange.
PlanFragment mergeFragment =
createParentFragment(childFragment, DataPartition.UNPARTITIONED);
ExchangeNode exchNode = (ExchangeNode) mergeFragment.getPlanRoot();
// Set limit, offset and merge parameters in the exchange node.
exchNode.unsetLimit();
if (hasLimit) exchNode.setLimit(limit);
exchNode.setMergeInfo(node.getSortInfo(), offset);
// Child nodes should not process the offset. If there is a limit,
// the child nodes need only return (offset + limit) rows.
SortNode childSortNode = (SortNode) childFragment.getPlanRoot();
Preconditions.checkState(node == childSortNode);
if (hasLimit) {
childSortNode.unsetLimit();
childSortNode.setLimit(PlanNode.checkedAdd(limit, offset));
}
childSortNode.setOffset(0);
childSortNode.computeStats(ctx_.getRootAnalyzer());
exchNode.computeStats(ctx_.getRootAnalyzer());
return mergeFragment;
}
}