blob: f6cb67177d9ac1fe8d2a6e0a0429e03c6d76979a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.jms.integration;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import org.apache.qpid.jms.JmsClientProperties;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.provider.amqp.AmqpConnectionProperties;
import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
import org.apache.qpid.jms.provider.amqp.message.AmqpMessageIdHelper;
import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.ApplicationPropertiesDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.ApplicationPropertiesSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.junit.Test;
public class MessageIntegrationTest extends QpidJmsTestCase
{
// Names and values of the application properties set on sent messages and
// asserted on received messages by the application-properties tests in this class.
// String property whose value is deliberately null.
private static final String NULL_STRING_PROP = "nullStringProperty";
private static final String NULL_STRING_PROP_VALUE = null;
private static final String STRING_PROP = "stringProperty";
private static final String STRING_PROP_VALUE = "string";
private static final String BOOLEAN_PROP = "booleanProperty";
private static final boolean BOOLEAN_PROP_VALUE = true;
private static final String BYTE_PROP = "byteProperty";
private static final byte BYTE_PROP_VALUE = (byte)1;
private static final String SHORT_PROP = "shortProperty";
private static final short SHORT_PROP_VALUE = (short)1;
// The remaining numeric properties use the maximum value of their type.
private static final String INT_PROP = "intProperty";
private static final int INT_PROP_VALUE = Integer.MAX_VALUE;
private static final String LONG_PROP = "longProperty";
private static final long LONG_PROP_VALUE = Long.MAX_VALUE;
private static final String FLOAT_PROP = "floatProperty";
private static final float FLOAT_PROP_VALUE = Float.MAX_VALUE;
private static final String DOUBLE_PROP = "doubleProperty";
private static final double DOUBLE_PROP_VALUE = Double.MAX_VALUE;
// Fixture the tests call to establish a Connection against a TestAmqpPeer.
private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
//==== Application Properties Section ====
//========================================
/**
 * Sends a message carrying one application property of each kind declared in this
 * class's constants (null string, string, boolean, byte, short, int, long, float and
 * double) and has the test peer match the resulting transfer: a durable header, a
 * 'to' equal to the queue name, and an application-properties section whose entries
 * equal the values that were set.
 */
@Test(timeout = 2000)
public void testSendMessageWithApplicationProperties() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT)) {
        Connection connection = testFixture.establishConnecton(testPeer);

        testPeer.expectBegin(true);
        testPeer.expectSenderAttach();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final String queueName = "myQueue";
        MessageProducer producer = session.createProducer(session.createQueue(queueName));

        // Section matchers for everything except the application properties.
        MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
        MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo(queueName));

        // One expected entry per property that is set on the message below.
        ApplicationPropertiesSectionMatcher appPropsMatcher = new ApplicationPropertiesSectionMatcher(true);
        appPropsMatcher.withEntry(NULL_STRING_PROP, nullValue());
        appPropsMatcher.withEntry(STRING_PROP, equalTo(STRING_PROP_VALUE));
        appPropsMatcher.withEntry(BOOLEAN_PROP, equalTo(BOOLEAN_PROP_VALUE));
        appPropsMatcher.withEntry(BYTE_PROP, equalTo(BYTE_PROP_VALUE));
        appPropsMatcher.withEntry(SHORT_PROP, equalTo(SHORT_PROP_VALUE));
        appPropsMatcher.withEntry(INT_PROP, equalTo(INT_PROP_VALUE));
        appPropsMatcher.withEntry(LONG_PROP, equalTo(LONG_PROP_VALUE));
        appPropsMatcher.withEntry(FLOAT_PROP, equalTo(FLOAT_PROP_VALUE));
        appPropsMatcher.withEntry(DOUBLE_PROP, equalTo(DOUBLE_PROP_VALUE));

        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(headersMatcher);
        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
        messageMatcher.setPropertiesMatcher(propsMatcher);
        messageMatcher.setApplicationPropertiesMatcher(appPropsMatcher);
        //TODO: currently we aren't sending any body section, decide if this is allowed
        //messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
        testPeer.expectTransfer(messageMatcher);

        Message outgoing = session.createMessage();
        outgoing.setStringProperty(NULL_STRING_PROP, NULL_STRING_PROP_VALUE);
        outgoing.setStringProperty(STRING_PROP, STRING_PROP_VALUE);
        outgoing.setBooleanProperty(BOOLEAN_PROP, BOOLEAN_PROP_VALUE);
        outgoing.setByteProperty(BYTE_PROP, BYTE_PROP_VALUE);
        outgoing.setShortProperty(SHORT_PROP, SHORT_PROP_VALUE);
        outgoing.setIntProperty(INT_PROP, INT_PROP_VALUE);
        outgoing.setLongProperty(LONG_PROP, LONG_PROP_VALUE);
        outgoing.setFloatProperty(FLOAT_PROP, FLOAT_PROP_VALUE);
        outgoing.setDoubleProperty(DOUBLE_PROP, DOUBLE_PROP_VALUE);

        producer.send(outgoing);

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Receives a message whose application-properties section holds one entry of each
 * kind declared in this class's constants and verifies that every entry is reported
 * by {@code propertyExists} and is returned with the expected value by the matching
 * typed JMS property getter.
 */
@Test(timeout = 2000)
public void testReceiveMessageWithApplicationProperties() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT)) {
        Connection connection = testFixture.establishConnecton(testPeer);
        connection.start();

        testPeer.expectBegin(true);

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("myQueue");

        PropertiesDescribedType props = new PropertiesDescribedType();
        props.setMessageId("myMessageIDString");

        // Application properties the peer will include in the transfer it sends.
        ApplicationPropertiesDescribedType appProperties = new ApplicationPropertiesDescribedType();
        appProperties.setApplicationProperty(STRING_PROP, STRING_PROP_VALUE);
        appProperties.setApplicationProperty(NULL_STRING_PROP, NULL_STRING_PROP_VALUE);
        appProperties.setApplicationProperty(BOOLEAN_PROP, BOOLEAN_PROP_VALUE);
        appProperties.setApplicationProperty(BYTE_PROP, BYTE_PROP_VALUE);
        appProperties.setApplicationProperty(SHORT_PROP, SHORT_PROP_VALUE);
        appProperties.setApplicationProperty(INT_PROP, INT_PROP_VALUE);
        appProperties.setApplicationProperty(LONG_PROP, LONG_PROP_VALUE);
        appProperties.setApplicationProperty(FLOAT_PROP, FLOAT_PROP_VALUE);
        appProperties.setApplicationProperty(DOUBLE_PROP, DOUBLE_PROP_VALUE);

        DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);

        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlowRespondWithTransfer(null, null, props, appProperties, amqpValueNullContent);
        testPeer.expectDispositionThatIsAcceptedAndSettled();

        MessageConsumer messageConsumer = session.createConsumer(queue);
        Message receivedMessage = messageConsumer.receive(1000);
        testPeer.waitForAllHandlersToComplete(3000);

        // Fail with an assertion (as the sibling tests do) instead of a
        // NullPointerException if receive(1000) timed out without a message.
        assertNotNull("Expected a message to be received", receivedMessage);

        assertTrue(receivedMessage.propertyExists(STRING_PROP));
        assertTrue(receivedMessage.propertyExists(NULL_STRING_PROP));
        assertTrue(receivedMessage.propertyExists(BYTE_PROP));
        assertTrue(receivedMessage.propertyExists(BOOLEAN_PROP));
        assertTrue(receivedMessage.propertyExists(SHORT_PROP));
        assertTrue(receivedMessage.propertyExists(INT_PROP));
        assertTrue(receivedMessage.propertyExists(LONG_PROP));
        assertTrue(receivedMessage.propertyExists(FLOAT_PROP));
        assertTrue(receivedMessage.propertyExists(DOUBLE_PROP));

        assertNull(receivedMessage.getStringProperty(NULL_STRING_PROP));
        assertEquals(STRING_PROP_VALUE, receivedMessage.getStringProperty(STRING_PROP));
        assertEquals(BOOLEAN_PROP_VALUE, receivedMessage.getBooleanProperty(BOOLEAN_PROP));
        assertEquals(BYTE_PROP_VALUE, receivedMessage.getByteProperty(BYTE_PROP));
        assertEquals(SHORT_PROP_VALUE, receivedMessage.getShortProperty(SHORT_PROP));
        assertEquals(INT_PROP_VALUE, receivedMessage.getIntProperty(INT_PROP));
        assertEquals(LONG_PROP_VALUE, receivedMessage.getLongProperty(LONG_PROP));
        // Delta of 0.0: the values are expected to round-trip exactly.
        assertEquals(FLOAT_PROP_VALUE, receivedMessage.getFloatProperty(FLOAT_PROP), 0.0);
        assertEquals(DOUBLE_PROP_VALUE, receivedMessage.getDoubleProperty(DOUBLE_PROP), 0.0);
    }
}
//==== Destination Handling ====
//==============================
/**
 * Verifies that when an incoming message has no 'to' in its Properties section (e.g.
 * one sent by a non-JMS client), JMSDestination returns the Queue that was used to
 * create the consumer which received the message.
 */
@Test(timeout = 2000)
public void testReceivedMessageFromQueueWithoutToResultsInUseOfConsumerDestinationQueue() throws Exception {
    final boolean consumeFromQueue = true;
    receivedMessageFromQueueWithoutToResultsInUseOfConsumerDestinationImpl(consumeFromQueue);
}
/**
 * Verifies that when an incoming message has no 'to' in its Properties section (e.g.
 * one sent by a non-JMS client), JMSDestination returns the Topic that was used to
 * create the consumer which received the message. Despite "FromQueue" in the method
 * name, this variant consumes from a Topic.
 */
@Test(timeout = 2000)
public void testReceivedMessageFromQueueWithoutToResultsInUseOfConsumerDestinationTopic() throws Exception {
    final boolean consumeFromQueue = false;
    receivedMessageFromQueueWithoutToResultsInUseOfConsumerDestinationImpl(consumeFromQueue);
}
/**
 * Shared body of the "message without 'to'" tests. Creates a consumer on either a
 * Queue or a Topic, has the peer deliver a message whose Properties section carries
 * only a message-id, and asserts that the received message's JMSDestination is of the
 * consumer's destination kind and has the consumer destination's name.
 *
 * @param useQueue {@code true} to consume from the queue "myQueue", {@code false} to
 *                 consume from the topic "myTopic"
 * @throws Exception if any step of the test fails unexpectedly
 */
public void receivedMessageFromQueueWithoutToResultsInUseOfConsumerDestinationImpl(boolean useQueue) throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT)) {
        Connection connection = testFixture.establishConnecton(testPeer);
        connection.start();

        testPeer.expectBegin(true);

        final String queueName = "myQueue";
        final String topicName = "myTopic";
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Destination destination = useQueue ? session.createQueue(queueName) : session.createTopic(topicName);

        // Only the message-id is populated; no 'to' is set on the properties.
        PropertiesDescribedType props = new PropertiesDescribedType();
        props.setMessageId("myMessageIDString");
        DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);

        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, amqpValueNullContent);
        testPeer.expectDispositionThatIsAcceptedAndSettled();

        MessageConsumer messageConsumer = session.createConsumer(destination);
        Message receivedMessage = messageConsumer.receive(1000);
        testPeer.waitForAllHandlersToComplete(3000);
        assertNotNull(receivedMessage);

        Destination reportedDest = receivedMessage.getJMSDestination();
        if (!useQueue) {
            assertNotNull("expected Topic instance, got null", reportedDest);
            assertTrue("expected Topic instance. Actual type was: " + reportedDest.getClass().getName(), reportedDest instanceof Topic);
            assertEquals(topicName, ((Topic) reportedDest).getTopicName());
        } else {
            assertNotNull("expected Queue instance, got null", reportedDest);
            assertTrue("expected Queue instance. Actual type was: " + reportedDest.getClass().getName(), reportedDest instanceof Queue);
            assertEquals(queueName, ((Queue) reportedDest).getQueueName());
        }
    }
}
/**
 * Tests that a connection with a 'topic prefix' set on it strips the prefix
 * from the content of the to/reply-to fields for incoming messages.
 */
@Test(timeout = 2000)
public void testReceivedMessageWithTopicDestinationsOnConnectionWithTopicPrefix() throws Exception {
    final String topicPrefix = "t12321-";
    final String topicName = "myTopic";
    final String replyTopicName = "myReplyTopic";
    final Byte topicType = AmqpDestinationHelper.TOPIC_TYPE;

    // Addresses on the wire are the names with the prefix prepended.
    doReceivedMessageOnConnectionWithPrefixTestImpl(
            Topic.class, topicPrefix, topicName, replyTopicName,
            topicPrefix + topicName, topicPrefix + replyTopicName,
            AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME, topicType,
            AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, topicType);
}
/**
 * Tests that a connection with a 'topic prefix' set on it strips the prefix
 * from the content of the to/reply-to fields for incoming messages even when
 * they carry no 'destination type annotation'.
 */
@Test(timeout = 2000)
public void testReceivedMessageWithNoTypeAnnotationAndTopicDestinationsOnConnectionWithTopicPrefix() throws Exception {
    final String topicPrefix = "t12321-";
    final String topicName = "myTopic";
    final String replyTopicName = "myReplyTopic";

    // No type annotations at all: neither a name nor a value for to or reply-to.
    final String noAnnotationName = null;
    final Byte noAnnotationValue = null;

    doReceivedMessageOnConnectionWithPrefixTestImpl(
            Topic.class, topicPrefix, topicName, replyTopicName,
            topicPrefix + topicName, topicPrefix + replyTopicName,
            noAnnotationName, noAnnotationValue,
            noAnnotationName, noAnnotationValue);
}
/**
 * Tests that a connection with a 'queue prefix' set on it strips the prefix
 * from the content of the to/reply-to fields for incoming messages.
 */
@Test(timeout = 2000)
public void testReceivedMessageWithQueueDestinationsOnConnectionWithQueuePrefix() throws Exception {
    final String queuePrefix = "q12321-";
    final String queueName = "myQueue";
    final String replyQueueName = "myReplyQueue";
    final Byte queueType = AmqpDestinationHelper.QUEUE_TYPE;

    // Addresses on the wire are the names with the prefix prepended.
    doReceivedMessageOnConnectionWithPrefixTestImpl(
            Queue.class, queuePrefix, queueName, replyQueueName,
            queuePrefix + queueName, queuePrefix + replyQueueName,
            AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME, queueType,
            AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, queueType);
}
/**
 * Tests that a connection with a 'queue prefix' set on it strips the prefix
 * from the content of the to/reply-to fields for incoming messages even when
 * they carry no 'destination type annotation'.
 */
@Test(timeout = 2000)
public void testReceivedMessageWithNoTypeAnnotationAndQueueDestinationsOnConnectionWithQueuePrefix() throws Exception {
    final String queuePrefix = "q12321-";
    final String queueName = "myQueue";
    final String replyQueueName = "myReplyQueue";

    // No type annotations at all: neither a name nor a value for to or reply-to.
    final String noAnnotationName = null;
    final Byte noAnnotationValue = null;

    doReceivedMessageOnConnectionWithPrefixTestImpl(
            Queue.class, queuePrefix, queueName, replyQueueName,
            queuePrefix + queueName, queuePrefix + replyQueueName,
            noAnnotationName, noAnnotationValue,
            noAnnotationName, noAnnotationValue);
}
/**
 * Tests that a connection with destination prefixes set on it does not alter the
 * address of a temporary queue in the to/reply-to fields for incoming messages.
 */
@Test(timeout = 2000)
public void testReceivedMessageWithTemporaryQueueDestinationsOnConnectionWithPrefixes() throws Exception {
    final String prefix = "q12321-";
    final String tempQueueName = "temp-queue://myTempQueue";
    final String replyTempQueueName = "temp-queue://myReplyTempQueue";
    final Byte tempQueueType = AmqpDestinationHelper.TEMP_QUEUE_TYPE;

    // We won't manipulate the temporary addresses generated by the broker,
    // so each address is passed as identical to its name.
    doReceivedMessageOnConnectionWithPrefixTestImpl(
            TemporaryQueue.class, prefix, tempQueueName, replyTempQueueName,
            tempQueueName, replyTempQueueName,
            AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME, tempQueueType,
            AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, tempQueueType);
}
/**
 * Tests that a connection with destination prefixes set on it does not alter the
 * address of a temporary topic in the to/reply-to fields for incoming messages.
 */
@Test(timeout = 2000)
public void testReceivedMessageWithTemporaryTopicDestinationsOnConnectionWithPrefixes() throws Exception {
    final String prefix = "q12321-";
    final String tempTopicName = "temp-topic://myTempTopic";
    final String replyTempTopicName = "temp-topic://myReplyTempTopic";
    final Byte tempTopicType = AmqpDestinationHelper.TEMP_TOPIC_TYPE;

    // We won't manipulate the temporary addresses generated by the broker,
    // so each address is passed as identical to its name.
    doReceivedMessageOnConnectionWithPrefixTestImpl(
            TemporaryTopic.class, prefix, tempTopicName, replyTempTopicName,
            tempTopicName, replyTempTopicName,
            AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME, tempTopicType,
            AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, tempTopicType);
}
/**
 * Shared body of the "received message on a connection with destination prefix(es)"
 * tests. Establishes a connection configured with the prefix, creates a consumer on a
 * destination of the requested kind, has the peer deliver a message whose to/reply-to
 * carry the given addresses (plus optional type annotations), and asserts that the
 * names reported by JMSDestination/JMSReplyTo equal the expected names.
 *
 * @param destType             Topic, Queue, TemporaryTopic or TemporaryQueue class
 * @param destPrefix           prefix configured on the connection
 * @param destName             expected name of the received JMSDestination
 * @param replyName            expected name of the received JMSReplyTo
 * @param destAddress          address placed in 'to' and expected on the receiver's source
 * @param replyAddress         address placed in 'reply-to'
 * @param annotationName       symbol name of the destination type annotation, or null for none
 * @param annotationValue      value of the destination type annotation
 * @param replyAnnotationName  symbol name of the reply-to type annotation, or null for none
 * @param replyAnnotationValue value of the reply-to type annotation
 * @throws Exception if any step of the test fails unexpectedly
 */
private void doReceivedMessageOnConnectionWithPrefixTestImpl(Class<? extends Destination> destType,
                                                             String destPrefix,
                                                             String destName,
                                                             String replyName,
                                                             String destAddress,
                                                             String replyAddress,
                                                             String annotationName,
                                                             Object annotationValue,
                                                             String replyAnnotationName,
                                                             Object replyAnnotationValue) throws Exception {
    final boolean plainTopic = destType == Topic.class;
    final boolean plainQueue = destType == Queue.class;
    final boolean tempTopic = destType == TemporaryTopic.class;
    final boolean tempQueue = destType == TemporaryQueue.class;

    try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT)) {
        final String uriOptions;
        if (plainTopic) {
            uriOptions = "?jms.topicPrefix=" + destPrefix;
        } else if (plainQueue) {
            uriOptions = "?jms.queuePrefix=" + destPrefix;
        } else {
            //Set both the non-temporary prefixes, we wont use non-temp dests but want to ensure they don't affect anything
            uriOptions = "?jms.topicPrefix=" + destPrefix + "&jms.queuePrefix=" + destPrefix;
        }
        Connection connection = testFixture.establishConnecton(testPeer, uriOptions);
        connection.start();

        // Set the prefix if Topic or Queue dest type.
        if (plainTopic) {
            ((JmsConnection) connection).setTopicPrefix(destPrefix);
        } else if (plainQueue) {
            ((JmsConnection) connection).setQueuePrefix(destPrefix);
        }

        testPeer.expectBegin(true);
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the destination
        Destination dest = null;
        if (plainTopic) {
            dest = session.createTopic(destName);
        } else if (plainQueue) {
            dest = session.createQueue(destName);
        } else if (tempTopic || tempQueue) {
            //TODO:add method to expect temp topic creation
            testPeer.expectTempQueueCreationAttach(destAddress);
            dest = tempTopic ? session.createTemporaryTopic() : session.createTemporaryQueue();
        }

        // Message annotations are only created when at least one annotation name is given.
        final boolean hasDestAnnotation = annotationName != null;
        final boolean hasReplyAnnotation = replyAnnotationName != null;
        MessageAnnotationsDescribedType msgAnnotations = null;
        if (hasDestAnnotation || hasReplyAnnotation) {
            msgAnnotations = new MessageAnnotationsDescribedType();
            if (hasDestAnnotation) {
                msgAnnotations.setSymbolKeyedAnnotation(annotationName, annotationValue);
            }
            if (hasReplyAnnotation) {
                msgAnnotations.setSymbolKeyedAnnotation(replyAnnotationName, replyAnnotationValue);
            }
        }

        PropertiesDescribedType props = new PropertiesDescribedType();
        props.setTo(destAddress);
        props.setReplyTo(replyAddress);
        DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);

        SourceMatcher sourceMatcher = new SourceMatcher();
        sourceMatcher.withAddress(equalTo(destAddress));

        testPeer.expectReceiverAttach(notNullValue(), sourceMatcher);
        testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, props, null, amqpValueNullContent);
        testPeer.expectDispositionThatIsAcceptedAndSettled();

        MessageConsumer messageConsumer = session.createConsumer(dest);
        Message receivedMessage = messageConsumer.receive(1000);
        testPeer.waitForAllHandlersToComplete(2000);
        assertNotNull(receivedMessage);

        Destination jmsDest = receivedMessage.getJMSDestination();
        Destination jmsReplyTo = receivedMessage.getJMSReplyTo();
        assertNotNull("Expected JMSDestination but got null", jmsDest);
        assertNotNull("Expected JMSReplyTo but got null", jmsReplyTo);

        // Verify destination/replyto names on received message
        String receivedName = null;
        String receivedReplyName = null;
        if (plainTopic || tempTopic) {
            receivedName = ((Topic) jmsDest).getTopicName();
            receivedReplyName = ((Topic) jmsReplyTo).getTopicName();
        } else if (plainQueue || tempQueue) {
            receivedName = ((Queue) jmsDest).getQueueName();
            receivedReplyName = ((Queue) jmsReplyTo).getQueueName();
        }
        assertEquals("Unexpected name for JMSDestination", destName, receivedName);
        assertEquals("Unexpected name for JMSReplyTo", replyName, receivedReplyName);

        if (tempQueue || tempTopic) {
            assertEquals("Temporary destination name and address should be equal", destName, destAddress);
            assertEquals("Temporary replyto name and address should be equal", replyName, replyAddress);
        }
    }
}
/**
 * Tests that a connection with a 'topic prefix' set on it adds the prefix
 * to the content of the to/reply-to fields for outgoing messages.
 */
@Test(timeout = 2000)
public void testSendMessageWithTopicDestinationsOnConnectionWithTopicPrefix() throws Exception {
    final String topicPrefix = "t12321-";
    final String topicName = "myTopic";
    final Byte topicType = AmqpDestinationHelper.TOPIC_TYPE;

    // The expected wire address is the topic name with the prefix prepended.
    doSendMessageOnConnectionWithPrefixTestImpl(Topic.class, topicPrefix, topicName, topicPrefix + topicName, topicType);
}
/**
 * Tests that a connection with a 'queue prefix' set on it adds the prefix
 * to the content of the to/reply-to fields for outgoing messages.
 */
@Test(timeout = 2000)
public void testSendMessageWithQueueDestinationsOnConnectionWithQueuePrefix() throws Exception {
    final String queuePrefix = "q12321-";
    final String queueName = "myQueue";
    final Byte queueType = AmqpDestinationHelper.QUEUE_TYPE;

    // The expected wire address is the queue name with the prefix prepended.
    doSendMessageOnConnectionWithPrefixTestImpl(Queue.class, queuePrefix, queueName, queuePrefix + queueName, queueType);
}
/**
 * Tests that a connection with 'destination prefixes' set on it does not add the
 * prefix to the content of the to/reply-to fields for TemporaryQueues.
 */
@Test(timeout = 2000)
public void testSendMessageWithTemporaryQueueDestinationsOnConnectionWithDestinationPrefixes() throws Exception {
    final String prefix = "q12321-";
    // No name is supplied for the temporary destination; only its address is given.
    final String noName = null;
    final String tempQueueAddress = "temp-queue://myTempQueue";
    final Byte tempQueueType = AmqpDestinationHelper.TEMP_QUEUE_TYPE;

    doSendMessageOnConnectionWithPrefixTestImpl(TemporaryQueue.class, prefix, noName, tempQueueAddress, tempQueueType);
}
/**
 * Tests that a connection with 'destination prefixes' set on it does not add the
 * prefix to the content of the to/reply-to fields for TemporaryTopics.
 */
@Test(timeout = 2000)
public void testSendMessageWithTemporaryTopicDestinationsOnConnectionWithDestinationPrefixes() throws Exception {
    final String prefix = "q12321-";
    // No name is supplied for the temporary destination; only its address is given.
    final String noName = null;
    final String tempTopicAddress = "temp-topic://myTempTopic";
    final Byte tempTopicType = AmqpDestinationHelper.TEMP_TOPIC_TYPE;

    doSendMessageOnConnectionWithPrefixTestImpl(TemporaryTopic.class, prefix, noName, tempTopicAddress, tempTopicType);
}
/**
 * Shared body of the "send message on a connection with destination prefix(es)" tests.
 * Establishes a connection configured with the prefix, creates a producer on a
 * destination of the requested kind, sends a message whose JMSReplyTo is that same
 * destination, and has the peer match the sender's target address and the transfer's
 * to/reply-to and destination type annotations.
 *
 * @param destType                Topic, Queue, TemporaryTopic or TemporaryQueue class
 * @param destPrefix              prefix configured on the connection
 * @param destName                name used to create a Topic or Queue; not used for temporary kinds
 * @param destAddress             address expected on the target and in to/reply-to
 * @param destTypeAnnotationValue value expected for both the to and reply-to type annotations
 * @throws Exception if any step of the test fails unexpectedly
 */
private void doSendMessageOnConnectionWithPrefixTestImpl(Class<? extends Destination> destType,
                                                         String destPrefix,
                                                         String destName,
                                                         String destAddress,
                                                         Byte destTypeAnnotationValue) throws Exception {
    final boolean plainTopic = destType == Topic.class;
    final boolean plainQueue = destType == Queue.class;
    final boolean tempTopic = destType == TemporaryTopic.class;
    final boolean tempQueue = destType == TemporaryQueue.class;

    try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT)) {
        final String uriOptions;
        if (plainTopic) {
            uriOptions = "?jms.topicPrefix=" + destPrefix;
        } else if (plainQueue) {
            uriOptions = "?jms.queuePrefix=" + destPrefix;
        } else {
            // Set both the non-temporary prefixes, we wont use non-temp dests but want to ensure they don't affect anything
            uriOptions = "?jms.topicPrefix=" + destPrefix + "&jms.queuePrefix=" + destPrefix;
        }
        Connection connection = testFixture.establishConnecton(testPeer, uriOptions);
        connection.start();

        // Set the prefix if Topic or Queue dest type.
        if (plainTopic) {
            ((JmsConnection) connection).setTopicPrefix(destPrefix);
        } else if (plainQueue) {
            ((JmsConnection) connection).setQueuePrefix(destPrefix);
        }

        testPeer.expectBegin(true);
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the destination
        Destination dest = null;
        if (plainTopic) {
            dest = session.createTopic(destName);
        } else if (plainQueue) {
            dest = session.createQueue(destName);
        } else if (tempTopic || tempQueue) {
            // TODO:add method to expect temp topic creation
            testPeer.expectTempQueueCreationAttach(destAddress);
            dest = tempTopic ? session.createTemporaryTopic() : session.createTemporaryQueue();
        }

        TargetMatcher targetMatcher = new TargetMatcher();
        targetMatcher.withAddress(equalTo(destAddress));
        testPeer.expectSenderAttach(targetMatcher, false, false);

        MessageProducer producer = session.createProducer(dest);

        // The destination doubles as the reply-to, so both annotations and both
        // address fields are expected to carry the same values.
        Symbol destTypeKey = Symbol.valueOf(AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
        Symbol replyToTypeKey = Symbol.valueOf(AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
        MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        msgAnnotationsMatcher.withEntry(destTypeKey, equalTo(destTypeAnnotationValue));
        msgAnnotationsMatcher.withEntry(replyToTypeKey, equalTo(destTypeAnnotationValue));

        MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
        propsMatcher.withTo(equalTo(destAddress));
        propsMatcher.withReplyTo(equalTo(destAddress));

        MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);

        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(headersMatcher);
        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
        messageMatcher.setPropertiesMatcher(propsMatcher);
        //TODO: currently we aren't sending any body section, decide if this is allowed
        //messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
        testPeer.expectTransfer(messageMatcher);

        Message outgoing = session.createMessage();
        outgoing.setJMSReplyTo(dest);

        producer.send(outgoing);

        testPeer.waitForAllHandlersToComplete(2000);
    }
}
/**
 * Tests that a connection whose destination prefixes come from broker-provided
 * connection properties strips the prefix from the to/reply-to fields for
 * incoming messages with Topic destinations.
 */
@Test(timeout = 2000)
public void testReceivedMessageWithTopicDestinationsOnConnectionWithBrokerDefinedPrefixProperties() throws Exception {
    final String brokerPrefix = "t-broker-provided-prefix-";
    final String topicName = "myTopic";
    final String replyTopicName = "myReplyTopic";
    final Byte topicType = AmqpDestinationHelper.TOPIC_TYPE;

    // Addresses on the wire are the names with the broker-provided prefix prepended.
    doReceivedMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(
            Topic.class, brokerPrefix, topicName, replyTopicName,
            brokerPrefix + topicName, brokerPrefix + replyTopicName,
            AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME, topicType,
            AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, topicType);
}
/**
 * Tests that a connection whose destination prefixes come from broker-provided
 * connection properties strips the prefix from the to/reply-to fields for
 * incoming messages with Queue destinations.
 */
@Test(timeout = 2000)
public void testReceivedMessageWithQueueDestinationsOnConnectionWithBrokerDefinedPrefixProperties() throws Exception {
    final String brokerPrefix = "q-broker-provided-prefix-";
    final String queueName = "myQueue";
    final String replyQueueName = "myReplyQueue";
    final Byte queueType = AmqpDestinationHelper.QUEUE_TYPE;

    // Addresses on the wire are the names with the broker-provided prefix prepended.
    doReceivedMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(
            Queue.class, brokerPrefix, queueName, replyQueueName,
            brokerPrefix + queueName, brokerPrefix + replyQueueName,
            AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME, queueType,
            AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, queueType);
}
/**
 * Shared body for the received-message tests that run against a peer advertising destination
 * prefixes in its connection properties.
 * <p>
 * The peer sends a single message whose 'to' and 'reply-to' fields hold the prefixed addresses.
 * The test then asserts that the names of the resulting JMSDestination and JMSReplyTo equal the
 * un-prefixed names.
 *
 * @param destType either {@code Topic.class} or {@code Queue.class}; anything else fails the test
 * @param destPrefix prefix advertised by the peer for both queues and topics
 * @param destName un-prefixed name used to create the consumer destination, and the expected JMSDestination name
 * @param replyName un-prefixed name expected for JMSReplyTo
 * @param destAddress prefixed address expected as the receiver link source and placed in the message 'to' field
 * @param replyAddress prefixed address placed in the message 'reply-to' field
 * @param annotationName key of the message annotation describing the 'to' type, or null to omit it
 * @param annotationValue value stored under {@code annotationName}
 * @param replyAnnotationName key of the message annotation describing the 'reply-to' type, or null to omit it
 * @param replyAnnotationValue value stored under {@code replyAnnotationName}
 * @throws Exception if any step of the scenario fails
 */
private void doReceivedMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(Class<? extends Destination> destType,
                                                                                      String destPrefix,
                                                                                      String destName,
                                                                                      String replyName,
                                                                                      String destAddress,
                                                                                      String replyAddress,
                                                                                      String annotationName,
                                                                                      Object annotationValue,
                                                                                      String replyAnnotationName,
                                                                                      Object replyAnnotationValue) throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT)) {
        // The peer advertises the same prefix for queues and for topics.
        Map<Symbol, Object> connectionProperties = new HashMap<Symbol, Object>();
        connectionProperties.put(AmqpConnectionProperties.QUEUE_PREFIX, destPrefix);
        connectionProperties.put(AmqpConnectionProperties.TOPIC_PREFIX, destPrefix);

        Connection connection = testFixture.establishConnecton(testPeer, null, null, connectionProperties);
        connection.start();

        testPeer.expectBegin(true);
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Only the two non-temporary destination kinds are supported here.
        final boolean useTopic = destType == Topic.class;
        final boolean useQueue = destType == Queue.class;
        if (!useTopic && !useQueue) {
            fail("non-temporary destination type set");
        }

        Destination consumerDestination;
        if (useTopic) {
            consumerDestination = session.createTopic(destName);
        } else {
            consumerDestination = session.createQueue(destName);
        }

        // Build the annotations section only when at least one type annotation was requested.
        final boolean annotateTo = annotationName != null;
        final boolean annotateReplyTo = replyAnnotationName != null;
        MessageAnnotationsDescribedType annotations = null;
        if (annotateTo || annotateReplyTo) {
            annotations = new MessageAnnotationsDescribedType();
        }
        if (annotateTo) {
            annotations.setSymbolKeyedAnnotation(annotationName, annotationValue);
        }
        if (annotateReplyTo) {
            annotations.setSymbolKeyedAnnotation(replyAnnotationName, replyAnnotationValue);
        }

        // The delivered message carries the prefixed addresses.
        PropertiesDescribedType messageProperties = new PropertiesDescribedType();
        messageProperties.setTo(destAddress);
        messageProperties.setReplyTo(replyAddress);
        DescribedType nullBody = new AmqpValueDescribedType(null);

        // The receiver link must be attached using the prefixed address.
        SourceMatcher expectedSource = new SourceMatcher();
        expectedSource.withAddress(equalTo(destAddress));

        testPeer.expectReceiverAttach(notNullValue(), expectedSource);
        testPeer.expectLinkFlowRespondWithTransfer(null, annotations, messageProperties, null, nullBody);
        testPeer.expectDispositionThatIsAcceptedAndSettled();

        MessageConsumer consumer = session.createConsumer(consumerDestination);
        Message received = consumer.receive(1000);
        testPeer.waitForAllHandlersToComplete(2000);

        assertNotNull(received);

        Destination actualDestination = received.getJMSDestination();
        Destination actualReplyTo = received.getJMSReplyTo();
        assertNotNull("Expected JMSDestination but got null", actualDestination);
        assertNotNull("Expected JMSReplyTo but got null", actualReplyTo);

        // Extract the names through the interface matching the requested destination kind.
        final String actualName;
        final String actualReplyName;
        if (useTopic) {
            actualName = ((Topic) actualDestination).getTopicName();
            actualReplyName = ((Topic) actualReplyTo).getTopicName();
        } else {
            actualName = ((Queue) actualDestination).getQueueName();
            actualReplyName = ((Queue) actualReplyTo).getQueueName();
        }

        assertEquals("Unexpected name for JMSDestination", destName, actualName);
        assertEquals("Unexpected name for JMSReplyTo", replyName, actualReplyName);
    }
}
/**
 * Runs the shared send scenario for a Queue: with a destination prefix supplied by the peer's
 * connection properties, the sender link target and the to/reply-to fields of the outgoing
 * message are expected to contain the prefixed queue address.
 */
@Test(timeout = 2000)
public void testSendMessageWithQueueDestinationsOnConnectionWithBrokerDefinedPrefixProperties() throws Exception {
    final String prefix = "q-broker-provided-prefix-";
    final String queueName = "myQueue";
    Byte queueTypeValue = AmqpDestinationHelper.QUEUE_TYPE;

    doSendMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(
        Queue.class, prefix, queueName, prefix + queueName, queueTypeValue);
}
/**
 * Runs the shared send scenario for a Topic: with a destination prefix supplied by the peer's
 * connection properties, the sender link target and the to/reply-to fields of the outgoing
 * message are expected to contain the prefixed topic address.
 */
@Test(timeout = 2000)
public void testSendMessageWithTopicDestinationsOnConnectionWithBrokerDefinedPrefixProperties() throws Exception {
    final String prefix = "t-broker-provided-prefix-";
    final String topicName = "myTopic";
    Byte topicTypeValue = AmqpDestinationHelper.TOPIC_TYPE;

    doSendMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(
        Topic.class, prefix, topicName, prefix + topicName, topicTypeValue);
}
/**
 * Shared body for the send tests that run against a peer advertising destination prefixes in
 * its connection properties.
 * <p>
 * A producer is created on the un-prefixed destination and a message is sent with that same
 * destination as its JMSReplyTo. The peer expects the sender link target, the 'to' field and
 * the 'reply-to' field to all hold the prefixed address, and both destination type annotations
 * to hold the given type value.
 *
 * @param destType either {@code Topic.class} or {@code Queue.class}; anything else fails the test
 * @param destPrefix prefix advertised by the peer for both queues and topics
 * @param destName un-prefixed name used to create the destination
 * @param destAddress prefixed address expected on the wire
 * @param destTypeAnnotationValue expected value of the 'to' and 'reply-to' type annotations
 * @throws Exception if any step of the scenario fails
 */
private void doSendMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(Class<? extends Destination> destType,
                                                                                  String destPrefix,
                                                                                  String destName,
                                                                                  String destAddress,
                                                                                  Byte destTypeAnnotationValue) throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT)) {
        // The peer advertises the same prefix for queues and for topics.
        Map<Symbol, Object> connectionProperties = new HashMap<Symbol, Object>();
        connectionProperties.put(AmqpConnectionProperties.QUEUE_PREFIX, destPrefix);
        connectionProperties.put(AmqpConnectionProperties.TOPIC_PREFIX, destPrefix);

        Connection connection = testFixture.establishConnecton(testPeer, null, null, connectionProperties);
        connection.start();

        testPeer.expectBegin(true);
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Only the two non-temporary destination kinds are supported here.
        final boolean useTopic = destType == Topic.class;
        if (!useTopic && destType != Queue.class) {
            fail("non-temporary destination type set");
        }

        Destination destination;
        if (useTopic) {
            destination = session.createTopic(destName);
        } else {
            destination = session.createQueue(destName);
        }

        // The sender link must target the prefixed address.
        TargetMatcher expectedTarget = new TargetMatcher();
        expectedTarget.withAddress(equalTo(destAddress));
        testPeer.expectSenderAttach(expectedTarget, false, false);

        MessageProducer producer = session.createProducer(destination);

        // Expected sections of the transferred message.
        MessageHeaderSectionMatcher expectedHeader = new MessageHeaderSectionMatcher(true);

        Symbol toTypeKey = Symbol.valueOf(AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
        Symbol replyToTypeKey = Symbol.valueOf(AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
        MessageAnnotationsSectionMatcher expectedAnnotations = new MessageAnnotationsSectionMatcher(true);
        expectedAnnotations.withEntry(toTypeKey, equalTo(destTypeAnnotationValue));
        expectedAnnotations.withEntry(replyToTypeKey, equalTo(destTypeAnnotationValue));

        MessagePropertiesSectionMatcher expectedProperties = new MessagePropertiesSectionMatcher(true);
        expectedProperties.withTo(equalTo(destAddress));
        expectedProperties.withReplyTo(equalTo(destAddress));

        TransferPayloadCompositeMatcher expectedPayload = new TransferPayloadCompositeMatcher();
        expectedPayload.setHeadersMatcher(expectedHeader);
        expectedPayload.setMessageAnnotationsMatcher(expectedAnnotations);
        expectedPayload.setPropertiesMatcher(expectedProperties);
        //TODO: currently we aren't sending any body section, decide if this is allowed
        //messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
        testPeer.expectTransfer(expectedPayload);

        // The same destination doubles as the reply-to.
        Message outgoing = session.createMessage();
        outgoing.setJMSReplyTo(destination);
        producer.send(outgoing);

        testPeer.waitForAllHandlersToComplete(2000);
    }
}
// --- byte type annotation values --- //
/**
 * Verifies that a message sent to a Topic carries the
 * {@link AmqpDestinationHelper#JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME} message annotation with
 * the value {@link AmqpDestinationHelper#TOPIC_TYPE}, marking its 'to' address as a Topic.
 */
@Test(timeout = 5000)
public void testSentMessageContainsToTypeAnnotationByte() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer(IntegrationTestFixture.PORT)) {
        Connection connection = testFixture.establishConnecton(peer);
        peer.expectBegin(true);
        peer.expectSenderAttach();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final String topicName = "myTopic";

        // Header: the durable flag is expected to be set.
        MessageHeaderSectionMatcher expectedHeader = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));

        // Annotations: the 'to' type annotation must indicate a topic.
        Symbol toTypeKey = AmqpMessageSupport.getSymbol(AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
        MessageAnnotationsSectionMatcher expectedAnnotations = new MessageAnnotationsSectionMatcher(true);
        expectedAnnotations.withEntry(toTypeKey, equalTo(AmqpDestinationHelper.TOPIC_TYPE));

        // Properties: 'to' holds the topic name.
        MessagePropertiesSectionMatcher expectedProperties = new MessagePropertiesSectionMatcher(true).withTo(equalTo(topicName));

        TransferPayloadCompositeMatcher expectedPayload = new TransferPayloadCompositeMatcher();
        expectedPayload.setHeadersMatcher(expectedHeader);
        expectedPayload.setMessageAnnotationsMatcher(expectedAnnotations);
        expectedPayload.setPropertiesMatcher(expectedProperties);
        peer.expectTransfer(expectedPayload);

        Message outgoing = session.createMessage();
        Topic topic = session.createTopic(topicName);
        session.createProducer(topic).send(outgoing);

        peer.waitForAllHandlersToComplete(2000);
    }
}
/**
 * Verifies that a message sent to a Queue with a Topic as its JMSReplyTo carries the
 * {@link AmqpDestinationHelper#JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME} message annotation
 * with the value {@link AmqpDestinationHelper#TOPIC_TYPE}, marking its 'reply-to' address as a Topic.
 */
@Test(timeout = 5000)
public void testSentMessageContainsReplyToTypeAnnotationByte() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer(IntegrationTestFixture.PORT)) {
        Connection connection = testFixture.establishConnecton(peer);
        peer.expectBegin(true);
        peer.expectSenderAttach();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final String queueName = "myQueue";
        final String replyTopicName = "myReplyTopic";

        // Header: the durable flag is expected to be set.
        MessageHeaderSectionMatcher expectedHeader = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));

        // Annotations: the 'reply-to' type annotation must indicate a topic.
        Symbol replyToTypeKey = AmqpMessageSupport.getSymbol(AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
        MessageAnnotationsSectionMatcher expectedAnnotations = new MessageAnnotationsSectionMatcher(true);
        expectedAnnotations.withEntry(replyToTypeKey, equalTo(AmqpDestinationHelper.TOPIC_TYPE));

        // Properties: 'reply-to' holds the reply topic name.
        MessagePropertiesSectionMatcher expectedProperties = new MessagePropertiesSectionMatcher(true).withReplyTo(equalTo(replyTopicName));

        TransferPayloadCompositeMatcher expectedPayload = new TransferPayloadCompositeMatcher();
        expectedPayload.setHeadersMatcher(expectedHeader);
        expectedPayload.setMessageAnnotationsMatcher(expectedAnnotations);
        expectedPayload.setPropertiesMatcher(expectedProperties);
        peer.expectTransfer(expectedPayload);

        Topic replyTopic = session.createTopic(replyTopicName);
        Message outgoing = session.createMessage();
        outgoing.setJMSReplyTo(replyTopic);

        Queue queue = session.createQueue(queueName);
        session.createProducer(queue).send(outgoing);

        peer.waitForAllHandlersToComplete(2000);
    }
}
// --- old string type annotation values --- //
/**
 * Verifies that a received message carrying the
 * {@link AmqpDestinationHelper#TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME} annotation with the topic
 * attribute value exposes a Topic as its JMSDestination. The consumer is created on a Queue so
 * that the consumer's own destination cannot be what produces the Topic result.
 */
@Test(timeout = 2000)
public void testReceivedMessageFromQueueWithToTypeAnnotationForTopic() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer(IntegrationTestFixture.PORT)) {
        Connection connection = testFixture.establishConnecton(peer);
        connection.start();

        peer.expectBegin(true);
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue consumerQueue = session.createQueue("myQueue");

        // Annotate the 'to' address as a topic.
        MessageAnnotationsDescribedType annotations = new MessageAnnotationsDescribedType();
        annotations.setSymbolKeyedAnnotation(AmqpDestinationHelper.TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, AmqpDestinationHelper.TOPIC_ATTRIBUTE);

        final String topicAddress = "myTopicAddress";
        PropertiesDescribedType properties = new PropertiesDescribedType();
        properties.setTo(topicAddress);
        properties.setMessageId("myMessageIDString");
        DescribedType nullBody = new AmqpValueDescribedType(null);

        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, annotations, properties, null, nullBody);
        peer.expectDispositionThatIsAcceptedAndSettled();

        Message received = session.createConsumer(consumerQueue).receive(1000);
        peer.waitForAllHandlersToComplete(3000);

        assertNotNull(received);

        Destination jmsDestination = received.getJMSDestination();
        assertNotNull("Expected Topic destination but got null", jmsDestination);
        assertTrue("Expected Topic instance but did not get one. Actual type was: " + jmsDestination.getClass().getName(), jmsDestination instanceof Topic);

        Topic topic = (Topic) jmsDestination;
        assertEquals(topicAddress, topic.getTopicName());
    }
}
/**
 * Tests that the {@link AmqpDestinationHelper#REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME} set on a message to
 * indicate its 'reply-to' address represents a Topic results in the JMSReplyTo object being a
 * Topic. Ensure the consumers destination is not used by consuming from a Queue.
 */
@Test(timeout = 2000)
public void testReceivedMessageFromQueueWithReplyToTypeAnnotationForTopic() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
        Connection connection = testFixture.establishConnecton(testPeer);
        connection.start();

        testPeer.expectBegin(true);
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("myQueue");

        // Annotate the 'reply-to' address as a topic.
        MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
        msgAnnotations.setSymbolKeyedAnnotation(AmqpDestinationHelper.REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, AmqpDestinationHelper.TOPIC_ATTRIBUTE);

        PropertiesDescribedType props = new PropertiesDescribedType();
        String myTopicAddress = "myTopicAddress";
        props.setReplyTo(myTopicAddress);
        props.setMessageId("myMessageIDString");
        DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);

        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, props, null, amqpValueNullContent);
        testPeer.expectDispositionThatIsAcceptedAndSettled();

        MessageConsumer messageConsumer = session.createConsumer(queue);
        Message receivedMessage = messageConsumer.receive(1000);
        testPeer.waitForAllHandlersToComplete(3000);

        assertNotNull(receivedMessage);

        Destination dest = receivedMessage.getJMSReplyTo();
        // Check for null first so a missing reply-to is reported as such, and so the
        // type-mismatch message below can safely call getClass().
        assertNotNull("Expected Topic reply-to but got null", dest);
        assertTrue("Expected Topic instance but did not get one. Actual type was: " + dest.getClass().getName(), dest instanceof Topic);
        assertEquals(myTopicAddress, ((Topic) dest).getTopicName());
    }
}
/**
 * Tests that a received message which has a 'reply-to' address but no message annotations at
 * all (and therefore nothing describing the reply-to type) exposes a JMSReplyTo of the same
 * type as the destination used to create the consumer, here a Queue.
 */
@Test(timeout = 2000)
public void testReceivedMessageFromQueueWithReplyToWithoutTypeAnnotationResultsInUseOfConsumerDestinationType() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
        Connection connection = testFixture.establishConnecton(testPeer);
        connection.start();

        testPeer.expectBegin(true);
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("myQueue");

        String myOtherQueueAddress = "myOtherQueueAddress";
        PropertiesDescribedType props = new PropertiesDescribedType();
        props.setReplyTo(myOtherQueueAddress);
        props.setMessageId("myMessageIDString");
        DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);

        testPeer.expectReceiverAttach();
        // No message annotations section is sent (second argument is null).
        testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, amqpValueNullContent);
        testPeer.expectDispositionThatIsAcceptedAndSettled();

        MessageConsumer messageConsumer = session.createConsumer(queue);
        Message receivedMessage = messageConsumer.receive(1000);
        testPeer.waitForAllHandlersToComplete(3000);

        assertNotNull(receivedMessage);

        Destination dest = receivedMessage.getJMSReplyTo();
        // Check for null first so a missing reply-to is reported as such, and so the
        // type-mismatch message below can safely call getClass().
        assertNotNull("Expected Queue reply-to but got null", dest);
        assertTrue("Expected Queue instance but did not get one. Actual type was: " + dest.getClass().getName(), dest instanceof Queue);
        assertEquals(myOtherQueueAddress, ((Queue) dest).getQueueName());
    }
}
/**
 * Verifies that a received message without a 'reply-to' field reports null from
 * getJMSReplyTo(), rather than falling back to the consumer destination.
 */
@Test(timeout = 2000)
public void testReceivedMessageFromQueueWithNoReplyToReturnsNull() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer(IntegrationTestFixture.PORT)) {
        Connection connection = testFixture.establishConnecton(peer);
        connection.start();

        peer.expectBegin(true);
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue consumerQueue = session.createQueue("myQueue");

        // Only a message-id is set; reply-to is deliberately left out.
        PropertiesDescribedType properties = new PropertiesDescribedType();
        properties.setMessageId("myMessageIDString");
        DescribedType nullBody = new AmqpValueDescribedType(null);

        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, null, properties, null, nullBody);
        peer.expectDispositionThatIsAcceptedAndSettled();

        Message received = session.createConsumer(consumerQueue).receive(1000);
        peer.waitForAllHandlersToComplete(3000);

        assertNotNull(received);
        Destination replyTo = received.getJMSReplyTo();
        assertNull(replyTo);
    }
}
//==== TTL / Expiration Handling ====
//===================================
/**
 * Verifies that a received message with neither the absolute-expiry-time nor the ttl field
 * set reports 0 for JMSExpiration.
 */
@Test(timeout = 2000)
public void testReceivedMessageFromQueueWithNoAbsoluteExpiryOrTtlReturnsJMSExpirationZero() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer(IntegrationTestFixture.PORT)) {
        Connection connection = testFixture.establishConnecton(peer);
        connection.start();

        peer.expectBegin(true);
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue consumerQueue = session.createQueue("myQueue");

        // Only a message-id is set; no header section and no expiry are sent.
        PropertiesDescribedType properties = new PropertiesDescribedType();
        properties.setMessageId("myMessageIDString");
        DescribedType nullBody = new AmqpValueDescribedType(null);

        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, null, properties, null, nullBody);
        peer.expectDispositionThatIsAcceptedAndSettled();

        Message received = session.createConsumer(consumerQueue).receive(1000);
        peer.waitForAllHandlersToComplete(3000);

        assertNotNull(received);
        long expiration = received.getJMSExpiration();
        assertEquals(0L, expiration);
    }
}
/**
 * Verifies that a received message whose absolute-expiry-time field holds a non-zero
 * timestamp reports that same timestamp for JMSExpiration.
 */
@Test(timeout = 2000)
public void testReceivedMessageFromQueueWithAbsoluteExpiryReturnsJMSExpirationNonZero() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer(IntegrationTestFixture.PORT)) {
        Connection connection = testFixture.establishConnecton(peer);
        connection.start();

        peer.expectBegin(true);
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue consumerQueue = session.createQueue("myQueue");

        // The expiry is the current wall-clock time, captured once so it can be compared later.
        final long expiryMillis = System.currentTimeMillis();

        PropertiesDescribedType properties = new PropertiesDescribedType();
        properties.setAbsoluteExpiryTime(new Date(expiryMillis));
        properties.setMessageId("myMessageIDString");
        DescribedType nullBody = new AmqpValueDescribedType(null);

        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, null, properties, null, nullBody);
        peer.expectDispositionThatIsAcceptedAndSettled();

        Message received = session.createConsumer(consumerQueue).receive(1000);
        peer.waitForAllHandlersToComplete(3000);

        assertNotNull(received);
        long expiration = received.getJMSExpiration();
        assertEquals(expiryMillis, expiration);
    }
}
//==== MessageID and CorrelationID Handling ====
//==============================================
/**
 * Tests that receiving a message which has no properties section, and therefore no
 * message-id, results in getJMSMessageID() returning null.
 */
@Test(timeout = 2000)
public void testReceiveMessageWithoutMessageId() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
        Connection connection = testFixture.establishConnecton(testPeer);
        connection.start();

        testPeer.expectBegin(true);
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("myQueue");

        DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);

        testPeer.expectReceiverAttach();
        // No properties section is sent (third argument is null), so no message-id exists.
        testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
        testPeer.expectDispositionThatIsAcceptedAndSettled();

        MessageConsumer messageConsumer = session.createConsumer(queue);
        Message receivedMessage = messageConsumer.receive(1000);
        testPeer.waitForAllHandlersToComplete(2000);

        // receive(1000) returns null on timeout; fail with an assertion instead of an NPE below.
        assertNotNull(receivedMessage);
        assertNull(receivedMessage.getJMSMessageID());
    }
}
/**
 * Runs the shared message-id receive scenario with a string typed message-id. The shared
 * helper asserts that JMSMessageID equals the JMS "ID:" prefix followed by the base id string
 * produced by {@link AmqpMessageIdHelper} for that value.
 */
@Test(timeout = 2000)
public void testReceivedMessageWithStringMessageIdReturnsExpectedJMSMessageID() throws Exception {
    final String stringMessageId = "myTestMessageIdString";
    receivedMessageWithMessageIdTestImpl(stringMessageId);
}
/**
 * Runs the shared message-id receive scenario with a UUID typed message-id. The shared helper
 * asserts that JMSMessageID equals the JMS "ID:" prefix followed by the base id string
 * produced by {@link AmqpMessageIdHelper} for that UUID.
 */
@Test(timeout = 2000)
public void testReceivedMessageWithUUIDMessageIdReturnsExpectedJMSMessageID() throws Exception {
    final UUID uuidMessageId = UUID.randomUUID();
    receivedMessageWithMessageIdTestImpl(uuidMessageId);
}
/**
 * Runs the shared message-id receive scenario with a ulong typed message-id. The shared helper
 * asserts that JMSMessageID equals the JMS "ID:" prefix followed by the base id string
 * produced by {@link AmqpMessageIdHelper} for that UnsignedLong.
 */
@Test(timeout = 2000)
public void testReceivedMessageWithUnsignedLongMessageIdReturnsExpectedJMSMessageID() throws Exception {
    final UnsignedLong ulongMessageId = UnsignedLong.valueOf(123456789L);
    receivedMessageWithMessageIdTestImpl(ulongMessageId);
}
/**
 * Runs the shared message-id receive scenario with a binary typed message-id. The shared
 * helper asserts that JMSMessageID equals the JMS "ID:" prefix followed by the base id string
 * produced by {@link AmqpMessageIdHelper} for that Binary.
 */
@Test(timeout = 2000)
public void testReceivedMessageWithBinaryMessageIdReturnsExpectedJMSMessageID() throws Exception {
    final byte[] idBytes = new byte[] { (byte) 0x02, (byte) 0x20, (byte) 0xAE, (byte) 0x00 };
    final Binary binaryMessageId = new Binary(idBytes);
    receivedMessageWithMessageIdTestImpl(binaryMessageId);
}
/**
 * Shared body for the message-id receive tests. The peer delivers one message whose
 * message-id is the given value, and the test asserts that getJMSMessageID() returns "ID:"
 * followed by {@link AmqpMessageIdHelper#toBaseMessageIdString(Object)} of that value.
 *
 * @param underlyingAmqpMessageId the message-id object placed in the AMQP properties section
 * @throws Exception if any step of the scenario fails
 */
private void receivedMessageWithMessageIdTestImpl(Object underlyingAmqpMessageId) throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer(IntegrationTestFixture.PORT)) {
        Connection connection = testFixture.establishConnecton(peer);
        connection.start();

        peer.expectBegin(true);
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue consumerQueue = session.createQueue("myQueue");

        PropertiesDescribedType properties = new PropertiesDescribedType();
        properties.setMessageId(underlyingAmqpMessageId);
        DescribedType nullBody = new AmqpValueDescribedType(null);

        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, null, properties, null, nullBody);
        peer.expectDispositionThatIsAcceptedAndSettled();

        Message received = session.createConsumer(consumerQueue).receive(1000);
        peer.waitForAllHandlersToComplete(3000);

        assertNotNull(received);

        AmqpMessageIdHelper idHelper = new AmqpMessageIdHelper();
        String expectedJmsMessageId = "ID:" + idHelper.toBaseMessageIdString(underlyingAmqpMessageId);
        assertEquals(expectedJmsMessageId, received.getJMSMessageID());
    }
}
/**
 * Runs the shared correlation-id receive scenario with a string typed correlation-id that is
 * not flagged as application-specific. The shared helper then expects JMSCorrelationID to
 * carry the JMS "ID:" prefix.
 */
@Test(timeout = 2000)
public void testReceivedMessageWithStringCorrelationIdReturnsExpectedJMSCorrelationID() throws Exception {
    final String correlationId = "myTestCorrelationIdString";
    final boolean appSpecific = false;
    receivedMessageWithCorrelationIdTestImpl(correlationId, appSpecific);
}
/**
 * Runs the shared correlation-id receive scenario with a string typed correlation-id that is
 * flagged as application-specific. The shared helper then expects JMSCorrelationID to be
 * returned WITHOUT the JMS "ID:" prefix.
 */
@Test(timeout = 2000)
public void testReceivedMessageWithAppSpecificStringCorrelationIdReturnsExpectedJMSCorrelationID() throws Exception {
    final String correlationId = "myTestCorrelationIdString";
    final boolean appSpecific = true;
    receivedMessageWithCorrelationIdTestImpl(correlationId, appSpecific);
}
/**
 * Runs the shared correlation-id receive scenario with a UUID typed correlation-id that is
 * not flagged as application-specific. The shared helper then expects JMSCorrelationID to be
 * the JMS "ID:" prefix followed by the base id string {@link AmqpMessageIdHelper} produces
 * for that UUID.
 */
@Test(timeout = 2000)
public void testReceivedMessageWithUUIDCorrelationIdReturnsExpectedJMSCorrelationID() throws Exception {
    final UUID correlationId = UUID.randomUUID();
    final boolean appSpecific = false;
    receivedMessageWithCorrelationIdTestImpl(correlationId, appSpecific);
}
/**
 * Runs the shared correlation-id receive scenario with a ulong typed correlation-id that is
 * not flagged as application-specific. The shared helper then expects JMSCorrelationID to be
 * the JMS "ID:" prefix followed by the base id string {@link AmqpMessageIdHelper} produces
 * for that UnsignedLong.
 */
@Test(timeout = 2000)
public void testReceivedMessageWithLongCorrelationIdReturnsExpectedJMSCorrelationID() throws Exception {
    final UnsignedLong correlationId = UnsignedLong.valueOf(123456789L);
    final boolean appSpecific = false;
    receivedMessageWithCorrelationIdTestImpl(correlationId, appSpecific);
}
/**
 * Shared body for the correlation-id receive tests. The peer delivers one message with the
 * given correlation-id, optionally flagged as application-specific through the
 * {@code AmqpMessageSupport.JMS_APP_CORRELATION_ID} message annotation. The test asserts that
 * getJMSCorrelationID() returns the base id string from {@link AmqpMessageIdHelper}, with the
 * "ID:" prefix added only when the value is not application-specific.
 *
 * @param correlationIdForAmqpMessageClass the correlation-id object placed in the AMQP properties section
 * @param appSpecific whether to send the application-specific annotation, set to true
 * @throws Exception if any step of the scenario fails
 */
private void receivedMessageWithCorrelationIdTestImpl(Object correlationIdForAmqpMessageClass, boolean appSpecific) throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer(IntegrationTestFixture.PORT)) {
        Connection connection = testFixture.establishConnecton(peer);
        connection.start();

        peer.expectBegin(true);
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue consumerQueue = session.createQueue("myQueue");

        PropertiesDescribedType properties = new PropertiesDescribedType();
        DescribedType nullBody = new AmqpValueDescribedType(null);
        properties.setMessageId("myMessageIdString");
        properties.setCorrelationId(correlationIdForAmqpMessageClass);

        // The annotations section is sent only for application-specific correlation ids.
        MessageAnnotationsDescribedType annotations = null;
        if (appSpecific) {
            annotations = new MessageAnnotationsDescribedType();
            annotations.setSymbolKeyedAnnotation(AmqpMessageSupport.JMS_APP_CORRELATION_ID, true);
        }

        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, annotations, properties, null, nullBody);
        peer.expectDispositionThatIsAcceptedAndSettled();

        Message received = session.createConsumer(consumerQueue).receive(1000);
        peer.waitForAllHandlersToComplete(3000);

        assertNotNull(received);

        String baseId = new AmqpMessageIdHelper().toBaseMessageIdString(correlationIdForAmqpMessageClass);
        String expectedCorrelationId = appSpecific ? baseId : "ID:" + baseId;
        assertEquals(expectedCorrelationId, received.getJMSCorrelationID());
    }
}
/**
 * Runs the shared correlation-id send scenario with a JMSCorrelationID string that encodes a
 * UUID message-id (JMS id prefix, then the UUID type prefix, then the UUID text). The shared
 * helper expects the transferred correlation-id to equal the UUID object itself and the
 * app-specific correlation-id annotation to be absent.
 */
@Test(timeout = 2000)
public void testSentMessageWithUUIDCorrelationId() throws Exception {
    final UUID expectedOnWire = UUID.randomUUID();
    final String jmsCorrelationId =
        AmqpMessageIdHelper.JMS_ID_PREFIX + AmqpMessageIdHelper.AMQP_UUID_PREFIX + expectedOnWire.toString();

    sentMessageWithCorrelationIdTestImpl(jmsCorrelationId, expectedOnWire, false);
}
/**
 * Runs the shared correlation-id send scenario with a JMSCorrelationID string that encodes a
 * binary message-id (JMS id prefix, then the binary type prefix, then the hex text of the
 * bytes). The shared helper expects the transferred correlation-id to equal the Binary object
 * itself and the app-specific correlation-id annotation to be absent.
 */
@Test(timeout = 2000)
public void testSentMessageWithBinaryCorrelationId() throws Exception {
    final byte[] idBytes = new byte[] { (byte) 0x01, (byte) 0x23, (byte) 0xAF, (byte) 0x00 };
    final Binary expectedOnWire = new Binary(idBytes);
    // Lower-case hex rendering of the four bytes above.
    final String jmsCorrelationId =
        AmqpMessageIdHelper.JMS_ID_PREFIX + AmqpMessageIdHelper.AMQP_BINARY_PREFIX + "0123af00";

    sentMessageWithCorrelationIdTestImpl(jmsCorrelationId, expectedOnWire, false);
}
/**
 * Runs the shared correlation-id send scenario with a JMSCorrelationID string that encodes a
 * ulong message-id (JMS id prefix, then the ulong type prefix, then the number text). The
 * shared helper expects the transferred correlation-id to equal the UnsignedLong object itself
 * and the app-specific correlation-id annotation to be absent.
 */
@Test(timeout = 2000)
public void testSentMessageWithUlongCorrelationId() throws Exception {
    final UnsignedLong expectedOnWire = UnsignedLong.valueOf(Long.MAX_VALUE);
    final String jmsCorrelationId =
        AmqpMessageIdHelper.JMS_ID_PREFIX + AmqpMessageIdHelper.AMQP_ULONG_PREFIX + expectedOnWire.toString();

    sentMessageWithCorrelationIdTestImpl(jmsCorrelationId, expectedOnWire, false);
}
/**
 * Runs the shared correlation-id send scenario with a JMSCorrelationID string that is a
 * string typed message-id ("ID:" followed by the id text). The shared helper expects the
 * transferred correlation-id to be the id text without the "ID:" prefix and the app-specific
 * correlation-id annotation to be absent.
 */
@Test(timeout = 2000)
public void testSentMessageWithStringCorrelationId() throws Exception {
    final String jmsCorrelationId = "ID:myTestMessageIdString";
    final String expectedOnWire = "myTestMessageIdString";

    sentMessageWithCorrelationIdTestImpl(jmsCorrelationId, expectedOnWire, false);
}
/**
 * Runs the shared correlation-id send scenario with an application-specific JMSCorrelationID
 * string (no "ID:" prefix). The shared helper expects the transferred correlation-id to be the
 * unchanged string and the app-specific correlation-id annotation to be present and true.
 */
@Test(timeout = 2000)
public void testSentMessageWithAppSpecificStringCorrelationId() throws Exception {
    final String appSpecificId = "myTestAppSpecificString";

    sentMessageWithCorrelationIdTestImpl(appSpecificId, appSpecificId, true);
}
/**
 * Shared body for the correlation-id send tests. A text message with the given
 * JMSCorrelationID is sent to a queue, and the peer verifies the correlation-id found in the
 * transferred properties section. For application-specific values the
 * {@code AmqpMessageSupport.JMS_APP_CORRELATION_ID} annotation must be present and true;
 * otherwise the test asserts afterwards that the annotation was not received.
 *
 * @param stringCorrelationId value passed to setJMSCorrelationID on the outgoing message
 * @param correlationIdForAmqpMessageClass correlation-id object expected in the transferred message
 * @param appSpecific whether the value is an application-specific correlation-id
 * @throws Exception if any step of the scenario fails
 */
private void sentMessageWithCorrelationIdTestImpl(String stringCorrelationId, Object correlationIdForAmqpMessageClass, boolean appSpecific) throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer(IntegrationTestFixture.PORT)) {
        Connection connection = testFixture.establishConnecton(peer);
        peer.expectBegin(true);
        peer.expectSenderAttach();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final String queueName = "myQueue";
        Queue queue = session.createQueue(queueName);
        MessageProducer producer = session.createProducer(queue);

        MessageHeaderSectionMatcher expectedHeader = new MessageHeaderSectionMatcher(true);
        MessageAnnotationsSectionMatcher expectedAnnotations = new MessageAnnotationsSectionMatcher(true);
        MessagePropertiesSectionMatcher expectedProperties = new MessagePropertiesSectionMatcher(true);

        // Always validate the correlation-id; validate the annotation's presence and value
        // only when the id is application-specific.
        expectedProperties.withCorrelationId(equalTo(correlationIdForAmqpMessageClass));
        if (appSpecific) {
            Symbol appCorrelationKey = Symbol.valueOf(AmqpMessageSupport.JMS_APP_CORRELATION_ID);
            expectedAnnotations.withEntry(appCorrelationKey, equalTo(Boolean.TRUE));
        }

        TransferPayloadCompositeMatcher expectedPayload = new TransferPayloadCompositeMatcher();
        expectedPayload.setHeadersMatcher(expectedHeader);
        expectedPayload.setMessageAnnotationsMatcher(expectedAnnotations);
        expectedPayload.setPropertiesMatcher(expectedProperties);
        expectedPayload.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
        peer.expectTransfer(expectedPayload);

        Message outgoing = session.createTextMessage();
        outgoing.setJMSCorrelationID(stringCorrelationId);
        producer.send(outgoing);

        peer.waitForAllHandlersToComplete(3000);

        // A message-id style correlation-id must not be accompanied by the app-specific annotation.
        if (appSpecific) {
            return;
        }
        Symbol appCorrelationKey = Symbol.valueOf(AmqpMessageSupport.JMS_APP_CORRELATION_ID);
        assertFalse(expectedAnnotations.keyExistsInReceivedAnnotations(appCorrelationKey));
    }
}
/**
 * Runs the shared "receive a message-id, then send it back as a correlation-id" scenario with
 * a string typed message-id: the value returned by getJMSMessageID is used for
 * setJMSCorrelationID, and the transmitted AMQP message content is checked.
 */
@Test(timeout = 2000)
public void testReceivedMessageWithStringMessageIdAndSendValueAsCorrelationId() throws Exception {
    final String stringMessageId = "myStringMessageId";
    recieveMessageIdSendCorrelationIdTestImpl(stringMessageId);
}
/**
 * Runs the shared "receive a message-id, then send it back as a correlation-id" scenario with
 * a UUID typed message-id: the value returned by getJMSMessageID is used for
 * setJMSCorrelationID, and the transmitted AMQP message content is checked.
 */
@Test(timeout = 2000)
public void testReceivedMessageWithUUIDMessageIdAndSendValueAsCorrelationId() throws Exception {
    final UUID uuidMessageId = UUID.randomUUID();
    recieveMessageIdSendCorrelationIdTestImpl(uuidMessageId);
}
/**
 * Tests that receiving a message with a ulong typed message-id, and then sending a message which
 * uses the result of calling getJMSMessageID as the value for setJMSCorrelationId, results in
 * transmission of the expected AMQP message content.
 * <p>
 * The shared implementation calls {@code receive(1000)} once and
 * {@code waitForAllHandlersToComplete(3000)} twice. The JUnit timeout is therefore set above
 * the sum of those arguments (presumably all milliseconds), so that JUnit cannot cut the test
 * short before those bounded waits have had a chance to elapse.
 *
 * @throws Exception if the shared test implementation fails
 */
@Test(timeout = 10000)
public void testReceivedMessageWithUlongMessageIdAndSendValueAsCorrelationId() throws Exception {
    // The AMQP message-id is an UnsignedLong in this variant.
    final UnsignedLong amqpMessageId = UnsignedLong.valueOf(123456789L);
    recieveMessageIdSendCorrelationIdTestImpl(amqpMessageId);
}
/**
 * Tests that receiving a message with a binary typed message-id, and then sending a message which
 * uses the result of calling getJMSMessageID as the value for setJMSCorrelationId, results in
 * transmission of the expected AMQP message content.
 * <p>
 * The shared implementation calls {@code receive(1000)} once and
 * {@code waitForAllHandlersToComplete(3000)} twice. The JUnit timeout is therefore set above
 * the sum of those arguments (presumably all milliseconds), so that JUnit cannot cut the test
 * short before those bounded waits have had a chance to elapse.
 *
 * @throws Exception if the shared test implementation fails
 */
@Test(timeout = 10000)
public void testReceivedMessageWithBinaryMessageIdAndSendValueAsCorrelationId() throws Exception {
    // Four raw bytes wrapped in a Binary form the AMQP message-id in this variant.
    final byte[] idBytes = new byte[] { (byte) 0x00, (byte) 0xCD, (byte) 0xEF, (byte) 0x01 };
    recieveMessageIdSendCorrelationIdTestImpl(new Binary(idBytes));
}
/**
 * Shared body of the "receive a message-id, send it back as a correlation-id" tests.
 * <p>
 * The test peer first delivers a message whose properties section carries the given
 * message-id. The JMSMessageID of the received message is checked to be {@code "ID:"}
 * followed by the base id string produced by {@link AmqpMessageIdHelper}. That JMSMessageID
 * is then set as the JMSCorrelationID of a new text message, and the peer is told to expect a
 * transfer whose correlation-id equals the object the test started with.
 *
 * @param idForAmqpMessageClass the object used as message-id on the delivered message and
 *        expected as correlation-id on the message that is sent afterwards
 * @throws Exception if any client or test-peer operation fails
 */
private void recieveMessageIdSendCorrelationIdTestImpl(Object idForAmqpMessageClass) throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer(IntegrationTestFixture.PORT)) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        peer.expectBegin(true);
        Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue destination = jmsSession.createQueue("myQueue");

        // Describe the message the peer will deliver: only the message-id is populated,
        // and the body is an amqp-value section holding null.
        PropertiesDescribedType inboundProps = new PropertiesDescribedType();
        inboundProps.setMessageId(idForAmqpMessageClass);
        DescribedType nullBody = new AmqpValueDescribedType(null);

        // Expectations must be queued before the client call that triggers them.
        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, null, inboundProps, null, nullBody);
        peer.expectDispositionThatIsAcceptedAndSettled();

        MessageConsumer consumer = jmsSession.createConsumer(destination);
        Message inbound = consumer.receive(1000);
        peer.waitForAllHandlersToComplete(3000);

        assertNotNull(inbound);

        AmqpMessageIdHelper idHelper = new AmqpMessageIdHelper();
        String expectedJmsMessageId = "ID:" + idHelper.toBaseMessageIdString(idForAmqpMessageClass);
        String jmsMessageID = inbound.getJMSMessageID();
        assertEquals(expectedJmsMessageId, jmsMessageID);

        // Round trip: use the received JMSMessageID as the JMSCorrelationID of a new
        // message and verify the AMQP correlation-id matches the original id object.
        peer.expectSenderAttach();
        MessageProducer sender = jmsSession.createProducer(destination);

        MessagePropertiesSectionMatcher propertiesMatcher = new MessagePropertiesSectionMatcher(true);
        propertiesMatcher.withCorrelationId(equalTo(idForAmqpMessageClass));

        TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
        payloadMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        payloadMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
        payloadMatcher.setPropertiesMatcher(propertiesMatcher);
        payloadMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
        peer.expectTransfer(payloadMatcher);

        Message outbound = jmsSession.createTextMessage();
        outbound.setJMSCorrelationID(jmsMessageID);
        sender.send(outbound);

        peer.waitForAllHandlersToComplete(3000);
    }
}
//==== Group Property Handling ====
//=================================
/**
 * Tests that when receiving a message with the group-id, reply-to-group-id, and group-sequence
 * fields of the AMQP properties section set, the expected JMSX or JMS_AMQP properties
 * are present, and the expected values are returned when retrieved from the JMS message.
 * <p>
 * Presence is checked three ways: through the enumeration returned by
 * {@code getPropertyNames()}, through {@code propertyExists(..)}, and through the typed getters.
 * <p>
 * The test calls {@code receive(1000)} and {@code waitForAllHandlersToComplete(3000)}. The
 * JUnit timeout is therefore set above the sum of those arguments (presumably all
 * milliseconds), so that JUnit cannot cut the test short before those bounded waits have had a
 * chance to elapse.
 *
 * @throws Exception if any client or test-peer operation fails
 */
@Test(timeout = 10000)
public void testReceivedMessageWithGroupRelatedPropertiesSet() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT)) {
        Connection connection = testFixture.establishConnecton(testPeer);
        connection.start();

        testPeer.expectBegin(true);
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("myQueue");

        String expectedGroupId = "myGroupId123";
        int expectedGroupSeq = 1;
        String expectedReplyToGroupId = "myReplyToGroupId456";

        // Populate the group-related fields (plus a message-id) on the delivered message.
        PropertiesDescribedType props = new PropertiesDescribedType();
        props.setGroupId(expectedGroupId);
        props.setGroupSequence(UnsignedInteger.valueOf(expectedGroupSeq));
        props.setReplyToGroupId(expectedReplyToGroupId);
        props.setMessageId("myMessageIDString");

        // No message annotations are supplied; the body is an amqp-value holding null.
        MessageAnnotationsDescribedType noAnnotations = null;
        DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);

        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlowRespondWithTransfer(null, noAnnotations, props, null, amqpValueNullContent);
        testPeer.expectDispositionThatIsAcceptedAndSettled();

        MessageConsumer messageConsumer = session.createConsumer(queue);
        Message receivedMessage = messageConsumer.receive(1000);
        testPeer.waitForAllHandlersToComplete(3000);

        assertNotNull("did not receive the message", receivedMessage);

        // Scan the advertised property names for the three group-related entries.
        boolean foundGroupId = false;
        boolean foundGroupSeq = false;
        boolean foundReplyToGroupId = false;

        Enumeration<?> names = receivedMessage.getPropertyNames();
        assertTrue("Message had no property names", names.hasMoreElements());
        while (names.hasMoreElements()) {
            Object name = names.nextElement();
            foundGroupId |= JmsClientProperties.JMSXGROUPID.equals(name);
            foundGroupSeq |= JmsClientProperties.JMSXGROUPSEQ.equals(name);
            foundReplyToGroupId |= AmqpMessageSupport.JMS_AMQP_REPLY_TO_GROUP_ID.equals(name);
        }

        assertTrue("JMSXGroupID not in property names", foundGroupId);
        assertTrue("JMSXGroupSeq not in property names", foundGroupSeq);
        assertTrue("JMS_AMQP_REPLY_TO_GROUP_ID not in property names", foundReplyToGroupId);

        assertTrue("JMSXGroupID does not exist", receivedMessage.propertyExists(JmsClientProperties.JMSXGROUPID));
        assertTrue("JMSXGroupSeq does not exist", receivedMessage.propertyExists(JmsClientProperties.JMSXGROUPSEQ));
        assertTrue("JMS_AMQP_REPLY_TO_GROUP_ID does not exist", receivedMessage.propertyExists(AmqpMessageSupport.JMS_AMQP_REPLY_TO_GROUP_ID));

        assertEquals("did not get the expected JMSXGroupID", expectedGroupId, receivedMessage.getStringProperty(JmsClientProperties.JMSXGROUPID));
        assertEquals("did not get the expected JMSXGroupSeq", expectedGroupSeq, receivedMessage.getIntProperty(JmsClientProperties.JMSXGROUPSEQ));
        assertEquals("did not get the expected JMS_AMQP_REPLY_TO_GROUP_ID", expectedReplyToGroupId, receivedMessage.getStringProperty(AmqpMessageSupport.JMS_AMQP_REPLY_TO_GROUP_ID));
    }
}
/**
 * Verifies the outbound mapping of the group-related JMS properties: a text message with
 * JMSXGroupID, JMSXGroupSeq and JMS_AMQP_REPLY_TO_GROUP_ID set is sent, and the test peer is
 * told to expect a transfer whose properties section carries the matching group-id,
 * group-sequence (as an {@code UnsignedInteger}) and reply-to-group-id values, with a header
 * whose durable field equals {@code true}.
 *
 * @throws Exception if any client or test-peer operation fails
 */
@Test(timeout = 2000)
public void testSendMessageWithGroupRelatedPropertiesSet() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer(IntegrationTestFixture.PORT)) {
        Connection conn = testFixture.establishConnecton(peer);

        // Queue the session-begin and sender-attach expectations before the client acts.
        peer.expectBegin(true);
        peer.expectSenderAttach();

        Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        String queueName = "myQueue";
        Queue destination = jmsSession.createQueue(queueName);
        MessageProducer sender = jmsSession.createProducer(destination);

        String groupId = "myGroupId123";
        int groupSeq = 1;
        String replyToGroupId = "myReplyToGroupId456";

        // Section matchers describing the transfer the peer should see.
        MessageHeaderSectionMatcher headerMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
        MessageAnnotationsSectionMatcher annotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        MessagePropertiesSectionMatcher propertiesMatcher = new MessagePropertiesSectionMatcher(true);
        propertiesMatcher.withGroupId(equalTo(groupId));
        propertiesMatcher.withReplyToGroupId(equalTo(replyToGroupId));
        propertiesMatcher.withGroupSequence(equalTo(UnsignedInteger.valueOf(groupSeq)));

        TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
        payloadMatcher.setHeadersMatcher(headerMatcher);
        payloadMatcher.setMessageAnnotationsMatcher(annotationsMatcher);
        payloadMatcher.setPropertiesMatcher(propertiesMatcher);
        payloadMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
        peer.expectTransfer(payloadMatcher);

        // Build and send the JMS message carrying the three group-related properties.
        Message outbound = jmsSession.createTextMessage();
        outbound.setStringProperty(JmsClientProperties.JMSXGROUPID, groupId);
        outbound.setIntProperty(JmsClientProperties.JMSXGROUPSEQ, groupSeq);
        outbound.setStringProperty(AmqpMessageSupport.JMS_AMQP_REPLY_TO_GROUP_ID, replyToGroupId);
        sender.send(outbound);

        peer.waitForAllHandlersToComplete(1000);
    }
}
}