Implement disposable pattern for Connection.
Only dispose the producer endpoint after its final release.
Add overloaded send/receive API for destinations.
Initialize the sockets on the correct message handler thread.
diff --git a/src/main/csharp/BaseMessage.cs b/src/main/csharp/BaseMessage.cs
index 17db55a..49ff291 100644
--- a/src/main/csharp/BaseMessage.cs
+++ b/src/main/csharp/BaseMessage.cs
@@ -29,7 +29,7 @@
 		private string correlationId;

 		private TimeSpan timeToLive;

 		private string messageId;

-		private MsgDeliveryMode deliveryMode;

+		private MsgDeliveryMode deliveryMode = MsgDeliveryMode.NonPersistent;

 		private MsgPriority priority;

 		private Destination replyTo;

 		private byte[] content;

diff --git a/src/main/csharp/Connection.cs b/src/main/csharp/Connection.cs
index 3fb6142..4def1c5 100644
--- a/src/main/csharp/Connection.cs
+++ b/src/main/csharp/Connection.cs
@@ -19,6 +19,7 @@
 using ZeroMQ;

 using System.Collections.Generic;

 using System.Text;

+using System.Collections;

 

 namespace Apache.NMS.ZMQ

 {

@@ -46,24 +47,87 @@
         /// <summary>

         /// ZMQ context

         /// </summary>

+		private static object contextLock = new object();

+		private static int instanceCount = 0;

 		private static ZmqContext _context;

-		private static Dictionary<string, ProducerRef> producerCache;

-		private static object producerCacheLock;

+		private static Dictionary<string, ProducerRef> producerCache = new Dictionary<string, ProducerRef>();

+		private static object producerCacheLock = new object();

+		private TimeSpan zeroTimeout = new TimeSpan(0);

 

-		static Connection()

+		private bool disposed = false;

+

+		private static void InitContext()

 		{

-			Connection._context = ZmqContext.Create();

-			Connection.producerCache = new Dictionary<string, ProducerRef>();

-			Connection.producerCacheLock = new object();

+			lock(contextLock)

+			{

+				if(0 == instanceCount++)

+				{

+					Connection._context = ZmqContext.Create();

+				}

+			}

+		}

+

+		private static void DestroyContext()

+		{

+			lock(contextLock)

+			{

+				if(0 == --instanceCount)

+				{

+					Connection._context.Dispose();

+				}

+			}

 		}

 

 		public Connection(Uri connectionUri)

 		{

+			InitContext();

 			this.brokerUri = connectionUri;

 			this.producerContextBinding = string.Format("{0}://*:{1}", this.brokerUri.Scheme, this.brokerUri.Port);

 			this.consumerContextBinding = string.Format("{0}://{1}:{2}", brokerUri.Scheme, brokerUri.Host, this.brokerUri.Port);

 		}

 

+		~Connection()

+		{

+			Dispose(false);

+		}

+

+		public void Dispose()

+		{

+			Dispose(true);

+			GC.SuppressFinalize(this);

+		}

+

+		private void Dispose(bool disposing)

+		{

+			if(disposed)

+			{

+				return;

+			}

+

+			if(disposing)

+			{

+				try

+				{

+					OnDispose();

+				}

+				catch(Exception ex)

+				{

+					Tracer.ErrorFormat("Exception disposing Connection {0}: {1}", this.brokerUri.AbsoluteUri, ex.Message);

+				}

+			}

+

+			disposed = true;

+		}

+

+		/// <summary>

+		/// Child classes can override this method to perform clean-up logic.

+		/// </summary>

+		protected virtual void OnDispose()

+		{

+			Close();

+			DestroyContext();

+		}

+

 		/// <summary>

         /// Starts message delivery for this connection.

         /// </summary>

@@ -147,12 +211,13 @@
 					{

 						producerCache.Remove(contextBinding);

 						producerRef.producer.Unbind(contextBinding);

+						producerRef.producer.Dispose();

 					}

 				}

 			}

 		}

 

-		internal ZmqSocket GetConsumer(Encoding encoding, string destinationName)

+		internal ZmqSocket GetConsumer()

 		{

 			ZmqSocket endpoint = this.Context.CreateSocket(SocketType.SUB);

 

@@ -160,8 +225,6 @@
 			{

 				throw new ResourceAllocationException();

 			}

-			endpoint.Subscribe(encoding.GetBytes(destinationName));

-			endpoint.Connect(GetConsumerBindingPath());

 

 			return endpoint;

 		}

@@ -169,6 +232,7 @@
 		internal void ReleaseConsumer(ZmqSocket endpoint)

 		{

 			endpoint.Disconnect(GetConsumerBindingPath());

+			endpoint.Dispose();

 		}

 

 		internal string GetProducerContextBinding()

@@ -176,16 +240,11 @@
 			return this.producerContextBinding;

 		}

 

-		private string GetConsumerBindingPath()

+		internal string GetConsumerBindingPath()

 		{

 			return this.consumerContextBinding;

 		}

 

-		public void Dispose()

-        {

-            Close();

-        }

-

         public void Close()

         {

             Stop();

diff --git a/src/main/csharp/Destination.cs b/src/main/csharp/Destination.cs
index 7178e94..2fb045f 100644
--- a/src/main/csharp/Destination.cs
+++ b/src/main/csharp/Destination.cs
@@ -18,6 +18,7 @@
 using System;

 using System.Text;

 using ZeroMQ;

+using System.Diagnostics;

 

 namespace Apache.NMS.ZMQ

 {

@@ -26,6 +27,8 @@
 	/// </summary>

 	public abstract class Destination : IDestination

 	{

+		public static Encoding encoding = Encoding.UTF8;

+

 		protected Session session;

 		/// <summary>

 		/// Socket object

@@ -92,14 +95,12 @@
 					this.session.Connection.ReleaseProducer(this.producerEndpoint);

 				}

 

-				this.producerEndpoint.Dispose();

 				this.producerEndpoint = null;

 			}

 

 			if(null != this.consumerEndpoint)

 			{

 				this.session.Connection.ReleaseConsumer(this.consumerEndpoint);

-				this.consumerEndpoint.Dispose();

 				this.consumerEndpoint = null;

 			}

 		}

@@ -178,35 +179,82 @@
 			get;

 		}

 

-		internal int Send(byte[] buffer, TimeSpan timeout)

+		internal void InitSender()

 		{

 			if(null == this.producerEndpoint)

 			{

 				this.producerEndpoint = this.session.Connection.GetProducer();

 			}

-

-			return this.producerEndpoint.Send(buffer, buffer.Length, SocketFlags.None, timeout);

 		}

 

-		internal string Receive(Encoding encoding, TimeSpan timeout)

+		internal void InitReceiver()

 		{

 			if(null == this.consumerEndpoint)

 			{

-				this.consumerEndpoint = this.session.Connection.GetConsumer(encoding, this.destinationName);

-			}

+				Connection connection = this.session.Connection;

 

-			return consumerEndpoint.Receive(encoding, timeout);

+				this.consumerEndpoint = connection.GetConsumer();

+				// Must subscribe first before connecting to the endpoint binding

+				this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(this.destinationName));

+				this.consumerEndpoint.Connect(connection.GetConsumerBindingPath());

+			}

+		}

+

+		internal void Subscribe(string prefixName)

+		{

+			InitReceiver();

+			this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(prefixName));

+		}

+

+		internal void Unsubscribe(string prefixName)

+		{

+			if(null != this.consumerEndpoint)

+			{

+				this.consumerEndpoint.Unsubscribe(Destination.encoding.GetBytes(prefixName));

+			}

+		}

+

+		internal SendStatus Send(string msg)

+		{

+			Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send().");

+			return this.producerEndpoint.Send(msg, Destination.encoding);

+		}

+

+		internal SendStatus Send(byte[] buffer)

+		{

+			Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send().");

+			return this.producerEndpoint.Send(buffer);

+		}

+

+		internal string ReceiveString(TimeSpan timeout)

+		{

+			this.InitReceiver();

+			return this.consumerEndpoint.Receive(Destination.encoding, timeout);

+		}

+

+		internal byte[] ReceiveBytes(TimeSpan timeout, out int size)

+		{

+			this.InitReceiver();

+			return this.consumerEndpoint.Receive(null, timeout, out size);

+		}

+

+		internal byte[] ReceiveBytes(SocketFlags flags, out int size)

+		{

+			this.InitReceiver();

+			return this.consumerEndpoint.Receive(null, flags, out size);

 		}

 

 		internal Frame ReceiveFrame()

 		{

 			// TODO: Implement

+			this.InitReceiver();

 			return null;

 		}

 

 		internal ZmqMessage ReceiveMessage()

 		{

 			// TODO: Implement

+			this.InitReceiver();

 			return null;

 		}

 	}

diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
index a26ae57..486c992 100644
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -15,14 +15,10 @@
  * limitations under the License.

  */

 

-#define PUBSUB

-

 using System;

+using System.Diagnostics;

 using System.Text;

 using System.Threading;

-using Apache.NMS.Util;

-using ZeroMQ;

-using System.Diagnostics;

 

 namespace Apache.NMS.ZMQ

 {

@@ -31,7 +27,7 @@
 	/// </summary>

 	public class MessageConsumer : IMessageConsumer

 	{

-		protected TimeSpan zeroTimeout = new TimeSpan(0);

+		protected static readonly TimeSpan zeroTimeout = new TimeSpan(0);

 

 		private readonly Session session;

 		private readonly AcknowledgementMode acknowledgementMode;

@@ -41,6 +37,8 @@
 		private Thread asyncDeliveryThread = null;

 		private object asyncDeliveryLock = new object();

 		private bool asyncDelivery = false;

+		private bool asyncInit = false;

+		private byte[] rawDestinationName;

 

 		private ConsumerTransformerDelegate consumerTransformer;

 		public ConsumerTransformerDelegate ConsumerTransformer

@@ -60,29 +58,40 @@
 

 			this.session = sess;

 			this.destination = (Destination) dest;

+			this.rawDestinationName = Destination.encoding.GetBytes(this.destination.Name);

 			this.acknowledgementMode = ackMode;

 		}

 

+		private object listenerLock = new object();

 		public event MessageListener Listener

 		{

 			add

 			{

-				this.listener += value;

-				this.listenerCount++;

-				StartAsyncDelivery();

+				lock(listenerLock)

+				{

+					this.listener += value;

+					if(0 == this.listenerCount)

+					{

+						StartAsyncDelivery();

+					}

+

+					this.listenerCount++;

+				}

 			}

 

 			remove

 			{

-				if(this.listenerCount > 0)

+				lock(listenerLock)

 				{

 					this.listener -= value;

-					this.listenerCount--;

-				}

-

-				if(0 == listenerCount)

-				{

-					StopAsyncDelivery();

+					if(this.listenerCount > 0)

+					{

+						this.listenerCount--;

+						if(0 == this.listenerCount)

+						{

+							StopAsyncDelivery();

+						}

+					}

 				}

 			}

 		}

@@ -106,15 +115,17 @@
 		/// </returns>

 		public IMessage Receive(TimeSpan timeout)

 		{

-			// TODO: Support decoding of all message types + all meta data (e.g., headers and properties)

-			string msgContent = this.destination.Receive(Encoding.UTF8, timeout);

+			int size;

+			byte[] receivedMsg = this.destination.ReceiveBytes(timeout, out size);

 

-			if(null != msgContent)

+			if(size > 0)

 			{

 				// Strip off the subscribed destination name.

-				string destinationName = this.destination.Name;

-				string messageText = msgContent.Substring(destinationName.Length, msgContent.Length - destinationName.Length);

-				return ToNmsMessage(messageText);

+				// TODO: Support decoding of all message types + all meta data (e.g., headers and properties)

+				int msgStart = this.rawDestinationName.Length;

+				int msgLength = receivedMsg.Length - msgStart;

+				string msgContent = Encoding.UTF8.GetString(receivedMsg, msgStart, msgLength);

+				return ToNmsMessage(msgContent);

 			}

 

 			return null;

@@ -150,7 +161,7 @@
 

 		protected virtual void StopAsyncDelivery()

 		{

-			lock(asyncDeliveryLock)

+			lock(this.asyncDeliveryLock)

 			{

 				this.asyncDelivery = false;

 				if(null != this.asyncDeliveryThread)

@@ -174,33 +185,49 @@
 			Debug.Assert(null == this.asyncDeliveryThread);

 			lock(this.asyncDeliveryLock)

 			{

+				this.asyncInit = false;

 				this.asyncDelivery = true;

-				this.asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));

+				this.asyncDeliveryThread = new Thread(new ThreadStart(MsgDispatchLoop));

 				this.asyncDeliveryThread.Name = string.Format("MsgConsumerAsync: {0}", this.destination.Name);

 				this.asyncDeliveryThread.IsBackground = true;

 				this.asyncDeliveryThread.Start();

+				while(!asyncInit)

+				{

+					Thread.Sleep(1);

+				}

 			}

 		}

 

-		protected virtual void DispatchLoop()

+		protected virtual void MsgDispatchLoop()

 		{

 			Tracer.InfoFormat("Starting dispatcher thread consumer: {0}", this.asyncDeliveryThread.Name);

-			TimeSpan receiveWait = TimeSpan.FromSeconds(3);

+			TimeSpan receiveWait = TimeSpan.FromSeconds(2);

+

+			// Signal that this thread has started.

+			asyncInit = true;

 

 			while(asyncDelivery)

 			{

 				try

 				{

 					IMessage message = Receive(receiveWait);

-					if(asyncDelivery && message != null)

+

+					if(asyncDelivery)

 					{

-						try

+						if(null != message)

 						{

-							listener(message);

+							try

+							{

+								listener(message);

+							}

+							catch(Exception ex)

+							{

+								HandleAsyncException(ex);

+							}

 						}

-						catch(Exception ex)

+						else

 						{

-							HandleAsyncException(ex);

+							Thread.Sleep(0);

 						}

 					}

 				}

diff --git a/src/main/csharp/MessageProducer.cs b/src/main/csharp/MessageProducer.cs
index 3d70631..d1b3b18 100644
--- a/src/main/csharp/MessageProducer.cs
+++ b/src/main/csharp/MessageProducer.cs
@@ -30,9 +30,9 @@
 	public class MessageProducer : IMessageProducer

 	{

 		private readonly Session session;

-		private IDestination destination;

+		private Destination destination;

 

-		private MsgDeliveryMode deliveryMode;

+		private MsgDeliveryMode deliveryMode = MsgDeliveryMode.NonPersistent;

 		private TimeSpan timeToLive;

 		private MsgPriority priority;

 		private bool disableMessageID;

@@ -53,17 +53,18 @@
 			}

 

 			this.session = sess;

-			this.destination = dest;

+			this.destination = (Destination) dest;

+			this.destination.InitSender();

 		}

 

 		public void Send(IMessage message)

 		{

-			Send(this.Destination, message);

+			Send(this.destination, message);

 		}

 

 		public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)

 		{

-			Send(this.Destination, message, deliveryMode, priority, timeToLive);

+			Send(this.destination, message, deliveryMode, priority, timeToLive);

 		}

 

 		public void Send(IDestination dest, IMessage message)

@@ -94,7 +95,7 @@
 			Destination theDest = (Destination) dest;

 

 			string msg = theDest.Name + ((ITextMessage) message).Text;

-			theDest.Send(Encoding.UTF8.GetBytes(msg), this.session.Connection.RequestTimeout);

+			theDest.Send(msg);

 		}

 

 		public void Dispose()

@@ -168,12 +169,6 @@
 			set { }

 		}

 

-		public IDestination Destination

-		{

-			get { return this.destination; }

-			set { this.destination = value; }

-		}

-

 		public MsgPriority Priority

 		{

 			get { return this.priority; }

diff --git a/src/test/csharp/ZMQTest.cs b/src/test/csharp/ZMQTest.cs
index ddb6af1..b025564 100644
--- a/src/test/csharp/ZMQTest.cs
+++ b/src/test/csharp/ZMQTest.cs
@@ -24,7 +24,7 @@
 	[TestFixture]

 	public class ZMQTest : BaseTest

 	{

-		private bool receivedTestMessage = false;

+		private int receivedMsgCount = 0;

 

 		[Test]

 		public void TestConnection()

@@ -132,46 +132,62 @@
 

 		[Test]

 		public void TestSendReceive(

+			// inproc, ipc, tcp, pgm, or epgm

+			[Values("zmq:tcp://localhost:5556", "zmq:inproc://localhost:5557")]

+			string connectionName,

 			[Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]

-			string destination)

+			string destinationName)

 		{

-			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));

+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri(connectionName));

 			Assert.IsNotNull(factory, "Error creating connection factory.");

 

-			this.receivedTestMessage = false;

+			this.receivedMsgCount = 0;

 			using(IConnection connection = factory.CreateConnection())

 			{

 				Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");

 				using(ISession session = connection.CreateSession())

 				{

 					Assert.IsNotNull(session, "Error creating Session.");

-					using(IDestination testDestination = session.GetDestination(destination))

+					using(IDestination testDestination = session.GetDestination(destinationName))

 					{

-						Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);

+						Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destinationName);

 						using(IMessageConsumer consumer = session.CreateConsumer(testDestination))

 						{

-							Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);

-							consumer.Listener += OnMessage;

-							using(IMessageProducer producer = session.CreateProducer(testDestination))

+							Assert.IsNotNull(consumer, "Error creating consumer on {0}", destinationName);

+							int sendMsgCount = 0;

+							try

 							{

-								Assert.IsNotNull(consumer, "Error creating producer on {0}", destination);

-								ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");

-								Assert.IsNotNull(testMsg, "Error creating test message.");

-								producer.Send(testMsg);

-							}

-

-							// Wait for the message

-							DateTime startWaitTime = DateTime.Now;

-							TimeSpan maxWaitTime = TimeSpan.FromSeconds(5);

-

-							while(!receivedTestMessage)

-							{

-								if((DateTime.Now - startWaitTime) > maxWaitTime)

+								consumer.Listener += OnMessage;

+								using(IMessageProducer producer = session.CreateProducer(testDestination))

 								{

-									Assert.Fail("Timeout waiting for message receive.");

-								}

+									Assert.IsNotNull(consumer, "Error creating producer on {0}", destinationName);

+									ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");

+									Assert.IsNotNull(testMsg, "Error creating test message.");

 

-								Thread.Sleep(5);

+									// Wait for the message

+									DateTime startWaitTime = DateTime.Now;

+									TimeSpan maxWaitTime = TimeSpan.FromSeconds(5);

+

+									// Continually send the message to compensate for the

+									// slow joiner problem inherent to spinning up the

+									// internal dispatching threads in ZeroMQ.

+									while(this.receivedMsgCount < 1)

+									{

+										++sendMsgCount;

+										producer.Send(testMsg);

+										if((DateTime.Now - startWaitTime) > maxWaitTime)

+										{

+											Assert.Fail("Timeout waiting for message receive.");

+										}

+

+										Thread.Sleep(1);

+									}

+								}

+							}

+							finally

+							{

+								consumer.Listener -= OnMessage;

+								Console.WriteLine("Sent {0} msgs.\nReceived {1} msgs", sendMsgCount, this.receivedMsgCount);

 							}

 						}

 					}

@@ -188,7 +204,7 @@
 			Assert.IsInstanceOf<TextMessage>(message, "Wrong message type received.");

 			ITextMessage textMsg = (ITextMessage) message;

 			Assert.AreEqual(textMsg.Text, "Zero Message.");

-			receivedTestMessage = true;

+			this.receivedMsgCount++;

 		}

 	}

 }