blob: 25cd44ace82c8b9253528a7e1e47704f83fe59e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.samza.util;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.values.PValue;
/** A DOT renderer for BEAM {@link Pipeline} DAG. */
public class PipelineDotRenderer implements Pipeline.PipelineVisitor {
public static String toDotString(Pipeline pipeline) {
final PipelineDotRenderer visitor = new PipelineDotRenderer();
visitor.begin();
pipeline.traverseTopologically(visitor);
visitor.end();
return visitor.dotBuilder.toString();
}
private final StringBuilder dotBuilder = new StringBuilder();
private final Map<TransformHierarchy.Node, Integer> nodeToId = new HashMap<>();
private final Map<PValue, Integer> valueToProducerNodeId = new HashMap<>();
private int indent;
private int nextNodeId;
private PipelineDotRenderer() {}
@Override
public void enterPipeline(Pipeline p) {}
@Override
public void leavePipeline(Pipeline pipeline) {}
@Override
public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
writeLine("subgraph cluster_%d {", nextNodeId++);
enterBlock();
writeLine("label = \"%s\"", escapeString(node.getFullName()));
return CompositeBehavior.ENTER_TRANSFORM;
}
@Override
public void leaveCompositeTransform(TransformHierarchy.Node node) {
exitBlock();
writeLine("}");
}
@Override
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
final int nodeId = nextNodeId++;
writeLine("%d [label=\"%s\"]", nodeId, escapeString(node.getTransform().getName()));
node.getOutputs()
.values()
.forEach(
x -> {
valueToProducerNodeId.put(x, nodeId);
});
node.getInputs()
.forEach(
(key, value) -> {
final int producerId = valueToProducerNodeId.get(value);
String style = "solid";
if (node.getTransform().getAdditionalInputs().containsKey(key)) {
style = "dashed";
}
writeLine(
"%d -> %d [style=%s label=\"%s\"]",
producerId, nodeId, style, escapeString(shortenTag(key.getId())));
});
}
@Override
public void visitValue(PValue value, TransformHierarchy.Node producer) {}
private void begin() {
writeLine("digraph {");
enterBlock();
writeLine("rankdir=LR");
}
private void end() {
exitBlock();
writeLine("}");
}
private void enterBlock() {
indent += 4;
}
private void exitBlock() {
indent -= 4;
}
private void writeLine(String format, Object... args) {
if (indent != 0) {
dotBuilder.append(String.format("%-" + indent + "s", ""));
}
dotBuilder.append(String.format(format, args));
dotBuilder.append("\n");
}
private static String escapeString(String x) {
return x.replace("\"", "\\\"");
}
private static String shortenTag(String tag) {
return tag.replaceFirst(".*:([a-zA-Z#0-9]+).*", "$1");
}
}