/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
using System;
using System.Threading;
using System.Collections.Generic;
using System.Collections.Specialized;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Util;
using Apache.NMS.ActiveMQ.Threads;
using Apache.NMS.Util;

namespace Apache.NMS.ActiveMQ
{
	/// <summary>
	/// Kinds of acknowledgement a consumer can send for dispatched messages.
	/// The numeric value is cast to a byte when a MessageAck is constructed,
	/// so the explicit values below must not be changed.
	/// </summary>
	public enum AckType
	{
		/// <summary>Message delivered but not consumed.</summary>
		DeliveredAck = 0,

		/// <summary>Message could not be processed due to poison pill but discard anyway.</summary>
		PoisonAck = 1,

		/// <summary>Message consumed, discard.</summary>
		ConsumedAck = 2,

		/// <summary>Message has been Redelivered and is not yet poisoned.</summary>
		RedeliveredAck = 3,

		/// <summary>Only the given message is to be treated as consumed.</summary>
		IndividualAck = 4
	}

	/// <summary>
	/// An object capable of receiving messages from some destination
	/// </summary>
	public class MessageConsumer : IMessageConsumer, IDispatcher
	{
        private const int NO_MAXIMUM_REDELIVERIES = -1;

        // Taken from the parent connection in the constructor.
        private readonly MessageTransformation messageTransformation;
        // Channel holding dispatches that have arrived but have not yet been handed
        // to a listener or returned from one of the Receive calls.
        private readonly MessageDispatchChannel unconsumedMessages;
        // Dispatches recorded by BeforeMessageIsConsumed that are awaiting acknowledgement.
        // Accessed under lock(this.dispatchedMessages) in most places.
        private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
        // Consumer description built in the constructor; its ConsumerId, Destination,
        // PrefetchSize and Browser values are read throughout this class.
        private readonly ConsumerInfo info;
        // Parent session; used for sending acks, scheduling and transaction state.
        private readonly Session session;

		// Ack that has been prepared but not yet sent; DeliverAcks picks it up.
		private MessageAck pendingAck = null;

		// Set true by Start() and false by Stop().
		private readonly Atomic<bool> started = new Atomic<bool>();
		// Claimed via CompareAndSet while an ack delivery is in flight; released by
		// AsyncDeliverAck (or by DeliverAcks itself when there was nothing to send).
		private readonly Atomic<bool> deliveringAcks = new Atomic<bool>();

		private int redeliveryTimeout = 500;
		// Set once Dispose(bool) has run.
		protected bool disposed = false;
		// Updated in BeforeMessageIsConsumed; reported to the broker in DoClose.
		private long lastDeliveredSequenceId = 0;
		private int ackCounter = 0;
		private int deliveredCounter = 0;
		private int additionalWindowSize = 0;
		private long redeliveryDelay = 0;
		// Counts calls to Dispatch; used there to yield briefly every 1000 dispatches.
		private int dispatchedCount = 0;
		private volatile bool synchronizationRegistered = false;
		// Both flags below are raised by InProgressClearRequired and lowered by
		// ClearDispatchList / ClearMessagesInProgress respectively.
		private bool clearDispatchList = false;
		private bool inProgressClearRequiredFlag;
		private bool optimizeAcknowledge;
		// Compared against optimizeAcknowledgeTimeOut in IsOptimizedAckTime.
		private DateTime optimizeAckTimestamp = DateTime.Now;
	    private long optimizeAcknowledgeTimeOut = 0;
	    private long optimizedAckScheduledAckInterval = 0;
	    // Callback registered with the session scheduler by the
	    // OptimizedAckScheduledAckInterval setter; null when nothing is scheduled.
	    private WaitCallback optimizedAckTask = null;
	    private long failoverRedeliveryWaitPeriod = 0;
	    private bool transactedIndividualAck = false;
	    private bool nonBlockingRedelivery = false;

        // When non-null, Dequeue throws this (wrapped) instead of returning null.
        private Exception failureError;
		// Created lazily by DeliverAcks; shut down in Shutdown.
		private ThreadPoolExecutor executor;

		// Backing delegate for the public Listener event.
		private event MessageListener listener;

		private IRedeliveryPolicy redeliveryPolicy;
		// Created by ClearDispatchList for transacted sessions; consulted in Dispatch
		// when a duplicate arrives.
		private PreviouslyDeliveredMap previouslyDeliveredMessages;

		/// <summary>
		/// Builds a consumer for the given session and destination.  The constructor is
		/// internal to prevent clients from creating an instance directly.
		/// </summary>
		/// <remarks>
		/// Validates the destination, selects the unconsumed-message channel based on whether
		/// the connection supports message priority, fills in the ConsumerInfo, applies any
		/// "consumer.*" options found on the destination and finally copies the acknowledge
		/// and redelivery related settings from the connection.
		/// </remarks>
		/// <exception cref="InvalidDestinationException">
		/// The destination is null, has no physical name, or is a temporary destination that
		/// does not contain this connection's id or is no longer active.
		/// </exception>
		internal MessageConsumer(Session session, ConsumerId id, ActiveMQDestination destination,
								 String name, String selector, int prefetch, int maxPendingMessageCount,
								 bool noLocal, bool browser, bool dispatchAsync )
		{
			if(destination == null)
			{
				throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
			}
			else if(destination.PhysicalName == null)
			{
				throw new InvalidDestinationException("The destination object was not given a physical name.");
			}
			else if (destination.IsTemporary)
			{
				String physicalName = destination.PhysicalName;

				if(String.IsNullOrEmpty(physicalName))
				{
					throw new InvalidDestinationException("Physical name of Destination should be valid: " + destination);
				}

				String connectionID = session.Connection.ConnectionId.Value;

				// Identifiers are opaque tokens, so the containment test must be ordinal;
				// the single-argument IndexOf overload performs a culture-sensitive search.
				if(physicalName.IndexOf(connectionID, StringComparison.Ordinal) < 0)
				{
					throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
				}

				if(!session.Connection.IsTempDestinationActive(destination as ActiveMQTempDestination))
				{
					throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
				}
			}

			this.session = session;
			this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
			this.messageTransformation = this.session.Connection.MessageTransformation;

			if(session.Connection.MessagePrioritySupported)
			{
				this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
			}
			else
			{
				this.unconsumedMessages = new FifoMessageDispatchChannel();
			}

			this.info = new ConsumerInfo();
			this.info.ConsumerId = id;
			this.info.Destination = destination;
			this.info.SubscriptionName = name;
			this.info.Selector = selector;
			this.info.PrefetchSize = prefetch;
			this.info.MaximumPendingMessageLimit = maxPendingMessageCount;
			this.info.NoLocal = noLocal;
			this.info.Browser = browser;
			this.info.DispatchAsync = dispatchAsync;
			this.info.Retroactive = session.Retroactive;
			this.info.Exclusive = session.Exclusive;
			this.info.Priority = session.Priority;
			this.info.ClientId = session.Connection.ClientId;

			// If the destination contained a URI query, then use it to set public properties
			// on the ConsumerInfo
			if(destination.Options != null)
			{
				// Get options prefixed with "consumer.*"
				StringDictionary options = URISupport.GetProperties(destination.Options, "consumer.");
				// Extract out custom extension options "consumer.nms.*"
				StringDictionary customConsumerOptions = URISupport.ExtractProperties(options, "nms.");

				URISupport.SetProperties(this.info, options);
				URISupport.SetProperties(this, customConsumerOptions, "nms.");
			}

			// Optimized acknowledge only applies to auto-acknowledge, non-browser consumers.
			this.optimizeAcknowledge = session.Connection.OptimizeAcknowledge &&
									   session.IsAutoAcknowledge && !this.info.Browser;

			if (this.optimizeAcknowledge)
			{
				this.optimizeAcknowledgeTimeOut = session.Connection.OptimizeAcknowledgeTimeOut;
				// Goes through the property setter, which also (re)schedules the periodic ack task.
				OptimizedAckScheduledAckInterval = session.Connection.OptimizedAckScheduledAckInterval;
			}

			this.info.OptimizedAcknowledge = this.optimizeAcknowledge;
			this.failoverRedeliveryWaitPeriod = session.Connection.ConsumerFailoverRedeliveryWaitPeriod;
			this.nonBlockingRedelivery = session.Connection.NonBlockingRedelivery;
			// Non-blocking redelivery forces individual transacted acks.
			this.transactedIndividualAck = session.Connection.TransactedIndividualAck || this.nonBlockingRedelivery;
		}

		/// <summary>
		/// Finalizer: runs the non-explicit dispose path for consumers that were
		/// never disposed by the caller.
		/// </summary>
		~MessageConsumer()
		{
			this.Dispose(false);
		}

		#region Property Accessors

		/// <summary>
		/// Broker sequence id of the message most recently passed to
		/// BeforeMessageIsConsumed.
		/// </summary>
		public long LastDeliveredSequenceId
		{
			get
			{
				return lastDeliveredSequenceId;
			}
		}

		/// <summary>
		/// The id stored in this consumer's ConsumerInfo.
		/// </summary>
		public ConsumerId ConsumerId
		{
			get
			{
				return info.ConsumerId;
			}
		}

		/// <summary>
		/// The ConsumerInfo instance that was populated by the constructor.
		/// </summary>
		public ConsumerInfo ConsumerInfo
		{
			get
			{
				return info;
			}
		}

		/// <summary>
		/// Gets or sets the redelivery timeout value held by this consumer
		/// (the backing field is initialised to 500).
		/// </summary>
		public int RedeliveryTimeout
		{
			get
			{
				return this.redeliveryTimeout;
			}
			set
			{
				this.redeliveryTimeout = value;
			}
		}

		/// <summary>
		/// The prefetch size recorded in this consumer's ConsumerInfo.
		/// </summary>
		public int PrefetchSize
		{
			get
			{
				return info.PrefetchSize;
			}
		}

		/// <summary>
		/// Gets or sets the redelivery policy used by this consumer.  The constructor
		/// initialises it from the parent connection's policy.
		/// </summary>
		public IRedeliveryPolicy RedeliveryPolicy
		{
			get
			{
				return redeliveryPolicy;
			}
			set
			{
				redeliveryPolicy = value;
			}
		}

		/// <summary>
		/// Number of dispatches currently held in the unconsumed message channel.
		/// </summary>
		public long UnconsumedMessageCount
		{
			get
			{
				return unconsumedMessages.Count;
			}
		}

		// Custom Options
		// NOTE(review): presumably settable through the "consumer.nms.*" destination
		// options applied in the constructor - confirm against URISupport.SetProperties.
		private bool ignoreExpiration = false;

		/// <summary>
		/// When true, the expiration check is skipped in Dispatch and Dequeue, so
		/// expired messages are handed to the application like any other message.
		/// </summary>
		public bool IgnoreExpiration
		{
			get
			{
				return this.ignoreExpiration;
			}
			set
			{
				this.ignoreExpiration = value;
			}
		}

        /// <summary>
        /// Gets or sets the error that Dequeue reports to the caller when it would
        /// otherwise return null because no message is available.
        /// </summary>
        public Exception FailureError
        {
            get
            {
                return failureError;
            }
            set
            {
                failureError = value;
            }
        }

		/// <summary>
		/// Gets or sets whether optimized acknowledgement is active.  Switching it
		/// from on to off first triggers DeliverAcks before the new value is stored.
		/// </summary>
		public bool OptimizeAcknowledge
		{
			get
			{
				return optimizeAcknowledge;
			}
			set
			{
				bool switchingOff = this.optimizeAcknowledge && !value;

				if (switchingOff)
				{
					this.DeliverAcks();
				}

				optimizeAcknowledge = value;
			}
		}

		/// <summary>
		/// Gets or sets the time-out, interpreted as milliseconds by IsOptimizedAckTime,
		/// after which an optimized ack is considered due.  Values of zero or less
		/// disable the time based check.
		/// </summary>
		public long OptimizeAcknowledgeTimeOut
		{
			get
			{
				return optimizeAcknowledgeTimeOut;
			}
			set
			{
				optimizeAcknowledgeTimeOut = value;
			}
		}
	    
		/// <summary>
		/// Gets or sets the interval, in milliseconds, at which outstanding optimized
		/// acks are flushed periodically.  Assigning a value cancels any task that is
		/// currently scheduled and, when optimized acknowledge is enabled and the
		/// interval is positive, schedules a new periodic task with the session scheduler.
		/// </summary>
		public long OptimizedAckScheduledAckInterval
		{
			get
			{
				return optimizedAckScheduledAckInterval;
			}
			set
			{
				optimizedAckScheduledAckInterval = value;

				// Drop whatever was scheduled for the previous interval.
				WaitCallback previousTask = this.optimizedAckTask;
				if (previousTask != null)
				{
					this.session.Scheduler.Cancel(previousTask);
					this.optimizedAckTask = null;
				}

				// Should we periodically send out all outstanding acks.
				bool schedulePeriodicAck = this.optimizeAcknowledge && optimizedAckScheduledAckInterval > 0;
				if (schedulePeriodicAck)
				{
					this.optimizedAckTask = new WaitCallback(DoOptimizedAck);
					TimeSpan period = TimeSpan.FromMilliseconds(optimizedAckScheduledAckInterval);
					this.session.Scheduler.ExecutePeriodically(this.optimizedAckTask, null, period);
				}
			}
		}
	    
		/// <summary>
		/// Gets or sets the failover redelivery wait period.  The constructor copies the
		/// initial value from the connection's ConsumerFailoverRedeliveryWaitPeriod.
		/// </summary>
		public long FailoverRedeliveryWaitPeriod
		{
			get
			{
				return failoverRedeliveryWaitPeriod;
			}
			set
			{
				failoverRedeliveryWaitPeriod = value;
			}
		}
	    
		/// <summary>
		/// Gets or sets whether messages consumed in a transacted session are
		/// acknowledged through ImmediateIndividualTransactedAck instead of a
		/// deferred delivered ack.
		/// </summary>
		public bool TransactedIndividualAck
		{
			get
			{
				return transactedIndividualAck;
			}
			set
			{
				transactedIndividualAck = value;
			}
		}
	    
		/// <summary>
		/// Gets or sets the non-blocking redelivery flag.  The constructor copies the
		/// initial value from the connection.
		/// </summary>
		public bool NonBlockingRedelivery
		{
			get
			{
				return nonBlockingRedelivery;
			}
			set
			{
				nonBlockingRedelivery = value;
			}
		}

		#endregion

		#region IMessageConsumer Members

		// Backing store for the ConsumerTransformer property.
		private ConsumerTransformerDelegate consumerTransformer;

		/// <summary>
		/// Gets or sets the delegate invoked for every dispatched message so that the
		/// client can transform the received message before it is delivered.
		/// </summary>
		public ConsumerTransformerDelegate ConsumerTransformer
		{
			get
			{
				return consumerTransformer;
			}
			set
			{
				consumerTransformer = value;
			}
		}

		/// <summary>
		/// Asynchronous message listener for this consumer.
		/// </summary>
		/// <remarks>
		/// Adding a handler temporarily stops the session (when it was started),
		/// registers the handler, asks the session to redispatch everything that is
		/// waiting in the unconsumed channel, and then restarts the session.  The
		/// restart is performed in a finally block so that a failure while
		/// redispatching cannot leave the session permanently stopped.
		/// </remarks>
		/// <exception cref="NMSException">
		/// Thrown on add when the consumer has a prefetch size of zero.
		/// </exception>
		public event MessageListener Listener
		{
			add
			{
				CheckClosed();

				if(this.PrefetchSize == 0)
				{
					throw new NMSException("Cannot set Asynchronous Listener on a Consumer with a zero Prefetch size");
				}

				bool wasStarted = this.session.Started;

				if(wasStarted)
				{
					this.session.Stop();
				}

				try
				{
					listener += value;
					this.session.Redispatch(this.unconsumedMessages);
				}
				finally
				{
					// Always undo the Stop() above, even when redispatch throws.
					if(wasStarted)
					{
						this.session.Start();
					}
				}
			}
			remove { listener -= value; }
		}

		/// <summary>
		/// Waits without a time limit for the next message.
		/// </summary>
		/// <returns>
		/// The received message, or null when Dequeue yields nothing.
		/// </returns>
		public IMessage Receive()
		{
			CheckClosed();
			CheckMessageListener();

			// Only has an effect for zero-prefetch consumers with an empty channel.
			SendPullRequest(0);

			MessageDispatch received = this.Dequeue(TimeSpan.FromMilliseconds(-1));
			if(received != null)
			{
				BeforeMessageIsConsumed(received);
				AfterMessageIsConsumed(received, false);

				return CreateActiveMQMessage(received);
			}

			return null;
		}

		/// <summary>
		/// Waits up to <paramref name="timeout"/> for the next message.
		/// </summary>
		/// <param name="timeout">
		/// Maximum time to wait.  For zero-prefetch consumers the value is sent to the
		/// broker in the pull request and the local wait is unbounded.
		/// </param>
		/// <returns>
		/// The received message, or null when Dequeue yields nothing.
		/// </returns>
		public IMessage Receive(TimeSpan timeout)
		{
			CheckClosed();
			CheckMessageListener();

			SendPullRequest((long) timeout.TotalMilliseconds);

			TimeSpan localWait = (this.PrefetchSize == 0) ? TimeSpan.FromMilliseconds(-1) : timeout;
			MessageDispatch received = this.Dequeue(localWait);

			if(received != null)
			{
				BeforeMessageIsConsumed(received);
				AfterMessageIsConsumed(received, false);

				return CreateActiveMQMessage(received);
			}

			return null;
		}

		/// <summary>
		/// Returns the next message if one is immediately available.
		/// </summary>
		/// <remarks>
		/// For zero-prefetch consumers a pull request with a timeout of -1 is sent and
		/// the local wait is unbounded; otherwise the channel is polled with a zero timeout.
		/// </remarks>
		/// <returns>
		/// The received message, or null when Dequeue yields nothing.
		/// </returns>
		public IMessage ReceiveNoWait()
		{
			CheckClosed();
			CheckMessageListener();

			SendPullRequest(-1);

			TimeSpan localWait = (this.PrefetchSize == 0) ? TimeSpan.FromMilliseconds(-1) : TimeSpan.Zero;
			MessageDispatch received = this.Dequeue(localWait);

			if(received != null)
			{
				BeforeMessageIsConsumed(received);
				AfterMessageIsConsumed(received, false);

				return CreateActiveMQMessage(received);
			}

			return null;
		}

		/// <summary>
		/// Closes the consumer and tells the garbage collector that the finalizer
		/// no longer needs to run.
		/// </summary>
		public void Dispose()
		{
			this.Dispose(true);
			GC.SuppressFinalize(this);
		}

		/// <summary>
		/// Shared dispose path for Dispose() and the finalizer.  Calls Close() once and
		/// marks the consumer as disposed regardless of whether Close() succeeded.
		/// </summary>
		/// <param name="disposing">
		/// True when called from Dispose(), false when called from the finalizer.
		/// The value is not consulted by this implementation.
		/// </param>
		protected void Dispose(bool disposing)
		{
			if(!disposed)
			{
				try
				{
					this.Close();
				}
				catch
				{
					// Ignore network errors.
					// Everything is swallowed on purpose: this method is also reached from
					// the finalizer, where nothing may be allowed to escape.
				}

				disposed = true;
			}
		}

		/// <summary>
		/// Closes this consumer.  When messages are still outstanding inside an active
		/// transaction the close is deferred by registering a ConsumerCloseSynchronization
		/// with the transaction context; otherwise the consumer is closed immediately.
		/// Does nothing when the unconsumed channel is already closed.
		/// </summary>
		public virtual void Close()
		{
			if(this.unconsumedMessages.Closed)
			{
				return;
			}

			bool mustWaitForTransaction = this.dispatchedMessages.Count != 0 &&
										  this.session.IsTransacted &&
										  this.session.TransactionContext.InTransaction;

			if(!mustWaitForTransaction)
			{
				Tracer.DebugFormat("Consumer {0} No Active TX or pending acks, closing normally.",
								   this.info.ConsumerId);
				this.DoClose();
				return;
			}

			Tracer.DebugFormat("Consumer {0} Registering new ConsumerCloseSynchronization",
							   this.info.ConsumerId);
			this.session.TransactionContext.AddSynchronization(new ConsumerCloseSynchronization(this));
		}

		/// <summary>
		/// Shuts the consumer down locally and then sends a RemoveInfo carrying the last
		/// delivered sequence id to the broker as a one-way command.
		/// </summary>
		internal void DoClose()
		{
			this.Shutdown();

			RemoveInfo removal = new RemoveInfo();
			removal.ObjectId = this.ConsumerId;

			if(Tracer.IsDebugEnabled)
			{
				Tracer.DebugFormat("Remove of Consumer[{0}] of destination [{1}] sent last delivered Id[{2}].", 
								   this.ConsumerId, this.info.Destination, this.lastDeliveredSequenceId);
			}

			removal.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
			this.session.Connection.Oneway(removal);
		}
		
		/// <summary>
		/// Called from the parent Session of this Consumer to indicate that its
		/// parent session is closing and this Consumer should close down but not
		/// send any message to the Broker as the parent close will take care of
		/// removing its child resources at the broker.
		/// </summary>
		/// <remarks>
		/// Does nothing when the unconsumed channel is already closed.  Otherwise, in order:
		/// flushes outstanding acks for non-transacted sessions, stops the ack executor,
		/// cancels the periodic optimized-ack task, rolls back duplicate tracking for
		/// unacknowledged client-ack messages, clears the dispatched list for
		/// non-transacted sessions, detaches from the session and closes and drains the
		/// unconsumed channel.
		/// </remarks>
		internal void Shutdown()
		{
			if(this.unconsumedMessages.Closed)
			{
				return;
			}

			if(Tracer.IsDebugEnabled)
			{
				Tracer.DebugFormat("Shutdown of Consumer[{0}] started.", ConsumerId);
			}

			// Do we have any acks we need to send out before closing?
			// Ack any delivered messages now.
			if(!this.session.IsTransacted)
			{
				DeliverAcks();
				if(this.IsAutoAcknowledgeBatch)
				{
					Acknowledge();
				}
			}

			// Give an ack queued by DeliverAcks() a chance to go out before the executor dies.
			if(this.executor != null)
			{
				this.executor.Shutdown();
				this.executor.AwaitTermination(TimeSpan.FromMinutes(1));
				this.executor = null;
			}

			if(this.optimizedAckTask != null)
			{
				this.session.Scheduler.Cancel(this.optimizedAckTask);
				// Drop the stale reference, matching the OptimizedAckScheduledAckInterval
				// setter, so the cancelled task is never cancelled a second time.
				this.optimizedAckTask = null;
			}

			if(this.session.IsClientAcknowledge && !this.info.Browser)
			{
				// rollback duplicates that aren't acknowledged
				LinkedList<MessageDispatch> temp = null;
				lock(this.dispatchedMessages)
				{
					// Work on a snapshot so the lock is not held during the rollback calls.
					temp = new LinkedList<MessageDispatch>(this.dispatchedMessages);
				}
				foreach(MessageDispatch old in temp)
				{
					this.session.Connection.RollbackDuplicate(this, old.Message);
				}
				temp.Clear();
			}

			if(!this.session.IsTransacted)
			{
				lock(this.dispatchedMessages)
				{
					dispatchedMessages.Clear();
				}
			}

			this.session.RemoveConsumer(this);
			this.unconsumedMessages.Close();

			MessageDispatch[] unconsumed = unconsumedMessages.RemoveAll();
			if(!this.info.Browser)
			{
				foreach(MessageDispatch old in unconsumed)
				{
					// ensure we don't filter this as a duplicate
					session.Connection.RollbackDuplicate(this, old.Message);
				}
			}

			if(Tracer.IsDebugEnabled)
			{
				Tracer.DebugFormat("Shutdown of Consumer[{0}] completed.", ConsumerId);
			}
		}

		#endregion

		/// <summary>
		/// Sends a MessagePull command to the broker, but only for a consumer whose
		/// prefetch size is zero and whose unconsumed channel is currently empty.
		/// </summary>
		/// <param name="timeout">Value placed in the MessagePull's Timeout field.</param>
		protected void SendPullRequest(long timeout)
		{
			bool pullNeeded = this.info.PrefetchSize == 0 && this.unconsumedMessages.Empty;
			if(!pullNeeded)
			{
				return;
			}

			MessagePull pull = new MessagePull();
			pull.ConsumerId = this.info.ConsumerId;
			pull.Destination = this.info.Destination;
			pull.Timeout = timeout;
			pull.ResponseRequired = false;

			if(Tracer.IsDebugEnabled)
			{
				Tracer.Debug("Sending MessagePull: " + pull);
			}

			this.session.Connection.Oneway(pull);
		}

		/// <summary>
		/// Acknowledges a single message: finds the dispatch with the same MessageId in
		/// the dispatched list, removes it and sends an IndividualAck for it.  When no
		/// matching dispatch is found a debug message is traced and nothing is sent.
		/// </summary>
		/// <param name="message">The message being acknowledged.</param>
		protected void DoIndividualAcknowledge(ActiveMQMessage message)
		{
			MessageDispatch matched = null;

			lock(this.dispatchedMessages)
			{
				LinkedListNode<MessageDispatch> node = this.dispatchedMessages.First;
				while(node != null)
				{
					MessageDispatch candidate = node.Value;
					if(candidate.Message.MessageId.Equals(message.MessageId))
					{
						matched = candidate;
						this.dispatchedMessages.Remove(candidate);
						break;
					}

					node = node.Next;
				}
			}

			if(matched != null)
			{
				MessageAck ack = new MessageAck(matched, (byte) AckType.IndividualAck, 1);
				Tracer.Debug("Sending Individual Ack for MessageId: " + ack.LastMessageId.ToString());
				this.session.SendAck(ack);
				return;
			}

			Tracer.DebugFormat("Attempt to Ack MessageId[{0}] failed because the original dispatch is not in the Dispatch List", message.MessageId);
		}

		/// <summary>
		/// Acknowledge handler that deliberately performs no work.
		/// </summary>
		/// <param name="message">Ignored.</param>
		protected void DoNothingAcknowledge(ActiveMQMessage message)
		{
			// Intentionally empty.
		}

		/// <summary>
		/// Acknowledge handler for client-acknowledge mode: after verifying the consumer
		/// is still open it delegates to the session's Acknowledge().
		/// </summary>
		/// <param name="message">The message on which Acknowledge was requested; not otherwise used here.</param>
		protected void DoClientAcknowledge(ActiveMQMessage message)
		{
			CheckClosed();
			Tracer.Debug("Sending Client Ack:");
			session.Acknowledge();
		}

		/// <summary>
		/// Marks the consumer as started, starts the unconsumed channel and wakes the
		/// session executor.  Has no effect once the unconsumed channel is closed.
		/// </summary>
		public void Start()
		{
			if(!this.unconsumedMessages.Closed)
			{
				started.Value = true;
				unconsumedMessages.Start();
				session.Executor.Wakeup();
			}
		}

		/// <summary>
		/// Marks the consumer as stopped and stops the unconsumed channel.
		/// </summary>
		public void Stop()
		{
			started.Value = false;
			unconsumedMessages.Stop();
		}

		/// <summary>
		/// Sends any outstanding acknowledgement asynchronously.
		/// </summary>
		/// <remarks>
		/// Only one delivery may be in flight: the method returns immediately when the
		/// deliveringAcks flag is already set.  In auto-acknowledge-each mode an ack for
		/// all delivered messages is built (falling back to the pending ack); in other
		/// modes only a pending ConsumedAck is sent.  The chosen ack is queued on the
		/// lazily created executor, and AsyncDeliverAck releases the flag afterwards.
		/// </remarks>
		public void DeliverAcks()
		{
			if(!this.deliveringAcks.CompareAndSet(false, true))
			{
				// Another delivery is already in progress.
				return;
			}

			MessageAck outgoing = null;

			if(this.IsAutoAcknowledgeEach)
			{
				lock(this.dispatchedMessages)
				{
					outgoing = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
					if(outgoing == null)
					{
						outgoing = this.pendingAck;
						this.pendingAck = null;
					}
					else
					{
						Tracer.Debug("Consumer - DeliverAcks clearing the Dispatch list");
						this.dispatchedMessages.Clear();
						this.ackCounter = 0;
					}
				}
			}
			else if(pendingAck != null && pendingAck.AckType == (byte) AckType.ConsumedAck)
			{
				outgoing = pendingAck;
				pendingAck = null;
			}

			if(outgoing == null)
			{
				// Nothing to send, release the flag right away.
				this.deliveringAcks.Value = false;
				return;
			}

			if(this.executor == null)
			{
				this.executor = new ThreadPoolExecutor();
			}

			this.executor.QueueUserWorkItem(AsyncDeliverAck, outgoing);
		}

		/// <summary>
		/// Work item queued by DeliverAcks: sends the given ack through the session and
		/// always releases the deliveringAcks flag afterwards.
		/// </summary>
		/// <param name="ack">The MessageAck to send, passed as object by the executor.</param>
		private void AsyncDeliverAck(object ack)
		{
			MessageAck pending = ack as MessageAck;
			try
			{
				this.session.SendAck(pending, true);
			}
			catch(Exception e)
			{
				// Failure is not propagated (there is no caller to receive it), but the
				// reason is recorded so the problem can be diagnosed.
				Tracer.ErrorFormat("Consumer {0} Failed to deliver async Ack {1}: {2}",
                                   this.info.ConsumerId, pending, e.Message);
			}
			finally
			{
				// Allow the next DeliverAcks call to proceed.
				this.deliveringAcks.Value = false;
			}
		}

		/// <summary>
		/// Flags that both the unconsumed channel and the dispatched list have to be
		/// cleared.  The actual work is done later by ClearMessagesInProgress and
		/// ClearDispatchList.
		/// </summary>
		internal void InProgressClearRequired()
		{
			this.inProgressClearRequiredFlag = true;
			// deal with delivered messages async to avoid lock contention with in progress acks
			this.clearDispatchList = true;
		}

		/// <summary>
		/// When a clear has been requested, drains the unconsumed channel, rolls back
		/// duplicate tracking for the drained messages (non-browser consumers only) and
		/// tells the connection that transport interruption processing is complete.
		/// The request flag is checked again after taking the channel lock.
		/// </summary>
		internal void ClearMessagesInProgress()
		{
			if(!inProgressClearRequiredFlag)
			{
				return;
			}

			// Called from a thread in the ThreadPool, so we wait until we can
			// get a lock on the unconsumed list then we clear it.
			lock(this.unconsumedMessages.SyncRoot)
			{
				if(!inProgressClearRequiredFlag)
				{
					// Someone else completed the clear while we were waiting for the lock.
					return;
				}

				if(Tracer.IsDebugEnabled)
				{
					Tracer.Debug(this.ConsumerId + " clearing dispatched list (" +
								 this.unconsumedMessages.Count + ") on transport interrupt");
				}

				// ensure unconsumed are rolledback up front as they may get redelivered to another consumer
				MessageDispatch[] drained = this.unconsumedMessages.RemoveAll();
				if(!this.info.Browser)
				{
					foreach(MessageDispatch stale in drained)
					{
						session.Connection.RollbackDuplicate(this, stale.Message);
					}
				}

				// allow dispatch on this connection to resume
				this.session.Connection.TransportInterruptionProcessingComplete();
				this.inProgressClearRequiredFlag = false;
			}
		}

	    /// <summary>
	    /// When a clear of the dispatched list has been requested: for transacted
	    /// sessions the delivered messages are recorded in previouslyDeliveredMessages
	    /// (each mapped to false), otherwise the dispatched list and the pending ack
	    /// are discarded.  The request flag is re-checked under the list lock and reset
	    /// at the end.
	    /// </summary>
	    private void ClearDispatchList() 
		{
			if(!this.clearDispatchList)
			{
				return;
			}

			lock(this.dispatchedMessages)
			{
				if(!this.clearDispatchList)
				{
					return;
				}

				if(dispatchedMessages.Count != 0)
				{
					if(!session.IsTransacted)
					{
						if(Tracer.IsDebugEnabled)
						{
							Tracer.DebugFormat("Consumer[{0}]: clearing delivered list {1} on transport interrupt",
											   this.info.ConsumerId, dispatchedMessages.Count);
						}
						this.dispatchedMessages.Clear();
						this.pendingAck = null;
					}
					else
					{
						if(Tracer.IsDebugEnabled)
						{
							Tracer.DebugFormat("Consumer[{0}]: tracking existing transacted delivered list {1} on transport interrupt",
											   this.info.ConsumerId, dispatchedMessages.Count);
						}

						if(previouslyDeliveredMessages == null)
						{
							previouslyDeliveredMessages = new PreviouslyDeliveredMap(session.TransactionContext.TransactionId);
						}

						foreach(MessageDispatch delivered in dispatchedMessages)
						{
							this.previouslyDeliveredMessages[delivered.Message.MessageId] = false;
						}
					}
				}

				this.clearDispatchList = false;
			}
		}

		/// <summary>
		/// Entry point for a message dispatched to this consumer.
		/// </summary>
		/// <remarks>
		/// After performing any pending clear operations the dispatch is handled under the
		/// unconsumed channel lock: a non-duplicate (or browser) message is either marked for
		/// immediate delivery to the listener, or enqueued when there is no listener or the
		/// channel is not running; a duplicate is acked straight away in a way that depends
		/// on the session being transacted.  Listener invocation itself happens outside the
		/// lock.  Any exception escaping the method body is reported to the connection via
		/// OnSessionException rather than thrown.
		/// </remarks>
		/// <param name="dispatch">The dispatch received for this consumer.</param>
		public virtual void Dispatch(MessageDispatch dispatch)
		{
			// Snapshot the delegate so the same listener is used for the whole call.
			MessageListener listener = this.listener;
			bool dispatchMessage = false;

			try
			{
				ClearMessagesInProgress();
				ClearDispatchList();

				lock(this.unconsumedMessages.SyncRoot)
				{
					if(!this.unconsumedMessages.Closed)
					{
	                    if(this.info.Browser || !session.Connection.IsDuplicate(this, dispatch.Message)) 
						{
							if(listener != null && this.unconsumedMessages.Running)
							{
                                if (RedeliveryExceeded(dispatch)) 
                                {
                                    PosionAck(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery policy limit:" + redeliveryPolicy.MaximumRedeliveries);
                                    // Returns from inside the lock; the dispatch counter below is not touched.
                                    return;
                                } 
                                else
                                {
								    dispatchMessage = true;
                                }
							}
							else
							{
	                            if (!this.unconsumedMessages.Running) 
								{
	                                // delayed redelivery, ensure it can be re delivered
	                                session.Connection.RollbackDuplicate(this, dispatch.Message);
	                            }
								this.unconsumedMessages.Enqueue(dispatch);
							}
						}
						else 
						{
	                        // Duplicate delivery on a non-browser consumer.
	                        if (!this.session.IsTransacted) 
							{
	                            Tracer.Warn("Duplicate dispatch on connection: " + session.Connection.ConnectionId +
	                                        " to consumer: " + ConsumerId + ", ignoring (auto acking) duplicate: " + dispatch);
	                            MessageAck ack = new MessageAck(dispatch, (byte) AckType.IndividualAck, 1);
	                            session.SendAck(ack);
	                        } 
							else
							{
	                            if (Tracer.IsDebugEnabled)
								{
									Tracer.DebugFormat("Consumer[{0}]: tracking transacted redelivery of duplicate: {1}",
									                   this.info.ConsumerId, dispatch.Message);
	                            }
	                            bool needsPoisonAck = false;
	                            lock(this.dispatchedMessages)
								{
	                                if (previouslyDeliveredMessages != null) 
									{
	                                    // Mark the message as having been redelivered.
	                                    previouslyDeliveredMessages[dispatch.Message.MessageId] = true;
	                                } 
									else 
									{
	                                    // delivery while pending redelivery to another consumer on the same connection
	                                    // not waiting for redelivery will help here
	                                    needsPoisonAck = true;
	                                }
	                            }
	                            if (needsPoisonAck) 
								{
	                                MessageAck poisonAck = new MessageAck(dispatch, (byte) AckType.PoisonAck, 1);
	                                poisonAck.FirstMessageId = dispatch.Message.MessageId;
									// NOTE(review): 'cause' is populated but never read or attached to
									// poisonAck in this method - presumably it was meant to be set on the
									// ack before sending; confirm against MessageAck's members.
									BrokerError cause = new BrokerError();
									cause.ExceptionClass = "javax.jms.JMSException";
									cause.Message = "Duplicate dispatch with transacted redeliver pending on another consumer, connection: " + 
													session.Connection.ConnectionId;
	                                Tracer.Warn("Acking duplicate delivery as poison, redelivery must be pending to another" +
	                                            " consumer on this connection, failoverRedeliveryWaitPeriod=" +
	                                            failoverRedeliveryWaitPeriod + ". Message: " + dispatch + ", poisonAck: " + poisonAck);
	                                this.session.SendAck(poisonAck);
	                            } 
								else 
								{
	                                if (transactedIndividualAck) 
									{
	                                    ImmediateIndividualTransactedAck(dispatch);
	                                } 
									else 
									{
	                                    this.session.SendAck(new MessageAck(dispatch, (byte) AckType.DeliveredAck, 1));
	                                }
	                            }
	                        }
						}
					}
				}

				// Listener delivery is done outside the channel lock.
				if(dispatchMessage)
				{
					ActiveMQMessage message = CreateActiveMQMessage(dispatch);

					this.BeforeMessageIsConsumed(dispatch);

					try
					{
						bool expired = (!IgnoreExpiration && message.IsExpired());

						// Expired messages are not shown to the listener but are still
						// passed to AfterMessageIsConsumed with expired == true.
						if(!expired)
						{
							listener(message);
						}

						this.AfterMessageIsConsumed(dispatch, expired);
					}
					catch(Exception e)
					{
						if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || IsIndividualAcknowledge)
						{
                            // Schedule redelivery and possible dlq processing
                            dispatch.RollbackCause = e;
                            Rollback();
						}
						else
						{
							// Transacted or Client ack: Deliver the next message.
							this.AfterMessageIsConsumed(dispatch, false);
						}

						Tracer.Error(this.info.ConsumerId + " Exception while processing message: " + e);

						// If aborted we stop the abort here and let normal processing resume.
						// This allows the session to shutdown normally and ack all messages
						// that have outstanding acks in this consumer.
						if((Thread.CurrentThread.ThreadState & ThreadState.AbortRequested) == ThreadState.AbortRequested)
						{
							Thread.ResetAbort();
						}
					}
				}

				// Every 1000th dispatch the counter is reset and the thread sleeps for 1 ms.
				if(++dispatchedCount % 1000 == 0)
				{
					dispatchedCount = 0;
					Thread.Sleep(1);
				}
			}
			catch(Exception e)
			{
				this.session.Connection.OnSessionException(this.session, e);
			}
		}

		/// <summary>
		/// Dispatches at most one queued message to the listener.
		/// </summary>
		/// <returns>
		/// True when a message was taken from the unconsumed channel and passed to
		/// Dispatch; false when there is no listener or nothing was queued.
		/// </returns>
		public bool Iterate()
		{
			if(this.listener == null)
			{
				return false;
			}

			MessageDispatch next = this.unconsumedMessages.DequeueNoWait();
			if(next == null)
			{
				return false;
			}

			this.Dispatch(next);
			return true;
		}

		/// <summary>
		/// Used to get an enqueued message from the unconsumedMessages list. The
		/// amount of time this method blocks is based on the timeout value.  If
		/// timeout == Timeout.Infinite then it blocks until a message is received.
		/// If timeout == 0 then it tries to not block at all, it returns a
		/// message if it is available.  If timeout > 0 then it blocks up to timeout
		/// amount of time.  Expired messages will be consumed by this method, and
		/// messages that exceed the redelivery limit are poison-acked and skipped.
		/// </summary>
		/// <param name="timeout">
		/// A <see cref="System.TimeSpan"/>
		/// </param>
		/// <returns>
		/// A <see cref="MessageDispatch"/>, or null when nothing could be obtained or the
		/// dequeued dispatch carries no message.
		/// </returns>
		/// <exception cref="NMSException">
		/// Created from FailureError when no message is available and a failure has been
		/// recorded on this consumer.
		/// </exception>
		private MessageDispatch Dequeue(TimeSpan timeout)
		{
			// The deadline is tracked in UTC so that a daylight-saving or time-zone
			// change during the wait cannot lengthen or shorten the timeout.
			DateTime deadline = DateTime.UtcNow;

			if(timeout > TimeSpan.Zero)
			{
				deadline += timeout;
			}

			while(true)
			{
				MessageDispatch dispatch = this.unconsumedMessages.Dequeue(timeout);

				// Grab a single date/time for calculations to avoid timing errors.
				DateTime dispatchTime = DateTime.UtcNow;

				if(dispatch == null)
				{
					if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
					{
						if(dispatchTime > deadline)
						{
							// Out of time.
							timeout = TimeSpan.Zero;
						}
						else
						{
							// Adjust the timeout to the remaining time.
							timeout = deadline - dispatchTime;
						}
					}
					else
					{
						// Informs the caller of an error in the event that an async exception
						// took down the parent connection.
						if(this.failureError != null)
						{
							throw NMSExceptionSupport.Create(this.failureError);
						}

						return null;
					}
				}
				else if(dispatch.Message == null)
				{
					return null;
				}
				else if(!IgnoreExpiration && dispatch.Message.IsExpired())
				{
					Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId, dispatch.Message.MessageId);

					BeforeMessageIsConsumed(dispatch);
					AfterMessageIsConsumed(dispatch, true);
					// Refresh the dispatch time
					dispatchTime = DateTime.UtcNow;

					if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
					{
						if(dispatchTime > deadline)
						{
							// Out of time.
							timeout = TimeSpan.Zero;
						}
						else
						{
							// Adjust the timeout to the remaining time.
							timeout = deadline - dispatchTime;
						}
					}
				}
				else if (RedeliveryExceeded(dispatch))
				{
					Tracer.DebugFormat("[{0}] received with excessive redelivered: {1}", ConsumerId, dispatch);
					PosionAck(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery policy limit:" + redeliveryPolicy.MaximumRedeliveries);
				}
				else
				{
					return dispatch;
				}
			}
		}

		/// <summary>
		/// Bookkeeping performed before a message is handed to the application: records
		/// the broker sequence id and, unless the consumer is in auto-acknowledge-batch
		/// mode, puts the dispatch at the head of the dispatched list.  For transacted
		/// sessions the message is then acked either individually or as a deferred
		/// delivered ack.
		/// </summary>
		/// <param name="dispatch">The dispatch about to be consumed.</param>
		public virtual void BeforeMessageIsConsumed(MessageDispatch dispatch)
		{
			this.lastDeliveredSequenceId = dispatch.Message.MessageId.BrokerSequenceId;

			if(IsAutoAcknowledgeBatch)
			{
				return;
			}

			lock(this.dispatchedMessages)
			{
				this.dispatchedMessages.AddFirst(dispatch);
			}

			if(!this.session.IsTransacted)
			{
				return;
			}

			if(!this.transactedIndividualAck)
			{
				this.AckLater(dispatch, AckType.DeliveredAck);
			}
			else
			{
				ImmediateIndividualTransactedAck(dispatch);
			}
		}

		/// <summary>
		/// Decides whether an optimized acknowledgement should be sent now.
		/// </summary>
		/// <returns>
		/// True when the combined ack and delivered counters have reached 65% of the
		/// prefetch size, or when a positive time-out is configured and that many
		/// milliseconds have passed since optimizeAckTimestamp; otherwise false.
		/// </returns>
		private bool IsOptimizedAckTime()
		{
			// evaluate both expired and normal msgs as otherwise consumer may get stalled
			double countThreshold = this.info.PrefetchSize * .65;
			if (ackCounter + deliveredCounter >= countThreshold)
			{
				return true;
			}

			if (optimizeAcknowledgeTimeOut <= 0)
			{
				// Time based check is disabled.
				return false;
			}

			DateTime due = optimizeAckTimestamp + TimeSpan.FromMilliseconds(optimizeAcknowledgeTimeOut);
			return DateTime.Now >= due;
		}

		/// <summary>
		/// Acknowledgement handling performed after a dispatched message has been consumed
		/// (or found to be expired).  Nothing is done once the consumer is closed.
		/// <list type="bullet">
		/// <item>Expired messages are removed from the dispatched list and acknowledged
		/// individually with a delivered ack.</item>
		/// <item>Transacted sessions do nothing here.</item>
		/// <item>Auto-acknowledge-each consumers send one consumed ack covering every
		/// message in the dispatched list; with optimizeAcknowledge enabled the ack is only
		/// sent once <c>IsOptimizedAckTime</c> reports that it is due.</item>
		/// <item>Batch auto-acknowledge consumers defer a consumed ack.</item>
		/// <item>Client and individual acknowledge consumers defer a delivered ack, but only
		/// while the dispatch is still present in the dispatched list.</item>
		/// </list>
		/// </summary>
		/// <param name="dispatch">The dispatch that was just processed.</param>
		/// <param name="expired">true when the message was discarded because it expired.</param>
		/// <exception cref="NMSException">
		/// Thrown when the session matches none of the known acknowledgement modes.
		/// </exception>
		public virtual void AfterMessageIsConsumed(MessageDispatch dispatch, bool expired)
		{
			if(this.unconsumedMessages.Closed)
			{
				return;
			}

			if(expired)
			{
				lock(this.dispatchedMessages)
				{
					this.dispatchedMessages.Remove(dispatch);
				}

				Acknowledge(dispatch, AckType.DeliveredAck);
			}
			else
			{
				if(this.session.IsTransacted)
				{
					// Do nothing.
				}
				else if(this.IsAutoAcknowledgeEach)
				{
					if(this.deliveringAcks.CompareAndSet(false, true))
					{
						try
						{
							lock(this.dispatchedMessages)
							{
								if(this.dispatchedMessages.Count != 0)
								{
									if (this.optimizeAcknowledge)
									{
										this.ackCounter++;

										if (IsOptimizedAckTime())
										{
											MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
											if (ack != null)
											{
												this.dispatchedMessages.Clear();
												this.ackCounter = 0;
												this.session.SendAck(ack);
												this.optimizeAckTimestamp = DateTime.Now;
											}
											// As a further optimization send the ack for expired msgs when
											// there are any.  This resets the deliveredCounter to 0 so
											// that we won't send standard acks with every msg just
											// because the deliveredCounter is just below 0.5 * prefetch
											// as used in AckLater()
											if (this.pendingAck != null && this.deliveredCounter > 0)
											{
												this.session.SendAck(pendingAck);
												this.pendingAck = null;
												this.deliveredCounter = 0;
											}
										}
									}
									else
									{
										MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
										if (ack != null)
										{
											this.dispatchedMessages.Clear();
											this.session.SendAck(ack);
										}
									}
								}
							}
						}
						finally
						{
							// Always release the guard: if sending the ack throws and the flag
							// stays set, every later call would skip acknowledgement entirely.
							this.deliveringAcks.Value = false;
						}
					}
				}
				else if(this.IsAutoAcknowledgeBatch)
				{
					AckLater(dispatch, AckType.ConsumedAck);
				}
				else if(IsClientAcknowledge || IsIndividualAcknowledge)
				{
					bool stillDispatched = false;

					lock(this.dispatchedMessages)
					{
						stillDispatched = this.dispatchedMessages.Contains(dispatch);
					}

					// Only report delivery while the dispatch is still tracked locally.
					if(stillDispatched)
					{
						AckLater(dispatch, AckType.DeliveredAck);
					}
				}
				else
				{
					throw new NMSException("Invalid session state.");
				}
			}
		}

		/// <summary>
		/// Builds a single ack of the requested type that spans every message currently
		/// held in the dispatched message list.
		/// </summary>
		/// <param name="type">The ack type to stamp on the created ack.</param>
		/// <returns>
		/// The ack, created from the entry at the head of the list with its first message
		/// id taken from the entry at the tail; null when the list is empty.
		/// </returns>
		private MessageAck MakeAckForAllDeliveredMessages(AckType type)
		{
			lock(this.dispatchedMessages)
			{
				int trackedCount = this.dispatchedMessages.Count;
				if(trackedCount == 0)
				{
					return null;
				}

				MessageDispatch head = this.dispatchedMessages.First.Value;
				MessageAck rangeAck = new MessageAck(head, (byte) type, trackedCount);

				MessageDispatch tail = this.dispatchedMessages.Last.Value;
				rangeAck.FirstMessageId = tail.Message.MessageId;

				return rangeAck;
			}
		}

		/// <summary>
		/// Defers an acknowledgement: the ack is accumulated in <c>pendingAck</c> and is
		/// only sent once the number of delivered/acked messages reaches half of the
		/// prefetch size (after discounting the additional window size), so the broker can
		/// expand the prefetch window.
		/// </summary>
		/// <param name="dispatch">The dispatch the pending ack should now end at.</param>
		/// <param name="type">The type of the ack being deferred.</param>
		private void AckLater(MessageDispatch dispatch, AckType type)
		{
			if(this.session.IsTransacted)
			{
				RegisterSync();
			}

			this.deliveredCounter++;

			// Replace the pending ack with one that covers the new dispatch as well.
			MessageAck supersededAck = pendingAck;
			pendingAck = new MessageAck(dispatch, (byte) type, deliveredCounter);

			if(this.session.IsTransacted && this.session.TransactionContext.InTransaction)
			{
				pendingAck.TransactionId = this.session.TransactionContext.TransactionId;
			}

			if(supersededAck == null)
			{
				// Nothing pending before: the range starts and ends at this message.
				pendingAck.FirstMessageId = pendingAck.LastMessageId;
			}
			else if(supersededAck.AckType == pendingAck.AckType)
			{
				// Same kind of ack: extend the existing range.
				pendingAck.FirstMessageId = supersededAck.FirstMessageId;
			}
			else
			{
				// The old pending ack is superseded by an ack of another type.  Unless it
				// was a plain delivered ack it is important, so send it now so it is not lost.
				bool mustSendOld = supersededAck.AckType != (byte) AckType.DeliveredAck;

				if(Tracer.IsDebugEnabled)
				{
					string action = mustSendOld ? "Sending old pending ack " : "dropping old pending ack ";
					Tracer.Debug(action + supersededAck + ", new pending: " + pendingAck);
				}

				if(mustSendOld)
				{
					this.session.SendAck(supersededAck);
				}
			}

			// Evaluate both expired and normal msgs as otherwise the consumer may get stalled.
			if ((this.deliveredCounter + this.ackCounter - this.additionalWindowSize) >= (0.5 * this.info.PrefetchSize))
			{
				this.session.SendAck(pendingAck);
				this.pendingAck = null;
				this.deliveredCounter = 0;
				this.additionalWindowSize = 0;
			}
		}

	    /// <summary>
	    /// Synchronously sends an individual ack for one dispatch inside the session's
	    /// current transaction.  Such acks accumulate on the broker pending transaction
	    /// completion to indicate delivery status.
	    /// </summary>
	    /// <param name="dispatch">The dispatch to acknowledge.</param>
	    private void ImmediateIndividualTransactedAck(MessageDispatch dispatch)
		{
			RegisterSync();

			MessageAck individualAck = new MessageAck(dispatch, (byte) AckType.IndividualAck, 1);
			individualAck.TransactionId = session.TransactionContext.TransactionId;

			this.session.Connection.SyncRequest(individualAck);
		}

        /// <summary>
        /// Synchronously sends a poison ack for a single dispatch, attaching the supplied
        /// text as the poison cause.
        /// </summary>
        /// <param name="dispatch">The dispatch being rejected.</param>
        /// <param name="cause">Human readable reason placed in the poison cause message.</param>
        private void PosionAck(MessageDispatch dispatch, string cause)
        {
            MessageAck ack = new MessageAck(dispatch, (byte) AckType.PoisonAck, 1);
            ack.FirstMessageId = dispatch.Message.MessageId;

            BrokerError reason = new BrokerError();
            reason.ExceptionClass = "javax.jms.JMSException";
            reason.Message = cause;
            ack.PoisonCause = reason;

            this.session.Connection.SyncRequest(ack);
        }

	    /// <summary>
	    /// For transacted sessions, makes sure a transaction has been started and that a
	    /// <c>MessageConsumerSynchronization</c> for this consumer is registered with the
	    /// transaction context (at most once until the flag is reset).  Does nothing for
	    /// non-transacted sessions.
	    /// </summary>
	    private void RegisterSync()
		{
			if(!this.session.IsTransacted)
			{
				return;
			}

			this.session.DoStartTransaction();

			if(synchronizationRegistered)
			{
				return;
			}

			Tracer.DebugFormat("Consumer {0} Registering new MessageConsumerSynchronization",
							   this.info.ConsumerId);

			// Flag first so the synchronization is only added once.
			this.synchronizationRegistered = true;
			this.session.TransactionContext.AddSynchronization(new MessageConsumerSynchronization(this));
		}

	    /// <summary>
	    /// Sends an ack of the given type for exactly one dispatch and then drops that
	    /// dispatch from the dispatched message list.
	    /// </summary>
	    /// <param name="dispatch">The dispatch to acknowledge.</param>
	    /// <param name="ackType">The ack type to send.</param>
	    private void Acknowledge(MessageDispatch dispatch, AckType ackType)
		{
			this.session.SendAck(new MessageAck(dispatch, (byte) ackType, 1));

			lock(this.dispatchedMessages)
			{
				this.dispatchedMessages.Remove(dispatch);
			}
		}

		/// <summary>
		/// Acknowledges, with one consumed ack, every message currently in the dispatched
		/// message list.  The dispatch list is cleared and pending redeliveries are waited
		/// for first.  In a transacted session the ack is enlisted in the transaction
		/// (starting one if necessary) and the dispatched list is kept; otherwise the list
		/// is emptied after the ack is sent.
		/// </summary>
		internal void Acknowledge()
		{
			ClearDispatchList();
			WaitForRedeliveries();

			lock(this.dispatchedMessages)
			{
				// Acknowledge all messages so far.
				MessageAck consumedAck = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
				if(consumedAck == null)
				{
					return; // no msgs
				}

				if(this.session.IsTransacted)
				{
					RollbackOnFailedRecoveryRedelivery();

					if (!this.session.TransactionContext.InTransaction)
					{
						this.session.DoStartTransaction();
					}

					consumedAck.TransactionId = this.session.TransactionContext.TransactionId;
				}

				this.session.SendAck(consumedAck);
				this.pendingAck = null;

				// Adjust the counters, never letting them drop below zero.
				int ackedCount = this.dispatchedMessages.Count;
				this.deliveredCounter = Math.Max(0, this.deliveredCounter - ackedCount);
				this.additionalWindowSize = Math.Max(0, this.additionalWindowSize - ackedCount);

				if(this.session.IsTransacted)
				{
					return;
				}

				this.dispatchedMessages.Clear();
			}
		}

		/// <summary>
		/// Resets consumer state after a commit: the dispatched message list and the
		/// previously-delivered tracking are cleared and the redelivery delay is set back
		/// to zero.
		/// </summary>
		internal void Commit()
		{
			lock(dispatchedMessages)
			{
				dispatchedMessages.Clear();
				ClearPreviouslyDelivered();
			}

			redeliveryDelay = 0;
		}

		/// <summary>
		/// Rolls back the messages held in the dispatched message list.
		/// <para>
		/// Every dispatched message is told about the rollback and removed from the
		/// connection's duplicate tracking.  If the redelivery policy has a maximum and the
		/// most recently dispatched message has exceeded it, all dispatched messages are
		/// poison-acked; the poison cause carries the exceeded-limit message and, when the
		/// dispatch recorded a rollback cause, that cause's message as a nested cause.
		/// Otherwise the messages are scheduled for redelivery, either through the session
		/// scheduler (non-blocking redelivery) or by re-enqueueing them at the front of the
		/// unconsumed channel and restarting the consumer, after the redelivery delay if
		/// there is one.
		/// </para>
		/// When nothing is dispatched the method returns early without redispatching.
		/// </summary>
		internal void Rollback()
		{
			lock(this.unconsumedMessages.SyncRoot)
			{
				if (this.optimizeAcknowledge)
				{
					// remove messages read but not acked at the broker yet through optimizeAcknowledge
					if (!this.info.Browser)
					{
						lock(this.dispatchedMessages)
						{
							// NOTE(review): Count shrinks while i grows, so this removes fewer
							// than ackCounter entries when the list is short - confirm intended.
							for (int i = 0; (i < this.dispatchedMessages.Count) && (i < ackCounter); i++)
							{
								// ensure we don't filter this as a duplicate
								MessageDispatch dispatch = this.dispatchedMessages.Last.Value;
								this.dispatchedMessages.RemoveLast();
								session.Connection.RollbackDuplicate(this, dispatch.Message);
							}
						}
					}
				}
				lock(this.dispatchedMessages)
				{
					RollbackPreviouslyDeliveredAndNotRedelivered();
					if(this.dispatchedMessages.Count == 0)
					{
						Tracer.DebugFormat("Consumer {0} Rolled Back, no dispatched Messages",
										   this.info.ConsumerId);
						return;
					}

					// Only increase the redelivery delay after the first redelivery..
					MessageDispatch lastMd = this.dispatchedMessages.First.Value;
					int currentRedeliveryCount = lastMd.Message.RedeliveryCounter;

					redeliveryDelay = this.redeliveryPolicy.RedeliveryDelay(currentRedeliveryCount);

					MessageId firstMsgId = this.dispatchedMessages.Last.Value.Message.MessageId;

					foreach(MessageDispatch dispatch in this.dispatchedMessages)
					{
						// Allow the message to update its internal to reflect a Rollback.
						dispatch.Message.OnMessageRollback();
						// ensure we don't filter this as a duplicate
						session.Connection.RollbackDuplicate(this, dispatch.Message);
					}

					if(this.redeliveryPolicy.MaximumRedeliveries >= 0 &&
					   lastMd.Message.RedeliveryCounter > this.redeliveryPolicy.MaximumRedeliveries)
					{
						// We need to NACK the messages so that they get sent to the DLQ.
						MessageAck ack = new MessageAck(lastMd, (byte) AckType.PoisonAck, dispatchedMessages.Count);

						if(Tracer.IsDebugEnabled)
						{
							Tracer.DebugFormat("Consumer {0} Poison Ack of {1} messages aft max redeliveries: {2}",
											   this.info.ConsumerId, this.dispatchedMessages.Count, this.redeliveryPolicy.MaximumRedeliveries);
						}

						BrokerError poisonCause = new BrokerError();
						poisonCause.ExceptionClass = "javax.jms.JMSException";
						poisonCause.Message = "Exceeded RedeliveryPolicy limit: " + RedeliveryPolicy.MaximumRedeliveries;

						if (lastMd.RollbackCause != null)
						{
							// Attach the rollback reason as the nested cause.  The values must
							// go on the nested error; writing them to poisonCause would discard
							// the exceeded-limit message and leave an empty cause attached.
							BrokerError cause = new BrokerError();
							cause.ExceptionClass = "javax.jms.JMSException";
							cause.Message = lastMd.RollbackCause.Message;
							poisonCause.Cause = cause;
						}
						ack.FirstMessageId = firstMsgId;
						ack.PoisonCause = poisonCause;

						this.session.SendAck(ack);

						// Adjust the window size.
						additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count);

						this.redeliveryDelay = 0;
						this.deliveredCounter -= this.dispatchedMessages.Count;
						this.dispatchedMessages.Clear();
					}
					else
					{
						// We only send a RedeliveryAck after the first redelivery
						if(currentRedeliveryCount > 0)
						{
							MessageAck ack = new MessageAck(lastMd, (byte) AckType.RedeliveredAck, dispatchedMessages.Count);
							ack.FirstMessageId = firstMsgId;
							this.session.SendAck(ack);
						}

						if (this.nonBlockingRedelivery)
						{
							if(redeliveryDelay == 0)
							{
								redeliveryDelay = RedeliveryPolicy.InitialRedeliveryDelay;
							}

							if(Tracer.IsDebugEnabled)
							{
								Tracer.DebugFormat("Consumer {0} Rolled Back, Re-enque {1} messages in Non-Blocking mode, delay: {2}",
												   this.info.ConsumerId, this.dispatchedMessages.Count, redeliveryDelay);
							}

							// The list is newest-first; reverse the copy so the callback
							// dispatches the messages oldest-first.
							List<MessageDispatch> pendingRedeliveries =
								new List<MessageDispatch>(this.dispatchedMessages);
							pendingRedeliveries.Reverse();

							this.deliveredCounter -= this.dispatchedMessages.Count;
							this.dispatchedMessages.Clear();

							this.session.Scheduler.ExecuteAfterDelay(
								NonBlockingRedeliveryCallback,
								pendingRedeliveries,
								TimeSpan.FromMilliseconds(redeliveryDelay));
						}
						else
						{
							// stop the delivery of messages.
							this.unconsumedMessages.Stop();

							if(Tracer.IsDebugEnabled)
							{
								Tracer.DebugFormat("Consumer {0} Rolled Back, Re-enque {1} messages",
												   this.info.ConsumerId, this.dispatchedMessages.Count);
							}

							foreach(MessageDispatch dispatch in this.dispatchedMessages)
							{
								this.unconsumedMessages.EnqueueFirst(dispatch);
							}

							this.deliveredCounter -= this.dispatchedMessages.Count;
							this.dispatchedMessages.Clear();

							if(redeliveryDelay > 0 && !this.unconsumedMessages.Closed)
							{
								// Restart delivery from a pool thread once the delay has passed.
								DateTime deadline = DateTime.Now.AddMilliseconds(redeliveryDelay);
								ThreadPool.QueueUserWorkItem(this.RollbackHelper, deadline);
							}
							else
							{
								Start();
							}
						}
					}
				}
			}

			// Only redispatch if there's an async listener otherwise a synchronous
			// consumer will pull them from the local queue.
			if(this.listener != null)
			{
				this.session.Redispatch(this.unconsumedMessages);
			}
		}

		/// <summary>
		/// Work item used after a rollback: sleeps until the supplied deadline (if it is
		/// still in the future) and then restarts the consumer.  Failures are reported to
		/// the connection as session exceptions unless the consumer is already closed.
		/// </summary>
		/// <param name="arg">A boxed <see cref="DateTime"/> at which delivery may resume.</param>
		private void RollbackHelper(Object arg)
		{
			try
			{
				DateTime resumeAt = (DateTime) arg;
				TimeSpan remaining = resumeAt - DateTime.Now;

				if(remaining > TimeSpan.Zero)
				{
					Thread.Sleep(remaining);
				}

				this.Start();
			}
			catch(Exception e)
			{
				if(this.unconsumedMessages.Closed)
				{
					return;
				}

				this.session.Connection.OnSessionException(this.session, e);
			}
		}

        /// <summary>
        /// Scheduler callback for non-blocking redelivery: hands every dispatch in the
        /// supplied list back to the session, unless the consumer has been closed in the
        /// meantime.  Any exception is forwarded to the connection as an async exception.
        /// </summary>
        /// <param name="arg">The list of dispatches to redeliver.</param>
        private void NonBlockingRedeliveryCallback(object arg)
        {
            try
            {
                if (this.unconsumedMessages.Closed)
                {
                    return;
                }

                List<MessageDispatch> redeliveries = arg as List<MessageDispatch>;

                foreach (MessageDispatch redelivery in redeliveries)
                {
                    session.Dispatch(redelivery);
                }
            }
            catch (Exception e)
            {
                session.Connection.OnAsyncException(e);
            }
        }

		/// <summary>
		/// Produces the message object handed to the application for a dispatch.  The
		/// dispatched message is cloned, optionally passed through the consumer
		/// transformer, bound to the session's connection, and given the acknowledge
		/// handler matching the session's acknowledgement mode.
		/// </summary>
		/// <param name="dispatch">The dispatch whose message is to be delivered.</param>
		/// <returns>The prepared message.</returns>
		private ActiveMQMessage CreateActiveMQMessage(MessageDispatch dispatch)
		{
			ActiveMQMessage message = dispatch.Message.Clone() as ActiveMQMessage;

			if(this.ConsumerTransformer != null)
			{
				// A transformer returning null leaves the cloned message untouched.
				IMessage transformed = ConsumerTransformer(this.session, this, message);
				if(transformed != null)
				{
					message = this.messageTransformation.TransformMessage<ActiveMQMessage>(transformed);
				}
			}

			message.Connection = this.session.Connection;

			AcknowledgeHandler handler;
			if(IsClientAcknowledge)
			{
				handler = new AcknowledgeHandler(DoClientAcknowledge);
			}
			else if(IsIndividualAcknowledge)
			{
				handler = new AcknowledgeHandler(DoIndividualAcknowledge);
			}
			else
			{
				handler = new AcknowledgeHandler(DoNothingAcknowledge);
			}

			message.Acknowledger += handler;

			return message;
		}

		/// <summary>
		/// Guard that rejects use of a closed consumer.
		/// </summary>
		/// <exception cref="NMSException">Thrown when the unconsumed channel is closed.</exception>
		private void CheckClosed()
		{
			bool consumerClosed = this.unconsumedMessages.Closed;

			if(consumerClosed)
			{
				throw new NMSException("The Consumer has been Closed");
			}
		}

		/// <summary>
		/// Guard that fails when an asynchronous listener is registered on this consumer.
		/// </summary>
		/// <exception cref="NMSException">Thrown when a listener is currently set.</exception>
		private void CheckMessageListener()
		{
			bool listenerRegistered = this.listener != null;

			if(listenerRegistered)
			{
				throw new NMSException("Cannot set Async listeners on Consumers with a prefetch limit of zero");
			}
		}

		/// <summary>
		/// true when the session is auto-acknowledge, or is dups-ok-acknowledge and the
		/// consumer's destination is a queue.
		/// </summary>
		protected bool IsAutoAcknowledgeEach
		{
			get
			{
				if(this.session.IsAutoAcknowledge)
				{
					return true;
				}

				return this.session.IsDupsOkAcknowledge && this.info.Destination.IsQueue;
			}
		}

	    /// <summary>
	    /// true when the session is dups-ok-acknowledge and the consumer's destination is
	    /// not a queue.
	    /// </summary>
	    protected bool IsAutoAcknowledgeBatch
		{
			get
			{
				if(!this.session.IsDupsOkAcknowledge)
				{
					return false;
				}

				return !this.info.Destination.IsQueue;
			}
		}

        /// <summary>
        /// Mirrors the owning session's individual-acknowledge flag.
        /// </summary>
        protected bool IsIndividualAcknowledge
        {
            get
            {
                return session.IsIndividualAcknowledge;
            }
        }

        /// <summary>
        /// Mirrors the owning session's client-acknowledge flag.
        /// </summary>
        protected bool IsClientAcknowledge
        {
            get
            {
                return session.IsClientAcknowledge;
            }
        }

        /// <summary>
        /// Tells whether this consumer is attached to the given temporary destination.
        /// </summary>
        /// <param name="dest">The temporary destination to compare against.</param>
        /// <returns>true when the consumer's destination equals <paramref name="dest"/>.</returns>
        internal bool IsInUse(ActiveMQTempDestination dest)
        {
            bool sameDestination = this.info.Destination.Equals(dest);
            return sameDestination;
        }

	    /// <summary>
	    /// true once the unconsumed message channel of this consumer has been closed.
	    /// </summary>
	    internal bool Closed
		{
			get
			{
				return unconsumedMessages.Closed;
			}
		}

	    /// <summary>
	    /// Callback that delivers outstanding acks, provided optimizeAcknowledge is
	    /// enabled and the consumer is still open.
	    /// </summary>
	    /// <param name="state">Callback state; not used.</param>
	    private void DoOptimizedAck(object state)
		{
			if (!this.optimizeAcknowledge || this.unconsumedMessages.Closed)
			{
				return;
			}

			DeliverAcks();
		}
	    
	    /// <summary>
	    /// When a failover redelivery wait period is configured and previously delivered
	    /// messages are being tracked, blocks until every tracked message has been
	    /// replayed or the wait period has elapsed.  Between checks the thread sleeps for
	    /// a quarter of the wait period, but never less than 500 milliseconds.
	    /// </summary>
	    private void WaitForRedeliveries()
		{
			if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != null)
			{
				DateTime expiry = DateTime.Now + TimeSpan.FromMilliseconds(failoverRedeliveryWaitPeriod);
				int numberNotReplayed;
				do
				{
					numberNotReplayed = 0;
					TransactionId pendingTransactionId = null;

					lock(this.dispatchedMessages)
					{
						if (previouslyDeliveredMessages != null)
						{
							// Capture the id while the map is known to exist; it may be
							// cleared by another thread once the lock is released.
							pendingTransactionId = previouslyDeliveredMessages.TransactionId;

							foreach(KeyValuePair<MessageId, bool> entry in previouslyDeliveredMessages)
							{
								if (!entry.Value)
								{
									numberNotReplayed++;
								}
							}
						}
					}

					if (numberNotReplayed > 0)
					{
						Tracer.Info("waiting for redelivery of " + numberNotReplayed + " in transaction: " +
									pendingTransactionId +  ", to consumer :" +
									this.info.ConsumerId);
						Thread.Sleep((int) Math.Max(500, failoverRedeliveryWaitPeriod/4));
					}
				}
				while (numberNotReplayed > 0 && expiry > DateTime.Now);
			}
		}

	    // called with dispatchedMessages locked
	    /// <summary>
	    /// Verifies that every previously delivered message being tracked has been
	    /// replayed to this consumer.  If any has not, the transaction is invalid because
	    /// those messages have been dispatched elsewhere, and it must roll back.
	    /// Does nothing when no previously delivered messages are tracked.
	    /// </summary>
	    /// <exception cref="TransactionRolledBackException">
	    /// Thrown when at least one tracked message was not replayed.
	    /// </exception>
	    private void RollbackOnFailedRecoveryRedelivery()
		{
			if (previouslyDeliveredMessages != null)
			{
				// if any previously delivered messages was not re-delivered, transaction is
				// invalid and must rollback as messages have been dispatched else where.
				int numberNotReplayed = 0;
				foreach(KeyValuePair<MessageId, bool> entry in previouslyDeliveredMessages)
				{
					if (!entry.Value)
					{
						numberNotReplayed++;
						if (Tracer.IsDebugEnabled)
						{
							// Pass the ids as format arguments so that any braces in
							// their text are not parsed as format placeholders.
							Tracer.DebugFormat("previously delivered message has not been replayed in transaction: {0} , messageId: {1}",
								previouslyDeliveredMessages.TransactionId, entry.Key);
						}
					}
				}

				if (numberNotReplayed > 0)
				{
					String message = "rolling back transaction (" +
						 previouslyDeliveredMessages.TransactionId + ") post failover recovery. " + numberNotReplayed +
						 " previously delivered message(s) not replayed to consumer: " + this.info.ConsumerId;
					Tracer.Warn(message);
					throw new TransactionRolledBackException(message);
				}
			}
		}

	     // called with unconsumedMessages && dispatchedMessages locked
	     // remove any message not re-delivered as they can't be replayed to this
	     // consumer on rollback
	    /// <summary>
	    /// Drops from the dispatched list every tracked message that was previously
	    /// delivered but has not been redelivered, then discards the tracking map.
	    /// Does nothing when no tracking map exists.
	    /// </summary>
	    private void RollbackPreviouslyDeliveredAndNotRedelivered()
		{
			if (previouslyDeliveredMessages == null)
			{
				return;
			}

			foreach(KeyValuePair<MessageId, bool> tracked in previouslyDeliveredMessages)
			{
				bool redelivered = tracked.Value;
				if (redelivered)
				{
					continue;
				}

				RemoveFromDeliveredMessages(tracked.Key);
			}

			ClearPreviouslyDelivered();
		}

		// Must be called with dispatchedMessages locked
	    /// <summary>
	    /// Finds the first dispatched message with the given id, removes it from the
	    /// connection's duplicate tracking and from the dispatched list.  Does nothing
	    /// when no dispatched message carries that id.
	    /// </summary>
	    /// <param name="key">Id of the message to remove.</param>
	    private void RemoveFromDeliveredMessages(MessageId key)
		{
			LinkedListNode<MessageDispatch> node = this.dispatchedMessages.First;

			while (node != null)
			{
				MessageDispatch candidate = node.Value;

				if (candidate.Message.MessageId.Equals(key))
				{
					session.Connection.RollbackDuplicate(this, candidate.Message);
					this.dispatchedMessages.Remove(candidate);
					return;
				}

				node = node.Next;
			}
		}

	    // called with dispatchedMessages locked
	    /// <summary>
	    /// Empties and releases the previously-delivered tracking map, if there is one.
	    /// </summary>
	    private void ClearPreviouslyDelivered()
		{
			if (previouslyDeliveredMessages == null)
			{
				return;
			}

			previouslyDeliveredMessages.Clear();
			previouslyDeliveredMessages = null;
		}

		#region Transaction Redelivery Tracker

		/// <summary>
		/// Map from message id to a flag, tagged with the id of the transaction it was
		/// created for.  Code in this file treats a false flag as "not yet replayed".
		/// </summary>
		class PreviouslyDeliveredMap : Dictionary<MessageId, bool>
		{
			private TransactionId owningTransactionId;

			/// <summary>Creates an empty map tagged with the given transaction id.</summary>
			public PreviouslyDeliveredMap(TransactionId transactionId)
			{
				this.owningTransactionId = transactionId;
			}

			/// <summary>The transaction id supplied at construction.</summary>
			public TransactionId TransactionId
			{
				get
				{
					return this.owningTransactionId;
				}
			}
		}

        /// <summary>
        /// Tells whether a dispatch has been redelivered more often than the redelivery
        /// policy allows.  This is only ever true for transacted sessions with a policy
        /// whose maximum is not unlimited, and never for messages carrying a
        /// "redeliveryDelay" property (a higher counter is expected after a resend via the
        /// broker redelivery plugin).
        /// </summary>
        /// <param name="dispatch">The dispatch to examine.</param>
        /// <returns>
        /// true when the redelivery limit is exceeded; false otherwise, including when the
        /// check itself fails with an exception.
        /// </returns>
        private bool RedeliveryExceeded(MessageDispatch dispatch)
        {
            try
            {
                ActiveMQMessage amqMessage = dispatch.Message as ActiveMQMessage;

                // Looked up once and shared by the trace output and the decision below.
                bool fromRedeliveryPlugin = amqMessage.Properties.Contains("redeliveryDelay");

                if (Tracer.IsDebugEnabled)
                {
                    Tracer.Debug("Checking if Redelivery count is exceeded.");
                    Tracer.DebugFormat("Current policy = {0}", RedeliveryPolicy.MaximumRedeliveries);
                    Tracer.DebugFormat("Message Redelivery Count = {0}", dispatch.RedeliveryCounter);
                    Tracer.DebugFormat("Is Transacted? {0}", session.IsTransacted);
                    Tracer.DebugFormat("Is Message from redelivery plugin? {0}", fromRedeliveryPlugin);
                }

                bool result = session.IsTransacted && redeliveryPolicy != null &&
                       redeliveryPolicy.MaximumRedeliveries != NO_MAXIMUM_REDELIVERIES &&
                       dispatch.RedeliveryCounter > redeliveryPolicy.MaximumRedeliveries &&
                       // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
                       !fromRedeliveryPlugin;

                Tracer.DebugFormat("Exceeded Redelivery Max? {0}", result);
                return result;
            }
            catch (Exception e)
            {
                // Best effort: a failed check must not block delivery, but leave a trace
                // instead of hiding the failure completely.
                Tracer.DebugFormat("[{0}] redelivery check failed, treating as not exceeded: {1}", ConsumerId, e.Message);
                return false;
            }
        }

		#endregion

		#region Nested ISynchronization Types

		/// <summary>
		/// Transaction synchronization that forwards the end, commit and rollback
		/// notifications to the consumer it was created for, and resets that consumer's
		/// registration flag after each notification.
		/// </summary>
		class MessageConsumerSynchronization : ISynchronization
		{
			private readonly MessageConsumer owner;

			public MessageConsumerSynchronization(MessageConsumer consumer)
			{
				this.owner = consumer;
			}

			/// <summary>
			/// Called before the transaction ends.  With transacted individual acks the
			/// dispatch list is cleared, redeliveries are awaited and the failover replay
			/// check is run; otherwise all dispatched messages are acknowledged.
			/// </summary>
			public void BeforeEnd()
			{
				Tracer.DebugFormat("MessageConsumerSynchronization - BeforeEnd Called for Consumer {0}.",
								   this.owner.ConsumerId);

				if (!this.owner.TransactedIndividualAck)
				{
					this.owner.Acknowledge();
				}
				else
				{
					this.owner.ClearDispatchList();
					this.owner.WaitForRedeliveries();

					lock(this.owner.dispatchedMessages)
					{
						this.owner.RollbackOnFailedRecoveryRedelivery();
					}
				}

				this.owner.synchronizationRegistered = false;
			}

			/// <summary>Called after a commit; lets the consumer reset its state.</summary>
			public void AfterCommit()
			{
				Tracer.DebugFormat("MessageConsumerSynchronization - AfterCommit Called for Consumer {0}.",
								   this.owner.ConsumerId);

				this.owner.Commit();
				this.owner.synchronizationRegistered = false;
			}

			/// <summary>Called after a rollback; lets the consumer roll its messages back.</summary>
			public void AfterRollback()
			{
				Tracer.DebugFormat("MessageConsumerSynchronization - AfterRollback Called for Consumer {0}.",
								   this.owner.ConsumerId);

				this.owner.Rollback();
				this.owner.synchronizationRegistered = false;
			}
		}

		/// <summary>
		/// Transaction synchronization that closes its consumer once the transaction has
		/// completed, whether by commit or by rollback, unless the consumer is already
		/// closed by then.
		/// </summary>
		protected class ConsumerCloseSynchronization : ISynchronization
		{
			private readonly MessageConsumer target;

			public ConsumerCloseSynchronization(MessageConsumer consumer)
			{
				this.target = consumer;
			}

			/// <summary>Nothing to do before the transaction ends.</summary>
			public void BeforeEnd()
			{
			}

			/// <summary>Closes the consumer after a commit if it is still open.</summary>
			public void AfterCommit()
			{
				if (this.target.Closed)
				{
					return;
				}

				Tracer.DebugFormat("ConsumerCloseSynchronization - AfterCommit Called for Consumer {0}.",
								   this.target.ConsumerId);
				this.target.DoClose();
			}

			/// <summary>Closes the consumer after a rollback if it is still open.</summary>
			public void AfterRollback()
			{
				if (this.target.Closed)
				{
					return;
				}

				Tracer.DebugFormat("ConsumerCloseSynchronization - AfterRollback Called for Consumer {0}.",
								   this.target.ConsumerId);
				this.target.DoClose();
			}
		}

		#endregion
	}
}
