/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.airavata.workflow.core;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.impl.RabbitMQProcessPublisher;
import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
import org.apache.airavata.model.messaging.event.TaskIdentifier;
import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
import org.apache.airavata.model.util.ExperimentModelUtil;
import org.apache.airavata.model.workspace.experiment.ExecutionUnit;
import org.apache.airavata.model.workspace.experiment.Experiment;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
import org.apache.airavata.model.workspace.experiment.TaskState;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.ChildDataType;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.airavata.registry.cpi.RegistryModelType;
import org.apache.airavata.workflow.core.dag.edge.Edge;
import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode;
import org.apache.airavata.workflow.core.dag.nodes.NodeState;
import org.apache.airavata.workflow.core.dag.nodes.WorkflowInputNode;
import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
import org.apache.airavata.workflow.core.dag.nodes.WorkflowOutputNode;
import org.apache.airavata.workflow.core.dag.port.InPort;
import org.apache.airavata.workflow.core.dag.port.OutPort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* Package-Private class.
* Interprets a simple workflow (a DAG of workflow input, application and output nodes) for a
* single experiment: it parses the workflow, tracks nodes through waiting, ready, processing
* and complete collections, submits application nodes as tasks through RabbitMQ, and advances
* the DAG as task output and status change events arrive.
*/
class SimpleWorkflowInterpreter {
private static final Logger log = LoggerFactory.getLogger(SimpleWorkflowInterpreter.class);
private List<WorkflowInputNode> workflowInputNodes;
private Experiment experiment;
private String credentialToken;
private String gatewayName;
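// Node bookkeeping: nodes sit in waitingList until their inputs are satisfied, move to
// readyList when runnable, to processingQueue once their task has been submitted, and to
// completeList when the task output change event is received.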
private Map<String, WorkflowNode> readyList = new ConcurrentHashMap<String, WorkflowNode>();
private Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<String, WorkflowNode>();
private Map<String, ProcessContext> processingQueue = new ConcurrentHashMap<String, ProcessContext>();
private Map<String, ProcessContext> completeList = new HashMap<String, ProcessContext>();
private Registry registry;
private List<WorkflowOutputNode> completeWorkflowOutputs = new ArrayList<WorkflowOutputNode>();
private RabbitMQProcessPublisher publisher;
private RabbitMQStatusConsumer statusConsumer;
private String consumerId;
private boolean continueWorkflow = true;
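/**
* Creates an interpreter for a persisted experiment; the {@link Experiment} is loaded from the
* registry using the given experiment id.
*/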
public SimpleWorkflowInterpreter(String experimentId, String credentialToken, String gatewayName, RabbitMQProcessPublisher publisher) throws RegistryException {
this.gatewayName = gatewayName;
setExperiment(experimentId);
this.credentialToken = credentialToken;
this.publisher = publisher;
}
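/**
* Creates an interpreter for an {@link Experiment} instance that has already been loaded by the caller.
*/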
public SimpleWorkflowInterpreter(Experiment experiment, String credentialStoreToken, String gatewayName, RabbitMQProcessPublisher publisher) {
this.gatewayName = gatewayName;
this.experiment = experiment;
this.credentialToken = credentialStoreToken;
this.publisher = publisher;
}
/**
* Package-Private method.
* Parses the workflow of the experiment, feeds the experiment inputs into the workflow input
* nodes and starts processing the nodes that become ready.
* @throws Exception if the workflow cannot be parsed or no application node is ready to run
*/
void launchWorkflow() throws Exception {
WorkflowFactoryImpl wfFactory = WorkflowFactoryImpl.getInstance();
WorkflowParser workflowParser = wfFactory.getWorkflowParser(experiment.getExperimentID(), credentialToken);
log.debug("Initialized workflow parser");
setWorkflowInputNodes(workflowParser.parse());
log.debug("Parsed the workflow and got the workflow input nodes");
// process workflow input nodes
processWorkflowInputNodes(getWorkflowInputNodes());
if (readyList.isEmpty()) {
// no application node became runnable from the experiment inputs; report the inputs for debugging
StringBuilder sb = new StringBuilder();
for (WorkflowInputNode workflowInputNode : workflowInputNodes) {
if (sb.length() > 0) {
sb.append(", ");
}
sb.append(workflowInputNode.getInputObject().getName());
sb.append("=");
sb.append(workflowInputNode.getInputObject().getValue());
}
throw new AiravataException("No workflow application node is in ready state to run with experiment inputs: " + sb.toString());
}
processReadyList();
}
// TODO: try to remove the synchronization
/**
* Package-Private method.
* Processes every node in the ready list: workflow output nodes are completed directly from
* their input port values; application nodes are persisted as workflow node and task details
* and published to the process queue.
* @throws RegistryException
* @throws AiravataException
*/
void processReadyList() throws RegistryException, AiravataException {
// nothing is ready or running, but nodes are still waiting: the workflow cannot make progress
if (readyList.isEmpty() && processingQueue.isEmpty() && !waitingList.isEmpty()) {
throw new AiravataException("No workflow application node is in ready state to run");
}
for (WorkflowNode readyNode : readyList.values()) {
if (readyNode instanceof WorkflowOutputNode) {
WorkflowOutputNode wfOutputNode = (WorkflowOutputNode) readyNode;
wfOutputNode.getOutputObject().setValue(wfOutputNode.getInPort().getInputObject().getValue());
addToCompleteOutputNodeList(wfOutputNode);
continue;
}
WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(readyNode);
TaskDetails process = getProcess(workflowNodeDetails);
ProcessContext processContext = new ProcessContext(readyNode, workflowNodeDetails, process);
addToProcessingQueue(processContext);
publishToProcessQueue(process);
}
}
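/**
* Wraps the given task in a {@link ProcessSubmitEvent} carrying the credential store token and
* publishes it through the {@link RabbitMQProcessPublisher}.
*/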
private void publishToProcessQueue(TaskDetails process) throws AiravataException {
ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
processSubmitEvent.setCredentialToken(credentialToken);
processSubmitEvent.setTaskId(process.getTaskID());
MessageContext messageContext = new MessageContext(processSubmitEvent, MessageType.TASK, process.getTaskID(), null);
messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
publisher.publish(messageContext);
}
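/**
* Clones a {@link TaskDetails} from the given workflow node details, persists it in the
* registry and sets the generated task id on the returned task.
*/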
private TaskDetails getProcess(WorkflowNodeDetails wfNodeDetails) throws RegistryException {
// create workflow taskDetails from workflowNodeDetails
TaskDetails taskDetails = ExperimentModelUtil.cloneTaskFromWorkflowNodeDetails(getExperiment(), wfNodeDetails);
taskDetails.setTaskID(getRegistry()
.add(ChildDataType.TASK_DETAIL, taskDetails, wfNodeDetails.getNodeInstanceId()).toString());
return taskDetails;
}
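/**
* Creates and persists the {@link WorkflowNodeDetails} for the given node; for application
* nodes the application id and the node inputs are recorded as well.
*/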
private WorkflowNodeDetails createWorkflowNodeDetails(WorkflowNode readyNode) throws RegistryException {
WorkflowNodeDetails wfNodeDetails = ExperimentModelUtil.createWorkflowNode(readyNode.getId(), null);
ExecutionUnit executionUnit = ExecutionUnit.APPLICATION;
String executionData = null;
if (readyNode instanceof ApplicationNode) {
executionUnit = ExecutionUnit.APPLICATION;
executionData = ((ApplicationNode) readyNode).getApplicationId();
setupNodeDetailsInput(((ApplicationNode) readyNode), wfNodeDetails);
} else if (readyNode instanceof WorkflowInputNode) {
executionUnit = ExecutionUnit.INPUT;
} else if (readyNode instanceof WorkflowOutputNode) {
executionUnit = ExecutionUnit.OUTPUT;
}
wfNodeDetails.setExecutionUnit(executionUnit);
wfNodeDetails.setExecutionUnitData(executionData);
wfNodeDetails.setNodeInstanceId((String) getRegistry()
.add(ChildDataType.WORKFLOW_NODE_DETAIL, wfNodeDetails, getExperiment().getExperimentID()));
return wfNodeDetails;
}
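/**
* Copies the input objects of a ready application node to the workflow node details.
* @throws IllegalArgumentException if the application node is not in the ready state
*/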
private void setupNodeDetailsInput(ApplicationNode readyAppNode, WorkflowNodeDetails wfNodeDetails) {
if (readyAppNode.isReady()) {
for (InPort inPort : readyAppNode.getInputPorts()) {
wfNodeDetails.addToNodeInputs(inPort.getInputObject());
}
} else {
throw new IllegalArgumentException("Application node should be in ready state to set inputs to the " +
"workflow node details, nodeId = " + readyAppNode.getId());
}
}
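/**
* Pushes each experiment input value through the out edges of its workflow input node and
* queues the downstream nodes as ready or waiting depending on whether all of their inputs
* are satisfied.
*/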
private void processWorkflowInputNodes(List<WorkflowInputNode> wfInputNodes) {
for (WorkflowInputNode wfInputNode : wfInputNodes) {
if (wfInputNode.isReady()) {
log.debug("Workflow node : " + wfInputNode.getId() + " is ready to execute");
for (Edge edge : wfInputNode.getOutPort().getOutEdges()) {
edge.getToPort().getInputObject().setValue(wfInputNode.getInputObject().getValue());
if (edge.getToPort().getNode().isReady()) {
addToReadyQueue(edge.getToPort().getNode());
log.debug("Added workflow node : " + edge.getToPort().getNode().getId() + " to the readyQueue");
} else {
addToWaitingQueue(edge.getToPort().getNode());
log.debug("Added workflow node " + edge.getToPort().getNode().getId() + " to the waitingQueue");
}
}
}
}
}
public List<WorkflowInputNode> getWorkflowInputNodes() {
return workflowInputNodes;
}
public void setWorkflowInputNodes(List<WorkflowInputNode> workflowInputNodes) {
this.workflowInputNodes = workflowInputNodes;
}
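/**
* Lazily initializes and returns the default registry instance.
*/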
private Registry getRegistry() throws RegistryException {
if (registry == null) {
registry = RegistryFactory.getDefaultRegistry();
}
return registry;
}
public Experiment getExperiment() {
return experiment;
}
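/**
* Sets the given state on the workflow node details and persists the new status to the registry.
*/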
private void updateWorkflowNodeStatus(WorkflowNodeDetails wfNodeDetails, WorkflowNodeState state) throws RegistryException{
WorkflowNodeStatus status = ExperimentModelUtil.createWorkflowNodeStatus(state);
wfNodeDetails.setWorkflowNodeStatus(status);
getRegistry().update(RegistryModelType.WORKFLOW_NODE_STATUS, status, wfNodeDetails.getNodeInstanceId());
}
/**
* Package-Private method.
* Removes the workflow node from the waiting queue and adds it to the ready queue.
* @param workflowNode - Workflow Node
*/
synchronized void addToReadyQueue(WorkflowNode workflowNode) {
waitingList.remove(workflowNode.getId());
readyList.put(workflowNode.getId(), workflowNode);
}
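/**
* Adds the workflow node to the waiting queue until all of its inputs become available.
*/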
private void addToWaitingQueue(WorkflowNode workflowNode) {
waitingList.put(workflowNode.getId(), workflowNode);
}
/**
* First removes the node from the ready list and then adds the process context to the processing queue.
* Note that the underlying data structure of the processing queue is a Map keyed by task id.
* @param processContext - holds the workflow node and its corresponding WorkflowNodeDetails and TaskDetails
*/
private synchronized void addToProcessingQueue(ProcessContext processContext) {
readyList.remove(processContext.getWorkflowNode().getId());
processingQueue.put(processContext.getTaskDetails().getTaskID(), processContext);
}
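/**
* Moves the process context from the processing queue to the complete list.
*/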
private synchronized void addToCompleteQueue(ProcessContext processContext) {
processingQueue.remove(processContext.getTaskDetails().getTaskID());
completeList.put(processContext.getTaskDetails().getTaskID(), processContext);
}
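/**
* Records a completed workflow output node and removes it from the ready list.
*/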
private void addToCompleteOutputNodeList(WorkflowOutputNode wfOutputNode) {
completeWorkflowOutputs.add(wfOutputNode);
readyList.remove(wfOutputNode.getId());
}
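/**
* Returns true when the workflow has been stopped or when no nodes remain in the waiting,
* ready or processing collections.
*/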
boolean isAllDone() {
return !continueWorkflow || (waitingList.isEmpty() && readyList.isEmpty() && processingQueue.isEmpty());
}
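/**
* Loads the {@link Experiment} for the given experiment id from the registry.
*/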
private void setExperiment(String experimentId) throws RegistryException {
experiment = (Experiment) getRegistry().get(RegistryModelType.EXPERIMENT, experimentId);
log.debug("Retrieve Experiment for experiment id : " + experimentId);
}
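/**
* Package-Private method.
* Handles a task output change event: copies the task outputs to the matching output ports of
* the application node, propagates the values along the out edges, promotes downstream nodes
* that become ready, moves the process context to the complete list and processes the ready
* list again.
*/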
synchronized void handleTaskOutputChangeEvent(TaskOutputChangeEvent taskOutputChangeEvent) {
String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId();
log.debug("Task Output changed event received for workflow node : " +
taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
ProcessContext processContext = processingQueue.get(taskId);
if (processContext != null) {
WorkflowNode workflowNode = processContext.getWorkflowNode();
if (workflowNode instanceof ApplicationNode) {
ApplicationNode applicationNode = (ApplicationNode) workflowNode;
// Workflow node can have one to many output ports and each output port can have one to many links
for (OutPort outPort : applicationNode.getOutputPorts()) {
for (OutputDataObjectType outputDataObjectType : taskOutputChangeEvent.getOutput()) {
if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) {
outPort.getOutputObject().setValue(outputDataObjectType.getValue());
break;
}
}
for (Edge edge : outPort.getOutEdges()) {
edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue());
if (edge.getToPort().getNode().isReady()) {
addToReadyQueue(edge.getToPort().getNode());
}
}
}
}
addToCompleteQueue(processContext);
log.debug("removed task from processing queue : " + taskId);
try {
processReadyList();
} catch (Exception e) {
log.error("Error while processing ready workflow nodes", e);
continueWorkflow = false;
}
}
}
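/**
* Package-Private method.
* Maps a task state change to the corresponding workflow node state, updates the in-memory
* node state and persists the new workflow node status to the registry.
*/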
void handleTaskStatusChangeEvent(TaskStatusChangeEvent taskStatusChangeEvent) {
TaskState taskState = taskStatusChangeEvent.getState();
TaskIdentifier taskIdentity = taskStatusChangeEvent.getTaskIdentity();
String taskId = taskIdentity.getTaskId();
ProcessContext processContext = processingQueue.get(taskId);
if (processContext != null) {
WorkflowNodeState wfNodeState = WorkflowNodeState.INVOKED;
switch (taskState) {
case WAITING:
break;
case STARTED:
break;
case PRE_PROCESSING:
wfNodeState = WorkflowNodeState.INVOKED;
processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
break;
case INPUT_DATA_STAGING:
wfNodeState = WorkflowNodeState.INVOKED;
processContext.getWorkflowNode().setState(NodeState.PRE_PROCESSING);
break;
case EXECUTING:
wfNodeState = WorkflowNodeState.EXECUTING;
processContext.getWorkflowNode().setState(NodeState.EXECUTING);
break;
case OUTPUT_DATA_STAGING:
wfNodeState = WorkflowNodeState.COMPLETED;
processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
break;
case POST_PROCESSING:
wfNodeState = WorkflowNodeState.COMPLETED;
processContext.getWorkflowNode().setState(NodeState.POST_PROCESSING);
break;
case COMPLETED:
wfNodeState = WorkflowNodeState.COMPLETED;
processContext.getWorkflowNode().setState(NodeState.EXECUTED);
break;
case FAILED:
wfNodeState = WorkflowNodeState.FAILED;
processContext.getWorkflowNode().setState(NodeState.FAILED);
break;
case UNKNOWN:
wfNodeState = WorkflowNodeState.UNKNOWN;
break;
case CONFIGURING_WORKSPACE:
wfNodeState = WorkflowNodeState.COMPLETED;
break;
case CANCELED:
case CANCELING:
wfNodeState = WorkflowNodeState.CANCELED;
processContext.getWorkflowNode().setState(NodeState.FAILED);
break;
default:
break;
}
if (wfNodeState != WorkflowNodeState.UNKNOWN) {
try {
updateWorkflowNodeStatus(processContext.getWfNodeDetails(), wfNodeState);
} catch (RegistryException e) {
log.error("Error while updating workflow node status update to the registry. nodeInstanceId :"
+ processContext.getWfNodeDetails().getNodeInstanceId() + " status to: "
+ processContext.getWfNodeDetails().getWorkflowNodeStatus().toString() , e);
}
}
}
}
}