blob: a6e95611c94327325ee6bda42feaed95b984809b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using Apache.NMS.Util;
using Apache.NMS.ActiveMQ.Threads;
using Apache.NMS.ActiveMQ.Transport.Tcp;
using Apache.NMS.ActiveMQ.Commands;
namespace Apache.NMS.ActiveMQ.Transport.Discovery.Multicast
{
internal delegate void NewBrokerServiceFound(string brokerName, string serviceName);
internal delegate void BrokerServiceRemoved(string brokerName, string serviceName);
internal class MulticastDiscoveryAgent : IDiscoveryAgent, IDisposable
{
public const string DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
public const string DEFAULT_HOST_STR = "default";
public const string DEFAULT_HOST_IP = "239.255.2.3";
public const int DEFAULT_PORT = 6155;
public const int DEFAULT_INITIAL_RECONNECT_DELAY = 1000 * 5;
public const int DEFAULT_BACKOFF_MULTIPLIER = 2;
public const int DEFAULT_MAX_RECONNECT_DELAY = 1000 * 30;
private const string TYPE_SUFFIX = "ActiveMQ-4.";
private const string ALIVE = "alive";
private const string DEAD = "dead";
private const char DELIMITER = '%';
private const int BUFF_SIZE = 8192;
private const int HEARTBEAT_MISS_BEFORE_DEATH = 10;
private const int DEFAULT_IDLE_TIME = 500;
private const string DEFAULT_GROUP = "default";
private const int WORKER_KILL_TIME_SECONDS = 1000;
private const int MAX_SOCKET_CONNECTION_RETRY_ATTEMPS = 3;
private const int SOCKET_CONNECTION_BACKOFF_TIME = 500;
private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
private long maxReconnectDelay = DEFAULT_MAX_RECONNECT_DELAY;
private long backOffMultiplier = DEFAULT_BACKOFF_MULTIPLIER;
private bool useExponentialBackOff;
private int maxReconnectAttempts;
private int timeToLive = 1;
private string group = DEFAULT_GROUP;
private bool loopBackMode;
private Dictionary<String, RemoteBrokerData> brokersByService =
new Dictionary<String, RemoteBrokerData>();
private readonly object servicesLock = new object();
private String selfService;
private long keepAliveInterval = DEFAULT_IDLE_TIME;
private string mcInterface;
private string mcNetworkInterface;
private string mcJoinNetworkInterface;
private DateTime lastAdvertizeTime;
private bool reportAdvertizeFailed = true;
private readonly Atomic<bool> started = new Atomic<bool>(false);
private Uri discoveryUri;
private Socket multicastSocket;
private IPEndPoint endPoint;
private Thread worker;
private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
private ServiceAddHandler serviceAddHandler;
private ServiceRemoveHandler serviceRemoveHandler;
#region Property Setters and Getters
public bool LoopBackMode
{
get { return this.loopBackMode; }
set { this.loopBackMode = value; }
}
public int TimeToLive
{
get { return this.timeToLive; }
set { this.timeToLive = value; }
}
public long KeepAliveInterval
{
get { return this.keepAliveInterval; }
set { this.keepAliveInterval = value; }
}
public string Interface
{
get { return this.mcInterface; }
set { this.mcInterface = value; }
}
public string NetworkInterface
{
get { return this.mcNetworkInterface; }
set { this.mcNetworkInterface = value; }
}
public string JoinNetworkInterface
{
get { return this.mcJoinNetworkInterface; }
set { this.mcJoinNetworkInterface = value; }
}
public string Type
{
get { return this.group + "." + TYPE_SUFFIX; }
}
public long BackOffMultiplier
{
get { return this.backOffMultiplier; }
set { this.backOffMultiplier = value; }
}
public long InitialReconnectDelay
{
get { return this.initialReconnectDelay; }
set { this.initialReconnectDelay = value; }
}
public int MaxReconnectAttempts
{
get { return this.maxReconnectAttempts; }
set { this.maxReconnectAttempts = value; }
}
public long MaxReconnectDelay
{
get { return this.maxReconnectDelay; }
set { this.maxReconnectDelay = value; }
}
public bool UseExponentialBackOff
{
get { return this.useExponentialBackOff; }
set { this.useExponentialBackOff = value; }
}
public string Group
{
get { return this.group; }
set { this.group = value; }
}
public ServiceAddHandler ServiceAdd
{
get { return serviceAddHandler; }
set { this.serviceAddHandler = value; }
}
public ServiceRemoveHandler ServiceRemove
{
get { return serviceRemoveHandler; }
set { this.serviceRemoveHandler = value; }
}
public Uri DiscoveryURI
{
get { return discoveryUri; }
set { discoveryUri = value; }
}
public bool IsStarted
{
get { return started.Value; }
}
#endregion
public override String ToString()
{
return "MulticastDiscoveryAgent-" + (selfService != null ? "advertise:" + selfService : "");
}
public void RegisterService(String name)
{
this.selfService = name;
if (started.Value)
{
DoAdvertizeSelf();
}
}
public void ServiceFailed(DiscoveryEvent failedEvent)
{
RemoteBrokerData data = brokersByService[failedEvent.ServiceName];
if (data != null && data.MarkFailed())
{
FireServiceRemoveEvent(data);
}
}
public void Start()
{
if (started.CompareAndSet(false, true)) {
if (String.IsNullOrEmpty(group))
{
throw new IOException("You must specify a group to discover");
}
String type = Type;
if (!type.EndsWith("."))
{
Tracer.Warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
type += ".";
}
if (discoveryUri == null)
{
discoveryUri = new Uri(DEFAULT_DISCOVERY_URI_STRING);
}
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("start - discoveryURI = " + discoveryUri);
}
String targetHost = discoveryUri.Host;
int targetPort = discoveryUri.Port;
if (DEFAULT_HOST_STR.Equals(targetHost))
{
targetHost = DEFAULT_HOST_IP;
}
if (targetPort < 0)
{
targetPort = DEFAULT_PORT;
}
if (Tracer.IsDebugEnabled)
{
Tracer.DebugFormat("start - myHost = {0}", targetHost);
Tracer.DebugFormat("start - myPort = {0}", targetPort);
Tracer.DebugFormat("start - group = {0}", group);
Tracer.DebugFormat("start - interface = {0}", mcInterface);
Tracer.DebugFormat("start - network interface = {0}", mcNetworkInterface);
Tracer.DebugFormat("start - join network interface = {0}", mcJoinNetworkInterface);
}
int numFailedAttempts = 0;
int backoffTime = SOCKET_CONNECTION_BACKOFF_TIME;
Tracer.Info("Connecting to multicast discovery socket.");
while (!TryToConnectSocket(targetHost, targetPort))
{
numFailedAttempts++;
if (numFailedAttempts > MAX_SOCKET_CONNECTION_RETRY_ATTEMPS)
{
throw new ApplicationException(
"Could not open the socket in order to discover advertising brokers.");
}
Thread.Sleep(backoffTime);
backoffTime = (int)(backoffTime * BackOffMultiplier);
}
if(worker == null)
{
Tracer.Info("Starting multicast discovery agent worker thread");
worker = new Thread(new ThreadStart(DiscoveryAgentRun));
worker.IsBackground = true;
worker.Start();
}
DoAdvertizeSelf();
}
}
public void Stop()
{
// Changing the isStarted flag will signal the thread that it needs to shut down.
if (started.CompareAndSet(true, false))
{
Tracer.Info("Stopping multicast discovery agent worker thread");
if (multicastSocket != null)
{
multicastSocket.Close();
}
if(worker != null)
{
// wait for the worker to stop.
if(!worker.Join(WORKER_KILL_TIME_SECONDS))
{
Tracer.Info("!! Timeout waiting for multicast discovery agent localThread to stop");
worker.Abort();
}
worker = null;
Tracer.Debug("Multicast discovery agent worker thread stopped");
}
executor.Shutdown();
if (!executor.AwaitTermination(TimeSpan.FromMinutes(1)))
{
Tracer.DebugFormat("Failed to properly shutdown agent executor {0}", this);
}
}
}
private bool TryToConnectSocket(string targetHost, int targetPort)
{
bool hasSucceeded = false;
try
{
multicastSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
endPoint = new IPEndPoint(IPAddress.Any, targetPort);
// We have to allow reuse in the multicast socket. Otherwise, we would be unable to
// use multiple clients on the same machine.
multicastSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 1);
multicastSocket.Bind(endPoint);
IPAddress ipaddress;
if(!TcpTransportFactory.TryParseIPAddress(targetHost, out ipaddress))
{
ipaddress = TcpTransportFactory.GetIPAddress(targetHost, AddressFamily.InterNetwork);
if(null == ipaddress)
{
throw new NMSConnectionException("Invalid host address.");
}
}
if (LoopBackMode)
{
multicastSocket.MulticastLoopback = true;
}
if (TimeToLive != 0)
{
multicastSocket.SetSocketOption(SocketOptionLevel.IP,
SocketOptionName.MulticastTimeToLive, timeToLive);
}
if (!String.IsNullOrEmpty(mcJoinNetworkInterface))
{
// TODO figure out how to set this.
throw new NotSupportedException("McJoinNetworkInterface not yet implemented.");
}
else
{
multicastSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership,
new MulticastOption(ipaddress, IPAddress.Any));
}
if (!String.IsNullOrEmpty(mcNetworkInterface))
{
// TODO figure out how to set this.
throw new NotSupportedException("McNetworkInterface not yet implemented.");
}
multicastSocket.ReceiveTimeout = (int)keepAliveInterval;
hasSucceeded = true;
}
catch(SocketException)
{
}
return hasSucceeded;
}
private void DiscoveryAgentRun()
{
Thread.CurrentThread.Name = "Discovery Agent Thread.";
byte[] buffer = new byte[BUFF_SIZE];
string receivedInfoRaw;
string receivedInfo;
while (started.Value)
{
DoTimeKeepingServices();
try
{
int numBytes = multicastSocket.Receive(buffer);
receivedInfoRaw = System.Text.Encoding.UTF8.GetString(buffer, 0, numBytes);
// We have to remove all of the null bytes if there are any otherwise we just
// take the whole string as is.
if (receivedInfoRaw.IndexOf("\0") != -1)
{
receivedInfo = receivedInfoRaw.Substring(0, receivedInfoRaw.IndexOf("\0"));
}
else
{
receivedInfo = receivedInfoRaw;
}
ProcessServiceAdvertisement(receivedInfo);
}
catch(SocketException)
{
// There was no multicast message sent before the timeout expired...Let us try again.
}
// We need to clear the buffer.
buffer[0] = 0x0;
}
}
private void ProcessServiceAdvertisement(string message)
{
string payload;
string brokerName;
string serviceName;
if (message.StartsWith(Type))
{
payload = message.Substring(Type.Length);
brokerName = GetBrokerName(payload);
serviceName = GetServiceName(payload);
if (payload.StartsWith(ALIVE))
{
ProcessAlive(brokerName, serviceName);
}
else if (payload.StartsWith(DEAD))
{
ProcessDead(serviceName);
}
else
{
// Malformed Payload
}
}
}
private void DoTimeKeepingServices()
{
if (started.Value)
{
DateTime currentTime = DateTime.Now;
if (currentTime < lastAdvertizeTime ||
((currentTime - TimeSpan.FromMilliseconds(keepAliveInterval)) > lastAdvertizeTime))
{
DoAdvertizeSelf();
lastAdvertizeTime = currentTime;
}
DoExpireOldServices();
}
}
private void DoAdvertizeSelf()
{
if (!String.IsNullOrEmpty(selfService))
{
String payload = Type;
payload += started.Value ? ALIVE : DEAD;
payload += DELIMITER + "localhost" + DELIMITER;
payload += selfService;
try
{
byte[] data = System.Text.Encoding.UTF8.GetBytes(payload);
multicastSocket.Send(data);
}
catch (Exception e)
{
// If a send fails, chances are all subsequent sends will fail
// too.. No need to keep reporting the
// same error over and over.
if (reportAdvertizeFailed)
{
reportAdvertizeFailed = false;
Tracer.ErrorFormat("Failed to advertise our service: {0} cause: {1}", payload, e.Message);
}
}
}
}
private void ProcessAlive(string brokerName, string service)
{
if (selfService == null || !service.Equals(selfService))
{
RemoteBrokerData remoteBroker = null;
lock (servicesLock)
{
brokersByService.TryGetValue(service, out remoteBroker);
}
if (remoteBroker == null)
{
remoteBroker = new RemoteBrokerData(this, brokerName, service);
brokersByService.Add(service, remoteBroker);
FireServiceAddEvent(remoteBroker);
DoAdvertizeSelf();
}
else
{
remoteBroker.UpdateHeartBeat();
if (remoteBroker.IsTimeForRecovery())
{
FireServiceAddEvent(remoteBroker);
}
}
}
}
private void ProcessDead(string service)
{
if (!service.Equals(selfService))
{
RemoteBrokerData remoteBroker = null;
lock (servicesLock)
{
brokersByService.TryGetValue(service, out remoteBroker);
if (remoteBroker != null)
{
brokersByService.Remove(service);
}
}
if (remoteBroker != null && !remoteBroker.Failed)
{
FireServiceRemoveEvent(remoteBroker);
}
}
}
private void DoExpireOldServices()
{
DateTime expireTime = DateTime.Now - TimeSpan.FromMilliseconds(keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH);
RemoteBrokerData[] services = null;
lock (servicesLock)
{
services = new RemoteBrokerData[this.brokersByService.Count];
this.brokersByService.Values.CopyTo(services, 0);
}
foreach(RemoteBrokerData service in services)
{
if (service.LastHeartBeat < expireTime)
{
ProcessDead(service.ServiceName);
}
}
}
private static string GetBrokerName(string payload)
{
string[] results = payload.Split(DELIMITER);
if (results.Length >= 2)
{
return results[1];
}
return null;
}
private static string GetServiceName(string payload)
{
string[] results = payload.Split(DELIMITER);
if (results.Length >= 3)
{
return results[2];
}
return null;
}
public void Dispose()
{
if(started.Value)
{
Stop();
}
}
private void FireServiceRemoveEvent(RemoteBrokerData data)
{
if (serviceRemoveHandler != null && started.Value)
{
// Have the listener process the event async so that
// he does not block this thread since we are doing time sensitive
// processing of events.
executor.QueueUserWorkItem(ServiceRemoveCallback, data);
}
}
private void ServiceRemoveCallback(object data)
{
RemoteBrokerData serviceData = data as RemoteBrokerData;
this.serviceRemoveHandler(serviceData);
}
private void FireServiceAddEvent(RemoteBrokerData data)
{
if (serviceAddHandler != null && started.Value)
{
// Have the listener process the event async so that
// he does not block this thread since we are doing time sensitive
// processing of events.
executor.QueueUserWorkItem(ServiceAddCallback, data);
}
}
private void ServiceAddCallback(object data)
{
RemoteBrokerData serviceData = data as RemoteBrokerData;
this.serviceAddHandler(serviceData);
}
internal class RemoteBrokerData : DiscoveryEvent
{
internal DateTime recoveryTime = DateTime.MinValue;
internal int failureCount;
internal bool failed;
internal DateTime lastHeartBeat;
private readonly object syncRoot = new object();
private readonly MulticastDiscoveryAgent parent;
internal RemoteBrokerData(MulticastDiscoveryAgent parent, string brokerName, string serviceName) : base()
{
this.parent = parent;
this.BrokerName = brokerName;
this.ServiceName = serviceName;
this.lastHeartBeat = DateTime.Now;
}
internal bool Failed
{
get { return this.failed; }
}
internal DateTime LastHeartBeat
{
get
{
lock(syncRoot)
{
return this.lastHeartBeat;
}
}
}
internal void UpdateHeartBeat()
{
lock (syncRoot)
{
this.lastHeartBeat = DateTime.Now;
// Consider that the broker recovery has succeeded if it has not
// failed in 60 seconds.
if (!failed && failureCount > 0 &&
(lastHeartBeat - recoveryTime) > TimeSpan.FromMilliseconds(1000 * 60)) {
Tracer.DebugFormat("I now think that the {0} service has recovered.", ServiceName);
failureCount = 0;
recoveryTime = DateTime.MinValue;
}
}
}
internal bool MarkFailed()
{
lock (syncRoot)
{
if (!failed)
{
failed = true;
failureCount++;
long reconnectDelay;
if (!parent.UseExponentialBackOff)
{
reconnectDelay = parent.InitialReconnectDelay;
}
else
{
reconnectDelay = (long)Math.Pow(parent.BackOffMultiplier, failureCount);
reconnectDelay = Math.Min(reconnectDelay, parent.MaxReconnectDelay);
}
Tracer.DebugFormat("Remote failure of {0} while still receiving multicast advertisements. " +
"Advertising events will be suppressed for {1} ms, the current " +
"failure count is: {2}", ServiceName, reconnectDelay, failureCount);
recoveryTime = DateTime.Now + TimeSpan.FromMilliseconds(reconnectDelay);
return true;
}
}
return false;
}
/// <summary>
/// Returns true if this Broker has been marked as failed and it is now time to
/// start a recovery attempt.
/// </summary>
public bool IsTimeForRecovery()
{
lock (syncRoot)
{
if (!failed)
{
return false;
}
int maxReconnectAttempts = parent.MaxReconnectAttempts;
// Are we done trying to recover this guy?
if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts)
{
Tracer.DebugFormat("Max reconnect attempts of the {0} service has been reached.", ServiceName);
return false;
}
// Is it not yet time?
if (DateTime.Now < recoveryTime)
{
return false;
}
Tracer.DebugFormat("Resuming event advertisement of the {0} service.", ServiceName);
failed = false;
return true;
}
}
}
}
}