/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| package org.apache.activemq.transport.amqp.client; |
| |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| |
| import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy; |
| import org.apache.qpid.proton.Proton; |
| import org.apache.qpid.proton.amqp.Binary; |
| import org.apache.qpid.proton.amqp.DescribedType; |
| import org.apache.qpid.proton.amqp.Symbol; |
| import org.apache.qpid.proton.amqp.messaging.AmqpValue; |
| import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; |
| import org.apache.qpid.proton.amqp.messaging.Data; |
| import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; |
| import org.apache.qpid.proton.amqp.messaging.Header; |
| import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; |
| import org.apache.qpid.proton.amqp.messaging.Properties; |
| import org.apache.qpid.proton.amqp.messaging.Section; |
| import org.apache.qpid.proton.engine.Delivery; |
| import org.apache.qpid.proton.message.Message; |
| |
| public class AmqpMessage { |
| |
| private final AmqpReceiver receiver; |
| private final Message message; |
| private final Delivery delivery; |
| |
| private Map<Symbol, Object> deliveryAnnotationsMap; |
| private Map<Symbol, Object> messageAnnotationsMap; |
| private Map<String, Object> applicationPropertiesMap; |
| |
| /** |
| * Creates a new AmqpMessage that wraps the information necessary to handle |
| * an outgoing message. |
| */ |
| public AmqpMessage() { |
| receiver = null; |
| delivery = null; |
| |
| message = Proton.message(); |
| } |
| |
| /** |
| * Creates a new AmqpMessage that wraps the information necessary to handle |
| * an outgoing message. |
| * |
| * @param message the Proton message that is to be sent. |
| */ |
| public AmqpMessage(Message message) { |
| this(null, message, null); |
| } |
| |
| /** |
| * Creates a new AmqpMessage that wraps the information necessary to handle |
| * an incoming delivery. |
| * |
| * @param receiver the AmqpReceiver that received this message. |
| * @param message the Proton message that was received. |
| * @param delivery the Delivery instance that produced this message. |
| */ |
| @SuppressWarnings("unchecked") |
| public AmqpMessage(AmqpReceiver receiver, Message message, Delivery delivery) { |
| this.receiver = receiver; |
| this.message = message; |
| this.delivery = delivery; |
| |
| if (message.getMessageAnnotations() != null) { |
| messageAnnotationsMap = message.getMessageAnnotations().getValue(); |
| } |
| |
| if (message.getApplicationProperties() != null) { |
| applicationPropertiesMap = message.getApplicationProperties().getValue(); |
| } |
| |
| if (message.getDeliveryAnnotations() != null) { |
| deliveryAnnotationsMap = message.getDeliveryAnnotations().getValue(); |
| } |
| } |
| |
| //----- Access to interal client resources -------------------------------// |
| |
| /** |
| * @return the AMQP Delivery object linked to a received message. |
| */ |
| public Delivery getWrappedDelivery() { |
| if (delivery != null) { |
| return UnmodifiableProxy.deliveryProxy(delivery); |
| } |
| |
| return null; |
| } |
| |
| /** |
| * @return the AMQP Message that is wrapped by this object. |
| */ |
| public Message getWrappedMessage() { |
| return message; |
| } |
| |
| /** |
| * @return the AmqpReceiver that consumed this message. |
| */ |
| public AmqpReceiver getAmqpReceiver() { |
| return receiver; |
| } |
| |
| //----- Message disposition control --------------------------------------// |
| |
| /** |
| * Accepts the message marking it as consumed on the remote peer. |
| * |
| * @throws Exception if an error occurs during the accept. |
| */ |
| public void accept() throws Exception { |
| accept(true); |
| } |
| |
| /** |
| * Accepts the message marking it as consumed on the remote peer. |
| * |
| * @param settle |
| * true if the client should also settle the delivery when sending the accept. |
| * |
| * @throws Exception if an error occurs during the accept. |
| */ |
| public void accept(boolean settle) throws Exception { |
| if (receiver == null) { |
| throw new IllegalStateException("Can't accept non-received message."); |
| } |
| |
| receiver.accept(delivery, settle); |
| } |
| |
| /** |
| * Accepts the message marking it as consumed on the remote peer. |
| * |
| * @param txnSession The session that is used to manage acceptance of the message. |
| * @throws Exception if an error occurs during the accept. |
| */ |
| public void accept(AmqpSession txnSession) throws Exception { |
| accept(txnSession, true); |
| } |
| |
| /** |
| * Accepts the message marking it as consumed on the remote peer. |
| * |
| * @param txnSession |
| * The session that is used to manage acceptance of the message. |
| * |
| * @throws Exception if an error occurs during the accept. |
| */ |
| public void accept(AmqpSession txnSession, boolean settle) throws Exception { |
| if (receiver == null) { |
| throw new IllegalStateException("Can't accept non-received message."); |
| } |
| |
| receiver.accept(delivery, txnSession, settle); |
| } |
| |
| /** |
| * Marks the message as Modified, indicating whether it failed to deliver and is not deliverable here. |
| * |
| * @param deliveryFailed indicates that the delivery failed for some reason. |
| * @param undeliverableHere marks the delivery as not being able to be process by link it was sent to. |
| * @throws Exception if an error occurs during the process. |
| */ |
| public void modified(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception { |
| if (receiver == null) { |
| throw new IllegalStateException("Can't modify non-received message."); |
| } |
| |
| receiver.modified(delivery, deliveryFailed, undeliverableHere); |
| } |
| |
| /** |
| * Release the message, remote can redeliver it elsewhere. |
| * |
| * @throws Exception if an error occurs during the release. |
| */ |
| public void release() throws Exception { |
| if (receiver == null) { |
| throw new IllegalStateException("Can't release non-received message."); |
| } |
| |
| receiver.release(delivery); |
| } |
| |
| /** |
| * Reject the message, remote can redeliver it elsewhere. |
| * |
| * @throws Exception if an error occurs during the reject. |
| */ |
| public void reject() throws Exception { |
| if (receiver == null) { |
| throw new IllegalStateException("Can't release non-received message."); |
| } |
| |
| receiver.reject(delivery); |
| } |
| |
| //----- Convenience methods for constructing outbound messages -----------// |
| |
| /** |
| * Sets the address which is applied to the AMQP message To field in the message properties |
| * |
| * @param address The address that should be applied in the Message To field. |
| */ |
| public void setAddress(String address) { |
| checkReadOnly(); |
| lazyCreateProperties(); |
| getWrappedMessage().setAddress(address); |
| } |
| |
| /** |
| * Return the set address that was set in the Message To field. |
| * |
| * @return the set address String form or null if not set. |
| */ |
| public String getAddress() { |
| if (message.getProperties() == null) { |
| return null; |
| } |
| |
| return message.getProperties().getTo(); |
| } |
| |
| /** |
| * Sets the replyTo address which is applied to the AMQP message reply-to field in the message properties |
| * |
| * @param address The replyTo address that should be applied in the Message To field. |
| */ |
| public void setReplyToAddress(String address) { |
| checkReadOnly(); |
| lazyCreateProperties(); |
| getWrappedMessage().setReplyTo(address); |
| } |
| |
| /** |
| * Return the set replyTo address that was set in the Message To field. |
| * |
| * @return the set replyTo address String form or null if not set. |
| */ |
| public String getReplyToAddress() { |
| if (message.getProperties() == null) { |
| return null; |
| } |
| |
| return message.getProperties().getReplyTo(); |
| } |
| |
| /** |
| * Sets the MessageId property on an outbound message using the provided String |
| * |
| * @param messageId the String message ID value to set. |
| */ |
| public void setMessageId(String messageId) { |
| checkReadOnly(); |
| lazyCreateProperties(); |
| getWrappedMessage().setMessageId(messageId); |
| } |
| |
| /** |
| * Return the set MessageId value in String form, if there are no properties |
| * in the given message return null. |
| * |
| * @return the set message ID in String form or null if not set. |
| */ |
| public String getMessageId() { |
| if (message.getProperties() == null || message.getProperties().getMessageId() == null) { |
| return null; |
| } |
| |
| return message.getProperties().getMessageId().toString(); |
| } |
| |
| /** |
| * Return the set MessageId value in the original form, if there are no properties |
| * in the given message return null. |
| * |
| * @return the set message ID in its original form or null if not set. |
| */ |
| public Object getRawMessageId() { |
| if (message.getProperties() == null) { |
| return null; |
| } |
| |
| return message.getProperties().getMessageId(); |
| } |
| |
| /** |
| * Sets the MessageId property on an outbound message using the provided value |
| * |
| * @param messageId the message ID value to set. |
| */ |
| public void setRawMessageId(Object messageId) { |
| checkReadOnly(); |
| lazyCreateProperties(); |
| getWrappedMessage().setMessageId(messageId); |
| } |
| |
| /** |
| * Sets the CorrelationId property on an outbound message using the provided String |
| * |
| * @param correlationId the String Correlation ID value to set. |
| */ |
| public void setCorrelationId(String correlationId) { |
| checkReadOnly(); |
| lazyCreateProperties(); |
| getWrappedMessage().setCorrelationId(correlationId); |
| } |
| |
| /** |
| * Return the set CorrelationId value in String form, if there are no properties |
| * in the given message return null. |
| * |
| * @return the set correlation ID in String form or null if not set. |
| */ |
| public String getCorrelationId() { |
| if (message.getProperties() == null || message.getProperties().getCorrelationId() == null) { |
| return null; |
| } |
| |
| return message.getProperties().getCorrelationId().toString(); |
| } |
| |
| /** |
| * Return the set CorrelationId value in the original form, if there are no properties |
| * in the given message return null. |
| * |
| * @return the set message ID in its original form or null if not set. |
| */ |
| public Object getRawCorrelationId() { |
| if (message.getProperties() == null) { |
| return null; |
| } |
| |
| return message.getProperties().getCorrelationId(); |
| } |
| |
| /** |
| * Sets the CorrelationId property on an outbound message using the provided value |
| * |
| * @param correlationId the correlation ID value to set. |
| */ |
| public void setRawCorrelationId(Object correlationId) { |
| checkReadOnly(); |
| lazyCreateProperties(); |
| getWrappedMessage().setCorrelationId(correlationId); |
| } |
| |
| /** |
| * Sets the GroupId property on an outbound message using the provided String |
| * |
| * @param groupId the String Group ID value to set. |
| */ |
| public void setGroupId(String groupId) { |
| checkReadOnly(); |
| lazyCreateProperties(); |
| getWrappedMessage().setGroupId(groupId); |
| } |
| |
| /** |
| * Return the set GroupId value in String form, if there are no properties |
| * in the given message return null. |
| * |
| * @return the set GroupID in String form or null if not set. |
| */ |
| public String getGroupId() { |
| if (message.getProperties() == null) { |
| return null; |
| } |
| |
| return message.getProperties().getGroupId(); |
| } |
| |
| /** |
| * Sets the Subject property on an outbound message using the provided String |
| * |
| * @param subject the String Subject value to set. |
| */ |
| public void setSubject(String subject) { |
| checkReadOnly(); |
| lazyCreateProperties(); |
| getWrappedMessage().setSubject(subject); |
| } |
| |
| /** |
| * Return the set Subject value in String form, if there are no properties |
| * in the given message return null. |
| * |
| * @return the set Subject in String form or null if not set. |
| */ |
| public String getSubject() { |
| if (message.getProperties() == null) { |
| return null; |
| } |
| |
| return message.getProperties().getSubject(); |
| } |
| |
| /** |
| * Sets the durable header on the outgoing message. |
| * |
| * @param durable the boolean durable value to set. |
| */ |
| public void setDurable(boolean durable) { |
| checkReadOnly(); |
| lazyCreateHeader(); |
| getWrappedMessage().setDurable(durable); |
| } |
| |
| /** |
| * Checks the durable value in the Message Headers to determine if |
| * the message was sent as a durable Message. |
| * |
| * @return true if the message is marked as being durable. |
| */ |
| public boolean isDurable() { |
| if (message.getHeader() == null || message.getHeader().getDurable() == null) { |
| return false; |
| } |
| |
| return message.getHeader().getDurable(); |
| } |
| |
| /** |
| * Sets the priority header on the outgoing message. |
| * |
| * @param priority the priority value to set. |
| */ |
| public void setPriority(short priority) { |
| checkReadOnly(); |
| lazyCreateHeader(); |
| getWrappedMessage().setPriority(priority); |
| } |
| |
| /** |
| * Gets the priority header on the message. |
| */ |
| public short getPriority() { |
| return getWrappedMessage().getPriority(); |
| } |
| |
| /** |
| * Sets the ttl header on the outgoing message. |
| * |
| * @param timeToLive the ttl value to set. |
| */ |
| public void setTimeToLive(long timeToLive) { |
| checkReadOnly(); |
| lazyCreateHeader(); |
| getWrappedMessage().setTtl(timeToLive); |
| } |
| |
| /** |
| * Sets the ttl header on the outgoing message. |
| */ |
| public long getTimeToLive() { |
| return getWrappedMessage().getTtl(); |
| } |
| |
| /** |
| * Sets the absolute expiration time property on the message. |
| * |
| * @param absoluteExpiryTime the expiration time value to set. |
| */ |
| public void setAbsoluteExpiryTime(long absoluteExpiryTime) { |
| checkReadOnly(); |
| lazyCreateProperties(); |
| getWrappedMessage().setExpiryTime(absoluteExpiryTime); |
| } |
| |
| /** |
| * Gets the absolute expiration time property on the message. |
| */ |
| public long getAbsoluteExpiryTime() { |
| return getWrappedMessage().getExpiryTime(); |
| } |
| |
| /** |
| * Sets the creation time property on the message. |
| * |
| * @param creationTime the time value to set. |
| */ |
| public void setCreationTime(long creationTime) { |
| checkReadOnly(); |
| lazyCreateProperties(); |
| getWrappedMessage().setCreationTime(creationTime); |
| } |
| |
| /** |
| * Gets the absolute expiration time property on the message. |
| */ |
| public long getCreationTime() { |
| return getWrappedMessage().getCreationTime(); |
| } |
| |
| /** |
| * Sets a given application property on an outbound message. |
| * |
| * @param key the name to assign the new property. |
| * @param value the value to set for the named property. |
| */ |
| public void setApplicationProperty(String key, Object value) { |
| checkReadOnly(); |
| lazyCreateApplicationProperties(); |
| applicationPropertiesMap.put(key, value); |
| } |
| |
| /** |
| * Gets the application property that is mapped to the given name or null |
| * if no property has been set with that name. |
| * |
| * @param key the name used to lookup the property in the application properties. |
| * @return the property value or null if not set. |
| */ |
| public Object getApplicationProperty(String key) { |
| if (applicationPropertiesMap == null) { |
| return null; |
| } |
| |
| return applicationPropertiesMap.get(key); |
| } |
| |
| /** |
| * Perform a proper annotation set on the AMQP Message based on a Symbol key and |
| * the target value to append to the current annotations. |
| * |
| * @param key The name of the Symbol whose value is being set. |
| * @param value The new value to set in the annotations of this message. |
| */ |
| public void setMessageAnnotation(String key, Object value) { |
| checkReadOnly(); |
| lazyCreateMessageAnnotations(); |
| messageAnnotationsMap.put(Symbol.valueOf(key), value); |
| } |
| |
| /** |
| * Given a message annotation name, lookup and return the value associated with |
| * that annotation name. If the message annotations have not been created yet |
| * then this method will always return null. |
| * |
| * @param key the Symbol name that should be looked up in the message annotations. |
| * @return the value of the annotation if it exists, or null if not set or not accessible. |
| */ |
| public Object getMessageAnnotation(String key) { |
| if (messageAnnotationsMap == null) { |
| return null; |
| } |
| |
| return messageAnnotationsMap.get(Symbol.valueOf(key)); |
| } |
| |
| /** |
| * Perform a proper delivery annotation set on the AMQP Message based on a Symbol |
| * key and the target value to append to the current delivery annotations. |
| * |
| * @param key The name of the Symbol whose value is being set. |
| * @param value The new value to set in the delivery annotations of this message. |
| */ |
| public void setDeliveryAnnotation(String key, Object value) { |
| checkReadOnly(); |
| lazyCreateDeliveryAnnotations(); |
| deliveryAnnotationsMap.put(Symbol.valueOf(key), value); |
| } |
| |
| /** |
| * Given a message annotation name, lookup and return the value associated with |
| * that annotation name. If the message annotations have not been created yet |
| * then this method will always return null. |
| * |
| * @param key the Symbol name that should be looked up in the message annotations. |
| * @return the value of the annotation if it exists, or null if not set or not accessible. |
| */ |
| public Object getDeliveryAnnotation(String key) { |
| if (deliveryAnnotationsMap == null) { |
| return null; |
| } |
| |
| return deliveryAnnotationsMap.get(Symbol.valueOf(key)); |
| } |
| |
| //----- Methods for manipulating the Message body ------------------------// |
| |
| /** |
| * Sets a String value into the body of an outgoing Message, throws |
| * an exception if this is an incoming message instance. |
| * |
| * @param value the String value to store in the Message body. |
| * @throws IllegalStateException if the message is read only. |
| */ |
| public void setText(String value) throws IllegalStateException { |
| checkReadOnly(); |
| AmqpValue body = new AmqpValue(value); |
| getWrappedMessage().setBody(body); |
| } |
| |
| /** |
| * Attempts to retrieve the message body as a String from an AmqpValue body. |
| * |
| * @return the string |
| * @throws NoSuchElementException if the body does not contain a AmqpValue with String. |
| */ |
| public String getText() throws NoSuchElementException { |
| Section body = getWrappedMessage().getBody(); |
| if (body instanceof AmqpValue) { |
| AmqpValue value = (AmqpValue) body; |
| |
| if (value.getValue() instanceof String) { |
| return (String) value.getValue(); |
| } |
| } |
| |
| throw new NoSuchElementException("Message does not contain a String body"); |
| } |
| |
| /** |
| * Sets a byte array value into the body of an outgoing Message, throws |
| * an exception if this is an incoming message instance. |
| * |
| * @param bytes the byte array value to store in the Message body. |
| * @throws IllegalStateException if the message is read only. |
| */ |
| public void setBytes(byte[] bytes) throws IllegalStateException { |
| checkReadOnly(); |
| Data body = new Data(new Binary(bytes)); |
| getWrappedMessage().setBody(body); |
| } |
| |
| /** |
| * Sets a described type into the body of an outgoing Message, throws |
| * an exception if this is an incoming message instance. |
| * |
| * @param described the described type value to store in the Message body. |
| * @throws IllegalStateException if the message is read only. |
| */ |
| public void setDescribedType(DescribedType described) throws IllegalStateException { |
| checkReadOnly(); |
| AmqpValue body = new AmqpValue(described); |
| getWrappedMessage().setBody(body); |
| } |
| |
| /** |
| * Attempts to retrieve the message body as an DescribedType instance. |
| * |
| * @return an DescribedType instance if one is stored in the message body. |
| * @throws NoSuchElementException if the body does not contain a DescribedType. |
| */ |
| public DescribedType getDescribedType() throws NoSuchElementException { |
| DescribedType result = null; |
| |
| if (getWrappedMessage().getBody() == null) { |
| return null; |
| } else { |
| if (getWrappedMessage().getBody() instanceof AmqpValue) { |
| AmqpValue value = (AmqpValue) getWrappedMessage().getBody(); |
| |
| if (value.getValue() == null) { |
| result = null; |
| } else if (value.getValue() instanceof DescribedType) { |
| result = (DescribedType) value.getValue(); |
| } else { |
| throw new NoSuchElementException("Message does not contain a DescribedType body"); |
| } |
| } |
| } |
| |
| return result; |
| } |
| |
| //----- Internal implementation ------------------------------------------// |
| |
| private void checkReadOnly() throws IllegalStateException { |
| if (delivery != null) { |
| throw new IllegalStateException("Message is read only."); |
| } |
| } |
| |
| private void lazyCreateMessageAnnotations() { |
| if (messageAnnotationsMap == null) { |
| messageAnnotationsMap = new HashMap<>(); |
| message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap)); |
| } |
| } |
| |
| private void lazyCreateDeliveryAnnotations() { |
| if (deliveryAnnotationsMap == null) { |
| deliveryAnnotationsMap = new HashMap<>(); |
| message.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap)); |
| } |
| } |
| |
| private void lazyCreateApplicationProperties() { |
| if (applicationPropertiesMap == null) { |
| applicationPropertiesMap = new HashMap<>(); |
| message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap)); |
| } |
| } |
| |
| private void lazyCreateHeader() { |
| if (message.getHeader() == null) { |
| message.setHeader(new Header()); |
| } |
| } |
| |
| private void lazyCreateProperties() { |
| if (message.getProperties() == null) { |
| message.setProperties(new Properties()); |
| } |
| } |
| |
| public void settle() { |
| delivery.settle(); |
| } |
| } |