| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| package org.apache.qpid.client.message; |
| |
| import java.net.URISyntaxException; |
| import java.util.Collections; |
| import java.util.Enumeration; |
| import java.util.LinkedHashMap; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| |
| import javax.jms.Destination; |
| import javax.jms.JMSException; |
| import javax.jms.MessageNotWriteableException; |
| import javax.jms.Queue; |
| |
| import org.apache.qpid.client.AMQDestination; |
| import org.apache.qpid.client.AMQQueue; |
| import org.apache.qpid.client.AMQSession; |
| import org.apache.qpid.client.AMQSession_0_8; |
| import org.apache.qpid.client.AMQTopic; |
| import org.apache.qpid.client.CustomJMSXProperty; |
| import org.apache.qpid.framing.AMQShortString; |
| import org.apache.qpid.framing.BasicContentHeaderProperties; |
| import org.apache.qpid.url.AMQBindingURL; |
| import org.apache.qpid.url.BindingURL; |
| |
| |
| public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate |
| { |
| private static final float DESTINATION_CACHE_LOAD_FACTOR = 0.75f; |
| private static final int DESTINATION_CACHE_SIZE = 500; |
| private static final int DESTINATION_CACHE_CAPACITY = (int) (DESTINATION_CACHE_SIZE / DESTINATION_CACHE_LOAD_FACTOR); |
| |
| private static final Map<String, Destination> _destinationCache = |
| Collections.synchronizedMap(new LinkedHashMap<String,Destination>(DESTINATION_CACHE_CAPACITY, |
| DESTINATION_CACHE_LOAD_FACTOR, |
| true) |
| { |
| @Override |
| protected boolean removeEldestEntry(Map.Entry<String, Destination> eldest) |
| { |
| return size() >= DESTINATION_CACHE_SIZE; |
| } |
| }); |
| |
| public static final String JMS_TYPE = "x-jms-type"; |
| public static final boolean STRICT_JMS = Boolean.getBoolean("strict-jms"); |
| |
| |
| private boolean _readableProperties = false; |
| |
| private Destination _destination; |
| private JMSHeaderAdapter _headerAdapter; |
| private static final boolean STRICT_AMQP_COMPLIANCE = |
| Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT)); |
| |
| private BasicContentHeaderProperties _contentHeaderProperties; |
| |
| // The base set of items that needs to be set. |
| public AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag) |
| { |
| super(deliveryTag); |
| _contentHeaderProperties = properties; |
| _readableProperties = (_contentHeaderProperties != null); |
| _headerAdapter = new JMSHeaderAdapter(_readableProperties ? _contentHeaderProperties.getHeaders() |
| : (new BasicContentHeaderProperties()).getHeaders() ); |
| } |
| |
| // Used for the creation of new messages |
| protected AMQMessageDelegate_0_8() |
| { |
| this(new BasicContentHeaderProperties(), -1); |
| _readableProperties = false; |
| _headerAdapter = new JMSHeaderAdapter(_contentHeaderProperties.getHeaders()); |
| |
| } |
| |
| // Used when generating a received message object |
| protected AMQMessageDelegate_0_8(long deliveryTag, |
| BasicContentHeaderProperties contentHeader, |
| String exchange, |
| String routingKey, |
| AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache, |
| AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache, |
| int addressType) |
| { |
| this(contentHeader, deliveryTag); |
| |
| Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName()); |
| |
| contentHeader.getHeaders().remove(QpidMessageProperties.QPID_NOT_VALID_BEFORE); |
| |
| AMQDestination dest = null; |
| |
| if(AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL) |
| { |
| // If we have a type set the attempt to use that. |
| if (type != null) |
| { |
| switch (type.intValue()) |
| { |
| case AMQDestination.QUEUE_TYPE: |
| dest = queueDestinationCache.getDestination(exchange, routingKey); |
| break; |
| case AMQDestination.TOPIC_TYPE: |
| dest = topicDestinationCache.getDestination(exchange, routingKey); |
| break; |
| default: |
| // Use the generateDestination method |
| dest = null; |
| } |
| } |
| |
| if (dest == null) |
| { |
| dest = generateDestination(exchange, routingKey); |
| } |
| } |
| else |
| { |
| String subject = null; |
| if (contentHeader.getHeaders() != null |
| && contentHeader.getHeaders().containsKey(QpidMessageProperties.QPID_SUBJECT)) |
| { |
| subject = contentHeader.getHeaders().getString(QpidMessageProperties.QPID_SUBJECT); |
| } |
| if(type == null) |
| { |
| type = addressType; |
| } |
| dest = (AMQDestination) convertToAddressBasedDestination(exchange, |
| routingKey, subject, |
| true, type); |
| } |
| setJMSDestination(dest); |
| } |
| |
| |
| |
| public String getJMSMessageID() throws JMSException |
| { |
| return getContentHeaderProperties().getMessageIdAsString(); |
| } |
| |
| public void setJMSMessageID(String messageId) throws JMSException |
| { |
| if (messageId != null) |
| { |
| getContentHeaderProperties().setMessageId(messageId); |
| } |
| } |
| |
| public void setJMSMessageID(UUID messageId) throws JMSException |
| { |
| if (messageId == null) |
| { |
| getContentHeaderProperties().setMessageId((String)null); |
| } |
| else |
| { |
| getContentHeaderProperties().setMessageId(asShortStringMsgId(messageId)); |
| } |
| } |
| |
| private static final byte[] HEX_DIGITS = {0x30,0x31,0x32,0x33,0x34,0x35,0x36,0x37,0x38,0x39, |
| 0x61,0x62,0x63,0x64,0x65,0x66}; |
| |
| private static AMQShortString asShortStringMsgId(UUID messageId) |
| { |
| long msb = messageId.getMostSignificantBits(); |
| long lsb = messageId.getLeastSignificantBits(); |
| |
| byte[] messageIdBytes = new byte[39]; |
| messageIdBytes[0] = (byte) 'I'; |
| messageIdBytes[1] = (byte) 'D'; |
| messageIdBytes[2] = (byte) ':'; |
| |
| messageIdBytes[3] = HEX_DIGITS[(int)((msb >> 60) & 0xFl)]; |
| messageIdBytes[4] = HEX_DIGITS[(int)((msb >> 56) & 0xFl)]; |
| messageIdBytes[5] = HEX_DIGITS[(int)((msb >> 52) & 0xFl)]; |
| messageIdBytes[6] = HEX_DIGITS[(int)((msb >> 48) & 0xFl)]; |
| messageIdBytes[7] = HEX_DIGITS[(int)((msb >> 44) & 0xFl)]; |
| messageIdBytes[8] = HEX_DIGITS[(int)((msb >> 40) & 0xFl)]; |
| messageIdBytes[9] = HEX_DIGITS[(int)((msb >> 36) & 0xFl)]; |
| messageIdBytes[10] = HEX_DIGITS[(int)((msb >> 32) & 0xFl)]; |
| |
| messageIdBytes[11] = (byte) '-'; |
| messageIdBytes[12] = HEX_DIGITS[(int)((msb >> 28) & 0xFl)]; |
| messageIdBytes[13] = HEX_DIGITS[(int)((msb >> 24) & 0xFl)]; |
| messageIdBytes[14] = HEX_DIGITS[(int)((msb >> 20) & 0xFl)]; |
| messageIdBytes[15] = HEX_DIGITS[(int)((msb >> 16) & 0xFl)]; |
| messageIdBytes[16] = (byte) '-'; |
| messageIdBytes[17] = HEX_DIGITS[(int)((msb >> 12) & 0xFl)]; |
| messageIdBytes[18] = HEX_DIGITS[(int)((msb >> 8) & 0xFl)]; |
| messageIdBytes[19] = HEX_DIGITS[(int)((msb >> 4) & 0xFl)]; |
| messageIdBytes[20] = HEX_DIGITS[(int)(msb & 0xFl)]; |
| messageIdBytes[21] = (byte) '-'; |
| |
| messageIdBytes[22] = HEX_DIGITS[(int)((lsb >> 60) & 0xFl)]; |
| messageIdBytes[23] = HEX_DIGITS[(int)((lsb >> 56) & 0xFl)]; |
| messageIdBytes[24] = HEX_DIGITS[(int)((lsb >> 52) & 0xFl)]; |
| messageIdBytes[25] = HEX_DIGITS[(int)((lsb >> 48) & 0xFl)]; |
| |
| messageIdBytes[26] = (byte) '-'; |
| |
| messageIdBytes[27] = HEX_DIGITS[(int)((lsb >> 44) & 0xFl)]; |
| messageIdBytes[28] = HEX_DIGITS[(int)((lsb >> 40) & 0xFl)]; |
| messageIdBytes[29] = HEX_DIGITS[(int)((lsb >> 36) & 0xFl)]; |
| messageIdBytes[30] = HEX_DIGITS[(int)((lsb >> 32) & 0xFl)]; |
| messageIdBytes[31] = HEX_DIGITS[(int)((lsb >> 28) & 0xFl)]; |
| messageIdBytes[32] = HEX_DIGITS[(int)((lsb >> 24) & 0xFl)]; |
| messageIdBytes[33] = HEX_DIGITS[(int)((lsb >> 20) & 0xFl)]; |
| messageIdBytes[34] = HEX_DIGITS[(int)((lsb >> 16) & 0xFl)]; |
| messageIdBytes[35] = HEX_DIGITS[(int)((lsb >> 12) & 0xFl)]; |
| messageIdBytes[36] = HEX_DIGITS[(int)((lsb >> 8) & 0xFl)]; |
| messageIdBytes[37] = HEX_DIGITS[(int)((lsb >> 4) & 0xFl)]; |
| messageIdBytes[38] = HEX_DIGITS[(int)(lsb & 0xFl)]; |
| |
| return new AMQShortString(messageIdBytes); |
| } |
| |
| public long getJMSTimestamp() throws JMSException |
| { |
| return getContentHeaderProperties().getTimestamp(); |
| } |
| |
| public void setJMSTimestamp(long timestamp) throws JMSException |
| { |
| getContentHeaderProperties().setTimestamp(timestamp); |
| } |
| |
| public byte[] getJMSCorrelationIDAsBytes() throws JMSException |
| { |
| final AMQShortString correlationId = getContentHeaderProperties().getCorrelationId(); |
| return correlationId == null ? null : correlationId.getBytes(); |
| } |
| |
| public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException |
| { |
| getContentHeaderProperties().setCorrelationId(new AMQShortString(bytes)); |
| } |
| |
| public void setJMSCorrelationID(String correlationId) throws JMSException |
| { |
| getContentHeaderProperties().setCorrelationId(correlationId); |
| } |
| |
| public String getJMSCorrelationID() throws JMSException |
| { |
| return getContentHeaderProperties().getCorrelationIdAsString(); |
| } |
| |
| public Destination getJMSReplyTo() throws JMSException |
| { |
| String replyToEncoding = getContentHeaderProperties().getReplyToAsString(); |
| if (replyToEncoding == null) |
| { |
| return null; |
| } |
| else |
| { |
| Destination dest = _destinationCache.get(replyToEncoding); |
| if (dest == null) |
| { |
| try |
| { |
| BindingURL binding = new AMQBindingURL(replyToEncoding); |
| dest = AMQDestination.createDestination(binding); |
| } |
| catch (URISyntaxException e) |
| { |
| if(replyToEncoding.startsWith("/")) |
| { |
| dest = new DefaultRouterDestination(replyToEncoding); |
| } |
| else if(replyToEncoding.contains("/")) |
| { |
| String[] parts = replyToEncoding.split("/",2); |
| dest = new NonBURLReplyToDestination(parts[0], parts[1]); |
| |
| |
| } |
| else |
| { |
| if(getAMQSession().isQueueBound(replyToEncoding, null, null)) |
| { |
| dest = new NonBURLReplyToDestination(replyToEncoding, ""); |
| } |
| else |
| { |
| dest = new DefaultRouterDestination(replyToEncoding); |
| } |
| } |
| |
| } |
| |
| _destinationCache.put(replyToEncoding, dest); |
| } |
| |
| return dest; |
| } |
| } |
| |
| public void setJMSReplyTo(Destination destination) throws JMSException |
| { |
| if (destination == null) |
| { |
| getContentHeaderProperties().setReplyTo((String) null); |
| return; // We're done here |
| } |
| |
| if (!(destination instanceof AMQDestination)) |
| { |
| throw new IllegalArgumentException( |
| "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass()); |
| } |
| |
| final AMQDestination amqd = (AMQDestination) destination; |
| |
| final AMQShortString encodedDestination = amqd.getEncodedName(); |
| _destinationCache.put(encodedDestination.toString(), destination); |
| getContentHeaderProperties().setReplyTo(encodedDestination); |
| } |
| |
| public Destination getJMSDestination() |
| { |
| return _destination; |
| } |
| |
| public void setJMSDestination(Destination destination) |
| { |
| _destination = destination; |
| } |
| |
| public void setContentType(String contentType) |
| { |
| getContentHeaderProperties().setContentType(contentType); |
| } |
| |
| public String getContentType() |
| { |
| return getContentHeaderProperties().getContentTypeAsString(); |
| } |
| |
| public void setEncoding(String encoding) |
| { |
| getContentHeaderProperties().setEncoding(encoding); |
| } |
| |
| public String getEncoding() |
| { |
| return getContentHeaderProperties().getEncodingAsString(); |
| } |
| |
| public String getReplyToString() |
| { |
| return getContentHeaderProperties().getReplyToAsString(); |
| } |
| |
| public int getJMSDeliveryMode() throws JMSException |
| { |
| return getContentHeaderProperties().getDeliveryMode(); |
| } |
| |
| public void setJMSDeliveryMode(int i) throws JMSException |
| { |
| getContentHeaderProperties().setDeliveryMode((byte) i); |
| } |
| |
| public BasicContentHeaderProperties getContentHeaderProperties() |
| { |
| return _contentHeaderProperties; |
| } |
| |
| |
| public String getJMSType() throws JMSException |
| { |
| return getContentHeaderProperties().getTypeAsString(); |
| } |
| |
| public void setJMSType(String string) throws JMSException |
| { |
| getContentHeaderProperties().setType(string); |
| } |
| |
| public long getJMSExpiration() throws JMSException |
| { |
| return getContentHeaderProperties().getExpiration(); |
| } |
| |
| public void setJMSExpiration(long l) throws JMSException |
| { |
| getContentHeaderProperties().setExpiration(l); |
| } |
| |
| |
| |
| public boolean propertyExists(String propertyName) throws JMSException |
| { |
| return getJmsHeaders().propertyExists(propertyName); |
| } |
| |
| public boolean getBooleanProperty(String propertyName) throws JMSException |
| { |
| if (STRICT_AMQP_COMPLIANCE) |
| { |
| throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); |
| } |
| |
| return getJmsHeaders().getBoolean(propertyName); |
| } |
| |
| public byte getByteProperty(String propertyName) throws JMSException |
| { |
| if (STRICT_AMQP_COMPLIANCE) |
| { |
| throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); |
| } |
| |
| return getJmsHeaders().getByte(propertyName); |
| } |
| |
| public short getShortProperty(String propertyName) throws JMSException |
| { |
| if (STRICT_AMQP_COMPLIANCE) |
| { |
| throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); |
| } |
| |
| return getJmsHeaders().getShort(propertyName); |
| } |
| |
| public int getIntProperty(String propertyName) throws JMSException |
| { |
| if (STRICT_AMQP_COMPLIANCE) |
| { |
| throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); |
| } |
| |
| return getJmsHeaders().getInteger(propertyName); |
| } |
| |
| public long getLongProperty(String propertyName) throws JMSException |
| { |
| if (STRICT_AMQP_COMPLIANCE) |
| { |
| throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); |
| } |
| |
| return getJmsHeaders().getLong(propertyName); |
| } |
| |
| public float getFloatProperty(String propertyName) throws JMSException |
| { |
| if (STRICT_AMQP_COMPLIANCE) |
| { |
| throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); |
| } |
| |
| return getJmsHeaders().getFloat(propertyName); |
| } |
| |
| public double getDoubleProperty(String propertyName) throws JMSException |
| { |
| if (STRICT_AMQP_COMPLIANCE) |
| { |
| throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); |
| } |
| |
| return getJmsHeaders().getDouble(propertyName); |
| } |
| |
| public String getStringProperty(String propertyName) throws JMSException |
| { |
| //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below. |
| if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString())) |
| { |
| return _contentHeaderProperties.getUserIdAsString(); |
| } |
| else |
| { |
| if (STRICT_AMQP_COMPLIANCE) |
| { |
| throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); |
| } |
| |
| return getJmsHeaders().getString(propertyName); |
| } |
| } |
| |
| public Object getObjectProperty(String propertyName) throws JMSException |
| { |
| return getJmsHeaders().getObject(propertyName); |
| } |
| |
| public Enumeration getPropertyNames() throws JMSException |
| { |
| Set<String> keys = getJmsHeaders().getPropertyNames(); |
| return Collections.enumeration(keys); |
| } |
| |
| public void setBooleanProperty(String propertyName, boolean b) throws JMSException |
| { |
| if (STRICT_AMQP_COMPLIANCE) |
| { |
| throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); |
| } |
| |
| checkWritableProperties(); |
| getJmsHeaders().setBoolean(propertyName, b); |
| } |
| |
| public void setByteProperty(String propertyName, byte b) throws JMSException |
| { |
| if (STRICT_AMQP_COMPLIANCE) |
| { |
| throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); |
| } |
| |
| checkWritableProperties(); |
| getJmsHeaders().setByte(propertyName, b); |
| } |
| |
| public void setShortProperty(String propertyName, short i) throws JMSException |
| { |
| if (STRICT_AMQP_COMPLIANCE) |
| { |
| throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); |
| } |
| |
| checkWritableProperties(); |
| getJmsHeaders().setShort(propertyName, i); |
| } |
| |
| public void setIntProperty(String propertyName, int i) throws JMSException |
| { |
| checkWritableProperties(); |
| getJmsHeaders().setInteger(propertyName, i); |
| } |
| |
| public void setLongProperty(String propertyName, long l) throws JMSException |
| { |
| if (STRICT_AMQP_COMPLIANCE) |
| { |
| throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); |
| } |
| |
| checkWritableProperties(); |
| getJmsHeaders().setLong(propertyName, l); |
| } |
| |
| public void setFloatProperty(String propertyName, float f) throws JMSException |
| { |
| if (STRICT_AMQP_COMPLIANCE) |
| { |
| throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); |
| } |
| |
| checkWritableProperties(); |
| getJmsHeaders().setFloat(propertyName, f); |
| } |
| |
| public void setDoubleProperty(String propertyName, double v) throws JMSException |
| { |
| if (STRICT_AMQP_COMPLIANCE) |
| { |
| throw new UnsupportedOperationException("JMS Properties not supported in AMQP"); |
| } |
| |
| checkWritableProperties(); |
| getJmsHeaders().setDouble(propertyName, v); |
| } |
| |
| public void setStringProperty(String propertyName, String value) throws JMSException |
| { |
| checkWritableProperties(); |
| getJmsHeaders().setString(propertyName, value); |
| } |
| |
| public void setObjectProperty(String propertyName, Object object) throws JMSException |
| { |
| checkWritableProperties(); |
| getJmsHeaders().setObject(propertyName, object); |
| } |
| |
| public void removeProperty(String propertyName) throws JMSException |
| { |
| getJmsHeaders().remove(propertyName); |
| } |
| |
| |
| private JMSHeaderAdapter getJmsHeaders() |
| { |
| return _headerAdapter; |
| } |
| |
| protected void checkWritableProperties() throws MessageNotWriteableException |
| { |
| if (_readableProperties) |
| { |
| throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable"); |
| } |
| } |
| |
| |
| public int getJMSPriority() throws JMSException |
| { |
| return getContentHeaderProperties().getPriority(); |
| } |
| |
| public void setJMSPriority(int i) throws JMSException |
| { |
| getContentHeaderProperties().setPriority((byte) i); |
| } |
| |
| public void clearProperties() throws JMSException |
| { |
| getJmsHeaders().clear(); |
| |
| _readableProperties = false; |
| } |
| |
| @Override |
| Object getProperty(final String name) |
| { |
| return getContentHeaderProperties().getHeaders().get(name); |
| } |
| |
| @Override |
| boolean hasProperty(final String name) |
| { |
| return getContentHeaderProperties().getHeaders().containsKey(name); |
| } |
| |
| private static class DefaultRouterDestination extends AMQDestination implements Queue |
| { |
| private static final long serialVersionUID = -5042408431861384536L; |
| |
| public DefaultRouterDestination(final String replyToEncoding) |
| { |
| super("", |
| "direct", |
| replyToEncoding, |
| replyToEncoding); |
| } |
| |
| @Override |
| public boolean isNameRequired() |
| { |
| return false; |
| } |
| |
| @Override |
| public boolean neverDeclare() |
| { |
| return true; |
| } |
| } |
| |
| private static class NonBURLReplyToDestination extends AMQDestination implements Queue |
| { |
| private static final long serialVersionUID = 122897705932489259L; |
| |
| public NonBURLReplyToDestination(final String exchange, final String routingKey) |
| { |
| super(exchange, |
| null, |
| routingKey, |
| routingKey); |
| } |
| |
| @Override |
| public boolean isNameRequired() |
| { |
| return false; |
| } |
| |
| @Override |
| public boolean neverDeclare() |
| { |
| return true; |
| } |
| } |
| } |