NIFI-9317: Updating config verification for ListS3 (#5485)
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
index 36e6a59..6609beb 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
@@ -303,17 +303,24 @@
public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
protected void initializeRegionAndEndpoint(final ProcessContext context, final AmazonWebServiceClient client) {
+ this.region = getRegionAndInitializeEndpoint(context, client);
+ }
+
+ protected Region getRegionAndInitializeEndpoint(final ProcessContext context, final AmazonWebServiceClient client) {
+ final Region region;
// if the processor supports REGION, get the configured region.
if (getSupportedPropertyDescriptors().contains(REGION)) {
- final String region = context.getProperty(REGION).getValue();
- if (region != null) {
- this.region = Region.getRegion(Regions.fromName(region));
+ final String regionValue = context.getProperty(REGION).getValue();
+ if (regionValue != null) {
+ region = Region.getRegion(Regions.fromName(regionValue));
if (client != null) {
- client.setRegion(this.region);
+ client.setRegion(region);
}
} else {
- this.region = null;
+ region = null;
}
+ } else {
+ region = null;
}
// if the endpoint override has been configured, set the endpoint.
@@ -328,8 +335,8 @@
// handling vpce endpoints
// falling back to the configured region if the parse fails
// e.g. in case of https://vpce-***-***.sqs.{region}.vpce.amazonaws.com
- String region = parseRegionForVPCE(urlstr, this.region.getName());
- client.setEndpoint(urlstr, this.client.getServiceName(), region);
+ String regionValue = parseRegionForVPCE(urlstr, region.getName());
+ client.setEndpoint(urlstr, this.client.getServiceName(), regionValue);
} else {
// handling non-vpce custom endpoints where the AWS library can parse the region out
// e.g. https://sqs.{region}.***.***.***.gov
@@ -337,6 +344,7 @@
}
}
}
+ return region;
}
/*
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index 8ab51fe..5446650 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -53,6 +53,7 @@
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -342,41 +343,23 @@
return;
}
+ final AmazonS3 client = getClient();
+
+ S3BucketLister bucketLister = getS3BucketLister(context, client);
+
final long startNanos = System.nanoTime();
- final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long listingTimestamp = System.currentTimeMillis();
- final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean();
+
+ final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final ListingSnapshot currentListing = listing.get();
final long currentTimestamp = currentListing.getTimestamp();
final Set<String> currentKeys = currentListing.getKeys();
-
- final AmazonS3 client = getClient();
int listCount = 0;
int totalListCount = 0;
long latestListedTimestampInThisCycle = currentTimestamp;
- String delimiter = context.getProperty(DELIMITER).getValue();
- String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
-
- boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
- int listType = context.getProperty(LIST_TYPE).asInteger();
- S3BucketLister bucketLister = useVersions
- ? new S3VersionBucketLister(client)
- : listType == 2
- ? new S3ObjectBucketListerVersion2(client)
- : new S3ObjectBucketLister(client);
-
- bucketLister.setBucketName(bucket);
- bucketLister.setRequesterPays(requesterPays);
-
- if (delimiter != null && !delimiter.isEmpty()) {
- bucketLister.setDelimiter(delimiter);
- }
- if (prefix != null && !prefix.isEmpty()) {
- bucketLister.setPrefix(prefix);
- }
VersionListing versionListing;
final Set<String> listedKeys = new HashSet<>();
@@ -486,6 +469,33 @@
}
}
+ private S3BucketLister getS3BucketLister(final ProcessContext context, final AmazonS3 client) {
+ final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean();
+ final boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
+
+ final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+ final String delimiter = context.getProperty(DELIMITER).getValue();
+ final String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
+
+ final int listType = context.getProperty(LIST_TYPE).asInteger();
+
+ final S3BucketLister bucketLister = useVersions
+ ? new S3VersionBucketLister(client)
+ : listType == 2
+ ? new S3ObjectBucketListerVersion2(client)
+ : new S3ObjectBucketLister(client);
+
+ bucketLister.setBucketName(bucket);
+ bucketLister.setRequesterPays(requesterPays);
+
+ if (delimiter != null && !delimiter.isEmpty()) {
+ bucketLister.setDelimiter(delimiter);
+ }
+ if (prefix != null && !prefix.isEmpty()) {
+ bucketLister.setPrefix(prefix);
+ }
+ return bucketLister;
+ }
private interface S3BucketLister {
void setBucketName(String bucketName);
@@ -891,11 +901,15 @@
@Override
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog logger, final Map<String, String> attributes) {
- final AmazonS3Client client = createClient(context, getCredentials(context), createConfiguration(context));
- initializeRegionAndEndpoint(context, client);
+ final ControllerService service = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService();
+ final AmazonS3Client client = service != null ? createClient(context, getCredentialsProvider(context), createConfiguration(context))
+ : createClient(context, getCredentials(context), createConfiguration(context));
+
+ getRegionAndInitializeEndpoint(context, client);
final List<ConfigVerificationResult> results = new ArrayList<>();
final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+ final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
if (bucketName == null || bucketName.trim().isEmpty()) {
results.add(new ConfigVerificationResult.Builder()
@@ -907,17 +921,35 @@
return results;
}
- final String prefix = context.getProperty(PREFIX).getValue();
+ final S3BucketLister bucketLister = getS3BucketLister(context, client);
+ final long listingTimestamp = System.currentTimeMillis();
// Attempt to perform a listing of objects in the S3 bucket
try {
- final ObjectListing listing = client.listObjects(bucketName, prefix);
- final int count = listing.getObjectSummaries().size();
+ int listCount = 0;
+ int totalListCount = 0;
+ VersionListing versionListing;
+ do {
+ versionListing = bucketLister.listVersions();
+ for (final S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
+ long lastModified = versionSummary.getLastModified().getTime();
+ if (lastModified > (listingTimestamp - minAgeMilliseconds)) {
+ continue;
+ }
+
+ listCount++;
+ }
+ bucketLister.setNextMarker();
+
+ totalListCount += listCount;
+
+ listCount = 0;
+ } while (bucketLister.isTruncated());
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Perform Listing")
.outcome(Outcome.SUCCESSFUL)
- .explanation("Successfully listed contents of bucket '" + bucketName + "', finding " + count + " objects" + (prefix == null ? "" : " with a prefix of '" + prefix + "'"))
+ .explanation("Successfully listed contents of bucket '" + bucketName + "', finding " + totalListCount + " objects matching the filter")
.build());
logger.info("Successfully verified configuration");
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index a05c9e3..2942f23 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.processors.aws.s3;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
@@ -28,7 +30,10 @@
import com.amazonaws.services.s3.model.S3VersionSummary;
import com.amazonaws.services.s3.model.VersionListing;
import org.apache.commons.lang3.time.DateUtils;
+import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.state.MockStateManager;
@@ -44,6 +49,7 @@
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -66,6 +72,11 @@
protected AmazonS3Client getClient() {
return mockS3Client;
}
+
+ @Override
+ protected AmazonS3Client createClient(ProcessContext context, AWSCredentials credentials, ClientConfiguration config) {
+ return mockS3Client;
+ }
};
runner = TestRunners.newTestRunner(mockListS3);
}
@@ -114,6 +125,10 @@
flowFiles.get(1).assertAttributeEquals("filename", "b/c");
flowFiles.get(2).assertAttributeEquals("filename", "d/e");
runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, lastModifiedTimestamp, Scope.CLUSTER);
+
+ final List<ConfigVerificationResult> results = ((VerifiableProcessor) runner.getProcessor())
+ .verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
+ assertTrue(results.get(0).getExplanation().contains("finding 3 objects"));
}
@Test