Add support for serializing/deserializing BytesMessages.
Fixes [AMQNET-491]. (See https://issues.apache.org/jira/browse/AMQNET-491)
diff --git a/src/main/csharp/BaseMessage.cs b/src/main/csharp/BaseMessage.cs
index af0f1b9..be3b784 100644
--- a/src/main/csharp/BaseMessage.cs
+++ b/src/main/csharp/BaseMessage.cs
@@ -36,12 +36,19 @@
private string type;
private event AcknowledgeHandler Acknowledger;
private DateTime timestamp = new DateTime();
+ private bool readOnlyMsgProperties = false;
private bool readOnlyMsgBody = false;
- public bool ReadOnlyBody
+ public virtual bool ReadOnlyProperties
{
- get { return readOnlyMsgBody; }
- set { readOnlyMsgBody = value; }
+ get { return this.readOnlyMsgProperties; }
+ set { this.readOnlyMsgProperties = value; }
+ }
+
+ public virtual bool ReadOnlyBody
+ {
+ get { return this.readOnlyMsgBody; }
+ set { this.readOnlyMsgBody = value; }
}
// IMessage interface
@@ -155,7 +162,6 @@
set { }
}
-
/// <summary>
/// The destination that the consumer of this message should send replies to
/// </summary>
@@ -190,7 +196,6 @@
set { type = value; }
}
-
public object GetObjectProperty(string name)
{
return null;
@@ -200,9 +205,15 @@
{
}
+ public virtual void OnSend()
+ {
+ this.ReadOnlyProperties = true;
+ this.ReadOnlyBody = true;
+ }
+
protected void FailIfReadOnlyBody()
{
- if(ReadOnlyBody == true)
+ if(ReadOnlyBody)
{
throw new MessageNotWriteableException("Message is in Read-Only mode.");
}
@@ -210,7 +221,7 @@
protected void FailIfWriteOnlyBody()
{
- if(ReadOnlyBody == false)
+ if(!ReadOnlyBody)
{
throw new MessageNotReadableException("Message is in Write-Only mode.");
}
diff --git a/src/main/csharp/Destination.cs b/src/main/csharp/Destination.cs
index 5f0cecd..fe91b6e 100644
--- a/src/main/csharp/Destination.cs
+++ b/src/main/csharp/Destination.cs
@@ -36,6 +36,7 @@
protected ZmqSocket producerEndpoint = null;
protected ZmqSocket consumerEndpoint = null;
protected string destinationName;
+ internal byte[] rawDestinationName;
private bool disposed = false;
@@ -47,6 +48,7 @@
{
this.session = session;
this.destinationName = destName;
+ this.rawDestinationName = Destination.encoding.GetBytes(this.destinationName);
this.session.RegisterDestination(this);
}
@@ -88,23 +90,8 @@
/// </summary>
protected virtual void OnDispose()
{
- if(null != this.producerEndpoint)
- {
- if(null != this.session
- && null != this.session.Connection)
- {
- this.session.Connection.ReleaseProducer(this.producerEndpoint);
- }
-
- this.producerEndpoint = null;
- }
-
- if(null != this.consumerEndpoint)
- {
- this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
- this.consumerEndpoint = null;
- }
-
+ DeinitSender();
+ DeinitReceiver();
this.session.UnregisterDestination(this);
}
@@ -190,6 +177,20 @@
}
}
+ internal void DeinitSender()
+ {
+ if(null != this.producerEndpoint)
+ {
+ if(null != this.session
+ && null != this.session.Connection)
+ {
+ this.session.Connection.ReleaseProducer(this.producerEndpoint);
+ }
+
+ this.producerEndpoint = null;
+ }
+ }
+
internal void InitReceiver()
{
if(null == this.consumerEndpoint)
@@ -198,34 +199,27 @@
this.consumerEndpoint = connection.GetConsumer();
// Must subscribe first before connecting to the endpoint binding
- this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(this.destinationName));
+ this.consumerEndpoint.Subscribe(this.rawDestinationName);
this.consumerEndpoint.Connect(connection.GetConsumerBindingPath());
}
}
- internal void Subscribe(string prefixName)
- {
- InitReceiver();
- this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(prefixName));
- }
-
- internal void Unsubscribe(string prefixName)
+ internal void DeinitReceiver()
{
if(null != this.consumerEndpoint)
{
- this.consumerEndpoint.Unsubscribe(Destination.encoding.GetBytes(prefixName));
+ this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
+ this.consumerEndpoint = null;
}
}
internal SendStatus Send(string msg)
{
- Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send().");
return this.producerEndpoint.Send(msg, Destination.encoding);
}
internal SendStatus Send(byte[] buffer)
{
- Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send().");
return this.producerEndpoint.Send(buffer);
}
@@ -246,20 +240,6 @@
this.InitReceiver();
return this.consumerEndpoint.Receive(null, flags, out size);
}
-
- internal Frame ReceiveFrame()
- {
- // TODO: Implement
- this.InitReceiver();
- return null;
- }
-
- internal ZmqMessage ReceiveMessage()
- {
- // TODO: Implement
- this.InitReceiver();
- return null;
- }
}
}
diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
index a0ddf13..5530050 100644
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -40,7 +40,6 @@
private object asyncDeliveryLock = new object();
private bool asyncDelivery = false;
private bool asyncInit = false;
- private byte[] rawDestinationName;
private ConsumerTransformerDelegate consumerTransformer;
public ConsumerTransformerDelegate ConsumerTransformer
@@ -82,7 +81,6 @@
this.session = sess;
this.destination = theDest;
- this.rawDestinationName = Destination.encoding.GetBytes(this.destination.Name);
this.acknowledgementMode = ackMode;
}
@@ -145,7 +143,7 @@
if(size > 0)
{
// Strip off the subscribed destination name.
- int receivedMsgIndex = this.rawDestinationName.Length;
+ int receivedMsgIndex = this.destination.rawDestinationName.Length;
int msgLength = receivedMsg.Length - receivedMsgIndex;
byte[] msgContent = new byte[msgLength];
@@ -406,6 +404,14 @@
}
break;
+ case WireFormat.MT_BYTESMESSAGE:
+ nmsMessage = new BytesMessage();
+ if(null != messageBody)
+ {
+ ((BytesMessage) nmsMessage).Content = messageBody;
+ }
+ break;
+
case WireFormat.MT_UNKNOWN:
default:
break;
@@ -444,6 +450,9 @@
nmsMessage = transformedMessage as BaseMessage;
}
}
+
+ nmsMessage.ReadOnlyBody = true;
+ nmsMessage.ReadOnlyProperties = true;
}
return nmsMessage;
diff --git a/src/main/csharp/MessageProducer.cs b/src/main/csharp/MessageProducer.cs
index e0639b6..d114468 100644
--- a/src/main/csharp/MessageProducer.cs
+++ b/src/main/csharp/MessageProducer.cs
@@ -19,8 +19,8 @@
using System;
using System.Collections.Generic;
-using System.Text;
using System.Net;
+using System.Text;
using Apache.NMS.Util;
namespace Apache.NMS.ZMQ
@@ -48,13 +48,35 @@
public MessageProducer(Session sess, IDestination dest)
{
- if(null == sess.Connection.Context)
+ if(null == sess
+ || null == sess.Connection
+ || null == sess.Connection.Context)
{
throw new NMSConnectionException();
}
+ Destination theDest = dest as Destination;
+
+ if(null == theDest)
+ {
+ throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
+ }
+ else if(null == theDest.Name)
+ {
+ throw new InvalidDestinationException("The destination object was not given a physical name.");
+ }
+ else if(theDest.IsTemporary)
+ {
+ String physicalName = theDest.Name;
+
+ if(String.IsNullOrEmpty(physicalName))
+ {
+ throw new InvalidDestinationException("Physical name of Destination should be valid: " + theDest);
+ }
+ }
+
this.session = sess;
- this.destination = (Destination) dest;
+ this.destination = theDest;
this.destination.InitSender();
}
@@ -150,6 +172,17 @@
EncodeField(msgDataBuilder, WireFormat.MFT_BODY, msgBody);
}
}
+ else if(message is IBytesMessage)
+ {
+ EncodeField(msgDataBuilder, WireFormat.MFT_MSGTYPE, WireFormat.MT_BYTESMESSAGE);
+ // Append the message text body to the msg.
+ byte[] msgBody = ((IBytesMessage) message).Content;
+
+ if(null != msgBody)
+ {
+ EncodeField(msgDataBuilder, WireFormat.MFT_BODY, msgBody);
+ }
+ }
else
{
// TODO: Add support for more message types
@@ -158,6 +191,8 @@
// Put the sentinal field marker.
EncodeField(msgDataBuilder, WireFormat.MFT_NONE, 0);
+
+ ((BaseMessage) message).OnSend();
theDest.Send(msgDataBuilder.ToArray());
}
diff --git a/src/main/csharp/TemporaryQueue.cs b/src/main/csharp/TemporaryQueue.cs
index 8d10023..8938114 100644
--- a/src/main/csharp/TemporaryQueue.cs
+++ b/src/main/csharp/TemporaryQueue.cs
@@ -25,7 +25,7 @@
public class TemporaryQueue : Destination, ITemporaryQueue
{
public TemporaryQueue(Session session)
- : base(session, Guid.NewGuid().ToString())
+ : base(session, "TEMPQUEUE." + Guid.NewGuid().ToString())
{
}
diff --git a/src/main/csharp/TemporaryTopic.cs b/src/main/csharp/TemporaryTopic.cs
index bfbf80c..bd5c2e5 100644
--- a/src/main/csharp/TemporaryTopic.cs
+++ b/src/main/csharp/TemporaryTopic.cs
@@ -25,7 +25,7 @@
public class TemporaryTopic : Destination, ITemporaryTopic
{
public TemporaryTopic(Session session)
- : base(session, Guid.NewGuid().ToString())
+ : base(session, "TEMPTOPIC." + Guid.NewGuid().ToString())
{
}