| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| using System; |
| using System.Threading; |
| using System.Collections; |
| using System.Collections.Generic; |
| using Apache.NMS.Stomp.Commands; |
| using Apache.NMS.Stomp.Util; |
| using Apache.NMS; |
| using Apache.NMS.Util; |
| |
| namespace Apache.NMS.Stomp |
| { |
/// <summary>
/// Kinds of acknowledgement a consumer can issue. The numeric value is what
/// gets stored (after a cast to <c>byte</c>) in <c>MessageAck.AckType</c>.
/// </summary>
public enum AckType
{
    /// <summary>The message was consumed and can be discarded.</summary>
    ConsumedAck = 1,

    /// <summary>Only the given message is to be treated as consumed.</summary>
    IndividualAck = 2
}
| |
| /// <summary> |
| /// An object capable of receiving messages from some destination |
| /// </summary> |
| public class MessageConsumer : IMessageConsumer, IDispatcher |
| { |
// Dispatches queued for this consumer that have not been handed to the application yet.
private readonly MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();

// Dispatches already handed out but still tracked for ack / commit / rollback (newest first).
private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();

// Subscription details (consumer id, prefetch size) supplied at construction.
private readonly ConsumerInfo info;

// Owning session; set to null once the consumer has been closed.
private Session session;

// Ack built up by AckLater that has not been sent to the broker yet.
private MessageAck pendingAck;

// Set by Start() and cleared by Stop().
private Atomic<bool> started = new Atomic<bool>();

// Guard flag taken with CompareAndSet while an ack is being sent.
private Atomic<bool> deliveringAcks = new Atomic<bool>();

protected bool disposed;

// Number of messages covered by the current pending ack.
private int deliveredCounter;
private int additionalWindowSize;

// Delay in milliseconds computed by the redelivery policy on the last rollback.
private long redeliveryDelay;

// Running count of Dispatch calls, used to yield briefly every 1000 dispatches.
private int dispatchedCount;

// True while a MessageConsumerSynchronization is registered with the transaction.
private volatile bool synchronizationRegistered;

// Request flag raised by ClearMessagesInProgress and honoured by the next Dispatch.
private bool clearDispatchList;

private event MessageListener listener;

private IRedeliveryPolicy redeliveryPolicy;
| |
/// <summary>
/// Creates a consumer owned by <paramref name="session"/> and described by
/// <paramref name="info"/>. The redelivery policy is taken from the
/// session's connection. Internal so that clients cannot create instances
/// themselves.
/// </summary>
internal MessageConsumer(Session session, ConsumerInfo info)
{
    this.info = info;
    this.session = session;
    this.redeliveryPolicy = session.Connection.RedeliveryPolicy;
}
| |
/// <summary>
/// Finalizer: runs the non-disposing half of the dispose pattern.
/// </summary>
~MessageConsumer()
{
    this.Dispose(false);
}
| |
| #region Property Accessors |
| |
/// <summary>Identifier of this consumer, as recorded in its ConsumerInfo.</summary>
public ConsumerId ConsumerId
{
    get
    {
        return this.info.ConsumerId;
    }
}

/// <summary>Prefetch size recorded in this consumer's ConsumerInfo.</summary>
public int PrefetchSize
{
    get
    {
        return info.PrefetchSize;
    }
}

/// <summary>
/// Policy consulted on rollback to compute the redelivery delay and the
/// maximum number of redeliveries.
/// </summary>
public IRedeliveryPolicy RedeliveryPolicy
{
    get
    {
        return redeliveryPolicy;
    }
    set
    {
        redeliveryPolicy = value;
    }
}
| |
| #endregion |
| |
| #region IMessageConsumer Members |
| |
/// <summary>
/// Asynchronous message listener. Subscribing pauses a running session,
/// attaches the handler, asks the session to redispatch anything already
/// queued locally, and then resumes the session if it had been running.
/// </summary>
/// <exception cref="NMSException">
/// On subscribe, when the consumer is closed or its prefetch size is zero.
/// </exception>
public event MessageListener Listener
{
    add
    {
        CheckClosed();

        if(this.info.PrefetchSize == 0)
        {
            throw new NMSException("Cannot set Asynchronous Listener on a Consumer with a zero Prefetch size");
        }

        // Remember whether the session has to be resumed afterwards.
        bool resumeSession = this.session.Started;

        if(resumeSession)
        {
            this.session.Stop();
        }

        this.listener += value;
        this.session.Redispatch(this.unconsumedMessages);

        if(resumeSession)
        {
            this.session.Start();
        }
    }
    remove
    {
        this.listener -= value;
    }
}
| |
/// <summary>
/// Synchronously receives the next message, passing a timeout of -1 ms to
/// the internal dequeue (documented there as blocking until a message
/// arrives).
/// </summary>
/// <returns>The received message, or null when the dequeue yields nothing.</returns>
/// <exception cref="NMSException">
/// When the consumer is closed or an asynchronous listener is registered.
/// </exception>
public IMessage Receive()
{
    CheckClosed();
    CheckMessageListener();

    MessageDispatch received = this.Dequeue(TimeSpan.FromMilliseconds(-1));

    if(received != null)
    {
        BeforeMessageIsConsumed(received);
        AfterMessageIsConsumed(received, false);

        return CreateStompMessage(received);
    }

    return null;
}
| |
/// <summary>
/// Synchronously receives the next message, waiting up to
/// <paramref name="timeout"/> for one to become available.
/// </summary>
/// <param name="timeout">Time budget handed to the internal dequeue.</param>
/// <returns>The received message, or null when the dequeue yields nothing.</returns>
/// <exception cref="NMSException">
/// When the consumer is closed or an asynchronous listener is registered.
/// </exception>
public IMessage Receive(TimeSpan timeout)
{
    CheckClosed();
    CheckMessageListener();

    MessageDispatch received = this.Dequeue(timeout);

    if(received != null)
    {
        BeforeMessageIsConsumed(received);
        AfterMessageIsConsumed(received, false);

        return CreateStompMessage(received);
    }

    return null;
}
| |
/// <summary>
/// Receives a message only if one is available right now (zero timeout).
/// </summary>
/// <returns>The received message, or null when none is available.</returns>
/// <exception cref="NMSException">
/// When the consumer is closed or an asynchronous listener is registered.
/// </exception>
public IMessage ReceiveNoWait()
{
    CheckClosed();
    CheckMessageListener();

    MessageDispatch received = this.Dequeue(TimeSpan.Zero);

    if(received != null)
    {
        BeforeMessageIsConsumed(received);
        AfterMessageIsConsumed(received, false);

        return CreateStompMessage(received);
    }

    return null;
}
| |
/// <summary>
/// Disposes the consumer and tells the GC that the finalizer no longer
/// needs to run.
/// </summary>
public void Dispose()
{
    this.Dispose(true);
    GC.SuppressFinalize(this);
}
| |
/// <summary>
/// Shared dispose logic: closes the consumer once, swallowing any error
/// raised by the close, and marks the instance as disposed.
/// </summary>
/// <param name="disposing">
/// True when called from <see cref="Dispose()"/>, false from the finalizer.
/// There is currently no managed-only cleanup, so the value is not used.
/// </param>
protected void Dispose(bool disposing)
{
    if(!this.disposed)
    {
        try
        {
            this.Close();
        }
        catch
        {
            // Ignore network errors.
        }

        this.disposed = true;
    }
}
| |
/// <summary>
/// Closes the consumer. Inside an active transaction the real close is
/// deferred by registering a synchronization that performs it after
/// commit or rollback; otherwise it happens immediately.
/// </summary>
public void Close()
{
    if(this.unconsumedMessages.Closed)
    {
        return;
    }

    bool deferUntilTransactionEnds =
        this.session.IsTransacted && this.session.TransactionContext.InTransaction;

    if(deferUntilTransactionEnds)
    {
        this.session.TransactionContext.AddSynchronization(new ConsumerCloseSynchronization(this));
    }
    else
    {
        this.DoClose();
    }
}
| |
/// <summary>
/// Performs the actual close: drops tracked dispatches (non-transacted
/// sessions only), closes the local channel, unregisters from the session,
/// sends a RemoveInfo for this consumer, and releases the session reference.
/// Does nothing when the channel is already closed.
/// </summary>
internal void DoClose()
{
    if(this.unconsumedMessages.Closed)
    {
        return;
    }

    Tracer.Debug("Closing down the Consumer");

    if(!this.session.IsTransacted)
    {
        lock(this.dispatchedMessages)
        {
            this.dispatchedMessages.Clear();
        }
    }

    this.unconsumedMessages.Close();
    this.session.DisposeOf(this.info.ConsumerId);

    RemoveInfo removal = new RemoveInfo();
    removal.ObjectId = this.info.ConsumerId;
    this.session.Connection.Oneway(removal);

    // From here on the consumer no longer holds its session.
    this.session = null;

    Tracer.Debug("Consumer instnace Closed.");
}
| |
| #endregion |
| |
/// <summary>
/// Acknowledge handler for individual-ack sessions: finds the tracked
/// dispatch whose message id matches <paramref name="message"/>, removes
/// it from the dispatched list and sends an individual ack for it.
/// When no matching dispatch is tracked, only a debug line is logged.
/// </summary>
/// <param name="message">The message being acknowledged.</param>
protected void DoIndividualAcknowledge(Message message)
{
    MessageDispatch matched = null;

    lock(this.dispatchedMessages)
    {
        foreach(MessageDispatch candidate in this.dispatchedMessages)
        {
            if(!candidate.Message.MessageId.Equals(message.MessageId))
            {
                continue;
            }

            matched = candidate;
            // Safe despite the enumeration: the loop is left immediately.
            this.dispatchedMessages.Remove(candidate);
            break;
        }
    }

    if(matched == null)
    {
        Tracer.DebugFormat("Attempt to Ack MessageId[{0}] failed because the original dispatch is not in the Dispatch List", message.MessageId);
        return;
    }

    MessageAck individualAck = new MessageAck();

    individualAck.AckType = (byte) AckType.IndividualAck;
    individualAck.ConsumerId = this.info.ConsumerId;
    individualAck.Destination = matched.Destination;
    individualAck.LastMessageId = matched.Message.MessageId;
    individualAck.MessageCount = 1;

    Tracer.Debug("Sending Individual Ack for MessageId: " + individualAck.LastMessageId.ToString());
    this.session.SendAck(individualAck);
}
| |
/// <summary>
/// Acknowledge handler attached when the session is neither client-ack nor
/// individual-ack; intentionally does nothing.
/// </summary>
/// <param name="message">The message being acknowledged (ignored).</param>
protected void DoNothingAcknowledge(Message message)
{
    // Intentionally empty.
}
| |
/// <summary>
/// Acknowledge handler for client-ack sessions: delegates to the session's
/// Acknowledge after verifying the consumer is still open.
/// </summary>
/// <param name="message">The message being acknowledged (not inspected here).</param>
/// <exception cref="NMSException">When the consumer has been closed.</exception>
protected void DoClientAcknowledge(Message message)
{
    CheckClosed();

    Tracer.Debug("Sending Client Ack:");
    session.Acknowledge();
}
| |
/// <summary>
/// Starts delivery: marks the consumer started, starts the local channel
/// and wakes the session executor. Has no effect on a closed consumer.
/// </summary>
public void Start()
{
    if(!this.unconsumedMessages.Closed)
    {
        this.started.Value = true;
        this.unconsumedMessages.Start();
        this.session.Executor.Wakeup();
    }
}
| |
/// <summary>
/// Stops delivery: clears the started flag and stops the local channel.
/// </summary>
public void Stop()
{
    started.Value = false;
    unconsumedMessages.Stop();
}
| |
/// <summary>
/// Requests that locally queued messages be discarded. Only a flag is
/// raised here; the next call to Dispatch performs the actual clearing.
/// </summary>
public void ClearMessagesInProgress()
{
    // This is invoked from the transport reconnection logic, which walks
    // every consumer of the connection. Taking a mutex here could block on
    // one already held by a message listener that is in the middle of a
    // send, so the request is merely recorded and the dispatch thread
    // flushes the list when it next runs.
    clearDispatchList = true;
}
| |
/// <summary>
/// Sends the pending "consumed" ack, if there is one, to the broker.
/// Only one caller at a time may deliver acks; concurrent callers return
/// immediately. A failure to send is logged at debug level and otherwise
/// ignored.
/// </summary>
public void DeliverAcks()
{
    if(!this.deliveringAcks.CompareAndSet(false, true))
    {
        // Another caller is already delivering acks.
        return;
    }

    try
    {
        MessageAck ack = null;

        if(pendingAck != null && pendingAck.AckType == (byte) AckType.ConsumedAck)
        {
            ack = pendingAck;
            pendingAck = null;
        }

        // Test the ack that was taken, not pendingAck: pendingAck has just
        // been nulled, so testing it would drop the ack, and a pending ack
        // of another type would otherwise cause a null ack to be sent.
        if(ack != null)
        {
            this.session.SendAck(ack);
        }
    }
    catch(Exception e)
    {
        Tracer.DebugFormat("{0} : Failed to send ack, {1}", this.info.ConsumerId, e);
    }
    finally
    {
        // Always release the guard so later acks are not blocked forever.
        this.deliveringAcks.Value = false;
    }
}
| |
/// <summary>
/// Entry point for a dispatch arriving for this consumer. When a listener
/// is registered and the local channel is running, the message is delivered
/// to the listener straight away; otherwise it is queued for a later
/// synchronous receive or redispatch. A pending clear request (see
/// ClearMessagesInProgress) is honoured first. Any exception escaping the
/// method body is reported to the connection as a session exception.
/// </summary>
/// <param name="dispatch">The dispatch to deliver or queue.</param>
public void Dispatch(MessageDispatch dispatch)
{
    // Work with a snapshot so the handler cannot change mid-delivery.
    MessageListener handler = this.listener;

    try
    {
        lock(this.unconsumedMessages.SyncRoot)
        {
            if(this.clearDispatchList)
            {
                // we are reconnecting so lets flush the in progress messages
                this.clearDispatchList = false;
                this.unconsumedMessages.Clear();

                if(this.pendingAck != null)
                {
                    // on resumption a pending delivered ack will be out of sync with
                    // re-deliveries.
                    if(Tracer.IsDebugEnabled)
                    {
                        Tracer.Debug("removing pending delivered ack on transport interupt: " + pendingAck);
                    }

                    this.pendingAck = null;
                }
            }

            if(this.unconsumedMessages.Closed)
            {
                // Consumer is closed: the dispatch is dropped.
            }
            else if(handler == null || !this.unconsumedMessages.Running)
            {
                // No listener, or delivery is paused: keep it for later.
                this.unconsumedMessages.Enqueue(dispatch);
            }
            else
            {
                Message message = CreateStompMessage(dispatch);

                this.BeforeMessageIsConsumed(dispatch);

                try
                {
                    bool expired = message.IsExpired();

                    // Expired messages are not shown to the listener.
                    if(!expired)
                    {
                        handler(message);
                    }

                    this.AfterMessageIsConsumed(dispatch, expired);
                }
                catch(Exception e)
                {
                    bool autoOrIndividual =
                        this.session.IsAutoAcknowledge || this.session.IsIndividualAcknowledge;

                    if(!autoOrIndividual)
                    {
                        // Transacted or Client ack: Deliver the next message.
                        this.AfterMessageIsConsumed(dispatch, false);
                    }
                    // For auto / individual ack nothing further is done here,
                    // so the message is not marked as consumed.

                    Tracer.Error(this.info.ConsumerId + " Exception while processing message: " + e);
                }
            }
        }

        // Yield for a millisecond after every 1000 dispatches.
        this.dispatchedCount++;

        if(this.dispatchedCount % 1000 == 0)
        {
            this.dispatchedCount = 0;
            Thread.Sleep(1);
        }
    }
    catch(Exception e)
    {
        this.session.Connection.OnSessionException(this.session, e);
    }
}
| |
/// <summary>
/// Delivers at most one locally queued message to the registered listener.
/// </summary>
/// <returns>
/// True when a message was taken from the queue (even if delivering it
/// raised an NMSException, which is reported to the connection); false when
/// there is no listener or nothing is queued.
/// </returns>
public bool Iterate()
{
    // Snapshot the event field: re-reading it at invocation time could
    // observe null if the last handler is removed in between.
    MessageListener handler = this.listener;

    if(handler == null)
    {
        return false;
    }

    MessageDispatch dispatch = this.unconsumedMessages.DequeueNoWait();

    if(dispatch == null)
    {
        return false;
    }

    try
    {
        Message message = CreateStompMessage(dispatch);
        BeforeMessageIsConsumed(dispatch);
        handler(message);
        AfterMessageIsConsumed(dispatch, false);
    }
    catch(NMSException e)
    {
        this.session.Connection.OnSessionException(this.session, e);
    }

    return true;
}
| |
/// <summary>
/// Used to get an enqueued message from the unconsumedMessages list. The
/// amount of time this method blocks is based on the timeout value:
/// if timeout == Timeout.Infinite it blocks until a message is received;
/// if timeout == 0 it tries not to block at all and returns a message only
/// if one is available; if timeout > 0 it blocks for up to that amount of
/// time. Expired messages are consumed (and skipped) by this method.
/// </summary>
/// <param name="timeout">
/// A <see cref="System.TimeSpan"/> giving the time budget described above.
/// </param>
/// <returns>
/// A <see cref="MessageDispatch"/>, or null when nothing was obtained, the
/// channel is closed, or a dispatch without a message was dequeued.
/// </returns>
private MessageDispatch Dequeue(TimeSpan timeout)
{
    // The deadline is tracked in UTC so that a daylight-saving or time-zone
    // change of the local clock cannot stretch or shrink the wait.
    DateTime deadline = DateTime.UtcNow;

    if(timeout > TimeSpan.Zero)
    {
        deadline += timeout;
    }

    while(true)
    {
        MessageDispatch dispatch = this.unconsumedMessages.Dequeue(timeout);

        if(dispatch == null)
        {
            if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
            {
                // Retry with whatever is left of the budget; once it reaches
                // zero the next empty dequeue ends the loop.
                timeout = RemainingUntil(deadline);
            }
            else
            {
                return null;
            }
        }
        else if(dispatch.Message == null)
        {
            return null;
        }
        else if(dispatch.Message.IsExpired())
        {
            Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId, dispatch.Message.MessageId);

            // Run the expired message through the normal bookkeeping, then
            // keep looking for a live one.
            BeforeMessageIsConsumed(dispatch);
            AfterMessageIsConsumed(dispatch, true);

            if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
            {
                timeout = RemainingUntil(deadline);
            }
        }
        else
        {
            return dispatch;
        }
    }
}

// Time left until the given UTC deadline, or zero once it has passed.
private static TimeSpan RemainingUntil(DateTime deadline)
{
    DateTime now = DateTime.UtcNow;

    if(now > deadline)
    {
        return TimeSpan.Zero;
    }

    return deadline - now;
}
| |
/// <summary>
/// Bookkeeping done just before a message is handed to the application.
/// For every session type except auto-acknowledge the dispatch is recorded
/// at the front of the dispatched list; transacted sessions additionally
/// fold it into the pending ack.
/// </summary>
/// <param name="dispatch">The dispatch about to be consumed.</param>
public void BeforeMessageIsConsumed(MessageDispatch dispatch)
{
    if(this.session.IsAutoAcknowledge)
    {
        // Auto-ack messages are not tracked.
        return;
    }

    lock(this.dispatchedMessages)
    {
        this.dispatchedMessages.AddFirst(dispatch);
    }

    if(this.session.IsTransacted)
    {
        this.AckLater(dispatch);
    }
}
| |
/// <summary>
/// Bookkeeping done after a message has been handed to the application.
/// Expired messages are simply removed from the dispatched list. For
/// auto-acknowledge sessions a single-message consumed ack is sent
/// immediately; transacted, client-ack and individual-ack sessions need
/// no action here. Does nothing once the consumer is closed.
/// </summary>
/// <param name="dispatch">The dispatch that was consumed.</param>
/// <param name="expired">True when the message had expired and was not delivered.</param>
/// <exception cref="NMSException">
/// When the session reports none of the known acknowledgement modes.
/// </exception>
public void AfterMessageIsConsumed(MessageDispatch dispatch, bool expired)
{
    if(this.unconsumedMessages.Closed)
    {
        return;
    }

    if(expired)
    {
        lock(this.dispatchedMessages)
        {
            this.dispatchedMessages.Remove(dispatch);
        }

        // TODO - Not sure if we need to ack this in stomp.
        // AckLater(dispatch, AckType.ConsumedAck);
        return;
    }

    if(this.session.IsTransacted)
    {
        // Do nothing.
    }
    else if(this.session.IsAutoAcknowledge)
    {
        if(this.deliveringAcks.CompareAndSet(false, true))
        {
            try
            {
                lock(this.dispatchedMessages)
                {
                    MessageAck ack = new MessageAck();

                    ack.AckType = (byte) AckType.ConsumedAck;
                    ack.ConsumerId = this.info.ConsumerId;
                    ack.Destination = dispatch.Destination;
                    ack.LastMessageId = dispatch.Message.MessageId;
                    ack.MessageCount = 1;

                    this.session.SendAck(ack);
                }
            }
            finally
            {
                // Release the guard even when the send throws; otherwise
                // every later auto-ack would be skipped.
                this.deliveringAcks.Value = false;
            }
        }
    }
    else if(this.session.IsClientAcknowledge || this.session.IsIndividualAcknowledge)
    {
        // Do nothing.
    }
    else
    {
        throw new NMSException("Invalid session state.");
    }
}
| |
/// <summary>
/// Builds one consumed ack spanning every tracked dispatch: the list head
/// supplies the destination and last message id, the list tail supplies
/// the first message id.
/// </summary>
/// <returns>The ack, or null when no dispatches are tracked.</returns>
private MessageAck MakeAckForAllDeliveredMessages()
{
    lock(this.dispatchedMessages)
    {
        if(this.dispatchedMessages.Count != 0)
        {
            MessageDispatch newest = this.dispatchedMessages.First.Value;
            MessageAck rangeAck = new MessageAck();

            rangeAck.AckType = (byte) AckType.ConsumedAck;
            rangeAck.ConsumerId = this.info.ConsumerId;
            rangeAck.Destination = newest.Destination;
            rangeAck.LastMessageId = newest.Message.MessageId;
            rangeAck.MessageCount = this.dispatchedMessages.Count;
            rangeAck.FirstMessageId = this.dispatchedMessages.Last.Value.Message.MessageId;

            return rangeAck;
        }

        return null;
    }
}
| |
/// <summary>
/// Records <paramref name="dispatch"/> in the pending ack instead of
/// acknowledging it right away. In a transacted session this also makes
/// sure a transaction is started and that this consumer is registered for
/// its completion callbacks. The pending ack is sent once the number of
/// delivered messages reaches half of the prefetch size.
/// </summary>
/// <param name="dispatch">The dispatch to fold into the pending ack.</param>
private void AckLater(MessageDispatch dispatch)
{
    // Don't acknowledge now, but we may need to let the broker know the
    // consumer got the message to expand the pre-fetch window
    if(this.session.IsTransacted)
    {
        this.session.DoStartTransaction();

        if(!this.synchronizationRegistered)
        {
            this.synchronizationRegistered = true;
            this.session.TransactionContext.AddSynchronization(new MessageConsumerSynchronization(this));
        }
    }

    this.deliveredCounter++;

    // Remember whether an ack was already pending before it is replaced.
    bool hadPendingAck = (this.pendingAck != null);

    this.pendingAck = new MessageAck();
    this.pendingAck.AckType = (byte) AckType.ConsumedAck;
    this.pendingAck.ConsumerId = this.info.ConsumerId;
    this.pendingAck.Destination = dispatch.Destination;
    this.pendingAck.LastMessageId = dispatch.Message.MessageId;
    this.pendingAck.MessageCount = this.deliveredCounter;

    if(this.session.IsTransacted && this.session.TransactionContext.InTransaction)
    {
        this.pendingAck.TransactionId = this.session.TransactionContext.TransactionId;
    }

    if(!hadPendingAck)
    {
        this.pendingAck.FirstMessageId = this.pendingAck.LastMessageId;
    }

    double halfPrefetch = 0.5 * this.info.PrefetchSize;
    int deliveredInWindow = this.deliveredCounter - this.additionalWindowSize;

    if(halfPrefetch <= deliveredInWindow)
    {
        this.session.SendAck(this.pendingAck);
        this.pendingAck = null;
        this.deliveredCounter = 0;
        this.additionalWindowSize = 0;
    }
}
| |
/// <summary>
/// Acknowledges every tracked dispatch with a single ack. In a transacted
/// session the ack carries the transaction id and the tracked list is kept
/// (it is cleared later on commit or rollback); otherwise the list is
/// emptied. Does nothing when no dispatches are tracked.
/// </summary>
internal void Acknowledge()
{
    lock(this.dispatchedMessages)
    {
        // Acknowledge all messages so far.
        MessageAck everything = MakeAckForAllDeliveredMessages();

        if(everything != null)
        {
            if(this.session.IsTransacted)
            {
                this.session.DoStartTransaction();
                everything.TransactionId = this.session.TransactionContext.TransactionId;
            }

            this.session.SendAck(everything);
            this.pendingAck = null;

            // Adjust the counters, never letting them go negative.
            this.deliveredCounter = Math.Max(0, this.deliveredCounter - this.dispatchedMessages.Count);
            this.additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count);

            if(!this.session.IsTransacted)
            {
                this.dispatchedMessages.Clear();
            }
        }
    }
}
| |
/// <summary>
/// Transaction-commit callback: forgets all tracked dispatches and resets
/// the redelivery delay.
/// </summary>
private void Commit()
{
    lock(dispatchedMessages)
    {
        dispatchedMessages.Clear();
    }

    redeliveryDelay = 0;
}
| |
/// <summary>
/// Transaction-rollback callback. Marks every tracked dispatch as rolled
/// back and, unless the newest one has exceeded the policy's maximum
/// redeliveries, puts them all back at the front of the local queue and
/// restarts delivery, either immediately or after the policy's redelivery
/// delay. The tracked list is emptied afterwards. With an async listener
/// registered, the session is then asked to redispatch the queue.
/// </summary>
private void Rollback()
{
    lock(this.unconsumedMessages.SyncRoot)
    {
        lock(this.dispatchedMessages)
        {
            if(this.dispatchedMessages.Count == 0)
            {
                return;
            }

            // Only increase the redelivery delay after the first redelivery..
            MessageDispatch newest = this.dispatchedMessages.First.Value;
            int redeliveriesSoFar = newest.Message.RedeliveryCounter;

            this.redeliveryDelay = this.redeliveryPolicy.RedeliveryDelay(redeliveriesSoFar);

            foreach(MessageDispatch rolledBack in this.dispatchedMessages)
            {
                // Allow the message to update its internal to reflect a Rollback.
                rolledBack.Message.OnMessageRollback();
            }

            bool redeliveriesExhausted =
                this.redeliveryPolicy.MaximumRedeliveries >= 0 &&
                newest.Message.RedeliveryCounter > this.redeliveryPolicy.MaximumRedeliveries;

            if(!redeliveriesExhausted)
            {
                // stop the delivery of messages.
                this.unconsumedMessages.Stop();

                // The list is newest-first, so pushing each entry to the
                // front leaves the oldest message at the head of the queue.
                foreach(MessageDispatch requeued in this.dispatchedMessages)
                {
                    this.unconsumedMessages.EnqueueFirst(requeued);
                }

                bool delayRestart = this.redeliveryDelay > 0 && !this.unconsumedMessages.Closed;

                if(delayRestart)
                {
                    DateTime deadline = DateTime.Now.AddMilliseconds(this.redeliveryDelay);
                    ThreadPool.QueueUserWorkItem(this.RollbackHelper, deadline);
                }
                else
                {
                    Start();
                }
            }
            else
            {
                this.redeliveryDelay = 0;
            }

            this.deliveredCounter -= this.dispatchedMessages.Count;
            this.dispatchedMessages.Clear();
        }
    }

    // Only redispatch if there's an async listener otherwise a synchronous
    // consumer will pull them from the local queue.
    if(this.listener != null)
    {
        this.session.Redispatch(this.unconsumedMessages);
    }
}
| |
/// <summary>
/// Thread-pool work item queued by Rollback: sleeps until the given
/// deadline has passed and then restarts delivery. A failure is reported
/// to the connection unless the consumer has been closed in the meantime.
/// </summary>
/// <param name="arg">The boxed <see cref="DateTime"/> deadline to wait for.</param>
private void RollbackHelper(Object arg)
{
    try
    {
        DateTime resumeAt = (DateTime) arg;
        TimeSpan remaining = resumeAt - DateTime.Now;

        if(remaining > TimeSpan.Zero)
        {
            Thread.Sleep((int) remaining.TotalMilliseconds);
        }

        Start();
    }
    catch(Exception e)
    {
        if(this.unconsumedMessages.Closed)
        {
            return;
        }

        this.session.Connection.OnSessionException(this.session, e);
    }
}
| |
/// <summary>
/// Produces the message instance handed to the application: a clone of the
/// dispatched message, bound to the session's connection and wired to the
/// acknowledge handler that matches the session's acknowledgement mode.
/// </summary>
/// <param name="dispatch">The dispatch whose message is cloned.</param>
/// <returns>The prepared clone.</returns>
private Message CreateStompMessage(MessageDispatch dispatch)
{
    Message copy = dispatch.Message.Clone() as Message;

    copy.Connection = this.session.Connection;

    AcknowledgeHandler onAcknowledge;

    if(this.session.IsClientAcknowledge)
    {
        onAcknowledge = new AcknowledgeHandler(DoClientAcknowledge);
    }
    else if(this.session.IsIndividualAcknowledge)
    {
        onAcknowledge = new AcknowledgeHandler(DoIndividualAcknowledge);
    }
    else
    {
        onAcknowledge = new AcknowledgeHandler(DoNothingAcknowledge);
    }

    copy.Acknowledger += onAcknowledge;

    return copy;
}
| |
/// <summary>
/// Guard used by public operations: rejects the call once the consumer's
/// local channel has been closed.
/// </summary>
/// <exception cref="NMSException">When the consumer has been closed.</exception>
private void CheckClosed()
{
    bool closed = this.unconsumedMessages.Closed;

    if(closed)
    {
        throw new NMSException("The Consumer has been Closed");
    }
}
| |
/// <summary>
/// Guard used by the synchronous receive methods: a consumer that has an
/// asynchronous listener registered must not also be polled synchronously.
/// </summary>
/// <exception cref="NMSException">When a listener is currently registered.</exception>
private void CheckMessageListener()
{
    if(this.listener != null)
    {
        // The previous text talked about a zero prefetch limit, which is a
        // different check (made when a listener is added) and misreported
        // the actual problem to the caller.
        throw new NMSException("Cannot perform a Synchronous Receive when there is a registered asynchronous listener.");
    }
}
| |
| #region Nested ISynchronization Types |
| |
/// <summary>
/// Transaction synchronization registered by AckLater. It forwards each
/// transaction event to the owning consumer and then clears the consumer's
/// "synchronization registered" flag.
/// </summary>
class MessageConsumerSynchronization : ISynchronization
{
    private readonly MessageConsumer owner;

    public MessageConsumerSynchronization(MessageConsumer consumer)
    {
        owner = consumer;
    }

    /// <summary>Before the transaction ends: acknowledge everything delivered so far.</summary>
    public void BeforeEnd()
    {
        owner.Acknowledge();
        owner.synchronizationRegistered = false;
    }

    /// <summary>After commit: let the consumer drop its tracked dispatches.</summary>
    public void AfterCommit()
    {
        owner.Commit();
        owner.synchronizationRegistered = false;
    }

    /// <summary>After rollback: let the consumer requeue its tracked dispatches.</summary>
    public void AfterRollback()
    {
        owner.Rollback();
        owner.synchronizationRegistered = false;
    }
}
| |
/// <summary>
/// Transaction synchronization registered by Close when a transaction is
/// in progress: the consumer is really closed only once the transaction
/// has finished, whether by commit or by rollback.
/// </summary>
class ConsumerCloseSynchronization : ISynchronization
{
    private readonly MessageConsumer owner;

    public ConsumerCloseSynchronization(MessageConsumer consumer)
    {
        owner = consumer;
    }

    /// <summary>Nothing to do before the transaction ends.</summary>
    public void BeforeEnd()
    {
    }

    /// <summary>Performs the deferred close after a commit.</summary>
    public void AfterCommit()
    {
        owner.DoClose();
    }

    /// <summary>Performs the deferred close after a rollback.</summary>
    public void AfterRollback()
    {
        owner.DoClose();
    }
}
| |
| #endregion |
| } |
| } |