blob: 34b47137e5df9ce68700ce01109cb5bc66412c0b [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.Collections;
using System.Text;
using log4net;
using Apache.Qpid.Framing;
using Apache.Qpid.Messaging;
using Apache.Qpid.Buffer;
namespace Apache.Qpid.Client.Message
{
public abstract class AbstractQmsMessage : AMQMessage, IMessage
{
private static ILog log = LogManager.GetLogger(typeof(AbstractQmsMessage));
protected bool _redelivered;
protected ByteBuffer _data;
protected bool _readableMessage = false;
private QpidHeaders _headers;
protected AbstractQmsMessage(ByteBuffer data)
: base(new BasicContentHeaderProperties())
{
Init(data);
}
protected AbstractQmsMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
: this(contentHeader, deliveryTag)
{
Init(data);
}
protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) : base(contentHeader, deliveryTag)
{
Init(null);
}
private void Init(ByteBuffer data)
{
_data = data;
if ( _data != null )
{
_data.Acquire();
}
_readableMessage = (data != null);
if ( ContentHeaderProperties.Headers == null )
ContentHeaderProperties.Headers = new FieldTable();
_headers = new QpidHeaders(ContentHeaderProperties.Headers);
}
//
// Properties
//
/// <summary>
/// The application message identifier
/// </summary>
public string MessageId
{
get
{
if (ContentHeaderProperties.MessageId == null)
{
ContentHeaderProperties.MessageId = "ID:" + DeliveryTag;
}
return ContentHeaderProperties.MessageId;
}
set { ContentHeaderProperties.MessageId = value; }
}
/// <summary>
/// The message timestamp
/// </summary>
public long Timestamp
{
get
{
// TODO: look at ulong/long choice
return (long) ContentHeaderProperties.Timestamp;
}
set
{
ContentHeaderProperties.Timestamp = (ulong) value;
}
}
/// <summary>
/// The <see cref="CorrelationId"/> as a byte array.
/// </summary>
public byte[] CorrelationIdAsBytes
{
get { return Encoding.Default.GetBytes(ContentHeaderProperties.CorrelationId); }
set { ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value); }
}
/// <summary>
/// The application correlation identifier
/// </summary>
public string CorrelationId
{
get { return ContentHeaderProperties.CorrelationId; }
set { ContentHeaderProperties.CorrelationId = value; }
}
struct Dest
{
public string ExchangeName;
public string RoutingKey;
public Dest(string exchangeName, string routingKey)
{
ExchangeName = exchangeName;
RoutingKey = routingKey;
}
}
/// <summary>
/// Exchange name of the reply-to address
/// </summary>
public string ReplyToExchangeName
{
get
{
return ReadReplyToHeader().ExchangeName;
}
set
{
BindingURL dest = ReadReplyToHeader();
dest.ExchangeName = value;
WriteReplyToHeader(dest);
}
}
/// <summary>
/// Routing key of the reply-to address
/// </summary>
public string ReplyToRoutingKey
{
get
{
return ReadReplyToHeader().RoutingKey;
}
set
{
BindingURL dest = ReadReplyToHeader();
dest.RoutingKey = value;
WriteReplyToHeader(dest);
}
}
/// <summary>
/// Non-persistent (1) or persistent (2)
/// </summary>
public DeliveryMode DeliveryMode
{
get
{
byte b = ContentHeaderProperties.DeliveryMode;
switch (b)
{
case 1:
return DeliveryMode.NonPersistent;
case 2:
return DeliveryMode.Persistent;
default:
throw new QpidException("Illegal value for delivery mode in content header properties");
}
}
set
{
ContentHeaderProperties.DeliveryMode = (byte)(value==DeliveryMode.NonPersistent?1:2);
}
}
/// <summary>
/// True, if this is a redelivered message
/// </summary>
public bool Redelivered
{
get { return _redelivered; }
set { _redelivered = value; }
}
/// <summary>
/// The message type name
/// </summary>
public string Type
{
get { return ContentHeaderProperties.Type; }
set { ContentHeaderProperties.Type = value; }
}
/// <summary>
/// Message expiration specification
/// </summary>
public long Expiration
{
get { return ContentHeaderProperties.Expiration; }
set { ContentHeaderProperties.Expiration = value; }
}
/// <summary>
/// The message priority, 0 to 9
/// </summary>
public byte Priority
{
get { return ContentHeaderProperties.Priority; }
set { ContentHeaderProperties.Priority = (byte) value; }
}
/// <summary>
/// The MIME Content Type
/// </summary>
public string ContentType
{
get { return ContentHeaderProperties.ContentType; }
set { ContentHeaderProperties.ContentType = value; }
}
/// <summary>
/// The MIME Content Encoding
/// </summary>
public string ContentEncoding
{
get { return ContentHeaderProperties.Encoding; }
set { ContentHeaderProperties.Encoding = value; }
}
/// <summary>
/// Headers of this message
/// </summary>
public IHeaders Headers
{
get { return _headers; }
}
/// <summary>
/// The creating user id
/// </summary>
public string UserId
{
get { return ContentHeaderProperties.UserId; }
set { ContentHeaderProperties.UserId = value; }
}
/// <summary>
/// The creating application id
/// </summary>
public string AppId
{
get { return ContentHeaderProperties.AppId; }
set { ContentHeaderProperties.AppId = value; }
}
/// <summary>
/// Intra-cluster routing identifier
/// </summary>
public string ClusterId
{
get { return ContentHeaderProperties.ClusterId; }
set { ContentHeaderProperties.ClusterId = value; }
}
/// <summary>
/// Return the raw byte array that is used to populate the frame when sending
/// the message.
/// </summary>
/// <value>a byte array of message data</value>
public ByteBuffer Data
{
get
{
if (_data != null)
{
if (!_readableMessage)
{
_data.Flip();
}
else
{
// Make sure we rewind the data just in case any method has moved the
// position beyond the start.
_data.Rewind();
}
}
return _data;
}
set
{
_data = value;
}
}
public void Acknowledge()
{
// the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
// is not specified. In our case, we only set the session field where client acknowledge mode is specified.
if (_channel != null)
{
// we set multiple to true here since acknowledgement implies acknowledge of all count messages
// received on the session
_channel.AcknowledgeMessage((ulong)DeliveryTag, true);
}
}
public abstract void ClearBodyImpl();
public void ClearBody()
{
ClearBodyImpl();
_readableMessage = false;
}
/// <summary>
/// Get a String representation of the body of the message. Used in the
/// toString() method which outputs this before message properties.
/// </summary>
/// <exception cref="QpidException"></exception>
public abstract string ToBodyString();
public override string ToString()
{
try
{
StringBuilder buf = new StringBuilder("Body:\n");
buf.Append(ToBodyString());
buf.Append("\nQmsTimestamp: ").Append(Timestamp);
buf.Append("\nQmsExpiration: ").Append(Expiration);
buf.Append("\nQmsPriority: ").Append(Priority);
buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode);
buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName);
buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey);
buf.Append("\nAMQ message number: ").Append(DeliveryTag);
buf.Append("\nProperties:");
if (ContentHeaderProperties.Headers == null)
{
buf.Append("<NONE>");
}
else
{
buf.Append(Headers.ToString());
}
return buf.ToString();
}
catch (Exception e)
{
return e.ToString();
}
}
public FieldTable PopulateHeadersFromMessageProperties()
{
if (ContentHeaderProperties.Headers == null)
{
return null;
}
else
{
//
// We need to convert every property into a String representation
// Note that type information is preserved in the property name
//
FieldTable table = new FieldTable();
foreach (DictionaryEntry entry in ContentHeaderProperties.Headers)
{
string propertyName = (string) entry.Key;
if (propertyName == null)
{
continue;
}
else
{
table[propertyName] = entry.Value.ToString();
}
}
return table;
}
}
public BasicContentHeaderProperties ContentHeaderProperties
{
get
{
return (BasicContentHeaderProperties) _contentHeaderProperties;
}
}
protected virtual void Reset()
{
_readableMessage = true;
}
public bool IsReadable
{
get { return _readableMessage; }
}
public bool isWritable
{
get { return !_readableMessage; }
}
protected void CheckReadable()
{
if ( !_readableMessage )
{
throw new MessageNotReadableException("You need to call reset() to make the message readable");
}
}
/// <summary>
/// Decodes the replyto field if one is set.
///
/// Splits a replyto field containing an exchange name followed by a ':', followed by a routing key into the exchange name and
/// routing key seperately. The exchange name may be empty in which case the empty string is returned. If the exchange name is
/// empty the replyto field is expected to being with ':'.
///
/// Anyhting other than a two part replyto field sperated with a ':' will result in an exception.
/// </summary>
///
/// <returns>A destination initialized to the replyto location if a replyto field was set, or an empty destination otherwise.</returns>
private BindingURL ReadReplyToHeader()
{
string replyToEncoding = ContentHeaderProperties.ReplyTo;
//log.Debug("replyToEncoding = " + replyToEncoding);
BindingURL bindingUrl = new BindingURL(replyToEncoding);
//log.Debug("bindingUrl = " + bindingUrl.ToString());
return bindingUrl;
//log.Info("replyToEncoding = " + replyToEncoding);
// if ( replyToEncoding == null )
// {
// return new Dest();
// } else
// {
// // Split the replyto field on a ':'
// string[] split = replyToEncoding.Split(':');
// // Ensure that the replyto field argument only consisted of two parts.
// if ( split.Length != 2 )
// {
// throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding);
// }
// // Extract the exchange name and routing key from the split replyto field.
// string exchangeName = split[0];
// string[] split2 = split[1].Split('/');
// string routingKey = split2[3];
// return new Dest(exchangeName, routingKey);
// }
}
private void WriteReplyToHeader(BindingURL dest)
{
string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey);
ContentHeaderProperties.ReplyTo = encodedDestination;
}
}
public class BindingURL
{
public readonly static string OPTION_EXCLUSIVE = "exclusive";
public readonly static string OPTION_AUTODELETE = "autodelete";
public readonly static string OPTION_DURABLE = "durable";
public readonly static string OPTION_CLIENTID = "clientid";
public readonly static string OPTION_SUBSCRIPTION = "subscription";
public readonly static string OPTION_ROUTING_KEY = "routingkey";
/// <summary> Holds the undecoded URL </summary>
string url;
/// <summary> Holds the decoded options. </summary>
IDictionary options = new Hashtable();
/// <summary> Holds the decoded exchange class. </summary>
string exchangeClass;
/// <summary> Holds the decoded exchange name. </summary>
string exchangeName;
/// <summary> Holds the destination name. </summary>
string destination;
/// <summary> Holds the decoded queue name. </summary>
string queueName;
/// <summary>
/// The binding URL has the format:
/// <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
/// </summary>
public BindingURL(string url)
{
this.url = url;
Parse();
}
public string Url { get { return url; } }
public string ExchangeClass
{
get { return exchangeClass; }
set { exchangeClass = value; }
}
public string ExchangeName
{
get { return exchangeName; }
set { exchangeName = value; }
}
public string QueueName
{
get { return queueName; }
set { queueName = value; }
}
public string DestinationName
{
get { return destination; }
set { destination = value; }
}
public string RoutingKey {
get { return (string)options[OPTION_ROUTING_KEY]; }
set { options[OPTION_ROUTING_KEY] = value; }
}
public bool ContainsOption(string key) { return options.Contains(key); }
public string ToString()
{
return "BindingURL: [ ExchangeClass = " + ExchangeClass + ", ExchangeName = " + ExchangeName + ", QueueName = " + QueueName +
", DestinationName = " + DestinationName + ", RoutingKey = " + RoutingKey + " ] ";
}
private void Parse()
{
Uri binding = new Uri(url);
// Extract the URI scheme, this contains the exchange class. It is defaulted to the direct exchange if not specified.
string exchangeClass = binding.Scheme;
if (exchangeClass == null)
{
url = ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS + "://" + ExchangeNameDefaults.DIRECT + "//" + url;
Parse();
return;
}
else
{
this.exchangeClass = exchangeClass;
}
// Extract the host name, this contains the exchange name. It is defaulted to the default direct exchange if not specified.
string exchangeName = binding.Host;
if (exchangeName == null)
{
if (exchangeClass.Equals(ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS))
{
this.exchangeName = "";
}
}
else
{
this.exchangeName = exchangeName;
}
// Extract the destination and queue name.
if ((binding.AbsolutePath == null) || binding.AbsolutePath.Equals(""))
{
throw new UriFormatException("Destination or Queue required");
}
else
{
int slashOffset = binding.AbsolutePath.IndexOf("/", 1);
if (slashOffset == -1)
{
throw new UriFormatException("Destination required");
}
else
{
String path = binding.AbsolutePath;
this.destination = path.Substring(1, slashOffset - 1);
this.queueName = path.Substring(slashOffset + 1);
}
}
ParseOptions(options, binding.Query);
// If the routing key is not set as an option, set it to the destination name.
if (!ContainsOption(OPTION_ROUTING_KEY))
{
options[OPTION_ROUTING_KEY] = destination;
}
}
/// <summary>
/// options looks like this
/// brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value'
/// </summary>
public static void ParseOptions(IDictionary optionMap, string options)
{
// Check that there really are some options to parse.
if ((options == null) || (options.IndexOf('=') == -1))
{
return;
}
int optionIndex = options.IndexOf('=');
string option = options.Substring(0, optionIndex);
int length = options.Length;
int nestedQuotes = 0;
// Holds the index of the final "'".
int valueIndex = optionIndex;
// Loop over all the options.Dest
while ((nestedQuotes > 0) || (valueIndex < length))
{
valueIndex++;
if (valueIndex >= length)
{
break;
}
if (options[valueIndex] == '\'')
{
if ((valueIndex + 1) < options.Length)
{
if ((options[valueIndex + 1] == '&') ||
(options[valueIndex + 1] == ',') ||
(options[valueIndex + 1] == ';') ||
(options[valueIndex + 1] == '\''))
{
nestedQuotes--;
if (nestedQuotes == 0)
{
// We've found the value of an option
break;
}
}
else
{
nestedQuotes++;
}
}
else
{
// We are at the end of the string
// Check to see if we are corectly closing quotes
if (options[valueIndex] == '\'')
{
nestedQuotes--;
}
break;
}
}
}
}
}
}