// Source blob: b42e7acd34b4de0f9df2488431a08085f5444f22
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.airavata.workflow.core;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher;
import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
import org.apache.airavata.model.ComponentState;
import org.apache.airavata.model.ComponentStatus;
import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.messaging.event.ProcessIdentifier;
import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.*;
import org.apache.airavata.workflow.core.dag.edge.Edge;
import org.apache.airavata.workflow.core.dag.nodes.*;
import org.apache.airavata.workflow.core.parser.WorkflowParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Package-private interpreter for a single workflow experiment.
 *
 * <p>The interpreter parses the workflow, pushes the values of the workflow input nodes to the
 * ports they are connected to, and tracks every workflow node in one of four collections:
 * waiting, ready, processing and complete. Several steps (loading the workflow definition,
 * launching application processes, persisting outputs and node status) are still {@code FIXME}
 * placeholders in this class.
 */
class WorkflowInterpreter {

    private static final Logger log = LoggerFactory.getLogger(WorkflowInterpreter.class);

    private List<InputNode> inputNodes;
    private ExperimentModel experiment;
    private String credentialToken;
    private String gatewayName;
    private String workflowString;

    // Node bookkeeping, keyed by workflow node id.
    private final Map<String, WorkflowNode> readyList = new ConcurrentHashMap<>();
    private final Map<String, WorkflowNode> waitingList = new ConcurrentHashMap<>();
    private final Map<String, WorkflowNode> processingQueue = new ConcurrentHashMap<>();
    // Plain HashMap: only written from the synchronized addToCompleteQueue method.
    private final Map<String, WorkflowNode> completeList = new HashMap<>();

    private Registry registry;
    private final List<OutputNode> completeWorkflowOutputs = new ArrayList<>();
    private RabbitMQProcessLaunchPublisher publisher;
    private RabbitMQStatusConsumer statusConsumer;
    private String consumerId;
    // Only the disabled task-output handler further down ever set this to false.
    private boolean continueWorkflow = true;

    /**
     * Creates an interpreter for the experiment stored under the given id.
     *
     * @param experimentId    id used to load the experiment from the experiment catalog
     * @param credentialToken credential token kept for later use
     * @param gatewayName     gateway name kept for later use
     * @param publisher       process launch publisher kept for later use
     * @throws RegistryException if the experiment cannot be read from the registry
     */
    public WorkflowInterpreter(String experimentId, String credentialToken, String gatewayName,
                               RabbitMQProcessLaunchPublisher publisher) throws RegistryException {
        this.gatewayName = gatewayName;
        setExperiment(experimentId);
        this.credentialToken = credentialToken;
        this.publisher = publisher;
    }

    /**
     * Creates an interpreter for an already loaded experiment.
     *
     * @param experiment           experiment model to interpret
     * @param credentialStoreToken credential token kept for later use
     * @param gatewayName          gateway name kept for later use
     * @param publisher            process launch publisher kept for later use
     */
    public WorkflowInterpreter(ExperimentModel experiment, String credentialStoreToken, String gatewayName,
                               RabbitMQProcessLaunchPublisher publisher) {
        this.gatewayName = gatewayName;
        this.experiment = experiment;
        this.credentialToken = credentialStoreToken;
        this.publisher = publisher;
    }

    /**
     * Package-Private method.
     * Parses the workflow, seeds the ready/waiting collections from the workflow input nodes and
     * then processes whatever became ready.
     *
     * @throws Exception anything propagated from the workflow lookup, the parser or
     *                   {@link #processReadyList()}; an {@link AiravataException} when no node
     *                   became ready after the inputs were applied
     */
    void launchWorkflow() throws Exception {
        // WorkflowBuilder workflowBuilder = WorkflowFactory.getWorkflowBuilder(experiment.getExperimentId(), credentialToken, null);
        workflowString = getWorkflow();
        WorkflowParser workflowParser = WorkflowFactory.getWorkflowParser(workflowString);
        log.debug("Initialized workflow parser");
        workflowParser.parse();
        setInputNodes(workflowParser.getInputNodes());
        log.debug("Parsed the workflow and got the workflow input nodes");

        // process workflow input nodes
        processWorkflowInputNodes(getInputNodes());
        if (readyList.isEmpty()) {
            // List every experiment input in the error so the failure can be diagnosed.
            StringBuilder inputSummary = new StringBuilder();
            for (InputNode inputNode : inputNodes) {
                inputSummary.append(", ")
                        .append(inputNode.getInputObject().getName())
                        .append("=")
                        .append(inputNode.getInputObject().getValue());
            }
            throw new AiravataException(
                    "No workflow application node is in ready state to run with experiment inputs" + inputSummary);
        }
        processReadyList();
    }

    /**
     * Placeholder for loading the workflow definition; currently always returns an empty string.
     *
     * @return the workflow definition (empty until the FIXME below is resolved)
     * @throws AppCatalogException      if the application catalog cannot be obtained
     * @throws WorkflowCatalogException declared for the future catalog lookup
     */
    private String getWorkflow() throws AppCatalogException, WorkflowCatalogException {
        // The catalog is looked up but not queried yet; kept so the lookup and its failure modes stay in place.
        WorkflowCatalog workflowCatalog = RegistryFactory.getAppCatalog().getWorkflowCatalog();
        //FIXME: parse workflowTemplateId or experimentId
        // workflowCatalog.getWorkflow("");
        return "";
    }

    // try to remove synchronization tag
    /**
     * Package-Private method.
     * Walks the ready list once: output nodes receive the value of their in-port and are moved
     * to the completed outputs; input and application nodes are not handled yet (FIXME). When
     * nothing is processing or waiting afterwards, the collected outputs are saved.
     *
     * @throws RegistryException declared for the pending orchestrator/registry integration
     * @throws AiravataException if nodes are waiting but nothing is ready or processing, or if
     *                           saving the workflow outputs fails
     */
    void processReadyList() throws RegistryException, AiravataException {
        if (readyList.isEmpty() && processingQueue.isEmpty() && !waitingList.isEmpty()) {
            throw new AiravataException("No workflow application node is in ready state to run");
        }
        // readyList is a ConcurrentHashMap, so removing entries while iterating its values is safe.
        for (WorkflowNode readyNode : readyList.values()) {
            if (readyNode instanceof OutputNode) {
                OutputNode outputNode = (OutputNode) readyNode;
                outputNode.getOutputObject().setValue(outputNode.getInPort().getInputObject().getValue());
                addToCompleteOutputNodeList(outputNode);
            } else if (readyNode instanceof InputNode) {
                // FIXME: set input object of applications and add applications to ready List.
            } else if (readyNode instanceof ApplicationNode) {
                // FIXME: call orchestrator to create process for the application
            } else {
                throw new RuntimeException("Unsupported workflow node type");
            }
        }
        if (processingQueue.isEmpty() && waitingList.isEmpty()) {
            try {
                saveWorkflowOutputs();
            } catch (AppCatalogException e) {
                throw new AiravataException("Error while updating completed workflow outputs to registry", e);
            }
        }
    }

    /**
     * Collects the output objects of all completed output nodes. Persisting them is still a FIXME.
     *
     * @throws AppCatalogException declared for the pending catalog update
     */
    private void saveWorkflowOutputs() throws AppCatalogException {
        List<OutputDataObjectType> outputDataObjects = new ArrayList<>();
        for (OutputNode completeWorkflowOutput : completeWorkflowOutputs) {
            outputDataObjects.add(completeWorkflowOutput.getOutputObject());
        }
        // FIXME: save workflow output to registry.
        // RegistryFactory.getAppCatalog().getWorkflowCatalog()
        //         .updateWorkflowOutputs(experiment.getApplicationId(), outputDataObjects);
    }

    /**
     * Copies the value of every ready input node to the ports its out-port edges lead to, then
     * files each target node under ready or waiting depending on {@code isReady()}.
     *
     * @param inputNodes workflow input nodes to propagate
     */
    private void processWorkflowInputNodes(List<InputNode> inputNodes) {
        for (InputNode inputNode : inputNodes) {
            if (!inputNode.isReady()) {
                continue;
            }
            log.debug("Workflow node : {} is ready to execute", inputNode.getId());
            for (Edge edge : inputNode.getOutPort().getEdges()) {
                edge.getToPort().getInputObject().setValue(inputNode.getInputObject().getValue());
                WorkflowNode targetNode = edge.getToPort().getNode();
                if (targetNode.isReady()) {
                    addToReadyQueue(targetNode);
                    log.debug("Added workflow node : {} to the readyQueue", targetNode.getId());
                } else {
                    addToWaitingQueue(targetNode);
                    log.debug("Added workflow node {} to the waitingQueue", targetNode.getId());
                }
            }
        }
    }

    /**
     * @return the workflow input nodes set by {@link #setInputNodes(List)}; may be {@code null}
     *         before the workflow has been parsed
     * @throws Exception never thrown by this implementation; kept for signature compatibility
     */
    public List<InputNode> getInputNodes() throws Exception {
        return inputNodes;
    }

    /**
     * @param inputNodes workflow input nodes to interpret
     */
    public void setInputNodes(List<InputNode> inputNodes) {
        this.inputNodes = inputNodes;
    }

    /**
     * Returns the registry, obtaining it from {@link RegistryFactory} on first use.
     *
     * @throws RegistryException if the registry cannot be obtained
     */
    private Registry getRegistry() throws RegistryException {
        if (registry == null) {
            registry = RegistryFactory.getRegistry();
        }
        return registry;
    }

    /**
     * Package-Private method.
     * Remove the workflow node from waiting queue and add it to the ready queue.
     *
     * @param workflowNode - Workflow Node
     */
    synchronized void addToReadyQueue(WorkflowNode workflowNode) {
        waitingList.remove(workflowNode.getId());
        readyList.put(workflowNode.getId(), workflowNode);
    }

    /**
     * Registers the node in the waiting list under its id.
     *
     * @param workflowNode - Workflow Node
     */
    private void addToWaitingQueue(WorkflowNode workflowNode) {
        waitingList.put(workflowNode.getId(), workflowNode);
    }

    /**
     * First remove the node from ready list and then add it to the process queue.
     * Note that the underlying data structure of the process queue is a Map.
     *
     * @param applicationNode - application node to move from ready to processing
     */
    private synchronized void addToProcessingQueue(ApplicationNode applicationNode) {
        readyList.remove(applicationNode.getId());
        processingQueue.put(applicationNode.getId(), applicationNode);
    }

    /**
     * Moves the node from the processing queue to the complete list.
     *
     * @param applicationNode - application node that finished processing
     */
    private synchronized void addToCompleteQueue(ApplicationNode applicationNode) {
        processingQueue.remove(applicationNode.getId());
        completeList.put(applicationNode.getId(), applicationNode);
    }

    /**
     * Records the output node as completed and removes it from the ready list.
     *
     * @param wfOutputNode - workflow output node whose value has been set
     */
    private void addToCompleteOutputNodeList(OutputNode wfOutputNode) {
        completeWorkflowOutputs.add(wfOutputNode);
        readyList.remove(wfOutputNode.getId());
    }

    /**
     * @return {@code true} when the workflow was stopped or when the waiting, ready and
     *         processing collections are all empty
     */
    boolean isAllDone() {
        return !continueWorkflow || (waitingList.isEmpty() && readyList.isEmpty() && processingQueue.isEmpty());
    }

    /**
     * Loads the experiment with the given id from the experiment catalog.
     *
     * @param experimentId id of the experiment to load
     * @throws RegistryException if the registry lookup fails
     */
    private void setExperiment(String experimentId) throws RegistryException {
        experiment = (ExperimentModel) getRegistry().getExperimentCatalog()
                .get(ExperimentCatalogModelType.EXPERIMENT, experimentId);
        log.debug("Retrieve Experiment for experiment id : {}", experimentId);
    }

    /* Disabled legacy handler, kept as a reference for the COMPLETED-state FIXME below.

    synchronized void handleTaskOutputChangeEvent(ProcessStatusChangeEvent taskOutputChangeEvent) {
        String taskId = taskOutputChangeEvent.getTaskIdentity().getTaskId();
        log.debug("Task Output changed event received for workflow node : " +
                taskOutputChangeEvent.getTaskIdentity().getWorkflowNodeId() + ", task : " + taskId);
        WorkflowNode workflowNode = processingQueue.get(taskId);
        Set<WorkflowNode> tempWfNodeSet = new HashSet<>();
        if (workflowNode != null) {
            if (workflowNode instanceof ApplicationNode) {
                ApplicationNode applicationNode = (ApplicationNode) workflowNode;
                // Workflow node can have one to many output ports and each output port can have one to many links
                for (OutPort outPort : applicationNode.getOutputPorts()) {
                    for (OutputDataObjectType outputDataObjectType : taskOutputChangeEvent.getOutput()) {
                        if (outPort.getOutputObject().getName().equals(outputDataObjectType.getName())) {
                            outPort.getOutputObject().setValue(outputDataObjectType.getValue());
                            break;
                        }
                    }
                    for (Edge edge : outPort.getEdges()) {
                        edge.getToPort().getInputObject().setValue(outPort.getOutputObject().getValue());
                        if (edge.getToPort().getNode().isReady()) {
                            addToReadyQueue(edge.getToPort().getNode());
                        }
                    }
                }
                addToCompleteQueue(applicationNode);
                log.debug("removed task from processing queue : " + taskId);
            }
            try {
                processReadyList();
            } catch (Exception e) {
                log.error("Error while processing ready workflow nodes", e);
                continueWorkflow = false;
            }
        }
    }*/

    /**
     * Maps a process status change onto the state of the application node that is currently in
     * the processing queue under the event's process id, and hands the new status to
     * {@link #updateWorkflowNodeStatus(ApplicationNode, ComponentStatus)} when it differs from
     * the node's current state. Events for unknown process ids are ignored.
     *
     * @param processStatusChangeEvent status change event of a launched process
     */
    void handleProcessStatusChangeEvent(ProcessStatusChangeEvent processStatusChangeEvent) {
        ProcessState processState = processStatusChangeEvent.getState();
        ProcessIdentifier processIdentity = processStatusChangeEvent.getProcessIdentity();
        // ConcurrentHashMap rejects null keys and a switch on a null enum throws, so bail out early.
        if (processState == null || processIdentity == null || processIdentity.getProcessId() == null) {
            log.warn("Ignoring process status change event without state or process id");
            return;
        }
        String processId = processIdentity.getProcessId();
        ApplicationNode applicationNode = (ApplicationNode) processingQueue.get(processId);
        if (applicationNode == null) {
            return;
        }

        ComponentState newState = applicationNode.getState();
        switch (processState) {
            case CREATED:
            case VALIDATED:
            case STARTED:
                // No component state change for these early process states.
                break;
            case CONFIGURING_WORKSPACE:
            case PRE_PROCESSING:
            case INPUT_DATA_STAGING:
            case EXECUTING:
            case OUTPUT_DATA_STAGING:
            case POST_PROCESSING:
                newState = ComponentState.RUNNING;
                break;
            case COMPLETED:
                newState = ComponentState.COMPLETED;
                // FIXME: read output from registry and set it to node outputport then continue to next application.
                break;
            case FAILED:
                newState = ComponentState.FAILED;
                // FIXME: fail workflow.
                break;
            case CANCELED:
            case CANCELLING:
                newState = ComponentState.CANCELED;
                // FIXME: cancel workflow.
                break;
            default:
                break;
        }

        if (newState != applicationNode.getState()) {
            try {
                updateWorkflowNodeStatus(applicationNode, new ComponentStatus(newState));
            } catch (RegistryException e) {
                // Log the state that failed to persist; the trailing throwable keeps the stack trace.
                log.error("Error! Couldn't update new application state to registry. nodeInstanceId : {} status to: {}",
                        applicationNode.getId(), newState, e);
            }
        }
    }

    /**
     * Placeholder for persisting a node status change; currently does nothing.
     *
     * @param applicationNode node whose status changed
     * @param componentStatus new status of the node
     * @throws RegistryException declared for the pending registry update
     */
    private void updateWorkflowNodeStatus(ApplicationNode applicationNode, ComponentStatus componentStatus) throws RegistryException {
        // FIXME: save new workflow node status to registry.
    }
}