| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.catalina.cluster.mcast; |
| |
| |
| import java.net.MulticastSocket; |
| import java.io.IOException; |
| import java.net.InetAddress ; |
| import java.net.DatagramPacket; |
| import java.net.SocketTimeoutException; |
| |
| import org.apache.catalina.cluster.MembershipListener; |
| |
| /** |
| * A <b>membership</b> implementation using simple multicast. |
| * This is the representation of a multicast membership service. |
| * This class is responsible for maintaining a list of active cluster nodes in the cluster. |
| * If a node fails to send out a heartbeat, the node will be dismissed. |
| * This is the low level implementation that handles the multicasting sockets. |
| * Need to fix this, could use java.nio and only need one thread to send and receive, or |
| * just use a timeout on the receive |
| * @author Filip Hanik |
| * @author Peter Rossbach |
| * @version $Revision$, $Date$ |
| */ |
| public class McastServiceImpl |
| { |
| private static org.apache.commons.logging.Log log = |
| org.apache.commons.logging.LogFactory.getLog( McastService.class ); |
| /** |
| * Internal flag used for the listen thread that listens to the multicasting socket. |
| */ |
| protected boolean doRun = false; |
| /** |
| * Socket that we intend to listen to |
| */ |
| protected MulticastSocket socket; |
| /** |
| * The local member that we intend to broad cast over and over again |
| */ |
| protected McastMember member; |
| /** |
| * The multicast address |
| */ |
| protected InetAddress address; |
| /** |
| * The multicast port |
| */ |
| protected int port; |
| /** |
| * The time it takes for a member to expire. |
| */ |
| protected long timeToExpiration; |
| /** |
| * How often to we send out a broadcast saying we are alive, must be smaller than timeToExpiration |
| */ |
| protected long sendFrequency; |
| /** |
| * Reuse the sendPacket, no need to create a new one everytime |
| */ |
| protected DatagramPacket sendPacket; |
| /** |
| * Reuse the receivePacket, no need to create a new one everytime |
| */ |
| protected DatagramPacket receivePacket; |
| /** |
| * The membership, used so that we calculate memberships when they arrive or don't arrive |
| */ |
| protected McastMembership membership; |
| /** |
| * The actual listener, for callback when shits goes down |
| */ |
| protected MembershipListener service; |
| /** |
| * Thread to listen for pings |
| */ |
| protected ReceiverThread receiver; |
| /** |
| * Thread to send pings |
| */ |
| protected SenderThread sender; |
| |
| /** |
| * When was the service started |
| */ |
| protected long serviceStartTime = System.currentTimeMillis(); |
| |
| protected int mcastTTL = -1; |
| protected int mcastSoTimeout = -1; |
| protected InetAddress mcastBindAddress = null; |
| |
| /** |
| * nr of times the system has to fail before a recovery is initiated |
| */ |
| protected int recoveryCounter = 10; |
| |
| /** |
| * The time the recovery thread sleeps between recovery attempts |
| */ |
| protected long recoverySleepTime = 5000; |
| |
| /** |
| * Add the ability to turn on/off recovery |
| */ |
| protected boolean recoveryEnabled = true; |
| |
| /** |
| * Create a new mcast service impl |
| * @param member - the local member |
| * @param sendFrequency - the time (ms) in between pings sent out |
| * @param expireTime - the time (ms) for a member to expire |
| * @param port - the mcast port |
| * @param bind - the bind address (not sure this is used yet) |
| * @param mcastAddress - the mcast address |
| * @param service - the callback service |
| * @throws IOException |
| */ |
| public McastServiceImpl( |
| McastMember member, |
| long sendFrequency, |
| long expireTime, |
| int port, |
| InetAddress bind, |
| InetAddress mcastAddress, |
| int ttl, |
| int soTimeout, |
| MembershipListener service) |
| throws IOException { |
| this.member = member; |
| address = mcastAddress; |
| this.port = port; |
| this.mcastSoTimeout = soTimeout; |
| this.mcastTTL = ttl; |
| this.mcastBindAddress = bind; |
| timeToExpiration = expireTime; |
| this.service = service; |
| this.sendFrequency = sendFrequency; |
| init(); |
| } |
| |
| protected void init() throws IOException { |
| setupSocket(); |
| sendPacket = new DatagramPacket(new byte[1000],1000); |
| sendPacket.setAddress(address); |
| sendPacket.setPort(port); |
| receivePacket = new DatagramPacket(new byte[1000],1000); |
| receivePacket.setAddress(address); |
| receivePacket.setPort(port); |
| if(membership == null) membership = new McastMembership(member.getName()); |
| } |
| |
| protected void setupSocket() throws IOException { |
| if (mcastBindAddress != null) socket = new MulticastSocket(new java.net. |
| InetSocketAddress(mcastBindAddress, port)); |
| else socket = new MulticastSocket(port); |
| socket.setLoopbackMode(false); //hint that we don't need loop back messages |
| if (mcastBindAddress != null) { |
| if(log.isInfoEnabled()) |
| log.info("Setting multihome multicast interface to:" + |
| mcastBindAddress); |
| socket.setInterface(mcastBindAddress); |
| } //end if |
| //force a so timeout so that we don't block forever |
| if ( mcastSoTimeout <= 0 ) mcastSoTimeout = (int)sendFrequency; |
| if(log.isInfoEnabled()) |
| log.info("Setting cluster mcast soTimeout to "+mcastSoTimeout); |
| socket.setSoTimeout(mcastSoTimeout); |
| if ( mcastTTL >= 0 ) { |
| if(log.isInfoEnabled()) |
| log.info("Setting cluster mcast TTL to " + mcastTTL); |
| socket.setTimeToLive(mcastTTL); |
| } |
| } |
| |
| /** |
| * Start the service |
| * @param level 1 starts the receiver, level 2 starts the sender |
| * @throws IOException if the service fails to start |
| * @throws IllegalStateException if the service is already started |
| */ |
| public synchronized void start(int level) throws IOException { |
| if ( sender != null && receiver != null ) throw new IllegalStateException("Service already running."); |
| if ( level == 1 ) { |
| socket.joinGroup(address); |
| doRun = true; |
| receiver = new ReceiverThread(); |
| receiver.setDaemon(true); |
| receiver.start(); |
| } |
| if ( level==2 ) { |
| serviceStartTime = System.currentTimeMillis(); |
| sender = new SenderThread(sendFrequency); |
| sender.setDaemon(true); |
| sender.start(); |
| |
| } |
| } |
| |
| /** |
| * Stops the service |
| * @throws IOException if the service fails to disconnect from the sockets |
| */ |
| public synchronized void stop() throws IOException { |
| try { |
| socket.leaveGroup(address); |
| } catch (IOException ignore) { |
| } finally { |
| doRun = false; |
| if(sender!= null) sender.interrupt() ; |
| sender = null; |
| if(receiver!= null) receiver.interrupt() ; |
| receiver = null; |
| serviceStartTime = Long.MAX_VALUE; |
| } |
| } |
| |
| /** |
| * Receive a datagram packet, locking wait |
| * @throws IOException |
| */ |
| public void receive() throws IOException { |
| try { |
| socket.receive(receivePacket); |
| |
| byte[] data = new byte[receivePacket.getLength()]; |
| System.arraycopy(receivePacket.getData(),receivePacket.getOffset(),data,0,data.length); |
| McastMember m = McastMember.getMember(data); |
| if(log.isDebugEnabled()) |
| log.debug("Mcast receive ping from member " + m); |
| if ( membership.memberAlive(m) ) { |
| if(log.isDebugEnabled()) |
| log.debug("Mcast add member " + m); |
| service.memberAdded(m); |
| } |
| } finally { |
| checkExpire(); |
| } |
| } |
| |
| protected Object expiredMutex = new Object(); |
| |
| /** |
| * check member expire or alive |
| */ |
| protected void checkExpire() { |
| synchronized (expiredMutex) { |
| McastMember[] expired = membership.expire(timeToExpiration); |
| for ( int i=0; i<expired.length; i++) { |
| if(log.isDebugEnabled()) |
| log.debug("Mcast expire member " + expired[i]); |
| service.memberDisappeared(expired[i]); |
| } |
| } |
| } |
| |
| /** |
| * Send a ping |
| * @throws Exception |
| */ |
| public void send() throws Exception{ |
| try { |
| member.inc(); |
| |
| if(log.isDebugEnabled()) |
| log.debug("Mcast send ping from member " + member); |
| byte[] data = member.getData(this.serviceStartTime); |
| DatagramPacket p = new DatagramPacket(data,data.length); |
| p.setAddress(address); |
| p.setPort(port); |
| socket.send(p); |
| } finally { |
| checkExpire() ; |
| } |
| } |
| |
| public long getServiceStartTime() { |
| return this.serviceStartTime; |
| } |
| |
| public int getRecoveryCounter() { |
| return recoveryCounter; |
| } |
| |
| public boolean isRecoveryEnabled() { |
| return recoveryEnabled; |
| } |
| |
| public long getRecoverySleepTime() { |
| return recoverySleepTime; |
| } |
| |
| public void setRecoveryCounter(int recoveryCounter) { |
| this.recoveryCounter = recoveryCounter; |
| } |
| |
| public void setRecoveryEnabled(boolean recoveryEnabled) { |
| this.recoveryEnabled = recoveryEnabled; |
| } |
| |
| public void setRecoverySleepTime(long recoverySleepTime) { |
| this.recoverySleepTime = recoverySleepTime; |
| } |
| |
| public class ReceiverThread extends Thread { |
| |
| public ReceiverThread() { |
| super(); |
| setName("Cluster-MembershipReceiver"); |
| } |
| |
| public void run() { |
| int errorCounter = 0 ; |
| while ( doRun ) { |
| try { |
| receive(); |
| errorCounter = 0; |
| } catch ( Exception x ) { |
| if (errorCounter==0) { |
| if(! (x instanceof SocketTimeoutException)) |
| log.warn("Error receiving mcast package (errorCounter=" +errorCounter+ "). Sleeping " +sendFrequency + " ms",x); |
| } else { |
| if(! (x instanceof SocketTimeoutException) |
| && log.isDebugEnabled()) |
| log.debug("Error receiving mcast package (errorCounter=" +errorCounter+ "). Sleeping " +sendFrequency+ " ms",x); |
| } |
| try { Thread.sleep(sendFrequency); } catch ( Exception ignore ){} |
| if ( (++errorCounter)>=recoveryCounter ) { |
| log.warn("Error receiving mcast package (errorCounter=" +errorCounter+ "). Try Recovery!",x); |
| errorCounter=0; |
| new RecoveryThread(McastServiceImpl.this); |
| } |
| } |
| } |
| log.warn("Receiver Thread ends with errorCounter=" +errorCounter+ "."); |
| |
| } |
| } |
| |
| public class SenderThread extends Thread { |
| |
| long time; |
| |
| McastServiceImpl service ; |
| |
| public SenderThread(long time) { |
| this.time = time; |
| setName("Cluster-MembershipSender"); |
| |
| } |
| |
| public void run() { |
| int errorCounter = 0 ; |
| while ( doRun ) { |
| try { |
| send(); |
| errorCounter = 0; |
| } catch ( Exception x ) { |
| if (errorCounter==0) { |
| log.warn("Unable to send mcast message.",x); |
| } |
| else { |
| if(log.isDebugEnabled()) |
| log.debug("Unable to send mcast message.",x); |
| } |
| if ( (++errorCounter)>=recoveryCounter ) { |
| errorCounter=0; |
| new RecoveryThread(McastServiceImpl.this); |
| } |
| } |
| try { Thread.sleep(time); } catch ( Exception ignore ) {} |
| } |
| log.warn("Sender Thread ends with errorCounter=" +errorCounter+ "."); |
| } |
| } |
| |
| protected static class RecoveryThread extends Thread { |
| |
| static boolean running = false; |
| |
| McastServiceImpl parent = null; |
| |
| public RecoveryThread(McastServiceImpl parent) { |
| this.parent = parent; |
| if (!init(this)) this.parent = null; |
| } |
| |
| public static synchronized boolean init(RecoveryThread t) { |
| if ( running ) { |
| return false; |
| } |
| if ( !t.parent.isRecoveryEnabled()) { |
| return false; |
| } |
| running = true; |
| t.setName("Cluster-MembershipRecovery"); |
| t.setDaemon(true); |
| t.start(); |
| return true; |
| } |
| |
| public boolean stopService() { |
| try { |
| parent.stop(); |
| return true; |
| } catch (Exception x) { |
| log.warn("Recovery thread failed to stop membership service.", x); |
| return false; |
| } |
| } |
| |
| public boolean startService() { |
| try { |
| parent.init(); |
| parent.start(1); |
| parent.start(2); |
| return true; |
| } catch (Exception x) { |
| log.warn("Recovery thread failed to start membership service.", x); |
| return false; |
| } |
| } |
| |
| public void run() { |
| boolean success = false; |
| int attempt = 0; |
| try { |
| while (!success) { |
| if(log.isInfoEnabled()) |
| log.info("Cluster membership, running recovery thread, multicasting is not functional."); |
| success = stopService(); |
| if(success) { |
| try { |
| Thread.sleep(1000 + parent.mcastSoTimeout); |
| } catch (Exception ignore){} |
| success = startService(); |
| if(success && log.isInfoEnabled()) |
| log.info("Membership recovery was successful."); |
| } |
| try { |
| if (!success) { |
| if(log.isInfoEnabled()) |
| log.info("Recovery attempt " + (++attempt) + " failed, trying again in " +parent.recoverySleepTime + " milliseconds"); |
| Thread.sleep(parent.recoverySleepTime); |
| // check member expire... |
| parent.checkExpire() ; |
| } |
| }catch (InterruptedException ignore) { |
| } |
| } |
| } finally { |
| running = false; |
| } |
| } |
| } |
| } |