blob: b726474431779092a598e2bb57364a4b6994d3b7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.IO;
using System.Messaging;
using System.Text;
using Apache.NMS.Util;
namespace Apache.NMS.MSMQ
{
/// <summary>
/// Kinds of NMS message handled by the message converter. The numeric
/// value of a member is what the converter stores in, and compares
/// against, the MSMQ Message.AppSpecific field, so the values are
/// spelled out explicitly to keep them stable.
/// </summary>
public enum NMSMessageType
{
    /// <summary>Message without a typed body.</summary>
    BaseMessage = 0,

    /// <summary>Message whose body is text.</summary>
    TextMessage = 1,

    /// <summary>Message whose body is a byte array.</summary>
    BytesMessage = 2,

    /// <summary>Message whose body is an object.</summary>
    ObjectMessage = 3,

    /// <summary>Message whose body is a primitive map.</summary>
    MapMessage = 4,

    /// <summary>Message whose body is a stream of primitives.</summary>
    StreamMessage = 5
}
/// <summary>
/// This class provides default rules for converting MSMQ to and from
/// NMS messages, when the peer system expects or produces compatible
/// mappings, typically when the peer system is also implemented on
/// Apache.NMS.
/// Default mappings are as follows :
/// <ul>
/// <li>
/// the MSMQ Message.AppSpecific field is used for specifying the NMS
/// message type, as specified by the <c>NMSMessageType</c> enumeration.
/// </li>
/// <li>
/// the MSMQ Message.Extension field is populated with a map
/// (a marshalled <c>PrimitiveMap</c>) of message properties.
/// </li>
/// <li>
/// in earlier versions of Apache.NMS.MSMQ, the MSMQ Message.Label
/// field was populated with the value of the NMSType field. Setting
/// <c>SetLabelAsNMSType</c> to true (the default value) applies that
/// same rule, which makes it compatible with existing NMS peers. If
/// set to false, the Message.Label field is populated with the value
/// of a "Label" property, if it exists, thus making it readable by
/// standard management or monitoring tools. The NMSType value is then
/// transmitted as a field in the Message.Extension map.
/// </li>
/// </ul>
/// Please note that in earlier versions of Apache.NMS, only one property
/// was set in the Message.Extension field : the NMSCorrelationID.
/// The native Message.CorrelationId field is not settable, except for
/// reply messages explicitly created as such through the MSMQ API.
/// Transmission of the correlation ID through a mapped property called
/// NMSCorrelationID is therefore maintained.
/// When exchanging messages with a non compatible peer, a specific
/// message converter must be provided, which should at least be able to
/// map message types and define the encoding used for text messages.
/// </summary>
public class DefaultMessageConverter : IMessageConverterEx
{
// Backing store for SetLabelAsNMSType; enabled unless changed by the caller.
private bool labelCarriesNMSType = true;

/// <summary>
/// Gets or sets the flag that selects how the MSMQ Message.Label field
/// is mapped by this converter. On received messages, a true value
/// makes the label the NMS message type, while a false value exposes
/// the label as a "Label" message property. The flag is also consulted
/// when building outgoing MSMQ messages. Defaults to true.
/// </summary>
public bool SetLabelAsNMSType
{
    get
    {
        return labelCarriesNMSType;
    }
    set
    {
        labelCarriesNMSType = value;
    }
}
#region Messages
/// <summary>
/// Converts the specified NMS message to an equivalent MSMQ message.
/// </summary>
/// <remarks>
/// The body is converted through <c>ConvertMessageBodyToMSMQ</c>. The
/// time to live (when not zero), delivery mode, priority and reply-to
/// destination are copied to the native MSMQ fields. The correlation
/// id and the message properties are marshalled as a
/// <c>PrimitiveMap</c> into Message.Extension.
/// When <c>SetLabelAsNMSType</c> is true, Message.Label receives the
/// NMS type. When it is false, the NMS type is stored in the extension
/// map under "NMSType" and Message.Label receives the value of the
/// "Label" property, if any.
/// </remarks>
/// <param name="message">NMS message to be converted.</param>
/// <returns>Converted MSMQ message.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="message"/> is null.
/// </exception>
public virtual Message ToMsmqMessage(IMessage message)
{
    if(message == null)
    {
        throw new ArgumentNullException("message");
    }

    Message msmqMessage = new Message();
    PrimitiveMap propertyData = new PrimitiveMap();

    ConvertMessageBodyToMSMQ(message, msmqMessage);

    // A zero time to live leaves the MSMQ default untouched.
    if(message.NMSTimeToLive != TimeSpan.Zero)
    {
        msmqMessage.TimeToBeReceived = message.NMSTimeToLive;
    }

    // The correlation id travels in the extension map, not in the
    // native MSMQ correlation field.
    if(message.NMSCorrelationID != null)
    {
        propertyData.SetString("NMSCorrelationID", message.NMSCorrelationID);
    }

    msmqMessage.Recoverable = (message.NMSDeliveryMode == MsgDeliveryMode.Persistent);
    msmqMessage.Priority = ToMsmqMessagePriority(message.NMSPriority);
    msmqMessage.ResponseQueue = ToMsmqDestination(message.NMSReplyTo);

    if(message.NMSType != null)
    {
        if(SetLabelAsNMSType)
        {
            // Mirror of the receive side, which reads the NMS type
            // back from the label when this flag is set.
            msmqMessage.Label = message.NMSType;
        }
        else
        {
            // The label is reserved for the "Label" property, so the
            // NMS type is transmitted in the extension map instead.
            propertyData.SetString("NMSType", message.NMSType);
        }
    }

    // Populate property data
    foreach(object keyObject in message.Properties.Keys)
    {
        string key = (keyObject as string);
        object val = message.Properties[key];

        // Property names are identifiers: compare them ordinally so
        // the match does not depend on the current culture.
        bool isLabelProperty = string.Equals(key, "Label", StringComparison.OrdinalIgnoreCase);

        if(!SetLabelAsNMSType && isLabelProperty && val != null)
        {
            msmqMessage.Label = val.ToString();
        }
        else
        {
            propertyData[key] = val;
        }
    }

    // Store the NMS property data in the extension area
    msmqMessage.Extension = propertyData.Marshal();
    return msmqMessage;
}
/// <summary>
/// Builds the NMS counterpart of the specified MSMQ message, with the
/// message body always converted.
/// </summary>
/// <param name="message">MSMQ message to be converted.</param>
/// <returns>
/// NMS message produced by the two-argument overload with body
/// conversion enabled.
/// </returns>
public virtual IMessage ToNmsMessage(Message message)
{
    const bool convertBody = true;
    IMessage converted = ToNmsMessage(message, convertBody);
    return converted;
}
/// <summary>
/// Converts the specified MSMQ message to an equivalent NMS message.
/// </summary>
/// <remarks>
/// The NMS message is created by <c>CreateNmsMessage</c>. Header values
/// are then copied in three best-effort groups: when reading an MSMQ
/// property raises <see cref="InvalidOperationException"/>, the rest of
/// that group is skipped and the conversion continues. Finally the
/// entries of the property map stored in Message.Extension are applied.
/// Entries named "NMSType" or "NMSCorrelationID" (compared ordinally,
/// ignoring case) set the corresponding NMS header; all other entries
/// become message properties.
/// </remarks>
/// <param name="message">MSMQ message to be converted.</param>
/// <param name="convertBody">true if message body should be converted.</param>
/// <returns>Converted NMS message.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="message"/> is null.
/// </exception>
public virtual IMessage ToNmsMessage(Message message, bool convertBody)
{
    if(message == null)
    {
        throw new ArgumentNullException("message");
    }

    BaseMessage answer = CreateNmsMessage(message, convertBody);

    // Get the NMS property data from the extension area. A message
    // without extension data carries no property map, so start from an
    // empty map rather than unmarshalling an empty buffer.
    byte[] extension = message.Extension;
    PrimitiveMap propertyData = null;
    if(extension != null && extension.Length > 0)
    {
        propertyData = PrimitiveMap.Unmarshal(extension);
    }
    if(propertyData == null)
    {
        propertyData = new PrimitiveMap();
    }

    // NOTE(review): the InvalidOperationException handlers below are
    // presumably meant for MSMQ properties that were excluded by the
    // queue's read property filter; they are kept as deliberate
    // best-effort behavior.
    try
    {
        answer.NMSMessageId = message.Id;
        answer.NMSCorrelationID = propertyData.GetString("NMSCorrelationID");
        answer.NMSDeliveryMode = (message.Recoverable ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent);
        answer.NMSDestination = ToNmsDestination(message.DestinationQueue);
    }
    catch(InvalidOperationException)
    {
        // Best effort: leave the remaining values of this group unset.
    }

    try
    {
        answer.NMSReplyTo = ToNmsDestination(message.ResponseQueue);
        answer.NMSTimeToLive = message.TimeToBeReceived;
        answer.NMSPriority = ToNmsMsgPriority(message.Priority);
    }
    catch(InvalidOperationException)
    {
        // Best effort: leave the remaining values of this group unset.
    }

    try
    {
        if(message.Label != null)
        {
            if(SetLabelAsNMSType)
            {
                answer.NMSType = message.Label;
            }
            else
            {
                answer.Properties["Label"] = message.Label;
            }
        }
        answer.Properties["LookupId"] = message.LookupId;
    }
    catch(InvalidOperationException)
    {
        // Best effort: leave the remaining values of this group unset.
    }

    foreach(object keyObject in propertyData.Keys)
    {
        try
        {
            string key = (keyObject as string);

            // Keys are identifiers: match them ordinally, and read the
            // value back under the key actually present in the map.
            if(string.Equals(key, "NMSType", StringComparison.OrdinalIgnoreCase))
            {
                answer.NMSType = propertyData.GetString(key);
            }
            else if(string.Equals(key, "NMSCorrelationID", StringComparison.OrdinalIgnoreCase))
            {
                answer.NMSCorrelationID = propertyData.GetString(key);
            }
            else
            {
                answer.Properties[key] = propertyData[key];
            }
        }
        catch(InvalidOperationException)
        {
            // Best effort: skip the entry that could not be applied.
        }
    }

    return answer;
}
#endregion
#region Message priority
// Message priorities are defined as follows :
// | MSMQ | NMS |
// | MessagePriority | MsgPriority |
// +--------------------+--------------------+
// | Lowest | Lowest |
// | VeryLow | VeryLow |
// | Low | Low |
// | \-> | AboveLow |
// | /-> | BelowNormal |
// | Normal | Normal |
// | AboveNormal | AboveNormal |
// | High | High |
// | VeryHigh | VeryHigh |
// | Highest | Highest |
// +--------------------+--------------------+
/// <summary>
/// Maps an NMS message priority onto the MSMQ priority scale.
/// </summary>
/// <remarks>
/// AboveLow is folded into Low and BelowNormal into Normal; any value
/// not listed explicitly is mapped to Normal.
/// </remarks>
/// <param name="msgPriority">NMS message priority to be converted.</param>
/// <returns>Converted MSMQ message priority.</returns>
private static MessagePriority ToMsmqMessagePriority(MsgPriority msgPriority)
{
    MessagePriority mapped;

    switch(msgPriority)
    {
        case MsgPriority.Highest:
            mapped = MessagePriority.Highest;
            break;

        case MsgPriority.VeryHigh:
            mapped = MessagePriority.VeryHigh;
            break;

        case MsgPriority.High:
            mapped = MessagePriority.High;
            break;

        case MsgPriority.AboveNormal:
            mapped = MessagePriority.AboveNormal;
            break;

        case MsgPriority.AboveLow:
        case MsgPriority.Low:
            mapped = MessagePriority.Low;
            break;

        case MsgPriority.VeryLow:
            mapped = MessagePriority.VeryLow;
            break;

        case MsgPriority.Lowest:
            mapped = MessagePriority.Lowest;
            break;

        case MsgPriority.BelowNormal:
        case MsgPriority.Normal:
        default:
            mapped = MessagePriority.Normal;
            break;
    }

    return mapped;
}
/// <summary>
/// Maps an MSMQ message priority onto the NMS priority scale.
/// </summary>
/// <remarks>
/// Each MSMQ priority maps to the NMS priority of the same name; any
/// value not listed explicitly is mapped to Normal.
/// </remarks>
/// <param name="messagePriority">MSMQ message priority to be converted.</param>
/// <returns>Converted NMS message priority.</returns>
private static MsgPriority ToNmsMsgPriority(MessagePriority messagePriority)
{
    MsgPriority mapped;

    switch(messagePriority)
    {
        case MessagePriority.Highest:
            mapped = MsgPriority.Highest;
            break;

        case MessagePriority.VeryHigh:
            mapped = MsgPriority.VeryHigh;
            break;

        case MessagePriority.High:
            mapped = MsgPriority.High;
            break;

        case MessagePriority.AboveNormal:
            mapped = MsgPriority.AboveNormal;
            break;

        case MessagePriority.Low:
            mapped = MsgPriority.Low;
            break;

        case MessagePriority.VeryLow:
            mapped = MsgPriority.VeryLow;
            break;

        case MessagePriority.Lowest:
            mapped = MsgPriority.Lowest;
            break;

        case MessagePriority.Normal:
        default:
            mapped = MsgPriority.Normal;
            break;
    }

    return mapped;
}
#endregion
#region Message creation
// Conversion of the message body has been separated from the creation
// of the NMS message object for performance reasons when using
// selectors (selectors handle only message attributes, not message
// bodies).
// CreateNmsMessage(Message) is maintained for compatibility reasons
// with existing clients that may have implemented derived classes,
// instead of completely removing the body conversion part from the
// method.
/// <summary>
/// Creates the NMS message matching the specified MSMQ message and
/// converts its body; shorthand for the two-argument overload with
/// body conversion enabled.
/// </summary>
/// <param name="message">MSMQ message.</param>
/// <returns>NMS message created for retrieving the MSMQ message.</returns>
protected virtual BaseMessage CreateNmsMessage(Message message)
{
    const bool convertBody = true;
    BaseMessage created = CreateNmsMessage(message, convertBody);
    return created;
}
/// <summary>
/// Instantiates the NMS message class selected by the AppSpecific value
/// of the specified MSMQ message, optionally filling in its body.
/// </summary>
/// <remarks>
/// AppSpecific is interpreted as an <c>NMSMessageType</c> value. Values
/// that do not designate a text, bytes, object, map or stream message
/// produce a plain <c>BaseMessage</c>, whose body is never converted.
/// The returned message always has its body marked read-only.
/// </remarks>
/// <param name="message">MSMQ message.</param>
/// <param name="convertBody">true if the message body must be
/// converted.</param>
/// <returns>NMS message created for retrieving the MSMQ message.</returns>
protected virtual BaseMessage CreateNmsMessage(Message message,
    bool convertBody)
{
    BaseMessage created;

    switch((NMSMessageType) message.AppSpecific)
    {
        case NMSMessageType.TextMessage:
        {
            TextMessage text = new TextMessage();
            if(convertBody)
            {
                ConvertTextMessageBodyToNMS(message, text);
            }
            created = text;
            break;
        }

        case NMSMessageType.BytesMessage:
        {
            BytesMessage bytes = new BytesMessage();
            if(convertBody)
            {
                ConvertBytesMessageBodyToNMS(message, bytes);
            }
            created = bytes;
            break;
        }

        case NMSMessageType.ObjectMessage:
        {
            ObjectMessage obj = new ObjectMessage();
            if(convertBody)
            {
                ConvertObjectMessageBodyToNMS(message, obj);
            }
            created = obj;
            break;
        }

        case NMSMessageType.MapMessage:
        {
            MapMessage map = new MapMessage();
            if(convertBody)
            {
                ConvertMapMessageBodyToNMS(message, map);
            }
            created = map;
            break;
        }

        case NMSMessageType.StreamMessage:
        {
            StreamMessage stream = new StreamMessage();
            if(convertBody)
            {
                ConvertStreamMessageBodyToNMS(message, stream);
            }
            created = stream;
            break;
        }

        default:
        {
            // Includes NMSMessageType.BaseMessage and unknown values.
            created = new BaseMessage();
            break;
        }
    }

    created.ReadOnlyBody = true;
    return created;
}
#endregion
#region Message body
/// <summary>
/// Converts an NMS message body to the equivalent MSMQ message body.
/// </summary>
/// <remarks>
/// The kind of message is recorded in Message.AppSpecific as an
/// <c>NMSMessageType</c> value. Text is written to the body stream as
/// UTF-32, bytes content is written as is, an object body is assigned
/// to Message.Body, and a map body is written in its marshalled form.
/// A null text or a null bytes content leaves the body stream
/// unwritten. The body of a stream message is not transmitted: only
/// its type marker is set.
/// </remarks>
/// <param name="message">Source NMS message.</param>
/// <param name="answer">Target MSMQ message.</param>
/// <exception cref="NotSupportedException">
/// The message is not of a handled type, or a map message body is not
/// a <c>PrimitiveMap</c>.
/// </exception>
protected virtual void ConvertMessageBodyToMSMQ(IMessage message, Message answer)
{
    if(message is TextMessage)
    {
        string text = ((TextMessage) message).Text;

        // Encoding.GetBytes rejects null; a message without text is
        // sent with nothing written to its body.
        if(text != null)
        {
            byte[] buf = Encoding.UTF32.GetBytes(text);
            answer.BodyStream.Write(buf, 0, buf.Length);
        }
        answer.AppSpecific = (int) NMSMessageType.TextMessage;
    }
    else if(message is BytesMessage)
    {
        BytesMessage bytesMessage = (BytesMessage) message;
        bytesMessage.Reset();

        // Read the content once; it is null for a bytes message that
        // this converter built from an empty MSMQ body.
        byte[] content = bytesMessage.Content;
        if(content != null)
        {
            answer.BodyStream.Write(content, 0, content.Length);
        }
        answer.AppSpecific = (int) NMSMessageType.BytesMessage;
    }
    else if(message is ObjectMessage)
    {
        answer.Body = ((ObjectMessage) message).Body;
        answer.AppSpecific = (int) NMSMessageType.ObjectMessage;
    }
    else if(message is MapMessage)
    {
        PrimitiveMap mapBody = ((MapMessage) message).Body as PrimitiveMap;
        if(mapBody == null)
        {
            // Only PrimitiveMap is marshalled here; fail with a clear
            // message instead of a null reference.
            throw new NotSupportedException("map message body must be a PrimitiveMap");
        }

        byte[] buf = mapBody.Marshal();
        answer.BodyStream.Write(buf, 0, buf.Length);
        answer.AppSpecific = (int) NMSMessageType.MapMessage;
    }
    else if(message is StreamMessage)
    {
        answer.AppSpecific = (int) NMSMessageType.StreamMessage;
        // TODO: Implement (the stream content is currently not written
        // to the MSMQ message body).
    }
    else if(message is BaseMessage)
    {
        answer.AppSpecific = (int) NMSMessageType.BaseMessage;
    }
    else
    {
        throw new NotSupportedException("unhandled message type");
    }
}
/// <summary>
/// Fills the body of an NMS message from an MSMQ message by delegating
/// to the converter that matches the runtime type of the NMS message.
/// </summary>
/// <remarks>
/// Text, bytes, object, map and stream messages are tested in that
/// order. A target of any other type is left untouched.
/// </remarks>
/// <param name="message">Source MSMQ message.</param>
/// <param name="answer">Target NMS message.</param>
public virtual void ConvertMessageBodyToNMS(Message message, IMessage answer)
{
    TextMessage asText = answer as TextMessage;
    if(asText != null)
    {
        ConvertTextMessageBodyToNMS(message, asText);
        return;
    }

    BytesMessage asBytes = answer as BytesMessage;
    if(asBytes != null)
    {
        ConvertBytesMessageBodyToNMS(message, asBytes);
        return;
    }

    ObjectMessage asObject = answer as ObjectMessage;
    if(asObject != null)
    {
        ConvertObjectMessageBodyToNMS(message, asObject);
        return;
    }

    MapMessage asMap = answer as MapMessage;
    if(asMap != null)
    {
        ConvertMapMessageBodyToNMS(message, asMap);
        return;
    }

    StreamMessage asStream = answer as StreamMessage;
    if(asStream != null)
    {
        ConvertStreamMessageBodyToNMS(message, asStream);
    }
}
/// <summary>
/// Converts an MSMQ message body to the equivalent NMS text message
/// body.
/// </summary>
/// <remarks>
/// The body stream is decoded as UTF-32. A missing or empty body
/// stream yields an empty string. Only the bytes actually delivered by
/// the stream are decoded.
/// </remarks>
/// <param name="message">Source MSMQ message.</param>
/// <param name="answer">Target NMS text message.</param>
public virtual void ConvertTextMessageBodyToNMS(Message message,
    TextMessage answer)
{
    string content = String.Empty;
    Stream body = message.BodyStream;

    if(body != null && body.Length > 0)
    {
        byte[] buf = new byte[body.Length];

        // Stream.Read may return fewer bytes than requested, so keep
        // reading until the buffer is full or the stream is exhausted.
        int total = 0;
        while(total < buf.Length)
        {
            int read = body.Read(buf, total, buf.Length - total);
            if(read <= 0)
            {
                break;
            }
            total += read;
        }

        // Decode what was read, not the zero-filled remainder.
        content = Encoding.UTF32.GetString(buf, 0, total);
    }

    answer.Text = content;
}
/// <summary>
/// Converts an MSMQ message body to the equivalent NMS bytes message
/// body.
/// </summary>
/// <remarks>
/// A missing or empty body stream sets the content to null. Otherwise
/// the content is the bytes actually delivered by the stream.
/// </remarks>
/// <param name="message">Source MSMQ message.</param>
/// <param name="answer">Target NMS bytes message.</param>
public virtual void ConvertBytesMessageBodyToNMS(Message message,
    BytesMessage answer)
{
    byte[] buf = null;
    Stream body = message.BodyStream;

    if(body != null && body.Length > 0)
    {
        buf = new byte[body.Length];

        // Stream.Read may return fewer bytes than requested, so keep
        // reading until the buffer is full or the stream is exhausted.
        int total = 0;
        while(total < buf.Length)
        {
            int read = body.Read(buf, total, buf.Length - total);
            if(read <= 0)
            {
                break;
            }
            total += read;
        }

        // Do not expose zero padding as message content.
        if(total < buf.Length)
        {
            Array.Resize(ref buf, total);
        }
    }

    answer.Content = buf;
}
/// <summary>
/// Copies the body object of an MSMQ message into an NMS object
/// message.
/// </summary>
/// <param name="message">Source MSMQ message, read through its Body property.</param>
/// <param name="answer">Target NMS object message, whose Body is assigned.</param>
public virtual void ConvertObjectMessageBodyToNMS(Message message,
    ObjectMessage answer)
{
    object payload = message.Body;
    answer.Body = payload;
}
/// <summary>
/// Converts an MSMQ message body to the equivalent NMS map message
/// body.
/// </summary>
/// <remarks>
/// The body stream is expected to hold a marshalled
/// <c>PrimitiveMap</c>. A missing or empty body stream yields an empty
/// map.
/// </remarks>
/// <param name="message">Source MSMQ message.</param>
/// <param name="answer">Target NMS map message.</param>
public virtual void ConvertMapMessageBodyToNMS(Message message,
    MapMessage answer)
{
    Stream body = message.BodyStream;

    if(body == null || body.Length == 0)
    {
        // NOTE(review): PrimitiveMap.Unmarshal's handling of a null
        // buffer is not visible here; an empty body is mapped to an
        // empty map instead of passing null to it.
        answer.Body = new PrimitiveMap();
        return;
    }

    byte[] buf = new byte[body.Length];

    // Stream.Read may return fewer bytes than requested, so keep
    // reading until the buffer is full or the stream is exhausted.
    int total = 0;
    while(total < buf.Length)
    {
        int read = body.Read(buf, total, buf.Length - total);
        if(read <= 0)
        {
            break;
        }
        total += read;
    }

    // Unmarshal only the bytes that were actually read.
    if(total < buf.Length)
    {
        Array.Resize(ref buf, total);
    }

    answer.Body = PrimitiveMap.Unmarshal(buf);
}
/// <summary>
/// Placeholder for the conversion of an MSMQ message body to an NMS
/// stream message body, which is not provided yet.
/// </summary>
/// <param name="message">Source MSMQ message (not read).</param>
/// <param name="answer">Target NMS stream message (not modified).</param>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public virtual void ConvertStreamMessageBodyToNMS(Message message,
    StreamMessage answer)
{
    // TODO: Implement
    NotImplementedException notImplemented = new NotImplementedException();
    throw notImplemented;
}
#endregion
#region Destination
/// <summary>
/// Converts an NMS destination to the equivalent MSMQ destination
/// (ie. queue).
/// </summary>
/// <param name="destination">NMS destination; may be null.</param>
/// <returns>
/// MSMQ queue wrapped by the destination, or null when the destination
/// is null or is not a <c>Queue</c>.
/// </returns>
public MessageQueue ToMsmqDestination(IDestination destination)
{
    Queue queue = destination as Queue;

    // Test the result of the cast, not the argument: a destination of
    // another type must not be dereferenced as a Queue.
    if(queue == null)
    {
        return null;
    }

    return queue.MSMQMessageQueue;
}
/// <summary>
/// Wraps an MSMQ queue into the equivalent NMS destination.
/// </summary>
/// <param name="destinationQueue">MSMQ destination queue; may be null.</param>
/// <returns>
/// A new <c>Queue</c> built on the MSMQ queue, or null when no queue
/// is given.
/// </returns>
protected virtual IDestination ToNmsDestination(MessageQueue destinationQueue)
{
    return (destinationQueue != null) ? new Queue(destinationQueue) : null;
}
#endregion
}
}