blob: 00aeadc6af4241f59be16d326622400be4834b13 [file] [log] [blame]
package org.apache.airavata.helix.task.datain;
import org.apache.airavata.helix.api.AbstractTask;
import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
import org.apache.airavata.k8s.api.resources.task.type.TaskOutPortTypeResource;
import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
import org.apache.commons.io.FileUtils;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskResult;
import java.io.File;
import java.net.URL;
import java.util.Arrays;
import java.util.UUID;
/**
* TODO: Class level comments please
*
* @author dimuthu
* @since 1.0.0-SNAPSHOT
*/
public class DataInputTask extends AbstractTask {
public static final String NAME = "DATA_INPUT";
private String remoteSourcePath;
private String targetPath;
private String computeResourceId;
private ComputeResource computeResource;
public DataInputTask(TaskCallbackContext callbackContext) {
super(callbackContext);
}
@Override
public void init() {
this.remoteSourcePath = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.REMOTE_SOURCE_PATH);
this.targetPath = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.TARGET_PATH);
this.computeResourceId = getCallbackContext().getTaskConfig().getConfigMap().get(PARAMS.COMPUTE_RESOURCE);
this.computeResource = this.getRestTemplate().getForObject("http://" + this.getApiServerUrl()
+ "/compute/" + Long.parseLong(this.computeResourceId), ComputeResource.class);
}
@Override
public TaskResult onRun() {
try {
String tempFilePath = "/tmp/" + UUID.randomUUID().toString();
System.out.println("Creating tmp file " + tempFilePath);
publishTaskStatus(TaskStatusResource.State.EXECUTING, "");
if (remoteSourcePath.startsWith("http")) {
System.out.println("Downloading text file " + remoteSourcePath);
FileUtils.copyURLToFile(new URL(remoteSourcePath), new File(tempFilePath));
} else {
publishTaskStatus(TaskStatusResource.State.FAILED, "Unsupported source type");
sendToOutPort("Error");
return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task failed");
}
System.out.println("Transferring file to remote resource");
fetchComputeResourceOperation(computeResource).transferDataIn(tempFilePath, this.targetPath, "SCP");
publishTaskStatus(TaskStatusResource.State.COMPLETED, "Task completed");
sendToOutPort("Out");
return new TaskResult(TaskResult.Status.COMPLETED, "Task completed");
} catch (Exception e) {
e.printStackTrace();
publishTaskStatus(TaskStatusResource.State.FAILED, e.getMessage());
sendToOutPort("Error");
return new TaskResult(TaskResult.Status.FATAL_FAILED, "Task failed");
}
}
@Override
public void onCancel() {
}
public static TaskTypeResource getTaskType() {
TaskTypeResource taskTypeResource = new TaskTypeResource();
taskTypeResource.setName(NAME);
taskTypeResource.setTopicName("airavata-data-collect");
taskTypeResource.setIcon("assets/icons/datain.png");
taskTypeResource.getInputTypes().addAll(
Arrays.asList(
new TaskInputTypeResource()
.setName(PARAMS.REMOTE_SOURCE_PATH)
.setType("String")
.setDefaultValue(""),
new TaskInputTypeResource()
.setName(PARAMS.TARGET_PATH)
.setType("String")
.setDefaultValue(""),
new TaskInputTypeResource()
.setName(PARAMS.COMPUTE_RESOURCE)
.setType("Long")
.setDefaultValue("")));
taskTypeResource.getOutPorts().addAll(
Arrays.asList(
new TaskOutPortTypeResource()
.setName("Out")
.setOrder(0),
new TaskOutPortTypeResource()
.setName("Error")
.setOrder(1))
);
return taskTypeResource;
}
public static final class PARAMS {
public static final String REMOTE_SOURCE_PATH = "remote_source_path";
public static final String TARGET_PATH = "target_path";
public static final String COMPUTE_RESOURCE = "compute_resource";
}
}