QPID-8242 : JDBC store should remove message content/metadata asynchronously
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index 4ad0fe6..e92aa73 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -38,8 +38,11 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -83,6 +86,10 @@
 
     private final AtomicLong _messageId = new AtomicLong(0);
 
+    private static final List<Long> EMPTY_LIST = Collections.emptyList();
+    private final AtomicReference<List<Long>> _messagesToDelete = new AtomicReference<>(EMPTY_LIST);
+    private final AtomicBoolean _messageRemovalScheduled = new AtomicBoolean();
+
 
     protected final EventManager _eventManager = new EventManager();
     private ConfiguredObject<?> _parent;
@@ -409,56 +416,99 @@
         return _messageId.incrementAndGet();
     }
 
-    private void removeMessage(long messageId)
+    private void removeMessageAsync(long messageId)
     {
-        try(Connection conn = newConnection())
+        List<Long> orig;
+        List<Long> updated;
+        do
         {
-            try
-            {
-                try(PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + getMetaDataTableName()
-                                                                   + " WHERE message_id = ?"))
+            orig = _messagesToDelete.get();
+            updated = new ArrayList<>(orig.size()+1);
+            updated.addAll(orig);
+            updated.add(messageId);
+            updated = Collections.unmodifiableList(updated);
+        } while (! _messagesToDelete.compareAndSet(orig, updated));
+        scheduleMessageRemoval();
+    }
+
+    private void scheduleMessageRemoval()
+    {
+        if(_messageRemovalScheduled.compareAndSet(false, true))
+        {
+            _executor.submit(() -> {
+                List<Long> messageIds;
+                do
                 {
-                    stmt.setLong(1, messageId);
-                    int results = stmt.executeUpdate();
-                    stmt.close();
+                    messageIds = _messagesToDelete.getAndSet(EMPTY_LIST);
+                    removeMessages(messageIds);
+                } while(!messageIds.isEmpty());
 
-                    if (results == 0)
-                    {
-                        getLogger().debug(
-                                "Message id {} not found (attempt to remove failed - probably application initiated rollback)",
-
-                                messageId);
-                    }
-
-                    getLogger().debug("Deleted metadata for message {}", messageId);
+                _messageRemovalScheduled.set(false);
+                if(!_messagesToDelete.get().isEmpty())
+                {
+                    scheduleMessageRemoval();
                 }
 
-                try(PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + getMessageContentTableName()
-                + " WHERE message_id = ?"))
-                {
+            });
+        }
+    }
 
-                    stmt.setLong(1, messageId);
-                    int results = stmt.executeUpdate();
-                }
-                conn.commit();
-            }
-            catch(SQLException e)
+    boolean isMessageRemovalScheduled()
+    {
+        return _messageRemovalScheduled.get();
+    }
+
+    private void removeMessages(List<Long> messageIds)
+    {
+        if(messageIds != null && !messageIds.isEmpty())
+        {
+            String inpart = messageIds.stream().map(Object::toString).collect(Collectors.joining(", ", "(", ")"));
+            try(Connection conn = newConnection())
             {
                 try
                 {
-                    conn.rollback();
-                }
-                catch(SQLException t)
-                {
-                    // ignore - we are re-throwing underlying exception
-                }
+                    try(Statement stmt = conn.createStatement())
+                    {
+                        int results = stmt.executeUpdate("DELETE FROM " + getMetaDataTableName()
+                                     + " WHERE message_id IN " + inpart);
+                        stmt.close();
 
-                throw e;
+                        if (results != messageIds.size())
+                        {
+                            getLogger().debug(
+                                    "Some message ids in {} not found (attempt to remove failed - probably application initiated rollback)",
+
+                                    messageIds);
+                        }
+
+                        getLogger().debug("Deleted metadata for messages {}", messageIds);
+                    }
+
+                    try(Statement stmt = conn.createStatement())
+                    {
+                        int results = stmt.executeUpdate("DELETE FROM " + getMessageContentTableName()
+                                                         + " WHERE message_id IN " + inpart);
+                    }
+                    conn.commit();
+                }
+                catch(SQLException e)
+                {
+                    try
+                    {
+                        conn.rollback();
+                    }
+                    catch(SQLException t)
+                    {
+                        // ignore - we are re-throwing underlying exception
+                    }
+
+                    throw e;
+                }
             }
-        }
-        catch (SQLException e)
-        {
-            throw new StoreException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
+            catch (SQLException e)
+            {
+                throw new StoreException("Error removing messages with ids " + messageIds + " from database: " + e.getMessage(), e);
+            }
         }
 
     }
@@ -1510,7 +1560,7 @@
             _messages.remove(this);
             if(stored())
             {
-                AbstractJDBCMessageStore.this.removeMessage(_messageId);
+                AbstractJDBCMessageStore.this.removeMessageAsync(_messageId);
                 storedSizeChange(-getContentSize());
             }
 
@@ -1652,7 +1702,7 @@
         public void visitMessages(MessageHandler handler) throws StoreException
         {
             checkMessageStoreOpen();
-
+            while(isMessageRemovalScheduled());
             try(Connection conn = newAutoCommitConnection())
             {
                 try (Statement stmt = conn.createStatement())