blob: 978083fad5c7ae930a4368076f83e403c8e9b1a5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections;
using System.Collections.Generic;
using Apache.Qpid.Proton.Buffer;
using Apache.Qpid.Proton.Client.Exceptions;
using Apache.Qpid.Proton.Codec;
using Apache.Qpid.Proton.Types;
using Apache.Qpid.Proton.Types.Messaging;
namespace Apache.Qpid.Proton.Client.Implementation
{
public static class ClientMessageSupport
{
private static readonly IEncoder DEFAULT_ENCODER = CodecFactory.DefaultEncoder;
private static readonly IDecoder DEFAULT_DECODER = CodecFactory.DefaultDecoder;
/// <summary>
/// Converts an unknown Message instance into a client Message instance
/// either by cast or by construction of a new instance with a copy of the
/// values carried in the given message.
/// </summary>
/// <typeparam name="T">The type of body the message carries</typeparam>
/// <param name="message">The message to convert</param>
/// <returns>A converted client message instance from the source message.</returns>
public static IAdvancedMessage<T> ConvertMessage<T>(IMessage<T> message)
{
if (message is IAdvancedMessage<T>)
{
return (IAdvancedMessage<T>)message;
}
else
{
try
{
return message.ToAdvancedMessage();
}
catch (NotImplementedException)
{
return ConvertFromOutsideMessage(message);
}
}
}
/// <summary>
/// Simple encode of section instance into a given buffer using the default encoder
/// </summary>
/// <param name="section">The section to encode</param>
/// <param name="buffer">The buffer to encode into and return</param>
/// <returns>The provided buffer with the encoded bytes added.</returns>
public static IProtonBuffer EncodeSection(ISection section, IProtonBuffer buffer)
{
DEFAULT_ENCODER.WriteObject(buffer, DEFAULT_ENCODER.NewEncoderState(), section);
return buffer;
}
/// <summary>
/// Given a value of some type attempt to convert to the most appropriate AMQP
/// body section type and return that to the caller.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="body"></param>
/// <returns></returns>
public static ISection CreateSectionFromValue<T>(T body)
{
if (body == null)
{
return null;
}
else if (body is byte[])
{
return new Data(body as byte[]);
}
else if (body is IList)
{
return new AmqpSequence(body as IList);
}
else
{
return new AmqpValue(body);
}
}
#region Message Encoding support API
public static IProtonBuffer EncodeMessage<T>(IAdvancedMessage<T> message, IDictionary<string, object> deliveryAnnotations)
{
return EncodeMessage(DEFAULT_ENCODER, DEFAULT_ENCODER.NewEncoderState(), ProtonByteBufferAllocator.Instance, message, deliveryAnnotations);
}
public static IProtonBuffer EncodeMessage<T>(IEncoder encoder, IProtonBufferAllocator allocator, IAdvancedMessage<T> message, IDictionary<string, object> deliveryAnnotations)
{
return EncodeMessage(encoder, encoder.NewEncoderState(), ProtonByteBufferAllocator.Instance, message, deliveryAnnotations);
}
public static IProtonBuffer EncodeMessage<T>(IEncoder encoder, IEncoderState encoderState, IProtonBufferAllocator allocator, IAdvancedMessage<T> message, IDictionary<string, object> deliveryAnnotations)
{
IProtonBuffer buffer = allocator.Allocate();
Header header = message.Header;
MessageAnnotations messageAnnotations = message.Annotations;
Properties properties = message.Properties;
ApplicationProperties applicationProperties = message.ApplicationProperties;
Footer footer = message.Footer;
if (header != null)
{
encoder.WriteObject(buffer, encoderState, header);
}
if (deliveryAnnotations != null)
{
encoder.WriteObject(buffer, encoderState, new DeliveryAnnotations(ClientConversionSupport.ToSymbolKeyedMap(deliveryAnnotations)));
}
if (messageAnnotations != null)
{
encoder.WriteObject(buffer, encoderState, messageAnnotations);
}
if (properties != null)
{
encoder.WriteObject(buffer, encoderState, properties);
}
if (applicationProperties != null)
{
encoder.WriteObject(buffer, encoderState, applicationProperties);
}
message.ForEachBodySection(section => encoder.WriteObject(buffer, encoderState, section));
if (footer != null)
{
encoder.WriteObject(buffer, encoderState, footer);
}
return buffer;
}
#endregion
#region Message Decoding support API
public static ClientMessage<object> DecodeMessage(IProtonBuffer buffer, Action<DeliveryAnnotations> daConsumer)
{
return DecodeMessage(DEFAULT_DECODER, DEFAULT_DECODER.NewDecoderState(), buffer, daConsumer);
}
public static ClientMessage<object> DecodeMessage(IDecoder decoder, IProtonBuffer buffer, Action<DeliveryAnnotations> daConsumer)
{
return DecodeMessage(decoder, decoder.NewDecoderState(), buffer, daConsumer);
}
public static ClientMessage<object> DecodeMessage(IDecoder decoder, IDecoderState decoderState,
IProtonBuffer buffer, Action<DeliveryAnnotations> daConsumer)
{
ClientMessage<object> message = new ClientMessage<object>();
ISection section = null;
while (buffer.IsReadable)
{
try
{
section = (ISection)decoder.ReadObject(buffer, decoderState);
}
catch (Exception e)
{
throw ClientExceptionSupport.CreateNonFatalOrPassthrough(e);
}
switch (section.Type)
{
case SectionType.Header:
message.Header = (Header)section;
break;
case SectionType.DeliveryAnnotations:
if (daConsumer != null)
{
daConsumer.Invoke((DeliveryAnnotations)section);
}
break;
case SectionType.MessageAnnotations:
message.Annotations = (MessageAnnotations)section;
break;
case SectionType.Properties:
message.Properties = (Properties)section;
break;
case SectionType.ApplicationProperties:
message.ApplicationProperties = (ApplicationProperties)section;
break;
case SectionType.Data:
case SectionType.AmqpSequence:
case SectionType.AmqpValue:
message.AddBodySection(section);
break;
case SectionType.Footer:
message.Footer = (Footer)section;
break;
default:
throw new ClientException("Unknown Message Section forced decode abort.");
}
}
return message;
}
#endregion
#region Private supporting conversion methods
private static ClientMessage<T> ConvertFromOutsideMessage<T>(IMessage<T> source)
{
Header header = new Header();
header.Durable = source.Durable;
header.Priority = source.Priority;
header.TimeToLive = source.TimeToLive;
header.FirstAcquirer = source.FirstAcquirer;
header.DeliveryCount = source.DeliveryCount;
byte[] userId = source.UserId;
Properties properties = new Properties();
properties.MessageId = source.MessageId;
properties.UserId = userId != null ? ProtonByteBufferAllocator.Instance.Wrap(userId) : null;
properties.To = source.To;
properties.Subject = source.Subject;
properties.ReplyTo = source.ReplyTo;
properties.CorrelationId = source.CorrelationId;
properties.ContentType = source.ContentType;
properties.ContentEncoding = source.ContentEncoding;
properties.AbsoluteExpiryTime = source.AbsoluteExpiryTime;
properties.CreationTime = source.CreationTime;
properties.GroupId = source.GroupId;
properties.GroupSequence = source.GroupSequence;
properties.ReplyToGroupId = source.ReplyToGroupId;
MessageAnnotations messageAnnotations;
if (source.HasAnnotations)
{
messageAnnotations = new MessageAnnotations(new Dictionary<Symbol, object>());
source.ForEachAnnotation((key, value) =>
{
messageAnnotations.Value.Add(Symbol.Lookup(key), value);
});
}
else
{
messageAnnotations = null;
}
ApplicationProperties applicationProperties;
if (source.HasProperties)
{
applicationProperties = new ApplicationProperties(new Dictionary<string, object>());
source.ForEachProperty((key, value) =>
{
applicationProperties.Value.Add(key, value);
});
}
else
{
applicationProperties = null;
}
Footer footer;
if (source.HasFooters)
{
footer = new Footer(new Dictionary<Symbol, object>());
source.ForEachFooter((key, value) =>
{
footer.Value.Add(Symbol.Lookup(key), value);
});
}
else
{
footer = null;
}
ClientMessage<T> message = new ClientMessage<T>(CreateSectionFromValue(source.Body));
message.Header = header;
message.Properties = properties;
message.Annotations = messageAnnotations;
message.ApplicationProperties = applicationProperties;
message.Footer = footer;
return message;
}
#endregion
}
}