https://issues.apache.org/jira/browse/AMQNET-445

Clean up transport code a bit.
Improve Discovery transport to use an underlying FailoverTransport to manage connect / reconnect.
Added a more defined discovery agent interface and an attribute to use to tag them so that they can be auto discovered.  
Cleaned up and improved the Multicast agent.
diff --git a/src/main/csharp/Transport/Discovery/ActiveMQDiscoveryAgentFactoryAttribute.cs b/src/main/csharp/Transport/Discovery/ActiveMQDiscoveryAgentFactoryAttribute.cs
new file mode 100644
index 0000000..6e42b50
--- /dev/null
+++ b/src/main/csharp/Transport/Discovery/ActiveMQDiscoveryAgentFactoryAttribute.cs
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.ActiveMQ.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery
+{
+    /// <summary>
+    /// Attribute that decorates DiscoveryAgentFactory implementations to allow
+    /// the DiscoverAgentFactory to find all the available factories dynamically.
+    /// </summary>
+    public class ActiveMQDiscoveryAgentFactoryAttribute : FactoryAttribute
+    {
+        public ActiveMQDiscoveryAgentFactoryAttribute(string scheme) : base(scheme)
+        {
+        }
+    }
+}
+
diff --git a/src/main/csharp/Transport/Discovery/DiscoveryAgentFactory.cs b/src/main/csharp/Transport/Discovery/DiscoveryAgentFactory.cs
new file mode 100644
index 0000000..37bb248
--- /dev/null
+++ b/src/main/csharp/Transport/Discovery/DiscoveryAgentFactory.cs
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using Apache.NMS.ActiveMQ.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery
+{
+    public class DiscoveryAgentFactory
+    {
+        private static readonly FactoryFinder<ActiveMQDiscoveryAgentFactoryAttribute, ITransportFactory> FACTORY_FINDER =
+            new FactoryFinder<ActiveMQDiscoveryAgentFactoryAttribute, ITransportFactory>();
+        
+        private readonly static object AGENT_FACTORY_TYPES_LOCK = new object();
+        private readonly static Dictionary<String, Type> AGENT_FACTORY_TYPES = new Dictionary<String, Type>();
+
+        public void RegisterAgentFactory(string scheme, Type factoryType)
+        {
+            lock (AGENT_FACTORY_TYPES_LOCK)
+            {
+                AGENT_FACTORY_TYPES[scheme] = factoryType;
+            }
+        }
+
+        public static IDiscoveryAgent CreateAgent(Uri location)
+        {
+            IDiscoveryAgentFactory tf = DiscoveryAgentFactory.CreateAgentFactory(location);
+            return tf.CreateAgent(location);
+        }
+
+        /// <summary>
+        /// Create a DiscoveryAgent Factory for the scheme.  If we do not support the agent protocol,
+        /// an NMSConnectionException will be thrown.
+        /// </summary>
+        /// <param name="location"></param>
+        /// <returns></returns>
+        private static IDiscoveryAgentFactory CreateAgentFactory(Uri location)
+        {
+            string scheme = location.Scheme;
+
+            if(string.IsNullOrEmpty(scheme))
+            {
+                throw new NMSConnectionException(String.Format("Discovery Agent scheme invalid: [{0}]", location.ToString()));
+            }
+
+            IDiscoveryAgentFactory factory = null;
+
+            try
+            {
+                factory = NewInstance(scheme.ToLower());
+            }
+            catch(NMSConnectionException)
+            {
+                throw;
+            }
+            catch(Exception e)
+            {
+                throw new NMSConnectionException("Error creating discovery agent.", e);
+            }
+
+            if(null == factory)
+            {
+                throw new NMSConnectionException("Unable to create a discovery agent.");
+            }
+
+            return factory;
+        }
+
+        private static IDiscoveryAgentFactory NewInstance(string scheme)
+        {
+            try
+            {
+                Type factoryType = FindAgentFactory(scheme);
+
+                if(factoryType == null)
+                {
+                    throw new Exception("NewInstance failed to find a match for id = " + scheme);
+                }
+
+                return (IDiscoveryAgentFactory) Activator.CreateInstance(factoryType);
+            }
+            catch(Exception ex)
+            {
+                Tracer.WarnFormat("NewInstance failed to create an IDiscoveryAgentFactory with error: {0}", ex.Message);
+                throw;
+            }
+        }
+
+        private static Type FindAgentFactory(string scheme)
+        {
+            lock (AGENT_FACTORY_TYPES_LOCK)
+            {           
+                if(AGENT_FACTORY_TYPES.ContainsKey(scheme))
+                {
+                    return AGENT_FACTORY_TYPES[scheme];
+                }
+            }
+            
+            try
+            {
+                Type factoryType = FACTORY_FINDER.FindFactoryType(scheme);
+                
+                lock (AGENT_FACTORY_TYPES_LOCK)
+                {           
+                    AGENT_FACTORY_TYPES[scheme] = factoryType;
+                }
+                return factoryType;
+            }
+            catch
+            {
+                throw new NMSConnectionException("Failed to find Factory for Discovery Agent type: " + scheme);
+            }
+        }
+    }
+}
+
diff --git a/src/main/csharp/Transport/Discovery/DiscoveryTransport.cs b/src/main/csharp/Transport/Discovery/DiscoveryTransport.cs
new file mode 100644
index 0000000..900d13b
--- /dev/null
+++ b/src/main/csharp/Transport/Discovery/DiscoveryTransport.cs
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Text;
+using System.Collections.Generic;
+using System.Collections.Specialized;
+using Apache.NMS.Util;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery
+{
+	public class DiscoveryTransport : TransportFilter
+	{
+        public const string DISCOVERED_OPTION_PREFIX = "discovered.";
+
+        private readonly new ICompositeTransport next;
+        private readonly object syncRoot = new object();
+        private readonly Dictionary<String, Uri> serviceURIs = new Dictionary<String, Uri>();
+
+        private IDiscoveryAgent discoveryAgent;
+        public IDiscoveryAgent DiscoveryAgent
+        {
+            get { return this.discoveryAgent; }
+            set { this.discoveryAgent = value; }
+        }
+
+        private StringDictionary properties;
+        public StringDictionary Properties
+        {
+            get { return this.properties; }
+            set { this.properties = value; }
+        }
+
+		public DiscoveryTransport(ICompositeTransport next) : base(next)
+		{
+            this.next = next;
+
+            // Intercept the interrupted and resumed events so we can disable our
+            // agent if its supports suspend / resume semantics.
+            this.next.Interrupted = TransportInterrupted;
+            this.next.Resumed = TransportResumed;
+		}
+
+        public override void Start()
+        {
+            if (discoveryAgent == null) 
+            {
+                throw new InvalidOperationException("discoveryAgent not configured");
+            }
+
+            // lets pass into the agent the broker name and connection details
+            discoveryAgent.ServiceAdd = OnServiceAdded;
+            discoveryAgent.ServiceRemove = OnServiceRemoved;
+            discoveryAgent.Start();
+            this.next.Start();
+        }
+
+        public override void Stop()
+        {
+            ServiceStopper ss = new ServiceStopper();
+
+            ss.Stop(discoveryAgent);
+            ss.Stop(next);
+            ss.ThrowFirstException();
+        }
+
+        private void OnServiceAdded(DiscoveryEvent addEvent)
+        {
+            String url = addEvent.ServiceName;
+            if (url != null) 
+            {
+                try 
+                {
+                    Uri uri = new Uri(url);
+                    Tracer.InfoFormat("Adding new broker connection URL: {0}", uri);
+                    uri = ApplyParameters(uri, properties, DISCOVERED_OPTION_PREFIX);
+                  
+                    lock (syncRoot)
+                    {
+                        serviceURIs[addEvent.ServiceName] = uri;
+                    }
+                    next.Add(false, new Uri[] {uri});
+                } 
+                catch (Exception e) 
+                {
+                    Tracer.WarnFormat("Could not connect to remote URI: {0} due to bad URI syntax: {1}", url, e.Message);
+                }
+            }
+        }
+
+        private void OnServiceRemoved(DiscoveryEvent removeEvent)
+        {
+            Uri toRemove = null;
+            lock (syncRoot)
+            {
+                serviceURIs.TryGetValue(removeEvent.ServiceName, out toRemove);
+            }
+            if (toRemove != null) 
+            {
+                next.Remove(false, new Uri[] {toRemove});
+            }
+        }
+
+        private void TransportResumed(ITransport sender) 
+        {
+            ISuspendable service = this.discoveryAgent as ISuspendable;
+            if (service != null) 
+            {
+                try 
+                {
+                    service.Suspend();
+                }
+                catch (Exception e) 
+                {
+                    Tracer.WarnFormat("Caught error while suspending service: {0} - {1}", service, e.Message);
+                }
+            }
+
+            if (this.Resumed != null)
+            {
+                this.Resumed(sender);
+            }
+        }
+
+        private void TransportInterrupted(ITransport sender)
+        {
+            ISuspendable service = this.discoveryAgent as ISuspendable;
+            if (service != null) 
+            {
+                try 
+                {
+                    service.Resume();
+                }
+                catch (Exception e) 
+                {
+                    Tracer.WarnFormat("Caught error while resuming service: {0} - {1}", service, e.Message);
+                }
+            }
+
+            if (this.Interrupted != null)
+            {
+                this.Interrupted(sender);
+            }
+        }
+
+        /// <summary>
+        /// Given a Key / Value mapping create and append a URI query value that represents the mapped 
+        /// entries, return the newly updated URI that contains the value of the given URI and the 
+        /// appended query value.  Each entry in the query string is prefixed by the supplied 
+        /// optionPrefix string.
+        /// </summary>
+        private static Uri ApplyParameters(Uri uri, StringDictionary queryParameters, String optionPrefix)
+        {
+            if (queryParameters != null && queryParameters.Count != 0) 
+            {
+                StringBuilder newQuery = uri.Query != null ? new StringBuilder(uri.Query) : new StringBuilder();
+
+                foreach(KeyValuePair<string, string> entry in queryParameters)
+                {
+                    if (entry.Key.StartsWith(optionPrefix)) 
+                    {
+                        if (newQuery.Length !=0) 
+                        {
+                            newQuery.Append('&');
+                        }
+                        string key = entry.Key.Substring(optionPrefix.Length);
+                        newQuery.Append(key).Append('=').Append(entry.Value);
+                    }
+                }
+                uri = URISupport.CreateUriWithQuery(uri, newQuery.ToString());
+            }
+            return uri;
+        }
+	}
+}
+
diff --git a/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs b/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs
index e045025..db8a19d 100644
--- a/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs
+++ b/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs
@@ -1,133 +1,53 @@
-/*

- * Licensed to the Apache Software Foundation (ASF) under one or more

- * contributor license agreements.  See the NOTICE file distributed with

- * this work for additional information regarding copyright ownership.

- * The ASF licenses this file to You under the Apache License, Version 2.0

- * (the "License"); you may not use this file except in compliance with

- * the License.  You may obtain a copy of the License at

- *

- *     http://www.apache.org/licenses/LICENSE-2.0

- *

- * Unless required by applicable law or agreed to in writing, software

- * distributed under the License is distributed on an "AS IS" BASIS,

- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

- * See the License for the specific language governing permissions and

- * limitations under the License.

- */

-

-using System;

-using System.Threading;

-using Apache.NMS.ActiveMQ.Transport.Discovery.Multicast;

-using Apache.NMS.ActiveMQ.Transport.Tcp;

-using Apache.NMS.Util;

-

-namespace Apache.NMS.ActiveMQ.Transport.Discovery

-{

-    [ActiveMQTransportFactory("discovery")]

-	public class DiscoveryTransportFactory : ITransportFactory

-	{

-		private const int TIMEOUT_IN_SECONDS = 20;

-

-		private static Uri discoveredUri;

-		private static readonly MulticastDiscoveryAgent agent;

-		private static string currentServiceName;

-		private static readonly object uriLock = new object();

-		private static readonly AutoResetEvent discoveredUriEvent = new AutoResetEvent(false);

-		private static event ExceptionListener OnException;

-

-		static DiscoveryTransportFactory()

-		{

-			DiscoveryTransportFactory.OnException += TransportFactory.HandleException;

-			agent = new MulticastDiscoveryAgent();

-			agent.OnNewServiceFound += agent_OnNewServiceFound;

-			agent.OnServiceRemoved += agent_OnServiceRemoved;

-		}

-

-		public DiscoveryTransportFactory()

-		{

-			lock(uriLock)

-			{

-				currentServiceName = String.Empty;

-			}

-		}

-

-		public static Uri DiscoveredUri

-		{

-			get { lock(uriLock) { return discoveredUri; } }

-			set { lock(uriLock) { discoveredUri = value; } }

-		}

-

-		private static void agent_OnNewServiceFound(string brokerName, string serviceName)

-		{

-			lock(uriLock)

-			{

-				if(discoveredUri == null)

-				{

-					currentServiceName = serviceName;

-					discoveredUri = new Uri(currentServiceName);

-				}

-			}

-

-			// This will end the wait in the CreateTransport method.

-			discoveredUriEvent.Set();

-		}

-

-		private static void agent_OnServiceRemoved(string brokerName, string serviceName)

-		{

-			lock(uriLock)

-			{

-				if(serviceName == currentServiceName)

-				{

-					DiscoveredUri = null;

-					DiscoveryTransportFactory.OnException(new Exception("Broker connection is no longer valid."));

-				}

-			}

-		}

-

-		#region Overloaded ITransportFactory Members

-

-		public ITransport CreateTransport(Uri location)

-		{

-			URISupport.CompositeData cd = URISupport.ParseComposite(location);

-

-			if(cd.Components.Length > 0)

-			{

-				agent.DiscoveryURI = cd.Components[0];

-			}

-

-			if(!agent.IsStarted)

-			{

-				agent.Start();

-			}

-

-			Uri hostUri = DiscoveredUri;

-

-			if(null == hostUri)

-			{

-				// If a new broker is found the agent will fire an event which will result in discoveredUri being set.

-				discoveredUriEvent.WaitOne(TIMEOUT_IN_SECONDS * 1000, true);

-				hostUri = DiscoveredUri;

-				if(null == hostUri)

-				{

-					throw new NMSConnectionException(String.Format("Unable to find a connection to {0} before the timeout period expired.", location.ToString()));

-				}

-			}

-

-			TcpTransportFactory tcpTransFactory = new TcpTransportFactory();

-			return tcpTransFactory.CompositeConnect(new Uri(hostUri + location.Query));

-		}

-

-		public ITransport CompositeConnect(Uri location)

-		{

-			return CreateTransport(location);

-		}

-

-

-		public ITransport CompositeConnect(Uri location, SetTransport setTransport)

-		{

-			return CreateTransport(location);

-		}

-

-		#endregion

-	}

-}

+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Specialized;
+using Apache.NMS.ActiveMQ.Transport.Failover;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery
+{
+    [ActiveMQTransportFactory("discovery")]
+	public class DiscoveryTransportFactory : FailoverTransportFactory
+	{
+        public override ITransport CreateTransport(URISupport.CompositeData compositData)
+        {
+            StringDictionary options = compositData.Parameters;
+            FailoverTransport failoverTransport = CreateTransport(options);
+            return CreateTransport(failoverTransport, compositData, options);
+        }
+
+        /// <summary>
+        /// Factory method for creating a DiscoveryTransport.  The Discovery Transport wraps the
+        /// given ICompositeTransport and will add and remove Transport URIs as they are discovered.
+        /// </summary>
+        public static DiscoveryTransport CreateTransport(ICompositeTransport compositeTransport, URISupport.CompositeData compositeData, StringDictionary options)
+        {
+            DiscoveryTransport transport = new DiscoveryTransport(compositeTransport);
+
+            URISupport.SetProperties(transport, options, "transport.");
+            transport.Properties = options;
+            
+            Uri discoveryAgentURI = compositeData.Components[0];
+            IDiscoveryAgent discoveryAgent = DiscoveryAgentFactory.CreateAgent(discoveryAgentURI);
+            transport.DiscoveryAgent = discoveryAgent;
+
+            return transport;
+        }
+	}
+}
diff --git a/src/main/csharp/Transport/Discovery/IDiscoveryAgent.cs b/src/main/csharp/Transport/Discovery/IDiscoveryAgent.cs
new file mode 100644
index 0000000..c75c1f8
--- /dev/null
+++ b/src/main/csharp/Transport/Discovery/IDiscoveryAgent.cs
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery
+{
+    public delegate void ServiceAddHandler(DiscoveryEvent addEvent);
+    public delegate void ServiceRemoveHandler(DiscoveryEvent removeEvent);
+
+    public interface IDiscoveryAgent : IStartable, IStoppable
+    {
+        /// <summary>
+        /// Gets or sets the service add event handler
+        /// </summary>
+        ServiceAddHandler ServiceAdd
+        {
+            get;
+            set;
+        }
+
+        /// <summary>
+        /// Gets or sets the service remove event handler.
+        /// </summary>
+        ServiceRemoveHandler ServiceRemove
+        {
+            get;
+            set;
+        }
+
+        /// <summary>
+        /// Registers the service with the given name.
+        /// </summary>
+        void RegisterService(String name);
+
+        /// <summary>
+        /// A process actively using a service may see it go down before the DiscoveryAgent notices 
+        /// the service's failure.  That process can use this method to notify the IDiscoveryAgent 
+        /// of the failure so that other listeners of this IDiscoveryAgent can also be made aware 
+        /// of the failure.
+        /// </summary>
+        void ServiceFailed(DiscoveryEvent failedEvent);
+    
+    }
+}
+
diff --git a/src/main/csharp/Transport/Discovery/IDiscoveryAgentFactory.cs b/src/main/csharp/Transport/Discovery/IDiscoveryAgentFactory.cs
new file mode 100644
index 0000000..3cedd8f
--- /dev/null
+++ b/src/main/csharp/Transport/Discovery/IDiscoveryAgentFactory.cs
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery
+{
+    /// <summary>
+    /// Factory class interface for all DiscoveryAgent factories.  Each agent factory
+    /// should define its own factory attribute so that the agents can be found dynamically
+    /// by the DiscoveryAgentFactory class.
+    /// </summary>
+    public interface IDiscoveryAgentFactory
+    {
+        IDiscoveryAgent CreateAgent(Uri uri);
+    }
+}
+
diff --git a/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs b/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs
index 0f3de13..a6e9561 100644
--- a/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs
+++ b/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs
@@ -20,150 +20,356 @@
 using System.Net;
 using System.Net.Sockets;
 using System.Threading;
+using Apache.NMS.Util;
+using Apache.NMS.ActiveMQ.Threads;
 using Apache.NMS.ActiveMQ.Transport.Tcp;
+using Apache.NMS.ActiveMQ.Commands;
 
 namespace Apache.NMS.ActiveMQ.Transport.Discovery.Multicast
 {
 	internal delegate void NewBrokerServiceFound(string brokerName, string serviceName);
 	internal delegate void BrokerServiceRemoved(string brokerName, string serviceName);
 
-	internal class MulticastDiscoveryAgent : IDisposable
+	internal class MulticastDiscoveryAgent : IDiscoveryAgent, IDisposable
 	{
-		public const int MAX_SOCKET_CONNECTION_RETRY_ATTEMPS = 3;
-		public const int DEFAULT_BACKOFF_MILLISECONDS = 100;
-		public const int BACKOFF_MULTIPLIER = 2;
-		public const string DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
-    	public const string DEFAULT_HOST_STR = "default"; 
-    	public const string DEFAULT_HOST_IP = "239.255.2.3";
-    	public const int DEFAULT_PORT = 6155;
+        public const string DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
+        public const string DEFAULT_HOST_STR = "default"; 
+        public const string DEFAULT_HOST_IP = "239.255.2.3";
+        public const int DEFAULT_PORT = 6155;
+
+		public const int DEFAULT_INITIAL_RECONNECT_DELAY = 1000 * 5;
+        public const int DEFAULT_BACKOFF_MULTIPLIER = 2;
+        public const int DEFAULT_MAX_RECONNECT_DELAY = 1000 * 30;
 
 		private const string TYPE_SUFFIX = "ActiveMQ-4.";
 		private const string ALIVE = "alive";
 		private const string DEAD = "dead";
 		private const char DELIMITER = '%';
 		private const int BUFF_SIZE = 8192;
-		private const string DEFAULT_GROUP = "default";
-		private const int EXPIRATION_OFFSET_IN_SECONDS = 2;
-		private const int WORKER_KILL_TIME_SECONDS = 10;
-		private const int SOCKET_TIMEOUT_MILLISECONDS = 500;
+		private const int HEARTBEAT_MISS_BEFORE_DEATH = 10;
+		private const int DEFAULT_IDLE_TIME = 500;
+        private const string DEFAULT_GROUP = "default";
+        private const int WORKER_KILL_TIME_SECONDS = 1000;
 
-		private string group;
-		private readonly object stopstartSemaphore = new object();
-		private bool isStarted = false;
-		private Uri discoveryUri;
+        private const int MAX_SOCKET_CONNECTION_RETRY_ATTEMPS = 3;
+        private const int SOCKET_CONNECTION_BACKOFF_TIME = 500;
+
+        private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
+        private long maxReconnectDelay = DEFAULT_MAX_RECONNECT_DELAY;
+        private long backOffMultiplier = DEFAULT_BACKOFF_MULTIPLIER;
+        private bool useExponentialBackOff;
+        private int maxReconnectAttempts;
+
+        private int timeToLive = 1;
+        private string group = DEFAULT_GROUP;
+        private bool loopBackMode;
+        private Dictionary<String, RemoteBrokerData> brokersByService = 
+            new Dictionary<String, RemoteBrokerData>();
+        private readonly object servicesLock = new object();
+        private String selfService;
+        private long keepAliveInterval = DEFAULT_IDLE_TIME;
+        private string mcInterface;
+        private string mcNetworkInterface;
+        private string mcJoinNetworkInterface;
+        private DateTime lastAdvertizeTime;
+        private bool reportAdvertizeFailed = true;
+        private readonly Atomic<bool> started = new Atomic<bool>(false);
+
+        private Uri discoveryUri;
 		private Socket multicastSocket;
 		private IPEndPoint endPoint;
 		private Thread worker;
+        private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
 
-		private event NewBrokerServiceFound onNewServiceFound;
-		private event BrokerServiceRemoved onServiceRemoved;
+        private ServiceAddHandler serviceAddHandler;
+        private ServiceRemoveHandler serviceRemoveHandler;
 
-		/// <summary>
-		/// Indexed by service name
-		/// </summary>
-		private readonly Dictionary<string, RemoteBrokerData> remoteBrokers;
+        #region Property Setters and Getters
 
-		public MulticastDiscoveryAgent()
-		{
-			group = DEFAULT_GROUP;
-			remoteBrokers = new Dictionary<string, RemoteBrokerData>();
-		}
+        public bool LoopBackMode
+        {
+            get { return this.loopBackMode; }
+            set { this.loopBackMode = value; }
+        }
+
+        public int TimeToLive
+        {
+            get { return this.timeToLive; }
+            set { this.timeToLive = value; }
+        }
+
+        public long KeepAliveInterval
+        {
+            get { return this.keepAliveInterval; }
+            set { this.keepAliveInterval = value; }
+        }
+
+        public string Interface
+        {
+            get { return this.mcInterface; }
+            set { this.mcInterface = value; }
+        }
+
+        public string NetworkInterface
+        {
+            get { return this.mcNetworkInterface; }
+            set { this.mcNetworkInterface = value; }
+        }
+
+        public string JoinNetworkInterface
+        {
+            get { return this.mcJoinNetworkInterface; }
+            set { this.mcJoinNetworkInterface = value; }
+        }
+
+        public string Type
+        {
+            get { return this.group + "." + TYPE_SUFFIX; }
+        }
+
+        public long BackOffMultiplier
+        {
+            get { return this.backOffMultiplier; }
+            set { this.backOffMultiplier = value; }
+        }
+
+        public long InitialReconnectDelay
+        {
+            get { return this.initialReconnectDelay; }
+            set { this.initialReconnectDelay = value; }
+        }
+
+        public int MaxReconnectAttempts
+        {
+            get { return this.maxReconnectAttempts; }
+            set { this.maxReconnectAttempts = value; }
+        }
+
+        public long MaxReconnectDelay
+        {
+            get { return this.maxReconnectDelay; }
+            set { this.maxReconnectDelay = value; }
+        }
+
+        public bool UseExponentialBackOff
+        {
+            get { return this.useExponentialBackOff; }
+            set { this.useExponentialBackOff = value; }
+        }
+
+        public string Group
+        {
+            get { return this.group; }
+            set { this.group = value; }
+        }
+
+        public ServiceAddHandler ServiceAdd
+        {
+            get { return serviceAddHandler; }
+            set { this.serviceAddHandler = value; }
+        }
+
+        public ServiceRemoveHandler ServiceRemove
+        {
+            get { return serviceRemoveHandler; }
+            set { this.serviceRemoveHandler = value; }
+        }
+
+        public Uri DiscoveryURI
+        {
+            get { return discoveryUri; }
+            set { discoveryUri = value; }
+        }
+
+        public bool IsStarted
+        {
+            get { return started.Value; }
+        }
+
+        #endregion
+
+        public override String ToString()
+        {
+            return "MulticastDiscoveryAgent-" + (selfService != null ? "advertise:" + selfService : "");
+        }
+
+        public void RegisterService(String name)
+        {
+            this.selfService = name;
+            if (started.Value)
+            {
+                DoAdvertizeSelf();
+            }
+        }
+
+        public void ServiceFailed(DiscoveryEvent failedEvent)
+        {
+            RemoteBrokerData data = brokersByService[failedEvent.ServiceName];
+            if (data != null && data.MarkFailed()) 
+            {
+                FireServiceRemoveEvent(data);
+            }
+        }
 
 		public void Start()
 		{
-			lock(stopstartSemaphore)
-			{
-				if (discoveryUri == null || discoveryUri.Host.Equals(DEFAULT_HOST_STR))
-				{
-					discoveryUri = new Uri(DEFAULT_DISCOVERY_URI_STRING);
-				}
-				
-				if(multicastSocket == null)
-				{
-					int numFailedAttempts = 0;
-					int backoffTime = DEFAULT_BACKOFF_MILLISECONDS;
+            if (started.CompareAndSet(false, true)) {           
+                            
+                if (String.IsNullOrEmpty(group)) 
+                {
+                    throw new IOException("You must specify a group to discover");
+                }
+                String type = Type;
+                if (!type.EndsWith(".")) 
+                {
+                    Tracer.Warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
+                    type += ".";
+                }
+                
+                if (discoveryUri == null) 
+                {
+                    discoveryUri = new Uri(DEFAULT_DISCOVERY_URI_STRING);
+                }
 
-					Tracer.Info("Connecting to multicast discovery socket.");
-					while(!TryToConnectSocket())
-					{
-						numFailedAttempts++;
-						if(numFailedAttempts > MAX_SOCKET_CONNECTION_RETRY_ATTEMPS)
-						{
-							throw new ApplicationException(
-								"Could not open the socket in order to discover advertising brokers.");
-						}
+                if (Tracer.IsDebugEnabled) 
+                {
+                    Tracer.Debug("start - discoveryURI = " + discoveryUri);                              
+                }
 
-						Thread.Sleep(backoffTime);
-						backoffTime *= BACKOFF_MULTIPLIER;
-					}
-				}
+                String targetHost = discoveryUri.Host;
+                int targetPort = discoveryUri.Port;
+                     
+                if (DEFAULT_HOST_STR.Equals(targetHost)) 
+                {
+                    targetHost = DEFAULT_HOST_IP;                     
+                }
 
-				if(worker == null)
-				{
-					Tracer.Info("Starting multicast discovery agent worker thread");
-					worker = new Thread(new ThreadStart(worker_DoWork));
-					worker.IsBackground = true;
-					worker.Start();
-					isStarted = true;
-				}
-			}
+                if (targetPort < 0) 
+                {
+                    targetPort = DEFAULT_PORT;              
+                }
+                  
+                if (Tracer.IsDebugEnabled) 
+                {
+                    Tracer.DebugFormat("start - myHost = {0}", targetHost); 
+                    Tracer.DebugFormat("start - myPort = {0}", targetPort);    
+                    Tracer.DebugFormat("start - group  = {0}", group);                    
+                    Tracer.DebugFormat("start - interface  = {0}", mcInterface);
+                    Tracer.DebugFormat("start - network interface  = {0}", mcNetworkInterface);
+                    Tracer.DebugFormat("start - join network interface  = {0}", mcJoinNetworkInterface);
+                } 
+
+                int numFailedAttempts = 0;
+                int backoffTime = SOCKET_CONNECTION_BACKOFF_TIME;
+
+                Tracer.Info("Connecting to multicast discovery socket.");
+                while (!TryToConnectSocket(targetHost, targetPort))
+                {
+                    numFailedAttempts++;
+                    if (numFailedAttempts > MAX_SOCKET_CONNECTION_RETRY_ATTEMPS)
+                    {
+                        throw new ApplicationException(
+                            "Could not open the socket in order to discover advertising brokers.");
+                    }
+
+                    Thread.Sleep(backoffTime);
+                    backoffTime = (int)(backoffTime * BackOffMultiplier);
+                }
+
+                if(worker == null)
+                {
+                    Tracer.Info("Starting multicast discovery agent worker thread");
+                    worker = new Thread(new ThreadStart(DiscoveryAgentRun));
+                    worker.IsBackground = true;
+                    worker.Start();
+                }
+
+                DoAdvertizeSelf();
+            }
 		}
 
 		public void Stop()
 		{
-			Thread localThread = null;
-
-			lock(stopstartSemaphore)
+            // Changing the isStarted flag will signal the thread that it needs to shut down.
+            if (started.CompareAndSet(true, false))
 			{
 				Tracer.Info("Stopping multicast discovery agent worker thread");
-				localThread = worker;
-				worker = null;
-				// Changing the isStarted flag will signal the thread that it needs to shut down.
-				isStarted = false;
+                if (multicastSocket != null)
+                {
+                    multicastSocket.Close();
+                }
+                if(worker != null)
+                {
+                    // wait for the worker to stop.
+                    if(!worker.Join(WORKER_KILL_TIME_SECONDS))
+                    {
+                        Tracer.Info("!! Timeout waiting for multicast discovery agent localThread to stop");
+                        worker.Abort();
+                    }
+                    worker = null;
+                    Tracer.Debug("Multicast discovery agent worker thread stopped");
+                }
+                executor.Shutdown();
+                if (!executor.AwaitTermination(TimeSpan.FromMinutes(1)))
+                {
+                    Tracer.DebugFormat("Failed to properly shutdown agent executor {0}", this);
+                }
 			}
-
-			if(localThread != null)
-			{
-				// wait for the worker to stop.
-				if(!localThread.Join(WORKER_KILL_TIME_SECONDS))
-				{
-					Tracer.Info("!! Timeout waiting for multicast discovery agent localThread to stop");
-					localThread.Abort();
-				}
-			}
-
-			Tracer.Info("Multicast discovery agent worker thread joined");
 		}
 
-		private bool TryToConnectSocket()
+		private bool TryToConnectSocket(string targetHost, int targetPort)
 		{
 			bool hasSucceeded = false;
 
 			try
 			{
 				multicastSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
-				endPoint = new IPEndPoint(IPAddress.Any, discoveryUri.Port);
+				endPoint = new IPEndPoint(IPAddress.Any, targetPort);
 
-				//We have to allow reuse in the multicast socket. Otherwise, we would be unable to use multiple clients on the same machine.
+				// We have to allow reuse in the multicast socket. Otherwise, we would be unable to
+                // use multiple clients on the same machine.
 				multicastSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 1);
 				multicastSocket.Bind(endPoint);
 
 				IPAddress ipaddress;
 
-				if(!TcpTransportFactory.TryParseIPAddress(discoveryUri.Host, out ipaddress))
+				if(!TcpTransportFactory.TryParseIPAddress(targetHost, out ipaddress))
 				{
-					ipaddress = TcpTransportFactory.GetIPAddress(discoveryUri.Host, AddressFamily.InterNetwork);
+					ipaddress = TcpTransportFactory.GetIPAddress(targetHost, AddressFamily.InterNetwork);
 					if(null == ipaddress)
 					{
 						throw new NMSConnectionException("Invalid host address.");
 					}
 				}
 
-				multicastSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership,
-												 new MulticastOption(ipaddress, IPAddress.Any));
-#if !NETCF
-				multicastSocket.ReceiveTimeout = SOCKET_TIMEOUT_MILLISECONDS;
-#endif
+                if (LoopBackMode)
+                {
+                    multicastSocket.MulticastLoopback = true;
+                }
+                if (TimeToLive != 0)
+                {
+				    multicastSocket.SetSocketOption(SocketOptionLevel.IP, 
+                                                    SocketOptionName.MulticastTimeToLive, timeToLive);
+                }
+                if (!String.IsNullOrEmpty(mcJoinNetworkInterface))
+                {
+                    // TODO figure out how to set this.
+                    throw new NotSupportedException("McJoinNetworkInterface not yet implemented.");
+                }
+                else 
+                {
+                    multicastSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership,
+                                                     new MulticastOption(ipaddress, IPAddress.Any));
+                }
+
+                if (!String.IsNullOrEmpty(mcNetworkInterface))
+                {
+                    // TODO figure out how to set this.
+                    throw new NotSupportedException("McNetworkInterface not yet implemented.");
+                }
+
+				multicastSocket.ReceiveTimeout = (int)keepAliveInterval;
+
 				hasSucceeded = true;
 			}
 			catch(SocketException)
@@ -173,15 +379,16 @@
 			return hasSucceeded;
 		}
 
-		private void worker_DoWork()
+		private void DiscoveryAgentRun()
 		{
 			Thread.CurrentThread.Name = "Discovery Agent Thread.";
 			byte[] buffer = new byte[BUFF_SIZE];
 			string receivedInfoRaw;
 			string receivedInfo;
 
-			while(isStarted)
+			while (started.Value)
 			{
+                DoTimeKeepingServices();
 				try
 				{
 					int numBytes = multicastSocket.Receive(buffer);
@@ -197,177 +404,334 @@
 						receivedInfo = receivedInfoRaw;
 					}
 
-					ProcessBrokerMessage(receivedInfo);
+					ProcessServiceAdvertisement(receivedInfo);
 				}
 				catch(SocketException)
 				{
 					// There was no multicast message sent before the timeout expired...Let us try again.
 				}
 
-				//We need to clear the buffer.
+				// We need to clear the buffer.
 				buffer[0] = 0x0;
-				ExpireOldServices();
 			}
 		}
 
-		private void ProcessBrokerMessage(string message)
+		private void ProcessServiceAdvertisement(string message)
 		{
 			string payload;
 			string brokerName;
 			string serviceName;
 
-			if(message.StartsWith(MulticastType))
+			if (message.StartsWith(Type))
 			{
-				payload = message.Substring(MulticastType.Length);
+				payload = message.Substring(Type.Length);
 				brokerName = GetBrokerName(payload);
 				serviceName = GetServiceName(payload);
 
-				if(payload.StartsWith(ALIVE))
+				if (payload.StartsWith(ALIVE))
 				{
-					ProcessAliveBrokerMessage(brokerName, serviceName);
+					ProcessAlive(brokerName, serviceName);
 				}
-				else if(payload.StartsWith(DEAD))
+				else if (payload.StartsWith(DEAD))
 				{
-					ProcessDeadBrokerMessage(brokerName, serviceName);
+					ProcessDead(serviceName);
 				}
 				else
 				{
-					//Malformed Payload
+					// Malformed Payload
 				}
 			}
 		}
 
-		private void ProcessDeadBrokerMessage(string brokerName, string serviceName)
-		{
-			if(remoteBrokers.ContainsKey(serviceName))
-			{
-				remoteBrokers.Remove(serviceName);
-				if(onServiceRemoved != null)
-				{
-					onServiceRemoved(brokerName, serviceName);
-				}
-			}
-		}
+        private void DoTimeKeepingServices() 
+        {
+            if (started.Value)
+            {
+                DateTime currentTime = DateTime.Now;
+                if (currentTime < lastAdvertizeTime || 
+                    ((currentTime - TimeSpan.FromMilliseconds(keepAliveInterval)) > lastAdvertizeTime))
+                {
+                    DoAdvertizeSelf();
+                    lastAdvertizeTime = currentTime;
+                }
+                DoExpireOldServices();
+            }
+        }
 
-		private void ProcessAliveBrokerMessage(string brokerName, string serviceName)
-		{
-			if(remoteBrokers.ContainsKey(serviceName))
-			{
-				remoteBrokers[serviceName].UpdateHeartBeat();
-			}
-			else
-			{
-				remoteBrokers.Add(serviceName, new RemoteBrokerData(brokerName, serviceName));
+        private void DoAdvertizeSelf()
+        {
+            if (!String.IsNullOrEmpty(selfService)) 
+            {
+                String payload = Type;
+                payload += started.Value ? ALIVE : DEAD;
+                payload += DELIMITER + "localhost" + DELIMITER;
+                payload += selfService;
+                try 
+                {
+                    byte[] data = System.Text.Encoding.UTF8.GetBytes(payload);
+                    multicastSocket.Send(data);
+                } 
+                catch (Exception e) 
+                {
+                    // If a send fails, chances are all subsequent sends will fail
+                    // too.. No need to keep reporting the
+                    // same error over and over.
+                    if (reportAdvertizeFailed) 
+                    {
+                        reportAdvertizeFailed = false;
+                        Tracer.ErrorFormat("Failed to advertise our service: {0} cause: {1}", payload, e.Message);
+                    }
+                }
+            }
+        }
 
-				if(onNewServiceFound != null)
-				{
-					onNewServiceFound(brokerName, serviceName);
-				}
-			}
-		}
+        private void ProcessAlive(string brokerName, string service)
+        {
+            if (selfService == null || !service.Equals(selfService)) 
+            {
+                RemoteBrokerData remoteBroker = null;
+                lock (servicesLock)
+                {
+                    brokersByService.TryGetValue(service, out remoteBroker);
+                }
+                if (remoteBroker == null) 
+                {
+                    remoteBroker = new RemoteBrokerData(this, brokerName, service);
+                    brokersByService.Add(service, remoteBroker);      
+                    FireServiceAddEvent(remoteBroker);
+                    DoAdvertizeSelf();
+                } 
+                else 
+                {
+                    remoteBroker.UpdateHeartBeat();
+                    if (remoteBroker.IsTimeForRecovery()) 
+                    {
+                        FireServiceAddEvent(remoteBroker);
+                    }
+                }
+            }
+        }
+
+        private void ProcessDead(string service) 
+        {
+            if (!service.Equals(selfService)) 
+            {
+                RemoteBrokerData remoteBroker = null;
+                lock (servicesLock)
+                {
+                    brokersByService.TryGetValue(service, out remoteBroker);
+                    if (remoteBroker != null)
+                    {
+                        brokersByService.Remove(service);
+                    }
+                }
+                if (remoteBroker != null && !remoteBroker.Failed) 
+                {
+                    FireServiceRemoveEvent(remoteBroker);
+                }
+            }
+        }
+
+        private void DoExpireOldServices() 
+        {
+            DateTime expireTime = DateTime.Now - TimeSpan.FromMilliseconds(keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH); 
+
+            RemoteBrokerData[] services = null;
+            lock (servicesLock)
+            {
+                services = new RemoteBrokerData[this.brokersByService.Count];
+                this.brokersByService.Values.CopyTo(services, 0);
+            }
+
+            foreach(RemoteBrokerData service in services)
+            {
+                if (service.LastHeartBeat < expireTime) 
+                {
+                    ProcessDead(service.ServiceName);
+                }
+            }
+        }
 
 		private static string GetBrokerName(string payload)
 		{
 			string[] results = payload.Split(DELIMITER);
-			return results[1];
+            if (results.Length >= 2)
+            {
+			    return results[1];
+            }
+            return null;
 		}
 
 		private static string GetServiceName(string payload)
 		{
 			string[] results = payload.Split(DELIMITER);
-			return results[2];
-		}
-
-		private void ExpireOldServices()
-		{
-			DateTime expireTime;
-			List<RemoteBrokerData> deadServices = new List<RemoteBrokerData>();
-
-			foreach(KeyValuePair<string, RemoteBrokerData> brokerService in remoteBrokers)
-			{
-				expireTime = brokerService.Value.lastHeartBeat.AddSeconds(EXPIRATION_OFFSET_IN_SECONDS);
-				if(DateTime.Now > expireTime)
-				{
-					deadServices.Add(brokerService.Value);
-				}
-			}
-
-			// Remove all of the dead services
-			for(int i = 0; i < deadServices.Count; i++)
-			{
-				ProcessDeadBrokerMessage(deadServices[i].brokerName, deadServices[i].serviceName);
-			}
-		}
-
-		#region Properties
-
-		/// <summary>
-		/// This property indicates whether or not async send is enabled.
-		/// </summary>
-		public Uri DiscoveryURI
-		{
-			get { return discoveryUri; }
-			set { discoveryUri = value; }
-		}
-
-		public bool IsStarted
-		{
-			get { return isStarted; }
-		}
-
-		public string Group
-		{
-			get { return group; }
-			set { group = value; }
-		}
-
-		#endregion
-
-		internal string MulticastType
-		{
-			get { return group + "." + TYPE_SUFFIX; }
-		}
-
-		internal event NewBrokerServiceFound OnNewServiceFound
-		{
-			add { onNewServiceFound += value; }
-			remove { onNewServiceFound -= value; }
-		}
-
-		internal event BrokerServiceRemoved OnServiceRemoved
-		{
-			add { onServiceRemoved += value; }
-			remove { onServiceRemoved += value; }
+            if (results.Length >= 3)
+            {
+                return results[2];
+            }
+            return null;
 		}
 
 		public void Dispose()
 		{
-			if(isStarted)
+			if(started.Value)
 			{
 				Stop();
 			}
-
-			multicastSocket.Shutdown(SocketShutdown.Both);
-			multicastSocket = null;
 		}
 
-		internal class RemoteBrokerData
+        private void FireServiceRemoveEvent(RemoteBrokerData data) 
+        {
+            if (serviceRemoveHandler != null && started.Value) 
+            {
+                // Have the listener process the event async so that
+                // he does not block this thread since we are doing time sensitive
+                // processing of events.
+                executor.QueueUserWorkItem(ServiceRemoveCallback, data);
+            }
+        }
+
+        private void ServiceRemoveCallback(object data)
+        {
+            RemoteBrokerData serviceData = data as RemoteBrokerData;
+            this.serviceRemoveHandler(serviceData);
+        }
+
+        private void FireServiceAddEvent(RemoteBrokerData data) 
+        {
+            if (serviceAddHandler != null && started.Value) 
+            {
+                // Have the listener process the event async so that
+                // he does not block this thread since we are doing time sensitive
+                // processing of events.
+                executor.QueueUserWorkItem(ServiceAddCallback, data);
+            }
+        }
+
+        private void ServiceAddCallback(object data)
+        {
+            RemoteBrokerData serviceData = data as RemoteBrokerData;
+            this.serviceAddHandler(serviceData);
+        }
+
+		internal class RemoteBrokerData : DiscoveryEvent
 		{
-			internal string brokerName;
-			internal string serviceName;
+            internal DateTime recoveryTime = DateTime.MinValue;
+            internal int failureCount;
+            internal bool failed;
 			internal DateTime lastHeartBeat;
 
-			internal RemoteBrokerData(string brokerName, string serviceName)
+            private readonly object syncRoot = new object();
+            private readonly MulticastDiscoveryAgent parent;
+
+			internal RemoteBrokerData(MulticastDiscoveryAgent parent, string brokerName, string serviceName) : base()
 			{
-				this.brokerName = brokerName;
-				this.serviceName = serviceName;
+                this.parent = parent;
+				this.BrokerName = brokerName;
+				this.ServiceName = serviceName;
 				this.lastHeartBeat = DateTime.Now;
 			}
 
+            internal bool Failed
+            {
+                get { return this.failed; }
+            }
+
+            internal DateTime LastHeartBeat
+            {
+                get 
+                { 
+                    lock(syncRoot)
+                    {
+                        return this.lastHeartBeat; 
+                    }
+                }
+            }
+
 			internal void UpdateHeartBeat()
 			{
-				this.lastHeartBeat = DateTime.Now;
+                lock (syncRoot)
+                {
+                    this.lastHeartBeat = DateTime.Now;
+
+                    // Consider that the broker recovery has succeeded if it has not
+                    // failed in 60 seconds.
+                    if (!failed && failureCount > 0 && 
+                        (lastHeartBeat - recoveryTime) > TimeSpan.FromMilliseconds(1000 * 60)) {
+
+                        Tracer.DebugFormat("I now think that the {0} service has recovered.", ServiceName);
+
+                        failureCount = 0;
+                        recoveryTime = DateTime.MinValue;
+                    }
+                }
 			}
+
+            internal bool MarkFailed()
+            {
+                lock (syncRoot)
+                {
+                    if (!failed) 
+                    {
+                        failed = true;
+                        failureCount++;
+
+                        long reconnectDelay;
+                        if (!parent.UseExponentialBackOff) 
+                        {
+                            reconnectDelay = parent.InitialReconnectDelay;
+                        } 
+                        else 
+                        {
+                            reconnectDelay = (long)Math.Pow(parent.BackOffMultiplier, failureCount);
+                            reconnectDelay = Math.Min(reconnectDelay, parent.MaxReconnectDelay);
+                        }
+
+                        Tracer.DebugFormat("Remote failure of {0} while still receiving multicast advertisements.  " +
+                                           "Advertising events will be suppressed for {1} ms, the current " +
+                                           "failure count is: {2}", ServiceName, reconnectDelay, failureCount);
+
+                        recoveryTime = DateTime.Now + TimeSpan.FromMilliseconds(reconnectDelay);
+                        return true;
+                    }
+                }
+                return false;            
+            }
+
+            /// <summary>
+            /// Returns true if this Broker has been marked as failed and it is now time to
+            /// start a recovery attempt.
+            /// </summary>
+            public bool IsTimeForRecovery() 
+            {
+                lock (syncRoot)
+                {
+                    if (!failed) 
+                    {
+                        return false;
+                    }
+
+                    int maxReconnectAttempts = parent.MaxReconnectAttempts;
+
+                    // Are we done trying to recover this guy?
+                    if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) 
+                    {
+                        Tracer.DebugFormat("Max reconnect attempts of the {0} service has been reached.", ServiceName);
+                        return false;
+                    }
+
+                    // Is it not yet time?
+                    if (DateTime.Now < recoveryTime) 
+                    {
+                        return false;
+                    }
+
+                    Tracer.DebugFormat("Resuming event advertisement of the {0} service.", ServiceName);
+
+                    failed = false;
+                    return true;
+                }
+            }
 		}
 	}
 }
diff --git a/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgentFactory.cs b/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgentFactory.cs
new file mode 100644
index 0000000..0069cad
--- /dev/null
+++ b/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgentFactory.cs
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Specialized;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery.Multicast
+{
+    [ActiveMQDiscoveryAgentFactory("multicast")]
+    public class MulticastDiscoveryAgentFactory : IDiscoveryAgentFactory
+    {
+        public IDiscoveryAgent CreateAgent(Uri uri)
+        {
+            Tracer.DebugFormat("Creating DiscoveryAgent:[{0}]", uri);
+
+            try
+            {
+                MulticastDiscoveryAgent agent = new MulticastDiscoveryAgent();
+                agent.DiscoveryURI = uri;
+
+                // allow MDA's params to be set via query arguments  
+                // (e.g., multicast://default?group=foo             
+
+                StringDictionary parameters = URISupport.ParseParameters(uri);
+                URISupport.SetProperties(agent, parameters);
+
+                return agent;
+            }
+            catch(Exception e)
+            {
+                throw new IOException("Could not create discovery agent", e);
+            }
+        }
+    }
+}
+
diff --git a/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs b/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
index c061af0..504d07b 100644
--- a/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
+++ b/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
@@ -1,69 +1,68 @@
-/*

- * Licensed to the Apache Software Foundation (ASF) under one or more

- * contributor license agreements.  See the NOTICE file distributed with

- * this work for additional information regarding copyright ownership.

- * The ASF licenses this file to You under the Apache License, Version 2.0

- * (the "License"); you may not use this file except in compliance with

- * the License.  You may obtain a copy of the License at

- *

- *      http://www.apache.org/licenses/LICENSE-2.0

- *

- * Unless required by applicable law or agreed to in writing, software

- * distributed under the License is distributed on an "AS IS" BASIS,

- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

- * See the License for the specific language governing permissions and

- * limitations under the License.

- */

-

-using System;

-using System.Collections.Specialized;

-using Apache.NMS.Util;

-

-namespace Apache.NMS.ActiveMQ.Transport.Failover

-{

-    [ActiveMQTransportFactory("failover")]

-	public class FailoverTransportFactory : ITransportFactory

-	{

-		private ITransport doConnect(Uri location)

-		{

-			ITransport transport = CreateTransport(URISupport.ParseComposite(location));

-			transport = new MutexTransport(transport);

-			transport = new ResponseCorrelator(transport);

-			return transport;

-		}

-

-		public ITransport CompositeConnect(Uri location)

-		{

-			return CreateTransport(URISupport.ParseComposite(location));

-		}

-

-		public ITransport CompositeConnect(Uri location, SetTransport setTransport)

-		{

-			throw new NMSConnectionException("Asynchronous composite connection not supported with Failover transport.");

-		}

-

-		public ITransport CreateTransport(Uri location)

-		{

-			return doConnect(location);

-		}

-

-		/// <summary>

-		/// </summary>

-		/// <param name="compositData"></param>

-		/// <returns></returns>

-		public ITransport CreateTransport(URISupport.CompositeData compositData)

-		{

-			StringDictionary options = compositData.Parameters;

-			FailoverTransport transport = CreateTransport(options);

-			transport.Add(false, compositData.Components);

-			return transport;

-		}

-

-		public FailoverTransport CreateTransport(StringDictionary parameters)

-		{

-			FailoverTransport transport = new FailoverTransport();

-			URISupport.SetProperties(transport, parameters, "transport.");

-			return transport;

-		}

-	}

-}

+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Specialized;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Failover
+{
+    [ActiveMQTransportFactory("failover")]
+	public class FailoverTransportFactory : ITransportFactory
+	{
+		private ITransport doConnect(Uri location)
+		{
+			ITransport transport = CreateTransport(URISupport.ParseComposite(location));
+			transport = new MutexTransport(transport);
+			transport = new ResponseCorrelator(transport);
+			return transport;
+		}
+
+		public ITransport CompositeConnect(Uri location)
+		{
+			return CreateTransport(URISupport.ParseComposite(location));
+		}
+
+		public ITransport CreateTransport(Uri location)
+		{
+			return doConnect(location);
+		}
+
+		/// <summary>
+        /// Virtual transport create method which can be overriden by subclasses to provide
+        /// an alternate FailoverTransport implementation.  All transport creation methods in
+        /// this factory calls through this method to create the ITransport instance so this
+        /// is the only method that needs to be overriden.  
+		/// </summary>
+		/// <param name="compositData"></param>
+		/// <returns></returns>
+		public virtual ITransport CreateTransport(URISupport.CompositeData compositData)
+		{
+			StringDictionary options = compositData.Parameters;
+			FailoverTransport transport = CreateTransport(options);
+			transport.Add(false, compositData.Components);
+			return transport;
+		}
+
+		protected FailoverTransport CreateTransport(StringDictionary parameters)
+		{
+			FailoverTransport transport = new FailoverTransport();
+			URISupport.SetProperties(transport, parameters, "transport.");
+			return transport;
+		}
+	}
+}
diff --git a/src/main/csharp/Transport/ITransportFactory.cs b/src/main/csharp/Transport/ITransportFactory.cs
index dd744ff..7b008df 100644
--- a/src/main/csharp/Transport/ITransportFactory.cs
+++ b/src/main/csharp/Transport/ITransportFactory.cs
@@ -25,6 +25,5 @@
 	{
 		ITransport CreateTransport(Uri location);
 		ITransport CompositeConnect(Uri location);
-		ITransport CompositeConnect(Uri location, SetTransport setTransport);
 	}
 }
diff --git a/src/main/csharp/Transport/Mock/MockTransportFactory.cs b/src/main/csharp/Transport/Mock/MockTransportFactory.cs
index f3c194f..6fac96b 100644
--- a/src/main/csharp/Transport/Mock/MockTransportFactory.cs
+++ b/src/main/csharp/Transport/Mock/MockTransportFactory.cs
@@ -137,10 +137,5 @@
 
 			return transport;
 		}
-
-		public ITransport CompositeConnect(Uri location, SetTransport setTransport)
-		{
-			throw new NMSConnectionException("Asynchronous composite connection not supported with Mock transport.");
-		}
 	}
 }
diff --git a/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs b/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
index cfa1f8f..786e4e2 100644
--- a/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
+++ b/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
@@ -107,11 +107,6 @@
 
         public ITransport CompositeConnect(Uri location)
         {
-            return CompositeConnect(location, null);
-        }
-
-        public ITransport CompositeConnect(Uri location, SetTransport setTransport)
-        {
             // Extract query parameters from broker Uri
             StringDictionary map = URISupport.ParseQuery(location.Query);
 
@@ -165,10 +160,6 @@
             }
 
             transport = new WireFormatNegotiator(transport, wireformat);
-            if(setTransport != null)
-            {
-                setTransport(transport, location);
-            }
 
             return transport;
         }
diff --git a/src/main/csharp/Transport/TransportFactory.cs b/src/main/csharp/Transport/TransportFactory.cs
index 7b110bf..1368a2d 100644
--- a/src/main/csharp/Transport/TransportFactory.cs
+++ b/src/main/csharp/Transport/TransportFactory.cs
@@ -19,10 +19,6 @@
 using System.Reflection;
 using System.Collections.Generic;
 using Apache.NMS.ActiveMQ.Util;
-using Apache.NMS.ActiveMQ.Transport.Discovery;
-using Apache.NMS.ActiveMQ.Transport.Failover;
-using Apache.NMS.ActiveMQ.Transport.Mock;
-using Apache.NMS.ActiveMQ.Transport.Tcp;
 
 namespace Apache.NMS.ActiveMQ.Transport
 {
@@ -69,12 +65,6 @@
 			return tf.CompositeConnect(location);
 		}
 
-		public static ITransport AsyncCompositeConnect(Uri location, SetTransport setTransport)
-		{
-			ITransportFactory tf = TransportFactory.CreateTransportFactory(location);
-			return tf.CompositeConnect(location, setTransport);
-		}
-
 		/// <summary>
 		/// Create a transport factory for the scheme.  If we do not support the transport protocol,
 		/// an NMSConnectionException will be thrown.
diff --git a/src/main/csharp/Util/ISuspendable.cs b/src/main/csharp/Util/ISuspendable.cs
new file mode 100644
index 0000000..84f60e6
--- /dev/null
+++ b/src/main/csharp/Util/ISuspendable.cs
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.ActiveMQ
+{
+    /// <summary>
+    /// Optional interface for service type objects which support a 
+    /// logical suspend and resume mode.  Services that can be suspended
+    /// when not needed can reduce resource load.  
+    /// </summary>
+    public interface ISuspendable
+    {
+        void Suspend();
+
+        void Resume();
+    }
+}
+