| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.qpid.systest.connection; |
| |
| import static junit.framework.TestCase.assertFalse; |
| import static org.hamcrest.CoreMatchers.equalTo; |
| import static org.hamcrest.CoreMatchers.is; |
| import static org.hamcrest.CoreMatchers.not; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.junit.Assume.assumeThat; |
| import static org.junit.Assume.assumeTrue; |
| |
| import java.text.MessageFormat; |
| import java.util.Collections; |
| import java.util.Enumeration; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import javax.jms.Connection; |
| import javax.jms.ConnectionFactory; |
| import javax.jms.Destination; |
| import javax.jms.ExceptionListener; |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageListener; |
| import javax.jms.MessageProducer; |
| import javax.jms.Queue; |
| import javax.jms.QueueBrowser; |
| import javax.jms.Session; |
| import javax.jms.TextMessage; |
| import javax.jms.TransactionRolledBackException; |
| |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.qpid.QpidException; |
| import org.apache.qpid.client.AMQConnection; |
| import org.apache.qpid.client.AMQConnectionFactory; |
| import org.apache.qpid.client.AMQDestination; |
| import org.apache.qpid.client.AMQSession; |
| import org.apache.qpid.jms.ConnectionListener; |
| import org.apache.qpid.jms.ConnectionURL; |
| import org.apache.qpid.jms.FailoverPolicy; |
| import org.apache.qpid.systest.core.BrokerAdmin; |
| import org.apache.qpid.systest.core.JmsTestBase; |
| import org.apache.qpid.systest.core.util.Utils; |
| import org.apache.qpid.url.URLSyntaxException; |
| |
| /** |
| * Test suite to test all possible failover corner cases |
| */ |
| public class FailoverBehaviourTest extends JmsTestBase implements ExceptionListener, ConnectionListener |
| { |
| private static final long DEFAULT_FAILOVER_TIME = Long.getLong("FailoverBaseCase.defaultFailoverTime", 10000L); |
| private static final Logger LOGGER = LoggerFactory.getLogger(FailoverBehaviourTest.class); |
| |
| private static final String TEST_MESSAGE_FORMAT = "test message {0}"; |
| |
| /** Default number of messages to send before failover */ |
| private static final int DEFAULT_NUMBER_OF_MESSAGES = 40; |
| |
| private static final int DEFAULT_MESSAGE_SIZE = 1024; |
| |
| /** Actual number of messages to send before failover */ |
| private int _messageNumber = Integer.getInteger("profile.failoverMsgCount", DEFAULT_NUMBER_OF_MESSAGES); |
| |
| /** Test connection */ |
| private Connection _connection; |
| private CountDownLatch _failoverStarted; |
| private CountDownLatch _failoverComplete; |
| |
| /** |
| * Consumer session |
| */ |
| private Session _consumerSession; |
| |
| /** |
| * Test destination |
| */ |
| private Destination _destination; |
| |
| /** |
| * Consumer |
| */ |
| private MessageConsumer _consumer; |
| |
| /** |
| * Producer session |
| */ |
| private Session _producerSession; |
| |
| /** |
| * Producer |
| */ |
| private MessageProducer _producer; |
| |
| @Before |
| public void setUp() throws Exception |
| { |
| _failoverComplete = new CountDownLatch(1); |
| _failoverStarted = new CountDownLatch(1); |
| |
| _connection = getConnection(); |
| _connection.setExceptionListener(this); |
| ((AMQConnection) _connection).setConnectionListener(this); |
| } |
| |
| /** |
| * Test whether MessageProducer can successfully publish messages after |
| * failover and rollback transaction |
| */ |
| @Test |
| public void testMessageProducingAndRollbackAfterFailover() throws Exception |
| { |
| init(Session.SESSION_TRANSACTED, true); |
| produceMessages(); |
| getBrokerAdmin().restart(); |
| |
| // producer should be able to send messages after failover |
| _producer.send(_producerSession.createTextMessage("test message " + _messageNumber)); |
| |
| // rollback after failover |
| _producerSession.rollback(); |
| |
| // tests whether sending and committing is working after failover |
| produceMessages(); |
| _producerSession.commit(); |
| |
| // tests whether receiving and committing is working after failover |
| consumeMessages(); |
| _consumerSession.commit(); |
| } |
| |
| /** |
| * Test whether {@link TransactionRolledBackException} is thrown on commit |
| * of dirty transacted session after failover. |
| * <p> |
| * Verifies whether second after failover commit is successful. |
| */ |
| @Test |
| public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnProducingMessages() throws Exception |
| { |
| init(Session.SESSION_TRANSACTED, true); |
| produceMessages(); |
| getBrokerAdmin().restart(); |
| |
| // producer should be able to send messages after failover |
| _producer.send(_producerSession.createTextMessage("test message " + _messageNumber)); |
| |
| try |
| { |
| _producerSession.commit(); |
| fail("TransactionRolledBackException is expected on commit after failover with dirty session!"); |
| } |
| catch (JMSException t) |
| { |
| assertTrue("Expected TransactionRolledBackException but thrown " + t, |
| t instanceof TransactionRolledBackException); |
| } |
| |
| // simulate process of user replaying the transaction |
| produceMessages("replayed test message {0}", _messageNumber, false); |
| |
| // no exception should be thrown |
| _producerSession.commit(); |
| |
| // only messages sent after rollback should be received |
| consumeMessages("replayed test message {0}", _messageNumber); |
| |
| // no exception should be thrown |
| _consumerSession.commit(); |
| } |
| |
| /** |
| * Tests JMSException is not thrown on commit with a clean session after |
| * failover |
| */ |
| @Test |
| public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanProducerSession() throws Exception |
| { |
| init(Session.SESSION_TRANSACTED, true); |
| |
| restartAndAwaitFailoverCompletion(); |
| |
| // should not throw an exception for a clean session |
| _producerSession.commit(); |
| |
| // tests whether sending and committing is working after failover |
| produceMessages(); |
| _producerSession.commit(); |
| |
| // tests whether receiving and committing is working after failover |
| consumeMessages(); |
| _consumerSession.commit(); |
| } |
| |
| /** |
| * Tests {@link TransactionRolledBackException} is thrown on commit of dirty |
| * transacted session after failover. |
| * <p> |
| * Verifies whether second after failover commit is successful. |
| */ |
| @Test |
| public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnMessageReceiving() throws Exception |
| { |
| assumeTrue(getBrokerAdmin().supportsPersistence()); |
| init(Session.SESSION_TRANSACTED, true); |
| produceMessages(); |
| _producerSession.commit(); |
| |
| // receive messages but do not commit |
| consumeMessages(); |
| |
| restartAndAwaitFailoverCompletion(); |
| |
| try |
| { |
| // should throw TransactionRolledBackException |
| _consumerSession.commit(); |
| fail("TransactionRolledBackException is expected on commit after failover"); |
| } |
| catch (Exception t) |
| { |
| assertTrue("Expected TransactionRolledBackException but thrown " + t, |
| t instanceof TransactionRolledBackException); |
| } |
| |
| // consume messages successfully |
| consumeMessages(); |
| _consumerSession.commit(); |
| } |
| |
| /** |
| * Tests JMSException is not thrown on commit with a clean session after failover |
| */ |
| @Test |
| public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanConsumerSession() throws Exception |
| { |
| init(Session.SESSION_TRANSACTED, true); |
| produceMessages(); |
| _producerSession.commit(); |
| |
| consumeMessages(); |
| _consumerSession.commit(); |
| |
| restartAndAwaitFailoverCompletion(); |
| |
| // should not throw an exception with a clean consumer session |
| _consumerSession.commit(); |
| } |
| |
| /** |
| * Test that {@link Session#rollback()} does not throw exception after failover |
| * and that we are able to consume messages. |
| */ |
| @Test |
| public void testRollbackAfterFailover() throws Exception |
| { |
| assumeTrue(getBrokerAdmin().supportsPersistence()); |
| init(Session.SESSION_TRANSACTED, true); |
| |
| produceMessages(); |
| _producerSession.commit(); |
| |
| consumeMessages(); |
| |
| restartAndAwaitFailoverCompletion(); |
| |
| _consumerSession.rollback(); |
| |
| // tests whether receiving and committing is working after failover |
| consumeMessages(); |
| _consumerSession.commit(); |
| } |
| |
| /** |
| * Test that {@link Session#rollback()} does not throw exception after receiving further messages |
| * after failover, and we can receive published messages after rollback. |
| */ |
| @Test |
| public void testRollbackAfterReceivingAfterFailover() throws Exception |
| { |
| assumeTrue(getBrokerAdmin().supportsPersistence()); |
| init(Session.SESSION_TRANSACTED, true); |
| |
| produceMessages(); |
| _producerSession.commit(); |
| |
| consumeMessages(); |
| getBrokerAdmin().restart(); |
| |
| consumeMessages(); |
| |
| _consumerSession.rollback(); |
| |
| // tests whether receiving and committing is working after failover |
| consumeMessages(); |
| _consumerSession.commit(); |
| } |
| |
| /** |
| * Test that {@link Session#recover()} does not throw an exception after failover |
| * and that we can consume messages after recover. |
| */ |
| @Test |
| public void testRecoverAfterFailover() throws Exception |
| { |
| assumeTrue(getBrokerAdmin().supportsPersistence()); |
| init(Session.CLIENT_ACKNOWLEDGE, true); |
| |
| produceMessages(); |
| |
| // consume messages but do not acknowledge them |
| consumeMessages(); |
| |
| restartAndAwaitFailoverCompletion(); |
| |
| _consumerSession.recover(); |
| |
| // tests whether receiving and acknowledgment is working after recover |
| Message lastMessage = consumeMessages(); |
| lastMessage.acknowledge(); |
| } |
| |
| /** |
| * Test that receiving more messages after failover and then calling |
| * {@link Session#recover()} does not throw an exception |
| * and that we can consume messages after recover. |
| */ |
| @Test |
| public void testRecoverWithConsumedMessagesAfterFailover() throws Exception |
| { |
| assumeTrue(getBrokerAdmin().supportsPersistence()); |
| init(Session.CLIENT_ACKNOWLEDGE, true); |
| |
| produceMessages(); |
| |
| // consume messages but do not acknowledge them |
| consumeMessages(); |
| |
| getBrokerAdmin().restart(); |
| |
| // publishing should work after failover |
| |
| // consume messages again on a dirty session |
| consumeMessages(); |
| |
| // recover should successfully restore session |
| _consumerSession.recover(); |
| |
| // tests whether receiving and acknowledgment is working after recover |
| Message lastMessage = consumeMessages(); |
| lastMessage.acknowledge(); |
| } |
| |
| /** |
| * Test that first call to {@link Message#acknowledge()} after failover |
| * throws a JMSEXception if session is dirty. |
| */ |
| @Test |
| public void testAcknowledgeAfterFailover() throws Exception |
| { |
| assumeTrue(getBrokerAdmin().supportsPersistence()); |
| init(Session.CLIENT_ACKNOWLEDGE, true); |
| |
| produceMessages(); |
| |
| // consume messages but do not acknowledge them |
| Message lastMessage = consumeMessages(); |
| |
| restartAndAwaitFailoverCompletion(); |
| |
| try |
| { |
| // an implicit recover performed when acknowledge throws an exception due to failover |
| lastMessage.acknowledge(); |
| fail("JMSException should be thrown"); |
| } |
| catch (JMSException t) |
| { |
| // TODO: assert error code and/or expected exception type |
| } |
| |
| // tests whether receiving and acknowledgment is working after recover |
| lastMessage = consumeMessages(); |
| lastMessage.acknowledge(); |
| } |
| |
| /** |
| * Test that calling acknowledge before failover leaves the session |
| * clean for use after failover. |
| */ |
| @Test |
| public void testAcknowledgeBeforeFailover() throws Exception |
| { |
| init(Session.CLIENT_ACKNOWLEDGE, true); |
| |
| produceMessages(); |
| |
| // consume messages and acknowledge them |
| Message lastMessage = consumeMessages(); |
| lastMessage.acknowledge(); |
| |
| getBrokerAdmin().restart(); |
| |
| produceMessages(); |
| |
| // tests whether receiving and acknowledgment is working after recover |
| lastMessage = consumeMessages(); |
| lastMessage.acknowledge(); |
| } |
| |
| /** |
| * Test that receiving of messages after failover prior to calling |
| * {@link Message#acknowledge()} still results in acknowledge throwing an exception. |
| */ |
| @Test |
| public void testAcknowledgeAfterMessageReceivingAfterFailover() throws Exception |
| { |
| assumeTrue(getBrokerAdmin().supportsPersistence()); |
| init(Session.CLIENT_ACKNOWLEDGE, true); |
| |
| produceMessages(); |
| |
| // consume messages but do not acknowledge them |
| consumeMessages(); |
| |
| restartAndAwaitFailoverCompletion(); |
| |
| // consume again on dirty session |
| Message lastMessage = consumeMessages(); |
| try |
| { |
| // an implicit recover performed when acknowledge throws an exception due to failover |
| lastMessage.acknowledge(); |
| fail("JMSException should be thrown"); |
| } |
| catch (JMSException t) |
| { |
| // TODO: assert error code and/or expected exception type |
| } |
| |
| // tests whether receiving and acknowledgment is working on a clean session |
| lastMessage = consumeMessages(); |
| lastMessage.acknowledge(); |
| } |
| |
| @Test |
| public void testClientAcknowledgedSessionCloseAfterFailover() throws Exception |
| { |
| sessionCloseAfterFailoverImpl(Session.CLIENT_ACKNOWLEDGE); |
| } |
| |
| @Test |
| public void testTransactedSessionCloseAfterFailover() throws Exception |
| { |
| sessionCloseAfterFailoverImpl(Session.SESSION_TRANSACTED); |
| } |
| |
| @Test |
| public void testAutoAcknowledgedSessionCloseAfterFailover() throws Exception |
| { |
| sessionCloseAfterFailoverImpl(Session.AUTO_ACKNOWLEDGE); |
| } |
| |
| @Test |
| public void testPublishAutoAcknowledgedWhileFailover() throws Exception |
| { |
| publishWhileFailingOver(Session.AUTO_ACKNOWLEDGE); |
| } |
| |
| @Test |
| public void testPublishClientAcknowledgedWhileFailover() throws Exception |
| { |
| Message receivedMessage = publishWhileFailingOver(Session.CLIENT_ACKNOWLEDGE); |
| receivedMessage.acknowledge(); |
| } |
| |
| @Test |
| public void testPublishTransactedAcknowledgedWhileFailover() throws Exception |
| { |
| publishWhileFailingOver(Session.SESSION_TRANSACTED); |
| _consumerSession.commit(); |
| } |
| |
| @Test |
| public void testClientAcknowledgedSessionCloseWhileFailover() throws Exception |
| { |
| sessionCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE); |
| } |
| |
| @Test |
| public void testTransactedSessionCloseWhileFailover() throws Exception |
| { |
| sessionCloseWhileFailoverImpl(Session.SESSION_TRANSACTED); |
| } |
| |
| @Test |
| public void testAutoAcknowledgedSessionCloseWhileFailover() throws Exception |
| { |
| sessionCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE); |
| } |
| |
| @Test |
| public void testClientAcknowledgedQueueBrowserCloseWhileFailover() throws Exception |
| { |
| browserCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE); |
| } |
| |
| @Test |
| public void testTransactedQueueBrowserCloseWhileFailover() throws Exception |
| { |
| browserCloseWhileFailoverImpl(Session.SESSION_TRANSACTED); |
| } |
| |
| @Test |
| public void testAutoAcknowledgedQueueBrowserCloseWhileFailover() throws Exception |
| { |
| browserCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE); |
| } |
| |
| @Test |
| public void testKillBrokerFailoverWhilstPublishingInFlight() throws Exception |
| { |
| doFailoverWhilstPublishingInFlight(); |
| } |
| |
| @Test |
| public void testStopBrokerFailoverWhilstPublishingInFlight() throws Exception |
| { |
| doFailoverWhilstPublishingInFlight(); |
| } |
| |
| private void doFailoverWhilstPublishingInFlight() throws Exception |
| { |
| init(Session.SESSION_TRANSACTED, false); |
| |
| final int numberOfMessages = 200; |
| |
| final CountDownLatch halfWay = new CountDownLatch(1); |
| final CountDownLatch allDone = new CountDownLatch(1); |
| final AtomicReference<Exception> exception = new AtomicReference<>(); |
| |
| Runnable producerRunnable = new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| Thread.currentThread().setName("ProducingThread"); |
| |
| try |
| { |
| for(int i=0; i< numberOfMessages; i++) |
| { |
| boolean success = false; |
| while(!success) |
| { |
| try |
| { |
| Message message = _producerSession.createMessage(); |
| message.setIntProperty("msgNum", i); |
| _producer.send(message); |
| _producerSession.commit(); |
| success = true; |
| } |
| catch (javax.jms.IllegalStateException e) |
| { |
| // fail - failover should not leave a JMS object in an illegal state |
| throw e; |
| } |
| catch (JMSException e) |
| { |
| // OK we will be failing over |
| LOGGER.debug("Got JMS exception, probably just failing over", e); |
| } |
| } |
| |
| if (i > numberOfMessages / 2 && halfWay.getCount() == 1) |
| { |
| halfWay.countDown(); |
| } |
| } |
| |
| allDone.countDown(); |
| } |
| catch (Exception e) |
| { |
| exception.set(e); |
| } |
| } |
| }; |
| |
| Thread producerThread = new Thread(producerRunnable); |
| producerThread.start(); |
| |
| assertTrue("Didn't get to half way within timeout", halfWay.await(30000, TimeUnit.MILLISECONDS)); |
| |
| getBrokerAdmin().restart(); |
| |
| if (exception.get() != null) |
| { |
| LOGGER.error("Unexpected exception from producer thread", exception.get()); |
| } |
| assertNull("Producer thread should not have got an exception", exception.get()); |
| |
| assertTrue("All producing work was not completed", allDone.await(30000, TimeUnit.MILLISECONDS)); |
| |
| producerThread.join(30000); |
| |
| // Extra work to prove the session still okay |
| assertNotNull(_producerSession.createTemporaryQueue()); |
| } |
| |
| |
| private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException |
| { |
| setDelayedFailoverPolicy(5); |
| init(autoAcknowledge, true); |
| |
| String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0); |
| Message message = _producerSession.createTextMessage(text); |
| |
| getBrokerAdmin().restart(); |
| |
| if(!_failoverStarted.await(5, TimeUnit.SECONDS)) |
| { |
| fail("Did not receive notification failover had started"); |
| } |
| |
| _producer.send(message); |
| |
| if (_producerSession.getTransacted()) |
| { |
| _producerSession.commit(); |
| } |
| |
| Message receivedMessage = _consumer.receive(1000L); |
| assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0); |
| return receivedMessage; |
| } |
| |
| |
| /** |
| * This test only tests 0-8/0-9/0-9-1 failover timeout |
| */ |
| @Test |
| public void testFailoverHandlerTimeoutExpires() throws Exception |
| { |
| |
| assumeThat("This test only tests 0-8/0-9/0-9-1 failover timeout", |
| getProtocol(), |
| is(not(equalTo("0-10")))); |
| |
| _connection.close(); |
| _connection = null; |
| |
| System.setProperty("qpid.failover_method_timeout", "10000"); |
| AMQConnection connection = null; |
| try |
| { |
| connection = createConnectionWithFailover(); |
| |
| // holding failover mutex should prevent the failover from proceeding |
| synchronized(connection.getFailoverMutex()) |
| { |
| getBrokerAdmin().restart(); |
| |
| // sleep interval exceeds failover timeout interval |
| Thread.sleep(11000L); |
| } |
| |
| // allows the failover thread to proceed |
| Thread.yield(); |
| assertFalse("Unexpected failover", _failoverComplete.await(2000L, TimeUnit.MILLISECONDS)); |
| assertTrue("Failover should not succeed due to timeout", connection.isClosed()); |
| } |
| finally |
| { |
| System.clearProperty("qpid.failover_method_timeout"); |
| if (connection != null) |
| { |
| connection.close(); |
| } |
| } |
| } |
| |
| @Test |
| public void testFailoverHandlerTimeoutReconnected() throws Exception |
| { |
| _connection.close(); |
| System.setProperty("qpid.failover_method_timeout", "10000"); |
| AMQConnection connection = null; |
| try |
| { |
| connection = createConnectionWithFailover(); |
| |
| // holding failover mutex should prevent the failover from proceeding |
| synchronized(connection.getFailoverMutex()) |
| { |
| getBrokerAdmin().restart(); |
| } |
| |
| // allows the failover thread to proceed |
| Thread.yield(); |
| awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME); |
| assertFalse("Failover should restore connectivity", connection.isClosed()); |
| } |
| finally |
| { |
| System.clearProperty("qpid.failover_method_timeout"); |
| if (connection != null) |
| { |
| connection.close(); |
| } |
| } |
| } |
| |
| /** |
| * Tests that the producer flow control flag is reset when failover occurs while |
| * the producers are being blocked by the broker. |
| * |
| * Uses Apache Qpid Broker-J specific queue configuration to enabled PSFC. |
| */ |
| @Test |
| public void testFlowControlFlagResetOnFailover() throws Exception |
| { |
| |
| assumeThat("This test only tests 0-8/0-9/0-9-1 failover timeout", |
| getProtocol(), |
| is(not(equalTo("0-10")))); |
| |
| |
| // we do not need the connection failing to second broker |
| _connection.close(); |
| |
| // make sure that failover timeout is bigger than flow control timeout |
| System.setProperty("qpid.failover_method_timeout", "60000"); |
| System.setProperty("qpid.flow_control_wait_failure", "10000"); |
| |
| AMQConnection connection = null; |
| try |
| { |
| connection = createConnectionWithFailover(Collections.singletonMap(ConnectionURL.OPTIONS_SYNC_PUBLISH, "all")); |
| |
| final Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED); |
| final Queue queue = createAndBindQueueWithFlowControlEnabled(producerSession, getTestQueueName(), DEFAULT_MESSAGE_SIZE * 3, DEFAULT_MESSAGE_SIZE * 2); |
| final AtomicInteger counter = new AtomicInteger(); |
| // try to send 5 messages (should block after 4) |
| new Thread(new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| try |
| { |
| MessageProducer producer = producerSession.createProducer(queue); |
| for (int i=0; i < 5; i++) |
| { |
| Message next = createNextMessage(producerSession, i); |
| producer.send(next); |
| producerSession.commit(); |
| counter.incrementAndGet(); |
| } |
| } |
| catch(Exception e) |
| { |
| // ignore |
| } |
| } |
| }).start(); |
| |
| long limit= 30000L; |
| long start = System.currentTimeMillis(); |
| |
| // wait until session is blocked |
| while(!((AMQSession<?,?>)producerSession).isFlowBlocked() && System.currentTimeMillis() - start < limit) |
| { |
| Thread.sleep(100L); |
| } |
| |
| assertTrue("Flow is not blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked()); |
| |
| final int currentCounter = counter.get(); |
| assertTrue("Unexpected number of sent messages:" + currentCounter, currentCounter >=3); |
| |
| getBrokerAdmin().restart(); |
| |
| // allows the failover thread to proceed |
| Thread.yield(); |
| awaitForFailoverCompletion(60000L); |
| |
| assertFalse("Flow is blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked()); |
| } |
| finally |
| { |
| System.clearProperty("qpid.failover_method_timeout"); |
| System.clearProperty("qpid.flow_control_wait_failure"); |
| |
| if (connection != null) |
| { |
| connection.close(); |
| } |
| } |
| } |
| |
| @Test |
| public void testFailoverWhenConnectionStopped() throws Exception |
| { |
| assumeTrue(getBrokerAdmin().supportsPersistence()); |
| init(Session.SESSION_TRANSACTED, true); |
| |
| produceMessages(); |
| _producerSession.commit(); |
| |
| final CountDownLatch stopFlag = new CountDownLatch(1); |
| final AtomicReference<Exception> exception = new AtomicReference<>(); |
| final CountDownLatch expectedMessageLatch = new CountDownLatch(_messageNumber); |
| final AtomicInteger counter = new AtomicInteger(); |
| |
| _consumer.setMessageListener(new MessageListener() |
| { |
| @Override |
| public void onMessage(Message message) |
| { |
| if (stopFlag.getCount() == 1) |
| { |
| try |
| { |
| LOGGER.debug("Stopping connection from dispatcher thread"); |
| _connection.stop(); |
| LOGGER.debug("Connection stopped from dispatcher thread"); |
| |
| } |
| catch (Exception e) |
| { |
| exception.set(e); |
| } |
| finally |
| { |
| stopFlag.countDown(); |
| getBrokerAdmin().restart(); |
| } |
| |
| } |
| else |
| { |
| try |
| { |
| _consumerSession.commit(); |
| counter.incrementAndGet(); |
| expectedMessageLatch.countDown(); |
| } |
| catch (Exception e) |
| { |
| exception.set(e); |
| } |
| } |
| } |
| }); |
| |
| |
| boolean stopResult = stopFlag.await(2000, TimeUnit.MILLISECONDS); |
| assertTrue("Connection was not stopped" + (exception.get() == null ? "." : ":" + exception.get().getMessage()), |
| stopResult); |
| assertNull("Unexpected exception on stop :" + exception.get(), exception.get()); |
| |
| // wait for failover to complete |
| awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME); |
| |
| _producerSession.commit(); |
| |
| _connection.start(); |
| |
| assertTrue("Not all messages were delivered. Remaining message number " + expectedMessageLatch.getCount(), expectedMessageLatch.await(11000, TimeUnit.MILLISECONDS)); |
| |
| Thread.sleep(500L); |
| assertEquals("Unexpected messages recieved ", _messageNumber, counter.get()); |
| |
| _connection.close(); |
| } |
| |
| @Test |
| public void testConnectionCloseInterruptsFailover() throws Exception |
| { |
| _connection.close(); |
| |
| final AtomicBoolean failoverCompleted = new AtomicBoolean(false); |
| final CountDownLatch failoverBegun = new CountDownLatch(1); |
| |
| AMQConnection connection = createConnectionWithFailover(); |
| connection.setConnectionListener(new ConnectionListener() |
| { |
| @Override |
| public void bytesSent(final long count) |
| { |
| } |
| |
| @Override |
| public void bytesReceived(final long count) |
| { |
| } |
| |
| @Override |
| public boolean preFailover(final boolean redirect) |
| { |
| failoverBegun.countDown(); |
| LOGGER.info("Failover started"); |
| return true; |
| } |
| |
| @Override |
| public boolean preResubscribe() |
| { |
| return true; |
| } |
| |
| @Override |
| public void failoverComplete() |
| { |
| failoverCompleted.set(true); |
| } |
| }); |
| |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| assertNotNull("Session should be created", session); |
| getBrokerAdmin().stop(); |
| |
| boolean failingOver = failoverBegun.await(5000, TimeUnit.MILLISECONDS); |
| assertTrue("Failover did not begin with a reasonable time", failingOver); |
| |
| // Failover will now be in flight |
| connection.close(); |
| assertTrue("Failover policy is unexpectedly exhausted", connection.getFailoverPolicy().failoverAllowed()); |
| } |
| |
| private Queue createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception |
| { |
| final Map<String, Object> arguments = new HashMap<>(); |
| arguments.put("x-qpid-capacity", capacity); |
| arguments.put("x-qpid-flow-resume-capacity", resumeCapacity); |
| ((AMQSession<?, ?>) session).createQueue(queueName, false, true, false, arguments); |
| Queue queue = session.createQueue("direct://amq.direct/" + queueName + "/" + queueName + "?durable='" + true |
| + "'&autodelete='" + false + "'"); |
| ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) queue); |
| return queue; |
| } |
| |
| private AMQConnection createConnectionWithFailover() throws JMSException |
| { |
| return createConnectionWithFailover(null); |
| } |
| |
| private AMQConnection createConnectionWithFailover(Map<String,String> connectionOptions) throws JMSException |
| { |
| String retries = "200"; |
| String connectdelay = "1000"; |
| String cycleCount = "2"; |
| |
| String newUrlFormat = "amqp://username:password@clientid/%s?brokerlist=" + |
| "'tcp://%s:%s?retries='%s'&connectdelay='%s''&failover='singlebroker?cyclecount='%s''"; |
| |
| int port = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP).getPort(); |
| String virtualHostName = getBrokerAdmin().getVirtualHostName(); |
| StringBuilder newUrl = new StringBuilder(String.format(newUrlFormat, |
| virtualHostName, |
| "localhost", |
| port, |
| retries, connectdelay, cycleCount)); |
| |
| if (connectionOptions != null) |
| { |
| for (Map.Entry<String,String> option: connectionOptions.entrySet()) |
| { |
| newUrl.append("&").append(option.getKey()).append("='").append(option.getValue()).append("'"); |
| } |
| } |
| ConnectionFactory connectionFactory; |
| try |
| { |
| connectionFactory = new AMQConnectionFactory(newUrl.toString()); |
| } |
| catch (URLSyntaxException e) |
| { |
| throw new RuntimeException(e); |
| } |
| AMQConnection connection = (AMQConnection) connectionFactory.createConnection(getBrokerAdmin().getValidUsername(), |
| getBrokerAdmin().getValidPassword()); |
| connection.setConnectionListener(this); |
| return connection; |
| } |
| |
| /** |
| * Tests {@link Session#close()} for session with given acknowledge mode |
| * to ensure that close works after failover. |
| * |
| * @param acknowledgeMode session acknowledge mode |
| */ |
| private void sessionCloseAfterFailoverImpl(int acknowledgeMode) throws JMSException |
| { |
| init(acknowledgeMode, true); |
| produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false); |
| if (acknowledgeMode == Session.SESSION_TRANSACTED) |
| { |
| _producerSession.commit(); |
| } |
| |
| // intentionally receive message but do not commit or acknowledge it in |
| // case of transacted or CLIENT_ACK session |
| Message receivedMessage = _consumer.receive(1000L); |
| assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0); |
| |
| restartAndAwaitFailoverCompletion(); |
| |
| // for transacted/client_ack session |
| // no exception should be thrown but transaction should be automatically |
| // rolled back |
| _consumerSession.close(); |
| } |
| |
| /** |
| * A helper method to instantiate produce and consumer sessions, producer |
| * and consumer. |
| * |
| * @param acknowledgeMode |
| * acknowledge mode |
| * @param startConnection |
| * indicates whether connection should be started |
| */ |
| private void init(int acknowledgeMode, boolean startConnection) throws JMSException |
| { |
| boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED; |
| |
| _consumerSession = _connection.createSession(isTransacted, acknowledgeMode); |
| _destination = createDestination(_consumerSession); |
| _consumer = _consumerSession.createConsumer(_destination); |
| |
| if (startConnection) |
| { |
| _connection.start(); |
| } |
| |
| _producerSession = _connection.createSession(isTransacted, acknowledgeMode); |
| _producer = _producerSession.createProducer(_destination); |
| |
| } |
| |
| private Destination createDestination(Session session) throws JMSException |
| { |
| return session.createQueue(getTestQueueName() + "_" + System.currentTimeMillis()); |
| } |
| |
| /** |
| * Produces a default number of messages with default text content into test |
| * queue |
| * |
| */ |
| private void produceMessages() throws JMSException |
| { |
| produceMessages(false); |
| } |
| |
| private void produceMessages(boolean separateProducer) throws JMSException |
| { |
| produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, separateProducer); |
| } |
| |
| /** |
| * Consumes a default number of messages and asserts their content. |
| * |
| * @return last consumed message |
| */ |
| private Message consumeMessages() throws JMSException |
| { |
| return consumeMessages(TEST_MESSAGE_FORMAT, _messageNumber); |
| } |
| |
| /** |
| * Produces given number of text messages with content matching given |
| * content pattern |
| * |
| * @param messagePattern message content pattern |
| * @param messageNumber number of messages to send |
| * @param standaloneProducer whether to use the existing producer or a new one. |
| */ |
| private void produceMessages(String messagePattern, int messageNumber, boolean standaloneProducer) throws JMSException |
| { |
| Session producerSession; |
| MessageProducer producer; |
| |
| if(standaloneProducer) |
| { |
| producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED); |
| producer = producerSession.createProducer(_destination); |
| } |
| else |
| { |
| producerSession = _producerSession; |
| producer = _producer; |
| } |
| |
| for (int i = 0; i < messageNumber; i++) |
| { |
| String text = MessageFormat.format(messagePattern, i); |
| Message message = producerSession.createTextMessage(text); |
| producer.send(message); |
| LOGGER.debug("Test message number " + i + " produced with text = " + text + ", and JMSMessageID = " + message.getJMSMessageID()); |
| } |
| |
| if(standaloneProducer) |
| { |
| producerSession.commit(); |
| } |
| } |
| |
| /** |
| * Consumes given number of text messages and asserts that their content |
| * matches given pattern |
| * |
| * @param messagePattern |
| * messages content pattern |
| * @param messageNumber |
| * message number to received |
| * @return last consumed message |
| */ |
| private Message consumeMessages(String messagePattern, int messageNumber) throws JMSException |
| { |
| Message receivedMesssage = null; |
| for (int i = 0; i < messageNumber; i++) |
| { |
| receivedMesssage = _consumer.receive(1000L); |
| assertReceivedMessage(receivedMesssage, messagePattern, i); |
| } |
| return receivedMesssage; |
| } |
| |
| /** |
| * Asserts received message |
| * |
| * @param receivedMessage |
| * received message |
| * @param messagePattern |
| * messages content pattern |
| * @param messageIndex |
| * message index |
| */ |
| private void assertReceivedMessage(Message receivedMessage, String messagePattern, int messageIndex) throws JMSException |
| { |
| assertNotNull("Expected message [" + messageIndex + "] is not received!", receivedMessage); |
| assertTrue("Failure to receive message [" + messageIndex + "], expected TextMessage but received " |
| + receivedMessage, receivedMessage instanceof TextMessage); |
| String expectedText = MessageFormat.format(messagePattern, messageIndex); |
| String receivedText = null; |
| try |
| { |
| receivedText = ((TextMessage) receivedMessage).getText(); |
| } |
| catch (JMSException e) |
| { |
| fail("JMSException occured while getting message text:" + e.getMessage()); |
| } |
| LOGGER.debug("Test message number " + messageIndex + " consumed with text = " + receivedText + ", and JMSMessageID = " + receivedMessage.getJMSMessageID()); |
| assertEquals("Failover is broken! Expected [" + expectedText + "] but got [" + receivedText + "]", |
| expectedText, receivedText); |
| } |
| |
| |
| private void restartAndAwaitFailoverCompletion() |
| { |
| getBrokerAdmin().restart(); |
| awaitForFailoverCompletion(); |
| } |
| |
| private void awaitForFailoverCompletion() |
| { |
| awaitForFailoverCompletion(2 * DEFAULT_FAILOVER_TIME); |
| } |
| |
| private void awaitForFailoverCompletion(long delay) |
| { |
| LOGGER.info("Awaiting {} ms for failover completion..", delay); |
| try |
| { |
| if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS)) |
| { |
| fail("Failover did not complete"); |
| } |
| } |
| catch (InterruptedException e) |
| { |
| fail("Test was interrupted:" + e.getMessage()); |
| } |
| } |
| |
| @Override |
| public void onException(JMSException e) |
| { |
| } |
| |
| @After |
| public void tearDown() |
| { |
| if (_connection != null) |
| { |
| try |
| { |
| _connection.close(); |
| } |
| catch (JMSException e) |
| { |
| // PASS |
| } |
| } |
| } |
| |
| public Connection getConnection() throws JMSException |
| { |
| return createConnectionWithFailover(); |
| } |
| |
| public void failDefaultBroker() |
| { |
| getBrokerAdmin().stop(); |
| } |
| |
| @Override |
| public void bytesSent(long count) |
| { |
| } |
| |
| @Override |
| public void bytesReceived(long count) |
| { |
| } |
| |
| @Override |
| public boolean preFailover(boolean redirect) |
| { |
| _failoverStarted.countDown(); |
| return true; |
| } |
| |
| @Override |
| public boolean preResubscribe() |
| { |
| return true; |
| } |
| |
| @Override |
| public void failoverComplete() |
| { |
| _failoverComplete.countDown(); |
| } |
| |
| /** |
| * Causes 1 second delay before reconnect in order to test whether JMS |
| * methods block while failover is in progress |
| */ |
| private static class DelayingFailoverPolicy extends FailoverPolicy |
| { |
| |
| private CountDownLatch _suspendLatch; |
| private long _delay; |
| |
| DelayingFailoverPolicy(AMQConnection connection, long delay) |
| { |
| super(connection.getConnectionURL(), connection); |
| _suspendLatch = new CountDownLatch(1); |
| _delay = delay; |
| } |
| |
| @Override |
| public void attainedConnection() |
| { |
| try |
| { |
| _suspendLatch.await(_delay, TimeUnit.SECONDS); |
| } |
| catch (InterruptedException e) |
| { |
| // continue |
| } |
| super.attainedConnection(); |
| } |
| } |
| |
| /** |
| * Tests {@link Session#close()} for session with given acknowledge mode |
| * to ensure that it blocks until failover implementation restores connection. |
| * |
| * @param acknowledgeMode session acknowledge mode |
| */ |
| private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws Exception |
| { |
| initDelayedFailover(acknowledgeMode); |
| |
| // intentionally receive message but not commit or acknowledge it in |
| // case of transacted or CLIENT_ACK session |
| Message receivedMessage = _consumer.receive(1000L); |
| assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0); |
| |
| getBrokerAdmin().restart(); |
| |
| // wait until failover is started |
| _failoverStarted.await(5, TimeUnit.SECONDS); |
| |
| // test whether session#close blocks while failover is in progress |
| _consumerSession.close(); |
| |
| assertTrue("Failover has not completed yet but session was closed", _failoverComplete.await(5, TimeUnit.SECONDS)); |
| } |
| |
| /** |
| * A helper method to instantiate {@link QueueBrowser} and publish test messages on a test queue for further browsing. |
| * |
| * @param acknowledgeMode session acknowledge mode |
| * @return queue browser |
| */ |
| private QueueBrowser prepareQueueBrowser(int acknowledgeMode) throws JMSException, QpidException |
| { |
| init(acknowledgeMode, false); |
| _consumer.close(); |
| _connection.start(); |
| |
| produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false); |
| if (acknowledgeMode == Session.SESSION_TRANSACTED) |
| { |
| _producerSession.commit(); |
| } |
| else |
| { |
| ((AMQSession)_producerSession).sync(); |
| } |
| |
| return _consumerSession.createBrowser((Queue) _destination); |
| } |
| |
| /** |
| * Tests {@link QueueBrowser#close()} for session with given acknowledge mode |
| * to ensure that it blocks until failover implementation restores connection. |
| * |
| * @param acknowledgeMode session acknowledge mode |
| */ |
| private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws Exception |
| { |
| QueueBrowser browser = prepareQueueBrowser(acknowledgeMode); |
| |
| @SuppressWarnings("unchecked") |
| Enumeration<Message> messages = browser.getEnumeration(); |
| Message receivedMessage = messages.nextElement(); |
| assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0); |
| |
| getBrokerAdmin().restart(); |
| |
| // wait until failover is started |
| _failoverStarted.await(5, TimeUnit.SECONDS); |
| |
| browser.close(); |
| |
| assertTrue("Failover has not completed yet but browser was closed", _failoverComplete.await(5, TimeUnit.SECONDS)); |
| } |
| |
| private DelayingFailoverPolicy initDelayedFailover(int acknowledgeMode) throws JMSException |
| { |
| DelayingFailoverPolicy failoverPolicy = setDelayedFailoverPolicy(); |
| init(acknowledgeMode, true); |
| produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false); |
| if (acknowledgeMode == Session.SESSION_TRANSACTED) |
| { |
| _producerSession.commit(); |
| } |
| return failoverPolicy; |
| } |
| |
| private DelayingFailoverPolicy setDelayedFailoverPolicy() |
| { |
| return setDelayedFailoverPolicy(2); |
| } |
| |
| private DelayingFailoverPolicy setDelayedFailoverPolicy(long delay) |
| { |
| AMQConnection amqConnection = (AMQConnection) _connection; |
| DelayingFailoverPolicy failoverPolicy = new DelayingFailoverPolicy(amqConnection, delay); |
| ((AMQConnection) _connection).setFailoverPolicy(failoverPolicy); |
| return failoverPolicy; |
| } |
| |
| |
| public Message createNextMessage(Session session, int msgCount) throws JMSException |
| { |
| Message message = createMessage(session, DEFAULT_MESSAGE_SIZE); |
| message.setIntProperty(Utils.INDEX, msgCount); |
| |
| return message; |
| } |
| |
| public Message createMessage(Session session, int messageSize) throws JMSException |
| { |
| String payload = new String(new byte[messageSize]); |
| |
| TextMessage message = session.createTextMessage(); |
| message.setText(payload); |
| return message; |
| } |
| } |