blob: ff767325dffc6492a484c96e63e3f35284ed2d10 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Web.Script.Serialization;
using Amqp;
using Amqp.Framing;
using Amqp.Types;
namespace Qpidit
{
/// <summary>
/// MessageValue holds a QpidIT type name and a json object created from the
/// CLI string argument, encodes it, and returns the object to be used as the
/// constructor for a message to be sent.
/// Complex values List and Map are constructed recursively.
/// Remaining singleton values like int or long are held directly as objects.
/// </summary>
class MessageValue
{
// Original type and json object
private string baseType;
private object baseValue;
// has Encode been called?
private Boolean encoded;
// Simple objects completely encoded
// List and Map defined recursively
private object valueDirect;
/// <summary>
/// Constructor
/// </summary>
/// <param name="type">qpidit type name</param>
/// <param name="value">json encoded object</param>
public MessageValue(string type, object value)
{
baseType = type;
baseValue = value;
encoded = false;
valueDirect = null;
}
/// <summary>
/// Create a MessageValue without knowing beforehand what
/// type of system object is being handled.
/// * Objects created in the list from the command line have a
/// known type and are created via the constructor.
/// * Objects created inside List and Map have only string
/// type externally but need an actual type to be figured out.
/// </summary>
/// <param name="obj"></param>
/// <returns></returns>
public static MessageValue CreateAutoType(object obj)
{
return new Qpidit.MessageValue(QpiditTypeOf(obj), obj);
}
/// <summary>
/// Return the native object that represents the encoded
/// Amqp value.
/// </summary>
/// <returns></returns>
public Object ToObject()
{
if (!encoded)
Encode();
return valueDirect;
}
/// <summary>
/// Return the Amqp Message that holds this value.
/// </summary>
/// <returns></returns>
public Message ToMessage()
{
return new Message(this.ToObject());
}
/// <summary>
/// Get an object's QPIDIT type
/// </summary>
/// <param name="obj">a .NET object</param>
/// <returns>QpidIT type name</returns>
public static string QpiditTypeOf(object obj)
{
string typename = obj.GetType().Name;
string qpiditType = null;
if (obj == null)
{
qpiditType = "null";
}
else
{
switch (typename)
{
case "Boolean":
qpiditType = "boolean";
break;
case "Byte":
qpiditType = "ubyte";
break;
case "UInt16":
qpiditType = "ushort";
break;
case "UInt32":
qpiditType = "uint";
break;
case "UInt64":
qpiditType = "ulong";
break;
case "SByte":
qpiditType = "byte";
break;
case "Int16":
qpiditType = "short";
break;
case "Int32":
qpiditType = "int";
break;
case "Int64":
qpiditType = "long";
break;
case "Single":
qpiditType = "float";
break;
case "Double":
qpiditType = "double";
break;
case "DateTime":
qpiditType = "timestamp";
break;
case "Guid":
qpiditType = "uuid";
break;
case "Byte[]":
qpiditType = "binary";
break;
case "String":
qpiditType = "string";
break;
case "Symbol":
qpiditType = "symbol";
break;
case "Array":
qpiditType = "list";
break;
case "ArrayList":
qpiditType = "list";
break;
case "Dictionary":
qpiditType = "map";
break;
case "Dictionary`2":
qpiditType = "map";
break;
default:
throw new ApplicationException(String.Format(
"Can not translate system type {0} to a QpidIT type", typename));
}
}
return qpiditType;
}
public string StripLeading0x(string value)
{
if (!value.StartsWith("0x"))
throw new ApplicationException(String.Format(
"EncodeUInt string does not start with '0x' : {0}", value));
return value.Substring(2);
}
public UInt64 EncodeUInt(string value)
{
UInt64 result = 0;
value = StripLeading0x(value);
result = UInt64.Parse(value, System.Globalization.NumberStyles.HexNumber);
return result;
}
public Int64 EncodeInt(string value)
{
Int64 result = 0;
bool isNegated = value.StartsWith("-");
if (isNegated)
value = value.Substring(1);
value = StripLeading0x(value);
result = Int64.Parse(value, System.Globalization.NumberStyles.HexNumber);
if (isNegated)
result = -result;
return result;
}
/// <summary>
/// Generate the object used to create a message
/// </summary>
public void Encode()
{
if (baseValue is Array)
{
// List
if (! String.Equals(baseType, "list", StringComparison.OrdinalIgnoreCase))
{
throw new ApplicationException(String.Format(
"Sender asked to encode a {0} but received a list: {1}", baseType, baseValue.ToString()));
}
valueDirect = new Amqp.Types.List();
foreach (object item in (Array)baseValue)
{
MessageValue itemValue = MessageValue.CreateAutoType(item);
itemValue.Encode();
((Amqp.Types.List)valueDirect).Add(itemValue.ToObject());
}
}
if (baseValue is ArrayList)
{
// List
if (! String.Equals(baseType, "list", StringComparison.OrdinalIgnoreCase))
{
throw new ApplicationException(String.Format(
"Sender asked to encode a {0} but received a list: {1}", baseType, baseValue.ToString()));
}
valueDirect = new Amqp.Types.List();
foreach (object item in (ArrayList)baseValue)
{
MessageValue itemValue = MessageValue.CreateAutoType(item);
itemValue.Encode();
((Amqp.Types.List)valueDirect).Add(itemValue.ToObject());
}
}
else if (baseValue is Dictionary<string, object>)
{
// Map
if (!String.Equals(baseType, "map", StringComparison.OrdinalIgnoreCase))
{
throw new ApplicationException(String.Format(
"Sender asked to encode a {0} but received a map: {1}", baseType, baseValue.ToString()));
}
valueDirect = new Amqp.Types.Map();
Dictionary<string, object> myDict = new Dictionary<string, object>();
myDict = (Dictionary<string, object>)baseValue;
foreach (var key in myDict.Keys)
{
MessageValue itemValue = MessageValue.CreateAutoType(myDict[key]);
((Amqp.Types.Map)valueDirect)[key] = itemValue.ToObject();
}
}
else if (baseValue is String)
{
string value;
UInt64 valueUInt64;
Int64 valueInt64;
switch (baseType)
{
case "null":
valueDirect = null;
break;
case "boolean":
value = (string)baseValue;
bool mybool = String.Equals(value, "true", StringComparison.OrdinalIgnoreCase);
valueDirect = mybool;
break;
case "ubyte":
value = (string)baseValue;
valueUInt64 = EncodeUInt(value);
Byte mybyte = (Byte)(valueUInt64 & 0xff);
valueDirect = mybyte;
break;
case "ushort":
value = (string)baseValue;
valueUInt64 = EncodeUInt(value);
UInt16 myuint16 = (UInt16)(valueUInt64 & 0xffff);
valueDirect = myuint16;
break;
case "uint":
value = (string)baseValue;
valueUInt64 = EncodeUInt(value);
UInt32 myuint32 = (UInt32)(valueUInt64 & 0xffffffff);
valueDirect = myuint32;
break;
case "ulong":
value = (string)baseValue;
valueUInt64 = EncodeUInt(value);
valueDirect = valueUInt64;
break;
case "byte":
value = (string)baseValue;
valueInt64 = EncodeInt(value);
SByte mysbyte = (SByte)(valueInt64 & 0xff);
valueDirect = mysbyte;
break;
case "short":
value = (string)baseValue;
valueInt64 = EncodeInt(value);
Int16 myint16 = (Int16)(valueInt64 & 0xffff);
valueDirect = myint16;
break;
case "int":
value = (string)baseValue;
valueInt64 = EncodeInt(value);
Int32 myint32 = (Int32)(valueInt64 & 0xffffffff);
valueDirect = myint32;
break;
case "long":
value = (string)baseValue;
valueInt64 = EncodeInt(value);
valueDirect = valueInt64;
break;
case "float":
value = StripLeading0x((string)baseValue);
UInt32 num32 = UInt32.Parse(value, System.Globalization.NumberStyles.AllowHexSpecifier);
byte[] floatVals = BitConverter.GetBytes(num32);
float flt = BitConverter.ToSingle(floatVals, 0);
valueDirect = flt;
break;
case "double":
value = StripLeading0x((string)baseValue);
UInt64 num64 = UInt64.Parse(value, System.Globalization.NumberStyles.AllowHexSpecifier);
byte[] doubleVals = BitConverter.GetBytes(num64);
double dbl = BitConverter.ToDouble(doubleVals, 0);
valueDirect = dbl;
break;
case "timestamp":
// epochTicks is the number of 100uSec ticks between 01/01/0001
// and 01/01/1970. Used to adjust between DateTime and unix epoch.
const long epochTicks = 621355968000000000;
value = StripLeading0x((string)baseValue);
Int64 dtticks = Int64.Parse(value, System.Globalization.NumberStyles.AllowHexSpecifier);
dtticks *= TimeSpan.TicksPerMillisecond;
dtticks += epochTicks;
DateTime dt = new DateTime(dtticks, DateTimeKind.Utc);
valueDirect = dt;
break;
case "uuid":
value = (string)baseValue;
Guid guid = new Guid(value);
valueDirect = guid;
break;
case "binary":
// TODO: fix this
value = (string)baseValue;
byte[] binval = Encoding.ASCII.GetBytes(value);
valueDirect = binval;
break;
case "string":
valueDirect = (string)baseValue;
break;
case "symbol":
Symbol sym = new Symbol((string)baseValue);
valueDirect = sym;
break;
case "list":
throw new ApplicationException(String.Format(
"Sender asked to encode a list but received a string: {0}", baseValue));
case "map":
throw new ApplicationException(String.Format(
"Sender asked to encode a map but received a string: {0}", baseValue));
case "decimal32":
case "decimal64":
case "decimal128":
throw new ApplicationException(String.Format(
"AMQP.Net Lite does not support AMQP decimal type: {0}", baseType));
default:
throw new ApplicationException(String.Format(
"Sender can not encode base type: {0}", baseType));
}
}
else
{
throw new ApplicationException(String.Format(
"Sender can not encode object type {0}", baseValue.GetType().Name));
}
encoded = true;
}
}
class Sender
{
private string brokerUrl;
private string queueName;
private string amqpType;
private string jsonMessages;
public Sender(string brokerUrl_, string queueName_, string amqpType_, string jsonMessages_)
{
brokerUrl = brokerUrl_;
queueName = queueName_;
amqpType = amqpType_;
jsonMessages = jsonMessages_;
}
~Sender()
{ }
public void run()
{
List<Message> messagesToSend = new List<Message>();
// Deserialize the json message list
JavaScriptSerializer jss = new JavaScriptSerializer();
var itMsgs = jss.Deserialize<dynamic>(jsonMessages);
//if (!(itMsgs is Array))
// throw new ApplicationException(String.Format(
// "Messages are not formatted as a json list: {0}, but as type: {1}", jsonMessages, itMsgs.GetType().Name));
// Generate messages
foreach (object itMsg in itMsgs)
{
MessageValue mv = new MessageValue(amqpType, itMsg);
mv.Encode();
messagesToSend.Add(mv.ToMessage());
}
// Send the messages
ManualResetEvent senderAttached = new ManualResetEvent(false);
OnAttached onSenderAttached = (l, a) => { senderAttached.Set(); };
Address address = new Address(string.Format("amqp://{0}", brokerUrl));
Connection connection = new Connection(address);
Session session = new Session(connection);
SenderLink sender = new SenderLink(session,
"Lite-amqp-types-test-sender",
new Target() { Address = queueName },
onSenderAttached);
if (senderAttached.WaitOne(10000))
{
foreach (Message message in messagesToSend)
{
sender.Send(message);
}
}
else
{
throw new ApplicationException(string.Format(
"Time out attaching to test broker {0} queue {1}", brokerUrl, queueName));
}
sender.Close();
session.Close();
connection.Close();
}
}
class MainProgram
{
static int Main(string[] args)
{
/*
* --- main ---
* Args: 1: Broker address (ip-addr:port)
* 2: Queue name
* 3: AMQP type
* 4: Test value(s) as JSON string
*/
int exitCode = 0;
try
{
if (args.Length != 4)
{
throw new ApplicationException(
"program requires four arguments: brokerAddr queueName amqpType jsonValuesToSend");
}
//Trace.TraceLevel = TraceLevel.Frame | TraceLevel.Verbose;
//Trace.TraceListener = (f, a) => Console.WriteLine(DateTime.Now.ToString("[hh:mm:ss.fff]") + " " + string.Format(f, a));
Sender sender = new Qpidit.Sender(args[0], args[1], args[2], args[3]);
sender.run();
}
catch (Exception e)
{
string firstline = new StringReader(e.ToString()).ReadLine();
Console.Error.WriteLine("AmqpSender error: {0}.", firstline);
exitCode = 1;
}
return exitCode;
}
}
}