blob: a62369720b33b8062a08ab05e04a9f6a1a671ae9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections;
using System.Collections.Specialized;
using Apache.NMS.Policies;
using Org.Apache.Qpid.Messaging;
namespace Apache.NMS.Amqp
{
/// <summary>
/// A Factory that can establish NMS connections to AMQP using QPID.
///
/// @param brokerUri String or Uri specifying base connection address
/// @param clientID String specifying client ID
/// @param connectionProperties formed by one of:
/// * 0..N Strings specifying Qpid connection connectionProperties in the form "name:value".
/// * Hashtable containing properties as key/value pairs
///
/// Connection URI are defined in
/// http://qpid.apache.org/releases/qpid-trunk/programming/book/connections.html#connection-url
///
/// Example using property strings:
///
/// Uri connecturi = new Uri("amqp:localhost:5673")
/// IConnectionFactory factory = new NMSConnectionFactory();
/// IConnectionFactory factory = new NMSConnectionFactory(connecturi);
/// IConnectionFactory factory = new NMSConnectionFactory(connecturi, "UserA");
/// IConnectionFactory factory = new NMSConnectionFactory(connecturi, "UserA", "protocol:amqp1.0");
/// IConnectionFactory factory = new NMSConnectionFactory(connecturi, "UserA", "protocol:amqp1.0", "reconnect:true", "reconnect_timeout:60", "username:bob", "password:secret");
///
/// Example using property table:
///
/// Uri connecturi = new Uri("amqp:localhost:5672")
/// Hashtable properties = new Hashtable();
/// properties.Add("protocol", "amqp1.0");
/// properties.Add("reconnect_timeout", 60)
/// IConnectionFactory factory = new NMSConnectionFactory(connecturi, "UserA", properties);
///
/// See http://qpid.apache.org/components/programming/book/connection-options.html
/// for more information on Qpid connection options.
/// </summary>
public class ConnectionFactory : IConnectionFactory
{
public const string DEFAULT_BROKER_URL = "tcp://localhost:5672";
public const string ENV_BROKER_URL = "AMQP_BROKER_URL";
private const char SEP_NAME_VALUE = ':';
private Uri brokerUri;
private string clientID;
private StringDictionary properties = new StringDictionary();
private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
#region Constructor Methods
public static string GetDefaultBrokerUrl()
{
string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL);
if (answer == null)
{
answer = DEFAULT_BROKER_URL;
}
return answer;
}
public ConnectionFactory()
: this(new Uri(GetDefaultBrokerUrl()), string.Empty, (Object[])null)
{
}
public ConnectionFactory(string brokerUri)
: this(new Uri(brokerUri), string.Empty, (Object[])null)
{
}
public ConnectionFactory(string brokerUri, string clientID)
: this(new Uri(brokerUri), clientID, (Object[])null)
{
}
public ConnectionFactory(Uri brokerUri)
: this(brokerUri, string.Empty, (Object[])null)
{
}
public ConnectionFactory(Uri brokerUri, string clientID)
: this(brokerUri, clientID, (Object[])null)
{
}
public ConnectionFactory(Uri brokerUri, string clientID, params Object[] propsArray)
{
Tracer.DebugFormat("Amqp: create connection factory for Uri: {0}", brokerUri.ToString());
try
{
this.brokerUri = brokerUri;
this.clientID = clientID;
if (propsArray != null)
{
foreach (object prop in propsArray)
{
string nvp = prop.ToString();
int sepPos = nvp.IndexOf(SEP_NAME_VALUE);
if (sepPos > 0)
{
properties.Add(nvp.Substring(0, sepPos), nvp.Substring(sepPos + 1));
}
else
{
throw new NMSException("Connection property is not in the form \"name:value\" :" + nvp);
}
}
}
}
catch (Exception ex)
{
Apache.NMS.Tracer.DebugFormat("Exception instantiating AMQP.ConnectionFactory: {0}", ex.Message);
throw;
}
}
public ConnectionFactory(Uri brokerUri, string clientID, Hashtable propsTable)
{
Tracer.DebugFormat("Amqp: create connection factory for Uri: {0}", brokerUri.ToString());
try
{
this.brokerUri = brokerUri;
this.clientID = clientID;
if (properties != null)
{
foreach (var key in propsTable.Keys)
{
properties.Add(key.ToString(), propsTable[key].ToString());
}
}
}
catch (Exception ex)
{
Apache.NMS.Tracer.DebugFormat("Exception instantiating AMQP.ConnectionFactory: {0}", ex.Message);
throw;
}
}
#endregion
#region IConnectionFactory Members
/// <summary>
/// Creates a new connection to Qpid/Amqp.
/// </summary>
public IConnection CreateConnection()
{
return CreateConnection(string.Empty, string.Empty);
}
/// <summary>
/// Creates a new connection to Qpid/Amqp.
/// </summary>
public IConnection CreateConnection(string userName, string password)
{
Connection connection = new Connection();
connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
//connection.ConsumerTransformer = this.consumerTransformer; // TODO:
//connection.ProducerTransformer = this.producerTransformer; // TODO:
connection.BrokerUri = this.BrokerUri;
connection.ClientId = this.clientID;
connection.ConnectionProperties = this.properties;
if (!String.IsNullOrEmpty(userName))
{
connection.SetConnectionProperty(Connection.USERNAME_OPTION, userName);
}
if (!String.IsNullOrEmpty(password))
{
connection.SetConnectionProperty(Connection.PASSWORD_OPTION, password);
}
IConnection ReturnValue = null;
ReturnValue = connection;
return ReturnValue;
}
/// <summary>
/// Get/or set the broker Uri.
/// </summary>
public Uri BrokerUri
{
get { return brokerUri; }
set { brokerUri = value; }
}
/// <summary>
/// Get/or set the redelivery policy that new IConnection objects are
/// assigned upon creation.
/// </summary>
public IRedeliveryPolicy RedeliveryPolicy
{
get { return this.redeliveryPolicy; }
set
{
if (value != null)
{
this.redeliveryPolicy = value;
}
}
}
private ConsumerTransformerDelegate consumerTransformer;
public ConsumerTransformerDelegate ConsumerTransformer
{
get { return this.consumerTransformer; }
set { this.consumerTransformer = value; }
}
private ProducerTransformerDelegate producerTransformer;
public ProducerTransformerDelegate ProducerTransformer
{
get { return this.producerTransformer; }
set { this.producerTransformer = value; }
}
#endregion
#region ConnectionProperties Methods
/// <summary>
/// Connection connectionProperties acceessor
/// </summary>
/// <remarks>This factory does not check for legal property names. Users
/// my specify anything they want. Propery name processing happens when
/// connections are created and started.</remarks>
public StringDictionary ConnectionProperties
{
get { return properties; }
set { properties = value; }
}
/// <summary>
/// Test existence of named property
/// </summary>
/// <param name="name">The name of the connection property to test.</param>
/// <returns>Boolean indicating if property exists in setting dictionary.</returns>
public bool ConnectionPropertyExists(string name)
{
return properties.ContainsKey(name);
}
/// <summary>
/// Get value of named property
/// </summary>
/// <param name="name">The name of the connection property to get.</param>
/// <returns>string value of property.</returns>
/// <remarks>Throws if requested property does not exist.</remarks>
public string GetConnectionProperty(string name)
{
if (properties.ContainsKey(name))
{
return properties[name];
}
else
{
throw new NMSException("Amqp connection property '" + name + "' does not exist");
}
}
/// <summary>
/// Set value of named property
/// </summary>
/// <param name="name">The name of the connection property to set.</param>
/// <param name="value">The value of the connection property.</param>
/// <returns>void</returns>
/// <remarks>Existing property values are overwritten. New property values
/// are added.</remarks>
public void SetConnectionProperty(string name, string value)
{
if (properties.ContainsKey(name))
{
properties[name] = value;
}
else
{
properties.Add(name, value);
}
}
#endregion
}
}