blob: 9ab03412fe121ec78d0544007b7feb928322b6e4 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client.message;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Map;
import java.util.UUID;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageNotWriteableException;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.collections.ReferenceMap;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderProperties;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
/**
 * Message delegate that keeps the JMS header fields and the user properties of a message in a
 * {@link BasicContentHeaderProperties} instance.
 *
 * Header fields (message id, correlation id, reply-to, ...) are read from and written to the
 * content header properties directly; user properties go through a {@link JMSHeaderAdapter}
 * wrapped around the headers table of those properties.
 *
 * When the system property named by {@link AMQSession#STRICT_AMQP} evaluates to true, most typed
 * property accessors throw {@link UnsupportedOperationException}.
 */
public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
    /**
     * Cache of reply-to destinations. Keys are always the String form of the encoded destination
     * name so that entries stored by {@link #setJMSReplyTo} can be found by {@link #getJMSReplyTo}.
     */
    private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());

    public static final String JMS_TYPE = "x-jms-type";

    /** Read once at class initialisation from the system properties. */
    private static final boolean STRICT_AMQP_COMPLIANCE =
        Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));

    /** Message of the exception raised in strict mode; text deliberately kept exactly as it was. */
    private static final String STRICT_AMQP_MESSAGE = "JMS Proprerties not supported in AMQP";

    /** True while the user properties are read-only; reset by {@link #clearProperties()}. */
    private boolean _readableProperties = false;

    private Destination _destination;

    private final JMSHeaderAdapter _headerAdapter;

    private final BasicContentHeaderProperties _contentHeaderProperties;

    /**
     * Base constructor that sets the items every instance needs.
     *
     * @param properties  content header properties backing this delegate; when null the header
     *                    adapter is built over the headers of a fresh, detached properties object
     * @param deliveryTag delivery tag handed to the superclass
     */
    private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag)
    {
        super(deliveryTag);
        _contentHeaderProperties = properties;
        _readableProperties = (properties != null);

        BasicContentHeaderProperties headerSource =
            _readableProperties ? properties : new BasicContentHeaderProperties();
        _headerAdapter = new JMSHeaderAdapter(headerSource.getHeaders());
    }

    /**
     * Used for the creation of new messages: starts from empty content header properties, a
     * delivery tag of -1 and writable user properties.
     */
    protected AMQMessageDelegate_0_8()
    {
        this(new BasicContentHeaderProperties(), -1);
        // The chained constructor flags the properties as read-only because a properties object
        // was supplied; a newly created message must be writable.
        _readableProperties = false;
    }

    /**
     * Used when generating a received message object. The JMS destination is derived from the
     * destination-type header when that header holds a known type, otherwise from
     * {@code generateDestination(exchange, routingKey)}.
     *
     * @param deliveryTag   delivery tag of the received message
     * @param contentHeader content header properties of the received message
     * @param exchange      exchange name used to build the destination
     * @param routingKey    routing key used to build the destination
     */
    protected AMQMessageDelegate_0_8(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
                                     AMQShortString routingKey)
    {
        this(contentHeader, deliveryTag);

        Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());

        AMQDestination dest = null;

        // If we have a type set then attempt to use that.
        if (type != null)
        {
            switch (type.intValue())
            {
                case AMQDestination.QUEUE_TYPE:
                    dest = new AMQQueue(exchange, routingKey, routingKey);
                    break;

                case AMQDestination.TOPIC_TYPE:
                    dest = new AMQTopic(exchange, routingKey, null);
                    break;

                default:
                    // Unknown type: leave dest unset so that generateDestination is used below.
                    break;
            }
        }

        if (dest == null)
        {
            dest = generateDestination(exchange, routingKey);
        }

        setJMSDestination(dest);
    }

    /** @return the message id held in the content header properties, as a String */
    public String getJMSMessageID() throws JMSException
    {
        return getContentHeaderProperties().getMessageIdAsString();
    }

    /**
     * Stores the message id; a null argument is ignored and leaves the current id untouched.
     *
     * @param messageId the new message id, or null for no change
     */
    public void setJMSMessageID(String messageId) throws JMSException
    {
        if (messageId != null)
        {
            getContentHeaderProperties().setMessageId(messageId);
        }
    }

    /**
     * Stores the message id as {@code "ID:"} followed by the UUID's String form; a null argument
     * is ignored and leaves the current id untouched.
     *
     * @param messageId the UUID to derive the message id from, or null for no change
     */
    public void setJMSMessageID(UUID messageId) throws JMSException
    {
        if (messageId != null)
        {
            getContentHeaderProperties().setMessageId("ID:" + messageId);
        }
    }

    /** @return the timestamp held in the content header properties */
    public long getJMSTimestamp() throws JMSException
    {
        return getContentHeaderProperties().getTimestamp();
    }

    /** @param timestamp the timestamp to store in the content header properties */
    public void setJMSTimestamp(long timestamp) throws JMSException
    {
        getContentHeaderProperties().setTimestamp(timestamp);
    }

    /**
     * Returns the correlation id as bytes, using the platform default charset (the same charset
     * {@link #setJMSCorrelationIDAsBytes(byte[])} decodes with).
     *
     * @return the encoded correlation id, or null when no correlation id is set
     */
    public byte[] getJMSCorrelationIDAsBytes() throws JMSException
    {
        String correlationId = getContentHeaderProperties().getCorrelationIdAsString();

        // An unset correlation id used to trigger a NullPointerException here.
        return (correlationId == null) ? null : correlationId.getBytes();
    }

    /**
     * Stores the correlation id given as bytes, decoded with the platform default charset.
     *
     * @param bytes the correlation id bytes; null is passed on as a null correlation id, exactly
     *              as {@link #setJMSCorrelationID(String)} does for a null String
     */
    public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException
    {
        String correlationId = (bytes == null) ? null : new String(bytes);
        getContentHeaderProperties().setCorrelationId(correlationId);
    }

    /** @param correlationId the correlation id to store in the content header properties */
    public void setJMSCorrelationID(String correlationId) throws JMSException
    {
        getContentHeaderProperties().setCorrelationId(correlationId);
    }

    /** @return the correlation id held in the content header properties, as a String */
    public String getJMSCorrelationID() throws JMSException
    {
        return getContentHeaderProperties().getCorrelationIdAsString();
    }

    /**
     * Resolves the reply-to header into a destination. Destinations are cached by their encoded
     * String form; on a cache miss the encoding is parsed as a binding URL and the resulting
     * destination is added to the cache.
     *
     * @return the reply-to destination, or null when no reply-to is set
     * @throws JMSException if the reply-to value is not a valid binding URL
     */
    public Destination getJMSReplyTo() throws JMSException
    {
        String replyToEncoding = getContentHeaderProperties().getReplyToAsString();
        if (replyToEncoding == null)
        {
            return null;
        }

        Destination dest = (Destination) _destinationCache.get(replyToEncoding);
        if (dest != null)
        {
            return dest;
        }

        try
        {
            BindingURL binding = new AMQBindingURL(replyToEncoding);
            dest = AMQDestination.createDestination(binding);
        }
        catch (URISyntaxException e)
        {
            throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
        }

        _destinationCache.put(replyToEncoding, dest);

        return dest;
    }

    /**
     * Stores the reply-to destination in the content header properties and caches it.
     *
     * @param destination the reply-to destination; null clears the reply-to header
     * @throws IllegalArgumentException if the destination is not an {@link AMQDestination}
     */
    public void setJMSReplyTo(Destination destination) throws JMSException
    {
        if (destination == null)
        {
            getContentHeaderProperties().setReplyTo((String) null);

            return; // We're done here
        }

        if (!(destination instanceof AMQDestination))
        {
            throw new IllegalArgumentException(
                "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
        }

        final AMQDestination amqd = (AMQDestination) destination;
        final AMQShortString encodedDestination = amqd.getEncodedName();

        // getJMSReplyTo looks the cache up with a String key, so the entry must be stored under
        // the String form too; an AMQShortString key would never be matched by that lookup.
        _destinationCache.put(encodedDestination.toString(), destination);
        getContentHeaderProperties().setReplyTo(encodedDestination);
    }

    /** @return the destination last passed to {@link #setJMSDestination(Destination)} */
    public Destination getJMSDestination() throws JMSException
    {
        return _destination;
    }

    /** @param destination the destination to remember for this message */
    public void setJMSDestination(Destination destination)
    {
        _destination = destination;
    }

    /** @param contentType the content type to store in the content header properties */
    public void setContentType(String contentType)
    {
        getContentHeaderProperties().setContentType(contentType);
    }

    /** @return the content type held in the content header properties, as a String */
    public String getContentType()
    {
        return getContentHeaderProperties().getContentTypeAsString();
    }

    /** @param encoding the encoding to store in the content header properties */
    public void setEncoding(String encoding)
    {
        getContentHeaderProperties().setEncoding(encoding);
    }

    /** @return the encoding held in the content header properties, as a String */
    public String getEncoding()
    {
        return getContentHeaderProperties().getEncodingAsString();
    }

    /** @return the raw reply-to value held in the content header properties, as a String */
    public String getReplyToString()
    {
        return getContentHeaderProperties().getReplyToAsString();
    }

    /** @return the delivery mode held in the content header properties */
    public int getJMSDeliveryMode() throws JMSException
    {
        return getContentHeaderProperties().getDeliveryMode();
    }

    /** @param i the delivery mode; it is narrowed to a byte before being stored */
    public void setJMSDeliveryMode(int i) throws JMSException
    {
        getContentHeaderProperties().setDeliveryMode((byte) i);
    }

    /** @return the content header properties backing this delegate */
    public BasicContentHeaderProperties getContentHeaderProperties()
    {
        return _contentHeaderProperties;
    }

    /** @return the type held in the content header properties, as a String */
    public String getJMSType() throws JMSException
    {
        return getContentHeaderProperties().getTypeAsString();
    }

    /** @param string the type to store in the content header properties */
    public void setJMSType(String string) throws JMSException
    {
        getContentHeaderProperties().setType(string);
    }

    /** @return the expiration held in the content header properties */
    public long getJMSExpiration() throws JMSException
    {
        return getContentHeaderProperties().getExpiration();
    }

    /** @param l the expiration to store in the content header properties */
    public void setJMSExpiration(long l) throws JMSException
    {
        getContentHeaderProperties().setExpiration(l);
    }

    /**
     * Reports whether the header adapter holds the named property. This method does not consult
     * the strict-AMQP flag.
     */
    public boolean propertyExists(String propertyName) throws JMSException
    {
        return getJmsHeaders().propertyExists(propertyName);
    }

    /**
     * @return the named property as read by the header adapter
     * @throws UnsupportedOperationException in strict-AMQP mode
     */
    public boolean getBooleanProperty(String propertyName) throws JMSException
    {
        checkStrictAmqp();

        return getJmsHeaders().getBoolean(propertyName);
    }

    /**
     * @return the named property as read by the header adapter
     * @throws UnsupportedOperationException in strict-AMQP mode
     */
    public byte getByteProperty(String propertyName) throws JMSException
    {
        checkStrictAmqp();

        return getJmsHeaders().getByte(propertyName);
    }

    /**
     * @return the named property as read by the header adapter
     * @throws UnsupportedOperationException in strict-AMQP mode
     */
    public short getShortProperty(String propertyName) throws JMSException
    {
        checkStrictAmqp();

        return getJmsHeaders().getShort(propertyName);
    }

    /**
     * @return the named property as read by the header adapter
     * @throws UnsupportedOperationException in strict-AMQP mode
     */
    public int getIntProperty(String propertyName) throws JMSException
    {
        checkStrictAmqp();

        return getJmsHeaders().getInteger(propertyName);
    }

    /**
     * @return the named property as read by the header adapter
     * @throws UnsupportedOperationException in strict-AMQP mode
     */
    public long getLongProperty(String propertyName) throws JMSException
    {
        checkStrictAmqp();

        return getJmsHeaders().getLong(propertyName);
    }

    /**
     * @return the named property as read by the header adapter
     * @throws UnsupportedOperationException in strict-AMQP mode
     */
    public float getFloatProperty(String propertyName) throws JMSException
    {
        checkStrictAmqp();

        return getJmsHeaders().getFloat(propertyName);
    }

    /**
     * @return the named property as read by the header adapter
     * @throws UnsupportedOperationException in strict-AMQP mode
     */
    public double getDoubleProperty(String propertyName) throws JMSException
    {
        checkStrictAmqp();

        return getJmsHeaders().getDouble(propertyName);
    }

    /**
     * Returns a String property. The JMSXUserID property is answered from the user id of the
     * content header properties and is available even in strict-AMQP mode; every other name is
     * read through the header adapter.
     *
     * @throws UnsupportedOperationException in strict-AMQP mode, for any name but JMSXUserID
     */
    public String getStringProperty(String propertyName) throws JMSException
    {
        // NOTE: if the JMSX Property is a non AMQP property then we must check the strict flag
        // and throw as below.
        // The constant is the receiver of equals() so that a null name is not dereferenced here
        // and instead reaches the header adapter like any other name.
        if (CustomJMSXProperty.JMSXUserID.toString().equals(propertyName))
        {
            return _contentHeaderProperties.getUserIdAsString();
        }

        checkStrictAmqp();

        return getJmsHeaders().getString(propertyName);
    }

    /**
     * Returns the named property as an Object. This method does not consult the strict-AMQP flag.
     */
    public Object getObjectProperty(String propertyName) throws JMSException
    {
        return getJmsHeaders().getObject(propertyName);
    }

    /**
     * Returns the property names known to the header adapter. This method does not consult the
     * strict-AMQP flag.
     */
    public Enumeration getPropertyNames() throws JMSException
    {
        return getJmsHeaders().getPropertyNames();
    }

    /**
     * @throws UnsupportedOperationException in strict-AMQP mode
     * @throws MessageNotWriteableException  if the properties are read-only
     */
    public void setBooleanProperty(String propertyName, boolean b) throws JMSException
    {
        checkStrictAmqp();
        checkWritableProperties();
        getJmsHeaders().setBoolean(propertyName, b);
    }

    /**
     * @throws UnsupportedOperationException in strict-AMQP mode
     * @throws MessageNotWriteableException  if the properties are read-only
     */
    public void setByteProperty(String propertyName, byte b) throws JMSException
    {
        checkStrictAmqp();
        checkWritableProperties();
        getJmsHeaders().setByte(propertyName, Byte.valueOf(b));
    }

    /**
     * @throws UnsupportedOperationException in strict-AMQP mode
     * @throws MessageNotWriteableException  if the properties are read-only
     */
    public void setShortProperty(String propertyName, short i) throws JMSException
    {
        checkStrictAmqp();
        checkWritableProperties();
        getJmsHeaders().setShort(propertyName, Short.valueOf(i));
    }

    /**
     * Sets an int property.
     *
     * NOTE(review): unlike the other numeric setters this one does not consult the strict-AMQP
     * flag; that is kept as found - confirm whether the omission is intentional.
     *
     * @throws MessageNotWriteableException if the properties are read-only
     */
    public void setIntProperty(String propertyName, int i) throws JMSException
    {
        checkWritableProperties();
        getJmsHeaders().setInteger(propertyName, Integer.valueOf(i));
    }

    /**
     * @throws UnsupportedOperationException in strict-AMQP mode
     * @throws MessageNotWriteableException  if the properties are read-only
     */
    public void setLongProperty(String propertyName, long l) throws JMSException
    {
        checkStrictAmqp();
        checkWritableProperties();
        getJmsHeaders().setLong(propertyName, Long.valueOf(l));
    }

    /**
     * @throws UnsupportedOperationException in strict-AMQP mode
     * @throws MessageNotWriteableException  if the properties are read-only
     */
    public void setFloatProperty(String propertyName, float f) throws JMSException
    {
        checkStrictAmqp();
        checkWritableProperties();
        getJmsHeaders().setFloat(propertyName, Float.valueOf(f));
    }

    /**
     * @throws UnsupportedOperationException in strict-AMQP mode
     * @throws MessageNotWriteableException  if the properties are read-only
     */
    public void setDoubleProperty(String propertyName, double v) throws JMSException
    {
        checkStrictAmqp();
        checkWritableProperties();
        getJmsHeaders().setDouble(propertyName, Double.valueOf(v));
    }

    /**
     * Sets a String property. This method does not consult the strict-AMQP flag
     * (NOTE(review): kept as found - confirm whether intentional).
     *
     * @throws MessageNotWriteableException if the properties are read-only
     */
    public void setStringProperty(String propertyName, String value) throws JMSException
    {
        checkWritableProperties();
        getJmsHeaders().setString(propertyName, value);
    }

    /**
     * Sets an Object property. This method does not consult the strict-AMQP flag
     * (NOTE(review): kept as found - confirm whether intentional).
     *
     * @throws MessageNotWriteableException if the properties are read-only
     */
    public void setObjectProperty(String propertyName, Object object) throws JMSException
    {
        checkWritableProperties();
        getJmsHeaders().setObject(propertyName, object);
    }

    /**
     * Removes the named property through the header adapter. Neither the strict-AMQP flag nor
     * the read-only state is checked here.
     */
    public void removeProperty(String propertyName) throws JMSException
    {
        getJmsHeaders().remove(propertyName);
    }

    /** @return the adapter through which user properties are read and written */
    private JMSHeaderAdapter getJmsHeaders()
    {
        return _headerAdapter;
    }

    /**
     * Rejects property access when strict AMQP compliance was requested.
     *
     * @throws UnsupportedOperationException if STRICT_AMQP_COMPLIANCE is true
     */
    private static void checkStrictAmqp()
    {
        if (STRICT_AMQP_COMPLIANCE)
        {
            throw new UnsupportedOperationException(STRICT_AMQP_MESSAGE);
        }
    }

    /**
     * Verifies that the user properties may currently be modified.
     *
     * @throws MessageNotWriteableException if the properties are read-only
     */
    protected void checkWritableProperties() throws MessageNotWriteableException
    {
        if (_readableProperties)
        {
            throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable");
        }
    }

    /** @return the priority held in the content header properties */
    public int getJMSPriority() throws JMSException
    {
        return getContentHeaderProperties().getPriority();
    }

    /** @param i the priority; it is narrowed to a byte before being stored */
    public void setJMSPriority(int i) throws JMSException
    {
        getContentHeaderProperties().setPriority((byte) i);
    }

    /** Clears all properties held by the header adapter and makes the properties writable. */
    public void clearProperties() throws JMSException
    {
        getJmsHeaders().clear();

        _readableProperties = false;
    }
}