blob: 3410eb62c7ecc907414d7c0ce1451a60cbd2715b [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;
using Apache.NMS.AMQP.Message;
using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Provider.Amqp.Message;
using Apache.NMS.AMQP.Transport;
using Apache.NMS.AMQP.Util;
namespace Apache.NMS.AMQP.Provider.Amqp
public interface IAmqpConnection
string QueuePrefix { get; }
string TopicPrefix { get; }
bool ObjectMessageUsesAmqpTypes { get; }
public class AmqpConnection : IAmqpConnection
private readonly ConcurrentDictionary<Id, AmqpSession> sessions = new ConcurrentDictionary<Id, AmqpSession>();
private readonly ConcurrentDictionary<Id, AmqpTemporaryDestination> temporaryDestinations = new ConcurrentDictionary<Id, AmqpTemporaryDestination>();
public AmqpProvider Provider { get; }
private readonly ITransportContext transport;
private readonly Uri remoteUri;
private Connection underlyingConnection;
private readonly AmqpMessageFactory messageFactory;
private AmqpConnectionSession connectionSession;
private TaskCompletionSource<bool> tsc;
public AmqpConnection(AmqpProvider provider, ITransportContext transport, ConnectionInfo info)
this.Provider = provider;
this.transport = transport;
this.remoteUri = provider.RemoteUri;
this.Info = info;
this.messageFactory = new AmqpMessageFactory(this);
public Connection UnderlyingConnection => underlyingConnection;
public string QueuePrefix => Info.QueuePrefix;
public string TopicPrefix => Info.TopicPrefix;
public bool ObjectMessageUsesAmqpTypes { get; set; } = false;
public ConnectionInfo Info { get; }
public INmsMessageFactory MessageFactory => messageFactory;
internal async Task Start()
Address address = UriUtil.ToAddress(remoteUri, Info.username, Info.password);
this.tsc = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
underlyingConnection = await transport.CreateAsync(address, new AmqpHandler(this)).ConfigureAwait(false);
underlyingConnection.AddClosedCallback((sender, error) => Provider.OnConnectionClosed(error));
// Wait for connection to be opened
await tsc.Task.ConfigureAwait(false);
// Create a Session for this connection that is used for Temporary Destinations
// and perhaps later on management and advisory monitoring.
// TODO: change the way how connection session id is obtained
SessionInfo sessionInfo = new SessionInfo(Info.Id);
sessionInfo.AcknowledgementMode = AcknowledgementMode.AutoAcknowledge;
connectionSession = new AmqpConnectionSession(this, sessionInfo);
await connectionSession.Start().ConfigureAwait(false);
internal void OnLocalOpen(Open open)
open.ContainerId = Info.ClientId;
open.ChannelMax = Info.channelMax;
open.MaxFrameSize = Convert.ToUInt32(Info.maxFrameSize);
open.HostName = remoteUri.Host;
open.IdleTimeOut = Convert.ToUInt32(Info.idleTimout);
open.DesiredCapabilities = new[]
internal void OnRemoteOpened(Open open)
if (SymbolUtil.CheckAndCompareFields(open.Properties, SymbolUtil.CONNECTION_ESTABLISH_FAILED, SymbolUtil.BOOLEAN_TRUE))
Tracer.InfoFormat("Open response contains {0} property the connection {1} will soon be closed.",
object value = SymbolUtil.GetFromFields(open.Properties, SymbolUtil.CONNECTION_PROPERTY_TOPIC_PREFIX);
if (value is string topicPrefix)
Info.TopicPrefix = topicPrefix;
value = SymbolUtil.GetFromFields(open.Properties, SymbolUtil.CONNECTION_PROPERTY_QUEUE_PREFIX);
if (value is string queuePrefix)
Info.QueuePrefix = queuePrefix;
public async Task CreateSession(SessionInfo sessionInfo)
var amqpSession = new AmqpSession(this, sessionInfo);
await amqpSession.Start().ConfigureAwait(false);
sessions.TryAdd(sessionInfo.Id, amqpSession);
public void Close()
catch (Exception ex)
// log network errors
NMSException nmse = ExceptionSupport.Wrap(ex, "Amqp Connection close failure for NMS Connection {0}", this.Info.Id);
Tracer.DebugFormat("Caught Exception while closing Amqp Connection {0}. Exception {1}", this.Info.Id, nmse);
public AmqpSession GetSession(Id sessionId)
if (sessions.TryGetValue(sessionId, out AmqpSession session))
return session;
throw new InvalidOperationException($"Amqp Session {sessionId} doesn't exist and cannot be retrieved.");
public void RemoveSession(Id sessionId)
sessions.TryRemove(sessionId, out AmqpSession removedSession);
public async Task CreateTemporaryDestination(NmsTemporaryDestination destination)
AmqpTemporaryDestination amqpTemporaryDestination = new AmqpTemporaryDestination(connectionSession, destination);
await amqpTemporaryDestination.Attach();
temporaryDestinations.TryAdd(destination.Id, amqpTemporaryDestination);
public AmqpTemporaryDestination GetTemporaryDestination(NmsTemporaryDestination destination)
return temporaryDestinations.TryGetValue(destination.Id, out AmqpTemporaryDestination amqpTemporaryDestination) ? amqpTemporaryDestination : null;
public void RemoveTemporaryDestination(Id destinationId)
temporaryDestinations.TryRemove(destinationId, out _);
public Task Unsubscribe(string subscriptionName)
// check for any active consumers on the subscription name.
if (sessions.Values.Any(session => session.ContainsSubscriptionName(subscriptionName)))
throw new IllegalStateException("Cannot unsubscribe from Durable Consumer while consuming messages.");
return connectionSession.Unsubscribe(subscriptionName);