blob: 9ecffa96c19d36d78bdca47baef0d31e57605389 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Apache.NMS.AMQP.Message;
using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Provider;
using Apache.NMS.AMQP.Util;
using Apache.NMS.Util;
namespace Apache.NMS.AMQP
{
public class NmsConnection : IConnection, IProviderListener
{
// Flipped to true by Close(); also consulted by CheckClosedOrFailed and CreateNmsConnection.
private readonly AtomicBool closed = new AtomicBool();
// Set after provider.Connect succeeds in CreateNmsConnection; cleared by Close() and OnConnectionFailure.
private readonly AtomicBool connected = new AtomicBool();
// Listeners notified about inbound messages, connection state changes and closed consumers/producers.
// NOTE(review): plain HashSet that is added to and enumerated without locking — confirm callers are serialized.
private readonly HashSet<INmsConnectionListener> connectionListeners = new HashSet<INmsConnectionListener>();
// Provider that every remote operation in this class is delegated to.
private readonly IProvider provider;
// Sessions created through CreateSession, keyed by their session id.
private readonly ConcurrentDictionary<NmsSessionId, NmsSession> sessions = new ConcurrentDictionary<NmsSessionId, NmsSession>();
// Temporary destinations created through this connection; they are re-created on the provider in OnConnectionRecovery.
private readonly ConcurrentDictionary<NmsTemporaryDestination, NmsTemporaryDestination> tempDestinations = new ConcurrentDictionary<NmsTemporaryDestination, NmsTemporaryDestination>();
// True between Start() and Stop()/Close(); sessions created while it is set are started immediately.
private readonly AtomicBool started = new AtomicBool();
// Counters used to build session ids, temporary destination names and transaction ids.
private readonly AtomicLong sessionIdGenerator = new AtomicLong();
private readonly AtomicLong temporaryTopicIdGenerator = new AtomicLong();
private readonly AtomicLong temporaryQueueIdGenerator = new AtomicLong();
private readonly AtomicLong transactionIdGenerator = new AtomicLong();
// First failure reported through OnConnectionFailure (later failures do not overwrite it).
private Exception failureCause;
// Lock object guarding the connect attempt in CreateNmsConnection.
private readonly object syncRoot = new object();
/// <summary>
/// Creates a connection on top of the given provider: registers this instance as the
/// provider's listener and starts the provider. No remote connect is attempted here;
/// that happens in <c>CreateNmsConnection</c>.
/// </summary>
/// <param name="connectionInfo">Connection settings, exposed through <c>ConnectionInfo</c>.</param>
/// <param name="provider">Provider used for all remote operations; must not be null.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="provider"/> is null.</exception>
/// <remarks>
/// Any exception raised by <c>provider.Start()</c> is converted with
/// <c>NMSExceptionSupport.Create</c> before being thrown to the caller.
/// </remarks>
public NmsConnection(NmsConnectionInfo connectionInfo, IProvider provider)
{
    if (provider == null)
    {
        // nameof already yields the parameter name as a string; the message previously
        // read "Remove provider", which was a typo for "Remote provider".
        throw new ArgumentNullException(nameof(provider), "Remote provider instance not set.");
    }

    ConnectionInfo = connectionInfo;
    this.provider = provider;
    provider.SetProviderListener(this);

    try
    {
        provider.Start();
    }
    catch (Exception e)
    {
        throw NMSExceptionSupport.Create(e);
    }
}
/// <summary>Connection settings passed to the constructor.</summary>
public NmsConnectionInfo ConnectionInfo { get; }

/// <summary>True once <see cref="Close"/> has marked this connection as closed.</summary>
public bool IsClosed
{
    get { return closed.Value; }
}

/// <summary>True after a successful provider connect and until the connection is closed or fails.</summary>
public bool IsConnected
{
    get { return connected.Value; }
}

/// <summary>Identifier of this connection, taken from <see cref="ConnectionInfo"/>.</summary>
public NmsConnectionId Id
{
    get { return ConnectionInfo.Id; }
}

/// <summary>Message factory copied from the provider when the connection is established or restored.</summary>
public INmsMessageFactory MessageFactory { get; private set; }
/// <summary>
/// Closes the connection. Any exception raised by <see cref="Close"/> is written to the
/// debug trace and not propagated, so disposal itself never throws.
/// </summary>
public void Dispose()
{
    try
    {
        Close();
    }
    catch (Exception disposeError)
    {
        string typeName = GetType().Name;
        Tracer.DebugFormat("Caught exception while disposing {0} {1}. Exception {2}", typeName, ConnectionInfo.Id, disposeError);
    }
}
/// <summary>
/// Stops all sessions of this connection after verifying that the connection is neither
/// closed nor failed.
/// </summary>
public void Stop() => DoStop(true);
/// <summary>
/// Shared stop logic used by <c>Stop</c> and <c>Close</c>.
/// </summary>
/// <param name="checkClosed">
/// When true, <c>CheckClosedOrFailed</c> runs first and may throw; <c>Close</c> passes false
/// because it has already marked the connection closed.
/// </param>
private void DoStop(bool checkClosed)
{
    if (checkClosed)
    {
        CheckClosedOrFailed();
    }

    CheckIsOnDeliveryThread();

    // Only the caller that flips 'started' from true to false stops the sessions.
    if (!started.CompareAndSet(true, false))
    {
        return;
    }

    foreach (NmsSession activeSession in sessions.Values)
    {
        activeSession.Stop();
    }
}
/// <summary>
/// Creates a session using the connection-level <c>AcknowledgementMode</c> property.
/// </summary>
/// <returns>The newly created session.</returns>
public ISession CreateSession() => CreateSession(AcknowledgementMode);
/// <summary>
/// Creates a session with the given acknowledgement mode, connecting to the remote peer
/// first if that has not happened yet. The call blocks until the session has begun.
/// </summary>
/// <param name="acknowledgementMode">Acknowledgement mode for the new session.</param>
/// <returns>The new session; it is already started when the connection is started.</returns>
/// <remarks>
/// <c>NMSException</c> instances propagate unchanged; any other exception is wrapped via
/// <c>ExceptionSupport.Wrap</c>.
/// </remarks>
public ISession CreateSession(AcknowledgementMode acknowledgementMode)
{
    CheckClosedOrFailed();
    CreateNmsConnection();

    NmsSessionId newSessionId = GetNextSessionId();
    var newSession = new NmsSession(this, newSessionId, acknowledgementMode);

    try
    {
        // Synchronously wait for the session to begin before registering it.
        newSession.Begin().ConfigureAwait(false).GetAwaiter().GetResult();
        sessions.TryAdd(newSession.SessionInfo.Id, newSession);

        if (started)
        {
            newSession.Start();
        }

        return newSession;
    }
    catch (Exception ex) when (!(ex is NMSException))
    {
        throw ExceptionSupport.Wrap(ex, "Failed to establish amqp Session.");
    }
}
/// <summary>
/// Closes the connection: stops and shuts down every session, closes the provider and
/// resets the started/connected flags. Only the first call performs the work; later calls
/// return after the delivery-thread check.
/// </summary>
public void Close()
{
    CheckIsOnDeliveryThread();

    // Whoever wins the false -> true transition performs the shutdown.
    if (!closed.CompareAndSet(false, true))
    {
        return;
    }

    DoStop(false);

    foreach (NmsSession openSession in sessions.Values)
    {
        openSession.Shutdown(null);
    }

    try
    {
        provider.Close();
    }
    catch (Exception)
    {
        // Best effort: a provider failure must not prevent the local cleanup below.
        Tracer.Debug("Ignoring provider exception during connection close");
    }

    sessions.Clear();
    started.Set(false);
    connected.Set(false);
}
/// <summary>Not supported by this implementation.</summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public void PurgeTempDestinations() => throw new NotImplementedException();
/// <summary>
/// Connects to the remote peer if necessary and starts every existing session. Sessions
/// are only started by the call that moves the connection from stopped to started.
/// </summary>
public void Start()
{
    CreateNmsConnection();

    if (!started.CompareAndSet(false, true))
    {
        return;
    }

    // Iterate over a copy of the current sessions.
    NmsSession[] currentSessions = sessions.Values.ToArray();
    foreach (NmsSession currentSession in currentSessions)
    {
        currentSession.Start();
    }
}
/// <summary>True while the connection is in the started state.</summary>
public bool IsStarted
{
    get { return started; }
}

/// <summary>Consumer transformer delegate; stored here and not used elsewhere in this class.</summary>
public ConsumerTransformerDelegate ConsumerTransformer { get; set; }

/// <summary>Producer transformer delegate; stored here and not used elsewhere in this class.</summary>
public ProducerTransformerDelegate ProducerTransformer { get; set; }

/// <summary>
/// Request timeout. The value is kept in <c>ConnectionInfo.RequestTimeout</c> as a
/// millisecond count and converted on every access.
/// </summary>
public TimeSpan RequestTimeout
{
    get
    {
        return TimeSpan.FromMilliseconds(ConnectionInfo.RequestTimeout);
    }
    set
    {
        ConnectionInfo.RequestTimeout = Convert.ToInt64(value.TotalMilliseconds);
    }
}

/// <summary>Acknowledgement mode used by the parameterless <c>CreateSession()</c>.</summary>
public AcknowledgementMode AcknowledgementMode { get; set; }
/// <summary>
/// Client identifier of this connection. It can be assigned once, with a non-empty value,
/// and only before the connection is connected; a successful assignment immediately
/// triggers the connect.
/// </summary>
public string ClientId
{
    get { return ConnectionInfo.ClientId; }
    set
    {
        if (ConnectionInfo.IsExplicitClientId)
            throw new IllegalStateException("The clientId has already been set");

        if (string.IsNullOrEmpty(value))
            throw new InvalidClientIDException("Cannot have a null or empty clientId");

        if (connected)
            throw new IllegalStateException("Cannot set the client id once connected.");

        ConnectionInfo.SetClientId(value, true);

        // Not connected yet at this point, so connect now to make sure the
        // configured clientId is valid.
        CreateNmsConnection();
    }
}
/// <summary>
/// Redelivery policy for this connection. It is only stored here; nothing in this class reads it
/// (presumably consumed by sessions/consumers — confirm).
/// </summary>
public IRedeliveryPolicy RedeliveryPolicy { get; set; }
/// <summary>Connection metadata, initialised to <c>ConnectionMetaData.Version</c>.</summary>
public IConnectionMetaData MetaData { get; } = ConnectionMetaData.Version;
/// <summary>Raised from <c>OnAsyncException</c>, e.g. when a connection failure is reported.</summary>
public event ExceptionListener ExceptionListener;
/// <summary>Raised at the end of <c>OnConnectionInterrupted</c>.</summary>
public event ConnectionInterruptedListener ConnectionInterruptedListener;
/// <summary>Raised at the end of <c>OnConnectionRestored</c>.</summary>
public event ConnectionResumedListener ConnectionResumedListener;
/// <summary>
/// Routes an inbound message to the session that owns the target consumer and then
/// informs every connection listener about the message.
/// </summary>
/// <param name="envelope">Dispatch envelope carrying the message and its consumer info.</param>
/// <remarks>
/// When the session cannot be found an error is traced; listeners are still notified.
/// </remarks>
public void OnInboundMessage(InboundMessageDispatch envelope)
{
    NmsSession targetSession;
    bool sessionKnown = sessions.TryGetValue(envelope.ConsumerInfo.SessionId, out targetSession);

    if (sessionKnown)
    {
        targetSession.OnInboundMessage(envelope);
    }
    else
    {
        Tracer.Error($"Could not dispatch message {envelope.Message.NMSMessageId} because session {envelope.ConsumerInfo.SessionId} not found.");
    }

    // Enumerating an empty set is a no-op, so no separate emptiness check is needed.
    foreach (INmsConnectionListener registeredListener in connectionListeners)
    {
        registeredListener.OnInboundMessage(envelope.Message);
    }
}
/// <summary>
/// Handles an unrecoverable connection failure: records the first failure cause, raises
/// the exception event and, unless the connection is already closed, closes the provider,
/// shuts down all sessions with the failure and notifies the connection listeners.
/// </summary>
/// <param name="exception">The failure reported by the provider.</param>
public void OnConnectionFailure(NMSException exception)
{
    // Only the first reported failure is kept as the cause.
    Interlocked.CompareExchange(ref failureCause, exception, null);
    OnAsyncException(exception);

    if (closed)
    {
        return;
    }

    try
    {
        provider?.Close();
    }
    catch (Exception closeError)
    {
        Tracer.Error($"Error while closing failed Provider: {closeError.Message}");
    }

    connected.Set(false);

    try
    {
        NmsSession[] affectedSessions = sessions.Values.ToArray();
        foreach (NmsSession affectedSession in affectedSessions)
        {
            affectedSession.Shutdown(exception);
        }
    }
    catch (NMSException cleanupError)
    {
        Tracer.Error($"Exception during connection cleanup, {cleanupError}");
    }

    foreach (INmsConnectionListener registeredListener in connectionListeners)
    {
        registeredListener.OnConnectionFailure(exception);
    }
}
/// <summary>
/// Rebuilds this connection's remote state on a recovering provider: first every tracked
/// temporary destination is re-created, then each session performs its own recovery.
/// </summary>
/// <param name="provider">
/// Provider to recover on. Note that this parameter hides the <c>provider</c> field, so the
/// supplied instance is the one used here.
/// </param>
/// <returns>A task that completes when all resources have been processed, one after another.</returns>
public async Task OnConnectionRecovery(IProvider provider)
{
    foreach (NmsTemporaryDestination tempDestination in tempDestinations.Values)
    {
        // ConfigureAwait(false) matches the session loop below and every other await/blocking
        // wait in this class; the continuation has no need for the captured context.
        await provider.CreateResource(tempDestination).ConfigureAwait(false);
    }

    foreach (NmsSession session in sessions.Values)
    {
        await session.OnConnectionRecovery(provider).ConfigureAwait(false);
    }
}
/// <summary>
/// Called once the provider has connected: traces the remote endpoint, picks up the
/// provider's message factory and notifies the connection listeners.
/// </summary>
/// <param name="remoteUri">Address of the remote broker that was connected to.</param>
public void OnConnectionEstablished(Uri remoteUri)
{
    Tracer.Info($"Connection {ConnectionInfo.Id} connected to remote Broker: {remoteUri.Scheme}://{remoteUri.Host}:{remoteUri.Port}");

    MessageFactory = provider.MessageFactory;

    foreach (INmsConnectionListener registeredListener in connectionListeners)
        registeredListener.OnConnectionEstablished(remoteUri);
}
/// <summary>
/// Final recovery step: forwards the recovered provider to every session, awaiting each
/// session in turn.
/// </summary>
/// <param name="provider">The provider that has been recovered (hides the field of the same name).</param>
/// <returns>A task that completes after all sessions have finished.</returns>
public async Task OnConnectionRecovered(IProvider provider)
{
    Tracer.Debug($"Connection {ConnectionInfo.Id} is finalizing recovery.");

    foreach (NmsSession recoveringSession in sessions.Values)
        await recoveringSession.OnConnectionRecovered(provider).ConfigureAwait(false);
}
/// <summary>
/// Called when a lost connection is back: refreshes the message factory from the provider,
/// notifies the connection listeners and finally raises <c>ConnectionResumedListener</c>.
/// </summary>
/// <param name="remoteUri">Address the connection was restored to.</param>
public void OnConnectionRestored(Uri remoteUri)
{
    MessageFactory = provider.MessageFactory;

    foreach (INmsConnectionListener registeredListener in connectionListeners)
        registeredListener.OnConnectionRestored(remoteUri);

    ConnectionResumedListener resumedHandler = ConnectionResumedListener;
    if (resumedHandler != null)
    {
        resumedHandler();
    }
}
/// <summary>
/// Reacts to a consumer or producer that was closed: the owning session is told about the
/// closure and, if it returns the affected object, the connection listeners are notified.
/// Other resource kinds, unknown sessions and null results from the session are ignored.
/// </summary>
/// <param name="resource">Info object of the closed resource.</param>
/// <param name="error">Error associated with the closure; passed through unchanged.</param>
public void OnResourceClosed(INmsResource resource, Exception error)
{
    if (resource is NmsConsumerInfo consumerInfo)
    {
        if (sessions.TryGetValue(consumerInfo.SessionId, out NmsSession consumerSession))
        {
            NmsMessageConsumer closedConsumer = consumerSession.ConsumerClosed(consumerInfo.Id, error);
            if (closedConsumer != null)
            {
                foreach (INmsConnectionListener registeredListener in connectionListeners)
                {
                    registeredListener.OnConsumerClosed(closedConsumer, error);
                }
            }
        }
    }
    else if (resource is NmsProducerInfo producerInfo)
    {
        if (sessions.TryGetValue(producerInfo.SessionId, out NmsSession producerSession))
        {
            NmsMessageProducer closedProducer = producerSession.ProducerClosed(producerInfo.Id, error);
            if (closedProducer != null)
            {
                foreach (INmsConnectionListener registeredListener in connectionListeners)
                {
                    registeredListener.OnProducerClosed(closedProducer, error);
                }
            }
        }
    }
}
/// <summary>
/// Called when the connection is interrupted: informs every session, then every connection
/// listener, and finally raises <c>ConnectionInterruptedListener</c>.
/// </summary>
/// <param name="failedUri">Address of the peer the connection was lost to.</param>
public void OnConnectionInterrupted(Uri failedUri)
{
    foreach (NmsSession interruptedSession in sessions.Values)
        interruptedSession.OnConnectionInterrupted();

    foreach (INmsConnectionListener registeredListener in connectionListeners)
    {
        registeredListener.OnConnectionInterrupted(failedUri);
    }

    ConnectionInterruptedListener interruptedHandler = ConnectionInterruptedListener;
    if (interruptedHandler != null)
    {
        interruptedHandler();
    }
}
/// <summary>
/// Guard used by public operations: throws when the connection has been closed or a
/// connection failure has been recorded.
/// </summary>
private void CheckClosedOrFailed()
{
    if (closed)
        throw new IllegalStateException("The Connection is closed");

    Exception recordedFailure = failureCause;
    if (recordedFailure == null)
        return;

    // Surface the recorded failure, keeping it as the inner exception.
    throw new NMSConnectionException(recordedFailure.Message, recordedFailure);
}
/// <summary>Asks the provider to create the given resource.</summary>
/// <param name="resource">Resource to create.</param>
/// <returns>The task returned by the provider.</returns>
internal Task CreateResource(INmsResource resource) => provider.CreateResource(resource);
/// <summary>Asks the provider to destroy the given resource.</summary>
/// <param name="resource">Resource to destroy.</param>
/// <returns>The task returned by the provider.</returns>
internal Task DestroyResource(INmsResource resource) => provider.DestroyResource(resource);
/// <summary>Hands an outbound message envelope to the provider for sending.</summary>
/// <param name="envelope">Envelope describing the message to send.</param>
/// <returns>The task returned by the provider.</returns>
internal Task Send(OutboundMessageDispatch envelope) => provider.Send(envelope);
/// <summary>
/// Runs each session's own <c>CheckIsOnDeliveryThread</c>; used by <c>DoStop</c> and
/// <c>Close</c> before they change connection state.
/// </summary>
private void CheckIsOnDeliveryThread()
{
    foreach (NmsSession candidate in sessions.Values)
        candidate.CheckIsOnDeliveryThread();
}
/// <summary>
/// Connects the provider if this connection is neither connected nor closed. The state is
/// checked once without the lock and again while holding <c>syncRoot</c>; the connect call
/// blocks until the provider completes.
/// </summary>
/// <remarks>
/// When connecting fails the provider is closed on a best-effort basis and the original
/// error is rethrown after conversion by <c>NMSExceptionSupport.Create</c>.
/// </remarks>
private void CreateNmsConnection()
{
    bool nothingToDo = connected || closed;
    if (nothingToDo)
        return;

    lock (syncRoot)
    {
        // State may have changed while waiting for the lock.
        if (closed || connected)
            return;

        try
        {
            Task connectTask = provider.Connect(ConnectionInfo);
            connectTask.ConfigureAwait(false).GetAwaiter().GetResult();
            connected.Set(true);
        }
        catch (Exception connectError)
        {
            try
            {
                provider.Close();
            }
            catch
            {
                // ignored: the connect error below is the one worth reporting
            }

            throw NMSExceptionSupport.Create(connectError);
        }
    }
}
/// <summary>Raises <c>ExceptionListener</c> with the given error, if anyone is subscribed.</summary>
/// <param name="error">Error to report to subscribers.</param>
internal void OnAsyncException(Exception error) => ExceptionListener?.Invoke(error);
/// <summary>Forwards a recover request for the given session to the provider.</summary>
/// <param name="sessionId">Id of the session to recover.</param>
/// <returns>The task returned by the provider.</returns>
internal Task Recover(NmsSessionId sessionId) => provider.Recover(sessionId);
/// <summary>Asks the provider to start the given resource.</summary>
/// <param name="resourceInfo">Resource to start.</param>
/// <returns>The task returned by the provider.</returns>
public Task StartResource(INmsResource resourceInfo) => provider.StartResource(resourceInfo);
/// <summary>Asks the provider to stop the given resource.</summary>
/// <param name="resourceInfo">Resource to stop.</param>
/// <returns>The task returned by the provider.</returns>
public Task StopResource(INmsResource resourceInfo) => provider.StopResource(resourceInfo);
/// <summary>
/// Registers a listener for connection-level notifications. Listeners are held in a set,
/// so adding one that is already present has no effect.
/// </summary>
/// <param name="listener">Listener to register.</param>
public void AddConnectionListener(INmsConnectionListener listener) => connectionListeners.Add(listener);
/// <summary>Forwards a session-wide acknowledgement to the provider.</summary>
/// <param name="sessionId">Id of the session being acknowledged.</param>
/// <param name="ackType">Kind of acknowledgement to apply.</param>
/// <returns>The task returned by the provider.</returns>
internal Task Acknowledge(NmsSessionId sessionId, AckType ackType) => provider.Acknowledge(sessionId, ackType);
/// <summary>Forwards the acknowledgement of a single delivered message to the provider.</summary>
/// <param name="envelope">Envelope of the message being acknowledged.</param>
/// <param name="ackType">Kind of acknowledgement to apply.</param>
/// <returns>The task returned by the provider.</returns>
internal Task Acknowledge(InboundMessageDispatch envelope, AckType ackType) => provider.Acknowledge(envelope, ackType);
/// <summary>
/// Drops the session with the given info from this connection's session table; nothing
/// happens when it is not present.
/// </summary>
/// <param name="sessionInfo">Info of the session to forget.</param>
internal void RemoveSession(NmsSessionInfo sessionInfo) => sessions.TryRemove(sessionInfo.Id, out _);
/// <summary>
/// Creates a temporary queue named "&lt;connection id&gt;:&lt;next queue counter value&gt;" and
/// registers it with the provider and this connection.
/// </summary>
/// <returns>The newly created temporary queue.</returns>
public ITemporaryQueue CreateTemporaryQueue()
{
    NmsTemporaryQueue temporaryQueue =
        new NmsTemporaryQueue($"{Id}:{temporaryQueueIdGenerator.IncrementAndGet().ToString()}");

    InitializeTemporaryDestination(temporaryQueue);
    return temporaryQueue;
}
/// <summary>
/// Creates a temporary topic named "&lt;connection id&gt;:&lt;next topic counter value&gt;" and
/// registers it with the provider and this connection.
/// </summary>
/// <returns>The newly created temporary topic.</returns>
public ITemporaryTopic CreateTemporaryTopic()
{
    var temporaryTopic =
        new NmsTemporaryTopic($"{Id}:{temporaryTopicIdGenerator.IncrementAndGet().ToString()}");

    InitializeTemporaryDestination(temporaryTopic);
    return temporaryTopic;
}
/// <summary>
/// Creates the temporary destination on the provider (blocking until done), starts tracking
/// it in <c>tempDestinations</c> and marks this connection as its owner.
/// </summary>
/// <param name="temporaryDestination">Destination to create and track.</param>
private void InitializeTemporaryDestination(NmsTemporaryDestination temporaryDestination)
{
    Task creation = CreateResource(temporaryDestination);
    creation.ConfigureAwait(false).GetAwaiter().GetResult();

    // Only tracked and bound to this connection once the provider has created it.
    tempDestinations.TryAdd(temporaryDestination, temporaryDestination);
    temporaryDestination.Connection = this;
}
/// <summary>
/// Verifies that the temporary destination belongs to this connection.
/// </summary>
/// <param name="destination">Temporary destination a consumer is about to use.</param>
/// <remarks>Throws <c>InvalidDestinationException</c> when the destination's connection is a different one.</remarks>
internal void CheckConsumeFromTemporaryDestination(NmsTemporaryDestination destination)
{
    bool ownedByThisConnection = Equals(this, destination.Connection);
    if (ownedByThisConnection)
    {
        return;
    }

    throw new InvalidDestinationException("Can't consume from a temporary destination created using another connection");
}
/// <summary>
/// Deletes a temporary destination: refuses when any session reports it as in use,
/// otherwise stops tracking it and destroys it on the provider (blocking until done).
/// </summary>
/// <param name="destination">Temporary destination to delete.</param>
/// <remarks>
/// Every exception raised inside the operation, including the "in use" error, is passed
/// through <c>NMSExceptionSupport.Create</c> before it reaches the caller.
/// </remarks>
public void DeleteTemporaryDestination(NmsTemporaryDestination destination)
{
    CheckClosedOrFailed();

    try
    {
        bool stillConsumed = sessions.Values.Any(candidate => candidate.IsDestinationInUse(destination));
        if (stillConsumed)
        {
            throw new IllegalStateException("A consumer is consuming from the temporary destination");
        }

        tempDestinations.TryRemove(destination, out _);

        Task destruction = DestroyResource(destination);
        destruction.ConfigureAwait(false).GetAwaiter().GetResult();
    }
    catch (Exception deleteError)
    {
        throw NMSExceptionSupport.Create(deleteError);
    }
}
/// <summary>
/// Removes the named subscription through the provider, blocking until the provider
/// completes. Fails up front when the connection is closed or has failed.
/// </summary>
/// <param name="subscriptionName">Name of the subscription to remove.</param>
public void Unsubscribe(string subscriptionName)
{
    CheckClosedOrFailed();

    Task pendingUnsubscribe = provider.Unsubscribe(subscriptionName);
    pendingUnsubscribe.ConfigureAwait(false).GetAwaiter().GetResult();
}
/// <summary>Forwards a transaction rollback to the provider.</summary>
/// <param name="transactionInfo">Transaction to roll back.</param>
/// <param name="nextTransactionInfo">Info of the follow-up transaction, handed to the provider as is.</param>
/// <returns>The task returned by the provider.</returns>
public Task Rollback(NmsTransactionInfo transactionInfo, NmsTransactionInfo nextTransactionInfo) =>
    provider.Rollback(transactionInfo, nextTransactionInfo);
/// <summary>Forwards a transaction commit to the provider.</summary>
/// <param name="transactionInfo">Transaction to commit.</param>
/// <param name="nextTransactionInfo">Info of the follow-up transaction, handed to the provider as is.</param>
/// <returns>The task returned by the provider.</returns>
public Task Commit(NmsTransactionInfo transactionInfo, NmsTransactionInfo nextTransactionInfo) =>
    provider.Commit(transactionInfo, nextTransactionInfo);
/// <summary>
/// Builds the id for a new session from this connection's id and the next value of the
/// session counter.
/// </summary>
/// <returns>A new session id.</returns>
private NmsSessionId GetNextSessionId() =>
    new NmsSessionId(ConnectionInfo.Id, sessionIdGenerator.IncrementAndGet());
/// <summary>
/// Builds the id for a new transaction from this connection's id and the next value of the
/// transaction counter.
/// </summary>
/// <returns>A new transaction id.</returns>
public NmsTransactionId GetNextTransactionId()
{
    // Advance the counter on every call, exactly as GetNextSessionId does. Passing the
    // generator object itself never incremented it, so each transaction id was built
    // from the same counter value.
    return new NmsTransactionId(Id, transactionIdGenerator.IncrementAndGet());
}
}
}