Implement disposable pattern for Connection.
Only dispose the producer endpoint after its final release.
Add overloaded send/receive API for destinations.
Initialize the sockets on the correct message handler thread.
diff --git a/src/main/csharp/BaseMessage.cs b/src/main/csharp/BaseMessage.cs
index 17db55a..49ff291 100644
--- a/src/main/csharp/BaseMessage.cs
+++ b/src/main/csharp/BaseMessage.cs
@@ -29,7 +29,7 @@
private string correlationId;
private TimeSpan timeToLive;
private string messageId;
- private MsgDeliveryMode deliveryMode;
+ private MsgDeliveryMode deliveryMode = MsgDeliveryMode.NonPersistent;
private MsgPriority priority;
private Destination replyTo;
private byte[] content;
diff --git a/src/main/csharp/Connection.cs b/src/main/csharp/Connection.cs
index 3fb6142..4def1c5 100644
--- a/src/main/csharp/Connection.cs
+++ b/src/main/csharp/Connection.cs
@@ -19,6 +19,7 @@
using ZeroMQ;
using System.Collections.Generic;
using System.Text;
+using System.Collections;
namespace Apache.NMS.ZMQ
{
@@ -46,24 +47,87 @@
/// <summary>
/// ZMQ context
/// </summary>
+ private static object contextLock = new object();
+ private static int instanceCount = 0;
private static ZmqContext _context;
- private static Dictionary<string, ProducerRef> producerCache;
- private static object producerCacheLock;
+ private static Dictionary<string, ProducerRef> producerCache = new Dictionary<string, ProducerRef>();
+ private static object producerCacheLock = new object();
+ private TimeSpan zeroTimeout = new TimeSpan(0);
- static Connection()
+ private bool disposed = false;
+
+ private static void InitContext()
{
- Connection._context = ZmqContext.Create();
- Connection.producerCache = new Dictionary<string, ProducerRef>();
- Connection.producerCacheLock = new object();
+ lock(contextLock)
+ {
+ if(0 == instanceCount++)
+ {
+ Connection._context = ZmqContext.Create();
+ }
+ }
+ }
+
+ private static void DestroyContext()
+ {
+ lock(contextLock)
+ {
+ if(0 == --instanceCount)
+ {
+ Connection._context.Dispose();
+ }
+ }
}
public Connection(Uri connectionUri)
{
+ InitContext();
this.brokerUri = connectionUri;
this.producerContextBinding = string.Format("{0}://*:{1}", this.brokerUri.Scheme, this.brokerUri.Port);
this.consumerContextBinding = string.Format("{0}://{1}:{2}", brokerUri.Scheme, brokerUri.Host, this.brokerUri.Port);
}
+ ~Connection()
+ {
+ Dispose(false);
+ }
+
+ public void Dispose()
+ {
+ Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ private void Dispose(bool disposing)
+ {
+ if(disposed)
+ {
+ return;
+ }
+
+ if(disposing)
+ {
+ try
+ {
+ OnDispose();
+ }
+ catch(Exception ex)
+ {
+ Tracer.ErrorFormat("Exception disposing Connection {0}: {1}", this.brokerUri.AbsoluteUri, ex.Message);
+ }
+ }
+
+ disposed = true;
+ }
+
+ /// <summary>
+ /// Child classes can override this method to perform clean-up logic.
+ /// </summary>
+ protected virtual void OnDispose()
+ {
+ Close();
+ DestroyContext();
+ }
+
/// <summary>
/// Starts message delivery for this connection.
/// </summary>
@@ -147,12 +211,13 @@
{
producerCache.Remove(contextBinding);
producerRef.producer.Unbind(contextBinding);
+ producerRef.producer.Dispose();
}
}
}
}
- internal ZmqSocket GetConsumer(Encoding encoding, string destinationName)
+ internal ZmqSocket GetConsumer()
{
ZmqSocket endpoint = this.Context.CreateSocket(SocketType.SUB);
@@ -160,8 +225,6 @@
{
throw new ResourceAllocationException();
}
- endpoint.Subscribe(encoding.GetBytes(destinationName));
- endpoint.Connect(GetConsumerBindingPath());
return endpoint;
}
@@ -169,6 +232,7 @@
internal void ReleaseConsumer(ZmqSocket endpoint)
{
endpoint.Disconnect(GetConsumerBindingPath());
+ endpoint.Dispose();
}
internal string GetProducerContextBinding()
@@ -176,16 +240,11 @@
return this.producerContextBinding;
}
- private string GetConsumerBindingPath()
+ internal string GetConsumerBindingPath()
{
return this.consumerContextBinding;
}
- public void Dispose()
- {
- Close();
- }
-
public void Close()
{
Stop();
diff --git a/src/main/csharp/Destination.cs b/src/main/csharp/Destination.cs
index 7178e94..2fb045f 100644
--- a/src/main/csharp/Destination.cs
+++ b/src/main/csharp/Destination.cs
@@ -18,6 +18,7 @@
using System;
using System.Text;
using ZeroMQ;
+using System.Diagnostics;
namespace Apache.NMS.ZMQ
{
@@ -26,6 +27,8 @@
/// </summary>
public abstract class Destination : IDestination
{
+ public static Encoding encoding = Encoding.UTF8;
+
protected Session session;
/// <summary>
/// Socket object
@@ -92,14 +95,12 @@
this.session.Connection.ReleaseProducer(this.producerEndpoint);
}
- this.producerEndpoint.Dispose();
this.producerEndpoint = null;
}
if(null != this.consumerEndpoint)
{
this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
- this.consumerEndpoint.Dispose();
this.consumerEndpoint = null;
}
}
@@ -178,35 +179,82 @@
get;
}
- internal int Send(byte[] buffer, TimeSpan timeout)
+ internal void InitSender()
{
if(null == this.producerEndpoint)
{
this.producerEndpoint = this.session.Connection.GetProducer();
}
-
- return this.producerEndpoint.Send(buffer, buffer.Length, SocketFlags.None, timeout);
}
- internal string Receive(Encoding encoding, TimeSpan timeout)
+ internal void InitReceiver()
{
if(null == this.consumerEndpoint)
{
- this.consumerEndpoint = this.session.Connection.GetConsumer(encoding, this.destinationName);
- }
+ Connection connection = this.session.Connection;
- return consumerEndpoint.Receive(encoding, timeout);
+ this.consumerEndpoint = connection.GetConsumer();
+ // Must subscribe first before connecting to the endpoint binding
+ this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(this.destinationName));
+ this.consumerEndpoint.Connect(connection.GetConsumerBindingPath());
+ }
+ }
+
+ internal void Subscribe(string prefixName)
+ {
+ InitReceiver();
+ this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(prefixName));
+ }
+
+ internal void Unsubscribe(string prefixName)
+ {
+ if(null != this.consumerEndpoint)
+ {
+ this.consumerEndpoint.Unsubscribe(Destination.encoding.GetBytes(prefixName));
+ }
+ }
+
+ internal SendStatus Send(string msg)
+ {
+ Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send().");
+ return this.producerEndpoint.Send(msg, Destination.encoding);
+ }
+
+ internal SendStatus Send(byte[] buffer)
+ {
+ Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send().");
+ return this.producerEndpoint.Send(buffer);
+ }
+
+ internal string ReceiveString(TimeSpan timeout)
+ {
+ this.InitReceiver();
+ return this.consumerEndpoint.Receive(Destination.encoding, timeout);
+ }
+
+ internal byte[] ReceiveBytes(TimeSpan timeout, out int size)
+ {
+ this.InitReceiver();
+ return this.consumerEndpoint.Receive(null, timeout, out size);
+ }
+
+ internal byte[] ReceiveBytes(SocketFlags flags, out int size)
+ {
+ this.InitReceiver();
+ return this.consumerEndpoint.Receive(null, flags, out size);
}
internal Frame ReceiveFrame()
{
// TODO: Implement
+ this.InitReceiver();
return null;
}
internal ZmqMessage ReceiveMessage()
{
// TODO: Implement
+ this.InitReceiver();
return null;
}
}
diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
index a26ae57..486c992 100644
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -15,14 +15,10 @@
* limitations under the License.
*/
-#define PUBSUB
-
using System;
+using System.Diagnostics;
using System.Text;
using System.Threading;
-using Apache.NMS.Util;
-using ZeroMQ;
-using System.Diagnostics;
namespace Apache.NMS.ZMQ
{
@@ -31,7 +27,7 @@
/// </summary>
public class MessageConsumer : IMessageConsumer
{
- protected TimeSpan zeroTimeout = new TimeSpan(0);
+ protected static readonly TimeSpan zeroTimeout = new TimeSpan(0);
private readonly Session session;
private readonly AcknowledgementMode acknowledgementMode;
@@ -41,6 +37,8 @@
private Thread asyncDeliveryThread = null;
private object asyncDeliveryLock = new object();
private bool asyncDelivery = false;
+ private bool asyncInit = false;
+ private byte[] rawDestinationName;
private ConsumerTransformerDelegate consumerTransformer;
public ConsumerTransformerDelegate ConsumerTransformer
@@ -60,29 +58,40 @@
this.session = sess;
this.destination = (Destination) dest;
+ this.rawDestinationName = Destination.encoding.GetBytes(this.destination.Name);
this.acknowledgementMode = ackMode;
}
+ private object listenerLock = new object();
public event MessageListener Listener
{
add
{
- this.listener += value;
- this.listenerCount++;
- StartAsyncDelivery();
+ lock(listenerLock)
+ {
+ this.listener += value;
+ if(0 == this.listenerCount)
+ {
+ StartAsyncDelivery();
+ }
+
+ this.listenerCount++;
+ }
}
remove
{
- if(this.listenerCount > 0)
+ lock(listenerLock)
{
this.listener -= value;
- this.listenerCount--;
- }
-
- if(0 == listenerCount)
- {
- StopAsyncDelivery();
+ if(this.listenerCount > 0)
+ {
+ this.listenerCount--;
+ if(0 == this.listenerCount)
+ {
+ StopAsyncDelivery();
+ }
+ }
}
}
}
@@ -106,15 +115,17 @@
/// </returns>
public IMessage Receive(TimeSpan timeout)
{
- // TODO: Support decoding of all message types + all meta data (e.g., headers and properties)
- string msgContent = this.destination.Receive(Encoding.UTF8, timeout);
+ int size;
+ byte[] receivedMsg = this.destination.ReceiveBytes(timeout, out size);
- if(null != msgContent)
+ if(size > 0)
{
// Strip off the subscribed destination name.
- string destinationName = this.destination.Name;
- string messageText = msgContent.Substring(destinationName.Length, msgContent.Length - destinationName.Length);
- return ToNmsMessage(messageText);
+ // TODO: Support decoding of all message types + all meta data (e.g., headers and properties)
+ int msgStart = this.rawDestinationName.Length;
+ int msgLength = receivedMsg.Length - msgStart;
+ string msgContent = Encoding.UTF8.GetString(receivedMsg, msgStart, msgLength);
+ return ToNmsMessage(msgContent);
}
return null;
@@ -150,7 +161,7 @@
protected virtual void StopAsyncDelivery()
{
- lock(asyncDeliveryLock)
+ lock(this.asyncDeliveryLock)
{
this.asyncDelivery = false;
if(null != this.asyncDeliveryThread)
@@ -174,33 +185,49 @@
Debug.Assert(null == this.asyncDeliveryThread);
lock(this.asyncDeliveryLock)
{
+ this.asyncInit = false;
this.asyncDelivery = true;
- this.asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
+ this.asyncDeliveryThread = new Thread(new ThreadStart(MsgDispatchLoop));
this.asyncDeliveryThread.Name = string.Format("MsgConsumerAsync: {0}", this.destination.Name);
this.asyncDeliveryThread.IsBackground = true;
this.asyncDeliveryThread.Start();
+ while(!asyncInit)
+ {
+ Thread.Sleep(1);
+ }
}
}
- protected virtual void DispatchLoop()
+ protected virtual void MsgDispatchLoop()
{
Tracer.InfoFormat("Starting dispatcher thread consumer: {0}", this.asyncDeliveryThread.Name);
- TimeSpan receiveWait = TimeSpan.FromSeconds(3);
+ TimeSpan receiveWait = TimeSpan.FromSeconds(2);
+
+ // Signal that this thread has started.
+ asyncInit = true;
while(asyncDelivery)
{
try
{
IMessage message = Receive(receiveWait);
- if(asyncDelivery && message != null)
+
+ if(asyncDelivery)
{
- try
+ if(null != message)
{
- listener(message);
+ try
+ {
+ listener(message);
+ }
+ catch(Exception ex)
+ {
+ HandleAsyncException(ex);
+ }
}
- catch(Exception ex)
+ else
{
- HandleAsyncException(ex);
+ Thread.Sleep(0);
}
}
}
diff --git a/src/main/csharp/MessageProducer.cs b/src/main/csharp/MessageProducer.cs
index 3d70631..d1b3b18 100644
--- a/src/main/csharp/MessageProducer.cs
+++ b/src/main/csharp/MessageProducer.cs
@@ -30,9 +30,9 @@
public class MessageProducer : IMessageProducer
{
private readonly Session session;
- private IDestination destination;
+ private Destination destination;
- private MsgDeliveryMode deliveryMode;
+ private MsgDeliveryMode deliveryMode = MsgDeliveryMode.NonPersistent;
private TimeSpan timeToLive;
private MsgPriority priority;
private bool disableMessageID;
@@ -53,17 +53,18 @@
}
this.session = sess;
- this.destination = dest;
+ this.destination = (Destination) dest;
+ this.destination.InitSender();
}
public void Send(IMessage message)
{
- Send(this.Destination, message);
+ Send(this.destination, message);
}
public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
{
- Send(this.Destination, message, deliveryMode, priority, timeToLive);
+ Send(this.destination, message, deliveryMode, priority, timeToLive);
}
public void Send(IDestination dest, IMessage message)
@@ -94,7 +95,7 @@
Destination theDest = (Destination) dest;
string msg = theDest.Name + ((ITextMessage) message).Text;
- theDest.Send(Encoding.UTF8.GetBytes(msg), this.session.Connection.RequestTimeout);
+ theDest.Send(msg);
}
public void Dispose()
@@ -168,12 +169,6 @@
set { }
}
- public IDestination Destination
- {
- get { return this.destination; }
- set { this.destination = value; }
- }
-
public MsgPriority Priority
{
get { return this.priority; }
diff --git a/src/test/csharp/ZMQTest.cs b/src/test/csharp/ZMQTest.cs
index ddb6af1..b025564 100644
--- a/src/test/csharp/ZMQTest.cs
+++ b/src/test/csharp/ZMQTest.cs
@@ -24,7 +24,7 @@
[TestFixture]
public class ZMQTest : BaseTest
{
- private bool receivedTestMessage = false;
+ private int receivedMsgCount = 0;
[Test]
public void TestConnection()
@@ -132,46 +132,62 @@
[Test]
public void TestSendReceive(
+ // inproc, ipc, tcp, pgm, or epgm
+ [Values("zmq:tcp://localhost:5556", "zmq:inproc://localhost:5557")]
+ string connectionName,
[Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]
- string destination)
+ string destinationName)
{
- IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
+ IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri(connectionName));
Assert.IsNotNull(factory, "Error creating connection factory.");
- this.receivedTestMessage = false;
+ this.receivedMsgCount = 0;
using(IConnection connection = factory.CreateConnection())
{
Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");
using(ISession session = connection.CreateSession())
{
Assert.IsNotNull(session, "Error creating Session.");
- using(IDestination testDestination = session.GetDestination(destination))
+ using(IDestination testDestination = session.GetDestination(destinationName))
{
- Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
+ Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destinationName);
using(IMessageConsumer consumer = session.CreateConsumer(testDestination))
{
- Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);
- consumer.Listener += OnMessage;
- using(IMessageProducer producer = session.CreateProducer(testDestination))
+ Assert.IsNotNull(consumer, "Error creating consumer on {0}", destinationName);
+ int sendMsgCount = 0;
+ try
{
- Assert.IsNotNull(consumer, "Error creating producer on {0}", destination);
- ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");
- Assert.IsNotNull(testMsg, "Error creating test message.");
- producer.Send(testMsg);
- }
-
- // Wait for the message
- DateTime startWaitTime = DateTime.Now;
- TimeSpan maxWaitTime = TimeSpan.FromSeconds(5);
-
- while(!receivedTestMessage)
- {
- if((DateTime.Now - startWaitTime) > maxWaitTime)
+ consumer.Listener += OnMessage;
+ using(IMessageProducer producer = session.CreateProducer(testDestination))
{
- Assert.Fail("Timeout waiting for message receive.");
- }
+ Assert.IsNotNull(consumer, "Error creating producer on {0}", destinationName);
+ ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");
+ Assert.IsNotNull(testMsg, "Error creating test message.");
- Thread.Sleep(5);
+ // Wait for the message
+ DateTime startWaitTime = DateTime.Now;
+ TimeSpan maxWaitTime = TimeSpan.FromSeconds(5);
+
+ // Continually send the message to compensate for the
+ // slow joiner problem inherent to spinning up the
+ // internal dispatching threads in ZeroMQ.
+ while(this.receivedMsgCount < 1)
+ {
+ ++sendMsgCount;
+ producer.Send(testMsg);
+ if((DateTime.Now - startWaitTime) > maxWaitTime)
+ {
+ Assert.Fail("Timeout waiting for message receive.");
+ }
+
+ Thread.Sleep(1);
+ }
+ }
+ }
+ finally
+ {
+ consumer.Listener -= OnMessage;
+ Console.WriteLine("Sent {0} msgs.\nReceived {1} msgs", sendMsgCount, this.receivedMsgCount);
}
}
}
@@ -188,7 +204,7 @@
Assert.IsInstanceOf<TextMessage>(message, "Wrong message type received.");
ITextMessage textMsg = (ITextMessage) message;
Assert.AreEqual(textMsg.Text, "Zero Message.");
- receivedTestMessage = true;
+ this.receivedMsgCount++;
}
}
}