blob: ef9c60e306ada83d7a6d46b93ccd0339cb74a4d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using Apache.NMS.ActiveMQ.Transport.Tcp;
namespace Apache.NMS.ActiveMQ.Transport.Discovery.Multicast
{
internal delegate void NewBrokerServiceFound(string brokerName, string serviceName);
internal delegate void BrokerServiceRemoved(string brokerName, string serviceName);
internal class MulticastDiscoveryAgent : IDisposable
{
public const int MAX_SOCKET_CONNECTION_RETRY_ATTEMPS = 3;
public const int DEFAULT_BACKOFF_MILLISECONDS = 100;
public const int BACKOFF_MULTIPLIER = 2;
public const string DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
private const string TYPE_SUFFIX = "ActiveMQ-4.";
private const string ALIVE = "alive";
private const string DEAD = "dead";
private const char DELIMITER = '%';
private const int BUFF_SIZE = 8192;
private const string DEFAULT_GROUP = "default";
private const int EXPIRATION_OFFSET_IN_SECONDS = 2;
private const int WORKER_KILL_TIME_SECONDS = 10;
private const int SOCKET_TIMEOUT_MILLISECONDS = 500;
private string group;
private object stopstartSemaphore = new object();
private bool isStarted = false;
private Uri discoveryUri;
private Socket multicastSocket;
private IPEndPoint endPoint;
private Thread worker;
private event NewBrokerServiceFound onNewServiceFound;
private event BrokerServiceRemoved onServiceRemoved;
/// <summary>
/// Indexed by service name
/// </summary>
private readonly Dictionary<string, RemoteBrokerData> remoteBrokers;
public MulticastDiscoveryAgent()
{
group = DEFAULT_GROUP;
remoteBrokers = new Dictionary<string, RemoteBrokerData>();
}
public void Start()
{
lock(stopstartSemaphore)
{
if (discoveryUri == null)
{
discoveryUri = new Uri(DEFAULT_DISCOVERY_URI_STRING);
}
if(multicastSocket == null)
{
int numFailedAttempts = 0;
int backoffTime = DEFAULT_BACKOFF_MILLISECONDS;
Tracer.Info("Connecting to multicast discovery socket.");
while(!TryToConnectSocket())
{
numFailedAttempts++;
if(numFailedAttempts > MAX_SOCKET_CONNECTION_RETRY_ATTEMPS)
{
throw new ApplicationException(
"Could not open the socket in order to discover advertising brokers.");
}
Thread.Sleep(backoffTime);
backoffTime *= BACKOFF_MULTIPLIER;
}
}
if(worker == null)
{
Tracer.Info("Starting multicast discovery agent worker thread");
worker = new Thread(new ThreadStart(worker_DoWork));
worker.IsBackground = true;
worker.Start();
isStarted = true;
}
}
}
public void Stop()
{
Thread localThread = null;
lock(stopstartSemaphore)
{
Tracer.Info("Stopping multicast discovery agent worker thread");
localThread = worker;
worker = null;
// Changing the isStarted flag will signal the thread that it needs to shut down.
isStarted = false;
}
if(localThread != null)
{
// wait for the worker to stop.
if(!localThread.Join(WORKER_KILL_TIME_SECONDS))
{
Tracer.Info("!! Timeout waiting for multicast discovery agent localThread to stop");
localThread.Abort();
}
}
Tracer.Info("Multicast discovery agent worker thread joined");
}
private bool TryToConnectSocket()
{
bool hasSucceeded = false;
try
{
multicastSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
endPoint = new IPEndPoint(IPAddress.Any, discoveryUri.Port);
//We have to allow reuse in the multicast socket. Otherwise, we would be unable to use multiple clients on the same machine.
multicastSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 1);
multicastSocket.Bind(endPoint);
IPAddress ipaddress;
if(!TcpTransportFactory.TryParseIPAddress(discoveryUri.Host, out ipaddress))
{
ipaddress = TcpTransportFactory.GetIPAddress(discoveryUri.Host, AddressFamily.InterNetwork);
if(null == ipaddress)
{
throw new NMSConnectionException("Invalid host address.");
}
}
multicastSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership,
new MulticastOption(ipaddress, IPAddress.Any));
#if !NETCF
multicastSocket.ReceiveTimeout = SOCKET_TIMEOUT_MILLISECONDS;
#endif
hasSucceeded = true;
}
catch(SocketException)
{
}
return hasSucceeded;
}
private void worker_DoWork()
{
Thread.CurrentThread.Name = "Discovery Agent Thread.";
byte[] buffer = new byte[BUFF_SIZE];
string receivedInfoRaw;
string receivedInfo;
while(isStarted)
{
try
{
int numBytes = multicastSocket.Receive(buffer);
receivedInfoRaw = System.Text.Encoding.UTF8.GetString(buffer, 0, numBytes);
// We have to remove all of the null bytes if there are any otherwise we just
// take the whole string as is.
if (receivedInfoRaw.IndexOf("\0") != -1)
{
receivedInfo = receivedInfoRaw.Substring(0, receivedInfoRaw.IndexOf("\0"));
}
else
{
receivedInfo = receivedInfoRaw;
}
ProcessBrokerMessage(receivedInfo);
}
catch(SocketException)
{
// There was no multicast message sent before the timeout expired...Let us try again.
}
//We need to clear the buffer.
buffer[0] = 0x0;
ExpireOldServices();
}
}
private void ProcessBrokerMessage(string message)
{
string payload;
string brokerName;
string serviceName;
if(message.StartsWith(MulticastType))
{
payload = message.Substring(MulticastType.Length);
brokerName = GetBrokerName(payload);
serviceName = GetServiceName(payload);
if(payload.StartsWith(ALIVE))
{
ProcessAliveBrokerMessage(brokerName, serviceName);
}
else if(payload.StartsWith(DEAD))
{
ProcessDeadBrokerMessage(brokerName, serviceName);
}
else
{
//Malformed Payload
}
}
}
private void ProcessDeadBrokerMessage(string brokerName, string serviceName)
{
if(remoteBrokers.ContainsKey(serviceName))
{
remoteBrokers.Remove(serviceName);
if(onServiceRemoved != null)
{
onServiceRemoved(brokerName, serviceName);
}
}
}
private void ProcessAliveBrokerMessage(string brokerName, string serviceName)
{
if(remoteBrokers.ContainsKey(serviceName))
{
remoteBrokers[serviceName].UpdateHeartBeat();
}
else
{
remoteBrokers.Add(serviceName, new RemoteBrokerData(brokerName, serviceName));
if(onNewServiceFound != null)
{
onNewServiceFound(brokerName, serviceName);
}
}
}
private static string GetBrokerName(string payload)
{
string[] results = payload.Split(DELIMITER);
return results[1];
}
private static string GetServiceName(string payload)
{
string[] results = payload.Split(DELIMITER);
return results[2];
}
private void ExpireOldServices()
{
DateTime expireTime;
List<RemoteBrokerData> deadServices = new List<RemoteBrokerData>();
foreach(KeyValuePair<string, RemoteBrokerData> brokerService in remoteBrokers)
{
expireTime = brokerService.Value.lastHeartBeat.AddSeconds(EXPIRATION_OFFSET_IN_SECONDS);
if(DateTime.Now > expireTime)
{
deadServices.Add(brokerService.Value);
}
}
// Remove all of the dead services
for(int i = 0; i < deadServices.Count; i++)
{
ProcessDeadBrokerMessage(deadServices[i].brokerName, deadServices[i].serviceName);
}
}
#region Properties
/// <summary>
/// This property indicates whether or not async send is enabled.
/// </summary>
public Uri DiscoveryURI
{
get { return discoveryUri; }
set { discoveryUri = value; }
}
public bool IsStarted
{
get { return isStarted; }
}
public string Group
{
get { return group; }
set { group = value; }
}
#endregion
internal string MulticastType
{
get { return group + "." + TYPE_SUFFIX; }
}
internal event NewBrokerServiceFound OnNewServiceFound
{
add { onNewServiceFound += value; }
remove { onNewServiceFound -= value; }
}
internal event BrokerServiceRemoved OnServiceRemoved
{
add { onServiceRemoved += value; }
remove { onServiceRemoved += value; }
}
public void Dispose()
{
if(isStarted)
{
Stop();
}
multicastSocket.Shutdown(SocketShutdown.Both);
multicastSocket = null;
}
internal class RemoteBrokerData
{
internal string brokerName;
internal string serviceName;
internal DateTime lastHeartBeat;
internal RemoteBrokerData(string brokerName, string serviceName)
{
this.brokerName = brokerName;
this.serviceName = serviceName;
this.lastHeartBeat = DateTime.Now;
}
internal void UpdateHeartBeat()
{
this.lastHeartBeat = DateTime.Now;
}
}
}
}