QPID-8303: [Broker-J][JDBC Message Store] Add test
diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index 4ec6e1b..d19c034 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -34,6 +34,7 @@
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -47,6 +48,7 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AMQMessageHeader;
@@ -59,7 +61,6 @@
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.jdbc.JDBCVirtualHost;
-import org.mockito.Mockito;
public class JDBCMessageStoreTest extends MessageStoreTestCase
{
@@ -197,6 +198,60 @@
verify(store).removeMessagesFromDatabase(any(Connection.class), eq(Collections.singletonList(2001L)));
}
+ @Test
+ public void testRemoveMessages1000()
+ {
+ final String queueName = getTestName();
+ final UUID transactionalLogId = UUID.randomUUID();
+ final TransactionLogResource resource = mockTransactionLogResource(transactionalLogId, queueName);
+ final int numberOfMessages = 1000;
+ final GenericJDBCMessageStore store = (GenericJDBCMessageStore) getStore();
+
+ final List<MessageEnqueueRecord> records = enqueueMessages(store, resource, numberOfMessages);
+ assertEquals(numberOfMessages, records.size());
+ assertRecords(store, resource, records);
+
+ store.removeMessages(records.stream().map(MessageEnqueueRecord::getMessageNumber).collect(Collectors.toList()));
+
+ final List<MessageEnqueueRecord> stored = new ArrayList<>();
+ store.newMessageStoreReader().visitMessageInstances(resource, (r) -> {
+ stored.add(r);
+ return true;
+ });
+
+ assertTrue(stored.isEmpty());
+ }
+
+ private List<MessageEnqueueRecord> enqueueMessages(final MessageStore store,
+ final TransactionLogResource resource,
+ final int numberOfMessages)
+ {
+ final Transaction transaction = store.newTransaction();
+ final String name = resource.getName();
+ final List<MessageEnqueueRecord> records = LongStream.rangeClosed(1, numberOfMessages)
+ .boxed()
+ .map(i -> {
+ final InternalMessage m =
+ addTestMessage(store, name, i + "");
+ return transaction.enqueueMessage(resource, m);
+ }).collect(Collectors.toList());
+ transaction.commitTran();
+ return records;
+ }
+
+ private void assertRecords(final MessageStore store,
+ final TransactionLogResource resource,
+ final List<MessageEnqueueRecord> records)
+ {
+ final List<MessageEnqueueRecord> visited = new ArrayList<>();
+ store.newMessageStoreReader().visitMessageInstances(resource, (r) -> {
+ visited.add(r);
+ return true;
+ });
+ assertEquals(records.stream().map(MessageEnqueueRecord::getMessageNumber).collect(Collectors.toSet()),
+ visited.stream().map(MessageEnqueueRecord::getMessageNumber).collect(Collectors.toSet()));
+ }
+
private InternalMessage addTestMessage(final MessageStore store,
final String transactionalLogName,
final String messageContent)