| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.jms.integration; |
| |
| import static org.apache.qpid.jms.provider.amqp.AmqpSupport.NETWORK_HOST; |
| import static org.apache.qpid.jms.provider.amqp.AmqpSupport.OPEN_HOSTNAME; |
| import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PORT; |
| import static org.hamcrest.CoreMatchers.containsString; |
| import static org.hamcrest.Matchers.allOf; |
| import static org.hamcrest.Matchers.arrayContaining; |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.hamcrest.Matchers.hasEntry; |
| import static org.hamcrest.Matchers.notNullValue; |
| import static org.hamcrest.Matchers.nullValue; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertThat; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.UUID; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import javax.jms.BytesMessage; |
| import javax.jms.Connection; |
| import javax.jms.ConnectionFactory; |
| import javax.jms.ExceptionListener; |
| import javax.jms.IllegalStateException; |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageProducer; |
| import javax.jms.Queue; |
| import javax.jms.ResourceAllocationException; |
| import javax.jms.Session; |
| |
| import org.apache.qpid.jms.JmsConnection; |
| import org.apache.qpid.jms.JmsConnectionExtensions; |
| import org.apache.qpid.jms.JmsConnectionFactory; |
| import org.apache.qpid.jms.JmsConnectionRemotelyClosedException; |
| import org.apache.qpid.jms.JmsDefaultConnectionListener; |
| import org.apache.qpid.jms.provider.amqp.AmqpSupport; |
| import org.apache.qpid.jms.provider.exceptions.ProviderConnectionRedirectedException; |
| import org.apache.qpid.jms.test.QpidJmsTestCase; |
| import org.apache.qpid.jms.test.Wait; |
| import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; |
| import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError; |
| import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError; |
| import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher; |
| import org.apache.qpid.jms.util.MetaDataSupport; |
| import org.apache.qpid.jms.util.QpidJMSTestRunner; |
| import org.apache.qpid.jms.util.Repeat; |
| import org.apache.qpid.proton.amqp.Binary; |
| import org.apache.qpid.proton.amqp.Symbol; |
| import org.apache.qpid.proton.amqp.UnsignedInteger; |
| import org.apache.qpid.proton.amqp.transaction.TxnCapability; |
| import org.apache.qpid.proton.engine.impl.AmqpHeader; |
| import org.hamcrest.Matcher; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| |
| @RunWith(QpidJMSTestRunner.class) |
| public class ConnectionIntegrationTest extends QpidJmsTestCase { |
| private final IntegrationTestFixture testFixture = new IntegrationTestFixture(); |
| |
| @Test(timeout = 20000) |
| public void testCreateAndCloseConnection() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| testPeer.expectClose(); |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 10000) |
| public void testCreateConnectionToNonSaslPeer() throws Exception { |
| doConnectionWithUnexpectedHeaderTestImpl(AmqpHeader.HEADER); |
| } |
| |
| @Test(timeout = 10000) |
| public void testCreateConnectionToNonAmqpPeer() throws Exception { |
| byte[] responseHeader = new byte[] { 'N', 'O', 'T', '-', 'A', 'M', 'Q', 'P' }; |
| doConnectionWithUnexpectedHeaderTestImpl(responseHeader); |
| } |
| |
| private void doConnectionWithUnexpectedHeaderTestImpl(byte[] responseHeader) throws Exception, IOException { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| |
| testPeer.expectHeader(AmqpHeader.SASL_HEADER, responseHeader); |
| |
| ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort()); |
| try { |
| factory.createConnection("guest", "guest"); |
| fail("Expected connection creation to fail"); |
| } catch (JMSException jmse) { |
| assertThat(jmse.getMessage(), containsString("SASL header mismatch")); |
| } |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testCloseConnectionTimesOut() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); |
| connection.setCloseTimeout(500); |
| |
| testPeer.expectClose(false); |
| |
| connection.start(); |
| assertNotNull("Connection should not be null", connection); |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testCloseConnectionCompletesWhenConnectionDropsBeforeResponse() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); |
| |
| testPeer.expectClose(false); |
| testPeer.dropAfterLastHandler(); |
| |
| connection.start(); |
| assertNotNull("Connection should not be null", connection); |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testCreateConnectionWithClientId() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| Connection connection = testFixture.establishConnecton(testPeer, false, null, null, null, true); |
| testPeer.expectClose(); |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testCreateAutoAckSession() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| testPeer.expectBegin(); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| assertNotNull("Session should not be null", session); |
| testPeer.expectClose(); |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testCreateAutoAckSessionByDefault() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| testPeer.expectBegin(); |
| Session session = connection.createSession(); |
| assertNotNull("Session should not be null", session); |
| testPeer.expectClose(); |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testCreateAutoAckSessionUsingAckModeOnlyMethod() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| testPeer.expectBegin(); |
| Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); |
| assertNotNull("Session should not be null", session); |
| testPeer.expectClose(); |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testCreateTransactedSession() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| |
| testPeer.expectBegin(); |
| // Expect the session, with an immediate link to the transaction coordinator |
| // using a target with the expected capabilities only. |
| CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); |
| txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN)); |
| testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); |
| |
| // First expect an unsettled 'declare' transfer to the txn coordinator, and |
| // reply with a declared disposition state containing the txnId. |
| Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); |
| testPeer.expectDeclare(txnId); |
| testPeer.expectDischarge(txnId, true); |
| testPeer.expectClose(); |
| |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| assertNotNull("Session should not be null", session); |
| |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testCreateTransactedSessionUsingAckModeOnlyMethod() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| |
| testPeer.expectBegin(); |
| // Expect the session, with an immediate link to the transaction coordinator |
| // using a target with the expected capabilities only. |
| CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); |
| txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN)); |
| testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); |
| |
| // First expect an unsettled 'declare' transfer to the txn coordinator, and |
| // reply with a declared disposition state containing the txnId. |
| Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); |
| testPeer.expectDeclare(txnId); |
| testPeer.expectDischarge(txnId, true); |
| testPeer.expectClose(); |
| |
| Session session = connection.createSession(Session.SESSION_TRANSACTED); |
| assertNotNull("Session should not be null", session); |
| |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testCreateTransactedSessionFailsWhenNoDetachResponseSent() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| ((JmsConnection) connection).setRequestTimeout(500); |
| |
| testPeer.expectBegin(); |
| // Expect the session, with an immediate link to the transaction coordinator |
| // using a target with the expected capabilities only. |
| CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); |
| txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN)); |
| testPeer.expectSenderAttach(notNullValue(), txCoordinatorMatcher, true, true, false, 0, null, null); |
| testPeer.expectDetach(true, false, false); |
| // Expect the AMQP session to be closed due to the JMS session creation failure. |
| testPeer.expectEnd(); |
| |
| try { |
| connection.createSession(true, Session.SESSION_TRANSACTED); |
| fail("Session create should have failed."); |
| } catch (JMSException ex) { |
| // Expected |
| } |
| |
| testPeer.expectClose(); |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testRemotelyCloseConnectionDuringSessionCreation() throws Exception { |
| final String BREAD_CRUMB = "ErrorMessageBreadCrumb"; |
| |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| |
| // Expect the begin, then explicitly close the connection with an error |
| testPeer.expectBegin(notNullValue(), false); |
| testPeer.remotelyCloseConnection(true, AmqpError.NOT_ALLOWED, BREAD_CRUMB); |
| |
| try { |
| connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| fail("Expected exception to be thrown"); |
| } catch (JMSException jmse) { |
| // Expected |
| assertNotNull("Expected exception to have a message", jmse.getMessage()); |
| assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB)); |
| } |
| |
| testPeer.waitForAllHandlersToComplete(3000); |
| |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testRemotelyDropConnectionDuringSessionCreation() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| |
| // Expect the begin, then drop connection without without a close frame. |
| testPeer.expectBegin(notNullValue(), false); |
| testPeer.dropAfterLastHandler(); |
| |
| try { |
| connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| fail("Expected exception to be thrown"); |
| } catch (JMSException jmse) { |
| // Expected |
| } |
| |
| testPeer.waitForAllHandlersToComplete(3000); |
| |
| connection.close(); |
| } |
| } |
| |
| @Repeat(repetitions = 1) |
| @Test(timeout = 20000) |
| public void testRemotelyDropConnectionDuringSessionCreationTransacted() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| |
| ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?jms.clientID=foo"); |
| Connection connection = factory.createConnection(); |
| |
| CountDownLatch exceptionListenerFired = new CountDownLatch(1); |
| connection.setExceptionListener(ex -> exceptionListenerFired.countDown()); |
| |
| // Expect the begin, then drop connection without without a close frame before the tx-coordinator setup. |
| testPeer.expectBegin(); |
| testPeer.dropAfterLastHandler(); |
| |
| try { |
| connection.createSession(true, Session.SESSION_TRANSACTED); |
| fail("Expected exception to be thrown"); |
| } catch (JMSException jmse) { |
| // Expected |
| } |
| |
| assertTrue("Exception listener did not fire", exceptionListenerFired.await(5, TimeUnit.SECONDS)); |
| |
| testPeer.waitForAllHandlersToComplete(3000); |
| |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testConnectionPropertiesContainExpectedMetaData() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| |
| Matcher<?> connPropsMatcher = allOf(hasEntry(AmqpSupport.PRODUCT, MetaDataSupport.PROVIDER_NAME), |
| hasEntry(AmqpSupport.VERSION, MetaDataSupport.PROVIDER_VERSION), |
| hasEntry(AmqpSupport.PLATFORM, MetaDataSupport.PLATFORM_DETAILS)); |
| |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(connPropsMatcher, null, false); |
| testPeer.expectBegin(); |
| |
| ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?jms.clientID=foo"); |
| Connection connection = factory.createConnection(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| assertNull(testPeer.getThrowable()); |
| |
| testPeer.expectClose(); |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testMaxFrameSizeOptionCommunicatedInOpen() throws Exception { |
| int frameSize = 39215; |
| doMaxFrameSizeOptionTestImpl(frameSize, UnsignedInteger.valueOf(frameSize)); |
| } |
| |
| @Test(timeout = 20000) |
| public void testMaxFrameSizeOptionCommunicatedInOpenDefault() throws Exception { |
| doMaxFrameSizeOptionTestImpl(-1, UnsignedInteger.MAX_VALUE); |
| } |
| |
| private void doMaxFrameSizeOptionTestImpl(int uriOption, UnsignedInteger transmittedValue) throws JMSException, InterruptedException, Exception, IOException { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| testPeer.expectSaslLayerDisabledConnect(equalTo(transmittedValue)); |
| // Each connection creates a session for managing temporary destinations etc |
| testPeer.expectBegin(); |
| |
| String uri = "amqp://localhost:" + testPeer.getServerPort() + "?amqp.saslLayer=false&amqp.maxFrameSize=" + uriOption; |
| ConnectionFactory factory = new JmsConnectionFactory(uri); |
| Connection connection = factory.createConnection(); |
| connection.start(); |
| |
| testPeer.waitForAllHandlersToComplete(3000); |
| assertNull(testPeer.getThrowable()); |
| |
| testPeer.expectClose(); |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testMaxFrameSizeInfluencesOutgoingFrameSize() throws Exception { |
| doMaxFrameSizeInfluencesOutgoingFrameSizeTestImpl(1000, 10001, 11); |
| doMaxFrameSizeInfluencesOutgoingFrameSizeTestImpl(1500, 6001, 5); |
| } |
| |
| private void doMaxFrameSizeInfluencesOutgoingFrameSizeTestImpl(int frameSize, int bytesPayloadSize, int numFrames) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| testPeer.expectSaslLayerDisabledConnect(equalTo(UnsignedInteger.valueOf(frameSize))); |
| // Each connection creates a session for managing temporary destinations etc |
| testPeer.expectBegin(); |
| |
| String uri = "amqp://localhost:" + testPeer.getServerPort() + "?amqp.saslLayer=false&amqp.maxFrameSize=" + frameSize; |
| ConnectionFactory factory = new JmsConnectionFactory(uri); |
| Connection connection = factory.createConnection(); |
| |
| testPeer.expectBegin(); |
| testPeer.expectSenderAttach(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| MessageProducer producer = session.createProducer(queue); |
| |
| // Expect n-1 transfers of maxFrameSize |
| for (int i = 1; i < numFrames; i++) { |
| testPeer.expectTransfer(frameSize); |
| } |
| // Plus one more of unknown size (framing overhead). |
| testPeer.expectTransfer(0); |
| |
| // Send the message |
| byte[] orig = createBytePyload(bytesPayloadSize); |
| BytesMessage message = session.createBytesMessage(); |
| message.writeBytes(orig); |
| |
| producer.send(message); |
| |
| testPeer.expectClose(); |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(3000); |
| } |
| } |
| |
| private static byte[] createBytePyload(int sizeInBytes) { |
| Random rand = new Random(System.currentTimeMillis()); |
| |
| byte[] payload = new byte[sizeInBytes]; |
| for (int i = 0; i < sizeInBytes; i++) { |
| payload[i] = (byte) (64 + 1 + rand.nextInt(9)); |
| } |
| |
| return payload; |
| } |
| |
| @Test(timeout = 20000) |
| public void testAmqpHostnameSetByDefault() throws Exception { |
| doAmqpHostnameTestImpl("localhost", false, equalTo("localhost")); |
| } |
| |
| @Test(timeout = 20000) |
| public void testAmqpHostnameSetByVhostOption() throws Exception { |
| String vhost = "myAmqpHost"; |
| doAmqpHostnameTestImpl(vhost, true, equalTo(vhost)); |
| } |
| |
| @Test(timeout = 20000) |
| public void testAmqpHostnameNotSetWithEmptyVhostOption() throws Exception { |
| doAmqpHostnameTestImpl("", true, nullValue()); |
| } |
| |
| private void doAmqpHostnameTestImpl(String amqpHostname, boolean setHostnameOption, Matcher<?> hostnameMatcher) throws JMSException, InterruptedException, Exception, IOException { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(null, hostnameMatcher, false); |
| // Each connection creates a session for managing temporary destinations etc |
| testPeer.expectBegin(); |
| |
| String uri = "amqp://localhost:" + testPeer.getServerPort(); |
| if(setHostnameOption) { |
| uri += "?amqp.vhost=" + amqpHostname; |
| } |
| |
| ConnectionFactory factory = new JmsConnectionFactory(uri); |
| Connection connection = factory.createConnection(); |
| // Set a clientID to provoke the actual AMQP connection process to occur. |
| connection.setClientID("clientName"); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| assertNull(testPeer.getThrowable()); |
| |
| testPeer.expectClose(); |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testRemotelyEndConnectionListenerInvoked() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| final CountDownLatch done = new CountDownLatch(1); |
| |
| // Don't set a ClientId, so that the underlying AMQP connection isn't established yet |
| Connection connection = testFixture.establishConnecton(testPeer, false, null, null, null, false); |
| |
| // Tell the test peer to close the connection when executing its last handler |
| testPeer.remotelyCloseConnection(true); |
| |
| // Add the exception listener |
| connection.setExceptionListener(new ExceptionListener() { |
| |
| @Override |
| public void onException(JMSException exception) { |
| done.countDown(); |
| } |
| }); |
| |
| // Trigger the underlying AMQP connection |
| connection.start(); |
| |
| assertTrue("Connection should report failure", done.await(5, TimeUnit.SECONDS)); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testRemotelyEndConnectionWithRedirect() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| final CountDownLatch done = new CountDownLatch(1); |
| final AtomicReference<JMSException> asyncError = new AtomicReference<JMSException>(); |
| |
| final String redirectVhost = "vhost"; |
| final String redirectNetworkHost = "localhost"; |
| final int redirectPort = 5677; |
| |
| // Don't set a ClientId, so that the underlying AMQP connection isn't established yet |
| Connection connection = testFixture.establishConnecton(testPeer, false, null, null, null, false); |
| |
| // Tell the test peer to close the connection when executing its last handler |
| Map<Symbol, Object> errorInfo = new HashMap<Symbol, Object>(); |
| errorInfo.put(OPEN_HOSTNAME, redirectVhost); |
| errorInfo.put(NETWORK_HOST, redirectNetworkHost); |
| errorInfo.put(PORT, 5677); |
| |
| testPeer.remotelyCloseConnection(true, ConnectionError.REDIRECT, "Connection redirected", errorInfo); |
| |
| // Add the exception listener |
| connection.setExceptionListener(new ExceptionListener() { |
| |
| @Override |
| public void onException(JMSException exception) { |
| asyncError.set(exception); |
| done.countDown(); |
| } |
| }); |
| |
| // Trigger the underlying AMQP connection |
| connection.start(); |
| |
| assertTrue("Connection should report failure", done.await(5, TimeUnit.SECONDS)); |
| |
| assertTrue(asyncError.get() instanceof JMSException); |
| assertTrue(asyncError.get().getCause() instanceof ProviderConnectionRedirectedException); |
| |
| ProviderConnectionRedirectedException redirect = (ProviderConnectionRedirectedException) asyncError.get().getCause(); |
| URI redirectionURI = redirect.getRedirectionURI(); |
| |
| assertNotNull(redirectionURI); |
| assertTrue(redirectVhost, redirectionURI.getQuery().contains("amqp.vhost=" + redirectVhost)); |
| assertEquals(redirectNetworkHost, redirectionURI.getHost()); |
| assertEquals(redirectPort, redirectionURI.getPort()); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testRemotelyEndConnectionWithSessionWithConsumer() throws Exception { |
| final String BREAD_CRUMB = "ErrorMessage"; |
| |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| final Connection connection = testFixture.establishConnecton(testPeer); |
| |
| testPeer.expectBegin(); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| // Create a consumer, then remotely end the connection afterwards. |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlow(); |
| testPeer.remotelyCloseConnection(true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB); |
| |
| Queue queue = session.createQueue("myQueue"); |
| MessageConsumer consumer = session.createConsumer(queue); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| assertTrue("connection never closed.", Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisfied() throws Exception { |
| return !((JmsConnection) connection).isConnected(); |
| } |
| }, 10000, 10)); |
| |
| try { |
| connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| fail("Expected ISE to be thrown due to being closed"); |
| } catch (IllegalStateException jmsise) { |
| String message = jmsise.getCause().getMessage(); |
| assertTrue(message.contains(AmqpError.RESOURCE_LIMIT_EXCEEDED.toString())); |
| assertTrue(message.contains(BREAD_CRUMB)); |
| } |
| |
| // Verify the session is now marked closed |
| try { |
| session.getAcknowledgeMode(); |
| fail("Expected ISE to be thrown due to being closed"); |
| } catch (IllegalStateException jmsise) { |
| String message = jmsise.getCause().getMessage(); |
| assertTrue(message.contains(AmqpError.RESOURCE_LIMIT_EXCEEDED.toString())); |
| assertTrue(message.contains(BREAD_CRUMB)); |
| } |
| |
| // Verify the consumer is now marked closed |
| try { |
| consumer.getMessageListener(); |
| fail("Expected ISE to be thrown due to being closed"); |
| } catch (IllegalStateException jmsise) { |
| String message = jmsise.getCause().getMessage(); |
| assertTrue(message.contains(AmqpError.RESOURCE_LIMIT_EXCEEDED.toString())); |
| assertTrue(message.contains(BREAD_CRUMB)); |
| } |
| |
| // Try closing them explicitly, should effectively no-op in client. |
| // The test peer will throw during close if it sends anything. |
| consumer.close(); |
| session.close(); |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testRemotelyEndConnectionWithSessionWithProducerWithSendWaitingOnCredit() throws Exception { |
| final String BREAD_CRUMB = "ErrorMessageBreadCrumb"; |
| |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| |
| testPeer.expectBegin(); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| // Expect producer creation, don't give it credit. |
| testPeer.expectSenderAttachWithoutGrantingCredit(); |
| |
| // Producer has no credit so the send should block waiting for it. |
| testPeer.remotelyCloseConnection(true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB, 50); |
| |
| Queue queue = session.createQueue("myQueue"); |
| final MessageProducer producer = session.createProducer(queue); |
| |
| Message message = session.createTextMessage("myMessage"); |
| |
| try { |
| producer.send(message); |
| fail("Expected exception to be thrown"); |
| } catch (ResourceAllocationException jmse) { |
| // Expected |
| assertNotNull("Expected exception to have a message", jmse.getMessage()); |
| assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB)); |
| } catch (Throwable t) { |
| fail("Caught unexpected exception: " + t); |
| } |
| |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(3000); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testRemotelyEndConnectionWithSessionWithProducerWithSendWaitingOnOutcome() throws Exception { |
| final String BREAD_CRUMB = "ErrorMessageBreadCrumb"; |
| |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| |
| testPeer.expectBegin(); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| // Expect producer creation, and a message to be sent, but don't return a disposition. |
| // Instead, close the connection. |
| testPeer.expectSenderAttach(); |
| testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher()); |
| testPeer.remotelyCloseConnection(true, ConnectionError.CONNECTION_FORCED, BREAD_CRUMB, 50); |
| |
| Queue queue = session.createQueue("myQueue"); |
| final MessageProducer producer = session.createProducer(queue); |
| |
| Message message = session.createTextMessage("myMessage"); |
| |
| try { |
| producer.send(message); |
| fail("Expected exception to be thrown"); |
| } catch (JmsConnectionRemotelyClosedException jmse) { |
| // Expected |
| assertNotNull("Expected exception to have a message", jmse.getMessage()); |
| assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB)); |
| } catch (Throwable t) { |
| fail("Caught unexpected exception: " + t); |
| } |
| |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(3000); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testCreateConnectionWithServerSendingPreemptiveData() throws Exception { |
| boolean sendServerSaslHeaderPreEmptively = true; |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(null, false, sendServerSaslHeaderPreEmptively);) { |
| // Don't use test fixture, handle the connection directly to control sasl behaviour |
| testPeer.expectSaslAnonymousWithPreEmptiveServerHeader(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| |
| JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort()); |
| Connection connection = factory.createConnection(); |
| connection.start(); |
| |
| testPeer.expectClose(); |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testDontAwaitClientIDBeforeOpen() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| |
| String uri = "amqp://localhost:" + testPeer.getServerPort() + "?jms.awaitClientID=false"; |
| |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| |
| ConnectionFactory factory = new JmsConnectionFactory(uri); |
| Connection connection = factory.createConnection(); |
| |
| // Verify that all handlers complete, i.e. the awaitClientID=false option |
| // setting was effective in provoking the AMQP Open immediately even |
| // though it has no ClientID and we haven't used the Connection. |
| testPeer.waitForAllHandlersToComplete(3000); |
| |
| testPeer.expectClose(); |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testWaitForClientIDDoesNotOpenUntilPromptedWithSetClientID() throws Exception { |
| doTestWaitForClientIDDoesNotOpenUntilPrompted(true); |
| } |
| |
| @Test(timeout = 20000) |
| public void testWaitForClientIDDoesNotOpenUntilPromptedWithStart() throws Exception { |
| doTestWaitForClientIDDoesNotOpenUntilPrompted(false); |
| } |
| |
| private void doTestWaitForClientIDDoesNotOpenUntilPrompted(boolean setClientID) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| |
| String uri = "amqp://localhost:" + testPeer.getServerPort() + "?jms.awaitClientID=true"; |
| |
| testPeer.expectSaslAnonymous(); |
| |
| ConnectionFactory factory = new JmsConnectionFactory(uri); |
| Connection connection = factory.createConnection(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| |
| if (setClientID) { |
| connection.setClientID("client-id"); |
| } else { |
| connection.start(); |
| } |
| |
| testPeer.expectClose(); |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(2000); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testUseDaemonThreadURIOption() throws Exception { |
| doUseDaemonThreadTestImpl(null); |
| doUseDaemonThreadTestImpl(false); |
| doUseDaemonThreadTestImpl(true); |
| } |
| |
| /** |
|  * Connects to a fresh test peer and records, from inside the connection-established callback, |
|  * whether the calling thread is a daemon thread; the recorded value is then compared with the |
|  * configured option once the connection has been closed. |
|  * |
|  * @param useDaemonThread value for the {@code jms.useDaemonThread} URI option, or {@code null} |
|  *                        to omit the option (in which case a non-daemon thread is expected) |
|  * @throws Exception if the peer or the connection reports a failure |
|  */ |
| private void doUseDaemonThreadTestImpl(Boolean useDaemonThread) throws Exception { |
|     try (TestAmqpPeer peer = new TestAmqpPeer()) { |
|         final String baseURI = "amqp://localhost:" + peer.getServerPort(); |
|         final String connectURI = |
|             useDaemonThread == null ? baseURI : baseURI + "?jms.useDaemonThread=" + useDaemonThread; |
| |
|         peer.expectSaslAnonymous(); |
|         peer.expectOpen(); |
|         peer.expectBegin(); |
| |
|         final AtomicBoolean callbackOnDaemon = new AtomicBoolean(false); |
|         final CountDownLatch established = new CountDownLatch(1); |
| |
|         ConnectionFactory factory = new JmsConnectionFactory(connectURI); |
|         JmsConnection connection = (JmsConnection) factory.createConnection(); |
| |
|         connection.addConnectionListener(new JmsDefaultConnectionListener() { |
|             @Override |
|             public void onConnectionEstablished(URI remoteURI) { |
|                 // Capture the daemon status of the thread delivering this callback, then release the waiter. |
|                 callbackOnDaemon.set(Thread.currentThread().isDaemon()); |
|                 established.countDown(); |
|             } |
|         }); |
| |
|         connection.start(); |
| |
|         assertTrue("Connection established callback didn't trigger", established.await(5, TimeUnit.SECONDS)); |
| |
|         peer.expectClose(); |
|         connection.close(); |
| |
|         peer.waitForAllHandlersToComplete(2000); |
| |
|         if (useDaemonThread != null) { |
|             // An explicit option value must be reflected by the callback thread. |
|             assertEquals(useDaemonThread, callbackOnDaemon.get()); |
|         } else { |
|             // With the option left off the URI, a non-daemon thread is expected. |
|             assertFalse(callbackOnDaemon.get()); |
|         } |
|     } |
| } |
| |
| @Test(timeout = 20000) |
| public void testConnectionWithPreemptiveServerOpen() throws Exception { |
| |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| |
| // Ensure the Connection awaits a ClientID being set or not, giving time for the preemptive server Open |
| String uri = "amqp://localhost:" + testPeer.getServerPort() + "?jms.awaitClientID=true"; |
| |
| testPeer.expectSaslAnonymousWithServerAmqpHeaderSentPreemptively(); |
| testPeer.sendPreemptiveServerAmqpHeader(); |
| testPeer.sendPreemptiveServerOpenFrame(); |
| // Then expect the clients header to arrive, but defer responding since the servers was already sent. |
| testPeer.expectHeader(AmqpHeader.HEADER, null); |
| |
| ConnectionFactory factory = new JmsConnectionFactory(uri); |
| Connection connection = factory.createConnection(); |
| |
| // Then expect the clients Open frame to arrive, but defer responding since the servers was already sent |
| // before the clients AMQP connection open is provoked. |
| testPeer.expectOpen(null, null, true); |
| testPeer.expectBegin(); |
| |
| Thread.sleep(10); // Gives a little more time for the preemptive Open to actually arrive. |
| |
| // Use the connection to provoke the Open |
| connection.setClientID("client-id"); |
| |
| testPeer.expectClose(); |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(2000); |
| } |
| } |
| |
| /** |
|  * Registers an {@code AMQP_OPEN_PROPERTIES} extension that contributes two String-valued |
|  * properties and checks that the Open frame seen by the peer carries both of them together |
|  * with the client's own product, version and platform entries. |
|  */ |
| @Test(timeout = 20000) |
| public void testConnectionPropertiesExtensionAddedValues() throws Exception { |
|     try (TestAmqpPeer testPeer = new TestAmqpPeer()) { |
|         final String property1 = "property1"; |
|         final String property2 = "property2"; |
| |
|         final String value1 = UUID.randomUUID().toString(); |
|         final String value2 = UUID.randomUUID().toString(); |
| |
|         // The Open must contain the extension-supplied entries and the standard client entries. |
|         Matcher<?> connPropsMatcher = allOf( |
|             hasEntry(Symbol.valueOf(property1), value1), |
|             hasEntry(Symbol.valueOf(property2), value2), |
|             hasEntry(AmqpSupport.PRODUCT, MetaDataSupport.PROVIDER_NAME), |
|             hasEntry(AmqpSupport.VERSION, MetaDataSupport.PROVIDER_VERSION), |
|             hasEntry(AmqpSupport.PLATFORM, MetaDataSupport.PLATFORM_DETAILS)); |
| |
|         testPeer.expectSaslAnonymous(); |
|         testPeer.expectOpen(connPropsMatcher, null, false); |
|         testPeer.expectBegin(); |
| |
|         final URI remoteURI = new URI("amqp://localhost:" + testPeer.getServerPort()); |
| |
|         JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI); |
| |
|         factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (connection, uri) -> { |
|             Map<String, Object> properties = new HashMap<>(); |
| |
|             properties.put(property1, value1); |
|             properties.put(property2, value2); |
| |
|             return properties; |
|         }); |
| |
|         Connection connection = factory.createConnection(); |
|         connection.start(); |
| |
|         testPeer.waitForAllHandlersToComplete(1000); |
|         assertNull(testPeer.getThrowable()); |
| |
|         testPeer.expectClose(); |
|         connection.close(); |
| |
|         // Wait for the scripted Close as well, so that expectation is actually verified |
|         // (matches the other connection tests in this class). |
|         testPeer.waitForAllHandlersToComplete(1000); |
|     } |
| } |
| |
| /** |
|  * Registers an {@code AMQP_OPEN_PROPERTIES} extension that contributes two {@link UUID}-valued |
|  * properties and checks that the Open frame seen by the peer carries both of them unchanged. |
|  */ |
| @Test(timeout = 20000) |
| public void testConnectionPropertiesExtensionAddedValuesOfNonString() throws Exception { |
|     try (TestAmqpPeer testPeer = new TestAmqpPeer()) { |
|         final String property1 = "property1"; |
|         final String property2 = "property2"; |
| |
|         final UUID value1 = UUID.randomUUID(); |
|         final UUID value2 = UUID.randomUUID(); |
| |
|         // The values are matched as UUID objects, not as their String form. |
|         Matcher<?> connPropsMatcher = allOf( |
|             hasEntry(Symbol.valueOf(property1), value1), |
|             hasEntry(Symbol.valueOf(property2), value2)); |
| |
|         testPeer.expectSaslAnonymous(); |
|         testPeer.expectOpen(connPropsMatcher, null, false); |
|         testPeer.expectBegin(); |
| |
|         final URI remoteURI = new URI("amqp://localhost:" + testPeer.getServerPort()); |
| |
|         JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI); |
| |
|         factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (connection, uri) -> { |
|             Map<String, Object> properties = new HashMap<>(); |
| |
|             properties.put(property1, value1); |
|             properties.put(property2, value2); |
| |
|             return properties; |
|         }); |
| |
|         Connection connection = factory.createConnection(); |
|         connection.start(); |
| |
|         testPeer.waitForAllHandlersToComplete(1000); |
| |
|         testPeer.expectClose(); |
|         connection.close(); |
| |
|         // Wait for the scripted Close as well, so that expectation is actually verified |
|         // (matches the other connection tests in this class). |
|         testPeer.waitForAllHandlersToComplete(1000); |
|     } |
| } |
| |
| @Test(timeout = 20000) |
| public void testConnectionPropertiesExtensionProtectsClientProperties() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| |
| Matcher<?> connPropsMatcher = allOf( |
| hasEntry(AmqpSupport.PRODUCT, MetaDataSupport.PROVIDER_NAME), |
| hasEntry(AmqpSupport.VERSION, MetaDataSupport.PROVIDER_VERSION), |
| hasEntry(AmqpSupport.PLATFORM, MetaDataSupport.PLATFORM_DETAILS)); |
| |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(connPropsMatcher, null, false); |
| testPeer.expectBegin(); |
| |
| final URI remoteURI = new URI("amqp://localhost:" + testPeer.getServerPort()); |
| |
| JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI); |
| |
| factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (connection, uri) -> { |
| Map<String, Object> properties = new HashMap<>(); |
| |
| properties.put(AmqpSupport.PRODUCT.toString(), "Super-Duper-Qpid-JMS"); |
| properties.put(AmqpSupport.VERSION.toString(), "5.0.32.Final"); |
| properties.put(AmqpSupport.PLATFORM.toString(), "Commodore 64"); |
| |
| return properties; |
| }); |
| |
| Connection connection = factory.createConnection(); |
| connection.start(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| testPeer.expectClose(); |
| connection.close(); |
| } |
| } |
| |
| /** |
|  * Registers an {@code AMQP_OPEN_PROPERTIES} extension whose map contains the connection factory |
|  * itself as a value and checks that {@code start()} then fails with a {@code JMSException}. |
|  * The peer is scripted for the SASL exchange only. |
|  */ |
| @Test(timeout = 20000) |
| public void testConnectionFailsWhenUserSuppliesIllegalProperties() throws Exception { |
|     try (TestAmqpPeer peer = new TestAmqpPeer()) { |
| |
|         peer.expectSaslAnonymous(); |
| |
|         final URI peerURI = new URI("amqp://localhost:" + peer.getServerPort()); |
|         JmsConnectionFactory factory = new JmsConnectionFactory(peerURI); |
| |
|         factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (conn, target) -> { |
|             Map<String, Object> illegalProperties = new HashMap<>(); |
|             // The factory object stands in for a value of an illegal property type. |
|             illegalProperties.put("not-amqp-encodable", factory); |
|             return illegalProperties; |
|         }); |
| |
|         Connection connection = factory.createConnection(); |
| |
|         try { |
|             connection.start(); |
|             fail("Should not be able to connect when illegal types are in the properties"); |
|         } catch (JMSException expected) { |
|             // Expected outcome: the start attempt is rejected, nothing further to check here. |
|         } |
| |
|         peer.waitForAllHandlersToComplete(1000); |
|     } |
| } |
| |
| /** |
|  * Connects with the {@code transport.localPort} URI option and compares the configured value |
|  * with the port reported by the socket the peer holds for the client. Currently ignored because |
|  * the local port is hard coded. |
|  */ |
| @Ignore("Disabled due to requirement of hard coded port") |
| @Test(timeout = 20000) |
| public void testLocalPortOption() throws Exception { |
|     try (TestAmqpPeer peer = new TestAmqpPeer()) { |
|         peer.expectSaslAnonymous(); |
|         peer.expectOpen(); |
|         // A session is created by every connection (used for managing temporary destinations etc). |
|         peer.expectBegin(); |
| |
|         final int expectedLocalPort = 5671; |
|         final String remoteURI = |
|             "amqp://localhost:" + peer.getServerPort() + "?transport.localPort=" + expectedLocalPort; |
| |
|         Connection connection = new JmsConnectionFactory(remoteURI).createConnection(); |
|         connection.start(); |
| |
|         peer.waitForAllHandlersToComplete(2000); |
| |
|         peer.expectClose(); |
|         connection.close(); |
| |
|         // The port seen on the peer's client socket should be the one requested in the URI. |
|         assertEquals(expectedLocalPort, peer.getClientSocket().getPort()); |
|     } |
| } |
| } |