blob: 72b3cee058614a67fd75614ed7a6fb0fdce8679f [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.samza.execution;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* The JobGraph is the physical execution graph for a multi-stage Samza application.
* It contains the topology of jobs connected with source/sink/intermediate streams.
* High level APIs are transformed into JobGraph for planning, validation and execution.
* Source/sink streams are external streams while intermediate streams are created and managed by Samza.
* Note that intermediate streams are both the input and output of a JobNode in JobGraph.
* So the graph may have cycles and it's not a DAG.
/* package private */ class JobGraph implements ExecutionPlan {
private static final Logger log = LoggerFactory.getLogger(JobGraph.class);
private final Map<String, JobNode> nodes = new HashMap<>();
private final Map<String, StreamEdge> edges = new HashMap<>();
private final Set<StreamEdge> inputStreams = new HashSet<>();
private final Set<StreamEdge> outputStreams = new HashSet<>();
private final Set<StreamEdge> intermediateStreams = new HashSet<>();
private final Set<StreamEdge> sideInputStreams = new HashSet<>();
private final Set<TableDescriptor> tables = new HashSet<>();
private final Config config;
private final JobGraphJsonGenerator jsonGenerator;
private final JobNodeConfigurationGenerator configGenerator;
private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
* The JobGraph is only constructed by the {@link ExecutionPlanner}.
* @param config configuration for the application
* @param appDesc {@link ApplicationDescriptorImpl} describing the application
JobGraph(Config config, ApplicationDescriptorImpl appDesc) {
this.config = config;
this.appDesc = appDesc;
this.jsonGenerator = new JobGraphJsonGenerator();
this.configGenerator = new JobNodeConfigurationGenerator();
public List<JobConfig> getJobConfigs() {
String json = "";
try {
json = getPlanAsJson();
} catch (Exception e) {
log.warn("Failed to generate plan JSON", e);
final String planJson = json;
return getJobNodes().stream().map(n -> n.generateConfig(planJson)).collect(Collectors.toList());
public List<StreamSpec> getIntermediateStreams() {
return getIntermediateStreamEdges().stream()
public String getPlanAsJson() throws Exception {
return jsonGenerator.toJson(this);
* Returns the config for this application
* @return {@link ApplicationConfig}
public ApplicationConfig getApplicationConfig() {
return new ApplicationConfig(config);
* Add a source stream to a {@link JobNode}
* @param streamSpec input stream
* @param node the job node that consumes from the streamSpec
void addInputStream(StreamSpec streamSpec, JobNode node) {
StreamEdge edge = getOrCreateStreamEdge(streamSpec);
* Add an output stream to a {@link JobNode}
* @param streamSpec output stream
* @param node the job node that outputs to the output stream
void addOutputStream(StreamSpec streamSpec, JobNode node) {
StreamEdge edge = getOrCreateStreamEdge(streamSpec);
* Add an intermediate stream from source to target {@link JobNode}
* @param streamSpec intermediate stream
* @param from the source node
* @param to the target node
void addIntermediateStream(StreamSpec streamSpec, JobNode from, JobNode to) {
StreamEdge edge = getOrCreateStreamEdge(streamSpec, true);
void addTable(TableDescriptor tableDescriptor, JobNode node) {
* Add a side-input stream to graph
* @param streamSpec side-input stream
void addSideInputStream(StreamSpec streamSpec) {
StreamEdge edge = getOrCreateStreamEdge(streamSpec, false);
* Get the {@link JobNode}. Create one if it does not exist.
* @param jobName name of the job
* @param jobId id of the job
* @return {@link JobNode} created with {@code jobName} and {@code jobId}
JobNode getOrCreateJobNode(String jobName, String jobId) {
String nodeId = JobNode.createJobNameAndId(jobName, jobId);
return nodes.computeIfAbsent(nodeId, k -> new JobNode(jobName, jobId, config, appDesc, configGenerator));
* Get the {@link StreamEdge} for a {@link StreamSpec}. Create one if it does not exist.
* @param streamSpec spec of the StreamEdge
* @return stream edge
StreamEdge getOrCreateStreamEdge(StreamSpec streamSpec) {
return getOrCreateStreamEdge(streamSpec, false);
* Returns the {@link ApplicationDescriptorImpl} of this graph.
* @return Application descriptor implementation
ApplicationDescriptorImpl<? extends ApplicationDescriptor> getApplicationDescriptorImpl() {
return appDesc;
* Get the {@link StreamEdge} for {@code streamId}.
* @param streamId the streamId for the {@link StreamEdge}
* @return stream edge
StreamEdge getStreamEdge(String streamId) {
return edges.get(streamId);
* Returns the job nodes to be executed in the topological order
* @return unmodifiable list of {@link JobNode}
List<JobNode> getJobNodes() {
List<JobNode> sortedNodes = topologicalSort();
return Collections.unmodifiableList(sortedNodes);
* Returns the input streams in the graph
* @return unmodifiable set of {@link StreamEdge}
Set<StreamEdge> getInputStreams() {
return Collections.unmodifiableSet(inputStreams);
* Returns the side-input streams in the graph
* @return unmodifiable set of {@link StreamEdge}
Set<StreamEdge> getSideInputStreams() {
return Collections.unmodifiableSet(sideInputStreams);
* Returns the output streams in the graph
* @return unmodifiable set of {@link StreamEdge}
Set<StreamEdge> getOutputStreams() {
return Collections.unmodifiableSet(outputStreams);
* Returns the tables in the graph
* @return unmodifiable set of {@link TableDescriptor}
Set<TableDescriptor> getTables() {
return Collections.unmodifiableSet(tables);
* Returns the intermediate streams in the graph
* @return unmodifiable set of {@link StreamEdge}
Set<StreamEdge> getIntermediateStreamEdges() {
return Collections.unmodifiableSet(intermediateStreams);
* Validate the graph has the correct topology, meaning the input streams are coming from external streams,
* output streams are going to external streams, and the nodes are connected with intermediate streams.
* Also validate all the nodes are reachable from the input streams.
void validate() {
* Get the {@link StreamEdge} for a {@link StreamSpec}. Create one if it does not exist.
* @param streamSpec spec of the StreamEdge
* @param isIntermediate boolean flag indicating whether it's an intermediate stream
* @return stream edge
private StreamEdge getOrCreateStreamEdge(StreamSpec streamSpec, boolean isIntermediate) {
String streamId = streamSpec.getId();
StreamEdge edge = edges.get(streamId);
if (edge == null) {
boolean isBroadcast = appDesc.getIntermediateBroadcastStreamIds().contains(streamId);
edge = new StreamEdge(streamSpec, isIntermediate, isBroadcast, config);
edges.put(streamId, edge);
return edge;
* Validate the input streams should have indegree being 0 and outdegree greater than 0
private void validateInputStreams() {
inputStreams.forEach(edge -> {
if (!edge.getSourceNodes().isEmpty()) {
throw new IllegalArgumentException(
String.format("Source stream %s should not have producers.", edge.getName()));
if (edge.getTargetNodes().isEmpty()) {
throw new IllegalArgumentException(
String.format("Source stream %s should have consumers.", edge.getName()));
* Validate the output streams should have outdegree being 0 and indegree greater than 0
private void validateOutputStreams() {
outputStreams.forEach(edge -> {
if (!edge.getTargetNodes().isEmpty()) {
throw new IllegalArgumentException(
String.format("Sink stream %s should not have consumers", edge.getName()));
if (edge.getSourceNodes().isEmpty()) {
throw new IllegalArgumentException(
String.format("Sink stream %s should have producers", edge.getName()));
* Validate the internal streams should have both indegree and outdegree greater than 0
private void validateInternalStreams() {
Set<StreamEdge> internalEdges = new HashSet<>(edges.values());
internalEdges.forEach(edge -> {
if (edge.getSourceNodes().isEmpty() || edge.getTargetNodes().isEmpty()) {
throw new IllegalArgumentException(
String.format("Internal stream %s should have both producers and consumers", edge.getName()));
* Validate all nodes are reachable by input streams.
private void validateReachability() {
// validate all nodes are reachable from the input streams
final Set<JobNode> reachable = findReachable();
if (reachable.size() != nodes.size()) {
Set<JobNode> unreachable = new HashSet<>(nodes.values());
throw new IllegalArgumentException(String.format("Jobs %s cannot be reached from Sources.",
String.join(", ",;
* Find the reachable set of nodes using BFS.
* @return reachable set of {@link JobNode}
Set<JobNode> findReachable() {
Queue<JobNode> queue = new ArrayDeque<>();
Set<JobNode> visited = new HashSet<>();
inputStreams.forEach(input -> {
List<JobNode> next = input.getTargetNodes();
while (!queue.isEmpty()) {
JobNode node = queue.poll();
node.getOutEdges().values().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(target -> {
if (!visited.contains(target)) {
return visited;
* An variation of Kahn's algorithm of topological sorting.
* This algorithm also takes account of the simple loops in the graph
* @return topologically sorted {@link JobNode}s
List<JobNode> topologicalSort() {
Collection<JobNode> pnodes = nodes.values();
if (pnodes.size() == 1) {
return new ArrayList<>(pnodes);
Queue<JobNode> q = new ArrayDeque<>();
Map<String, Long> indegree = new HashMap<>();
Set<JobNode> visited = new HashSet<>();
pnodes.forEach(node -> {
String nid = node.getJobNameAndId();
//only count the degrees of intermediate streams
long degree = node.getInEdges().values().stream().filter(e -> !inputStreams.contains(e)).count();
indegree.put(nid, degree);
if (degree == 0L) {
// start from the nodes that has no intermediate input streams, so it only consumes from input streams
List<JobNode> sortedNodes = new ArrayList<>();
Set<JobNode> reachable = new HashSet<>();
while (sortedNodes.size() < pnodes.size()) {
// Here we use indegree-based approach to implment Kahn's algorithm for topological sort
// This approach will not change the graph itself during computation.
// The algorithm works as:
// 1. start with nodes with no incoming edges (in degree being 0) and inserted into the list
// 2. remove the edge from any node in the list to its connected nodes by changing the indegree of the connected nodes.
// 3. add any new nodes with ingree being 0
// 4. loop 1-3 until no more nodes with indegree 0
while (!q.isEmpty()) {
JobNode node = q.poll();
node.getOutEdges().values().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(n -> {
String nid = n.getJobNameAndId();
Long degree = indegree.get(nid) - 1;
indegree.put(nid, degree);
if (degree == 0L && !visited.contains(n)) {
if (sortedNodes.size() < pnodes.size()) {
// The remaining nodes have cycles
// use the following approach to break the cycles
// start from the nodes that are reachable from previous traverse
if (!reachable.isEmpty()) {
//find out the nodes with minimal input edge
long min = Long.MAX_VALUE;
JobNode minNode = null;
for (JobNode node : reachable) {
Long degree = indegree.get(node.getJobNameAndId());
if (degree < min) {
min = degree;
minNode = node;
// start from the node with minimal input edge again
} else {
// all the remaining nodes should be reachable from input streams
// start from input streams again to find the next node that hasn't been visited
JobNode nextNode = -> input.getTargetNodes().stream())
.filter(node -> !visited.contains(node))
return sortedNodes;