Refactoring to support multiple producers and consumers. Fixed wire protocol format. Added many new unit tests to validate the refactoring, and to give example usage.
diff --git a/src/main/csharp/CommonAssemblyInfo.cs b/src/main/csharp/CommonAssemblyInfo.cs
index 0523c0e..09dc050 100644
--- a/src/main/csharp/CommonAssemblyInfo.cs
+++ b/src/main/csharp/CommonAssemblyInfo.cs
@@ -13,7 +13,7 @@
//------------------------------------------------------------------------------
[assembly: ComVisibleAttribute(false)]
-[assembly: CLSCompliantAttribute(true)]
+[assembly: CLSCompliantAttribute(false)]
[assembly: AssemblyTitleAttribute("Apache NMS for ZMQ Class Library")]
[assembly: AssemblyDescriptionAttribute("Apache NMS for ZMQ Class Library (.Net Messaging Library Implementation): An imp" +
"lementation of the NMS API for ZMQ")]
diff --git a/src/main/csharp/Connection.cs b/src/main/csharp/Connection.cs
index 7dfb45c..3fb6142 100644
--- a/src/main/csharp/Connection.cs
+++ b/src/main/csharp/Connection.cs
@@ -17,6 +17,8 @@
using System;
using ZeroMQ;
+using System.Collections.Generic;
+using System.Text;
namespace Apache.NMS.ZMQ
{
@@ -26,19 +28,43 @@
///
public class Connection : IConnection
{
+ private class ProducerRef
+ {
+ public ZmqSocket producer = null;
+ public int refCount = 1;
+ }
+
private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
private IRedeliveryPolicy redeliveryPolicy;
private ConnectionMetaData metaData = null;
private bool closed = true;
private string clientId;
private Uri brokerUri;
+ private string producerContextBinding;
+ private string consumerContextBinding;
/// <summary>
/// ZMQ context
/// </summary>
- private ZmqContext _context = ZmqContext.Create();
+ private static ZmqContext _context;
+ private static Dictionary<string, ProducerRef> producerCache;
+ private static object producerCacheLock;
- /// <summary>
+ static Connection()
+ {
+ Connection._context = ZmqContext.Create();
+ Connection.producerCache = new Dictionary<string, ProducerRef>();
+ Connection.producerCacheLock = new object();
+ }
+
+ public Connection(Uri connectionUri)
+ {
+ this.brokerUri = connectionUri;
+ this.producerContextBinding = string.Format("{0}://*:{1}", this.brokerUri.Scheme, this.brokerUri.Port);
+ this.consumerContextBinding = string.Format("{0}://{1}:{2}", brokerUri.Scheme, brokerUri.Host, this.brokerUri.Port);
+ }
+
+ /// <summary>
/// Starts message delivery for this connection.
/// </summary>
public void Start()
@@ -79,7 +105,83 @@
return new Session(this, mode);
}
- public void Dispose()
+ internal ZmqSocket GetProducer()
+ {
+ ProducerRef producerRef;
+ string contextBinding = GetProducerContextBinding();
+
+ lock(producerCacheLock)
+ {
+ if(!producerCache.TryGetValue(contextBinding, out producerRef))
+ {
+ producerRef = new ProducerRef();
+ producerRef.producer = this.Context.CreateSocket(SocketType.PUB);
+ if(null == producerRef.producer)
+ {
+ throw new ResourceAllocationException();
+ }
+ producerRef.producer.Bind(contextBinding);
+ producerCache.Add(contextBinding, producerRef);
+ }
+ else
+ {
+ producerRef.refCount++;
+ }
+ }
+
+ return producerRef.producer;
+ }
+
+ internal void ReleaseProducer(ZmqSocket endpoint)
+ {
+ // UNREFERENCED_PARAM(endpoint);
+ ProducerRef producerRef;
+ string contextBinding = GetProducerContextBinding();
+
+ lock(producerCacheLock)
+ {
+ if(producerCache.TryGetValue(contextBinding, out producerRef))
+ {
+ producerRef.refCount--;
+ if(producerRef.refCount < 1)
+ {
+ producerCache.Remove(contextBinding);
+ producerRef.producer.Unbind(contextBinding);
+ }
+ }
+ }
+ }
+
+ internal ZmqSocket GetConsumer(Encoding encoding, string destinationName)
+ {
+ ZmqSocket endpoint = this.Context.CreateSocket(SocketType.SUB);
+
+ if(null == endpoint)
+ {
+ throw new ResourceAllocationException();
+ }
+ endpoint.Subscribe(encoding.GetBytes(destinationName));
+ endpoint.Connect(GetConsumerBindingPath());
+
+ return endpoint;
+ }
+
+ internal void ReleaseConsumer(ZmqSocket endpoint)
+ {
+ endpoint.Disconnect(GetConsumerBindingPath());
+ }
+
+ internal string GetProducerContextBinding()
+ {
+ return this.producerContextBinding;
+ }
+
+ private string GetConsumerBindingPath()
+ {
+ return this.consumerContextBinding;
+ }
+
+ public void Dispose()
{
Close();
}
@@ -87,7 +189,17 @@
public void Close()
{
Stop();
- }
+
+ lock(producerCacheLock)
+ {
+ foreach(KeyValuePair<string, ProducerRef> cacheItem in producerCache)
+ {
+ cacheItem.Value.producer.Unbind(cacheItem.Key);
+ }
+
+ producerCache.Clear();
+ }
+ }
public void PurgeTempDestinations()
{
@@ -114,7 +226,6 @@
public Uri BrokerUri
{
get { return brokerUri; }
- set { brokerUri = value; }
}
/// <summary>
@@ -154,7 +265,7 @@
/// </summary>
internal ZmqContext Context
{
- get { return _context; }
+ get { return Connection._context; }
}
/// <summary>
diff --git a/src/main/csharp/ConnectionFactory.cs b/src/main/csharp/ConnectionFactory.cs
index 41cd808..127ba1f 100644
--- a/src/main/csharp/ConnectionFactory.cs
+++ b/src/main/csharp/ConnectionFactory.cs
@@ -16,6 +16,7 @@
*/
using System;
using Apache.NMS.Policies;
+using Apache.NMS.Util;
namespace Apache.NMS.ZMQ
{
@@ -25,7 +26,7 @@
public class ConnectionFactory : IConnectionFactory
{
private Uri brokerUri;
- private string clientID;
+ private string clientId;
private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
private const string DEFAULT_BROKER_URL = "tcp://localhost:5556";
@@ -36,20 +37,35 @@
{
}
- public ConnectionFactory(string brokerUri)
- : this(brokerUri, null)
+ public ConnectionFactory(string rawBrokerUri)
+ : this(rawBrokerUri, null)
{
}
- public ConnectionFactory(string brokerUri, string clientID)
- : this(new Uri(brokerUri), clientID)
+ public ConnectionFactory(string rawBrokerUri, string clientID)
+ : this(URISupport.CreateCompatibleUri(rawBrokerUri), clientID)
{
}
- public ConnectionFactory(Uri brokerUri, string clientID)
+ public ConnectionFactory(Uri rawBrokerUri)
+ : this(rawBrokerUri, null)
{
- this.brokerUri = brokerUri;
- this.clientID = clientID;
+ }
+
+ public ConnectionFactory(Uri rawBrokerUri, string clientID)
+ {
+ this.BrokerUri = rawBrokerUri;
+ if(this.BrokerUri.Port < 1)
+ {
+ throw new NMSConnectionException("Missing connection port number.");
+ }
+
+ if(null == clientID)
+ {
+ clientID = Guid.NewGuid().ToString();
+ }
+
+ this.ClientId = clientID;
}
/// <summary>
@@ -93,17 +109,13 @@
/// </summary>
public IConnection CreateConnection(string userName, string password, bool useLogging)
{
- IConnection ReturnValue = null;
- Connection connection = new Connection();
+ Connection connection = new Connection(this.BrokerUri);
connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
connection.ConsumerTransformer = this.consumerTransformer;
connection.ProducerTransformer = this.producerTransformer;
- connection.BrokerUri = this.BrokerUri;
- connection.ClientId = this.clientID;
- ReturnValue = connection;
-
- return ReturnValue;
+ connection.ClientId = this.ClientId;
+ return connection;
}
/// <summary>
@@ -111,8 +123,18 @@
/// </summary>
public Uri BrokerUri
{
- get { return brokerUri; }
- set { brokerUri = value; }
+ get { return this.brokerUri; }
+ set
+ {
+ Tracer.InfoFormat("BrokerUri set {0}", value.OriginalString);
+ this.brokerUri = new Uri(URISupport.StripPrefix(value.OriginalString, "zmq:"));
+ }
+ }
+
+ public string ClientId
+ {
+ get { return this.clientId; }
+ set { this.clientId = value; }
}
/// <summary>
diff --git a/src/main/csharp/Destination.cs b/src/main/csharp/Destination.cs
index 9fce88b..79d54a8 100644
--- a/src/main/csharp/Destination.cs
+++ b/src/main/csharp/Destination.cs
@@ -16,6 +16,8 @@
*/
using System;
+using System.Text;
+using ZeroMQ;
namespace Apache.NMS.ZMQ
{
@@ -24,38 +26,43 @@
/// </summary>
public abstract class Destination : IDestination
{
-
- private String name = "";
-
+ protected Session session;
/// <summary>
- /// The Default Constructor
+ /// Socket object
/// </summary>
- protected Destination()
- {
- }
+ protected ZmqSocket producerEndpoint = null;
+ protected ZmqSocket consumerEndpoint = null;
+ protected string destinationName;
/// <summary>
/// Construct the Destination with a defined physical name.
/// </summary>
/// <param name="name"></param>
- protected Destination(String destName)
+ protected Destination(Session session, string destName)
{
- Name = destName;
+ this.session = session;
+ this.destinationName = destName;
}
- public String Name
+ ~Destination()
{
- get { return this.name; }
- set
+ // TODO: Implement IDisposable pattern
+ if(null != this.producerEndpoint)
{
- this.name = value;
- if(!this.name.Contains("\\"))
- {
- // Destinations must have paths in them. If no path specified, then
- // default to local machine.
- this.name = ".\\" + this.name;
- }
+ this.session.Connection.ReleaseProducer(this.producerEndpoint);
+ this.producerEndpoint = null;
}
+
+ if(null != this.consumerEndpoint)
+ {
+ this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
+ this.consumerEndpoint = null;
+ }
+ }
+
+ public string Name
+ {
+ get { return this.destinationName; }
}
public bool IsTopic
@@ -88,21 +95,26 @@
/// <summary>
/// </summary>
/// <returns>string representation of this instance</returns>
- public override String ToString()
+ public override string ToString()
+ {
+ return MakeUriString(this.destinationName);
+ }
+
+ private string MakeUriString(string destName)
{
switch(DestinationType)
{
case DestinationType.Topic:
- return "topic://" + Name;
+ return "topic://" + destName;
case DestinationType.TemporaryTopic:
- return "temp-topic://" + Name;
+ return "temp-topic://" + destName;
case DestinationType.TemporaryQueue:
- return "temp-queue://" + Name;
+ return "temp-queue://" + destName;
default:
- return "queue://" + Name;
+ return "queue://" + destName;
}
}
@@ -114,10 +126,7 @@
{
int answer = 37;
- if(this.name != null)
- {
- answer = name.GetHashCode();
- }
+ answer = this.Name.GetHashCode();
if(IsTopic)
{
@@ -140,7 +149,7 @@
{
Destination other = (Destination) obj;
result = (this.DestinationType == other.DestinationType
- && this.name.Equals(other.name));
+ && this.Name.Equals(other.Name));
}
return result;
@@ -150,6 +159,38 @@
{
get;
}
+
+ internal int Send(byte[] buffer, TimeSpan timeout)
+ {
+ if(null == this.producerEndpoint)
+ {
+ this.producerEndpoint = this.session.Connection.GetProducer();
+ }
+
+ return this.producerEndpoint.Send(buffer, buffer.Length, SocketFlags.None, timeout);
+ }
+
+ internal string Receive(Encoding encoding, TimeSpan timeout)
+ {
+ if(null == this.consumerEndpoint)
+ {
+ this.consumerEndpoint = this.session.Connection.GetConsumer(encoding, this.destinationName);
+ }
+
+ return consumerEndpoint.Receive(encoding, timeout);
+ }
+
+ internal Frame ReceiveFrame()
+ {
+ // TODO: Implement
+ return null;
+ }
+
+ internal ZmqMessage ReceiveMessage()
+ {
+ // TODO: Implement
+ return null;
+ }
}
}
diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
index 85752cd..9fa0027 100644
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -15,13 +15,14 @@
* limitations under the License.
*/
+#define PUBSUB
+
using System;
using System.Text;
using System.Threading;
using Apache.NMS.Util;
using ZeroMQ;
-//using ZSendRecvOpt = ZMQ.SendRecvOpt;
-//using ZSocketType = ZeroMQ.SocketType;
+using System.Diagnostics;
namespace Apache.NMS.ZMQ
{
@@ -34,20 +35,12 @@
private readonly Session session;
private readonly AcknowledgementMode acknowledgementMode;
- /// <summary>
- /// Socket object
- /// </summary>
- private ZmqSocket messageSubscriber = null;
- /// <summary>
- /// Context binding string
- /// </summary>
- private string contextBinding;
- private Queue destination;
+ private Destination destination;
private event MessageListener listener;
private int listenerCount = 0;
private Thread asyncDeliveryThread = null;
- private AutoResetEvent pause = new AutoResetEvent(false);
- private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
+ private object asyncDeliveryLock = new object();
+ private bool asyncDelivery = false;
private ConsumerTransformerDelegate consumerTransformer;
public ConsumerTransformerDelegate ConsumerTransformer
@@ -56,39 +49,18 @@
set { this.consumerTransformer = value; }
}
- public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, IDestination destination, string selector)
+ public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, IDestination dest, string selector)
{
+ // UNUSED_PARAM(selector); // Selectors are not currently supported
+
if(null == session.Connection.Context)
{
throw new NMSConnectionException();
}
this.session = session;
+ this.destination = (Destination) dest;
this.acknowledgementMode = acknowledgementMode;
- this.messageSubscriber = session.Connection.Context.CreateSocket(SocketType.SUB);
- if(null == this.messageSubscriber)
- {
- throw new ResourceAllocationException();
- }
-
- string clientId = session.Connection.ClientId;
-
- this.contextBinding = session.Connection.BrokerUri.LocalPath;
- this.destination = new Queue(this.contextBinding);
- if(!string.IsNullOrEmpty(clientId))
- {
- this.messageSubscriber.Identity = Encoding.Unicode.GetBytes(clientId);
- }
-
- this.messageSubscriber.Connect(contextBinding);
- byte[] prefix = null;
-
- if(!string.IsNullOrWhiteSpace(selector))
- {
- prefix = Encoding.ASCII.GetBytes(selector);
- }
-
- this.messageSubscriber.Subscribe(prefix);
}
public event MessageListener Listener
@@ -123,8 +95,7 @@
/// </returns>
public IMessage Receive()
{
- // TODO: Support decoding of all message types + all meta data (e.g., headers and properties)
- return ToNmsMessage(messageSubscriber.Receive(Encoding.ASCII));
+ return Receive(TimeSpan.MaxValue);
}
/// <summary>
@@ -136,7 +107,17 @@
public IMessage Receive(TimeSpan timeout)
{
// TODO: Support decoding of all message types + all meta data (e.g., headers and properties)
- return ToNmsMessage(messageSubscriber.Receive(Encoding.ASCII, timeout));
+ string msgContent = this.destination.Receive(Encoding.UTF8, timeout);
+
+ if(null != msgContent)
+ {
+ // Strip off the subscribed destination name.
+ string destinationName = this.destination.Name;
+ string messageText = msgContent.Substring(destinationName.Length, msgContent.Length - destinationName.Length);
+ return ToNmsMessage(messageText);
+ }
+
+ return null;
}
/// <summary>
@@ -164,21 +145,18 @@
public void Close()
{
StopAsyncDelivery();
- if(null != messageSubscriber)
- {
- messageSubscriber.Dispose();
- messageSubscriber = null;
- }
+ this.destination = null;
}
protected virtual void StopAsyncDelivery()
{
- if(asyncDelivery.CompareAndSet(true, false))
+ lock(asyncDeliveryLock)
{
+ asyncDelivery = false;
if(null != asyncDeliveryThread)
{
Tracer.Info("Stopping async delivery thread.");
- pause.Set();
+ asyncDeliveryThread.Interrupt();
if(!asyncDeliveryThread.Join(10000))
{
Tracer.Info("Aborting async delivery thread.");
@@ -193,10 +171,12 @@
protected virtual void StartAsyncDelivery()
{
- if(asyncDelivery.CompareAndSet(false, true))
+ Debug.Assert(null == asyncDeliveryThread);
+ lock(asyncDeliveryLock)
{
+ asyncDelivery = true;
asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
- asyncDeliveryThread.Name = "Message Consumer Dispatch: " + contextBinding;
+ asyncDeliveryThread.Name = string.Format("MsgConsumerAsync: {0}", this.destination.Name);
asyncDeliveryThread.IsBackground = true;
asyncDeliveryThread.Start();
}
@@ -205,21 +185,22 @@
protected virtual void DispatchLoop()
{
Tracer.Info("Starting dispatcher thread consumer: " + this);
+ TimeSpan receiveWait = TimeSpan.FromSeconds(3);
- while(asyncDelivery.Value)
+ while(asyncDelivery)
{
try
{
- IMessage message = Receive();
- if(asyncDelivery.Value && message != null)
+ IMessage message = Receive(receiveWait);
+ if(asyncDelivery && message != null)
{
try
{
listener(message);
}
- catch(Exception e)
+ catch(Exception ex)
{
- HandleAsyncException(e);
+ HandleAsyncException(ex);
}
}
}
@@ -233,7 +214,7 @@
Tracer.ErrorFormat("Exception while receiving message in thread: {0} : {1}", this, ex.Message);
}
}
- Tracer.Info("Stopping dispatcher thread consumer: " + this);
+ Tracer.Info("Stopped dispatcher thread consumer: " + this);
}
protected virtual void HandleAsyncException(Exception e)
@@ -252,6 +233,7 @@
/// </returns>
protected virtual IMessage ToNmsMessage(string messageText)
{
+ // Strip off the destination name prefix.
IMessage nmsMessage = new TextMessage(messageText);
try
diff --git a/src/main/csharp/MessageProducer.cs b/src/main/csharp/MessageProducer.cs
index 4d1c56e..4b1db8f 100644
--- a/src/main/csharp/MessageProducer.cs
+++ b/src/main/csharp/MessageProducer.cs
@@ -15,10 +15,13 @@
* limitations under the License.
*/
+#define PUBSUB
+
using System;
using System.Text;
using ZeroMQ;
+
namespace Apache.NMS.ZMQ
{
/// <summary>
@@ -29,10 +32,6 @@
private readonly Session session;
private IDestination destination;
- /// <summary>
- /// Socket object
- /// </summary>
- private ZmqSocket messageProducer = null;
private MsgDeliveryMode deliveryMode;
private TimeSpan timeToLive;
private MsgPriority priority;
@@ -46,24 +45,15 @@
set { this.producerTransformer = value; }
}
- public MessageProducer(Connection connection, Session session, IDestination destination)
+ public MessageProducer(Session session, IDestination dest)
{
- if(null == connection.Context)
+ if(null == session.Connection.Context)
{
throw new NMSConnectionException();
}
this.session = session;
- this.destination = destination;
- this.messageProducer = connection.Context.CreateSocket(SocketType.SUB);
-
- string clientId = connection.ClientId;
- if(!string.IsNullOrEmpty(clientId))
- {
- this.messageProducer.Identity = Encoding.Unicode.GetBytes(clientId);
- }
-
- this.messageProducer.Connect(connection.BrokerUri.LocalPath);
+ this.destination = dest;
}
public void Send(IMessage message)
@@ -76,13 +66,17 @@
Send(Destination, message, deliveryMode, priority, timeToLive);
}
- public void Send(IDestination destination, IMessage message)
+ public void Send(IDestination dest, IMessage message)
{
- Send(destination, message, DeliveryMode, Priority, TimeToLive);
+ Send(dest, message, DeliveryMode, Priority, TimeToLive);
}
- public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+ public void Send(IDestination dest, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
{
+ // UNUSED_PARAM(deliveryMode); // No concept of different delivery modes in ZMQ
+ // UNUSED_PARAM(priority); // No concept of priority messages in ZMQ
+ // UNUSED_PARAM(timeToLive); // No concept of time-to-live in ZMQ
+
if(null != this.ProducerTransformer)
{
IMessage transformedMessage = ProducerTransformer(this.session, this, message);
@@ -94,7 +88,13 @@
}
// TODO: Support encoding of all message types + all meta data (e.g., headers and properties)
- messageProducer.Send(((ITextMessage) message).Text, Encoding.ASCII);
+
+ // Prefix the message with the destination name. The client will subscribe to this destination name
+ // in order to receive messages.
+ Destination destination = (Destination) dest;
+
+ string msg = destination.Name + ((ITextMessage) message).Text;
+ destination.Send(Encoding.UTF8.GetBytes(msg), this.session.Connection.RequestTimeout);
}
public void Dispose()
@@ -104,11 +104,6 @@
public void Close()
{
- if(null != messageProducer)
- {
- messageProducer.Dispose();
- messageProducer = null;
- }
}
public IMessage CreateMessage()
diff --git a/src/main/csharp/Queue.cs b/src/main/csharp/Queue.cs
index 4d90580..b2b0dd6 100644
--- a/src/main/csharp/Queue.cs
+++ b/src/main/csharp/Queue.cs
@@ -24,13 +24,8 @@
/// </summary>
public class Queue : Destination, IQueue
{
- public Queue()
- : base()
- {
- }
-
- public Queue(String name)
- : base(name)
+ public Queue(Session session, string name)
+ : base(session, name)
{
}
diff --git a/src/main/csharp/Session.cs b/src/main/csharp/Session.cs
index 7cc02b6..bcde99c 100644
--- a/src/main/csharp/Session.cs
+++ b/src/main/csharp/Session.cs
@@ -61,7 +61,7 @@
public IMessageProducer CreateProducer(IDestination destination)
{
- return new MessageProducer(connection, this, destination);
+ return new MessageProducer(this, destination);
}
#endregion
@@ -106,22 +106,22 @@
public IQueue GetQueue(string name)
{
- return new Queue(name);
+ return new Queue(this, name);
}
public ITopic GetTopic(string name)
{
- return new Topic(name);
+ return new Topic(this, name);
}
public ITemporaryQueue CreateTemporaryQueue()
{
- return new TemporaryQueue();
+ return new TemporaryQueue(this);
}
public ITemporaryTopic CreateTemporaryTopic()
{
- return new TemporaryTopic();
+ return new TemporaryTopic(this);
}
/// <summary>
diff --git a/src/main/csharp/TemporaryQueue.cs b/src/main/csharp/TemporaryQueue.cs
index 44845e4..8a82ccc 100644
--- a/src/main/csharp/TemporaryQueue.cs
+++ b/src/main/csharp/TemporaryQueue.cs
@@ -24,13 +24,8 @@
/// </summary>
public class TemporaryQueue : Destination, ITemporaryQueue
{
- public TemporaryQueue()
- : base()
- {
- }
-
- public TemporaryQueue(String name)
- : base(name)
+ public TemporaryQueue(Session session)
+ : base(session, Guid.NewGuid().ToString())
{
}
@@ -39,15 +34,6 @@
get { return DestinationType.TemporaryQueue; }
}
- #region ITemporaryQueue Members
-
- public void Delete()
- {
- // Nothing to delete. Resources are cleaned up automatically.
- }
-
- #endregion
-
#region IQueue Members
public string QueueName
@@ -56,5 +42,14 @@
}
#endregion
+
+ #region ITemporaryQueue Members
+
+ public void Delete()
+ {
+ // Nothing to delete. Resources are cleaned up automatically.
+ }
+
+ #endregion
}
}
diff --git a/src/main/csharp/TemporaryTopic.cs b/src/main/csharp/TemporaryTopic.cs
index 3d50bed..556832e 100644
--- a/src/main/csharp/TemporaryTopic.cs
+++ b/src/main/csharp/TemporaryTopic.cs
@@ -24,13 +24,8 @@
/// </summary>
public class TemporaryTopic : Destination, ITemporaryTopic
{
- public TemporaryTopic()
- : base()
- {
- }
-
- public TemporaryTopic(String name)
- : base(name)
+ public TemporaryTopic(Session session)
+ : base(session, Guid.NewGuid().ToString())
{
}
diff --git a/src/main/csharp/TextMessage.cs b/src/main/csharp/TextMessage.cs
index a7bfca1..c1c723d 100644
--- a/src/main/csharp/TextMessage.cs
+++ b/src/main/csharp/TextMessage.cs
@@ -16,13 +16,12 @@
*/
using System;
+using System.Text;
namespace Apache.NMS.ZMQ
{
public class TextMessage : BaseMessage, ITextMessage
{
- public const int SIZE_OF_INT = 4; // sizeof(int) - though causes unsafe issues with net 1.1
-
private String text;
public TextMessage()
@@ -34,68 +33,13 @@
this.Text = text;
}
-
// Properties
public string Text
{
- get
- {
- if(text == null)
- {
- // now lets read the content
- byte[] data = this.Content;
- if(data != null)
- {
- // TODO assume that the text is ASCII
- char[] chars = new char[data.Length - SIZE_OF_INT];
- for(int i = 0; i < chars.Length; i++)
- {
- chars[i] = (char) data[i + SIZE_OF_INT];
- }
- text = new String(chars);
- }
- }
- return text;
- }
-
- set
- {
- this.text = value;
- byte[] data = null;
- if(text != null)
- {
- // TODO assume that the text is ASCII
-
- byte[] sizePrefix = System.BitConverter.GetBytes(text.Length);
- data = new byte[text.Length + sizePrefix.Length]; //int at the front of it
-
- // add the size prefix
- for(int j = 0; j < sizePrefix.Length; j++)
- {
- // The bytes need to be encoded in big endian
- if(BitConverter.IsLittleEndian)
- {
- data[j] = sizePrefix[sizePrefix.Length - j - 1];
- }
- else
- {
- data[j] = sizePrefix[j];
- }
- }
-
- // Add the data.
- char[] chars = text.ToCharArray();
- for(int i = 0; i < chars.Length; i++)
- {
- data[i + sizePrefix.Length] = (byte) chars[i];
- }
- }
- this.Content = data;
-
- }
+ get { return text; }
+ set { this.text = value; }
}
-
}
}
diff --git a/src/main/csharp/Topic.cs b/src/main/csharp/Topic.cs
index 3f50c01..7419866 100644
--- a/src/main/csharp/Topic.cs
+++ b/src/main/csharp/Topic.cs
@@ -24,13 +24,8 @@
/// </summary>
public class Topic : Destination, ITopic
{
- public Topic()
- : base()
- {
- }
-
- public Topic(String name)
- : base(name)
+ public Topic(Session session, String name)
+ : base(session, name)
{
}
diff --git a/src/main/csharp/Utils.cs b/src/main/csharp/Utils.cs
new file mode 100644
index 0000000..8f4ab80
--- /dev/null
+++ b/src/main/csharp/Utils.cs
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Apache.NMS.ZMQ
+{
+ public class ZMQUtils
+ {
+
+ public static string GetDestinationName(IDestination destination)
+ {
+ switch(destination.DestinationType)
+ {
+ case DestinationType.Topic: return ((Topic) destination).TopicName;
+ case DestinationType.Queue: return ((Queue) destination).QueueName;
+ case DestinationType.TemporaryTopic: return ((TemporaryTopic) destination).TopicName;
+ case DestinationType.TemporaryQueue: return ((TemporaryQueue) destination).QueueName;
+ default: return string.Empty;
+ }
+ }
+ }
+}
diff --git a/src/test/csharp/BaseTest.cs b/src/test/csharp/BaseTest.cs
new file mode 100644
index 0000000..f6196b2
--- /dev/null
+++ b/src/test/csharp/BaseTest.cs
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+using System.Threading;
+using NUnit.Framework;
+
+namespace Apache.NMS.ZMQ
+{
+ /// <summary>
+ /// Use to test and verify ZMQ behavior
+ /// </summary>
+ public class BaseTest
+ {
+ [TestFixtureSetUp]
+ public void TestFixtureSetup()
+ {
+ ////////////////////////////
+ // Dependencies check
+ ////////////////////////////
+ string libFolder = Environment.CurrentDirectory;
+ string libFileName;
+
+ libFileName = Path.Combine(libFolder, "clrzmq.dll");
+ Assert.IsTrue(File.Exists(libFileName), "Missing zmq wrapper file: {0}", libFileName);
+ libFileName = Path.Combine(libFolder, "libzmq.dll");
+ Assert.IsTrue(File.Exists(libFileName), "Missing zmq library file: {0}", libFileName);
+ libFileName = Path.Combine(libFolder, "libzmq64.dll");
+ Assert.IsTrue(File.Exists(libFileName), "Missing 64-bit zmq library file: {0}", libFileName);
+ libFileName = Path.Combine(libFolder, "Apache.NMS.dll");
+ Assert.IsTrue(File.Exists(libFileName), "Missing Apache.NMS library file: {0}", libFileName);
+ libFileName = Path.Combine(libFolder, "Apache.NMS.ZMQ.dll");
+ Assert.IsTrue(File.Exists(libFileName), "Missing Apache.NMS.ZMQ library file: {0}", libFileName);
+ }
+
+ [SetUp]
+ public void SetUp()
+ {
+ // Setup before each test
+ }
+
+ [TearDown]
+ public void TearDown()
+ {
+ // Clean up after each test
+ }
+ }
+}
+
+
+
diff --git a/src/test/csharp/CommonAssemblyInfo.cs b/src/test/csharp/CommonAssemblyInfo.cs
index ebd3f6d..20d4a99 100644
--- a/src/test/csharp/CommonAssemblyInfo.cs
+++ b/src/test/csharp/CommonAssemblyInfo.cs
@@ -13,7 +13,7 @@
//------------------------------------------------------------------------------
[assembly: ComVisibleAttribute(false)]
-[assembly: CLSCompliantAttribute(true)]
+[assembly: CLSCompliantAttribute(false)]
[assembly: AssemblyTitleAttribute("Apache NMS for ZMQ Class Library")]
[assembly: AssemblyDescriptionAttribute("Apache NMS for ZMQ Class Library (.Net Messaging Library Implementation): An imp" +
"lementation of the NMS API for ZMQ")]
diff --git a/src/test/csharp/FactoryTests.cs b/src/test/csharp/FactoryTests.cs
new file mode 100644
index 0000000..94ff505
--- /dev/null
+++ b/src/test/csharp/FactoryTests.cs
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using NUnit.Framework;
+
+namespace Apache.NMS.ZMQ
+{
+ [TestFixture]
+ public class FactoryTests : BaseTest
+ {
+ [Test]
+ public void TestFactory()
+ {
+ IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
+ Assert.IsNotNull(factory, "Error creating connection factory.");
+ Assert.IsInstanceOf<ConnectionFactory>(factory, "Wrong factory type.");
+ Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong port.");
+ }
+
+ [Test]
+ public void TestFactoryClientId()
+ {
+ IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"), "MyClientId");
+ Assert.IsNotNull(factory, "Error creating connection factory.");
+ Assert.IsInstanceOf<ConnectionFactory>(factory, "Wrong factory type.");
+ Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong port.");
+ ConnectionFactory zmqConnectionFactory = (ConnectionFactory) factory;
+ Assert.AreEqual(zmqConnectionFactory.ClientId, "MyClientId", "Wrong client Id.");
+ }
+
+ [Test, ExpectedException(typeof(NMSConnectionException))]
+ public void TestFactoryUriMissingPort()
+ {
+ IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost"));
+ }
+
+ [Test]
+ public void TestFactoryDefault()
+ {
+ IConnectionFactory factory = new ConnectionFactory();
+ Assert.IsNotNull(factory, "Error creating connection factory.");
+ Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong default port.");
+ }
+
+ [Test]
+ public void TestFactoryDirectString()
+ {
+ IConnectionFactory factory = new ConnectionFactory("tcp://localhost:5556");
+ Assert.IsNotNull(factory, "Error creating connection factory.");
+ Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong port.");
+ }
+
+ [Test]
+ public void TestFactoryDirectStringClientId()
+ {
+ IConnectionFactory factory = new ConnectionFactory("tcp://localhost:5556", "DirectClientId");
+ Assert.IsNotNull(factory, "Error creating connection factory.");
+ Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong port.");
+ ConnectionFactory zmqConnectionFactory = (ConnectionFactory) factory;
+ Assert.AreEqual(zmqConnectionFactory.ClientId, "DirectClientId", "Wrong client Id.");
+ }
+
+ [Test]
+ public void TestFactoryDirectUri()
+ {
+ IConnectionFactory factory = new ConnectionFactory(new Uri("tcp://localhost:5556"));
+ Assert.IsNotNull(factory, "Error creating connection factory.");
+ Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong port.");
+ }
+
+ [Test]
+ public void TestFactoryDirectUriClientId()
+ {
+ IConnectionFactory factory = new ConnectionFactory(new Uri("tcp://localhost:5556"), "DirectClientId");
+ Assert.IsNotNull(factory, "Error creating connection factory.");
+ Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong port.");
+ ConnectionFactory zmqConnectionFactory = (ConnectionFactory) factory;
+ Assert.AreEqual(zmqConnectionFactory.ClientId, "DirectClientId", "Wrong client Id.");
+ }
+ }
+}
diff --git a/src/test/csharp/MultiProducersMultiConsumers.cs b/src/test/csharp/MultiProducersMultiConsumers.cs
new file mode 100644
index 0000000..dac3af7
--- /dev/null
+++ b/src/test/csharp/MultiProducersMultiConsumers.cs
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using NUnit.Framework;
+
+namespace Apache.NMS.ZMQ
+{
+ [TestFixture]
+ public class ProducerConsumers : BaseTest
+ {
+ private int totalMsgCountToReceive = 0;
+
+ private class ConsumerTracker
+ {
+ public IMessageConsumer consumer;
+ public int msgCount = 0;
+ }
+
+ [Test]
+ public void TestMultipleProducersConsumer(
+ [Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]
+ string destination,
+ [Values(1, 3)]
+ int numProducers,
+ [Values(1, 3)]
+ int numConsumers)
+ {
+ IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
+ Assert.IsNotNull(factory, "Error creating connection factory.");
+
+ using(IConnection connection = factory.CreateConnection())
+ {
+ Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");
+ using(ISession session = connection.CreateSession())
+ {
+ Assert.IsNotNull(session, "Error creating Session.");
+ IDestination testDestination = session.GetDestination(destination);
+ Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
+
+ // Track the number of messages we should receive
+ this.totalMsgCountToReceive = numProducers * numConsumers;
+
+ ConsumerTracker[] consumerTrackers = null;
+ IMessageProducer[] producers = null;
+
+ try
+ {
+ // Create the consumers
+ consumerTrackers = new ConsumerTracker[numConsumers];
+ for(int index = 0; index < numConsumers; index++)
+ {
+ ConsumerTracker tracker = new ConsumerTracker();
+ tracker.consumer = session.CreateConsumer(testDestination);
+ Assert.IsNotNull(tracker.consumer, "Error creating consumer #{0} on {1}", index, destination);
+ tracker.consumer.Listener += (message) =>
+ {
+ Assert.IsInstanceOf<TextMessage>(message, "Wrong message type received.");
+ ITextMessage textMsg = (ITextMessage) message;
+ Assert.AreEqual(textMsg.Text, "Zero Message.");
+ tracker.msgCount++;
+ };
+ consumerTrackers[index] = tracker;
+ }
+
+ // Create the producers
+ producers = new IMessageProducer[numProducers];
+ for(int index = 0; index < numProducers; index++)
+ {
+ producers[index] = session.CreateProducer(testDestination);
+ Assert.IsNotNull(producers[index], "Error creating producer #{0} on {1}", index, destination);
+ }
+
+ // Send the messages
+ for(int index = 0; index < numProducers; index++)
+ {
+ ITextMessage testMsg = producers[index].CreateTextMessage("Zero Message.");
+ Assert.IsNotNull(testMsg, "Error creating test message for producer #{0}.", index);
+ producers[index].Send(testMsg);
+ }
+
+ // Wait for the message
+ DateTime startWaitTime = DateTime.Now;
+ TimeSpan maxWaitTime = TimeSpan.FromSeconds(10);
+
+ while(GetNumMsgsReceived(consumerTrackers) < this.totalMsgCountToReceive)
+ {
+ if((DateTime.Now - startWaitTime) > maxWaitTime)
+ {
+ Assert.Fail("Timeout waiting for message receive.");
+ }
+
+ Thread.Sleep(5);
+ }
+
+ // Sleep for an extra 2 seconds to see if any extra messages get delivered
+ Thread.Sleep(2 * 1000);
+ Assert.AreEqual(this.totalMsgCountToReceive, GetNumMsgsReceived(consumerTrackers), "Received too many messages.");
+ }
+ finally
+ {
+
+ // Clean up the producers
+ if(null != producers)
+ {
+ foreach(IMessageProducer producer in producers)
+ {
+ producer.Dispose();
+ }
+ }
+
+ // Clean up the consumers
+ if(null != consumerTrackers)
+ {
+ foreach(ConsumerTracker tracker in consumerTrackers)
+ {
+ tracker.consumer.Dispose();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private int GetNumMsgsReceived(ConsumerTracker[] consumerTrackers)
+ {
+ int numMsgs = 0;
+
+ foreach(ConsumerTracker tracker in consumerTrackers)
+ {
+ numMsgs += tracker.msgCount;
+ }
+
+ return numMsgs;
+ }
+ }
+}
+
+
+
diff --git a/src/test/csharp/ZMQTest.cs b/src/test/csharp/ZMQTest.cs
index 0abdf12..4bf09d6 100644
--- a/src/test/csharp/ZMQTest.cs
+++ b/src/test/csharp/ZMQTest.cs
@@ -16,114 +16,156 @@
*/
using System;
-using System.IO;
using System.Threading;
using NUnit.Framework;
namespace Apache.NMS.ZMQ
{
- /// <summary>
- /// Use to test and verify ZMQ behavior
- /// </summary>
[TestFixture]
- public class ZMQTest
+ public class ZMQTest : BaseTest
{
private bool receivedTestMessage = true;
- [SetUp]
- public void SetUp()
+ [Test]
+ public void TestConnection()
{
- // Setup before each test
- }
-
- [TearDown]
- public void TearDown()
- {
- // Clean up after each test
+ IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
+ Assert.IsNotNull(factory, "Error creating connection factory.");
+ using(IConnection connection = factory.CreateConnection())
+ {
+ Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");
+ Assert.IsInstanceOf<Connection>(connection, "Wrong connection type.");
+ }
}
[Test]
- public void TestReceive()
+ public void TestSession()
{
- ////////////////////////////
- // Dependencies check
- ////////////////////////////
- string libFolder = System.Environment.CurrentDirectory;
- string libFileName;
-
- libFileName = Path.Combine(libFolder, "clrzmq.dll");
- Assert.IsTrue(File.Exists(libFileName), "Missing zmq wrapper file: {0}", libFileName);
- libFileName = Path.Combine(libFolder, "libzmq.dll");
- Assert.IsTrue(File.Exists(libFileName), "Missing zmq library file: {0}", libFileName);
- libFileName = Path.Combine(libFolder, "libzmq64.dll");
- Assert.IsTrue(File.Exists(libFileName), "Missing 64-bit zmq library file: {0}", libFileName);
- libFileName = Path.Combine(libFolder, "Apache.NMS.dll");
- Assert.IsTrue(File.Exists(libFileName), "Missing Apache.NMS library file: {0}", libFileName);
- libFileName = Path.Combine(libFolder, "Apache.NMS.ZMQ.dll");
- Assert.IsTrue(File.Exists(libFileName), "Missing Apache.NMS.ZMQ library file: {0}", libFileName);
-
- ////////////////////////////
- // Factory check
- ////////////////////////////
- IConnectionFactory factory = new ConnectionFactory("tcp://localhost:5556", "");
+ IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
Assert.IsNotNull(factory, "Error creating connection factory.");
-
- ////////////////////////////
- // Connection check
- ////////////////////////////
- IConnection connection = null;
- try
+ using(IConnection connection = factory.CreateConnection())
{
- connection = factory.CreateConnection();
- Assert.IsNotNull(connection, "problem creating connection class, usually problem with libzmq and clrzmq ");
- }
- catch(System.Exception ex1)
- {
- Assert.Fail("Problem creating connection, make sure dependencies are present. Error: {0}", ex1.Message);
- }
-
- ////////////////////////////
- // Session check
- ////////////////////////////
- ISession session = connection.CreateSession();
- // Is session good?
- Assert.IsNotNull(session, "Error creating Session.");
-
- ////////////////////////////
- // Consumer check
- ////////////////////////////
- IQueue testQueue = session.GetQueue("ZMQTestQueue");
- Assert.IsNotNull(testQueue, "Error creating test queue.");
- IMessageConsumer consumer = session.CreateConsumer(testQueue);
- Assert.IsNotNull(consumer, "Error creating consumer.");
-
- consumer.Listener += OnMessage;
-
- ////////////////////////////
- // Producer check
- ////////////////////////////
- IMessageProducer producer = session.CreateProducer(testQueue);
- Assert.IsNotNull(consumer, "Error creating producer.");
-
- ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");
- Assert.IsNotNull(testMsg, "Error creating test message.");
-
- producer.Send(testMsg);
-
- ////////////////////////////
- // Listener check
- ////////////////////////////
- DateTime startWaitTime = DateTime.Now;
- TimeSpan maxWaitTime = TimeSpan.FromSeconds(10);
-
- while(!receivedTestMessage)
- {
- if((DateTime.Now - startWaitTime) > maxWaitTime)
+ Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");
+ using(ISession session = connection.CreateSession())
{
- Assert.Fail("Timeout waiting for message receive.");
+ Assert.IsNotNull(session, "Error creating session.");
+ Assert.IsInstanceOf<Session>(session, "Wrong session type.");
}
+ }
+ }
- Thread.Sleep(5);
+ [Test, Sequential]
+ public void TestDestinations(
+ [Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]
+ string destination,
+ [Values(typeof(Queue), typeof(Topic), typeof(TemporaryQueue), typeof(TemporaryTopic))]
+ Type destinationType)
+ {
+ IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
+ Assert.IsNotNull(factory, "Error creating connection factory.");
+ using(IConnection connection = factory.CreateConnection())
+ {
+ Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");
+ using(ISession session = connection.CreateSession())
+ {
+ Assert.IsNotNull(session, "Error creating session.");
+ IDestination testDestination = session.GetDestination(destination);
+ Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
+ Assert.IsInstanceOf(destinationType, testDestination, "Wrong destintation type.");
+ }
+ }
+ }
+
+ [Test]
+ public void TestProducers(
+ [Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]
+ string destination)
+ {
+ IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
+ Assert.IsNotNull(factory, "Error creating connection factory.");
+ using(IConnection connection = factory.CreateConnection())
+ {
+ Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");
+ using(ISession session = connection.CreateSession())
+ {
+ Assert.IsNotNull(session, "Error creating session.");
+ IDestination testDestination = session.GetDestination(destination);
+ Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
+ using(IMessageProducer producer = session.CreateProducer(testDestination))
+ {
+ Assert.IsNotNull(producer, "Error creating producer on {0}", destination);
+ Assert.IsInstanceOf<MessageProducer>(producer, "Wrong producer type.");
+ }
+ }
+ }
+ }
+
+ [Test]
+ public void TestConsumers(
+ [Values("queue://ZMQTestQueue:", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]
+ string destination)
+ {
+ IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
+ Assert.IsNotNull(factory, "Error creating connection factory.");
+ using(IConnection connection = factory.CreateConnection())
+ {
+ Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");
+ using(ISession session = connection.CreateSession())
+ {
+ Assert.IsNotNull(session, "Error creating session.");
+ IDestination testDestination = session.GetDestination(destination);
+ Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
+ using(IMessageConsumer consumer = session.CreateConsumer(testDestination))
+ {
+ Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);
+ Assert.IsInstanceOf<MessageConsumer>(consumer, "Wrong consumer type.");
+ }
+ }
+ }
+ }
+
+ [Test]
+ public void TestSendReceive(
+ [Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]
+ string destination)
+ {
+ IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
+ Assert.IsNotNull(factory, "Error creating connection factory.");
+ using(IConnection connection = factory.CreateConnection())
+ {
+ Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");
+ using(ISession session = connection.CreateSession())
+ {
+ Assert.IsNotNull(session, "Error creating Session.");
+ IDestination testDestination = session.GetDestination(destination);
+ Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
+ using(IMessageConsumer consumer = session.CreateConsumer(testDestination))
+ {
+ Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);
+ consumer.Listener += OnMessage;
+ using(IMessageProducer producer = session.CreateProducer(testDestination))
+ {
+ Assert.IsNotNull(consumer, "Error creating producer on {0}", destination);
+ ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");
+ Assert.IsNotNull(testMsg, "Error creating test message.");
+ producer.Send(testMsg);
+ }
+
+ // Wait for the message
+ DateTime startWaitTime = DateTime.Now;
+ TimeSpan maxWaitTime = TimeSpan.FromSeconds(10);
+
+ while(!receivedTestMessage)
+ {
+ if((DateTime.Now - startWaitTime) > maxWaitTime)
+ {
+ Assert.Fail("Timeout waiting for message receive.");
+ }
+
+ Thread.Sleep(5);
+ }
+ }
+ }
}
}
@@ -133,13 +175,10 @@
/// <param name="message"></param>
private void OnMessage(IMessage message)
{
- Assert.IsInstanceOf<ITextMessage>(message, "Wrong message type received.");
+ Assert.IsInstanceOf<TextMessage>(message, "Wrong message type received.");
ITextMessage textMsg = (ITextMessage) message;
Assert.AreEqual(textMsg.Text, "Zero Message.");
receivedTestMessage = true;
}
}
}
-
-
-
diff --git a/vs2010-zmq-net-4.0-test.csproj b/vs2010-zmq-net-4.0-test.csproj
index 8494ee4..3a58166 100644
--- a/vs2010-zmq-net-4.0-test.csproj
+++ b/vs2010-zmq-net-4.0-test.csproj
@@ -48,6 +48,9 @@
</ItemGroup>
<ItemGroup>
<Compile Include="src\test\csharp\CommonAssemblyInfo.cs" />
+ <Compile Include="src\test\csharp\BaseTest.cs" />
+ <Compile Include="src\test\csharp\FactoryTests.cs" />
+ <Compile Include="src\test\csharp\MultiProducersMultiConsumers.cs" />
<Compile Include="src\test\csharp\ZMQTest.cs" />
</ItemGroup>
<ItemGroup>
diff --git a/vs2010-zmq-net-4.0.csproj b/vs2010-zmq-net-4.0.csproj
index bdfdc0d..f41958b 100644
--- a/vs2010-zmq-net-4.0.csproj
+++ b/vs2010-zmq-net-4.0.csproj
@@ -23,6 +23,8 @@
<RegisterForComInterop>false</RegisterForComInterop>
<PlatformTarget>AnyCPU</PlatformTarget>
<TreatWarningsAsErrors>false</TreatWarningsAsErrors>
+ <NoWarn>
+ </NoWarn>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
@@ -31,6 +33,8 @@
<DefineConstants>TRACE;NET</DefineConstants>
<AllowUnsafeBlocks>false</AllowUnsafeBlocks>
<DebugType>full</DebugType>
+ <NoWarn>
+ </NoWarn>
</PropertyGroup>
<ItemGroup>
<Reference Include="Apache.NMS, Version=1.5.0.2363, Culture=neutral, PublicKeyToken=82756feee3957618, processorArchitecture=MSIL">
@@ -69,6 +73,7 @@
<SubType>Code</SubType>
</Compile>
<Compile Include="src\main\csharp\TextMessage.cs" />
+ <Compile Include="src\main\csharp\Utils.cs" />
</ItemGroup>
<ItemGroup>
<Content Include="lib\clrzmq\net-4.0\libzmq.dll">