Add support for serializing/deserializing BytesMessages.
Fixes [AMQNET-491]. (See https://issues.apache.org/jira/browse/AMQNET-491)

diff --git a/src/main/csharp/BaseMessage.cs b/src/main/csharp/BaseMessage.cs
index af0f1b9..be3b784 100644
--- a/src/main/csharp/BaseMessage.cs
+++ b/src/main/csharp/BaseMessage.cs
@@ -36,12 +36,19 @@
 		private string type;

 		private event AcknowledgeHandler Acknowledger;

 		private DateTime timestamp = new DateTime();

+		private bool readOnlyMsgProperties = false;

 		private bool readOnlyMsgBody = false;

 

-		public bool ReadOnlyBody

+		public virtual bool ReadOnlyProperties

 		{

-			get { return readOnlyMsgBody; }

-			set { readOnlyMsgBody = value; }

+			get { return this.readOnlyMsgProperties; }

+			set { this.readOnlyMsgProperties = value; }

+		}

+

+		public virtual bool ReadOnlyBody

+		{

+			get { return this.readOnlyMsgBody; }

+			set { this.readOnlyMsgBody = value; }

 		}

 

 		// IMessage interface

@@ -155,7 +162,6 @@
 			set { }

 		}

 

-

 		/// <summary>

 		/// The destination that the consumer of this message should send replies to

 		/// </summary>

@@ -190,7 +196,6 @@
 			set { type = value; }

 		}

 

-

 		public object GetObjectProperty(string name)

 		{

 			return null;

@@ -200,9 +205,15 @@
 		{

 		}

 

+		public virtual void OnSend()

+		{

+			this.ReadOnlyProperties = true;

+			this.ReadOnlyBody = true;

+		}

+

 		protected void FailIfReadOnlyBody()

 		{

-			if(ReadOnlyBody == true)

+			if(ReadOnlyBody)

 			{

 				throw new MessageNotWriteableException("Message is in Read-Only mode.");

 			}

@@ -210,7 +221,7 @@
 

 		protected void FailIfWriteOnlyBody()

 		{

-			if(ReadOnlyBody == false)

+			if(!ReadOnlyBody)

 			{

 				throw new MessageNotReadableException("Message is in Write-Only mode.");

 			}

diff --git a/src/main/csharp/Destination.cs b/src/main/csharp/Destination.cs
index 5f0cecd..fe91b6e 100644
--- a/src/main/csharp/Destination.cs
+++ b/src/main/csharp/Destination.cs
@@ -36,6 +36,7 @@
 		protected ZmqSocket producerEndpoint = null;

 		protected ZmqSocket consumerEndpoint = null;

 		protected string destinationName;

+		internal byte[] rawDestinationName;

 

 		private bool disposed = false;

 

@@ -47,6 +48,7 @@
 		{

 			this.session = session;

 			this.destinationName = destName;

+			this.rawDestinationName = Destination.encoding.GetBytes(this.destinationName);

 			this.session.RegisterDestination(this);

 		}

 

@@ -88,23 +90,8 @@
 		/// </summary>

 		protected virtual void OnDispose()

 		{

-			if(null != this.producerEndpoint)

-			{

-				if(null != this.session

-					&& null != this.session.Connection)

-				{

-					this.session.Connection.ReleaseProducer(this.producerEndpoint);

-				}

-

-				this.producerEndpoint = null;

-			}

-

-			if(null != this.consumerEndpoint)

-			{

-				this.session.Connection.ReleaseConsumer(this.consumerEndpoint);

-				this.consumerEndpoint = null;

-			}

-

+			DeinitSender();

+			DeinitReceiver();

 			this.session.UnregisterDestination(this);

 		}

 

@@ -190,6 +177,20 @@
 			}

 		}

 

+		internal void DeinitSender()

+		{

+			if(null != this.producerEndpoint)

+			{

+				if(null != this.session

+					&& null != this.session.Connection)

+				{

+					this.session.Connection.ReleaseProducer(this.producerEndpoint);

+				}

+

+				this.producerEndpoint = null;

+			}

+		}

+

 		internal void InitReceiver()

 		{

 			if(null == this.consumerEndpoint)

@@ -198,34 +199,27 @@
 

 				this.consumerEndpoint = connection.GetConsumer();

 				// Must subscribe first before connecting to the endpoint binding

-				this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(this.destinationName));

+				this.consumerEndpoint.Subscribe(this.rawDestinationName);

 				this.consumerEndpoint.Connect(connection.GetConsumerBindingPath());

 			}

 		}

 

-		internal void Subscribe(string prefixName)

-		{

-			InitReceiver();

-			this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(prefixName));

-		}

-

-		internal void Unsubscribe(string prefixName)

+		internal void DeinitReceiver()

 		{

 			if(null != this.consumerEndpoint)

 			{

-				this.consumerEndpoint.Unsubscribe(Destination.encoding.GetBytes(prefixName));

+				this.session.Connection.ReleaseConsumer(this.consumerEndpoint);

+				this.consumerEndpoint = null;

 			}

 		}

 

 		internal SendStatus Send(string msg)

 		{

-			Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send().");

 			return this.producerEndpoint.Send(msg, Destination.encoding);

 		}

 

 		internal SendStatus Send(byte[] buffer)

 		{

-			Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send().");

 			return this.producerEndpoint.Send(buffer);

 		}

 

@@ -246,20 +240,6 @@
 			this.InitReceiver();

 			return this.consumerEndpoint.Receive(null, flags, out size);

 		}

-

-		internal Frame ReceiveFrame()

-		{

-			// TODO: Implement

-			this.InitReceiver();

-			return null;

-		}

-

-		internal ZmqMessage ReceiveMessage()

-		{

-			// TODO: Implement

-			this.InitReceiver();

-			return null;

-		}

 	}

 }

 

diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
index a0ddf13..5530050 100644
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -40,7 +40,6 @@
 		private object asyncDeliveryLock = new object();

 		private bool asyncDelivery = false;

 		private bool asyncInit = false;

-		private byte[] rawDestinationName;

 

 		private ConsumerTransformerDelegate consumerTransformer;

 		public ConsumerTransformerDelegate ConsumerTransformer

@@ -82,7 +81,6 @@
 

 			this.session = sess;

 			this.destination = theDest;

-			this.rawDestinationName = Destination.encoding.GetBytes(this.destination.Name);

 			this.acknowledgementMode = ackMode;

 		}

 

@@ -145,7 +143,7 @@
 			if(size > 0)

 			{

 				// Strip off the subscribed destination name.

-				int receivedMsgIndex = this.rawDestinationName.Length;

+				int receivedMsgIndex = this.destination.rawDestinationName.Length;

 				int msgLength = receivedMsg.Length - receivedMsgIndex;

 				byte[] msgContent = new byte[msgLength];

 

@@ -406,6 +404,14 @@
 				}

 				break;

 

+			case WireFormat.MT_BYTESMESSAGE:

+				nmsMessage = new BytesMessage();

+				if(null != messageBody)

+				{

+					((BytesMessage) nmsMessage).Content = messageBody;

+				}

+				break;

+

 			case WireFormat.MT_UNKNOWN:

 			default:

 				break;

@@ -444,6 +450,9 @@
 						nmsMessage = transformedMessage as BaseMessage;

 					}

 				}

+

+				nmsMessage.ReadOnlyBody = true;

+				nmsMessage.ReadOnlyProperties = true;

 			}

 

 			return nmsMessage;

diff --git a/src/main/csharp/MessageProducer.cs b/src/main/csharp/MessageProducer.cs
index e0639b6..d114468 100644
--- a/src/main/csharp/MessageProducer.cs
+++ b/src/main/csharp/MessageProducer.cs
@@ -19,8 +19,8 @@
 

 using System;

 using System.Collections.Generic;

-using System.Text;

 using System.Net;

+using System.Text;

 using Apache.NMS.Util;

 

 namespace Apache.NMS.ZMQ

@@ -48,13 +48,35 @@
 

 		public MessageProducer(Session sess, IDestination dest)

 		{

-			if(null == sess.Connection.Context)

+			if(null == sess

+				|| null == sess.Connection

+				|| null == sess.Connection.Context)

 			{

 				throw new NMSConnectionException();

 			}

 

+			Destination theDest = dest as Destination;

+

+			if(null == theDest)

+			{

+				throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");

+			}

+			else if(null == theDest.Name)

+			{

+				throw new InvalidDestinationException("The destination object was not given a physical name.");

+			}

+			else if(theDest.IsTemporary)

+			{

+				String physicalName = theDest.Name;

+

+				if(String.IsNullOrEmpty(physicalName))

+				{

+					throw new InvalidDestinationException("Physical name of Destination should be valid: " + theDest);

+				}

+			}

+

 			this.session = sess;

-			this.destination = (Destination) dest;

+			this.destination = theDest;

 			this.destination.InitSender();

 		}

 

@@ -150,6 +172,17 @@
 					EncodeField(msgDataBuilder, WireFormat.MFT_BODY, msgBody);

 				}

 			}

+			else if(message is IBytesMessage)

+			{

+				EncodeField(msgDataBuilder, WireFormat.MFT_MSGTYPE, WireFormat.MT_BYTESMESSAGE);

+				// Append the message text body to the msg.

+				byte[] msgBody = ((IBytesMessage) message).Content;

+

+				if(null != msgBody)

+				{

+					EncodeField(msgDataBuilder, WireFormat.MFT_BODY, msgBody);

+				}

+			}

 			else

 			{

 				// TODO: Add support for more message types

@@ -158,6 +191,8 @@
 

 			// Put the sentinal field marker.

 			EncodeField(msgDataBuilder, WireFormat.MFT_NONE, 0);

+

+			((BaseMessage) message).OnSend();

 			theDest.Send(msgDataBuilder.ToArray());

 		}

 

diff --git a/src/main/csharp/TemporaryQueue.cs b/src/main/csharp/TemporaryQueue.cs
index 8d10023..8938114 100644
--- a/src/main/csharp/TemporaryQueue.cs
+++ b/src/main/csharp/TemporaryQueue.cs
@@ -25,7 +25,7 @@
 	public class TemporaryQueue : Destination, ITemporaryQueue

 	{

 		public TemporaryQueue(Session session)

-			: base(session, Guid.NewGuid().ToString())

+			: base(session, "TEMPQUEUE." + Guid.NewGuid().ToString())

 		{

 		}

 

diff --git a/src/main/csharp/TemporaryTopic.cs b/src/main/csharp/TemporaryTopic.cs
index bfbf80c..bd5c2e5 100644
--- a/src/main/csharp/TemporaryTopic.cs
+++ b/src/main/csharp/TemporaryTopic.cs
@@ -25,7 +25,7 @@
 	public class TemporaryTopic : Destination, ITemporaryTopic

 	{

 		public TemporaryTopic(Session session)

-			: base(session, Guid.NewGuid().ToString())

+			: base(session, "TEMPTOPIC." + Guid.NewGuid().ToString())

 		{

 		}