ARTEMIS-2544 Remove rolledback PageTransactionInfo to free up memory
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
index 1b92b90..4684b8b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
@@ -113,7 +113,8 @@
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.pageTxDeleteError(e, recordID);
}
-
+ }
+ if (pagingManager != null) {
pagingManager.removeTransaction(this.transactionID);
}
return false;
@@ -242,6 +243,7 @@
if (lateDeliveries != null) {
for (LateDelivery pos : lateDeliveries) {
pos.getSubscription().lateDeliveryRollback(pos.getPagePosition());
+ onUpdate(1, null, pos.getSubscription().getPagingStore().getPagingManager());
}
lateDeliveries = null;
}
@@ -283,6 +285,7 @@
logger.trace("rolled back, position ignored on " + cursor + ", position=" + cursorPos);
}
cursor.positionIgnored(cursorPos);
+ onUpdate(1, null, cursor.getPagingStore().getPagingManager());
return true;
} else {
if (logger.isTraceEnabled()) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index c020fbe..cda5311 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -70,6 +70,7 @@
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
@@ -6838,6 +6839,66 @@
}
}
+ @Test
+ public void testRollbackPageTransactionBeforeDelivery() throws Exception {
+ testRollbackPageTransaction(true);
+ }
+
+ @Test
+ public void testRollbackPageTransactionAfterDelivery() throws Exception {
+ testRollbackPageTransaction(false);
+ }
+
+ private void testRollbackPageTransaction(boolean rollbackBeforeDelivery) throws Exception {
+ clearDataRecreateServerDirs();
+
+ Configuration config = createDefaultInVMConfig();
+
+ server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
+
+ server.start();
+
+ final int numberOfMessages = 2;
+
+ locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+ sf = createSessionFactory(locator);
+ ClientSession session = sf.createSession(null, null, false, false, true, false, 0);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ Queue queue = server.locateQueue(PagingTest.ADDRESS);
+
+ queue.getPageSubscription().getPagingStore().startPaging();
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ if (rollbackBeforeDelivery) {
+ sendMessages(session, producer, numberOfMessages);
+ session.rollback();
+ assertEquals(server.getPagingManager().getTransactions().size(), 1);
+ PageTransactionInfo pageTransactionInfo = server.getPagingManager().getTransactions().values().iterator().next();
+ // Make sure rollback happens before delivering messages
+ Wait.assertTrue(() -> pageTransactionInfo.isRollback(), 1000, 100);
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+ session.start();
+ Assert.assertNull(consumer.receiveImmediate());
+ assertTrue(server.getPagingManager().getTransactions().isEmpty());
+ } else {
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+ session.start();
+ sendMessages(session, producer, numberOfMessages);
+ Assert.assertNull(consumer.receiveImmediate());
+ assertEquals(server.getPagingManager().getTransactions().size(), 1);
+ PageTransactionInfo pageTransactionInfo = server.getPagingManager().getTransactions().values().iterator().next();
+ session.rollback();
+ Wait.assertTrue(() -> pageTransactionInfo.isRollback(), 1000, 100);
+ assertTrue(server.getPagingManager().getTransactions().isEmpty());
+ }
+
+ session.close();
+ }
+
@Override
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
Configuration configuration = super.createDefaultConfig(serverID, netty);