blob: 6090c9022a3818683f063d8581b07f2a561f3c8d [file] [log] [blame]
package org.apache.airavata.k8s.api.server.service;
import org.apache.airavata.k8s.api.resources.process.ProcessResource;
import org.apache.airavata.k8s.api.resources.workflow.WorkflowResource;
import org.apache.airavata.k8s.api.server.ServerRuntimeException;
import org.apache.airavata.k8s.api.server.model.process.ProcessModel;
import org.apache.airavata.k8s.api.server.model.task.TaskDAG;
import org.apache.airavata.k8s.api.server.model.task.TaskModel;
import org.apache.airavata.k8s.api.server.model.task.TaskOutPort;
import org.apache.airavata.k8s.api.server.model.workflow.Workflow;
import org.apache.airavata.k8s.api.server.repository.task.TaskDAGRepository;
import org.apache.airavata.k8s.api.server.repository.task.TaskOutPortRepository;
import org.apache.airavata.k8s.api.server.repository.task.TaskRepository;
import org.apache.airavata.k8s.api.server.repository.workflow.WorkflowRepository;
import org.apache.airavata.k8s.api.server.service.task.TaskService;
import org.apache.airavata.k8s.api.server.service.util.GraphParser;
import org.apache.airavata.k8s.api.server.service.util.ToResourceUtil;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
/**
* TODO: Class level comments please
*
* @author dimuthu
* @since 1.0.0-SNAPSHOT
*/
@Service
public class WorkflowService {
private ProcessService processService;
private TaskService taskService;
private WorkflowRepository workflowRepository;
private TaskOutPortRepository taskOutPortRepository;
private TaskRepository taskRepository;
private TaskDAGRepository taskDAGRepository;
public WorkflowService(ProcessService processService,
TaskService taskService,
WorkflowRepository workflowRepository,
TaskOutPortRepository taskOutPortRepository,
TaskRepository taskRepository,
TaskDAGRepository taskDAGRepository) {
this.processService = processService;
this.taskService = taskService;
this.workflowRepository = workflowRepository;
this.taskOutPortRepository = taskOutPortRepository;
this.taskRepository = taskRepository;
this.taskDAGRepository = taskDAGRepository;
}
public long createWorkflow(WorkflowResource resource) {
Workflow workflow = new Workflow();
workflow.setName(resource.getName());
workflow.setWorkFlowGraph(resource.getWorkflowGraphXML().getBytes());
Workflow saved = workflowRepository.save(workflow);
return saved.getId();
}
public long launchWorkflow(long id) {
Workflow workflow = this.workflowRepository.findById(id)
.orElseThrow(() -> new ServerRuntimeException("Workflow with id " + id + "can not be found"));
long processId = processService.create(new ProcessResource()
.setName("Workflow Process : " + workflow.getName() + "-" + UUID.randomUUID().toString())
.setCreationTime(System.currentTimeMillis()).setProcessType("WORKFLOW"));
try {
GraphParser.ParseResult parseResult = GraphParser.parse(new String(workflow.getWorkFlowGraph()));
parseResult.getTasks().forEach((mockId, task) -> {
task.getTaskResource().setParentProcessId(processId);
for (GraphParser.OutPort outPort : task.getOutPorts().values()) {
if (outPort.getNextPort() != null && outPort.getNextPort().getParentOperation() != null) {
if ("Stop".equals(outPort.getNextPort().getParentOperation().getName())) {
task.getTaskResource().setStoppingTask(true);
}
}
}
for (GraphParser.InPort inPort : task.getInPorts().values()) {
for (GraphParser.OutPort outPort : inPort.getPreviousPorts().values()) {
if (outPort.getParentOperation() != null && "Start".equals(outPort.getParentOperation().getName())) {
task.getTaskResource().setStartingTask(true);
}
}
}
long taskId = taskService.create(task.getTaskResource());
task.getTaskResource().setId(taskId);
});
parseResult.getEdgeCache().forEach(((outPort, inPort) -> {
if (outPort.getParentOperation() != null && outPort.getNextPort().getParentOperation() != null) {
Optional<TaskOutPort> sourceOutPort = taskOutPortRepository
.findByReferenceIdAndTaskModel_Id(outPort.getId(), outPort.getParentTask().getId());
Optional<TaskModel> targetTask = taskRepository.findById(inPort.getParentTask().getId());
taskDAGRepository.save(new TaskDAG()
.setSourceOutPort(sourceOutPort.get())
.setTargetTask(targetTask.get()));
}
}));
} catch (Exception e) {
throw new ServerRuntimeException("Failed to create workflow", e);
}
return 0;
}
public List<WorkflowResource> getAll() {
List<WorkflowResource> workflowResources = new ArrayList<>();
Iterable<Workflow> workFlows = this.workflowRepository.findAll();
Optional.ofNullable(workFlows)
.ifPresent(wfs -> wfs.forEach(wf -> workflowResources.add(ToResourceUtil.toResource(wf).get())));
return workflowResources;
}
public Optional<Workflow> findEntityById(long id) {
return this.workflowRepository.findById(id);
}
}