Making storage as first class entity and supporting http transport in the agent
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferCommand.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferCommand.java
index 175e867..b74b90a 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferCommand.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferCommand.java
@@ -22,12 +22,14 @@
 public class TransferCommand {
 
     private String transferId;
-    private String sourceId;
+    private String sourceStorageId;
+    private String sourcePath;
     private String sourceType;
     private String sourceToken;
     private String sourceResourceBackend;
     private String sourceCredentialBackend;
-    private String destinationId;
+    private String destinationStorageId;
+    private String destinationPath;
     private String destinationType;
     private String destinationToken;
     private String destResourceBackend;
@@ -42,12 +44,21 @@
         return this;
     }
 
-    public String getSourceId() {
-        return sourceId;
+    public String getSourceStorageId() {
+        return sourceStorageId;
     }
 
-    public TransferCommand setSourceId(String sourceId) {
-        this.sourceId = sourceId;
+    public TransferCommand setSourceStorageId(String sourceStorageId) {
+        this.sourceStorageId = sourceStorageId;
+        return this;
+    }
+
+    public String getSourcePath() {
+        return sourcePath;
+    }
+
+    public TransferCommand setSourcePath(String sourcePath) {
+        this.sourcePath = sourcePath;
         return this;
     }
 
@@ -87,12 +98,21 @@
         return this;
     }
 
-    public String getDestinationId() {
-        return destinationId;
+    public String getDestinationStorageId() {
+        return destinationStorageId;
     }
 
-    public TransferCommand setDestinationId(String destinationId) {
-        this.destinationId = destinationId;
+    public TransferCommand setDestinationStorageId(String destinationStorageId) {
+        this.destinationStorageId = destinationStorageId;
+        return this;
+    }
+
+    public String getDestinationPath() {
+        return destinationPath;
+    }
+
+    public TransferCommand setDestinationPath(String destinationPath) {
+        this.destinationPath = destinationPath;
         return this;
     }
 
diff --git a/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
index ecd5cdc..8512713 100644
--- a/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
+++ b/admin/src/main/java/org/apache/airavata/mft/admin/models/TransferRequest.java
@@ -21,12 +21,14 @@
 
 public class TransferRequest {
 
-    private String sourceId;
+    private String sourceStorageId;
+    private String sourcePath;
     private String sourceType;
     private String sourceToken;
     private String sourceResourceBackend;
     private String sourceCredentialBackend;
-    private String destinationId;
+    private String destinationStorageId;
+    private String destinationPath;
     private String destinationType;
     private String destinationToken;
     private String destResourceBackend;
@@ -34,12 +36,21 @@
     private boolean affinityTransfer;
     private Map<String, Integer> targetAgents;
 
-    public String getSourceId() {
-        return sourceId;
+    public String getSourceStorageId() {
+        return sourceStorageId;
     }
 
-    public TransferRequest setSourceId(String sourceId) {
-        this.sourceId = sourceId;
+    public TransferRequest setSourceStorageId(String sourceStorageId) {
+        this.sourceStorageId = sourceStorageId;
+        return this;
+    }
+
+    public String getSourcePath() {
+        return sourcePath;
+    }
+
+    public TransferRequest setSourcePath(String sourcePath) {
+        this.sourcePath = sourcePath;
         return this;
     }
 
@@ -79,12 +90,21 @@
         return this;
     }
 
-    public String getDestinationId() {
-        return destinationId;
+    public String getDestinationStorageId() {
+        return destinationStorageId;
     }
 
-    public TransferRequest setDestinationId(String destinationId) {
-        this.destinationId = destinationId;
+    public TransferRequest setDestinationStorageId(String destinationStorageId) {
+        this.destinationStorageId = destinationStorageId;
+        return this;
+    }
+
+    public String getDestinationPath() {
+        return destinationPath;
+    }
+
+    public TransferRequest setDestinationPath(String destinationPath) {
+        this.destinationPath = destinationPath;
         return this;
     }
 
diff --git a/agent/pom.xml b/agent/pom.xml
index 7ca3753..7d6765e 100644
--- a/agent/pom.xml
+++ b/agent/pom.xml
@@ -99,6 +99,11 @@
             <artifactId>log4j-over-slf4j</artifactId>
             <version>${log4j.over.slf4j}</version>
         </dependency>
+        <dependency>
+            <groupId>com.sun.activation</groupId>
+            <artifactId>javax.activation</artifactId>
+            <version>1.2.0</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java b/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java
index 4409232..49b3ddf 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/AppConfig.java
@@ -18,10 +18,10 @@
 package org.apache.airavata.mft.agent;
 
 import org.apache.airavata.mft.admin.MFTConsulClient;
+import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
 import org.apache.airavata.mft.agent.rpc.RPCParser;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.PropertySource;
 
 @Configuration
 public class AppConfig {
@@ -41,4 +41,9 @@
     public RPCParser rpcParser() {
         return new RPCParser();
     }
+
+    @Bean
+    public HttpTransferRequestsStore transferRequestStore() {
+        return new HttpTransferRequestsStore();
+    }
 }
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index c7bd8f7..19a0dfb 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -30,6 +30,8 @@
 import org.apache.airavata.mft.admin.models.TransferCommand;
 import org.apache.airavata.mft.admin.models.TransferState;
 import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
+import org.apache.airavata.mft.agent.http.HttpServer;
+import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
 import org.apache.airavata.mft.agent.rpc.RPCParser;
 import org.apache.airavata.mft.core.ConnectorResolver;
 import org.apache.airavata.mft.core.MetadataCollectorResolver;
@@ -101,6 +103,9 @@
     @Autowired
     private MFTConsulClient mftConsulClient;
 
+    @Autowired
+    private HttpTransferRequestsStore transferRequestsStore;
+
     public void init() {
         transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId );
         rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId );
@@ -147,11 +152,11 @@
 
                         Optional<Connector> inConnectorOpt = ConnectorResolver.resolveConnector(request.getSourceType(), "IN");
                         Connector inConnector = inConnectorOpt.orElseThrow(() -> new Exception("Could not find an in connector for given input"));
-                        inConnector.init(request.getSourceId(), request.getSourceToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
+                        inConnector.init(request.getSourceStorageId(), request.getSourceToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
 
                         Optional<Connector> outConnectorOpt = ConnectorResolver.resolveConnector(request.getDestinationType(), "OUT");
                         Connector outConnector = outConnectorOpt.orElseThrow(() -> new Exception("Could not find an out connector for given input"));
-                        outConnector.init(request.getDestinationId(), request.getDestinationToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
+                        outConnector.init(request.getDestinationStorageId(), request.getDestinationToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
 
                         Optional<MetadataCollector> srcMetadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getSourceType());
                         MetadataCollector srcMetadataCollector = srcMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for source"));
@@ -221,6 +226,19 @@
         transferMessageCache.start();
     }
 
+    private void acceptHTTPRequests() {
+        logger.info("Starting the HTTP front end");
+
+        new Thread(() -> {
+            HttpServer httpServer = new HttpServer(3333, false, transferRequestsStore);
+            try {
+                httpServer.run();
+            } catch (Exception e) {
+                logger.error("Http frontend server start failed", e);
+            }
+        }).start();
+    }
+
     private boolean connectAgent() throws MFTConsulClientException {
         final ImmutableSession session = ImmutableSession.builder()
                 .name(agentId)
@@ -316,6 +334,7 @@
 
         acceptTransferRequests();
         acceptRPCRequests();
+        acceptHTTPRequests();
     }
 
     @Override
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index cd90503..c1ac253 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -52,7 +52,10 @@
                            MetadataCollector destMetadataCollector, BiConsumer<String, TransferState> onStatusCallback,
                            BiConsumer<String, Boolean> exitingCallback) throws Exception {
 
-        FileResourceMetadata srcMetadata = srcMetadataCollector.getFileResourceMetadata(command.getSourceId(), command.getSourceToken());
+        FileResourceMetadata srcMetadata = srcMetadataCollector.getFileResourceMetadata(
+                            command.getSourceStorageId(),
+                            command.getSourcePath(),
+                            command.getSourceToken());
 
         final long resourceSize = srcMetadata.getResourceSize();
         logger.debug("Source file size {}. MD5 {}", resourceSize, srcMetadata.getMd5sum());
@@ -65,8 +68,8 @@
         context.setStreamBuffer(streamBuffer);
         context.setTransferId(command.getTransferId());
 
-        TransferTask recvTask = new TransferTask(inConnector, context);
-        TransferTask sendTask = new TransferTask(outConnector, context);
+        TransferTask recvTask = new TransferTask(inConnector, context, command.getSourcePath());
+        TransferTask sendTask = new TransferTask(outConnector, context, command.getDestinationPath());
         List<Future<Integer>> futureList = new ArrayList<>();
 
         ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
@@ -121,14 +124,19 @@
                     }
 
                     if (!transferErrored) {
-                        Boolean transferred = destMetadataCollector.isAvailable(command.getDestinationId(), command.getDestinationToken());
+                        Boolean transferred = destMetadataCollector.isAvailable(
+                                command.getDestinationStorageId(),
+                                command.getDestinationPath(),
+                                command.getDestinationToken());
 
                         if (!transferred) {
                             logger.error("Transfer completed but resource is not available in destination");
                             throw new Exception("Transfer completed but resource is not available in destination");
                         }
 
-                        FileResourceMetadata destMetadata = destMetadataCollector.getFileResourceMetadata(command.getDestinationId(),
+                        FileResourceMetadata destMetadata = destMetadataCollector.getFileResourceMetadata(
+                                command.getDestinationStorageId(),
+                                command.getDestinationPath(),
                                 command.getDestinationToken());
 
                         boolean doIntegrityVerify = true;
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/ConnectorParams.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/ConnectorParams.java
new file mode 100644
index 0000000..bb61ebd
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/ConnectorParams.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.http;
+
+public class ConnectorParams {
+
+    private String storageId, credentialToken, resourceServiceHost, secretServiceHost;
+    private int resourceServicePort, secretServicePort;
+
+    public String getStorageId() {
+        return storageId;
+    }
+
+    public ConnectorParams setStorageId(String storageId) {
+        this.storageId = storageId;
+        return this;
+    }
+
+    public String getCredentialToken() {
+        return credentialToken;
+    }
+
+    public ConnectorParams setCredentialToken(String credentialToken) {
+        this.credentialToken = credentialToken;
+        return this;
+    }
+
+    public String getResourceServiceHost() {
+        return resourceServiceHost;
+    }
+
+    public ConnectorParams setResourceServiceHost(String resourceServiceHost) {
+        this.resourceServiceHost = resourceServiceHost;
+        return this;
+    }
+
+    public String getSecretServiceHost() {
+        return secretServiceHost;
+    }
+
+    public ConnectorParams setSecretServiceHost(String secretServiceHost) {
+        this.secretServiceHost = secretServiceHost;
+        return this;
+    }
+
+    public int getResourceServicePort() {
+        return resourceServicePort;
+    }
+
+    public ConnectorParams setResourceServicePort(int resourceServicePort) {
+        this.resourceServicePort = resourceServicePort;
+        return this;
+    }
+
+    public int getSecretServicePort() {
+        return secretServicePort;
+    }
+
+    public ConnectorParams setSecretServicePort(int secretServicePort) {
+        this.secretServicePort = secretServicePort;
+        return this;
+    }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpDownloadRequest.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpDownloadRequest.java
new file mode 100644
index 0000000..2cf56af
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpDownloadRequest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.http;
+
+import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.MetadataCollector;
+
+public class HttpDownloadRequest {
+
+    private ConnectorParams connectorParams;
+    private Connector srcConnector;
+    private MetadataCollector srcMetadataCollector;
+    private String srcResourceId;
+    private String srcToken;
+
+    public ConnectorParams getConnectorParams() {
+        return connectorParams;
+    }
+
+    public HttpDownloadRequest setConnectorParams(ConnectorParams connectorParams) {
+        this.connectorParams = connectorParams;
+        return this;
+    }
+
+    public Connector getSrcConnector() {
+        return srcConnector;
+    }
+
+    public HttpDownloadRequest setSrcConnector(Connector srcConnector) {
+        this.srcConnector = srcConnector;
+        return this;
+    }
+
+    public MetadataCollector getSrcMetadataCollector() {
+        return srcMetadataCollector;
+    }
+
+    public HttpDownloadRequest setSrcMetadataCollector(MetadataCollector srcMetadataCollector) {
+        this.srcMetadataCollector = srcMetadataCollector;
+        return this;
+    }
+
+    public String getSrcResourceId() {
+        return srcResourceId;
+    }
+
+    public HttpDownloadRequest setSrcResourceId(String srcResourceId) {
+        this.srcResourceId = srcResourceId;
+        return this;
+    }
+
+    public String getSrcToken() {
+        return srcToken;
+    }
+
+    public HttpDownloadRequest setSrcToken(String srcToken) {
+        this.srcToken = srcToken;
+        return this;
+    }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServer.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServer.java
new file mode 100644
index 0000000..4cc0d8a
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.http;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
+
+public class HttpServer {
+
+    private final int port;
+    private final boolean enableSSL;
+    private HttpTransferRequestsStore transferRequestsStore;
+
+    public HttpServer(int port, boolean enableSSL, HttpTransferRequestsStore transferRequestsStore) {
+        this.port = port;
+        this.enableSSL = enableSSL;
+        this.transferRequestsStore = transferRequestsStore;
+    }
+
+    public void run() throws Exception {
+        // Configure SSL.
+        final SslContext sslCtx;
+        if (enableSSL) {
+            SelfSignedCertificate ssc = new SelfSignedCertificate();
+            sslCtx = SslContext.newServerContext(SslProvider.JDK, ssc.certificate(), ssc.privateKey());
+        } else {
+            sslCtx = null;
+        }
+
+        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
+        EventLoopGroup workerGroup = new NioEventLoopGroup();
+        try {
+            ServerBootstrap b = new ServerBootstrap();
+            b.group(bossGroup, workerGroup)
+                    .channel(NioServerSocketChannel.class)
+                    .handler(new LoggingHandler(LogLevel.INFO))
+                    .childHandler(new HttpServerInitializer(sslCtx, transferRequestsStore));
+
+            Channel ch = b.bind(port).sync().channel();
+
+            System.err.println("Open your web browser and navigate to " +
+                    (enableSSL? "https" : "http") + "://127.0.0.1:" + port + '/');
+
+            ch.closeFuture().sync();
+        } finally {
+            bossGroup.shutdownGracefully();
+            workerGroup.shutdownGracefully();
+        }
+    }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
new file mode 100644
index 0000000..57ee4f0
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerHandler.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.http;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelProgressiveFuture;
+import io.netty.channel.ChannelProgressiveFutureListener;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.stream.ChunkedStream;
+import io.netty.util.CharsetUtil;
+import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.DoubleStreamingBuffer;
+import org.apache.airavata.mft.core.FileResourceMetadata;
+import org.apache.airavata.mft.core.TransferTask;
+import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.activation.MimetypesFileTypeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static io.netty.handler.codec.http.HttpMethod.*;
+import static io.netty.handler.codec.http.HttpResponseStatus.*;
+import static io.netty.handler.codec.http.HttpVersion.*;
+
+public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
+
+    private static final Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);
+
+    private final HttpTransferRequestsStore transferRequestsStore;
+    private final ExecutorService executor = Executors.newFixedThreadPool(10);
+
+    public HttpServerHandler(HttpTransferRequestsStore transferRequestsStore) {
+        this.transferRequestsStore = transferRequestsStore;
+    }
+
+    @Override
+    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
+        if (!request.decoderResult().isSuccess()) {
+            sendError(ctx, BAD_REQUEST);
+            return;
+        }
+
+        if (request.method() != GET) {
+            sendError(ctx, METHOD_NOT_ALLOWED);
+            return;
+        }
+
+        final String uri = request.uri().substring(1);
+        logger.info("Received download request through url {}", uri);
+
+        HttpTransferRequest httpTransferRequest = transferRequestsStore.getDownloadRequest(uri);
+
+        if (httpTransferRequest == null) {
+            logger.error("Couldn't find transfer request for uri {}", uri);
+            sendError(ctx, NOT_FOUND);
+            return;
+        }
+
+        Connector connector = httpTransferRequest.getOtherConnector();
+        MetadataCollector metadataCollector = httpTransferRequest.getOtherMetadataCollector();
+
+        ConnectorParams params = httpTransferRequest.getConnectorParams();
+
+        connector.init(params.getStorageId(), params.getCredentialToken(), params.getResourceServiceHost(),
+                params.getResourceServicePort(), params.getSecretServiceHost(), params.getSecretServicePort());
+
+        metadataCollector.init(params.getResourceServiceHost(), params.getResourceServicePort(),
+                params.getSecretServiceHost(), params.getSecretServicePort());
+
+        Boolean available = metadataCollector.isAvailable(params.getStorageId(),
+                httpTransferRequest.getTargetResourcePath(),
+                params.getCredentialToken());
+
+
+        if (!available) {
+            logger.error("File {} is not available in store {}", httpTransferRequest.getTargetResourcePath(), params.getStorageId());
+            sendError(ctx, NOT_FOUND);
+            return;
+        }
+
+        FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(params.getStorageId(),
+                httpTransferRequest.getTargetResourcePath(),
+                params.getCredentialToken());
+
+        long fileLength = fileResourceMetadata.getResourceSize();
+
+        HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+        HttpUtil.setContentLength(response, fileLength);
+        setContentTypeHeader(response, httpTransferRequest.getTargetResourcePath());
+
+        if (HttpUtil.isKeepAlive(request)) {
+            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
+        }
+
+        // Write the initial line and the header.
+        ctx.write(response);
+
+        // Write the content.
+        ChannelFuture sendFileFuture;
+        ChannelFuture lastContentFuture;
+
+        ConnectorContext connectorContext = new ConnectorContext();
+        connectorContext.setStreamBuffer(new DoubleStreamingBuffer());
+        connectorContext.setTransferId(uri);
+        connectorContext.setMetadata(new FileResourceMetadata()); // TODO Resolve
+
+        TransferTask pullTask = new TransferTask(connector, connectorContext, httpTransferRequest.getTargetResourcePath());
+
+        // TODO aggregate pullStatusFuture and sendFileFuture for keepalive test
+        Future<Integer> pullStatusFuture = executor.submit(pullTask);
+
+        sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedStream(connectorContext.getStreamBuffer().getInputStream())),
+                ctx.newProgressivePromise());
+
+        // HttpChunkedInput will write the end marker (LastHttpContent) for us.
+        lastContentFuture = sendFileFuture;
+
+        sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
+            @Override
+            public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
+                if (total < 0) { // total unknown
+                    System.err.println(future.channel() + " Transfer progress: " + progress);
+                } else {
+                    System.err.println(future.channel() + " Transfer progress: " + progress + " / " + total);
+                }
+            }
+
+            @Override
+            public void operationComplete(ChannelProgressiveFuture future) {
+                System.err.println(future.channel() + " Transfer complete.");
+            }
+        });
+
+        // Decide whether to close the connection or not.
+        if (!HttpUtil.isKeepAlive(request)) {
+            // Close the connection when the whole content is written out.
+            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        cause.printStackTrace();
+        if (ctx.channel().isActive()) {
+            sendError(ctx, INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    private static void sendRedirect(ChannelHandlerContext ctx, String newUri) {
+        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND);
+        response.headers().set(HttpHeaderNames.LOCATION, newUri);
+
+        // Close the connection as soon as the error message is sent.
+        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+    }
+
+    private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
+        FullHttpResponse response = new DefaultFullHttpResponse(
+                HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
+
+        // Close the connection as soon as the error message is sent.
+        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+    }
+
+    private static void setContentTypeHeader(HttpResponse response, String path) {
+        MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, path);
+    }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerInitializer.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerInitializer.java
new file mode 100644
index 0000000..f94d067
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpServerInitializer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.http;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
+
+    private final SslContext sslCtx;
+    private final HttpTransferRequestsStore transferRequestsStore;
+
+    public HttpServerInitializer(SslContext sslCtx, HttpTransferRequestsStore transferRequestsStore) {
+        this.sslCtx = sslCtx;
+        this.transferRequestsStore = transferRequestsStore;
+    }
+
+    @Override
+    public void initChannel(SocketChannel ch) {
+        ChannelPipeline pipeline = ch.pipeline();
+        if (sslCtx != null) {
+            pipeline.addLast(sslCtx.newHandler(ch.alloc()));
+        }
+        pipeline.addLast(new HttpServerCodec());
+        pipeline.addLast(new HttpObjectAggregator(65536));
+        pipeline.addLast(new ChunkedWriteHandler());
+        pipeline.addLast(new HttpServerHandler(transferRequestsStore));
+    }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequest.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequest.java
new file mode 100644
index 0000000..dbc807f
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.http;
+
+import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.MetadataCollector;
+
+public class HttpTransferRequest {
+    private Connector otherConnector;
+    private MetadataCollector otherMetadataCollector;
+    private ConnectorParams connectorParams;
+    private String targetResourcePath;
+
+    public Connector getOtherConnector() {
+        return otherConnector;
+    }
+
+    public HttpTransferRequest setOtherConnector(Connector otherConnector) {
+        this.otherConnector = otherConnector;
+        return this;
+    }
+
+    public MetadataCollector getOtherMetadataCollector() {
+        return otherMetadataCollector;
+    }
+
+    public HttpTransferRequest setOtherMetadataCollector(MetadataCollector otherMetadataCollector) {
+        this.otherMetadataCollector = otherMetadataCollector;
+        return this;
+    }
+
+    public String getTargetResourcePath() {
+        return targetResourcePath;
+    }
+
+    public HttpTransferRequest setTargetResourcePath(String targetResourcePath) {
+        this.targetResourcePath = targetResourcePath;
+        return this;
+    }
+
+    public ConnectorParams getConnectorParams() {
+        return connectorParams;
+    }
+
+    public HttpTransferRequest setConnectorParams(ConnectorParams connectorParams) {
+        this.connectorParams = connectorParams;
+        return this;
+    }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java
new file mode 100644
index 0000000..fc38701
--- /dev/null
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/http/HttpTransferRequestsStore.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.agent.http;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class HttpTransferRequestsStore {
+
+    final private Map<String, HttpTransferRequest> downloadRequestStore = new HashMap<>();
+    final private Map<String, HttpTransferRequest> uploadRequestStore = new HashMap<>();
+
+    public String addDownloadRequest(HttpTransferRequest request) {
+        String randomUrl = UUID.randomUUID().toString();
+        downloadRequestStore.put(randomUrl, request);
+        return randomUrl;
+    }
+
+    public HttpTransferRequest getDownloadRequest(String url) {
+
+        //TODO  Need to block concurrent calls to same url as connectors are not thread safe
+        HttpTransferRequest request = downloadRequestStore.get(url);
+        return request;
+    }
+
+    public String addUploadRequest(HttpTransferRequest request) {
+        String randomUrl = UUID.randomUUID().toString();
+        uploadRequestStore.put(randomUrl, request);
+        return randomUrl;
+    }
+
+    public HttpTransferRequest getUploadRequest(String url) {
+
+        //TODO  Need to block concurrent calls to same url as connectors are not thread safe
+        HttpTransferRequest request = downloadRequestStore.get(url);
+        return request;
+    }
+}
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java b/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
index f136704..dbcc48a 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/rpc/RPCParser.java
@@ -20,12 +20,18 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
 import org.apache.airavata.mft.admin.models.rpc.SyncRPCResponse;
+import org.apache.airavata.mft.agent.http.ConnectorParams;
+import org.apache.airavata.mft.agent.http.HttpTransferRequest;
+import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
+import org.apache.airavata.mft.core.ConnectorResolver;
 import org.apache.airavata.mft.core.DirectoryResourceMetadata;
 import org.apache.airavata.mft.core.FileResourceMetadata;
 import org.apache.airavata.mft.core.MetadataCollectorResolver;
+import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.Optional;
 
@@ -45,6 +51,9 @@
     @org.springframework.beans.factory.annotation.Value("${secret.service.port}")
     private int secretServicePort;
 
+    @Autowired
+    private HttpTransferRequestsStore httpTransferRequestsStore;
+
     public String resolveRPCRequest(SyncRPCRequest request) throws Exception {
         // TODO implement using the reflection
         ObjectMapper mapper = new ObjectMapper();
@@ -64,6 +73,7 @@
                     return mapper.writeValueAsString(fileResourceMetadata);
                 }
                 break;
+
             case "getChildFileResourceMetadata":
                 resourceId = request.getParameters().get("resourceId");
                 resourceType = request.getParameters().get("resourceType");
@@ -79,6 +89,7 @@
                     return mapper.writeValueAsString(fileResourceMetadata);
                 }
                 break;
+
             case "getDirectoryResourceMetadata":
                 resourceId = request.getParameters().get("resourceId");
                 resourceType = request.getParameters().get("resourceType");
@@ -93,6 +104,7 @@
                     return mapper.writeValueAsString(dirResourceMetadata);
                 }
                 break;
+
             case "getChildDirectoryResourceMetadata":
                 resourceId = request.getParameters().get("resourceId");
                 resourceType = request.getParameters().get("resourceType");
@@ -108,6 +120,32 @@
                     return mapper.writeValueAsString(dirResourceMetadata);
                 }
                 break;
+
+            case "submitHttpDownload":
+                String storeId = request.getParameters().get("storeId");
+                String sourcePath = request.getParameters().get("sourcePath");
+                String sourceToken = request.getParameters().get("sourceToken");
+                String storeType = request.getParameters().get("storeType");
+                mftAuthorizationToken = request.getParameters().get("mftAuthorizationToken");
+
+                metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(storeType);
+                Optional<Connector> connectorOp = ConnectorResolver.resolveConnector(storeType, "IN");
+
+                if (metadataCollectorOp.isPresent() && connectorOp.isPresent()) {
+                    HttpTransferRequest transferRequest = new HttpTransferRequest();
+                    transferRequest.setConnectorParams(new ConnectorParams()
+                            .setResourceServiceHost(resourceServiceHost)
+                            .setResourceServicePort(resourceServicePort)
+                            .setSecretServiceHost(secretServiceHost)
+                            .setSecretServicePort(secretServicePort)
+                            .setStorageId(storeId).setCredentialToken(sourceToken));
+                    transferRequest.setTargetResourcePath(sourcePath);
+                    transferRequest.setOtherMetadataCollector(metadataCollectorOp.get());
+                    transferRequest.setOtherConnector(connectorOp.get());
+                    String url = httpTransferRequestsStore.addDownloadRequest(transferRequest);
+                    return url;
+                }
+                break;
         }
         logger.error("Unknown method type specified {}", request.getMethod());
         throw new Exception("Unknown method " + request.getMethod());
diff --git a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
index 108bd30..1a6a8b9 100644
--- a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
+++ b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
@@ -92,6 +92,51 @@
     }
 
     @Override
+    public void submitHttpUpload(HttpUploadApiRequest request, StreamObserver<HttpUploadApiResponse> responseObserver) {
+        super.submitHttpUpload(request, responseObserver);
+    }
+
+    @Override
+    public void submitHttpDownload(HttpDownloadApiRequest request, StreamObserver<HttpDownloadApiResponse> responseObserver) {
+        try {
+
+            // TODO : Automatically derive agent if the target agent is empty
+            SyncRPCRequest.SyncRPCRequestBuilder requestBuilder = SyncRPCRequest.SyncRPCRequestBuilder.builder()
+                    .withAgentId(request.getTargetAgent())
+                    .withMessageId(UUID.randomUUID().toString())
+                    .withMethod("submitHttpDownload")
+                    .withParameter("storeId", request.getSourceStoreId())
+                    .withParameter("sourcePath", request.getSourcePath())
+                    .withParameter("sourceToken", request.getSourceToken())
+                    .withParameter("storeType", request.getSourceType())
+                    .withParameter("mftAuthorizationToken", request.getMftAuthorizationToken());
+
+            SyncRPCResponse rpcResponse = agentRPCClient.sendSyncRequest(requestBuilder.build());
+
+            switch (rpcResponse.getResponseStatus()) {
+                case SUCCESS:
+                    String url = rpcResponse.getResponseAsStr();
+                    HttpDownloadApiResponse downloadResponse = HttpDownloadApiResponse.newBuilder()
+                            .setUrl(url)
+                            .setTargetAgent(request.getTargetAgent()).build();
+                    responseObserver.onNext(downloadResponse);
+                    responseObserver.onCompleted();
+                    return;
+                case FAIL:
+                    logger.error("Errored while processing the download request to resource {} in store {}. Error msg : {}",
+                            request.getSourcePath(), request.getSourceStoreId(), rpcResponse.getErrorAsStr());
+                    responseObserver.onError(new Exception("Errored while processing the the fetch file metadata response. Error msg : " +
+                            rpcResponse.getErrorAsStr()));
+            }
+
+        } catch (Exception e) {
+            logger.error("Error while submitting http download request to path {} in store {}",
+                                                request.getSourcePath(), request.getSourceStoreId() , e);
+            responseObserver.onError(new Exception("Failed to submit http download request", e));
+        }
+    }
+
+    @Override
     public void getTransferStates(TransferStateApiRequest request, StreamObserver<TransferStateApiResponse> responseObserver) {
         try {
             List<TransferState> states = mftConsulClient.getTransferStates(request.getTransferId());
diff --git a/api/stub/src/main/proto/MFTApi.proto b/api/stub/src/main/proto/MFTApi.proto
index dc003ef..891413c 100644
--- a/api/stub/src/main/proto/MFTApi.proto
+++ b/api/stub/src/main/proto/MFTApi.proto
@@ -7,25 +7,55 @@
 import "google/protobuf/empty.proto";
 
 message TransferApiRequest {
-    string sourceId = 1;
-    string sourceType = 2;
-    string sourceToken = 3;
-    string sourceResourceBackend = 4;
-    string sourceCredentialBackend = 5;
-    string destinationId = 6;
-    string destinationType = 7;
-    string destinationToken = 8;
-    string destResourceBackend = 9;
-    string destCredentialBackend = 10;
-    bool affinityTransfer = 11;
-    map<string, int32> targetAgents = 12;
-    string mftAuthorizationToken = 13;
+    string sourceStorageId = 1;
+    string sourcePath = 2;
+    string sourceType = 3;
+    string sourceToken = 4;
+    string sourceResourceBackend = 5;
+    string sourceCredentialBackend = 6;
+    string destinationStorageId = 7;
+    string destinationPath = 8;
+    string destinationType = 9;
+    string destinationToken = 10;
+    string destResourceBackend = 11;
+    string destCredentialBackend = 12;
+    bool affinityTransfer = 13;
+    map<string, int32> targetAgents = 14;
+    string mftAuthorizationToken = 15;
 }
 
 message TransferApiResponse {
     string transferId = 1;
 }
 
+message HttpUploadApiRequest {
+    string destinationStoreId = 1;
+    string destinationPath = 2;
+    string destinationToken = 3;
+    string destinationType = 4;
+    string targetAgent = 5;
+    string mftAuthorizationToken = 6;
+}
+
+message HttpUploadApiResponse {
+    string url = 1;
+    string targetAgent = 2;
+}
+
+message HttpDownloadApiRequest {
+    string sourceStoreId = 1;
+    string sourcePath = 2;
+    string sourceToken = 3;
+    string sourceType = 4;
+    string targetAgent = 5;
+    string mftAuthorizationToken = 6;
+}
+
+message HttpDownloadApiResponse {
+    string url = 1;
+    string targetAgent = 2;
+}
+
 message TransferStateApiRequest {
     string transferId = 1;
     string mftAuthorizationToken = 2;
@@ -93,6 +123,18 @@
         };
     }
 
+    rpc submitHttpUpload(HttpUploadApiRequest) returns (HttpUploadApiResponse) {
+        option (google.api.http) = {
+           post: "/v1.0/api/http-upload"
+        };
+    }
+
+    rpc submitHttpDownload(HttpDownloadApiRequest) returns (HttpDownloadApiResponse) {
+        option (google.api.http) = {
+           post: "/v1.0/api/http-download"
+        };
+    }
+
     rpc getTransferStates(TransferStateApiRequest) returns (stream TransferStateApiResponse) {
         option (google.api.http) = {
            get: "/v1.0/api/transfer/states"
diff --git a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
index 0b0f2cd..ec4240c 100644
--- a/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
+++ b/controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java
@@ -221,12 +221,14 @@
 
     private TransferCommand convertRequestToCommand(String transferId, TransferRequest transferRequest) {
         TransferCommand transferCommand = new TransferCommand();
-        transferCommand.setSourceId(transferRequest.getSourceId())
+        transferCommand.setSourceStorageId(transferRequest.getSourceStorageId())
+                .setSourcePath(transferRequest.getSourcePath())
                 .setSourceToken(transferRequest.getSourceToken())
                 .setSourceType(transferRequest.getSourceType())
                 .setSourceResourceBackend(transferRequest.getSourceResourceBackend())
                 .setSourceCredentialBackend(transferRequest.getSourceCredentialBackend())
-                .setDestinationId(transferRequest.getDestinationId())
+                .setDestinationStorageId(transferRequest.getDestinationStorageId())
+                .setDestinationPath(transferRequest.getDestinationPath())
                 .setDestinationToken(transferRequest.getDestinationToken())
                 .setDestinationType(transferRequest.getDestinationType())
                 .setDestResourceBackend(transferRequest.getDestResourceBackend())
diff --git a/core/src/main/java/org/apache/airavata/mft/core/DirectoryResourceMetadata.java b/core/src/main/java/org/apache/airavata/mft/core/DirectoryResourceMetadata.java
index 5eb9813..5ff3b5f 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/DirectoryResourceMetadata.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/DirectoryResourceMetadata.java
@@ -26,8 +26,6 @@
     private long createdTime;
     private long updateTime;
     private String resourcePath;
-    private String parentResourceId;
-    private String parentResourceType;
     private List<DirectoryResourceMetadata> directories = new ArrayList<>();
     private List<FileResourceMetadata> files = new ArrayList<>();
     private boolean lazyInitialized = true;
@@ -68,24 +66,6 @@
         return this;
     }
 
-    public String getParentResourceId() {
-        return parentResourceId;
-    }
-
-    public DirectoryResourceMetadata setParentResourceId(String parentResourceId) {
-        this.parentResourceId = parentResourceId;
-        return this;
-    }
-
-    public String getParentResourceType() {
-        return parentResourceType;
-    }
-
-    public DirectoryResourceMetadata setParentResourceType(String parentResourceType) {
-        this.parentResourceType = parentResourceType;
-        return this;
-    }
-
     public List<DirectoryResourceMetadata> getDirectories() {
         return directories;
     }
@@ -119,8 +99,6 @@
         private long createdTime;
         private long updateTime;
         private String resourcePath;
-        private String parentResourceId;
-        private String parentResourceType;
         private List<DirectoryResourceMetadata> directories = new ArrayList<>();
         private List<FileResourceMetadata> files = new ArrayList<>();
         private boolean lazyInitialized = true;
@@ -152,16 +130,6 @@
             return this;
         }
 
-        public Builder withParentResourceId(String parentResourceId) {
-            this.parentResourceId = parentResourceId;
-            return this;
-        }
-
-        public Builder withParentResourceType(String parentResourceType) {
-            this.parentResourceType = parentResourceType;
-            return this;
-        }
-
         public Builder withDirectories(List<DirectoryResourceMetadata> directories) {
             this.directories = directories;
             return this;
@@ -193,8 +161,6 @@
             directoryResourceMetadata.setCreatedTime(createdTime);
             directoryResourceMetadata.setUpdateTime(updateTime);
             directoryResourceMetadata.setResourcePath(resourcePath);
-            directoryResourceMetadata.setParentResourceId(parentResourceId);
-            directoryResourceMetadata.setParentResourceType(parentResourceType);
             directoryResourceMetadata.setDirectories(directories);
             directoryResourceMetadata.setFiles(files);
             directoryResourceMetadata.setLazyInitialized(lazyInitialized);
diff --git a/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java b/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java
index 0b2372e..d61d743 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/FileResourceMetadata.java
@@ -25,8 +25,6 @@
     private long updateTime;
     private String md5sum;
     private String resourcePath;
-    private String parentResourceId;
-    private String parentResourceType;
 
     public String getFriendlyName() {
         return friendlyName;
@@ -82,24 +80,6 @@
         return this;
     }
 
-    public String getParentResourceId() {
-        return parentResourceId;
-    }
-
-    public FileResourceMetadata setParentResourceId(String parentResourceId) {
-        this.parentResourceId = parentResourceId;
-        return this;
-    }
-
-    public String getParentResourceType() {
-        return parentResourceType;
-    }
-
-    public FileResourceMetadata setParentResourceType(String parentResourceType) {
-        this.parentResourceType = parentResourceType;
-        return this;
-    }
-
     public static final class Builder {
         private String friendlyName;
         private long resourceSize;
@@ -107,8 +87,6 @@
         private long updateTime;
         private String md5sum;
         private String resourcePath;
-        private String parentResourceId;
-        private String parentResourceType;
 
         private Builder() {
         }
@@ -147,15 +125,6 @@
             return this;
         }
 
-        public Builder withParentResourceId(String parentResourceId) {
-            this.parentResourceId = parentResourceId;
-            return this;
-        }
-
-        public Builder withParentResourceType(String parentResourceType) {
-            this.parentResourceType = parentResourceType;
-            return this;
-        }
 
         public FileResourceMetadata build() {
             FileResourceMetadata fileResourceMetadata = new FileResourceMetadata();
@@ -165,8 +134,6 @@
             fileResourceMetadata.setUpdateTime(updateTime);
             fileResourceMetadata.setMd5sum(md5sum);
             fileResourceMetadata.setResourcePath(resourcePath);
-            fileResourceMetadata.setParentResourceId(parentResourceId);
-            fileResourceMetadata.setParentResourceType(parentResourceType);
             return fileResourceMetadata;
         }
     }
diff --git a/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java b/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java
index b90f4cc..860e9f7 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/TransferTask.java
@@ -17,7 +17,6 @@
 
 package org.apache.airavata.mft.core;
 
-import org.apache.airavata.mft.core.ConnectorContext;
 import org.apache.airavata.mft.core.api.Connector;
 
 import java.util.concurrent.Callable;
@@ -26,15 +25,17 @@
 
     private Connector connector;
     private ConnectorContext context;
+    private String resourcePath;
 
-    public TransferTask(Connector connector, ConnectorContext context) {
+    public TransferTask(Connector connector, ConnectorContext context, String resourcePath) {
         this.connector = connector;
         this.context = context;
+        this.resourcePath = resourcePath;
     }
 
     @Override
     public Integer call() throws Exception {
-        this.connector.startStream(context);
+        this.connector.startStream(resourcePath, context);
         return 0;
     }
 }
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java b/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
index 772d90e..8264d58 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
@@ -20,8 +20,8 @@
 import org.apache.airavata.mft.core.ConnectorContext;
 
 public interface Connector {
-    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort,
+    public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort,
         String secretServiceHost, int secretServicePort) throws Exception;
     public void destroy();
-    void startStream(ConnectorContext context) throws Exception;
+    void startStream(String targetPath, ConnectorContext context) throws Exception;
 }
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/MetadataCollector.java b/core/src/main/java/org/apache/airavata/mft/core/api/MetadataCollector.java
index e784867..38a5529 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/MetadataCollector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/MetadataCollector.java
@@ -43,17 +43,15 @@
     public FileResourceMetadata getFileResourceMetadata(String resourceId, String credentialToken) throws Exception;
 
     /**
-     * Fetches a metadata of given File Resource inside a registered directory resource. Target file might be living in
-     * multiple level below the parent directory
+     * Fetches a metadata of given File Resource
      *
-     * @param parentResourceId parent directory resource id
-     * @param resourcePath path of the target resource. This should be a child path of the parent resource
+     * @param storageId id of the storage resource
+     * @param resourcePath resource path
      * @param credentialToken credential token for the resource
      * @return an object of {@link FileResourceMetadata}
-     * @throws Exception if the parent resource is not a Directory resource or the target resource is not a File Resource type
-     * or the resource can't be fetched from the resource service
+     * @throws Exception if the resource id is not a File Resource type or the resource can't be fetched from the resource service
      */
-    public FileResourceMetadata getFileResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception;
+    public FileResourceMetadata getFileResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception;
 
     /**
      * Fetches a metadata of given Directory Resource
@@ -66,17 +64,14 @@
     public DirectoryResourceMetadata getDirectoryResourceMetadata(String resourceId, String credentialToken) throws Exception;
 
     /**
-     * Fetches a metadata of given Directory Resource inside a registered directory resource. Target directory might be living in
-     * multiple level below the parent directory
+     * Fetches a metadata of given Directory Resource
      *
-     * @param parentResourceId parent directory resource id
-     * @param resourcePath path of the target resource. This should be a child path of the parent resource
-     * @param credentialToken credential token for the resource
+     * @param storageId id of the storage resource
+     * @param resourcePath resource path
      * @return an object of {@link DirectoryResourceMetadata}
-     * @throws Exception if the parent resource is not a Directory resource or the target resource is not a Directory Resource type
-     * or the resource can't be fetched from the resource service
+     * @throws Exception if the resource id is not a Directory Resource type or the resource can't be fetched from the resource service
      */
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception;
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception;
 
     /**
      * Check whether the resource is available in the actual storage
@@ -87,4 +82,15 @@
      * @throws Exception if the resource details can not be fetched from the resource service
      */
     public Boolean isAvailable(String resourceId, String credentialToken) throws Exception;
+
+    /**
+     * Check whether the resource is available in the actual storage
+     *
+     * @param storageId id of the storage
+     * @param resourcePath resource path
+     * @param credentialToken credential token for the resource
+     * @return true of the resource is available false otherwise
+     * @throws Exception if the resource details can not be fetched from the resource service
+     */
+    public Boolean isAvailable(String storageId, String resourcePath, String credentialToken) throws Exception;
 }
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/http/DownloadExample.java b/examples/src/main/java/org/apache/airavata/mft/examples/http/DownloadExample.java
new file mode 100644
index 0000000..93b5d83
--- /dev/null
+++ b/examples/src/main/java/org/apache/airavata/mft/examples/http/DownloadExample.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.examples.http;
+
+import org.apache.airavata.mft.api.client.MFTApiClient;
+import org.apache.airavata.mft.api.service.HttpDownloadApiRequest;
+import org.apache.airavata.mft.api.service.HttpDownloadApiResponse;
+import org.apache.airavata.mft.api.service.MFTApiServiceGrpc;
+
+public class DownloadExample {
+    public static void main(String args[]) {
+        MFTApiServiceGrpc.MFTApiServiceBlockingStub client = MFTApiClient.buildClient("localhost", 7004);
+        HttpDownloadApiResponse httpDownloadApiResponse = client.submitHttpDownload(HttpDownloadApiRequest.newBuilder()
+                .setTargetAgent("agent0")
+                .setSourcePath("/tmp/a.txt")
+                .setSourceStoreId("remote-ssh-storage")
+                .setSourceToken("local-ssh-cred")
+                .setSourceType("SCP")
+                .setMftAuthorizationToken("")
+                .build());
+
+        System.out.println(httpDownloadApiResponse);
+    }
+}
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/LocalExample.java b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/LocalExample.java
index 53bfa71..af73d7c 100644
--- a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/LocalExample.java
+++ b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/LocalExample.java
@@ -26,18 +26,22 @@
     public static void main(String args[]) throws Exception {
         MFTApiServiceGrpc.MFTApiServiceBlockingStub client = MFTApiClient.buildClient("localhost", 7004);
 
-        String sourceId = "remote-ssh-resource2";
+        String sourceStorageId = "remote-ssh-storage";
+        String sourceResourcePath = "/tmp/1mb.txt";
         String sourceToken = "local-ssh-cred";
-        String destId = "10mb-file";
+        String destStorageId = "local-storage-1";
+        String destResourcePath = "/tmp/1mb-copy.txt";
         String destToken = "";
         String mftAuthorizationToken = "43ff79ac-e4f2-473c-9ea1-04eee9509a53";
 
         TransferApiRequest request = TransferApiRequest.newBuilder()
                 .setMftAuthorizationToken(mftAuthorizationToken)
-                .setSourceId(sourceId)
+                .setSourceStorageId(sourceStorageId)
+                .setSourcePath(sourceResourcePath)
                 .setSourceToken(sourceToken)
                 .setSourceType("SCP")
-                .setDestinationId(destId)
+                .setDestinationStorageId(destStorageId)
+                .setDestinationPath(destResourcePath)
                 .setDestinationToken(destToken)
                 .setDestinationType("LOCAL")
                 .setAffinityTransfer(false).build();
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/S3Example.java b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/S3Example.java
index 8443cf2..9df81e2 100644
--- a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/S3Example.java
+++ b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/S3Example.java
@@ -26,18 +26,22 @@
     public static void main(String args[]) throws Exception {
         MFTApiServiceGrpc.MFTApiServiceBlockingStub client = MFTApiClient.buildClient("localhost", 7004);
 
-        String sourceId = "remote-ssh-resource2";
-        String sourceToken = "local-ssh-cred";
-        String destId = "s3-file";
+        String sourceStorageId = "remote-ssh-storage";
+        String sourceResourcePath = "/tmp/1mb.txt";
+        String sourceToken = "ssh-cred";
+        String destStorageId = "s3-storage-1";
+        String destResourcePath = "1mb-copy.txt";
         String destToken = "s3-cred";
         String mftAuthorizationToken = "43ff79ac-e4f2-473c-9ea1-04eee9509a53";
 
         TransferApiRequest request = TransferApiRequest.newBuilder()
                 .setMftAuthorizationToken(mftAuthorizationToken)
-                .setSourceId(sourceId)
+                .setSourceStorageId(sourceStorageId)
+                .setSourcePath(sourceResourcePath)
                 .setSourceToken(sourceToken)
                 .setSourceType("SCP")
-                .setDestinationId(destId)
+                .setDestinationStorageId(destStorageId)
+                .setDestinationPath(destResourcePath)
                 .setDestinationToken(destToken)
                 .setDestinationType("S3")
                 .setAffinityTransfer(false).build();
diff --git a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/SCPExample.java b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/SCPExample.java
index 5977598..72ffac4 100644
--- a/examples/src/main/java/org/apache/airavata/mft/examples/transfer/SCPExample.java
+++ b/examples/src/main/java/org/apache/airavata/mft/examples/transfer/SCPExample.java
@@ -26,18 +26,22 @@
     public static void main(String args[]) throws Exception {
         MFTApiServiceGrpc.MFTApiServiceBlockingStub client = MFTApiClient.buildClient("localhost", 7004);
 
-        String sourceId = "remote-ssh-resource2";
-        String sourceToken = "local-ssh-cred";
-        String destId = "remote-ssh-resource";
-        String destToken = "local-ssh-cred";
+        String sourceStorageId = "remote-ssh-storage-1";
+        String sourceResourcePath = "/tmp/1mb.txt";
+        String sourceToken = "ssh-cred-1";
+        String destStorageId = "remote-ssh-storage-2";
+        String destResourcePath = "/tmp/1mb-copy.txt";
+        String destToken = "ssh-cred-2";
         String mftAuthorizationToken = "43ff79ac-e4f2-473c-9ea1-04eee9509a53";
 
         TransferApiRequest request = TransferApiRequest.newBuilder()
                 .setMftAuthorizationToken(mftAuthorizationToken)
-                .setSourceId(sourceId)
+                .setSourceStorageId(sourceStorageId)
+                .setSourcePath(sourceResourcePath)
                 .setSourceToken(sourceToken)
                 .setSourceType("SCP")
-                .setDestinationId(destId)
+                .setDestinationStorageId(destStorageId)
+                .setDestinationPath(destResourcePath)
                 .setDestinationToken(destToken)
                 .setDestinationType("SCP")
                 .setAffinityTransfer(false).build();
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/ResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/ResourceBackend.java
index 2e3a550..d924995 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/ResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/ResourceBackend.java
@@ -18,13 +18,19 @@
 package org.apache.airavata.mft.resource.server.backend;
 
 import org.apache.airavata.mft.resource.stubs.azure.resource.*;
+import org.apache.airavata.mft.resource.stubs.azure.storage.*;
 import org.apache.airavata.mft.resource.stubs.box.resource.*;
+import org.apache.airavata.mft.resource.stubs.box.storage.*;
 import org.apache.airavata.mft.resource.stubs.dropbox.resource.*;
+import org.apache.airavata.mft.resource.stubs.dropbox.storage.*;
 import org.apache.airavata.mft.resource.stubs.ftp.resource.*;
 import org.apache.airavata.mft.resource.stubs.ftp.storage.*;
 import org.apache.airavata.mft.resource.stubs.gcs.resource.*;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.*;
 import org.apache.airavata.mft.resource.stubs.local.resource.*;
+import org.apache.airavata.mft.resource.stubs.local.storage.*;
 import org.apache.airavata.mft.resource.stubs.s3.resource.*;
+import org.apache.airavata.mft.resource.stubs.s3.storage.*;
 import org.apache.airavata.mft.resource.stubs.scp.resource.*;
 import org.apache.airavata.mft.resource.stubs.scp.storage.*;
 
@@ -45,31 +51,61 @@
     public boolean updateSCPResource(SCPResourceUpdateRequest request) throws Exception;
     public boolean deleteSCPResource(SCPResourceDeleteRequest request) throws Exception;
 
+    public Optional<LocalStorage> getLocalStorage(LocalStorageGetRequest request) throws Exception;
+    public LocalStorage createLocalStorage(LocalStorageCreateRequest request) throws Exception;
+    public boolean updateLocalStorage(LocalStorageUpdateRequest request) throws Exception;
+    public boolean deleteLocalStorage(LocalStorageDeleteRequest request) throws Exception;
+
     public Optional<LocalResource> getLocalResource(LocalResourceGetRequest request) throws Exception;
     public LocalResource createLocalResource(LocalResourceCreateRequest request) throws Exception;
     public boolean updateLocalResource(LocalResourceUpdateRequest request) throws Exception;
     public boolean deleteLocalResource(LocalResourceDeleteRequest request) throws Exception;
 
+    public Optional<S3Storage> getS3Storage(S3StorageGetRequest request) throws Exception;
+    public S3Storage createS3Storage(S3StorageCreateRequest request) throws Exception;
+    public boolean updateS3Storage(S3StorageUpdateRequest request) throws Exception;
+    public boolean deleteS3Storage(S3StorageDeleteRequest request) throws Exception;
+
     public Optional<S3Resource> getS3Resource(S3ResourceGetRequest request) throws Exception;
     public S3Resource createS3Resource(S3ResourceCreateRequest request) throws Exception;
     public boolean updateS3Resource(S3ResourceUpdateRequest request) throws Exception;
     public boolean deleteS3Resource(S3ResourceDeleteRequest request) throws Exception;
 
+    public Optional<BoxStorage> getBoxStorage(BoxStorageGetRequest request) throws Exception;
+    public BoxStorage createBoxStorage(BoxStorageCreateRequest request) throws Exception;
+    public boolean updateBoxStorage(BoxStorageUpdateRequest request) throws Exception;
+    public boolean deleteBoxStorage(BoxStorageDeleteRequest request) throws Exception;
+
     public Optional<BoxResource> getBoxResource(BoxResourceGetRequest request) throws Exception;
     public BoxResource createBoxResource(BoxResourceCreateRequest request) throws Exception;
     public boolean updateBoxResource(BoxResourceUpdateRequest request) throws Exception;
     public boolean deleteBoxResource(BoxResourceDeleteRequest request) throws Exception;
 
+    public Optional<AzureStorage> getAzureStorage(AzureStorageGetRequest request) throws Exception;
+    public AzureStorage createAzureStorage(AzureStorageCreateRequest request) throws Exception;
+    public boolean updateAzureStorage(AzureStorageUpdateRequest request) throws Exception;
+    public boolean deleteAzureStorage(AzureStorageDeleteRequest request) throws Exception;
+
     public Optional<AzureResource> getAzureResource(AzureResourceGetRequest request) throws Exception;
     public AzureResource createAzureResource(AzureResourceCreateRequest request) throws Exception;
     public boolean updateAzureResource(AzureResourceUpdateRequest request) throws Exception;
     public boolean deleteAzureResource(AzureResourceDeleteRequest request) throws Exception;
 
+    public Optional<GCSStorage> getGCSStorage(GCSStorageGetRequest request) throws Exception;
+    public GCSStorage createGCSStorage(GCSStorageCreateRequest request) throws Exception;
+    public boolean updateGCSStorage(GCSStorageUpdateRequest request) throws Exception;
+    public boolean deleteGCSStorage(GCSStorageDeleteRequest request) throws Exception;
+
     public Optional<GCSResource> getGCSResource(GCSResourceGetRequest request) throws Exception;
     public GCSResource createGCSResource(GCSResourceCreateRequest request) throws Exception;
     public boolean updateGCSResource(GCSResourceUpdateRequest request) throws Exception;
     public boolean deleteGCSResource(GCSResourceDeleteRequest request) throws Exception;
 
+    public Optional<DropboxStorage> getDropboxStorage(DropboxStorageGetRequest request) throws Exception;
+    public DropboxStorage createDropboxStorage(DropboxStorageCreateRequest request) throws Exception;
+    public boolean updateDropboxStorage(DropboxStorageUpdateRequest request) throws Exception;
+    public boolean deleteDropboxStorage(DropboxStorageDeleteRequest request) throws Exception;
+
     public Optional<DropboxResource> getDropboxResource(DropboxResourceGetRequest request) throws Exception;
     public DropboxResource createDropboxResource(DropboxResourceCreateRequest request) throws Exception;
     public boolean updateDropboxResource(DropboxResourceUpdateRequest request) throws Exception;
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java
index 6299a07..a5a78f1 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java
@@ -19,14 +19,20 @@
 
 import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
 import org.apache.airavata.mft.resource.stubs.azure.resource.*;
+import org.apache.airavata.mft.resource.stubs.azure.storage.*;
 import org.apache.airavata.mft.resource.stubs.box.resource.*;
+import org.apache.airavata.mft.resource.stubs.box.storage.*;
 import org.apache.airavata.mft.resource.stubs.common.FileResource;
 import org.apache.airavata.mft.resource.stubs.dropbox.resource.*;
+import org.apache.airavata.mft.resource.stubs.dropbox.storage.*;
 import org.apache.airavata.mft.resource.stubs.ftp.resource.*;
 import org.apache.airavata.mft.resource.stubs.ftp.storage.*;
 import org.apache.airavata.mft.resource.stubs.gcs.resource.*;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.*;
 import org.apache.airavata.mft.resource.stubs.local.resource.*;
+import org.apache.airavata.mft.resource.stubs.local.storage.*;
 import org.apache.airavata.mft.resource.stubs.s3.resource.*;
+import org.apache.airavata.mft.resource.stubs.s3.storage.*;
 import org.apache.airavata.mft.resource.stubs.scp.resource.*;
 import org.apache.airavata.mft.resource.stubs.scp.storage.*;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
@@ -155,6 +161,26 @@
     }
 
     @Override
+    public Optional<LocalStorage> getLocalStorage(LocalStorageGetRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public LocalStorage createLocalStorage(LocalStorageCreateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean updateLocalStorage(LocalStorageUpdateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean deleteLocalStorage(LocalStorageDeleteRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
     public Optional<LocalResource> getLocalResource(LocalResourceGetRequest request) {
         throw new UnsupportedOperationException("Operation is not supported in backend");
 
@@ -179,6 +205,26 @@
     }
 
     @Override
+    public Optional<S3Storage> getS3Storage(S3StorageGetRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public S3Storage createS3Storage(S3StorageCreateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean updateS3Storage(S3StorageUpdateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean deleteS3Storage(S3StorageDeleteRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
     public Optional<S3Resource> getS3Resource(S3ResourceGetRequest request) throws Exception {
         throw new UnsupportedOperationException("Operation is not supported in backend");
 
@@ -203,6 +249,26 @@
     }
 
     @Override
+    public Optional<BoxStorage> getBoxStorage(BoxStorageGetRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public BoxStorage createBoxStorage(BoxStorageCreateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean updateBoxStorage(BoxStorageUpdateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean deleteBoxStorage(BoxStorageDeleteRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
     public Optional<BoxResource> getBoxResource(BoxResourceGetRequest request) throws Exception {
         throw new UnsupportedOperationException("Operation is not supported in backend");
     }
@@ -223,6 +289,26 @@
     }
 
     @Override
+    public Optional<AzureStorage> getAzureStorage(AzureStorageGetRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public AzureStorage createAzureStorage(AzureStorageCreateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean updateAzureStorage(AzureStorageUpdateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean deleteAzureStorage(AzureStorageDeleteRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
     public Optional<AzureResource> getAzureResource(AzureResourceGetRequest request) throws Exception {
         throw new UnsupportedOperationException("Operation is not supported in backend");
     }
@@ -243,6 +329,26 @@
     }
 
     @Override
+    public Optional<GCSStorage> getGCSStorage(GCSStorageGetRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public GCSStorage createGCSStorage(GCSStorageCreateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean updateGCSStorage(GCSStorageUpdateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean deleteGCSStorage(GCSStorageDeleteRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
     public Optional<GCSResource> getGCSResource(GCSResourceGetRequest request) throws Exception {
         throw new UnsupportedOperationException("Operation is not supported in backend");
     }
@@ -261,6 +367,27 @@
     public boolean deleteGCSResource(GCSResourceDeleteRequest request) throws Exception {
         throw new UnsupportedOperationException("Operation is not supported in backend");
     }
+
+    @Override
+    public Optional<DropboxStorage> getDropboxStorage(DropboxStorageGetRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public DropboxStorage createDropboxStorage(DropboxStorageCreateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean updateDropboxStorage(DropboxStorageUpdateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean deleteDropboxStorage(DropboxStorageDeleteRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
     @Override
     public Optional<DropboxResource> getDropboxResource(DropboxResourceGetRequest request) throws Exception {
         throw new UnsupportedOperationException("Operation is not supported in backend");
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java
index 3841409..eace41a 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java
@@ -19,21 +19,21 @@
 
 import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
 import org.apache.airavata.mft.resource.stubs.azure.resource.*;
-import org.apache.airavata.mft.resource.stubs.azure.storage.AzureStorage;
+import org.apache.airavata.mft.resource.stubs.azure.storage.*;
 import org.apache.airavata.mft.resource.stubs.box.resource.*;
-import org.apache.airavata.mft.resource.stubs.box.storage.BoxStorage;
+import org.apache.airavata.mft.resource.stubs.box.storage.*;
 import org.apache.airavata.mft.resource.stubs.common.DirectoryResource;
 import org.apache.airavata.mft.resource.stubs.common.FileResource;
 import org.apache.airavata.mft.resource.stubs.dropbox.resource.*;
-import org.apache.airavata.mft.resource.stubs.dropbox.storage.DropboxStorage;
+import org.apache.airavata.mft.resource.stubs.dropbox.storage.*;
 import org.apache.airavata.mft.resource.stubs.ftp.resource.*;
 import org.apache.airavata.mft.resource.stubs.ftp.storage.*;
 import org.apache.airavata.mft.resource.stubs.gcs.resource.*;
-import org.apache.airavata.mft.resource.stubs.gcs.storage.GCSStorage;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.*;
 import org.apache.airavata.mft.resource.stubs.local.resource.*;
-import org.apache.airavata.mft.resource.stubs.local.storage.LocalStorage;
+import org.apache.airavata.mft.resource.stubs.local.storage.*;
 import org.apache.airavata.mft.resource.stubs.s3.resource.*;
-import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.resource.stubs.s3.storage.*;
 import org.apache.airavata.mft.resource.stubs.scp.resource.*;
 import org.apache.airavata.mft.resource.stubs.scp.storage.*;
 import org.json.simple.JSONArray;
@@ -57,6 +57,9 @@
     @org.springframework.beans.factory.annotation.Value("${file.backend.resource.file}")
     private String resourceFile;
 
+    @org.springframework.beans.factory.annotation.Value("${file.backend.storage.file}")
+    private String storageFile;
+
     @Override
     public void init() {
         logger.info("Initializing file based resource backend");
@@ -69,7 +72,33 @@
 
     @Override
     public Optional<SCPStorage> getSCPStorage(SCPStorageGetRequest request) throws Exception {
-        throw new UnsupportedOperationException("Operation is not supported in backend");
+        InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(storageFile);
+
+        JSONParser jsonParser = new JSONParser();
+
+        try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+
+            Object obj = jsonParser.parse(reader);
+
+            JSONArray resourceList = (JSONArray) obj;
+
+            List<SCPStorage> scpStorages = (List<SCPStorage>) resourceList.stream()
+                    .filter(resource -> "SCP".equals(((JSONObject) resource).get("type").toString()))
+                    .map(st -> {
+                        JSONObject s = (JSONObject) st;
+
+                        SCPStorage storage = SCPStorage.newBuilder()
+                                .setStorageId(s.get("storageId").toString())
+                                .setHost(s.get("host").toString())
+                                .setUser(s.get("user").toString())
+                                .setPort(Integer.parseInt(s.get("port").toString())).build();
+
+                        return storage;
+
+                    }).collect(Collectors.toList());
+
+            return scpStorages.stream().filter(s -> request.getStorageId().equals(s.getStorageId())).findFirst();
+        }
     }
 
     @Override
@@ -108,15 +137,9 @@
                     .map(resource -> {
                         JSONObject r = (JSONObject) resource;
 
-                        SCPStorage storage = SCPStorage.newBuilder()
-                                .setStorageId(((JSONObject)r.get("scpStorage")).get("storageId").toString())
-                                .setHost(((JSONObject)r.get("scpStorage")).get("host").toString())
-                                .setUser(((JSONObject)r.get("scpStorage")).get("user").toString())
-                                .setPort(Integer.parseInt(((JSONObject)r.get("scpStorage")).get("port").toString())).build();
-
                         SCPResource.Builder builder = SCPResource.newBuilder()
                                 .setResourceId(r.get("resourceId").toString())
-                                .setScpStorage(storage);
+                                .setScpStorage(SCPStorage.newBuilder().setStorageId(r.get("storageId").toString()).getDefaultInstanceForType());
 
                         switch (r.get("resourceMode").toString()) {
                             case "FILE":
@@ -156,6 +179,47 @@
     }
 
     @Override
+    public Optional<LocalStorage> getLocalStorage(LocalStorageGetRequest request) throws Exception {
+        JSONParser jsonParser = new JSONParser();
+        InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(storageFile);
+
+        try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+            Object obj = jsonParser.parse(reader);
+
+            JSONArray storageList = (JSONArray) obj;
+
+            List<LocalStorage> localStorages = (List<LocalStorage>) storageList.stream()
+                    .filter(storage -> "LOCAL".equals(((JSONObject) storage).get("type").toString()))
+                    .map( storage -> {
+                        JSONObject s = (JSONObject) storage;
+
+                        LocalStorage st = LocalStorage.newBuilder()
+                                .setStorageId(s.get("storageId").toString())
+                                .setAgentId(s.get("agentId").toString())
+                                .build();
+
+                        return st;
+                    }).collect(Collectors.toList());
+            return localStorages.stream().filter(s -> request.getStorageId().equals(s.getStorageId())).findFirst();
+        }
+    }
+
+    @Override
+    public LocalStorage createLocalStorage(LocalStorageCreateRequest request) throws Exception {
+        return null;
+    }
+
+    @Override
+    public boolean updateLocalStorage(LocalStorageUpdateRequest request) throws Exception {
+        return false;
+    }
+
+    @Override
+    public boolean deleteLocalStorage(LocalStorageDeleteRequest request) throws Exception {
+        return false;
+    }
+
+    @Override
     public Optional<LocalResource> getLocalResource(LocalResourceGetRequest request) throws Exception {
         JSONParser jsonParser = new JSONParser();
         InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(resourceFile);
@@ -214,6 +278,48 @@
     }
 
     @Override
+    public Optional<S3Storage> getS3Storage(S3StorageGetRequest request) throws Exception {
+        JSONParser jsonParser = new JSONParser();
+        InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(storageFile);
+
+        try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+            Object obj = jsonParser.parse(reader);
+
+            JSONArray storagesList = (JSONArray) obj;
+
+            List<S3Storage> s3Storages = (List<S3Storage>) storagesList.stream()
+                    .filter(storage -> "S3".equals(((JSONObject) storage).get("type").toString()))
+                    .map(storage -> {
+                        JSONObject s = (JSONObject) storage;
+
+                        S3Storage st = S3Storage.newBuilder()
+                                .setStorageId(s.get("storageId").toString())
+                                .setRegion(s.get("region").toString())
+                                .setRegion(s.get("bucketName").toString())
+                                .build();
+
+                        return st;
+                    }).collect(Collectors.toList());
+            return s3Storages.stream().filter(s -> request.getStorageId().equals(s.getStorageId())).findFirst();
+        }
+    }
+
+    @Override
+    public S3Storage createS3Storage(S3StorageCreateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean updateS3Storage(S3StorageUpdateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean deleteS3Storage(S3StorageDeleteRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
     public Optional<S3Resource> getS3Resource(S3ResourceGetRequest request) throws Exception {
         JSONParser jsonParser = new JSONParser();
         InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(resourceFile);
@@ -274,6 +380,47 @@
     }
 
     @Override
+    public Optional<BoxStorage> getBoxStorage(BoxStorageGetRequest request) throws Exception {
+        JSONParser jsonParser = new JSONParser();
+        InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(storageFile);
+
+        try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+            Object obj = jsonParser.parse(reader);
+
+            JSONArray storageList = (JSONArray) obj;
+
+            System.out.println("All resources ");
+            List<BoxStorage> boxStorages = (List<BoxStorage>) storageList.stream()
+                    .filter(storage -> "BOX".equals(((JSONObject) storage).get("type").toString()))
+                    .map(storage -> {
+                        JSONObject s = (JSONObject) storage;
+
+                        BoxStorage st = BoxStorage.newBuilder()
+                                .setStorageId(s.get("storageId").toString())
+                                .build();
+
+                        return st;
+                    }).collect(Collectors.toList());
+            return boxStorages.stream().filter(s -> request.getStorageId().equals(s.getStorageId())).findFirst();
+        }
+    }
+
+    @Override
+    public BoxStorage createBoxStorage(BoxStorageCreateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean updateBoxStorage(BoxStorageUpdateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean deleteBoxStorage(BoxStorageDeleteRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
     public Optional<BoxResource> getBoxResource(BoxResourceGetRequest request) throws Exception {
         JSONParser jsonParser = new JSONParser();
         InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(resourceFile);
@@ -330,6 +477,47 @@
     }
 
     @Override
+    public Optional<AzureStorage> getAzureStorage(AzureStorageGetRequest request) throws Exception {
+        JSONParser jsonParser = new JSONParser();
+        InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(storageFile);
+
+        try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+            Object obj = jsonParser.parse(reader);
+
+            JSONArray storageList = (JSONArray) obj;
+
+            List<AzureStorage> azureStorages = (List<AzureStorage>) storageList.stream()
+                    .filter(storage -> "AZURE".equals(((JSONObject) storage).get("type").toString()))
+                    .map(storage -> {
+                        JSONObject s = (JSONObject) storage;
+
+                        AzureStorage st = AzureStorage.newBuilder()
+                                .setStorageId(s.get("storageId").toString())
+                                .setContainer(s.get("container").toString())
+                                .build();
+
+                        return st;
+                    }).collect(Collectors.toList());
+            return azureStorages.stream().filter(s -> request.getStorageId().equals(s.getStorageId())).findFirst();
+        }
+    }
+
+    @Override
+    public AzureStorage createAzureStorage(AzureStorageCreateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean updateAzureStorage(AzureStorageUpdateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean deleteAzureStorage(AzureStorageDeleteRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
     public Optional<AzureResource> getAzureResource(AzureResourceGetRequest request) throws Exception {
         JSONParser jsonParser = new JSONParser();
         InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(resourceFile);
@@ -386,6 +574,47 @@
     }
 
     @Override
+    public Optional<GCSStorage> getGCSStorage(GCSStorageGetRequest request) throws Exception {
+        JSONParser jsonParser = new JSONParser();
+        InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(storageFile);
+
+        try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+            Object obj = jsonParser.parse(reader);
+
+            JSONArray storageList = (JSONArray) obj;
+
+            List<GCSStorage> gcsStorages = (List<GCSStorage>) storageList.stream()
+                    .filter(storage -> "GCS".equals(((JSONObject) storage).get("type").toString()))
+                    .map(storage -> {
+                        JSONObject s = (JSONObject) storage;
+
+                        GCSStorage st = GCSStorage.newBuilder()
+                                .setStorageId(s.get("storageId").toString())
+                                .setBucketName(s.get("bucketName").toString())
+                                .build();
+
+                        return st;
+                    }).collect(Collectors.toList());
+            return gcsStorages.stream().filter(s -> request.getStorageId().equals(s.getStorageId())).findFirst();
+        }
+    }
+
+    @Override
+    public GCSStorage createGCSStorage(GCSStorageCreateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean updateGCSStorage(GCSStorageUpdateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean deleteGCSStorage(GCSStorageDeleteRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
     public Optional<GCSResource> getGCSResource(GCSResourceGetRequest request) throws Exception {
         JSONParser jsonParser = new JSONParser();
         InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(resourceFile);
@@ -440,6 +669,64 @@
     public boolean deleteGCSResource(GCSResourceDeleteRequest request) throws Exception {
         throw new UnsupportedOperationException("Operation is not supported in backend");
     }
+
+    @Override
+    public Optional<DropboxStorage> getDropboxStorage(DropboxStorageGetRequest request) throws Exception {
+        JSONParser jsonParser = new JSONParser();
+        InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(resourceFile);
+
+        try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+            Object obj = jsonParser.parse(reader);
+
+            JSONArray storageList = (JSONArray) obj;
+
+            List<DropboxStorage> dropboxStorages = (List<DropboxStorage>) storageList.stream()
+                    .filter(resource -> "DROPBOX".equals(((JSONObject) resource).get("type").toString()))
+                    .map(resource -> {
+                        JSONObject r = (JSONObject) resource;
+                        String resourcePath = r.get("resourcePath").toString();
+                        resourcePath = resourcePath.startsWith("/") ? resourcePath : "/" + resourcePath;
+
+                        DropboxStorage storage = DropboxStorage.newBuilder()
+                                .setStorageId(((JSONObject)r.get("dropboxStorage")).get("storageId").toString())
+                                .build();
+
+                        DropboxResource.Builder builder = DropboxResource.newBuilder()
+                                .setResourceId(r.get("resourceId").toString())
+                                .setDropboxStorage(storage);
+
+                        switch (r.get("resourceMode").toString()) {
+                            case "FILE":
+                                FileResource fileResource = FileResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+                                builder = builder.setFile(fileResource);
+                                break;
+                            case "DIRECTORY":
+                                DirectoryResource directoryResource = DirectoryResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+                                builder = builder.setDirectory(directoryResource);
+                                break;
+                        }
+                        return builder.build();
+
+                    }).collect(Collectors.toList());
+            return dropboxStorages.stream().filter(s -> request.getStorageId().equals(s.getStorageId())).findFirst();
+        }
+    }
+
+    @Override
+    public DropboxStorage createDropboxStorage(DropboxStorageCreateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean updateDropboxStorage(DropboxStorageUpdateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean deleteDropboxStorage(DropboxStorageDeleteRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
     @Override
     public Optional<DropboxResource> getDropboxResource(DropboxResourceGetRequest request) throws Exception {
         JSONParser jsonParser = new JSONParser();
@@ -498,6 +785,55 @@
     }
 
     @Override
+    public Optional<FTPStorage> getFTPStorage(FTPStorageGetRequest request) throws Exception {
+        InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(storageFile);
+
+        JSONParser jsonParser = new JSONParser();
+
+        if (inputStream == null) {
+            throw new IOException("resources file not found");
+        }
+
+        try (InputStreamReader reader = new InputStreamReader(inputStream)) {
+
+            Object obj = jsonParser.parse(reader);
+
+            JSONArray storageList = (JSONArray) obj;
+
+            List<FTPStorage> ftpStorages = (List<FTPStorage>) storageList.stream()
+                    .filter(storage -> "FTP".equals(((JSONObject) storage).get("type").toString()))
+                    .map(storage -> {
+                        JSONObject s = (JSONObject) storage;
+
+                        FTPStorage st = FTPStorage.newBuilder()
+                                .setStorageId(s.get("storageId").toString())
+                                .setHost(s.get("host").toString())
+                                .setPort(Integer.parseInt(s.get("port").toString())).build();
+
+                        return st;
+
+                    }).collect(Collectors.toList());
+
+            return ftpStorages.stream().filter(s -> request.getStorageId().equals(s.getStorageId())).findFirst();
+        }
+    }
+
+    @Override
+    public FTPStorage createFTPStorage(FTPStorageCreateRequest request) {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean updateFTPStorage(FTPStorageUpdateRequest request) {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean deleteFTPStorage(FTPStorageDeleteRequest request) {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
     public Optional<FTPResource> getFTPResource(FTPResourceGetRequest request) throws Exception {
         InputStream inputStream = FileBasedResourceBackend.class.getClassLoader().getResourceAsStream(resourceFile);
 
@@ -559,24 +895,4 @@
     public boolean deleteFTPResource(FTPResourceDeleteRequest request) {
         throw new UnsupportedOperationException("Operation is not supported in backend");
     }
-
-    @Override
-    public Optional<FTPStorage> getFTPStorage(FTPStorageGetRequest request) {
-        throw new UnsupportedOperationException("Operation is not supported in backend");
-    }
-
-    @Override
-    public FTPStorage createFTPStorage(FTPStorageCreateRequest request) {
-        throw new UnsupportedOperationException("Operation is not supported in backend");
-    }
-
-    @Override
-    public boolean updateFTPStorage(FTPStorageUpdateRequest request) {
-        throw new UnsupportedOperationException("Operation is not supported in backend");
-    }
-
-    @Override
-    public boolean deleteFTPStorage(FTPStorageDeleteRequest request) {
-        throw new UnsupportedOperationException("Operation is not supported in backend");
-    }
 }
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/SQLResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/SQLResourceBackend.java
index a070b33..caf5057 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/SQLResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/sql/SQLResourceBackend.java
@@ -17,18 +17,23 @@
 
 package org.apache.airavata.mft.resource.server.backend.sql;
 
-
 import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
 import org.apache.airavata.mft.resource.server.backend.sql.entity.*;
 import org.apache.airavata.mft.resource.server.backend.sql.repository.*;
 import org.apache.airavata.mft.resource.stubs.azure.resource.*;
+import org.apache.airavata.mft.resource.stubs.azure.storage.*;
 import org.apache.airavata.mft.resource.stubs.box.resource.*;
+import org.apache.airavata.mft.resource.stubs.box.storage.*;
 import org.apache.airavata.mft.resource.stubs.dropbox.resource.*;
+import org.apache.airavata.mft.resource.stubs.dropbox.storage.*;
 import org.apache.airavata.mft.resource.stubs.ftp.resource.*;
 import org.apache.airavata.mft.resource.stubs.ftp.storage.*;
 import org.apache.airavata.mft.resource.stubs.gcs.resource.*;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.*;
 import org.apache.airavata.mft.resource.stubs.local.resource.*;
+import org.apache.airavata.mft.resource.stubs.local.storage.*;
 import org.apache.airavata.mft.resource.stubs.s3.resource.*;
+import org.apache.airavata.mft.resource.stubs.s3.storage.*;
 import org.apache.airavata.mft.resource.stubs.scp.resource.*;
 import org.apache.airavata.mft.resource.stubs.scp.storage.*;
 import org.dozer.DozerBeanMapper;
@@ -121,6 +126,26 @@
     }
 
     @Override
+    public Optional<LocalStorage> getLocalStorage(LocalStorageGetRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public LocalStorage createLocalStorage(LocalStorageCreateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean updateLocalStorage(LocalStorageUpdateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean deleteLocalStorage(LocalStorageDeleteRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
     public Optional<LocalResource> getLocalResource(LocalResourceGetRequest request) {
         Optional<LocalResourceEntity> resourceEntity = localResourceRepository.findByResourceId(request.getResourceId());
         return resourceEntity.map(scpResourceEntity -> mapper.map(scpResourceEntity, LocalResource.newBuilder().getClass()).build());
@@ -145,6 +170,26 @@
     }
 
     @Override
+    public Optional<S3Storage> getS3Storage(S3StorageGetRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public S3Storage createS3Storage(S3StorageCreateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean updateS3Storage(S3StorageUpdateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean deleteS3Storage(S3StorageDeleteRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
     public Optional<S3Resource> getS3Resource(S3ResourceGetRequest request) throws Exception {
         throw new UnsupportedOperationException("Operation is not supported in backend");
 
@@ -169,6 +214,26 @@
     }
 
     @Override
+    public Optional<BoxStorage> getBoxStorage(BoxStorageGetRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public BoxStorage createBoxStorage(BoxStorageCreateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean updateBoxStorage(BoxStorageUpdateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean deleteBoxStorage(BoxStorageDeleteRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
     public Optional<BoxResource> getBoxResource(BoxResourceGetRequest request) throws Exception {
         throw new UnsupportedOperationException("Operation is not supported in backend");
     }
@@ -189,6 +254,26 @@
     }
 
     @Override
+    public Optional<AzureStorage> getAzureStorage(AzureStorageGetRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public AzureStorage createAzureStorage(AzureStorageCreateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean updateAzureStorage(AzureStorageUpdateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean deleteAzureStorage(AzureStorageDeleteRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
     public Optional<AzureResource> getAzureResource(AzureResourceGetRequest request) throws Exception {
         throw new UnsupportedOperationException("Operation is not supported in backend");
     }
@@ -209,6 +294,26 @@
     }
 
     @Override
+    public Optional<GCSStorage> getGCSStorage(GCSStorageGetRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public GCSStorage createGCSStorage(GCSStorageCreateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean updateGCSStorage(GCSStorageUpdateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean deleteGCSStorage(GCSStorageDeleteRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
     public Optional<GCSResource> getGCSResource(GCSResourceGetRequest request) throws Exception {
         throw new UnsupportedOperationException("Operation is not supported in backend");
     }
@@ -229,6 +334,26 @@
     }
 
     @Override
+    public Optional<DropboxStorage> getDropboxStorage(DropboxStorageGetRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public DropboxStorage createDropboxStorage(DropboxStorageCreateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean updateDropboxStorage(DropboxStorageUpdateRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
+    public boolean deleteDropboxStorage(DropboxStorageDeleteRequest request) throws Exception {
+        throw new UnsupportedOperationException("Operation is not supported in backend");
+    }
+
+    @Override
     public Optional<DropboxResource> getDropboxResource(DropboxResourceGetRequest request) throws Exception {
         throw new UnsupportedOperationException("Operation is not supported in backend");
     }
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/AzureServiceHandler.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/AzureServiceHandler.java
index 1175ff2..6808096 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/AzureServiceHandler.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/AzureServiceHandler.java
@@ -23,6 +23,7 @@
 import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
 import org.apache.airavata.mft.resource.service.azure.AzureResourceServiceGrpc;
 import org.apache.airavata.mft.resource.stubs.azure.resource.*;
+import org.apache.airavata.mft.resource.stubs.azure.storage.*;
 import org.lognet.springboot.grpc.GRpcService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,4 +103,69 @@
         }
     }
 
+    @Override
+    public void getAzureStorage(AzureStorageGetRequest request, StreamObserver<AzureStorage> responseObserver) {
+        try {
+            this.backend.getAzureStorage(request).ifPresentOrElse(resource -> {
+                responseObserver.onNext(resource);
+                responseObserver.onCompleted();
+            }, () -> {
+                responseObserver.onError(Status.INTERNAL
+                        .withDescription("No Azure storage with id " + request.getStorageId())
+                        .asRuntimeException());
+            });
+        } catch (Exception e) {
+            logger.error("Failed in retrieving Azure resource with id {}", request.getStorageId(), e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in retrieving Azure resource with id " + request.getStorageId())
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void createAzureStorage(AzureStorageCreateRequest request, StreamObserver<AzureStorage> responseObserver) {
+        try {
+            responseObserver.onNext(this.backend.createAzureStorage(request));
+            responseObserver.onCompleted();
+        } catch (Exception e) {
+            logger.error("Failed in creating the Azure storage", e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in creating the Azure resource")
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void updateAzureStorage(AzureStorageUpdateRequest request, StreamObserver<Empty> responseObserver) {
+        try {
+            this.backend.updateAzureStorage(request);
+            responseObserver.onCompleted();
+        } catch (Exception e) {
+            logger.error("Failed in updating the Azure storage {}", request.getStorageId(), e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in updating the Azure storage with id " + request.getStorageId())
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void deleteAzureStorage(AzureStorageDeleteRequest request, StreamObserver<Empty> responseObserver) {
+        try {
+            boolean res = this.backend.deleteAzureStorage(request);
+            if (res) {
+                responseObserver.onCompleted();
+            } else {
+                responseObserver.onError(new Exception("Failed to delete Azure storage with id " + request.getStorageId()));
+            }
+        } catch (Exception e) {
+            logger.error("Failed in deleting the Azure storage {}", request.getStorageId(), e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in deleting the Azure storage with id " + request.getStorageId())
+                    .asRuntimeException());
+        }
+    }
 }
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/BoxServiceHandler.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/BoxServiceHandler.java
index a17422c..5279d0a 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/BoxServiceHandler.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/BoxServiceHandler.java
@@ -23,6 +23,7 @@
 import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
 import org.apache.airavata.mft.resource.service.box.BoxResourceServiceGrpc;
 import org.apache.airavata.mft.resource.stubs.box.resource.*;
+import org.apache.airavata.mft.resource.stubs.box.storage.*;
 import org.lognet.springboot.grpc.GRpcService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,7 +68,8 @@
             responseObserver.onError(Status.INTERNAL.withCause(e)
                     .withDescription("Failed in creating the Box resource")
                     .asRuntimeException());
-        }    }
+        }
+    }
 
     @Override
     public void updateBoxResource(BoxResourceUpdateRequest request, StreamObserver<Empty> responseObserver) {
@@ -80,7 +82,8 @@
             responseObserver.onError(Status.INTERNAL.withCause(e)
                     .withDescription("Failed in updating the Box resource with id " + request.getResourceId())
                     .asRuntimeException());
-        }    }
+        }
+    }
 
     @Override
     public void deleteBoxResource(BoxResourceDeleteRequest request, StreamObserver<Empty> responseObserver) {
@@ -99,4 +102,70 @@
                     .asRuntimeException());
         }
     }
+
+    @Override
+    public void getBoxStorage(BoxStorageGetRequest request, StreamObserver<BoxStorage> responseObserver) {
+        try {
+            this.backend.getBoxStorage(request).ifPresentOrElse(resource -> {
+                responseObserver.onNext(resource);
+                responseObserver.onCompleted();
+            }, () -> {
+                responseObserver.onError(Status.INTERNAL
+                        .withDescription("No Box storage with id " + request.getStorageId())
+                        .asRuntimeException());
+            });
+        } catch (Exception e) {
+            logger.error("Failed in retrieving Box resource with id {}", request.getStorageId(), e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in retrieving Box storage with id " + request.getStorageId())
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void createBoxStorage(BoxStorageCreateRequest request, StreamObserver<BoxStorage> responseObserver) {
+        try {
+            responseObserver.onNext(this.backend.createBoxStorage(request));
+            responseObserver.onCompleted();
+        } catch (Exception e) {
+            logger.error("Failed in creating the Box storage", e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in creating the Box storage")
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void updateBoxStorage(BoxStorageUpdateRequest request, StreamObserver<Empty> responseObserver) {
+        try {
+            this.backend.updateBoxStorage(request);
+            responseObserver.onCompleted();
+        } catch (Exception e) {
+            logger.error("Failed in updating the Box storage {}", request.getStorageId(), e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in updating the Box storage with id " + request.getStorageId())
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void deleteBoxStorage(BoxStorageDeleteRequest request, StreamObserver<Empty> responseObserver) {
+        try {
+            boolean res = this.backend.deleteBoxStorage(request);
+            if (res) {
+                responseObserver.onCompleted();
+            } else {
+                responseObserver.onError(new Exception("Failed to delete Box storage with id " + request.getStorageId()));
+            }
+        } catch (Exception e) {
+            logger.error("Failed in deleting the Box storage {}", request.getStorageId(), e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in deleting the Box storage with id " + request.getStorageId())
+                    .asRuntimeException());
+        }
+    }
 }
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/DropboxServiceHandler.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/DropboxServiceHandler.java
index acdd1cc..0435361 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/DropboxServiceHandler.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/DropboxServiceHandler.java
@@ -23,6 +23,7 @@
 import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
 import org.apache.airavata.mft.resource.service.dropbox.DropboxServiceGrpc;
 import org.apache.airavata.mft.resource.stubs.dropbox.resource.*;
+import org.apache.airavata.mft.resource.stubs.dropbox.storage.*;
 import org.lognet.springboot.grpc.GRpcService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,4 +102,70 @@
                     .asRuntimeException());
         }
     }
+
+    @Override
+    public void getDropboxStorage(DropboxStorageGetRequest request, StreamObserver<DropboxStorage> responseObserver) {
+        try {
+            this.backend.getDropboxStorage(request).ifPresentOrElse(resource -> {
+                responseObserver.onNext(resource);
+                responseObserver.onCompleted();
+            }, () -> {
+                responseObserver.onError(Status.INTERNAL
+                        .withDescription("No dropbox storage with id " + request.getStorageId())
+                        .asRuntimeException());
+            });
+        } catch (Exception e) {
+            logger.error("Failed in retrieving dropbox storage with id {}", request.getStorageId(), e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in retrieving dropbox storage with id " + request.getStorageId())
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void createDropboxStorage(DropboxStorageCreateRequest request, StreamObserver<DropboxStorage> responseObserver) {
+        try {
+            responseObserver.onNext(this.backend.createDropboxStorage(request));
+            responseObserver.onCompleted();
+        } catch (Exception e) {
+            logger.error("Failed in creating the dropbox storage", e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in creating the dropbox storage")
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void updateDropboxStorage(DropboxStorageUpdateRequest request, StreamObserver<Empty> responseObserver) {
+        try {
+            this.backend.updateDropboxStorage(request);
+            responseObserver.onCompleted();
+        } catch (Exception e) {
+            logger.error("Failed in updating the dropbox storage {}", request.getStorageId(), e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in updating the dropbox storage with id " + request.getStorageId())
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void deleteDropboxStorage(DropboxStorageDeleteRequest request, StreamObserver<Empty> responseObserver) {
+        try {
+            boolean res = this.backend.deleteDropboxStorage(request);
+            if (res) {
+                responseObserver.onCompleted();
+            } else {
+                responseObserver.onError(new Exception("Failed to delete dropbox storage with id " + request.getStorageId()));
+            }
+        } catch (Exception e) {
+            logger.error("Failed in deleting the dropbox storage {}", request.getStorageId(), e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in deleting the dropbox storage with id " + request.getStorageId())
+                    .asRuntimeException());
+        }
+    }
 }
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/GCSServiceHandler.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/GCSServiceHandler.java
index 300e0e2..474ea62 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/GCSServiceHandler.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/GCSServiceHandler.java
@@ -23,6 +23,7 @@
 import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
 import org.apache.airavata.mft.resource.service.gcs.GCSResourceServiceGrpc;
 import org.apache.airavata.mft.resource.stubs.gcs.resource.*;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.*;
 import org.lognet.springboot.grpc.GRpcService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,4 +102,70 @@
                     .asRuntimeException());
         }
     }
+
+    @Override
+    public void getGCSStorage(GCSStorageGetRequest request, StreamObserver<GCSStorage> responseObserver) {
+        try {
+            this.backend.getGCSStorage(request).ifPresentOrElse(resource -> {
+                responseObserver.onNext(resource);
+                responseObserver.onCompleted();
+            }, () -> {
+                responseObserver.onError(Status.INTERNAL
+                        .withDescription("No GCS storage with id " + request.getStorageId())
+                        .asRuntimeException());
+            });
+        } catch (Exception e) {
+            logger.error("Failed in retrieving GCS storage with id {}", request.getStorageId(), e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in retrieving GCS storage with id " + request.getStorageId())
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void createGCSStorage(GCSStorageCreateRequest request, StreamObserver<GCSStorage> responseObserver) {
+        try {
+            responseObserver.onNext(this.backend.createGCSStorage(request));
+            responseObserver.onCompleted();
+        } catch (Exception e) {
+            logger.error("Failed in creating the GCS storage", e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in creating the GCS storage")
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void updateGCSStorage(GCSStorageUpdateRequest request, StreamObserver<Empty> responseObserver) {
+        try {
+            this.backend.updateGCSStorage(request);
+            responseObserver.onCompleted();
+        } catch (Exception e) {
+            logger.error("Failed in updating the GCS storage {}", request.getStorageId(), e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in updating the GCS storage with id " + request.getStorageId())
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void deleteGCSStorage(GCSStorageDeleteRequest request, StreamObserver<Empty> responseObserver) {
+        try {
+            boolean res = this.backend.deleteGCSStorage(request);
+            if (res) {
+                responseObserver.onCompleted();
+            } else {
+                responseObserver.onError(new Exception("Failed to delete GCS storage with id " + request.getStorageId()));
+            }
+        } catch (Exception e) {
+            logger.error("Failed in deleting the GCS storage {}", request.getStorageId(), e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in deleting the GCS storage with id " + request.getStorageId())
+                    .asRuntimeException());
+        }
+    }
 }
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/LocalServiceHandler.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/LocalServiceHandler.java
index 212bd24..122ac03 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/LocalServiceHandler.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/LocalServiceHandler.java
@@ -23,6 +23,7 @@
 import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
 import org.apache.airavata.mft.resource.service.local.LocalResourceServiceGrpc;
 import org.apache.airavata.mft.resource.stubs.local.resource.*;
+import org.apache.airavata.mft.resource.stubs.local.storage.*;
 import org.lognet.springboot.grpc.GRpcService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -103,4 +104,69 @@
         }
     }
 
+    @Override
+    public void getLocalStorage(LocalStorageGetRequest request, StreamObserver<LocalStorage> responseObserver) {
+        try {
+            this.backend.getLocalStorage(request).ifPresentOrElse(resource -> {
+                responseObserver.onNext(resource);
+                responseObserver.onCompleted();
+            }, () -> {
+                responseObserver.onError(Status.INTERNAL
+                        .withDescription("No Local storage with id " + request.getStorageId())
+                        .asRuntimeException());
+            });
+        } catch (Exception e) {
+            logger.error("Failed in retrieving storage with id {}", request.getStorageId(), e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in retrieving storage with id " + request.getStorageId())
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void createLocalStorage(LocalStorageCreateRequest request, StreamObserver<LocalStorage> responseObserver) {
+        try {
+            responseObserver.onNext(this.backend.createLocalStorage(request));
+            responseObserver.onCompleted();
+        } catch (Exception e) {
+            logger.error("Failed in creating the local storage", e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in creating the local storage")
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void updateLocalStorage(LocalStorageUpdateRequest request, StreamObserver<Empty> responseObserver) {
+        try {
+            this.backend.updateLocalStorage(request);
+            responseObserver.onCompleted();
+        } catch (Exception e) {
+            logger.error("Failed in updating the local storage {}", request.getStorageId(), e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in updating the local storge with id " + request.getStorageId())
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void deleteLocalStorage(LocalStorageDeleteRequest request, StreamObserver<Empty> responseObserver) {
+        try {
+            boolean res = this.backend.deleteLocalStorage(request);
+            if (res) {
+                responseObserver.onCompleted();
+            } else {
+                responseObserver.onError(new Exception("Failed to delete Local storage with id " + request.getStorageId()));
+            }
+        } catch (Exception e) {
+            logger.error("Failed in deleting the local storage {}", request.getStorageId(), e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in deleting the local storage with id " + request.getStorageId())
+                    .asRuntimeException());
+        }
+    }
 }
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/S3ServiceHandler.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/S3ServiceHandler.java
index 753bd6a..53e0dcb 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/S3ServiceHandler.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/handler/S3ServiceHandler.java
@@ -23,6 +23,7 @@
 import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
 import org.apache.airavata.mft.resource.service.s3.S3ResourceServiceGrpc;
 import org.apache.airavata.mft.resource.stubs.s3.resource.*;
+import org.apache.airavata.mft.resource.stubs.s3.storage.*;
 import org.lognet.springboot.grpc.GRpcService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,4 +102,70 @@
                     .asRuntimeException());
         }
     }
+
+    @Override
+    public void getS3Storage(S3StorageGetRequest request, StreamObserver<S3Storage> responseObserver) {
+        try {
+            this.backend.getS3Storage(request).ifPresentOrElse(resource -> {
+                responseObserver.onNext(resource);
+                responseObserver.onCompleted();
+            }, () -> {
+                responseObserver.onError(Status.INTERNAL
+                        .withDescription("No S3 storage with id " + request.getStorageId())
+                        .asRuntimeException());
+            });
+        } catch (Exception e) {
+            logger.error("Failed in retrieving S3 storage with id {}", request.getStorageId(), e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in retrieving S3 storage with id " + request.getStorageId())
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void createS3Storage(S3StorageCreateRequest request, StreamObserver<S3Storage> responseObserver) {
+        try {
+            responseObserver.onNext(this.backend.createS3Storage(request));
+            responseObserver.onCompleted();
+        } catch (Exception e) {
+            logger.error("Failed in creating the S3 storage", e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in creating the S3 storage")
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void updateS3Storage(S3StorageUpdateRequest request, StreamObserver<Empty> responseObserver) {
+        try {
+            this.backend.updateS3Storage(request);
+            responseObserver.onCompleted();
+        } catch (Exception e) {
+            logger.error("Failed in updating the S3 storage {}", request.getStorageId(), e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in updating the S3 storage with id " + request.getStorageId())
+                    .asRuntimeException());
+        }
+    }
+
+    @Override
+    public void deleteS3Storage(S3StorageDeleteRequest request, StreamObserver<Empty> responseObserver) {
+        try {
+            boolean res = this.backend.deleteS3Storage(request);
+            if (res) {
+                responseObserver.onCompleted();
+            } else {
+                responseObserver.onError(new Exception("Failed to delete S3 storage with id " + request.getStorageId()));
+            }
+        } catch (Exception e) {
+            logger.error("Failed in deleting the S3 storage {}", request.getStorageId(), e);
+
+            responseObserver.onError(Status.INTERNAL.withCause(e)
+                    .withDescription("Failed in deleting the S3 storage with id " + request.getStorageId())
+                    .asRuntimeException());
+        }
+    }
 }
diff --git a/services/resource-service/server/src/main/resources/application.properties b/services/resource-service/server/src/main/resources/application.properties
index 51872df..3dbfa86 100644
--- a/services/resource-service/server/src/main/resources/application.properties
+++ b/services/resource-service/server/src/main/resources/application.properties
@@ -24,4 +24,5 @@
 airavata.backend.registry.server.port=8970
 
 # Configurations for File Backend
-file.backend.resource.file=resources.json
\ No newline at end of file
+file.backend.resource.file=resources.json
+file.backend.storage.file=storages.json
\ No newline at end of file
diff --git a/services/resource-service/server/src/main/resources/resources.json b/services/resource-service/server/src/main/resources/resources.json
index a389282..3d7b62f 100644
--- a/services/resource-service/server/src/main/resources/resources.json
+++ b/services/resource-service/server/src/main/resources/resources.json
@@ -4,114 +4,76 @@
     "resourceId":  "remote-ssh-dir-resource",
     "resourceMode": "DIRECTORY",
     "resourcePath": "/tmp",
-    "scpStorage" : {
-      "storageId": "remote-ssh-storage",
-      "host": "149.165.156.124",
-      "port": 22,
-      "user": "root"
-    }
+    "storageId" : "remote-ssh-storage"
   },
   {
     "type": "SCP",
     "resourceId":  "remote-ssh-resource",
     "resourceMode": "FILE",
     "resourcePath": "/tmp/1mb.txt",
-    "scpStorage" : {
-      "storageId": "remote-ssh-storage",
-      "host": "149.165.156.124",
-      "port": 22,
-      "user": "root"
-    }
+    "storageId" : "remote-ssh-storage"
   },
   {
     "type": "SCP",
     "resourceId":  "remote-ssh-resource2",
     "resourceMode": "FILE",
     "resourcePath": "/tmp/10mb.txt",
-    "scpStorage" : {
-      "storageId": "remote-ssh-storage",
-      "host": "149.165.156.124",
-      "port": 22,
-      "user": "root"
-    }
+    "storageId" : "remote-ssh-storage"
   },
   {
     "type": "LOCAL",
     "resourceId":  "10mb-file",
     "resourceMode": "FILE",
     "resourcePath": "/tmp/10mb.txt",
-    "localStorage": {
-      "storageId": "local-storage-1",
-      "agentId": "agent-0"
-    }
+    "storageId": "local-storage-1"
   },
   {
     "type": "S3",
     "resourceId": "s3-file",
     "resourceMode": "FILE",
     "resourcePath": "10mb-s3.txt",
-    "s3Storage": {
-      "storageId": "s3-storage-1",
-      "region": "us-east-2",
-      "bucketName": "airavata-s3"
-    }
+    "storageId": "s3-storage-1"
   },
   {
     "type": "BOX",
     "resourceId": "box-file-abcd",
     "resourceMode": "FILE",
     "resourcePath": "655108198452",
-    "boxStorage" : {
-      "storageId": "box-storage-1"
-    }
+    "storageId" : "box-storage-1"
   },
   {
     "type": "BOX",
     "resourceId": "box-file-efgh",
     "resourceMode": "FILE",
     "resourcePath": "655450661536",
-    "boxStorage" : {
-      "storageId": "box-storage-1"
-    }
+    "storageId" : "box-storage-1"
   },
   {
     "type": "AZURE",
     "resourceId": "azure-blob",
     "resourceMode": "FILE",
     "resourcePath": "sample.blob",
-    "azureStorage" : {
-      "storageId": "azure-storage-1",
-      "container": "sample-container"
-    }
+    "storageId" : "azure-storage-1"
   },
   {
     "type": "GCS",
     "resourceId": "gcs-bucket",
     "resourceMode": "FILE",
     "resourcePath": "PikaPikaTest.txt",
-    "gcsStorage": {
-      "storageId": "gcs-storage-1",
-      "bucketName": "pika-pika-bucket"
-    }
+    "storageId": "gcs-storage-1"
   },
   {
     "type": "DROPBOX",
     "resourceId": "dropbox-file",
     "resourceMode": "FILE",
     "resourcePath": "/test.txt",
-    "dropboxStorage": {
-      "storageId": "dropbox-storage-1"
-    }
+    "storageId": "dropbox-storage-1"
   },
   {
     "type": "FTP",
     "resourceId": "ftp-resource",
     "resourceMode": "FILE",
     "resourcePath": "mft-1mb.txt",
-    "ftpStorage": {
-      "storageId": "ftp-resource",
-      "host": "ftp.dlptest.com",
-      "port": "21"
-    }
+    "storageId": "ftp-resource"
   }
 ]
\ No newline at end of file
diff --git a/services/resource-service/server/src/main/resources/storages.json b/services/resource-service/server/src/main/resources/storages.json
new file mode 100644
index 0000000..69d0b13
--- /dev/null
+++ b/services/resource-service/server/src/main/resources/storages.json
@@ -0,0 +1,44 @@
+[
+  {
+    "type": "SCP",
+    "storageId": "remote-ssh-storage",
+    "host": "149.165.156.124",
+    "port": 22,
+    "user": "root"
+  },
+  {
+    "type": "LOCAL",
+    "storageId": "local-storage-1",
+    "agentId": "agent-0"
+  },
+  {
+    "type": "S3",
+    "storageId": "s3-storage-1",
+    "region": "us-east-2",
+    "bucketName": "airavata-s3"
+  },
+  {
+    "type": "BOX",
+    "storageId": "box-storage-1"
+  },
+  {
+    "type": "AZURE",
+    "storageId": "azure-storage-1",
+    "container": "sample-container"
+  },
+  {
+    "type": "GCS",
+    "storageId": "gcs-storage-1",
+    "bucketName": "pika-pika-bucket"
+  },
+  {
+    "type": "DROPBOX",
+    "storageId": "dropbox-storage-1"
+  },
+  {
+    "type": "FTP",
+    "storageId": "ftp-resource",
+    "host": "ftp.dlptest.com",
+    "port": "21"
+  }
+]
\ No newline at end of file
diff --git a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java
index 5a64d54..76a6f13 100644
--- a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java
+++ b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java
@@ -32,6 +32,9 @@
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.azure.resource.AzureResource;
 import org.apache.airavata.mft.resource.stubs.azure.resource.AzureResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.azure.storage.AzureStorage;
+import org.apache.airavata.mft.resource.stubs.azure.storage.AzureStorageGetRequest;
+import org.apache.airavata.mft.resource.stubs.common.FileResource;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 
@@ -95,7 +98,7 @@
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public FileResourceMetadata getFileResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
@@ -104,7 +107,7 @@
         throw new UnsupportedOperationException("Method not implemented");    }
 
     @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
@@ -115,6 +118,20 @@
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
         AzureResource azureResource = resourceClient.azure().getAzureResource(AzureResourceGetRequest.newBuilder().setResourceId(resourceId).build());
 
+        return isAvailable(azureResource, credentialToken);
+    }
+
+    @Override
+    public Boolean isAvailable(String storageId, String resourcePath, String credentialToken) throws Exception {
+        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
+        AzureStorage azureStorage = resourceClient.azure().getAzureStorage(AzureStorageGetRequest.newBuilder().setStorageId(storageId).build());
+
+        AzureResource azureResource = AzureResource.newBuilder().setFile(FileResource.newBuilder().setResourcePath(resourcePath).build()).setAzureStorage(azureStorage).build();
+        return isAvailable(azureResource, credentialToken);
+    }
+
+    public Boolean isAvailable(AzureResource azureResource, String credentialToken) throws Exception {
+
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         AzureSecret azureSecret = secretClient.azure().getAzureSecret(AzureSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
@@ -132,4 +149,5 @@
         }
         return false;
     }
+
 }
diff --git a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureReceiver.java b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureReceiver.java
index a9bfdf7..dab9337 100644
--- a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureReceiver.java
+++ b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureReceiver.java
@@ -29,8 +29,8 @@
 import org.apache.airavata.mft.credential.stubs.azure.AzureSecretGetRequest;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.azure.resource.AzureResource;
-import org.apache.airavata.mft.resource.stubs.azure.resource.AzureResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.azure.storage.AzureStorage;
+import org.apache.airavata.mft.resource.stubs.azure.storage.AzureStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.slf4j.Logger;
@@ -43,21 +43,20 @@
     private static final Logger logger = LoggerFactory.getLogger(AzureReceiver.class);
 
     private boolean initialized = false;
-    private AzureResource azureResource;
     BlobContainerClient containerClient;
 
     @Override
-    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+    public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
         this.initialized = true;
 
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        this.azureResource = resourceClient.azure().getAzureResource(AzureResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        AzureStorage azureStorage = resourceClient.azure().getAzureStorage(AzureStorageGetRequest.newBuilder().setStorageId(storageId).build());
 
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         AzureSecret azureSecret = secretClient.azure().getAzureSecret(AzureSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
         BlobServiceClient blobServiceClient = new BlobServiceClientBuilder().connectionString(azureSecret.getConnectionString()).buildClient();
-        this.containerClient = blobServiceClient.getBlobContainerClient(azureResource.getAzureStorage().getContainer());
+        this.containerClient = blobServiceClient.getBlobContainerClient(azureStorage.getContainer());
     }
 
     @Override
@@ -72,49 +71,41 @@
     }
 
     @Override
-    public void startStream(ConnectorContext context) throws Exception {
+    public void startStream(String targetPath, ConnectorContext context) throws Exception {
         logger.info("Starting azure receive for remote server for transfer {}", context.getTransferId());
         checkInitialized();
 
-        if (ResourceTypes.FILE.equals(this.azureResource.getResourceCase().name())) {
-            BlobClient blobClient = containerClient.getBlobClient(azureResource.getFile().getResourcePath());
-            BlobInputStream blobInputStream = blobClient.openInputStream();
+        BlobClient blobClient = containerClient.getBlobClient(targetPath);
+        BlobInputStream blobInputStream = blobClient.openInputStream();
 
-            OutputStream streamOs = context.getStreamBuffer().getOutputStream();
+        OutputStream streamOs = context.getStreamBuffer().getOutputStream();
 
-            long fileSize = context.getMetadata().getResourceSize();
+        long fileSize = context.getMetadata().getResourceSize();
 
-            byte[] buf = new byte[1024];
-            while (true) {
-                int bufSize = 0;
+        byte[] buf = new byte[1024];
+        while (true) {
+            int bufSize = 0;
 
-                if (buf.length < fileSize) {
-                    bufSize = buf.length;
-                } else {
-                    bufSize = (int) fileSize;
-                }
-                bufSize = blobInputStream.read(buf, 0, bufSize);
+            if (buf.length < fileSize) {
+                bufSize = buf.length;
+            } else {
+                bufSize = (int) fileSize;
+            }
+            bufSize = blobInputStream.read(buf, 0, bufSize);
 
-                if (bufSize < 0) {
-                    break;
-                }
-
-                streamOs.write(buf, 0, bufSize);
-                streamOs.flush();
-
-                fileSize -= bufSize;
-                if (fileSize == 0L)
-                    break;
+            if (bufSize < 0) {
+                break;
             }
 
-            streamOs.close();
-            logger.info("Completed azure receive for remote server for transfer {}", context.getTransferId());
+            streamOs.write(buf, 0, bufSize);
+            streamOs.flush();
 
-        } else {
-            logger.error("Resource {} should be a FILE type. Found a {}",
-                    this.azureResource.getResourceId(), this.azureResource.getResourceCase().name());
-            throw new Exception("Resource " + this.azureResource.getResourceId() + " should be a FILE type. Found a " +
-                    this.azureResource.getResourceCase().name());
+            fileSize -= bufSize;
+            if (fileSize == 0L)
+                break;
         }
+
+        streamOs.close();
+        logger.info("Completed azure receive for remote server for transfer {}", context.getTransferId());
     }
 }
diff --git a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureSender.java b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureSender.java
index 7dcff4b..ef3249e 100644
--- a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureSender.java
+++ b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureSender.java
@@ -30,6 +30,8 @@
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.azure.resource.AzureResource;
 import org.apache.airavata.mft.resource.stubs.azure.resource.AzureResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.azure.storage.AzureStorage;
+import org.apache.airavata.mft.resource.stubs.azure.storage.AzureStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.slf4j.Logger;
@@ -40,21 +42,20 @@
     private static final Logger logger = LoggerFactory.getLogger(AzureSender.class);
 
     private boolean initialized = false;
-    private AzureResource azureResource;
     BlobContainerClient containerClient;
 
     @Override
-    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+    public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
         this.initialized = true;
 
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        this.azureResource = resourceClient.azure().getAzureResource(AzureResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        AzureStorage azureStorage = resourceClient.azure().getAzureStorage(AzureStorageGetRequest.newBuilder().setStorageId(storageId).build());
 
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         AzureSecret azureSecret = secretClient.azure().getAzureSecret(AzureSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
         BlobServiceClient blobServiceClient = new BlobServiceClientBuilder().connectionString(azureSecret.getConnectionString()).buildClient();
-        this.containerClient = blobServiceClient.getBlobContainerClient(azureResource.getAzureStorage().getContainer());
+        this.containerClient = blobServiceClient.getBlobContainerClient(azureStorage.getContainer());
     }
 
     @Override
@@ -69,20 +70,13 @@
     }
 
     @Override
-    public void startStream(ConnectorContext context) throws Exception {
+    public void startStream(String targetPath, ConnectorContext context) throws Exception {
         logger.info("Starting Azure send for remote server for transfer {}", context.getTransferId());
         checkInitialized();
 
-        if (ResourceTypes.FILE.equals(this.azureResource.getResourceCase().name())) {
-            BlockBlobClient blockBlobClient = containerClient.getBlobClient(azureResource.getFile().getResourcePath()).getBlockBlobClient();
-            blockBlobClient.upload(context.getStreamBuffer().getInputStream(), context.getMetadata().getResourceSize(), true);
-            logger.info("Completed Azure send for remote server for transfer {}", context.getTransferId());
-        } else {
-            logger.error("Resource {} should be a FILE type. Found a {}",
-                    this.azureResource.getResourceId(), this.azureResource.getResourceCase().name());
-            throw new Exception("Resource " + this.azureResource.getResourceId() + " should be a FILE type. Found a " +
-                    this.azureResource.getResourceCase().name());
-        }
+        BlockBlobClient blockBlobClient = containerClient.getBlobClient(targetPath).getBlockBlobClient();
+        blockBlobClient.upload(context.getStreamBuffer().getInputStream(), context.getMetadata().getResourceSize(), true);
+        logger.info("Completed Azure send for remote server for transfer {}", context.getTransferId());
 
     }
 }
diff --git a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java
index 8e21c9f..5e77e72 100644
--- a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java
+++ b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java
@@ -30,6 +30,9 @@
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.box.resource.BoxResource;
 import org.apache.airavata.mft.resource.stubs.box.resource.BoxResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.box.storage.BoxStorage;
+import org.apache.airavata.mft.resource.stubs.box.storage.BoxStorageGetRequest;
+import org.apache.airavata.mft.resource.stubs.common.FileResource;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 
@@ -84,7 +87,7 @@
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public FileResourceMetadata getFileResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
@@ -93,7 +96,7 @@
         throw new UnsupportedOperationException("Method not implemented");    }
 
     @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
@@ -104,7 +107,22 @@
 
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
         BoxResource boxResource = resourceClient.box().getBoxResource(BoxResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        return isAvailable(boxResource, credentialToken);
+    }
 
+    @Override
+    public Boolean isAvailable(String storageId, String resourcePath, String credentialToken) throws Exception {
+        checkInitialized();
+
+        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
+        BoxStorage boxStorage = resourceClient.box().getBoxStorage(BoxStorageGetRequest.newBuilder().setStorageId(storageId).build());
+
+        BoxResource boxResource = BoxResource.newBuilder().setFile(FileResource.newBuilder().setResourcePath(resourcePath).build()).setBoxStorage(boxStorage).build();
+
+        return isAvailable(boxResource, credentialToken);
+    }
+
+    private Boolean isAvailable(BoxResource boxResource, String credentialToken) throws Exception {
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         BoxSecret boxSecret = secretClient.box().getBoxSecret(BoxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
diff --git a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxReceiver.java b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxReceiver.java
index 907b485..e041e35 100644
--- a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxReceiver.java
+++ b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxReceiver.java
@@ -29,6 +29,8 @@
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.box.resource.BoxResource;
 import org.apache.airavata.mft.resource.stubs.box.resource.BoxResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.box.storage.BoxStorage;
+import org.apache.airavata.mft.resource.stubs.box.storage.BoxStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.slf4j.Logger;
@@ -40,20 +42,14 @@
 public class BoxReceiver implements Connector {
 
     private static final Logger logger = LoggerFactory.getLogger(BoxReceiver.class);
-    private BoxSecret boxSecret;
-    private BoxResource boxResource;
     private BoxAPIConnection boxClient;
 
-
     @Override
-    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort,
+    public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort,
                      String secretServiceHost, int secretServicePort) throws Exception {
 
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        boxResource = resourceClient.box().getBoxResource(BoxResourceGetRequest.newBuilder().setResourceId(resourceId).build());
-
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        boxSecret = secretClient.box().getBoxSecret(BoxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        BoxSecret boxSecret = secretClient.box().getBoxSecret(BoxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
         boxClient = new BoxAPIConnection(boxSecret.getAccessToken());
     }
@@ -64,25 +60,17 @@
     }
 
     @Override
-    public void startStream(ConnectorContext context) throws Exception {
+    public void startStream(String targetPath, ConnectorContext context) throws Exception {
 
-        if (ResourceTypes.FILE.equals(this.boxResource.getResourceCase().name())) {
-            logger.info("Starting Box Receiver stream for transfer {}", context.getTransferId());
+        logger.info("Starting Box Receiver stream for transfer {}", context.getTransferId());
 
-            BoxFile file = new BoxFile(this.boxClient, this.boxResource.getFile().getResourcePath());
+        BoxFile file = new BoxFile(this.boxClient, targetPath);
 
-            OutputStream os = context.getStreamBuffer().getOutputStream();
-            file.download(os);
-            os.flush();
-            os.close();
+        OutputStream os = context.getStreamBuffer().getOutputStream();
+        file.download(os);
+        os.flush();
+        os.close();
 
-            logger.info("Completed Box Receiver stream for transfer {}", context.getTransferId());
-
-        } else {
-            logger.error("Resource {} should be a FILE type. Found a {}",
-                    this.boxResource.getResourceId(), this.boxResource.getResourceCase().name());
-            throw new Exception("Resource " + this.boxResource.getResourceId() + " should be a FILE type. Found a " +
-                    this.boxResource.getResourceCase().name());
-        }
+        logger.info("Completed Box Receiver stream for transfer {}", context.getTransferId());
     }
 }
diff --git a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxSender.java b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxSender.java
index c9b279f..6cd50c5 100644
--- a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxSender.java
+++ b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxSender.java
@@ -37,19 +37,13 @@
 public class BoxSender implements Connector {
 
     private static final Logger logger = LoggerFactory.getLogger(BoxSender.class);
-    private BoxSecret boxSecret;
-    private BoxResource boxResource;
     private BoxAPIConnection boxClient;
 
-
     @Override
-    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
-
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        boxResource = resourceClient.box().getBoxResource(BoxResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+    public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
 
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
-        boxSecret = secretClient.box().getBoxSecret(BoxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+        BoxSecret boxSecret = secretClient.box().getBoxSecret(BoxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
         boxClient = new BoxAPIConnection(boxSecret.getAccessToken());
     }
@@ -60,29 +54,22 @@
     }
 
     @Override
-    public void startStream(ConnectorContext context) throws Exception {
+    public void startStream(String targetPath, ConnectorContext context) throws Exception {
 
         logger.info("Starting Box Sender stream for transfer {}", context.getTransferId());
         logger.debug("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
 
-        if (ResourceTypes.FILE.equals(this.boxResource.getResourceCase().name())) {
-            BoxFile file = new BoxFile(this.boxClient, this.boxResource.getFile().getResourcePath());
+        BoxFile file = new BoxFile(this.boxClient, targetPath);
 
-            // Upload chunks only if the file size is > 20mb
-            // Ref: https://developer.box.com/guides/uploads/chunked/
-            if (context.getMetadata().getResourceSize() > 20971520) {
-                file.uploadLargeFile(context.getStreamBuffer().getInputStream(), context.getMetadata().getResourceSize());
-            } else {
-                file.uploadNewVersion(context.getStreamBuffer().getInputStream());
-            }
-
-            logger.info("Completed Box Sender stream for transfer {}", context.getTransferId());
-
+        // Upload chunks only if the file size is > 20mb
+        // Ref: https://developer.box.com/guides/uploads/chunked/
+        if (context.getMetadata().getResourceSize() > 20971520) {
+            file.uploadLargeFile(context.getStreamBuffer().getInputStream(), context.getMetadata().getResourceSize());
         } else {
-            logger.error("Resource {} should be a FILE type. Found a {}",
-                    this.boxResource.getResourceId(), this.boxResource.getResourceCase().name());
-            throw new Exception("Resource " + this.boxResource.getResourceId() + " should be a FILE type. Found a " +
-                    this.boxResource.getResourceCase().name());
+            file.uploadNewVersion(context.getStreamBuffer().getInputStream());
         }
+
+        logger.info("Completed Box Sender stream for transfer {}", context.getTransferId());
+
     }
 }
diff --git a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java
index 906fb0b..2d00e10 100644
--- a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java
+++ b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java
@@ -28,8 +28,11 @@
 import org.apache.airavata.mft.credential.stubs.dropbox.DropboxSecretGetRequest;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.FileResource;
 import org.apache.airavata.mft.resource.stubs.dropbox.resource.DropboxResource;
 import org.apache.airavata.mft.resource.stubs.dropbox.resource.DropboxResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.dropbox.storage.DropboxStorage;
+import org.apache.airavata.mft.resource.stubs.dropbox.storage.DropboxStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 
@@ -78,7 +81,7 @@
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public FileResourceMetadata getFileResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
@@ -87,7 +90,7 @@
         throw new UnsupportedOperationException("Method not implemented");    }
 
     @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
@@ -97,6 +100,23 @@
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
         DropboxResource dropboxResource = resourceClient.dropbox().getDropboxResource(DropboxResourceGetRequest.newBuilder().setResourceId(resourceId).build());
 
+        return isAvailable(dropboxResource, credentialToken);
+    }
+
+    @Override
+    public Boolean isAvailable(String storageId, String resourcePath, String credentialToken) throws Exception {
+
+        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
+        DropboxStorage dropboxStorage = resourceClient.dropbox().getDropboxStorage(DropboxStorageGetRequest.newBuilder().setStorageId(storageId).build());
+
+        DropboxResource dropboxResource = DropboxResource.newBuilder()
+                .setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
+                .setDropboxStorage(dropboxStorage).build();
+
+        return isAvailable(dropboxResource, credentialToken);
+    }
+
+    private Boolean isAvailable(DropboxResource dropboxResource, String credentialToken) throws Exception {
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         DropboxSecret dropboxSecret = secretClient.dropbox().getDropboxSecret(DropboxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
diff --git a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxReceiver.java b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxReceiver.java
index 5bdf5a9..d694f91 100644
--- a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxReceiver.java
+++ b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxReceiver.java
@@ -40,14 +40,10 @@
 
     private static final Logger logger = LoggerFactory.getLogger(DropboxReceiver.class);
 
-    private DropboxResource dropboxResource;
     private DbxClientV2 dbxClientV2;
 
     @Override
-    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        this.dropboxResource = resourceClient.dropbox().getDropboxResource(DropboxResourceGetRequest.newBuilder().setResourceId(resourceId).build());
-
+    public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         DropboxSecret dropboxSecret = secretClient.dropbox().getDropboxSecret(DropboxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
@@ -61,48 +57,41 @@
     }
 
     @Override
-    public void startStream(ConnectorContext context) throws Exception {
+    public void startStream(String targetPath, ConnectorContext context) throws Exception {
 
-        if (ResourceTypes.FILE.equals(this.dropboxResource.getResourceCase().name())) {
-            logger.info("Starting Dropbox Receiver stream for transfer {}", context.getTransferId());
+        logger.info("Starting Dropbox Receiver stream for transfer {}", context.getTransferId());
 
-            InputStream inputStream = dbxClientV2.files().download(this.dropboxResource.getFile().getResourcePath()).getInputStream();
-            OutputStream os = context.getStreamBuffer().getOutputStream();
-            int read;
-            long bytes = 0;
-            long fileSize = context.getMetadata().getResourceSize();
-            byte[] buf = new byte[1024];
-            while (true) {
-                int bufSize = 0;
+        InputStream inputStream = dbxClientV2.files().download(targetPath).getInputStream();
+        OutputStream os = context.getStreamBuffer().getOutputStream();
+        int read;
+        long bytes = 0;
+        long fileSize = context.getMetadata().getResourceSize();
+        byte[] buf = new byte[1024];
+        while (true) {
+            int bufSize = 0;
 
-                if (buf.length < fileSize) {
-                    bufSize = buf.length;
-                } else {
-                    bufSize = (int) fileSize;
-                }
-                bufSize = inputStream.read(buf, 0, bufSize);
+            if (buf.length < fileSize) {
+                bufSize = buf.length;
+            } else {
+                bufSize = (int) fileSize;
+            }
+            bufSize = inputStream.read(buf, 0, bufSize);
 
-                if (bufSize < 0) {
-                    break;
-                }
-
-                os.write(buf, 0, bufSize);
-                os.flush();
-
-                fileSize -= bufSize;
-                if (fileSize == 0L)
-                    break;
+            if (bufSize < 0) {
+                break;
             }
 
-            os.close();
+            os.write(buf, 0, bufSize);
+            os.flush();
 
-            logger.info("Completed Dropbox Receiver stream for transfer {}", context.getTransferId());
-
-        } else {
-            logger.error("Resource {} should be a FILE type. Found a {}",
-                    this.dropboxResource.getResourceId(), this.dropboxResource.getResourceCase().name());
-            throw new Exception("Resource " + this.dropboxResource.getResourceId() + " should be a FILE type. Found a " +
-                    this.dropboxResource.getResourceCase().name());
+            fileSize -= bufSize;
+            if (fileSize == 0L)
+                break;
         }
+
+        os.close();
+
+        logger.info("Completed Dropbox Receiver stream for transfer {}", context.getTransferId());
+
     }
 }
diff --git a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxSender.java b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxSender.java
index ffab965..da8f8b7 100644
--- a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxSender.java
+++ b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxSender.java
@@ -40,13 +40,10 @@
 
     private static final Logger logger = LoggerFactory.getLogger(DropboxSender.class);
 
-    private DropboxResource dropboxResource;
     private DbxClientV2 dbxClientV2;
 
     @Override
-    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        this.dropboxResource = resourceClient.dropbox().getDropboxResource(DropboxResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+    public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
 
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         DropboxSecret dropboxSecret = secretClient.dropbox().getDropboxSecret(DropboxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
@@ -61,24 +58,15 @@
     }
 
     @Override
-    public void startStream(ConnectorContext context) throws Exception {
+    public void startStream(String targetPath, ConnectorContext context) throws Exception {
         logger.info("Starting Dropbox Sender stream for transfer {}", context.getTransferId());
         logger.info("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
 
+        FileMetadata metadata = dbxClientV2.files().uploadBuilder(targetPath)
+                .withMode(WriteMode.OVERWRITE)
+                .uploadAndFinish(context.getStreamBuffer().getInputStream());
+        logger.info("Completed Dropbox Sender stream for transfer {}", context.getTransferId());
 
 
-        if (ResourceTypes.FILE.equals(this.dropboxResource.getResourceCase().name())) {
-            FileMetadata metadata = dbxClientV2.files().uploadBuilder(dropboxResource.getFile().getResourcePath())
-                    .withMode(WriteMode.OVERWRITE)
-                    .uploadAndFinish(context.getStreamBuffer().getInputStream());
-            logger.info("Completed Dropbox Sender stream for transfer {}", context.getTransferId());
-
-        } else {
-            logger.error("Resource {} should be a FILE type. Found a {}",
-                    this.dropboxResource.getResourceId(), this.dropboxResource.getResourceCase().name());
-            throw new Exception("Resource " + this.dropboxResource.getResourceId() + " should be a FILE type. Found a " +
-                    this.dropboxResource.getResourceCase().name());
-        }
-
     }
 }
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
index 66675f3..5f6f7af 100644
--- a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
@@ -25,8 +25,11 @@
 import org.apache.airavata.mft.credential.stubs.ftp.FTPSecretGetRequest;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.FileResource;
 import org.apache.airavata.mft.resource.stubs.ftp.resource.FTPResource;
 import org.apache.airavata.mft.resource.stubs.ftp.resource.FTPResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorage;
+import org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.apache.commons.net.ftp.FTPClient;
@@ -74,7 +77,7 @@
         FileResourceMetadata resourceMetadata = new FileResourceMetadata();
         FTPClient ftpClient = null;
         try {
-            ftpClient = FTPTransportUtil.getFTPClient(ftpResource, ftpSecret);
+            ftpClient = FTPTransportUtil.getFTPClient(ftpResource.getFtpStorage(), ftpSecret);
             logger.info("Fetching metadata for resource {} in {}", ftpResource.getFile().getResourcePath(), ftpResource.getFtpStorage().getHost());
 
             FTPFile ftpFile = ftpClient.mlistFile(ftpResource.getFile().getResourcePath());
@@ -99,7 +102,7 @@
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public FileResourceMetadata getFileResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
@@ -108,7 +111,7 @@
         throw new UnsupportedOperationException("Method not implemented");    }
 
     @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
@@ -119,12 +122,32 @@
 
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
         FTPResource ftpResource = resourceClient.ftp().getFTPResource(FTPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+
+        return isAvailable(ftpResource, credentialToken);
+    }
+
+    @Override
+    public Boolean isAvailable(String storageId, String resourcePath, String credentialToken) throws Exception {
+
+        checkInitialized();
+
+        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
+        FTPStorage ftpStorage = resourceClient.ftp().getFTPStorage(FTPStorageGetRequest.newBuilder().setStorageId(storageId).build());
+
+        FTPResource ftpResource = FTPResource.newBuilder()
+                .setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
+                .setFtpStorage(ftpStorage).build();
+        return isAvailable(ftpResource, credentialToken);
+    }
+
+    public Boolean isAvailable(FTPResource ftpResource, String credentialToken) {
+
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         FTPSecret ftpSecret = secretClient.ftp().getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
         FTPClient ftpClient = null;
         try {
-            ftpClient = FTPTransportUtil.getFTPClient(ftpResource, ftpSecret);
+            ftpClient = FTPTransportUtil.getFTPClient(ftpResource.getFtpStorage(), ftpSecret);
             InputStream inputStream = null;
 
             switch (ftpResource.getResourceCase().name()){
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java
index 9225b0e..f3b4ac8 100644
--- a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java
@@ -26,6 +26,8 @@
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.ftp.resource.FTPResource;
 import org.apache.airavata.mft.resource.stubs.ftp.resource.FTPResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorage;
+import org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.apache.commons.net.ftp.FTPClient;
@@ -39,21 +41,20 @@
 
     private static final Logger logger = LoggerFactory.getLogger(FTPReceiver.class);
 
-    private FTPResource resource;
     private boolean initialized;
     private FTPClient ftpClient;
 
     @Override
-    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+    public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
         this.initialized = true;
 
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        this.resource = resourceClient.ftp().getFTPResource(FTPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        FTPStorage ftpStorage = resourceClient.ftp().getFTPStorage(FTPStorageGetRequest.newBuilder().setStorageId(storageId).build());
 
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         FTPSecret ftpSecret = secretClient.ftp().getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
-        this.ftpClient = FTPTransportUtil.getFTPClient(this.resource, ftpSecret);
+        this.ftpClient = FTPTransportUtil.getFTPClient(ftpStorage, ftpSecret);
     }
 
     @Override
@@ -62,50 +63,43 @@
     }
 
     @Override
-    public void startStream(ConnectorContext context) throws Exception {
+    public void startStream(String targetPath, ConnectorContext context) throws Exception {
 
-        if (ResourceTypes.FILE.equals(this.resource.getResourceCase().name())) {
-            logger.info("Starting FTP receiver stream for transfer {}", context.getTransferId());
+        logger.info("Starting FTP receiver stream for transfer {}", context.getTransferId());
 
-            checkInitialized();
-            OutputStream streamOs = context.getStreamBuffer().getOutputStream();
-            InputStream inputStream = ftpClient.retrieveFileStream(resource.getFile().getResourcePath());
+        checkInitialized();
+        OutputStream streamOs = context.getStreamBuffer().getOutputStream();
+        InputStream inputStream = ftpClient.retrieveFileStream(targetPath);
 
-            long fileSize = context.getMetadata().getResourceSize();
+        long fileSize = context.getMetadata().getResourceSize();
 
-            byte[] buf = new byte[1024];
-            while (true) {
-                int bufSize;
+        byte[] buf = new byte[1024];
+        while (true) {
+            int bufSize;
 
-                if (buf.length < fileSize) {
-                    bufSize = buf.length;
-                } else {
-                    bufSize = (int) fileSize;
-                }
-                bufSize = inputStream.read(buf, 0, bufSize);
+            if (buf.length < fileSize) {
+                bufSize = buf.length;
+            } else {
+                bufSize = (int) fileSize;
+            }
+            bufSize = inputStream.read(buf, 0, bufSize);
 
-                if (bufSize < 0) {
-                    break;
-                }
-
-                streamOs.write(buf, 0, bufSize);
-                streamOs.flush();
-
-                fileSize -= bufSize;
-                if (fileSize == 0L)
-                    break;
+            if (bufSize < 0) {
+                break;
             }
 
-            inputStream.close();
-            streamOs.close();
-            logger.info("Completed FTP receiver stream for transfer {}", context.getTransferId());
+            streamOs.write(buf, 0, bufSize);
+            streamOs.flush();
 
-        } else {
-            logger.error("Resource {} should be a FILE type. Found a {}",
-                    this.resource.getResourceId(), this.resource.getResourceCase().name());
-            throw new Exception("Resource " + this.resource.getResourceId() + " should be a FILE type. Found a " +
-                    this.resource.getResourceCase().name());
+            fileSize -= bufSize;
+            if (fileSize == 0L)
+                break;
         }
+
+        inputStream.close();
+        streamOs.close();
+        logger.info("Completed FTP receiver stream for transfer {}", context.getTransferId());
+
     }
 
     private void checkInitialized() {
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java
index f3a55f7..5579cb8 100644
--- a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java
@@ -18,14 +18,13 @@
 package org.apache.airavata.mft.transport.ftp;
 
 import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.credential.stubs.ftp.FTPSecret;
 import org.apache.airavata.mft.credential.stubs.ftp.FTPSecretGetRequest;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.ftp.resource.FTPResource;
-import org.apache.airavata.mft.resource.stubs.ftp.resource.FTPResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorage;
+import org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.apache.commons.net.ftp.FTPClient;
@@ -39,21 +38,20 @@
 
     private static final Logger logger = LoggerFactory.getLogger(FTPReceiver.class);
 
-    private FTPResource resource;
     private boolean initialized;
     private FTPClient ftpClient;
 
     @Override
-    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+    public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
         this.initialized = true;
 
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        this.resource = resourceClient.ftp().getFTPResource(FTPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        FTPStorage ftpStorage = resourceClient.ftp().getFTPStorage(FTPStorageGetRequest.newBuilder().setStorageId(storageId).build());
 
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         FTPSecret ftpSecret = secretClient.ftp().getFTPSecret(FTPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
-        this.ftpClient = FTPTransportUtil.getFTPClient(this.resource, ftpSecret);
+        this.ftpClient = FTPTransportUtil.getFTPClient(ftpStorage, ftpSecret);
     }
 
     @Override
@@ -62,7 +60,7 @@
     }
 
     @Override
-    public void startStream(ConnectorContext context) throws Exception {
+    public void startStream(String targetPath, ConnectorContext context) throws Exception {
 
         logger.info("Starting FTP sender stream for transfer {}", context.getTransferId());
 
@@ -70,42 +68,35 @@
 
         logger.info("Completed FTP sender stream for transfer {}", context.getTransferId());
 
-        if (ResourceTypes.FILE.equals(this.resource.getResourceCase().name())) {
-            InputStream in = context.getStreamBuffer().getInputStream();
-            long fileSize = context.getMetadata().getResourceSize();
-            OutputStream outputStream = ftpClient.storeFileStream(resource.getFile().getResourcePath());
+        InputStream in = context.getStreamBuffer().getInputStream();
+        long fileSize = context.getMetadata().getResourceSize();
+        OutputStream outputStream = ftpClient.storeFileStream(targetPath);
 
-            byte[] buf = new byte[1024];
-            while (true) {
-                int bufSize;
+        byte[] buf = new byte[1024];
+        while (true) {
+            int bufSize;
 
-                if (buf.length < fileSize) {
-                    bufSize = buf.length;
-                } else {
-                    bufSize = (int) fileSize;
-                }
-                bufSize = in.read(buf, 0, bufSize);
+            if (buf.length < fileSize) {
+                bufSize = buf.length;
+            } else {
+                bufSize = (int) fileSize;
+            }
+            bufSize = in.read(buf, 0, bufSize);
 
-                if (bufSize < 0) {
-                    break;
-                }
-
-                outputStream.write(buf, 0, bufSize);
-                outputStream.flush();
-
-                fileSize -= bufSize;
-                if (fileSize == 0L)
-                    break;
+            if (bufSize < 0) {
+                break;
             }
 
-            in.close();
-            outputStream.close();
-        } else {
-            logger.error("Resource {} should be a FILE type. Found a {}",
-                    this.resource.getResourceId(), this.resource.getResourceCase().name());
-            throw new Exception("Resource " + this.resource.getResourceId() + " should be a FILE type. Found a " +
-                    this.resource.getResourceCase().name());
+            outputStream.write(buf, 0, bufSize);
+            outputStream.flush();
+
+            fileSize -= bufSize;
+            if (fileSize == 0L)
+                break;
         }
+
+        in.close();
+        outputStream.close();
     }
 
     private void checkInitialized() {
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPTransportUtil.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPTransportUtil.java
index 7a1e3c8..988a208 100644
--- a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPTransportUtil.java
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPTransportUtil.java
@@ -18,7 +18,7 @@
 package org.apache.airavata.mft.transport.ftp;
 
 import org.apache.airavata.mft.credential.stubs.ftp.FTPSecret;
-import org.apache.airavata.mft.resource.stubs.ftp.resource.FTPResource;
+import org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorage;
 import org.apache.commons.net.ftp.FTPClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,10 +29,10 @@
 
     private static final Logger logger = LoggerFactory.getLogger(FTPTransportUtil.class);
 
-    static FTPClient getFTPClient(FTPResource ftpResource, FTPSecret ftpSecret) throws IOException {
+    static FTPClient getFTPClient(FTPStorage ftpStorage, FTPSecret ftpSecret) throws IOException {
 
         FTPClient ftpClient = new FTPClient();
-        ftpClient.connect(ftpResource.getFtpStorage().getHost(), ftpResource.getFtpStorage().getPort());
+        ftpClient.connect(ftpStorage.getHost(), ftpStorage.getPort());
         ftpClient.enterLocalActiveMode();
         ftpClient.login(ftpSecret.getUserId(), ftpSecret.getPassword());
 
diff --git a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java
index aa97091..0d1a1a8 100644
--- a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java
+++ b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java
@@ -33,8 +33,11 @@
 import org.apache.airavata.mft.credential.stubs.gcs.GCSSecretGetRequest;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.FileResource;
 import org.apache.airavata.mft.resource.stubs.gcs.resource.GCSResource;
 import org.apache.airavata.mft.resource.stubs.gcs.resource.GCSResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.GCSStorage;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.GCSStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 
@@ -99,7 +102,7 @@
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public FileResourceMetadata getFileResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
@@ -108,7 +111,7 @@
         throw new UnsupportedOperationException("Method not implemented");    }
 
     @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
@@ -118,6 +121,24 @@
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
         GCSResource gcsResource = resourceClient.gcs().getGCSResource(GCSResourceGetRequest.newBuilder().setResourceId(resourceId).build());
 
+        return isAvailable(gcsResource, credentialToken);
+    }
+
+    @Override
+    public Boolean isAvailable(String storageId, String resourcePath, String credentialToken) throws Exception {
+        checkInitialized();
+        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
+        GCSStorage gcsStorage = resourceClient.gcs().getGCSStorage(GCSStorageGetRequest.newBuilder().setStorageId(storageId).build());
+
+        GCSResource gcsResource = GCSResource.newBuilder()
+                .setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
+                .setGcsStorage(gcsStorage).build();
+
+        return isAvailable(gcsResource, credentialToken);
+    }
+
+    private Boolean isAvailable(GCSResource gcsResource, String credentialToken) throws Exception {
+
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         GCSSecret gcsSecret = secretClient.gcs().getGCSSecret(GCSSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
@@ -134,10 +155,10 @@
         switch (gcsResource.getResourceCase().name()){
             case ResourceTypes.FILE:
                 return !storage.objects().get(gcsResource.getGcsStorage().getBucketName(), gcsResource.getFile().getResourcePath())
-                                            .execute().isEmpty();
+                        .execute().isEmpty();
             case ResourceTypes.DIRECTORY:
                 return !storage.objects().get(gcsResource.getGcsStorage().getBucketName(), gcsResource.getDirectory().getResourcePath())
-                                            .execute().isEmpty();
+                        .execute().isEmpty();
         }
         return false;
     }
diff --git a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSReceiver.java b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSReceiver.java
index 03fd31c..d734f92 100644
--- a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSReceiver.java
+++ b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSReceiver.java
@@ -33,6 +33,8 @@
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.gcs.resource.GCSResource;
 import org.apache.airavata.mft.resource.stubs.gcs.resource.GCSResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.GCSStorage;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.GCSStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.slf4j.Logger;
@@ -49,16 +51,18 @@
 
     private static final Logger logger = LoggerFactory.getLogger(GCSReceiver.class);
 
-    private GCSResource gcsResource;
     private Storage storage;
+    private GCSStorage gcsStorage;
 
     @Override
-    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+    public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        this.gcsResource = resourceClient.gcs().getGCSResource(GCSResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        this.gcsStorage = resourceClient.gcs().getGCSStorage(GCSStorageGetRequest.newBuilder().setStorageId(storageId).build());
 
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         GCSSecret gcsSecret = secretClient.gcs().getGCSSecret(GCSSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
+
         HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
         JsonFactory jsonFactory = new JacksonFactory();
         String jsonString = gcsSecret.getCredentialsJson();
@@ -76,48 +80,40 @@
     }
 
     @Override
-    public void startStream(ConnectorContext context) throws Exception {
+    public void startStream(String targetPath, ConnectorContext context) throws Exception {
         logger.info("Starting GCS Receiver stream for transfer {}", context.getTransferId());
 
-        if (ResourceTypes.FILE.equals(this.gcsResource.getResourceCase().name())) {
-            InputStream inputStream = storage.objects().get(this.gcsResource.getGcsStorage().getBucketName(),
-                                        this.gcsResource.getFile().getResourcePath()).executeMediaAsInputStream();
-            OutputStream os = context.getStreamBuffer().getOutputStream();
-            int read;
-            long bytes = 0;
-            long fileSize = context.getMetadata().getResourceSize();
-            byte[] buf = new byte[1024];
-            while (true) {
-                int bufSize = 0;
+        InputStream inputStream = storage.objects().get(this.gcsStorage.getBucketName(), targetPath).executeMediaAsInputStream();
+        OutputStream os = context.getStreamBuffer().getOutputStream();
+        int read;
+        long bytes = 0;
+        long fileSize = context.getMetadata().getResourceSize();
+        byte[] buf = new byte[1024];
+        while (true) {
+            int bufSize = 0;
 
-                if (buf.length < fileSize) {
-                    bufSize = buf.length;
-                } else {
-                    bufSize = (int) fileSize;
-                }
-                bufSize = inputStream.read(buf, 0, bufSize);
+            if (buf.length < fileSize) {
+                bufSize = buf.length;
+            } else {
+                bufSize = (int) fileSize;
+            }
+            bufSize = inputStream.read(buf, 0, bufSize);
 
-                if (bufSize < 0) {
-                    break;
-                }
-
-                os.write(buf, 0, bufSize);
-                os.flush();
-
-                fileSize -= bufSize;
-                if (fileSize == 0L)
-                    break;
+            if (bufSize < 0) {
+                break;
             }
 
-            os.close();
+            os.write(buf, 0, bufSize);
+            os.flush();
 
-            logger.info("Completed GCS Receiver stream for transfer {}", context.getTransferId());
-
-        } else {
-            logger.error("Resource {} should be a FILE type. Found a {}",
-                    this.gcsResource.getResourceId(), this.gcsResource.getResourceCase().name());
-            throw new Exception("Resource " + this.gcsResource.getResourceId() + " should be a FILE type. Found a " +
-                    this.gcsResource.getResourceCase().name());
+            fileSize -= bufSize;
+            if (fileSize == 0L)
+                break;
         }
+
+        os.close();
+
+        logger.info("Completed GCS Receiver stream for transfer {}", context.getTransferId());
+
     }
 }
diff --git a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSSender.java b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSSender.java
index 368d24d..4d0ef29 100644
--- a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSSender.java
+++ b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSSender.java
@@ -31,14 +31,13 @@
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.credential.stubs.gcs.GCSSecret;
 import org.apache.airavata.mft.credential.stubs.gcs.GCSSecretGetRequest;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.gcs.resource.GCSResource;
-import org.apache.airavata.mft.resource.stubs.gcs.resource.GCSResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.GCSStorage;
+import org.apache.airavata.mft.resource.stubs.gcs.storage.GCSStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.slf4j.Logger;
@@ -54,15 +53,15 @@
 
     private static final Logger logger = LoggerFactory.getLogger(GCSSender.class);
 
-    private GCSResource gcsResource;
     private Storage storage;
+    private GCSStorage gcsStorage;
     private JsonObject jsonObject;
 
     @Override
-    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+    public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
 
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        this.gcsResource = resourceClient.gcs().getGCSResource(GCSResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        this.gcsStorage = resourceClient.gcs().getGCSStorage(GCSStorageGetRequest.newBuilder().setStorageId(storageId).build());
 
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         GCSSecret gcsSecret = secretClient.gcs().getGCSSecret(GCSSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
@@ -86,33 +85,23 @@
     }
 
     @Override
-    public void startStream(ConnectorContext context) throws Exception {
+    public void startStream(String targetPath, ConnectorContext context) throws Exception {
         logger.info("Starting GCS Sender stream for transfer {}", context.getTransferId());
         logger.debug("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
 
-        if (ResourceTypes.FILE.equals(this.gcsResource.getResourceCase().name())) {
+        InputStreamContent contentStream = new InputStreamContent(
+                null, context.getStreamBuffer().getInputStream());
+        String entityUser = jsonObject.get("client_email").getAsString();
+        StorageObject objectMetadata = new StorageObject()
+                // Set the destination object name
+                .setName(targetPath)
+                // Set the access control list to publicly read-only
+                .setAcl(Arrays.asList(new ObjectAccessControl().setEntity("user-" + entityUser).setRole("OWNER")));
 
-            InputStreamContent contentStream = new InputStreamContent(
-                    null, context.getStreamBuffer().getInputStream());
-            String entityUser = jsonObject.get("client_email").getAsString();
-            StorageObject objectMetadata = new StorageObject()
-                    // Set the destination object name
-                    .setName(this.gcsResource.getFile().getResourcePath())
-                    // Set the access control list to publicly read-only
-                    .setAcl(Arrays.asList(new ObjectAccessControl().setEntity("user-" + entityUser).setRole("OWNER")));
+        Insert insertRequest = storage.objects().insert(this.gcsStorage.getBucketName(), objectMetadata, contentStream);
 
-            Insert insertRequest = storage.objects().insert(this.gcsResource.getGcsStorage().getBucketName(), objectMetadata, contentStream);
+        insertRequest.execute();
 
-            insertRequest.execute();
-
-            logger.info("Completed GCS Sender stream for transfer {}", context.getTransferId());
-        } else {
-            logger.error("Resource {} should be a FILE type. Found a {}",
-                    this.gcsResource.getResourceId(), this.gcsResource.getResourceCase().name());
-            throw new Exception("Resource " + this.gcsResource.getResourceId() + " should be a FILE type. Found a " +
-                    this.gcsResource.getResourceCase().name());
-        }
-
-
+        logger.info("Completed GCS Sender stream for transfer {}", context.getTransferId());
     }
 }
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
index f24c70f..9faa70a 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
@@ -23,8 +23,11 @@
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.FileResource;
 import org.apache.airavata.mft.resource.stubs.local.resource.LocalResource;
 import org.apache.airavata.mft.resource.stubs.local.resource.LocalResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.local.storage.LocalStorage;
+import org.apache.airavata.mft.resource.stubs.local.storage.LocalStorageGetRequest;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -92,7 +95,7 @@
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public FileResourceMetadata getFileResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
@@ -101,15 +104,29 @@
         throw new UnsupportedOperationException("Method not implemented");    }
 
     @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
     @Override
     public Boolean isAvailable(String resourceId, String credentialToken) throws Exception {
-       ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
+        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
         LocalResource localResource = resourceClient.local().getLocalResource(LocalResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        return isAvailable(localResource, credentialToken);
+    }
 
+    @Override
+    public Boolean isAvailable(String storageId, String resourcePath, String credentialToken) throws Exception {
+        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
+        LocalStorage localStorage = resourceClient.local().getLocalStorage(LocalStorageGetRequest.newBuilder().setStorageId(storageId).build());
+
+        LocalResource localResource = LocalResource.newBuilder()
+                .setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
+                .setLocalStorage(localStorage).build();
+        return isAvailable(localResource, credentialToken);
+    }
+
+    public Boolean isAvailable(LocalResource localResource, String credentialToken) throws Exception {
         switch (localResource.getResourceCase().name()){
             case ResourceTypes.FILE:
                 return new File(localResource.getFile().getResourcePath()).exists();
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java
index 1b61fb1..53776ff 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java
@@ -18,12 +18,7 @@
 package org.apache.airavata.mft.transport.local;
 
 import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.local.resource.LocalResource;
-import org.apache.airavata.mft.resource.stubs.local.resource.LocalResourceGetRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,16 +28,12 @@
 
     private static final Logger logger = LoggerFactory.getLogger(LocalReceiver.class);
 
-    private LocalResource resource;
     private boolean initialized;
 
     @Override
-    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort,
+    public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort,
                      String secretServiceHost, int secretServicePort) throws Exception {
         this.initialized = true;
-
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        this.resource = resourceClient.local().getLocalResource(LocalResourceGetRequest.newBuilder().setResourceId(resourceId).build());
     }
 
     @Override
@@ -57,50 +48,41 @@
     }
 
     @Override
-    public void startStream(ConnectorContext context) throws Exception {
+    public void startStream(String targetPath, ConnectorContext context) throws Exception {
         logger.info("Starting local receiver stream for transfer {}", context.getTransferId());
 
         checkInitialized();
 
+        OutputStream streamOs = context.getStreamBuffer().getOutputStream();
+        FileInputStream fis = new FileInputStream(new File(targetPath));
 
-        if (ResourceTypes.FILE.equals(this.resource.getResourceCase().name())) {
-            OutputStream streamOs = context.getStreamBuffer().getOutputStream();
-            FileInputStream fis = new FileInputStream(new File(resource.getFile().getResourcePath()));
+        long fileSize = context.getMetadata().getResourceSize();
 
-            long fileSize = context.getMetadata().getResourceSize();
+        byte[] buf = new byte[1024];
+        while (true) {
+            int bufSize = 0;
 
-            byte[] buf = new byte[1024];
-            while (true) {
-                int bufSize = 0;
+            if (buf.length < fileSize) {
+                bufSize = buf.length;
+            } else {
+                bufSize = (int) fileSize;
+            }
+            bufSize = fis.read(buf, 0, bufSize);
 
-                if (buf.length < fileSize) {
-                    bufSize = buf.length;
-                } else {
-                    bufSize = (int) fileSize;
-                }
-                bufSize = fis.read(buf, 0, bufSize);
-
-                if (bufSize < 0) {
-                    break;
-                }
-
-                streamOs.write(buf, 0, bufSize);
-                streamOs.flush();
-
-                fileSize -= bufSize;
-                if (fileSize == 0L)
-                    break;
+            if (bufSize < 0) {
+                break;
             }
 
-            fis.close();
-            streamOs.close();
-            logger.info("Completed local receiver stream for transfer {}", context.getTransferId());
+            streamOs.write(buf, 0, bufSize);
+            streamOs.flush();
 
-        } else {
-            logger.error("Resource {} should be a FILE type. Found a {}",
-                    this.resource.getResourceId(), this.resource.getResourceCase().name());
-            throw new Exception("Resource " + this.resource.getResourceId() + " should be a FILE type. Found a " +
-                    this.resource.getResourceCase().name());
+            fileSize -= bufSize;
+            if (fileSize == 0L)
+                break;
         }
+
+        fis.close();
+        streamOs.close();
+        logger.info("Completed local receiver stream for transfer {}", context.getTransferId());
     }
 }
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java
index 315fa25..e4d5e48 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java
@@ -18,12 +18,7 @@
 package org.apache.airavata.mft.transport.local;
 
 import org.apache.airavata.mft.core.ConnectorContext;
-import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
-import org.apache.airavata.mft.resource.client.ResourceServiceClient;
-import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.local.resource.LocalResource;
-import org.apache.airavata.mft.resource.stubs.local.resource.LocalResourceGetRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,16 +28,12 @@
 
     private static final Logger logger = LoggerFactory.getLogger(LocalSender.class);
 
-    private LocalResource resource;
     private boolean initialized;
     @Override
-    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort,
+    public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort,
                      String secretServiceHost, int secretServicePort) throws Exception {
 
         this.initialized = true;
-
-        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        this.resource = resourceClient.local().getLocalResource(LocalResourceGetRequest.newBuilder().setResourceId(resourceId).build());
     }
 
     @Override
@@ -57,49 +48,42 @@
     }
 
     @Override
-    public void startStream(ConnectorContext context) throws Exception {
+    public void startStream(String targetPath, ConnectorContext context) throws Exception {
 
         logger.info("Starting local sender stream for transfer {}", context.getTransferId());
 
         checkInitialized();
 
-        if (ResourceTypes.FILE.equals(this.resource.getResourceCase().name())) {
-            InputStream in = context.getStreamBuffer().getInputStream();
-            long fileSize = context.getMetadata().getResourceSize();
-            OutputStream fos = new FileOutputStream(resource.getFile().getResourcePath());
+        InputStream in = context.getStreamBuffer().getInputStream();
+        long fileSize = context.getMetadata().getResourceSize();
+        OutputStream fos = new FileOutputStream(targetPath);
 
-            byte[] buf = new byte[1024];
-            while (true) {
-                int bufSize = 0;
+        byte[] buf = new byte[1024];
+        while (true) {
+            int bufSize = 0;
 
-                if (buf.length < fileSize) {
-                    bufSize = buf.length;
-                } else {
-                    bufSize = (int) fileSize;
-                }
-                bufSize = in.read(buf, 0, bufSize);
+            if (buf.length < fileSize) {
+                bufSize = buf.length;
+            } else {
+                bufSize = (int) fileSize;
+            }
+            bufSize = in.read(buf, 0, bufSize);
 
-                if (bufSize < 0) {
-                    break;
-                }
-
-                fos.write(buf, 0, bufSize);
-                fos.flush();
-
-                fileSize -= bufSize;
-                if (fileSize == 0L)
-                    break;
+            if (bufSize < 0) {
+                break;
             }
 
-            in.close();
-            fos.close();
+            fos.write(buf, 0, bufSize);
+            fos.flush();
 
-            logger.info("Completed local sender stream for transfer {}", context.getTransferId());
-        } else {
-            logger.error("Resource {} should be a FILE type. Found a {}",
-                    this.resource.getResourceId(), this.resource.getResourceCase().name());
-            throw new Exception("Resource " + this.resource.getResourceId() + " should be a FILE type. Found a " +
-                    this.resource.getResourceCase().name());
+            fileSize -= bufSize;
+            if (fileSize == 0L)
+                break;
         }
+
+        in.close();
+        fos.close();
+
+        logger.info("Completed local sender stream for transfer {}", context.getTransferId());
     }
 }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
index 572affd..58be5b6 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
@@ -30,8 +30,11 @@
 import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
+import org.apache.airavata.mft.resource.stubs.common.FileResource;
 import org.apache.airavata.mft.resource.stubs.s3.resource.S3Resource;
 import org.apache.airavata.mft.resource.stubs.s3.resource.S3ResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3StorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 
@@ -85,7 +88,7 @@
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public FileResourceMetadata getFileResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
@@ -94,7 +97,7 @@
         throw new UnsupportedOperationException("Method not implemented");    }
 
     @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
         throw new UnsupportedOperationException("Method not implemented");
     }
 
@@ -105,6 +108,23 @@
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
         S3Resource s3Resource = resourceClient.s3().getS3Resource(S3ResourceGetRequest.newBuilder().setResourceId(resourceId).build());
 
+        return isAvailable(s3Resource, credentialToken);
+    }
+
+    @Override
+    public Boolean isAvailable(String storageId, String resourcePath, String credentialToken) throws Exception {
+        checkInitialized();
+        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
+        S3Storage s3Storage = resourceClient.s3().getS3Storage(S3StorageGetRequest.newBuilder().setStorageId(storageId).build());
+        S3Resource s3Resource = S3Resource.newBuilder()
+                .setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
+                .setS3Storage(s3Storage).build();
+
+        return isAvailable(s3Resource, credentialToken);
+    }
+
+    private Boolean isAvailable(S3Resource s3Resource, String credentialToken) throws Exception {
+
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         S3Secret s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
index ddfd425..2e7e816 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
@@ -32,6 +32,8 @@
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.s3.resource.S3Resource;
 import org.apache.airavata.mft.resource.stubs.s3.resource.S3ResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3StorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.slf4j.Logger;
@@ -43,15 +45,15 @@
 
     private static final Logger logger = LoggerFactory.getLogger(S3Receiver.class);
 
-    private S3Resource s3Resource;
     private AmazonS3 s3Client;
+    private S3Storage s3Storage;
 
     @Override
-    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort,
-                                                    String secretServiceHost, int secretServicePort) throws Exception {
+    public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort,
+                     String secretServiceHost, int secretServicePort) throws Exception {
 
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        this.s3Resource = resourceClient.s3().getS3Resource(S3ResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        this.s3Storage = resourceClient.s3().getS3Storage(S3StorageGetRequest.newBuilder().setStorageId(storageId).build());
 
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         S3Secret s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
@@ -59,7 +61,7 @@
 
         s3Client = AmazonS3ClientBuilder.standard()
                 .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
-                .withRegion(s3Resource.getS3Storage().getRegion())
+                .withRegion(s3Storage.getRegion())
                 .build();
     }
 
@@ -69,31 +71,23 @@
     }
 
     @Override
-    public void startStream(ConnectorContext context) throws Exception {
+    public void startStream(String targetPath, ConnectorContext context) throws Exception {
 
-        if (ResourceTypes.FILE.equals(this.s3Resource.getResourceCase().name())) {
-            logger.info("Starting S3 Receiver stream for transfer {}", context.getTransferId());
+        logger.info("Starting S3 Receiver stream for transfer {}", context.getTransferId());
 
-            S3Object s3object = s3Client.getObject(s3Resource.getS3Storage().getBucketName(), s3Resource.getFile().getResourcePath());
-            S3ObjectInputStream inputStream = s3object.getObjectContent();
+        S3Object s3object = s3Client.getObject(this.s3Storage.getBucketName(), targetPath);
+        S3ObjectInputStream inputStream = s3object.getObjectContent();
 
-            OutputStream os = context.getStreamBuffer().getOutputStream();
-            int read;
-            long bytes = 0;
-            while ((read = inputStream.read()) != -1) {
-                bytes++;
-                os.write(read);
-            }
-            os.flush();
-            os.close();
-
-            logger.info("Completed S3 Receiver stream for transfer {}", context.getTransferId());
-
-        } else {
-            logger.error("Resource {} should be a FILE type. Found a {}",
-                    this.s3Resource.getResourceId(), this.s3Resource.getResourceCase().name());
-            throw new Exception("Resource " + this.s3Resource.getResourceId() + " should be a FILE type. Found a " +
-                    this.s3Resource.getResourceCase().name());
+        OutputStream os = context.getStreamBuffer().getOutputStream();
+        int read;
+        long bytes = 0;
+        while ((read = inputStream.read()) != -1) {
+            bytes++;
+            os.write(read);
         }
+        os.flush();
+        os.close();
+
+        logger.info("Completed S3 Receiver stream for transfer {}", context.getTransferId());
     }
 }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
index 095ed3c..a7a640c 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
@@ -31,6 +31,8 @@
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.s3.resource.S3Resource;
 import org.apache.airavata.mft.resource.stubs.s3.resource.S3ResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3StorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.slf4j.Logger;
@@ -41,13 +43,13 @@
     private static final Logger logger = LoggerFactory.getLogger(S3Sender.class);
 
     private AmazonS3 s3Client;
-    private S3Resource s3Resource;
+    private S3Storage s3Storage;
 
     @Override
-    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
+    public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort, String secretServiceHost, int secretServicePort) throws Exception {
 
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        this.s3Resource = resourceClient.s3().getS3Resource(S3ResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        this.s3Storage = resourceClient.s3().getS3Storage(S3StorageGetRequest.newBuilder().setStorageId(storageId).build());
 
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         S3Secret s3Secret = secretClient.s3().getS3Secret(S3SecretGetRequest.newBuilder().setSecretId(credentialToken).build());
@@ -55,7 +57,7 @@
 
         s3Client = AmazonS3ClientBuilder.standard()
                 .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
-                .withRegion(s3Resource.getS3Storage().getRegion())
+                .withRegion(s3Storage.getRegion())
                 .build();
     }
 
@@ -65,23 +67,14 @@
     }
 
     @Override
-    public void startStream(ConnectorContext context) throws Exception {
+    public void startStream(String targetPath, ConnectorContext context) throws Exception {
 
         logger.info("Starting S3 Sender stream for transfer {}", context.getTransferId());
         logger.info("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
         ObjectMetadata metadata = new ObjectMetadata();
         metadata.setContentLength(context.getMetadata().getResourceSize());
 
-        if (ResourceTypes.FILE.equals(this.s3Resource.getResourceCase().name())) {
-            s3Client.putObject(this.s3Resource.getS3Storage().getBucketName(), this.s3Resource.getFile().getResourcePath(),
-                                                        context.getStreamBuffer().getInputStream(), metadata);
-            logger.info("Completed S3 Sender stream for transfer {}", context.getTransferId());
-        } else {
-            logger.error("Resource {} should be a FILE type. Found a {}",
-                    this.s3Resource.getResourceId(), this.s3Resource.getResourceCase().name());
-            throw new Exception("Resource " + this.s3Resource.getResourceId() + " should be a FILE type. Found a " +
-                    this.s3Resource.getResourceCase().name());
-        }
-
+        s3Client.putObject(this.s3Storage.getBucketName(), targetPath, context.getStreamBuffer().getInputStream(), metadata);
+        logger.info("Completed S3 Sender stream for transfer {}", context.getTransferId());
     }
 }
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
index d0086a5..ac09acb 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
@@ -40,6 +40,9 @@
 import org.apache.airavata.mft.resource.stubs.common.FileResource;
 import org.apache.airavata.mft.resource.stubs.scp.resource.SCPResource;
 import org.apache.airavata.mft.resource.stubs.scp.resource.SCPResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorageCreateRequest;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.apache.commons.io.IOUtils;
@@ -76,7 +79,7 @@
         }
     }
 
-    private FileResourceMetadata getFileResourceMetadata(SCPResource scpResource, SCPSecret scpSecret, String parentResourceId) throws Exception {
+    private FileResourceMetadata getFileResourceMetadata(SCPResource scpResource, SCPSecret scpSecret) throws Exception {
         try (SSHClient sshClient = getSSHClient(scpResource, scpSecret)) {
 
             logger.info("Fetching metadata for resource {} in {}", scpResource.getFile().getResourcePath(), scpResource.getScpStorage().getHost());
@@ -89,8 +92,6 @@
                 metadata.setResourceSize(lstat.getSize());
                 metadata.setCreatedTime(lstat.getAtime());
                 metadata.setUpdateTime(lstat.getMtime());
-                metadata.setParentResourceId(parentResourceId);
-                metadata.setParentResourceType("SCP");
                 metadata.setFriendlyName(new File(scpResource.getFile().getResourcePath()).getName());
                 metadata.setResourcePath(scpResource.getFile().getResourcePath());
 
@@ -127,28 +128,26 @@
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
-        return getFileResourceMetadata(scpResource, scpSecret, resourceId);
+        return getFileResourceMetadata(scpResource, scpSecret);
     }
 
     @Override
-    public FileResourceMetadata getFileResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public FileResourceMetadata getFileResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        SCPResource parentSCPResource = resourceClient.scp().getSCPResource(SCPResourceGetRequest.newBuilder().setResourceId(parentResourceId).build());
+        SCPStorage scpStorage = resourceClient.scp().getSCPStorage(SCPStorageGetRequest.newBuilder().setStorageId(storageId).build());
 
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
-        validateParent(parentSCPResource, resourcePath);
-
         SCPResource scpResource = SCPResource.newBuilder()
                                         .setFile(FileResource.newBuilder()
                                         .setResourcePath(resourcePath).build())
-                                        .setScpStorage(parentSCPResource.getScpStorage()).build();
+                                        .setScpStorage(scpStorage).build();
 
-        return getFileResourceMetadata(scpResource, scpSecret, parentResourceId);
+        return getFileResourceMetadata(scpResource, scpSecret);
     }
 
-    private DirectoryResourceMetadata getDirectoryResourceMetadata(SCPResource scpResource, SCPSecret scpSecret, String parentResourceId) throws Exception {
+    private DirectoryResourceMetadata getDirectoryResourceMetadata(SCPResource scpResource, SCPSecret scpSecret) throws Exception {
         try (SSHClient sshClient = getSSHClient(scpResource, scpSecret)) {
 
             logger.info("Fetching metadata for resource {} in {}", scpResource.getFile().getResourcePath(), scpResource.getScpStorage().getHost());
@@ -167,9 +166,7 @@
                                         .withFriendlyName(rri.getName())
                                         .withResourcePath(rri.getPath())
                                         .withCreatedTime(rri.getAttributes().getAtime())
-                                        .withUpdateTime(rri.getAttributes().getMtime())
-                                        .withParentResourceId(parentResourceId)
-                                        .withParentResourceType("SCP");
+                                        .withUpdateTime(rri.getAttributes().getMtime());
                         dirMetadataBuilder = dirMetadataBuilder.withDirectory(childDirBuilder.build());
                     }
 
@@ -178,20 +175,16 @@
                                         .withFriendlyName(rri.getName())
                                         .withResourcePath(rri.getPath())
                                         .withCreatedTime(rri.getAttributes().getAtime())
-                                        .withUpdateTime(rri.getAttributes().getMtime())
-                                        .withParentResourceId(parentResourceId)
-                                        .withParentResourceType("SCP");
+                                        .withUpdateTime(rri.getAttributes().getMtime());
 
                         dirMetadataBuilder = dirMetadataBuilder.withFile(childFileBuilder.build());
                     }
                 }
 
                 dirMetadataBuilder = dirMetadataBuilder.withFriendlyName(new File(scpResource.getDirectory().getResourcePath()).getName())
-                        .withResourcePath(parentResourceId)
+                        .withResourcePath(scpResource.getDirectory().getResourcePath())
                         .withCreatedTime(lsStat.getAtime())
-                        .withUpdateTime(lsStat.getMtime())
-                        .withParentResourceId(parentResourceId)
-                        .withParentResourceType("SCP");
+                        .withUpdateTime(lsStat.getMtime());
                 return dirMetadataBuilder.build();
             }
         }
@@ -206,24 +199,23 @@
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
-        return getDirectoryResourceMetadata(scpPResource, scpSecret, resourceId);
+        return getDirectoryResourceMetadata(scpPResource, scpSecret);
     }
 
     @Override
-    public DirectoryResourceMetadata getDirectoryResourceMetadata(String parentResourceId, String resourcePath, String credentialToken) throws Exception {
+    public DirectoryResourceMetadata getDirectoryResourceMetadata(String storageId, String resourcePath, String credentialToken) throws Exception {
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        SCPResource parentSCPPResource = resourceClient.scp().getSCPResource(SCPResourceGetRequest.newBuilder().setResourceId(parentResourceId).build());
+        SCPStorage scpStorage = resourceClient.scp().getSCPStorage(SCPStorageGetRequest.newBuilder().setStorageId(storageId).build());
 
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
-        validateParent(parentSCPPResource, resourcePath);
 
-        SCPResource scpResource = SCPResource.newBuilder().setScpStorage(parentSCPPResource.getScpStorage())
+        SCPResource scpResource = SCPResource.newBuilder().setScpStorage(scpStorage)
                         .setDirectory(DirectoryResource.newBuilder()
                         .setResourcePath(resourcePath).build()).build();
 
-        return getDirectoryResourceMetadata(scpResource, scpSecret, parentResourceId);
+        return getDirectoryResourceMetadata(scpResource, scpSecret);
     }
 
     @Override
@@ -233,6 +225,23 @@
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
         SCPResource scpResource = resourceClient.scp().getSCPResource(SCPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
 
+        return isAvailable(scpResource, credentialToken);
+    }
+
+    @Override
+    public Boolean isAvailable(String storageId, String resourcePath, String credentialToken) throws Exception {
+        checkInitialized();
+        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
+        SCPStorage scpStorage = resourceClient.scp().getSCPStorage(SCPStorageGetRequest.newBuilder().setStorageId(storageId).build());
+        SCPResource scpResource = SCPResource.newBuilder()
+                .setFile(FileResource.newBuilder().setResourcePath(resourcePath).build())
+                .setScpStorage(scpStorage).build();
+
+        return isAvailable(scpResource, credentialToken);
+    }
+
+    public Boolean isAvailable(SCPResource scpResource, String credentialToken) throws Exception {
+
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
index 127ea8c..516ff7d 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
@@ -21,14 +21,13 @@
 import com.jcraft.jsch.Session;
 import org.apache.airavata.mft.core.ConnectorContext;
 import org.apache.airavata.mft.core.DoubleStreamingBuffer;
-import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
 import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
-import org.apache.airavata.mft.resource.stubs.scp.resource.SCPResource;
-import org.apache.airavata.mft.resource.stubs.scp.resource.SCPResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.slf4j.Logger;
@@ -43,30 +42,37 @@
     boolean initialized = false;
 
     private Session session;
-    private SCPResource scpResource;
 
-    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort,
+    public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort,
                      String secretServiceHost, int secretServicePort) throws Exception {
 
+        if (initialized) {
+            destroy();
+        }
+
         this.initialized = true;
 
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        this.scpResource = resourceClient.scp().getSCPResource(SCPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        SCPStorage scpStorage = resourceClient.scp().getSCPStorage(SCPStorageGetRequest.newBuilder().setStorageId(storageId).build());
 
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
         this.session = SCPTransportUtil.createSession(
-                scpResource.getScpStorage().getUser(),
-                scpResource.getScpStorage().getHost(),
-                scpResource.getScpStorage().getPort(),
+                scpStorage.getUser(),
+                scpStorage.getHost(),
+                scpStorage.getPort(),
                 scpSecret.getPrivateKey().getBytes(),
                 scpSecret.getPublicKey().getBytes(),
                 scpSecret.getPassphrase().equals("")? null : scpSecret.getPassphrase().getBytes());
     }
 
     public void destroy() {
-
+        try {
+            this.session.disconnect();
+        } catch (Exception e) {
+            logger.error("Errored while disconnecting session", e);
+        }
     }
 
     private void checkInitialized() {
@@ -75,23 +81,16 @@
         }
     }
 
-    public void startStream(ConnectorContext context) throws Exception {
+    public void startStream(String targetPath, ConnectorContext context) throws Exception {
         checkInitialized();
         if (session == null) {
             logger.error("Session can not be null. Make sure that SCP Receiver is properly initialized");
             throw new Exception("Session can not be null. Make sure that SCP Receiver is properly initialized");
         }
 
-        if (ResourceTypes.FILE.equals(this.scpResource.getResourceCase().name())) {
-            transferRemoteToStream(session, this.scpResource.getFile().getResourcePath(), context.getStreamBuffer());
-            logger.info("SCP Receive completed. Transfer {}", context.getTransferId());
+        transferRemoteToStream(session, targetPath, context.getStreamBuffer());
+        logger.info("SCP Receive completed. Transfer {}", context.getTransferId());
 
-        } else {
-            logger.error("Resource {} should be a FILE type. Found a {}",
-                                            this.scpResource.getResourceId(), this.scpResource.getResourceCase().name());
-            throw new Exception("Resource " + this.scpResource.getResourceId() + " should be a FILE type. Found a " +
-                                                                            this.scpResource.getResourceCase().name());
-        }
     }
 
     private void transferRemoteToStream(Session session, String from, DoubleStreamingBuffer streamBuffer) throws Exception {
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
index b552f0e..01b8ed4 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
@@ -30,6 +30,8 @@
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
 import org.apache.airavata.mft.resource.stubs.scp.resource.SCPResource;
 import org.apache.airavata.mft.resource.stubs.scp.resource.SCPResourceGetRequest;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage;
+import org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorageGetRequest;
 import org.apache.airavata.mft.secret.client.SecretServiceClient;
 import org.apache.airavata.mft.secret.client.SecretServiceClientBuilder;
 import org.slf4j.Logger;
@@ -44,27 +46,28 @@
     boolean initialized = false;
 
     private Session session;
-    private SCPResource scpResource;
 
-    public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort,
+    public void init(String storageId, String credentialToken, String resourceServiceHost, int resourceServicePort,
                      String secretServiceHost, int secretServicePort) throws Exception {
 
+        if (initialized) {
+            destroy();
+        }
+
         this.initialized = true;
 
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
-        this.scpResource = resourceClient.scp().getSCPResource(SCPResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        SCPStorage scpStorage = resourceClient.scp().getSCPStorage(SCPStorageGetRequest.newBuilder().setStorageId(storageId).build());
 
         SecretServiceClient secretClient = SecretServiceClientBuilder.buildClient(secretServiceHost, secretServicePort);
         SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
-        logger.info("Creating a ssh session for {}@{}:{}",
-                scpResource.getScpStorage().getUser(), scpResource.getScpStorage().getHost(),
-                scpResource.getScpStorage().getPort());
+        logger.info("Creating a ssh session for {}@{}:{}", scpStorage.getUser(), scpStorage.getHost(), scpStorage.getPort());
 
         this.session = SCPTransportUtil.createSession(
-                scpResource.getScpStorage().getUser(),
-                scpResource.getScpStorage().getHost(),
-                scpResource.getScpStorage().getPort(),
+                scpStorage.getUser(),
+                scpStorage.getHost(),
+                scpStorage.getPort(),
                 scpSecret.getPrivateKey().getBytes(),
                 scpSecret.getPublicKey().getBytes(),
                 scpSecret.getPassphrase().equals("")? null : scpSecret.getPassphrase().getBytes());
@@ -72,11 +75,11 @@
 
     public void destroy() {
 
-        //try {
-        //    this.session.disconnect();
-        //} catch (Exception e) {
-        //    logger.error("Errored while disconnecting session", e);
-        //}
+        try {
+            this.session.disconnect();
+        } catch (Exception e) {
+            logger.error("Errored while disconnecting session", e);
+        }
     }
 
     private void checkInitialized() {
@@ -85,7 +88,7 @@
         }
     }
 
-    public void startStream(ConnectorContext context) throws Exception {
+    public void startStream(String targetPath, ConnectorContext context) throws Exception {
 
         checkInitialized();
         if (session == null) {
@@ -93,16 +96,8 @@
             throw new Exception("Session can not be null. Make sure that SCP Sender is properly initialized");
         }
         try {
-            if (ResourceTypes.FILE.equals(this.scpResource.getResourceCase().name())) {
-                copyLocalToRemote(this.session, this.scpResource.getFile().getResourcePath(), context.getStreamBuffer(), context.getMetadata().getResourceSize());
-                logger.info("SCP send to transfer {} completed", context.getTransferId());
-
-            } else {
-                logger.error("Resource {} should be a FILE type. Found a {}",
-                        this.scpResource.getResourceId(), this.scpResource.getResourceCase().name());
-                throw new Exception("Resource " + this.scpResource.getResourceId() + " should be a FILE type. Found a " +
-                        this.scpResource.getResourceCase().name());
-            }
+            copyLocalToRemote(this.session, targetPath, context.getStreamBuffer(), context.getMetadata().getResourceSize());
+            logger.info("SCP send to transfer {} completed", context.getTransferId());
 
         } catch (Exception e) {
             logger.error("Errored while streaming to remote scp server. Transfer {}", context.getTransferId() , e);