blob: a695141f325419d7cfe0b5f0265f0674aa29a706 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.broker;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Footer;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.AMQPDefinedTypes;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.EncodingCodes;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
public class AMQPMessageTest {
private static final String TEST_TO_ADDRESS = "someAddress";
private static final String TEST_MESSAGE_ANNOTATION_KEY = "x-opt-test-annotation";
private static final String TEST_MESSAGE_ANNOTATION_VALUE = "test-annotation";
private static final String TEST_MESSAGE_ANNOTATION_KEY2 = "x-opt-test-annotation2";
private static final String TEST_MESSAGE_ANNOTATION_VALUE2 = "test-annotation2";
private static final String TEST_EXTRA_PROPERTY_KEY1 = "extraPropertyKey1";
private static final String TEST_EXTRA_PROPERTY_VALUE1 = "extraPropertyValue1";
private static final String TEST_EXTRA_PROPERTY_KEY2 = "extraPropertyKey2";
private static final String TEST_EXTRA_PROPERTY_VALUE2 = "extraPropertyValue2";
private static final String TEST_APPLICATION_PROPERTY_KEY = "key-1";
private static final String TEST_APPLICATION_PROPERTY_VALUE = "value-1";
private static final String TEST_APPLICATION_PROPERTY_KEY2 = "key-2";
private static final String TEST_APPLICATION_PROPERTY_VALUE2 = "value-2";
private static final String TEST_STRING_BODY = "test-string-body";
private static final String PROPERTY_MAP_APP_PROPERTIES_PREFIX = "applicationProperties.";
private static final String PROPERTY_MAP_PROPERTIES_PREFIX = "properties.";
private static final String PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX = "messageAnnotations.";
private static final String PROPERTY_MAP_EXTRA_PROPERTIES_PREFIX = "extraProperties.";
private byte[] encodedProtonMessage;
private final DecoderImpl decoder = new DecoderImpl();
private final EncoderImpl encoder = new EncoderImpl(decoder);
{
AMQPDefinedTypes.registerAllTypes(decoder, encoder);
}
@Before
public void setUp() {
encodedProtonMessage = encodeMessage(createProtonMessage());
}
//----- Test Message Creation ---------------------------------------------//
@Test
public void testCreateMessageFromEncodedByteArrayData() {
// Constructor 1
AMQPStandardMessage decoded = new AMQPStandardMessage(0, encodedProtonMessage, null);
assertTrue(decoded.isDurable());
assertEquals(TEST_TO_ADDRESS, decoded.getAddress());
// Constructor 2
decoded = new AMQPStandardMessage(0, encodedProtonMessage, null, null);
assertTrue(decoded.isDurable());
assertEquals(TEST_TO_ADDRESS, decoded.getAddress());
}
@Test
public void testCreateMessageFromEncodedReadableBuffer() {
AMQPStandardMessage decoded = new AMQPStandardMessage(0, ReadableBuffer.ByteBufferReader.wrap(encodedProtonMessage), null, null);
assertEquals(true, decoded.getHeader().getDurable());
assertEquals(TEST_TO_ADDRESS, decoded.getAddress());
}
@Test
public void testCreateMessageFromEncodedByteArrayDataWithExtraProperties() {
AMQPStandardMessage decoded = new AMQPStandardMessage(0, encodedProtonMessage, new TypedProperties(), null);
assertEquals(true, decoded.getHeader().getDurable());
assertEquals(TEST_TO_ADDRESS, decoded.getAddress());
assertNotNull(decoded.getExtraProperties());
}
@Test
public void testCreateMessageForPersistenceDataReload() throws ActiveMQException {
MessageImpl protonMessage = createProtonMessage();
ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage);
AMQPStandardMessage message = new AMQPStandardMessage(0);
try {
message.getProtonMessage();
fail("Should throw NPE due to not being initialized yet");
} catch (NullPointerException npe) {
}
final long persistedSize = (long) encoded.readableBytes();
// Now reload from encoded data
message.reloadPersistence(encoded, null);
assertEquals(persistedSize, message.getPersistSize());
assertEquals(persistedSize - Integer.BYTES, message.getPersistentSize());
assertEquals(persistedSize - Integer.BYTES, message.getEncodeSize());
assertEquals(true, message.getHeader().getDurable());
assertEquals(TEST_TO_ADDRESS, message.getAddress());
}
@Test
public void testHasScheduledDeliveryTimeReloadPersistence() {
final long scheduledTime = System.currentTimeMillis();
MessageImpl protonMessage = createProtonMessage();
MessageAnnotations annotations = protonMessage.getMessageAnnotations();
annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledTime);
ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage);
AMQPMessage message = new AMQPStandardMessage(0);
try {
message.getProtonMessage();
fail("Should throw NPE due to not being initialized yet");
} catch (NullPointerException npe) {
}
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.getDataScanningStatus());
// Now reload from encoded data
message.reloadPersistence(encoded, null);
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
assertTrue(message.hasScheduledDeliveryTime());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
message.getHeader();
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
assertTrue(message.hasScheduledDeliveryTime());
}
@Test
public void testHasScheduledDeliveryDelayReloadPersistence() {
final long scheduledDelay = 100000;
MessageImpl protonMessage = createProtonMessage();
MessageAnnotations annotations = protonMessage.getMessageAnnotations();
annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY, scheduledDelay);
ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage);
AMQPMessage message = new AMQPStandardMessage(0);
try {
message.getProtonMessage();
fail("Should throw NPE due to not being initialized yet");
} catch (NullPointerException npe) {
}
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.getDataScanningStatus());
// Now reload from encoded data
message.reloadPersistence(encoded, null);
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
assertTrue(message.hasScheduledDeliveryTime());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
message.getHeader();
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
assertTrue(message.hasScheduledDeliveryTime());
}
@Test
public void testNoScheduledDeliveryTimeOrDelayReloadPersistence() {
MessageImpl protonMessage = createProtonMessage();
ActiveMQBuffer encoded = encodeMessageAsPersistedBuffer(protonMessage);
AMQPMessage message = new AMQPStandardMessage(0);
try {
message.getProtonMessage();
fail("Should throw NPE due to not being initialized yet");
} catch (NullPointerException npe) {
}
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.NOT_SCANNED, message.getDataScanningStatus());
// Now reload from encoded data
message.reloadPersistence(encoded, null);
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
assertFalse(message.hasScheduledDeliveryTime());
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus());
message.getHeader();
Assert.assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus());
assertFalse(message.hasScheduledDeliveryTime());
}
//----- Test Memory Estimate access ---------------------------------------//
@Test
public void testGetMemoryEstimate() {
AMQPStandardMessage decoded = new AMQPStandardMessage(0, encodedProtonMessage, new TypedProperties(), null);
int estimate = decoded.getMemoryEstimate();
assertTrue(encodedProtonMessage.length < decoded.getMemoryEstimate());
assertEquals(estimate, decoded.getMemoryEstimate());
decoded.putStringProperty(new SimpleString("newProperty"), "newValue");
decoded.reencode();
assertNotEquals(estimate, decoded.getMemoryEstimate());
}
@Test
public void testGetMemoryEstimateWithDecodedApplicationProperties() {
AMQPStandardMessage decoded = new AMQPStandardMessage(0, encodedProtonMessage, new TypedProperties(), null);
AMQPStandardMessage decodedWithApplicationPropertiesUnmarshalled =
new AMQPStandardMessage(0, encodeMessage(createProtonMessage()), new TypedProperties(), null);
assertEquals(decodedWithApplicationPropertiesUnmarshalled.getStringProperty(TEST_APPLICATION_PROPERTY_KEY), TEST_APPLICATION_PROPERTY_VALUE);
assertNotEquals(decodedWithApplicationPropertiesUnmarshalled.getMemoryEstimate(), decoded.getMemoryEstimate());
}
//----- Test Connection ID access -----------------------------------------//
@Test
public void testDecodeMultiThreaded() throws Exception {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader( new Header());
Properties properties = new Properties();
properties.setTo("someNiceLocal");
protonMessage.setProperties(properties);
protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7));
protonMessage.getHeader().setDurable(Boolean.TRUE);
protonMessage.setApplicationProperties(new ApplicationProperties(new HashMap<>()));
final AtomicInteger failures = new AtomicInteger(0);
for (int testTry = 0; testTry < 100; testTry++) {
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
Thread[] threads = new Thread[100];
CountDownLatch latchAlign = new CountDownLatch(threads.length);
CountDownLatch go = new CountDownLatch(1);
Runnable run = new Runnable() {
@Override
public void run() {
try {
latchAlign.countDown();
go.await();
Assert.assertNotNull(decoded.getHeader());
// this is a method used by Core Converter
decoded.getProtonMessage();
Assert.assertNotNull(decoded.getHeader());
} catch (Throwable e) {
e.printStackTrace();
failures.incrementAndGet();
}
}
};
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(run);
threads[i].start();
}
Assert.assertTrue(latchAlign.await(10, TimeUnit.SECONDS));
go.countDown();
for (Thread thread : threads) {
thread.join(5000);
Assert.assertFalse(thread.isAlive());
}
Assert.assertEquals(0, failures.get());
}
}
@Test
public void testGetConnectionID() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(null, decoded.getConnectionID());
}
@Test
public void testSetConnectionID() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
final String ID = UUID.randomUUID().toString();
assertEquals(null, decoded.getConnectionID());
decoded.setConnectionID(ID);
assertEquals(ID, decoded.getConnectionID());
}
@Test
public void testGetConnectionIDFromProperties() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
final String ID = UUID.randomUUID().toString();
assertEquals(null, decoded.getConnectionID());
decoded.setConnectionID(ID);
assertEquals(ID, decoded.getConnectionID());
assertEquals(ID, decoded.getStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME));
}
//----- Test LastValueProperty access -------------------------------//
@Test
public void testGetLastValueFromMessageWithNone() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getLastValueProperty());
}
@Test
public void testSetLastValueFromMessageWithNone() {
SimpleString lastValue = new SimpleString("last-address");
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getLastValueProperty());
decoded.setLastValueProperty(lastValue);
assertEquals(lastValue, decoded.getLastValueProperty());
}
//----- Test User ID access -----------------------------------------//
@Test
public void getUserIDWhenNoPropertiesExists() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getUserID());
decoded.setUserID(UUID.randomUUID().toString());
assertNull(decoded.getUserID());
}
@Test
public void testSetUserIDHasNoEffectOnMessagePropertiesWhenNotPresent() {
final String ID = UUID.randomUUID().toString();
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getUserID());
assertNull(decoded.getProperties());
decoded.setUserID(ID);
decoded.reencode();
assertNull(decoded.getUserID());
assertNull(decoded.getProperties());
}
@Test
public void testSetUserIDHasNoEffectOnMessagePropertiesWhenPresentButNoMessageID() {
final String ID = UUID.randomUUID().toString();
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setProperties(new Properties());
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getUserID());
assertNotNull(decoded.getProperties());
assertNull(decoded.getProperties().getMessageId());
decoded.setUserID(ID);
decoded.reencode();
assertNull(decoded.getUserID());
assertNotNull(decoded.getProperties());
assertNull(decoded.getProperties().getMessageId());
}
@Test
public void testSetUserIDHasNoEffectOnMessagePropertiesWhenPresentWithMessageID() {
final String ID = UUID.randomUUID().toString();
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setProperties(new Properties());
protonMessage.setMessageId(ID);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNotNull(decoded.getUserID());
assertNotNull(decoded.getProperties());
assertNotNull(decoded.getProperties().getMessageId());
assertEquals(ID, decoded.getUserID());
decoded.setUserID(ID);
decoded.reencode();
assertNotNull(decoded.getUserID());
assertNotNull(decoded.getProperties());
assertNotNull(decoded.getProperties().getMessageId());
assertEquals(ID, decoded.getUserID());
}
//----- Test the getDuplicateProperty methods -----------------------------//
@Test
public void testGetDuplicateProperty() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(null, decoded.getDuplicateProperty());
}
//----- Test the getAddress methods ---------------------------------------//
@Test
public void testGetAddressFromMessage() {
final String ADDRESS = "myQueue";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setAddress(ADDRESS);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(ADDRESS, decoded.getAddress());
}
@Test
public void testGetAddressSimpleStringFromMessage() {
final String ADDRESS = "myQueue";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setAddress(ADDRESS);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(ADDRESS, decoded.getAddressSimpleString().toString());
}
@Test
public void testGetAddressFromMessageWithNoValueSet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getAddress());
assertNull(decoded.getAddressSimpleString());
}
@Test
public void testSetAddressFromMessage() {
final String ADDRESS = "myQueue";
final SimpleString NEW_ADDRESS = new SimpleString("myQueue-1");
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setAddress(ADDRESS);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(ADDRESS, decoded.getAddress());
decoded.setAddress(NEW_ADDRESS);
assertEquals(NEW_ADDRESS, decoded.getAddressSimpleString());
}
@Test
public void testSetAddressFromMessageUpdatesPropertiesOnReencode() {
final String ADDRESS = "myQueue";
final SimpleString NEW_ADDRESS = new SimpleString("myQueue-1");
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setAddress(ADDRESS);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(ADDRESS, decoded.getAddress());
decoded.setAddress(NEW_ADDRESS);
decoded.reencode();
assertEquals(NEW_ADDRESS.toString(), decoded.getProperties().getTo());
assertEquals(NEW_ADDRESS, decoded.getAddressSimpleString());
}
//----- Test the durability set and get methods ---------------------------//
@Test
public void testIsDurableFromMessageWithHeaderTaggedAsTrue() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setDurable(true);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertTrue(decoded.isDurable());
}
@Test
public void testIsDurableFromMessageWithHeaderTaggedAsFalse() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setDurable(false);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertFalse(decoded.isDurable());
}
@Test
public void testIsDurableFromMessageWithNoValueSet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertFalse(decoded.isDurable());
}
@Test
public void testIsDuranleReturnsTrueOnceUpdated() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertFalse(decoded.isDurable());
decoded.setDurable(true);
assertTrue(decoded.isDurable());
}
@Test
public void testNonDurableMessageReencodedToDurable() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertFalse(decoded.isDurable());
// Underlying message data not updated yet
assertNull(decoded.getHeader().getDurable());
decoded.setDurable(true);
decoded.reencode();
assertTrue(decoded.isDurable());
// Underlying message data now updated
assertTrue(decoded.getHeader().getDurable());
}
@Test
public void testMessageWithNoHeaderGetsOneWhenDurableSetAndReencoded() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertFalse(decoded.isDurable());
// Underlying message data not updated yet
assertNull(decoded.getHeader());
decoded.setDurable(true);
decoded.reencode();
assertTrue(decoded.isDurable());
// Underlying message data now updated
Header header = decoded.getHeader();
assertNotNull(header);
assertTrue(header.getDurable());
}
//----- Test RoutingType access -------------------------------------------//
@Test
public void testGetRoutingTypeFromMessageWithoutIt() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getRoutingType());
}
@Test
public void testSetRoutingType() {
RoutingType type = RoutingType.ANYCAST;
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getRoutingType());
decoded.setRoutingType(type);
assertEquals(type, decoded.getRoutingType());
}
@Test
public void testSetRoutingTypeToClear() {
RoutingType type = RoutingType.ANYCAST;
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getRoutingType());
decoded.setRoutingType(type);
assertEquals(type, decoded.getRoutingType());
decoded.setRoutingType(null);
assertNull(decoded.getRoutingType());
}
@Test
public void testRemoveRoutingTypeFromMessageEncodedWithOne() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
annotations.getValue().put(AMQPMessageSupport.ROUTING_TYPE, RoutingType.ANYCAST.getType());
protonMessage.setMessageAnnotations(annotations);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(RoutingType.ANYCAST, decoded.getRoutingType());
decoded.setRoutingType(null);
decoded.reencode();
assertNull(decoded.getRoutingType());
assertTrue(decoded.getMessageAnnotations().getValue().isEmpty());
}
@Test
public void testGetRoutingTypeFromMessageWithAnyCastType() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
annotations.getValue().put(AMQPMessageSupport.ROUTING_TYPE, RoutingType.ANYCAST.getType());
protonMessage.setMessageAnnotations(annotations);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(RoutingType.ANYCAST, decoded.getRoutingType());
}
@Test
public void testGetRoutingTypeFromMessageWithMulticastType() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
annotations.getValue().put(AMQPMessageSupport.ROUTING_TYPE, RoutingType.MULTICAST.getType());
protonMessage.setMessageAnnotations(annotations);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(RoutingType.MULTICAST, decoded.getRoutingType());
}
@Test
public void testGetRoutingTypeFromMessageWithQueueType() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
annotations.getValue().put(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION, AMQPMessageSupport.QUEUE_TYPE);
protonMessage.setMessageAnnotations(annotations);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(RoutingType.ANYCAST, decoded.getRoutingType());
}
@Test
public void testGetRoutingTypeFromMessageWithTempQueueType() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
annotations.getValue().put(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION, AMQPMessageSupport.TEMP_QUEUE_TYPE);
protonMessage.setMessageAnnotations(annotations);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(RoutingType.ANYCAST, decoded.getRoutingType());
}
@Test
public void testGetRoutingTypeFromMessageWithTopicType() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
annotations.getValue().put(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION, AMQPMessageSupport.TOPIC_TYPE);
protonMessage.setMessageAnnotations(annotations);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(RoutingType.MULTICAST, decoded.getRoutingType());
}
@Test
public void testGetRoutingTypeFromMessageWithTempTopicType() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
annotations.getValue().put(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION, AMQPMessageSupport.TEMP_TOPIC_TYPE);
protonMessage.setMessageAnnotations(annotations);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(RoutingType.MULTICAST, decoded.getRoutingType());
}
@Test
public void testGetRoutingTypeFromMessageWithUnknownType() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
annotations.getValue().put(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION, (byte) 32);
protonMessage.setMessageAnnotations(annotations);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getRoutingType());
}
//----- Test access to message Group ID -----------------------------------//
@Test
public void testGetGroupIDFromMessage() {
final String GROUP_ID = "group-1";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setGroupId(GROUP_ID);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(GROUP_ID, decoded.getGroupID().toString());
}
@Test
public void testGetGroupIDFromMessageWithNoGroupId() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setProperties(new Properties());
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getGroupID());
}
@Test
public void testGetGroupIDFromMessageWithNoProperties() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getGroupID());
}
//----- Test access to message Group ID -----------------------------------//
@Test
public void testGetReplyToFromMessage() {
final String REPLY_TO = "address-1";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setReplyTo(REPLY_TO);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(REPLY_TO, decoded.getReplyTo().toString());
}
@Test
public void testGetReplyToFromMessageWithNoReplyTo() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setProperties(new Properties());
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getReplyTo());
}
@Test
public void testGetReplyToFromMessageWithNoProperties() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getReplyTo());
}
@Test
public void testSetReplyToFromMessageWithProperties() {
final String REPLY_TO = "address-1";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setProperties(new Properties());
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getReplyTo());
decoded.setReplyTo(new SimpleString(REPLY_TO));
decoded.reencode();
assertEquals(REPLY_TO, decoded.getReplyTo().toString());
assertEquals(REPLY_TO, decoded.getProperties().getReplyTo());
}
@Test
public void testSetReplyToFromMessageWithNoProperties() {
final String REPLY_TO = "address-1";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getReplyTo());
decoded.setReplyTo(new SimpleString(REPLY_TO));
decoded.reencode();
assertEquals(REPLY_TO, decoded.getReplyTo().toString());
assertEquals(REPLY_TO, decoded.getProperties().getReplyTo());
}
@Test
public void testSetReplyToFromMessageWithPropertiesCanClear() {
final String REPLY_TO = "address-1";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setProperties(new Properties());
protonMessage.setReplyTo(REPLY_TO);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(REPLY_TO, decoded.getReplyTo().toString());
decoded.setReplyTo(null);
decoded.reencode();
assertEquals(null, decoded.getReplyTo());
assertEquals(null, decoded.getProperties().getReplyTo());
}
//----- Test access to User ID --------------------------------------------//
@Test
public void testGetUserIDFromMessage() {
final String USER_NAME = "foo";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setUserId(USER_NAME.getBytes(StandardCharsets.UTF_8));
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(USER_NAME, decoded.getAMQPUserID());
}
@Test
public void testGetUserIDFromMessageWithNoProperties() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getAMQPUserID());
}
@Test
public void testGetUserIDFromMessageWithNoUserID() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setProperties(new Properties());
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getAMQPUserID());
}
//----- Test access message priority --------------------------------------//
@Test
public void testGetPriorityFromMessage() {
final short PRIORITY = 7;
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setPriority(PRIORITY);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(PRIORITY, decoded.getPriority());
}
@Test
public void testGetPriorityFromMessageWithNoHeader() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(AMQPStandardMessage.DEFAULT_MESSAGE_PRIORITY, decoded.getPriority());
}
@Test
public void testGetPriorityFromMessageWithNoPrioritySet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(AMQPStandardMessage.DEFAULT_MESSAGE_PRIORITY, decoded.getPriority());
}
@Test
public void testSetPriorityOnMessageWithHeader() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(AMQPStandardMessage.DEFAULT_MESSAGE_PRIORITY, decoded.getPriority());
decoded.setPriority((byte) 9);
decoded.reencode();
assertEquals(9, decoded.getPriority());
assertEquals(9, decoded.getHeader().getPriority().byteValue());
}
@Test
public void testSetPriorityOnMessageWithoutHeader() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(AMQPStandardMessage.DEFAULT_MESSAGE_PRIORITY, decoded.getPriority());
decoded.setPriority((byte) 9);
decoded.reencode();
assertEquals(9, decoded.getPriority());
assertEquals(9, decoded.getHeader().getPriority().byteValue());
}
//----- Test access message expiration ------------------------------------//
@Test
public void testGetExpirationFromMessageWithNoHeader() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(0, decoded.getExpiration());
}
@Test
public void testGetExpirationFromMessageWithNoTTLInHeader() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(0, decoded.getExpiration());
}
@Test
public void testGetExpirationFromMessageWithNoTTLInHeaderOrExpirationInProperties() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setProperties(new Properties());
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(0, decoded.getExpiration());
}
@Test
public void testGetExpirationFromMessageUsingTTL() {
final long ttl = 100000;
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setTtl(ttl);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertTrue(decoded.getExpiration() > System.currentTimeMillis());
}
@Test
public void testGetExpirationFromCoreMessageUsingTTL() {
final long ttl = 100000;
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setTtl(ttl);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
ICoreMessage coreMessage = decoded.toCore();
assertEquals(decoded.getExpiration(), coreMessage.getExpiration());
}
@Test
public void testGetExpirationFromMessageUsingAbsoluteExpiration() {
final Date expirationTime = new Date(System.currentTimeMillis());
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
Properties properties = new Properties();
properties.setAbsoluteExpiryTime(expirationTime);
protonMessage.setProperties(properties);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(expirationTime.getTime(), decoded.getExpiration());
}
@Test
public void testGetExpirationFromMessageUsingAbsoluteExpirationNegative() {
final Date expirationTime = new Date(-1);
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
Properties properties = new Properties();
properties.setAbsoluteExpiryTime(expirationTime);
protonMessage.setProperties(properties);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(0, decoded.getExpiration());
}
@Test
public void testGetExpirationFromMessageAbsoluteExpirationOVerrideTTL() {
final Date expirationTime = new Date(System.currentTimeMillis());
final long ttl = 100000;
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setTtl(ttl);
Properties properties = new Properties();
properties.setAbsoluteExpiryTime(expirationTime);
protonMessage.setProperties(properties);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(expirationTime.getTime(), decoded.getExpiration());
}
@Test
public void testSetExpiration() {
final Date expirationTime = new Date(System.currentTimeMillis());
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(0, decoded.getExpiration());
decoded.setExpiration(expirationTime.getTime());
assertEquals(expirationTime.getTime(), decoded.getExpiration());
}
@Test
public void testSetExpirationUpdatesProperties() {
final Date originalExpirationTime = new Date(System.currentTimeMillis());
final Date expirationTime = new Date(System.currentTimeMillis());
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setProperties(new Properties());
protonMessage.setExpiryTime(originalExpirationTime.getTime());
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(originalExpirationTime.getTime(), decoded.getExpiration());
decoded.setExpiration(expirationTime.getTime());
assertEquals(expirationTime.getTime(), decoded.getExpiration());
decoded.reencode();
assertEquals(expirationTime, decoded.getProperties().getAbsoluteExpiryTime());
}
@Test
public void testSetExpirationAddsPropertiesWhenNonePresent() {
final Date expirationTime = new Date(System.currentTimeMillis());
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(0, decoded.getExpiration());
decoded.setExpiration(expirationTime.getTime());
assertEquals(expirationTime.getTime(), decoded.getExpiration());
decoded.reencode();
assertEquals(expirationTime, decoded.getProperties().getAbsoluteExpiryTime());
}
@Test
public void testSetExpirationToClearUpdatesPropertiesWhenPresent() {
final Date expirationTime = new Date(System.currentTimeMillis());
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setProperties(new Properties());
protonMessage.setExpiryTime(expirationTime.getTime());
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(expirationTime.getTime(), decoded.getExpiration());
decoded.setExpiration(-1);
assertEquals(0, decoded.getExpiration());
decoded.reencode();
assertEquals(0, decoded.getExpiration());
assertNull(decoded.getProperties().getAbsoluteExpiryTime());
}
@Test
public void testSetExpirationToClearDoesNotAddPropertiesWhenNonePresent() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(0, decoded.getExpiration());
decoded.setExpiration(-1);
assertEquals(0, decoded.getExpiration());
decoded.reencode();
assertEquals(0, decoded.getExpiration());
assertNull(decoded.getProperties());
}
@Test
public void testSetExpirationToClearUpdateHeader() {
final long ttl = 100000;
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setTtl(ttl);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertTrue(decoded.getExpiration() > System.currentTimeMillis());
decoded.setExpiration(-1);
decoded.reencode();
assertEquals(0, decoded.getExpiration());
assertNull(decoded.getHeader().getTtl());
}
//----- Test access message time stamp ------------------------------------//
@Test
public void testGetTimestampFromMessage() {
Date timestamp = new Date(System.currentTimeMillis());
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
Properties properties = new Properties();
properties.setCreationTime(timestamp);
protonMessage.setProperties(properties);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(timestamp.getTime(), decoded.getTimestamp());
}
@Test
public void testGetTimestampFromMessageWithNoCreateTimeSet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(0L, decoded.getTimestamp());
}
@Test
public void testGetTimestampFromMessageWithNoHeader() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(0L, decoded.getTimestamp());
}
@Test
public void testSetTimestampOnMessage() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setProperties(new Properties());
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(0L, decoded.getTimestamp());
Date createTime = new Date(System.currentTimeMillis());
decoded.setTimestamp(createTime.getTime());
decoded.reencode();
assertEquals(createTime.getTime(), decoded.getTimestamp());
assertEquals(createTime, decoded.getProperties().getCreationTime());
}
@Test
public void testSetTimestampOnMessageWithNoPropertiesSection() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(0L, decoded.getTimestamp());
Date createTime = new Date(System.currentTimeMillis());
decoded.setTimestamp(createTime.getTime());
decoded.reencode();
assertNotNull(decoded.getProperties());
assertEquals(createTime.getTime(), decoded.getTimestamp());
assertEquals(createTime, decoded.getProperties().getCreationTime());
}
//----- Test access to message scheduled delivery time --------------------//
@Test
public void testGetScheduledDeliveryTimeMessageSentWithFixedTime() {
final long scheduledTime = System.currentTimeMillis();
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledTime);
protonMessage.setMessageAnnotations(annotations);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(scheduledTime, decoded.getScheduledDeliveryTime().longValue());
}
@Test
public void testGetScheduledDeliveryTimeMessageSentWithFixedTimeAndDelay() {
final long scheduledTime = System.currentTimeMillis();
final long scheduledDelay = 100000;
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY, scheduledDelay);
annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledTime);
protonMessage.setMessageAnnotations(annotations);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(scheduledTime, decoded.getScheduledDeliveryTime().longValue());
}
@Test
public void testGetScheduledDeliveryTimeMessageSentWithFixedDelay() {
final long scheduledDelay = 100000;
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY, scheduledDelay);
protonMessage.setMessageAnnotations(annotations);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertTrue(decoded.getScheduledDeliveryTime().longValue() > System.currentTimeMillis());
}
@Test
public void testGetScheduledDeliveryTimeWhenMessageHasNoSetValue() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(0, decoded.getScheduledDeliveryTime().longValue());
}
@Test
public void testSetScheduledDeliveryTimeWhenNonPresent() {
final long scheduledTime = System.currentTimeMillis() + 5000;
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(0, decoded.getScheduledDeliveryTime().longValue());
decoded.setScheduledDeliveryTime(scheduledTime);
assertEquals(scheduledTime, decoded.getScheduledDeliveryTime().longValue());
decoded.reencode();
assertEquals(scheduledTime, decoded.getMessageAnnotations().getValue().get(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME));
}
@Test
public void testSetScheduledDeliveryTimeMessageSentWithFixedTime() {
final long scheduledTime = System.currentTimeMillis();
final long newScheduledTime = System.currentTimeMillis() + 1000;
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledTime);
protonMessage.setMessageAnnotations(annotations);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(scheduledTime, decoded.getScheduledDeliveryTime().longValue());
decoded.setScheduledDeliveryTime(newScheduledTime);
assertEquals(newScheduledTime, decoded.getScheduledDeliveryTime().longValue());
decoded.reencode();
assertEquals(newScheduledTime, decoded.getMessageAnnotations().getValue().get(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME));
}
@Test
public void testSetScheduledDeliveryTimeMessageSentWithFixedDelay() {
final long scheduledDelay = 100000;
final long newScheduledTime = System.currentTimeMillis() + 1000;
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY, scheduledDelay);
protonMessage.setMessageAnnotations(annotations);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertTrue(decoded.getScheduledDeliveryTime().longValue() > System.currentTimeMillis());
decoded.setScheduledDeliveryTime(newScheduledTime);
assertEquals(newScheduledTime, decoded.getScheduledDeliveryTime().longValue());
decoded.reencode();
assertEquals(newScheduledTime, decoded.getMessageAnnotations().getValue().get(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME));
}
@Test
public void testSetScheduledDeliveryTimeToNoneClearsDelayAndTimeValues() {
final long scheduledTime = System.currentTimeMillis();
final long scheduledDelay = 100000;
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY, scheduledDelay);
annotations.getValue().put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledTime);
protonMessage.setMessageAnnotations(annotations);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(scheduledTime, decoded.getScheduledDeliveryTime().longValue());
decoded.setScheduledDeliveryTime((long) 0);
decoded.reencode();
assertNull(decoded.getMessageAnnotations().getValue().get(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME));
assertNull(decoded.getMessageAnnotations().getValue().get(AMQPMessageSupport.SCHEDULED_DELIVERY_DELAY));
}
//----- Tests access to Message Annotations -------------------------------//
@Test
public void testGetAnnotation() {
AMQPStandardMessage message = new AMQPStandardMessage(0, encodedProtonMessage, null);
Object result = message.getAnnotation(new SimpleString(TEST_MESSAGE_ANNOTATION_KEY));
String stringResult = message.getAnnotationString(new SimpleString(TEST_MESSAGE_ANNOTATION_KEY));
assertEquals(result, stringResult);
}
@Test
public void testRemoveAnnotation() {
AMQPStandardMessage message = new AMQPStandardMessage(0, encodedProtonMessage, null);
assertNotNull(message.getAnnotation(new SimpleString(TEST_MESSAGE_ANNOTATION_KEY)));
message.removeAnnotation(new SimpleString(TEST_MESSAGE_ANNOTATION_KEY));
assertNull(message.getAnnotation(new SimpleString(TEST_MESSAGE_ANNOTATION_KEY)));
message.reencode();
assertTrue(message.getMessageAnnotations().getValue().isEmpty());
}
@Test
public void testSetAnnotation() {
AMQPStandardMessage message = new AMQPStandardMessage(0, encodedProtonMessage, null);
final SimpleString newAnnotation = new SimpleString("testSetAnnotation");
final String newValue = "newValue";
message.setAnnotation(newAnnotation, newValue);
assertEquals(newValue, message.getAnnotation(newAnnotation));
message.reencode();
assertEquals(newValue, message.getMessageAnnotations().getValue().get(Symbol.valueOf(newAnnotation.toString())));
}
//----- Tests accessing Proton Sections from encoded data -----------------//
@Test
public void testGetProtonMessage() {
MessageImpl protonMessage = createProtonMessage();
AMQPStandardMessage message = new AMQPStandardMessage(0, encodeMessage(protonMessage), null, null);
assertProtonMessageEquals(protonMessage, message.getProtonMessage());
message.setAnnotation(new SimpleString("testGetProtonMessage"), "1");
message.messageChanged();
assertProtonMessageNotEquals(protonMessage, message.getProtonMessage());
}
@Test
public void testGetHeader() {
MessageImpl protonMessage = createProtonMessage();
AMQPStandardMessage message = new AMQPStandardMessage(0, encodeMessage(protonMessage), null, null);
Header decoded = message.getHeader();
assertNotSame(decoded, protonMessage.getHeader());
assertHeaderEquals(protonMessage.getHeader(), decoded);
// Update the values
decoded.setDeliveryCount(UnsignedInteger.ZERO);
decoded.setTtl(UnsignedInteger.valueOf(255));
decoded.setFirstAcquirer(true);
// Check that the message is unaffected.
assertHeaderNotEquals(protonMessage.getHeader(), decoded);
}
@Test
public void testGetProperties() {
MessageImpl protonMessage = createProtonMessage();
AMQPStandardMessage message = new AMQPStandardMessage(0, encodeMessage(protonMessage), null, null);
Properties decoded = message.getProperties();
assertNotSame(decoded, protonMessage.getProperties());
assertPropertiesEquals(protonMessage.getProperties(), decoded);
// Update the values
decoded.setAbsoluteExpiryTime(new Date(System.currentTimeMillis()));
decoded.setGroupSequence(UnsignedInteger.valueOf(255));
decoded.setSubject(UUID.randomUUID().toString());
// Check that the message is unaffected.
assertPropertiesNotEquals(protonMessage.getProperties(), decoded);
}
@Test
public void testGetDeliveryAnnotations() {
MessageImpl protonMessage = createProtonMessage();
DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
deliveryAnnotations.getValue().put(Symbol.valueOf(UUID.randomUUID().toString()), "test-1");
protonMessage.setDeliveryAnnotations(deliveryAnnotations);
AMQPStandardMessage message = new AMQPStandardMessage(0, encodeMessage(protonMessage), null, null);
DeliveryAnnotations decoded = message.getDeliveryAnnotations();
assertNotSame(decoded, protonMessage.getDeliveryAnnotations());
assertDeliveryAnnotationsEquals(protonMessage.getDeliveryAnnotations(), decoded);
// Update the values
decoded.getValue().put(Symbol.valueOf(UUID.randomUUID().toString()), "test-2");
// Check that the message is unaffected.
assertDeliveryAnnotationsNotEquals(protonMessage.getDeliveryAnnotations(), decoded);
}
@Test
public void testGetMessageAnnotations() {
MessageImpl protonMessage = createProtonMessage();
AMQPStandardMessage message = new AMQPStandardMessage(0, encodeMessage(protonMessage), null, null);
MessageAnnotations decoded = message.getMessageAnnotations();
assertNotSame(decoded, protonMessage.getMessageAnnotations());
assertMessageAnnotationsEquals(protonMessage.getMessageAnnotations(), decoded);
// Update the values
decoded.getValue().put(Symbol.valueOf(UUID.randomUUID().toString()), "test");
// Check that the message is unaffected.
assertMessageAnnotationsNotEquals(protonMessage.getMessageAnnotations(), decoded);
}
@Test
public void testGetApplicationProperties() {
MessageImpl protonMessage = createProtonMessage();
AMQPStandardMessage message = new AMQPStandardMessage(0, encodeMessage(protonMessage), null, null);
ApplicationProperties decoded = message.getApplicationProperties();
assertNotSame(decoded, protonMessage.getApplicationProperties());
assertApplicationPropertiesEquals(protonMessage.getApplicationProperties(), decoded);
// Update the values
decoded.getValue().put(UUID.randomUUID().toString(), "test");
// Check that the message is unaffected.
assertApplicationPropertiesNotEquals(protonMessage.getApplicationProperties(), decoded);
}
@Test
public void testGetBody() {
MessageImpl protonMessage = createProtonMessage();
AMQPStandardMessage message = new AMQPStandardMessage(0, encodeMessage(protonMessage), null, null);
Object body = message.getBody();
assertTrue(body instanceof AmqpValue);
AmqpValue amqpValueBody = (AmqpValue) body;
assertNotNull(amqpValueBody.getValue());
assertNotSame(((AmqpValue)protonMessage.getBody()).getValue(), amqpValueBody.getValue());
assertEquals(((AmqpValue)protonMessage.getBody()).getValue(), amqpValueBody.getValue());
}
@SuppressWarnings("unchecked")
@Test
public void testGetFooter() {
MessageImpl protonMessage = createProtonMessage();
Footer footer = new Footer(new HashMap<>());
footer.getValue().put(Symbol.valueOf(UUID.randomUUID().toString()), "test-1");
protonMessage.setFooter(footer);
AMQPStandardMessage message = new AMQPStandardMessage(0, encodeMessage(protonMessage), null, null);
Footer decoded = message.getFooter();
assertNotSame(decoded, protonMessage.getFooter());
assertFootersEquals(protonMessage.getFooter(), decoded);
// Update the values
decoded.getValue().put(Symbol.valueOf(UUID.randomUUID().toString()), "test-2");
// Check that the message is unaffected.
assertFootersNotEquals(protonMessage.getFooter(), decoded);
}
//----- Test re-encode of updated message sections ------------------------//
@Test
public void testApplicationPropertiesReencodeAfterUpdate() {
MessageImpl protonMessage = createProtonMessage();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertProtonMessageEquals(protonMessage, decoded.getProtonMessage());
decoded.putStringProperty("key-2", "value-2");
decoded.reencode();
assertProtonMessageNotEquals(protonMessage, decoded.getProtonMessage());
assertEquals(decoded.getStringProperty(TEST_APPLICATION_PROPERTY_KEY), TEST_APPLICATION_PROPERTY_VALUE);
assertEquals(decoded.getStringProperty("key-2"), "value-2");
}
@Test
public void testMessageAnnotationsReencodeAfterUpdate() {
final SimpleString TEST_ANNOTATION = new SimpleString("testMessageAnnotationsReencodeAfterUpdate");
MessageImpl protonMessage = createProtonMessage();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertProtonMessageEquals(protonMessage, decoded.getProtonMessage());
decoded.setAnnotation(TEST_ANNOTATION, "value-2");
decoded.reencode();
assertProtonMessageNotEquals(protonMessage, decoded.getProtonMessage());
assertEquals(decoded.getAnnotation(TEST_ANNOTATION), "value-2");
}
//----- Test handling of message extra properties -------------------------//
@Test
public void testExtraByteProperty() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
byte[] value = RandomUtil.randomBytes();
SimpleString name = SimpleString.toSimpleString("myProperty");
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getExtraProperties());
assertNull(decoded.getExtraBytesProperty(name));
assertNull(decoded.removeExtraBytesProperty(name));
decoded.putExtraBytesProperty(name, value);
assertFalse(decoded.getExtraProperties().isEmpty());
assertTrue(Arrays.equals(value, decoded.getExtraBytesProperty(name)));
assertTrue(Arrays.equals(value, decoded.removeExtraBytesProperty(name)));
}
@Test
public void testExtraProperty() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
byte[] original = RandomUtil.randomBytes();
SimpleString name = SimpleString.toSimpleString("myProperty");
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
decoded.setAddress("someAddress");
decoded.setMessageID(33);
decoded.putExtraBytesProperty(name, original);
ICoreMessage coreMessage = decoded.toCore();
Assert.assertEquals(original, coreMessage.getBytesProperty(name));
ActiveMQBuffer buffer = ActiveMQBuffers.pooledBuffer(10 * 1024);
try {
decoded.getPersister().encode(buffer, decoded);
Assert.assertEquals(AMQPMessagePersisterV3.getInstance().getID(), buffer.readByte()); // the journal reader will read 1 byte to find the persister
AMQPStandardMessage readMessage = (AMQPStandardMessage)decoded.getPersister().decode(buffer, null, null);
Assert.assertEquals(33, readMessage.getMessageID());
Assert.assertEquals("someAddress", readMessage.getAddress());
assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
} finally {
buffer.release();
}
{
ICoreMessage embeddedMessage = EmbedMessageUtil.embedAsCoreMessage(decoded);
AMQPStandardMessage readMessage = (AMQPStandardMessage) EmbedMessageUtil.extractEmbedded(embeddedMessage, null);
Assert.assertEquals(33, readMessage.getMessageID());
Assert.assertEquals("someAddress", readMessage.getAddress());
assertArrayEquals(original, readMessage.getExtraBytesProperty(name));
}
}
//----- Test that message decode ignores unused sections ------------------//
private static final UnsignedLong AMQPVALUE_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000077L);
private static final UnsignedLong APPLICATION_PROPERTIES_DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000074L);
@Test
public void testPartialDecodeIgnoresApplicationPropertiesByDefault() {
Header header = new Header();
header.setDurable(true);
header.setPriority(UnsignedByte.valueOf((byte) 6));
ByteBuf encodedBytes = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(encodedBytes);
EncoderImpl encoder = TLSEncode.getEncoder();
encoder.setByteBuffer(writable);
encoder.writeObject(header);
// Signal body of AmqpValue but write corrupt underlying type info
encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
encodedBytes.writeByte(EncodingCodes.SMALLULONG);
encodedBytes.writeByte(APPLICATION_PROPERTIES_DESCRIPTOR.byteValue());
encodedBytes.writeByte(EncodingCodes.MAP8);
encodedBytes.writeByte(2); // Size
encodedBytes.writeByte(2); // Elements
// Use bad encoding code on underlying type of map key which will fail the decode if run
encodedBytes.writeByte(255);
ReadableBuffer readable = new NettyReadable(encodedBytes);
AMQPStandardMessage message = null;
try {
message = new AMQPStandardMessage(0, readable, null, null);
} catch (Exception decodeError) {
fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
}
assertTrue(message.isDurable());
try {
// This should perform the lazy decode of the ApplicationProperties portion of the message
message.getStringProperty("test");
fail("Should have thrown an error when attempting to decode the ApplicationProperties which are malformed.");
} catch (Exception ex) {
// Expected decode to fail when building full message.
}
}
@Test
public void testPartialDecodeIgnoresBodyByDefault() {
Header header = new Header();
header.setDurable(true);
header.setPriority(UnsignedByte.valueOf((byte) 6));
ByteBuf encodedBytes = Unpooled.buffer(1024);
NettyWritable writable = new NettyWritable(encodedBytes);
EncoderImpl encoder = TLSEncode.getEncoder();
encoder.setByteBuffer(writable);
encoder.writeObject(header);
// Signal body of AmqpValue but write corrupt underlying type info
encodedBytes.writeByte(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
encodedBytes.writeByte(EncodingCodes.SMALLULONG);
encodedBytes.writeByte(AMQPVALUE_DESCRIPTOR.byteValue());
// Use bad encoding code on underlying type
encodedBytes.writeByte(255);
ReadableBuffer readable = new NettyReadable(encodedBytes);
AMQPStandardMessage message = null;
try {
message = new AMQPStandardMessage(0, readable, null, null);
} catch (Exception decodeError) {
fail("Should not have encountered an exception on partial decode: " + decodeError.getMessage());
}
assertTrue(message.isDurable());
try {
// This will decode the body section if present in order to present it as a Proton Message object
message.getBody();
fail("Should have thrown an error when attempting to decode the body which is malformed.");
} catch (Exception ex) {
// Expected decode to fail when building full message.
}
}
//----- Tests for message copy correctness --------------------------------//
@Test
public void testCopyMessage() {
AMQPStandardMessage message = new AMQPStandardMessage(0, encodedProtonMessage, null, null);
message.setMessageID(127);
AMQPStandardMessage copy = (AMQPStandardMessage) message.copy();
assertEquals(message.getMessageID(), copy.getMessageID());
assertProtonMessageEquals(message.getProtonMessage(), copy.getProtonMessage());
}
@Test
public void testCopyMessageWithNewArtemisMessageID() {
AMQPStandardMessage message = new AMQPStandardMessage(0, encodedProtonMessage, null, null);
message.setMessageID(127);
AMQPStandardMessage copy = (AMQPStandardMessage) message.copy(255);
assertNotEquals(message.getMessageID(), copy.getMessageID());
assertProtonMessageEquals(message.getProtonMessage(), copy.getProtonMessage());
}
@Test
public void testCopyMessageDoesNotRemovesMessageAnnotations() {
MessageImpl protonMessage = createProtonMessage();
DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
deliveryAnnotations.getValue().put(Symbol.valueOf("testCopyMessageRemovesMessageAnnotations"), "1");
protonMessage.setDeliveryAnnotations(deliveryAnnotations);
AMQPStandardMessage message = new AMQPStandardMessage(0, encodeMessage(protonMessage), null, null);
message.setMessageID(127);
AMQPStandardMessage copy = (AMQPStandardMessage) message.copy();
assertEquals(message.getMessageID(), copy.getMessageID());
assertProtonMessageEquals(message.getProtonMessage(), copy.getProtonMessage());
assertNotNull(copy.getDeliveryAnnotations());
}
@Test
public void testDecodeCopyUpdateReencodeAndThenDecodeAgain() {
AMQPStandardMessage message = new AMQPStandardMessage(0, encodedProtonMessage, null, null);
// Sanity checks
assertTrue(message.isDurable());
assertEquals(TEST_STRING_BODY, ((AmqpValue) message.getBody()).getValue());
// Copy the message
message = (AMQPStandardMessage) message.copy();
// Sanity checks
assertTrue(message.isDurable());
assertEquals(TEST_STRING_BODY, ((AmqpValue) message.getBody()).getValue());
// Update the message
message.setAnnotation(new SimpleString("x-opt-extra-1"), "test-1");
message.setAnnotation(new SimpleString("x-opt-extra-2"), "test-2");
message.setAnnotation(new SimpleString("x-opt-extra-3"), "test-3");
// Reencode and then decode the message again
message.reencode();
// Sanity checks
assertTrue(message.isDurable());
assertEquals(TEST_STRING_BODY, ((AmqpValue) message.getBody()).getValue());
}
//----- Test sendBuffer method --------------------------------------------//
@Test
public void testSendBuffer() {
ByteBuf buffer = Unpooled.buffer(255);
AMQPStandardMessage message = new AMQPStandardMessage(0, encodedProtonMessage, null, null);
message.sendBuffer(buffer, 1);
assertNotNull(buffer);
AMQPStandardMessage copy = new AMQPStandardMessage(0, new NettyReadable(buffer), null, null);
assertProtonMessageEquals(message.getProtonMessage(), copy.getProtonMessage());
}
//----- Test getSendBuffer variations -------------------------------------//
@Test
public void testGetSendBuffer() {
AMQPStandardMessage message = new AMQPStandardMessage(0, encodedProtonMessage, null, null);
ReadableBuffer buffer = message.getSendBuffer(1, null);
assertNotNull(buffer);
assertTrue(buffer.hasArray());
assertTrue(Arrays.equals(encodedProtonMessage, buffer.array()));
AMQPStandardMessage copy = new AMQPStandardMessage(0, buffer, null, null);
assertProtonMessageEquals(message.getProtonMessage(), copy.getProtonMessage());
}
@Test
public void testGetSendBufferAddsDeliveryCountOnlyToSendMessage() {
AMQPStandardMessage message = new AMQPStandardMessage(0, encodedProtonMessage, null, null);
ReadableBuffer buffer = message.getSendBuffer(7, null);
assertNotNull(buffer);
message.reencode(); // Ensures Header is current if accidentally updated
AMQPStandardMessage copy = new AMQPStandardMessage(0, buffer, null, null);
MessageImpl originalsProtonMessage = message.getProtonMessage();
MessageImpl copyProtonMessage = copy.getProtonMessage();
assertProtonMessageNotEquals(originalsProtonMessage, copyProtonMessage);
assertNull(originalsProtonMessage.getHeader().getDeliveryCount());
assertEquals(6, copyProtonMessage.getHeader().getDeliveryCount().intValue());
}
@Test
public void testGetSendBufferAddsDeliveryCountOnlyToSendMessageOriginalHadNoHeader() {
MessageImpl protonMessage = (MessageImpl) Proton.message();
AMQPStandardMessage message = new AMQPStandardMessage(0, encodeMessage(protonMessage), null, null);
ReadableBuffer buffer = message.getSendBuffer(7, null);
assertNotNull(buffer);
message.reencode(); // Ensures Header is current if accidentally updated
AMQPStandardMessage copy = new AMQPStandardMessage(0, buffer, null, null);
MessageImpl originalsProtonMessage = message.getProtonMessage();
MessageImpl copyProtonMessage = copy.getProtonMessage();
assertProtonMessageNotEquals(originalsProtonMessage, copyProtonMessage);
assertNull(originalsProtonMessage.getHeader());
assertEquals(6, copyProtonMessage.getHeader().getDeliveryCount().intValue());
}
@Test
public void testGetSendBufferRemoveDeliveryAnnotations() {
MessageImpl protonMessage = createProtonMessage();
DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
deliveryAnnotations.getValue().put(Symbol.valueOf("testGetSendBufferRemoveDeliveryAnnotations"), "X");
protonMessage.setDeliveryAnnotations(deliveryAnnotations);
AMQPStandardMessage message = new AMQPStandardMessage(0, encodeMessage(protonMessage), null, null);
ReadableBuffer buffer = message.getSendBuffer(1, null);
assertNotNull(buffer);
AMQPStandardMessage copy = new AMQPStandardMessage(0, buffer, null, null);
MessageImpl copyProtonMessage = copy.getProtonMessage();
assertProtonMessageNotEquals(message.getProtonMessage(), copyProtonMessage);
assertNull(copyProtonMessage.getDeliveryAnnotations());
}
@Test
public void testGetSendBufferAddsDeliveryCountOnlyToSendMessageAndTrimsDeliveryAnnotations() {
MessageImpl protonMessage = createProtonMessage();
DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
deliveryAnnotations.getValue().put(Symbol.valueOf("testGetSendBufferRemoveDeliveryAnnotations"), "X");
protonMessage.setDeliveryAnnotations(deliveryAnnotations);
AMQPStandardMessage message = new AMQPStandardMessage(0, encodeMessage(protonMessage), null, null);
ReadableBuffer buffer = message.getSendBuffer(7, null);
assertNotNull(buffer);
message.reencode(); // Ensures Header is current if accidentally updated
AMQPStandardMessage copy = new AMQPStandardMessage(0, buffer, null, null);
MessageImpl originalsProtonMessage = message.getProtonMessage();
MessageImpl copyProtonMessage = copy.getProtonMessage();
assertProtonMessageNotEquals(originalsProtonMessage, copyProtonMessage);
assertNull(originalsProtonMessage.getHeader().getDeliveryCount());
assertEquals(6, copyProtonMessage.getHeader().getDeliveryCount().intValue());
assertNull(copyProtonMessage.getDeliveryAnnotations());
}
//----- Test reencode method ----------------------------------------------//
@Test
public void testReencodeOnMessageWithNoPayoad() {
doTestMessageReencodeProducesEqualMessage(false, false, false, false, false, false, false);
}
@Test
public void testReencodeOnMessageWithFullPayoad() {
doTestMessageReencodeProducesEqualMessage(true, true, true, true, true, true, true);
}
@Test
public void testReencodeOnMessageWithHeadersOnly() {
doTestMessageReencodeProducesEqualMessage(true, false, false, false, false, false, false);
}
@Test
public void testReencodeOnMessageWithDeliveryAnnotationsOnly() {
doTestMessageReencodeProducesEqualMessage(false, true, false, false, false, false, false);
}
@Test
public void testReencodeOnMessageWithMessageAnnotationsOnly() {
doTestMessageReencodeProducesEqualMessage(false, false, true, false, false, false, false);
}
@Test
public void testReencodeOnMessageWithPropertiesOnly() {
doTestMessageReencodeProducesEqualMessage(false, false, false, true, false, false, false);
}
@Test
public void testReencodeOnMessageWithApplicationPropertiesOnly() {
doTestMessageReencodeProducesEqualMessage(false, false, false, false, true, false, false);
}
@Test
public void testReencodeOnMessageWithBodyOnly() {
doTestMessageReencodeProducesEqualMessage(false, false, false, false, false, true, false);
}
@Test
public void testReencodeOnMessageWithFooterOnly() {
doTestMessageReencodeProducesEqualMessage(false, false, false, false, false, false, true);
}
@Test
public void testReencodeOnMessageWithApplicationPropertiesAndBody() {
doTestMessageReencodeProducesEqualMessage(false, false, false, false, true, true, false);
}
@Test
public void testReencodeOnMessageWithApplicationPropertiesAndBodyAndFooter() {
doTestMessageReencodeProducesEqualMessage(false, false, false, false, true, true, true);
}
@Test
public void testReencodeOnMessageWithPropertiesApplicationPropertiesAndBodyAndFooter() {
doTestMessageReencodeProducesEqualMessage(false, false, false, true, true, true, true);
}
@Test
public void testReencodeOnMessageWithMessageAnnotationsPropertiesApplicationPropertiesAndBodyAndFooter() {
doTestMessageReencodeProducesEqualMessage(false, false, true, true, true, true, true);
}
@Test
public void testReencodeOnMessageWithDeliveryAnnotationsMessageAnnotationsPropertiesApplicationPropertiesBodyFooter() {
doTestMessageReencodeProducesEqualMessage(false, true, true, true, true, true, true);
}
@SuppressWarnings("unchecked")
private void doTestMessageReencodeProducesEqualMessage(
boolean header, boolean deliveryAnnotations, boolean messageAnnotations, boolean properties, boolean applicationProperties, boolean body, boolean footer) {
MessageImpl protonMessage = (MessageImpl) Proton.message();
if (header) {
Header headers = new Header();
headers.setDurable(true);
headers.setPriority(UnsignedByte.valueOf((byte) 9));
protonMessage.setHeader(headers);
}
if (properties) {
Properties props = new Properties();
props.setCreationTime(new Date(System.currentTimeMillis()));
props.setTo(TEST_TO_ADDRESS);
props.setMessageId(UUID.randomUUID());
protonMessage.setProperties(props);
}
if (deliveryAnnotations) {
DeliveryAnnotations annotations = new DeliveryAnnotations(new HashMap<>());
annotations.getValue().put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY + "_DA"), TEST_MESSAGE_ANNOTATION_VALUE);
protonMessage.setDeliveryAnnotations(annotations);
}
if (messageAnnotations) {
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
annotations.getValue().put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY), TEST_MESSAGE_ANNOTATION_VALUE);
protonMessage.setMessageAnnotations(annotations);
}
if (applicationProperties) {
ApplicationProperties appProps = new ApplicationProperties(new HashMap<>());
appProps.getValue().put(TEST_APPLICATION_PROPERTY_KEY, TEST_APPLICATION_PROPERTY_VALUE);
protonMessage.setApplicationProperties(appProps);
}
if (body) {
AmqpValue text = new AmqpValue(TEST_STRING_BODY);
protonMessage.setBody(text);
}
if (footer) {
Footer foot = new Footer(new HashMap<>());
foot.getValue().put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY + "_FOOT"), TEST_MESSAGE_ANNOTATION_VALUE);
protonMessage.setFooter(foot);
}
AMQPStandardMessage message = new AMQPStandardMessage(0, encodeMessage(protonMessage), null, null);
message.reencode();
if (deliveryAnnotations) {
assertProtonMessageNotEquals(protonMessage, message.getProtonMessage());
} else {
assertProtonMessageEquals(protonMessage, message.getProtonMessage());
}
}
@Test
public void testGetSendBufferWithoutDeliveryAnnotations() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
Header header = new Header();
header.setDeliveryCount(new UnsignedInteger(1));
protonMessage.setHeader(header);
Properties properties = new Properties();
properties.setTo("someNiceLocal");
protonMessage.setProperties(properties);
protonMessage.setBody(new AmqpValue("Sample payload"));
DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
final String annotationKey = "annotationKey";
final String annotationValue = "annotationValue";
deliveryAnnotations.getValue().put(Symbol.getSymbol(annotationKey), annotationValue);
protonMessage.setDeliveryAnnotations(deliveryAnnotations);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
ReadableBuffer sendBuffer = decoded.getSendBuffer(1, null);
assertEquals(decoded.getEncodeSize(), sendBuffer.capacity());
AMQPStandardMessage msgFromSendBuffer = new AMQPStandardMessage(0, sendBuffer, null, null);
assertEquals("someNiceLocal", msgFromSendBuffer.getAddress());
assertNull(msgFromSendBuffer.getDeliveryAnnotations());
// again with higher deliveryCount
ReadableBuffer sendBuffer2 = decoded.getSendBuffer(5, null);
assertEquals(decoded.getEncodeSize(), sendBuffer2.capacity());
AMQPStandardMessage msgFromSendBuffer2 = new AMQPStandardMessage(0, sendBuffer2, null, null);
assertEquals("someNiceLocal", msgFromSendBuffer2.getAddress());
assertNull(msgFromSendBuffer2.getDeliveryAnnotations());
}
@Test
public void testGetSendBufferWithDeliveryAnnotations() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
Header header = new Header();
header.setDeliveryCount(new UnsignedInteger(1));
protonMessage.setHeader(header);
Properties properties = new Properties();
properties.setTo("someNiceLocal");
protonMessage.setProperties(properties);
protonMessage.setBody(new AmqpValue("Sample payload"));
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
DeliveryAnnotations newDeliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
final String annotationKey = "annotationKey";
final String annotationValue = "annotationValue";
newDeliveryAnnotations.getValue().put(Symbol.getSymbol(annotationKey), annotationValue);
decoded.setDeliveryAnnotationsForSendBuffer(newDeliveryAnnotations);
ReadableBuffer sendBuffer = decoded.getSendBuffer(1, null);
assertEquals(decoded.getEncodeSize(), sendBuffer.capacity());
AMQPStandardMessage msgFromSendBuffer = new AMQPStandardMessage(0, sendBuffer, null, null);
assertEquals("someNiceLocal", msgFromSendBuffer.getAddress());
assertNotNull(msgFromSendBuffer.getDeliveryAnnotations());
assertEquals(1, msgFromSendBuffer.getDeliveryAnnotations().getValue().size());
assertEquals(annotationValue, msgFromSendBuffer.getDeliveryAnnotations().getValue().get(Symbol.getSymbol(annotationKey)));
// again with higher deliveryCount
DeliveryAnnotations newDeliveryAnnotations2 = new DeliveryAnnotations(new HashMap<>());
final String annotationKey2 = "annotationKey2";
final String annotationValue2 = "annotationValue2";
newDeliveryAnnotations2.getValue().put(Symbol.getSymbol(annotationKey2), annotationValue2);
decoded.setDeliveryAnnotationsForSendBuffer(newDeliveryAnnotations2);
ReadableBuffer sendBuffer2 = decoded.getSendBuffer(5, null);
assertEquals(decoded.getEncodeSize(), sendBuffer2.capacity());
AMQPStandardMessage msgFromSendBuffer2 = new AMQPStandardMessage(0, sendBuffer2, null, null);
assertEquals("someNiceLocal", msgFromSendBuffer2.getAddress());
assertNotNull(msgFromSendBuffer2.getDeliveryAnnotations());
assertEquals(1, msgFromSendBuffer2.getDeliveryAnnotations().getValue().size());
assertEquals(annotationValue2, msgFromSendBuffer2.getDeliveryAnnotations().getValue().get(Symbol.getSymbol(annotationKey2)));
}
/** It validates we are not adding a header if we don't need to */
@Test
public void testGetSendBufferWithDeliveryAnnotationsAndNoHeader() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
Properties properties = new Properties();
properties.setTo("someNiceLocal");
protonMessage.setProperties(properties);
protonMessage.setBody(new AmqpValue("Sample payload"));
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
DeliveryAnnotations newDeliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
final String annotationKey = "annotationKey";
final String annotationValue = "annotationValue";
newDeliveryAnnotations.getValue().put(Symbol.getSymbol(annotationKey), annotationValue);
decoded.setDeliveryAnnotationsForSendBuffer(newDeliveryAnnotations);
ReadableBuffer sendBuffer = decoded.getSendBuffer(1, null);
assertEquals(decoded.getEncodeSize(), sendBuffer.capacity());
AMQPStandardMessage msgFromSendBuffer = new AMQPStandardMessage(0, sendBuffer, null, null);
assertEquals("someNiceLocal", msgFromSendBuffer.getAddress());
assertNull(msgFromSendBuffer.getProtonMessage().getHeader());
assertNotNull(msgFromSendBuffer.getDeliveryAnnotations());
assertEquals(1, msgFromSendBuffer.getDeliveryAnnotations().getValue().size());
assertEquals(annotationValue, msgFromSendBuffer.getDeliveryAnnotations().getValue().get(Symbol.getSymbol(annotationKey)));
// again with higher deliveryCount
DeliveryAnnotations newDeliveryAnnotations2 = new DeliveryAnnotations(new HashMap<>());
final String annotationKey2 = "annotationKey2";
final String annotationValue2 = "annotationValue2";
newDeliveryAnnotations2.getValue().put(Symbol.getSymbol(annotationKey2), annotationValue2);
decoded.setDeliveryAnnotationsForSendBuffer(newDeliveryAnnotations2);
ReadableBuffer sendBuffer2 = decoded.getSendBuffer(5, null);
AMQPStandardMessage msgFromSendBuffer2 = new AMQPStandardMessage(0, sendBuffer2, null, null);
assertEquals(4, msgFromSendBuffer2.getProtonMessage().getHeader().getDeliveryCount().intValue());
assertEquals("someNiceLocal", msgFromSendBuffer2.getAddress());
assertNotNull(msgFromSendBuffer2.getDeliveryAnnotations());
assertEquals(1, msgFromSendBuffer2.getDeliveryAnnotations().getValue().size());
assertEquals(annotationValue2, msgFromSendBuffer2.getDeliveryAnnotations().getValue().get(Symbol.getSymbol(annotationKey2)));
}
//----- CompositeData handling -------------------------------------------//
@Test
public void testToCompositeDataHeaderSectionDurable() throws Exception {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
// With section missing (defaults false)
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
CompositeData cd = decoded.toCompositeData(0, 0);
assertTrue(cd.containsKey(CompositeDataConstants.DURABLE));
Object durableObj = cd.get(CompositeDataConstants.DURABLE);
assertTrue(durableObj instanceof Boolean);
assertEquals(Boolean.FALSE, durableObj);
// With section present, but value not set (defaults false)
Header protonHeader = new Header();
protonMessage.setHeader(protonHeader);
decoded = encodeAndDecodeMessage(protonMessage);
cd = decoded.toCompositeData(0, 0);
assertTrue(cd.containsKey(CompositeDataConstants.DURABLE));
durableObj = cd.get(CompositeDataConstants.DURABLE);
assertTrue(durableObj instanceof Boolean);
assertEquals(Boolean.FALSE, durableObj);
// With section present, value set False explicitly
protonHeader = new Header();
protonHeader.setDurable(Boolean.FALSE);
protonMessage.setHeader(protonHeader);
decoded = encodeAndDecodeMessage(protonMessage);
cd = decoded.toCompositeData(0, 0);
assertTrue(cd.containsKey(CompositeDataConstants.DURABLE));
durableObj = cd.get(CompositeDataConstants.DURABLE);
assertTrue(durableObj instanceof Boolean);
assertEquals(Boolean.FALSE, durableObj);
// With section present, value set True explicitly
protonHeader = new Header();
protonHeader.setDurable(Boolean.TRUE);
protonMessage.setHeader(protonHeader);
decoded = encodeAndDecodeMessage(protonMessage);
cd = decoded.toCompositeData(0, 0);
assertTrue(cd.containsKey(CompositeDataConstants.DURABLE));
durableObj = cd.get(CompositeDataConstants.DURABLE);
assertTrue(durableObj instanceof Boolean);
assertEquals(Boolean.TRUE, durableObj);
}
@Test
public void testToCompositeDataHeaderSectionPriority() throws Exception {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
// With section missing (defaults 4)
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
CompositeData cd = decoded.toCompositeData(0, 0);
assertTrue(cd.containsKey(CompositeDataConstants.PRIORITY));
Object priorityObj = cd.get(CompositeDataConstants.PRIORITY);
assertTrue(priorityObj instanceof Byte);
assertEquals(Byte.valueOf((byte) 4), priorityObj);
// With section present, but value not set (defaults 4)
Header protonHeader = new Header();
protonMessage.setHeader(protonHeader);
decoded = encodeAndDecodeMessage(protonMessage);
cd = decoded.toCompositeData(0, 0);
assertTrue(cd.containsKey(CompositeDataConstants.PRIORITY));
priorityObj = cd.get(CompositeDataConstants.PRIORITY);
assertTrue(priorityObj instanceof Byte);
assertEquals(Byte.valueOf((byte) 4), priorityObj);
// With section present, value set to 5 explicitly
protonHeader = new Header();
protonHeader.setPriority(UnsignedByte.valueOf((byte) 5));
protonMessage.setHeader(protonHeader);
decoded = encodeAndDecodeMessage(protonMessage);
cd = decoded.toCompositeData(0, 0);
assertTrue(cd.containsKey(CompositeDataConstants.PRIORITY));
priorityObj = cd.get(CompositeDataConstants.PRIORITY);
assertTrue(priorityObj instanceof Byte);
assertEquals(Byte.valueOf((byte) 5), priorityObj);
}
@Test
public void testToCompositeDataPropertiesSection() throws Exception {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
String testContentEncoding = "gzip";
String testGroupId = "testGroupId";
int testGroupSequence = 45678;
String testReplyToGroupId = "testReplyToGroupId";
long testCreationTime = System.currentTimeMillis();
long testExpiryTime = testCreationTime + 5000;
String testSubject = "testSubject";
String testMessageId = "testMessageId";
String testCorrelationId = "testCorrelationId";
Properties protonProperties = new Properties();
protonProperties.setContentType(Symbol.valueOf(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE));
protonProperties.setContentEncoding(Symbol.valueOf(testContentEncoding));
protonProperties.setGroupId(testGroupId);
protonProperties.setGroupSequence(UnsignedInteger.valueOf(testGroupSequence));
protonProperties.setReplyToGroupId(testReplyToGroupId);
protonProperties.setCreationTime(new Date(testCreationTime));
protonProperties.setAbsoluteExpiryTime(new Date(testExpiryTime));
protonProperties.setSubject(testSubject);
protonProperties.setTo(TEST_TO_ADDRESS);
protonProperties.setMessageId(testMessageId);
protonProperties.setCorrelationId(testCorrelationId);
protonMessage.setProperties(protonProperties);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
CompositeData cd = decoded.toCompositeData(-1, 0);
assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
Object propsObject = cd.get(CompositeDataConstants.PROPERTIES);
assertTrue(propsObject instanceof String);
String properties = (String) propsObject;
assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "contentType" + "=" + AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE));
assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "contentEncoding" + "=" + testContentEncoding));
assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "groupId" + "=" + testGroupId));
assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "groupSequence" + "=" + testGroupSequence));
assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "replyToGroupId" + "=" + testReplyToGroupId));
assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "creationTime" + "=" + testCreationTime));
assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "absoluteExpiryTime" + "=" + testExpiryTime));
assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "to" + "=" + TEST_TO_ADDRESS));
assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "subject" + "=" + testSubject));
assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "correlationId" + "=" + testCorrelationId));
// TODO: should these fields be included in the 'properties' string and tested above?
// Some are shown elsewhere in a way, others missing entirely. Eg:
//
// message-id: included'ish, with an ID: prefix added, as the CompositeDataConstants.USER_ID.
// user-id: not included.
// Some fields of the properties section already align with fields given
// their own top level entries of the CompositeData, which remain:
// The message-id is presented via the 'user id' field, inc an added prefix.
assertTrue(cd.containsKey(CompositeDataConstants.USER_ID));
Object messageIdObj = cd.get(CompositeDataConstants.USER_ID);
assertTrue(messageIdObj instanceof String);
assertEquals(AMQPMessageIdHelper.JMS_ID_PREFIX + testMessageId, messageIdObj);
// The creation-time is duplicated as the 'timestamp' field
assertTrue(cd.containsKey(CompositeDataConstants.TIMESTAMP));
Object timestampObj = cd.get(CompositeDataConstants.TIMESTAMP);
assertTrue(timestampObj instanceof Long);
assertEquals(testCreationTime, timestampObj);
}
@Test
public void testToCompositeDataApplicationPropertiesSection() throws Exception {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
Map<String, Object> appPropsMap = new HashMap<>();
appPropsMap.put(TEST_APPLICATION_PROPERTY_KEY, TEST_APPLICATION_PROPERTY_VALUE);
appPropsMap.put(TEST_APPLICATION_PROPERTY_KEY2, TEST_APPLICATION_PROPERTY_VALUE2);
ApplicationProperties appProps = new ApplicationProperties(appPropsMap);
protonMessage.setApplicationProperties(appProps);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
CompositeData cd = decoded.toCompositeData(-1, 0);
assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
Object propsObject = cd.get(CompositeDataConstants.PROPERTIES);
assertTrue(propsObject instanceof String);
String properties = (String) propsObject;
assertTrue(properties.contains(PROPERTY_MAP_APP_PROPERTIES_PREFIX +
TEST_APPLICATION_PROPERTY_KEY + "=" + TEST_APPLICATION_PROPERTY_VALUE));
assertTrue(properties.contains(PROPERTY_MAP_APP_PROPERTIES_PREFIX +
TEST_APPLICATION_PROPERTY_KEY2 + "=" + TEST_APPLICATION_PROPERTY_VALUE2));
}
@Test
public void testToCompositeDataMessageAnnotationSection() throws Exception {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
Map<Symbol, Object> annotationsMap = new HashMap<>();
annotationsMap.put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY), TEST_MESSAGE_ANNOTATION_VALUE);
annotationsMap.put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY2), TEST_MESSAGE_ANNOTATION_VALUE2);
MessageAnnotations annotations = new MessageAnnotations(annotationsMap);
protonMessage.setMessageAnnotations(annotations);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
CompositeData cd = decoded.toCompositeData(-1, 0);
assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
Object propsObject = cd.get(CompositeDataConstants.PROPERTIES);
assertTrue(propsObject instanceof String);
String properties = (String) propsObject;
assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
TEST_MESSAGE_ANNOTATION_KEY + "=" + TEST_MESSAGE_ANNOTATION_VALUE));
assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
TEST_MESSAGE_ANNOTATION_KEY2 + "=" + TEST_MESSAGE_ANNOTATION_VALUE2));
// Now try some specific annotations with their own handling
long testIngressTime = System.currentTimeMillis();
long testDeliveryTime = System.currentTimeMillis() + 5678;
long testDeliveryDelay = 6789;
annotationsMap = new HashMap<>();
annotationsMap.put(Symbol.valueOf(AMQPMessageSupport.X_OPT_INGRESS_TIME), testIngressTime);
annotationsMap.put(Symbol.valueOf(AMQPMessageSupport.X_OPT_DELIVERY_TIME), testDeliveryTime);
annotationsMap.put(Symbol.valueOf(AMQPMessageSupport.X_OPT_DELIVERY_DELAY), testDeliveryDelay);
annotations = new MessageAnnotations(annotationsMap);
protonMessage.setMessageAnnotations(annotations);
decoded = encodeAndDecodeMessage(protonMessage);
cd = decoded.toCompositeData(-1, 0);
assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
propsObject = cd.get(CompositeDataConstants.PROPERTIES);
assertTrue(propsObject instanceof String);
properties = (String) propsObject;
assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
AMQPMessageSupport.X_OPT_INGRESS_TIME + "=" + testIngressTime));
assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
AMQPMessageSupport.X_OPT_DELIVERY_TIME + "=" + testDeliveryTime));
assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX +
AMQPMessageSupport.X_OPT_DELIVERY_DELAY + "=" + testDeliveryDelay));
}
@Test
public void testToCompositeDataExtraProperties() throws Exception {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
TypedProperties extraProperties = new TypedProperties();
extraProperties.putProperty(new SimpleString(TEST_EXTRA_PROPERTY_KEY1), TEST_EXTRA_PROPERTY_VALUE1);
extraProperties.putProperty(new SimpleString(TEST_EXTRA_PROPERTY_KEY2), TEST_EXTRA_PROPERTY_VALUE2);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage, extraProperties);
CompositeData cd = decoded.toCompositeData(-1, 0);
assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES));
Object propsObject = cd.get(CompositeDataConstants.PROPERTIES);
assertTrue(propsObject instanceof String);
String properties = (String) propsObject;
assertTrue(properties.contains(PROPERTY_MAP_EXTRA_PROPERTIES_PREFIX +
TEST_EXTRA_PROPERTY_KEY1 + "=" + TEST_EXTRA_PROPERTY_VALUE1));
assertTrue(properties.contains(PROPERTY_MAP_EXTRA_PROPERTIES_PREFIX +
TEST_EXTRA_PROPERTY_KEY2 + "=" + TEST_EXTRA_PROPERTY_VALUE2));
}
@Test
public void testToCompositeDataWithDataBodySectionWithoutContentType() throws Exception {
doToCompositeDataWithDataBodySectionTestImpl(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE);
}
@Test
public void testToCompositeDataWithDataBodySectionWithOctetStreamContentType() throws Exception {
doToCompositeDataWithDataBodySectionTestImpl(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE);
}
@Test
public void testToCompositeDataWithDataBodySectionWithTextPlainContentType() throws Exception {
doToCompositeDataWithDataBodySectionTestImpl("text/plain", org.apache.activemq.artemis.api.core.Message.TEXT_TYPE);
}
@Test
public void testToCompositeDataWithDataBodySectionWithSerializedObjectContentType() throws Exception {
doToCompositeDataWithDataBodySectionTestImpl(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE);
}
private void doToCompositeDataWithDataBodySectionTestImpl(String contentType, byte expectedMessageType) throws OpenDataException {
Message protonMessage = Message.Factory.create();
// Not the right payload for some of the content types,but it
// doesnt matter, mainly checking type value and for lack of NPEs.
String bytesSource = "testPayloadBytes";
String expectedBodyText = "Data{" + bytesSource + "}";
Data body = new Data(new Binary(bytesSource.getBytes(StandardCharsets.UTF_8)));
protonMessage.setBody(body);
protonMessage.setContentType(contentType);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
CompositeData cd = decoded.toCompositeData(-1, 0);
assertTrue(cd.containsKey(CompositeDataConstants.TEXT_BODY));
assertEquals(expectedBodyText, cd.get(CompositeDataConstants.TEXT_BODY));
assertTrue(cd.containsKey(CompositeDataConstants.TYPE));
assertEquals(expectedMessageType, cd.get(CompositeDataConstants.TYPE));
}
@Test
public void testToCompositeDataWithAmqpValueBinaryBodySectionWithoutContentType() throws Exception {
doToCompositeDataWithAmqpValueBodySectionWithBinaryTestImpl(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE);
}
@Test
public void testToCompositeDataWithAmqpValueBinaryBodySectionWithSerializedObjectContentType() throws Exception {
// Shouldnt really get in this situation, not meant to use content-type without the Data body section.
doToCompositeDataWithAmqpValueBodySectionWithBinaryTestImpl(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE);
}
private void doToCompositeDataWithAmqpValueBodySectionWithBinaryTestImpl(String contentType, byte expectedMessageType) throws OpenDataException {
Message protonMessage = Message.Factory.create();
// Not the right payload for some of the content types,but it
// doesnt matter, mainly checking type value and for lack of NPEs.
String bytesSource = "testPayloadBytes";
AmqpValue body = new AmqpValue(new Binary(bytesSource.getBytes(StandardCharsets.UTF_8)));
protonMessage.setBody(body);
protonMessage.setContentType(contentType);
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
CompositeData cd = decoded.toCompositeData(-1, 0);
assertTrue(cd.containsKey(CompositeDataConstants.TEXT_BODY));
assertEquals(bytesSource, cd.get(CompositeDataConstants.TEXT_BODY));
assertTrue(cd.containsKey(CompositeDataConstants.TYPE));
assertEquals(expectedMessageType, cd.get(CompositeDataConstants.TYPE));
}
@Test
public void testToCompositeDataWithStringBodyWithoutValueSizeLimit() throws Exception {
doToCompositeDataWithStringBodyValueSizeLimitTestImpl(-1, TEST_STRING_BODY);
}
@Test
public void testToCompositeDataWithStringBodyWithValueSizeLimit() throws Exception {
int limit = 11;
int testBodyLength = TEST_STRING_BODY.length();
assertTrue(testBodyLength > limit);
String expected = TEST_STRING_BODY.substring(0, limit) + ", + " + String.valueOf(testBodyLength - limit) + " more";
doToCompositeDataWithStringBodyValueSizeLimitTestImpl(limit, expected);
}
private void doToCompositeDataWithStringBodyValueSizeLimitTestImpl(int fieldsLimit, String expectedBodyText) throws OpenDataException {
Message protonMessage = Message.Factory.create();
protonMessage.setBody(new AmqpValue(TEST_STRING_BODY));
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
CompositeData cd = decoded.toCompositeData(fieldsLimit, 0);
assertTrue(cd.containsKey(CompositeDataConstants.TEXT_BODY));
assertEquals(expectedBodyText, cd.get(CompositeDataConstants.TEXT_BODY));
assertTrue(cd.containsKey(CompositeDataConstants.TYPE));
assertEquals(org.apache.activemq.artemis.api.core.Message.TEXT_TYPE, cd.get(CompositeDataConstants.TYPE));
}
@Test
public void testToPropertyMap() throws Exception {
Message protonMessage = Message.Factory.create();
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
TypedProperties props = decoded.createExtraProperties();
props.putSimpleStringProperty(new SimpleString("firstString"), new SimpleString("firstValue"));
props.putLongProperty(new SimpleString("secondLong"), 1234567);
// same as toPropertyMap(false,5)
Map<String, Object> map = decoded.toPropertyMap(-1);
assertEquals(2, map.size());
assertEquals(map.get("firstString"), "firstValue");
assertEquals(map.get("secondLong"), 1234567L);
}
@Test
public void testEncodedAMQPMessageHasReversedHeaderAndDA() throws Exception {
final Header header = new Header();
header.setDurable(true);
final DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
deliveryAnnotations.getValue().put(Symbol.valueOf("test-da"), "test-da");
final MessageAnnotations messageAnnotations = new MessageAnnotations(new HashMap<>());
messageAnnotations.getValue().put(Symbol.valueOf("test-ma"), "test-ma");
final ByteBuf nettyBuffer = Unpooled.buffer(1500);
WritableBuffer buffer = new NettyWritable(nettyBuffer);
final MessageReference reference = Mockito.mock(MessageReference.class);
try {
encoder.setByteBuffer(buffer);
encoder.writeObject(deliveryAnnotations);
encoder.writeObject(header);
encoder.writeObject(messageAnnotations);
} finally {
encoder.setByteBuffer((WritableBuffer) null);
}
final byte[] bytes = new byte[nettyBuffer.writerIndex()];
nettyBuffer.readBytes(bytes);
final AMQPMessage message = new AMQPStandardMessage(0, bytes, null);
final ReadableBuffer encoded = message.getSendBuffer(0, reference);
final Message protonMessage = Proton.message();
protonMessage.decode(encoded);
final Header readHeader = protonMessage.getHeader();
final DeliveryAnnotations readDeliveryAnnotations = protonMessage.getDeliveryAnnotations();
final MessageAnnotations readMessageAnnotations = protonMessage.getMessageAnnotations();
assertTrue(readHeader.getDurable());
assertNull(readDeliveryAnnotations);
assertNotNull(readMessageAnnotations);
assertEquals("test-ma", readMessageAnnotations.getValue().get(Symbol.valueOf("test-ma")));
}
@Test
public void testEncodedAMQPMessageHasReversedHeaderAndDAWithOutgoingDeliveryAnnotations() throws Exception {
final Header header = new Header();
header.setDurable(true);
final DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
deliveryAnnotations.getValue().put(Symbol.valueOf("test-da"), "test-da");
final MessageAnnotations messageAnnotations = new MessageAnnotations(new HashMap<>());
messageAnnotations.getValue().put(Symbol.valueOf("test-ma"), "test-ma");
final ByteBuf nettyBuffer = Unpooled.buffer(1500);
WritableBuffer buffer = new NettyWritable(nettyBuffer);
final MessageReference reference = Mockito.mock(MessageReference.class);
final DeliveryAnnotations deliveryAnnotationsOut = new DeliveryAnnotations(new HashMap<>());
deliveryAnnotationsOut.getValue().put(Symbol.valueOf("new-da"), "new-da");
Mockito.when(reference.getProtocolData(DeliveryAnnotations.class)).thenReturn(deliveryAnnotationsOut);
try {
encoder.setByteBuffer(buffer);
encoder.writeObject(deliveryAnnotations);
encoder.writeObject(header);
encoder.writeObject(messageAnnotations);
} finally {
encoder.setByteBuffer((WritableBuffer) null);
}
final byte[] bytes = new byte[nettyBuffer.writerIndex()];
nettyBuffer.readBytes(bytes);
final AMQPMessage message = new AMQPStandardMessage(0, bytes, null);
final ReadableBuffer encoded = message.getSendBuffer(0, reference);
final Message protonMessage = Proton.message();
protonMessage.decode(encoded);
final Header readHeader = protonMessage.getHeader();
final DeliveryAnnotations readDeliveryAnnotations = protonMessage.getDeliveryAnnotations();
final MessageAnnotations readMessageAnnotations = protonMessage.getMessageAnnotations();
assertTrue(readHeader.getDurable());
assertNotNull(readDeliveryAnnotations);
assertEquals("new-da", readDeliveryAnnotations.getValue().get(Symbol.valueOf("new-da")));
assertNotNull(readMessageAnnotations);
assertEquals("test-ma", readMessageAnnotations.getValue().get(Symbol.valueOf("test-ma")));
}
//----- Test Support ------------------------------------------------------//
private MessageImpl createProtonMessage() {
MessageImpl message = (MessageImpl) Proton.message();
Header header = new Header();
header.setDurable(true);
header.setPriority(UnsignedByte.valueOf((byte) 9));
Properties properties = new Properties();
properties.setCreationTime(new Date(System.currentTimeMillis()));
properties.setTo(TEST_TO_ADDRESS);
properties.setMessageId(UUID.randomUUID());
MessageAnnotations annotations = new MessageAnnotations(new LinkedHashMap<>());
annotations.getValue().put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY), TEST_MESSAGE_ANNOTATION_VALUE);
ApplicationProperties applicationProperties = new ApplicationProperties(new LinkedHashMap<>());
applicationProperties.getValue().put(TEST_APPLICATION_PROPERTY_KEY, TEST_APPLICATION_PROPERTY_VALUE);
AmqpValue body = new AmqpValue(TEST_STRING_BODY);
message.setHeader(header);
message.setMessageAnnotations(annotations);
message.setProperties(properties);
message.setApplicationProperties(applicationProperties);
message.setBody(body);
return message;
}
private void assertProtonMessageEquals(MessageImpl left, MessageImpl right) {
if (!isEquals(left, right)) {
fail("MessageImpl values should be equal: left{" + left + "} right{" + right + "}");
}
}
private void assertProtonMessageNotEquals(MessageImpl left, MessageImpl right) {
if (isEquals(left, right)) {
fail("MessageImpl values should be equal: left{" + left + "} right{" + right + "}");
}
}
private boolean isEquals(MessageImpl left, MessageImpl right) {
if (left == null && right == null) {
return true;
}
if (!isNullnessEquals(left, right)) {
return false;
}
try {
assertHeaderEquals(left.getHeader(), right.getHeader());
assertDeliveryAnnotationsEquals(left.getDeliveryAnnotations(), right.getDeliveryAnnotations());
assertMessageAnnotationsEquals(left.getMessageAnnotations(), right.getMessageAnnotations());
assertPropertiesEquals(left.getProperties(), right.getProperties());
assertApplicationPropertiesEquals(left.getApplicationProperties(), right.getApplicationProperties());
assertTrue(isEquals(left.getBody(), right.getBody()));
assertFootersEquals(left.getFooter(), right.getFooter());
} catch (Throwable e) {
return false;
}
return true;
}
private void assertHeaderEquals(Header left, Header right) {
if (!isEquals(left, right)) {
fail("Header values should be equal: left{" + left + "} right{" + right + "}");
}
}
private void assertHeaderNotEquals(Header left, Header right) {
if (isEquals(left, right)) {
fail("Header values should not be equal: left{" + left + "} right{" + right + "}");
}
}
private boolean isEquals(Header left, Header right) {
if (left == null && right == null) {
return true;
}
if (!isNullnessEquals(left, right)) {
return false;
}
try {
assertEquals(left.getDurable(), right.getDurable());
assertEquals(left.getDeliveryCount(), right.getDeliveryCount());
assertEquals(left.getFirstAcquirer(), right.getFirstAcquirer());
assertEquals(left.getPriority(), right.getPriority());
assertEquals(left.getTtl(), right.getTtl());
} catch (Throwable e) {
return false;
}
return true;
}
private void assertPropertiesEquals(Properties left, Properties right) {
if (!isEquals(left, right)) {
fail("Properties values should be equal: left{" + left + "} right{" + right + "}");
}
}
private void assertPropertiesNotEquals(Properties left, Properties right) {
if (isEquals(left, right)) {
fail("Properties values should not be equal: left{" + left + "} right{" + right + "}");
}
}
private boolean isEquals(Properties left, Properties right) {
if (left == null && right == null) {
return true;
}
if (!isNullnessEquals(left, right)) {
return false;
}
try {
assertEquals(left.getAbsoluteExpiryTime(), right.getAbsoluteExpiryTime());
assertEquals(left.getContentEncoding(), right.getAbsoluteExpiryTime());
assertEquals(left.getContentType(), right.getContentType());
assertEquals(left.getCorrelationId(), right.getCorrelationId());
assertEquals(left.getCreationTime(), right.getCreationTime());
assertEquals(left.getGroupId(), right.getGroupId());
assertEquals(left.getGroupSequence(), right.getGroupSequence());
assertEquals(left.getMessageId(), right.getMessageId());
assertEquals(left.getReplyTo(), right.getReplyTo());
assertEquals(left.getReplyToGroupId(), right.getReplyToGroupId());
assertEquals(left.getSubject(), right.getSubject());
assertEquals(left.getUserId(), right.getUserId());
assertEquals(left.getTo(), right.getTo());
} catch (Throwable e) {
return false;
}
return true;
}
private void assertMessageAnnotationsEquals(MessageAnnotations left, MessageAnnotations right) {
if (!isEquals(left, right)) {
fail("MessageAnnotations values should be equal: left{" + left + "} right{" + right + "}");
}
}
private void assertMessageAnnotationsNotEquals(MessageAnnotations left, MessageAnnotations right) {
if (isEquals(left, right)) {
fail("MessageAnnotations values should not be equal: left{" + left + "} right{" + right + "}");
}
}
private boolean isEquals(MessageAnnotations left, MessageAnnotations right) {
if (left == null && right == null) {
return true;
}
if (!isNullnessEquals(left, right)) {
return false;
}
return isEquals(left.getValue(), right.getValue());
}
private void assertDeliveryAnnotationsEquals(DeliveryAnnotations left, DeliveryAnnotations right) {
if (!isEquals(left, right)) {
fail("DeliveryAnnotations values should be equal: left{" + left + "} right{" + right + "}");
}
}
private void assertDeliveryAnnotationsNotEquals(DeliveryAnnotations left, DeliveryAnnotations right) {
if (isEquals(left, right)) {
fail("DeliveryAnnotations values should not be equal: left{" + left + "} right{" + right + "}");
}
}
private boolean isEquals(DeliveryAnnotations left, DeliveryAnnotations right) {
if (left == null && right == null) {
return true;
}
if (!isNullnessEquals(left, right)) {
return false;
}
return isEquals(left.getValue(), right.getValue());
}
private void assertApplicationPropertiesEquals(ApplicationProperties left, ApplicationProperties right) {
if (!isEquals(left, right)) {
fail("ApplicationProperties values should be equal: left{" + left + "} right{" + right + "}");
}
}
private void assertApplicationPropertiesNotEquals(ApplicationProperties left, ApplicationProperties right) {
if (isEquals(left, right)) {
fail("ApplicationProperties values should not be equal: left{" + left + "} right{" + right + "}");
}
}
private boolean isEquals(ApplicationProperties left, ApplicationProperties right) {
if (left == null && right == null) {
return true;
}
if (!isNullnessEquals(left, right)) {
return false;
}
return isEquals(left.getValue(), right.getValue());
}
private void assertFootersEquals(Footer left, Footer right) {
if (!isEquals(left, right)) {
fail("Footer values should be equal: left{" + left + "} right{" + right + "}");
}
}
private void assertFootersNotEquals(Footer left, Footer right) {
if (isEquals(left, right)) {
fail("Footer values should not be equal: left{" + left + "} right{" + right + "}");
}
}
private boolean isEquals(Footer left, Footer right) {
if (left == null && right == null) {
return true;
}
if (!isNullnessEquals(left, right)) {
return false;
}
return isEquals(left.getValue(), right.getValue());
}
private boolean isEquals(Map<?, ?> left, Map<?, ?> right) {
if (left == null && right == null) {
return true;
}
if (!isNullnessEquals(left, right)) {
return false;
}
if (left.size() != right.size()) {
return false;
}
for (Object leftKey : left.keySet()) {
assertEquals(right.get(leftKey), left.get(leftKey));
}
for (Object rightKey : right.keySet()) {
assertEquals(left.get(rightKey), right.get(rightKey));
}
return true;
}
@SuppressWarnings("unchecked")
private boolean isEquals(Section left, Section right) {
if (left == null && right == null) {
return true;
}
if (!isNullnessEquals(left, right)) {
return false;
}
assertTrue(left.getClass().equals(right.getClass()));
if (left instanceof AmqpValue) {
AmqpValue leftValue = (AmqpValue) left;
AmqpValue rightValue = (AmqpValue) right;
if (leftValue.getValue() == null && rightValue.getValue() == null) {
return true;
}
if (!isNullnessEquals(leftValue.getValue(), rightValue.getValue())) {
return false;
}
assertEquals(leftValue.getValue(), rightValue.getValue());
} else if (left instanceof AmqpSequence) {
AmqpSequence leftValue = (AmqpSequence) left;
AmqpSequence rightValue = (AmqpSequence) right;
if (leftValue.getValue() == null && rightValue.getValue() == null) {
return true;
}
if (!isNullnessEquals(leftValue.getValue(), rightValue.getValue())) {
return false;
}
List<Object> leftList = leftValue.getValue();
List<Object> rightList = leftValue.getValue();
assertEquals(leftList.size(), rightList.size());
for (int i = 0; i < leftList.size(); ++i) {
assertEquals(leftList.get(i), rightList.get(i));
}
} else if (left instanceof Data) {
Data leftValue = (Data) left;
Data rightValue = (Data) right;
if (leftValue.getValue() == null && rightValue.getValue() == null) {
return true;
}
if (!isNullnessEquals(leftValue.getValue(), rightValue.getValue())) {
return false;
}
byte[] leftArray = leftValue.getValue().getArray();
byte[] rightArray = rightValue.getValue().getArray();
if (leftArray == null && rightArray == null) {
return true;
}
if (!isNullnessEquals(leftArray, rightArray)) {
return false;
}
assertArrayEquals(leftArray, rightArray);
} else {
return false;
}
return true;
}
private boolean isNullnessEquals(Object left, Object right) {
if (left == null && right != null) {
return false;
}
if (left != null && right == null) {
return false;
}
return true;
}
private ActiveMQBuffer encodeMessageAsPersistedBuffer(MessageImpl message) {
ByteBuf nettyBuffer = Unpooled.buffer(1500);
message.encode(new NettyWritable(nettyBuffer));
byte[] bytes = new byte[nettyBuffer.writerIndex() + Integer.BYTES];
nettyBuffer.readBytes(bytes, Integer.BYTES, nettyBuffer.readableBytes());
ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(bytes);
buffer.writerIndex(0);
buffer.writeInt(bytes.length - Integer.BYTES);
buffer.setIndex(0, bytes.length);
return buffer;
}
private byte[] encodeMessage(MessageImpl message) {
ByteBuf nettyBuffer = Unpooled.buffer(1500);
message.encode(new NettyWritable(nettyBuffer));
byte[] bytes = new byte[nettyBuffer.writerIndex()];
nettyBuffer.readBytes(bytes);
return bytes;
}
private AMQPStandardMessage encodeAndDecodeMessage(Message message) {
return encodeAndDecodeMessage(message, null);
}
private AMQPStandardMessage encodeAndDecodeMessage(Message message, TypedProperties extraProperties) {
ByteBuf nettyBuffer = Unpooled.buffer(1500);
message.encode(new NettyWritable(nettyBuffer));
byte[] bytes = new byte[nettyBuffer.writerIndex()];
nettyBuffer.readBytes(bytes);
return new AMQPStandardMessage(0, bytes, extraProperties);
}
}