QPID-8303: [Broker-J][JDBC Message Store] Batch delete fails when deleting exactly 1000 messages
This closes #28
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index d7a391b..2dc992f 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -44,6 +44,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -472,13 +473,8 @@
{
try
{
- for (int i = 0; i <= messageIds.size() / IN_CLAUSE_MAX_SIZE; i++)
+ for (List<Long> boundMessageIds : Lists.partition(messageIds, IN_CLAUSE_MAX_SIZE))
{
- List<Long> boundMessageIds = messageIds.stream()
- .skip(i * IN_CLAUSE_MAX_SIZE)
- .limit(IN_CLAUSE_MAX_SIZE)
- .collect(Collectors.toList());
-
removeMessagesFromDatabase(conn, boundMessageIds);
}
}
diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index 84b93a2..4ec6e1b 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -36,6 +36,7 @@
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -183,6 +184,12 @@
GenericJDBCMessageStore store = spy((GenericJDBCMessageStore) getStore());
when(store.newConnection()).thenReturn(mock(Connection.class, Mockito.RETURNS_MOCKS));
+ store.removeMessages(LongStream.rangeClosed(1,1000).boxed().collect(Collectors.toList()));
+
+ verify(store).removeMessagesFromDatabase(any(Connection.class), any(List.class));
+
+ Mockito.reset(store);
+
store.removeMessages(LongStream.rangeClosed(1,2001).boxed().collect(Collectors.toList()));
verify(store).removeMessagesFromDatabase(any(Connection.class), eq(LongStream.rangeClosed(1,1000).boxed().collect(Collectors.toList())));