blob: fa46efd455d2049b67e969c74476103802b1d841 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.service.modules.flowgraph.pathfinder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
import org.apache.gobblin.service.modules.flow.FlowEdgeContext;
import org.apache.gobblin.service.modules.flowgraph.DataNode;
import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
/**
 * A {@link PathFinder} implementation that treats the {@link FlowGraph} as unweighted and computes the shortest
 * path with a variant of breadth-first search (BFS). Two properties of the input distinguish it from a
 * textbook BFS:
 * <ul>
 *   <li>the graph is a multi-graph, i.e. more than one edge may connect the same pair of nodes, and</li>
 *   <li>every edge is labelled; here the label corresponds to the set of input/output dataset descriptors
 *       accepted by the edge.</li>
 * </ul>
 * Because of this, the search keeps:
 * <ul>
 *   <li>a {@link HashMap} keyed by visited edge contexts, rather than the set of visited vertices a
 *       traditional BFS would keep, and</li>
 *   <li>for every visited edge, a {@link FlowEdgeContext} holding the dataset descriptor state associated with
 *       that particular visit of the edge.</li>
 * </ul>
 * Each distinct edge context is enqueued at most once, which is what lets the search mark edges as visited
 * accurately and terminate.
 */
@Alpha
@Slf4j
public class BFSPathFinder extends AbstractPathFinder {

  /**
   * Creates a path finder by forwarding both arguments to {@link AbstractPathFinder}.
   *
   * @param flowGraph the {@link FlowGraph} to search
   * @param flowSpec the {@link FlowSpec} handed to the superclass constructor
   * @throws ReflectiveOperationException if the superclass constructor throws it
   */
  public BFSPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) throws ReflectiveOperationException {
    super(flowGraph, flowSpec);
  }

  /**
   * Breadth-first search over {@link FlowEdge}s, starting at the source node and ending at {@code destNode}.
   * At every step the edges adjacent to the most recently reached node are appended to a FIFO queue, in the order
   * in which {@code getNextEdges} returns them. According to the original documentation, edges whose output
   * {@link DatasetDescriptor} matches the destination dataset descriptor come first in that order, so that dataset
   * transformations are performed closest to the source; that ordering is decided outside this class.
   *
   * <p>NOTE(review): this method takes no lock. If path computation has to be isolated from concurrent updates
   * of the flow graph, that has to be arranged by the caller - confirm.</p>
   *
   * @param destNode the node the path has to end at
   * @return the path as a list of {@link FlowEdgeContext}s; an empty list when the source already satisfies the
   *         destination; {@code null} when the source or destination node is inactive or no path exists
   */
  public List<FlowEdgeContext> findPathUnicast(DataNode destNode) {
    // Every search starts with a fresh child-to-parent table.
    this.pathMap = new HashMap<>();

    // Guard 1: nothing to compute unless both end points are active.
    boolean endpointsActive = srcNode.isActive() && destNode.isActive();
    if (!endpointsActive) {
      log.warn("Either source node {} or destination node {} is inactive; skipping path computation.",
          this.srcNode.getId(), destNode.getId());
      return null;
    }

    // Guard 2: source and destination coincide and the destination descriptor already contains the source
    // descriptor, so the answer is the empty path.
    boolean alreadyAtDestination = srcNode.equals(destNode) && destDatasetDescriptor.contains(srcDatasetDescriptor);
    if (alreadyAtDestination) {
      return new ArrayList<>(0);
    }

    // Seed the frontier with the edges leaving the source. A seed edge is recorded as its own parent.
    LinkedList<FlowEdgeContext> frontier =
        new LinkedList<>(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor));
    for (FlowEdgeContext seed : frontier) {
      this.pathMap.put(seed, seed);
    }

    // Main loop: take the oldest edge E off the frontier and test whether it completes the path. Otherwise ask
    // getNextEdges for the candidate edges E' leaving E's destination node. Per the original notes, an E' qualifies
    // when (1) its FlowTemplate is resolvable using the flowConfig and (2) its input dataset descriptor is
    // compatible with E's output dataset descriptor - presumably both checks live in getNextEdges; verify there.
    while (!frontier.isEmpty()) {
      FlowEdgeContext parent = frontier.removeFirst();
      DataNode reachedNode = this.flowGraph.getNode(parent.getEdge().getDest());
      DatasetDescriptor reachedDescriptor = parent.getOutputDatasetDescriptor();

      // Done as soon as the edge just taken satisfies the destination.
      if (isPathFound(reachedNode, destNode, reachedDescriptor, destDatasetDescriptor)) {
        return constructPath(parent);
      }

      for (FlowEdgeContext child : getNextEdges(reachedNode, reachedDescriptor, destDatasetDescriptor)) {
        // An edge context that already has a parent pointer has been queued before; skip it.
        if (this.pathMap.containsKey(child)) {
          continue;
        }
        frontier.addLast(child);
        this.pathMap.put(child, parent);
      }
    }

    // The frontier ran dry without reaching the destination.
    return null;
  }
}