QPID-8242 : JDBC store should remove message content/metadata asynchronously
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index 4ad0fe6..e92aa73 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -38,8 +38,11 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -83,6 +86,10 @@
private final AtomicLong _messageId = new AtomicLong(0);
+ private static final List<Long> EMPTY_LIST = Collections.emptyList();
+ private final AtomicReference<List<Long>> _messagesToDelete = new AtomicReference<>(EMPTY_LIST);
+ private final AtomicBoolean _messageRemovalScheduled = new AtomicBoolean();
+
protected final EventManager _eventManager = new EventManager();
private ConfiguredObject<?> _parent;
@@ -409,56 +416,99 @@
return _messageId.incrementAndGet();
}
- private void removeMessage(long messageId)
+ private void removeMessageAsync(long messageId)
{
- try(Connection conn = newConnection())
+ List<Long> orig;
+ List<Long> updated;
+ do
{
- try
- {
- try(PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + getMetaDataTableName()
- + " WHERE message_id = ?"))
+ orig = _messagesToDelete.get();
+ updated = new ArrayList<>(orig.size()+1);
+ updated.addAll(orig);
+ updated.add(messageId);
+ updated = Collections.unmodifiableList(updated);
+ } while (! _messagesToDelete.compareAndSet(orig, updated));
+ scheduleMessageRemoval();
+ }
+
+ private void scheduleMessageRemoval()
+ {
+ if(_messageRemovalScheduled.compareAndSet(false, true))
+ {
+ _executor.submit(() -> {
+ List<Long> messageIds;
+ do
{
- stmt.setLong(1, messageId);
- int results = stmt.executeUpdate();
- stmt.close();
+ messageIds = _messagesToDelete.getAndSet(EMPTY_LIST);
+ removeMessages(messageIds);
+ } while(!messageIds.isEmpty());
- if (results == 0)
- {
- getLogger().debug(
- "Message id {} not found (attempt to remove failed - probably application initiated rollback)",
-
- messageId);
- }
-
- getLogger().debug("Deleted metadata for message {}", messageId);
+ _messageRemovalScheduled.set(false);
+ if(!_messagesToDelete.get().isEmpty())
+ {
+ scheduleMessageRemoval();
}
- try(PreparedStatement stmt = conn.prepareStatement("DELETE FROM " + getMessageContentTableName()
- + " WHERE message_id = ?"))
- {
+ });
+ }
+ }
- stmt.setLong(1, messageId);
- int results = stmt.executeUpdate();
- }
- conn.commit();
- }
- catch(SQLException e)
+ boolean isMessageRemovalScheduled()
+ {
+ return _messageRemovalScheduled.get();
+ }
+
+ private void removeMessages(List<Long> messageIds)
+ {
+ if(messageIds != null && !messageIds.isEmpty())
+ {
+ String inpart = messageIds.stream().map(Object::toString).collect(Collectors.joining(", ", "(", ")"));
+ try(Connection conn = newConnection())
{
try
{
- conn.rollback();
- }
- catch(SQLException t)
- {
- // ignore - we are re-throwing underlying exception
- }
+ try(Statement stmt = conn.createStatement())
+ {
+ int results = stmt.executeUpdate("DELETE FROM " + getMetaDataTableName()
+ + " WHERE message_id IN " + inpart);
+ stmt.close();
- throw e;
+ if (results != messageIds.size())
+ {
+ getLogger().debug(
+ "Some message ids in {} not found (attempt to remove failed - probably application initiated rollback)",
+
+ messageIds);
+ }
+
+ getLogger().debug("Deleted metadata for messages {}", messageIds);
+ }
+
+ try(Statement stmt = conn.createStatement())
+ {
+ int results = stmt.executeUpdate("DELETE FROM " + getMessageContentTableName()
+ + " WHERE message_id IN " + inpart);
+ }
+ conn.commit();
+ }
+ catch(SQLException e)
+ {
+ try
+ {
+ conn.rollback();
+ }
+ catch(SQLException t)
+ {
+ // ignore - we are re-throwing underlying exception
+ }
+
+ throw e;
+ }
}
- }
- catch (SQLException e)
- {
- throw new StoreException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
+ catch (SQLException e)
+ {
+ throw new StoreException("Error removing messages with ids " + messageIds + " from database: " + e.getMessage(), e);
+ }
}
}
@@ -1510,7 +1560,7 @@
_messages.remove(this);
if(stored())
{
- AbstractJDBCMessageStore.this.removeMessage(_messageId);
+ AbstractJDBCMessageStore.this.removeMessageAsync(_messageId);
storedSizeChange(-getContentSize());
}
@@ -1652,7 +1702,7 @@
public void visitMessages(MessageHandler handler) throws StoreException
{
checkMessageStoreOpen();
-
+ while(isMessageRemovalScheduled());
try(Connection conn = newAutoCommitConnection())
{
try (Statement stmt = conn.createStatement())