blob: 7e88db2808ccea2c652f010f8d33c730df92b7b8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Text;
using Apache.NMS.Util;
using Org.Apache.Qpid.Messaging;
namespace Apache.NMS.Amqp
{
public enum NMSMessageType
{
BaseMessage,
TextMessage,
BytesMessage,
ObjectMessage,
MapMessage,
StreamMessage
}
public class DefaultMessageConverter : IMessageConverter
{
#region IMessageConverter Members
// NMS Message AMQP Message
// ================================ =================
// string NMSCorrelationID string CorrelationId
// MsgDeliveryMode NMSDeliveryMode bool Durable
// IDestination NMSDestination
// string MNSMessageId string MessageId
// MsgPriority NMSPriority byte Priority
// bool NMSRedelivered bool Redelivered
// IDestination NMSReplyTo Address ReplyTo
// DateTime NMSTimestamp
// TimeSpan NMSTimeToLive Duration Ttl
// string NMSType string ContentType
// IPrimitiveMap Properties Dictionary Properties
// string Subject
// string UserId
//
public virtual Message ToAmqpMessage(IMessage message)
{
Message amqpMessage = CreateAmqpMessage(message);
if (null != message.NMSCorrelationID)
{
amqpMessage.CorrelationId = message.NMSCorrelationID;
}
amqpMessage.Durable = (message.NMSDeliveryMode == MsgDeliveryMode.Persistent);
if (null != message.NMSMessageId)
{
amqpMessage.MessageId = message.NMSMessageId;
}
amqpMessage.Priority = ToAmqpMessagePriority(message.NMSPriority);
amqpMessage.Redelivered = message.NMSRedelivered;
if (null != message.NMSReplyTo)
{
amqpMessage.ReplyTo = ToAmqpAddress(message.NMSReplyTo);
}
if (message.NMSTimeToLive != TimeSpan.Zero)
{
amqpMessage.Ttl = ToQpidDuration(message.NMSTimeToLive);
}
if (null != message.NMSType)
{
amqpMessage.ContentType = message.NMSType;
}
amqpMessage.Properties = FromNmsPrimitiveMap(message.Properties);
// TODO: NMSDestination, Amqp.Subect, Amqp.UserId
return amqpMessage;
}
//
public virtual IMessage ToNmsMessage(Message message)
{
BaseMessage answer = CreateNmsMessage(message);
try
{
answer.NMSCorrelationID = message.CorrelationId;
answer.NMSDeliveryMode = (message.Durable ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent);
answer.NMSMessageId = message.MessageId;
answer.NMSPriority = ToNmsPriority(message.Priority);
answer.NMSRedelivered = message.Redelivered;
answer.NMSReplyTo = ToNmsDestination(message.ReplyTo);
answer.NMSTimeToLive = ToNMSTimespan(message.Ttl);
answer.NMSType = message.ContentType;
SetNmsPrimitiveMap(answer.Properties, message.Properties);
// TODO: NMSDestination, NMSTimestamp, Properties
}
catch (InvalidOperationException)
{
}
return answer;
}
#endregion
#region MessagePriority Methods
//
private static byte ToAmqpMessagePriority(MsgPriority msgPriority)
{
return (byte)msgPriority;
}
//
private static MsgPriority ToNmsPriority(byte qpidMsgPriority)
{
if (qpidMsgPriority > (byte)MsgPriority.Highest)
{
return MsgPriority.Highest;
}
return (MsgPriority)qpidMsgPriority;
}
#endregion
#region Duration Methods
//
private static Duration ToQpidDuration(TimeSpan timespan)
{
if (timespan.TotalMilliseconds <= 0)
{
Duration result = DurationConstants.IMMEDIATE;
return result;
}
else if (timespan.TotalMilliseconds > (Double)DurationConstants.FORVER.Milliseconds)
{
Duration result = DurationConstants.FORVER;
return result;
}
else
{
Duration result = new Duration((UInt64)timespan.TotalMilliseconds);
return result;
}
}
//
private static TimeSpan ToNMSTimespan(Duration duration)
{
if (duration.Milliseconds > Int64.MaxValue)
{
TimeSpan result = new TimeSpan(Int64.MaxValue);
return result;
}
else
{
TimeSpan result = new TimeSpan((Int64)duration.Milliseconds);
return result;
}
}
#endregion
#region MessageBody Conversion Methods
protected virtual Message CreateAmqpMessage(IMessage message)
{
if (message is TextMessage)
{
TextMessage textMessage = message as TextMessage;
Message result = new Message(textMessage.Text);
return result;
}
else if (message is BytesMessage)
{
BytesMessage bytesMessage = message as BytesMessage;
Message result = new Message(bytesMessage.Content, 0, bytesMessage.Content.Length);
return result;
}
else if (message is ObjectMessage)
{
ObjectMessage objectMessage = message as ObjectMessage;
Message result = new Message(objectMessage.Body);
return result;
}
else if (message is MapMessage)
{
MapMessage mapMessage = message as MapMessage;
PrimitiveMap mapBody = mapMessage.Body as PrimitiveMap;
byte[] buf = mapBody.Marshal();
Message result = new Message(buf, 0, buf.Length);
return result;
}
else if (message is StreamMessage)
{
StreamMessage streamMessage = message as StreamMessage;
Message result = new Message(streamMessage.Content, 0, streamMessage.Content.Length);
return result;
}
else if (message is BaseMessage)
{
Message result = new Message();
return result;
}
else
{
throw new Exception("unhandled message type");
}
}
protected virtual BaseMessage CreateNmsMessage(Message message)
{
BaseMessage result = null;
if ("amqp/map" == message.ContentType)
{
// TODO: Return map message
}
else if ("amqp/list" == message.ContentType)
{
// TODO: Return list message
}
else
{
TextMessage textMessage = new TextMessage();
textMessage.Text = message.GetContent();
result = textMessage;
}
return result;
}
#endregion
#region Address/Destination Conversion Methods
public Address ToAmqpAddress(IDestination destination)
{
if (null == destination)
{
return null;
}
return new Address((destination as Destination).Path);
}
protected virtual IDestination ToNmsDestination(Address destinationQueue)
{
if (null == destinationQueue)
{
return null;
}
return new Queue(destinationQueue.ToString());
}
#endregion
#region PrimitiveMap Conversion Methods
//
public void SetNmsPrimitiveMap(IPrimitiveMap map, Dictionary<string, object> dict)
{
// TODO: lock?
map.Clear();
foreach (System.Collections.Generic.KeyValuePair
<string, object> kvp in dict)
{
map[kvp.Key] = kvp.Value;
}
}
//
public Dictionary<string, object> FromNmsPrimitiveMap(IPrimitiveMap pm)
{
Dictionary<string, object> dict = new Dictionary<string,object>();
// TODO: lock?
ICollection keys = pm.Keys;
foreach (object key in keys)
{
dict.Add(key.ToString(), pm[key.ToString()]);
}
return dict;
}
#endregion
}
}