blob: f8930f100678cc618125226e414c818191cae9e2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Amqp;
using Apache.NMS.AMQP.Message;
using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Provider;
using Apache.NMS.AMQP.Util;
namespace Apache.NMS.AMQP
{
public class NmsSession : ISession
{
private readonly ConcurrentDictionary<Id, NmsMessageConsumer> consumers = new ConcurrentDictionary<Id, NmsMessageConsumer>();
private readonly ConcurrentDictionary<Id, NmsMessageProducer> producers = new ConcurrentDictionary<Id, NmsMessageProducer>();
private readonly NestedIdGenerator consumerIdGenerator;
private readonly NestedIdGenerator producerIdGenerator;
private readonly AtomicBool closed = new AtomicBool();
private readonly AtomicBool started = new AtomicBool();
public SessionInfo SessionInfo { get; }
public NmsConnection Connection { get; }
private SessionDispatcher dispatcher;
private Exception failureCause;
private readonly AcknowledgementMode acknowledgementMode;
public NmsSession(NmsConnection connection, Id sessionId, AcknowledgementMode acknowledgementMode)
{
Connection = connection;
this.acknowledgementMode = acknowledgementMode;
SessionInfo = new SessionInfo(sessionId)
{
AcknowledgementMode = acknowledgementMode
};
consumerIdGenerator = new NestedIdGenerator("ID:consumer", SessionInfo.Id, true);
producerIdGenerator = new NestedIdGenerator("ID:producer", SessionInfo.Id, true);
if (AcknowledgementMode == AcknowledgementMode.Transactional)
TransactionContext = new NmsLocalTransactionContext(this);
else
TransactionContext = new NmsNoTxTransactionContext(this);
}
public bool IsStarted => started;
internal async Task Begin()
{
await Connection.CreateResource(SessionInfo).ConfigureAwait(false);
try
{
// We always keep an open TX if transacted so start now.
await TransactionContext.Begin().ConfigureAwait(false);
}
catch (Exception)
{
// failed, close the AMQP session before we throw
try
{
await Connection.DestroyResource(SessionInfo).ConfigureAwait(false);
}
catch (Exception)
{
// Ignore, throw original error
}
throw;
}
}
public void Close()
{
CheckIsOnDeliveryThread();
if (!closed)
{
Shutdown();
Connection.DestroyResource(SessionInfo);
}
}
public void Dispose()
{
try
{
Close();
}
catch (Exception ex)
{
Tracer.DebugFormat("Caught exception while disposing {0} {1}. Exception {2}", this.GetType().Name, this.SessionInfo.Id, ex);
}
}
public IMessageProducer CreateProducer()
{
return CreateProducer(null);
}
public IMessageProducer CreateProducer(IDestination destination)
{
return new NmsMessageProducer(producerIdGenerator.GenerateId(), this, destination);
}
public IMessageConsumer CreateConsumer(IDestination destination)
{
return CreateConsumer(destination, null);
}
public IMessageConsumer CreateConsumer(IDestination destination, string selector)
{
return CreateConsumer(destination, selector, false);
}
public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
{
CheckClosed();
NmsMessageConsumer messageConsumer = new NmsMessageConsumer(consumerIdGenerator.GenerateId(), this, destination, selector, noLocal);
messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
return messageConsumer;
}
public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
{
CheckClosed();
NmsMessageConsumer messageConsumer = new NmsDurableTopicSubscriber(consumerIdGenerator.GenerateId(), this, destination, name, selector, noLocal);
messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
return messageConsumer;
}
public void DeleteDurableConsumer(string name)
{
CheckClosed();
Connection.Unsubscribe(name);
}
public IQueueBrowser CreateBrowser(IQueue queue)
{
throw new NotImplementedException();
}
public IQueueBrowser CreateBrowser(IQueue queue, string selector)
{
throw new NotImplementedException();
}
public IQueue GetQueue(string name)
{
CheckClosed();
return new NmsQueue(name);
}
public ITopic GetTopic(string name)
{
CheckClosed();
return new NmsTopic(name);
}
public ITemporaryQueue CreateTemporaryQueue()
{
CheckClosed();
return Connection.CreateTemporaryQueue();
}
public ITemporaryTopic CreateTemporaryTopic()
{
CheckClosed();
return Connection.CreateTemporaryTopic();
}
public void DeleteDestination(IDestination destination)
{
CheckClosed();
if (destination == null)
return;
if (destination is ITemporaryQueue temporaryQueue)
temporaryQueue.Delete();
else if (destination is ITemporaryTopic temporaryTopic)
temporaryTopic.Delete();
else
throw new NotSupportedException("AMQP can not delete a Queue or Topic destination.");
}
public IMessage CreateMessage()
{
CheckClosed();
return Connection.MessageFactory.CreateMessage();
}
public ITextMessage CreateTextMessage()
{
CheckClosed();
return Connection.MessageFactory.CreateTextMessage();
}
public ITextMessage CreateTextMessage(string text)
{
CheckClosed();
return Connection.MessageFactory.CreateTextMessage(text);
}
public IMapMessage CreateMapMessage()
{
CheckClosed();
return Connection.MessageFactory.CreateMapMessage();
}
public IObjectMessage CreateObjectMessage(object body)
{
CheckClosed();
return Connection.MessageFactory.CreateObjectMessage(body);
}
public IBytesMessage CreateBytesMessage()
{
CheckClosed();
return Connection.MessageFactory.CreateBytesMessage();
}
public IBytesMessage CreateBytesMessage(byte[] body)
{
CheckClosed();
return Connection.MessageFactory.CreateBytesMessage(body);
}
public IStreamMessage CreateStreamMessage()
{
CheckClosed();
return Connection.MessageFactory.CreateStreamMessage();
}
public void Recover()
{
CheckClosed();
bool wasStarted = IsStarted;
Stop();
Connection.Recover(SessionInfo.Id).ConfigureAwait(false).GetAwaiter().GetResult();
if (wasStarted)
Start();
}
public void Commit()
{
CheckClosed();
TransactionContext.Commit().ConfigureAwait(false).GetAwaiter().GetResult();
}
public void Rollback()
{
CheckClosed();
// Stop processing any new messages that arrive
try
{
foreach (NmsMessageConsumer consumer in consumers.Values)
{
consumer.SuspendForRollback();
}
}
finally
{
TransactionContext.Rollback().ConfigureAwait(false).GetAwaiter().GetResult();
}
// Currently some consumers won't get suspended and some won't restart
// after a failed rollback.
foreach (NmsMessageConsumer consumer in consumers.Values)
{
consumer.ResumeAfterRollback();
}
}
private void CheckClosed()
{
if (closed)
{
if (failureCause == null)
throw new IllegalStateException("The Session is closed");
else
throw new IllegalStateException("The Session was closed due to an unrecoverable error.", failureCause);
}
}
public ConsumerTransformerDelegate ConsumerTransformer { get; set; }
public ProducerTransformerDelegate ProducerTransformer { get; set; }
public TimeSpan RequestTimeout { get; set; }
public bool Transacted => AcknowledgementMode == AcknowledgementMode.Transactional;
public AcknowledgementMode AcknowledgementMode
{
get
{
CheckClosed();
return acknowledgementMode;
}
}
public bool IsClosed => closed;
internal INmsTransactionContext TransactionContext { get; }
public event SessionTxEventDelegate TransactionStartedListener
{
add => this.TransactionContext.TransactionStartedListener += value;
remove => this.TransactionContext.TransactionStartedListener -= value;
}
public event SessionTxEventDelegate TransactionCommittedListener
{
add => this.TransactionContext.TransactionCommittedListener += value;
remove => this.TransactionContext.TransactionCommittedListener -= value;
}
public event SessionTxEventDelegate TransactionRolledBackListener
{
add => this.TransactionContext.TransactionRolledBackListener += value;
remove => this.TransactionContext.TransactionRolledBackListener -= value;
}
public void OnInboundMessage(InboundMessageDispatch envelope)
{
if (consumers.TryGetValue(envelope.ConsumerInfo.Id, out NmsMessageConsumer messageConsumer))
{
messageConsumer.OnInboundMessage(envelope);
}
else
{
Tracer.Error($"Could not dispatch message {envelope.Message.NMSMessageId} because consumer {envelope.ConsumerId} not found.");
}
}
public void Acknowledge(AckType ackType)
{
Connection.Acknowledge(SessionInfo.Id, ackType).ConfigureAwait(false).GetAwaiter().GetResult();
}
public void Acknowledge(AckType ackType, InboundMessageDispatch envelope)
{
TransactionContext.Acknowledge(envelope, ackType).ConfigureAwait(false).GetAwaiter().GetResult();
}
public void AcknowledgeIndividual(AckType ackType, InboundMessageDispatch envelope)
{
if (Transacted)
{
throw new IllegalStateException("Message acknowledge called inside a transacted Session");
}
Connection.Acknowledge(envelope, ackType).ConfigureAwait(false).GetAwaiter().GetResult();
}
public void Send(NmsMessageProducer producer, IDestination destination, IMessage original, MsgDeliveryMode deliveryMode,
MsgPriority priority, TimeSpan timeToLive, bool disableMessageId, bool disableMessageTimestamp)
{
if (destination == null)
throw new InvalidDestinationException("Destination must not be null");
if (original == null)
throw new MessageFormatException("Message must not be null");
NmsMessage outbound = null;
original.NMSDeliveryMode = deliveryMode;
original.NMSPriority = priority;
original.NMSRedelivered = false;
original.NMSDestination = destination;
bool isNmsMessage = original is NmsMessage;
DateTime timeStamp = DateTime.UtcNow;
bool hasTTL = timeToLive > TimeSpan.Zero;
if (!disableMessageTimestamp)
original.NMSTimestamp = timeStamp;
else
original.NMSTimestamp = DateTime.MinValue;
if (!disableMessageId)
original.NMSMessageId = producer.MessageIdGenerator.GenerateId().ToString();
else
original.NMSMessageId = null;
if (isNmsMessage)
outbound = (NmsMessage) original;
else
outbound = NmsMessageTransformation.TransformMessage(Connection.MessageFactory, original);
if (hasTTL)
outbound.Facade.Expiration = timeStamp + timeToLive;
else
outbound.Facade.Expiration = null;
outbound.OnSend(timeToLive);
bool sync = deliveryMode == MsgDeliveryMode.Persistent;
TransactionContext.Send(new OutboundMessageDispatch
{
Message = outbound,
ProducerId = producer.Info.Id,
ProducerInfo = producer.Info,
SendAsync = !sync
}).ConfigureAwait(false).GetAwaiter().GetResult();
}
internal void EnqueueForDispatch(NmsMessageConsumer.MessageDeliveryTask task)
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("Message enqueued for dispatch.");
}
dispatcher?.Post(task);
}
public NmsMessageConsumer ConsumerClosed(Id consumerId, Exception cause)
{
Tracer.Info($"A NMS MessageConsumer has been closed: {consumerId}");
if (consumers.TryGetValue(consumerId, out NmsMessageConsumer consumer))
{
if (consumer.HasMessageListener())
{
Connection.OnAsyncException(cause);
}
}
try
{
consumer?.Shutdown(cause);
}
catch (Exception error)
{
Tracer.DebugFormat("Ignoring exception thrown during cleanup of closed consumer", error);
}
return consumer;
}
public NmsMessageProducer ProducerClosed(Id producerId, Exception cause)
{
Tracer.Info($"A NmsMessageProducer has been closed: {producerId}. Cause: {cause}");
if (producers.TryGetValue(producerId, out NmsMessageProducer producer))
{
try
{
producer.Shutdown(cause);
}
catch (Exception error)
{
Tracer.DebugFormat("Ignoring exception thrown during cleanup of closed producer", error);
}
}
else
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug($"NmsMessageProducer: {producerId} not found in session {this.SessionInfo.Id}.");
}
}
return producer;
}
public void Add(NmsMessageConsumer messageConsumer)
{
consumers.TryAdd(messageConsumer.Info.Id, messageConsumer);
}
public void Add(NmsMessageProducer messageProducer)
{
producers.TryAdd(messageProducer.Info.Id, messageProducer);
}
public void Remove(NmsMessageConsumer messageConsumer)
{
consumers.TryRemove(messageConsumer.Info.Id, out _);
}
public void Remove(NmsMessageProducer messageProducer)
{
producers.TryRemove(messageProducer.Info.Id, out _);
}
public void Shutdown(NMSException exception = null)
{
if (closed.CompareAndSet(false, true))
{
failureCause = exception;
Stop();
try
{
foreach (NmsMessageConsumer consumer in consumers.Values.ToArray())
consumer.Shutdown(exception);
foreach (NmsMessageProducer producer in producers.Values.ToArray())
producer.Shutdown(exception);
TransactionContext.Shutdown().ConfigureAwait(false).GetAwaiter().GetResult();
}
finally
{
Connection.RemoveSession(SessionInfo);
}
}
}
public void Start()
{
if (started.CompareAndSet(false, true))
{
dispatcher = new SessionDispatcher();
foreach (NmsMessageConsumer consumer in consumers.Values.ToArray())
{
consumer.Start();
}
}
}
internal void CheckIsOnDeliveryThread()
{
if (dispatcher != null && dispatcher.IsOnDeliveryThread())
{
throw new IllegalStateException("Illegal invocation from MessageListener callback");
}
}
public async Task OnConnectionRecovery(IProvider provider)
{
await provider.CreateResource(SessionInfo).ConfigureAwait(false);
foreach (NmsMessageConsumer consumer in consumers.Values)
{
await consumer.OnConnectionRecovery(provider).ConfigureAwait(false);
}
foreach (NmsMessageProducer producer in producers.Values)
{
await producer.OnConnectionRecovery(provider).ConfigureAwait(false);
}
}
public async Task OnConnectionRecovered(IProvider provider)
{
foreach (NmsMessageConsumer consumer in consumers.Values)
{
await consumer.OnConnectionRecovered(provider).ConfigureAwait(false);
}
}
public void Stop()
{
if (started.CompareAndSet(true, false))
{
dispatcher.Stop();
dispatcher = null;
foreach (NmsMessageConsumer consumer in consumers.Values)
{
consumer.Stop();
}
}
}
public bool IsDestinationInUse(NmsTemporaryDestination destination)
{
foreach (var consumer in consumers.Values.ToArray())
{
if (consumer.IsDestinationInUse(destination))
{
return true;
}
}
return false;
}
public void OnConnectionInterrupted()
{
TransactionContext.OnConnectionInterrupted();
foreach (NmsMessageConsumer consumer in consumers.Values)
{
consumer.OnConnectionInterrupted();
}
}
internal bool IsIndividualAcknowledge() => acknowledgementMode == AcknowledgementMode.IndividualAcknowledge;
}
}