| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.Collections.Generic; |
| using System.Net; |
| using System.Net.Sockets; |
| using System.Threading; |
| using Apache.NMS.Util; |
| using Apache.NMS.ActiveMQ.Threads; |
| using Apache.NMS.ActiveMQ.Transport.Tcp; |
| using Apache.NMS.ActiveMQ.Commands; |
| |
namespace Apache.NMS.ActiveMQ.Transport.Discovery.Multicast
{
    /// <summary>
    /// Discovery agent that listens on a UDP multicast group for broker
    /// advertisements and, when <c>SelfService</c> is set, advertises that
    /// service to the same group.
    /// </summary>
    /// <remarks>
    /// An advertisement is a UTF-8 string of the form
    /// <c>{Type}{alive|dead}%{brokerName}%{serviceName}</c>, where
    /// <see cref="Type"/> is <c>{group}.ActiveMQ-4.</c>.
    /// </remarks>
    internal class MulticastDiscoveryAgent : AbstractDiscoveryAgent
    {
        public const string DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
        public const string DEFAULT_HOST_STR = "default";
        public const string DEFAULT_HOST_IP = "239.255.2.3";
        public const int DEFAULT_PORT = 6155;

        // Wire-format tokens of an advertisement.
        private const string TYPE_SUFFIX = "ActiveMQ-4.";
        private const string ALIVE = "alive";
        private const string DEAD = "dead";
        private const char DELIMITER = '%';

        // An advertisement has exactly three delimiter-separated fields:
        // state, broker name and service name.
        private const int ADVERTISEMENT_FIELD_COUNT = 3;

        private const int BUFF_SIZE = 8192;
        private const int DEFAULT_IDLE_TIME = 500;
        private const string DEFAULT_GROUP = "default";

        private const int MAX_SOCKET_CONNECTION_RETRY_ATTEMPTS = 3;
        private const int SOCKET_CONNECTION_BACKOFF_TIME = 500;

        private int timeToLive = 1;
        private string group = DEFAULT_GROUP;
        private bool loopBackMode;
        private long keepAliveInterval = DEFAULT_IDLE_TIME;
        private string mcInterface;
        private string mcNetworkInterface;
        private string mcJoinNetworkInterface;

        private Socket multicastSocket;

        // Local endpoint the socket is bound to (any address, discovery port).
        private IPEndPoint endPoint;

        // Multicast group address and port that advertisements are sent to.
        private IPEndPoint groupEndPoint;

        #region Property Setters and Getters

        /// <summary>
        /// When true, multicast loopback is explicitly enabled on the socket.
        /// When false, the socket's loopback setting is left untouched.
        /// </summary>
        public bool LoopBackMode
        {
            get { return this.loopBackMode; }
            set { this.loopBackMode = value; }
        }

        /// <summary>
        /// Multicast time-to-live applied to the socket. A value of 0 leaves
        /// the socket's own setting untouched.
        /// </summary>
        public int TimeToLive
        {
            get { return this.timeToLive; }
            set { this.timeToLive = value; }
        }

        /// <summary>
        /// Interface name. In this class the value is only stored and written
        /// to the debug trace on start.
        /// </summary>
        public string Interface
        {
            get { return this.mcInterface; }
            set { this.mcInterface = value; }
        }

        /// <summary>
        /// Network interface name. Not implemented: a non-empty value makes
        /// starting the agent fail with <see cref="NotSupportedException"/>.
        /// </summary>
        public string NetworkInterface
        {
            get { return this.mcNetworkInterface; }
            set { this.mcNetworkInterface = value; }
        }

        /// <summary>
        /// Network interface used to join the group. Not implemented: a
        /// non-empty value makes starting the agent fail with
        /// <see cref="NotSupportedException"/>.
        /// </summary>
        public string JoinNetworkInterface
        {
            get { return this.mcJoinNetworkInterface; }
            set { this.mcJoinNetworkInterface = value; }
        }

        /// <summary>
        /// Prefix carried by every advertisement of this agent's group:
        /// <c>{group}.ActiveMQ-4.</c>. It always ends with a '.'.
        /// </summary>
        public string Type
        {
            get { return this.group + "." + TYPE_SUFFIX; }
        }

        /// <summary>
        /// Keep-alive interval. The value is also used as the receive timeout
        /// of the multicast socket when the agent starts.
        /// </summary>
        public override long KeepAliveInterval
        {
            get { return this.keepAliveInterval; }
            set { this.keepAliveInterval = value; }
        }

        #endregion

        /// <summary>
        /// Returns the agent name, followed by the advertised service when
        /// one is configured.
        /// </summary>
        public override String ToString()
        {
            return "MulticastDiscoveryAgent-" + (SelfService != null ? "advertise:" + SelfService : "");
        }

        /// <summary>
        /// Resolves the discovery host and port (falling back to the defaults)
        /// and opens the multicast socket, retrying with a growing back-off.
        /// </summary>
        /// <exception cref="IOException">No discovery group is configured.</exception>
        /// <exception cref="ApplicationException">
        /// The socket could not be opened after the initial attempt plus
        /// <c>MAX_SOCKET_CONNECTION_RETRY_ATTEMPTS</c> retries.
        /// </exception>
        protected override void DoStartAgent()
        {
            if (String.IsNullOrEmpty(group))
            {
                throw new IOException("You must specify a group to discover");
            }

            if (DiscoveryURI == null)
            {
                DiscoveryURI = new Uri(DEFAULT_DISCOVERY_URI_STRING);
            }

            if (Tracer.IsDebugEnabled)
            {
                Tracer.Debug("start - discoveryURI = " + DiscoveryURI);
            }

            String targetHost = DiscoveryURI.Host;
            int targetPort = DiscoveryURI.Port;

            if (DEFAULT_HOST_STR.Equals(targetHost))
            {
                targetHost = DEFAULT_HOST_IP;
            }

            // A negative port means the URI did not carry one.
            if (targetPort < 0)
            {
                targetPort = DEFAULT_PORT;
            }

            if (Tracer.IsDebugEnabled)
            {
                Tracer.DebugFormat("start - myHost = {0}", targetHost);
                Tracer.DebugFormat("start - myPort = {0}", targetPort);
                Tracer.DebugFormat("start - group = {0}", group);
                Tracer.DebugFormat("start - interface = {0}", mcInterface);
                Tracer.DebugFormat("start - network interface = {0}", mcNetworkInterface);
                Tracer.DebugFormat("start - join network interface = {0}", mcJoinNetworkInterface);
            }

            int numFailedAttempts = 0;
            int backoffTime = SOCKET_CONNECTION_BACKOFF_TIME;

            Tracer.Info("Connecting to multicast discovery socket.");
            while (!TryToConnectSocket(targetHost, targetPort))
            {
                numFailedAttempts++;
                if (numFailedAttempts > MAX_SOCKET_CONNECTION_RETRY_ATTEMPTS)
                {
                    throw new ApplicationException(
                        "Could not open the socket in order to discover advertising brokers.");
                }

                Thread.Sleep(backoffTime);
                backoffTime = (int)(backoffTime * BackOffMultiplier);
            }
        }

        /// <summary>
        /// Closes the multicast socket if one was opened.
        /// </summary>
        protected override void DoStopAgent()
        {
            if (multicastSocket != null)
            {
                multicastSocket.Close();
            }
        }

        /// <summary>
        /// Makes one attempt to create, bind and configure the multicast
        /// socket and to join the discovery group.
        /// </summary>
        /// <param name="targetHost">Multicast group address or host name.</param>
        /// <param name="targetPort">UDP port to bind to and to advertise on.</param>
        /// <returns>
        /// True when the socket is ready; false when a socket error occurred
        /// and the caller may retry.
        /// </returns>
        /// <exception cref="NMSConnectionException">The host cannot be resolved.</exception>
        /// <exception cref="NotSupportedException">
        /// <see cref="JoinNetworkInterface"/> or <see cref="NetworkInterface"/> is set.
        /// </exception>
        private bool TryToConnectSocket(string targetHost, int targetPort)
        {
            Socket socket = null;
            bool hasSucceeded = false;

            try
            {
                socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
                IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Any, targetPort);

                // We have to allow reuse in the multicast socket. Otherwise, we would be unable to
                // use multiple clients on the same machine.
                socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 1);
                socket.Bind(localEndPoint);

                IPAddress ipaddress;

                if (!TcpTransportFactory.TryParseIPAddress(targetHost, out ipaddress))
                {
                    ipaddress = TcpTransportFactory.GetIPAddress(targetHost, AddressFamily.InterNetwork);
                    if (null == ipaddress)
                    {
                        throw new NMSConnectionException("Invalid host address.");
                    }
                }

                if (LoopBackMode)
                {
                    socket.MulticastLoopback = true;
                }

                if (TimeToLive != 0)
                {
                    socket.SetSocketOption(SocketOptionLevel.IP,
                        SocketOptionName.MulticastTimeToLive, timeToLive);
                }

                if (!String.IsNullOrEmpty(mcJoinNetworkInterface))
                {
                    // TODO figure out how to set this.
                    throw new NotSupportedException("McJoinNetworkInterface not yet implemented.");
                }

                socket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership,
                    new MulticastOption(ipaddress, IPAddress.Any));

                if (!String.IsNullOrEmpty(mcNetworkInterface))
                {
                    // TODO figure out how to set this.
                    throw new NotSupportedException("McNetworkInterface not yet implemented.");
                }

                socket.ReceiveTimeout = (int)keepAliveInterval;

                // Publish the socket only once it is fully configured.
                endPoint = localEndPoint;
                groupEndPoint = new IPEndPoint(ipaddress, targetPort);
                multicastSocket = socket;
                hasSucceeded = true;
            }
            catch (SocketException ex)
            {
                // Reported as a failed attempt; the caller decides whether to retry.
                Tracer.Warn("Failed to open the multicast discovery socket: " + ex.Message);
            }
            finally
            {
                // Release the half-configured socket on any failure so that
                // retries and propagated exceptions do not leak it.
                if (!hasSucceeded && socket != null)
                {
                    socket.Close();
                }
            }

            return hasSucceeded;
        }

        /// <summary>
        /// Waits for one datagram on the multicast socket and processes it as
        /// a service advertisement. A socket error, including the receive
        /// timeout, is treated as "nothing received".
        /// </summary>
        protected override void DoDiscovery()
        {
            byte[] buffer = new byte[BUFF_SIZE];

            try
            {
                int numBytes = multicastSocket.Receive(buffer);
                string receivedInfo = System.Text.Encoding.UTF8.GetString(buffer, 0, numBytes);

                // Cut the message at the first NUL, if any. The char overload of
                // IndexOf is an ordinal search; the string overload is culture
                // sensitive and can report index 0 for "\0" on ICU-based runtimes.
                int terminatorIndex = receivedInfo.IndexOf('\0');
                if (terminatorIndex >= 0)
                {
                    receivedInfo = receivedInfo.Substring(0, terminatorIndex);
                }

                ProcessServiceAdvertisement(receivedInfo);
            }
            catch (SocketException)
            {
                // There was no multicast message sent before the timeout expired...Let us try again.
            }
        }

        /// <summary>
        /// Sends an alive or dead advertisement for <c>SelfService</c> to the
        /// multicast group. Does nothing when no service is configured.
        /// </summary>
        protected override void DoAdvertizeSelf()
        {
            if (!String.IsNullOrEmpty(SelfService))
            {
                String payload = Type;
                payload += started.Value ? ALIVE : DEAD;
                payload += DELIMITER + "localhost" + DELIMITER;
                payload += SelfService;

                byte[] data = System.Text.Encoding.UTF8.GetBytes(payload);

                // The UDP socket is bound but never connected, so the
                // destination has to be supplied explicitly.
                multicastSocket.SendTo(data, groupEndPoint);
            }
        }

        /// <summary>
        /// Parses an advertisement and reports the service as alive or dead.
        /// Messages of another type, with an unknown state, or without a
        /// service name are ignored.
        /// </summary>
        /// <param name="message">Decoded advertisement text.</param>
        private void ProcessServiceAdvertisement(string message)
        {
            string type = Type;

            // Protocol tokens are compared ordinally, never by culture rules.
            if (!message.StartsWith(type, StringComparison.Ordinal))
            {
                return;
            }

            string payload = message.Substring(type.Length);
            bool isAlive = payload.StartsWith(ALIVE, StringComparison.Ordinal);

            if (!isAlive && !payload.StartsWith(DEAD, StringComparison.Ordinal))
            {
                // Malformed Payload
                return;
            }

            string serviceName = GetServiceName(payload);
            if (serviceName == null)
            {
                // Malformed Payload: the delimiters are missing.
                if (Tracer.IsDebugEnabled)
                {
                    Tracer.DebugFormat("Ignoring malformed discovery advertisement: {0}", message);
                }
                return;
            }

            if (isAlive)
            {
                ProcessLiveService(GetBrokerName(payload), serviceName);
            }
            else
            {
                ProcessDeadService(serviceName);
            }
        }

        /// <summary>
        /// Returns the second delimiter-separated field of the payload, or
        /// null when the payload has fewer than two fields.
        /// </summary>
        private static string GetBrokerName(string payload)
        {
            string[] results = payload.Split(new char[] { DELIMITER }, ADVERTISEMENT_FIELD_COUNT);
            if (results.Length >= 2)
            {
                return results[1];
            }
            return null;
        }

        /// <summary>
        /// Returns everything after the second delimiter of the payload, or
        /// null when the payload has fewer than three fields. The split is
        /// limited to three fields so that a service name containing the
        /// delimiter is kept whole.
        /// </summary>
        private static string GetServiceName(string payload)
        {
            string[] results = payload.Split(new char[] { DELIMITER }, ADVERTISEMENT_FIELD_COUNT);
            if (results.Length >= 3)
            {
                return results[2];
            }
            return null;
        }
    }
}