blob: 34b47137e5df9ce68700ce01109cb5bc66412c0b [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
using System;
using System.Collections;
using System.Text;
using log4net;
using Apache.Qpid.Framing;
using Apache.Qpid.Messaging;
using Apache.Qpid.Buffer;
namespace Apache.Qpid.Client.Message
public abstract class AbstractQmsMessage : AMQMessage, IMessage
private static ILog log = LogManager.GetLogger(typeof(AbstractQmsMessage));
protected bool _redelivered;
protected ByteBuffer _data;
protected bool _readableMessage = false;
private QpidHeaders _headers;
protected AbstractQmsMessage(ByteBuffer data)
: base(new BasicContentHeaderProperties())
protected AbstractQmsMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
: this(contentHeader, deliveryTag)
protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) : base(contentHeader, deliveryTag)
private void Init(ByteBuffer data)
_data = data;
if ( _data != null )
_readableMessage = (data != null);
if ( ContentHeaderProperties.Headers == null )
ContentHeaderProperties.Headers = new FieldTable();
_headers = new QpidHeaders(ContentHeaderProperties.Headers);
// Properties
/// <summary>
/// The application message identifier
/// </summary>
public string MessageId
if (ContentHeaderProperties.MessageId == null)
ContentHeaderProperties.MessageId = "ID:" + DeliveryTag;
return ContentHeaderProperties.MessageId;
set { ContentHeaderProperties.MessageId = value; }
/// <summary>
/// The message timestamp
/// </summary>
public long Timestamp
// TODO: look at ulong/long choice
return (long) ContentHeaderProperties.Timestamp;
ContentHeaderProperties.Timestamp = (ulong) value;
/// <summary>
/// The <see cref="CorrelationId"/> as a byte array.
/// </summary>
public byte[] CorrelationIdAsBytes
get { return Encoding.Default.GetBytes(ContentHeaderProperties.CorrelationId); }
set { ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value); }
/// <summary>
/// The application correlation identifier
/// </summary>
public string CorrelationId
get { return ContentHeaderProperties.CorrelationId; }
set { ContentHeaderProperties.CorrelationId = value; }
struct Dest
public string ExchangeName;
public string RoutingKey;
public Dest(string exchangeName, string routingKey)
ExchangeName = exchangeName;
RoutingKey = routingKey;
/// <summary>
/// Exchange name of the reply-to address
/// </summary>
public string ReplyToExchangeName
return ReadReplyToHeader().ExchangeName;
BindingURL dest = ReadReplyToHeader();
dest.ExchangeName = value;
/// <summary>
/// Routing key of the reply-to address
/// </summary>
public string ReplyToRoutingKey
return ReadReplyToHeader().RoutingKey;
BindingURL dest = ReadReplyToHeader();
dest.RoutingKey = value;
/// <summary>
/// Non-persistent (1) or persistent (2)
/// </summary>
public DeliveryMode DeliveryMode
byte b = ContentHeaderProperties.DeliveryMode;
switch (b)
case 1:
return DeliveryMode.NonPersistent;
case 2:
return DeliveryMode.Persistent;
throw new QpidException("Illegal value for delivery mode in content header properties");
ContentHeaderProperties.DeliveryMode = (byte)(value==DeliveryMode.NonPersistent?1:2);
/// <summary>
/// True, if this is a redelivered message
/// </summary>
public bool Redelivered
get { return _redelivered; }
set { _redelivered = value; }
/// <summary>
/// The message type name
/// </summary>
public string Type
get { return ContentHeaderProperties.Type; }
set { ContentHeaderProperties.Type = value; }
/// <summary>
/// Message expiration specification
/// </summary>
public long Expiration
get { return ContentHeaderProperties.Expiration; }
set { ContentHeaderProperties.Expiration = value; }
/// <summary>
/// The message priority, 0 to 9
/// </summary>
public byte Priority
get { return ContentHeaderProperties.Priority; }
set { ContentHeaderProperties.Priority = (byte) value; }
/// <summary>
/// The MIME Content Type
/// </summary>
public string ContentType
get { return ContentHeaderProperties.ContentType; }
set { ContentHeaderProperties.ContentType = value; }
/// <summary>
/// The MIME Content Encoding
/// </summary>
public string ContentEncoding
get { return ContentHeaderProperties.Encoding; }
set { ContentHeaderProperties.Encoding = value; }
/// <summary>
/// Headers of this message
/// </summary>
public IHeaders Headers
get { return _headers; }
/// <summary>
/// The creating user id
/// </summary>
public string UserId
get { return ContentHeaderProperties.UserId; }
set { ContentHeaderProperties.UserId = value; }
/// <summary>
/// The creating application id
/// </summary>
public string AppId
get { return ContentHeaderProperties.AppId; }
set { ContentHeaderProperties.AppId = value; }
/// <summary>
/// Intra-cluster routing identifier
/// </summary>
public string ClusterId
get { return ContentHeaderProperties.ClusterId; }
set { ContentHeaderProperties.ClusterId = value; }
/// <summary>
/// Return the raw byte array that is used to populate the frame when sending
/// the message.
/// </summary>
/// <value>a byte array of message data</value>
public ByteBuffer Data
if (_data != null)
if (!_readableMessage)
// Make sure we rewind the data just in case any method has moved the
// position beyond the start.
return _data;
_data = value;
public void Acknowledge()
// the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
// is not specified. In our case, we only set the session field where client acknowledge mode is specified.
if (_channel != null)
// we set multiple to true here since acknowledgement implies acknowledge of all count messages
// received on the session
_channel.AcknowledgeMessage((ulong)DeliveryTag, true);
public abstract void ClearBodyImpl();
public void ClearBody()
_readableMessage = false;
/// <summary>
/// Get a String representation of the body of the message. Used in the
/// toString() method which outputs this before message properties.
/// </summary>
/// <exception cref="QpidException"></exception>
public abstract string ToBodyString();
public override string ToString()
StringBuilder buf = new StringBuilder("Body:\n");
buf.Append("\nQmsTimestamp: ").Append(Timestamp);
buf.Append("\nQmsExpiration: ").Append(Expiration);
buf.Append("\nQmsPriority: ").Append(Priority);
buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode);
buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName);
buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey);
buf.Append("\nAMQ message number: ").Append(DeliveryTag);
if (ContentHeaderProperties.Headers == null)
return buf.ToString();
catch (Exception e)
return e.ToString();
public FieldTable PopulateHeadersFromMessageProperties()
if (ContentHeaderProperties.Headers == null)
return null;
// We need to convert every property into a String representation
// Note that type information is preserved in the property name
FieldTable table = new FieldTable();
foreach (DictionaryEntry entry in ContentHeaderProperties.Headers)
string propertyName = (string) entry.Key;
if (propertyName == null)
table[propertyName] = entry.Value.ToString();
return table;
public BasicContentHeaderProperties ContentHeaderProperties
return (BasicContentHeaderProperties) _contentHeaderProperties;
protected virtual void Reset()
_readableMessage = true;
public bool IsReadable
get { return _readableMessage; }
public bool isWritable
get { return !_readableMessage; }
protected void CheckReadable()
if ( !_readableMessage )
throw new MessageNotReadableException("You need to call reset() to make the message readable");
/// <summary>
/// Decodes the replyto field if one is set.
/// Splits a replyto field containing an exchange name followed by a ':', followed by a routing key into the exchange name and
/// routing key seperately. The exchange name may be empty in which case the empty string is returned. If the exchange name is
/// empty the replyto field is expected to being with ':'.
/// Anyhting other than a two part replyto field sperated with a ':' will result in an exception.
/// </summary>
/// <returns>A destination initialized to the replyto location if a replyto field was set, or an empty destination otherwise.</returns>
private BindingURL ReadReplyToHeader()
string replyToEncoding = ContentHeaderProperties.ReplyTo;
//log.Debug("replyToEncoding = " + replyToEncoding);
BindingURL bindingUrl = new BindingURL(replyToEncoding);
//log.Debug("bindingUrl = " + bindingUrl.ToString());
return bindingUrl;
//log.Info("replyToEncoding = " + replyToEncoding);
// if ( replyToEncoding == null )
// {
// return new Dest();
// } else
// {
// // Split the replyto field on a ':'
// string[] split = replyToEncoding.Split(':');
// // Ensure that the replyto field argument only consisted of two parts.
// if ( split.Length != 2 )
// {
// throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding);
// }
// // Extract the exchange name and routing key from the split replyto field.
// string exchangeName = split[0];
// string[] split2 = split[1].Split('/');
// string routingKey = split2[3];
// return new Dest(exchangeName, routingKey);
// }
private void WriteReplyToHeader(BindingURL dest)
string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey);
ContentHeaderProperties.ReplyTo = encodedDestination;
public class BindingURL
public readonly static string OPTION_EXCLUSIVE = "exclusive";
public readonly static string OPTION_AUTODELETE = "autodelete";
public readonly static string OPTION_DURABLE = "durable";
public readonly static string OPTION_CLIENTID = "clientid";
public readonly static string OPTION_SUBSCRIPTION = "subscription";
public readonly static string OPTION_ROUTING_KEY = "routingkey";
/// <summary> Holds the undecoded URL </summary>
string url;
/// <summary> Holds the decoded options. </summary>
IDictionary options = new Hashtable();
/// <summary> Holds the decoded exchange class. </summary>
string exchangeClass;
/// <summary> Holds the decoded exchange name. </summary>
string exchangeName;
/// <summary> Holds the destination name. </summary>
string destination;
/// <summary> Holds the decoded queue name. </summary>
string queueName;
/// <summary>
/// The binding URL has the format:
/// <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
/// </summary>
public BindingURL(string url)
this.url = url;
public string Url { get { return url; } }
public string ExchangeClass
get { return exchangeClass; }
set { exchangeClass = value; }
public string ExchangeName
get { return exchangeName; }
set { exchangeName = value; }
public string QueueName
get { return queueName; }
set { queueName = value; }
public string DestinationName
get { return destination; }
set { destination = value; }
public string RoutingKey {
get { return (string)options[OPTION_ROUTING_KEY]; }
set { options[OPTION_ROUTING_KEY] = value; }
public bool ContainsOption(string key) { return options.Contains(key); }
public string ToString()
return "BindingURL: [ ExchangeClass = " + ExchangeClass + ", ExchangeName = " + ExchangeName + ", QueueName = " + QueueName +
", DestinationName = " + DestinationName + ", RoutingKey = " + RoutingKey + " ] ";
private void Parse()
Uri binding = new Uri(url);
// Extract the URI scheme, this contains the exchange class. It is defaulted to the direct exchange if not specified.
string exchangeClass = binding.Scheme;
if (exchangeClass == null)
url = ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS + "://" + ExchangeNameDefaults.DIRECT + "//" + url;
this.exchangeClass = exchangeClass;
// Extract the host name, this contains the exchange name. It is defaulted to the default direct exchange if not specified.
string exchangeName = binding.Host;
if (exchangeName == null)
if (exchangeClass.Equals(ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS))
this.exchangeName = "";
this.exchangeName = exchangeName;
// Extract the destination and queue name.
if ((binding.AbsolutePath == null) || binding.AbsolutePath.Equals(""))
throw new UriFormatException("Destination or Queue required");
int slashOffset = binding.AbsolutePath.IndexOf("/", 1);
if (slashOffset == -1)
throw new UriFormatException("Destination required");
String path = binding.AbsolutePath;
this.destination = path.Substring(1, slashOffset - 1);
this.queueName = path.Substring(slashOffset + 1);
ParseOptions(options, binding.Query);
// If the routing key is not set as an option, set it to the destination name.
if (!ContainsOption(OPTION_ROUTING_KEY))
options[OPTION_ROUTING_KEY] = destination;
/// <summary>
/// options looks like this
/// brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value'
/// </summary>
public static void ParseOptions(IDictionary optionMap, string options)
// Check that there really are some options to parse.
if ((options == null) || (options.IndexOf('=') == -1))
int optionIndex = options.IndexOf('=');
string option = options.Substring(0, optionIndex);
int length = options.Length;
int nestedQuotes = 0;
// Holds the index of the final "'".
int valueIndex = optionIndex;
// Loop over all the options.Dest
while ((nestedQuotes > 0) || (valueIndex < length))
if (valueIndex >= length)
if (options[valueIndex] == '\'')
if ((valueIndex + 1) < options.Length)
if ((options[valueIndex + 1] == '&') ||
(options[valueIndex + 1] == ',') ||
(options[valueIndex + 1] == ';') ||
(options[valueIndex + 1] == '\''))
if (nestedQuotes == 0)
// We've found the value of an option
// We are at the end of the string
// Check to see if we are corectly closing quotes
if (options[valueIndex] == '\'')