Adding data movement task with MFT integration
diff --git a/modules/airavata-apis/airavata-apis-server/pom.xml b/modules/airavata-apis/airavata-apis-server/pom.xml
index 78e7f3b..03802bd 100644
--- a/modules/airavata-apis/airavata-apis-server/pom.xml
+++ b/modules/airavata-apis/airavata-apis-server/pom.xml
@@ -31,9 +31,18 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk15on</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk15on</artifactId>
+ <version>${bouncy.castle.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.airavata</groupId>
<artifactId>mft-agent-service</artifactId>
<version>0.01-SNAPSHOT</version>
diff --git a/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/scheduling/ExperimentLauncher.java b/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/scheduling/ExperimentLauncher.java
index e0c6472..f7e2cdc 100644
--- a/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/scheduling/ExperimentLauncher.java
+++ b/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/scheduling/ExperimentLauncher.java
@@ -74,11 +74,6 @@
Map<String, BaseTask> taskMap = new HashMap<>();
- DataMovementTask dataMovementTask = new DataMovementTask();
- dataMovementTask.setTaskId(UUID.randomUUID().toString());
-
- taskMap.put(dataMovementTask.getTaskId(), dataMovementTask);
-
EC2Backend ec2Backend = EC2Backend.newBuilder()
.setAwsCredentialId(s3Secret.getSecretId())
.setLoginUserName("ubuntu")
@@ -92,9 +87,31 @@
ec2InstanceTask.setSecretServiceHost("localhost");
ec2InstanceTask.setSecretServicePort(7002);
ec2InstanceTask.setUserToken("token");
-
taskMap.put(ec2InstanceTask.getTaskId(), ec2InstanceTask);
+ DataMovementTask dataMovementTask = new DataMovementTask();
+ dataMovementTask.setTaskId(UUID.randomUUID().toString());
+ dataMovementTask.setSecretServiceHost("localhost");
+ dataMovementTask.setSecretServicePort(7002);
+ dataMovementTask.setTransferServiceHost("localhost");
+ dataMovementTask.setTransferServicePort(7002);
+ dataMovementTask.setResourceServiceHost("localhost");
+ dataMovementTask.setResourceServicePort(7002);
+ dataMovementTask.setUserToken("token");
+ dataMovementTask.setSourceStorageId("504643b6-f813-4aa1-8e66-2533cb4f837c");
+ dataMovementTask.setSourceCredentialId("");
+ dataMovementTask.setSourcePath("/Users/dwannipu/Downloads/IMG-9309.jpg");
+ dataMovementTask.setDestinationPath("/tmp/IMG-9309.jpg");
+ dataMovementTask.setDestinationStorageId("");
+ dataMovementTask.setDestinationCredentialId("");
+ dataMovementTask.overrideParameterFromWorkflowContext("destinationStorageId", // Loading context parameter from previous Task
+ CreateEC2InstanceTask.EC2_INSTANCE_STORAGE_ID);
+ dataMovementTask.overrideParameterFromWorkflowContext("destinationCredentialId",
+ CreateEC2InstanceTask.EC2_INSTANCE_SECRET_ID);
+
+
+ taskMap.put(dataMovementTask.getTaskId(), dataMovementTask);
+
DestroyEC2InstanceTask destroyEC2InstanceTask = new DestroyEC2InstanceTask();
destroyEC2InstanceTask.setTaskId(UUID.randomUUID().toString());
destroyEC2InstanceTask.setEc2Backend(ec2Backend);
@@ -104,12 +121,11 @@
destroyEC2InstanceTask.setInstanceId(""); // Override by workflow
destroyEC2InstanceTask.overrideParameterFromWorkflowContext("instanceId", CreateEC2InstanceTask.EC2_INSTANCE_ID);
- taskMap.put(destroyEC2InstanceTask.getTaskId(), destroyEC2InstanceTask);
+ //taskMap.put(destroyEC2InstanceTask.getTaskId(), destroyEC2InstanceTask);
- dataMovementTask.addOutPort(new OutPort().setNextTaskId(ec2InstanceTask.getTaskId()));
- ec2InstanceTask.addOutPort(new OutPort().setNextTaskId(destroyEC2InstanceTask.getTaskId()));
+ ec2InstanceTask.addOutPort(new OutPort().setNextTaskId(dataMovementTask.getTaskId()));
- String[] startTaskIds = {dataMovementTask.getTaskId()};
+ String[] startTaskIds = {ec2InstanceTask.getTaskId()};
logger.info("Submitting workflow");
launcher.buildAndRunWorkflow(taskMap, startTaskIds);
}
diff --git a/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/workflow/task/data/DataMovementTask.java b/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/workflow/task/data/DataMovementTask.java
index 0a47926..3f5fa7d 100644
--- a/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/workflow/task/data/DataMovementTask.java
+++ b/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/workflow/task/data/DataMovementTask.java
@@ -2,24 +2,231 @@
import org.apache.airavata.apis.workflow.task.common.BaseTask;
import org.apache.airavata.apis.workflow.task.common.annotation.TaskDef;
+import org.apache.airavata.apis.workflow.task.common.annotation.TaskParam;
+import org.apache.airavata.mft.api.client.MFTApiClient;
+import org.apache.airavata.mft.api.service.*;
+import org.apache.airavata.mft.common.AuthToken;
+import org.apache.airavata.mft.common.UserTokenAuth;
import org.apache.helix.task.TaskResult;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.concurrent.TimeUnit;
+
+ /**
+  * Workflow task that copies a single path from one MFT storage to another.
+  *
+  * <p>The task submits a transfer through the MFT transfer API, polls the transfer state
+  * until it is COMPLETED or FAILED, and reports the outcome as a Helix {@link TaskResult}.
+  * All inputs are task parameters; each one is held in a {@link ThreadLocal}.
+  */
@TaskDef(name = "DataMovementTask")
public class DataMovementTask extends BaseTask {
private final static Logger logger = LoggerFactory.getLogger(DataMovementTask.class);
+ // Transfer states that end the wait in onRun().
+ private static final String STATE_COMPLETED = "COMPLETED";
+ private static final String STATE_FAILED = "FAILED";
+
+ // How often the transfer state is polled, and how long onRun() waits for a terminal state.
+ private static final long POLL_INTERVAL_SECONDS = 2;
+ private static final long TRANSFER_TIMEOUT_SECONDS = 100;
+
+ // Host/port pairs of the MFT secret, resource and transfer services the client connects to.
+ @TaskParam(name = "secretServiceHost")
+ private ThreadLocal<String> secretServiceHost = new ThreadLocal<>();
+
+ @TaskParam(name = "secretServicePort")
+ private ThreadLocal<Integer> secretServicePort = new ThreadLocal<>();
+
+ @TaskParam(name = "resourceServiceHost")
+ private ThreadLocal<String> resourceServiceHost = new ThreadLocal<>();
+
+ @TaskParam(name = "resourceServicePort")
+ private ThreadLocal<Integer> resourceServicePort = new ThreadLocal<>();
+
+ @TaskParam(name = "transferServiceHost")
+ private ThreadLocal<String> transferServiceHost = new ThreadLocal<>();
+
+ @TaskParam(name = "transferServicePort")
+ private ThreadLocal<Integer> transferServicePort = new ThreadLocal<>();
+
+ // Source side of the transfer: storage id, secret id and path.
+ @TaskParam(name = "sourceStorageId")
+ private ThreadLocal<String> sourceStorageId = new ThreadLocal<>();
+
+ @TaskParam(name = "sourceCredentialId")
+ private ThreadLocal<String> sourceCredentialId = new ThreadLocal<>();
+
+ @TaskParam(name = "sourcePath")
+ private ThreadLocal<String> sourcePath = new ThreadLocal<>();
+
+ // Destination side of the transfer: storage id, secret id and path.
+ @TaskParam(name = "destinationStorageId")
+ private ThreadLocal<String> destinationStorageId = new ThreadLocal<>();
+
+ @TaskParam(name = "destinationCredentialId")
+ private ThreadLocal<String> destinationCredentialId = new ThreadLocal<>();
+
+ @TaskParam(name = "destinationPath")
+ private ThreadLocal<String> destinationPath = new ThreadLocal<>();
+
+ // Token sent to MFT as the user-token authorization of every request.
+ @TaskParam(name = "userToken")
+ private ThreadLocal<String> userToken = new ThreadLocal<>();
+
+ // When TRUE, a FAILED transfer is still reported as a COMPLETED task. Defaults to FALSE.
+ @TaskParam(name = "ignoreFailure")
+ private ThreadLocal<Boolean> ignoreFailure = ThreadLocal.withInitial(() -> Boolean.FALSE);
+
+ /**
+  * Submits the configured transfer to MFT and waits for it to reach a terminal state.
+  *
+  * <p>The state is polled every {@value #POLL_INTERVAL_SECONDS} seconds for at most
+  * {@value #TRANSFER_TIMEOUT_SECONDS} seconds. If no terminal state is observed in that
+  * window, Awaitility's timeout exception propagates out of this method.
+  *
+  * @return a COMPLETED result when the transfer completed, or when it failed and
+  *         {@code ignoreFailure} is TRUE; a FAILED result otherwise
+  * @throws Exception if submitting or polling the transfer fails, or the wait times out
+  */
@Override
public TaskResult onRun() throws Exception {
logger.info("Starting Data Movement task {}", getTaskId());
- return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
+     try (MFTApiClient mftClient = MFTApiClient.MFTApiClientBuilder.newBuilder()
+             .withResourceServicePort(getResourceServicePort())
+             .withTransferServicePort(getTransferServicePort())
+             .withSecretServicePort(getSecretServicePort())
+             .withTransferServiceHost(getTransferServiceHost())
+             .withSecretServiceHost(getSecretServiceHost())
+             .withResourceServiceHost(getResourceServiceHost()).build()) {
+
+         AuthToken authToken = AuthToken.newBuilder()
+                 .setUserTokenAuth(UserTokenAuth.newBuilder()
+                         .setToken(getUserToken()).build()).build();
+         TransferApiResponse transferResp = mftClient.getTransferClient().submitTransfer(TransferApiRequest.newBuilder()
+                 .setMftAuthorizationToken(authToken)
+                 .setSourceStorageId(getSourceStorageId())
+                 .setSourceSecretId(getSourceCredentialId())
+                 .setDestinationStorageId(getDestinationStorageId())
+                 .setDestinationSecretId(getDestinationCredentialId())
+                 .addEndpointPaths(EndpointPaths.newBuilder()
+                         .setSourcePath(getSourcePath())
+                         .setDestinationPath(getDestinationPath()).build()).build());
+
+         logger.info("Submitted transfer request {} for source path {} in storage {} and destination path {} in storage {}",
+                 transferResp.getTransferId(), getSourcePath(), getSourceStorageId(), getDestinationPath(), getDestinationStorageId());
+
+         // until(supplier, predicate) returns the value that satisfied the predicate, so the
+         // decision below uses the very state that ended the wait; no second query is needed.
+         TransferStateSummaryResponse finalState = Awaitility.with()
+                 .pollInterval(Duration.of(POLL_INTERVAL_SECONDS, ChronoUnit.SECONDS))
+                 .await()
+                 .atMost(TRANSFER_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+                 .until(() -> {
+                     TransferStateSummaryResponse transferState = mftClient.getTransferClient().getTransferStateSummary(TransferStateApiRequest.newBuilder()
+                             .setMftAuthorizationToken(authToken)
+                             .setTransferId(transferResp.getTransferId()).build());
+                     logger.info("Transfer state for transfer {} is {}", transferResp.getTransferId(), transferState.getState());
+                     return transferState;
+                 }, summary -> STATE_COMPLETED.equals(summary.getState()) || STATE_FAILED.equals(summary.getState()));
+
+         if (STATE_COMPLETED.equals(finalState.getState())) {
+             return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
+         }
+
+         // Boolean.TRUE.equals tolerates a null ignoreFailure value instead of failing on unboxing.
+         if (Boolean.TRUE.equals(getIgnoreFailure())) {
+             logger.warn("Transfer {} was {} but ignoreFailure is set. Continuing..", transferResp.getTransferId(), finalState.getState());
+             return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
+         }
+
+         logger.error("Transfer {} was {}. Exiting task..", transferResp.getTransferId(), finalState.getState());
+         return new TaskResult(TaskResult.Status.FAILED, "Failed");
+     }
}
@Override
public void onCancel() throws Exception {
+     // No-op: an in-flight transfer is not cancelled here.
}
+
+ public String getSourceStorageId() {
+     return sourceStorageId.get();
+ }
+
+ public void setSourceStorageId(String sourceStorageId) {
+     this.sourceStorageId.set(sourceStorageId);
+ }
+
+ public String getSourceCredentialId() {
+     return sourceCredentialId.get();
+ }
+
+ public void setSourceCredentialId(String sourceCredentialId) {
+     this.sourceCredentialId.set(sourceCredentialId);
+ }
+
+ public String getSourcePath() {
+     return sourcePath.get();
+ }
+
+ public void setSourcePath(String sourcePath) {
+     this.sourcePath.set(sourcePath);
+ }
+
+ public String getDestinationStorageId() {
+     return destinationStorageId.get();
+ }
+
+ public void setDestinationStorageId(String destinationStorageId) {
+     this.destinationStorageId.set(destinationStorageId);
+ }
+
+ public String getDestinationCredentialId() {
+     return destinationCredentialId.get();
+ }
+
+ public void setDestinationCredentialId(String destinationCredentialId) {
+     this.destinationCredentialId.set(destinationCredentialId);
+ }
+
+ public String getDestinationPath() {
+     return destinationPath.get();
+ }
+
+ public void setDestinationPath(String destinationPath) {
+     this.destinationPath.set(destinationPath);
+ }
+
+ public String getUserToken() {
+     return userToken.get();
+ }
+
+ public void setUserToken(String userToken) {
+     this.userToken.set(userToken);
+ }
+
+ public Boolean getIgnoreFailure() {
+     return ignoreFailure.get();
+ }
+
+ public void setIgnoreFailure(Boolean ignoreFailure) {
+     this.ignoreFailure.set(ignoreFailure);
+ }
+
+ public String getSecretServiceHost() {
+     return secretServiceHost.get();
+ }
+
+ public void setSecretServiceHost(String secretServiceHost) {
+     this.secretServiceHost.set(secretServiceHost);
+ }
+
+ public Integer getSecretServicePort() {
+     return secretServicePort.get();
+ }
+
+ public void setSecretServicePort(Integer secretServicePort) {
+     this.secretServicePort.set(secretServicePort);
+ }
+
+ public String getResourceServiceHost() {
+     return resourceServiceHost.get();
+ }
+
+ public void setResourceServiceHost(String resourceServiceHost) {
+     this.resourceServiceHost.set(resourceServiceHost);
+ }
+
+ public Integer getResourceServicePort() {
+     return resourceServicePort.get();
+ }
+
+ public void setResourceServicePort(Integer resourceServicePort) {
+     this.resourceServicePort.set(resourceServicePort);
+ }
+
+ public String getTransferServiceHost() {
+     return transferServiceHost.get();
+ }
+
+ public void setTransferServiceHost(String transferServiceHost) {
+     this.transferServiceHost.set(transferServiceHost);
+ }
+
+ public Integer getTransferServicePort() {
+     return transferServicePort.get();
+ }
+
+ public void setTransferServicePort(Integer transferServicePort) {
+     this.transferServicePort.set(transferServicePort);
+ }
+
}
diff --git a/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/workflow/task/ec2/CreateEC2InstanceTask.java b/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/workflow/task/ec2/CreateEC2InstanceTask.java
index bfac540..27fa2c1 100644
--- a/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/workflow/task/ec2/CreateEC2InstanceTask.java
+++ b/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/workflow/task/ec2/CreateEC2InstanceTask.java
@@ -17,6 +17,11 @@
import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
import org.apache.airavata.mft.credential.stubs.scp.SCPSecretCreateRequest;
import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
+import org.apache.airavata.mft.resource.service.scp.SCPStorageServiceGrpc;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorageCreateRequest;
import org.apache.airavata.mft.secret.client.SecretServiceClient;
import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
import org.apache.helix.task.TaskResult;
@@ -34,6 +39,7 @@
public class CreateEC2InstanceTask extends BaseTask {
public static final String EC2_INSTANCE_SECRET_ID = "EC2_INSTANCE_SECRET_ID";
+ public static final String EC2_INSTANCE_STORAGE_ID = "EC2_INSTANCE_STORAGE_ID";
public static final String EC2_INSTANCE_ID = "EC2_INSTANCE_ID";
public static final String EC2_INSTANCE_IP = "EC2_INSTANCE_IP";
@@ -218,6 +224,14 @@
logger.info("Waiting 30 seconds until the ssh interface comes up in instance {}", instanceId);
Thread.sleep(30000);
+ try (StorageServiceClient storageClient = StorageServiceClientBuilder
+ .buildClient("localhost", 7002)) {
+ SCPStorage scpStorage = storageClient.scp().createSCPStorage(SCPStorageCreateRequest.newBuilder()
+ .setHost(publicIpAddress)
+ .setPort(22)
+ .setName("EC2 Instance for task " + getTaskId()).build());
+ putUserContent(EC2_INSTANCE_STORAGE_ID, scpStorage.getStorageId(), Scope.WORKFLOW);
+ }
logger.info("EC2 Instance is running...");
} catch (Exception e) {
diff --git a/modules/airavata-apis/airavata-apis-server/src/main/resources/application.properties b/modules/airavata-apis/airavata-apis-server/src/main/resources/application.properties
index e186b72..7ef74fc 100644
--- a/modules/airavata-apis/airavata-apis-server/src/main/resources/application.properties
+++ b/modules/airavata-apis/airavata-apis-server/src/main/resources/application.properties
@@ -16,5 +16,12 @@
#
spring.main.allow-bean-definition-overriding=true
+spring.datasource.url=jdbc:h2:~/mft_db;DB_CLOSE_ON_EXIT=FALSE;DB_CLOSE_DELAY=-1;
+spring.jpa.hibernate.ddl-auto=update
+
agent.id=local-agent
-agent.transport.directory=plugins
\ No newline at end of file
+agent.transport.directory=plugins
+resource.service.host=localhost
+resource.service.port=7002
+secret.service.host=localhost
+secret.service.port=7002
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 83687db..23d1bf4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -115,7 +115,7 @@
<cargo.version>1.3.1</cargo.version>
<oa4mp.version>1.1.3</oa4mp.version>
<antrun.version>1.8</antrun.version>
- <bouncy.castle.version>1.56</bouncy.castle.version>
+ <bouncy.castle.version>1.69</bouncy.castle.version>
<ebay.cors.filter>1.0.0</ebay.cors.filter>
<thrift.version>0.18.1</thrift.version>
<mysql.connector.version>5.1.34</mysql.connector.version>