blob: dc13e0804a99280402f8fc8cc987a927a631ed1e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.workflow.lite;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.XLogService;
import org.apache.oozie.util.StringSerializationUtil;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
 * Lightweight in-memory workflow instance for a {@link LiteWorkflowApp}: tracks the
 * active execution paths, per-node state and job variables, drives node transitions
 * via {@code signal()}, and supports Hadoop {@code Writable} (de)serialization of the
 * complete instance state.
 */
public class LiteWorkflowInstance implements Writable, WorkflowInstance {
private static final String TRANSITION_TO = "transition.to";
private XLog log = XLog.getLog(getClass());
// Declared final: these are constants and must never be reassigned (the original
// declarations were mutable statics).
private static final String PATH_SEPARATOR = "/";
private static final String ROOT = PATH_SEPARATOR;
private static final String TRANSITION_SEPARATOR = "#";
/**
 * Runtime state of one node occurrence on an execution path: the node's name and
 * whether the node's handler has already been entered for this path.
 */
private static class NodeInstance {
    // Set once at construction and never reassigned; declared final to enforce that.
    final String nodeName;
    // Flips to true after NodeHandler.enter() has run for this path (see signal()).
    boolean started = false;

    private NodeInstance(String nodeName) {
        this.nodeName = nodeName;
    }
}
/**
 * Per-signal view handed to a {@code NodeHandler}: pairs a node definition with the
 * execution path being signaled, and exposes the enclosing instance's variable maps.
 * The completion callbacks (failJob/killJob/completeJob) only record the handler's
 * verdict in {@link #status}; signal() inspects that field after the handler returns.
 */
private class Context implements NodeHandler.Context {
    private NodeDef nodeDef;
    private String executionPath;
    // Signal value the node is being signaled with; null when the context is built
    // for kill/fail of a running node (see terminateNodes/failNode/killNodes).
    private String exitState;
    // Handler's verdict for the job; RUNNING means "keep going".
    private Status status = Status.RUNNING;

    private Context(NodeDef nodeDef, String executionPath, String exitState) {
        this.nodeDef = nodeDef;
        this.executionPath = executionPath;
        this.exitState = exitState;
    }

    public NodeDef getNodeDef() {
        return nodeDef;
    }

    public String getExecutionPath() {
        return executionPath;
    }

    public String getParentExecutionPath(String executionPath) {
        return LiteWorkflowInstance.getParentPath(executionPath);
    }

    public String getSignalValue() {
        return exitState;
    }

    public String createExecutionPath(String name) {
        return LiteWorkflowInstance.createChildPath(executionPath, name);
    }

    public String createFullTransition(String executionPath, String transition) {
        return LiteWorkflowInstance.createFullTransition(executionPath, transition);
    }

    // Removes this context's path from the instance's path map and re-anchors the
    // context at the parent path. Throws if the path is no longer registered.
    public void deleteExecutionPath() {
        if (!executionPaths.containsKey(executionPath)) {
            throw new IllegalStateException();
        }
        executionPaths.remove(executionPath);
        executionPath = LiteWorkflowInstance.getParentPath(executionPath);
    }

    public void failJob() {
        status = Status.FAILED;
    }

    public void killJob() {
        status = Status.KILLED;
    }

    public void completeJob() {
        status = Status.SUCCEEDED;
    }

    // The remaining methods delegate straight to the enclosing workflow instance.
    @Override
    public Object getTransientVar(String name) {
        return LiteWorkflowInstance.this.getTransientVar(name);
    }

    @Override
    public String getVar(String name) {
        return LiteWorkflowInstance.this.getVar(name);
    }

    @Override
    public void setTransientVar(String name, Object value) {
        LiteWorkflowInstance.this.setTransientVar(name, value);
    }

    @Override
    public void setVar(String name, String value) {
        LiteWorkflowInstance.this.setVar(name, value);
    }

    @Override
    public LiteWorkflowInstance getProcessInstance() {
        return LiteWorkflowInstance.this;
    }
}
// Workflow application definition (nodes + transitions); replaced on readFields().
private LiteWorkflowApp def;
// Job configuration; round-tripped as XML bytes in write()/readFields().
private Configuration conf;
// Workflow job id; also the basis for equals()/hashCode().
private String instanceId;
// Current job status (PREP/RUNNING/SUSPENDED/SUCCEEDED/FAILED/KILLED).
private Status status;
// Active execution paths (path string -> node state); ROOT is seeded in start().
private Map<String, NodeInstance> executionPaths = new HashMap<String, NodeInstance>();
// Variables that survive serialization (includes the per-node TRANSITION_TO entries).
private Map<String, String> persistentVars = new HashMap<String, String>();
// In-memory-only variables; intentionally NOT serialized in write().
private Map<String, Object> transientVars = new HashMap<String, Object>();
protected LiteWorkflowInstance() {
log = XLog.getLog(getClass());
}
/**
 * Creates a new workflow instance in {@code PREP} status.
 *
 * @param def workflow application definition, must not be null
 * @param conf job configuration, must not be null
 * @param instanceId workflow job id, must not be null
 */
public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, String instanceId) {
    this();
    this.def = Objects.requireNonNull(def, "def cannot be null");
    this.instanceId = Objects.requireNonNull(instanceId, "instanceId cannot be null");
    this.conf = Objects.requireNonNull(conf, "conf cannot be null");
    // Bind user/group/app/job parameters into the logging context before first use.
    refreshLog();
    status = Status.PREP;
}
/**
 * Starts the workflow: moves it from {@code PREP} to {@code RUNNING}, registers the
 * start node on the root execution path and signals it.
 *
 * @return true if the workflow already reached an end state as a result of the signal
 * @throws WorkflowException with {@code E0719} if the job is not in {@code PREP}
 */
public synchronized boolean start() throws WorkflowException {
    if (Status.PREP != status) {
        throw new WorkflowException(ErrorCode.E0719);
    }
    log.debug(XLog.STD, "Starting job");
    status = Status.RUNNING;
    NodeInstance startNode = new NodeInstance(StartNodeDef.START);
    executionPaths.put(ROOT, startNode);
    return signal(ROOT, StartNodeDef.START);
}
//todo if suspended store signal and use when resuming
/**
 * Delivers a signal to the node registered at {@code executionPath}, entering the node
 * if it has not started yet and following its exit transitions otherwise. Fork exits
 * fan out to multiple paths, which are then signaled recursively with a synthetic
 * "::synch::" value. Terminal handler verdicts (via Context.failJob/killJob/completeJob)
 * move the whole job to an end state and terminate any still-running nodes.
 *
 * @param executionPath execution path to signal, must not be empty
 * @param signalValue signal value, must not be null
 * @return true if the workflow reached an end state
 * @throws WorkflowException with {@code E0716} if the job is not RUNNING, or rethrown
 *         from the node handler (job is marked FAILED first)
 */
public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException {
    ParamChecker.notEmpty(executionPath, "executionPath");
    Objects.requireNonNull(signalValue, "signalValue cannot be null");
    if (status != Status.RUNNING) {
        throw new WorkflowException(ErrorCode.E0716);
    }
    NodeInstance nodeJob = executionPaths.get(executionPath);
    log.debug(XLog.STD, "Signaling job execution path [{0}] signal value [{1}] for node [{2}]", executionPath,
            signalValue, (nodeJob == null ? null : nodeJob.nodeName));
    if (nodeJob == null) {
        // Unknown path: mark the job FAILED; the isEndState() guards below then skip
        // all further processing and fall through to the termination block.
        status = Status.FAILED;
        log.error("invalid execution path [{0}]", executionPath);
    }
    NodeDef nodeDef = null;
    if (!status.isEndState()) {
        nodeDef = def.getNode(nodeJob.nodeName);
        if (nodeDef == null) {
            status = Status.FAILED;
            log.error("invalid transition [{0}]", nodeJob.nodeName);
        }
    }
    if (!status.isEndState()) {
        NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
        // A node that was already started is being signaled back (async completion),
        // so it is exiting by definition; a fresh node exits only if enter() says so.
        boolean exiting = true;
        Context context = new Context(nodeDef, executionPath, signalValue);
        if (!nodeJob.started) {
            try {
                nodeHandler.loopDetection(context);
                exiting = nodeHandler.enter(context);
                nodeJob.started = true;
            }
            catch (WorkflowException ex) {
                // Entering the node failed: fail the job, kill everything still
                // running, then propagate the original error.
                status = Status.FAILED;
                List<String> killedNodes = terminateNodes(Status.KILLED);
                // NOTE(review): '> 1' suppresses the warning when exactly one node was
                // killed — looks like it may have been meant as '> 0'; confirm intended.
                if (killedNodes.size() > 1) {
                    log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes
                            .size());
                }
                throw ex;
            }
        }
        if (exiting) {
            List<String> pathsToStart = new ArrayList<String>();
            List<String> fullTransitions;
            try {
                fullTransitions = nodeHandler.multiExit(context);
                int last = fullTransitions.size() - 1;
                // Record the node's outgoing transition in persistentVars so it can be
                // reported later via getTransition().
                if (last >= 0) {
                    String transitionTo = getTransitionNode(fullTransitions.get(last));
                    if (nodeDef instanceof ForkNodeDef) {
                        transitionTo = "*"; // WF action cannot hold all transitions for a fork.
                        // transitions are hardcoded in the WF app.
                    }
                    persistentVars.put(nodeDef.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO,
                            transitionTo);
                }
            }
            catch (WorkflowException ex) {
                status = Status.FAILED;
                throw ex;
            }
            if (context.status == Status.KILLED) {
                status = Status.KILLED;
                log.debug(XLog.STD, "Completing job, kill node [{0}]", nodeJob.nodeName);
            }
            else if (context.status == Status.FAILED) {
                status = Status.FAILED;
                log.debug(XLog.STD, "Completing job, fail node [{0}]", nodeJob.nodeName);
            }
            else if (context.status == Status.SUCCEEDED) {
                status = Status.SUCCEEDED;
                log.debug(XLog.STD, "Completing job, end node [{0}]", nodeJob.nodeName);
            }
            else {
                for (String fullTransition : fullTransitions) {
                    //this is the whole trick for forking, we need the executionpath and the transition.
                    //in case of no forking, last element of executionpath is different from transition.
                    //in case of forking, they are the same
                    log.debug(XLog.STD, "Exiting node [{0}] with transition[{1}]", nodeJob.nodeName,
                            fullTransition);
                    String execPathFromTransition = getExecutionPath(fullTransition);
                    String transition = getTransitionNode(fullTransition);
                    def.validateTransition(nodeJob.nodeName, transition);
                    NodeInstance nodeJobInPath = executionPaths.get(execPathFromTransition);
                    if ((nodeJobInPath == null) || (!transition.equals(nodeJobInPath.nodeName))) {
                        // Guard against re-signaling a fork's parent path: only register
                        // the target path if it is new or points at a different node.
                        // If the WfJob is signaled with the parent execution path again
                        // the Fork node would execute again and replace the node state,
                        // so this check prevents that.
                        // Question: should we throw an error in this case instead?
                        executionPaths.put(execPathFromTransition, new NodeInstance(transition));
                        pathsToStart.add(execPathFromTransition);
                    }
                }
                // signal all new synch transitions (recursion drives control nodes forward)
                for (String pathToStart : pathsToStart) {
                    signal(pathToStart, "::synch::");
                }
            }
        }
    }
    if (status.isEndState()) {
        // Job is finished: terminate whatever is still running, with the matching verdict.
        if (status == Status.FAILED) {
            List<String> failedNodes = terminateNodes(status);
            log.warn(XLog.STD, "Workflow completed [{0}], failing [{1}] running nodes", status, failedNodes
                    .size());
        }
        else {
            List<String> killedNodes = terminateNodes(Status.KILLED);
            // NOTE(review): same '> 1' threshold as above — confirm intended.
            if (killedNodes.size() > 1) {
                log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes
                        .size());
            }
        }
    }
    return status.isEndState();
}
/**
 * Looks up the {@link NodeDef} registered at the given execution path.
 *
 * @param executionPath execution path
 * @return the node definition, or {@code null} if the path or its node is unknown
 *         (an error is logged in either case)
 */
public NodeDef getNodeDef(String executionPath) {
    NodeInstance instance = executionPaths.get(executionPath);
    if (instance == null) {
        log.error("invalid execution path [{0}]", executionPath);
        return null;
    }
    NodeDef nodeDef = def.getNode(instance.nodeName);
    if (nodeDef == null) {
        log.error("invalid transition [{0}]", instance.nodeName);
    }
    return nodeDef;
}
/**
 * Fails the workflow, failing the started node named {@code nodeName} and killing
 * any other running nodes.
 *
 * @param nodeName name of the node to fail
 * @throws WorkflowException with {@code E0718} if the workflow is already in an end
 *         state other than FAILED
 */
public synchronized void fail(String nodeName) throws WorkflowException {
    String failedNode = failNode(nodeName);
    if (failedNode != null) {
        log.warn(XLog.STD, "Workflow Failed. Failing node [{0}]", failedNode);
    }
    else {
        //TODO failed attempting to fail the action. EXCEPTION
    }
    // Note: status is checked AFTER attempting to fail the node.
    if (status.isEndState()) {
        if (status == Status.FAILED) {
            log.warn(XLog.STD, "An attempt was made to fail the workflow, which is already failed");
        } else {
            throw new WorkflowException(ErrorCode.E0718);
        }
    } else {
        List<String> killedNodes = killNodes();
        // NOTE(review): '> 1' suppresses the message when exactly one node is killed — confirm intended.
        if (killedNodes.size() > 1) {
            log.warn(XLog.STD, "Workflow Failed, killing [{0}] nodes", killedNodes.size());
        }
        status = Status.FAILED;
    }
}
/**
 * Kills the workflow: kills every running node, then marks the job {@code KILLED}.
 *
 * @throws WorkflowException with {@code E0718} if the workflow is already in an end state
 */
public synchronized void kill() throws WorkflowException {
    if (status.isEndState()) {
        throw new WorkflowException(ErrorCode.E0718);
    }
    log.debug(XLog.STD, "Killing job");
    int killedCount = killNodes().size();
    if (killedCount > 1) {
        log.warn(XLog.STD, "workflow killed, killing [{0}] nodes", killedCount);
    }
    status = Status.KILLED;
}
/**
 * Suspends a running workflow.
 *
 * @throws WorkflowException with {@code E0716} if the workflow is not {@code RUNNING}
 */
public synchronized void suspend() throws WorkflowException {
    if (Status.RUNNING != status) {
        throw new WorkflowException(ErrorCode.E0716);
    }
    log.debug(XLog.STD, "Suspending job");
    status = Status.SUSPENDED;
}
/** @return whether the workflow is currently suspended. */
public boolean isSuspended() {
    return Status.SUSPENDED == status;
}
/**
 * Resumes a suspended workflow.
 *
 * @throws WorkflowException with {@code E0717} if the workflow is not {@code SUSPENDED}
 */
public synchronized void resume() throws WorkflowException {
    if (Status.SUSPENDED != status) {
        throw new WorkflowException(ErrorCode.E0717);
    }
    log.debug(XLog.STD, "Resuming job");
    status = Status.RUNNING;
}
/**
 * Sets a persistent workflow variable; a {@code null} value removes the variable.
 *
 * @param name variable name
 * @param value variable value, or {@code null} to delete it
 */
public void setVar(String name, String value) {
    if (value == null) {
        persistentVars.remove(name);
    }
    else {
        persistentVars.put(name, value);
    }
}
@Override
public Map<String, String> getAllVars() {
    // NOTE(review): this exposes the internal mutable map directly — callers can
    // modify instance state through it; confirm whether that is relied upon before
    // switching to an unmodifiable view.
    return persistentVars;
}
@Override
public void setAllVars(Map<String, String> varMap) {
    // Merges (does not replace) the given variables into the persistent map.
    persistentVars.putAll(varMap);
}
// Returns the persistent variable value, or null if not set.
public String getVar(String name) {
    return persistentVars.get(name);
}
/**
 * Sets an in-memory (non-serialized) variable; a {@code null} value removes it.
 *
 * @param name variable name
 * @param value variable value, or {@code null} to delete it
 */
public void setTransientVar(String name, Object value) {
    if (value == null) {
        transientVars.remove(name);
    }
    else {
        transientVars.put(name, value);
    }
}
// True if a transient variable with the given name is set (even to null via put).
public boolean hasTransientVar(String name) {
    return transientVars.containsKey(name);
}
// Returns the transient variable value, or null if not set.
public Object getTransientVar(String name) {
    return transientVars.get(name);
}
// True once the workflow status is a terminal state (SUCCEEDED/FAILED/KILLED).
public boolean hasEnded() {
    return status.isEndState();
}
/**
 * Terminates every started non-control node, invoking the handler's kill or fail
 * callback according to {@code endStatus}. Handler errors are logged and skipped.
 *
 * @param endStatus {@code KILLED} to kill nodes, {@code FAILED} to fail them
 * @return names of the nodes whose termination was attempted without error
 */
private List<String> terminateNodes(Status endStatus) {
    List<String> terminated = new ArrayList<String>();
    for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
        NodeInstance instance = entry.getValue();
        if (!instance.started) {
            continue;
        }
        NodeDef nodeDef = def.getNode(instance.nodeName);
        if (nodeDef instanceof ControlNodeDef) {
            // Control nodes have no external work to cancel.
            continue;
        }
        NodeHandler handler = newInstance(nodeDef.getHandlerClass());
        try {
            if (endStatus == Status.KILLED) {
                handler.kill(new Context(nodeDef, entry.getKey(), null));
            }
            else if (endStatus == Status.FAILED) {
                handler.fail(new Context(nodeDef, entry.getKey(), null));
            }
            terminated.add(nodeDef.getName());
        }
        catch (Exception ex) {
            log.warn(XLog.STD, "Error Changing node state to [{0}] for Node [{1}]", endStatus.toString(),
                    nodeDef.getName(), ex);
        }
    }
    return terminated;
}
/**
 * Fails the first started node whose name matches {@code nodeName}.
 *
 * @param nodeName node name to fail
 * @return the failed node's name, or {@code null} if no match was found or the
 *         handler's fail callback threw (the error is logged)
 */
private String failNode(String nodeName) {
    for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
        NodeInstance instance = entry.getValue();
        if (!instance.started || !instance.nodeName.equals(nodeName)) {
            continue;
        }
        NodeDef nodeDef = def.getNode(instance.nodeName);
        NodeHandler handler = newInstance(nodeDef.getHandlerClass());
        String failed = null;
        try {
            handler.fail(new Context(nodeDef, entry.getKey(), null));
            failed = nodeDef.getName();
            instance.started = false;
        }
        catch (Exception ex) {
            log.warn(XLog.STD, "Error failing node [{0}]", nodeDef.getName(), ex);
        }
        // Only the first matching started node is processed.
        return failed;
    }
    return null;
}
/**
 * Kills every started node via its handler's kill callback; handler errors are
 * logged and skipped.
 *
 * @return names of the nodes that were killed without error
 */
private List<String> killNodes() {
    List<String> killed = new ArrayList<String>();
    for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
        NodeInstance instance = entry.getValue();
        if (!instance.started) {
            continue;
        }
        NodeDef nodeDef = def.getNode(instance.nodeName);
        NodeHandler handler = newInstance(nodeDef.getHandlerClass());
        try {
            handler.kill(new Context(nodeDef, entry.getKey(), null));
            killed.add(nodeDef.getName());
        }
        catch (Exception ex) {
            log.warn(XLog.STD, "Error killing node [{0}]", nodeDef.getName(), ex);
        }
    }
    return killed;
}
// Returns the workflow application definition backing this instance.
public LiteWorkflowApp getProcessDefinition() {
    return def;
}
/**
 * Builds a child execution path: {@code path + child + "/"}.
 */
private static String createChildPath(String path, String child) {
    return new StringBuilder().append(path).append(child).append(PATH_SEPARATOR).toString();
}
/**
 * Returns the parent of an execution path (including its trailing separator), or
 * {@code null} when the path is the root.
 */
private static String getParentPath(String path) {
    // Drop the trailing separator, then cut back to (and including) the previous one.
    String trimmed = path.substring(0, path.length() - 1);
    if (trimmed.length() == 0) {
        return null;
    }
    return trimmed.substring(0, trimmed.lastIndexOf(PATH_SEPARATOR) + 1);
}
/**
 * Encodes an execution path and a transition as a single string:
 * {@code executionPath + "#" + transition}.
 */
private static String createFullTransition(String executionPath, String transition) {
    return new StringBuilder().append(executionPath).append(TRANSITION_SEPARATOR).append(transition).toString();
}
/**
 * Extracts the execution-path part (before {@code "#"}) of a full transition.
 *
 * @throws IllegalArgumentException if the separator is missing
 */
private static String getExecutionPath(String fullTransition) {
    int sepIndex = fullTransition.indexOf(TRANSITION_SEPARATOR);
    if (sepIndex < 0) {
        throw new IllegalArgumentException("Invalid fullTransition");
    }
    return fullTransition.substring(0, sepIndex);
}
/**
 * Extracts the transition-node part (after {@code "#"}) of a full transition.
 *
 * @throws IllegalArgumentException if the separator is missing
 */
private static String getTransitionNode(String fullTransition) {
    int sepIndex = fullTransition.indexOf(TRANSITION_SEPARATOR);
    if (sepIndex < 0) {
        throw new IllegalArgumentException("Invalid fullTransition");
    }
    return fullTransition.substring(sepIndex + 1);
}
/**
 * Instantiates the given {@link NodeHandler} class via Hadoop's reflection utilities.
 *
 * @param handler handler class to instantiate
 * @return a new handler instance
 */
private NodeHandler newInstance(Class<? extends NodeHandler> handler) {
    // ReflectionUtils.newInstance(Class<T>, Configuration) is generic; the explicit
    // (NodeHandler) cast in the original was redundant.
    return ReflectionUtils.newInstance(handler, null);
}
/**
 * Pushes the job's user/group, app name, log token and job id into the thread's
 * XLog context and refreshes the instance logger so subsequent log lines carry
 * those parameters.
 */
private void refreshLog() {
    XLog.Info.get().setParameter(XLogService.USER, conf.get(OozieClient.USER_NAME));
    XLog.Info.get().setParameter(XLogService.GROUP, conf.get(OozieClient.GROUP_NAME));
    XLog.Info.get().setParameter(DagXLogInfoService.APP, def.getName());
    XLog.Info.get().setParameter(DagXLogInfoService.TOKEN, conf.get(OozieClient.LOG_TOKEN, ""));
    XLog.Info.get().setParameter(DagXLogInfoService.JOB, instanceId);
    log = XLog.getLog(getClass());
}
// Returns the current workflow status.
public Status getStatus() {
    return status;
}
// Overrides the workflow status directly; no state-transition validation is performed.
public void setStatus(Status status) {
    this.status = status;
}
/**
 * Serializes the instance: id, configuration (as an XML byte block), app definition,
 * status, execution paths and persistent variables. Transient variables are not
 * written. The field order must stay in lock-step with {@link #readFields(DataInput)}.
 */
@Override
public void write(DataOutput dOut) throws IOException {
    dOut.writeUTF(instanceId);
    // Hadoop Configuration only serializes itself as XML, so buffer it and length-prefix.
    ByteArrayOutputStream confBuffer = new ByteArrayOutputStream();
    conf.writeXml(confBuffer);
    confBuffer.close();
    byte[] confBytes = confBuffer.toByteArray();
    dOut.writeInt(confBytes.length);
    dOut.write(confBytes);
    def.write(dOut);
    dOut.writeUTF(status.toString());
    dOut.writeInt(executionPaths.size());
    for (Map.Entry<String, NodeInstance> pathEntry : executionPaths.entrySet()) {
        NodeInstance instance = pathEntry.getValue();
        dOut.writeUTF(pathEntry.getKey());
        dOut.writeUTF(instance.nodeName);
        dOut.writeBoolean(instance.started);
    }
    dOut.writeInt(persistentVars.size());
    for (Map.Entry<String, String> varEntry : persistentVars.entrySet()) {
        dOut.writeUTF(varEntry.getKey());
        StringSerializationUtil.writeString(dOut, varEntry.getValue());
    }
}
/**
 * Deserializes the instance state previously produced by {@link #write(DataOutput)};
 * the read order must mirror the write order exactly. Refreshes the logging context
 * once all fields are restored.
 */
@Override
public void readFields(DataInput dIn) throws IOException {
    instanceId = dIn.readUTF();
    // Configuration was written as a length-prefixed XML byte block.
    int confLength = dIn.readInt();
    byte[] confBytes = new byte[confLength];
    dIn.readFully(confBytes);
    conf = new XConfiguration(new ByteArrayInputStream(confBytes));
    def = new LiteWorkflowApp();
    def.readFields(dIn);
    status = Status.valueOf(dIn.readUTF());
    int pathCount = dIn.readInt();
    for (int i = 0; i < pathCount; i++) {
        String path = dIn.readUTF();
        NodeInstance instance = new NodeInstance(dIn.readUTF());
        instance.started = dIn.readBoolean();
        executionPaths.put(path, instance);
    }
    int varCount = dIn.readInt();
    for (int i = 0; i < varCount; i++) {
        String varName = dIn.readUTF();
        persistentVars.put(varName, StringSerializationUtil.readString(dIn));
    }
    refreshLog();
}
@Override
public Configuration getConf() {
    // Returns the job configuration as-is (not a defensive copy).
    return conf;
}
@Override
public WorkflowApp getApp() {
    return def;
}
@Override
public String getId() {
    return instanceId;
}
@Override
public String getTransition(String node) {
    // Reads the per-node transition recorded by signal() under "<node>#transition.to";
    // null if the node has not exited yet.
    return persistentVars.get(node + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO);
}
/**
 * Equality is based solely on the workflow instance id (consistent with
 * {@link #hashCode()}).
 */
@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    // NOTE(review): getClass().isInstance(o) keeps the original (subclass-asymmetric)
    // semantics; Objects.equals additionally guards against a null id on a partially
    // initialized instance, where the original would throw NPE.
    return (o != null) && getClass().isInstance(o)
            && Objects.equals(((WorkflowInstance) o).getId(), instanceId);
}
@Override
public int hashCode() {
    // NOTE(review): throws NPE if called before instanceId is set (e.g. on an instance
    // created via the no-arg constructor and not yet deserialized) — confirm acceptable.
    return instanceId.hashCode();
}
}