| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.Collections.Concurrent; |
| using System.Collections.Generic; |
| using System.Linq; |
| using System.Threading; |
| using System.Threading.Tasks; |
| using Apache.NMS.AMQP.Message; |
| using Apache.NMS.AMQP.Meta; |
| using Apache.NMS.AMQP.Provider; |
| using Apache.NMS.AMQP.Util; |
| using Apache.NMS.Util; |
| |
| namespace Apache.NMS.AMQP |
| { |
| public class NmsConnection : IConnection, IProviderListener |
| { |
// Lifecycle flags.
private readonly AtomicBool closed = new AtomicBool();
private readonly AtomicBool connected = new AtomicBool();
private readonly AtomicBool started = new AtomicBool();

// Counters feeding the numeric part of generated identifiers and names.
private readonly AtomicLong sessionIdGenerator = new AtomicLong();
private readonly AtomicLong temporaryQueueIdGenerator = new AtomicLong();
private readonly AtomicLong temporaryTopicIdGenerator = new AtomicLong();
private readonly AtomicLong transactionIdGenerator = new AtomicLong();

// Sessions, temporary destinations and listeners registered on this connection.
private readonly ConcurrentDictionary<NmsSessionId, NmsSession> sessions = new ConcurrentDictionary<NmsSessionId, NmsSession>();
private readonly ConcurrentDictionary<NmsTemporaryDestination, NmsTemporaryDestination> tempDestinations = new ConcurrentDictionary<NmsTemporaryDestination, NmsTemporaryDestination>();
private readonly HashSet<INmsConnectionListener> connectionListeners = new HashSet<INmsConnectionListener>();

// Provider every remote operation is delegated to.
private readonly IProvider provider;

// Lock object used by CreateNmsConnection while connecting.
private readonly object syncRoot = new object();

// First failure recorded by OnConnectionFailure; null until then.
private Exception failureCause;
| |
/// <summary>
/// Creates a connection on top of the given provider, registers this instance as the
/// provider's listener and starts the provider.
/// </summary>
/// <param name="connectionInfo">Connection settings, exposed through <see cref="ConnectionInfo"/>.</param>
/// <param name="provider">Provider that remote operations are delegated to; must not be null.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="provider"/> is null.</exception>
public NmsConnection(NmsConnectionInfo connectionInfo, IProvider provider)
{
    if (provider == null)
    {
        throw new ArgumentNullException(nameof(provider), "Remote provider instance not set.");
    }

    ConnectionInfo = connectionInfo;
    this.provider = provider;
    provider.SetProviderListener(this);

    try
    {
        provider.Start();
    }
    catch (Exception e)
    {
        // Any start-up failure is rethrown as the exception built by NMSExceptionSupport.Create.
        throw NMSExceptionSupport.Create(e);
    }
}
| |
/// <summary>Connection settings supplied to the constructor.</summary>
public NmsConnectionInfo ConnectionInfo { get; }

/// <summary>Current value of the closed flag.</summary>
public bool IsClosed
{
    get { return closed.Value; }
}

/// <summary>Current value of the connected flag.</summary>
public bool IsConnected
{
    get { return connected.Value; }
}

/// <summary>Identifier taken from <see cref="ConnectionInfo"/>.</summary>
public NmsConnectionId Id
{
    get { return ConnectionInfo.Id; }
}

/// <summary>Message factory copied from the provider when the connection is established or restored.</summary>
public INmsMessageFactory MessageFactory { get; private set; }
| |
/// <summary>
/// Closes the connection. An exception thrown by <see cref="Close"/> is traced at debug
/// level and not propagated.
/// </summary>
public void Dispose()
{
    try
    {
        Close();
    }
    catch (Exception disposeError)
    {
        string typeName = GetType().Name;
        Tracer.DebugFormat("Caught exception while disposing {0} {1}. Exception {2}", typeName, ConnectionInfo.Id, disposeError);
    }
}
| |
/// <summary>Stops the connection after verifying it is neither closed nor failed.</summary>
public void Stop() => DoStop(true);
| |
/// <summary>
/// Flips the started flag from true to false and, when that transition happens,
/// stops every registered session.
/// </summary>
/// <param name="checkClosed">When true, <see cref="CheckClosedOrFailed"/> runs first.</param>
private void DoStop(bool checkClosed)
{
    if (checkClosed)
    {
        CheckClosedOrFailed();
    }

    CheckIsOnDeliveryThread();

    // Nothing to do when the connection was not started.
    if (!started.CompareAndSet(true, false))
    {
        return;
    }

    foreach (NmsSession activeSession in sessions.Values)
    {
        activeSession.Stop();
    }
}
| |
/// <summary>Creates a session using the connection's current <see cref="AcknowledgementMode"/>.</summary>
public ISession CreateSession() => CreateSession(AcknowledgementMode);
| |
/// <summary>
/// Creates a session with the given acknowledgement mode, connecting first if necessary.
/// The call blocks until the session's Begin task completes; the session is then
/// registered and, if the connection is started, started as well.
/// </summary>
/// <param name="acknowledgementMode">Acknowledgement mode handed to the new session.</param>
/// <returns>The newly created session.</returns>
public ISession CreateSession(AcknowledgementMode acknowledgementMode)
{
    CheckClosedOrFailed();
    CreateNmsConnection();

    NmsSessionId sessionId = GetNextSessionId();
    var newSession = new NmsSession(this, sessionId, acknowledgementMode);
    try
    {
        newSession.Begin().ConfigureAwait(false).GetAwaiter().GetResult();
        sessions.TryAdd(newSession.SessionInfo.Id, newSession);
        if (started)
        {
            newSession.Start();
        }
    }
    catch (Exception ex) when (!(ex is NMSException))
    {
        // NMSExceptions pass through untouched; everything else is wrapped.
        throw ExceptionSupport.Wrap(ex, "Failed to establish amqp Session.");
    }

    return newSession;
}
| |
/// <summary>
/// Closes the connection once: stops it, shuts down every session, closes the provider
/// (provider errors are traced and ignored) and resets the started/connected flags.
/// Subsequent calls do nothing beyond the delivery-thread check.
/// </summary>
public void Close()
{
    CheckIsOnDeliveryThread();

    // Only the caller that flips the flag performs the shutdown.
    if (!closed.CompareAndSet(false, true))
    {
        return;
    }

    DoStop(false);

    foreach (NmsSession openSession in sessions.Values)
    {
        openSession.Shutdown(null);
    }

    try
    {
        provider.Close();
    }
    catch (Exception)
    {
        Tracer.Debug("Ignoring provider exception during connection close");
    }

    sessions.Clear();
    started.Set(false);
    connected.Set(false);
}
| |
/// <summary>Not implemented; always throws.</summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public void PurgeTempDestinations() => throw new NotImplementedException();
| |
/// <summary>
/// Connects if necessary and, when the started flag flips from false to true,
/// starts every session registered at that moment.
/// </summary>
public void Start()
{
    CreateNmsConnection();

    if (!started.CompareAndSet(false, true))
    {
        return;
    }

    NmsSession[] registered = sessions.Values.ToArray();
    foreach (NmsSession current in registered)
    {
        current.Start();
    }
}
| |
/// <summary>Current value of the started flag.</summary>
public bool IsStarted
{
    get { return started; }
}

public ConsumerTransformerDelegate ConsumerTransformer { get; set; }

public ProducerTransformerDelegate ProducerTransformer { get; set; }

/// <summary>
/// Request timeout, stored on <see cref="ConnectionInfo"/> as a millisecond count.
/// </summary>
public TimeSpan RequestTimeout
{
    get
    {
        return TimeSpan.FromMilliseconds(ConnectionInfo.RequestTimeout);
    }
    set
    {
        ConnectionInfo.RequestTimeout = Convert.ToInt64(value.TotalMilliseconds);
    }
}

/// <summary>Acknowledgement mode used by the parameterless <c>CreateSession()</c>.</summary>
public AcknowledgementMode AcknowledgementMode { get; set; }
| |
/// <summary>
/// Client identifier held by <see cref="ConnectionInfo"/>. It can be set once, to a
/// non-empty value, and only before the connection is connected; setting it triggers
/// the connect attempt.
/// </summary>
public string ClientId
{
    get
    {
        return ConnectionInfo.ClientId;
    }
    set
    {
        if (ConnectionInfo.IsExplicitClientId)
        {
            throw new IllegalStateException("The clientId has already been set");
        }

        if (string.IsNullOrEmpty(value))
        {
            throw new InvalidClientIDException("Cannot have a null or empty clientId");
        }

        if (connected)
        {
            throw new IllegalStateException("Cannot set the client id once connected.");
        }

        ConnectionInfo.SetClientId(value, true);

        // Not connected at this point: connect now so the configured clientId
        // gets validated right away.
        CreateNmsConnection();
    }
}
| |
public IRedeliveryPolicy RedeliveryPolicy { get; set; }

/// <summary>Connection metadata, initialised from <c>ConnectionMetaData.Version</c>.</summary>
public IConnectionMetaData MetaData { get; } = ConnectionMetaData.Version;

/// <summary>Raised by <c>OnAsyncException</c> with the reported error.</summary>
public event ExceptionListener ExceptionListener;

/// <summary>Raised at the end of <c>OnConnectionInterrupted</c>.</summary>
public event ConnectionInterruptedListener ConnectionInterruptedListener;

/// <summary>Raised at the end of <c>OnConnectionRestored</c>.</summary>
public event ConnectionResumedListener ConnectionResumedListener;
| |
/// <summary>
/// Routes an inbound message to the session named by its consumer info (logging an
/// error when that session is not registered) and then hands the message to every
/// connection listener.
/// </summary>
/// <param name="envelope">Dispatch envelope carrying the message and its consumer info.</param>
public void OnInboundMessage(InboundMessageDispatch envelope)
{
    var targetSessionId = envelope.ConsumerInfo.SessionId;
    if (!sessions.TryGetValue(targetSessionId, out NmsSession owner))
    {
        Tracer.Error($"Could not dispatch message {envelope.Message.NMSMessageId} because session {targetSessionId} not found.");
    }
    else
    {
        owner.OnInboundMessage(envelope);
    }

    if (!connectionListeners.Any())
    {
        return;
    }

    foreach (INmsConnectionListener observer in connectionListeners)
    {
        observer.OnInboundMessage(envelope.Message);
    }
}
| |
/// <summary>
/// Records the first failure cause, raises the exception event and, unless the
/// connection is already closed, closes the provider, marks the connection as
/// disconnected, shuts down all sessions with the failure and notifies listeners.
/// </summary>
/// <param name="exception">Failure reported by the provider.</param>
public void OnConnectionFailure(NMSException exception)
{
    // Only the first reported failure is retained.
    Interlocked.CompareExchange(ref failureCause, exception, null);

    OnAsyncException(exception);

    if (closed)
    {
        return;
    }

    try
    {
        provider?.Close();
    }
    catch (Exception closeError)
    {
        Tracer.Error($"Error while closing failed Provider: {closeError.Message}");
    }

    connected.Set(false);

    try
    {
        NmsSession[] affected = sessions.Values.ToArray();
        foreach (NmsSession failedSession in affected)
        {
            failedSession.Shutdown(exception);
        }
    }
    catch (NMSException cleanupError)
    {
        Tracer.Error($"Exception during connection cleanup, {cleanupError}");
    }

    foreach (INmsConnectionListener observer in connectionListeners)
    {
        observer.OnConnectionFailure(exception);
    }
}
| |
/// <summary>
/// Re-creates this connection's resources on the supplied provider: first every
/// tracked temporary destination, then each session's own recovery, one at a time.
/// </summary>
/// <param name="provider">Provider on which the resources are re-created.</param>
/// <returns>A task that completes when all resources have been processed.</returns>
public async Task OnConnectionRecovery(IProvider provider)
{
    foreach (NmsTemporaryDestination tempDestination in tempDestinations.Values)
    {
        // ConfigureAwait(false) matches the session loop below: no need to resume
        // on a captured synchronization context here.
        await provider.CreateResource(tempDestination).ConfigureAwait(false);
    }

    foreach (NmsSession session in sessions.Values)
    {
        await session.OnConnectionRecovery(provider).ConfigureAwait(false);
    }
}
| |
/// <summary>
/// Logs the remote endpoint, adopts the provider's message factory and notifies
/// every connection listener.
/// </summary>
/// <param name="remoteUri">Address of the remote peer that was connected to.</param>
public void OnConnectionEstablished(Uri remoteUri)
{
    string announcement = $"Connection {ConnectionInfo.Id} connected to remote Broker: {remoteUri.Scheme}://{remoteUri.Host}:{remoteUri.Port}";
    Tracer.Info(announcement);

    MessageFactory = provider.MessageFactory;

    foreach (INmsConnectionListener observer in connectionListeners)
    {
        observer.OnConnectionEstablished(remoteUri);
    }
}
| |
/// <summary>
/// Lets each registered session finish its recovery on the supplied provider,
/// one session at a time.
/// </summary>
/// <param name="provider">Provider passed through to each session.</param>
/// <returns>A task that completes once every session has been processed.</returns>
public async Task OnConnectionRecovered(IProvider provider)
{
    string notice = $"Connection {ConnectionInfo.Id} is finalizing recovery.";
    Tracer.Debug(notice);

    foreach (NmsSession recovering in sessions.Values)
    {
        await recovering.OnConnectionRecovered(provider).ConfigureAwait(false);
    }
}
| |
/// <summary>
/// Adopts the provider's message factory, notifies the connection listeners and
/// then raises <see cref="ConnectionResumedListener"/>.
/// </summary>
/// <param name="remoteUri">Address passed on to the listeners.</param>
public void OnConnectionRestored(Uri remoteUri)
{
    MessageFactory = provider.MessageFactory;

    foreach (INmsConnectionListener observer in connectionListeners)
    {
        observer.OnConnectionRestored(remoteUri);
    }

    ConnectionResumedListener resumed = ConnectionResumedListener;
    if (resumed != null)
    {
        resumed();
    }
}
| |
/// <summary>
/// Handles a resource being closed. For consumer and producer infos the owning
/// session is asked to close the matching object, and if it returns one the
/// connection listeners are notified. Other resource kinds are ignored.
/// </summary>
/// <param name="resource">Resource that was closed.</param>
/// <param name="error">Error passed on to the session and the listeners.</param>
public void OnResourceClosed(INmsResource resource, Exception error)
{
    if (resource is NmsConsumerInfo consumerInfo)
    {
        if (sessions.TryGetValue(consumerInfo.SessionId, out NmsSession consumerSession))
        {
            NmsMessageConsumer closedConsumer = consumerSession.ConsumerClosed(consumerInfo.Id, error);
            if (closedConsumer != null)
            {
                foreach (INmsConnectionListener observer in connectionListeners)
                {
                    observer.OnConsumerClosed(closedConsumer, error);
                }
            }
        }
    }
    else if (resource is NmsProducerInfo producerInfo)
    {
        if (sessions.TryGetValue(producerInfo.SessionId, out NmsSession producerSession))
        {
            NmsMessageProducer closedProducer = producerSession.ProducerClosed(producerInfo.Id, error);
            if (closedProducer != null)
            {
                foreach (INmsConnectionListener observer in connectionListeners)
                {
                    observer.OnProducerClosed(closedProducer, error);
                }
            }
        }
    }
}
| |
/// <summary>
/// Tells every session and every connection listener that the connection was
/// interrupted, then raises <see cref="ConnectionInterruptedListener"/>.
/// </summary>
/// <param name="failedUri">Address passed on to the listeners.</param>
public void OnConnectionInterrupted(Uri failedUri)
{
    foreach (NmsSession interrupted in sessions.Values)
    {
        interrupted.OnConnectionInterrupted();
    }

    foreach (INmsConnectionListener observer in connectionListeners)
    {
        observer.OnConnectionInterrupted(failedUri);
    }

    ConnectionInterruptedListener handler = ConnectionInterruptedListener;
    if (handler != null)
    {
        handler();
    }
}
| |
/// <summary>
/// Throws when the connection has been closed or a failure cause has been recorded.
/// </summary>
/// <exception cref="IllegalStateException">The connection is closed.</exception>
/// <exception cref="NMSConnectionException">A failure cause was recorded; it is attached as the inner exception.</exception>
private void CheckClosedOrFailed()
{
    if (closed)
    {
        throw new IllegalStateException("The Connection is closed");
    }

    Exception cause = failureCause;
    if (cause == null)
    {
        return;
    }

    throw new NMSConnectionException(cause.Message, cause);
}
| |
/// <summary>Forwards a create-resource request to the provider.</summary>
/// <returns>The task returned by the provider.</returns>
internal Task CreateResource(INmsResource resource) => provider.CreateResource(resource);
| |
/// <summary>Forwards a destroy-resource request to the provider.</summary>
/// <returns>The task returned by the provider.</returns>
internal Task DestroyResource(INmsResource resource) => provider.DestroyResource(resource);
| |
/// <summary>Hands an outbound message envelope to the provider.</summary>
/// <returns>The task returned by the provider.</returns>
internal Task Send(OutboundMessageDispatch envelope) => provider.Send(envelope);
| |
/// <summary>Runs each registered session's own delivery-thread check.</summary>
private void CheckIsOnDeliveryThread()
{
    foreach (NmsSession candidate in sessions.Values)
    {
        candidate.CheckIsOnDeliveryThread();
    }
}
| |
/// <summary>
/// Connects the provider if the connection is neither connected nor closed.
/// The flags are checked once before taking the lock and again inside it. On
/// failure the provider is closed (errors from that close are ignored) and the
/// exception built by NMSExceptionSupport.Create is thrown.
/// </summary>
private void CreateNmsConnection()
{
    if (!connected && !closed)
    {
        lock (syncRoot)
        {
            // Re-check under the lock: state may have changed while waiting.
            if (!closed && !connected)
            {
                try
                {
                    Task connecting = provider.Connect(ConnectionInfo);
                    connecting.ConfigureAwait(false).GetAwaiter().GetResult();
                    connected.Set(true);
                }
                catch (Exception connectError)
                {
                    try
                    {
                        provider.Close();
                    }
                    catch
                    {
                        // ignored
                    }

                    throw NMSExceptionSupport.Create(connectError);
                }
            }
        }
    }
}
| |
/// <summary>Raises <see cref="ExceptionListener"/> with the given error, if anyone is subscribed.</summary>
internal void OnAsyncException(Exception error)
{
    ExceptionListener subscribers = ExceptionListener;
    if (subscribers != null)
    {
        subscribers(error);
    }
}
| |
/// <summary>Forwards a recover request for the given session to the provider.</summary>
/// <returns>The task returned by the provider.</returns>
internal Task Recover(NmsSessionId sessionId) => provider.Recover(sessionId);
| |
/// <summary>Forwards a start-resource request to the provider.</summary>
/// <returns>The task returned by the provider.</returns>
public Task StartResource(INmsResource resourceInfo) => provider.StartResource(resourceInfo);
| |
/// <summary>Forwards a stop-resource request to the provider.</summary>
/// <returns>The task returned by the provider.</returns>
public Task StopResource(INmsResource resourceInfo) => provider.StopResource(resourceInfo);
| |
/// <summary>Adds a listener to the set notified of connection events.</summary>
/// <param name="listener">Listener to register; adding one already in the set has no effect.</param>
public void AddConnectionListener(INmsConnectionListener listener) => connectionListeners.Add(listener);
| |
/// <summary>Forwards a session-level acknowledgement to the provider.</summary>
/// <returns>The task returned by the provider.</returns>
internal Task Acknowledge(NmsSessionId sessionId, AckType ackType) => provider.Acknowledge(sessionId, ackType);
| |
/// <summary>Forwards an acknowledgement for a single inbound envelope to the provider.</summary>
/// <returns>The task returned by the provider.</returns>
internal Task Acknowledge(InboundMessageDispatch envelope, AckType ackType) => provider.Acknowledge(envelope, ackType);
| |
/// <summary>Removes the session with the given info's id from the registry, if present.</summary>
internal void RemoveSession(NmsSessionInfo sessionInfo) => sessions.TryRemove(sessionInfo.Id, out _);
| |
/// <summary>
/// Creates a temporary queue named "{connection id}:{next queue counter value}"
/// and initializes it on this connection.
/// </summary>
/// <returns>The new temporary queue.</returns>
public ITemporaryQueue CreateTemporaryQueue()
{
    var connectionId = Id;
    var sequence = temporaryQueueIdGenerator.IncrementAndGet();

    NmsTemporaryQueue temporaryQueue = new NmsTemporaryQueue($"{connectionId}:{sequence.ToString()}");
    InitializeTemporaryDestination(temporaryQueue);
    return temporaryQueue;
}
| |
/// <summary>
/// Creates a temporary topic named "{connection id}:{next topic counter value}"
/// and initializes it on this connection.
/// </summary>
/// <returns>The new temporary topic.</returns>
public ITemporaryTopic CreateTemporaryTopic()
{
    var connectionId = Id;
    var sequence = temporaryTopicIdGenerator.IncrementAndGet();

    var temporaryTopic = new NmsTemporaryTopic($"{connectionId}:{sequence.ToString()}");
    InitializeTemporaryDestination(temporaryTopic);
    return temporaryTopic;
}
| |
/// <summary>
/// Creates the destination through the provider (blocking until that completes),
/// tracks it on this connection and marks this connection as its owner.
/// </summary>
private void InitializeTemporaryDestination(NmsTemporaryDestination temporaryDestination)
{
    Task creation = CreateResource(temporaryDestination);
    creation.ConfigureAwait(false).GetAwaiter().GetResult();

    tempDestinations.TryAdd(temporaryDestination, temporaryDestination);
    temporaryDestination.Connection = this;
}
| |
/// <summary>Verifies that the temporary destination belongs to this connection.</summary>
/// <exception cref="InvalidDestinationException">The destination's connection is not equal to this one.</exception>
internal void CheckConsumeFromTemporaryDestination(NmsTemporaryDestination destination)
{
    if (Equals(this, destination.Connection))
    {
        return;
    }

    throw new InvalidDestinationException("Can't consume from a temporary destination created using another connection");
}
| |
/// <summary>
/// Deletes a temporary destination: refuses while any session reports it in use,
/// otherwise stops tracking it and destroys it through the provider (blocking).
/// Every exception raised along the way, including the in-use one, is rethrown as
/// the exception built by NMSExceptionSupport.Create.
/// </summary>
/// <param name="destination">Temporary destination to delete.</param>
public void DeleteTemporaryDestination(NmsTemporaryDestination destination)
{
    CheckClosedOrFailed();

    try
    {
        bool inUse = sessions.Values.Any(candidate => candidate.IsDestinationInUse(destination));
        if (inUse)
        {
            throw new IllegalStateException("A consumer is consuming from the temporary destination");
        }

        tempDestinations.TryRemove(destination, out _);

        Task destruction = DestroyResource(destination);
        destruction.ConfigureAwait(false).GetAwaiter().GetResult();
    }
    catch (Exception failure)
    {
        throw NMSExceptionSupport.Create(failure);
    }
}
| |
/// <summary>
/// Asks the provider to remove the named subscription and blocks until it finishes.
/// </summary>
/// <param name="subscriptionName">Name passed to the provider.</param>
public void Unsubscribe(string subscriptionName)
{
    CheckClosedOrFailed();

    Task pending = provider.Unsubscribe(subscriptionName);
    pending.ConfigureAwait(false).GetAwaiter().GetResult();
}
| |
/// <summary>Forwards a rollback request, with the follow-up transaction info, to the provider.</summary>
/// <returns>The task returned by the provider.</returns>
public Task Rollback(NmsTransactionInfo transactionInfo, NmsTransactionInfo nextTransactionInfo) =>
    provider.Rollback(transactionInfo, nextTransactionInfo);
| |
/// <summary>Forwards a commit request, with the follow-up transaction info, to the provider.</summary>
/// <returns>The task returned by the provider.</returns>
public Task Commit(NmsTransactionInfo transactionInfo, NmsTransactionInfo nextTransactionInfo) =>
    provider.Commit(transactionInfo, nextTransactionInfo);
| |
/// <summary>Builds a session id from the connection id and the next session counter value.</summary>
private NmsSessionId GetNextSessionId()
{
    var sequence = sessionIdGenerator.IncrementAndGet();
    return new NmsSessionId(ConnectionInfo.Id, sequence);
}
| |
/// <summary>
/// Builds a transaction id from the connection id and the next transaction
/// counter value.
/// </summary>
/// <returns>A transaction id carrying a freshly incremented counter value.</returns>
public NmsTransactionId GetNextTransactionId()
{
    // Advance the counter on every call (as GetNextSessionId does) so that
    // successive transactions do not share the same id value.
    return new NmsTransactionId(Id, transactionIdGenerator.IncrementAndGet());
}
| } |
| } |