Supporting streaming and chunked trasnport connectors
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index bec7945..fc670c5 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -65,8 +65,6 @@
private static final Logger logger = LoggerFactory.getLogger(MFTAgent.class);
- private final TransportMediator mediator = new TransportMediator();
-
@org.springframework.beans.factory.annotation.Value("${agent.id}")
private String agentId;
@@ -88,6 +86,9 @@
@org.springframework.beans.factory.annotation.Value("${agent.supported.protocols}")
private String supportedProtocols;
+ @org.springframework.beans.factory.annotation.Value("${agent.temp.data.dir}")
+ private String tempDataDir = "/tmp";
+
@org.springframework.beans.factory.annotation.Value("${resource.service.host}")
private String resourceServiceHost;
@@ -113,6 +114,9 @@
private long sessionTTLSeconds = 10;
private String session;
+ private TransportMediator mediator;
+
+
private ObjectMapper mapper = new ObjectMapper();
@Autowired
@@ -127,6 +131,7 @@
public void init() {
transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId);
rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId);
+ mediator = new TransportMediator(tempDataDir);
}
private void acceptRPCRequests() {
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index 6822275..b891b05 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -25,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Optional;
@@ -44,6 +45,12 @@
private final ExecutorService executor = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 connections per transfer
private final ExecutorService monitorPool = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 monitors per transfer
+ private String tempDataDir = "/tmp";
+
+ public TransportMediator(String tempDataDir) {
+ this.tempDataDir = tempDataDir;
+ }
+
public void transferSingleThread(String transferId,
TransferApiRequest request,
ConnectorConfig srcCC,
@@ -65,58 +72,120 @@
.setUpdateTimeMils(System.currentTimeMillis())
.setDescription("Transfer successfully completed"));
- Optional<IncomingConnector> inConnectorOp = ConnectorResolver.resolveIncomingConnector(request.getSourceType());
- Optional<OutgoingConnector> outConnectorOp = ConnectorResolver.resolveOutgoingConnector(request.getDestinationType());
+ Optional<IncomingStreamingConnector> inStreamingConnectorOp = ConnectorResolver
+ .resolveIncomingStreamingConnector(request.getSourceType());
+ Optional<OutgoingStreamingConnector> outStreamingConnectorOp = ConnectorResolver
+ .resolveOutgoingStreamingConnector(request.getDestinationType());
- IncomingConnector inConnector = inConnectorOp
- .orElseThrow(() -> new Exception("Could not find an in connector for type " + request.getSourceType()));
+ Optional<IncomingChunkedConnector> inChunkedConnectorOp = ConnectorResolver
+ .resolveIncomingChunkedConnector(request.getSourceType());
+ Optional<OutgoingChunkedConnector> outChunkedConnectorOp = ConnectorResolver
+ .resolveOutgoingChunkedConnector(request.getDestinationType());
- OutgoingConnector outConnector = outConnectorOp
- .orElseThrow(() -> new Exception("Could not find an out connector for type " + request.getDestinationType()));
+ // Give priority for chunked transfers.
+ // TODO: Provide a preference at the API level
+ if (inChunkedConnectorOp.isPresent() && outChunkedConnectorOp.isPresent()) {
- inConnector.init(srcCC);
- outConnector.init(dstCC);
+ logger.info("Starting the chunked transfer for transfer {}", transferId);
- String srcChild = request.getSourceChildResourcePath();
- String dstChild = request.getDestinationChildResourcePath();
+ long chunkSize = 20 * 1024 * 1024L;
- InputStream inputStream = srcChild.equals("") ? inConnector.fetchInputStream() : inConnector.fetchInputStream(srcChild);
- OutputStream outputStream = dstChild.equals("") ? outConnector.fetchOutputStream() : outConnector.fetchOutputStream(dstChild);
+ ExecutorService chunkedExecutorService = Executors.newFixedThreadPool(20);
- long count = 0;
- final AtomicLong countAtomic = new AtomicLong();
- countAtomic.set(count);
+ CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(chunkedExecutorService);
- monitorPool.submit(() -> {
- while (true) {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- // Ignore
+ long fileLength = srcCC.getMetadata().getResourceSize();
+ long uploadLength = 0L;
+ int chunkIdx = 0;
+
+ IncomingChunkedConnector inConnector = inChunkedConnectorOp
+ .orElseThrow(() -> new Exception("Could not find an in chunked connector for type " + request.getSourceType()));
+
+ OutgoingChunkedConnector outConnector = outChunkedConnectorOp
+ .orElseThrow(() -> new Exception("Could not find an out chunked connector for type " + request.getDestinationType()));
+
+ inConnector.init(srcCC);
+ outConnector.init(dstCC);
+
+ while(uploadLength < fileLength) {
+
+ long endPos = uploadLength + chunkSize;
+ if (endPos > fileLength) {
+ endPos = fileLength;
}
- if (!transferInProgress.get()) {
- logger.info("Status monitor is exiting for transfer {}", transferId);
- break;
- }
- double transferPercentage = countAtomic.get() * 100.0/ srcCC.getMetadata().getResourceSize();
- logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
- onStatusCallback.accept(transferId, new TransferState()
- .setPercentage(transferPercentage)
- .setState("RUNNING")
- .setUpdateTimeMils(System.currentTimeMillis())
- .setDescription("Transfer Progress Updated"));
+
+ String tempFile = tempDataDir + File.separator + transferId + "-" + chunkIdx;
+ completionService.submit(new ChunkMover(inConnector, outConnector, uploadLength, endPos, chunkIdx, tempFile));
+
+ uploadLength = endPos;
+ chunkIdx++;
}
- });
- int n;
- byte[] buffer = new byte[128 * 1024];
- for(count = 0L; -1 != (n = inputStream.read(buffer)); count += (long)n) {
- outputStream.write(buffer, 0, n);
+
+ for (int i = 0; i < chunkIdx; i++) {
+ Future<Integer> future = completionService.take();
+ }
+
+ logger.info("Completed chunked transfer for transfer {}", transferId);
+
+ } else if (inStreamingConnectorOp.isPresent() && outStreamingConnectorOp.isPresent()) {
+
+ logger.info("Starting streaming transfer for transfer {}", transferId);
+ IncomingStreamingConnector inConnector = inStreamingConnectorOp
+ .orElseThrow(() -> new Exception("Could not find an in streaming connector for type " + request.getSourceType()));
+
+ OutgoingStreamingConnector outConnector = outStreamingConnectorOp
+ .orElseThrow(() -> new Exception("Could not find an out streaming connector for type " + request.getDestinationType()));
+
+ inConnector.init(srcCC);
+ outConnector.init(dstCC);
+
+ String srcChild = request.getSourceChildResourcePath();
+ String dstChild = request.getDestinationChildResourcePath();
+
+ InputStream inputStream = srcChild.equals("") ? inConnector.fetchInputStream() : inConnector.fetchInputStream(srcChild);
+ OutputStream outputStream = dstChild.equals("") ? outConnector.fetchOutputStream() : outConnector.fetchOutputStream(dstChild);
+
+ long count = 0;
+ final AtomicLong countAtomic = new AtomicLong();
countAtomic.set(count);
- }
- inConnector.complete();
- outConnector.complete();
+ monitorPool.submit(() -> {
+ while (true) {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ if (!transferInProgress.get()) {
+ logger.info("Status monitor is exiting for transfer {}", transferId);
+ break;
+ }
+ double transferPercentage = countAtomic.get() * 100.0 / srcCC.getMetadata().getResourceSize();
+ logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
+ onStatusCallback.accept(transferId, new TransferState()
+ .setPercentage(transferPercentage)
+ .setState("RUNNING")
+ .setUpdateTimeMils(System.currentTimeMillis())
+ .setDescription("Transfer Progress Updated"));
+ }
+ });
+
+ int n;
+ byte[] buffer = new byte[128 * 1024];
+ for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) n) {
+ outputStream.write(buffer, 0, n);
+ countAtomic.set(count);
+ }
+
+ inConnector.complete();
+ outConnector.complete();
+
+ logger.info("Completed s ttreaming ransfer for transfer {}", transferId);
+
+ } else {
+ throw new Exception("No matching connector found to perform the transfer");
+ }
long time = (System.currentTimeMillis() - start) / 1000;
@@ -149,4 +218,32 @@
executor.shutdown();
monitorPool.shutdown();
}
+
+ private static class ChunkMover implements Callable<Integer> {
+
+ IncomingChunkedConnector downloader;
+ OutgoingChunkedConnector uploader;
+ long uploadLength;
+ long endPos;
+ int chunkIdx;
+ String tempFile;
+
+ public ChunkMover(IncomingChunkedConnector downloader, OutgoingChunkedConnector uploader, long uploadLength,
+ long endPos, int chunkIdx, String tempFile) {
+ this.downloader = downloader;
+ this.uploader = uploader;
+ this.uploadLength = uploadLength;
+ this.endPos = endPos;
+ this.chunkIdx = chunkIdx;
+ this.tempFile = tempFile;
+ }
+
+ @Override
+ public Integer call() throws Exception {
+ downloader.downloadChunk(chunkIdx, uploadLength, endPos, tempFile);
+ uploader.uploadChunk(chunkIdx, uploadLength, endPos, tempFile);
+ new File(tempFile).delete();
+ return chunkIdx;
+ }
+ }
}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java
index 1b8930c..db7a444 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java
@@ -18,20 +18,30 @@
package org.apache.airavata.mft.agent.http;
import org.apache.airavata.mft.core.api.ConnectorConfig;
-import org.apache.airavata.mft.core.api.IncomingConnector;
+import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
public class AgentHttpDownloadData {
- private IncomingConnector incomingConnector;
+ private IncomingStreamingConnector incomingStreamingConnector;
+ private IncomingChunkedConnector incomingChunkedConnector;
private ConnectorConfig connectorConfig;
private String childResourcePath;
private long createdTime = System.currentTimeMillis();
- public IncomingConnector getIncomingConnector() {
- return incomingConnector;
+ public IncomingStreamingConnector getIncomingStreamingConnector() {
+ return incomingStreamingConnector;
}
- public void setIncomingConnector(IncomingConnector incomingConnector) {
- this.incomingConnector = incomingConnector;
+ public void setIncomingStreamingConnector(IncomingStreamingConnector incomingStreamingConnector) {
+ this.incomingStreamingConnector = incomingStreamingConnector;
+ }
+
+ public IncomingChunkedConnector getIncomingChunkedConnector() {
+ return incomingChunkedConnector;
+ }
+
+ public void setIncomingChunkedConnector(IncomingChunkedConnector incomingChunkedConnector) {
+ this.incomingChunkedConnector = incomingChunkedConnector;
}
public ConnectorConfig getConnectorConfig() {
@@ -60,7 +70,8 @@
public static final class AgentHttpDownloadDataBuilder {
- private IncomingConnector incomingConnector;
+ private IncomingStreamingConnector incomingStreamingConnector;
+ private IncomingChunkedConnector incomingChunkedConnector;
private ConnectorConfig connectorConfig;
private String childResourcePath;
private long createdTime = System.currentTimeMillis();
@@ -72,8 +83,13 @@
return new AgentHttpDownloadDataBuilder();
}
- public AgentHttpDownloadDataBuilder withIncomingConnector(IncomingConnector incomingConnector) {
- this.incomingConnector = incomingConnector;
+ public AgentHttpDownloadDataBuilder withIncomingStreamingConnector(IncomingStreamingConnector incomingConnector) {
+ this.incomingStreamingConnector = incomingConnector;
+ return this;
+ }
+
+ public AgentHttpDownloadDataBuilder withIncomingChunkedConnector(IncomingChunkedConnector incomingConnector) {
+ this.incomingChunkedConnector = incomingConnector;
return this;
}
@@ -95,7 +111,8 @@
public AgentHttpDownloadData build() {
AgentHttpDownloadData agentHttpDownloadData = new AgentHttpDownloadData();
- agentHttpDownloadData.setIncomingConnector(incomingConnector);
+ agentHttpDownloadData.setIncomingStreamingConnector(incomingStreamingConnector);
+ agentHttpDownloadData.setIncomingChunkedConnector(incomingChunkedConnector);
agentHttpDownloadData.setConnectorConfig(connectorConfig);
agentHttpDownloadData.setChildResourcePath(childResourcePath);
agentHttpDownloadData.setCreatedTime(createdTime);
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
index 87153b4..655eb68 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
@@ -22,11 +22,7 @@
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedStream;
import io.netty.util.CharsetUtil;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.*;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.core.api.IncomingConnector;
-import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,11 +84,17 @@
ChannelFuture sendFileFuture;
ChannelFuture lastContentFuture;
- IncomingConnector incomingConnector = downloadData.getIncomingConnector();
- incomingConnector.init(downloadData.getConnectorConfig());
+ // TODO: Support chunked streaming
+ if (downloadData.getIncomingStreamingConnector() == null && downloadData.getIncomingChunkedConnector() != null) {
+ logger.error("Chunked data download is not yes supported in Http transport");
+ throw new Exception("Chunked data download is not yes supported in Http transport");
+ }
+
+ IncomingStreamingConnector incomingStreamingConnector = downloadData.getIncomingStreamingConnector();
+ incomingStreamingConnector.init(downloadData.getConnectorConfig());
InputStream inputStream = downloadData.getChildResourcePath().equals("")?
- incomingConnector.fetchInputStream() :
- incomingConnector.fetchInputStream(downloadData.getChildResourcePath());
+ incomingStreamingConnector.fetchInputStream() :
+ incomingStreamingConnector.fetchInputStream(downloadData.getChildResourcePath());
sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedStream(inputStream)),
ctx.newProgressivePromise());
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java b/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
index 8e1e155..03466d4 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
@@ -28,9 +28,9 @@
import org.apache.airavata.mft.core.DirectoryResourceMetadata;
import org.apache.airavata.mft.core.FileResourceMetadata;
import org.apache.airavata.mft.core.MetadataCollectorResolver;
-import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.core.api.ConnectorConfig;
-import org.apache.airavata.mft.core.api.IncomingConnector;
+import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -155,9 +155,10 @@
mftAuthorizationToken = tokenBuilder.build();
metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(storeType);
- Optional<IncomingConnector> connectorOp = ConnectorResolver.resolveIncomingConnector(storeType);
+ Optional<IncomingStreamingConnector> connectorStreamingOp = ConnectorResolver.resolveIncomingStreamingConnector(storeType);
+ Optional<IncomingChunkedConnector> connectorChunkedOp = ConnectorResolver.resolveIncomingChunkedConnector(storeType);
- if (metadataCollectorOp.isPresent() && connectorOp.isPresent()) {
+ if (metadataCollectorOp.isPresent() && (connectorStreamingOp.isPresent() || connectorChunkedOp.isPresent())) {
MetadataCollector metadataCollector = metadataCollectorOp.get();
metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
@@ -168,9 +169,8 @@
childResourcePath,
sourceToken);
- AgentHttpDownloadData downloadData = AgentHttpDownloadData.AgentHttpDownloadDataBuilder.newBuilder()
+ AgentHttpDownloadData.AgentHttpDownloadDataBuilder agentHttpDownloadDataBuilder = AgentHttpDownloadData.AgentHttpDownloadDataBuilder.newBuilder()
.withChildResourcePath(childResourcePath)
- .withIncomingConnector(connectorOp.get())
.withConnectorConfig(ConnectorConfig.ConnectorConfigBuilder.newBuilder()
.withResourceServiceHost(resourceServiceHost)
.withResourceServicePort(resourceServicePort)
@@ -179,7 +179,12 @@
.withResourceId(resourceId)
.withCredentialToken(sourceToken)
.withAuthToken(mftAuthorizationToken)
- .withMetadata(fileResourceMetadata).build()).build();
+ .withMetadata(fileResourceMetadata).build());
+
+ connectorStreamingOp.ifPresent(agentHttpDownloadDataBuilder::withIncomingStreamingConnector);
+ connectorChunkedOp.ifPresent(agentHttpDownloadDataBuilder::withIncomingChunkedConnector);
+
+ AgentHttpDownloadData downloadData = agentHttpDownloadDataBuilder.build();
String url = httpTransferRequestsStore.addDownloadRequest(downloadData);
diff --git a/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java b/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
index e6b5ef1..7a76bcd 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
@@ -17,14 +17,16 @@
package org.apache.airavata.mft.core;
-import org.apache.airavata.mft.core.api.IncomingConnector;
-import org.apache.airavata.mft.core.api.OutgoingConnector;
+import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
+import org.apache.airavata.mft.core.api.OutgoingChunkedConnector;
+import org.apache.airavata.mft.core.api.OutgoingStreamingConnector;
import java.util.Optional;
public final class ConnectorResolver {
- public static Optional<IncomingConnector> resolveIncomingConnector(String type) throws Exception {
+ public static Optional<IncomingStreamingConnector> resolveIncomingStreamingConnector(String type) throws Exception {
String className = null;
switch (type) {
@@ -35,26 +37,64 @@
if (className != null) {
Class<?> aClass = Class.forName(className);
- return Optional.of((IncomingConnector) aClass.getDeclaredConstructor().newInstance());
+ return Optional.of((IncomingStreamingConnector) aClass.getDeclaredConstructor().newInstance());
} else {
return Optional.empty();
}
}
- public static Optional<OutgoingConnector> resolveOutgoingConnector(String type) throws Exception {
+ public static Optional<OutgoingStreamingConnector> resolveOutgoingStreamingConnector(String type) throws Exception {
String className = null;
switch (type) {
case "SCP":
className = "org.apache.airavata.mft.transport.scp.SCPOutgoingConnector";
break;
+ case "S3":
+ className = "org.apache.airavata.mft.transport.s3.S3IncomingConnector";
+ break;
}
if (className != null) {
Class<?> aClass = Class.forName(className);
- return Optional.of((OutgoingConnector) aClass.getDeclaredConstructor().newInstance());
+ return Optional.of((OutgoingStreamingConnector) aClass.getDeclaredConstructor().newInstance());
} else {
return Optional.empty();
}
}
+
+ public static Optional<IncomingChunkedConnector> resolveIncomingChunkedConnector(String type) throws Exception {
+
+ String className = null;
+ switch (type) {
+ case "S3":
+ className = "org.apache.airavata.mft.transport.s3.S3IncomingConnector";
+ break;
+ }
+
+ if (className != null) {
+ Class<?> aClass = Class.forName(className);
+ return Optional.of((IncomingChunkedConnector) aClass.getDeclaredConstructor().newInstance());
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ public static Optional<OutgoingChunkedConnector> resolveOutgoingChunkedConnector(String type) throws Exception {
+
+ String className = null;
+ switch (type) {
+ case "S3":
+ className = "org.apache.airavata.mft.transport.s3.S3OutgoingConnector";
+ break;
+ }
+
+ if (className != null) {
+ Class<?> aClass = Class.forName(className);
+ return Optional.of((OutgoingChunkedConnector) aClass.getDeclaredConstructor().newInstance());
+ } else {
+ return Optional.empty();
+ }
+ }
+
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java
new file mode 100644
index 0000000..1ca72d6
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java
@@ -0,0 +1,6 @@
+package org.apache.airavata.mft.core.api;
+
+public interface BasicConnector {
+ public void init(ConnectorConfig connectorConfig) throws Exception;
+ public void complete() throws Exception;
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java
new file mode 100644
index 0000000..37288c6
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java
@@ -0,0 +1,5 @@
+package org.apache.airavata.mft.core.api;
+
+public interface IncomingChunkedConnector extends BasicConnector {
+ public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception;
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java
similarity index 95%
rename from core/src/main/java/org/apache/airavata/mft/core/api/IncomingConnector.java
rename to core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java
index b805bf8..900b148 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java
@@ -19,7 +19,7 @@
import java.io.InputStream;
-public interface IncomingConnector {
+public interface IncomingStreamingConnector {
public void init(ConnectorConfig connectorConfig) throws Exception;
public InputStream fetchInputStream() throws Exception;
public InputStream fetchInputStream(String childPath) throws Exception;
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java
new file mode 100644
index 0000000..9a2f3cb
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java
@@ -0,0 +1,5 @@
+package org.apache.airavata.mft.core.api;
+
+public interface OutgoingChunkedConnector extends BasicConnector {
+ public void uploadChunk(int chunkId, long startByte, long endByte, String uploadFile) throws Exception;
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingStreamingConnector.java
similarity index 86%
rename from core/src/main/java/org/apache/airavata/mft/core/api/OutgoingConnector.java
rename to core/src/main/java/org/apache/airavata/mft/core/api/OutgoingStreamingConnector.java
index 9ab495c..1e2633d 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingStreamingConnector.java
@@ -19,9 +19,7 @@
import java.io.OutputStream;
-public interface OutgoingConnector {
- public void init(ConnectorConfig connectorConfig) throws Exception;
+public interface OutgoingStreamingConnector extends BasicConnector {
public OutputStream fetchOutputStream() throws Exception;
public OutputStream fetchOutputStream(String childPath) throws Exception;
- public void complete() throws Exception;
}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
new file mode 100644
index 0000000..03b745b
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
@@ -0,0 +1,96 @@
+package org.apache.airavata.mft.transport.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
+import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
+import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.GenericResource;
+import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InputStream;
+
+public class S3IncomingConnector implements IncomingChunkedConnector, IncomingStreamingConnector {
+
+ private static final Logger logger = LoggerFactory.getLogger(S3IncomingConnector.class);
+
+ private GenericResource resource;
+ private AmazonS3 s3Client;
+
+ @Override
+ public void init(ConnectorConfig cc) throws Exception {
+ try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+ .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
+
+ resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+ .setAuthzToken(cc.getAuthToken())
+ .setResourceId(cc.getResourceId()).build());
+ }
+
+ if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
+ logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
+ throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+ }
+
+ S3Storage s3Storage = resource.getS3Storage();
+
+ S3Secret s3Secret;
+
+ try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+ cc.getSecretServiceHost(), cc.getSecretServicePort())) {
+
+ s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder()
+ .setAuthzToken(cc.getAuthToken())
+ .setSecretId(cc.getCredentialToken()).build());
+
+ BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
+
+ s3Client = AmazonS3ClientBuilder.standard()
+ .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+ .withRegion(s3Storage.getRegion())
+ .build();
+ }
+ }
+
+
+ @Override
+ public InputStream fetchInputStream() throws Exception {
+ S3Object s3object = s3Client.getObject(resource.getS3Storage().getBucketName(), resource.getFile().getResourcePath());
+ return s3object.getObjectContent();
+ }
+
+ @Override
+ public InputStream fetchInputStream(String childPath) throws Exception {
+ S3Object s3object = s3Client.getObject(resource.getS3Storage().getBucketName(), childPath);
+ return s3object.getObjectContent();
+ }
+
+ @Override
+ public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception {
+ GetObjectRequest rangeObjectRequest = new GetObjectRequest(resource.getS3Storage().getBucketName(),
+ resource.getFile().getResourcePath());
+ rangeObjectRequest.setRange(startByte, endByte - 1);
+ ObjectMetadata objectMetadata = s3Client.getObject(rangeObjectRequest, new File(downloadFile));
+ logger.info("Downloaded S3 chunk to path {} for resource id {}", downloadFile, resource.getResourceId());
+ }
+
+ @Override
+ public void complete() throws Exception {
+
+ }
+}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
new file mode 100644
index 0000000..3e95383
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
@@ -0,0 +1,101 @@
+package org.apache.airavata.mft.transport.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.*;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.OutgoingChunkedConnector;
+import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
+import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.GenericResource;
+import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class S3OutgoingConnector implements OutgoingChunkedConnector {
+
+ private static final Logger logger = LoggerFactory.getLogger(S3OutgoingConnector.class);
+
+ private GenericResource resource;
+ private AmazonS3 s3Client;
+
+ InitiateMultipartUploadResult initResponse;
+ List<PartETag> partETags = Collections.synchronizedList(new ArrayList<>());
+
+ @Override
+ public void init(ConnectorConfig cc) throws Exception {
+ try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+ .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
+
+ resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+ .setAuthzToken(cc.getAuthToken())
+ .setResourceId(cc.getResourceId()).build());
+ }
+
+ if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
+ logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
+ throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+ }
+
+ S3Storage s3Storage = resource.getS3Storage();
+
+ S3Secret s3Secret;
+
+ try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+ cc.getSecretServiceHost(), cc.getSecretServicePort())) {
+
+ s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder()
+ .setAuthzToken(cc.getAuthToken())
+ .setSecretId(cc.getCredentialToken()).build());
+
+ BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
+
+ s3Client = AmazonS3ClientBuilder.standard()
+ .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+ .withRegion(s3Storage.getRegion())
+ .build();
+ }
+
+ InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(resource.getS3Storage().getBucketName(),
+ resource.getFile().getResourcePath());
+ initResponse = s3Client.initiateMultipartUpload(initRequest);
+ }
+
+ @Override
+ public void uploadChunk(int chunkId, long startByte, long endByte, String uploadFile) throws Exception {
+ File file = new File(uploadFile);
+ UploadPartRequest uploadRequest = new UploadPartRequest()
+ .withBucketName(resource.getS3Storage().getBucketName())
+ .withKey(resource.getFile().getResourcePath())
+ .withUploadId(initResponse.getUploadId())
+ .withPartNumber(chunkId + 1)
+ .withFileOffset(0)
+ .withFile(file)
+ .withPartSize(file.length());
+
+ UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
+ this.partETags.add(uploadResult.getPartETag());
+ logger.info("Uploaded S3 chunk to path {} for resource id {}", uploadFile, resource.getResourceId());
+ }
+
+
+ @Override
+ public void complete() throws Exception {
+ CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(resource.getS3Storage().getBucketName(),
+ resource.getFile().getResourcePath(), initResponse.getUploadId(), partETags);
+ s3Client.completeMultipartUpload(compRequest);
+ }
+}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
deleted file mode 100644
index a430323..0000000
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.transport.s3;
-
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.S3ObjectInputStream;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
-import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
-import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
-import org.apache.airavata.mft.secret.client.SecretServiceClient;
-import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-
-public class S3Receiver implements Connector {
-
- private static final Logger logger = LoggerFactory.getLogger(S3Receiver.class);
-
- private AmazonS3 s3Client;
-
- private String resourceServiceHost;
- private int resourceServicePort;
- private String secretServiceHost;
- private int secretServicePort;
-
- @Override
- public void init(String resourceServiceHost, int resourceServicePort,
- String secretServiceHost, int secretServicePort) throws Exception {
-
- this.resourceServiceHost = resourceServiceHost;
- this.resourceServicePort = resourceServicePort;
- this.secretServiceHost = secretServiceHost;
- this.secretServicePort = secretServicePort;
- }
-
- @Override
- public void destroy() {
-
- }
-
- @Override
- public void startStream(AuthToken authToken, String resourceId, String credentialToken, ConnectorContext context) throws Exception {
-
- logger.info("Starting S3 Receiver stream for transfer {}", context.getTransferId());
-
- ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- GenericResource resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
- .setResourceId(resourceId).build());
-
- if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
- logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), resourceId);
- throw new Exception("Invalid storage type specified for resource " + resourceId);
- }
-
- S3Storage s3Storage = resource.getS3Storage();
-
- SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
- S3Secret s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
- BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
-
- s3Client = AmazonS3ClientBuilder.standard()
- .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
- .withRegion(s3Storage.getRegion())
- .build();
-
- S3Object s3object = s3Client.getObject(s3Storage.getBucketName(), resource.getFile().getResourcePath());
- S3ObjectInputStream inputStream = s3object.getObjectContent();
-
- OutputStream os = context.getStreamBuffer().getOutputStream();
- int read;
- long bytes = 0;
- while ((read = inputStream.read()) != -1) {
- bytes++;
- os.write(read);
- }
- os.flush();
- os.close();
-
- logger.info("Completed S3 Receiver stream for transfer {}", context.getTransferId());
- }
-
- @Override
- public void startStream(AuthToken authToken, String resourceId, String childResourcePath, String credentialToken,
- ConnectorContext context) throws Exception {
- throw new UnsupportedOperationException();
- }
-}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
deleted file mode 100644
index 0dec96d..0000000
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.transport.s3;
-
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
-import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
-import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
-import org.apache.airavata.mft.secret.client.SecretServiceClient;
-import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class S3Sender implements Connector {
-
- private static final Logger logger = LoggerFactory.getLogger(S3Sender.class);
-
- private AmazonS3 s3Client;
-
- private String resourceServiceHost;
- private int resourceServicePort;
- private String secretServiceHost;
- private int secretServicePort;
-
- @Override
- public void init(String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
-
- this.resourceServiceHost = resourceServiceHost;
- this.resourceServicePort = resourceServicePort;
- this.secretServiceHost = secretServiceHost;
- this.secretServicePort = secretServicePort;
- }
-
- @Override
- public void destroy() {
-
- }
-
- @Override
- public void startStream(AuthToken authToken, String resourceId, String credentialToken, ConnectorContext context) throws Exception {
-
- logger.info("Starting S3 Sender stream for transfer {}", context.getTransferId());
- logger.info("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
-
- ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
- GenericResource resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
- .setResourceId(resourceId).build());
-
- if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
- logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), resourceId);
- throw new Exception("Invalid storage type specified for resource " + resourceId);
- }
-
- S3Storage s3Storage = resource.getS3Storage();
-
- SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
- S3Secret s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
- BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
-
- s3Client = AmazonS3ClientBuilder.standard()
- .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
- .withRegion(s3Storage.getRegion())
- .build();
-
- ObjectMetadata metadata = new ObjectMetadata();
- metadata.setContentLength(context.getMetadata().getResourceSize());
-
- s3Client.putObject(s3Storage.getBucketName(), resource.getFile().getResourcePath(),
- context.getStreamBuffer().getInputStream(), metadata);
- logger.info("Completed S3 Sender stream for transfer {}", context.getTransferId());
- }
-
- @Override
- public void startStream(AuthToken authToken, String resourceId, String childResourcePath, String credentialToken,
- ConnectorContext context) throws Exception {
- throw new UnsupportedOperationException();
- }
-}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
index 9cbb590..d6bacdb 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
@@ -21,7 +21,7 @@
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.Session;
import org.apache.airavata.mft.core.api.ConnectorConfig;
-import org.apache.airavata.mft.core.api.IncomingConnector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
import org.apache.airavata.mft.resource.client.ResourceServiceClient;
@@ -38,7 +38,7 @@
import java.io.InputStream;
import java.io.OutputStream;
-public final class SCPIncomingConnector implements IncomingConnector {
+public final class SCPIncomingConnector implements IncomingStreamingConnector {
private static final Logger logger = LoggerFactory.getLogger(SCPIncomingConnector.class);
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
index 980019b..28ea5e0 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
@@ -21,7 +21,7 @@
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.Session;
import org.apache.airavata.mft.core.api.ConnectorConfig;
-import org.apache.airavata.mft.core.api.OutgoingConnector;
+import org.apache.airavata.mft.core.api.OutgoingStreamingConnector;
import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
import org.apache.airavata.mft.resource.client.ResourceServiceClient;
@@ -38,7 +38,7 @@
import java.io.InputStream;
import java.io.OutputStream;
-public final class SCPOutgoingConnector implements OutgoingConnector {
+public final class SCPOutgoingConnector implements OutgoingStreamingConnector {
private static final Logger logger = LoggerFactory.getLogger(SCPOutgoingConnector.class);