MINIFICPP-1744 Add FetchGCSObject, DeleteGCSObject, ListGCSBucket

MINIFICPP-1744: Add FetchGCSObject
MINIFICPP-1745: Add DeleteGCSObject
MINIFICPP-1746: Add ListGCSBucket processor

Closes #1297
Signed-off-by: Marton Szasz <szaszm@apache.org>
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 88a0742..cbd1ac2 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -16,6 +16,7 @@
 - [DefragmentText](#defragmenttext)
 - [DeleteAzureBlobStorage](#deleteazureblobstorage)
 - [DeleteAzureDataLakeStorage](#deleteazuredatalakestorage)
+- [DeleteGCSObject](#deletegcsobject)
 - [DeleteS3Object](#deletes3object)
 - [ExecuteProcess](#executeprocess)
 - [ExecutePythonProcessor](#executepythonprocessor)
@@ -24,6 +25,7 @@
 - [ExtractText](#extracttext)
 - [FetchAzureBlobStorage](#fetchazureblobstorage)
 - [FetchAzureDataLakeStorage](#fetchazuredatalakestorage)
+- [FetchGCSObject](#fetchgcsobject)
 - [FetchFile](#fetchfile)
 - [FetchOPCProcessor](#fetchopcprocessor)
 - [FetchS3Object](#fetchs3object)
@@ -38,6 +40,7 @@
 - [InvokeHTTP](#invokehttp)
 - [ListAzureBlobStorage](#listazureblobstorage)
 - [ListAzureDataLakeStorage](#listazuredatalakestorage)
+- [ListGCSBucket](#listgcsbucket)
 - [ListenHTTP](#listenhttp)
 - [ListenSyslog](#listensyslog)
 - [ListFile](#listfile)
@@ -393,6 +396,42 @@
 |failure|If file deletion from Azure Data Lake storage fails the flowfile is transferred to this relationship|
 |success|If file deletion from Azure Data Lake storage succeeds the flowfile is transferred to this relationship|
 
+## DeleteGCSObject
+
+### Description
+
+Deletes an object from a Google Cloud Bucket.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name                                 | Default Value | Allowable Values                                                                  | Description                                                                                                                                     |
+|--------------------------------------|---------------|-----------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Bucket**                           | ${gcs.bucket} |                                                                                   | Bucket of the object.<br>**Supports Expression Language: true**                                                                                 |
+| **Key**                              | ${filename}   |                                                                                   | Name of the object.<br>**Supports Expression Language: true**                                                                                   |
+| **Number of retries**                | 6             | integers                                                                          | How many retry attempts should be made before routing to the failure relationship.                                                              |
+| **GCP Credentials Provider Service** |               | [GCPCredentialsControllerService](CONTROLLERS.md#GCPCredentialsControllerService) | The Controller Service used to obtain Google Cloud Platform credentials.                                                                        |
+| Server Side Encryption Key           |               |                                                                                   | An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.<br>**Supports Expression Language: true**                |
+| Object Generation                    |               |                                                                                   | The generation of the Object to download. If left empty, then it will download the latest generation.<br>**Supports Expression Language: true** |
+| Endpoint Override URL                |               |                                                                                   | Overrides the default Google Cloud Storage endpoints                                                                                            |
+
+
+### Relationships
+
+| Name    | Description                                                                                  |
+|---------|----------------------------------------------------------------------------------------------|
+| success | FlowFiles are routed to this relationship after a successful Google Cloud Storage operation. |
+| failure | FlowFiles are routed to this relationship if the Google Cloud Storage operation fails.       |
+
+
+### Output Attributes
+
+| Attribute            | Relationship | Description                                             |
+|----------------------|--------------|---------------------------------------------------------|
+| _gcs.status.message_ | failure      | The status message received from google cloud.          |
+| _gcs.error.reason_   | failure      | The description of the error occurred during operation. |
+| _gcs.error.domain_   | failure      | The domain of the error occurred during operation.      |
 
 ## DeleteS3Object
 
@@ -603,6 +642,43 @@
 |success|Files that have been successfully fetched from Azure storage are transferred to this relationship|
 
 
+## FetchGCSObject
+
+### Description
+
+Fetches a file from a Google Cloud Bucket. Designed to be used in tandem with ListGCSBucket.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name                                 | Default Value | Allowable Values                                                                  | Description                                                                                                                                     |
+|--------------------------------------|---------------|-----------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Bucket**                           | ${gcs.bucket} |                                                                                   | Bucket of the object.<br>**Supports Expression Language: true**                                                                                 |
+| **Key**                              | ${filename}   |                                                                                   | Name of the object.<br>**Supports Expression Language: true**                                                                                   |
+| **Number of retries**                | 6             | integers                                                                          | How many retry attempts should be made before routing to the failure relationship.                                                              |
+| **GCP Credentials Provider Service** |               | [GCPCredentialsControllerService](CONTROLLERS.md#GCPCredentialsControllerService) | The Controller Service used to obtain Google Cloud Platform credentials.                                                                        |
+| Server Side Encryption Key           |               |                                                                                   | An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.<br>**Supports Expression Language: true**                |
+| Object Generation                    |               |                                                                                   | The generation of the Object to download. If left empty, then it will download the latest generation.<br>**Supports Expression Language: true** |
+| Endpoint Override URL                |               |                                                                                   | Overrides the default Google Cloud Storage endpoints                                                                                            |
+
+
+### Relationships
+
+| Name    | Description                                                                                  |
+|---------|----------------------------------------------------------------------------------------------|
+| success | FlowFiles are routed to this relationship after a successful Google Cloud Storage operation. |
+| failure | FlowFiles are routed to this relationship if the Google Cloud Storage operation fails.       |
+
+
+### Output Attributes
+
+| Attribute            | Relationship | Description                                             |
+|----------------------|--------------|---------------------------------------------------------|
+| _gcs.status.message_ | failure      | The status message received from google cloud.          |
+| _gcs.error.reason_   | failure      | The description of the error occurred during operation. |
+| _gcs.error.domain_   | failure      | The domain of the error occurred during operation.      |
+
 ## FetchFile
 
 ### Description
@@ -1003,6 +1079,59 @@
 |success|All FlowFiles that are received are routed to success|
 
 
+## ListGCSBucket
+
+### Description
+
+Retrieves a listing of objects from an GCS bucket.
+For each object that is listed, creates a FlowFile that represents the object so that it can be fetched or deleted in conjunction with FetchGCSObject/DeleteGCSObject.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name                                 | Default Value | Allowable Values                                                                  | Description                                                                        |
+|--------------------------------------|---------------|-----------------------------------------------------------------------------------|------------------------------------------------------------------------------------|
+| **Bucket**                           |               |                                                                                   | Bucket of the object.<br>**Supports Expression Language: true**                    |
+| **Number of retries**                | 6             | integers                                                                          | How many retry attempts should be made before routing to the failure relationship. |
+| **GCP Credentials Provider Service** |               | [GCPCredentialsControllerService](CONTROLLERS.md#GCPCredentialsControllerService) | The Controller Service used to obtain Google Cloud Platform credentials.           |
+| Endpoint Override URL                |               |                                                                                   | Overrides the default Google Cloud Storage endpoints                               |
+| List all versions                    | false         | false<br>true                                                                     | Set this option to `true` to get all the previous versions separately.             |
+
+
+### Relationships
+
+| Name    | Description                                                                                   |
+|---------|-----------------------------------------------------------------------------------------------|
+| success | FlowFiles are routed to this relationship after a successful Google Cloud Storage operation.  |
+
+### Output Attributes
+
+| Attribute                  | Relationship | Description                                                        |
+|----------------------------|--------------|--------------------------------------------------------------------|
+| _gcs.bucket_               | success      | Bucket of the object.                                              |
+| _gcs.key, filename_        | success      | Name of the object.                                                |
+| _gcs.size_                 | success      | Size of the object.                                                |
+| _gcs.crc32c_               | success      | The CRC32C checksum of object's data, encoded in base64            |
+| _gcs.md5_                  | success      | The MD5 hash of the object's data encoded in base64.               |
+| _gcs.owner.entity_         | success      | The owner entity, in the form "user-emailAddress".                 |
+| _gcs.owner.entity.id_      | success      | The ID for the entity.                                             |
+| _gcs.content.encoding_     | success      | The content encoding of the object.                                |
+| _gcs.content.language_     | success      | The content language of the object.                                |
+| _gcs.content.disposition_  | success      | The data content disposition of the object.                        |
+| _gcs.media.link_           | success      | The media download link to the object.                             |
+| _gcs.self.link_            | success      | The link to this object.                                           |
+| _gcs.etag_                 | success      | The HTTP 1.1 Entity tag for the object.                            |
+| _gcs.generated.id_         | success      | The service-generated ID for the object                            |
+| _gcs.generation_           | success      | The content generation of this object. Used for object versioning. |
+| _gcs.metageneration_       | success      | The metageneration of the object.                                  |
+| _gcs.create.time_          | success      | Unix timestamp of the object's creation in milliseconds            |
+| _gcs.update.time_          | success      | Unix timestamp of the object's last modification in milliseconds   |
+| _gcs.delete.time_          | success      | Unix timestamp of the object's deletion in milliseconds            |
+| _gcs.encryption.algorithm_ | success      | The algorithm used to encrypt the object.                          |
+| _gcs.encryption.sha256_    | success      | The SHA256 hash of the key used to encrypt the object              |
+
+
 ## ListenHTTP
 
 ### Description
@@ -1614,7 +1743,7 @@
 |--------------------------------------|---------------|-----------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | **Bucket**                           | ${gcs.bucket} |                                                                                                           | Bucket of the object.<br>**Supports Expression Language: true**                                                                                                                                                                                           |
 | **Key**                              | ${filename}   |                                                                                                           | Name of the object.<br>**Supports Expression Language: true**                                                                                                                                                                                             |
-| **Number Of retries**                | 6             | integers                                                                                                  | How many retry attempts should be made before routing to the failure relationship.                                                                                                                                                                        |
+| **Number of retries**                | 6             | integers                                                                                                  | How many retry attempts should be made before routing to the failure relationship.                                                                                                                                                                        |
 | **GCP Credentials Provider Service** |               | [GCPCredentialsControllerService](CONTROLLERS.md#GCPCredentialsControllerService)                         | The Controller Service used to obtain Google Cloud Platform credentials.                                                                                                                                                                                  |
 | Object ACL                           |               | authenticatedRead<br>bucketOwnerFullControl<br>bucketOwnerRead<br>private<br>projectPrivate<br>publicRead | Access Control to be attached to the object uploaded. Not providing this will revert to bucket defaults. For more information please visit [Google Cloud Access control lists](https://cloud.google.com/storage/docs/access-control/lists#predefined-acl) |
 | Server Side Encryption Key           |               |                                                                                                           | An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.<br>**Supports Expression Language: true**                                                                                                                          |
@@ -1634,6 +1763,7 @@
 
 | Attribute                  | Relationship | Description                                                        |
 |----------------------------|--------------|--------------------------------------------------------------------|
+| _gcs.status.message_       | failure      | The status message received from google cloud.                     |
 | _gcs.error.reason_         | failure      | The description of the error occurred during upload.               |
 | _gcs.error.domain_         | failure      | The domain of the error occurred during upload.                    |
 | _gcs.bucket_               | success      | Bucket of the object.                                              |
@@ -1652,9 +1782,9 @@
 | _gcs.generated.id_         | success      | The service-generated ID for the object                            |
 | _gcs.generation_           | success      | The content generation of this object. Used for object versioning. |
 | _gcs.metageneration_       | success      | The metageneration of the object.                                  |
-| _gcs.create.time_          | success      | The creation time of the object (milliseconds)                     |
-| _gcs.update.time_          | success      | The last modification time of the object (milliseconds)            |
-| _gcs.delete.time_          | success      | The deletion time of the object (milliseconds)                     |
+| _gcs.create.time_          | success      | Unix timestamp of the object's creation in milliseconds            |
+| _gcs.update.time_          | success      | Unix timestamp of the object's last modification in milliseconds   |
+| _gcs.delete.time_          | success      | Unix timestamp of the object's deletion in milliseconds            |
 | _gcs.encryption.algorithm_ | success      | The algorithm used to encrypt the object.                          |
 | _gcs.encryption.sha256_    | success      | The SHA256 hash of the key used to encrypt the object              |
 
diff --git a/README.md b/README.md
index 26af00c..2ffb244 100644
--- a/README.md
+++ b/README.md
@@ -80,7 +80,7 @@
 | CivetWeb                         | [ListenHTTP](PROCESSORS.md#listenhttp)                                                                                                                                                                                                                                                                                                                                                                                                                                                             | -DDISABLE_CIVET=ON     |
 | CURL                             | [InvokeHTTP](PROCESSORS.md#invokehttp)                                                                                                                                                                                                                                                                                                                                                                                                                                                             | -DDISABLE_CURL=ON      |
 | GPS                              | GetGPS                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             | -DENABLE_GPS=ON        |
-| Google Cloud Platform            | [GcpCredentialsControllerService](CONTROLLERS.md#GcpCredentialsControllerService)<br>[PutGCSObject](PROCESSORS.md#putgcsobject)                                                                                                                                                                                                                                                                                                                                                                    | -DENABLE_GCP=ON        |
+| Google Cloud Platform            | [DeleteGCSObject](PROCESSORS.md#deletegcsobject)<br>[FetchGCSObject](PROCESSORS.md#fetchgcsobject)<br>[GCPCredentialsControllerService](CONTROLLERS.md#GCPCredentialsControllerService)<br>[ListGCSBucket](PROCESSORS.md#listgcsbucket)<br>[PutGCSObject](PROCESSORS.md#putgcsobject)                                                                                                                                                                                                              | -DENABLE_GCP=ON        |
 | Kafka                            | [PublishKafka](PROCESSORS.md#publishkafka)                                                                                                                                                                                                                                                                                                                                                                                                                                                         | -DENABLE_LIBRDKAFKA=ON |
 | Kubernetes                       | [KubernetesControllerService](CONTROLLERS.md#kubernetesControllerService)                                                                                                                                                                                                                                                                                                                                                                                                                          | -DENABLE_KUBERNETES=ON |
 | JNI                              | **NiFi Processors**                                                                                                                                                                                                                                                                                                                                                                                                                                                                                | -DENABLE_JNI=ON        |
diff --git a/docker/test/integration/features/google_cloud_storage.feature b/docker/test/integration/features/google_cloud_storage.feature
index 18af9b1..ad2b6f1 100644
--- a/docker/test/integration/features/google_cloud_storage.feature
+++ b/docker/test/integration/features/google_cloud_storage.feature
@@ -17,3 +17,31 @@
 
     Then a flowfile with the content "hello_gcs" is placed in the monitored directory in less than 45 seconds
     And an object with the content "hello_gcs" is present in the Google Cloud storage
+
+  Scenario: A MiNiFi instance can fetch the listed objects from Google Cloud storage bucket
+    Given a Google Cloud storage server is set up and a single object with contents "preloaded data" is present
+    And a ListGCSBucket processor
+    And a FetchGCSObject processor
+    And the ListGCSBucket and the FetchGCSObject processors are set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the ListGCSBucket processor is connected to the FetchGCSObject
+    And the "success" relationship of the FetchGCSObject processor is connected to the PutFile
+
+    When all instances start up
+
+    Then a flowfile with the content "preloaded data" is placed in the monitored directory in less than 10 seconds
+
+
+  Scenario: A MiNiFi instance can delete the listed objects from Google Cloud storage bucket
+    Given a Google Cloud storage server is set up with some test data
+    And a ListGCSBucket processor
+    And a DeleteGCSObject processor
+    And the ListGCSBucket and the DeleteGCSObject processors are set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the ListGCSBucket processor is connected to the DeleteGCSObject
+    And the "success" relationship of the DeleteGCSObject processor is connected to the PutFile
+
+    When all instances start up
+
+    Then the test bucket of Google Cloud Storage is empty
+    And at least one empty flowfile is placed in the monitored directory in less than 10 seconds
diff --git a/docker/test/integration/minifi/processors/DeleteGCSObject.py b/docker/test/integration/minifi/processors/DeleteGCSObject.py
new file mode 100644
index 0000000..f816c6f
--- /dev/null
+++ b/docker/test/integration/minifi/processors/DeleteGCSObject.py
@@ -0,0 +1,14 @@
+from ..core.Processor import Processor
+
+
+class DeleteGCSObject(Processor):
+    def __init__(
+            self):
+        super(DeleteGCSObject, self).__init__(
+            'DeleteGCSObject',
+            properties={
+                'Bucket': 'test-bucket',
+                'Endpoint Override URL': 'fake-gcs-server:4443',
+                'Number of retries': 2
+            },
+            auto_terminate=["success", "failure"])
diff --git a/docker/test/integration/minifi/processors/FetchGCSObject.py b/docker/test/integration/minifi/processors/FetchGCSObject.py
new file mode 100644
index 0000000..557e565
--- /dev/null
+++ b/docker/test/integration/minifi/processors/FetchGCSObject.py
@@ -0,0 +1,14 @@
+from ..core.Processor import Processor
+
+
+class FetchGCSObject(Processor):
+    def __init__(
+            self):
+        super(FetchGCSObject, self).__init__(
+            'FetchGCSObject',
+            properties={
+                'Bucket': 'test-bucket',
+                'Endpoint Override URL': 'fake-gcs-server:4443',
+                'Number of retries': 2
+            },
+            auto_terminate=["success", "failure"])
diff --git a/docker/test/integration/minifi/processors/ListGCSBucket.py b/docker/test/integration/minifi/processors/ListGCSBucket.py
new file mode 100644
index 0000000..5c66254
--- /dev/null
+++ b/docker/test/integration/minifi/processors/ListGCSBucket.py
@@ -0,0 +1,14 @@
+from ..core.Processor import Processor
+
+
+class ListGCSBucket(Processor):
+    def __init__(
+            self):
+        super(ListGCSBucket, self).__init__(
+            'ListGCSBucket',
+            properties={
+                'Bucket': 'test-bucket',
+                'Endpoint Override URL': 'fake-gcs-server:4443',
+                'Number of retries': 2
+            },
+            auto_terminate=["success"])
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index f588c8c..8769ed6 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -403,8 +403,9 @@
 
 
 # google cloud storage setup
-@given("a Google Cloud storage server is set up with some test data")
 @given("a Google Cloud storage server is set up")
+@given("a Google Cloud storage server is set up with some test data")
+@given('a Google Cloud storage server is set up and a single object with contents "preloaded data" is present')
 def step_impl(context):
     context.test.acquire_container("fake-gcs-server", "fake-gcs-server")
 
@@ -445,6 +446,17 @@
     p1.set_property("GCP Credentials Provider Service", gcp_controller_service.name)
 
 
+@given(u'the {processor_one} and the {processor_two} processors are set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server')
+def step_impl(context, processor_one, processor_two):
+    gcp_controller_service = GCPCredentialsControllerService(credentials_location="Use Anonymous credentials")
+    p1 = context.test.get_node_by_name(processor_one)
+    p2 = context.test.get_node_by_name(processor_two)
+    p1.controller_services.append(gcp_controller_service)
+    p1.set_property("GCP Credentials Provider Service", gcp_controller_service.name)
+    p2.controller_services.append(gcp_controller_service)
+    p2.set_property("GCP Credentials Provider Service", gcp_controller_service.name)
+
+
 @given("the kafka broker is started")
 def step_impl(context):
     context.test.start_kafka_broker()
diff --git a/extensions/gcp/GCPAttributes.h b/extensions/gcp/GCPAttributes.h
index 92548fd..38af0d3 100644
--- a/extensions/gcp/GCPAttributes.h
+++ b/extensions/gcp/GCPAttributes.h
@@ -24,6 +24,7 @@
 
 constexpr const char* GCS_ERROR_REASON = "gcs.error.reason";
 constexpr const char* GCS_ERROR_DOMAIN = "gcs.error.domain";
+constexpr const char* GCS_STATUS_MESSAGE = "gcs.status.message";
 constexpr const char* GCS_BUCKET_ATTR = "gcs.bucket";
 constexpr const char* GCS_OBJECT_NAME_ATTR = "gcs.key";
 constexpr const char* GCS_SIZE_ATTR = "gcs.size";
diff --git a/extensions/gcp/processors/DeleteGCSObject.cpp b/extensions/gcp/processors/DeleteGCSObject.cpp
new file mode 100644
index 0000000..e95178d
--- /dev/null
+++ b/extensions/gcp/processors/DeleteGCSObject.cpp
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DeleteGCSObject.h"
+
+#include "core/Resource.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/FlowFile.h"
+#include "../GCPAttributes.h"
+
+namespace gcs = ::google::cloud::storage;
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+const core::Property DeleteGCSObject::Bucket(
+    core::PropertyBuilder::createProperty("Bucket")
+        ->withDescription("Bucket of the object.")
+        ->withDefaultValue("${gcs.bucket}")
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property DeleteGCSObject::Key(
+    core::PropertyBuilder::createProperty("Key")
+        ->withDescription("Name of the object.")
+        ->withDefaultValue("${filename}")
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property DeleteGCSObject::ObjectGeneration(
+    core::PropertyBuilder::createProperty("Object Generation")
+        ->withDescription("The generation of the Object to download. If left empty, then it will download the latest generation.")
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property DeleteGCSObject::EncryptionKey(
+    core::PropertyBuilder::createProperty("Server Side Encryption Key")
+        ->withDescription("The AES256 Encryption Key (encoded in base64) for server-side decryption of the object.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Relationship DeleteGCSObject::Success("success", "FlowFiles are routed to this relationship after a successful Google Cloud Storage operation.");
+const core::Relationship DeleteGCSObject::Failure("failure", "FlowFiles are routed to this relationship if the Google Cloud Storage operation fails.");
+
+void DeleteGCSObject::initialize() {
+  setSupportedProperties({GCPCredentials,
+                          Bucket,
+                          Key,
+                          ObjectGeneration,
+                          NumberOfRetries,
+                          EncryptionKey,
+                          EndpointOverrideURL});
+  setSupportedRelationships({Success, Failure});
+}
+
+void DeleteGCSObject::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session && gcp_credentials_);
+
+  auto flow_file = session->get();
+  if (!flow_file) {
+    context->yield();
+    return;
+  }
+
+  auto bucket = context->getProperty(Bucket, flow_file);
+  if (!bucket || bucket->empty()) {
+    logger_->log_error("Missing bucket name");
+    session->transfer(flow_file, Failure);
+    return;
+  }
+  auto object_name = context->getProperty(Key, flow_file);
+  if (!object_name || object_name->empty()) {
+    logger_->log_error("Missing object name");
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  gcs::Generation generation;
+
+  if (auto gen_str = context->getProperty(ObjectGeneration, flow_file); gen_str && !gen_str->empty()) {
+    try {
+      uint64_t gen;
+      utils::internal::ValueParser(*gen_str).parse(gen).parseEnd();
+      generation = gcs::Generation(gen);
+    } catch (const utils::internal::ValueException&) {
+      logger_->log_error("Invalid generation: %s", *gen_str);
+      session->transfer(flow_file, Failure);
+      return;
+    }
+  }
+
+  auto status = getClient().DeleteObject(*bucket, *object_name, generation, gcs::IfGenerationNotMatch(0));
+
+  if (!status.ok()) {
+    flow_file->setAttribute(GCS_STATUS_MESSAGE, status.message());
+    flow_file->setAttribute(GCS_ERROR_REASON, status.error_info().reason());
+    flow_file->setAttribute(GCS_ERROR_DOMAIN, status.error_info().domain());
+    logger_->log_error("Failed to delete %s object from %s bucket on Google Cloud Storage %s %s", *object_name, *bucket, status.message(), status.error_info().reason());
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  session->transfer(flow_file, Success);
+}
+
+REGISTER_RESOURCE(DeleteGCSObject, "Deletes an object from a Google Cloud Bucket.");
+}  // namespace org::apache::nifi::minifi::extensions::gcp
diff --git a/extensions/gcp/processors/DeleteGCSObject.h b/extensions/gcp/processors/DeleteGCSObject.h
new file mode 100644
index 0000000..22644c9
--- /dev/null
+++ b/extensions/gcp/processors/DeleteGCSObject.h
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <memory>
+
+#include "GCSProcessor.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+
+class DeleteGCSObject : public GCSProcessor {
+ public:
+  explicit DeleteGCSObject(const std::string& name, const utils::Identifier& uuid = {})
+      : GCSProcessor(name, uuid, core::logging::LoggerFactory<DeleteGCSObject>::getLogger()) {
+  }
+  ~DeleteGCSObject() override = default;
+
+  EXTENSIONAPI static const core::Property Bucket;
+  EXTENSIONAPI static const core::Property Key;
+  EXTENSIONAPI static const core::Property EncryptionKey;
+  EXTENSIONAPI static const core::Property ObjectGeneration;
+
+  EXTENSIONAPI static const core::Relationship Success;
+  EXTENSIONAPI static const core::Relationship Failure;
+
+  void initialize() override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
+
+  core::annotation::Input getInputRequirement() const override {
+    return core::annotation::Input::INPUT_REQUIRED;
+  }
+
+  bool isSingleThreaded() const override {
+    return true;
+  }
+};
+
+}  // namespace org::apache::nifi::minifi::extensions::gcp
diff --git a/extensions/gcp/processors/FetchGCSObject.cpp b/extensions/gcp/processors/FetchGCSObject.cpp
new file mode 100644
index 0000000..58e36df
--- /dev/null
+++ b/extensions/gcp/processors/FetchGCSObject.cpp
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchGCSObject.h"
+
+#include <utility>
+
+#include "core/Resource.h"
+#include "core/FlowFile.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "../GCPAttributes.h"
+
+namespace gcs = ::google::cloud::storage;
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+const core::Property FetchGCSObject::Bucket(
+    core::PropertyBuilder::createProperty("Bucket")
+        ->withDescription("Bucket of the object.")
+        ->withDefaultValue("${gcs.bucket}")
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property FetchGCSObject::Key(
+    core::PropertyBuilder::createProperty("Key")
+        ->withDescription("Name of the object.")
+        ->withDefaultValue("${filename}")
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property FetchGCSObject::ObjectGeneration(
+    core::PropertyBuilder::createProperty("Object Generation")
+        ->withDescription("The generation of the Object to download. If left empty, then it will download the latest generation.")
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property FetchGCSObject::EncryptionKey(
+    core::PropertyBuilder::createProperty("Server Side Encryption Key")
+        ->withDescription("The AES256 Encryption Key (encoded in base64) for server-side decryption of the object.")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Relationship FetchGCSObject::Success("success", "FlowFiles are routed to this relationship after a successful Google Cloud Storage operation.");
+const core::Relationship FetchGCSObject::Failure("failure", "FlowFiles are routed to this relationship if the Google Cloud Storage operation fails.");
+
+
+namespace {
+class FetchFromGCSCallback {
+ public:
+  FetchFromGCSCallback(gcs::Client& client, std::string bucket, std::string key)
+      : bucket_(std::move(bucket)),
+        key_(std::move(key)),
+        client_(client) {
+  }
+
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
+    auto reader = client_.ReadObject(bucket_, key_, encryption_key_, generation_, gcs::IfGenerationNotMatch(0));
+    auto set_members = gsl::finally([&]{
+      status_ = reader.status();
+      result_generation_ = reader.generation();
+      meta_generation_ = reader.metageneration();
+      storage_class_ = reader.storage_class();
+    });
+    if (!reader)
+      return 0;
+    std::string contents{std::istreambuf_iterator<char>{reader}, {}};
+    auto write_ret = stream->write(gsl::make_span(contents).as_span<std::byte>());
+    reader.Close();
+    return write_ret;
+  }
+
+  [[nodiscard]] auto getStatus() const noexcept { return status_; }
+  [[nodiscard]] auto getGeneration() const noexcept { return result_generation_; }
+  [[nodiscard]] auto getMetaGeneration() const noexcept { return meta_generation_; }
+  [[nodiscard]] auto getStorageClass() const noexcept { return storage_class_; }
+
+
+  void setEncryptionKey(const gcs::EncryptionKey& encryption_key) {
+    encryption_key_ = encryption_key;
+  }
+
+  void setGeneration(const gcs::Generation generation) {
+    generation_ = generation;
+  }
+
+ private:
+  std::string bucket_;
+  std::string key_;
+  gcs::Client& client_;
+
+  gcs::EncryptionKey encryption_key_;
+  gcs::Generation generation_;
+
+  google::cloud::Status status_;
+  std::optional<std::int64_t> result_generation_;
+  std::optional<std::int64_t> meta_generation_;
+  std::optional<std::string> storage_class_;
+};
+}  // namespace
+
+
+void FetchGCSObject::initialize() {
+  setSupportedProperties({GCPCredentials,
+                          Bucket,
+                          Key,
+                          ObjectGeneration,
+                          NumberOfRetries,
+                          EncryptionKey,
+                          EndpointOverrideURL});
+  setSupportedRelationships({Success, Failure});
+}
+
+void FetchGCSObject::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& session_factory) {
+  GCSProcessor::onSchedule(context, session_factory);
+  gsl_Expects(context);
+  if (auto encryption_key = context->getProperty(EncryptionKey)) {
+    try {
+      encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key);
+    } catch (const google::cloud::RuntimeStatusError&) {
+      throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Could not decode the base64-encoded encryption key from property " + EncryptionKey.getName());    }
+  }
+}
+
+void FetchGCSObject::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session && gcp_credentials_);
+
+  auto flow_file = session->get();
+  if (!flow_file) {
+    context->yield();
+    return;
+  }
+
+  auto bucket = context->getProperty(Bucket, flow_file);
+  if (!bucket || bucket->empty()) {
+    logger_->log_error("Missing bucket name");
+    session->transfer(flow_file, Failure);
+    return;
+  }
+  auto object_name = context->getProperty(Key, flow_file);
+  if (!object_name || object_name->empty()) {
+    logger_->log_error("Missing object name");
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  gcs::Client client = getClient();
+  FetchFromGCSCallback callback(client, *bucket, *object_name);
+  callback.setEncryptionKey(encryption_key_);
+
+  if (auto gen_str = context->getProperty(ObjectGeneration, flow_file); gen_str && !gen_str->empty()) {
+    try {
+      uint64_t gen;
+      utils::internal::ValueParser(*gen_str).parse(gen).parseEnd();
+      callback.setGeneration(gcs::Generation(gen));
+    } catch (const utils::internal::ValueException&) {
+      logger_->log_error("Invalid generation: %s", *gen_str);
+      session->transfer(flow_file, Failure);
+      return;
+    }
+  }
+
+  session->write(flow_file, std::ref(callback));
+  if (!callback.getStatus().ok()) {
+    flow_file->setAttribute(GCS_STATUS_MESSAGE, callback.getStatus().message());
+    flow_file->setAttribute(GCS_ERROR_REASON, callback.getStatus().error_info().reason());
+    flow_file->setAttribute(GCS_ERROR_DOMAIN, callback.getStatus().error_info().domain());
+    logger_->log_error("Failed to fetch from Google Cloud Storage %s %s", callback.getStatus().message(), callback.getStatus().error_info().reason());
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  if (auto generation = callback.getGeneration())
+    flow_file->setAttribute(GCS_GENERATION, std::to_string(*generation));
+  if (auto meta_generation = callback.getMetaGeneration())
+    flow_file->setAttribute(GCS_META_GENERATION, std::to_string(*meta_generation));
+  if (auto storage_class = callback.getStorageClass())
+    flow_file->setAttribute(GCS_STORAGE_CLASS, *storage_class);
+  session->transfer(flow_file, Success);
+}
+
+REGISTER_RESOURCE(FetchGCSObject, "Fetches a file from a Google Cloud Bucket. Designed to be used in tandem with ListGCSBucket.");
+}  // namespace org::apache::nifi::minifi::extensions::gcp
diff --git a/extensions/gcp/processors/FetchGCSObject.h b/extensions/gcp/processors/FetchGCSObject.h
new file mode 100644
index 0000000..d71e747
--- /dev/null
+++ b/extensions/gcp/processors/FetchGCSObject.h
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <memory>
+
+#include "GCSProcessor.h"
+#include "google/cloud/storage/well_known_headers.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+
+class FetchGCSObject : public GCSProcessor {
+ public:
+  explicit FetchGCSObject(const std::string& name, const utils::Identifier& uuid = {})
+      : GCSProcessor(name, uuid, core::logging::LoggerFactory<FetchGCSObject>::getLogger()) {
+  }
+  ~FetchGCSObject() override = default;
+
+  EXTENSIONAPI static const core::Property Bucket;
+  EXTENSIONAPI static const core::Property Key;
+  EXTENSIONAPI static const core::Property EncryptionKey;
+  EXTENSIONAPI static const core::Property ObjectGeneration;
+
+  EXTENSIONAPI static const core::Relationship Success;
+  EXTENSIONAPI static const core::Relationship Failure;
+
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
+
+  core::annotation::Input getInputRequirement() const override {
+    return core::annotation::Input::INPUT_REQUIRED;
+  }
+
+  bool isSingleThreaded() const override {
+    return true;
+  }
+
+ private:
+  google::cloud::storage::EncryptionKey encryption_key_;
+};
+
+}  // namespace org::apache::nifi::minifi::extensions::gcp
diff --git a/extensions/gcp/processors/GCSProcessor.cpp b/extensions/gcp/processors/GCSProcessor.cpp
new file mode 100644
index 0000000..445f295
--- /dev/null
+++ b/extensions/gcp/processors/GCSProcessor.cpp
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "GCSProcessor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "../controllerservices/GCPCredentialsControllerService.h"
+
+namespace gcs = ::google::cloud::storage;
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+
+const core::Property GCSProcessor::GCPCredentials(
+    core::PropertyBuilder::createProperty("GCP Credentials Provider Service")
+        ->withDescription("The Controller Service used to obtain Google Cloud Platform credentials.")
+        ->isRequired(true)
+        ->asType<GCPCredentialsControllerService>()
+        ->build());
+
+const core::Property GCSProcessor::NumberOfRetries(
+    core::PropertyBuilder::createProperty("Number of retries")
+        ->withDescription("How many retry attempts should be made before routing to the failure relationship.")
+        ->withDefaultValue<uint64_t>(6)
+        ->isRequired(true)
+        ->supportsExpressionLanguage(false)
+        ->build());
+
+const core::Property GCSProcessor::EndpointOverrideURL(
+    core::PropertyBuilder::createProperty("Endpoint Override URL")
+        ->withDescription("Overrides the default Google Cloud Storage endpoints")
+        ->isRequired(false)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+namespace {
+std::shared_ptr<google::cloud::storage::oauth2::Credentials> getCredentials(core::ProcessContext& context) {
+  std::string service_name;
+  if (context.getProperty(GCSProcessor::GCPCredentials.getName(), service_name) && !IsNullOrEmpty(service_name)) {
+    auto gcp_credentials_controller_service = std::dynamic_pointer_cast<const GCPCredentialsControllerService>(context.getControllerService(service_name));
+    if (!gcp_credentials_controller_service)
+      return nullptr;
+    return gcp_credentials_controller_service->getCredentials();
+  }
+  return nullptr;
+}
+}  // namespace
+
+
+void GCSProcessor::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+  if (auto number_of_retries = context->getProperty<uint64_t>(NumberOfRetries)) {
+    retry_policy_ = std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(*number_of_retries);
+  }
+
+  gcp_credentials_ = getCredentials(*context);
+  if (!gcp_credentials_) {
+    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing GCP Credentials");
+  }
+
+  endpoint_url_ = context->getProperty(EndpointOverrideURL);
+  if (endpoint_url_)
+    logger_->log_debug("Endpoint overwritten: %s", *endpoint_url_);
+}
+
+gcs::Client GCSProcessor::getClient() const {
+  auto options = gcs::ClientOptions(gcp_credentials_);
+  if (endpoint_url_)
+    options.set_endpoint(*endpoint_url_);
+  return gcs::Client(options, *retry_policy_);
+}
+
+}  // namespace org::apache::nifi::minifi::extensions::gcp
diff --git a/extensions/gcp/processors/GCSProcessor.h b/extensions/gcp/processors/GCSProcessor.h
new file mode 100644
index 0000000..f673a3c
--- /dev/null
+++ b/extensions/gcp/processors/GCSProcessor.h
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#pragma once
+
+#include <string>
+#include <memory>
+#include <utility>
+#include <optional>
+
+#include "core/logging/Logger.h"
+#include "core/Processor.h"
+#include "google/cloud/storage/oauth2/credentials.h"
+#include "google/cloud/storage/client.h"
+#include "google/cloud/storage/retry_policy.h"
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+class GCSProcessor : public core::Processor {
+ public:
+  GCSProcessor(const std::string& name, const minifi::utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger)
+      : core::Processor(name, uuid),
+        logger_(std::move(logger)) {
+  }
+
+  EXTENSIONAPI static const core::Property GCPCredentials;
+  EXTENSIONAPI static const core::Property NumberOfRetries;
+  EXTENSIONAPI static const core::Property EndpointOverrideURL;
+
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+
+ protected:
+  virtual google::cloud::storage::Client getClient() const;
+
+  std::optional<std::string> endpoint_url_;
+  std::shared_ptr<google::cloud::storage::oauth2::Credentials> gcp_credentials_;
+  google::cloud::storage::RetryPolicyOption::Type retry_policy_ = std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(6);
+  std::shared_ptr<core::logging::Logger> logger_;
+};
+
+}  // namespace org::apache::nifi::minifi::extensions::gcp
diff --git a/extensions/gcp/processors/ListGCSBucket.cpp b/extensions/gcp/processors/ListGCSBucket.cpp
new file mode 100644
index 0000000..be6dc22
--- /dev/null
+++ b/extensions/gcp/processors/ListGCSBucket.cpp
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListGCSBucket.h"
+
+#include "core/Resource.h"
+#include "core/FlowFile.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "../GCPAttributes.h"
+
+namespace gcs = ::google::cloud::storage;
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+const core::Property ListGCSBucket::Bucket(
+    core::PropertyBuilder::createProperty("Bucket")
+        ->withDescription("Bucket of the object.")
+        ->isRequired(true)
+        ->supportsExpressionLanguage(true)
+        ->build());
+
+const core::Property ListGCSBucket::ListAllVersions(
+    core::PropertyBuilder::createProperty("List all versions")
+        ->withDescription("Set this option to `true` to get all the previous versions separately.")
+        ->withDefaultValue<bool>(false)
+        ->build());
+
+const core::Relationship ListGCSBucket::Success("success", "FlowFiles are routed to this relationship after a successful Google Cloud Storage operation.");
+
+void ListGCSBucket::initialize() {
+  setSupportedProperties({GCPCredentials,
+                          Bucket,
+                          NumberOfRetries,
+                          EndpointOverrideURL,
+                          ListAllVersions});
+  setSupportedRelationships({Success});
+}
+
+
+void ListGCSBucket::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& session_factory) {
+  GCSProcessor::onSchedule(context, session_factory);
+  gsl_Expects(context);
+  context->getProperty(Bucket.getName(), bucket_);
+}
+
+void ListGCSBucket::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session && gcp_credentials_);
+
+  gcs::Client client = getClient();
+  auto list_all_versions = context->getProperty<bool>(ListAllVersions);
+  gcs::Versions versions = (list_all_versions && *list_all_versions) ? gcs::Versions(true) : gcs::Versions(false);
+  auto objects_in_bucket = client.ListObjects(bucket_, versions);
+  for (const auto& object_in_bucket : objects_in_bucket) {
+    if (object_in_bucket.ok()) {
+      auto flow_file = session->create();
+      flow_file->updateAttribute(core::SpecialFlowAttribute::FILENAME, object_in_bucket->name());
+      setAttributesFromObjectMetadata(*flow_file, *object_in_bucket);
+      session->transfer(flow_file, Success);
+    } else {
+      logger_->log_error("Invalid object in bucket %s", bucket_);
+    }
+  }
+}
+
+REGISTER_RESOURCE(ListGCSBucket, "Retrieves a listing of objects from an GCS bucket. "
+                                 "For each object that is listed, creates a FlowFile that represents the object so that it can be fetched in conjunction with FetchGCSObject.");
+}  // namespace org::apache::nifi::minifi::extensions::gcp
diff --git a/extensions/gcp/processors/ListGCSBucket.h b/extensions/gcp/processors/ListGCSBucket.h
new file mode 100644
index 0000000..bdf8555
--- /dev/null
+++ b/extensions/gcp/processors/ListGCSBucket.h
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <memory>
+
+#include "GCSProcessor.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::extensions::gcp {
+
+class ListGCSBucket : public GCSProcessor {
+ public:
+  explicit ListGCSBucket(const std::string& name, const utils::Identifier& uuid = {})
+      : GCSProcessor(name, uuid, core::logging::LoggerFactory<ListGCSBucket>::getLogger()) {
+  }
+  ~ListGCSBucket() override = default;
+
+  EXTENSIONAPI static const core::Property Bucket;
+  EXTENSIONAPI static const core::Property ListAllVersions;
+
+  EXTENSIONAPI static const core::Relationship Success;
+
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
+
+  core::annotation::Input getInputRequirement() const override {
+    return core::annotation::Input::INPUT_FORBIDDEN;
+  }
+
+  bool isSingleThreaded() const override {
+    return true;
+  }
+
+ private:
+  std::string bucket_;
+};
+
+}  // namespace org::apache::nifi::minifi::extensions::gcp
diff --git a/extensions/gcp/processors/PutGCSObject.cpp b/extensions/gcp/processors/PutGCSObject.cpp
index 00a3d0c..eb5f499 100644
--- a/extensions/gcp/processors/PutGCSObject.cpp
+++ b/extensions/gcp/processors/PutGCSObject.cpp
@@ -17,27 +17,17 @@
 
 #include "PutGCSObject.h"
 
-#include <vector>
 #include <utility>
 
 #include "core/Resource.h"
 #include "core/FlowFile.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
-#include "io/StreamPipe.h"
-#include "utils/OptionalUtils.h"
 #include "../GCPAttributes.h"
 
 namespace gcs = ::google::cloud::storage;
 
 namespace org::apache::nifi::minifi::extensions::gcp {
-const core::Property PutGCSObject::GCPCredentials(
-    core::PropertyBuilder::createProperty("GCP Credentials Provider Service")
-        ->withDescription("The Controller Service used to obtain Google Cloud Platform credentials.")
-        ->isRequired(true)
-        ->asType<GCPCredentialsControllerService>()
-        ->build());
-
 const core::Property PutGCSObject::Bucket(
     core::PropertyBuilder::createProperty("Bucket")
         ->withDescription("Bucket of the object.")
@@ -52,14 +42,6 @@
         ->supportsExpressionLanguage(true)
         ->build());
 
-const core::Property PutGCSObject::NumberOfRetries(
-    core::PropertyBuilder::createProperty("Number of retries")
-        ->withDescription("How many retry attempts should be made before routing to the failure relationship.")
-        ->withDefaultValue<uint64_t>(6)
-        ->isRequired(true)
-        ->supportsExpressionLanguage(false)
-        ->build());
-
 const core::Property PutGCSObject::ContentType(
     core::PropertyBuilder::createProperty("Content Type")
         ->withDescription("Content Type for the file, i.e. text/plain ")
@@ -102,17 +84,9 @@
         ->withDefaultValue<bool>(true)
         ->build());
 
-const core::Property PutGCSObject::EndpointOverrideURL(
-    core::PropertyBuilder::createProperty("Endpoint Override URL")
-        ->withDescription("Overrides the default Google Cloud Storage endpoints")
-        ->isRequired(false)
-        ->supportsExpressionLanguage(true)
-        ->build());
-
 const core::Relationship PutGCSObject::Success("success", "Files that have been successfully written to Google Cloud Storage are transferred to this relationship");
 const core::Relationship PutGCSObject::Failure("failure", "Files that could not be written to Google Cloud Storage for some reason are transferred to this relationship");
 
-
 namespace {
 class UploadToGCSCallback {
  public:
@@ -183,16 +157,6 @@
   google::cloud::StatusOr<gcs::ObjectMetadata> result_;
 };
 
-std::shared_ptr<google::cloud::storage::oauth2::Credentials> getCredentials(core::ProcessContext& context) {
-  std::string service_name;
-  if (context.getProperty(PutGCSObject::GCPCredentials.getName(), service_name) && !IsNullOrEmpty(service_name)) {
-    auto gcp_credentials_controller_service = std::dynamic_pointer_cast<const GCPCredentialsControllerService>(context.getControllerService(service_name));
-    if (!gcp_credentials_controller_service)
-      return nullptr;
-    return gcp_credentials_controller_service->getCredentials();
-  }
-  return nullptr;
-}
 }  // namespace
 
 
@@ -211,16 +175,10 @@
   setSupportedRelationships({Success, Failure});
 }
 
-gcs::Client PutGCSObject::getClient(const gcs::ClientOptions& options) const {
-  return gcs::Client(options, *retry_policy_);
-}
 
-
-void PutGCSObject::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+void PutGCSObject::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& session_factory) {
+  GCSProcessor::onSchedule(context, session_factory);
   gsl_Expects(context);
-  if (auto number_of_retries = context->getProperty<uint64_t>(NumberOfRetries)) {
-    retry_policy_ = std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(*number_of_retries);
-  }
   if (auto encryption_key = context->getProperty(EncryptionKey)) {
     try {
       encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key);
@@ -228,10 +186,6 @@
       throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Could not decode the base64-encoded encryption key from property " + EncryptionKey.getName());
     }
   }
-  gcp_credentials_ = getCredentials(*context);
-  if (!gcp_credentials_) {
-    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing GCP Credentials");
-  }
 }
 
 void PutGCSObject::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
@@ -256,13 +210,7 @@
     return;
   }
 
-  auto options = gcs::ClientOptions(gcp_credentials_);
-  if (auto endpoint_override_url = context->getProperty(EndpointOverrideURL)) {
-    options.set_endpoint(*endpoint_override_url);
-    logger_->log_debug("Endpoint override url %s", *endpoint_override_url);
-  }
-
-  gcs::Client client = getClient(options);
+  gcs::Client client = getClient();
   UploadToGCSCallback callback(client, *bucket, *object_name);
 
   if (auto crc32_checksum = context->getProperty(Crc32cChecksum, flow_file)) {
@@ -286,6 +234,7 @@
   session->read(flow_file, std::ref(callback));
   auto& result = callback.getResult();
   if (!result.ok()) {
+    flow_file->setAttribute(GCS_STATUS_MESSAGE, result.status().message());
     flow_file->setAttribute(GCS_ERROR_REASON, result.status().error_info().reason());
     flow_file->setAttribute(GCS_ERROR_DOMAIN, result.status().error_info().domain());
     logger_->log_error("Failed to upload to Google Cloud Storage %s %s", result.status().message(), result.status().error_info().reason());
diff --git a/extensions/gcp/processors/PutGCSObject.h b/extensions/gcp/processors/PutGCSObject.h
index 4539622..d9cd680 100644
--- a/extensions/gcp/processors/PutGCSObject.h
+++ b/extensions/gcp/processors/PutGCSObject.h
@@ -20,17 +20,14 @@
 #include <string>
 #include <memory>
 
-#include "core/Processor.h"
-#include "core/logging/Logger.h"
+#include "GCSProcessor.h"
 #include "core/logging/LoggerConfiguration.h"
-#include "../controllerservices/GCPCredentialsControllerService.h"
-#include "google/cloud/storage/client.h"
-#include "google/cloud/storage/retry_policy.h"
 #include "utils/Enum.h"
+#include "google/cloud/storage/well_known_headers.h"
 
 namespace org::apache::nifi::minifi::extensions::gcp {
 
-class PutGCSObject : public core::Processor {
+class PutGCSObject : public GCSProcessor {
  public:
   SMART_ENUM(PredefinedAcl,
              (AUTHENTICATED_READ, "authenticatedRead"),
@@ -42,25 +39,18 @@
              (PUBLIC_READ_WRITE, "publicReadWrite"));
 
   explicit PutGCSObject(const std::string& name, const utils::Identifier& uuid = {})
-      : core::Processor(name, uuid) {
+      : GCSProcessor(name, uuid, core::logging::LoggerFactory<PutGCSObject>::getLogger()) {
   }
-  PutGCSObject(const PutGCSObject&) = delete;
-  PutGCSObject(PutGCSObject&&) = delete;
-  PutGCSObject& operator=(const PutGCSObject&) = delete;
-  PutGCSObject& operator=(PutGCSObject&&) = delete;
   ~PutGCSObject() override = default;
 
-  EXTENSIONAPI static const core::Property GCPCredentials;
   EXTENSIONAPI static const core::Property Bucket;
   EXTENSIONAPI static const core::Property Key;
-  EXTENSIONAPI static const core::Property NumberOfRetries;
   EXTENSIONAPI static const core::Property ContentType;
   EXTENSIONAPI static const core::Property MD5Hash;
   EXTENSIONAPI static const core::Property Crc32cChecksum;
   EXTENSIONAPI static const core::Property EncryptionKey;
   EXTENSIONAPI static const core::Property ObjectACL;
   EXTENSIONAPI static const core::Property OverwriteObject;
-  EXTENSIONAPI static const core::Property EndpointOverrideURL;
 
   EXTENSIONAPI static const core::Relationship Success;
   EXTENSIONAPI static const core::Relationship Failure;
@@ -78,15 +68,7 @@
   }
 
  private:
-  virtual google::cloud::storage::Client getClient(const google::cloud::storage::ClientOptions& options) const;
-
   google::cloud::storage::EncryptionKey encryption_key_;
-
-  std::shared_ptr<google::cloud::storage::oauth2::Credentials> gcp_credentials_;
-  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PutGCSObject>::getLogger();
-
- protected:
-  google::cloud::storage::RetryPolicyOption::Type retry_policy_ = std::make_shared<google::cloud::storage::LimitedErrorCountRetryPolicy>(6);
 };
 
 }  // namespace org::apache::nifi::minifi::extensions::gcp
diff --git a/extensions/gcp/tests/DeleteGCSObjectTests.cpp b/extensions/gcp/tests/DeleteGCSObjectTests.cpp
new file mode 100644
index 0000000..2b0e446
--- /dev/null
+++ b/extensions/gcp/tests/DeleteGCSObjectTests.cpp
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "../processors/DeleteGCSObject.h"
+#include "../controllerservices/GCPCredentialsControllerService.h"
+#include "GCPAttributes.h"
+#include "core/Resource.h"
+#include "SingleProcessorTestController.h"
+#include "google/cloud/storage/testing/mock_client.h"
+#include "google/cloud/storage/internal/object_metadata_parser.h"
+#include "google/cloud/storage/testing/canonical_errors.h"
+
+namespace gcs = ::google::cloud::storage;
+namespace minifi_gcp = org::apache::nifi::minifi::extensions::gcp;
+
+using DeleteGCSObject = org::apache::nifi::minifi::extensions::gcp::DeleteGCSObject;
+using GCPCredentialsControllerService = org::apache::nifi::minifi::extensions::gcp::GCPCredentialsControllerService;
+using DeleteObjectRequest = gcs::internal::DeleteObjectRequest;
+using ::google::cloud::storage::testing::canonical_errors::TransientError;
+using ::google::cloud::storage::testing::canonical_errors::PermanentError;
+
+namespace {
+class DeleteGCSObjectMocked : public DeleteGCSObject {
+  using org::apache::nifi::minifi::extensions::gcp::DeleteGCSObject::DeleteGCSObject;
+ public:
+  gcs::Client getClient() const override {
+    return gcs::testing::ClientFromMock(mock_client_, *retry_policy_);
+  }
+  std::shared_ptr<gcs::testing::MockClient> mock_client_ = std::make_shared<gcs::testing::MockClient>();
+};
+REGISTER_RESOURCE(DeleteGCSObjectMocked, "DeleteGCSObjectMocked");
+}  // namespace
+
+class DeleteGCSObjectTests : public ::testing::Test {
+ public:
+  void SetUp() override {
+    gcp_credentials_node_ = test_controller_.plan->addController("GCPCredentialsControllerService", "gcp_credentials_controller_service");
+    test_controller_.plan->setProperty(gcp_credentials_node_,
+                                       GCPCredentialsControllerService::CredentialsLoc.getName(),
+                                       toString(GCPCredentialsControllerService::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS));
+    test_controller_.plan->setProperty(delete_gcs_object_,
+                                       DeleteGCSObject::GCPCredentials.getName(),
+                                       "gcp_credentials_controller_service");
+  }
+  std::shared_ptr<DeleteGCSObjectMocked> delete_gcs_object_ = std::make_shared<DeleteGCSObjectMocked>("DeleteGCSObjectMocked");
+  org::apache::nifi::minifi::test::SingleProcessorTestController test_controller_{delete_gcs_object_};
+  std::shared_ptr<minifi::core::controller::ControllerServiceNode>  gcp_credentials_node_;
+};
+
+TEST_F(DeleteGCSObjectTests, MissingBucket) {
+  EXPECT_CALL(*delete_gcs_object_->mock_client_, CreateResumableSession).Times(0);
+  EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::Bucket.getName(), ""));
+  const auto& result = test_controller_.trigger("hello world");
+  EXPECT_EQ(0, result.at(DeleteGCSObject::Success).size());
+  ASSERT_EQ(1, result.at(DeleteGCSObject::Failure).size());
+  EXPECT_EQ(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
+  EXPECT_EQ(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
+  EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Failure)[0]));
+}
+
+TEST_F(DeleteGCSObjectTests, ServerGivesPermaError) {
+  EXPECT_CALL(*delete_gcs_object_->mock_client_, DeleteObject)
+      .WillOnce(testing::Return(PermanentError()));
+  EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::Bucket.getName(), "bucket-from-property"));
+  const auto& result = test_controller_.trigger("hello world");
+  EXPECT_EQ(0, result.at(DeleteGCSObject::Success).size());
+  ASSERT_EQ(1, result.at(DeleteGCSObject::Failure).size());
+  EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
+  EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
+  EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Failure)[0]));
+}
+
+TEST_F(DeleteGCSObjectTests, ServerGivesTransientErrors) {
+  EXPECT_CALL(*delete_gcs_object_->mock_client_, DeleteObject)
+      .WillOnce(testing::Return(TransientError()))
+      .WillOnce(testing::Return(TransientError()));
+  EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::NumberOfRetries.getName(), "1"));
+  EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::Bucket.getName(), "bucket-from-property"));
+  const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}});
+  EXPECT_EQ(0, result.at(DeleteGCSObject::Success).size());
+  ASSERT_EQ(1, result.at(DeleteGCSObject::Failure).size());
+  EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
+  EXPECT_NE(std::nullopt, result.at(DeleteGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
+  EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Failure)[0]));
+}
+
+
+TEST_F(DeleteGCSObjectTests, HandlingSuccessfullDeletion) {
+  EXPECT_CALL(*delete_gcs_object_->mock_client_, DeleteObject)
+      .WillOnce([](DeleteObjectRequest const& request) {
+        EXPECT_EQ("bucket-from-attribute", request.bucket_name());
+        EXPECT_TRUE(request.HasOption<gcs::Generation>());
+        EXPECT_TRUE(request.GetOption<gcs::Generation>().has_value());
+        EXPECT_EQ(23, request.GetOption<gcs::Generation>().value());
+        return google::cloud::make_status_or(gcs::internal::EmptyResponse{});
+      });
+  EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::ObjectGeneration.getName(), "${gcs.generation}"));
+  const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}, {minifi_gcp::GCS_GENERATION, "23"}});
+  ASSERT_EQ(1, result.at(DeleteGCSObject::Success).size());
+  EXPECT_EQ(0, result.at(DeleteGCSObject::Failure).size());
+  EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Success)[0]));
+}
+
+TEST_F(DeleteGCSObjectTests, EmptyGeneration) {
+  EXPECT_CALL(*delete_gcs_object_->mock_client_, DeleteObject)
+      .WillOnce([](DeleteObjectRequest const& request) {
+        EXPECT_EQ("bucket-from-attribute", request.bucket_name());
+        EXPECT_FALSE(request.HasOption<gcs::Generation>());
+        return google::cloud::make_status_or(gcs::internal::EmptyResponse{});
+      });
+  EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::ObjectGeneration.getName(), "${gcs.generation}"));
+  const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}});
+  ASSERT_EQ(1, result.at(DeleteGCSObject::Success).size());
+  EXPECT_EQ(0, result.at(DeleteGCSObject::Failure).size());
+  EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Success)[0]));
+}
+
+TEST_F(DeleteGCSObjectTests, InvalidGeneration) {
+  EXPECT_TRUE(test_controller_.plan->setProperty(delete_gcs_object_, DeleteGCSObject::ObjectGeneration.getName(), "${gcs.generation}"));
+  const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}, {minifi_gcp::GCS_GENERATION, "23 banana"}});
+  ASSERT_EQ(0, result.at(DeleteGCSObject::Success).size());
+  EXPECT_EQ(1, result.at(DeleteGCSObject::Failure).size());
+  EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(DeleteGCSObject::Failure)[0]));
+}
diff --git a/extensions/gcp/tests/FetchGCSObjectTests.cpp b/extensions/gcp/tests/FetchGCSObjectTests.cpp
new file mode 100644
index 0000000..696d3b4
--- /dev/null
+++ b/extensions/gcp/tests/FetchGCSObjectTests.cpp
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "../processors/FetchGCSObject.h"
+#include "../controllerservices/GCPCredentialsControllerService.h"
+#include "GCPAttributes.h"
+#include "core/Resource.h"
+#include "SingleProcessorTestController.h"
+#include "google/cloud/storage/testing/mock_client.h"
+#include "google/cloud/storage/internal/object_metadata_parser.h"
+#include "google/cloud/storage/testing/canonical_errors.h"
+
+namespace gcs = ::google::cloud::storage;
+namespace minifi_gcp = org::apache::nifi::minifi::extensions::gcp;
+
+using FetchGCSObject = org::apache::nifi::minifi::extensions::gcp::FetchGCSObject;
+using GCPCredentialsControllerService = org::apache::nifi::minifi::extensions::gcp::GCPCredentialsControllerService;
+using ::google::cloud::storage::testing::canonical_errors::TransientError;
+using ::google::cloud::storage::testing::canonical_errors::PermanentError;
+
+namespace {
+class FetchGCSObjectMocked : public FetchGCSObject {
+  using org::apache::nifi::minifi::extensions::gcp::FetchGCSObject::FetchGCSObject;
+ public:
+  gcs::Client getClient() const override {
+    return gcs::testing::ClientFromMock(mock_client_, *retry_policy_);
+  }
+  std::shared_ptr<gcs::testing::MockClient> mock_client_ = std::make_shared<gcs::testing::MockClient>();
+};
+REGISTER_RESOURCE(FetchGCSObjectMocked, "FetchGCSObjectMocked");
+}  // namespace
+
+class FetchGCSObjectTests : public ::testing::Test {
+ public:
+  void SetUp() override {
+    gcp_credentials_node_ = test_controller_.plan->addController("GCPCredentialsControllerService", "gcp_credentials_controller_service");
+    test_controller_.plan->setProperty(gcp_credentials_node_,
+                                       GCPCredentialsControllerService::CredentialsLoc.getName(),
+                                       toString(GCPCredentialsControllerService::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS));
+    test_controller_.plan->setProperty(fetch_gcs_object_,
+                                       FetchGCSObject::GCPCredentials.getName(),
+                                       "gcp_credentials_controller_service");
+  }
+  std::shared_ptr<FetchGCSObjectMocked> fetch_gcs_object_ = std::make_shared<FetchGCSObjectMocked>("FetchGCSObjectMocked");
+  org::apache::nifi::minifi::test::SingleProcessorTestController test_controller_{fetch_gcs_object_};
+  std::shared_ptr<minifi::core::controller::ControllerServiceNode>  gcp_credentials_node_;
+};
+
+TEST_F(FetchGCSObjectTests, MissingBucket) {
+  EXPECT_CALL(*fetch_gcs_object_->mock_client_, CreateResumableSession).Times(0);
+  EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::Bucket.getName(), ""));
+  const auto& result = test_controller_.trigger("hello world");
+  EXPECT_EQ(0, result.at(FetchGCSObject::Success).size());
+  ASSERT_EQ(1, result.at(FetchGCSObject::Failure).size());
+  EXPECT_EQ(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
+  EXPECT_EQ(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
+  EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(FetchGCSObject::Failure)[0]));
+}
+
+TEST_F(FetchGCSObjectTests, ServerError) {
+  EXPECT_CALL(*fetch_gcs_object_->mock_client_, ReadObject)
+      .WillOnce([](gcs::internal::ReadObjectRangeRequest const& request) {
+        EXPECT_EQ(request.bucket_name(), "bucket-from-property") << request;
+        auto* mock_source = new gcs::testing::MockObjectReadSource;
+        ::testing::InSequence seq;
+        EXPECT_CALL(*mock_source, IsOpen).WillRepeatedly(testing::Return(true));
+        EXPECT_CALL(*mock_source, Read)
+            .WillOnce(testing::Return(google::cloud::Status(
+                google::cloud::StatusCode::kInvalidArgument,
+                "Invalid Argument")));
+        EXPECT_CALL(*mock_source, IsOpen).WillRepeatedly(testing::Return(false));
+
+        std::unique_ptr<gcs::internal::ObjectReadSource> result(mock_source);
+
+        return google::cloud::make_status_or(std::move(result));
+      });
+  EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::Bucket.getName(), "bucket-from-property"));
+  const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}});
+  EXPECT_EQ(0, result.at(FetchGCSObject::Success).size());
+  ASSERT_EQ(1, result.at(FetchGCSObject::Failure).size());
+  EXPECT_NE(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
+  EXPECT_NE(std::nullopt, result.at(FetchGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON));
+}
+
+TEST_F(FetchGCSObjectTests, HappyPath) {
+  std::string const text = "stored text";
+  std::size_t offset = 0;
+  // Simulate a Read() call in the MockObjectReadSource object created below
+  auto simulate_read = [&text, &offset](void* buf, std::size_t n) {
+    auto const l = (std::min)(n, text.size() - offset);
+    std::memcpy(buf, text.data() + offset, l);
+    offset += l;
+    return gcs::internal::ReadSourceResult{
+        l, gcs::internal::HttpResponse{200, {}, {}}};
+  };
+  EXPECT_CALL(*fetch_gcs_object_->mock_client_, ReadObject)
+      .WillOnce([&](gcs::internal::ReadObjectRangeRequest const& request) {
+        EXPECT_EQ(request.bucket_name(), "bucket-from-attribute") << request;
+        EXPECT_TRUE(request.HasOption<gcs::Generation>());
+        EXPECT_TRUE(request.GetOption<gcs::Generation>().has_value());
+        EXPECT_EQ(23, request.GetOption<gcs::Generation>().value());
+        std::unique_ptr<gcs::testing::MockObjectReadSource> mock_source(new gcs::testing::MockObjectReadSource);
+        ::testing::InSequence seq;
+        EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(true));
+        EXPECT_CALL(*mock_source, Read).WillOnce(simulate_read);
+        EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(false));
+
+        return google::cloud::make_status_or(
+            std::unique_ptr<gcs::internal::ObjectReadSource>(
+                std::move(mock_source)));
+      });
+  EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::ObjectGeneration.getName(), "${gcs.generation}"));
+  const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}, {minifi_gcp::GCS_GENERATION, "23"}});
+  ASSERT_EQ(1, result.at(FetchGCSObject::Success).size());
+  EXPECT_EQ(0, result.at(FetchGCSObject::Failure).size());
+  EXPECT_EQ("stored text", test_controller_.plan->getContent(result.at(FetchGCSObject::Success)[0]));
+}
+
+TEST_F(FetchGCSObjectTests, EmptyGeneration) {
+  std::string const text = "stored text";
+  std::size_t offset = 0;
+  // Simulate a Read() call in the MockObjectReadSource object created below
+  auto simulate_read = [&text, &offset](void* buf, std::size_t n) {
+    auto const l = (std::min)(n, text.size() - offset);
+    std::memcpy(buf, text.data() + offset, l);
+    offset += l;
+    return gcs::internal::ReadSourceResult{
+        l, gcs::internal::HttpResponse{200, {}, {}}};
+  };
+  EXPECT_CALL(*fetch_gcs_object_->mock_client_, ReadObject)
+      .WillOnce([&](gcs::internal::ReadObjectRangeRequest const& request) {
+        EXPECT_EQ(request.bucket_name(), "bucket-from-attribute") << request;
+        EXPECT_FALSE(request.HasOption<gcs::Generation>());
+        std::unique_ptr<gcs::testing::MockObjectReadSource> mock_source(new gcs::testing::MockObjectReadSource);
+        ::testing::InSequence seq;
+        EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(true));
+        EXPECT_CALL(*mock_source, Read).WillOnce(simulate_read);
+        EXPECT_CALL(*mock_source, IsOpen()).WillRepeatedly(testing::Return(false));
+
+        return google::cloud::make_status_or(
+            std::unique_ptr<gcs::internal::ObjectReadSource>(
+                std::move(mock_source)));
+      });
+  EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::ObjectGeneration.getName(), "${gcs.generation}"));
+  const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}});
+  ASSERT_EQ(1, result.at(FetchGCSObject::Success).size());
+  EXPECT_EQ(0, result.at(FetchGCSObject::Failure).size());
+  EXPECT_EQ("stored text", test_controller_.plan->getContent(result.at(FetchGCSObject::Success)[0]));
+}
+
+TEST_F(FetchGCSObjectTests, InvalidGeneration) {
+  EXPECT_TRUE(test_controller_.plan->setProperty(fetch_gcs_object_, FetchGCSObject::ObjectGeneration.getName(), "${gcs.generation}"));
+  const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}, {minifi_gcp::GCS_GENERATION, "23 banana"}});
+  ASSERT_EQ(0, result.at(FetchGCSObject::Success).size());
+  EXPECT_EQ(1, result.at(FetchGCSObject::Failure).size());
+  EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(FetchGCSObject::Failure)[0]));
+}
diff --git a/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp b/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp
index 4eb97cd..f6e3b71 100644
--- a/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp
+++ b/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp
@@ -27,7 +27,6 @@
 #include "rapidjson/writer.h"
 #include "google/cloud/internal/setenv.h"
 
-
 namespace gcs = ::google::cloud::storage;
 
 using GCPCredentialsControllerService = org::apache::nifi::minifi::extensions::gcp::GCPCredentialsControllerService;
diff --git a/extensions/gcp/tests/ListGCSBucketTests.cpp b/extensions/gcp/tests/ListGCSBucketTests.cpp
new file mode 100644
index 0000000..a585194
--- /dev/null
+++ b/extensions/gcp/tests/ListGCSBucketTests.cpp
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "../processors/ListGCSBucket.h"
+#include "../controllerservices/GCPCredentialsControllerService.h"
+#include "core/Resource.h"
+#include "SingleProcessorTestController.h"
+#include "google/cloud/storage/testing/mock_client.h"
+#include "google/cloud/storage/internal/object_metadata_parser.h"
+#include "google/cloud/storage/testing/canonical_errors.h"
+
+namespace gcs = ::google::cloud::storage;
+namespace minifi_gcp = org::apache::nifi::minifi::extensions::gcp;
+
+using ListGCSBucket = org::apache::nifi::minifi::extensions::gcp::ListGCSBucket;
+using ListObjectsRequest = gcs::internal::ListObjectsRequest;
+using ListObjectsResponse = gcs::internal::ListObjectsResponse;
+using GCPCredentialsControllerService = org::apache::nifi::minifi::extensions::gcp::GCPCredentialsControllerService;
+using ::google::cloud::storage::testing::canonical_errors::TransientError;
+using ::google::cloud::storage::testing::canonical_errors::PermanentError;
+
+namespace {
+class ListGCSBucketMocked : public ListGCSBucket {
+  using org::apache::nifi::minifi::extensions::gcp::ListGCSBucket::ListGCSBucket;
+ public:
+  gcs::Client getClient() const override {
+    return gcs::testing::ClientFromMock(mock_client_, *retry_policy_);
+  }
+  std::shared_ptr<gcs::testing::MockClient> mock_client_ = std::make_shared<gcs::testing::MockClient>();
+};
+REGISTER_RESOURCE(ListGCSBucketMocked, "ListGCSBucketMocked");
+
+auto CreateObject(int index, int generation = 1) {
+  std::string id = "object-" + std::to_string(index);
+  std::string name = id;
+  std::string link =
+      "https://storage.googleapis.com/storage/v1/b/test-bucket/" + id + "#1";
+  nlohmann::json metadata{
+      {"bucket", "test-bucket"},
+      {"id", id},
+      {"name", name},
+      {"selfLink", link},
+      {"generation", generation},
+      {"kind", "storage#object"},
+  };
+  return google::cloud::storage::internal::ObjectMetadataParser::FromJson(metadata).value();
+}
+}  // namespace
+
+class ListGCSBucketTests : public ::testing::Test {
+ public:
+  void SetUp() override {
+    gcp_credentials_node_ = test_controller_.plan->addController("GCPCredentialsControllerService", "gcp_credentials_controller_service");
+    test_controller_.plan->setProperty(gcp_credentials_node_,
+                                       GCPCredentialsControllerService::CredentialsLoc.getName(),
+                                       toString(GCPCredentialsControllerService::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS));
+    test_controller_.plan->setProperty(list_gcs_bucket_,
+                                       ListGCSBucket::GCPCredentials.getName(),
+                                       "gcp_credentials_controller_service");
+  }
+  std::shared_ptr<ListGCSBucketMocked> list_gcs_bucket_ = std::make_shared<ListGCSBucketMocked>("ListGCSBucketMocked");
+  org::apache::nifi::minifi::test::SingleProcessorTestController test_controller_{list_gcs_bucket_};
+  std::shared_ptr<minifi::core::controller::ControllerServiceNode>  gcp_credentials_node_;
+};
+
+TEST_F(ListGCSBucketTests, MissingBucket) {
+  EXPECT_CALL(*list_gcs_bucket_->mock_client_, CreateResumableSession).Times(0);
+  EXPECT_THROW(test_controller_.trigger(), utils::internal::RequiredPropertyMissingException);
+}
+
+TEST_F(ListGCSBucketTests, ServerGivesPermaError) {
+  auto return_permanent_error = [](ListObjectsRequest const&) {
+    return google::cloud::StatusOr<ListObjectsResponse>(PermanentError());
+  };
+  EXPECT_CALL(*list_gcs_bucket_->mock_client_, ListObjects)
+      .WillOnce(return_permanent_error);
+  EXPECT_TRUE(test_controller_.plan->setProperty(list_gcs_bucket_, ListGCSBucket::Bucket.getName(), "bucket-from-property"));
+  const auto& result = test_controller_.trigger();
+  EXPECT_EQ(0, result.at(ListGCSBucket::Success).size());
+}
+
+TEST_F(ListGCSBucketTests, ServerGivesTransientErrors) {
+  auto return_temp_error = [](ListObjectsRequest const&) {
+    return google::cloud::StatusOr<ListObjectsResponse>(TransientError());
+  };
+  EXPECT_CALL(*list_gcs_bucket_->mock_client_, ListObjects)
+      .WillOnce(return_temp_error)
+      .WillOnce(return_temp_error);
+  EXPECT_TRUE(test_controller_.plan->setProperty(list_gcs_bucket_, ListGCSBucket::NumberOfRetries.getName(), "1"));
+  EXPECT_TRUE(test_controller_.plan->setProperty(list_gcs_bucket_, ListGCSBucket::Bucket.getName(), "bucket-from-property"));
+  const auto& result = test_controller_.trigger();
+  EXPECT_EQ(0, result.at(ListGCSBucket::Success).size());
+}
+
+TEST_F(ListGCSBucketTests, WithoutVersions) {
+  EXPECT_CALL(*list_gcs_bucket_->mock_client_, ListObjects)
+      .WillOnce([](ListObjectsRequest const& req)
+                    -> google::cloud::StatusOr<ListObjectsResponse> {
+        EXPECT_EQ("bucket-from-property", req.bucket_name());
+        EXPECT_TRUE(req.HasOption<gcs::Versions>());
+        EXPECT_FALSE(req.GetOption<gcs::Versions>().value());
+
+        ListObjectsResponse response;
+        response.items.emplace_back(CreateObject(1, 1));
+        response.items.emplace_back(CreateObject(1, 2));
+        response.items.emplace_back(CreateObject(1, 3));
+        return response;
+      });
+  EXPECT_TRUE(test_controller_.plan->setProperty(list_gcs_bucket_, ListGCSBucket::Bucket.getName(), "bucket-from-property"));
+  const auto& result = test_controller_.trigger();
+  EXPECT_EQ(3, result.at(ListGCSBucket::Success).size());
+}
+
+
+TEST_F(ListGCSBucketTests, WithVersions) {
+  EXPECT_CALL(*list_gcs_bucket_->mock_client_, ListObjects)
+      .WillOnce([](ListObjectsRequest const& req)
+                    -> google::cloud::StatusOr<ListObjectsResponse> {
+        EXPECT_EQ("bucket-from-property", req.bucket_name());
+        EXPECT_TRUE(req.HasOption<gcs::Versions>());
+        EXPECT_TRUE(req.GetOption<gcs::Versions>().value());
+
+        ListObjectsResponse response;
+        response.items.emplace_back(CreateObject(1));
+        response.items.emplace_back(CreateObject(2));
+        response.items.emplace_back(CreateObject(3));
+        return response;
+      });
+  EXPECT_TRUE(test_controller_.plan->setProperty(list_gcs_bucket_, ListGCSBucket::Bucket.getName(), "bucket-from-property"));
+  EXPECT_TRUE(test_controller_.plan->setProperty(list_gcs_bucket_, ListGCSBucket::ListAllVersions.getName(), "true"));
+  const auto& result = test_controller_.trigger();
+  EXPECT_EQ(3, result.at(ListGCSBucket::Success).size());
+}
+
diff --git a/extensions/gcp/tests/PutGCSObjectTests.cpp b/extensions/gcp/tests/PutGCSObjectTests.cpp
index 0c2a3ba..a5c8919 100644
--- a/extensions/gcp/tests/PutGCSObjectTests.cpp
+++ b/extensions/gcp/tests/PutGCSObjectTests.cpp
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 #include "../processors/PutGCSObject.h"
+#include "../controllerservices/GCPCredentialsControllerService.h"
 #include "GCPAttributes.h"
 #include "core/Resource.h"
 #include "SingleProcessorTestController.h"
@@ -39,7 +40,7 @@
 class PutGCSObjectMocked : public PutGCSObject {
   using org::apache::nifi::minifi::extensions::gcp::PutGCSObject::PutGCSObject;
  public:
-  gcs::Client getClient(const gcs::ClientOptions&) const override {
+  gcs::Client getClient() const override {
     return gcs::testing::ClientFromMock(mock_client_, *retry_policy_);
   }
   std::shared_ptr<gcs::testing::MockClient> mock_client_ = std::make_shared<gcs::testing::MockClient>();
@@ -85,8 +86,7 @@
 TEST_F(PutGCSObjectTests, MissingBucket) {
   EXPECT_CALL(*put_gcs_object_->mock_client_, CreateResumableSession).Times(0);
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), ""));
-  test_controller_.enqueueFlowFile("hello world");
-  const auto& result = test_controller_.trigger();
+  const auto& result = test_controller_.trigger("hello world");
   EXPECT_EQ(0, result.at(PutGCSObject::Success).size());
   ASSERT_EQ(1, result.at(PutGCSObject::Failure).size());
   EXPECT_EQ(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
@@ -107,8 +107,7 @@
         return google::cloud::make_status_or(std::unique_ptr<gcs::internal::ResumableUploadSession>(std::move(mock_upload_session)));
       });
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "${gcs.bucket}"));
-  test_controller_.enqueueFlowFile("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}});
-  const auto& result = test_controller_.trigger();
+  const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}});
   ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
   EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
   EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
@@ -127,8 +126,7 @@
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::NumberOfRetries.getName(), "2"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
-  test_controller_.enqueueFlowFile("hello world");
-  const auto& result = test_controller_.trigger();
+  const auto& result = test_controller_.trigger("hello world");
   EXPECT_EQ(0, result.at(PutGCSObject::Success).size());
   ASSERT_EQ(1, result.at(PutGCSObject::Failure).size());
   EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
@@ -146,8 +144,7 @@
       .WillOnce(return_permanent_error);
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
-  test_controller_.enqueueFlowFile("hello world");
-  const auto& result = test_controller_.trigger();
+  const auto& result = test_controller_.trigger("hello world");
   EXPECT_EQ(0, result.at(PutGCSObject::Success).size());
   ASSERT_EQ(1, result.at(PutGCSObject::Failure).size());
   EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN));
@@ -171,8 +168,7 @@
       });
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
-  test_controller_.enqueueFlowFile("hello world");
-  const auto& result = test_controller_.trigger();
+  const auto& result = test_controller_.trigger("hello world");
   EXPECT_EQ(1, result.at(PutGCSObject::Success).size());
   EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
 }
@@ -195,8 +191,7 @@
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Crc32cChecksum.getName(), "${crc32c}"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
-  test_controller_.enqueueFlowFile("hello world", {{"crc32c", "yZRlqg=="}, {"md5", "XrY7u+Ae7tCTyyK7j1rNww=="}});
-  const auto& result = test_controller_.trigger();
+  const auto& result = test_controller_.trigger("hello world", {{"crc32c", "yZRlqg=="}, {"md5", "XrY7u+Ae7tCTyyK7j1rNww=="}});
   EXPECT_EQ(1, result.at(PutGCSObject::Success).size());
   EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
 }
@@ -215,8 +210,7 @@
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::OverwriteObject.getName(), "false"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
-  test_controller_.enqueueFlowFile("hello world", {{"crc32c", "yZRlqg=="}, {"md5", "XrY7u+Ae7tCTyyK7j1rNww=="}});
-  const auto& result = test_controller_.trigger();
+  const auto& result = test_controller_.trigger("hello world", {{"crc32c", "yZRlqg=="}, {"md5", "XrY7u+Ae7tCTyyK7j1rNww=="}});
   ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
   EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
   EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
@@ -236,8 +230,7 @@
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::EncryptionKey.getName(), "ZW5jcnlwdGlvbl9rZXk="));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
-  test_controller_.enqueueFlowFile("hello world");
-  const auto& result = test_controller_.trigger();
+  const auto& result = test_controller_.trigger("hello world");
   ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
   EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
   EXPECT_NE(std::nullopt, result.at(PutGCSObject::Success)[0]->getAttribute(minifi_gcp::GCS_ENCRYPTION_SHA256_ATTR));
@@ -250,8 +243,7 @@
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::EncryptionKey.getName(), "not_base64_key"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
-  test_controller_.enqueueFlowFile("hello world");
-  EXPECT_THROW(test_controller_.trigger(), minifi::Exception);
+  EXPECT_THROW(test_controller_.trigger("hello world"), minifi::Exception);
 }
 
 TEST_F(PutGCSObjectTests, NoContentType) {
@@ -267,8 +259,7 @@
       });
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
-  test_controller_.enqueueFlowFile("hello world");
-  const auto& result = test_controller_.trigger();
+  const auto& result = test_controller_.trigger("hello world");
   ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
   EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
   EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
@@ -288,8 +279,7 @@
       });
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
-  test_controller_.enqueueFlowFile("hello world", {{"mime.type", "text/attribute"}});
-  const auto& result = test_controller_.trigger();
+  const auto& result = test_controller_.trigger("hello world", {{"mime.type", "text/attribute"}});
   ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
   EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
   EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
@@ -310,8 +300,7 @@
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property"));
   EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::ObjectACL.getName(), toString(PutGCSObject::PredefinedAcl::AUTHENTICATED_READ)));
-  test_controller_.enqueueFlowFile("hello world");
-  const auto& result = test_controller_.trigger();
+  const auto& result = test_controller_.trigger("hello world");
   ASSERT_EQ(1, result.at(PutGCSObject::Success).size());
   EXPECT_EQ(0, result.at(PutGCSObject::Failure).size());
   EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0]));
diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
index 8509144..e5df28f 100644
--- a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
+++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
@@ -317,8 +317,7 @@
   invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "fail");
   invokehttp->setProperty(InvokeHTTP::AttributesToSend, ".*");
   invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::Success, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
-  test_controller.enqueueFlowFile("data", {{"invalid header", "value"}});
-  const auto result = test_controller.trigger();
+  const auto result = test_controller.trigger("data", {{"invalid header", "value"}});
   auto file_contents = result.at(InvokeHTTP::RelFailure);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
@@ -338,8 +337,7 @@
   invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "fail");
   invokehttp->setProperty(InvokeHTTP::AttributesToSend, "valid.*");
   invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::Success, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
-  test_controller.enqueueFlowFile("data", {{"invalid header", "value"}, {"valid-header", "value2"}});
-  const auto result = test_controller.trigger();
+  const auto result = test_controller.trigger("data", {{"invalid header", "value"}, {"valid-header", "value2"}});
   REQUIRE(result.at(InvokeHTTP::RelFailure).empty());
   const auto& success_contents = result.at(InvokeHTTP::Success);
   REQUIRE(success_contents.size() == 1);
@@ -360,8 +358,7 @@
   invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL);
   invokehttp->setProperty(InvokeHTTP::AttributesToSend, ".*");
   invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::RelFailure, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
-  test_controller.enqueueFlowFile("data", {{"invalid header", "value"}, {"", "value2"}});
-  const auto result = test_controller.trigger();
+  const auto result = test_controller.trigger("data", {{"invalid header", "value"}, {"", "value2"}});
   auto file_contents = result.at(InvokeHTTP::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
@@ -383,8 +380,7 @@
   invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "drop");
   invokehttp->setProperty(InvokeHTTP::AttributesToSend, ".*");
   invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::RelFailure, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
-  test_controller.enqueueFlowFile("data", {{"legit-header", "value1"}, {"invalid header", "value2"}});
-  const auto result = test_controller.trigger();
+  const auto result = test_controller.trigger("data", {{"legit-header", "value1"}, {"invalid header", "value2"}});
   auto file_contents = result.at(InvokeHTTP::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
@@ -406,8 +402,7 @@
   invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "drop");
   invokehttp->setProperty(InvokeHTTP::AttributesToSend, "");
   invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::RelFailure, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
-  test_controller.enqueueFlowFile("data", {{"legit-header", "value1"}, {"invalid header", "value2"}});
-  const auto result = test_controller.trigger();
+  const auto result = test_controller.trigger("data", {{"legit-header", "value1"}, {"invalid header", "value2"}});
   auto file_contents = result.at(InvokeHTTP::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
@@ -429,8 +424,7 @@
   invokehttp->setProperty(InvokeHTTP::InvalidHTTPHeaderFieldHandlingStrategy, "drop");
   invokehttp->setProperty(InvokeHTTP::AttributesToSend, "he.*er");
   invokehttp->setAutoTerminatedRelationships({InvokeHTTP::RelNoRetry, InvokeHTTP::RelFailure, InvokeHTTP::RelResponse, InvokeHTTP::RelRetry});
-  test_controller.enqueueFlowFile("data", {{"header1", "value1"}, {"header", "value2"}});
-  const auto result = test_controller.trigger();
+  const auto result = test_controller.trigger("data", {{"header1", "value1"}, {"header", "value2"}});
   auto file_contents = result.at(InvokeHTTP::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
diff --git a/extensions/librdkafka/tests/PublishKafkaTests.cpp b/extensions/librdkafka/tests/PublishKafkaTests.cpp
index b508ee0..78d65b0 100644
--- a/extensions/librdkafka/tests/PublishKafkaTests.cpp
+++ b/extensions/librdkafka/tests/PublishKafkaTests.cpp
@@ -31,8 +31,7 @@
   publish_kafka->setProperty(processors::PublishKafka::SeedBrokers, "test_seedbroker");
   publish_kafka->setProperty(processors::PublishKafka::QueueBufferMaxMessage, "1000");
   publish_kafka->setProperty(processors::PublishKafka::BatchSize, "1500");
-  test_controller.enqueueFlowFile("");
-  REQUIRE_THROWS_WITH(test_controller.trigger(), "Process Schedule Operation: Invalid configuration: Batch Size cannot be larger than Queue Max Message");
+  REQUIRE_THROWS_WITH(test_controller.trigger(""), "Process Schedule Operation: Invalid configuration: Batch Size cannot be larger than Queue Max Message");
 }
 
 }  // namespace org::apache::nifi::minifi::test
diff --git a/extensions/standard-processors/tests/unit/FetchFileTests.cpp b/extensions/standard-processors/tests/unit/FetchFileTests.cpp
index a6cd1d0..2643edc 100644
--- a/extensions/standard-processors/tests/unit/FetchFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/FetchFileTests.cpp
@@ -87,8 +87,7 @@
 
 TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file with default but non-existent file path", "[testFetchFile]") {
   attributes_["filename"] = "non_existent.file";
-  test_controller_->enqueueFlowFile("", attributes_);
-  const auto result = test_controller_->trigger();
+  const auto result = test_controller_->trigger("", attributes_);
   auto file_contents = result.at(minifi::processors::FetchFile::NotFound);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
@@ -99,8 +98,7 @@
 TEST_CASE_METHOD(FetchFileTestFixture, "FileToFetch property set to a non-existent file path", "[testFetchFile]") {
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::FileToFetch, "/tmp/non_existent.file");
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenFileNotFound, "INFO");
-  test_controller_->enqueueFlowFile("", attributes_);
-  const auto result = test_controller_->trigger();
+  const auto result = test_controller_->trigger("", attributes_);
   auto file_contents = result.at(minifi::processors::FetchFile::NotFound);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
@@ -113,8 +111,7 @@
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::FileToFetch,
     input_dir_ + utils::file::FileUtils::get_separator() + permission_denied_file_name_);
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::LogLevelWhenPermissionDenied, "WARN");
-  test_controller_->enqueueFlowFile("", attributes_);
-  const auto result = test_controller_->trigger();
+  const auto result = test_controller_->trigger("", attributes_);
   auto file_contents = result.at(minifi::processors::FetchFile::PermissionDenied);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
@@ -124,8 +121,7 @@
 #endif
 
 TEST_CASE_METHOD(FetchFileTestFixture, "Test fetching file with default file path", "[testFetchFile]") {
-  test_controller_->enqueueFlowFile("", attributes_);
-  const auto result = test_controller_->trigger();
+  const auto result = test_controller_->trigger("", attributes_);
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -137,8 +133,7 @@
   utils::putFileToDir(input_dir_ + utils::file::FileUtils::get_separator() + "sub", input_file_name_, file_content_);
   auto file_path = input_dir_ + utils::file::FileUtils::get_separator() + "sub" + utils::file::FileUtils::get_separator() + input_file_name_;
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::FileToFetch, file_path);
-  test_controller_->enqueueFlowFile("", attributes_);
-  const auto result = test_controller_->trigger();
+  const auto result = test_controller_->trigger("", attributes_);
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -147,8 +142,7 @@
 
 TEST_CASE_METHOD(FetchFileTestFixture, "Flow scheduling fails due to missing move destination directory when completion strategy is set to move file", "[testFetchFile]") {
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
-  test_controller_->enqueueFlowFile("", attributes_);
-  REQUIRE_THROWS_AS(test_controller_->trigger(), minifi::Exception);
+  REQUIRE_THROWS_AS(test_controller_->trigger("", attributes_), minifi::Exception);
 }
 
 TEST_CASE_METHOD(FetchFileTestFixture, "Flow fails due to move conflict", "[testFetchFile]") {
@@ -157,8 +151,7 @@
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Fail");
-  test_controller_->enqueueFlowFile("", attributes_);
-  const auto result = test_controller_->trigger();
+  const auto result = test_controller_->trigger("", attributes_);
   auto file_contents = result.at(minifi::processors::FetchFile::Failure);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == "");
@@ -173,8 +166,7 @@
   utils::putFileToDir(move_dir, input_file_name_, "old content");
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Fail");
-  test_controller_->enqueueFlowFile("", attributes_);
-  const auto result = test_controller_->trigger();
+  const auto result = test_controller_->trigger("", attributes_);
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -186,8 +178,7 @@
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Replace File");
-  test_controller_->enqueueFlowFile("", attributes_);
-  const auto result = test_controller_->trigger();
+  const auto result = test_controller_->trigger("", attributes_);
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -203,8 +194,7 @@
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Rename");
-  test_controller_->enqueueFlowFile("", attributes_);
-  const auto result = test_controller_->trigger();
+  const auto result = test_controller_->trigger("", attributes_);
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -221,8 +211,7 @@
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveConflictStrategy, "Keep Existing");
-  test_controller_->enqueueFlowFile("", attributes_);
-  const auto result = test_controller_->trigger();
+  const auto result = test_controller_->trigger("", attributes_);
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -236,8 +225,7 @@
   auto move_dir = test_controller_->createTempDirectory();
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
-  test_controller_->enqueueFlowFile("", attributes_);
-  const auto result = test_controller_->trigger();
+  const auto result = test_controller_->trigger("", attributes_);
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -252,8 +240,7 @@
   move_dir = move_dir + utils::file::FileUtils::get_separator() + "temp";
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
-  test_controller_->enqueueFlowFile("", attributes_);
-  const auto result = test_controller_->trigger();
+  const auto result = test_controller_->trigger("", attributes_);
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -269,8 +256,7 @@
   utils::file::FileUtils::set_permissions(move_dir, 0);
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Move File");
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::MoveDestinationDirectory, move_dir);
-  test_controller_->enqueueFlowFile("", attributes_);
-  const auto result = test_controller_->trigger();
+  const auto result = test_controller_->trigger("", attributes_);
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
@@ -283,8 +269,7 @@
 
 TEST_CASE_METHOD(FetchFileTestFixture, "Fetched file is deleted after flow completion", "[testFetchFile]") {
   fetch_file_processor_->setProperty(org::apache::nifi::minifi::processors::FetchFile::CompletionStrategy, "Delete File");
-  test_controller_->enqueueFlowFile("", attributes_);
-  const auto result = test_controller_->trigger();
+  const auto result = test_controller_->trigger("", attributes_);
   auto file_contents = result.at(minifi::processors::FetchFile::Success);
   REQUIRE(file_contents.size() == 1);
   REQUIRE(test_controller_->plan->getContent(file_contents[0]) == file_content_);
diff --git a/extensions/standard-processors/tests/unit/PutUDPTests.cpp b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
index 6095d72..49f4e3f 100644
--- a/extensions/standard-processors/tests/unit/PutUDPTests.cpp
+++ b/extensions/standard-processors/tests/unit/PutUDPTests.cpp
@@ -88,8 +88,7 @@
 
   {
     const char* const message = "first message: hello";
-    controller.enqueueFlowFile(message);
-    const auto result = controller.trigger();
+    const auto result = controller.trigger(message);
     const auto& success_flow_files = result.at(PutUDP::Success);
     REQUIRE(success_flow_files.size() == 1);
     REQUIRE(result.at(PutUDP::Failure).empty());
@@ -101,8 +100,7 @@
 
   {
     const char* const message = "longer message AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";  // NOLINT
-    controller.enqueueFlowFile(message);
-    const auto result = controller.trigger();
+    const auto result = controller.trigger(message);
     const auto& success_flow_files = result.at(PutUDP::Success);
     REQUIRE(success_flow_files.size() == 1);
     REQUIRE(result.at(PutUDP::Failure).empty());
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index 1423bc4..6450f72 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -404,7 +404,7 @@
   std::string string_value;
   if (!getProperty(property, string_value, flow_file)) return std::nullopt;
   try {
-    if (!state::response::Value{string_value}.template convertValue(value)) return std::nullopt;
+    if (!state::response::Value{string_value}.template convertValue<>(value)) return std::nullopt;
   } catch (const utils::internal::ValueException&) {
     return std::nullopt;
   }
diff --git a/libminifi/test/SingleProcessorTestController.h b/libminifi/test/SingleProcessorTestController.h
index 1e4707c..b91b4c0 100644
--- a/libminifi/test/SingleProcessorTestController.h
+++ b/libminifi/test/SingleProcessorTestController.h
@@ -51,9 +51,10 @@
     return result;
   }
 
-  void enqueueFlowFile(const std::string_view input_flow_file_content, std::unordered_map<std::string, std::string> input_flow_file_attributes = {}) {
+  auto trigger(const std::string_view input_flow_file_content, std::unordered_map<std::string, std::string> input_flow_file_attributes = {}) {
     const auto new_flow_file = createFlowFile(input_flow_file_content, std::move(input_flow_file_attributes));
     input_->put(new_flow_file);
+    return trigger();
   }
 
   core::Relationship addDynamicRelationship(std::string name) {