| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.Threading; |
| using Apache.NMS.Util; |
| using Org.Apache.Qpid.Messaging; |
| |
| namespace Apache.NMS.Amqp |
| { |
| /// <summary> |
| /// An object capable of receiving messages from some destination |
| /// </summary> |
| public class MessageConsumer : IMessageConsumer |
| { |
| /// <summary> |
| /// Private object used for synchronization, instead of public "this" |
| /// </summary> |
| private readonly object myLock = new object(); |
| |
| protected TimeSpan zeroTimeout = new TimeSpan(0); |
| |
| private readonly Session session; |
| private readonly int id; |
| private readonly Destination destination; |
| private Destination replyToDestination; |
| private readonly AcknowledgementMode acknowledgementMode; |
| private event MessageListener listener; |
| private int listenerCount = 0; |
| private Thread asyncDeliveryThread = null; |
| private AutoResetEvent pause = new AutoResetEvent(false); |
| private Atomic<bool> asyncDelivery = new Atomic<bool>(false); |
| |
| private readonly Atomic<bool> started = new Atomic<bool>(false); |
| private Org.Apache.Qpid.Messaging.Receiver qpidReceiver = null; |
| |
| private ConsumerTransformerDelegate consumerTransformer; |
| public ConsumerTransformerDelegate ConsumerTransformer |
| { |
| get { return this.consumerTransformer; } |
| set { this.consumerTransformer = value; } |
| } |
| |
| public MessageConsumer(Session session, int consumerId, Destination dest, AcknowledgementMode acknowledgementMode) |
| { |
| this.session = session; |
| this.id = consumerId; |
| this.destination = dest; |
| this.acknowledgementMode = acknowledgementMode; |
| } |
| |
| #region IStartable Methods |
| public void Start() |
| { |
| // Don't try creating receiver if session not yet up |
| if (!session.IsStarted) |
| { |
| throw new SessionClosedException(); |
| } |
| |
| if (started.CompareAndSet(false, true)) |
| { |
| try |
| { |
| // Create qpid receiver |
| Tracer.DebugFormat("Start Consumer Id = " + ConsumerId.ToString()); |
| if (qpidReceiver == null) |
| { |
| qpidReceiver = session.CreateQpidReceiver(destination.Address); |
| // Recover replyTo address from qpid receiver and set as the |
| // replyTo destination for received messages. |
| Address replyTo = qpidReceiver.GetAddress(); |
| if (destination.IsQueue) |
| { |
| Queue queue = new Queue(replyTo.Name, replyTo.Subject, replyTo.Options); |
| replyToDestination = (Destination)queue; |
| } |
| else if (destination.IsTopic) |
| { |
| Topic topic = new Topic(replyTo.Name, replyTo.Subject, replyTo.Options); |
| replyToDestination = (Destination)topic; |
| } |
| } |
| } |
| catch (Org.Apache.Qpid.Messaging.QpidException e) |
| { |
| throw new NMSException("Failed to create Qpid Receiver : " + e.Message); |
| } |
| } |
| } |
| |
| public bool IsStarted |
| { |
| get { return started.Value; } |
| } |
| #endregion |
| |
| #region IStoppable Methods |
| public void Stop() |
| { |
| if (started.CompareAndSet(true, false)) |
| { |
| try |
| { |
| Tracer.DebugFormat("Stop Consumer Id = " + ConsumerId); |
| qpidReceiver.Close(); |
| qpidReceiver.Dispose(); |
| qpidReceiver = null; |
| } |
| catch (Org.Apache.Qpid.Messaging.QpidException e) |
| { |
| throw new NMSException("Failed to close consumer with Id " + ConsumerId.ToString() + " : " + e.Message); |
| } |
| } |
| } |
| #endregion |
| |
| public event MessageListener Listener |
| { |
| add |
| { |
| listener += value; |
| listenerCount++; |
| StartAsyncDelivery(); |
| } |
| |
| remove |
| { |
| if(listenerCount > 0) |
| { |
| listener -= value; |
| listenerCount--; |
| } |
| |
| if(0 == listenerCount) |
| { |
| StopAsyncDelivery(); |
| } |
| } |
| } |
| |
| |
| /// <summary> |
| /// Fetch a message from Qpid Receiver. |
| /// Will wait FOREVER. |
| /// </summary> |
| /// <returns>NMS message or null if Fetch fails</returns> |
| public IMessage Receive() |
| { |
| return ReceiveQpid(DurationConstants.FORVER); |
| } |
| |
| |
| /// <summary> |
| /// Fetch a message from Qpid Receiver |
| /// Will wait for given timespan before abandoning the Fetch. |
| /// </summary> |
| /// <param name="timeout"></param> |
| /// <returns>>NMS message or null if Fetch fails or times out</returns> |
| public IMessage Receive(TimeSpan timeout) |
| { |
| return ReceiveQpid(DefaultMessageConverter.ToQpidDuration(timeout)); |
| } |
| |
| |
| /// <summary> |
| /// Fetch a message from Qpid Receiver |
| /// Returns from the Fetch immediately. |
| /// </summary> |
| /// <returns>NMS message or null if none was pending</returns> |
| public IMessage ReceiveNoWait() |
| { |
| return ReceiveQpid(DurationConstants.IMMEDIATE); |
| } |
| |
| |
| |
| private IMessage ReceiveQpid(Org.Apache.Qpid.Messaging.Duration timeout) |
| { |
| IMessage nmsMessage = null; |
| |
| Message qpidMessage = new Message(); |
| if (qpidReceiver.Fetch(ref qpidMessage, timeout)) |
| { |
| nmsMessage = session.MessageConverter.ToNmsMessage(qpidMessage); |
| nmsMessage.NMSReplyTo = replyToDestination; |
| if (this.session.IsAutoAcknowledge) |
| { |
| this.session.Acknowledge(); |
| } |
| } |
| return nmsMessage; |
| } |
| |
| |
| public void Dispose() |
| { |
| Close(); |
| } |
| |
| public void Close() |
| { |
| StopAsyncDelivery(); |
| Stop(); |
| } |
| |
| protected virtual void StopAsyncDelivery() |
| { |
| if(asyncDelivery.CompareAndSet(true, false)) |
| { |
| if(null != asyncDeliveryThread) |
| { |
| Tracer.Info("Stopping async delivery thread."); |
| pause.Set(); |
| if(!asyncDeliveryThread.Join(10000)) |
| { |
| Tracer.Info("Aborting async delivery thread."); |
| asyncDeliveryThread.Abort(); |
| } |
| |
| asyncDeliveryThread = null; |
| Tracer.Info("Async delivery thread stopped."); |
| } |
| } |
| } |
| |
| protected virtual void StartAsyncDelivery() |
| { |
| if(asyncDelivery.CompareAndSet(false, true)) |
| { |
| asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop)); |
| asyncDeliveryThread.Name = "Message Consumer Dispatch: " + asyncDeliveryThread.ManagedThreadId.ToString(); |
| asyncDeliveryThread.IsBackground = true; |
| asyncDeliveryThread.Start(); |
| } |
| } |
| |
| protected virtual void DispatchLoop() |
| { |
| Tracer.Info("Starting dispatcher thread consumer: " + this); |
| while(asyncDelivery.Value) |
| { |
| try |
| { |
| IMessage message = Receive(); |
| if(asyncDelivery.Value && message != null) |
| { |
| try |
| { |
| listener(message); |
| } |
| catch(Exception e) |
| { |
| HandleAsyncException(e); |
| } |
| } |
| } |
| catch(ThreadAbortException ex) |
| { |
| Tracer.InfoFormat("Thread abort received in thread: {0} : {1}", this, ex.Message); |
| break; |
| } |
| catch(Exception ex) |
| { |
| Tracer.ErrorFormat("Exception while receiving message in thread: {0} : {1}", this, ex.Message); |
| } |
| } |
| Tracer.Info("Stopping dispatcher thread consumer: " + this); |
| } |
| |
| protected virtual void HandleAsyncException(Exception e) |
| { |
| session.Connection.HandleException(e); |
| } |
| |
| protected virtual IMessage ToNmsMessage(Message message) |
| { |
| if(message == null) |
| { |
| return null; |
| } |
| |
| IMessage converted = session.MessageConverter.ToNmsMessage(message); |
| |
| if(this.ConsumerTransformer != null) |
| { |
| IMessage newMessage = ConsumerTransformer(this.session, this, converted); |
| if(newMessage != null) |
| { |
| converted = newMessage; |
| } |
| } |
| |
| return converted; |
| } |
| |
| public int ConsumerId |
| { |
| get { return id; } |
| } |
| } |
| } |