blob: 66543df28ea30464fccaf40baebf8862ae6cf105 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Specialized;
using System.Threading.Tasks;
using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Provider;
using Apache.NMS.AMQP.Util;
using Apache.NMS.AMQP.Util.Synchronization;
using Apache.NMS.Util;
using URISupport = Apache.NMS.AMQP.Util.URISupport;
namespace Apache.NMS.AMQP
{
public class NmsConnectionFactory : IConnectionFactory
{
private const string DEFAULT_REMOTE_HOST = "localhost";
private const string DEFAULT_REMOTE_PORT = "5672";
private Uri brokerUri;
private IdGenerator clientIdGenerator;
private IdGenerator connectionIdGenerator;
private readonly object syncRoot = new object();
public NmsConnectionFactory(string userName, string password)
{
UserName = userName;
Password = password;
}
public NmsConnectionFactory()
{
BrokerUri = GetDefaultRemoteAddress();
}
public NmsConnectionFactory(string userName, string password, string brokerUri) : this(userName, password)
{
BrokerUri = CreateUri(brokerUri);
}
public NmsConnectionFactory(string brokerUri)
{
BrokerUri = CreateUri(brokerUri);
}
public NmsConnectionFactory(Uri brokerUri)
{
BrokerUri = brokerUri;
}
private IdGenerator ClientIdGenerator
{
get
{
if (clientIdGenerator == null)
{
lock (syncRoot)
{
if (clientIdGenerator == null)
{
clientIdGenerator = ClientIdPrefix != null ? new IdGenerator(ClientIdPrefix) : new IdGenerator();
}
}
}
return clientIdGenerator;
}
}
private IdGenerator ConnectionIdGenerator
{
get
{
if (connectionIdGenerator == null)
{
lock (syncRoot)
{
if (connectionIdGenerator == null)
{
connectionIdGenerator = ConnectionIdPrefix != null ? new IdGenerator(ConnectionIdPrefix) : new IdGenerator();
}
}
}
return connectionIdGenerator;
}
}
/// <summary>
/// Enables local message expiry for all MessageConsumers under the connection. Default is true.
/// </summary>
public bool LocalMessageExpiry { get; set; } = true;
/// <summary>
/// User name value used to authenticate the connection
/// </summary>
public string UserName { get; set; }
/// <summary>
/// The password value used to authenticate the connection
/// </summary>
public string Password { get; set; }
/// <summary>
/// Timeout value that controls how long the client waits on completion of various synchronous interactions, such as opening a producer or consumer,
/// before returning an error. Does not affect synchronous message sends. By default the client will wait indefinitely for a request to complete.
/// </summary>
public long RequestTimeout { get; set; } = NmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT;
/// <summary>
/// Timeout value that controls how long the client waits on completion of a synchronous message send before returning an error.
/// By default the client will wait indefinitely for a send to complete.
/// </summary>
public long SendTimeout { get; set; } = NmsConnectionInfo.DEFAULT_SEND_TIMEOUT;
/// <summary>
/// Gets and sets the close timeout used to control how long a Connection close will wait for
/// clean shutdown of the connection before giving up. A negative value means wait
/// forever.
///
/// Care should be taken in that a very short close timeout can cause the client to
/// not cleanly shutdown the connection and it's resources.
///
/// Time in milliseconds to wait for a clean connection close.
/// </summary>
public long CloseTimeout { get; set; } = NmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT;
/// <summary>
/// Optional prefix value that is used for generated Connection ID values when a new Connection is created for the NMS ConnectionFactory.
/// This connection ID is used when logging some information from the JMS Connection object so a configurable prefix can make breadcrumbing
/// the logs easier. The default prefix is 'ID:'.
/// </summary>
public string ConnectionIdPrefix { get; set; }
/// <summary>
/// Optional prefix value that is used for generated Client ID values when a new Connection is created for the JMS ConnectionFactory.
/// The default prefix is 'ID:'.
/// </summary>
public string ClientIdPrefix { get; set; }
/// <summary>
/// Sets and gets the NMS clientId to use for connections created by this factory.
///
/// NOTE: A clientID can only be used by one Connection at a time, so setting it here
/// will restrict the ConnectionFactory to creating a single open Connection at a time.
/// It is possible to set the clientID on the Connection itself immediately after
/// creation if no value has been set via the factory that created it, which will
/// allow the factory to create multiple open connections at a time.
/// </summary>
public string ClientId { get; set; }
public IConnection CreateConnection()
{
return CreateConnection(UserName, Password);
}
public Task<IConnection> CreateConnectionAsync()
{
return CreateConnectionAsync(UserName, Password);
}
public Task<IConnection> CreateConnectionAsync(string userName, string password)
{
return Task.FromResult(CreateConnection(userName, password));
}
public IConnection CreateConnection(string userName, string password)
{
try
{
NmsConnectionInfo connectionInfo = ConfigureConnectionInfo(userName, password);
IProvider provider = ProviderFactory.Create(BrokerUri);
return new NmsConnection(connectionInfo, provider);
}
catch (Exception e)
{
throw NMSExceptionSupport.Create(e);
}
}
public INMSContext CreateContext()
{
return new NmsContext((NmsConnection)CreateConnection(), AcknowledgementMode.AutoAcknowledge);
}
public INMSContext CreateContext(AcknowledgementMode acknowledgementMode)
{
return new NmsContext((NmsConnection)CreateConnection(), acknowledgementMode);
}
public INMSContext CreateContext(string userName, string password)
{
return new NmsContext((NmsConnection)CreateConnection(userName, password), AcknowledgementMode.AutoAcknowledge);
}
public INMSContext CreateContext(string userName, string password, AcknowledgementMode acknowledgementMode)
{
return new NmsContext((NmsConnection)CreateConnection(userName, password), acknowledgementMode);
}
public async Task<INMSContext> CreateContextAsync()
{
return new NmsContext((NmsConnection)await CreateConnectionAsync().Await(), AcknowledgementMode.AutoAcknowledge);
}
public async Task<INMSContext> CreateContextAsync(AcknowledgementMode acknowledgementMode)
{
return new NmsContext((NmsConnection)await CreateConnectionAsync().Await(), acknowledgementMode);
}
public async Task<INMSContext> CreateContextAsync(string userName, string password)
{
return new NmsContext((NmsConnection)await CreateConnectionAsync(userName, password).Await(), AcknowledgementMode.AutoAcknowledge);
}
public async Task<INMSContext> CreateContextAsync(string userName, string password, AcknowledgementMode acknowledgementMode)
{
return new NmsContext((NmsConnection)await CreateConnectionAsync(userName, password).Await(), acknowledgementMode);
}
public Uri BrokerUri
{
get => brokerUri;
set
{
if (value == null)
throw new ArgumentNullException(nameof(value), "Invalid URI: cannot be null or empty");
brokerUri = value;
if (URISupport.IsCompositeUri(value))
{
var compositeData = NMS.Util.URISupport.ParseComposite(value);
SetUriOptions(compositeData.Parameters);
brokerUri = compositeData.toUri();
}
else
{
StringDictionary options = NMS.Util.URISupport.ParseQuery(brokerUri.Query);
SetUriOptions(options);
}
}
}
public IRedeliveryPolicy RedeliveryPolicy { get; set; }
public ConsumerTransformerDelegate ConsumerTransformer { get; set; }
public ProducerTransformerDelegate ProducerTransformer { get; set; }
private Uri CreateUri(string uri)
{
if (string.IsNullOrEmpty(uri))
{
throw new ArgumentException("Invalid Uri: cannot be null or empty");
}
try
{
return NMS.Util.URISupport.CreateCompatibleUri(uri);
}
catch (UriFormatException e)
{
throw new ArgumentException("Invalid Uri: " + uri, e);
}
}
private NmsConnectionInfo ConfigureConnectionInfo(string userName, string password)
{
var connectionInfo = new NmsConnectionInfo(new NmsConnectionId(ConnectionIdGenerator.GenerateId()))
{
UserName = userName,
Password = password,
ConfiguredUri = BrokerUri,
RequestTimeout = RequestTimeout,
SendTimeout = SendTimeout,
CloseTimeout = CloseTimeout,
LocalMessageExpiry = LocalMessageExpiry
};
bool userSpecifiedClientId = ClientId != null;
if (userSpecifiedClientId)
{
connectionInfo.SetClientId(ClientId, true);
}
else
{
connectionInfo.SetClientId(ClientIdGenerator.GenerateId(), false);
}
return connectionInfo;
}
private void SetUriOptions(StringDictionary options)
{
StringDictionary nmsOptions = PropertyUtil.FilterProperties(options, "nms.");
PropertyUtil.SetProperties(this, nmsOptions);
// TODO: Check if there are any unused options, if so throw argument exception
}
private Uri GetDefaultRemoteAddress()
{
return new Uri("amqp://" + DEFAULT_REMOTE_HOST + ":" + DEFAULT_REMOTE_PORT);
}
}
}