blob: c774ff963f0a8711c59bd26d3fdaf08a644729d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Specialized;
using Apache.NMS.ActiveMQ.Util;
using Apache.NMS.ActiveMQ.Transport;
using Apache.NMS.Util;
using Apache.NMS.Policies;
namespace Apache.NMS.ActiveMQ
{
/// <summary>
/// Represents a connection with a message broker
/// </summary>
public class ConnectionFactory : IConnectionFactory
{
public const string DEFAULT_BROKER_URL = "failover:tcp://localhost:61616";
public const string ENV_BROKER_URL = "ACTIVEMQ_BROKER_URL";
private static event ExceptionListener onException;
private Uri brokerUri;
private string connectionUserName;
private string connectionPassword;
private string clientId;
private string clientIdPrefix;
private IdGenerator clientIdGenerator;
private bool useCompression;
private bool copyMessageOnSend = true;
private bool dispatchAsync = true;
private bool asyncSend;
private bool asyncClose;
private bool alwaysSyncSend;
private bool sendAcksAsync = true;
private int producerWindowSize = 0;
private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
private bool messagePrioritySupported = true;
private bool watchTopicAdvisories = true;
private bool optimizeAcknowledge;
private long optimizeAcknowledgeTimeOut = 300;
private long optimizedAckScheduledAckInterval = 0;
private bool useRetroactiveConsumer;
private bool exclusiveConsumer;
private long consumerFailoverRedeliveryWaitPeriod = 0;
private bool checkForDuplicates = true;
private bool transactedIndividualAck = false;
private bool nonBlockingRedelivery = false;
private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
private ICompressionPolicy compressionPolicy = new CompressionPolicy();
static ConnectionFactory()
{
TransportFactory.OnException += ConnectionFactory.ExceptionHandler;
}
public static string GetDefaultBrokerUrl()
{
#if (PocketPC||NETCF||NETCF_2_0)
return DEFAULT_BROKER_URL;
#else
return Environment.GetEnvironmentVariable(ENV_BROKER_URL) ?? DEFAULT_BROKER_URL;
#endif
}
public ConnectionFactory() : this(GetDefaultBrokerUrl())
{
}
public ConnectionFactory(string brokerUri) : this(brokerUri, null)
{
}
public ConnectionFactory(string brokerUri, string clientID)
: this(URISupport.CreateCompatibleUri(brokerUri), clientID)
{
}
public ConnectionFactory(Uri brokerUri) : this(brokerUri, null)
{
}
public ConnectionFactory(Uri brokerUri, string clientID)
{
this.BrokerUri = brokerUri;
this.ClientId = clientID;
}
public IConnection CreateConnection()
{
return CreateActiveMQConnection();
}
public IConnection CreateConnection(string userName, string password)
{
return CreateActiveMQConnection(userName, password);
}
protected virtual Connection CreateActiveMQConnection()
{
return CreateActiveMQConnection(connectionUserName, connectionPassword);
}
protected virtual Connection CreateActiveMQConnection(string userName, string password)
{
Connection connection = null;
try
{
Tracer.InfoFormat("Connecting to: {0}", brokerUri.ToString());
ITransport transport = TransportFactory.CreateTransport(brokerUri);
connection = CreateActiveMQConnection(transport);
ConfigureConnection(connection);
connection.UserName = userName;
connection.Password = password;
if(this.clientId != null)
{
connection.DefaultClientId = this.clientId;
}
return connection;
}
catch(NMSException)
{
try
{
connection.Close();
}
catch
{
}
throw;
}
catch(Exception e)
{
try
{
connection.Close();
}
catch
{
}
throw NMSExceptionSupport.Create("Could not connect to broker URL: " + this.brokerUri + ". Reason: " + e.Message, e);
}
}
protected virtual Connection CreateActiveMQConnection(ITransport transport)
{
return new Connection(this.brokerUri, transport, this.ClientIdGenerator);
}
#region ConnectionFactory Properties
/// <summary>
/// Get/or set the broker Uri.
/// </summary>
public Uri BrokerUri
{
get { return brokerUri; }
set
{
Tracer.Info("BrokerUri set = " + value.OriginalString);
if(value.OriginalString.StartsWith("activemqnettx:"))
{
brokerUri = new Uri(URISupport.StripPrefix(value.OriginalString, "activemqnettx:"));
}
else
{
brokerUri = new Uri(URISupport.StripPrefix(value.OriginalString, "activemq:"));
}
if(!String.IsNullOrEmpty(brokerUri.Query) && !brokerUri.OriginalString.EndsWith(")"))
{
// Since the Uri class will return the end of a Query string found in a Composite
// URI we must ensure that we trim that off before we proceed.
string query = brokerUri.Query.Substring(brokerUri.Query.LastIndexOf(")") + 1);
StringDictionary properties = URISupport.ParseQuery(query);
StringDictionary connection = URISupport.ExtractProperties(properties, "connection.");
StringDictionary nms = URISupport.ExtractProperties(properties, "nms.");
IntrospectionSupport.SetProperties(this, connection, "connection.");
IntrospectionSupport.SetProperties(this, nms, "nms.");
brokerUri = URISupport.CreateRemainingUri(brokerUri, properties);
}
}
}
public string UserName
{
get { return connectionUserName; }
set { connectionUserName = value; }
}
public string Password
{
get { return connectionPassword; }
set { connectionPassword = value; }
}
public string ClientId
{
get { return clientId; }
set { clientId = value; }
}
public string ClientIdPrefix
{
get { return clientIdPrefix; }
set { clientIdPrefix = value; }
}
public bool UseCompression
{
get { return this.useCompression; }
set { this.useCompression = value; }
}
public bool CopyMessageOnSend
{
get { return copyMessageOnSend; }
set { copyMessageOnSend = value; }
}
public bool AlwaysSyncSend
{
get { return alwaysSyncSend; }
set { alwaysSyncSend = value; }
}
public bool AsyncClose
{
get { return asyncClose; }
set { asyncClose = value; }
}
public bool SendAcksAsync
{
get { return sendAcksAsync; }
set { sendAcksAsync = value; }
}
public bool AsyncSend
{
get { return asyncSend; }
set { asyncSend = value; }
}
public bool DispatchAsync
{
get { return this.dispatchAsync; }
set { this.dispatchAsync = value; }
}
public bool WatchTopicAdvisories
{
get { return this.watchTopicAdvisories; }
set { this.watchTopicAdvisories = value; }
}
public bool MessagePrioritySupported
{
get { return this.messagePrioritySupported; }
set { this.messagePrioritySupported = value; }
}
public int RequestTimeout
{
get { return (int)this.requestTimeout.TotalMilliseconds; }
set { this.requestTimeout = TimeSpan.FromMilliseconds(value); }
}
public string AckMode
{
set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
}
public AcknowledgementMode AcknowledgementMode
{
get { return acknowledgementMode; }
set { this.acknowledgementMode = value; }
}
public int ProducerWindowSize
{
get { return producerWindowSize; }
set { producerWindowSize = value; }
}
public PrefetchPolicy PrefetchPolicy
{
get { return this.prefetchPolicy; }
set { this.prefetchPolicy = value; }
}
public IRedeliveryPolicy RedeliveryPolicy
{
get { return this.redeliveryPolicy; }
set
{
if(value != null)
{
this.redeliveryPolicy = value;
}
}
}
public ICompressionPolicy CompressionPolicy
{
get { return this.compressionPolicy; }
set
{
if(value != null)
{
this.compressionPolicy = value;
}
}
}
public IdGenerator ClientIdGenerator
{
set { this.clientIdGenerator = value; }
get
{
lock(this)
{
if(this.clientIdGenerator == null)
{
if(this.clientIdPrefix != null)
{
this.clientIdGenerator = new IdGenerator(this.clientIdPrefix);
}
else
{
this.clientIdGenerator = new IdGenerator();
}
}
return this.clientIdGenerator;
}
}
}
public event ExceptionListener OnException
{
add { onException += value; }
remove
{
if(onException != null)
{
onException -= value;
}
}
}
private ConsumerTransformerDelegate consumerTransformer;
/// <summary>
/// A Delegate that is called each time a Message is dispatched to allow the client to do
/// any necessary transformations on the received message before it is delivered. The
/// ConnectionFactory sets the provided delegate instance on each Connection instance that
/// is created from this factory, each connection in turn passes the delegate along to each
/// Session it creates which then passes that along to the Consumers it creates.
/// </summary>
public ConsumerTransformerDelegate ConsumerTransformer
{
get { return this.consumerTransformer; }
set { this.consumerTransformer = value; }
}
private ProducerTransformerDelegate producerTransformer;
/// <summary>
/// A delegate that is called each time a Message is sent from this Producer which allows
/// the application to perform any needed transformations on the Message before it is sent.
/// The ConnectionFactory sets the provided delegate instance on each Connection instance that
/// is created from this factory, each connection in turn passes the delegate along to each
/// Session it creates which then passes that along to the Producers it creates.
/// </summary>
public ProducerTransformerDelegate ProducerTransformer
{
get { return this.producerTransformer; }
set { this.producerTransformer = value; }
}
public bool OptimizeAcknowledge
{
get { return this.optimizeAcknowledge; }
set { this.optimizeAcknowledge = value; }
}
public long OptimizeAcknowledgeTimeOut
{
get { return this.optimizeAcknowledgeTimeOut; }
set { this.optimizeAcknowledgeTimeOut = value; }
}
public long OptimizedAckScheduledAckInterval
{
get { return this.optimizedAckScheduledAckInterval; }
set { this.optimizedAckScheduledAckInterval = value; }
}
public bool UseRetroactiveConsumer
{
get { return this.useRetroactiveConsumer; }
set { this.useRetroactiveConsumer = value; }
}
public bool ExclusiveConsumer
{
get { return this.exclusiveConsumer; }
set { this.exclusiveConsumer = value; }
}
public long ConsumerFailoverRedeliveryWaitPeriod
{
get { return this.consumerFailoverRedeliveryWaitPeriod; }
set { this.consumerFailoverRedeliveryWaitPeriod = value; }
}
public bool CheckForDuplicates
{
get { return this.checkForDuplicates; }
set { this.checkForDuplicates = value; }
}
public bool TransactedIndividualAck
{
get { return this.transactedIndividualAck; }
set { this.transactedIndividualAck = value; }
}
public bool NonBlockingRedelivery
{
get { return this.nonBlockingRedelivery; }
set { this.nonBlockingRedelivery = value; }
}
public int AuditDepth
{
get { return this.auditDepth; }
set { this.auditDepth = value; }
}
public int AuditMaximumProducerNumber
{
get { return this.auditMaximumProducerNumber; }
set { this.auditMaximumProducerNumber = value; }
}
#endregion
protected virtual void ConfigureConnection(Connection connection)
{
connection.AsyncClose = this.AsyncClose;
connection.AsyncSend = this.AsyncSend;
connection.CopyMessageOnSend = this.CopyMessageOnSend;
connection.AlwaysSyncSend = this.AlwaysSyncSend;
connection.DispatchAsync = this.DispatchAsync;
connection.SendAcksAsync = this.SendAcksAsync;
connection.AcknowledgementMode = this.acknowledgementMode;
connection.UseCompression = this.useCompression;
connection.RequestTimeout = this.requestTimeout;
connection.ProducerWindowSize = this.producerWindowSize;
connection.MessagePrioritySupported = this.messagePrioritySupported;
connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
connection.CompressionPolicy = this.compressionPolicy.Clone() as ICompressionPolicy;
connection.ConsumerTransformer = this.consumerTransformer;
connection.ProducerTransformer = this.producerTransformer;
connection.WatchTopicAdvisories = this.watchTopicAdvisories;
connection.OptimizeAcknowledge = this.optimizeAcknowledge;
connection.OptimizeAcknowledgeTimeOut = this.optimizeAcknowledgeTimeOut;
connection.OptimizedAckScheduledAckInterval = this.optimizedAckScheduledAckInterval;
connection.UseRetroactiveConsumer = this.useRetroactiveConsumer;
connection.ExclusiveConsumer = this.exclusiveConsumer;
connection.ConsumerFailoverRedeliveryWaitPeriod = this.consumerFailoverRedeliveryWaitPeriod;
connection.CheckForDuplicates = this.checkForDuplicates;
connection.TransactedIndividualAck = this.transactedIndividualAck;
connection.NonBlockingRedelivery = this.nonBlockingRedelivery;
connection.AuditDepth = this.auditDepth;
connection.AuditMaximumProducerNumber = this.auditMaximumProducerNumber;
}
protected static void ExceptionHandler(Exception ex)
{
if(ConnectionFactory.onException != null)
{
ConnectionFactory.onException(ex);
}
}
}
}