QPID-7791: Ensure that threads used by the recoverer dispose thread-locally cached QBBs
Cherry picked from 95ee1329cdc9b2a3c5fb0903bb61105b14eb2f37. Conflicts resolved by hand and source downgraded back to JDK 1.7
diff --git a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
index fabbd2d..d77dda2 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
@@ -30,8 +30,8 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -47,6 +47,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.pool.QpidByteBufferDisposingThreadPoolExecutor;
import org.apache.qpid.server.util.FutureHelper;
public class TaskExecutorImpl implements TaskExecutor
@@ -84,15 +85,26 @@
if (_running.compareAndSet(false, true))
{
LOGGER.debug("Starting task executor {}", _name);
- _executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1, new ThreadFactory()
- {
- @Override
- public Thread newThread(Runnable r)
- {
- _taskThread = new TaskThread(r, _name, TaskExecutorImpl.this);
- return _taskThread;
- }
- }));
+ _executor = MoreExecutors.listeningDecorator(new QpidByteBufferDisposingThreadPoolExecutor(1,
+ 1,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactory()
+ {
+ @Override
+ public Thread
+ newThread(
+ final Runnable r)
+ {
+ _taskThread =
+ new TaskThread(
+ r,
+ _name,
+ TaskExecutorImpl.this);
+ return _taskThread;
+ }
+ }));
LOGGER.debug("Task executor is started");
}
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/pool/QpidByteBufferDisposingThreadPoolExecutor.java b/broker-core/src/main/java/org/apache/qpid/server/pool/QpidByteBufferDisposingThreadPoolExecutor.java
new file mode 100644
index 0000000..77ef166
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/pool/QpidByteBufferDisposingThreadPoolExecutor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.pool;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
+public class QpidByteBufferDisposingThreadPoolExecutor extends ThreadPoolExecutor
+{
+ private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>();
+
+ public QpidByteBufferDisposingThreadPoolExecutor(final int corePoolSize,
+ final int maximumPoolSize,
+ final long keepAliveTime,
+ final TimeUnit unit,
+ final BlockingQueue<Runnable> workQueue)
+ {
+ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory());
+ }
+
+ public QpidByteBufferDisposingThreadPoolExecutor(final int corePoolSize,
+ final int maximumPoolSize,
+ final long keepAliveTime,
+ final TimeUnit unit,
+ final BlockingQueue<Runnable> workQueue,
+ final ThreadFactory factory)
+ {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, factory);
+ }
+
+ @Override
+ protected void afterExecute(final Runnable r, final Throwable t)
+ {
+ super.afterExecute(r, t);
+ final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer();
+ if (cachedThreadLocalBuffer != null)
+ {
+ _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer);
+ }
+ else
+ {
+ _cachedBufferMap.remove(Thread.currentThread());
+ }
+ }
+
+ @Override
+ protected void terminated()
+ {
+ super.terminated();
+ for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
+ {
+ qpidByteBuffer.dispose();
+ }
+ _cachedBufferMap.clear();
+ }
+}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
index bdb1a4b..9837130 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
@@ -34,9 +34,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.server.pool.QpidByteBufferDisposingThreadPoolExecutor;
public class NetworkConnectionScheduler
{
@@ -103,38 +103,9 @@
try
{
_selectorThread = new SelectorThread(this, _numberOfSelectors);
- _executor = new ThreadPoolExecutor(_poolSize, _poolSize,
- _threadKeepAliveTimeout, TimeUnit.MINUTES,
- new LinkedBlockingQueue<Runnable>(), _factory)
- {
- private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>();
-
- @Override
- protected void afterExecute(final Runnable r, final Throwable t)
- {
- super.afterExecute(r, t);
- final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer();
- if (cachedThreadLocalBuffer != null)
- {
- _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer);
- }
- else
- {
- _cachedBufferMap.remove(Thread.currentThread());
- }
- }
-
- @Override
- protected void terminated()
- {
- super.terminated();
- for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
- {
- qpidByteBuffer.dispose();
- }
- _cachedBufferMap.clear();
- }
- };
+ _executor = new QpidByteBufferDisposingThreadPoolExecutor(_poolSize, _poolSize,
+ _threadKeepAliveTimeout, TimeUnit.MINUTES,
+ new LinkedBlockingQueue<Runnable>(), _factory);
_executor.prestartAllCoreThreads();
_executor.allowCoreThreadTimeOut(true);
for(int i = 0 ; i < _poolSize; i++)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
index fc4d72d..90aab2c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
@@ -27,7 +27,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -60,6 +60,7 @@
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.pool.QpidByteBufferDisposingThreadPoolExecutor;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.Xid;
import org.apache.qpid.transport.util.Functions;
@@ -99,7 +100,11 @@
private final Set<Queue<?>> _recoveringQueues = new CopyOnWriteArraySet<>();
private final AtomicBoolean _recoveryComplete = new AtomicBoolean();
private final Map<Long, MessageReference<? extends ServerMessage<?>>> _recoveredMessages = new HashMap<>();
- private final ListeningExecutorService _queueRecoveryExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+ private final ListeningExecutorService _queueRecoveryExecutor =
+ MoreExecutors.listeningDecorator(new QpidByteBufferDisposingThreadPoolExecutor(0, Integer.MAX_VALUE,
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>()));
+
private final MessageStore.MessageStoreReader _storeReader;
private AtomicBoolean _continueRecovery = new AtomicBoolean(true);