blob: 84ecf226786a41273d2f84711431727e72825088 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
using System;
using System.Collections.Specialized;
using System.Threading.Tasks;
using Apache.NMS.ActiveMQ.Util;
using Apache.NMS.ActiveMQ.Transport;
using Apache.NMS.ActiveMQ.Util.Synchronization;
using Apache.NMS.Util;
using Apache.NMS.Policies;
namespace Apache.NMS.ActiveMQ
/// <summary>
/// Factory that creates connections to an ActiveMQ message broker.
/// </summary>
public class ConnectionFactory : IConnectionFactory
// Broker URL that GetDefaultBrokerUrl falls back to when the environment supplies none.
public const string DEFAULT_BROKER_URL = "failover:tcp://localhost:61616";
// Name of the environment variable that GetDefaultBrokerUrl reads.
public const string ENV_BROKER_URL = "ACTIVEMQ_BROKER_URL";
// Static backing event behind the OnException accessors; shared by all factory instances.
private static event ExceptionListener onException;
// Broker address; assigned (and stripped of its query options) by the BrokerUri setter.
private Uri brokerUri;
// Default credentials used by the parameterless CreateActiveMQConnection().
private string connectionUserName;
private string connectionPassword;
// When non-null, assigned to Connection.DefaultClientId on created connections.
private string clientId;
// Optional prefix passed to the IdGenerator that ClientIdGenerator creates lazily.
private string clientIdPrefix;
private IdGenerator clientIdGenerator;
// The remaining fields hold settings that ConfigureConnection copies onto a Connection.
private bool useCompression;
private bool copyMessageOnSend = true;
private bool dispatchAsync = true;
private bool asyncSend;
private bool asyncClose;
private bool alwaysSyncSend;
private bool sendAcksAsync = true;
private int producerWindowSize = 0;
private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
// Exposed through the RequestTimeout property as whole milliseconds.
private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
private bool messagePrioritySupported = false;
private bool watchTopicAdvisories = true;
private bool optimizeAcknowledge;
// NOTE(review): units of the two intervals below are not visible here - presumably milliseconds; confirm.
private long optimizeAcknowledgeTimeOut = 300;
private long optimizedAckScheduledAckInterval = 0;
private bool useRetroactiveConsumer;
private bool exclusiveConsumer;
private long consumerFailoverRedeliveryWaitPeriod = 0;
private bool checkForDuplicates = true;
private bool transactedIndividualAck = false;
private bool nonBlockingRedelivery = false;
private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
// Policy objects; ConfigureConnection hands a clone of each to the connection.
private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
private ICompressionPolicy compressionPolicy = new CompressionPolicy();
/// <summary>
/// Type initializer: subscribes this class's static ExceptionHandler to
/// TransportFactory.OnException.
/// </summary>
static ConnectionFactory()
{
    TransportFactory.OnException += ExceptionHandler;
}
/// <summary>
/// Returns the broker URL a factory uses when none is supplied.
/// </summary>
/// <returns>
/// The value of the ACTIVEMQ_BROKER_URL environment variable when it is set, otherwise
/// DEFAULT_BROKER_URL. On compact-framework builds the environment lookup is compiled
/// out and DEFAULT_BROKER_URL is always returned.
/// </returns>
public static string GetDefaultBrokerUrl()
{
#if (PocketPC||NETCF||NETCF_2_0)
    // No environment lookup on these targets; use the built-in default.
    return DEFAULT_BROKER_URL;
#else
    return Environment.GetEnvironmentVariable(ENV_BROKER_URL) ?? DEFAULT_BROKER_URL;
#endif
}
/// <summary>
/// Creates a factory for the broker URL returned by GetDefaultBrokerUrl().
/// </summary>
public ConnectionFactory()
    : this(GetDefaultBrokerUrl())
{
}

/// <summary>
/// Creates a factory for the given broker URI string with no client id.
/// </summary>
public ConnectionFactory(string brokerUri)
    : this(brokerUri, null)
{
}

/// <summary>
/// Creates a factory for the given broker URI string and client id. The string is
/// converted with URISupport.CreateCompatibleUri before being stored.
/// </summary>
public ConnectionFactory(string brokerUri, string clientID)
    : this(URISupport.CreateCompatibleUri(brokerUri), clientID)
{
}

/// <summary>
/// Creates a factory for the given broker URI with no client id.
/// </summary>
public ConnectionFactory(Uri brokerUri)
    : this(brokerUri, null)
{
}

/// <summary>
/// Creates a factory for the given broker URI and client id.
/// </summary>
/// <remarks>
/// The URI goes through the BrokerUri setter first, then ClientId is assigned.
/// NOTE(review): if the URI query can itself set ClientId, this later assignment
/// would replace it - confirm whether that is intended.
/// </remarks>
public ConnectionFactory(Uri brokerUri, string clientID)
{
    BrokerUri = brokerUri;
    ClientId = clientID;
}
/// <summary>
/// Creates a connection using the factory's stored user name and password.
/// </summary>
/// <returns>The connection produced by CreateActiveMQConnection().</returns>
public IConnection CreateConnection()
{
    Connection created = CreateActiveMQConnection();
    return created;
}
/// <summary>
/// Creates a connection using the supplied credentials.
/// </summary>
/// <param name="userName">User name passed to CreateActiveMQConnection.</param>
/// <param name="password">Password passed to CreateActiveMQConnection.</param>
/// <returns>The connection produced by CreateActiveMQConnection.</returns>
public IConnection CreateConnection(string userName, string password)
{
    Connection created = CreateActiveMQConnection(userName, password);
    return created;
}
/// <summary>
/// Task-returning wrapper around CreateConnection(). The connection is created
/// synchronously on the calling thread and returned as an already-completed task.
/// </summary>
public Task<IConnection> CreateConnectionAsync()
{
    IConnection created = CreateConnection();
    return Task.FromResult(created);
}
/// <summary>
/// Task-returning wrapper around CreateConnection(userName, password). The connection
/// is created synchronously on the calling thread and returned as an already-completed task.
/// </summary>
/// <param name="userName">User name for the new connection.</param>
/// <param name="password">Password for the new connection.</param>
public Task<IConnection> CreateConnectionAsync(string userName, string password)
{
    IConnection created = CreateConnection(userName, password);
    return Task.FromResult(created);
}
/// <summary>
/// Creates a context over a new connection, using the factory's stored credentials
/// and its configured acknowledgement mode.
/// </summary>
public INMSContext CreateContext()
{
    Connection created = (Connection)CreateConnection();
    return new NmsContext(created, acknowledgementMode);
}
/// <summary>
/// Creates a context over a new connection, using the factory's stored credentials
/// and the supplied acknowledgement mode.
/// </summary>
/// <param name="ackMode">Acknowledgement mode passed to the context.</param>
public INMSContext CreateContext(AcknowledgementMode ackMode)
{
    Connection created = (Connection)CreateConnection();
    return new NmsContext(created, ackMode);
}
/// <summary>
/// Creates a context over a new connection made with the supplied credentials,
/// using the factory's configured acknowledgement mode.
/// </summary>
/// <param name="userName">User name for the new connection.</param>
/// <param name="password">Password for the new connection.</param>
public INMSContext CreateContext(string userName, string password)
{
    Connection created = (Connection)CreateConnection(userName, password);
    return new NmsContext(created, acknowledgementMode);
}
/// <summary>
/// Creates a context over a new connection made with the supplied credentials
/// and acknowledgement mode.
/// </summary>
/// <param name="userName">User name for the new connection.</param>
/// <param name="password">Password for the new connection.</param>
/// <param name="ackMode">Acknowledgement mode passed to the context.</param>
public INMSContext CreateContext(string userName, string password, AcknowledgementMode ackMode)
{
    Connection created = (Connection)CreateConnection(userName, password);
    return new NmsContext(created, ackMode);
}
/// <summary>
/// Awaits CreateConnectionAsync() and wraps the resulting connection in a context
/// that uses the factory's configured acknowledgement mode.
/// </summary>
public async Task<INMSContext> CreateContextAsync()
{
    var created = await CreateConnectionAsync().Await();
    return new NmsContext((Connection)created, acknowledgementMode);
}
/// <summary>
/// Awaits CreateConnectionAsync() and wraps the resulting connection in a context
/// that uses the supplied acknowledgement mode.
/// </summary>
/// <param name="ackMode">Acknowledgement mode passed to the context.</param>
public async Task<INMSContext> CreateContextAsync(AcknowledgementMode ackMode)
{
    var created = await CreateConnectionAsync().Await();
    return new NmsContext((Connection)created, ackMode);
}
/// <summary>
/// Awaits CreateConnectionAsync(userName, password) and wraps the resulting connection
/// in a context that uses the factory's configured acknowledgement mode.
/// </summary>
/// <param name="userName">User name for the new connection.</param>
/// <param name="password">Password for the new connection.</param>
public async Task<INMSContext> CreateContextAsync(string userName, string password)
{
    var created = await CreateConnectionAsync(userName, password).Await();
    return new NmsContext((Connection)created, acknowledgementMode);
}
/// <summary>
/// Awaits CreateConnectionAsync(userName, password) and wraps the resulting connection
/// in a context that uses the supplied acknowledgement mode.
/// </summary>
/// <param name="userName">User name for the new connection.</param>
/// <param name="password">Password for the new connection.</param>
/// <param name="ackMode">Acknowledgement mode passed to the context.</param>
public async Task<INMSContext> CreateContextAsync(string userName, string password, AcknowledgementMode ackMode)
{
    var created = await CreateConnectionAsync(userName, password).Await();
    return new NmsContext((Connection)created, ackMode);
}
/// <summary>
/// Creates a connection with the user name and password stored on this factory.
/// </summary>
protected virtual Connection CreateActiveMQConnection()
{
    return CreateActiveMQConnection(this.connectionUserName, this.connectionPassword);
}
/// <summary>
/// Creates a transport for the broker URI, wraps it in a Connection, applies this
/// factory's settings via ConfigureConnection, and assigns the supplied credentials.
/// </summary>
/// <param name="userName">User name assigned to the new connection.</param>
/// <param name="password">Password assigned to the new connection.</param>
/// <returns>The configured connection.</returns>
/// <remarks>
/// Any failure is rethrown as the exception built by NMSExceptionSupport.Create, with
/// the broker URL and the original message in its text and the original exception
/// passed along as the cause.
/// </remarks>
protected virtual Connection CreateActiveMQConnection(string userName, string password)
{
    Connection connection = null;

    try
    {
        Tracer.InfoFormat("Connecting to: {0}", brokerUri.ToString());

        ITransport transport = TransportFactory.CreateTransport(brokerUri);

        connection = CreateActiveMQConnection(transport);

        // Copy the factory-level settings onto the connection; without this call
        // none of the factory's configured options would reach it.
        ConfigureConnection(connection);

        connection.UserName = userName;
        connection.Password = password;

        // The factory's client id is only a default for the connection.
        if(this.clientId != null)
        {
            connection.DefaultClientId = this.clientId;
        }

        return connection;
    }
    catch(Exception e)
    {
        // Best-effort cleanup of a partially initialised connection. A failure while
        // closing is deliberately ignored so the original error is the one reported.
        if(connection != null)
        {
            try
            {
                connection.Close();
            }
            catch
            {
            }
        }

        throw NMSExceptionSupport.Create("Could not connect to broker URL: " + this.brokerUri + ". Reason: " + e.Message, e);
    }
}
/// <summary>
/// Builds the Connection object for an already-created transport, passing it the
/// factory's broker URI and its ClientIdGenerator.
/// </summary>
/// <param name="transport">Transport the connection will use.</param>
protected virtual Connection CreateActiveMQConnection(ITransport transport)
{
    IdGenerator idGenerator = this.ClientIdGenerator;
    return new Connection(this.brokerUri, transport, idGenerator);
}
#region ConnectionFactory Properties
/// <summary>
/// Gets or sets the broker Uri.
/// </summary>
/// <remarks>
/// The setter strips a leading "activemqnettx:" or "activemq:" prefix. Unless the URI
/// text ends with ")", it then parses the query string, applies the "connection."
/// and "nms." options to this factory through IntrospectionSupport.SetProperties,
/// and stores the URI rebuilt by URISupport.CreateRemainingUri.
/// </remarks>
public Uri BrokerUri
{
    get { return brokerUri; }
    set
    {
        Tracer.Info("BrokerUri set = " + value.OriginalString);

        // Feed the result of the first strip into the second. Stripping both prefixes
        // from the original string independently would discard the first result, so
        // "activemqnettx:" would never be removed.
        string stripped = URISupport.StripPrefix(value.OriginalString, "activemqnettx:");
        stripped = URISupport.StripPrefix(stripped, "activemq:");
        brokerUri = new Uri(stripped);

        if(!String.IsNullOrEmpty(brokerUri.Query) && !brokerUri.OriginalString.EndsWith(")"))
        {
            // Since the Uri class will return the end of a Query string found in a Composite
            // URI we must ensure that we trim that off before we proceed.
            string query = brokerUri.Query.Substring(brokerUri.Query.LastIndexOf(")") + 1);

            StringDictionary properties = URISupport.ParseQuery(query);

            StringDictionary connection = URISupport.ExtractProperties(properties, "connection.");
            StringDictionary nms = URISupport.ExtractProperties(properties, "nms.");

            IntrospectionSupport.SetProperties(this, connection, "connection.");
            IntrospectionSupport.SetProperties(this, nms, "nms.");

            // Rebuild the URI from whatever options remain in the dictionary.
            brokerUri = URISupport.CreateRemainingUri(brokerUri, properties);
        }
    }
}
/// <summary>
/// Gets or sets the user name used by the parameterless CreateActiveMQConnection().
/// </summary>
public string UserName
{
    get
    {
        return this.connectionUserName;
    }
    set
    {
        this.connectionUserName = value;
    }
}
/// <summary>
/// Gets or sets the password used by the parameterless CreateActiveMQConnection().
/// </summary>
public string Password
{
    get
    {
        return this.connectionPassword;
    }
    set
    {
        this.connectionPassword = value;
    }
}
/// <summary>
/// Gets or sets the client id. When non-null it is assigned to
/// Connection.DefaultClientId on connections created by this factory.
/// </summary>
public string ClientId
{
    get
    {
        return this.clientId;
    }
    set
    {
        this.clientId = value;
    }
}
/// <summary>
/// Gets or sets the prefix passed to the IdGenerator that ClientIdGenerator
/// creates when no generator has been assigned.
/// </summary>
public string ClientIdPrefix
{
    get
    {
        return this.clientIdPrefix;
    }
    set
    {
        this.clientIdPrefix = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to Connection.UseCompression.
/// </summary>
public bool UseCompression
{
    get
    {
        return useCompression;
    }
    set
    {
        useCompression = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to Connection.CopyMessageOnSend.
/// Defaults to true.
/// </summary>
public bool CopyMessageOnSend
{
    get
    {
        return this.copyMessageOnSend;
    }
    set
    {
        this.copyMessageOnSend = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to Connection.AlwaysSyncSend.
/// </summary>
public bool AlwaysSyncSend
{
    get
    {
        return this.alwaysSyncSend;
    }
    set
    {
        this.alwaysSyncSend = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to Connection.AsyncClose.
/// </summary>
public bool AsyncClose
{
    get
    {
        return this.asyncClose;
    }
    set
    {
        this.asyncClose = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to Connection.SendAcksAsync.
/// Defaults to true.
/// </summary>
public bool SendAcksAsync
{
    get
    {
        return this.sendAcksAsync;
    }
    set
    {
        this.sendAcksAsync = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to Connection.AsyncSend.
/// </summary>
public bool AsyncSend
{
    get
    {
        return this.asyncSend;
    }
    set
    {
        this.asyncSend = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to Connection.DispatchAsync.
/// Defaults to true.
/// </summary>
public bool DispatchAsync
{
    get
    {
        return dispatchAsync;
    }
    set
    {
        dispatchAsync = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to Connection.WatchTopicAdvisories.
/// Defaults to true.
/// </summary>
public bool WatchTopicAdvisories
{
    get
    {
        return watchTopicAdvisories;
    }
    set
    {
        watchTopicAdvisories = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to
/// Connection.MessagePrioritySupported. Defaults to false.
/// </summary>
public bool MessagePrioritySupported
{
    get
    {
        return messagePrioritySupported;
    }
    set
    {
        messagePrioritySupported = value;
    }
}
/// <summary>
/// Gets or sets the request timeout in milliseconds. The value is stored as a
/// TimeSpan; the getter truncates its total milliseconds to an int.
/// </summary>
public int RequestTimeout
{
    get
    {
        double totalMilliseconds = requestTimeout.TotalMilliseconds;
        return (int)totalMilliseconds;
    }
    set
    {
        requestTimeout = TimeSpan.FromMilliseconds(value);
    }
}
/// <summary>
/// Write-only string form of the acknowledgement mode. The text is converted with
/// NMSConvert.ToAcknowledgementMode and stored in the same field that backs
/// the AcknowledgementMode property.
/// </summary>
public string AckMode
{
    set
    {
        AcknowledgementMode parsedMode = NMSConvert.ToAcknowledgementMode(value);
        acknowledgementMode = parsedMode;
    }
}
/// <summary>
/// Gets or sets the acknowledgement mode. ConfigureConnection assigns it to
/// Connection.AcknowledgementMode, and the CreateContext overloads without an
/// explicit mode use it. Defaults to AutoAcknowledge.
/// </summary>
public AcknowledgementMode AcknowledgementMode
{
    get
    {
        return this.acknowledgementMode;
    }
    set
    {
        acknowledgementMode = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to Connection.ProducerWindowSize.
/// Defaults to 0.
/// </summary>
public int ProducerWindowSize
{
    get
    {
        return this.producerWindowSize;
    }
    set
    {
        this.producerWindowSize = value;
    }
}
/// <summary>
/// Gets or sets the prefetch policy. ConfigureConnection assigns a clone of it to
/// Connection.PrefetchPolicy. Assigning null is ignored and the current policy is kept.
/// </summary>
public PrefetchPolicy PrefetchPolicy
{
    get { return this.prefetchPolicy; }
    set
    {
        // ConfigureConnection calls prefetchPolicy.Clone(), so a null policy would fail
        // there with a NullReferenceException. Ignore null, the same way the
        // RedeliveryPolicy and CompressionPolicy setters do.
        if(value != null)
        {
            this.prefetchPolicy = value;
        }
    }
}
/// <summary>
/// Gets or sets the redelivery policy. ConfigureConnection assigns a clone of it to
/// Connection.RedeliveryPolicy. Assigning null leaves the current policy in place.
/// </summary>
public IRedeliveryPolicy RedeliveryPolicy
{
    get
    {
        return redeliveryPolicy;
    }
    set
    {
        if(value == null)
        {
            return;
        }

        redeliveryPolicy = value;
    }
}
/// <summary>
/// Gets or sets the compression policy. ConfigureConnection assigns a clone of it to
/// Connection.CompressionPolicy. Assigning null leaves the current policy in place.
/// </summary>
public ICompressionPolicy CompressionPolicy
{
    get
    {
        return compressionPolicy;
    }
    set
    {
        if(value == null)
        {
            return;
        }

        compressionPolicy = value;
    }
}
/// <summary>
/// Gets or sets the generator passed to each Connection this factory constructs.
/// If none has been assigned, the getter creates one on first use: from
/// ClientIdPrefix when that is non-null, otherwise with the default constructor.
/// </summary>
public IdGenerator ClientIdGenerator
{
    set { this.clientIdGenerator = value; }
    get
    {
        if(this.clientIdGenerator == null)
        {
            if(this.clientIdPrefix != null)
            {
                this.clientIdGenerator = new IdGenerator(this.clientIdPrefix);
            }
            else
            {
                // Only fall back to the default generator when no prefix is configured;
                // otherwise the prefixed generator above would be discarded.
                this.clientIdGenerator = new IdGenerator();
            }
        }

        return this.clientIdGenerator;
    }
}
/// <summary>
/// Subscribes to or unsubscribes from the factory's exception notifications.
/// Handlers are stored in a static event, so they are shared by every
/// ConnectionFactory instance.
/// </summary>
public event ExceptionListener OnException
{
    add
    {
        onException += value;
    }
    remove
    {
        // Nothing to detach when no handler has been registered.
        if(onException == null)
        {
            return;
        }

        onException -= value;
    }
}
private ConsumerTransformerDelegate consumerTransformer;

/// <summary>
/// Delegate invoked each time a Message is dispatched, giving the client a chance to
/// transform the received message before it is delivered. ConfigureConnection assigns
/// the delegate to Connection.ConsumerTransformer; each connection in turn passes it
/// to the Sessions it creates, which pass it on to their Consumers.
/// </summary>
public ConsumerTransformerDelegate ConsumerTransformer
{
    get
    {
        return consumerTransformer;
    }
    set
    {
        consumerTransformer = value;
    }
}
private ProducerTransformerDelegate producerTransformer;

/// <summary>
/// Delegate invoked each time a Message is sent, letting the application transform
/// the Message before it goes out. ConfigureConnection assigns the delegate to
/// Connection.ProducerTransformer; each connection in turn passes it to the Sessions
/// it creates, which pass it on to their Producers.
/// </summary>
public ProducerTransformerDelegate ProducerTransformer
{
    get
    {
        return producerTransformer;
    }
    set
    {
        producerTransformer = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to Connection.OptimizeAcknowledge.
/// </summary>
public bool OptimizeAcknowledge
{
    get
    {
        return optimizeAcknowledge;
    }
    set
    {
        optimizeAcknowledge = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to
/// Connection.OptimizeAcknowledgeTimeOut. Defaults to 300.
/// </summary>
public long OptimizeAcknowledgeTimeOut
{
    get
    {
        return optimizeAcknowledgeTimeOut;
    }
    set
    {
        optimizeAcknowledgeTimeOut = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to
/// Connection.OptimizedAckScheduledAckInterval. Defaults to 0.
/// </summary>
public long OptimizedAckScheduledAckInterval
{
    get
    {
        return optimizedAckScheduledAckInterval;
    }
    set
    {
        optimizedAckScheduledAckInterval = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to Connection.UseRetroactiveConsumer.
/// </summary>
public bool UseRetroactiveConsumer
{
    get
    {
        return useRetroactiveConsumer;
    }
    set
    {
        useRetroactiveConsumer = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to Connection.ExclusiveConsumer.
/// </summary>
public bool ExclusiveConsumer
{
    get
    {
        return exclusiveConsumer;
    }
    set
    {
        exclusiveConsumer = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to
/// Connection.ConsumerFailoverRedeliveryWaitPeriod. Defaults to 0.
/// </summary>
public long ConsumerFailoverRedeliveryWaitPeriod
{
    get
    {
        return consumerFailoverRedeliveryWaitPeriod;
    }
    set
    {
        consumerFailoverRedeliveryWaitPeriod = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to Connection.CheckForDuplicates.
/// Defaults to true.
/// </summary>
public bool CheckForDuplicates
{
    get
    {
        return checkForDuplicates;
    }
    set
    {
        checkForDuplicates = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to
/// Connection.TransactedIndividualAck. Defaults to false.
/// </summary>
public bool TransactedIndividualAck
{
    get
    {
        return transactedIndividualAck;
    }
    set
    {
        transactedIndividualAck = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to
/// Connection.NonBlockingRedelivery. Defaults to false.
/// </summary>
public bool NonBlockingRedelivery
{
    get
    {
        return nonBlockingRedelivery;
    }
    set
    {
        nonBlockingRedelivery = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to Connection.AuditDepth.
/// Defaults to ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE.
/// </summary>
public int AuditDepth
{
    get
    {
        return auditDepth;
    }
    set
    {
        auditDepth = value;
    }
}
/// <summary>
/// Gets or sets the value ConfigureConnection assigns to
/// Connection.AuditMaximumProducerNumber. Defaults to
/// ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT.
/// </summary>
public int AuditMaximumProducerNumber
{
    get
    {
        return auditMaximumProducerNumber;
    }
    set
    {
        auditMaximumProducerNumber = value;
    }
}
/// <summary>
/// Copies this factory's settings onto the given connection. Policy objects are
/// cloned so the connection does not share the factory's instances; the transformer
/// delegates are assigned by reference.
/// </summary>
/// <param name="connection">Connection that receives the factory's settings.</param>
protected virtual void ConfigureConnection(Connection connection)
{
    // Send / dispatch behaviour (read through the public properties).
    connection.AsyncClose = AsyncClose;
    connection.AsyncSend = AsyncSend;
    connection.CopyMessageOnSend = CopyMessageOnSend;
    connection.AlwaysSyncSend = AlwaysSyncSend;
    connection.DispatchAsync = DispatchAsync;
    connection.SendAcksAsync = SendAcksAsync;

    // General connection options.
    connection.AcknowledgementMode = acknowledgementMode;
    connection.UseCompression = useCompression;
    connection.RequestTimeout = requestTimeout;
    connection.ProducerWindowSize = producerWindowSize;
    connection.MessagePrioritySupported = messagePrioritySupported;

    // Policies: each connection gets its own clone.
    connection.RedeliveryPolicy = redeliveryPolicy.Clone() as IRedeliveryPolicy;
    connection.PrefetchPolicy = prefetchPolicy.Clone() as PrefetchPolicy;
    connection.CompressionPolicy = compressionPolicy.Clone() as ICompressionPolicy;

    // Message transformer delegates.
    connection.ConsumerTransformer = consumerTransformer;
    connection.ProducerTransformer = producerTransformer;

    // Consumer / acknowledgement tuning.
    connection.WatchTopicAdvisories = watchTopicAdvisories;
    connection.OptimizeAcknowledge = optimizeAcknowledge;
    connection.OptimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut;
    connection.OptimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
    connection.UseRetroactiveConsumer = useRetroactiveConsumer;
    connection.ExclusiveConsumer = exclusiveConsumer;
    connection.ConsumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;

    // Duplicate detection and redelivery.
    connection.CheckForDuplicates = checkForDuplicates;
    connection.TransactedIndividualAck = transactedIndividualAck;
    connection.NonBlockingRedelivery = nonBlockingRedelivery;
    connection.AuditDepth = auditDepth;
    connection.AuditMaximumProducerNumber = auditMaximumProducerNumber;
}
protected static void ExceptionHandler(Exception ex)
if(ConnectionFactory.onException != null)