| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.artemis.tests.integration.paging; |
| |
| import javax.jms.Connection; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageProducer; |
| import javax.jms.Session; |
| import javax.transaction.xa.XAResource; |
| import javax.transaction.xa.Xid; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.io.PrintWriter; |
| import java.lang.management.ManagementFactory; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.activemq.artemis.api.core.ActiveMQBuffer; |
| import org.apache.activemq.artemis.api.core.ActiveMQException; |
| import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; |
| import org.apache.activemq.artemis.api.core.Message; |
| import org.apache.activemq.artemis.api.core.Pair; |
| import org.apache.activemq.artemis.api.core.RoutingType; |
| import org.apache.activemq.artemis.api.core.SimpleString; |
| import org.apache.activemq.artemis.api.core.client.ClientConsumer; |
| import org.apache.activemq.artemis.api.core.client.ClientMessage; |
| import org.apache.activemq.artemis.api.core.client.ClientProducer; |
| import org.apache.activemq.artemis.api.core.client.ClientSession; |
| import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; |
| import org.apache.activemq.artemis.api.core.client.MessageHandler; |
| import org.apache.activemq.artemis.api.core.client.ServerLocator; |
| import org.apache.activemq.artemis.api.core.management.QueueControl; |
| import org.apache.activemq.artemis.api.core.management.ResourceNames; |
| import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; |
| import org.apache.activemq.artemis.core.config.Configuration; |
| import org.apache.activemq.artemis.core.config.DivertConfiguration; |
| import org.apache.activemq.artemis.core.config.StoreConfiguration; |
| import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; |
| import org.apache.activemq.artemis.core.filter.Filter; |
| import org.apache.activemq.artemis.core.io.IOCallback; |
| import org.apache.activemq.artemis.core.io.SequentialFileFactory; |
| import org.apache.activemq.artemis.core.journal.Journal; |
| import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; |
| import org.apache.activemq.artemis.core.journal.RecordInfo; |
| import org.apache.activemq.artemis.core.paging.PageTransactionInfo; |
| import org.apache.activemq.artemis.core.paging.PagingManager; |
| import org.apache.activemq.artemis.core.paging.PagingStore; |
| import org.apache.activemq.artemis.core.paging.PagingStoreFactory; |
| import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; |
| import org.apache.activemq.artemis.core.paging.cursor.PageIterator; |
| import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; |
| import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl; |
| import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl; |
| import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl; |
| import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryDatabase; |
| import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO; |
| import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl; |
| import org.apache.activemq.artemis.core.persistence.OperationContext; |
| import org.apache.activemq.artemis.core.persistence.StorageManager; |
| import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe; |
| import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal; |
| import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.ReferenceDescribe; |
| import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; |
| import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; |
| import org.apache.activemq.artemis.core.server.ActiveMQServer; |
| import org.apache.activemq.artemis.core.server.JournalType; |
| import org.apache.activemq.artemis.core.server.Queue; |
| import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; |
| import org.apache.activemq.artemis.core.server.impl.AddressInfo; |
| import org.apache.activemq.artemis.core.server.impl.QueueImpl; |
| import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; |
| import org.apache.activemq.artemis.core.settings.impl.AddressSettings; |
| import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; |
| import org.apache.activemq.artemis.logs.AssertionLoggerHandler; |
| import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl; |
| import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; |
| import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; |
| import org.apache.activemq.artemis.tests.util.Wait; |
| import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; |
| import org.jboss.logging.Logger; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Assume; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| |
| @RunWith(Parameterized.class) |
| public class PagingTest extends ActiveMQTestBase { |
| |
| private static final Logger logger = Logger.getLogger(PagingTest.class); |
| |
| protected ServerLocator locator; |
| protected ActiveMQServer server; |
| protected ClientSessionFactory sf; |
| static final int MESSAGE_SIZE = 1024; // 1k |
| static final int LARGE_MESSAGE_SIZE = 100 * 1024; |
| |
| protected static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; |
| |
| protected static final int RECEIVE_TIMEOUT = 5000; |
| |
| protected static final int PAGE_MAX = 100 * 1024; |
| |
| protected static final int PAGE_SIZE = 10 * 1024; |
| |
| protected final boolean mapped; |
| |
| protected final StoreConfiguration.StoreType storeType; |
| |
| static final SimpleString ADDRESS = new SimpleString("SimpleAddress"); |
| |
| public PagingTest(StoreConfiguration.StoreType storeType, boolean mapped) { |
| this.storeType = storeType; |
| this.mapped = mapped; |
| } |
| |
| @Parameterized.Parameters(name = "storeType={0}, mapped={1}") |
| public static Collection<Object[]> data() { |
| Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE, false}, {StoreConfiguration.StoreType.FILE, true}, {StoreConfiguration.StoreType.DATABASE, false}}; |
| return Arrays.asList(params); |
| } |
| |
| @Before |
| public void checkLoggerStart() throws Exception { |
| AssertionLoggerHandler.startCapture(); |
| } |
| |
| @After |
| public void checkLoggerEnd() throws Exception { |
| try { |
| // These are the message errors for the negative size address size |
| Assert.assertFalse(AssertionLoggerHandler.findText("222214")); |
| Assert.assertFalse(AssertionLoggerHandler.findText("222215")); |
| } finally { |
| AssertionLoggerHandler.stopCapture(); |
| } |
| } |
| |
| @Override |
| @Before |
| public void setUp() throws Exception { |
| super.setUp(); |
| locator = createInVMNonHALocator(); |
| } |
| |
| @Test |
| public void testTooLongPageStoreTableNamePrefix() throws Exception { |
| if (storeType == StoreConfiguration.StoreType.DATABASE) { |
| final Configuration config = createDefaultInVMConfig(); |
| final DatabaseStorageConfiguration storageConfiguration = (DatabaseStorageConfiguration) config.getStoreConfiguration(); |
| //set the page store table to be longer than 10 chars -> the paging manager initialization will fail |
| storageConfiguration.setPageStoreTableName("PAGE_STORE_"); |
| |
| final int PAGE_MAX = 20 * 1024; |
| |
| final int PAGE_SIZE = 10 * 1024; |
| |
| final ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX); |
| server.start(); |
| |
| //due to a failed initialisation of the paging manager, it must be null |
| Assert.assertNull(server.getPagingManager()); |
| |
| server.stop(); |
| } |
| } |
| |
| @Test |
| public void testPageOnLargeMessageMultipleQueues() throws Exception { |
| internaltestOnLargetMessageMultipleQueues(MESSAGE_SIZE, true); |
| } |
| |
| @Test |
| public void testPageOnLargeMessageMultipleQueuesNoPersistence() throws Exception { |
| internaltestOnLargetMessageMultipleQueues(LARGE_MESSAGE_SIZE, false); |
| } |
| |
| private void internaltestOnLargetMessageMultipleQueues(final int messageSize, final boolean enablePersistence) throws Exception, ActiveMQException { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig(); |
| |
| final int PAGE_MAX = 20 * 1024; |
| |
| final int PAGE_SIZE = 10 * 1024; |
| |
| ActiveMQServer server = createServer(enablePersistence, config, PAGE_SIZE, PAGE_MAX); |
| server.start(); |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); |
| |
| ClientSession session = sf.createSession(null, null, false, true, true, false, 0); |
| |
| session.createQueue(ADDRESS, ADDRESS.concat("-0"), null, true); |
| session.createQueue(ADDRESS, ADDRESS.concat("-1"), null, true); |
| |
| ClientProducer producer = session.createProducer(ADDRESS); |
| |
| ClientMessage message = null; |
| |
| for (int i = 0; i < 201; i++) { |
| message = session.createMessage(true); |
| |
| message.getBodyBuffer().writerIndex(0); |
| |
| message.getBodyBuffer().writeBytes(new byte[messageSize]); |
| |
| for (int j = 1; j <= messageSize; j++) { |
| message.getBodyBuffer().writeInt(j); |
| } |
| |
| producer.send(message); |
| } |
| |
| session.close(); |
| |
| for (int ad = 0; ad < 2; ad++) { |
| session = sf.createSession(false, false, false); |
| |
| ClientConsumer consumer = session.createConsumer(ADDRESS.concat("-" + ad)); |
| |
| session.start(); |
| |
| for (int i = 0; i < 201; i++) { |
| ClientMessage message2 = consumer.receive(10000); |
| |
| Assert.assertNotNull(message2); |
| |
| message2.acknowledge(); |
| |
| Assert.assertNotNull(message2); |
| } |
| |
| if (ad > -1) { |
| session.commit(); |
| } else { |
| session.rollback(); |
| for (int i = 0; i < 100; i++) { |
| ClientMessage message2 = consumer.receive(10000); |
| |
| Assert.assertNotNull(message2); |
| |
| message2.acknowledge(); |
| |
| Assert.assertNotNull(message2); |
| } |
| session.commit(); |
| |
| } |
| |
| consumer.close(); |
| |
| session.close(); |
| } |
| } |
| |
| @Test |
| public void testPageTX() throws Exception { |
| AssertionLoggerHandler.startCapture(); |
| |
| try { |
| Configuration config = createDefaultInVMConfig(); |
| |
| final int PAGE_MAX = 20 * 1024; |
| |
| final int PAGE_SIZE = 10 * 1024; |
| |
| ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX); |
| server.start(); |
| |
| final int numberOfBytes = 1024; |
| |
| locator.setBlockOnNonDurableSend(false).setBlockOnDurableSend(false).setBlockOnAcknowledge(false); |
| |
| ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); |
| |
| ClientSession session = sf.createSession(null, null, false, false, false, false, 0); |
| |
| session.createQueue(ADDRESS, ADDRESS.concat("-0"), null, true); |
| |
| server.getPagingManager().getPageStore(ADDRESS).forceAnotherPage(); |
| server.getPagingManager().getPageStore(ADDRESS).disableCleanup(); |
| session.start(); |
| |
| ClientProducer producer = session.createProducer(ADDRESS); |
| |
| ClientConsumer browserConsumer = session.createConsumer(ADDRESS.concat("-0"), true); |
| |
| ClientMessage message = null; |
| |
| for (int i = 0; i < 201; i++) { |
| message = session.createMessage(true); |
| |
| message.getBodyBuffer().writerIndex(0); |
| |
| message.getBodyBuffer().writeBytes(new byte[numberOfBytes]); |
| |
| for (int j = 1; j <= numberOfBytes; j++) { |
| message.getBodyBuffer().writeInt(j); |
| } |
| |
| producer.send(message); |
| session.commit(); |
| } |
| |
| ClientConsumer consumer = session.createConsumer(ADDRESS.concat("-0")); |
| |
| session.start(); |
| |
| for (int i = 0; i < 201; i++) { |
| ClientMessage message2 = consumer.receive(10000); |
| |
| Assert.assertNotNull(message2); |
| |
| message2.acknowledge(); |
| |
| Assert.assertNotNull(message2); |
| |
| session.commit(); |
| } |
| |
| consumer.close(); |
| |
| Queue queue = server.locateQueue(ADDRESS.concat("-0")); |
| |
| PagingStore store = server.getPagingManager().getPageStore(ADDRESS); |
| PageCursorProvider provider = store.getCursorProvider(); |
| |
| PageSubscription cursorSubscription = provider.getSubscription(queue.getID()); |
| PageIterator iterator = (PageIterator) cursorSubscription.iterator(); |
| |
| for (int i = 0; i < 5; i++) { |
| Assert.assertFalse(iterator.hasNext()); |
| Assert.assertNull(browserConsumer.receiveImmediate()); |
| } |
| |
| session.close(); |
| Assert.assertFalse(AssertionLoggerHandler.findText("Could not locate page")); |
| Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222029")); |
| server.getPagingManager().getPageStore(ADDRESS).enableCleanup(); |
| Wait.assertFalse(server.getPagingManager().getPageStore(ADDRESS)::isPaging); |
| } finally { |
| AssertionLoggerHandler.stopCapture(); |
| } |
| } |
| |
| @Test |
| public void testPageCleanup() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 5000; |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= MESSAGE_SIZE; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| producer.send(message); |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| producer.close(); |
| session.close(); |
| |
| session = sf.createSession(false, false, false); |
| producer = session.createProducer(PagingTest.ADDRESS); |
| producer.send(session.createMessage(true)); |
| session.rollback(); |
| producer.close(); |
| session.close(); |
| |
| session = sf.createSession(false, false, false); |
| producer = session.createProducer(PagingTest.ADDRESS); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| producer.send(message); |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| producer.close(); |
| session.close(); |
| //System.out.println("Just sent " + numberOfMessages + " messages."); |
| |
| Queue queue = server.locateQueue(PagingTest.ADDRESS); |
| |
| session = sf.createSession(false, false, false); |
| |
| session.start(); |
| |
| Wait.assertEquals(numberOfMessages * 2, queue::getMessageCount); |
| |
| // The consumer has to be created after the getMessageCount(queue) assertion |
| // otherwise delivery could alter the messagecount and give us a false failure |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); |
| ClientMessage msg = null; |
| |
| for (int i = 0; i < numberOfMessages * 2; i++) { |
| msg = consumer.receive(1000); |
| assertNotNull(msg); |
| msg.acknowledge(); |
| if (i % 500 == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| consumer.close(); |
| session.close(); |
| |
| sf.close(); |
| |
| locator.close(); |
| |
| Wait.assertEquals(0, queue::getMessageCount); |
| |
| waitForNotPaging(queue); |
| |
| server.stop(); |
| |
| HashMap<Integer, AtomicInteger> counts = countJournalLivingRecords(server.getConfiguration()); |
| |
| AtomicInteger pgComplete = counts.get((int) JournalRecordIds.PAGE_CURSOR_COMPLETE); |
| |
| assertTrue(pgComplete == null || pgComplete.get() == 0); |
| |
| System.out.println("pgComplete = " + pgComplete); |
| } |
| |
| @Test |
| public void testQueueRemoveAll() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 5000; |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= MESSAGE_SIZE; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| producer.send(message); |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| producer.close(); |
| session.close(); |
| |
| session = sf.createSession(false, false, false); |
| producer = session.createProducer(PagingTest.ADDRESS); |
| producer.send(session.createMessage(true)); |
| session.rollback(); |
| producer.close(); |
| session.close(); |
| |
| session = sf.createSession(false, false, false); |
| producer = session.createProducer(PagingTest.ADDRESS); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| producer.send(message); |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| producer.close(); |
| session.close(); |
| |
| Queue queue = server.locateQueue(PagingTest.ADDRESS); |
| |
| Wait.assertEquals(numberOfMessages * 2, queue::getMessageCount); |
| |
| QueueControl queueControl = (QueueControl) this.server.getManagementService().getResource(ResourceNames.QUEUE + PagingSendTest.ADDRESS); |
| int removedMessages = queueControl.removeAllMessages(); |
| |
| Assert.assertEquals(numberOfMessages * 2, removedMessages); |
| } |
| |
| @Test |
| public void testQueueRetryMessages() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 500; |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(PagingTest.ADDRESS, new SimpleString(PagingTest.ADDRESS + "Queue"), null, true); |
| session.createQueue(PagingTest.ADDRESS + "Original", PagingTest.ADDRESS + "QueueOriginal", null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= MESSAGE_SIZE; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| producer.send(message); |
| |
| message = session.createMessage(true); |
| message.getBodyBuffer().writeBytes(body); |
| message.putStringProperty(Message.HDR_ORIGINAL_ADDRESS, PagingTest.ADDRESS + "Original"); |
| message.putStringProperty(Message.HDR_ORIGINAL_QUEUE, PagingTest.ADDRESS + "QueueOriginal"); |
| producer.send(message); |
| |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| producer.close(); |
| session.close(); |
| |
| session = sf.createSession(false, false, false); |
| producer = session.createProducer(PagingTest.ADDRESS); |
| producer.send(session.createMessage(true)); |
| session.rollback(); |
| producer.close(); |
| session.close(); |
| |
| session = sf.createSession(false, false, false); |
| producer = session.createProducer(PagingTest.ADDRESS); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| producer.send(message); |
| |
| message = session.createMessage(true); |
| message.getBodyBuffer().writeBytes(body); |
| message.putStringProperty(Message.HDR_ORIGINAL_ADDRESS, PagingTest.ADDRESS + "Original"); |
| message.putStringProperty(Message.HDR_ORIGINAL_QUEUE, PagingTest.ADDRESS + "QueueOriginal"); |
| producer.send(message); |
| |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| producer.close(); |
| session.close(); |
| |
| Queue queue = server.locateQueue(new SimpleString(PagingTest.ADDRESS + "Queue")); |
| Queue originalQueue = server.locateQueue(new SimpleString(PagingTest.ADDRESS + "QueueOriginal")); |
| |
| Wait.assertEquals(numberOfMessages * 4, queue::getMessageCount); |
| Wait.assertEquals(0, originalQueue::getMessageCount); |
| |
| QueueControl queueControl = (QueueControl) this.server.getManagementService().getResource(ResourceNames.QUEUE + PagingSendTest.ADDRESS + "Queue"); |
| QueueControl originalQueueControl = (QueueControl) this.server.getManagementService().getResource(ResourceNames.QUEUE + PagingSendTest.ADDRESS + "QueueOriginal"); |
| queueControl.retryMessages(); |
| |
| Wait.assertEquals(numberOfMessages * 2, queue::getMessageCount, 5000); |
| Wait.assertEquals(numberOfMessages * 2, originalQueue::getMessageCount, 5000); |
| } |
| |
| @Test |
| public void testEmptyAddress() throws Exception { |
| if (storeType == StoreConfiguration.StoreType.FILE) { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 5000; |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(PagingTest.ADDRESS, RoutingType.ANYCAST, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= MESSAGE_SIZE; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| ClientMessage message = session.createMessage(true); |
| |
| message.getBodyBuffer().writeBytes(body); |
| |
| producer.send(message); |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| producer.close(); |
| session.close(); |
| |
| String addressTxt = server.getPagingManager().getPageStore(PagingTest.ADDRESS).getFolder().getAbsolutePath() + File.separator + PagingStoreFactoryNIO.ADDRESS_FILE; |
| |
| server.stop(); |
| |
| // delete contents of address.txt |
| new PrintWriter(addressTxt).close(); |
| |
| final AtomicBoolean activationFailures = new AtomicBoolean(); |
| |
| server.registerActivationFailureListener(exception -> activationFailures.set(true)); |
| |
| server.start(); |
| |
| server.stop(); |
| |
| assertFalse(activationFailures.get()); |
| } |
| } |
| |
| @Test |
| public void testPurge() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultNettyConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| String queue = "purgeQueue"; |
| SimpleString ssQueue = new SimpleString(queue); |
| server.addAddressInfo(new AddressInfo(ssQueue, RoutingType.ANYCAST)); |
| QueueImpl purgeQueue = (QueueImpl) server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false); |
| |
| ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); |
| Connection connection = cf.createConnection(); |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| javax.jms.Queue jmsQueue = session.createQueue(queue); |
| |
| MessageProducer producer = session.createProducer(jmsQueue); |
| |
| for (int i = 0; i < 100; i++) { |
| producer.send(session.createTextMessage("hello" + i)); |
| } |
| session.commit(); |
| |
| Wait.assertEquals(0, purgeQueue::getMessageCount); |
| |
| Wait.assertEquals(0, purgeQueue.getPageSubscription().getPagingStore()::getAddressSize); |
| |
| MessageConsumer consumer = session.createConsumer(jmsQueue); |
| |
| for (int i = 0; i < 100; i++) { |
| producer.send(session.createTextMessage("hello" + i)); |
| if (i == 10) { |
| purgeQueue.getPageSubscription().getPagingStore().startPaging(); |
| } |
| } |
| session.commit(); |
| |
| consumer.close(); |
| |
| Wait.assertEquals(0, purgeQueue::getMessageCount); |
| |
| Wait.assertFalse(purgeQueue.getPageSubscription()::isPaging); |
| |
| Wait.assertEquals(0, purgeQueue.getPageSubscription().getPagingStore()::getAddressSize); |
| consumer = session.createConsumer(jmsQueue); |
| |
| for (int i = 0; i < 100; i++) { |
| purgeQueue.getPageSubscription().getPagingStore().startPaging(); |
| Assert.assertTrue(purgeQueue.getPageSubscription().isPaging()); |
| producer.send(session.createTextMessage("hello" + i)); |
| if (i % 2 == 0) { |
| session.commit(); |
| } |
| } |
| |
| session.commit(); |
| |
| Wait.assertTrue(purgeQueue.getPageSubscription()::isPaging); |
| |
| connection.start(); |
| |
| server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(50000); |
| Assert.assertNotNull(consumer.receive(5000)); |
| session.commit(); |
| |
| consumer.close(); |
| |
| Wait.assertEquals(0, purgeQueue::getMessageCount); |
| Wait.assertEquals(0, purgeQueue.getPageSubscription().getPagingStore()::getAddressSize); |
| Wait.assertFalse(purgeQueue.getPageSubscription()::isPaging); |
| |
| StorageManager sm = server.getStorageManager(); |
| |
| for (int i = 0; i < 1000; i++) { |
| long tx = sm.generateID(); |
| PageTransactionInfoImpl txinfo = new PageTransactionInfoImpl(tx); |
| sm.storePageTransaction(tx, txinfo); |
| sm.commit(tx); |
| tx = sm.generateID(); |
| sm.updatePageTransaction(tx, txinfo, 1); |
| sm.commit(tx); |
| } |
| |
| server.stop(); |
| server.start(); |
| Wait.assertEquals(0, ()->server.getPagingManager().getTransactions().size()); |
| } |
| |
| // First page is complete but it wasn't deleted |
| @Test |
| public void testFirstPageCompleteNotDeleted() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 20; |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, true, true); |
| server.addAddressInfo(new AddressInfo(PagingTest.ADDRESS, RoutingType.ANYCAST)); |
| Queue queue = server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false); |
| |
| queue.getPageSubscription().getPagingStore().startPaging(); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= MESSAGE_SIZE; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| message.putIntProperty("count", i); |
| |
| producer.send(message); |
| |
| if ((i + 1) % 5 == 0) { |
| session.commit(); |
| queue.getPageSubscription().getPagingStore().forceAnotherPage(); |
| } |
| } |
| |
| session.commit(); |
| producer.close(); |
| session.close(); |
| |
| // This will make the cursor to set the page complete and not actually delete it |
| queue.getPageSubscription().getPagingStore().disableCleanup(); |
| |
| session = sf.createSession(false, false, false); |
| |
| ClientConsumer consumer = session.createConsumer(ADDRESS); |
| session.start(); |
| |
| for (int i = 0; i < 5; i++) { |
| ClientMessage msg = consumer.receive(2000); |
| assertNotNull(msg); |
| assertEquals(i, msg.getIntProperty("count").intValue()); |
| |
| msg.individualAcknowledge(); |
| |
| System.out.println(msg); |
| } |
| |
| session.commit(); |
| |
| session.close(); |
| |
| server.stop(); |
| |
| server.start(); |
| |
| sf = createSessionFactory(locator); |
| |
| session = sf.createSession(false, false, false); |
| |
| consumer = session.createConsumer(ADDRESS); |
| session.start(); |
| |
| for (int i = 5; i < numberOfMessages; i++) { |
| ClientMessage msg = consumer.receive(2000); |
| assertNotNull(msg); |
| assertEquals(i, msg.getIntProperty("count").intValue()); |
| msg.acknowledge(); |
| System.out.println(msg); |
| } |
| |
| assertNull(consumer.receiveImmediate()); |
| session.commit(); |
| |
| session.close(); |
| sf.close(); |
| locator.close(); |
| |
| } |
| |
| @Test |
| public void testPreparedACKAndRestart() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 50; |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setAckBatchSize(0); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, true, true); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| Queue queue = server.locateQueue(PagingTest.ADDRESS); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= MESSAGE_SIZE; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| queue.getPageSubscription().getPagingStore().startPaging(); |
| |
| forcePage(queue); |
| |
| // Send many messages, 5 on each page |
| for (int i = 0; i < numberOfMessages; i++) { |
| ClientMessage message = session.createMessage(true); |
| |
| message.putIntProperty("count", i); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| producer.send(message); |
| |
| if ((i + 1) % 5 == 0) { |
| System.out.println("Forcing at " + i); |
| session.commit(); |
| queue.getPageSubscription().getPagingStore().forceAnotherPage(); |
| } |
| } |
| |
| session.close(); |
| |
| session = sf.createSession(true, false, false); |
| |
| Xid xidConsumeNoCommit = newXID(); |
| session.start(xidConsumeNoCommit, XAResource.TMNOFLAGS); |
| |
| ClientConsumer cons = session.createConsumer(ADDRESS); |
| |
| session.start(); |
| |
| // First message is consumed, prepared, will be rolled back later |
| ClientMessage firstMessageConsumed = cons.receive(5000); |
| assertNotNull(firstMessageConsumed); |
| firstMessageConsumed.acknowledge(); |
| |
| session.end(xidConsumeNoCommit, XAResource.TMSUCCESS); |
| |
| session.prepare(xidConsumeNoCommit); |
| |
| Xid xidConsumeCommit = newXID(); |
| session.start(xidConsumeCommit, XAResource.TMNOFLAGS); |
| |
| Xid neverCommittedXID = newXID(); |
| |
| for (int i = 1; i < numberOfMessages; i++) { |
| if (i == 20) { |
| // I elected a single message to be in prepared state, it won't ever be committed |
| session.end(xidConsumeCommit, XAResource.TMSUCCESS); |
| session.commit(xidConsumeCommit, true); |
| session.start(neverCommittedXID, XAResource.TMNOFLAGS); |
| } |
| ClientMessage message = cons.receive(5000); |
| assertNotNull(message); |
| System.out.println("ACK " + i); |
| message.acknowledge(); |
| assertEquals(i, message.getIntProperty("count").intValue()); |
| if (i == 20) { |
| session.end(neverCommittedXID, XAResource.TMSUCCESS); |
| session.prepare(neverCommittedXID); |
| xidConsumeCommit = newXID(); |
| session.start(xidConsumeCommit, XAResource.TMNOFLAGS); |
| } |
| } |
| |
| session.end(xidConsumeCommit, XAResource.TMSUCCESS); |
| |
| session.commit(xidConsumeCommit, true); |
| |
| session.close(); |
| sf.close(); |
| |
| // Restart the server, and we expect cleanup to not destroy any page with prepared data |
| server.stop(); |
| |
| server.start(); |
| |
| sf = createSessionFactory(locator); |
| |
| session = sf.createSession(false, true, true); |
| |
| queue = server.locateQueue(ADDRESS); |
| |
| assertTrue(queue.getPageSubscription().getPagingStore().isPaging()); |
| |
| producer = session.createProducer(ADDRESS); |
| |
| for (int i = numberOfMessages; i < numberOfMessages * 2; i++) { |
| ClientMessage message = session.createMessage(true); |
| |
| message.putIntProperty("count", i); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| producer.send(message); |
| |
| if ((i + 1) % 5 == 0) { |
| session.commit(); |
| queue.getPageSubscription().getPagingStore().forceAnotherPage(); |
| } |
| } |
| |
| cons = session.createConsumer(ADDRESS); |
| |
| session.start(); |
| |
| for (int i = numberOfMessages; i < numberOfMessages * 2; i++) { |
| ClientMessage message = cons.receive(5000); |
| assertNotNull(message); |
| assertEquals(i, message.getIntProperty("count").intValue()); |
| message.acknowledge(); |
| } |
| assertNull(cons.receiveImmediate()); |
| session.commit(); |
| |
| System.out.println("count = " + getMessageCount(queue)); |
| |
| session.commit(); |
| |
| session.close(); |
| |
| session = sf.createSession(true, false, false); |
| |
| session.rollback(xidConsumeNoCommit); |
| |
| session.start(); |
| |
| xidConsumeCommit = newXID(); |
| |
| session.start(xidConsumeCommit, XAResource.TMNOFLAGS); |
| cons = session.createConsumer(ADDRESS); |
| |
| session.start(); |
| |
| ClientMessage message = cons.receive(5000); |
| assertNotNull(message); |
| message.acknowledge(); |
| |
| session.end(xidConsumeCommit, XAResource.TMSUCCESS); |
| |
| session.commit(xidConsumeCommit, true); |
| |
| session.close(); |
| } |
| |
| /** |
| * @param queue |
| * @throws InterruptedException |
| */ |
| private void forcePage(Queue queue) throws InterruptedException { |
| for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis() && !queue.getPageSubscription().getPagingStore().isPaging(); ) { |
| Thread.sleep(10); |
| } |
| assertTrue(queue.getPageSubscription().getPagingStore().isPaging()); |
| } |
| |
| @Test |
| public void testMoveExpire() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalDirectory(getJournalDir()).setJournalSyncNonTransactional(false).setJournalCompactMinFiles(0) // disable compact |
| .setMessageExpiryScanPeriod(500); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(PAGE_SIZE).setMaxSizeBytes(PAGE_MAX).setExpiryAddress(new SimpleString("EXP")).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); |
| |
| server.getAddressSettingsRepository().clear(); |
| |
| server.getAddressSettingsRepository().addMatch("#", defaultSetting); |
| |
| server.start(); |
| |
| final int numberOfMessages = 5000; |
| |
| locator = createInVMNonHALocator().setConsumerWindowSize(10 * 1024 * 1024).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| ClientSessionFactory sf = locator.createSessionFactory(); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| session.createQueue("EXP", "EXP", null, true); |
| |
| Queue queue1 = server.locateQueue(ADDRESS); |
| Queue qEXP = server.locateQueue(new SimpleString("EXP")); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| final int MESSAGE_SIZE = 1024; |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= MESSAGE_SIZE; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| ClientMessage message = session.createMessage(true); |
| |
| if (i < 1000) { |
| message.setExpiration(System.currentTimeMillis() + 100); |
| } |
| |
| message.putIntProperty("tst-count", i); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| producer.send(message); |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| producer.close(); |
| |
| Wait.assertEquals(1000, qEXP::getMessageCount); |
| |
| session.start(); |
| |
| ClientConsumer consumer = session.createConsumer(ADDRESS); |
| |
| for (int i = 0; i < numberOfMessages - 1000; i++) { |
| ClientMessage message = consumer.receive(5000); |
| assertNotNull(message); |
| message.acknowledge(); |
| assertTrue(message.getIntProperty("tst-count") >= 1000); |
| } |
| |
| session.commit(); |
| |
| assertNull(consumer.receiveImmediate()); |
| |
| Wait.assertEquals(0, queue1::getMessageCount); |
| |
| consumer.close(); |
| |
| consumer = session.createConsumer("EXP"); |
| |
| for (int i = 0; i < 1000; i++) { |
| ClientMessage message = consumer.receive(5000); |
| assertNotNull(message); |
| message.acknowledge(); |
| assertTrue(message.getIntProperty("tst-count") < 1000); |
| } |
| |
| assertNull(consumer.receiveImmediate()); |
| |
| // This is just to hold some messages as being delivered |
| ClientConsumerInternal cons = (ClientConsumerInternal) session.createConsumer(ADDRESS); |
| |
| session.commit(); |
| producer.close(); |
| session.close(); |
| |
| server.stop(); |
| } |
| |
| @Test |
| public void testDeleteQueueRestart() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalDirectory(getJournalDir()).setJournalSyncNonTransactional(false).setJournalCompactMinFiles(0); // disable compact |
| |
| ActiveMQServer server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 5000; |
| |
| locator = createInVMNonHALocator().setConsumerWindowSize(10 * 1024 * 1024).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| SimpleString QUEUE2 = ADDRESS.concat("-2"); |
| |
| ClientSessionFactory sf = locator.createSessionFactory(); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| session.createQueue(PagingTest.ADDRESS, QUEUE2, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| // This is just to hold some messages as being delivered |
| ClientConsumerInternal cons = (ClientConsumerInternal) session.createConsumer(ADDRESS); |
| ClientConsumerInternal cons2 = (ClientConsumerInternal) session.createConsumer(QUEUE2); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= MESSAGE_SIZE; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| producer.send(message); |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| producer.close(); |
| session.start(); |
| |
| long timeout = System.currentTimeMillis() + 30000; |
| |
| // I want the buffer full to make sure there are pending messages on the server's side |
| while (System.currentTimeMillis() < timeout && (cons.getBufferSize() < 1000 || cons2.getBufferSize() < 1000)) { |
| System.out.println("cons1 buffer = " + cons.getBufferSize() + ", cons2 buffer = " + cons2.getBufferSize()); |
| Thread.sleep(100); |
| } |
| |
| assertTrue(cons.getBufferSize() >= 1000); |
| assertTrue(cons2.getBufferSize() >= 1000); |
| |
| session.close(); |
| |
| Queue queue = server.locateQueue(QUEUE2); |
| |
| long deletedQueueID = queue.getID(); |
| |
| server.destroyQueue(QUEUE2); |
| |
| sf.close(); |
| locator.close(); |
| locator = null; |
| sf = null; |
| |
| server.stop(); |
| |
| final HashMap<Integer, AtomicInteger> recordsType = countJournal(config); |
| |
| for (Map.Entry<Integer, AtomicInteger> entry : recordsType.entrySet()) { |
| System.out.println(entry.getKey() + "=" + entry.getValue()); |
| } |
| |
| assertNull("The system is acking page records instead of just delete data", recordsType.get(new Integer(JournalRecordIds.ACKNOWLEDGE_CURSOR))); |
| |
| Pair<List<RecordInfo>, List<PreparedTransactionInfo>> journalData = loadMessageJournal(config); |
| |
| HashSet<Long> deletedQueueReferences = new HashSet<>(); |
| |
| for (RecordInfo info : journalData.getA()) { |
| if (info.getUserRecordType() == JournalRecordIds.ADD_REF) { |
| DescribeJournal.ReferenceDescribe ref = (ReferenceDescribe) DescribeJournal.newObjectEncoding(info); |
| |
| if (ref.refEncoding.queueID == deletedQueueID) { |
| deletedQueueReferences.add(new Long(info.id)); |
| } |
| } else if (info.getUserRecordType() == JournalRecordIds.ACKNOWLEDGE_REF) { |
| AckDescribe ref = (AckDescribe) DescribeJournal.newObjectEncoding(info); |
| |
| if (ref.refEncoding.queueID == deletedQueueID) { |
| deletedQueueReferences.remove(new Long(info.id)); |
| } |
| } |
| } |
| |
| if (!deletedQueueReferences.isEmpty()) { |
| for (Long value : deletedQueueReferences) { |
| System.out.println("Deleted Queue still has a reference:" + value); |
| } |
| |
| fail("Deleted queue still have references"); |
| } |
| |
| server.start(); |
| |
| locator = createInVMNonHALocator(); |
| locator.setConsumerWindowSize(10 * 1024 * 1024); |
| sf = locator.createSessionFactory(); |
| session = sf.createSession(false, false, false); |
| cons = (ClientConsumerInternal) session.createConsumer(ADDRESS); |
| session.start(); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = cons.receive(5000); |
| assertNotNull(message); |
| message.acknowledge(); |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| producer.close(); |
| session.close(); |
| |
| queue = server.locateQueue(PagingTest.ADDRESS); |
| |
| Wait.assertEquals(0, queue::getMessageCount); |
| |
| timeout = System.currentTimeMillis() + 10000; |
| while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging()) { |
| Thread.sleep(100); |
| } |
| assertFalse(queue.getPageSubscription().getPagingStore().isPaging()); |
| |
| server.stop(); |
| } |
| |
| @Test |
| public void testPreparePersistent() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 500; |
| |
| final int numberOfTX = 10; |
| |
| final int messagesPerTX = numberOfMessages / numberOfTX; |
| |
| locator = createInVMNonHALocator(); |
| |
| locator.setBlockOnNonDurableSend(false).setBlockOnDurableSend(false).setBlockOnAcknowledge(false); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= MESSAGE_SIZE; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| message.putIntProperty(new SimpleString("id"), i); |
| |
| producer.send(message); |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| session.close(); |
| session = null; |
| |
| sf.close(); |
| locator.close(); |
| |
| server.stop(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| server.start(); |
| |
| locator = createInVMNonHALocator(); |
| sf = createSessionFactory(locator); |
| |
| Queue queue = server.locateQueue(ADDRESS); |
| |
| Wait.assertEquals(numberOfMessages, queue::getMessageCount); |
| |
| LinkedList<Xid> xids = new LinkedList<>(); |
| |
| int msgReceived = 0; |
| for (int i = 0; i < numberOfTX; i++) { |
| ClientSession sessionConsumer = sf.createSession(true, false, false); |
| Xid xid = newXID(); |
| xids.add(xid); |
| sessionConsumer.start(xid, XAResource.TMNOFLAGS); |
| sessionConsumer.start(); |
| ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS); |
| for (int msgCount = 0; msgCount < messagesPerTX; msgCount++) { |
| if (msgReceived == numberOfMessages) { |
| break; |
| } |
| msgReceived++; |
| ClientMessage msg = consumer.receive(10000); |
| assertNotNull(msg); |
| msg.acknowledge(); |
| } |
| sessionConsumer.end(xid, XAResource.TMSUCCESS); |
| sessionConsumer.prepare(xid); |
| sessionConsumer.close(); |
| } |
| |
| ClientSession sessionCheck = sf.createSession(true, true); |
| |
| ClientConsumer consumer = sessionCheck.createConsumer(PagingTest.ADDRESS); |
| |
| assertNull(consumer.receiveImmediate()); |
| |
| sessionCheck.close(); |
| |
| Wait.assertEquals(numberOfMessages, queue::getMessageCount); |
| |
| sf.close(); |
| locator.close(); |
| |
| server.stop(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| server.start(); |
| |
| waitForServerToStart(server); |
| |
| queue = server.locateQueue(ADDRESS); |
| |
| locator = createInVMNonHALocator(); |
| sf = createSessionFactory(locator); |
| |
| session = sf.createSession(true, false, false); |
| |
| consumer = session.createConsumer(PagingTest.ADDRESS); |
| |
| session.start(); |
| |
| Wait.assertEquals(numberOfMessages, queue::getMessageCount); |
| |
| ClientMessage msg = consumer.receive(5000); |
| if (msg != null) { |
| while (true) { |
| ClientMessage msg2 = consumer.receive(1000); |
| if (msg2 == null) { |
| break; |
| } |
| } |
| } |
| assertNull(msg); |
| |
| for (int i = xids.size() - 1; i >= 0; i--) { |
| Xid xid = xids.get(i); |
| session.rollback(xid); |
| } |
| |
| xids.clear(); |
| |
| session.close(); |
| |
| session = sf.createSession(false, false, false); |
| |
| session.start(); |
| |
| consumer = session.createConsumer(PagingTest.ADDRESS); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| msg = consumer.receive(1000); |
| assertNotNull(msg); |
| msg.acknowledge(); |
| |
| assertEquals(i, msg.getIntProperty("id").intValue()); |
| |
| if (i % 500 == 0) { |
| session.commit(); |
| } |
| } |
| |
| session.commit(); |
| |
| session.close(); |
| |
| sf.close(); |
| |
| locator.close(); |
| |
| Wait.assertEquals(0, queue::getMessageCount); |
| |
| waitForNotPaging(queue); |
| } |
| |
| @Test |
| public void testSendOverBlockingNoFlowControl() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| server.getAddressSettingsRepository().getMatch("#").setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); |
| |
| server.start(); |
| |
| final int biggerMessageSize = 10 * 1024; |
| |
| final int numberOfMessages = 500; |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setProducerWindowSize(-1).setMinLargeMessageSize(1024 * 1024); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[biggerMessageSize]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= biggerMessageSize; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| message.putIntProperty(new SimpleString("id"), i); |
| |
| producer.send(message); |
| |
| if (i % 10 == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| |
| session.start(); |
| |
| ClientConsumer cons = session.createConsumer(ADDRESS); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = cons.receive(5000); |
| assertNotNull(message); |
| message.acknowledge(); |
| |
| if (i % 10 == 0) { |
| session.commit(); |
| } |
| } |
| |
| session.commit(); |
| |
| } |
| |
| @Test |
| public void testReceiveImmediate() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 1000; |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= MESSAGE_SIZE; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| message.putIntProperty(new SimpleString("id"), i); |
| |
| producer.send(message); |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| session.close(); |
| |
| session = null; |
| |
| sf.close(); |
| locator.close(); |
| |
| server.stop(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| server.start(); |
| |
| locator = createInVMNonHALocator(); |
| sf = createSessionFactory(locator); |
| |
| Queue queue = server.locateQueue(ADDRESS); |
| |
| Wait.assertEquals(numberOfMessages, queue::getMessageCount); |
| |
| int msgReceived = 0; |
| ClientSession sessionConsumer = sf.createSession(false, false, false); |
| sessionConsumer.start(); |
| ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS); |
| for (int msgCount = 0; msgCount < numberOfMessages; msgCount++) { |
| log.info("Received " + msgCount); |
| msgReceived++; |
| ClientMessage msg = consumer.receiveImmediate(); |
| if (msg == null) { |
| log.info("It's null. leaving now"); |
| sessionConsumer.commit(); |
| fail("Didn't receive a message"); |
| } |
| msg.acknowledge(); |
| |
| if (msgCount % 5 == 0) { |
| log.info("commit"); |
| sessionConsumer.commit(); |
| } |
| } |
| |
| sessionConsumer.commit(); |
| |
| sessionConsumer.close(); |
| |
| sf.close(); |
| |
| locator.close(); |
| |
| Wait.assertEquals(0, queue::getMessageCount); |
| |
| long timeout = System.currentTimeMillis() + 5000; |
| while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging()) { |
| Thread.sleep(100); |
| } |
| assertFalse(queue.getPageSubscription().getPagingStore().isPaging()); |
| |
| } |
| |
| @Test |
| public void testInabilityToCreateDirectoryDuringPaging() throws Exception { |
| // this test only applies to file-based stores |
| Assume.assumeTrue(storeType == StoreConfiguration.StoreType.FILE); |
| |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setPagingDirectory("/" + UUID.randomUUID().toString()); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 100; |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, true, true); |
| |
| session.createQueue(PagingTest.ADDRESS, RoutingType.MULTICAST, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= MESSAGE_SIZE; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| message.putIntProperty(new SimpleString("id"), i); |
| |
| try { |
| producer.send(message); |
| } catch (Exception e) { |
| // ignore |
| } |
| } |
| assertTrue(Wait.waitFor(() -> server.getState() == ActiveMQServer.SERVER_STATE.STOPPED, 5000, 200)); |
| session.close(); |
| sf.close(); |
| locator.close(); |
| } |
| |
| /** |
| * This test will remove all the page directories during a restart, simulating a crash scenario. The server should still start after this |
| */ |
| @Test |
| public void testDeletePhysicalPages() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setPersistDeliveryCountBeforeDelivery(true); |
| |
| config.setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 1000; |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= MESSAGE_SIZE; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| message.putIntProperty(new SimpleString("id"), i); |
| |
| producer.send(message); |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| session.close(); |
| |
| session = null; |
| |
| sf.close(); |
| locator.close(); |
| |
| server.stop(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| server.start(); |
| |
| locator = createInVMNonHALocator(); |
| sf = createSessionFactory(locator); |
| |
| Queue queue = server.locateQueue(ADDRESS); |
| |
| Wait.assertEquals(numberOfMessages, queue::getMessageCount); |
| |
| int msgReceived = 0; |
| ClientSession sessionConsumer = sf.createSession(false, false, false); |
| sessionConsumer.start(); |
| ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS); |
| for (int msgCount = 0; msgCount < numberOfMessages; msgCount++) { |
| log.info("Received " + msgCount); |
| msgReceived++; |
| ClientMessage msg = consumer.receiveImmediate(); |
| if (msg == null) { |
| log.info("It's null. leaving now"); |
| sessionConsumer.commit(); |
| fail("Didn't receive a message"); |
| } |
| msg.acknowledge(); |
| |
| if (msgCount % 5 == 0) { |
| log.info("commit"); |
| sessionConsumer.commit(); |
| } |
| } |
| |
| sessionConsumer.commit(); |
| |
| sessionConsumer.close(); |
| |
| sf.close(); |
| |
| locator.close(); |
| |
| Wait.assertEquals(0, queue::getMessageCount); |
| |
| long timeout = System.currentTimeMillis() + 5000; |
| while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging()) { |
| Thread.sleep(100); |
| } |
| assertFalse(queue.getPageSubscription().getPagingStore().isPaging()); |
| |
| server.stop(); |
| |
| // Deleting the paging data. Simulating a failure |
| // a dumb user, or anything that will remove the data |
| deleteDirectory(new File(getPageDir())); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| server.start(); |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| queue = server.locateQueue(ADDRESS); |
| |
| sf = createSessionFactory(locator); |
| session = sf.createSession(false, false, false); |
| |
| producer = session.createProducer(PagingTest.ADDRESS); |
| |
| for (int i = 0; i < numberOfMessages * 2; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| message.putIntProperty(new SimpleString("theid"), i); |
| |
| producer.send(message); |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| |
| session.commit(); |
| |
| server.stop(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| server.start(); |
| |
| locator = createInVMNonHALocator(); |
| sf = createSessionFactory(locator); |
| |
| msgReceived = 0; |
| sessionConsumer = sf.createSession(false, false, false); |
| sessionConsumer.start(); |
| consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS); |
| for (int msgCount = 0; msgCount < numberOfMessages; msgCount++) { |
| log.info("Received " + msgCount); |
| msgReceived++; |
| ClientMessage msg = consumer.receive(5000); |
| if (msg == null) { |
| log.info("It's null. leaving now"); |
| sessionConsumer.commit(); |
| fail("Didn't receive a message"); |
| } |
| System.out.println("Message " + msg.getIntProperty(SimpleString.toSimpleString("theid"))); |
| msg.acknowledge(); |
| |
| if (msgCount % 5 == 0) { |
| log.info("commit"); |
| sessionConsumer.commit(); |
| } |
| } |
| |
| sessionConsumer.commit(); |
| |
| sessionConsumer.close(); |
| |
| } |
| |
| // 4 messages are send/received, it creates 2 pages, where for second page there is no delete completion record in journal |
| // server is restarted and 4 messages sent/received again. There should be no lost message. |
| @Test |
| public void testRestartWithCompleteAndDeletedPhysicalPage() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig(); |
| |
| final AtomicBoolean mainCleanup = new AtomicBoolean(true); |
| |
| class InterruptedCursorProvider extends PageCursorProviderImpl { |
| |
| InterruptedCursorProvider(PagingStore pagingStore, |
| StorageManager storageManager, |
| ArtemisExecutor executor, |
| int maxCacheSize) { |
| super(pagingStore, storageManager, executor, maxCacheSize); |
| } |
| |
| @Override |
| public void cleanup() { |
| if (mainCleanup.get()) { |
| super.cleanup(); |
| } else { |
| try { |
| pagingStore.unlock(); |
| } catch (Throwable ignored) { |
| } |
| } |
| } |
| } |
| |
| server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) { |
| @Override |
| protected PagingStoreFactoryNIO getPagingStoreFactory() { |
| return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) { |
| @Override |
| public PageCursorProvider newCursorProvider(PagingStore store, |
| StorageManager storageManager, |
| AddressSettings addressSettings, |
| ArtemisExecutor executor) { |
| return new InterruptedCursorProvider(store, storageManager, executor, addressSettings.getPageCacheMaxSize()); |
| } |
| }; |
| } |
| |
| }; |
| |
| addServer(server); |
| |
| AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(MESSAGE_SIZE). |
| setMaxSizeBytes(2 * MESSAGE_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); |
| |
| server.getAddressSettingsRepository().addMatch("#", defaultSetting); |
| |
| server.start(); |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| ClientSession session = sf.createSession(true, true, 0); |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| Queue queue = server.locateQueue(ADDRESS); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message; |
| |
| for (int i = 0; i < 4; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(new byte[MESSAGE_SIZE]); |
| |
| producer.send(message); |
| session.commit(); |
| |
| //last page (#2, whch contains only message #3) is marked as complete - is full - but no delete complete record is added |
| if (i == 3) { |
| queue.getPageSubscription().getPagingStore().forceAnotherPage(); |
| } |
| |
| } |
| |
| Wait.assertEquals(3, queue.getPageSubscription().getPagingStore()::getCurrentWritingPage); |
| |
| ClientConsumer consumer = session.createConsumer(ADDRESS); |
| session.start(); |
| |
| for (int i = 0; i < 4; i++) { |
| message = consumer.receive(5000); |
| Assert.assertNotNull("Before restart - message " + i + " is empty.", message); |
| message.acknowledge(); |
| } |
| |
| server.stop(); |
| mainCleanup.set(false); |
| |
| // Deleting the paging data. Simulating a failure |
| // a dumb user, or anything that will remove the data |
| deleteDirectory(new File(getPageDir())); |
| |
| logger.trace("Server restart"); |
| |
| server.start(); |
| |
| locator = createInVMNonHALocator(); |
| sf = createSessionFactory(locator); |
| session = sf.createSession(null, null, false, false, true, false, 0); |
| producer = session.createProducer(PagingTest.ADDRESS); |
| |
| for (int i = 0; i < 4; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(new byte[MESSAGE_SIZE]); |
| |
| producer.send(message); |
| } |
| session.commit(); |
| |
| mainCleanup.set(true); |
| |
| queue = server.locateQueue(ADDRESS); |
| queue.getPageSubscription().cleanupEntries(false); |
| queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup(); |
| |
| consumer = session.createConsumer(ADDRESS); |
| session.start(); |
| |
| for (int i = 0; i < 4; i++) { |
| message = consumer.receive(5000); |
| Assert.assertNotNull("After restart - message " + i + " is empty.", message); |
| message.acknowledge(); |
| } |
| |
| server.stop(); |
| |
| } |
| |
| @Test |
| public void testMissingTXEverythingAcked() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 5000; |
| |
| final int numberOfTX = 10; |
| |
| final int messagesPerTX = numberOfMessages / numberOfTX; |
| |
| try { |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(ADDRESS.toString(), "q1", true); |
| |
| session.createQueue(ADDRESS.toString(), "q2", true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= MESSAGE_SIZE; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| message.putIntProperty(new SimpleString("id"), i); |
| |
| producer.send(message); |
| if (i % messagesPerTX == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| session.close(); |
| |
| ArrayList<RecordInfo> records = new ArrayList<>(); |
| |
| List<PreparedTransactionInfo> list = new ArrayList<>(); |
| |
| server.getStorageManager().getMessageJournal().stop(); |
| |
| Journal jrn = server.getStorageManager().getMessageJournal(); |
| jrn.start(); |
| jrn.load(records, list, null); |
| |
| // Delete everything from the journal |
| for (RecordInfo info : records) { |
| if (!info.isUpdate && info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE && info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COUNTER_INC && info.getUserRecordType() != JournalRecordIds.PAGE_CURSOR_COMPLETE) { |
| jrn.appendDeleteRecord(info.id, false); |
| } |
| } |
| |
| jrn.stop(); |
| |
| } finally { |
| try { |
| server.stop(); |
| } catch (Throwable ignored) { |
| } |
| } |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| ClientSessionFactory csf = createSessionFactory(locator); |
| |
| ClientSession sess = csf.createSession(); |
| |
| sess.start(); |
| |
| ClientConsumer cons = sess.createConsumer("q1"); |
| |
| assertNull(cons.receiveImmediate()); |
| |
| ClientConsumer cons2 = sess.createConsumer("q2"); |
| assertNull(cons2.receiveImmediate()); |
| |
| Queue q1 = server.locateQueue(new SimpleString("q1")); |
| Queue q2 = server.locateQueue(new SimpleString("q2")); |
| |
| q1.getPageSubscription().cleanupEntries(false); |
| q2.getPageSubscription().cleanupEntries(false); |
| |
| PageCursorProvider provider = q1.getPageSubscription().getPagingStore().getCursorProvider(); |
| provider.cleanup(); |
| |
| waitForNotPaging(q1); |
| |
| sess.close(); |
| } |
| |
| @Test |
| public void testMissingTXEverythingAcked2() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 6; |
| |
| final int numberOfTX = 2; |
| |
| final int messagesPerTX = numberOfMessages / numberOfTX; |
| |
| try { |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(ADDRESS.toString(), "q1", true); |
| |
| session.createQueue(ADDRESS.toString(), "q2", true); |
| |
| server.getPagingManager().getPageStore(ADDRESS).startPaging(); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= MESSAGE_SIZE; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| message.putStringProperty("id", "str-" + i); |
| |
| producer.send(message); |
| if ((i + 1) % messagesPerTX == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| |
| session.start(); |
| |
| for (int i = 1; i <= 2; i++) { |
| ClientConsumer cons = session.createConsumer("q" + i); |
| |
| for (int j = 0; j < 3; j++) { |
| ClientMessage msg = cons.receive(5000); |
| |
| assertNotNull(msg); |
| |
| assertEquals("str-" + j, msg.getStringProperty("id")); |
| |
| msg.acknowledge(); |
| } |
| |
| session.commit(); |
| |
| } |
| |
| session.close(); |
| } finally { |
| locator.close(); |
| try { |
| server.stop(); |
| } catch (Throwable ignored) { |
| } |
| } |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| locator = createInVMNonHALocator(); |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| ClientSessionFactory csf = createSessionFactory(locator); |
| |
| ClientSession session = csf.createSession(); |
| |
| session.start(); |
| |
| for (int i = 1; i <= 2; i++) { |
| ClientConsumer cons = session.createConsumer("q" + i); |
| |
| for (int j = 3; j < 6; j++) { |
| ClientMessage msg = cons.receive(5000); |
| |
| assertNotNull(msg); |
| |
| assertEquals("str-" + j, msg.getStringProperty("id")); |
| |
| msg.acknowledge(); |
| } |
| |
| session.commit(); |
| assertNull(cons.receive(500)); |
| |
| } |
| |
| session.close(); |
| |
| long timeout = System.currentTimeMillis() + 5000; |
| |
| while (System.currentTimeMillis() < timeout && server.getPagingManager().getPageStore(ADDRESS).isPaging()) { |
| Thread.sleep(100); |
| } |
| } |
| |
| @Test |
| public void testTwoQueuesOneNoRouting() throws Exception { |
| boolean persistentMessages = true; |
| |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 1000; |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("-invalid"), new SimpleString(Filter.GENERIC_IGNORED_FILTER), true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(persistentMessages); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| message.putIntProperty(new SimpleString("id"), i); |
| |
| producer.send(message); |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| |
| session.commit(); |
| |
| session.start(); |
| |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = consumer.receive(5000); |
| assertNotNull(message); |
| message.acknowledge(); |
| |
| assertEquals(i, message.getIntProperty("id").intValue()); |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| |
| session.commit(); |
| |
| session.commit(); |
| |
| session.commit(); |
| |
| PagingStore store = server.getPagingManager().getPageStore(ADDRESS); |
| store.getCursorProvider().cleanup(); |
| |
| long timeout = System.currentTimeMillis() + 5000; |
| while (store.isPaging() && timeout > System.currentTimeMillis()) { |
| Thread.sleep(100); |
| } |
| |
| // It's async, so need to wait a bit for it happening |
| assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging()); |
| } |
| |
| @Test |
| public void testSendReceivePagingPersistent() throws Exception { |
| internaltestSendReceivePaging(true); |
| } |
| |
| @Test |
| public void testSendReceivePagingNonPersistent() throws Exception { |
| internaltestSendReceivePaging(false); |
| } |
| |
| @Test |
| public void testWithDiverts() throws Exception { |
| internalMultiQueuesTest(true); |
| } |
| |
| @Test |
| public void testWithMultiQueues() throws Exception { |
| internalMultiQueuesTest(false); |
| } |
| |
| public void internalMultiQueuesTest(final boolean divert) throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| if (divert) { |
| DivertConfiguration divert1 = new DivertConfiguration().setName("dv1").setRoutingName("nm1").setAddress(PagingTest.ADDRESS.toString()).setForwardingAddress(PagingTest.ADDRESS.toString() + "-1").setExclusive(true); |
| |
| config.addDivertConfiguration(divert1); |
| |
| DivertConfiguration divert2 = new DivertConfiguration().setName("dv2").setRoutingName("nm2").setAddress(PagingTest.ADDRESS.toString()).setForwardingAddress(PagingTest.ADDRESS.toString() + "-2").setExclusive(true); |
| |
| config.addDivertConfiguration(divert2); |
| } |
| |
| server.start(); |
| |
| final int numberOfMessages = 3000; |
| |
| final byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= MESSAGE_SIZE; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| final AtomicBoolean running = new AtomicBoolean(true); |
| |
| class TCount extends Thread { |
| |
| Queue queue; |
| |
| TCount(Queue queue) { |
| this.queue = queue; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| while (running.get()) { |
| // this will be overusing what some users do. flush / getCount |
| getMessagesAdded(queue); |
| getMessageCount(queue); |
| Thread.sleep(10); |
| } |
| } catch (InterruptedException e) { |
| log.info("Thread interrupted"); |
| } |
| } |
| } |
| |
| TCount tcount1 = null; |
| TCount tcount2 = null; |
| |
| try { |
| { |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| if (divert) { |
| session.createQueue(PagingTest.ADDRESS + "-1", PagingTest.ADDRESS + "-1", null, true); |
| |
| session.createQueue(PagingTest.ADDRESS + "-2", PagingTest.ADDRESS + "-2", null, true); |
| } else { |
| session.createQueue(PagingTest.ADDRESS.toString(), PagingTest.ADDRESS + "-1", null, true); |
| |
| session.createQueue(PagingTest.ADDRESS.toString(), PagingTest.ADDRESS + "-2", null, true); |
| } |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| if (i % 500 == 0) { |
| log.info("Sent " + i + " messages"); |
| session.commit(); |
| } |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| message.putIntProperty(new SimpleString("id"), i); |
| |
| producer.send(message); |
| } |
| |
| session.commit(); |
| |
| session.close(); |
| |
| server.stop(); |
| |
| sf.close(); |
| locator.close(); |
| } |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| server.start(); |
| |
| Queue queue1 = server.locateQueue(PagingTest.ADDRESS.concat("-1")); |
| |
| Queue queue2 = server.locateQueue(PagingTest.ADDRESS.concat("-2")); |
| |
| assertNotNull(queue1); |
| |
| assertNotNull(queue2); |
| |
| assertNotSame(queue1, queue2); |
| |
| tcount1 = new TCount(queue1); |
| |
| tcount2 = new TCount(queue2); |
| |
| tcount1.start(); |
| tcount2.start(); |
| |
| locator = createInVMNonHALocator(); |
| final ClientSessionFactory sf2 = createSessionFactory(locator); |
| |
| final AtomicInteger errors = new AtomicInteger(0); |
| |
| Thread[] threads = new Thread[2]; |
| |
| for (int start = 1; start <= 2; start++) { |
| |
| final String addressToSubscribe = PagingTest.ADDRESS + "-" + start; |
| |
| threads[start - 1] = new Thread() { |
| @Override |
| public void run() { |
| try { |
| ClientSession session = sf2.createSession(null, null, false, true, true, false, 0); |
| |
| ClientConsumer consumer = session.createConsumer(addressToSubscribe); |
| |
| session.start(); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| ClientMessage message2 = consumer.receive(PagingTest.RECEIVE_TIMEOUT); |
| |
| Assert.assertNotNull(message2); |
| |
| Assert.assertEquals(i, message2.getIntProperty("id").intValue()); |
| |
| message2.acknowledge(); |
| |
| Assert.assertNotNull(message2); |
| |
| if (i % 100 == 0) { |
| if (i % 5000 == 0) { |
| log.info(addressToSubscribe + " consumed " + i + " messages"); |
| } |
| session.commit(); |
| } |
| |
| try { |
| assertBodiesEqual(body, message2.getBodyBuffer()); |
| } catch (AssertionError e) { |
| PagingTest.log.info("Expected buffer:" + ActiveMQTestBase.dumpBytesHex(body, 40)); |
| PagingTest.log.info("Arriving buffer:" + ActiveMQTestBase.dumpBytesHex(message2.getBodyBuffer().toByteBuffer().array(), 40)); |
| throw e; |
| } |
| } |
| |
| session.commit(); |
| |
| consumer.close(); |
| |
| session.close(); |
| } catch (Throwable e) { |
| e.printStackTrace(); |
| errors.incrementAndGet(); |
| } |
| |
| } |
| }; |
| } |
| |
| for (int i = 0; i < 2; i++) { |
| threads[i].start(); |
| } |
| |
| for (int i = 0; i < 2; i++) { |
| threads[i].join(); |
| } |
| |
| sf2.close(); |
| locator.close(); |
| |
| assertEquals(0, errors.get()); |
| |
| for (int i = 0; i < 20 && server.getPagingManager().getTransactions().size() != 0; i++) { |
| if (server.getPagingManager().getTransactions().size() != 0) { |
| // The delete may be asynchronous, giving some time case it eventually happen asynchronously |
| Thread.sleep(500); |
| } |
| } |
| |
| Wait.assertEquals(0, ()->server.getPagingManager().getTransactions().size()); |
| |
| } finally { |
| running.set(false); |
| |
| if (tcount1 != null) { |
| tcount1.interrupt(); |
| tcount1.join(); |
| } |
| |
| if (tcount2 != null) { |
| tcount2.interrupt(); |
| tcount2.join(); |
| } |
| |
| try { |
| server.stop(); |
| } catch (Throwable ignored) { |
| } |
| } |
| |
| } |
| |
| @Test |
| public void testMultiQueuesNonPersistentAndPersistent() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 3000; |
| |
| final byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= MESSAGE_SIZE; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| { |
| locator = createInVMNonHALocator(); |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(PagingTest.ADDRESS.toString(), PagingTest.ADDRESS + "-1", null, true); |
| |
| session.createQueue(PagingTest.ADDRESS.toString(), PagingTest.ADDRESS + "-2", null, false); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| if (i % 500 == 0) { |
| session.commit(); |
| } |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| message.putIntProperty(new SimpleString("id"), i); |
| |
| producer.send(message); |
| } |
| |
| session.commit(); |
| |
| session.close(); |
| |
| server.stop(); |
| |
| sf.close(); |
| locator.close(); |
| } |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| server.start(); |
| |
| ServerLocator locator1 = createInVMNonHALocator(); |
| final ClientSessionFactory sf2 = locator1.createSessionFactory(); |
| |
| final AtomicInteger errors = new AtomicInteger(0); |
| |
| Thread t = new Thread() { |
| @Override |
| public void run() { |
| try { |
| ClientSession session = sf2.createSession(null, null, false, true, true, false, 0); |
| |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS + "-1"); |
| |
| session.start(); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| ClientMessage message2 = consumer.receive(PagingTest.RECEIVE_TIMEOUT); |
| |
| Assert.assertNotNull(message2); |
| |
| Assert.assertEquals(i, message2.getIntProperty("id").intValue()); |
| |
| message2.acknowledge(); |
| |
| Assert.assertNotNull(message2); |
| |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| |
| try { |
| assertBodiesEqual(body, message2.getBodyBuffer()); |
| } catch (AssertionError e) { |
| PagingTest.log.info("Expected buffer:" + ActiveMQTestBase.dumpBytesHex(body, 40)); |
| PagingTest.log.info("Arriving buffer:" + ActiveMQTestBase.dumpBytesHex(message2.getBodyBuffer().toByteBuffer().array(), 40)); |
| throw e; |
| } |
| } |
| |
| session.commit(); |
| |
| consumer.close(); |
| |
| session.close(); |
| } catch (Throwable e) { |
| e.printStackTrace(); |
| errors.incrementAndGet(); |
| } |
| |
| } |
| }; |
| |
| t.start(); |
| t.join(); |
| |
| assertEquals(0, errors.get()); |
| |
| for (int i = 0; i < 20 && server.getPagingManager().getPageStore(ADDRESS).isPaging(); i++) { |
| // The delete may be asynchronous, giving some time case it eventually happen asynchronously |
| Thread.sleep(500); |
| } |
| |
| assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging()); |
| |
| for (int i = 0; i < 20 && server.getPagingManager().getTransactions().size() != 0; i++) { |
| // The delete may be asynchronous, giving some time case it eventually happen asynchronously |
| Thread.sleep(500); |
| } |
| |
| Wait.assertEquals(0, ()->server.getPagingManager().getTransactions().size()); |
| |
| } |
| |
| private void internaltestSendReceivePaging(final boolean persistentMessages) throws Exception { |
| |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfIntegers = 256; |
| |
| final int numberOfMessages = 1000; |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(null, null, false, true, true, false, 0); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| Queue queue = server.locateQueue(ADDRESS); |
| queue.getPageSubscription().getPagingStore().startPaging(); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[numberOfIntegers * 4]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= numberOfIntegers; j++) { |
| bb.putInt(j); |
| } |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(persistentMessages); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| message.putIntProperty(new SimpleString("id"), i); |
| |
| producer.send(message); |
| } |
| |
| session.close(); |
| sf.close(); |
| locator.close(); |
| |
| server.stop(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| server.start(); |
| |
| locator = createInVMNonHALocator(); |
| sf = createSessionFactory(locator); |
| |
| session = sf.createSession(null, null, false, true, true, false, 0); |
| |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); |
| |
| session.start(); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| ClientMessage message2 = consumer.receive(PagingTest.RECEIVE_TIMEOUT); |
| |
| Assert.assertNotNull(message2); |
| |
| Assert.assertEquals(i, message2.getIntProperty("id").intValue()); |
| |
| assertEquals(body.length, message2.getBodySize()); |
| |
| message2.acknowledge(); |
| |
| Assert.assertNotNull(message2); |
| |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| |
| try { |
| assertBodiesEqual(body, message2.getBodyBuffer()); |
| } catch (AssertionError e) { |
| PagingTest.log.info("Expected buffer:" + ActiveMQTestBase.dumpBytesHex(body, 40)); |
| PagingTest.log.info("Arriving buffer:" + ActiveMQTestBase.dumpBytesHex(message2.getBodyBuffer().toByteBuffer().array(), 40)); |
| throw e; |
| } |
| } |
| |
| consumer.close(); |
| |
| session.close(); |
| } |
| |
| private void assertBodiesEqual(final byte[] body, final ActiveMQBuffer buffer) { |
| byte[] other = new byte[body.length]; |
| |
| buffer.readBytes(other); |
| |
| ActiveMQTestBase.assertEqualsByteArrays(body, other); |
| } |
| |
| /** |
| * - Make a destination in page mode |
| * - Add stuff to a transaction |
| * - Consume the entire destination (not in page mode any more) |
| * - Add stuff to a transaction again |
| * - Check order |
| */ |
| @Test |
| public void testDepageDuringTransaction() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(null, null, false, true, true, false, 0); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| // ActiveMQBuffer bodyLocal = ActiveMQChannelBuffers.buffer(DataConstants.SIZE_INT * numberOfIntegers); |
| |
| ClientMessage message = null; |
| |
| int numberOfMessages = 0; |
| while (true) { |
| message = session.createMessage(true); |
| message.getBodyBuffer().writeBytes(body); |
| |
| // Stop sending message as soon as we start paging |
| if (server.getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging()) { |
| break; |
| } |
| numberOfMessages++; |
| |
| producer.send(message); |
| } |
| |
| Assert.assertTrue(server.getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging()); |
| |
| session.start(); |
| |
| ClientSession sessionTransacted = sf.createSession(null, null, false, false, false, false, 0); |
| |
| ClientProducer producerTransacted = sessionTransacted.createProducer(PagingTest.ADDRESS); |
| |
| for (int i = 0; i < 10; i++) { |
| message = session.createMessage(true); |
| message.getBodyBuffer().writeBytes(body); |
| message.putIntProperty(new SimpleString("id"), i); |
| |
| // Consume messages to force an eventual out of order delivery |
| if (i == 5) { |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); |
| for (int j = 0; j < numberOfMessages; j++) { |
| ClientMessage msg = consumer.receive(PagingTest.RECEIVE_TIMEOUT); |
| msg.acknowledge(); |
| Assert.assertNotNull(msg); |
| } |
| |
| Assert.assertNull(consumer.receiveImmediate()); |
| consumer.close(); |
| } |
| |
| Integer messageID = (Integer) message.getObjectProperty(new SimpleString("id")); |
| Assert.assertNotNull(messageID); |
| Assert.assertEquals(messageID.intValue(), i); |
| |
| producerTransacted.send(message); |
| } |
| |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); |
| |
| Assert.assertNull(consumer.receiveImmediate()); |
| |
| sessionTransacted.commit(); |
| |
| sessionTransacted.close(); |
| |
| for (int i = 0; i < 10; i++) { |
| message = consumer.receive(PagingTest.RECEIVE_TIMEOUT); |
| |
| Assert.assertNotNull(message); |
| |
| Integer messageID = (Integer) message.getObjectProperty(new SimpleString("id")); |
| |
| Assert.assertNotNull(messageID); |
| Assert.assertEquals("message received out of order", messageID.intValue(), i); |
| |
| message.acknowledge(); |
| } |
| |
| Assert.assertNull(consumer.receiveImmediate()); |
| |
| consumer.close(); |
| |
| session.close(); |
| } |
| |
| /** |
| * - Make a destination in page mode |
| * - Add stuff to a transaction |
| * - Consume the entire destination (not in page mode any more) |
| * - Add stuff to a transaction again |
| * - Check order |
| * <br> |
| * Test under discussion at : http://community.jboss.org/thread/154061?tstart=0 |
| */ |
| @Test |
| public void testDepageDuringTransaction2() throws Exception { |
| boolean IS_DURABLE_MESSAGE = true; |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ClientSession sessionTransacted = sf.createSession(null, null, false, false, false, false, 0); |
| ClientProducer producerTransacted = sessionTransacted.createProducer(PagingTest.ADDRESS); |
| |
| ClientSession session = sf.createSession(null, null, false, true, true, false, 0); |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientMessage firstMessage = sessionTransacted.createMessage(IS_DURABLE_MESSAGE); |
| firstMessage.getBodyBuffer().writeBytes(body); |
| firstMessage.putIntProperty(new SimpleString("id"), 0); |
| |
| producerTransacted.send(firstMessage); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| int numberOfMessages = 0; |
| while (true) { |
| System.out.println("Sending message " + numberOfMessages); |
| message = session.createMessage(IS_DURABLE_MESSAGE); |
| message.getBodyBuffer().writeBytes(body); |
| message.putIntProperty("id", numberOfMessages); |
| message.putBooleanProperty("new", false); |
| |
| // Stop sending message as soon as we start paging |
| if (server.getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging()) { |
| break; |
| } |
| numberOfMessages++; |
| |
| producer.send(message); |
| } |
| |
| Assert.assertTrue(server.getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging()); |
| |
| session.start(); |
| |
| for (int i = 1; i < 10; i++) { |
| message = session.createMessage(true); |
| message.getBodyBuffer().writeBytes(body); |
| message.putIntProperty(new SimpleString("id"), i); |
| |
| // Consume messages to force an eventual out of order delivery |
| if (i == 5) { |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); |
| for (int j = 0; j < numberOfMessages; j++) { |
| ClientMessage msg = consumer.receive(PagingTest.RECEIVE_TIMEOUT); |
| msg.acknowledge(); |
| assertEquals(j, msg.getIntProperty("id").intValue()); |
| assertFalse(msg.getBooleanProperty("new")); |
| Assert.assertNotNull(msg); |
| } |
| |
| ClientMessage msgReceived = consumer.receiveImmediate(); |
| |
| Assert.assertNull(msgReceived); |
| consumer.close(); |
| } |
| |
| Integer messageID = (Integer) message.getObjectProperty(new SimpleString("id")); |
| Assert.assertNotNull(messageID); |
| Assert.assertEquals(messageID.intValue(), i); |
| |
| producerTransacted.send(message); |
| } |
| |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); |
| |
| Assert.assertNull(consumer.receiveImmediate()); |
| |
| sessionTransacted.commit(); |
| |
| sessionTransacted.close(); |
| |
| for (int i = 0; i < 10; i++) { |
| message = consumer.receive(PagingTest.RECEIVE_TIMEOUT); |
| |
| Assert.assertNotNull(message); |
| |
| Integer messageID = (Integer) message.getObjectProperty(new SimpleString("id")); |
| |
| // System.out.println(messageID); |
| Assert.assertNotNull(messageID); |
| Assert.assertEquals("message received out of order", i, messageID.intValue()); |
| |
| message.acknowledge(); |
| } |
| |
| Assert.assertNull(consumer.receiveImmediate()); |
| |
| consumer.close(); |
| |
| session.close(); |
| |
| } |
| |
| @Test |
| public void testDepageDuringTransaction3() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ClientSession sessionTransacted = sf.createSession(null, null, false, false, false, false, 0); |
| ClientProducer producerTransacted = sessionTransacted.createProducer(PagingTest.ADDRESS); |
| |
| ClientSession sessionNonTX = sf.createSession(true, true, 0); |
| sessionNonTX.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producerNonTransacted = sessionNonTX.createProducer(PagingTest.ADDRESS); |
| |
| sessionNonTX.start(); |
| |
| for (int i = 0; i < 50; i++) { |
| ClientMessage message = sessionNonTX.createMessage(true); |
| message.getBodyBuffer().writeBytes(body); |
| message.putIntProperty(new SimpleString("id"), i); |
| message.putStringProperty(new SimpleString("tst"), new SimpleString("i=" + i)); |
| |
| producerTransacted.send(message); |
| |
| if (i % 2 == 0) { |
| for (int j = 0; j < 20; j++) { |
| ClientMessage msgSend = sessionNonTX.createMessage(true); |
| msgSend.putStringProperty(new SimpleString("tst"), new SimpleString("i=" + i + ", j=" + j)); |
| msgSend.getBodyBuffer().writeBytes(new byte[10 * 1024]); |
| producerNonTransacted.send(msgSend); |
| } |
| assertTrue(server.getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging()); |
| } else { |
| ClientConsumer consumer = sessionNonTX.createConsumer(PagingTest.ADDRESS); |
| for (int j = 0; j < 20; j++) { |
| ClientMessage msgReceived = consumer.receive(10000); |
| assertNotNull(msgReceived); |
| msgReceived.acknowledge(); |
| } |
| consumer.close(); |
| } |
| } |
| |
| ClientConsumer consumerNonTX = sessionNonTX.createConsumer(PagingTest.ADDRESS); |
| while (true) { |
| ClientMessage msgReceived = consumerNonTX.receive(1000); |
| if (msgReceived == null) { |
| break; |
| } |
| msgReceived.acknowledge(); |
| } |
| consumerNonTX.close(); |
| |
| ClientConsumer consumer = sessionNonTX.createConsumer(PagingTest.ADDRESS); |
| |
| Assert.assertNull(consumer.receiveImmediate()); |
| |
| sessionTransacted.commit(); |
| |
| sessionTransacted.close(); |
| |
| for (int i = 0; i < 50; i++) { |
| ClientMessage message = consumer.receive(PagingTest.RECEIVE_TIMEOUT); |
| |
| Assert.assertNotNull(message); |
| |
| Integer messageID = (Integer) message.getObjectProperty(new SimpleString("id")); |
| |
| Assert.assertNotNull(messageID); |
| Assert.assertEquals("message received out of order", i, messageID.intValue()); |
| |
| message.acknowledge(); |
| } |
| |
| Assert.assertNull(consumer.receiveImmediate()); |
| |
| consumer.close(); |
| |
| sessionNonTX.close(); |
| } |
| |
| @Test |
| public void testDepageDuringTransaction4() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setJournalSyncTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final AtomicInteger errors = new AtomicInteger(0); |
| |
| final int numberOfMessages = 10000; |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(false); |
| |
| sf = createSessionFactory(locator); |
| |
| final byte[] body = new byte[MESSAGE_SIZE]; |
| |
| Thread producerThread = new Thread() { |
| @Override |
| public void run() { |
| ClientSession sessionProducer = null; |
| try { |
| sessionProducer = sf.createSession(false, false); |
| ClientProducer producer = sessionProducer.createProducer(ADDRESS); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| ClientMessage msg = sessionProducer.createMessage(true); |
| msg.getBodyBuffer().writeBytes(body); |
| msg.putIntProperty("count", i); |
| producer.send(msg); |
| |
| if (i % 100 == 0 && i != 0) { |
| sessionProducer.commit(); |
| // Thread.sleep(500); |
| } |
| } |
| |
| sessionProducer.commit(); |
| |
| } catch (Throwable e) { |
| e.printStackTrace(); // >> junit report |
| errors.incrementAndGet(); |
| } finally { |
| try { |
| if (sessionProducer != null) { |
| sessionProducer.close(); |
| } |
| } catch (Throwable e) { |
| e.printStackTrace(); |
| errors.incrementAndGet(); |
| } |
| } |
| } |
| }; |
| |
| ClientSession session = sf.createSession(true, true, 0); |
| session.start(); |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| producerThread.start(); |
| |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| ClientMessage msg = consumer.receive(5000); |
| assertNotNull(msg); |
| assertEquals(i, msg.getIntProperty("count").intValue()); |
| msg.acknowledge(); |
| if (i > 0 && i % 10 == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| |
| session.close(); |
| |
| producerThread.join(); |
| |
| locator.close(); |
| |
| sf.close(); |
| |
| assertEquals(0, errors.get()); |
| } |
| |
| @Test |
| public void testOrderingNonTX() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setJournalSyncTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_SIZE * 2); |
| |
| server.getConfiguration(); |
| server.getConfiguration(); |
| |
| server.start(); |
| |
| final AtomicInteger errors = new AtomicInteger(0); |
| |
| final int numberOfMessages = 2000; |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| final CountDownLatch ready = new CountDownLatch(1); |
| |
| final byte[] body = new byte[MESSAGE_SIZE]; |
| |
| Thread producerThread = new Thread() { |
| @Override |
| public void run() { |
| ClientSession sessionProducer = null; |
| try { |
| sessionProducer = sf.createSession(true, true); |
| ClientProducer producer = sessionProducer.createProducer(ADDRESS); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| ClientMessage msg = sessionProducer.createMessage(true); |
| msg.getBodyBuffer().writeBytes(body); |
| msg.putIntProperty("count", i); |
| producer.send(msg); |
| |
| if (i == 1000) { |
| // The session is not TX, but we do this just to perform a round trip to the server |
| // and make sure there are no pending messages |
| sessionProducer.commit(); |
| |
| assertTrue(server.getPagingManager().getPageStore(ADDRESS).isPaging()); |
| ready.countDown(); |
| } |
| } |
| |
| sessionProducer.commit(); |
| |
| log.info("Producer gone"); |
| |
| } catch (Throwable e) { |
| e.printStackTrace(); // >> junit report |
| errors.incrementAndGet(); |
| } finally { |
| try { |
| if (sessionProducer != null) { |
| sessionProducer.close(); |
| } |
| } catch (Throwable e) { |
| e.printStackTrace(); |
| errors.incrementAndGet(); |
| } |
| } |
| } |
| }; |
| |
| ClientSession session = sf.createSession(true, true, 0); |
| session.start(); |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| producerThread.start(); |
| |
| assertTrue(ready.await(100, TimeUnit.SECONDS)); |
| |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| ClientMessage msg = consumer.receive(5000); |
| assertNotNull(msg); |
| if (i != msg.getIntProperty("count").intValue()) { |
| log.info("Received " + i + " with property = " + msg.getIntProperty("count")); |
| log.info("###### different"); |
| } |
| // assertEquals(i, msg.getIntProperty("count").intValue()); |
| msg.acknowledge(); |
| } |
| |
| session.close(); |
| |
| producerThread.join(); |
| |
| assertEquals(0, errors.get()); |
| } |
| |
| @Test |
| public void testPageOnSchedulingNoRestart() throws Exception { |
| internalTestPageOnScheduling(false); |
| } |
| |
| @Test |
| public void testPageOnSchedulingRestart() throws Exception { |
| internalTestPageOnScheduling(true); |
| } |
| |
| public void internalTestPageOnScheduling(final boolean restart) throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 1000; |
| |
| final int numberOfBytes = 1024; |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| ClientSession session = sf.createSession(null, null, false, true, true, false, 0); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[numberOfBytes]; |
| |
| for (int j = 0; j < numberOfBytes; j++) { |
| body[j] = ActiveMQTestBase.getSamplebyte(j); |
| } |
| |
| long scheduledTime = System.currentTimeMillis() + 5000; |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| message.getBodyBuffer().writeBytes(body); |
| message.putIntProperty(new SimpleString("id"), i); |
| |
| PagingStore store = server.getPagingManager().getPageStore(PagingTest.ADDRESS); |
| |
| // Worse scenario possible... only schedule what's on pages |
| if (store.getCurrentPage() != null) { |
| message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, scheduledTime); |
| } |
| |
| producer.send(message); |
| } |
| |
| if (restart) { |
| session.close(); |
| |
| server.stop(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| server.start(); |
| |
| sf = createSessionFactory(locator); |
| |
| session = sf.createSession(null, null, false, true, true, false, 0); |
| } |
| |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); |
| |
| session.start(); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| |
| ClientMessage message2 = consumer.receive(PagingTest.RECEIVE_TIMEOUT); |
| |
| Assert.assertNotNull(message2); |
| |
| message2.acknowledge(); |
| |
| Assert.assertNotNull(message2); |
| |
| Long scheduled = (Long) message2.getObjectProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); |
| if (scheduled != null) { |
| Assert.assertTrue("Scheduling didn't work", System.currentTimeMillis() >= scheduledTime); |
| } |
| |
| try { |
| assertBodiesEqual(body, message2.getBodyBuffer()); |
| } catch (AssertionError e) { |
| PagingTest.log.info("Expected buffer:" + ActiveMQTestBase.dumpBytesHex(body, 40)); |
| PagingTest.log.info("Arriving buffer:" + ActiveMQTestBase.dumpBytesHex(message2.getBodyBuffer().toByteBuffer().array(), 40)); |
| throw e; |
| } |
| } |
| |
| consumer.close(); |
| |
| session.close(); |
| } |
| |
| @Test |
| public void testRollbackOnSend() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfIntegers = 256; |
| |
| final int numberOfMessages = 10; |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| ClientSession session = sf.createSession(null, null, false, false, true, false, 0); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| for (int j = 1; j <= numberOfIntegers; j++) { |
| bodyLocal.writeInt(j); |
| } |
| |
| message.putIntProperty(new SimpleString("id"), i); |
| |
| producer.send(message); |
| } |
| |
| session.rollback(); |
| |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); |
| |
| session.start(); |
| |
| Assert.assertNull(consumer.receiveImmediate()); |
| |
| session.close(); |
| } |
| |
| @Test |
| public void testRollbackOnSendThenSendMore() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| ClientSession session = sf.createSession(null, null, false, false, true, false, 0); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| Queue queue = server.locateQueue(ADDRESS); |
| |
| queue.getPageSubscription().getPagingStore().startPaging(); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message; |
| |
| for (int i = 0; i < 20; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(new byte[100 * 4]); |
| |
| message.putIntProperty(new SimpleString("id"), i); |
| |
| producer.send(message); |
| session.commit(); |
| queue.getPageSubscription().getPagingStore().forceAnotherPage(); |
| |
| } |
| |
| for (int i = 20; i < 24; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(new byte[100 * 4]); |
| |
| message.putIntProperty(new SimpleString("id"), i); |
| |
| producer.send(message); |
| } |
| |
| session.rollback(); |
| |
| ClientSession consumerSession = sf.createSession(false, false); |
| |
| queue.getPageSubscription().getPagingStore().disableCleanup(); |
| |
| queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup(); |
| |
| consumerSession.start(); |
| ClientConsumer consumer = consumerSession.createConsumer(ADDRESS, SimpleString.toSimpleString("id > 0")); |
| for (int i = 0; i < 19; i++) { |
| ClientMessage messageRec = consumer.receive(5000); |
| System.err.println("msg::" + messageRec); |
| Assert.assertNotNull(messageRec); |
| messageRec.acknowledge(); |
| consumerSession.commit(); |
| |
| // The only reason I'm calling cleanup directly is that it would be easy to debug in case of bugs |
| // if you see an issue with cleanup here, enjoy debugging this method |
| queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup(); |
| } |
| queue.getPageSubscription().getPagingStore().enableCleanup(); |
| |
| consumerSession.close(); |
| |
| session.close(); |
| sf.close(); |
| |
| server.stop(); |
| } |
| |
| // The pages are complete, and this is simulating a scenario where the server crashed before deleting the pages. |
| @Test |
| public void testRestartWithComplete() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig(); |
| |
| final AtomicBoolean mainCleanup = new AtomicBoolean(true); |
| |
| class InterruptedCursorProvider extends PageCursorProviderImpl { |
| |
| InterruptedCursorProvider(PagingStore pagingStore, |
| StorageManager storageManager, |
| ArtemisExecutor executor, |
| int maxCacheSize) { |
| super(pagingStore, storageManager, executor, maxCacheSize); |
| } |
| |
| @Override |
| public void cleanup() { |
| if (mainCleanup.get()) { |
| super.cleanup(); |
| } else { |
| try { |
| pagingStore.unlock(); |
| } catch (Throwable ignored) { |
| } |
| } |
| } |
| } |
| |
| server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) { |
| @Override |
| protected PagingStoreFactoryNIO getPagingStoreFactory() { |
| return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) { |
| @Override |
| public PageCursorProvider newCursorProvider(PagingStore store, |
| StorageManager storageManager, |
| AddressSettings addressSettings, |
| ArtemisExecutor executor) { |
| return new InterruptedCursorProvider(store, storageManager, executor, addressSettings.getPageCacheMaxSize()); |
| } |
| }; |
| } |
| |
| }; |
| |
| addServer(server); |
| |
| AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(PagingTest.PAGE_SIZE).setMaxSizeBytes(PagingTest.PAGE_MAX).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); |
| |
| server.getAddressSettingsRepository().addMatch("#", defaultSetting); |
| |
| server.start(); |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| ClientSession session = sf.createSession(true, true, 0); |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| final Queue queue1 = server.locateQueue(ADDRESS); |
| |
| queue1.getPageSubscription().getPagingStore().startPaging(); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message; |
| |
| for (int i = 0; i < 20; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(new byte[100 * 4]); |
| |
| message.putIntProperty(new SimpleString("idi"), i); |
| |
| producer.send(message); |
| session.commit(); |
| if (i < 19) { |
| queue1.getPageSubscription().getPagingStore().forceAnotherPage(); |
| } |
| |
| } |
| |
| Wait.assertEquals(20, ()->queue1.getPageSubscription().getPagingStore().getCurrentWritingPage()); |
| |
| // This will force a scenario where the pages are cleaned up. When restarting we need to check if the current page is complete |
| // if it is complete we must move to another page avoiding races on cleanup |
| // which could happen during a crash / restart |
| long tx = server.getStorageManager().generateID(); |
| for (int i = 1; i <= 20; i++) { |
| server.getStorageManager().storePageCompleteTransactional(tx, queue1.getID(), new PagePositionImpl(i, 1)); |
| } |
| |
| server.getStorageManager().commit(tx); |
| |
| session.close(); |
| sf.close(); |
| |
| server.stop(); |
| mainCleanup.set(false); |
| |
| logger.trace("Server restart"); |
| |
| server.start(); |
| |
| Queue queue = server.locateQueue(ADDRESS); |
| |
| locator = createInVMNonHALocator(); |
| sf = createSessionFactory(locator); |
| session = sf.createSession(null, null, false, false, true, false, 0); |
| producer = session.createProducer(PagingTest.ADDRESS); |
| |
| for (int i = 0; i < 10; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(new byte[100 * 4]); |
| |
| message.putIntProperty(new SimpleString("newid"), i); |
| |
| producer.send(message); |
| session.commit(); |
| |
| if (i == 5) { |
| queue.getPageSubscription().getPagingStore().forceAnotherPage(); |
| } |
| } |
| |
| mainCleanup.set(true); |
| |
| queue = server.locateQueue(ADDRESS); |
| queue.getPageSubscription().cleanupEntries(false); |
| queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup(); |
| |
| ClientConsumer consumer = session.createConsumer(ADDRESS); |
| session.start(); |
| |
| for (int i = 0; i < 10; i++) { |
| message = consumer.receive(5000); |
| Assert.assertNotNull(message); |
| Assert.assertEquals(i, message.getIntProperty("newid").intValue()); |
| message.acknowledge(); |
| } |
| |
| server.stop(); |
| |
| } |
| |
| @Test |
| public void testCommitOnSend() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfIntegers = 10; |
| |
| final int numberOfMessages = 500; |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| ClientSession session = sf.createSession(null, null, false, false, false, false, 0); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| for (int j = 1; j <= numberOfIntegers; j++) { |
| bodyLocal.writeInt(j); |
| } |
| |
| message.putIntProperty(new SimpleString("id"), i); |
| |
| producer.send(message); |
| } |
| |
| session.commit(); |
| |
| session.close(); |
| |
| locator.close(); |
| |
| locator = createInVMNonHALocator(); |
| |
| server.stop(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| sf = createSessionFactory(locator); |
| |
| session = sf.createSession(null, null, false, false, false, false, 0); |
| |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); |
| |
| session.start(); |
| for (int i = 0; i < numberOfMessages; i++) { |
| if (i == 55) { |
| System.out.println("i = 55"); |
| } |
| ClientMessage msg = consumer.receive(5000); |
| Assert.assertNotNull(msg); |
| msg.acknowledge(); |
| session.commit(); |
| } |
| |
| session.close(); |
| } |
| |
| @Test |
| public void testParialConsume() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 1000; |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| ClientSession session = sf.createSession(null, null, false, false, false, false, 0); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(new byte[1024]); |
| |
| message.putIntProperty(new SimpleString("id"), i); |
| |
| producer.send(message); |
| } |
| |
| session.commit(); |
| |
| session.close(); |
| |
| locator.close(); |
| |
| server.stop(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| locator = createInVMNonHALocator(); |
| |
| sf = createSessionFactory(locator); |
| |
| session = sf.createSession(null, null, false, false, false, false, 0); |
| |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); |
| |
| session.start(); |
| // 347 = I just picked any odd number, not rounded, to make sure it's not at the beginning of any page |
| for (int i = 0; i < 347; i++) { |
| ClientMessage msg = consumer.receive(5000); |
| assertEquals(i, msg.getIntProperty("id").intValue()); |
| Assert.assertNotNull(msg); |
| msg.acknowledge(); |
| session.commit(); |
| } |
| |
| session.close(); |
| |
| locator.close(); |
| |
| server.stop(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| locator = createInVMNonHALocator(); |
| |
| sf = createSessionFactory(locator); |
| |
| session = sf.createSession(null, null, false, false, false, false, 0); |
| |
| consumer = session.createConsumer(PagingTest.ADDRESS); |
| |
| session.start(); |
| for (int i = 347; i < numberOfMessages; i++) { |
| ClientMessage msg = consumer.receive(5000); |
| assertEquals(i, msg.getIntProperty("id").intValue()); |
| Assert.assertNotNull(msg); |
| msg.acknowledge(); |
| session.commit(); |
| } |
| |
| session.close(); |
| } |
| |
| @Test |
| public void testPageMultipleDestinations() throws Exception { |
| internalTestPageMultipleDestinations(false); |
| } |
| |
| @Test |
| public void testPageMultipleDestinationsTransacted() throws Exception { |
| internalTestPageMultipleDestinations(true); |
| } |
| |
| @Test |
| public void testDropMessagesPersistent() throws Exception { |
| testDropMessages(true); |
| } |
| |
| @Test |
| public void testDropMessagesNonPersistent() throws Exception { |
| testDropMessages(false); |
| } |
| |
| public void testDropMessages(final boolean persistent) throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| HashMap<String, AddressSettings> settings = new HashMap<>(); |
| |
| AddressSettings set = new AddressSettings(); |
| set.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP); |
| |
| settings.put(PagingTest.ADDRESS.toString(), set); |
| |
| server = createServer(persistent, createDefaultInVMConfig(), 1024, 10 * 1024, settings); |
| |
| server.start(); |
| |
| final int numberOfMessages = 1000; |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| ClientSession session = sf.createSession(null, null, false, true, true, false, 0); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| byte[] body = new byte[2048]; |
| |
| message = session.createMessage(true); |
| message.getBodyBuffer().writeBytes(body); |
| |
| producer.send(message); |
| } |
| |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); |
| |
| session.start(); |
| |
| for (int i = 0; i < 5; i++) { |
| ClientMessage message2 = consumer.receive(PagingTest.RECEIVE_TIMEOUT); |
| |
| Assert.assertNotNull(message2); |
| |
| message2.acknowledge(); |
| } |
| |
| Assert.assertNull(consumer.receiveImmediate()); |
| |
| Wait.assertEquals(0, ()->server.getPagingManager().getPageStore(PagingTest.ADDRESS).getAddressSize()); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| byte[] body = new byte[2048]; |
| |
| message = session.createMessage(true); |
| message.getBodyBuffer().writeBytes(body); |
| |
| producer.send(message); |
| } |
| |
| for (int i = 0; i < 5; i++) { |
| ClientMessage message2 = consumer.receive(PagingTest.RECEIVE_TIMEOUT); |
| |
| Assert.assertNotNull(message2); |
| |
| message2.acknowledge(); |
| } |
| |
| Assert.assertNull(consumer.receiveImmediate()); |
| |
| session.close(); |
| |
| session = sf.createSession(false, true, true); |
| |
| producer = session.createProducer(PagingTest.ADDRESS); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| byte[] body = new byte[2048]; |
| |
| message = session.createMessage(true); |
| message.getBodyBuffer().writeBytes(body); |
| |
| producer.send(message); |
| } |
| |
| session.commit(); |
| |
| consumer = session.createConsumer(PagingTest.ADDRESS); |
| |
| session.start(); |
| |
| for (int i = 0; i < 5; i++) { |
| ClientMessage message2 = consumer.receive(PagingTest.RECEIVE_TIMEOUT); |
| |
| Assert.assertNotNull(message2); |
| |
| message2.acknowledge(); |
| } |
| |
| session.commit(); |
| |
| Assert.assertNull(consumer.receiveImmediate()); |
| |
| session.close(); |
| |
| Wait.assertEquals(0, ()->server.getPagingManager().getPageStore(PagingTest.ADDRESS).getAddressSize()); |
| } |
| |
| @Test |
| public void testDropMessagesExpiring() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| HashMap<String, AddressSettings> settings = new HashMap<>(); |
| |
| AddressSettings set = new AddressSettings(); |
| set.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP); |
| |
| settings.put(PagingTest.ADDRESS.toString(), set); |
| |
| server = createServer(true, createDefaultInVMConfig(), 1024, 1024 * 1024, settings); |
| |
| server.start(); |
| |
| final int numberOfMessages = 30000; |
| |
| locator.setAckBatchSize(0); |
| |
| sf = createSessionFactory(locator); |
| ClientSession sessionProducer = sf.createSession(); |
| |
| sessionProducer.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = sessionProducer.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| ClientSession sessionConsumer = sf.createSession(); |
| |
| class MyHandler implements MessageHandler { |
| |
| int count; |
| |
| @Override |
| public void onMessage(ClientMessage message1) { |
| try { |
| Thread.sleep(1); |
| } catch (Exception e) { |
| |
| } |
| |
| count++; |
| |
| if (count % 1000 == 0) { |
| log.info("received " + count); |
| } |
| |
| try { |
| message1.acknowledge(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| } |
| |
| ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS); |
| |
| sessionConsumer.start(); |
| |
| consumer.setMessageHandler(new MyHandler()); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| byte[] body = new byte[1024]; |
| |
| message = sessionProducer.createMessage(false); |
| message.getBodyBuffer().writeBytes(body); |
| |
| message.setExpiration(System.currentTimeMillis() + 100); |
| |
| producer.send(message); |
| } |
| |
| sessionProducer.close(); |
| sessionConsumer.close(); |
| } |
| |
| private void internalTestPageMultipleDestinations(final boolean transacted) throws Exception { |
| Configuration config = createDefaultInVMConfig(); |
| |
| final int NUMBER_OF_BINDINGS = 100; |
| |
| int NUMBER_OF_MESSAGES = 2; |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| ClientSessionFactory sf = createSessionFactory(locator); |
| ClientSession session = sf.createSession(null, null, false, !transacted, true, false, 0); |
| |
| for (int i = 0; i < NUMBER_OF_BINDINGS; i++) { |
| session.createQueue(PagingTest.ADDRESS, new SimpleString("someQueue" + i), null, true); |
| } |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[1024]; |
| |
| message = session.createMessage(true); |
| message.getBodyBuffer().writeBytes(body); |
| |
| for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { |
| producer.send(message); |
| |
| if (transacted) { |
| session.commit(); |
| } |
| } |
| |
| session.close(); |
| |
| server.stop(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| server.start(); |
| |
| sf = createSessionFactory(locator); |
| |
| session = sf.createSession(null, null, false, true, true, false, 0); |
| |
| session.start(); |
| |
| for (int msg = 0; msg < NUMBER_OF_MESSAGES; msg++) { |
| |
| for (int i = 0; i < NUMBER_OF_BINDINGS; i++) { |
| ClientConsumer consumer = session.createConsumer(new SimpleString("someQueue" + i)); |
| |
| ClientMessage message2 = consumer.receive(PagingTest.RECEIVE_TIMEOUT); |
| |
| Assert.assertNotNull(message2); |
| |
| message2.acknowledge(); |
| |
| Assert.assertNotNull(message2); |
| |
| consumer.close(); |
| |
| } |
| } |
| |
| session.close(); |
| |
| for (int i = 0; i < NUMBER_OF_BINDINGS; i++) { |
| Queue queue = (Queue) server.getPostOffice().getBinding(new SimpleString("someQueue" + i)).getBindable(); |
| |
| Assert.assertEquals("Queue someQueue" + i + " was supposed to be empty", 0, getMessageCount(queue)); |
| Assert.assertEquals("Queue someQueue" + i + " was supposed to be empty", 0, queue.getDeliveringCount()); |
| } |
| } |
| |
| @Test |
| public void testSyncPage() throws Exception { |
| Configuration config = createDefaultInVMConfig(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| try { |
| server.addAddressInfo(new AddressInfo(PagingTest.ADDRESS, RoutingType.ANYCAST)); |
| server.createQueue(PagingTest.ADDRESS, RoutingType.ANYCAST, PagingTest.ADDRESS, null, true, false); |
| |
| final CountDownLatch pageUp = new CountDownLatch(0); |
| final CountDownLatch pageDone = new CountDownLatch(1); |
| |
| OperationContext ctx = new DummyOperationContext(pageUp, pageDone); |
| |
| OperationContextImpl.setContext(ctx); |
| |
| PagingManager paging = server.getPagingManager(); |
| |
| PagingStore store = paging.getPageStore(ADDRESS); |
| |
| store.sync(); |
| |
| assertTrue(pageUp.await(10, TimeUnit.SECONDS)); |
| |
| assertTrue(pageDone.await(10, TimeUnit.SECONDS)); |
| |
| server.stop(); |
| |
| } finally { |
| try { |
| server.stop(); |
| } catch (Throwable ignored) { |
| } |
| |
| OperationContextImpl.clearContext(); |
| } |
| |
| } |
| |
| @Test |
| public void testSyncPageTX() throws Exception { |
| Configuration config = createDefaultInVMConfig(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| server.createQueue(PagingTest.ADDRESS, RoutingType.ANYCAST, PagingTest.ADDRESS, null, true, false); |
| |
| final CountDownLatch pageUp = new CountDownLatch(0); |
| final CountDownLatch pageDone = new CountDownLatch(1); |
| |
| OperationContext ctx = new DummyOperationContext(pageUp, pageDone); |
| OperationContextImpl.setContext(ctx); |
| |
| PagingManager paging = server.getPagingManager(); |
| |
| PagingStore store = paging.getPageStore(ADDRESS); |
| |
| store.sync(); |
| |
| assertTrue(pageUp.await(10, TimeUnit.SECONDS)); |
| |
| assertTrue(pageDone.await(10, TimeUnit.SECONDS)); |
| } |
| |
| @Test |
| public void testPagingOneDestinationOnly() throws Exception { |
| SimpleString PAGED_ADDRESS = new SimpleString("paged"); |
| SimpleString NON_PAGED_ADDRESS = new SimpleString("non-paged"); |
| |
| Configuration configuration = createDefaultInVMConfig(); |
| |
| Map<String, AddressSettings> addresses = new HashMap<>(); |
| |
| addresses.put("#", new AddressSettings()); |
| |
| AddressSettings pagedDestination = new AddressSettings().setPageSizeBytes(1024).setMaxSizeBytes(10 * 1024); |
| |
| addresses.put(PAGED_ADDRESS.toString(), pagedDestination); |
| |
| server = createServer(true, configuration, -1, -1, addresses); |
| |
| server.start(); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, true, false); |
| |
| session.createQueue(PAGED_ADDRESS, PAGED_ADDRESS, true); |
| |
| session.createQueue(NON_PAGED_ADDRESS, NON_PAGED_ADDRESS, true); |
| |
| ClientProducer producerPaged = session.createProducer(PAGED_ADDRESS); |
| ClientProducer producerNonPaged = session.createProducer(NON_PAGED_ADDRESS); |
| |
| int NUMBER_OF_MESSAGES = 100; |
| |
| for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { |
| ClientMessage msg = session.createMessage(true); |
| msg.getBodyBuffer().writeBytes(new byte[512]); |
| |
| producerPaged.send(msg); |
| producerNonPaged.send(msg); |
| } |
| |
| session.close(); |
| |
| Assert.assertTrue(server.getPagingManager().getPageStore(PAGED_ADDRESS).isPaging()); |
| Assert.assertFalse(server.getPagingManager().getPageStore(NON_PAGED_ADDRESS).isPaging()); |
| |
| session = sf.createSession(false, true, false); |
| |
| session.start(); |
| |
| ClientConsumer consumerNonPaged = session.createConsumer(NON_PAGED_ADDRESS); |
| ClientConsumer consumerPaged = session.createConsumer(PAGED_ADDRESS); |
| |
| ClientMessage[] ackList = new ClientMessage[NUMBER_OF_MESSAGES]; |
| |
| for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { |
| ClientMessage msg = consumerNonPaged.receive(5000); |
| Assert.assertNotNull(msg); |
| ackList[i] = msg; |
| } |
| |
| Assert.assertNull(consumerNonPaged.receiveImmediate()); |
| |
| for (ClientMessage ack : ackList) { |
| ack.acknowledge(); |
| } |
| |
| consumerNonPaged.close(); |
| |
| session.commit(); |
| |
| ackList = null; |
| |
| for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { |
| ClientMessage msg = consumerPaged.receive(5000); |
| Assert.assertNotNull(msg); |
| msg.acknowledge(); |
| session.commit(); |
| } |
| |
| Assert.assertNull(consumerPaged.receiveImmediate()); |
| |
| session.close(); |
| } |
| |
| @Test |
| public void testPagingDifferentSizes() throws Exception { |
| SimpleString PAGED_ADDRESS_A = new SimpleString("paged-a"); |
| SimpleString PAGED_ADDRESS_B = new SimpleString("paged-b"); |
| |
| Configuration configuration = createDefaultInVMConfig(); |
| |
| Map<String, AddressSettings> addresses = new HashMap<>(); |
| |
| addresses.put("#", new AddressSettings()); |
| |
| AddressSettings pagedDestinationA = new AddressSettings().setPageSizeBytes(1024).setMaxSizeBytes(10 * 1024); |
| |
| int NUMBER_MESSAGES_BEFORE_PAGING = 11; |
| |
| addresses.put(PAGED_ADDRESS_A.toString(), pagedDestinationA); |
| |
| AddressSettings pagedDestinationB = new AddressSettings().setPageSizeBytes(2024).setMaxSizeBytes(20 * 1024); |
| |
| addresses.put(PAGED_ADDRESS_B.toString(), pagedDestinationB); |
| |
| server = createServer(true, configuration, -1, -1, addresses); |
| server.start(); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, true, false); |
| |
| session.createQueue(PAGED_ADDRESS_A, PAGED_ADDRESS_A, true); |
| |
| session.createQueue(PAGED_ADDRESS_B, PAGED_ADDRESS_B, true); |
| |
| ClientProducer producerA = session.createProducer(PAGED_ADDRESS_A); |
| ClientProducer producerB = session.createProducer(PAGED_ADDRESS_B); |
| |
| int NUMBER_OF_MESSAGES = 100; |
| |
| for (int i = 0; i < NUMBER_MESSAGES_BEFORE_PAGING; i++) { |
| ClientMessage msg = session.createMessage(true); |
| msg.getBodyBuffer().writeBytes(new byte[896]); |
| |
| producerA.send(msg); |
| producerB.send(msg); |
| } |
| |
| session.commit(); // commit was called to clean the buffer only (making sure everything is on the server side) |
| |
| Assert.assertTrue(server.getPagingManager().getPageStore(PAGED_ADDRESS_A).isPaging()); |
| Assert.assertFalse(server.getPagingManager().getPageStore(PAGED_ADDRESS_B).isPaging()); |
| |
| for (int i = 0; i < NUMBER_MESSAGES_BEFORE_PAGING; i++) { |
| ClientMessage msg = session.createMessage(true); |
| msg.getBodyBuffer().writeBytes(new byte[896]); |
| |
| producerA.send(msg); |
| producerB.send(msg); |
| } |
| |
| session.commit(); // commit was called to clean the buffer only (making sure everything is on the server side) |
| |
| Assert.assertTrue(server.getPagingManager().getPageStore(PAGED_ADDRESS_A).isPaging()); |
| Assert.assertTrue(server.getPagingManager().getPageStore(PAGED_ADDRESS_B).isPaging()); |
| |
| for (int i = NUMBER_MESSAGES_BEFORE_PAGING * 2; i < NUMBER_OF_MESSAGES; i++) { |
| ClientMessage msg = session.createMessage(true); |
| msg.getBodyBuffer().writeBytes(new byte[896]); |
| |
| producerA.send(msg); |
| producerB.send(msg); |
| } |
| |
| session.close(); |
| |
| Assert.assertTrue(server.getPagingManager().getPageStore(PAGED_ADDRESS_A).isPaging()); |
| Assert.assertTrue(server.getPagingManager().getPageStore(PAGED_ADDRESS_B).isPaging()); |
| |
| session = sf.createSession(null, null, false, true, true, false, 0); |
| |
| session.start(); |
| |
| ClientConsumer consumerA = session.createConsumer(PAGED_ADDRESS_A); |
| |
| ClientConsumer consumerB = session.createConsumer(PAGED_ADDRESS_B); |
| |
| for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { |
| ClientMessage msg = consumerA.receive(5000); |
| Assert.assertNotNull("Couldn't receive a message on consumerA, iteration = " + i, msg); |
| msg.acknowledge(); |
| } |
| |
| Assert.assertNull(consumerA.receiveImmediate()); |
| |
| consumerA.close(); |
| |
| Assert.assertTrue(server.getPagingManager().getPageStore(PAGED_ADDRESS_B).isPaging()); |
| |
| for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { |
| ClientMessage msg = consumerB.receive(5000); |
| Assert.assertNotNull(msg); |
| msg.acknowledge(); |
| session.commit(); |
| } |
| |
| Assert.assertNull(consumerB.receiveImmediate()); |
| |
| consumerB.close(); |
| |
| session.close(); |
| } |
| |
| @Test |
| public void testPageAndDepageRapidly() throws Exception { |
| boolean persistentMessages = true; |
| |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setJournalFileSize(10 * 1024 * 1024); |
| |
| server = createServer(true, config, 100 * 1024, 1024 * 1024 / 2); |
| |
| server.start(); |
| |
| final int messageSize = 51527; |
| |
| final int numberOfMessages = 200; |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(true, true); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| final AtomicInteger errors = new AtomicInteger(0); |
| |
| Thread consumeThread = new Thread() { |
| @Override |
| public void run() { |
| ClientSession sessionConsumer = null; |
| try { |
| sessionConsumer = sf.createSession(false, false); |
| sessionConsumer.start(); |
| |
| ClientConsumer cons = sessionConsumer.createConsumer(ADDRESS); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| ClientMessage msg = cons.receive(PagingTest.RECEIVE_TIMEOUT); |
| assertNotNull(msg); |
| msg.acknowledge(); |
| |
| if (i % 20 == 0) { |
| sessionConsumer.commit(); |
| } |
| } |
| sessionConsumer.commit(); |
| } catch (Throwable e) { |
| e.printStackTrace(); |
| errors.incrementAndGet(); |
| } finally { |
| try { |
| sessionConsumer.close(); |
| } catch (ActiveMQException e) { |
| e.printStackTrace(); |
| errors.incrementAndGet(); |
| } |
| } |
| |
| } |
| }; |
| |
| consumeThread.start(); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[messageSize]; |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(persistentMessages); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| message.putIntProperty(new SimpleString("id"), i); |
| |
| producer.send(message); |
| |
| Thread.sleep(50); |
| } |
| |
| consumeThread.join(); |
| |
| assertEquals(0, errors.get()); |
| |
| long timeout = System.currentTimeMillis() + 5000; |
| |
| while (System.currentTimeMillis() < timeout && (server.getPagingManager().getPageStore(ADDRESS).isPaging() || server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages() != 1)) { |
| Thread.sleep(1); |
| } |
| |
| // It's async, so need to wait a bit for it happening |
| assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging()); |
| |
| Wait.assertEquals(1, ()->server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages()); |
| } |
| |
| @Test |
| public void testTwoQueuesDifferentFilters() throws Exception { |
| boolean persistentMessages = true; |
| |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 200; |
| locator = createInVMNonHALocator().setClientFailureCheckPeriod(120000).setConnectionTTL(5000000).setCallTimeout(120000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| // note: if you want to change this, numberOfMessages has to be a multiple of NQUEUES |
| int NQUEUES = 2; |
| |
| for (int i = 0; i < NQUEUES; i++) { |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=" + i), new SimpleString("propTest=" + i), true); |
| } |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(persistentMessages); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| message.putIntProperty("propTest", i % NQUEUES); |
| message.putIntProperty("id", i); |
| |
| producer.send(message); |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| |
| session.commit(); |
| |
| session.start(); |
| |
| for (int nqueue = 0; nqueue < NQUEUES; nqueue++) { |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=" + nqueue)); |
| |
| for (int i = 0; i < (numberOfMessages / NQUEUES); i++) { |
| message = consumer.receive(500000); |
| assertNotNull(message); |
| message.acknowledge(); |
| |
| assertEquals(nqueue, message.getIntProperty("propTest").intValue()); |
| } |
| |
| assertNull(consumer.receiveImmediate()); |
| |
| consumer.close(); |
| |
| session.commit(); |
| } |
| |
| PagingStore store = server.getPagingManager().getPageStore(ADDRESS); |
| store.getCursorProvider().cleanup(); |
| |
| long timeout = System.currentTimeMillis() + 5000; |
| while (store.isPaging() && timeout > System.currentTimeMillis()) { |
| Thread.sleep(100); |
| } |
| |
| // It's async, so need to wait a bit for it happening |
| assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging()); |
| } |
| |
| @Test |
| public void testTwoQueues() throws Exception { |
| boolean persistentMessages = true; |
| |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int messageSize = 1024; |
| |
| final int numberOfMessages = 1000; |
| |
| try { |
| ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(120000).setConnectionTTL(5000000).setCallTimeout(120000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| ClientSessionFactory sf = locator.createSessionFactory(); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), null, true); |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[messageSize]; |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(persistentMessages); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| message.putIntProperty("propTest", i % 2 == 0 ? 1 : 2); |
| |
| producer.send(message); |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| |
| session.commit(); |
| |
| session.start(); |
| |
| for (int msg = 1; msg <= 2; msg++) { |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=" + msg)); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = consumer.receive(5000); |
| assertNotNull(message); |
| message.acknowledge(); |
| |
| System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest")); |
| } |
| |
| session.commit(); |
| |
| assertNull(consumer.receiveImmediate()); |
| |
| consumer.close(); |
| } |
| |
| PagingStore store = server.getPagingManager().getPageStore(ADDRESS); |
| store.getCursorProvider().cleanup(); |
| |
| long timeout = System.currentTimeMillis() + 5000; |
| while (store.isPaging() && timeout > System.currentTimeMillis()) { |
| Thread.sleep(100); |
| } |
| |
| store.getCursorProvider().cleanup(); |
| |
| waitForNotPaging(server.locateQueue(PagingTest.ADDRESS.concat("=1"))); |
| |
| sf.close(); |
| |
| locator.close(); |
| } finally { |
| try { |
| server.stop(); |
| } catch (Throwable ignored) { |
| } |
| } |
| } |
| |
| @Test |
| public void testTwoQueuesAndOneInativeQueue() throws Exception { |
| boolean persistentMessages = true; |
| |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| try { |
| ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(120000).setConnectionTTL(5000000).setCallTimeout(120000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| ClientSessionFactory sf = locator.createSessionFactory(); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), null, true); |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), null, true); |
| |
| // A queue with an impossible filter |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("-3"), new SimpleString("nothing='something'"), true); |
| |
| PagingStore store = server.getPagingManager().getPageStore(ADDRESS); |
| |
| Queue queue = server.locateQueue(PagingTest.ADDRESS.concat("=1")); |
| |
| queue.getPageSubscription().getPagingStore().startPaging(); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = session.createMessage(persistentMessages); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(new byte[1024]); |
| |
| producer.send(message); |
| |
| session.commit(); |
| |
| session.start(); |
| |
| for (int msg = 1; msg <= 2; msg++) { |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=" + msg)); |
| |
| message = consumer.receive(5000); |
| assertNotNull(message); |
| message.acknowledge(); |
| |
| assertNull(consumer.receiveImmediate()); |
| |
| consumer.close(); |
| } |
| |
| session.commit(); |
| session.close(); |
| |
| store.getCursorProvider().cleanup(); |
| |
| waitForNotPaging(server.locateQueue(PagingTest.ADDRESS.concat("=1"))); |
| |
| sf.close(); |
| |
| locator.close(); |
| } finally { |
| try { |
| server.stop(); |
| } catch (Throwable ignored) { |
| } |
| } |
| } |
| |
| @Test |
| public void testTwoQueuesConsumeOneRestart() throws Exception { |
| boolean persistentMessages = true; |
| |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int messageSize = 1024; |
| |
| final int numberOfMessages = 1000; |
| |
| try { |
| ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(120000).setConnectionTTL(5000000).setCallTimeout(120000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| ClientSessionFactory sf = locator.createSessionFactory(); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), null, true); |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[messageSize]; |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(persistentMessages); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| message.putIntProperty("propTest", i % 2 == 0 ? 1 : 2); |
| |
| producer.send(message); |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| |
| session.commit(); |
| |
| session.start(); |
| |
| session.deleteQueue(PagingTest.ADDRESS.concat("=1")); |
| |
| sf = locator.createSessionFactory(); |
| |
| session = sf.createSession(false, false, false); |
| |
| session.start(); |
| |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=2")); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = consumer.receive(5000); |
| assertNotNull(message); |
| message.acknowledge(); |
| } |
| |
| session.commit(); |
| |
| assertNull(consumer.receiveImmediate()); |
| |
| consumer.close(); |
| |
| long timeout = System.currentTimeMillis() + 10000; |
| |
| PagingStore store = server.getPagingManager().getPageStore(ADDRESS); |
| |
| // It's async, so need to wait a bit for it happening |
| while (timeout > System.currentTimeMillis() && store.isPaging()) { |
| Thread.sleep(100); |
| } |
| |
| assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging()); |
| |
| server.stop(); |
| |
| server.start(); |
| |
| server.stop(); |
| server.start(); |
| |
| sf.close(); |
| |
| locator.close(); |
| } finally { |
| try { |
| server.stop(); |
| } catch (Throwable ignored) { |
| } |
| } |
| } |
| |
| @Test |
| public void testDLAOnLargeMessageAndPaging() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setThreadPoolMaxSize(5).setJournalSyncNonTransactional(false); |
| |
| Map<String, AddressSettings> settings = new HashMap<>(); |
| AddressSettings dla = new AddressSettings().setMaxDeliveryAttempts(5).setDeadLetterAddress(new SimpleString("DLA")).setRedeliveryDelay(0); |
| settings.put(ADDRESS.toString(), dla); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings); |
| |
| server.start(); |
| |
| final int messageSize = 1024; |
| |
| ServerLocator locator = null; |
| ClientSessionFactory sf = null; |
| ClientSession session = null; |
| try { |
| locator = createInVMNonHALocator(); |
| |
| locator.setBlockOnNonDurableSend(true); |
| locator.setBlockOnDurableSend(true); |
| |
| sf = locator.createSessionFactory(); |
| |
| session = sf.createSession(false, false, false); |
| |
| session.createQueue(ADDRESS, ADDRESS, true); |
| session.createQueue("DLA", "DLA", true); |
| |
| Queue serverQueue = server.locateQueue(ADDRESS); |
| Queue serverQueueDLA = server.locateQueue(SimpleString.toSimpleString("DLA")); |
| |
| PagingStore pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS); |
| pgStoreAddress.startPaging(); |
| PagingStore pgStoreDLA = server.getPagingManager().getPageStore(new SimpleString("DLA")); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| for (int i = 0; i < 100; i++) { |
| log.debug("send message #" + i); |
| ClientMessage message = session.createMessage(true); |
| |
| message.putStringProperty("id", "str" + i); |
| |
| message.setBodyInputStream(createFakeLargeStream(messageSize)); |
| |
| producer.send(message); |
| |
| if ((i + 1) % 2 == 0) { |
| session.commit(); |
| } |
| } |
| |
| session.commit(); |
| |
| session.start(); |
| |
| ClientConsumer cons = session.createConsumer(ADDRESS); |
| |
| for (int msgNr = 0; msgNr < 2; msgNr++) { |
| for (int i = 0; i < 5; i++) { |
| ClientMessage msg = cons.receive(5000); |
| |
| assertNotNull(msg); |
| |
| msg.acknowledge(); |
| |
| for (int j = 0; j < messageSize; j++) { |
| assertEquals(getSamplebyte(j), msg.getBodyBuffer().readByte()); |
| } |
| |
| session.rollback(); |
| } |
| |
| pgStoreDLA.startPaging(); |
| } |
| |
| for (int i = 2; i < 100; i++) { |
| log.debug("Received message " + i); |
| ClientMessage message = cons.receive(5000); |
| assertNotNull("Message " + i + " wasn't received", message); |
| message.acknowledge(); |
| |
| final AtomicInteger bytesOutput = new AtomicInteger(0); |
| |
| message.setOutputStream(new OutputStream() { |
| @Override |
| public void write(int b) throws IOException { |
| bytesOutput.incrementAndGet(); |
| } |
| }); |
| |
| try { |
| if (!message.waitOutputStreamCompletion(10000)) { |
| log.info(threadDump("dump")); |
| fail("Couldn't finish large message receiving"); |
| } |
| } catch (Throwable e) { |
| log.info("output bytes = " + bytesOutput); |
| log.info(threadDump("dump")); |
| fail("Couldn't finish large message receiving for id=" + message.getStringProperty("id") + " with messageID=" + message.getMessageID()); |
| } |
| |
| } |
| |
| assertNull(cons.receiveImmediate()); |
| |
| cons.close(); |
| |
| cons = session.createConsumer("DLA"); |
| |
| for (int i = 0; i < 2; i++) { |
| assertNotNull(cons.receive(5000)); |
| } |
| |
| sf.close(); |
| |
| session.close(); |
| |
| locator.close(); |
| |
| server.stop(); |
| |
| server.start(); |
| |
| locator = createInVMNonHALocator(); |
| |
| sf = locator.createSessionFactory(); |
| |
| session = sf.createSession(false, false); |
| |
| session.start(); |
| |
| cons = session.createConsumer(ADDRESS); |
| |
| for (int i = 2; i < 100; i++) { |
| log.debug("Received message " + i); |
| ClientMessage message = cons.receive(5000); |
| assertNotNull(message); |
| |
| assertEquals("str" + i, message.getStringProperty("id")); |
| |
| message.acknowledge(); |
| |
| message.setOutputStream(new OutputStream() { |
| @Override |
| public void write(int b) throws IOException { |
| |
| } |
| }); |
| |
| assertTrue(message.waitOutputStreamCompletion(5000)); |
| } |
| |
| assertNull(cons.receiveImmediate()); |
| |
| cons.close(); |
| |
| cons = session.createConsumer("DLA"); |
| |
| for (int msgNr = 0; msgNr < 2; msgNr++) { |
| ClientMessage msg = cons.receive(10000); |
| |
| assertNotNull(msg); |
| |
| assertEquals("str" + msgNr, msg.getStringProperty("id")); |
| |
| for (int i = 0; i < messageSize; i++) { |
| assertEquals(getSamplebyte(i), msg.getBodyBuffer().readByte()); |
| } |
| |
| msg.acknowledge(); |
| } |
| |
| cons.close(); |
| |
| cons = session.createConsumer(ADDRESS); |
| |
| session.commit(); |
| |
| assertNull(cons.receiveImmediate()); |
| |
| long timeout = System.currentTimeMillis() + 5000; |
| |
| pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS); |
| |
| pgStoreAddress.getCursorProvider().getSubscription(serverQueue.getID()).cleanupEntries(false); |
| |
| pgStoreAddress.getCursorProvider().cleanup(); |
| |
| while (timeout > System.currentTimeMillis() && pgStoreAddress.isPaging()) { |
| Thread.sleep(50); |
| } |
| |
| assertFalse(pgStoreAddress.isPaging()); |
| |
| session.commit(); |
| } finally { |
| session.close(); |
| sf.close(); |
| locator.close(); |
| try { |
| server.stop(); |
| } catch (Throwable ignored) { |
| } |
| } |
| } |
| |
| @Test |
| public void testExpireLargeMessageOnPaging() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setMessageExpiryScanPeriod(500).setJournalSyncNonTransactional(false); |
| |
| Map<String, AddressSettings> settings = new HashMap<>(); |
| AddressSettings dla = new AddressSettings().setMaxDeliveryAttempts(5).setDeadLetterAddress(new SimpleString("DLA")).setExpiryAddress(new SimpleString("DLA")); |
| settings.put(ADDRESS.toString(), dla); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings); |
| |
| server.start(); |
| |
| final int messageSize = 20; |
| |
| try { |
| ServerLocator locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| ClientSessionFactory sf = locator.createSessionFactory(); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(ADDRESS, ADDRESS, true); |
| |
| session.createQueue("DLA", "DLA", true); |
| |
| PagingStore pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS); |
| pgStoreAddress.startPaging(); |
| PagingStore pgStoreDLA = server.getPagingManager().getPageStore(new SimpleString("DLA")); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| for (int i = 0; i < 500; i++) { |
| if (i % 100 == 0) |
| log.info("send message #" + i); |
| message = session.createMessage(true); |
| |
| message.putStringProperty("id", "str" + i); |
| |
| message.setExpiration(System.currentTimeMillis() + 2000); |
| |
| if (i % 2 == 0) { |
| message.setBodyInputStream(createFakeLargeStream(messageSize)); |
| } else { |
| byte[] bytes = new byte[messageSize]; |
| for (int s = 0; s < bytes.length; s++) { |
| bytes[s] = getSamplebyte(s); |
| } |
| message.getBodyBuffer().writeBytes(bytes); |
| } |
| |
| producer.send(message); |
| |
| if ((i + 1) % 2 == 0) { |
| session.commit(); |
| if (i < 400) { |
| pgStoreAddress.forceAnotherPage(); |
| } |
| } |
| } |
| |
| session.commit(); |
| |
| sf.close(); |
| |
| locator.close(); |
| |
| server.stop(); |
| |
| Thread.sleep(3000); |
| |
| server.start(); |
| |
| locator = createInVMNonHALocator(); |
| |
| sf = locator.createSessionFactory(); |
| |
| session = sf.createSession(false, false); |
| |
| session.start(); |
| |
| ClientConsumer consAddr = session.createConsumer(ADDRESS); |
| |
| assertNull(consAddr.receive(1000)); |
| |
| ClientConsumer cons = session.createConsumer("DLA"); |
| |
| for (int i = 0; i < 500; i++) { |
| log.info("Received message " + i); |
| message = cons.receive(10000); |
| assertNotNull(message); |
| message.acknowledge(); |
| |
| message.saveToOutputStream(new OutputStream() { |
| @Override |
| public void write(int b) throws IOException { |
| |
| } |
| }); |
| } |
| |
| assertNull(cons.receiveImmediate()); |
| |
| session.commit(); |
| |
| cons.close(); |
| |
| long timeout = System.currentTimeMillis() + 5000; |
| |
| pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS); |
| |
| while (timeout > System.currentTimeMillis() && pgStoreAddress.isPaging()) { |
| Thread.sleep(50); |
| } |
| |
| assertFalse(pgStoreAddress.isPaging()); |
| |
| session.close(); |
| } finally { |
| locator.close(); |
| try { |
| server.stop(); |
| } catch (Throwable ignored) { |
| } |
| } |
| } |
| |
| @Test |
| /** |
| * When running this test from an IDE add this to the test command line so that the AssertionLoggerHandler works properly: |
| * |
| * -Djava.util.logging.manager=org.jboss.logmanager.LogManager -Dlogging.configuration=file:<path_to_source>/tests/config/logging.properties |
| * |
| * Note: Idea should get these from the pom and you shouldn't need to do this. |
| */ public void testFailMessagesNonDurable() throws Exception { |
| AssertionLoggerHandler.startCapture(); |
| |
| try { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig(); |
| |
| HashMap<String, AddressSettings> settings = new HashMap<>(); |
| |
| AddressSettings set = new AddressSettings(); |
| set.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); |
| |
| settings.put(PagingTest.ADDRESS.toString(), set); |
| |
| server = createServer(true, config, 1024, 5 * 1024, settings); |
| |
| server.start(); |
| |
| locator.setBlockOnNonDurableSend(false).setBlockOnDurableSend(false).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| ClientSession session = sf.createSession(true, true, 0); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = session.createMessage(false); |
| |
| int biggerMessageSize = 1024; |
| byte[] body = new byte[biggerMessageSize]; |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| for (int j = 1; j <= biggerMessageSize; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| message.getBodyBuffer().writeBytes(body); |
| |
| // Send enough messages to fill up the address, but don't test for an immediate exception because we do not block |
| // on non-durable send. Instead of receiving an exception immediately the exception will be logged on the server. |
| for (int i = 0; i < 32; i++) { |
| producer.send(message); |
| } |
| |
| // allow time for the logging to actually happen on the server |
| Thread.sleep(100); |
| |
| Assert.assertTrue("Expected to find AMQ224016", AssertionLoggerHandler.findText("AMQ224016")); |
| |
| ClientConsumer consumer = session.createConsumer(ADDRESS); |
| |
| session.start(); |
| |
| // Once the destination is full and the client has run out of credits then it will receive an exception |
| for (int i = 0; i < 10; i++) { |
| validateExceptionOnSending(producer, message); |
| } |
| |
| // Receive a message.. this should release credits |
| ClientMessage msgReceived = consumer.receive(5000); |
| assertNotNull(msgReceived); |
| msgReceived.acknowledge(); |
| session.commit(); // to make sure it's on the server (roundtrip) |
| |
| boolean exception = false; |
| |
| try { |
| for (int i = 0; i < 1000; i++) { |
| // this send will succeed on the server |
| producer.send(message); |
| } |
| } catch (Exception e) { |
| exception = true; |
| } |
| |
| assertTrue("Expected to throw an exception", exception); |
| } finally { |
| AssertionLoggerHandler.stopCapture(); |
| } |
| } |
| |
| @Test |
| public void testFailMessagesDurable() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig(); |
| |
| HashMap<String, AddressSettings> settings = new HashMap<>(); |
| |
| AddressSettings set = new AddressSettings(); |
| set.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); |
| |
| settings.put(PagingTest.ADDRESS.toString(), set); |
| |
| server = createServer(true, config, 1024, 5 * 1024, settings); |
| |
| server.start(); |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| ClientSession session = sf.createSession(true, true, 0); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = session.createMessage(true); |
| |
| int biggerMessageSize = 2048; |
| byte[] body = new byte[biggerMessageSize]; |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| for (int j = 1; j <= biggerMessageSize; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| message.getBodyBuffer().writeBytes(body); |
| |
| // Send enough messages to fill up the address and test for an exception. |
| // The address will actually fill up after 3 messages. Also, it takes 32 messages for the client's |
| // credits to run out. |
| for (int i = 0; i < 50; i++) { |
| if (i > 2) { |
| validateExceptionOnSending(producer, message); |
| } else { |
| producer.send(message); |
| } |
| } |
| |
| ClientConsumer consumer = session.createConsumer(ADDRESS); |
| |
| session.start(); |
| |
| // Receive a message.. this should release credits |
| ClientMessage msgReceived = consumer.receive(5000); |
| assertNotNull(msgReceived); |
| msgReceived.acknowledge(); |
| session.commit(); // to make sure it's on the server (roundtrip) |
| |
| boolean exception = false; |
| |
| try { |
| for (int i = 0; i < 1000; i++) { |
| // this send will succeed on the server |
| producer.send(message); |
| } |
| } catch (Exception e) { |
| exception = true; |
| } |
| |
| assertTrue("Expected to throw an exception", exception); |
| } |
| |
| @Test |
| public void testFailMessagesDuplicates() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig(); |
| |
| HashMap<String, AddressSettings> settings = new HashMap<>(); |
| |
| AddressSettings set = new AddressSettings(); |
| set.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); |
| |
| settings.put(PagingTest.ADDRESS.toString(), set); |
| |
| server = createServer(true, config, 1024, 5 * 1024, settings); |
| |
| server.start(); |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| ClientSession session = addClientSession(sf.createSession(true, true, 0)); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = session.createMessage(true); |
| |
| int biggerMessageSize = 2048; |
| byte[] body = new byte[biggerMessageSize]; |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| for (int j = 1; j <= biggerMessageSize; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| message.getBodyBuffer().writeBytes(body); |
| |
| // Send enough messages to fill up the address. |
| producer.send(message); |
| producer.send(message); |
| producer.send(message); |
| |
| Queue q = (Queue) server.getPostOffice().getBinding(ADDRESS).getBindable(); |
| Wait.assertEquals(3, q::getMessageCount); |
| |
| // send a message with a dup ID that should fail b/c the address is full |
| SimpleString dupID1 = new SimpleString("abcdefg"); |
| message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID1.getData()); |
| message.putStringProperty("key", dupID1.toString()); |
| |
| validateExceptionOnSending(producer, message); |
| |
| Wait.assertEquals(3, q::getMessageCount); |
| |
| ClientConsumer consumer = session.createConsumer(ADDRESS); |
| |
| session.start(); |
| |
| // Receive a message...this should open space for another message |
| ClientMessage msgReceived = consumer.receive(5000); |
| assertNotNull(msgReceived); |
| msgReceived.acknowledge(); |
| session.commit(); // to make sure it's on the server (roundtrip) |
| consumer.close(); |
| |
| Wait.assertEquals(2, q::getMessageCount); |
| |
| producer.send(message); |
| |
| Wait.assertEquals(3, q::getMessageCount); |
| |
| consumer = session.createConsumer(ADDRESS); |
| |
| for (int i = 0; i < 3; i++) { |
| msgReceived = consumer.receive(5000); |
| assertNotNull(msgReceived); |
| msgReceived.acknowledge(); |
| session.commit(); |
| } |
| } |
| |
| /** |
| * This method validates if sending a message will throw an exception |
| */ |
| private void validateExceptionOnSending(ClientProducer producer, ClientMessage message) { |
| ActiveMQException expected = null; |
| |
| try { |
| // after the address is full this send should fail (since the address full policy is FAIL) |
| producer.send(message); |
| } catch (ActiveMQException e) { |
| expected = e; |
| } |
| |
| assertNotNull(expected); |
| assertEquals(ActiveMQExceptionType.ADDRESS_FULL, expected.getType()); |
| } |
| |
| @Test |
| public void testSpreadMessagesWithFilterWithDeadConsumer() throws Exception { |
| testSpreadMessagesWithFilter(true); |
| } |
| |
| @Test |
| public void testSpreadMessagesWithFilterWithoutDeadConsumer() throws Exception { |
| testSpreadMessagesWithFilter(false); |
| } |
| |
| @Test |
| public void testRouteOnTopWithMultipleQueues() throws Exception { |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| ServerLocator locator = createInVMNonHALocator().setBlockOnDurableSend(false); |
| ClientSessionFactory sf = createSessionFactory(locator); |
| ClientSession session = sf.createSession(false, true, 0); |
| |
| session.createQueue("Q", "Q1", "dest=1", true); |
| session.createQueue("Q", "Q2", "dest=2", true); |
| session.createQueue("Q", "Q3", "dest=3", true); |
| |
| Queue queue = server.locateQueue(new SimpleString("Q1")); |
| queue.getPageSubscription().getPagingStore().startPaging(); |
| |
| ClientProducer prod = session.createProducer("Q"); |
| ClientMessage msg = session.createMessage(true); |
| msg.putIntProperty("dest", 1); |
| prod.send(msg); |
| session.commit(); |
| |
| msg = session.createMessage(true); |
| msg.putIntProperty("dest", 2); |
| prod.send(msg); |
| session.commit(); |
| |
| session.start(); |
| ClientConsumer cons1 = session.createConsumer("Q1"); |
| msg = cons1.receive(5000); |
| assertNotNull(msg); |
| msg.acknowledge(); |
| |
| ClientConsumer cons2 = session.createConsumer("Q2"); |
| msg = cons2.receive(5000); |
| assertNotNull(msg); |
| |
| queue.getPageSubscription().getPagingStore().forceAnotherPage(); |
| |
| msg = session.createMessage(true); |
| msg.putIntProperty("dest", 1); |
| prod.send(msg); |
| session.commit(); |
| |
| msg = cons1.receive(5000); |
| assertNotNull(msg); |
| msg.acknowledge(); |
| |
| queue.getPageSubscription().cleanupEntries(false); |
| |
| System.out.println("Waiting there"); |
| |
| server.stop(); |
| } |
| |
| // https://issues.jboss.org/browse/HORNETQ-1042 - spread messages because of filters |
| public void testSpreadMessagesWithFilter(boolean deadConsumer) throws Exception { |
| |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| try { |
| ServerLocator locator = createInVMNonHALocator(); |
| locator.setBlockOnDurableSend(false); |
| ClientSessionFactory sf = locator.createSessionFactory(); |
| ClientSession session = sf.createSession(true, false); |
| |
| session.createQueue(ADDRESS.toString(), "Q1", "destQ=1 or both=true", true); |
| session.createQueue(ADDRESS.toString(), "Q2", "destQ=2 or both=true", true); |
| |
| if (deadConsumer) { |
| // This queue won't receive any messages |
| session.createQueue(ADDRESS.toString(), "Q3", "destQ=3", true); |
| } |
| |
| session.createQueue(ADDRESS.toString(), "Q_initial", "initialBurst=true", true); |
| |
| ClientSession sessionConsumerQ3 = null; |
| |
| final AtomicInteger consumerQ3Msgs = new AtomicInteger(0); |
| |
| if (deadConsumer) { |
| sessionConsumerQ3 = sf.createSession(true, true); |
| ClientConsumer consumerQ3 = sessionConsumerQ3.createConsumer("Q3"); |
| |
| consumerQ3.setMessageHandler(new MessageHandler() { |
| |
| @Override |
| public void onMessage(ClientMessage message) { |
| System.out.println("Received an unexpected message"); |
| consumerQ3Msgs.incrementAndGet(); |
| } |
| }); |
| |
| sessionConsumerQ3.start(); |
| |
| } |
| |
| final int initialBurst = 100; |
| final int messagesSentAfterBurst = 100; |
| final int MESSAGE_SIZE = 300; |
| final byte[] bodyWrite = new byte[MESSAGE_SIZE]; |
| |
| Queue serverQueue = server.locateQueue(new SimpleString("Q1")); |
| PagingStore pageStore = serverQueue.getPageSubscription().getPagingStore(); |
| |
| ClientProducer producer = session.createProducer(ADDRESS); |
| |
| // send an initial burst that will put the system into page mode. The initial burst will be towards Q1 only |
| for (int i = 0; i < initialBurst; i++) { |
| ClientMessage m = session.createMessage(true); |
| m.getBodyBuffer().writeBytes(bodyWrite); |
| m.putIntProperty("destQ", 1); |
| m.putBooleanProperty("both", false); |
| m.putBooleanProperty("initialBurst", true); |
| producer.send(m); |
| if (i % 100 == 0) { |
| session.commit(); |
| } |
| } |
| |
| session.commit(); |
| |
| pageStore.forceAnotherPage(); |
| |
| for (int i = 0; i < messagesSentAfterBurst; i++) { |
| { |
| ClientMessage m = session.createMessage(true); |
| m.getBodyBuffer().writeBytes(bodyWrite); |
| m.putIntProperty("destQ", 1); |
| m.putBooleanProperty("initialBurst", false); |
| m.putIntProperty("i", i); |
| m.putBooleanProperty("both", i % 10 == 0); |
| producer.send(m); |
| } |
| |
| if (i % 10 != 0) { |
| ClientMessage m = session.createMessage(true); |
| m.getBodyBuffer().writeBytes(bodyWrite); |
| m.putIntProperty("destQ", 2); |
| m.putIntProperty("i", i); |
| m.putBooleanProperty("both", false); |
| m.putBooleanProperty("initialBurst", false); |
| producer.send(m); |
| } |
| |
| if (i > 0 && i % 10 == 0) { |
| session.commit(); |
| if (i + 10 < messagesSentAfterBurst) { |
| pageStore.forceAnotherPage(); |
| } |
| } |
| } |
| |
| session.commit(); |
| |
| ClientConsumer consumerQ1 = session.createConsumer("Q1"); |
| ClientConsumer consumerQ2 = session.createConsumer("Q2"); |
| session.start(); |
| |
| // consuming now |
| |
| // initial burst |
| |
| for (int i = 0; i < initialBurst; i++) { |
| ClientMessage m = consumerQ1.receive(5000); |
| assertNotNull(m); |
| assertEquals(1, m.getIntProperty("destQ").intValue()); |
| m.acknowledge(); |
| session.commit(); |
| } |
| |
| // This will consume messages from the beginning of the queue only |
| ClientConsumer consumerInitial = session.createConsumer("Q_initial"); |
| for (int i = 0; i < initialBurst; i++) { |
| ClientMessage m = consumerInitial.receive(5000); |
| assertNotNull(m); |
| assertEquals(1, m.getIntProperty("destQ").intValue()); |
| m.acknowledge(); |
| } |
| |
| assertNull(consumerInitial.receiveImmediate()); |
| session.commit(); |
| |
| // messages from Q1 |
| |
| for (int i = 0; i < messagesSentAfterBurst; i++) { |
| ClientMessage m = consumerQ1.receive(5000); |
| assertNotNull(m); |
| if (!m.getBooleanProperty("both")) { |
| assertEquals(1, m.getIntProperty("destQ").intValue()); |
| } |
| assertEquals(i, m.getIntProperty("i").intValue()); |
| m.acknowledge(); |
| |
| session.commit(); |
| } |
| |
| for (int i = 0; i < messagesSentAfterBurst; i++) { |
| ClientMessage m = consumerQ2.receive(5000); |
| assertNotNull(m); |
| |
| if (!m.getBooleanProperty("both")) { |
| assertEquals(2, m.getIntProperty("destQ").intValue()); |
| } |
| assertEquals(i, m.getIntProperty("i").intValue()); |
| m.acknowledge(); |
| |
| session.commit(); |
| } |
| |
| waitForNotPaging(serverQueue); |
| |
| if (sessionConsumerQ3 != null) { |
| sessionConsumerQ3.close(); |
| } |
| assertEquals(0, consumerQ3Msgs.intValue()); |
| |
| session.close(); |
| locator.close(); |
| |
| } finally { |
| server.stop(); |
| } |
| } |
| |
| // We send messages to pages, create a big hole (a few pages without any messages), ack everything |
| // and expect it to move to the next page |
| @Test |
| public void testPageHole() throws Throwable { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| try { |
| ServerLocator locator = createInVMNonHALocator().setBlockOnDurableSend(true); |
| ClientSessionFactory sf = locator.createSessionFactory(); |
| ClientSession session = sf.createSession(true, true, 0); |
| |
| session.createQueue(ADDRESS.toString(), "Q1", "dest=1", true); |
| session.createQueue(ADDRESS.toString(), "Q2", "dest=2", true); |
| |
| PagingStore store = server.getPagingManager().getPageStore(ADDRESS); |
| |
| store.startPaging(); |
| |
| ClientProducer prod = session.createProducer(ADDRESS); |
| |
| ClientMessage msg = session.createMessage(true); |
| msg.putIntProperty("dest", 1); |
| prod.send(msg); |
| |
| for (int i = 0; i < 100; i++) { |
| msg = session.createMessage(true); |
| msg.putIntProperty("dest", 2); |
| prod.send(msg); |
| |
| if (i > 0 && i % 10 == 0) { |
| store.forceAnotherPage(); |
| } |
| } |
| |
| session.start(); |
| |
| ClientConsumer cons1 = session.createConsumer("Q1"); |
| |
| ClientMessage msgReceivedCons1 = cons1.receive(5000); |
| assertNotNull(msgReceivedCons1); |
| msgReceivedCons1.acknowledge(); |
| |
| ClientConsumer cons2 = session.createConsumer("Q2"); |
| for (int i = 0; i < 100; i++) { |
| ClientMessage msgReceivedCons2 = cons2.receive(1000); |
| assertNotNull(msgReceivedCons2); |
| msgReceivedCons2.acknowledge(); |
| |
| session.commit(); |
| |
| // It will send another message when it's mid consumed |
| if (i == 20) { |
| // wait at least one page to be deleted before sending a new one |
| for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis() && store.checkPageFileExists(2); ) { |
| Thread.sleep(10); |
| } |
| msg = session.createMessage(true); |
| msg.putIntProperty("dest", 1); |
| prod.send(msg); |
| } |
| } |
| |
| msgReceivedCons1 = cons1.receive(5000); |
| assertNotNull(msgReceivedCons1); |
| msgReceivedCons1.acknowledge(); |
| |
| assertNull(cons1.receiveImmediate()); |
| assertNull(cons2.receiveImmediate()); |
| |
| session.commit(); |
| |
| session.close(); |
| |
| waitForNotPaging(store); |
| } finally { |
| server.stop(); |
| } |
| |
| } |
| |
| @Test |
| public void testMultiFiltersBrowsing() throws Throwable { |
| internalTestMultiFilters(true); |
| } |
| |
| @Test |
| public void testMultiFiltersRegularConsumer() throws Throwable { |
| internalTestMultiFilters(false); |
| } |
| |
| @Test |
| public void testPageEmptyFile() throws Exception { |
| boolean persistentMessages = true; |
| |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int messageSize = 1024; |
| |
| final int numberOfMessages = 100; |
| |
| try { |
| ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(120000).setConnectionTTL(5000000).setCallTimeout(120000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| ClientSessionFactory sf = locator.createSessionFactory(); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| PagingStore store = server.getPagingManager().getPageStore(ADDRESS); |
| store.forceAnotherPage(); |
| store.forceAnotherPage(); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[messageSize]; |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(persistentMessages); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| producer.send(message); |
| } |
| |
| session.commit(); |
| |
| Queue queue = server.locateQueue(PagingTest.ADDRESS); |
| Wait.assertEquals(numberOfMessages, queue::getMessageCount); |
| |
| store.forceAnotherPage(); |
| |
| session.start(); |
| |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = consumer.receive(5000); |
| assertNotNull(message); |
| message.acknowledge(); |
| } |
| |
| session.commit(); |
| |
| assertNull(consumer.receiveImmediate()); |
| |
| consumer.close(); |
| |
| store.getCursorProvider().cleanup(); |
| |
| Wait.assertEquals(0, queue::getMessageCount); |
| |
| long timeout = System.currentTimeMillis() + 5000; |
| while (store.isPaging() && timeout > System.currentTimeMillis()) { |
| Thread.sleep(100); |
| } |
| |
| store.getCursorProvider().cleanup(); |
| |
| sf.close(); |
| |
| locator.close(); |
| |
| Wait.assertEquals(2, store::getNumberOfPages); |
| |
| } finally { |
| try { |
| server.stop(); |
| } catch (Throwable ignored) { |
| } |
| } |
| } |
| |
| public void internalTestMultiFilters(boolean browsing) throws Throwable { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| try { |
| ServerLocator locator = createInVMNonHALocator().setBlockOnDurableSend(true); |
| ClientSessionFactory sf = locator.createSessionFactory(); |
| ClientSession session = sf.createSession(true, true, 0); |
| |
| session.createQueue(ADDRESS.toString(), "Q1", null, true); |
| |
| PagingStore store = server.getPagingManager().getPageStore(ADDRESS); |
| |
| ClientProducer prod = session.createProducer(ADDRESS); |
| |
| ClientMessage msg = null; |
| store.startPaging(); |
| |
| for (int i = 0; i < 100; i++) { |
| msg = session.createMessage(true); |
| msg.putStringProperty("color", "red"); |
| msg.putIntProperty("count", i); |
| prod.send(msg); |
| |
| if (i > 0 && i % 10 == 0) { |
| store.startPaging(); |
| store.forceAnotherPage(); |
| } |
| } |
| |
| for (int i = 0; i < 100; i++) { |
| msg = session.createMessage(true); |
| msg.putStringProperty("color", "green"); |
| msg.putIntProperty("count", i); |
| prod.send(msg); |
| |
| if (i > 0 && i % 10 == 0) { |
| store.startPaging(); |
| store.forceAnotherPage(); |
| } |
| } |
| |
| session.commit(); |
| |
| session.close(); |
| |
| session = sf.createSession(false, false, 0); |
| session.start(); |
| |
| ClientConsumer cons1; |
| |
| if (browsing) { |
| cons1 = session.createConsumer("Q1", "color='green'", true); |
| } else { |
| cons1 = session.createConsumer("Q1", "color='red'", false); |
| } |
| |
| for (int i = 0; i < 100; i++) { |
| msg = cons1.receive(5000); |
| |
| System.out.println("Received " + msg); |
| assertNotNull(msg); |
| if (!browsing) { |
| msg.acknowledge(); |
| } |
| } |
| |
| session.commit(); |
| |
| session.close(); |
| } finally { |
| server.stop(); |
| } |
| |
| } |
| |
| @Test |
| public void testPendingACKOutOfOrder() throws Throwable { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| try { |
| ServerLocator locator = createInVMNonHALocator(); |
| locator.setBlockOnDurableSend(false); |
| ClientSessionFactory sf = locator.createSessionFactory(); |
| ClientSession session = sf.createSession(true, true, 0); |
| |
| session.createQueue(ADDRESS.toString(), "Q1", true); |
| |
| PagingStore store = server.getPagingManager().getPageStore(ADDRESS); |
| |
| store.startPaging(); |
| |
| ClientProducer prod = session.createProducer(ADDRESS); |
| |
| for (int i = 0; i < 100; i++) { |
| ClientMessage msg = session.createMessage(true); |
| msg.putIntProperty("count", i); |
| prod.send(msg); |
| session.commit(); |
| if ((i + 1) % 5 == 0 && i < 50) { |
| store.forceAnotherPage(); |
| } |
| } |
| |
| session.start(); |
| |
| ClientConsumer cons1 = session.createConsumer("Q1"); |
| |
| for (int i = 0; i < 100; i++) { |
| ClientMessage msg = cons1.receive(5000); |
| assertNotNull(msg); |
| |
| if (i == 13) { |
| msg.individualAcknowledge(); |
| } |
| } |
| |
| session.close(); |
| |
| locator.close(); |
| |
| server.stop(); |
| |
| server.start(); |
| |
| store = server.getPagingManager().getPageStore(ADDRESS); |
| |
| locator = createInVMNonHALocator(); |
| |
| sf = locator.createSessionFactory(); |
| |
| session = sf.createSession(true, true, 0); |
| cons1 = session.createConsumer("Q1"); |
| session.start(); |
| |
| for (int i = 0; i < 99; i++) { |
| ClientMessage msg = cons1.receive(5000); |
| assertNotNull(msg); |
| System.out.println("count = " + msg.getIntProperty("count")); |
| msg.acknowledge(); |
| } |
| |
| assertNull(cons1.receiveImmediate()); |
| |
| session.close(); |
| waitForNotPaging(store); |
| } finally { |
| server.stop(); |
| } |
| |
| } |
| |
| // Test a scenario where a page was complete and now needs to be cleared |
| @Test |
| public void testPageCompleteWasLive() throws Throwable { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| try { |
| ServerLocator locator = createInVMNonHALocator(); |
| locator.setBlockOnDurableSend(false); |
| ClientSessionFactory sf = locator.createSessionFactory(); |
| ClientSession session = sf.createSession(true, true, 0); |
| |
| session.createQueue(ADDRESS.toString(), "Q1", "dest=1", true); |
| session.createQueue(ADDRESS.toString(), "Q2", "dest=2", true); |
| |
| PagingStore store = server.getPagingManager().getPageStore(ADDRESS); |
| |
| store.startPaging(); |
| |
| ClientProducer prod = session.createProducer(ADDRESS); |
| |
| ClientMessage msg = session.createMessage(true); |
| msg.putIntProperty("dest", 1); |
| prod.send(msg); |
| |
| msg = session.createMessage(true); |
| msg.putIntProperty("dest", 2); |
| prod.send(msg); |
| |
| session.start(); |
| |
| ClientConsumer cons1 = session.createConsumer("Q1"); |
| |
| ClientMessage msgReceivedCons1 = cons1.receive(1000); |
| |
| assertNotNull(msgReceivedCons1); |
| |
| ClientConsumer cons2 = session.createConsumer("Q2"); |
| ClientMessage msgReceivedCons2 = cons2.receive(1000); |
| assertNotNull(msgReceivedCons2); |
| |
| store.forceAnotherPage(); |
| |
| msg = session.createMessage(true); |
| |
| msg.putIntProperty("dest", 1); |
| |
| prod.send(msg); |
| |
| msgReceivedCons1.acknowledge(); |
| |
| msgReceivedCons1 = cons1.receive(1000); |
| assertNotNull(msgReceivedCons1); |
| msgReceivedCons1.acknowledge(); |
| msgReceivedCons2.acknowledge(); |
| |
| assertNull(cons1.receiveImmediate()); |
| assertNull(cons2.receiveImmediate()); |
| |
| session.commit(); |
| |
| session.close(); |
| |
| waitForNotPaging(store); |
| } finally { |
| server.stop(); |
| } |
| |
| } |
| |
| @Test |
| public void testNoCursors() throws Exception { |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| ServerLocator locator = createInVMNonHALocator(); |
| ClientSessionFactory sf = locator.createSessionFactory(); |
| ClientSession session = sf.createSession(); |
| |
| session.createQueue(ADDRESS, ADDRESS, true); |
| ClientProducer prod = session.createProducer(ADDRESS); |
| |
| for (int i = 0; i < 100; i++) { |
| Message msg = session.createMessage(true); |
| msg.toCore().getBodyBuffer().writeBytes(new byte[1024]); |
| prod.send(msg); |
| } |
| |
| session.commit(); |
| |
| session.deleteQueue(ADDRESS); |
| session.close(); |
| sf.close(); |
| locator.close(); |
| server.stop(); |
| server.start(); |
| waitForNotPaging(server.getPagingManager().getPageStore(ADDRESS)); |
| server.stop(); |
| |
| } |
| |
| // Test a scenario where a page was complete and now needs to be cleared |
| @Test |
| public void testMoveMessages() throws Throwable { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int LARGE_MESSAGE_SIZE = 1024 * 1024; |
| |
| try { |
| ServerLocator locator = createInVMNonHALocator(); |
| locator.setBlockOnDurableSend(false); |
| ClientSessionFactory sf = locator.createSessionFactory(); |
| ClientSession session = sf.createSession(true, true, 0); |
| |
| session.createQueue("Q1", "Q1", true); |
| session.createQueue("Q2", "Q2", true); |
| |
| PagingStore store = server.getPagingManager().getPageStore(new SimpleString("Q1")); |
| |
| ClientProducer prod = session.createProducer("Q1"); |
| |
| for (int i = 0; i < 50; i++) { |
| ClientMessage msg = session.createMessage(true); |
| msg.putIntProperty("count", i); |
| if (i > 0 && i % 10 == 0) { |
| msg.setBodyInputStream(createFakeLargeStream(LARGE_MESSAGE_SIZE)); |
| } |
| prod.send(msg); |
| } |
| session.commit(); |
| |
| store.startPaging(); |
| for (int i = 50; i < 100; i++) { |
| ClientMessage msg = session.createMessage(true); |
| msg.putIntProperty("count", i); |
| if (i % 10 == 0) { |
| msg.setBodyInputStream(createFakeLargeStream(LARGE_MESSAGE_SIZE)); |
| } |
| prod.send(msg); |
| if (i % 10 == 0) { |
| session.commit(); |
| store.forceAnotherPage(); |
| } |
| } |
| session.commit(); |
| |
| Queue queue = server.locateQueue(new SimpleString("Q1")); |
| |
| queue.moveReferences(10, (Filter) null, new SimpleString("Q2"), false, server.getPostOffice().getBinding(new SimpleString("Q2"))); |
| |
| waitForNotPaging(store); |
| |
| session.close(); |
| locator.close(); |
| |
| server.stop(); |
| server.start(); |
| |
| locator = createInVMNonHALocator(); |
| locator.setBlockOnDurableSend(false); |
| sf = locator.createSessionFactory(); |
| session = sf.createSession(true, true, 0); |
| |
| session.start(); |
| |
| ClientConsumer cons = session.createConsumer("Q2"); |
| |
| for (int i = 0; i < 100; i++) { |
| ClientMessage msg = cons.receive(10000); |
| assertNotNull(msg); |
| if (i > 0 && i % 10 == 0) { |
| byte[] largeMessageRead = new byte[LARGE_MESSAGE_SIZE]; |
| msg.getBodyBuffer().readBytes(largeMessageRead); |
| for (int j = 0; j < LARGE_MESSAGE_SIZE; j++) { |
| assertEquals(largeMessageRead[j], getSamplebyte(j)); |
| } |
| } |
| msg.acknowledge(); |
| assertEquals(i, msg.getIntProperty("count").intValue()); |
| } |
| |
| assertNull(cons.receiveImmediate()); |
| |
| waitForNotPaging(server.locateQueue(new SimpleString("Q2"))); |
| |
| session.close(); |
| sf.close(); |
| locator.close(); |
| |
| } finally { |
| server.stop(); |
| } |
| |
| } |
| |
| @Test |
| public void testOnlyOnePageOnServerCrash() throws Throwable { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig(); |
| |
| class NonStoppablePagingStoreImpl extends PagingStoreImpl { |
| |
| NonStoppablePagingStoreImpl(SimpleString address, |
| ScheduledExecutorService scheduledExecutor, |
| long syncTimeout, |
| PagingManager pagingManager, |
| StorageManager storageManager, |
| SequentialFileFactory fileFactory, |
| PagingStoreFactory storeFactory, |
| SimpleString storeName, |
| AddressSettings addressSettings, |
| ArtemisExecutor executor, |
| boolean syncNonTransactional) { |
| super(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, fileFactory, storeFactory, storeName, addressSettings, executor, syncNonTransactional); |
| } |
| |
| /** |
| * Normal stopping will cleanup non tx page subscription counter which will not trigger the bug. |
| * Here we override stop to simulate server crash. |
| * @throws Exception |
| */ |
| @Override |
| public synchronized void stop() throws Exception { |
| } |
| } |
| |
| if (storeType == StoreConfiguration.StoreType.DATABASE) { |
| server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) { |
| @Override |
| protected PagingStoreFactoryDatabase getPagingStoreFactory() throws Exception { |
| return new PagingStoreFactoryDatabase((DatabaseStorageConfiguration) this.getConfiguration().getStoreConfiguration(), this.getStorageManager(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) { |
| @Override |
| public synchronized PagingStore newStore(SimpleString address, AddressSettings settings) { |
| return new NonStoppablePagingStoreImpl(address, this.getScheduledExecutor(), config.getJournalBufferTimeout_NIO(), getPagingManager(), getStorageManager(), null, this, address, settings, getExecutorFactory().getExecutor(), this.syncNonTransactional); |
| } |
| }; |
| } |
| }; |
| } else { |
| server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) { |
| @Override |
| protected PagingStoreFactoryNIO getPagingStoreFactory() { |
| return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) { |
| @Override |
| public synchronized PagingStore newStore(SimpleString address, AddressSettings settings) { |
| return new NonStoppablePagingStoreImpl(address, this.getScheduledExecutor(), config.getJournalBufferTimeout_NIO(), getPagingManager(), getStorageManager(), null, this, address, settings, getExecutorFactory().getExecutor(), this.isSyncNonTransactional()); |
| } |
| }; |
| } |
| }; |
| } |
| |
| addServer(server); |
| |
| AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(PagingTest.PAGE_SIZE).setMaxSizeBytes(PagingTest.PAGE_SIZE + MESSAGE_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); |
| |
| server.getAddressSettingsRepository().addMatch("#", defaultSetting); |
| |
| server.start(); |
| |
| // Here we send some messages to ensure the queue start paging and create only one page |
| final int numberOfMessages = 12; |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, true, false); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= MESSAGE_SIZE; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| message.putIntProperty("count", i); |
| |
| producer.send(message); |
| |
| } |
| producer.close(); |
| session.close(); |
| |
| Queue queue = server.locateQueue(PagingTest.ADDRESS); |
| Wait.assertEquals(numberOfMessages, queue::getMessageCount); |
| Wait.assertEquals(1, ()->server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages()); |
| |
| sf.close(); |
| |
| server.stop(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_SIZE + MESSAGE_SIZE); |
| server.start(); |
| |
| sf = createSessionFactory(locator); |
| |
| session = sf.createSession(false, false, false); |
| |
| ClientConsumer consumer = session.createConsumer(ADDRESS); |
| session.start(); |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| ClientMessage msg = consumer.receive(1000); |
| assertNotNull(i + "th msg is null", msg); |
| assertEquals(i, msg.getIntProperty("count").intValue()); |
| msg.acknowledge(); |
| System.out.println(msg); |
| } |
| |
| assertNull(consumer.receiveImmediate()); |
| session.commit(); |
| |
| session.close(); |
| sf.close(); |
| locator.close(); |
| server.stop(); |
| } |
| |
| @Test |
| public void testPagingStoreDestroyed() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 5000; |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= MESSAGE_SIZE; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| |
| bodyLocal.writeBytes(body); |
| |
| producer.send(message); |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| producer.close(); |
| assertTrue(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS)); |
| assertTrue(server.getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging()); |
| |
| session.deleteQueue(PagingTest.ADDRESS); |
| session.close(); |
| sf.close(); |
| locator.close(); |
| locator = null; |
| sf = null; |
| assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS)); |
| // Ensure pagingStore is physically deleted |
| server.getPagingManager().reloadStores(); |
| assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS)); |
| server.stop(); |
| |
| server.start(); |
| assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS)); |
| // Ensure pagingStore is physically deleted |
| server.getPagingManager().reloadStores(); |
| assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS)); |
| server.stop(); |
| } |
| |
| @Test |
| public void testStopPagingWithoutConsumersIfTwoPages() throws Exception { |
| testStopPagingWithoutConsumersOnOneQueue(true); |
| } |
| |
| @Test |
| public void testStopPagingWithoutConsumersIfOnePage() throws Exception { |
| testStopPagingWithoutConsumersOnOneQueue(false); |
| } |
| |
| private void testStopPagingWithoutConsumersOnOneQueue(boolean forceAnotherPage) throws Exception { |
| boolean persistentMessages = true; |
| |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| try { |
| ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(120000).setConnectionTTL(5000000).setCallTimeout(120000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| ClientSessionFactory sf = locator.createSessionFactory(); |
| ClientSession session = sf.createSession(false, false, false); |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), SimpleString.toSimpleString("destQ=1 or both=true"), true); |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), SimpleString.toSimpleString("destQ=2 or both=true"), true); |
| PagingStore store = server.getPagingManager().getPageStore(ADDRESS); |
| Queue queue = server.locateQueue(PagingTest.ADDRESS.concat("=1")); |
| queue.getPageSubscription().getPagingStore().startPaging(); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| ClientMessage message = session.createMessage(persistentMessages); |
| message.putBooleanProperty("both", true); |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| bodyLocal.writeBytes(new byte[1024]); |
| producer.send(message); |
| session.commit(); |
| session.start(); |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=2")); |
| message = consumer.receive(5000); |
| assertNotNull(message); |
| message.acknowledge(); |
| assertNull(consumer.receiveImmediate()); |
| consumer.close(); |
| session.commit(); |
| |
| if (forceAnotherPage) { |
| queue.getPageSubscription().getPagingStore().forceAnotherPage(); |
| } |
| |
| message = session.createMessage(persistentMessages); |
| message.putIntProperty("destQ", 1); |
| bodyLocal = message.getBodyBuffer(); |
| bodyLocal.writeBytes(new byte[1024]); |
| producer.send(message); |
| session.commit(); |
| |
| consumer = session.createConsumer(PagingTest.ADDRESS.concat("=1")); |
| for (int i = 0; i < 2; i++) { |
| message = consumer.receive(5000); |
| assertNotNull(message); |
| message.acknowledge(); |
| session.commit(); |
| } |
| assertNull(consumer.receiveImmediate()); |
| consumer.close(); |
| session.close(); |
| |
| store.getCursorProvider().cleanup(); |
| waitForNotPaging(server.locateQueue(PagingTest.ADDRESS.concat("=1"))); |
| sf.close(); |
| locator.close(); |
| } finally { |
| try { |
| server.stop(); |
| } catch (Throwable ignored) { |
| } |
| } |
| } |
| |
| @Test |
| public void testStopPagingWithoutMsgsOnOneQueue() throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 500; |
| |
| locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| |
| ClientSession session = sf.createSession(false, false, false); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), SimpleString.toSimpleString("destQ=1"), true); |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), SimpleString.toSimpleString("destQ=2"), true); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| ClientConsumer consumer1 = session.createConsumer(PagingTest.ADDRESS.concat("=1")); |
| session.start(); |
| ClientSession session2 = sf.createSession(false, false, false); |
| ClientConsumer consumer2 = session2.createConsumer(PagingTest.ADDRESS.concat("=2")); |
| session2.start(); |
| |
| ClientMessage message = null; |
| |
| byte[] body = new byte[MESSAGE_SIZE]; |
| |
| ByteBuffer bb = ByteBuffer.wrap(body); |
| |
| for (int j = 1; j <= MESSAGE_SIZE; j++) { |
| bb.put(getSamplebyte(j)); |
| } |
| |
| /** |
| * Here we first send messages and consume them to move every subscription to the next bookmarked page. |
| * Then we send messages and consume them again, expecting paging is stopped normally. |
| */ |
| for (int x = 0; x < 2; x++) { |
| for (int i = 0; i < numberOfMessages; i++) { |
| message = session.createMessage(true); |
| message.putIntProperty("destQ", 1); |
| ActiveMQBuffer bodyLocal = message.getBodyBuffer(); |
| bodyLocal.writeBytes(body); |
| producer.send(message); |
| if (i % 1000 == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| assertTrue(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS)); |
| assertTrue(server.getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging()); |
| for (int i = 0; i < numberOfMessages; i++) { |
| ClientMessage msg = consumer1.receive(1000); |
| assertNotNull(msg); |
| msg.acknowledge(); |
| if (i % 500 == 0) { |
| session.commit(); |
| } |
| } |
| session.commit(); |
| assertNull(consumer1.receiveImmediate()); |
| waitForNotPaging(server.locateQueue(PagingTest.ADDRESS.concat("=1"))); |
| } |
| |
| producer.close(); |
| consumer1.close(); |
| consumer2.close(); |
| session.close(); |
| session2.close(); |
| sf.close(); |
| locator.close(); |
| locator = null; |
| sf = null; |
| server.stop(); |
| } |
| |
| // We send messages to page, evict live page cache, send last message when mid consumed, and expect to receive all messages |
| @Test |
| public void testLivePageCacheEvicted() throws Throwable { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| server.start(); |
| |
| try { |
| ServerLocator locator = createInVMNonHALocator().setBlockOnDurableSend(true); |
| ClientSessionFactory sf = locator.createSessionFactory(); |
| ClientSession session = sf.createSession(true, true, 0); |
| |
| session.createQueue(ADDRESS, ADDRESS, null, true); |
| |
| PagingStore store = server.getPagingManager().getPageStore(ADDRESS); |
| store.startPaging(); |
| |
| ClientProducer prod = session.createProducer(ADDRESS); |
| int num = 10; |
| for (int i = 0; i < num; i++) { |
| ClientMessage msg = session.createMessage(true); |
| msg.putIntProperty("index", i); |
| prod.send(msg); |
| } |
| |
| session.start(); |
| ClientConsumer cons = session.createConsumer(ADDRESS); |
| ClientMessage msgReceivedCons = null; |
| // simulate the live page cache evicted |
| store.getCursorProvider().clearCache(); |
| for (int i = 0; i < num; i++) { |
| msgReceivedCons = cons.receive(1000); |
| assertNotNull(msgReceivedCons); |
| assertTrue(msgReceivedCons.getIntProperty("index") == i); |
| msgReceivedCons.acknowledge(); |
| |
| session.commit(); |
| |
| if (i == num / 2) { |
| ClientMessage msg = session.createMessage(true); |
| msg.putIntProperty("index", num); |
| prod.send(msg); |
| } |
| } |
| |
| msgReceivedCons = cons.receive(1000); |
| assertNotNull(msgReceivedCons); |
| assertTrue(msgReceivedCons.getIntProperty("index") == num); |
| msgReceivedCons.acknowledge(); |
| |
| assertNull(cons.receiveImmediate()); |
| |
| session.commit(); |
| session.close(); |
| |
| waitForNotPaging(store); |
| } finally { |
| server.stop(); |
| } |
| } |
| |
| @Test |
| public void testRollbackPageTransactionBeforeDelivery() throws Exception { |
| testRollbackPageTransaction(true); |
| } |
| |
| @Test |
| public void testRollbackPageTransactionAfterDelivery() throws Exception { |
| testRollbackPageTransaction(false); |
| } |
| |
| private void testRollbackPageTransaction(boolean rollbackBeforeDelivery) throws Exception { |
| clearDataRecreateServerDirs(); |
| |
| Configuration config = createDefaultInVMConfig(); |
| |
| server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); |
| |
| server.start(); |
| |
| final int numberOfMessages = 2; |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); |
| |
| sf = createSessionFactory(locator); |
| ClientSession session = sf.createSession(null, null, false, false, true, false, 0); |
| |
| session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); |
| |
| Queue queue = server.locateQueue(PagingTest.ADDRESS); |
| |
| queue.getPageSubscription().getPagingStore().startPaging(); |
| |
| ClientProducer producer = session.createProducer(PagingTest.ADDRESS); |
| |
| if (rollbackBeforeDelivery) { |
| sendMessages(session, producer, numberOfMessages); |
| session.rollback(); |
| assertEquals(server.getPagingManager().getTransactions().size(), 1); |
| PageTransactionInfo pageTransactionInfo = server.getPagingManager().getTransactions().values().iterator().next(); |
| // Make sure rollback happens before delivering messages |
| Wait.assertTrue(() -> pageTransactionInfo.isRollback(), 1000, 100); |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); |
| session.start(); |
| Assert.assertNull(consumer.receiveImmediate()); |
| assertTrue(server.getPagingManager().getTransactions().isEmpty()); |
| } else { |
| ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); |
| session.start(); |
| sendMessages(session, producer, numberOfMessages); |
| Assert.assertNull(consumer.receiveImmediate()); |
| assertEquals(server.getPagingManager().getTransactions().size(), 1); |
| PageTransactionInfo pageTransactionInfo = server.getPagingManager().getTransactions().values().iterator().next(); |
| session.rollback(); |
| Wait.assertTrue(() -> pageTransactionInfo.isRollback(), 1000, 100); |
| assertTrue(server.getPagingManager().getTransactions().isEmpty()); |
| } |
| |
| session.close(); |
| } |
| |
| @Override |
| protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception { |
| Configuration configuration = super.createDefaultConfig(serverID, netty); |
| if (storeType == StoreConfiguration.StoreType.DATABASE) { |
| setDBStoreType(configuration); |
| } else if (mapped) { |
| configuration.setJournalType(JournalType.MAPPED); |
| } |
| return configuration; |
| } |
| |
| private static final class DummyOperationContext implements OperationContext { |
| |
| private final CountDownLatch pageUp; |
| private final CountDownLatch pageDone; |
| |
| private DummyOperationContext(CountDownLatch pageUp, CountDownLatch pageDone) { |
| this.pageDone = pageDone; |
| this.pageUp = pageUp; |
| } |
| |
| @Override |
| public void onError(int errorCode, String errorMessage) { |
| } |
| |
| @Override |
| public void done() { |
| } |
| |
| @Override |
| public void storeLineUp() { |
| } |
| |
| @Override |
| public boolean waitCompletion(long timeout) throws Exception { |
| return false; |
| } |
| |
| @Override |
| public void waitCompletion() throws Exception { |
| |
| } |
| |
| @Override |
| public void replicationLineUp() { |
| |
| } |
| |
| @Override |
| public void replicationDone() { |
| |
| } |
| |
| @Override |
| public void pageSyncLineUp() { |
| pageUp.countDown(); |
| } |
| |
| @Override |
| public void pageSyncDone() { |
| pageDone.countDown(); |
| } |
| |
| @Override |
| public void executeOnCompletion(IOCallback runnable) { |
| runnable.done(); |
| } |
| |
| @Override |
| public void executeOnCompletion(IOCallback runnable, boolean storeOnly) { |
| runnable.done(); |
| } |
| } |
| } |