Refactored async delivery dispatching thread to be interruptable so that it can be cleanly stopped and started.
diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
index c477356..a64ba15 100644
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -32,6 +32,9 @@
private readonly AcknowledgementMode acknowledgementMode;
private MessageQueue messageQueue;
private event MessageListener listener;
+ private int listenerCount = 0;
+ private Thread asyncDeliveryThread = null;
+ private AutoResetEvent pause = new AutoResetEvent(false);
private AtomicBoolean asyncDelivery = new AtomicBoolean(false);
public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, MessageQueue messageQueue)
@@ -50,30 +53,86 @@
add
{
listener += value;
+ listenerCount++;
StartAsyncDelivery();
}
+
remove
{
- listener -= value;
+ if(listenerCount > 0)
+ {
+ listener -= value;
+ listenerCount--;
+ }
+
+ if(0 == listenerCount)
+ {
+ StopAsyncDelivery();
+ }
}
}
public IMessage Receive()
{
- Message message = messageQueue.Receive();
- return ToNmsMessage(message);
+ IMessage nmsMessage = null;
+
+ if(messageQueue != null)
+ {
+ Message message;
+
+ try
+ {
+ message = messageQueue.Receive(zeroTimeout);
+ }
+ catch
+ {
+ message = null;
+ }
+
+ if(null == message)
+ {
+ ReceiveCompletedEventHandler receiveMsg =
+ delegate(Object source, ReceiveCompletedEventArgs asyncResult) {
+ message = messageQueue.EndReceive(asyncResult.AsyncResult);
+ pause.Set();
+ };
+
+ messageQueue.ReceiveCompleted += receiveMsg;
+ messageQueue.BeginReceive();
+ pause.WaitOne();
+ messageQueue.ReceiveCompleted -= receiveMsg;
+ }
+
+ nmsMessage = ToNmsMessage(message);
+ }
+
+ return nmsMessage;
}
public IMessage Receive(TimeSpan timeout)
{
- Message message = messageQueue.Receive(timeout);
- return ToNmsMessage(message);
+ IMessage nmsMessage = null;
+
+ if(messageQueue != null)
+ {
+ Message message = messageQueue.Receive(timeout);
+ nmsMessage = ToNmsMessage(message);
+ }
+
+ return nmsMessage;
}
public IMessage ReceiveNoWait()
{
- Message message = messageQueue.Receive(zeroTimeout);
- return ToNmsMessage(message);
+ IMessage nmsMessage = null;
+
+ if(messageQueue != null)
+ {
+ Message message = messageQueue.Receive(zeroTimeout);
+ nmsMessage = ToNmsMessage(message);
+ }
+
+ return nmsMessage;
}
public void Dispose()
@@ -91,18 +150,34 @@
}
}
- public void StopAsyncDelivery()
+ protected virtual void StopAsyncDelivery()
{
- asyncDelivery.Value = false;
+ if(asyncDelivery.CompareAndSet(true, false))
+ {
+ if(null != asyncDeliveryThread)
+ {
+ Tracer.Info("Stopping async delivery thread.");
+ pause.Set();
+ if(!asyncDeliveryThread.Join(10000))
+ {
+ Tracer.Info("Aborting async delivery thread.");
+ asyncDeliveryThread.Abort();
+ }
+
+ asyncDeliveryThread = null;
+ Tracer.Info("Async delivery thread stopped.");
+ }
+ }
}
protected virtual void StartAsyncDelivery()
{
if(asyncDelivery.CompareAndSet(false, true))
{
- Thread thread = new Thread(new ThreadStart(DispatchLoop));
- thread.IsBackground = true;
- thread.Start();
+ asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
+ asyncDeliveryThread.Name = "Message Consumer Dispatch: " + messageQueue.QueueName;
+ asyncDeliveryThread.IsBackground = true;
+ asyncDeliveryThread.Start();
}
}
@@ -111,17 +186,29 @@
Tracer.Info("Starting dispatcher thread consumer: " + this);
while(asyncDelivery.Value)
{
- IMessage message = Receive();
- if(message != null)
+ try
{
- try
+ IMessage message = Receive();
+ if(asyncDelivery.Value && message != null)
{
- listener(message);
+ try
+ {
+ listener(message);
+ }
+ catch(Exception e)
+ {
+ HandleAsyncException(e);
+ }
}
- catch(Exception e)
- {
- HandleAsyncException(e);
- }
+ }
+ catch(ThreadAbortException ex)
+ {
+ Tracer.InfoFormat("Thread abort received in thread: {0} : {1}", this, ex.Message);
+ break;
+ }
+ catch(Exception ex)
+ {
+ Tracer.ErrorFormat("Exception while receiving message in thread: {0} : {1}", this, ex.Message);
}
}
Tracer.Info("Stopping dispatcher thread consumer: " + this);