Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/nifi
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index 7c90342..46891a4 100644
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -169,8 +169,18 @@
 
     @Override
     public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
-        final int flowFilesBinned = binFlowFiles(context, sessionFactory);
-        getLogger().debug("Binned {} FlowFiles", new Object[]{flowFilesBinned});
+        final int totalBinCount = binManager.getBinCount() + readyBins.size();
+        final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
+        final int flowFilesBinned;
+
+        if (totalBinCount < maxBinCount) {
+            flowFilesBinned = binFlowFiles(context, sessionFactory);
+            getLogger().debug("Binned {} FlowFiles", new Object[] {flowFilesBinned});
+        } else {
+            flowFilesBinned = 0;
+            getLogger().debug("Will not bin any FlowFiles because {} bins already exist;"
+                + "will wait until bins have been emptied before any more are created", new Object[] {totalBinCount});
+        }
 
         if (!isScheduled()) {
             return;
@@ -194,7 +204,7 @@
         // if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
         // this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
         // bins. So we may as well expire it now.
-        if (added == 0 && binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
+        if (added == 0 && (readyBins.size() + binManager.getBinCount()) >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
             final Bin bin = binManager.removeOldestBin();
             if (bin != null) {
                 added++;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 7057dff..e06befb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -323,9 +323,6 @@
     @OnScheduled
     public void schedule(ProcessContext context) {
         this.deadlockTimeout = context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2;
-        if (this.executor == null || this.executor.isShutdown()) {
-            this.executor = Executors.newCachedThreadPool();
-        }
     }
 
     @Override
@@ -335,6 +332,9 @@
          * of onTrigger. Will be reset to 'false' in the event of exception
          */
         synchronized (this.consumerStreamsReady) {
+            if (this.executor == null || this.executor.isShutdown()) {
+                this.executor = Executors.newCachedThreadPool();
+            }
             if (!this.consumerStreamsReady.get()) {
                 Future<Void> f = this.executor.submit(new Callable<Void>() {
                     @Override