blob: 5fa7b2d125bac63d370633e42a8840300b0dc558 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.client.failover;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import javax.naming.NamingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.BrokerDetails;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.test.utils.FailoverBaseCase;
import org.apache.qpid.url.URLSyntaxException;
/**
* Test suite to test all possible failover corner cases
*/
public class FailoverBehaviourTest extends FailoverBaseCase implements ExceptionListener
{
/** Logger used by the tests in this class. */
protected static final Logger _LOGGER = LoggerFactory.getLogger(FailoverBehaviourTest.class);
/** {@link MessageFormat} pattern for the text of test messages; argument 0 is the message index. */
private static final String TEST_MESSAGE_FORMAT = "test message {0}";
/** Indicates whether tests are run against clustered broker; read from the "profile.clustered" system property. */
private static boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
/** Default number of messages to send before failover */
private static final int DEFAULT_NUMBER_OF_MESSAGES = 40;
/**
* Actual number of messages to send before failover; read from the
* "profile.failoverMsgCount" system property, falling back to
* {@link #DEFAULT_NUMBER_OF_MESSAGES}.
*/
protected int _messageNumber = Integer.getInteger("profile.failoverMsgCount", DEFAULT_NUMBER_OF_MESSAGES);
/** Test connection, obtained in {@link #setUp()} */
protected Connection _connection;
/**
* Consumer session
*/
private Session _consumerSession;
/**
* Test destination
*/
private Destination _destination;
/**
* Consumer
*/
private MessageConsumer _consumer;
/**
* Producer session
*/
private Session _producerSession;
/**
* Producer
*/
private MessageProducer _producer;
/**
* Holds exception sent into {@link ExceptionListener} on failover
*/
private JMSException _exceptionListenerException;
/**
* Obtains the test connection and registers this test instance as both the
* connection's JMS exception listener and its connection listener.
*/
@Override
public void setUp() throws Exception
{
super.setUp();
_connection = getConnection();
_connection.setExceptionListener(this);
// the connection listener can only be registered through the AMQConnection type
((AMQConnection) _connection).setConnectionListener(this);
}
/**
* Test whether MessageProducer can successfully publish messages after
* failover and rollback transaction.
* <p>
* Uses transacted sessions: the messages produced before the failure are
* left uncommitted, one more message is sent after failover, and the
* producer session is rolled back before a fresh batch is produced,
* committed and consumed.
*/
public void testMessageProducingAndRollbackAfterFailover() throws Exception
{
init(Session.SESSION_TRANSACTED, true);
// messages are sent but not committed, so the transaction is open when the failure is caused
produceMessages();
causeFailure();
assertFailoverException();
// producer should be able to send messages after failover
_producer.send(_producerSession.createTextMessage("test message " + _messageNumber));
// rollback after failover
_producerSession.rollback();
// tests whether sending and committing is working after failover
produceMessages();
_producerSession.commit();
// tests whether receiving and committing is working after failover
consumeMessages();
_consumerSession.commit();
}
/**
* Test whether {@link TransactionRolledBackException} is thrown on commit
* of dirty transacted session after failover.
* <p>
* Also verifies that the second commit after failover is successful once
* the messages have been re-sent.
*/
public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnProducingMessages() throws Exception
{
init(Session.SESSION_TRANSACTED, true);
// messages are sent but not committed before the failure
produceMessages();
causeFailure();
assertFailoverException();
// producer should be able to send messages after failover
_producer.send(_producerSession.createTextMessage("test message " + _messageNumber));
try
{
_producerSession.commit();
fail("TransactionRolledBackException is expected on commit after failover with dirty session!");
}
catch (JMSException t)
{
// any other JMSException subtype is reported as an assertion failure
assertTrue("Expected TransactionRolledBackException but thrown " + t,
t instanceof TransactionRolledBackException);
}
// simulate process of user replaying the transaction
produceMessages("replayed test message {0}", _messageNumber, false);
// no exception should be thrown
_producerSession.commit();
// only messages sent after rollback should be received
consumeMessages("replayed test message {0}", _messageNumber);
// no exception should be thrown
_consumerSession.commit();
}
/**
* Tests JMSException is not thrown on commit with a clean session after
* failover.
* <p>
* Nothing is produced before the failure, so the producer session has no
* uncommitted work when it is committed after failover.
*/
public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanProducerSession() throws Exception
{
init(Session.SESSION_TRANSACTED, true);
causeFailure();
assertFailoverException();
// should not throw an exception for a clean session
_producerSession.commit();
// tests whether sending and committing is working after failover
produceMessages();
_producerSession.commit();
// tests whether receiving and committing is working after failover
consumeMessages();
_consumerSession.commit();
}
/**
* Tests {@link TransactionRolledBackException} is thrown on commit of dirty
* transacted session after failover.
* <p>
* Here the consumer session is the dirty one: messages are received but not
* committed before the failure. Also verifies that the second commit after
* failover is successful.
*/
public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnMessageReceiving() throws Exception
{
init(Session.SESSION_TRANSACTED, true);
produceMessages();
_producerSession.commit();
// receive messages but do not commit
consumeMessages();
causeFailure();
assertFailoverException();
try
{
// should throw TransactionRolledBackException
_consumerSession.commit();
fail("TransactionRolledBackException is expected on commit after failover");
}
catch (Exception t)
{
// any other exception type is reported as an assertion failure
assertTrue("Expected TransactionRolledBackException but thrown " + t,
t instanceof TransactionRolledBackException);
}
// on a non-clustered broker the messages are published again before consuming
resendMessagesIfNecessary();
// consume messages successfully
consumeMessages();
_consumerSession.commit();
}
/**
* Tests JMSException is not thrown on commit with a clean session after failover.
* <p>
* All produced messages are consumed and committed before the failure, so
* the consumer session has no uncommitted work when failover happens.
*/
public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanConsumerSession() throws Exception
{
init(Session.SESSION_TRANSACTED, true);
produceMessages();
_producerSession.commit();
consumeMessages();
_consumerSession.commit();
causeFailure();
assertFailoverException();
// should not throw an exception with a clean consumer session
_consumerSession.commit();
}
/**
* Test that TransactionRolledBackException is thrown on commit of
* dirty session in asynchronous consumer after failover.
* <p>
* Messages are delivered to a {@code FailoverTestMessageListener} and left
* uncommitted before the failure; after the expected exception the messages
* are received again and the second commit must succeed.
*/
public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnReceivingMessagesAsynchronously()
throws Exception
{
// the connection is started only after the listener is registered
init(Session.SESSION_TRANSACTED, false);
FailoverTestMessageListener ml = new FailoverTestMessageListener();
_consumer.setMessageListener(ml);
_connection.start();
produceMessages();
_producerSession.commit();
// wait for message receiving
ml.awaitForEnd();
assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
// assert messages
int counter = 0;
for (Message message : ml.getReceivedMessages())
{
assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
}
// reset the listener before the second round of deliveries
ml.reset();
causeFailure();
assertFailoverException();
try
{
_consumerSession.commit();
fail("TransactionRolledBackException should be thrown!");
}
catch (TransactionRolledBackException e)
{
// that is what is expected
}
resendMessagesIfNecessary();
// wait for message receiving
ml.awaitForEnd();
assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
// assert messages
counter = 0;
for (Message message : ml.getReceivedMessages())
{
assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
}
// commit again. It should be successful
_consumerSession.commit();
}
/**
* Test that {@link Session#rollback()} does not throw exception after failover
* and that we are able to consume messages.
* <p>
* The consumer session holds received but uncommitted messages when the
* failure is caused.
*/
public void testRollbackAfterFailover() throws Exception
{
init(Session.SESSION_TRANSACTED, true);
produceMessages();
_producerSession.commit();
// receive messages but do not commit
consumeMessages();
causeFailure();
assertFailoverException();
_consumerSession.rollback();
resendMessagesIfNecessary();
// tests whether receiving and committing is working after failover
consumeMessages();
_consumerSession.commit();
}
/**
* Test that {@link Session#rollback()} does not throw exception after receiving further messages
* after failover, and we can receive published messages after rollback.
*/
public void testRollbackAfterReceivingAfterFailover() throws Exception
{
init(Session.SESSION_TRANSACTED, true);
produceMessages();
_producerSession.commit();
// receive messages but do not commit
consumeMessages();
causeFailure();
assertFailoverException();
resendMessagesIfNecessary();
// receive again after failover, still without committing, then roll back
consumeMessages();
_consumerSession.rollback();
// tests whether receiving and committing is working after failover
consumeMessages();
_consumerSession.commit();
}
/**
* Test that {@link Session#recover()} does not throw an exception after failover
* and that we can consume messages after recover.
* <p>
* Uses {@link Session#CLIENT_ACKNOWLEDGE} sessions.
*/
public void testRecoverAfterFailover() throws Exception
{
init(Session.CLIENT_ACKNOWLEDGE, true);
produceMessages();
// consume messages but do not acknowledge them
consumeMessages();
causeFailure();
assertFailoverException();
_consumerSession.recover();
resendMessagesIfNecessary();
// tests whether receiving and acknowledgment is working after recover
Message lastMessage = consumeMessages();
lastMessage.acknowledge();
}
/**
* Test that receiving more messages after failover and then calling
* {@link Session#recover()} does not throw an exception
* and that we can consume messages after recover.
* <p>
* Uses {@link Session#CLIENT_ACKNOWLEDGE} sessions.
*/
public void testRecoverWithConsumedMessagesAfterFailover() throws Exception
{
init(Session.CLIENT_ACKNOWLEDGE, true);
produceMessages();
// consume messages but do not acknowledge them
consumeMessages();
causeFailure();
assertFailoverException();
// publishing should work after failover
resendMessagesIfNecessary();
// consume messages again on a dirty session
consumeMessages();
// recover should successfully restore session
_consumerSession.recover();
// tests whether receiving and acknowledgment is working after recover
Message lastMessage = consumeMessages();
lastMessage.acknowledge();
}
/**
* Test that first call to {@link Message#acknowledge()} after failover
* throws a JMSException if session is dirty.
* <p>
* After the expected exception, messages are consumed again and the
* acknowledgement must succeed.
*/
public void testAcknowledgeAfterFailover() throws Exception
{
init(Session.CLIENT_ACKNOWLEDGE, true);
produceMessages();
// consume messages but do not acknowledge them
Message lastMessage = consumeMessages();
causeFailure();
assertFailoverException();
try
{
// an implicit recover performed when acknowledge throws an exception due to failover
lastMessage.acknowledge();
fail("JMSException should be thrown");
}
catch (JMSException t)
{
// TODO: assert error code and/or expected exception type
}
resendMessagesIfNecessary();
// tests whether receiving and acknowledgment is working after recover
lastMessage = consumeMessages();
lastMessage.acknowledge();
}
/**
* Test that calling acknowledge before failover leaves the session
* clean for use after failover.
* <p>
* Because the session is clean, messages are re-produced directly with
* {@code produceMessages()} rather than through {@code resendMessagesIfNecessary()}.
*/
public void testAcknowledgeBeforeFailover() throws Exception
{
init(Session.CLIENT_ACKNOWLEDGE, true);
produceMessages();
// consume messages and acknowledge them
Message lastMessage = consumeMessages();
lastMessage.acknowledge();
causeFailure();
assertFailoverException();
produceMessages();
// tests whether receiving and acknowledgment is working after recover
lastMessage = consumeMessages();
lastMessage.acknowledge();
}
/**
* Test that receiving of messages after failover prior to calling
* {@link Message#acknowledge()} still results in acknowledge throwing an exception.
* <p>
* After the expected exception, messages are consumed once more without
* being re-sent, and the acknowledgement must succeed.
*/
public void testAcknowledgeAfterMessageReceivingAfterFailover() throws Exception
{
init(Session.CLIENT_ACKNOWLEDGE, true);
produceMessages();
// consume messages but do not acknowledge them
consumeMessages();
causeFailure();
assertFailoverException();
resendMessagesIfNecessary();
// consume again on dirty session
Message lastMessage = consumeMessages();
try
{
// an implicit recover performed when acknowledge throws an exception due to failover
lastMessage.acknowledge();
fail("JMSException should be thrown");
}
catch (JMSException t)
{
// TODO: assert error code and/or expected exception type
}
// tests whether receiving and acknowledgment is working on a clean session
lastMessage = consumeMessages();
lastMessage.acknowledge();
}
/**
* Tests that call to {@link Message#acknowledge()} after failover throws an exception in asynchronous consumer
* and we can consume messages after acknowledge.
* <p>
* Messages are delivered to a {@code FailoverTestMessageListener}; the last
* message received in each round is the one that gets acknowledged.
*/
public void testAcknowledgeAfterFailoverForAsynchronousConsumer() throws Exception
{
// the connection is started only after the listener is registered
init(Session.CLIENT_ACKNOWLEDGE, false);
FailoverTestMessageListener ml = new FailoverTestMessageListener();
_consumer.setMessageListener(ml);
_connection.start();
produceMessages();
// wait for message receiving
ml.awaitForEnd();
assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
// assert messages
int counter = 0;
Message currentMessage = null;
for (Message message : ml.getReceivedMessages())
{
assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
// remember the most recently checked message; after the loop it is the last one received
currentMessage = message;
}
// reset the listener before the second round of deliveries
ml.reset();
causeFailure();
assertFailoverException();
try
{
currentMessage.acknowledge();
fail("JMSException should be thrown!");
}
catch (JMSException e)
{
// TODO: assert error code and/or expected exception type
}
resendMessagesIfNecessary();
// wait for message receiving
ml.awaitForEnd();
assertEquals("Received unexpected number of messages!", _messageNumber, ml.getMessageCounter());
// assert messages
counter = 0;
for (Message message : ml.getReceivedMessages())
{
assertReceivedMessage(message, TEST_MESSAGE_FORMAT, counter++);
currentMessage = message;
}
// acknowledge again. It should be successful
currentMessage.acknowledge();
}
/**
* Test whether {@link Session#recover()} works as expected after failover
* in {@link Session#AUTO_ACKNOWLEDGE} mode.
* <p>
* Only the first message is received before the failure; the full batch is
* consumed after recover.
*/
public void testRecoverAfterFailoverInAutoAcknowledgeMode() throws Exception
{
init(Session.AUTO_ACKNOWLEDGE, true);
produceMessages();
// receive first message in order to start a dispatcher thread
Message receivedMessage = _consumer.receive(1000l);
assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
causeFailure();
assertFailoverException();
_consumerSession.recover();
resendMessagesIfNecessary();
// tests whether receiving is working after recover
consumeMessages();
}
/**
* Runs {@link #sessionCloseAfterFailoverImpl(int)} with {@link Session#CLIENT_ACKNOWLEDGE} sessions.
*/
public void testClientAcknowledgedSessionCloseAfterFailover() throws Exception
{
sessionCloseAfterFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
}
/**
* Runs {@link #sessionCloseAfterFailoverImpl(int)} with {@link Session#SESSION_TRANSACTED} sessions.
*/
public void testTransactedSessionCloseAfterFailover() throws Exception
{
sessionCloseAfterFailoverImpl(Session.SESSION_TRANSACTED);
}
/**
* Runs {@link #sessionCloseAfterFailoverImpl(int)} with {@link Session#AUTO_ACKNOWLEDGE} sessions.
*/
public void testAutoAcknowledgedSessionCloseAfterFailover() throws Exception
{
sessionCloseAfterFailoverImpl(Session.AUTO_ACKNOWLEDGE);
}
/**
* Runs {@link #publishWhileFailingOver(int)} with {@link Session#AUTO_ACKNOWLEDGE} sessions.
*/
public void testPublishAutoAcknowledgedWhileFailover() throws Exception
{
publishWhileFailingOver(Session.AUTO_ACKNOWLEDGE);
}
/**
* Runs {@link #publishWhileFailingOver(int)} with {@link Session#CLIENT_ACKNOWLEDGE} sessions
* and then acknowledges the message it received.
*/
public void testPublishClientAcknowledgedWhileFailover() throws Exception
{
Message receivedMessage = publishWhileFailingOver(Session.CLIENT_ACKNOWLEDGE);
receivedMessage.acknowledge();
}
/**
* Runs {@link #publishWhileFailingOver(int)} with {@link Session#SESSION_TRANSACTED} sessions
* and then commits the consumer session.
*/
public void testPublishTransactedAcknowledgedWhileFailover() throws Exception
{
publishWhileFailingOver(Session.SESSION_TRANSACTED);
_consumerSession.commit();
}
/**
* Runs {@link #publishWithFailoverMutex(int)} with {@link Session#AUTO_ACKNOWLEDGE} sessions.
*/
public void testPublishAutoAcknowledgedWithFailoverMutex() throws Exception
{
publishWithFailoverMutex(Session.AUTO_ACKNOWLEDGE);
}
/**
* Runs {@link #publishWithFailoverMutex(int)} with {@link Session#CLIENT_ACKNOWLEDGE} sessions.
*/
public void testPublishClientAcknowledgedWithFailoverMutex() throws Exception
{
publishWithFailoverMutex(Session.CLIENT_ACKNOWLEDGE);
}
/**
* Runs {@link #publishWithFailoverMutex(int)} with {@link Session#SESSION_TRANSACTED} sessions.
*/
public void testPublishTransactedAcknowledgedWithFailoverMutex() throws Exception
{
publishWithFailoverMutex(Session.SESSION_TRANSACTED);
}
/**
* Delegates to {@code sessionCloseWhileFailoverImpl} with {@link Session#CLIENT_ACKNOWLEDGE}.
*/
public void testClientAcknowledgedSessionCloseWhileFailover() throws Exception
{
sessionCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
}
/**
* Delegates to {@code sessionCloseWhileFailoverImpl} with {@link Session#SESSION_TRANSACTED}.
*/
public void testTransactedSessionCloseWhileFailover() throws Exception
{
sessionCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
}
/**
* Delegates to {@code sessionCloseWhileFailoverImpl} with {@link Session#AUTO_ACKNOWLEDGE}.
*/
public void testAutoAcknowledgedSessionCloseWhileFailover() throws Exception
{
sessionCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
}
/**
* Delegates to {@code browserCloseWhileFailoverImpl} with {@link Session#CLIENT_ACKNOWLEDGE}.
*/
public void testClientAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
{
browserCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
}
/**
* Delegates to {@code browserCloseWhileFailoverImpl} with {@link Session#SESSION_TRANSACTED}.
*/
public void testTransactedQueueBrowserCloseWhileFailover() throws Exception
{
browserCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
}
/**
* Delegates to {@code browserCloseWhileFailoverImpl} with {@link Session#AUTO_ACKNOWLEDGE}.
*/
public void testAutoAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
{
browserCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
}
/**
* Runs {@link #doFailoverWhilstPublishingInFlight(boolean)} with the default broker being killed.
*/
public void testKillBrokerFailoverWhilstPublishingInFlight() throws Exception
{
doFailoverWhilstPublishingInFlight(true);
}
/**
* Runs {@link #doFailoverWhilstPublishingInFlight(boolean)} with the default broker being stopped.
*/
public void testStopBrokerFailoverWhilstPublishingInFlight() throws Exception
{
doFailoverWhilstPublishingInFlight(false);
}
/**
 * Publishes messages one transaction at a time from a background thread, brings the
 * default broker down once more than half of them have been committed, and verifies
 * that the producer finishes all of its work without an unexpected exception.
 * <p>
 * The producer retries a message whenever a {@link JMSException} other than
 * {@link javax.jms.IllegalStateException} is thrown; any other exception ends the
 * producer thread and is reported as a test failure.
 *
 * @param hardKill {@code true} to kill the default broker, {@code false} to stop it
 * @throws Exception on any unexpected failure
 */
private void doFailoverWhilstPublishingInFlight(boolean hardKill) throws Exception
{
    init(Session.SESSION_TRANSACTED, false);
    final int numberOfMessages = 200;
    final CountDownLatch halfWay = new CountDownLatch(1);
    final CountDownLatch allDone = new CountDownLatch(1);
    // released when the producer thread terminates, whether it succeeded or failed
    final CountDownLatch producerFinished = new CountDownLatch(1);
    final AtomicReference<Exception> exception = new AtomicReference<>();
    Runnable producerRunnable = new Runnable()
    {
        @Override
        public void run()
        {
            Thread.currentThread().setName("ProducingThread");
            try
            {
                for (int i = 0; i < numberOfMessages; i++)
                {
                    boolean success = false;
                    while (!success)
                    {
                        try
                        {
                            Message message = _producerSession.createMessage();
                            message.setIntProperty("msgNum", i);
                            _producer.send(message);
                            _producerSession.commit();
                            success = true;
                        }
                        catch (javax.jms.IllegalStateException e)
                        {
                            // fail - failover should not leave a JMS object in an illegal state
                            throw e;
                        }
                        catch (JMSException e)
                        {
                            // OK we will be failing over
                            _logger.debug("Got JMS exception, probably just failing over", e);
                        }
                    }
                    if (i > numberOfMessages / 2 && halfWay.getCount() == 1)
                    {
                        halfWay.countDown();
                    }
                }
                allDone.countDown();
            }
            catch (Exception e)
            {
                exception.set(e);
            }
            finally
            {
                producerFinished.countDown();
            }
        }
    };
    Thread producerThread = new Thread(producerRunnable);
    producerThread.start();
    assertTrue("Didn't get to half way within timeout", halfWay.await(30000, TimeUnit.MILLISECONDS));
    if (hardKill)
    {
        _logger.debug("Killing the Broker");
        killDefaultBroker();
    }
    else
    {
        _logger.debug("Stopping the Broker");
        stopDefaultBroker();
    }
    // Wait for the producer to terminate BEFORE inspecting its exception: a failure raised
    // after the broker went down would otherwise only show up as a "not completed" timeout.
    boolean producerFinishedInTime = producerFinished.await(30000, TimeUnit.MILLISECONDS);
    Exception producerException = exception.get();
    if (producerException != null)
    {
        _logger.error("Unexpected exception from producer thread", producerException);
    }
    assertNull("Producer thread should not have got an exception", producerException);
    assertTrue("All producing work was not completed", producerFinishedInTime && allDone.getCount() == 0);
    producerThread.join(30000);
    // Extra work to prove the session still okay
    assertNotNull(_producerSession.createTemporaryQueue());
}
/**
 * Sends a single message after failover has been reported as started and verifies
 * that the message is received.
 * <p>
 * The producer session is committed when it is transacted. Acknowledging or committing
 * the received message is left to the caller.
 *
 * @param autoAcknowledge acknowledge mode used for both the consumer and producer sessions
 * @return the message received from the test destination
 * @throws JMSException on JMS failure
 * @throws InterruptedException if interrupted while waiting for failover to start
 */
private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException
{
    setDelayedFailoverPolicy(5);
    init(autoAcknowledge, true);
    String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
    Message message = _producerSession.createTextMessage(text);
    failDefaultBroker();
    if (!_failoverStarted.await(5, TimeUnit.SECONDS))
    {
        fail("Did not receive notification failover had started");
    }
    _producer.send(message);
    if (_producerSession.getTransacted())
    {
        _producerSession.commit();
    }
    Message receivedMessage = _consumer.receive(1000L);
    assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
    return receivedMessage;
}
/**
 * Verifies that sending fails with a {@link JMSException} while the connection is
 * failing over and the test thread holds the connection's failover mutex.
 * <p>
 * The wait for the connection to report that it is failing over is bounded, so a
 * failover that never starts fails the test instead of hanging it.
 *
 * @param autoAcknowledge acknowledge mode used for both the consumer and producer sessions
 * @throws JMSException on unexpected JMS failure
 * @throws InterruptedException if interrupted while waiting
 */
private void publishWithFailoverMutex(int autoAcknowledge) throws JMSException, InterruptedException
{
    setDelayedFailoverPolicy(5);
    init(autoAcknowledge, true);
    String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
    Message message = _producerSession.createTextMessage(text);
    AMQConnection connection = (AMQConnection) _connection;
    // holding failover mutex should prevent the failover from
    // proceeding before we try to send the message
    synchronized (connection.getFailoverMutex())
    {
        failDefaultBroker();
        // wait to make sure that connection is lost, but never indefinitely
        final long deadline = System.currentTimeMillis() + 30000L;
        while (!connection.isFailingOver())
        {
            if (System.currentTimeMillis() > deadline)
            {
                fail("Connection did not start failing over within 30000 ms");
            }
            Thread.sleep(25L);
        }
        try
        {
            _producer.send(message);
            fail("Sending should fail because connection was lost and failover has not yet completed");
        }
        catch (JMSException expected)
        {
            // JMSException is expected
        }
    }
    // wait for failover completion, thus ensuring it actually
    // got started, before allowing the test to tear down
    awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
}
/**
* This test only tests 0-8/0-9/0-9-1 failover timeout.
* <p>
* Sets "qpid.failover_method_timeout" to 10000 and holds the failover mutex
* for 11000 ms while the default broker is killed and restarted, then
* expects that failover does not complete and the connection is closed.
*/
public void testFailoverHandlerTimeoutExpires() throws Exception
{
// the connection from setUp() is not used; a dedicated connection is created below
_connection.close();
setTestSystemProperty("qpid.failover_method_timeout", "10000");
AMQConnection connection = null;
try
{
connection = createConnectionWithFailover();
// holding failover mutex should prevent the failover from proceeding
synchronized(connection.getFailoverMutex())
{
killDefaultBroker();
startDefaultBroker();
// sleep interval exceeds failover timeout interval
Thread.sleep(11000l);
}
// allows the failover thread to proceed
Thread.yield();
assertFalse("Unexpected failover", _failoverComplete.await(2000l, TimeUnit.MILLISECONDS));
assertTrue("Failover should not succeed due to timeout", connection.isClosed());
}
finally
{
if (connection != null)
{
connection.close();
}
}
}
/**
* Counterpart of {@code testFailoverHandlerTimeoutExpires}: the failover mutex
* is held only while the default broker is killed and restarted, with no
* additional sleep, and failover is expected to complete with the connection
* still open.
*/
public void testFailoverHandlerTimeoutReconnected() throws Exception
{
// the connection from setUp() is not used; a dedicated connection is created below
_connection.close();
setTestSystemProperty("qpid.failover_method_timeout", "10000");
AMQConnection connection = null;
try
{
connection = createConnectionWithFailover();
// holding failover mutex should prevent the failover from proceeding
synchronized(connection.getFailoverMutex())
{
killDefaultBroker();
startDefaultBroker();
}
// allows the failover thread to proceed
Thread.yield();
awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
assertFalse("Failover should restore connectivity", connection.isClosed());
}
finally
{
if (connection != null)
{
connection.close();
}
}
}
/**
* Tests that the producer flow control flag is reset when failover occurs while
* the producers are being blocked by the broker.
* <p>
* Uses Apache Qpid Broker for Java specific queue configuration to enable
* producer flow control: the queue is created with capacity of three and
* resume capacity of two default-sized messages.
*/
public void testFlowControlFlagResetOnFailover() throws Exception
{
// we do not need the connection failing to second broker
_connection.close();
// make sure that failover timeout is bigger than flow control timeout
setTestSystemProperty("qpid.failover_method_timeout", "60000");
setTestSystemProperty("qpid.flow_control_wait_failure", "10000");
AMQConnection connection = null;
try
{
connection = createConnectionWithFailover(Collections.singletonMap(ConnectionURL.OPTIONS_SYNC_PUBLISH, "all"));
final Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
final Queue queue = createAndBindQueueWithFlowControlEnabled(producerSession, getTestQueueName(), DEFAULT_MESSAGE_SIZE * 3, DEFAULT_MESSAGE_SIZE * 2);
// number of messages successfully sent and committed by the background producer
final AtomicInteger counter = new AtomicInteger();
// try to send 5 messages (should block after 4)
new Thread(new Runnable()
{
@Override
public void run()
{
try
{
MessageProducer producer = producerSession.createProducer(queue);
for (int i=0; i < 5; i++)
{
Message next = createNextMessage(producerSession, i);
producer.send(next);
producerSession.commit();
counter.incrementAndGet();
}
}
catch(Exception e)
{
// ignore
// NOTE(review): failures of the background producer are swallowed here
// without logging; the thread is never joined by the test
}
}
}).start();
long limit= 30000l;
long start = System.currentTimeMillis();
// wait until session is blocked
while(!((AMQSession<?,?>)producerSession).isFlowBlocked() && System.currentTimeMillis() - start < limit)
{
Thread.sleep(100l);
}
assertTrue("Flow is not blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked());
final int currentCounter = counter.get();
assertTrue("Unexpected number of sent messages:" + currentCounter, currentCounter >=3);
killDefaultBroker();
startDefaultBroker();
// allows the failover thread to proceed
Thread.yield();
awaitForFailoverCompletion(60000l);
assertFalse("Flow is blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked());
}
finally
{
if (connection != null)
{
connection.close();
}
}
}
/**
 * Verifies that messages are delivered and can be committed after a failover that
 * happened while the connection was stopped.
 * <p>
 * The first message delivered to the listener stops the connection from the
 * dispatcher thread and then fails the default broker. Once failover has completed
 * the messages are re-sent if necessary, the connection is restarted, and every
 * subsequently delivered message is committed and counted.
 */
public void testFailoverWhenConnectionStopped() throws Exception
{
    init(Session.SESSION_TRANSACTED, true);
    produceMessages();
    _producerSession.commit();
    final CountDownLatch stopFlag = new CountDownLatch(1);
    final AtomicReference<Exception> exception = new AtomicReference<>();
    final CountDownLatch expectedMessageLatch = new CountDownLatch(_messageNumber);
    final AtomicInteger counter = new AtomicInteger();
    _consumer.setMessageListener(new MessageListener()
    {
        @Override
        public void onMessage(Message message)
        {
            if (stopFlag.getCount() == 1)
            {
                // first delivery: stop the connection, then bring the broker down
                try
                {
                    _LOGGER.debug("Stopping connection from dispatcher thread");
                    _connection.stop();
                    _LOGGER.debug("Connection stopped from dispatcher thread");
                }
                catch (Exception e)
                {
                    exception.set(e);
                }
                finally
                {
                    stopFlag.countDown();
                    failDefaultBroker();
                }
            }
            else
            {
                // later deliveries: commit each message and count the successful commits
                try
                {
                    _consumerSession.commit();
                    counter.incrementAndGet();
                    expectedMessageLatch.countDown();
                }
                catch (Exception e)
                {
                    // NOTE(review): exceptions recorded here are not asserted on later in the test
                    exception.set(e);
                }
            }
        }
    });
    boolean stopResult = stopFlag.await(2000, TimeUnit.MILLISECONDS);
    assertTrue("Connection was not stopped" + (exception.get() == null ? "." : ":" + exception.get().getMessage()),
               stopResult);
    assertNull("Unexpected exception on stop :" + exception.get(), exception.get());
    // wait for failover to complete
    awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
    assertFailoverException();
    resendMessagesIfNecessary();
    _producerSession.commit();
    _connection.start();
    assertTrue("Not all messages were delivered. Remaining message number " + expectedMessageLatch.getCount(), expectedMessageLatch.await(11000, TimeUnit.MILLISECONDS));
    // give any surplus deliveries a chance to arrive before checking the final count
    Thread.sleep(500L);
    assertEquals("Unexpected messages received ", _messageNumber, counter.get());
    _connection.close();
}
/**
* Closes a connection while its failover is in flight and verifies that the
* failover policy still reports failover as allowed afterwards.
* <p>
* A dedicated {@link ConnectionListener} signals the start of failover through
* a latch and records its completion.
*/
public void testConnectionCloseInterruptsFailover() throws Exception
{
// the connection from setUp() is not used; a dedicated connection is created below
_connection.close();
// NOTE(review): failoverCompleted is set by the listener but never asserted on in this test
final AtomicBoolean failoverCompleted = new AtomicBoolean(false);
final CountDownLatch failoverBegun = new CountDownLatch(1);
AMQConnection connection = createConnectionWithFailover();
connection.setConnectionListener(new ConnectionListener()
{
@Override
public void bytesSent(final long count)
{
}
@Override
public void bytesReceived(final long count)
{
}
@Override
public boolean preFailover(final boolean redirect)
{
failoverBegun.countDown();
_LOGGER.info("Failover started");
return true;
}
@Override
public boolean preResubscribe()
{
return true;
}
@Override
public void failoverComplete()
{
failoverCompleted.set(true);
}
});
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
assertNotNull("Session should be created", session);
killDefaultBroker();
boolean failingOver = failoverBegun.await(5000, TimeUnit.MILLISECONDS);
assertTrue("Failover did not begin with a reasonable time", failingOver);
// Failover will now be in flight
connection.close();
assertTrue("Failover policy is unexpectedly exhausted", connection.getFailoverPolicy().failoverAllowed());
}
/**
 * Creates a queue carrying the "x-qpid-capacity" and "x-qpid-flow-resume-capacity"
 * arguments, then declares and binds it through the given session.
 *
 * @param session        session used to create, declare and bind the queue; must be an {@link AMQSession}
 * @param queueName      name of the queue, also used as the last two segments of the binding URL
 * @param capacity       value of the "x-qpid-capacity" queue argument
 * @param resumeCapacity value of the "x-qpid-flow-resume-capacity" queue argument
 * @return the bound queue destination
 * @throws Exception if the queue cannot be created, declared or bound
 */
private Queue createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception
{
    final AMQSession<?, ?> amqSession = (AMQSession<?, ?>) session;
    final boolean durable = true;
    final boolean autoDelete = false;

    final Map<String, Object> flowControlArguments = new HashMap<>();
    flowControlArguments.put("x-qpid-capacity", capacity);
    flowControlArguments.put("x-qpid-flow-resume-capacity", resumeCapacity);
    amqSession.createQueue(queueName, false, true, false, flowControlArguments);

    final String bindingUrl = "direct://amq.direct/" + queueName + "/" + queueName + "?durable='" + durable
                              + "'&autodelete='" + autoDelete + "'";
    final Queue boundQueue = session.createQueue(bindingUrl);
    amqSession.declareAndBind((AMQDestination) boundQueue);
    return boundQueue;
}
/**
* Creates a failover-enabled connection without any extra connection options.
*
* @return result of {@link #createConnectionWithFailover(Map)} called with {@code null}
*/
private AMQConnection createConnectionWithFailover() throws NamingException, JMSException, URLSyntaxException
{
return createConnectionWithFailover(null);
}
/**
 * Creates a connection to the first broker of the "default" connection factory using a
 * "singlebroker" failover URL (retries 200, connect delay 1000, cycle count 2), and
 * registers this test as the connection listener.
 *
 * @param connectionOptions extra options appended to the connection URL as
 *                          {@code &key='value'}; may be {@code null}
 * @return the newly created connection
 */
private AMQConnection createConnectionWithFailover(Map<String,String> connectionOptions) throws NamingException, JMSException, URLSyntaxException
{
    final BrokerDetails defaultBroker =
            ((AMQConnectionFactory) getConnectionFactory("default")).getConnectionURL().getBrokerDetails(0);
    final String urlTemplate = "amqp://username:password@clientid/test?brokerlist=" +
            "'tcp://%s:%s?retries='%s'&connectdelay='%s''&failover='singlebroker?cyclecount='%s''";
    final StringBuilder url = new StringBuilder(
            String.format(urlTemplate, defaultBroker.getHost(), defaultBroker.getPort(), "200", "1000", "2"));
    if (connectionOptions != null)
    {
        for (Map.Entry<String,String> entry : connectionOptions.entrySet())
        {
            url.append("&").append(entry.getKey()).append("='").append(entry.getValue()).append("'");
        }
    }
    final ConnectionFactory factory = new AMQConnectionFactory(url.toString());
    final AMQConnection failoverConnection = (AMQConnection) factory.createConnection("admin", "admin");
    failoverConnection.setConnectionListener(this);
    return failoverConnection;
}
/**
 * Checks that {@link Session#close()} succeeds after failover for a consumer session
 * of the given acknowledge mode.
 * <p>
 * One message is received before the failure and is deliberately left uncommitted
 * (transacted) or unacknowledged (CLIENT_ACK).
 *
 * @param acknowledgeMode session acknowledge mode
 * @throws JMSException on unexpected JMS failure
 */
private void sessionCloseAfterFailoverImpl(int acknowledgeMode) throws JMSException
{
    final boolean transacted = (Session.SESSION_TRANSACTED == acknowledgeMode);

    init(acknowledgeMode, true);
    produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
    if (transacted)
    {
        _producerSession.commit();
    }

    // receive the first message only; it stays uncommitted/unacknowledged on purpose
    final Message firstMessage = _consumer.receive(1000L);
    assertReceivedMessage(firstMessage, TEST_MESSAGE_FORMAT, 0);

    causeFailure();
    assertFailoverException();

    // closing must not throw; for transacted/client_ack sessions the
    // outstanding work is expected to be rolled back automatically
    _consumerSession.close();
}
/**
 * A helper method to instantiate the consumer and producer sessions, the test
 * destination, the consumer and the producer.
 * <p>
 * The consumer session and consumer are created first; the connection is optionally
 * started before the producer session and producer are created.
 *
 * @param acknowledgeMode
 *            acknowledge mode; sessions are transacted when it equals
 *            {@link Session#SESSION_TRANSACTED}
 * @param startConnection
 *            indicates whether connection should be started
 * @throws JMSException on failure to create any of the JMS objects
 */
private void init(int acknowledgeMode, boolean startConnection) throws JMSException
{
    final boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED;
    _consumerSession = _connection.createSession(isTransacted, acknowledgeMode);
    _destination = createDestination(_consumerSession);
    _consumer = _consumerSession.createConsumer(_destination);
    if (startConnection)
    {
        _connection.start();
    }
    _producerSession = _connection.createSession(isTransacted, acknowledgeMode);
    _producer = _producerSession.createProducer(_destination);
}
/**
 * Creates the test destination: a queue named after the test queue name with the
 * current time in milliseconds appended.
 *
 * @param session session used to create the queue
 * @return the created queue
 * @throws JMSException if the queue cannot be created
 */
protected Destination createDestination(Session session) throws JMSException
{
    final String queueName = getTestQueueName() + "_" + System.currentTimeMillis();
    return session.createQueue(queueName);
}
/**
 * Re-publishes the test messages after failover unless running against a clustered broker.
 * <p>
 * For a synchronous consumer (no message listener set) it first asserts that no
 * message is available on the queue.
 *
 * @throws JMSException on JMS failure
 */
private void resendMessagesIfNecessary() throws JMSException
{
    if (CLUSTERED)
    {
        return;
    }

    final boolean synchronousConsumer = _consumer.getMessageListener() == null;
    if (synchronousConsumer)
    {
        // a new non-clustered broker must not hold any of the earlier messages
        final Message unexpected = _consumer.receive(100L);
        assertNull("Received a message after failover with non-clustered broker!", unexpected);
    }

    // publish again through a separate, self-committing producer
    produceMessages(true);
}
/**
 * Sends the default number of test messages with the default text format
 * to the test queue using the shared test producer.
 *
 * @throws JMSException
 *             if sending fails
 */
private void produceMessages() throws JMSException
{
    produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
}
/**
 * Sends the default number of test messages with the default text format
 * to the test queue.
 *
 * @param seperateProducer
 *            {@code true} to send through a newly created producer,
 *            {@code false} to use the shared test producer
 * @throws JMSException
 *             if sending fails
 */
private void produceMessages(boolean seperateProducer) throws JMSException
{
produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, seperateProducer);
}
/**
 * Receives the default number of test messages and asserts that each one
 * matches the default text format.
 *
 * @return the last message consumed
 * @throws JMSException
 *             if receiving fails
 */
private Message consumeMessages() throws JMSException
{
    final Message lastMessage = consumeMessages(TEST_MESSAGE_FORMAT, _messageNumber);
    return lastMessage;
}
/**
 * Produces the given number of text messages whose content is built from
 * the given pattern and the zero-based message index.
 * <p>
 * With {@code standaloneProducer} set, the messages are sent and committed
 * through a dedicated transacted session which is closed before this
 * method returns, so that repeated calls do not accumulate sessions on the
 * test connection. Otherwise the shared test producer and its session are
 * used and nothing is committed here.
 *
 * @param messagePattern message content pattern, formatted with the message index
 * @param messageNumber number of messages to send
 * @param standaloneProducer {@code true} to use a new producer on its own
 *            transacted session, {@code false} to use the existing producer
 * @throws JMSException if creating the session/producer, sending,
 *             committing or closing fails
 */
private void produceMessages(String messagePattern, int messageNumber, boolean standaloneProducer) throws JMSException
{
    final Session producerSession = standaloneProducer
            ? _connection.createSession(true, Session.SESSION_TRANSACTED)
            : _producerSession;
    try
    {
        final MessageProducer producer = standaloneProducer
                ? producerSession.createProducer(_destination)
                : _producer;
        for (int i = 0; i < messageNumber; i++)
        {
            String text = MessageFormat.format(messagePattern, i);
            Message message = producerSession.createTextMessage(text);
            producer.send(message);
            _LOGGER.debug("Test message number " + i + " produced with text = " + text + ", and JMSMessageID = " + message.getJMSMessageID());
        }
        if (standaloneProducer)
        {
            producerSession.commit();
        }
    }
    finally
    {
        if (standaloneProducer)
        {
            // the dedicated session is not referenced outside this method,
            // so it has to be released here
            producerSession.close();
        }
    }
}
/**
 * Receives the given number of messages from the test consumer, waiting up
 * to one second for each, and asserts that the text of every message
 * matches the given pattern formatted with its zero-based index.
 *
 * @param messagePattern
 *            message content pattern
 * @param messageNumber
 *            number of messages to receive
 * @return the last message consumed, or {@code null} when
 *         {@code messageNumber} is not positive
 * @throws JMSException
 *             if receiving fails
 */
private Message consumeMessages(String messagePattern, int messageNumber) throws JMSException
{
    Message lastMessage = null;
    int index = 0;
    while (index < messageNumber)
    {
        lastMessage = _consumer.receive(1000L);
        assertReceivedMessage(lastMessage, messagePattern, index);
        index++;
    }
    return lastMessage;
}
/**
 * Asserts that the given message was received, is a {@link TextMessage}
 * and carries the text produced by formatting the pattern with the
 * message index.
 *
 * @param receivedMessage
 *            received message; the assertion fails if it is {@code null}
 * @param messagePattern
 *            message content pattern
 * @param messageIndex
 *            index of the expected message
 * @throws JMSException
 *             if the text or the JMS message ID cannot be read from the
 *             message
 */
private void assertReceivedMessage(Message receivedMessage, String messagePattern, int messageIndex) throws JMSException
{
    assertNotNull("Expected message [" + messageIndex + "] is not received!", receivedMessage);
    assertTrue("Failure to receive message [" + messageIndex + "], expected TextMessage but received "
            + receivedMessage, receivedMessage instanceof TextMessage);

    final String expectedText = MessageFormat.format(messagePattern, messageIndex);
    // a JMSException is deliberately left to propagate so that the failure
    // is reported together with its original stack trace
    final String receivedText = ((TextMessage) receivedMessage).getText();

    _LOGGER.debug("Test message number " + messageIndex + " consumed with text = " + receivedText + ", and JMSMessageID = " + receivedMessage.getJMSMessageID());
    assertEquals("Failover is broken! Expected [" + expectedText + "] but got [" + receivedText + "]",
            expectedText, receivedText);
}
/**
 * Triggers failover and waits for the connection to be re-established,
 * allowing twice the default failover time.
 */
private void causeFailure()
{
    final long timeout = DEFAULT_FAILOVER_TIME * 2;
    causeFailure(timeout);
}
/**
 * Causes failover by stopping the broker and waits until the connection
 * is re-established within the given time interval.
 *
 * @param delay
 *            time interval, in milliseconds, to wait for the connection
 *            re-establishment
 */
private void causeFailure(long delay)
{
failDefaultBroker();
awaitForFailoverCompletion(delay);
}
/**
 * Waits for the failover-complete latch and fails the test if it is not
 * released in time or if the waiting thread is interrupted.
 *
 * @param delay
 *            maximum time to wait, in milliseconds
 */
private void awaitForFailoverCompletion(long delay)
{
    _logger.info("Awaiting {} ms for failover completion..", delay);
    try
    {
        assertTrue("Failover did not complete", _failoverComplete.await(delay, TimeUnit.MILLISECONDS));
    }
    catch (InterruptedException e)
    {
        // restore the interrupt status so that code running after this
        // failure can still observe the interruption
        Thread.currentThread().interrupt();
        fail("Test was interrupted:" + e.getMessage());
    }
}
/**
 * Placeholder for asserting that a failover exception was reported.
 * Currently performs no checks.
 */
private void assertFailoverException()
{
// TODO: assert exception is received (once implemented)
// along with error code and/or expected exception type
}
/**
 * Stores the reported exception in {@code _exceptionListenerException},
 * replacing any previously stored one.
 *
 * @param e
 *            exception passed to this listener
 */
@Override
public void onException(JMSException e)
{
_exceptionListenerException = e;
}
/**
* Causes 1 second delay before reconnect in order to test whether JMS
* methods block while failover is in progress
*/
private static class DelayingFailoverPolicy extends FailoverPolicy
{
private CountDownLatch _suspendLatch;
private long _delay;
public DelayingFailoverPolicy(AMQConnection connection, long delay)
{
super(connection.getConnectionURL(), connection);
_suspendLatch = new CountDownLatch(1);
_delay = delay;
}
public void attainedConnection()
{
try
{
_suspendLatch.await(_delay, TimeUnit.SECONDS);
}
catch (InterruptedException e)
{
// continue
}
super.attainedConnection();
}
}
/**
 * Message listener which collects every delivered message, counts the
 * deliveries and releases a latch each time the count reaches a multiple
 * of the enclosing test's {@code _messageNumber}.
 */
private class FailoverTestMessageListener implements MessageListener
{
    // number of messages delivered since construction or the last reset
    private AtomicInteger _counter = new AtomicInteger();
    // messages delivered since construction or the last reset
    private List<Message> _receivedMessage = new ArrayList<Message>();
    // released when the delivery count reaches a multiple of _messageNumber
    private volatile CountDownLatch _endLatch;

    public FailoverTestMessageListener() throws JMSException
    {
        _endLatch = new CountDownLatch(1);
    }

    /**
     * Records the message and releases the end latch when the delivery
     * count becomes a multiple of {@code _messageNumber}.
     */
    @Override
    public void onMessage(Message message)
    {
        _receivedMessage.add(message);
        final int delivered = _counter.incrementAndGet();
        final boolean batchComplete = (delivered % _messageNumber) == 0;
        if (batchComplete)
        {
            _endLatch.countDown();
        }
    }

    /**
     * Discards the collected messages, installs a fresh end latch and
     * resets the delivery count to zero.
     */
    public void reset()
    {
        _receivedMessage.clear();
        _endLatch = new CountDownLatch(1);
        _counter.set(0);
    }

    /**
     * @return the list in which delivered messages are collected (the
     *         internal list itself, not a copy)
     */
    public List<Message> getReceivedMessages()
    {
        return _receivedMessage;
    }

    /**
     * Waits up to {@code _messageNumber} seconds for the end latch.
     *
     * @return boxed {@code true} if the latch was released in time,
     *         boxed {@code false} otherwise
     * @throws InterruptedException
     *             if the waiting thread is interrupted
     */
    public Object awaitForEnd() throws InterruptedException
    {
        final long timeoutSeconds = (long) _messageNumber;
        return _endLatch.await(timeoutSeconds, TimeUnit.SECONDS);
    }

    /**
     * @return the current delivery count
     */
    public int getMessageCounter()
    {
        return _counter.get();
    }
}
/**
 * Tests {@link Session#close()} for session with given acknowledge mode
 * to ensure that it blocks until failover implementation restores connection.
 *
 * @param acknowledgeMode session acknowledge mode
 * @throws Exception if a JMS call fails or the test thread is interrupted
 *             while waiting for failover
 */
private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws Exception
{
    initDelayedFailover(acknowledgeMode);

    // intentionally receive message but not commit or acknowledge it in
    // case of transacted or CLIENT_ACK session
    Message receivedMessage = _consumer.receive(1000L);
    assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);

    failDefaultBroker();

    // wait until failover is started; closing the session before that
    // point would not exercise close-during-failover at all
    assertTrue("Failover has not started", _failoverStarted.await(5, TimeUnit.SECONDS));

    // test whether session#close blocks while failover is in progress
    _consumerSession.close();

    assertTrue("Failover has not completed yet but session was closed", _failoverComplete.await(5, TimeUnit.SECONDS));
    assertFailoverException();
}
/**
 * Prepares a {@link QueueBrowser} on the test queue: initialises the test
 * sessions without starting the connection, closes the consumer, starts
 * the connection and publishes the default test messages for browsing.
 * <p>
 * For a transacted session the published messages are committed; for any
 * other acknowledge mode the producer session is synced instead.
 *
 * @param acknowledgeMode session acknowledge mode
 * @return queue browser created on the consumer session
 * @throws JMSException if a JMS call fails
 * @throws QpidException declared for the session sync call
 */
private QueueBrowser prepareQueueBrowser(int acknowledgeMode) throws JMSException, QpidException
{
    init(acknowledgeMode, false);
    _consumer.close();
    _connection.start();

    produceMessages();
    if (acknowledgeMode != Session.SESSION_TRANSACTED)
    {
        ((AMQSession)_producerSession).sync();
    }
    else
    {
        _producerSession.commit();
    }

    return _consumerSession.createBrowser((Queue) _destination);
}
/**
 * Tests {@link QueueBrowser#close()} for session with given acknowledge mode
 * to ensure that it blocks until failover implementation restores connection.
 *
 * @param acknowledgeMode session acknowledge mode
 * @throws Exception if a JMS call fails or the test thread is interrupted
 *             while waiting for failover
 */
private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws Exception
{
    QueueBrowser browser = prepareQueueBrowser(acknowledgeMode);

    @SuppressWarnings("unchecked")
    Enumeration<Message> messages = browser.getEnumeration();
    Message receivedMessage = messages.nextElement();
    assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);

    failDefaultBroker();

    // wait until failover is started; closing the browser before that
    // point would not exercise close-during-failover at all
    assertTrue("Failover has not started", _failoverStarted.await(5, TimeUnit.SECONDS));

    browser.close();

    assertTrue("Failover has not completed yet but browser was closed", _failoverComplete.await(5, TimeUnit.SECONDS));
    assertFailoverException();
}
/**
 * Installs the delaying failover policy with its default delay, then
 * initialises the test sessions on a started connection and publishes the
 * default test messages, committing them for a transacted session.
 *
 * @param acknowledgeMode session acknowledge mode
 * @return the installed failover policy
 * @throws JMSException if a JMS call fails
 */
private DelayingFailoverPolicy initDelayedFailover(int acknowledgeMode) throws JMSException
{
    final DelayingFailoverPolicy delayingPolicy = setDelayedFailoverPolicy();

    init(acknowledgeMode, true);
    produceMessages();

    final boolean transacted = acknowledgeMode == Session.SESSION_TRANSACTED;
    if (transacted)
    {
        _producerSession.commit();
    }
    return delayingPolicy;
}
/**
 * Installs a delaying failover policy with a delay of 2 on the test
 * connection.
 *
 * @return the installed failover policy
 */
private DelayingFailoverPolicy setDelayedFailoverPolicy()
{
    final long defaultDelay = 2;
    return setDelayedFailoverPolicy(defaultDelay);
}
/**
 * Creates a delaying failover policy for the test connection and installs
 * it on that connection.
 *
 * @param delay delay handed to the policy
 * @return the installed failover policy
 */
private DelayingFailoverPolicy setDelayedFailoverPolicy(long delay)
{
    final AMQConnection connection = (AMQConnection) _connection;
    final DelayingFailoverPolicy policy = new DelayingFailoverPolicy(connection, delay);
    connection.setFailoverPolicy(policy);
    return policy;
}
}