// blob: a0e4fda5035f85fbdd550b036946dd0b13383e87 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.planner;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.impala.analysis.AnalysisContext;
import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.ColumnLineageGraph;
import org.apache.impala.analysis.ColumnLineageGraph.ColumnLabel;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.ExprSubstitutionMap;
import org.apache.impala.analysis.InsertStmt;
import org.apache.impala.analysis.JoinOperator;
import org.apache.impala.analysis.QueryStmt;
import org.apache.impala.analysis.SortInfo;
import org.apache.impala.analysis.TupleId;
import org.apache.impala.catalog.FeHBaseTable;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.NotImplementedException;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TQueryCtx;
import org.apache.impala.thrift.TQueryExecRequest;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TRuntimeFilterMode;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.util.EventSequence;
import org.apache.impala.util.KuduUtil;
import org.apache.impala.util.MathUtil;
import org.apache.impala.util.MaxRowsProcessedVisitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import static org.apache.impala.analysis.ToSqlOptions.SHOW_IMPLICIT_CASTS;
/**
* Creates an executable plan from an analyzed parse tree and query options.
*/
public class Planner {
// Logger shared by all Planner instances; used for trace-level diagnostics below.
private final static Logger LOG = LoggerFactory.getLogger(Planner.class);
// Minimum per-host resource requirements to ensure that no plan node set can have
// estimates of zero, even if the contained PlanNodes have estimates of zero.
// The value is 10 MB.
public static final long MIN_PER_HOST_MEM_ESTIMATE_BYTES = 10 * 1024 * 1024;
// The amount of memory added to a dedicated coordinator's memory estimate to
// compensate for underestimation. In the general case estimates for exec
// nodes tend to overestimate and should work fine but for estimates in the
// 100-500 MB space, underestimates can be a problem. We pick a value of 100MB
// because it is trivial for large estimates and small enough to not make a
// huge impact on the coordinator's process memory (which ideally would be
// large).
public static final long DEDICATED_COORD_SAFETY_BUFFER_BYTES = 100 * 1024 * 1024;
// Lower bound applied to the per-host resource profile in computeResourceReqs():
// a memory estimate of MIN_PER_HOST_MEM_ESTIMATE_BYTES and a minimum memory
// reservation of zero.
public static final ResourceProfile MIN_PER_HOST_RESOURCES =
new ResourceProfileBuilder()
.setMemEstimateBytes(MIN_PER_HOST_MEM_ESTIMATE_BYTES)
.setMinMemReservationBytes(0)
.build();
// Planning state for the statement being planned. Created once in the constructor
// from the analysis result, query context and timeline.
private final PlannerContext ctx_;
/**
 * Creates a planner for one analyzed statement. The analysis result, query context
 * and timeline are wrapped in a new PlannerContext that the planner uses afterwards.
 */
public Planner(AnalysisResult analysisResult, TQueryCtx queryCtx,
    EventSequence timeline) {
  this.ctx_ = new PlannerContext(analysisResult, queryCtx, timeline);
}
/** Returns the query context held by this planner's PlannerContext. */
public TQueryCtx getQueryCtx() {
  return this.ctx_.getQueryCtx();
}
/** Returns the analysis result held by this planner's PlannerContext. */
public AnalysisContext.AnalysisResult getAnalysisResult() {
  AnalysisContext.AnalysisResult analysisResult = ctx_.getAnalysisResult();
  return analysisResult;
}
/**
 * Returns a list of plan fragments for executing an analyzed parse tree.
 * May return a single-node or distributed executable plan. If enabled (through a
 * query option), computes runtime filters for dynamic partition pruning.
 *
 * The fragment list is reversed just before it is returned; the other methods of
 * this class that consume the result (createParallelPlans(), computeResourceReqs())
 * use element 0 as the root fragment.
 *
 * When lineage computation is enabled (or in a test environment) the column lineage
 * graph of the root analyzer is populated as well, except for UPDATE and DELETE
 * statements.
 *
 * Plan generation may fail and throw for the following reasons:
 * 1. Expr evaluation failed, e.g., during partition pruning.
 * 2. A certain feature is not yet implemented, e.g., physical join implementation for
 *    outer/semi joins without equi conjuncts.
 * 3. Expr substitution failed, e.g., because an expr was substituted with a type that
 *    renders the containing expr semantically invalid. Analysis should have ensured
 *    that such an expr substitution during plan generation never fails. If it does,
 *    that typically means there is a bug in analysis, or a broken/missing smap.
 */
public List<PlanFragment> createPlan() throws ImpalaException {
  SingleNodePlanner singleNodePlanner = new SingleNodePlanner(ctx_);
  DistributedPlanner distributedPlanner = new DistributedPlanner(ctx_);
  PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
  ctx_.getTimeline().markEvent("Single node plan created");
  checkForSmallQueryOptimization(singleNodePlan);

  // Join rewrites.
  invertJoins(singleNodePlan, ctx_.isSingleNodeExec());
  singleNodePlan = useNljForSingularRowBuilds(singleNodePlan, ctx_.getRootAnalyzer());

  // MT_DOP > 0 is not supported by default for plans with base table joins or table
  // sinks: we only allow MT_DOP > 0 with such plans if --unlock_mt_dop=true is
  // specified. We allow single node plans with mt_dop since there is no actual
  // parallelism.
  if (!ctx_.isSingleNodeExec() && ctx_.getQueryOptions().mt_dop > 0
      && (!RuntimeEnv.INSTANCE.isTestEnv()
          || RuntimeEnv.INSTANCE.isMtDopValidationEnabled())
      && !BackendConfig.INSTANCE.isMtDopUnlocked()
      && (ctx_.hasTableSink()
          || singleNodePlanner.hasUnsupportedMtDopJoin(singleNodePlan))) {
    if (BackendConfig.INSTANCE.mtDopAutoFallback()) {
      // Fall back to non-dop mode. This assumes that the mt_dop value is only used
      // in the distributed planning process, which should be generally true as long
      // as the value isn't cached in any plan nodes.
      ctx_.getQueryOptions().setMt_dop(0);
    } else {
      throw new NotImplementedException(
          "MT_DOP not supported for plans with base table joins or table sinks.");
    }
  }
  singleNodePlanner.validatePlan(singleNodePlan);

  List<PlanFragment> fragments;
  if (ctx_.isSingleNodeExec()) {
    // create one fragment containing the entire single-node plan tree
    fragments = Lists.newArrayList(new PlanFragment(
        ctx_.getNextFragmentId(), singleNodePlan, DataPartition.UNPARTITIONED));
  } else {
    // create distributed plan
    fragments = distributedPlanner.createPlanFragments(singleNodePlan);
  }

  // Create runtime filters. At this point the root fragment is the last list element.
  PlanFragment rootFragment = fragments.get(fragments.size() - 1);
  if (ctx_.getQueryOptions().getRuntime_filter_mode() != TRuntimeFilterMode.OFF) {
    RuntimeFilterGenerator.generateRuntimeFilters(ctx_, rootFragment.getPlanRoot());
    ctx_.getTimeline().markEvent("Runtime filters computed");
  }
  rootFragment.verifyTree();

  ExprSubstitutionMap rootNodeSmap = rootFragment.getPlanRoot().getOutputSmap();
  if (ctx_.isInsertOrCtas()) {
    InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
    insertStmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer());
    if (!ctx_.isSingleNodeExec()) {
      // repartition on partition keys
      rootFragment = distributedPlanner.createInsertFragment(
          rootFragment, insertStmt, ctx_.getRootAnalyzer(), fragments);
    }
    // Add optional sort node to the plan, based on clustered/noclustered plan hint.
    createPreInsertSort(insertStmt, rootFragment, ctx_.getRootAnalyzer());
    // set up table sink for root fragment
    rootFragment.setSink(insertStmt.createDataSink());
  } else {
    QueryStmt queryStmt = ctx_.getQueryStmt();
    queryStmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer());
    List<Expr> resultExprs = queryStmt.getResultExprs();
    if (ctx_.isUpdate()) {
      // Set up update sink for root fragment
      rootFragment.setSink(
          ctx_.getAnalysisResult().getUpdateStmt().createDataSink(resultExprs));
    } else if (ctx_.isDelete()) {
      // Set up delete sink for root fragment
      rootFragment.setSink(
          ctx_.getAnalysisResult().getDeleteStmt().createDataSink(resultExprs));
    } else if (ctx_.isQuery()) {
      rootFragment.setSink(
          ctx_.getAnalysisResult().getQueryStmt().createDataSink(resultExprs));
    }
  }

  // The check for disabling codegen uses estimates of rows per node so must be done
  // on the distributed plan.
  checkForDisableCodegen(rootFragment.getPlanRoot());

  if (LOG.isTraceEnabled()) {
    LOG.trace("desctbl: " + ctx_.getRootAnalyzer().getDescTbl().debugString());
    LOG.trace("root sink: " + rootFragment.getSink().getExplainString(
        "", "", ctx_.getQueryOptions(), TExplainLevel.VERBOSE));
    LOG.trace("finalize plan fragments");
  }
  for (PlanFragment fragment: fragments) {
    fragment.finalizeExchanges(ctx_.getRootAnalyzer());
  }

  Collections.reverse(fragments);
  ctx_.getTimeline().markEvent("Distributed plan created");

  ColumnLineageGraph graph = ctx_.getRootAnalyzer().getColumnLineageGraph();
  if (BackendConfig.INSTANCE.getComputeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) {
    // Lineage is disabled for UPDATE AND DELETE statements
    if (ctx_.isUpdateOrDelete()) return fragments;
    // Compute the column lineage graph
    if (ctx_.isInsertOrCtas()) {
      InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
      FeTable targetTable = insertStmt.getTargetTable();
      Preconditions.checkNotNull(targetTable);
      if (targetTable instanceof FeKuduTable && ctx_.isInsert()) {
        // For insert statements on Kudu tables, we only need to consider
        // the labels of columns mentioned in the column list.
        List<String> mentionedColumns = insertStmt.getMentionedColumns();
        Preconditions.checkState(!mentionedColumns.isEmpty());
        List<ColumnLabel> targetColLabels = new ArrayList<>();
        for (String column: mentionedColumns) {
          targetColLabels.add(new ColumnLabel(column, targetTable.getTableName()));
        }
        graph.addTargetColumnLabels(targetColLabels);
      } else {
        // Every other target (including non-INSERT statements on Kudu tables) uses
        // the labels of the whole target table.
        graph.addTargetColumnLabels(targetTable);
      }
    } else {
      graph.addTargetColumnLabels(ctx_.getQueryStmt().getColLabels().stream()
          .map(col -> new ColumnLabel(col))
          .collect(Collectors.toList()));
    }
    List<Expr> outputExprs = new ArrayList<>();
    rootFragment.getSink().collectExprs(outputExprs);
    graph.computeLineageGraph(outputExprs, ctx_.getRootAnalyzer());
    if (LOG.isTraceEnabled()) LOG.trace("lineage: " + graph.debugString());
    ctx_.getTimeline().markEvent("Lineage info computed");
  }
  return fragments;
}
/**
 * Builds the plans used when mt_dop > 0 and returns them as a list in which each
 * element is the root of one plan's fragment tree. The distributed plan is created
 * first via createPlan().
 * TODO: roll into createPlan()
 */
public List<PlanFragment> createParallelPlans() throws ImpalaException {
  Preconditions.checkState(ctx_.getQueryOptions().mt_dop > 0);
  List<PlanFragment> distributedFragments = createPlan();
  Preconditions.checkNotNull(distributedFragments);

  final List<PlanFragment> result;
  // TODO: IMPALA-4224: Parallel plans are not executable
  if (!RuntimeEnv.INSTANCE.isTestEnv()) {
    // Outside of tests the first fragment of the distributed plan is the only plan.
    result = Collections.singletonList(distributedFragments.get(0));
  } else {
    result = new ParallelPlanner(ctx_).createPlans(distributedFragments.get(0));
  }
  ctx_.getTimeline().markEvent("Parallel plans created");
  return result;
}
/**
 * Return combined explain string for all plan fragments.
 * Includes the estimated resource requirements from the request if set.
 * The explain level is EXTENDED, except for explain statements and test
 * environments (e.g., planner tests), where the 'explain_level' query option is used.
 */
public String getExplainString(List<PlanFragment> fragments,
    TQueryExecRequest request) {
  boolean honorQueryOption =
      ctx_.getAnalysisResult().isExplainStmt() || RuntimeEnv.INSTANCE.isTestEnv();
  TExplainLevel level = honorQueryOption
      ? ctx_.getQueryOptions().getExplain_level()
      : TExplainLevel.EXTENDED;
  return getExplainString(fragments, request, level);
}
/**
 * Return combined explain string for all plan fragments and with an
 * explicit explain level.
 * Includes the estimated resource requirements from the request if set.
 *
 * The output consists of an optional header (resource estimates, warnings and, at
 * EXTENDED level or above, the analyzed query text), a blank line terminating the
 * header, and the plan itself. Below VERBOSE only the first fragment's explain
 * string is printed; at VERBOSE and above every fragment is printed.
 */
public String getExplainString(List<PlanFragment> fragments,
    TQueryExecRequest request, TExplainLevel explainLevel) {
  StringBuilder str = new StringBuilder();
  boolean hasHeader = false;

  // Only some requests (queries, DML, etc) have a resource profile.
  if (request.isSetMax_per_host_min_mem_reservation()) {
    Preconditions.checkState(request.isSetMax_per_host_thread_reservation());
    Preconditions.checkState(request.isSetPer_host_mem_estimate());
    str.append(String.format(
        "Max Per-Host Resource Reservation: Memory=%s Threads=%d\n",
        PrintUtils.printBytes(request.getMax_per_host_min_mem_reservation()),
        request.getMax_per_host_thread_reservation()));
    str.append(String.format("Per-Host Resource Estimates: Memory=%s\n",
        PrintUtils.printBytesRoundedToMb(request.getPer_host_mem_estimate())));
    if (BackendConfig.INSTANCE.useDedicatedCoordinatorEstimates()) {
      str.append(String.format("Dedicated Coordinator Resource Estimate: Memory=%s\n",
          PrintUtils.printBytesRoundedToMb(request.getDedicated_coord_mem_estimate())));
    }
    hasHeader = true;
  }

  // Warn if the planner is running in TESTCASE mode.
  // NOTE(review): this note and the codegen note below do not set 'hasHeader';
  // presumably intentional, confirm before changing since tests parse the header.
  if (request.query_ctx.client_request.query_options.planner_testcase_mode) {
    str.append("WARNING: The planner is running in TESTCASE mode. This should only be "
        + "used by developers for debugging.\nTo disable it, do SET " +
        "PLANNER_TESTCASE_MODE=false.\n");
  }
  if (request.query_ctx.disable_codegen_hint) {
    str.append("Codegen disabled by planner\n");
  }

  // IMPALA-1983 In the case of corrupt stats, issue a warning for all queries except
  // child queries of 'compute stats'.
  if (!request.query_ctx.isSetParent_query_id() &&
      request.query_ctx.isSetTables_with_corrupt_stats() &&
      !request.query_ctx.getTables_with_corrupt_stats().isEmpty()) {
    str.append(
        "WARNING: The following tables have potentially corrupt table statistics.\n" +
        "Drop and re-compute statistics to resolve this problem.\n" +
        formatTableNames(request.query_ctx.getTables_with_corrupt_stats()) + "\n");
    hasHeader = true;
  }

  // Append warning about tables missing stats except for child queries of
  // 'compute stats'. The parent_query_id is only set for compute stats child queries.
  if (!request.query_ctx.isSetParent_query_id() &&
      request.query_ctx.isSetTables_missing_stats() &&
      !request.query_ctx.getTables_missing_stats().isEmpty()) {
    str.append("WARNING: The following tables are missing relevant table " +
        "and/or column statistics.\n" +
        formatTableNames(request.query_ctx.getTables_missing_stats()) + "\n");
    hasHeader = true;
  }

  // Skip the warning when the list is set but empty, consistent with the two
  // warnings above; a warning that names no tables is meaningless.
  if (request.query_ctx.isSetTables_missing_diskids() &&
      !request.query_ctx.getTables_missing_diskids().isEmpty()) {
    str.append("WARNING: The following tables have scan ranges with missing " +
        "disk id information.\n" +
        formatTableNames(request.query_ctx.getTables_missing_diskids()) + "\n");
    hasHeader = true;
  }

  if (request.query_ctx.isDisable_spilling()) {
    str.append("WARNING: Spilling is disabled for this query as a safety guard.\n" +
        "Reason: Query option disable_unsafe_spills is set, at least one table\n" +
        "is missing relevant stats, and no plan hints were given.\n");
    hasHeader = true;
  }

  if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
    // In extended explain include the analyzed query text showing implicit casts
    String queryText = ctx_.getQueryStmt().toSql(SHOW_IMPLICIT_CASTS);
    String wrappedText = PrintUtils.wrapString("Analyzed query: " + queryText, 80);
    str.append(wrappedText).append("\n");
    hasHeader = true;
  }
  // Note that the analyzed query text must be the last thing in the header.
  // This is to help tests that parse the header.

  // Add the blank line that indicates the end of the header
  if (hasHeader) str.append("\n");

  if (explainLevel.ordinal() < TExplainLevel.VERBOSE.ordinal()) {
    // Print the non-fragmented parallel plan.
    str.append(fragments.get(0).getExplainString(ctx_.getQueryOptions(), explainLevel));
  } else {
    // Print the fragmented parallel plan.
    for (int i = 0; i < fragments.size(); ++i) {
      PlanFragment fragment = fragments.get(i);
      str.append(fragment.getExplainString(ctx_.getQueryOptions(), explainLevel));
      if (i < fragments.size() - 1) str.append("\n");
    }
  }
  return str.toString();
}

/**
 * Renders the given tables as a comma-separated list of "db_name.table_name" entries,
 * in iteration order.
 */
private static String formatTableNames(Iterable<TTableName> tables) {
  List<String> names = new ArrayList<>();
  for (TTableName table: tables) {
    names.add(table.db_name + "." + table.table_name);
  }
  return Joiner.on(", ").join(names);
}
/**
 * Computes the per-host resource profile for the given plans, i.e. the peak resources
 * consumed by all fragment instances belonging to the query per host, and stores the
 * per-host memory estimate, minimum memory reservation, thread reservation and the
 * dedicated coordinator memory estimate in 'request'.
 * Only the first element of 'planRoots' is used; 'queryCtx' is not read.
 */
public void computeResourceReqs(List<PlanFragment> planRoots,
    TQueryCtx queryCtx, TQueryExecRequest request) {
  Preconditions.checkState(!planRoots.isEmpty());
  Preconditions.checkNotNull(request);
  int mtDop = ctx_.getRootAnalyzer().getQueryOptions().getMt_dop();

  // Per-host peak over all plan fragments, assuming that every fragment is scheduled
  // on every node. The actual per-host resource requirements are computed after
  // scheduling.
  ResourceProfile perHostPeak = ResourceProfile.invalid();
  long runtimeFilterBytes = 0;

  // Visit the fragments bottom-up, since a fragment's profile may depend on the
  // profiles of its descendants.
  PlanFragment rootFragment = planRoots.get(0);
  for (PlanFragment current: rootFragment.getNodesPostOrder()) {
    // Per-node, per-sink and aggregate profiles of this fragment.
    current.computeResourceProfile(ctx_.getRootAnalyzer());
    // Fragments do not synchronize their Open() and Close(), so the backend gives no
    // strong guarantee that one fragment instance releases resources before another
    // acquires them. Be conservative: assume all fragment instances run on all
    // backends with max DOP and hit their peak at the same time, i.e. the query-wide
    // peak is the sum of the per-fragment-instance peaks.
    perHostPeak = perHostPeak.sum(
        current.getResourceProfile().multiply(current.getNumInstancesPerHost(mtDop)));
    // The coordinator keeps a copy of every runtime filter for filter aggregation.
    runtimeFilterBytes += current.getRuntimeFiltersMemReservationBytes();
  }
  rootFragment.computePipelineMembership();

  Preconditions.checkState(perHostPeak.getMemEstimateBytes() >= 0,
      perHostPeak.getMemEstimateBytes());
  Preconditions.checkState(perHostPeak.getMinMemReservationBytes() >= 0,
      perHostPeak.getMinMemReservationBytes());

  // Never report less than the configured per-host minimum.
  perHostPeak = MIN_PER_HOST_RESOURCES.max(perHostPeak);
  request.setPer_host_mem_estimate(perHostPeak.getMemEstimateBytes());
  request.setMax_per_host_min_mem_reservation(perHostPeak.getMinMemReservationBytes());
  request.setMax_per_host_thread_reservation(perHostPeak.getThreadReservation());

  long coordEstimateBytes;
  if (!getAnalysisResult().isQueryStmt()) {
    // For queries that don't have a coordinator fragment, estimate a small
    // amount of memory that the query state spawned on the coordinator can use.
    coordEstimateBytes = runtimeFilterBytes + DEDICATED_COORD_SAFETY_BUFFER_BYTES;
  } else {
    coordEstimateBytes = MathUtil.saturatingAdd(
        rootFragment.getResourceProfile().getMemEstimateBytes(),
        runtimeFilterBytes + DEDICATED_COORD_SAFETY_BUFFER_BYTES);
  }
  request.setDedicated_coord_mem_estimate(coordEstimateBytes);

  if (LOG.isTraceEnabled()) {
    LOG.trace("Max per-host min reservation: " +
        perHostPeak.getMinMemReservationBytes());
    LOG.trace("Max estimated per-host memory: " +
        perHostPeak.getMemEstimateBytes());
    LOG.trace("Max estimated per-host thread reservation: " +
        perHostPeak.getThreadReservation());
  }
}
/**
 * Walks the plan tree rooted at 'root' bottom-up and inverts a join when one of the
 * following holds, checked in this order:
 * 1. The left-hand side is a SingularRowSrcNode: after inversion the build side is
 *    guaranteed to have only a single row.
 * 2. The join is a distributed non-equi right outer/semi join, for which there is no
 *    backend support (any distributed left semi/outer join is ok).
 * 3. The inverted join is estimated to be cheaper (see isInvertedJoinCheaper()).
 *    No inversion happens if relevant stats are missing.
 * Rules 1 and 2 do not depend on the presence/absence of stats.
 * Joins for which JoinNode.isInvertible() returns false are left untouched; this
 * covers Left Null Aware Anti Joins (no backend support) and joins that originate from
 * query blocks with a straight join hint.
 * 'isLocalPlan' indicates whether the plan tree rooted at 'root' will be executed
 * locally within one machine, i.e., without any data exchanges. The second child of a
 * SubplanNode is always treated as a local plan.
 */
private void invertJoins(PlanNode root, boolean isLocalPlan) {
  if (root instanceof SubplanNode) {
    invertJoins(root.getChild(0), isLocalPlan);
    invertJoins(root.getChild(1), true);
  } else {
    root.getChildren().forEach(child -> invertJoins(child, isLocalPlan));
  }

  if (root instanceof JoinNode) {
    JoinNode join = (JoinNode) root;
    JoinOperator op = join.getJoinOp();
    if (join.isInvertible(isLocalPlan)) {
      boolean invert =
          // A singular row src always belongs on the build side because it only
          // produces a single row.
          join.getChild(0) instanceof SingularRowSrcNode
          // A distributed non-equi right outer or semi join has no backend support
          // and must be inverted to become executable.
          || (!isLocalPlan && join instanceof NestedLoopJoinNode
              && (op.isRightSemiJoin() || op.isRightOuterJoin()))
          || isInvertedJoinCheaper(join, isLocalPlan);
      if (invert) join.invertJoin();
    }
  }

  // Re-compute tuple ids in every case because the backend assumes that their order
  // corresponds to the order of children.
  root.computeTupleIds();
}
/**
 * Returns true if 'joinNode' is estimated to be cheaper to execute after swapping its
 * inputs. Returns false if either input has no cardinality estimate or, for hash
 * joins, if the number of nodes of either input is not positive.
 *
 * Nested loop joins: the cost is assumed to be determined by the size of the build
 * side alone, so inversion wins when the lhs is smaller in bytes than the rhs.
 *
 * Hash joins use a more nuanced model with these inputs:
 * - est. number of rows in the build and probe: lhsCard and rhsCard
 * - est. size of the rows in the build and probe: lhsAvgRowSize and rhsAvgRowSize
 * - est. parallelism with which the lhs and rhs trees execute: lhsNumNodes and
 *   rhsNumNodes. The parallelism of the join is determined by the lhs.
 *
 * Assumptions:
 * - The join strategy is PARTITIONED and rows are distributed evenly. The actual
 *   strategy is only chosen later in planning, so this simplifies the analysis.
 *   Generally if one input is small enough that a broadcast join is viable then this
 *   formula will prefer to put that input on the right side anyway.
 * - Processing a build row is twice as expensive as processing a probe row of the
 *   same size.
 * - The cost of processing each byte of a row has a fixed component (C) (e.g. hashing
 *   and comparing the row) and a variable component (e.g. looking up the hash table).
 * - The variable component grows proportionally to the log of the build side, to
 *   approximate hash table accesses hitting slower levels of the memory hierarchy.
 *
 * The estimated per-host cost of a hash join before and after inversion, measured in
 * an arbitrary unit of time, is then:
 *
 *   (log_b(rhsBytes) + C) * (lhsBytes + 2 * rhsBytes) / lhsNumNodes
 * vs.
 *   (log_b(lhsBytes) + C) * (rhsBytes + 2 * lhsBytes) / rhsNumNodes
 *
 * where lhsBytes = lhsCard * lhsAvgRowSize and rhsBytes = rhsCard * rhsAvgRowSize.
 *
 * b = 10 and C = 5 were chosen empirically because they seem to give reasonable
 * results for a range of inputs. The model is not particularly sensitive to them.
 *
 * With equal parallelism on both sides this reduces to comparing the input sizes.
 * Otherwise, if inversion reduces parallelism significantly, a significant difference
 * between lhs and rhs bytes is needed to justify it.
 */
private boolean isInvertedJoinCheaper(JoinNode joinNode, boolean isLocalPlan) {
  final PlanNode lhs = joinNode.getChild(0);
  final PlanNode rhs = joinNode.getChild(1);
  final long lhsRows = lhs.getCardinality();
  final long rhsRows = rhs.getCardinality();
  // Without cardinality estimates for both inputs no decision can be made.
  if (lhsRows == -1 || rhsRows == -1) return false;

  final double lhsSize = lhsRows * lhs.getAvgRowSize();
  final double rhsSize = rhsRows * rhs.getAvgRowSize();
  if (joinNode instanceof NestedLoopJoinNode) {
    // For NLJ, simply try to minimize the size of the build side, since it needs to
    // be broadcast to all participating nodes.
    return lhsSize < rhsSize;
  }
  Preconditions.checkState(joinNode instanceof HashJoinNode);

  int lhsParallelism = 1;
  int rhsParallelism = 1;
  if (!isLocalPlan) {
    lhsParallelism = lhs.getNumNodes();
    rhsParallelism = rhs.getNumNodes();
  }
  // Parallelism is needed to decide whether inverting a hash join is profitable.
  if (Math.min(lhsParallelism, rhsParallelism) <= 0) return false;

  final long fixedCostPerByte = 5;
  // One is added to the log argument so that log of 0 is never taken.
  final double currentCost =
      (Math.log10(rhsSize + 1) + fixedCostPerByte) * (lhsSize + 2 * rhsSize)
      / lhsParallelism;
  final double invertedCost =
      (Math.log10(lhsSize + 1) + fixedCostPerByte) * (rhsSize + 2 * lhsSize)
      / rhsParallelism;
  final boolean invert = invertedCost < currentCost;

  if (LOG.isTraceEnabled()) {
    LOG.trace("isInvertedJoinCheaper() " + TupleId.printIds(joinNode.getTupleIds()));
    LOG.trace("lhsCard " + lhsRows + " lhsBytes " + lhsSize +
        " lhsNumNodes " + lhsParallelism);
    LOG.trace("rhsCard " + rhsRows + " rhsBytes " + rhsSize +
        " rhsNumNodes " + rhsParallelism);
    LOG.trace("cost " + currentCost + " invCost " + invertedCost);
    LOG.trace("INVERT? " + invert);
  }
  return invert;
}
/**
 * Rewrites the tree rooted at 'root' bottom-up, replacing every non-nested-loop join
 * whose right child is a SingularRowSrcNode with a NestedLoopJoinNode, and returns the
 * (possibly new) root. The replacement keeps the original node id, children, join op,
 * hints and conjuncts; the equi-join conjuncts are appended to the other join
 * conjuncts.
 * Null Aware Anti Joins are not converted because that join op is only supported with
 * a hash join.
 * Throws if JoinNode.init() fails on the new nested-loop join node.
 */
private PlanNode useNljForSingularRowBuilds(PlanNode root, Analyzer analyzer)
    throws ImpalaException {
  // Rewrite the subtrees first and re-attach whatever each rewrite returns.
  for (int childIdx = 0; childIdx < root.getChildren().size(); ++childIdx) {
    PlanNode rewrittenChild =
        useNljForSingularRowBuilds(root.getChild(childIdx), analyzer);
    root.setChild(childIdx, rewrittenChild);
  }

  boolean isCandidate = root instanceof JoinNode
      && !(root instanceof NestedLoopJoinNode)
      && root.getChild(1) instanceof SingularRowSrcNode;
  if (!isCandidate) return root;

  JoinNode originalJoin = (JoinNode) root;
  if (originalJoin.getJoinOp().isNullAwareLeftAntiJoin()) {
    Preconditions.checkState(originalJoin instanceof HashJoinNode);
    return root;
  }

  // A nested-loop join has no separate equi-join conjuncts, so all join conjuncts are
  // passed as "other" conjuncts.
  List<Expr> allJoinConjuncts =
      Lists.newArrayList(originalJoin.getOtherJoinConjuncts());
  allJoinConjuncts.addAll(originalJoin.getEqJoinConjuncts());

  JoinNode replacement = new NestedLoopJoinNode(originalJoin.getChild(0),
      originalJoin.getChild(1), originalJoin.isStraightJoin(),
      originalJoin.getDistributionModeHint(), originalJoin.getJoinOp(),
      allJoinConjuncts);
  replacement.getConjuncts().addAll(originalJoin.getConjuncts());
  replacement.setId(originalJoin.getId());
  replacement.init(analyzer);
  return replacement;
}
/**
 * Applies the small-query optimization to the query options if the single-node plan
 * is estimated to process fewer rows than 'exec_single_node_rows_threshold'.
 * When it applies, the query is restricted to a single node, the codegen-disable hint
 * is set and runtime filters are turned off; for very small inputs the number of
 * scanner threads is also limited to one.
 * Does nothing if the row estimate is not valid or the plan contains a join.
 */
private void checkForSmallQueryOptimization(PlanNode singleNodePlan) {
  MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor();
  singleNodePlan.accept(visitor);
  // TODO: IMPALA-3335: support the optimization for plans with joins.
  if (!visitor.valid() || visitor.foundJoinNode()) return;

  // This optimization executes the plan on a single node so the threshold must
  // be based on the total number of rows processed.
  long maxRowsProcessed = visitor.getMaxRowsProcessed();
  int threshold = ctx_.getQueryOptions().exec_single_node_rows_threshold;
  if (maxRowsProcessed < threshold) {
    // Execute on a single node and disable codegen for small results.
    // Guard the message construction like every other trace call in this class.
    if (LOG.isTraceEnabled()) {
      LOG.trace("Query is small enough to execute on a single node: maxRowsProcessed = "
          + maxRowsProcessed);
    }
    ctx_.getQueryOptions().setNum_nodes(1);
    ctx_.getQueryCtx().disable_codegen_hint = true;
    // NOTE(review): the 1024 fallback for batch_size == 0 presumably mirrors the
    // default batch size; confirm against the backend.
    if (maxRowsProcessed < ctx_.getQueryOptions().batch_size
        || (maxRowsProcessed < 1024 && ctx_.getQueryOptions().batch_size == 0)) {
      // Only one scanner thread for small queries
      ctx_.getQueryOptions().setNum_scanner_threads(1);
    }
    // disable runtime filters
    ctx_.getQueryOptions().setRuntime_filter_mode(TRuntimeFilterMode.OFF);
  }
}
/**
 * Sets the codegen-disable hint on the query context if the estimated maximum number
 * of rows processed per node in 'distributedPlan' is below the
 * 'disable_codegen_rows_threshold' query option. Does nothing if the estimate is not
 * valid.
 */
private void checkForDisableCodegen(PlanNode distributedPlan) {
  MaxRowsProcessedVisitor rowCounter = new MaxRowsProcessedVisitor();
  distributedPlan.accept(rowCounter);
  // This heuristic threshold tries to determine if the per-node codegen time will
  // reduce per-node execution time enough to justify the cost of codegen. Per-node
  // execution time is correlated with the number of rows flowing through the plan.
  boolean tooSmallForCodegen = rowCounter.valid()
      && rowCounter.getMaxRowsProcessedPerNode()
          < ctx_.getQueryOptions().getDisable_codegen_rows_threshold();
  if (tooSmallForCodegen) {
    ctx_.getQueryCtx().disable_codegen_hint = true;
  }
}
/**
 * Puts a sort node on top of the plan of 'inputFragment' if the insert needs one,
 * depending on the clustered/noclustered plan hint and on the sort columns of
 * 'insertStmt' (the 'sort.columns' table property).
 *
 * The ordering columns start with the clustering columns (for Kudu tables: the
 * partition expr followed by the primary key columns), so that partitions can be
 * written sequentially in the table sink. The sort exprs of the statement are appended
 * after any clustering columns. Constant exprs are dropped from the ordering. If no
 * ordering exprs remain, no sort node is added and the fragment is left unchanged.
 *
 * Kudu targets get a partial sort; all other targets get a total sort.
 */
public void createPreInsertSort(InsertStmt insertStmt, PlanFragment inputFragment,
    Analyzer analyzer) throws ImpalaException {
  List<Expr> sortKeys = new ArrayList<>();
  boolean usePartialSort = false;

  if (insertStmt.getTargetTable() instanceof FeKuduTable) {
    // Always sort if the 'clustered' hint is present. Otherwise, don't sort if either
    // the 'noclustered' hint is present, or this is a single node exec, or if the
    // target table is unpartitioned.
    boolean sortForKudu = insertStmt.hasClusteredHint()
        || !(insertStmt.hasNoClusteredHint()
            || ctx_.isSingleNodeExec()
            || insertStmt.getPartitionKeyExprs().isEmpty());
    if (sortForKudu) {
      sortKeys.add(KuduUtil.createPartitionExpr(insertStmt, ctx_.getRootAnalyzer()));
      sortKeys.addAll(insertStmt.getPrimaryKeyExprs());
      usePartialSort = true;
    }
  } else if (insertStmt.requiresClustering()) {
    sortKeys.addAll(insertStmt.getPartitionKeyExprs());
  }
  sortKeys.addAll(insertStmt.getSortExprs());

  // Ignore constants for the sake of clustering.
  Expr.removeConstants(sortKeys);
  if (sortKeys.isEmpty()) return;

  // Build sortinfo to sort by the ordering exprs: all ascending, nulls last.
  final int numKeys = sortKeys.size();
  List<Boolean> ascending = Collections.nCopies(numKeys, true);
  List<Boolean> nullsFirst = Collections.nCopies(numKeys, false);
  SortInfo sortInfo = new SortInfo(sortKeys, ascending, nullsFirst,
      insertStmt.getSortingOrder());
  sortInfo.createSortTupleInfo(insertStmt.getResultExprs(), analyzer);
  sortInfo.getSortTupleDescriptor().materializeSlots();
  // The insert now reads its result exprs from the sort tuple.
  insertStmt.substituteResultExprs(sortInfo.getOutputSmap(), analyzer);

  PlanNode sortNode = usePartialSort
      ? SortNode.createPartialSortNode(
          ctx_.getNextNodeId(), inputFragment.getPlanRoot(), sortInfo)
      : SortNode.createTotalSortNode(
          ctx_.getNextNodeId(), inputFragment.getPlanRoot(), sortInfo, 0);
  sortNode.init(analyzer);
  inputFragment.setPlanRoot(sortNode);
}
}