Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/nifi
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index 7c90342..46891a4 100644
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -169,8 +169,18 @@
@Override
public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
- final int flowFilesBinned = binFlowFiles(context, sessionFactory);
- getLogger().debug("Binned {} FlowFiles", new Object[]{flowFilesBinned});
+ final int totalBinCount = binManager.getBinCount() + readyBins.size();
+ final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
+ final int flowFilesBinned;
+
+ if (totalBinCount < maxBinCount) {
+ flowFilesBinned = binFlowFiles(context, sessionFactory);
+ getLogger().debug("Binned {} FlowFiles", new Object[] {flowFilesBinned});
+ } else {
+ flowFilesBinned = 0;
+ getLogger().debug("Will not bin any FlowFiles because {} bins already exist; "
+ + "will wait until bins have been emptied before any more are created", new Object[] {totalBinCount});
+ }
if (!isScheduled()) {
return;
@@ -194,7 +204,7 @@
// if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
// this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
// bins. So we may as well expire it now.
- if (added == 0 && binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
+ if (added == 0 && (readyBins.size() + binManager.getBinCount()) >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
final Bin bin = binManager.removeOldestBin();
if (bin != null) {
added++;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 7057dff..e06befb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -323,9 +323,6 @@
@OnScheduled
public void schedule(ProcessContext context) {
this.deadlockTimeout = context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2;
- if (this.executor == null || this.executor.isShutdown()) {
- this.executor = Executors.newCachedThreadPool();
- }
}
@Override
@@ -335,6 +332,9 @@
* of onTrigger. Will be reset to 'false' in the event of exception
*/
synchronized (this.consumerStreamsReady) {
+ if (this.executor == null || this.executor.isShutdown()) {
+ this.executor = Executors.newCachedThreadPool();
+ }
if (!this.consumerStreamsReady.get()) {
Future<Void> f = this.executor.submit(new Callable<Void>() {
@Override