Introducing file resource and directory resource types
diff --git a/core/src/main/java/org/apache/airavata/mft/core/ResourceTypes.java b/core/src/main/java/org/apache/airavata/mft/core/ResourceTypes.java
new file mode 100644
index 0000000..cfa92d2
--- /dev/null
+++ b/core/src/main/java/org/apache/airavata/mft/core/ResourceTypes.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.core;
+
+public final class ResourceTypes {
+ public static final String FILE = "FILE";
+ public static final String DIRECTORY = "DIRECTORY";
+}
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java
index 36466a6..6299a07 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/airavata/AiravataResourceBackend.java
@@ -20,6 +20,7 @@
import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
import org.apache.airavata.mft.resource.stubs.azure.resource.*;
import org.apache.airavata.mft.resource.stubs.box.resource.*;
+import org.apache.airavata.mft.resource.stubs.common.FileResource;
import org.apache.airavata.mft.resource.stubs.dropbox.resource.*;
import org.apache.airavata.mft.resource.stubs.ftp.resource.*;
import org.apache.airavata.mft.resource.stubs.ftp.storage.*;
@@ -129,7 +130,7 @@
SCPResource scpResource = SCPResource.newBuilder()
.setResourceId(resourceId)
- .setResourcePath(path)
+ .setFile(FileResource.newBuilder().setResourcePath(path).build())
.setScpStorage(getSCPStorage(SCPStorageGetRequest.newBuilder().setStorageId(resourceId).build()).get())
.build();
return Optional.of(scpResource);
diff --git a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java
index d445998..151f868 100644
--- a/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java
+++ b/services/resource-service/server/src/main/java/org/apache/airavata/mft/resource/server/backend/file/FileBasedResourceBackend.java
@@ -20,6 +20,8 @@
import org.apache.airavata.mft.resource.server.backend.ResourceBackend;
import org.apache.airavata.mft.resource.stubs.azure.resource.*;
import org.apache.airavata.mft.resource.stubs.box.resource.*;
+import org.apache.airavata.mft.resource.stubs.common.DirectoryResource;
+import org.apache.airavata.mft.resource.stubs.common.FileResource;
import org.apache.airavata.mft.resource.stubs.dropbox.resource.*;
import org.apache.airavata.mft.resource.stubs.ftp.resource.*;
import org.apache.airavata.mft.resource.stubs.ftp.storage.*;
@@ -106,12 +108,22 @@
.setUser(((JSONObject)r.get("scpStorage")).get("user").toString())
.setPort(Integer.parseInt(((JSONObject)r.get("scpStorage")).get("port").toString())).build();
- SCPResource scpResource = SCPResource.newBuilder()
- .setResourcePath(r.get("resourcePath").toString())
+ SCPResource.Builder builder = SCPResource.newBuilder()
.setResourceId(r.get("resourceId").toString())
- .setScpStorage(storage).build();
+ .setScpStorage(storage);
- return scpResource;
+ switch (r.get("resourceMode").toString()) {
+ case "FILE":
+ FileResource fileResource = FileResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+ builder = builder.setFile(fileResource);
+ break;
+ case "DIRECTORY":
+ DirectoryResource directoryResource = DirectoryResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+ builder = builder.setDirectory(directoryResource);
+ break;
+ }
+ return builder.build();
+
}).collect(Collectors.toList());
return scpResources.stream().filter(r -> request.getResourceId().equals(r.getResourceId())).findFirst();
@@ -152,11 +164,19 @@
.map(resource -> {
JSONObject r = (JSONObject) resource;
- LocalResource localResource = LocalResource.newBuilder()
- .setResourcePath(r.get("resourcePath").toString())
- .setResourceId(r.get("resourceId").toString()).build();
+ LocalResource.Builder builder = LocalResource.newBuilder().setResourceId(r.get("resourceId").toString());
- return localResource;
+ switch (r.get("resourceMode").toString()) {
+ case "FILE":
+ FileResource fileResource = FileResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+ builder = builder.setFile(fileResource);
+ break;
+ case "DIRECTORY":
+ DirectoryResource directoryResource = DirectoryResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+ builder = builder.setDirectory(directoryResource);
+ break;
+ }
+ return builder.build();
}).collect(Collectors.toList());
return localResources.stream().filter(r -> request.getResourceId().equals(r.getResourceId())).findFirst();
}
@@ -195,14 +215,22 @@
.map(resource -> {
JSONObject r = (JSONObject) resource;
- S3Resource s3Resource = S3Resource.newBuilder()
- .setResourcePath(r.get("resourcePath").toString())
+ S3Resource.Builder builder = S3Resource.newBuilder()
.setResourceId(r.get("resourceId").toString())
.setBucketName(r.get("bucketName").toString())
- .setRegion(r.get("region").toString())
- .build();
+ .setRegion(r.get("region").toString());
- return s3Resource;
+ switch (r.get("resourceMode").toString()) {
+ case "FILE":
+ FileResource fileResource = FileResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+ builder = builder.setFile(fileResource);
+ break;
+ case "DIRECTORY":
+ DirectoryResource directoryResource = DirectoryResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+ builder = builder.setDirectory(directoryResource);
+ break;
+ }
+ return builder.build();
}).collect(Collectors.toList());
return s3Resources.stream().filter(r -> request.getResourceId().equals(r.getResourceId())).findFirst();
}
@@ -243,12 +271,20 @@
.map(resource -> {
JSONObject r = (JSONObject) resource;
- BoxResource boxResource = BoxResource.newBuilder()
- .setResourceId(r.get("resourceId").toString())
- .setBoxFileId(r.get("boxFileId").toString())
- .build();
+ BoxResource.Builder builder = BoxResource.newBuilder()
+ .setResourceId(r.get("resourceId").toString());
- return boxResource;
+ switch (r.get("resourceMode").toString()) {
+ case "FILE":
+ FileResource fileResource = FileResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+ builder = builder.setFile(fileResource);
+ break;
+ case "DIRECTORY":
+ DirectoryResource directoryResource = DirectoryResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+ builder = builder.setDirectory(directoryResource);
+ break;
+ }
+ return builder.build();
}).collect(Collectors.toList());
return boxResources.stream().filter(r -> request.getResourceId().equals(r.getResourceId())).findFirst();
}
@@ -285,13 +321,22 @@
.map(resource -> {
JSONObject r = (JSONObject) resource;
- AzureResource azureResource = AzureResource.newBuilder()
- .setBlobName(r.get("blobName").toString())
+ AzureResource.Builder builder = AzureResource.newBuilder()
.setContainer(r.get("container").toString())
- .setResourceId(r.get("resourceId").toString())
- .build();
+ .setResourceId(r.get("resourceId").toString());
- return azureResource;
+ switch (r.get("resourceMode").toString()) {
+ case "FILE":
+ FileResource fileResource = FileResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+ builder = builder.setFile(fileResource);
+ break;
+ case "DIRECTORY":
+ DirectoryResource directoryResource = DirectoryResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+ builder = builder.setDirectory(directoryResource);
+ break;
+ }
+ return builder.build();
+
}).collect(Collectors.toList());
return azureResources.stream().filter(r -> request.getResourceId().equals(r.getResourceId())).findFirst();
}
@@ -327,13 +372,22 @@
.map(resource -> {
JSONObject r = (JSONObject) resource;
- GCSResource gcsResource = GCSResource.newBuilder()
+ GCSResource.Builder builder = GCSResource.newBuilder()
.setBucketName(r.get("bucketName").toString())
- .setResourceId(r.get("resourceId").toString())
- .setResourcePath(r.get("resourcePath").toString())
- .build();
+ .setResourceId(r.get("resourceId").toString());
- return gcsResource;
+ switch (r.get("resourceMode").toString()) {
+ case "FILE":
+ FileResource fileResource = FileResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+ builder = builder.setFile(fileResource);
+ break;
+ case "DIRECTORY":
+ DirectoryResource directoryResource = DirectoryResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+ builder = builder.setDirectory(directoryResource);
+ break;
+ }
+ return builder.build();
+
}).collect(Collectors.toList());
return gcsResources.stream().filter(r -> request.getResourceId().equals(r.getResourceId())).findFirst();
}
@@ -369,12 +423,21 @@
JSONObject r = (JSONObject) resource;
String resourcePath = r.get("resourcePath").toString();
resourcePath = resourcePath.startsWith("/") ? resourcePath : "/" + resourcePath;
- DropboxResource dropboxResource = DropboxResource.newBuilder()
- .setResourceId(r.get("resourceId").toString())
- .setResourcePath(resourcePath)
- .build();
+ DropboxResource.Builder builder = DropboxResource.newBuilder()
+ .setResourceId(r.get("resourceId").toString());
- return dropboxResource;
+ switch (r.get("resourceMode").toString()) {
+ case "FILE":
+ FileResource fileResource = FileResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+ builder = builder.setFile(fileResource);
+ break;
+ case "DIRECTORY":
+ DirectoryResource directoryResource = DirectoryResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+ builder = builder.setDirectory(directoryResource);
+ break;
+ }
+ return builder.build();
+
}).collect(Collectors.toList());
return dropboxResources.stream().filter(r -> request.getResourceId().equals(r.getResourceId())).findFirst();
}
@@ -421,10 +484,22 @@
.setHost(((JSONObject)r.get("ftpStorage")).get("host").toString())
.setPort(Integer.parseInt(((JSONObject)r.get("ftpStorage")).get("port").toString())).build();
- return FTPResource.newBuilder()
- .setResourcePath(r.get("resourcePath").toString())
+ FTPResource.Builder builder = FTPResource.newBuilder()
.setResourceId(r.get("resourceId").toString())
- .setFtpStorage(storage).build();
+ .setFtpStorage(storage);
+
+ switch (r.get("resourceMode").toString()) {
+ case "FILE":
+ FileResource fileResource = FileResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+ builder = builder.setFile(fileResource);
+ break;
+ case "DIRECTORY":
+ DirectoryResource directoryResource = DirectoryResource.newBuilder().setResourcePath(r.get("resourcePath").toString()).build();
+ builder = builder.setDirectory(directoryResource);
+ break;
+ }
+ return builder.build();
+
}).collect(Collectors.toList());
return ftpResources.stream().filter(r -> request.getResourceId().equals(r.getResourceId())).findFirst();
diff --git a/services/resource-service/server/src/main/resources/resources.json b/services/resource-service/server/src/main/resources/resources.json
index 5c960fb..9d430db 100644
--- a/services/resource-service/server/src/main/resources/resources.json
+++ b/services/resource-service/server/src/main/resources/resources.json
@@ -2,6 +2,7 @@
{
"type": "SCP",
"resourceId": "remote-ssh-resource",
+ "resourceMode": "FILE",
"resourcePath": "/tmp/1mb.txt",
"scpStorage" : {
"storageId": "remote-ssh-storage",
@@ -13,6 +14,7 @@
{
"type": "SCP",
"resourceId": "remote-ssh-resource2",
+ "resourceMode": "FILE",
"resourcePath": "/tmp/10mb.txt",
"scpStorage" : {
"storageId": "remote-ssh-storage",
@@ -24,45 +26,53 @@
{
"type": "LOCAL",
"resourceId": "10mb-file",
- "resourcePath": "/tmp/1mb.txt"
+ "resourceMode": "FILE",
+ "resourcePath": "/tmp/10mb.txt"
},
{
"type": "S3",
"resourceId": "s3-file",
- "resourcePath": "new-file.txt",
+ "resourceMode": "FILE",
+ "resourcePath": "10mb-s3.txt",
"region": "us-east-2",
- "bucketName": "s3-bucket"
+ "bucketName": "airavata-s3"
},
{
"type": "BOX",
"resourceId": "box-file-abcd",
- "boxFileId": "655108198452"
+ "resourceMode": "FILE",
+ "resourcePath": "655108198452"
},
{
"type": "BOX",
"resourceId": "box-file-efgh",
- "boxFileId": "655450661536"
+ "resourceMode": "FILE",
+ "resourcePath": "655450661536"
},
{
"type": "AZURE",
"resourceId": "azure-blob",
"container": "sample-container",
- "blobName": "sample.blob"
+ "resourceMode": "FILE",
+ "resourcePath": "sample.blob"
},
{
"type": "GCS",
"resourceId": "gcs-bucket",
+ "resourceMode": "FILE",
"bucketName": "pika-pika-bucket",
"resourcePath": "PikaPikaTest.txt"
},
{
"type": "DROPBOX",
"resourceId": "dropbox-file",
+ "resourceMode": "FILE",
"resourcePath": "/test.txt"
},
{
"type": "FTP",
"resourceId": "ftp-resource",
+ "resourceMode": "FILE",
"resourcePath": "mft-1mb.txt",
"ftpStorage": {
"storageId": "ftp-resource",
diff --git a/services/resource-service/stub/src/main/proto/azure/AzureResource.proto b/services/resource-service/stub/src/main/proto/azure/AzureResource.proto
index fb0cf75..084fbc5 100644
--- a/services/resource-service/stub/src/main/proto/azure/AzureResource.proto
+++ b/services/resource-service/stub/src/main/proto/azure/AzureResource.proto
@@ -20,11 +20,15 @@
option java_multiple_files = true;
package org.apache.airavata.mft.resource.stubs.azure.resource;
+import "common/common.proto";
message AzureResource {
string resourceId = 1;
string container = 2;
- string blobName = 3;
+ oneof resource {
+ org.apache.airavata.mft.resource.stubs.common.FileResource file = 3;
+ org.apache.airavata.mft.resource.stubs.common.DirectoryResource directory = 4;
+ }
}
message AzureResourceGetRequest {
diff --git a/services/resource-service/stub/src/main/proto/box/BoxResource.proto b/services/resource-service/stub/src/main/proto/box/BoxResource.proto
index dc4be83..7899fdb 100644
--- a/services/resource-service/stub/src/main/proto/box/BoxResource.proto
+++ b/services/resource-service/stub/src/main/proto/box/BoxResource.proto
@@ -20,10 +20,14 @@
option java_multiple_files = true;
package org.apache.airavata.mft.resource.stubs.box.resource;
+import "common/common.proto";
+
message BoxResource {
string resourceId = 1;
- string boxFileId = 2;
-}
+ oneof resource {
+ org.apache.airavata.mft.resource.stubs.common.FileResource file = 2;
+ org.apache.airavata.mft.resource.stubs.common.DirectoryResource directory = 3;
+ }}
message BoxResourceGetRequest {
string resourceId = 1;
diff --git a/services/resource-service/stub/src/main/proto/common/common.proto b/services/resource-service/stub/src/main/proto/common/common.proto
new file mode 100644
index 0000000..4d69533
--- /dev/null
+++ b/services/resource-service/stub/src/main/proto/common/common.proto
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+package org.apache.airavata.mft.resource.stubs.common;
+
+message FileResource {
+ string resourcePath = 1;
+}
+
+message DirectoryResource {
+ string resourcePath = 1;
+ repeated FileResource files = 2;
+ repeated DirectoryResource directories = 3;
+}
\ No newline at end of file
diff --git a/services/resource-service/stub/src/main/proto/dropbox/DropboxResource.proto b/services/resource-service/stub/src/main/proto/dropbox/DropboxResource.proto
index 215657d..6ae5af9 100644
--- a/services/resource-service/stub/src/main/proto/dropbox/DropboxResource.proto
+++ b/services/resource-service/stub/src/main/proto/dropbox/DropboxResource.proto
@@ -20,9 +20,14 @@
option java_multiple_files = true;
package org.apache.airavata.mft.resource.stubs.dropbox.resource;
+import "common/common.proto";
+
message DropboxResource {
string resourceId = 1;
- string resourcePath = 2;
+ oneof resource {
+ org.apache.airavata.mft.resource.stubs.common.FileResource file = 2;
+ org.apache.airavata.mft.resource.stubs.common.DirectoryResource directory = 3;
+ }
}
message DropboxResourceGetRequest {
diff --git a/services/resource-service/stub/src/main/proto/ftp/FTPResource.proto b/services/resource-service/stub/src/main/proto/ftp/FTPResource.proto
index c7f3e63..2ba2b9e 100644
--- a/services/resource-service/stub/src/main/proto/ftp/FTPResource.proto
+++ b/services/resource-service/stub/src/main/proto/ftp/FTPResource.proto
@@ -21,11 +21,15 @@
package org.apache.airavata.mft.resource.stubs.ftp.resource;
import "ftp/FTPStorage.proto";
+import "common/common.proto";
message FTPResource {
string resourceId = 1;
org.apache.airavata.mft.resource.stubs.ftp.storage.FTPStorage ftpStorage = 2;
- string resourcePath = 3;
+ oneof resource {
+ org.apache.airavata.mft.resource.stubs.common.FileResource file = 3;
+ org.apache.airavata.mft.resource.stubs.common.DirectoryResource directory = 4;
+ }
}
message FTPResourceGetRequest {
diff --git a/services/resource-service/stub/src/main/proto/gcs/GCSResource.proto b/services/resource-service/stub/src/main/proto/gcs/GCSResource.proto
index 795196d..36f3c33 100644
--- a/services/resource-service/stub/src/main/proto/gcs/GCSResource.proto
+++ b/services/resource-service/stub/src/main/proto/gcs/GCSResource.proto
@@ -20,10 +20,15 @@
option java_multiple_files = true;
package org.apache.airavata.mft.resource.stubs.gcs.resource;
+import "common/common.proto";
+
message GCSResource {
string resourceId = 1;
string bucketName = 2;
- string resourcePath = 3;
+ oneof resource {
+ org.apache.airavata.mft.resource.stubs.common.FileResource file = 3;
+ org.apache.airavata.mft.resource.stubs.common.DirectoryResource directory = 4;
+ }
}
message GCSResourceGetRequest {
diff --git a/services/resource-service/stub/src/main/proto/local/LocalResource.proto b/services/resource-service/stub/src/main/proto/local/LocalResource.proto
index ed64e90..1978b0e 100644
--- a/services/resource-service/stub/src/main/proto/local/LocalResource.proto
+++ b/services/resource-service/stub/src/main/proto/local/LocalResource.proto
@@ -20,10 +20,15 @@
option java_multiple_files = true;
package org.apache.airavata.mft.resource.stubs.local.resource;
+import "common/common.proto";
+
message LocalResource {
string resourceId = 1;
- string resourcePath = 2;
- string agentId = 3;
+ string agentId = 2;
+ oneof resource {
+ org.apache.airavata.mft.resource.stubs.common.FileResource file = 3;
+ org.apache.airavata.mft.resource.stubs.common.DirectoryResource directory = 4;
+ }
}
message LocalResourceGetRequest {
diff --git a/services/resource-service/stub/src/main/proto/s3/S3Resource.proto b/services/resource-service/stub/src/main/proto/s3/S3Resource.proto
index 3727f27..9a9404a 100644
--- a/services/resource-service/stub/src/main/proto/s3/S3Resource.proto
+++ b/services/resource-service/stub/src/main/proto/s3/S3Resource.proto
@@ -20,11 +20,16 @@
option java_multiple_files = true;
package org.apache.airavata.mft.resource.stubs.s3.resource;
+import "common/common.proto";
+
message S3Resource {
string resourceId = 1;
string bucketName = 2;
string region = 3;
- string resourcePath = 4;
+ oneof resource {
+ org.apache.airavata.mft.resource.stubs.common.FileResource file = 4;
+ org.apache.airavata.mft.resource.stubs.common.DirectoryResource directory = 5;
+ }
}
message S3ResourceGetRequest {
diff --git a/services/resource-service/stub/src/main/proto/scp/SCPResource.proto b/services/resource-service/stub/src/main/proto/scp/SCPResource.proto
index 07136a7..bfa51fb 100644
--- a/services/resource-service/stub/src/main/proto/scp/SCPResource.proto
+++ b/services/resource-service/stub/src/main/proto/scp/SCPResource.proto
@@ -21,11 +21,15 @@
package org.apache.airavata.mft.resource.stubs.scp.resource;
import "scp/SCPStorage.proto";
+import "common/common.proto";
message SCPResource {
string resourceId = 1;
org.apache.airavata.mft.resource.stubs.scp.storage.SCPStorage scpStorage = 2;
- string resourcePath = 3;
+ oneof resource {
+ org.apache.airavata.mft.resource.stubs.common.FileResource file = 3;
+ org.apache.airavata.mft.resource.stubs.common.DirectoryResource directory = 4;
+ }
}
message SCPResourceGetRequest {
diff --git a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java
index 0d1c321..1a725cb 100644
--- a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java
+++ b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureMetadataCollector.java
@@ -23,6 +23,7 @@
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.BlobProperties;
import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.apache.airavata.mft.credential.stubs.azure.AzureSecret;
import org.apache.airavata.mft.credential.stubs.azure.AzureSecretGetRequest;
@@ -72,7 +73,7 @@
BlobServiceClient blobServiceClient = new BlobServiceClientBuilder().connectionString(azureSecret.getConnectionString()).buildClient();
- BlobClient blobClient = blobServiceClient.getBlobContainerClient(azureResource.getContainer()).getBlobClient(azureResource.getBlobName());
+ BlobClient blobClient = blobServiceClient.getBlobContainerClient(azureResource.getContainer()).getBlobClient(azureResource.getFile().getResourcePath());
BlobProperties properties = blobClient.getBlockBlobClient().getProperties();
ResourceMetadata metadata = new ResourceMetadata();
@@ -107,6 +108,12 @@
if (!containerExists) {
return false;
}
- return containerClient.getBlobClient(azureResource.getBlobName()).exists();
+ switch (azureResource.getResourceCase().name()){
+ case ResourceTypes.FILE:
+ return containerClient.getBlobClient(azureResource.getFile().getResourcePath()).exists();
+ case ResourceTypes.DIRECTORY:
+ return containerClient.getBlobClient(azureResource.getDirectory().getResourcePath()).exists();
+ }
+ return false;
}
}
diff --git a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureReceiver.java b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureReceiver.java
index 6d5a51e..162d31d 100644
--- a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureReceiver.java
+++ b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureReceiver.java
@@ -23,6 +23,7 @@
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.specialized.BlobInputStream;
import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.credential.stubs.azure.AzureSecret;
import org.apache.airavata.mft.credential.stubs.azure.AzureSecretGetRequest;
@@ -74,37 +75,46 @@
public void startStream(ConnectorContext context) throws Exception {
logger.info("Starting azure receive for remote server for transfer {}", context.getTransferId());
checkInitialized();
- BlobClient blobClient = containerClient.getBlobClient(azureResource.getBlobName());
- BlobInputStream blobInputStream = blobClient.openInputStream();
- OutputStream streamOs = context.getStreamBuffer().getOutputStream();
+ if (ResourceTypes.FILE.equals(this.azureResource.getResourceCase().name())) {
+ BlobClient blobClient = containerClient.getBlobClient(azureResource.getFile().getResourcePath());
+ BlobInputStream blobInputStream = blobClient.openInputStream();
- long fileSize = context.getMetadata().getResourceSize();
+ OutputStream streamOs = context.getStreamBuffer().getOutputStream();
- byte[] buf = new byte[1024];
- while (true) {
- int bufSize = 0;
+ long fileSize = context.getMetadata().getResourceSize();
- if (buf.length < fileSize) {
- bufSize = buf.length;
- } else {
- bufSize = (int) fileSize;
- }
- bufSize = blobInputStream.read(buf, 0, bufSize);
+ byte[] buf = new byte[1024];
+ while (true) {
+ int bufSize = 0;
- if (bufSize < 0) {
- break;
+ if (buf.length < fileSize) {
+ bufSize = buf.length;
+ } else {
+ bufSize = (int) fileSize;
+ }
+ bufSize = blobInputStream.read(buf, 0, bufSize);
+
+ if (bufSize < 0) {
+ break;
+ }
+
+ streamOs.write(buf, 0, bufSize);
+ streamOs.flush();
+
+ fileSize -= bufSize;
+ if (fileSize == 0L)
+ break;
}
- streamOs.write(buf, 0, bufSize);
- streamOs.flush();
+ streamOs.close();
+ logger.info("Completed azure receive for remote server for transfer {}", context.getTransferId());
- fileSize -= bufSize;
- if (fileSize == 0L)
- break;
+ } else {
+ logger.error("Resource {} should be a FILE type. Found a {}",
+ this.azureResource.getResourceId(), this.azureResource.getResourceCase().name());
+ throw new Exception("Resource " + this.azureResource.getResourceId() + " should be a FILE type. Found a " +
+ this.azureResource.getResourceCase().name());
}
-
- streamOs.close();
- logger.info("Completed azure receive for remote server for transfer {}", context.getTransferId());
}
}
diff --git a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureSender.java b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureSender.java
index a2d05dc..9642103 100644
--- a/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureSender.java
+++ b/transport/azure-transport/src/main/java/org/apache/airavata/mft/transport/azure/AzureSender.java
@@ -22,6 +22,7 @@
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.specialized.BlockBlobClient;
import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.credential.stubs.azure.AzureSecret;
import org.apache.airavata.mft.credential.stubs.azure.AzureSecretGetRequest;
@@ -71,9 +72,17 @@
public void startStream(ConnectorContext context) throws Exception {
logger.info("Starting Azure send for remote server for transfer {}", context.getTransferId());
checkInitialized();
- BlockBlobClient blockBlobClient = containerClient.getBlobClient(azureResource.getBlobName()).getBlockBlobClient();
- blockBlobClient.upload(context.getStreamBuffer().getInputStream(), context.getMetadata().getResourceSize(), true);
- logger.info("Completed Azure send for remote server for transfer {}", context.getTransferId());
+
+ if (ResourceTypes.FILE.equals(this.azureResource.getResourceCase().name())) {
+ BlockBlobClient blockBlobClient = containerClient.getBlobClient(azureResource.getFile().getResourcePath()).getBlockBlobClient();
+ blockBlobClient.upload(context.getStreamBuffer().getInputStream(), context.getMetadata().getResourceSize(), true);
+ logger.info("Completed Azure send for remote server for transfer {}", context.getTransferId());
+ } else {
+ logger.error("Resource {} should be a FILE type. Found a {}",
+ this.azureResource.getResourceId(), this.azureResource.getResourceCase().name());
+ throw new Exception("Resource " + this.azureResource.getResourceId() + " should be a FILE type. Found a " +
+ this.azureResource.getResourceCase().name());
+ }
}
}
diff --git a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java
index 06a0c16..b55a0f2 100644
--- a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java
+++ b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxMetadataCollector.java
@@ -21,6 +21,7 @@
import com.box.sdk.BoxAPIConnection;
import com.box.sdk.BoxFile;
import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.apache.airavata.mft.credential.stubs.box.BoxSecret;
import org.apache.airavata.mft.credential.stubs.box.BoxSecretGetRequest;
@@ -66,7 +67,7 @@
BoxSecret boxSecret = secretClient.box().getBoxSecret(BoxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
BoxAPIConnection api = new BoxAPIConnection(boxSecret.getAccessToken());
- BoxFile boxFile = new BoxFile(api, boxResource.getBoxFileId());
+ BoxFile boxFile = new BoxFile(api, boxResource.getFile().getResourcePath());
BoxFile.Info boxFileInfo = boxFile.getInfo();
ResourceMetadata metadata = new ResourceMetadata();
@@ -93,8 +94,14 @@
BoxSecret boxSecret = secretClient.box().getBoxSecret(BoxSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
BoxAPIConnection api = new BoxAPIConnection(boxSecret.getAccessToken());
- BoxFile boxFile = new BoxFile(api, boxResource.getBoxFileId());
+ BoxFile boxFile;
+ switch (boxResource.getResourceCase().name()){
+ case ResourceTypes.FILE:
+ boxFile = new BoxFile(api, boxResource.getFile().getResourcePath());
+ case ResourceTypes.DIRECTORY:
+ boxFile = new BoxFile(api, boxResource.getDirectory().getResourcePath());
+ }
return true;
}
}
diff --git a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxReceiver.java b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxReceiver.java
index 20af8f7..907b485 100644
--- a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxReceiver.java
+++ b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxReceiver.java
@@ -21,6 +21,7 @@
import com.box.sdk.BoxAPIConnection;
import com.box.sdk.BoxFile;
import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.credential.stubs.box.BoxSecret;
import org.apache.airavata.mft.credential.stubs.box.BoxSecretGetRequest;
@@ -65,15 +66,23 @@
@Override
public void startStream(ConnectorContext context) throws Exception {
- logger.info("Starting Box Receiver stream for transfer {}", context.getTransferId());
+ if (ResourceTypes.FILE.equals(this.boxResource.getResourceCase().name())) {
+ logger.info("Starting Box Receiver stream for transfer {}", context.getTransferId());
- BoxFile file = new BoxFile(this.boxClient, this.boxResource.getBoxFileId());
+ BoxFile file = new BoxFile(this.boxClient, this.boxResource.getFile().getResourcePath());
- OutputStream os = context.getStreamBuffer().getOutputStream();
- file.download(os);
- os.flush();
- os.close();
+ OutputStream os = context.getStreamBuffer().getOutputStream();
+ file.download(os);
+ os.flush();
+ os.close();
- logger.info("Completed Box Receiver stream for transfer {}", context.getTransferId());
+ logger.info("Completed Box Receiver stream for transfer {}", context.getTransferId());
+
+ } else {
+ logger.error("Resource {} should be a FILE type. Found a {}",
+ this.boxResource.getResourceId(), this.boxResource.getResourceCase().name());
+ throw new Exception("Resource " + this.boxResource.getResourceId() + " should be a FILE type. Found a " +
+ this.boxResource.getResourceCase().name());
+ }
}
}
diff --git a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxSender.java b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxSender.java
index 528bfc1..c9b279f 100644
--- a/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxSender.java
+++ b/transport/box-transport/src/main/java/org/apache/airavata/mft/transport/box/BoxSender.java
@@ -21,6 +21,7 @@
import com.box.sdk.BoxAPIConnection;
import com.box.sdk.BoxFile;
import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.credential.stubs.box.BoxSecret;
import org.apache.airavata.mft.credential.stubs.box.BoxSecretGetRequest;
@@ -62,18 +63,26 @@
public void startStream(ConnectorContext context) throws Exception {
logger.info("Starting Box Sender stream for transfer {}", context.getTransferId());
- logger.info("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
+ logger.debug("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
- BoxFile file = new BoxFile(this.boxClient, this.boxResource.getBoxFileId());
+ if (ResourceTypes.FILE.equals(this.boxResource.getResourceCase().name())) {
+ BoxFile file = new BoxFile(this.boxClient, this.boxResource.getFile().getResourcePath());
- // Upload chunks only if the file size is > 20mb
- // Ref: https://developer.box.com/guides/uploads/chunked/
- if (context.getMetadata().getResourceSize() > 20971520) {
- file.uploadLargeFile(context.getStreamBuffer().getInputStream(), context.getMetadata().getResourceSize());
+ // Upload chunks only if the file size is > 20mb
+ // Ref: https://developer.box.com/guides/uploads/chunked/
+ if (context.getMetadata().getResourceSize() > 20971520) {
+ file.uploadLargeFile(context.getStreamBuffer().getInputStream(), context.getMetadata().getResourceSize());
+ } else {
+ file.uploadNewVersion(context.getStreamBuffer().getInputStream());
+ }
+
+ logger.info("Completed Box Sender stream for transfer {}", context.getTransferId());
+
} else {
- file.uploadNewVersion(context.getStreamBuffer().getInputStream());
+ logger.error("Resource {} should be a FILE type. Found a {}",
+ this.boxResource.getResourceId(), this.boxResource.getResourceCase().name());
+ throw new Exception("Resource " + this.boxResource.getResourceId() + " should be a FILE type. Found a " +
+ this.boxResource.getResourceCase().name());
}
-
- logger.info("Completed Box Sender stream for transfer {}", context.getTransferId());
}
}
diff --git a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java
index 5618337..1310c53 100644
--- a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java
+++ b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxMetadataCollector.java
@@ -21,6 +21,7 @@
import com.dropbox.core.v2.DbxClientV2;
import com.dropbox.core.v2.files.FileMetadata;
import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.apache.airavata.mft.credential.stubs.dropbox.DropboxSecret;
import org.apache.airavata.mft.credential.stubs.dropbox.DropboxSecretGetRequest;
@@ -68,7 +69,7 @@
DbxClientV2 dbxClientV2 = new DbxClientV2(config, dropboxSecret.getAccessToken());
ResourceMetadata metadata = new ResourceMetadata();
- FileMetadata fileMetadata = (FileMetadata) dbxClientV2.files().getMetadata(dropboxResource.getResourcePath());
+ FileMetadata fileMetadata = (FileMetadata) dbxClientV2.files().getMetadata(dropboxResource.getFile().getResourcePath());
metadata.setResourceSize(fileMetadata.getSize());
metadata.setMd5sum(null);
metadata.setUpdateTime(fileMetadata.getServerModified().getTime());
@@ -88,6 +89,12 @@
DbxRequestConfig config = DbxRequestConfig.newBuilder("mftdropbox/v1").build();
DbxClientV2 dbxClientV2 = new DbxClientV2(config, dropboxSecret.getAccessToken());
- return !dbxClientV2.files().searchV2(dropboxResource.getResourcePath()).getMatches().isEmpty();
+ switch (dropboxResource.getResourceCase().name()){
+ case ResourceTypes.FILE:
+ return !dbxClientV2.files().searchV2(dropboxResource.getFile().getResourcePath()).getMatches().isEmpty();
+ case ResourceTypes.DIRECTORY:
+ return !dbxClientV2.files().searchV2(dropboxResource.getDirectory().getResourcePath()).getMatches().isEmpty();
+ }
+ return false;
}
}
diff --git a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxReceiver.java b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxReceiver.java
index d047393..5bdf5a9 100644
--- a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxReceiver.java
+++ b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxReceiver.java
@@ -20,6 +20,7 @@
import com.dropbox.core.DbxRequestConfig;
import com.dropbox.core.v2.DbxClientV2;
import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.credential.stubs.dropbox.DropboxSecret;
import org.apache.airavata.mft.credential.stubs.dropbox.DropboxSecretGetRequest;
@@ -61,38 +62,47 @@
@Override
public void startStream(ConnectorContext context) throws Exception {
- logger.info("Starting Dropbox Receiver stream for transfer {}", context.getTransferId());
- InputStream inputStream = dbxClientV2.files().download(this.dropboxResource.getResourcePath()).getInputStream();
- OutputStream os = context.getStreamBuffer().getOutputStream();
- int read;
- long bytes = 0;
- long fileSize = context.getMetadata().getResourceSize();
- byte[] buf = new byte[1024];
- while (true) {
- int bufSize = 0;
+ if (ResourceTypes.FILE.equals(this.dropboxResource.getResourceCase().name())) {
+ logger.info("Starting Dropbox Receiver stream for transfer {}", context.getTransferId());
- if (buf.length < fileSize) {
- bufSize = buf.length;
- } else {
- bufSize = (int) fileSize;
- }
- bufSize = inputStream.read(buf, 0, bufSize);
+ InputStream inputStream = dbxClientV2.files().download(this.dropboxResource.getFile().getResourcePath()).getInputStream();
+ OutputStream os = context.getStreamBuffer().getOutputStream();
+ int read;
+ long bytes = 0;
+ long fileSize = context.getMetadata().getResourceSize();
+ byte[] buf = new byte[1024];
+ while (true) {
+ int bufSize = 0;
- if (bufSize < 0) {
- break;
+ if (buf.length < fileSize) {
+ bufSize = buf.length;
+ } else {
+ bufSize = (int) fileSize;
+ }
+ bufSize = inputStream.read(buf, 0, bufSize);
+
+ if (bufSize < 0) {
+ break;
+ }
+
+ os.write(buf, 0, bufSize);
+ os.flush();
+
+ fileSize -= bufSize;
+ if (fileSize == 0L)
+ break;
}
- os.write(buf, 0, bufSize);
- os.flush();
+ os.close();
- fileSize -= bufSize;
- if (fileSize == 0L)
- break;
+ logger.info("Completed Dropbox Receiver stream for transfer {}", context.getTransferId());
+
+ } else {
+ logger.error("Resource {} should be a FILE type. Found a {}",
+ this.dropboxResource.getResourceId(), this.dropboxResource.getResourceCase().name());
+ throw new Exception("Resource " + this.dropboxResource.getResourceId() + " should be a FILE type. Found a " +
+ this.dropboxResource.getResourceCase().name());
}
-
- os.close();
-
- logger.info("Completed Dropbox Receiver stream for transfer {}", context.getTransferId());
}
}
diff --git a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxSender.java b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxSender.java
index b0d16a7..ffab965 100644
--- a/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxSender.java
+++ b/transport/dropbox-transport/src/main/java/org/apache/airavata/mft/transport/dropbox/DropboxSender.java
@@ -22,6 +22,7 @@
import com.dropbox.core.v2.files.FileMetadata;
import com.dropbox.core.v2.files.WriteMode;
import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.credential.stubs.dropbox.DropboxSecret;
import org.apache.airavata.mft.credential.stubs.dropbox.DropboxSecretGetRequest;
@@ -64,10 +65,20 @@
logger.info("Starting Dropbox Sender stream for transfer {}", context.getTransferId());
logger.info("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
- FileMetadata metadata = dbxClientV2.files().uploadBuilder(dropboxResource.getResourcePath())
- .withMode(WriteMode.OVERWRITE)
- .uploadAndFinish(context.getStreamBuffer().getInputStream());
- logger.info("Completed Dropbox Sender stream for transfer {}", context.getTransferId());
+
+ if (ResourceTypes.FILE.equals(this.dropboxResource.getResourceCase().name())) {
+ FileMetadata metadata = dbxClientV2.files().uploadBuilder(dropboxResource.getFile().getResourcePath())
+ .withMode(WriteMode.OVERWRITE)
+ .uploadAndFinish(context.getStreamBuffer().getInputStream());
+ logger.info("Completed Dropbox Sender stream for transfer {}", context.getTransferId());
+
+ } else {
+ logger.error("Resource {} should be a FILE type. Found a {}",
+ this.dropboxResource.getResourceId(), this.dropboxResource.getResourceCase().name());
+ throw new Exception("Resource " + this.dropboxResource.getResourceId() + " should be a FILE type. Found a " +
+ this.dropboxResource.getResourceCase().name());
+ }
+
}
}
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
index 1f3ba45..9b71dc8 100644
--- a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPMetadataCollector.java
@@ -18,6 +18,7 @@
package org.apache.airavata.mft.transport.ftp;
import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.apache.airavata.mft.credential.stubs.ftp.FTPSecret;
import org.apache.airavata.mft.credential.stubs.ftp.FTPSecretGetRequest;
@@ -73,14 +74,14 @@
FTPClient ftpClient = null;
try {
ftpClient = FTPTransportUtil.getFTPClient(ftpResource, ftpSecret);
- logger.info("Fetching metadata for resource {} in {}", ftpResource.getResourcePath(), ftpResource.getFtpStorage().getHost());
+ logger.info("Fetching metadata for resource {} in {}", ftpResource.getFile().getResourcePath(), ftpResource.getFtpStorage().getHost());
- FTPFile ftpFile = ftpClient.mlistFile(ftpResource.getResourcePath());
+ FTPFile ftpFile = ftpClient.mlistFile(ftpResource.getFile().getResourcePath());
if (ftpFile != null) {
resourceMetadata.setResourceSize(ftpFile.getSize());
resourceMetadata.setUpdateTime(ftpFile.getTimestamp().getTimeInMillis());
- if (ftpClient.hasFeature("MD5") && FTPReply.isPositiveCompletion(ftpClient.sendCommand("MD5 " + ftpResource.getResourcePath()))) {
+ if (ftpClient.hasFeature("MD5") && FTPReply.isPositiveCompletion(ftpClient.sendCommand("MD5 " + ftpResource.getFile().getResourcePath()))) {
String[] replies = ftpClient.getReplyStrings();
resourceMetadata.setMd5sum(replies[0]);
} else {
@@ -109,8 +110,14 @@
FTPClient ftpClient = null;
try {
ftpClient = FTPTransportUtil.getFTPClient(ftpResource, ftpSecret);
- InputStream inputStream = ftpClient.retrieveFileStream(ftpResource.getResourcePath());
+ InputStream inputStream = null;
+ switch (ftpResource.getResourceCase().name()){
+ case ResourceTypes.FILE:
+ inputStream = ftpClient.retrieveFileStream(ftpResource.getFile().getResourcePath());
+ case ResourceTypes.DIRECTORY:
+ inputStream = ftpClient.retrieveFileStream(ftpResource.getDirectory().getResourcePath());
+ }
return !(inputStream == null || ftpClient.getReplyCode() == 550);
} catch (Exception e) {
logger.error("FTP client initialization failed ", e);
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java
index 6ced168..9225b0e 100644
--- a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPReceiver.java
@@ -18,6 +18,7 @@
package org.apache.airavata.mft.transport.ftp;
import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.credential.stubs.ftp.FTPSecret;
import org.apache.airavata.mft.credential.stubs.ftp.FTPSecretGetRequest;
@@ -62,40 +63,49 @@
@Override
public void startStream(ConnectorContext context) throws Exception {
- logger.info("Starting FTP receiver stream for transfer {}", context.getTransferId());
- checkInitialized();
- OutputStream streamOs = context.getStreamBuffer().getOutputStream();
- InputStream inputStream = ftpClient.retrieveFileStream(resource.getResourcePath());
+ if (ResourceTypes.FILE.equals(this.resource.getResourceCase().name())) {
+ logger.info("Starting FTP receiver stream for transfer {}", context.getTransferId());
- long fileSize = context.getMetadata().getResourceSize();
+ checkInitialized();
+ OutputStream streamOs = context.getStreamBuffer().getOutputStream();
+ InputStream inputStream = ftpClient.retrieveFileStream(resource.getFile().getResourcePath());
- byte[] buf = new byte[1024];
- while (true) {
- int bufSize;
+ long fileSize = context.getMetadata().getResourceSize();
- if (buf.length < fileSize) {
- bufSize = buf.length;
- } else {
- bufSize = (int) fileSize;
- }
- bufSize = inputStream.read(buf, 0, bufSize);
+ byte[] buf = new byte[1024];
+ while (true) {
+ int bufSize;
- if (bufSize < 0) {
- break;
+ if (buf.length < fileSize) {
+ bufSize = buf.length;
+ } else {
+ bufSize = (int) fileSize;
+ }
+ bufSize = inputStream.read(buf, 0, bufSize);
+
+ if (bufSize < 0) {
+ break;
+ }
+
+ streamOs.write(buf, 0, bufSize);
+ streamOs.flush();
+
+ fileSize -= bufSize;
+ if (fileSize == 0L)
+ break;
}
- streamOs.write(buf, 0, bufSize);
- streamOs.flush();
+ inputStream.close();
+ streamOs.close();
+ logger.info("Completed FTP receiver stream for transfer {}", context.getTransferId());
- fileSize -= bufSize;
- if (fileSize == 0L)
- break;
+ } else {
+ logger.error("Resource {} should be a FILE type. Found a {}",
+ this.resource.getResourceId(), this.resource.getResourceCase().name());
+ throw new Exception("Resource " + this.resource.getResourceId() + " should be a FILE type. Found a " +
+ this.resource.getResourceCase().name());
}
-
- inputStream.close();
- streamOs.close();
- logger.info("Completed FTP receiver stream for transfer {}", context.getTransferId());
}
private void checkInitialized() {
diff --git a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java
index d9dbf06..f3a55f7 100644
--- a/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java
+++ b/transport/ftp-transport/src/main/java/org/apache/airavata/mft/transport/ftp/FTPSender.java
@@ -18,6 +18,7 @@
package org.apache.airavata.mft.transport.ftp;
import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.credential.stubs.ftp.FTPSecret;
import org.apache.airavata.mft.credential.stubs.ftp.FTPSecretGetRequest;
@@ -66,38 +67,45 @@
logger.info("Starting FTP sender stream for transfer {}", context.getTransferId());
checkInitialized();
- InputStream in = context.getStreamBuffer().getInputStream();
- long fileSize = context.getMetadata().getResourceSize();
- OutputStream outputStream = ftpClient.storeFileStream(resource.getResourcePath());
-
- byte[] buf = new byte[1024];
- while (true) {
- int bufSize;
-
- if (buf.length < fileSize) {
- bufSize = buf.length;
- } else {
- bufSize = (int) fileSize;
- }
- bufSize = in.read(buf, 0, bufSize);
-
- if (bufSize < 0) {
- break;
- }
-
- outputStream.write(buf, 0, bufSize);
- outputStream.flush();
-
- fileSize -= bufSize;
- if (fileSize == 0L)
- break;
- }
-
- in.close();
- outputStream.close();
logger.info("Completed FTP sender stream for transfer {}", context.getTransferId());
+ if (ResourceTypes.FILE.equals(this.resource.getResourceCase().name())) {
+ InputStream in = context.getStreamBuffer().getInputStream();
+ long fileSize = context.getMetadata().getResourceSize();
+ OutputStream outputStream = ftpClient.storeFileStream(resource.getFile().getResourcePath());
+
+ byte[] buf = new byte[1024];
+ while (true) {
+ int bufSize;
+
+ if (buf.length < fileSize) {
+ bufSize = buf.length;
+ } else {
+ bufSize = (int) fileSize;
+ }
+ bufSize = in.read(buf, 0, bufSize);
+
+ if (bufSize < 0) {
+ break;
+ }
+
+ outputStream.write(buf, 0, bufSize);
+ outputStream.flush();
+
+ fileSize -= bufSize;
+ if (fileSize == 0L)
+ break;
+ }
+
+ in.close();
+ outputStream.close();
+ } else {
+ logger.error("Resource {} should be a FILE type. Found a {}",
+ this.resource.getResourceId(), this.resource.getResourceCase().name());
+ throw new Exception("Resource " + this.resource.getResourceId() + " should be a FILE type. Found a " +
+ this.resource.getResourceCase().name());
+ }
}
private void checkInitialized() {
diff --git a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java
index d8d0a6b..0f8e634 100644
--- a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java
+++ b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSMetadataCollector.java
@@ -26,6 +26,7 @@
import com.google.api.services.storage.StorageScopes;
import com.google.api.services.storage.model.StorageObject;
import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.apache.airavata.mft.credential.stubs.gcs.GCSSecret;
import org.apache.airavata.mft.credential.stubs.gcs.GCSSecretGetRequest;
@@ -86,7 +87,7 @@
Storage storage = new Storage.Builder(transport, jsonFactory, credential).build();
ResourceMetadata metadata = new ResourceMetadata();
- StorageObject gcsMetadata = storage.objects().get(gcsResource.getBucketName(), gcsResource.getResourcePath()).execute();
+ StorageObject gcsMetadata = storage.objects().get(gcsResource.getBucketName(), gcsResource.getFile().getResourcePath()).execute();
metadata.setResourceSize(gcsMetadata.getSize().longValue());
String md5Sum = String.format("%032x", new BigInteger(1, Base64.getDecoder().decode(gcsMetadata.getMd5Hash())));
metadata.setMd5sum(md5Sum);
@@ -114,6 +115,12 @@
}
Storage storage = new Storage.Builder(transport, jsonFactory, credential).build();
- return !storage.objects().get(gcsResource.getBucketName(), gcsResource.getResourcePath()).execute().isEmpty();
+ switch (gcsResource.getResourceCase().name()){
+ case ResourceTypes.FILE:
+ return !storage.objects().get(gcsResource.getBucketName(), gcsResource.getFile().getResourcePath()).execute().isEmpty();
+ case ResourceTypes.DIRECTORY:
+ return !storage.objects().get(gcsResource.getBucketName(), gcsResource.getDirectory().getResourcePath()).execute().isEmpty();
+ }
+ return false;
}
}
diff --git a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSReceiver.java b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSReceiver.java
index 4261e40..5a1bc63 100644
--- a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSReceiver.java
+++ b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSReceiver.java
@@ -25,6 +25,7 @@
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.StorageScopes;
import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.credential.stubs.gcs.GCSSecret;
import org.apache.airavata.mft.credential.stubs.gcs.GCSSecretGetRequest;
@@ -78,36 +79,45 @@
public void startStream(ConnectorContext context) throws Exception {
logger.info("Starting GCS Receiver stream for transfer {}", context.getTransferId());
- InputStream inputStream = storage.objects().get(this.gcsResource.getBucketName(), this.gcsResource.getResourcePath()).executeMediaAsInputStream();
- OutputStream os = context.getStreamBuffer().getOutputStream();
- int read;
- long bytes = 0;
- long fileSize = context.getMetadata().getResourceSize();
- byte[] buf = new byte[1024];
- while (true) {
- int bufSize = 0;
+ if (ResourceTypes.FILE.equals(this.gcsResource.getResourceCase().name())) {
+ InputStream inputStream = storage.objects().get(this.gcsResource.getBucketName(),
+ this.gcsResource.getFile().getResourcePath()).executeMediaAsInputStream();
+ OutputStream os = context.getStreamBuffer().getOutputStream();
+ int read;
+ long bytes = 0;
+ long fileSize = context.getMetadata().getResourceSize();
+ byte[] buf = new byte[1024];
+ while (true) {
+ int bufSize = 0;
- if (buf.length < fileSize) {
- bufSize = buf.length;
- } else {
- bufSize = (int) fileSize;
- }
- bufSize = inputStream.read(buf, 0, bufSize);
+ if (buf.length < fileSize) {
+ bufSize = buf.length;
+ } else {
+ bufSize = (int) fileSize;
+ }
+ bufSize = inputStream.read(buf, 0, bufSize);
- if (bufSize < 0) {
- break;
+ if (bufSize < 0) {
+ break;
+ }
+
+ os.write(buf, 0, bufSize);
+ os.flush();
+
+ fileSize -= bufSize;
+ if (fileSize == 0L)
+ break;
}
- os.write(buf, 0, bufSize);
- os.flush();
+ os.close();
- fileSize -= bufSize;
- if (fileSize == 0L)
- break;
+ logger.info("Completed GCS Receiver stream for transfer {}", context.getTransferId());
+
+ } else {
+ logger.error("Resource {} should be a FILE type. Found a {}",
+ this.gcsResource.getResourceId(), this.gcsResource.getResourceCase().name());
+ throw new Exception("Resource " + this.gcsResource.getResourceId() + " should be a FILE type. Found a " +
+ this.gcsResource.getResourceCase().name());
}
-
- os.close();
-
- logger.info("Completed GCS Receiver stream for transfer {}", context.getTransferId());
}
}
diff --git a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSSender.java b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSSender.java
index 79768de..7326545 100644
--- a/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSSender.java
+++ b/transport/gcp-transport/src/main/java/org/apache/airavata/mft/transport/gcp/GCSSender.java
@@ -31,6 +31,7 @@
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.credential.stubs.gcs.GCSSecret;
import org.apache.airavata.mft.credential.stubs.gcs.GCSSecretGetRequest;
@@ -87,22 +88,31 @@
@Override
public void startStream(ConnectorContext context) throws Exception {
logger.info("Starting GCS Sender stream for transfer {}", context.getTransferId());
- logger.info("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
+ logger.debug("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
- InputStreamContent contentStream = new InputStreamContent(
- null, context.getStreamBuffer().getInputStream());
- String entityUser = jsonObject.get("client_email").getAsString();
- StorageObject objectMetadata = new StorageObject()
- // Set the destination object name
- .setName(this.gcsResource.getResourcePath())
- // Set the access control list to publicly read-only
- .setAcl(Arrays.asList(new ObjectAccessControl().setEntity("user-" + entityUser).setRole("OWNER")));
+ if (ResourceTypes.FILE.equals(this.gcsResource.getResourceCase().name())) {
- Insert insertRequest = storage.objects().insert(
- this.gcsResource.getBucketName(), objectMetadata, contentStream);
+ InputStreamContent contentStream = new InputStreamContent(
+ null, context.getStreamBuffer().getInputStream());
+ String entityUser = jsonObject.get("client_email").getAsString();
+ StorageObject objectMetadata = new StorageObject()
+ // Set the destination object name
+ .setName(this.gcsResource.getFile().getResourcePath())
+ // Set the access control list to publicly read-only
+ .setAcl(Arrays.asList(new ObjectAccessControl().setEntity("user-" + entityUser).setRole("OWNER")));
- insertRequest.execute();
+ Insert insertRequest = storage.objects().insert(this.gcsResource.getBucketName(), objectMetadata, contentStream);
- logger.info("Completed GCS Sender stream for transfer {}", context.getTransferId());
+ insertRequest.execute();
+
+ logger.info("Completed GCS Sender stream for transfer {}", context.getTransferId());
+ } else {
+ logger.error("Resource {} should be a FILE type. Found a {}",
+ this.gcsResource.getResourceId(), this.gcsResource.getResourceCase().name());
+ throw new Exception("Resource " + this.gcsResource.getResourceId() + " should be a FILE type. Found a " +
+ this.gcsResource.getResourceCase().name());
+ }
+
+
}
}
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
index e28c42c..ba21a2c 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
@@ -18,6 +18,7 @@
package org.apache.airavata.mft.transport.local;
import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.apache.airavata.mft.resource.client.ResourceServiceClient;
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
@@ -59,17 +60,17 @@
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
LocalResource localResource = resourceClient.local().getLocalResource(LocalResourceGetRequest.newBuilder().setResourceId(resourceId).build());
- File resourceFile = new File(localResource.getResourcePath());
+ File resourceFile = new File(localResource.getFile().getResourcePath());
if (resourceFile.exists()) {
- BasicFileAttributes basicFileAttributes = Files.readAttributes(Path.of(localResource.getResourcePath()), BasicFileAttributes.class);
+ BasicFileAttributes basicFileAttributes = Files.readAttributes(Path.of(localResource.getFile().getResourcePath()), BasicFileAttributes.class);
ResourceMetadata metadata = new ResourceMetadata();
metadata.setCreatedTime(basicFileAttributes.creationTime().toMillis());
metadata.setUpdateTime(basicFileAttributes.lastModifiedTime().toMillis());
metadata.setResourceSize(basicFileAttributes.size());
MessageDigest digest = MessageDigest.getInstance("MD5");
- FileInputStream fis = new FileInputStream(localResource.getResourcePath());
+ FileInputStream fis = new FileInputStream(localResource.getFile().getResourcePath());
byte[] byteArray = new byte[1024];
int bytesCount = 0;
while ((bytesCount = fis.read(byteArray)) != -1) {
@@ -85,7 +86,7 @@
return metadata;
} else {
- throw new Exception("Resource with id " + resourceId + " in path " + localResource.getResourcePath() + " does not exist");
+ throw new Exception("Resource with id " + resourceId + " in path " + localResource.getFile().getResourcePath() + " does not exist");
}
}
@@ -93,7 +94,13 @@
public Boolean isAvailable(String resourceId, String credentialToken) throws Exception {
ResourceServiceClient resourceClient = ResourceServiceClientBuilder.buildClient(resourceServiceHost, resourceServicePort);
LocalResource localResource = resourceClient.local().getLocalResource(LocalResourceGetRequest.newBuilder().setResourceId(resourceId).build());
- File resourceFile = new File(localResource.getResourcePath());
- return resourceFile.exists();
+
+ switch (localResource.getResourceCase().name()){
+ case ResourceTypes.FILE:
+ return new File(localResource.getFile().getResourcePath()).exists();
+ case ResourceTypes.DIRECTORY:
+ return new File(localResource.getDirectory().getResourcePath()).exists();
+ }
+ return false;
}
}
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java
index 06d34fc..1b61fb1 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java
@@ -18,6 +18,7 @@
package org.apache.airavata.mft.transport.local;
import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.resource.client.ResourceServiceClient;
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
@@ -60,36 +61,46 @@
logger.info("Starting local receiver stream for transfer {}", context.getTransferId());
checkInitialized();
- OutputStream streamOs = context.getStreamBuffer().getOutputStream();
- FileInputStream fis = new FileInputStream(new File(resource.getResourcePath()));
- long fileSize = context.getMetadata().getResourceSize();
- byte[] buf = new byte[1024];
- while (true) {
- int bufSize = 0;
+ if (ResourceTypes.FILE.equals(this.resource.getResourceCase().name())) {
+ OutputStream streamOs = context.getStreamBuffer().getOutputStream();
+ FileInputStream fis = new FileInputStream(new File(resource.getFile().getResourcePath()));
- if (buf.length < fileSize) {
- bufSize = buf.length;
- } else {
- bufSize = (int) fileSize;
- }
- bufSize = fis.read(buf, 0, bufSize);
+ long fileSize = context.getMetadata().getResourceSize();
- if (bufSize < 0) {
- break;
+ byte[] buf = new byte[1024];
+ while (true) {
+ int bufSize = 0;
+
+ if (buf.length < fileSize) {
+ bufSize = buf.length;
+ } else {
+ bufSize = (int) fileSize;
+ }
+ bufSize = fis.read(buf, 0, bufSize);
+
+ if (bufSize < 0) {
+ break;
+ }
+
+ streamOs.write(buf, 0, bufSize);
+ streamOs.flush();
+
+ fileSize -= bufSize;
+ if (fileSize == 0L)
+ break;
}
- streamOs.write(buf, 0, bufSize);
- streamOs.flush();
+ fis.close();
+ streamOs.close();
+ logger.info("Completed local receiver stream for transfer {}", context.getTransferId());
- fileSize -= bufSize;
- if (fileSize == 0L)
- break;
+ } else {
+ logger.error("Resource {} should be a FILE type. Found a {}",
+ this.resource.getResourceId(), this.resource.getResourceCase().name());
+ throw new Exception("Resource " + this.resource.getResourceId() + " should be a FILE type. Found a " +
+ this.resource.getResourceCase().name());
}
-
- fis.close();
- streamOs.close();
- logger.info("Completed local receiver stream for transfer {}", context.getTransferId());
}
}
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java
index 17baf35..315fa25 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java
@@ -18,6 +18,7 @@
package org.apache.airavata.mft.transport.local;
import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.resource.client.ResourceServiceClient;
import org.apache.airavata.mft.resource.client.ResourceServiceClientBuilder;
@@ -61,37 +62,44 @@
logger.info("Starting local sender stream for transfer {}", context.getTransferId());
checkInitialized();
- InputStream in = context.getStreamBuffer().getInputStream();
- long fileSize = context.getMetadata().getResourceSize();
- OutputStream fos = new FileOutputStream(resource.getResourcePath());
- byte[] buf = new byte[1024];
- while (true) {
- int bufSize = 0;
+ if (ResourceTypes.FILE.equals(this.resource.getResourceCase().name())) {
+ InputStream in = context.getStreamBuffer().getInputStream();
+ long fileSize = context.getMetadata().getResourceSize();
+ OutputStream fos = new FileOutputStream(resource.getFile().getResourcePath());
- if (buf.length < fileSize) {
- bufSize = buf.length;
- } else {
- bufSize = (int) fileSize;
- }
- bufSize = in.read(buf, 0, bufSize);
+ byte[] buf = new byte[1024];
+ while (true) {
+ int bufSize = 0;
- if (bufSize < 0) {
- break;
+ if (buf.length < fileSize) {
+ bufSize = buf.length;
+ } else {
+ bufSize = (int) fileSize;
+ }
+ bufSize = in.read(buf, 0, bufSize);
+
+ if (bufSize < 0) {
+ break;
+ }
+
+ fos.write(buf, 0, bufSize);
+ fos.flush();
+
+ fileSize -= bufSize;
+ if (fileSize == 0L)
+ break;
}
- fos.write(buf, 0, bufSize);
- fos.flush();
+ in.close();
+ fos.close();
- fileSize -= bufSize;
- if (fileSize == 0L)
- break;
+ logger.info("Completed local sender stream for transfer {}", context.getTransferId());
+ } else {
+ logger.error("Resource {} should be a FILE type. Found a {}",
+ this.resource.getResourceId(), this.resource.getResourceCase().name());
+ throw new Exception("Resource " + this.resource.getResourceId() + " should be a FILE type. Found a " +
+ this.resource.getResourceCase().name());
}
-
- in.close();
- fos.close();
-
- logger.info("Completed local sender stream for transfer {}", context.getTransferId());
-
}
}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
index 6203306..56d98f4 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
@@ -23,6 +23,7 @@
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
@@ -74,7 +75,7 @@
.build();
ResourceMetadata metadata = new ResourceMetadata();
- ObjectMetadata s3Metadata = s3Client.getObjectMetadata(s3Resource.getBucketName(), s3Resource.getResourcePath());
+ ObjectMetadata s3Metadata = s3Client.getObjectMetadata(s3Resource.getBucketName(), s3Resource.getFile().getResourcePath());
metadata.setResourceSize(s3Metadata.getContentLength());
metadata.setMd5sum(s3Metadata.getETag());
metadata.setUpdateTime(s3Metadata.getLastModified().getTime());
@@ -99,6 +100,12 @@
.withRegion(s3Resource.getRegion())
.build();
- return s3Client.doesObjectExist(s3Resource.getBucketName(), s3Resource.getResourcePath());
+ switch (s3Resource.getResourceCase().name()){
+ case ResourceTypes.FILE:
+ return s3Client.doesObjectExist(s3Resource.getBucketName(), s3Resource.getFile().getResourcePath());
+ case ResourceTypes.DIRECTORY:
+ return s3Client.doesObjectExist(s3Resource.getBucketName(), s3Resource.getDirectory().getResourcePath());
+ }
+ return false;
}
}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
index 0ac7191..7e8d42b 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Receiver.java
@@ -24,6 +24,7 @@
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
@@ -70,21 +71,29 @@
@Override
public void startStream(ConnectorContext context) throws Exception {
- logger.info("Starting S3 Receiver stream for transfer {}", context.getTransferId());
+ if (ResourceTypes.FILE.equals(this.s3Resource.getResourceCase().name())) {
+ logger.info("Starting S3 Receiver stream for transfer {}", context.getTransferId());
- S3Object s3object = s3Client.getObject(s3Resource.getBucketName(), s3Resource.getResourcePath());
- S3ObjectInputStream inputStream = s3object.getObjectContent();
+ S3Object s3object = s3Client.getObject(s3Resource.getBucketName(), s3Resource.getFile().getResourcePath());
+ S3ObjectInputStream inputStream = s3object.getObjectContent();
- OutputStream os = context.getStreamBuffer().getOutputStream();
- int read;
- long bytes = 0;
- while ((read = inputStream.read()) != -1) {
- bytes++;
- os.write(read);
+ OutputStream os = context.getStreamBuffer().getOutputStream();
+ int read;
+ long bytes = 0;
+ while ((read = inputStream.read()) != -1) {
+ bytes++;
+ os.write(read);
+ }
+ os.flush();
+ os.close();
+
+ logger.info("Completed S3 Receiver stream for transfer {}", context.getTransferId());
+
+ } else {
+ logger.error("Resource {} should be a FILE type. Found a {}",
+ this.s3Resource.getResourceId(), this.s3Resource.getResourceCase().name());
+ throw new Exception("Resource " + this.s3Resource.getResourceId() + " should be a FILE type. Found a " +
+ this.s3Resource.getResourceCase().name());
}
- os.flush();
- os.close();
-
- logger.info("Completed S3 Receiver stream for transfer {}", context.getTransferId());
}
}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
index 70b32e4..f0faeb2 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Sender.java
@@ -23,6 +23,7 @@
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.airavata.mft.core.ConnectorContext;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
import org.apache.airavata.mft.credential.stubs.s3.S3SecretGetRequest;
@@ -70,8 +71,17 @@
logger.info("Content length for transfer {} {}", context.getTransferId(), context.getMetadata().getResourceSize());
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(context.getMetadata().getResourceSize());
- s3Client.putObject(this.s3Resource.getBucketName(), this.s3Resource.getResourcePath(), context.getStreamBuffer().getInputStream(), metadata);
- logger.info("Completed S3 Sender stream for transfer {}", context.getTransferId());
+ if (ResourceTypes.FILE.equals(this.s3Resource.getResourceCase().name())) {
+ s3Client.putObject(this.s3Resource.getBucketName(), this.s3Resource.getFile().getResourcePath(),
+ context.getStreamBuffer().getInputStream(), metadata);
+ logger.info("Completed S3 Sender stream for transfer {}", context.getTransferId());
+ } else {
+ logger.error("Resource {} should be a FILE type. Found a {}",
+ this.s3Resource.getResourceId(), this.s3Resource.getResourceCase().name());
+ throw new Exception("Resource " + this.s3Resource.getResourceId() + " should be a FILE type. Found a " +
+ this.s3Resource.getResourceCase().name());
+ }
+
}
}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
index e167ed6..2c1ac22 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPMetadataCollector.java
@@ -28,6 +28,7 @@
import net.schmizz.sshj.userauth.method.ChallengeResponseProvider;
import net.schmizz.sshj.userauth.password.Resource;
import org.apache.airavata.mft.core.ResourceMetadata;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
@@ -82,10 +83,10 @@
try (SSHClient sshClient = getSSHClient(scpResource, scpSecret)) {
- logger.info("Fetching metadata for resource {} in {}", scpResource.getResourcePath(), scpResource.getScpStorage().getHost());
+ logger.info("Fetching metadata for resource {} in {}", scpResource.getFile().getResourcePath(), scpResource.getScpStorage().getHost());
try (SFTPClient sftpClient = sshClient.newSFTPClient()) {
- FileAttributes lstat = sftpClient.lstat(scpResource.getResourcePath());
+ FileAttributes lstat = sftpClient.lstat(scpResource.getFile().getResourcePath());
sftpClient.close();
ResourceMetadata metadata = new ResourceMetadata();
@@ -96,7 +97,7 @@
try {
// TODO calculate md5 using the binary based on the OS platform. Eg: MacOS has md5. Linux has md5sum
// This only works for linux SCP resources. Improve to work in mac and windows resources
- Session.Command md5Command = sshClient.startSession().exec("md5sum " + scpResource.getResourcePath());
+ Session.Command md5Command = sshClient.startSession().exec("md5sum " + scpResource.getFile().getResourcePath());
StringWriter outWriter = new StringWriter();
StringWriter errorWriter = new StringWriter();
@@ -129,9 +130,15 @@
SCPSecret scpSecret = secretClient.scp().getSCPSecret(SCPSecretGetRequest.newBuilder().setSecretId(credentialToken).build());
try (SSHClient sshClient = getSSHClient(scpResource, scpSecret)) {
- logger.info("Checking the availability of file {}", scpResource.getResourcePath());
+ logger.info("Checking the availability of file {}", scpResource.getFile().getResourcePath());
try (SFTPClient sftpClient = sshClient.newSFTPClient()) {
- return sftpClient.statExistence(scpResource.getResourcePath()) != null;
+ switch (scpResource.getResourceCase().name()){
+ case ResourceTypes.FILE:
+ return sftpClient.statExistence(scpResource.getFile().getResourcePath()) != null;
+ case ResourceTypes.DIRECTORY:
+ return sftpClient.statExistence(scpResource.getDirectory().getResourcePath()) != null;
+ }
+ return false;
}
}
}
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
index e867a99..127ea8c 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPReceiver.java
@@ -21,6 +21,7 @@
import com.jcraft.jsch.Session;
import org.apache.airavata.mft.core.ConnectorContext;
import org.apache.airavata.mft.core.DoubleStreamingBuffer;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
@@ -77,12 +78,20 @@
public void startStream(ConnectorContext context) throws Exception {
checkInitialized();
if (session == null) {
- System.out.println("Session can not be null. Make sure that SCP Receiver is properly initialized");
+ logger.error("Session can not be null. Make sure that SCP Receiver is properly initialized");
throw new Exception("Session can not be null. Make sure that SCP Receiver is properly initialized");
}
- transferRemoteToStream(session, this.scpResource.getResourcePath(), context.getStreamBuffer());
- logger.info("SCP Receive completed. Transfer {}", context.getTransferId());
+ if (ResourceTypes.FILE.equals(this.scpResource.getResourceCase().name())) {
+ transferRemoteToStream(session, this.scpResource.getFile().getResourcePath(), context.getStreamBuffer());
+ logger.info("SCP Receive completed. Transfer {}", context.getTransferId());
+
+ } else {
+ logger.error("Resource {} should be a FILE type. Found a {}",
+ this.scpResource.getResourceId(), this.scpResource.getResourceCase().name());
+ throw new Exception("Resource " + this.scpResource.getResourceId() + " should be a FILE type. Found a " +
+ this.scpResource.getResourceCase().name());
+ }
}
private void transferRemoteToStream(Session session, String from, DoubleStreamingBuffer streamBuffer) throws Exception {
diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
index 27fcf89..b552f0e 100644
--- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
+++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPSender.java
@@ -22,6 +22,7 @@
import com.jcraft.jsch.Session;
import org.apache.airavata.mft.core.ConnectorContext;
import org.apache.airavata.mft.core.DoubleStreamingBuffer;
+import org.apache.airavata.mft.core.ResourceTypes;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.credential.stubs.scp.SCPSecret;
import org.apache.airavata.mft.credential.stubs.scp.SCPSecretGetRequest;
@@ -92,8 +93,17 @@
throw new Exception("Session can not be null. Make sure that SCP Sender is properly initialized");
}
try {
- copyLocalToRemote(this.session, this.scpResource.getResourcePath(), context.getStreamBuffer(), context.getMetadata().getResourceSize());
- logger.info("SCP send to transfer {} completed", context.getTransferId());
+ if (ResourceTypes.FILE.equals(this.scpResource.getResourceCase().name())) {
+ copyLocalToRemote(this.session, this.scpResource.getFile().getResourcePath(), context.getStreamBuffer(), context.getMetadata().getResourceSize());
+ logger.info("SCP send to transfer {} completed", context.getTransferId());
+
+ } else {
+ logger.error("Resource {} should be a FILE type. Found a {}",
+ this.scpResource.getResourceId(), this.scpResource.getResourceCase().name());
+ throw new Exception("Resource " + this.scpResource.getResourceId() + " should be a FILE type. Found a " +
+ this.scpResource.getResourceCase().name());
+ }
+
} catch (Exception e) {
logger.error("Errored while streaming to remote scp server. Transfer {}", context.getTransferId() , e);
throw e;