Adding data movement task with MFT integration
diff --git a/modules/airavata-apis/airavata-apis-server/pom.xml b/modules/airavata-apis/airavata-apis-server/pom.xml
index 78e7f3b..03802bd 100644
--- a/modules/airavata-apis/airavata-apis-server/pom.xml
+++ b/modules/airavata-apis/airavata-apis-server/pom.xml
@@ -31,9 +31,18 @@
                     <groupId>com.google.protobuf</groupId>
                     <artifactId>protobuf-java</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.bouncycastle</groupId>
+                    <artifactId>bcprov-jdk15on</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk15on</artifactId>
+            <version>${bouncy.castle.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.airavata</groupId>
             <artifactId>mft-agent-service</artifactId>
             <version>0.01-SNAPSHOT</version>
diff --git a/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/scheduling/ExperimentLauncher.java b/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/scheduling/ExperimentLauncher.java
index e0c6472..f7e2cdc 100644
--- a/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/scheduling/ExperimentLauncher.java
+++ b/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/scheduling/ExperimentLauncher.java
@@ -74,11 +74,6 @@
 
         Map<String, BaseTask> taskMap = new HashMap<>();
 
-        DataMovementTask dataMovementTask = new DataMovementTask();
-        dataMovementTask.setTaskId(UUID.randomUUID().toString());
-
-        taskMap.put(dataMovementTask.getTaskId(), dataMovementTask);
-
         EC2Backend ec2Backend = EC2Backend.newBuilder()
                 .setAwsCredentialId(s3Secret.getSecretId())
                 .setLoginUserName("ubuntu")
@@ -92,9 +87,31 @@
         ec2InstanceTask.setSecretServiceHost("localhost");
         ec2InstanceTask.setSecretServicePort(7002);
         ec2InstanceTask.setUserToken("token");
-
         taskMap.put(ec2InstanceTask.getTaskId(), ec2InstanceTask);
 
+        DataMovementTask dataMovementTask = new DataMovementTask();
+        dataMovementTask.setTaskId(UUID.randomUUID().toString());
+        dataMovementTask.setSecretServiceHost("localhost");
+        dataMovementTask.setSecretServicePort(7002);
+        dataMovementTask.setTransferServiceHost("localhost");
+        dataMovementTask.setTransferServicePort(7002);
+        dataMovementTask.setResourceServiceHost("localhost");
+        dataMovementTask.setResourceServicePort(7002);
+        dataMovementTask.setUserToken("token");
+        dataMovementTask.setSourceStorageId("504643b6-f813-4aa1-8e66-2533cb4f837c");
+        dataMovementTask.setSourceCredentialId("");
+        dataMovementTask.setSourcePath("/Users/dwannipu/Downloads/IMG-9309.jpg");
+        dataMovementTask.setDestinationPath("/tmp/IMG-9309.jpg");
+        dataMovementTask.setDestinationStorageId("");
+        dataMovementTask.setDestinationCredentialId("");
+        dataMovementTask.overrideParameterFromWorkflowContext("destinationStorageId", // Loading context parameter from previous Task
+                CreateEC2InstanceTask.EC2_INSTANCE_STORAGE_ID);
+        dataMovementTask.overrideParameterFromWorkflowContext("destinationCredentialId",
+                CreateEC2InstanceTask.EC2_INSTANCE_SECRET_ID);
+
+
+        taskMap.put(dataMovementTask.getTaskId(), dataMovementTask);
+
         DestroyEC2InstanceTask destroyEC2InstanceTask = new DestroyEC2InstanceTask();
         destroyEC2InstanceTask.setTaskId(UUID.randomUUID().toString());
         destroyEC2InstanceTask.setEc2Backend(ec2Backend);
@@ -104,12 +121,11 @@
         destroyEC2InstanceTask.setInstanceId(""); // Override by workflow
         destroyEC2InstanceTask.overrideParameterFromWorkflowContext("instanceId", CreateEC2InstanceTask.EC2_INSTANCE_ID);
 
-        taskMap.put(destroyEC2InstanceTask.getTaskId(), destroyEC2InstanceTask);
+        //taskMap.put(destroyEC2InstanceTask.getTaskId(), destroyEC2InstanceTask);
 
-        dataMovementTask.addOutPort(new OutPort().setNextTaskId(ec2InstanceTask.getTaskId()));
-        ec2InstanceTask.addOutPort(new OutPort().setNextTaskId(destroyEC2InstanceTask.getTaskId()));
+        ec2InstanceTask.addOutPort(new OutPort().setNextTaskId(dataMovementTask.getTaskId()));
 
-        String[] startTaskIds = {dataMovementTask.getTaskId()};
+        String[] startTaskIds = {ec2InstanceTask.getTaskId()};
         logger.info("Submitting workflow");
         launcher.buildAndRunWorkflow(taskMap, startTaskIds);
     }
diff --git a/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/workflow/task/data/DataMovementTask.java b/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/workflow/task/data/DataMovementTask.java
index 0a47926..3f5fa7d 100644
--- a/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/workflow/task/data/DataMovementTask.java
+++ b/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/workflow/task/data/DataMovementTask.java
@@ -2,24 +2,231 @@
 
 import org.apache.airavata.apis.workflow.task.common.BaseTask;
 import org.apache.airavata.apis.workflow.task.common.annotation.TaskDef;
+import org.apache.airavata.apis.workflow.task.common.annotation.TaskParam;
+import org.apache.airavata.mft.api.client.MFTApiClient;
+import org.apache.airavata.mft.api.service.*;
+import org.apache.airavata.mft.common.AuthToken;
+import org.apache.airavata.mft.common.UserTokenAuth;
 import org.apache.helix.task.TaskResult;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.concurrent.TimeUnit;
+
 @TaskDef(name = "DataMovementTask")
 public class DataMovementTask extends BaseTask {
 
     private final static Logger logger = LoggerFactory.getLogger(DataMovementTask.class);
 
+    @TaskParam(name = "secretServiceHost")
+    private ThreadLocal<String> secretServiceHost = new ThreadLocal<>();
+
+    @TaskParam(name = "secretServicePort")
+    private ThreadLocal<Integer> secretServicePort = new ThreadLocal<>();
+
+    @TaskParam(name = "resourceServiceHost")
+    private ThreadLocal<String> resourceServiceHost = new ThreadLocal<>();
+
+    @TaskParam(name = "resourceServicePort")
+    private ThreadLocal<Integer> resourceServicePort = new ThreadLocal<>();
+
+    @TaskParam(name = "transferServiceHost")
+    private ThreadLocal<String> transferServiceHost = new ThreadLocal<>();
+
+    @TaskParam(name = "transferServicePort")
+    private ThreadLocal<Integer> transferServicePort = new ThreadLocal<>();
+    
+    @TaskParam(name = "sourceStorageId")
+    private ThreadLocal<String> sourceStorageId = new ThreadLocal<>();
+
+    @TaskParam(name = "sourceCredentialId")
+    private ThreadLocal<String> sourceCredentialId = new ThreadLocal<>();
+
+    @TaskParam(name = "sourcePath")
+    private ThreadLocal<String> sourcePath = new ThreadLocal<>();
+
+    @TaskParam(name = "destinationStorageId")
+    private ThreadLocal<String> destinationStorageId = new ThreadLocal<>();
+
+    @TaskParam(name = "destinationCredentialId")
+    private ThreadLocal<String> destinationCredentialId = new ThreadLocal<>();
+
+    @TaskParam(name = "destinationPath")
+    private ThreadLocal<String> destinationPath = new ThreadLocal<>();
+
+    @TaskParam(name = "userToken")
+    private ThreadLocal<String> userToken = new ThreadLocal<>();
+
+    @TaskParam(name = "ignoreFailure")
+    private ThreadLocal<Boolean> ignoreFailure = ThreadLocal.withInitial(() -> Boolean.FALSE);
+
     @Override
     public TaskResult onRun() throws Exception {
         logger.info("Starting Data Movement task {}", getTaskId());
 
-        return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
+        // Submits one MFT transfer from the source to the destination endpoint, polls it until it reaches a
+        // terminal state and maps that state to the task result. Errors from the MFT client propagate to the caller.
+        try (MFTApiClient mftClient = MFTApiClient.MFTApiClientBuilder.newBuilder()
+                .withResourceServiceHost(getResourceServiceHost())
+                .withResourceServicePort(getResourceServicePort())
+                .withSecretServiceHost(getSecretServiceHost())
+                .withSecretServicePort(getSecretServicePort())
+                .withTransferServiceHost(getTransferServiceHost())
+                .withTransferServicePort(getTransferServicePort()).build()) {
+
+            AuthToken authToken = AuthToken.newBuilder()
+                    .setUserTokenAuth(UserTokenAuth.newBuilder().setToken(getUserToken()).build()).build();
+            TransferApiResponse transferResp = mftClient.getTransferClient().submitTransfer(TransferApiRequest.newBuilder()
+                    .setMftAuthorizationToken(authToken)
+                    .setSourceStorageId(getSourceStorageId())
+                    .setSourceSecretId(getSourceCredentialId())
+                    .setDestinationStorageId(getDestinationStorageId())
+                    .setDestinationSecretId(getDestinationCredentialId())
+                    .addEndpointPaths(EndpointPaths.newBuilder()
+                            .setSourcePath(getSourcePath())
+                            .setDestinationPath(getDestinationPath()).build()).build());
+            logger.info("Submitted transfer request {} for source path {} in storage {} and destination path {} in storage {}",
+                    transferResp.getTransferId(), getSourcePath(), getSourceStorageId(), getDestinationPath(), getDestinationStorageId());
+
+            // Built once outside the lambda and reused for every poll and for the final read below.
+            TransferStateApiRequest stateRequest = TransferStateApiRequest.newBuilder()
+                    .setMftAuthorizationToken(authToken)
+                    .setTransferId(transferResp.getTransferId()).build();
+            // Polls every 2 seconds; Awaitility throws if no terminal state is reported within 100 seconds.
+            Awaitility.with().pollInterval(Duration.of(2, ChronoUnit.SECONDS)).await().atMost(100, TimeUnit.SECONDS).until(() ->  {
+                TransferStateSummaryResponse transferState = mftClient.getTransferClient().getTransferStateSummary(stateRequest);
+                logger.info("Transfer state for transfer {} is {}", transferResp.getTransferId(), transferState.getState());
+                return "COMPLETED".equals(transferState.getState()) || "FAILED".equals(transferState.getState());
+            });
+
+            TransferStateSummaryResponse finalState = mftClient.getTransferClient().getTransferStateSummary(stateRequest);
+            // Boolean.TRUE.equals tolerates a null ignoreFailure, which would throw on auto-unboxing.
+            if (!"COMPLETED".equals(finalState.getState()) && !Boolean.TRUE.equals(getIgnoreFailure())) {
+                logger.error("Transfer {} was {}. Exiting task..", transferResp.getTransferId(), finalState.getState());
+                return new TaskResult(TaskResult.Status.FAILED, "Failed");
+            }
+            return new TaskResult(TaskResult.Status.COMPLETED, "Completed");
+        }
     }
 
     @Override
     public void onCancel() throws Exception {
 
     }
+
+    public String getSourceStorageId() {
+        return sourceStorageId.get();
+    }
+
+    public void setSourceStorageId(String sourceStorageId) {
+        this.sourceStorageId.set(sourceStorageId);
+    }
+
+    public String getSourceCredentialId() {
+        return sourceCredentialId.get();
+    }
+
+    public void setSourceCredentialId(String sourceCredentialId) {
+        this.sourceCredentialId.set(sourceCredentialId);
+    }
+
+    public String getSourcePath() {
+        return sourcePath.get();
+    }
+
+    public void setSourcePath(String sourcePath) {
+        this.sourcePath.set(sourcePath);
+    }
+
+    public String getDestinationStorageId() {
+        return destinationStorageId.get();
+    }
+
+    public void setDestinationStorageId(String destinationStorageId) {
+        this.destinationStorageId.set(destinationStorageId);
+    }
+
+    public String getDestinationCredentialId() {
+        return destinationCredentialId.get();
+    }
+
+    public void setDestinationCredentialId(String destinationCredentialId) {
+        this.destinationCredentialId.set(destinationCredentialId);
+    }
+
+    public String getDestinationPath() {
+        return destinationPath.get();
+    }
+
+    public void setDestinationPath(String destinationPath) {
+        this.destinationPath.set(destinationPath);
+    }
+
+    public String getUserToken() {
+        return userToken.get();
+    }
+
+    public void setUserToken(String userToken) {
+        this.userToken.set(userToken);
+    }
+
+    public Boolean getIgnoreFailure() {
+        return ignoreFailure.get();
+    }
+
+    public void setIgnoreFailure(Boolean ignoreFailure) {
+        this.ignoreFailure.set(ignoreFailure);
+    }
+
+    public String getSecretServiceHost() {
+        return secretServiceHost.get();
+    }
+
+    public void setSecretServiceHost(String secretServiceHost) {
+        this.secretServiceHost.set( secretServiceHost);
+    }
+
+    public Integer getSecretServicePort() {
+        return secretServicePort.get();
+    }
+
+    public void setSecretServicePort(Integer secretServicePort) {
+        this.secretServicePort.set(secretServicePort);
+    }
+
+    public String getResourceServiceHost() {
+        return resourceServiceHost.get();
+    }
+
+    public void setResourceServiceHost(String resourceServiceHost) {
+        this.resourceServiceHost.set( resourceServiceHost);
+    }
+
+    public Integer getResourceServicePort() {
+        return resourceServicePort.get();
+    }
+
+    public void setResourceServicePort(Integer resourceServicePort) {
+        this.resourceServicePort.set(resourceServicePort);
+    }
+
+    public String getTransferServiceHost() {
+        return transferServiceHost.get();
+    }
+
+    public void setTransferServiceHost(String transferServiceHost) {
+        this.transferServiceHost.set( transferServiceHost);
+    }
+
+    public Integer getTransferServicePort() {
+        return transferServicePort.get();
+    }
+
+    public void setTransferServicePort(Integer transferServicePort) {
+        this.transferServicePort.set(transferServicePort);
+    }
+
 }
diff --git a/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/workflow/task/ec2/CreateEC2InstanceTask.java b/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/workflow/task/ec2/CreateEC2InstanceTask.java
index bfac540..27fa2c1 100644
--- a/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/workflow/task/ec2/CreateEC2InstanceTask.java
+++ b/modules/airavata-apis/airavata-apis-server/src/main/java/org/apache/airavata/apis/workflow/task/ec2/CreateEC2InstanceTask.java
@@ -17,6 +17,11 @@
 import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
 import org.apache.airavata.mft.credential.stubs.scp.SCPSecretCreateRequest;
 import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
+import org.apache.airavata.mft.resource.client.StorageServiceClient;
+import org.apache.airavata.mft.resource.client.StorageServiceClientBuilder;
+import org.apache.airavata.mft.resource.service.scp.SCPStorageServiceGrpc;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorageCreateRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.apache.helix.task.TaskResult;
@@ -34,6 +39,7 @@
 public class CreateEC2InstanceTask extends BaseTask {
 
     public static final String EC2_INSTANCE_SECRET_ID = "EC2_INSTANCE_SECRET_ID";
+    public static final String EC2_INSTANCE_STORAGE_ID = "EC2_INSTANCE_STORAGE_ID";
     public static final String EC2_INSTANCE_ID = "EC2_INSTANCE_ID";
     public static final String EC2_INSTANCE_IP = "EC2_INSTANCE_IP";
 
@@ -218,6 +224,14 @@
 
                 logger.info("Waiting 30 seconds until the ssh interface comes up in instance {}", instanceId);
                 Thread.sleep(30000);
+                try (StorageServiceClient storageClient = StorageServiceClientBuilder
+                        .buildClient("localhost", 7002)) {
+                    SCPStorage scpStorage = storageClient.scp().createSCPStorage(SCPStorageCreateRequest.newBuilder()
+                            .setHost(publicIpAddress)
+                            .setPort(22)
+                            .setName("EC2 Instance for task " + getTaskId()).build());
+                    putUserContent(EC2_INSTANCE_STORAGE_ID, scpStorage.getStorageId(), Scope.WORKFLOW);
+                }
                 logger.info("EC2 Instance is running...");
 
             } catch (Exception e) {
diff --git a/modules/airavata-apis/airavata-apis-server/src/main/resources/application.properties b/modules/airavata-apis/airavata-apis-server/src/main/resources/application.properties
index e186b72..7ef74fc 100644
--- a/modules/airavata-apis/airavata-apis-server/src/main/resources/application.properties
+++ b/modules/airavata-apis/airavata-apis-server/src/main/resources/application.properties
@@ -16,5 +16,12 @@
 #
 
 spring.main.allow-bean-definition-overriding=true
+spring.datasource.url=jdbc:h2:~/mft_db;DB_CLOSE_ON_EXIT=FALSE;DB_CLOSE_DELAY=-1;
+spring.jpa.hibernate.ddl-auto=update
+
 agent.id=local-agent
-agent.transport.directory=plugins
\ No newline at end of file
+agent.transport.directory=plugins
+resource.service.host=localhost
+resource.service.port=7002
+secret.service.host=localhost
+secret.service.port=7002
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 83687db..23d1bf4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -115,7 +115,7 @@
         <cargo.version>1.3.1</cargo.version>
         <oa4mp.version>1.1.3</oa4mp.version>
         <antrun.version>1.8</antrun.version>
-        <bouncy.castle.version>1.56</bouncy.castle.version>
+        <bouncy.castle.version>1.69</bouncy.castle.version>
         <ebay.cors.filter>1.0.0</ebay.cors.filter>
         <thrift.version>0.18.1</thrift.version>
         <mysql.connector.version>5.1.34</mysql.connector.version>