| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.artemis.tests.integration.client; |
| |
| import javax.jms.Connection; |
| import javax.jms.JMSException; |
| import javax.jms.MessageConsumer; |
| import javax.jms.Session; |
| import javax.transaction.xa.XAResource; |
| import javax.transaction.xa.Xid; |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.activemq.artemis.api.core.ActiveMQException; |
| import org.apache.activemq.artemis.api.core.Interceptor; |
| import org.apache.activemq.artemis.api.core.Message; |
| import org.apache.activemq.artemis.api.core.QueueConfiguration; |
| import org.apache.activemq.artemis.api.core.RoutingType; |
| import org.apache.activemq.artemis.api.core.SimpleString; |
| import org.apache.activemq.artemis.api.core.client.ActiveMQClient; |
| import org.apache.activemq.artemis.api.core.client.ClientConsumer; |
| import org.apache.activemq.artemis.api.core.client.ClientMessage; |
| import org.apache.activemq.artemis.api.core.client.ClientProducer; |
| import org.apache.activemq.artemis.api.core.client.ClientSession; |
| import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; |
| import org.apache.activemq.artemis.api.core.client.ServerLocator; |
| import org.apache.activemq.artemis.core.config.StoreConfiguration; |
| import org.apache.activemq.artemis.core.filter.Filter; |
| import org.apache.activemq.artemis.core.filter.impl.FilterImpl; |
| import org.apache.activemq.artemis.core.paging.PagingManager; |
| import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; |
| import org.apache.activemq.artemis.core.persistence.StorageManager; |
| import org.apache.activemq.artemis.core.postoffice.PostOffice; |
| import org.apache.activemq.artemis.core.protocol.core.Packet; |
| import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionContinuationMessage; |
| import org.apache.activemq.artemis.core.server.ActiveMQServer; |
| import org.apache.activemq.artemis.core.server.MessageReference; |
| import org.apache.activemq.artemis.core.server.Queue; |
| import org.apache.activemq.artemis.core.server.QueueConfig; |
| import org.apache.activemq.artemis.core.server.QueueFactory; |
| import org.apache.activemq.artemis.core.server.impl.AckReason; |
| import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; |
| import org.apache.activemq.artemis.core.server.impl.QueueFactoryImpl; |
| import org.apache.activemq.artemis.core.server.impl.QueueImpl; |
| import org.apache.activemq.artemis.core.settings.HierarchicalRepository; |
| import org.apache.activemq.artemis.core.settings.impl.AddressSettings; |
| import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; |
| import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; |
| import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase; |
| import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; |
| import org.apache.activemq.artemis.utils.ExecutorFactory; |
| import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; |
| import org.jboss.logging.Logger; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| public class InterruptedLargeMessageTest extends LargeMessageTestBase { |
| // Constants ----------------------------------------------------- |
| |
| static final int RECEIVE_WAIT_TIME = 60000; |
| |
| private static final Logger log = Logger.getLogger(InterruptedLargeMessageTest.class); |
| |
| private final int LARGE_MESSAGE_SIZE = ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 3; |
| |
| protected ServerLocator locator; |
| |
| public InterruptedLargeMessageTest(StoreConfiguration.StoreType storeType) { |
| super(storeType); |
| } |
| |
| @Override |
| @Before |
| public void setUp() throws Exception { |
| super.setUp(); |
| LargeMessageTestInterceptorIgnoreLastPacket.clearInterrupt(); |
| locator = createFactory(isNetty()); |
| } |
| |
| protected boolean isNetty() { |
| return false; |
| } |
| |
| @Test |
| public void testInterruptLargeMessageSend() throws Exception { |
| |
| ClientSession session = null; |
| |
| LargeMessageTestInterceptorIgnoreLastPacket.clearInterrupt(); |
| ActiveMQServer server = createServer(true, isNetty()); |
| |
| server.getConfiguration().getIncomingInterceptorClassNames().add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName()); |
| |
| server.start(); |
| |
| locator.setBlockOnNonDurableSend(false).setBlockOnDurableSend(false); |
| |
| ClientSessionFactory sf = createSessionFactory(locator); |
| |
| session = sf.createSession(false, true, true); |
| |
| session.createQueue(new QueueConfiguration(ADDRESS)); |
| |
| ClientProducer producer = session.createProducer(ADDRESS); |
| |
| Message clientFile = createLargeClientMessageStreaming(session, LARGE_MESSAGE_SIZE, true); |
| |
| clientFile.setExpiration(System.currentTimeMillis()); |
| |
| producer.send(clientFile); |
| |
| Thread.sleep(500); |
| // |
| // for (ServerSession srvSession : server.getSessions()) { |
| // ((ServerSessionImpl) srvSession).clearLargeMessage(); |
| // } |
| |
| server.fail(false); |
| |
| ActiveMQTestBase.forceGC(); |
| |
| server.start(); |
| |
| server.stop(); |
| |
| validateNoFilesOnLargeDir(); |
| } |
| |
| @Test |
| public void testCloseConsumerDuringTransmission() throws Exception { |
| ActiveMQServer server = createServer(true, isNetty()); |
| |
| LargeMessageTestInterceptorIgnoreLastPacket.disableInterrupt(); |
| |
| server.start(); |
| |
| locator.setBlockOnNonDurableSend(false).setBlockOnDurableSend(false).addIncomingInterceptor(new LargeMessageTestInterceptorIgnoreLastPacket()); |
| |
| ClientSessionFactory sf = createSessionFactory(locator); |
| |
| final ClientSession session = sf.createSession(false, true, true); |
| |
| session.createQueue(new QueueConfiguration(ADDRESS)); |
| |
| ClientProducer producer = session.createProducer(ADDRESS); |
| |
| Message clientFile = createLargeClientMessageStreaming(session, LARGE_MESSAGE_SIZE, true); |
| |
| producer.send(clientFile); |
| |
| session.commit(); |
| |
| LargeMessageTestInterceptorIgnoreLastPacket.clearInterrupt(); |
| |
| final AtomicInteger unexpectedErrors = new AtomicInteger(0); |
| final AtomicInteger expectedErrors = new AtomicInteger(0); |
| final ClientConsumer cons = session.createConsumer(ADDRESS); |
| |
| session.start(); |
| |
| Thread t = new Thread() { |
| @Override |
| public void run() { |
| try { |
| instanceLog.debug("Receiving message"); |
| ClientMessage msg = cons.receive(5000); |
| if (msg == null) { |
| System.err.println("Message not received"); |
| unexpectedErrors.incrementAndGet(); |
| return; |
| } |
| |
| msg.checkCompletion(); |
| } catch (ActiveMQException e) { |
| e.printStackTrace(); |
| expectedErrors.incrementAndGet(); |
| } |
| } |
| }; |
| |
| t.start(); |
| |
| LargeMessageTestInterceptorIgnoreLastPacket.awaitInterrupt(); |
| |
| cons.close(); |
| |
| t.join(); |
| |
| Assert.assertEquals(0, unexpectedErrors.get()); |
| Assert.assertEquals(1, expectedErrors.get()); |
| |
| session.close(); |
| |
| server.stop(); |
| } |
| |
| @Test |
| public void testForcedInterruptUsingJMS() throws Exception { |
| ActiveMQServer server = createServer(true, isNetty()); |
| |
| server.start(); |
| |
| SimpleString jmsAddress = new SimpleString("Test"); |
| |
| server.createQueue(new QueueConfiguration(jmsAddress).setRoutingType(RoutingType.ANYCAST)); |
| |
| final AtomicInteger unexpectedErrors = new AtomicInteger(0); |
| final AtomicInteger expectedErrors = new AtomicInteger(0); |
| ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://0"); |
| Connection connection = cf.createConnection(); |
| Session session = connection.createSession(Session.SESSION_TRANSACTED); |
| connection.start(); |
| final MessageConsumer consumer = session.createConsumer(session.createQueue(jmsAddress.toString())); |
| |
| Thread t = new Thread() { |
| @Override |
| public void run() { |
| try { |
| instanceLog.debug("Receiving message"); |
| javax.jms.Message msg = consumer.receive(5000); |
| if (msg == null) { |
| System.err.println("Message not received"); |
| unexpectedErrors.incrementAndGet(); |
| return; |
| } |
| } catch (JMSException e) { |
| instanceLog.debug("This exception was ok as it was expected", e); |
| expectedErrors.incrementAndGet(); |
| } catch (Throwable e) { |
| instanceLog.warn("Captured unexpected exception", e); |
| unexpectedErrors.incrementAndGet(); |
| } |
| } |
| }; |
| |
| t.start(); |
| t.interrupt(); |
| |
| t.join(); |
| |
| Assert.assertEquals(0, unexpectedErrors.get()); |
| Assert.assertEquals(1, expectedErrors.get()); |
| |
| session.close(); |
| |
| server.stop(); |
| } |
| |
| @Test |
| public void testSendNonPersistentQueue() throws Exception { |
| |
| ClientSession session = null; |
| |
| LargeMessageTestInterceptorIgnoreLastPacket.disableInterrupt(); |
| ActiveMQServer server = createServer(true, isNetty()); |
| |
| server.start(); |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true); |
| |
| ClientSessionFactory sf = createSessionFactory(locator); |
| |
| session = sf.createSession(false, true, true); |
| |
| session.createQueue(new QueueConfiguration(ADDRESS).setDurable(false)); |
| |
| ClientProducer producer = session.createProducer(ADDRESS); |
| |
| for (int i = 0; i < 10; i++) { |
| Message clientFile = createLargeClientMessageStreaming(session, LARGE_MESSAGE_SIZE, true); |
| |
| producer.send(clientFile); |
| } |
| session.commit(); |
| |
| session.close(); |
| |
| session = sf.createSession(false, false); |
| |
| ClientConsumer cons = session.createConsumer(ADDRESS); |
| |
| session.start(); |
| |
| for (int h = 0; h < 5; h++) { |
| for (int i = 0; i < 10; i++) { |
| ClientMessage clientMessage = cons.receive(5000); |
| Assert.assertNotNull(clientMessage); |
| for (int countByte = 0; countByte < LARGE_MESSAGE_SIZE; countByte++) { |
| Assert.assertEquals(ActiveMQTestBase.getSamplebyte(countByte), clientMessage.getBodyBuffer().readByte()); |
| } |
| clientMessage.acknowledge(); |
| } |
| session.rollback(); |
| } |
| |
| server.fail(false); |
| server.start(); |
| |
| server.stop(); |
| |
| validateNoFilesOnLargeDir(); |
| } |
| |
| @Test |
| public void testSendPaging() throws Exception { |
| |
| ClientSession session = null; |
| |
| LargeMessageTestInterceptorIgnoreLastPacket.disableInterrupt(); |
| ActiveMQServer server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>()); |
| |
| // server.getConfiguration() |
| // .getIncomingInterceptorClassNames() |
| // .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName()); |
| |
| server.start(); |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true); |
| |
| ClientSessionFactory sf = createSessionFactory(locator); |
| |
| session = sf.createSession(false, true, true); |
| |
| session.createQueue(new QueueConfiguration(ADDRESS)); |
| |
| server.getPagingManager().getPageStore(ADDRESS).startPaging(); |
| |
| ClientProducer producer = session.createProducer(ADDRESS); |
| |
| for (int i = 0; i < 10; i++) { |
| Message clientFile = createLargeClientMessageStreaming(session, LARGE_MESSAGE_SIZE, true); |
| |
| producer.send(clientFile); |
| } |
| session.commit(); |
| |
| validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(), 10); |
| |
| for (int h = 0; h < 5; h++) { |
| session.close(); |
| |
| sf.close(); |
| |
| server.stop(); |
| |
| server.start(); |
| |
| sf = createSessionFactory(locator); |
| |
| session = sf.createSession(false, false); |
| |
| ClientConsumer cons = session.createConsumer(ADDRESS); |
| |
| session.start(); |
| |
| for (int i = 0; i < 10; i++) { |
| ClientMessage clientMessage = cons.receive(5000); |
| Assert.assertNotNull(clientMessage); |
| for (int countByte = 0; countByte < LARGE_MESSAGE_SIZE; countByte++) { |
| Assert.assertEquals(ActiveMQTestBase.getSamplebyte(countByte), clientMessage.getBodyBuffer().readByte()); |
| } |
| clientMessage.acknowledge(); |
| } |
| if (h == 4) { |
| session.commit(); |
| } else { |
| session.rollback(); |
| } |
| |
| session.close(); |
| sf.close(); |
| } |
| |
| server.fail(false); |
| server.start(); |
| |
| validateNoFilesOnLargeDir(); |
| |
| } |
| |
| @Test |
| public void testSendPreparedXA() throws Exception { |
| ClientSession session = null; |
| |
| LargeMessageTestInterceptorIgnoreLastPacket.disableInterrupt(); |
| |
| ActiveMQServer server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>()); |
| |
| server.getConfiguration().getIncomingInterceptorClassNames().add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName()); |
| |
| server.start(); |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true); |
| |
| ClientSessionFactory sf = createSessionFactory(locator); |
| |
| session = sf.createSession(true, false, false); |
| |
| Xid xid1 = newXID(); |
| Xid xid2 = newXID(); |
| |
| session.createQueue(new QueueConfiguration(ADDRESS)); |
| |
| ClientProducer producer = session.createProducer(ADDRESS); |
| |
| session.start(xid1, XAResource.TMNOFLAGS); |
| |
| for (int i = 0; i < 10; i++) { |
| Message clientFile = createLargeClientMessageStreaming(session, LARGE_MESSAGE_SIZE, true); |
| clientFile.putIntProperty("txid", 1); |
| producer.send(clientFile); |
| } |
| session.end(xid1, XAResource.TMSUCCESS); |
| |
| session.prepare(xid1); |
| |
| session.start(xid2, XAResource.TMNOFLAGS); |
| |
| for (int i = 0; i < 10; i++) { |
| Message clientFile = createLargeClientMessageStreaming(session, LARGE_MESSAGE_SIZE, true); |
| clientFile.putIntProperty("txid", 2); |
| clientFile.putIntProperty("i", i); |
| producer.send(clientFile); |
| } |
| session.end(xid2, XAResource.TMSUCCESS); |
| |
| session.prepare(xid2); |
| |
| session.close(); |
| sf.close(); |
| |
| server.fail(false); |
| server.start(); |
| |
| for (int start = 0; start < 2; start++) { |
| instanceLog.debug("Start " + start); |
| |
| sf = createSessionFactory(locator); |
| |
| if (start == 0) { |
| session = sf.createSession(true, false, false); |
| session.commit(xid1, false); |
| session.close(); |
| } |
| |
| session = sf.createSession(false, false, false); |
| ClientConsumer cons1 = session.createConsumer(ADDRESS); |
| session.start(); |
| for (int i = 0; i < 10; i++) { |
| instanceLog.info("I = " + i); |
| ClientMessage msg = cons1.receive(5000); |
| Assert.assertNotNull(msg); |
| Assert.assertEquals(1, msg.getIntProperty("txid").intValue()); |
| msg.acknowledge(); |
| } |
| |
| if (start == 1) { |
| session.commit(); |
| } else { |
| session.rollback(); |
| } |
| |
| session.close(); |
| sf.close(); |
| |
| server.stop(); |
| server.start(); |
| } |
| server.stop(); |
| |
| validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(), 10); |
| |
| server.start(); |
| |
| sf = createSessionFactory(locator); |
| |
| session = sf.createSession(true, false, false); |
| session.rollback(xid2); |
| |
| sf.close(); |
| |
| server.stop(); |
| server.start(); |
| server.stop(); |
| |
| validateNoFilesOnLargeDir(); |
| } |
| |
| @Test |
| public void testRestartBeforeDelete() throws Exception { |
| |
| class NoPostACKQueue extends QueueImpl { |
| |
| NoPostACKQueue(long id, |
| SimpleString address, |
| SimpleString name, |
| Filter filter, |
| SimpleString user, |
| PageSubscription pageSubscription, |
| boolean durable, |
| boolean temporary, |
| boolean autoCreated, |
| ScheduledExecutorService scheduledExecutor, |
| PostOffice postOffice, |
| StorageManager storageManager, |
| HierarchicalRepository<AddressSettings> addressSettingsRepository, |
| ActiveMQServer server, |
| ArtemisExecutor executor) { |
| super(id, address, name, filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, |
| postOffice, storageManager, addressSettingsRepository, executor, server, null); |
| } |
| |
| @Override |
| public void postAcknowledge(final MessageReference ref, AckReason reason) { |
| instanceLog.debug("Ignoring postACK on message " + ref); |
| } |
| |
| @Override |
| public void deliverScheduledMessages() { |
| } |
| } |
| |
| final class NoPostACKQueueFactory implements QueueFactory { |
| |
| final StorageManager storageManager; |
| |
| final PostOffice postOffice; |
| |
| final ScheduledExecutorService scheduledExecutor; |
| |
| final HierarchicalRepository<AddressSettings> addressSettingsRepository; |
| |
| final ExecutorFactory execFactory; |
| |
| final ActiveMQServer server; |
| |
| NoPostACKQueueFactory(ActiveMQServer server, |
| StorageManager storageManager, |
| PostOffice postOffice, |
| ScheduledExecutorService scheduledExecutor, |
| HierarchicalRepository<AddressSettings> addressSettingsRepository, |
| final ExecutorFactory execFactory) { |
| this.storageManager = storageManager; |
| this.postOffice = postOffice; |
| this.scheduledExecutor = scheduledExecutor; |
| this.addressSettingsRepository = addressSettingsRepository; |
| this.execFactory = execFactory; |
| this.server = server; |
| } |
| |
| @Override |
| public Queue createQueueWith(final QueueConfig config) { |
| return new NoPostACKQueue(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, server, execFactory.getExecutor()); |
| } |
| |
| @Override |
| public Queue createQueueWith(QueueConfiguration config, PagingManager pagingManager) throws Exception { |
| return new NoPostACKQueue(config.getId(), config.getAddress(), config.getName(), FilterImpl.createFilter(config.getFilterString()), config.getUser(), QueueFactoryImpl.getPageSubscription(config, pagingManager), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, server, execFactory.getExecutor()); |
| } |
| |
| @Deprecated |
| @Override |
| public Queue createQueue(long persistenceID, |
| SimpleString address, |
| SimpleString name, |
| Filter filter, |
| PageSubscription pageSubscription, |
| SimpleString user, |
| boolean durable, |
| boolean temporary, |
| boolean autoCreated) { |
| |
| return new NoPostACKQueue(persistenceID, address, name, filter, user, pageSubscription, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, server, execFactory.getExecutor()); |
| } |
| |
| /* (non-Javadoc) |
| * @see org.apache.activemq.artemis.core.server.QueueFactory#setPostOffice(org.apache.activemq.artemis.core.postoffice.PostOffice) |
| */ |
| @Override |
| public void setPostOffice(PostOffice postOffice) { |
| } |
| |
| } |
| |
| ClientSession session = null; |
| |
| LargeMessageTestInterceptorIgnoreLastPacket.disableInterrupt(); |
| |
| ActiveMQServer server = createServer(true, isNetty()); |
| server.start(); |
| |
| QueueFactory original = server.getQueueFactory(); |
| |
| ((ActiveMQServerImpl) server).replaceQueueFactory(new NoPostACKQueueFactory(server, server.getStorageManager(), server.getPostOffice(), server.getScheduledPool(), server.getAddressSettingsRepository(), server.getExecutorFactory())); |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true); |
| |
| ClientSessionFactory sf = createSessionFactory(locator); |
| |
| session = sf.createSession(false, true, true); |
| |
| session.createQueue(new QueueConfiguration(ADDRESS)); |
| |
| ClientProducer producer = session.createProducer(ADDRESS); |
| |
| for (int i = 0; i < 10; i++) { |
| Message clientFile = createLargeClientMessageStreaming(session, LARGE_MESSAGE_SIZE, true); |
| |
| producer.send(clientFile); |
| } |
| session.commit(); |
| |
| session.close(); |
| |
| session = sf.createSession(false, false); |
| |
| ClientConsumer cons = session.createConsumer(ADDRESS); |
| |
| session.start(); |
| |
| for (int i = 0; i < 10; i++) { |
| ClientMessage msg = cons.receive(5000); |
| Assert.assertNotNull(msg); |
| msg.saveToOutputStream(new java.io.OutputStream() { |
| @Override |
| public void write(int b) throws IOException { |
| } |
| }); |
| msg.acknowledge(); |
| session.commit(); |
| } |
| |
| ((ActiveMQServerImpl) server).replaceQueueFactory(original); |
| server.fail(false); |
| server.start(); |
| |
| server.stop(); |
| |
| validateNoFilesOnLargeDir(); |
| } |
| |
| @Test |
| public void testConsumeAfterRestart() throws Exception { |
| ClientSession session = null; |
| |
| LargeMessageTestInterceptorIgnoreLastPacket.clearInterrupt(); |
| |
| ActiveMQServer server = createServer(true, isNetty()); |
| server.start(); |
| |
| QueueFactory original = server.getQueueFactory(); |
| |
| locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true); |
| |
| ClientSessionFactory sf = createSessionFactory(locator); |
| |
| session = sf.createSession(false, true, true); |
| |
| session.createQueue(new QueueConfiguration(ADDRESS)); |
| |
| ClientProducer producer = session.createProducer(ADDRESS); |
| |
| for (int i = 0; i < 10; i++) { |
| Message clientFile = createLargeClientMessageStreaming(session, LARGE_MESSAGE_SIZE, true); |
| |
| producer.send(clientFile); |
| } |
| session.commit(); |
| |
| session.close(); |
| sf.close(); |
| |
| server.stop(); |
| server.start(); |
| |
| sf = createSessionFactory(locator); |
| |
| session = sf.createSession(false, false); |
| |
| ClientConsumer cons = session.createConsumer(ADDRESS); |
| |
| session.start(); |
| |
| for (int i = 0; i < 10; i++) { |
| ClientMessage msg = cons.receive(5000); |
| Assert.assertNotNull(msg); |
| msg.saveToOutputStream(new java.io.OutputStream() { |
| @Override |
| public void write(int b) throws IOException { |
| } |
| }); |
| msg.acknowledge(); |
| session.commit(); |
| } |
| |
| ((ActiveMQServerImpl) server).replaceQueueFactory(original); |
| server.fail(false); |
| server.start(); |
| |
| server.stop(); |
| |
| validateNoFilesOnLargeDir(); |
| } |
| |
| public static class LargeMessageTestInterceptorIgnoreLastPacket implements Interceptor { |
| |
| public static void clearInterrupt() { |
| intMessages = true; |
| latch = new CountDownLatch(1); |
| } |
| |
| public static void disableInterrupt() { |
| intMessages = false; |
| } |
| |
| public static void awaitInterrupt() throws Exception { |
| latch.await(); |
| } |
| |
| private static boolean intMessages = false; |
| |
| private static CountDownLatch latch = new CountDownLatch(1); |
| |
| @Override |
| public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { |
| if (packet instanceof SessionContinuationMessage) { |
| SessionContinuationMessage msg = (SessionContinuationMessage) packet; |
| if (!msg.isContinues() && intMessages) { |
| log.debug("Ignored a message"); |
| latch.countDown(); |
| return false; |
| } |
| } |
| return true; |
| } |
| } |
| } |