QPID-8305: [Broker-J][JDBC Message Store] Create the pre-commit action in the constructor of JBDCTransaction
This closes #30
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index 7c65499..b2359b7 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -604,6 +604,10 @@
private void enqueueMessages(ConnectionWrapper connWrapper, Map<Long, List<TransactionLogResource>> queuesPerMessage) throws StoreException
{
+ if (queuesPerMessage.isEmpty())
+ {
+ return;
+ }
Connection conn = connWrapper.getConnection();
String sql = String.format("INSERT INTO %s (queue_id, message_id) values (?,?)", getQueueEntryTableName());
@@ -1148,6 +1152,8 @@
{
throw new StoreException(e);
}
+
+ _preCommitActions.add(() -> AbstractJDBCMessageStore.this.enqueueMessages(_connWrapper, _messagesToEnqueue));
}
@Override
@@ -1170,13 +1176,6 @@
}
});
}
- if(_messagesToEnqueue.isEmpty())
- {
- _preCommitActions.add(() -> {
- AbstractJDBCMessageStore.this.enqueueMessages(_connWrapper, _messagesToEnqueue);
- _messagesToEnqueue.clear();
- });
- }
List<TransactionLogResource> queues = _messagesToEnqueue.computeIfAbsent(message.getMessageNumber(), messageId -> new ArrayList<>());
queues.add(queue);
return new JDBCEnqueueRecord(queue.getId(), message.getMessageNumber());
@@ -1220,6 +1219,7 @@
action.run();
}
_preCommitActions.clear();
+ _messagesToEnqueue.clear();
}
private void doPostCommitActions()