blob: e6102d8563624507b37cd5e3939a2a8dffbbbe4c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.workflow.lite;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.workflow.WorkflowException;
import org.jdom.Element;
import org.jdom.JDOMException;
public class LiteWorkflowValidator {
// Class-wide logger; declared final so the shared reference can never be reassigned.
private static final XLog LOG = XLog.getLog(LiteWorkflowValidator.class);
/**
 * Entry point of the validation.
 *
 * The basic validation always runs from the start node. The fork/join validation is only performed on request,
 * and only after making sure that the workflow contains the same number of fork and join nodes.
 *
 * @param app The WorkflowApp to validate
 * @param validateForkJoin true if the fork/join pairing has to be validated as well
 * @throws WorkflowException If there is no start node (E0700), if the number of forks and joins differ (E0730),
 * or if any of the recursive validations fails
 */
public void validateWorkflow(LiteWorkflowApp app, boolean validateForkJoin) throws WorkflowException {
    final NodeDef start = app.getNode(StartNodeDef.START);
    if (start == null) {
        // shouldn't happen, but just in case...
        throw new WorkflowException(ErrorCode.E0700, "no start node");
    }

    final ForkJoinCount counters = new ForkJoinCount();
    final Deque<String> basicPath = new ArrayDeque<>();
    final Set<NodeDef> alreadyChecked = new HashSet<>();
    performBasicValidation(app, start, basicPath, alreadyChecked, counters);

    if (!validateForkJoin) {
        return;
    }

    // pairing forks with joins is pointless when their numbers don't even match
    if (counters.forks != counters.joins) {
        throw new WorkflowException(ErrorCode.E0730);
    }

    final Deque<String> forkJoinPath = new ArrayDeque<>();
    final Map<String, String> joinToFork = new HashMap<>();
    final Map<String, Optional<String>> decisionParents = new HashMap<>();
    final Set<NodeDef> visited = new HashSet<>();
    validateForkJoin(app, start, null, null, true, forkJoinPath, joinToFork, decisionParents, visited);
}
/**
 * Basic recursive validation of the workflow:
 * - it is acyclic, no loops
 * - names of the actions follow a specific pattern
 * - all nodes have valid transitions
 * - it only has supported action nodes
 * - there is no node that points to itself
 * - counts fork/join nodes
 *
 * @param app The WorkflowApp
 * @param node Current node we're checking
 * @param path The names of the nodes that we've visited so far in this call chain
 * @param checkedNodes The set of nodes that we've already checked. For example, if it's a decision node, then we
 * don't have to re-walk the entire path because it has been done before on a separate path
 * @param forkJoinCount Number of fork and join nodes
 * @throws WorkflowException If any of the constraints described above is violated
 */
private void performBasicValidation(LiteWorkflowApp app, NodeDef node, Deque<String> path, Set<NodeDef> checkedNodes,
        ForkJoinCount forkJoinCount) throws WorkflowException {
    final String nodeName = node.getName();

    checkActionName(node);
    if (node instanceof ActionNodeDef) {
        checkActionNode(node);
    } else if (node instanceof ForkNodeDef) {
        forkJoinCount.forks++;
    } else if (node instanceof JoinNodeDef) {
        forkJoinCount.joins++;
    }

    // Throws if this node is already on the current call chain; otherwise the node becomes the new tail of the path
    checkCycle(path, nodeName);
    path.addLast(nodeName);

    // Get all transitions and walk the workflow recursively
    for (final String t : node.getTransitions()) {
        final NodeDef transitionNode = app.getNode(t);
        if (transitionNode == null) {
            throw new WorkflowException(ErrorCode.E0708, nodeName, t);
        }
        if (!checkedNodes.contains(transitionNode)) {
            performBasicValidation(app, transitionNode, path, checkedNodes, forkJoinCount);
            // Only mark the node as checked once its whole sub-graph has been walked: a transition that leads back
            // to a node still on the path must reach checkCycle() instead of being skipped here
            checkedNodes.add(transitionNode);
        }
    }

    // Every recursive call that returned normally has already popped its own entry and checkCycle() guarantees the
    // name was not on the path before, so this node is the tail: pop it directly instead of searching the deque
    path.removeLast();
}
/**
 * This method recursively validates two things:
 * - fork/join nodes are properly paired
 * - there are no multiple "okTo" paths to a given node
 *
 * Important: this method assumes that the workflow is acyclic - therefore this must run after performBasicValidation()
 *
 * @param app The WorkflowApp
 * @param node Current node we're checking
 * @param currentFork Current fork node (null if we are not under a fork path)
 * @param topDecisionParent The top (eldest) decision node along the path to this node, or null if there isn't one
 * @param okPath false if node (or an ancestor of node) was reached via an "error to" transition
 * @param path The names of the nodes on the current call chain, in visiting order
 * @param forkJoins Map that contains the fork-join node pairs found so far (key: join name, value: fork name)
 * @param nodeAndDecisionParents Map that contains a mapping of nodes and their eldest decision node
 * @param visitedNodes contains the nodes that have been already visited & validated (except Join/End nodes)
 * @throws WorkflowException If any of the constraints described above is violated
 */
private void validateForkJoin(LiteWorkflowApp app,
        NodeDef node,
        NodeDef currentFork,
        String topDecisionParent,
        boolean okPath,
        Deque<String> path,
        Map<String, String> forkJoins,
        Map<String, Optional<String>> nodeAndDecisionParents,
        Set<NodeDef> visitedNodes) throws WorkflowException {
    final String nodeName = node.getName();

    path.addLast(nodeName);

    /* If we're walking an "okTo" path and the nodes are not Kill/Join/End, we have to make sure that only a single
     * "okTo" path exists to the current node.
     *
     * The "topDecisionParent" represents the eldest decision in the chain that we've gone through. For example, let's
     * assume that D1, D2, D3 are decision nodes and A is an action node.
     *
     * D1-->D2-->D3---> ... (rest of the WF)
     * |    |    |
     * |    |    |
     * |    |    +----> +---+
     * |    +---------> | A |
     * +--------------> +---+
     *
     * In this case, there are three "okTo" paths to "A" but it's still a valid workflow because the eldest decision node
     * is D1 and during every run, there is only one possible execution path that leads to A (D1->A, D1->D2->A or
     * D1->D2->D3->A). In the code, if we encounter a decision node and we already have one, we don't update it. If it's
     * null then we set it to the current decision node we're under.
     *
     * If the "current" and "top" parents are null, it means that we reached the node from two separate "okTo" paths,
     * which is not acceptable.
     *
     * Also, if we have two distinct top decision parents it means that the node is reachable from two decision paths
     * which are not "chained" (like in the example).
     *
     * It's worth noting that the last two examples can only occur in case of fork-join when we start to execute at least
     * two separate paths in parallel. Without fork-join, multiple parents or two null parents would mean that there is a
     * loop in the workflow but that should not happen since it has been validated.
     */
    if (okPath && !(node instanceof KillNodeDef) && !(node instanceof JoinNodeDef) && !(node instanceof EndNodeDef)) {
        // using Optional here so we can distinguish between "non-visited" and "visited - no parent" state.
        final Optional<String> decisionParentOpt = nodeAndDecisionParents.get(nodeName);
        if (decisionParentOpt == null) {
            nodeAndDecisionParents.put(nodeName, Optional.ofNullable(topDecisionParent));
        } else {
            final String decisionParent = decisionParentOpt.orElse(null);
            if ((decisionParent == null && topDecisionParent == null) || !Objects.equals(decisionParent, topDecisionParent)) {
                throw new WorkflowException(ErrorCode.E0743, nodeName);
            }
        }
    }

    /* Memoization part: don't re-walk paths that have been visited already. This prevents
     * exponential runtime in specific cases.
     *
     * There are three edge-cases that we have to keep in mind:
     * 1. This part of the code cannot be above the "okTo" verification part. Otherwise we would
     *    accept WFs where multiple "ok" paths lead to the same node.
     *
     * 2. We don't store Join nodes. Firstly, we don't recurse from Join nodes anyway.
     *    Also, it's necessary to reach fork-join mapping verification below,
     *    so that we can throw errors "E0742" or "E0758" if needed.
     *
     * 3. We don't store End nodes. Similarly to Join, no recursion occurs after End. Plus, we
     *    could miss the erroneous condition "E0737" if we previously arrived at End from a valid path.
     */
    if (visitedNodes.contains(node)) {
        LOG.debug("Skipping node because it's been validated: " + nodeName);
        // The workflow is acyclic, so the entry pushed above is the tail of the path
        path.removeLast();
        return;
    } else {
        if (node instanceof JoinNodeDef || node instanceof EndNodeDef) {
            LOG.debug("Not storing node because it's a Join or End: " + nodeName);
        } else {
            visitedNodes.add(node);
            LOG.debug("Storing node as visited: " + nodeName);
        }
    }

    /* Fork-Join validation logic:
     *
     * At each Fork node, we recurse to every possible paths, changing the "currentFork" variable to the Fork node. We
     * stop walking as soon as we encounter a Join node. At the Join node, we update the forkJoin mapping, which maintains
     * the relationship between every fork-join pair (actually it's join->fork mapping). We check whether the join->fork
     * mapping already contains another Fork node, which means that the Join is reachable from at least two distinct
     * Fork nodes, so we terminate the validation.
     *
     * From the Join node, we don't recurse further. Therefore, all recursive calls return back to the point where we
     * called validateForkJoin() from the Fork node in question.
     *
     * At this point, we have to check how many different Join nodes we've found at each different paths. We collect them
     * to a set, then we make sure that we have only a single Join node for all Fork paths. Otherwise the workflow is
     * broken.
     *
     * If we have only a single Join, then we get the transition node from the Join and go on with the recursive
     * validation - this time we use the original "currentFork" variable that we have on the stack. With this approach,
     * nested Fork-Joins are handled correctly.
     */
    if (node instanceof ForkNodeDef) {
        final List<String> transitions = node.getTransitions();

        checkForkTransitions(app, transitions, node);

        for (String t : transitions) {
            NodeDef transition = app.getNode(t);
            validateForkJoin(app, transition, node, topDecisionParent, okPath, path, forkJoins, nodeAndDecisionParents,
                    visitedNodes);
        }

        // get the Join node for this ForkNode & validate it (we must have only one)
        final Set<String> joins = new HashSet<String>();
        collectJoins(app, forkJoins, nodeName, joins);
        checkJoins(joins, nodeName);

        // continue after the Join, back under the fork (if any) that encloses this one
        final List<String> joinTransitions = app.getNode(joins.iterator().next()).getTransitions();
        final NodeDef next = app.getNode(joinTransitions.get(0));
        validateForkJoin(app, next, currentFork, topDecisionParent, okPath, path, forkJoins, nodeAndDecisionParents,
                visitedNodes);
    } else if (node instanceof JoinNodeDef) {
        if (currentFork == null) {
            throw new WorkflowException(ErrorCode.E0742, nodeName);
        }

        // join --> fork mapping
        final String currentForkName = currentFork.getName();
        final String forkNode = forkJoins.get(nodeName);
        if (forkNode == null) {
            forkJoins.put(nodeName, currentForkName);
        } else if (!forkNode.equals(currentForkName)) {
            // report both parents by name; the mapping stores fork names, so the second one must be a name as well
            throw new WorkflowException(ErrorCode.E0758, nodeName, forkNode + "," + currentForkName);
        }
    } else if (node instanceof DecisionNodeDef) {
        final List<String> transitions = node.getTransitions();

        // see explanation above - if we already have a topDecisionParent, we don't update it
        String parentDecisionNode = topDecisionParent;
        if (parentDecisionNode == null) {
            parentDecisionNode = nodeName;
        }

        for (String t : transitions) {
            NodeDef transition = app.getNode(t);
            validateForkJoin(app, transition, currentFork, parentDecisionNode, okPath, path, forkJoins,
                    nodeAndDecisionParents, visitedNodes);
        }
    } else if (node instanceof KillNodeDef) {
        // no op
    } else if (node instanceof EndNodeDef) {
        // We can't end the WF if we're on a Fork path. From the "path" deque, we remove the last node (which
        // is the current "End") and look at last node again so we know where we came from
        if (currentFork != null) {
            path.removeLast();
            String previous = path.peekLast();
            throw new WorkflowException(ErrorCode.E0737, previous, nodeName);
        }
    } else if (node instanceof ActionNodeDef) {
        String transition = node.getTransitions().get(0); // "ok to" transition
        NodeDef okNode = app.getNode(transition);
        validateForkJoin(app, okNode, currentFork, topDecisionParent, okPath, path, forkJoins, nodeAndDecisionParents,
                visitedNodes);

        transition = node.getTransitions().get(1); // "error to" transition
        NodeDef errorNode = app.getNode(transition);
        // everything reached through the error transition is no longer on an "okTo" path
        validateForkJoin(app, errorNode, currentFork, topDecisionParent, false, path, forkJoins, nodeAndDecisionParents,
                visitedNodes);
    } else if (node instanceof StartNodeDef) {
        String transition = node.getTransitions().get(0); // start always has only 1 transition
        NodeDef tranNode = app.getNode(transition);
        validateForkJoin(app, tranNode, currentFork, topDecisionParent, okPath, path, forkJoins, nodeAndDecisionParents,
                visitedNodes);
    } else {
        throw new WorkflowException(ErrorCode.E0740, node.getClass());
    }

    // All recursive calls that returned normally popped their own entries, so this node is the tail again
    path.removeLast();
}
/**
 * Validates the name of the given node with {@link ParamChecker#validateActionName(String)}. The start node is
 * exempt from the check.
 *
 * @param node The node whose name is checked
 * @throws WorkflowException (E0724) If the name is rejected by the checker
 */
private void checkActionName(NodeDef node) throws WorkflowException {
    if (node instanceof StartNodeDef) {
        return;
    }
    try {
        ParamChecker.validateActionName(node.getName());
    } catch (IllegalArgumentException ex) {
        // The trailing exception argument keeps the original failure as the cause of the WorkflowException
        // (XException is documented to use a trailing Throwable parameter as the cause); the message is unchanged
        throw new WorkflowException(ErrorCode.E0724, ex.getMessage(), ex);
    }
}
/**
 * Makes sure that the action type of the given action node is supported. The action type is the name of the root
 * element of the node's configuration XML, and it is looked up in the {@link ActionService}.
 *
 * @param node The action node to check
 * @throws WorkflowException (E0723) If the action type is not known by the ActionService, (E0700) if the
 * configuration of the node cannot be parsed
 */
private void checkActionNode(NodeDef node) throws WorkflowException {
    try {
        Element action = XmlUtils.parseXml(node.getConf());
        ActionService actionService = Services.get().get(ActionService.class);
        boolean supportedAction = actionService.hasActionType(action.getName());
        if (!supportedAction) {
            throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName());
        }
    } catch (JDOMException ex) {
        // The trailing exception argument keeps the parser failure as the cause of the WorkflowException
        // (XException is documented to use a trailing Throwable parameter as the cause); the message is unchanged
        throw new WorkflowException(ErrorCode.E0700, "JDOMException: " + ex.getMessage(), ex);
    }
}
/**
 * Fails if the given node is already part of the path that is currently being walked.
 *
 * Note that on failure the node name is appended to the path, so that the path reported in the error ends with
 * the repeated node.
 *
 * @param path The names of the nodes on the current call chain
 * @param nodeName The name of the node we are about to enter
 * @throws WorkflowException (E0707) If the node is already on the path
 */
private void checkCycle(Deque<String> path, String nodeName) throws WorkflowException {
    final boolean alreadyOnPath = path.contains(nodeName);
    if (!alreadyOnPath) {
        return;
    }

    path.addLast(nodeName);
    final String renderedPath = String.join("->", path);
    throw new WorkflowException(ErrorCode.E0707, nodeName, renderedPath);
}
/**
 * Checks that a fork doesn't go to the same node more than once. Transitions that lead to a join or to a kill
 * node are allowed to repeat.
 *
 * @param app The WorkflowApp
 * @param transitionsList The transitions of the fork node
 * @param node The fork node itself
 * @throws WorkflowException (E0744) If a transition that may not repeat is listed more than once
 */
private void checkForkTransitions(LiteWorkflowApp app, List<String> transitionsList, NodeDef node) throws WorkflowException {
    for (final String target : transitionsList) {
        final NodeDef targetNode = app.getNode(target);

        // join and kill are ok to be referenced several times
        final boolean mayRepeat = targetNode instanceof JoinNodeDef || targetNode instanceof KillNodeDef;
        if (mayRepeat) {
            continue;
        }

        final int occurrences = CollectionUtils.cardinality(target, transitionsList);
        if (occurrences > 1) {
            throw new WorkflowException(ErrorCode.E0744, node.getName(), target);
        }
    }
}
/**
 * Collects the names of the join nodes that are mapped to the given fork.
 *
 * @param app The WorkflowApp, used to look up the join nodes by name
 * @param forkJoinPairs The join-to-fork mapping (key: join name, value: fork name)
 * @param nodeName The name of the fork node
 * @param joins The set that receives the names of the matching join nodes
 */
private void collectJoins(LiteWorkflowApp app, Map<String, String> forkJoinPairs, String nodeName, Set<String> joins) {
    forkJoinPairs.forEach((joinName, forkName) -> {
        if (forkName.equals(nodeName)) {
            joins.add(app.getNode(joinName).getName());
        }
    });
}
/**
 * Makes sure that exactly one join node has been found for a fork.
 *
 * @param joinNodes The names of the join nodes found for the fork
 * @param forkName The name of the fork node
 * @throws WorkflowException (E0733) If there is no join at all, (E0757) if there is more than one
 */
private void checkJoins(Set<String> joinNodes, String forkName) throws WorkflowException {
    final int joinCount = joinNodes.size();

    if (joinCount == 0) {
        throw new WorkflowException(ErrorCode.E0733, forkName);
    }

    if (joinCount > 1) {
        final String joinList = String.join(",", joinNodes);
        throw new WorkflowException(ErrorCode.E0757, forkName, joinList);
    }
}
// Tiny utility class where we keep track of how many fork and join nodes we have found
private class ForkJoinCount {
int forks = 0;
int joins = 0;
}
}