blob: 80e2fdd3b898ce981032bdb09182b90a99fc0fd3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;
using Apache.NMS.AMQP.Message;
using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Transport;
using Apache.NMS.AMQP.Util;
namespace Apache.NMS.AMQP.Provider.Amqp
{
public class AmqpProvider : IProvider
{
public static readonly uint DEFAULT_MAX_HANDLE = 1024;
private readonly ITransportContext transport;
private ConnectionInfo connectionInfo;
private AmqpConnection connection;
public uint MaxHandle { get; set; } = DEFAULT_MAX_HANDLE;
public AmqpProvider(Uri remoteUri, ITransportContext transport)
{
RemoteUri = remoteUri;
this.transport = transport;
}
static AmqpProvider()
{
// Set up tracing in AMQP. We capture all AMQP traces in the TraceListener below
// and map to NMS 'Tracer' logs as follows:
// AMQP Tracer
// Verbose Debug
// Frame Debug
// Information Info
// Output Info (should not happen)
// Warning Warn
// Error Error
Trace.TraceLevel = TraceLevel.Verbose;
Trace.TraceListener = (level, format, args) =>
{
switch (level)
{
case TraceLevel.Verbose:
case TraceLevel.Frame:
Tracer.DebugFormat(format, args);
break;
case TraceLevel.Information:
case TraceLevel.Output:
//
// Applications should not access AmqpLite directly so there
// should be no 'Output' level logs.
Tracer.InfoFormat(format, args);
break;
case TraceLevel.Warning:
Tracer.WarnFormat(format, args);
break;
case TraceLevel.Error:
Tracer.ErrorFormat(format, args);
break;
default:
Tracer.InfoFormat("Unknown AMQP LogLevel: {}", level);
Tracer.InfoFormat(format, args);
break;
}
};
}
/// <summary>
/// Enables AmqpNetLite's Frame logging level.
/// </summary>
public bool TraceFrames
{
get => ((Trace.TraceLevel & TraceLevel.Frame) == TraceLevel.Frame);
set
{
if (value)
{
Trace.TraceLevel = Trace.TraceLevel | TraceLevel.Frame;
}
else
{
Trace.TraceLevel = Trace.TraceLevel & ~TraceLevel.Frame;
}
}
}
public long SendTimeout => connectionInfo?.SendTimeout ?? ConnectionInfo.DEFAULT_SEND_TIMEOUT;
public Uri RemoteUri { get; }
public IProviderListener Listener { get; private set; }
public void Start()
{
}
public Task Connect(ConnectionInfo connectionInfo)
{
this.connectionInfo = connectionInfo;
connection = new AmqpConnection(this, transport, connectionInfo);
return connection.Start();
}
internal void OnConnectionClosed(NMSException exception)
{
Listener?.OnConnectionFailure(exception);
}
internal void FireConnectionEstablished()
{
Listener?.OnConnectionEstablished(RemoteUri);
}
public void Close()
{
connection?.Close();
}
public void SetProviderListener(IProviderListener providerListener)
{
Listener = providerListener;
}
public Task CreateResource(ResourceInfo resourceInfo)
{
switch (resourceInfo)
{
case SessionInfo sessionInfo:
return connection.CreateSession(sessionInfo);
case ConsumerInfo consumerInfo:
{
AmqpSession session = connection.GetSession(consumerInfo.SessionId);
return session.CreateConsumer(consumerInfo);
}
case ProducerInfo producerInfo:
{
AmqpSession session = connection.GetSession(producerInfo.SessionId);
return session.CreateProducer(producerInfo);
}
case NmsTemporaryDestination temporaryDestination:
return connection.CreateTemporaryDestination(temporaryDestination);
case TransactionInfo transactionInfo:
var amqpSession = connection.GetSession(transactionInfo.SessionId);
return amqpSession.BeginTransaction(transactionInfo);
default:
throw new ArgumentOutOfRangeException(nameof(resourceInfo), "Not supported resource type.");
}
}
public Task DestroyResource(ResourceInfo resourceInfo)
{
switch (resourceInfo)
{
case SessionInfo sessionInfo:
{
AmqpSession session = connection.GetSession(sessionInfo.Id);
session.Close();
return Task.CompletedTask;
}
case ConsumerInfo consumerInfo:
{
AmqpSession session = connection.GetSession(consumerInfo.SessionId);
AmqpConsumer consumer = session.GetConsumer(consumerInfo.Id);
consumer.Close();
session.RemoveConsumer(consumerInfo.Id);
return Task.CompletedTask;
}
case ProducerInfo producerInfo:
{
AmqpSession session = connection.GetSession(producerInfo.SessionId);
AmqpProducer producer = session.GetProducer(producerInfo.Id);
producer.Close();
session.RemoveProducer(producerInfo.Id);
return Task.CompletedTask;
}
case NmsTemporaryDestination temporaryDestination:
{
AmqpTemporaryDestination amqpTemporaryDestination = connection.GetTemporaryDestination(temporaryDestination);
if (amqpTemporaryDestination != null)
{
amqpTemporaryDestination.Close();
connection.RemoveTemporaryDestination(temporaryDestination.Id);
}
else
Tracer.Debug($"Could not find temporary destination {temporaryDestination.Id} to delete.");
return Task.CompletedTask;
}
default:
throw new ArgumentOutOfRangeException(nameof(resourceInfo), "Not supported resource type.");
}
}
public Task StartResource(ResourceInfo resourceInfo)
{
switch (resourceInfo)
{
case ConsumerInfo consumerInfo:
AmqpSession session = connection.GetSession(consumerInfo.SessionId);
AmqpConsumer amqpConsumer = session.GetConsumer(consumerInfo.Id);
amqpConsumer.Start();
return Task.CompletedTask;
default:
throw new ArgumentOutOfRangeException(nameof(resourceInfo), "Not supported resource type.");
}
}
public Task StopResource(ResourceInfo resourceInfo)
{
switch (resourceInfo)
{
case ConsumerInfo consumerInfo:
AmqpSession session = connection.GetSession(consumerInfo.SessionId);
AmqpConsumer amqpConsumer = session.GetConsumer(consumerInfo.Id);
amqpConsumer.Stop();
return Task.CompletedTask;
default:
throw new ArgumentOutOfRangeException(nameof(resourceInfo), "Not supported resource type.");
}
}
public Task Recover(Id sessionId)
{
AmqpSession session = connection.GetSession(sessionId);
session.Recover();
return Task.CompletedTask;
}
public Task Acknowledge(Id sessionId, AckType ackType)
{
AmqpSession session = connection.GetSession(sessionId);
foreach (AmqpConsumer consumer in session.Consumers)
{
consumer.Acknowledge(ackType);
}
return Task.CompletedTask;
}
public Task Acknowledge(InboundMessageDispatch envelope, AckType ackType)
{
AmqpSession session = connection.GetSession(envelope.ConsumerInfo.SessionId);
AmqpConsumer consumer = session.GetConsumer(envelope.ConsumerId);
consumer.Acknowledge(envelope, ackType);
return Task.CompletedTask;
}
public INmsMessageFactory MessageFactory => connection.MessageFactory;
public Task Send(OutboundMessageDispatch envelope)
{
AmqpSession session = connection.GetSession(envelope.ProducerInfo.SessionId);
AmqpProducer producer = session.GetProducer(envelope.ProducerId);
producer.Send(envelope);
envelope.Message.IsReadOnly = false;
return Task.CompletedTask;
}
public Task Unsubscribe(string subscriptionName)
{
return connection.Unsubscribe(subscriptionName);
}
public Task Rollback(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
{
var session = connection.GetSession(transactionInfo.SessionId);
return session.Rollback(transactionInfo, nextTransactionInfo);
}
public Task Commit(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
{
var session = connection.GetSession(transactionInfo.SessionId);
return session.Commit(transactionInfo, nextTransactionInfo);
}
public void FireResourceClosed(ResourceInfo resourceInfo, Exception error)
{
Listener.OnResourceClosed(resourceInfo, error);
}
}
}