https://issues.apache.org/jira/browse/AMQNET-445
Clean up transport code a bit.
Improve Discovery transport to use an underlying FailoverTransport to manage connect / reconnect.
Added a more defined discovery agent interface and an attribute to use to tag them so that they can be auto discovered.
Cleaned up and improved the Multicast agent.
diff --git a/src/main/csharp/Transport/Discovery/ActiveMQDiscoveryAgentFactoryAttribute.cs b/src/main/csharp/Transport/Discovery/ActiveMQDiscoveryAgentFactoryAttribute.cs
new file mode 100644
index 0000000..6e42b50
--- /dev/null
+++ b/src/main/csharp/Transport/Discovery/ActiveMQDiscoveryAgentFactoryAttribute.cs
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.ActiveMQ.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery
+{
+ /// <summary>
+ /// Attribute that decorates DiscoveryAgentFactory implementations to allow
+ /// the DiscoverAgentFactory to find all the available factories dynamically.
+ /// </summary>
+ public class ActiveMQDiscoveryAgentFactoryAttribute : FactoryAttribute
+ {
+ public ActiveMQDiscoveryAgentFactoryAttribute(string scheme) : base(scheme)
+ {
+ }
+ }
+}
+
diff --git a/src/main/csharp/Transport/Discovery/DiscoveryAgentFactory.cs b/src/main/csharp/Transport/Discovery/DiscoveryAgentFactory.cs
new file mode 100644
index 0000000..37bb248
--- /dev/null
+++ b/src/main/csharp/Transport/Discovery/DiscoveryAgentFactory.cs
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Generic;
+using Apache.NMS.ActiveMQ.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery
+{
+ public class DiscoveryAgentFactory
+ {
+ private static readonly FactoryFinder<ActiveMQDiscoveryAgentFactoryAttribute, ITransportFactory> FACTORY_FINDER =
+ new FactoryFinder<ActiveMQDiscoveryAgentFactoryAttribute, ITransportFactory>();
+
+ private readonly static object AGENT_FACTORY_TYPES_LOCK = new object();
+ private readonly static Dictionary<String, Type> AGENT_FACTORY_TYPES = new Dictionary<String, Type>();
+
+ public void RegisterAgentFactory(string scheme, Type factoryType)
+ {
+ lock (AGENT_FACTORY_TYPES_LOCK)
+ {
+ AGENT_FACTORY_TYPES[scheme] = factoryType;
+ }
+ }
+
+ public static IDiscoveryAgent CreateAgent(Uri location)
+ {
+ IDiscoveryAgentFactory tf = DiscoveryAgentFactory.CreateAgentFactory(location);
+ return tf.CreateAgent(location);
+ }
+
+ /// <summary>
+ /// Create a DiscoveryAgent Factory for the scheme. If we do not support the agent protocol,
+ /// an NMSConnectionException will be thrown.
+ /// </summary>
+ /// <param name="location"></param>
+ /// <returns></returns>
+ private static IDiscoveryAgentFactory CreateAgentFactory(Uri location)
+ {
+ string scheme = location.Scheme;
+
+ if(string.IsNullOrEmpty(scheme))
+ {
+ throw new NMSConnectionException(String.Format("Discovery Agent scheme invalid: [{0}]", location.ToString()));
+ }
+
+ IDiscoveryAgentFactory factory = null;
+
+ try
+ {
+ factory = NewInstance(scheme.ToLower());
+ }
+ catch(NMSConnectionException)
+ {
+ throw;
+ }
+ catch(Exception e)
+ {
+ throw new NMSConnectionException("Error creating discovery agent.", e);
+ }
+
+ if(null == factory)
+ {
+ throw new NMSConnectionException("Unable to create a discovery agent.");
+ }
+
+ return factory;
+ }
+
+ private static IDiscoveryAgentFactory NewInstance(string scheme)
+ {
+ try
+ {
+ Type factoryType = FindAgentFactory(scheme);
+
+ if(factoryType == null)
+ {
+ throw new Exception("NewInstance failed to find a match for id = " + scheme);
+ }
+
+ return (IDiscoveryAgentFactory) Activator.CreateInstance(factoryType);
+ }
+ catch(Exception ex)
+ {
+ Tracer.WarnFormat("NewInstance failed to create an IDiscoveryAgentFactory with error: {0}", ex.Message);
+ throw;
+ }
+ }
+
+ private static Type FindAgentFactory(string scheme)
+ {
+ lock (AGENT_FACTORY_TYPES_LOCK)
+ {
+ if(AGENT_FACTORY_TYPES.ContainsKey(scheme))
+ {
+ return AGENT_FACTORY_TYPES[scheme];
+ }
+ }
+
+ try
+ {
+ Type factoryType = FACTORY_FINDER.FindFactoryType(scheme);
+
+ lock (AGENT_FACTORY_TYPES_LOCK)
+ {
+ AGENT_FACTORY_TYPES[scheme] = factoryType;
+ }
+ return factoryType;
+ }
+ catch
+ {
+ throw new NMSConnectionException("Failed to find Factory for Discovery Agent type: " + scheme);
+ }
+ }
+ }
+}
+
diff --git a/src/main/csharp/Transport/Discovery/DiscoveryTransport.cs b/src/main/csharp/Transport/Discovery/DiscoveryTransport.cs
new file mode 100644
index 0000000..900d13b
--- /dev/null
+++ b/src/main/csharp/Transport/Discovery/DiscoveryTransport.cs
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Text;
+using System.Collections.Generic;
+using System.Collections.Specialized;
+using Apache.NMS.Util;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery
+{
+ public class DiscoveryTransport : TransportFilter
+ {
+ public const string DISCOVERED_OPTION_PREFIX = "discovered.";
+
+ private readonly new ICompositeTransport next;
+ private readonly object syncRoot = new object();
+ private readonly Dictionary<String, Uri> serviceURIs = new Dictionary<String, Uri>();
+
+ private IDiscoveryAgent discoveryAgent;
+ public IDiscoveryAgent DiscoveryAgent
+ {
+ get { return this.discoveryAgent; }
+ set { this.discoveryAgent = value; }
+ }
+
+ private StringDictionary properties;
+ public StringDictionary Properties
+ {
+ get { return this.properties; }
+ set { this.properties = value; }
+ }
+
+ public DiscoveryTransport(ICompositeTransport next) : base(next)
+ {
+ this.next = next;
+
+ // Intercept the interrupted and resumed events so we can disable our
+ // agent if its supports suspend / resume semantics.
+ this.next.Interrupted = TransportInterrupted;
+ this.next.Resumed = TransportResumed;
+ }
+
+ public override void Start()
+ {
+ if (discoveryAgent == null)
+ {
+ throw new InvalidOperationException("discoveryAgent not configured");
+ }
+
+ // lets pass into the agent the broker name and connection details
+ discoveryAgent.ServiceAdd = OnServiceAdded;
+ discoveryAgent.ServiceRemove = OnServiceRemoved;
+ discoveryAgent.Start();
+ this.next.Start();
+ }
+
+ public override void Stop()
+ {
+ ServiceStopper ss = new ServiceStopper();
+
+ ss.Stop(discoveryAgent);
+ ss.Stop(next);
+ ss.ThrowFirstException();
+ }
+
+ private void OnServiceAdded(DiscoveryEvent addEvent)
+ {
+ String url = addEvent.ServiceName;
+ if (url != null)
+ {
+ try
+ {
+ Uri uri = new Uri(url);
+ Tracer.InfoFormat("Adding new broker connection URL: {0}", uri);
+ uri = ApplyParameters(uri, properties, DISCOVERED_OPTION_PREFIX);
+
+ lock (syncRoot)
+ {
+ serviceURIs[addEvent.ServiceName] = uri;
+ }
+ next.Add(false, new Uri[] {uri});
+ }
+ catch (Exception e)
+ {
+ Tracer.WarnFormat("Could not connect to remote URI: {0} due to bad URI syntax: {1}", url, e.Message);
+ }
+ }
+ }
+
+ private void OnServiceRemoved(DiscoveryEvent removeEvent)
+ {
+ Uri toRemove = null;
+ lock (syncRoot)
+ {
+ serviceURIs.TryGetValue(removeEvent.ServiceName, out toRemove);
+ }
+ if (toRemove != null)
+ {
+ next.Remove(false, new Uri[] {toRemove});
+ }
+ }
+
+ private void TransportResumed(ITransport sender)
+ {
+ ISuspendable service = this.discoveryAgent as ISuspendable;
+ if (service != null)
+ {
+ try
+ {
+ service.Suspend();
+ }
+ catch (Exception e)
+ {
+ Tracer.WarnFormat("Caught error while suspending service: {0} - {1}", service, e.Message);
+ }
+ }
+
+ if (this.Resumed != null)
+ {
+ this.Resumed(sender);
+ }
+ }
+
+ private void TransportInterrupted(ITransport sender)
+ {
+ ISuspendable service = this.discoveryAgent as ISuspendable;
+ if (service != null)
+ {
+ try
+ {
+ service.Resume();
+ }
+ catch (Exception e)
+ {
+ Tracer.WarnFormat("Caught error while resuming service: {0} - {1}", service, e.Message);
+ }
+ }
+
+ if (this.Interrupted != null)
+ {
+ this.Interrupted(sender);
+ }
+ }
+
+ /// <summary>
+ /// Given a Key / Value mapping create and append a URI query value that represents the mapped
+ /// entries, return the newly updated URI that contains the value of the given URI and the
+ /// appended query value. Each entry in the query string is prefixed by the supplied
+ /// optionPrefix string.
+ /// </summary>
+ private static Uri ApplyParameters(Uri uri, StringDictionary queryParameters, String optionPrefix)
+ {
+ if (queryParameters != null && queryParameters.Count != 0)
+ {
+ StringBuilder newQuery = uri.Query != null ? new StringBuilder(uri.Query) : new StringBuilder();
+
+ foreach(KeyValuePair<string, string> entry in queryParameters)
+ {
+ if (entry.Key.StartsWith(optionPrefix))
+ {
+ if (newQuery.Length !=0)
+ {
+ newQuery.Append('&');
+ }
+ string key = entry.Key.Substring(optionPrefix.Length);
+ newQuery.Append(key).Append('=').Append(entry.Value);
+ }
+ }
+ uri = URISupport.CreateUriWithQuery(uri, newQuery.ToString());
+ }
+ return uri;
+ }
+ }
+}
+
diff --git a/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs b/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs
index e045025..db8a19d 100644
--- a/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs
+++ b/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs
@@ -1,133 +1,53 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Threading;
-using Apache.NMS.ActiveMQ.Transport.Discovery.Multicast;
-using Apache.NMS.ActiveMQ.Transport.Tcp;
-using Apache.NMS.Util;
-
-namespace Apache.NMS.ActiveMQ.Transport.Discovery
-{
- [ActiveMQTransportFactory("discovery")]
- public class DiscoveryTransportFactory : ITransportFactory
- {
- private const int TIMEOUT_IN_SECONDS = 20;
-
- private static Uri discoveredUri;
- private static readonly MulticastDiscoveryAgent agent;
- private static string currentServiceName;
- private static readonly object uriLock = new object();
- private static readonly AutoResetEvent discoveredUriEvent = new AutoResetEvent(false);
- private static event ExceptionListener OnException;
-
- static DiscoveryTransportFactory()
- {
- DiscoveryTransportFactory.OnException += TransportFactory.HandleException;
- agent = new MulticastDiscoveryAgent();
- agent.OnNewServiceFound += agent_OnNewServiceFound;
- agent.OnServiceRemoved += agent_OnServiceRemoved;
- }
-
- public DiscoveryTransportFactory()
- {
- lock(uriLock)
- {
- currentServiceName = String.Empty;
- }
- }
-
- public static Uri DiscoveredUri
- {
- get { lock(uriLock) { return discoveredUri; } }
- set { lock(uriLock) { discoveredUri = value; } }
- }
-
- private static void agent_OnNewServiceFound(string brokerName, string serviceName)
- {
- lock(uriLock)
- {
- if(discoveredUri == null)
- {
- currentServiceName = serviceName;
- discoveredUri = new Uri(currentServiceName);
- }
- }
-
- // This will end the wait in the CreateTransport method.
- discoveredUriEvent.Set();
- }
-
- private static void agent_OnServiceRemoved(string brokerName, string serviceName)
- {
- lock(uriLock)
- {
- if(serviceName == currentServiceName)
- {
- DiscoveredUri = null;
- DiscoveryTransportFactory.OnException(new Exception("Broker connection is no longer valid."));
- }
- }
- }
-
- #region Overloaded ITransportFactory Members
-
- public ITransport CreateTransport(Uri location)
- {
- URISupport.CompositeData cd = URISupport.ParseComposite(location);
-
- if(cd.Components.Length > 0)
- {
- agent.DiscoveryURI = cd.Components[0];
- }
-
- if(!agent.IsStarted)
- {
- agent.Start();
- }
-
- Uri hostUri = DiscoveredUri;
-
- if(null == hostUri)
- {
- // If a new broker is found the agent will fire an event which will result in discoveredUri being set.
- discoveredUriEvent.WaitOne(TIMEOUT_IN_SECONDS * 1000, true);
- hostUri = DiscoveredUri;
- if(null == hostUri)
- {
- throw new NMSConnectionException(String.Format("Unable to find a connection to {0} before the timeout period expired.", location.ToString()));
- }
- }
-
- TcpTransportFactory tcpTransFactory = new TcpTransportFactory();
- return tcpTransFactory.CompositeConnect(new Uri(hostUri + location.Query));
- }
-
- public ITransport CompositeConnect(Uri location)
- {
- return CreateTransport(location);
- }
-
-
- public ITransport CompositeConnect(Uri location, SetTransport setTransport)
- {
- return CreateTransport(location);
- }
-
- #endregion
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Specialized;
+using Apache.NMS.ActiveMQ.Transport.Failover;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery
+{
+ [ActiveMQTransportFactory("discovery")]
+ public class DiscoveryTransportFactory : FailoverTransportFactory
+ {
+ public override ITransport CreateTransport(URISupport.CompositeData compositData)
+ {
+ StringDictionary options = compositData.Parameters;
+ FailoverTransport failoverTransport = CreateTransport(options);
+ return CreateTransport(failoverTransport, compositData, options);
+ }
+
+ /// <summary>
+ /// Factory method for creating a DiscoveryTransport. The Discovery Transport wraps the
+ /// given ICompositeTransport and will add and remove Transport URIs as they are discovered.
+ /// </summary>
+ public static DiscoveryTransport CreateTransport(ICompositeTransport compositeTransport, URISupport.CompositeData compositeData, StringDictionary options)
+ {
+ DiscoveryTransport transport = new DiscoveryTransport(compositeTransport);
+
+ URISupport.SetProperties(transport, options, "transport.");
+ transport.Properties = options;
+
+ Uri discoveryAgentURI = compositeData.Components[0];
+ IDiscoveryAgent discoveryAgent = DiscoveryAgentFactory.CreateAgent(discoveryAgentURI);
+ transport.DiscoveryAgent = discoveryAgent;
+
+ return transport;
+ }
+ }
+}
diff --git a/src/main/csharp/Transport/Discovery/IDiscoveryAgent.cs b/src/main/csharp/Transport/Discovery/IDiscoveryAgent.cs
new file mode 100644
index 0000000..c75c1f8
--- /dev/null
+++ b/src/main/csharp/Transport/Discovery/IDiscoveryAgent.cs
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery
+{
+ public delegate void ServiceAddHandler(DiscoveryEvent addEvent);
+ public delegate void ServiceRemoveHandler(DiscoveryEvent removeEvent);
+
+ public interface IDiscoveryAgent : IStartable, IStoppable
+ {
+ /// <summary>
+ /// Gets or sets the service add event handler
+ /// </summary>
+ ServiceAddHandler ServiceAdd
+ {
+ get;
+ set;
+ }
+
+ /// <summary>
+ /// Gets or sets the service remove event handler.
+ /// </summary>
+ ServiceRemoveHandler ServiceRemove
+ {
+ get;
+ set;
+ }
+
+ /// <summary>
+ /// Registers the service with the given name.
+ /// </summary>
+ void RegisterService(String name);
+
+ /// <summary>
+ /// A process actively using a service may see it go down before the DiscoveryAgent notices
+ /// the service's failure. That process can use this method to notify the IDiscoveryAgent
+ /// of the failure so that other listeners of this IDiscoveryAgent can also be made aware
+ /// of the failure.
+ /// </summary>
+ void ServiceFailed(DiscoveryEvent failedEvent);
+
+ }
+}
+
diff --git a/src/main/csharp/Transport/Discovery/IDiscoveryAgentFactory.cs b/src/main/csharp/Transport/Discovery/IDiscoveryAgentFactory.cs
new file mode 100644
index 0000000..3cedd8f
--- /dev/null
+++ b/src/main/csharp/Transport/Discovery/IDiscoveryAgentFactory.cs
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery
+{
+ /// <summary>
+ /// Factory class interface for all DiscoveryAgent factories. Each agent factory
+ /// should define its own factory attribute so that the agents can be found dynamically
+ /// by the DiscoveryAgentFactory class.
+ /// </summary>
+ public interface IDiscoveryAgentFactory
+ {
+ IDiscoveryAgent CreateAgent(Uri uri);
+ }
+}
+
diff --git a/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs b/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs
index 0f3de13..a6e9561 100644
--- a/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs
+++ b/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs
@@ -20,150 +20,356 @@
using System.Net;
using System.Net.Sockets;
using System.Threading;
+using Apache.NMS.Util;
+using Apache.NMS.ActiveMQ.Threads;
using Apache.NMS.ActiveMQ.Transport.Tcp;
+using Apache.NMS.ActiveMQ.Commands;
namespace Apache.NMS.ActiveMQ.Transport.Discovery.Multicast
{
internal delegate void NewBrokerServiceFound(string brokerName, string serviceName);
internal delegate void BrokerServiceRemoved(string brokerName, string serviceName);
- internal class MulticastDiscoveryAgent : IDisposable
+ internal class MulticastDiscoveryAgent : IDiscoveryAgent, IDisposable
{
- public const int MAX_SOCKET_CONNECTION_RETRY_ATTEMPS = 3;
- public const int DEFAULT_BACKOFF_MILLISECONDS = 100;
- public const int BACKOFF_MULTIPLIER = 2;
- public const string DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
- public const string DEFAULT_HOST_STR = "default";
- public const string DEFAULT_HOST_IP = "239.255.2.3";
- public const int DEFAULT_PORT = 6155;
+ public const string DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
+ public const string DEFAULT_HOST_STR = "default";
+ public const string DEFAULT_HOST_IP = "239.255.2.3";
+ public const int DEFAULT_PORT = 6155;
+
+ public const int DEFAULT_INITIAL_RECONNECT_DELAY = 1000 * 5;
+ public const int DEFAULT_BACKOFF_MULTIPLIER = 2;
+ public const int DEFAULT_MAX_RECONNECT_DELAY = 1000 * 30;
private const string TYPE_SUFFIX = "ActiveMQ-4.";
private const string ALIVE = "alive";
private const string DEAD = "dead";
private const char DELIMITER = '%';
private const int BUFF_SIZE = 8192;
- private const string DEFAULT_GROUP = "default";
- private const int EXPIRATION_OFFSET_IN_SECONDS = 2;
- private const int WORKER_KILL_TIME_SECONDS = 10;
- private const int SOCKET_TIMEOUT_MILLISECONDS = 500;
+ private const int HEARTBEAT_MISS_BEFORE_DEATH = 10;
+ private const int DEFAULT_IDLE_TIME = 500;
+ private const string DEFAULT_GROUP = "default";
+ private const int WORKER_KILL_TIME_SECONDS = 1000;
- private string group;
- private readonly object stopstartSemaphore = new object();
- private bool isStarted = false;
- private Uri discoveryUri;
+ private const int MAX_SOCKET_CONNECTION_RETRY_ATTEMPS = 3;
+ private const int SOCKET_CONNECTION_BACKOFF_TIME = 500;
+
+ private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
+ private long maxReconnectDelay = DEFAULT_MAX_RECONNECT_DELAY;
+ private long backOffMultiplier = DEFAULT_BACKOFF_MULTIPLIER;
+ private bool useExponentialBackOff;
+ private int maxReconnectAttempts;
+
+ private int timeToLive = 1;
+ private string group = DEFAULT_GROUP;
+ private bool loopBackMode;
+ private Dictionary<String, RemoteBrokerData> brokersByService =
+ new Dictionary<String, RemoteBrokerData>();
+ private readonly object servicesLock = new object();
+ private String selfService;
+ private long keepAliveInterval = DEFAULT_IDLE_TIME;
+ private string mcInterface;
+ private string mcNetworkInterface;
+ private string mcJoinNetworkInterface;
+ private DateTime lastAdvertizeTime;
+ private bool reportAdvertizeFailed = true;
+ private readonly Atomic<bool> started = new Atomic<bool>(false);
+
+ private Uri discoveryUri;
private Socket multicastSocket;
private IPEndPoint endPoint;
private Thread worker;
+ private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
- private event NewBrokerServiceFound onNewServiceFound;
- private event BrokerServiceRemoved onServiceRemoved;
+ private ServiceAddHandler serviceAddHandler;
+ private ServiceRemoveHandler serviceRemoveHandler;
- /// <summary>
- /// Indexed by service name
- /// </summary>
- private readonly Dictionary<string, RemoteBrokerData> remoteBrokers;
+ #region Property Setters and Getters
- public MulticastDiscoveryAgent()
- {
- group = DEFAULT_GROUP;
- remoteBrokers = new Dictionary<string, RemoteBrokerData>();
- }
+ public bool LoopBackMode
+ {
+ get { return this.loopBackMode; }
+ set { this.loopBackMode = value; }
+ }
+
+ public int TimeToLive
+ {
+ get { return this.timeToLive; }
+ set { this.timeToLive = value; }
+ }
+
+ public long KeepAliveInterval
+ {
+ get { return this.keepAliveInterval; }
+ set { this.keepAliveInterval = value; }
+ }
+
+ public string Interface
+ {
+ get { return this.mcInterface; }
+ set { this.mcInterface = value; }
+ }
+
+ public string NetworkInterface
+ {
+ get { return this.mcNetworkInterface; }
+ set { this.mcNetworkInterface = value; }
+ }
+
+ public string JoinNetworkInterface
+ {
+ get { return this.mcJoinNetworkInterface; }
+ set { this.mcJoinNetworkInterface = value; }
+ }
+
+ public string Type
+ {
+ get { return this.group + "." + TYPE_SUFFIX; }
+ }
+
+ public long BackOffMultiplier
+ {
+ get { return this.backOffMultiplier; }
+ set { this.backOffMultiplier = value; }
+ }
+
+ public long InitialReconnectDelay
+ {
+ get { return this.initialReconnectDelay; }
+ set { this.initialReconnectDelay = value; }
+ }
+
+ public int MaxReconnectAttempts
+ {
+ get { return this.maxReconnectAttempts; }
+ set { this.maxReconnectAttempts = value; }
+ }
+
+ public long MaxReconnectDelay
+ {
+ get { return this.maxReconnectDelay; }
+ set { this.maxReconnectDelay = value; }
+ }
+
+ public bool UseExponentialBackOff
+ {
+ get { return this.useExponentialBackOff; }
+ set { this.useExponentialBackOff = value; }
+ }
+
+ public string Group
+ {
+ get { return this.group; }
+ set { this.group = value; }
+ }
+
+ public ServiceAddHandler ServiceAdd
+ {
+ get { return serviceAddHandler; }
+ set { this.serviceAddHandler = value; }
+ }
+
+ public ServiceRemoveHandler ServiceRemove
+ {
+ get { return serviceRemoveHandler; }
+ set { this.serviceRemoveHandler = value; }
+ }
+
+ public Uri DiscoveryURI
+ {
+ get { return discoveryUri; }
+ set { discoveryUri = value; }
+ }
+
+ public bool IsStarted
+ {
+ get { return started.Value; }
+ }
+
+ #endregion
+
+ public override String ToString()
+ {
+ return "MulticastDiscoveryAgent-" + (selfService != null ? "advertise:" + selfService : "");
+ }
+
+ public void RegisterService(String name)
+ {
+ this.selfService = name;
+ if (started.Value)
+ {
+ DoAdvertizeSelf();
+ }
+ }
+
+ public void ServiceFailed(DiscoveryEvent failedEvent)
+ {
+ RemoteBrokerData data = brokersByService[failedEvent.ServiceName];
+ if (data != null && data.MarkFailed())
+ {
+ FireServiceRemoveEvent(data);
+ }
+ }
public void Start()
{
- lock(stopstartSemaphore)
- {
- if (discoveryUri == null || discoveryUri.Host.Equals(DEFAULT_HOST_STR))
- {
- discoveryUri = new Uri(DEFAULT_DISCOVERY_URI_STRING);
- }
-
- if(multicastSocket == null)
- {
- int numFailedAttempts = 0;
- int backoffTime = DEFAULT_BACKOFF_MILLISECONDS;
+ if (started.CompareAndSet(false, true)) {
+
+ if (String.IsNullOrEmpty(group))
+ {
+ throw new IOException("You must specify a group to discover");
+ }
+ String type = Type;
+ if (!type.EndsWith("."))
+ {
+ Tracer.Warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
+ type += ".";
+ }
+
+ if (discoveryUri == null)
+ {
+ discoveryUri = new Uri(DEFAULT_DISCOVERY_URI_STRING);
+ }
- Tracer.Info("Connecting to multicast discovery socket.");
- while(!TryToConnectSocket())
- {
- numFailedAttempts++;
- if(numFailedAttempts > MAX_SOCKET_CONNECTION_RETRY_ATTEMPS)
- {
- throw new ApplicationException(
- "Could not open the socket in order to discover advertising brokers.");
- }
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.Debug("start - discoveryURI = " + discoveryUri);
+ }
- Thread.Sleep(backoffTime);
- backoffTime *= BACKOFF_MULTIPLIER;
- }
- }
+ String targetHost = discoveryUri.Host;
+ int targetPort = discoveryUri.Port;
+
+ if (DEFAULT_HOST_STR.Equals(targetHost))
+ {
+ targetHost = DEFAULT_HOST_IP;
+ }
- if(worker == null)
- {
- Tracer.Info("Starting multicast discovery agent worker thread");
- worker = new Thread(new ThreadStart(worker_DoWork));
- worker.IsBackground = true;
- worker.Start();
- isStarted = true;
- }
- }
+ if (targetPort < 0)
+ {
+ targetPort = DEFAULT_PORT;
+ }
+
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("start - myHost = {0}", targetHost);
+ Tracer.DebugFormat("start - myPort = {0}", targetPort);
+ Tracer.DebugFormat("start - group = {0}", group);
+ Tracer.DebugFormat("start - interface = {0}", mcInterface);
+ Tracer.DebugFormat("start - network interface = {0}", mcNetworkInterface);
+ Tracer.DebugFormat("start - join network interface = {0}", mcJoinNetworkInterface);
+ }
+
+ int numFailedAttempts = 0;
+ int backoffTime = SOCKET_CONNECTION_BACKOFF_TIME;
+
+ Tracer.Info("Connecting to multicast discovery socket.");
+ while (!TryToConnectSocket(targetHost, targetPort))
+ {
+ numFailedAttempts++;
+ if (numFailedAttempts > MAX_SOCKET_CONNECTION_RETRY_ATTEMPS)
+ {
+ throw new ApplicationException(
+ "Could not open the socket in order to discover advertising brokers.");
+ }
+
+ Thread.Sleep(backoffTime);
+ backoffTime = (int)(backoffTime * BackOffMultiplier);
+ }
+
+ if(worker == null)
+ {
+ Tracer.Info("Starting multicast discovery agent worker thread");
+ worker = new Thread(new ThreadStart(DiscoveryAgentRun));
+ worker.IsBackground = true;
+ worker.Start();
+ }
+
+ DoAdvertizeSelf();
+ }
}
public void Stop()
{
- Thread localThread = null;
-
- lock(stopstartSemaphore)
+ // Changing the isStarted flag will signal the thread that it needs to shut down.
+ if (started.CompareAndSet(true, false))
{
Tracer.Info("Stopping multicast discovery agent worker thread");
- localThread = worker;
- worker = null;
- // Changing the isStarted flag will signal the thread that it needs to shut down.
- isStarted = false;
+ if (multicastSocket != null)
+ {
+ multicastSocket.Close();
+ }
+ if(worker != null)
+ {
+ // wait for the worker to stop.
+ if(!worker.Join(WORKER_KILL_TIME_SECONDS))
+ {
+ Tracer.Info("!! Timeout waiting for multicast discovery agent localThread to stop");
+ worker.Abort();
+ }
+ worker = null;
+ Tracer.Debug("Multicast discovery agent worker thread stopped");
+ }
+ executor.Shutdown();
+ if (!executor.AwaitTermination(TimeSpan.FromMinutes(1)))
+ {
+ Tracer.DebugFormat("Failed to properly shutdown agent executor {0}", this);
+ }
}
-
- if(localThread != null)
- {
- // wait for the worker to stop.
- if(!localThread.Join(WORKER_KILL_TIME_SECONDS))
- {
- Tracer.Info("!! Timeout waiting for multicast discovery agent localThread to stop");
- localThread.Abort();
- }
- }
-
- Tracer.Info("Multicast discovery agent worker thread joined");
}
- private bool TryToConnectSocket()
+ private bool TryToConnectSocket(string targetHost, int targetPort)
{
bool hasSucceeded = false;
try
{
multicastSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
- endPoint = new IPEndPoint(IPAddress.Any, discoveryUri.Port);
+ endPoint = new IPEndPoint(IPAddress.Any, targetPort);
- //We have to allow reuse in the multicast socket. Otherwise, we would be unable to use multiple clients on the same machine.
+ // We have to allow reuse in the multicast socket. Otherwise, we would be unable to
+ // use multiple clients on the same machine.
multicastSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 1);
multicastSocket.Bind(endPoint);
IPAddress ipaddress;
- if(!TcpTransportFactory.TryParseIPAddress(discoveryUri.Host, out ipaddress))
+ if(!TcpTransportFactory.TryParseIPAddress(targetHost, out ipaddress))
{
- ipaddress = TcpTransportFactory.GetIPAddress(discoveryUri.Host, AddressFamily.InterNetwork);
+ ipaddress = TcpTransportFactory.GetIPAddress(targetHost, AddressFamily.InterNetwork);
if(null == ipaddress)
{
throw new NMSConnectionException("Invalid host address.");
}
}
- multicastSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership,
- new MulticastOption(ipaddress, IPAddress.Any));
-#if !NETCF
- multicastSocket.ReceiveTimeout = SOCKET_TIMEOUT_MILLISECONDS;
-#endif
+ if (LoopBackMode)
+ {
+ multicastSocket.MulticastLoopback = true;
+ }
+ if (TimeToLive != 0)
+ {
+ multicastSocket.SetSocketOption(SocketOptionLevel.IP,
+ SocketOptionName.MulticastTimeToLive, timeToLive);
+ }
+ if (!String.IsNullOrEmpty(mcJoinNetworkInterface))
+ {
+ // TODO figure out how to set this.
+ throw new NotSupportedException("McJoinNetworkInterface not yet implemented.");
+ }
+ else
+ {
+ multicastSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership,
+ new MulticastOption(ipaddress, IPAddress.Any));
+ }
+
+ if (!String.IsNullOrEmpty(mcNetworkInterface))
+ {
+ // TODO figure out how to set this.
+ throw new NotSupportedException("McNetworkInterface not yet implemented.");
+ }
+
+ multicastSocket.ReceiveTimeout = (int)keepAliveInterval;
+
hasSucceeded = true;
}
catch(SocketException)
@@ -173,15 +379,16 @@
return hasSucceeded;
}
- private void worker_DoWork()
+ private void DiscoveryAgentRun()
{
Thread.CurrentThread.Name = "Discovery Agent Thread.";
byte[] buffer = new byte[BUFF_SIZE];
string receivedInfoRaw;
string receivedInfo;
- while(isStarted)
+ while (started.Value)
{
+ DoTimeKeepingServices();
try
{
int numBytes = multicastSocket.Receive(buffer);
@@ -197,177 +404,334 @@
receivedInfo = receivedInfoRaw;
}
- ProcessBrokerMessage(receivedInfo);
+ ProcessServiceAdvertisement(receivedInfo);
}
catch(SocketException)
{
// There was no multicast message sent before the timeout expired...Let us try again.
}
- //We need to clear the buffer.
+ // We need to clear the buffer.
buffer[0] = 0x0;
- ExpireOldServices();
}
}
- private void ProcessBrokerMessage(string message)
+ private void ProcessServiceAdvertisement(string message)
{
string payload;
string brokerName;
string serviceName;
- if(message.StartsWith(MulticastType))
+ if (message.StartsWith(Type))
{
- payload = message.Substring(MulticastType.Length);
+ payload = message.Substring(Type.Length);
brokerName = GetBrokerName(payload);
serviceName = GetServiceName(payload);
- if(payload.StartsWith(ALIVE))
+ if (payload.StartsWith(ALIVE))
{
- ProcessAliveBrokerMessage(brokerName, serviceName);
+ ProcessAlive(brokerName, serviceName);
}
- else if(payload.StartsWith(DEAD))
+ else if (payload.StartsWith(DEAD))
{
- ProcessDeadBrokerMessage(brokerName, serviceName);
+ ProcessDead(serviceName);
}
else
{
- //Malformed Payload
+ // Malformed Payload
}
}
}
- private void ProcessDeadBrokerMessage(string brokerName, string serviceName)
- {
- if(remoteBrokers.ContainsKey(serviceName))
- {
- remoteBrokers.Remove(serviceName);
- if(onServiceRemoved != null)
- {
- onServiceRemoved(brokerName, serviceName);
- }
- }
- }
+ private void DoTimeKeepingServices()
+ {
+ if (started.Value)
+ {
+ DateTime currentTime = DateTime.Now;
+ if (currentTime < lastAdvertizeTime ||
+ ((currentTime - TimeSpan.FromMilliseconds(keepAliveInterval)) > lastAdvertizeTime))
+ {
+ DoAdvertizeSelf();
+ lastAdvertizeTime = currentTime;
+ }
+ DoExpireOldServices();
+ }
+ }
- private void ProcessAliveBrokerMessage(string brokerName, string serviceName)
- {
- if(remoteBrokers.ContainsKey(serviceName))
- {
- remoteBrokers[serviceName].UpdateHeartBeat();
- }
- else
- {
- remoteBrokers.Add(serviceName, new RemoteBrokerData(brokerName, serviceName));
+ private void DoAdvertizeSelf()
+ {
+ if (!String.IsNullOrEmpty(selfService))
+ {
+ String payload = Type;
+ payload += started.Value ? ALIVE : DEAD;
+ payload += DELIMITER + "localhost" + DELIMITER;
+ payload += selfService;
+ try
+ {
+ byte[] data = System.Text.Encoding.UTF8.GetBytes(payload);
+ multicastSocket.Send(data);
+ }
+ catch (Exception e)
+ {
+ // If a send fails, chances are all subsequent sends will fail
+ // too.. No need to keep reporting the
+ // same error over and over.
+ if (reportAdvertizeFailed)
+ {
+ reportAdvertizeFailed = false;
+ Tracer.ErrorFormat("Failed to advertise our service: {0} cause: {1}", payload, e.Message);
+ }
+ }
+ }
+ }
- if(onNewServiceFound != null)
- {
- onNewServiceFound(brokerName, serviceName);
- }
- }
- }
+ private void ProcessAlive(string brokerName, string service)
+ {
+ if (selfService == null || !service.Equals(selfService))
+ {
+ RemoteBrokerData remoteBroker = null;
+ lock (servicesLock)
+ {
+ brokersByService.TryGetValue(service, out remoteBroker);
+ }
+ if (remoteBroker == null)
+ {
+ remoteBroker = new RemoteBrokerData(this, brokerName, service);
+ brokersByService.Add(service, remoteBroker);
+ FireServiceAddEvent(remoteBroker);
+ DoAdvertizeSelf();
+ }
+ else
+ {
+ remoteBroker.UpdateHeartBeat();
+ if (remoteBroker.IsTimeForRecovery())
+ {
+ FireServiceAddEvent(remoteBroker);
+ }
+ }
+ }
+ }
+
+ private void ProcessDead(string service)
+ {
+ if (!service.Equals(selfService))
+ {
+ RemoteBrokerData remoteBroker = null;
+ lock (servicesLock)
+ {
+ brokersByService.TryGetValue(service, out remoteBroker);
+ if (remoteBroker != null)
+ {
+ brokersByService.Remove(service);
+ }
+ }
+ if (remoteBroker != null && !remoteBroker.Failed)
+ {
+ FireServiceRemoveEvent(remoteBroker);
+ }
+ }
+ }
+
+ private void DoExpireOldServices()
+ {
+ DateTime expireTime = DateTime.Now - TimeSpan.FromMilliseconds(keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH);
+
+ RemoteBrokerData[] services = null;
+ lock (servicesLock)
+ {
+ services = new RemoteBrokerData[this.brokersByService.Count];
+ this.brokersByService.Values.CopyTo(services, 0);
+ }
+
+ foreach(RemoteBrokerData service in services)
+ {
+ if (service.LastHeartBeat < expireTime)
+ {
+ ProcessDead(service.ServiceName);
+ }
+ }
+ }
private static string GetBrokerName(string payload)
{
string[] results = payload.Split(DELIMITER);
- return results[1];
+ if (results.Length >= 2)
+ {
+ return results[1];
+ }
+ return null;
}
private static string GetServiceName(string payload)
{
string[] results = payload.Split(DELIMITER);
- return results[2];
- }
-
- private void ExpireOldServices()
- {
- DateTime expireTime;
- List<RemoteBrokerData> deadServices = new List<RemoteBrokerData>();
-
- foreach(KeyValuePair<string, RemoteBrokerData> brokerService in remoteBrokers)
- {
- expireTime = brokerService.Value.lastHeartBeat.AddSeconds(EXPIRATION_OFFSET_IN_SECONDS);
- if(DateTime.Now > expireTime)
- {
- deadServices.Add(brokerService.Value);
- }
- }
-
- // Remove all of the dead services
- for(int i = 0; i < deadServices.Count; i++)
- {
- ProcessDeadBrokerMessage(deadServices[i].brokerName, deadServices[i].serviceName);
- }
- }
-
- #region Properties
-
- /// <summary>
- /// This property indicates whether or not async send is enabled.
- /// </summary>
- public Uri DiscoveryURI
- {
- get { return discoveryUri; }
- set { discoveryUri = value; }
- }
-
- public bool IsStarted
- {
- get { return isStarted; }
- }
-
- public string Group
- {
- get { return group; }
- set { group = value; }
- }
-
- #endregion
-
- internal string MulticastType
- {
- get { return group + "." + TYPE_SUFFIX; }
- }
-
- internal event NewBrokerServiceFound OnNewServiceFound
- {
- add { onNewServiceFound += value; }
- remove { onNewServiceFound -= value; }
- }
-
- internal event BrokerServiceRemoved OnServiceRemoved
- {
- add { onServiceRemoved += value; }
- remove { onServiceRemoved += value; }
+ if (results.Length >= 3)
+ {
+ return results[2];
+ }
+ return null;
}
public void Dispose()
{
- if(isStarted)
+ if(started.Value)
{
Stop();
}
-
- multicastSocket.Shutdown(SocketShutdown.Both);
- multicastSocket = null;
}
- internal class RemoteBrokerData
+ private void FireServiceRemoveEvent(RemoteBrokerData data)
+ {
+ if (serviceRemoveHandler != null && started.Value)
+ {
+ // Have the listener process the event async so that
+ // he does not block this thread since we are doing time sensitive
+ // processing of events.
+ executor.QueueUserWorkItem(ServiceRemoveCallback, data);
+ }
+ }
+
+ private void ServiceRemoveCallback(object data)
+ {
+ RemoteBrokerData serviceData = data as RemoteBrokerData;
+ this.serviceRemoveHandler(serviceData);
+ }
+
+ private void FireServiceAddEvent(RemoteBrokerData data)
+ {
+ if (serviceAddHandler != null && started.Value)
+ {
+ // Have the listener process the event async so that
+ // he does not block this thread since we are doing time sensitive
+ // processing of events.
+ executor.QueueUserWorkItem(ServiceAddCallback, data);
+ }
+ }
+
+ private void ServiceAddCallback(object data)
+ {
+ RemoteBrokerData serviceData = data as RemoteBrokerData;
+ this.serviceAddHandler(serviceData);
+ }
+
+ internal class RemoteBrokerData : DiscoveryEvent
{
- internal string brokerName;
- internal string serviceName;
+ internal DateTime recoveryTime = DateTime.MinValue;
+ internal int failureCount;
+ internal bool failed;
internal DateTime lastHeartBeat;
- internal RemoteBrokerData(string brokerName, string serviceName)
+ private readonly object syncRoot = new object();
+ private readonly MulticastDiscoveryAgent parent;
+
+ internal RemoteBrokerData(MulticastDiscoveryAgent parent, string brokerName, string serviceName) : base()
{
- this.brokerName = brokerName;
- this.serviceName = serviceName;
+ this.parent = parent;
+ this.BrokerName = brokerName;
+ this.ServiceName = serviceName;
this.lastHeartBeat = DateTime.Now;
}
+ internal bool Failed
+ {
+ get { return this.failed; }
+ }
+
+ internal DateTime LastHeartBeat
+ {
+ get
+ {
+ lock(syncRoot)
+ {
+ return this.lastHeartBeat;
+ }
+ }
+ }
+
internal void UpdateHeartBeat()
{
- this.lastHeartBeat = DateTime.Now;
+ lock (syncRoot)
+ {
+ this.lastHeartBeat = DateTime.Now;
+
+ // Consider that the broker recovery has succeeded if it has not
+ // failed in 60 seconds.
+ if (!failed && failureCount > 0 &&
+ (lastHeartBeat - recoveryTime) > TimeSpan.FromMilliseconds(1000 * 60)) {
+
+ Tracer.DebugFormat("I now think that the {0} service has recovered.", ServiceName);
+
+ failureCount = 0;
+ recoveryTime = DateTime.MinValue;
+ }
+ }
}
+
+ internal bool MarkFailed()
+ {
+ lock (syncRoot)
+ {
+ if (!failed)
+ {
+ failed = true;
+ failureCount++;
+
+ long reconnectDelay;
+ if (!parent.UseExponentialBackOff)
+ {
+ reconnectDelay = parent.InitialReconnectDelay;
+ }
+ else
+ {
+ reconnectDelay = (long)Math.Pow(parent.BackOffMultiplier, failureCount);
+ reconnectDelay = Math.Min(reconnectDelay, parent.MaxReconnectDelay);
+ }
+
+ Tracer.DebugFormat("Remote failure of {0} while still receiving multicast advertisements. " +
+ "Advertising events will be suppressed for {1} ms, the current " +
+ "failure count is: {2}", ServiceName, reconnectDelay, failureCount);
+
+ recoveryTime = DateTime.Now + TimeSpan.FromMilliseconds(reconnectDelay);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /// <summary>
+ /// Returns true if this Broker has been marked as failed and it is now time to
+ /// start a recovery attempt.
+ /// </summary>
+ public bool IsTimeForRecovery()
+ {
+ lock (syncRoot)
+ {
+ if (!failed)
+ {
+ return false;
+ }
+
+ int maxReconnectAttempts = parent.MaxReconnectAttempts;
+
+ // Are we done trying to recover this guy?
+ if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts)
+ {
+ Tracer.DebugFormat("Max reconnect attempts of the {0} service has been reached.", ServiceName);
+ return false;
+ }
+
+ // Is it not yet time?
+ if (DateTime.Now < recoveryTime)
+ {
+ return false;
+ }
+
+ Tracer.DebugFormat("Resuming event advertisement of the {0} service.", ServiceName);
+
+ failed = false;
+ return true;
+ }
+ }
}
}
}
diff --git a/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgentFactory.cs b/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgentFactory.cs
new file mode 100644
index 0000000..0069cad
--- /dev/null
+++ b/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgentFactory.cs
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Specialized;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery.Multicast
+{
+ [ActiveMQDiscoveryAgentFactory("multicast")]
+ public class MulticastDiscoveryAgentFactory : IDiscoveryAgentFactory
+ {
+ public IDiscoveryAgent CreateAgent(Uri uri)
+ {
+ Tracer.DebugFormat("Creating DiscoveryAgent:[{0}]", uri);
+
+ try
+ {
+ MulticastDiscoveryAgent agent = new MulticastDiscoveryAgent();
+ agent.DiscoveryURI = uri;
+
+ // allow MDA's params to be set via query arguments
+ // (e.g., multicast://default?group=foo
+
+ StringDictionary parameters = URISupport.ParseParameters(uri);
+ URISupport.SetProperties(agent, parameters);
+
+ return agent;
+ }
+ catch(Exception e)
+ {
+ throw new IOException("Could not create discovery agent", e);
+ }
+ }
+ }
+}
+
diff --git a/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs b/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
index c061af0..504d07b 100644
--- a/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
+++ b/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
@@ -1,69 +1,68 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Collections.Specialized;
-using Apache.NMS.Util;
-
-namespace Apache.NMS.ActiveMQ.Transport.Failover
-{
- [ActiveMQTransportFactory("failover")]
- public class FailoverTransportFactory : ITransportFactory
- {
- private ITransport doConnect(Uri location)
- {
- ITransport transport = CreateTransport(URISupport.ParseComposite(location));
- transport = new MutexTransport(transport);
- transport = new ResponseCorrelator(transport);
- return transport;
- }
-
- public ITransport CompositeConnect(Uri location)
- {
- return CreateTransport(URISupport.ParseComposite(location));
- }
-
- public ITransport CompositeConnect(Uri location, SetTransport setTransport)
- {
- throw new NMSConnectionException("Asynchronous composite connection not supported with Failover transport.");
- }
-
- public ITransport CreateTransport(Uri location)
- {
- return doConnect(location);
- }
-
- /// <summary>
- /// </summary>
- /// <param name="compositData"></param>
- /// <returns></returns>
- public ITransport CreateTransport(URISupport.CompositeData compositData)
- {
- StringDictionary options = compositData.Parameters;
- FailoverTransport transport = CreateTransport(options);
- transport.Add(false, compositData.Components);
- return transport;
- }
-
- public FailoverTransport CreateTransport(StringDictionary parameters)
- {
- FailoverTransport transport = new FailoverTransport();
- URISupport.SetProperties(transport, parameters, "transport.");
- return transport;
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Specialized;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Failover
+{
+ [ActiveMQTransportFactory("failover")]
+ public class FailoverTransportFactory : ITransportFactory
+ {
+ private ITransport doConnect(Uri location)
+ {
+ ITransport transport = CreateTransport(URISupport.ParseComposite(location));
+ transport = new MutexTransport(transport);
+ transport = new ResponseCorrelator(transport);
+ return transport;
+ }
+
+ public ITransport CompositeConnect(Uri location)
+ {
+ return CreateTransport(URISupport.ParseComposite(location));
+ }
+
+ public ITransport CreateTransport(Uri location)
+ {
+ return doConnect(location);
+ }
+
+ /// <summary>
+ /// Virtual transport create method which can be overriden by subclasses to provide
+ /// an alternate FailoverTransport implementation. All transport creation methods in
+ /// this factory calls through this method to create the ITransport instance so this
+ /// is the only method that needs to be overriden.
+ /// </summary>
+ /// <param name="compositData"></param>
+ /// <returns></returns>
+ public virtual ITransport CreateTransport(URISupport.CompositeData compositData)
+ {
+ StringDictionary options = compositData.Parameters;
+ FailoverTransport transport = CreateTransport(options);
+ transport.Add(false, compositData.Components);
+ return transport;
+ }
+
+ protected FailoverTransport CreateTransport(StringDictionary parameters)
+ {
+ FailoverTransport transport = new FailoverTransport();
+ URISupport.SetProperties(transport, parameters, "transport.");
+ return transport;
+ }
+ }
+}
diff --git a/src/main/csharp/Transport/ITransportFactory.cs b/src/main/csharp/Transport/ITransportFactory.cs
index dd744ff..7b008df 100644
--- a/src/main/csharp/Transport/ITransportFactory.cs
+++ b/src/main/csharp/Transport/ITransportFactory.cs
@@ -25,6 +25,5 @@
{
ITransport CreateTransport(Uri location);
ITransport CompositeConnect(Uri location);
- ITransport CompositeConnect(Uri location, SetTransport setTransport);
}
}
diff --git a/src/main/csharp/Transport/Mock/MockTransportFactory.cs b/src/main/csharp/Transport/Mock/MockTransportFactory.cs
index f3c194f..6fac96b 100644
--- a/src/main/csharp/Transport/Mock/MockTransportFactory.cs
+++ b/src/main/csharp/Transport/Mock/MockTransportFactory.cs
@@ -137,10 +137,5 @@
return transport;
}
-
- public ITransport CompositeConnect(Uri location, SetTransport setTransport)
- {
- throw new NMSConnectionException("Asynchronous composite connection not supported with Mock transport.");
- }
}
}
diff --git a/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs b/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
index cfa1f8f..786e4e2 100644
--- a/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
+++ b/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
@@ -107,11 +107,6 @@
public ITransport CompositeConnect(Uri location)
{
- return CompositeConnect(location, null);
- }
-
- public ITransport CompositeConnect(Uri location, SetTransport setTransport)
- {
// Extract query parameters from broker Uri
StringDictionary map = URISupport.ParseQuery(location.Query);
@@ -165,10 +160,6 @@
}
transport = new WireFormatNegotiator(transport, wireformat);
- if(setTransport != null)
- {
- setTransport(transport, location);
- }
return transport;
}
diff --git a/src/main/csharp/Transport/TransportFactory.cs b/src/main/csharp/Transport/TransportFactory.cs
index 7b110bf..1368a2d 100644
--- a/src/main/csharp/Transport/TransportFactory.cs
+++ b/src/main/csharp/Transport/TransportFactory.cs
@@ -19,10 +19,6 @@
using System.Reflection;
using System.Collections.Generic;
using Apache.NMS.ActiveMQ.Util;
-using Apache.NMS.ActiveMQ.Transport.Discovery;
-using Apache.NMS.ActiveMQ.Transport.Failover;
-using Apache.NMS.ActiveMQ.Transport.Mock;
-using Apache.NMS.ActiveMQ.Transport.Tcp;
namespace Apache.NMS.ActiveMQ.Transport
{
@@ -69,12 +65,6 @@
return tf.CompositeConnect(location);
}
- public static ITransport AsyncCompositeConnect(Uri location, SetTransport setTransport)
- {
- ITransportFactory tf = TransportFactory.CreateTransportFactory(location);
- return tf.CompositeConnect(location, setTransport);
- }
-
/// <summary>
/// Create a transport factory for the scheme. If we do not support the transport protocol,
/// an NMSConnectionException will be thrown.
diff --git a/src/main/csharp/Util/ISuspendable.cs b/src/main/csharp/Util/ISuspendable.cs
new file mode 100644
index 0000000..84f60e6
--- /dev/null
+++ b/src/main/csharp/Util/ISuspendable.cs
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.ActiveMQ
+{
+ /// <summary>
+ /// Optional interface for service type objects which support a
+ /// logical suspend and resume mode. Services that can be suspended
+ /// when not needed can reduce resource load.
+ /// </summary>
+ public interface ISuspendable
+ {
+ void Suspend();
+
+ void Resume();
+ }
+}
+