blob: a5b9c618bc761ada5cd9077a8356e9f67a06d057 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.client.failover;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.test.utils.FailoverBaseCase;
/**
* Test suite to test all possible failover corner cases
*/
public class FailoverBehaviourTest extends FailoverBaseCase implements ConnectionListener, ExceptionListener
{
/** Pattern of the text payload of test messages; {0} is replaced with the message index */
private static final String TEST_MESSAGE_FORMAT = "test message {0}";
/** Indicates whether tests are run against clustered broker (read once from system property "profile.clustered") */
private static final boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
/** Default number of messages to send before failover */
private static final int DEFAULT_NUMBER_OF_MESSAGES = 10;
/** Actual number of messages to send before failover (system property "profile.failoverMsgCount") */
protected int _messageNumber = Integer.getInteger("profile.failoverMsgCount", DEFAULT_NUMBER_OF_MESSAGES);
/** Test connection */
protected Connection _connection;
/**
* Failover completion latch is used to wait till connectivity to broker is
* restored; counted down in {@link #failoverComplete()}
*/
private CountDownLatch _failoverComplete;
/**
* Consumer session
*/
private Session _consumerSession;
/**
* Test destination
*/
private Destination _destination;
/**
* Consumer
*/
private MessageConsumer _consumer;
/**
* Producer session
*/
private Session _producerSession;
/**
* Producer
*/
private MessageProducer _producer;
/**
* Holds exception sent into {@link ExceptionListener} on failover.
* Volatile because it is written from the listener callback, which
* presumably runs on a client library thread rather than the test thread.
*/
private volatile JMSException _exceptionListenerException;
/**
* Latch to check that failover mutex is held by a failover thread;
* counted down in {@link #preFailover(boolean)}
*/
private CountDownLatch _failoverStarted;
/**
 * Obtains the test connection, registers this test as its exception and
 * connection listener and creates fresh failover latches.
 */
@Override
protected void setUp() throws Exception
{
    super.setUp();
    // Create the latches before registering the listeners: preFailover()
    // and failoverComplete() dereference them, so they must never be
    // observable as null once callbacks can fire.
    _failoverComplete = new CountDownLatch(1);
    _failoverStarted = new CountDownLatch(1);
    _connection = getConnection();
    _connection.setExceptionListener(this);
    ((AMQConnection) _connection).setConnectionListener(this);
}
/**
* Test whether MessageProducer can successfully publish messages after
* failover and whether the transaction can be rolled back afterwards.
* <p>
* After the rollback a full batch is produced, committed, consumed and
* committed again to prove both sessions are usable.
*/
public void testMessageProducingAndRollbackAfterFailover() throws Exception
{
init(Session.SESSION_TRANSACTED, true);
// messages are sent but not committed before the broker is failed
produceMessages();
causeFailure();
assertFailoverException();
// producer should be able to send messages after failover
_producer.send(_producerSession.createTextMessage("test message " + _messageNumber));
// rollback after failover
_producerSession.rollback();
// tests whether sending and committing is working after failover
produceMessages();
_producerSession.commit();
// tests whether receiving and committing is working after failover
consumeMessages();
_consumerSession.commit();
}
/**
* Test whether {@link TransactionRolledBackException} is thrown on commit
* of dirty transacted session after failover.
* <p>
* Verifies that the second commit after failover is successful.
*/
public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnProducingMessages() throws Exception
{
init(Session.SESSION_TRANSACTED, true);
// messages are sent but not committed before failover
produceMessages();
causeFailure();
assertFailoverException();
// producer should be able to send messages after failover
_producer.send(_producerSession.createTextMessage("test message " + _messageNumber));
try
{
_producerSession.commit();
fail("TransactionRolledBackException is expected on commit after failover with dirty session!");
}
catch (JMSException t)
{
// any other JMSException subtype fails the test with a descriptive message
assertTrue("Expected TransactionRolledBackException but thrown " + t,
t instanceof TransactionRolledBackException);
}
// simulate process of user replaying the transaction
produceMessages("replayed test message {0}", _messageNumber, false);
// no exception should be thrown
_producerSession.commit();
// only messages sent after rollback should be received
consumeMessages("replayed test message {0}", _messageNumber);
// no exception should be thrown
_consumerSession.commit();
}
/**
* Tests JMSException is not thrown on commit with a clean session after
* failover.
* <p>
* Nothing is sent before the broker is failed, so the producer session has
* no pending work when the first commit is issued.
*/
public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanProducerSession() throws Exception
{
init(Session.SESSION_TRANSACTED, true);
causeFailure();
assertFailoverException();
// should not throw an exception for a clean session
_producerSession.commit();
// tests whether sending and committing is working after failover
produceMessages();
_producerSession.commit();
// tests whether receiving and committing is working after failover
consumeMessages();
_consumerSession.commit();
}
/**
* Tests {@link TransactionRolledBackException} is thrown on commit of dirty
* transacted session after failover.
* <p>
* Verifies that the second commit after failover is successful.
*/
public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnMessageReceiving() throws Exception
{
init(Session.SESSION_TRANSACTED, true);
produceMessages();
_producerSession.commit();
// receive messages but do not commit
consumeMessages();
causeFailure();
assertFailoverException();
try
{
// should throw TransactionRolledBackException
_consumerSession.commit();
fail("TransactionRolledBackException is expected on commit after failover");
}
catch (Exception t)
{
// any exception other than TransactionRolledBackException fails the test
assertTrue("Expected TransactionRolledBackException but thrown " + t,
t instanceof TransactionRolledBackException);
}
// re-publishes the batch when the broker is not clustered
resendMessagesIfNecessary();
// consume messages successfully
consumeMessages();
_consumerSession.commit();
}
/**
* Tests JMSException is not thrown on commit with a clean session after failover.
* <p>
* All produced messages are consumed and committed before the broker is
* failed, so the consumer session has no pending work afterwards.
*/
public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanConsumerSession() throws Exception
{
init(Session.SESSION_TRANSACTED, true);
produceMessages();
_producerSession.commit();
consumeMessages();
_consumerSession.commit();
causeFailure();
assertFailoverException();
// should not throw an exception with a clean consumer session
_consumerSession.commit();
}
/**
* Test that TransactionRolledBackException is thrown on commit of
* dirty session in asynchronous consumer after failover.
* <p>
* Messages are received through a {@link FailoverTestMessageListener}; the
* connection is started only after the listener has been registered.
*/
public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnReceivingMessagesAsynchronously()
throws Exception
{
// connection is not started by init() so the listener can be set first
init(Session.SESSION_TRANSACTED, false);
FailoverTestMessageListener ml = new FailoverTestMessageListener();
_consumer.setMessageListener(ml);
_connection.start();
produceMessages();
_producerSession.commit();
// wait for message receiving
ml.awaitForEnd();
// a timed-out wait is detected here through the message count
assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
// assert messages
int counter = 0;
for (Message message : ml.getReceivedMessages())
{
assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
}
// clear listener state so the post-failover batch is counted from zero
ml.reset();
causeFailure();
assertFailoverException();
try
{
_consumerSession.commit();
fail("TransactionRolledBackException should be thrown!");
}
catch (TransactionRolledBackException e)
{
// that is what is expected
}
resendMessagesIfNecessary();
// wait for message receiving
ml.awaitForEnd();
assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
// assert messages
counter = 0;
for (Message message : ml.getReceivedMessages())
{
assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
}
// commit again. It should be successful
_consumerSession.commit();
}
/**
* Test that {@link Session#rollback()} does not throw exception after failover
* and that we are able to consume messages.
*/
public void testRollbackAfterFailover() throws Exception
{
init(Session.SESSION_TRANSACTED, true);
produceMessages();
_producerSession.commit();
// messages are consumed but the consumer session is not committed
consumeMessages();
causeFailure();
assertFailoverException();
_consumerSession.rollback();
// re-publishes the batch when the broker is not clustered
resendMessagesIfNecessary();
// tests whether receiving and committing is working after failover
consumeMessages();
_consumerSession.commit();
}
/**
* Test that {@link Session#rollback()} does not throw exception after receiving further messages
* after failover, and we can receive published messages after rollback.
*/
public void testRollbackAfterReceivingAfterFailover() throws Exception
{
init(Session.SESSION_TRANSACTED, true);
produceMessages();
_producerSession.commit();
// messages are consumed but the consumer session is not committed
consumeMessages();
causeFailure();
assertFailoverException();
resendMessagesIfNecessary();
// consume again after failover, still without committing
consumeMessages();
_consumerSession.rollback();
// tests whether receiving and committing is working after failover
consumeMessages();
_consumerSession.commit();
}
/**
* Test that {@link Session#recover()} does not throw an exception after failover
* and that we can consume messages after recover.
*/
public void testRecoverAfterFailover() throws Exception
{
init(Session.CLIENT_ACKNOWLEDGE, true);
produceMessages();
// consume messages but do not acknowledge them
consumeMessages();
causeFailure();
assertFailoverException();
_consumerSession.recover();
// re-publishes the batch when the broker is not clustered
resendMessagesIfNecessary();
// tests whether receiving and acknowledgment is working after recover
Message lastMessage = consumeMessages();
lastMessage.acknowledge();
}
/**
* Test that receiving more messages after failover and then calling
* {@link Session#recover()} does not throw an exception
* and that we can consume messages after recover.
*/
public void testRecoverWithConsumedMessagesAfterFailover() throws Exception
{
init(Session.CLIENT_ACKNOWLEDGE, true);
produceMessages();
// consume messages but do not acknowledge them
consumeMessages();
causeFailure();
assertFailoverException();
// publishing should work after failover
resendMessagesIfNecessary();
// consume messages again on a dirty session
consumeMessages();
// recover should successfully restore session
_consumerSession.recover();
// tests whether receiving and acknowledgment is working after recover
Message lastMessage = consumeMessages();
lastMessage.acknowledge();
}
/**
* Test that first call to {@link Message#acknowledge()} after failover
* throws a JMSException if session is dirty.
*/
public void testAcknowledgeAfterFailover() throws Exception
{
init(Session.CLIENT_ACKNOWLEDGE, true);
produceMessages();
// consume messages but do not acknowledge them
Message lastMessage = consumeMessages();
causeFailure();
assertFailoverException();
try
{
// an implicit recover performed when acknowledge throws an exception due to failover
lastMessage.acknowledge();
fail("JMSException should be thrown");
}
catch (JMSException t)
{
// TODO: assert error code and/or expected exception type
}
// re-publishes the batch when the broker is not clustered
resendMessagesIfNecessary();
// tests whether receiving and acknowledgment is working after recover
lastMessage = consumeMessages();
lastMessage.acknowledge();
}
/**
* Test that calling acknowledge before failover leaves the session
* clean for use after failover.
*/
public void testAcknowledgeBeforeFailover() throws Exception
{
init(Session.CLIENT_ACKNOWLEDGE, true);
produceMessages();
// consume messages and acknowledge them
Message lastMessage = consumeMessages();
lastMessage.acknowledge();
causeFailure();
assertFailoverException();
// a fresh batch is always published here, regardless of the CLUSTERED flag
produceMessages();
// tests whether receiving and acknowledgment is working after recover
lastMessage = consumeMessages();
lastMessage.acknowledge();
}
/**
* Test that receiving of messages after failover prior to calling
* {@link Message#acknowledge()} still results in acknowledge throwing an exception.
*/
public void testAcknowledgeAfterMessageReceivingAfterFailover() throws Exception
{
init(Session.CLIENT_ACKNOWLEDGE, true);
produceMessages();
// consume messages but do not acknowledge them
consumeMessages();
causeFailure();
assertFailoverException();
// re-publishes the batch when the broker is not clustered
resendMessagesIfNecessary();
// consume again on dirty session
Message lastMessage = consumeMessages();
try
{
// an implicit recover performed when acknowledge throws an exception due to failover
lastMessage.acknowledge();
fail("JMSException should be thrown");
}
catch (JMSException t)
{
// TODO: assert error code and/or expected exception type
}
// tests whether receiving and acknowledgment is working on a clean session
lastMessage = consumeMessages();
lastMessage.acknowledge();
}
/**
* Tests that call to {@link Message#acknowledge()} after failover throws an exception in asynchronous consumer
* and we can consume messages after acknowledge.
* <p>
* Messages are received through a {@link FailoverTestMessageListener}; the
* connection is started only after the listener has been registered.
*/
public void testAcknowledgeAfterFailoverForAsynchronousConsumer() throws Exception
{
// connection is not started by init() so the listener can be set first
init(Session.CLIENT_ACKNOWLEDGE, false);
FailoverTestMessageListener ml = new FailoverTestMessageListener();
_consumer.setMessageListener(ml);
_connection.start();
produceMessages();
// wait for message receiving
ml.awaitForEnd();
// a timed-out wait is detected here through the message count
assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
// assert messages
int counter = 0;
// remembers the last received message so it can be acknowledged below
Message currentMessage = null;
for (Message message : ml.getReceivedMessages())
{
assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
currentMessage = message;
}
// clear listener state so the post-failover batch is counted from zero
ml.reset();
causeFailure();
assertFailoverException();
try
{
currentMessage.acknowledge();
fail("JMSException should be thrown!");
}
catch (JMSException e)
{
// TODO: assert error code and/or expected exception type
}
resendMessagesIfNecessary();
// wait for message receiving
ml.awaitForEnd();
assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
// assert messages
counter = 0;
for (Message message : ml.getReceivedMessages())
{
assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
currentMessage = message;
}
// acknowledge again. It should be successful
currentMessage.acknowledge();
}
/**
* Test whether {@link Session#recover()} works as expected after failover
* in AUTO_ACKNOWLEDGE mode.
*/
public void testRecoverAfterFailoverInAutoAcknowledgeMode() throws Exception
{
init(Session.AUTO_ACKNOWLEDGE, true);
produceMessages();
// receive first message in order to start a dispatcher thread
Message receivedMessage = _consumer.receive(1000l);
assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
causeFailure();
assertFailoverException();
_consumerSession.recover();
// re-publishes the batch when the broker is not clustered
resendMessagesIfNecessary();
// tests whether receiving is working after recover
consumeMessages();
}
/**
* Runs the "session close after failover" scenario with a CLIENT_ACKNOWLEDGE session.
*/
public void testClientAcknowledgedSessionCloseAfterFailover() throws Exception
{
sessionCloseAfterFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
}
/**
* Runs the "session close after failover" scenario with a transacted session.
*/
public void testTransactedSessionCloseAfterFailover() throws Exception
{
sessionCloseAfterFailoverImpl(Session.SESSION_TRANSACTED);
}
/**
* Runs the "session close after failover" scenario with an AUTO_ACKNOWLEDGE session.
*/
public void testAutoAcknowledgedSessionCloseAfterFailover() throws Exception
{
sessionCloseAfterFailoverImpl(Session.AUTO_ACKNOWLEDGE);
}
/**
* Runs the "publish while failing over" scenario with AUTO_ACKNOWLEDGE sessions.
*/
public void testPublishAutoAcknowledgedWhileFailover() throws Exception
{
publishWhileFailingOver(Session.AUTO_ACKNOWLEDGE);
}
/**
* Runs the "publish while failing over" scenario with CLIENT_ACKNOWLEDGE
* sessions and acknowledges the message that was received.
*/
public void testPublishClientAcknowledgedWhileFailover() throws Exception
{
Message receivedMessage = publishWhileFailingOver(Session.CLIENT_ACKNOWLEDGE);
receivedMessage.acknowledge();
}
/**
* Runs the "publish while failing over" scenario with transacted sessions
* and commits the consumer session afterwards.
*/
public void testPublishTransactedAcknowledgedWhileFailover() throws Exception
{
publishWhileFailingOver(Session.SESSION_TRANSACTED);
_consumerSession.commit();
}
/**
* Runs the "publish while holding the failover mutex" scenario with AUTO_ACKNOWLEDGE sessions.
*/
public void testPublishAutoAcknowledgedWithFailoverMutex() throws Exception
{
publishWithFailoverMutex(Session.AUTO_ACKNOWLEDGE);
}
/**
* Runs the "publish while holding the failover mutex" scenario with CLIENT_ACKNOWLEDGE sessions.
*/
public void testPublishClientAcknowledgedWithFailoverMutex() throws Exception
{
publishWithFailoverMutex(Session.CLIENT_ACKNOWLEDGE);
}
/**
* Runs the "publish while holding the failover mutex" scenario with transacted sessions.
*/
public void testPublishTransactedAcknowledgedWithFailoverMutex() throws Exception
{
publishWithFailoverMutex(Session.SESSION_TRANSACTED);
}
/**
* Runs the "session close while failing over" scenario with a CLIENT_ACKNOWLEDGE session.
*/
public void testClientAcknowledgedSessionCloseWhileFailover() throws Exception
{
sessionCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
}
/**
* Runs the "session close while failing over" scenario with a transacted session.
*/
public void testTransactedSessionCloseWhileFailover() throws Exception
{
sessionCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
}
/**
* Runs the "session close while failing over" scenario with an AUTO_ACKNOWLEDGE session.
*/
public void testAutoAcknowledgedSessionCloseWhileFailover() throws Exception
{
sessionCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
}
/**
* Runs the "queue browser close while failing over" scenario with a CLIENT_ACKNOWLEDGE session.
*/
public void testClientAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
{
browserCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
}
/**
* Runs the "queue browser close while failing over" scenario with a transacted session.
*/
public void testTransactedQueueBrowserCloseWhileFailover() throws Exception
{
browserCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
}
/**
* Runs the "queue browser close while failing over" scenario with an AUTO_ACKNOWLEDGE session.
*/
public void testAutoAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
{
browserCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
}
/**
 * Publishes a single message after failover has been signalled as started
 * and verifies that the message can be received.
 * <p>
 * A delaying failover policy (5 seconds) is installed and the broker is
 * failed; the message is sent once {@link #preFailover(boolean)} has been
 * called, i.e. presumably while the delayed failover is still in progress.
 *
 * @param autoAcknowledge acknowledge mode used for both sessions; despite
 *                        its name any {@link Session} mode may be passed
 * @return the received message; acknowledging or committing it is left to the caller
 * @throws JMSException on JMS failure
 * @throws InterruptedException if interrupted while waiting for failover to start
 */
private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException
{
    setDelayedFailoverPolicy(5);
    init(autoAcknowledge, true);
    String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
    Message message = _producerSession.createTextMessage(text);
    failBroker(getFailingPort());
    if (!_failoverStarted.await(5, TimeUnit.SECONDS))
    {
        fail("Did not receive notification that failover had started");
    }
    _producer.send(message);
    if (_producerSession.getTransacted())
    {
        _producerSession.commit();
    }
    Message receivedMessage = _consumer.receive(1000L);
    assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
    return receivedMessage;
}
/**
 * Verifies that sending fails while the connection is lost and the test
 * itself holds the failover mutex, so that failover cannot complete.
 * <p>
 * The wait for the connection to enter the failing-over state is bounded by
 * {@code DEFAULT_FAILOVER_TIME} so that a broker which never triggers
 * failover fails the test instead of hanging it.
 *
 * @param autoAcknowledge acknowledge mode used for both sessions; despite
 *                        its name any {@link Session} mode may be passed
 * @throws JMSException on unexpected JMS failure
 * @throws InterruptedException if interrupted while polling the connection state
 */
private void publishWithFailoverMutex(int autoAcknowledge) throws JMSException, InterruptedException
{
    setDelayedFailoverPolicy(5);
    init(autoAcknowledge, true);
    String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
    Message message = _producerSession.createTextMessage(text);
    AMQConnection connection = (AMQConnection) _connection;
    // holding failover mutex should prevent the failover from
    // proceeding before we try to send the message
    synchronized (connection.getFailoverMutex())
    {
        failBroker(getFailingPort());
        // wait to make sure that connection is lost, but never forever
        final long deadline = System.currentTimeMillis() + DEFAULT_FAILOVER_TIME;
        while (!connection.isFailingOver())
        {
            if (System.currentTimeMillis() > deadline)
            {
                fail("Connection did not start failing over within " + DEFAULT_FAILOVER_TIME + " ms");
            }
            Thread.sleep(25L);
        }
        try
        {
            _producer.send(message);
            fail("Sending should fail because connection was lost and failover has not yet completed");
        }
        catch (JMSException e)
        {
            // JMSException is expected
        }
    }
    // wait for failover completion, thus ensuring it actually
    // got started, before allowing the test to tear down
    awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
}
/**
 * Checks that {@link Session#close()} succeeds on a consumer session that
 * was left with an unacknowledged/uncommitted message when failover
 * happened.
 *
 * @param acknowledgeMode session acknowledge mode
 * @throws JMSException on JMS failure
 */
private void sessionCloseAfterFailoverImpl(int acknowledgeMode) throws JMSException
{
    init(acknowledgeMode, true);
    produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);

    final boolean transacted = (Session.SESSION_TRANSACTED == acknowledgeMode);
    if (transacted)
    {
        _producerSession.commit();
    }

    // the first message is deliberately neither committed nor acknowledged
    // (relevant for transacted and CLIENT_ACK sessions)
    final Message firstMessage = _consumer.receive(1000L);
    assertReceivedMessage(firstMessage, TEST_MESSAGE_FORMAT, 0);

    causeFailure();
    assertFailoverException();

    // for transacted/client_ack sessions no exception is expected here;
    // the pending work should be rolled back automatically
    _consumerSession.close();
}
/**
 * Sets up the fixtures shared by the tests: consumer session, test
 * destination and consumer first, then (optionally) starts the connection,
 * and finally the producer session and producer.
 *
 * @param acknowledgeMode
 *            acknowledge mode used for both sessions; sessions are
 *            transacted when it equals {@link Session#SESSION_TRANSACTED}
 * @param startConnection
 *            whether the connection should be started before the producer
 *            session is created
 * @throws JMSException on JMS failure
 */
private void init(int acknowledgeMode, boolean startConnection) throws JMSException
{
    final boolean transacted = (acknowledgeMode == Session.SESSION_TRANSACTED);

    _consumerSession = _connection.createSession(transacted, acknowledgeMode);
    _destination = createDestination(_consumerSession);
    _consumer = _consumerSession.createConsumer(_destination);

    if (startConnection)
    {
        _connection.start();
    }

    _producerSession = _connection.createSession(transacted, acknowledgeMode);
    _producer = _producerSession.createProducer(_destination);
}
/**
 * Creates the test destination: a queue named after the test queue name
 * with the current time in milliseconds appended.
 *
 * @param session session used to create the queue
 * @return the test queue
 * @throws JMSException on JMS failure
 */
protected Destination createDestination(Session session) throws JMSException
{
    final String queueName = getTestQueueName() + "_" + System.currentTimeMillis();
    return session.createQueue(queueName);
}
/**
 * Re-publishes the default batch of messages after failover, unless the
 * tests run against a clustered broker.
 * <p>
 * For a synchronous consumer it first asserts that nothing can be received
 * from the queue within 100 ms.
 *
 * @throws JMSException on JMS failure
 */
private void resendMessagesIfNecessary() throws JMSException
{
    if (CLUSTERED)
    {
        // nothing to re-send in clustered mode
        return;
    }

    final boolean synchronousConsumer = (_consumer.getMessageListener() == null);
    if (synchronousConsumer)
    {
        // the new broker must not have any messages on the queue
        final Message unexpected = _consumer.receive(100L);
        assertNull("Received a message after failover with non-clustered broker!", unexpected);
    }

    // re-send through a separate, transacted producer
    produceMessages(true);
}
/**
 * Sends the default batch ({@code _messageNumber} messages with the default
 * text pattern) through the shared producer. Nothing is committed here.
 *
 * @throws JMSException on JMS failure
 */
private void produceMessages() throws JMSException
{
    final boolean useSeparateProducer = false;
    produceMessages(useSeparateProducer);
}
/**
 * Sends the default batch of messages with the default text pattern.
 *
 * @param seperateProducer whether a new producer (on its own session) is
 *                         used instead of the shared one
 * @throws JMSException on JMS failure
 */
private void produceMessages(boolean seperateProducer) throws JMSException
{
    final String pattern = TEST_MESSAGE_FORMAT;
    final int count = _messageNumber;
    produceMessages(pattern, count, seperateProducer);
}
/**
 * Receives the default batch ({@code _messageNumber} messages) and checks
 * each message against the default text pattern.
 *
 * @return last consumed message
 * @throws JMSException on JMS failure
 */
private Message consumeMessages() throws JMSException
{
    final Message lastConsumed = consumeMessages(TEST_MESSAGE_FORMAT, _messageNumber);
    return lastConsumed;
}
/**
 * Produces given number of text messages with content matching given
 * content pattern.
 * <p>
 * With {@code standaloneProducer} set, a dedicated transacted session and
 * producer are created, the batch is committed, and the session is closed
 * again so that repeated calls do not accumulate open sessions. Otherwise
 * the shared producer is used and committing is left to the caller.
 *
 * @param messagePattern message content pattern
 * @param messageNumber number of messages to send
 * @param standaloneProducer whether to use a new producer rather than the existing one
 * @throws JMSException on JMS failure
 */
private void produceMessages(String messagePattern, int messageNumber, boolean standaloneProducer) throws JMSException
{
    if (!standaloneProducer)
    {
        publishTextMessages(_producerSession, _producer, messagePattern, messageNumber);
        return;
    }

    Session standaloneSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
    try
    {
        MessageProducer producer = standaloneSession.createProducer(_destination);
        publishTextMessages(standaloneSession, producer, messagePattern, messageNumber);
        standaloneSession.commit();
    }
    finally
    {
        // release the temporary session (and its producer) in every case
        standaloneSession.close();
    }
}

/**
 * Sends {@code messageNumber} text messages whose text is the pattern
 * formatted with the message index (0-based).
 */
private void publishTextMessages(Session session, MessageProducer producer, String messagePattern,
        int messageNumber) throws JMSException
{
    for (int i = 0; i < messageNumber; i++)
    {
        String text = MessageFormat.format(messagePattern, i);
        Message message = session.createTextMessage(text);
        producer.send(message);
    }
}
/**
 * Receives the given number of messages, one by one with a one second
 * receive timeout each, and checks every message against the pattern
 * formatted with its 0-based position.
 *
 * @param messagePattern
 *            messages content pattern
 * @param messageNumber
 *            number of messages to receive
 * @return last consumed message, or null when messageNumber is not positive
 * @throws JMSException on JMS failure
 */
private Message consumeMessages(String messagePattern, int messageNumber) throws JMSException
{
    Message lastReceived = null;
    int index = 0;
    while (index < messageNumber)
    {
        lastReceived = _consumer.receive(1000L);
        assertReceivedMessage(lastReceived, messagePattern, index);
        index++;
    }
    return lastReceived;
}
/**
 * Checks that a message was received, that it is a {@link TextMessage} and
 * that its text equals the pattern formatted with the message index.
 *
 * @param receivedMessage
 *            message to check; null fails the assertion
 * @param messagePattern
 *            messages content pattern
 * @param messageIndex
 *            index substituted into the pattern
 */
private void assertReceivedMessage(Message receivedMessage, String messagePattern, int messageIndex)
{
    assertNotNull("Expected message [" + messageIndex + "] is not received!", receivedMessage);

    final boolean isTextMessage = receivedMessage instanceof TextMessage;
    assertTrue("Failure to receive message [" + messageIndex + "], expected TextMessage but received "
            + receivedMessage, isTextMessage);

    final String expected = MessageFormat.format(messagePattern, messageIndex);

    String actual = null;
    try
    {
        final TextMessage textMessage = (TextMessage) receivedMessage;
        actual = textMessage.getText();
    }
    catch (JMSException e)
    {
        fail("JMSException occured while getting message text:" + e.getMessage());
    }

    assertEquals("Failover is broken! Expected [" + expected + "] but got [" + actual + "]",
            expected, actual);
}
/**
 * Fails the broker on the failing port and waits up to twice
 * {@code DEFAULT_FAILOVER_TIME} for failover to complete.
 */
private void causeFailure()
{
    final int failingPort = getFailingPort();
    final long maximumWait = DEFAULT_FAILOVER_TIME * 2;
    causeFailure(failingPort, maximumWait);
}
/**
 * Triggers failover by failing the broker on the given port, then blocks
 * until failover completion is signalled or the delay elapses (in which
 * case the test fails).
 *
 * @param port
 *            port of the broker to fail
 * @param delay
 *            maximum time, in milliseconds, to wait for failover completion
 */
private void causeFailure(int port, long delay)
{
    // step 1: take the broker down
    failBroker(port);

    // step 2: wait for the connection to be re-established
    awaitForFailoverCompletion(delay);
}
/**
 * Waits until {@link #failoverComplete()} has been signalled and fails the
 * test when that does not happen in time or the wait is interrupted.
 *
 * @param delay maximum time to wait, in milliseconds
 */
private void awaitForFailoverCompletion(long delay)
{
    _logger.info("Awaiting Failover completion..");
    try
    {
        if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS))
        {
            fail("Failover did not complete");
        }
    }
    catch (InterruptedException e)
    {
        // keep the interrupt visible to the test runner before failing
        Thread.currentThread().interrupt();
        fail("Test was interrupted:" + e.getMessage());
    }
}
/**
* Placeholder for asserting the exception reported on failover; it currently
* performs no checks.
*/
private void assertFailoverException()
{
// TODO: assert exception is received (once implemented)
// along with error code and/or expected exception type
// NOTE(review): _exceptionListenerException is stored by onException(JMSException)
// and looks like the value to assert on here - confirm
}
/**
* Connection listener callback; intentionally a no-op because these tests
* do not use byte counts.
*/
@Override
public void bytesSent(long count)
{
}
/**
* Connection listener callback; intentionally a no-op because these tests
* do not use byte counts.
*/
@Override
public void bytesReceived(long count)
{
}
/**
* Connection listener callback; releases the {@code _failoverStarted} latch
* so that tests waiting for failover to begin can proceed.
*
* @return always true (presumably allows failover to proceed - confirm
*         against the ConnectionListener contract)
*/
@Override
public boolean preFailover(boolean redirect)
{
_failoverStarted.countDown();
return true;
}
/**
* Connection listener callback.
*
* @return always true (presumably allows resubscription to proceed - confirm
*         against the ConnectionListener contract)
*/
@Override
public boolean preResubscribe()
{
return true;
}
/**
* Connection listener callback; releases the {@code _failoverComplete} latch
* that {@code awaitForFailoverCompletion(long)} waits on.
*/
@Override
public void failoverComplete()
{
_failoverComplete.countDown();
}
/**
* Exception listener callback; remembers the reported exception, replacing
* any previously stored one.
*/
@Override
public void onException(JMSException e)
{
_exceptionListenerException = e;
}
/**
* Causes 1 second delay before reconnect in order to test whether JMS
* methods block while failover is in progress
*/
private static class DelayingFailoverPolicy extends FailoverPolicy
{
private CountDownLatch _suspendLatch;
private long _delay;
public DelayingFailoverPolicy(AMQConnection connection, long delay)
{
super(connection.getConnectionURL(), connection);
_suspendLatch = new CountDownLatch(1);
_delay = delay;
}
public void attainedConnection()
{
try
{
_suspendLatch.await(_delay, TimeUnit.SECONDS);
}
catch (InterruptedException e)
{
// continue
}
super.attainedConnection();
}
}
/**
 * Message listener that records received messages and releases a latch each
 * time the number of received messages reaches a multiple of
 * {@code _messageNumber}.
 * <p>
 * The listener is called on a delivery thread while the test thread reads
 * and resets its state, so access to the message list is synchronised and
 * callers are handed a snapshot rather than the live list.
 */
private class FailoverTestMessageListener implements MessageListener
{
    /** Number of messages received since construction or the last reset */
    private final AtomicInteger _counter = new AtomicInteger();

    /** Received messages in arrival order; guarded by its own monitor */
    private final List<Message> _receivedMessage = new ArrayList<Message>();

    /** Released when a full batch has been received; replaced on reset */
    private volatile CountDownLatch _endLatch;

    public FailoverTestMessageListener() throws JMSException
    {
        _endLatch = new CountDownLatch(1);
    }

    @Override
    public void onMessage(Message message)
    {
        synchronized (_receivedMessage)
        {
            _receivedMessage.add(message);
        }
        if (_counter.incrementAndGet() % _messageNumber == 0)
        {
            _endLatch.countDown();
        }
    }

    /**
     * Forgets all received messages, installs a fresh latch and resets the
     * counter to zero.
     */
    public void reset()
    {
        synchronized (_receivedMessage)
        {
            _receivedMessage.clear();
        }
        _endLatch = new CountDownLatch(1);
        _counter.set(0);
    }

    /**
     * @return a snapshot of the messages received so far, safe to iterate
     *         while further messages are being delivered
     */
    public List<Message> getReceivedMessages()
    {
        synchronized (_receivedMessage)
        {
            return new ArrayList<Message>(_receivedMessage);
        }
    }

    /**
     * Waits up to {@code _messageNumber} seconds for a full batch.
     *
     * @return {@link Boolean#TRUE} if the batch arrived in time,
     *         {@link Boolean#FALSE} on timeout
     * @throws InterruptedException if interrupted while waiting
     */
    public Object awaitForEnd() throws InterruptedException
    {
        return _endLatch.await((long) _messageNumber, TimeUnit.SECONDS);
    }

    /**
     * @return number of messages received since construction or the last reset
     */
    public int getMessageCounter()
    {
        return _counter.get();
    }
}
/**
* Tests {@link Session#close()} for session with given acknowledge mode
* while a delayed failover is in progress; the intent is that close blocks
* until the failover implementation restores the connection.
* <p>
* NOTE(review): nothing here measures or asserts that close actually
* blocked, nor waits for failover completion before returning - confirm
* whether that is intended.
*
* @param acknowledgeMode session acknowledge mode
* @throws JMSException
*/
private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException
{
// installs the delaying policy, creates the fixtures and publishes a batch
initDelayedFailover(acknowledgeMode);
// intentionally receive message but not commit or acknowledge it in
// case of transacted or CLIENT_ACK session
Message receivedMessage = _consumer.receive(1000l);
assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
// the broker is failed without waiting for failover to complete
failBroker(getFailingPort());
// test whether session#close blocks while failover is in progress
_consumerSession.close();
assertFailoverException();
}
/**
 * Creates a {@link QueueBrowser} on the test queue and publishes the
 * default batch of messages for it to browse.
 * <p>
 * The consumer created by {@code init} is closed before the browser is
 * created, and the connection is started only afterwards.
 *
 * @param acknowledgeMode session acknowledge mode
 * @return queue browser
 * @throws JMSException on JMS failure
 */
private QueueBrowser prepareQueueBrowser(int acknowledgeMode) throws JMSException
{
    init(acknowledgeMode, false);
    _consumer.close();

    final Queue testQueue = (Queue) _destination;
    final QueueBrowser queueBrowser = _consumerSession.createBrowser(testQueue);
    _connection.start();

    produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
    final boolean transacted = (Session.SESSION_TRANSACTED == acknowledgeMode);
    if (transacted)
    {
        _producerSession.commit();
    }
    return queueBrowser;
}
/**
 * Exercises {@link QueueBrowser#close()} after the broker has been failed
 * with the delaying failover policy installed; the intent is that close
 * blocks until the failover implementation restores the connection.
 *
 * @param acknowledgeMode session acknowledge mode
 * @throws JMSException on JMS failure
 */
private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws JMSException
{
    setDelayedFailoverPolicy();
    final QueueBrowser queueBrowser = prepareQueueBrowser(acknowledgeMode);

    @SuppressWarnings("unchecked")
    final Enumeration<Message> browsed = queueBrowser.getEnumeration();

    // browse the first message before the broker goes away
    final Message firstBrowsed = browsed.nextElement();
    assertReceivedMessage(firstBrowsed, TEST_MESSAGE_FORMAT, 0);

    failBroker(getFailingPort());
    queueBrowser.close();
    assertFailoverException();
}
/**
 * Installs the default delaying failover policy, creates the test fixtures
 * with a started connection and publishes (and, for transacted sessions,
 * commits) the default batch of messages.
 *
 * @param acknowledgeMode session acknowledge mode
 * @return the installed failover policy
 * @throws JMSException on JMS failure
 */
private DelayingFailoverPolicy initDelayedFailover(int acknowledgeMode) throws JMSException
{
    final DelayingFailoverPolicy delayingPolicy = setDelayedFailoverPolicy();

    init(acknowledgeMode, true);
    produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);

    final boolean transacted = (Session.SESSION_TRANSACTED == acknowledgeMode);
    if (transacted)
    {
        _producerSession.commit();
    }
    return delayingPolicy;
}
/**
 * Installs a delaying failover policy with the default delay of 2 seconds.
 *
 * @return the installed failover policy
 */
private DelayingFailoverPolicy setDelayedFailoverPolicy()
{
    final long defaultDelaySeconds = 2;
    return setDelayedFailoverPolicy(defaultDelaySeconds);
}
/**
 * Creates a {@link DelayingFailoverPolicy} for the test connection and
 * installs it on that connection.
 *
 * @param delay delay in seconds passed to the policy
 * @return the installed failover policy
 */
private DelayingFailoverPolicy setDelayedFailoverPolicy(long delay)
{
    final AMQConnection amqConnection = (AMQConnection) _connection;
    final DelayingFailoverPolicy delayingPolicy = new DelayingFailoverPolicy(amqConnection, delay);
    amqConnection.setFailoverPolicy(delayingPolicy);
    return delayingPolicy;
}
}