/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.workflow.lite;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.util.StringSerializationUtil;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.ErrorCode;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
/**
 * Workflow application made of a name, the textual definition it was built from, and the
 * node definitions of the workflow, kept in insertion order and keyed by node name.
 * <p>
 * Nodes are added one by one with {@link #addNode(NodeDef)}; once an {@link EndNodeDef} has
 * been added the application is considered complete and rejects further nodes.
 * <p>
 * Instances are serialized through the Hadoop {@link Writable} contract, see
 * {@link #write(DataOutput)} and {@link #readFields(DataInput)}.
 */
public class LiteWorkflowApp implements Writable, WorkflowApp {
    /**
     * Serialization of strings longer than 65k changed. This flag marks which method to use during reading.
     */
    public static final int NEW_SERIALIZATION_METHOD_FLAG = -1;

    private String name;
    private String definition;
    private Map<String, NodeDef> nodesMap = new LinkedHashMap<String, NodeDef>();
    private boolean complete = false;

    /**
     * Creates an empty instance with no name, definition or nodes.
     * NOTE(review): presumably only meant to be populated via {@link #readFields(DataInput)} - confirm.
     */
    LiteWorkflowApp() {
    }

    /**
     * Creates a workflow application that initially contains only its start node.
     *
     * @param name name of the workflow application, validated with {@code ParamChecker.notEmpty}.
     * @param definition definition of the workflow application, validated with {@code ParamChecker.notEmpty}.
     * @param startNode start node, registered under the {@code StartNodeDef.START} key; must not be null.
     * @throws NullPointerException if {@code startNode} is null.
     */
    public LiteWorkflowApp(String name, String definition, StartNodeDef startNode) {
        this.name = ParamChecker.notEmpty(name, "name");
        this.definition = ParamChecker.notEmpty(definition, "definition");
        // Fail here rather than later in write(), which dereferences every stored node.
        Objects.requireNonNull(startNode, "startNode cannot be null");
        nodesMap.put(StartNodeDef.START, startNode);
    }

    /**
     * Compares this application with another object. Two applications are equal when they are of
     * exactly the same class and have the same name.
     *
     * @param other object to compare with, may be null.
     * @return {@code true} if {@code other} is of the same class and has an equal name.
     */
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (other == null || getClass() != other.getClass()) {
            return false;
        }
        // Objects.equals keeps the comparison null-safe for instances that have no name yet.
        return Objects.equals(getName(), ((LiteWorkflowApp) other).getName());
    }

    /**
     * Typed variant of {@link #equals(Object)}, kept so that existing callers keep linking.
     *
     * @param other application to compare with, may be null.
     * @return same result as {@link #equals(Object)}.
     */
    public boolean equals(LiteWorkflowApp other) {
        return equals((Object) other);
    }

    /**
     * Returns a hash code derived from the name only, matching {@link #equals(Object)}.
     *
     * @return hash code of the name, or {@code 0} if the name is null.
     */
    @Override
    public int hashCode() {
        return Objects.hashCode(name);
    }

    /**
     * Adds a node definition to this application. Adding an {@link EndNodeDef} marks the
     * application as complete.
     *
     * @param node node definition to add, must not be null.
     * @return this application, to allow chained calls.
     * @throws WorkflowException with {@code E0704} if the application is already complete,
     * {@code E0705} if a node with the same name is already present, or {@code E0706} if the
     * node lists its own name among its transitions.
     */
    public LiteWorkflowApp addNode(NodeDef node) throws WorkflowException {
        Objects.requireNonNull(node, "node cannot be null");
        if (complete) {
            throw new WorkflowException(ErrorCode.E0704, name);
        }
        String nodeName = node.getName();
        if (nodesMap.containsKey(nodeName)) {
            throw new WorkflowException(ErrorCode.E0705, nodeName);
        }
        if (node.getTransitions().contains(nodeName)) {
            throw new WorkflowException(ErrorCode.E0706, nodeName);
        }
        nodesMap.put(nodeName, node);
        if (node instanceof EndNodeDef) {
            complete = true;
        }
        return this;
    }

    /**
     * Returns the name of this workflow application.
     *
     * @return the name.
     */
    public String getName() {
        return name;
    }

    /**
     * Returns the definition of this workflow application.
     *
     * @return the definition.
     */
    public String getDefinition() {
        return definition;
    }

    /**
     * Returns the node definitions in the order they were added.
     *
     * @return unmodifiable view of the node definitions.
     */
    public Collection<NodeDef> getNodeDefs() {
        return Collections.unmodifiableCollection(nodesMap.values());
    }

    /**
     * Looks up a node definition by name.
     *
     * @param name name of the node.
     * @return the node definition, or {@code null} if no node is registered under that name.
     */
    public NodeDef getNode(String name) {
        return nodesMap.get(name);
    }

    /**
     * Placeholder for a structural validation of the workflow; currently does nothing.
     */
    public void validateWorkflowIntegrity() {
        //TODO traverse wf, ensure there are not cycles, no open paths, and one END
    }

    /**
     * Verifies that a node declares a given transition.
     *
     * @param name name of the node, validated with {@code ParamChecker.notEmpty}.
     * @param transition transition to look for, validated with {@code ParamChecker.notEmpty}.
     * @throws IllegalArgumentException if there is no node with the given name or the node does
     * not list the transition.
     */
    public void validateTransition(String name, String transition) {
        ParamChecker.notEmpty(name, "name");
        ParamChecker.notEmpty(transition, "transition");
        NodeDef node = getNode(name);
        if (node == null) {
            // Report an unknown node explicitly instead of failing with a NullPointerException.
            throw new IllegalArgumentException("invalid node [" + name + "]");
        }
        if (!node.getTransitions().contains(transition)) {
            throw new IllegalArgumentException("invalid transition");
        }
    }

    /**
     * Serializes this application: the name, the new-serialization marker, the definition, the
     * number of nodes and then, for every node, its class name followed by its own serialized form.
     *
     * @param dataOutput output to write to.
     * @throws IOException if writing fails.
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(name);
        // write out -1 as a marker to use StringSerializationUtil. Previously it was split to 20k long bits in a list.
        dataOutput.writeInt(NEW_SERIALIZATION_METHOD_FLAG);
        StringSerializationUtil.writeString(dataOutput, definition);
        dataOutput.writeInt(nodesMap.size());
        for (NodeDef n : getNodeDefs()) {
            dataOutput.writeUTF(n.getClass().getName());
            n.write(dataOutput);
        }
    }

    /**
     * Replaces the state of this application with the one read from the input. Both the current
     * format and the older format, where the definition was stored as a list of chunks, are accepted.
     *
     * @param dataInput input to read from.
     * @throws IOException if reading fails, a node class cannot be found, or a node is rejected
     * by {@link #addNode(NodeDef)}.
     * @throws ClassCastException if a serialized class name does not denote a {@link NodeDef}.
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        name = dataInput.readUTF();
        definition = readDefinition(dataInput);

        // Drop any previous content so the instance can be reused for reading; otherwise addNode()
        // would reject the incoming nodes as duplicates or because the app is already complete.
        nodesMap.clear();
        complete = false;

        int numNodes = dataInput.readInt();
        for (int x = 0; x < numNodes; x++) {
            try {
                String nodeDefClass = dataInput.readUTF();
                // Verify the type before instantiating, so that only NodeDef classes get constructed.
                Class<? extends NodeDef> nodeClass = Class.forName(nodeDefClass).asSubclass(NodeDef.class);
                NodeDef node = ReflectionUtils.newInstance(nodeClass, null);
                node.readFields(dataInput);
                addNode(node);
            }
            catch (WorkflowException | ClassNotFoundException ex) {
                throw new IOException(ex);
            }
        }
    }

    /**
     * Reads the definition, choosing the format based on the leading int.
     *
     * @param dataInput input positioned at the definition marker.
     * @return the definition string.
     * @throws IOException if reading fails.
     */
    private String readDefinition(DataInput dataInput) throws IOException {
        int definitionListFlag = dataInput.readInt();
        if (definitionListFlag > NEW_SERIALIZATION_METHOD_FLAG) {
            // non-negative number is the length of the array the String was broken into.
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < definitionListFlag; i++) {
                sb.append(dataInput.readUTF());
            }
            return sb.toString();
        }
        // negative number marking the usage of StringSerializationUtil
        return StringSerializationUtil.readString(dataInput);
    }
}