blob: d04e403970a591060d8bb105ab820cbcbd38c976 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.jms.provider.amqp.message;
import java.util.HashSet;
import java.util.Set;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.JmsQueue;
import org.apache.qpid.jms.JmsTemporaryQueue;
import org.apache.qpid.jms.JmsTemporaryTopic;
import org.apache.qpid.jms.JmsTopic;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
/**
* A set of static utility method useful when mapping JmsDestination types to / from the AMQP
* destination fields in a Message that's being sent or received.
*/
public class AmqpDestinationHelper {
public static final AmqpDestinationHelper INSTANCE = new AmqpDestinationHelper();
// For support of current byte type values
public static final String JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-jms-dest";
public static final String JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-jms-reply-to";
public static final byte QUEUE_TYPE = 0x00;
public static final byte TOPIC_TYPE = 0x01;
public static final byte TEMP_QUEUE_TYPE = 0x02;
public static final byte TEMP_TOPIC_TYPE = 0x03;
// For support of old string type values
public static final String TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-to-type";
public static final String REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-reply-type";
public static final String QUEUE_ATTRIBUTE = "queue";
public static final String TOPIC_ATTRIBUTE = "topic";
public static final String TEMPORARY_ATTRIBUTE = "temporary";
/**
* Decode the provided To address, type description, and consumer destination
* information such that an appropriate Destination object can be returned.
*
* If an address and type description is provided then this will be used to
* create the Destination. If the type information is missing, it will be
* derived from the consumer destination if present, or default to a queue
* destination if not.
*
* If the address is null then the consumer destination is returned, unless
* the useConsumerDestForTypeOnly flag is true, in which case null will be
* returned.
*/
public JmsDestination getJmsDestination(AmqpJmsMessageFacade message, JmsDestination consumerDestination) {
String to = message.getToAddress();
Byte typeByte = getTypeByte(message, JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
if (typeByte == null) {
// Try the legacy string type annotation
typeByte = getTypeByte(message, TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
}
String name = stripPrefixIfNecessary(to, message.getConnection(), typeByte, consumerDestination);
return createDestination(name, typeByte, consumerDestination, false);
}
public JmsDestination getJmsReplyTo(AmqpJmsMessageFacade message, JmsDestination consumerDestination) {
String replyTo = message.getReplyToAddress();
Byte typeByte = getTypeByte(message, JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
if (typeByte == null) {
// Try the legacy string type annotation
typeByte = getTypeByte(message, REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
}
String name = stripPrefixIfNecessary(replyTo, message.getConnection(), typeByte, consumerDestination);
return createDestination(name, typeByte, consumerDestination, true);
}
private String stripPrefixIfNecessary(String address, AmqpConnection conn, Byte typeByte, JmsDestination consumerDestination) {
if (address == null) {
return null;
}
if (typeByte == null) {
String queuePrefix = conn.getQueuePrefix();
if (queuePrefix != null && address.startsWith(queuePrefix)) {
return address.substring(queuePrefix.length());
}
String topicPrefix = conn.getTopicPrefix();
if (topicPrefix != null && address.startsWith(topicPrefix)) {
return address.substring(topicPrefix.length());
}
} else if (typeByte == QUEUE_TYPE) {
String queuePrefix = conn.getQueuePrefix();
if (queuePrefix != null && address.startsWith(queuePrefix)) {
return address.substring(queuePrefix.length());
}
} else if (typeByte == TOPIC_TYPE) {
String topicPrefix = conn.getTopicPrefix();
if (topicPrefix != null && address.startsWith(topicPrefix)) {
return address.substring(topicPrefix.length());
}
}
return address;
}
private JmsDestination createDestination(String address, Byte typeByte, JmsDestination consumerDestination, boolean useConsumerDestForTypeOnly) {
if (address == null) {
return useConsumerDestForTypeOnly ? null : consumerDestination;
}
if (typeByte != null) {
switch (typeByte) {
case QUEUE_TYPE:
return new JmsQueue(address);
case TOPIC_TYPE:
return new JmsTopic(address);
case TEMP_QUEUE_TYPE:
return new JmsTemporaryQueue(address);
case TEMP_TOPIC_TYPE:
return new JmsTemporaryTopic(address);
}
}
if (consumerDestination.isQueue()) {
if (consumerDestination.isTemporary()) {
return new JmsTemporaryQueue(address);
} else {
return new JmsQueue(address);
}
} else if (consumerDestination.isTopic()) {
if (consumerDestination.isTemporary()) {
return new JmsTemporaryTopic(address);
} else {
return new JmsTopic(address);
}
}
// fall back to a Queue Destination since we need a real JMS destination
return new JmsQueue(address);
}
public void setToAddressFromDestination(AmqpJmsMessageFacade message, JmsDestination destination) {
String address = getDestinationAddress(destination, message.getConnection());
Object typeValue = toTypeAnnotation(destination);
message.setToAddress(address);
if (address == null || typeValue == null) {
message.removeMessageAnnotation(JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
} else {
message.setMessageAnnotation(JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME, typeValue);
}
// Always clear the legacy string type annotation
message.removeMessageAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
}
public void setReplyToAddressFromDestination(AmqpJmsMessageFacade message, JmsDestination destination) {
String replyToAddress = getDestinationAddress(destination, message.getConnection());
Object typeValue = toTypeAnnotation(destination);
message.setReplyToAddress(replyToAddress);
if (replyToAddress == null || typeValue == null) {
message.removeMessageAnnotation(JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
} else {
message.setMessageAnnotation(JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, typeValue);
}
// Always clear the legacy string type annotation
message.removeMessageAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
}
public String getDestinationAddress(JmsDestination destination, AmqpConnection conn) {
if (destination == null) {
return null;
}
final String name = destination.getName();
// Add prefix if necessary
if (!destination.isTemporary()) {
if (destination.isQueue()) {
String queuePrefix = conn.getQueuePrefix();
if (queuePrefix != null && !name.startsWith(queuePrefix)) {
return queuePrefix + name;
}
}
if (destination.isTopic()) {
String topicPrefix = conn.getTopicPrefix();
if (topicPrefix != null && !name.startsWith(topicPrefix)) {
return topicPrefix + name;
}
}
}
return name;
}
/**
* @return the annotation type value, or null if the supplied destination
* is null or can't be classified
*/
private Object toTypeAnnotation(JmsDestination destination) {
if (destination == null) {
return null;
}
if (destination.isQueue()) {
if (destination.isTemporary()) {
return TEMP_QUEUE_TYPE;
} else {
return QUEUE_TYPE;
}
} else if (destination.isTopic()) {
if (destination.isTemporary()) {
return TEMP_TOPIC_TYPE;
} else {
return TOPIC_TYPE;
}
}
return null;
}
Set<String> splitAttributesString(String typeString) {
if (typeString == null) {
return null;
}
HashSet<String> typeSet = new HashSet<String>();
// Split string on commas and their surrounding whitespace
for (String attr : typeString.split("\\s*,\\s*")) {
// ignore empty values
if (!attr.equals("")) {
typeSet.add(attr);
}
}
return typeSet;
}
private Byte getTypeByte(AmqpJmsMessageFacade message, String annotationName) {
Object typeAnnotation = message.getMessageAnnotation(annotationName);
if (typeAnnotation == null) {
// Doesn't exist, or null.
return null;
} else if (typeAnnotation instanceof Byte) {
// Return the value found.
return (Byte) typeAnnotation;
} else {
// Handle legacy strings.
String typeString = String.valueOf(typeAnnotation);
Set<String> typeSet = null;
if (typeString != null) {
typeSet = splitAttributesString(typeString);
}
if (typeSet != null && !typeSet.isEmpty()) {
if (typeSet.contains(QUEUE_ATTRIBUTE)) {
if (typeSet.contains(TEMPORARY_ATTRIBUTE)) {
return TEMP_QUEUE_TYPE;
} else {
return QUEUE_TYPE;
}
} else if (typeSet.contains(TOPIC_ATTRIBUTE)) {
if (typeSet.contains(TEMPORARY_ATTRIBUTE)) {
return TEMP_TOPIC_TYPE;
} else {
return TOPIC_TYPE;
}
}
}
return null;
}
}
}