Introducing file resource and directory resource types
diff --git a/core/src/main/java/org/apache/airavata/mft/core/ResourceTypes.java b/core/src/main/java/org/apache/airavata/mft/core/ResourceTypes.java
new file mode 100644
index 0000000..cfa92d2
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/ResourceTypes.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.core;
+
+public final class ResourceTypes {
+    public static final String FILE = "FILE";
+    public static final String DIRECTORY = "DIRECTORY";
+}
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java
index 36466a6..6299a07 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java
@@ -20,6 +20,7 @@
 import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
 import org.apache.airavata.mft.resource.stubs.azure.resource.*;
 import org.apache.airavata.mft.resource.stubs.box.resource.*;
+import org.apache.airavata.mft.resource.stubs.common.FileResource;
 import org.apache.airavata.mft.resource.stubs.dropbox.resource.*;
 import org.apache.airavata.mft.resource.stubs.ftp.resource.*;
 import org.apache.airavata.mft.resource.stubs.ftp.storage.*;
@@ -129,7 +130,7 @@
 
         SCPResource scpResource = SCPResource.newBuilder()
                 .setResourceId(resourceId)
-                .setResourcePath(path)
+                .setFile(FileResource.newBuilder().setResourcePath(path).build())
                 .setScpStorage(getSCPStorage(SCPStorageGetRequest.newBuilder().setStorageId(resourceId).build()).get())
                 .build();
         return Optional.of(scpResource);
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java
index d445998..151f868 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java
@@ -20,6 +20,8 @@
 import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
 import org.apache.airavata.mft.resource.stubs.azure.resource.*;
 import org.apache.airavata.mft.resource.stubs.box.resource.*;
+import org.apache.airavata.mft.resource.stubs.common.DirectoryResource;
+import org.apache.airavata.mft.resource.stubs.common.FileResource;
 import org.apache.airavata.mft.resource.stubs.dropbox.resource.*;
 import org.apache.airavata.mft.resource.stubs.ftp.resource.*;
 import org.apache.airavata.mft.resource.stubs.ftp.storage.*;
@@ -106,12 +108,22 @@
                                 .setUser(((JSONObject)r.get("scpStorage")).get("user").toString())
                                 .setPort(Integer.parseInt(((JSONObject)r.get("scpStorage")).get("port").toString())).build();
 
-                        SCPResource scpResource = SCPResource.newBuilder()
-                                .setResourcePath(r.get("resourcePath").toString())
+                        SCPResource.Builder builder = SCPResource.newBuilder()
                                 .setResourceId(r.get("resourceId").toString())
-                                .setScpStorage(storage).build();
+                                .setScpStorage(storage);
 
-                        return scpResource;
+                        switch (r.get("resourceMode").toString()) {
+                            case "FILE":
+                                FileResource fileResource = FileResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+                                builder = builder.setFile(fileResource);
+                                break;
+                            case "DIRECTORY":
+                                DirectoryResource directoryResource = DirectoryResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+                                builder = builder.setDirectory(directoryResource);
+                                break;
+                        }
+                        return builder.build();
+
                     }).collect(Collectors.toList());
 
             return scpResources.stream().filter(r -> request.getResourceId().equals(r.getResourceId())).findFirst();
@@ -152,11 +164,19 @@
                     .map(resource -> {
                         JSONObject r = (JSONObject) resource;
 
-                        LocalResource localResource = LocalResource.newBuilder()
-                                .setResourcePath(r.get("resourcePath").toString())
-                                .setResourceId(r.get("resourceId").toString()).build();
+                        LocalResource.Builder builder = LocalResource.newBuilder().setResourceId(r.get("resourceId").toString());
 
-                        return localResource;
+                        switch (r.get("resourceMode").toString()) {
+                            case "FILE":
+                                FileResource fileResource = FileResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+                                builder = builder.setFile(fileResource);
+                                break;
+                            case "DIRECTORY":
+                                DirectoryResource directoryResource = DirectoryResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+                                builder = builder.setDirectory(directoryResource);
+                                break;
+                        }
+                        return builder.build();
                     }).collect(Collectors.toList());
             return localResources.stream().filter(r -> request.getResourceId().equals(r.getResourceId())).findFirst();
         }
@@ -195,14 +215,22 @@
                     .map(resource -> {
                         JSONObject r = (JSONObject) resource;
 
-                        S3Resource s3Resource = S3Resource.newBuilder()
-                                .setResourcePath(r.get("resourcePath").toString())
+                        S3Resource.Builder builder = S3Resource.newBuilder()
                                 .setResourceId(r.get("resourceId").toString())
                                 .setBucketName(r.get("bucketName").toString())
-                                .setRegion(r.get("region").toString())
-                                .build();
+                                .setRegion(r.get("region").toString());
 
-                        return s3Resource;
+                        switch (r.get("resourceMode").toString()) {
+                            case "FILE":
+                                FileResource fileResource = FileResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+                                builder = builder.setFile(fileResource);
+                                break;
+                            case "DIRECTORY":
+                                DirectoryResource directoryResource = DirectoryResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+                                builder = builder.setDirectory(directoryResource);
+                                break;
+                        }
+                        return builder.build();
                     }).collect(Collectors.toList());
             return s3Resources.stream().filter(r -> request.getResourceId().equals(r.getResourceId())).findFirst();
         }
@@ -243,12 +271,20 @@
                     .map(resource -> {
                         JSONObject r = (JSONObject) resource;
 
-                        BoxResource boxResource = BoxResource.newBuilder()
-                                .setResourceId(r.get("resourceId").toString())
-                                .setBoxFileId(r.get("boxFileId").toString())
-                                .build();
+                        BoxResource.Builder builder = BoxResource.newBuilder()
+                                .setResourceId(r.get("resourceId").toString());
 
-                        return boxResource;
+                        switch (r.get("resourceMode").toString()) {
+                            case "FILE":
+                                FileResource fileResource = FileResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+                                builder = builder.setFile(fileResource);
+                                break;
+                            case "DIRECTORY":
+                                DirectoryResource directoryResource = DirectoryResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+                                builder = builder.setDirectory(directoryResource);
+                                break;
+                        }
+                        return builder.build();
                     }).collect(Collectors.toList());
             return boxResources.stream().filter(r -> request.getResourceId().equals(r.getResourceId())).findFirst();
         }
@@ -285,13 +321,22 @@
                     .map(resource -> {
                         JSONObject r = (JSONObject) resource;
 
-                        AzureResource azureResource = AzureResource.newBuilder()
-                                .setBlobName(r.get("blobName").toString())
+                        AzureResource.Builder builder = AzureResource.newBuilder()
                                 .setContainer(r.get("container").toString())
-                                .setResourceId(r.get("resourceId").toString())
-                                .build();
+                                .setResourceId(r.get("resourceId").toString());
 
-                        return azureResource;
+                        switch (r.get("resourceMode").toString()) {
+                            case "FILE":
+                                FileResource fileResource = FileResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+                                builder = builder.setFile(fileResource);
+                                break;
+                            case "DIRECTORY":
+                                DirectoryResource directoryResource = DirectoryResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+                                builder = builder.setDirectory(directoryResource);
+                                break;
+                        }
+                        return builder.build();
+
                     }).collect(Collectors.toList());
             return azureResources.stream().filter(r -> request.getResourceId().equals(r.getResourceId())).findFirst();
         }
@@ -327,13 +372,22 @@
                     .map(resource -> {
                         JSONObject r = (JSONObject) resource;
 
-                        GCSResource gcsResource = GCSResource.newBuilder()
+                        GCSResource.Builder builder = GCSResource.newBuilder()
                                 .setBucketName(r.get("bucketName").toString())
-                                .setResourceId(r.get("resourceId").toString())
-                                .setResourcePath(r.get("resourcePath").toString())
-                                .build();
+                                .setResourceId(r.get("resourceId").toString());
 
-                        return gcsResource;
+                        switch (r.get("resourceMode").toString()) {
+                            case "FILE":
+                                FileResource fileResource = FileResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+                                builder = builder.setFile(fileResource);
+                                break;
+                            case "DIRECTORY":
+                                DirectoryResource directoryResource = DirectoryResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+                                builder = builder.setDirectory(directoryResource);
+                                break;
+                        }
+                        return builder.build();
+
                     }).collect(Collectors.toList());
             return gcsResources.stream().filter(r -> request.getResourceId().equals(r.getResourceId())).findFirst();
         }
@@ -369,12 +423,21 @@
                         JSONObject r = (JSONObject) resource;
                         String resourcePath = r.get("resourcePath").toString();
                         resourcePath = resourcePath.startsWith("/") ? resourcePath : "/" + resourcePath;
-                        DropboxResource dropboxResource = DropboxResource.newBuilder()
-                                .setResourceId(r.get("resourceId").toString())
-                                .setResourcePath(resourcePath)
-                                .build();
+                        DropboxResource.Builder builder = DropboxResource.newBuilder()
+                                .setResourceId(r.get("resourceId").toString());
 
-                        return dropboxResource;
+                        switch (r.get("resourceMode").toString()) {
+                            case "FILE":
+                                FileResource fileResource = FileResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+                                builder = builder.setFile(fileResource);
+                                break;
+                            case "DIRECTORY":
+                                DirectoryResource directoryResource = DirectoryResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+                                builder = builder.setDirectory(directoryResource);
+                                break;
+                        }
+                        return builder.build();
+
                     }).collect(Collectors.toList());
             return dropboxResources.stream().filter(r -> request.getResourceId().equals(r.getResourceId())).findFirst();
         }
@@ -421,10 +484,22 @@
                                 .setHost(((JSONObject)r.get("ftpStorage")).get("host").toString())
                                 .setPort(Integer.parseInt(((JSONObject)r.get("ftpStorage")).get("port").toString())).build();
 
-                        return FTPResource.newBuilder()
-                                .setResourcePath(r.get("resourcePath").toString())
+                        FTPResource.Builder builder = FTPResource.newBuilder()
                                 .setResourceId(r.get("resourceId").toString())
-                                .setFtpStorage(storage).build();
+                                .setFtpStorage(storage);
+
+                        switch (r.get("resourceMode").toString()) {
+                            case "FILE":
+                                FileResource fileResource = FileResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+                                builder = builder.setFile(fileResource);
+                                break;
+                            case "DIRECTORY":
+                                DirectoryResource directoryResource = DirectoryResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+                                builder = builder.setDirectory(directoryResource);
+                                break;
+                        }
+                        return builder.build();
+
                     }).collect(Collectors.toList());
 
             return ftpResources.stream().filter(r -> request.getResourceId().equals(r.getResourceId())).findFirst();
diff --git a/services/resource-service/server/src/main/resources/resources.json b/services/resource-service/server/src/main/resources/resources.json
index 5c960fb..9d430db 100644
--- a/services/resource-service/server/src/main/resources/resources.json
+++ b/services/resource-service/server/src/main/resources/resources.json
@@ -2,6 +2,7 @@
   {
     "type": "SCP",
     "resourceId":  "remote-ssh-resource",
+    "resourceMode": "FILE",
     "resourcePath": "/tmp/1mb.txt",
     "scpStorage" : {
       "storageId": "remote-ssh-storage",
@@ -13,6 +14,7 @@
   {
     "type": "SCP",
     "resourceId":  "remote-ssh-resource2",
+    "resourceMode": "FILE",
     "resourcePath": "/tmp/10mb.txt",
     "scpStorage" : {
       "storageId": "remote-ssh-storage",
@@ -24,45 +26,53 @@
   {
     "type": "LOCAL",
     "resourceId":  "10mb-file",
-    "resourcePath": "/tmp/1mb.txt"
+    "resourceMode": "FILE",
+    "resourcePath": "/tmp/10mb.txt"
   },
   {
     "type": "S3",
     "resourceId": "s3-file",
-    "resourcePath": "new-file.txt",
+    "resourceMode": "FILE",
+    "resourcePath": "10mb-s3.txt",
     "region": "us-east-2",
-    "bucketName": "s3-bucket"
+    "bucketName": "airavata-s3"
   },
   {
     "type": "BOX",
     "resourceId": "box-file-abcd",
-    "boxFileId": "655108198452"
+    "resourceMode": "FILE",
+    "resourcePath": "655108198452"
   },
   {
     "type": "BOX",
     "resourceId": "box-file-efgh",
-    "boxFileId": "655450661536"
+    "resourceMode": "FILE",
+    "resourcePath": "655450661536"
   },
   {
     "type": "AZURE",
     "resourceId": "azure-blob",
     "container": "sample-container",
-    "blobName": "sample.blob"
+    "resourceMode": "FILE",
+    "resourcePath": "sample.blob"
   },
   {
     "type": "GCS",
     "resourceId": "gcs-bucket",
+    "resourceMode": "FILE",
     "bucketName": "pika-pika-bucket",
     "resourcePath": "PikaPikaTest.txt"
   },
   {
     "type": "DROPBOX",
     "resourceId": "dropbox-file",
+    "resourceMode": "FILE",
     "resourcePath": "/test.txt"
   },
   {
     "type": "FTP",
     "resourceId": "ftp-resource",
+    "resourceMode": "FILE",
     "resourcePath": "mft-1mb.txt",
     "ftpStorage": {
       "storageId": "ftp-resource",
diff --git a/services/resource-service/stub/src/main/proto/azure/AzureResource.proto b/services/resource-service/stub/src/main/proto/azure/AzureResource.proto
index fb0cf75..084fbc5 100644
--- a/services/resource-service/stub/src/main/proto/azure/AzureResource.proto
+++ b/services/resource-service/stub/src/main/proto/azure/AzureResource.proto
@@ -20,11 +20,15 @@
 option java_multiple_files = true;
 package org.apache.airavata.mft.resource.stubs.azure.resource;
 
+import "common/common.proto";
 
 message AzureResource {
     string resourceId = 1;
     string container = 2;
-    string blobName = 3;
+    oneof resource {
+        org.apache.airavata.mft.resource.stubs.common.FileResource file = 3;
+        org.apache.airavata.mft.resource.stubs.common.DirectoryResource directory = 4;
+    }
 }
 
 message AzureResourceGetRequest {
diff --git a/services/resource-service/stub/src/main/proto/box/BoxResource.proto b/services/resource-service/stub/src/main/proto/box/BoxResource.proto
index dc4be83..7899fdb 100644
--- a/services/resource-service/stub/src/main/proto/box/BoxResource.proto
+++ b/services/resource-service/stub/src/main/proto/box/BoxResource.proto
@@ -20,10 +20,14 @@
 option java_multiple_files = true;
 package org.apache.airavata.mft.resource.stubs.box.resource;
 
+import "common/common.proto";
+
 message BoxResource {
     string resourceId = 1;
-    string boxFileId = 2;
-}
+    oneof resource {
+        org.apache.airavata.mft.resource.stubs.common.FileResource file = 2;
+        org.apache.airavata.mft.resource.stubs.common.DirectoryResource directory = 3;
+    }}
 
 message BoxResourceGetRequest {
     string resourceId = 1;
diff --git a/services/resource-service/stub/src/main/proto/common/common.proto b/services/resource-service/stub/src/main/proto/common/common.proto
new file mode 100644
index 0000000..4d69533
--- /dev/null
+++ b/services/resource-service/stub/src/main/proto/common/common.proto
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+package org.apache.airavata.mft.resource.stubs.common;
+
+message FileResource {
+    string resourcePath = 1;
+}
+
+message DirectoryResource {
+    string resourcePath = 1;
+    repeated FileResource files = 2;
+    repeated DirectoryResource directories = 3;
+}
\ No newline at end of file
diff --git a/services/resource-service/stub/src/main/proto/dropbox/DropboxResource.proto b/services/resource-service/stub/src/main/proto/dropbox/DropboxResource.proto
index 215657d..6ae5af9 100644
--- a/services/resource-service/stub/src/main/proto/dropbox/DropboxResource.proto
+++ b/services/resource-service/stub/src/main/proto/dropbox/DropboxResource.proto
@@ -20,9 +20,14 @@
 option java_multiple_files = true;
 package org.apache.airavata.mft.resource.stubs.dropbox.resource;
 
+import "common/common.proto";
+
 message DropboxResource {
     string resourceId = 1;
-    string resourcePath = 2;
+    oneof resource {
+        org.apache.airavata.mft.resource.stubs.common.FileResource file = 2;
+        org.apache.airavata.mft.resource.stubs.common.DirectoryResource directory = 3;
+    }
 }
 
 message DropboxResourceGetRequest {
diff --git a/services/resource-service/stub/src/main/proto/ftp/FTPResource.proto b/services/resource-service/stub/src/main/proto/ftp/FTPResource.proto
index c7f3e63..2ba2b9e 100644
--- a/services/resource-service/stub/src/main/proto/ftp/FTPResource.proto
+++ b/services/resource-service/stub/src/main/proto/ftp/FTPResource.proto
@@ -21,11 +21,15 @@
 package org.apache.airavata.mft.resource.stubs.ftp.resource;
 
 import "ftp/FTPStorage.proto";
+import "common/common.proto";
 
 message FTPResource {
     string resourceId = 1;
     org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorage ftpStorage = 2;
-    string resourcePath = 3;
+    oneof resource {
+        org.apache.airavata.mft.resource.stubs.common.FileResource file = 3;
+        org.apache.airavata.mft.resource.stubs.common.DirectoryResource directory = 4;
+    }
 }
 
 message FTPResourceGetRequest {
diff --git a/services/resource-service/stub/src/main/proto/gcs/GCSResource.proto b/services/resource-service/stub/src/main/proto/gcs/GCSResource.proto
index 795196d..36f3c33 100644
--- a/services/resource-service/stub/src/main/proto/gcs/GCSResource.proto
+++ b/services/resource-service/stub/src/main/proto/gcs/GCSResource.proto
@@ -20,10 +20,15 @@
 option java_multiple_files = true;
 package org.apache.airavata.mft.resource.stubs.gcs.resource;
 
+import "common/common.proto";
+
 message GCSResource {
     string resourceId = 1;
     string bucketName = 2;
-    string resourcePath = 3;
+    oneof resource {
+        org.apache.airavata.mft.resource.stubs.common.FileResource file = 3;
+        org.apache.airavata.mft.resource.stubs.common.DirectoryResource directory = 4;
+    }
 }
 
 message GCSResourceGetRequest {
diff --git a/services/resource-service/stub/src/main/proto/local/LocalResource.proto b/services/resource-service/stub/src/main/proto/local/LocalResource.proto
index ed64e90..1978b0e 100644
--- a/services/resource-service/stub/src/main/proto/local/LocalResource.proto
+++ b/services/resource-service/stub/src/main/proto/local/LocalResource.proto
@@ -20,10 +20,15 @@
 option java_multiple_files = true;
 package org.apache.airavata.mft.resource.stubs.local.resource;
 
+import "common/common.proto";
+
 message LocalResource {
     string resourceId = 1;
-    string resourcePath = 2;
-    string agentId = 3;
+    string agentId = 2;
+    oneof resource {
+        org.apache.airavata.mft.resource.stubs.common.FileResource file = 3;
+        org.apache.airavata.mft.resource.stubs.common.DirectoryResource directory = 4;
+    }
 }
 
 message LocalResourceGetRequest {
diff --git a/services/resource-service/stub/src/main/proto/s3/S3Resource.proto b/services/resource-service/stub/src/main/proto/s3/S3Resource.proto
index 3727f27..9a9404a 100644
--- a/services/resource-service/stub/src/main/proto/s3/S3Resource.proto
+++ b/services/resource-service/stub/src/main/proto/s3/S3Resource.proto
@@ -20,11 +20,16 @@
 option java_multiple_files = true;
 package org.apache.airavata.mft.resource.stubs.s3.resource;
 
+import "common/common.proto";
+
 message S3Resource {
     string resourceId = 1;
     string bucketName = 2;
     string region = 3;
-    string resourcePath = 4;
+    oneof resource {
+        org.apache.airavata.mft.resource.stubs.common.FileResource file = 4;
+        org.apache.airavata.mft.resource.stubs.common.DirectoryResource directory = 5;
+    }
 }
 
 message S3ResourceGetRequest {
diff --git a/services/resource-service/stub/src/main/proto/scp/SCPResource.proto b/services/resource-service/stub/src/main/proto/scp/SCPResource.proto
index 07136a7..bfa51fb 100644
--- a/services/resource-service/stub/src/main/proto/scp/SCPResource.proto
+++ b/services/resource-service/stub/src/main/proto/scp/SCPResource.proto
@@ -21,11 +21,15 @@
 package org.apache.airavata.mft.resource.stubs.scp.resource;
 
 import "scp/SCPStorage.proto";
+import "common/common.proto";
 
 message SCPResource {
     string resourceId = 1;
     org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage scpStorage = 2;
-    string resourcePath = 3;
+    oneof resource {
+        org.apache.airavata.mft.resource.stubs.common.FileResource file = 3;
+        org.apache.airavata.mft.resource.stubs.common.DirectoryResource directory = 4;
+    }
 }
 
 message SCPResourceGetRequest {
diff --git a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java
index 0d1c321..1a725cb 100644
--- a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java
+++ b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java
@@ -23,6 +23,7 @@
 import com.azure.storage.blob.BlobServiceClientBuilder;
 import com.azure.storage.blob.models.BlobProperties;
 import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.airavata.mft.credential.stubs.azure.AzureSecret;
 import org.apache.airavata.mft.credential.stubs.azure.AzureSecretGetRequest;
@@ -72,7 +73,7 @@
 
         BlobServiceClient blobServiceClient = new BlobServiceClientBuilder().connectionString(azureSecret.getConnectionString()).buildClient();
 
-        BlobClient blobClient = blobServiceClient.getBlobContainerClient(azureResource.getContainer()).getBlobClient(azureResource.getBlobName());
+        BlobClient blobClient = blobServiceClient.getBlobContainerClient(azureResource.getContainer()).getBlobClient(azureResource.getFile().getResourcePath());
 
         BlobProperties properties = blobClient.getBlockBlobClient().getProperties();
         ResourceMetadata metadata = new ResourceMetadata();
@@ -107,6 +108,12 @@
         if (!containerExists) {
             return false;
         }
-        return containerClient.getBlobClient(azureResource.getBlobName()).exists();
+        switch (azureResource.getResourceCase().name()){
+            case ResourceTypes.FILE:
+                return containerClient.getBlobClient(azureResource.getFile().getResourcePath()).exists();
+            case ResourceTypes.DIRECTORY:
+                return containerClient.getBlobClient(azureResource.getDirectory().getResourcePath()).exists();
+        }
+        return false;
     }
 }
diff --git a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureReceiver.java b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureReceiver.java
index 6d5a51e..162d31d 100644
--- a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureReceiver.java
+++ b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureReceiver.java
@@ -23,6 +23,7 @@
 import com.azure.storage.blob.BlobServiceClientBuilder;
 import com.azure.storage.blob.specialized.BlobInputStream;
 import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.credential.stubs.azure.AzureSecret;
 import org.apache.airavata.mft.credential.stubs.azure.AzureSecretGetRequest;
@@ -74,37 +75,46 @@
     public void startStream(ConnectorContext context) throws Exception {
         logger.info("Starting azure receive for remote server for transfer {}", context.getTransferId());
         checkInitialized();
-        BlobClient blobClient = containerClient.getBlobClient(azureResource.getBlobName());
-        BlobInputStream blobInputStream = blobClient.openInputStream();
 
-        OutputStream streamOs = context.getStreamBuffer().getOutputStream();
+        if (ResourceTypes.FILE.equals(this.azureResource.getResourceCase().name())) {
+            BlobClient blobClient = containerClient.getBlobClient(azureResource.getFile().getResourcePath());
+            BlobInputStream blobInputStream = blobClient.openInputStream();
 
-        long fileSize = context.getMetadata().getResourceSize();
+            OutputStream streamOs = context.getStreamBuffer().getOutputStream();
 
-        byte[] buf = new byte[1024];
-        while (true) {
-            int bufSize = 0;
+            long fileSize = context.getMetadata().getResourceSize();
 
-            if (buf.length < fileSize) {
-                bufSize = buf.length;
-            } else {
-                bufSize = (int) fileSize;
-            }
-            bufSize = blobInputStream.read(buf, 0, bufSize);
+            byte[] buf = new byte[1024];
+            while (true) {
+                int bufSize = 0;
 
-            if (bufSize < 0) {
-                break;
+                if (buf.length < fileSize) {
+                    bufSize = buf.length;
+                } else {
+                    bufSize = (int) fileSize;
+                }
+                bufSize = blobInputStream.read(buf, 0, bufSize);
+
+                if (bufSize < 0) {
+                    break;
+                }
+
+                streamOs.write(buf, 0, bufSize);
+                streamOs.flush();
+
+                fileSize -= bufSize;
+                if (fileSize == 0L)
+                    break;
             }
 
-            streamOs.write(buf, 0, bufSize);
-            streamOs.flush();
+            streamOs.close();
+            logger.info("Completed azure receive for remote server for transfer {}", context.getTransferId());
 
-            fileSize -= bufSize;
-            if (fileSize == 0L)
-                break;
+        } else {
+            logger.error("Resource {} should be a FILE type. Found a {}",
+                    this.azureResource.getResourceId(), this.azureResource.getResourceCase().name());
+            throw new Exception("Resource " + this.azureResource.getResourceId() + " should be a FILE type. Found a " +
+                    this.azureResource.getResourceCase().name());
         }
-
-        streamOs.close();
-        logger.info("Completed azure receive for remote server for transfer {}", context.getTransferId());
     }
 }
diff --git a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureSender.java b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureSender.java
index a2d05dc..9642103 100644
--- a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureSender.java
+++ b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureSender.java
@@ -22,6 +22,7 @@
 import com.azure.storage.blob.BlobServiceClientBuilder;
 import com.azure.storage.blob.specialized.BlockBlobClient;
 import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.credential.stubs.azure.AzureSecret;
 import org.apache.airavata.mft.credential.stubs.azure.AzureSecretGetRequest;
@@ -71,9 +72,17 @@
     public void startStream(ConnectorContext context) throws Exception {
         logger.info("Starting Azure send for remote server for transfer {}", context.getTransferId());
         checkInitialized();
-        BlockBlobClient blockBlobClient = containerClient.getBlobClient(azureResource.getBlobName()).getBlockBlobClient();
-        blockBlobClient.upload(context.getStreamBuffer().getInputStream(), context.getMetadata().getResourceSize(), true);
-        logger.info("Completed Azure send for remote server for transfer {}", context.getTransferId());
+
+        if (ResourceTypes.FILE.equals(this.azureResource.getResourceCase().name())) {
+            BlockBlobClient blockBlobClient = containerClient.getBlobClient(azureResource.getFile().getResourcePath()).getBlockBlobClient();
+            blockBlobClient.upload(context.getStreamBuffer().getInputStream(), context.getMetadata().getResourceSize(), true);
+            logger.info("Completed Azure send for remote server for transfer {}", context.getTransferId());
+        } else {
+            logger.error("Resource {} should be a FILE type. Found a {}",
+                    this.azureResource.getResourceId(), this.azureResource.getResourceCase().name());
+            throw new Exception("Resource " + this.azureResource.getResourceId() + " should be a FILE type. Found a " +
+                    this.azureResource.getResourceCase().name());
+        }
 
     }
 }
diff --git a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java
index 06a0c16..b55a0f2 100644
--- a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java
+++ b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java
@@ -21,6 +21,7 @@
 import com.box.sdk.BoxAPIConnection;
 import com.box.sdk.BoxFile;
 import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.airavata.mft.credential.stubs.box.BoxSecret;
 import org.apache.airavata.mft.credential.stubs.box.BoxSecretGetRequest;
@@ -66,7 +67,7 @@
         BoxSecret boxSecret = secretClient.box().getBoxSecret(BoxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
         BoxAPIConnection api = new BoxAPIConnection(boxSecret.getAccessToken());
-        BoxFile boxFile = new BoxFile(api, boxResource.getBoxFileId());
+        BoxFile boxFile = new BoxFile(api, boxResource.getFile().getResourcePath());
         BoxFile.Info boxFileInfo = boxFile.getInfo();
 
         ResourceMetadata metadata = new ResourceMetadata();
@@ -93,8 +94,14 @@
         BoxSecret boxSecret = secretClient.box().getBoxSecret(BoxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
         BoxAPIConnection api = new BoxAPIConnection(boxSecret.getAccessToken());
-        BoxFile boxFile = new BoxFile(api, boxResource.getBoxFileId());
 
+        BoxFile boxFile;
+        switch (boxResource.getResourceCase().name()){
+            case ResourceTypes.FILE:
+                boxFile = new BoxFile(api, boxResource.getFile().getResourcePath());
+            case ResourceTypes.DIRECTORY:
+                boxFile = new BoxFile(api, boxResource.getDirectory().getResourcePath());
+        }
         return true;
     }
 }
diff --git a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxReceiver.java b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxReceiver.java
index 20af8f7..907b485 100644
--- a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxReceiver.java
+++ b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxReceiver.java
@@ -21,6 +21,7 @@
 import com.box.sdk.BoxAPIConnection;
 import com.box.sdk.BoxFile;
 import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.credential.stubs.box.BoxSecret;
 import org.apache.airavata.mft.credential.stubs.box.BoxSecretGetRequest;
@@ -65,15 +66,23 @@
     @Override
     public void startStream(ConnectorContext context) throws Exception {
 
-        logger.info("Starting Box Receiver stream for transfer {}", context.getTransferId());
+        if (ResourceTypes.FILE.equals(this.boxResource.getResourceCase().name())) {
+            logger.info("Starting Box Receiver stream for transfer {}", context.getTransferId());
 
-        BoxFile file = new BoxFile(this.boxClient, this.boxResource.getBoxFileId());
+            BoxFile file = new BoxFile(this.boxClient, this.boxResource.getFile().getResourcePath());
 
-        OutputStream os = context.getStreamBuffer().getOutputStream();
-        file.download(os);
-        os.flush();
-        os.close();
+            OutputStream os = context.getStreamBuffer().getOutputStream();
+            file.download(os);
+            os.flush();
+            os.close();
 
-        logger.info("Completed Box Receiver stream for transfer {}", context.getTransferId());
+            logger.info("Completed Box Receiver stream for transfer {}", context.getTransferId());
+
+        } else {
+            logger.error("Resource {} should be a FILE type. Found a {}",
+                    this.boxResource.getResourceId(), this.boxResource.getResourceCase().name());
+            throw new Exception("Resource " + this.boxResource.getResourceId() + " should be a FILE type. Found a " +
+                    this.boxResource.getResourceCase().name());
+        }
     }
 }
diff --git a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxSender.java b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxSender.java
index 528bfc1..c9b279f 100644
--- a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxSender.java
+++ b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxSender.java
@@ -21,6 +21,7 @@
 import com.box.sdk.BoxAPIConnection;
 import com.box.sdk.BoxFile;
 import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.credential.stubs.box.BoxSecret;
 import org.apache.airavata.mft.credential.stubs.box.BoxSecretGetRequest;
@@ -62,18 +63,26 @@
     public void startStream(ConnectorContext context) throws Exception {
 
         logger.info("Starting Box Sender stream for transfer {}", context.getTransferId());
-        logger.info("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
+        logger.debug("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
 
-        BoxFile file = new BoxFile(this.boxClient, this.boxResource.getBoxFileId());
+        if (ResourceTypes.FILE.equals(this.boxResource.getResourceCase().name())) {
+            BoxFile file = new BoxFile(this.boxClient, this.boxResource.getFile().getResourcePath());
 
-        // Upload chunks only if the file size is > 20mb
-        // Ref: https://developer.box.com/guides/uploads/chunked/
-        if (context.getMetadata().getResourceSize() > 20971520) {
-            file.uploadLargeFile(context.getStreamBuffer().getInputStream(), context.getMetadata().getResourceSize());
+            // Upload chunks only if the file size is > 20mb
+            // Ref: https://developer.box.com/guides/uploads/chunked/
+            if (context.getMetadata().getResourceSize() > 20971520) {
+                file.uploadLargeFile(context.getStreamBuffer().getInputStream(), context.getMetadata().getResourceSize());
+            } else {
+                file.uploadNewVersion(context.getStreamBuffer().getInputStream());
+            }
+
+            logger.info("Completed Box Sender stream for transfer {}", context.getTransferId());
+
         } else {
-            file.uploadNewVersion(context.getStreamBuffer().getInputStream());
+            logger.error("Resource {} should be a FILE type. Found a {}",
+                    this.boxResource.getResourceId(), this.boxResource.getResourceCase().name());
+            throw new Exception("Resource " + this.boxResource.getResourceId() + " should be a FILE type. Found a " +
+                    this.boxResource.getResourceCase().name());
         }
-
-        logger.info("Completed Box Sender stream for transfer {}", context.getTransferId());
     }
 }
diff --git a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java
index 5618337..1310c53 100644
--- a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java
+++ b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java
@@ -21,6 +21,7 @@
 import com.dropbox.core.v2.DbxClientV2;
 import com.dropbox.core.v2.files.FileMetadata;
 import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.airavata.mft.credential.stubs.dropbox.DropboxSecret;
 import org.apache.airavata.mft.credential.stubs.dropbox.DropboxSecretGetRequest;
@@ -68,7 +69,7 @@
         DbxClientV2 dbxClientV2 = new DbxClientV2(config, dropboxSecret.getAccessToken());
 
         ResourceMetadata metadata = new ResourceMetadata();
-        FileMetadata fileMetadata = (FileMetadata) dbxClientV2.files().getMetadata(dropboxResource.getResourcePath());
+        FileMetadata fileMetadata = (FileMetadata) dbxClientV2.files().getMetadata(dropboxResource.getFile().getResourcePath());
         metadata.setResourceSize(fileMetadata.getSize());
         metadata.setMd5sum(null);
         metadata.setUpdateTime(fileMetadata.getServerModified().getTime());
@@ -88,6 +89,12 @@
         DbxRequestConfig config = DbxRequestConfig.newBuilder("mftdropbox/v1").build();
         DbxClientV2 dbxClientV2 = new DbxClientV2(config, dropboxSecret.getAccessToken());
 
-        return !dbxClientV2.files().searchV2(dropboxResource.getResourcePath()).getMatches().isEmpty();
+        switch (dropboxResource.getResourceCase().name()){
+            case ResourceTypes.FILE:
+                return !dbxClientV2.files().searchV2(dropboxResource.getFile().getResourcePath()).getMatches().isEmpty();
+            case ResourceTypes.DIRECTORY:
+                return !dbxClientV2.files().searchV2(dropboxResource.getDirectory().getResourcePath()).getMatches().isEmpty();
+        }
+        return false;
     }
 }
diff --git a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxReceiver.java b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxReceiver.java
index d047393..5bdf5a9 100644
--- a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxReceiver.java
+++ b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxReceiver.java
@@ -20,6 +20,7 @@
 import com.dropbox.core.DbxRequestConfig;
 import com.dropbox.core.v2.DbxClientV2;
 import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.credential.stubs.dropbox.DropboxSecret;
 import org.apache.airavata.mft.credential.stubs.dropbox.DropboxSecretGetRequest;
@@ -61,38 +62,47 @@
 
     @Override
     public void startStream(ConnectorContext context) throws Exception {
-        logger.info("Starting Dropbox Receiver stream for transfer {}", context.getTransferId());
 
-        InputStream inputStream = dbxClientV2.files().download(this.dropboxResource.getResourcePath()).getInputStream();
-        OutputStream os = context.getStreamBuffer().getOutputStream();
-        int read;
-        long bytes = 0;
-        long fileSize = context.getMetadata().getResourceSize();
-        byte[] buf = new byte[1024];
-        while (true) {
-            int bufSize = 0;
+        if (ResourceTypes.FILE.equals(this.dropboxResource.getResourceCase().name())) {
+            logger.info("Starting Dropbox Receiver stream for transfer {}", context.getTransferId());
 
-            if (buf.length < fileSize) {
-                bufSize = buf.length;
-            } else {
-                bufSize = (int) fileSize;
-            }
-            bufSize = inputStream.read(buf, 0, bufSize);
+            InputStream inputStream = dbxClientV2.files().download(this.dropboxResource.getFile().getResourcePath()).getInputStream();
+            OutputStream os = context.getStreamBuffer().getOutputStream();
+            int read;
+            long bytes = 0;
+            long fileSize = context.getMetadata().getResourceSize();
+            byte[] buf = new byte[1024];
+            while (true) {
+                int bufSize = 0;
 
-            if (bufSize < 0) {
-                break;
+                if (buf.length < fileSize) {
+                    bufSize = buf.length;
+                } else {
+                    bufSize = (int) fileSize;
+                }
+                bufSize = inputStream.read(buf, 0, bufSize);
+
+                if (bufSize < 0) {
+                    break;
+                }
+
+                os.write(buf, 0, bufSize);
+                os.flush();
+
+                fileSize -= bufSize;
+                if (fileSize == 0L)
+                    break;
             }
 
-            os.write(buf, 0, bufSize);
-            os.flush();
+            os.close();
 
-            fileSize -= bufSize;
-            if (fileSize == 0L)
-                break;
+            logger.info("Completed Dropbox Receiver stream for transfer {}", context.getTransferId());
+
+        } else {
+            logger.error("Resource {} should be a FILE type. Found a {}",
+                    this.dropboxResource.getResourceId(), this.dropboxResource.getResourceCase().name());
+            throw new Exception("Resource " + this.dropboxResource.getResourceId() + " should be a FILE type. Found a " +
+                    this.dropboxResource.getResourceCase().name());
         }
-
-        os.close();
-
-        logger.info("Completed Dropbox Receiver stream for transfer {}", context.getTransferId());
     }
 }
diff --git a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxSender.java b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxSender.java
index b0d16a7..ffab965 100644
--- a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxSender.java
+++ b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxSender.java
@@ -22,6 +22,7 @@
 import com.dropbox.core.v2.files.FileMetadata;
 import com.dropbox.core.v2.files.WriteMode;
 import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.credential.stubs.dropbox.DropboxSecret;
 import org.apache.airavata.mft.credential.stubs.dropbox.DropboxSecretGetRequest;
@@ -64,10 +65,20 @@
         logger.info("Starting Dropbox Sender stream for transfer {}", context.getTransferId());
         logger.info("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
 
-        FileMetadata metadata = dbxClientV2.files().uploadBuilder(dropboxResource.getResourcePath())
-                .withMode(WriteMode.OVERWRITE)
-                .uploadAndFinish(context.getStreamBuffer().getInputStream());
 
-        logger.info("Completed Dropbox Sender stream for transfer {}", context.getTransferId());
+
+        if (ResourceTypes.FILE.equals(this.dropboxResource.getResourceCase().name())) {
+            FileMetadata metadata = dbxClientV2.files().uploadBuilder(dropboxResource.getFile().getResourcePath())
+                    .withMode(WriteMode.OVERWRITE)
+                    .uploadAndFinish(context.getStreamBuffer().getInputStream());
+            logger.info("Completed Dropbox Sender stream for transfer {}", context.getTransferId());
+
+        } else {
+            logger.error("Resource {} should be a FILE type. Found a {}",
+                    this.dropboxResource.getResourceId(), this.dropboxResource.getResourceCase().name());
+            throw new Exception("Resource " + this.dropboxResource.getResourceId() + " should be a FILE type. Found a " +
+                    this.dropboxResource.getResourceCase().name());
+        }
+
     }
 }
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
index 1f3ba45..9b71dc8 100644
--- a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
@@ -18,6 +18,7 @@
 package org.apache.airavata.mft.transport.ftp;
 
 import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.airavata.mft.credential.stubs.ftp.FTPSecret;
 import org.apache.airavata.mft.credential.stubs.ftp.FTPSecretGetRequest;
@@ -73,14 +74,14 @@
         FTPClient ftpClient = null;
         try {
             ftpClient = FTPTransportUtil.getFTPClient(ftpResource, ftpSecret);
-            logger.info("Fetching metadata for resource {} in {}", ftpResource.getResourcePath(), ftpResource.getFtpStorage().getHost());
+            logger.info("Fetching metadata for resource {} in {}", ftpResource.getFile().getResourcePath(), ftpResource.getFtpStorage().getHost());
 
-            FTPFile ftpFile = ftpClient.mlistFile(ftpResource.getResourcePath());
+            FTPFile ftpFile = ftpClient.mlistFile(ftpResource.getFile().getResourcePath());
 
             if (ftpFile != null) {
                 resourceMetadata.setResourceSize(ftpFile.getSize());
                 resourceMetadata.setUpdateTime(ftpFile.getTimestamp().getTimeInMillis());
-                if (ftpClient.hasFeature("MD5") && FTPReply.isPositiveCompletion(ftpClient.sendCommand("MD5 " + ftpResource.getResourcePath()))) {
+                if (ftpClient.hasFeature("MD5") && FTPReply.isPositiveCompletion(ftpClient.sendCommand("MD5 " + ftpResource.getFile().getResourcePath()))) {
                     String[] replies = ftpClient.getReplyStrings();
                     resourceMetadata.setMd5sum(replies[0]);
                 } else {
@@ -109,8 +110,14 @@
         FTPClient ftpClient = null;
         try {
             ftpClient = FTPTransportUtil.getFTPClient(ftpResource, ftpSecret);
-            InputStream inputStream = ftpClient.retrieveFileStream(ftpResource.getResourcePath());
+            InputStream inputStream = null;
 
+            switch (ftpResource.getResourceCase().name()){
+                case ResourceTypes.FILE:
+                    inputStream = ftpClient.retrieveFileStream(ftpResource.getFile().getResourcePath());
+                case ResourceTypes.DIRECTORY:
+                    inputStream = ftpClient.retrieveFileStream(ftpResource.getDirectory().getResourcePath());
+            }
             return !(inputStream == null || ftpClient.getReplyCode() == 550);
         } catch (Exception e) {
             logger.error("FTP client initialization failed ", e);
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java
index 6ced168..9225b0e 100644
--- a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java
@@ -18,6 +18,7 @@
 package org.apache.airavata.mft.transport.ftp;
 
 import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.credential.stubs.ftp.FTPSecret;
 import org.apache.airavata.mft.credential.stubs.ftp.FTPSecretGetRequest;
@@ -62,40 +63,49 @@
 
     @Override
     public void startStream(ConnectorContext context) throws Exception {
-        logger.info("Starting FTP receiver stream for transfer {}", context.getTransferId());
 
-        checkInitialized();
-        OutputStream streamOs = context.getStreamBuffer().getOutputStream();
-        InputStream inputStream = ftpClient.retrieveFileStream(resource.getResourcePath());
+        if (ResourceTypes.FILE.equals(this.resource.getResourceCase().name())) {
+            logger.info("Starting FTP receiver stream for transfer {}", context.getTransferId());
 
-        long fileSize = context.getMetadata().getResourceSize();
+            checkInitialized();
+            OutputStream streamOs = context.getStreamBuffer().getOutputStream();
+            InputStream inputStream = ftpClient.retrieveFileStream(resource.getFile().getResourcePath());
 
-        byte[] buf = new byte[1024];
-        while (true) {
-            int bufSize;
+            long fileSize = context.getMetadata().getResourceSize();
 
-            if (buf.length < fileSize) {
-                bufSize = buf.length;
-            } else {
-                bufSize = (int) fileSize;
-            }
-            bufSize = inputStream.read(buf, 0, bufSize);
+            byte[] buf = new byte[1024];
+            while (true) {
+                int bufSize;
 
-            if (bufSize < 0) {
-                break;
+                if (buf.length < fileSize) {
+                    bufSize = buf.length;
+                } else {
+                    bufSize = (int) fileSize;
+                }
+                bufSize = inputStream.read(buf, 0, bufSize);
+
+                if (bufSize < 0) {
+                    break;
+                }
+
+                streamOs.write(buf, 0, bufSize);
+                streamOs.flush();
+
+                fileSize -= bufSize;
+                if (fileSize == 0L)
+                    break;
             }
 
-            streamOs.write(buf, 0, bufSize);
-            streamOs.flush();
+            inputStream.close();
+            streamOs.close();
+            logger.info("Completed FTP receiver stream for transfer {}", context.getTransferId());
 
-            fileSize -= bufSize;
-            if (fileSize == 0L)
-                break;
+        } else {
+            logger.error("Resource {} should be a FILE type. Found a {}",
+                    this.resource.getResourceId(), this.resource.getResourceCase().name());
+            throw new Exception("Resource " + this.resource.getResourceId() + " should be a FILE type. Found a " +
+                    this.resource.getResourceCase().name());
         }
-
-        inputStream.close();
-        streamOs.close();
-        logger.info("Completed FTP receiver stream for transfer {}", context.getTransferId());
     }
 
     private void checkInitialized() {
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java
index d9dbf06..f3a55f7 100644
--- a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java
@@ -18,6 +18,7 @@
 package org.apache.airavata.mft.transport.ftp;
 
 import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.credential.stubs.ftp.FTPSecret;
 import org.apache.airavata.mft.credential.stubs.ftp.FTPSecretGetRequest;
@@ -66,38 +67,45 @@
         logger.info("Starting FTP sender stream for transfer {}", context.getTransferId());
 
         checkInitialized();
-        InputStream in = context.getStreamBuffer().getInputStream();
-        long fileSize = context.getMetadata().getResourceSize();
-        OutputStream outputStream = ftpClient.storeFileStream(resource.getResourcePath());
-
-        byte[] buf = new byte[1024];
-        while (true) {
-            int bufSize;
-
-            if (buf.length < fileSize) {
-                bufSize = buf.length;
-            } else {
-                bufSize = (int) fileSize;
-            }
-            bufSize = in.read(buf, 0, bufSize);
-
-            if (bufSize < 0) {
-                break;
-            }
-
-            outputStream.write(buf, 0, bufSize);
-            outputStream.flush();
-
-            fileSize -= bufSize;
-            if (fileSize == 0L)
-                break;
-        }
-
-        in.close();
-        outputStream.close();
 
         logger.info("Completed FTP sender stream for transfer {}", context.getTransferId());
 
+        if (ResourceTypes.FILE.equals(this.resource.getResourceCase().name())) {
+            InputStream in = context.getStreamBuffer().getInputStream();
+            long fileSize = context.getMetadata().getResourceSize();
+            OutputStream outputStream = ftpClient.storeFileStream(resource.getFile().getResourcePath());
+
+            byte[] buf = new byte[1024];
+            while (true) {
+                int bufSize;
+
+                if (buf.length < fileSize) {
+                    bufSize = buf.length;
+                } else {
+                    bufSize = (int) fileSize;
+                }
+                bufSize = in.read(buf, 0, bufSize);
+
+                if (bufSize < 0) {
+                    break;
+                }
+
+                outputStream.write(buf, 0, bufSize);
+                outputStream.flush();
+
+                fileSize -= bufSize;
+                if (fileSize == 0L)
+                    break;
+            }
+
+            in.close();
+            outputStream.close();
+        } else {
+            logger.error("Resource {} should be a FILE type. Found a {}",
+                    this.resource.getResourceId(), this.resource.getResourceCase().name());
+            throw new Exception("Resource " + this.resource.getResourceId() + " should be a FILE type. Found a " +
+                    this.resource.getResourceCase().name());
+        }
     }
 
     private void checkInitialized() {
diff --git a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java
index d8d0a6b..0f8e634 100644
--- a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java
+++ b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java
@@ -26,6 +26,7 @@
 import com.google.api.services.storage.StorageScopes;
 import com.google.api.services.storage.model.StorageObject;
 import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.airavata.mft.credential.stubs.gcs.GCSSecret;
 import org.apache.airavata.mft.credential.stubs.gcs.GCSSecretGetRequest;
@@ -86,7 +87,7 @@
         Storage storage = new Storage.Builder(transport, jsonFactory, credential).build();
 
         ResourceMetadata metadata = new ResourceMetadata();
-        StorageObject gcsMetadata = storage.objects().get(gcsResource.getBucketName(), gcsResource.getResourcePath()).execute();
+        StorageObject gcsMetadata = storage.objects().get(gcsResource.getBucketName(), gcsResource.getFile().getResourcePath()).execute();
         metadata.setResourceSize(gcsMetadata.getSize().longValue());
         String md5Sum = String.format("%032x", new BigInteger(1, Base64.getDecoder().decode(gcsMetadata.getMd5Hash())));
         metadata.setMd5sum(md5Sum);
@@ -114,6 +115,12 @@
         }
 
         Storage storage = new Storage.Builder(transport, jsonFactory, credential).build();
-        return !storage.objects().get(gcsResource.getBucketName(), gcsResource.getResourcePath()).execute().isEmpty();
+        switch (gcsResource.getResourceCase().name()){
+            case ResourceTypes.FILE:
+                return !storage.objects().get(gcsResource.getBucketName(), gcsResource.getFile().getResourcePath()).execute().isEmpty();
+            case ResourceTypes.DIRECTORY:
+                return !storage.objects().get(gcsResource.getBucketName(), gcsResource.getDirectory().getResourcePath()).execute().isEmpty();
+        }
+        return false;
     }
 }
diff --git a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSReceiver.java b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSReceiver.java
index 4261e40..5a1bc63 100644
--- a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSReceiver.java
+++ b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSReceiver.java
@@ -25,6 +25,7 @@
 import com.google.api.services.storage.Storage;
 import com.google.api.services.storage.StorageScopes;
 import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.credential.stubs.gcs.GCSSecret;
 import org.apache.airavata.mft.credential.stubs.gcs.GCSSecretGetRequest;
@@ -78,36 +79,45 @@
     public void startStream(ConnectorContext context) throws Exception {
         logger.info("Starting GCS Receiver stream for transfer {}", context.getTransferId());
 
-        InputStream inputStream = storage.objects().get(this.gcsResource.getBucketName(), this.gcsResource.getResourcePath()).executeMediaAsInputStream();
-        OutputStream os = context.getStreamBuffer().getOutputStream();
-        int read;
-        long bytes = 0;
-        long fileSize = context.getMetadata().getResourceSize();
-        byte[] buf = new byte[1024];
-        while (true) {
-            int bufSize = 0;
+        if (ResourceTypes.FILE.equals(this.gcsResource.getResourceCase().name())) {
+            InputStream inputStream = storage.objects().get(this.gcsResource.getBucketName(),
+                                        this.gcsResource.getFile().getResourcePath()).executeMediaAsInputStream();
+            OutputStream os = context.getStreamBuffer().getOutputStream();
+            int read;
+            long bytes = 0;
+            long fileSize = context.getMetadata().getResourceSize();
+            byte[] buf = new byte[1024];
+            while (true) {
+                int bufSize = 0;
 
-            if (buf.length < fileSize) {
-                bufSize = buf.length;
-            } else {
-                bufSize = (int) fileSize;
-            }
-            bufSize = inputStream.read(buf, 0, bufSize);
+                if (buf.length < fileSize) {
+                    bufSize = buf.length;
+                } else {
+                    bufSize = (int) fileSize;
+                }
+                bufSize = inputStream.read(buf, 0, bufSize);
 
-            if (bufSize < 0) {
-                break;
+                if (bufSize < 0) {
+                    break;
+                }
+
+                os.write(buf, 0, bufSize);
+                os.flush();
+
+                fileSize -= bufSize;
+                if (fileSize == 0L)
+                    break;
             }
 
-            os.write(buf, 0, bufSize);
-            os.flush();
+            os.close();
 
-            fileSize -= bufSize;
-            if (fileSize == 0L)
-                break;
+            logger.info("Completed GCS Receiver stream for transfer {}", context.getTransferId());
+
+        } else {
+            logger.error("Resource {} should be a FILE type. Found a {}",
+                    this.gcsResource.getResourceId(), this.gcsResource.getResourceCase().name());
+            throw new Exception("Resource " + this.gcsResource.getResourceId() + " should be a FILE type. Found a " +
+                    this.gcsResource.getResourceCase().name());
         }
-
-        os.close();
-
-        logger.info("Completed GCS Receiver stream for transfer {}", context.getTransferId());
     }
 }
diff --git a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSSender.java b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSSender.java
index 79768de..7326545 100644
--- a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSSender.java
+++ b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSSender.java
@@ -31,6 +31,7 @@
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.credential.stubs.gcs.GCSSecret;
 import org.apache.airavata.mft.credential.stubs.gcs.GCSSecretGetRequest;
@@ -87,22 +88,31 @@
     @Override
     public void startStream(ConnectorContext context) throws Exception {
         logger.info("Starting GCS Sender stream for transfer {}", context.getTransferId());
-        logger.info("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
+        logger.debug("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
 
-        InputStreamContent contentStream = new InputStreamContent(
-                null, context.getStreamBuffer().getInputStream());
-        String entityUser = jsonObject.get("client_email").getAsString();
-        StorageObject objectMetadata = new StorageObject()
-                // Set the destination object name
-                .setName(this.gcsResource.getResourcePath())
-                // Set the access control list to publicly read-only
-                .setAcl(Arrays.asList(new ObjectAccessControl().setEntity("user-" + entityUser).setRole("OWNER")));
+        if (ResourceTypes.FILE.equals(this.gcsResource.getResourceCase().name())) {
 
-        Insert insertRequest = storage.objects().insert(
-                this.gcsResource.getBucketName(), objectMetadata, contentStream);
+            InputStreamContent contentStream = new InputStreamContent(
+                    null, context.getStreamBuffer().getInputStream());
+            String entityUser = jsonObject.get("client_email").getAsString();
+            StorageObject objectMetadata = new StorageObject()
+                    // Set the destination object name
+                    .setName(this.gcsResource.getFile().getResourcePath())
+                    // Set the access control list to publicly read-only
+                    .setAcl(Arrays.asList(new ObjectAccessControl().setEntity("user-" + entityUser).setRole("OWNER")));
 
-        insertRequest.execute();
+            Insert insertRequest = storage.objects().insert(this.gcsResource.getBucketName(), objectMetadata, contentStream);
 
-        logger.info("Completed GCS Sender stream for transfer {}", context.getTransferId());
+            insertRequest.execute();
+
+            logger.info("Completed GCS Sender stream for transfer {}", context.getTransferId());
+        } else {
+            logger.error("Resource {} should be a FILE type. Found a {}",
+                    this.gcsResource.getResourceId(), this.gcsResource.getResourceCase().name());
+            throw new Exception("Resource " + this.gcsResource.getResourceId() + " should be a FILE type. Found a " +
+                    this.gcsResource.getResourceCase().name());
+        }
+
+
     }
 }
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
index e28c42c..ba21a2c 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
@@ -18,6 +18,7 @@
 package org.apache.airavata.mft.transport.local;
 
 import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
@@ -59,17 +60,17 @@
 
         ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
         LocalResource localResource = resourceClient.local().getLocalResource(LocalResourceGetRequest.newBuilder().setResourceId(resourceId).build());
-        File resourceFile = new File(localResource.getResourcePath());
+        File resourceFile = new File(localResource.getFile().getResourcePath());
         if (resourceFile.exists()) {
 
-            BasicFileAttributes basicFileAttributes = Files.readAttributes(Path.of(localResource.getResourcePath()), BasicFileAttributes.class);
+            BasicFileAttributes basicFileAttributes = Files.readAttributes(Path.of(localResource.getFile().getResourcePath()), BasicFileAttributes.class);
             ResourceMetadata metadata = new ResourceMetadata();
             metadata.setCreatedTime(basicFileAttributes.creationTime().toMillis());
             metadata.setUpdateTime(basicFileAttributes.lastModifiedTime().toMillis());
             metadata.setResourceSize(basicFileAttributes.size());
 
             MessageDigest digest = MessageDigest.getInstance("MD5");
-            FileInputStream fis = new FileInputStream(localResource.getResourcePath());
+            FileInputStream fis = new FileInputStream(localResource.getFile().getResourcePath());
             byte[] byteArray = new byte[1024];
             int bytesCount = 0;
             while ((bytesCount = fis.read(byteArray)) != -1) {
@@ -85,7 +86,7 @@
 
             return metadata;
         } else {
-            throw new Exception("Resource with id " + resourceId + " in path " + localResource.getResourcePath() + " does not exist");
+            throw new Exception("Resource with id " + resourceId + " in path " + localResource.getFile().getResourcePath() + " does not exist");
         }
     }
 
@@ -93,7 +94,13 @@
     public Boolean isAvailable(String resourceId, String credentialToken) throws Exception {
        ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
         LocalResource localResource = resourceClient.local().getLocalResource(LocalResourceGetRequest.newBuilder().setResourceId(resourceId).build());
-        File resourceFile = new File(localResource.getResourcePath());
-        return resourceFile.exists();
+
+        switch (localResource.getResourceCase().name()){
+            case ResourceTypes.FILE:
+                return new File(localResource.getFile().getResourcePath()).exists();
+            case ResourceTypes.DIRECTORY:
+                return new File(localResource.getDirectory().getResourcePath()).exists();
+        }
+        return false;
     }
 }
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java
index 06d34fc..1b61fb1 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java
@@ -18,6 +18,7 @@
 package org.apache.airavata.mft.transport.local;
 
 import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
@@ -60,36 +61,46 @@
         logger.info("Starting local receiver stream for transfer {}", context.getTransferId());
 
         checkInitialized();
-        OutputStream streamOs = context.getStreamBuffer().getOutputStream();
-        FileInputStream fis = new FileInputStream(new File(resource.getResourcePath()));
 
-        long fileSize = context.getMetadata().getResourceSize();
 
-        byte[] buf = new byte[1024];
-        while (true) {
-            int bufSize = 0;
+        if (ResourceTypes.FILE.equals(this.resource.getResourceCase().name())) {
+            OutputStream streamOs = context.getStreamBuffer().getOutputStream();
+            FileInputStream fis = new FileInputStream(new File(resource.getFile().getResourcePath()));
 
-            if (buf.length < fileSize) {
-                bufSize = buf.length;
-            } else {
-                bufSize = (int) fileSize;
-            }
-            bufSize = fis.read(buf, 0, bufSize);
+            long fileSize = context.getMetadata().getResourceSize();
 
-            if (bufSize < 0) {
-                break;
+            byte[] buf = new byte[1024];
+            while (true) {
+                int bufSize = 0;
+
+                if (buf.length < fileSize) {
+                    bufSize = buf.length;
+                } else {
+                    bufSize = (int) fileSize;
+                }
+                bufSize = fis.read(buf, 0, bufSize);
+
+                if (bufSize < 0) {
+                    break;
+                }
+
+                streamOs.write(buf, 0, bufSize);
+                streamOs.flush();
+
+                fileSize -= bufSize;
+                if (fileSize == 0L)
+                    break;
             }
 
-            streamOs.write(buf, 0, bufSize);
-            streamOs.flush();
+            fis.close();
+            streamOs.close();
+            logger.info("Completed local receiver stream for transfer {}", context.getTransferId());
 
-            fileSize -= bufSize;
-            if (fileSize == 0L)
-                break;
+        } else {
+            logger.error("Resource {} should be a FILE type. Found a {}",
+                    this.resource.getResourceId(), this.resource.getResourceCase().name());
+            throw new Exception("Resource " + this.resource.getResourceId() + " should be a FILE type. Found a " +
+                    this.resource.getResourceCase().name());
         }
-
-        fis.close();
-        streamOs.close();
-        logger.info("Completed local receiver stream for transfer {}", context.getTransferId());
     }
 }
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java
index 17baf35..315fa25 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java
@@ -18,6 +18,7 @@
 package org.apache.airavata.mft.transport.local;
 
 import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.resource.client.ResourceServiceClient;
 import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
@@ -61,37 +62,44 @@
         logger.info("Starting local sender stream for transfer {}", context.getTransferId());
 
         checkInitialized();
-        InputStream in = context.getStreamBuffer().getInputStream();
-        long fileSize = context.getMetadata().getResourceSize();
-        OutputStream fos = new FileOutputStream(resource.getResourcePath());
 
-        byte[] buf = new byte[1024];
-        while (true) {
-            int bufSize = 0;
+        if (ResourceTypes.FILE.equals(this.resource.getResourceCase().name())) {
+            InputStream in = context.getStreamBuffer().getInputStream();
+            long fileSize = context.getMetadata().getResourceSize();
+            OutputStream fos = new FileOutputStream(resource.getFile().getResourcePath());
 
-            if (buf.length < fileSize) {
-                bufSize = buf.length;
-            } else {
-                bufSize = (int) fileSize;
-            }
-            bufSize = in.read(buf, 0, bufSize);
+            byte[] buf = new byte[1024];
+            while (true) {
+                int bufSize = 0;
 
-            if (bufSize < 0) {
-                break;
+                if (buf.length < fileSize) {
+                    bufSize = buf.length;
+                } else {
+                    bufSize = (int) fileSize;
+                }
+                bufSize = in.read(buf, 0, bufSize);
+
+                if (bufSize < 0) {
+                    break;
+                }
+
+                fos.write(buf, 0, bufSize);
+                fos.flush();
+
+                fileSize -= bufSize;
+                if (fileSize == 0L)
+                    break;
             }
 
-            fos.write(buf, 0, bufSize);
-            fos.flush();
+            in.close();
+            fos.close();
 
-            fileSize -= bufSize;
-            if (fileSize == 0L)
-                break;
+            logger.info("Completed local sender stream for transfer {}", context.getTransferId());
+        } else {
+            logger.error("Resource {} should be a FILE type. Found a {}",
+                    this.resource.getResourceId(), this.resource.getResourceCase().name());
+            throw new Exception("Resource " + this.resource.getResourceId() + " should be a FILE type. Found a " +
+                    this.resource.getResourceCase().name());
         }
-
-        in.close();
-        fos.close();
-
-        logger.info("Completed local sender stream for transfer {}", context.getTransferId());
-
     }
 }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
index 6203306..56d98f4 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
@@ -23,6 +23,7 @@
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
 import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
@@ -74,7 +75,7 @@
                 .build();
 
         ResourceMetadata metadata = new ResourceMetadata();
-        ObjectMetadata s3Metadata = s3Client.getObjectMetadata(s3Resource.getBucketName(), s3Resource.getResourcePath());
+        ObjectMetadata s3Metadata = s3Client.getObjectMetadata(s3Resource.getBucketName(), s3Resource.getFile().getResourcePath());
         metadata.setResourceSize(s3Metadata.getContentLength());
         metadata.setMd5sum(s3Metadata.getETag());
         metadata.setUpdateTime(s3Metadata.getLastModified().getTime());
@@ -99,6 +100,12 @@
                 .withRegion(s3Resource.getRegion())
                 .build();
 
-        return  s3Client.doesObjectExist(s3Resource.getBucketName(), s3Resource.getResourcePath());
+        switch (s3Resource.getResourceCase().name()){
+            case ResourceTypes.FILE:
+                return s3Client.doesObjectExist(s3Resource.getBucketName(), s3Resource.getFile().getResourcePath());
+            case ResourceTypes.DIRECTORY:
+                return s3Client.doesObjectExist(s3Resource.getBucketName(), s3Resource.getDirectory().getResourcePath());
+        }
+        return false;
     }
 }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
index 0ac7191..7e8d42b 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
@@ -24,6 +24,7 @@
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
 import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
 import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
@@ -70,21 +71,29 @@
     @Override
     public void startStream(ConnectorContext context) throws Exception {
 
-        logger.info("Starting S3 Receiver stream for transfer {}", context.getTransferId());
+        if (ResourceTypes.FILE.equals(this.s3Resource.getResourceCase().name())) {
+            logger.info("Starting S3 Receiver stream for transfer {}", context.getTransferId());
 
-        S3Object s3object = s3Client.getObject(s3Resource.getBucketName(), s3Resource.getResourcePath());
-        S3ObjectInputStream inputStream = s3object.getObjectContent();
+            S3Object s3object = s3Client.getObject(s3Resource.getBucketName(), s3Resource.getFile().getResourcePath());
+            S3ObjectInputStream inputStream = s3object.getObjectContent();
 
-        OutputStream os = context.getStreamBuffer().getOutputStream();
-        int read;
-        long bytes = 0;
-        while ((read = inputStream.read()) != -1) {
-            bytes++;
-            os.write(read);
+            OutputStream os = context.getStreamBuffer().getOutputStream();
+            int read;
+            long bytes = 0;
+            while ((read = inputStream.read()) != -1) {
+                bytes++;
+                os.write(read);
+            }
+            os.flush();
+            os.close();
+
+            logger.info("Completed S3 Receiver stream for transfer {}", context.getTransferId());
+
+        } else {
+            logger.error("Resource {} should be a FILE type. Found a {}",
+                    this.s3Resource.getResourceId(), this.s3Resource.getResourceCase().name());
+            throw new Exception("Resource " + this.s3Resource.getResourceId() + " should be a FILE type. Found a " +
+                    this.s3Resource.getResourceCase().name());
         }
-        os.flush();
-        os.close();
-
-        logger.info("Completed S3 Receiver stream for transfer {}", context.getTransferId());
     }
 }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
index 70b32e4..f0faeb2 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
@@ -23,6 +23,7 @@
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
 import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
@@ -70,8 +71,17 @@
         logger.info("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
         ObjectMetadata metadata = new ObjectMetadata();
         metadata.setContentLength(context.getMetadata().getResourceSize());
-        s3Client.putObject(this.s3Resource.getBucketName(), this.s3Resource.getResourcePath(), context.getStreamBuffer().getInputStream(), metadata);
 
-        logger.info("Completed S3 Sender stream for transfer {}", context.getTransferId());
+        if (ResourceTypes.FILE.equals(this.s3Resource.getResourceCase().name())) {
+            s3Client.putObject(this.s3Resource.getBucketName(), this.s3Resource.getFile().getResourcePath(),
+                                                        context.getStreamBuffer().getInputStream(), metadata);
+            logger.info("Completed S3 Sender stream for transfer {}", context.getTransferId());
+        } else {
+            logger.error("Resource {} should be a FILE type. Found a {}",
+                    this.s3Resource.getResourceId(), this.s3Resource.getResourceCase().name());
+            throw new Exception("Resource " + this.s3Resource.getResourceId() + " should be a FILE type. Found a " +
+                    this.s3Resource.getResourceCase().name());
+        }
+
     }
 }
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
index e167ed6..2c1ac22 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
@@ -28,6 +28,7 @@
 import net.schmizz.sshj.userauth.method.ChallengeResponseProvider;
 import net.schmizz.sshj.userauth.password.Resource;
 import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.MetadataCollector;
 import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
 import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
@@ -82,10 +83,10 @@
 
         try (SSHClient sshClient = getSSHClient(scpResource, scpSecret)) {
 
-            logger.info("Fetching metadata for resource {} in {}", scpResource.getResourcePath(), scpResource.getScpStorage().getHost());
+            logger.info("Fetching metadata for resource {} in {}", scpResource.getFile().getResourcePath(), scpResource.getScpStorage().getHost());
 
             try (SFTPClient sftpClient = sshClient.newSFTPClient()) {
-                FileAttributes lstat = sftpClient.lstat(scpResource.getResourcePath());
+                FileAttributes lstat = sftpClient.lstat(scpResource.getFile().getResourcePath());
                 sftpClient.close();
 
                 ResourceMetadata metadata = new ResourceMetadata();
@@ -96,7 +97,7 @@
                 try {
                     // TODO calculate md5 using the binary based on the OS platform. Eg: MacOS has md5. Linux has md5sum
                     // This only works for linux SCP resources. Improve to work in mac and windows resources
-                    Session.Command md5Command = sshClient.startSession().exec("md5sum " + scpResource.getResourcePath());
+                    Session.Command md5Command = sshClient.startSession().exec("md5sum " + scpResource.getFile().getResourcePath());
                     StringWriter outWriter = new StringWriter();
                     StringWriter errorWriter = new StringWriter();
 
@@ -129,9 +130,15 @@
         SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
 
         try (SSHClient sshClient = getSSHClient(scpResource, scpSecret)) {
-            logger.info("Checking the availability of file {}", scpResource.getResourcePath());
+            logger.info("Checking the availability of file {}", scpResource.getFile().getResourcePath());
             try (SFTPClient sftpClient = sshClient.newSFTPClient()) {
-                return sftpClient.statExistence(scpResource.getResourcePath()) != null;
+                switch (scpResource.getResourceCase().name()){
+                    case ResourceTypes.FILE:
+                        return sftpClient.statExistence(scpResource.getFile().getResourcePath()) != null;
+                    case ResourceTypes.DIRECTORY:
+                        return sftpClient.statExistence(scpResource.getDirectory().getResourcePath()) != null;
+                }
+                return false;
             }
         }
     }
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
index e867a99..127ea8c 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
@@ -21,6 +21,7 @@
 import com.jcraft.jsch.Session;
 import org.apache.airavata.mft.core.ConnectorContext;
 import org.apache.airavata.mft.core.DoubleStreamingBuffer;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
 import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
@@ -77,12 +78,20 @@
     public void startStream(ConnectorContext context) throws Exception {
         checkInitialized();
         if (session == null) {
-            System.out.println("Session can not be null. Make sure that SCP Receiver is properly initialized");
+            logger.error("Session can not be null. Make sure that SCP Receiver is properly initialized");
             throw new Exception("Session can not be null. Make sure that SCP Receiver is properly initialized");
         }
 
-        transferRemoteToStream(session, this.scpResource.getResourcePath(), context.getStreamBuffer());
-        logger.info("SCP Receive completed. Transfer {}", context.getTransferId());
+        if (ResourceTypes.FILE.equals(this.scpResource.getResourceCase().name())) {
+            transferRemoteToStream(session, this.scpResource.getFile().getResourcePath(), context.getStreamBuffer());
+            logger.info("SCP Receive completed. Transfer {}", context.getTransferId());
+
+        } else {
+            logger.error("Resource {} should be a FILE type. Found a {}",
+                                            this.scpResource.getResourceId(), this.scpResource.getResourceCase().name());
+            throw new Exception("Resource " + this.scpResource.getResourceId() + " should be a FILE type. Found a " +
+                                                                            this.scpResource.getResourceCase().name());
+        }
     }
 
     private void transferRemoteToStream(Session session, String from, DoubleStreamingBuffer streamBuffer) throws Exception {
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
index 27fcf89..b552f0e 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
@@ -22,6 +22,7 @@
 import com.jcraft.jsch.Session;
 import org.apache.airavata.mft.core.ConnectorContext;
 import org.apache.airavata.mft.core.DoubleStreamingBuffer;
+import org.apache.airavata.mft.core.ResourceTypes;
 import org.apache.airavata.mft.core.api.Connector;
 import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
 import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
@@ -92,8 +93,17 @@
             throw new Exception("Session can not be null. Make sure that SCP Sender is properly initialized");
         }
         try {
-            copyLocalToRemote(this.session, this.scpResource.getResourcePath(), context.getStreamBuffer(), context.getMetadata().getResourceSize());
-            logger.info("SCP send to transfer {} completed", context.getTransferId());
+            if (ResourceTypes.FILE.equals(this.scpResource.getResourceCase().name())) {
+                copyLocalToRemote(this.session, this.scpResource.getFile().getResourcePath(), context.getStreamBuffer(), context.getMetadata().getResourceSize());
+                logger.info("SCP send to transfer {} completed", context.getTransferId());
+
+            } else {
+                logger.error("Resource {} should be a FILE type. Found a {}",
+                        this.scpResource.getResourceId(), this.scpResource.getResourceCase().name());
+                throw new Exception("Resource " + this.scpResource.getResourceId() + " should be a FILE type. Found a " +
+                        this.scpResource.getResourceCase().name());
+            }
+
         } catch (Exception e) {
             logger.error("Errored while streaming to remote scp server. Transfer {}", context.getTransferId() , e);
             throw e;