blob: 2aa34382f2ce175b43ac62909386caf956588c74 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.IO;
using System.Messaging;
using System.Text;
using Apache.NMS.Util;
namespace Apache.NMS.MSMQ
{
public enum NMSMessageType
{
BaseMessage,
TextMessage,
BytesMessage,
ObjectMessage,
MapMessage,
StreamMessage
}
public class DefaultMessageConverter : IMessageConverter
{
public virtual Message ToMsmqMessage(IMessage message)
{
Message msmqMessage = new Message();
PrimitiveMap metaData = new PrimitiveMap();
ConvertMessageBodyToMSMQ(message, msmqMessage);
if(message.NMSTimeToLive != TimeSpan.Zero)
{
msmqMessage.TimeToBeReceived = message.NMSTimeToLive;
}
if(message.NMSCorrelationID != null)
{
metaData.SetString("NMSCorrelationID", message.NMSCorrelationID);
}
msmqMessage.Recoverable = (message.NMSDeliveryMode == MsgDeliveryMode.Persistent);
msmqMessage.Priority = ToMessagePriority(message.NMSPriority);
msmqMessage.ResponseQueue = ToMsmqDestination(message.NMSReplyTo);
if(message.NMSType != null)
{
msmqMessage.Label = message.NMSType;
}
// Store the NMS meta data in the extension area
msmqMessage.Extension = metaData.Marshal();
return msmqMessage;
}
public virtual IMessage ToNmsMessage(Message message)
{
BaseMessage answer = CreateNmsMessage(message);
// Get the NMS meta data from the extension area
PrimitiveMap metaData = PrimitiveMap.Unmarshal(message.Extension);
try
{
answer.NMSMessageId = message.Id;
answer.NMSCorrelationID = metaData.GetString("NMSCorrelationID");
answer.NMSDeliveryMode = (message.Recoverable ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent);
answer.NMSDestination = ToNmsDestination(message.DestinationQueue);
}
catch(InvalidOperationException)
{
}
try
{
answer.NMSType = message.Label;
answer.NMSReplyTo = ToNmsDestination(message.ResponseQueue);
answer.NMSTimeToLive = message.TimeToBeReceived;
}
catch(InvalidOperationException)
{
}
return answer;
}
private static MessagePriority ToMessagePriority(MsgPriority msgPriority)
{
switch(msgPriority)
{
case MsgPriority.Lowest:
return MessagePriority.Lowest;
case MsgPriority.VeryLow:
return MessagePriority.VeryLow;
case MsgPriority.Low:
case MsgPriority.AboveLow:
return MessagePriority.Low;
default:
case MsgPriority.BelowNormal:
case MsgPriority.Normal:
return MessagePriority.Normal;
case MsgPriority.AboveNormal:
return MessagePriority.AboveNormal;
case MsgPriority.High:
return MessagePriority.High;
case MsgPriority.VeryHigh:
return MessagePriority.VeryHigh;
case MsgPriority.Highest:
return MessagePriority.Highest;
}
}
protected virtual void ConvertMessageBodyToMSMQ(IMessage message, Message answer)
{
if(message is TextMessage)
{
TextMessage textMessage = message as TextMessage;
byte[] buf = Encoding.UTF32.GetBytes(textMessage.Text);
answer.BodyStream.Write(buf, 0, buf.Length);
answer.AppSpecific = (int) NMSMessageType.TextMessage;
}
else if(message is BytesMessage)
{
BytesMessage bytesMessage = message as BytesMessage;
answer.BodyStream.Write(bytesMessage.Content, 0, bytesMessage.Content.Length);
answer.AppSpecific = (int) NMSMessageType.BytesMessage;
}
else if(message is ObjectMessage)
{
ObjectMessage objectMessage = message as ObjectMessage;
answer.Body = objectMessage.Body;
answer.AppSpecific = (int) NMSMessageType.ObjectMessage;
}
else if(message is MapMessage)
{
MapMessage mapMessage = message as MapMessage;
PrimitiveMap mapBody = mapMessage.Body as PrimitiveMap;
byte[] buf = mapBody.Marshal();
answer.BodyStream.Write(buf, 0, buf.Length);
answer.AppSpecific = (int) NMSMessageType.MapMessage;
}
else if(message is StreamMessage)
{
StreamMessage streamMessage = message as StreamMessage;
answer.AppSpecific = (int) NMSMessageType.StreamMessage;
// TODO: Implement
}
else if(message is BaseMessage)
{
answer.AppSpecific = (int) NMSMessageType.BaseMessage;
}
else
{
throw new Exception("unhandled message type");
}
}
protected virtual BaseMessage CreateNmsMessage(Message message)
{
BaseMessage result = null;
if((int) NMSMessageType.TextMessage == message.AppSpecific)
{
TextMessage textMessage = new TextMessage();
string content = String.Empty;
if(message.BodyStream != null && message.BodyStream.Length > 0)
{
byte[] buf = null;
buf = new byte[message.BodyStream.Length];
message.BodyStream.Read(buf, 0, buf.Length);
content = Encoding.UTF32.GetString(buf);
}
textMessage.Text = content;
result = textMessage;
}
else if((int) NMSMessageType.BytesMessage == message.AppSpecific)
{
byte[] buf = null;
if(message.BodyStream != null && message.BodyStream.Length > 0)
{
buf = new byte[message.BodyStream.Length];
message.BodyStream.Read(buf, 0, buf.Length);
}
BytesMessage bytesMessage = new BytesMessage();
bytesMessage.Content = buf;
result = bytesMessage;
}
else if((int) NMSMessageType.ObjectMessage == message.AppSpecific)
{
ObjectMessage objectMessage = new ObjectMessage();
objectMessage.Body = message.Body;
result = objectMessage;
}
else if((int) NMSMessageType.MapMessage == message.AppSpecific)
{
byte[] buf = null;
if(message.BodyStream != null && message.BodyStream.Length > 0)
{
buf = new byte[message.BodyStream.Length];
message.BodyStream.Read(buf, 0, buf.Length);
}
MapMessage mapMessage = new MapMessage();
mapMessage.Body = PrimitiveMap.Unmarshal(buf);
result = mapMessage;
}
else if((int) NMSMessageType.StreamMessage == message.AppSpecific)
{
StreamMessage streamMessage = new StreamMessage();
// TODO: Implement
result = streamMessage;
}
else
{
BaseMessage baseMessage = new BaseMessage();
result = baseMessage;
}
return result;
}
public MessageQueue ToMsmqDestination(IDestination destination)
{
if(null == destination)
{
return null;
}
return new MessageQueue((destination as Destination).Path);
}
protected virtual IDestination ToNmsDestination(MessageQueue destinationQueue)
{
if(null == destinationQueue)
{
return null;
}
return new Queue(destinationQueue.Path);
}
}
}