QPID-8074: [JMS AMQP 0-x][System Tests] Move subset of failover tests from broker-j to the systest module.
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/AbstractSpawnQpidBrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/AbstractSpawnQpidBrokerAdmin.java
index 2e499f7..598eb6d 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/AbstractSpawnQpidBrokerAdmin.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/AbstractSpawnQpidBrokerAdmin.java
@@ -133,6 +133,12 @@
     }
 
     @Override
+    public void stop()
+    {
+        end(_currentTestClass, _currentTestMethod);
+    }
+
+    @Override
     public InetSocketAddress getBrokerAddress(final PortType portType)
     {
         Integer port = null;
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
index 9899226..efb82a3 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
@@ -35,6 +35,7 @@
     void afterTestMethod(final Class testClass, final Method method);
     void afterTestClass(final Class testClass);
     void restart();
+    void stop();
 
     InetSocketAddress getBrokerAddress(PortType portType);
     boolean supportsPersistence();
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
index 60535d6..5925fe6 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
@@ -119,6 +119,16 @@
     }
 
     @Override
+    public void stop()
+    {
+        if (_virtualHostNodeName == null)
+        {
+            throw new BrokerAdminException("Virtual host is not started");
+        }
+        stopVirtualHost(_virtualHostNodeName);
+    }
+
+    @Override
     protected void setUp(final Class testClass)
     {
         try
@@ -403,7 +413,29 @@
         }
         catch (JMSException e)
         {
-            throw new BrokerAdminException(String.format("Cannot create virtual host '%s'", virtualHostNodeName), e);
+            throw new BrokerAdminException(String.format("Cannot restart virtual host '%s'", virtualHostNodeName), e);
+        }
+    }
+
+    void stopVirtualHost(final String virtualHostNodeName)
+    {
+        try
+        {
+            Connection connection = createManagementConnection();
+            try
+            {
+                connection.start();
+                updateVirtualHostNode(virtualHostNodeName,
+                                      Collections.<String, Object>singletonMap("desiredState", "STOPPED"), connection);
+            }
+            finally
+            {
+                connection.close();
+            }
+        }
+        catch (JMSException e)
+        {
+            throw new BrokerAdminException(String.format("Cannot stop virtual host '%s'", virtualHostNodeName), e);
         }
     }
 
diff --git a/systests/src/test/java/org/apache/qpid/systest/connection/FailoverBehaviourTest.java b/systests/src/test/java/org/apache/qpid/systest/connection/FailoverBehaviourTest.java
new file mode 100644
index 0000000..6520fdb
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/connection/FailoverBehaviourTest.java
@@ -0,0 +1,1425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.systest.connection;
+
+import static junit.framework.TestCase.assertFalse;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
+
+import java.text.MessageFormat;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.TransactionRolledBackException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.QpidException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.FailoverPolicy;
+import org.apache.qpid.systest.core.BrokerAdmin;
+import org.apache.qpid.systest.core.JmsTestBase;
+import org.apache.qpid.systest.core.util.Utils;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * Test suite to test all possible failover corner cases
+ */
+public class FailoverBehaviourTest extends JmsTestBase implements ExceptionListener, ConnectionListener
+{
+    private static final long DEFAULT_FAILOVER_TIME = Long.getLong("FailoverBaseCase.defaultFailoverTime", 10000L);
+    private static final Logger LOGGER = LoggerFactory.getLogger(FailoverBehaviourTest.class);
+
+    private static final String TEST_MESSAGE_FORMAT = "test message {0}";
+
+    /** Default number of messages to send before failover */
+    private static final int DEFAULT_NUMBER_OF_MESSAGES = 40;
+
+    private static final int DEFAULT_MESSAGE_SIZE = 1024;
+
+    /** Actual number of messages to send before failover */
+    private int _messageNumber = Integer.getInteger("profile.failoverMsgCount", DEFAULT_NUMBER_OF_MESSAGES);
+
+    /** Test connection */
+    private Connection _connection;
+    private CountDownLatch _failoverStarted;
+    private CountDownLatch _failoverComplete;
+
+    /**
+     * Consumer session
+     */
+    private Session _consumerSession;
+
+    /**
+     * Test destination
+     */
+    private Destination _destination;
+
+    /**
+     * Consumer
+     */
+    private MessageConsumer _consumer;
+
+    /**
+     * Producer session
+     */
+    private Session _producerSession;
+
+    /**
+     * Producer
+     */
+    private MessageProducer _producer;
+
+    @Before
+    public void setUp() throws Exception
+    {
+        _failoverComplete = new CountDownLatch(1);
+        _failoverStarted = new CountDownLatch(1);
+
+        _connection = getConnection();
+        _connection.setExceptionListener(this);
+        ((AMQConnection) _connection).setConnectionListener(this);
+    }
+
+    /**
+     * Test whether MessageProducer can successfully publish messages after
+     * failover and rollback transaction
+     */
+    @Test
+    public void testMessageProducingAndRollbackAfterFailover() throws Exception
+    {
+        init(Session.SESSION_TRANSACTED, true);
+        produceMessages();
+        getBrokerAdmin().restart();
+
+        // producer should be able to send messages after failover
+        _producer.send(_producerSession.createTextMessage("test message " + _messageNumber));
+
+        // rollback after failover
+        _producerSession.rollback();
+
+        // tests whether sending and committing is working after failover
+        produceMessages();
+        _producerSession.commit();
+
+        // tests whether receiving and committing is working after failover
+        consumeMessages();
+        _consumerSession.commit();
+    }
+
+    /**
+     * Test whether {@link TransactionRolledBackException} is thrown on commit
+     * of dirty transacted session after failover.
+     * <p>
+     * Verifies whether second after failover commit is successful.
+     */
+    @Test
+    public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnProducingMessages() throws Exception
+    {
+        init(Session.SESSION_TRANSACTED, true);
+        produceMessages();
+        getBrokerAdmin().restart();
+
+        // producer should be able to send messages after failover
+        _producer.send(_producerSession.createTextMessage("test message " + _messageNumber));
+
+        try
+        {
+            _producerSession.commit();
+            fail("TransactionRolledBackException is expected on commit after failover with dirty session!");
+        }
+        catch (JMSException t)
+        {
+            assertTrue("Expected TransactionRolledBackException but thrown " + t,
+                    t instanceof TransactionRolledBackException);
+        }
+
+        // simulate process of user replaying the transaction
+        produceMessages("replayed test message {0}", _messageNumber, false);
+
+        // no exception should be thrown
+        _producerSession.commit();
+
+        // only messages sent after rollback should be received
+        consumeMessages("replayed test message {0}", _messageNumber);
+
+        // no exception should be thrown
+        _consumerSession.commit();
+    }
+
+    /**
+     * Tests JMSException is not thrown on commit with a clean session after
+     * failover
+     */
+    @Test
+    public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanProducerSession() throws Exception
+    {
+        init(Session.SESSION_TRANSACTED, true);
+
+        restartAndAwaitFailoverCompletion();
+
+        // should not throw an exception for a clean session
+        _producerSession.commit();
+
+        // tests whether sending and committing is working after failover
+        produceMessages();
+        _producerSession.commit();
+
+        // tests whether receiving and committing is working after failover
+        consumeMessages();
+        _consumerSession.commit();
+    }
+
+    /**
+     * Tests {@link TransactionRolledBackException} is thrown on commit of dirty
+     * transacted session after failover.
+     * <p>
+     * Verifies whether second after failover commit is successful.
+     */
+    @Test
+    public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnMessageReceiving() throws Exception
+    {
+        init(Session.SESSION_TRANSACTED, true);
+        produceMessages();
+        _producerSession.commit();
+
+        // receive messages but do not commit
+        consumeMessages();
+
+        restartAndAwaitFailoverCompletion();
+
+        try
+        {
+            // should throw TransactionRolledBackException
+            _consumerSession.commit();
+            fail("TransactionRolledBackException is expected on commit after failover");
+        }
+        catch (Exception t)
+        {
+            assertTrue("Expected TransactionRolledBackException but thrown " + t,
+                    t instanceof TransactionRolledBackException);
+        }
+
+        // consume messages successfully
+        consumeMessages();
+        _consumerSession.commit();
+    }
+
+    /**
+     * Tests JMSException is not thrown on commit with a clean session after failover
+     */
+    @Test
+    public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanConsumerSession() throws Exception
+    {
+        init(Session.SESSION_TRANSACTED, true);
+        produceMessages();
+        _producerSession.commit();
+
+        consumeMessages();
+        _consumerSession.commit();
+
+        restartAndAwaitFailoverCompletion();
+
+        // should not throw an exception with a clean consumer session
+        _consumerSession.commit();
+    }
+
+    /**
+     * Test that {@link Session#rollback()} does not throw exception after failover
+     * and that we are able to consume messages.
+     */
+    @Test
+    public void testRollbackAfterFailover() throws Exception
+    {
+        init(Session.SESSION_TRANSACTED, true);
+
+        produceMessages();
+        _producerSession.commit();
+
+        consumeMessages();
+
+        restartAndAwaitFailoverCompletion();
+
+        _consumerSession.rollback();
+
+        // tests whether receiving and committing is working after failover
+        consumeMessages();
+        _consumerSession.commit();
+    }
+
+    /**
+     * Test that {@link Session#rollback()} does not throw exception after receiving further messages
+     * after failover, and we can receive published messages after rollback.
+     */
+    @Test
+    public void testRollbackAfterReceivingAfterFailover() throws Exception
+    {
+        init(Session.SESSION_TRANSACTED, true);
+
+        produceMessages();
+        _producerSession.commit();
+
+        consumeMessages();
+        getBrokerAdmin().restart();
+
+        consumeMessages();
+
+        _consumerSession.rollback();
+
+        // tests whether receiving and committing is working after failover
+        consumeMessages();
+        _consumerSession.commit();
+    }
+
+    /**
+     * Test that {@link Session#recover()} does not throw an exception after failover
+     * and that we can consume messages after recover.
+     */
+    @Test
+    public void testRecoverAfterFailover() throws Exception
+    {
+        init(Session.CLIENT_ACKNOWLEDGE, true);
+
+        produceMessages();
+
+        // consume messages but do not acknowledge them
+        consumeMessages();
+
+        restartAndAwaitFailoverCompletion();
+
+        _consumerSession.recover();
+
+        // tests whether receiving and acknowledgment is working after recover
+        Message lastMessage = consumeMessages();
+        lastMessage.acknowledge();
+    }
+
+    /**
+     * Test that receiving more messages after failover and then calling
+     * {@link Session#recover()} does not throw an exception
+     * and that we can consume messages after recover.
+     */
+    @Test
+    public void testRecoverWithConsumedMessagesAfterFailover() throws Exception
+    {
+        init(Session.CLIENT_ACKNOWLEDGE, true);
+
+        produceMessages();
+
+        // consume messages but do not acknowledge them
+        consumeMessages();
+
+        getBrokerAdmin().restart();
+
+        // publishing should work after failover
+
+        // consume messages again on a dirty session
+        consumeMessages();
+
+        // recover should successfully restore session
+        _consumerSession.recover();
+
+        // tests whether receiving and acknowledgment is working after recover
+        Message lastMessage = consumeMessages();
+        lastMessage.acknowledge();
+    }
+
+    /**
+     * Test that first call to {@link Message#acknowledge()} after failover
+     * throws a JMSEXception if session is dirty.
+     */
+    @Test
+    public void testAcknowledgeAfterFailover() throws Exception
+    {
+        LOGGER.debug("KWDEBUG");
+        init(Session.CLIENT_ACKNOWLEDGE, true);
+
+        produceMessages();
+
+        // consume messages but do not acknowledge them
+        Message lastMessage = consumeMessages();
+
+        restartAndAwaitFailoverCompletion();
+
+        try
+        {
+            // an implicit recover performed when acknowledge throws an exception due to failover
+            lastMessage.acknowledge();
+            fail("JMSException should be thrown");
+        }
+        catch (JMSException t)
+        {
+            // TODO: assert error code and/or expected exception type
+        }
+
+        // tests whether receiving and acknowledgment is working after recover
+        lastMessage = consumeMessages();
+        lastMessage.acknowledge();
+    }
+
+    /**
+     * Test that calling acknowledge before failover leaves the session
+     * clean for use after failover.
+     */
+    @Test
+    public void testAcknowledgeBeforeFailover() throws Exception
+    {
+        init(Session.CLIENT_ACKNOWLEDGE, true);
+
+        produceMessages();
+
+        // consume messages and acknowledge them
+        Message lastMessage = consumeMessages();
+        lastMessage.acknowledge();
+
+        getBrokerAdmin().restart();
+
+        produceMessages();
+
+        // tests whether receiving and acknowledgment is working after recover
+        lastMessage = consumeMessages();
+        lastMessage.acknowledge();
+    }
+
+    /**
+     * Test that receiving of messages after failover prior to calling
+     * {@link Message#acknowledge()} still results in acknowledge throwing an exception.
+     */
+    @Test
+    public void testAcknowledgeAfterMessageReceivingAfterFailover() throws Exception
+    {
+        init(Session.CLIENT_ACKNOWLEDGE, true);
+
+        produceMessages();
+
+        // consume messages but do not acknowledge them
+        consumeMessages();
+
+        restartAndAwaitFailoverCompletion();
+
+        // consume again on dirty session
+        Message lastMessage = consumeMessages();
+        try
+        {
+            // an implicit recover performed when acknowledge throws an exception due to failover
+            lastMessage.acknowledge();
+            fail("JMSException should be thrown");
+        }
+        catch (JMSException t)
+        {
+            // TODO: assert error code and/or expected exception type
+        }
+
+        // tests whether receiving and acknowledgment is working on a clean session
+        lastMessage = consumeMessages();
+        lastMessage.acknowledge();
+    }
+
+    @Test
+    public void testClientAcknowledgedSessionCloseAfterFailover() throws Exception
+    {
+        sessionCloseAfterFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    @Test
+    public void testTransactedSessionCloseAfterFailover() throws Exception
+    {
+        sessionCloseAfterFailoverImpl(Session.SESSION_TRANSACTED);
+    }
+
+    @Test
+    public void testAutoAcknowledgedSessionCloseAfterFailover() throws Exception
+    {
+        sessionCloseAfterFailoverImpl(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @Test
+    public void testPublishAutoAcknowledgedWhileFailover() throws Exception
+    {
+        publishWhileFailingOver(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @Test
+    public void testPublishClientAcknowledgedWhileFailover() throws Exception
+    {
+        Message receivedMessage = publishWhileFailingOver(Session.CLIENT_ACKNOWLEDGE);
+        receivedMessage.acknowledge();
+    }
+
+    @Test
+    public void testPublishTransactedAcknowledgedWhileFailover() throws Exception
+    {
+        publishWhileFailingOver(Session.SESSION_TRANSACTED);
+        _consumerSession.commit();
+    }
+
+    @Test
+    public void testClientAcknowledgedSessionCloseWhileFailover() throws Exception
+    {
+        sessionCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    @Test
+    public void testTransactedSessionCloseWhileFailover() throws Exception
+    {
+        sessionCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
+    }
+
+    @Test
+    public void testAutoAcknowledgedSessionCloseWhileFailover() throws Exception
+    {
+        sessionCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @Test
+    public void testClientAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
+    {
+        browserCloseWhileFailoverImpl(Session.CLIENT_ACKNOWLEDGE);
+    }
+
+    @Test
+    public void testTransactedQueueBrowserCloseWhileFailover() throws Exception
+    {
+        browserCloseWhileFailoverImpl(Session.SESSION_TRANSACTED);
+    }
+
+    @Test
+    public void testAutoAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
+    {
+        browserCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @Test
+    public void testKillBrokerFailoverWhilstPublishingInFlight() throws Exception
+    {
+        doFailoverWhilstPublishingInFlight();
+    }
+
+    @Test
+    public void testStopBrokerFailoverWhilstPublishingInFlight() throws Exception
+    {
+        doFailoverWhilstPublishingInFlight();
+    }
+
+    private void doFailoverWhilstPublishingInFlight() throws Exception
+    {
+        init(Session.SESSION_TRANSACTED, false);
+
+        final int numberOfMessages = 200;
+
+        final CountDownLatch halfWay = new CountDownLatch(1);
+        final CountDownLatch allDone = new CountDownLatch(1);
+        final AtomicReference<Exception> exception = new AtomicReference<>();
+
+        Runnable producerRunnable = new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                Thread.currentThread().setName("ProducingThread");
+
+                try
+                {
+                    for(int i=0; i< numberOfMessages; i++)
+                    {
+                        boolean success = false;
+                        while(!success)
+                        {
+                            try
+                            {
+                                Message message = _producerSession.createMessage();
+                                message.setIntProperty("msgNum", i);
+                                _producer.send(message);
+                                _producerSession.commit();
+                                success = true;
+                            }
+                            catch (javax.jms.IllegalStateException e)
+                            {
+                                // fail - failover should not leave a JMS object in an illegal state
+                                throw e;
+                            }
+                            catch (JMSException e)
+                            {
+                                // OK we will be failing over
+                                LOGGER.debug("Got JMS exception, probably just failing over", e);
+                            }
+                        }
+
+                        if (i > numberOfMessages / 2 && halfWay.getCount() == 1)
+                        {
+                            halfWay.countDown();
+                        }
+                    }
+
+                    allDone.countDown();
+                }
+                catch (Exception e)
+                {
+                    exception.set(e);
+                }
+            }
+        };
+
+        Thread producerThread = new Thread(producerRunnable);
+        producerThread.start();
+
+        assertTrue("Didn't get to half way within timeout", halfWay.await(30000, TimeUnit.MILLISECONDS));
+
+        getBrokerAdmin().restart();
+
+        if (exception.get() != null)
+        {
+            LOGGER.error("Unexpected exception from producer thread", exception.get());
+        }
+        assertNull("Producer thread should not have got an exception", exception.get());
+
+        assertTrue("All producing work was not completed", allDone.await(30000, TimeUnit.MILLISECONDS));
+
+        producerThread.join(30000);
+
+        // Extra work to prove the session still okay
+        assertNotNull(_producerSession.createTemporaryQueue());
+    }
+
+
+    private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException
+    {
+        setDelayedFailoverPolicy(5);
+        init(autoAcknowledge, true);
+
+        String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
+        Message message = _producerSession.createTextMessage(text);
+
+        getBrokerAdmin().restart();
+
+        if(!_failoverStarted.await(5, TimeUnit.SECONDS))
+        {
+            fail("Did not receive notification failover had started");
+        }
+
+        _producer.send(message);
+
+        if (_producerSession.getTransacted())
+        {
+            _producerSession.commit();
+        }
+
+        Message receivedMessage = _consumer.receive(1000L);
+        assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
+        return receivedMessage;
+    }
+
+
+    /**
+     * This test only tests 0-8/0-9/0-9-1 failover timeout
+     */
+    @Test
+    public void testFailoverHandlerTimeoutExpires() throws Exception
+    {
+
+        assumeThat("This test only tests 0-8/0-9/0-9-1 failover timeout",
+                   getProtocol(),
+                   is(not(equalTo("0-10"))));
+
+        _connection.close();
+        _connection = null;
+
+        System.setProperty("qpid.failover_method_timeout", "10000");
+        AMQConnection connection = null;
+        try
+        {
+            connection = createConnectionWithFailover();
+
+            // holding failover mutex should prevent the failover from proceeding
+            synchronized(connection.getFailoverMutex())
+            {
+                getBrokerAdmin().restart();
+
+                // sleep interval exceeds failover timeout interval
+                Thread.sleep(11000L);
+            }
+
+            // allows the failover thread to proceed
+            Thread.yield();
+            assertFalse("Unexpected failover", _failoverComplete.await(2000L, TimeUnit.MILLISECONDS));
+            assertTrue("Failover should not succeed due to timeout", connection.isClosed());
+        }
+        finally
+        {
+            System.clearProperty("qpid.failover_method_timeout");
+            if (connection != null)
+            {
+                connection.close();
+            }
+        }
+    }
+
+    @Test
+    public void testFailoverHandlerTimeoutReconnected() throws Exception
+    {
+        _connection.close();
+        System.setProperty("qpid.failover_method_timeout", "10000");
+        AMQConnection connection = null;
+        try
+        {
+            connection = createConnectionWithFailover();
+
+            // holding failover mutex should prevent the failover from proceeding
+            synchronized(connection.getFailoverMutex())
+            {
+                getBrokerAdmin().restart();
+            }
+
+            // allows the failover thread to proceed
+            Thread.yield();
+            awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
+            assertFalse("Failover should restore connectivity", connection.isClosed());
+        }
+        finally
+        {
+            System.clearProperty("qpid.failover_method_timeout");
+            if (connection != null)
+            {
+                connection.close();
+            }
+        }
+    }
+
+    /**
+     * Tests that the producer flow control flag is reset when failover occurs while
+     * the producers are being blocked by the broker.
+     *
+     * Uses Apache Qpid Broker-J specific queue configuration to enabled PSFC.
+     */
+    @Test
+    public void testFlowControlFlagResetOnFailover() throws Exception
+    {
+
+        assumeThat("This test only tests 0-8/0-9/0-9-1 failover timeout",
+                   getProtocol(),
+                   is(not(equalTo("0-10"))));
+
+
+        // we do not need the connection failing to second broker
+        _connection.close();
+
+        // make sure that failover timeout is bigger than flow control timeout
+        System.setProperty("qpid.failover_method_timeout", "60000");
+        System.setProperty("qpid.flow_control_wait_failure", "10000");
+
+        AMQConnection connection = null;
+        try
+        {
+            connection = createConnectionWithFailover(Collections.singletonMap(ConnectionURL.OPTIONS_SYNC_PUBLISH, "all"));
+
+            final Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+            final Queue queue = createAndBindQueueWithFlowControlEnabled(producerSession, getTestQueueName(), DEFAULT_MESSAGE_SIZE * 3, DEFAULT_MESSAGE_SIZE * 2);
+            final AtomicInteger counter = new AtomicInteger();
+            // try to send 5 messages (should block after 4)
+            new Thread(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        MessageProducer producer = producerSession.createProducer(queue);
+                        for (int i=0; i < 5; i++)
+                        {
+                            Message next = createNextMessage(producerSession, i);
+                            producer.send(next);
+                            producerSession.commit();
+                            counter.incrementAndGet();
+                        }
+                    }
+                    catch(Exception e)
+                    {
+                        // ignore
+                    }
+                }
+            }).start();
+
+            long limit= 30000L;
+            long start = System.currentTimeMillis();
+
+            // wait  until session is blocked
+            while(!((AMQSession<?,?>)producerSession).isFlowBlocked() && System.currentTimeMillis() - start < limit)
+            {
+                Thread.sleep(100L);
+            }
+
+            assertTrue("Flow is not blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked());
+
+            final int currentCounter = counter.get();
+            assertTrue("Unexpected number of sent messages:" + currentCounter, currentCounter >=3);
+
+            getBrokerAdmin().restart();
+
+            // allows the failover thread to proceed
+            Thread.yield();
+            awaitForFailoverCompletion(60000L);
+
+            assertFalse("Flow is blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked());
+        }
+        finally
+        {
+            System.clearProperty("qpid.failover_method_timeout");
+            System.clearProperty("qpid.flow_control_wait_failure");
+
+            if (connection != null)
+            {
+                connection.close();
+            }
+        }
+    }
+
+    @Test
+    public void testFailoverWhenConnectionStopped() throws Exception
+    {
+        init(Session.SESSION_TRANSACTED, true);
+
+        produceMessages();
+        _producerSession.commit();
+
+        final CountDownLatch stopFlag = new CountDownLatch(1);
+        final AtomicReference<Exception> exception = new AtomicReference<>();
+        final CountDownLatch expectedMessageLatch = new CountDownLatch(_messageNumber);
+        final AtomicInteger counter = new AtomicInteger();
+
+        _consumer.setMessageListener(new MessageListener()
+        {
+            @Override
+            public void onMessage(Message message)
+            {
+                if (stopFlag.getCount() == 1)
+                {
+                    try
+                    {
+                        LOGGER.debug("Stopping connection from dispatcher thread");
+                        _connection.stop();
+                        LOGGER.debug("Connection stopped from dispatcher thread");
+
+                    }
+                    catch (Exception e)
+                    {
+                        exception.set(e);
+                    }
+                    finally
+                    {
+                        stopFlag.countDown();
+                        getBrokerAdmin().restart();
+                    }
+
+                }
+                else
+                {
+                    try
+                    {
+                        _consumerSession.commit();
+                        counter.incrementAndGet();
+                        expectedMessageLatch.countDown();
+                    }
+                    catch (Exception e)
+                    {
+                        exception.set(e);
+                    }
+                }
+            }
+        });
+
+
+        boolean stopResult = stopFlag.await(2000, TimeUnit.MILLISECONDS);
+        assertTrue("Connection was not stopped" + (exception.get() == null ? "." : ":" + exception.get().getMessage()),
+                stopResult);
+        assertNull("Unexpected exception on stop :" + exception.get(), exception.get());
+
+        // wait for failover to complete
+        awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
+
+        _producerSession.commit();
+
+        _connection.start();
+
+        assertTrue("Not all messages were delivered. Remaining message number " + expectedMessageLatch.getCount(), expectedMessageLatch.await(11000, TimeUnit.MILLISECONDS));
+
+        Thread.sleep(500L);
+        assertEquals("Unexpected messages recieved ", _messageNumber, counter.get());
+
+        _connection.close();
+    }
+
+    @Test
+    public void testConnectionCloseInterruptsFailover() throws Exception
+    {
+        _connection.close();
+
+        final AtomicBoolean failoverCompleted = new AtomicBoolean(false);
+        final CountDownLatch failoverBegun = new CountDownLatch(1);
+
+        AMQConnection connection = createConnectionWithFailover();
+        connection.setConnectionListener(new ConnectionListener()
+        {
+            @Override
+            public void bytesSent(final long count)
+            {
+            }
+
+            @Override
+            public void bytesReceived(final long count)
+            {
+            }
+
+            @Override
+            public boolean preFailover(final boolean redirect)
+            {
+                failoverBegun.countDown();
+                LOGGER.info("Failover started");
+                return true;
+            }
+
+            @Override
+            public boolean preResubscribe()
+            {
+                return true;
+            }
+
+            @Override
+            public void failoverComplete()
+            {
+                failoverCompleted.set(true);
+            }
+        });
+
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        assertNotNull("Session should be created", session);
+        getBrokerAdmin().stop();
+
+        boolean failingOver = failoverBegun.await(5000, TimeUnit.MILLISECONDS);
+        assertTrue("Failover did not begin with a reasonable time", failingOver);
+
+        // Failover will now be in flight
+        connection.close();
+        assertTrue("Failover policy is unexpectedly exhausted", connection.getFailoverPolicy().failoverAllowed());
+    }
+
+    private Queue createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception
+    {
+        final Map<String, Object> arguments = new HashMap<>();
+        arguments.put("x-qpid-capacity", capacity);
+        arguments.put("x-qpid-flow-resume-capacity", resumeCapacity);
+        ((AMQSession<?, ?>) session).createQueue(queueName, false, true, false, arguments);
+        Queue queue = session.createQueue("direct://amq.direct/" + queueName + "/" + queueName + "?durable='" + true
+                + "'&autodelete='" + false + "'");
+        ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) queue);
+        return queue;
+    }
+
+    private AMQConnection createConnectionWithFailover() throws JMSException
+    {
+        return createConnectionWithFailover(null);
+    }
+
+    private AMQConnection createConnectionWithFailover(Map<String,String> connectionOptions) throws JMSException
+    {
+        String retries = "200";
+        String connectdelay = "1000";
+        String cycleCount = "2";
+
+        String newUrlFormat = "amqp://username:password@clientid/%s?brokerlist=" +
+                              "'tcp://%s:%s?retries='%s'&connectdelay='%s''&failover='singlebroker?cyclecount='%s''";
+
+        int port = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP).getPort();
+        String virtualHostName = getBrokerAdmin().getVirtualHostName();
+        StringBuilder newUrl = new StringBuilder(String.format(newUrlFormat,
+                                                               virtualHostName,
+                                                               "localhost",
+                                                               port,
+                                                               retries, connectdelay, cycleCount));
+
+        if (connectionOptions != null)
+        {
+            for (Map.Entry<String,String> option: connectionOptions.entrySet())
+            {
+                newUrl.append("&").append(option.getKey()).append("='").append(option.getValue()).append("'");
+            }
+        }
+        ConnectionFactory connectionFactory;
+        try
+        {
+            connectionFactory = new AMQConnectionFactory(newUrl.toString());
+        }
+        catch (URLSyntaxException e)
+        {
+            throw new RuntimeException(e);
+        }
+        AMQConnection connection = (AMQConnection) connectionFactory.createConnection(getBrokerAdmin().getValidUsername(),
+                                                                                      getBrokerAdmin().getValidPassword());
+        connection.setConnectionListener(this);
+        return connection;
+    }
+
+    /**
+     * Tests {@link Session#close()} for session with given acknowledge mode
+     * to ensure that close works after failover.
+     *
+     * @param acknowledgeMode session acknowledge mode
+     */
+    private void sessionCloseAfterFailoverImpl(int acknowledgeMode) throws JMSException
+    {
+        init(acknowledgeMode, true);
+        produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
+        if (acknowledgeMode == Session.SESSION_TRANSACTED)
+        {
+            _producerSession.commit();
+        }
+
+        // intentionally receive message but do not commit or acknowledge it in
+        // case of transacted or CLIENT_ACK session
+        Message receivedMessage = _consumer.receive(1000L);
+        assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
+
+        restartAndAwaitFailoverCompletion();
+
+        // for transacted/client_ack session
+        // no exception should be thrown but transaction should be automatically
+        // rolled back
+        _consumerSession.close();
+    }
+
+    /**
+     * A helper method to instantiate produce and consumer sessions, producer
+     * and consumer.
+     *
+     * @param acknowledgeMode
+     *            acknowledge mode
+     * @param startConnection
+     *            indicates whether connection should be started
+     */
+    private void init(int acknowledgeMode, boolean startConnection) throws JMSException
+    {
+        boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED;
+
+        _consumerSession = _connection.createSession(isTransacted, acknowledgeMode);
+        _destination = createDestination(_consumerSession);
+        _consumer = _consumerSession.createConsumer(_destination);
+
+        if (startConnection)
+        {
+            _connection.start();
+        }
+
+        _producerSession = _connection.createSession(isTransacted, acknowledgeMode);
+        _producer = _producerSession.createProducer(_destination);
+
+    }
+
+    private Destination createDestination(Session session) throws JMSException
+    {
+        return session.createQueue(getTestQueueName() + "_" + System.currentTimeMillis());
+    }
+
+    /**
+     * Produces a default number of messages with default text content into test
+     * queue
+     *
+     */
+    private void produceMessages() throws JMSException
+    {
+        produceMessages(false);
+    }
+
+    private void produceMessages(boolean separateProducer) throws JMSException
+    {
+        produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, separateProducer);
+    }
+
+    /**
+     * Consumes a default number of messages and asserts their content.
+     *
+     * @return last consumed message
+     */
+    private Message consumeMessages() throws JMSException
+    {
+        return consumeMessages(TEST_MESSAGE_FORMAT, _messageNumber);
+    }
+
+    /**
+     * Produces given number of text messages with content matching given
+     * content pattern
+     *
+     * @param messagePattern message content pattern
+     * @param messageNumber  number of messages to send
+     * @param standaloneProducer whether to use the existing producer or a new one.
+     */
+    private void produceMessages(String messagePattern, int messageNumber, boolean standaloneProducer) throws JMSException
+    {
+        Session producerSession;
+        MessageProducer producer;
+
+        if(standaloneProducer)
+        {
+            producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
+            producer = producerSession.createProducer(_destination);
+        }
+        else
+        {
+            producerSession = _producerSession;
+            producer = _producer;
+        }
+
+        for (int i = 0; i < messageNumber; i++)
+        {
+            String text = MessageFormat.format(messagePattern, i);
+            Message message = producerSession.createTextMessage(text);
+            producer.send(message);
+            LOGGER.debug("Test message number " + i + " produced with text = " + text + ", and JMSMessageID = " + message.getJMSMessageID());
+        }
+
+        if(standaloneProducer)
+        {
+            producerSession.commit();
+        }
+    }
+
+    /**
+     * Consumes given number of text messages and asserts that their content
+     * matches given pattern
+     *
+     * @param messagePattern
+     *            messages content pattern
+     * @param messageNumber
+     *            message number to received
+     * @return last consumed message
+     */
+    private Message consumeMessages(String messagePattern, int messageNumber) throws JMSException
+    {
+        Message receivedMesssage = null;
+        for (int i = 0; i < messageNumber; i++)
+        {
+            receivedMesssage = _consumer.receive(1000L);
+            assertReceivedMessage(receivedMesssage, messagePattern, i);
+        }
+        return receivedMesssage;
+    }
+
+    /**
+     * Asserts received message
+     *
+     * @param receivedMessage
+     *            received message
+     * @param messagePattern
+     *            messages content pattern
+     * @param messageIndex
+     *            message index
+     */
+    private void assertReceivedMessage(Message receivedMessage, String messagePattern, int messageIndex) throws JMSException
+    {
+        assertNotNull("Expected message [" + messageIndex + "] is not received!", receivedMessage);
+        assertTrue("Failure to receive message [" + messageIndex + "], expected TextMessage but received "
+                + receivedMessage, receivedMessage instanceof TextMessage);
+        String expectedText = MessageFormat.format(messagePattern, messageIndex);
+        String receivedText = null;
+        try
+        {
+            receivedText = ((TextMessage) receivedMessage).getText();
+        }
+        catch (JMSException e)
+        {
+            fail("JMSException occured while getting message text:" + e.getMessage());
+        }
+        LOGGER.debug("Test message number " + messageIndex + " consumed with text = " + receivedText + ", and JMSMessageID = " + receivedMessage.getJMSMessageID());
+        assertEquals("Failover is broken! Expected [" + expectedText + "] but got [" + receivedText + "]",
+                expectedText, receivedText);
+    }
+
+
+    private void restartAndAwaitFailoverCompletion()
+    {
+        getBrokerAdmin().restart();
+        awaitForFailoverCompletion();
+    }
+
+    private void awaitForFailoverCompletion()
+    {
+        awaitForFailoverCompletion(2 * DEFAULT_FAILOVER_TIME);
+    }
+
+    private void awaitForFailoverCompletion(long delay)
+    {
+        LOGGER.info("Awaiting {} ms for failover completion..", delay);
+        try
+        {
+            if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS))
+            {
+                fail("Failover did not complete");
+            }
+        }
+        catch (InterruptedException e)
+        {
+            fail("Test was interrupted:" + e.getMessage());
+        }
+    }
+
+    @Override
+    public void onException(JMSException e)
+    {
+    }
+
+    @After
+    public void tearDown()
+    {
+        if (_connection != null)
+        {
+            try
+            {
+                _connection.close();
+            }
+            catch (JMSException e)
+            {
+                // PASS
+            }
+        }
+    }
+
+    public Connection getConnection() throws JMSException
+    {
+        return createConnectionWithFailover();
+    }
+
+    public void failDefaultBroker()
+    {
+        getBrokerAdmin().stop();
+    }
+
+    @Override
+    public void bytesSent(long count)
+    {
+    }
+
+    @Override
+    public void bytesReceived(long count)
+    {
+    }
+
+    @Override
+    public boolean preFailover(boolean redirect)
+    {
+        _failoverStarted.countDown();
+        return true;
+    }
+
+    @Override
+    public boolean preResubscribe()
+    {
+        return true;
+    }
+
+    @Override
+    public void failoverComplete()
+    {
+        _failoverComplete.countDown();
+    }
+
+    /**
+     * Causes 1 second delay before reconnect in order to test whether JMS
+     * methods block while failover is in progress
+     */
+    private static class DelayingFailoverPolicy extends FailoverPolicy
+    {
+
+        private CountDownLatch _suspendLatch;
+        private long _delay;
+
+        DelayingFailoverPolicy(AMQConnection connection, long delay)
+        {
+            super(connection.getConnectionURL(), connection);
+            _suspendLatch = new CountDownLatch(1);
+            _delay = delay;
+        }
+
+        @Override
+        public void attainedConnection()
+        {
+            try
+            {
+                _suspendLatch.await(_delay, TimeUnit.SECONDS);
+            }
+            catch (InterruptedException e)
+            {
+                // continue
+            }
+            super.attainedConnection();
+        }
+    }
+
+    /**
+     * Tests {@link Session#close()} for session with given acknowledge mode
+     * to ensure that it blocks until failover implementation restores connection.
+     *
+     * @param acknowledgeMode session acknowledge mode
+     */
+    private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws Exception
+    {
+        initDelayedFailover(acknowledgeMode);
+
+        // intentionally receive message but not commit or acknowledge it in
+        // case of transacted or CLIENT_ACK session
+        Message receivedMessage = _consumer.receive(1000L);
+        assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
+
+        getBrokerAdmin().restart();
+
+        // wait until failover is started
+        _failoverStarted.await(5, TimeUnit.SECONDS);
+
+        // test whether session#close blocks while failover is in progress
+        _consumerSession.close();
+
+        assertTrue("Failover has not completed yet but session was closed", _failoverComplete.await(5, TimeUnit.SECONDS));
+    }
+
+    /**
+     * A helper method to instantiate {@link QueueBrowser} and publish test messages on a test queue for further browsing.
+     *
+     * @param acknowledgeMode session acknowledge mode
+     * @return queue browser
+     */
+    private QueueBrowser prepareQueueBrowser(int acknowledgeMode) throws JMSException, QpidException
+    {
+        init(acknowledgeMode, false);
+        _consumer.close();
+        _connection.start();
+
+        produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
+        if (acknowledgeMode == Session.SESSION_TRANSACTED)
+        {
+            _producerSession.commit();
+        }
+        else
+        {
+            ((AMQSession)_producerSession).sync();
+        }
+
+        return _consumerSession.createBrowser((Queue) _destination);
+    }
+
+    /**
+     * Tests {@link QueueBrowser#close()} for session with given acknowledge mode
+     * to ensure that it blocks until failover implementation restores connection.
+     *
+     * @param acknowledgeMode session acknowledge mode
+     */
+    private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws Exception
+    {
+        QueueBrowser browser = prepareQueueBrowser(acknowledgeMode);
+
+        @SuppressWarnings("unchecked")
+        Enumeration<Message> messages = browser.getEnumeration();
+        Message receivedMessage = messages.nextElement();
+        assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
+
+        getBrokerAdmin().restart();
+
+        // wait until failover is started
+        _failoverStarted.await(5, TimeUnit.SECONDS);
+
+        browser.close();
+
+        assertTrue("Failover has not completed yet but browser was closed", _failoverComplete.await(5, TimeUnit.SECONDS));
+    }
+
+    private DelayingFailoverPolicy initDelayedFailover(int acknowledgeMode) throws JMSException
+    {
+        DelayingFailoverPolicy failoverPolicy = setDelayedFailoverPolicy();
+        init(acknowledgeMode, true);
+        produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);
+        if (acknowledgeMode == Session.SESSION_TRANSACTED)
+        {
+            _producerSession.commit();
+        }
+        return failoverPolicy;
+    }
+
+    private DelayingFailoverPolicy setDelayedFailoverPolicy()
+    {
+        return setDelayedFailoverPolicy(2);
+    }
+
+    private DelayingFailoverPolicy setDelayedFailoverPolicy(long delay)
+    {
+        AMQConnection amqConnection = (AMQConnection) _connection;
+        DelayingFailoverPolicy failoverPolicy = new DelayingFailoverPolicy(amqConnection, delay);
+        ((AMQConnection) _connection).setFailoverPolicy(failoverPolicy);
+        return failoverPolicy;
+    }
+
+
+    public Message createNextMessage(Session session, int msgCount) throws JMSException
+    {
+        Message message = createMessage(session, DEFAULT_MESSAGE_SIZE);
+        message.setIntProperty(Utils.INDEX, msgCount);
+
+        return message;
+    }
+
+    public Message createMessage(Session session, int messageSize) throws JMSException
+    {
+        String payload = new String(new byte[messageSize]);
+
+        TextMessage message = session.createTextMessage();
+        message.setText(payload);
+        return message;
+    }
+}
diff --git a/systests/src/test/java/org/apache/qpid/systest/connection/FailoverMethodTest.java b/systests/src/test/java/org/apache/qpid/systest/connection/FailoverMethodTest.java
new file mode 100644
index 0000000..58d140d
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/connection/FailoverMethodTest.java
@@ -0,0 +1,181 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest.connection;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeThat;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+
+import com.google.common.util.concurrent.SettableFuture;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.systest.core.BrokerAdmin;
+import org.apache.qpid.systest.core.JmsTestBase;
+import org.apache.qpid.systest.core.util.PortHelper;
+import org.apache.qpid.util.SystemUtils;
+
+public class FailoverMethodTest extends JmsTestBase implements ExceptionListener
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(FailoverMethodTest.class);
+    private final SettableFuture<JMSException> _failoverComplete = SettableFuture.create();
+    private int _freePortWithNoBroker;
+    private int _port;
+
+    @Before
+    public void setUp()
+    {
+        assumeThat("Test requires redevelopment - timings/behaviour on windows mean it fails",
+                   SystemUtils.isWindows(), is(not(true)));
+
+        InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        _port = brokerAddress.getPort();
+        _freePortWithNoBroker = new PortHelper().getNextAvailable();
+    }
+    /**
+     * Test that the round robin method has the correct delays.
+     * The first connection will work but the localhost connection should fail but the duration it takes
+     * to report the failure is what is being tested.
+     *
+     */
+    @Test
+    public void testFailoverRoundRobinDelay() throws Exception
+    {
+        //note: The first broker has no connect delay and the default 1 retry
+        //        while the tcp:localhost broker has 3 retries with a 2s connect delay
+        String connectionString = String.format(
+                "amqp://%s:%s@/?brokerlist='tcp://localhost:%d;tcp://localhost:%d?connectdelay='2000',retries='3''",
+                getBrokerAdmin().getValidUsername(),
+                getBrokerAdmin().getValidPassword(),
+                _port,
+                _freePortWithNoBroker);
+
+        AMQConnectionURL url = new AMQConnectionURL(connectionString);
+
+        AMQConnection connection = null;
+        try
+        {
+            long start = System.currentTimeMillis();
+            connection = new AMQConnection(url);
+
+            connection.setExceptionListener(this);
+
+            LOGGER.debug("Stopping broker");
+            getBrokerAdmin().stop();
+            LOGGER.debug("Stopped broker");
+
+            _failoverComplete.get(30, TimeUnit.SECONDS);
+
+            long end = System.currentTimeMillis();
+
+            long duration = (end - start);
+
+            //Failover should take more that 6 seconds.
+            // 3 Retries
+            // so VM Broker NoDelay 0 (Connect) NoDelay 0
+            // then TCP NoDelay 0 Delay 1 Delay 2 Delay  3
+            // so 3 delays of 2s in total for connection
+            // as this is a tcp connection it will take 1second per connection to fail
+            // so max time is 6seconds of delay plus 4 seconds of TCP Delay + 1 second of runtime. == 11 seconds
+
+            // Ensure we actually had the delay
+            assertTrue("Failover took less than 6 seconds", duration > 6000);
+
+            // Ensure we don't have delays before initial connection and reconnection.
+            // We allow 1 second for initial connection and failover logic on top of 6s of sleep.
+            assertTrue("Failover took more than 11 seconds:(" + duration + ")", duration < 11000);
+        }
+        finally
+        {
+            if (connection != null)
+            {
+                connection.close();
+            }
+        }
+    }
+
+    @Test
+    public void testFailoverSingleDelay() throws Exception
+    {
+        String connectionString = String.format(
+                "amqp://%s:%s@/?brokerlist='tcp://localhost:%d?connectdelay='2000',retries='3''",
+                getBrokerAdmin().getValidUsername(),
+                getBrokerAdmin().getValidPassword(),
+                _port);
+
+        AMQConnectionURL url = new AMQConnectionURL(connectionString);
+        AMQConnection connection = null;
+        try
+        {
+            long start = System.currentTimeMillis();
+            connection = new AMQConnection(url);
+
+            connection.setExceptionListener(this);
+
+            LOGGER.debug("Stopping broker");
+            getBrokerAdmin().stop();
+            LOGGER.debug("Stopped broker");
+
+            _failoverComplete.get(30, TimeUnit.SECONDS);
+
+            long end = System.currentTimeMillis();
+
+            long duration = (end - start);
+
+            //Failover should take more that 6 seconds.
+            // 3 Retries
+            // so NoDelay 0 (Connect) NoDelay 0 Delay 1 Delay 2 Delay  3
+            // so 3 delays of 2s in total for connection
+            // so max time is 6 seconds of delay + 1 second of runtime. == 7 seconds
+
+            // Ensure we actually had the delay
+            assertTrue("Failover took less than 6 seconds", duration > 6000);
+
+            // Ensure we don't have delays before initial connection and reconnection.
+            // We allow 3 second for initial connection and failover logic on top of 6s of sleep.
+            assertTrue("Failover took more than 9 seconds:(" + duration + ")", duration < 9000);
+        }
+        finally
+        {
+            if (connection != null)
+            {
+                connection.close();
+            }
+        }
+    }
+
+    @Override
+    public void onException(JMSException e)
+    {
+        _failoverComplete.set(e);
+    }
+}