blob: 10542d360466a780ee1a135e97e1b8700316f526 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.optimizer.traversals;
import org.apache.flink.api.common.Plan;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.dag.TempMode;
import org.apache.flink.optimizer.plan.BinaryUnionPlanNode;
import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.IterationPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
import org.apache.flink.optimizer.plan.WorksetPlanNode;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.util.Visitor;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* This visitor traverses the selected execution plan and finalizes it:
*
* <ul>
* <li>The graph of nodes is double-linked (links from child to parent are inserted).
* <li>If unions join static and dynamic paths, the cache is marked as a memory consumer.
* <li>Relative memory fractions are assigned to all nodes.
* <li>All nodes are collected into a set.
* </ul>
*/
public class PlanFinalizer implements Visitor<PlanNode> {
private final Set<PlanNode> allNodes; // a set of all nodes in the optimizer plan
private final List<SourcePlanNode> sources; // all data source nodes in the optimizer plan
private final List<SinkPlanNode> sinks; // all data sink nodes in the optimizer plan
private final Deque<IterationPlanNode> stackOfIterationNodes;
private int memoryConsumerWeights; // a counter of all memory consumers
/** Creates a new plan finalizer. */
public PlanFinalizer() {
this.allNodes = new HashSet<PlanNode>();
this.sources = new ArrayList<SourcePlanNode>();
this.sinks = new ArrayList<SinkPlanNode>();
this.stackOfIterationNodes = new ArrayDeque<IterationPlanNode>();
}
public OptimizedPlan createFinalPlan(
List<SinkPlanNode> sinks, String jobName, Plan originalPlan) {
this.memoryConsumerWeights = 0;
// traverse the graph
for (SinkPlanNode node : sinks) {
node.accept(this);
}
// assign the memory to each node
if (this.memoryConsumerWeights > 0) {
for (PlanNode node : this.allNodes) {
// assign memory to the driver strategy of the node
final int consumerWeight = node.getMemoryConsumerWeight();
if (consumerWeight > 0) {
final double relativeMem = (double) consumerWeight / this.memoryConsumerWeights;
node.setRelativeMemoryPerSubtask(relativeMem);
if (Optimizer.LOG.isDebugEnabled()) {
Optimizer.LOG.debug(
"Assigned "
+ relativeMem
+ " of total memory to each subtask of "
+ node.getProgramOperator().getName()
+ ".");
}
}
// assign memory to the local and global strategies of the channels
for (Channel c : node.getInputs()) {
if (c.getLocalStrategy().dams()) {
final double relativeMem = 1.0 / this.memoryConsumerWeights;
c.setRelativeMemoryLocalStrategy(relativeMem);
if (Optimizer.LOG.isDebugEnabled()) {
Optimizer.LOG.debug(
"Assigned "
+ relativeMem
+ " of total memory to each local strategy "
+ "instance of "
+ c
+ ".");
}
}
if (c.getTempMode() != TempMode.NONE) {
final double relativeMem = 1.0 / this.memoryConsumerWeights;
c.setRelativeTempMemory(relativeMem);
if (Optimizer.LOG.isDebugEnabled()) {
Optimizer.LOG.debug(
"Assigned "
+ relativeMem
+ " of total memory to each instance of the temp "
+ "table for "
+ c
+ ".");
}
}
}
}
}
return new OptimizedPlan(this.sources, this.sinks, this.allNodes, jobName, originalPlan);
}
@Override
public boolean preVisit(PlanNode visitable) {
// if we come here again, prevent a further descend
if (!this.allNodes.add(visitable)) {
return false;
}
if (visitable instanceof SinkPlanNode) {
this.sinks.add((SinkPlanNode) visitable);
} else if (visitable instanceof SourcePlanNode) {
this.sources.add((SourcePlanNode) visitable);
} else if (visitable instanceof BinaryUnionPlanNode) {
BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable;
if (unionNode.unionsStaticAndDynamicPath()) {
unionNode.setDriverStrategy(DriverStrategy.UNION_WITH_CACHED);
}
} else if (visitable instanceof BulkPartialSolutionPlanNode) {
// tell the partial solution about the iteration node that contains it
final BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode) visitable;
final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
// sanity check!
if (!(iteration instanceof BulkIterationPlanNode)) {
throw new CompilerException(
"Bug: Error finalizing the plan. "
+ "Cannot associate the node for a partial solutions with its containing iteration.");
}
pspn.setContainingIterationNode((BulkIterationPlanNode) iteration);
} else if (visitable instanceof WorksetPlanNode) {
// tell the partial solution about the iteration node that contains it
final WorksetPlanNode wspn = (WorksetPlanNode) visitable;
final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
// sanity check!
if (!(iteration instanceof WorksetIterationPlanNode)) {
throw new CompilerException(
"Bug: Error finalizing the plan. "
+ "Cannot associate the node for a partial solutions with its containing iteration.");
}
wspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
} else if (visitable instanceof SolutionSetPlanNode) {
// tell the partial solution about the iteration node that contains it
final SolutionSetPlanNode sspn = (SolutionSetPlanNode) visitable;
final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
// sanity check!
if (!(iteration instanceof WorksetIterationPlanNode)) {
throw new CompilerException(
"Bug: Error finalizing the plan. "
+ "Cannot associate the node for a partial solutions with its containing iteration.");
}
sspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
}
// double-connect the connections. previously, only parents knew their children, because
// one child candidate could have been referenced by multiple parents.
for (Channel conn : visitable.getInputs()) {
conn.setTarget(visitable);
conn.getSource().addOutgoingChannel(conn);
}
for (Channel c : visitable.getBroadcastInputs()) {
c.setTarget(visitable);
c.getSource().addOutgoingChannel(c);
}
// count the memory consumption
this.memoryConsumerWeights += visitable.getMemoryConsumerWeight();
for (Channel c : visitable.getInputs()) {
if (c.getLocalStrategy().dams()) {
this.memoryConsumerWeights++;
}
if (c.getTempMode() != TempMode.NONE) {
this.memoryConsumerWeights++;
}
}
for (Channel c : visitable.getBroadcastInputs()) {
if (c.getLocalStrategy().dams()) {
this.memoryConsumerWeights++;
}
if (c.getTempMode() != TempMode.NONE) {
this.memoryConsumerWeights++;
}
}
// pass the visitor to the iteration's step function
if (visitable instanceof IterationPlanNode) {
// push the iteration node onto the stack
final IterationPlanNode iterNode = (IterationPlanNode) visitable;
this.stackOfIterationNodes.addLast(iterNode);
// recurse
((IterationPlanNode) visitable).acceptForStepFunction(this);
// pop the iteration node from the stack
this.stackOfIterationNodes.removeLast();
}
return true;
}
@Override
public void postVisit(PlanNode visitable) {}
}