Supporting streaming and chunked transport connectors
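
This change splits the connector API into streaming and chunked families.
IncomingStreamingConnector and OutgoingStreamingConnector (the renamed
IncomingConnector and OutgoingConnector) keep the stream based path, while the
new IncomingChunkedConnector and OutgoingChunkedConnector, both extending
BasicConnector, move byte ranges through temporary files staged under
agent.temp.data.dir. TransportMediator resolves both families and prefers a
chunked transfer when both ends support it, splitting the file into 20 MB
chunks that ChunkMover tasks move in parallel. The S3 transport is rewritten
as S3IncomingConnector (ranged GETs) and S3OutgoingConnector (multipart
upload), replacing S3Receiver and S3Sender, and the SCP connectors are renamed
to their streaming variants.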
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index bec7945..fc670c5 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -65,8 +65,6 @@
 
     private static final Logger logger = LoggerFactory.getLogger(MFTAgent.class);
 
-    private final TransportMediator mediator = new TransportMediator();
-
     @org.springframework.beans.factory.annotation.Value("${agent.id}")
     private String agentId;
 
@@ -88,6 +86,9 @@
     @org.springframework.beans.factory.annotation.Value("${agent.supported.protocols}")
     private String supportedProtocols;
 
+    @org.springframework.beans.factory.annotation.Value("${agent.temp.data.dir}")
+    private String tempDataDir = "/tmp";
+
     @org.springframework.beans.factory.annotation.Value("${resource.service.host}")
     private String resourceServiceHost;
 
@@ -113,6 +114,9 @@
     private long sessionTTLSeconds = 10;
     private String session;
 
+    private TransportMediator mediator;
+
+
     private ObjectMapper mapper = new ObjectMapper();
 
     @Autowired
@@ -127,6 +131,7 @@
     public void init() {
         transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId);
         rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId);
+        mediator = new TransportMediator(tempDataDir);
     }
 
     private void acceptRPCRequests() {
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index 6822275..b891b05 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -25,6 +25,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Optional;
@@ -44,6 +45,12 @@
     private final ExecutorService executor = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 connections per transfer
     private final ExecutorService monitorPool = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 monitors per transfer
 
+    private String tempDataDir = "/tmp";
+
+    public TransportMediator(String tempDataDir) {
+        this.tempDataDir = tempDataDir;
+    }
+
     public void transferSingleThread(String transferId,
                                      TransferApiRequest request,
                                      ConnectorConfig srcCC,
@@ -65,58 +72,124 @@
                         .setUpdateTimeMils(System.currentTimeMillis())
                         .setDescription("Transfer successfully completed"));
 
-                Optional<IncomingConnector> inConnectorOp = ConnectorResolver.resolveIncomingConnector(request.getSourceType());
-                Optional<OutgoingConnector> outConnectorOp = ConnectorResolver.resolveOutgoingConnector(request.getDestinationType());
+                Optional<IncomingStreamingConnector> inStreamingConnectorOp = ConnectorResolver
+                        .resolveIncomingStreamingConnector(request.getSourceType());
+                Optional<OutgoingStreamingConnector> outStreamingConnectorOp = ConnectorResolver
+                        .resolveOutgoingStreamingConnector(request.getDestinationType());
 
-                IncomingConnector inConnector = inConnectorOp
-                        .orElseThrow(() -> new Exception("Could not find an in connector for type " + request.getSourceType()));
+                Optional<IncomingChunkedConnector> inChunkedConnectorOp = ConnectorResolver
+                        .resolveIncomingChunkedConnector(request.getSourceType());
+                Optional<OutgoingChunkedConnector> outChunkedConnectorOp = ConnectorResolver
+                        .resolveOutgoingChunkedConnector(request.getDestinationType());
 
-                OutgoingConnector outConnector = outConnectorOp
-                        .orElseThrow(() -> new Exception("Could not find an out connector for type " + request.getDestinationType()));
+                // Give priority for chunked transfers.
+                // TODO: Provide a preference at the API level
+                if (inChunkedConnectorOp.isPresent() && outChunkedConnectorOp.isPresent()) {
 
-                inConnector.init(srcCC);
-                outConnector.init(dstCC);
+                    logger.info("Starting the chunked transfer for transfer {}", transferId);
 
-                String srcChild = request.getSourceChildResourcePath();
-                String dstChild = request.getDestinationChildResourcePath();
+                    long chunkSize = 20 * 1024 * 1024L;
 
-                InputStream inputStream = srcChild.equals("") ? inConnector.fetchInputStream() : inConnector.fetchInputStream(srcChild);
-                OutputStream outputStream = dstChild.equals("") ? outConnector.fetchOutputStream() : outConnector.fetchOutputStream(dstChild);
+                    ExecutorService chunkedExecutorService = Executors.newFixedThreadPool(20);
 
-                long count = 0;
-                final AtomicLong countAtomic = new AtomicLong();
-                countAtomic.set(count);
+                    CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(chunkedExecutorService);
 
-                monitorPool.submit(() -> {
-                    while (true) {
-                        try {
-                            Thread.sleep(2000);
-                        } catch (InterruptedException e) {
-                            // Ignore
+                    long fileLength = srcCC.getMetadata().getResourceSize();
+                    long uploadLength = 0L;
+                    int chunkIdx = 0;
+
+                    IncomingChunkedConnector inConnector = inChunkedConnectorOp
+                            .orElseThrow(() -> new Exception("Could not find an in chunked connector for type " + request.getSourceType()));
+
+                    OutgoingChunkedConnector outConnector = outChunkedConnectorOp
+                            .orElseThrow(() -> new Exception("Could not find an out chunked connector for type " + request.getDestinationType()));
+
+                    inConnector.init(srcCC);
+                    outConnector.init(dstCC);
+
+                    while(uploadLength < fileLength) {
+
+                        long endPos = uploadLength + chunkSize;
+                        if (endPos > fileLength) {
+                            endPos = fileLength;
                         }
-                        if (!transferInProgress.get()) {
-                            logger.info("Status monitor is exiting for transfer {}", transferId);
-                            break;
-                        }
-                        double transferPercentage = countAtomic.get() * 100.0/ srcCC.getMetadata().getResourceSize();
-                        logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
-                        onStatusCallback.accept(transferId, new TransferState()
-                                .setPercentage(transferPercentage)
-                                .setState("RUNNING")
-                                .setUpdateTimeMils(System.currentTimeMillis())
-                                .setDescription("Transfer Progress Updated"));
+
+                        String tempFile = tempDataDir + File.separator + transferId + "-" + chunkIdx;
+                        completionService.submit(new ChunkMover(inConnector, outConnector, uploadLength, endPos, chunkIdx, tempFile));
+
+                        uploadLength = endPos;
+                        chunkIdx++;
                     }
-                });
 
-                int n;
-                byte[] buffer = new byte[128 * 1024];
-                for(count = 0L; -1 != (n = inputStream.read(buffer)); count += (long)n) {
-                    outputStream.write(buffer, 0, n);
+
+                    for (int i = 0; i < chunkIdx; i++) {
+                        completionService.take(); // wait for each chunk transfer to finish
+                    }
+
+                    // Finalize both ends (for S3 this completes the multipart upload)
+                    inConnector.complete();
+                    outConnector.complete();
+
+                    logger.info("Completed chunked transfer for transfer {}", transferId);
+
+                } else if (inStreamingConnectorOp.isPresent() && outStreamingConnectorOp.isPresent()) {
+
+                    logger.info("Starting streaming transfer for transfer {}", transferId);
+                    IncomingStreamingConnector inConnector = inStreamingConnectorOp
+                            .orElseThrow(() -> new Exception("Could not find an in streaming connector for type " + request.getSourceType()));
+
+                    OutgoingStreamingConnector outConnector = outStreamingConnectorOp
+                            .orElseThrow(() -> new Exception("Could not find an out streaming connector for type " + request.getDestinationType()));
+
+                    inConnector.init(srcCC);
+                    outConnector.init(dstCC);
+
+                    String srcChild = request.getSourceChildResourcePath();
+                    String dstChild = request.getDestinationChildResourcePath();
+
+                    InputStream inputStream = srcChild.equals("") ? inConnector.fetchInputStream() : inConnector.fetchInputStream(srcChild);
+                    OutputStream outputStream = dstChild.equals("") ? outConnector.fetchOutputStream() : outConnector.fetchOutputStream(dstChild);
+
+                    long count = 0;
+                    final AtomicLong countAtomic = new AtomicLong();
                     countAtomic.set(count);
-                }
 
-                inConnector.complete();
-                outConnector.complete();
+                    monitorPool.submit(() -> {
+                        while (true) {
+                            try {
+                                Thread.sleep(2000);
+                            } catch (InterruptedException e) {
+                                // Ignore
+                            }
+                            if (!transferInProgress.get()) {
+                                logger.info("Status monitor is exiting for transfer {}", transferId);
+                                break;
+                            }
+                            double transferPercentage = countAtomic.get() * 100.0 / srcCC.getMetadata().getResourceSize();
+                            logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
+                            onStatusCallback.accept(transferId, new TransferState()
+                                    .setPercentage(transferPercentage)
+                                    .setState("RUNNING")
+                                    .setUpdateTimeMils(System.currentTimeMillis())
+                                    .setDescription("Transfer Progress Updated"));
+                        }
+                    });
+
+                    int n;
+                    byte[] buffer = new byte[128 * 1024];
+                    for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) n) {
+                        outputStream.write(buffer, 0, n);
+                        countAtomic.set(count);
+                    }
+
+                    inConnector.complete();
+                    outConnector.complete();
+
+                    logger.info("Completed streaming transfer for transfer {}", transferId);
+
+                } else {
+                    throw new Exception("No matching connector found to perform the transfer");
+                }
 
                 long time = (System.currentTimeMillis() - start) / 1000;
 
@@ -149,4 +218,32 @@
         executor.shutdown();
         monitorPool.shutdown();
     }
+
+    private static class ChunkMover implements Callable<Integer> {
+
+        IncomingChunkedConnector downloader;
+        OutgoingChunkedConnector uploader;
+        long uploadLength;
+        long endPos;
+        int chunkIdx;
+        String tempFile;
+
+        public ChunkMover(IncomingChunkedConnector downloader, OutgoingChunkedConnector uploader, long uploadLength,
+                          long endPos, int chunkIdx, String tempFile) {
+            this.downloader = downloader;
+            this.uploader = uploader;
+            this.uploadLength = uploadLength;
+            this.endPos = endPos;
+            this.chunkIdx = chunkIdx;
+            this.tempFile = tempFile;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+            downloader.downloadChunk(chunkIdx, uploadLength, endPos, tempFile);
+            uploader.uploadChunk(chunkIdx, uploadLength, endPos, tempFile);
+            new File(tempFile).delete();
+            return chunkIdx;
+        }
+    }
 }
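
As a side note on the chunking arithmetic above, the sketch below is an illustration only (it is not part of the patch) and simply reproduces the same splitting rule in isolation: fixed 20 MB chunks with the final chunk truncated at the file length, so the number of ChunkMover tasks submitted equals the number of loop iterations. The file size used is a hypothetical example.

    public class ChunkSplitDemo {
        public static void main(String[] args) {
            long chunkSize = 20 * 1024 * 1024L;        // same constant as TransportMediator
            long fileLength = 45L * 1024 * 1024 + 17;  // hypothetical ~45 MB source file
            long offset = 0L;
            int chunkIdx = 0;
            while (offset < fileLength) {
                long endPos = Math.min(offset + chunkSize, fileLength);
                // each iteration corresponds to one ChunkMover working on [offset, endPos)
                System.out.printf("chunk %d -> bytes [%d, %d)%n", chunkIdx, offset, endPos);
                offset = endPos;
                chunkIdx++;
            }
            // prints three ranges: two full 20 MB chunks and a ~5 MB tail
        }
    }
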
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java
index 1b8930c..db7a444 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java
@@ -18,20 +18,30 @@
 package org.apache.airavata.mft.agent.http;
 
 import org.apache.airavata.mft.core.api.ConnectorConfig;
-import org.apache.airavata.mft.core.api.IncomingConnector;
+import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
 
 public class AgentHttpDownloadData {
-    private IncomingConnector incomingConnector;
+    private IncomingStreamingConnector incomingStreamingConnector;
+    private IncomingChunkedConnector incomingChunkedConnector;
     private ConnectorConfig connectorConfig;
     private String childResourcePath;
     private long createdTime = System.currentTimeMillis();
 
-    public IncomingConnector getIncomingConnector() {
-        return incomingConnector;
+    public IncomingStreamingConnector getIncomingStreamingConnector() {
+        return incomingStreamingConnector;
     }
 
-    public void setIncomingConnector(IncomingConnector incomingConnector) {
-        this.incomingConnector = incomingConnector;
+    public void setIncomingStreamingConnector(IncomingStreamingConnector incomingStreamingConnector) {
+        this.incomingStreamingConnector = incomingStreamingConnector;
+    }
+
+    public IncomingChunkedConnector getIncomingChunkedConnector() {
+        return incomingChunkedConnector;
+    }
+
+    public void setIncomingChunkedConnector(IncomingChunkedConnector incomingChunkedConnector) {
+        this.incomingChunkedConnector = incomingChunkedConnector;
     }
 
     public ConnectorConfig getConnectorConfig() {
@@ -60,7 +70,8 @@
 
 
     public static final class AgentHttpDownloadDataBuilder {
-        private IncomingConnector incomingConnector;
+        private IncomingStreamingConnector incomingStreamingConnector;
+        private IncomingChunkedConnector incomingChunkedConnector;
         private ConnectorConfig connectorConfig;
         private String childResourcePath;
         private long createdTime = System.currentTimeMillis();
@@ -72,8 +83,13 @@
             return new AgentHttpDownloadDataBuilder();
         }
 
-        public AgentHttpDownloadDataBuilder withIncomingConnector(IncomingConnector incomingConnector) {
-            this.incomingConnector = incomingConnector;
+        public AgentHttpDownloadDataBuilder withIncomingStreamingConnector(IncomingStreamingConnector incomingConnector) {
+            this.incomingStreamingConnector = incomingConnector;
+            return this;
+        }
+
+        public AgentHttpDownloadDataBuilder withIncomingChunkedConnector(IncomingChunkedConnector incomingConnector) {
+            this.incomingChunkedConnector = incomingConnector;
             return this;
         }
 
@@ -95,7 +111,8 @@
 
         public AgentHttpDownloadData build() {
             AgentHttpDownloadData agentHttpDownloadData = new AgentHttpDownloadData();
-            agentHttpDownloadData.setIncomingConnector(incomingConnector);
+            agentHttpDownloadData.setIncomingStreamingConnector(incomingStreamingConnector);
+            agentHttpDownloadData.setIncomingChunkedConnector(incomingChunkedConnector);
             agentHttpDownloadData.setConnectorConfig(connectorConfig);
             agentHttpDownloadData.setChildResourcePath(childResourcePath);
             agentHttpDownloadData.setCreatedTime(createdTime);
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
index 87153b4..655eb68 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
@@ -22,11 +22,7 @@
 import io.netty.handler.codec.http.*;
 import io.netty.handler.stream.ChunkedStream;
 import io.netty.util.CharsetUtil;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.*;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.core.api.IncomingConnector;
-import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,11 +84,17 @@
             ChannelFuture sendFileFuture;
             ChannelFuture lastContentFuture;
 
-            IncomingConnector incomingConnector = downloadData.getIncomingConnector();
-            incomingConnector.init(downloadData.getConnectorConfig());
+            // TODO: Support chunked streaming
+            if (downloadData.getIncomingStreamingConnector() == null && downloadData.getIncomingChunkedConnector() != null) {
+                logger.error("Chunked data download is not yet supported in Http transport");
+                throw new Exception("Chunked data download is not yet supported in Http transport");
+            }
+
+            IncomingStreamingConnector incomingStreamingConnector = downloadData.getIncomingStreamingConnector();
+            incomingStreamingConnector.init(downloadData.getConnectorConfig());
             InputStream inputStream = downloadData.getChildResourcePath().equals("")?
-                    incomingConnector.fetchInputStream() :
-                    incomingConnector.fetchInputStream(downloadData.getChildResourcePath());
+                    incomingStreamingConnector.fetchInputStream() :
+                    incomingStreamingConnector.fetchInputStream(downloadData.getChildResourcePath());
 
             sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedStream(inputStream)),
                     ctx.newProgressivePromise());
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java b/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
index 8e1e155..03466d4 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
@@ -28,9 +28,9 @@
 import org.apache.airavata.mft.core.DirectoryResourceMetadata;
 import org.apache.airavata.mft.core.FileResourceMetadata;
 import org.apache.airavata.mft.core.MetadataCollectorResolver;
-import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.core.api.ConnectorConfig;
-import org.apache.airavata.mft.core.api.IncomingConnector;
+import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -155,9 +155,10 @@
                 mftAuthorizationToken = tokenBuilder.build();
 
                 metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(storeType);
-                Optional<IncomingConnector> connectorOp = ConnectorResolver.resolveIncomingConnector(storeType);
+                Optional<IncomingStreamingConnector> connectorStreamingOp = ConnectorResolver.resolveIncomingStreamingConnector(storeType);
+                Optional<IncomingChunkedConnector> connectorChunkedOp = ConnectorResolver.resolveIncomingChunkedConnector(storeType);
 
-                if (metadataCollectorOp.isPresent() && connectorOp.isPresent()) {
+                if (metadataCollectorOp.isPresent() && (connectorStreamingOp.isPresent() || connectorChunkedOp.isPresent())) {
 
                     MetadataCollector metadataCollector = metadataCollectorOp.get();
                     metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
@@ -168,9 +169,8 @@
                             childResourcePath,
                             sourceToken);
 
-                    AgentHttpDownloadData downloadData = AgentHttpDownloadData.AgentHttpDownloadDataBuilder.newBuilder()
+                    AgentHttpDownloadData.AgentHttpDownloadDataBuilder agentHttpDownloadDataBuilder = AgentHttpDownloadData.AgentHttpDownloadDataBuilder.newBuilder()
                             .withChildResourcePath(childResourcePath)
-                            .withIncomingConnector(connectorOp.get())
                             .withConnectorConfig(ConnectorConfig.ConnectorConfigBuilder.newBuilder()
                                     .withResourceServiceHost(resourceServiceHost)
                                     .withResourceServicePort(resourceServicePort)
@@ -179,7 +179,12 @@
                                     .withResourceId(resourceId)
                                     .withCredentialToken(sourceToken)
                                     .withAuthToken(mftAuthorizationToken)
-                                    .withMetadata(fileResourceMetadata).build()).build();
+                                    .withMetadata(fileResourceMetadata).build());
+
+                    connectorStreamingOp.ifPresent(agentHttpDownloadDataBuilder::withIncomingStreamingConnector);
+                    connectorChunkedOp.ifPresent(agentHttpDownloadDataBuilder::withIncomingChunkedConnector);
+
+                    AgentHttpDownloadData downloadData = agentHttpDownloadDataBuilder.build();
 
                     String url = httpTransferRequestsStore.addDownloadRequest(downloadData);
 
diff --git a/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java b/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
index e6b5ef1..7a76bcd 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
@@ -17,14 +17,16 @@
 
 package org.apache.airavata.mft.core;
 
-import org.apache.airavata.mft.core.api.IncomingConnector;
-import org.apache.airavata.mft.core.api.OutgoingConnector;
+import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
+import org.apache.airavata.mft.core.api.OutgoingChunkedConnector;
+import org.apache.airavata.mft.core.api.OutgoingStreamingConnector;
 
 import java.util.Optional;
 
 public final class ConnectorResolver {
 
-    public static Optional<IncomingConnector> resolveIncomingConnector(String type) throws Exception {
+    public static Optional<IncomingStreamingConnector> resolveIncomingStreamingConnector(String type) throws Exception {
 
         String className = null;
         switch (type) {
@@ -35,26 +37,64 @@
 
         if (className != null) {
             Class<?> aClass = Class.forName(className);
-            return Optional.of((IncomingConnector) aClass.getDeclaredConstructor().newInstance());
+            return Optional.of((IncomingStreamingConnector) aClass.getDeclaredConstructor().newInstance());
         } else {
             return Optional.empty();
         }
     }
 
-    public static Optional<OutgoingConnector> resolveOutgoingConnector(String type) throws Exception {
+    public static Optional<OutgoingStreamingConnector> resolveOutgoingStreamingConnector(String type) throws Exception {
 
         String className = null;
         switch (type) {
             case "SCP":
                 className = "org.apache.airavata.mft.transport.scp.SCPOutgoingConnector";
                 break;
+            case "S3":
+                className = "org.apache.airavata.mft.transport.s3.S3OutgoingConnector";
+                break;
         }
 
         if (className != null) {
             Class<?> aClass = Class.forName(className);
-            return Optional.of((OutgoingConnector) aClass.getDeclaredConstructor().newInstance());
+            return Optional.of((OutgoingStreamingConnector) aClass.getDeclaredConstructor().newInstance());
         } else {
             return Optional.empty();
         }
     }
+
+    public static Optional<IncomingChunkedConnector> resolveIncomingChunkedConnector(String type) throws Exception {
+
+        String className = null;
+        switch (type) {
+            case "S3":
+                className = "org.apache.airavata.mft.transport.s3.S3IncomingConnector";
+                break;
+        }
+
+        if (className != null) {
+            Class<?> aClass = Class.forName(className);
+            return Optional.of((IncomingChunkedConnector) aClass.getDeclaredConstructor().newInstance());
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    public static Optional<OutgoingChunkedConnector> resolveOutgoingChunkedConnector(String type) throws Exception {
+
+        String className = null;
+        switch (type) {
+            case "S3":
+                className = "org.apache.airavata.mft.transport.s3.S3OutgoingConnector";
+                break;
+        }
+
+        if (className != null) {
+            Class<?> aClass = Class.forName(className);
+            return Optional.of((OutgoingChunkedConnector) aClass.getDeclaredConstructor().newInstance());
+        } else {
+            return Optional.empty();
+        }
+    }
+
 }
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java
new file mode 100644
index 0000000..1ca72d6
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java
@@ -0,0 +1,6 @@
+package org.apache.airavata.mft.core.api;
+
+public interface BasicConnector {
+    public void init(ConnectorConfig connectorConfig) throws Exception;
+    public void complete() throws Exception;
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java
new file mode 100644
index 0000000..37288c6
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java
@@ -0,0 +1,5 @@
+package org.apache.airavata.mft.core.api;
+
+public interface IncomingChunkedConnector extends BasicConnector {
+    public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception;
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java
similarity index 95%
rename from core/src/main/java/org/apache/airavata/mft/core/api/IncomingConnector.java
rename to core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java
index b805bf8..900b148 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java
@@ -19,7 +19,7 @@
 
 import java.io.InputStream;
 
-public interface IncomingConnector {
+public interface IncomingStreamingConnector {
     public void init(ConnectorConfig connectorConfig) throws Exception;
     public InputStream fetchInputStream() throws Exception;
     public InputStream fetchInputStream(String childPath) throws Exception;
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java
new file mode 100644
index 0000000..9a2f3cb
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java
@@ -0,0 +1,5 @@
+package org.apache.airavata.mft.core.api;
+
+public interface OutgoingChunkedConnector extends BasicConnector {
+    public void uploadChunk(int chunkId, long startByte, long endByte, String uploadFile) throws Exception;
+}
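
To make the contract of the two new chunked interfaces concrete, here is a hedged, hypothetical example that is not part of this patch: a pair of local-filesystem connectors in which downloadChunk() copies the byte range [startByte, endByte) of the source into the staging file handed over by TransportMediator, and uploadChunk() writes that staging file back into the destination at the same offset. The package name, class names, and the use of ConnectorConfig.getResourceId() as a plain file path are assumptions made purely for illustration; a real connector resolves paths and credentials through the resource and secret services as the S3 and SCP connectors do.

    // Hypothetical example, illustration only; not part of this patch.
    package org.apache.airavata.mft.examples;

    import org.apache.airavata.mft.core.api.ConnectorConfig;
    import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
    import org.apache.airavata.mft.core.api.OutgoingChunkedConnector;

    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class LocalChunkedConnectors {

        /** Copies the byte range [startByte, endByte) of a local source file into the staging file. */
        public static class LocalIncomingChunkedConnector implements IncomingChunkedConnector {

            private String sourcePath;

            @Override
            public void init(ConnectorConfig cc) throws Exception {
                this.sourcePath = cc.getResourceId(); // assumption: resource id is a local path
            }

            @Override
            public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception {
                try (FileChannel src = FileChannel.open(Paths.get(sourcePath), StandardOpenOption.READ);
                     FileChannel dst = FileChannel.open(Paths.get(downloadFile),
                             StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
                    long remaining = endByte - startByte;
                    long position = startByte;
                    while (remaining > 0) { // transferTo may move fewer bytes than requested
                        long moved = src.transferTo(position, remaining, dst);
                        position += moved;
                        remaining -= moved;
                    }
                }
            }

            @Override
            public void complete() throws Exception {
                // nothing to finalize for a local read
            }
        }

        /** Writes a staging file into the destination file at offset startByte. */
        public static class LocalOutgoingChunkedConnector implements OutgoingChunkedConnector {

            private String destinationPath;

            @Override
            public void init(ConnectorConfig cc) throws Exception {
                this.destinationPath = cc.getResourceId(); // assumption: resource id is a local path
            }

            @Override
            public void uploadChunk(int chunkId, long startByte, long endByte, String uploadFile) throws Exception {
                try (RandomAccessFile out = new RandomAccessFile(destinationPath, "rw");
                     FileChannel src = FileChannel.open(Paths.get(uploadFile), StandardOpenOption.READ)) {
                    out.seek(startByte); // position the destination channel at the chunk offset
                    long remaining = endByte - startByte;
                    long srcPos = 0;
                    while (remaining > 0) {
                        long moved = src.transferTo(srcPos, remaining, out.getChannel());
                        srcPos += moved;
                        remaining -= moved;
                    }
                }
            }

            @Override
            public void complete() throws Exception {
                // nothing to finalize for a local write
            }
        }
    }
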
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingStreamingConnector.java
similarity index 86%
rename from core/src/main/java/org/apache/airavata/mft/core/api/OutgoingConnector.java
rename to core/src/main/java/org/apache/airavata/mft/core/api/OutgoingStreamingConnector.java
index 9ab495c..1e2633d 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingStreamingConnector.java
@@ -19,9 +19,7 @@
 
 import java.io.OutputStream;
 
-public interface OutgoingConnector {
-    public void init(ConnectorConfig connectorConfig) throws Exception;
+public interface OutgoingStreamingConnector extends BasicConnector {
     public OutputStream fetchOutputStream() throws Exception;
     public OutputStream fetchOutputStream(String childPath) throws Exception;
-    public void complete() throws Exception;
 }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
new file mode 100644
index 0000000..03b745b
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
@@ -0,0 +1,96 @@
+package org.apache.airavata.mft.transport.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
+import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
+import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.GenericResource;
+import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InputStream;
+
+public class S3IncomingConnector implements IncomingChunkedConnector, IncomingStreamingConnector {
+
+    private static final Logger logger = LoggerFactory.getLogger(S3IncomingConnector.class);
+
+    private GenericResource resource;
+    private AmazonS3 s3Client;
+
+    @Override
+    public void init(ConnectorConfig cc) throws Exception {
+        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+                .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
+
+            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+                    .setAuthzToken(cc.getAuthToken())
+                    .setResourceId(cc.getResourceId()).build());
+        }
+
+        if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
+            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
+            throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+        }
+
+        S3Storage s3Storage = resource.getS3Storage();
+
+        S3Secret s3Secret;
+
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                cc.getSecretServiceHost(), cc.getSecretServicePort())) {
+
+            s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder()
+                    .setAuthzToken(cc.getAuthToken())
+                    .setSecretId(cc.getCredentialToken()).build());
+
+            BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
+
+            s3Client = AmazonS3ClientBuilder.standard()
+                    .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+                    .withRegion(s3Storage.getRegion())
+                    .build();
+        }
+    }
+
+
+    @Override
+    public InputStream fetchInputStream() throws Exception {
+        S3Object s3object = s3Client.getObject(resource.getS3Storage().getBucketName(), resource.getFile().getResourcePath());
+        return s3object.getObjectContent();
+    }
+
+    @Override
+    public InputStream fetchInputStream(String childPath) throws Exception {
+        S3Object s3object = s3Client.getObject(resource.getS3Storage().getBucketName(), childPath);
+        return s3object.getObjectContent();
+    }
+
+    @Override
+    public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception {
+        GetObjectRequest rangeObjectRequest = new GetObjectRequest(resource.getS3Storage().getBucketName(),
+                resource.getFile().getResourcePath());
+        rangeObjectRequest.setRange(startByte, endByte - 1);
+        ObjectMetadata objectMetadata = s3Client.getObject(rangeObjectRequest, new File(downloadFile));
+        logger.info("Downloaded S3 chunk to path {} for resource id {}", downloadFile, resource.getResourceId());
+    }
+
+    @Override
+    public void complete() throws Exception {
+
+    }
+}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
new file mode 100644
index 0000000..3e95383
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
@@ -0,0 +1,101 @@
+package org.apache.airavata.mft.transport.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.*;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.OutgoingChunkedConnector;
+import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
+import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.GenericResource;
+import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class S3OutgoingConnector implements OutgoingChunkedConnector {
+
+    private static final Logger logger = LoggerFactory.getLogger(S3OutgoingConnector.class);
+
+    private GenericResource resource;
+    private AmazonS3 s3Client;
+
+    InitiateMultipartUploadResult initResponse;
+    List<PartETag> partETags = Collections.synchronizedList(new ArrayList<>());
+
+    @Override
+    public void init(ConnectorConfig cc) throws Exception {
+        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+                .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
+
+            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+                    .setAuthzToken(cc.getAuthToken())
+                    .setResourceId(cc.getResourceId()).build());
+        }
+
+        if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
+            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
+            throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+        }
+
+        S3Storage s3Storage = resource.getS3Storage();
+
+        S3Secret s3Secret;
+
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                cc.getSecretServiceHost(), cc.getSecretServicePort())) {
+
+            s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder()
+                    .setAuthzToken(cc.getAuthToken())
+                    .setSecretId(cc.getCredentialToken()).build());
+
+            BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
+
+            s3Client = AmazonS3ClientBuilder.standard()
+                    .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+                    .withRegion(s3Storage.getRegion())
+                    .build();
+        }
+
+        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(resource.getS3Storage().getBucketName(),
+                resource.getFile().getResourcePath());
+        initResponse = s3Client.initiateMultipartUpload(initRequest);
+    }
+
+    @Override
+    public void uploadChunk(int chunkId, long startByte, long endByte, String uploadFile) throws Exception {
+        File file = new File(uploadFile);
+        UploadPartRequest uploadRequest = new UploadPartRequest()
+                .withBucketName(resource.getS3Storage().getBucketName())
+                .withKey(resource.getFile().getResourcePath())
+                .withUploadId(initResponse.getUploadId())
+                .withPartNumber(chunkId + 1)
+                .withFileOffset(0)
+                .withFile(file)
+                .withPartSize(file.length());
+
+        UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
+        this.partETags.add(uploadResult.getPartETag());
+        logger.info("Uploaded S3 chunk to path {} for resource id {}", uploadFile, resource.getResourceId());
+    }
+
+
+    @Override
+    public void complete() throws Exception {
+        CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(resource.getS3Storage().getBucketName(),
+                resource.getFile().getResourcePath(), initResponse.getUploadId(), partETags);
+        s3Client.completeMultipartUpload(compRequest);
+    }
+}
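
A note on the multipart flow above: S3 part numbers must start at 1, which is why uploadChunk() registers each staging file as part chunkId + 1, and every part except the last must be at least 5 MiB, so the 20 MB chunk size chosen in TransportMediator stays comfortably above that floor. The collected PartETags are what completeMultipartUpload() needs to stitch the parts together, which is why complete() must run only after every ChunkMover has finished.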
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
deleted file mode 100644
index a430323..0000000
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.transport.s3;
-
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.S3ObjectInputStream;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
-import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
-import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
-import org.apache.airavata.mft.secret.client.SecretServiceClient;
-import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-
-public class S3Receiver implements Connector {
-
-    private static final Logger logger = LoggerFactory.getLogger(S3Receiver.class);
-
-    private AmazonS3 s3Client;
-
-    private String resourceServiceHost;
-    private int resourceServicePort;
-    private String secretServiceHost;
-    private int secretServicePort;
-
-    @Override
-    public void init(String resourceServiceHost, int resourceServicePort,
-                     String secretServiceHost, int secretServicePort) throws Exception {
-
-        this.resourceServiceHost = resourceServiceHost;
-        this.resourceServicePort = resourceServicePort;
-        this.secretServiceHost = secretServiceHost;
-        this.secretServicePort = secretServicePort;
-    }
-
-    @Override
-    public void destroy() {
-
-    }
-
-    @Override
-    public void startStream(AuthToken authToken, String resourceId, String credentialToken, ConnectorContext context) throws Exception {
-
-        logger.info("Starting S3 Receiver stream for transfer {}", context.getTransferId());
-
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setResourceId(resourceId).build());
-
-        if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
-            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), resourceId);
-            throw new Exception("Invalid storage type specified for resource " + resourceId);
-        }
-
-        S3Storage s3Storage = resource.getS3Storage();
-
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        S3Secret s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
-        BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
-
-        s3Client = AmazonS3ClientBuilder.standard()
-                .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
-                .withRegion(s3Storage.getRegion())
-                .build();
-
-        S3Object s3object = s3Client.getObject(s3Storage.getBucketName(), resource.getFile().getResourcePath());
-        S3ObjectInputStream inputStream = s3object.getObjectContent();
-
-        OutputStream os = context.getStreamBuffer().getOutputStream();
-        int read;
-        long bytes = 0;
-        while ((read = inputStream.read()) != -1) {
-            bytes++;
-            os.write(read);
-        }
-        os.flush();
-        os.close();
-
-        logger.info("Completed S3 Receiver stream for transfer {}", context.getTransferId());
-    }
-
-    @Override
-    public void startStream(AuthToken authToken, String resourceId, String childResourcePath, String credentialToken,
-                            ConnectorContext context) throws Exception {
-        throw new UnsupportedOperationException();
-    }
-}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
deleted file mode 100644
index 0dec96d..0000000
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.transport.s3;
-
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
-import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
-import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
-import org.apache.airavata.mft.secret.client.SecretServiceClient;
-import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class S3Sender implements Connector {
-
-    private static final Logger logger = LoggerFactory.getLogger(S3Sender.class);
-
-    private AmazonS3 s3Client;
-
-    private String resourceServiceHost;
-    private int resourceServicePort;
-    private String secretServiceHost;
-    private int secretServicePort;
-
-    @Override
-    public void init(String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
-
-        this.resourceServiceHost = resourceServiceHost;
-        this.resourceServicePort = resourceServicePort;
-        this.secretServiceHost = secretServiceHost;
-        this.secretServicePort = secretServicePort;
-    }
-
-    @Override
-    public void destroy() {
-
-    }
-
-    @Override
-    public void startStream(AuthToken authToken, String resourceId, String credentialToken, ConnectorContext context) throws Exception {
-
-        logger.info("Starting S3 Sender stream for transfer {}", context.getTransferId());
-        logger.info("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
-
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setResourceId(resourceId).build());
-
-        if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
-            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), resourceId);
-            throw new Exception("Invalid storage type specified for resource " + resourceId);
-        }
-
-        S3Storage s3Storage = resource.getS3Storage();
-
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        S3Secret s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
-        BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
-
-        s3Client = AmazonS3ClientBuilder.standard()
-                .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
-                .withRegion(s3Storage.getRegion())
-                .build();
-
-        ObjectMetadata metadata = new ObjectMetadata();
-        metadata.setContentLength(context.getMetadata().getResourceSize());
-
-        s3Client.putObject(s3Storage.getBucketName(), resource.getFile().getResourcePath(),
-                context.getStreamBuffer().getInputStream(), metadata);
-        logger.info("Completed S3 Sender stream for transfer {}", context.getTransferId());
-    }
-
-    @Override
-    public void startStream(AuthToken authToken, String resourceId, String childResourcePath, String credentialToken,
-                            ConnectorContext context) throws Exception {
-        throw new UnsupportedOperationException();
-    }
-}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
index 9cbb590..d6bacdb 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
@@ -21,7 +21,7 @@
 import com.jcraft.jsch.ChannelExec;
 import com.jcraft.jsch.Session;
 import org.apache.airavata.mft.core.api.ConnectorConfig;
-import org.apache.airavata.mft.core.api.IncomingConnector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
 import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
 import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
@@ -38,7 +38,7 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 
-public final class SCPIncomingConnector implements IncomingConnector {
+public final class SCPIncomingConnector implements IncomingStreamingConnector {
 
     private static final Logger logger = LoggerFactory.getLogger(SCPIncomingConnector.class);
 
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
index 980019b..28ea5e0 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
@@ -21,7 +21,7 @@
 import com.jcraft.jsch.ChannelExec;
 import com.jcraft.jsch.Session;
 import org.apache.airavata.mft.core.api.ConnectorConfig;
-import org.apache.airavata.mft.core.api.OutgoingConnector;
+import org.apache.airavata.mft.core.api.OutgoingStreamingConnector;
 import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
 import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
@@ -38,7 +38,7 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 
-public final class SCPOutgoingConnector implements OutgoingConnector {
+public final class SCPOutgoingConnector implements OutgoingStreamingConnector {
 
     private static final Logger logger = LoggerFactory.getLogger(SCPOutgoingConnector.class);