blob: 1b6a2cdc53dd4aeb3fbd59b447cad7b301f5b6af [file] [log] [blame]
package org.apache.qpid.messaging.util;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.qpid.messaging.Message;
import org.apache.qpid.messaging.MessageEncodingException;
import org.apache.qpid.messaging.MessageFactory;
import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.MessageDeliveryMode;
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.transport.codec.BBEncoder;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
/**
* A generic message factory that is based on the AMQP 0-10 encoding.
*
*/
public class MessageFactory_AMQP_0_10 extends AbstractMessageFactory
{
protected Class<? extends MessageFactory> getFactoryClass()
{
return MessageFactory_AMQP_0_10.class;
}
protected Map<String,Object> decodeAsMap(ByteBuffer buf) throws MessageEncodingException
{
try
{
BBDecoder decorder = new BBDecoder();
decorder.init(buf);
return decorder.readMap();
}
catch (Exception e)
{
throw new MessageEncodingException("Error decoding content as Map",e);
}
}
protected ByteBuffer encodeMap(Map<String,Object> map) throws MessageEncodingException
{
try
{
//need to investigate the capacity here.
BBEncoder encoder = new BBEncoder(1024);
encoder.writeMap(map);
return (ByteBuffer)encoder.buffer().flip();
}
catch (Exception e)
{
throw new MessageEncodingException("Cannot encode Map" ,e);
}
}
protected List<Object> decodeAsList(ByteBuffer buf) throws MessageEncodingException
{
try
{
BBDecoder decorder = new BBDecoder();
decorder.init(buf);
return decorder.readList();
}
catch (Exception e)
{
throw new MessageEncodingException("Error decoding content as List",e);
}
}
protected ByteBuffer encodeList(List<Object> list) throws MessageEncodingException
{
try
{
//need to investigate the capacity here.
BBEncoder encoder = new BBEncoder(1024);
encoder.writeList(list);
return (ByteBuffer)encoder.buffer().flip();
}
catch (Exception e)
{
throw new MessageEncodingException("Cannot encode List" ,e);
}
}
@Override
protected Message createFactorySpecificMessageDelegate()
{
return new Mesage_AMQP_0_10_Delegate();
}
class Mesage_AMQP_0_10_Delegate implements Message
{
private MessageProperties _messageProps;
private DeliveryProperties _deliveryProps;
private UUIDGen _ssnNameGenerator = UUIDs.newGenerator();
// creating a new message for sending
protected Mesage_AMQP_0_10_Delegate()
{
this(new MessageProperties(),new DeliveryProperties());
}
// Message received with data.
protected Mesage_AMQP_0_10_Delegate(MessageProperties messageProps,
DeliveryProperties deliveryProps)
{
_messageProps = messageProps;
_deliveryProps = deliveryProps;
}
@Override
public String getMessageId() throws MessagingException
{
return _messageProps.getMessageId().toString();
}
@Override
public void setMessageId(String messageId) throws MessagingException
{
// Temp hack for the time being
_messageProps.setMessageId(_ssnNameGenerator.generate());
}
@Override
public String getSubject() throws MessagingException
{
Map<String,Object> props = _messageProps.getApplicationHeaders();
return props == null ? null : (String)props.get(QPID_SUBJECT);
}
@Override
public void setSubject(String subject) throws MessagingException
{
Map<String,Object> props = _messageProps.getApplicationHeaders();
if (props == null)
{
props = new HashMap<String,Object>();
_messageProps.setApplicationHeaders(props);
}
props.put(QPID_SUBJECT, subject);
}
@Override
public String getContentType() throws MessagingException
{
return _messageProps.getContentType();
}
@Override
public void setContentType(String contentType)
throws MessagingException
{
_messageProps.setContentType(contentType);
}
@Override
public String getCorrelationId() throws MessagingException
{
return new String(_messageProps.getCorrelationId());
}
@Override
public void setCorrelationId(String correlationId)
throws MessagingException
{
_messageProps.setCorrelationId(correlationId.getBytes());
}
@Override
public String getReplyTo() throws MessagingException
{
return addressFrom0_10_ReplyTo(_deliveryProps.getExchange(),
_deliveryProps.getRoutingKey());
}
@Override
public void setReplyTo(String replyTo) throws MessagingException
{
// TODO
}
@Override
public String getUserId() throws MessagingException
{
return new String(_messageProps.getUserId());
}
@Override
public void setUserId(String userId) throws MessagingException
{
_messageProps.setUserId(userId.getBytes());
}
@Override
public boolean isDurable() throws MessagingException
{
return _deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT;
}
@Override
public void setDurable(boolean durable) throws MessagingException
{
_deliveryProps.setDeliveryMode(durable ?
MessageDeliveryMode.PERSISTENT: MessageDeliveryMode.NON_PERSISTENT);
}
@Override
public boolean isRedelivered() throws MessagingException
{
return _deliveryProps.getRedelivered();
}
@Override
public void setRedelivered(boolean redelivered)
throws MessagingException
{
_deliveryProps.setRedelivered(redelivered);
}
@Override
public int getPriority() throws MessagingException
{
return _deliveryProps.getPriority().getValue();
}
@Override
public void setPriority(int priority) throws MessagingException
{
_deliveryProps.setPriority(MessageDeliveryPriority.get((short)priority));
}
@Override
public long getTtl() throws MessagingException
{
return _deliveryProps.getTtl();
}
@Override
public void setTtl(long ttl) throws MessagingException
{
_deliveryProps.setTtl(ttl);
}
@Override
public long getTimestamp() throws MessagingException
{
return _deliveryProps.getTimestamp();
}
@Override
public void setTimestamp(long timestamp) throws MessagingException
{
_deliveryProps.setTimestamp(timestamp);
}
@Override
public Map<String, Object> getProperties() throws MessagingException
{
return _messageProps.getApplicationHeaders();
}
@Override
public void setProperty(String key, Object value)
throws MessagingException
{
Map<String,Object> props = _messageProps.getApplicationHeaders();
if (props == null)
{
props = new HashMap<String,Object>();
_messageProps.setApplicationHeaders(props);
}
props.put(key, value);
}
private String addressFrom0_10_ReplyTo(String exchange, String routingKey)
{
if ("".equals(exchange)) // type Queue
{
return routingKey;
}
else
{
return exchange + "/" + routingKey;
}
}
// noop, this is for the headers only.
@Override
public ByteBuffer getContent() throws MessagingException
{
throw new MessagingException("Empty!");
}
}
}