/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.optimizer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.traversals.BinaryUnionReplacer;
import org.apache.flink.optimizer.traversals.BranchesVisitor;
import org.apache.flink.optimizer.traversals.GraphCreatingVisitor;
import org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor;
import org.apache.flink.optimizer.traversals.InterestingPropertyVisitor;
import org.apache.flink.optimizer.traversals.PlanFinalizer;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.Plan;
import org.apache.flink.optimizer.costs.CostEstimator;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.optimizer.dag.OptimizerNode;
import org.apache.flink.optimizer.dag.SinkJoiner;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.optimizer.plan.SinkJoinerPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.postpass.OptimizerPostPass;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The optimizer that takes the user-specified program plan and creates an optimized plan that contains
* an exact description of how the physical execution will take place. It first translates the user
* program into an internal optimizer representation and then chooses between different alternatives
* for shipping strategies and local strategies.
* <p>
* The basic principle is taken from optimizer work on systems such as Volcano/Cascades and Selinger/System-R/DB2. The
* optimizer walks from the sinks down, generating interesting properties, and ascends from the sources generating
* alternative plans, pruning against the interesting properties.
* <p>
* The optimizer also assigns the memory to the individual tasks. This is currently done in a very simple fashion: All
* sub-tasks that need memory (e.g. reduce or join) are given an equal share of memory.
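* <p>
* A minimal usage sketch (assuming {@code plan} is the user program as an {@link org.apache.flink.api.common.Plan}):
* <pre>{@code
* Optimizer optimizer = new Optimizer(new Configuration());
* OptimizedPlan optimizedPlan = optimizer.compile(plan);
* }</pre>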
*/
public class Optimizer {
// ------------------------------------------------------------------------
// Constants
// ------------------------------------------------------------------------
/**
* Compiler hint key for the input channel's shipping strategy. This String is a key to the operator's stub
* parameters. The corresponding value tells the compiler which shipping strategy to use for the input channel.
* If the operator has two input channels, the shipping strategy is applied to both input channels.
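* <p>
* For illustration, a sketch of setting this hint (assuming {@code operator} is an
* {@link org.apache.flink.api.common.operators.Operator} of the program plan, whose stub parameters
* are accessible as a {@link Configuration}):
* <pre>{@code
* operator.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_BROADCAST);
* }</pre>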
*/
public static final String HINT_SHIP_STRATEGY = "INPUT_SHIP_STRATEGY";
/**
* Compiler hint key for the <b>first</b> input channel's shipping strategy. This String is a key to
* the operator's stub parameters. The corresponding value tells the compiler which shipping strategy
* to use for the <b>first</b> input channel. Only applicable to operators with two inputs.
*/
public static final String HINT_SHIP_STRATEGY_FIRST_INPUT = "INPUT_LEFT_SHIP_STRATEGY";
/**
* Compiler hint key for the <b>second</b> input channel's shipping strategy. This String is a key to
* the operator's stub parameters. The corresponding value tells the compiler which shipping strategy
* to use for the <b>second</b> input channel. Only applicable to operators with two inputs.
*/
public static final String HINT_SHIP_STRATEGY_SECOND_INPUT = "INPUT_RIGHT_SHIP_STRATEGY";
/**
* Value for the shipping strategy compiler hint that enforces a <b>Forward</b> strategy on the
* input channel, i.e. no redistribution of any kind.
*
* @see #HINT_SHIP_STRATEGY
* @see #HINT_SHIP_STRATEGY_FIRST_INPUT
* @see #HINT_SHIP_STRATEGY_SECOND_INPUT
*/
public static final String HINT_SHIP_STRATEGY_FORWARD = "SHIP_FORWARD";
/**
* Value for the shipping strategy compiler hint that enforces a random repartition strategy.
*
* @see #HINT_SHIP_STRATEGY
* @see #HINT_SHIP_STRATEGY_FIRST_INPUT
* @see #HINT_SHIP_STRATEGY_SECOND_INPUT
*/
public static final String HINT_SHIP_STRATEGY_REPARTITION = "SHIP_REPARTITION";
/**
* Value for the shipping strategy compiler hint that enforces a hash-partition strategy.
*
* @see #HINT_SHIP_STRATEGY
* @see #HINT_SHIP_STRATEGY_FIRST_INPUT
* @see #HINT_SHIP_STRATEGY_SECOND_INPUT
*/
public static final String HINT_SHIP_STRATEGY_REPARTITION_HASH = "SHIP_REPARTITION_HASH";
/**
* Value for the shipping strategy compiler hint that enforces a range-partition strategy.
*
* @see #HINT_SHIP_STRATEGY
* @see #HINT_SHIP_STRATEGY_FIRST_INPUT
* @see #HINT_SHIP_STRATEGY_SECOND_INPUT
*/
public static final String HINT_SHIP_STRATEGY_REPARTITION_RANGE = "SHIP_REPARTITION_RANGE";
/**
* Value for the shipping strategy compiler hint that enforces a <b>broadcast</b> strategy on the
* input channel.
*
* @see #HINT_SHIP_STRATEGY
* @see #HINT_SHIP_STRATEGY_FIRST_INPUT
* @see #HINT_SHIP_STRATEGY_SECOND_INPUT
*/
public static final String HINT_SHIP_STRATEGY_BROADCAST = "SHIP_BROADCAST";
/**
* Compiler hint key for the operator's local strategy. This String is a key to the operator's stub
* parameters. The corresponding value tells the compiler which local strategy to use to process the
* data inside one partition.
* <p>
* This hint is ignored by operators that do not have a local strategy (such as <i>Map</i>), or by operators that
* have no choice in their local strategy (such as <i>Cross</i>).
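* <p>
* For illustration, a sort-based local strategy could be requested in the same way as the shipping strategy
* hints (assuming access to the operator's stub parameter {@link Configuration}):
* <pre>{@code
* operator.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_SORT);
* }</pre>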
*/
public static final String HINT_LOCAL_STRATEGY = "LOCAL_STRATEGY";
/**
* Value for the local strategy compiler hint that enforces a <b>sort based</b> local strategy.
* For example, a <i>Reduce</i> operator will sort the data to group it.
*
* @see #HINT_LOCAL_STRATEGY
*/
public static final String HINT_LOCAL_STRATEGY_SORT = "LOCAL_STRATEGY_SORT";
/**
* Value for the local strategy compiler hint that enforces a <b>sort based</b> local strategy.
* During sorting a combine method is repeatedly applied to reduce the data volume.
* For example, a <i>Reduce</i> operator will sort the data to group it.
*
* @see #HINT_LOCAL_STRATEGY
*/
public static final String HINT_LOCAL_STRATEGY_COMBINING_SORT = "LOCAL_STRATEGY_COMBINING_SORT";
/**
* Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy on both
* inputs with subsequent merging of inputs.
* For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs
* of matching keys.
*
* @see #HINT_LOCAL_STRATEGY
*/
public static final String HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE = "LOCAL_STRATEGY_SORT_BOTH_MERGE";
/**
* Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy.
* The first input is sorted; the second input is assumed to be already sorted. After sorting, both inputs are merged.
* For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs
* of matching keys.
*
* @see #HINT_LOCAL_STRATEGY
*/
public static final String HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE = "LOCAL_STRATEGY_SORT_FIRST_MERGE";
/**
* Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy.
* The second input is sorted; the first input is assumed to be already sorted. After sorting, both inputs are merged.
* For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs
* of matching keys.
*
* @see #HINT_LOCAL_STRATEGY
*/
public static final String HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE = "LOCAL_STRATEGY_SORT_SECOND_MERGE";
/**
* Value for the local strategy compiler hint that enforces a <b>merge based</b> local strategy.
* Both inputs are assumed to be sorted and are merged.
* For example, a <i>Match</i> or <i>CoGroup</i> operator will use a merge strategy to find pairs
* of matching keys.
*
* @see #HINT_LOCAL_STRATEGY
*/
public static final String HINT_LOCAL_STRATEGY_MERGE = "LOCAL_STRATEGY_MERGE";
/**
* Value for the local strategy compiler hint that enforces a <b>hash based</b> local strategy.
* For example, a <i>Match</i> operator will use a hybrid-hash-join strategy to find pairs of
* matching keys. The <b>first</b> input will be used to build the hash table, the second input will be
* used to probe the table.
*
* @see #HINT_LOCAL_STRATEGY
*/
public static final String HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST = "LOCAL_STRATEGY_HASH_BUILD_FIRST";
/**
* Value for the local strategy compiler hint that enforces a <b>hash based</b> local strategy.
* For example, a <i>Match</i> operator will use a hybrid-hash-join strategy to find pairs of
* matching keys. The <b>second</b> input will be used to build the hash table, the first input will be
* used to probe the table.
*
* @see #HINT_LOCAL_STRATEGY
*/
public static final String HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND = "LOCAL_STRATEGY_HASH_BUILD_SECOND";
/**
* Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
* A <i>Cross</i> operator will process the data of the <b>first</b> input in the outer-loop of the nested loops.
* Hence, the data of the first input is streamed through, while the data of the second input is stored on
* disk and repeatedly read.
*
* @see #HINT_LOCAL_STRATEGY
*/
public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST = "LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST";
/**
* Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
* A <i>Cross</i> operator will process the data of the <b>second</b> input in the outer-loop of the nested loops.
* Hence, the data of the second input is streamed through, while the data of the first input is stored on
* disk and repeatedly read.
*
* @see #HINT_LOCAL_STRATEGY
*/
public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND = "LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND";
/**
* Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
* A <i>Cross</i> operator will process the data of the <b>first</b> input in the outer-loop of the nested loops.
* Furthermore, the first input, being the outer side, is processed in blocks, and for each block, the second
* input, being the inner side, is read repeatedly from disk.
*
* @see #HINT_LOCAL_STRATEGY
*/
public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST = "LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST";
/**
* Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
* A <i>Cross</i> operator will process the data of the <b>second</b> input in the outer-loop of the nested loops.
* Furthermore, the second input, being the outer side, is processed in blocks, and for each block, the first
* input, being the inner side, is read repeatedly from disk.
*
* @see #HINT_LOCAL_STRATEGY
*/
public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND = "LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND";
/**
* The log handle that is used by the compiler to log messages.
*/
public static final Logger LOG = LoggerFactory.getLogger(Optimizer.class);
// ------------------------------------------------------------------------
// Members
// ------------------------------------------------------------------------
/**
* The statistics object used to obtain statistics, such as input sizes,
* for the cost estimation process.
*/
private final DataStatistics statistics;
/**
* The cost estimator used by the compiler.
*/
private final CostEstimator costEstimator;
/**
* The default parallelism for jobs compiled by this compiler.
*/
private int defaultParallelism;
// ------------------------------------------------------------------------
// Constructor & Setup
// ------------------------------------------------------------------------
/**
* Creates a new optimizer instance. The optimizer has no access to statistics about the
* inputs and can hence not determine any properties. It will perform all optimization with
* unknown sizes and hence use only the heuristic cost functions, which result in the selection
* of the most robust execution strategies.
*/
public Optimizer(Configuration config) {
this(null, new DefaultCostEstimator(), config);
}
/**
* Creates a new optimizer instance that uses the statistics object to determine properties about the input.
* Given those statistics, the optimizer can make better choices for the execution strategies.
*
* @param stats
* The statistics to be used to determine the input properties.
*/
public Optimizer(DataStatistics stats, Configuration config) {
this(stats, new DefaultCostEstimator(), config);
}
/**
* Creates a new optimizer instance. The optimizer has no access to statistics about the
* inputs and can hence not determine any properties. It will perform all optimization with
* unknown sizes and hence use only the heuristic cost functions, which result in the selection
* of the most robust execution strategies.
*
* The optimizer uses the given cost estimator to compute the costs of the individual operations.
*
* @param estimator The cost estimator to use to cost the individual operations.
*/
public Optimizer(CostEstimator estimator, Configuration config) {
this(null, estimator, config);
}
/**
* Creates a new optimizer instance that uses the statistics object to determine properties about the input.
* Given those statistics, the optimizer can make better choices for the execution strategies.
*
* The optimizer uses the given cost estimator to compute the costs of the individual operations.
*
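* <p>
* A construction sketch (the parallelism value is illustrative; the statistics and estimator instances are
* the defaults used elsewhere in this class):
* <pre>{@code
* Configuration config = new Configuration();
* config.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 4);
* Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
* }</pre>
*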
* @param stats
* The statistics to be used to determine the input properties.
* @param estimator
* The <tt>CostEstimator</tt> to use to cost the individual operations.
*/
public Optimizer(DataStatistics stats, CostEstimator estimator, Configuration config) {
this.statistics = stats;
this.costEstimator = estimator;
// determine the default parallelism
this.defaultParallelism = config.getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY,
ConfigConstants.DEFAULT_PARALLELISM);
if (defaultParallelism < 1) {
LOG.warn("Config value " + defaultParallelism + " for option "
+ ConfigConstants.DEFAULT_PARALLELISM + " is invalid. Ignoring and using a value of 1.");
this.defaultParallelism = 1;
}
}
// ------------------------------------------------------------------------
// Getters / Setters
// ------------------------------------------------------------------------
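/**
* Returns the default parallelism that this optimizer assigns to jobs which do not specify one themselves.
*
* @return The default parallelism.
*/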
public int getDefaultParallelism() {
return defaultParallelism;
}
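/**
* Sets the default parallelism that this optimizer assigns to jobs which do not specify one themselves.
*
* @param defaultParallelism The default parallelism to use; must be positive.
*/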
public void setDefaultParallelism(int defaultParallelism) {
if (defaultParallelism > 0) {
this.defaultParallelism = defaultParallelism;
} else {
throw new IllegalArgumentException("Default parallelism cannot be zero or negative.");
}
}
// ------------------------------------------------------------------------
// Compilation
// ------------------------------------------------------------------------
/**
* Translates the given program to an OptimizedPlan, where all nodes have their local strategy assigned
* and all channels have a shipping strategy assigned.
*
* For more details on the optimization phase, see the comments for
* {@link #compile(org.apache.flink.api.common.Plan, org.apache.flink.optimizer.postpass.OptimizerPostPass)}.
*
* @param program The program to be translated.
* @return The optimized plan.
*
* @throws CompilerException
* Thrown, if the plan is invalid or the optimizer encountered an inconsistent
* situation during the compilation process.
*/
public OptimizedPlan compile(Plan program) throws CompilerException {
final OptimizerPostPass postPasser = getPostPassFromPlan(program);
return compile(program, postPasser);
}
/**
* Translates the given program to an OptimizedPlan. The optimized plan describes for each operator
* which strategy to use (such as hash join versus sort-merge join), what data exchange method to use
* (local pipe forward, shuffle, broadcast), what exchange mode to use (pipelined, batch),
* where to cache intermediate results, etc.
*
* The optimization happens in multiple phases:
* <ol>
* <li>Create the optimizer DAG representation of the program: build the
* <tt>OptimizerNode</tt> representations of the operators (PACTs), assign parallelism, and compute size estimates.</li>
* <li>Compute interesting properties and auxiliary structures.</li>
* <li>Enumerate plan alternatives. This cannot be done in the same step as the interesting property computation (as
* opposed to the Database approaches), because we support plans that are not trees.</li>
* </ol>
*
* @param program The program to be translated.
* @param postPasser The function to be used for post passing the optimizer's plan and setting the
* data type specific serialization routines.
* @return The optimized plan.
*
* @throws CompilerException
* Thrown, if the plan is invalid or the optimizer encountered an inconsistent
* situation during the compilation process.
*/
private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws CompilerException {
if (program == null || postPasser == null) {
throw new NullPointerException();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Beginning compilation of program '" + program.getJobName() + '\'');
}
final ExecutionMode defaultDataExchangeMode = program.getExecutionConfig().getExecutionMode();
final int defaultParallelism = program.getDefaultParallelism() > 0 ?
program.getDefaultParallelism() : this.defaultParallelism;
// log the default settings
LOG.debug("Using a default parallelism of {}", defaultParallelism);
LOG.debug("Using default data exchange mode {}", defaultDataExchangeMode);
// the first step in the compilation is to create the optimizer plan representation
// this step does the following:
// 1) It creates an optimizer plan node for each operator
// 2) It connects them via channels
// 3) It looks for hints about local strategies and channel types and
// sets the types and strategies accordingly
// 4) It makes estimates about the data volume of the data sources and
// propagates those estimates through the plan
GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism, defaultDataExchangeMode);
program.accept(graphCreator);
// if we have a plan with multiple data sinks, add logical optimizer nodes that have two data sinks as children
// each, until we have only a single root node. This allows us to deal transparently with nodes that have
// multiple outputs.
OptimizerNode rootNode;
if (graphCreator.getSinks().size() == 1) {
rootNode = graphCreator.getSinks().get(0);
}
else if (graphCreator.getSinks().size() > 1) {
Iterator<DataSinkNode> iter = graphCreator.getSinks().iterator();
rootNode = iter.next();
while (iter.hasNext()) {
rootNode = new SinkJoiner(rootNode, iter.next());
}
}
else {
throw new CompilerException("Bug: The optimizer plan representation has no sinks.");
}
// traverse the DAG to assign IDs to the nodes and to compute the size estimates
// (using the data statistics, if available)
rootNode.accept(new IdAndEstimatesVisitor(this.statistics));
// We are dealing with operator DAGs, rather than operator trees.
// That requires us to deviate at some points from the classical DB optimizer algorithms.
// This step builds some auxiliary structures to help track branches and joins in the DAG
BranchesVisitor branchingVisitor = new BranchesVisitor();
rootNode.accept(branchingVisitor);
// Propagate the interesting properties top-down through the graph
InterestingPropertyVisitor propsVisitor = new InterestingPropertyVisitor(this.costEstimator);
rootNode.accept(propsVisitor);
// perform a sanity check: the root may not have any unclosed branches
if (rootNode.getOpenBranches() != null && rootNode.getOpenBranches().size() > 0) {
throw new CompilerException("Bug: Logic for branching plans (non-tree plans) has an error, and does not " +
"track the re-joining of branches correctly.");
}
// the final step is now to generate the actual plan alternatives
List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
if (bestPlan.size() != 1) {
throw new CompilerException("Error in compiler: more than one best plan was created!");
}
// check if the best plan's root is a data sink (single sink plan)
// if so, take it directly; if it is a sink joiner node, get its contained sinks
PlanNode bestPlanRoot = bestPlan.get(0);
List<SinkPlanNode> bestPlanSinks = new ArrayList<SinkPlanNode>(4);
if (bestPlanRoot instanceof SinkPlanNode) {
bestPlanSinks.add((SinkPlanNode) bestPlanRoot);
} else if (bestPlanRoot instanceof SinkJoinerPlanNode) {
((SinkJoinerPlanNode) bestPlanRoot).getDataSinks(bestPlanSinks);
}
// finalize the plan
OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);
plan.accept(new BinaryUnionReplacer());
// post pass the plan. this is the phase where the serialization and comparator code is set
postPasser.postPass(plan);
return plan;
}
/**
* This function performs only the first step of the compilation process: the creation of the optimizer
* representation of the plan. No estimations or enumerations of alternatives are done here.
*
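* <p>
* For example (assuming {@code plan} is a user program {@link Plan}):
* <pre>{@code
* List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(plan);
* }</pre>
*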
* @param program The plan to generate the optimizer representation for.
* @return The optimizer representation of the plan, as a collection of all data sinks
* from which the plan can be traversed.
*/
public static List<DataSinkNode> createPreOptimizedPlan(Plan program) {
GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(1, null);
program.accept(graphCreator);
return graphCreator.getSinks();
}
// ------------------------------------------------------------------------
// Miscellaneous
// ------------------------------------------------------------------------
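/**
* Loads and instantiates the {@link OptimizerPostPass} whose class name is configured in the given program plan.
*
* @param program The program whose post pass class name is read.
* @return An instance of the configured optimizer post pass.
* @throws CompilerException Thrown, if the class name is missing, or if the class cannot be loaded,
* is not a post pass, or cannot be instantiated.
*/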
private OptimizerPostPass getPostPassFromPlan(Plan program) {
final String className = program.getPostPassClassName();
if (className == null) {
throw new CompilerException("Optimizer Post Pass class description is null");
}
try {
Class<? extends OptimizerPostPass> clazz = Class.forName(className).asSubclass(OptimizerPostPass.class);
try {
return InstantiationUtil.instantiate(clazz, OptimizerPostPass.class);
} catch (RuntimeException rtex) {
// unwrap the source exception
if (rtex.getCause() != null) {
throw new CompilerException("Cannot instantiate optimizer post pass: " + rtex.getMessage(), rtex.getCause());
} else {
throw rtex;
}
}
}
catch (ClassNotFoundException cnfex) {
throw new CompilerException("Cannot load Optimizer post-pass class '" + className + "'.", cnfex);
}
catch (ClassCastException ccex) {
throw new CompilerException("Class '" + className + "' is not an optimizer post-pass.", ccex);
}
}
}