| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.catalina.tribes.membership; |
| |
| |
| import java.io.IOException; |
| import java.net.BindException; |
| import java.net.DatagramPacket; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.MulticastSocket; |
| import java.net.NetworkInterface; |
| import java.net.SocketTimeoutException; |
| import java.net.StandardSocketOptions; |
| import java.util.Arrays; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.catalina.tribes.Channel; |
| import org.apache.catalina.tribes.Member; |
| import org.apache.catalina.tribes.MembershipListener; |
| import org.apache.catalina.tribes.MessageListener; |
| import org.apache.catalina.tribes.io.ChannelData; |
| import org.apache.catalina.tribes.io.XByteBuffer; |
| import org.apache.catalina.tribes.util.StringManager; |
| import org.apache.juli.logging.Log; |
| import org.apache.juli.logging.LogFactory; |
| |
| /** |
| * A <b>membership</b> implementation using simple multicast. This is the representation of a multicast membership |
| * service. This class is responsible for maintaining a list of active cluster nodes in the cluster. If a node fails to |
| * send out a heartbeat, the node will be dismissed. This is the low level implementation that handles the multicasting |
| * sockets. Need to fix this, could use java.nio and only need one thread to send and receive, or just use a timeout on |
| * the receive |
| */ |
| public class McastServiceImpl extends MembershipProviderBase { |
| |
| private static final Log log = LogFactory.getLog(McastService.class); |
| |
| protected static final int MAX_PACKET_SIZE = 65535; |
| |
| protected static final StringManager sm = StringManager.getManager(Constants.Package); |
| /** |
| * Internal flag used for the listen thread that listens to the multicasting socket. |
| */ |
| protected volatile boolean doRunSender = false; |
| protected volatile boolean doRunReceiver = false; |
| protected volatile int startLevel = 0; |
| /** |
| * Socket that we intend to listen to |
| */ |
| protected MulticastSocket socket; |
| /** |
| * The local member that we intend to broadcast over and over again |
| */ |
| protected final MemberImpl member; |
| /** |
| * The multicast address |
| */ |
| protected final InetAddress address; |
| /** |
| * The multicast port |
| */ |
| protected final int port; |
| /** |
| * The time it takes for a member to expire. |
| */ |
| protected final long timeToExpiration; |
| /** |
| * How often to we send out a broadcast saying we are alive, must be smaller than timeToExpiration |
| */ |
| protected final long sendFrequency; |
| /** |
| * Reuse the sendPacket, no need to create a new one every time |
| */ |
| protected DatagramPacket sendPacket; |
| /** |
| * Reuse the receivePacket, no need to create a new one every time |
| */ |
| protected DatagramPacket receivePacket; |
| |
| /** |
| * The actual listener, for callback when stuff goes down |
| */ |
| protected final MembershipListener service; |
| /** |
| * The actual listener for broadcast callbacks |
| */ |
| protected final MessageListener msgservice; |
| /** |
| * Thread to listen for pings |
| */ |
| protected ReceiverThread receiver; |
| /** |
| * Thread to send pings |
| */ |
| protected SenderThread sender; |
| |
| /** |
| * Time to live for the multicast packets that are being sent out |
| */ |
| protected final int mcastTTL; |
| /** |
| * Read timeout on the mcast socket |
| */ |
| protected int mcastSoTimeout; |
| /** |
| * bind address |
| */ |
| protected final InetAddress mcastBindAddress; |
| |
| /** |
| * nr of times the system has to fail before a recovery is initiated |
| */ |
| protected int recoveryCounter = 10; |
| |
| /** |
| * The time the recovery thread sleeps between recovery attempts |
| */ |
| protected long recoverySleepTime = 5000; |
| |
| /** |
| * Add the ability to turn on/off recovery |
| */ |
| protected boolean recoveryEnabled = true; |
| |
| /** |
| * disable/enable local loopback message |
| */ |
| protected final boolean localLoopbackDisabled; |
| |
| private Channel channel; |
| |
| /** |
| * Create a new mcast service instance. |
| * |
| * @param member - the local member |
| * @param sendFrequency - the time (ms) in between pings sent out |
| * @param expireTime - the time (ms) for a member to expire |
| * @param port - the mcast port |
| * @param bind - the bind address (not sure this is used yet) |
| * @param mcastAddress - the mcast address |
| * @param ttl multicast ttl that will be set on the socket |
| * @param soTimeout Socket timeout |
| * @param service - the callback service |
| * @param msgservice Message listener |
| * @param localLoopbackDisabled - disable loopbackMode |
| * |
| * @throws IOException Init error |
| */ |
| public McastServiceImpl(MemberImpl member, long sendFrequency, long expireTime, int port, InetAddress bind, |
| InetAddress mcastAddress, int ttl, int soTimeout, MembershipListener service, MessageListener msgservice, |
| boolean localLoopbackDisabled) throws IOException { |
| this.member = member; |
| this.address = mcastAddress; |
| this.port = port; |
| this.mcastSoTimeout = soTimeout; |
| this.mcastTTL = ttl; |
| this.mcastBindAddress = bind; |
| this.timeToExpiration = expireTime; |
| this.service = service; |
| this.msgservice = msgservice; |
| this.sendFrequency = sendFrequency; |
| this.localLoopbackDisabled = localLoopbackDisabled; |
| init(); |
| } |
| |
| public void init() throws IOException { |
| setupSocket(); |
| sendPacket = new DatagramPacket(new byte[MAX_PACKET_SIZE], MAX_PACKET_SIZE); |
| sendPacket.setAddress(address); |
| sendPacket.setPort(port); |
| receivePacket = new DatagramPacket(new byte[MAX_PACKET_SIZE], MAX_PACKET_SIZE); |
| receivePacket.setAddress(address); |
| receivePacket.setPort(port); |
| member.setCommand(new byte[0]); |
| if (membership == null) { |
| membership = new Membership(member); |
| } |
| } |
| |
| protected void setupSocket() throws IOException { |
| if (mcastBindAddress != null) { |
| try { |
| log.info(sm.getString("mcastServiceImpl.bind", address, Integer.toString(port))); |
| socket = new MulticastSocket(new InetSocketAddress(address, port)); |
| } catch (BindException e) { |
| /* |
| * On some platforms (e.g. Linux) it is not possible to bind to the multicast address. In this case only |
| * bind to the port. |
| */ |
| if (log.isDebugEnabled()) { |
| log.debug(sm.getString("mcastServiceImpl.bind.failed"), e); |
| } else { |
| log.info(sm.getString("mcastServiceImpl.bind.failed")); |
| } |
| socket = new MulticastSocket(port); |
| } |
| } else { |
| socket = new MulticastSocket(port); |
| } |
| // Hint if we want to disable loop back(local machine) messages |
| socket.setOption(StandardSocketOptions.IP_MULTICAST_LOOP, Boolean.valueOf(!localLoopbackDisabled)); |
| if (mcastBindAddress != null) { |
| if (log.isInfoEnabled()) { |
| log.info(sm.getString("mcastServiceImpl.setInterface", mcastBindAddress)); |
| } |
| NetworkInterface networkInterface = NetworkInterface.getByInetAddress(mcastBindAddress); |
| socket.setNetworkInterface(networkInterface); |
| } // end if |
| // force a so timeout so that we don't block forever |
| if (mcastSoTimeout <= 0) { |
| mcastSoTimeout = (int) sendFrequency; |
| } |
| if (log.isInfoEnabled()) { |
| log.info(sm.getString("mcastServiceImpl.setSoTimeout", Integer.toString(mcastSoTimeout))); |
| } |
| socket.setSoTimeout(mcastSoTimeout); |
| |
| if (mcastTTL >= 0) { |
| if (log.isInfoEnabled()) { |
| log.info(sm.getString("mcastServiceImpl.setTTL", Integer.toString(mcastTTL))); |
| } |
| socket.setTimeToLive(mcastTTL); |
| } |
| } |
| |
| |
| @Override |
| public synchronized void start(int level) throws IOException { |
| boolean valid = false; |
| if ((level & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ) { |
| if (receiver != null) { |
| throw new IllegalStateException(sm.getString("mcastServiceImpl.receive.running")); |
| } |
| try { |
| if (sender == null) { |
| socket.joinGroup(new InetSocketAddress(address, 0), null); |
| } |
| } catch (IOException iox) { |
| log.error(sm.getString("mcastServiceImpl.unable.join")); |
| throw iox; |
| } |
| doRunReceiver = true; |
| receiver = new ReceiverThread(); |
| receiver.setDaemon(true); |
| receiver.start(); |
| valid = true; |
| } |
| if ((level & Channel.MBR_TX_SEQ) == Channel.MBR_TX_SEQ) { |
| if (sender != null) { |
| throw new IllegalStateException(sm.getString("mcastServiceImpl.send.running")); |
| } |
| if (receiver == null) { |
| socket.joinGroup(new InetSocketAddress(address, 0), null); |
| } |
| // make sure at least one packet gets out there |
| send(false); |
| doRunSender = true; |
| sender = new SenderThread(sendFrequency); |
| sender.setDaemon(true); |
| sender.start(); |
| // we have started the receiver, but not yet waited for membership to establish |
| valid = true; |
| } |
| if (!valid) { |
| throw new IllegalArgumentException(sm.getString("mcastServiceImpl.invalid.startLevel")); |
| } |
| // pause, once or twice |
| waitForMembers(level); |
| startLevel = (startLevel | level); |
| } |
| |
| private void waitForMembers(int level) { |
| long memberwait = sendFrequency * 2; |
| if (log.isInfoEnabled()) { |
| log.info(sm.getString("mcastServiceImpl.waitForMembers.start", Long.toString(memberwait), |
| Integer.toString(level))); |
| } |
| try { |
| Thread.sleep(memberwait); |
| } catch (InterruptedException ignore) { |
| // Ignore |
| } |
| if (log.isInfoEnabled()) { |
| log.info(sm.getString("mcastServiceImpl.waitForMembers.done", Integer.toString(level))); |
| } |
| } |
| |
| @Override |
| public synchronized boolean stop(int level) throws IOException { |
| boolean valid = false; |
| |
| if ((level & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ) { |
| valid = true; |
| doRunReceiver = false; |
| if (receiver != null) { |
| receiver.interrupt(); |
| } |
| receiver = null; |
| } |
| if ((level & Channel.MBR_TX_SEQ) == Channel.MBR_TX_SEQ) { |
| valid = true; |
| doRunSender = false; |
| if (sender != null) { |
| sender.interrupt(); |
| } |
| sender = null; |
| } |
| |
| if (!valid) { |
| throw new IllegalArgumentException(sm.getString("mcastServiceImpl.invalid.stopLevel")); |
| } |
| startLevel = (startLevel & (~level)); |
| // we're shutting down, send a shutdown message and close the socket |
| if (startLevel == 0) { |
| // send a stop message |
| member.setCommand(Member.SHUTDOWN_PAYLOAD); |
| send(false); |
| // leave mcast group |
| try { |
| socket.leaveGroup(new InetSocketAddress(address, 0), null); |
| } catch (Exception e) { |
| // Shutting down. Only log at debug. |
| if (log.isDebugEnabled()) { |
| log.debug(sm.getString("mcastServiceImpl.error.stop"), e); |
| } |
| } |
| try { |
| socket.close(); |
| } catch (Exception e) { |
| // Shutting down. Only log at debug. |
| if (log.isDebugEnabled()) { |
| log.debug(sm.getString("mcastServiceImpl.error.stop"), e); |
| } |
| } |
| member.setServiceStartTime(-1); |
| } |
| return (startLevel == 0); |
| } |
| |
| /** |
| * Receive a datagram packet, locking wait |
| * |
| * @throws IOException Received failed |
| */ |
| public void receive() throws IOException { |
| try { |
| socket.receive(receivePacket); |
| if (receivePacket.getLength() > MAX_PACKET_SIZE) { |
| log.error(sm.getString("mcastServiceImpl.packet.tooLong", Integer.toString(receivePacket.getLength()))); |
| } else { |
| byte[] data = new byte[receivePacket.getLength()]; |
| System.arraycopy(receivePacket.getData(), receivePacket.getOffset(), data, 0, data.length); |
| if (XByteBuffer.firstIndexOf(data, 0, MemberImpl.TRIBES_MBR_BEGIN) == 0) { |
| memberDataReceived(data); |
| } else { |
| memberBroadcastsReceived(data); |
| } |
| } |
| } catch (SocketTimeoutException ignore) { |
| /* |
| * Do nothing. This is normal. We don't want to block forever since the receive thread is the same thread |
| * that does membership expiration. |
| */ |
| } |
| checkExpired(); |
| } |
| |
| private void memberDataReceived(byte[] data) { |
| final Member m = MemberImpl.getMember(data); |
| if (log.isTraceEnabled()) { |
| log.trace("Mcast receive ping from member " + m); |
| } |
| Runnable t = null; |
| Thread currentThread = Thread.currentThread(); |
| if (Arrays.equals(m.getCommand(), Member.SHUTDOWN_PAYLOAD)) { |
| if (log.isDebugEnabled()) { |
| log.debug(sm.getString("mcastServiceImpl.memberShutdown", m)); |
| } |
| membership.removeMember(m); |
| t = () -> { |
| String name = currentThread.getName(); |
| try { |
| currentThread.setName("Membership-MemberDisappeared"); |
| service.memberDisappeared(m); |
| } finally { |
| currentThread.setName(name); |
| } |
| }; |
| } else if (membership.memberAlive(m)) { |
| if (log.isDebugEnabled()) { |
| log.debug(sm.getString("mcastServiceImpl.memberAdd", m)); |
| } |
| t = () -> { |
| String name = currentThread.getName(); |
| try { |
| currentThread.setName("Membership-MemberAdded"); |
| service.memberAdded(m); |
| } finally { |
| currentThread.setName(name); |
| } |
| }; |
| } |
| if (t != null) { |
| executor.execute(t); |
| } |
| } |
| |
| private void memberBroadcastsReceived(final byte[] b) { |
| if (log.isTraceEnabled()) { |
| log.trace("Mcast received broadcasts."); |
| } |
| XByteBuffer buffer = new XByteBuffer(b, true); |
| if (buffer.countPackages(true) > 0) { |
| int count = buffer.countPackages(); |
| final ChannelData[] data = new ChannelData[count]; |
| for (int i = 0; i < count; i++) { |
| try { |
| data[i] = buffer.extractPackage(true); |
| } catch (IllegalStateException ise) { |
| log.debug(sm.getString("mcastServiceImpl.messageError"), ise); |
| } |
| } |
| Runnable t = () -> { |
| Thread currentThread = Thread.currentThread(); |
| String name = currentThread.getName(); |
| try { |
| currentThread.setName("Membership-MemberAdded"); |
| for (ChannelData datum : data) { |
| try { |
| if (datum != null && !member.equals(datum.getAddress())) { |
| msgservice.messageReceived(datum); |
| } |
| } catch (Throwable t1) { |
| if (t1 instanceof VirtualMachineError) { |
| throw (VirtualMachineError) t1; |
| } |
| log.error(sm.getString("mcastServiceImpl.unableReceive.broadcastMessage"), t1); |
| } |
| } |
| } finally { |
| currentThread.setName(name); |
| } |
| }; |
| executor.execute(t); |
| } |
| } |
| |
| protected final Object expiredMutex = new Object(); |
| |
| protected void checkExpired() { |
| synchronized (expiredMutex) { |
| Member[] expired = membership.expire(timeToExpiration); |
| for (final Member member : expired) { |
| if (log.isDebugEnabled()) { |
| log.debug(sm.getString("mcastServiceImpl.memberExpire", member)); |
| } |
| try { |
| Runnable t = () -> { |
| Thread currentThread = Thread.currentThread(); |
| String name = currentThread.getName(); |
| try { |
| currentThread.setName("Membership-MemberExpired"); |
| service.memberDisappeared(member); |
| } finally { |
| currentThread.setName(name); |
| } |
| }; |
| executor.execute(t); |
| } catch (Exception e) { |
| log.error(sm.getString("mcastServiceImpl.memberDisappeared.failed"), e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Send a ping. |
| * |
| * @param checkexpired <code>true</code> to check for expiration |
| * |
| * @throws IOException Send error |
| */ |
| public void send(boolean checkexpired) throws IOException { |
| send(checkexpired, null); |
| } |
| |
| private final Object sendLock = new Object(); |
| |
| public void send(boolean checkexpired, DatagramPacket packet) throws IOException { |
| checkexpired = (checkexpired && (packet == null)); |
| // ignore if we haven't started the sender |
| // if ( (startLevel&Channel.MBR_TX_SEQ) != Channel.MBR_TX_SEQ ) return; |
| if (packet == null) { |
| member.inc(); |
| if (log.isTraceEnabled()) { |
| log.trace("Mcast send ping from member " + member); |
| } |
| byte[] data = member.getData(); |
| packet = new DatagramPacket(data, data.length); |
| } else if (log.isTraceEnabled()) { |
| log.trace("Sending message broadcast " + packet.getLength() + " bytes from " + member); |
| } |
| packet.setAddress(address); |
| packet.setPort(port); |
| // TODO this operation is not thread safe |
| synchronized (sendLock) { |
| socket.send(packet); |
| } |
| if (checkexpired) { |
| checkExpired(); |
| } |
| } |
| |
| public long getServiceStartTime() { |
| return (member != null) ? member.getServiceStartTime() : -1L; |
| } |
| |
| public int getRecoveryCounter() { |
| return recoveryCounter; |
| } |
| |
| public boolean isRecoveryEnabled() { |
| return recoveryEnabled; |
| } |
| |
| public long getRecoverySleepTime() { |
| return recoverySleepTime; |
| } |
| |
| public Channel getChannel() { |
| return channel; |
| } |
| |
| public void setChannel(Channel channel) { |
| this.channel = channel; |
| } |
| |
| public class ReceiverThread extends Thread { |
| int errorCounter = 0; |
| |
| public ReceiverThread() { |
| super(); |
| String channelName = ""; |
| if (channel.getName() != null) { |
| channelName = "[" + channel.getName() + "]"; |
| } |
| setName("Tribes-MembershipReceiver" + channelName); |
| } |
| |
| @Override |
| public void run() { |
| while (doRunReceiver) { |
| try { |
| receive(); |
| errorCounter = 0; |
| } catch (ArrayIndexOutOfBoundsException ax) { |
| // we can ignore this, as it means we have an invalid package |
| // but we will log it to debug |
| if (log.isDebugEnabled()) { |
| log.debug(sm.getString("mcastServiceImpl.invalidMemberPackage"), ax); |
| } |
| } catch (Exception e) { |
| if (errorCounter == 0 && doRunReceiver) { |
| log.warn(sm.getString("mcastServiceImpl.error.receiving"), e); |
| } else if (log.isDebugEnabled()) { |
| if (doRunReceiver) { |
| log.debug(sm.getString("mcastServiceImpl.error.receiving"), e); |
| } else { |
| log.debug(sm.getString("mcastServiceImpl.error.receivingNoSleep"), e); |
| } |
| } |
| if (doRunReceiver) { |
| try { |
| sleep(500); |
| } catch (Exception ignore) { |
| // Ignore |
| } |
| if ((++errorCounter) >= recoveryCounter) { |
| errorCounter = 0; |
| RecoveryThread.recover(McastServiceImpl.this); |
| } |
| } |
| } |
| } |
| } |
| }// class ReceiverThread |
| |
| public class SenderThread extends Thread { |
| final long time; |
| int errorCounter = 0; |
| |
| public SenderThread(long time) { |
| this.time = time; |
| String channelName = ""; |
| if (channel.getName() != null) { |
| channelName = "[" + channel.getName() + "]"; |
| } |
| setName("Tribes-MembershipSender" + channelName); |
| |
| } |
| |
| @Override |
| public void run() { |
| while (doRunSender) { |
| try { |
| send(true); |
| errorCounter = 0; |
| } catch (Exception e) { |
| if (errorCounter == 0) { |
| log.warn(sm.getString("mcastServiceImpl.send.failed"), e); |
| } else { |
| log.debug(sm.getString("mcastServiceImpl.send.failed"), e); |
| } |
| if ((++errorCounter) >= recoveryCounter) { |
| errorCounter = 0; |
| RecoveryThread.recover(McastServiceImpl.this); |
| } |
| } |
| try { |
| sleep(time); |
| } catch (Exception ignore) { |
| // Ignore |
| } |
| } |
| } |
| }// class SenderThread |
| |
| protected static class RecoveryThread extends Thread { |
| |
| private static final AtomicBoolean running = new AtomicBoolean(false); |
| |
| public static synchronized void recover(McastServiceImpl parent) { |
| |
| if (!parent.isRecoveryEnabled()) { |
| return; |
| } |
| |
| if (!running.compareAndSet(false, true)) { |
| return; |
| } |
| |
| Thread t = new RecoveryThread(parent); |
| String channelName = ""; |
| if (parent.channel.getName() != null) { |
| channelName = "[" + parent.channel.getName() + "]"; |
| } |
| t.setName("Tribes-MembershipRecovery" + channelName); |
| t.setDaemon(true); |
| t.start(); |
| } |
| |
| |
| final McastServiceImpl parent; |
| |
| public RecoveryThread(McastServiceImpl parent) { |
| this.parent = parent; |
| } |
| |
| public boolean stopService() { |
| try { |
| parent.stop(Channel.MBR_RX_SEQ | Channel.MBR_TX_SEQ); |
| return true; |
| } catch (Exception e) { |
| log.warn(sm.getString("mcastServiceImpl.recovery.stopFailed"), e); |
| return false; |
| } |
| } |
| |
| public boolean startService() { |
| try { |
| parent.init(); |
| parent.start(Channel.MBR_RX_SEQ | Channel.MBR_TX_SEQ); |
| return true; |
| } catch (Exception e) { |
| log.warn(sm.getString("mcastServiceImpl.recovery.startFailed"), e); |
| return false; |
| } |
| } |
| |
| @Override |
| public void run() { |
| boolean success = false; |
| int attempt = 0; |
| try { |
| while (!success) { |
| if (log.isInfoEnabled()) { |
| log.info(sm.getString("mcastServiceImpl.recovery")); |
| } |
| if (stopService() & startService()) { |
| success = true; |
| if (log.isInfoEnabled()) { |
| log.info(sm.getString("mcastServiceImpl.recovery.successful")); |
| } |
| } |
| try { |
| if (!success) { |
| if (log.isInfoEnabled()) { |
| log.info(sm.getString("mcastServiceImpl.recovery.failed", Integer.toString(++attempt), |
| Long.toString(parent.recoverySleepTime))); |
| } |
| sleep(parent.recoverySleepTime); |
| } |
| } catch (InterruptedException ignore) { |
| } |
| } |
| } finally { |
| running.set(false); |
| } |
| } |
| } |
| |
| public void setRecoveryCounter(int recoveryCounter) { |
| this.recoveryCounter = recoveryCounter; |
| } |
| |
| public void setRecoveryEnabled(boolean recoveryEnabled) { |
| this.recoveryEnabled = recoveryEnabled; |
| } |
| |
| public void setRecoverySleepTime(long recoverySleepTime) { |
| this.recoverySleepTime = recoverySleepTime; |
| } |
| } |