/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.qpid.client.failover;

import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import javax.naming.NamingException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.QpidException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.BrokerDetails;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.test.utils.FailoverBaseCase;
import org.apache.qpid.url.URLSyntaxException;

/**
 * Test suite to test all possible failover corner cases
 */
public class FailoverBehaviourTest extends FailoverBaseCase implements ExceptionListener
{
    /** Class-level SLF4J logger for this test class. */
    protected static final Logger _LOGGER = LoggerFactory.getLogger(FailoverBehaviourTest.class);

    /** {@link MessageFormat} pattern for the text of test messages; the single argument is a message index. */
    private static final String TEST_MESSAGE_FORMAT = "test message {0}";

    /** Indicates whether tests are run against a clustered broker; read from the "profile.clustered" system property. */
    private static boolean CLUSTERED = Boolean.getBoolean("profile.clustered");

    /** Default number of messages to send before failover. */
    private static final int DEFAULT_NUMBER_OF_MESSAGES = 40;

    /**
     * Actual number of messages to send before failover; taken from the
     * "profile.failoverMsgCount" system property, falling back to
     * {@link #DEFAULT_NUMBER_OF_MESSAGES}.
     */
    protected int _messageNumber = Integer.getInteger("profile.failoverMsgCount", DEFAULT_NUMBER_OF_MESSAGES);

    /** Test connection; assigned in {@link #setUp()}. */
    protected Connection _connection;

    /**
     * Session used by the tests for consuming messages.
     */
    private Session _consumerSession;

    /**
     * Destination the tests publish to and consume from.
     */
    private Destination _destination;

    /**
     * Consumer used by the tests to receive messages.
     */
    private MessageConsumer _consumer;

    /**
     * Session used by the tests for producing messages.
     */
    private Session _producerSession;

    /**
     * Producer used by the tests to send messages.
     */
    private MessageProducer _producer;

    /**
     * Holds the exception passed to the {@link ExceptionListener} on failover.
     */
    private JMSException _exceptionListenerException;

    /**
     * Runs the base class set-up, then obtains the test connection and registers
     * this test instance as both its JMS exception listener and its connection listener.
     */
    @Override
    public void setUp() throws Exception
    {
        super.setUp();

        final Connection testConnection = getConnection();
        _connection = testConnection;
        testConnection.setExceptionListener(this);

        // the connection listener can only be set through the AMQConnection type
        final AMQConnection amqConnection = (AMQConnection) testConnection;
        amqConnection.setConnectionListener(this);
    }

    /**
     * Verifies that a transacted {@link MessageProducer} stays usable after failover:
     * a send followed by a rollback succeeds, and a subsequent publish/commit and
     * consume/commit round trip completes.
     */
    public void testMessageProducingAndRollbackAfterFailover() throws Exception
    {
        init(Session.SESSION_TRANSACTED, true);

        // publish without committing, then break the connection
        produceMessages();
        causeFailure();
        assertFailoverException();

        // sending must still be possible once failover has happened
        final Message postFailoverMessage = _producerSession.createTextMessage("test message " + _messageNumber);
        _producer.send(postFailoverMessage);

        // discard the message sent after failover
        _producerSession.rollback();

        // publishing and committing must work after failover
        produceMessages();
        _producerSession.commit();

        // receiving and committing must work after failover
        consumeMessages();
        _consumerSession.commit();
    }

    /**
     * Verifies that the first commit of a dirty transacted producer session after
     * failover throws {@link TransactionRolledBackException}.
     * <p>
     * Also verifies that, once the transaction has been replayed, the second commit
     * after failover succeeds and only the replayed messages are received.
     */
    public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnProducingMessages() throws Exception
    {
        init(Session.SESSION_TRANSACTED, true);

        // publish without committing, then break the connection
        produceMessages();
        causeFailure();
        assertFailoverException();

        // sending must still be possible once failover has happened
        final Message postFailoverMessage = _producerSession.createTextMessage("test message " + _messageNumber);
        _producer.send(postFailoverMessage);

        try
        {
            _producerSession.commit();
            fail("TransactionRolledBackException is expected on commit after failover with dirty session!");
        }
        catch (JMSException commitFailure)
        {
            final boolean rolledBack = commitFailure instanceof TransactionRolledBackException;
            assertTrue("Expected TransactionRolledBackException but thrown " + commitFailure, rolledBack);
        }

        // mimic a user replaying the rolled back transaction
        final String replayFormat = "replayed test message {0}";
        produceMessages(replayFormat, _messageNumber, false);

        // this commit is expected to succeed
        _producerSession.commit();

        // only the messages sent after the rollback should arrive
        consumeMessages(replayFormat, _messageNumber);

        // this commit is expected to succeed as well
        _consumerSession.commit();
    }

    /**
     * Verifies that committing a transacted producer session on which nothing has
     * been published does not throw after failover, and that publishing and
     * consuming work afterwards.
     */
    public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanProducerSession() throws Exception
    {
        init(Session.SESSION_TRANSACTED, true);

        // nothing has been published on the producer session when the failure is caused
        causeFailure();
        assertFailoverException();

        // a clean session is expected to commit without an exception
        _producerSession.commit();

        // publishing and committing must work after failover
        produceMessages();
        _producerSession.commit();

        // receiving and committing must work after failover
        consumeMessages();
        _consumerSession.commit();
    }

    /**
     * Verifies that the first commit of a transacted consumer session holding
     * received-but-uncommitted messages throws {@link TransactionRolledBackException}
     * after failover.
     * <p>
     * Also verifies that the messages can then be consumed and committed successfully.
     */
    public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnMessageReceiving() throws Exception
    {
        init(Session.SESSION_TRANSACTED, true);
        produceMessages();
        _producerSession.commit();

        // leave the consumer session dirty: messages received, commit withheld
        consumeMessages();

        causeFailure();
        assertFailoverException();

        try
        {
            // expected to be rejected with TransactionRolledBackException
            _consumerSession.commit();
            fail("TransactionRolledBackException is expected on commit after failover");
        }
        catch (Exception commitFailure)
        {
            final boolean rolledBack = commitFailure instanceof TransactionRolledBackException;
            assertTrue("Expected TransactionRolledBackException but thrown " + commitFailure, rolledBack);
        }

        resendMessagesIfNecessary();

        // this time consumption and commit are expected to succeed
        consumeMessages();
        _consumerSession.commit();
    }

    /**
     * Verifies that committing a transacted consumer session whose work was already
     * committed before failover does not throw a JMSException after failover.
     */
    public void testNoJMSExceptionThrownOnCommitAfterFailoverWithCleanConsumerSession() throws Exception
    {
        init(Session.SESSION_TRANSACTED, true);

        // publish and commit
        produceMessages();
        _producerSession.commit();

        // consume and commit, leaving the consumer session clean
        consumeMessages();
        _consumerSession.commit();

        causeFailure();
        assertFailoverException();

        // a clean consumer session is expected to commit without an exception
        _consumerSession.commit();
    }

    /**
     * Verifies that, for a consumer using a {@link MessageListener}, committing a
     * dirty transacted session after failover throws
     * {@link TransactionRolledBackException}, and that a second commit succeeds once
     * the messages have been received again.
     */
    public void testTransactionRolledBackExceptionThrownOnCommitAfterFailoverOnReceivingMessagesAsynchronously()
    throws Exception
    {
        init(Session.SESSION_TRANSACTED, false);
        final FailoverTestMessageListener listener = new FailoverTestMessageListener();
        _consumer.setMessageListener(listener);

        _connection.start();

        produceMessages();
        _producerSession.commit();

        // block until the listener reports the end of receiving
        listener.awaitForEnd();

        assertEquals("Received unexpected number of messages!", _messageNumber, listener.getMessageCounter());

        // check every message received before failover
        int indexBeforeFailover = 0;
        for (Message received : listener.getReceivedMessages())
        {
            assertReceivedMessage(received, TEST_MESSAGE_FORMAT, indexBeforeFailover);
            indexBeforeFailover++;
        }
        listener.reset();

        causeFailure();
        assertFailoverException();

        try
        {
            _consumerSession.commit();
            fail("TransactionRolledBackException should be thrown!");
        }
        catch (TransactionRolledBackException expected)
        {
            // the commit of the dirty session is expected to be rejected
        }

        resendMessagesIfNecessary();

        // block until the listener reports the end of receiving
        listener.awaitForEnd();

        assertEquals("Received unexpected number of messages!", _messageNumber, listener.getMessageCounter());

        // check every message received after failover
        int indexAfterFailover = 0;
        for (Message received : listener.getReceivedMessages())
        {
            assertReceivedMessage(received, TEST_MESSAGE_FORMAT, indexAfterFailover);
            indexAfterFailover++;
        }

        // the second commit is expected to succeed
        _consumerSession.commit();
    }

    /**
     * Verifies that {@link Session#rollback()} on the consumer session does not throw
     * after failover and that messages can be consumed and committed afterwards.
     */
    public void testRollbackAfterFailover() throws Exception
    {
        init(Session.SESSION_TRANSACTED, true);

        produceMessages();
        _producerSession.commit();

        // receive the messages but withhold the commit
        consumeMessages();

        causeFailure();
        assertFailoverException();

        // rollback is expected to complete without an exception
        _consumerSession.rollback();

        resendMessagesIfNecessary();

        // receiving and committing must work after failover
        consumeMessages();
        _consumerSession.commit();
    }

    /**
     * Verifies that {@link Session#rollback()} does not throw when further messages
     * were received after failover, and that the published messages can be received
     * and committed after the rollback.
     */
    public void testRollbackAfterReceivingAfterFailover() throws Exception
    {
        init(Session.SESSION_TRANSACTED, true);

        produceMessages();
        _producerSession.commit();

        // receive the messages but withhold the commit
        consumeMessages();

        causeFailure();
        assertFailoverException();

        resendMessagesIfNecessary();

        // receive again after failover, still without committing
        consumeMessages();

        // rollback is expected to complete without an exception
        _consumerSession.rollback();

        // receiving and committing must work after the rollback
        consumeMessages();
        _consumerSession.commit();
    }

    /**
     * Verifies that {@link Session#recover()} does not throw after failover on a
     * CLIENT_ACKNOWLEDGE session and that messages can be consumed and acknowledged
     * after the recover.
     */
    public void testRecoverAfterFailover() throws Exception
    {
        init(Session.CLIENT_ACKNOWLEDGE, true);

        produceMessages();

        // receive the messages without acknowledging any of them
        consumeMessages();

        causeFailure();
        assertFailoverException();

        // recover is expected to complete without an exception
        _consumerSession.recover();

        resendMessagesIfNecessary();

        // receive again and acknowledge through the last message returned
        final Message lastReceived = consumeMessages();
        lastReceived.acknowledge();
    }

    /**
     * Verifies that {@link Session#recover()} does not throw when more messages were
     * received after failover, and that messages can be consumed and acknowledged
     * after the recover.
     */
    public void testRecoverWithConsumedMessagesAfterFailover() throws Exception
    {
        init(Session.CLIENT_ACKNOWLEDGE, true);

        produceMessages();

        // receive the messages without acknowledging any of them
        consumeMessages();

        causeFailure();
        assertFailoverException();

        // publishing is expected to work after failover
        resendMessagesIfNecessary();

        // receive once more while the session is still dirty
        consumeMessages();

        // recover is expected to restore the session
        _consumerSession.recover();

        // receive again and acknowledge through the last message returned
        final Message lastReceived = consumeMessages();
        lastReceived.acknowledge();
    }

    /**
     * Verifies that the first call to {@link Message#acknowledge()} after failover
     * throws a {@link JMSException} when the session is dirty, and that receiving
     * and acknowledging work afterwards.
     */
    public void testAcknowledgeAfterFailover() throws Exception
    {
        init(Session.CLIENT_ACKNOWLEDGE, true);

        produceMessages();

        // receive the messages without acknowledging any of them
        final Message unacknowledged = consumeMessages();

        causeFailure();
        assertFailoverException();

        try
        {
            // an implicit recover is performed when acknowledge throws because of failover
            unacknowledged.acknowledge();
            fail("JMSException should be thrown");
        }
        catch (JMSException expected)
        {
            // TODO: assert error code and/or expected exception type
        }

        resendMessagesIfNecessary();

        // receive again and acknowledge through the last message returned
        final Message lastReceived = consumeMessages();
        lastReceived.acknowledge();
    }

    /**
     * Verifies that acknowledging before failover leaves the session clean, so that
     * producing, consuming and acknowledging work after failover.
     */
    public void testAcknowledgeBeforeFailover() throws Exception
    {
        init(Session.CLIENT_ACKNOWLEDGE, true);

        produceMessages();

        // receive the messages and acknowledge through the last one returned
        final Message lastBeforeFailover = consumeMessages();
        lastBeforeFailover.acknowledge();

        causeFailure();
        assertFailoverException();

        produceMessages();

        // receiving and acknowledging must work after failover
        final Message lastAfterFailover = consumeMessages();
        lastAfterFailover.acknowledge();
    }

    /**
     * Verifies that {@link Message#acknowledge()} still throws a {@link JMSException}
     * when messages were received after failover but before the acknowledge call,
     * and that receiving and acknowledging work afterwards.
     */
    public void testAcknowledgeAfterMessageReceivingAfterFailover() throws Exception
    {
        init(Session.CLIENT_ACKNOWLEDGE, true);

        produceMessages();

        // receive the messages without acknowledging any of them
        consumeMessages();

        causeFailure();
        assertFailoverException();

        resendMessagesIfNecessary();

        // receive once more while the session is still dirty
        final Message receivedOnDirtySession = consumeMessages();
        try
        {
            // an implicit recover is performed when acknowledge throws because of failover
            receivedOnDirtySession.acknowledge();
            fail("JMSException should be thrown");
        }
        catch (JMSException expected)
        {
            // TODO: assert error code and/or expected exception type
        }

        // receiving and acknowledging must work now that the session is clean
        final Message receivedOnCleanSession = consumeMessages();
        receivedOnCleanSession.acknowledge();
    }

    /**
     * Verifies that, for a consumer using a {@link MessageListener}, calling
     * {@link Message#acknowledge()} after failover throws a {@link JMSException},
     * and that messages can be received and acknowledged afterwards.
     */
    public void testAcknowledgeAfterFailoverForAsynchronousConsumer() throws Exception
    {
        init(Session.CLIENT_ACKNOWLEDGE, false);
        final FailoverTestMessageListener listener = new FailoverTestMessageListener();
        _consumer.setMessageListener(listener);
        _connection.start();

        produceMessages();

        // block until the listener reports the end of receiving
        listener.awaitForEnd();

        assertEquals("Received unexpected number of messages!", _messageNumber, listener.getMessageCounter());

        // check every message and remember the last one seen
        int indexBeforeFailover = 0;
        Message lastReceived = null;
        for (Message received : listener.getReceivedMessages())
        {
            assertReceivedMessage(received, TEST_MESSAGE_FORMAT, indexBeforeFailover);
            indexBeforeFailover++;
            lastReceived = received;
        }
        listener.reset();

        causeFailure();
        assertFailoverException();

        try
        {
            lastReceived.acknowledge();
            fail("JMSException should be thrown!");
        }
        catch (JMSException expected)
        {
            // TODO: assert error code and/or expected exception type
        }

        resendMessagesIfNecessary();

        // block until the listener reports the end of receiving
        listener.awaitForEnd();

        assertEquals("Received unexpected number of messages!", _messageNumber, listener.getMessageCounter());

        // check every message again and remember the last one seen
        int indexAfterFailover = 0;
        for (Message received : listener.getReceivedMessages())
        {
            assertReceivedMessage(received, TEST_MESSAGE_FORMAT, indexAfterFailover);
            indexAfterFailover++;
            lastReceived = received;
        }

        // the second acknowledge is expected to succeed
        lastReceived.acknowledge();
    }

    /**
     * Verifies that {@link Session#recover()} works after failover on an
     * AUTO_ACKNOWLEDGE session and that messages can be received afterwards.
     */
    public void testRecoverAfterFailoverInAutoAcknowledgeMode() throws Exception
    {
        init(Session.AUTO_ACKNOWLEDGE, true);

        produceMessages();

        // receive the first message so that a dispatcher thread is started
        final Message firstMessage = _consumer.receive(1000L);
        assertReceivedMessage(firstMessage, TEST_MESSAGE_FORMAT, 0);

        causeFailure();
        assertFailoverException();

        _consumerSession.recover();

        resendMessagesIfNecessary();

        // receiving must work after the recover
        consumeMessages();
    }

    /** Delegates to {@code sessionCloseAfterFailoverImpl} with {@link Session#CLIENT_ACKNOWLEDGE}. */
    public void testClientAcknowledgedSessionCloseAfterFailover() throws Exception
    {
        final int acknowledgeMode = Session.CLIENT_ACKNOWLEDGE;
        sessionCloseAfterFailoverImpl(acknowledgeMode);
    }

    /** Delegates to {@code sessionCloseAfterFailoverImpl} with {@link Session#SESSION_TRANSACTED}. */
    public void testTransactedSessionCloseAfterFailover() throws Exception
    {
        final int acknowledgeMode = Session.SESSION_TRANSACTED;
        sessionCloseAfterFailoverImpl(acknowledgeMode);
    }

    /** Delegates to {@code sessionCloseAfterFailoverImpl} with {@link Session#AUTO_ACKNOWLEDGE}. */
    public void testAutoAcknowledgedSessionCloseAfterFailover() throws Exception
    {
        final int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
        sessionCloseAfterFailoverImpl(acknowledgeMode);
    }

    /** Delegates to {@code publishWhileFailingOver} with {@link Session#AUTO_ACKNOWLEDGE}; the returned message is not used. */
    public void testPublishAutoAcknowledgedWhileFailover() throws Exception
    {
        final int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
        publishWhileFailingOver(acknowledgeMode);
    }

    /**
     * Delegates to {@code publishWhileFailingOver} with {@link Session#CLIENT_ACKNOWLEDGE}
     * and acknowledges the message it returns.
     */
    public void testPublishClientAcknowledgedWhileFailover() throws Exception
    {
        publishWhileFailingOver(Session.CLIENT_ACKNOWLEDGE).acknowledge();
    }

    /**
     * Delegates to {@code publishWhileFailingOver} with {@link Session#SESSION_TRANSACTED}
     * and then commits the consumer session.
     */
    public void testPublishTransactedAcknowledgedWhileFailover() throws Exception
    {
        final int acknowledgeMode = Session.SESSION_TRANSACTED;
        publishWhileFailingOver(acknowledgeMode);

        // commit the receipt of the message on the consumer session
        _consumerSession.commit();
    }

    /** Delegates to {@code publishWithFailoverMutex} with {@link Session#AUTO_ACKNOWLEDGE}. */
    public void testPublishAutoAcknowledgedWithFailoverMutex() throws Exception
    {
        final int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
        publishWithFailoverMutex(acknowledgeMode);
    }

    /** Delegates to {@code publishWithFailoverMutex} with {@link Session#CLIENT_ACKNOWLEDGE}. */
    public void testPublishClientAcknowledgedWithFailoverMutex() throws Exception
    {
        final int acknowledgeMode = Session.CLIENT_ACKNOWLEDGE;
        publishWithFailoverMutex(acknowledgeMode);
    }

    /** Delegates to {@code publishWithFailoverMutex} with {@link Session#SESSION_TRANSACTED}. */
    public void testPublishTransactedAcknowledgedWithFailoverMutex() throws Exception
    {
        final int acknowledgeMode = Session.SESSION_TRANSACTED;
        publishWithFailoverMutex(acknowledgeMode);
    }

    /** Delegates to {@code sessionCloseWhileFailoverImpl} with {@link Session#CLIENT_ACKNOWLEDGE}. */
    public void testClientAcknowledgedSessionCloseWhileFailover() throws Exception
    {
        final int acknowledgeMode = Session.CLIENT_ACKNOWLEDGE;
        sessionCloseWhileFailoverImpl(acknowledgeMode);
    }

    /** Delegates to {@code sessionCloseWhileFailoverImpl} with {@link Session#SESSION_TRANSACTED}. */
    public void testTransactedSessionCloseWhileFailover() throws Exception
    {
        final int acknowledgeMode = Session.SESSION_TRANSACTED;
        sessionCloseWhileFailoverImpl(acknowledgeMode);
    }

    /** Delegates to {@code sessionCloseWhileFailoverImpl} with {@link Session#AUTO_ACKNOWLEDGE}. */
    public void testAutoAcknowledgedSessionCloseWhileFailover() throws Exception
    {
        final int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
        sessionCloseWhileFailoverImpl(acknowledgeMode);
    }

    /** Delegates to {@code browserCloseWhileFailoverImpl} with {@link Session#CLIENT_ACKNOWLEDGE}. */
    public void testClientAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
    {
        final int acknowledgeMode = Session.CLIENT_ACKNOWLEDGE;
        browserCloseWhileFailoverImpl(acknowledgeMode);
    }

    /** Delegates to {@code browserCloseWhileFailoverImpl} with {@link Session#SESSION_TRANSACTED}. */
    public void testTransactedQueueBrowserCloseWhileFailover() throws Exception
    {
        final int acknowledgeMode = Session.SESSION_TRANSACTED;
        browserCloseWhileFailoverImpl(acknowledgeMode);
    }

    /** Delegates to {@code browserCloseWhileFailoverImpl} with {@link Session#AUTO_ACKNOWLEDGE}. */
    public void testAutoAcknowledgedQueueBrowserCloseWhileFailover() throws Exception
    {
        final int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
        browserCloseWhileFailoverImpl(acknowledgeMode);
    }

    /** Runs the publishing-in-flight failover scenario with the broker being killed. */
    public void testKillBrokerFailoverWhilstPublishingInFlight() throws Exception
    {
        final boolean hardKill = true;
        doFailoverWhilstPublishingInFlight(hardKill);
    }

    /** Runs the publishing-in-flight failover scenario with the broker being stopped rather than killed. */
    public void testStopBrokerFailoverWhilstPublishingInFlight() throws Exception
    {
        final boolean hardKill = false;
        doFailoverWhilstPublishingInFlight(hardKill);
    }

    /**
     * Publishes and commits messages one at a time from a background thread and,
     * once that thread is past the half-way point, kills or stops the default broker.
     * <p>
     * The scenario passes only if the producer thread completes all of its work
     * without recording an exception and the producer session can still create a
     * temporary queue afterwards. A {@link javax.jms.IllegalStateException} in the
     * producer thread is treated as a failure; any other {@link JMSException} causes
     * the current message to be retried.
     *
     * @param hardKill {@code true} to kill the default broker, {@code false} to stop it
     * @throws Exception if any step of the scenario fails unexpectedly
     */
    private void doFailoverWhilstPublishingInFlight(boolean hardKill) throws Exception
    {
        init(Session.SESSION_TRANSACTED, false);

        final int numberOfMessages = 200;

        final CountDownLatch halfWay = new CountDownLatch(1);
        final CountDownLatch allDone = new CountDownLatch(1);
        final AtomicReference<Exception> exception = new AtomicReference<>();

        Runnable producerRunnable = new Runnable()
        {
            @Override
            public void run()
            {
                Thread.currentThread().setName("ProducingThread");

                try
                {
                    for (int i = 0; i < numberOfMessages; i++)
                    {
                        boolean success = false;
                        while (!success)
                        {
                            try
                            {
                                Message message = _producerSession.createMessage();
                                message.setIntProperty("msgNum", i);
                                _producer.send(message);
                                _producerSession.commit();
                                success = true;
                            }
                            catch (javax.jms.IllegalStateException e)
                            {
                                // fail - failover should not leave a JMS object in an illegal state
                                throw e;
                            }
                            catch (JMSException e)
                            {
                                // OK we will be failing over
                                _logger.debug("Got JMS exception, probably just failing over", e);
                            }
                        }

                        if (i > numberOfMessages / 2 && halfWay.getCount() == 1)
                        {
                            halfWay.countDown();
                        }
                    }

                    allDone.countDown();
                }
                catch (Exception e)
                {
                    // record the failure first, then release both latches so that the
                    // test thread reports it straight away instead of timing out
                    exception.set(e);
                    halfWay.countDown();
                    allDone.countDown();
                }
            }
        };

        Thread producerThread = new Thread(producerRunnable);
        producerThread.start();

        boolean reachedHalfWay = halfWay.await(30000, TimeUnit.MILLISECONDS);
        // a producer failure is more informative than the latch result, so check it first
        assertNoProducerThreadFailure(exception);
        assertTrue("Didn't get to half way within timeout", reachedHalfWay);

        if (hardKill)
        {
            _logger.debug("Killing the Broker");
            killDefaultBroker();
        }
        else
        {
            _logger.debug("Stopping the Broker");
            stopDefaultBroker();
        }

        assertNoProducerThreadFailure(exception);

        boolean completed = allDone.await(30000, TimeUnit.MILLISECONDS);
        // the producer may have failed during or after failover; check again before
        // judging completion, because a failure also releases the latch
        assertNoProducerThreadFailure(exception);
        assertTrue("All producing work was not completed", completed);

        producerThread.join(30000);

        // Extra work to prove the session still okay
        assertNotNull(_producerSession.createTemporaryQueue());
    }

    /**
     * Logs and fails the test if the producer thread has recorded an exception.
     *
     * @param failure holder written by the producer thread when it terminates abnormally
     */
    private void assertNoProducerThreadFailure(AtomicReference<Exception> failure)
    {
        Exception cause = failure.get();
        if (cause != null)
        {
            _logger.error("Unexpected exception from producer thread", cause);
        }
        assertNull("Producer thread should not have got an exception", cause);
    }


    /**
     * Fails the default broker, waits for the failover-started notification, then
     * sends a single text message (committing when the producer session is
     * transacted) and receives it back.
     *
     * @param autoAcknowledge acknowledge mode passed to {@code init}; despite the
     *                        name, callers pass any of the {@link Session} mode constants
     * @return the received message, already checked against {@code TEST_MESSAGE_FORMAT} with index 0
     * @throws JMSException if sending, committing or receiving fails
     * @throws InterruptedException if interrupted while waiting for failover to start
     */
    private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException
    {
        setDelayedFailoverPolicy(5);
        init(autoAcknowledge, true);

        String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
        Message message = _producerSession.createTextMessage(text);

        failDefaultBroker();

        assertTrue("Did not receive notification failover had started",
                _failoverStarted.await(5, TimeUnit.SECONDS));

        _producer.send(message);

        if (_producerSession.getTransacted())
        {
            _producerSession.commit();
        }

        Message receivedMessage = _consumer.receive(1000L);
        assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);
        return receivedMessage;
    }

    /**
     * Verifies that sending fails while the connection is failing over but failover
     * cannot complete because this thread holds the connection's failover mutex.
     * <p>
     * The wait for the connection to report that it is failing over is bounded by
     * {@code DEFAULT_FAILOVER_TIME}, so a failover that never starts fails the test
     * instead of hanging it while the mutex is held.
     *
     * @param autoAcknowledge acknowledge mode passed to {@code init}; despite the
     *                        name, callers pass any of the {@link Session} mode constants
     * @throws JMSException if creating the message fails
     * @throws InterruptedException if interrupted while waiting for failover
     */
    private void publishWithFailoverMutex(int autoAcknowledge) throws JMSException, InterruptedException
    {
        setDelayedFailoverPolicy(5);
        init(autoAcknowledge, true);

        String text = MessageFormat.format(TEST_MESSAGE_FORMAT, 0);
        Message message = _producerSession.createTextMessage(text);

        AMQConnection connection = (AMQConnection)_connection;

        // holding failover mutex should prevent the failover from
        // proceeding before we try to send the message
        synchronized(connection.getFailoverMutex())
        {
            failDefaultBroker();

            // wait to make sure that connection is lost, but not for ever
            // NOTE(review): assumes DEFAULT_FAILOVER_TIME is in milliseconds - confirm
            final long deadline = System.currentTimeMillis() + DEFAULT_FAILOVER_TIME;
            while (!connection.isFailingOver())
            {
                if (System.currentTimeMillis() > deadline)
                {
                    fail("Connection did not start failing over within " + DEFAULT_FAILOVER_TIME + " ms");
                }
                Thread.sleep(25L);
            }

            try
            {
                _producer.send(message);
                fail("Sending should fail because connection was lost and failover has not yet completed");
            }
            catch(JMSException e)
            {
                // JMSException is expected
            }
        }
        // wait for failover completion, thus ensuring it actually
        // got started, before allowing the test to tear down
        awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
    }

    /**
     * Verifies that the connection ends up closed, with no failover completion
     * reported, when the failover mutex is held for longer than the configured
     * {@code qpid.failover_method_timeout}.
     * <p>
     * This test only tests 0-8/0-9/0-9-1 failover timeout.
     */
    public void testFailoverHandlerTimeoutExpires() throws Exception
    {
        _connection.close();
        setTestSystemProperty("qpid.failover_method_timeout", "10000");

        AMQConnection failoverConnection = null;
        try
        {
            failoverConnection = createConnectionWithFailover();

            // failover should be unable to proceed while this thread owns the failover mutex
            synchronized (failoverConnection.getFailoverMutex())
            {
                killDefaultBroker();
                startDefaultBroker();

                // hold the mutex for longer than the failover timeout set above
                Thread.sleep(11000L);
            }

            // give the failover thread a chance to run
            Thread.yield();

            final boolean failoverCompleted = _failoverComplete.await(2000L, TimeUnit.MILLISECONDS);
            assertFalse("Unexpected failover", failoverCompleted);

            final boolean connectionClosed = failoverConnection.isClosed();
            assertTrue("Failover should not succeed due to timeout", connectionClosed);
        }
        finally
        {
            if (failoverConnection != null)
            {
                failoverConnection.close();
            }
        }
    }

    /**
     * Verifies that failover completes and the connection stays open when the
     * failover mutex is released straight after the broker has been killed and
     * restarted, with {@code qpid.failover_method_timeout} set to 10000.
     */
    public void testFailoverHandlerTimeoutReconnected() throws Exception
    {
        _connection.close();
        setTestSystemProperty("qpid.failover_method_timeout", "10000");

        AMQConnection failoverConnection = null;
        try
        {
            failoverConnection = createConnectionWithFailover();

            // failover should be unable to proceed while this thread owns the failover mutex
            synchronized (failoverConnection.getFailoverMutex())
            {
                killDefaultBroker();
                startDefaultBroker();
            }

            // give the failover thread a chance to run
            Thread.yield();
            awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);

            final boolean connectionClosed = failoverConnection.isClosed();
            assertFalse("Failover should restore connectivity", connectionClosed);
        }
        finally
        {
            if (failoverConnection != null)
            {
                failoverConnection.close();
            }
        }
    }

    /**
     * Tests that the producer flow control flag is reset when failover occurs while
     * the producers are being blocked by the broker.
     * <p>
     * Uses Apache Qpid Broker for Java specific queue configuration to enable PSFC.
     * A background thread publishes to a flow-controlled queue until the session
     * reports that flow is blocked; the broker is then killed and restarted, and the
     * session must no longer report blocked flow once failover has completed.
     */
    public void testFlowControlFlagResetOnFailover() throws Exception
    {
        // we do not need the connection failing to second broker
        _connection.close();

        // make sure that failover timeout is bigger than flow control timeout
        setTestSystemProperty("qpid.failover_method_timeout", "60000");
        setTestSystemProperty("qpid.flow_control_wait_failure", "10000");

        AMQConnection connection = null;
        try
        {
            connection = createConnectionWithFailover(Collections.singletonMap(ConnectionURL.OPTIONS_SYNC_PUBLISH, "all"));

            final Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
            final Queue queue = createAndBindQueueWithFlowControlEnabled(producerSession, getTestQueueName(), DEFAULT_MESSAGE_SIZE * 3, DEFAULT_MESSAGE_SIZE * 2);
            final AtomicInteger counter = new AtomicInteger();
            // try to send 5 messages (should block after 4)
            new Thread(new Runnable()
            {
                @Override
                public void run()
                {
                    try
                    {
                        MessageProducer producer = producerSession.createProducer(queue);
                        for (int i = 0; i < 5; i++)
                        {
                            Message next = createNextMessage(producerSession, i);
                            producer.send(next);
                            producerSession.commit();
                            counter.incrementAndGet();
                        }
                    }
                    catch (Exception e)
                    {
                        // the outcome of this thread is not asserted on, but keep a trace
                        // of why it stopped to help diagnose a failing run
                        _LOGGER.debug("Flow-controlled producer thread stopped publishing after an exception", e);
                    }
                }
            }).start();

            final AMQSession<?, ?> flowControlledSession = (AMQSession<?, ?>) producerSession;

            long limit = 30000L;
            long start = System.currentTimeMillis();

            // wait until session is blocked, or the limit expires
            while (!flowControlledSession.isFlowBlocked() && System.currentTimeMillis() - start < limit)
            {
                Thread.sleep(100L);
            }

            assertTrue("Flow is not blocked", flowControlledSession.isFlowBlocked());

            final int currentCounter = counter.get();
            assertTrue("Unexpected number of sent messages:" + currentCounter, currentCounter >= 3);

            killDefaultBroker();
            startDefaultBroker();

            // allows the failover thread to proceed
            Thread.yield();
            awaitForFailoverCompletion(60000L);

            assertFalse("Flow is blocked", flowControlledSession.isFlowBlocked());
        }
        finally
        {
            if (connection != null)
            {
                connection.close();
            }
        }
    }

    /**
     * Verifies that failover completes and delivery resumes when the connection is stopped
     * from inside a message listener immediately before the broker is failed.
     * <p>
     * On its first invocation the listener stops the connection and then calls
     * {@code failDefaultBroker()}; on every later invocation it commits the consumer session
     * and counts the delivery. Once failover has completed the test re-publishes messages
     * (when required), restarts the connection and expects exactly {@code _messageNumber}
     * counted deliveries.
     *
     * @throws Exception on any unexpected failure
     */
    public void testFailoverWhenConnectionStopped() throws Exception
    {
        init(Session.SESSION_TRANSACTED, true);

        produceMessages();
        _producerSession.commit();

        // released by the listener once it has attempted to stop the connection
        final CountDownLatch stopFlag = new CountDownLatch(1);
        // holds the most recent exception caught inside the listener, if any
        final AtomicReference<Exception> exception = new AtomicReference<>();
        // counted down once per delivery that was committed by the listener
        final CountDownLatch expectedMessageLatch = new CountDownLatch(_messageNumber);
        final AtomicInteger counter = new AtomicInteger();

        _consumer.setMessageListener(new MessageListener()
        {
            @Override
            public void onMessage(Message message)
            {
                // the latch count is still 1 only for the first delivery
                if (stopFlag.getCount() == 1)
                {
                    try
                    {
                        _LOGGER.debug("Stopping connection from dispatcher thread");
                        _connection.stop();
                        _LOGGER.debug("Connection stopped from dispatcher thread");

                    }
                    catch (Exception e)
                    {
                        exception.set(e);
                    }
                    finally
                    {
                        // signal the main thread first, then fail the broker; the triggering
                        // message is neither committed nor counted
                        stopFlag.countDown();

                        failDefaultBroker();
                    }

                }
                else
                {
                    try
                    {
                        _consumerSession.commit();
                        counter.incrementAndGet();
                        expectedMessageLatch.countDown();
                    }
                    catch (Exception e)
                    {
                        exception.set(e);
                    }
                }
            }
        });


        // give the listener up to 2 seconds to receive a message and stop the connection
        boolean stopResult = stopFlag.await(2000, TimeUnit.MILLISECONDS);
        assertTrue("Connection was not stopped" + (exception.get() == null ? "." : ":" + exception.get().getMessage()),
                stopResult);
        assertNull("Unexpected exception on stop :" + exception.get(), exception.get());

        // wait for failover to complete
        awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
        assertFailoverException();

        // re-publish the test messages when the broker is not clustered, then commit them
        resendMessagesIfNecessary();
        _producerSession.commit();

        // the connection was stopped by the listener; restart it so delivery resumes
        _connection.start();

        assertTrue("Not all messages were delivered. Remaining message number " + expectedMessageLatch.getCount(), expectedMessageLatch.await(11000, TimeUnit.MILLISECONDS));

        // NOTE(review): presumably this pause lets any unexpected extra deliveries arrive
        // before the counter is checked - confirm
        Thread.sleep(500l);
        assertEquals("Unexpected messages recieved ", _messageNumber, counter.get());

        _connection.close();
    }

    /**
     * Verifies that a connection can be closed while failover is still in flight.
     * <p>
     * A separate failover-enabled connection is created with a listener that signals when
     * failover starts. The broker is killed, the test waits for that signal, closes the
     * connection and finally checks that the connection's failover policy still reports
     * failover as allowed.
     *
     * @throws Exception on any unexpected failure
     */
    public void testConnectionCloseInterruptsFailover() throws Exception
    {
        // this test works on its own connection, so the fixture connection is closed up front
        _connection.close();

        final CountDownLatch failoverStartedSignal = new CountDownLatch(1);
        final AtomicBoolean failoverFinished = new AtomicBoolean(false);

        final AMQConnection failoverConnection = createConnectionWithFailover();
        failoverConnection.setConnectionListener(new ConnectionListener()
        {
            @Override
            public boolean preFailover(final boolean redirect)
            {
                // let the test thread know that failover has begun
                failoverStartedSignal.countDown();
                _LOGGER.info("Failover started");
                return true;
            }

            @Override
            public boolean preResubscribe()
            {
                return true;
            }

            @Override
            public void failoverComplete()
            {
                failoverFinished.set(true);
            }

            @Override
            public void bytesSent(final long count)
            {
                // not relevant for this test
            }

            @Override
            public void bytesReceived(final long count)
            {
                // not relevant for this test
            }
        });

        final Session transactedSession = failoverConnection.createSession(true, Session.SESSION_TRANSACTED);
        assertNotNull("Session should be created", transactedSession);
        killDefaultBroker();

        assertTrue("Failover did not begin with a reasonable time",
                   failoverStartedSignal.await(5000, TimeUnit.MILLISECONDS));

        // Failover will now be in flight
        failoverConnection.close();
        final boolean failoverStillAllowed = failoverConnection.getFailoverPolicy().failoverAllowed();
        assertTrue("Failover policy is unexpectedly exhausted", failoverStillAllowed);
    }

    /**
     * Creates a queue carrying the {@code x-qpid-capacity} and
     * {@code x-qpid-flow-resume-capacity} arguments, then declares and binds a destination
     * for it on {@code amq.direct} using the queue name as the routing key.
     *
     * @param session        session used to create the queue; must be an {@code AMQSession}
     * @param queueName      name of the queue, also used as the binding key
     * @param capacity       value for the {@code x-qpid-capacity} argument
     * @param resumeCapacity value for the {@code x-qpid-flow-resume-capacity} argument
     * @return the bound queue destination
     * @throws Exception if the queue cannot be created or bound
     */
    private Queue createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception
    {
        final AMQSession<?, ?> amqSession = (AMQSession<?, ?>) session;

        final Map<String, Object> flowControlArguments = new HashMap<>();
        flowControlArguments.put("x-qpid-capacity", capacity);
        flowControlArguments.put("x-qpid-flow-resume-capacity", resumeCapacity);
        amqSession.createQueue(queueName, false, true, false, flowControlArguments);

        // the destination URL asks for a durable, non-auto-delete queue
        final String destinationUrl = "direct://amq.direct/" + queueName + "/" + queueName
                                      + "?durable='true'&autodelete='false'";
        final Queue boundQueue = session.createQueue(destinationUrl);
        amqSession.declareAndBind((AMQDestination) boundQueue);
        return boundQueue;
    }

    /**
     * Creates a failover-enabled connection without any additional connection URL options.
     *
     * @return the new connection
     * @throws NamingException    if the default connection factory cannot be looked up
     * @throws JMSException       if the connection cannot be created
     * @throws URLSyntaxException if the generated connection URL is invalid
     */
    private AMQConnection createConnectionWithFailover() throws NamingException, JMSException, URLSyntaxException
    {
        final Map<String, String> noExtraOptions = null;
        return createConnectionWithFailover(noExtraOptions);
    }

    /**
     * Creates a connection to the host and port of the first broker of the "default"
     * connection factory, using a {@code singlebroker} failover URL with
     * {@code retries='200'}, {@code connectdelay='1000'} and {@code cyclecount='2'}.
     * This test instance is registered as the connection listener.
     *
     * @param connectionOptions extra options appended to the URL as {@code &key='value'};
     *                          may be {@code null}
     * @return the new connection
     * @throws NamingException    if the default connection factory cannot be looked up
     * @throws JMSException       if the connection cannot be created
     * @throws URLSyntaxException if the generated connection URL is invalid
     */
    private AMQConnection createConnectionWithFailover(Map<String,String> connectionOptions) throws NamingException, JMSException, URLSyntaxException
    {
        final AMQConnectionFactory defaultFactory = (AMQConnectionFactory) getConnectionFactory("default");
        final BrokerDetails defaultBroker = defaultFactory.getConnectionURL().getBrokerDetails(0);

        final String retries = "200";
        final String connectDelay = "1000";
        final String cycleCount = "2";

        final StringBuilder url = new StringBuilder(String.format(
                "amqp://username:password@clientid/test?brokerlist=" +
                "'tcp://%s:%s?retries='%s'&connectdelay='%s''&failover='singlebroker?cyclecount='%s''",
                defaultBroker.getHost(), defaultBroker.getPort(), retries, connectDelay, cycleCount));

        if (connectionOptions != null)
        {
            for (Map.Entry<String, String> extraOption : connectionOptions.entrySet())
            {
                url.append("&")
                   .append(extraOption.getKey())
                   .append("='")
                   .append(extraOption.getValue())
                   .append("'");
            }
        }

        final ConnectionFactory failoverFactory = new AMQConnectionFactory(url.toString());
        final AMQConnection failoverConnection = (AMQConnection) failoverFactory.createConnection("admin", "admin");
        failoverConnection.setConnectionListener(this);
        return failoverConnection;
    }

    /**
     * Exercises {@link Session#close()} on the consumer session after failover has completed,
     * for a session created with the given acknowledge mode.
     * <p>
     * One message is received but deliberately left uncommitted/unacknowledged before the
     * failure is triggered.
     *
     * @param acknowledgeMode session acknowledge mode
     * @throws JMSException if any JMS operation fails
     */
    private void sessionCloseAfterFailoverImpl(int acknowledgeMode) throws JMSException
    {
        init(acknowledgeMode, true);
        produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);

        final boolean transacted = Session.SESSION_TRANSACTED == acknowledgeMode;
        if (transacted)
        {
            _producerSession.commit();
        }

        // receive the first message on purpose without committing or acknowledging it
        // (relevant for transacted and CLIENT_ACK sessions)
        final Message firstMessage = _consumer.receive(1000L);
        assertReceivedMessage(firstMessage, TEST_MESSAGE_FORMAT, 0);

        causeFailure();
        assertFailoverException();

        // closing a transacted/client_ack session must not throw here; the pending
        // transaction is expected to be rolled back automatically
        _consumerSession.close();
    }

    /**
     * Sets up the test fixture: consumer session, destination and consumer first, then
     * (optionally) starts the connection, and finally creates the producer session and
     * producer.
     *
     * @param acknowledgeMode acknowledge mode used for both sessions; the sessions are
     *                        transacted when it equals {@link Session#SESSION_TRANSACTED}
     * @param startConnection whether the connection should be started
     * @throws JMSException if a session, consumer or producer cannot be created
     */
    private void init(int acknowledgeMode, boolean startConnection) throws JMSException
    {
        final boolean transacted = (acknowledgeMode == Session.SESSION_TRANSACTED);

        // consumer side
        _consumerSession = _connection.createSession(transacted, acknowledgeMode);
        _destination = createDestination(_consumerSession);
        _consumer = _consumerSession.createConsumer(_destination);

        if (startConnection)
        {
            _connection.start();
        }

        // producer side
        _producerSession = _connection.createSession(transacted, acknowledgeMode);
        _producer = _producerSession.createProducer(_destination);
    }

    /**
     * Creates the test queue destination. The queue name is the test queue name suffixed
     * with an underscore and the current time in milliseconds.
     *
     * @param session session used to create the queue
     * @return the queue destination
     * @throws JMSException if the queue cannot be created
     */
    protected Destination createDestination(Session session) throws JMSException
    {
        final String timestampedQueueName = getTestQueueName() + "_" + System.currentTimeMillis();
        return session.createQueue(timestampedQueueName);
    }

    /**
     * Re-publishes the test messages through a separate producer when the broker is not
     * clustered. When the consumer has no message listener, it is first asserted that no
     * message is available on the queue.
     *
     * @throws JMSException if receiving or producing fails
     */
    private void resendMessagesIfNecessary() throws JMSException
    {
        if (CLUSTERED)
        {
            // nothing to do for a clustered broker
            return;
        }

        // a freshly started non-clustered broker must not hold any of the old messages
        final boolean synchronousConsumer = _consumer.getMessageListener() == null;
        if (synchronousConsumer)
        {
            final Message unexpected = _consumer.receive(100L);
            assertNull("Received a message after failover with non-clustered broker!", unexpected);
        }

        // publish the messages again using a separate producer
        produceMessages(true);
    }

    /**
     * Publishes the default number of default-format text messages to the test queue using
     * the shared producer.
     *
     * @throws JMSException if sending fails
     */
    private void produceMessages() throws JMSException
    {
        final boolean useSeparateProducer = false;
        produceMessages(useSeparateProducer);
    }

    /**
     * Publishes the default number of default-format text messages to the test queue.
     *
     * @param seperateProducer {@code true} to publish through a newly created producer,
     *                         {@code false} to use the shared one
     * @throws JMSException if sending fails
     */
    private void produceMessages(boolean seperateProducer) throws JMSException
    {
        final String defaultPattern = TEST_MESSAGE_FORMAT;
        final int defaultCount = _messageNumber;
        produceMessages(defaultPattern, defaultCount, seperateProducer);
    }

    /**
     * Receives the default number of messages and checks each against the default message
     * format.
     *
     * @return the last message received
     * @throws JMSException if receiving fails
     */
    private Message consumeMessages() throws JMSException
    {
        final String defaultPattern = TEST_MESSAGE_FORMAT;
        final int defaultCount = _messageNumber;
        return consumeMessages(defaultPattern, defaultCount);
    }

    /**
     * Produces the given number of text messages whose content is built from the given
     * pattern.
     * <p>
     * When {@code standaloneProducer} is set, a dedicated transacted session and producer are
     * created on {@code _connection}; the messages are committed on that session and the
     * session is then closed so that it does not outlive this call. Otherwise the shared
     * {@code _producerSession}/{@code _producer} are used and nothing is committed here.
     *
     * @param messagePattern message content pattern
     * @param messageNumber  number of messages to send
     * @param standaloneProducer whether to use a new producer instead of the existing one
     * @throws JMSException if creating the session/producer, sending, committing or closing fails
     */
    private void produceMessages(String messagePattern, int messageNumber, boolean standaloneProducer) throws JMSException
    {
        if (!standaloneProducer)
        {
            sendFormattedTextMessages(_producerSession, _producer, messagePattern, messageNumber);
            return;
        }

        final Session standaloneSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
        try
        {
            final MessageProducer standaloneMessageProducer = standaloneSession.createProducer(_destination);
            sendFormattedTextMessages(standaloneSession, standaloneMessageProducer, messagePattern, messageNumber);
            standaloneSession.commit();
        }
        finally
        {
            // the session is private to this call; release it (and its producer) once done
            standaloneSession.close();
        }
    }

    /**
     * Sends {@code messageNumber} text messages through the given producer, formatting each
     * body from the pattern and the zero-based message index.
     */
    private void sendFormattedTextMessages(Session session, MessageProducer producer, String messagePattern, int messageNumber) throws JMSException
    {
        for (int i = 0; i < messageNumber; i++)
        {
            String text = MessageFormat.format(messagePattern, i);
            Message message = session.createTextMessage(text);
            producer.send(message);
            _LOGGER.debug("Test message number {} produced with text = {}, and JMSMessageID = {}",
                          i, text, message.getJMSMessageID());
        }
    }

    /**
     * Receives the given number of messages, waiting up to one second for each, and asserts
     * that every message matches the pattern for its index.
     *
     * @param messagePattern messages content pattern
     * @param messageNumber  number of messages to receive
     * @return the last message received, or {@code null} when {@code messageNumber} is not positive
     * @throws JMSException if receiving fails
     */
    private Message consumeMessages(String messagePattern, int messageNumber) throws JMSException
    {
        Message lastReceived = null;
        int index = 0;
        while (index < messageNumber)
        {
            lastReceived = _consumer.receive(1000L);
            assertReceivedMessage(lastReceived, messagePattern, index);
            index++;
        }
        return lastReceived;
    }

    /**
     * Asserts that the given message is a non-null {@link TextMessage} whose text equals the
     * pattern formatted with the message index.
     *
     * @param receivedMessage received message
     * @param messagePattern  messages content pattern
     * @param messageIndex    message index
     * @throws JMSException if the JMS message ID cannot be read
     */
    private void assertReceivedMessage(Message receivedMessage, String messagePattern, int messageIndex) throws JMSException
    {
        assertNotNull("Expected message [" + messageIndex + "] is not received!", receivedMessage);

        final boolean isTextMessage = receivedMessage instanceof TextMessage;
        assertTrue("Failure to receive message [" + messageIndex + "], expected TextMessage but received "
                + receivedMessage, isTextMessage);

        final String expectedText = MessageFormat.format(messagePattern, messageIndex);

        final TextMessage textMessage = (TextMessage) receivedMessage;
        String actualText = null;
        try
        {
            actualText = textMessage.getText();
        }
        catch (JMSException e)
        {
            // a failure to read the body is reported as a test failure rather than propagated
            fail("JMSException occured while getting message text:" + e.getMessage());
        }

        _LOGGER.debug("Test message number " + messageIndex + " consumed with text = " + actualText + ", and JMSMessageID = " + receivedMessage.getJMSMessageID());
        assertEquals("Failover is broken! Expected [" + expectedText + "] but got [" + actualText + "]",
                expectedText, actualText);
    }

    /**
     * Triggers failover and waits for it to complete, allowing twice the default failover
     * time.
     */
    private void causeFailure()
    {
        final long doubledFailoverTime = DEFAULT_FAILOVER_TIME * 2;
        causeFailure(doubledFailoverTime);
    }

    /**
     * Triggers failover by failing the default broker, then blocks until failover has
     * completed or the given time has elapsed (in which case the test fails).
     *
     * @param delay maximum time, in milliseconds, to wait for failover completion
     */
    private void causeFailure(long delay)
    {
        // step 1: take the broker down
        failDefaultBroker();

        // step 2: wait for the client to finish failing over
        awaitForFailoverCompletion(delay);
    }

    /**
     * Blocks until the {@code _failoverComplete} latch is released, failing the test when that
     * does not happen within the given time or when the waiting thread is interrupted.
     *
     * @param delay maximum time to wait, in milliseconds
     */
    private void awaitForFailoverCompletion(long delay)
    {
        _logger.info("Awaiting {} ms for failover completion..", delay);
        try
        {
            if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS))
            {
                fail("Failover did not complete");
            }
        }
        catch (InterruptedException e)
        {
            // keep the interrupt visible to whoever runs this thread before failing the test
            Thread.currentThread().interrupt();
            fail("Test was interrupted:" + e.getMessage());
        }
    }

    /**
     * Placeholder for asserting that a failover-related exception was reported.
     * Currently performs no checks.
     */
    private void assertFailoverException()
    {
        // TODO: assert exception is received (once implemented)
        // along with error code and/or expected exception type
    }


    /**
     * Stores the reported exception in {@code _exceptionListenerException}, replacing any
     * previously stored value.
     *
     * @param e exception passed to this exception listener
     */
    @Override
    public void onException(JMSException e)
    {
        _exceptionListenerException = e;
    }

    /**
     * Causes 1 second delay before reconnect in order to test whether JMS
     * methods block while failover is in progress
     */
    private static class DelayingFailoverPolicy extends FailoverPolicy
    {

        private CountDownLatch _suspendLatch;
        private long _delay;

        public DelayingFailoverPolicy(AMQConnection connection, long delay)
        {
            super(connection.getConnectionURL(), connection);
            _suspendLatch = new CountDownLatch(1);
            _delay = delay;
        }

        public void attainedConnection()
        {
            try
            {
                _suspendLatch.await(_delay, TimeUnit.SECONDS);
            }
            catch (InterruptedException e)
            {
                // continue
            }
            super.attainedConnection();
        }

    }


    /**
     * Message listener that records every delivered message and releases a latch each time
     * the number of deliveries reaches a multiple of {@code _messageNumber}.
     */
    private class FailoverTestMessageListener implements MessageListener
    {
        // messages in delivery order
        private List<Message> _receivedMessage = new ArrayList<>();

        // number of deliveries since construction or the last reset
        private AtomicInteger _counter = new AtomicInteger();

        // released when a full batch of _messageNumber messages has been delivered
        private volatile CountDownLatch _endLatch = new CountDownLatch(1);

        public FailoverTestMessageListener() throws JMSException
        {
            // all state is set up by the field initialisers
        }

        @Override
        public void onMessage(Message message)
        {
            _receivedMessage.add(message);

            final int deliveredSoFar = _counter.incrementAndGet();
            final boolean batchComplete = deliveredSoFar % _messageNumber == 0;
            if (batchComplete)
            {
                _endLatch.countDown();
            }
        }

        /**
         * Discards the recorded messages, installs a fresh latch and zeroes the counter.
         */
        public void reset()
        {
            _receivedMessage.clear();
            _endLatch = new CountDownLatch(1);
            _counter.set(0);
        }

        /**
         * @return the live list of recorded messages (not a copy)
         */
        public List<Message> getReceivedMessages()
        {
            return _receivedMessage;
        }

        /**
         * Waits up to {@code _messageNumber} seconds for a full batch to be delivered.
         *
         * @return boxed {@code true} if the latch was released in time, boxed {@code false} otherwise
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public Object awaitForEnd() throws InterruptedException
        {
            final long timeoutSeconds = (long) _messageNumber;
            final boolean finishedInTime = _endLatch.await(timeoutSeconds, TimeUnit.SECONDS);
            return finishedInTime;
        }

        /**
         * @return number of deliveries since construction or the last reset
         */
        public int getMessageCounter()
        {
            return _counter.get();
        }
    }

    /**
     * Tests {@link Session#close()} for a session with the given acknowledge mode to ensure
     * that it blocks until the failover implementation restores the connection.
     * <p>
     * A delaying failover policy is installed first so that failover is still in progress
     * when the session is closed.
     *
     * @param acknowledgeMode session acknowledge mode
     * @throws Exception on any unexpected failure, including interruption while waiting
     */
    private void sessionCloseWhileFailoverImpl(int acknowledgeMode) throws Exception
    {
        initDelayedFailover(acknowledgeMode);

        // intentionally receive message but not commit or acknowledge it in
        // case of transacted or CLIENT_ACK session
        Message receivedMessage = _consumer.receive(1000L);
        assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);

        failDefaultBroker();

        // wait until failover is started; without it the close() below would not be
        // exercised while failover is in progress
        assertTrue("Failover has not started", _failoverStarted.await(5, TimeUnit.SECONDS));

        // test whether session#close blocks while failover is in progress
        _consumerSession.close();

        assertTrue("Failover has not completed yet but session was closed", _failoverComplete.await(5, TimeUnit.SECONDS));

        assertFailoverException();
    }

    /**
     * Prepares a {@link QueueBrowser} on the test queue: initialises the fixture without
     * starting the connection, closes the regular consumer, starts the connection, publishes
     * the test messages and then creates the browser on the consumer session.
     *
     * @param acknowledgeMode session acknowledge mode
     * @return queue browser for the test queue
     * @throws JMSException  if a JMS operation fails
     * @throws QpidException if {@code sync()} on the producer session fails
     */
    private QueueBrowser prepareQueueBrowser(int acknowledgeMode) throws JMSException, QpidException
    {
        init(acknowledgeMode, false);
        _consumer.close();
        _connection.start();

        produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);

        final boolean transacted = acknowledgeMode == Session.SESSION_TRANSACTED;
        if (!transacted)
        {
            // non-transacted session: call sync() on the session instead of committing
            ((AMQSession<?, ?>) _producerSession).sync();
        }
        else
        {
            _producerSession.commit();
        }

        return _consumerSession.createBrowser((Queue) _destination);
    }

    /**
     * Tests {@link QueueBrowser#close()} for a session with the given acknowledge mode to
     * ensure that it blocks until the failover implementation restores the connection.
     *
     * @param acknowledgeMode session acknowledge mode
     * @throws Exception on any unexpected failure, including interruption while waiting
     */
    private void browserCloseWhileFailoverImpl(int acknowledgeMode) throws Exception
    {
        QueueBrowser browser = prepareQueueBrowser(acknowledgeMode);

        // QueueBrowser#getEnumeration() returns a raw Enumeration
        @SuppressWarnings("unchecked")
        Enumeration<Message> messages = browser.getEnumeration();
        Message receivedMessage = messages.nextElement();
        assertReceivedMessage(receivedMessage, TEST_MESSAGE_FORMAT, 0);

        failDefaultBroker();

        // wait until failover is started; without it the close() below would not be
        // exercised while failover is in progress
        assertTrue("Failover has not started", _failoverStarted.await(5, TimeUnit.SECONDS));

        browser.close();

        assertTrue("Failover has not completed yet but browser was closed", _failoverComplete.await(5, TimeUnit.SECONDS));

        assertFailoverException();
    }

    /**
     * Installs the delaying failover policy, initialises the fixture with a started
     * connection and publishes the test messages, committing them when the session is
     * transacted.
     *
     * @param acknowledgeMode session acknowledge mode
     * @return the installed failover policy
     * @throws JMSException if a JMS operation fails
     */
    private DelayingFailoverPolicy initDelayedFailover(int acknowledgeMode) throws JMSException
    {
        final DelayingFailoverPolicy delayingPolicy = setDelayedFailoverPolicy();

        init(acknowledgeMode, true);
        produceMessages(TEST_MESSAGE_FORMAT, _messageNumber, false);

        final boolean transacted = Session.SESSION_TRANSACTED == acknowledgeMode;
        if (transacted)
        {
            _producerSession.commit();
        }

        return delayingPolicy;
    }

    /**
     * Installs a delaying failover policy with the default delay value of 2.
     *
     * @return the installed failover policy
     */
    private DelayingFailoverPolicy setDelayedFailoverPolicy()
    {
        final long defaultDelay = 2;
        return setDelayedFailoverPolicy(defaultDelay);
    }

    /**
     * Creates a {@code DelayingFailoverPolicy} for the fixture connection and installs it on
     * that connection.
     *
     * @param delay delay value handed to the policy
     * @return the installed failover policy
     */
    private DelayingFailoverPolicy setDelayedFailoverPolicy(long delay)
    {
        final AMQConnection amqConnection = (AMQConnection) _connection;
        final DelayingFailoverPolicy delayingPolicy = new DelayingFailoverPolicy(amqConnection, delay);
        amqConnection.setFailoverPolicy(delayingPolicy);
        return delayingPolicy;
    }
}
