exclude lof4j-slf4j-impl jar
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/SyncRPCClient.java b/admin/src/main/java/org/apache/airavata/mft/admin/SyncRPCClient.java
index 19fd30e..9e2e1f6 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/SyncRPCClient.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/SyncRPCClient.java
@@ -96,11 +96,13 @@
         this.responseQueueMap.put(request.getMessageId(), queue);
 
         try {
+            logger.info("Requesting sync request {} on agent {}", request.getRequestId(), request.getAgentId());
             this.mftConsulClient.sendSyncRPCToAgent(request.getAgentId(), request);
             SyncRPCResponse response = queue.poll(waitMs, TimeUnit.MILLISECONDS);
             if (response == null) {
                 throw new MFTConsulClientException("Timed out waiting for the response");
             }
+            logger.info("Completing sync request {} on agent {}", request.getRequestId(), request.getAgentId());
             return response;
         } finally {
             this.responseQueueMap.remove(request.getMessageId());
@@ -108,6 +110,6 @@
     }
 
     public SyncRPCResponse sendSyncRequest(SyncRPCRequest request) throws MFTConsulClientException, InterruptedException {
-        return sendSyncRequest(request, 10000);
+        return sendSyncRequest(request, 100000);
     }
 }
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCRequest.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCRequest.java
index 1aadc83..0d0bdff 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCRequest.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/rpc/SyncRPCRequest.java
@@ -19,8 +19,10 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 
 public class SyncRPCRequest {
+    private String requestId = UUID.randomUUID().toString();
     private String agentId;
     private String method;
     private Map<String, String> parameters;
@@ -72,6 +74,14 @@
         return this;
     }
 
+    public String getRequestId() {
+        return requestId;
+    }
+
+    public void setRequestId(String requestId) {
+        this.requestId = requestId;
+    }
+
     public static final class SyncRPCRequestBuilder {
         private String agentId;
         private String method;
diff --git a/agent/pom.xml b/agent/pom.xml
index 7d6765e..b7a2ce2 100644
--- a/agent/pom.xml
+++ b/agent/pom.xml
@@ -37,6 +37,12 @@
             <groupId>org.apache.airavata</groupId>
             <artifactId>mft-scp-transport</artifactId>
             <version>0.01-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.custos</groupId>
+                    <artifactId>custos-java-sdk</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
@@ -87,6 +93,10 @@
                     <groupId>com.fasterxml.jackson.core</groupId>
                     <artifactId>jackson-core</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -95,6 +105,16 @@
             <version>2.10.0</version>
         </dependency>
         <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.10.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-xml</artifactId>
+            <version>2.10.0</version>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>log4j-over-slf4j</artifactId>
             <version>${log4j.over.slf4j}</version>
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 5ccf25a..fc670c5 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -35,9 +35,9 @@
 import org.apache.airavata.mft.agent.rpc.RPCParser;
 import org.apache.airavata.mft.api.service.CallbackEndpoint;
 import org.apache.airavata.mft.api.service.TransferApiRequest;
-import org.apache.airavata.mft.core.ConnectorResolver;
+import org.apache.airavata.mft.core.FileResourceMetadata;
 import org.apache.airavata.mft.core.MetadataCollectorResolver;
-import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
@@ -65,8 +65,6 @@
 
     private static final Logger logger = LoggerFactory.getLogger(MFTAgent.class);
 
-    private final TransportMediator mediator = new TransportMediator();
-
     @org.springframework.beans.factory.annotation.Value("${agent.id}")
     private String agentId;
 
@@ -88,6 +86,9 @@
     @org.springframework.beans.factory.annotation.Value("${agent.supported.protocols}")
     private String supportedProtocols;
 
+    @org.springframework.beans.factory.annotation.Value("${agent.temp.data.dir}")
+    private String tempDataDir = "/tmp";
+
     @org.springframework.beans.factory.annotation.Value("${resource.service.host}")
     private String resourceServiceHost;
 
@@ -113,6 +114,9 @@
     private long sessionTTLSeconds = 10;
     private String session;
 
+    private TransportMediator mediator;
+
+
     private ObjectMapper mapper = new ObjectMapper();
 
     @Autowired
@@ -127,6 +131,7 @@
     public void init() {
         transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId);
         rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId);
+        mediator = new TransportMediator(tempDataDir);
     }
 
     private void acceptRPCRequests() {
@@ -172,14 +177,6 @@
                             .setPublisher(agentId)
                             .setDescription("Starting the transfer"));
 
-                        Optional<Connector> inConnectorOpt = ConnectorResolver.resolveConnector(request.getSourceType(), "IN");
-                        Connector inConnector = inConnectorOpt.orElseThrow(() -> new Exception("Could not find an in connector for given input"));
-                        inConnector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
-
-                        Optional<Connector> outConnectorOpt = ConnectorResolver.resolveConnector(request.getDestinationType(), "OUT");
-                        Connector outConnector = outConnectorOpt.orElseThrow(() -> new Exception("Could not find an out connector for given input"));
-                        outConnector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
-
                         Optional<MetadataCollector> srcMetadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getSourceType());
                         MetadataCollector srcMetadataCollector = srcMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for source"));
                         srcMetadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
@@ -188,6 +185,34 @@
                         MetadataCollector dstMetadataCollector = dstMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for destination"));
                         dstMetadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
 
+                        FileResourceMetadata srcMetadata = srcMetadataCollector.getFileResourceMetadata(
+                                request.getMftAuthorizationToken(),
+                                request.getSourceResourceId(),
+                                request.getSourceToken());
+
+
+                        ConnectorConfig srcCC = ConnectorConfig.ConnectorConfigBuilder.newBuilder()
+                                .withAuthToken(request.getMftAuthorizationToken())
+                                .withResourceServiceHost(resourceServiceHost)
+                                .withResourceServicePort(resourceServicePort)
+                                .withSecretServiceHost(secretServiceHost)
+                                .withSecretServicePort(secretServicePort)
+                                .withTransferId(transferId)
+                                .withResourceId(request.getSourceResourceId())
+                                .withCredentialToken(request.getSourceToken())
+                                .withMetadata(srcMetadata).build();
+
+                        ConnectorConfig dstCC = ConnectorConfig.ConnectorConfigBuilder.newBuilder()
+                                .withAuthToken(request.getMftAuthorizationToken())
+                                .withResourceServiceHost(resourceServiceHost)
+                                .withResourceServicePort(resourceServicePort)
+                                .withSecretServiceHost(secretServiceHost)
+                                .withSecretServicePort(secretServicePort)
+                                .withTransferId(transferId)
+                                .withResourceId(request.getDestinationResourceId())
+                                .withCredentialToken(request.getDestinationToken())
+                                .withMetadata(srcMetadata).build();
+
                         mftConsulClient.submitTransferStateToProcess(transferId, agentId, new TransferState()
                             .setState("STARTED")
                             .setPercentage(0)
@@ -195,13 +220,11 @@
                             .setPublisher(agentId)
                             .setDescription("Started the transfer"));
 
-
-                        TransferApiRequest finalRequest = request;
-                        mediator.transfer(transferId, request, inConnector, outConnector, srcMetadataCollector, dstMetadataCollector,
+                        mediator.transferSingleThread(transferId, request, srcCC, dstCC,
                                 (id, st) -> {
                                     try {
                                         mftConsulClient.submitTransferStateToProcess(id, agentId, st.setPublisher(agentId));
-                                        handleCallbacks(finalRequest.getCallbackEndpointsList(), id, st);
+
                                     } catch (MFTConsulClientException e) {
                                         logger.error("Failed while updating transfer state", e);
                                     }
@@ -213,8 +236,7 @@
                                     } catch (Exception e) {
                                         logger.error("Failed while deleting scheduled path for transfer {}", id);
                                     }
-                                }
-                        );
+                        });
 
                         logger.info("Started the transfer " + transferId);
 
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index b9195b8..b891b05 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -20,17 +20,18 @@
 import org.apache.airavata.mft.admin.models.TransferState;
 import org.apache.airavata.mft.api.service.TransferApiRequest;
 import org.apache.airavata.mft.core.*;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.core.api.*;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Optional;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 
 public class TransportMediator {
@@ -40,197 +41,209 @@
     /*
     Number of maximum transfers handled at atime
      */
-    private int concurrentTransfers = 10;
-    private ExecutorService executor = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 connections per transfer
-    private ExecutorService monitorPool = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 monitors per transfer
+    private final int concurrentTransfers = 10;
+    private final ExecutorService executor = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 connections per transfer
+    private final ExecutorService monitorPool = Executors.newFixedThreadPool(concurrentTransfers * 2); // 2 monitors per transfer
+
+    private String tempDataDir = "/tmp";
+
+    public TransportMediator(String tempDataDir) {
+        this.tempDataDir = tempDataDir;
+    }
+
+    public void transferSingleThread(String transferId,
+                                     TransferApiRequest request,
+                                     ConnectorConfig srcCC,
+                                     ConnectorConfig dstCC,
+                                     BiConsumer<String, TransferState> onStatusCallback,
+                                     BiConsumer<String, Boolean> exitingCallback) {
+
+        executor.submit(() -> {
+
+            final AtomicBoolean transferInProgress = new AtomicBoolean(true);
+
+            try {
+
+                long start = System.currentTimeMillis();
+
+                onStatusCallback.accept(transferId, new TransferState()
+                        .setPercentage(100)
+                        .setState("RUNNING")
+                        .setUpdateTimeMils(System.currentTimeMillis())
+                        .setDescription("Transfer successfully completed"));
+
+                Optional<IncomingStreamingConnector> inStreamingConnectorOp = ConnectorResolver
+                        .resolveIncomingStreamingConnector(request.getSourceType());
+                Optional<OutgoingStreamingConnector> outStreamingConnectorOp = ConnectorResolver
+                        .resolveOutgoingStreamingConnector(request.getDestinationType());
+
+                Optional<IncomingChunkedConnector> inChunkedConnectorOp = ConnectorResolver
+                        .resolveIncomingChunkedConnector(request.getSourceType());
+                Optional<OutgoingChunkedConnector> outChunkedConnectorOp = ConnectorResolver
+                        .resolveOutgoingChunkedConnector(request.getDestinationType());
+
+                // Give priority for chunked transfers.
+                // TODO: Provide a preference at the API level
+                if (inChunkedConnectorOp.isPresent() && outChunkedConnectorOp.isPresent()) {
+
+                    logger.info("Starting the chunked transfer for transfer {}", transferId);
+
+                    long chunkSize = 20 * 1024 * 1024L;
+
+                    ExecutorService chunkedExecutorService = Executors.newFixedThreadPool(20);
+
+                    CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(chunkedExecutorService);
+
+                    long fileLength = srcCC.getMetadata().getResourceSize();
+                    long uploadLength = 0L;
+                    int chunkIdx = 0;
+
+                    IncomingChunkedConnector inConnector = inChunkedConnectorOp
+                            .orElseThrow(() -> new Exception("Could not find an in chunked connector for type " + request.getSourceType()));
+
+                    OutgoingChunkedConnector outConnector = outChunkedConnectorOp
+                            .orElseThrow(() -> new Exception("Could not find an out chunked connector for type " + request.getDestinationType()));
+
+                    inConnector.init(srcCC);
+                    outConnector.init(dstCC);
+
+                    while(uploadLength < fileLength) {
+
+                        long endPos = uploadLength + chunkSize;
+                        if (endPos > fileLength) {
+                            endPos = fileLength;
+                        }
+
+                        String tempFile = tempDataDir + File.separator + transferId + "-" + chunkIdx;
+                        completionService.submit(new ChunkMover(inConnector, outConnector, uploadLength, endPos, chunkIdx, tempFile));
+
+                        uploadLength = endPos;
+                        chunkIdx++;
+                    }
+
+
+                    for (int i = 0; i < chunkIdx; i++) {
+                        Future<Integer> future = completionService.take();
+                    }
+
+                    logger.info("Completed chunked transfer for transfer {}", transferId);
+
+                } else if (inStreamingConnectorOp.isPresent() && outStreamingConnectorOp.isPresent()) {
+
+                    logger.info("Starting streaming transfer for transfer {}", transferId);
+                    IncomingStreamingConnector inConnector = inStreamingConnectorOp
+                            .orElseThrow(() -> new Exception("Could not find an in streaming connector for type " + request.getSourceType()));
+
+                    OutgoingStreamingConnector outConnector = outStreamingConnectorOp
+                            .orElseThrow(() -> new Exception("Could not find an out streaming connector for type " + request.getDestinationType()));
+
+                    inConnector.init(srcCC);
+                    outConnector.init(dstCC);
+
+                    String srcChild = request.getSourceChildResourcePath();
+                    String dstChild = request.getDestinationChildResourcePath();
+
+                    InputStream inputStream = srcChild.equals("") ? inConnector.fetchInputStream() : inConnector.fetchInputStream(srcChild);
+                    OutputStream outputStream = dstChild.equals("") ? outConnector.fetchOutputStream() : outConnector.fetchOutputStream(dstChild);
+
+                    long count = 0;
+                    final AtomicLong countAtomic = new AtomicLong();
+                    countAtomic.set(count);
+
+                    monitorPool.submit(() -> {
+                        while (true) {
+                            try {
+                                Thread.sleep(2000);
+                            } catch (InterruptedException e) {
+                                // Ignore
+                            }
+                            if (!transferInProgress.get()) {
+                                logger.info("Status monitor is exiting for transfer {}", transferId);
+                                break;
+                            }
+                            double transferPercentage = countAtomic.get() * 100.0 / srcCC.getMetadata().getResourceSize();
+                            logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
+                            onStatusCallback.accept(transferId, new TransferState()
+                                    .setPercentage(transferPercentage)
+                                    .setState("RUNNING")
+                                    .setUpdateTimeMils(System.currentTimeMillis())
+                                    .setDescription("Transfer Progress Updated"));
+                        }
+                    });
+
+                    int n;
+                    byte[] buffer = new byte[128 * 1024];
+                    for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) n) {
+                        outputStream.write(buffer, 0, n);
+                        countAtomic.set(count);
+                    }
+
+                    inConnector.complete();
+                    outConnector.complete();
+
+                    logger.info("Completed s ttreaming ransfer for transfer {}", transferId);
+
+                } else {
+                    throw new Exception("No matching connector found to perform the transfer");
+                }
+
+                long time = (System.currentTimeMillis() - start) / 1000;
+
+                logger.info("Transfer {} completed. Time {} S.  Speed {} MB/s", transferId, time,
+                        (srcCC.getMetadata().getResourceSize() * 1.0 / time) / (1024 * 1024));
+
+                onStatusCallback.accept(transferId, new TransferState()
+                        .setPercentage(100)
+                        .setState("COMPLETED")
+                        .setUpdateTimeMils(System.currentTimeMillis())
+                        .setDescription("Transfer successfully completed"));
+
+                exitingCallback.accept(transferId, true);
+            } catch (Exception e) {
+
+                logger.error("Transfer {} failed with error", transferId, e);
+
+                onStatusCallback.accept(transferId, new TransferState()
+                        .setPercentage(0)
+                        .setState("FAILED")
+                        .setUpdateTimeMils(System.currentTimeMillis())
+                        .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
+            } finally {
+                transferInProgress.set(false);
+            }
+        });
+    }
 
     public void destroy() {
         executor.shutdown();
+        monitorPool.shutdown();
     }
 
-    public void transfer(String transferId, TransferApiRequest request, Connector inConnector, Connector outConnector, MetadataCollector srcMetadataCollector,
-                           MetadataCollector destMetadataCollector, BiConsumer<String, TransferState> onStatusCallback,
-                           BiConsumer<String, Boolean> exitingCallback) throws Exception {
+    private static class ChunkMover implements Callable<Integer> {
 
-        FileResourceMetadata srcMetadata = srcMetadataCollector.getFileResourceMetadata(
-                            request.getMftAuthorizationToken(),
-                            request.getSourceResourceId(),
-                            request.getSourceToken());
+        IncomingChunkedConnector downloader;
+        OutgoingChunkedConnector uploader;
+        long uploadLength;
+        long endPos;
+        int chunkIdx;
+        String tempFile;
 
-        final long resourceSize = srcMetadata.getResourceSize();
-        logger.debug("Source file size {}. MD5 {}", resourceSize, srcMetadata.getMd5sum());
+        public ChunkMover(IncomingChunkedConnector downloader, OutgoingChunkedConnector uploader, long uploadLength,
+                          long endPos, int chunkIdx, String tempFile) {
+            this.downloader = downloader;
+            this.uploader = uploader;
+            this.uploadLength = uploadLength;
+            this.endPos = endPos;
+            this.chunkIdx = chunkIdx;
+            this.tempFile = tempFile;
+        }
 
-        final DoubleStreamingBuffer streamBuffer = new DoubleStreamingBuffer();
-        final ReentrantLock statusLock = new ReentrantLock();
-
-        ConnectorContext context = new ConnectorContext();
-        context.setMetadata(srcMetadata);
-        context.setStreamBuffer(streamBuffer);
-        context.setTransferId(transferId);
-
-        TransferTask recvTask = new TransferTask(request.getMftAuthorizationToken(), request.getSourceResourceId(),
-                request.getSourceChildResourcePath(), request.getSourceToken(), context, inConnector);
-        TransferTask sendTask = new TransferTask(request.getMftAuthorizationToken(), request.getDestinationResourceId(),
-                request.getDestinationChildResourcePath(), request.getDestinationToken(), context, outConnector);
-        List<Future<Integer>> futureList = new ArrayList<>();
-
-        ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
-
-        long startTime = System.nanoTime();
-
-        futureList.add(completionService.submit(recvTask));
-        futureList.add(completionService.submit(sendTask));
-
-        final AtomicBoolean transferInProgress = new AtomicBoolean(true);
-        final AtomicBoolean transferSuccess = new AtomicBoolean(true);
-
-
-        // Monitoring the completeness of the transfer
-        Thread monitorThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-
-                try {
-                    int futureCnt = futureList.size();
-                    boolean transferErrored = false;
-
-                    for (int i = 0; i < futureCnt; i++) {
-                        Future<Integer> ft = completionService.take();
-                        futureList.remove(ft);
-                        try {
-                            ft.get();
-                        } catch (Exception e) {
-
-                            logger.error("One task failed with error", e);
-                            transferErrored = true;
-                            statusLock.lock();
-                            onStatusCallback.accept(transferId, new TransferState()
-                                .setPercentage(0)
-                                .setState("FAILED")
-                                .setUpdateTimeMils(System.currentTimeMillis())
-                                .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
-                            transferInProgress.set(false);
-                            transferSuccess.set(false);
-                            statusLock.unlock();
-
-                            for (Future<Integer> f : futureList) {
-                                try {
-                                    Thread.sleep(1000);
-                                } catch (InterruptedException ex) {
-                                    logger.error("Sleep failed", e);
-                                }
-                                f.cancel(true);
-                            }
-                            futureList.clear();
-                        }
-                    }
-
-                    if (!transferErrored) {
-                        Boolean transferred = destMetadataCollector.isAvailable(
-                                request.getMftAuthorizationToken(),
-                                request.getDestinationResourceId(),
-                                request.getDestinationToken());
-
-
-                        if (!transferred) {
-                            logger.error("Transfer completed but resource is not available in destination");
-                            throw new Exception("Transfer completed but resource is not available in destination");
-                        }
-
-                        FileResourceMetadata destMetadata = destMetadataCollector.getFileResourceMetadata(
-                                request.getMftAuthorizationToken(),
-                                request.getDestinationResourceId(),
-                                request.getDestinationToken());
-
-
-                        boolean doIntegrityVerify = true;
-
-                        if (srcMetadata.getMd5sum() == null) {
-                            logger.warn("MD5 sum is not available for source resource. So this disables integrity verification");
-                            doIntegrityVerify = false;
-                        } else if (destMetadata.getMd5sum() == null) {
-                            logger.warn("MD5 sum is not available for destination resource. So this disables integrity verification");
-                            doIntegrityVerify = false;
-                        }
-
-                        if (doIntegrityVerify && !destMetadata.getMd5sum().equals(srcMetadata.getMd5sum())) {
-                            logger.error("Resource integrity violated. MD5 sums are not matching. Source md5 {} destination md5 {}",
-                                    srcMetadata.getMd5sum(), destMetadata.getMd5sum());
-                            throw new Exception("Resource integrity violated. MD5 sums are not matching. Source md5 " + srcMetadata.getMd5sum()
-                                    + " destination md5 " + destMetadata.getMd5sum());
-                        }
-
-                        // Check
-
-                        long endTime = System.nanoTime();
-
-                        double time = (endTime - startTime) * 1.0 / 1000000000;
-
-                        statusLock.lock();
-                        onStatusCallback.accept(transferId, new TransferState()
-                                .setPercentage(100)
-                                .setState("COMPLETED")
-                                .setUpdateTimeMils(System.currentTimeMillis())
-                                .setDescription("Transfer successfully completed"));
-                        transferInProgress.set(false);
-                        transferSuccess.set(true);
-                        statusLock.unlock();
-
-                        logger.info("Transfer {} completed.  Speed {} MB/s", transferId,
-                                (srcMetadata.getResourceSize() * 1.0 / time) / (1024 * 1024));
-                    }
-                } catch (Exception e) {
-
-                    statusLock.lock();
-                    onStatusCallback.accept(transferId, new TransferState()
-                            .setPercentage(0)
-                            .setState("FAILED")
-                            .setUpdateTimeMils(System.currentTimeMillis())
-                            .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e)));
-                    transferInProgress.set(false);
-                    transferSuccess.set(false);
-                    statusLock.unlock();
-
-                    logger.error("Transfer {} failed", transferId, e);
-                } finally {
-                    inConnector.destroy();
-                    outConnector.destroy();
-                    transferInProgress.set(false);
-                    exitingCallback.accept(transferId,transferSuccess.get());
-                }
-            }
-        });
-
-        // Monitoring the status of the transfer
-        Thread progressThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                while (true) {
-
-                    try {
-                        Thread.sleep(2000);
-                    } catch (InterruptedException e) {
-                        // Ignore
-                    }
-                    statusLock.lock();
-                    if (!transferInProgress.get()){
-                        statusLock.unlock();
-                        logger.info("Status monitor is exiting for transfer {}", transferId);
-                        break;
-                    }
-                    double transferPercentage = streamBuffer.getProcessedBytes() * 100.0/ resourceSize;
-                    logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage);
-                    onStatusCallback.accept(transferId, new TransferState()
-                            .setPercentage(transferPercentage)
-                            .setState("RUNNING")
-                            .setUpdateTimeMils(System.currentTimeMillis())
-                            .setDescription("Transfer Progress Updated"));
-                    statusLock.unlock();
-                }
-            }
-        });
-
-        monitorPool.submit(monitorThread);
-        monitorPool.submit(progressThread);
+        @Override
+        public Integer call() throws Exception {
+            downloader.downloadChunk(chunkIdx, uploadLength, endPos, tempFile);
+            uploader.uploadChunk(chunkIdx, uploadLength, endPos, tempFile);
+            new File(tempFile).delete();
+            return chunkIdx;
+        }
     }
 }
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java
new file mode 100644
index 0000000..db7a444
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/AgentHttpDownloadData.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.http;
+
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
+
+public class AgentHttpDownloadData {
+    private IncomingStreamingConnector incomingStreamingConnector;
+    private IncomingChunkedConnector incomingChunkedConnector;
+    private ConnectorConfig connectorConfig;
+    private String childResourcePath;
+    private long createdTime = System.currentTimeMillis();
+
+    public IncomingStreamingConnector getIncomingStreamingConnector() {
+        return incomingStreamingConnector;
+    }
+
+    public void setIncomingStreamingConnector(IncomingStreamingConnector incomingStreamingConnector) {
+        this.incomingStreamingConnector = incomingStreamingConnector;
+    }
+
+    public IncomingChunkedConnector getIncomingChunkedConnector() {
+        return incomingChunkedConnector;
+    }
+
+    public void setIncomingChunkedConnector(IncomingChunkedConnector incomingChunkedConnector) {
+        this.incomingChunkedConnector = incomingChunkedConnector;
+    }
+
+    public ConnectorConfig getConnectorConfig() {
+        return connectorConfig;
+    }
+
+    public void setConnectorConfig(ConnectorConfig connectorConfig) {
+        this.connectorConfig = connectorConfig;
+    }
+
+    public String getChildResourcePath() {
+        return childResourcePath;
+    }
+
+    public void setChildResourcePath(String childResourcePath) {
+        this.childResourcePath = childResourcePath;
+    }
+
+    public long getCreatedTime() {
+        return createdTime;
+    }
+
+    public void setCreatedTime(long createdTime) {
+        this.createdTime = createdTime;
+    }
+
+
+    public static final class AgentHttpDownloadDataBuilder {
+        private IncomingStreamingConnector incomingStreamingConnector;
+        private IncomingChunkedConnector incomingChunkedConnector;
+        private ConnectorConfig connectorConfig;
+        private String childResourcePath;
+        private long createdTime = System.currentTimeMillis();
+
+        private AgentHttpDownloadDataBuilder() {
+        }
+
+        public static AgentHttpDownloadDataBuilder newBuilder() {
+            return new AgentHttpDownloadDataBuilder();
+        }
+
+        public AgentHttpDownloadDataBuilder withIncomingStreamingConnector(IncomingStreamingConnector incomingConnector) {
+            this.incomingStreamingConnector = incomingConnector;
+            return this;
+        }
+
+        public AgentHttpDownloadDataBuilder withIncomingChunkedConnector(IncomingChunkedConnector incomingConnector) {
+            this.incomingChunkedConnector = incomingConnector;
+            return this;
+        }
+
+        public AgentHttpDownloadDataBuilder withConnectorConfig(ConnectorConfig connectorConfig) {
+            this.connectorConfig = connectorConfig;
+            return this;
+        }
+
+        public AgentHttpDownloadDataBuilder withChildResourcePath(String childResourcePath) {
+            this.childResourcePath = childResourcePath;
+            return this;
+        }
+
+        public AgentHttpDownloadDataBuilder withCreatedTime(long createdTime) {
+            this.createdTime = createdTime;
+            return this;
+        }
+
+
+        public AgentHttpDownloadData build() {
+            AgentHttpDownloadData agentHttpDownloadData = new AgentHttpDownloadData();
+            agentHttpDownloadData.setIncomingStreamingConnector(incomingStreamingConnector);
+            agentHttpDownloadData.setIncomingChunkedConnector(incomingChunkedConnector);
+            agentHttpDownloadData.setConnectorConfig(connectorConfig);
+            agentHttpDownloadData.setChildResourcePath(childResourcePath);
+            agentHttpDownloadData.setCreatedTime(createdTime);
+            return agentHttpDownloadData;
+        }
+    }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/ConnectorParams.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/ConnectorParams.java
deleted file mode 100644
index 99a4b38..0000000
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/ConnectorParams.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.agent.http;
-
-public class ConnectorParams {
-
-    private String resourceServiceHost, secretServiceHost;
-    private int resourceServicePort, secretServicePort;
-
-    public String getResourceServiceHost() {
-        return resourceServiceHost;
-    }
-
-    public ConnectorParams setResourceServiceHost(String resourceServiceHost) {
-        this.resourceServiceHost = resourceServiceHost;
-        return this;
-    }
-
-    public String getSecretServiceHost() {
-        return secretServiceHost;
-    }
-
-    public ConnectorParams setSecretServiceHost(String secretServiceHost) {
-        this.secretServiceHost = secretServiceHost;
-        return this;
-    }
-
-    public int getResourceServicePort() {
-        return resourceServicePort;
-    }
-
-    public ConnectorParams setResourceServicePort(int resourceServicePort) {
-        this.resourceServicePort = resourceServicePort;
-        return this;
-    }
-
-    public int getSecretServicePort() {
-        return secretServicePort;
-    }
-
-    public ConnectorParams setSecretServicePort(int secretServicePort) {
-        this.secretServicePort = secretServicePort;
-        return this;
-    }
-}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpDownloadRequest.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpDownloadRequest.java
deleted file mode 100644
index 2cf56af..0000000
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpDownloadRequest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.agent.http;
-
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.core.api.MetadataCollector;
-
-public class HttpDownloadRequest {
-
-    private ConnectorParams connectorParams;
-    private Connector srcConnector;
-    private MetadataCollector srcMetadataCollector;
-    private String srcResourceId;
-    private String srcToken;
-
-    public ConnectorParams getConnectorParams() {
-        return connectorParams;
-    }
-
-    public HttpDownloadRequest setConnectorParams(ConnectorParams connectorParams) {
-        this.connectorParams = connectorParams;
-        return this;
-    }
-
-    public Connector getSrcConnector() {
-        return srcConnector;
-    }
-
-    public HttpDownloadRequest setSrcConnector(Connector srcConnector) {
-        this.srcConnector = srcConnector;
-        return this;
-    }
-
-    public MetadataCollector getSrcMetadataCollector() {
-        return srcMetadataCollector;
-    }
-
-    public HttpDownloadRequest setSrcMetadataCollector(MetadataCollector srcMetadataCollector) {
-        this.srcMetadataCollector = srcMetadataCollector;
-        return this;
-    }
-
-    public String getSrcResourceId() {
-        return srcResourceId;
-    }
-
-    public HttpDownloadRequest setSrcResourceId(String srcResourceId) {
-        this.srcResourceId = srcResourceId;
-        return this;
-    }
-
-    public String getSrcToken() {
-        return srcToken;
-    }
-
-    public HttpDownloadRequest setSrcToken(String srcToken) {
-        this.srcToken = srcToken;
-        return this;
-    }
-}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
index 07c0191..655eb68 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
@@ -22,17 +22,11 @@
 import io.netty.handler.codec.http.*;
 import io.netty.handler.stream.ChunkedStream;
 import io.netty.util.CharsetUtil;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.*;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.activation.MimetypesFileTypeMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.io.InputStream;
 
 import static io.netty.handler.codec.http.HttpMethod.GET;
 import static io.netty.handler.codec.http.HttpResponseStatus.*;
@@ -43,7 +37,6 @@
     private static final Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);
 
     private final HttpTransferRequestsStore transferRequestsStore;
-    private final ExecutorService executor = Executors.newFixedThreadPool(10);
 
     public HttpServerHandler(HttpTransferRequestsStore transferRequestsStore) {
         this.transferRequestsStore = transferRequestsStore;
@@ -51,111 +44,89 @@
 
     @Override
     public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
-        if (!request.decoderResult().isSuccess()) {
-            sendError(ctx, BAD_REQUEST);
-            return;
-        }
 
-        if (request.method() != GET) {
-            sendError(ctx, METHOD_NOT_ALLOWED);
-            return;
-        }
+        try {
+            if (!request.decoderResult().isSuccess()) {
+                sendError(ctx, BAD_REQUEST);
+                return;
+            }
 
-        final String uri = request.uri().substring(1);
-        logger.info("Received download request through url {}", uri);
+            if (request.method() != GET) {
+                sendError(ctx, METHOD_NOT_ALLOWED);
+                return;
+            }
 
-        HttpTransferRequest httpTransferRequest = transferRequestsStore.getDownloadRequest(uri);
+            final String uri = request.uri().substring(request.uri().lastIndexOf("/") + 1);
+            logger.info("Received download request through url {}", uri);
 
-        if (httpTransferRequest == null) {
-            logger.error("Couldn't find transfer request for uri {}", uri);
-            sendError(ctx, NOT_FOUND);
-            return;
-        }
+            AgentHttpDownloadData downloadData = transferRequestsStore.getDownloadRequest(uri);
 
-        Connector connector = httpTransferRequest.getOtherConnector();
-        MetadataCollector metadataCollector = httpTransferRequest.getOtherMetadataCollector();
+            if (downloadData == null) {
+                logger.error("Couldn't find transfer request for uri {}", uri);
+                sendError(ctx, NOT_FOUND);
+                return;
+            }
 
-        ConnectorParams params = httpTransferRequest.getConnectorParams();
+            long fileLength = downloadData.getConnectorConfig().getMetadata().getResourceSize();
 
-        // TODO Load from HTTP Headers
-        AuthToken authToken = httpTransferRequest.getAuthToken();
+            HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+            HttpUtil.setContentLength(response, fileLength);
+            setContentTypeHeader(response, downloadData.getConnectorConfig().getMetadata().getFriendlyName());
 
-        connector.init(params.getResourceServiceHost(),
-                params.getResourceServicePort(), params.getSecretServiceHost(), params.getSecretServicePort());
+            if (HttpUtil.isKeepAlive(request)) {
+                response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
+            }
 
-        metadataCollector.init(params.getResourceServiceHost(), params.getResourceServicePort(),
-                params.getSecretServiceHost(), params.getSecretServicePort());
+            // Write the initial line and the header.
+            ctx.write(response);
 
-        Boolean available = metadataCollector.isAvailable(authToken,
-                httpTransferRequest.getResourceId(), httpTransferRequest.getCredentialToken());
+            // Write the content.
+            ChannelFuture sendFileFuture;
+            ChannelFuture lastContentFuture;
 
+            // TODO: Support chunked streaming
+            if (downloadData.getIncomingStreamingConnector() == null && downloadData.getIncomingChunkedConnector() != null) {
+                logger.error("Chunked data download is not yes supported in Http transport");
+                throw new Exception("Chunked data download is not yes supported in Http transport");
+            }
 
-        if (!available) {
-            logger.error("File resource {} is not available", httpTransferRequest.getResourceId());
-            sendError(ctx, NOT_FOUND);
-            return;
-        }
+            IncomingStreamingConnector incomingStreamingConnector = downloadData.getIncomingStreamingConnector();
+            incomingStreamingConnector.init(downloadData.getConnectorConfig());
+            InputStream inputStream = downloadData.getChildResourcePath().equals("")?
+                    incomingStreamingConnector.fetchInputStream() :
+                    incomingStreamingConnector.fetchInputStream(downloadData.getChildResourcePath());
 
-        FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(authToken,
-                httpTransferRequest.getResourceId(),
-                httpTransferRequest.getChildResourcePath(),
-                httpTransferRequest.getCredentialToken());
+            sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedStream(inputStream)),
+                    ctx.newProgressivePromise());
 
-        long fileLength = fileResourceMetadata.getResourceSize();
+            // HttpChunkedInput will write the end marker (LastHttpContent) for us.
+            lastContentFuture = sendFileFuture;
 
-        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-        HttpUtil.setContentLength(response, fileLength);
-        setContentTypeHeader(response, httpTransferRequest.getResourceId());
-
-        if (HttpUtil.isKeepAlive(request)) {
-            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
-        }
-
-        // Write the initial line and the header.
-        ctx.write(response);
-
-        // Write the content.
-        ChannelFuture sendFileFuture;
-        ChannelFuture lastContentFuture;
-
-        ConnectorContext connectorContext = new ConnectorContext();
-        connectorContext.setStreamBuffer(new DoubleStreamingBuffer());
-        connectorContext.setTransferId(uri);
-        connectorContext.setMetadata(new FileResourceMetadata()); // TODO Resolve
-
-        TransferTask pullTask = new TransferTask(authToken, httpTransferRequest.getResourceId(),
-                httpTransferRequest.getChildResourcePath(), httpTransferRequest.getCredentialToken(),
-                connectorContext, connector);
-
-        // TODO aggregate pullStatusFuture and sendFileFuture for keepalive test
-        Future<Integer> pullStatusFuture = executor.submit(pullTask);
-
-        sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedStream(connectorContext.getStreamBuffer().getInputStream())),
-                ctx.newProgressivePromise());
-
-        // HttpChunkedInput will write the end marker (LastHttpContent) for us.
-        lastContentFuture = sendFileFuture;
-
-        sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
-            @Override
-            public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
-                if (total < 0) { // total unknown
-                    logger.error(future.channel() + " Transfer progress: " + progress);
-                } else {
-                    logger.error(future.channel() + " Transfer progress: " + progress + " / " + total);
+            sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
+                @Override
+                public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
+                    if (total < 0) { // total unknown
+                        logger.debug(future.channel() + " Transfer progress: " + progress);
+                    } else {
+                        logger.debug(future.channel() + " Transfer progress: " + progress + " / " + total);
+                    }
                 }
+
+                @Override
+                public void operationComplete(ChannelProgressiveFuture future) {
+                    logger.info(future.channel() + " Transfer complete.");
+                }
+            });
+
+            // Decide whether to close the connection or not.
+            if (!HttpUtil.isKeepAlive(request)) {
+                // Close the connection when the whole content is written out.
+                lastContentFuture.addListener(ChannelFutureListener.CLOSE);
             }
 
-            @Override
-            public void operationComplete(ChannelProgressiveFuture future) {
-                System.err.println(future.channel() + " Transfer complete.");
-            }
-        });
-
-        // Decide whether to close the connection or not.
-        if (!HttpUtil.isKeepAlive(request)) {
-            // Close the connection when the whole content is written out.
-            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+        } catch (Exception e) {
+            logger.error("Errored while processing HTTP download request", e);
+            sendError(ctx, INTERNAL_SERVER_ERROR);
         }
     }
 
@@ -185,7 +156,7 @@
     }
 
     private static void setContentTypeHeader(HttpResponse response, String path) {
-        MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
-        response.headers().set(HttpHeaderNames.CONTENT_TYPE, path);
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream");
+        response.headers().set(HttpHeaderNames.CONTENT_DISPOSITION, "attachment; filename=\"" + path+ "\"");
     }
 }
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequest.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequest.java
deleted file mode 100644
index c6d5192..0000000
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.agent.http;
-
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.core.api.MetadataCollector;
-
-public class HttpTransferRequest {
-    private Connector otherConnector;
-    private MetadataCollector otherMetadataCollector;
-    private ConnectorParams connectorParams;
-    private String resourceId;
-    private String childResourcePath;
-    private String credentialToken;
-    private long createdTime = System.currentTimeMillis();
-    private AuthToken authToken;
-
-    public Connector getOtherConnector() {
-        return otherConnector;
-    }
-
-    public HttpTransferRequest setOtherConnector(Connector otherConnector) {
-        this.otherConnector = otherConnector;
-        return this;
-    }
-
-    public MetadataCollector getOtherMetadataCollector() {
-        return otherMetadataCollector;
-    }
-
-    public HttpTransferRequest setOtherMetadataCollector(MetadataCollector otherMetadataCollector) {
-        this.otherMetadataCollector = otherMetadataCollector;
-        return this;
-    }
-
-    public String getResourceId() {
-        return resourceId;
-    }
-
-    public HttpTransferRequest setResourceId(String resourceId) {
-        this.resourceId = resourceId;
-        return this;
-    }
-
-    public String getChildResourcePath() {
-        return childResourcePath;
-    }
-
-    public HttpTransferRequest setChildResourcePath(String childResourcePath) {
-        this.childResourcePath = childResourcePath;
-        return this;
-    }
-
-    public String getCredentialToken() {
-        return credentialToken;
-    }
-
-    public HttpTransferRequest setCredentialToken(String credentialToken) {
-        this.credentialToken = credentialToken;
-        return this;
-    }
-
-    public ConnectorParams getConnectorParams() {
-        return connectorParams;
-    }
-
-    public HttpTransferRequest setConnectorParams(ConnectorParams connectorParams) {
-        this.connectorParams = connectorParams;
-        return this;
-    }
-
-    public long getCreatedTime() {
-        return createdTime;
-    }
-
-    public HttpTransferRequest setCreatedTime(long createdTime) {
-        this.createdTime = createdTime;
-        return this;
-    }
-
-    public AuthToken getAuthToken() {
-        return authToken;
-    }
-
-    public HttpTransferRequest setAuthToken(AuthToken authToken) {
-        this.authToken = authToken;
-        return this;
-    }
-}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java
index dd2f17e..56c468b 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java
@@ -31,8 +31,8 @@
 
     private static final Logger logger = LoggerFactory.getLogger(HttpTransferRequestsStore.class);
 
-    final private Map<String, HttpTransferRequest> downloadRequestStore = new ConcurrentHashMap<>();
-    final private Map<String, HttpTransferRequest> uploadRequestStore = new ConcurrentHashMap<>();
+    final private Map<String, AgentHttpDownloadData> downloadRequestStore = new ConcurrentHashMap<>();
+    final private Map<String, AgentHttpDownloadData> uploadRequestStore = new ConcurrentHashMap<>();
 
     final private ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
     private long entryExpiryTimeMS = 300 * 1000;
@@ -56,32 +56,32 @@
         }, 2, 10, TimeUnit.SECONDS);
     }
 
-    public String addDownloadRequest(HttpTransferRequest request) {
+    public String addDownloadRequest(AgentHttpDownloadData request) {
         String randomUrl = UUID.randomUUID().toString();
         downloadRequestStore.put(randomUrl, request);
         return randomUrl;
     }
 
-    public HttpTransferRequest getDownloadRequest(String url) {
+    public AgentHttpDownloadData getDownloadRequest(String url) {
 
         //TODO  Need to block concurrent calls to same url as connectors are not thread safe
-        HttpTransferRequest request = downloadRequestStore.get(url);
+        AgentHttpDownloadData request = downloadRequestStore.get(url);
         if (request != null) {
             downloadRequestStore.remove(url);
         }
         return request;
     }
 
-    public String addUploadRequest(HttpTransferRequest request) {
+    public String addUploadRequest(AgentHttpDownloadData request) {
         String randomUrl = UUID.randomUUID().toString();
         uploadRequestStore.put(randomUrl, request);
         return randomUrl;
     }
 
-    public HttpTransferRequest getUploadRequest(String url) {
+    public AgentHttpDownloadData getUploadRequest(String url) {
 
         //TODO  Need to block concurrent calls to same url as connectors are not thread safe
-        HttpTransferRequest request = uploadRequestStore.get(url);
+        AgentHttpDownloadData request = uploadRequestStore.get(url);
         if (request != null) {
             uploadRequestStore.remove(url);
         }
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java b/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
index 7cf638b..03466d4 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
@@ -21,15 +21,16 @@
 import com.google.protobuf.util.JsonFormat;
 import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
 import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
-import org.apache.airavata.mft.agent.http.ConnectorParams;
-import org.apache.airavata.mft.agent.http.HttpTransferRequest;
+import org.apache.airavata.mft.agent.http.AgentHttpDownloadData;
 import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
 import org.apache.airavata.mft.common.AuthToken;
 import org.apache.airavata.mft.core.ConnectorResolver;
 import org.apache.airavata.mft.core.DirectoryResourceMetadata;
 import org.apache.airavata.mft.core.FileResourceMetadata;
 import org.apache.airavata.mft.core.MetadataCollectorResolver;
-import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,14 +54,8 @@
     @org.springframework.beans.factory.annotation.Value("${secret.service.port}")
     private int secretServicePort;
 
-    @org.springframework.beans.factory.annotation.Value("${agent.advertised.host}")
-    private String agentAdvertisedHost;
-
-    @org.springframework.beans.factory.annotation.Value("${agent.http.port}")
-    private Integer agentHttpPort;
-
-    @org.springframework.beans.factory.annotation.Value("${agent.https.enabled}")
-    private boolean agentHttpsEnabled;
+    @org.springframework.beans.factory.annotation.Value("${agent.advertised.url}")
+    private String agentAdvertisedUrl;
 
     @Autowired
     private HttpTransferRequestsStore httpTransferRequestsStore;
@@ -68,6 +63,7 @@
     public String resolveRPCRequest(SyncRPCRequest request) throws Exception {
         // TODO implement using the reflection
         ObjectMapper mapper = new ObjectMapper();
+        logger.info("Accepting sync request {} for method {}", request.getRequestId(), request.getMethod());
 
         switch (request.getMethod()) {
             case "getFileResourceMetadata":
@@ -159,25 +155,44 @@
                 mftAuthorizationToken = tokenBuilder.build();
 
                 metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(storeType);
-                Optional<Connector> connectorOp = ConnectorResolver.resolveConnector(storeType, "IN");
+                Optional<IncomingStreamingConnector> connectorStreamingOp = ConnectorResolver.resolveIncomingStreamingConnector(storeType);
+                Optional<IncomingChunkedConnector> connectorChunkedOp = ConnectorResolver.resolveIncomingChunkedConnector(storeType);
 
-                if (metadataCollectorOp.isPresent() && connectorOp.isPresent()) {
-                    HttpTransferRequest transferRequest = new HttpTransferRequest()
-                            .setConnectorParams(new ConnectorParams()
-                                .setResourceServiceHost(resourceServiceHost)
-                                .setResourceServicePort(resourceServicePort)
-                                .setSecretServiceHost(secretServiceHost)
-                                .setSecretServicePort(secretServicePort))
-                            .setResourceId(resourceId)
-                            .setChildResourcePath(childResourcePath)
-                            .setCredentialToken(sourceToken)
-                            .setOtherMetadataCollector(metadataCollectorOp.get())
-                            .setOtherConnector(connectorOp.get())
-                            .setAuthToken(mftAuthorizationToken);
-                    String url = httpTransferRequestsStore.addDownloadRequest(transferRequest);
-                    return (agentHttpsEnabled? "https": "http") + "://" + agentAdvertisedHost + ":" + agentHttpPort + "/" + url;
+                if (metadataCollectorOp.isPresent() && (connectorStreamingOp.isPresent() || connectorChunkedOp.isPresent())) {
+
+                    MetadataCollector metadataCollector = metadataCollectorOp.get();
+                    metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
+
+                    FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(
+                            mftAuthorizationToken,
+                            resourceId,
+                            childResourcePath,
+                            sourceToken);
+
+                    AgentHttpDownloadData.AgentHttpDownloadDataBuilder agentHttpDownloadDataBuilder = AgentHttpDownloadData.AgentHttpDownloadDataBuilder.newBuilder()
+                            .withChildResourcePath(childResourcePath)
+                            .withConnectorConfig(ConnectorConfig.ConnectorConfigBuilder.newBuilder()
+                                    .withResourceServiceHost(resourceServiceHost)
+                                    .withResourceServicePort(resourceServicePort)
+                                    .withSecretServiceHost(secretServiceHost)
+                                    .withSecretServicePort(secretServicePort)
+                                    .withResourceId(resourceId)
+                                    .withCredentialToken(sourceToken)
+                                    .withAuthToken(mftAuthorizationToken)
+                                    .withMetadata(fileResourceMetadata).build());
+
+                    connectorStreamingOp.ifPresent(agentHttpDownloadDataBuilder::withIncomingStreamingConnector);
+                    connectorChunkedOp.ifPresent(agentHttpDownloadDataBuilder::withIncomingChunkedConnector);
+
+                    AgentHttpDownloadData downloadData = agentHttpDownloadDataBuilder.build();
+
+                    String url = httpTransferRequestsStore.addDownloadRequest(downloadData);
+
+                    return (agentAdvertisedUrl.endsWith("/")? agentAdvertisedUrl : agentAdvertisedUrl + "/") + url;
+                } else {
+                    logger.error("Medata collector or connector is not available for store type {}", storeType);
+                    throw new Exception("Medata collector or connector is not available for store type " + storeType);
                 }
-                break;
         }
         logger.error("Unknown method type specified {}", request.getMethod());
         throw new Exception("Unknown method " + request.getMethod());
diff --git a/agent/src/main/resources/application.properties b/agent/src/main/resources/application.properties
index d5949da..4d8d87c 100644
--- a/agent/src/main/resources/application.properties
+++ b/agent/src/main/resources/application.properties
@@ -31,4 +31,5 @@
 resource.service.host=localhost
 resource.service.port=7002
 secret.service.host=localhost
-secret.service.port=7003
\ No newline at end of file
+secret.service.port=7003
+agent.advertised.url=http://localhost:3333
\ No newline at end of file
diff --git a/agent/src/main/resources/distribution/conf/application.properties b/agent/src/main/resources/distribution/conf/application.properties
index d5949da..1b00a6c 100644
--- a/agent/src/main/resources/distribution/conf/application.properties
+++ b/agent/src/main/resources/distribution/conf/application.properties
@@ -19,7 +19,7 @@
 agent.id=agent0
 agent.secret=CHANGE_ME
 agent.host=localhost
-agent.advertised.host=localhost
+agent.advertised.url=http://localhost
 agent.user=dimuthu
 agent.http.port=3333
 agent.https.enabled=false
diff --git a/api/client/src/main/java/org/apache/airavata/mft/api/client/MFTApiClient.java b/api/client/src/main/java/org/apache/airavata/mft/api/client/MFTApiClient.java
index 82bfcbb..d103849 100644
--- a/api/client/src/main/java/org/apache/airavata/mft/api/client/MFTApiClient.java
+++ b/api/client/src/main/java/org/apache/airavata/mft/api/client/MFTApiClient.java
@@ -21,24 +21,28 @@
 import io.grpc.ManagedChannelBuilder;
 import org.apache.airavata.mft.api.service.*;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-public class MFTApiClient {
-    private static Map<String, Map<Integer, MFTApiServiceGrpc.MFTApiServiceBlockingStub>> stubCache = new ConcurrentHashMap<>();
+public class MFTApiClient implements Closeable {
 
-    public static MFTApiServiceGrpc.MFTApiServiceBlockingStub buildClient(String hostName, int port) {
+    private final ManagedChannel channel;
 
-        if (stubCache.containsKey(hostName)) {
-            if (stubCache.get(hostName).containsKey(port)) {
-                return stubCache.get(hostName).get(port);
-            }
+    public MFTApiClient(String hostName, int port) {
+        channel = ManagedChannelBuilder.forAddress(hostName, port).usePlaintext().build();
+    }
+
+    public MFTApiServiceGrpc.MFTApiServiceBlockingStub get() {
+        return MFTApiServiceGrpc.newBlockingStub(channel);
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (channel != null) {
+            channel.shutdown();
         }
-
-        ManagedChannel channel = ManagedChannelBuilder.forAddress(hostName, port).usePlaintext().build();
-        MFTApiServiceGrpc.MFTApiServiceBlockingStub stub = MFTApiServiceGrpc.newBlockingStub(channel);
-        stubCache.put(hostName, Collections.singletonMap(port, stub));
-        return stub;
     }
 }
diff --git a/api/client/src/main/java/org/apache/airavata/mft/api/client/examples/Example.java b/api/client/src/main/java/org/apache/airavata/mft/api/client/examples/Example.java
index ca63e1c..808c913 100644
--- a/api/client/src/main/java/org/apache/airavata/mft/api/client/examples/Example.java
+++ b/api/client/src/main/java/org/apache/airavata/mft/api/client/examples/Example.java
@@ -8,7 +8,7 @@
 public class Example {
 
     public static void main(String a[]) {
-        MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = MFTApiClient.buildClient("localhost", 7004);
+        MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient = new MFTApiClient("localhost", 7004).get();
         mftClient.getResourceAvailability(ResourceAvailabilityRequest.newBuilder()
                 .setResourceId("a")
                 .setResourceToken("b")
diff --git a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
index 29f144b..dbce2d2 100644
--- a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
+++ b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
@@ -201,8 +201,13 @@
     public void getFileResourceMetadata(FetchResourceMetadataRequest request, StreamObserver<FileMetadataResponse> responseObserver) {
 
         try {
+
+            logger.info("Calling get file resource metadata for resource {}", request.getResourceId());
+
+            String targetAgent = derriveTargetAgent(request.getTargetAgentId());
+
             SyncRPCRequest.SyncRPCRequestBuilder requestBuilder = SyncRPCRequest.SyncRPCRequestBuilder.builder()
-                    .withAgentId(request.getTargetAgentId())
+                    .withAgentId(targetAgent)
                     .withMessageId(UUID.randomUUID().toString())
                     .withParameter("resourceId", request.getResourceId())
                     .withParameter("resourceType", request.getResourceType())
@@ -249,8 +254,12 @@
     @Override
     public void getDirectoryResourceMetadata(FetchResourceMetadataRequest request, StreamObserver<DirectoryMetadataResponse> responseObserver) {
         try {
+
+            logger.info("Calling get directory metadata for resource {}", request.getResourceId());
+            String targetAgent = derriveTargetAgent(request.getTargetAgentId());
+
             SyncRPCRequest.SyncRPCRequestBuilder requestBuilder = SyncRPCRequest.SyncRPCRequestBuilder.builder()
-                    .withAgentId(request.getTargetAgentId())
+                    .withAgentId(targetAgent)
                     .withMessageId(UUID.randomUUID().toString())
                     .withParameter("resourceId", request.getResourceId())
                     .withParameter("resourceType", request.getResourceType())
diff --git a/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java b/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
index 469d19e..7a76bcd 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/ConnectorResolver.java
@@ -17,103 +17,84 @@
 
 package org.apache.airavata.mft.core;
 
-import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
+import org.apache.airavata.mft.core.api.OutgoingChunkedConnector;
+import org.apache.airavata.mft.core.api.OutgoingStreamingConnector;
 
-import java.lang.reflect.InvocationTargetException;
 import java.util.Optional;
 
 public final class ConnectorResolver {
-    public static Optional<Connector> resolveConnector(String type, String direction) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
+
+    public static Optional<IncomingStreamingConnector> resolveIncomingStreamingConnector(String type) throws Exception {
 
         String className = null;
         switch (type) {
             case "SCP":
-                switch (direction) {
-                    case "IN":
-                        className = "org.apache.airavata.mft.transport.scp.SCPReceiver";
-                        break;
-                    case "OUT":
-                        className = "org.apache.airavata.mft.transport.scp.SCPSender";
-                        break;
-                }
-                break;
-            case "LOCAL":
-                switch (direction) {
-                    case "IN":
-                        className = "org.apache.airavata.mft.transport.local.LocalReceiver";
-                        break;
-                    case "OUT":
-                        className = "org.apache.airavata.mft.transport.local.LocalSender";
-                        break;
-                }
-                break;
-            case "S3":
-                switch (direction) {
-                    case "IN":
-                        className = "org.apache.airavata.mft.transport.s3.S3Receiver";
-                        break;
-                    case "OUT":
-                        className = "org.apache.airavata.mft.transport.s3.S3Sender";
-                        break;
-                }
-                break;
-            case "BOX":
-                switch (direction) {
-                    case "IN":
-                        className = "org.apache.airavata.mft.transport.box.BoxReceiver";
-                        break;
-                    case "OUT":
-                        className = "org.apache.airavata.mft.transport.box.BoxSender";
-                        break;
-                }
-                break;
-            case "AZURE":
-                switch (direction) {
-                    case "IN":
-                        className = "org.apache.airavata.mft.transport.azure.AzureReceiver";
-                        break;
-                    case "OUT":
-                        className = "org.apache.airavata.mft.transport.azure.AzureSender";
-                        break;
-                }
-                break;
-            case "GCS":
-                switch (direction) {
-                    case "IN":
-                        className = "org.apache.airavata.mft.transport.gcp.GCSReceiver";
-                        break;
-                    case "OUT":
-                        className = "org.apache.airavata.mft.transport.gcp.GCSSender";
-                        break;
-                }
-                break;
-            case "DROPBOX":
-                switch (direction) {
-                    case "IN":
-                        className = "org.apache.airavata.mft.transport.dropbox.DropboxReceiver";
-                        break;
-                    case "OUT":
-                        className = "org.apache.airavata.mft.transport.dropbox.DropboxSender";
-                        break;
-                }
-                break;
-            case "FTP":
-                switch (direction) {
-                    case "IN":
-                        className = "org.apache.airavata.mft.transport.ftp.FTPReceiver";
-                        break;
-                    case "OUT":
-                        className = "org.apache.airavata.mft.transport.ftp.FTPSender";
-                        break;
-                }
+                className = "org.apache.airavata.mft.transport.scp.SCPIncomingConnector";
                 break;
         }
 
         if (className != null) {
             Class<?> aClass = Class.forName(className);
-            return Optional.of((Connector) aClass.getDeclaredConstructor().newInstance());
+            return Optional.of((IncomingStreamingConnector) aClass.getDeclaredConstructor().newInstance());
         } else {
             return Optional.empty();
         }
     }
+
+    public static Optional<OutgoingStreamingConnector> resolveOutgoingStreamingConnector(String type) throws Exception {
+
+        String className = null;
+        switch (type) {
+            case "SCP":
+                className = "org.apache.airavata.mft.transport.scp.SCPOutgoingConnector";
+                break;
+            case "S3":
+                className = "org.apache.airavata.mft.transport.s3.S3IncomingConnector";
+                break;
+        }
+
+        if (className != null) {
+            Class<?> aClass = Class.forName(className);
+            return Optional.of((OutgoingStreamingConnector) aClass.getDeclaredConstructor().newInstance());
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    public static Optional<IncomingChunkedConnector> resolveIncomingChunkedConnector(String type) throws Exception {
+
+        String className = null;
+        switch (type) {
+            case "S3":
+                className = "org.apache.airavata.mft.transport.s3.S3IncomingConnector";
+                break;
+        }
+
+        if (className != null) {
+            Class<?> aClass = Class.forName(className);
+            return Optional.of((IncomingChunkedConnector) aClass.getDeclaredConstructor().newInstance());
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    public static Optional<OutgoingChunkedConnector> resolveOutgoingChunkedConnector(String type) throws Exception {
+
+        String className = null;
+        switch (type) {
+            case "S3":
+                className = "org.apache.airavata.mft.transport.s3.S3OutgoingConnector";
+                break;
+        }
+
+        if (className != null) {
+            Class<?> aClass = Class.forName(className);
+            return Optional.of((OutgoingChunkedConnector) aClass.getDeclaredConstructor().newInstance());
+        } else {
+            return Optional.empty();
+        }
+    }
+
 }
diff --git a/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java b/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java
index d61d743..7033d02 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java
@@ -91,7 +91,7 @@
         private Builder() {
         }
 
-        public static Builder getBuilder() {
+        public static Builder newBuilder() {
             return new Builder();
         }
 
diff --git a/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java b/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java
deleted file mode 100644
index 42a0e08..0000000
--- a/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.core;
-
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.api.Connector;
-
-import java.util.concurrent.Callable;
-
-public class TransferTask implements Callable<Integer> {
-
-    private Connector connector;
-    private ConnectorContext context;
-    private String resourceId;
-    private String childResourcePath;
-    private String credentialToken;
-    private AuthToken authToken;
-
-    public TransferTask(AuthToken authToken, String resourceId, String childResourcePath, String credentialToken,
-                        ConnectorContext context, Connector connector) {
-        this.connector = connector;
-        this.context = context;
-        this.resourceId = resourceId;
-        this.authToken = authToken;
-        this.credentialToken = credentialToken;
-        this.childResourcePath = childResourcePath;
-    }
-
-    @Override
-    public Integer call() throws Exception {
-        if (childResourcePath == null || "".equals(childResourcePath)) {
-            this.connector.startStream(authToken, resourceId, credentialToken, context);
-        } else {
-            this.connector.startStream(authToken, resourceId, childResourcePath, credentialToken, context);
-        }
-        return 0;
-    }
-}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java
new file mode 100644
index 0000000..1ca72d6
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java
@@ -0,0 +1,6 @@
+package org.apache.airavata.mft.core.api;
+
+public interface BasicConnector {
+    public void init(ConnectorConfig connectorConfig) throws Exception;
+    public void complete() throws Exception;
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java b/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java
new file mode 100644
index 0000000..bc754e8
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java
@@ -0,0 +1,167 @@
+package org.apache.airavata.mft.core.api;
+
+import org.apache.airavata.mft.common.AuthToken;
+import org.apache.airavata.mft.core.FileResourceMetadata;
+
+public class ConnectorConfig {
+    private String resourceServiceHost;
+    private int resourceServicePort;
+    private String secretServiceHost;
+    private int secretServicePort;
+    private String resourceId;
+    private String credentialToken;
+    private AuthToken authToken;
+    private String transferId;
+    private FileResourceMetadata metadata;
+
+    public String getResourceServiceHost() {
+        return resourceServiceHost;
+    }
+
+    public void setResourceServiceHost(String resourceServiceHost) {
+        this.resourceServiceHost = resourceServiceHost;
+    }
+
+    public int getResourceServicePort() {
+        return resourceServicePort;
+    }
+
+    public void setResourceServicePort(int resourceServicePort) {
+        this.resourceServicePort = resourceServicePort;
+    }
+
+    public String getSecretServiceHost() {
+        return secretServiceHost;
+    }
+
+    public void setSecretServiceHost(String secretServiceHost) {
+        this.secretServiceHost = secretServiceHost;
+    }
+
+    public int getSecretServicePort() {
+        return secretServicePort;
+    }
+
+    public void setSecretServicePort(int secretServicePort) {
+        this.secretServicePort = secretServicePort;
+    }
+
+    public String getResourceId() {
+        return resourceId;
+    }
+
+    public void setResourceId(String resourceId) {
+        this.resourceId = resourceId;
+    }
+
+    public String getCredentialToken() {
+        return credentialToken;
+    }
+
+    public void setCredentialToken(String credentialToken) {
+        this.credentialToken = credentialToken;
+    }
+
+    public AuthToken getAuthToken() {
+        return authToken;
+    }
+
+    public void setAuthToken(AuthToken authToken) {
+        this.authToken = authToken;
+    }
+
+    public String getTransferId() {
+        return transferId;
+    }
+
+    public void setTransferId(String transferId) {
+        this.transferId = transferId;
+    }
+
+    public FileResourceMetadata getMetadata() {
+        return metadata;
+    }
+
+    public void setMetadata(FileResourceMetadata metadata) {
+        this.metadata = metadata;
+    }
+
+
+    public static final class ConnectorConfigBuilder {
+        private String resourceServiceHost;
+        private int resourceServicePort;
+        private String secretServiceHost;
+        private int secretServicePort;
+        private String resourceId;
+        private String credentialToken;
+        private AuthToken authToken;
+        private String transferId;
+        private FileResourceMetadata metadata;
+
+        private ConnectorConfigBuilder() {
+        }
+
+        public static ConnectorConfigBuilder newBuilder() {
+            return new ConnectorConfigBuilder();
+        }
+
+        public ConnectorConfigBuilder withResourceServiceHost(String resourceServiceHost) {
+            this.resourceServiceHost = resourceServiceHost;
+            return this;
+        }
+
+        public ConnectorConfigBuilder withResourceServicePort(int resourceServicePort) {
+            this.resourceServicePort = resourceServicePort;
+            return this;
+        }
+
+        public ConnectorConfigBuilder withSecretServiceHost(String secretServiceHost) {
+            this.secretServiceHost = secretServiceHost;
+            return this;
+        }
+
+        public ConnectorConfigBuilder withSecretServicePort(int secretServicePort) {
+            this.secretServicePort = secretServicePort;
+            return this;
+        }
+
+        public ConnectorConfigBuilder withResourceId(String resourceId) {
+            this.resourceId = resourceId;
+            return this;
+        }
+
+        public ConnectorConfigBuilder withCredentialToken(String credentialToken) {
+            this.credentialToken = credentialToken;
+            return this;
+        }
+
+        public ConnectorConfigBuilder withAuthToken(AuthToken authToken) {
+            this.authToken = authToken;
+            return this;
+        }
+
+        public ConnectorConfigBuilder withTransferId(String transferId) {
+            this.transferId = transferId;
+            return this;
+        }
+
+        public ConnectorConfigBuilder withMetadata(FileResourceMetadata metadata) {
+            this.metadata = metadata;
+            return this;
+        }
+
+        public ConnectorConfig build() {
+            ConnectorConfig connectorConfig = new ConnectorConfig();
+            connectorConfig.setResourceServiceHost(resourceServiceHost);
+            connectorConfig.setResourceServicePort(resourceServicePort);
+            connectorConfig.setSecretServiceHost(secretServiceHost);
+            connectorConfig.setSecretServicePort(secretServicePort);
+            connectorConfig.setResourceId(resourceId);
+            connectorConfig.setCredentialToken(credentialToken);
+            connectorConfig.setAuthToken(authToken);
+            connectorConfig.setTransferId(transferId);
+            connectorConfig.setMetadata(metadata);
+            return connectorConfig;
+        }
+    }
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java
new file mode 100644
index 0000000..37288c6
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java
@@ -0,0 +1,5 @@
+package org.apache.airavata.mft.core.api;
+
+public interface IncomingChunkedConnector extends BasicConnector {
+    public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception;
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java
new file mode 100644
index 0000000..900b148
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.core.api;
+
+import java.io.InputStream;
+
+public interface IncomingStreamingConnector {
+    public void init(ConnectorConfig connectorConfig) throws Exception;
+    public InputStream fetchInputStream() throws Exception;
+    public InputStream fetchInputStream(String childPath) throws Exception;
+    public void complete() throws Exception;
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java
new file mode 100644
index 0000000..9a2f3cb
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java
@@ -0,0 +1,5 @@
+package org.apache.airavata.mft.core.api;
+
+public interface OutgoingChunkedConnector extends BasicConnector {
+    public void uploadChunk(int chunkId, long startByte, long endByte, String uploadFile) throws Exception;
+}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingStreamingConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingStreamingConnector.java
new file mode 100644
index 0000000..1e2633d
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingStreamingConnector.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.core.api;
+
+import java.io.OutputStream;
+
+public interface OutgoingStreamingConnector extends BasicConnector {
+    public OutputStream fetchOutputStream() throws Exception;
+    public OutputStream fetchOutputStream(String childPath) throws Exception;
+}
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/http/DownloadExample.java b/examples/src/main/java/org/apache/airavata/mft/examples/http/DownloadExample.java
index 4e95295..31f8f02 100644
--- a/examples/src/main/java/org/apache/airavata/mft/examples/http/DownloadExample.java
+++ b/examples/src/main/java/org/apache/airavata/mft/examples/http/DownloadExample.java
@@ -28,7 +28,7 @@
     public static void main(String args[]) {
         AuthToken mftAuthorizationToken = AuthToken.newBuilder().setUserTokenAuth(UserTokenAuth.newBuilder().setToken("43ff79ac-e4f2-473c-9ea1-04eee9509a53").build()).build();
 
-        MFTApiServiceGrpc.MFTApiServiceBlockingStub client = MFTApiClient.buildClient("localhost", 7004);
+        MFTApiServiceGrpc.MFTApiServiceBlockingStub client = new MFTApiClient("localhost", 7004).get();
         HttpDownloadApiResponse httpDownloadApiResponse = client.submitHttpDownload(HttpDownloadApiRequest.newBuilder()
                 .setTargetAgent("agent0")
                 .setSourceResourceId("remote-ssh-resource")
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/metadata/SCPExample.java b/examples/src/main/java/org/apache/airavata/mft/examples/metadata/SCPExample.java
index 3cf2ab0..0a95ac1 100644
--- a/examples/src/main/java/org/apache/airavata/mft/examples/metadata/SCPExample.java
+++ b/examples/src/main/java/org/apache/airavata/mft/examples/metadata/SCPExample.java
@@ -8,7 +8,7 @@
 
 public class SCPExample {
     public static void main(String args[]) throws Exception {
-        MFTApiServiceGrpc.MFTApiServiceBlockingStub client = MFTApiClient.buildClient("localhost", 7004);
+        MFTApiServiceGrpc.MFTApiServiceBlockingStub client = new MFTApiClient("localhost", 7004).get();
 
         // File metadata
         long startTime = System.currentTimeMillis();
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/LocalExample.java b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/LocalExample.java
index 2869a38..096cba9 100644
--- a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/LocalExample.java
+++ b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/LocalExample.java
@@ -26,7 +26,7 @@
 
 public class LocalExample {
     public static void main(String args[]) throws Exception {
-        MFTApiServiceGrpc.MFTApiServiceBlockingStub client = MFTApiClient.buildClient("localhost", 7004);
+        MFTApiServiceGrpc.MFTApiServiceBlockingStub client = new MFTApiClient("localhost", 7004).get();
 
         String sourceResourceId = "remote-ssh-resource";
         String sourceToken = "local-ssh-cred";
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/S3Example.java b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/S3Example.java
index eeb48f9..6894b7e 100644
--- a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/S3Example.java
+++ b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/S3Example.java
@@ -26,7 +26,7 @@
 
 public class S3Example {
     public static void main(String args[]) throws Exception {
-        MFTApiServiceGrpc.MFTApiServiceBlockingStub client = MFTApiClient.buildClient("localhost", 7004);
+        MFTApiServiceGrpc.MFTApiServiceBlockingStub client = new MFTApiClient("localhost", 7004).get();
 
         String sourceResourceId = "remote-ssh-storage";
         String sourceResourcePath = "/tmp/1mb.txt";
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/SCPExample.java b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/SCPExample.java
index 6e740b9..c48119b 100644
--- a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/SCPExample.java
+++ b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/SCPExample.java
@@ -26,7 +26,7 @@
 
 public class SCPExample {
     public static void main(String args[]) throws Exception {
-        MFTApiServiceGrpc.MFTApiServiceBlockingStub client = MFTApiClient.buildClient("localhost", 7004);
+        MFTApiServiceGrpc.MFTApiServiceBlockingStub client = new MFTApiClient("localhost", 7004).get();
 
         String sourceResourceId = "remote-ssh-resource-1";
         String sourceResourcePath = "/tmp/1mb.txt";
diff --git a/pom.xml b/pom.xml
index 57ce5a3..567b6be 100755
--- a/pom.xml
+++ b/pom.xml
@@ -122,7 +122,7 @@
         <jsch>0.1.55</jsch>
         <sshj>0.27.0</sshj>
         <mariadb.jdbc>2.5.1</mariadb.jdbc>
-        <custos.clients.version>1.0-SNAPSHOT</custos.clients.version>
+        <custos.clients.version>1.1-SNAPSHOT</custos.clients.version>
     </properties>
 
 </project>
diff --git a/scripts/build.sh b/scripts/build.sh
index 5bb68ab..2d5e4af 100755
--- a/scripts/build.sh
+++ b/scripts/build.sh
@@ -19,6 +19,7 @@
 
 cd ../
 mvn clean install
+rm -rf build
 mkdir -p build
 cp agent/target/MFT-Agent-0.01-bin.zip build/
 cp controller/target/MFT-Controller-0.01-bin.zip build/
@@ -30,4 +31,4 @@
 unzip -o build/MFT-Controller-0.01-bin.zip -d build/
 unzip -o build/Resource-Service-0.01-bin.zip -d build/
 unzip -o build/Secret-Service-0.01-bin.zip -d build/
-unzip -o build/API-Service-0.01-bin.zip -d build/
\ No newline at end of file
+unzip -o build/API-Service-0.01-bin.zip -d build/
diff --git a/services/resource-service/server/pom.xml b/services/resource-service/server/pom.xml
index 3b8712e..1e3e9d0 100644
--- a/services/resource-service/server/pom.xml
+++ b/services/resource-service/server/pom.xml
@@ -68,6 +68,12 @@
             <groupId>org.apache.airavata.data.lake</groupId>
             <artifactId>drms-stubs</artifactId>
             <version>0.01-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
 
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/datalake/DatalakeResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/datalake/DatalakeResourceBackend.java
index bd00053..931f6db 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/datalake/DatalakeResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/datalake/DatalakeResourceBackend.java
@@ -19,12 +19,16 @@
 
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
+import org.apache.airavata.datalake.drms.AuthCredentialType;
+import org.apache.airavata.datalake.drms.AuthenticatedUser;
 import org.apache.airavata.datalake.drms.DRMSServiceAuthToken;
 import org.apache.airavata.datalake.drms.storage.ResourceFetchRequest;
 import org.apache.airavata.datalake.drms.storage.ResourceFetchResponse;
 import org.apache.airavata.datalake.drms.storage.ResourceServiceGrpc;
 import org.apache.airavata.datalake.drms.storage.ssh.SSHStorage;
 import org.apache.airavata.mft.common.AuthToken;
+import org.apache.airavata.mft.common.DelegateAuth;
+import org.apache.airavata.mft.common.PasswordAuth;
 import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
 import org.apache.airavata.mft.resource.stubs.azure.storage.*;
 import org.apache.airavata.mft.resource.stubs.box.storage.*;
@@ -38,6 +42,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
 import java.util.Optional;
 
 public class DatalakeResourceBackend implements ResourceBackend {
@@ -72,14 +78,44 @@
         }
     }
 
+    private DRMSServiceAuthToken getDrmsToken(AuthToken authToken) {
+        switch (authToken.getAuthMechanismCase()) {
+            case USERTOKENAUTH:
+                return DRMSServiceAuthToken.newBuilder().setAccessToken(authToken.getUserTokenAuth().getToken()).build();
+
+            case DELEGATEAUTH:
+                DelegateAuth delegateAuth = authToken.getDelegateAuth();
+                return DRMSServiceAuthToken.newBuilder()
+                        .setAccessToken(Base64.getEncoder()
+                                .encodeToString((delegateAuth.getClientId() + ":" + delegateAuth.getClientSecret())
+                                        .getBytes(StandardCharsets.UTF_8)))
+                        .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+                        .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+                                .setUsername(delegateAuth.getUserId())
+                                .setTenantId(delegateAuth.getPropertiesOrThrow("TENANT_ID"))
+                                .build())
+                        .build();
+
+        }
+        return null;
+
+    }
     @Override
     public Optional<GenericResource> getGenericResource(GenericResourceGetRequest request) throws Exception {
 
         AuthToken authzToken = request.getAuthzToken();
 
+        DRMSServiceAuthToken drmsServiceAuthToken = getDrmsToken(authzToken);
+
+        if (drmsServiceAuthToken == null) {
+            logger.error("DRMS Service auth token can not be null. Invalid token type {} specified",
+                    authzToken.getAuthMechanismCase());
+            throw new Exception("DRMS Service auth token can not be null. Invalid token type specified");
+        }
+
         ResourceServiceGrpc.ResourceServiceBlockingStub datalakeResourceStub = ResourceServiceGrpc.newBlockingStub(channel);
         ResourceFetchResponse resourceFetchResponse = datalakeResourceStub.fetchResource(ResourceFetchRequest.newBuilder()
-                .setAuthToken(DRMSServiceAuthToken.newBuilder().setAccessToken(authzToken.getUserTokenAuth().getToken()).build())
+                .setAuthToken(drmsServiceAuthToken)
                 .setResourceId(request.getResourceId())
                 .build());
 
@@ -105,10 +141,9 @@
         switch (resource.getStorageCase()) {
             case SSH_STORAGE:
                 SSHStorage storage = resource.getSshStorage();
-//                resourceBuilder.setScpStorage(SCPStorage.newBuilder()
-//                        .setStorageId(storage.getStorageId()).setHost(storage.getHostName())
-//                        .setPort(storage.getPort())
-//                        .setUser(sshPreference.getUserName()).build());
+                resourceBuilder.setScpStorage(SCPStorage.newBuilder()
+                        .setStorageId(storage.getStorageId()).setHost(storage.getHostName())
+                        .setPort(storage.getPort()).build());
                 break;
             case S3_STORAGE:
                 org.apache.airavata.datalake.drms.storage.s3.S3Storage s3Storage = resource.getS3Storage();
@@ -299,6 +334,4 @@
     public boolean deleteFTPStorage(FTPStorageDeleteRequest request) throws Exception {
         return false;
     }
-
-
 }
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/GenericResourceServiceHandler.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/GenericResourceServiceHandler.java
index 491cea2..5ed85e4 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/GenericResourceServiceHandler.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/GenericResourceServiceHandler.java
@@ -33,7 +33,7 @@
             logger.error("Failed in retrieving generic resource with id {}", request.getResourceId(), e);
 
             responseObserver.onError(Status.INTERNAL.withCause(e)
-                    .withDescription("Failed in retrieving GCS resource with id " + request.getResourceId())
+                    .withDescription("Failed in retrieving Generic resource with id " + request.getResourceId())
                     .asRuntimeException());
         }
     }
diff --git a/services/resource-service/server/src/main/resources/applicationContext.xml b/services/resource-service/server/src/main/resources/applicationContext.xml
index 2fbd7eb..ea845a3 100644
--- a/services/resource-service/server/src/main/resources/applicationContext.xml
+++ b/services/resource-service/server/src/main/resources/applicationContext.xml
@@ -6,7 +6,7 @@
         http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd">
 
-    <bean id="resourceBackend" class="org.apache.airavata.mft.resource.server.backend.datalake.DatalakeResourceBackend"
+    <bean id="resourceBackend" class="org.apache.airavata.mft.resource.server.backend.file.FileBasedResourceBackend"
           init-method="init" destroy-method="destroy"></bean>
 
 </beans>
\ No newline at end of file
diff --git a/services/secret-service/server/pom.xml b/services/secret-service/server/pom.xml
index 8293b24..bfcc650 100644
--- a/services/secret-service/server/pom.xml
+++ b/services/secret-service/server/pom.xml
@@ -78,6 +78,17 @@
             <artifactId>json-simple</artifactId>
             <version>1.1.1</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.airavata.data.lake</groupId>
+            <artifactId>drms-stubs</artifactId>
+            <version>0.01-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/custos/CustosSecretBackend.java b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/custos/CustosSecretBackend.java
index 601f2d8..c71ef77 100644
--- a/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/custos/CustosSecretBackend.java
+++ b/services/secret-service/server/src/main/java/org/apache/airavata/mft/secret/server/backend/custos/CustosSecretBackend.java
@@ -1,5 +1,13 @@
 package org.apache.airavata.mft.secret.server.backend.custos;
 
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.apache.airavata.datalake.drms.AuthCredentialType;
+import org.apache.airavata.datalake.drms.AuthenticatedUser;
+import org.apache.airavata.datalake.drms.DRMSServiceAuthToken;
+import org.apache.airavata.datalake.drms.storage.*;
+import org.apache.airavata.datalake.drms.storage.preference.ssh.SSHStoragePreference;
+import org.apache.airavata.mft.common.AuthToken;
 import org.apache.airavata.mft.common.DelegateAuth;
 import org.apache.airavata.mft.credential.stubs.azure.*;
 import org.apache.airavata.mft.credential.stubs.box.*;
@@ -24,6 +32,8 @@
 import org.springframework.beans.factory.annotation.Value;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
 import java.util.Map;
 import java.util.Optional;
 
@@ -45,6 +55,12 @@
     @Value("${custos.secret}")
     private String custosSecret;
 
+    @Value("${custos.backend.drms.host}")
+    private String drmsHost;
+
+    @Value("${custos.backend.drms.port}")
+    private int drmsPort;
+
     private AgentAuthenticationHandler handler;
 
     private CustosClientsFactory custosClientsFactory;
@@ -88,18 +104,80 @@
         }
     }
 
+    private AnyStoragePreference getStoragePreference(String storagePefId) {
+        return AnyStoragePreference.newBuilder().build();
+    }
+
+    private DRMSServiceAuthToken getDrmsToken(AuthToken authToken) {
+        switch (authToken.getAuthMechanismCase()) {
+            case USERTOKENAUTH:
+                return DRMSServiceAuthToken.newBuilder().setAccessToken(authToken.getUserTokenAuth().getToken()).build();
+
+            case DELEGATEAUTH:
+                DelegateAuth delegateAuth = authToken.getDelegateAuth();
+                return DRMSServiceAuthToken.newBuilder()
+                        .setAccessToken(Base64.getEncoder()
+                                .encodeToString((delegateAuth.getClientId() + ":" + delegateAuth.getClientSecret())
+                                        .getBytes(StandardCharsets.UTF_8)))
+                        .setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+                        .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+                                .setUsername(delegateAuth.getUserId())
+                                .setTenantId(delegateAuth.getPropertiesOrThrow("TENANT_ID"))
+                                .build())
+                        .build();
+        }
+        return null;
+    }
+
+
     @Override
     public Optional<SCPSecret> getSCPSecret(SCPSecretGetRequest request) throws Exception {
+
+        DRMSServiceAuthToken drmsToken = getDrmsToken(request.getAuthzToken());
+
+        if (drmsToken == null) {
+            LOGGER.error("DRMS Token can not be null");
+            return Optional.empty();
+        }
+
+        String storagePrefId = request.getSecretId();
+
+        ManagedChannel channel = ManagedChannelBuilder.forAddress(drmsHost, drmsPort).usePlaintext().build();
+        AnyStoragePreference storagePreference;
+
+        try {
+            StoragePreferenceServiceGrpc.StoragePreferenceServiceBlockingStub spClient =
+                    StoragePreferenceServiceGrpc.newBlockingStub(channel);
+
+            StoragePreferenceFetchResponse storagePreferenceResp = spClient.
+                    fetchStoragePreference(StoragePreferenceFetchRequest.newBuilder().
+                            setAuthToken(drmsToken).setStoragePreferenceId(storagePrefId).build());
+
+            storagePreference = storagePreferenceResp.getStoragePreference();
+        } finally {
+            channel.shutdown();
+        }
+
+        SSHStoragePreference sshStoragePreference;
+        if (storagePreference.getStorageCase() == AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE) {
+            sshStoragePreference = storagePreference.getSshStoragePreference();
+        } else {
+            LOGGER.error("Invalid storage case {} for preference {}", storagePreference.getStorageCase(), storagePrefId);
+            return Optional.empty();
+        }
+
         switch (request.getAuthzToken().getAuthMechanismCase()) {
             case AGENTAUTH:
                 String agentId = request.getAuthzToken().getAgentAuth().getAgentId();
                 String secret = request.getAuthzToken().getAgentAuth().getAgentSecret();
+
                 Optional<AuthConfig> optionalAuthConfig = handler.authenticate(agentId, secret);
                 if (optionalAuthConfig.isPresent()) {
                     AuthConfig authConfig = optionalAuthConfig.get();
                     SSHCredential sshCredential = csAgentClient.getSSHCredential(request.getAuthzToken().getAgentAuth().getToken(),
-                            authConfig.getAccessToken(), request.getSecretId(), false);
+                            authConfig.getAccessToken(), sshStoragePreference.getCredentialToken(), false);
                     SCPSecret scpSecret = SCPSecret.newBuilder()
+                            .setUser(sshStoragePreference.getUserName())
                             .setSecretId(sshCredential.getMetadata().getToken())
                             .setPublicKey(sshCredential.getPublicKey())
                             .setPassphrase(sshCredential.getPassphrase())
@@ -110,8 +188,9 @@
             case USERTOKENAUTH:
                 if (identityClient.isAuthenticated(request.getAuthzToken().getUserTokenAuth().getToken())) {
                     //custosId need to be replaced with actual gateway custos Id
-                    SSHCredential sshCredential = csClient.getSSHCredential(custosId, request.getSecretId(), false);
+                    SSHCredential sshCredential = csClient.getSSHCredential(custosId, sshStoragePreference.getCredentialToken(), false);
                     SCPSecret scpSecret = SCPSecret.newBuilder()
+                            .setUser(sshStoragePreference.getUserName())
                             .setSecretId(sshCredential.getMetadata().getToken())
                             .setPublicKey(sshCredential.getPublicKey())
                             .setPassphrase(sshCredential.getPassphrase())
@@ -122,9 +201,10 @@
             case DELEGATEAUTH:
                 DelegateAuth delegateAuth = request.getAuthzToken().getDelegateAuth();
                 ResourceSecretManagementClient csClient = getTenantResourceSecretManagementClient(delegateAuth);
-                SSHCredential sshCredential = csClient.getSSHCredential(delegateAuth.getPropertiesMap().get("PORTAL_CUSTOS_ID"),
-                        request.getSecretId(), false);
+                SSHCredential sshCredential = csClient.getSSHCredential(delegateAuth.getPropertiesMap().get("TENANT_ID"),
+                        sshStoragePreference.getCredentialToken(), false);
                 SCPSecret scpSecret = SCPSecret.newBuilder()
+                        .setUser(sshStoragePreference.getUserName())
                         .setSecretId(sshCredential.getMetadata().getToken())
                         .setPublicKey(sshCredential.getPublicKey())
                         .setPassphrase(sshCredential.getPassphrase())
@@ -183,7 +263,7 @@
             case DELEGATEAUTH:
                 DelegateAuth delegateAuth = request.getAuthzToken().getDelegateAuth();
                 ResourceSecretManagementClient csClient = getTenantResourceSecretManagementClient(delegateAuth);
-                CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("PORTAL_CUSTOS_ID"),
+                CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("TENANT_ID"),
                         request.getSecretId());
                 Map<String, String> secretValues = credentialMap.getCredentialMapMap();
                 S3Secret s3Secret = S3Secret.newBuilder()
@@ -242,7 +322,7 @@
             case DELEGATEAUTH:
                 DelegateAuth delegateAuth = request.getAuthzToken().getDelegateAuth();
                 ResourceSecretManagementClient csClient = getTenantResourceSecretManagementClient(delegateAuth);
-                CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("PORTAL_CUSTOS_ID"),
+                CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("TENANT_ID"),
                         request.getSecretId());
                 Map<String, String> secretValues = credentialMap.getCredentialMapMap();
                 BoxSecret boxSecret = BoxSecret.newBuilder()
@@ -305,7 +385,7 @@
             case DELEGATEAUTH:
                 DelegateAuth delegateAuth = request.getAuthzToken().getDelegateAuth();
                 ResourceSecretManagementClient csClient = getTenantResourceSecretManagementClient(delegateAuth);
-                CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("PORTAL_CUSTOS_ID"),
+                CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("TENANT_ID"),
                         request.getSecretId());
                 Map<String, String> secretValues = credentialMap.getCredentialMapMap();
                 AzureSecret azureSecret = AzureSecret.newBuilder()
@@ -367,7 +447,7 @@
             case DELEGATEAUTH:
                 DelegateAuth delegateAuth = request.getAuthzToken().getDelegateAuth();
                 ResourceSecretManagementClient csClient = getTenantResourceSecretManagementClient(delegateAuth);
-                CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("PORTAL_CUSTOS_ID"),
+                CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("TENANT_ID"),
                         request.getSecretId());
                 Map<String, String> secretValues = credentialMap.getCredentialMapMap();
                 GCSSecret gcsSecret = GCSSecret.newBuilder()
@@ -429,7 +509,7 @@
             case DELEGATEAUTH:
                 DelegateAuth delegateAuth = request.getAuthzToken().getDelegateAuth();
                 ResourceSecretManagementClient csClient = getTenantResourceSecretManagementClient(delegateAuth);
-                CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("PORTAL_CUSTOS_ID"),
+                CredentialMap credentialMap = csClient.getCredentialMap(delegateAuth.getPropertiesMap().get("TENANT_ID"),
                         request.getSecretId());
                 Map<String, String> secretValues = credentialMap.getCredentialMapMap();
                 DropboxSecret dropboxSecret = DropboxSecret.newBuilder()
@@ -492,9 +572,10 @@
                 break;
             case DELEGATEAUTH:
                 DelegateAuth delegateAuth = request.getAuthzToken().getDelegateAuth();
+                // TODO validate delegate auth token
                 ResourceSecretManagementClient csClient = getTenantResourceSecretManagementClient(delegateAuth);
                 PasswordCredential passwordCredential = csClient
-                        .getPasswordCredential(delegateAuth.getPropertiesMap().get("PORTAL_CUSTOS_ID"),
+                        .getPasswordCredential(delegateAuth.getPropertiesMap().get("TENANT_ID"),
                                 request.getSecretId());
                 FTPSecret ftpSecret = FTPSecret.newBuilder()
                         .setSecretId(request.getSecretId())
@@ -523,10 +604,8 @@
 
 
     private ResourceSecretManagementClient getTenantResourceSecretManagementClient(DelegateAuth delegateAuth) throws IOException {
-        String adminCustosId = delegateAuth.getClientId();
-        String adminCustosSecret = delegateAuth.getClientSecret();
         CustosClientProvider custosClientProvider = custosClientsFactory
-                .getCustosClientProvider(adminCustosId, adminCustosSecret);
+                .getCustosClientProvider(custosId, custosSecret);
         return custosClientProvider
                 .getResourceSecretManagementClient();
     }
diff --git a/transport/azure-transport/pom.xml b/transport/azure-transport/pom.xml
index d2d8d5f..6d05805 100644
--- a/transport/azure-transport/pom.xml
+++ b/transport/azure-transport/pom.xml
@@ -36,7 +36,7 @@
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-storage-blob</artifactId>
-            <version>12.0.0</version>
+            <version>12.13.0</version>
         </dependency>
         <dependency>
             <groupId>org.apache.airavata</groupId>
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
new file mode 100644
index 0000000..03b745b
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
@@ -0,0 +1,96 @@
+package org.apache.airavata.mft.transport.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.IncomingChunkedConnector;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
+import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
+import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.GenericResource;
+import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InputStream;
+
+public class S3IncomingConnector implements IncomingChunkedConnector, IncomingStreamingConnector {
+
+    private static final Logger logger = LoggerFactory.getLogger(S3IncomingConnector.class);
+
+    private GenericResource resource;
+    private AmazonS3 s3Client;
+
+    @Override
+    public void init(ConnectorConfig cc) throws Exception {
+        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+                .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
+
+            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+                    .setAuthzToken(cc.getAuthToken())
+                    .setResourceId(cc.getResourceId()).build());
+        }
+
+        if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
+            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
+            throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+        }
+
+        S3Storage s3Storage = resource.getS3Storage();
+
+        S3Secret s3Secret;
+
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                cc.getSecretServiceHost(), cc.getSecretServicePort())) {
+
+            s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder()
+                    .setAuthzToken(cc.getAuthToken())
+                    .setSecretId(cc.getCredentialToken()).build());
+
+            BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
+
+            s3Client = AmazonS3ClientBuilder.standard()
+                    .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+                    .withRegion(s3Storage.getRegion())
+                    .build();
+        }
+    }
+
+
+    @Override
+    public InputStream fetchInputStream() throws Exception {
+        S3Object s3object = s3Client.getObject(resource.getS3Storage().getBucketName(), resource.getFile().getResourcePath());
+        return s3object.getObjectContent();
+    }
+
+    @Override
+    public InputStream fetchInputStream(String childPath) throws Exception {
+        S3Object s3object = s3Client.getObject(resource.getS3Storage().getBucketName(), childPath);
+        return s3object.getObjectContent();
+    }
+
+    @Override
+    public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception {
+        GetObjectRequest rangeObjectRequest = new GetObjectRequest(resource.getS3Storage().getBucketName(),
+                resource.getFile().getResourcePath());
+        rangeObjectRequest.setRange(startByte, endByte - 1);
+        ObjectMetadata objectMetadata = s3Client.getObject(rangeObjectRequest, new File(downloadFile));
+        logger.info("Downloaded S3 chunk to path {} for resource id {}", downloadFile, resource.getResourceId());
+    }
+
+    @Override
+    public void complete() throws Exception {
+
+    }
+}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
new file mode 100644
index 0000000..3e95383
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
@@ -0,0 +1,101 @@
+package org.apache.airavata.mft.transport.s3;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.*;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.OutgoingChunkedConnector;
+import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
+import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.GenericResource;
+import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class S3OutgoingConnector implements OutgoingChunkedConnector {
+
+    private static final Logger logger = LoggerFactory.getLogger(S3OutgoingConnector.class);
+
+    private GenericResource resource;
+    private AmazonS3 s3Client;
+
+    InitiateMultipartUploadResult initResponse;
+    List<PartETag> partETags = Collections.synchronizedList(new ArrayList<>());
+
+    @Override
+    public void init(ConnectorConfig cc) throws Exception {
+        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+                .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
+
+            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+                    .setAuthzToken(cc.getAuthToken())
+                    .setResourceId(cc.getResourceId()).build());
+        }
+
+        if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
+            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
+            throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+        }
+
+        S3Storage s3Storage = resource.getS3Storage();
+
+        S3Secret s3Secret;
+
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                cc.getSecretServiceHost(), cc.getSecretServicePort())) {
+
+            s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder()
+                    .setAuthzToken(cc.getAuthToken())
+                    .setSecretId(cc.getCredentialToken()).build());
+
+            BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
+
+            s3Client = AmazonS3ClientBuilder.standard()
+                    .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+                    .withRegion(s3Storage.getRegion())
+                    .build();
+        }
+
+        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(resource.getS3Storage().getBucketName(),
+                resource.getFile().getResourcePath());
+        initResponse = s3Client.initiateMultipartUpload(initRequest);
+    }
+
+    @Override
+    public void uploadChunk(int chunkId, long startByte, long endByte, String uploadFile) throws Exception {
+        File file = new File(uploadFile);
+        UploadPartRequest uploadRequest = new UploadPartRequest()
+                .withBucketName(resource.getS3Storage().getBucketName())
+                .withKey(resource.getFile().getResourcePath())
+                .withUploadId(initResponse.getUploadId())
+                .withPartNumber(chunkId + 1)
+                .withFileOffset(0)
+                .withFile(file)
+                .withPartSize(file.length());
+
+        UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
+        this.partETags.add(uploadResult.getPartETag());
+        logger.info("Uploaded S3 chunk to path {} for resource id {}", uploadFile, resource.getResourceId());
+    }
+
+
+    @Override
+    public void complete() throws Exception {
+        CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(resource.getS3Storage().getBucketName(),
+                resource.getFile().getResourcePath(), initResponse.getUploadId(), partETags);
+        s3Client.completeMultipartUpload(compRequest);
+    }
+}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
deleted file mode 100644
index a430323..0000000
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.transport.s3;
-
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.S3ObjectInputStream;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
-import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
-import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
-import org.apache.airavata.mft.secret.client.SecretServiceClient;
-import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-
-public class S3Receiver implements Connector {
-
-    private static final Logger logger = LoggerFactory.getLogger(S3Receiver.class);
-
-    private AmazonS3 s3Client;
-
-    private String resourceServiceHost;
-    private int resourceServicePort;
-    private String secretServiceHost;
-    private int secretServicePort;
-
-    @Override
-    public void init(String resourceServiceHost, int resourceServicePort,
-                     String secretServiceHost, int secretServicePort) throws Exception {
-
-        this.resourceServiceHost = resourceServiceHost;
-        this.resourceServicePort = resourceServicePort;
-        this.secretServiceHost = secretServiceHost;
-        this.secretServicePort = secretServicePort;
-    }
-
-    @Override
-    public void destroy() {
-
-    }
-
-    @Override
-    public void startStream(AuthToken authToken, String resourceId, String credentialToken, ConnectorContext context) throws Exception {
-
-        logger.info("Starting S3 Receiver stream for transfer {}", context.getTransferId());
-
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setResourceId(resourceId).build());
-
-        if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
-            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), resourceId);
-            throw new Exception("Invalid storage type specified for resource " + resourceId);
-        }
-
-        S3Storage s3Storage = resource.getS3Storage();
-
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        S3Secret s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
-        BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
-
-        s3Client = AmazonS3ClientBuilder.standard()
-                .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
-                .withRegion(s3Storage.getRegion())
-                .build();
-
-        S3Object s3object = s3Client.getObject(s3Storage.getBucketName(), resource.getFile().getResourcePath());
-        S3ObjectInputStream inputStream = s3object.getObjectContent();
-
-        OutputStream os = context.getStreamBuffer().getOutputStream();
-        int read;
-        long bytes = 0;
-        while ((read = inputStream.read()) != -1) {
-            bytes++;
-            os.write(read);
-        }
-        os.flush();
-        os.close();
-
-        logger.info("Completed S3 Receiver stream for transfer {}", context.getTransferId());
-    }
-
-    @Override
-    public void startStream(AuthToken authToken, String resourceId, String childResourcePath, String credentialToken,
-                            ConnectorContext context) throws Exception {
-        throw new UnsupportedOperationException();
-    }
-}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
deleted file mode 100644
index 0dec96d..0000000
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.transport.s3;
-
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
-import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
-import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
-import org.apache.airavata.mft.secret.client.SecretServiceClient;
-import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class S3Sender implements Connector {
-
-    private static final Logger logger = LoggerFactory.getLogger(S3Sender.class);
-
-    private AmazonS3 s3Client;
-
-    private String resourceServiceHost;
-    private int resourceServicePort;
-    private String secretServiceHost;
-    private int secretServicePort;
-
-    @Override
-    public void init(String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
-
-        this.resourceServiceHost = resourceServiceHost;
-        this.resourceServicePort = resourceServicePort;
-        this.secretServiceHost = secretServiceHost;
-        this.secretServicePort = secretServicePort;
-    }
-
-    @Override
-    public void destroy() {
-
-    }
-
-    @Override
-    public void startStream(AuthToken authToken, String resourceId, String credentialToken, ConnectorContext context) throws Exception {
-
-        logger.info("Starting S3 Sender stream for transfer {}", context.getTransferId());
-        logger.info("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
-
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setResourceId(resourceId).build());
-
-        if (resource.getStorageCase() != GenericResource.StorageCase.S3STORAGE) {
-            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), resourceId);
-            throw new Exception("Invalid storage type specified for resource " + resourceId);
-        }
-
-        S3Storage s3Storage = resource.getS3Storage();
-
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        S3Secret s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
-        BasicAWSCredentials awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
-
-        s3Client = AmazonS3ClientBuilder.standard()
-                .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
-                .withRegion(s3Storage.getRegion())
-                .build();
-
-        ObjectMetadata metadata = new ObjectMetadata();
-        metadata.setContentLength(context.getMetadata().getResourceSize());
-
-        s3Client.putObject(s3Storage.getBucketName(), resource.getFile().getResourcePath(),
-                context.getStreamBuffer().getInputStream(), metadata);
-        logger.info("Completed S3 Sender stream for transfer {}", context.getTransferId());
-    }
-
-    @Override
-    public void startStream(AuthToken authToken, String resourceId, String childResourcePath, String credentialToken,
-                            ConnectorContext context) throws Exception {
-        throw new UnsupportedOperationException();
-    }
-}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/LimitInputStream.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/LimitInputStream.java
new file mode 100644
index 0000000..806f7c2
--- /dev/null
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/LimitInputStream.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.transport.scp;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.Channel;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class LimitInputStream extends FilterInputStream implements Channel {
+    private final AtomicBoolean open = new AtomicBoolean(true);
+    private long remaining;
+
+    public LimitInputStream(InputStream in, long length) {
+        super(in);
+        this.remaining = length;
+    }
+
+    public boolean isOpen() {
+        return this.open.get();
+    }
+
+    public int read() throws IOException {
+        if (!this.isOpen()) {
+            throw new IOException("read() - stream is closed (remaining=" + this.remaining + ")");
+        } else if (this.remaining > 0L) {
+            --this.remaining;
+            return super.read();
+        } else {
+            return -1;
+        }
+    }
+
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (!this.isOpen()) {
+            throw new IOException("read(len=" + len + ") stream is closed (remaining=" + this.remaining + ")");
+        } else {
+            int nb = len;
+            if ((long)len > this.remaining) {
+                nb = (int)this.remaining;
+            }
+
+            if (nb > 0) {
+                int read = super.read(b, off, nb);
+                this.remaining -= (long)read;
+                return read;
+            } else {
+                return -1;
+            }
+        }
+    }
+
+    public long skip(long n) throws IOException {
+        if (!this.isOpen()) {
+            throw new IOException("skip(" + n + ") stream is closed (remaining=" + this.remaining + ")");
+        } else {
+            long skipped = super.skip(n);
+            this.remaining -= skipped;
+            return skipped;
+        }
+    }
+
+    public int available() throws IOException {
+        if (!this.isOpen()) {
+            throw new IOException("available() stream is closed (remaining=" + this.remaining + ")");
+        } else {
+            int av = super.available();
+            return (long)av > this.remaining ? (int)this.remaining : av;
+        }
+    }
+
+    public void close() throws IOException {
+        if (!this.open.getAndSet(false)) {
+            ;
+        }
+    }
+}
+
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
new file mode 100644
index 0000000..d6bacdb
--- /dev/null
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.transport.scp;
+
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.IncomingStreamingConnector;
+import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
+import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.GenericResource;
+import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public final class SCPIncomingConnector implements IncomingStreamingConnector {
+
+    private static final Logger logger = LoggerFactory.getLogger(SCPIncomingConnector.class);
+
+    private Session session;
+    private GenericResource resource;
+    private Channel channel;
+    private OutputStream out;
+    private InputStream in;
+    private final byte[] buf = new byte[1024];
+
+    @Override
+    public void init(ConnectorConfig cc) throws Exception {
+
+        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+                .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
+
+            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+                    .setAuthzToken(cc.getAuthToken())
+                    .setResourceId(cc.getResourceId()).build());
+        }
+
+        if (resource.getStorageCase() != GenericResource.StorageCase.SCPSTORAGE) {
+            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
+            throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+        }
+
+        SCPSecret scpSecret;
+
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                cc.getSecretServiceHost(), cc.getSecretServicePort())) {
+
+            scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
+                    .setAuthzToken(cc.getAuthToken())
+                    .setSecretId(cc.getCredentialToken()).build());
+        }
+
+        SCPStorage scpStorage = resource.getScpStorage();
+        logger.info("Creating a ssh session for {}@{}:{}", scpSecret.getUser(), scpStorage.getHost(), scpStorage.getPort());
+
+        this.session = SCPTransportUtil.createSession(
+                scpSecret.getUser(),
+                scpStorage.getHost(),
+                scpStorage.getPort(),
+                scpSecret.getPrivateKey().getBytes(),
+                scpSecret.getPublicKey().getBytes(),
+                scpSecret.getPassphrase().equals("")? null : scpSecret.getPassphrase().getBytes());
+
+        if (session == null) {
+            logger.error("Session can not be null. Make sure that SCP Receiver is properly initialized");
+            throw new Exception("Session can not be null. Make sure that SCP Receiver is properly initialized");
+        }
+    }
+
+    private String escapeSpecialChars(String path) {
+        return  path.replace(" ", "\\ ");
+    }
+
+    @Override
+    public InputStream fetchInputStream() throws Exception {
+        String resourcePath = null;
+        switch (resource.getResourceCase()){
+            case FILE:
+                resourcePath = resource.getFile().getResourcePath();
+                break;
+            case DIRECTORY:
+                throw new Exception("A directory path can not be streamed");
+            case RESOURCE_NOT_SET:
+                throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
+        }
+
+        return fetchInputStreamJCraft(escapeSpecialChars(resourcePath));
+    }
+
+    @Override
+    public InputStream fetchInputStream(String childPath) throws Exception {
+
+        String resourcePath = null;
+        switch (resource.getResourceCase()){
+            case FILE:
+                throw new Exception("A child path can not be associated with a file parent");
+            case DIRECTORY:
+                resourcePath = resource.getDirectory().getResourcePath();
+                if (!childPath.startsWith(resourcePath)) {
+                    throw new Exception("Child path " + childPath + " is not in the parent path " + resourcePath);
+                }
+                resourcePath = childPath;
+                break;
+            case RESOURCE_NOT_SET:
+                throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
+        }
+
+        return fetchInputStreamJCraft(escapeSpecialChars(resourcePath));
+    }
+
+    private InputStream fetchInputStreamJCraft(String resourcePath) throws Exception{
+        String command = "scp -f " + resourcePath;
+        channel = session.openChannel("exec");
+        ((ChannelExec) channel).setCommand(command);
+
+        // get I/O streams for remote scp
+        out = channel.getOutputStream();
+        in = channel.getInputStream();
+
+        channel.connect();
+
+        // send '\0'
+        buf[0] = 0;
+        out.write(buf, 0, 1);
+        out.flush();
+
+        while (true) {
+            int c = checkAck(in);
+            if (c != 'C') {
+                break;
+            }
+
+            // read '0644 '
+            in.read(buf, 0, 5);
+
+            long filesize = 0L;
+            while (true) {
+                if (in.read(buf, 0, 1) < 0) {
+                    // error
+                    break;
+                }
+                if (buf[0] == ' ') break;
+                filesize = filesize * 10L + (long) (buf[0] - '0');
+            }
+
+            String file = null;
+            for (int i = 0; ; i++) {
+                in.read(buf, i, 1);
+                if (buf[i] == (byte) 0x0a) {
+                    file = new String(buf, 0, i);
+                    break;
+                }
+            }
+
+            logger.info("file-size=" + filesize + ", file=" + file);
+            // send '\0'
+            buf[0] = 0;
+            out.write(buf, 0, 1);
+            out.flush();
+
+            // read a content of lfile
+            return new LimitInputStream(in, filesize);
+        }
+        return null;
+    }
+
+    @Override
+    public void complete() throws Exception {
+        if (checkAck(in) != 0) {
+            throw new IOException("Error code found in ack " + (checkAck(in)));
+        }
+
+        // send '\0'
+        buf[0] = 0;
+        out.write(buf, 0, 1);
+        out.flush();
+
+        channel.disconnect();
+        session.disconnect();
+    }
+
+    private int checkAck(InputStream in) throws IOException {
+        int b = in.read();
+        // b may be 0 for success,
+        //          1 for error,
+        //          2 for fatal error,
+        //         -1
+        if (b == 0) return b;
+        if (b == -1) return b;
+
+        if (b == 1 || b == 2) {
+            StringBuffer sb = new StringBuffer();
+            int c;
+            do {
+                c = in.read();
+                sb.append((char) c);
+            }
+            while (c != '\n');
+            if (b == 1) { // error
+                logger.error("Check Ack Failure b = 1 " + sb.toString());
+            }
+            if (b == 2) { // fatal error
+                logger.error("Check Ack Failure b = 2 " + sb.toString());
+            }
+        }
+        return b;
+    }
+}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
index 1468479..882f8d2 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
@@ -120,27 +120,38 @@
     public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
 
         checkInitialized();
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource scpResource = resourceClient.get()
-                .getGenericResource(GenericResourceGetRequest.newBuilder()
-                        .setAuthzToken(authZToken).setResourceId(resourceId).build());
 
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
-                .setAuthzToken(authZToken).setSecretId(credentialToken).build());
+        GenericResource scpResource;
+        SCPSecret scpSecret;
+        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort)) {
+            scpResource = resourceClient.get()
+                    .getGenericResource(GenericResourceGetRequest.newBuilder()
+                            .setAuthzToken(authZToken).setResourceId(resourceId).build());
+        }
+
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort)) {
+            scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
+                    .setAuthzToken(authZToken).setSecretId(credentialToken).build());
+        }
 
         return getFileResourceMetadata(authZToken,scpResource, scpSecret);
     }
 
     @Override
     public FileResourceMetadata getFileResourceMetadata(AuthToken authZToken, String parentResourceId, String childResourcePath, String credentialToken) throws Exception {
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setAuthzToken(authZToken)
-                .setResourceId(parentResourceId).build());
 
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        GenericResource resource;
+        SCPSecret scpSecret;
+        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort)) {
+            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+                    .setAuthzToken(authZToken)
+                    .setResourceId(parentResourceId).build());
+        }
+
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort)) {
+            scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
+                    .setAuthzToken(authZToken).setSecretId(credentialToken).build());
+        }
 
         boolean isChildPath = false;
         if (childResourcePath != null && !"".equals(childResourcePath)) {
@@ -148,9 +159,9 @@
         }
 
         String resourcePath = null;
-        switch (resource.getResourceCase()){
+        switch (resource.getResourceCase()) {
             case FILE:
-                if (isChildPath){
+                if (isChildPath) {
                     throw new Exception("A child path can not be associated with a file parent");
                 }
                 resourcePath = resource.getFile().getResourcePath();
@@ -170,9 +181,9 @@
         }
 
         GenericResource scpResource2 = GenericResource.newBuilder()
-                                        .setFile(FileResource.newBuilder()
-                                        .setResourcePath(resourcePath).build())
-                                        .setScpStorage(resource.getScpStorage()).build();
+                .setFile(FileResource.newBuilder()
+                        .setResourcePath(resourcePath).build())
+                .setScpStorage(resource.getScpStorage()).build();
 
         return getFileResourceMetadata(authZToken, scpResource2, scpSecret);
     }
@@ -201,7 +212,7 @@
                     }
 
                     if (rri.isRegularFile()) {
-                        FileResourceMetadata.Builder childFileBuilder = FileResourceMetadata.Builder.getBuilder()
+                        FileResourceMetadata.Builder childFileBuilder = FileResourceMetadata.Builder.newBuilder()
                                         .withFriendlyName(rri.getName())
                                         .withResourcePath(rri.getPath())
                                         .withCreatedTime(rri.getAttributes().getAtime())
@@ -224,25 +235,36 @@
     @Override
     public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
 
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource scpPResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setAuthzToken(authZToken)
-                .setResourceId(resourceId).build());
+        GenericResource resource;
+        SCPSecret scpSecret;
+        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort)) {
+            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+                    .setAuthzToken(authZToken)
+                    .setResourceId(resourceId).build());
+        }
 
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort)) {
+            scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
+                    .setAuthzToken(authZToken).setSecretId(credentialToken).build());
+        }
 
-        return getDirectoryResourceMetadata(authZToken,scpPResource, scpSecret);
+        return getDirectoryResourceMetadata(authZToken, resource, scpSecret);
     }
 
     @Override
     public DirectoryResourceMetadata getDirectoryResourceMetadata(AuthToken authZToken, String parentResourceId, String childResourcePath, String credentialToken) throws Exception {
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setAuthzToken(authZToken).setResourceId(parentResourceId).build());
 
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        GenericResource resource;
+        SCPSecret scpSecret;
+        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort)) {
+            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+                    .setAuthzToken(authZToken).setResourceId(parentResourceId).build());
+        }
+
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort)) {
+            scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
+                    .setAuthzToken(authZToken).setSecretId(credentialToken).build());
+        }
 
         boolean isChildPath = false;
         if (childResourcePath != null && !"".equals(childResourcePath)) {
@@ -283,34 +305,44 @@
     public Boolean isAvailable(AuthToken authZToken, String resourceId, String credentialToken) throws Exception {
 
         checkInitialized();
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource scpResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setAuthzToken(authZToken).setResourceId(resourceId).build());
 
-        return isAvailable(authZToken, scpResource, credentialToken);
+        GenericResource resource;
+        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort)) {
+            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+                    .setAuthzToken(authZToken).setResourceId(resourceId).build());
+        }
+
+        return isAvailable(authZToken, resource, credentialToken);
     }
 
     @Override
     public Boolean isAvailable(AuthToken authToken, String parentResourceId, String resourcePath, String credentialToken) throws Exception {
         checkInitialized();
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource scpResource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setAuthzToken(authToken).setResourceId(parentResourceId).build());
 
-        validateParent(scpResource, resourcePath);
+        GenericResource resource;
+        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort)) {
+            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+                    .setAuthzToken(authToken).setResourceId(parentResourceId).build());
+        }
+
+        validateParent(resource, resourcePath);
 
         GenericResource targetScpResource = GenericResource.newBuilder()
                 .setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
-                .setScpStorage(scpResource.getScpStorage()).build();
+                .setScpStorage(resource.getScpStorage()).build();
 
         return isAvailable(authToken, targetScpResource, credentialToken);
     }
 
     public Boolean isAvailable(AuthToken authToken, GenericResource scpResource, String credentialToken) throws Exception {
 
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
-                .setAuthzToken(authToken).setSecretId(credentialToken).build());
+
+        SCPSecret scpSecret;
+
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort)) {
+            scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
+                    .setAuthzToken(authToken).setSecretId(credentialToken).build());
+        }
 
         try (SSHClient sshClient = getSSHClient(scpResource, scpSecret)) {
             logger.info("Checking the availability of file {}", scpResource.getFile().getResourcePath());
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
new file mode 100644
index 0000000..28ea5e0
--- /dev/null
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.transport.scp;
+
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.Session;
+import org.apache.airavata.mft.core.api.ConnectorConfig;
+import org.apache.airavata.mft.core.api.OutgoingStreamingConnector;
+import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
+import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.GenericResource;
+import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
+import org.apache.airavata.mft.secret.client.SecretServiceClient;
+import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public final class SCPOutgoingConnector implements OutgoingStreamingConnector {
+
+    private static final Logger logger = LoggerFactory.getLogger(SCPOutgoingConnector.class);
+
+    private GenericResource resource;
+    private Session session;
+    private OutputStream out;
+    private InputStream in;
+    private Channel channel;
+    private ConnectorConfig cc;
+    private final byte[] buf = new byte[1024];
+
+
+    @Override
+    public void init(ConnectorConfig cc) throws Exception {
+
+        this.cc = cc;
+        try (ResourceServiceClient resourceClient = ResourceServiceClientBuilder
+                .buildClient(cc.getResourceServiceHost(), cc.getResourceServicePort())) {
+
+            resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
+                    .setAuthzToken(cc.getAuthToken())
+                    .setResourceId(cc.getResourceId()).build());
+        }
+
+        if (resource.getStorageCase() != GenericResource.StorageCase.SCPSTORAGE) {
+            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), cc.getResourceId());
+            throw new Exception("Invalid storage type specified for resource " + cc.getResourceId());
+        }
+
+        SCPSecret scpSecret;
+
+        try (SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(
+                cc.getSecretServiceHost(), cc.getSecretServicePort())) {
+
+            scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
+                    .setAuthzToken(cc.getAuthToken())
+                    .setSecretId(cc.getCredentialToken()).build());
+        }
+
+        SCPStorage scpStorage = resource.getScpStorage();
+        logger.info("Creating a ssh session for {}@{}:{}", scpSecret.getUser(), scpStorage.getHost(), scpStorage.getPort());
+
+        this.session = SCPTransportUtil.createSession(
+                scpSecret.getUser(),
+                scpStorage.getHost(),
+                scpStorage.getPort(),
+                scpSecret.getPrivateKey().getBytes(),
+                scpSecret.getPublicKey().getBytes(),
+                scpSecret.getPassphrase().equals("")? null : scpSecret.getPassphrase().getBytes());
+
+        if (session == null) {
+            logger.error("Session can not be null. Make sure that SCP Receiver is properly initialized");
+            throw new Exception("Session can not be null. Make sure that SCP Receiver is properly initialized");
+        }
+    }
+
+    private String escapeSpecialChars(String path) {
+        return  path.replace(" ", "\\ ");
+    }
+
+    @Override
+    public OutputStream fetchOutputStream() throws Exception {
+        String resourcePath = null;
+        switch (resource.getResourceCase()){
+            case FILE:
+                resourcePath = resource.getFile().getResourcePath();
+                break;
+            case DIRECTORY:
+                throw new Exception("A directory path can not be streamed");
+            case RESOURCE_NOT_SET:
+                throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
+        }
+
+        return fetchOutputStreamJCraft(escapeSpecialChars(resourcePath), cc.getMetadata().getResourceSize());
+    }
+
+    @Override
+    public OutputStream fetchOutputStream(String childPath) throws Exception {
+        String resourcePath = null;
+        switch (resource.getResourceCase()){
+            case FILE:
+                throw new Exception("A child path can not be associated with a file parent");
+            case DIRECTORY:
+                resourcePath = resource.getDirectory().getResourcePath();
+                if (!childPath.startsWith(resourcePath)) {
+                    throw new Exception("Child path " + childPath + " is not in the parent path " + resourcePath);
+                }
+                resourcePath = childPath;
+                break;
+            case RESOURCE_NOT_SET:
+                throw new Exception("Resource was not set in resource with id " + resource.getResourceId());
+        }
+
+        return fetchOutputStreamJCraft(escapeSpecialChars(resourcePath), cc.getMetadata().getResourceSize());
+    }
+
+    public OutputStream fetchOutputStreamJCraft(String resourcePath, long fileSize) throws Exception {
+        boolean ptimestamp = true;
+
+        // exec 'scp -t rfile' remotely
+        String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + resourcePath;
+        channel = session.openChannel("exec");
+        ((ChannelExec) channel).setCommand(command);
+
+        // get I/O streams for remote scp
+        out = channel.getOutputStream();
+        in = channel.getInputStream();
+
+        channel.connect();
+
+        if (checkAck(in) != 0) {
+            throw new IOException("Error code found in ack " + (checkAck(in)));
+        }
+
+        if (ptimestamp) {
+            command = "T" + (System.currentTimeMillis() / 1000) + " 0";
+            // The access time should be sent here,
+            // but it is not accessible with JavaAPI ;-<
+            command += (" " + (System.currentTimeMillis() / 1000) + " 0\n");
+            out.write(command.getBytes());
+            out.flush();
+            if (checkAck(in) != 0) {
+                throw new IOException("Error code found in ack " + (checkAck(in)));
+            }
+        }
+
+        // send "C0644 filesize filename", where filename should not include '/'
+        command = "C0644 " + fileSize + " ";
+        if (resourcePath.lastIndexOf('/') > 0) {
+            command += resourcePath.substring(resourcePath.lastIndexOf('/') + 1);
+        } else {
+            command += resourcePath;
+        }
+
+        command += "\n";
+        out.write(command.getBytes());
+        out.flush();
+
+        if (checkAck(in) != 0) {
+            throw new IOException("Error code found in ack " + (checkAck(in)));
+        }
+
+        return out;
+    }
+
+    @Override
+    public void complete() throws Exception {
+        buf[0] = 0;
+        out.write(buf, 0, 1);
+        out.flush();
+
+        if (checkAck(in) != 0) {
+            throw new IOException("Error code found in ack " + (checkAck(in)));
+        }
+        out.close();
+        channel.disconnect();
+        session.disconnect();
+    }
+
+    public int checkAck(InputStream in) throws IOException {
+        int b = in.read();
+        // b may be 0 for success,
+        //          1 for error,
+        //          2 for fatal error,
+        //         -1
+        if (b == 0) return b;
+        if (b == -1) return b;
+
+        if (b == 1 || b == 2) {
+            StringBuffer sb = new StringBuffer();
+            int c;
+            do {
+                c = in.read();
+                sb.append((char) c);
+            }
+            while (c != '\n');
+            if (b == 1) { // error
+                logger.error("Check Ack Failure b = 1 " + sb.toString());
+            }
+            if (b == 2) { // fatal error
+                logger.error("Check Ack Failure b = 2 " + sb.toString());
+            }
+        }
+        return b;
+    }
+}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
deleted file mode 100644
index dab47d2..0000000
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.transport.scp;
-
-import com.jcraft.jsch.ChannelExec;
-import com.jcraft.jsch.Session;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.DoubleStreamingBuffer;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
-import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
-import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
-import org.apache.airavata.mft.secret.client.SecretServiceClient;
-import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public class SCPReceiver implements Connector {
-
-    private static final Logger logger = LoggerFactory.getLogger(SCPReceiver.class);
-
-    boolean initialized = false;
-
-    private Session session;
-    private String resourceServiceHost;
-    private int resourceServicePort;
-    private String secretServiceHost;
-    private int secretServicePort;
-
-    public void init(String resourceServiceHost, int resourceServicePort,
-                     String secretServiceHost, int secretServicePort) throws Exception {
-
-        this.resourceServiceHost = resourceServiceHost;
-        this.resourceServicePort = resourceServicePort;
-        this.secretServiceHost = secretServiceHost;
-        this.secretServicePort = secretServicePort;
-
-        if (initialized) {
-            destroy();
-        }
-
-        this.initialized = true;
-    }
-
-    public void destroy() {
-        try {
-            this.session.disconnect();
-        } catch (Exception e) {
-            logger.error("Errored while disconnecting session", e);
-        }
-    }
-
-    private void checkInitialized() {
-        if (!initialized) {
-            throw new IllegalStateException("SCP Receiver is not initialized");
-        }
-    }
-
-    public void startStream(AuthToken authToken, String resourceId, String credentialToken, ConnectorContext context) throws Exception {
-        startStream(authToken, resourceId, null, credentialToken, context);
-    }
-
-    @Override
-    public void startStream(AuthToken authToken, String resourceId, String childResourcePath, String credentialToken,
-                            ConnectorContext context) throws Exception {
-        checkInitialized();
-
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setAuthzToken(authToken).setResourceId(resourceId).build());
-
-        if (resource.getStorageCase() != GenericResource.StorageCase.SCPSTORAGE) {
-            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), resourceId);
-            throw new Exception("Invalid storage type specified for resource " + resourceId);
-        }
-
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
-                .setAuthzToken(authToken)
-                .setSecretId(credentialToken).build());
-
-        SCPStorage scpStorage = resource.getScpStorage();
-        logger.info("Creating a ssh session for {}@{}:{}", scpSecret.getUser(), scpStorage.getHost(), scpStorage.getPort());
-
-        this.session = SCPTransportUtil.createSession(
-                scpSecret.getUser(),
-                scpStorage.getHost(),
-                scpStorage.getPort(),
-                scpSecret.getPrivateKey().getBytes(),
-                scpSecret.getPublicKey().getBytes(),
-                scpSecret.getPassphrase().equals("")? null : scpSecret.getPassphrase().getBytes());
-
-        if (session == null) {
-            logger.error("Session can not be null. Make sure that SCP Receiver is properly initialized");
-            throw new Exception("Session can not be null. Make sure that SCP Receiver is properly initialized");
-        }
-
-        boolean isChildPath = false;
-        if (childResourcePath != null && !"".equals(childResourcePath)) {
-            isChildPath = true;
-        }
-
-        String resourcePath = null;
-        switch (resource.getResourceCase()){
-            case FILE:
-                if (isChildPath){
-                    throw new Exception("A child path can not be associated with a file parent");
-                }
-                resourcePath = resource.getFile().getResourcePath();
-                break;
-            case DIRECTORY:
-                resourcePath = resource.getDirectory().getResourcePath();
-                if (isChildPath) {
-                    if (!childResourcePath.startsWith(resourcePath)) {
-                        throw new Exception("Child path " + childResourcePath + " is not in the parent path " + resourcePath);
-                    }
-                    resourcePath = childResourcePath;
-                }
-
-                break;
-            case RESOURCE_NOT_SET:
-                throw new Exception("Resource was not set in resource with id " + resourceId);
-        }
-
-        transferRemoteToStream(session, resourcePath, context.getStreamBuffer());
-        logger.info("SCP Receive completed. Transfer {}", context.getTransferId());
-    }
-
-    private void transferRemoteToStream(Session session, String from, DoubleStreamingBuffer streamBuffer) throws Exception {
-
-        try {
-            OutputStream outputStream = streamBuffer.getOutputStream();
-
-            logger.info("Starting scp receive");
-            // exec 'scp -f rfile' remotely
-            String command = "scp -f " + from;
-            com.jcraft.jsch.Channel channel = session.openChannel("exec");
-            ((ChannelExec) channel).setCommand(command);
-
-            // get I/O streams for remote scp
-            OutputStream out = channel.getOutputStream();
-            InputStream in = channel.getInputStream();
-
-            channel.connect();
-
-            byte[] buf = new byte[1024];
-
-            // send '\0'
-            buf[0] = 0;
-            out.write(buf, 0, 1);
-            out.flush();
-
-            while (true) {
-                int c = checkAck(in);
-                if (c != 'C') {
-                    break;
-                }
-
-                // read '0644 '
-                in.read(buf, 0, 5);
-
-                long filesize = 0L;
-                while (true) {
-                    if (in.read(buf, 0, 1) < 0) {
-                        // error
-                        break;
-                    }
-                    if (buf[0] == ' ') break;
-                    filesize = filesize * 10L + (long) (buf[0] - '0');
-                }
-
-                String file = null;
-                for (int i = 0; ; i++) {
-                    in.read(buf, i, 1);
-                    if (buf[i] == (byte) 0x0a) {
-                        file = new String(buf, 0, i);
-                        break;
-                    }
-                }
-
-                logger.info("file-size=" + filesize + ", file=" + file);
-                // send '\0'
-                buf[0] = 0;
-                out.write(buf, 0, 1);
-                out.flush();
-
-                // read a content of lfile
-                int bufSize;
-                while (true) {
-                    if (buf.length < filesize) bufSize = buf.length;
-                    else bufSize = (int) filesize;
-                    bufSize = in.read(buf, 0, bufSize);
-                    if (bufSize < 0) {
-                        // error
-                        break;
-                    }
-                    //System.out.println("Read " + bufSize);
-                    outputStream.write(buf, 0, bufSize);
-                    outputStream.flush();
-
-                    filesize -= bufSize;
-                    if (filesize == 0L) break;
-                }
-
-                if (checkAck(in) != 0) {
-                    throw new IOException("Error code found in ack " + (checkAck(in)));
-                }
-
-                // send '\0'
-                buf[0] = 0;
-                out.write(buf, 0, 1);
-                out.flush();
-                outputStream.close();
-            }
-
-            channel.disconnect();
-            session.disconnect();
-
-        } finally {
-            try {
-                session.disconnect();
-            } catch (Exception e) {
-                logger.warn("Session disconnection failed", e);
-            }
-        }
-
-    }
-
-    public int checkAck(InputStream in) throws IOException {
-        int b = in.read();
-        // b may be 0 for success,
-        //          1 for error,
-        //          2 for fatal error,
-        //         -1
-        if (b == 0) return b;
-        if (b == -1) return b;
-
-        if (b == 1 || b == 2) {
-            StringBuffer sb = new StringBuffer();
-            int c;
-            do {
-                c = in.read();
-                sb.append((char) c);
-            }
-            while (c != '\n');
-            if (b == 1) { // error
-                System.out.print(sb.toString());
-            }
-            if (b == 2) { // fatal error
-                System.out.print(sb.toString());
-            }
-        }
-        return b;
-    }
-}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
deleted file mode 100644
index ce33646..0000000
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.transport.scp;
-
-import com.jcraft.jsch.ChannelExec;
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.Session;
-import org.apache.airavata.mft.common.AuthToken;
-import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.DoubleStreamingBuffer;
-import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
-import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.common.GenericResource;
-import org.apache.airavata.mft.resource.stubs.common.GenericResourceGetRequest;
-import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
-import org.apache.airavata.mft.secret.client.SecretServiceClient;
-import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-
-public class SCPSender implements Connector {
-
-    private static final Logger logger = LoggerFactory.getLogger(SCPSender.class);
-
-    boolean initialized = false;
-
-    private Session session;
-    private String resourceServiceHost;
-    private int resourceServicePort;
-    private String secretServiceHost;
-    private int secretServicePort;
-
-    public void init(String resourceServiceHost, int resourceServicePort,
-                     String secretServiceHost, int secretServicePort) throws Exception {
-
-        this.resourceServiceHost = resourceServiceHost;
-        this.resourceServicePort = resourceServicePort;
-        this.secretServiceHost = secretServiceHost;
-        this.secretServicePort = secretServicePort;
-
-        if (initialized) {
-            destroy();
-        }
-        this.initialized = true;
-    }
-
-
-    public void destroy() {
-
-        try {
-            this.session.disconnect();
-        } catch (Exception e) {
-            logger.error("Errored while disconnecting session", e);
-        }
-    }
-
-    private void checkInitialized() {
-        if (!initialized) {
-            throw new IllegalStateException("SCP Sender is not initialized");
-        }
-    }
-
-    public void startStream(AuthToken authToken, String resourceId, String credentialToken, ConnectorContext context) throws Exception {
-        startStream(authToken, resourceId, null, credentialToken, context);
-    }
-
-    @Override
-    public void startStream(AuthToken authToken, String resourceId, String childResourcePath, String credentialToken, ConnectorContext context) throws Exception {
-        checkInitialized();
-
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        GenericResource resource = resourceClient.get().getGenericResource(GenericResourceGetRequest.newBuilder()
-                .setAuthzToken(authToken).setResourceId(resourceId).build());
-
-        if (resource.getStorageCase() != GenericResource.StorageCase.SCPSTORAGE) {
-            logger.error("Invalid storage type {} specified for resource {}", resource.getStorageCase(), resourceId);
-            throw new Exception("Invalid storage type specified for resource " + resourceId);
-        }
-
-        SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder()
-                .setAuthzToken(authToken)
-                .setSecretId(credentialToken).build());
-
-        SCPStorage scpStorage = resource.getScpStorage();
-        logger.info("Creating a ssh session for {}@{}:{}", scpSecret.getUser(), scpStorage.getHost(), scpStorage.getPort());
-
-        this.session = SCPTransportUtil.createSession(
-                scpSecret.getUser(),
-                scpStorage.getHost(),
-                scpStorage.getPort(),
-                scpSecret.getPrivateKey().getBytes(),
-                scpSecret.getPublicKey().getBytes(),
-                scpSecret.getPassphrase().equals("")? null : scpSecret.getPassphrase().getBytes());
-
-        if (session == null) {
-            System.out.println("Session can not be null. Make sure that SCP Sender is properly initialized");
-            throw new Exception("Session can not be null. Make sure that SCP Sender is properly initialized");
-        }
-
-        boolean isChildPath = false;
-        if (childResourcePath != null && !"".equals(childResourcePath)) {
-            isChildPath = true;
-        }
-
-        String resourcePath = null;
-        switch (resource.getResourceCase()){
-            case FILE:
-                if (isChildPath){
-                    throw new Exception("A child path can not be associated with a file parent");
-                }
-                resourcePath = resource.getFile().getResourcePath();
-                break;
-            case DIRECTORY:
-                resourcePath = resource.getDirectory().getResourcePath();
-                if (isChildPath) {
-                    if (!childResourcePath.startsWith(resourcePath)) {
-                        throw new Exception("Child path " + childResourcePath + " is not in the parent path " + resourcePath);
-                    }
-                    resourcePath = childResourcePath;
-                }
-
-                break;
-            case RESOURCE_NOT_SET:
-                throw new Exception("Resource was not set in resource with id " + resourceId);
-        }
-
-        try {
-            copyLocalToRemote(this.session,
-                    resourcePath,
-                    context.getStreamBuffer(),
-                    context.getMetadata().getResourceSize());
-            logger.info("SCP send to transfer {} completed", context.getTransferId());
-
-        } catch (Exception e) {
-            logger.error("Errored while streaming to remote scp server. Transfer {}", context.getTransferId() , e);
-            throw e;
-        }
-    }
-
-    private void copyLocalToRemote(Session session, String to, DoubleStreamingBuffer streamBuffer, long fileSize) throws JSchException, IOException {
-        try {
-            logger.info("Starting scp send for remote server");
-            InputStream inputStream = streamBuffer.getInputStream();
-
-            boolean ptimestamp = true;
-
-            // exec 'scp -t rfile' remotely
-            String command = "scp " + (ptimestamp ? "-p" : "") + " -t " + to;
-            com.jcraft.jsch.Channel channel = session.openChannel("exec");
-            ((ChannelExec) channel).setCommand(command);
-
-            // get I/O streams for remote scp
-            OutputStream out = channel.getOutputStream();
-            InputStream in = channel.getInputStream();
-
-            channel.connect();
-
-            if (checkAck(in) != 0) {
-                throw new IOException("Error code found in ack " + (checkAck(in)));
-            }
-
-            if (ptimestamp) {
-                command = "T" + (System.currentTimeMillis() / 1000) + " 0";
-                // The access time should be sent here,
-                // but it is not accessible with JavaAPI ;-<
-                command += (" " + (System.currentTimeMillis() / 1000) + " 0\n");
-                out.write(command.getBytes());
-                out.flush();
-                if (checkAck(in) != 0) {
-                    throw new IOException("Error code found in ack " + (checkAck(in)));
-                }
-            }
-
-            // send "C0644 filesize filename", where filename should not include '/'
-            command = "C0644 " + fileSize + " ";
-            if (to.lastIndexOf('/') > 0) {
-                command += to.substring(to.lastIndexOf('/') + 1);
-            } else {
-                command += to;
-            }
-
-            command += "\n";
-            out.write(command.getBytes());
-            out.flush();
-
-            if (checkAck(in) != 0) {
-                throw new IOException("Error code found in ack " + (checkAck(in)));
-            }
-
-            // send a content of lfile
-            byte[] buf = new byte[1024];
-            long totalWritten = 0;
-            while (true) {
-                int len = inputStream.read(buf, 0, buf.length);
-                if (len == -1) {
-                    break;
-                } else {
-                    out.write(buf, 0, len); //out.flush();
-                    totalWritten += len;
-                    //System.out.println("Write " + totalWritten);
-                    if (totalWritten == fileSize) {
-                        break;
-                    }
-                }
-            }
-
-            // send '\0'
-            buf[0] = 0;
-            out.write(buf, 0, 1);
-            out.flush();
-
-            if (checkAck(in) != 0) {
-                throw new IOException("Error code found in ack " + (checkAck(in)));
-            }
-            out.close();
-            channel.disconnect();
-
-        } finally {
-            try {
-                session.disconnect();
-            } catch (Exception e) {
-                logger.warn("Session disconnection failed", e);
-            }
-        }
-    }
-
-    public int checkAck(InputStream in) throws IOException {
-        int b = in.read();
-        // b may be 0 for success,
-        //          1 for error,
-        //          2 for fatal error,
-        //         -1
-        if (b == 0) return b;
-        if (b == -1) return b;
-
-        if (b == 1 || b == 2) {
-            StringBuffer sb = new StringBuffer();
-            int c;
-            do {
-                c = in.read();
-                sb.append((char) c);
-            }
-            while (c != '\n');
-            if (b == 1) { // error
-                System.out.print(sb.toString());
-            }
-            if (b == 2) { // fatal error
-                System.out.print(sb.toString());
-            }
-        }
-        return b;
-    }
-}