QPID-7791: Ensure that threads used by the recoverer dispose thread-locally cached QBBs

Cherry picked from 95ee1329cdc9b2a3c5fb0903bb61105b14eb2f37.  Conflicts resolved by hand and source downgraded back to JDK 1.7
diff --git a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
index fabbd2d..d77dda2 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
@@ -30,8 +30,8 @@
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -47,6 +47,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.pool.QpidByteBufferDisposingThreadPoolExecutor;
 import org.apache.qpid.server.util.FutureHelper;
 
 public class TaskExecutorImpl implements TaskExecutor
@@ -84,15 +85,26 @@
         if (_running.compareAndSet(false, true))
         {
             LOGGER.debug("Starting task executor {}", _name);
-            _executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1, new ThreadFactory()
-            {
-                @Override
-                public Thread newThread(Runnable r)
-                {
-                    _taskThread = new TaskThread(r, _name, TaskExecutorImpl.this);
-                    return _taskThread;
-                }
-            }));
+            _executor = MoreExecutors.listeningDecorator(new QpidByteBufferDisposingThreadPoolExecutor(1,
+                                                                                                       1,
+                                                                                                       0L,
+                                                                                                       TimeUnit.MILLISECONDS,
+                                                                                                       new LinkedBlockingQueue<Runnable>(),
+                                                                                                       new ThreadFactory()
+                                                                                                       {
+                                                                                                           @Override
+                                                                                                           public Thread
+                                                                                                           newThread(
+                                                                                                                   final Runnable r)
+                                                                                                           {
+                                                                                                               _taskThread =
+                                                                                                                       new TaskThread(
+                                                                                                                               r,
+                                                                                                                               _name,
+                                                                                                                               TaskExecutorImpl.this);
+                                                                                                               return _taskThread;
+                                                                                                           }
+                                                                                                       }));
             LOGGER.debug("Task executor is started");
         }
     }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/pool/QpidByteBufferDisposingThreadPoolExecutor.java b/broker-core/src/main/java/org/apache/qpid/server/pool/QpidByteBufferDisposingThreadPoolExecutor.java
new file mode 100644
index 0000000..77ef166
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/pool/QpidByteBufferDisposingThreadPoolExecutor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.pool;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
+public class QpidByteBufferDisposingThreadPoolExecutor extends ThreadPoolExecutor
+{
+    private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>();
+
+    public QpidByteBufferDisposingThreadPoolExecutor(final int corePoolSize,
+                                                     final int maximumPoolSize,
+                                                     final long keepAliveTime,
+                                                     final TimeUnit unit,
+                                                     final BlockingQueue<Runnable> workQueue)
+    {
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory());
+    }
+
+    public QpidByteBufferDisposingThreadPoolExecutor(final int corePoolSize,
+                                                     final int maximumPoolSize,
+                                                     final long keepAliveTime,
+                                                     final TimeUnit unit,
+                                                     final BlockingQueue<Runnable> workQueue,
+                                                     final ThreadFactory factory)
+    {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, factory);
+    }
+
+    @Override
+    protected void afterExecute(final Runnable r, final Throwable t)
+    {
+        super.afterExecute(r, t);
+        final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer();
+        if (cachedThreadLocalBuffer != null)
+        {
+            _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer);
+        }
+        else
+        {
+            _cachedBufferMap.remove(Thread.currentThread());
+        }
+    }
+
+    @Override
+    protected void terminated()
+    {
+        super.terminated();
+        for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
+        {
+            qpidByteBuffer.dispose();
+        }
+        _cachedBufferMap.clear();
+    }
+}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
index bdb1a4b..9837130 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
@@ -34,9 +34,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.transport.TransportException;
 
+import org.apache.qpid.server.pool.QpidByteBufferDisposingThreadPoolExecutor;
 
 public class NetworkConnectionScheduler
 {
@@ -103,38 +103,9 @@
         try
         {
             _selectorThread = new SelectorThread(this, _numberOfSelectors);
-            _executor = new ThreadPoolExecutor(_poolSize, _poolSize,
-                                               _threadKeepAliveTimeout, TimeUnit.MINUTES,
-                                               new LinkedBlockingQueue<Runnable>(), _factory)
-            {
-                private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>();
-
-                @Override
-                protected void afterExecute(final Runnable r, final Throwable t)
-                {
-                    super.afterExecute(r, t);
-                    final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer();
-                    if (cachedThreadLocalBuffer != null)
-                    {
-                        _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer);
-                    }
-                    else
-                    {
-                        _cachedBufferMap.remove(Thread.currentThread());
-                    }
-                }
-
-                @Override
-                protected void terminated()
-                {
-                    super.terminated();
-                    for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
-                    {
-                        qpidByteBuffer.dispose();
-                    }
-                    _cachedBufferMap.clear();
-                }
-            };
+            _executor = new QpidByteBufferDisposingThreadPoolExecutor(_poolSize, _poolSize,
+                                                                      _threadKeepAliveTimeout, TimeUnit.MINUTES,
+                                                                      new LinkedBlockingQueue<Runnable>(), _factory);
             _executor.prestartAllCoreThreads();
             _executor.allowCoreThreadTimeOut(true);
             for(int i = 0 ; i < _poolSize; i++)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
index fc4d72d..90aab2c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
@@ -27,7 +27,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -60,6 +60,7 @@
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.pool.QpidByteBufferDisposingThreadPoolExecutor;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.transport.Xid;
 import org.apache.qpid.transport.util.Functions;
@@ -99,7 +100,11 @@
         private final Set<Queue<?>> _recoveringQueues = new CopyOnWriteArraySet<>();
         private final AtomicBoolean _recoveryComplete = new AtomicBoolean();
         private final Map<Long, MessageReference<? extends ServerMessage<?>>> _recoveredMessages = new HashMap<>();
-        private final ListeningExecutorService _queueRecoveryExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+        private final ListeningExecutorService _queueRecoveryExecutor =
+                MoreExecutors.listeningDecorator(new QpidByteBufferDisposingThreadPoolExecutor(0, Integer.MAX_VALUE,
+                                                                                               60L, TimeUnit.SECONDS,
+                                                                                               new SynchronousQueue<Runnable>()));
+
         private final MessageStore.MessageStoreReader _storeReader;
         private AtomicBoolean _continueRecovery = new AtomicBoolean(true);