| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.Collections.Generic; |
| using System.Linq; |
| using System.Threading.Tasks; |
| using Apache.NMS.AMQP.Message; |
| using Apache.NMS.AMQP.Meta; |
| using Apache.NMS.AMQP.Provider; |
| using Apache.NMS.AMQP.Util; |
| using Apache.NMS.Util; |
| |
| namespace Apache.NMS.AMQP |
| { |
| /// <summary> |
| /// Manages the details of a Session operating inside of a local NMS transaction. |
| /// </summary> |
| internal sealed class NmsLocalTransactionContext : INmsTransactionContext |
| { |
| private readonly NmsConnection connection; |
| private readonly HashSet<Id> participants = new HashSet<Id>(); |
| private readonly NmsSession session; |
| private TransactionInfo transactionInfo; |
| |
| public NmsLocalTransactionContext(NmsSession session) |
| { |
| this.session = session; |
| this.connection = session.Connection; |
| } |
| |
| public async Task Send(OutboundMessageDispatch envelope) |
| { |
| if (!IsInDoubt()) |
| { |
| await this.connection.Send(envelope); |
| this.participants.Add(envelope.ProducerId); |
| } |
| } |
| |
| public async Task Acknowledge(InboundMessageDispatch envelope, AckType ackType) |
| { |
| // Consumed or delivered messages fall into a transaction otherwise just pass it in. |
| if (ackType == AckType.ACCEPTED || ackType == AckType.DELIVERED) |
| { |
| try |
| { |
| await this.connection.Acknowledge(envelope, ackType).ConfigureAwait(false); |
| this.participants.Add(envelope.ConsumerId); |
| Tracer.Debug($"TX:{this.transactionInfo.Id} has performed an acknowledge."); |
| } |
| catch (Exception) |
| { |
| Tracer.Debug($"TX:{this.transactionInfo.Id} has failed an acknowledge."); |
| this.participants.Add(envelope.ConsumerId); |
| throw; |
| } |
| } |
| else |
| { |
| await this.connection.Acknowledge(envelope, ackType).ConfigureAwait(false); |
| } |
| } |
| |
| public async Task Begin() |
| { |
| this.transactionInfo = GetNextTransactionInfo(); |
| |
| try |
| { |
| Reset(); |
| await this.session.Connection.CreateResource(this.transactionInfo); |
| OnTransactionStarted(); |
| Tracer.Debug($"Begin: {this.transactionInfo.Id}"); |
| } |
| catch (Exception) |
| { |
| this.transactionInfo.SetInDoubt(); |
| throw; |
| } |
| } |
| |
| public Task Rollback() |
| { |
| return DoRollback(true); |
| } |
| |
| public Task Shutdown() |
| { |
| return DoRollback(false); |
| } |
| |
| public void OnConnectionInterrupted() |
| { |
| this.transactionInfo.SetInDoubt(); |
| } |
| |
| public async Task OnConnectionRecovery(IProvider provider) |
| { |
| if (this.participants.Any()) |
| { |
| Tracer.Debug($"Transaction recovery marking current TX:{this.transactionInfo.Id} as in-doubt."); |
| this.transactionInfo.SetInDoubt(); |
| } |
| else |
| { |
| this.transactionInfo = GetNextTransactionInfo(); |
| Tracer.Debug($"Transaction recovery creating new TX:{this.transactionInfo.Id} after failover."); |
| await provider.CreateResource(this.transactionInfo).ConfigureAwait(false); |
| } |
| } |
| |
| public bool IsActiveInThisContext(Id infoId) |
| { |
| return this.participants.Contains(infoId); |
| } |
| |
| public event SessionTxEventDelegate TransactionStartedListener; |
| |
| public event SessionTxEventDelegate TransactionCommittedListener; |
| |
| public event SessionTxEventDelegate TransactionRolledBackListener; |
| |
| public async Task Commit() |
| { |
| if (IsInDoubt()) |
| { |
| try |
| { |
| await Rollback(); |
| } |
| catch (Exception e) |
| { |
| Tracer.WarnFormat("Error during rollback of failed TX: ", e); |
| } |
| |
| throw new TransactionRolledBackException("Transaction failed and has been rolled back."); |
| } |
| Tracer.Debug($"Commit: {this.transactionInfo.Id}"); |
| |
| var oldTransactionId = this.transactionInfo.Id; |
| var nextTx = GetNextTransactionInfo(); |
| |
| try |
| { |
| await this.connection.Commit(this.transactionInfo, nextTx); |
| OnTransactionCommitted(); |
| Reset(); |
| this.transactionInfo = nextTx; |
| } |
| catch (NMSException) |
| { |
| Tracer.Info($"Commit failed for transaction :{oldTransactionId}"); |
| throw; |
| } |
| finally |
| { |
| try |
| { |
| // If the provider failed to start a new transaction there will not be |
| // a current provider transaction id present, so we attempt to create |
| // one to recover our state. |
| if (nextTx.ProviderTxId == null) |
| { |
| await Begin(); |
| } |
| } |
| catch (Exception e) |
| { |
| // TODO |
| // At this point the transacted session is now unrecoverable, we should |
| // probably close it. |
| Tracer.Info($"Failed to start new Transaction after failed rollback of: {oldTransactionId} {e}"); |
| } |
| } |
| } |
| |
| private async Task DoRollback(bool startNewTransaction) |
| { |
| Tracer.Debug($"Rollback: {this.transactionInfo.Id}"); |
| |
| var oldTransactionId = this.transactionInfo.Id; |
| var nextTx = startNewTransaction ? GetNextTransactionInfo() : null; |
| |
| try |
| { |
| await this.connection.Rollback(this.transactionInfo, nextTx); |
| OnTransactionRolledBack(); |
| Reset(); |
| this.transactionInfo = nextTx; |
| } |
| catch (Exception e) |
| { |
| Tracer.Info($"Rollback failed for transaction: {oldTransactionId}"); |
| throw NMSExceptionSupport.Create(e); |
| } |
| finally |
| { |
| Reset(); |
| |
| try |
| { |
| // If the provider failed to start a new transaction there will not be |
| // a current provider transaction id present, so we attempt to create |
| // one to recover our state. |
| if (startNewTransaction && nextTx.ProviderTxId == null) |
| { |
| await Begin(); |
| } |
| } |
| catch (Exception e) |
| { |
| // TODO |
| // At this point the transacted session is now unrecoverable, we should |
| // probably close it. |
| Tracer.Info($"Failed to start new Transaction after failed rollback of: {this.transactionInfo} {e}"); |
| } |
| } |
| } |
| |
| private TransactionInfo GetNextTransactionInfo() |
| { |
| var transactionId = this.connection.TransactionIdGenerator.GenerateId(); |
| return new TransactionInfo(transactionId, this.session.SessionInfo.Id); |
| } |
| |
| private bool IsInDoubt() |
| { |
| return this.transactionInfo?.IsInDoubt ?? false; |
| } |
| |
| private void Reset() |
| { |
| this.participants.Clear(); |
| } |
| |
| private void OnTransactionStarted() |
| { |
| try |
| { |
| TransactionStartedListener?.Invoke(this.session); |
| } |
| catch (Exception e) |
| { |
| Tracer.Warn($"Local TX listener error ignored: {e}"); |
| } |
| } |
| |
| private void OnTransactionCommitted() |
| { |
| try |
| { |
| TransactionCommittedListener?.Invoke(this.session); |
| } |
| catch (Exception e) |
| { |
| Tracer.Warn($"Local TX listener error ignored: {e}"); |
| } |
| } |
| |
| private void OnTransactionRolledBack() |
| { |
| try |
| { |
| TransactionRolledBackListener?.Invoke(this.session); |
| } |
| catch (Exception e) |
| { |
| Tracer.Warn($"Local TX listener error ignored: {e}"); |
| } |
| } |
| } |
| } |