| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.activemq.artemis.protocol.amqp.converter; |
| |
| import static org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP; |
| import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_DATA; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_SEQUENCE; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_BINARY; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_LIST; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_MAP; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_NULL; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_VALUE_STRING; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createBytesMessage; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMapMessage; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMessage; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createObjectMessage; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createStreamMessage; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createTextMessage; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.getCharsetForTextualContent; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.isContentType; |
| |
| import java.nio.ByteBuffer; |
| import java.nio.CharBuffer; |
| import java.nio.charset.CharacterCodingException; |
| import java.nio.charset.Charset; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| |
| import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; |
| import org.apache.activemq.artemis.api.core.ICoreMessage; |
| import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; |
| import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; |
| import org.apache.activemq.artemis.protocol.amqp.converter.coreWrapper.ConversionException; |
| import org.apache.activemq.artemis.protocol.amqp.converter.coreWrapper.CoreMessageWrapper; |
| import org.apache.activemq.artemis.protocol.amqp.converter.coreWrapper.CoreStreamMessageWrapper; |
| import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; |
| import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; |
| import org.apache.activemq.artemis.utils.UUIDGenerator; |
| import org.apache.activemq.artemis.utils.collections.TypedProperties; |
| import org.apache.qpid.proton.amqp.Binary; |
| import org.apache.qpid.proton.amqp.Decimal128; |
| import org.apache.qpid.proton.amqp.Decimal32; |
| import org.apache.qpid.proton.amqp.Decimal64; |
| import org.apache.qpid.proton.amqp.Symbol; |
| import org.apache.qpid.proton.amqp.UnsignedByte; |
| import org.apache.qpid.proton.amqp.UnsignedInteger; |
| import org.apache.qpid.proton.amqp.UnsignedLong; |
| import org.apache.qpid.proton.amqp.UnsignedShort; |
| import org.apache.qpid.proton.amqp.messaging.AmqpSequence; |
| import org.apache.qpid.proton.amqp.messaging.AmqpValue; |
| import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; |
| import org.apache.qpid.proton.amqp.messaging.Data; |
| import org.apache.qpid.proton.amqp.messaging.Footer; |
| import org.apache.qpid.proton.amqp.messaging.Header; |
| import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; |
| import org.apache.qpid.proton.amqp.messaging.Properties; |
| import org.apache.qpid.proton.amqp.messaging.Section; |
| import org.apache.qpid.proton.codec.EncoderImpl; |
| import org.apache.qpid.proton.codec.WritableBuffer; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.PooledByteBufAllocator; |
| |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.NON_PERSISTENT; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.PERSISTENT; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.MESSAGE_DEFAULT_PRIORITY; |
| import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.MESSAGE_DEFAULT_TIME_TO_LIVE; |
| import static org.apache.activemq.artemis.utils.DestinationUtil.QUEUE_QUALIFIED_PREFIX; |
| import static org.apache.activemq.artemis.utils.DestinationUtil.TEMP_QUEUE_QUALIFED_PREFIX; |
| import static org.apache.activemq.artemis.utils.DestinationUtil.TEMP_TOPIC_QUALIFED_PREFIX; |
| import static org.apache.activemq.artemis.utils.DestinationUtil.TOPIC_QUALIFIED_PREFIX; |
| |
| /** |
| * This class was created just to separate concerns on AMQPConverter. |
| * For better organization of the code. |
| * */ |
| public class AmqpCoreConverter { |
| public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools) throws Exception { |
| return message.toCore(coreMessageObjectPools); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools, Header header, MessageAnnotations annotations, Properties properties, ApplicationProperties applicationProperties, Section body, Footer footer) throws ConversionException { |
| final long messageId = message.getMessageID(); |
| final Symbol contentType = properties != null ? properties.getContentType() : null; |
| final String contentTypeString = contentType != null ? contentType.toString() : null; |
| |
| CoreMessageWrapper result; |
| |
| if (body == null) { |
| if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) { |
| result = createObjectMessage(messageId, coreMessageObjectPools); |
| } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType) || isContentType(null, contentType)) { |
| result = createBytesMessage(messageId, coreMessageObjectPools); |
| } else { |
| Charset charset = getCharsetForTextualContent(contentTypeString); |
| if (charset != null) { |
| result = createTextMessage(messageId, coreMessageObjectPools); |
| } else { |
| result = createMessage(messageId, coreMessageObjectPools); |
| } |
| } |
| |
| result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL); |
| } else if (body instanceof Data) { |
| Binary payload = ((Data) body).getValue(); |
| |
| if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) { |
| result = createObjectMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools); |
| } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType)) { |
| result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools); |
| } else { |
| Charset charset = getCharsetForTextualContent(contentTypeString); |
| if (StandardCharsets.UTF_8.equals(charset)) { |
| ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength()); |
| |
| try { |
| CharBuffer chars = charset.newDecoder().decode(buf); |
| result = createTextMessage(messageId, String.valueOf(chars), coreMessageObjectPools); |
| } catch (CharacterCodingException e) { |
| result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools); |
| } |
| } else { |
| result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools); |
| } |
| } |
| |
| result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA); |
| } else if (body instanceof AmqpSequence) { |
| AmqpSequence sequence = (AmqpSequence) body; |
| CoreStreamMessageWrapper m = createStreamMessage(messageId, coreMessageObjectPools); |
| for (Object item : sequence.getValue()) { |
| m.writeObject(item); |
| } |
| |
| result = m; |
| result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE); |
| } else if (body instanceof AmqpValue) { |
| Object value = ((AmqpValue) body).getValue(); |
| if (value == null || value instanceof String) { |
| result = createTextMessage(messageId, (String) value, coreMessageObjectPools); |
| |
| result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING); |
| } else if (value instanceof Binary) { |
| Binary payload = (Binary) value; |
| |
| if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) { |
| result = createObjectMessage(messageId, payload, coreMessageObjectPools); |
| } else { |
| result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools); |
| } |
| |
| result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); |
| } else if (value instanceof List) { |
| CoreStreamMessageWrapper m = createStreamMessage(messageId, coreMessageObjectPools); |
| for (Object item : (List<Object>) value) { |
| m.writeObject(item); |
| } |
| result = m; |
| result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST); |
| } else if (value instanceof Map) { |
| result = createMapMessage(messageId, (Map<String, Object>) value, coreMessageObjectPools); |
| result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP); |
| } else { |
| ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); |
| try { |
| TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf)); |
| TLSEncode.getEncoder().writeObject(body); |
| result = createBytesMessage(messageId, buf.array(), 0, buf.writerIndex(), coreMessageObjectPools); |
| } finally { |
| buf.release(); |
| TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null); |
| } |
| } |
| } else { |
| throw new RuntimeException("Unexpected body type: " + body.getClass()); |
| } |
| |
| // Initialize the JMS expiration with the value calculated during the scan of the AMQP message |
| // to avoid a different value for each conversion based on System.currentTimeMillis() and ttl. |
| result.setJMSExpiration(message.getExpiration()); |
| |
| processHeader(result, header); |
| processMessageAnnotations(result, annotations); |
| processApplicationProperties(result, applicationProperties); |
| processProperties(result, properties, annotations); |
| processFooter(result, footer); |
| processExtraProperties(result, message.getExtraProperties()); |
| |
| // If the JMS expiration has not yet been set... |
| if (header != null && result.getExpiration() == 0) { |
| // Then lets try to set it based on the message TTL. |
| long ttl = MESSAGE_DEFAULT_TIME_TO_LIVE; |
| if (header.getTtl() != null) { |
| ttl = header.getTtl().longValue(); |
| } |
| |
| if (ttl == 0) { |
| result.setJMSExpiration(0); |
| } else { |
| result.setJMSExpiration(System.currentTimeMillis() + ttl); |
| } |
| } |
| |
| result.getInnerMessage().setDurable(message.isDurable()); |
| result.getInnerMessage().setPriority(message.getPriority()); |
| result.getInnerMessage().setAddress(message.getAddressSimpleString()); |
| result.getInnerMessage().setRoutingType(message.getRoutingType()); |
| result.encode(); |
| |
| return result.getInnerMessage(); |
| } |
| |
| protected static CoreMessageWrapper processHeader(CoreMessageWrapper jms, Header header) throws ConversionException { |
| if (header != null) { |
| jms.setBooleanProperty(JMS_AMQP_HEADER, true); |
| |
| if (header.getDurable() != null) { |
| jms.setBooleanProperty(JMS_AMQP_HEADER_DURABLE, true); |
| jms.setDeliveryMode(header.getDurable().booleanValue() ? PERSISTENT : NON_PERSISTENT); |
| } else { |
| jms.setDeliveryMode(NON_PERSISTENT); |
| } |
| |
| if (header.getPriority() != null) { |
| jms.setBooleanProperty(JMS_AMQP_HEADER_PRIORITY, true); |
| jms.setJMSPriority(header.getPriority().intValue()); |
| } else { |
| jms.setJMSPriority(MESSAGE_DEFAULT_PRIORITY); |
| } |
| |
| if (header.getFirstAcquirer() != null) { |
| jms.setBooleanProperty(JMS_AMQP_FIRST_ACQUIRER, header.getFirstAcquirer()); |
| } |
| |
| if (header.getDeliveryCount() != null) { |
| // AMQP Delivery Count counts only failed delivers where JMS |
| // Delivery Count should include the original delivery in the count. |
| jms.setLongProperty("JMSXDeliveryCount", header.getDeliveryCount().longValue() + 1); |
| } |
| } else { |
| jms.setJMSPriority((byte) MESSAGE_DEFAULT_PRIORITY); |
| jms.setDeliveryMode(NON_PERSISTENT); |
| } |
| |
| return jms; |
| } |
| |
| protected static CoreMessageWrapper processMessageAnnotations(CoreMessageWrapper jms, MessageAnnotations annotations) { |
| if (annotations != null && annotations.getValue() != null) { |
| for (Map.Entry<?, ?> entry : annotations.getValue().entrySet()) { |
| String key = entry.getKey().toString(); |
| if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) { |
| long deliveryTime = ((Number) entry.getValue()).longValue(); |
| jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), deliveryTime); |
| } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) { |
| long delay = ((Number) entry.getValue()).longValue(); |
| if (delay > 0) { |
| jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay); |
| } |
| } else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && entry.getValue() != null) { |
| jms.setLongProperty(HDR_INGRESS_TIMESTAMP.toString(), ((Number) entry.getValue()).longValue()); |
| } |
| |
| try { |
| setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue()); |
| } catch (ActiveMQPropertyConversionException e) { |
| encodeUnsupportedMessagePropertyType(jms, JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue()); |
| } |
| } |
| } |
| |
| return jms; |
| } |
| |
| private static CoreMessageWrapper processApplicationProperties(CoreMessageWrapper jms, ApplicationProperties properties) { |
| if (properties != null && properties.getValue() != null) { |
| for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>) properties.getValue().entrySet()) { |
| setProperty(jms, entry.getKey(), entry.getValue()); |
| } |
| } |
| |
| return jms; |
| } |
| |
| private static CoreMessageWrapper processExtraProperties(CoreMessageWrapper jms, TypedProperties properties) { |
| if (properties != null) { |
| properties.forEach((k, v) -> { |
| if (!k.equals(AMQPMessage.ADDRESS_PROPERTY)) { |
| jms.getInnerMessage().putObjectProperty(k, v); |
| } |
| }); |
| } |
| |
| return jms; |
| } |
| |
| private static CoreMessageWrapper processProperties(CoreMessageWrapper jms, Properties properties, MessageAnnotations annotations) { |
| if (properties != null) { |
| if (properties.getMessageId() != null) { |
| jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId())); |
| //core jms clients get JMSMessageID from UserID which is a UUID object |
| if (properties.getMessageId() instanceof UUID) { |
| //AMQP's message ID can be a UUID, keep it |
| jms.getInnerMessage().setUserID(UUIDGenerator.getInstance().fromJavaUUID((UUID) properties.getMessageId())); |
| } else { |
| jms.getInnerMessage().setUserID(UUIDGenerator.getInstance().generateUUID()); |
| } |
| } |
| |
| Binary userId = properties.getUserId(); |
| if (userId != null) { |
| jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8)); |
| } |
| if (properties.getTo() != null) { |
| byte queueType = parseQueueAnnotation(annotations, AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION); |
| jms.setDestination(properties.getTo()); |
| } |
| if (properties.getSubject() != null) { |
| jms.setJMSType(properties.getSubject()); |
| } |
| if (properties.getReplyTo() != null) { |
| byte value = parseQueueAnnotation(annotations, AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION); |
| |
| switch (value) { |
| case AMQPMessageSupport.QUEUE_TYPE: |
| org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), QUEUE_QUALIFIED_PREFIX + properties.getReplyTo()); |
| break; |
| case AMQPMessageSupport.TEMP_QUEUE_TYPE: |
| org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), TEMP_QUEUE_QUALIFED_PREFIX + properties.getReplyTo()); |
| break; |
| case AMQPMessageSupport.TOPIC_TYPE: |
| org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), TOPIC_QUALIFIED_PREFIX + properties.getReplyTo()); |
| break; |
| case AMQPMessageSupport.TEMP_TOPIC_TYPE: |
| org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), TEMP_TOPIC_QUALIFED_PREFIX + properties.getReplyTo()); |
| break; |
| default: |
| org.apache.activemq.artemis.reader.MessageUtil.setJMSReplyTo(jms.getInnerMessage(), QUEUE_QUALIFIED_PREFIX + properties.getReplyTo()); |
| break; |
| } |
| } |
| Object correlationID = properties.getCorrelationId(); |
| |
| if (correlationID != null) { |
| try { |
| jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(correlationID)); |
| } catch (IllegalArgumentException e) { |
| jms.getInnerMessage().setCorrelationID(String.valueOf(correlationID)); |
| } |
| } |
| if (properties.getContentType() != null) { |
| jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, properties.getContentType().toString()); |
| } |
| if (properties.getContentEncoding() != null) { |
| jms.setStringProperty(JMS_AMQP_CONTENT_ENCODING, properties.getContentEncoding().toString()); |
| } |
| if (properties.getCreationTime() != null) { |
| jms.setJMSTimestamp(properties.getCreationTime().getTime()); |
| } |
| if (properties.getGroupId() != null) { |
| jms.setStringProperty("_AMQ_GROUP_ID", properties.getGroupId()); |
| } |
| if (properties.getGroupSequence() != null) { |
| jms.setIntProperty("JMSXGroupSeq", properties.getGroupSequence().intValue()); |
| } |
| if (properties.getReplyToGroupId() != null) { |
| jms.setStringProperty(JMS_AMQP_REPLYTO_GROUP_ID, properties.getReplyToGroupId()); |
| } |
| if (properties.getAbsoluteExpiryTime() != null) { |
| jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime()); |
| } |
| } |
| return jms; |
| } |
| |
| private static byte parseQueueAnnotation(MessageAnnotations annotations, Symbol symbol) { |
| Object value = (annotations != null && annotations.getValue() != null ? annotations.getValue().get(symbol) : AMQPMessageSupport.QUEUE_TYPE); |
| |
| byte queueType; |
| if (value == null || !(value instanceof Number)) { |
| queueType = AMQPMessageSupport.QUEUE_TYPE; |
| } else { |
| queueType = ((Number)value).byteValue(); |
| } |
| return queueType; |
| } |
| |
| @SuppressWarnings("unchecked") |
| private static CoreMessageWrapper processFooter(CoreMessageWrapper jms, Footer footer) { |
| if (footer != null && footer.getValue() != null) { |
| for (Map.Entry<Symbol, Object> entry : (Set<Map.Entry<Symbol, Object>>) footer.getValue().entrySet()) { |
| String key = entry.getKey().toString(); |
| try { |
| setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue()); |
| } catch (ActiveMQPropertyConversionException e) { |
| encodeUnsupportedMessagePropertyType(jms, JMS_AMQP_ENCODED_FOOTER_PREFIX + key, entry.getValue()); |
| } |
| } |
| } |
| |
| return jms; |
| } |
| |
| private static void encodeUnsupportedMessagePropertyType(CoreMessageWrapper jms, String key, Object value) { |
| final ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(); |
| final EncoderImpl encoder = TLSEncode.getEncoder(); |
| |
| try { |
| encoder.setByteBuffer(new NettyWritable(buffer)); |
| encoder.writeObject(value); |
| |
| final byte[] encodedBytes = new byte[buffer.writerIndex()]; |
| |
| buffer.readBytes(encodedBytes); |
| |
| setProperty(jms, key, encodedBytes); |
| } finally { |
| encoder.setByteBuffer((WritableBuffer) null); |
| buffer.release(); |
| } |
| } |
| |
| private static void setProperty(CoreMessageWrapper msg, String key, Object value) { |
| if (value instanceof UnsignedLong) { |
| long v = ((UnsignedLong) value).longValue(); |
| msg.setLongProperty(key, v); |
| } else if (value instanceof UnsignedInteger) { |
| long v = ((UnsignedInteger) value).longValue(); |
| if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) { |
| msg.setIntProperty(key, (int) v); |
| } else { |
| msg.setLongProperty(key, v); |
| } |
| } else if (value instanceof UnsignedShort) { |
| int v = ((UnsignedShort) value).intValue(); |
| if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) { |
| msg.setShortProperty(key, (short) v); |
| } else { |
| msg.setIntProperty(key, v); |
| } |
| } else if (value instanceof UnsignedByte) { |
| short v = ((UnsignedByte) value).shortValue(); |
| if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) { |
| msg.setByteProperty(key, (byte) v); |
| } else { |
| msg.setShortProperty(key, v); |
| } |
| } else if (value instanceof Symbol) { |
| msg.setStringProperty(key, value.toString()); |
| } else if (value instanceof Decimal128) { |
| msg.setDoubleProperty(key, ((Decimal128) value).doubleValue()); |
| } else if (value instanceof Decimal64) { |
| msg.setDoubleProperty(key, ((Decimal64) value).doubleValue()); |
| } else if (value instanceof Decimal32) { |
| msg.setFloatProperty(key, ((Decimal32) value).floatValue()); |
| } else if (value instanceof Binary) { |
| Binary bin = (Binary) value; |
| msg.setObjectProperty(key, Arrays.copyOfRange(bin.getArray(), bin.getArrayOffset(), bin.getLength())); |
| } else { |
| msg.setObjectProperty(key, value); |
| } |
| } |
| } |