blob: 812f2f8714cae356fad5a3a59d2a8bdded85dae0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oodt.cas.workflow.repository;
//OODT imports
import org.apache.oodt.cas.metadata.Metadata;
import org.apache.oodt.cas.workflow.util.XmlStructFactory;
import org.apache.oodt.cas.workflow.examples.NoOpTask;
import org.apache.oodt.cas.workflow.structs.Workflow;
import org.apache.oodt.cas.workflow.structs.WorkflowTask;
import org.apache.oodt.cas.workflow.structs.WorkflowCondition;
import org.apache.oodt.cas.workflow.structs.WorkflowTaskConfiguration;
import org.apache.oodt.cas.workflow.structs.WorkflowTaskInstance;
import org.apache.oodt.cas.workflow.structs.exceptions.RepositoryException;
import org.apache.oodt.cas.workflow.structs.exceptions.WorkflowTaskInstanceException;
//JDK imports
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.Vector;
import java.util.Iterator;
import java.util.Arrays;
import java.net.URI;
import java.net.URISyntaxException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.FileFilter;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;
/**
* @author mattmann
* @version $Revsion$
*
* <p>
* A {@link WorkflowRepository} that loads events, {@link Workflow}s,
* {@link WorkflowTask}s and {@link WorkflowCondition}s from specialized xml
* files. The WorkflowRepository is given an initial set of seed directory uris,
* where it looks for the following files:
*
* <ul>
* <li>conditions.xml - defines workflow pre conditions</li>
* <li>tasks.xml - defines workflow tasks</li>
* <li>*.workflow.xml - individual workflow xml files specifying a single
* workflow</li>
* <li>events.xml - maps available workflows to event names</li>
* </ul>
*
* All of the WorkflowTasks, WorkflowConditions and Workflows themselves are
* cached in memory by their ids (which are typically URNs).
* </p>
*/
public class XMLWorkflowRepository implements WorkflowRepository {
/* the list of directory URIs where workflow xml files live */
private List workflowHomeUris = null;
/* our log stream */
private static Logger LOG = Logger.getLogger(XMLWorkflowRepository.class
.getName());
/* our task map */
private static HashMap taskMap = new HashMap();
/* our condition map */
private static HashMap conditionMap = new HashMap();
/* our workflow map */
private static HashMap workflowMap = new HashMap();
/* our event map */
private static HashMap eventMap = new HashMap();
private static FileFilter workflowXmlFilter = new FileFilter() {
public boolean accept(File pathname) {
return pathname.isFile()
&& pathname.toString().endsWith(".workflow.xml");
}
};
/**
* <p>
* Constructs a new XMLWorkflowRepository with the given parameter
* <code>uris</code>.
* </p>
*
* @param uris
* URIs pointing to directories that follow the XML workflow
* repository convention documented at the top of this class.
*/
public XMLWorkflowRepository(List uris) {
workflowHomeUris = uris;
// load the tasks and conditions
loadConditions(workflowHomeUris);
loadTasks(workflowHomeUris);
loadWorkflows(workflowHomeUris);
loadEvents(workflowHomeUris);
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#getRegisteredEvents()
*/
public List getRegisteredEvents() throws RepositoryException {
return Arrays.asList(eventMap.keySet().toArray());
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#getWorkflowTaskById(java.lang.String)
*/
public WorkflowTask getWorkflowTaskById(String taskId)
throws RepositoryException {
return (WorkflowTask) taskMap.get(taskId);
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#getWorkflowConditionById(java.lang.String)
*/
public WorkflowCondition getWorkflowConditionById(String conditionId)
throws RepositoryException {
return (WorkflowCondition) conditionMap.get(conditionId);
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#getWorkflowByName(java.lang.String)
*/
public Workflow getWorkflowByName(String workflowName)
throws RepositoryException {
for (Iterator i = workflowMap.keySet().iterator(); i.hasNext();) {
String workflowId = (String) i.next();
Workflow w = (Workflow) workflowMap.get(workflowId);
if (w.getName().equals(workflowName)) {
return w;
}
}
return null;
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#getWorkflowById(java.lang.String)
*/
public Workflow getWorkflowById(String workflowId)
throws RepositoryException {
return (Workflow) workflowMap.get(workflowId);
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#getWorkflows()
*/
public List getWorkflows() throws RepositoryException {
return Arrays.asList(workflowMap.values().toArray());
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#getTasksByWorkflowId(java.lang.String)
*/
public List getTasksByWorkflowId(String workflowId)
throws RepositoryException {
Workflow w = getWorkflowById(workflowId);
return w.getTasks();
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#getTasksByWorkflowName(java.lang.String)
*/
public List getTasksByWorkflowName(String workflowName)
throws RepositoryException {
Workflow w = getWorkflowByName(workflowName);
return w.getTasks();
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#getWorkflowsForEvent(java.lang.String)
*/
public List getWorkflowsForEvent(String eventName)
throws RepositoryException {
return (List) eventMap.get(eventName);
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#getConditionsByTaskName(java.lang.String)
*/
public List getConditionsByTaskName(String taskName)
throws RepositoryException {
for (Iterator i = taskMap.values().iterator(); i.hasNext();) {
WorkflowTask t = (WorkflowTask) i.next();
if (t.getTaskName().equals(taskName)) {
return t.getConditions();
}
}
return null;
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#getConditionsByTaskId(java.lang.String)
*/
public List getConditionsByTaskId(String taskId) throws RepositoryException {
WorkflowTask t = (WorkflowTask) taskMap.get(taskId);
if (t != null) {
return t.getConditions();
} else
return null;
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#getConfigurationByTaskId(java.lang.String)
*/
public WorkflowTaskConfiguration getConfigurationByTaskId(String taskId)
throws RepositoryException {
WorkflowTask task = (WorkflowTask) taskMap.get(taskId);
return task.getTaskConfig();
}
/* (non-Javadoc)
* @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#addTask(org.apache.oodt.cas.workflow.structs.WorkflowTask)
*/
@Override
public String addTask(WorkflowTask task) throws RepositoryException {
// check its conditions
if(task.getPreConditions() != null && task.getPreConditions().size() > 0){
for(WorkflowCondition cond: task.getPreConditions()){
if(!this.conditionMap.containsKey(cond.getConditionId())){
throw new RepositoryException("Reference in new task: ["+task.getTaskName()+"] to undefined pre condition ith id: ["+cond.getConditionId()+"]");
}
}
for(WorkflowCondition cond: task.getPostConditions()){
if(!this.conditionMap.containsKey(cond.getConditionId())){
throw new RepositoryException("Reference in new task: ["+task.getTaskName()+"] to undefined post condition ith id: ["+cond.getConditionId()+"]");
}
}
}
String taskId = task.getTaskId() != null ?
task.getTaskId():UUID.randomUUID().toString();
this.taskMap.put(taskId, task);
return taskId;
}
/* (non-Javadoc)
* @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#addWorkflow(org.apache.oodt.cas.workflow.structs.Workflow)
*/
@Override
public String addWorkflow(Workflow workflow) throws RepositoryException {
// first check to see that its tasks are all present
if(workflow.getTasks() == null || (workflow.getTasks() != null && workflow.getTasks().size() == 0)){
throw new RepositoryException("Attempt to define a new worklfow: ["+workflow.getName()+"] with no tasks.");
}
for(WorkflowTask task: (List<WorkflowTask>)workflow.getTasks()){
if(!this.taskMap.containsKey(task.getTaskId())){
throw new RepositoryException("Reference in new workflow: ["+workflow.getName()+"] to undefined task with id: ["+task.getTaskId()+"]");
}
// check its conditions
if(task.getConditions() != null && task.getConditions().size() > 0){
for(WorkflowCondition cond: (List<WorkflowCondition>)task.getConditions()){
if(!this.conditionMap.containsKey(cond.getConditionId())){
throw new RepositoryException("Reference in new workflow: ["+workflow.getName()+"] to undefined condition ith id: ["+cond.getConditionId()+"]");
}
}
}
}
String workflowId = workflow.getId();
if (workflowId == null
|| (workflowId != null && workflowId.equals(""))) {
// generate its ID
workflowId = UUID.randomUUID().toString();
workflow.setId(workflowId);
}
this.workflowMap.put(workflowId, workflow);
this.eventMap.put(workflowId, Collections.singletonList(workflow));
return workflowId;
}
/* (non-Javadoc)
* @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#getConditionsByWorkflowId(java.lang.String)
*/
@Override
public List<WorkflowCondition> getConditionsByWorkflowId(String workflowId)
throws RepositoryException {
if(!this.workflowMap.containsKey(workflowId)) throw new
RepositoryException("Attempt to obtain conditions for a workflow: " +
"["+workflowId+"] that does not exist!");
return ((Workflow)this.workflowMap.get(workflowId)).getConditions();
}
/* (non-Javadoc)
* @see org.apache.oodt.cas.workflow.repository.WorkflowRepository#getTaskById(java.lang.String)
*/
@Override
public WorkflowTask getTaskById(String taskId) throws RepositoryException {
return (WorkflowTask)this.taskMap.get(taskId);
}
/**
* @param args
*/
public static void main(String[] args) throws RepositoryException {
String usage = "XmlWorkflowRepository <uri 1>...<uri n>\n";
List uris = null;
if (args.length == 0) {
System.err.println(usage);
System.exit(1);
}
uris = new Vector(args.length);
for (int i = 0; i < args.length; i++) {
if (args[i] != null) {
uris.add(args[i]);
}
}
XMLWorkflowRepository repo = new XMLWorkflowRepository(uris);
List workflows = repo.getWorkflows();
if (workflows != null) {
for (Iterator i = workflows.iterator(); i.hasNext();) {
Workflow w = (Workflow) i.next();
System.out.println("Workflow: [id=" + w.getId() + ", name="
+ w.getName() + "]");
System.out.println("Tasks: ");
for (Iterator j = w.getTasks().iterator(); j.hasNext();) {
WorkflowTask task = (WorkflowTask) j.next();
System.out.println("Task: [class="
+ task.getTaskInstanceClassName() + ", id="
+ task.getTaskId() + ", name=" + task.getTaskName()
+ ", order=" + task.getOrder() + ",reqMetFields="
+ task.getRequiredMetFields() + "]");
System.out.println("Configuration: ");
for (Iterator k = task.getTaskConfig().getProperties()
.keySet().iterator(); k.hasNext();) {
String key = (String) k.next();
String value = (String) task.getTaskConfig()
.getProperties().get(key);
System.out.println("[name=" + key + ", value=" + value
+ "]");
}
System.out.println("Conditions: ");
for (Iterator k = task.getConditions().iterator(); k
.hasNext();) {
WorkflowCondition condition = (WorkflowCondition) k
.next();
System.out.println("Condition: ["
+ condition.getClass().getName() + ", id="
+ condition.getConditionId() + ", name="
+ condition.getConditionName() + ", timeout="
+ condition.getTimeoutSeconds()+ ", optional="
+ condition.isOptional()+", order="
+ condition.getOrder() + "]");
System.out.println("Configuration: ");
for (String cKeyName : (Set<String>) (Set<?>) condition
.getCondConfig().getProperties().keySet()) {
System.out.println("[name=" + cKeyName + ", value="
+ condition.getCondConfig().getProperty(cKeyName) + "]");
}
}
}
}
} else {
System.out.println("No workflows defined!");
}
}
private void loadTasks(List dirUris) {
if (dirUris != null && dirUris.size() > 0) {
for (Iterator i = dirUris.iterator(); i.hasNext();) {
String dirUri = (String) i.next();
try {
File workflowDir = new File(new URI(dirUri));
if (workflowDir.isDirectory()) {
String workflowDirStr = workflowDir.getAbsolutePath();
if (!workflowDirStr.endsWith("/")) {
workflowDirStr += "/";
}
Document taskRoot = getDocumentRoot(workflowDirStr
+ "tasks.xml");
Element taskElement = taskRoot.getDocumentElement();
NodeList taskElemList = taskElement
.getElementsByTagName("task");
if (taskElemList != null
&& taskElemList.getLength() > 0) {
for (int j = 0; j < taskElemList.getLength(); j++) {
Element taskElem = (Element) taskElemList
.item(j);
WorkflowTask task = XmlStructFactory
.getWorkflowTask(taskElem, conditionMap);
if (task != null) {
taskMap.put(task.getTaskId(), task);
}
}
}
}
} catch (URISyntaxException e) {
LOG
.log(
Level.WARNING,
"DirUri: "
+ dirUri
+ " is not a directory: skipping task loading for it.");
}
}
}
}
private void loadConditions(List dirUris) {
if (dirUris != null && dirUris.size() > 0) {
for (Iterator i = dirUris.iterator(); i.hasNext();) {
String dirUri = (String) i.next();
try {
File workflowDir = new File(new URI(dirUri));
if (workflowDir.isDirectory()) {
String workflowDirStr = workflowDir.getAbsolutePath();
if (!workflowDirStr.endsWith("/")) {
workflowDirStr += "/";
}
Document conditionRoot = getDocumentRoot(workflowDirStr
+ "conditions.xml");
Element conditionElement = conditionRoot
.getDocumentElement();
NodeList conditionElemList = conditionElement
.getElementsByTagName("condition");
if (conditionElemList != null
&& conditionElemList.getLength() > 0) {
for (int j = 0; j < conditionElemList.getLength(); j++) {
Element conditionElem = (Element) conditionElemList
.item(j);
WorkflowCondition condition = XmlStructFactory
.getWorkflowCondition(conditionElem);
if (condition != null) {
conditionMap.put(
condition.getConditionId(),
condition);
}
}
}
}
} catch (URISyntaxException e) {
LOG
.log(
Level.WARNING,
"DirUri: "
+ dirUri
+ " is not a directory: skipping condition loading for it.");
}
}
}
}
private void loadWorkflows(List dirUris) {
if (dirUris != null && dirUris.size() > 0) {
for (Iterator i = dirUris.iterator(); i.hasNext();) {
String dirUri = (String) i.next();
try {
File workflowDir = new File(new URI(dirUri));
if (workflowDir.isDirectory()) {
String workflowDirStr = workflowDir.getAbsolutePath();
if (!workflowDirStr.endsWith("/")) {
workflowDirStr += "/";
}
// get all the workflow xml files
File[] workflowFiles = workflowDir
.listFiles(workflowXmlFilter);
for (int j = 0; j < workflowFiles.length; j++) {
String workflowXmlFile = workflowFiles[j]
.getAbsolutePath();
Document workflowRoot = getDocumentRoot(workflowXmlFile);
String workflowId = workflowRoot
.getDocumentElement().getAttribute("id");
if (workflowMap.get(workflowId) == null) {
Workflow w = XmlStructFactory.getWorkflow(
workflowRoot.getDocumentElement(),
taskMap, conditionMap);
if(w.getConditions() != null && w.getConditions().size() > 0){
// add a virtual first task, with the conditions
w.getTasks().add(0, getGlobalWorkflowConditionsTask(w.getName(), w.getId(), w.getConditions()));
}
workflowMap.put(workflowId, w);
} else {
LOG
.log(
Level.FINE,
"Ignoring workflow file: "
+ workflowXmlFile
+ " when loading workflows, workflow id: "
+ workflowId
+ " already loaded");
}
}
}
} catch (URISyntaxException e) {
LOG
.log(
Level.WARNING,
"DirUri: "
+ dirUri
+ " is not a directory: skipping workflow loading for it.");
}
}
}
}
private void loadEvents(List dirUris) {
if (dirUris != null && dirUris.size() > 0) {
for (Iterator i = dirUris.iterator(); i.hasNext();) {
String dirUri = (String) i.next();
try {
File workflowDir = new File(new URI(dirUri));
if (workflowDir.isDirectory()) {
String workflowDirStr = workflowDir.getAbsolutePath();
if (!workflowDirStr.endsWith("/")) {
workflowDirStr += "/";
}
Document eventRoot = getDocumentRoot(workflowDirStr
+ "events.xml");
Element eventElement = eventRoot.getDocumentElement();
NodeList eventElemList = eventElement
.getElementsByTagName("event");
if (eventElemList != null
&& eventElemList.getLength() > 0) {
for (int j = 0; j < eventElemList.getLength(); j++) {
Element eventElem = (Element) eventElemList
.item(j);
String eventName = eventElem
.getAttribute("name");
Workflow w = null;
NodeList workflowNodeList = eventElem
.getElementsByTagName("workflow");
if (workflowNodeList != null
&& workflowNodeList.getLength() > 0) {
List workflowList = new Vector();
for (int k = 0; k < workflowNodeList
.getLength(); k++) {
Element workflowElement = (Element) workflowNodeList
.item(k);
w = (Workflow) workflowMap
.get(workflowElement
.getAttribute("id"));
if (w != null) {
workflowList.add(w);
}
}
eventMap.put(eventName, workflowList);
}
}
}
}
} catch (URISyntaxException e) {
LOG
.log(
Level.WARNING,
"DirUri: "
+ dirUri
+ " is not a directory: skipping event loading for it.");
}
}
}
}
private Document getDocumentRoot(String xmlFile) {
// open up the XML file
DocumentBuilderFactory factory = null;
DocumentBuilder parser = null;
Document document = null;
InputSource inputSource = null;
InputStream xmlInputStream = null;
try {
xmlInputStream = new File(xmlFile).toURL().openStream();
} catch (IOException e) {
LOG.log(Level.WARNING,
"IOException when getting input stream from [" + xmlFile
+ "]: returning null document root");
return null;
}
inputSource = new InputSource(xmlInputStream);
try {
factory = DocumentBuilderFactory.newInstance();
parser = factory.newDocumentBuilder();
document = parser.parse(inputSource);
} catch (Exception e) {
LOG.warning("Unable to parse xml file [" + xmlFile + "]."
+ "Reason is [" + e + "]");
return null;
}
return document;
}
private WorkflowTask getGlobalWorkflowConditionsTask(String workflowName, String workflowId, List<WorkflowCondition> conditions){
WorkflowTask task = new WorkflowTask();
task.setConditions(conditions);
task.setTaskConfig(new WorkflowTaskConfiguration());
task.setTaskId(workflowId+"-global-conditions-eval");
task.setTaskName(workflowName+"-global-conditions-eval");
task.setTaskInstanceClassName(NoOpTask.class.getName());
this.taskMap.put(task.getTaskId(), task);
return task;
}
}