NIFI-8631: Ensure that GCP Pub/Sub messages are not acknowledged until session has been committed, in order ot ensure that we don't have data loss

This closes #5102.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
index 5693721..70b9e26 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
@@ -45,6 +45,7 @@
 

 import java.io.IOException;

 import java.util.ArrayList;

+import java.util.Collection;

 import java.util.Collections;

 import java.util.HashMap;

 import java.util.List;

@@ -130,11 +131,10 @@
     }

 

     @Override

-    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {

+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

         if (subscriber == null) {

-

             if (storedException.get() != null) {

-                getLogger().error("Failed to create Google Cloud PubSub subscriber due to {}", new Object[]{storedException.get()});

+                getLogger().error("Failed to create Google Cloud PubSub subscriber due to {}", storedException.get());

             } else {

                 getLogger().error("Google Cloud PubSub Subscriber was not properly created. Yielding the processor...");

             }

@@ -145,6 +145,7 @@
 

         final PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);

         final List<String> ackIds = new ArrayList<>();

+        final String subscriptionName = getSubscriptionName(context);

 

         for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {

             if (message.hasMessage()) {

@@ -164,20 +165,26 @@
                 flowFile = session.write(flowFile, out -> out.write(message.getMessage().getData().toByteArray()));

 

                 session.transfer(flowFile, REL_SUCCESS);

-                session.getProvenanceReporter().receive(flowFile, getSubscriptionName(context));

+                session.getProvenanceReporter().receive(flowFile, subscriptionName);

             }

         }

 

-        if (!ackIds.isEmpty()) {

-            AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()

-                    .addAllAckIds(ackIds)

-                    .setSubscription(getSubscriptionName(context))

-                    .build();

-            subscriber.acknowledgeCallable().call(acknowledgeRequest);

-        }

+        session.commitAsync(() -> acknowledgeAcks(ackIds, subscriptionName));

     }

 

-    private String getSubscriptionName(ProcessContext context) {

+    private void acknowledgeAcks(final Collection<String> ackIds, final String subscriptionName) {

+        if (ackIds == null || ackIds.isEmpty()) {

+            return;

+        }

+

+        AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()

+            .addAllAckIds(ackIds)

+            .setSubscription(subscriptionName)

+            .build();

+        subscriber.acknowledgeCallable().call(acknowledgeRequest);

+    }

+

+    private String getSubscriptionName(final ProcessContext context) {

         final String subscriptionName = context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue();

         final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();

 

@@ -189,8 +196,7 @@
 

     }

 

-    private SubscriberStub getSubscriber(ProcessContext context) throws IOException {

-

+    private SubscriberStub getSubscriber(final ProcessContext context) throws IOException {

         final SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder()

                 .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))

                 .build();