blob: 95f14654af6a691b81dcf3b3cc5a0240bcdfebbc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Apache.NMS;
using Amqp.Types;
using Amqp.Framing;
namespace Apache.NMS.AMQP.Message.AMQP
{
using Util;
using Cloak;
class AMQPMessageBuilder
{
public static IMessage CreateProviderMessage(MessageConsumer consumer, Amqp.Message message)
{
IMessage msg = null;
msg = CreateFromMessageAnnontations(consumer, message);
if(msg == null)
{
msg = CreateFromMessageBody(consumer, message);
}
if(msg == null)
{
throw new NMSException("Could not create NMS Message.");
}
return msg;
}
private static IMessage CreateFromMessageBody(MessageConsumer consumer, Amqp.Message message)
{
IMessage msg = null;
object body = message.Body;
if(body == null)
{
if (IsContentType(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message))
{
msg = CreateObjectMessage(consumer, message);
}
else if (IsContentType(SymbolUtil.OCTET_STREAM_CONTENT_TYPE, message) || IsContentType(null, message))
{
msg = CreateBytesMessage(consumer, message);
}
else
{
Symbol contentType = GetContentType(message);
if(contentType != null)
{
msg = CreateTextMessage(consumer, message);
}
else
{
msg = CreateMessage(consumer, message);
}
}
}
else if (message.BodySection is Data)
{
if(IsContentType(SymbolUtil.OCTET_STREAM_CONTENT_TYPE, message) || IsContentType(null, message))
{
msg = CreateBytesMessage(consumer, message);
}
else if (IsContentType(SymbolUtil.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message))
{
msg = CreateObjectMessage(consumer, message);
}
else
{
Symbol contentType = GetContentType(message);
if(contentType != null)
{
msg = CreateTextMessage(consumer, message);
}
else
{
msg = CreateBytesMessage(consumer, message);
}
}
}
else if (message.BodySection is AmqpSequence)
{
msg = CreateObjectMessage(consumer, message);
}
else if (body is string)
{
msg = CreateTextMessage(consumer, message);
}
else if (body is byte[])
{
msg = CreateBytesMessage(consumer, message);
}
else
{
msg = CreateObjectMessage(consumer, message);
}
return msg;
}
private static Symbol GetContentType(Amqp.Message message)
{
Properties msgProps = message.Properties;
if (msgProps == null)
{
return null;
}
else
{
return msgProps.ContentType;
}
}
private static bool IsContentType(Symbol type, Amqp.Message message)
{
Symbol contentType = GetContentType(message);
if (contentType == null)
{
return type == null;
}
else
{
return type.Equals(contentType);
}
}
private static IMessage CreateFromMessageAnnontations(MessageConsumer consumer, Amqp.Message message)
{
IMessage msg = null;
object objVal = message.MessageAnnotations[SymbolUtil.JMSX_OPT_MSG_TYPE];
if(objVal != null && objVal is SByte)
{
byte type = Convert.ToByte(objVal);
switch (type)
{
case MessageSupport.JMS_TYPE_MSG:
msg = CreateMessage(consumer, message);
break;
case MessageSupport.JMS_TYPE_BYTE:
msg = CreateBytesMessage(consumer, message);
break;
case MessageSupport.JMS_TYPE_TXT:
msg = CreateTextMessage(consumer, message);
break;
case MessageSupport.JMS_TYPE_OBJ:
msg = CreateObjectMessage(consumer, message);
break;
case MessageSupport.JMS_TYPE_STRM:
msg = CreateStreamMessage(consumer, message);
break;
case MessageSupport.JMS_TYPE_MAP:
msg = CreateMapMessage(consumer, message);
break;
default:
throw new NMSException("Unsupported Msg Annontation type: " + type);
}
}
return msg;
}
private static IMessage CreateMessage(MessageConsumer consumer, Amqp.Message message)
{
IMessageCloak cloak = new AMQPMessageCloak(consumer, message);
return new Message(cloak);
}
private static IMessage CreateTextMessage(MessageConsumer consumer, Amqp.Message message)
{
ITextMessageCloak cloak = new AMQPTextMessageCloak(consumer, message);
return new TextMessage(cloak);
}
private static IMessage CreateStreamMessage(MessageConsumer consumer, Amqp.Message message)
{
IStreamMessageCloak cloak = new AMQPStreamMessageCloak(consumer, message);
return new StreamMessage(cloak);
}
private static IMessage CreateObjectMessage(MessageConsumer consumer, Amqp.Message message)
{
IObjectMessageCloak cloak = new AMQPObjectMessageCloak(consumer, message);
return new ObjectMessage(cloak);
}
private static IMessage CreateMapMessage(MessageConsumer consumer, Amqp.Message message)
{
IMapMessageCloak cloak = new AMQPMapMessageCloak(consumer, message);
return new MapMessage(cloak);
}
private static IMessage CreateBytesMessage(MessageConsumer consumer, Amqp.Message message)
{
IBytesMessageCloak cloak = new AMQPBytesMessageCloak(consumer, message);
return new BytesMessage(cloak);
}
}
}