NIFI-9276: Adding config verification to AbstractListProcessor subclasses (#5453)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
index 8c82dbe..eb439b4 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -35,7 +35,6 @@
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
 import com.microsoft.azure.storage.blob.CloudBlockBlob;
 import com.microsoft.azure.storage.blob.ListBlobItem;
-
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -62,6 +61,8 @@
 import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import java.util.Optional;
+
 @PrimaryNodeOnly
 @TriggerSerially
 @Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@@ -141,6 +142,11 @@
     }
 
     @Override
+    protected String getListingContainerName(final ProcessContext context) {
+        return String.format("Azure Blob Storage Container [%s]", getPath(context));
+    }
+
+    @Override
     protected String getPath(final ProcessContext context) {
         return context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
     }
@@ -173,27 +179,24 @@
     }
 
     @Override
-    protected List<BlobInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
-        String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
-        String prefix = context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue();
-        if (prefix == null) {
-            prefix = "";
-        }
+    protected List<BlobInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException {
+        final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions().getValue();
+        final String prefix = Optional.ofNullable(context.getProperty(PROP_PREFIX).evaluateAttributeExpressions().getValue()).orElse("");
         final List<BlobInfo> listing = new ArrayList<>();
         try {
-            CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
-            CloudBlobContainer container = blobClient.getContainerReference(containerName);
+            final CloudBlobClient blobClient = AzureStorageUtils.createCloudBlobClient(context, getLogger(), null);
+            final CloudBlobContainer container = blobClient.getContainerReference(containerName);
 
             final OperationContext operationContext = new OperationContext();
             AzureStorageUtils.setProxy(operationContext, context);
 
-            for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
+            for (final ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, operationContext)) {
                 if (blob instanceof CloudBlob) {
-                    CloudBlob cloudBlob = (CloudBlob) blob;
-                    BlobProperties properties = cloudBlob.getProperties();
-                    StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri();
+                    final CloudBlob cloudBlob = (CloudBlob) blob;
+                    final BlobProperties properties = cloudBlob.getProperties();
+                    final StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri();
 
-                    Builder builder = new BlobInfo.Builder()
+                    final Builder builder = new BlobInfo.Builder()
                                               .primaryUri(uri.getPrimaryUri().toString())
                                               .blobName(cloudBlob.getName())
                                               .containerName(containerName)
@@ -215,12 +218,15 @@
                     listing.add(builder.build());
                 }
             }
-        } catch (Throwable t) {
+        } catch (final Throwable t) {
             throw new IOException(ExceptionUtils.getRootCause(t));
         }
         return listing;
     }
 
-
-
+    // Unfiltered listing is not supported - must provide a prefix
+    @Override
+    protected Integer countUnfilteredListing(final ProcessContext context) {
+        return null;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
index c726e81..268b8c1 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
@@ -160,12 +160,9 @@
     }
 
     @OnScheduled
-    public void onScheduled(ProcessContext context) {
-        String fileFilter = context.getProperty(FILE_FILTER).evaluateAttributeExpressions().getValue();
-        filePattern = fileFilter != null ? Pattern.compile(fileFilter) : null;
-
-        String pathFilter = context.getProperty(PATH_FILTER).evaluateAttributeExpressions().getValue();
-        pathPattern = pathFilter != null ? Pattern.compile(pathFilter) : null;
+    public void onScheduled(final ProcessContext context) {
+        filePattern = getPattern(context, FILE_FILTER);
+        pathPattern = getPattern(context, PATH_FILTER);
     }
 
     @OnStopped
@@ -175,7 +172,7 @@
     }
 
     @Override
-    protected void customValidate(ValidationContext context, Collection<ValidationResult> results) {
+    protected void customValidate(final ValidationContext context, final Collection<ValidationResult> results) {
         if (context.getProperty(PATH_FILTER).isSet() && !context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean()) {
             results.add(new ValidationResult.Builder()
                     .subject(PATH_FILTER.getDisplayName())
@@ -191,7 +188,7 @@
     }
 
     @Override
-    protected Scope getStateScope(PropertyContext context) {
+    protected Scope getStateScope(final PropertyContext context) {
         return Scope.CLUSTER;
     }
 
@@ -201,55 +198,34 @@
     }
 
     @Override
-    protected boolean isListingResetNecessary(PropertyDescriptor property) {
+    protected boolean isListingResetNecessary(final PropertyDescriptor property) {
         return LISTING_RESET_PROPERTIES.contains(property);
     }
 
     @Override
-    protected String getPath(ProcessContext context) {
-        String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
+    protected String getPath(final ProcessContext context) {
+        final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
         return directory != null ? directory : ".";
     }
 
     @Override
-    protected List<ADLSFileInfo> performListing(ProcessContext context, Long minTimestamp) throws IOException {
-        try {
-            String fileSystem = evaluateFileSystemProperty(context, null);
-            String baseDirectory = evaluateDirectoryProperty(context, null);
-            boolean recurseSubdirectories = context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean();
-
-            DataLakeServiceClient storageClient = getStorageClient(context, null);
-            DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
-
-            ListPathsOptions options = new ListPathsOptions();
-            options.setPath(baseDirectory);
-            options.setRecursive(recurseSubdirectories);
-
-            Pattern baseDirectoryPattern = Pattern.compile("^" + baseDirectory + "/?");
-
-            List<ADLSFileInfo> listing = fileSystemClient.listPaths(options, null).stream()
-                    .filter(pathItem -> !pathItem.isDirectory())
-                    .map(pathItem -> new ADLSFileInfo.Builder()
-                            .fileSystem(fileSystem)
-                            .filePath(pathItem.getName())
-                            .length(pathItem.getContentLength())
-                            .lastModified(pathItem.getLastModified().toInstant().toEpochMilli())
-                            .etag(pathItem.getETag())
-                            .build())
-                    .filter(fileInfo -> filePattern == null || filePattern.matcher(fileInfo.getFilename()).matches())
-                    .filter(fileInfo -> pathPattern == null || pathPattern.matcher(RegExUtils.removeFirst(fileInfo.getDirectory(), baseDirectoryPattern)).matches())
-                    .collect(Collectors.toList());
-
-            return listing;
-        } catch (Exception e) {
-            getLogger().error("Failed to list directory on Azure Data Lake Storage", e);
-            throw new IOException(ExceptionUtils.getRootCause(e));
-        }
+    protected List<ADLSFileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException {
+        return performListing(context, listingMode, true);
     }
 
     @Override
-    protected Map<String, String> createAttributes(ADLSFileInfo fileInfo, ProcessContext context) {
-        Map<String, String> attributes = new HashMap<>();
+    protected Integer countUnfilteredListing(final ProcessContext context) throws IOException {
+        return performListing(context, ListingMode.CONFIGURATION_VERIFICATION, false).size();
+    }
+
+    @Override
+    protected String getListingContainerName(final ProcessContext context) {
+        return String.format("Azure Data Lake Directory [%s]", getPath(context));
+    }
+
+    @Override
+    protected Map<String, String> createAttributes(final ADLSFileInfo fileInfo, final ProcessContext context) {
+        final Map<String, String> attributes = new HashMap<>();
 
         attributes.put(ATTR_NAME_FILESYSTEM, fileInfo.getFileSystem());
         attributes.put(ATTR_NAME_FILE_PATH, fileInfo.getFilePath());
@@ -261,4 +237,48 @@
 
         return attributes;
     }
+
+    private List<ADLSFileInfo> performListing(final ProcessContext context, final ListingMode listingMode,
+                                              final boolean applyFilters) throws IOException {
+        try {
+            final String fileSystem = evaluateFileSystemProperty(context, null);
+            final String baseDirectory = evaluateDirectoryProperty(context, null);
+            final boolean recurseSubdirectories = context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean();
+
+            final Pattern filePattern = listingMode == ListingMode.EXECUTION ? this.filePattern : getPattern(context, FILE_FILTER);
+            final Pattern pathPattern = listingMode == ListingMode.EXECUTION ? this.pathPattern : getPattern(context, PATH_FILTER);
+
+            final DataLakeServiceClient storageClient = getStorageClient(context, null);
+            final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
+
+            final ListPathsOptions options = new ListPathsOptions();
+            options.setPath(baseDirectory);
+            options.setRecursive(recurseSubdirectories);
+
+            final Pattern baseDirectoryPattern = Pattern.compile("^" + baseDirectory + "/?");
+
+            final List<ADLSFileInfo> listing = fileSystemClient.listPaths(options, null).stream()
+                    .filter(pathItem -> !pathItem.isDirectory())
+                    .map(pathItem -> new ADLSFileInfo.Builder()
+                            .fileSystem(fileSystem)
+                            .filePath(pathItem.getName())
+                            .length(pathItem.getContentLength())
+                            .lastModified(pathItem.getLastModified().toInstant().toEpochMilli())
+                            .etag(pathItem.getETag())
+                            .build())
+                    .filter(fileInfo -> applyFilters && (filePattern == null || filePattern.matcher(fileInfo.getFilename()).matches()))
+                    .filter(fileInfo -> applyFilters && (pathPattern == null || pathPattern.matcher(RegExUtils.removeFirst(fileInfo.getDirectory(), baseDirectoryPattern)).matches()))
+                    .collect(Collectors.toList());
+
+            return listing;
+        } catch (final Exception e) {
+            getLogger().error("Failed to list directory on Azure Data Lake Storage", e);
+            throw new IOException(ExceptionUtils.getRootCause(e));
+        }
+    }
+
+    private Pattern getPattern(final ProcessContext context, final PropertyDescriptor filterDescriptor) {
+        String value = context.getProperty(filterDescriptor).evaluateAttributeExpressions().getValue();
+        return value != null ? Pattern.compile(value) : null;
+    }
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index e494b8c..4fcb862 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -26,6 +26,8 @@
 import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.components.ValidationContext;
@@ -40,10 +42,12 @@
 import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
 import org.apache.nifi.distributed.cache.client.exception.SerializationException;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordSetWriter;
@@ -121,7 +125,7 @@
  * </p>
  * <ul>
  * <li>
- * Perform a listing of resources. The subclass will implement the {@link #performListing(ProcessContext, Long)} method, which creates a listing of all
+ * Perform a listing of resources. The subclass will implement the {@link #performListing(ProcessContext, Long, ListingMode)} method, which creates a listing of all
  * entities on the target system that have timestamps later than the provided timestamp. If the entities returned have a timestamp before the provided one, those
  * entities will be filtered out. It is therefore not necessary to perform the filtering of timestamps but is provided in order to give the implementation the ability
  * to filter those resources on the server side rather than pulling back all of the information, if it makes sense to do so in the concrete implementation.
@@ -149,42 +153,61 @@
 @TriggerSerially
 @Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After a listing of resources is performed, the latest timestamp of any of the resources is stored in the component's state. "
     + "The scope used depends on the implementation.")
-public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor {
+public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor implements VerifiableProcessor {
+
+    /**
+     * Indicates the mode when performing a listing.
+     */
+    protected enum ListingMode {
+        /**
+         * Indicates the listing is being performed during normal processor execution.  May use configuration cached in the Processor object.
+         */
+        EXECUTION,
+        /**
+         * Indicates the listing is being performed during configuration verification.  Only use configuration provided in the ProcessContext argument, since the configuration may not
+         * have been applied to the processor yet.
+         */
+        CONFIGURATION_VERIFICATION
+    }
+
+    private static final Long IGNORE_MIN_TIMESTAMP_VALUE = 0L;
 
     public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new Builder()
-        .name("Distributed Cache Service")
-        .description("NOTE: This property is used merely for migration from old NiFi version before state management was introduced at version 0.5.0. "
-            + "The stored value in the cache service will be migrated into the state when this processor is started at the first time. "
-            + "The specified Controller Service was used to maintain state about what had been pulled from the remote server so that if a new node "
-            + "begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information was not shared across the cluster. "
-            + "This property did not need to be set for standalone instances of NiFi but was supposed to be configured if NiFi had been running within a cluster.")
-        .required(false)
-        .identifiesControllerService(DistributedMapCacheClient.class)
-        .build();
+            .name("Distributed Cache Service")
+            .description("NOTE: This property is used merely for migration from old NiFi version before state management was introduced at version 0.5.0. "
+                    + "The stored value in the cache service will be migrated into the state when this processor is started at the first time. "
+                    + "The specified Controller Service was used to maintain state about what had been pulled from the remote server so that if a new node "
+                    + "begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information was not shared across the cluster. "
+                    + "This property did not need to be set for standalone instances of NiFi but was supposed to be configured if NiFi had been running within a cluster.")
+            .required(false)
+            .identifiesControllerService(DistributedMapCacheClient.class)
+            .build();
 
     public static final AllowableValue PRECISION_AUTO_DETECT = new AllowableValue("auto-detect", "Auto Detect",
-    "Automatically detect time unit deterministically based on candidate entries timestamp."
-            + " Please note that this option may take longer to list entities unnecessarily, if none of entries has a precise precision timestamp."
-            + " E.g. even if a target system supports millis, if all entries only have timestamps without millis, such as '2017-06-16 09:06:34.000', then its precision is determined as 'seconds'.");
+            "Automatically detect time unit deterministically based on candidate entries timestamp."
+                    + " Please note that this option may take longer to list entities unnecessarily, if none of entries has a precise precision timestamp."
+                    + " E.g. even if a target system supports millis, if all entries only have timestamps without millis, such as '2017-06-16 09:06:34.000', "
+                    + "then its precision is determined as 'seconds'.");
     public static final AllowableValue PRECISION_MILLIS = new AllowableValue("millis", "Milliseconds",
             "This option provides the minimum latency for an entry from being available to being listed if target system supports millis, if not, use other options.");
-    public static final AllowableValue PRECISION_SECONDS = new AllowableValue("seconds", "Seconds","For a target system that does not have millis precision, but has in seconds.");
+    public static final AllowableValue PRECISION_SECONDS = new AllowableValue("seconds", "Seconds",
+            "For a target system that does not have millis precision, but has in seconds.");
     public static final AllowableValue PRECISION_MINUTES = new AllowableValue("minutes", "Minutes", "For a target system that only supports precision in minutes.");
 
     public static final PropertyDescriptor TARGET_SYSTEM_TIMESTAMP_PRECISION = new Builder()
-        .name("target-system-timestamp-precision")
-        .displayName("Target System Timestamp Precision")
-        .description("Specify timestamp precision at the target system."
-                + " Since this processor uses timestamp of entities to decide which should be listed, it is crucial to use the right timestamp precision.")
-        .required(true)
-        .allowableValues(PRECISION_AUTO_DETECT, PRECISION_MILLIS, PRECISION_SECONDS, PRECISION_MINUTES)
-        .defaultValue(PRECISION_AUTO_DETECT.getValue())
-        .build();
+            .name("target-system-timestamp-precision")
+            .displayName("Target System Timestamp Precision")
+            .description("Specify timestamp precision at the target system."
+                    + " Since this processor uses timestamp of entities to decide which should be listed, it is crucial to use the right timestamp precision.")
+            .required(true)
+            .allowableValues(PRECISION_AUTO_DETECT, PRECISION_MILLIS, PRECISION_SECONDS, PRECISION_MINUTES)
+            .defaultValue(PRECISION_AUTO_DETECT.getValue())
+            .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
-        .name("success")
-        .description("All FlowFiles that are received are routed to success")
-        .build();
+            .name("success")
+            .description("All FlowFiles that are received are routed to success")
+            .build();
 
     public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps",
             "This strategy tracks the latest timestamp of listed entity to determine new/updated entities." +
@@ -214,22 +237,22 @@
                     " are accurate.");
 
     public static final PropertyDescriptor LISTING_STRATEGY = new Builder()
-        .name("listing-strategy")
-        .displayName("Listing Strategy")
-        .description("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
-        .required(true)
-        .allowableValues(BY_TIMESTAMPS, BY_ENTITIES, NO_TRACKING)
-        .defaultValue(BY_TIMESTAMPS.getValue())
-        .build();
+            .name("listing-strategy")
+            .displayName("Listing Strategy")
+            .description("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
+            .required(true)
+            .allowableValues(BY_TIMESTAMPS, BY_ENTITIES, NO_TRACKING)
+            .defaultValue(BY_TIMESTAMPS.getValue())
+            .build();
 
     public static final PropertyDescriptor RECORD_WRITER = new Builder()
-        .name("record-writer")
-        .displayName("Record Writer")
-        .description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. If the Record Writer is specified, " +
-            "all entities will be written to a single FlowFile instead of adding attributes to individual FlowFiles.")
-        .required(false)
-        .identifiesControllerService(RecordSetWriterFactory.class)
-        .build();
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. " +
+                    "If the Record Writer is specified, all entities will be written to a single FlowFile instead of adding attributes to individual FlowFiles.")
+            .required(false)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
 
     /**
      * Represents the timestamp of an entity which was the latest one within those listed at the previous cycle.
@@ -255,6 +278,7 @@
      * near instantaneously after the prior iteration effectively voiding the built in buffer
      */
     public static final Map<TimeUnit, Long> LISTING_LAG_MILLIS;
+
     static {
         final Map<TimeUnit, Long> nanos = new HashMap<>();
         nanos.put(TimeUnit.MILLISECONDS, 100L);
@@ -262,6 +286,7 @@
         nanos.put(TimeUnit.MINUTES, 60_000L);
         LISTING_LAG_MILLIS = Collections.unmodifiableMap(nanos);
     }
+
     static final String LATEST_LISTED_ENTRY_TIMESTAMP_KEY = "listing.timestamp";
     static final String LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY = "processed.timestamp";
     static final String IDENTIFIER_PREFIX = "id";
@@ -279,7 +304,6 @@
         }
     }
 
-
     @Override
     public Set<Relationship> getRelationships() {
         final Set<Relationship> relationships = new HashSet<>();
@@ -291,7 +315,7 @@
      * In order to add custom validation at sub-classes, implement {@link #customValidate(ValidationContext, Collection)} method.
      */
     @Override
-    protected final Collection<ValidationResult> customValidate(ValidationContext context) {
+    protected final Collection<ValidationResult> customValidate(final ValidationContext context) {
         final Collection<ValidationResult> results = new ArrayList<>();
 
         final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue();
@@ -303,9 +327,9 @@
         return results;
     }
 
-
     /**
      * Sub-classes can add custom validation by implementing this method.
+     *
      * @param validationContext the validation context
      * @param validationResults add custom validation result to this collection
      */
@@ -313,6 +337,54 @@
 
     }
 
+    @Override
+    public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog logger, final Map<String, String> attributes) {
+
+        final List<ConfigVerificationResult> results = new ArrayList<>();
+
+        final String containerName = getListingContainerName(context);
+        try {
+            final Integer unfilteredListingCount = countUnfilteredListing(context);
+            final int matchingCount = performListing(context, IGNORE_MIN_TIMESTAMP_VALUE, ListingMode.CONFIGURATION_VERIFICATION).size();
+
+            final String countExplanation;
+            if (unfilteredListingCount == null) {
+                if (matchingCount == 0) {
+                    countExplanation = "Found no objects matching the filter.";
+                } else {
+                    final String matchingCountText = matchingCount == 1 ? matchingCount + " object" : matchingCount + " objects";
+                    countExplanation = String.format("Found %s matching the filter.", matchingCountText);
+                }
+            } else if (unfilteredListingCount == 0) {
+                countExplanation = "Found no objects.";
+            } else {
+                final String unfilteredListingCountText = unfilteredListingCount == 1 ? unfilteredListingCount + " object" : unfilteredListingCount + " objects";
+                final String unfilteredDemonstrativePronoun = unfilteredListingCount == 1 ? "that" : "those";
+                final String matchingCountText = matchingCount == 1 ? matchingCount + " matches" : matchingCount + " match";
+                countExplanation = String.format("Found %s.  Of %s, %s the filter.",
+                        unfilteredListingCountText, unfilteredDemonstrativePronoun, matchingCountText);
+            }
+
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("Perform Listing")
+                    .outcome(Outcome.SUCCESSFUL)
+                    .explanation(String.format("Successfully listed contents of %s.  %s", containerName, countExplanation))
+                    .build());
+
+            logger.info("Successfully verified configuration");
+        } catch (final IOException e) {
+            logger.warn("Failed to verify configuration. Could not list contents of {}", containerName, e);
+
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("Perform Listing")
+                    .outcome(Outcome.FAILED)
+                    .explanation(String.format("Failed to list contents of %s: %s", containerName, e.getMessage()))
+                    .build());
+        }
+
+        return results;
+    }
+
     @OnPrimaryNodeStateChange
     public void onPrimaryNodeChange(final PrimaryNodeState newState) {
         justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE);
@@ -376,7 +448,7 @@
                     client.remove(path, new StringSerDe());
                 } catch (final IOException ioe) {
                     getLogger().warn("Failed to remove entry from Distributed Cache Service. However, the state has already been migrated to use the new "
-                        + "State Management service, so the Distributed Cache Service is no longer needed.");
+                            + "State Management service, so the Distributed Cache Service is no longer needed.");
                 }
             }
         }
@@ -415,8 +487,8 @@
     }
 
     private Map<String, String> createStateMap(final long latestListedEntryTimestampThisCycleMillis,
-                         final long lastProcessedLatestEntryTimestampMillis,
-                         final List<String> processedIdentifiesWithLatestTimestamp) throws IOException {
+                                               final long lastProcessedLatestEntryTimestampMillis,
+                                               final List<String> processedIdentifiesWithLatestTimestamp) throws IOException {
 
         final Map<String, String> updatedState = new HashMap<>(processedIdentifiesWithLatestTimestamp.size() + 2);
         updatedState.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(latestListedEntryTimestampThisCycleMillis));
@@ -428,7 +500,6 @@
         return updatedState;
     }
 
-
     private void persist(final long latestListedEntryTimestampThisCycleMillis,
                          final long lastProcessedLatestEntryTimestampMillis,
                          final List<String> processedIdentifiesWithLatestTimestamp,
@@ -467,6 +538,10 @@
         }
     }
 
+    protected long getCurrentTime() {
+        return System.currentTimeMillis();
+    }
+
     public void listByNoTracking(final ProcessContext context, final ProcessSession session) {
         final List<T> entityList;
 
@@ -483,7 +558,7 @@
         try {
             // minTimestamp = 0L by default on this strategy to ignore any future
             // comparision in lastModifiedMap to the same entity.
-            entityList = performListing(context, 0L);
+            entityList = performListing(context, IGNORE_MIN_TIMESTAMP_VALUE, ListingMode.EXECUTION);
         } catch (final IOException pe) {
             getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{pe.getMessage()}, pe);
             context.yield();
@@ -516,12 +591,12 @@
     }
 
     public void listByTimeWindow(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        if (this.lastListedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
+        if (lastListedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
             try {
                 final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
                 Optional.ofNullable(stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY))
-                    .map(Long::parseLong)
-                    .ifPresent(lastTimestamp -> this.lastListedLatestEntryTimestampMillis = lastTimestamp);
+                        .map(Long::parseLong)
+                        .ifPresent(lastTimestamp -> lastListedLatestEntryTimestampMillis = lastTimestamp);
 
                 justElectedPrimaryNode = false;
             } catch (final IOException ioe) {
@@ -531,14 +606,14 @@
             }
         }
 
-        long lowerBoundInclusiveTimestamp = Optional.ofNullable(this.lastListedLatestEntryTimestampMillis).orElse(0L);
+        long lowerBoundInclusiveTimestamp = Optional.ofNullable(lastListedLatestEntryTimestampMillis).orElse(IGNORE_MIN_TIMESTAMP_VALUE);
         long upperBoundExclusiveTimestamp;
 
         long currentTime = getCurrentTime();
 
         final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();
         try {
-            List<T> entityList = performListing(context, lowerBoundInclusiveTimestamp);
+            List<T> entityList = performListing(context, lowerBoundInclusiveTimestamp, ListingMode.EXECUTION);
 
             boolean targetSystemHasMilliseconds = false;
             boolean targetSystemHasSeconds = false;
@@ -560,7 +635,7 @@
             }
             final TimeUnit targetSystemTimePrecision
                     = PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision)
-                        ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
+                    ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
                     : PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS
                     : PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? TimeUnit.SECONDS : TimeUnit.MINUTES;
             final Long listingLagMillis = LISTING_LAG_MILLIS.get(targetSystemTimePrecision);
@@ -572,19 +647,19 @@
                 getLogger().trace("entityList: " + entityList.stream().map(entity -> entity.getName() + "_" + entity.getTimestamp()).collect(Collectors.joining(", ")));
             }
             entityList
-                .stream()
-                .filter(entity -> entity.getTimestamp() >= lowerBoundInclusiveTimestamp)
-                .filter(entity -> entity.getTimestamp() < upperBoundExclusiveTimestamp)
-                .forEach(entity -> orderedEntries
-                    .computeIfAbsent(entity.getTimestamp(), __ -> new ArrayList<>())
-                    .add(entity)
-                );
+                    .stream()
+                    .filter(entity -> entity.getTimestamp() >= lowerBoundInclusiveTimestamp)
+                    .filter(entity -> entity.getTimestamp() < upperBoundExclusiveTimestamp)
+                    .forEach(entity -> orderedEntries
+                            .computeIfAbsent(entity.getTimestamp(), __ -> new ArrayList<>())
+                            .add(entity)
+                    );
             if (getLogger().isTraceEnabled()) {
                 getLogger().trace("orderedEntries: " +
-                    orderedEntries.values().stream()
-                        .flatMap(List::stream)
-                        .map(entity -> entity.getName() + "_" + entity.getTimestamp())
-                        .collect(Collectors.joining(", "))
+                        orderedEntries.values().stream()
+                                .flatMap(List::stream)
+                                .map(entity -> entity.getName() + "_" + entity.getTimestamp())
+                                .collect(Collectors.joining(", "))
                 );
             }
         } catch (final IOException e) {
@@ -614,31 +689,27 @@
 
         try {
             if (getLogger().isTraceEnabled()) {
-                getLogger().info("this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp: " + this.lastListedLatestEntryTimestampMillis + " = " + upperBoundExclusiveTimestamp);
+                getLogger().info("this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp: " + lastListedLatestEntryTimestampMillis + " = " + upperBoundExclusiveTimestamp);
             }
-            this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp;
+            lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp;
             persist(upperBoundExclusiveTimestamp, upperBoundExclusiveTimestamp, latestIdentifiersProcessed, session, getStateScope(context));
         } catch (final IOException ioe) {
             getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
-                + "if another node begins executing this Processor, data duplication may occur.", ioe);
+                    + "if another node begins executing this Processor, data duplication may occur.", ioe);
         }
     }
 
-    protected long getCurrentTime() {
-        return System.currentTimeMillis();
-    }
-
     public void listByTrackingTimestamps(final ProcessContext context, final ProcessSession session) throws ProcessException {
         Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis;
 
-        if (this.lastListedLatestEntryTimestampMillis == null || this.lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
+        if (lastListedLatestEntryTimestampMillis == null || lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
             try {
                 boolean noUpdateRequired = false;
                 // Attempt to retrieve state from the state manager if a last listing was not yet established or
                 // if just elected the primary node
                 final StateMap stateMap = session.getState(getStateScope(context));
                 latestIdentifiersProcessed.clear();
-                for (Map.Entry<String, String> state : stateMap.toMap().entrySet()) {
+                for (final Map.Entry<String, String> state : stateMap.toMap().entrySet()) {
                     final String k = state.getKey();
                     final String v = state.getValue();
                     if (v == null || v.isEmpty()) {
@@ -648,13 +719,13 @@
                     if (LATEST_LISTED_ENTRY_TIMESTAMP_KEY.equals(k)) {
                         minTimestampToListMillis = Long.parseLong(v);
                         // If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates
-                        if (minTimestampToListMillis.equals(this.lastListedLatestEntryTimestampMillis)) {
+                        if (minTimestampToListMillis.equals(lastListedLatestEntryTimestampMillis)) {
                             noUpdateRequired = true;
                         } else {
-                            this.lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
+                            lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
                         }
                     } else if (LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY.equals(k)) {
-                        this.lastProcessedLatestEntryTimestampMillis = Long.parseLong(v);
+                        lastProcessedLatestEntryTimestampMillis = Long.parseLong(v);
                     } else if (k.startsWith(IDENTIFIER_PREFIX)) {
                         latestIdentifiersProcessed.add(v);
                     }
@@ -676,7 +747,7 @@
         final long currentRunTimeMillis = System.currentTimeMillis();
         try {
             // track of when this last executed for consideration of the lag nanos
-            entityList = performListing(context, minTimestampToListMillis);
+            entityList = performListing(context, minTimestampToListMillis, ListingMode.EXECUTION);
         } catch (final IOException e) {
             getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e);
             context.yield();
@@ -728,7 +799,7 @@
             }
             final TimeUnit targetSystemTimePrecision
                     = PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision)
-                        ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
+                    ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
                     : PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS
                     : PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? TimeUnit.SECONDS : TimeUnit.MINUTES;
             final Long listingLagMillis = LISTING_LAG_MILLIS.get(targetSystemTimePrecision);
@@ -743,8 +814,8 @@
                 final long  listingLagNanos = TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
                 if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos
                         || (latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)
-                            && orderedEntries.get(latestListedEntryTimestampThisCycleMillis).stream()
-                                    .allMatch(entity -> latestIdentifiersProcessed.contains(entity.getIdentifier())))) {
+                        && orderedEntries.get(latestListedEntryTimestampThisCycleMillis).stream()
+                                .allMatch(entity -> latestIdentifiersProcessed.contains(entity.getIdentifier())))) {
                     context.yield();
                     return;
                 }
@@ -778,7 +849,7 @@
 
         // As long as we have a listing timestamp, there is meaningful state to capture regardless of any outputs generated
         if (latestListedEntryTimestampThisCycleMillis != null) {
-            boolean processedNewFiles = entitiesListed > 0;
+            final boolean processedNewFiles = entitiesListed > 0;
 
             if (!latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis) || processedNewFiles) {
                 // We have performed a listing and pushed any FlowFiles out that may have been generated
@@ -794,7 +865,7 @@
                     persist(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, latestIdentifiersProcessed, session, getStateScope(context));
                 } catch (final IOException ioe) {
                     getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
-                        + "if another node begins executing this Processor, data duplication may occur.", ioe);
+                            + "if another node begins executing this Processor, data duplication may occur.", ioe);
                 }
             }
 
@@ -834,17 +905,17 @@
         final WriteResult writeResult;
 
         try (final OutputStream out = session.write(flowFile);
-            final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), getRecordSchema(), out, Collections.emptyMap())) {
+             final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), getRecordSchema(), out, Collections.emptyMap())) {
 
             recordSetWriter.beginRecordSet();
-            for (Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) {
+            for (final Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) {
                 List<T> entities = timestampEntities.getValue();
                 if (timestampEntities.getKey().equals(lastProcessedLatestEntryTimestampMillis)) {
                     // Filter out previously processed entities.
                     entities = entities.stream().filter(entity -> !latestIdentifiersProcessed.contains(entity.getIdentifier())).collect(Collectors.toList());
                 }
 
-                for (T entity : entities) {
+                for (final T entity : entities) {
                     entitiesListed++;
                     recordSetWriter.write(entity.toRecord());
                 }
@@ -868,14 +939,14 @@
 
     private int createFlowFilesForEntities(final ProcessContext context, final ProcessSession session, final Map<Long, List<T>> orderedEntries) {
         int entitiesListed = 0;
-        for (Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) {
+        for (final Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) {
             List<T> entities = timestampEntities.getValue();
             if (timestampEntities.getKey().equals(lastProcessedLatestEntryTimestampMillis)) {
                 // Filter out previously processed entities.
                 entities = entities.stream().filter(entity -> !latestIdentifiersProcessed.contains(entity.getIdentifier())).collect(Collectors.toList());
             }
 
-            for (T entity : entities) {
+            for (final T entity : entities) {
                 entitiesListed++;
 
                 // Create the FlowFile for this path.
@@ -894,6 +965,7 @@
      * So that it use return different precisions than PRECISION_AUTO_DETECT.
      * If TARGET_SYSTEM_TIMESTAMP_PRECISION is supported as a valid Processor property,
      * then PRECISION_AUTO_DETECT will be the default value when not specified by a user.
+     *
      * @return
      */
     protected String getDefaultTimePrecision() {
@@ -936,11 +1008,13 @@
      * will be filtered out by the Processor. Therefore, it is not necessary that implementations perform this filtering but can be more efficient
      * if the filtering can be performed on the server side prior to retrieving the information.
      *
-     * @param context      the ProcessContex to use in order to pull the appropriate entities
-     * @param minTimestamp the minimum timestamp of entities that should be returned.
+     * @param context      the ProcessContext to use in order to pull the appropriate entities
+     * @param minTimestamp the minimum timestamp of entities that should be returned
+     * @param listingMode  the listing mode, indicating whether the listing is being performed during configuration verification or normal processor execution
      * @return a Listing of entities that have a timestamp >= minTimestamp
      */
-    protected abstract List<T> performListing(final ProcessContext context, final Long minTimestamp) throws IOException;
+    protected abstract List<T> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode)
+            throws IOException;
 
     /**
      * Determines whether or not the listing must be reset if the value of the given property is changed
@@ -963,7 +1037,25 @@
      */
     protected abstract RecordSchema getRecordSchema();
 
+    /**
+     * Performs an unfiltered listing and returns the count, or null if this operation is not supported.
+     *
+     * @param context the ProcessContext to use in order to pull the appropriate entities
+     * @return The number of unfiltered entities in the listing, or null if this processor does not support an unfiltered listing
+     */
+    protected abstract Integer countUnfilteredListing(final ProcessContext context)
+            throws IOException;
+
+    /**
+     * Provides a human-readable name for the container being listed, for the purpose of displaying readable verification messages during processor configuration verification.
+     *
+     * @param context The process context
+     * @return The user-friendly name for the container
+     */
+    protected abstract String getListingContainerName(final ProcessContext context);
+
     private static class StringSerDe implements Serializer<String>, Deserializer<String> {
+
         @Override
         public String deserialize(final byte[] value) throws DeserializationException, IOException {
             if (value == null) {
@@ -972,11 +1064,11 @@
 
             return new String(value, StandardCharsets.UTF_8);
         }
-
         @Override
         public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
             out.write(value.getBytes(StandardCharsets.UTF_8));
         }
+
     }
 
     @OnScheduled
@@ -1007,7 +1099,7 @@
     private void listByTrackingEntities(ProcessContext context, ProcessSession session) throws ProcessException {
         listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, getStateScope(context), minTimestampToList -> {
             try {
-                return performListing(context, minTimestampToList);
+                return performListing(context, minTimestampToList, ListingMode.EXECUTION);
             } catch (final IOException e) {
                 getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e);
                 return Collections.emptyList();
@@ -1015,5 +1107,4 @@
         }, entity -> createAttributes(entity, context));
         justElectedPrimaryNode = false;
     }
-
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
index 7565f18..1a70ecb 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
@@ -257,7 +257,7 @@
                 } else {
                     this.alreadyListedEntities = new ConcurrentHashMap<>(fetchedListedEntities);
                 }
-            } catch (IOException e) {
+            } catch (final IOException e) {
                 throw new ProcessException("Failed to restore already-listed entities due to " + e, e);
             }
         }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java
index e59d65c..ec64785 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processor.util.list;
 
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.flowfile.FlowFile;
@@ -30,7 +31,9 @@
 import org.junit.rules.TestWatcher;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -225,6 +228,10 @@
         runner.run();
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
         runner.clearTransferState();
+
+        final List<ConfigVerificationResult> results = proc.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
+        assertEquals(1, results.size());
+        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, results.get(0).getOutcome());
     }
 
     private void setTargetSystemTimestampPrecision(TimeUnit targetPrecision) {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
index 440a08d..4f78e8c 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
@@ -17,7 +17,10 @@
 
 package org.apache.nifi.processor.util.list;
 
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
@@ -30,6 +33,7 @@
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -43,6 +47,7 @@
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.glassfish.jersey.internal.guava.Predicates;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -55,6 +60,7 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -62,11 +68,13 @@
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class TestAbstractListProcessor {
 
@@ -224,9 +232,12 @@
         proc.addEntity("one","firstFile",1585344381476L);
         proc.addEntity("two","secondFile",1585344381475L);
 
+        assertVerificationOutcome(Outcome.SUCCESSFUL, ".* Found 2 objects.  Of those, 2 match the filter.");
+
         runner.run();
         assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
         assertEquals(2, proc.entities.size());
+        assertVerificationOutcome(Outcome.SUCCESSFUL, ".* Found 2 objects.  Of those, 2 match the filter.");
 
         final MockStateManager stateManager = runner.getStateManager();
         final Map<String, String> expectedState = new HashMap<>();
@@ -252,14 +263,16 @@
 
         // Clear any listed entities after choose No Tracking Strategy
         proc.entities.clear();
+        assertVerificationOutcome(Outcome.SUCCESSFUL, ".* Found no objects.");
 
         // Add new entity
         proc.addEntity("one","firstFile",1585344381476L);
-        proc.listByNoTracking(context, session);
+        proc.listByTrackingTimestamps(context, session);
 
         // Test if state cleared or not
         runner.getStateManager().assertStateNotEquals(expectedState, Scope.CLUSTER);
         assertEquals(1, proc.entities.size());
+        assertVerificationOutcome(Outcome.SUCCESSFUL, ".* Found 1 object.  Of that, 1 matches the filter.");
     }
 
     @Test
@@ -285,14 +298,22 @@
         proc.addEntity("one", "one", 1, 1);
         proc.currentTimestamp.set(1L);
         runner.clearTransferState();
+        // Prior to running the processor, we should expect 3 objects during verification
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+                "Found 1 object.  Of that, 1 matches the filter.");
         runner.run();
         assertEquals(1, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
         runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
             .assertAttributeEquals(CoreAttributes.FILENAME.key(), "one");
+        // The object is now tracked, so it's no longer considered new
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+                "Found 1 object.  Of that, 1 matches the filter.");
 
         // Should not list any entity.
         proc.currentTimestamp.set(2L);
         runner.clearTransferState();
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+                "Found 1 object.  Of that, 1 matches the filter.");
         runner.run();
         assertEquals(0, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
 
@@ -301,6 +322,8 @@
         proc.addEntity("five", "five", 5, 5);
         proc.addEntity("six", "six", 6, 6);
         runner.clearTransferState();
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+                "Found 3 objects.  Of those, 3 match the filter.");
         runner.run();
         assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
         runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
@@ -316,6 +339,8 @@
         proc.addEntity("three", "three", 3, 3);
         proc.addEntity("four", "four", 4, 4);
         runner.clearTransferState();
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+                "Found 6 objects.  Of those, 6 match the filter.");
         runner.run();
         assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
         runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
@@ -329,6 +354,8 @@
         proc.addEntity("five", "five", 7, 5);
         proc.addEntity("six", "six", 6, 16);
         runner.clearTransferState();
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+                "Found 6 objects.  Of those, 6 match the filter.");
         runner.run();
         assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
         runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
@@ -344,7 +371,12 @@
         runner.setProperty(ConcreteListProcessor.RESET_STATE, "1");
         runner.setProperty(ListedEntityTracker.INITIAL_LISTING_TARGET, "window");
         runner.clearTransferState();
+
+        // Prior to running the processor, we should expect 3 objects during verification
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+                "Found 6 objects.  Of those, 6 match the filter.");
         runner.run();
+
         assertEquals(3, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
         runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0)
                 .assertAttributeEquals(CoreAttributes.FILENAME.key(), "four");
@@ -353,16 +385,44 @@
         runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(2)
                 .assertAttributeEquals(CoreAttributes.FILENAME.key(), "five");
 
-
         // Reset state again.
         proc.currentTimestamp.set(20L);
         // ConcreteListProcessor can reset state with any property.
         runner.setProperty(ListedEntityTracker.INITIAL_LISTING_TARGET, "all");
         runner.setProperty(ConcreteListProcessor.RESET_STATE, "2");
         runner.clearTransferState();
+
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+                "Found 6 objects.  Of those, 6 match the filter.");
+
         runner.run();
         // All entities should be picked, one to six.
         assertEquals(6, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
+        // Now all are tracked, so none are new
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+                "Found 6 objects.  Of those, 6 match the filter.");
+
+        // Reset state again.
+        proc.currentTimestamp.set(25L);
+        runner.setProperty(ListedEntityTracker.INITIAL_LISTING_TARGET, "window");
+        runner.setProperty(ListedEntityTracker.TRACKING_TIME_WINDOW, "20ms");
+        runner.setProperty(ConcreteListProcessor.LISTING_FILTER, "f[a-z]+"); // Match only four and five
+        runner.setProperty(ConcreteListProcessor.RESET_STATE, "3");
+        runner.clearTransferState();
+
+        // Time window is now 5ms - 25ms, so only 5 and 6 fall in the window, so only 1 of the 2 filtered entities are considered 'new'
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed contents of .*.json.*" +
+                "Found 6 objects.  Of those, 2 match the filter.");
+    }
+
+    private void assertVerificationOutcome(final Outcome expectedOutcome, final String expectedExplanationRegex) {
+        final List<ConfigVerificationResult> results = proc.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
+
+        assertEquals(1, results.size());
+        final ConfigVerificationResult result = results.get(0);
+        assertEquals(expectedOutcome, result.getOutcome());
+        assertTrue(String.format("Expected verification result to match pattern [%s].  Actual explanation was: %s", expectedExplanationRegex, result.getExplanation()),
+                result.getExplanation().matches(expectedExplanationRegex));
     }
 
     static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {
@@ -434,6 +494,12 @@
                 .name("reset-state")
                 .addValidator(Validator.VALID)
                 .build();
+        private static final PropertyDescriptor LISTING_FILTER = new PropertyDescriptor.Builder()
+                .name("listing-filter")
+                .displayName("Listing Filter")
+                .description("Filters listed entities by name.")
+                .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+                .build();
 
         final AtomicReference<Long> currentTimestamp = new AtomicReference<>();
 
@@ -453,6 +519,7 @@
             properties.add(ListedEntityTracker.TRACKING_TIME_WINDOW);
             properties.add(ListedEntityTracker.INITIAL_LISTING_TARGET);
             properties.add(RESET_STATE);
+            properties.add(LISTING_FILTER);
             return properties;
         }
 
@@ -514,8 +581,17 @@
         }
 
         @Override
-        protected List<ListableEntity> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
-            return getEntityList();
+        protected List<ListableEntity> performListing(final ProcessContext context, final Long minTimestamp, ListingMode listingMode) throws IOException {
+            final PropertyValue listingFilter = context.getProperty(LISTING_FILTER);
+            Predicate<ListableEntity> filter = listingFilter.isSet()
+                    ? entity -> entity.getName().matches(listingFilter.getValue())
+                    : Predicates.alwaysTrue();
+            return getEntityList().stream().filter(filter).collect(Collectors.toList());
+        }
+
+        @Override
+        protected Integer countUnfilteredListing(final ProcessContext context) throws IOException {
+            return entities.size();
         }
 
         List<ListableEntity> getEntityList() {
@@ -528,6 +604,11 @@
         }
 
         @Override
+        protected String getListingContainerName(final ProcessContext context) {
+            return persistenceFilename;
+        }
+
+        @Override
         protected Scope getStateScope(final PropertyContext context) {
             return Scope.CLUSTER;
         }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
index bd07aba..da36236 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java
@@ -296,7 +296,7 @@
         }
 
         final StopWatch stopWatch = new StopWatch(true);
-        final List<FileInfo> listing = transfer.getListing();
+        final List<FileInfo> listing = transfer.getListing(true);
         final long millis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
 
         int newItems = 0;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
index ad9601e..fd6032a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
@@ -126,4 +126,5 @@
     protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
         FTPTransfer.validateProxySpec(validationContext, results);
     }
+
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
index 8d06dff..c0af210 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
@@ -338,7 +338,6 @@
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
-        fileFilterRef.set(createFileFilter(context));
         includeFileAttributes = context.getProperty(INCLUDE_FILE_ATTRIBUTES).asBoolean();
 
         final long maxDiskOperationMillis = context.getProperty(MAX_DISK_OPERATION_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
@@ -351,6 +350,7 @@
         } else {
             performanceTracker = new UntrackedPerformanceTracker(getLogger(), maxDiskOperationMillis);
         }
+        fileFilterRef.set(createFileFilter(context, performanceTracker, true));
 
         final long millisToKeepStats = TimeUnit.MINUTES.toMillis(15);
         final MonitorActiveTasks monitorTask = new MonitorActiveTasks(performanceTracker, getLogger(), maxDiskOperationMillis, maxListingMillis, millisToKeepStats);
@@ -502,12 +502,31 @@
     }
 
     @Override
-    protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
+    protected Integer countUnfilteredListing(final ProcessContext context) throws IOException {
+        return performListing(context, 0L, ListingMode.CONFIGURATION_VERIFICATION, false).size();
+    }
+    @Override
+    protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode)
+            throws IOException {
+        return performListing(context, minTimestamp, listingMode, true);
+    }
+
+    private List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode, final boolean applyFilters)
+            throws IOException {
         final Path basePath = new File(getPath(context)).toPath();
         final Boolean recurse = context.getProperty(RECURSE).asBoolean();
         final Map<Path, BasicFileAttributes> lastModifiedMap = new HashMap<>();
 
-        final BiPredicate<Path, BasicFileAttributes> fileFilter = fileFilterRef.get();
+        final BiPredicate<Path, BasicFileAttributes> fileFilter;
+        final PerformanceTracker performanceTracker;
+        if (listingMode == ListingMode.EXECUTION) {
+            fileFilter = fileFilterRef.get();
+            performanceTracker = this.performanceTracker;
+        } else {
+            final long maxDiskOperationMillis = context.getProperty(MAX_DISK_OPERATION_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+            performanceTracker = new UntrackedPerformanceTracker(getLogger(), maxDiskOperationMillis);
+            fileFilter = createFileFilter(context, performanceTracker, applyFilters);
+        }
         int maxDepth = recurse ? Integer.MAX_VALUE : 1;
 
         final BiPredicate<Path, BasicFileAttributes> matcher = new BiPredicate<Path, BasicFileAttributes>() {
@@ -515,7 +534,7 @@
 
             @Override
             public boolean test(final Path path, final BasicFileAttributes attributes) {
-                if (!isScheduled()) {
+                if (!isScheduled() && listingMode == ListingMode.EXECUTION) {
                     throw new ProcessorStoppedException();
                 }
 
@@ -536,10 +555,11 @@
                 final TimedOperationKey operationKey = performanceTracker.beginOperation(DiskOperation.FILTER, relativePath, filename);
 
                 try {
-                    if (!isDirectory && (minTimestamp == null || attributes.lastModifiedTime().toMillis() >= minTimestamp)
-                        && fileFilter.test(path, attributes)) {
-                        // We store the attributes for each Path we are returning in order to avoid to
-                        // retrieve them again later when creating the FileInfo
+                    final boolean matchesFilters = (minTimestamp == null || attributes.lastModifiedTime().toMillis() >= minTimestamp)
+                            && fileFilter.test(path, attributes);
+                    if (!isDirectory && (!applyFilters || matchesFilters)) {
+                        // We store the attributes for each Path we are returning in order to avoid
+                        // retrieving them again later when creating the FileInfo
                         lastModifiedMap.put(path, attributes);
 
                         return true;
@@ -562,17 +582,17 @@
 
             Files.walkFileTree(basePath, Collections.singleton(FileVisitOption.FOLLOW_LINKS), maxDepth, new FileVisitor<Path>() {
                 @Override
-                public FileVisitResult preVisitDirectory(final Path dir, final BasicFileAttributes attributes) throws IOException {
+                public FileVisitResult preVisitDirectory(final Path dir, final BasicFileAttributes attributes) {
                     if (Files.isReadable(dir)) {
                         return FileVisitResult.CONTINUE;
                     } else {
-                        getLogger().debug("The following directory is not readable: {}", new Object[] {dir.toString()});
+                        getLogger().debug("The following directory is not readable: {}", new Object[]{dir.toString()});
                         return FileVisitResult.SKIP_SUBTREE;
                     }
                 }
 
                 @Override
-                public FileVisitResult visitFile(final Path path, final BasicFileAttributes attributes) throws IOException {
+                public FileVisitResult visitFile(final Path path, final BasicFileAttributes attributes) {
                     if (matcher.test(path, attributes)) {
                         final File file = path.toFile();
                         final BasicFileAttributes fileAttributes = lastModifiedMap.get(path);
@@ -591,20 +611,20 @@
                 }
 
                 @Override
-                public FileVisitResult visitFileFailed(final Path path, final IOException e) throws IOException {
+                public FileVisitResult visitFileFailed(final Path path, final IOException e) {
                     if (e instanceof AccessDeniedException) {
-                        getLogger().debug("The following file is not readable: {}", new Object[] {path.toString()});
+                        getLogger().debug("The following file is not readable: {}", new Object[]{path.toString()});
                         return FileVisitResult.SKIP_SUBTREE;
                     } else {
-                        getLogger().error("Error during visiting file {}: {}", new Object[] {path.toString(), e.getMessage()}, e);
+                        getLogger().error("Error during visiting file {}: {}", new Object[]{path.toString(), e.getMessage()}, e);
                         return FileVisitResult.TERMINATE;
                     }
                 }
 
                 @Override
-                public FileVisitResult postVisitDirectory(final Path dir, final IOException e) throws IOException {
+                public FileVisitResult postVisitDirectory(final Path dir, final IOException e) {
                     if (e != null) {
-                        getLogger().error("Error during visiting directory {}: {}", new Object[] {dir.toString(), e.getMessage()}, e);
+                        getLogger().error("Error during visiting directory {}: {}", new Object[]{dir.toString(), e.getMessage()}, e);
                     }
 
                     return FileVisitResult.CONTINUE;
@@ -619,11 +639,18 @@
             getLogger().info("Processor was stopped so will not complete listing of Files");
             return Collections.emptyList();
         } finally {
-            performanceTracker.completeActiveDirectory();
+            if (performanceTracker != null) {
+                performanceTracker.completeActiveDirectory();
+            }
         }
     }
 
     @Override
+    protected String getListingContainerName(final ProcessContext context) {
+        return String.format("%s Directory [%s]", context.getProperty(DIRECTORY_LOCATION).getValue(), getPath(context));
+    }
+
+    @Override
     protected boolean isListingResetNecessary(final PropertyDescriptor property) {
         return DIRECTORY.equals(property)
                 || RECURSE.equals(property)
@@ -636,7 +663,8 @@
                 || IGNORE_HIDDEN_FILES.equals(property);
     }
 
-    private BiPredicate<Path, BasicFileAttributes> createFileFilter(final ProcessContext context) {
+    private BiPredicate<Path, BasicFileAttributes> createFileFilter(final ProcessContext context, final PerformanceTracker performanceTracker,
+                                                                    final boolean applyFilters) {
         final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
         final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
         final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
@@ -652,6 +680,10 @@
         final Path basePath = Paths.get(indir);
 
         return (path, attributes) -> {
+            if (!applyFilters) {
+                return true;
+            }
+
             if (minSize > attributes.size()) {
                 return false;
             }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
index 95d475c..c2f3f11 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
@@ -17,10 +17,6 @@
 
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -32,12 +28,15 @@
 import org.apache.nifi.processors.standard.util.FileTransfer;
 import org.apache.nifi.serialization.record.RecordSchema;
 
-import java.util.Map;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Iterator;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 
 public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
     public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
@@ -104,11 +103,21 @@
     }
 
     @Override
-    protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
+    protected Integer countUnfilteredListing(final ProcessContext context) throws IOException {
+        return performListing(context, 0L, ListingMode.CONFIGURATION_VERIFICATION, false).size();
+    }
+
+    @Override
+    protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode) throws IOException {
+        return performListing(context, minTimestamp, listingMode, true);
+    }
+
+    protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode,
+                                            final boolean applyFilters) throws IOException {
         final FileTransfer transfer = getFileTransfer(context);
         final List<FileInfo> listing;
         try {
-            listing = transfer.getListing();
+            listing = transfer.getListing(applyFilters);
         } finally {
             IOUtils.closeQuietly(transfer);
         }
@@ -129,6 +138,12 @@
     }
 
     @Override
+    protected String getListingContainerName(final ProcessContext context) {
+        return String.format("Remote Directory [%s] on [%s:%s]", getPath(context), context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(),
+                context.getProperty(UNDEFAULTED_PORT).evaluateAttributeExpressions().getValue());
+    }
+
+    @Override
     protected RecordSchema getRecordSchema() {
         return FileInfo.getRecordSchema();
     }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
index 704cebc..c2c0aeb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -146,11 +146,17 @@
     }
 
     @Override
-    protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
-        final List<FileInfo> listing = super.performListing(context, minTimestamp);
+    protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode,
+                                            final boolean applyFilters) throws IOException {
+        final List<FileInfo> listing = super.performListing(context, minTimestamp, listingMode, applyFilters);
 
+        if (!applyFilters) {
+            return listing;
+        }
+
+        final Predicate<FileInfo> filePredicate = listingMode == ListingMode.EXECUTION ? this.fileFilter : createFileFilter(context);
         return listing.stream()
-                .filter(fileFilter)
+                .filter(filePredicate)
                 .collect(Collectors.toList());
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
index b7ac6a9..c3a35fa 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
@@ -190,14 +190,14 @@
     }
 
     @Override
-    public List<FileInfo> getListing() throws IOException {
+    public List<FileInfo> getListing(final boolean applyFilters) throws IOException {
         final String path = ctx.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions().getValue();
         final int depth = 0;
         final int maxResults = ctx.getProperty(FileTransfer.REMOTE_POLL_BATCH_SIZE).asInteger();
-        return getListing(path, depth, maxResults);
+        return getListing(path, depth, maxResults, applyFilters);
     }
 
-    private List<FileInfo> getListing(final String path, final int depth, final int maxResults) throws IOException {
+    private List<FileInfo> getListing(final String path, final int depth, final int maxResults, final boolean applyFilters) throws IOException {
         final List<FileInfo> listing = new ArrayList<>();
         if (maxResults < 1) {
             return listing;
@@ -266,7 +266,7 @@
             // OR if is a link and we're supposed to follow symlink
             if ((recurse && file.isDirectory()) || (symlink && file.isSymbolicLink())) {
                 try {
-                    listing.addAll(getListing(newFullForwardPath, depth + 1, maxResults - count));
+                    listing.addAll(getListing(newFullForwardPath, depth + 1, maxResults - count, applyFilters));
                 } catch (final IOException e) {
                     logger.error("Unable to get listing from " + newFullForwardPath + "; skipping", e);
                 }
@@ -274,8 +274,8 @@
 
             // if is not a directory and is not a link and it matches
             // FILE_FILTER_REGEX - then let's add it
-            if (!file.isDirectory() && !file.isSymbolicLink() && pathFilterMatches) {
-                if (pattern == null || pattern.matcher(filename).matches()) {
+            if (!file.isDirectory() && !file.isSymbolicLink() && (pathFilterMatches || !applyFilters)) {
+                if (pattern == null || !applyFilters || pattern.matcher(filename).matches()) {
                     listing.add(newFileInfo(file, path));
                     count++;
                 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
index 64bb130..e109714 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java
@@ -34,7 +34,7 @@
 
     String getHomeDirectory(FlowFile flowFile) throws IOException;
 
-    List<FileInfo> getListing() throws IOException;
+    List<FileInfo> getListing(boolean applyFilters) throws IOException;
 
     FlowFile getRemoteFile(String remoteFileName, FlowFile flowFile, ProcessSession session) throws ProcessException, IOException;
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
index fd77991..1082dd7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java
@@ -263,7 +263,7 @@
     }
 
     @Override
-    public List<FileInfo> getListing() throws IOException {
+    public List<FileInfo> getListing(final boolean applyFilters) throws IOException {
         final String path = ctx.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions().getValue();
         final int depth = 0;
 
@@ -277,11 +277,12 @@
         }
 
         final List<FileInfo> listing = new ArrayList<>(1000);
-        getListing(path, depth, maxResults, listing);
+        getListing(path, depth, maxResults, listing, applyFilters);
         return listing;
     }
 
-    protected void getListing(final String path, final int depth, final int maxResults, final List<FileInfo> listing) throws IOException {
+    protected void getListing(final String path, final int depth, final int maxResults, final List<FileInfo> listing,
+                              final boolean applyFilters) throws IOException {
         if (maxResults < 1 || listing.size() >= maxResults) {
             return;
         }
@@ -346,8 +347,8 @@
                 }
 
                 // if is not a directory and is not a link and it matches FILE_FILTER_REGEX - then let's add it
-                if (!entry.isDirectory() && !(entry.getAttributes().getType() == FileMode.Type.SYMLINK) && isPathMatch) {
-                    if (pattern == null || pattern.matcher(entryFilename).matches()) {
+                if (!entry.isDirectory() && !(entry.getAttributes().getType() == FileMode.Type.SYMLINK) && (!applyFilters || isPathMatch)) {
+                    if (pattern == null || !applyFilters || pattern.matcher(entryFilename).matches()) {
                         listing.add(newFileInfo(entry, path));
                     }
                 }
@@ -379,7 +380,7 @@
             final String newFullForwardPath = newFullPath.getPath().replace("\\", "/");
 
             try {
-                getListing(newFullForwardPath, depth + 1, maxResults, listing);
+                getListing(newFullForwardPath, depth + 1, maxResults, listing, applyFilters);
             } catch (final IOException e) {
                 logger.error("Unable to get listing from " + newFullForwardPath + "; skipping", e);
             }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
index b6af938..7c3caa2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
@@ -18,6 +18,8 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.commons.lang3.SystemUtils;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.flowfile.FlowFile;
@@ -233,27 +235,34 @@
         assertTrue(file1.createNewFile());
         assertTrue(file1.setLastModified(time4millis));
 
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 1 object.  Of that, 1 matches the filter.");
+
         // process first file and set new timestamp
         runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(1, successFiles1.size());
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 1 object.  Of that, 1 matches the filter.");
 
         // create second file
         final File file2 = new File(TESTDIR + "/listing2.txt");
         assertTrue(file2.createNewFile());
         assertTrue(file2.setLastModified(time2millis));
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 2 objects.  Of those, 2 match the filter.");
 
         // process second file after timestamp
         runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(1, successFiles2.size());
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 2 objects.  Of those, 2 match the filter.");
 
         // create third file
         final File file3 = new File(TESTDIR + "/listing3.txt");
         assertTrue(file3.createNewFile());
         assertTrue(file3.setLastModified(time4millis));
+        // 0 are new because the timestamp is before the min listed timestamp
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects.  Of those, 3 match the filter.");
 
         // process third file before timestamp
         runNext();
@@ -264,6 +273,7 @@
         // force state to reset and process all files
         runner.removeProperty(ListFile.DIRECTORY);
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects.  Of those, 3 match the filter.");
         runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -271,6 +281,7 @@
 
         runNext();
         runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects.  Of those, 3 match the filter.");
     }
 
     @Test
@@ -309,6 +320,7 @@
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runNext.apply(true);
         runner.assertTransferCount(ListFile.REL_SUCCESS, 3);
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects.  Of those, 3 match the filter.");
 
         // processor updates internal state, it shouldn't pick the same ones.
         runNext.apply(false);
@@ -323,6 +335,7 @@
         assertEquals(2, successFiles2.size());
         assertEquals(file2.getName(), successFiles2.get(0).getAttribute("filename"));
         assertEquals(file1.getName(), successFiles2.get(1).getAttribute("filename"));
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects.  Of those, 2 match the filter.");
 
         // exclude newest
         runner.setProperty(ListFile.MIN_AGE, age1);
@@ -333,6 +346,7 @@
         assertEquals(2, successFiles3.size());
         assertEquals(file3.getName(), successFiles3.get(0).getAttribute("filename"));
         assertEquals(file2.getName(), successFiles3.get(1).getAttribute("filename"));
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects.  Of those, 2 match the filter.");
 
         // exclude oldest and newest
         runner.setProperty(ListFile.MIN_AGE, age1);
@@ -342,6 +356,7 @@
         final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(1, successFiles4.size());
         assertEquals(file2.getName(), successFiles4.get(0).getAttribute("filename"));
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects.  Of those, 1 matches the filter.");
 
     }
 
@@ -377,26 +392,31 @@
 
         // check all files
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects.  Of those, 3 match the filter.");
         runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(3, successFiles1.size());
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects.  Of those, 3 match the filter.");
 
         // exclude largest
         runner.removeProperty(ListFile.MIN_AGE);
         runner.removeProperty(ListFile.MAX_AGE);
         runner.setProperty(ListFile.MIN_SIZE, "0 b");
         runner.setProperty(ListFile.MAX_SIZE, "7500 b");
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects.  Of those, 2 match the filter.");
         runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
         assertEquals(2, successFiles2.size());
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects.  Of those, 2 match the filter.");
 
         // exclude smallest
         runner.removeProperty(ListFile.MIN_AGE);
         runner.removeProperty(ListFile.MAX_AGE);
         runner.setProperty(ListFile.MIN_SIZE, "2500 b");
         runner.removeProperty(ListFile.MAX_SIZE);
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects.  Of those, 2 match the filter.");
         runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -407,6 +427,7 @@
         runner.removeProperty(ListFile.MAX_AGE);
         runner.setProperty(ListFile.MIN_SIZE, "2500 b");
         runner.setProperty(ListFile.MAX_SIZE, "7500 b");
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects.  Of those, 1 matches the filter.");
         runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -444,6 +465,7 @@
         runner.removeProperty(ListFile.MIN_SIZE);
         runner.removeProperty(ListFile.MAX_SIZE);
         runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "false");
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 2 objects.  Of those, 2 match the filter.");
         runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -451,6 +473,7 @@
 
         // exclude hidden
         runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "true");
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 2 objects.  Of those, 1 matches the filter.");
         runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -472,6 +495,7 @@
 
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.setProperty(ListFile.FILE_FILTER, ".*");
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 2 objects.  Of those, 1 matches the filter.");
         runNext();
 
         final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -484,30 +508,33 @@
         assertTrue(subdir.mkdir());
         assertTrue(subdir.setReadable(false));
 
-        final File file1 = new File(TESTDIR + "/subdir/unreadable.txt");
-        assertTrue(file1.createNewFile());
-        assertTrue(file1.setReadable(false));
+        try {
+            final File file1 = new File(TESTDIR + "/subdir/unreadable.txt");
+            assertTrue(file1.createNewFile());
+            assertTrue(file1.setReadable(false));
 
-        final File file2 = new File(TESTDIR + "/subdir/readable.txt");
-        assertTrue(file2.createNewFile());
+            final File file2 = new File(TESTDIR + "/subdir/readable.txt");
+            assertTrue(file2.createNewFile());
 
-        final File file3 = new File(TESTDIR + "/secondReadable.txt");
-        assertTrue(file3.createNewFile());
+            final File file3 = new File(TESTDIR + "/secondReadable.txt");
+            assertTrue(file3.createNewFile());
 
-        final long now = getTestModifiedTime();
-        assertTrue(file1.setLastModified(now));
-        assertTrue(file2.setLastModified(now));
-        assertTrue(file3.setLastModified(now));
+            final long now = getTestModifiedTime();
+            assertTrue(file1.setLastModified(now));
+            assertTrue(file2.setLastModified(now));
+            assertTrue(file3.setLastModified(now));
 
-        runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
-        runner.setProperty(ListFile.FILE_FILTER, ".*");
-        runNext();
+            runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
+            runner.setProperty(ListFile.FILE_FILTER, ".*");
+            assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 1 object.  Of that, 1 matches the filter.");
+            runNext();
 
-        final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
-        assertEquals(1, successFiles.size());
-        assertEquals("secondReadable.txt", successFiles.get(0).getAttribute("filename"));
-
-        subdir.setReadable(true);
+            final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
+            assertEquals(1, successFiles.size());
+            assertEquals("secondReadable.txt", successFiles.get(0).getAttribute("filename"));
+        } finally {
+            subdir.setReadable(true);
+        }
     }
 
     @Test
@@ -527,6 +554,7 @@
         // Run with privileges and with fitting filter
         runner.setProperty(ListFile.FILE_FILTER, "file.*");
         assertTrue(file.setLastModified(getTestModifiedTime()));
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 1 object.  Of that, 1 matches the filter.");
         runNext();
 
         final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -535,6 +563,7 @@
         // Run without privileges and with fitting filter
         assertTrue(file.setReadable(false));
         assertTrue(file.setLastModified(getTestModifiedTime()));
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 1 object.  Of that, 0 match the filter.");
         runNext();
 
         final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -566,6 +595,7 @@
         // check all files
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue());
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects.  Of those, 4 match the filter.");
         runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -574,9 +604,11 @@
         // filter file on pattern
         // Modifying FILE_FILTER property reset listing status, so these files will be listed again.
         runner.setProperty(ListFile.FILE_FILTER, ".*-xyz-.*");
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects.  Of those, 2 match the filter.");
         runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 2);
 
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects.  Of those, 2 match the filter.");
         runNext();
         runner.assertTransferCount(ListFile.REL_SUCCESS, 0);
     }
@@ -611,6 +643,7 @@
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue());
         runner.setProperty(ListFile.RECURSE, "true");
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects.  Of those, 4 match the filter.");
         runNext();
 
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@@ -620,6 +653,7 @@
         // filter path on pattern subdir1
         runner.setProperty(ListFile.PATH_FILTER, "subdir1");
         runner.setProperty(ListFile.RECURSE, "true");
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects.  Of those, 3 match the filter.");
         runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -628,6 +662,7 @@
         // filter path on pattern subdir2
         runner.setProperty(ListFile.PATH_FILTER, "subdir2");
         runner.setProperty(ListFile.RECURSE, "true");
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 4 objects.  Of those, 1 matches the filter.");
         runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -659,6 +694,7 @@
         // check all files
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.setProperty(ListFile.RECURSE, "true");
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects.  Of those, 3 match the filter.");
         runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS, 3);
         final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -683,8 +719,9 @@
         }
         assertEquals(3, successFiles1.size());
 
-        // exclude hidden
+        // don't recurse
         runner.setProperty(ListFile.RECURSE, "false");
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 1 object.  Of that, 1 matches the filter.");
         runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
@@ -710,6 +747,7 @@
         // check all files
         runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
         runner.setProperty(ListFile.RECURSE, "true");
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects.  Of those, 3 match the filter.");
         runNext();
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
         runner.assertTransferCount(ListFile.REL_SUCCESS, 3);
@@ -802,6 +840,7 @@
         makeTestFile("/batch1-age5.txt", time5millis, fileTimes);
 
         // check files
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 3 objects.  Of those, 3 match the filter.");
         runNext();
 
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@@ -815,6 +854,7 @@
         // should be ignored since it's older than age3
         makeTestFile("/batch2-age4.txt", time4millis, fileTimes);
 
+        assertVerificationOutcome(Outcome.SUCCESSFUL, "Successfully listed .* Found 6 objects.  Of those, 6 match the filter.");
         runNext();
 
         runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
@@ -880,4 +920,14 @@
             }
         }
     }
+
+    private void assertVerificationOutcome(final Outcome expectedOutcome, final String expectedExplanationRegex) {
+        final List<ConfigVerificationResult> results = processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
+
+        assertEquals(1, results.size());
+        final ConfigVerificationResult result = results.get(0);
+        assertEquals(expectedOutcome, result.getOutcome());
+        assertTrue(String.format("Expected verification result to match pattern [%s].  Actual explanation was: %s", expectedExplanationRegex, result.getExplanation()),
+                result.getExplanation().matches(expectedExplanationRegex));
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
index f864e12..4027ddd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListSFTP.java
@@ -29,7 +29,10 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.VerifiableProcessor;
 import org.apache.nifi.processor.util.list.AbstractListProcessor;
 import org.apache.nifi.processors.standard.util.FTPTransfer;
 import org.apache.nifi.processors.standard.util.FileInfo;
@@ -47,6 +50,7 @@
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class TestListSFTP {
     @Rule
@@ -95,7 +99,7 @@
             protected FileTransfer getFileTransfer(ProcessContext context) {
                 return new SFTPTransfer(context, getLogger()){
                     @Override
-                    protected void getListing(String path, int depth, int maxResults, List<FileInfo> listing) throws IOException {
+                    protected void getListing(String path, int depth, int maxResults, List<FileInfo> listing, boolean applyFilters) throws IOException {
                         if (path.contains("subdir")) {
                             reachScanningSubDir.countDown();
                             try {
@@ -105,7 +109,7 @@
                             }
                         }
 
-                        super.getListing(path, depth, maxResults, listing);
+                        super.getListing(path, depth, maxResults, listing, applyFilters);
                     }
                 };
             }
@@ -193,6 +197,7 @@
         Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS) * 2);
 
         runner.run();
+        assertVerificationOutcome(runner, Outcome.SUCCESSFUL, ".* Found 3 objects.  Of those, 3 match the filter.");
 
         runner.assertTransferCount(ListSFTP.REL_SUCCESS, 3);
 
@@ -231,10 +236,22 @@
 
         runner.run();
 
+        assertVerificationOutcome(runner, Outcome.SUCCESSFUL, ".* Found 3 objects.  Of those, 1 matches the filter.");
         runner.assertTransferCount(ListSFTP.REL_SUCCESS, 1);
 
         final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(ListSFTP.REL_SUCCESS).get(0);
         //the only file between the limits
         retrievedFile.assertAttributeEquals("filename", "file.txt");
     }
+
+    private void assertVerificationOutcome(final TestRunner runner, final Outcome expectedOutcome, final String expectedExplanationRegex) {
+        final List<ConfigVerificationResult> results = ((VerifiableProcessor) runner.getProcessor())
+                .verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
+
+        assertEquals(1, results.size());
+        final ConfigVerificationResult result = results.get(0);
+        assertEquals(expectedOutcome, result.getOutcome());
+        assertTrue(String.format("Expected verification result to match pattern [%s].  Actual explanation was: %s", expectedExplanationRegex, result.getExplanation()),
+                result.getExplanation().matches(expectedExplanationRegex));
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/ITestSFTPTransferWithSSHTestServer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/ITestSFTPTransferWithSSHTestServer.java
index de8c8b6..50813c9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/ITestSFTPTransferWithSSHTestServer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/ITestSFTPTransferWithSSHTestServer.java
@@ -144,7 +144,7 @@
         properties.put(SFTPTransfer.REMOTE_PATH, DIR_2);
 
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
-            final List<FileInfo> listing = transfer.getListing();
+            final List<FileInfo> listing = transfer.getListing(true);
             assertNotNull(listing);
             assertEquals(2, listing.size());
 
@@ -167,7 +167,7 @@
         properties.put(SFTPTransfer.IGNORE_DOTTED_FILES, "false");
 
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
-            final List<FileInfo> listing = transfer.getListing();
+            final List<FileInfo> listing = transfer.getListing(true);
             assertNotNull(listing);
             assertEquals(3, listing.size());
 
@@ -183,7 +183,7 @@
         properties.put(SFTPTransfer.RECURSIVE_SEARCH, "false");
 
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
-            final List<FileInfo> listing = transfer.getListing();
+            final List<FileInfo> listing = transfer.getListing(true);
             assertNotNull(listing);
             assertEquals(0, listing.size());
         }
@@ -196,7 +196,7 @@
         properties.put(SFTPTransfer.RECURSIVE_SEARCH, "true");
 
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
-            final List<FileInfo> listing = transfer.getListing();
+            final List<FileInfo> listing = transfer.getListing(true);
             assertNotNull(listing);
             assertEquals(4, listing.size());
         }
@@ -210,7 +210,7 @@
         properties.put(SFTPTransfer.FOLLOW_SYMLINK, "false");
 
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
-            final List<FileInfo> listing = transfer.getListing();
+            final List<FileInfo> listing = transfer.getListing(true);
             assertNotNull(listing);
             assertEquals(0, listing.size());
         }
@@ -224,7 +224,7 @@
         properties.put(SFTPTransfer.FOLLOW_SYMLINK, "true");
 
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
-            final List<FileInfo> listing = transfer.getListing();
+            final List<FileInfo> listing = transfer.getListing(true);
             assertNotNull(listing);
             assertEquals(4, listing.size());
         }
@@ -238,7 +238,7 @@
 
         // first listing is without batch size and shows 4 results
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
-            final List<FileInfo> listing = transfer.getListing();
+            final List<FileInfo> listing = transfer.getListing(true);
             assertNotNull(listing);
             assertEquals(4, listing.size());
         }
@@ -247,7 +247,7 @@
         properties.put(SFTPTransfer.REMOTE_POLL_BATCH_SIZE, "2");
 
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
-            final List<FileInfo> listing = transfer.getListing();
+            final List<FileInfo> listing = transfer.getListing(true);
             assertNotNull(listing);
             assertEquals(2, listing.size());
         }
@@ -263,7 +263,7 @@
         properties.put(SFTPTransfer.FILE_FILTER_REGEX, fileFilterRegex);
 
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
-            final List<FileInfo> listing = transfer.getListing();
+            final List<FileInfo> listing = transfer.getListing(true);
             assertNotNull(listing);
             assertEquals(2, listing.size());
 
@@ -282,7 +282,7 @@
         properties.put(SFTPTransfer.PATH_FILTER_REGEX, pathFilterRegex);
 
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
-            final List<FileInfo> listing = transfer.getListing();
+            final List<FileInfo> listing = transfer.getListing(true);
             assertNotNull(listing);
             assertEquals(2, listing.size());
 
@@ -306,7 +306,7 @@
         properties.put(SFTPTransfer.RECURSIVE_SEARCH, "true");
 
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
-            transfer.getListing();
+            transfer.getListing(true);
         }
     }
 
@@ -317,7 +317,7 @@
 
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
             // verify the directory has two files
-            final List<FileInfo> listing = transfer.getListing();
+            final List<FileInfo> listing = transfer.getListing(true);
             assertNotNull(listing);
             assertEquals(2, listing.size());
 
@@ -327,7 +327,7 @@
             }
 
             // verify there are now zero files
-            final List<FileInfo> listingAfterDelete = transfer.getListing();
+            final List<FileInfo> listingAfterDelete = transfer.getListing(true);
             assertNotNull(listingAfterDelete);
             assertEquals(0, listingAfterDelete.size());
         }
@@ -340,7 +340,7 @@
 
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
             // verify the directory has two files
-            final List<FileInfo> listing = transfer.getListing();
+            final List<FileInfo> listing = transfer.getListing(true);
             assertNotNull(listing);
             assertEquals(2, listing.size());
 
@@ -352,7 +352,7 @@
             }
 
             // verify there are now zero files
-            final List<FileInfo> listingAfterDelete = transfer.getListing();
+            final List<FileInfo> listingAfterDelete = transfer.getListing(true);
             assertNotNull(listingAfterDelete);
             assertEquals(0, listingAfterDelete.size());
         }
@@ -374,7 +374,7 @@
 
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
             // verify the directory exists
-            final List<FileInfo> listing = transfer.getListing();
+            final List<FileInfo> listing = transfer.getListing(true);
             assertNotNull(listing);
             assertEquals(0, listing.size());
 
@@ -382,7 +382,7 @@
 
             // verify the directory no longer exists
             try {
-                transfer.getListing();
+                transfer.getListing(true);
                 Assert.fail("Should have thrown exception");
             } catch (FileNotFoundException e) {
                 // nothing to do, expected
@@ -408,7 +408,7 @@
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
             // verify the directory does not exist
             try {
-                transfer.getListing();
+                transfer.getListing(true);
                 Assert.fail("Should have failed");
             } catch (FileNotFoundException e) {
                 // Nothing to do, expected
@@ -418,7 +418,7 @@
             transfer.ensureDirectoryExists(null, new File(absolutePath));
 
             // verify the directory now exists
-            final List<FileInfo> listing = transfer.getListing();
+            final List<FileInfo> listing = transfer.getListing(true);
             assertNotNull(listing);
             assertEquals(0, listing.size());
         }
@@ -433,7 +433,7 @@
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
             // verify the directory does not exist
             try {
-                transfer.getListing();
+                transfer.getListing(true);
                 Assert.fail("Should have failed");
             } catch (FileNotFoundException e) {
                 // Nothing to do, expected
@@ -443,7 +443,7 @@
             transfer.ensureDirectoryExists(null, new File(absolutePath));
 
             // verify the directory now exists
-            final List<FileInfo> listing = transfer.getListing();
+            final List<FileInfo> listing = transfer.getListing(true);
             assertNotNull(listing);
             assertEquals(0, listing.size());
         }
@@ -456,7 +456,7 @@
 
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
             // verify the directory already exists
-            final List<FileInfo> listing = transfer.getListing();
+            final List<FileInfo> listing = transfer.getListing(true);
             assertNotNull(listing);
             assertEquals(2, listing.size());
 
@@ -475,7 +475,7 @@
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
             // verify the directory does not exist
             try {
-                transfer.getListing();
+                transfer.getListing(true);
                 Assert.fail("Should have failed");
             } catch (FileNotFoundException e) {
                 // Nothing to do, expected
@@ -485,7 +485,7 @@
             transfer.ensureDirectoryExists(null, new File(absolutePath));
 
             // verify the directory now exists
-            final List<FileInfo> listing = transfer.getListing();
+            final List<FileInfo> listing = transfer.getListing(true);
             assertNotNull(listing);
             assertEquals(0, listing.size());
         }
@@ -499,7 +499,7 @@
 
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
             // verify the directory already exists
-            final List<FileInfo> listing = transfer.getListing();
+            final List<FileInfo> listing = transfer.getListing(true);
             assertNotNull(listing);
             assertEquals(2, listing.size());
 
@@ -519,7 +519,7 @@
         try(final SFTPTransfer transfer = createSFTPTransfer(properties)) {
             // verify the directory does not exist
             try {
-                transfer.getListing();
+                transfer.getListing(true);
                 Assert.fail("Should have failed");
             } catch (FileNotFoundException e) {
                 // Nothing to do, expected