blob: 99ade0721329697336a288ca0a0c53a81bbf8ad4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.message.Message;
public class AmqpMessage {
private final AmqpReceiver receiver;
private final Message message;
private final Delivery delivery;
private Map<Symbol, Object> deliveryAnnotationsMap;
private Map<Symbol, Object> messageAnnotationsMap;
private Map<String, Object> applicationPropertiesMap;
/**
* Creates a new AmqpMessage that wraps the information necessary to handle
* an outgoing message.
*/
public AmqpMessage() {
receiver = null;
delivery = null;
message = Proton.message();
}
/**
* Creates a new AmqpMessage that wraps the information necessary to handle
* an outgoing message.
*
* @param message the Proton message that is to be sent.
*/
public AmqpMessage(Message message) {
this(null, message, null);
}
/**
* Creates a new AmqpMessage that wraps the information necessary to handle
* an incoming delivery.
*
* @param receiver the AmqpReceiver that received this message.
* @param message the Proton message that was received.
* @param delivery the Delivery instance that produced this message.
*/
@SuppressWarnings("unchecked")
public AmqpMessage(AmqpReceiver receiver, Message message, Delivery delivery) {
this.receiver = receiver;
this.message = message;
this.delivery = delivery;
if (message.getMessageAnnotations() != null) {
messageAnnotationsMap = message.getMessageAnnotations().getValue();
}
if (message.getApplicationProperties() != null) {
applicationPropertiesMap = message.getApplicationProperties().getValue();
}
if (message.getDeliveryAnnotations() != null) {
deliveryAnnotationsMap = message.getDeliveryAnnotations().getValue();
}
}
//----- Access to interal client resources -------------------------------//
/**
* @return the AMQP Delivery object linked to a received message.
*/
public Delivery getWrappedDelivery() {
if (delivery != null) {
return UnmodifiableProxy.deliveryProxy(delivery);
}
return null;
}
/**
* @return the AMQP Message that is wrapped by this object.
*/
public Message getWrappedMessage() {
return message;
}
/**
* @return the AmqpReceiver that consumed this message.
*/
public AmqpReceiver getAmqpReceiver() {
return receiver;
}
//----- Message disposition control --------------------------------------//
/**
* Accepts the message marking it as consumed on the remote peer.
*
* @throws Exception if an error occurs during the accept.
*/
public void accept() throws Exception {
accept(true);
}
/**
* Accepts the message marking it as consumed on the remote peer.
*
* @param settle
* true if the client should also settle the delivery when sending the accept.
*
* @throws Exception if an error occurs during the accept.
*/
public void accept(boolean settle) throws Exception {
if (receiver == null) {
throw new IllegalStateException("Can't accept non-received message.");
}
receiver.accept(delivery, settle);
}
/**
* Accepts the message marking it as consumed on the remote peer.
*
* @param txnSession The session that is used to manage acceptance of the message.
* @throws Exception if an error occurs during the accept.
*/
public void accept(AmqpSession txnSession) throws Exception {
accept(txnSession, true);
}
/**
* Accepts the message marking it as consumed on the remote peer.
*
* @param txnSession
* The session that is used to manage acceptance of the message.
*
* @throws Exception if an error occurs during the accept.
*/
public void accept(AmqpSession txnSession, boolean settle) throws Exception {
if (receiver == null) {
throw new IllegalStateException("Can't accept non-received message.");
}
receiver.accept(delivery, txnSession, settle);
}
/**
* Marks the message as Modified, indicating whether it failed to deliver and is not deliverable here.
*
* @param deliveryFailed indicates that the delivery failed for some reason.
* @param undeliverableHere marks the delivery as not being able to be process by link it was sent to.
* @throws Exception if an error occurs during the process.
*/
public void modified(Boolean deliveryFailed, Boolean undeliverableHere) throws Exception {
if (receiver == null) {
throw new IllegalStateException("Can't modify non-received message.");
}
receiver.modified(delivery, deliveryFailed, undeliverableHere);
}
/**
* Release the message, remote can redeliver it elsewhere.
*
* @throws Exception if an error occurs during the release.
*/
public void release() throws Exception {
if (receiver == null) {
throw new IllegalStateException("Can't release non-received message.");
}
receiver.release(delivery);
}
/**
* Reject the message, remote can redeliver it elsewhere.
*
* @throws Exception if an error occurs during the reject.
*/
public void reject() throws Exception {
if (receiver == null) {
throw new IllegalStateException("Can't release non-received message.");
}
receiver.reject(delivery);
}
//----- Convenience methods for constructing outbound messages -----------//
/**
* Sets the address which is applied to the AMQP message To field in the message properties
*
* @param address The address that should be applied in the Message To field.
*/
public void setAddress(String address) {
checkReadOnly();
lazyCreateProperties();
getWrappedMessage().setAddress(address);
}
/**
* Return the set address that was set in the Message To field.
*
* @return the set address String form or null if not set.
*/
public String getAddress() {
if (message.getProperties() == null) {
return null;
}
return message.getProperties().getTo();
}
/**
* Sets the replyTo address which is applied to the AMQP message reply-to field in the message properties
*
* @param address The replyTo address that should be applied in the Message To field.
*/
public void setReplyToAddress(String address) {
checkReadOnly();
lazyCreateProperties();
getWrappedMessage().setReplyTo(address);
}
/**
* Return the set replyTo address that was set in the Message To field.
*
* @return the set replyTo address String form or null if not set.
*/
public String getReplyToAddress() {
if (message.getProperties() == null) {
return null;
}
return message.getProperties().getReplyTo();
}
/**
* Sets the MessageId property on an outbound message using the provided String
*
* @param messageId the String message ID value to set.
*/
public void setMessageId(String messageId) {
checkReadOnly();
lazyCreateProperties();
getWrappedMessage().setMessageId(messageId);
}
/**
* Return the set MessageId value in String form, if there are no properties
* in the given message return null.
*
* @return the set message ID in String form or null if not set.
*/
public String getMessageId() {
if (message.getProperties() == null || message.getProperties().getMessageId() == null) {
return null;
}
return message.getProperties().getMessageId().toString();
}
/**
* Return the set MessageId value in the original form, if there are no properties
* in the given message return null.
*
* @return the set message ID in its original form or null if not set.
*/
public Object getRawMessageId() {
if (message.getProperties() == null) {
return null;
}
return message.getProperties().getMessageId();
}
/**
* Sets the MessageId property on an outbound message using the provided value
*
* @param messageId the message ID value to set.
*/
public void setRawMessageId(Object messageId) {
checkReadOnly();
lazyCreateProperties();
getWrappedMessage().setMessageId(messageId);
}
/**
* Sets the CorrelationId property on an outbound message using the provided String
*
* @param correlationId the String Correlation ID value to set.
*/
public void setCorrelationId(String correlationId) {
checkReadOnly();
lazyCreateProperties();
getWrappedMessage().setCorrelationId(correlationId);
}
/**
* Return the set CorrelationId value in String form, if there are no properties
* in the given message return null.
*
* @return the set correlation ID in String form or null if not set.
*/
public String getCorrelationId() {
if (message.getProperties() == null || message.getProperties().getCorrelationId() == null) {
return null;
}
return message.getProperties().getCorrelationId().toString();
}
/**
* Return the set CorrelationId value in the original form, if there are no properties
* in the given message return null.
*
* @return the set message ID in its original form or null if not set.
*/
public Object getRawCorrelationId() {
if (message.getProperties() == null) {
return null;
}
return message.getProperties().getCorrelationId();
}
/**
* Sets the CorrelationId property on an outbound message using the provided value
*
* @param correlationId the correlation ID value to set.
*/
public void setRawCorrelationId(Object correlationId) {
checkReadOnly();
lazyCreateProperties();
getWrappedMessage().setCorrelationId(correlationId);
}
/**
* Sets the GroupId property on an outbound message using the provided String
*
* @param groupId the String Group ID value to set.
*/
public void setGroupId(String groupId) {
checkReadOnly();
lazyCreateProperties();
getWrappedMessage().setGroupId(groupId);
}
/**
* Return the set GroupId value in String form, if there are no properties
* in the given message return null.
*
* @return the set GroupID in String form or null if not set.
*/
public String getGroupId() {
if (message.getProperties() == null) {
return null;
}
return message.getProperties().getGroupId();
}
/**
* Sets the Subject property on an outbound message using the provided String
*
* @param subject the String Subject value to set.
*/
public void setSubject(String subject) {
checkReadOnly();
lazyCreateProperties();
getWrappedMessage().setSubject(subject);
}
/**
* Return the set Subject value in String form, if there are no properties
* in the given message return null.
*
* @return the set Subject in String form or null if not set.
*/
public String getSubject() {
if (message.getProperties() == null) {
return null;
}
return message.getProperties().getSubject();
}
/**
* Sets the durable header on the outgoing message.
*
* @param durable the boolean durable value to set.
*/
public void setDurable(boolean durable) {
checkReadOnly();
lazyCreateHeader();
getWrappedMessage().setDurable(durable);
}
/**
* Checks the durable value in the Message Headers to determine if
* the message was sent as a durable Message.
*
* @return true if the message is marked as being durable.
*/
public boolean isDurable() {
if (message.getHeader() == null || message.getHeader().getDurable() == null) {
return false;
}
return message.getHeader().getDurable();
}
/**
* Sets the priority header on the outgoing message.
*
* @param priority the priority value to set.
*/
public void setPriority(short priority) {
checkReadOnly();
lazyCreateHeader();
getWrappedMessage().setPriority(priority);
}
/**
* Gets the priority header on the message.
*/
public short getPriority() {
return getWrappedMessage().getPriority();
}
/**
* Sets the ttl header on the outgoing message.
*
* @param timeToLive the ttl value to set.
*/
public void setTimeToLive(long timeToLive) {
checkReadOnly();
lazyCreateHeader();
getWrappedMessage().setTtl(timeToLive);
}
/**
* Sets the ttl header on the outgoing message.
*/
public long getTimeToLive() {
return getWrappedMessage().getTtl();
}
/**
* Sets the absolute expiration time property on the message.
*
* @param absoluteExpiryTime the expiration time value to set.
*/
public void setAbsoluteExpiryTime(long absoluteExpiryTime) {
checkReadOnly();
lazyCreateProperties();
getWrappedMessage().setExpiryTime(absoluteExpiryTime);
}
/**
* Gets the absolute expiration time property on the message.
*/
public long getAbsoluteExpiryTime() {
return getWrappedMessage().getExpiryTime();
}
/**
* Sets the creation time property on the message.
*
* @param creationTime the time value to set.
*/
public void setCreationTime(long creationTime) {
checkReadOnly();
lazyCreateProperties();
getWrappedMessage().setCreationTime(creationTime);
}
/**
* Gets the absolute expiration time property on the message.
*/
public long getCreationTime() {
return getWrappedMessage().getCreationTime();
}
/**
* Sets a given application property on an outbound message.
*
* @param key the name to assign the new property.
* @param value the value to set for the named property.
*/
public void setApplicationProperty(String key, Object value) {
checkReadOnly();
lazyCreateApplicationProperties();
applicationPropertiesMap.put(key, value);
}
/**
* Gets the application property that is mapped to the given name or null
* if no property has been set with that name.
*
* @param key the name used to lookup the property in the application properties.
* @return the property value or null if not set.
*/
public Object getApplicationProperty(String key) {
if (applicationPropertiesMap == null) {
return null;
}
return applicationPropertiesMap.get(key);
}
/**
* Perform a proper annotation set on the AMQP Message based on a Symbol key and
* the target value to append to the current annotations.
*
* @param key The name of the Symbol whose value is being set.
* @param value The new value to set in the annotations of this message.
*/
public void setMessageAnnotation(String key, Object value) {
checkReadOnly();
lazyCreateMessageAnnotations();
messageAnnotationsMap.put(Symbol.valueOf(key), value);
}
/**
* Given a message annotation name, lookup and return the value associated with
* that annotation name. If the message annotations have not been created yet
* then this method will always return null.
*
* @param key the Symbol name that should be looked up in the message annotations.
* @return the value of the annotation if it exists, or null if not set or not accessible.
*/
public Object getMessageAnnotation(String key) {
if (messageAnnotationsMap == null) {
return null;
}
return messageAnnotationsMap.get(Symbol.valueOf(key));
}
/**
* Perform a proper delivery annotation set on the AMQP Message based on a Symbol
* key and the target value to append to the current delivery annotations.
*
* @param key The name of the Symbol whose value is being set.
* @param value The new value to set in the delivery annotations of this message.
*/
public void setDeliveryAnnotation(String key, Object value) {
checkReadOnly();
lazyCreateDeliveryAnnotations();
deliveryAnnotationsMap.put(Symbol.valueOf(key), value);
}
/**
* Given a message annotation name, lookup and return the value associated with
* that annotation name. If the message annotations have not been created yet
* then this method will always return null.
*
* @param key the Symbol name that should be looked up in the message annotations.
* @return the value of the annotation if it exists, or null if not set or not accessible.
*/
public Object getDeliveryAnnotation(String key) {
if (deliveryAnnotationsMap == null) {
return null;
}
return deliveryAnnotationsMap.get(Symbol.valueOf(key));
}
//----- Methods for manipulating the Message body ------------------------//
/**
* Sets a String value into the body of an outgoing Message, throws
* an exception if this is an incoming message instance.
*
* @param value the String value to store in the Message body.
* @throws IllegalStateException if the message is read only.
*/
public void setText(String value) throws IllegalStateException {
checkReadOnly();
AmqpValue body = new AmqpValue(value);
getWrappedMessage().setBody(body);
}
/**
* Attempts to retrieve the message body as a String from an AmqpValue body.
*
* @return the string
* @throws NoSuchElementException if the body does not contain a AmqpValue with String.
*/
public String getText() throws NoSuchElementException {
Section body = getWrappedMessage().getBody();
if (body instanceof AmqpValue) {
AmqpValue value = (AmqpValue) body;
if (value.getValue() instanceof String) {
return (String) value.getValue();
}
}
throw new NoSuchElementException("Message does not contain a String body");
}
/**
* Sets a byte array value into the body of an outgoing Message, throws
* an exception if this is an incoming message instance.
*
* @param bytes the byte array value to store in the Message body.
* @throws IllegalStateException if the message is read only.
*/
public void setBytes(byte[] bytes) throws IllegalStateException {
checkReadOnly();
Data body = new Data(new Binary(bytes));
getWrappedMessage().setBody(body);
}
/**
* Sets a described type into the body of an outgoing Message, throws
* an exception if this is an incoming message instance.
*
* @param described the described type value to store in the Message body.
* @throws IllegalStateException if the message is read only.
*/
public void setDescribedType(DescribedType described) throws IllegalStateException {
checkReadOnly();
AmqpValue body = new AmqpValue(described);
getWrappedMessage().setBody(body);
}
/**
* Attempts to retrieve the message body as an DescribedType instance.
*
* @return an DescribedType instance if one is stored in the message body.
* @throws NoSuchElementException if the body does not contain a DescribedType.
*/
public DescribedType getDescribedType() throws NoSuchElementException {
DescribedType result = null;
if (getWrappedMessage().getBody() == null) {
return null;
} else {
if (getWrappedMessage().getBody() instanceof AmqpValue) {
AmqpValue value = (AmqpValue) getWrappedMessage().getBody();
if (value.getValue() == null) {
result = null;
} else if (value.getValue() instanceof DescribedType) {
result = (DescribedType) value.getValue();
} else {
throw new NoSuchElementException("Message does not contain a DescribedType body");
}
}
}
return result;
}
//----- Internal implementation ------------------------------------------//
private void checkReadOnly() throws IllegalStateException {
if (delivery != null) {
throw new IllegalStateException("Message is read only.");
}
}
private void lazyCreateMessageAnnotations() {
if (messageAnnotationsMap == null) {
messageAnnotationsMap = new HashMap<>();
message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
}
}
private void lazyCreateDeliveryAnnotations() {
if (deliveryAnnotationsMap == null) {
deliveryAnnotationsMap = new HashMap<>();
message.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap));
}
}
private void lazyCreateApplicationProperties() {
if (applicationPropertiesMap == null) {
applicationPropertiesMap = new HashMap<>();
message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap));
}
}
private void lazyCreateHeader() {
if (message.getHeader() == null) {
message.setHeader(new Header());
}
}
private void lazyCreateProperties() {
if (message.getProperties() == null) {
message.setProperties(new Properties());
}
}
public void settle() {
delivery.settle();
}
}