blob: 8426c901813a5c3d820bacef063c26de64c2ff0b [file] [log] [blame]
package org.apache.airavata.helix.task.datain.dataout;
import org.apache.airavata.helix.api.AbstractTask;
import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
import org.apache.airavata.k8s.api.resources.task.type.TaskOutPortTypeResource;
import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskResult;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.util.LinkedMultiValueMap;
import java.util.Arrays;
import java.util.UUID;
/**
* TODO: Class level comments please
*
* @author dimuthu
* @since 1.0.0-SNAPSHOT
*/
public class DataOutputTask extends AbstractTask {
public static final String NAME = "DATA_OUTPUT";
private String sourcePath;
private String identifier;
private String computeResourceId;
private ComputeResource computeResource;
public DataOutputTask(TaskCallbackContext callbackContext) {
super(callbackContext);
}
@Override
public void init() {
this.sourcePath = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.SOURCE_PATH);
this.identifier = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.IDENTIFIER);
this.computeResourceId = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.COMPUTE_RESOURCE);
this.computeResource = this.getRestTemplate().getForObject("http://" + this.getApiServerUrl()
+ "/compute/" + Long.parseLong(this.computeResourceId), ComputeResource.class);
}
@Override
public TaskResult onRun() {
try {
publishTaskStatus(TaskStatusResource.State.EXECUTING, "");
String temporaryFile = "/tmp/" + UUID.randomUUID().toString();
System.out.println("Downloading " + sourcePath + " to " + temporaryFile + " from compute resource "
+ computeResource.getName());
fetchComputeResourceOperation(computeResource).transferDataOut(sourcePath, temporaryFile, "SCP");
LinkedMultiValueMap<String, Object> map = new LinkedMultiValueMap<>();
map.add("file", new FileSystemResource(temporaryFile));
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.MULTIPART_FORM_DATA);
HttpEntity<LinkedMultiValueMap<String, Object>> requestEntity = new HttpEntity<>(map, headers);
System.out.println("Uploading data file with task id " + getTaskId() + " and identifier "
+ identifier + " to data store");
getRestTemplate().exchange("http://" + getApiServerUrl() + "/data/" + getTaskId() + "/"
+ identifier + "/upload", HttpMethod.POST, requestEntity, Long.class);
publishTaskStatus(TaskStatusResource.State.COMPLETED, "Task completed");
sendToOutPort("Out");
return new TaskResult(TaskResult.Status.COMPLETED, "Task completed");
} catch (Exception e) {
e.printStackTrace();
publishTaskStatus(TaskStatusResource.State.FAILED, e.getMessage());
sendToOutPort("Error");
return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task completed");
}
}
@Override
public void onCancel() {
}
public static final class PARAMS {
public static final String SOURCE_PATH = "source_path";
public static final String COMPUTE_RESOURCE = "compute_resource";
public static final String IDENTIFIER = "identifier";
}
public static TaskTypeResource getTaskType() {
TaskTypeResource taskTypeResource = new TaskTypeResource();
taskTypeResource.setName(NAME);
taskTypeResource.setTopicName("airavata-data-collect");
taskTypeResource.setIcon("assets/icons/dataout.png");
taskTypeResource.getInputTypes().addAll(
Arrays.asList(
new TaskInputTypeResource()
.setName(PARAMS.SOURCE_PATH)
.setType("String")
.setDefaultValue(""),
new TaskInputTypeResource()
.setName(PARAMS.IDENTIFIER)
.setType("String")
.setDefaultValue(""),
new TaskInputTypeResource()
.setName(PARAMS.COMPUTE_RESOURCE)
.setType("Long")
.setDefaultValue("")));
taskTypeResource.getOutPorts().addAll(
Arrays.asList(
new TaskOutPortTypeResource()
.setName("Out")
.setOrder(0),
new TaskOutPortTypeResource()
.setName("Error")
.setOrder(1))
);
return taskTypeResource;
}
}