MINIFICPP-1744 Add FetchGCSObject, DeleteGCSObject, ListGCSBucket
MINIFICPP-1744: Add FetchGCSObject
MINIFICPP-1745: Add DeleteGCSObject
MINIFICPP-1746: Add ListGCSBucket processor
Closes #1297
Signed-off-by: Marton Szasz <szaszm@apache.org>
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 88a0742..cbd1ac2 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -16,6 +16,7 @@
- [DefragmentText](#defragmenttext)
- [DeleteAzureBlobStorage](#deleteazureblobstorage)
- [DeleteAzureDataLakeStorage](#deleteazuredatalakestorage)
+- [DeleteGCSObject](#deletegcsobject)
- [DeleteS3Object](#deletes3object)
- [ExecuteProcess](#executeprocess)
- [ExecutePythonProcessor](#executepythonprocessor)
@@ -24,6 +25,7 @@
- [ExtractText](#extracttext)
- [FetchAzureBlobStorage](#fetchazureblobstorage)
- [FetchAzureDataLakeStorage](#fetchazuredatalakestorage)
+- [FetchGCSObject](#fetchgcsobject)
- [FetchFile](#fetchfile)
- [FetchOPCProcessor](#fetchopcprocessor)
- [FetchS3Object](#fetchs3object)
@@ -38,6 +40,7 @@
- [InvokeHTTP](#invokehttp)
- [ListAzureBlobStorage](#listazureblobstorage)
- [ListAzureDataLakeStorage](#listazuredatalakestorage)
+- [ListGCSBucket](#listgcsbucket)
- [ListenHTTP](#listenhttp)
- [ListenSyslog](#listensyslog)
- [ListFile](#listfile)
@@ -393,6 +396,42 @@
|failure|If file deletion from Azure Data Lake storage fails the flowfile is transferred to this relationship|
|success|If file deletion from Azure Data Lake storage succeeds the flowfile is transferred to this relationship|
+## DeleteGCSObject
+
+### Description
+
+Deletes an object from a Google Cloud Bucket.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+|--------------------------------------|---------------|-----------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Bucket** | ${gcs.bucket} | | Bucket of the object.<br>**Supports Expression Language: true** |
+| **Key** | ${filename} | | Name of the object.<br>**Supports Expression Language: true** |
+| **Number of retries** | 6 | integers | How many retry attempts should be made before routing to the failure relationship. |
+| **GCP Credentials Provider Service** | | [GCPCredentialsControllerService](CONTROLLERS.md#GCPCredentialsControllerService) | The Controller Service used to obtain Google Cloud Platform credentials. |
+| Server Side Encryption Key | | | An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.<br>**Supports Expression Language: true** |
+| Object Generation | | | The generation of the Object to download. If left empty, then it will download the latest generation.<br>**Supports Expression Language: true** |
+| Endpoint Override URL | | | Overrides the default Google Cloud Storage endpoints |
+
+
+### Relationships
+
+| Name | Description |
+|---------|----------------------------------------------------------------------------------------------|
+| success | FlowFiles are routed to this relationship after a successful Google Cloud Storage operation. |
+| failure | FlowFiles are routed to this relationship if the Google Cloud Storage operation fails. |
+
+
+### Output Attributes
+
+| Attribute | Relationship | Description |
+|----------------------|--------------|---------------------------------------------------------|
+| _gcs.status.message_ | failure | The status message received from google cloud. |
+| _gcs.error.reason_ | failure | The description of the error occurred during operation. |
+| _gcs.error.domain_ | failure | The domain of the error occurred during operation. |
## DeleteS3Object
@@ -603,6 +642,43 @@
|success|Files that have been successfully fetched from Azure storage are transferred to this relationship|
+## FetchGCSObject
+
+### Description
+
+Fetches a file from a Google Cloud Bucket. Designed to be used in tandem with ListGCSBucket.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+|--------------------------------------|---------------|-----------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Bucket** | ${gcs.bucket} | | Bucket of the object.<br>**Supports Expression Language: true** |
+| **Key** | ${filename} | | Name of the object.<br>**Supports Expression Language: true** |
+| **Number of retries** | 6 | integers | How many retry attempts should be made before routing to the failure relationship. |
+| **GCP Credentials Provider Service** | | [GCPCredentialsControllerService](CONTROLLERS.md#GCPCredentialsControllerService) | The Controller Service used to obtain Google Cloud Platform credentials. |
+| Server Side Encryption Key | | | An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.<br>**Supports Expression Language: true** |
+| Object Generation | | | The generation of the Object to download. If left empty, then it will download the latest generation.<br>**Supports Expression Language: true** |
+| Endpoint Override URL | | | Overrides the default Google Cloud Storage endpoints |
+
+
+### Relationships
+
+| Name | Description |
+|---------|----------------------------------------------------------------------------------------------|
+| success | FlowFiles are routed to this relationship after a successful Google Cloud Storage operation. |
+| failure | FlowFiles are routed to this relationship if the Google Cloud Storage operation fails. |
+
+
+### Output Attributes
+
+| Attribute | Relationship | Description |
+|----------------------|--------------|---------------------------------------------------------|
+| _gcs.status.message_ | failure | The status message received from google cloud. |
+| _gcs.error.reason_ | failure | The description of the error occurred during operation. |
+| _gcs.error.domain_ | failure | The domain of the error occurred during operation. |
+
## FetchFile
### Description
@@ -1003,6 +1079,59 @@
|success|All FlowFiles that are received are routed to success|
+## ListGCSBucket
+
+### Description
+
+Retrieves a listing of objects from an GCS bucket.
+For each object that is listed, creates a FlowFile that represents the object so that it can be fetched or deleted in conjunction with FetchGCSObject/DeleteGCSObject.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+|--------------------------------------|---------------|-----------------------------------------------------------------------------------|------------------------------------------------------------------------------------|
+| **Bucket** | | | Bucket of the object.<br>**Supports Expression Language: true** |
+| **Number of retries** | 6 | integers | How many retry attempts should be made before routing to the failure relationship. |
+| **GCP Credentials Provider Service** | | [GCPCredentialsControllerService](CONTROLLERS.md#GCPCredentialsControllerService) | The Controller Service used to obtain Google Cloud Platform credentials. |
+| Endpoint Override URL | | | Overrides the default Google Cloud Storage endpoints |
+| List all versions | false | false<br>true | Set this option to `true` to get all the previous versions separately. |
+
+
+### Relationships
+
+| Name | Description |
+|---------|-----------------------------------------------------------------------------------------------|
+| success | FlowFiles are routed to this relationship after a successful Google Cloud Storage operation. |
+
+### Output Attributes
+
+| Attribute | Relationship | Description |
+|----------------------------|--------------|--------------------------------------------------------------------|
+| _gcs.bucket_ | success | Bucket of the object. |
+| _gcs.key, filename_ | success | Name of the object. |
+| _gcs.size_ | success | Size of the object. |
+| _gcs.crc32c_ | success | The CRC32C checksum of object's data, encoded in base64 |
+| _gcs.md5_ | success | The MD5 hash of the object's data encoded in base64. |
+| _gcs.owner.entity_ | success | The owner entity, in the form "user-emailAddress". |
+| _gcs.owner.entity.id_ | success | The ID for the entity. |
+| _gcs.content.encoding_ | success | The content encoding of the object. |
+| _gcs.content.language_ | success | The content language of the object. |
+| _gcs.content.disposition_ | success | The data content disposition of the object. |
+| _gcs.media.link_ | success | The media download link to the object. |
+| _gcs.self.link_ | success | The link to this object. |
+| _gcs.etag_ | success | The HTTP 1.1 Entity tag for the object. |
+| _gcs.generated.id_ | success | The service-generated ID for the object |
+| _gcs.generation_ | success | The content generation of this object. Used for object versioning. |
+| _gcs.metageneration_ | success | The metageneration of the object. |
+| _gcs.create.time_ | success | Unix timestamp of the object's creation in milliseconds |
+| _gcs.update.time_ | success | Unix timestamp of the object's last modification in milliseconds |
+| _gcs.delete.time_ | success | Unix timestamp of the object's deletion in milliseconds |
+| _gcs.encryption.algorithm_ | success | The algorithm used to encrypt the object. |
+| _gcs.encryption.sha256_ | success | The SHA256 hash of the key used to encrypt the object |
+
+
## ListenHTTP
### Description
@@ -1614,7 +1743,7 @@
|--------------------------------------|---------------|-----------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **Bucket** | ${gcs.bucket} | | Bucket of the object.<br>**Supports Expression Language: true** |
| **Key** | ${filename} | | Name of the object.<br>**Supports Expression Language: true** |
-| **Number Of retries** | 6 | integers | How many retry attempts should be made before routing to the failure relationship. |
+| **Number of retries** | 6 | integers | How many retry attempts should be made before routing to the failure relationship. |
| **GCP Credentials Provider Service** | | [GCPCredentialsControllerService](CONTROLLERS.md#GCPCredentialsControllerService) | The Controller Service used to obtain Google Cloud Platform credentials. |
| Object ACL | | authenticatedRead<br>bucketOwnerFullControl<br>bucketOwnerRead<br>private<br>projectPrivate<br>publicRead | Access Control to be attached to the object uploaded. Not providing this will revert to bucket defaults. For more information please visit [Google Cloud Access control lists](https://cloud.google.com/storage/docs/access-control/lists#predefined-acl) |
| Server Side Encryption Key | | | An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.<br>**Supports Expression Language: true** |
@@ -1634,6 +1763,7 @@
| Attribute | Relationship | Description |
|----------------------------|--------------|--------------------------------------------------------------------|
+| _gcs.status.message_ | failure | The status message received from google cloud. |
| _gcs.error.reason_ | failure | The description of the error occurred during upload. |
| _gcs.error.domain_ | failure | The domain of the error occurred during upload. |
| _gcs.bucket_ | success | Bucket of the object. |
@@ -1652,9 +1782,9 @@
| _gcs.generated.id_ | success | The service-generated ID for the object |
| _gcs.generation_ | success | The content generation of this object. Used for object versioning. |
| _gcs.metageneration_ | success | The metageneration of the object. |
-| _gcs.create.time_ | success | The creation time of the object (milliseconds) |
-| _gcs.update.time_ | success | The last modification time of the object (milliseconds) |
-| _gcs.delete.time_ | success | The deletion time of the object (milliseconds) |
+| _gcs.create.time_ | success | Unix timestamp of the object's creation in milliseconds |
+| _gcs.update.time_ | success | Unix timestamp of the object's last modification in milliseconds |
+| _gcs.delete.time_ | success | Unix timestamp of the object's deletion in milliseconds |
| _gcs.encryption.algorithm_ | success | The algorithm used to encrypt the object. |
| _gcs.encryption.sha256_ | success | The SHA256 hash of the key used to encrypt the object |
diff --git a/README.md b/README.md
index 26af00c..2ffb244 100644
--- a/README.md
+++ b/README.md
@@ -80,7 +80,7 @@
| CivetWeb | [ListenHTTP](PROCESSORS.md#listenhttp) | -DDISABLE_CIVET=ON |
| CURL | [InvokeHTTP](PROCESSORS.md#invokehttp) | -DDISABLE_CURL=ON |
| GPS | GetGPS | -DENABLE_GPS=ON |
-| Google Cloud Platform | [GcpCredentialsControllerService](CONTROLLERS.md#GcpCredentialsControllerService)<br>[PutGCSObject](PROCESSORS.md#putgcsobject) | -DENABLE_GCP=ON |
+| Google Cloud Platform | [DeleteGCSObject](PROCESSORS.md#deletegcsobject)<br>[FetchGCSObject](PROCESSORS.md#fetchgcsobject)<br>[GCPCredentialsControllerService](CONTROLLERS.md#GCPCredentialsControllerService)<br>[ListGCSBucket](PROCESSORS.md#listgcsbucket)<br>[PutGCSObject](PROCESSORS.md#putgcsobject) | -DENABLE_GCP=ON |
| Kafka | [PublishKafka](PROCESSORS.md#publishkafka) | -DENABLE_LIBRDKAFKA=ON |
| Kubernetes | [KubernetesControllerService](CONTROLLERS.md#kubernetesControllerService) | -DENABLE_KUBERNETES=ON |
| JNI | **NiFi Processors** | -DENABLE_JNI=ON |
diff --git a/docker/test/integration/features/google_cloud_storage.feature b/docker/test/integration/features/google_cloud_storage.feature
index 18af9b1..ad2b6f1 100644
--- a/docker/test/integration/features/google_cloud_storage.feature
+++ b/docker/test/integration/features/google_cloud_storage.feature
@@ -17,3 +17,31 @@
Then a flowfile with the content "hello_gcs" is placed in the monitored directory in less than 45 seconds
And an object with the content "hello_gcs" is present in the Google Cloud storage
+
+ Scenario: A MiNiFi instance can fetch the listed objects from Google Cloud storage bucket
+ Given a Google Cloud storage server is set up and a single object with contents "preloaded data" is present
+ And a ListGCSBucket processor
+ And a FetchGCSObject processor
+ And the ListGCSBucket and the FetchGCSObject processors are set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server
+ And a PutFile processor with the "Directory" property set to "/tmp/output"
+ And the "success" relationship of the ListGCSBucket processor is connected to the FetchGCSObject
+ And the "success" relationship of the FetchGCSObject processor is connected to the PutFile
+
+ When all instances start up
+
+ Then a flowfile with the content "preloaded data" is placed in the monitored directory in less than 10 seconds
+
+
+ Scenario: A MiNiFi instance can delete the listed objects from Google Cloud storage bucket
+ Given a Google Cloud storage server is set up with some test data
+ And a ListGCSBucket processor
+ And a DeleteGCSObject processor
+ And the ListGCSBucket and the DeleteGCSObject processors are set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server
+ And a PutFile processor with the "Directory" property set to "/tmp/output"
+ And the "success" relationship of the ListGCSBucket processor is connected to the DeleteGCSObject
+ And the "success" relationship of the DeleteGCSObject processor is connected to the PutFile
+
+ When all instances start up
+
+ Then the test bucket of Google Cloud Storage is empty
+ And at least one empty flowfile is placed in the monitored directory in less than 10 seconds
diff --git a/docker/test/integration/minifi/processors/DeleteGCSObject.py b/docker/test/integration/minifi/processors/DeleteGCSObject.py
new file mode 100644
index 0000000..f816c6f
--- /dev/null
+++ b/docker/test/integration/minifi/processors/DeleteGCSObject.py
@@ -0,0 +1,14 @@
+from ..core.Processor import Processor
+
+
+class DeleteGCSObject(Processor):
+ def __init__(
+ self):
+ super(DeleteGCSObject, self).__init__(
+ 'DeleteGCSObject',
+ properties={
+ 'Bucket': 'test-bucket',
+ 'Endpoint Override URL': 'fake-gcs-server:4443',
+ 'Number of retries': 2
+ },
+ auto_terminate=["success", "failure"])
diff --git a/docker/test/integration/minifi/processors/FetchGCSObject.py b/docker/test/integration/minifi/processors/FetchGCSObject.py
new file mode 100644
index 0000000..557e565
--- /dev/null
+++ b/docker/test/integration/minifi/processors/FetchGCSObject.py
@@ -0,0 +1,14 @@
+from ..core.Processor import Processor
+
+
+class FetchGCSObject(Processor):
+ def __init__(
+ self):
+ super(FetchGCSObject, self).__init__(
+ 'FetchGCSObject',
+ properties={
+ 'Bucket': 'test-bucket',
+ 'Endpoint Override URL': 'fake-gcs-server:4443',
+ 'Number of retries': 2
+ },
+ auto_terminate=["success", "failure"])
diff --git a/docker/test/integration/minifi/processors/ListGCSBucket.py b/docker/test/integration/minifi/processors/ListGCSBucket.py
new file mode 100644
index 0000000..5c66254
--- /dev/null
+++ b/docker/test/integration/minifi/processors/ListGCSBucket.py
@@ -0,0 +1,14 @@
+from ..core.Processor import Processor
+
+
+class ListGCSBucket(Processor):
+ def __init__(
+ self):
+ super(ListGCSBucket, self).__init__(
+ 'ListGCSBucket',
+ properties={
+ 'Bucket': 'test-bucket',
+ 'Endpoint Override URL': 'fake-gcs-server:4443',
+ 'Number of retries': 2
+ },
+ auto_terminate=["success"])
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index f588c8c..8769ed6 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -403,8 +403,9 @@
# google cloud storage setup
-@given("a Google Cloud storage server is set up with some test data")
@given("a Google Cloud storage server is set up")
+@given("a Google Cloud storage server is set up with some test data")
+@given('a Google Cloud storage server is set up and a single object with contents "preloaded data" is present')
def step_impl(context):
context.test.acquire_container("fake-gcs-server", "fake-gcs-server")
@@ -445,6 +446,17 @@
p1.set_property("GCP Credentials Provider Service", gcp_controller_service.name)
+@given(u'the {processor_one} and the {processor_two} processors are set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server')
+def step_impl(context, processor_one, processor_two):
+ gcp_controller_service = GCPCredentialsControllerService(credentials_location="Use Anonymous credentials")
+ p1 = context.test.get_node_by_name(processor_one)
+ p2 = context.test.get_node_by_name(processor_two)
+ p1.controller_services.append(gcp_controller_service)
+ p1.set_property("GCP Credentials Provider Service", gcp_controller_service.name)
+ p2.controller_services.append(gcp_controller_service)
+ p2.set_property("GCP Credentials Provider Service", gcp_controller_service.name)
+
+
@given("the kafka broker is started")
def step_impl(context):
context.test.start_kafka_broker()
diff --git a/extensions/gcp/GCPAttributes.h b/extensions/gcp/GCPAttributes.h
index 92548fd..38af0d3 100644
--- a/extensions/gcp/GCPAttributes.h
+++ b/extensions/gcp/GCPAttributes.h
@@ -24,6 +24,7 @@
constexpr const char* GCS_ERROR_REASON = "gcs.error.reason";
constexpr const char* GCS_ERROR_DOMAIN = "gcs.error.domain";
+constexpr const char* GCS_STATUS_MESSAGE = "gcs.status.message";
constexpr const char* GCS_BUCKET_ATTR = "gcs.bucket";
constexpr const char* GCS_OBJECT_NAME_ATTR = "gcs.key";
constexpr const char* GCS_SIZE_ATTR = "gcs.size";
diff --git a/extensions/gcp/processors/DeleteGCSObject.cpp b/extensions/gcp/processors/DeleteGCSObject.cpp
new file mode 100644
index 0000000..e95178d
--- /dev/null
+++ b/extensions/gcp/processors/DeleteGCSObject.cpp
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DeleteGCSObject.h"
+
+#include "core/Resource.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/FlowFile.h"
+#include "../GCPAttributes.h"
+
+namespace gcs = ::google::cloud::storage;
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+const core::Property DeleteGCSObject::Bucket(
+ core::PropertyBuilder::createProperty("Bucket")
+ ->withDescription("Bucket of the object.")
+ ->withDefaultValue("${gcs.bucket}")
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Property DeleteGCSObject::Key(
+ core::PropertyBuilder::createProperty("Key")
+ ->withDescription("Name of the object.")
+ ->withDefaultValue("${filename}")
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Property DeleteGCSObject::ObjectGeneration(
+ core::PropertyBuilder::createProperty("Object Generation")
+ ->withDescription("The generation of the Object to download. If left empty, then it will download the latest generation.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Property DeleteGCSObject::EncryptionKey(
+ core::PropertyBuilder::createProperty("Server Side Encryption Key")
+ ->withDescription("The AES256 Encryption Key (encoded in base64) for server-side decryption of the object.")
+ ->isRequired(false)
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Relationship DeleteGCSObject::Success("success", "FlowFiles are routed to this relationship after a successful Google Cloud Storage operation.");
+const core::Relationship DeleteGCSObject::Failure("failure", "FlowFiles are routed to this relationship if the Google Cloud Storage operation fails.");
+
+void DeleteGCSObject::initialize() {
+ setSupportedProperties({GCPCredentials,
+ Bucket,
+ Key,
+ ObjectGeneration,
+ NumberOfRetries,
+ EncryptionKey,
+ EndpointOverrideURL});
+ setSupportedRelationships({Success, Failure});
+}
+
+void DeleteGCSObject::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+ gsl_Expects(context && session && gcp_credentials_);
+
+ auto flow_file = session->get();
+ if (!flow_file) {
+ context->yield();
+ return;
+ }
+
+ auto bucket = context->getProperty(Bucket, flow_file);
+ if (!bucket || bucket->empty()) {
+ logger_->log_error("Missing bucket name");
+ session->transfer(flow_file, Failure);
+ return;
+ }
+ auto object_name = context->getProperty(Key, flow_file);
+ if (!object_name || object_name->empty()) {
+ logger_->log_error("Missing object name");
+ session->transfer(flow_file, Failure);
+ return;
+ }
+
+ gcs::Generation generation;
+
+ if (auto gen_str = context->getProperty(ObjectGeneration, flow_file); gen_str && !gen_str->empty()) {
+ try {
+ uint64_t gen;
+ utils::internal::ValueParser(*gen_str).parse(gen).parseEnd();
+ generation = gcs::Generation(gen);
+ } catch (const utils::internal::ValueException&) {
+ logger_->log_error("Invalid generation: %s", *gen_str);
+ session->transfer(flow_file, Failure);
+ return;
+ }
+ }
+
+ auto status = getClient().DeleteObject(*bucket, *object_name, generation, gcs::IfGenerationNotMatch(0));
+
+ if (!status.ok()) {
+ flow_file->setAttribute(GCS_STATUS_MESSAGE, status.message());
+ flow_file->setAttribute(GCS_ERROR_REASON, status.error_info().reason());
+ flow_file->setAttribute(GCS_ERROR_DOMAIN, status.error_info().domain());
+ logger_->log_error("Failed to delete %s object from %s bucket on Google Cloud Storage %s %s", *object_name, *bucket, status.message(), status.error_info().reason());
+ session->transfer(flow_file, Failure);
+ return;
+ }
+
+ session->transfer(flow_file, Success);
+}
+
+REGISTER_RESOURCE(DeleteGCSObject, "Deletes an object from a Google Cloud Bucket.");
+} // namespace org::apache::nifi::minifi::extensions::gcp
diff --git a/extensions/gcp/processors/DeleteGCSObject.h b/extensions/gcp/processors/DeleteGCSObject.h
new file mode 100644
index 0000000..22644c9
--- /dev/null
+++ b/extensions/gcp/processors/DeleteGCSObject.h
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <memory>
+
+#include "GCSProcessor.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+
+class DeleteGCSObject : public GCSProcessor {
+ public:
+ explicit DeleteGCSObject(const std::string& name, const utils::Identifier& uuid = {})
+ : GCSProcessor(name, uuid, core::logging::LoggerFactory<DeleteGCSObject>::getLogger()) {
+ }
+ ~DeleteGCSObject() override = default;
+
+ EXTENSIONAPI static const core::Property Bucket;
+ EXTENSIONAPI static const core::Property Key;
+ EXTENSIONAPI static const core::Property EncryptionKey;
+ EXTENSIONAPI static const core::Property ObjectGeneration;
+
+ EXTENSIONAPI static const core::Relationship Success;
+ EXTENSIONAPI static const core::Relationship Failure;
+
+ void initialize() override;
+ void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
+
+ core::annotation::Input getInputRequirement() const override {
+ return core::annotation::Input::INPUT_REQUIRED;
+ }
+
+ bool isSingleThreaded() const override {
+ return true;
+ }
+};
+
+} // namespace org::apache::nifi::minifi::extensions::gcp
diff --git a/extensions/gcp/processors/FetchGCSObject.cpp b/extensions/gcp/processors/FetchGCSObject.cpp
new file mode 100644
index 0000000..58e36df
--- /dev/null
+++ b/extensions/gcp/processors/FetchGCSObject.cpp
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchGCSObject.h"
+
+#include <utility>
+
+#include "core/Resource.h"
+#include "core/FlowFile.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "../GCPAttributes.h"
+
+namespace gcs = ::google::cloud::storage;
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+const core::Property FetchGCSObject::Bucket(
+ core::PropertyBuilder::createProperty("Bucket")
+ ->withDescription("Bucket of the object.")
+ ->withDefaultValue("${gcs.bucket}")
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Property FetchGCSObject::Key(
+ core::PropertyBuilder::createProperty("Key")
+ ->withDescription("Name of the object.")
+ ->withDefaultValue("${filename}")
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Property FetchGCSObject::ObjectGeneration(
+ core::PropertyBuilder::createProperty("Object Generation")
+ ->withDescription("The generation of the Object to download. If left empty, then it will download the latest generation.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Property FetchGCSObject::EncryptionKey(
+ core::PropertyBuilder::createProperty("Server Side Encryption Key")
+ ->withDescription("The AES256 Encryption Key (encoded in base64) for server-side decryption of the object.")
+ ->isRequired(false)
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Relationship FetchGCSObject::Success("success", "FlowFiles are routed to this relationship after a successful Google Cloud Storage operation.");
+const core::Relationship FetchGCSObject::Failure("failure", "FlowFiles are routed to this relationship if the Google Cloud Storage operation fails.");
+
+
+namespace {
+class FetchFromGCSCallback {
+ public:
+ FetchFromGCSCallback(gcs::Client& client, std::string bucket, std::string key)
+ : bucket_(std::move(bucket)),
+ key_(std::move(key)),
+ client_(client) {
+ }
+
+ int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
+ auto reader = client_.ReadObject(bucket_, key_, encryption_key_, generation_, gcs::IfGenerationNotMatch(0));
+ auto set_members = gsl::finally([&]{
+ status_ = reader.status();
+ result_generation_ = reader.generation();
+ meta_generation_ = reader.metageneration();
+ storage_class_ = reader.storage_class();
+ });
+ if (!reader)
+ return 0;
+ std::string contents{std::istreambuf_iterator<char>{reader}, {}};
+ auto write_ret = stream->write(gsl::make_span(contents).as_span<std::byte>());
+ reader.Close();
+ return write_ret;
+ }
+
+ [[nodiscard]] auto getStatus() const noexcept { return status_; }
+ [[nodiscard]] auto getGeneration() const noexcept { return result_generation_; }
+ [[nodiscard]] auto getMetaGeneration() const noexcept { return meta_generation_; }
+ [[nodiscard]] auto getStorageClass() const noexcept { return storage_class_; }
+
+
+ void setEncryptionKey(const gcs::EncryptionKey& encryption_key) {
+ encryption_key_ = encryption_key;
+ }
+
+ void setGeneration(const gcs::Generation generation) {
+ generation_ = generation;
+ }
+
+ private:
+ std::string bucket_;
+ std::string key_;
+ gcs::Client& client_;
+
+ gcs::EncryptionKey encryption_key_;
+ gcs::Generation generation_;
+
+ google::cloud::Status status_;
+ std::optional<std::int64_t> result_generation_;
+ std::optional<std::int64_t> meta_generation_;
+ std::optional<std::string> storage_class_;
+};
+} // namespace
+
+
+void FetchGCSObject::initialize() {
+ setSupportedProperties({GCPCredentials,
+ Bucket,
+ Key,
+ ObjectGeneration,
+ NumberOfRetries,
+ EncryptionKey,
+ EndpointOverrideURL});
+ setSupportedRelationships({Success, Failure});
+}
+
+void FetchGCSObject::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& session_factory) {
+ GCSProcessor::onSchedule(context, session_factory);
+ gsl_Expects(context);
+ if (auto encryption_key = context->getProperty(EncryptionKey)) {
+ try {
+ encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key);
+ } catch (const google::cloud::RuntimeStatusError&) {
+ throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Could not decode the base64-encoded encryption key from property " + EncryptionKey.getName()); }
+ }
+}
+
+void FetchGCSObject::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+ gsl_Expects(context && session && gcp_credentials_);
+
+ auto flow_file = session->get();
+ if (!flow_file) {
+ context->yield();
+ return;
+ }
+
+ auto bucket = context->getProperty(Bucket, flow_file);
+ if (!bucket || bucket->empty()) {
+ logger_->log_error("Missing bucket name");
+ session->transfer(flow_file, Failure);
+ return;
+ }
+ auto object_name = context->getProperty(Key, flow_file);
+ if (!object_name || object_name->empty()) {
+ logger_->log_error("Missing object name");
+ session->transfer(flow_file, Failure);
+ return;
+ }
+
+ gcs::Client client = getClient();
+ FetchFromGCSCallback callback(client, *bucket, *object_name);
+ callback.setEncryptionKey(encryption_key_);
+
+ if (auto gen_str = context->getProperty(ObjectGeneration, flow_file); gen_str && !gen_str->empty()) {
+ try {
+ uint64_t gen;
+ utils::internal::ValueParser(*gen_str).parse(gen).parseEnd();
+ callback.setGeneration(gcs::Generation(gen));
+ } catch (const utils::internal::ValueException&) {
+ logger_->log_error("Invalid generation: %s", *gen_str);
+ session->transfer(flow_file, Failure);
+ return;
+ }
+ }
+
+ session->write(flow_file, std::ref(callback));
+ if (!callback.getStatus().ok()) {
+ flow_file->setAttribute(GCS_STATUS_MESSAGE, callback.getStatus().message());
+ flow_file->setAttribute(GCS_ERROR_REASON, callback.getStatus().error_info().reason());
+ flow_file->setAttribute(GCS_ERROR_DOMAIN, callback.getStatus().error_info().domain());
+ logger_->log_error("Failed to fetch from Google Cloud Storage %s %s", callback.getStatus().message(), callback.getStatus().error_info().reason());
+ session->transfer(flow_file, Failure);
+ return;
+ }
+
+ if (auto generation = callback.getGeneration())
+ flow_file->setAttribute(GCS_GENERATION, std::to_string(*generation));
+ if (auto meta_generation = callback.getMetaGeneration())
+ flow_file->setAttribute(GCS_META_GENERATION, std::to_string(*meta_generation));
+ if (auto storage_class = callback.getStorageClass())
+ flow_file->setAttribute(GCS_STORAGE_CLASS, *storage_class);
+ session->transfer(flow_file, Success);
+}
+
+REGISTER_RESOURCE(FetchGCSObject, "Fetches a file from a Google Cloud Bucket. Designed to be used in tandem with ListGCSBucket.");
+} // namespace org::apache::nifi::minifi::extensions::gcp
diff --git a/extensions/gcp/processors/FetchGCSObject.h b/extensions/gcp/processors/FetchGCSObject.h
new file mode 100644
index 0000000..d71e747
--- /dev/null
+++ b/extensions/gcp/processors/FetchGCSObject.h
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <memory>
+
+#include "GCSProcessor.h"
+#include "google/cloud/storage/well_known_headers.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+
+class FetchGCSObject : public GCSProcessor {
+ public:
+ explicit FetchGCSObject(const std::string& name, const utils::Identifier& uuid = {})
+ : GCSProcessor(name, uuid, core::logging::LoggerFactory<FetchGCSObject>::getLogger()) {
+ }
+ ~FetchGCSObject() override = default;
+
+ EXTENSIONAPI static const core::Property Bucket;
+ EXTENSIONAPI static const core::Property Key;
+ EXTENSIONAPI static const core::Property EncryptionKey;
+ EXTENSIONAPI static const core::Property ObjectGeneration;
+
+ EXTENSIONAPI static const core::Relationship Success;
+ EXTENSIONAPI static const core::Relationship Failure;
+
+ void initialize() override;
+ void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+ void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
+
+ core::annotation::Input getInputRequirement() const override {
+ return core::annotation::Input::INPUT_REQUIRED;
+ }
+
+ bool isSingleThreaded() const override {
+ return true;
+ }
+
+ private:
+ google::cloud::storage::EncryptionKey encryption_key_;
+};
+
+} // namespace org::apache::nifi::minifi::extensions::gcp
diff --git a/extensions/gcp/processors/GCSProcessor.cpp b/extensions/gcp/processors/GCSProcessor.cpp
new file mode 100644
index 0000000..445f295
--- /dev/null
+++ b/extensions/gcp/processors/GCSProcessor.cpp
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "GCSProcessor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "../controllerservices/GCPCredentialsControllerService.h"
+
+namespace gcs = ::google::cloud::storage;
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+
+const core::Property GCSProcessor::GCPCredentials(
+ core::PropertyBuilder::createProperty("GCP Credentials Provider Service")
+ ->withDescription("The Controller Service used to obtain Google Cloud Platform credentials.")
+ ->isRequired(true)
+ ->asType<GCPCredentialsControllerService>()
+ ->build());
+
+const core::Property GCSProcessor::NumberOfRetries(
+ core::PropertyBuilder::createProperty("Number of retries")
+ ->withDescription("How many retry attempts should be made before routing to the failure relationship.")
+ ->withDefaultValue<uint64_t>(6)
+ ->isRequired(true)
+ ->supportsExpressionLanguage(false)
+ ->build());
+
+const core::Property GCSProcessor::EndpointOverrideURL(
+ core::PropertyBuilder::createProperty("Endpoint Override URL")
+ ->withDescription("Overrides the default Google Cloud Storage endpoints")
+ ->isRequired(false)
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+namespace {
+std::shared_ptr<google::cloud::storage::oauth2::Credentials> getCredentials(core::ProcessContext& context) {
+ std::string service_name;
+ if (context.getProperty(GCSProcessor::GCPCredentials.getName(), service_name) && !IsNullOrEmpty(service_name)) {
+ auto gcp_credentials_controller_service = std::dynamic_pointer_cast<const GCPCredentialsControllerService>(context.getControllerService(service_name));
+ if (!gcp_credentials_controller_service)
+ return nullptr;
+ return gcp_credentials_controller_service->getCredentials();
+ }
+ return nullptr;
+}
+} // namespace
+
+
+void GCSProcessor::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+ gsl_Expects(context);
+ if (auto number_of_retries = context->getProperty<uint64_t>(NumberOfRetries)) {
+ retry_policy_ = std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(*number_of_retries);
+ }
+
+ gcp_credentials_ = getCredentials(*context);
+ if (!gcp_credentials_) {
+ throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing GCP Credentials");
+ }
+
+ endpoint_url_ = context->getProperty(EndpointOverrideURL);
+ if (endpoint_url_)
+ logger_->log_debug("Endpoint overwritten: %s", *endpoint_url_);
+}
+
+gcs::Client GCSProcessor::getClient() const {
+ auto options = gcs::ClientOptions(gcp_credentials_);
+ if (endpoint_url_)
+ options.set_endpoint(*endpoint_url_);
+ return gcs::Client(options, *retry_policy_);
+}
+
+} // namespace org::apache::nifi::minifi::extensions::gcp
diff --git a/extensions/gcp/processors/GCSProcessor.h b/extensions/gcp/processors/GCSProcessor.h
new file mode 100644
index 0000000..f673a3c
--- /dev/null
+++ b/extensions/gcp/processors/GCSProcessor.h
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#pragma once
+
+#include <string>
+#include <memory>
+#include <utility>
+#include <optional>
+
+#include "core/logging/Logger.h"
+#include "core/Processor.h"
+#include "google/cloud/storage/oauth2/credentials.h"
+#include "google/cloud/storage/client.h"
+#include "google/cloud/storage/retry_policy.h"
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+class GCSProcessor : public core::Processor {
+ public:
+ GCSProcessor(const std::string& name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger)
+ : core::Processor(name, uuid),
+ logger_(std::move(logger)) {
+ }
+
+ EXTENSIONAPI static const core::Property GCPCredentials;
+ EXTENSIONAPI static const core::Property NumberOfRetries;
+ EXTENSIONAPI static const core::Property EndpointOverrideURL;
+
+ void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+
+ protected:
+ virtual google::cloud::storage::Client getClient() const;
+
+ std::optional<std::string> endpoint_url_;
+ std::shared_ptr<google::cloud::storage::oauth2::Credentials> gcp_credentials_;
+ google::cloud::storage::RetryPolicyOption::Type retry_policy_ = std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(6);
+ std::shared_ptr<core::logging::Logger> logger_;
+};
+
+} // namespace org::apache::nifi::minifi::extensions::gcp
diff --git a/extensions/gcp/processors/ListGCSBucket.cpp b/extensions/gcp/processors/ListGCSBucket.cpp
new file mode 100644
index 0000000..be6dc22
--- /dev/null
+++ b/extensions/gcp/processors/ListGCSBucket.cpp
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListGCSBucket.h"
+
+#include "core/Resource.h"
+#include "core/FlowFile.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "../GCPAttributes.h"
+
+namespace gcs = ::google::cloud::storage;
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+const core::Property ListGCSBucket::Bucket(
+ core::PropertyBuilder::createProperty("Bucket")
+ ->withDescription("Bucket of the object.")
+ ->isRequired(true)
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+const core::Property ListGCSBucket::ListAllVersions(
+ core::PropertyBuilder::createProperty("List all versions")
+ ->withDescription("Set this option to `true` to get all the previous versions separately.")
+ ->withDefaultValue<bool>(false)
+ ->build());
+
+const core::Relationship ListGCSBucket::Success("success", "FlowFiles are routed to this relationship after a successful Google Cloud Storage operation.");
+
+void ListGCSBucket::initialize() {
+ setSupportedProperties({GCPCredentials,
+ Bucket,
+ NumberOfRetries,
+ EndpointOverrideURL,
+ ListAllVersions});
+ setSupportedRelationships({Success});
+}
+
+
+void ListGCSBucket::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& session_factory) {
+ GCSProcessor::onSchedule(context, session_factory);
+ gsl_Expects(context);
+ context->getProperty(Bucket.getName(), bucket_);
+}
+
+void ListGCSBucket::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+ gsl_Expects(context && session && gcp_credentials_);
+
+ gcs::Client client = getClient();
+ auto list_all_versions = context->getProperty<bool>(ListAllVersions);
+ gcs::Versions versions = (list_all_versions && *list_all_versions) ? gcs::Versions(true) : gcs::Versions(false);
+ auto objects_in_bucket = client.ListObjects(bucket_, versions);
+ for (const auto& object_in_bucket : objects_in_bucket) {
+ if (object_in_bucket.ok()) {
+ auto flow_file = session->create();
+ flow_file->updateAttribute(core::SpecialFlowAttribute::FILENAME, object_in_bucket->name());
+ setAttributesFromObjectMetadata(*flow_file, *object_in_bucket);
+ session->transfer(flow_file, Success);
+ } else {
+ logger_->log_error("Invalid object in bucket %s", bucket_);
+ }
+ }
+}
+
+REGISTER_RESOURCE(ListGCSBucket, "Retrieves a listing of objects from an GCS bucket. "
+ "For each object that is listed, creates a FlowFile that represents the object so that it can be fetched in conjunction with FetchGCSObject.");
+} // namespace org::apache::nifi::minifi::extensions::gcp
diff --git a/extensions/gcp/processors/ListGCSBucket.h b/extensions/gcp/processors/ListGCSBucket.h
new file mode 100644
index 0000000..bdf8555
--- /dev/null
+++ b/extensions/gcp/processors/ListGCSBucket.h
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <memory>
+
+#include "GCSProcessor.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+
+class ListGCSBucket : public GCSProcessor {
+ public:
+ explicit ListGCSBucket(const std::string& name, const utils::Identifier& uuid = {})
+ : GCSProcessor(name, uuid, core::logging::LoggerFactory<ListGCSBucket>::getLogger()) {
+ }
+ ~ListGCSBucket() override = default;
+
+ EXTENSIONAPI static const core::Property Bucket;
+ EXTENSIONAPI static const core::Property ListAllVersions;
+
+ EXTENSIONAPI static const core::Relationship Success;
+
+ void initialize() override;
+ void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+ void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
+
+ core::annotation::Input getInputRequirement() const override {
+ return core::annotation::Input::INPUT_FORBIDDEN;
+ }
+
+ bool isSingleThreaded() const override {
+ return true;
+ }
+
+ private:
+ std::string bucket_;
+};
+
+} // namespace org::apache::nifi::minifi::extensions::gcp
diff --git a/extensions/gcp/processors/PutGCSObject.cpp b/extensions/gcp/processors/PutGCSObject.cpp
index 00a3d0c..eb5f499 100644
--- a/extensions/gcp/processors/PutGCSObject.cpp
+++ b/extensions/gcp/processors/PutGCSObject.cpp
@@ -17,27 +17,17 @@
#include "PutGCSObject.h"
-#include <vector>
#include <utility>
#include "core/Resource.h"
#include "core/FlowFile.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
-#include "io/StreamPipe.h"
-#include "utils/OptionalUtils.h"
#include "../GCPAttributes.h"
namespace gcs = ::google::cloud::storage;
namespace org::apache::nifi::minifi::extensions::gcp {
-const core::Property PutGCSObject::GCPCredentials(
- core::PropertyBuilder::createProperty("GCP Credentials Provider Service")
- ->withDescription("The Controller Service used to obtain Google Cloud Platform credentials.")
- ->isRequired(true)
- ->asType<GCPCredentialsControllerService>()
- ->build());
-
const core::Property PutGCSObject::Bucket(
core::PropertyBuilder::createProperty("Bucket")
->withDescription("Bucket of the object.")
@@ -52,14 +42,6 @@
->supportsExpressionLanguage(true)
->build());
-const core::Property PutGCSObject::NumberOfRetries(
- core::PropertyBuilder::createProperty("Number of retries")
- ->withDescription("How many retry attempts should be made before routing to the failure relationship.")
- ->withDefaultValue<uint64_t>(6)
- ->isRequired(true)
- ->supportsExpressionLanguage(false)
- ->build());
-
const core::Property PutGCSObject::ContentType(
core::PropertyBuilder::createProperty("Content Type")
->withDescription("Content Type for the file, i.e. text/plain ")
@@ -102,17 +84,9 @@
->withDefaultValue<bool>(true)
->build());
-const core::Property PutGCSObject::EndpointOverrideURL(
- core::PropertyBuilder::createProperty("Endpoint Override URL")
- ->withDescription("Overrides the default Google Cloud Storage endpoints")
- ->isRequired(false)
- ->supportsExpressionLanguage(true)
- ->build());
-
const core::Relationship PutGCSObject::Success("success", "Files that have been successfully written to Google Cloud Storage are transferred to this relationship");
const core::Relationship PutGCSObject::Failure("failure", "Files that could not be written to Google Cloud Storage for some reason are transferred to this relationship");
-
namespace {
class UploadToGCSCallback {
public:
@@ -183,16 +157,6 @@
google::cloud::StatusOr<gcs::ObjectMetadata> result_;
};
-std::shared_ptr<google::cloud::storage::oauth2::Credentials> getCredentials(core::ProcessContext& context) {
- std::string service_name;
- if (context.getProperty(PutGCSObject::GCPCredentials.getName(), service_name) && !IsNullOrEmpty(service_name)) {
- auto gcp_credentials_controller_service = std::dynamic_pointer_cast<const GCPCredentialsControllerService>(context.getControllerService(service_name));
- if (!gcp_credentials_controller_service)
- return nullptr;
- return gcp_credentials_controller_service->getCredentials();
- }
- return nullptr;
-}
} // namespace
@@ -211,16 +175,10 @@
setSupportedRelationships({Success, Failure});
}
-gcs::Client PutGCSObject::getClient(const gcs::ClientOptions& options) const {
- return gcs::Client(options, *retry_policy_);
-}
-
-void PutGCSObject::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+void PutGCSObject::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& session_factory) {
+ GCSProcessor::onSchedule(context, session_factory);
gsl_Expects(context);
- if (auto number_of_retries = context->getProperty<uint64_t>(NumberOfRetries)) {
- retry_policy_ = std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(*number_of_retries);
- }
if (auto encryption_key = context->getProperty(EncryptionKey)) {
try {
encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key);
@@ -228,10 +186,6 @@
throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Could not decode the base64-encoded encryption key from property " + EncryptionKey.getName());
}
}
- gcp_credentials_ = getCredentials(*context);
- if (!gcp_credentials_) {
- throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing GCP Credentials");
- }
}
void PutGCSObject::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
@@ -256,13 +210,7 @@
return;
}
- auto options = gcs::ClientOptions(gcp_credentials_);
- if (auto endpoint_override_url = context->getProperty(EndpointOverrideURL)) {
- options.set_endpoint(*endpoint_override_url);
- logger_->log_debug("Endpoint override url %s", *endpoint_override_url);
- }
-
- gcs::Client client = getClient(options);
+ gcs::Client client = getClient();
UploadToGCSCallback callback(client, *bucket, *object_name);
if (auto crc32_checksum = context->getProperty(Crc32cChecksum, flow_file)) {
@@ -286,6 +234,7 @@
session->read(flow_file, std::ref(callback));
auto& result = callback.getResult();
if (!result.ok()) {
+ flow_file->setAttribute(GCS_STATUS_MESSAGE, result.status().message());
flow_file->setAttribute(GCS_ERROR_REASON, result.status().error_info().reason());
flow_file->setAttribute(GCS_ERROR_DOMAIN, result.status().error_info().domain());
logger_->log_error("Failed to upload to Google Cloud Storage %s %s", result.status().message(), result.status().error_info().reason());
diff --git a/extensions/gcp/processors/PutGCSObject.h b/extensions/gcp/processors/PutGCSObject.h
index 4539622..d9cd680 100644
--- a/extensions/gcp/processors/PutGCSObject.h
+++ b/extensions/gcp/processors/PutGCSObject.h
@@ -20,17 +20,14 @@
#include <string>
#include <memory>
-#include "core/Processor.h"
-#include "core/logging/Logger.h"
+#include "GCSProcessor.h"
#include "core/logging/LoggerConfiguration.h"
-#include "../controllerservices/GCPCredentialsControllerService.h"
-#include "google/cloud/storage/client.h"
-#include "google/cloud/storage/retry_policy.h"
#include "utils/Enum.h"
+#include "google/cloud/storage/well_known_headers.h"
namespace org::apache::nifi::minifi::extensions::gcp {
-class PutGCSObject : public core::Processor {
+class PutGCSObject : public GCSProcessor {
public:
SMART_ENUM(PredefinedAcl,
(AUTHENTICATED_READ, "authenticatedRead"),
@@ -42,25 +39,18 @@
(PUBLIC_READ_WRITE, "publicReadWrite"));
explicit PutGCSObject(const std::string& name, const utils::Identifier& uuid = {})
- : core::Processor(name, uuid) {
+ : GCSProcessor(name, uuid, core::logging::LoggerFactory<PutGCSObject>::getLogger()) {
}
- PutGCSObject(const PutGCSObject&) = delete;
- PutGCSObject(PutGCSObject&&) = delete;
- PutGCSObject& operator=(const PutGCSObject&) = delete;
- PutGCSObject& operator=(PutGCSObject&&) = delete;
~PutGCSObject() override = default;
- EXTENSIONAPI static const core::Property GCPCredentials;
EXTENSIONAPI static const core::Property Bucket;
EXTENSIONAPI static const core::Property Key;
- EXTENSIONAPI static const core::Property NumberOfRetries;
EXTENSIONAPI static const core::Property ContentType;
EXTENSIONAPI static const core::Property MD5Hash;
EXTENSIONAPI static const core::Property Crc32cChecksum;
EXTENSIONAPI static const core::Property EncryptionKey;
EXTENSIONAPI static const core::Property ObjectACL;
EXTENSIONAPI static const core::Property OverwriteObject;
- EXTENSIONAPI static const core::Property EndpointOverrideURL;
EXTENSIONAPI static const core::Relationship Success;
EXTENSIONAPI static const core::Relationship Failure;
@@ -78,15 +68,7 @@
}
private:
- virtual google::cloud::storage::Client getClient(const google::cloud::storage::ClientOptions& options) const;
-
google::cloud::storage::EncryptionKey encryption_key_;
-
- std::shared_ptr<google::cloud::storage::oauth2::Credentials> gcp_credentials_;
- std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PutGCSObject>::getLogger();
-
- protected:
- google::cloud::storage::RetryPolicyOption::Type retry_policy_ = std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(6);
};
} // namespace org::apache::nifi::minifi::extensions::gcp
diff --git a/extensions/gcp/tests/DeleteGCSObjectTests.cpp b/extensions/gcp/tests/DeleteGCSObjectTests.cpp
new file mode 100644
index 0000000..2b0e446
--- /dev/null
+++ b/extensions/gcp/tests/DeleteGCSObjectTests.cpp
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "../processors/DeleteGCSObject.h"
+#include "../controllerservices/GCPCredentialsControllerService.h"
+#include "GCPAttributes.h"
+#include "core/Resource.h"
+#include "SingleProcessorTestController.h"
+#include "google/cloud/storage/testing/mock_client.h"
+#include "google/cloud/storage/internal/object_metadata_parser.h"
+#include "google/cloud/storage/testing/canonical_errors.h"
+
+namespace gcs = ::google::cloud::storage;
+namespace minifi_gcp = org::apache::nifi::minifi::extensions::gcp;
+
+using DeleteGCSObject = org::apache::nifi::minifi::extensions::gcp::DeleteGCSObject;
+using GCPCredentialsControllerService = org::apache::nifi::minifi::extensions::gcp::GCPCredentialsControllerService;
+using DeleteObjectRequest = gcs::internal::DeleteObjectRequest;
+using ::google::cloud::storage::testing::canonical_errors::TransientError;
+using ::google::cloud::storage::testing::canonical_errors::PermanentError;
+
+namespace {
+class DeleteGCSObjectMocked : public DeleteGCSObject {
+ using org::apache::nifi::minifi::extensions::gcp::DeleteGCSObject::DeleteGCSObject;
+ public:
+ gcs::Client getClient() const override {
+ return gcs::testing::ClientFromMock(mock_client_, *retry_policy_);
+ }
+ std::shared_ptr<gcs::testing::MockClient> mock_client_ = std::make_shared<gcs::testing::MockClient>();
+};
+REGISTER_RESOURCE(DeleteGCSObjectMocked, "DeleteGCSObjectMocked");
+} // namespace
+
+class DeleteGCSObjectTests : public ::testing::Test {
+ public:
+ void SetUp() override {
+ gcp_credentials_node_ = test_controller_.plan->addController("GCPCredentialsControllerService", "gcp_credentials_controller_service");
+ test_controller_.plan->setProperty(gcp_credentials_node_,
+ GCPCredentialsControllerService::CredentialsLoc.getName(),
+ toString(GCPCredentialsControllerService::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS));
+ test_controller_.plan->setProperty(delete_gcs_object_,
+ DeleteGCSObject::GCPCredentials.getName(),
+ "gcp_credentials_controller_service");
+ }
+ std::shared_ptr<DeleteGCSObjectMocked> delete_gcs_object_ = std::make_shared<DeleteGCSObjectMocked>("DeleteGCSObjectMocked");
+ org::apache::nifi::minifi::test::SingleProcessorTestController test_controller_{delete_gcs_object_};
+ std::shared_ptr<minifi::core::controller::ControllerServiceNode> gcp_credentials_node_;
+};
+
+TEST_F(DeleteGCSObjectTests, MissingBucket) {
+ EXPECT_CALL(*delete_gcs_object_->mock_client_, CreateResumableSession).Times(0);
+ EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::Bucket.getName(), ""));
+ const auto& result = test_controller_.trigger("hello world");
+ EXPECT_EQ(0, result.at(DeleteGCSObject::Success).size());
+ ASSERT_EQ(1, result.at(DeleteGCSObject::Failure).size());
+ EXPECT_EQ(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
+ EXPECT_EQ(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
+ EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Failure)[0]));
+}
+
+TEST_F(DeleteGCSObjectTests, ServerGivesPermaError) {
+ EXPECT_CALL(*delete_gcs_object_->mock_client_, DeleteObject)
+ .WillOnce(testing::Return(PermanentError()));
+ EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::Bucket.getName(), "bucket-from-property"));
+ const auto& result = test_controller_.trigger("hello world");
+ EXPECT_EQ(0, result.at(DeleteGCSObject::Success).size());
+ ASSERT_EQ(1, result.at(DeleteGCSObject::Failure).size());
+ EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
+ EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
+ EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Failure)[0]));
+}
+
+TEST_F(DeleteGCSObjectTests, ServerGivesTransientErrors) {
+ EXPECT_CALL(*delete_gcs_object_->mock_client_, DeleteObject)
+ .WillOnce(testing::Return(TransientError()))
+ .WillOnce(testing::Return(TransientError()));
+ EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::NumberOfRetries.getName(), "1"));
+ EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::Bucket.getName(), "bucket-from-property"));
+ const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}});
+ EXPECT_EQ(0, result.at(DeleteGCSObject::Success).size());
+ ASSERT_EQ(1, result.at(DeleteGCSObject::Failure).size());
+ EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
+ EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
+ EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Failure)[0]));
+}
+
+
+TEST_F(DeleteGCSObjectTests, HandlingSuccessfullDeletion) {
+ EXPECT_CALL(*delete_gcs_object_->mock_client_, DeleteObject)
+ .WillOnce([](DeleteObjectRequest const& request) {
+ EXPECT_EQ("bucket-from-attribute", request.bucket_name());
+ EXPECT_TRUE(request.HasOption<gcs::Generation>());
+ EXPECT_TRUE(request.GetOption<gcs::Generation>().has_value());
+ EXPECT_EQ(23, request.GetOption<gcs::Generation>().value());
+ return google::cloud::make_status_or(gcs::internal::EmptyResponse{});
+ });
+ EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::ObjectGeneration.getName(), "${gcs.generation}"));
+ const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}, {minifi_gcp::GCS_GENERATION, "23"}});
+ ASSERT_EQ(1, result.at(DeleteGCSObject::Success).size());
+ EXPECT_EQ(0, result.at(DeleteGCSObject::Failure).size());
+ EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Success)[0]));
+}
+
+TEST_F(DeleteGCSObjectTests, EmptyGeneration) {
+ EXPECT_CALL(*delete_gcs_object_->mock_client_, DeleteObject)
+ .WillOnce([](DeleteObjectRequest const& request) {
+ EXPECT_EQ("bucket-from-attribute", request.bucket_name());
+ EXPECT_FALSE(request.HasOption<gcs::Generation>());
+ return google::cloud::make_status_or(gcs::internal::EmptyResponse{});
+ });
+ EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::ObjectGeneration.getName(), "${gcs.generation}"));
+ const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}});
+ ASSERT_EQ(1, result.at(DeleteGCSObject::Success).size());
+ EXPECT_EQ(0, result.at(DeleteGCSObject::Failure).size());
+ EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Success)[0]));
+}
+
+TEST_F(DeleteGCSObjectTests, InvalidGeneration) {
+ EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::ObjectGeneration.getName(), "${gcs.generation}"));
+ const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}, {minifi_gcp::GCS_GENERATION, "23 banana"}});
+ ASSERT_EQ(0, result.at(DeleteGCSObject::Success).size());
+ EXPECT_EQ(1, result.at(DeleteGCSObject::Failure).size());
+ EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Failure)[0]));
+}
diff --git a/extensions/gcp/tests/FetchGCSObjectTests.cpp b/extensions/gcp/tests/FetchGCSObjectTests.cpp
new file mode 100644
index 0000000..696d3b4
--- /dev/null
+++ b/extensions/gcp/tests/FetchGCSObjectTests.cpp
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "../processors/FetchGCSObject.h"
+#include "../controllerservices/GCPCredentialsControllerService.h"
+#include "GCPAttributes.h"
+#include "core/Resource.h"
+#include "SingleProcessorTestController.h"
+#include "google/cloud/storage/testing/mock_client.h"
+#include "google/cloud/storage/internal/object_metadata_parser.h"
+#include "google/cloud/storage/testing/canonical_errors.h"
+
+namespace gcs = ::google::cloud::storage;
+namespace minifi_gcp = org::apache::nifi::minifi::extensions::gcp;
+
+using FetchGCSObject = org::apache::nifi::minifi::extensions::gcp::FetchGCSObject;
+using GCPCredentialsControllerService = org::apache::nifi::minifi::extensions::gcp::GCPCredentialsControllerService;
+using ::google::cloud::storage::testing::canonical_errors::TransientError;
+using ::google::cloud::storage::testing::canonical_errors::PermanentError;
+
+namespace {
+class FetchGCSObjectMocked : public FetchGCSObject {
+ using org::apache::nifi::minifi::extensions::gcp::FetchGCSObject::FetchGCSObject;
+ public:
+ gcs::Client getClient() const override {
+ return gcs::testing::ClientFromMock(mock_client_, *retry_policy_);
+ }
+ std::shared_ptr<gcs::testing::MockClient> mock_client_ = std::make_shared<gcs::testing::MockClient>();
+};
+REGISTER_RESOURCE(FetchGCSObjectMocked, "FetchGCSObjectMocked");
+} // namespace
+
+class FetchGCSObjectTests : public ::testing::Test {
+ public:
+ void SetUp() override {
+ gcp_credentials_node_ = test_controller_.plan->addController("GCPCredentialsControllerService", "gcp_credentials_controller_service");
+ test_controller_.plan->setProperty(gcp_credentials_node_,
+ GCPCredentialsControllerService::CredentialsLoc.getName(),
+ toString(GCPCredentialsControllerService::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS));
+ test_controller_.plan->setProperty(fetch_gcs_object_,
+ FetchGCSObject::GCPCredentials.getName(),
+ "gcp_credentials_controller_service");
+ }
+ std::shared_ptr<FetchGCSObjectMocked> fetch_gcs_object_ = std::make_shared<FetchGCSObjectMocked>("FetchGCSObjectMocked");
+ org::apache::nifi::minifi::test::SingleProcessorTestController test_controller_{fetch_gcs_object_};
+ std::shared_ptr<minifi::core::controller::ControllerServiceNode> gcp_credentials_node_;
+};
+
+TEST_F(FetchGCSObjectTests, MissingBucket) {
+ EXPECT_CALL(*fetch_gcs_object_->mock_client_, CreateResumableSession).Times(0);
+ EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::Bucket.getName(), ""));
+ const auto& result = test_controller_.trigger("hello world");
+ EXPECT_EQ(0, result.at(FetchGCSObject::Success).size());
+ ASSERT_EQ(1, result.at(FetchGCSObject::Failure).size());
+ EXPECT_EQ(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
+ EXPECT_EQ(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
+ EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(FetchGCSObject::Failure)[0]));
+}
+
+TEST_F(FetchGCSObjectTests, ServerError) {
+ EXPECT_CALL(*fetch_gcs_object_->mock_client_, ReadObject)
+ .WillOnce([](gcs::internal::ReadObjectRangeRequest const& request) {
+ EXPECT_EQ(request.bucket_name(), "bucket-from-property") << request;
+ auto* mock_source = new gcs::testing::MockObjectReadSource;
+ ::testing::InSequence seq;
+ EXPECT_CALL(*mock_source, IsOpen).WillRepeatedly(testing::Return(true));
+ EXPECT_CALL(*mock_source, Read)
+ .WillOnce(testing::Return(google::cloud::Status(
+ google::cloud::StatusCode::kInvalidArgument,
+ "Invalid Argument")));
+ EXPECT_CALL(*mock_source, IsOpen).WillRepeatedly(testing::Return(false));
+
+ std::unique_ptr<gcs::internal::ObjectReadSource> result(mock_source);
+
+ return google::cloud::make_status_or(std::move(result));
+ });
+ EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::Bucket.getName(), "bucket-from-property"));
+ const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}});
+ EXPECT_EQ(0, result.at(FetchGCSObject::Success).size());
+ ASSERT_EQ(1, result.at(FetchGCSObject::Failure).size());
+ EXPECT_NE(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
+ EXPECT_NE(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
+}
+
+TEST_F(FetchGCSObjectTests, HappyPath) {
+ std::string const text = "stored text";
+ std::size_t offset = 0;
+ // Simulate a Read() call in the MockObjectReadSource object created below
+ auto simulate_read = [&text, &offset](void* buf, std::size_t n) {
+ auto const l = (std::min)(n, text.size() - offset);
+ std::memcpy(buf, text.data() + offset, l);
+ offset += l;
+ return gcs::internal::ReadSourceResult{
+ l, gcs::internal::HttpResponse{200, {}, {}}};
+ };
+ EXPECT_CALL(*fetch_gcs_object_->mock_client_, ReadObject)
+ .WillOnce([&](gcs::internal::ReadObjectRangeRequest const& request) {
+ EXPECT_EQ(request.bucket_name(), "bucket-from-attribute") << request;
+ EXPECT_TRUE(request.HasOption<gcs::Generation>());
+ EXPECT_TRUE(request.GetOption<gcs::Generation>().has_value());
+ EXPECT_EQ(23, request.GetOption<gcs::Generation>().value());
+ std::unique_ptr<gcs::testing::MockObjectReadSource> mock_source(new gcs::testing::MockObjectReadSource);
+ ::testing::InSequence seq;
+ EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(true));
+ EXPECT_CALL(*mock_source, Read).WillOnce(simulate_read);
+ EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(false));
+
+ return google::cloud::make_status_or(
+ std::unique_ptr<gcs::internal::ObjectReadSource>(
+ std::move(mock_source)));
+ });
+ EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::ObjectGeneration.getName(), "${gcs.generation}"));
+ const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}, {minifi_gcp::GCS_GENERATION, "23"}});
+ ASSERT_EQ(1, result.at(FetchGCSObject::Success).size());
+ EXPECT_EQ(0, result.at(FetchGCSObject::Failure).size());
+ EXPECT_EQ("stored text", test_controller_.plan->getContent(result.at(FetchGCSObject::Success)[0]));
+}
+
+TEST_F(FetchGCSObjectTests, EmptyGeneration) {
+ std::string const text = "stored text";
+ std::size_t offset = 0;
+ // Simulate a Read() call in the MockObjectReadSource object created below
+ auto simulate_read = [&text, &offset](void* buf, std::size_t n) {
+ auto const l = (std::min)(n, text.size() - offset);
+ std::memcpy(buf, text.data() + offset, l);
+ offset += l;
+ return gcs::internal::ReadSourceResult{
+ l, gcs::internal::HttpResponse{200, {}, {}}};
+ };
+ EXPECT_CALL(*fetch_gcs_object_->mock_client_, ReadObject)
+ .WillOnce([&](gcs::internal::ReadObjectRangeRequest const& request) {
+ EXPECT_EQ(request.bucket_name(), "bucket-from-attribute") << request;
+ EXPECT_FALSE(request.HasOption<gcs::Generation>());
+ std::unique_ptr<gcs::testing::MockObjectReadSource> mock_source(new gcs::testing::MockObjectReadSource);
+ ::testing::InSequence seq;
+ EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(true));
+ EXPECT_CALL(*mock_source, Read).WillOnce(simulate_read);
+ EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(false));
+
+ return google::cloud::make_status_or(
+ std::unique_ptr<gcs::internal::ObjectReadSource>(
+ std::move(mock_source)));
+ });
+ EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::ObjectGeneration.getName(), "${gcs.generation}"));
+ const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}});
+ ASSERT_EQ(1, result.at(FetchGCSObject::Success).size());
+ EXPECT_EQ(0, result.at(FetchGCSObject::Failure).size());
+ EXPECT_EQ("stored text", test_controller_.plan->getContent(result.at(FetchGCSObject::Success)[0]));
+}
+
+TEST_F(FetchGCSObjectTests, InvalidGeneration) {
+ EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::ObjectGeneration.getName(), "${gcs.generation}"));
+ const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}, {minifi_gcp::GCS_GENERATION, "23 banana"}});
+ ASSERT_EQ(0, result.at(FetchGCSObject::Success).size());
+ EXPECT_EQ(1, result.at(FetchGCSObject::Failure).size());
+ EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(FetchGCSObject::Failure)[0]));
+}
diff --git a/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp b/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp
index 4eb97cd..f6e3b71 100644
--- a/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp
+++ b/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp
@@ -27,7 +27,6 @@
#include "rapidjson/writer.h"
#include "google/cloud/internal/setenv.h"
-
namespace gcs = ::google::cloud::storage;
using GCPCredentialsControllerService = org::apache::nifi::minifi::extensions::gcp::GCPCredentialsControllerService;
diff --git a/extensions/gcp/tests/ListGCSBucketTests.cpp b/extensions/gcp/tests/ListGCSBucketTests.cpp
new file mode 100644
index 0000000..a585194
--- /dev/null
+++ b/extensions/gcp/tests/ListGCSBucketTests.cpp
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "../processors/ListGCSBucket.h"
+#include "../controllerservices/GCPCredentialsControllerService.h"
+#include "core/Resource.h"
+#include "SingleProcessorTestController.h"
+#include "google/cloud/storage/testing/mock_client.h"
+#include "google/cloud/storage/internal/object_metadata_parser.h"
+#include "google/cloud/storage/testing/canonical_errors.h"
+
+namespace gcs = ::google::cloud::storage;
+namespace minifi_gcp = org::apache::nifi::minifi::extensions::gcp;
+
+using ListGCSBucket = org::apache::nifi::minifi::extensions::gcp::ListGCSBucket;
+using ListObjectsRequest = gcs::internal::ListObjectsRequest;
+using ListObjectsResponse = gcs::internal::ListObjectsResponse;
+using GCPCredentialsControllerService = org::apache::nifi::minifi::extensions::gcp::GCPCredentialsControllerService;
+using ::google::cloud::storage::testing::canonical_errors::TransientError;
+using ::google::cloud::storage::testing::canonical_errors::PermanentError;
+
+namespace {
+class ListGCSBucketMocked : public ListGCSBucket {
+ using org::apache::nifi::minifi::extensions::gcp::ListGCSBucket::ListGCSBucket;
+ public:
+ gcs::Client getClient() const override {
+ return gcs::testing::ClientFromMock(mock_client_, *retry_policy_);
+ }
+ std::shared_ptr<gcs::testing::MockClient> mock_client_ = std::make_shared<gcs::testing::MockClient>();
+};
+REGISTER_RESOURCE(ListGCSBucketMocked, "ListGCSBucketMocked");
+
+auto CreateObject(int index, int generation = 1) {
+ std::string id = "object-" + std::to_string(index);
+ std::string name = id;
+ std::string link =
+ "https://storage.googleapis.com/storage/v1/b/test-bucket/" + id + "#1";
+ nlohmann::json metadata{
+ {"bucket", "test-bucket"},
+ {"id", id},
+ {"name", name},
+ {"selfLink", link},
+ {"generation", generation},
+ {"kind", "storage#object"},
+ };
+ return google::cloud::storage::internal::ObjectMetadataParser::FromJson(metadata).value();
+}
+} // namespace
+
+class ListGCSBucketTests : public ::testing::Test {
+ public:
+ void SetUp() override {
+ gcp_credentials_node_ = test_controller_.plan->addController("GCPCredentialsControllerService", "gcp_credentials_controller_service");
+ test_controller_.plan->setProperty(gcp_credentials_node_,
+ GCPCredentialsControllerService::CredentialsLoc.getName(),
+ toString(GCPCredentialsControllerService::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS));
+ test_controller_.plan->setProperty(list_gcs_bucket_,
+ ListGCSBucket::GCPCredentials.getName(),
+ "gcp_credentials_controller_service");
+ }
+ std::shared_ptr<ListGCSBucketMocked> list_gcs_bucket_ = std::make_shared<ListGCSBucketMocked>("ListGCSBucketMocked");
+ org::apache::nifi::minifi::test::SingleProcessorTestController test_controller_{list_gcs_bucket_};
+ std::shared_ptr<minifi::core::controller::ControllerServiceNode> gcp_credentials_node_;
+};
+
+TEST_F(ListGCSBucketTests, MissingBucket) {
+ EXPECT_CALL(*list_gcs_bucket_->mock_client_, CreateResumableSession).Times(0);
+ EXPECT_THROW(test_controller_.trigger(), utils::internal::RequiredPropertyMissingException);
+}
+
+TEST_F(ListGCSBucketTests, ServerGivesPermaError) {
+ auto return_permanent_error = [](ListObjectsRequest const&) {
+ return google::cloud::StatusOr<ListObjectsResponse>(PermanentError());
+ };
+ EXPECT_CALL(*list_gcs_bucket_->mock_client_, ListObjects)
+ .WillOnce(return_permanent_error);
+ EXPECT_TRUE(test_controller_.plan->setProperty(list_gcs_bucket_, ListGCSBucket::Bucket.getName(), "bucket-from-property"));
+ const auto& result = test_controller_.trigger();
+ EXPECT_EQ(0, result.at(ListGCSBucket::Success).size());
+}
+
+TEST_F(ListGCSBucketTests, ServerGivesTransientErrors) {
+ auto return_temp_error = [](ListObjectsRequest const&) {
+ return google::cloud::StatusOr<ListObjectsResponse>(TransientError());
+ };
+ EXPECT_CALL(*list_gcs_bucket_->mock_client_, ListObjects)
+ .WillOnce(return_temp_error)
+ .WillOnce(return_temp_error);
+ EXPECT_TRUE(test_controller_.plan->setProperty(list_gcs_bucket_, ListGCSBucket::NumberOfRetries.getName(), "1"));
+ EXPECT_TRUE(test_controller_.plan->setProperty(list_gcs_bucket_, ListGCSBucket::Bucket.getName(), "bucket-from-property"));
+ const auto& result = test_controller_.trigger();
+ EXPECT_EQ(0, result.at(ListGCSBucket::Success).size());
+}
+
+TEST_F(ListGCSBucketTests, WithoutVersions) {
+ EXPECT_CALL(*list_gcs_bucket_->mock_client_, ListObjects)
+ .WillOnce([](ListObjectsRequest const& req)
+ -> google::cloud::StatusOr<ListObjectsResponse> {
+ EXPECT_EQ("bucket-from-property", req.bucket_name());
+ EXPECT_TRUE(req.HasOption<gcs::Versions>());
+ EXPECT_FALSE(req.GetOption<gcs::Versions>().value());
+
+ ListObjectsResponse response;
+ response.items.emplace_back(CreateObject(1, 1));
+ response.items.emplace_back(CreateObject(1, 2));
+ response.items.emplace_back(CreateObject(1, 3));
+ return response;
+ });
+ EXPECT_TRUE(test_controller_.plan->setProperty(list_gcs_bucket_, ListGCSBucket::Bucket.getName(), "bucket-from-property"));
+ const auto& result = test_controller_.trigger();
+ EXPECT_EQ(3, result.at(ListGCSBucket::Success).size());
+}
+
+
+TEST_F(ListGCSBucketTests, WithVersions) {
+ EXPECT_CALL(*list_gcs_bucket_->mock_client_, ListObjects)
+ .WillOnce([](ListObjectsRequest const& req)
+ -> google::cloud::StatusOr<ListObjectsResponse> {
+ EXPECT_EQ("bucket-from-property", req.bucket_name());
+ EXPECT_TRUE(req.HasOption<gcs::Versions>());
+ EXPECT_TRUE(req.GetOption<gcs::Versions>().value());
+
+ ListObjectsResponse response;
+ response.items.emplace_back(CreateObject(1));
+ response.items.emplace_back(CreateObject(2));
+ response.items.emplace_back(CreateObject(3));
+ return response;
+ });
+ EXPECT_TRUE(test_controller_.plan->setProperty(list_gcs_bucket_, ListGCSBucket::Bucket.getName(), "bucket-from-property"));
+ EXPECT_TRUE(test_controller_.plan->setProperty(list_gcs_bucket_, ListGCSBucket::ListAllVersions.getName(), "true"));
+ const auto& result = test_controller_.trigger();
+ EXPECT_EQ(3, result.at(ListGCSBucket::Success).size());
+}
+
diff --git a/extensions/gcp/tests/PutGCSObjectTests.cpp b/extensions/gcp/tests/PutGCSObjectTests.cpp
index 0c2a3ba..a5c8919 100644
--- a/extensions/gcp/tests/PutGCSObjectTests.cpp
+++ b/extensions/gcp/tests/PutGCSObjectTests.cpp
@@ -15,6 +15,7 @@
* limitations under the License.
*/
#include "../processors/PutGCSObject.h"
+#include "../controllerservices/GCPCredentialsControllerService.h"
#include "GCPAttributes.h"
#include "core/Resource.h"
#include "SingleProcessorTestController.h"
@@ -39,7 +40,7 @@
class PutGCSObjectMocked : public PutGCSObject {
using org::apache::nifi::minifi::extensions::gcp::PutGCSObject::PutGCSObject;
public:
- gcs::Client getClient(const gcs::ClientOptions&) const override {
+ gcs::Client getClient() const override {
return gcs::testing::ClientFromMock(mock_client_, *retry_policy_);
}
std::shared_ptr<gcs::testing::MockClient> mock_client_ = std::make_shared<gcs::testing::MockClient>();
@@ -85,8 +86,7 @@
TEST_F(PutGCSObjectTests, MissingBucket) {
EXPECT_CALL(*put_gcs_object_->mock_client_, CreateResumableSession).Times(0);
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), ""));
- test_controller_.enqueueFlowFile("hello world");
- const auto& result = test_controller_.trigger();
+ const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(0, result.at(PutGCSObject::Success).size());
ASSERT_EQ(1, result.at(PutGCSObject::Failure).size());
EXPECT_EQ(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
@@ -107,8 +107,7 @@
return google::cloud::make_status_or(std::unique_ptr<gcs::internal::ResumableUploadSession>(std::move(mock_upload_session)));
});
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "${gcs.bucket}"));
- test_controller_.enqueueFlowFile("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}});
- const auto& result = test_controller_.trigger();
+ const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}});
ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
@@ -127,8 +126,7 @@
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::NumberOfRetries.getName(), "2"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
- test_controller_.enqueueFlowFile("hello world");
- const auto& result = test_controller_.trigger();
+ const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(0, result.at(PutGCSObject::Success).size());
ASSERT_EQ(1, result.at(PutGCSObject::Failure).size());
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
@@ -146,8 +144,7 @@
.WillOnce(return_permanent_error);
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
- test_controller_.enqueueFlowFile("hello world");
- const auto& result = test_controller_.trigger();
+ const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(0, result.at(PutGCSObject::Success).size());
ASSERT_EQ(1, result.at(PutGCSObject::Failure).size());
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
@@ -171,8 +168,7 @@
});
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
- test_controller_.enqueueFlowFile("hello world");
- const auto& result = test_controller_.trigger();
+ const auto& result = test_controller_.trigger("hello world");
EXPECT_EQ(1, result.at(PutGCSObject::Success).size());
EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
}
@@ -195,8 +191,7 @@
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Crc32cChecksum.getName(), "${crc32c}"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
- test_controller_.enqueueFlowFile("hello world", {{"crc32c", "yZRlqg=="}, {"md5", "XrY7u+Ae7tCTyyK7j1rNww=="}});
- const auto& result = test_controller_.trigger();
+ const auto& result = test_controller_.trigger("hello world", {{"crc32c", "yZRlqg=="}, {"md5", "XrY7u+Ae7tCTyyK7j1rNww=="}});
EXPECT_EQ(1, result.at(PutGCSObject::Success).size());
EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
}
@@ -215,8 +210,7 @@
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::OverwriteObject.getName(), "false"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
- test_controller_.enqueueFlowFile("hello world", {{"crc32c", "yZRlqg=="}, {"md5", "XrY7u+Ae7tCTyyK7j1rNww=="}});
- const auto& result = test_controller_.trigger();
+ const auto& result = test_controller_.trigger("hello world", {{"crc32c", "yZRlqg=="}, {"md5", "XrY7u+Ae7tCTyyK7j1rNww=="}});
ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
@@ -236,8 +230,7 @@
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::EncryptionKey.getName(), "ZW5jcnlwdGlvbl9rZXk="));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
- test_controller_.enqueueFlowFile("hello world");
- const auto& result = test_controller_.trigger();
+ const auto& result = test_controller_.trigger("hello world");
ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
EXPECT_NE(std::nullopt, result.at(PutGCSObject::Success)[0]->getAttribute(minifi_gcp::GCS_ENCRYPTION_SHA256_ATTR));
@@ -250,8 +243,7 @@
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::EncryptionKey.getName(), "not_base64_key"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
- test_controller_.enqueueFlowFile("hello world");
- EXPECT_THROW(test_controller_.trigger(), minifi::Exception);
+ EXPECT_THROW(test_controller_.trigger("hello world"), minifi::Exception);
}
TEST_F(PutGCSObjectTests, NoContentType) {
@@ -267,8 +259,7 @@
});
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
- test_controller_.enqueueFlowFile("hello world");
- const auto& result = test_controller_.trigger();
+ const auto& result = test_controller_.trigger("hello world");
ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
@@ -288,8 +279,7 @@
});
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
- test_controller_.enqueueFlowFile("hello world", {{"mime.type", "text/attribute"}});
- const auto& result = test_controller_.trigger();
+ const auto& result = test_controller_.trigger("hello world", {{"mime.type", "text/attribute"}});
ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
@@ -310,8 +300,7 @@
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::ObjectACL.getName(), toString(PutGCSObject::PredefinedAcl::AUTHENTICATED_READ)));
- test_controller_.enqueueFlowFile("hello world");
- const auto& result = test_controller_.trigger();
+ const auto& result = test_controller_.trigger("hello world");
ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
index 8509144..e5df28f 100644
--- a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
+++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
@@ -317,8 +317,7 @@
invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "fail");
invokehttp->setProperty(InvokeHTTP::AttributesToSend, ".*");
invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::Success, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
- test_controller.enqueueFlowFile("data", {{"invalid header", "value"}});
- const auto result = test_controller.trigger();
+ const auto result = test_controller.trigger("data", {{"invalid header", "value"}});
auto file_contents = result.at(InvokeHTTP::RelFailure);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
@@ -338,8 +337,7 @@
invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "fail");
invokehttp->setProperty(InvokeHTTP::AttributesToSend, "valid.*");
invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::Success, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
- test_controller.enqueueFlowFile("data", {{"invalid header", "value"}, {"valid-header", "value2"}});
- const auto result = test_controller.trigger();
+ const auto result = test_controller.trigger("data", {{"invalid header", "value"}, {"valid-header", "value2"}});
REQUIRE(result.at(InvokeHTTP::RelFailure).empty());
const auto& success_contents = result.at(InvokeHTTP::Success);
REQUIRE(success_contents.size() == 1);
@@ -360,8 +358,7 @@
invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL);
invokehttp->setProperty(InvokeHTTP::AttributesToSend, ".*");
invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::RelFailure, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
- test_controller.enqueueFlowFile("data", {{"invalid header", "value"}, {"", "value2"}});
- const auto result = test_controller.trigger();
+ const auto result = test_controller.trigger("data", {{"invalid header", "value"}, {"", "value2"}});
auto file_contents = result.at(InvokeHTTP::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
@@ -383,8 +380,7 @@
invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "drop");
invokehttp->setProperty(InvokeHTTP::AttributesToSend, ".*");
invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::RelFailure, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
- test_controller.enqueueFlowFile("data", {{"legit-header", "value1"}, {"invalid header", "value2"}});
- const auto result = test_controller.trigger();
+ const auto result = test_controller.trigger("data", {{"legit-header", "value1"}, {"invalid header", "value2"}});
auto file_contents = result.at(InvokeHTTP::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
@@ -406,8 +402,7 @@
invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "drop");
invokehttp->setProperty(InvokeHTTP::AttributesToSend, "");
invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::RelFailure, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
- test_controller.enqueueFlowFile("data", {{"legit-header", "value1"}, {"invalid header", "value2"}});
- const auto result = test_controller.trigger();
+ const auto result = test_controller.trigger("data", {{"legit-header", "value1"}, {"invalid header", "value2"}});
auto file_contents = result.at(InvokeHTTP::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
@@ -429,8 +424,7 @@
invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "drop");
invokehttp->setProperty(InvokeHTTP::AttributesToSend, "he.*er");
invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::RelFailure, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
- test_controller.enqueueFlowFile("data", {{"header1", "value1"}, {"header", "value2"}});
- const auto result = test_controller.trigger();
+ const auto result = test_controller.trigger("data", {{"header1", "value1"}, {"header", "value2"}});
auto file_contents = result.at(InvokeHTTP::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
diff --git a/extensions/librdkafka/tests/PublishKafkaTests.cpp b/extensions/librdkafka/tests/PublishKafkaTests.cpp
index b508ee0..78d65b0 100644
--- a/extensions/librdkafka/tests/PublishKafkaTests.cpp
+++ b/extensions/librdkafka/tests/PublishKafkaTests.cpp
@@ -31,8 +31,7 @@
publish_kafka->setProperty(processors::PublishKafka::SeedBrokers, "test_seedbroker");
publish_kafka->setProperty(processors::PublishKafka::QueueBufferMaxMessage, "1000");
publish_kafka->setProperty(processors::PublishKafka::BatchSize, "1500");
- test_controller.enqueueFlowFile("");
- REQUIRE_THROWS_WITH(test_controller.trigger(), "Process Schedule Operation: Invalid configuration: Batch Size cannot be larger than Queue Max Message");
+ REQUIRE_THROWS_WITH(test_controller.trigger(""), "Process Schedule Operation: Invalid configuration: Batch Size cannot be larger than Queue Max Message");
}
} // namespace org::apache::nifi::minifi::test
diff --git a/extensions/standard-processors/tests/unit/FetchFileTests.cpp b/extensions/standard-processors/tests/unit/FetchFileTests.cpp
index a6cd1d0..2643edc 100644
--- a/extensions/standard-processors/tests/unit/FetchFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/FetchFileTests.cpp
@@ -87,8 +87,7 @@
TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file with default but non-existent file path", "[testFetchFile]") {
attributes_["filename"] = "non_existent.file";
- test_controller_->enqueueFlowFile("", attributes_);
- const auto result = test_controller_->trigger();
+ const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::NotFound);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
@@ -99,8 +98,7 @@
TEST_CASE_METHOD(FetchFileTestFixture, "FileToFetch property set to a non-existent file path", "[testFetchFile]") {
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::FileToFetch, "/tmp/non_existent.file");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenFileNotFound, "INFO");
- test_controller_->enqueueFlowFile("", attributes_);
- const auto result = test_controller_->trigger();
+ const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::NotFound);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
@@ -113,8 +111,7 @@
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::FileToFetch,
input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_);
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenPermissionDenied, "WARN");
- test_controller_->enqueueFlowFile("", attributes_);
- const auto result = test_controller_->trigger();
+ const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::PermissionDenied);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
@@ -124,8 +121,7 @@
#endif
TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file with default file path", "[testFetchFile]") {
- test_controller_->enqueueFlowFile("", attributes_);
- const auto result = test_controller_->trigger();
+ const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -137,8 +133,7 @@
utils::putFileToDir(input_dir_ + utils::file::FileUtils::get_separator() + "sub", input_file_name_, file_content_);
auto file_path = input_dir_ + utils::file::FileUtils::get_separator() + "sub" + utils::file::FileUtils::get_separator() + input_file_name_;
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::FileToFetch, file_path);
- test_controller_->enqueueFlowFile("", attributes_);
- const auto result = test_controller_->trigger();
+ const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -147,8 +142,7 @@
TEST_CASE_METHOD(FetchFileTestFixture, "Flow scheduling fails due to missing move destination directory when completion strategy is set to move file", "[testFetchFile]") {
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
- test_controller_->enqueueFlowFile("", attributes_);
- REQUIRE_THROWS_AS(test_controller_->trigger(), minifi::Exception);
+ REQUIRE_THROWS_AS(test_controller_->trigger("", attributes_), minifi::Exception);
}
TEST_CASE_METHOD(FetchFileTestFixture, "Flow fails due to move conflict", "[testFetchFile]") {
@@ -157,8 +151,7 @@
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Fail");
- test_controller_->enqueueFlowFile("", attributes_);
- const auto result = test_controller_->trigger();
+ const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Failure);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
@@ -173,8 +166,7 @@
utils::putFileToDir(move_dir, input_file_name_, "old content");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Fail");
- test_controller_->enqueueFlowFile("", attributes_);
- const auto result = test_controller_->trigger();
+ const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -186,8 +178,7 @@
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Replace File");
- test_controller_->enqueueFlowFile("", attributes_);
- const auto result = test_controller_->trigger();
+ const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -203,8 +194,7 @@
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Rename");
- test_controller_->enqueueFlowFile("", attributes_);
- const auto result = test_controller_->trigger();
+ const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -221,8 +211,7 @@
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Keep Existing");
- test_controller_->enqueueFlowFile("", attributes_);
- const auto result = test_controller_->trigger();
+ const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -236,8 +225,7 @@
auto move_dir = test_controller_->createTempDirectory();
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
- test_controller_->enqueueFlowFile("", attributes_);
- const auto result = test_controller_->trigger();
+ const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -252,8 +240,7 @@
move_dir = move_dir + utils::file::FileUtils::get_separator() + "temp";
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
- test_controller_->enqueueFlowFile("", attributes_);
- const auto result = test_controller_->trigger();
+ const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -269,8 +256,7 @@
utils::file::FileUtils::set_permissions(move_dir, 0);
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
- test_controller_->enqueueFlowFile("", attributes_);
- const auto result = test_controller_->trigger();
+ const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -283,8 +269,7 @@
TEST_CASE_METHOD(FetchFileTestFixture, "Fetched file is deleted after flow completion", "[testFetchFile]") {
fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Delete File");
- test_controller_->enqueueFlowFile("", attributes_);
- const auto result = test_controller_->trigger();
+ const auto result = test_controller_->trigger("", attributes_);
auto file_contents = result.at(minifi::processors::FetchFile::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
diff --git a/extensions/standard-processors/tests/unit/PutUDPTests.cpp b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
index 6095d72..49f4e3f 100644
--- a/extensions/standard-processors/tests/unit/PutUDPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
@@ -88,8 +88,7 @@
{
const char* const message = "first message: hello";
- controller.enqueueFlowFile(message);
- const auto result = controller.trigger();
+ const auto result = controller.trigger(message);
const auto& success_flow_files = result.at(PutUDP::Success);
REQUIRE(success_flow_files.size() == 1);
REQUIRE(result.at(PutUDP::Failure).empty());
@@ -101,8 +100,7 @@
{
const char* const message = "longer message AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"; // NOLINT
- controller.enqueueFlowFile(message);
- const auto result = controller.trigger();
+ const auto result = controller.trigger(message);
const auto& success_flow_files = result.at(PutUDP::Success);
REQUIRE(success_flow_files.size() == 1);
REQUIRE(result.at(PutUDP::Failure).empty());
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index 1423bc4..6450f72 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -404,7 +404,7 @@
std::string string_value;
if (!getProperty(property, string_value, flow_file)) return std::nullopt;
try {
- if (!state::response::Value{string_value}.template convertValue(value)) return std::nullopt;
+ if (!state::response::Value{string_value}.template convertValue<>(value)) return std::nullopt;
} catch (const utils::internal::ValueException&) {
return std::nullopt;
}
diff --git a/libminifi/test/SingleProcessorTestController.h b/libminifi/test/SingleProcessorTestController.h
index 1e4707c..b91b4c0 100644
--- a/libminifi/test/SingleProcessorTestController.h
+++ b/libminifi/test/SingleProcessorTestController.h
@@ -51,9 +51,10 @@
return result;
}
- void enqueueFlowFile(const std::string_view input_flow_file_content, std::unordered_map<std::string, std::string> input_flow_file_attributes = {}) {
+ auto trigger(const std::string_view input_flow_file_content, std::unordered_map<std::string, std::string> input_flow_file_attributes = {}) {
const auto new_flow_file = createFlowFile(input_flow_file_content, std::move(input_flow_file_attributes));
input_->put(new_flow_file);
+ return trigger();
}
core::Relationship addDynamicRelationship(std::string name) {