blob: 1e1a5c5697b113a6e0d97f783566a4a8058731de [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.airavata.workflow.core.parser;
import org.airavata.appcatalog.cpi.AppCatalogException;
import org.airavata.appcatalog.cpi.WorkflowCatalog;
import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
import org.apache.airavata.model.workspace.experiment.Experiment;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.RegistryException;
import org.apache.airavata.registry.cpi.RegistryModelType;
import org.apache.airavata.workflow.core.WorkflowParser;
import org.apache.airavata.workflow.core.dag.edge.DirectedEdge;
import org.apache.airavata.workflow.core.dag.edge.Edge;
import org.apache.airavata.workflow.core.dag.nodes.ApplicationNode;
import org.apache.airavata.workflow.core.dag.nodes.ApplicationNodeImpl;
import org.apache.airavata.workflow.core.dag.nodes.WorkflowInputNode;
import org.apache.airavata.workflow.core.dag.nodes.WorkflowInputNodeImpl;
import org.apache.airavata.workflow.core.dag.nodes.WorkflowNode;
import org.apache.airavata.workflow.core.dag.nodes.WorkflowOutputNode;
import org.apache.airavata.workflow.core.dag.nodes.WorkflowOutputNodeImpl;
import org.apache.airavata.workflow.core.dag.port.InPort;
import org.apache.airavata.workflow.core.dag.port.InputPortIml;
import org.apache.airavata.workflow.core.dag.port.OutPort;
import org.apache.airavata.workflow.core.dag.port.OutPortImpl;
import org.apache.airavata.workflow.model.component.ComponentException;
import org.apache.airavata.workflow.model.component.system.ConstantComponent;
import org.apache.airavata.workflow.model.component.system.InputComponent;
import org.apache.airavata.workflow.model.component.system.S3InputComponent;
import org.apache.airavata.workflow.model.graph.DataEdge;
import org.apache.airavata.workflow.model.graph.DataPort;
import org.apache.airavata.workflow.model.graph.GraphException;
import org.apache.airavata.workflow.model.graph.Node;
import org.apache.airavata.workflow.model.graph.impl.NodeImpl;
import org.apache.airavata.workflow.model.graph.system.OutputNode;
import org.apache.airavata.workflow.model.graph.system.SystemDataPort;
import org.apache.airavata.workflow.model.graph.ws.WSNode;
import org.apache.airavata.workflow.model.graph.ws.WSPort;
import org.apache.airavata.workflow.model.wf.Workflow;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class AiravataWorkflowParser implements WorkflowParser {
private String credentialToken ;
private Experiment experiment;
private Map<String, WorkflowNode> wfNodes = new HashMap<String, WorkflowNode>();
public AiravataWorkflowParser(String experimentId, String credentialToken) throws RegistryException {
this.experiment = getExperiment(experimentId);
this.credentialToken = credentialToken;
}
public AiravataWorkflowParser(Experiment experiment, String credentialToken) {
this.credentialToken = credentialToken;
this.experiment = experiment;
}
@Override
public List<WorkflowInputNode> parse() throws RegistryException, AppCatalogException,
ComponentException, GraphException {
return parseWorkflow(getWorkflowFromExperiment(experiment));
}
public List<WorkflowInputNode> parseWorkflow(Workflow workflow) {
List<Node> gNodes = getInputNodes(workflow);
List<WorkflowInputNode> wfInputNodes = new ArrayList<WorkflowInputNode>();
List<PortContainer> portContainers = new ArrayList<PortContainer>();
List<InputDataObjectType> experimentInputs = experiment.getExperimentInputs();
Map<String,InputDataObjectType> inputDataMap=new HashMap<String, InputDataObjectType>();
WorkflowInputNode wfInputNode = null;
for (InputDataObjectType dataObjectType : experimentInputs) {
inputDataMap.put(dataObjectType.getName(), dataObjectType);
}
for (Node gNode : gNodes) {
wfInputNode = new WorkflowInputNodeImpl(gNode.getID(), gNode.getName());
wfInputNode.setInputObject(inputDataMap.get(wfInputNode.getId()));
if (wfInputNode.getInputObject() == null) {
throw new RuntimeException("Workflow Input object is not set, workflow node id: " + wfInputNode.getId());
}
portContainers.addAll(processOutPorts(gNode, wfInputNode));
wfInputNodes.add(wfInputNode);
}
// while port container is not empty iterate graph and build the workflow DAG.
buildModel(portContainers);
return wfInputNodes;
}
private void buildModel(List<PortContainer> portContainerList) {
// end condition of recursive call.
if (portContainerList == null || portContainerList.isEmpty()) {
return ;
}
DataPort dataPort = null;
InPort inPort = null;
ApplicationNode wfApplicationNode = null;
WorkflowOutputNode wfOutputNode = null;
List<PortContainer> nextPortContainerList = new ArrayList<PortContainer>();
for (PortContainer portContainer : portContainerList) {
dataPort = portContainer.getDataPort();
inPort = portContainer.getInPort();
Node node = dataPort.getNode();
if (node instanceof WSNode) {
WSNode wsNode = (WSNode) node;
WorkflowNode wfNode = wfNodes.get(wsNode.getID());
if (wfNode == null) {
wfApplicationNode = createApplicationNode(wsNode);
wfNodes.put(wfApplicationNode.getId(), wfApplicationNode);
nextPortContainerList.addAll(processOutPorts(wsNode, wfApplicationNode));
} else if (wfNode instanceof ApplicationNode) {
wfApplicationNode = (ApplicationNode) wfNode;
} else {
throw new IllegalArgumentException("Only support for ApplicationNode implementation, but found other type for node implementation");
}
inPort.setNode(wfApplicationNode);
wfApplicationNode.addInPort(inPort);
}else if (node instanceof OutputNode) {
OutputNode oNode = (OutputNode) node;
wfOutputNode = createWorkflowOutputNode(oNode);
wfOutputNode.setInPort(inPort);
inPort.setNode(wfOutputNode);
wfNodes.put(wfOutputNode.getId(), wfOutputNode);
}
}
buildModel(nextPortContainerList);
}
private WorkflowOutputNode createWorkflowOutputNode(OutputNode oNode) {
WorkflowOutputNodeImpl workflowOutputNode = new WorkflowOutputNodeImpl(oNode.getID(), oNode.getName());
OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
outputDataObjectType.setType(oNode.getParameterType());
workflowOutputNode.setOutputObject(outputDataObjectType);
return workflowOutputNode;
}
private ApplicationNode createApplicationNode(WSNode wsNode) {
ApplicationNode applicationNode = new ApplicationNodeImpl(wsNode.getID(),
wsNode.getComponent().getApplication().getName(),
wsNode.getComponent().getApplication().getApplicationId());
return applicationNode;
}
private List<PortContainer> processOutPorts(Node node, WorkflowNode wfNode) {
OutPort outPort ;
Edge edge;
InPort inPort = null;
List<PortContainer> portContainers = new ArrayList<PortContainer>();
for (DataPort dataPort : node.getOutputPorts()) {
outPort = createOutPort(dataPort);
for (DataEdge dataEdge : dataPort.getEdges()) {
edge = new DirectedEdge();
edge.setFromPort(outPort);
outPort.addEdge(edge);
inPort = createInPort(dataEdge.getToPort());
edge.setToPort(inPort);
inPort.addEdge(edge);
portContainers.add(new PortContainer(dataEdge.getToPort(), inPort));
}
outPort.setNode(wfNode);
if (wfNode instanceof WorkflowInputNode) {
WorkflowInputNode workflowInputNode = (WorkflowInputNode) wfNode;
workflowInputNode.setOutPort(outPort);
} else if (wfNode instanceof ApplicationNode) {
ApplicationNode applicationNode = ((ApplicationNode) wfNode);
applicationNode.addOutPort(outPort);
}
}
return portContainers;
}
private OutPort createOutPort(DataPort dataPort) {
OutPortImpl outPort = new OutPortImpl(dataPort.getID());
OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
if (dataPort instanceof WSPort) {
WSPort wsPort = (WSPort) dataPort;
outputDataObjectType.setName(wsPort.getComponentPort().getName());
outputDataObjectType.setType(wsPort.getType());
}else if (dataPort instanceof SystemDataPort) {
SystemDataPort sysPort = (SystemDataPort) dataPort;
outputDataObjectType.setName(sysPort.getFromNode().getName());
outputDataObjectType.setType(sysPort.getType());
}
outPort.setOutputObject(outputDataObjectType);
return outPort;
}
private InPort createInPort(DataPort toPort) {
InPort inPort = new InputPortIml(toPort.getID());
InputDataObjectType inputDataObjectType = new InputDataObjectType();
if (toPort instanceof WSPort) {
WSPort wsPort = (WSPort) toPort;
inputDataObjectType.setName(wsPort.getName());
inputDataObjectType.setType(wsPort.getType());
inputDataObjectType.setApplicationArgument(wsPort.getComponentPort().getApplicationArgument());
inputDataObjectType.setIsRequired(!wsPort.getComponentPort().isOptional());
inputDataObjectType.setInputOrder(wsPort.getComponentPort().getInputOrder());
inPort.setDefaultValue(wsPort.getComponentPort().getDefaultValue());
}else if (toPort instanceof SystemDataPort) {
SystemDataPort sysPort = (SystemDataPort) toPort;
inputDataObjectType.setName(sysPort.getName());
inputDataObjectType.setType(sysPort.getType());
}
inPort.setInputObject(inputDataObjectType);
return inPort;
}
private InputDataObjectType getInputDataObject(DataPort dataPort) {
InputDataObjectType inputDataObject = new InputDataObjectType();
inputDataObject.setName(dataPort.getName());
if (dataPort instanceof WSPort) {
WSPort port = (WSPort) dataPort;
inputDataObject.setInputOrder(port.getComponentPort().getInputOrder());
inputDataObject.setApplicationArgument(port.getComponentPort().getApplicationArgument() == null ?
"" : port.getComponentPort().getApplicationArgument());
inputDataObject.setType(dataPort.getType());
}
return inputDataObject;
}
private OutputDataObjectType getOutputDataObject(InputDataObjectType inputObject) {
OutputDataObjectType outputDataObjectType = new OutputDataObjectType();
outputDataObjectType.setApplicationArgument(inputObject.getApplicationArgument());
outputDataObjectType.setName(inputObject.getName());
outputDataObjectType.setType(inputObject.getType());
outputDataObjectType.setValue(inputObject.getValue());
return outputDataObjectType;
}
private Experiment getExperiment(String experimentId) throws RegistryException {
Registry registry = RegistryFactory.getDefaultRegistry();
return (Experiment)registry.get(RegistryModelType.EXPERIMENT, experimentId);
}
private Workflow getWorkflowFromExperiment(Experiment experiment) throws RegistryException, AppCatalogException, GraphException, ComponentException {
WorkflowCatalog workflowCatalog = getWorkflowCatalog();
return new Workflow(workflowCatalog.getWorkflow(experiment.getApplicationId()).getGraph());
}
private WorkflowCatalog getWorkflowCatalog() throws AppCatalogException {
return AppCatalogFactory.getAppCatalog().getWorkflowCatalog();
}
private ArrayList<Node> getInputNodes(Workflow wf) {
ArrayList<Node> list = new ArrayList<Node>();
List<NodeImpl> nodes = wf.getGraph().getNodes();
for (Node node : nodes) {
String name = node.getComponent().getName();
if (InputComponent.NAME.equals(name) || ConstantComponent.NAME.equals(name) || S3InputComponent.NAME.equals(name)) {
list.add(node);
}
}
return list;
}
public Map<String, WorkflowNode> getWfNodes() {
return wfNodes;
}
}