NIFI-9317: Updating config verification for ListS3 (#5485)

diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
index 36e6a59..6609beb 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
@@ -303,17 +303,24 @@
     public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
 
     protected void initializeRegionAndEndpoint(final ProcessContext context, final AmazonWebServiceClient client) {
+        this.region = getRegionAndInitializeEndpoint(context, client);
+    }
+
+    protected Region getRegionAndInitializeEndpoint(final ProcessContext context, final AmazonWebServiceClient client) {
+        final Region region;
         // if the processor supports REGION, get the configured region.
         if (getSupportedPropertyDescriptors().contains(REGION)) {
-            final String region = context.getProperty(REGION).getValue();
-            if (region != null) {
-                this.region = Region.getRegion(Regions.fromName(region));
+            final String regionValue = context.getProperty(REGION).getValue();
+            if (regionValue != null) {
+                region = Region.getRegion(Regions.fromName(regionValue));
                 if (client != null) {
-                    client.setRegion(this.region);
+                    client.setRegion(region);
                 }
             } else {
-                this.region = null;
+                region = null;
             }
+        } else {
+            region = null;
         }
 
         // if the endpoint override has been configured, set the endpoint.
@@ -328,8 +335,8 @@
                     // handling vpce endpoints
                     // falling back to the configured region if the parse fails
                     // e.g. in case of https://vpce-***-***.sqs.{region}.vpce.amazonaws.com
-                    String region = parseRegionForVPCE(urlstr, this.region.getName());
-                    client.setEndpoint(urlstr, this.client.getServiceName(), region);
+                    String regionValue = parseRegionForVPCE(urlstr, region.getName());
+                    client.setEndpoint(urlstr, this.client.getServiceName(), regionValue);
                 } else {
                     // handling non-vpce custom endpoints where the AWS library can parse the region out
                     // e.g. https://sqs.{region}.***.***.***.gov
@@ -337,6 +344,7 @@
                 }
             }
         }
+        return region;
     }
 
     /*
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index 8ab51fe..5446650 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -53,6 +53,7 @@
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -342,41 +343,23 @@
             return;
         }
 
+        final AmazonS3 client = getClient();
+
+        S3BucketLister bucketLister = getS3BucketLister(context, client);
+
         final long startNanos = System.nanoTime();
-        final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
         final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
         final long listingTimestamp = System.currentTimeMillis();
-        final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean();
+
+        final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
 
         final ListingSnapshot currentListing = listing.get();
         final long currentTimestamp = currentListing.getTimestamp();
         final Set<String> currentKeys = currentListing.getKeys();
-
-        final AmazonS3 client = getClient();
         int listCount = 0;
         int totalListCount = 0;
         long latestListedTimestampInThisCycle = currentTimestamp;
-        String delimiter = context.getProperty(DELIMITER).getValue();
-        String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
-
-        boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
-        int listType = context.getProperty(LIST_TYPE).asInteger();
-        S3BucketLister bucketLister = useVersions
-                ? new S3VersionBucketLister(client)
-                : listType == 2
-                    ? new S3ObjectBucketListerVersion2(client)
-                    : new S3ObjectBucketLister(client);
-
-        bucketLister.setBucketName(bucket);
-        bucketLister.setRequesterPays(requesterPays);
-
-        if (delimiter != null && !delimiter.isEmpty()) {
-            bucketLister.setDelimiter(delimiter);
-        }
-        if (prefix != null && !prefix.isEmpty()) {
-            bucketLister.setPrefix(prefix);
-        }
 
         VersionListing versionListing;
         final Set<String> listedKeys = new HashSet<>();
@@ -486,6 +469,33 @@
         }
     }
 
+    private S3BucketLister getS3BucketLister(final ProcessContext context, final AmazonS3 client) {
+        final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean();
+        final boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
+
+        final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+        final String delimiter = context.getProperty(DELIMITER).getValue();
+        final String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
+
+        final int listType = context.getProperty(LIST_TYPE).asInteger();
+
+        final S3BucketLister bucketLister = useVersions
+                ? new S3VersionBucketLister(client)
+                : listType == 2
+                ? new S3ObjectBucketListerVersion2(client)
+                : new S3ObjectBucketLister(client);
+
+        bucketLister.setBucketName(bucket);
+        bucketLister.setRequesterPays(requesterPays);
+
+        if (delimiter != null && !delimiter.isEmpty()) {
+            bucketLister.setDelimiter(delimiter);
+        }
+        if (prefix != null && !prefix.isEmpty()) {
+            bucketLister.setPrefix(prefix);
+        }
+        return bucketLister;
+    }
 
     private interface S3BucketLister {
         void setBucketName(String bucketName);
@@ -891,11 +901,15 @@
 
     @Override
     public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog logger, final Map<String, String> attributes) {
-        final AmazonS3Client client = createClient(context, getCredentials(context), createConfiguration(context));
-        initializeRegionAndEndpoint(context, client);
+        final ControllerService service = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService();
+        final AmazonS3Client client = service != null ? createClient(context, getCredentialsProvider(context), createConfiguration(context))
+            : createClient(context, getCredentials(context), createConfiguration(context));
+
+        getRegionAndInitializeEndpoint(context, client);
 
         final List<ConfigVerificationResult> results = new ArrayList<>();
         final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+        final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
 
         if (bucketName == null || bucketName.trim().isEmpty()) {
             results.add(new ConfigVerificationResult.Builder()
@@ -907,17 +921,35 @@
             return results;
         }
 
-        final String prefix = context.getProperty(PREFIX).getValue();
+        final S3BucketLister bucketLister = getS3BucketLister(context, client);
+        final long listingTimestamp = System.currentTimeMillis();
 
         // Attempt to perform a listing of objects in the S3 bucket
         try {
-            final ObjectListing listing = client.listObjects(bucketName, prefix);
-            final int count = listing.getObjectSummaries().size();
+            int listCount = 0;
+            int totalListCount = 0;
+            VersionListing versionListing;
+            do {
+                versionListing = bucketLister.listVersions();
+                for (final S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
+                    long lastModified = versionSummary.getLastModified().getTime();
+                    if (lastModified > (listingTimestamp - minAgeMilliseconds)) {
+                        continue;
+                    }
+
+                    listCount++;
+                }
+                bucketLister.setNextMarker();
+
+                totalListCount += listCount;
+
+                listCount = 0;
+            } while (bucketLister.isTruncated());
 
             results.add(new ConfigVerificationResult.Builder()
                 .verificationStepName("Perform Listing")
                 .outcome(Outcome.SUCCESSFUL)
-                .explanation("Successfully listed contents of bucket '" + bucketName + "', finding " + count + " objects" + (prefix == null ? "" : " with a prefix of '" + prefix + "'"))
+                .explanation("Successfully listed contents of bucket '" + bucketName + "', finding " + totalListCount + " objects matching the filter")
                 .build());
 
             logger.info("Successfully verified configuration");
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index a05c9e3..2942f23 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.processors.aws.s3;
 
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
@@ -28,7 +30,10 @@
 import com.amazonaws.services.s3.model.S3VersionSummary;
 import com.amazonaws.services.s3.model.VersionListing;
 import org.apache.commons.lang3.time.DateUtils;
+import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.VerifiableProcessor;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.state.MockStateManager;
@@ -44,6 +49,7 @@
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -66,6 +72,11 @@
             protected AmazonS3Client getClient() {
                 return mockS3Client;
             }
+
+            @Override
+            protected AmazonS3Client createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) {
+                return mockS3Client;
+            }
         };
         runner = TestRunners.newTestRunner(mockListS3);
     }
@@ -114,6 +125,10 @@
         flowFiles.get(1).assertAttributeEquals("filename", "b/c");
         flowFiles.get(2).assertAttributeEquals("filename", "d/e");
         runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER);
+
+        final List<ConfigVerificationResult> results = ((VerifiableProcessor) runner.getProcessor())
+                .verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
+        assertTrue(results.get(0).getExplanation().contains("finding 3 objects"));
     }
 
     @Test