| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.activemq.transport.discovery.multicast; |
| |
| import java.io.IOException; |
| import java.net.DatagramPacket; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.MulticastSocket; |
| import java.net.NetworkInterface; |
| import java.net.SocketAddress; |
| import java.net.SocketTimeoutException; |
| import java.net.URI; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.activemq.command.DiscoveryEvent; |
| import org.apache.activemq.transport.discovery.DiscoveryAgent; |
| import org.apache.activemq.transport.discovery.DiscoveryListener; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A {@link DiscoveryAgent} using a multicast address and heartbeat packets |
| * encoded using any wireformat, but openwire by default. |
| * |
| * |
| */ |
| public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { |
| |
| public static final String DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155"; |
| public static final String DEFAULT_HOST_STR = "default"; |
| public static final String DEFAULT_HOST_IP = System.getProperty("activemq.partition.discovery", "239.255.2.3"); |
| public static final int DEFAULT_PORT = 6155; |
| |
| private static final Logger LOG = LoggerFactory.getLogger(MulticastDiscoveryAgent.class); |
| private static final String TYPE_SUFFIX = "ActiveMQ-4."; |
| private static final String ALIVE = "alive."; |
| private static final String DEAD = "dead."; |
| private static final String DELIMITER = "%"; |
| private static final int BUFF_SIZE = 8192; |
| private static final int DEFAULT_IDLE_TIME = 500; |
| private static final int HEARTBEAT_MISS_BEFORE_DEATH = 10; |
| |
| private long initialReconnectDelay = 1000 * 5; |
| private long maxReconnectDelay = 1000 * 30; |
| private long backOffMultiplier = 2; |
| private boolean useExponentialBackOff; |
| private int maxReconnectAttempts; |
| |
| private int timeToLive = 1; |
| private boolean loopBackMode; |
| private Map<String, RemoteBrokerData> brokersByService = new ConcurrentHashMap<String, RemoteBrokerData>(); |
| private String group = "default"; |
| private URI discoveryURI; |
| private InetAddress inetAddress; |
| private SocketAddress sockAddress; |
| private DiscoveryListener discoveryListener; |
| private String selfService; |
| private MulticastSocket mcast; |
| private Thread runner; |
| private long keepAliveInterval = DEFAULT_IDLE_TIME; |
| private String mcInterface; |
| private String mcNetworkInterface; |
| private String mcJoinNetworkInterface; |
| private long lastAdvertizeTime; |
| private AtomicBoolean started = new AtomicBoolean(false); |
| private boolean reportAdvertizeFailed = true; |
| private ExecutorService executor = null; |
| |
| class RemoteBrokerData { |
| final String brokerName; |
| final String service; |
| long lastHeartBeat; |
| long recoveryTime; |
| int failureCount; |
| boolean failed; |
| |
| public RemoteBrokerData(String brokerName, String service) { |
| this.brokerName = brokerName; |
| this.service = service; |
| this.lastHeartBeat = System.currentTimeMillis(); |
| } |
| |
| public synchronized void updateHeartBeat() { |
| lastHeartBeat = System.currentTimeMillis(); |
| |
| // Consider that the broker recovery has succeeded if it has not |
| // failed in 60 seconds. |
| if (!failed && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("I now think that the " + service + " service has recovered."); |
| } |
| failureCount = 0; |
| recoveryTime = 0; |
| } |
| } |
| |
| public synchronized long getLastHeartBeat() { |
| return lastHeartBeat; |
| } |
| |
| public synchronized boolean markFailed() { |
| if (!failed) { |
| failed = true; |
| failureCount++; |
| |
| long reconnectDelay; |
| if (!useExponentialBackOff) { |
| reconnectDelay = initialReconnectDelay; |
| } else { |
| reconnectDelay = (long)Math.pow(backOffMultiplier, failureCount); |
| if (reconnectDelay > maxReconnectDelay) { |
| reconnectDelay = maxReconnectDelay; |
| } |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Remote failure of " + service + " while still receiving multicast advertisements. Advertising events will be suppressed for " + reconnectDelay |
| + " ms, the current failure count is: " + failureCount); |
| } |
| |
| recoveryTime = System.currentTimeMillis() + reconnectDelay; |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * @return true if this broker is marked failed and it is now the right |
| * time to start recovery. |
| */ |
| public synchronized boolean doRecovery() { |
| if (!failed) { |
| return false; |
| } |
| |
| // Are we done trying to recover this guy? |
| if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Max reconnect attempts of the " + service + " service has been reached."); |
| } |
| return false; |
| } |
| |
| // Is it not yet time? |
| if (System.currentTimeMillis() < recoveryTime) { |
| return false; |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Resuming event advertisement of the " + service + " service."); |
| } |
| failed = false; |
| return true; |
| } |
| |
| public boolean isFailed() { |
| return failed; |
| } |
| } |
| |
| /** |
| * Set the discovery listener |
| * |
| * @param listener |
| */ |
| public void setDiscoveryListener(DiscoveryListener listener) { |
| this.discoveryListener = listener; |
| } |
| |
| /** |
| * register a service |
| */ |
| public void registerService(String name) throws IOException { |
| this.selfService = name; |
| if (started.get()) { |
| doAdvertizeSelf(); |
| } |
| } |
| |
| /** |
| * @return Returns the loopBackMode. |
| */ |
| public boolean isLoopBackMode() { |
| return loopBackMode; |
| } |
| |
| /** |
| * @param loopBackMode The loopBackMode to set. |
| */ |
| public void setLoopBackMode(boolean loopBackMode) { |
| this.loopBackMode = loopBackMode; |
| } |
| |
| /** |
| * @return Returns the timeToLive. |
| */ |
| public int getTimeToLive() { |
| return timeToLive; |
| } |
| |
| /** |
| * @param timeToLive The timeToLive to set. |
| */ |
| public void setTimeToLive(int timeToLive) { |
| this.timeToLive = timeToLive; |
| } |
| |
| /** |
| * @return the discoveryURI |
| */ |
| public URI getDiscoveryURI() { |
| return discoveryURI; |
| } |
| |
| /** |
| * Set the discoveryURI |
| * |
| * @param discoveryURI |
| */ |
| public void setDiscoveryURI(URI discoveryURI) { |
| this.discoveryURI = discoveryURI; |
| } |
| |
| public long getKeepAliveInterval() { |
| return keepAliveInterval; |
| } |
| |
| public void setKeepAliveInterval(long keepAliveInterval) { |
| this.keepAliveInterval = keepAliveInterval; |
| } |
| |
| public void setInterface(String mcInterface) { |
| this.mcInterface = mcInterface; |
| } |
| |
| public void setNetworkInterface(String mcNetworkInterface) { |
| this.mcNetworkInterface = mcNetworkInterface; |
| } |
| |
| public void setJoinNetworkInterface(String mcJoinNetwrokInterface) { |
| this.mcJoinNetworkInterface = mcJoinNetwrokInterface; |
| } |
| |
| /** |
| * start the discovery agent |
| * |
| * @throws Exception |
| */ |
| public void start() throws Exception { |
| |
| if (started.compareAndSet(false, true)) { |
| |
| if (group == null || group.length() == 0) { |
| throw new IOException("You must specify a group to discover"); |
| } |
| String type = getType(); |
| if (!type.endsWith(".")) { |
| LOG.warn("The type '" + type + "' should end with '.' to be a valid Discovery type"); |
| type += "."; |
| } |
| |
| if (discoveryURI == null) { |
| discoveryURI = new URI(DEFAULT_DISCOVERY_URI_STRING); |
| } |
| |
| if (LOG.isTraceEnabled()) |
| LOG.trace("start - discoveryURI = " + discoveryURI); |
| |
| String myHost = discoveryURI.getHost(); |
| int myPort = discoveryURI.getPort(); |
| |
| if( DEFAULT_HOST_STR.equals(myHost) ) |
| myHost = DEFAULT_HOST_IP; |
| |
| if(myPort < 0 ) |
| myPort = DEFAULT_PORT; |
| |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("start - myHost = " + myHost); |
| LOG.trace("start - myPort = " + myPort); |
| LOG.trace("start - group = " + group ); |
| LOG.trace("start - interface = " + mcInterface ); |
| LOG.trace("start - network interface = " + mcNetworkInterface ); |
| LOG.trace("start - join network interface = " + mcJoinNetworkInterface ); |
| } |
| |
| this.inetAddress = InetAddress.getByName(myHost); |
| this.sockAddress = new InetSocketAddress(this.inetAddress, myPort); |
| mcast = new MulticastSocket(myPort); |
| mcast.setLoopbackMode(loopBackMode); |
| mcast.setTimeToLive(getTimeToLive()); |
| if (mcJoinNetworkInterface != null) { |
| mcast.joinGroup(sockAddress, NetworkInterface.getByName(mcJoinNetworkInterface)); |
| } |
| else { |
| mcast.joinGroup(inetAddress); |
| } |
| mcast.setSoTimeout((int)keepAliveInterval); |
| if (mcInterface != null) { |
| mcast.setInterface(InetAddress.getByName(mcInterface)); |
| } |
| if (mcNetworkInterface != null) { |
| mcast.setNetworkInterface(NetworkInterface.getByName(mcNetworkInterface)); |
| } |
| runner = new Thread(this); |
| runner.setName(this.toString() + ":" + runner.getName()); |
| runner.setDaemon(true); |
| runner.start(); |
| doAdvertizeSelf(); |
| } |
| } |
| |
| /** |
| * stop the channel |
| * |
| * @throws Exception |
| */ |
| public void stop() throws Exception { |
| if (started.compareAndSet(true, false)) { |
| doAdvertizeSelf(); |
| if (mcast != null) { |
| mcast.close(); |
| } |
| if (runner != null) { |
| runner.interrupt(); |
| } |
| getExecutor().shutdownNow(); |
| } |
| } |
| |
| public String getType() { |
| return group + "." + TYPE_SUFFIX; |
| } |
| |
| public void run() { |
| byte[] buf = new byte[BUFF_SIZE]; |
| DatagramPacket packet = new DatagramPacket(buf, 0, buf.length); |
| while (started.get()) { |
| doTimeKeepingServices(); |
| try { |
| mcast.receive(packet); |
| if (packet.getLength() > 0) { |
| String str = new String(packet.getData(), packet.getOffset(), packet.getLength()); |
| processData(str); |
| } |
| } catch (SocketTimeoutException se) { |
| // ignore |
| } catch (IOException e) { |
| if (started.get()) { |
| LOG.error("failed to process packet: " + e); |
| } |
| } |
| } |
| } |
| |
| private void processData(String str) { |
| if (discoveryListener != null) { |
| if (str.startsWith(getType())) { |
| String payload = str.substring(getType().length()); |
| if (payload.startsWith(ALIVE)) { |
| String brokerName = getBrokerName(payload.substring(ALIVE.length())); |
| String service = payload.substring(ALIVE.length() + brokerName.length() + 2); |
| processAlive(brokerName, service); |
| } else { |
| String brokerName = getBrokerName(payload.substring(DEAD.length())); |
| String service = payload.substring(DEAD.length() + brokerName.length() + 2); |
| processDead(service); |
| } |
| } |
| } |
| } |
| |
| private void doTimeKeepingServices() { |
| if (started.get()) { |
| long currentTime = System.currentTimeMillis(); |
| if (currentTime < lastAdvertizeTime || ((currentTime - keepAliveInterval) > lastAdvertizeTime)) { |
| doAdvertizeSelf(); |
| lastAdvertizeTime = currentTime; |
| } |
| doExpireOldServices(); |
| } |
| } |
| |
| private void doAdvertizeSelf() { |
| if (selfService != null) { |
| String payload = getType(); |
| payload += started.get() ? ALIVE : DEAD; |
| payload += DELIMITER + "localhost" + DELIMITER; |
| payload += selfService; |
| try { |
| byte[] data = payload.getBytes(); |
| DatagramPacket packet = new DatagramPacket(data, 0, data.length, sockAddress); |
| mcast.send(packet); |
| } catch (IOException e) { |
| // If a send fails, chances are all subsequent sends will fail |
| // too.. No need to keep reporting the |
| // same error over and over. |
| if (reportAdvertizeFailed) { |
| reportAdvertizeFailed = false; |
| LOG.error("Failed to advertise our service: " + payload, e); |
| if ("Operation not permitted".equals(e.getMessage())) { |
| LOG.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup. " |
| + "Please make sure that the OS is properly configured to allow multicast traffic over: " + mcast.getLocalAddress()); |
| } |
| } |
| } |
| } |
| } |
| |
| private void processAlive(String brokerName, String service) { |
| if (selfService == null || !service.equals(selfService)) { |
| RemoteBrokerData data = brokersByService.get(service); |
| if (data == null) { |
| data = new RemoteBrokerData(brokerName, service); |
| brokersByService.put(service, data); |
| fireServiceAddEvent(data); |
| doAdvertizeSelf(); |
| } else { |
| data.updateHeartBeat(); |
| if (data.doRecovery()) { |
| fireServiceAddEvent(data); |
| } |
| } |
| } |
| } |
| |
| private void processDead(String service) { |
| if (!service.equals(selfService)) { |
| RemoteBrokerData data = brokersByService.remove(service); |
| if (data != null && !data.isFailed()) { |
| fireServiceRemovedEvent(data); |
| } |
| } |
| } |
| |
| private void doExpireOldServices() { |
| long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH); |
| for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) { |
| RemoteBrokerData data = i.next(); |
| if (data.getLastHeartBeat() < expireTime) { |
| processDead(data.service); |
| } |
| } |
| } |
| |
| private String getBrokerName(String str) { |
| String result = null; |
| int start = str.indexOf(DELIMITER); |
| if (start >= 0) { |
| int end = str.indexOf(DELIMITER, start + 1); |
| result = str.substring(start + 1, end); |
| } |
| return result; |
| } |
| |
| public void serviceFailed(DiscoveryEvent event) throws IOException { |
| RemoteBrokerData data = brokersByService.get(event.getServiceName()); |
| if (data != null && data.markFailed()) { |
| fireServiceRemovedEvent(data); |
| } |
| } |
| |
| private void fireServiceRemovedEvent(RemoteBrokerData data) { |
| if (discoveryListener != null && started.get()) { |
| final DiscoveryEvent event = new DiscoveryEvent(data.service); |
| event.setBrokerName(data.brokerName); |
| |
| // Have the listener process the event async so that |
| // he does not block this thread since we are doing time sensitive |
| // processing of events. |
| getExecutor().execute(new Runnable() { |
| public void run() { |
| DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener; |
| if (discoveryListener != null) { |
| discoveryListener.onServiceRemove(event); |
| } |
| } |
| }); |
| } |
| } |
| |
| private void fireServiceAddEvent(RemoteBrokerData data) { |
| if (discoveryListener != null && started.get()) { |
| final DiscoveryEvent event = new DiscoveryEvent(data.service); |
| event.setBrokerName(data.brokerName); |
| |
| // Have the listener process the event async so that |
| // he does not block this thread since we are doing time sensitive |
| // processing of events. |
| getExecutor().execute(new Runnable() { |
| public void run() { |
| DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener; |
| if (discoveryListener != null) { |
| discoveryListener.onServiceAdd(event); |
| } |
| } |
| }); |
| } |
| } |
| |
| private ExecutorService getExecutor() { |
| if (executor == null) { |
| final String threadName = "Notifier-" + this.toString(); |
| executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { |
| public Thread newThread(Runnable runable) { |
| Thread t = new Thread(runable, threadName); |
| t.setDaemon(true); |
| return t; |
| } |
| }); |
| } |
| return executor; |
| } |
| |
| public long getBackOffMultiplier() { |
| return backOffMultiplier; |
| } |
| |
| public void setBackOffMultiplier(long backOffMultiplier) { |
| this.backOffMultiplier = backOffMultiplier; |
| } |
| |
| public long getInitialReconnectDelay() { |
| return initialReconnectDelay; |
| } |
| |
| public void setInitialReconnectDelay(long initialReconnectDelay) { |
| this.initialReconnectDelay = initialReconnectDelay; |
| } |
| |
| public int getMaxReconnectAttempts() { |
| return maxReconnectAttempts; |
| } |
| |
| public void setMaxReconnectAttempts(int maxReconnectAttempts) { |
| this.maxReconnectAttempts = maxReconnectAttempts; |
| } |
| |
| public long getMaxReconnectDelay() { |
| return maxReconnectDelay; |
| } |
| |
| public void setMaxReconnectDelay(long maxReconnectDelay) { |
| this.maxReconnectDelay = maxReconnectDelay; |
| } |
| |
| public boolean isUseExponentialBackOff() { |
| return useExponentialBackOff; |
| } |
| |
| public void setUseExponentialBackOff(boolean useExponentialBackOff) { |
| this.useExponentialBackOff = useExponentialBackOff; |
| } |
| |
| public void setGroup(String group) { |
| this.group = group; |
| } |
| |
| @Override |
| public String toString() { |
| return "MulticastDiscoveryAgent-" |
| + (selfService != null ? "advertise:" + selfService : "listener:" + this.discoveryListener); |
| } |
| } |