Refactoring to support multiple producers and consumers.  Fixed wire protocol format. Added many new unit tests to validate the refactoring, and to give example usage.
diff --git a/src/main/csharp/CommonAssemblyInfo.cs b/src/main/csharp/CommonAssemblyInfo.cs
index 0523c0e..09dc050 100644
--- a/src/main/csharp/CommonAssemblyInfo.cs
+++ b/src/main/csharp/CommonAssemblyInfo.cs
@@ -13,7 +13,7 @@
 //------------------------------------------------------------------------------

 

 [assembly: ComVisibleAttribute(false)]

-[assembly: CLSCompliantAttribute(true)]

+[assembly: CLSCompliantAttribute(false)]

 [assembly: AssemblyTitleAttribute("Apache NMS for ZMQ Class Library")]

 [assembly: AssemblyDescriptionAttribute("Apache NMS for ZMQ Class Library (.Net Messaging Library Implementation): An imp" +

     "lementation of the NMS API for ZMQ")]

diff --git a/src/main/csharp/Connection.cs b/src/main/csharp/Connection.cs
index 7dfb45c..3fb6142 100644
--- a/src/main/csharp/Connection.cs
+++ b/src/main/csharp/Connection.cs
@@ -17,6 +17,8 @@
 

 using System;

 using ZeroMQ;

+using System.Collections.Generic;

+using System.Text;

 

 namespace Apache.NMS.ZMQ

 {

@@ -26,19 +28,43 @@
     ///

     public class Connection : IConnection

     {

+		private class ProducerRef

+		{

+			public ZmqSocket producer = null;

+			public int refCount = 1;

+		}

+

         private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;

         private IRedeliveryPolicy redeliveryPolicy;

         private ConnectionMetaData metaData = null;

         private bool closed = true;

         private string clientId;

         private Uri brokerUri;

+		private string producerContextBinding;

+		private string consumerContextBinding;

 

         /// <summary>

         /// ZMQ context

         /// </summary>

-		private ZmqContext _context = ZmqContext.Create();

+		private static ZmqContext _context;

+		private static Dictionary<string, ProducerRef> producerCache;

+		private static object producerCacheLock;

 

-        /// <summary>

+		static Connection()

+		{

+			Connection._context = ZmqContext.Create();

+			Connection.producerCache = new Dictionary<string, ProducerRef>();

+			Connection.producerCacheLock = new object();

+		}

+

+		public Connection(Uri connectionUri)

+		{

+			this.brokerUri = connectionUri;

+			this.producerContextBinding = string.Format("{0}://*:{1}", this.brokerUri.Scheme, this.brokerUri.Port);

+			this.consumerContextBinding = string.Format("{0}://{1}:{2}", brokerUri.Scheme, brokerUri.Host, this.brokerUri.Port);

+		}

+

+		/// <summary>

         /// Starts message delivery for this connection.

         /// </summary>

         public void Start()

@@ -79,7 +105,83 @@
             return new Session(this, mode);

         }

 

-        public void Dispose()

+		internal ZmqSocket GetProducer()

+		{

+			ProducerRef producerRef;

+			string contextBinding = GetProducerContextBinding();

+

+			lock(producerCacheLock)

+			{

+				if(!producerCache.TryGetValue(contextBinding, out producerRef))

+				{

+					producerRef = new ProducerRef();

+					producerRef.producer = this.Context.CreateSocket(SocketType.PUB);

+					if(null == producerRef.producer)

+					{

+						throw new ResourceAllocationException();

+					}

+					producerRef.producer.Bind(contextBinding);

+					producerCache.Add(contextBinding, producerRef);

+				}

+				else

+				{

+					producerRef.refCount++;

+				}

+			}

+

+			return producerRef.producer;

+		}

+

+		internal void ReleaseProducer(ZmqSocket endpoint)

+		{

+			// UNREFERENCED_PARAM(endpoint);

+			ProducerRef producerRef;

+			string contextBinding = GetProducerContextBinding();

+

+			lock(producerCacheLock)

+			{

+				if(producerCache.TryGetValue(contextBinding, out producerRef))

+				{

+					producerRef.refCount--;

+					if(producerRef.refCount < 1)

+					{

+						producerCache.Remove(contextBinding);

+						producerRef.producer.Unbind(contextBinding);

+					}

+				}

+			}

+		}

+

+		internal ZmqSocket GetConsumer(Encoding encoding, string destinationName)

+		{

+			ZmqSocket endpoint = this.Context.CreateSocket(SocketType.SUB);

+

+			if(null == endpoint)

+			{

+				throw new ResourceAllocationException();

+			}

+			endpoint.Subscribe(encoding.GetBytes(destinationName));

+			endpoint.Connect(GetConsumerBindingPath());

+

+			return endpoint;

+		}

+

+		internal void ReleaseConsumer(ZmqSocket endpoint)

+		{

+			endpoint.Disconnect(GetConsumerBindingPath());

+		}

+

+		internal string GetProducerContextBinding()

+		{

+			return this.producerContextBinding;

+		}

+

+		private string GetConsumerBindingPath()

+		{

+			return this.consumerContextBinding;

+		}

+

+		public void Dispose()

         {

             Close();

         }

@@ -87,7 +189,17 @@
         public void Close()

         {

             Stop();

-        }

+

+			lock(producerCacheLock)

+			{

+				foreach(KeyValuePair<string, ProducerRef> cacheItem in producerCache)

+				{

+					cacheItem.Value.producer.Unbind(cacheItem.Key);

+				}

+

+				producerCache.Clear();

+			}

+		}

 

         public void PurgeTempDestinations()

         {

@@ -114,7 +226,6 @@
         public Uri BrokerUri

         {

             get { return brokerUri; }

-            set { brokerUri = value; }

         }

 

         /// <summary>

@@ -154,7 +265,7 @@
         /// </summary>

         internal ZmqContext Context

         {

-            get { return _context; }

+            get { return Connection._context; }

         }

 

         /// <summary>

diff --git a/src/main/csharp/ConnectionFactory.cs b/src/main/csharp/ConnectionFactory.cs
index 41cd808..127ba1f 100644
--- a/src/main/csharp/ConnectionFactory.cs
+++ b/src/main/csharp/ConnectionFactory.cs
@@ -16,6 +16,7 @@
  */

 using System;

 using Apache.NMS.Policies;

+using Apache.NMS.Util;

 

 namespace Apache.NMS.ZMQ

 {

@@ -25,7 +26,7 @@
 	public class ConnectionFactory : IConnectionFactory

 	{

 		private Uri brokerUri;

-		private string clientID;

+		private string clientId;

 		private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();

 

 		private const string DEFAULT_BROKER_URL = "tcp://localhost:5556";

@@ -36,20 +37,35 @@
 		{

 		}

 

-		public ConnectionFactory(string brokerUri)

-			: this(brokerUri, null)

+		public ConnectionFactory(string rawBrokerUri)

+			: this(rawBrokerUri, null)

 		{

 		}

 

-		public ConnectionFactory(string brokerUri, string clientID)

-			: this(new Uri(brokerUri), clientID)

+		public ConnectionFactory(string rawBrokerUri, string clientID)

+			: this(URISupport.CreateCompatibleUri(rawBrokerUri), clientID)

 		{

 		}

 

-		public ConnectionFactory(Uri brokerUri, string clientID)

+		public ConnectionFactory(Uri rawBrokerUri)

+			: this(rawBrokerUri, null)

 		{

-			this.brokerUri = brokerUri;

-			this.clientID = clientID;

+		}

+

+		public ConnectionFactory(Uri rawBrokerUri, string clientID)

+		{

+			this.BrokerUri = rawBrokerUri;

+			if(this.BrokerUri.Port < 1)

+			{

+				throw new NMSConnectionException("Missing connection port number.");

+			}

+

+			if(null == clientID)

+			{

+				clientID = Guid.NewGuid().ToString();

+			}

+

+			this.ClientId = clientID;

 		}

 

 		/// <summary>

@@ -93,17 +109,13 @@
 		/// </summary>

 		public IConnection CreateConnection(string userName, string password, bool useLogging)

 		{

-			IConnection ReturnValue = null;

-			Connection connection = new Connection();

+			Connection connection = new Connection(this.BrokerUri);

 

 			connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;

 			connection.ConsumerTransformer = this.consumerTransformer;

 			connection.ProducerTransformer = this.producerTransformer;

-			connection.BrokerUri = this.BrokerUri;

-			connection.ClientId = this.clientID;

-			ReturnValue = connection;

-

-			return ReturnValue;

+			connection.ClientId = this.ClientId;

+			return connection;

 		}

 

 		/// <summary>

@@ -111,8 +123,18 @@
 		/// </summary>

 		public Uri BrokerUri

 		{

-			get { return brokerUri; }

-			set { brokerUri = value; }

+			get { return this.brokerUri; }

+			set

+			{

+				Tracer.InfoFormat("BrokerUri set {0}", value.OriginalString);

+				this.brokerUri = new Uri(URISupport.StripPrefix(value.OriginalString, "zmq:"));

+			}

+		}

+

+		public string ClientId

+		{

+			get { return this.clientId; }

+			set { this.clientId = value; }

 		}

 

 		/// <summary>

diff --git a/src/main/csharp/Destination.cs b/src/main/csharp/Destination.cs
index 9fce88b..79d54a8 100644
--- a/src/main/csharp/Destination.cs
+++ b/src/main/csharp/Destination.cs
@@ -16,6 +16,8 @@
  */

 

 using System;

+using System.Text;

+using ZeroMQ;

 

 namespace Apache.NMS.ZMQ

 {

@@ -24,38 +26,43 @@
 	/// </summary>

 	public abstract class Destination : IDestination

 	{

-

-		private String name = "";

-

+		protected Session session;

 		/// <summary>

-		/// The Default Constructor

+		/// Socket object

 		/// </summary>

-		protected Destination()

-		{

-		}

+		protected ZmqSocket producerEndpoint = null;

+		protected ZmqSocket consumerEndpoint = null;

+		protected string destinationName;

 

 		/// <summary>

 		/// Construct the Destination with a defined physical name.

 		/// </summary>

 		/// <param name="name"></param>

-		protected Destination(String destName)

+		protected Destination(Session session, string destName)

 		{

-			Name = destName;

+			this.session = session;

+			this.destinationName = destName;

 		}

 

-		public String Name

+		~Destination()

 		{

-			get { return this.name; }

-			set

+			// TODO: Implement IDisposable pattern

+			if(null != this.producerEndpoint)

 			{

-				this.name = value;

-				if(!this.name.Contains("\\"))

-				{

-					// Destinations must have paths in them.  If no path specified, then

-					// default to local machine.

-					this.name = ".\\" + this.name;

-				}

+				this.session.Connection.ReleaseProducer(this.producerEndpoint);

+				this.producerEndpoint = null;

 			}

+

+			if(null != this.consumerEndpoint)

+			{

+				this.session.Connection.ReleaseConsumer(this.consumerEndpoint);

+				this.consumerEndpoint = null;

+			}

+		}

+

+		public string Name

+		{

+			get { return this.destinationName; }

 		}

 

 		public bool IsTopic

@@ -88,21 +95,26 @@
 		/// <summary>

 		/// </summary>

 		/// <returns>string representation of this instance</returns>

-		public override String ToString()

+		public override string ToString()

+		{

+			return MakeUriString(this.destinationName);

+		}

+

+		private string MakeUriString(string destName)

 		{

 			switch(DestinationType)

 			{

 			case DestinationType.Topic:

-				return "topic://" + Name;

+				return "topic://" + destName;

 

 			case DestinationType.TemporaryTopic:

-				return "temp-topic://" + Name;

+				return "temp-topic://" + destName;

 

 			case DestinationType.TemporaryQueue:

-				return "temp-queue://" + Name;

+				return "temp-queue://" + destName;

 

 			default:

-				return "queue://" + Name;

+				return "queue://" + destName;

 			}

 		}

 

@@ -114,10 +126,7 @@
 		{

 			int answer = 37;

 

-			if(this.name != null)

-			{

-				answer = name.GetHashCode();

-			}

+			answer = this.Name.GetHashCode();

 

 			if(IsTopic)

 			{

@@ -140,7 +149,7 @@
 			{

 				Destination other = (Destination) obj;

 				result = (this.DestinationType == other.DestinationType

-							&& this.name.Equals(other.name));

+							&& this.Name.Equals(other.Name));

 			}

 

 			return result;

@@ -150,6 +159,38 @@
 		{

 			get;

 		}

+

+		internal int Send(byte[] buffer, TimeSpan timeout)

+		{

+			if(null == this.producerEndpoint)

+			{

+				this.producerEndpoint = this.session.Connection.GetProducer();

+			}

+

+			return this.producerEndpoint.Send(buffer, buffer.Length, SocketFlags.None, timeout);

+		}

+

+		internal string Receive(Encoding encoding, TimeSpan timeout)

+		{

+			if(null == this.consumerEndpoint)

+			{

+				this.consumerEndpoint = this.session.Connection.GetConsumer(encoding, this.destinationName);

+			}

+

+			return consumerEndpoint.Receive(encoding, timeout);

+		}

+

+		internal Frame ReceiveFrame()

+		{

+			// TODO: Implement

+			return null;

+		}

+

+		internal ZmqMessage ReceiveMessage()

+		{

+			// TODO: Implement

+			return null;

+		}

 	}

 }

 

diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
index 85752cd..9fa0027 100644
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -15,13 +15,14 @@
  * limitations under the License.

  */

 

+#define PUBSUB

+

 using System;

 using System.Text;

 using System.Threading;

 using Apache.NMS.Util;

 using ZeroMQ;

-//using ZSendRecvOpt = ZMQ.SendRecvOpt;

-//using ZSocketType = ZeroMQ.SocketType;

+using System.Diagnostics;

 

 namespace Apache.NMS.ZMQ

 {

@@ -34,20 +35,12 @@
 

 		private readonly Session session;

 		private readonly AcknowledgementMode acknowledgementMode;

-		/// <summary>

-		/// Socket object

-		/// </summary>

-		private ZmqSocket messageSubscriber = null;

-		/// <summary>

-		/// Context binding string

-		/// </summary>

-		private string contextBinding;

-		private Queue destination;

+		private Destination destination;

 		private event MessageListener listener;

 		private int listenerCount = 0;

 		private Thread asyncDeliveryThread = null;

-		private AutoResetEvent pause = new AutoResetEvent(false);

-		private Atomic<bool> asyncDelivery = new Atomic<bool>(false);

+		private object asyncDeliveryLock = new object();

+		private bool asyncDelivery = false;

 

 		private ConsumerTransformerDelegate consumerTransformer;

 		public ConsumerTransformerDelegate ConsumerTransformer

@@ -56,39 +49,18 @@
 			set { this.consumerTransformer = value; }

 		}

 

-		public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, IDestination destination, string selector)

+		public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, IDestination dest, string selector)

 		{

+			// UNUSED_PARAM(selector);		// Selectors are not currently supported

+

 			if(null == session.Connection.Context)

 			{

 				throw new NMSConnectionException();

 			}

 

 			this.session = session;

+			this.destination = (Destination) dest;

 			this.acknowledgementMode = acknowledgementMode;

-			this.messageSubscriber = session.Connection.Context.CreateSocket(SocketType.SUB);

-			if(null == this.messageSubscriber)

-			{

-				throw new ResourceAllocationException();

-			}

-

-			string clientId = session.Connection.ClientId;

-

-			this.contextBinding = session.Connection.BrokerUri.LocalPath;

-			this.destination = new Queue(this.contextBinding);

-			if(!string.IsNullOrEmpty(clientId))

-			{

-				this.messageSubscriber.Identity = Encoding.Unicode.GetBytes(clientId);

-			}

-

-			this.messageSubscriber.Connect(contextBinding);

-			byte[] prefix = null;

-

-			if(!string.IsNullOrWhiteSpace(selector))

-			{

-				prefix = Encoding.ASCII.GetBytes(selector);

-			}

-

-			this.messageSubscriber.Subscribe(prefix);

 		}

 

 		public event MessageListener Listener

@@ -123,8 +95,7 @@
 		/// </returns>

 		public IMessage Receive()

 		{

-			// TODO: Support decoding of all message types + all meta data (e.g., headers and properties)

-			return ToNmsMessage(messageSubscriber.Receive(Encoding.ASCII));

+			return Receive(TimeSpan.MaxValue);

 		}

 

 		/// <summary>

@@ -136,7 +107,17 @@
 		public IMessage Receive(TimeSpan timeout)

 		{

 			// TODO: Support decoding of all message types + all meta data (e.g., headers and properties)

-			return ToNmsMessage(messageSubscriber.Receive(Encoding.ASCII, timeout));

+			string msgContent = this.destination.Receive(Encoding.UTF8, timeout);

+

+			if(null != msgContent)

+			{

+				// Strip off the subscribed destination name.

+				string destinationName = this.destination.Name;

+				string messageText = msgContent.Substring(destinationName.Length, msgContent.Length - destinationName.Length);

+				return ToNmsMessage(messageText);

+			}

+

+			return null;

 		}

 

 		/// <summary>

@@ -164,21 +145,18 @@
 		public void Close()

 		{

 			StopAsyncDelivery();

-			if(null != messageSubscriber)

-			{

-				messageSubscriber.Dispose();

-				messageSubscriber = null;

-			}

+			this.destination = null;

 		}

 

 		protected virtual void StopAsyncDelivery()

 		{

-			if(asyncDelivery.CompareAndSet(true, false))

+			lock(asyncDeliveryLock)

 			{

+				asyncDelivery = false;

 				if(null != asyncDeliveryThread)

 				{

 					Tracer.Info("Stopping async delivery thread.");

-					pause.Set();

+					asyncDeliveryThread.Interrupt();

 					if(!asyncDeliveryThread.Join(10000))

 					{

 						Tracer.Info("Aborting async delivery thread.");

@@ -193,10 +171,12 @@
 

 		protected virtual void StartAsyncDelivery()

 		{

-			if(asyncDelivery.CompareAndSet(false, true))

+			Debug.Assert(null == asyncDeliveryThread);

+			lock(asyncDeliveryLock)

 			{

+				asyncDelivery = true;

 				asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));

-				asyncDeliveryThread.Name = "Message Consumer Dispatch: " + contextBinding;

+				asyncDeliveryThread.Name = string.Format("MsgConsumerAsync: {0}", this.destination.Name);

 				asyncDeliveryThread.IsBackground = true;

 				asyncDeliveryThread.Start();

 			}

@@ -205,21 +185,22 @@
 		protected virtual void DispatchLoop()

 		{

 			Tracer.Info("Starting dispatcher thread consumer: " + this);

+			TimeSpan receiveWait = TimeSpan.FromSeconds(3);

 

-			while(asyncDelivery.Value)

+			while(asyncDelivery)

 			{

 				try

 				{

-					IMessage message = Receive();

-					if(asyncDelivery.Value && message != null)

+					IMessage message = Receive(receiveWait);

+					if(asyncDelivery && message != null)

 					{

 						try

 						{

 							listener(message);

 						}

-						catch(Exception e)

+						catch(Exception ex)

 						{

-							HandleAsyncException(e);

+							HandleAsyncException(ex);

 						}

 					}

 				}

@@ -233,7 +214,7 @@
 					Tracer.ErrorFormat("Exception while receiving message in thread: {0} : {1}", this, ex.Message);

 				}

 			}

-			Tracer.Info("Stopping dispatcher thread consumer: " + this);

+			Tracer.Info("Stopped dispatcher thread consumer: " + this);

 		}

 

 		protected virtual void HandleAsyncException(Exception e)

@@ -252,6 +233,7 @@
 		/// </returns>

 		protected virtual IMessage ToNmsMessage(string messageText)

 		{

+			// Strip off the destination name prefix.

 			IMessage nmsMessage = new TextMessage(messageText);

 

 			try

diff --git a/src/main/csharp/MessageProducer.cs b/src/main/csharp/MessageProducer.cs
index 4d1c56e..4b1db8f 100644
--- a/src/main/csharp/MessageProducer.cs
+++ b/src/main/csharp/MessageProducer.cs
@@ -15,10 +15,13 @@
  * limitations under the License.

  */

 

+#define PUBSUB

+

 using System;

 using System.Text;

 using ZeroMQ;

 

+

 namespace Apache.NMS.ZMQ

 {

 	/// <summary>

@@ -29,10 +32,6 @@
 		private readonly Session session;

 		private IDestination destination;

 

-		/// <summary>

-		/// Socket object

-		/// </summary>

-		private ZmqSocket messageProducer = null;

 		private MsgDeliveryMode deliveryMode;

 		private TimeSpan timeToLive;

 		private MsgPriority priority;

@@ -46,24 +45,15 @@
 			set { this.producerTransformer = value; }

 		}

 

-		public MessageProducer(Connection connection, Session session, IDestination destination)

+		public MessageProducer(Session session, IDestination dest)

 		{

-			if(null == connection.Context)

+			if(null == session.Connection.Context)

 			{

 				throw new NMSConnectionException();

 			}

 

 			this.session = session;

-			this.destination = destination;

-			this.messageProducer = connection.Context.CreateSocket(SocketType.SUB);

-

-			string clientId = connection.ClientId;

-			if(!string.IsNullOrEmpty(clientId))

-			{

-				this.messageProducer.Identity = Encoding.Unicode.GetBytes(clientId);

-			}

-

-			this.messageProducer.Connect(connection.BrokerUri.LocalPath);

+			this.destination = dest;

 		}

 

 		public void Send(IMessage message)

@@ -76,13 +66,17 @@
 			Send(Destination, message, deliveryMode, priority, timeToLive);

 		}

 

-		public void Send(IDestination destination, IMessage message)

+		public void Send(IDestination dest, IMessage message)

 		{

-			Send(destination, message, DeliveryMode, Priority, TimeToLive);

+			Send(dest, message, DeliveryMode, Priority, TimeToLive);

 		}

 

-		public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)

+		public void Send(IDestination dest, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)

 		{

+			// UNUSED_PARAM(deliveryMode);	// No concept of different delivery modes in ZMQ

+			// UNUSED_PARAM(priority);		// No concept of priority messages in ZMQ

+			// UNUSED_PARAM(timeToLive);	// No concept of time-to-live in ZMQ

+

 			if(null != this.ProducerTransformer)

 			{

 				IMessage transformedMessage = ProducerTransformer(this.session, this, message);

@@ -94,7 +88,13 @@
 			}

 

 			// TODO: Support encoding of all message types + all meta data (e.g., headers and properties)

-			messageProducer.Send(((ITextMessage) message).Text, Encoding.ASCII);

+

+			// Prefix the message with the destination name. The client will subscribe to this destination name

+			// in order to receive messages.

+			Destination destination = (Destination) dest;

+

+			string msg = destination.Name + ((ITextMessage) message).Text;

+			destination.Send(Encoding.UTF8.GetBytes(msg), this.session.Connection.RequestTimeout);

 		}

 

 		public void Dispose()

@@ -104,11 +104,6 @@
 

 		public void Close()

 		{

-			if(null != messageProducer)

-			{

-				messageProducer.Dispose();

-				messageProducer = null;

-			}

 		}

 

 		public IMessage CreateMessage()

diff --git a/src/main/csharp/Queue.cs b/src/main/csharp/Queue.cs
index 4d90580..b2b0dd6 100644
--- a/src/main/csharp/Queue.cs
+++ b/src/main/csharp/Queue.cs
@@ -24,13 +24,8 @@
 	/// </summary>

 	public class Queue : Destination, IQueue

 	{

-		public Queue()

-			: base()

-		{

-		}

-

-		public Queue(String name)

-			: base(name)

+		public Queue(Session session, string name)

+			: base(session, name)

 		{

 		}

 

diff --git a/src/main/csharp/Session.cs b/src/main/csharp/Session.cs
index 7cc02b6..bcde99c 100644
--- a/src/main/csharp/Session.cs
+++ b/src/main/csharp/Session.cs
@@ -61,7 +61,7 @@
 

         public IMessageProducer CreateProducer(IDestination destination)

         {

-            return new MessageProducer(connection, this, destination);

+            return new MessageProducer(this, destination);

         }

         #endregion

 

@@ -106,22 +106,22 @@
 

         public IQueue GetQueue(string name)

         {

-            return new Queue(name);

+            return new Queue(this, name);

         }

 

         public ITopic GetTopic(string name)

         {

-            return new Topic(name);

+			return new Topic(this, name);

         }

 

         public ITemporaryQueue CreateTemporaryQueue()

         {

-            return new TemporaryQueue();

+            return new TemporaryQueue(this);

         }

 

         public ITemporaryTopic CreateTemporaryTopic()

         {

-            return new TemporaryTopic();

+            return new TemporaryTopic(this);

         }

 

         /// <summary>

diff --git a/src/main/csharp/TemporaryQueue.cs b/src/main/csharp/TemporaryQueue.cs
index 44845e4..8a82ccc 100644
--- a/src/main/csharp/TemporaryQueue.cs
+++ b/src/main/csharp/TemporaryQueue.cs
@@ -24,13 +24,8 @@
 	/// </summary>

 	public class TemporaryQueue : Destination, ITemporaryQueue

 	{

-		public TemporaryQueue()

-			: base()

-		{

-		}

-

-		public TemporaryQueue(String name)

-			: base(name)

+		public TemporaryQueue(Session session)

+			: base(session, Guid.NewGuid().ToString())

 		{

 		}

 

@@ -39,15 +34,6 @@
 			get { return DestinationType.TemporaryQueue; }

 		}

 

-		#region ITemporaryQueue Members

-

-		public void Delete()

-		{

-			// Nothing to delete.  Resources are cleaned up automatically.

-		}

-

-		#endregion

-

 		#region IQueue Members

 

 		public string QueueName

@@ -56,5 +42,14 @@
 		}

 

 		#endregion

+

+		#region ITemporaryQueue Members

+

+		public void Delete()

+		{

+			// Nothing to delete.  Resources are cleaned up automatically.

+		}

+

+		#endregion

 	}

 }

diff --git a/src/main/csharp/TemporaryTopic.cs b/src/main/csharp/TemporaryTopic.cs
index 3d50bed..556832e 100644
--- a/src/main/csharp/TemporaryTopic.cs
+++ b/src/main/csharp/TemporaryTopic.cs
@@ -24,13 +24,8 @@
 	/// </summary>

 	public class TemporaryTopic : Destination, ITemporaryTopic

 	{

-		public TemporaryTopic()

-			: base()

-		{

-		}

-

-		public TemporaryTopic(String name)

-			: base(name)

+		public TemporaryTopic(Session session)

+			: base(session, Guid.NewGuid().ToString())

 		{

 		}

 

diff --git a/src/main/csharp/TextMessage.cs b/src/main/csharp/TextMessage.cs
index a7bfca1..c1c723d 100644
--- a/src/main/csharp/TextMessage.cs
+++ b/src/main/csharp/TextMessage.cs
@@ -16,13 +16,12 @@
  */

 

 using System;

+using System.Text;

 

 namespace Apache.NMS.ZMQ

 {

 	public class TextMessage : BaseMessage, ITextMessage

 	{

-		public const int SIZE_OF_INT = 4; // sizeof(int) - though causes unsafe issues with net 1.1

-

 		private String text;

 

 		public TextMessage()

@@ -34,68 +33,13 @@
 			this.Text = text;

 		}

 

-

 		// Properties

 

 		public string Text

 		{

-			get

-			{

-				if(text == null)

-				{

-					// now lets read the content

-					byte[] data = this.Content;

-					if(data != null)

-					{

-						// TODO assume that the text is ASCII

-						char[] chars = new char[data.Length - SIZE_OF_INT];

-						for(int i = 0; i < chars.Length; i++)

-						{

-							chars[i] = (char) data[i + SIZE_OF_INT];

-						}

-						text = new String(chars);

-					}

-				}

-				return text;

-			}

-

-			set

-			{

-				this.text = value;

-				byte[] data = null;

-				if(text != null)

-				{

-					// TODO assume that the text is ASCII

-

-					byte[] sizePrefix = System.BitConverter.GetBytes(text.Length);

-					data = new byte[text.Length + sizePrefix.Length];  //int at the front of it

-

-					// add the size prefix

-					for(int j = 0; j < sizePrefix.Length; j++)

-					{

-						// The bytes need to be encoded in big endian

-						if(BitConverter.IsLittleEndian)

-						{

-							data[j] = sizePrefix[sizePrefix.Length - j - 1];

-						}

-						else

-						{

-							data[j] = sizePrefix[j];

-						}

-					}

-

-					// Add the data.

-					char[] chars = text.ToCharArray();

-					for(int i = 0; i < chars.Length; i++)

-					{

-						data[i + sizePrefix.Length] = (byte) chars[i];

-					}

-				}

-				this.Content = data;

-

-			}

+			get { return text; }

+			set { this.text = value; }

 		}

-

 	}

 }

 

diff --git a/src/main/csharp/Topic.cs b/src/main/csharp/Topic.cs
index 3f50c01..7419866 100644
--- a/src/main/csharp/Topic.cs
+++ b/src/main/csharp/Topic.cs
@@ -24,13 +24,8 @@
 	/// </summary>

 	public class Topic : Destination, ITopic

 	{

-		public Topic()

-			: base()

-		{

-		}

-

-		public Topic(String name)

-			: base(name)

+		public Topic(Session session, String name)

+			: base(session, name)

 		{

 		}

 

diff --git a/src/main/csharp/Utils.cs b/src/main/csharp/Utils.cs
new file mode 100644
index 0000000..8f4ab80
--- /dev/null
+++ b/src/main/csharp/Utils.cs
@@ -0,0 +1,40 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one or more

+ * contributor license agreements.  See the NOTICE file distributed with

+ * this work for additional information regarding copyright ownership.

+ * The ASF licenses this file to You under the Apache License, Version 2.0

+ * (the "License"); you may not use this file except in compliance with

+ * the License.  You may obtain a copy of the License at

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

+using System;

+using System.Collections.Generic;

+using System.Linq;

+using System.Text;

+

+namespace Apache.NMS.ZMQ

+{

+	public class ZMQUtils

+	{

+

+		public static string GetDestinationName(IDestination destination)

+		{

+			switch(destination.DestinationType)

+			{

+			case DestinationType.Topic: return ((Topic) destination).TopicName;

+			case DestinationType.Queue: return ((Queue) destination).QueueName;

+			case DestinationType.TemporaryTopic: return ((TemporaryTopic) destination).TopicName;

+			case DestinationType.TemporaryQueue: return ((TemporaryQueue) destination).QueueName;

+			default: return string.Empty;

+			}

+		}

+	}

+}

diff --git a/src/test/csharp/BaseTest.cs b/src/test/csharp/BaseTest.cs
new file mode 100644
index 0000000..f6196b2
--- /dev/null
+++ b/src/test/csharp/BaseTest.cs
@@ -0,0 +1,66 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one or more

+ * contributor license agreements.  See the NOTICE file distributed with

+ * this work for additional information regarding copyright ownership.

+ * The ASF licenses this file to You under the Apache License, Version 2.0

+ * (the "License"); you may not use this file except in compliance with

+ * the License.  You may obtain a copy of the License at

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

+using System;

+using System.IO;

+using System.Threading;

+using NUnit.Framework;

+

+namespace Apache.NMS.ZMQ

+{

+	/// <summary>

+	/// Use to test and verify ZMQ behavior

+	/// </summary>

+	public class BaseTest

+	{

+		[TestFixtureSetUp]

+		public void TestFixtureSetup()

+		{

+			////////////////////////////

+			// Dependencies check

+			////////////////////////////

+			string libFolder = Environment.CurrentDirectory;

+			string libFileName;

+

+			libFileName = Path.Combine(libFolder, "clrzmq.dll");

+			Assert.IsTrue(File.Exists(libFileName), "Missing zmq wrapper file: {0}", libFileName);

+			libFileName = Path.Combine(libFolder, "libzmq.dll");

+			Assert.IsTrue(File.Exists(libFileName), "Missing zmq library file: {0}", libFileName);

+			libFileName = Path.Combine(libFolder, "libzmq64.dll");

+			Assert.IsTrue(File.Exists(libFileName), "Missing 64-bit zmq library file: {0}", libFileName);

+			libFileName = Path.Combine(libFolder, "Apache.NMS.dll");

+			Assert.IsTrue(File.Exists(libFileName), "Missing Apache.NMS library file: {0}", libFileName);

+			libFileName = Path.Combine(libFolder, "Apache.NMS.ZMQ.dll");

+			Assert.IsTrue(File.Exists(libFileName), "Missing Apache.NMS.ZMQ library file: {0}", libFileName);

+		}

+

+		[SetUp]

+		public void SetUp()

+		{

+			// Setup before each test

+		}

+

+		[TearDown]

+		public void TearDown()

+		{

+			// Clean up after each test

+		}

+	}

+}

+

+

+

diff --git a/src/test/csharp/CommonAssemblyInfo.cs b/src/test/csharp/CommonAssemblyInfo.cs
index ebd3f6d..20d4a99 100644
--- a/src/test/csharp/CommonAssemblyInfo.cs
+++ b/src/test/csharp/CommonAssemblyInfo.cs
@@ -13,7 +13,7 @@
 //------------------------------------------------------------------------------

 

 [assembly: ComVisibleAttribute(false)]

-[assembly: CLSCompliantAttribute(true)]

+[assembly: CLSCompliantAttribute(false)]

 [assembly: AssemblyTitleAttribute("Apache NMS for ZMQ Class Library")]

 [assembly: AssemblyDescriptionAttribute("Apache NMS for ZMQ Class Library (.Net Messaging Library Implementation): An imp" +

     "lementation of the NMS API for ZMQ")]

diff --git a/src/test/csharp/FactoryTests.cs b/src/test/csharp/FactoryTests.cs
new file mode 100644
index 0000000..94ff505
--- /dev/null
+++ b/src/test/csharp/FactoryTests.cs
@@ -0,0 +1,96 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one or more

+ * contributor license agreements.  See the NOTICE file distributed with

+ * this work for additional information regarding copyright ownership.

+ * The ASF licenses this file to You under the Apache License, Version 2.0

+ * (the "License"); you may not use this file except in compliance with

+ * the License.  You may obtain a copy of the License at

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

+using System;

+using NUnit.Framework;

+

+namespace Apache.NMS.ZMQ

+{

+	[TestFixture]

+	public class FactoryTests : BaseTest

+	{

+		[Test]

+		public void TestFactory()

+		{

+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));

+			Assert.IsNotNull(factory, "Error creating connection factory.");

+			Assert.IsInstanceOf<ConnectionFactory>(factory, "Wrong factory type.");

+			Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong port.");

+		}

+

+		[Test]

+		public void TestFactoryClientId()

+		{

+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"), "MyClientId");

+			Assert.IsNotNull(factory, "Error creating connection factory.");

+			Assert.IsInstanceOf<ConnectionFactory>(factory, "Wrong factory type.");

+			Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong port.");

+			ConnectionFactory zmqConnectionFactory = (ConnectionFactory) factory;

+			Assert.AreEqual(zmqConnectionFactory.ClientId, "MyClientId", "Wrong client Id.");

+		}

+

+		[Test, ExpectedException(typeof(NMSConnectionException))]

+		public void TestFactoryUriMissingPort()

+		{

+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost"));

+		}

+

+		[Test]

+		public void TestFactoryDefault()

+		{

+			IConnectionFactory factory = new ConnectionFactory();

+			Assert.IsNotNull(factory, "Error creating connection factory.");

+			Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong default port.");

+		}

+

+		[Test]

+		public void TestFactoryDirectString()

+		{

+			IConnectionFactory factory = new ConnectionFactory("tcp://localhost:5556");

+			Assert.IsNotNull(factory, "Error creating connection factory.");

+			Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong port.");

+		}

+

+		[Test]

+		public void TestFactoryDirectStringClientId()

+		{

+			IConnectionFactory factory = new ConnectionFactory("tcp://localhost:5556", "DirectClientId");

+			Assert.IsNotNull(factory, "Error creating connection factory.");

+			Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong port.");

+			ConnectionFactory zmqConnectionFactory = (ConnectionFactory) factory;

+			Assert.AreEqual(zmqConnectionFactory.ClientId, "DirectClientId", "Wrong client Id.");

+		}

+

+		[Test]

+		public void TestFactoryDirectUri()

+		{

+			IConnectionFactory factory = new ConnectionFactory(new Uri("tcp://localhost:5556"));

+			Assert.IsNotNull(factory, "Error creating connection factory.");

+			Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong port.");

+		}

+

+		[Test]

+		public void TestFactoryDirectUriClientId()

+		{

+			IConnectionFactory factory = new ConnectionFactory(new Uri("tcp://localhost:5556"), "DirectClientId");

+			Assert.IsNotNull(factory, "Error creating connection factory.");

+			Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong port.");

+			ConnectionFactory zmqConnectionFactory = (ConnectionFactory) factory;

+			Assert.AreEqual(zmqConnectionFactory.ClientId, "DirectClientId", "Wrong client Id.");

+		}

+	}

+}

diff --git a/src/test/csharp/MultiProducersMultiConsumers.cs b/src/test/csharp/MultiProducersMultiConsumers.cs
new file mode 100644
index 0000000..dac3af7
--- /dev/null
+++ b/src/test/csharp/MultiProducersMultiConsumers.cs
@@ -0,0 +1,155 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one or more

+ * contributor license agreements.  See the NOTICE file distributed with

+ * this work for additional information regarding copyright ownership.

+ * The ASF licenses this file to You under the Apache License, Version 2.0

+ * (the "License"); you may not use this file except in compliance with

+ * the License.  You may obtain a copy of the License at

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

+using System;

+using System.Threading;

+using NUnit.Framework;

+

+namespace Apache.NMS.ZMQ

+{

+	[TestFixture]

+	public class ProducerConsumers : BaseTest

+	{

+		private int totalMsgCountToReceive = 0;

+

+		private class ConsumerTracker

+		{

+			public IMessageConsumer consumer;

+			public int msgCount = 0;

+		}

+

+		[Test]

+		public void TestMultipleProducersConsumer(

+			[Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]

+			string destination,

+			[Values(1, 3)]

+			int numProducers,

+			[Values(1, 3)]

+			int numConsumers)

+		{

+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));

+			Assert.IsNotNull(factory, "Error creating connection factory.");

+

+			using(IConnection connection = factory.CreateConnection())

+			{

+				Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");

+				using(ISession session = connection.CreateSession())

+				{

+					Assert.IsNotNull(session, "Error creating Session.");

+					IDestination testDestination = session.GetDestination(destination);

+					Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);

+

+					// Track the number of messages we should receive

+					this.totalMsgCountToReceive = numProducers * numConsumers;

+

+					ConsumerTracker[] consumerTrackers = null;

+					IMessageProducer[] producers = null;

+

+					try

+					{

+						// Create the consumers

+						consumerTrackers = new ConsumerTracker[numConsumers];

+						for(int index = 0; index < numConsumers; index++)

+						{

+							ConsumerTracker tracker = new ConsumerTracker();

+							tracker.consumer = session.CreateConsumer(testDestination);

+							Assert.IsNotNull(tracker.consumer, "Error creating consumer #{0} on {1}", index, destination);

+							tracker.consumer.Listener += (message) =>

+								{

+									Assert.IsInstanceOf<TextMessage>(message, "Wrong message type received.");

+									ITextMessage textMsg = (ITextMessage) message;

+									Assert.AreEqual(textMsg.Text, "Zero Message.");

+									tracker.msgCount++;

+								};

+							consumerTrackers[index] = tracker;

+						}

+

+						// Create the producers

+						producers = new IMessageProducer[numProducers];

+						for(int index = 0; index < numProducers; index++)

+						{

+							producers[index] = session.CreateProducer(testDestination);

+							Assert.IsNotNull(producers[index], "Error creating producer #{0} on {1}", index, destination);

+						}

+

+						// Send the messages

+						for(int index = 0; index < numProducers; index++)

+						{

+							ITextMessage testMsg = producers[index].CreateTextMessage("Zero Message.");

+							Assert.IsNotNull(testMsg, "Error creating test message for producer #{0}.", index);

+							producers[index].Send(testMsg);

+						}

+

+						// Wait for the message

+						DateTime startWaitTime = DateTime.Now;

+						TimeSpan maxWaitTime = TimeSpan.FromSeconds(10);

+

+						while(GetNumMsgsReceived(consumerTrackers) < this.totalMsgCountToReceive)

+						{

+							if((DateTime.Now - startWaitTime) > maxWaitTime)

+							{

+								Assert.Fail("Timeout waiting for message receive.");

+							}

+

+							Thread.Sleep(5);

+						}

+

+						// Sleep for an extra 2 seconds to see if any extra messages get delivered

+						Thread.Sleep(2 * 1000);

+						Assert.AreEqual(this.totalMsgCountToReceive, GetNumMsgsReceived(consumerTrackers), "Received too many messages.");

+					}

+					finally

+					{

+

+						// Clean up the producers

+						if(null != producers)

+						{

+							foreach(IMessageProducer producer in producers)

+							{

+								producer.Dispose();

+							}

+						}

+

+						// Clean up the consumers

+						if(null != consumerTrackers)

+						{

+							foreach(ConsumerTracker tracker in consumerTrackers)

+							{

+								tracker.consumer.Dispose();

+							}

+						}

+					}

+				}

+			}

+		}

+

+		private int GetNumMsgsReceived(ConsumerTracker[] consumerTrackers)

+		{

+			int numMsgs = 0;

+

+			foreach(ConsumerTracker tracker in consumerTrackers)

+			{

+				numMsgs += tracker.msgCount;

+			}

+

+			return numMsgs;

+		}

+	}

+}

+

+

+

diff --git a/src/test/csharp/ZMQTest.cs b/src/test/csharp/ZMQTest.cs
index 0abdf12..4bf09d6 100644
--- a/src/test/csharp/ZMQTest.cs
+++ b/src/test/csharp/ZMQTest.cs
@@ -16,114 +16,156 @@
  */

 

 using System;

-using System.IO;

 using System.Threading;

 using NUnit.Framework;

 

 namespace Apache.NMS.ZMQ

 {

-	/// <summary>

-	/// Use to test and verify ZMQ behavior

-	/// </summary>

 	[TestFixture]

-	public class ZMQTest

+	public class ZMQTest : BaseTest

 	{

 		private bool receivedTestMessage = true;

 

-		[SetUp]

-		public void SetUp()

+		[Test]

+		public void TestConnection()

 		{

-			// Setup before each test

-		}

-

-		[TearDown]

-		public void TearDown()

-		{

-			// Clean up after each test

+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));

+			Assert.IsNotNull(factory, "Error creating connection factory.");

+			using(IConnection connection = factory.CreateConnection())

+			{

+				Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");

+				Assert.IsInstanceOf<Connection>(connection, "Wrong connection type.");

+			}

 		}

 

 		[Test]

-		public void TestReceive()

+		public void TestSession()

 		{

-			////////////////////////////

-			// Dependencies check

-			////////////////////////////

-			string libFolder = System.Environment.CurrentDirectory;

-			string libFileName;

-

-			libFileName = Path.Combine(libFolder, "clrzmq.dll");

-			Assert.IsTrue(File.Exists(libFileName), "Missing zmq wrapper file: {0}", libFileName);

-			libFileName = Path.Combine(libFolder, "libzmq.dll");

-			Assert.IsTrue(File.Exists(libFileName), "Missing zmq library file: {0}", libFileName);

-			libFileName = Path.Combine(libFolder, "libzmq64.dll");

-			Assert.IsTrue(File.Exists(libFileName), "Missing 64-bit zmq library file: {0}", libFileName);

-			libFileName = Path.Combine(libFolder, "Apache.NMS.dll");

-			Assert.IsTrue(File.Exists(libFileName), "Missing Apache.NMS library file: {0}", libFileName);

-			libFileName = Path.Combine(libFolder, "Apache.NMS.ZMQ.dll");

-			Assert.IsTrue(File.Exists(libFileName), "Missing Apache.NMS.ZMQ library file: {0}", libFileName);

-

-			////////////////////////////

-			// Factory check

-			////////////////////////////

-			IConnectionFactory factory = new ConnectionFactory("tcp://localhost:5556", "");

+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));

 			Assert.IsNotNull(factory, "Error creating connection factory.");

-

-			////////////////////////////

-			// Connection check

-			////////////////////////////

-			IConnection connection = null;

-			try

+			using(IConnection connection = factory.CreateConnection())

 			{

-				connection = factory.CreateConnection();

-				Assert.IsNotNull(connection, "problem creating connection class, usually problem with libzmq and clrzmq ");

-			}

-			catch(System.Exception ex1)

-			{

-				Assert.Fail("Problem creating connection, make sure dependencies are present. Error: {0}", ex1.Message);

-			}

-

-			////////////////////////////

-			// Session check

-			////////////////////////////

-			ISession session = connection.CreateSession();

-			// Is session good?

-			Assert.IsNotNull(session, "Error creating Session.");

-

-			////////////////////////////

-			// Consumer check

-			////////////////////////////

-			IQueue testQueue = session.GetQueue("ZMQTestQueue");

-			Assert.IsNotNull(testQueue, "Error creating test queue.");

-			IMessageConsumer consumer = session.CreateConsumer(testQueue);

-			Assert.IsNotNull(consumer, "Error creating consumer.");

-

-			consumer.Listener += OnMessage;

-

-			////////////////////////////

-			// Producer check

-			////////////////////////////

-			IMessageProducer producer = session.CreateProducer(testQueue);

-			Assert.IsNotNull(consumer, "Error creating producer.");

-

-			ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");

-			Assert.IsNotNull(testMsg, "Error creating test message.");

-

-			producer.Send(testMsg);

-

-			////////////////////////////

-			// Listener check

-			////////////////////////////

-			DateTime startWaitTime = DateTime.Now;

-			TimeSpan maxWaitTime = TimeSpan.FromSeconds(10);

-

-			while(!receivedTestMessage)

-			{

-				if((DateTime.Now - startWaitTime) > maxWaitTime)

+				Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");

+				using(ISession session = connection.CreateSession())

 				{

-					Assert.Fail("Timeout waiting for message receive.");

+					Assert.IsNotNull(session, "Error creating session.");

+					Assert.IsInstanceOf<Session>(session, "Wrong session type.");

 				}

+			}

+		}

 

-				Thread.Sleep(5);

+		[Test, Sequential]

+		public void TestDestinations(

+			[Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]

+			string destination,

+			[Values(typeof(Queue), typeof(Topic), typeof(TemporaryQueue), typeof(TemporaryTopic))]

+			Type destinationType)

+		{

+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));

+			Assert.IsNotNull(factory, "Error creating connection factory.");

+			using(IConnection connection = factory.CreateConnection())

+			{

+				Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");

+				using(ISession session = connection.CreateSession())

+				{

+					Assert.IsNotNull(session, "Error creating session.");

+					IDestination testDestination = session.GetDestination(destination);

+					Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);

+					Assert.IsInstanceOf(destinationType, testDestination, "Wrong destintation type.");

+				}

+			}

+		}

+

+		[Test]

+		public void TestProducers(

+			[Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]

+			string destination)

+		{

+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));

+			Assert.IsNotNull(factory, "Error creating connection factory.");

+			using(IConnection connection = factory.CreateConnection())

+			{

+				Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");

+				using(ISession session = connection.CreateSession())

+				{

+					Assert.IsNotNull(session, "Error creating session.");

+					IDestination testDestination = session.GetDestination(destination);

+					Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);

+					using(IMessageProducer producer = session.CreateProducer(testDestination))

+					{

+						Assert.IsNotNull(producer, "Error creating producer on {0}", destination);

+						Assert.IsInstanceOf<MessageProducer>(producer, "Wrong producer type.");

+					}

+				}

+			}

+		}

+

+		[Test]

+		public void TestConsumers(

+			[Values("queue://ZMQTestQueue:", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]

+			string destination)

+		{

+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));

+			Assert.IsNotNull(factory, "Error creating connection factory.");

+			using(IConnection connection = factory.CreateConnection())

+			{

+				Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");

+				using(ISession session = connection.CreateSession())

+				{

+					Assert.IsNotNull(session, "Error creating session.");

+					IDestination testDestination = session.GetDestination(destination);

+					Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);

+					using(IMessageConsumer consumer = session.CreateConsumer(testDestination))

+					{

+						Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);

+						Assert.IsInstanceOf<MessageConsumer>(consumer, "Wrong consumer type.");

+					}

+				}

+			}

+		}

+

+		[Test]

+		public void TestSendReceive(

+			[Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]

+			string destination)

+		{

+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));

+			Assert.IsNotNull(factory, "Error creating connection factory.");

+			using(IConnection connection = factory.CreateConnection())

+			{

+				Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");

+				using(ISession session = connection.CreateSession())

+				{

+					Assert.IsNotNull(session, "Error creating Session.");

+					IDestination testDestination = session.GetDestination(destination);

+					Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);

+					using(IMessageConsumer consumer = session.CreateConsumer(testDestination))

+					{

+						Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);

+						consumer.Listener += OnMessage;

+						using(IMessageProducer producer = session.CreateProducer(testDestination))

+						{

+							Assert.IsNotNull(consumer, "Error creating producer on {0}", destination);

+							ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");

+							Assert.IsNotNull(testMsg, "Error creating test message.");

+							producer.Send(testMsg);

+						}

+

+						// Wait for the message

+						DateTime startWaitTime = DateTime.Now;

+						TimeSpan maxWaitTime = TimeSpan.FromSeconds(10);

+

+						while(!receivedTestMessage)

+						{

+							if((DateTime.Now - startWaitTime) > maxWaitTime)

+							{

+								Assert.Fail("Timeout waiting for message receive.");

+							}

+

+							Thread.Sleep(5);

+						}

+					}

+				}

 			}

 		}

 

@@ -133,13 +175,10 @@
 		/// <param name="message"></param>

 		private void OnMessage(IMessage message)

 		{

-			Assert.IsInstanceOf<ITextMessage>(message, "Wrong message type received.");

+			Assert.IsInstanceOf<TextMessage>(message, "Wrong message type received.");

 			ITextMessage textMsg = (ITextMessage) message;

 			Assert.AreEqual(textMsg.Text, "Zero Message.");

 			receivedTestMessage = true;

 		}

 	}

 }

-

-

-

diff --git a/vs2010-zmq-net-4.0-test.csproj b/vs2010-zmq-net-4.0-test.csproj
index 8494ee4..3a58166 100644
--- a/vs2010-zmq-net-4.0-test.csproj
+++ b/vs2010-zmq-net-4.0-test.csproj
@@ -48,6 +48,9 @@
   </ItemGroup>

   <ItemGroup>

     <Compile Include="src\test\csharp\CommonAssemblyInfo.cs" />

+    <Compile Include="src\test\csharp\BaseTest.cs" />

+    <Compile Include="src\test\csharp\FactoryTests.cs" />

+    <Compile Include="src\test\csharp\MultiProducersMultiConsumers.cs" />

     <Compile Include="src\test\csharp\ZMQTest.cs" />

   </ItemGroup>

   <ItemGroup>

diff --git a/vs2010-zmq-net-4.0.csproj b/vs2010-zmq-net-4.0.csproj
index bdfdc0d..f41958b 100644
--- a/vs2010-zmq-net-4.0.csproj
+++ b/vs2010-zmq-net-4.0.csproj
@@ -23,6 +23,8 @@
     <RegisterForComInterop>false</RegisterForComInterop>

     <PlatformTarget>AnyCPU</PlatformTarget>

     <TreatWarningsAsErrors>false</TreatWarningsAsErrors>

+    <NoWarn>

+    </NoWarn>

   </PropertyGroup>

   <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">

     <DebugSymbols>true</DebugSymbols>

@@ -31,6 +33,8 @@
     <DefineConstants>TRACE;NET</DefineConstants>

     <AllowUnsafeBlocks>false</AllowUnsafeBlocks>

     <DebugType>full</DebugType>

+    <NoWarn>

+    </NoWarn>

   </PropertyGroup>

   <ItemGroup>

     <Reference Include="Apache.NMS, Version=1.5.0.2363, Culture=neutral, PublicKeyToken=82756feee3957618, processorArchitecture=MSIL">

@@ -69,6 +73,7 @@
       <SubType>Code</SubType>

     </Compile>

     <Compile Include="src\main\csharp\TextMessage.cs" />

+    <Compile Include="src\main\csharp\Utils.cs" />

   </ItemGroup>

   <ItemGroup>

     <Content Include="lib\clrzmq\net-4.0\libzmq.dll">