| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.Threading; |
| using System.Collections.Generic; |
| using System.Collections.Specialized; |
| using Apache.NMS.ActiveMQ.Commands; |
| using Apache.NMS.ActiveMQ.Util; |
| using Apache.NMS.ActiveMQ.Threads; |
| using Apache.NMS.Util; |
| |
| namespace Apache.NMS.ActiveMQ |
| { |
/// <summary>
/// Acknowledgement codes; this file casts them to <c>byte</c> when building a MessageAck.
/// </summary>
public enum AckType
{
    /// <summary>Message delivered but not consumed.</summary>
    DeliveredAck = 0,

    /// <summary>Message could not be processed due to poison pill but discard anyway.</summary>
    PoisonAck = 1,

    /// <summary>Message consumed, discard.</summary>
    ConsumedAck = 2,

    /// <summary>Message has been Redelivered and is not yet poisoned.</summary>
    RedeliveredAck = 3,

    /// <summary>Only the given message is to be treated as consumed.</summary>
    IndividualAck = 4,

    /// <summary>Case where durable topic subscription does not match selector.</summary>
    UnmatchedAck = 5,

    /// <summary>Case where message has expired before being dispatched to a consumer.</summary>
    ExpiredAck = 6
}
| |
| /// <summary> |
| /// An object capable of receiving messages from some destination |
| /// </summary> |
| public class MessageConsumer : IMessageConsumer, IDispatcher |
| { |
// NOTE(review): presumably marks "no redelivery limit"; confirm where it is compared.
private const int NO_MAXIMUM_REDELIVERIES = -1;

// Collaborators: the readonly references without an initializer are assigned in the constructor.
private readonly MessageTransformation messageTransformation;
private readonly MessageDispatchChannel unconsumedMessages;
private readonly LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
private readonly ConsumerInfo info;
private readonly Session session;

// Ack that has been built but not yet sent to the broker.
private MessageAck pendingAck;

// Lifecycle / ack-in-flight flags.
private readonly Atomic<bool> started = new Atomic<bool>();
private readonly Atomic<bool> deliveringAcks = new Atomic<bool>();

// Delivery bookkeeping.
private int redeliveryTimeout = 500;
protected bool disposed;
private long lastDeliveredSequenceId = -1;
private int ackCounter;
private int deliveredCounter;
private int additionalWindowSize;
private long redeliveryDelay;
private int dispatchedCount;
private volatile bool synchronizationRegistered;

// Transport-interruption handling flags.
private bool clearDeliveredList;
private bool inProgressClearRequiredFlag;

// Optimized acknowledge settings.
private bool optimizeAcknowledge;
private DateTime optimizeAckTimestamp = DateTime.Now;
private long optimizeAcknowledgeTimeOut;
private long optimizedAckScheduledAckInterval;
private WaitCallback optimizedAckTask;

// Options copied from the owning connection in the constructor.
private long failoverRedeliveryWaitPeriod;
private bool transactedIndividualAck;
private bool nonBlockingRedelivery;

private Exception failureError;
private ThreadPoolExecutor executor;

// Backing delegate for the public Listener event.
private event MessageListener listener;

private IRedeliveryPolicy redeliveryPolicy;
private PreviouslyDeliveredMap previouslyDeliveredMessages;
| |
/// <summary>
/// Builds a consumer for the given session and destination, validates the
/// destination, and fills in the <see cref="ConsumerInfo"/> that describes
/// this consumer. Internal to prevent clients from creating an instance.
/// </summary>
/// <param name="session">Owning session; connection-level options are read from its Connection.</param>
/// <param name="id">Identifier stored in ConsumerInfo.ConsumerId.</param>
/// <param name="destination">Destination to consume from; must be non-null with a physical name.</param>
/// <param name="name">Stored in ConsumerInfo.SubscriptionName.</param>
/// <param name="selector">Stored in ConsumerInfo.Selector.</param>
/// <param name="prefetch">Stored in ConsumerInfo.PrefetchSize.</param>
/// <param name="maxPendingMessageCount">Stored in ConsumerInfo.MaximumPendingMessageLimit.</param>
/// <param name="noLocal">Stored in ConsumerInfo.NoLocal.</param>
/// <param name="browser">Stored in ConsumerInfo.Browser.</param>
/// <param name="dispatchAsync">Stored in ConsumerInfo.DispatchAsync.</param>
/// <exception cref="InvalidDestinationException">
/// The destination is null, has no physical name, or is a temporary destination
/// that belongs to another connection or has already been deleted.
/// </exception>
internal MessageConsumer(Session session, ConsumerId id, ActiveMQDestination destination,
                         String name, String selector, int prefetch, int maxPendingMessageCount,
                         bool noLocal, bool browser, bool dispatchAsync)
{
    if(destination == null)
    {
        throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
    }
    else if(destination.PhysicalName == null)
    {
        throw new InvalidDestinationException("The destination object was not given a physical name.");
    }
    else if(destination.IsTemporary)
    {
        String physicalName = destination.PhysicalName;

        if(String.IsNullOrEmpty(physicalName))
        {
            throw new InvalidDestinationException("Physical name of Destination should be valid: " + destination);
        }

        String connectionID = session.Connection.ConnectionId.Value;

        // The connection id is an opaque identifier, so it is matched ordinally
        // rather than with the current culture's linguistic rules.
        if(physicalName.IndexOf(connectionID, StringComparison.Ordinal) < 0)
        {
            throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
        }

        if(!session.Connection.IsTempDestinationActive(destination as ActiveMQTempDestination))
        {
            throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
        }
    }

    this.session = session;
    this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
    this.messageTransformation = this.session.Connection.MessageTransformation;

    // Pick the unconsumed-message channel according to connection priority support.
    if(session.Connection.MessagePrioritySupported)
    {
        this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
    }
    else
    {
        this.unconsumedMessages = new FifoMessageDispatchChannel();
    }

    this.info = new ConsumerInfo();
    this.info.ConsumerId = id;
    this.info.Destination = destination;
    this.info.SubscriptionName = name;
    this.info.Selector = selector;
    this.info.PrefetchSize = prefetch;
    this.info.MaximumPendingMessageLimit = maxPendingMessageCount;
    this.info.NoLocal = noLocal;
    this.info.Browser = browser;
    this.info.DispatchAsync = dispatchAsync;
    this.info.Retroactive = session.Retroactive;
    this.info.Exclusive = session.Exclusive;
    this.info.Priority = session.Priority;
    this.info.ClientId = session.Connection.ClientId;

    // If the destination contained a URI query, then use it to set public properties
    // on the ConsumerInfo
    if(destination.Options != null)
    {
        // Get options prefixed with "consumer.*"
        StringDictionary options = URISupport.GetProperties(destination.Options, "consumer.");
        // Extract out custom extension options "consumer.nms.*"
        StringDictionary customConsumerOptions = URISupport.ExtractProperties(options, "nms.");

        URISupport.SetProperties(this.info, options);
        URISupport.SetProperties(this, customConsumerOptions, "nms.");
    }

    // Optimized acks need the connection option, an auto-ack session and a non-browser consumer.
    this.optimizeAcknowledge = session.Connection.OptimizeAcknowledge &&
                               session.IsAutoAcknowledge && !this.info.Browser;

    if(this.optimizeAcknowledge)
    {
        this.optimizeAcknowledgeTimeOut = session.Connection.OptimizeAcknowledgeTimeOut;
        // Assigned through the property so its setter can schedule the periodic ack task.
        OptimizedAckScheduledAckInterval = session.Connection.OptimizedAckScheduledAckInterval;
    }

    this.info.OptimizedAcknowledge = this.optimizeAcknowledge;
    this.failoverRedeliveryWaitPeriod = session.Connection.ConsumerFailoverRedeliveryWaitPeriod;
    this.nonBlockingRedelivery = session.Connection.NonBlockingRedelivery;
    this.transactedIndividualAck = session.Connection.TransactedIndividualAck ||
                                   session.Connection.MessagePrioritySupported ||
                                   this.nonBlockingRedelivery;
}
| |
/// <summary>
/// Finalizer; forwards to <c>Dispose(false)</c>.
/// </summary>
~MessageConsumer()
{
    this.Dispose(false);
}
| |
| #region Property Accessors |
| |
/// <summary>
/// Gets the last delivered sequence id tracked by this consumer (the field starts at -1).
/// </summary>
public long LastDeliveredSequenceId
{
    get
    {
        return lastDeliveredSequenceId;
    }
}
| |
/// <summary>
/// Gets the id stored in this consumer's <see cref="ConsumerInfo"/>.
/// </summary>
public ConsumerId ConsumerId
{
    get
    {
        return info.ConsumerId;
    }
}
| |
/// <summary>
/// Gets the ConsumerInfo command object built by the constructor.
/// </summary>
public ConsumerInfo ConsumerInfo
{
    get
    {
        return info;
    }
}
| |
/// <summary>
/// Gets or sets the redelivery timeout; the backing field is initialised to 500.
/// </summary>
public int RedeliveryTimeout
{
    get
    {
        return this.redeliveryTimeout;
    }
    set
    {
        this.redeliveryTimeout = value;
    }
}
| |
/// <summary>
/// Gets the prefetch size held in this consumer's ConsumerInfo.
/// </summary>
public int PrefetchSize
{
    get
    {
        return info.PrefetchSize;
    }
}
| |
/// <summary>
/// Gets or sets the redelivery policy. The constructor seeds it from the
/// owning connection's RedeliveryPolicy.
/// </summary>
public IRedeliveryPolicy RedeliveryPolicy
{
    get
    {
        return redeliveryPolicy;
    }
    set
    {
        redeliveryPolicy = value;
    }
}
| |
/// <summary>
/// Gets the current Count of the unconsumed message channel.
/// </summary>
public long UnconsumedMessageCount
{
    get
    {
        return unconsumedMessages.Count;
    }
}
| |
// Custom Options
private bool ignoreExpiration;

/// <summary>
/// Gets or sets whether message expiration checks are skipped; Dispatch and
/// Dequeue only test IsExpired() when this is false.
/// </summary>
public bool IgnoreExpiration
{
    get
    {
        return this.ignoreExpiration;
    }
    set
    {
        this.ignoreExpiration = value;
    }
}
| |
/// <summary>
/// Gets or sets the recorded failure. When set, Dequeue rethrows it (wrapped by
/// NMSExceptionSupport.Create) instead of returning null for "no message".
/// </summary>
public Exception FailureError
{
    get
    {
        return failureError;
    }
    set
    {
        failureError = value;
    }
}
| |
/// <summary>
/// Gets or sets whether optimized acknowledgement is enabled. Switching it
/// from enabled to disabled first calls <c>DeliverAcks()</c>.
/// </summary>
public bool OptimizeAcknowledge
{
    get
    {
        return optimizeAcknowledge;
    }
    set
    {
        bool beingDisabled = optimizeAcknowledge && !value;

        if(beingDisabled)
        {
            DeliverAcks();
        }

        optimizeAcknowledge = value;
    }
}
| |
/// <summary>
/// Gets or sets the optimized-ack timeout. IsOptimizedAckTime treats it as a
/// millisecond offset from the last optimized ack, and ignores it when not positive.
/// </summary>
public long OptimizeAcknowledgeTimeOut
{
    get
    {
        return optimizeAcknowledgeTimeOut;
    }
    set
    {
        optimizeAcknowledgeTimeOut = value;
    }
}
| |
/// <summary>
/// Gets or sets the interval, in milliseconds, of the periodic optimized-ack task.
/// Setting it cancels any task already registered with the session scheduler and,
/// when optimized acks are enabled and the value is positive, schedules a new one.
/// </summary>
public long OptimizedAckScheduledAckInterval
{
    get
    {
        return optimizedAckScheduledAckInterval;
    }
    set
    {
        optimizedAckScheduledAckInterval = value;

        // Drop the previously scheduled task, if any.
        if(optimizedAckTask != null)
        {
            session.Scheduler.Cancel(optimizedAckTask);
            optimizedAckTask = null;
        }

        // Should we periodically send out all outstanding acks.
        bool schedule = optimizeAcknowledge && optimizedAckScheduledAckInterval > 0;
        if(!schedule)
        {
            return;
        }

        optimizedAckTask = new WaitCallback(DoOptimizedAck);
        TimeSpan period = TimeSpan.FromMilliseconds(optimizedAckScheduledAckInterval);
        session.Scheduler.ExecutePeriodically(optimizedAckTask, null, period);
    }
}
| |
/// <summary>
/// Gets or sets the failover redelivery wait period. The constructor seeds it
/// from the connection's ConsumerFailoverRedeliveryWaitPeriod.
/// </summary>
public long FailoverRedeliveryWaitPeriod
{
    get
    {
        return failoverRedeliveryWaitPeriod;
    }
    set
    {
        failoverRedeliveryWaitPeriod = value;
    }
}
| |
/// <summary>
/// Gets or sets whether transacted deliveries are acked individually and
/// immediately (see BeforeMessageIsConsumed) rather than through AckLater.
/// </summary>
public bool TransactedIndividualAck
{
    get
    {
        return transactedIndividualAck;
    }
    set
    {
        transactedIndividualAck = value;
    }
}
| |
/// <summary>
/// Gets or sets the non-blocking redelivery option. The constructor seeds it
/// from the connection's NonBlockingRedelivery.
/// </summary>
public bool NonBlockingRedelivery
{
    get
    {
        return nonBlockingRedelivery;
    }
    set
    {
        nonBlockingRedelivery = value;
    }
}
| |
| #endregion |
| |
| #region IMessageConsumer Members |
| |
private ConsumerTransformerDelegate consumerTransformer;

/// <summary>
/// Delegate invoked for every dispatched Message so that the client can apply
/// any needed transformation before the message is handed over.
/// </summary>
public ConsumerTransformerDelegate ConsumerTransformer
{
    get
    {
        return consumerTransformer;
    }
    set
    {
        consumerTransformer = value;
    }
}
| |
/// <summary>
/// Asynchronous message listener. Adding a handler pauses the session if it was
/// running, re-dispatches the messages already queued on this consumer, and then
/// resumes the session.
/// </summary>
/// <exception cref="NMSException">
/// Thrown when a handler is added to a consumer whose prefetch size is zero.
/// </exception>
public event MessageListener Listener
{
    add
    {
        CheckClosed();

        if(0 == PrefetchSize)
        {
            throw new NMSException("Cannot set Asynchronous Listener on a Consumer with a zero Prefetch size");
        }

        // Remember the running state so the session is only restarted if we stopped it.
        bool restartSession = session.Started;

        if(restartSession)
        {
            session.Stop();
        }

        this.listener += value;
        session.Redispatch(this, unconsumedMessages);

        if(restartSession)
        {
            session.Start();
        }
    }
    remove
    {
        this.listener -= value;
    }
}
| |
/// <summary>
/// Receives the next message, waiting without a time limit.
/// </summary>
/// <returns>The received message, or null when Dequeue yields no dispatch.</returns>
public IMessage Receive()
{
    CheckClosed();
    CheckMessageListener();

    SendPullRequest(0);

    TimeSpan waitForever = TimeSpan.FromMilliseconds(-1);
    MessageDispatch received = Dequeue(waitForever);

    if(received != null)
    {
        BeforeMessageIsConsumed(received);
        AfterMessageIsConsumed(received, false);

        return CreateActiveMQMessage(received);
    }

    return null;
}
| |
/// <summary>
/// Receives the next message, waiting at most <paramref name="timeout"/>.
/// With a zero prefetch size the local wait is unbounded and the timeout is
/// instead carried by the pull request.
/// </summary>
/// <param name="timeout">Maximum time to wait for a message.</param>
/// <returns>The received message, or null when Dequeue yields no dispatch.</returns>
public IMessage Receive(TimeSpan timeout)
{
    CheckClosed();
    CheckMessageListener();

    SendPullRequest((long) timeout.TotalMilliseconds);

    TimeSpan localWait = (PrefetchSize == 0) ? TimeSpan.FromMilliseconds(-1) : timeout;
    MessageDispatch received = Dequeue(localWait);

    if(received != null)
    {
        BeforeMessageIsConsumed(received);
        AfterMessageIsConsumed(received, false);

        return CreateActiveMQMessage(received);
    }

    return null;
}
| |
/// <summary>
/// Receives a message only if one is available. With a zero prefetch size a
/// pull request with timeout -1 is sent and the local wait is unbounded;
/// otherwise the local queue is polled with a zero timeout.
/// </summary>
/// <returns>The received message, or null when Dequeue yields no dispatch.</returns>
public IMessage ReceiveNoWait()
{
    CheckClosed();
    CheckMessageListener();

    SendPullRequest(-1);

    TimeSpan localWait = (PrefetchSize == 0) ? TimeSpan.FromMilliseconds(-1) : TimeSpan.Zero;
    MessageDispatch received = Dequeue(localWait);

    if(received != null)
    {
        BeforeMessageIsConsumed(received);
        AfterMessageIsConsumed(received, false);

        return CreateActiveMQMessage(received);
    }

    return null;
}
| |
/// <summary>
/// Disposes the consumer and suppresses finalization of this instance.
/// </summary>
public void Dispose()
{
    this.Dispose(true);
    GC.SuppressFinalize(this);
}
| |
/// <summary>
/// Closes the consumer once; later calls are no-ops. Any exception raised by
/// <c>Close()</c> is deliberately swallowed.
/// </summary>
/// <param name="disposing">Not consulted by this implementation.</param>
protected void Dispose(bool disposing)
{
    if(!disposed)
    {
        try
        {
            Close();
        }
        catch
        {
            // Ignore network errors.
        }

        disposed = true;
    }
}
| |
/// <summary>
/// Closes the consumer unless it is already closed. If delivered messages are
/// outstanding inside an active transaction, the close is deferred by
/// registering a ConsumerCloseSynchronization; otherwise it happens immediately.
/// </summary>
public virtual void Close()
{
    if(this.unconsumedMessages.Closed)
    {
        return;
    }

    bool deferToTransaction = this.deliveredMessages.Count != 0 &&
                              this.session.IsTransacted &&
                              this.session.TransactionContext.InTransaction;

    if(!deferToTransaction)
    {
        Tracer.DebugFormat("Consumer[{0}] No Active TX or pending acks, closing normally.",
                           this.info.ConsumerId);
        this.DoClose();
        return;
    }

    Tracer.DebugFormat("Consumer[{0}] Registering new ConsumerCloseSynchronization",
                       this.info.ConsumerId);
    this.session.TransactionContext.AddSynchronization(new ConsumerCloseSynchronization(this));
}
| |
/// <summary>
/// Shuts the consumer down locally and then sends a one-way RemoveInfo for it,
/// carrying the last delivered sequence id.
/// </summary>
internal void DoClose()
{
    Shutdown();

    RemoveInfo removal = new RemoveInfo();
    removal.ObjectId = this.ConsumerId;

    if(Tracer.IsDebugEnabled)
    {
        Tracer.DebugFormat("Remove of Consumer[{0}] of destination [{1}] sent last delivered Id[{2}].",
                           this.ConsumerId, this.info.Destination, this.lastDeliveredSequenceId);
    }

    removal.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
    this.session.Connection.Oneway(removal);
}
| |
/// <summary>
/// Invoked by the parent Session when it is closing: the Consumer tears itself
/// down locally but sends no remove command to the Broker, because closing the
/// parent removes its child resources at the broker.
/// </summary>
internal void Shutdown()
{
    if(this.unconsumedMessages.Closed)
    {
        return;
    }

    if(Tracer.IsDebugEnabled)
    {
        Tracer.DebugFormat("Shutdown of Consumer[{0}] started.", ConsumerId);
    }

    // Do we have any acks we need to send out before closing?
    // Ack any delivered messages now.
    if(!this.session.IsTransacted)
    {
        DeliverAcks();

        if(this.IsAutoAcknowledgeBatch)
        {
            Acknowledge();
        }
    }

    // Let queued async ack work finish (bounded to one minute) before going on.
    if(this.executor != null)
    {
        this.executor.Shutdown();
        this.executor.AwaitTermination(TimeSpan.FromMinutes(1));
        this.executor = null;
    }

    if(this.optimizedAckTask != null)
    {
        this.session.Scheduler.Cancel(this.optimizedAckTask);
    }

    if(this.session.IsClientAcknowledge && !this.info.Browser)
    {
        // rollback duplicates that aren't acknowledged
        LinkedList<MessageDispatch> snapshot = null;

        lock(this.deliveredMessages)
        {
            snapshot = new LinkedList<MessageDispatch>(this.deliveredMessages);
        }

        foreach(MessageDispatch delivered in snapshot)
        {
            this.session.Connection.RollbackDuplicate(this, delivered.Message);
        }

        snapshot.Clear();
    }

    if(!this.session.IsTransacted)
    {
        lock(this.deliveredMessages)
        {
            this.deliveredMessages.Clear();
        }
    }

    this.session.RemoveConsumer(this);
    this.unconsumedMessages.Close();

    MessageDispatch[] leftovers = this.unconsumedMessages.RemoveAll();

    if(!this.info.Browser)
    {
        foreach(MessageDispatch pending in leftovers)
        {
            // ensure we don't filter this as a duplicate
            this.session.Connection.RollbackDuplicate(this, pending.Message);
        }
    }

    if(Tracer.IsDebugEnabled)
    {
        Tracer.DebugFormat("Shutdown of Consumer[{0}] completed.", ConsumerId);
    }
}
| |
| #endregion |
| |
/// <summary>
/// Sends a one-way MessagePull to the broker, but only when the prefetch size
/// is zero and no unconsumed messages are queued locally.
/// </summary>
/// <param name="timeout">Value copied into the MessagePull's Timeout.</param>
protected void SendPullRequest(long timeout)
{
    bool pullNeeded = this.info.PrefetchSize == 0 && this.unconsumedMessages.Empty;

    if(!pullNeeded)
    {
        return;
    }

    MessagePull pull = new MessagePull();
    pull.ConsumerId = this.info.ConsumerId;
    pull.Destination = this.info.Destination;
    pull.Timeout = timeout;
    pull.ResponseRequired = false;

    Tracer.DebugFormat("Consumer[{0}] sending MessagePull: {1}", ConsumerId, pull);

    this.session.Connection.Oneway(pull);
}
| |
/// <summary>
/// Acknowledges one message: removes its dispatch from the delivered list and
/// sends an IndividualAck for it. If no delivered dispatch has the message's id,
/// only a debug trace is written.
/// </summary>
/// <param name="message">The message being acknowledged.</param>
protected void DoIndividualAcknowledge(ActiveMQMessage message)
{
    MessageDispatch match = null;

    lock(this.deliveredMessages)
    {
        foreach(MessageDispatch candidate in this.deliveredMessages)
        {
            if(!candidate.Message.MessageId.Equals(message.MessageId))
            {
                continue;
            }

            match = candidate;
            // Safe while enumerating: the loop is left before the enumerator advances.
            this.deliveredMessages.Remove(candidate);
            break;
        }
    }

    if(match != null)
    {
        MessageAck individualAck = new MessageAck(match, (byte) AckType.IndividualAck, 1);
        Tracer.DebugFormat("Consumer[{0}] sending Individual Ack for MessageId: {1}",
                           ConsumerId, individualAck.LastMessageId);
        this.session.SendAck(individualAck);
        return;
    }

    Tracer.DebugFormat("Consumer[{0}] attempt to Ack MessageId[{1}] failed " +
                       "because the original dispatch is not in the Dispatch List",
                       ConsumerId, message.MessageId);
}
| |
/// <summary>
/// Acknowledge handler that intentionally performs no work.
/// </summary>
/// <param name="message">Ignored.</param>
protected void DoNothingAcknowledge(ActiveMQMessage message)
{
    // Intentionally empty.
}
| |
/// <summary>
/// Client-acknowledge handler: after checking that the consumer is open,
/// delegates to the session's Acknowledge().
/// </summary>
/// <param name="message">Not used; the acknowledgement is done by the session.</param>
protected void DoClientAcknowledge(ActiveMQMessage message)
{
    CheckClosed();
    Tracer.DebugFormat("Consumer[{0}] sending Client Ack", ConsumerId);
    session.Acknowledge();
}
| |
/// <summary>
/// Marks the consumer started, starts the unconsumed message channel and wakes
/// the session executor. Does nothing once the channel is closed.
/// </summary>
public void Start()
{
    if(!unconsumedMessages.Closed)
    {
        started.Value = true;
        unconsumedMessages.Start();
        session.Executor.Wakeup();
    }
}
| |
/// <summary>
/// Marks the consumer stopped and stops the unconsumed message channel.
/// </summary>
public void Stop()
{
    started.Value = false;
    unconsumedMessages.Stop();
}
| |
/// <summary>
/// Sends any outstanding acknowledgement on the consumer's executor. Only one
/// delivery may be in flight: the call returns immediately when the
/// deliveringAcks flag is already set. When an ack is queued, the flag is
/// cleared later by AsyncDeliverAck.
/// </summary>
public void DeliverAcks()
{
    if(!this.deliveringAcks.CompareAndSet(false, true))
    {
        return;
    }

    MessageAck toSend = null;

    if(this.IsAutoAcknowledgeEach)
    {
        lock(this.deliveredMessages)
        {
            toSend = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);

            if(toSend == null)
            {
                // Nothing delivered; fall back to whatever ack is pending.
                toSend = this.pendingAck;
                this.pendingAck = null;
            }
            else
            {
                Tracer.DebugFormat("Consumer[{0}] DeliverAcks clearing the Dispatch list", ConsumerId);
                this.deliveredMessages.Clear();
                this.ackCounter = 0;
            }
        }
    }
    else if(this.pendingAck != null && this.pendingAck.AckType == (byte) AckType.ConsumedAck)
    {
        toSend = this.pendingAck;
        this.pendingAck = null;
    }

    if(toSend == null)
    {
        // Nothing to send, so release the in-flight flag right away.
        this.deliveringAcks.Value = false;
        return;
    }

    if(this.executor == null)
    {
        this.executor = new ThreadPoolExecutor();
    }

    this.executor.QueueUserWorkItem(AsyncDeliverAck, toSend);
}
| |
/// <summary>
/// Executor callback queued by DeliverAcks: sends the given ack through the
/// session and always clears the deliveringAcks flag afterwards. A failure is
/// logged together with its cause and not rethrown.
/// </summary>
/// <param name="ack">The MessageAck to send, passed as the work item state.</param>
private void AsyncDeliverAck(object ack)
{
    MessageAck pending = ack as MessageAck;
    try
    {
        this.session.SendAck(pending, true);
    }
    catch(Exception e)
    {
        // Include the exception so a failed ack can actually be diagnosed.
        Tracer.ErrorFormat("Consumer[{0}] Failed to deliver async Ack {1}: {2}",
                           this.info.ConsumerId, pending, e);
    }
    finally
    {
        this.deliveringAcks.Value = false;
    }
}
| |
/// <summary>
/// Flags that both the unconsumed channel and the delivered list must be
/// cleared; the work is done later by ClearMessagesInProgress and ClearDeliveredList.
/// </summary>
internal void InProgressClearRequired()
{
    this.inProgressClearRequiredFlag = true;
    // deal with delivered messages async to avoid lock contention with in progress acks
    this.clearDeliveredList = true;
}
| |
/// <summary>
/// If a clear was requested, drains the unconsumed channel under its lock, rolls
/// the drained messages back for redelivery (non-browser consumers only), tells
/// the connection that interruption processing is complete and signals the
/// channel. The delivered list is processed afterwards in every case.
/// </summary>
internal void ClearMessagesInProgress()
{
    if(this.inProgressClearRequiredFlag)
    {
        // Called from a thread in the ThreadPool, so we wait until we can
        // get a lock on the unconsumed list then we clear it.
        lock(this.unconsumedMessages.SyncRoot)
        {
            // Re-check under the lock: another thread may have done the work already.
            if(this.inProgressClearRequiredFlag)
            {
                Tracer.DebugFormat("Consumer[{0}] clearing unconsumed list ({1}) on transport interrupt",
                                   ConsumerId, this.unconsumedMessages.Count);

                // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
                MessageDispatch[] drained = this.unconsumedMessages.RemoveAll();

                if(!this.info.Browser)
                {
                    foreach(MessageDispatch pending in drained)
                    {
                        this.session.Connection.RollbackDuplicate(this, pending.Message);
                    }
                }

                // allow dispatch on this connection to resume
                this.session.Connection.TransportInterruptionProcessingComplete();
                this.inProgressClearRequiredFlag = false;

                // Wake up anyone blocked on the message channel.
                this.unconsumedMessages.Signal();
            }
        }
    }

    ClearDeliveredList();
}
| |
/// <summary>
/// Handles the delivered list once a clear has been requested. Transacted
/// sessions keep the list and record every delivered message id in
/// previouslyDeliveredMessages (value false). Other sessions clear the list and
/// the pending ack, rolling duplicates back first for client-ack, non-browser consumers.
/// </summary>
private void ClearDeliveredList()
{
    if(!this.clearDeliveredList)
    {
        return;
    }

    lock(this.deliveredMessages)
    {
        // Re-check under the lock; nothing to do for an empty list.
        if(!this.clearDeliveredList || this.deliveredMessages.Count == 0)
        {
            this.clearDeliveredList = false;
            return;
        }

        if(this.session.IsTransacted)
        {
            Tracer.DebugFormat("Consumer[{0}]: tracking existing transacted delivered list {1} on transport interrupt",
                               this.info.ConsumerId, this.deliveredMessages.Count);

            if(this.previouslyDeliveredMessages == null)
            {
                this.previouslyDeliveredMessages =
                    new PreviouslyDeliveredMap(this.session.TransactionContext.TransactionId);
            }

            foreach(MessageDispatch tracked in this.deliveredMessages)
            {
                this.previouslyDeliveredMessages[tracked.Message.MessageId] = false;
            }
        }
        else
        {
            if(this.session.IsClientAcknowledge)
            {
                Tracer.DebugFormat("Consumer[{0}] rolling back delivered list " +
                                   "({1}) on transport interrupt",
                                   ConsumerId, this.deliveredMessages.Count);

                // allow redelivery
                if(!this.info.Browser)
                {
                    foreach(MessageDispatch rolledBack in this.deliveredMessages)
                    {
                        this.session.Connection.RollbackDuplicate(this, rolledBack.Message);
                    }
                }
            }

            Tracer.DebugFormat("Consumer[{0}]: clearing delivered list {1} on transport interrupt",
                               this.info.ConsumerId, this.deliveredMessages.Count);
            this.deliveredMessages.Clear();
            this.pendingAck = null;
        }

        this.clearDeliveredList = false;
    }
}
| |
/// <summary>
/// Handles a message dispatched to this consumer. Under the unconsumed channel
/// lock the dispatch is either marked for direct delivery to the listener,
/// queued for a later receive, poison-acked, or handled as a duplicate. Listener
/// delivery itself runs after the lock is released. Any exception escaping the
/// method body is reported through the connection's OnSessionException.
/// </summary>
/// <param name="dispatch">The dispatch received for this consumer.</param>
public virtual void Dispatch(MessageDispatch dispatch)
{
    // Snapshot the delegate so a concurrent remove cannot null it mid-dispatch.
    MessageListener listener = this.listener;
    bool dispatchMessage = false;

    try
    {
        ClearMessagesInProgress();
        ClearDeliveredList();

        lock(this.unconsumedMessages.SyncRoot)
        {
            if(!this.unconsumedMessages.Closed)
            {
                if(this.info.Browser || !session.Connection.IsDuplicate(this, dispatch.Message))
                {
                    if(listener != null && this.unconsumedMessages.Running)
                    {
                        if(RedeliveryExceeded(dispatch))
                        {
                            PosionAck(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery policy limit:" + redeliveryPolicy.MaximumRedeliveries);
                            return;
                        }
                        else
                        {
                            dispatchMessage = true;
                        }
                    }
                    else
                    {
                        if(!this.unconsumedMessages.Running)
                        {
                            // delayed redelivery, ensure it can be re delivered
                            session.Connection.RollbackDuplicate(this, dispatch.Message);
                        }
                        this.unconsumedMessages.Enqueue(dispatch);
                        // TODO - Signal message available when we have that event hook.
                    }
                }
                else
                {
                    // deal with duplicate delivery
                    ConsumerId consumerWithPendingTransaction;
                    if(RedeliveryExpectedInCurrentTransaction(dispatch, true))
                    {
                        Tracer.DebugFormat("Consumer[{0}] tracking transacted({1}) redelivery [{2}]",
                                           ConsumerId, previouslyDeliveredMessages.TransactionId, dispatch.Message);
                        if(TransactedIndividualAck)
                        {
                            ImmediateIndividualTransactedAck(dispatch);
                        }
                        else
                        {
                            this.session.SendAck(new MessageAck(dispatch, (byte) AckType.DeliveredAck, 1));
                        }
                    }
                    else if((consumerWithPendingTransaction = RedeliveryPendingInCompetingTransaction(dispatch)) != null)
                    {
                        // {2} is the competing consumer; the message itself is {1}.
                        Tracer.WarnFormat("Consumer[{0}] delivering duplicate [{1}], pending transaction completion on ({2}) will rollback",
                                          ConsumerId, dispatch.Message, consumerWithPendingTransaction);
                        this.session.Connection.RollbackDuplicate(this, dispatch.Message);
                        // Re-enter now that the duplicate marker has been rolled back.
                        Dispatch(dispatch);
                    }
                    else
                    {
                        Tracer.WarnFormat("Consumer[{0}] suppressing duplicate delivery on connection, poison acking: ({1})",
                                          ConsumerId, dispatch);
                        PosionAck(dispatch, "Suppressing duplicate delivery on connection, consumer " + ConsumerId);
                    }
                }
            }
        }

        if(dispatchMessage)
        {
            ActiveMQMessage message = CreateActiveMQMessage(dispatch);

            this.BeforeMessageIsConsumed(dispatch);

            try
            {
                bool expired = (!IgnoreExpiration && message.IsExpired());

                if(!expired)
                {
                    listener(message);
                }

                this.AfterMessageIsConsumed(dispatch, expired);
            }
            catch(Exception e)
            {
                if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || IsIndividualAcknowledge)
                {
                    // Schedule redelivery and possible dlq processing
                    dispatch.RollbackCause = e;
                    Rollback();
                }
                else
                {
                    // Transacted or Client ack: Deliver the next message.
                    this.AfterMessageIsConsumed(dispatch, false);
                }

                Tracer.ErrorFormat("Consumer[{0}] Exception while processing message: {1}", this.info.ConsumerId, e);

                // If aborted we stop the abort here and let normal processing resume.
                // This allows the session to shutdown normally and ack all messages
                // that have outstanding acks in this consumer.
                if((Thread.CurrentThread.ThreadState & ThreadState.AbortRequested) == ThreadState.AbortRequested)
                {
                    Thread.ResetAbort();
                }
            }
        }

        // Yield briefly after every 1000 dispatches handled by this consumer.
        if(++dispatchedCount % 1000 == 0)
        {
            dispatchedCount = 0;
            Thread.Sleep(1);
        }
    }
    catch(Exception e)
    {
        this.session.Connection.OnSessionException(this.session, e);
    }
}
| |
/// <summary>
/// Reports whether the dispatch's message id is tracked in
/// previouslyDeliveredMessages. Always false for non-transacted sessions.
/// </summary>
/// <param name="dispatch">The dispatch whose message id is looked up.</param>
/// <param name="markReceipt">When true, a tracked entry is set to true.</param>
/// <returns>True when the message id is tracked.</returns>
private bool RedeliveryExpectedInCurrentTransaction(MessageDispatch dispatch, bool markReceipt)
{
    if(!session.IsTransacted)
    {
        return false;
    }

    lock(this.deliveredMessages)
    {
        bool tracked = previouslyDeliveredMessages != null &&
                       previouslyDeliveredMessages.ContainsKey(dispatch.Message.MessageId);

        if(!tracked)
        {
            return false;
        }

        if(markReceipt)
        {
            previouslyDeliveredMessages[dispatch.Message.MessageId] = true;
        }

        return true;
    }
}
| |
/// <summary>
/// Searches every consumer of every session on this connection for one that
/// tracks the dispatch's message as previously delivered.
/// </summary>
/// <param name="dispatch">The dispatch to look for.</param>
/// <returns>The id of the first such consumer, or null when there is none.</returns>
private ConsumerId RedeliveryPendingInCompetingTransaction(MessageDispatch dispatch)
{
    foreach(Session candidateSession in this.session.Connection.Sessions)
    {
        foreach(MessageConsumer candidate in candidateSession.Consumers)
        {
            // markReceipt is false: this is a query only.
            bool tracked = candidate.RedeliveryExpectedInCurrentTransaction(dispatch, false);

            if(tracked)
            {
                return candidate.ConsumerId;
            }
        }
    }

    return null;
}
| |
/// <summary>
/// Delivers at most one queued message to the listener.
/// </summary>
/// <returns>
/// True when a message was dequeued and dispatched; false when there is no
/// listener or nothing is queued.
/// </returns>
public bool Iterate()
{
    if(this.listener == null)
    {
        return false;
    }

    MessageDispatch next = this.unconsumedMessages.DequeueNoWait();

    if(next == null)
    {
        return false;
    }

    this.Dispatch(next);
    return true;
}
| |
/// <summary>
/// Used to get an enqueued message from the unconsumedMessages list. The
/// amount of time this method blocks is based on the timeout value. If
/// timeout == Timeout.Infinite then it blocks until a message is received.
/// If timeout == 0 then it tries not to block at all and returns a message
/// only if one is available. If timeout > 0 then it blocks for up to that
/// amount of time. Expired messages are consumed (acked as expired) by this
/// method and never returned, and messages that exceed the redelivery policy
/// are poison-acked and skipped.
/// </summary>
/// <param name="timeout">
/// A <see cref="System.TimeSpan"/> giving the maximum time to wait.
/// </param>
/// <returns>
/// A <see cref="MessageDispatch"/>, or null when no message is obtained or the
/// dequeued dispatch carries no message.
/// </returns>
/// <exception cref="System.Exception">
/// When nothing is available and a failure error was recorded, that error is
/// rethrown wrapped by NMSExceptionSupport.Create.
/// </exception>
private MessageDispatch Dequeue(TimeSpan timeout)
{
    // UTC is used so a DST or time-zone shift cannot stretch or shrink the wait.
    DateTime deadline = DateTime.UtcNow;

    if(timeout > TimeSpan.Zero)
    {
        deadline += timeout;
    }

    while(true)
    {
        MessageDispatch dispatch = this.unconsumedMessages.Dequeue(timeout);

        // Grab a single date/time for calculations to avoid timing errors.
        DateTime dispatchTime = DateTime.UtcNow;

        if(dispatch == null)
        {
            if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
            {
                if(dispatchTime > deadline)
                {
                    // Out of time.
                    timeout = TimeSpan.Zero;
                }
                else
                {
                    // Adjust the timeout to the remaining time.
                    timeout = deadline - dispatchTime;
                }
            }
            else
            {
                // Informs the caller of an error in the event that an async exception
                // took down the parent connection.
                if(this.failureError != null)
                {
                    throw NMSExceptionSupport.Create(this.failureError);
                }

                return null;
            }
        }
        else if(dispatch.Message == null)
        {
            return null;
        }
        else if(!IgnoreExpiration && dispatch.Message.IsExpired())
        {
            Tracer.DebugFormat("Consumer[{0}] received expired message: {1}",
                               ConsumerId, dispatch.Message.MessageId);

            BeforeMessageIsConsumed(dispatch);
            AfterMessageIsConsumed(dispatch, true);
            // Refresh the dispatch time
            dispatchTime = DateTime.UtcNow;

            if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
            {
                if(dispatchTime > deadline)
                {
                    // Out of time.
                    timeout = TimeSpan.Zero;
                }
                else
                {
                    // Adjust the timeout to the remaining time.
                    timeout = deadline - dispatchTime;
                }
            }
        }
        else if(RedeliveryExceeded(dispatch))
        {
            Tracer.DebugFormat("Consumer[{0}] received with excessive redelivered: {1}",
                               ConsumerId, dispatch);
            PosionAck(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery " +
                                "policy limit:" + redeliveryPolicy.MaximumRedeliveries);
        }
        else
        {
            return dispatch;
        }
    }
}
| |
/// <summary>
/// Bookkeeping done before a message is handed to the application: stamps the
/// delivery sequence id, records the broker sequence id and, unless the
/// consumer is in auto-acknowledge-batch mode, pushes the dispatch onto the
/// front of the delivered list. In a transacted session the dispatch is then
/// acked individually or queued through AckLater as a DeliveredAck.
/// </summary>
/// <param name="dispatch">The dispatch about to be consumed.</param>
public virtual void BeforeMessageIsConsumed(MessageDispatch dispatch)
{
    dispatch.DeliverySequenceId = this.session.NextDeliveryId;
    this.lastDeliveredSequenceId = dispatch.Message.MessageId.BrokerSequenceId;

    if(IsAutoAcknowledgeBatch)
    {
        return;
    }

    lock(this.deliveredMessages)
    {
        this.deliveredMessages.AddFirst(dispatch);
    }

    if(!this.session.IsTransacted)
    {
        return;
    }

    if(this.transactedIndividualAck)
    {
        ImmediateIndividualTransactedAck(dispatch);
    }
    else
    {
        AckLater(dispatch, AckType.DeliveredAck);
    }
}
| |
/// <summary>
/// Decides whether an optimized ack should be sent now: either the combined
/// ack and delivered counters have reached 65% of the prefetch size, or a
/// positive optimize-ack timeout has elapsed since the last ack timestamp.
/// </summary>
/// <returns>True when an optimized ack is due.</returns>
private bool IsOptimizedAckTime()
{
    // evaluate both expired and normal msgs as otherwise consumer may get stalled
    bool windowReached = ackCounter + deliveredCounter >= (this.info.PrefetchSize * .65);

    if(windowReached)
    {
        return true;
    }

    if(optimizeAcknowledgeTimeOut <= 0)
    {
        return false;
    }

    DateTime due = optimizeAckTimestamp + TimeSpan.FromMilliseconds(optimizeAcknowledgeTimeOut);
    return DateTime.Now >= due;
}
| |
/// <summary>
/// Acknowledgement handling performed after a message has been processed.
/// Expired messages are acked as ExpiredAck. Otherwise the action depends on
/// the session mode: nothing for transacted sessions, an immediate (or
/// optimized, batched) ConsumedAck for auto-acknowledge-each, a deferred
/// ConsumedAck for auto-acknowledge-batch, and a deferred DeliveredAck for
/// client or individual acknowledge. Does nothing once the consumer is closed.
/// </summary>
/// <param name="dispatch">The dispatch that was just processed.</param>
/// <param name="expired">True when the message was found to be expired.</param>
/// <exception cref="NMSException">The session is in none of the known ack modes.</exception>
public virtual void AfterMessageIsConsumed(MessageDispatch dispatch, bool expired)
{
    if(this.unconsumedMessages.Closed)
    {
        return;
    }

    if(expired)
    {
        Acknowledge(dispatch, AckType.ExpiredAck);
    }
    else
    {
        if(this.session.IsTransacted)
        {
            // Do nothing.
        }
        else if(this.IsAutoAcknowledgeEach)
        {
            if(this.deliveringAcks.CompareAndSet(false, true))
            {
                try
                {
                    lock(this.deliveredMessages)
                    {
                        if(this.deliveredMessages.Count != 0)
                        {
                            if(this.optimizeAcknowledge)
                            {
                                this.ackCounter++;

                                if(IsOptimizedAckTime())
                                {
                                    MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
                                    if(ack != null)
                                    {
                                        this.deliveredMessages.Clear();
                                        this.ackCounter = 0;
                                        this.session.SendAck(ack);
                                        this.optimizeAckTimestamp = DateTime.Now;
                                    }
                                    // As a further optimization, send the ack for expired msgs when
                                    // there are any. This resets the deliveredCounter to 0 so
                                    // that we won't send standard acks with every msg just
                                    // because the deliveredCounter is just below 0.5 * prefetch
                                    // as used in ackLater()
                                    if(this.pendingAck != null && this.deliveredCounter > 0)
                                    {
                                        this.session.SendAck(pendingAck);
                                        this.pendingAck = null;
                                        this.deliveredCounter = 0;
                                    }
                                }
                            }
                            else
                            {
                                MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
                                if(ack != null)
                                {
                                    this.deliveredMessages.Clear();
                                    this.session.SendAck(ack);
                                }
                            }
                        }
                    }
                }
                finally
                {
                    // Always release the in-flight flag; if SendAck throws and the flag
                    // stays set, every later ack delivery on this consumer is skipped.
                    this.deliveringAcks.Value = false;
                }
            }
        }
        else if(this.IsAutoAcknowledgeBatch)
        {
            AckLater(dispatch, AckType.ConsumedAck);
        }
        else if(IsClientAcknowledge || IsIndividualAcknowledge)
        {
            bool messageAckedByConsumer = false;

            lock(this.deliveredMessages)
            {
                messageAckedByConsumer = this.deliveredMessages.Contains(dispatch);
            }

            if(messageAckedByConsumer)
            {
                AckLater(dispatch, AckType.DeliveredAck);
            }
        }
        else
        {
            throw new NMSException("Invalid session state.");
        }
    }
}
| |
/// <summary>
/// Builds one acknowledgement that spans every entry currently held in deliveredMessages.
/// </summary>
/// <param name="type">The acknowledgement type to stamp on the ack.</param>
/// <returns>
/// An ack created from the first list entry, carrying the list size as its message count
/// and the last list entry's message id as FirstMessageId; null when the list is empty.
/// </returns>
private MessageAck MakeAckForAllDeliveredMessages(AckType type)
{
    lock(this.deliveredMessages)
    {
        if(this.deliveredMessages.Count != 0)
        {
            MessageDispatch head = this.deliveredMessages.First.Value;
            MessageAck rangeAck = new MessageAck(head, (byte) type, this.deliveredMessages.Count);
            rangeAck.FirstMessageId = this.deliveredMessages.Last.Value.Message.MessageId;
            return rangeAck;
        }

        return null;
    }
}
| |
/// <summary>
/// Records the dispatch in the pending acknowledgement instead of acknowledging it right
/// away, and flushes the pending ack once half of the prefetch window has been used.
/// </summary>
/// <param name="dispatch">The dispatch to fold into the pending ack.</param>
/// <param name="type">The acknowledgement type for the new pending ack.</param>
private void AckLater(MessageDispatch dispatch, AckType type)
{
    // No acknowledgement is sent yet, but the broker may still need to learn that the
    // consumer received the message so the pre-fetch window can be expanded.
    if(this.session.IsTransacted)
    {
        RegisterSync();
    }

    this.deliveredCounter++;

    MessageAck supersededAck = pendingAck;
    pendingAck = new MessageAck(dispatch, (byte) type, deliveredCounter);

    if(this.session.IsTransacted && this.session.TransactionContext.InTransaction)
    {
        pendingAck.TransactionId = this.session.TransactionContext.TransactionId;
    }

    if(supersededAck == null)
    {
        // First pending ack: the range starts and ends at this message.
        pendingAck.FirstMessageId = pendingAck.LastMessageId;
    }
    else if(supersededAck.AckType == pendingAck.AckType)
    {
        // Same kind of ack: extend the existing range.
        pendingAck.FirstMessageId = supersededAck.FirstMessageId;
    }
    else if(supersededAck.AckType == (byte) AckType.DeliveredAck)
    {
        // A superseded delivered ack is not important and can be discarded.
        Tracer.DebugFormat("Consumer[{0}] dropping old pending ack {1}, new pending: {2}",
                           ConsumerId, supersededAck, pendingAck);
    }
    else
    {
        // Any other superseded ack is important, so send it now rather than lose it.
        Tracer.DebugFormat("Consumer[{0}] Sending old pending ack {1}, new pending: {2}",
                           ConsumerId, supersededAck, pendingAck);

        this.session.SendAck(supersededAck);
    }

    // Count expired and normal messages together, otherwise the consumer may get stalled.
    if ((this.deliveredCounter + this.ackCounter - this.additionalWindowSize) >= (0.5 * this.info.PrefetchSize))
    {
        this.session.SendAck(pendingAck);
        this.pendingAck = null;
        this.deliveredCounter = 0;
        this.additionalWindowSize = 0;
    }
}
| |
/// <summary>
/// Sends an individual acknowledgement for one dispatch as a synchronous request on the
/// connection, tagged with the session's current transaction id.
/// </summary>
/// <param name="dispatch">The dispatch to acknowledge.</param>
private void ImmediateIndividualTransactedAck(MessageDispatch dispatch)
{
    // Acks accumulate on the broker pending transaction completion to indicate
    // delivery status.
    RegisterSync();

    MessageAck individualAck = new MessageAck(dispatch, (byte) AckType.IndividualAck, 1);
    individualAck.TransactionId = this.session.TransactionContext.TransactionId;

    this.session.Connection.SyncRequest(individualAck);
}
| |
/// <summary>
/// Sends a poison acknowledgement for a single dispatch through the session.
/// </summary>
/// <param name="dispatch">The dispatch being rejected.</param>
/// <param name="cause">Text stored as the message of the poison cause.</param>
private void PosionAck(MessageDispatch dispatch, string cause)
{
    MessageAck ack = new MessageAck(dispatch, (byte) AckType.PoisonAck, 1);
    ack.FirstMessageId = dispatch.Message.MessageId;

    BrokerError reason = new BrokerError();
    reason.ExceptionClass = "javax.jms.JMSException";
    reason.Message = cause;
    ack.PoisonCause = reason;

    this.session.SendAck(ack);
}
| |
/// <summary>
/// For a transacted session, starts the transaction via the session and registers a
/// MessageConsumerSynchronization with the transaction context unless one is already
/// registered. Does nothing for a non-transacted session.
/// </summary>
private void RegisterSync()
{
    if(!this.session.IsTransacted)
    {
        return;
    }

    this.session.DoStartTransaction();

    if(synchronizationRegistered)
    {
        return;
    }

    Tracer.DebugFormat("Consumer[{0}] Registering new MessageConsumerSynchronization", ConsumerId);
    this.synchronizationRegistered = true;
    this.session.TransactionContext.AddSynchronization(new MessageConsumerSynchronization(this));
}
| |
/// <summary>
/// Sends an acknowledgement of the given type for one dispatch, then removes that
/// dispatch from deliveredMessages.
/// </summary>
/// <param name="dispatch">The dispatch to acknowledge.</param>
/// <param name="ackType">The acknowledgement type to send.</param>
private void Acknowledge(MessageDispatch dispatch, AckType ackType)
{
    MessageAck singleAck = new MessageAck(dispatch, (byte) ackType, 1);
    this.session.SendAck(singleAck);

    lock(this.deliveredMessages)
    {
        this.deliveredMessages.Remove(dispatch);
    }
}
| |
/// <summary>
/// Acknowledges every message currently in deliveredMessages as consumed. In a
/// transacted session the ack is attached to the session's transaction and the
/// delivered list is kept; otherwise the list is cleared after sending.
/// </summary>
internal void Acknowledge()
{
    ClearDeliveredList();
    WaitForRedeliveries();

    lock(this.deliveredMessages)
    {
        // One ack covering everything delivered so far; null means nothing to do.
        MessageAck consumedAck = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
        if(consumedAck == null)
        {
            return;
        }

        if(this.session.IsTransacted)
        {
            RollbackOnFailedRecoveryRedelivery();

            if (!this.session.TransactionContext.InTransaction)
            {
                this.session.DoStartTransaction();
            }

            consumedAck.TransactionId = this.session.TransactionContext.TransactionId;
        }

        this.session.SendAck(consumedAck);
        this.pendingAck = null;

        // Shrink the counters by the number of acknowledged messages, never below zero.
        int ackedCount = this.deliveredMessages.Count;
        this.deliveredCounter = Math.Max(0, this.deliveredCounter - ackedCount);
        this.additionalWindowSize = Math.Max(0, this.additionalWindowSize - ackedCount);

        if(!this.session.IsTransacted)
        {
            this.deliveredMessages.Clear();
        }
    }
}
| |
/// <summary>
/// Discards the delivered-message bookkeeping and the previously-delivered tracker,
/// then resets the redelivery delay to zero.
/// </summary>
internal void Commit()
{
    lock(this.deliveredMessages)
    {
        this.deliveredMessages.Clear();

        // The tracker is only touched while the deliveredMessages lock is held.
        ClearPreviouslyDelivered();
    }

    this.redeliveryDelay = 0;
}
| |
/// <summary>
/// Rolls back the messages held in deliveredMessages. Depending on the redelivery
/// policy the messages are either poison-acked through the session, scheduled for
/// non-blocking redelivery, or pushed back to the front of unconsumedMessages
/// (optionally after a delay) so they can be consumed again.
/// </summary>
internal void Rollback()
{
    ClearDeliveredList();
    lock(this.unconsumedMessages.SyncRoot)
    {
        if (this.optimizeAcknowledge)
        {
            // remove messages read but not acked at the broker yet through optimizeAcknowledge
            if (!this.info.Browser)
            {
                lock(this.deliveredMessages)
                {
                    // Remove up to ackCounter entries from the tail. The removal count is
                    // tracked separately from the list size: comparing a growing index
                    // against a shrinking Count would stop after about half of them.
                    int removed = 0;
                    while (removed < ackCounter && this.deliveredMessages.Count > 0)
                    {
                        // ensure we don't filter this as a duplicate
                        MessageDispatch dispatch = this.deliveredMessages.Last.Value;
                        this.deliveredMessages.RemoveLast();
                        session.Connection.RollbackDuplicate(this, dispatch.Message);
                        removed++;
                    }
                }
            }
        }
        lock(this.deliveredMessages)
        {
            RollbackPreviouslyDeliveredAndNotRedelivered();
            if(this.deliveredMessages.Count == 0)
            {
                Tracer.DebugFormat("Consumer[{0}] Rolled Back with no dispatched Messages", ConsumerId);
                return;
            }

            // Only increase the redelivery delay after the first redelivery..
            // NOTE(review): lastMd is read from First and firstMsgId from Last, which
            // suggests the list is ordered newest-first; confirm where it is populated.
            MessageDispatch lastMd = this.deliveredMessages.First.Value;
            int currentRedeliveryCount = lastMd.Message.RedeliveryCounter;

            redeliveryDelay = this.redeliveryPolicy.RedeliveryDelay(currentRedeliveryCount);

            MessageId firstMsgId = this.deliveredMessages.Last.Value.Message.MessageId;

            foreach(MessageDispatch dispatch in this.deliveredMessages)
            {
                // Allow the message to update its internal state to reflect a Rollback.
                dispatch.Message.OnMessageRollback();
                // ensure we don't filter this as a duplicate
                session.Connection.RollbackDuplicate(this, dispatch.Message);
            }

            if(this.redeliveryPolicy.MaximumRedeliveries >= 0 &&
               lastMd.Message.RedeliveryCounter > this.redeliveryPolicy.MaximumRedeliveries)
            {
                // We need to NACK the messages so that they get sent to the DLQ.
                MessageAck ack = new MessageAck(lastMd, (byte) AckType.PoisonAck, deliveredMessages.Count);

                Tracer.DebugFormat("Consumer[{0}] Poison Ack of {1} messages aft max redeliveries: {2}",
                                   ConsumerId, this.deliveredMessages.Count, this.redeliveryPolicy.MaximumRedeliveries);

                BrokerError poisonCause = new BrokerError();
                poisonCause.ExceptionClass = "javax.jms.JMSException";
                poisonCause.Message = "Exceeded RedeliveryPolicy limit: " + RedeliveryPolicy.MaximumRedeliveries;

                // Chain the recorded rollback cause, when there is one, under the poison cause.
                if (lastMd.RollbackCause != null)
                {
                    BrokerError cause = new BrokerError();
                    cause.ExceptionClass = "javax.jms.JMSException";
                    cause.Message = lastMd.RollbackCause.Message;
                    poisonCause.Cause = cause;
                }
                ack.FirstMessageId = firstMsgId;
                ack.PoisonCause = poisonCause;

                this.session.SendAck(ack);

                // Adjust the window size.
                additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.deliveredMessages.Count);

                this.redeliveryDelay = 0;
                this.deliveredCounter -= this.deliveredMessages.Count;
                this.deliveredMessages.Clear();
            }
            else
            {
                // We only send a RedeliveryAck after the first redelivery
                if(currentRedeliveryCount > 0)
                {
                    MessageAck ack = new MessageAck(lastMd, (byte) AckType.RedeliveredAck, deliveredMessages.Count);
                    ack.FirstMessageId = firstMsgId;
                    this.session.SendAck(ack, true);
                }

                if (this.nonBlockingRedelivery)
                {
                    if(redeliveryDelay == 0)
                    {
                        redeliveryDelay = RedeliveryPolicy.InitialRedeliveryDelay;
                    }

                    Tracer.DebugFormat("Consumer[{0}] Rolled Back, Re-enque {1} messages in Non-Blocking mode, delay: {2}",
                                       ConsumerId, this.deliveredMessages.Count, redeliveryDelay);

                    // Snapshot the list in reverse order before clearing it; the copy
                    // is handed to the scheduled callback for re-dispatch.
                    List<MessageDispatch> pendingRedeliveries =
                        new List<MessageDispatch>(this.deliveredMessages);
                    pendingRedeliveries.Reverse();

                    this.deliveredCounter -= this.deliveredMessages.Count;
                    this.deliveredMessages.Clear();

                    this.session.Scheduler.ExecuteAfterDelay(
                        NonBlockingRedeliveryCallback,
                        pendingRedeliveries,
                        TimeSpan.FromMilliseconds(redeliveryDelay));
                }
                else
                {
                    // stop the delivery of messages.
                    this.unconsumedMessages.Stop();

                    Tracer.DebugFormat("Consumer {0} Rolled Back, Re-enque {1} messages",
                                       ConsumerId, this.deliveredMessages.Count);

                    foreach(MessageDispatch dispatch in this.deliveredMessages)
                    {
                        this.unconsumedMessages.EnqueueFirst(dispatch);
                    }

                    this.deliveredCounter -= this.deliveredMessages.Count;
                    this.deliveredMessages.Clear();

                    // Either restart after the redelivery delay on a pool thread, or
                    // restart immediately when no delay applies.
                    if(redeliveryDelay > 0 && !this.unconsumedMessages.Closed)
                    {
                        DateTime deadline = DateTime.Now.AddMilliseconds(redeliveryDelay);
                        ThreadPool.QueueUserWorkItem(this.RollbackHelper, deadline);
                    }
                    else
                    {
                        Start();
                    }
                }
            }
        }
    }

    // Only redispatch if there's an async listener otherwise a synchronous
    // consumer will pull them from the local queue.
    if(this.listener != null)
    {
        this.session.Redispatch(this, this.unconsumedMessages);
    }
}
| |
/// <summary>
/// Work item that sleeps until the supplied deadline, if it is still in the future,
/// and then calls Start().
/// </summary>
/// <param name="arg">The deadline, boxed as a <see cref="DateTime"/>.</param>
private void RollbackHelper(Object arg)
{
    try
    {
        DateTime resumeAt = (DateTime) arg;
        TimeSpan remaining = resumeAt - DateTime.Now;

        if(remaining.CompareTo(TimeSpan.Zero) > 0)
        {
            Thread.Sleep(remaining);
        }

        this.Start();
    }
    catch(Exception e)
    {
        // Failures are reported to the connection only while the consumer is still open.
        if(this.unconsumedMessages.Closed)
        {
            return;
        }

        this.session.Connection.OnSessionException(this.session, e);
    }
}
| |
/// <summary>
/// Callback that re-dispatches a list of rolled back messages through the session,
/// provided the consumer has not been closed in the meantime.
/// </summary>
/// <param name="arg">The pending redeliveries, passed as a List of MessageDispatch.</param>
private void NonBlockingRedeliveryCallback(object arg)
{
    try
    {
        if (this.unconsumedMessages.Closed)
        {
            return;
        }

        List<MessageDispatch> redeliveries = arg as List<MessageDispatch>;

        foreach (MessageDispatch redelivery in redeliveries)
        {
            session.Dispatch(redelivery);
        }
    }
    catch (Exception e)
    {
        // Any failure is handed to the connection's async exception handler.
        session.Connection.OnAsyncException(e);
    }
}
| |
/// <summary>
/// Produces the message object handed to the application: a clone of the dispatched
/// message, optionally passed through the consumer transformer, bound to the session's
/// connection and wired to the acknowledge handler matching the session mode.
/// </summary>
/// <param name="dispatch">The dispatch whose message is cloned.</param>
/// <returns>The prepared message.</returns>
private ActiveMQMessage CreateActiveMQMessage(MessageDispatch dispatch)
{
    ActiveMQMessage result = dispatch.Message.Clone() as ActiveMQMessage;

    if(this.ConsumerTransformer != null)
    {
        // A null transformer result means "keep the cloned message".
        IMessage transformed = ConsumerTransformer(this.session, this, result);
        if(transformed != null)
        {
            result = this.messageTransformation.TransformMessage<ActiveMQMessage>(transformed);
        }
    }

    result.Connection = this.session.Connection;

    AcknowledgeHandler handler;
    if(IsClientAcknowledge)
    {
        handler = new AcknowledgeHandler(DoClientAcknowledge);
    }
    else if(IsIndividualAcknowledge)
    {
        handler = new AcknowledgeHandler(DoIndividualAcknowledge);
    }
    else
    {
        handler = new AcknowledgeHandler(DoNothingAcknowledge);
    }

    result.Acknowledger += handler;

    return result;
}
| |
/// <summary>
/// Guards operations that require an open consumer.
/// </summary>
/// <exception cref="NMSException">Thrown when unconsumedMessages reports Closed.</exception>
private void CheckClosed()
{
    if(!this.unconsumedMessages.Closed)
    {
        return;
    }

    throw new NMSException("The Consumer has been Closed");
}
| |
/// <summary>
/// Rejects the call when an asynchronous listener is assigned to this consumer.
/// </summary>
/// <exception cref="NMSException">Thrown when the listener field is not null.</exception>
private void CheckMessageListener()
{
    if(this.listener == null)
    {
        return;
    }

    throw new NMSException("Cannot set Async listeners on Consumers with a prefetch limit of zero");
}
| |
/// <summary>
/// Reports whether an asynchronous listener is currently assigned.
/// </summary>
/// <returns>true when the listener field is not null.</returns>
internal bool HasMessageListener()
{
    bool listenerAssigned = this.listener != null;
    return listenerAssigned;
}
| |
/// <summary>
/// true when the session is in auto-acknowledge mode, or in dups-ok mode while the
/// consumer's destination is a queue.
/// </summary>
protected bool IsAutoAcknowledgeEach
{
    get
    {
        if(this.session.IsAutoAcknowledge)
        {
            return true;
        }

        return this.session.IsDupsOkAcknowledge && this.info.Destination.IsQueue;
    }
}
| |
/// <summary>
/// true when the session is in dups-ok mode and the consumer's destination is not a queue.
/// </summary>
protected bool IsAutoAcknowledgeBatch
{
    get
    {
        if(!this.session.IsDupsOkAcknowledge)
        {
            return false;
        }

        return !this.info.Destination.IsQueue;
    }
}
| |
/// <summary>
/// Mirrors the session's IsIndividualAcknowledge flag.
/// </summary>
protected bool IsIndividualAcknowledge
{
    get
    {
        return this.session.IsIndividualAcknowledge;
    }
}
| |
/// <summary>
/// Mirrors the session's IsClientAcknowledge flag.
/// </summary>
protected bool IsClientAcknowledge
{
    get
    {
        return this.session.IsClientAcknowledge;
    }
}
| |
/// <summary>
/// Reports whether this consumer's destination equals the given temporary destination.
/// </summary>
/// <param name="dest">The temporary destination to compare against.</param>
/// <returns>The result of Equals on the consumer's destination.</returns>
internal bool IsInUse(ActiveMQTempDestination dest)
{
    bool sameDestination = this.info.Destination.Equals(dest);
    return sameDestination;
}
| |
/// <summary>
/// true when the unconsumed message channel reports that it is closed.
/// </summary>
internal bool Closed
{
    get
    {
        return this.unconsumedMessages.Closed;
    }
}
| |
/// <summary>
/// Callback that delivers outstanding acknowledgements when optimized acknowledge is
/// enabled and the consumer is still open.
/// </summary>
/// <param name="state">Callback state; not used.</param>
private void DoOptimizedAck(object state)
{
    if (!this.optimizeAcknowledge || this.unconsumedMessages.Closed)
    {
        return;
    }

    Tracer.DebugFormat("Consumer[{0}] performing scheduled delivery of outstanding optimized ack.", ConsumerId);
    DeliverAcks();
}
| |
/// <summary>
/// Blocks the caller, for at most roughly failoverRedeliveryWaitPeriod milliseconds,
/// while entries in previouslyDeliveredMessages are still flagged as not replayed.
/// Returns immediately when the wait period is not positive or nothing is tracked.
/// </summary>
private void WaitForRedeliveries()
{
    if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null)
    {
        DateTime expiry = DateTime.Now + TimeSpan.FromMilliseconds(failoverRedeliveryWaitPeriod);
        int numberNotReplayed;
        do
        {
            numberNotReplayed = 0;
            TransactionId pendingTransactionId = null;

            lock(this.deliveredMessages)
            {
                if (previouslyDeliveredMessages != null)
                {
                    // Capture the transaction id while the lock is held: the tracker
                    // can be cleared and set to null once the lock is released, so it
                    // must not be dereferenced afterwards.
                    pendingTransactionId = previouslyDeliveredMessages.TransactionId;

                    foreach(KeyValuePair<MessageId, bool> entry in previouslyDeliveredMessages)
                    {
                        if (!entry.Value)
                        {
                            numberNotReplayed++;
                        }
                    }
                }
            }

            if (numberNotReplayed > 0)
            {
                Tracer.InfoFormat("Consumer[{0}] waiting for redelivery of {1} in transaction: {2}",
                                  ConsumerId, numberNotReplayed, pendingTransactionId);
                // Poll again after a quarter of the wait period, but no sooner than 500 ms.
                Thread.Sleep((int) Math.Max(500, failoverRedeliveryWaitPeriod/4));
            }
        }
        while (numberNotReplayed > 0 && expiry > DateTime.Now);
    }
}
| |
| // called with deliveredMessages locked |
/// <summary>
/// Verifies that every tracked previously delivered message has been replayed. If any
/// has not, the transaction is invalid (the messages have been dispatched elsewhere)
/// and must be rolled back.
/// </summary>
/// <exception cref="TransactionRolledBackException">
/// Thrown when at least one tracked message is still flagged as not replayed.
/// </exception>
private void RollbackOnFailedRecoveryRedelivery()
{
    if (previouslyDeliveredMessages == null)
    {
        return;
    }

    int missingCount = 0;
    foreach(KeyValuePair<MessageId, bool> tracked in previouslyDeliveredMessages)
    {
        if (tracked.Value)
        {
            continue;
        }

        missingCount++;
        Tracer.DebugFormat("Consumer[{0}] previously delivered message has not been " +
                           "replayed in transaction: {1}, messageId: {2}",
                           ConsumerId, previouslyDeliveredMessages.TransactionId, tracked.Key);
    }

    if (missingCount == 0)
    {
        return;
    }

    String message = "rolling back transaction (" +
        previouslyDeliveredMessages.TransactionId + ") post failover recovery. " + missingCount +
        " previously delivered message(s) not replayed to consumer: " + this.info.ConsumerId;
    Tracer.Warn(message);
    throw new TransactionRolledBackException(message);
}
| |
// Called with unconsumedMessages and deliveredMessages locked.
// Removes any message that was not re-delivered, since it cannot be replayed to this
// consumer on rollback.
/// <summary>
/// Removes from deliveredMessages every tracked message still flagged as not
/// re-delivered, then discards the tracker. Does nothing when no tracker exists.
/// </summary>
private void RollbackPreviouslyDeliveredAndNotRedelivered()
{
    if (previouslyDeliveredMessages == null)
    {
        return;
    }

    foreach(KeyValuePair<MessageId, bool> tracked in previouslyDeliveredMessages)
    {
        if (tracked.Value)
        {
            continue;
        }

        RemoveFromDeliveredMessages(tracked.Key);
    }

    ClearPreviouslyDelivered();
}
| |
// Must be called with deliveredMessages locked.
/// <summary>
/// Removes the first entry of deliveredMessages whose message id equals the given key,
/// after rolling it back from the connection's duplicate tracking.
/// </summary>
/// <param name="key">The id of the message to remove.</param>
private void RemoveFromDeliveredMessages(MessageId key)
{
    foreach(MessageDispatch delivered in this.deliveredMessages)
    {
        if (!delivered.Message.MessageId.Equals(key))
        {
            continue;
        }

        session.Connection.RollbackDuplicate(this, delivered.Message);

        // Returning straight after the removal means the enumerator is never advanced
        // over the modified list.
        this.deliveredMessages.Remove(delivered);
        return;
    }
}
| |
| // called with deliveredMessages locked |
/// <summary>
/// Empties the previously-delivered tracker and drops the reference to it.
/// </summary>
private void ClearPreviouslyDelivered()
{
    if (previouslyDeliveredMessages == null)
    {
        return;
    }

    previouslyDeliveredMessages.Clear();
    previouslyDeliveredMessages = null;
}
| |
| #region Transaction Redelivery Tracker |
| |
/// <summary>
/// Dictionary keyed by message id with a bool flag per message, tagged with the id of
/// the transaction it was created for.
/// </summary>
class PreviouslyDeliveredMap : Dictionary<MessageId, bool>
{
    private TransactionId transactionId;

    /// <summary>
    /// Creates an empty map bound to the given transaction id.
    /// </summary>
    /// <param name="transactionId">The transaction id to remember.</param>
    public PreviouslyDeliveredMap(TransactionId transactionId) : base()
    {
        this.transactionId = transactionId;
    }

    /// <summary>
    /// The transaction id supplied at construction.
    /// </summary>
    public TransactionId TransactionId
    {
        get
        {
            return this.transactionId;
        }
    }
}
| |
/// <summary>
/// Checks whether a dispatch has been redelivered more often than the redelivery
/// policy allows in a transacted session.
/// </summary>
/// <param name="dispatch">The dispatch to inspect.</param>
/// <returns>
/// true when the session is transacted, a policy with a maximum is set, the dispatch's
/// redelivery counter exceeds that maximum and the message has no "redeliveryDelay"
/// property; false otherwise, including when any exception occurs during the check.
/// </returns>
private bool RedeliveryExceeded(MessageDispatch dispatch)
{
    try
    {
        ActiveMQMessage amqMessage = dispatch.Message as ActiveMQMessage;

        if (!session.IsTransacted || redeliveryPolicy == null)
        {
            return false;
        }

        if (redeliveryPolicy.MaximumRedeliveries == NO_MAXIMUM_REDELIVERIES)
        {
            return false;
        }

        if (!(dispatch.RedeliveryCounter > redeliveryPolicy.MaximumRedeliveries))
        {
            return false;
        }

        // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
        return !amqMessage.Properties.Contains("redeliveryDelay");
    }
    catch
    {
        // Best effort: any failure while inspecting the dispatch counts as "not exceeded".
        return false;
    }
}
| |
| #endregion |
| |
| #region Nested ISyncronization Types |
| |
/// <summary>
/// Transaction synchronization that forwards the transaction lifecycle callbacks to
/// the owning consumer and clears its synchronizationRegistered flag afterwards.
/// </summary>
class MessageConsumerSynchronization : ISynchronization
{
    private readonly MessageConsumer owner;

    /// <summary>
    /// Creates a synchronization bound to the given consumer.
    /// </summary>
    /// <param name="consumer">The consumer that receives the callbacks.</param>
    public MessageConsumerSynchronization(MessageConsumer consumer)
    {
        this.owner = consumer;
    }

    /// <summary>
    /// Without transacted individual acks, acknowledges everything delivered so far.
    /// With them, only verifies that previously delivered messages were replayed.
    /// </summary>
    public void BeforeEnd()
    {
        Tracer.DebugFormat("Consumer[{0}] MessageConsumerSynchronization - BeforeEnd Called",
                           this.owner.ConsumerId);

        if (!this.owner.TransactedIndividualAck)
        {
            this.owner.Acknowledge();
        }
        else
        {
            this.owner.ClearDeliveredList();
            this.owner.WaitForRedeliveries();
            lock(this.owner.deliveredMessages)
            {
                this.owner.RollbackOnFailedRecoveryRedelivery();
            }
        }

        this.owner.synchronizationRegistered = false;
    }

    /// <summary>
    /// Forwards the commit notification to the consumer.
    /// </summary>
    public void AfterCommit()
    {
        Tracer.DebugFormat("Consumer[{0}] MessageConsumerSynchronization - AfterCommit Called.",
                           this.owner.ConsumerId);

        this.owner.Commit();
        this.owner.synchronizationRegistered = false;
    }

    /// <summary>
    /// Forwards the rollback notification to the consumer.
    /// </summary>
    public void AfterRollback()
    {
        Tracer.DebugFormat("Consumer[{0}] MessageConsumerSynchronization - AfterRollback.",
                           this.owner.ConsumerId);

        this.owner.Rollback();
        this.owner.synchronizationRegistered = false;
    }
}
| |
/// <summary>
/// Transaction synchronization that closes the consumer once the transaction has
/// ended, whether by commit or by rollback, unless the consumer is already closed.
/// </summary>
protected class ConsumerCloseSynchronization : ISynchronization
{
    private readonly MessageConsumer target;

    /// <summary>
    /// Creates a synchronization bound to the given consumer.
    /// </summary>
    /// <param name="consumer">The consumer to close when the transaction ends.</param>
    public ConsumerCloseSynchronization(MessageConsumer consumer)
    {
        this.target = consumer;
    }

    /// <summary>
    /// Nothing to do before the transaction ends.
    /// </summary>
    public void BeforeEnd()
    {
    }

    /// <summary>
    /// Closes the consumer after a commit if it is still open.
    /// </summary>
    public void AfterCommit()
    {
        CloseIfStillOpen("Consumer[{0}] ConsumerCloseSynchronization - AfterCommit Called.");
    }

    /// <summary>
    /// Closes the consumer after a rollback if it is still open.
    /// </summary>
    public void AfterRollback()
    {
        CloseIfStillOpen("Consumer[{0}] ConsumerCloseSynchronization - AfterRollback Called.");
    }

    // Traces with the supplied format and closes the consumer, unless already closed.
    private void CloseIfStillOpen(string traceFormat)
    {
        if (this.target.Closed)
        {
            return;
        }

        Tracer.DebugFormat(traceFormat, this.target.ConsumerId);
        this.target.DoClose();
    }
}
| |
| #endregion |
| } |
| } |