FTP transport implementation
diff --git a/transport/ftp-transport/pom.xml b/transport/ftp-transport/pom.xml
new file mode 100644
index 0000000..347cbc0
--- /dev/null
+++ b/transport/ftp-transport/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>mft-transport</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>0.01-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>mft-ftp-transport</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>mft-core</artifactId>
+ <version>0.01-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-net</groupId>
+ <artifactId>commons-net</artifactId>
+ <version>3.6</version>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
new file mode 100644
index 0000000..8b7c9df
--- /dev/null
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
@@ -0,0 +1,106 @@
+package org.apache.airavata.mft.transport.ftp;
+
+import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.service.FTPResource;
+import org.apache.airavata.mft.resource.service.FTPResourceGetRequest;
+import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.service.FTPSecret;
+import org.apache.airavata.mft.secret.service.FTPSecretGetRequest;
+import org.apache.airavata.mft.secret.service.SecretServiceGrpc;
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPFile;
+import org.apache.commons.net.ftp.FTPReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+
+public class FTPMetadataCollector implements MetadataCollector {
+
+ private static final Logger logger = LoggerFactory.getLogger(FTPMetadataCollector.class);
+
+ private String resourceServiceHost;
+ private int resourceServicePort;
+ private String secretServiceHost;
+ private int secretServicePort;
+ private boolean initialized = false;
+
+ @Override
+ public void init(String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) {
+ this.resourceServiceHost = resourceServiceHost;
+ this.resourceServicePort = resourceServicePort;
+ this.secretServiceHost = secretServiceHost;
+ this.secretServicePort = secretServicePort;
+ this.initialized = true;
+ }
+
+ private void checkInitialized() {
+ if (!initialized) {
+ throw new IllegalStateException("FTP Metadata Collector is not initialized");
+ }
+ }
+
+ @Override
+ public ResourceMetadata getGetResourceMetadata(String resourceId, String credentialToken) {
+
+ checkInitialized();
+ ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+ FTPResource ftpResource = resourceClient.getFTPResource(FTPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ SecretServiceGrpc.SecretServiceBlockingStub secretClient = SecretServiceClient.buildClient(secretServiceHost, secretServicePort);
+ FTPSecret ftpSecret = secretClient.getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+
+ ResourceMetadata resourceMetadata = new ResourceMetadata();
+ FTPClient ftpClient = null;
+ try {
+ ftpClient = FTPTransportUtil.getFTPClient(ftpResource, ftpSecret);
+ logger.info("Fetching metadata for resource {} in {}", ftpResource.getResourcePath(), ftpResource.getFtpStorage().getHost());
+
+ FTPFile ftpFile = ftpClient.mlistFile(ftpResource.getResourcePath());
+
+ if (ftpFile != null) {
+ resourceMetadata.setResourceSize(ftpFile.getSize());
+ resourceMetadata.setUpdateTime(ftpFile.getTimestamp().getTimeInMillis());
+ if (ftpClient.hasFeature("MD5") && FTPReply.isPositiveCompletion(ftpClient.sendCommand("MD5 " + ftpResource.getResourcePath()))) {
+ String[] replies = ftpClient.getReplyStrings();
+ resourceMetadata.setMd5sum(replies[0]);
+ } else {
+ logger.warn("MD5 fetch error out {}", ftpClient.getReplyString());
+ }
+ }
+ } catch (Exception e) {
+ logger.warn("Failed to fetch md5 for FTP resource {}", resourceId, e);
+ } finally {
+ FTPTransportUtil.disconnectFTP(ftpClient);
+ }
+
+ return resourceMetadata;
+ }
+
+ @Override
+ public Boolean isAvailable(String resourceId, String credentialToken) {
+
+ checkInitialized();
+
+ ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+ FTPResource ftpResource = resourceClient.getFTPResource(FTPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ SecretServiceGrpc.SecretServiceBlockingStub secretClient = SecretServiceClient.buildClient(secretServiceHost, secretServicePort);
+ FTPSecret ftpSecret = secretClient.getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+
+ FTPClient ftpClient = null;
+ try {
+ ftpClient = FTPTransportUtil.getFTPClient(ftpResource, ftpSecret);
+ InputStream inputStream = ftpClient.retrieveFileStream(ftpResource.getResourcePath());
+
+ return !(inputStream == null || ftpClient.getReplyCode() == 550);
+ } catch (Exception e) {
+ logger.error("FTP client initialization failed ", e);
+ return false;
+ } finally {
+ FTPTransportUtil.disconnectFTP(ftpClient);
+ }
+ }
+}
+
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java
new file mode 100644
index 0000000..46909dd
--- /dev/null
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java
@@ -0,0 +1,89 @@
+package org.apache.airavata.mft.transport.ftp;
+
+import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.service.FTPResource;
+import org.apache.airavata.mft.resource.service.FTPResourceGetRequest;
+import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.service.FTPSecret;
+import org.apache.airavata.mft.secret.service.FTPSecretGetRequest;
+import org.apache.airavata.mft.secret.service.SecretServiceGrpc;
+import org.apache.commons.net.ftp.FTPClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class FTPReceiver implements Connector {
+
+ private static final Logger logger = LoggerFactory.getLogger(FTPReceiver.class);
+
+ private FTPResource resource;
+ private boolean initialized;
+ private FTPClient ftpClient;
+
+ @Override
+ public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+ this.initialized = true;
+
+ ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+ this.resource = resourceClient.getFTPResource(FTPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+
+ SecretServiceGrpc.SecretServiceBlockingStub secretClient = SecretServiceClient.buildClient(secretServiceHost, secretServicePort);
+ FTPSecret ftpSecret = secretClient.getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+
+ this.ftpClient = FTPTransportUtil.getFTPClient(this.resource, ftpSecret);
+ }
+
+ @Override
+ public void destroy() {
+ FTPTransportUtil.disconnectFTP(ftpClient);
+ }
+
+ @Override
+ public void startStream(ConnectorContext context) throws Exception {
+ logger.info("Starting FTP receiver stream for transfer {}", context.getTransferId());
+
+ checkInitialized();
+ OutputStream streamOs = context.getStreamBuffer().getOutputStream();
+ InputStream inputStream = ftpClient.retrieveFileStream(resource.getResourcePath());
+
+ long fileSize = context.getMetadata().getResourceSize();
+
+ byte[] buf = new byte[1024];
+ while (true) {
+ int bufSize;
+
+ if (buf.length < fileSize) {
+ bufSize = buf.length;
+ } else {
+ bufSize = (int) fileSize;
+ }
+ bufSize = inputStream.read(buf, 0, bufSize);
+
+ if (bufSize < 0) {
+ break;
+ }
+
+ streamOs.write(buf, 0, bufSize);
+ streamOs.flush();
+
+ fileSize -= bufSize;
+ if (fileSize == 0L)
+ break;
+ }
+
+ inputStream.close();
+ streamOs.close();
+ logger.info("Completed FTP receiver stream for transfer {}", context.getTransferId());
+ }
+
+ private void checkInitialized() {
+ if (!initialized) {
+ throw new IllegalStateException("FTP Receiver is not initialized");
+ }
+ }
+}
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java
new file mode 100644
index 0000000..41d4b8e
--- /dev/null
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java
@@ -0,0 +1,91 @@
+package org.apache.airavata.mft.transport.ftp;
+
+import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.service.FTPResource;
+import org.apache.airavata.mft.resource.service.FTPResourceGetRequest;
+import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.service.FTPSecret;
+import org.apache.airavata.mft.secret.service.FTPSecretGetRequest;
+import org.apache.airavata.mft.secret.service.SecretServiceGrpc;
+import org.apache.commons.net.ftp.FTPClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class FTPSender implements Connector {
+
+ private static final Logger logger = LoggerFactory.getLogger(FTPReceiver.class);
+
+ private FTPResource resource;
+ private boolean initialized;
+ private FTPClient ftpClient;
+
+ @Override
+ public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+ this.initialized = true;
+
+ ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+ this.resource = resourceClient.getFTPResource(FTPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+
+ SecretServiceGrpc.SecretServiceBlockingStub secretClient = SecretServiceClient.buildClient(secretServiceHost, secretServicePort);
+ FTPSecret ftpSecret = secretClient.getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+
+ this.ftpClient = FTPTransportUtil.getFTPClient(this.resource, ftpSecret);
+ }
+
+ @Override
+ public void destroy() {
+ FTPTransportUtil.disconnectFTP(ftpClient);
+ }
+
+ @Override
+ public void startStream(ConnectorContext context) throws Exception {
+
+ logger.info("Starting FTP sender stream for transfer {}", context.getTransferId());
+
+ checkInitialized();
+ InputStream in = context.getStreamBuffer().getInputStream();
+ long fileSize = context.getMetadata().getResourceSize();
+ OutputStream outputStream = ftpClient.storeFileStream(resource.getResourcePath());
+
+ byte[] buf = new byte[1024];
+ while (true) {
+ int bufSize;
+
+ if (buf.length < fileSize) {
+ bufSize = buf.length;
+ } else {
+ bufSize = (int) fileSize;
+ }
+ bufSize = in.read(buf, 0, bufSize);
+
+ if (bufSize < 0) {
+ break;
+ }
+
+ outputStream.write(buf, 0, bufSize);
+ outputStream.flush();
+
+ fileSize -= bufSize;
+ if (fileSize == 0L)
+ break;
+ }
+
+ in.close();
+ outputStream.close();
+
+ logger.info("Completed FTP sender stream for transfer {}", context.getTransferId());
+
+ }
+
+ private void checkInitialized() {
+ if (!initialized) {
+ throw new IllegalStateException("FTP Sender is not initialized");
+ }
+ }
+}
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPTransportUtil.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPTransportUtil.java
new file mode 100644
index 0000000..81608a4
--- /dev/null
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPTransportUtil.java
@@ -0,0 +1,35 @@
+package org.apache.airavata.mft.transport.ftp;
+
+import org.apache.airavata.mft.resource.service.FTPResource;
+import org.apache.airavata.mft.secret.service.FTPSecret;
+import org.apache.commons.net.ftp.FTPClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+class FTPTransportUtil {
+
+ private static final Logger logger = LoggerFactory.getLogger(FTPTransportUtil.class);
+
+ static FTPClient getFTPClient(FTPResource ftpResource, FTPSecret ftpSecret) throws IOException {
+
+ FTPClient ftpClient = new FTPClient();
+ ftpClient.connect(ftpResource.getFtpStorage().getHost(), ftpResource.getFtpStorage().getPort());
+ ftpClient.enterLocalActiveMode();
+ ftpClient.login(ftpSecret.getUserId(), ftpSecret.getPassword());
+
+ return ftpClient;
+ }
+
+ static void disconnectFTP(FTPClient ftpClient) {
+ try {
+ if (ftpClient != null) {
+ ftpClient.logout();
+ ftpClient.disconnect();
+ }
+ } catch (Exception e) {
+ logger.error("FTP client close operation failed", e);
+ }
+ }
+}