QPID-8387: [Broker-J] Handle exceptions thrown on asynchromous message removal in JDBC-based message stores
(cherry picked from commit a597b9051f67287ab64cd7c1a966bfeab239088d)
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index df02e55..e1181b2 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -38,6 +38,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -108,6 +109,7 @@
private final Set<StoredJDBCMessage<?>> _messages = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<MessageDeleteListener> _messageDeleteListeners = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<Action<Connection>> _deleteActions = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private final Thread.UncaughtExceptionHandler _uncaughtExceptionHandler;
protected abstract boolean isMessageStoreOpen();
@@ -117,6 +119,7 @@
public AbstractJDBCMessageStore()
{
+ _uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler();
}
protected void setMaximumMessageId()
@@ -453,21 +456,67 @@
{
if(_messageRemovalScheduled.compareAndSet(false, true))
{
- _executor.submit(() -> {
- List<Long> messageIds;
- do
- {
- messageIds = _messagesToDelete.getAndSet(EMPTY_LIST);
- removeMessages(messageIds);
- } while(!messageIds.isEmpty());
-
+ try
+ {
+ _executor.submit(this::removeScheduledMessages);
+ }
+ catch (RejectedExecutionException e)
+ {
_messageRemovalScheduled.set(false);
- if(!_messagesToDelete.get().isEmpty())
- {
- scheduleMessageRemoval();
- }
+ throw new IllegalStateException("Cannot schedule removal of messages", e);
+ }
+ }
+ }
- });
+ private void removeScheduledMessages()
+ {
+ try
+ {
+ removeScheduledMessagesAndRescheduleIfRequired();
+ }
+ catch (RuntimeException e)
+ {
+ handleExceptionOnScheduledMessageRemoval(e);
+ }
+ }
+
+ private void removeScheduledMessagesAndRescheduleIfRequired()
+ {
+ List<Long> messageIds;
+ try
+ {
+ do
+ {
+ messageIds = _messagesToDelete.getAndSet(EMPTY_LIST);
+ removeMessages(messageIds);
+ } while (!messageIds.isEmpty());
+ }
+ finally
+ {
+ _messageRemovalScheduled.set(false);
+ }
+ if (!_messagesToDelete.get().isEmpty() && isMessageStoreOpen())
+ {
+ scheduleMessageRemoval();
+ }
+ }
+
+ private void handleExceptionOnScheduledMessageRemoval(final RuntimeException e)
+ {
+ if (isMessageStoreOpen())
+ {
+ if (_uncaughtExceptionHandler == null)
+ {
+ getLogger().error("Unexpected exception on asynchronous message removal", e);
+ }
+ else
+ {
+ _uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
+ }
+ }
+ else
+ {
+ getLogger().warn("Ignoring unexpected exception on asynchronous message removal as store is not open", e);
}
}