NIFI-8739 Penalized flowfiles should be able to be polled from the queue in some cases (#5189)
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
index fbbd4bb..d4b6b2e 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -147,18 +147,26 @@
/**
* @param expiredRecords expired records
+ * @param pollStrategy strategy of polling
* @return the next flow file on the queue; null if empty
*/
+ FlowFileRecord poll(Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy);
+
FlowFileRecord poll(Set<FlowFileRecord> expiredRecords);
/**
* @param maxResults limits how many results can be polled
* @param expiredRecords for expired records
+ * @param pollStrategy strategy of polling
* @return the next flow files on the queue up to the max results; null if
* empty
*/
+ List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy);
+
List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords);
+ List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy);
+
List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords);
String getFlowFileExpiration();
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/PollStrategy.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/PollStrategy.java
new file mode 100644
index 0000000..2f45ba2
--- /dev/null
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/PollStrategy.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue;
+
+/**
+ * Represents a strategy that how to poll the queue.
+ */
+public enum PollStrategy {
+
+ ALL_FLOWFILES,
+
+ UNPENALIZED_FLOWFILES
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index bea9572..1cf78b6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -32,6 +32,7 @@
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueFactory;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.FlowFileFilter;
@@ -341,12 +342,12 @@
@Override
public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
- return flowFileQueue.poll(filter, expiredRecords);
+ return flowFileQueue.poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}
@Override
public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
- return flowFileQueue.poll(expiredRecords);
+ return flowFileQueue.poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 614abc4..769fad5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -24,6 +24,7 @@
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.lifecycle.TaskTermination;
import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache;
@@ -2284,7 +2285,7 @@
for (final Connection conn : context.getConnectable().getIncomingConnections()) {
do {
expired.clear();
- conn.getFlowFileQueue().poll(filter, expired);
+ conn.getFlowFileQueue().poll(filter, expired, PollStrategy.ALL_FLOWFILES);
removeExpired(expired, conn);
} while (!expired.isEmpty());
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
index aac0659..de49ad2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
@@ -27,6 +27,7 @@
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
@@ -40,6 +41,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -483,4 +485,19 @@
loadBalanceReadLock.unlock();
}
}
+
+ @Override
+ public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords) {
+ return poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
+ }
+
+ @Override
+ public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords) {
+ return poll(maxResults, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
+ }
+
+ @Override
+ public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
+ return poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java
index 9a220ae..db0e30e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/BlockingSwappablePriorityQueue.java
@@ -51,13 +51,13 @@
}
}
- public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final long waitMillis) throws InterruptedException {
+ public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final long waitMillis, final PollStrategy pollStrategy) throws InterruptedException {
final long maxTimestamp = System.currentTimeMillis() + waitMillis;
synchronized (monitor) {
FlowFileRecord flowFile = null;
do {
- flowFile = super.poll(expiredRecords, expirationMillis);
+ flowFile = super.poll(expiredRecords, expirationMillis, pollStrategy);
if (flowFile != null) {
return flowFile;
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
index 903ed49..a87fcfe 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
@@ -124,16 +124,16 @@
@Override
- public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
+ public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
// First check if we have any records Pre-Fetched.
final long expirationMillis = getFlowFileExpiration(TimeUnit.MILLISECONDS);
- return queue.poll(expiredRecords, expirationMillis);
+ return queue.poll(expiredRecords, expirationMillis, pollStrategy);
}
@Override
- public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords) {
- return queue.poll(maxResults, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS));
+ public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
+ return queue.poll(maxResults, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS), pollStrategy);
}
@@ -184,8 +184,8 @@
}
@Override
- public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
- return queue.poll(filter, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS));
+ public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
+ return queue.poll(filter, expiredRecords, getFlowFileExpiration(TimeUnit.MILLISECONDS), pollStrategy);
}
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
index ec02009..34da62c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java
@@ -435,33 +435,6 @@
return getFlowFileQueueSize().isEmpty();
}
- public boolean isFlowFileAvailable() {
- if (isEmpty()) {
- return false;
- }
-
- readLock.lock();
- try {
- // If we have data in the active or swap queue that is penalized, then we know that all FlowFiles
- // are penalized. As a result, we can say that no FlowFile is available.
- FlowFileRecord firstRecord = activeQueue.peek();
- if (firstRecord == null && !swapQueue.isEmpty()) {
- firstRecord = swapQueue.get(0);
- }
-
- if (firstRecord == null) {
- // If the queue is not empty, then all data is swapped out. We don't actually know whether or not the swapped out data is penalized, so we assume
- // that it is not penalized and is therefore available.
- return !isEmpty();
- }
-
- // We do have a FlowFile that was retrieved from the active or swap queue. It is available if it is not penalized.
- return !firstRecord.isPenalized();
- } finally {
- readLock.unlock("isFlowFileAvailable");
- }
- }
-
public boolean isActiveQueueEmpty() {
final FlowFileQueueSize queueSize = getFlowFileQueueSize();
return queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() == 0;
@@ -524,12 +497,16 @@
}
public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+ return poll(expiredRecords, expirationMillis, PollStrategy.UNPENALIZED_FLOWFILES);
+ }
+
+ public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) {
FlowFileRecord flowFile;
// First check if we have any records Pre-Fetched.
writeLock.lock();
try {
- flowFile = doPoll(expiredRecords, expirationMillis);
+ flowFile = doPoll(expiredRecords, expirationMillis, pollStrategy);
if (flowFile != null) {
logger.trace("{} poll() returning {}", this, flowFile);
@@ -543,7 +520,7 @@
}
- private FlowFileRecord doPoll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+ private FlowFileRecord doPoll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) {
FlowFileRecord flowFile;
boolean isExpired;
@@ -562,7 +539,7 @@
if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
break;
}
- } else if (flowFile != null && flowFile.isPenalized()) {
+ } else if (flowFile != null && flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
this.activeQueue.add(flowFile);
flowFile = null;
break;
@@ -581,12 +558,16 @@
}
public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+ return poll(maxResults, expiredRecords, expirationMillis, PollStrategy.UNPENALIZED_FLOWFILES);
+ }
+
+ public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) {
final List<FlowFileRecord> records = new ArrayList<>(Math.min(1, maxResults));
// First check if we have any records Pre-Fetched.
writeLock.lock();
try {
- doPoll(records, maxResults, expiredRecords, expirationMillis);
+ doPoll(records, maxResults, expiredRecords, expirationMillis, pollStrategy);
} finally {
writeLock.unlock("poll(int, Set)");
}
@@ -599,6 +580,10 @@
}
public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+ return poll(filter, expiredRecords, expirationMillis, PollStrategy.UNPENALIZED_FLOWFILES);
+ }
+
+ public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) {
long bytesPulled = 0L;
int flowFilesPulled = 0;
@@ -626,7 +611,7 @@
} else {
continue;
}
- } else if (flowFile.isPenalized()) {
+ } else if (flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
this.activeQueue.add(flowFile);
break; // just stop searching because the rest are all penalized.
}
@@ -660,10 +645,10 @@
}
}
- private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+ private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) {
migrateSwapToActive();
- final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords, expirationMillis);
+ final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords, expirationMillis, pollStrategy);
long expiredBytes = 0L;
for (final FlowFileRecord record : expiredRecords) {
@@ -701,7 +686,9 @@
}
- private long drainQueue(final Queue<FlowFileRecord> sourceQueue, final List<FlowFileRecord> destination, int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis) {
+ private long drainQueue(final Queue<FlowFileRecord> sourceQueue, final List<FlowFileRecord> destination,
+ int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis,
+ final PollStrategy pollStrategy) {
long drainedSize = 0L;
FlowFileRecord pulled;
@@ -712,7 +699,7 @@
break;
}
} else {
- if (pulled.isPenalized()) {
+ if (pulled.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
sourceQueue.add(pulled);
break;
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 1a1e187..6b8d418 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -32,6 +32,7 @@
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
+import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueDiagnostics;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
@@ -930,22 +931,22 @@
}
@Override
- public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
- final FlowFileRecord flowFile = localPartition.poll(expiredRecords);
+ public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
+ final FlowFileRecord flowFile = localPartition.poll(expiredRecords, pollStrategy);
onAbort(expiredRecords);
return flowFile;
}
@Override
- public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords) {
- final List<FlowFileRecord> flowFiles = localPartition.poll(maxResults, expiredRecords);
+ public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
+ final List<FlowFileRecord> flowFiles = localPartition.poll(maxResults, expiredRecords, pollStrategy);
onAbort(expiredRecords);
return flowFiles;
}
@Override
- public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
- final List<FlowFileRecord> flowFiles = localPartition.poll(filter, expiredRecords);
+ public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
+ final List<FlowFileRecord> flowFiles = localPartition.poll(filter, expiredRecords, pollStrategy);
onAbort(expiredRecords);
return flowFiles;
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/StandardLoadBalanceFlowFileCodec.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/StandardLoadBalanceFlowFileCodec.java
index 8e9b165..fe1baac 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/StandardLoadBalanceFlowFileCodec.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/StandardLoadBalanceFlowFileCodec.java
@@ -39,6 +39,7 @@
out.writeLong(flowFile.getLineageStartDate());
out.writeLong(flowFile.getEntryDate());
+ out.writeLong(flowFile.getPenaltyExpirationMillis());
}
private void writeString(final String value, final DataOutputStream out) throws IOException {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java
index 9ee0e0e..84f0bab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java
@@ -19,6 +19,7 @@
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
+import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.processor.FlowFileFilter;
@@ -46,8 +47,11 @@
* Returns a single FlowFile with the highest priority that is available in the partition, or <code>null</code> if no FlowFile is available
*
* @param expiredRecords a Set of FlowFileRecord's to which any expired records that are encountered should be added
+ * @param pollStrategy strategy of polling
* @return a single FlowFile with the highest priority that is available in the partition, or <code>null</code> if no FlowFile is available
*/
+ FlowFileRecord poll(Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy);
+
FlowFileRecord poll(Set<FlowFileRecord> expiredRecords);
/**
@@ -55,8 +59,11 @@
*
* @param maxResults the maximum number of FlowFiles to return
* @param expiredRecords a Set of FlowFileRecord's to which any expired records that are encountered should be added
+ * @param pollStrategy strategy of polling
* @return a List of FlowFiles (possibly empty) with the highest priority FlowFiles that are available in the partition
*/
+ List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy);
+
List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords);
/**
@@ -64,8 +71,11 @@
*
* @param filter the filter to determine whether or not a given FlowFile should be returned
* @param expiredRecords a Set of FlowFileRecord's to which any expired records that are encountered should be added
+ * @param pollStrategy strategy of polling
* @return a List of FlowFiles (possibly empty) with FlowFiles that matched the given filter
*/
+ List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy);
+
List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords);
/**
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
index 53f0f9f..5f69905 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
@@ -21,6 +21,7 @@
import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
+import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.StandardRemoteQueuePartitionDiagnostics;
@@ -150,7 +151,7 @@
private FlowFileRecord getFlowFile() {
final Set<FlowFileRecord> expired = new HashSet<>();
- final FlowFileRecord flowFile = priorityQueue.poll(expired, flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS));
+ final FlowFileRecord flowFile = priorityQueue.poll(expired, flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS), PollStrategy.ALL_FLOWFILES);
flowFileQueue.handleExpiredRecords(expired);
return flowFile;
}
@@ -225,22 +226,13 @@
}
};
- // Consider the queue empty unless a FlowFile is available. This means that if the queue has only penalized FlowFiles, it will be considered empty.
- // This is what we want for the purpose of load balancing the data. Otherwise, we would have a situation where we create a connection to the other node,
- // determine that now FlowFile is available to send, and then notify the node of this and close the connection. And then this would repeat over and over
- // until the FlowFile is no longer penalized. Instead, we want to consider the queue empty until a FlowFile is actually available, and only then bother
- // creating the connection to send data.
- final BooleanSupplier emptySupplier = this::isQueueEmpty;
+ final BooleanSupplier emptySupplier = priorityQueue::isEmpty;
clientRegistry.register(flowFileQueue.getIdentifier(), nodeIdentifier, emptySupplier, this::getFlowFile,
failureCallback, successCallback, flowFileQueue::getLoadBalanceCompression, flowFileQueue::isPropagateBackpressureAcrossNodes);
running = true;
}
- private boolean isQueueEmpty() {
- return !priorityQueue.isFlowFileAvailable();
- }
-
public void onRemoved() {
clientRegistry.unregister(flowFileQueue.getIdentifier(), nodeIdentifier);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java
index 7ecac18..de46d17 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java
@@ -23,6 +23,7 @@
import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
+import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
@@ -191,7 +192,7 @@
// Wait up to #pollWaitMillis milliseconds to get a FlowFile. If none, then check if stopped
// and if not, poll again.
try {
- polled = queue.poll(expiredRecords, -1, pollWaitMillis);
+ polled = queue.poll(expiredRecords, -1, pollWaitMillis, PollStrategy.ALL_FLOWFILES);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
continue;
@@ -211,7 +212,7 @@
final List<FlowFileRecord> toDistribute = new ArrayList<>();
toDistribute.add(polled);
- final List<FlowFileRecord> additionalRecords = queue.poll(999, expiredRecords, -1);
+ final List<FlowFileRecord> additionalRecords = queue.poll(999, expiredRecords, -1, PollStrategy.ALL_FLOWFILES);
toDistribute.addAll(additionalRecords);
flowFileQueue.handleExpiredRecords(expiredRecords);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java
index 075039c..03e8e18 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java
@@ -23,6 +23,7 @@
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
+import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.SwappablePriorityQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
@@ -102,18 +103,33 @@
}
@Override
- public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
- return priorityQueue.poll(expiredRecords, getExpiration());
+ public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
+ return priorityQueue.poll(expiredRecords, getExpiration(), pollStrategy);
}
@Override
- public List<FlowFileRecord> poll(final int maxResults, final Set<FlowFileRecord> expiredRecords) {
- return priorityQueue.poll(maxResults, expiredRecords, getExpiration());
+ public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords) {
+ return poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}
@Override
- public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
- return priorityQueue.poll(filter, expiredRecords, getExpiration());
+ public List<FlowFileRecord> poll(final int maxResults, final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
+ return priorityQueue.poll(maxResults, expiredRecords, getExpiration(), pollStrategy);
+ }
+
+ @Override
+ public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords) {
+ return poll(maxResults, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
+ }
+
+ @Override
+ public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
+ return priorityQueue.poll(filter, expiredRecords, getExpiration(), pollStrategy);
+ }
+
+ @Override
+ public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
+ return poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}
private int getExpiration() {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
index 1e35bc7..858b76a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
@@ -509,6 +509,7 @@
final long lineageStartDate = metadataIn.readLong();
final long entryDate = metadataIn.readLong();
+ final long penaltyExpirationMillis = metadataIn.readLong();
final ContentClaimTriple contentClaimTriple = consumeContent(dis, out, contentClaim, claimOffset, peerDescription, compression == LoadBalanceCompression.COMPRESS_ATTRIBUTES_AND_CONTENT);
@@ -521,6 +522,7 @@
.size(contentClaimTriple.getContentLength())
.entryDate(entryDate)
.lineageStart(lineageStartDate, lineageStartIndex.getAndIncrement())
+ .penaltyExpirationTime(penaltyExpirationMillis)
.build();
logger.debug("Received FlowFile {} with {} attributes and {} bytes of content", flowFileRecord, attributes.size(), contentClaimTriple.getContentLength());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
index 5cc3d7e..95a204b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
@@ -22,6 +22,7 @@
import org.apache.nifi.controller.queue.DropFlowFileAction;
import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.SwappablePriorityQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
@@ -326,6 +327,29 @@
}
@Test
+ public void testPollWithPenalizedFlowFile() {
+ final FlowFileRecord penalizedFlowFile = mock(FlowFileRecord.class);
+ when(penalizedFlowFile.isPenalized()).thenReturn(true);
+ assertTrue(queue.isEmpty());
+ queue.put(penalizedFlowFile);
+
+ final Set<FlowFileRecord> expiredRecords = new HashSet<>();
+ FlowFileRecord polled = queue.poll(expiredRecords, 0, PollStrategy.UNPENALIZED_FLOWFILES);
+ assertNull(polled);
+
+ assertFalse(queue.isEmpty());
+
+ polled = queue.poll(expiredRecords, 0, PollStrategy.ALL_FLOWFILES);
+ assertNotNull(polled);
+ assertSame(penalizedFlowFile, polled);
+
+ // queue is still not empty because FlowFile has not yet been acknowledged.
+ queue.acknowledge(polled);
+
+ assertTrue(queue.isEmpty());
+ }
+
+ @Test
public void testPollWithOnlyExpiredFlowFile() {
final FlowFileRecord expiredFlowFile = mock(FlowFileRecord.class);
when(expiredFlowFile.getEntryDate()).thenReturn(System.currentTimeMillis() - 5000L);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
index cfe4e2b..2ae0f0b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
@@ -148,7 +148,7 @@
expectedDos.write(LoadBalanceProtocolConstants.CHECK_SPACE);
expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES);
- expectedDos.writeInt(68); // metadata length
+ expectedDos.writeInt(76); // metadata length
expectedDos.writeInt(1); // 1 attribute
expectedDos.writeInt(4); // length of attribute
expectedDos.write("uuid".getBytes());
@@ -156,13 +156,14 @@
expectedDos.write(flowFile1.getAttribute("uuid").getBytes());
expectedDos.writeLong(flowFile1.getLineageStartDate()); // lineage start date
expectedDos.writeLong(flowFile1.getEntryDate()); // entry date
+ expectedDos.writeLong(flowFile1.getPenaltyExpirationMillis()); // penalty expiration time
expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
expectedDos.writeInt(5);
expectedDos.write("hello".getBytes());
expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME);
expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES);
- expectedDos.writeInt(68); // metadata length
+ expectedDos.writeInt(76); // metadata length
expectedDos.writeInt(1); // 1 attribute
expectedDos.writeInt(4); // length of attribute
expectedDos.write("uuid".getBytes());
@@ -170,6 +171,7 @@
expectedDos.write(flowFile2.getAttribute("uuid").getBytes());
expectedDos.writeLong(flowFile2.getLineageStartDate()); // lineage start date
expectedDos.writeLong(flowFile2.getEntryDate()); // entry date
+ expectedDos.writeLong(flowFile2.getPenaltyExpirationMillis()); // penalty expiration time
expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
expectedDos.writeInt(8);
expectedDos.write("good-bye".getBytes());
@@ -235,7 +237,7 @@
expectedDos.write(LoadBalanceProtocolConstants.CHECK_SPACE);
expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES);
- expectedDos.writeInt(68); // metadata length
+ expectedDos.writeInt(76); // metadata length
expectedDos.writeInt(1); // 1 attribute
expectedDos.writeInt(4); // length of attribute
expectedDos.write("uuid".getBytes());
@@ -243,6 +245,7 @@
expectedDos.write(flowFile1.getAttribute("uuid").getBytes());
expectedDos.writeLong(flowFile1.getLineageStartDate()); // lineage start date
expectedDos.writeLong(flowFile1.getEntryDate()); // entry date
+ expectedDos.writeLong(flowFile1.getPenaltyExpirationMillis()); // penalty expiration time
// first data frame
expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
index bfd6f36..d74c213 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
@@ -633,6 +633,7 @@
out.writeLong(0L); // lineage start date
out.writeLong(0L); // entry date
+ out.writeLong(0L); // penalty expiration time
dos.writeInt(baos.size());
baos.writeTo(dos);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
index 99de3a3..af26727 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
@@ -275,7 +275,7 @@
Mockito.doAnswer(new Answer<List<FlowFileRecord>>() {
@Override
public List<FlowFileRecord> answer(InvocationOnMock invocation) throws Throwable {
- return localFlowFileQueue.poll(invocation.getArgument(0), invocation.getArgument(1));
+ return localFlowFileQueue.poll((FlowFileFilter) invocation.getArgument(0), invocation.getArgument(1));
}
}).when(connection).poll(any(FlowFileFilter.class), any(Set.class));
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 44db6cc..712a9e7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -47,6 +47,7 @@
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.queue.NopConnectionEventListener;
+import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueDiagnostics;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.StandardFlowFileQueue;
@@ -219,16 +220,31 @@
}
@Override
+ public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords, PollStrategy pollStrategy) {
+ return null;
+ }
+
+ @Override
public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords) {
return null;
}
@Override
+ public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords, PollStrategy pollStrategy) {
+ return null;
+ }
+
+ @Override
public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords) {
return null;
}
@Override
+ public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords, PollStrategy pollStrategy) {
+ return null;
+ }
+
+ @Override
public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
return null;
}
diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java
index f2d78a2..3b4a621 100644
--- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java
+++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java
@@ -21,6 +21,7 @@
import org.apache.nifi.controller.queue.ListFlowFileStatus;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueDiagnostics;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord;
@@ -162,7 +163,7 @@
}
@Override
- public synchronized FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
+ public synchronized FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
while (!flowFiles.isEmpty()) {
final FlowFileRecord flowFile = flowFiles.peek();
if (flowFile == null) {
@@ -178,7 +179,7 @@
continue;
}
- if (flowFile.isPenalized()) {
+ if (flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
return null;
}
@@ -189,6 +190,11 @@
return null;
}
+ @Override
+ public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords) {
+ return poll(expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
+ }
+
private boolean isExpired(final FlowFileRecord flowFile) {
if (expirationMillis == 0L) {
return false;
@@ -199,10 +205,10 @@
}
@Override
- public synchronized List<FlowFileRecord> poll(final int maxResults, final Set<FlowFileRecord> expiredRecords) {
+ public synchronized List<FlowFileRecord> poll(final int maxResults, final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
final List<FlowFileRecord> selected = new ArrayList<>(Math.min(maxResults, flowFiles.size()));
for (int i=0; i < maxResults; i++) {
- final FlowFileRecord flowFile = poll(expiredRecords);
+ final FlowFileRecord flowFile = poll(expiredRecords, pollStrategy);
if (flowFile != null) {
selected.add(flowFile);
}
@@ -216,7 +222,12 @@
}
@Override
- public synchronized List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
+ public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords) {
+ return poll(maxResults, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
+ }
+
+ @Override
+ public synchronized List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) {
final List<FlowFileRecord> selected = new ArrayList<>();
// Use an iterator to iterate over all FlowFiles in the queue. This allows us to
@@ -235,7 +246,7 @@
continue;
}
- if (flowFile.isPenalized()) {
+ if (flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
break;
}
@@ -255,6 +266,11 @@
}
@Override
+ public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
+ return poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
+ }
+
+ @Override
public String getFlowFileExpiration() {
return expirationMillis + " millis";
}