NIFI-8739 Penalized flowfiles should be able to be polled from the queue in some cases (#5189)

diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
index fbbd4bb..d4b6b2e 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -147,18 +147,26 @@
 
     /**
      * @param expiredRecords expired records
+     * @param pollStrategy strategy of polling
      * @return the next flow file on the queue; null if empty
      */
+    FlowFileRecord poll(Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy);
+
     FlowFileRecord poll(Set<FlowFileRecord> expiredRecords);
 
     /**
      * @param maxResults limits how many results can be polled
      * @param expiredRecords for expired records
+     * @param pollStrategy strategy of polling
      * @return the next flow files on the queue up to the max results; null if
      *         empty
      */
+    List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy);
+
     List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords);
 
+    List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy);
+
     List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords);
 
     String getFlowFileExpiration();
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/PollStrategy.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/PollStrategy.java
new file mode 100644
index 0000000..2f45ba2
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/PollStrategy.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+/**
+ * Represents a strategy that how to poll the queue.
+ */
+public enum PollStrategy {
+
+    ALL_FLOWFILES,
+
+    UNPENALIZED_FLOWFILES
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index bea9572..1cf78b6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -32,6 +32,7 @@
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.FlowFileQueueFactory;
 import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.queue.PollStrategy;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.FlowFileFilter;
@@ -341,12 +342,12 @@
 
     @Override
     public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
-        return flowFileQueue.poll(filter, expiredRecords);
+        return flowFileQueue.poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
     }
 
     @Override
     public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
-        return flowFileQueue.poll(expiredRecords);
+        return flowFileQueue.poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 614abc4..769fad5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -24,6 +24,7 @@
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.lifecycle.TaskTermination;
 import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.PollStrategy;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache;
@@ -2284,7 +2285,7 @@
         for (final Connection conn : context.getConnectable().getIncomingConnections()) {
             do {
                 expired.clear();
-                conn.getFlowFileQueue().poll(filter, expired);
+                conn.getFlowFileQueue().poll(filter, expired, PollStrategy.ALL_FLOWFILES);
                 removeExpired(expired, conn);
             } while (!expired.isEmpty());
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
index aac0659..de49ad2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
@@ -27,6 +27,7 @@
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.provenance.ProvenanceEventBuilder;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
@@ -40,6 +41,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -483,4 +485,19 @@
             loadBalanceReadLock.unlock();
         }
     }
+
+    @Override
+    public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords) {
+        return poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
+    }
+
+    @Override
+    public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords) {
+        return poll(maxResults, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
+    }
+
+    @Override
+    public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
+        return poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java
index 9a220ae..db0e30e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java
@@ -51,13 +51,13 @@
         }
     }
 
-    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final long waitMillis) throws InterruptedException {
+    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final long waitMillis, final PollStrategy pollStrategy) throws InterruptedException {
         final long maxTimestamp = System.currentTimeMillis() + waitMillis;
 
         synchronized (monitor) {
             FlowFileRecord flowFile = null;
             do {
-                flowFile = super.poll(expiredRecords, expirationMillis);
+                flowFile = super.poll(expiredRecords, expirationMillis, pollStrategy);
                 if (flowFile != null) {
                     return flowFile;
                 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
index 903ed49..a87fcfe 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
@@ -124,16 +124,16 @@
 
 
     @Override
-    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
+    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
         // First check if we have any records Pre-Fetched.
         final long expirationMillis = getFlowFileExpiration(TimeUnit.MILLISECONDS);
-        return queue.poll(expiredRecords, expirationMillis);
+        return queue.poll(expiredRecords, expirationMillis, pollStrategy);
     }
 
 
     @Override
-    public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords) {
-        return queue.poll(maxResults, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS));
+    public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
+        return queue.poll(maxResults, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS), pollStrategy);
     }
 
 
@@ -184,8 +184,8 @@
     }
 
     @Override
-    public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
-        return queue.poll(filter, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS));
+    public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
+        return queue.poll(filter, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS), pollStrategy);
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
index ec02009..34da62c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
@@ -435,33 +435,6 @@
         return getFlowFileQueueSize().isEmpty();
     }
 
-    public boolean isFlowFileAvailable() {
-        if (isEmpty()) {
-            return false;
-        }
-
-        readLock.lock();
-        try {
-            // If we have data in the active or swap queue that is penalized, then we know that all FlowFiles
-            // are penalized. As a result, we can say that no FlowFile is available.
-            FlowFileRecord firstRecord = activeQueue.peek();
-            if (firstRecord == null && !swapQueue.isEmpty()) {
-                firstRecord = swapQueue.get(0);
-            }
-
-            if (firstRecord == null) {
-                // If the queue is not empty, then all data is swapped out. We don't actually know whether or not the swapped out data is penalized, so we assume
-                // that it is not penalized and is therefore available.
-                return !isEmpty();
-            }
-
-            // We do have a FlowFile that was retrieved from the active or swap queue. It is available if it is not penalized.
-            return !firstRecord.isPenalized();
-        } finally {
-            readLock.unlock("isFlowFileAvailable");
-        }
-    }
-
     public boolean isActiveQueueEmpty() {
         final FlowFileQueueSize queueSize = getFlowFileQueueSize();
         return queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() == 0;
@@ -524,12 +497,16 @@
     }
 
     public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+        return poll(expiredRecords, expirationMillis, PollStrategy.UNPENALIZED_FLOWFILES);
+    }
+
+    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) {
         FlowFileRecord flowFile;
 
         // First check if we have any records Pre-Fetched.
         writeLock.lock();
         try {
-            flowFile = doPoll(expiredRecords, expirationMillis);
+            flowFile = doPoll(expiredRecords, expirationMillis, pollStrategy);
 
             if (flowFile != null) {
                 logger.trace("{} poll() returning {}", this, flowFile);
@@ -543,7 +520,7 @@
     }
 
 
-    private FlowFileRecord doPoll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+    private FlowFileRecord doPoll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) {
         FlowFileRecord flowFile;
         boolean isExpired;
 
@@ -562,7 +539,7 @@
                 if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
                     break;
                 }
-            } else if (flowFile != null && flowFile.isPenalized()) {
+            } else if (flowFile != null && flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
                 this.activeQueue.add(flowFile);
                 flowFile = null;
                 break;
@@ -581,12 +558,16 @@
     }
 
     public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+        return poll(maxResults, expiredRecords, expirationMillis, PollStrategy.UNPENALIZED_FLOWFILES);
+    }
+
+    public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) {
         final List<FlowFileRecord> records = new ArrayList<>(Math.min(1, maxResults));
 
         // First check if we have any records Pre-Fetched.
         writeLock.lock();
         try {
-            doPoll(records, maxResults, expiredRecords, expirationMillis);
+            doPoll(records, maxResults, expiredRecords, expirationMillis, pollStrategy);
         } finally {
             writeLock.unlock("poll(int, Set)");
         }
@@ -599,6 +580,10 @@
     }
 
     public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+        return poll(filter, expiredRecords, expirationMillis, PollStrategy.UNPENALIZED_FLOWFILES);
+    }
+
+    public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) {
         long bytesPulled = 0L;
         int flowFilesPulled = 0;
 
@@ -626,7 +611,7 @@
                     } else {
                         continue;
                     }
-                } else if (flowFile.isPenalized()) {
+                } else if (flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
                     this.activeQueue.add(flowFile);
                     break; // just stop searching because the rest are all penalized.
                 }
@@ -660,10 +645,10 @@
         }
     }
 
-    private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+    private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) {
         migrateSwapToActive();
 
-        final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords, expirationMillis);
+        final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords, expirationMillis, pollStrategy);
 
         long expiredBytes = 0L;
         for (final FlowFileRecord record : expiredRecords) {
@@ -701,7 +686,9 @@
     }
 
 
-    private long drainQueue(final Queue<FlowFileRecord> sourceQueue, final List<FlowFileRecord> destination, int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+    private long drainQueue(final Queue<FlowFileRecord> sourceQueue, final List<FlowFileRecord> destination,
+                            int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis,
+                            final PollStrategy pollStrategy) {
         long drainedSize = 0L;
         FlowFileRecord pulled;
 
@@ -712,7 +699,7 @@
                     break;
                 }
             } else {
-                if (pulled.isPenalized()) {
+                if (pulled.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
                     sourceQueue.add(pulled);
                     break;
                 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 1a1e187..6b8d418 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -32,6 +32,7 @@
 import org.apache.nifi.controller.queue.LoadBalanceStrategy;
 import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
 import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
+import org.apache.nifi.controller.queue.PollStrategy;
 import org.apache.nifi.controller.queue.QueueDiagnostics;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
@@ -930,22 +931,22 @@
     }
 
     @Override
-    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
-        final FlowFileRecord flowFile = localPartition.poll(expiredRecords);
+    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
+        final FlowFileRecord flowFile = localPartition.poll(expiredRecords, pollStrategy);
         onAbort(expiredRecords);
         return flowFile;
     }
 
     @Override
-    public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords) {
-        final List<FlowFileRecord> flowFiles = localPartition.poll(maxResults, expiredRecords);
+    public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
+        final List<FlowFileRecord> flowFiles = localPartition.poll(maxResults, expiredRecords, pollStrategy);
         onAbort(expiredRecords);
         return flowFiles;
     }
 
     @Override
-    public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
-        final List<FlowFileRecord> flowFiles = localPartition.poll(filter, expiredRecords);
+    public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
+        final List<FlowFileRecord> flowFiles = localPartition.poll(filter, expiredRecords, pollStrategy);
         onAbort(expiredRecords);
         return flowFiles;
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/StandardLoadBalanceFlowFileCodec.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/StandardLoadBalanceFlowFileCodec.java
index 8e9b165..fe1baac 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/StandardLoadBalanceFlowFileCodec.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/StandardLoadBalanceFlowFileCodec.java
@@ -39,6 +39,7 @@
 
         out.writeLong(flowFile.getLineageStartDate());
         out.writeLong(flowFile.getEntryDate());
+        out.writeLong(flowFile.getPenaltyExpirationMillis());
     }
 
     private void writeString(final String value, final DataOutputStream out) throws IOException {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java
index 9ee0e0e..84f0bab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java
@@ -19,6 +19,7 @@
 
 import org.apache.nifi.controller.queue.FlowFileQueueContents;
 import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
+import org.apache.nifi.controller.queue.PollStrategy;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.processor.FlowFileFilter;
 
@@ -46,8 +47,11 @@
      * Returns a single FlowFile with the highest priority that is available in the partition, or <code>null</code> if no FlowFile is available
      *
      * @param expiredRecords a Set of FlowFileRecord's to which any expired records that are encountered should be added
+     * @param pollStrategy strategy of polling
      * @return a single FlowFile with the highest priority that is available in the partition, or <code>null</code> if no FlowFile is available
      */
+    FlowFileRecord poll(Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy);
+
     FlowFileRecord poll(Set<FlowFileRecord> expiredRecords);
 
     /**
@@ -55,8 +59,11 @@
      *
      * @param maxResults the maximum number of FlowFiles to return
      * @param expiredRecords a Set of FlowFileRecord's to which any expired records that are encountered should be added
+     * @param pollStrategy strategy of polling
      * @return a List of FlowFiles (possibly empty) with the highest priority FlowFiles that are available in the partition
      */
+    List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy);
+
     List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords);
 
     /**
@@ -64,8 +71,11 @@
      *
      * @param filter the filter to determine whether or not a given FlowFile should be returned
      * @param expiredRecords a Set of FlowFileRecord's to which any expired records that are encountered should be added
+     * @param pollStrategy strategy of polling
      * @return a List of FlowFiles (possibly empty) with FlowFiles that matched the given filter
      */
+    List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy);
+
     List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords);
 
     /**
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
index 53f0f9f..5f69905 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
@@ -21,6 +21,7 @@
 import org.apache.nifi.controller.queue.DropFlowFileRequest;
 import org.apache.nifi.controller.queue.FlowFileQueueContents;
 import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
+import org.apache.nifi.controller.queue.PollStrategy;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
 import org.apache.nifi.controller.queue.StandardRemoteQueuePartitionDiagnostics;
@@ -150,7 +151,7 @@
 
     private FlowFileRecord getFlowFile() {
         final Set<FlowFileRecord> expired = new HashSet<>();
-        final FlowFileRecord flowFile = priorityQueue.poll(expired, flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS));
+        final FlowFileRecord flowFile = priorityQueue.poll(expired, flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS), PollStrategy.ALL_FLOWFILES);
         flowFileQueue.handleExpiredRecords(expired);
         return flowFile;
     }
@@ -225,22 +226,13 @@
             }
         };
 
-        // Consider the queue empty unless a FlowFile is available. This means that if the queue has only penalized FlowFiles, it will be considered empty.
-        // This is what we want for the purpose of load balancing the data. Otherwise, we would have a situation where we create a connection to the other node,
-        // determine that now FlowFile is available to send, and then notify the node of this and close the connection. And then this would repeat over and over
-        // until the FlowFile is no longer penalized. Instead, we want to consider the queue empty until a FlowFile is actually available, and only then bother
-        // creating the connection to send data.
-        final BooleanSupplier emptySupplier = this::isQueueEmpty;
+        final BooleanSupplier emptySupplier = priorityQueue::isEmpty;
         clientRegistry.register(flowFileQueue.getIdentifier(), nodeIdentifier, emptySupplier, this::getFlowFile,
             failureCallback, successCallback, flowFileQueue::getLoadBalanceCompression, flowFileQueue::isPropagateBackpressureAcrossNodes);
 
         running = true;
     }
 
-    private boolean isQueueEmpty() {
-        return !priorityQueue.isFlowFileAvailable();
-    }
-
     public void onRemoved() {
         clientRegistry.unregister(flowFileQueue.getIdentifier(), nodeIdentifier);
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java
index 7ecac18..de46d17 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java
@@ -23,6 +23,7 @@
 import org.apache.nifi.controller.queue.DropFlowFileRequest;
 import org.apache.nifi.controller.queue.FlowFileQueueContents;
 import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
+import org.apache.nifi.controller.queue.PollStrategy;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.FlowFileSwapManager;
@@ -191,7 +192,7 @@
                 // Wait up to #pollWaitMillis milliseconds to get a FlowFile. If none, then check if stopped
                 // and if not, poll again.
                 try {
-                    polled = queue.poll(expiredRecords, -1, pollWaitMillis);
+                    polled = queue.poll(expiredRecords, -1, pollWaitMillis, PollStrategy.ALL_FLOWFILES);
                 } catch (final InterruptedException ie) {
                     Thread.currentThread().interrupt();
                     continue;
@@ -211,7 +212,7 @@
                 final List<FlowFileRecord> toDistribute = new ArrayList<>();
                 toDistribute.add(polled);
 
-                final List<FlowFileRecord> additionalRecords = queue.poll(999, expiredRecords, -1);
+                final List<FlowFileRecord> additionalRecords = queue.poll(999, expiredRecords, -1, PollStrategy.ALL_FLOWFILES);
                 toDistribute.addAll(additionalRecords);
 
                 flowFileQueue.handleExpiredRecords(expiredRecords);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java
index 075039c..03e8e18 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java
@@ -23,6 +23,7 @@
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.FlowFileQueueContents;
 import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
+import org.apache.nifi.controller.queue.PollStrategy;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.queue.SwappablePriorityQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
@@ -102,18 +103,33 @@
     }
 
     @Override
-    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
-        return priorityQueue.poll(expiredRecords, getExpiration());
+    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
+        return priorityQueue.poll(expiredRecords, getExpiration(), pollStrategy);
     }
 
     @Override
-    public List<FlowFileRecord> poll(final int maxResults, final Set<FlowFileRecord> expiredRecords) {
-        return priorityQueue.poll(maxResults, expiredRecords, getExpiration());
+    public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords) {
+        return poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
     }
 
     @Override
-    public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
-        return priorityQueue.poll(filter, expiredRecords, getExpiration());
+    public List<FlowFileRecord> poll(final int maxResults, final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
+        return priorityQueue.poll(maxResults, expiredRecords, getExpiration(), pollStrategy);
+    }
+
+    @Override
+    public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords) {
+        return poll(maxResults, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
+    }
+
+    @Override
+    public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
+        return priorityQueue.poll(filter, expiredRecords, getExpiration(), pollStrategy);
+    }
+
+    @Override
+    public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
+        return poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
     }
 
     private int getExpiration() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
index 1e35bc7..858b76a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
@@ -509,6 +509,7 @@
 
         final long lineageStartDate = metadataIn.readLong();
         final long entryDate = metadataIn.readLong();
+        final long penaltyExpirationMillis = metadataIn.readLong();
 
         final ContentClaimTriple contentClaimTriple = consumeContent(dis, out, contentClaim, claimOffset, peerDescription, compression == LoadBalanceCompression.COMPRESS_ATTRIBUTES_AND_CONTENT);
 
@@ -521,6 +522,7 @@
             .size(contentClaimTriple.getContentLength())
             .entryDate(entryDate)
             .lineageStart(lineageStartDate, lineageStartIndex.getAndIncrement())
+            .penaltyExpirationTime(penaltyExpirationMillis)
             .build();
 
         logger.debug("Received FlowFile {} with {} attributes and {} bytes of content", flowFileRecord, attributes.size(), contentClaimTriple.getContentLength());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
index 5cc3d7e..95a204b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
@@ -22,6 +22,7 @@
 import org.apache.nifi.controller.queue.DropFlowFileAction;
 import org.apache.nifi.controller.queue.DropFlowFileRequest;
 import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.PollStrategy;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.queue.SwappablePriorityQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
@@ -326,6 +327,29 @@
     }
 
     @Test
+    public void testPollWithPenalizedFlowFile() {
+        final FlowFileRecord penalizedFlowFile = mock(FlowFileRecord.class);
+        when(penalizedFlowFile.isPenalized()).thenReturn(true);
+        assertTrue(queue.isEmpty());
+        queue.put(penalizedFlowFile);
+
+        final Set<FlowFileRecord> expiredRecords = new HashSet<>();
+        FlowFileRecord polled = queue.poll(expiredRecords, 0, PollStrategy.UNPENALIZED_FLOWFILES);
+        assertNull(polled);
+
+        assertFalse(queue.isEmpty());
+
+        polled = queue.poll(expiredRecords, 0, PollStrategy.ALL_FLOWFILES);
+        assertNotNull(polled);
+        assertSame(penalizedFlowFile, polled);
+
+        // queue is still not empty because FlowFile has not yet been acknowledged.
+        queue.acknowledge(polled);
+
+        assertTrue(queue.isEmpty());
+    }
+
+    @Test
     public void testPollWithOnlyExpiredFlowFile() {
         final FlowFileRecord expiredFlowFile = mock(FlowFileRecord.class);
         when(expiredFlowFile.getEntryDate()).thenReturn(System.currentTimeMillis() - 5000L);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
index cfe4e2b..2ae0f0b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
@@ -148,7 +148,7 @@
 
         expectedDos.write(LoadBalanceProtocolConstants.CHECK_SPACE);
         expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES);
-        expectedDos.writeInt(68); // metadata length
+        expectedDos.writeInt(76); // metadata length
         expectedDos.writeInt(1); // 1 attribute
         expectedDos.writeInt(4); // length of attribute
         expectedDos.write("uuid".getBytes());
@@ -156,13 +156,14 @@
         expectedDos.write(flowFile1.getAttribute("uuid").getBytes());
         expectedDos.writeLong(flowFile1.getLineageStartDate()); // lineage start date
         expectedDos.writeLong(flowFile1.getEntryDate()); // entry date
+        expectedDos.writeLong(flowFile1.getPenaltyExpirationMillis()); // penalty expiration time
         expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
         expectedDos.writeInt(5);
         expectedDos.write("hello".getBytes());
         expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME);
 
         expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES);
-        expectedDos.writeInt(68); // metadata length
+        expectedDos.writeInt(76); // metadata length
         expectedDos.writeInt(1); // 1 attribute
         expectedDos.writeInt(4); // length of attribute
         expectedDos.write("uuid".getBytes());
@@ -170,6 +171,7 @@
         expectedDos.write(flowFile2.getAttribute("uuid").getBytes());
         expectedDos.writeLong(flowFile2.getLineageStartDate()); // lineage start date
         expectedDos.writeLong(flowFile2.getEntryDate()); // entry date
+        expectedDos.writeLong(flowFile2.getPenaltyExpirationMillis()); // penalty expiration time
         expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
         expectedDos.writeInt(8);
         expectedDos.write("good-bye".getBytes());
@@ -235,7 +237,7 @@
 
         expectedDos.write(LoadBalanceProtocolConstants.CHECK_SPACE);
         expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES);
-        expectedDos.writeInt(68); // metadata length
+        expectedDos.writeInt(76); // metadata length
         expectedDos.writeInt(1); // 1 attribute
         expectedDos.writeInt(4); // length of attribute
         expectedDos.write("uuid".getBytes());
@@ -243,6 +245,7 @@
         expectedDos.write(flowFile1.getAttribute("uuid").getBytes());
         expectedDos.writeLong(flowFile1.getLineageStartDate()); // lineage start date
         expectedDos.writeLong(flowFile1.getEntryDate()); // entry date
+        expectedDos.writeLong(flowFile1.getPenaltyExpirationMillis()); // penalty expiration time
 
         // first data frame
         expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
index bfd6f36..d74c213 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
@@ -633,6 +633,7 @@
 
             out.writeLong(0L); // lineage start date
             out.writeLong(0L); // entry date
+            out.writeLong(0L); // penalty expiration time
 
             dos.writeInt(baos.size());
             baos.writeTo(dos);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
index 99de3a3..af26727 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
@@ -275,7 +275,7 @@
         Mockito.doAnswer(new Answer<List<FlowFileRecord>>() {
             @Override
             public List<FlowFileRecord> answer(InvocationOnMock invocation) throws Throwable {
-                return localFlowFileQueue.poll(invocation.getArgument(0), invocation.getArgument(1));
+                return localFlowFileQueue.poll((FlowFileFilter) invocation.getArgument(0), invocation.getArgument(1));
             }
         }).when(connection).poll(any(FlowFileFilter.class), any(Set.class));
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 44db6cc..712a9e7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -47,6 +47,7 @@
 import org.apache.nifi.controller.queue.LoadBalanceCompression;
 import org.apache.nifi.controller.queue.LoadBalanceStrategy;
 import org.apache.nifi.controller.queue.NopConnectionEventListener;
+import org.apache.nifi.controller.queue.PollStrategy;
 import org.apache.nifi.controller.queue.QueueDiagnostics;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.queue.StandardFlowFileQueue;
@@ -219,16 +220,31 @@
             }
 
             @Override
+            public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords, PollStrategy pollStrategy) {
+                return null;
+            }
+
+            @Override
             public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords) {
                 return null;
             }
 
             @Override
+            public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords, PollStrategy pollStrategy) {
+                return null;
+            }
+
+            @Override
             public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords) {
                 return null;
             }
 
             @Override
+            public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords, PollStrategy pollStrategy) {
+                return null;
+            }
+
+            @Override
             public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
                 return null;
             }
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java
index f2d78a2..3b4a621 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java
@@ -21,6 +21,7 @@
 import org.apache.nifi.controller.queue.ListFlowFileStatus;
 import org.apache.nifi.controller.queue.LoadBalanceCompression;
 import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.queue.PollStrategy;
 import org.apache.nifi.controller.queue.QueueDiagnostics;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.FlowFileRecord;
@@ -162,7 +163,7 @@
     }
 
     @Override
-    public synchronized FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
+    public synchronized FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
         while (!flowFiles.isEmpty()) {
             final FlowFileRecord flowFile = flowFiles.peek();
             if (flowFile == null) {
@@ -178,7 +179,7 @@
                 continue;
             }
 
-            if (flowFile.isPenalized()) {
+            if (flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
                 return null;
             }
 
@@ -189,6 +190,11 @@
         return null;
     }
 
+    @Override
+    public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords) {
+        return poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
+    }
+
     private boolean isExpired(final FlowFileRecord flowFile) {
         if (expirationMillis == 0L) {
             return false;
@@ -199,10 +205,10 @@
     }
 
     @Override
-    public synchronized List<FlowFileRecord> poll(final int maxResults, final Set<FlowFileRecord> expiredRecords) {
+    public synchronized List<FlowFileRecord> poll(final int maxResults, final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
         final List<FlowFileRecord> selected = new ArrayList<>(Math.min(maxResults, flowFiles.size()));
         for (int i=0; i < maxResults; i++) {
-            final FlowFileRecord flowFile = poll(expiredRecords);
+            final FlowFileRecord flowFile = poll(expiredRecords, pollStrategy);
             if (flowFile != null) {
                 selected.add(flowFile);
             }
@@ -216,7 +222,12 @@
     }
 
     @Override
-    public synchronized List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
+    public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords) {
+        return poll(maxResults, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
+    }
+
+    @Override
+    public synchronized List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
         final List<FlowFileRecord> selected = new ArrayList<>();
 
         // Use an iterator to iterate over all FlowFiles in the queue. This allows us to
@@ -235,7 +246,7 @@
                 continue;
             }
 
-            if (flowFile.isPenalized()) {
+            if (flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
                 break;
             }
 
@@ -255,6 +266,11 @@
     }
 
     @Override
+    public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
+        return poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
+    }
+
+    @Override
     public String getFlowFileExpiration() {
         return expirationMillis + " millis";
     }