QPID-8387: [Broker-J] Handle exceptions thrown on asynchromous message removal in JDBC-based message stores

(cherry picked from commit a597b9051f67287ab64cd7c1a966bfeab239088d)
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index df02e55..e1181b2 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -38,6 +38,7 @@
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -108,6 +109,7 @@
     private final Set<StoredJDBCMessage<?>> _messages = Collections.newSetFromMap(new ConcurrentHashMap<>());
     private final Set<MessageDeleteListener> _messageDeleteListeners = Collections.newSetFromMap(new ConcurrentHashMap<>());
     private final Set<Action<Connection>> _deleteActions = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    private final Thread.UncaughtExceptionHandler _uncaughtExceptionHandler;
 
     protected abstract boolean isMessageStoreOpen();
 
@@ -117,6 +119,7 @@
 
     public AbstractJDBCMessageStore()
     {
+        _uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler();
     }
 
     protected void setMaximumMessageId()
@@ -453,21 +456,67 @@
     {
         if(_messageRemovalScheduled.compareAndSet(false, true))
         {
-            _executor.submit(() -> {
-                List<Long> messageIds;
-                do
-                {
-                    messageIds = _messagesToDelete.getAndSet(EMPTY_LIST);
-                    removeMessages(messageIds);
-                } while(!messageIds.isEmpty());
-
+            try
+            {
+                _executor.submit(this::removeScheduledMessages);
+            }
+            catch (RejectedExecutionException e)
+            {
                 _messageRemovalScheduled.set(false);
-                if(!_messagesToDelete.get().isEmpty())
-                {
-                    scheduleMessageRemoval();
-                }
+                throw new IllegalStateException("Cannot schedule removal of messages", e);
+            }
+        }
+    }
 
-            });
+    private void removeScheduledMessages()
+    {
+        try
+        {
+            removeScheduledMessagesAndRescheduleIfRequired();
+        }
+        catch (RuntimeException e)
+        {
+            handleExceptionOnScheduledMessageRemoval(e);
+        }
+    }
+
+    private void removeScheduledMessagesAndRescheduleIfRequired()
+    {
+        List<Long> messageIds;
+        try
+        {
+            do
+            {
+                messageIds = _messagesToDelete.getAndSet(EMPTY_LIST);
+                removeMessages(messageIds);
+            } while (!messageIds.isEmpty());
+        }
+        finally
+        {
+            _messageRemovalScheduled.set(false);
+        }
+        if (!_messagesToDelete.get().isEmpty() && isMessageStoreOpen())
+        {
+            scheduleMessageRemoval();
+        }
+    }
+
+    private void handleExceptionOnScheduledMessageRemoval(final RuntimeException e)
+    {
+        if (isMessageStoreOpen())
+        {
+            if (_uncaughtExceptionHandler == null)
+            {
+                getLogger().error("Unexpected exception on asynchronous message removal", e);
+            }
+            else
+            {
+                _uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
+            }
+        }
+        else
+        {
+            getLogger().warn("Ignoring unexpected exception on asynchronous message removal as store is not open", e);
         }
     }