blob: 818bfe34f48948dc86071a36ebd2982f10b358a0 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using Amqp;
using Amqp.Framing;
using Amqp.Types;
using System.Web.Script.Serialization;
//using Newtonsoft.Json;
namespace Qpidit
{
/// <summary>
/// MessageValue holds a typed AMQP.Net Lite message body AmqpValue,
/// decodes it, and returns the QpidIT string that represents the message.
/// Complex values List and Map are constructed recursively.
/// Remaining singleton values like int or long are held directly as strings.
/// </summary>
class MessageValue
{
// Original message body object
private object messageValue;
// Computed object type names
private string liteType;
private string qpiditType;
// Singleton decoded value in QpidIt format
private string valueString;
// Storage for List
private List<MessageValue> valueList;
// Storage for Map
// Kept as lists to avoid dictionary reordering complications.
private List<MessageValue> valueMapKeys;
private List<MessageValue> valueMapValues;
/// <summary>
/// Constructor.
/// User must call Decode to parse values.
/// </summary>
/// <param name="messageValue">Lite AmqpValue</param>
public MessageValue(object messageValue)
{
this.messageValue = messageValue;
liteType = "";
qpiditType = "";
valueString = null;
valueList = null;
valueMapKeys = null;
valueMapValues = null;
}
/// <summary>
/// Lite type name property
/// </summary>
public string LiteTypeName { get { return liteType; } }
/// <summary>
/// QpidIt type name property
/// </summary>
public string QpiditTypeName { get { return qpiditType; } }
/// <summary>
/// Decode the value and contained sub-values.
/// </summary>
/// <returns></returns>
public override string ToString()
{
if (valueString != null)
{
return "\"" + valueString + "\"";
}
else if (valueList != null)
{
StringBuilder sb = new StringBuilder();
bool leading = false;
sb.Append("[");
foreach (var item in valueList)
{
if (leading) sb.Append(", ");
leading = true;
sb.Append(item.ToString());
}
sb.Append("]");
return sb.ToString();
}
else if (valueMapKeys != null)
{
StringBuilder msb = new StringBuilder();
msb.Append("{");
for (int i = 0; i < valueMapKeys.Count; i++)
{
if (i > 0) msb.Append(", ");
msb.Append(valueMapKeys[i].ToString());
msb.Append(":");
msb.Append(valueMapValues[i].ToString());
}
msb.Append("}");
return msb.ToString();
}
else
{
throw new ApplicationException("printing undecoded MessageValue");
}
}
/// <summary>
/// Compute contents of byte array in reverse order.
/// </summary>
/// <param name="input">The byte array.</param>
/// <param name="suppressLeading0s">Flag controls suppression of leading zeros.</param>
/// <returns>Hexadecimal string</returns>
public string BytesReversedToString(byte[] input, bool suppressLeading0s = false)
{
if (!BitConverter.IsLittleEndian)
{
throw new ApplicationException("This code requires a little endian host.");
}
string result = "";
for (int i = input.Length - 1; i >= 0; i--)
{
result += String.Format("{0:x2}", input[i]);
}
if (suppressLeading0s)
{
result = result.TrimStart('0');
}
if (String.IsNullOrEmpty(result))
{
result = "0";
}
return result;
}
/// <summary>
/// Json-encode an array of bytes assumed to be plain ascii text.
/// </summary>
/// <param name="theBytes">The byte array.</param>
/// <returns>Serialized string</returns>
public string SerializeBytes(byte[] theBytes)
{
StringBuilder builder = new StringBuilder();
byte asciiSlash = Encoding.Default.GetBytes("\\")[0];
byte asciiDoubleQuote = Encoding.Default.GetBytes("\"")[0];
foreach (byte b in theBytes)
{
if (b == asciiSlash) {
// backslash needs backslash escape
builder.Append((char)asciiSlash);
builder.Append((char)asciiSlash);
}
else if (b == asciiDoubleQuote) {
// doublequote needs backslash escape
builder.Append((char)asciiSlash);
builder.Append((char)asciiDoubleQuote);
}
else if (b >= 32 && b <= 127)
// printable ascii passed through
builder.Append((char)b);
else
// non-printable ascii as escaped hex bytes
builder.Append(String.Format("\\x{0:x2}", b));
}
return builder.ToString();
}
/// <summary>
/// Json-encode an object whose string value is assumed to be plain ascii text.
/// </summary>
/// <param name="theObject">The object.</param>
/// <returns>Serialized string</returns>
public string SerializeString(object theObject)
{
byte[] bytes = Encoding.ASCII.GetBytes(theObject.ToString());
return SerializeBytes(bytes);
}
/// <summary>
/// Decode message body's object type names and QpidIt display details.
/// Recursively process maps and lists.
/// </summary>
public void Decode()
{
if (messageValue == null)
{
liteType = "null";
qpiditType = "null";
valueString = "None";
return;
}
liteType = messageValue.GetType().Name;
if (liteType == "List")
{
qpiditType = "list";
valueList = new List<MessageValue>();
List list = (List)messageValue;
foreach (var item in list)
{
MessageValue itemValue = new Qpidit.MessageValue(item);
itemValue.Decode();
valueList.Add(itemValue);
}
}
else if (liteType == "Map")
{
qpiditType = "map";
valueMapKeys = new List<MessageValue>();
valueMapValues = new List<MessageValue>();
Map map = (Map)messageValue;
foreach (var key in map.Keys)
{
MessageValue valueKey = new Qpidit.MessageValue(key);
valueKey.Decode();
MessageValue valueValue = new Qpidit.MessageValue(map[key]);
valueValue.Decode();
valueMapKeys.Add(valueKey);
valueMapValues.Add(valueValue);
}
}
else
{
//Console.WriteLine("MessageValue singleton type : {0}", liteType);
switch (liteType)
{
case "Boolean":
qpiditType = "boolean";
valueString = (Boolean)messageValue ? "True" : "False";
break;
case "Byte":
qpiditType = "ubyte";
valueString = String.Format("0x{0:x}", (Byte)messageValue);
break;
case "UInt16":
qpiditType = "ushort";
valueString = String.Format("0x{0:x}", (UInt16)messageValue);
break;
case "UInt32":
qpiditType = "uint";
valueString = String.Format("0x{0:x}", (UInt32)messageValue);
break;
case "UInt64":
qpiditType = "ulong";
valueString = String.Format("0x{0:x}", (UInt64)messageValue);
break;
case "SByte":
qpiditType = "byte";
SByte mySbyte = (SByte)messageValue;
if (mySbyte >= 0)
{
valueString = String.Format("0x{0:x}", mySbyte);
}
else
{
mySbyte = (SByte) (-mySbyte);
valueString = String.Format("-0x{0:x}", mySbyte);
}
break;
case "Int16":
qpiditType = "short";
Int16 myInt16 = (Int16)messageValue;
if (myInt16 >= 0)
{
valueString = String.Format("0x{0:x}", myInt16);
}
else
{
myInt16 = (Int16)(-myInt16);
valueString = String.Format("-0x{0:x}", myInt16);
}
break;
case "Int32":
qpiditType = "int";
Int32 myInt32 = (Int32)messageValue;
if (myInt32 >= 0)
{
valueString = String.Format("0x{0:x}", myInt32);
}
else
{
myInt32 = (Int32)(-myInt32);
valueString = String.Format("-0x{0:x}", myInt32);
}
break;
case "Int64":
qpiditType = "long";
Int64 myInt64 = (Int64)messageValue;
if (myInt64 >= 0)
{
valueString = String.Format("0x{0:x}", myInt64);
}
else
{
myInt64 = (Int64)(-myInt64);
valueString = String.Format("-0x{0:x}", myInt64);
}
break;
case "Single":
byte[] sbytes = BitConverter.GetBytes((Single)messageValue);
qpiditType = "float";
valueString = "0x" + BytesReversedToString(sbytes);
break;
case "Double":
byte[] dbytes = BitConverter.GetBytes((Double)messageValue);
qpiditType = "double";
valueString = "0x" + BytesReversedToString(dbytes);
break;
case "DateTime":
// epochTicks is the number of 100uSec ticks between 01/01/0001
// and 01/01/1970. Used to adjust between DateTime and unix epoch.
const long epochTicks = 621355968000000000;
byte[] dtbytes = BitConverter.GetBytes(
(((DateTime)messageValue).Ticks - epochTicks) / TimeSpan.TicksPerMillisecond);
qpiditType = "timestamp";
valueString = "0x" + BytesReversedToString(dtbytes, true);
break;
case "Guid":
qpiditType = "uuid";
valueString = messageValue.ToString();
break;
case "Byte[]":
qpiditType = "binary";
byte[] binstr = (byte[])messageValue;
valueString = SerializeBytes(binstr);
break;
case "String":
qpiditType = "string";
valueString = SerializeString(messageValue);
break;
case "Symbol":
qpiditType = "symbol";
valueString = SerializeString(messageValue);
break;
default:
qpiditType = "unknown";
valueString = String.Format("Unknown AMQP type: {0}", liteType);
throw new ApplicationException(valueString);
}
}
}
}
/// <summary>
/// Receive and categorize some number of messages.
/// </summary>
class Receiver
{
private string brokerUrl;
private string queueName;
private string amqpType;
private Int32 nExpected;
private Int32 nReceived;
private List<MessageValue> receivedMessageValues;
public Receiver(string brokerUrl_, string queueName_, string amqpType_, Int32 expected_)
{
brokerUrl = brokerUrl_;
queueName = queueName_;
amqpType = amqpType_;
nExpected = expected_;
nReceived = 0;
receivedMessageValues = new List<MessageValue>();
}
~Receiver()
{ }
public string ReceivedValueList
{
get
{
StringBuilder sb = new StringBuilder();
sb.Append("[");
for (int i = 0; i < receivedMessageValues.Count; i++)
{
if (i > 0) sb.Append(", ");
sb.Append(receivedMessageValues[i].ToString());
}
sb.Append("]");
return sb.ToString();
}
}
public void run()
{
ManualResetEvent receiverAttached = new ManualResetEvent(false);
OnAttached onReceiverAttached = (l, a) => { receiverAttached.Set(); };
Address address = new Address(string.Format("amqp://{0}", brokerUrl));
Connection connection = new Connection(address);
Session session = new Session(connection);
ReceiverLink receiverlink = new ReceiverLink(session,
"Lite-amqp-types-test-receiver",
new Source() { Address = queueName },
onReceiverAttached);
if (receiverAttached.WaitOne(10000))
{
while (nReceived < nExpected)
{
Message message = receiverlink.Receive(10000);
if (message != null)
{
nReceived += 1;
receiverlink.Accept(message);
MessageValue mv = new MessageValue(message.Body);
mv.Decode();
if (mv.QpiditTypeName != amqpType)
{
throw new ApplicationException(string.Format
("Incorrect AMQP type found in message body: expected: {0}; found: {1}",
amqpType, mv.QpiditTypeName));
}
//Console.WriteLine("{0} [{1}]", mv.QpiditTypeName, mv.ToString());
receivedMessageValues.Add(mv);
}
else
{
throw new ApplicationException(string.Format(
"Time out receiving message {0} of {1}", nReceived+1, nExpected));
}
}
}
else
{
throw new ApplicationException(string.Format(
"Time out attaching to test broker {0} queue {1}", brokerUrl, queueName));
}
receiverlink.Close();
session.Close();
connection.Close();
}
}
class MainProgram
{
static int Main(string[] args)
{
/*
* --- main ---
* Args: 1: Broker address (ip-addr:port)
* 2: Queue name
* 3: QPIDIT AMQP type name of expected message body values
* 4: Expected number of test values to receive
*/
int exitCode = 0;
try
{
if (args.Length != 4)
{
throw new ApplicationException(
"program requires four arguments: brokerAddr queueName amqpType nValues");
}
//Trace.TraceLevel = TraceLevel.Frame | TraceLevel.Verbose;
//Trace.TraceListener = (f, a) => Console.WriteLine(DateTime.Now.ToString("[hh:mm:ss.fff]") + " " + string.Format(f, a));
Receiver receiver = new Qpidit.Receiver(
args[0], args[1], args[2], Int32.Parse(args[3]));
receiver.run();
Console.WriteLine(args[2]);
Console.WriteLine("{0}", receiver.ReceivedValueList);
}
catch (Exception e)
{
string firstline = new StringReader(e.ToString()).ReadLine();
Console.Error.WriteLine("AmqpSender error: {0}.", firstline);
exitCode = 1;
}
return exitCode;
}
}
}