QPID-8387: [Broker-J] Await for asyncroronous message removal tasks to complete on message store close

(cherry picked from commit 7193e26f6fad83f5ce81fd9ef855edb0fd5594ea)
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index e1181b2..21c7d87 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -41,6 +41,7 @@
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -91,6 +92,8 @@
 
     private static final int EXECUTOR_THREADS_DEFAULT = Runtime.getRuntime().availableProcessors();
     private static final String EXECUTOR_THREADS = "qpid.jdbcstore.executorThreads";
+    private static final String EXECUTOR_SHUTDOWN_TIMEOUT = "qpid.jdbcstore.executorShutdownTimeoutInSeconds";
+    private static final int EXECUTOR_SHUTDOWN_TIMEOUT_DEFAULT = 5;
 
     private static final int DB_VERSION = 8;
 
@@ -116,6 +119,7 @@
     protected abstract void checkMessageStoreOpen();
     private ScheduledThreadPoolExecutor _executor;
     private volatile int _inClauseMaxSize;
+    private volatile int _executorShutdownTimeOut;
 
     public AbstractJDBCMessageStore()
     {
@@ -230,6 +234,8 @@
 
         int corePoolSize = getContextValue(Integer.class, EXECUTOR_THREADS, EXECUTOR_THREADS_DEFAULT);
 
+        _executorShutdownTimeOut = getContextValue(Integer.class, EXECUTOR_SHUTDOWN_TIMEOUT, EXECUTOR_SHUTDOWN_TIMEOUT_DEFAULT);
+
         _executor = new ScheduledThreadPoolExecutor(corePoolSize, new ThreadFactory()
         {
             private final AtomicInteger _count = new AtomicInteger();
@@ -259,6 +265,21 @@
         if(_executor != null)
         {
             _executor.shutdown();
+            if (_executorShutdownTimeOut > 0)
+            {
+                try
+                {
+                    if (!_executor.awaitTermination(_executorShutdownTimeOut, TimeUnit.SECONDS))
+                    {
+                        _executor.shutdownNow();
+                    }
+                }
+                catch (InterruptedException e)
+                {
+                    getLogger().warn("Interrupted during store executor shutdown:", e);
+                    Thread.currentThread().interrupt();
+                }
+            }
         }
 
     }