blob: a62369720b33b8062a08ab05e04a9f6a1a671ae9 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
using System;
using System.Collections;
using System.Collections.Specialized;
using Apache.NMS.Policies;
using Org.Apache.Qpid.Messaging;
namespace Apache.NMS.Amqp
{
    /// <summary>
    /// A factory that can establish NMS connections to AMQP using Qpid.
    /// </summary>
    /// <remarks>
    /// <para>Constructor arguments:</para>
    /// <list type="bullet">
    /// <item><description>brokerUri: string or Uri specifying the base connection address.</description></item>
    /// <item><description>clientID: string specifying the client ID.</description></item>
    /// <item><description>connection properties, formed by one of: 0..N strings
    /// specifying Qpid connection properties in the form "name:value", or a
    /// Hashtable containing properties as key/value pairs.</description></item>
    /// </list>
    /// <para>Connection URIs and the available connection options are defined by
    /// Qpid; see the Qpid documentation for more information.</para>
    /// </remarks>
    /// <example>
    /// Example using property strings:
    /// <code>
    /// Uri connecturi = new Uri("amqp:localhost:5673");
    /// IConnectionFactory factory = new NMSConnectionFactory();
    /// IConnectionFactory factory = new NMSConnectionFactory(connecturi);
    /// IConnectionFactory factory = new NMSConnectionFactory(connecturi, "UserA");
    /// IConnectionFactory factory = new NMSConnectionFactory(connecturi, "UserA", "protocol:amqp1.0");
    /// IConnectionFactory factory = new NMSConnectionFactory(connecturi, "UserA", "protocol:amqp1.0", "reconnect:true", "reconnect_timeout:60", "username:bob", "password:secret");
    /// </code>
    /// Example using property table:
    /// <code>
    /// Uri connecturi = new Uri("amqp:localhost:5672");
    /// Hashtable properties = new Hashtable();
    /// properties.Add("protocol", "amqp1.0");
    /// properties.Add("reconnect_timeout", 60);
    /// IConnectionFactory factory = new NMSConnectionFactory(connecturi, "UserA", properties);
    /// </code>
    /// </example>
    public class ConnectionFactory : IConnectionFactory
    {
        /// <summary>Broker URL used when the environment does not supply one.</summary>
        public const string DEFAULT_BROKER_URL = "tcp://localhost:5672";

        /// <summary>Name of the environment variable that overrides the default broker URL.</summary>
        public const string ENV_BROKER_URL = "AMQP_BROKER_URL";

        // Separates the name from the value in "name:value" property strings.
        private const char SEP_NAME_VALUE = ':';

        private Uri brokerUri;
        private string clientID;
        private StringDictionary properties = new StringDictionary();
        private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        private ConsumerTransformerDelegate consumerTransformer;
        private ProducerTransformerDelegate producerTransformer;

        #region Constructor Methods

        /// <summary>
        /// Returns the broker URL to use when none is given explicitly.
        /// </summary>
        /// <returns>The value of the <c>AMQP_BROKER_URL</c> environment variable,
        /// or <see cref="DEFAULT_BROKER_URL"/> when that variable is unset or empty.</returns>
        public static string GetDefaultBrokerUrl()
        {
            string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL);
            if (string.IsNullOrEmpty(answer))
            {
                answer = DEFAULT_BROKER_URL;
            }

            return answer;
        }

        /// <summary>
        /// Creates a factory for the default broker URL with no client ID
        /// and no connection properties.
        /// </summary>
        public ConnectionFactory()
            : this(new Uri(GetDefaultBrokerUrl()), string.Empty, (Object[])null)
        {
        }

        /// <summary>
        /// Creates a factory for the given broker address.
        /// </summary>
        /// <param name="brokerUri">Broker address; parsed with <see cref="Uri"/>.</param>
        public ConnectionFactory(string brokerUri)
            : this(new Uri(brokerUri), string.Empty, (Object[])null)
        {
        }

        /// <summary>
        /// Creates a factory for the given broker address and client ID.
        /// </summary>
        /// <param name="brokerUri">Broker address; parsed with <see cref="Uri"/>.</param>
        /// <param name="clientID">Client ID assigned to created connections.</param>
        public ConnectionFactory(string brokerUri, string clientID)
            : this(new Uri(brokerUri), clientID, (Object[])null)
        {
        }

        /// <summary>
        /// Creates a factory for the given broker address.
        /// </summary>
        /// <param name="brokerUri">Broker address.</param>
        public ConnectionFactory(Uri brokerUri)
            : this(brokerUri, string.Empty, (Object[])null)
        {
        }

        /// <summary>
        /// Creates a factory for the given broker address and client ID.
        /// </summary>
        /// <param name="brokerUri">Broker address.</param>
        /// <param name="clientID">Client ID assigned to created connections.</param>
        public ConnectionFactory(Uri brokerUri, string clientID)
            : this(brokerUri, clientID, (Object[])null)
        {
        }

        /// <summary>
        /// Creates a factory whose connection properties are given as
        /// "name:value" strings.
        /// </summary>
        /// <param name="brokerUri">Broker address.</param>
        /// <param name="clientID">Client ID assigned to created connections.</param>
        /// <param name="propsArray">Zero or more objects whose string form is
        /// "name:value"; may be null. The text before the first ':' is the
        /// property name and must not be empty; everything after it is the value.</param>
        /// <exception cref="NMSException">An entry is not in the form "name:value".</exception>
        /// <remarks>Any exception raised during construction is traced and then rethrown.</remarks>
        public ConnectionFactory(Uri brokerUri, string clientID, params Object[] propsArray)
        {
            try
            {
                Tracer.DebugFormat("Amqp: create connection factory for Uri: {0}", brokerUri);
                this.brokerUri = brokerUri;
                this.clientID = clientID;

                if (propsArray != null)
                {
                    foreach (object prop in propsArray)
                    {
                        // A null entry is reported like any other malformed property.
                        string nvp = (prop == null) ? string.Empty : prop.ToString();
                        int sepPos = nvp.IndexOf(SEP_NAME_VALUE);
                        if (sepPos > 0)
                        {
                            properties.Add(nvp.Substring(0, sepPos), nvp.Substring(sepPos + 1));
                        }
                        else
                        {
                            throw new NMSException("Connection property is not in the form \"name:value\" :" + nvp);
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                Tracer.DebugFormat("Exception instantiating AMQP.ConnectionFactory: {0}", ex.Message);
                throw;
            }
        }

        /// <summary>
        /// Creates a factory whose connection properties are given as a table
        /// of key/value pairs.
        /// </summary>
        /// <param name="brokerUri">Broker address.</param>
        /// <param name="clientID">Client ID assigned to created connections.</param>
        /// <param name="propsTable">Property names and values; may be null.
        /// Keys and non-null values are converted with <c>ToString()</c>.</param>
        /// <remarks>Any exception raised during construction is traced and then rethrown.</remarks>
        public ConnectionFactory(Uri brokerUri, string clientID, Hashtable propsTable)
        {
            try
            {
                Tracer.DebugFormat("Amqp: create connection factory for Uri: {0}", brokerUri);
                this.brokerUri = brokerUri;
                this.clientID = clientID;

                // Guard the argument, not the (always initialised) properties field.
                if (propsTable != null)
                {
                    foreach (DictionaryEntry entry in propsTable)
                    {
                        object value = entry.Value;
                        properties.Add(entry.Key.ToString(), (value == null) ? null : value.ToString());
                    }
                }
            }
            catch (Exception ex)
            {
                Tracer.DebugFormat("Exception instantiating AMQP.ConnectionFactory: {0}", ex.Message);
                throw;
            }
        }

        #endregion

        #region IConnectionFactory Members

        /// <summary>
        /// Creates a new connection to Qpid/Amqp without supplying credentials.
        /// </summary>
        /// <returns>The newly created connection.</returns>
        public IConnection CreateConnection()
        {
            return CreateConnection(string.Empty, string.Empty);
        }

        /// <summary>
        /// Creates a new connection to Qpid/Amqp.
        /// </summary>
        /// <param name="userName">User name to set on the connection; ignored when null or empty.</param>
        /// <param name="password">Password to set on the connection; ignored when null or empty.</param>
        /// <returns>The newly created connection, configured with a clone of the
        /// factory's redelivery policy, its broker Uri, its client ID and a copy
        /// of its connection properties.</returns>
        public IConnection CreateConnection(string userName, string password)
        {
            Connection connection = new Connection();

            connection.RedeliveryPolicy = (IRedeliveryPolicy) this.redeliveryPolicy.Clone();
            // TODO: pass ConsumerTransformer / ProducerTransformer on to the connection.
            connection.BrokerUri = this.BrokerUri;
            connection.ClientId = this.clientID;

            // Hand each connection its own dictionary so the per-connection
            // credentials set below are never written into the factory's properties.
            connection.ConnectionProperties = CopyProperties();

            if (!String.IsNullOrEmpty(userName))
            {
                connection.SetConnectionProperty(Connection.USERNAME_OPTION, userName);
            }

            if (!String.IsNullOrEmpty(password))
            {
                connection.SetConnectionProperty(Connection.PASSWORD_OPTION, password);
            }

            return connection;
        }

        /// <summary>
        /// Get or set the broker Uri.
        /// </summary>
        public Uri BrokerUri
        {
            get { return brokerUri; }
            set { brokerUri = value; }
        }

        /// <summary>
        /// Get or set the redelivery policy that new IConnection objects are
        /// assigned upon creation. Assigning null is ignored.
        /// </summary>
        public IRedeliveryPolicy RedeliveryPolicy
        {
            get { return this.redeliveryPolicy; }
            set
            {
                if (value != null)
                {
                    this.redeliveryPolicy = value;
                }
            }
        }

        /// <summary>
        /// Get or set the consumer transformer stored on this factory.
        /// </summary>
        /// <remarks>The value is not yet passed on to created connections.</remarks>
        public ConsumerTransformerDelegate ConsumerTransformer
        {
            get { return this.consumerTransformer; }
            set { this.consumerTransformer = value; }
        }

        /// <summary>
        /// Get or set the producer transformer stored on this factory.
        /// </summary>
        /// <remarks>The value is not yet passed on to created connections.</remarks>
        public ProducerTransformerDelegate ProducerTransformer
        {
            get { return this.producerTransformer; }
            set { this.producerTransformer = value; }
        }

        #endregion

        #region ConnectionProperties Methods

        /// <summary>
        /// Connection properties accessor.
        /// </summary>
        /// <remarks>This factory does not check for legal property names. Users
        /// may specify anything they want. Property name processing happens when
        /// connections are created and started. Assigning null resets the
        /// properties to an empty dictionary.</remarks>
        public StringDictionary ConnectionProperties
        {
            get { return properties; }
            set { properties = value ?? new StringDictionary(); }
        }

        /// <summary>
        /// Test existence of named property.
        /// </summary>
        /// <param name="name">The name of the connection property to test.</param>
        /// <returns>Boolean indicating if property exists in setting dictionary.</returns>
        public bool ConnectionPropertyExists(string name)
        {
            return properties.ContainsKey(name);
        }

        /// <summary>
        /// Get value of named property.
        /// </summary>
        /// <param name="name">The name of the connection property to get.</param>
        /// <returns>String value of property.</returns>
        /// <exception cref="NMSException">The requested property does not exist.</exception>
        public string GetConnectionProperty(string name)
        {
            if (!properties.ContainsKey(name))
            {
                throw new NMSException("Amqp connection property '" + name + "' does not exist");
            }

            return properties[name];
        }

        /// <summary>
        /// Set value of named property.
        /// </summary>
        /// <param name="name">The name of the connection property to set.</param>
        /// <param name="value">The value of the connection property.</param>
        /// <remarks>Existing property values are overwritten. New property values
        /// are added.</remarks>
        public void SetConnectionProperty(string name, string value)
        {
            // The StringDictionary indexer adds the key when absent and overwrites it otherwise.
            properties[name] = value;
        }

        // Returns an independent copy of the factory's connection properties.
        private StringDictionary CopyProperties()
        {
            StringDictionary copy = new StringDictionary();
            foreach (DictionaryEntry entry in properties)
            {
                copy.Add((string) entry.Key, (string) entry.Value);
            }

            return copy;
        }

        #endregion
    }
}