/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.jms.integration;

import static org.apache.qpid.jms.provider.amqp.AmqpSupport.NETWORK_HOST;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.OPEN_HOSTNAME;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PORT;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.ResourceAllocationException;
import javax.jms.Session;

import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionExtensions;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsConnectionRemotelyClosedException;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
import org.apache.qpid.jms.provider.amqp.AmqpSupport;
import org.apache.qpid.jms.provider.exceptions.ProviderConnectionRedirectedException;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.Wait;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError;
import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
import org.apache.qpid.jms.util.MetaDataSupport;
import org.apache.qpid.jms.util.QpidJMSTestRunner;
import org.apache.qpid.jms.util.Repeat;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.transaction.TxnCapability;
import org.apache.qpid.proton.engine.impl.AmqpHeader;
import org.hamcrest.Matcher;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(QpidJMSTestRunner.class)
public class ConnectionIntegrationTest extends QpidJmsTestCase {
    // Shared fixture the tests below use to establish a Connection against a TestAmqpPeer.
    private final IntegrationTestFixture testFixture = new IntegrationTestFixture();

    /**
     * Establishes a connection through the test fixture and then closes it,
     * with the peer told to expect the resulting close.
     */
    @Test(timeout = 20000)
    public void testCreateAndCloseConnection() throws Exception {
        try (TestAmqpPeer peer = new TestAmqpPeer()) {
            Connection conn = testFixture.establishConnecton(peer);

            // Arm the peer for the close before the client issues it.
            peer.expectClose();
            conn.close();
        }
    }

    /**
     * Runs the unexpected-header scenario with the peer answering the client's
     * SASL header using the plain AMQP header instead.
     */
    @Test(timeout = 10000)
    public void testCreateConnectionToNonSaslPeer() throws Exception {
        final byte[] nonSaslResponse = AmqpHeader.HEADER;
        doConnectionWithUnexpectedHeaderTestImpl(nonSaslResponse);
    }

    /**
     * Runs the unexpected-header scenario with the peer answering the client's
     * SASL header using eight bytes that are not an AMQP header at all.
     */
    @Test(timeout = 10000)
    public void testCreateConnectionToNonAmqpPeer() throws Exception {
        doConnectionWithUnexpectedHeaderTestImpl(new byte[] { 'N', 'O', 'T', '-', 'A', 'M', 'Q', 'P' });
    }

    /**
     * Has the peer reply to the client's SASL header with the supplied bytes and
     * verifies that connection creation fails reporting a SASL header mismatch.
     *
     * @param responseHeader the bytes the peer sends back after receiving the SASL header
     */
    private void doConnectionWithUnexpectedHeaderTestImpl(byte[] responseHeader) throws Exception, IOException {
        try (TestAmqpPeer peer = new TestAmqpPeer()) {
            peer.expectHeader(AmqpHeader.SASL_HEADER, responseHeader);

            String remoteUri = "amqp://localhost:" + peer.getServerPort();
            ConnectionFactory connectionFactory = new JmsConnectionFactory(remoteUri);

            try {
                connectionFactory.createConnection("guest", "guest");
                fail("Expected connection creation to fail");
            } catch (JMSException expected) {
                // The failure must identify the header mismatch as its reason.
                assertThat(expected.getMessage(), containsString("SASL header mismatch"));
            }
        }
    }

    /**
     * Verifies that closing the connection still returns, using a shortened
     * close timeout, and that the peer sees all of its expected frames.
     */
    @Test(timeout = 20000)
    public void testCloseConnectionTimesOut() throws Exception {
        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
            // Check before first use: asserting after the reference has already
            // been dereferenced could never fail.
            assertNotNull("Connection should not be null", connection);
            connection.setCloseTimeout(500);

            // NOTE(review): the 'false' argument presumably suppresses the peer's
            // Close response so the client has to time out - confirm against TestAmqpPeer.
            testPeer.expectClose(false);

            connection.start();
            connection.close();

            testPeer.waitForAllHandlersToComplete(1000);
        }
    }

    /**
     * Verifies that closing the connection completes when the peer drops the
     * connection after its last handler rather than answering the close.
     */
    @Test(timeout = 20000)
    public void testCloseConnectionCompletesWhenConnectionDropsBeforeResponse() throws Exception {
        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
            // Check before first use: asserting after the reference has already
            // been dereferenced could never fail.
            assertNotNull("Connection should not be null", connection);

            // NOTE(review): the 'false' argument presumably suppresses the peer's
            // Close response - confirm against TestAmqpPeer.
            testPeer.expectClose(false);
            testPeer.dropAfterLastHandler();

            connection.start();
            connection.close();

            testPeer.waitForAllHandlersToComplete(1000);
        }
    }

    /**
     * Establishes a connection through the long-form fixture call with the final
     * flag enabled, then closes it with the peer expecting the close.
     */
    @Test(timeout = 20000)
    public void testCreateConnectionWithClientId() throws Exception {
        try (TestAmqpPeer peer = new TestAmqpPeer()) {
            // NOTE(review): the last argument appears to control whether a ClientID is set - confirm in the fixture.
            final boolean setClientId = true;
            Connection conn = testFixture.establishConnecton(peer, false, null, null, null, setClientId);

            peer.expectClose();
            conn.close();
        }
    }

    /**
     * Creates a non-transacted AUTO_ACKNOWLEDGE session using the two-argument
     * createSession method, with the peer expecting a begin.
     */
    @Test(timeout = 20000)
    public void testCreateAutoAckSession() throws Exception {
        try (TestAmqpPeer peer = new TestAmqpPeer()) {
            Connection conn = testFixture.establishConnecton(peer);

            peer.expectBegin();
            Session created = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            assertNotNull("Session should not be null", created);

            peer.expectClose();
            conn.close();
        }
    }

    /**
     * Creates a session using the no-argument createSession method, with the
     * peer expecting a begin.
     */
    @Test(timeout = 20000)
    public void testCreateAutoAckSessionByDefault() throws Exception {
        try (TestAmqpPeer peer = new TestAmqpPeer()) {
            Connection conn = testFixture.establishConnecton(peer);

            peer.expectBegin();
            Session created = conn.createSession();
            assertNotNull("Session should not be null", created);

            peer.expectClose();
            conn.close();
        }
    }

    /**
     * Creates an AUTO_ACKNOWLEDGE session using the single-argument (ack mode only)
     * createSession method, with the peer expecting a begin.
     */
    @Test(timeout = 20000)
    public void testCreateAutoAckSessionUsingAckModeOnlyMethod() throws Exception {
        try (TestAmqpPeer peer = new TestAmqpPeer()) {
            Connection conn = testFixture.establishConnecton(peer);

            peer.expectBegin();
            Session created = conn.createSession(Session.AUTO_ACKNOWLEDGE);
            assertNotNull("Session should not be null", created);

            peer.expectClose();
            conn.close();
        }
    }

    /**
     * Creates a transacted session via the two-argument createSession method and
     * closes the connection, with the peer expecting the coordinator link attach,
     * a declare, a discharge and finally the close.
     */
    @Test(timeout = 20000)
    public void testCreateTransactedSession() throws Exception {
        try (TestAmqpPeer peer = new TestAmqpPeer()) {
            Connection conn = testFixture.establishConnecton(peer);

            // Session begin, followed straight away by the sender link to the
            // transaction coordinator whose target carries only the expected capabilities.
            peer.expectBegin();
            CoordinatorMatcher coordinator = new CoordinatorMatcher();
            coordinator.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN));
            peer.expectSenderAttach(coordinator, false, false);

            // Next comes the 'declare' to the coordinator, associated with this txnId,
            // then the discharge of that transaction and the connection close.
            Binary transactionId = new Binary(new byte[] { 1, 2, 3, 4 });
            peer.expectDeclare(transactionId);
            peer.expectDischarge(transactionId, true);
            peer.expectClose();

            Session created = conn.createSession(true, Session.SESSION_TRANSACTED);
            assertNotNull("Session should not be null", created);

            conn.close();
        }
    }

    /**
     * Creates a transacted session via the single-argument (ack mode only)
     * createSession method and closes the connection, with the peer expecting the
     * coordinator link attach, a declare, a discharge and finally the close.
     */
    @Test(timeout = 20000)
    public void testCreateTransactedSessionUsingAckModeOnlyMethod() throws Exception {
        try (TestAmqpPeer peer = new TestAmqpPeer()) {
            Connection conn = testFixture.establishConnecton(peer);

            // Session begin, followed straight away by the sender link to the
            // transaction coordinator whose target carries only the expected capabilities.
            peer.expectBegin();
            CoordinatorMatcher coordinator = new CoordinatorMatcher();
            coordinator.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN));
            peer.expectSenderAttach(coordinator, false, false);

            // Next comes the 'declare' to the coordinator, associated with this txnId,
            // then the discharge of that transaction and the connection close.
            Binary transactionId = new Binary(new byte[] { 1, 2, 3, 4 });
            peer.expectDeclare(transactionId);
            peer.expectDischarge(transactionId, true);
            peer.expectClose();

            Session created = conn.createSession(Session.SESSION_TRANSACTED);
            assertNotNull("Session should not be null", created);

            conn.close();
        }
    }

    /**
     * Verifies that creating a transacted session throws a JMSException in the
     * scenario scripted below (coordinator attach, detach, then session end), with
     * a 500ms request timeout set, and that the connection can still be closed.
     */
    @Test(timeout = 20000)
    public void testCreateTransactedSessionFailsWhenNoDetachResponseSent() throws Exception {
        try (TestAmqpPeer peer = new TestAmqpPeer()) {
            Connection conn = testFixture.establishConnecton(peer);
            JmsConnection jmsConn = (JmsConnection) conn;
            jmsConn.setRequestTimeout(500);

            peer.expectBegin();

            // The session is followed immediately by a link to the transaction
            // coordinator, using a target with the expected capabilities only.
            CoordinatorMatcher coordinator = new CoordinatorMatcher();
            coordinator.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN));
            peer.expectSenderAttach(notNullValue(), coordinator, true, true, false, 0, null, null);
            peer.expectDetach(true, false, false);

            // The failed JMS session creation should cause the AMQP session to be ended.
            peer.expectEnd();

            try {
                conn.createSession(true, Session.SESSION_TRANSACTED);
                fail("Session create should have failed.");
            } catch (JMSException expected) {
                // Session creation is supposed to fail here.
            }

            peer.expectClose();
            conn.close();

            peer.waitForAllHandlersToComplete(1000);
        }
    }

    /**
     * Verifies that session creation fails with a JMSException whose message
     * carries the peer-supplied error description when the peer closes the
     * connection with an error after receiving the begin.
     */
    @Test(timeout = 20000)
    public void testRemotelyCloseConnectionDuringSessionCreation() throws Exception {
        final String errorText = "ErrorMessageBreadCrumb";

        try (TestAmqpPeer peer = new TestAmqpPeer()) {
            Connection conn = testFixture.establishConnecton(peer);

            // After the begin arrives, the peer explicitly closes the connection with an error.
            peer.expectBegin(notNullValue(), false);
            peer.remotelyCloseConnection(true, AmqpError.NOT_ALLOWED, errorText);

            try {
                conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                fail("Expected exception to be thrown");
            } catch (JMSException expected) {
                String failureMessage = expected.getMessage();
                assertNotNull("Expected exception to have a message", failureMessage);
                assertTrue("Expected breadcrumb to be present in message", failureMessage.contains(errorText));
            }

            peer.waitForAllHandlersToComplete(3000);

            conn.close();
        }
    }

    /**
     * Verifies that session creation fails with a JMSException when the peer
     * drops the connection after receiving the begin.
     */
    @Test(timeout = 20000)
    public void testRemotelyDropConnectionDuringSessionCreation() throws Exception {
        try (TestAmqpPeer peer = new TestAmqpPeer()) {
            Connection conn = testFixture.establishConnecton(peer);

            // After the begin arrives, the connection is dropped without a close frame.
            peer.expectBegin(notNullValue(), false);
            peer.dropAfterLastHandler();

            try {
                conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                fail("Expected exception to be thrown");
            } catch (JMSException expected) {
                // Session creation is supposed to fail here.
            }

            peer.waitForAllHandlersToComplete(3000);

            conn.close();
        }
    }

    /**
     * Verifies that creating a transacted session fails with a JMSException, and
     * that the connection's ExceptionListener fires, when the peer drops the
     * connection after the session begin and before the tx-coordinator setup.
     */
    @Repeat(repetitions = 1)
    @Test(timeout = 20000)
    public void testRemotelyDropConnectionDuringSessionCreationTransacted() throws Exception {
        try (TestAmqpPeer peer = new TestAmqpPeer()) {
            peer.expectSaslAnonymous();
            peer.expectOpen();
            peer.expectBegin();

            String remoteUri = "amqp://localhost:" + peer.getServerPort() + "?jms.clientID=foo";
            ConnectionFactory connectionFactory = new JmsConnectionFactory(remoteUri);
            Connection conn = connectionFactory.createConnection();

            CountDownLatch listenerNotified = new CountDownLatch(1);
            conn.setExceptionListener(error -> listenerNotified.countDown());

            // After the begin arrives, the connection is dropped without a close
            // frame, before the tx-coordinator setup.
            peer.expectBegin();
            peer.dropAfterLastHandler();

            try {
                conn.createSession(true, Session.SESSION_TRANSACTED);
                fail("Expected exception to be thrown");
            } catch (JMSException expected) {
                // Session creation is supposed to fail here.
            }

            boolean notified = listenerNotified.await(5, TimeUnit.SECONDS);
            assertTrue("Exception listener did not fire", notified);

            peer.waitForAllHandlersToComplete(3000);

            conn.close();
        }
    }

    /**
     * Verifies that the Open sent by the client carries connection properties
     * with the product, version and platform entries taken from MetaDataSupport.
     */
    @Test(timeout = 20000)
    public void testConnectionPropertiesContainExpectedMetaData() throws Exception {
        try (TestAmqpPeer peer = new TestAmqpPeer()) {

            // All three metadata entries must be present in the properties map.
            Matcher<?> expectedProperties = allOf(
                    hasEntry(AmqpSupport.PRODUCT, MetaDataSupport.PROVIDER_NAME),
                    hasEntry(AmqpSupport.VERSION, MetaDataSupport.PROVIDER_VERSION),
                    hasEntry(AmqpSupport.PLATFORM, MetaDataSupport.PLATFORM_DETAILS));

            peer.expectSaslAnonymous();
            peer.expectOpen(expectedProperties, null, false);
            peer.expectBegin();

            String remoteUri = "amqp://localhost:" + peer.getServerPort() + "?jms.clientID=foo";
            ConnectionFactory connectionFactory = new JmsConnectionFactory(remoteUri);
            Connection conn = connectionFactory.createConnection();

            peer.waitForAllHandlersToComplete(1000);
            assertNull(peer.getThrowable());

            peer.expectClose();
            conn.close();
        }
    }

    /**
     * Runs the max-frame-size option scenario with an explicit value, expecting
     * that same value to be the one transmitted.
     */
    @Test(timeout = 20000)
    public void testMaxFrameSizeOptionCommunicatedInOpen() throws Exception {
        final int configuredSize = 39215;
        final UnsignedInteger expectedOnWire = UnsignedInteger.valueOf(configuredSize);

        doMaxFrameSizeOptionTestImpl(configuredSize, expectedOnWire);
    }

    /**
     * Runs the max-frame-size option scenario with a URI option of -1, expecting
     * UnsignedInteger.MAX_VALUE to be the value transmitted.
     */
    @Test(timeout = 20000)
    public void testMaxFrameSizeOptionCommunicatedInOpenDefault() throws Exception {
        final int configuredSize = -1;
        doMaxFrameSizeOptionTestImpl(configuredSize, UnsignedInteger.MAX_VALUE);
    }

    /**
     * Connects with the SASL layer disabled and the given amqp.maxFrameSize URI
     * option, with the peer matching the expected transmitted value on connect.
     *
     * @param uriOption the value appended to the amqp.maxFrameSize URI option
     * @param transmittedValue the value the peer's connect expectation must match
     */
    private void doMaxFrameSizeOptionTestImpl(int uriOption, UnsignedInteger transmittedValue) throws JMSException, InterruptedException, Exception, IOException {
        try (TestAmqpPeer peer = new TestAmqpPeer()) {
            peer.expectSaslLayerDisabledConnect(equalTo(transmittedValue));
            // Every connection creates a session for managing temporary destinations etc.
            peer.expectBegin();

            String remoteUri = "amqp://localhost:" + peer.getServerPort()
                    + "?amqp.saslLayer=false&amqp.maxFrameSize=" + uriOption;
            ConnectionFactory connectionFactory = new JmsConnectionFactory(remoteUri);
            Connection conn = connectionFactory.createConnection();
            conn.start();

            peer.waitForAllHandlersToComplete(3000);
            assertNull(peer.getThrowable());

            peer.expectClose();
            conn.close();
        }
    }

    /**
     * Runs the outgoing-frame-size scenario twice, each time with a different
     * frame size, payload size and expected number of frames.
     */
    @Test(timeout = 20000)
    public void testMaxFrameSizeInfluencesOutgoingFrameSize() throws Exception {
        // Arguments: max frame size, bytes payload size, expected frame count.
        doMaxFrameSizeInfluencesOutgoingFrameSizeTestImpl(1000, 10001, 11);
        doMaxFrameSizeInfluencesOutgoingFrameSizeTestImpl(1500, 6001, 5);
    }

    /**
     * Connects with the given amqp.maxFrameSize and sends one BytesMessage of the
     * given payload size, with the peer expecting the stated number of transfers.
     *
     * @param frameSize the max frame size set via the URI and expected on connect
     * @param bytesPayloadSize the number of bytes written into the message body
     * @param numFrames the total number of transfers the peer is told to expect
     */
    private void doMaxFrameSizeInfluencesOutgoingFrameSizeTestImpl(int frameSize, int bytesPayloadSize, int numFrames) throws Exception {
        try (TestAmqpPeer peer = new TestAmqpPeer()) {
            peer.expectSaslLayerDisabledConnect(equalTo(UnsignedInteger.valueOf(frameSize)));
            // Every connection creates a session for managing temporary destinations etc.
            peer.expectBegin();

            String remoteUri = "amqp://localhost:" + peer.getServerPort()
                    + "?amqp.saslLayer=false&amqp.maxFrameSize=" + frameSize;
            ConnectionFactory connectionFactory = new JmsConnectionFactory(remoteUri);
            Connection conn = connectionFactory.createConnection();

            peer.expectBegin();
            peer.expectSenderAttach();

            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue destination = session.createQueue("myQueue");
            MessageProducer sender = session.createProducer(destination);

            // All but the last transfer are expected at the full max frame size...
            for (int fullFrames = numFrames - 1; fullFrames > 0; fullFrames--) {
                peer.expectTransfer(frameSize);
            }
            // ...and the final one has an unknown size (framing overhead).
            peer.expectTransfer(0);

            // Build and send the message.
            byte[] body = createBytePyload(bytesPayloadSize);
            BytesMessage outgoing = session.createBytesMessage();
            outgoing.writeBytes(body);

            sender.send(outgoing);

            peer.expectClose();
            conn.close();

            peer.waitForAllHandlersToComplete(3000);
        }
    }

    /**
     * Builds a byte array of the requested length filled with pseudo-random
     * values in the range 65..73 inclusive (ASCII 'A' to 'I').
     *
     * @param sizeInBytes the length of the array to create
     * @return the newly created, randomly filled array
     */
    private static byte[] createBytePyload(int sizeInBytes) {
        final Random generator = new Random(System.currentTimeMillis());
        final byte[] bytes = new byte[sizeInBytes];

        int index = 0;
        while (index < bytes.length) {
            // 65 is 'A'; nextInt(9) yields an offset of 0..8.
            bytes[index++] = (byte) (65 + generator.nextInt(9));
        }

        return bytes;
    }

    /**
     * Runs the hostname scenario without the vhost URI option, expecting the
     * Open hostname to equal "localhost".
     */
    @Test(timeout = 20000)
    public void testAmqpHostnameSetByDefault() throws Exception {
        final boolean useVhostOption = false;
        doAmqpHostnameTestImpl("localhost", useVhostOption, equalTo("localhost"));
    }

    /**
     * Runs the hostname scenario with the vhost URI option set, expecting the
     * Open hostname to equal the configured vhost.
     */
    @Test(timeout = 20000)
    public void testAmqpHostnameSetByVhostOption() throws Exception {
        final String configuredVhost = "myAmqpHost";
        final boolean useVhostOption = true;

        doAmqpHostnameTestImpl(configuredVhost, useVhostOption, equalTo(configuredVhost));
    }

    /**
     * Runs the hostname scenario with an empty vhost URI option, expecting the
     * Open hostname to be null.
     */
    @Test(timeout = 20000)
    public void testAmqpHostnameNotSetWithEmptyVhostOption() throws Exception {
        final boolean useVhostOption = true;
        doAmqpHostnameTestImpl("", useVhostOption, nullValue());
    }

    /**
     * Connects to the peer, optionally appending the amqp.vhost URI option, and
     * has the peer check the hostname of the received Open with the given matcher.
     *
     * @param amqpHostname the value used for the amqp.vhost option when it is set
     * @param setHostnameOption whether to append the amqp.vhost option to the URI
     * @param hostnameMatcher the matcher the peer applies to the Open hostname
     */
    private void doAmqpHostnameTestImpl(String amqpHostname, boolean setHostnameOption, Matcher<?> hostnameMatcher) throws JMSException, InterruptedException, Exception, IOException {
        try (TestAmqpPeer peer = new TestAmqpPeer()) {
            peer.expectSaslAnonymous();
            peer.expectOpen(null, hostnameMatcher, false);
            // Every connection creates a session for managing temporary destinations etc.
            peer.expectBegin();

            final String baseUri = "amqp://localhost:" + peer.getServerPort();
            final String remoteUri = setHostnameOption ? baseUri + "?amqp.vhost=" + amqpHostname : baseUri;

            ConnectionFactory connectionFactory = new JmsConnectionFactory(remoteUri);
            Connection conn = connectionFactory.createConnection();
            // Setting a clientID provokes the actual AMQP connection process.
            conn.setClientID("clientName");

            peer.waitForAllHandlersToComplete(1000);
            assertNull(peer.getThrowable());

            peer.expectClose();
            conn.close();
        }
    }

    /**
     * Verifies that the connection's ExceptionListener is invoked when the peer
     * remotely closes the connection.
     */
    @Test(timeout = 20000)
    public void testRemotelyEndConnectionListenerInvoked() throws Exception {
        try (TestAmqpPeer peer = new TestAmqpPeer()) {
            final CountDownLatch listenerNotified = new CountDownLatch(1);

            // No ClientId is set, so the underlying AMQP connection isn't established yet.
            Connection conn = testFixture.establishConnecton(peer, false, null, null, null, false);

            // The peer closes the connection when executing its last handler.
            peer.remotelyCloseConnection(true);

            // Register the listener that records the failure notification.
            conn.setExceptionListener(error -> listenerNotified.countDown());

            // Starting the connection triggers the underlying AMQP connection.
            conn.start();

            boolean notified = listenerNotified.await(5, TimeUnit.SECONDS);
            assertTrue("Connection should report failure", notified);

            peer.waitForAllHandlersToComplete(1000);

            conn.close();
        }
    }

    /**
     * Verifies that a remote close carrying a redirect error reaches the
     * ExceptionListener as a JMSException whose cause is a
     * ProviderConnectionRedirectedException, and that its redirection URI
     * reflects the vhost, network host and port supplied in the error info.
     */
    @Test(timeout = 20000)
    public void testRemotelyEndConnectionWithRedirect() throws Exception {
        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
            final CountDownLatch done = new CountDownLatch(1);
            final AtomicReference<JMSException> asyncError = new AtomicReference<>();

            final String redirectVhost = "vhost";
            final String redirectNetworkHost = "localhost";
            final int redirectPort = 5677;

            // Don't set a ClientId, so that the underlying AMQP connection isn't established yet
            Connection connection = testFixture.establishConnecton(testPeer, false, null, null, null, false);

            // Tell the test peer to close the connection when executing its last handler.
            // The port comes from the same constant that is asserted on below, so the
            // sent and expected values cannot drift apart.
            Map<Symbol, Object> errorInfo = new HashMap<>();
            errorInfo.put(OPEN_HOSTNAME, redirectVhost);
            errorInfo.put(NETWORK_HOST, redirectNetworkHost);
            errorInfo.put(PORT, redirectPort);

            testPeer.remotelyCloseConnection(true, ConnectionError.REDIRECT, "Connection redirected", errorInfo);

            // Add the exception listener
            connection.setExceptionListener(new ExceptionListener() {

                @Override
                public void onException(JMSException exception) {
                    asyncError.set(exception);
                    done.countDown();
                }
            });

            // Trigger the underlying AMQP connection
            connection.start();

            assertTrue("Connection should report failure", done.await(5, TimeUnit.SECONDS));

            // The reference is already typed as JMSException, so only null needs ruling out.
            assertNotNull("Listener should have received an exception", asyncError.get());
            assertTrue(asyncError.get().getCause() instanceof ProviderConnectionRedirectedException);

            ProviderConnectionRedirectedException redirect = (ProviderConnectionRedirectedException) asyncError.get().getCause();
            URI redirectionURI = redirect.getRedirectionURI();

            assertNotNull(redirectionURI);
            assertTrue("Redirection URI should carry the vhost option: " + redirectionURI,
                    redirectionURI.getQuery().contains("amqp.vhost=" + redirectVhost));
            assertEquals(redirectNetworkHost, redirectionURI.getHost());
            assertEquals(redirectPort, redirectionURI.getPort());

            testPeer.waitForAllHandlersToComplete(1000);

            connection.close();
        }
    }

    /**
     * Verifies that after the peer remotely closes the connection with an error,
     * the connection, session and consumer all throw IllegalStateException whose
     * cause message carries the error condition and description, and that
     * explicitly closing them afterwards sends nothing to the peer.
     */
    @Test(timeout = 20000)
    public void testRemotelyEndConnectionWithSessionWithConsumer() throws Exception {
        final String errorText = "ErrorMessage";

        try (TestAmqpPeer peer = new TestAmqpPeer()) {
            final Connection conn = testFixture.establishConnecton(peer);

            peer.expectBegin();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // The consumer is created first; the peer then remotely ends the connection.
            peer.expectReceiverAttach();
            peer.expectLinkFlow();
            peer.remotelyCloseConnection(true, AmqpError.RESOURCE_LIMIT_EXCEEDED, errorText);

            Queue destination = session.createQueue("myQueue");
            MessageConsumer receiver = session.createConsumer(destination);

            peer.waitForAllHandlersToComplete(1000);

            // Poll until the client notices the connection is gone.
            boolean disconnected = Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisfied() throws Exception {
                    return !((JmsConnection) conn).isConnected();
                }
            }, 10000, 10);
            assertTrue("connection never closed.", disconnected);

            // The connection itself must now refuse new work.
            try {
                conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                fail("Expected ISE to be thrown due to being closed");
            } catch (IllegalStateException closedError) {
                String causeMessage = closedError.getCause().getMessage();
                assertTrue(causeMessage.contains(AmqpError.RESOURCE_LIMIT_EXCEEDED.toString()));
                assertTrue(causeMessage.contains(errorText));
            }

            // The session must be marked closed as well.
            try {
                session.getAcknowledgeMode();
                fail("Expected ISE to be thrown due to being closed");
            } catch (IllegalStateException closedError) {
                String causeMessage = closedError.getCause().getMessage();
                assertTrue(causeMessage.contains(AmqpError.RESOURCE_LIMIT_EXCEEDED.toString()));
                assertTrue(causeMessage.contains(errorText));
            }

            // And so must the consumer.
            try {
                receiver.getMessageListener();
                fail("Expected ISE to be thrown due to being closed");
            } catch (IllegalStateException closedError) {
                String causeMessage = closedError.getCause().getMessage();
                assertTrue(causeMessage.contains(AmqpError.RESOURCE_LIMIT_EXCEEDED.toString()));
                assertTrue(causeMessage.contains(errorText));
            }

            // Explicit closes should effectively no-op in the client; the test
            // peer will throw during its own close if the client sends anything.
            receiver.close();
            session.close();
            conn.close();
        }
    }

    /**
     * Verifies that a send blocked waiting for credit fails with a
     * ResourceAllocationException carrying the peer-supplied error description
     * when the peer remotely closes the connection with RESOURCE_LIMIT_EXCEEDED.
     */
    @Test(timeout = 20000)
    public void testRemotelyEndConnectionWithSessionWithProducerWithSendWaitingOnCredit() throws Exception {
        final String BREAD_CRUMB = "ErrorMessageBreadCrumb";

        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
            Connection connection = testFixture.establishConnecton(testPeer);

            testPeer.expectBegin();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Expect producer creation, don't give it credit.
            testPeer.expectSenderAttachWithoutGrantingCredit();

            // Producer has no credit so the send should block waiting for it.
            testPeer.remotelyCloseConnection(true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB, 50);

            Queue queue = session.createQueue("myQueue");
            final MessageProducer producer = session.createProducer(queue);

            Message message = session.createTextMessage("myMessage");

            // Only the expected exception type is caught. Anything else, including
            // the AssertionError raised by fail(), propagates with its own message
            // and stack trace instead of being rewrapped by a catch-all.
            try {
                producer.send(message);
                fail("Expected exception to be thrown");
            } catch (ResourceAllocationException jmse) {
                // Expected
                assertNotNull("Expected exception to have a message", jmse.getMessage());
                assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB));
            }

            connection.close();

            testPeer.waitForAllHandlersToComplete(3000);
        }
    }

    /**
     * Verifies that a send awaiting its outcome fails with a
     * JmsConnectionRemotelyClosedException carrying the peer-supplied error
     * description when the peer remotely closes the connection with
     * CONNECTION_FORCED instead of returning a disposition.
     */
    @Test(timeout = 20000)
    public void testRemotelyEndConnectionWithSessionWithProducerWithSendWaitingOnOutcome() throws Exception {
        final String BREAD_CRUMB = "ErrorMessageBreadCrumb";

        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
            Connection connection = testFixture.establishConnecton(testPeer);

            testPeer.expectBegin();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Expect producer creation, and a message to be sent, but don't return a disposition.
            // Instead, close the connection.
            testPeer.expectSenderAttach();
            testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
            testPeer.remotelyCloseConnection(true, ConnectionError.CONNECTION_FORCED, BREAD_CRUMB, 50);

            Queue queue = session.createQueue("myQueue");
            final MessageProducer producer = session.createProducer(queue);

            Message message = session.createTextMessage("myMessage");

            // Only the expected exception type is caught. Anything else, including
            // the AssertionError raised by fail(), propagates with its own message
            // and stack trace instead of being rewrapped by a catch-all.
            try {
                producer.send(message);
                fail("Expected exception to be thrown");
            } catch (JmsConnectionRemotelyClosedException jmse) {
                // Expected
                assertNotNull("Expected exception to have a message", jmse.getMessage());
                assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB));
            }

            connection.close();

            testPeer.waitForAllHandlersToComplete(3000);
        }
    }

    /**
     * Connects to a peer constructed with the pre-emptive SASL header flag
     * enabled, then starts and closes the connection.
     */
    @Test(timeout = 20000)
    public void testCreateConnectionWithServerSendingPreemptiveData() throws Exception {
        final boolean preEmptiveSaslHeader = true;

        try (TestAmqpPeer peer = new TestAmqpPeer(null, false, preEmptiveSaslHeader)) {
            // The fixture is bypassed so the SASL behaviour can be controlled directly.
            peer.expectSaslAnonymousWithPreEmptiveServerHeader();
            peer.expectOpen();
            peer.expectBegin();

            String remoteUri = "amqp://localhost:" + peer.getServerPort();
            JmsConnectionFactory connectionFactory = new JmsConnectionFactory(remoteUri);
            Connection conn = connectionFactory.createConnection();
            conn.start();

            peer.expectClose();
            conn.close();
        }
    }

    /**
     * Checks that with the {@code jms.awaitClientID=false} URI option the scripted SASL,
     * Open and Begin expectations are all satisfied by merely creating the connection,
     * without setting a ClientID on it or otherwise using it.
     */
    @Test(timeout = 20000)
    public void testDontAwaitClientIDBeforeOpen() throws Exception {
        try (TestAmqpPeer testPeer = new TestAmqpPeer()) {

            final String remoteURI = "amqp://localhost:" + testPeer.getServerPort() + "?jms.awaitClientID=false";

            testPeer.expectSaslAnonymous();
            testPeer.expectOpen();
            testPeer.expectBegin();

            Connection jmsConnection = new JmsConnectionFactory(remoteURI).createConnection();

            // No ClientID was set and the Connection has not been used, so the handlers
            // completing here shows that awaitClientID=false was effective in provoking
            // the AMQP Open immediately.
            testPeer.waitForAllHandlersToComplete(3000);

            testPeer.expectClose();
            jmsConnection.close();

            testPeer.waitForAllHandlersToComplete(1000);
        }
    }

    /**
     * Runs the await-ClientID scenario with the setClientID flag enabled.
     */
    @Test(timeout = 20000)
    public void testWaitForClientIDDoesNotOpenUntilPromptedWithSetClientID() throws Exception {
        final boolean promptViaSetClientID = true;
        doTestWaitForClientIDDoesNotOpenUntilPrompted(promptViaSetClientID);
    }

    /**
     * Runs the await-ClientID scenario with the setClientID flag disabled.
     */
    @Test(timeout = 20000)
    public void testWaitForClientIDDoesNotOpenUntilPromptedWithStart() throws Exception {
        final boolean promptViaSetClientID = false;
        doTestWaitForClientIDDoesNotOpenUntilPrompted(promptViaSetClientID);
    }

    /**
     * Creates a connection using the {@code jms.awaitClientID=true} URI option while the
     * peer has only the SASL exchange scripted, then scripts the Open and Begin
     * expectations and prompts the connection.
     *
     * @param setClientID
     *        when true the connection is prompted by calling setClientID on it,
     *        otherwise by calling start
     */
    private void doTestWaitForClientIDDoesNotOpenUntilPrompted(boolean setClientID) throws Exception {
        try (TestAmqpPeer testPeer = new TestAmqpPeer()) {

            final String remoteURI = "amqp://localhost:" + testPeer.getServerPort() + "?jms.awaitClientID=true";

            // Only the SASL exchange is scripted before the connection is created.
            testPeer.expectSaslAnonymous();

            Connection jmsConnection = new JmsConnectionFactory(remoteURI).createConnection();

            testPeer.waitForAllHandlersToComplete(1000);

            // The Open and Begin expectations are added only now, ahead of the prompt.
            testPeer.expectOpen();
            testPeer.expectBegin();

            if (!setClientID) {
                jmsConnection.start();
            } else {
                jmsConnection.setClientID("client-id");
            }

            testPeer.expectClose();
            jmsConnection.close();

            testPeer.waitForAllHandlersToComplete(2000);
        }
    }

    /**
     * Exercises the daemon thread check with the URI option absent, then set to false,
     * then set to true.
     */
    @Test(timeout = 20000)
    public void testUseDaemonThreadURIOption() throws Exception {
        final Boolean[] optionValues = { null, Boolean.FALSE, Boolean.TRUE };
        for (Boolean optionValue : optionValues) {
            doUseDaemonThreadTestImpl(optionValue);
        }
    }

    /**
     * Connects to a test peer, optionally adding the {@code jms.useDaemonThread} URI option,
     * and checks the daemon status of the thread on which the
     * {@code onConnectionEstablished} listener callback runs.
     *
     * @param useDaemonThread
     *        value to give the URI option, or null to leave the option off the URI
     */
    private void doUseDaemonThreadTestImpl(Boolean useDaemonThread) throws Exception {
        try (TestAmqpPeer testPeer = new TestAmqpPeer()) {

            final String options = (useDaemonThread == null) ? "" : "?jms.useDaemonThread=" + useDaemonThread;
            final String peerURI = "amqp://localhost:" + testPeer.getServerPort() + options;

            testPeer.expectSaslAnonymous();
            testPeer.expectOpen();
            testPeer.expectBegin();

            final AtomicBoolean callbackOnDaemonThread = new AtomicBoolean(false);
            final CountDownLatch established = new CountDownLatch(1);

            JmsConnection connection = (JmsConnection) new JmsConnectionFactory(peerURI).createConnection();

            connection.addConnectionListener(new JmsDefaultConnectionListener() {
                @Override
                public void onConnectionEstablished(URI remoteURI) {
                    // Capture the daemon status of the calling thread, then release the test thread
                    callbackOnDaemonThread.set(Thread.currentThread().isDaemon());
                    established.countDown();
                }
            });

            connection.start();

            assertTrue("Connection established callback didn't trigger", established.await(5, TimeUnit.SECONDS));

            testPeer.expectClose();
            connection.close();

            testPeer.waitForAllHandlersToComplete(2000);

            if (useDaemonThread != null) {
                // Configured: the recorded value must match the URI option value
                assertEquals(useDaemonThread, callbackOnDaemonThread.get());
            } else {
                // Not configured: the default is expected to be false
                assertFalse(callbackOnDaemonThread.get());
            }
        }
    }

    /**
     * Exercises connection establishment when the test peer is told to send its AMQP
     * header and Open frame preemptively, before the client's own Open is provoked by
     * calling setClientID on the connection.
     *
     * The statement order in this test is deliberate: the peer is scripted in stages
     * around the connection creation and the ClientID being set.
     */
    @Test(timeout = 20000)
    public void testConnectionWithPreemptiveServerOpen() throws Exception {

        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {

            // Ensure the Connection awaits a ClientID being set or not, giving time for the preemptive server Open
            String uri = "amqp://localhost:" + testPeer.getServerPort() + "?jms.awaitClientID=true";

            // Script the SASL exchange, then queue the server's header and Open to go out preemptively.
            testPeer.expectSaslAnonymousWithServerAmqpHeaderSentPreemptively();
            testPeer.sendPreemptiveServerAmqpHeader();
            testPeer.sendPreemptiveServerOpenFrame();
            // Then expect the client's header to arrive, but defer responding since the server's was already sent.
            testPeer.expectHeader(AmqpHeader.HEADER, null);

            ConnectionFactory factory = new JmsConnectionFactory(uri);
            Connection connection = factory.createConnection();

            // Then expect the client's Open frame to arrive, but defer responding since the server's was already sent
            // before the client's AMQP connection open is provoked.
            // NOTE(review): the trailing 'true' argument presumably selects the deferred response - confirm against TestAmqpPeer.
            testPeer.expectOpen(null, null, true);
            testPeer.expectBegin();

            Thread.sleep(10); // Gives a little more time for the preemptive Open to actually arrive.

            // Use the connection to provoke the Open
            connection.setClientID("client-id");

            testPeer.expectClose();
            connection.close();

            testPeer.waitForAllHandlersToComplete(2000);
        }
    }

    /**
     * Installs an AMQP_OPEN_PROPERTIES extension that returns two String valued entries
     * and expects the Open to satisfy a matcher requiring both of those entries along
     * with the PRODUCT, VERSION and PLATFORM entries taken from MetaDataSupport.
     */
    @Test(timeout = 20000)
    public void testConnectionPropertiesExtensionAddedValues() throws Exception {
        try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
            final String firstKey = "property1";
            final String secondKey = "property2";

            final String firstValue = UUID.randomUUID().toString();
            final String secondValue = UUID.randomUUID().toString();

            // The user supplied entries and the client's own entries must all be matched.
            Matcher<?> openPropertiesMatcher = allOf(
                    hasEntry(Symbol.valueOf(firstKey), firstValue),
                    hasEntry(Symbol.valueOf(secondKey), secondValue),
                    hasEntry(AmqpSupport.PRODUCT, MetaDataSupport.PROVIDER_NAME),
                    hasEntry(AmqpSupport.VERSION, MetaDataSupport.PROVIDER_VERSION),
                    hasEntry(AmqpSupport.PLATFORM, MetaDataSupport.PLATFORM_DETAILS));

            testPeer.expectSaslAnonymous();
            testPeer.expectOpen(openPropertiesMatcher, null, false);
            testPeer.expectBegin();

            JmsConnectionFactory factory =
                    new JmsConnectionFactory(new URI("amqp://localhost:" + testPeer.getServerPort()));

            factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (conn, peerURI) -> {
                Map<String, Object> extras = new HashMap<>();
                extras.put(firstKey, firstValue);
                extras.put(secondKey, secondValue);
                return extras;
            });

            Connection connection = factory.createConnection();
            connection.start();

            testPeer.waitForAllHandlersToComplete(1000);
            assertNull(testPeer.getThrowable());

            testPeer.expectClose();
            connection.close();
        }
    }

    /**
     * Installs an AMQP_OPEN_PROPERTIES extension that returns two UUID valued entries
     * (rather than Strings) and expects the Open to satisfy a matcher requiring both.
     */
    @Test(timeout = 20000)
    public void testConnectionPropertiesExtensionAddedValuesOfNonString() throws Exception {
        try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
            final String firstKey = "property1";
            final String secondKey = "property2";

            final UUID firstValue = UUID.randomUUID();
            final UUID secondValue = UUID.randomUUID();

            Matcher<?> openPropertiesMatcher = allOf(
                    hasEntry(Symbol.valueOf(firstKey), firstValue),
                    hasEntry(Symbol.valueOf(secondKey), secondValue));

            testPeer.expectSaslAnonymous();
            testPeer.expectOpen(openPropertiesMatcher, null, false);
            testPeer.expectBegin();

            JmsConnectionFactory factory =
                    new JmsConnectionFactory(new URI("amqp://localhost:" + testPeer.getServerPort()));

            factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (conn, peerURI) -> {
                Map<String, Object> extras = new HashMap<>();
                extras.put(firstKey, firstValue);
                extras.put(secondKey, secondValue);
                return extras;
            });

            Connection connection = factory.createConnection();
            connection.start();

            testPeer.waitForAllHandlersToComplete(1000);

            testPeer.expectClose();
            connection.close();
        }
    }

    /**
     * Installs an AMQP_OPEN_PROPERTIES extension that supplies its own values under the
     * PRODUCT, VERSION and PLATFORM keys, while the Open is still expected to satisfy a
     * matcher requiring the MetaDataSupport values for those keys.
     */
    @Test(timeout = 20000)
    public void testConnectionPropertiesExtensionProtectsClientProperties() throws Exception {
        try (TestAmqpPeer testPeer = new TestAmqpPeer()) {

            // Only the client's own values are acceptable for these keys.
            Matcher<?> openPropertiesMatcher = allOf(
                    hasEntry(AmqpSupport.PRODUCT, MetaDataSupport.PROVIDER_NAME),
                    hasEntry(AmqpSupport.VERSION, MetaDataSupport.PROVIDER_VERSION),
                    hasEntry(AmqpSupport.PLATFORM, MetaDataSupport.PLATFORM_DETAILS));

            testPeer.expectSaslAnonymous();
            testPeer.expectOpen(openPropertiesMatcher, null, false);
            testPeer.expectBegin();

            JmsConnectionFactory factory =
                    new JmsConnectionFactory(new URI("amqp://localhost:" + testPeer.getServerPort()));

            factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (conn, peerURI) -> {
                // Attempt to replace each of the client's own entries with a different value.
                Map<String, Object> overrides = new HashMap<>();
                overrides.put(AmqpSupport.PRODUCT.toString(), "Super-Duper-Qpid-JMS");
                overrides.put(AmqpSupport.VERSION.toString(), "5.0.32.Final");
                overrides.put(AmqpSupport.PLATFORM.toString(), "Commodore 64");
                return overrides;
            });

            Connection connection = factory.createConnection();
            connection.start();

            testPeer.waitForAllHandlersToComplete(1000);

            testPeer.expectClose();
            connection.close();
        }
    }

    /**
     * Verifies that starting a connection fails with a {@link JMSException} when the
     * AMQP_OPEN_PROPERTIES extension returns a map whose value (here the connection
     * factory itself, stored under the key "not-amqp-encodable") is not a legal property
     * value.
     *
     * The peer scripts only the SASL exchange; no Open expectation is added.
     */
    @Test(timeout = 20000)
    public void testConnectionFailsWhenUserSuppliesIllegalProperties() throws Exception {
        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {

            testPeer.setSuppressReadExceptionOnClose(true);
            testPeer.expectSaslAnonymous();

            final URI remoteURI = new URI("amqp://localhost:" + testPeer.getServerPort());

            JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);

            factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (connection, uri) -> {
                Map<String, Object> properties = new HashMap<>();

                properties.put("not-amqp-encodable", factory);

                return properties;
            });

            Connection connection = factory.createConnection();

            try {
                connection.start();
                fail("Should not be able to connect when illegal types are in the properties");
            } catch (JMSException expected) {
                // Expected: the start must be rejected with a JMSException, nothing more to check.
            } catch (Exception unexpected) {
                // Keep the original exception as the cause so its stack trace shows up in the test report.
                throw new AssertionError("Caught unexpected error from connection.start() : " + unexpected, unexpected);
            }

            testPeer.waitForAllHandlersToComplete(1000);
        }
    }

    /**
     * Connects using the {@code transport.localPort} URI option and checks that the port
     * reported by the peer's client socket equals the configured value. Currently
     * ignored because it needs a hard coded port.
     */
    @Ignore("Disabled due to requirement of hard coded port")
    @Test(timeout = 20000)
    public void testLocalPortOption() throws Exception {
        try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
            testPeer.expectSaslAnonymous();
            testPeer.expectOpen();
            // The Begin is for the session each connection creates for managing temporary destinations etc
            testPeer.expectBegin();

            final int expectedLocalPort = 5671;
            final String remoteURI =
                    "amqp://localhost:" + testPeer.getServerPort() + "?transport.localPort=" + expectedLocalPort;
            Connection jmsConnection = new JmsConnectionFactory(remoteURI).createConnection();
            jmsConnection.start();

            testPeer.waitForAllHandlersToComplete(2000);

            testPeer.expectClose();
            jmsConnection.close();

            // The port of the client socket as seen by the peer should be the configured local port.
            assertEquals(expectedLocalPort, testPeer.getClientSocket().getPort());
        }
    }
}
