blob: 3f14b308ecf86d6ac11b54325128cbbca2150590 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.converter;
import static org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP;
import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_DATA;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_SEQUENCE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_BINARY;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_LIST;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_MAP;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_NULL;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_STRING;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createBytesMessage;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMapMessage;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMessage;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createObjectMessage;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createStreamMessage;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createTextMessage;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.getCharsetForTextualContent;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.isContentType;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.coreWrapper.ConversionException;
import org.apache.activemq.artemis.protocol.amqp.converter.coreWrapper.CoreMessageWrapper;
import org.apache.activemq.artemis.protocol.amqp.converter.coreWrapper.CoreStreamMessageWrapper;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Decimal128;
import org.apache.qpid.proton.amqp.Decimal32;
import org.apache.qpid.proton.amqp.Decimal64;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.UnsignedShort;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Footer;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.WritableBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.NON_PERSISTENT;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.PERSISTENT;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.MESSAGE_DEFAULT_PRIORITY;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.MESSAGE_DEFAULT_TIME_TO_LIVE;
import static org.apache.activemq.artemis.utils.DestinationUtil.QUEUE_QUALIFIED_PREFIX;
import static org.apache.activemq.artemis.utils.DestinationUtil.TEMP_QUEUE_QUALIFED_PREFIX;
import static org.apache.activemq.artemis.utils.DestinationUtil.TEMP_TOPIC_QUALIFED_PREFIX;
import static org.apache.activemq.artemis.utils.DestinationUtil.TOPIC_QUALIFIED_PREFIX;
/**
* This class was created just to separate concerns on AMQPConverter.
* For better organization of the code.
* */
public class AmqpCoreConverter {
public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
return message.toCore(coreMessageObjectPools);
}
@SuppressWarnings("unchecked")
public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools, Header header, MessageAnnotations annotations, Properties properties, ApplicationProperties applicationProperties, Section body, Footer footer) throws ConversionException {
final long messageId = message.getMessageID();
final Symbol contentType = properties != null ? properties.getContentType() : null;
final String contentTypeString = contentType != null ? contentType.toString() : null;
CoreMessageWrapper result;
if (body == null) {
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
result = createObjectMessage(messageId, coreMessageObjectPools);
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType) || isContentType(null, contentType)) {
result = createBytesMessage(messageId, coreMessageObjectPools);
} else {
Charset charset = getCharsetForTextualContent(contentTypeString);
if (charset != null) {
result = createTextMessage(messageId, coreMessageObjectPools);
} else {
result = createMessage(messageId, coreMessageObjectPools);
}
}
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
} else if (body instanceof Data) {
Binary payload = ((Data) body).getValue();
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
result = createObjectMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType)) {
result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
} else {
Charset charset = getCharsetForTextualContent(contentTypeString);
if (StandardCharsets.UTF_8.equals(charset)) {
ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength());
try {
CharBuffer chars = charset.newDecoder().decode(buf);
result = createTextMessage(messageId, String.valueOf(chars), coreMessageObjectPools);
} catch (CharacterCodingException e) {
result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
}
} else {
result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
}
}
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
} else if (body instanceof AmqpSequence) {
AmqpSequence sequence = (AmqpSequence) body;
CoreStreamMessageWrapper m = createStreamMessage(messageId, coreMessageObjectPools);
for (Object item : sequence.getValue()) {
m.writeObject(item);
}
result = m;
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
} else if (body instanceof AmqpValue) {
Object value = ((AmqpValue) body).getValue();
if (value == null || value instanceof String) {
result = createTextMessage(messageId, (String) value, coreMessageObjectPools);
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
} else if (value instanceof Binary) {
Binary payload = (Binary) value;
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
result = createObjectMessage(messageId, payload, coreMessageObjectPools);
} else {
result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
}
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
} else if (value instanceof List) {
CoreStreamMessageWrapper m = createStreamMessage(messageId, coreMessageObjectPools);
for (Object item : (List<Object>) value) {
m.writeObject(item);
}
result = m;
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST);
} else if (value instanceof Map) {
result = createMapMessage(messageId, (Map<String, Object>) value, coreMessageObjectPools);
result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP);
} else {
ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
try {
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
TLSEncode.getEncoder().writeObject(body);
result = createBytesMessage(messageId, buf.array(), 0, buf.writerIndex(), coreMessageObjectPools);
} finally {
buf.release();
TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
}
}
} else {
throw new RuntimeException("Unexpected body type: " + body.getClass());
}
// Initialize the JMS expiration with the value calculated during the scan of the AMQP message
// to avoid a different value for each conversion based on System.currentTimeMillis() and ttl.
result.setJMSExpiration(message.getExpiration());
processHeader(result, header);
processMessageAnnotations(result, annotations);
processApplicationProperties(result, applicationProperties);
processProperties(result, properties, annotations);
processFooter(result, footer);
processExtraProperties(result, message.getExtraProperties());
// If the JMS expiration has not yet been set...
if (header != null && result.getExpiration() == 0) {
// Then lets try to set it based on the message TTL.
long ttl = MESSAGE_DEFAULT_TIME_TO_LIVE;
if (header.getTtl() != null) {
ttl = header.getTtl().longValue();
}
if (ttl == 0) {
result.setJMSExpiration(0);
} else {
result.setJMSExpiration(System.currentTimeMillis() + ttl);
}
}
result.getInnerMessage().setDurable(message.isDurable());
result.getInnerMessage().setPriority(message.getPriority());
result.getInnerMessage().setAddress(message.getAddressSimpleString());
result.getInnerMessage().setRoutingType(message.getRoutingType());
result.encode();
return result.getInnerMessage();
}
protected static CoreMessageWrapper processHeader(CoreMessageWrapper jms, Header header) throws ConversionException {
if (header != null) {
jms.setBooleanProperty(JMS_AMQP_HEADER, true);
if (header.getDurable() != null) {
jms.setBooleanProperty(JMS_AMQP_HEADER_DURABLE, true);
jms.setDeliveryMode(header.getDurable().booleanValue() ? PERSISTENT : NON_PERSISTENT);
} else {
jms.setDeliveryMode(NON_PERSISTENT);
}
if (header.getPriority() != null) {
jms.setBooleanProperty(JMS_AMQP_HEADER_PRIORITY, true);
jms.setJMSPriority(header.getPriority().intValue());
} else {
jms.setJMSPriority(MESSAGE_DEFAULT_PRIORITY);
}
if (header.getFirstAcquirer() != null) {
jms.setBooleanProperty(JMS_AMQP_FIRST_ACQUIRER, header.getFirstAcquirer());
}
if (header.getDeliveryCount() != null) {
// AMQP Delivery Count counts only failed delivers where JMS
// Delivery Count should include the original delivery in the count.
jms.setLongProperty("JMSXDeliveryCount", header.getDeliveryCount().longValue() + 1);
}
} else {
jms.setJMSPriority((byte) MESSAGE_DEFAULT_PRIORITY);
jms.setDeliveryMode(NON_PERSISTENT);
}
return jms;
}
protected static CoreMessageWrapper processMessageAnnotations(CoreMessageWrapper jms, MessageAnnotations annotations) {
if (annotations != null && annotations.getValue() != null) {
for (Map.Entry<?, ?> entry : annotations.getValue().entrySet()) {
String key = entry.getKey().toString();
if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
long deliveryTime = ((Number) entry.getValue()).longValue();
jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), deliveryTime);
} else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) {
long delay = ((Number) entry.getValue()).longValue();
if (delay > 0) {
jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay);
}
} else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && entry.getValue() != null) {
jms.setLongProperty(HDR_INGRESS_TIMESTAMP.toString(), ((Number) entry.getValue()).longValue());
}
try {
setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue());
} catch (ActiveMQPropertyConversionException e) {
encodeUnsupportedMessagePropertyType(jms, JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue());
}
}
}
return jms;
}
private static CoreMessageWrapper processApplicationProperties(CoreMessageWrapper jms, ApplicationProperties properties) {
if (properties != null && properties.getValue() != null) {
for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>) properties.getValue().entrySet()) {
setProperty(jms, entry.getKey(), entry.getValue());
}
}
return jms;
}
private static CoreMessageWrapper processExtraProperties(CoreMessageWrapper jms, TypedProperties properties) {
if (properties != null) {
properties.forEach((k, v) -> {
if (!k.equals(AMQPMessage.ADDRESS_PROPERTY)) {
jms.getInnerMessage().putObjectProperty(k, v);
}
});
}
return jms;
}
private static CoreMessageWrapper processProperties(CoreMessageWrapper jms, Properties properties, MessageAnnotations annotations) {
if (properties != null) {
if (properties.getMessageId() != null) {
jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId()));
//core jms clients get JMSMessageID from UserID which is a UUID object
if (properties.getMessageId() instanceof UUID) {
//AMQP's message ID can be a UUID, keep it
jms.getInnerMessage().setUserID(UUIDGenerator.getInstance().fromJavaUUID((UUID) properties.getMessageId()));
} else {
jms.getInnerMessage().setUserID(UUIDGenerator.getInstance().generateUUID());
}
}
Binary userId = properties.getUserId();
if (userId != null) {
jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8));
}
if (properties.getTo() != null) {
byte queueType = parseQueueAnnotation(annotations, AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION);
jms.setDestination(properties.getTo());
}
if (properties.getSubject() != null) {
jms.setJMSType(properties.getSubject());
}
if (properties.getReplyTo() != null) {
byte value = parseQueueAnnotation(annotations, AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION);
switch (value) {
case AMQPMessageSupport.QUEUE_TYPE:
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), QUEUE_QUALIFIED_PREFIX + properties.getReplyTo());
break;
case AMQPMessageSupport.TEMP_QUEUE_TYPE:
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), TEMP_QUEUE_QUALIFED_PREFIX + properties.getReplyTo());
break;
case AMQPMessageSupport.TOPIC_TYPE:
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), TOPIC_QUALIFIED_PREFIX + properties.getReplyTo());
break;
case AMQPMessageSupport.TEMP_TOPIC_TYPE:
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), TEMP_TOPIC_QUALIFED_PREFIX + properties.getReplyTo());
break;
default:
org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), QUEUE_QUALIFIED_PREFIX + properties.getReplyTo());
break;
}
}
Object correlationID = properties.getCorrelationId();
if (correlationID != null) {
try {
jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(correlationID));
} catch (IllegalArgumentException e) {
jms.getInnerMessage().setCorrelationID(String.valueOf(correlationID));
}
}
if (properties.getContentType() != null) {
jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, properties.getContentType().toString());
}
if (properties.getContentEncoding() != null) {
jms.setStringProperty(JMS_AMQP_CONTENT_ENCODING, properties.getContentEncoding().toString());
}
if (properties.getCreationTime() != null) {
jms.setJMSTimestamp(properties.getCreationTime().getTime());
}
if (properties.getGroupId() != null) {
jms.setStringProperty("_AMQ_GROUP_ID", properties.getGroupId());
}
if (properties.getGroupSequence() != null) {
jms.setIntProperty("JMSXGroupSeq", properties.getGroupSequence().intValue());
}
if (properties.getReplyToGroupId() != null) {
jms.setStringProperty(JMS_AMQP_REPLYTO_GROUP_ID, properties.getReplyToGroupId());
}
if (properties.getAbsoluteExpiryTime() != null) {
jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
}
}
return jms;
}
private static byte parseQueueAnnotation(MessageAnnotations annotations, Symbol symbol) {
Object value = (annotations != null && annotations.getValue() != null ? annotations.getValue().get(symbol) : AMQPMessageSupport.QUEUE_TYPE);
byte queueType;
if (value == null || !(value instanceof Number)) {
queueType = AMQPMessageSupport.QUEUE_TYPE;
} else {
queueType = ((Number)value).byteValue();
}
return queueType;
}
@SuppressWarnings("unchecked")
private static CoreMessageWrapper processFooter(CoreMessageWrapper jms, Footer footer) {
if (footer != null && footer.getValue() != null) {
for (Map.Entry<Symbol, Object> entry : (Set<Map.Entry<Symbol, Object>>) footer.getValue().entrySet()) {
String key = entry.getKey().toString();
try {
setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue());
} catch (ActiveMQPropertyConversionException e) {
encodeUnsupportedMessagePropertyType(jms, JMS_AMQP_ENCODED_FOOTER_PREFIX + key, entry.getValue());
}
}
}
return jms;
}
private static void encodeUnsupportedMessagePropertyType(CoreMessageWrapper jms, String key, Object value) {
final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer();
final EncoderImpl encoder = TLSEncode.getEncoder();
try {
encoder.setByteBuffer(new NettyWritable(buffer));
encoder.writeObject(value);
final byte[] encodedBytes = new byte[buffer.writerIndex()];
buffer.readBytes(encodedBytes);
setProperty(jms, key, encodedBytes);
} finally {
encoder.setByteBuffer((WritableBuffer) null);
buffer.release();
}
}
private static void setProperty(CoreMessageWrapper msg, String key, Object value) {
if (value instanceof UnsignedLong) {
long v = ((UnsignedLong) value).longValue();
msg.setLongProperty(key, v);
} else if (value instanceof UnsignedInteger) {
long v = ((UnsignedInteger) value).longValue();
if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) {
msg.setIntProperty(key, (int) v);
} else {
msg.setLongProperty(key, v);
}
} else if (value instanceof UnsignedShort) {
int v = ((UnsignedShort) value).intValue();
if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) {
msg.setShortProperty(key, (short) v);
} else {
msg.setIntProperty(key, v);
}
} else if (value instanceof UnsignedByte) {
short v = ((UnsignedByte) value).shortValue();
if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) {
msg.setByteProperty(key, (byte) v);
} else {
msg.setShortProperty(key, v);
}
} else if (value instanceof Symbol) {
msg.setStringProperty(key, value.toString());
} else if (value instanceof Decimal128) {
msg.setDoubleProperty(key, ((Decimal128) value).doubleValue());
} else if (value instanceof Decimal64) {
msg.setDoubleProperty(key, ((Decimal64) value).doubleValue());
} else if (value instanceof Decimal32) {
msg.setFloatProperty(key, ((Decimal32) value).floatValue());
} else if (value instanceof Binary) {
Binary bin = (Binary) value;
msg.setObjectProperty(key, Arrays.copyOfRange(bin.getArray(), bin.getArrayOffset(), bin.getLength()));
} else {
msg.setObjectProperty(key, value);
}
}
}