| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.qpid.jms.integration; |
| |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.hamcrest.Matchers.notNullValue; |
| import static org.hamcrest.Matchers.nullValue; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.fail; |
| |
| import java.util.Date; |
| import java.util.Enumeration; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.UUID; |
| |
| import javax.jms.Connection; |
| import javax.jms.Destination; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageProducer; |
| import javax.jms.Queue; |
| import javax.jms.Session; |
| import javax.jms.TemporaryQueue; |
| import javax.jms.TemporaryTopic; |
| import javax.jms.Topic; |
| |
| import org.apache.qpid.jms.JmsClientProperties; |
| import org.apache.qpid.jms.JmsConnection; |
| import org.apache.qpid.jms.provider.amqp.AmqpConnectionProperties; |
| import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper; |
| import org.apache.qpid.jms.provider.amqp.message.AmqpMessageIdHelper; |
| import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport; |
| import org.apache.qpid.jms.test.QpidJmsTestCase; |
| import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.sections.ApplicationPropertiesDescribedType; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType; |
| import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.sections.ApplicationPropertiesSectionMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher; |
| import org.apache.qpid.proton.amqp.Binary; |
| import org.apache.qpid.proton.amqp.DescribedType; |
| import org.apache.qpid.proton.amqp.Symbol; |
| import org.apache.qpid.proton.amqp.UnsignedInteger; |
| import org.apache.qpid.proton.amqp.UnsignedLong; |
| import org.junit.Test; |
| |
| public class MessageIntegrationTest extends QpidJmsTestCase |
| { |
| private static final String NULL_STRING_PROP = "nullStringProperty"; |
| private static final String NULL_STRING_PROP_VALUE = null; |
| private static final String STRING_PROP = "stringProperty"; |
| private static final String STRING_PROP_VALUE = "string"; |
| private static final String BOOLEAN_PROP = "booleanProperty"; |
| private static final boolean BOOLEAN_PROP_VALUE = true; |
| private static final String BYTE_PROP = "byteProperty"; |
| private static final byte BYTE_PROP_VALUE = (byte)1; |
| private static final String SHORT_PROP = "shortProperty"; |
| private static final short SHORT_PROP_VALUE = (short)1; |
| private static final String INT_PROP = "intProperty"; |
| private static final int INT_PROP_VALUE = Integer.MAX_VALUE; |
| private static final String LONG_PROP = "longProperty"; |
| private static final long LONG_PROP_VALUE = Long.MAX_VALUE; |
| private static final String FLOAT_PROP = "floatProperty"; |
| private static final float FLOAT_PROP_VALUE = Float.MAX_VALUE; |
| private static final String DOUBLE_PROP = "doubleProperty"; |
| private static final double DOUBLE_PROP_VALUE = Double.MAX_VALUE; |
| |
| private final IntegrationTestFixture testFixture = new IntegrationTestFixture(); |
| |
| //==== Application Properties Section ==== |
| //======================================== |
| |
| @Test(timeout = 2000) |
| public void testSendMessageWithApplicationProperties() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| testPeer.expectBegin(true); |
| testPeer.expectSenderAttach(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| String queueName = "myQueue"; |
| Queue queue = session.createQueue(queueName); |
| MessageProducer producer = session.createProducer(queue); |
| |
| ApplicationPropertiesSectionMatcher appPropsMatcher = new ApplicationPropertiesSectionMatcher(true); |
| appPropsMatcher.withEntry(NULL_STRING_PROP, nullValue()); |
| appPropsMatcher.withEntry(STRING_PROP, equalTo(STRING_PROP_VALUE)); |
| appPropsMatcher.withEntry(BOOLEAN_PROP, equalTo(BOOLEAN_PROP_VALUE)); |
| appPropsMatcher.withEntry(BYTE_PROP, equalTo(BYTE_PROP_VALUE)); |
| appPropsMatcher.withEntry(SHORT_PROP, equalTo(SHORT_PROP_VALUE)); |
| appPropsMatcher.withEntry(INT_PROP, equalTo(INT_PROP_VALUE)); |
| appPropsMatcher.withEntry(LONG_PROP, equalTo(LONG_PROP_VALUE)); |
| appPropsMatcher.withEntry(FLOAT_PROP, equalTo(FLOAT_PROP_VALUE)); |
| appPropsMatcher.withEntry(DOUBLE_PROP, equalTo(DOUBLE_PROP_VALUE)); |
| |
| MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true)); |
| |
| MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); |
| |
| MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo(queueName)); |
| |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(headersMatcher); |
| messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); |
| messageMatcher.setPropertiesMatcher(propsMatcher); |
| messageMatcher.setApplicationPropertiesMatcher(appPropsMatcher); |
| //TODO: currently we aren't sending any body section, decide if this is allowed |
| //messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null)); |
| testPeer.expectTransfer(messageMatcher); |
| |
| Message message = session.createMessage(); |
| message.setStringProperty(NULL_STRING_PROP, NULL_STRING_PROP_VALUE); |
| message.setStringProperty(STRING_PROP, STRING_PROP_VALUE); |
| message.setBooleanProperty(BOOLEAN_PROP, BOOLEAN_PROP_VALUE); |
| message.setByteProperty(BYTE_PROP, BYTE_PROP_VALUE); |
| message.setShortProperty(SHORT_PROP, SHORT_PROP_VALUE); |
| message.setIntProperty(INT_PROP, INT_PROP_VALUE); |
| message.setLongProperty(LONG_PROP, LONG_PROP_VALUE); |
| message.setFloatProperty(FLOAT_PROP, FLOAT_PROP_VALUE); |
| message.setDoubleProperty(DOUBLE_PROP, DOUBLE_PROP_VALUE); |
| |
| producer.send(message); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test(timeout = 2000) |
| public void testReceiveMessageWithApplicationProperties() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| PropertiesDescribedType props = new PropertiesDescribedType(); |
| props.setMessageId("myMessageIDString"); |
| |
| ApplicationPropertiesDescribedType appProperties = new ApplicationPropertiesDescribedType(); |
| appProperties.setApplicationProperty(STRING_PROP, STRING_PROP_VALUE); |
| appProperties.setApplicationProperty(NULL_STRING_PROP, NULL_STRING_PROP_VALUE); |
| appProperties.setApplicationProperty(BOOLEAN_PROP, BOOLEAN_PROP_VALUE); |
| appProperties.setApplicationProperty(BYTE_PROP, BYTE_PROP_VALUE); |
| appProperties.setApplicationProperty(SHORT_PROP, SHORT_PROP_VALUE); |
| appProperties.setApplicationProperty(INT_PROP, INT_PROP_VALUE); |
| appProperties.setApplicationProperty(LONG_PROP, LONG_PROP_VALUE); |
| appProperties.setApplicationProperty(FLOAT_PROP, FLOAT_PROP_VALUE); |
| appProperties.setApplicationProperty(DOUBLE_PROP, DOUBLE_PROP_VALUE); |
| |
| DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); |
| |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlowRespondWithTransfer(null, null, props, appProperties, amqpValueNullContent); |
| testPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| MessageConsumer messageConsumer = session.createConsumer(queue); |
| Message receivedMessage = messageConsumer.receive(1000); |
| testPeer.waitForAllHandlersToComplete(3000); |
| |
| assertTrue(receivedMessage.propertyExists(STRING_PROP)); |
| assertTrue(receivedMessage.propertyExists(NULL_STRING_PROP)); |
| assertTrue(receivedMessage.propertyExists(BYTE_PROP)); |
| assertTrue(receivedMessage.propertyExists(BOOLEAN_PROP)); |
| assertTrue(receivedMessage.propertyExists(SHORT_PROP)); |
| assertTrue(receivedMessage.propertyExists(INT_PROP)); |
| assertTrue(receivedMessage.propertyExists(LONG_PROP)); |
| assertTrue(receivedMessage.propertyExists(FLOAT_PROP)); |
| assertTrue(receivedMessage.propertyExists(DOUBLE_PROP)); |
| assertNull(receivedMessage.getStringProperty(NULL_STRING_PROP)); |
| assertEquals(STRING_PROP_VALUE, receivedMessage.getStringProperty(STRING_PROP)); |
| assertEquals(STRING_PROP_VALUE, receivedMessage.getStringProperty(STRING_PROP)); |
| assertEquals(BOOLEAN_PROP_VALUE, receivedMessage.getBooleanProperty(BOOLEAN_PROP)); |
| assertEquals(BYTE_PROP_VALUE, receivedMessage.getByteProperty(BYTE_PROP)); |
| assertEquals(SHORT_PROP_VALUE, receivedMessage.getShortProperty(SHORT_PROP)); |
| assertEquals(INT_PROP_VALUE, receivedMessage.getIntProperty(INT_PROP)); |
| assertEquals(LONG_PROP_VALUE, receivedMessage.getLongProperty(LONG_PROP)); |
| assertEquals(FLOAT_PROP_VALUE, receivedMessage.getFloatProperty(FLOAT_PROP), 0.0); |
| assertEquals(DOUBLE_PROP_VALUE, receivedMessage.getDoubleProperty(DOUBLE_PROP), 0.0); |
| } |
| } |
| |
| //==== Destination Handling ==== |
| //============================== |
| |
| /** |
| * Tests that the lack of a 'to' in the Properties section of the incoming message (e.g |
| * one sent by a non-JMS client) is handled by making the JMSDestination method simply |
| * return the Queue Destination used to create the consumer that received the message. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageFromQueueWithoutToResultsInUseOfConsumerDestinationQueue() throws Exception { |
| receivedMessageFromQueueWithoutToResultsInUseOfConsumerDestinationImpl(true); |
| } |
| |
| /** |
| * Tests that the lack of a 'to' in the Properties section of the incoming message (e.g |
| * one sent by a non-JMS client) is handled by making the JMSDestination method simply |
| * return the Topic Destination used to create the consumer that received the message. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageFromQueueWithoutToResultsInUseOfConsumerDestinationTopic() throws Exception { |
| receivedMessageFromQueueWithoutToResultsInUseOfConsumerDestinationImpl(false); |
| } |
| |
| public void receivedMessageFromQueueWithoutToResultsInUseOfConsumerDestinationImpl(boolean useQueue) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| |
| String queueName = "myQueue"; |
| String topicName = "myTopic"; |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| Destination destination = null; |
| if (useQueue) { |
| destination = session.createQueue(queueName); |
| } else { |
| destination = session.createTopic(topicName); |
| } |
| |
| PropertiesDescribedType props = new PropertiesDescribedType(); |
| props.setMessageId("myMessageIDString"); |
| |
| DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); |
| |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, amqpValueNullContent); |
| testPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| MessageConsumer messageConsumer = session.createConsumer(destination); |
| Message receivedMessage = messageConsumer.receive(1000); |
| testPeer.waitForAllHandlersToComplete(3000); |
| |
| assertNotNull(receivedMessage); |
| |
| Destination dest = receivedMessage.getJMSDestination(); |
| if (useQueue) { |
| assertNotNull("expected Queue instance, got null", dest); |
| assertTrue("expected Queue instance. Actual type was: " + dest.getClass().getName(), dest instanceof Queue); |
| assertEquals(queueName, ((Queue) dest).getQueueName()); |
| } else { |
| assertNotNull("expected Topic instance, got null", dest); |
| assertTrue("expected Topic instance. Actual type was: " + dest.getClass().getName(), dest instanceof Topic); |
| assertEquals(topicName, ((Topic) dest).getTopicName()); |
| } |
| } |
| } |
| |
| /** |
| * Tests that the a connection with a 'topic prefix' set on it strips the |
| * prefix from the content of the to/reply-to fields for incoming messages. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithTopicDestinationsOnConnectionWithTopicPrefix() throws Exception { |
| Class<? extends Destination> destType = Topic.class; |
| String destPrefix = "t12321-"; |
| String destName = "myTopic"; |
| String replyName = "myReplyTopic"; |
| String destAddress = destPrefix + destName; |
| String replyAddress = destPrefix + replyName; |
| String annotationName = AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME; |
| Byte annotationValue = AmqpDestinationHelper.TOPIC_TYPE; |
| String replyAnnotationName = AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME; |
| Byte replyAnnotationValue = AmqpDestinationHelper.TOPIC_TYPE; |
| |
| doReceivedMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, replyName, |
| destAddress, replyAddress, annotationName, |
| annotationValue, replyAnnotationName, replyAnnotationValue); |
| } |
| |
| /** |
| * Tests that the a connection with a 'topic prefix' set on it strips the |
| * prefix from the content of the to/reply-to fields for incoming messages |
| * if they don't have the 'destination type annotation' set. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithNoTypeAnnotationAndTopicDestinationsOnConnectionWithTopicPrefix() throws Exception { |
| Class<? extends Destination> destType = Topic.class; |
| String destPrefix = "t12321-"; |
| String destName = "myTopic"; |
| String replyName = "myReplyTopic"; |
| String destAddress = destPrefix + destName; |
| String replyAddress = destPrefix + replyName; |
| String annotationName = null; |
| Byte annotationValue = null; |
| String replyAnnotationName = null; |
| Byte replyAnnotationValue = null; |
| |
| doReceivedMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, replyName, |
| destAddress, replyAddress, annotationName, |
| annotationValue, replyAnnotationName, replyAnnotationValue); |
| } |
| |
| /** |
| * Tests that the a connection with a 'queue prefix' set on it strips the |
| * prefix from the content of the to/reply-to fields for incoming messages. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithQueueDestinationsOnConnectionWithQueuePrefix() throws Exception { |
| Class<? extends Destination> destType = Queue.class; |
| String destPrefix = "q12321-"; |
| String destName = "myQueue"; |
| String replyName = "myReplyQueue"; |
| String destAddress = destPrefix + destName; |
| String replyAddress = destPrefix + replyName; |
| String annotationName = AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME; |
| Byte annotationValue = AmqpDestinationHelper.QUEUE_TYPE; |
| String replyAnnotationName = AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME; |
| Byte replyAnnotationValue = AmqpDestinationHelper.QUEUE_TYPE; |
| |
| doReceivedMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, replyName, |
| destAddress, replyAddress, annotationName, |
| annotationValue, replyAnnotationName, replyAnnotationValue); |
| } |
| |
| /** |
| * Tests that the a connection with a 'queue prefix' set on it strips the |
| * prefix from the content of the to/reply-to fields for incoming messages |
| * if they don't have the 'destination type annotation' set. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithNoTypeAnnotationAndQueueDestinationsOnConnectionWithQueuePrefix() throws Exception { |
| Class<? extends Destination> destType = Queue.class; |
| String destPrefix = "q12321-"; |
| String destName = "myQueue"; |
| String replyName = "myReplyQueue"; |
| String destAddress = destPrefix + destName; |
| String replyAddress = destPrefix + replyName; |
| String annotationName = null; |
| Byte annotationValue = null; |
| String replyAnnotationName = null; |
| Byte replyAnnotationValue = null; |
| |
| doReceivedMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, replyName, |
| destAddress, replyAddress, annotationName, |
| annotationValue, replyAnnotationName, replyAnnotationValue); |
| } |
| |
| /** |
| * Tests that a connection with a 'prefixes' set on its does not alter the |
| * address for a temporary queue in the to/reply-to fields for incoming messages. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithTemporaryQueueDestinationsOnConnectionWithPrefixes() throws Exception { |
| Class<? extends Destination> destType = TemporaryQueue.class; |
| String destPrefix = "q12321-"; |
| String destName = "temp-queue://myTempQueue"; |
| String replyName = "temp-queue://myReplyTempQueue"; |
| String destAddress = destName; // We won't manipulate the temporary addresses generated by the broker |
| String replyAddress = replyName; // We won't manipulate the temporary addresses generated by the broker |
| String annotationName = AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME; |
| Byte annotationValue = AmqpDestinationHelper.TEMP_QUEUE_TYPE; |
| String replyAnnotationName = AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME; |
| Byte replyAnnotationValue = AmqpDestinationHelper.TEMP_QUEUE_TYPE; |
| |
| doReceivedMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, replyName, |
| destAddress, replyAddress, annotationName, |
| annotationValue, replyAnnotationName, replyAnnotationValue); |
| } |
| |
| /** |
| * Tests that a connection with a 'prefixes' set on its does not alter the |
| * address for a temporary queue in the to/reply-to fields for incoming messages. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithTemporaryTopicDestinationsOnConnectionWithPrefixes() throws Exception { |
| Class<? extends Destination> destType = TemporaryTopic.class; |
| String destPrefix = "q12321-"; |
| String destName = "temp-topic://myTempTopic"; |
| String replyName = "temp-topic://myReplyTempTopic"; |
| String destAddress = destName; // We won't manipulate the temporary addresses generated by the broker |
| String replyAddress = replyName; // We won't manipulate the temporary addresses generated by the broker |
| String annotationName = AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME; |
| Byte annotationValue = AmqpDestinationHelper.TEMP_TOPIC_TYPE; |
| String replyAnnotationName = AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME; |
| Byte replyAnnotationValue = AmqpDestinationHelper.TEMP_TOPIC_TYPE; |
| |
| doReceivedMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, replyName, |
| destAddress, replyAddress, annotationName, |
| annotationValue, replyAnnotationName, replyAnnotationValue); |
| } |
| |
| private void doReceivedMessageOnConnectionWithPrefixTestImpl(Class<? extends Destination> destType, |
| String destPrefix, |
| String destName, |
| String replyName, |
| String destAddress, |
| String replyAddress, |
| String annotationName, |
| Object annotationValue, |
| String replyAnnotationName, |
| Object replyAnnotationValue) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = null; |
| if (destType == Topic.class) { |
| connection = testFixture.establishConnecton(testPeer, "?jms.topicPrefix=" + destPrefix); |
| } else if (destType == Queue.class) { |
| connection = testFixture.establishConnecton(testPeer, "?jms.queuePrefix=" + destPrefix); |
| } else { |
| //Set both the non-temporary prefixes, we wont use non-temp dests but want to ensure they don't affect anything |
| connection = testFixture.establishConnecton(testPeer, "?jms.topicPrefix=" + destPrefix + "&jms.queuePrefix=" + destPrefix); |
| } |
| |
| connection.start(); |
| |
| // Set the prefix if Topic or Queue dest type. |
| if (destType == Topic.class) { |
| ((JmsConnection) connection).setTopicPrefix(destPrefix); |
| } else if (destType == Queue.class) { |
| ((JmsConnection) connection).setQueuePrefix(destPrefix); |
| } |
| |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| // Create the destination |
| Destination dest = null; |
| if (destType == Topic.class) { |
| dest= session.createTopic(destName); |
| } else if (destType == Queue.class) { |
| dest = session.createQueue(destName); |
| } else if (destType == TemporaryTopic.class) { |
| //TODO:add method to expect temp topic creation |
| testPeer.expectTempQueueCreationAttach(destAddress); |
| dest = session.createTemporaryTopic(); |
| } else if (destType == TemporaryQueue.class) { |
| testPeer.expectTempQueueCreationAttach(destAddress); |
| dest = session.createTemporaryQueue(); |
| } |
| |
| MessageAnnotationsDescribedType msgAnnotations = null; |
| if (annotationName != null || replyAnnotationName != null) { |
| msgAnnotations = new MessageAnnotationsDescribedType(); |
| if (annotationName != null) { |
| msgAnnotations.setSymbolKeyedAnnotation(annotationName, annotationValue); |
| } |
| |
| if (replyAnnotationName != null) { |
| msgAnnotations.setSymbolKeyedAnnotation(replyAnnotationName, replyAnnotationValue); |
| } |
| } |
| |
| PropertiesDescribedType props = new PropertiesDescribedType(); |
| props.setTo(destAddress); |
| props.setReplyTo(replyAddress); |
| DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); |
| |
| SourceMatcher sourceMatcher = new SourceMatcher(); |
| sourceMatcher.withAddress(equalTo(destAddress)); |
| |
| testPeer.expectReceiverAttach(notNullValue(), sourceMatcher); |
| testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, props, null, amqpValueNullContent); |
| testPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| MessageConsumer messageConsumer = session.createConsumer(dest); |
| Message receivedMessage = messageConsumer.receive(1000); |
| |
| testPeer.waitForAllHandlersToComplete(2000); |
| assertNotNull(receivedMessage); |
| |
| Destination jmsDest = receivedMessage.getJMSDestination(); |
| Destination jmsReplyTo = receivedMessage.getJMSReplyTo(); |
| |
| assertNotNull("Expected JMSDestination but got null", jmsDest); |
| assertNotNull("Expected JMSReplyTo but got null", jmsReplyTo); |
| |
| // Verify destination/replyto names on received message |
| String recievedName = null; |
| String recievedReplyName = null; |
| if (destType == Topic.class || destType == TemporaryTopic.class) { |
| recievedName = ((Topic) jmsDest).getTopicName(); |
| recievedReplyName = ((Topic) jmsReplyTo).getTopicName(); |
| } else if (destType == Queue.class || destType == TemporaryQueue.class) { |
| recievedName = ((Queue) jmsDest).getQueueName(); |
| recievedReplyName = ((Queue) jmsReplyTo).getQueueName(); |
| } |
| |
| assertEquals("Unexpected name for JMSDestination", destName, recievedName); |
| assertEquals("Unexpected name for JMSReplyTo", replyName, recievedReplyName); |
| |
| if (destType == TemporaryQueue.class || destType == TemporaryTopic.class) { |
| assertEquals("Temporary destination name and address should be equal", destName, destAddress); |
| assertEquals("Temporary replyto name and address should be equal", replyName, replyAddress); |
| } |
| } |
| } |
| |
| /** |
| * Tests that the a connection with a 'topic prefix' set on it adds the |
| * prefix to the content of the to/reply-to fields for outgoing messages. |
| */ |
| @Test(timeout = 2000) |
| public void testSendMessageWithTopicDestinationsOnConnectionWithTopicPrefix() throws Exception { |
| Class<? extends Destination> destType = Topic.class; |
| String destPrefix = "t12321-"; |
| String destName = "myTopic"; |
| String destAddress = destPrefix + destName; |
| Byte annotationValue = AmqpDestinationHelper.TOPIC_TYPE; |
| |
| doSendMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, destAddress, annotationValue); |
| } |
| |
| /** |
| * Tests that the a connection with a 'queue prefix' set on it adds the |
| * prefix to the content of the to/reply-to fields for outgoing messages. |
| */ |
| @Test(timeout = 2000) |
| public void testSendMessageWithQueueDestinationsOnConnectionWithQueuePrefix() throws Exception { |
| Class<? extends Destination> destType = Queue.class; |
| String destPrefix = "q12321-"; |
| String destName = "myQueue"; |
| String destAddress = destPrefix + destName; |
| Byte annotationValue = AmqpDestinationHelper.QUEUE_TYPE; |
| |
| doSendMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, destAddress, annotationValue); |
| } |
| |
| /** |
| * Tests that the a connection with 'destination prefixes' set on it does not add |
| * the prefix to the content of the to/reply-to fields for TemporaryQueues. |
| */ |
| @Test(timeout = 2000) |
| public void testSendMessageWithTemporaryQueueDestinationsOnConnectionWithDestinationPrefixes() throws Exception { |
| Class<? extends Destination> destType = TemporaryQueue.class; |
| String destPrefix = "q12321-"; |
| String destName = null; |
| String destAddress = "temp-queue://myTempQueue"; |
| Byte annotationValue = AmqpDestinationHelper.TEMP_QUEUE_TYPE; |
| |
| doSendMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, destAddress, annotationValue); |
| } |
| |
| /** |
| * Tests that the a connection with 'destination prefixes' set on it does not add |
| * the prefix to the content of the to/reply-to fields for TemporaryTopics. |
| */ |
| @Test(timeout = 2000) |
| public void testSendMessageWithTemporaryTopicDestinationsOnConnectionWithDestinationPrefixes() throws Exception { |
| Class<? extends Destination> destType = TemporaryTopic.class; |
| String destPrefix = "q12321-"; |
| String destName = null; |
| String destAddress = "temp-topic://myTempTopic"; |
| Byte annotationValue = AmqpDestinationHelper.TEMP_TOPIC_TYPE; |
| |
| doSendMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, destAddress, annotationValue); |
| } |
| |
| private void doSendMessageOnConnectionWithPrefixTestImpl(Class<? extends Destination> destType, |
| String destPrefix, |
| String destName, |
| String destAddress, |
| Byte destTypeAnnotationValue) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = null; |
| if (destType == Topic.class) { |
| connection = testFixture.establishConnecton(testPeer, "?jms.topicPrefix=" + destPrefix); |
| } else if (destType == Queue.class) { |
| connection = testFixture.establishConnecton(testPeer, "?jms.queuePrefix=" + destPrefix); |
| } else { |
| // Set both the non-temporary prefixes, we wont use non-temp dests but want to ensure they don't affect anything |
| connection = testFixture.establishConnecton(testPeer, "?jms.topicPrefix=" + destPrefix + "&jms.queuePrefix=" + destPrefix); |
| } |
| |
| connection.start(); |
| |
| // Set the prefix if Topic or Queue dest type. |
| if (destType == Topic.class) { |
| ((JmsConnection) connection).setTopicPrefix(destPrefix); |
| } else if (destType == Queue.class) { |
| ((JmsConnection) connection).setQueuePrefix(destPrefix); |
| } |
| |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| // Create the destination |
| Destination dest = null; |
| if (destType == Topic.class) { |
| dest = session.createTopic(destName); |
| } else if (destType == Queue.class) { |
| dest = session.createQueue(destName); |
| } else if (destType == TemporaryTopic.class) { |
| // TODO:add method to expect temp topic creation |
| testPeer.expectTempQueueCreationAttach(destAddress); |
| dest = session.createTemporaryTopic(); |
| } else if (destType == TemporaryQueue.class) { |
| testPeer.expectTempQueueCreationAttach(destAddress); |
| dest = session.createTemporaryQueue(); |
| } |
| |
| TargetMatcher targetMatcher = new TargetMatcher(); |
| targetMatcher.withAddress(equalTo(destAddress)); |
| |
| testPeer.expectSenderAttach(targetMatcher, false, false); |
| |
| MessageProducer producer = session.createProducer(dest); |
| |
| MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); |
| MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); |
| msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME), equalTo(destTypeAnnotationValue)); |
| msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME), equalTo(destTypeAnnotationValue)); |
| MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true); |
| propsMatcher.withTo(equalTo(destAddress)); |
| propsMatcher.withReplyTo(equalTo(destAddress)); |
| |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(headersMatcher); |
| messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); |
| messageMatcher.setPropertiesMatcher(propsMatcher); |
| |
| //TODO: currently we aren't sending any body section, decide if this is allowed |
| //messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null)); |
| testPeer.expectTransfer(messageMatcher); |
| |
| Message message = session.createMessage(); |
| message.setJMSReplyTo(dest); |
| |
| producer.send(message); |
| |
| testPeer.waitForAllHandlersToComplete(2000); |
| } |
| } |
| |
| /** |
| * Tests that a connection with 'prefixes' set on it via broker-provided connection properties |
| * strips the prefix from the to/reply-to fields for incoming messages with Topic destinations. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithTopicDestinationsOnConnectionWithBrokerDefinedPrefixProperties() throws Exception { |
| Class<? extends Destination> destType = Topic.class; |
| String destPrefix = "t-broker-provided-prefix-"; |
| String destName = "myTopic"; |
| String replyName = "myReplyTopic"; |
| String destAddress = destPrefix + destName; |
| String replyAddress = destPrefix + replyName; |
| String annotationName = AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME; |
| Byte annotationValue = AmqpDestinationHelper.TOPIC_TYPE; |
| String replyAnnotationName = AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME; |
| Byte replyAnnotationValue = AmqpDestinationHelper.TOPIC_TYPE; |
| |
| doReceivedMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(destType, destPrefix, destName, replyName, |
| destAddress, replyAddress, annotationName, |
| annotationValue, replyAnnotationName, replyAnnotationValue); |
| } |
| |
| /** |
| * Tests that a connection with 'prefixes' set on it via broker-provided connection properties |
| * strips the prefix from the to/reply-to fields for incoming messages with Queue destinations. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithQueueDestinationsOnConnectionWithBrokerDefinedPrefixProperties() throws Exception { |
| Class<? extends Destination> destType = Queue.class; |
| String destPrefix = "q-broker-provided-prefix-"; |
| String destName = "myQueue"; |
| String replyName = "myReplyQueue"; |
| String destAddress = destPrefix + destName; |
| String replyAddress = destPrefix + replyName; |
| String annotationName = AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME; |
| Byte annotationValue = AmqpDestinationHelper.QUEUE_TYPE; |
| String replyAnnotationName = AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME; |
| Byte replyAnnotationValue = AmqpDestinationHelper.QUEUE_TYPE; |
| |
| doReceivedMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(destType, destPrefix, destName, replyName, |
| destAddress, replyAddress, annotationName, |
| annotationValue, replyAnnotationName, replyAnnotationValue); |
| } |
| |
| private void doReceivedMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(Class<? extends Destination> destType, |
| String destPrefix, |
| String destName, |
| String replyName, |
| String destAddress, |
| String replyAddress, |
| String annotationName, |
| Object annotationValue, |
| String replyAnnotationName, |
| Object replyAnnotationValue) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| // Have the test peer provide the destination prefixes as connection properties |
| Map<Symbol, Object> properties = new HashMap<Symbol, Object>(); |
| properties.put(AmqpConnectionProperties.QUEUE_PREFIX, destPrefix); |
| properties.put(AmqpConnectionProperties.TOPIC_PREFIX, destPrefix); |
| |
| Connection connection = testFixture.establishConnecton(testPeer, null, null, properties); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| // Create the destination |
| Destination dest = null; |
| if (destType == Topic.class) { |
| dest= session.createTopic(destName); |
| } else if (destType == Queue.class) { |
| dest = session.createQueue(destName); |
| } else { |
| fail("non-temporary destination type set"); |
| } |
| |
| MessageAnnotationsDescribedType msgAnnotations = null; |
| if (annotationName != null || replyAnnotationName != null) { |
| msgAnnotations = new MessageAnnotationsDescribedType(); |
| if (annotationName != null) { |
| msgAnnotations.setSymbolKeyedAnnotation(annotationName, annotationValue); |
| } |
| |
| if (replyAnnotationName != null) { |
| msgAnnotations.setSymbolKeyedAnnotation(replyAnnotationName, replyAnnotationValue); |
| } |
| } |
| |
| PropertiesDescribedType props = new PropertiesDescribedType(); |
| props.setTo(destAddress); |
| props.setReplyTo(replyAddress); |
| DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); |
| |
| SourceMatcher sourceMatcher = new SourceMatcher(); |
| sourceMatcher.withAddress(equalTo(destAddress)); |
| |
| testPeer.expectReceiverAttach(notNullValue(), sourceMatcher); |
| testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, props, null, amqpValueNullContent); |
| testPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| MessageConsumer messageConsumer = session.createConsumer(dest); |
| Message receivedMessage = messageConsumer.receive(1000); |
| |
| testPeer.waitForAllHandlersToComplete(2000); |
| assertNotNull(receivedMessage); |
| |
| Destination jmsDest = receivedMessage.getJMSDestination(); |
| Destination jmsReplyTo = receivedMessage.getJMSReplyTo(); |
| |
| assertNotNull("Expected JMSDestination but got null", jmsDest); |
| assertNotNull("Expected JMSReplyTo but got null", jmsReplyTo); |
| |
| // Verify destination/replyto names on received message |
| String recievedName = null; |
| String recievedReplyName = null; |
| if (destType == Topic.class) { |
| recievedName = ((Topic) jmsDest).getTopicName(); |
| recievedReplyName = ((Topic) jmsReplyTo).getTopicName(); |
| } else if (destType == Queue.class) { |
| recievedName = ((Queue) jmsDest).getQueueName(); |
| recievedReplyName = ((Queue) jmsReplyTo).getQueueName(); |
| } |
| |
| assertEquals("Unexpected name for JMSDestination", destName, recievedName); |
| assertEquals("Unexpected name for JMSReplyTo", replyName, recievedReplyName); |
| } |
| } |
| |
| /** |
| * Tests that the a connection with a 'queue prefix' set on it via broker-provided connection |
| * properties adds the prefix to the content of the to/reply-to fields for outgoing messages. |
| */ |
| @Test(timeout = 2000) |
| public void testSendMessageWithQueueDestinationsOnConnectionWithBrokerDefinedPrefixProperties() throws Exception { |
| Class<? extends Destination> destType = Queue.class; |
| String destPrefix = "q-broker-provided-prefix-"; |
| String destName = "myQueue"; |
| String destAddress = destPrefix + destName; |
| Byte annotationValue = AmqpDestinationHelper.QUEUE_TYPE; |
| |
| doSendMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(destType, destPrefix, destName, destAddress, annotationValue); |
| } |
| |
| /** |
| * Tests that the a connection with a 'topic prefix' set on it via broker-provided connection |
| * properties adds the prefix to the content of the to/reply-to fields for outgoing messages. |
| */ |
| @Test(timeout = 2000) |
| public void testSendMessageWithTopicDestinationsOnConnectionWithBrokerDefinedPrefixProperties() throws Exception { |
| Class<? extends Destination> destType = Topic.class; |
| String destPrefix = "t-broker-provided-prefix-"; |
| String destName = "myTopic"; |
| String destAddress = destPrefix + destName; |
| Byte annotationValue = AmqpDestinationHelper.TOPIC_TYPE; |
| |
| doSendMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(destType, destPrefix, destName, destAddress, annotationValue); |
| } |
| |
| private void doSendMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(Class<? extends Destination> destType, |
| String destPrefix, |
| String destName, |
| String destAddress, |
| Byte destTypeAnnotationValue) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| // Have the test peer provide the destination prefixes as connection properties |
| Map<Symbol, Object> properties = new HashMap<Symbol, Object>(); |
| properties.put(AmqpConnectionProperties.QUEUE_PREFIX, destPrefix); |
| properties.put(AmqpConnectionProperties.TOPIC_PREFIX, destPrefix); |
| |
| Connection connection = testFixture.establishConnecton(testPeer, null, null, properties); |
| |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| // Create the destination |
| Destination dest = null; |
| if (destType == Topic.class) { |
| dest = session.createTopic(destName); |
| } else if (destType == Queue.class) { |
| dest = session.createQueue(destName); |
| } else { |
| fail("non-temporary destination type set"); |
| } |
| |
| TargetMatcher targetMatcher = new TargetMatcher(); |
| targetMatcher.withAddress(equalTo(destAddress)); |
| |
| testPeer.expectSenderAttach(targetMatcher, false, false); |
| |
| MessageProducer producer = session.createProducer(dest); |
| |
| MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); |
| MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); |
| msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME), equalTo(destTypeAnnotationValue)); |
| msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME), equalTo(destTypeAnnotationValue)); |
| MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true); |
| propsMatcher.withTo(equalTo(destAddress)); |
| propsMatcher.withReplyTo(equalTo(destAddress)); |
| |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(headersMatcher); |
| messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); |
| messageMatcher.setPropertiesMatcher(propsMatcher); |
| |
| //TODO: currently we aren't sending any body section, decide if this is allowed |
| //messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null)); |
| testPeer.expectTransfer(messageMatcher); |
| |
| Message message = session.createMessage(); |
| message.setJMSReplyTo(dest); |
| |
| producer.send(message); |
| |
| testPeer.waitForAllHandlersToComplete(2000); |
| } |
| } |
| |
| // --- byte type annotation values --- // |
| |
| /** |
| * Tests that the {@link AmqpDestinationHelper#JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME} is set as a byte on |
| * a sent message to indicate its 'to' address represents a Topic JMSDestination. |
| */ |
| @Test(timeout = 5000) |
| public void testSentMessageContainsToTypeAnnotationByte() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| testPeer.expectBegin(true); |
| testPeer.expectSenderAttach(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| String topicName = "myTopic"; |
| |
| MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true)); |
| MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); |
| Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME); |
| msgAnnotationsMatcher.withEntry(annotationKey, equalTo(AmqpDestinationHelper.TOPIC_TYPE)); |
| |
| MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo(topicName)); |
| |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(headersMatcher); |
| messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); |
| messageMatcher.setPropertiesMatcher(propsMatcher); |
| |
| testPeer.expectTransfer(messageMatcher); |
| |
| Message message = session.createMessage(); |
| Topic topic = session.createTopic(topicName); |
| MessageProducer producer = session.createProducer(topic); |
| producer.send(message); |
| |
| testPeer.waitForAllHandlersToComplete(2000); |
| } |
| } |
| |
| /** |
| * Tests that the {@link AmqpDestinationHelper#JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME} is set as a byte on |
| * a sent message to indicate its 'reply-to' address represents a Topic JMSDestination. |
| */ |
| @Test(timeout = 5000) |
| public void testSentMessageContainsReplyToTypeAnnotationByte() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| |
| testPeer.expectBegin(true); |
| testPeer.expectSenderAttach(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| String queueName = "myQueue"; |
| String replyTopicName = "myReplyTopic"; |
| |
| MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true)); |
| MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); |
| Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME); |
| msgAnnotationsMatcher.withEntry(annotationKey, equalTo(AmqpDestinationHelper.TOPIC_TYPE)); |
| |
| MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withReplyTo(equalTo(replyTopicName)); |
| |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(headersMatcher); |
| messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); |
| messageMatcher.setPropertiesMatcher(propsMatcher); |
| |
| testPeer.expectTransfer(messageMatcher); |
| |
| Topic replyTopic = session.createTopic(replyTopicName); |
| Message message = session.createMessage(); |
| message.setJMSReplyTo(replyTopic); |
| |
| Queue queue = session.createQueue(queueName); |
| MessageProducer producer = session.createProducer(queue); |
| producer.send(message); |
| |
| testPeer.waitForAllHandlersToComplete(2000); |
| } |
| } |
| |
| // --- old string type annotation values --- // |
| |
| /** |
| * Tests that the {@link AmqpDestinationHelper#TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME} set on a message to |
| * indicate its 'to' address represents a Topic results in the JMSDestination object being a |
| * Topic. Ensure the consumers destination is not used by consuming from a Queue. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageFromQueueWithToTypeAnnotationForTopic() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType(); |
| msgAnnotations.setSymbolKeyedAnnotation(AmqpDestinationHelper.TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, AmqpDestinationHelper.TOPIC_ATTRIBUTE); |
| |
| PropertiesDescribedType props = new PropertiesDescribedType(); |
| String myTopicAddress = "myTopicAddress"; |
| props.setTo(myTopicAddress ); |
| props.setMessageId("myMessageIDString"); |
| DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); |
| |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, props, null, amqpValueNullContent); |
| testPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| MessageConsumer messageConsumer = session.createConsumer(queue); |
| Message receivedMessage = messageConsumer.receive(1000); |
| testPeer.waitForAllHandlersToComplete(3000); |
| |
| assertNotNull(receivedMessage); |
| |
| Destination dest = receivedMessage.getJMSDestination(); |
| assertNotNull("Expected Topic destination but got null", dest); |
| assertTrue("Expected Topic instance but did not get one. Actual type was: " + dest.getClass().getName(), dest instanceof Topic); |
| assertEquals(myTopicAddress, ((Topic)dest).getTopicName()); |
| } |
| } |
| |
| /** |
| * Tests that the {@link AmqpDestinationHelper#REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME} set on a message to |
| * indicate its 'reply-to' address represents a Topic results in the JMSReplyTo object being a |
| * Topic. Ensure the consumers destination is not used by consuming from a Queue. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageFromQueueWithReplyToTypeAnnotationForTopic() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType(); |
| msgAnnotations.setSymbolKeyedAnnotation(AmqpDestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, AmqpDestinationHelper.TOPIC_ATTRIBUTE); |
| |
| PropertiesDescribedType props = new PropertiesDescribedType(); |
| String myTopicAddress = "myTopicAddress"; |
| props.setReplyTo(myTopicAddress); |
| props.setMessageId("myMessageIDString"); |
| DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); |
| |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, props, null, amqpValueNullContent); |
| testPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| MessageConsumer messageConsumer = session.createConsumer(queue); |
| Message receivedMessage = messageConsumer.receive(1000); |
| testPeer.waitForAllHandlersToComplete(3000); |
| |
| assertNotNull(receivedMessage); |
| |
| Destination dest = receivedMessage.getJMSReplyTo(); |
| assertTrue(dest instanceof Topic); |
| assertEquals(myTopicAddress, ((Topic)dest).getTopicName()); |
| } |
| } |
| |
| /** |
| * Tests that lack of the {@link AmqpMessageSupport#AMQP_REPLY_TO_ANNOTATION} set on a |
| * message to indicate type of its 'reply-to' address results in it being classed as the same |
| * type as the destination used to create the consumer. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageFromQueueWithReplyToWithoutTypeAnnotationResultsInUseOfConsumerDestinationType() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| String myOtherQueueAddress = "myOtherQueueAddress"; |
| PropertiesDescribedType props = new PropertiesDescribedType(); |
| props.setReplyTo(myOtherQueueAddress); |
| props.setMessageId("myMessageIDString"); |
| |
| DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); |
| |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, amqpValueNullContent); |
| testPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| MessageConsumer messageConsumer = session.createConsumer(queue); |
| Message receivedMessage = messageConsumer.receive(1000); |
| testPeer.waitForAllHandlersToComplete(3000); |
| |
| assertNotNull(receivedMessage); |
| |
| Destination dest = receivedMessage.getJMSReplyTo(); |
| assertTrue(dest instanceof Queue); |
| assertEquals(myOtherQueueAddress, ((Queue)dest).getQueueName()); |
| } |
| } |
| |
| /** |
| * Tests that lack of the reply-to set on a message results in it returning null for JMSReplyTo |
| * and not the consumer destination as happens for JMSDestination. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageFromQueueWithNoReplyToReturnsNull() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| PropertiesDescribedType props = new PropertiesDescribedType(); |
| props.setMessageId("myMessageIDString"); |
| |
| DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); |
| |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, amqpValueNullContent); |
| testPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| MessageConsumer messageConsumer = session.createConsumer(queue); |
| Message receivedMessage = messageConsumer.receive(1000); |
| testPeer.waitForAllHandlersToComplete(3000); |
| |
| assertNotNull(receivedMessage); |
| assertNull(receivedMessage.getJMSReplyTo()); |
| } |
| } |
| |
| //==== TTL / Expiration Handling ==== |
| //=================================== |
| |
| /** |
| * Tests that lack of the absolute-expiry-time and ttl fields on a message results |
| * in it returning 0 for for JMSExpiration |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageFromQueueWithNoAbsoluteExpiryOrTtlReturnsJMSExpirationZero() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| PropertiesDescribedType props = new PropertiesDescribedType(); |
| props.setMessageId("myMessageIDString"); |
| |
| DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); |
| |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, amqpValueNullContent); |
| testPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| MessageConsumer messageConsumer = session.createConsumer(queue); |
| Message receivedMessage = messageConsumer.receive(1000); |
| testPeer.waitForAllHandlersToComplete(3000); |
| |
| assertNotNull(receivedMessage); |
| assertEquals(0L, receivedMessage.getJMSExpiration()); |
| } |
| } |
| |
| /** |
| * Tests that setting a non-zero value in the absolute-expiry-time field on a |
| * message results in it returning this value for JMSExpiration |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageFromQueueWithAbsoluteExpiryReturnsJMSExpirationNonZero() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| long timestamp = System.currentTimeMillis(); |
| |
| PropertiesDescribedType props = new PropertiesDescribedType(); |
| props.setAbsoluteExpiryTime(new Date(timestamp)); |
| props.setMessageId("myMessageIDString"); |
| |
| DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); |
| |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, amqpValueNullContent); |
| testPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| MessageConsumer messageConsumer = session.createConsumer(queue); |
| Message receivedMessage = messageConsumer.receive(1000); |
| testPeer.waitForAllHandlersToComplete(3000); |
| |
| assertNotNull(receivedMessage); |
| assertEquals(timestamp, receivedMessage.getJMSExpiration()); |
| } |
| } |
| |
| //==== MessageID and CorrelationID Handling ==== |
| //============================================== |
| |
| @Test(timeout = 2000) |
| public void testReceiveMessageWithoutMessageId() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); |
| |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); |
| testPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| MessageConsumer messageConsumer = session.createConsumer(queue); |
| Message receivedMessage = messageConsumer.receive(1000); |
| testPeer.waitForAllHandlersToComplete(2000); |
| |
| assertNull(receivedMessage.getJMSMessageID()); |
| } |
| } |
| |
| /** |
| * Tests that receiving a message with a string typed message-id results in returning the |
| * expected value for JMSMessageId where the JMS "ID:" prefix has been added. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithStringMessageIdReturnsExpectedJMSMessageID() throws Exception { |
| receivedMessageWithMessageIdTestImpl("myTestMessageIdString"); |
| } |
| |
| /** |
| * Tests that receiving a message with a UUID typed message-id results in returning the |
| * expected value for JMSMessageId where the JMS "ID:" prefix has been added to the UUID.tostring() |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithUUIDMessageIdReturnsExpectedJMSMessageID() throws Exception { |
| receivedMessageWithMessageIdTestImpl(UUID.randomUUID()); |
| } |
| |
| /** |
| * Tests that receiving a message with a ulong typed message-id results in returning the |
| * expected value for JMSMessageId where the JMS "ID:" prefix has been added to the UnsignedLong.tostring() |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithUnsignedLongMessageIdReturnsExpectedJMSMessageID() throws Exception { |
| receivedMessageWithMessageIdTestImpl(UnsignedLong.valueOf(123456789L)); |
| } |
| |
| /** |
| * Tests that receiving a message with a binary typed message-id results in returning the |
| * expected value for JMSMessageId where the JMS "ID:" prefix has been added to the hex representation of the binary. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithBinaryMessageIdReturnsExpectedJMSMessageID() throws Exception { |
| receivedMessageWithMessageIdTestImpl(new Binary(new byte[]{(byte)0x02, (byte)0x20, (byte) 0xAE, (byte) 0x00})); |
| } |
| |
| private void receivedMessageWithMessageIdTestImpl(Object underlyingAmqpMessageId) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| PropertiesDescribedType props = new PropertiesDescribedType(); |
| props.setMessageId(underlyingAmqpMessageId); |
| DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); |
| |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, amqpValueNullContent); |
| testPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| MessageConsumer messageConsumer = session.createConsumer(queue); |
| Message receivedMessage = messageConsumer.receive(1000); |
| testPeer.waitForAllHandlersToComplete(3000); |
| |
| assertNotNull(receivedMessage); |
| |
| String expectedBaseIdString = new AmqpMessageIdHelper().toBaseMessageIdString(underlyingAmqpMessageId); |
| |
| assertEquals("ID:" + expectedBaseIdString, receivedMessage.getJMSMessageID()); |
| } |
| } |
| |
| /** |
| * Tests that receiving a message with a string typed correlation-id results in returning the |
| * expected value for JMSCorrelationID where the JMS "ID:" prefix has been added. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithStringCorrelationIdReturnsExpectedJMSCorrelationID() throws Exception { |
| receivedMessageWithCorrelationIdTestImpl("myTestCorrelationIdString", false); |
| } |
| |
| /** |
| * Tests that receiving a message with a string typed correlation-id, which is indicated to be an |
| * application-specific value, results in returning the expected value for JMSCorrelationID |
| * where the JMS "ID:" prefix has NOT been added. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithAppSpecificStringCorrelationIdReturnsExpectedJMSCorrelationID() throws Exception { |
| receivedMessageWithCorrelationIdTestImpl("myTestCorrelationIdString", true); |
| } |
| |
| /** |
| * Tests that receiving a message with a UUID typed correlation-id results in returning the |
| * expected value for JMSCorrelationID where the JMS "ID:" prefix has been added to the UUID.tostring() |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithUUIDCorrelationIdReturnsExpectedJMSCorrelationID() throws Exception { |
| receivedMessageWithCorrelationIdTestImpl(UUID.randomUUID(), false); |
| } |
| |
| /** |
| * Tests that receiving a message with a UUID typed correlation-id results in returning the |
| * expected value for JMSCorrelationID where the JMS "ID:" prefix has been added to the UUID.tostring() |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithLongCorrelationIdReturnsExpectedJMSCorrelationID() throws Exception { |
| receivedMessageWithCorrelationIdTestImpl(UnsignedLong.valueOf(123456789L), false); |
| } |
| |
| private void receivedMessageWithCorrelationIdTestImpl(Object correlationIdForAmqpMessageClass, boolean appSpecific) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| PropertiesDescribedType props = new PropertiesDescribedType(); |
| DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); |
| MessageAnnotationsDescribedType ann = null; |
| |
| props.setMessageId("myMessageIdString"); |
| props.setCorrelationId(correlationIdForAmqpMessageClass); |
| if (appSpecific) { |
| ann = new MessageAnnotationsDescribedType(); |
| ann.setSymbolKeyedAnnotation(AmqpMessageSupport.JMS_APP_CORRELATION_ID, true); |
| } |
| |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlowRespondWithTransfer(null, ann, props, null, amqpValueNullContent); |
| testPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| MessageConsumer messageConsumer = session.createConsumer(queue); |
| Message receivedMessage = messageConsumer.receive(1000); |
| testPeer.waitForAllHandlersToComplete(3000); |
| |
| assertNotNull(receivedMessage); |
| String expectedBaseIdString = new AmqpMessageIdHelper().toBaseMessageIdString(correlationIdForAmqpMessageClass); |
| String expected = expectedBaseIdString; |
| if (!appSpecific) { |
| expected = "ID:" + expected; |
| } |
| |
| assertEquals(expected, receivedMessage.getJMSCorrelationID()); |
| } |
| } |
| |
| /** |
| * Tests that sending a message with a uuid typed correlation-id value which is a |
| * message-id results in an AMQP message with the expected encoding of the correlation-id, |
| * where the type is uuid, the "ID:" prefix of the JMSCorrelationID value is (obviously) not present, and there is |
| * no presence of the message annotation to indicate an app-specific correlation-id. |
| */ |
| @Test(timeout = 2000) |
| public void testSentMessageWithUUIDCorrelationId() throws Exception { |
| UUID uuid = UUID.randomUUID(); |
| String stringCorrelationId = AmqpMessageIdHelper.JMS_ID_PREFIX + AmqpMessageIdHelper.AMQP_UUID_PREFIX + uuid.toString(); |
| sentMessageWithCorrelationIdTestImpl(stringCorrelationId, uuid, false); |
| } |
| |
| /** |
| * Tests that sending a message with a binary typed correlation-id value which is a |
| * message-id results in an AMQP message with the expected encoding of the correlation-id, |
| * where the type is binary, the "ID:" prefix of the JMSCorrelationID value is (obviously) not present, and there is |
| * no presence of the message annotation to indicate an app-specific correlation-id. |
| */ |
| @Test(timeout = 2000) |
| public void testSentMessageWithBinaryCorrelationId() throws Exception |
| { |
| Binary bin = new Binary(new byte[]{(byte)0x01, (byte)0x23, (byte) 0xAF, (byte) 0x00}); |
| String stringCorrelationId = AmqpMessageIdHelper.JMS_ID_PREFIX + AmqpMessageIdHelper.AMQP_BINARY_PREFIX + "0123af00"; |
| sentMessageWithCorrelationIdTestImpl(stringCorrelationId, bin, false); |
| } |
| |
| /** |
| * Tests that sending a message with a ulong typed correlation-id value which is a |
| * message-id results in an AMQP message with the expected encoding of the correlation-id, |
| * where the type is ulong, the "ID:" prefix of the JMSCorrelationID value is (obviously) not present, and there is |
| * no presence of the message annotation to indicate an app-specific correlation-id. |
| */ |
| @Test(timeout = 2000) |
| public void testSentMessageWithUlongCorrelationId() throws Exception { |
| UnsignedLong ulong = UnsignedLong.valueOf(Long.MAX_VALUE); |
| String stringCorrelationId = AmqpMessageIdHelper.JMS_ID_PREFIX + AmqpMessageIdHelper.AMQP_ULONG_PREFIX + ulong.toString(); |
| sentMessageWithCorrelationIdTestImpl(stringCorrelationId, ulong, false); |
| } |
| |
| /** |
| * Tests that sending a message with a string typed correlation-id value which is a |
| * message-id results in an AMQP message with the expected encoding of the correlation-id, |
| * where the "ID:" prefix of the JMSCorrelationID value is not present, and there is |
| * no presence of the message annotation to indicate an app-specific correlation-id. |
| */ |
| @Test(timeout = 2000) |
| public void testSentMessageWithStringCorrelationId() throws Exception { |
| String stringCorrelationId = "ID:myTestMessageIdString"; |
| String underlyingCorrelationId = "myTestMessageIdString"; |
| sentMessageWithCorrelationIdTestImpl(stringCorrelationId, underlyingCorrelationId, false); |
| } |
| |
| /** |
| * Tests that sending a message with a string typed correlation-id value which is a |
| * app-specific results in an AMQP message with the expected encoding of the correlation-id, |
| * and the presence of the message annotation to indicate an app-specific correlation-id. |
| */ |
| @Test(timeout = 2000) |
| public void testSentMessageWithAppSpecificStringCorrelationId() throws Exception { |
| String stringCorrelationId = "myTestAppSpecificString"; |
| sentMessageWithCorrelationIdTestImpl(stringCorrelationId, stringCorrelationId, true); |
| } |
| |
| private void sentMessageWithCorrelationIdTestImpl(String stringCorrelationId, Object correlationIdForAmqpMessageClass, boolean appSpecific) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| testPeer.expectBegin(true); |
| testPeer.expectSenderAttach(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| String queueName = "myQueue"; |
| Queue queue = session.createQueue(queueName); |
| MessageProducer producer = session.createProducer(queue); |
| |
| MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); |
| MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); |
| MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true); |
| |
| //Set matcher to validate the correlation-id, and the annotation |
| //presence+value if it is application-specific |
| propsMatcher.withCorrelationId(equalTo(correlationIdForAmqpMessageClass)); |
| if (appSpecific) { |
| msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpMessageSupport.JMS_APP_CORRELATION_ID), equalTo(Boolean.TRUE)); |
| } |
| |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(headersMatcher); |
| messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); |
| messageMatcher.setPropertiesMatcher(propsMatcher); |
| messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null)); |
| testPeer.expectTransfer(messageMatcher); |
| |
| Message message = session.createTextMessage(); |
| message.setJMSCorrelationID(stringCorrelationId); |
| |
| producer.send(message); |
| |
| testPeer.waitForAllHandlersToComplete(3000); |
| |
| //validate the annotation was not present if the value was a message-id |
| if (!appSpecific) { |
| assertFalse(msgAnnotationsMatcher.keyExistsInReceivedAnnotations(Symbol.valueOf(AmqpMessageSupport.JMS_APP_CORRELATION_ID))); |
| } |
| } |
| } |
| |
| /** |
| * Tests that receiving a message with a string typed message-id, and then sending a message which |
| * uses the result of calling getJMSMessageID as the value for setJMSCorrelationId results in |
| * transmission of the expected AMQP message content. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithStringMessageIdAndSendValueAsCorrelationId() throws Exception { |
| recieveMessageIdSendCorrelationIdTestImpl("myStringMessageId"); |
| } |
| |
| /** |
| * Tests that receiving a message with a UUID typed message-id, and then sending a message which |
| * uses the result of calling getJMSMessageID as the value for setJMSCorrelationId results in |
| * transmission of the expected AMQP message content. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithUUIDMessageIdAndSendValueAsCorrelationId() throws Exception { |
| recieveMessageIdSendCorrelationIdTestImpl(UUID.randomUUID()); |
| } |
| |
| /** |
| * Tests that receiving a message with a ulong typed message-id, and then sending a message which |
| * uses the result of calling getJMSMessageID as the value for setJMSCorrelationId results in |
| * transmission of the expected AMQP message content. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithUlongMessageIdAndSendValueAsCorrelationId() throws Exception { |
| recieveMessageIdSendCorrelationIdTestImpl(UnsignedLong.valueOf(123456789L)); |
| } |
| |
| /** |
| * Tests that receiving a message with a binary typed message-id, and then sending a message which |
| * uses the result of calling getJMSMessageID as the value for setJMSCorrelationId results in |
| * transmission of the expected AMQP message content. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithBinaryMessageIdAndSendValueAsCorrelationId() throws Exception { |
| recieveMessageIdSendCorrelationIdTestImpl(new Binary(new byte[]{(byte)0x00, (byte)0xCD, (byte) 0xEF, (byte) 0x01})); |
| } |
| |
| private void recieveMessageIdSendCorrelationIdTestImpl(Object idForAmqpMessageClass) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| PropertiesDescribedType props = new PropertiesDescribedType(); |
| props.setMessageId(idForAmqpMessageClass); |
| DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); |
| |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, amqpValueNullContent); |
| testPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| MessageConsumer messageConsumer = session.createConsumer(queue); |
| Message receivedMessage = messageConsumer.receive(1000); |
| testPeer.waitForAllHandlersToComplete(3000); |
| |
| assertNotNull(receivedMessage); |
| |
| String expectedBaseIdString = new AmqpMessageIdHelper().toBaseMessageIdString(idForAmqpMessageClass); |
| |
| String jmsMessageID = receivedMessage.getJMSMessageID(); |
| assertEquals("ID:" + expectedBaseIdString, jmsMessageID); |
| |
| //Now take the received JMSMessageID, and send a message with it set |
| //as the JMSCorrelationID and verify we get the same AMQP id as we started with. |
| |
| testPeer.expectSenderAttach(); |
| MessageProducer producer = session.createProducer(queue); |
| |
| MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); |
| MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); |
| MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true); |
| |
| //Set matcher to validate the correlation-id matches the previous message-id |
| propsMatcher.withCorrelationId(equalTo(idForAmqpMessageClass)); |
| |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(headersMatcher); |
| messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); |
| messageMatcher.setPropertiesMatcher(propsMatcher); |
| messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null)); |
| testPeer.expectTransfer(messageMatcher); |
| |
| Message message = session.createTextMessage(); |
| message.setJMSCorrelationID(jmsMessageID); |
| |
| producer.send(message); |
| |
| testPeer.waitForAllHandlersToComplete(3000); |
| } |
| } |
| |
| //==== Group Property Handling ==== |
| //================================= |
| |
| /** |
| * Tests that when receiving a message with the group-id, reply-to-group-id, and group-sequence |
| * fields of the AMQP properties section set, that the expected JMSX or JMS_AMQP properties |
| * are present, and the expected values are returned when retrieved from the JMS message. |
| */ |
| @Test(timeout = 2000) |
| public void testReceivedMessageWithGroupRelatedPropertiesSet() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| PropertiesDescribedType props = new PropertiesDescribedType(); |
| DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); |
| MessageAnnotationsDescribedType ann = null; |
| |
| String expectedGroupId = "myGroupId123"; |
| int expectedGroupSeq = 1; |
| String expectedReplyToGroupId = "myReplyToGroupId456"; |
| |
| props.setGroupId(expectedGroupId); |
| props.setGroupSequence(UnsignedInteger.valueOf(expectedGroupSeq)); |
| props.setReplyToGroupId(expectedReplyToGroupId); |
| props.setMessageId("myMessageIDString"); |
| |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlowRespondWithTransfer(null, ann, props, null, amqpValueNullContent); |
| testPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| MessageConsumer messageConsumer = session.createConsumer(queue); |
| Message receivedMessage = messageConsumer.receive(1000); |
| testPeer.waitForAllHandlersToComplete(3000); |
| |
| assertNotNull("did not receive the message", receivedMessage); |
| |
| boolean foundGroupId = false; |
| boolean foundGroupSeq = false; |
| boolean foundReplyToGroupId = false; |
| |
| Enumeration<?> names = receivedMessage.getPropertyNames(); |
| assertTrue("Message had no property names", names.hasMoreElements()); |
| while (names.hasMoreElements()) { |
| Object element = names.nextElement(); |
| |
| if (JmsClientProperties.JMSXGROUPID.equals(element)) { |
| foundGroupId = true; |
| } |
| |
| if (JmsClientProperties.JMSXGROUPSEQ.equals(element)) { |
| foundGroupSeq = true; |
| } |
| |
| if (AmqpMessageSupport.JMS_AMQP_REPLY_TO_GROUP_ID.equals(element)) { |
| foundReplyToGroupId = true; |
| } |
| } |
| |
| assertTrue("JMSXGroupID not in property names", foundGroupId); |
| assertTrue("JMSXGroupSeq not in property names", foundGroupSeq); |
| assertTrue("JMS_AMQP_REPLY_TO_GROUP_ID not in property names", foundReplyToGroupId); |
| |
| assertTrue("JMSXGroupID does not exist", receivedMessage.propertyExists(JmsClientProperties.JMSXGROUPID)); |
| assertTrue("JMSXGroupSeq does not exist", receivedMessage.propertyExists(JmsClientProperties.JMSXGROUPSEQ)); |
| assertTrue("JMS_AMQP_REPLY_TO_GROUP_ID does not exist", receivedMessage.propertyExists(AmqpMessageSupport.JMS_AMQP_REPLY_TO_GROUP_ID)); |
| |
| assertEquals("did not get the expected JMSXGroupID", expectedGroupId, receivedMessage.getStringProperty(JmsClientProperties.JMSXGROUPID)); |
| assertEquals("did not get the expected JMSXGroupSeq", expectedGroupSeq, receivedMessage.getIntProperty(JmsClientProperties.JMSXGROUPSEQ)); |
| assertEquals("did not get the expected JMS_AMQP_REPLY_TO_GROUP_ID", expectedReplyToGroupId, receivedMessage.getStringProperty(AmqpMessageSupport.JMS_AMQP_REPLY_TO_GROUP_ID)); |
| } |
| } |
| |
| /** |
| * Tests that when sending a message with the JMSXGroupID, JMSXGroupSeq, and JMS_AMQP_REPLY_TO_GROUP_ID |
| * properties of the JMS message set, that the expected values are included in the fields of |
| * the AMQP message emitted. |
| */ |
| @Test(timeout = 2000) |
| public void testSendMessageWithGroupRelatedPropertiesSet() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { |
| Connection connection = testFixture.establishConnecton(testPeer); |
| testPeer.expectBegin(true); |
| testPeer.expectSenderAttach(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| String queueName = "myQueue"; |
| Queue queue = session.createQueue(queueName); |
| MessageProducer producer = session.createProducer(queue); |
| |
| MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true)); |
| MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); |
| |
| String expectedGroupId = "myGroupId123"; |
| int expectedGroupSeq = 1; |
| String expectedReplyToGroupId = "myReplyToGroupId456"; |
| |
| MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true); |
| propsMatcher.withGroupId(equalTo(expectedGroupId)); |
| propsMatcher.withReplyToGroupId(equalTo(expectedReplyToGroupId)); |
| propsMatcher.withGroupSequence(equalTo(UnsignedInteger.valueOf(expectedGroupSeq))); |
| |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(headersMatcher); |
| messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); |
| messageMatcher.setPropertiesMatcher(propsMatcher); |
| messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null)); |
| testPeer.expectTransfer(messageMatcher); |
| |
| Message message = session.createTextMessage(); |
| message.setStringProperty(JmsClientProperties.JMSXGROUPID, expectedGroupId); |
| message.setIntProperty(JmsClientProperties.JMSXGROUPSEQ, expectedGroupSeq); |
| message.setStringProperty(AmqpMessageSupport.JMS_AMQP_REPLY_TO_GROUP_ID, expectedReplyToGroupId); |
| |
| producer.send(message); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| } |