FTP transport implementation
diff --git a/transport/ftp-transport/pom.xml b/transport/ftp-transport/pom.xml
new file mode 100644
index 0000000..347cbc0
--- /dev/null
+++ b/transport/ftp-transport/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>mft-transport</artifactId>
+        <groupId>org.apache.airavata</groupId>
+        <version>0.01-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>mft-ftp-transport</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>mft-core</artifactId>
+            <version>0.01-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-net</groupId>
+            <artifactId>commons-net</artifactId>
+            <version>3.6</version>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
new file mode 100644
index 0000000..8b7c9df
--- /dev/null
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
@@ -0,0 +1,106 @@
+package org.apache.airavata.mft.transport.ftp;
+
+import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.service.FTPResource;
+import org.apache.airavata.mft.resource.service.FTPResourceGetRequest;
+import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.service.FTPSecret;
+import org.apache.airavata.mft.secret.service.FTPSecretGetRequest;
+import org.apache.airavata.mft.secret.service.SecretServiceGrpc;
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPFile;
+import org.apache.commons.net.ftp.FTPReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+
+public class FTPMetadataCollector implements MetadataCollector {
+
+    private static final Logger logger = LoggerFactory.getLogger(FTPMetadataCollector.class);
+
+    private String resourceServiceHost;
+    private int resourceServicePort;
+    private String secretServiceHost;
+    private int secretServicePort;
+    private boolean initialized = false;
+
+    @Override
+    public void init(String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) {
+        this.resourceServiceHost = resourceServiceHost;
+        this.resourceServicePort = resourceServicePort;
+        this.secretServiceHost = secretServiceHost;
+        this.secretServicePort = secretServicePort;
+        this.initialized = true;
+    }
+
+    private void checkInitialized() {
+        if (!initialized) {
+            throw new IllegalStateException("FTP Metadata Collector is not initialized");
+        }
+    }
+
+    @Override
+    public ResourceMetadata getGetResourceMetadata(String resourceId, String credentialToken) {
+
+        checkInitialized();
+        ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+        FTPResource ftpResource = resourceClient.getFTPResource(FTPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        SecretServiceGrpc.SecretServiceBlockingStub secretClient = SecretServiceClient.buildClient(secretServiceHost, secretServicePort);
+        FTPSecret ftpSecret = secretClient.getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+
+        ResourceMetadata resourceMetadata = new ResourceMetadata();
+        FTPClient ftpClient = null;
+        try {
+            ftpClient = FTPTransportUtil.getFTPClient(ftpResource, ftpSecret);
+            logger.info("Fetching metadata for resource {} in {}", ftpResource.getResourcePath(), ftpResource.getFtpStorage().getHost());
+
+            FTPFile ftpFile = ftpClient.mlistFile(ftpResource.getResourcePath());
+
+            if (ftpFile != null) {
+                resourceMetadata.setResourceSize(ftpFile.getSize());
+                resourceMetadata.setUpdateTime(ftpFile.getTimestamp().getTimeInMillis());
+                if (ftpClient.hasFeature("MD5") && FTPReply.isPositiveCompletion(ftpClient.sendCommand("MD5 " + ftpResource.getResourcePath()))) {
+                    String[] replies = ftpClient.getReplyStrings();
+                    resourceMetadata.setMd5sum(replies[0]);
+                } else {
+                    logger.warn("MD5 fetch error out {}", ftpClient.getReplyString());
+                }
+            }
+        } catch (Exception e) {
+            logger.warn("Failed to fetch md5 for FTP resource {}", resourceId, e);
+        } finally {
+            FTPTransportUtil.disconnectFTP(ftpClient);
+        }
+
+        return resourceMetadata;
+    }
+
+    @Override
+    public Boolean isAvailable(String resourceId, String credentialToken) {
+
+        checkInitialized();
+
+        ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+        FTPResource ftpResource = resourceClient.getFTPResource(FTPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        SecretServiceGrpc.SecretServiceBlockingStub secretClient = SecretServiceClient.buildClient(secretServiceHost, secretServicePort);
+        FTPSecret ftpSecret = secretClient.getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+
+        FTPClient ftpClient = null;
+        try {
+            ftpClient = FTPTransportUtil.getFTPClient(ftpResource, ftpSecret);
+            InputStream inputStream = ftpClient.retrieveFileStream(ftpResource.getResourcePath());
+
+            return !(inputStream == null || ftpClient.getReplyCode() == 550);
+        } catch (Exception e) {
+            logger.error("FTP client initialization failed ", e);
+            return false;
+        } finally {
+            FTPTransportUtil.disconnectFTP(ftpClient);
+        }
+    }
+}
+
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java
new file mode 100644
index 0000000..46909dd
--- /dev/null
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java
@@ -0,0 +1,89 @@
+package org.apache.airavata.mft.transport.ftp;
+
+import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.service.FTPResource;
+import org.apache.airavata.mft.resource.service.FTPResourceGetRequest;
+import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.service.FTPSecret;
+import org.apache.airavata.mft.secret.service.FTPSecretGetRequest;
+import org.apache.airavata.mft.secret.service.SecretServiceGrpc;
+import org.apache.commons.net.ftp.FTPClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class FTPReceiver implements Connector {
+
+    private static final Logger logger = LoggerFactory.getLogger(FTPReceiver.class);
+
+    private FTPResource resource;
+    private boolean initialized;
+    private FTPClient ftpClient;
+
+    @Override
+    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+        this.initialized = true;
+
+        ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+        this.resource = resourceClient.getFTPResource(FTPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+
+        SecretServiceGrpc.SecretServiceBlockingStub secretClient = SecretServiceClient.buildClient(secretServiceHost, secretServicePort);
+        FTPSecret ftpSecret = secretClient.getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+
+        this.ftpClient = FTPTransportUtil.getFTPClient(this.resource, ftpSecret);
+    }
+
+    @Override
+    public void destroy() {
+        FTPTransportUtil.disconnectFTP(ftpClient);
+    }
+
+    @Override
+    public void startStream(ConnectorContext context) throws Exception {
+        logger.info("Starting FTP receiver stream for transfer {}", context.getTransferId());
+
+        checkInitialized();
+        OutputStream streamOs = context.getStreamBuffer().getOutputStream();
+        InputStream inputStream = ftpClient.retrieveFileStream(resource.getResourcePath());
+
+        long fileSize = context.getMetadata().getResourceSize();
+
+        byte[] buf = new byte[1024];
+        while (true) {
+            int bufSize;
+
+            if (buf.length < fileSize) {
+                bufSize = buf.length;
+            } else {
+                bufSize = (int) fileSize;
+            }
+            bufSize = inputStream.read(buf, 0, bufSize);
+
+            if (bufSize < 0) {
+                break;
+            }
+
+            streamOs.write(buf, 0, bufSize);
+            streamOs.flush();
+
+            fileSize -= bufSize;
+            if (fileSize == 0L)
+                break;
+        }
+
+        inputStream.close();
+        streamOs.close();
+        logger.info("Completed FTP receiver stream for transfer {}", context.getTransferId());
+    }
+
+    private void checkInitialized() {
+        if (!initialized) {
+            throw new IllegalStateException("FTP Receiver is not initialized");
+        }
+    }
+}
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java
new file mode 100644
index 0000000..41d4b8e
--- /dev/null
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java
@@ -0,0 +1,91 @@
+package org.apache.airavata.mft.transport.ftp;
+
+import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.service.FTPResource;
+import org.apache.airavata.mft.resource.service.FTPResourceGetRequest;
+import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.service.FTPSecret;
+import org.apache.airavata.mft.secret.service.FTPSecretGetRequest;
+import org.apache.airavata.mft.secret.service.SecretServiceGrpc;
+import org.apache.commons.net.ftp.FTPClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class FTPSender implements Connector {
+
+    private static final Logger logger = LoggerFactory.getLogger(FTPReceiver.class);
+
+    private FTPResource resource;
+    private boolean initialized;
+    private FTPClient ftpClient;
+
+    @Override
+    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+        this.initialized = true;
+
+        ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+        this.resource = resourceClient.getFTPResource(FTPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+
+        SecretServiceGrpc.SecretServiceBlockingStub secretClient = SecretServiceClient.buildClient(secretServiceHost, secretServicePort);
+        FTPSecret ftpSecret = secretClient.getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+
+        this.ftpClient = FTPTransportUtil.getFTPClient(this.resource, ftpSecret);
+    }
+
+    @Override
+    public void destroy() {
+        FTPTransportUtil.disconnectFTP(ftpClient);
+    }
+
+    @Override
+    public void startStream(ConnectorContext context) throws Exception {
+
+        logger.info("Starting FTP sender stream for transfer {}", context.getTransferId());
+
+        checkInitialized();
+        InputStream in = context.getStreamBuffer().getInputStream();
+        long fileSize = context.getMetadata().getResourceSize();
+        OutputStream outputStream = ftpClient.storeFileStream(resource.getResourcePath());
+
+        byte[] buf = new byte[1024];
+        while (true) {
+            int bufSize;
+
+            if (buf.length < fileSize) {
+                bufSize = buf.length;
+            } else {
+                bufSize = (int) fileSize;
+            }
+            bufSize = in.read(buf, 0, bufSize);
+
+            if (bufSize < 0) {
+                break;
+            }
+
+            outputStream.write(buf, 0, bufSize);
+            outputStream.flush();
+
+            fileSize -= bufSize;
+            if (fileSize == 0L)
+                break;
+        }
+
+        in.close();
+        outputStream.close();
+
+        logger.info("Completed FTP sender stream for transfer {}", context.getTransferId());
+
+    }
+
+    private void checkInitialized() {
+        if (!initialized) {
+            throw new IllegalStateException("FTP Sender is not initialized");
+        }
+    }
+}
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPTransportUtil.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPTransportUtil.java
new file mode 100644
index 0000000..81608a4
--- /dev/null
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPTransportUtil.java
@@ -0,0 +1,35 @@
+package org.apache.airavata.mft.transport.ftp;
+
+import org.apache.airavata.mft.resource.service.FTPResource;
+import org.apache.airavata.mft.secret.service.FTPSecret;
+import org.apache.commons.net.ftp.FTPClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+class FTPTransportUtil {
+
+    private static final Logger logger = LoggerFactory.getLogger(FTPTransportUtil.class);
+
+    static FTPClient getFTPClient(FTPResource ftpResource, FTPSecret ftpSecret) throws IOException {
+
+        FTPClient ftpClient = new FTPClient();
+        ftpClient.connect(ftpResource.getFtpStorage().getHost(), ftpResource.getFtpStorage().getPort());
+        ftpClient.enterLocalActiveMode();
+        ftpClient.login(ftpSecret.getUserId(), ftpSecret.getPassword());
+
+        return ftpClient;
+    }
+
+    static void disconnectFTP(FTPClient ftpClient) {
+        try {
+            if (ftpClient != null) {
+                ftpClient.logout();
+                ftpClient.disconnect();
+            }
+        } catch (Exception e) {
+            logger.error("FTP client close operation failed", e);
+        }
+    }
+}