blob: 0e30ad657f5e5a57196731a65653980227dcc6cb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;
using Amqp.Types;
using Apache.NMS.AMQP.Message;
using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Provider.Amqp.Message;
using Apache.NMS.AMQP.Transport;
using Apache.NMS.AMQP.Util;
using Apache.NMS.AMQP.Util.Synchronization;
namespace Apache.NMS.AMQP.Provider.Amqp
{
/// <summary>
/// Read-only view of the connection-level settings that other AMQP provider
/// components consult. In this file it is implemented by <c>AmqpConnection</c>,
/// which hands itself to the message factory it creates.
/// </summary>
public interface IAmqpConnection
{
    /// <summary>
    /// Gets the queue prefix in effect for this connection.
    /// NOTE(review): presumably prepended to queue addresses; confirm against the consumers of this interface.
    /// </summary>
    string QueuePrefix { get; }

    /// <summary>
    /// Gets the topic prefix in effect for this connection.
    /// NOTE(review): presumably prepended to topic addresses; confirm against the consumers of this interface.
    /// </summary>
    string TopicPrefix { get; }

    /// <summary>
    /// Gets a flag controlling how object messages are encoded.
    /// NOTE(review): judging by the name, <c>true</c> selects AMQP-typed bodies; confirm against the message factory.
    /// </summary>
    bool ObjectMessageUsesAmqpTypes { get; }
}
/// <summary>
/// Provider-side wrapper around a single AMQP connection. It opens the
/// underlying <see cref="Connection"/> through the supplied transport, records
/// what the remote peer offered in its Open frame on <see cref="Info"/>, and
/// keeps track of the sessions and temporary destinations created on top of it.
/// </summary>
public class AmqpConnection : IAmqpConnection
{
    // Sessions registered by CreateSession, keyed by their NMS session id.
    private readonly ConcurrentDictionary<NmsSessionId, AmqpSession> sessions =
        new ConcurrentDictionary<NmsSessionId, AmqpSession>();

    // Temporary destinations registered by CreateTemporaryDestination.
    private readonly ConcurrentDictionary<NmsTemporaryDestination, AmqpTemporaryDestination> temporaryDestinations =
        new ConcurrentDictionary<NmsTemporaryDestination, AmqpTemporaryDestination>();

    private readonly ITransportContext transport;
    private readonly Uri remoteUri;
    private readonly AmqpMessageFactory messageFactory;

    // Assigned by Start once the transport has produced a connection.
    private Connection underlyingConnection;

    // Connection-level session created at the end of Start; used for temporary
    // destinations and for Unsubscribe.
    private AmqpConnectionSession connectionSession;

    // Completed by OnRemoteOpened on success, or faulted by OnClosed when the
    // connection closes with an error while Start is still waiting.
    private TaskCompletionSource<bool> remoteOpenSignal;

    /// <summary>
    /// Creates the wrapper; nothing is connected until <see cref="Start"/> runs.
    /// </summary>
    /// <param name="provider">Owning provider; also the source of the remote URI.</param>
    /// <param name="transport">Transport used to create the underlying connection.</param>
    /// <param name="info">Connection settings, updated later from the remote Open frame.</param>
    public AmqpConnection(AmqpProvider provider, ITransportContext transport, NmsConnectionInfo info)
    {
        Provider = provider;
        Info = info;
        this.transport = transport;
        remoteUri = provider.RemoteUri;

        // Created last: the factory receives this instance, so the members
        // above are assigned before it can look at them.
        messageFactory = new AmqpMessageFactory(this);
    }

    /// <summary>Gets the provider that owns this connection.</summary>
    public AmqpProvider Provider { get; }

    /// <summary>Gets the connection settings this instance was created with.</summary>
    public NmsConnectionInfo Info { get; }

    /// <summary>Gets the underlying AMQP connection, or <c>null</c> before <see cref="Start"/> has created it.</summary>
    public Connection UnderlyingConnection
    {
        get { return underlyingConnection; }
    }

    /// <summary>Gets the queue prefix currently stored on <see cref="Info"/>.</summary>
    public string QueuePrefix
    {
        get { return Info.QueuePrefix; }
    }

    /// <summary>Gets the topic prefix currently stored on <see cref="Info"/>.</summary>
    public string TopicPrefix
    {
        get { return Info.TopicPrefix; }
    }

    /// <summary>Gets or sets the object-message encoding flag exposed through <see cref="IAmqpConnection"/>; defaults to <c>false</c>.</summary>
    public bool ObjectMessageUsesAmqpTypes { get; set; }

    /// <summary>Gets the subscription tracker owned by this connection.</summary>
    public AmqpSubscriptionTracker SubscriptionTracker { get; } = new AmqpSubscriptionTracker();

    /// <summary>Gets the message factory bound to this connection.</summary>
    public INmsMessageFactory MessageFactory
    {
        get { return messageFactory; }
    }

    /// <summary>
    /// Opens the underlying connection, waits until the remote Open has been
    /// processed, and then starts the connection-level session.
    /// </summary>
    /// <returns>
    /// A task that completes once the connection session has started. It faults
    /// if the connection closes with an error before the remote Open is accepted.
    /// </returns>
    internal async Task Start()
    {
        Address remoteAddress = UriUtil.ToAddress(remoteUri, Info.UserName, Info.Password);

        // Must exist before the connection is created: the handler and the
        // closed callback both complete it.
        remoteOpenSignal = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        var handler = new AmqpHandler(this);
        underlyingConnection = await transport.CreateAsync(remoteAddress, handler).AwaitRunContinuationAsync();
        underlyingConnection.AddClosedCallback(OnClosed);

        // Block here until OnRemoteOpened or OnClosed settles the signal.
        await remoteOpenSignal.Task.Await();

        // Dedicated session for this connection, used for temporary destinations
        // and perhaps later on management and advisory monitoring.
        var connectionSessionInfo = new NmsSessionInfo(Info, -1)
        {
            AcknowledgementMode = AcknowledgementMode.AutoAcknowledge
        };
        connectionSession = new AmqpConnectionSession(this, connectionSessionInfo);
        await connectionSession.Start().Await();
    }

    /// <summary>
    /// Closed callback registered on the underlying connection. A close without
    /// an error is treated as an explicit close and only logged. A close with an
    /// error either fails the pending <see cref="Start"/> or, if the open already
    /// finished, is reported to the provider.
    /// </summary>
    private void OnClosed(IAmqpObject sender, Error error)
    {
        if (Tracer.IsDebugEnabled)
        {
            Tracer.Debug($"Connection closed. {error}");
        }

        if (error == null)
        {
            // Explicit close: nothing to report.
            return;
        }

        var failure = ExceptionSupport.GetException(error);
        bool failedPendingOpen = remoteOpenSignal.TrySetException(failure);
        if (!failedPendingOpen)
        {
            // The open had already completed, so this is a loss of an
            // established connection.
            Provider.OnConnectionClosed(failure);
        }
    }

    /// <summary>
    /// Fills in the outgoing Open frame from <see cref="Info"/> and the remote
    /// URI, and lists the capabilities this client would like the peer to offer.
    /// </summary>
    /// <param name="open">The Open frame about to be sent.</param>
    internal void OnLocalOpen(Open open)
    {
        open.ContainerId = Info.ClientId;
        open.HostName = remoteUri.Host;
        open.ChannelMax = Info.ChannelMax;
        open.MaxFrameSize = (uint) Info.MaxFrameSize;
        open.IdleTimeOut = (uint) Info.IdleTimeOut;
        open.DesiredCapabilities = new[]
        {
            SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER,
            SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY,
            SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY,
            SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS
        };
    }

    /// <summary>
    /// Processes the peer's Open frame: records offered capabilities and
    /// destination prefixes on <see cref="Info"/>, releases the pending
    /// <see cref="Start"/>, and tells the provider the connection is established.
    /// When the frame is flagged as a failed establishment, only a log entry is
    /// written and the pending open is left for <see cref="OnClosed"/> to settle.
    /// </summary>
    /// <param name="open">The Open frame received from the remote peer.</param>
    internal void OnRemoteOpened(Open open)
    {
        bool establishFailed = SymbolUtil.CheckAndCompareFields(
            open.Properties, SymbolUtil.CONNECTION_ESTABLISH_FAILED, SymbolUtil.BOOLEAN_TRUE);
        if (establishFailed)
        {
            Tracer.InfoFormat("Open response contains {0} property the connection {1} will soon be closed.",
                SymbolUtil.CONNECTION_ESTABLISH_FAILED, Info.Id);
            return;
        }

        Symbol[] offered = open.OfferedCapabilities;
        if (offered != null)
        {
            // Flags are only ever switched on here; an absent capability
            // leaves the existing value untouched.
            if (IsOffered(offered, SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY))
            {
                Info.AnonymousRelaySupported = true;
            }

            if (IsOffered(offered, SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY))
            {
                Info.DelayedDeliverySupported = true;
            }

            if (IsOffered(offered, SymbolUtil.OPEN_CAPABILITY_SHARED_SUBS))
            {
                Info.SharedSubsSupported = true;
            }
        }

        // Prefixes are taken over only when the property is present as a string.
        if (SymbolUtil.GetFromFields(open.Properties, SymbolUtil.CONNECTION_PROPERTY_TOPIC_PREFIX) is string topicPrefix)
        {
            Info.TopicPrefix = topicPrefix;
        }

        if (SymbolUtil.GetFromFields(open.Properties, SymbolUtil.CONNECTION_PROPERTY_QUEUE_PREFIX) is string queuePrefix)
        {
            Info.QueuePrefix = queuePrefix;
        }

        remoteOpenSignal.TrySetResult(true);
        Provider.FireConnectionEstablished();
    }

    // True when the given capability appears among the offered symbols.
    private static bool IsOffered(Symbol[] offered, object capability)
    {
        return Array.Exists(offered, symbol => Equals(symbol, capability));
    }

    /// <summary>
    /// Starts a new AMQP session for <paramref name="sessionInfo"/> and, once it
    /// has started, registers it under its session id.
    /// </summary>
    /// <param name="sessionInfo">Describes the session to create.</param>
    public async Task CreateSession(NmsSessionInfo sessionInfo)
    {
        var startedSession = new AmqpSession(this, sessionInfo);
        await startedSession.Start().Await();

        // An already-registered id is left in place; the result is ignored.
        sessions.TryAdd(sessionInfo.Id, startedSession);
    }

    /// <summary>
    /// Closes the underlying connection if one exists. Any exception is wrapped
    /// and written to the debug log instead of being propagated.
    /// </summary>
    public void Close()
    {
        try
        {
            Connection connection = UnderlyingConnection;
            if (connection != null)
            {
                connection.Close();
            }
        }
        catch (Exception ex)
        {
            LogCloseFailure(ex);
        }
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="Close"/>: closes the underlying
    /// connection if one exists and logs, rather than propagates, any exception.
    /// </summary>
    public async Task CloseAsync()
    {
        try
        {
            Connection connection = UnderlyingConnection;
            if (connection == null)
            {
                return;
            }

            await connection.CloseAsync().AwaitRunContinuationAsync();
        }
        catch (Exception ex)
        {
            LogCloseFailure(ex);
        }
    }

    // Shared by Close and CloseAsync: wraps the failure as an NMSException and
    // records it at debug level.
    private void LogCloseFailure(Exception ex)
    {
        NMSException nmse = ExceptionSupport.Wrap(ex, "Amqp Connection close failure for NMS Connection {0}", this.Info.Id);
        Tracer.DebugFormat("Caught Exception while closing Amqp Connection {0}. Exception {1}", this.Info.Id, nmse);
    }

    /// <summary>Returns the session registered under <paramref name="sessionId"/>.</summary>
    /// <param name="sessionId">Id of a session previously added by <see cref="CreateSession"/>.</param>
    /// <exception cref="InvalidOperationException">No session is registered under that id.</exception>
    public AmqpSession GetSession(NmsSessionId sessionId)
    {
        if (!sessions.TryGetValue(sessionId, out AmqpSession registered))
        {
            throw new InvalidOperationException($"Amqp Session {sessionId} doesn't exist and cannot be retrieved.");
        }

        return registered;
    }

    /// <summary>Forgets the session registered under <paramref name="sessionId"/>, if any.</summary>
    public void RemoveSession(NmsSessionId sessionId)
    {
        sessions.TryRemove(sessionId, out _);
    }

    /// <summary>
    /// Attaches a temporary destination on the connection-level session and,
    /// once attached, registers it for later lookup.
    /// </summary>
    /// <param name="destination">The NMS temporary destination to back with an AMQP one.</param>
    public async Task CreateTemporaryDestination(NmsTemporaryDestination destination)
    {
        var attached = new AmqpTemporaryDestination(connectionSession, destination);
        await attached.Attach().Await();

        // An already-registered destination is left in place; the result is ignored.
        temporaryDestinations.TryAdd(destination, attached);
    }

    /// <summary>
    /// Looks up the AMQP temporary destination registered for
    /// <paramref name="destination"/>.
    /// </summary>
    /// <returns>The registered instance, or <c>null</c> when none is registered.</returns>
    public AmqpTemporaryDestination GetTemporaryDestination(NmsTemporaryDestination destination)
    {
        if (temporaryDestinations.TryGetValue(destination, out AmqpTemporaryDestination registered))
        {
            return registered;
        }

        return null;
    }

    /// <summary>Forgets the temporary destination registered for <paramref name="destinationId"/>, if any.</summary>
    public void RemoveTemporaryDestination(NmsTemporaryDestination destinationId)
    {
        temporaryDestinations.TryRemove(destinationId, out _);
    }

    /// <summary>
    /// Removes the named durable subscription through the connection-level
    /// session, provided no registered session still holds that subscription name.
    /// </summary>
    /// <param name="subscriptionName">Name of the subscription to remove.</param>
    /// <returns>The task returned by the connection session's unsubscribe call.</returns>
    /// <exception cref="IllegalStateException">
    /// Thrown synchronously, before any task is returned, when a registered
    /// session reports that it contains the subscription name.
    /// </exception>
    public Task Unsubscribe(string subscriptionName)
    {
        // Refuse while any session still reports the subscription name.
        foreach (AmqpSession session in sessions.Values)
        {
            if (session.ContainsSubscriptionName(subscriptionName))
            {
                throw new IllegalStateException("Cannot unsubscribe from Durable Consumer while consuming messages.");
            }
        }

        return connectionSession.Unsubscribe(subscriptionName);
    }
}
}