| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.catalina.tribes.membership; |
| |
| import java.io.Serial; |
| import java.io.Serializable; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.Socket; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Properties; |
| |
| import org.apache.catalina.tribes.Channel; |
| import org.apache.catalina.tribes.ChannelException; |
| import org.apache.catalina.tribes.ChannelException.FaultyMember; |
| import org.apache.catalina.tribes.ChannelListener; |
| import org.apache.catalina.tribes.Heartbeat; |
| import org.apache.catalina.tribes.Member; |
| import org.apache.catalina.tribes.group.Response; |
| import org.apache.catalina.tribes.group.RpcCallback; |
| import org.apache.catalina.tribes.group.RpcChannel; |
| import org.apache.catalina.tribes.util.Arrays; |
| import org.apache.catalina.tribes.util.ExceptionUtils; |
| import org.apache.catalina.tribes.util.StringManager; |
| import org.apache.juli.logging.Log; |
| import org.apache.juli.logging.LogFactory; |
| |
| public class StaticMembershipProvider extends MembershipProviderBase |
| implements RpcCallback, ChannelListener, Heartbeat { |
| |
| protected static final StringManager sm = StringManager.getManager(StaticMembershipProvider.class); |
| private static final Log log = LogFactory.getLog(StaticMembershipProvider.class); |
| |
| protected Channel channel; |
| protected RpcChannel rpcChannel; |
| private String membershipName = null; |
| private byte[] membershipId = null; |
| protected ArrayList<StaticMember> staticMembers; |
| protected int sendOptions = Channel.SEND_OPTIONS_ASYNCHRONOUS; |
| protected long expirationTime = 5000; |
| protected int connectTimeout = 500; |
| protected long rpcTimeout = 3000; |
| protected int startLevel = 0; |
| // for ping thread |
| protected boolean useThread = false; |
| protected long pingInterval = 1000; |
| protected volatile boolean running = true; |
| protected PingThread thread = null; |
| |
| @Override |
| public void init(Properties properties) throws Exception { |
| String expirationTimeStr = properties.getProperty("expirationTime"); |
| this.expirationTime = Long.parseLong(expirationTimeStr); |
| String connectTimeoutStr = properties.getProperty("connectTimeout"); |
| this.connectTimeout = Integer.parseInt(connectTimeoutStr); |
| String rpcTimeouStr = properties.getProperty("rpcTimeout"); |
| this.rpcTimeout = Long.parseLong(rpcTimeouStr); |
| this.membershipName = properties.getProperty("membershipName"); |
| this.membershipId = membershipName.getBytes(StandardCharsets.ISO_8859_1); |
| membership = new Membership(service.getLocalMember(true)); |
| this.rpcChannel = new RpcChannel(this.membershipId, channel, this); |
| this.channel.addChannelListener(this); |
| String useThreadStr = properties.getProperty("useThread"); |
| this.useThread = Boolean.parseBoolean(useThreadStr); |
| String pingIntervalStr = properties.getProperty("pingInterval"); |
| this.pingInterval = Long.parseLong(pingIntervalStr); |
| } |
| |
| @Override |
| public void start(int level) throws Exception { |
| if (Channel.MBR_RX_SEQ == (level & Channel.MBR_RX_SEQ)) { |
| // no-op |
| } |
| if (Channel.MBR_TX_SEQ == (level & Channel.MBR_TX_SEQ)) { |
| // no-op |
| } |
| startLevel = (startLevel | level); |
| if (startLevel == (Channel.MBR_RX_SEQ | Channel.MBR_TX_SEQ)) { |
| startMembership(getAliveMembers(staticMembers.toArray(new Member[0]))); |
| running = true; |
| if (thread == null && useThread) { |
| thread = new PingThread(); |
| thread.setDaemon(true); |
| thread.setName("StaticMembership.PingThread[" + this.channel.getName() + "]"); |
| thread.start(); |
| } |
| } |
| } |
| |
| @Override |
| public boolean stop(int level) throws Exception { |
| if (Channel.MBR_RX_SEQ == (level & Channel.MBR_RX_SEQ)) { |
| // no-op |
| } |
| if (Channel.MBR_TX_SEQ == (level & Channel.MBR_TX_SEQ)) { |
| // no-op |
| } |
| startLevel = (startLevel & (~level)); |
| if (startLevel == 0) { |
| running = false; |
| if (thread != null) { |
| thread.interrupt(); |
| thread = null; |
| } |
| if (this.rpcChannel != null) { |
| this.rpcChannel.breakdown(); |
| } |
| if (this.channel != null) { |
| try { |
| stopMembership(this.getMembers()); |
| } catch (Throwable t) { |
| ExceptionUtils.handleThrowable(t); |
| // Otherwise ignore |
| } |
| this.channel.removeChannelListener(this); |
| this.channel = null; |
| } |
| this.rpcChannel = null; |
| this.membership.reset(); |
| } |
| return (startLevel == 0); |
| } |
| |
| protected void startMembership(Member[] members) throws ChannelException { |
| if (members.length == 0) { |
| return; |
| } |
| MemberMessage msg = new MemberMessage(membershipId, MemberMessage.MSG_START, service.getLocalMember(true)); |
| Response[] resp = rpcChannel.send(members, msg, RpcChannel.ALL_REPLY, sendOptions, rpcTimeout); |
| if (resp.length > 0) { |
| for (Response response : resp) { |
| messageReceived(response.getMessage(), response.getSource()); |
| } |
| } else { |
| log.warn(sm.getString("staticMembershipProvider.startMembership.noReplies")); |
| } |
| } |
| |
| protected Member setupMember(Member mbr) { |
| // no-op |
| return mbr; |
| } |
| |
| protected void memberAdded(Member member) { |
| Member mbr = setupMember(member); |
| if (membership.memberAlive(mbr)) { |
| Runnable r = () -> { |
| Thread currentThread = Thread.currentThread(); |
| String name = currentThread.getName(); |
| try { |
| currentThread.setName("StaticMembership-memberAdded"); |
| membershipListener.memberAdded(mbr); |
| } finally { |
| currentThread.setName(name); |
| } |
| }; |
| executor.execute(r); |
| } |
| } |
| |
| protected void memberDisappeared(Member member) { |
| membership.removeMember(member); |
| Runnable r = () -> { |
| Thread currentThread = Thread.currentThread(); |
| String name = currentThread.getName(); |
| try { |
| currentThread.setName("StaticMembership-memberDisappeared"); |
| membershipListener.memberDisappeared(member); |
| } finally { |
| currentThread.setName(name); |
| } |
| }; |
| executor.execute(r); |
| } |
| |
| protected void memberAlive(Member member) { |
| if (!membership.contains(member)) { |
| memberAdded(member); |
| } |
| membership.memberAlive(member); |
| } |
| |
| protected void stopMembership(Member[] members) { |
| if (members.length == 0) { |
| return; |
| } |
| Member localmember = service.getLocalMember(false); |
| localmember.setCommand(Member.SHUTDOWN_PAYLOAD); |
| MemberMessage msg = new MemberMessage(membershipId, MemberMessage.MSG_STOP, localmember); |
| try { |
| channel.send(members, msg, sendOptions); |
| } catch (ChannelException e) { |
| log.error(sm.getString("staticMembershipProvider.stopMembership.sendFailed"), e); |
| } |
| } |
| |
| @Override |
| public void messageReceived(Serializable msg, Member sender) { |
| MemberMessage memMsg = (MemberMessage) msg; |
| Member member = memMsg.getMember(); |
| if (memMsg.getMsgtype() == MemberMessage.MSG_START) { |
| memberAdded(member); |
| } else if (memMsg.getMsgtype() == MemberMessage.MSG_STOP) { |
| memberDisappeared(member); |
| } else if (memMsg.getMsgtype() == MemberMessage.MSG_PING) { |
| memberAlive(member); |
| } |
| } |
| |
| @Override |
| public boolean accept(Serializable msg, Member sender) { |
| boolean result = false; |
| if (msg instanceof MemberMessage) { |
| result = Arrays.equals(this.membershipId, ((MemberMessage) msg).getMembershipId()); |
| } |
| return result; |
| } |
| |
| @Override |
| public Serializable replyRequest(Serializable msg, final Member sender) { |
| if (!(msg instanceof MemberMessage memMsg)) { |
| return null; |
| } |
| if (memMsg.getMsgtype() == MemberMessage.MSG_START) { |
| messageReceived(memMsg, sender); |
| memMsg.setMember(service.getLocalMember(true)); |
| return memMsg; |
| } else if (memMsg.getMsgtype() == MemberMessage.MSG_PING) { |
| messageReceived(memMsg, sender); |
| memMsg.setMember(service.getLocalMember(true)); |
| return memMsg; |
| } else { |
| // other messages are ignored. |
| if (log.isInfoEnabled()) { |
| log.info(sm.getString("staticMembershipProvider.replyRequest.ignored", memMsg.getTypeDesc())); |
| } |
| return null; |
| } |
| } |
| |
| @Override |
| public void leftOver(Serializable msg, Member sender) { |
| if (!(msg instanceof MemberMessage memMsg)) { |
| return; |
| } |
| if (memMsg.getMsgtype() == MemberMessage.MSG_START) { |
| messageReceived(memMsg, sender); |
| } else if (memMsg.getMsgtype() == MemberMessage.MSG_PING) { |
| messageReceived(memMsg, sender); |
| } else { |
| // other messages are ignored. |
| if (log.isInfoEnabled()) { |
| log.info(sm.getString("staticMembershipProvider.leftOver.ignored", memMsg.getTypeDesc())); |
| } |
| } |
| } |
| |
| @Override |
| public void heartbeat() { |
| try { |
| if (!useThread) { |
| ping(); |
| } |
| } catch (ChannelException e) { |
| log.warn(sm.getString("staticMembershipProvider.heartbeat.failed"), e); |
| } |
| } |
| |
| protected void ping() throws ChannelException { |
| // send ping |
| Member[] members = getAliveMembers(staticMembers.toArray(new Member[0])); |
| if (members.length > 0) { |
| try { |
| MemberMessage msg = |
| new MemberMessage(membershipId, MemberMessage.MSG_PING, service.getLocalMember(true)); |
| Response[] resp = rpcChannel.send(members, msg, RpcChannel.ALL_REPLY, sendOptions, rpcTimeout); |
| for (Response response : resp) { |
| messageReceived(response.getMessage(), response.getSource()); |
| } |
| } catch (ChannelException ce) { |
| // Handle known failed members |
| FaultyMember[] faultyMembers = ce.getFaultyMembers(); |
| for (FaultyMember faultyMember : faultyMembers) { |
| memberDisappeared(faultyMember.getMember()); |
| } |
| throw ce; |
| } |
| } |
| // expire |
| checkExpired(); |
| } |
| |
| protected void checkExpired() { |
| Member[] expired = membership.expire(expirationTime); |
| for (Member member : expired) { |
| membershipListener.memberDisappeared(member); |
| } |
| } |
| |
| public void setChannel(Channel channel) { |
| this.channel = channel; |
| } |
| |
| public void setStaticMembers(ArrayList<StaticMember> staticMembers) { |
| this.staticMembers = staticMembers; |
| } |
| |
| private Member[] getAliveMembers(Member[] members) { |
| List<Member> aliveMembers = new ArrayList<>(); |
| for (Member member : members) { |
| try (Socket socket = new Socket()) { |
| InetAddress ia = InetAddress.getByAddress(member.getHost()); |
| InetSocketAddress addr = new InetSocketAddress(ia, member.getPort()); |
| socket.connect(addr, connectTimeout); |
| aliveMembers.add(member); |
| } catch (Throwable t) { |
| ExceptionUtils.handleThrowable(t); |
| // Otherwise ignore |
| } |
| } |
| return aliveMembers.toArray(new Member[0]); |
| } |
| |
| // ------------------------------------------------------------------------------ |
| // member message to send to and from other memberships |
| // ------------------------------------------------------------------------------ |
| public static class MemberMessage implements Serializable { |
| @Serial |
| private static final long serialVersionUID = 1L; |
| public static final int MSG_START = 1; |
| public static final int MSG_STOP = 2; |
| public static final int MSG_PING = 3; |
| private final int msgtype; |
| private final byte[] membershipId; |
| private Member member; |
| |
| public MemberMessage(byte[] membershipId, int msgtype, Member member) { |
| this.membershipId = membershipId; |
| this.msgtype = msgtype; |
| this.member = member; |
| } |
| |
| public int getMsgtype() { |
| return msgtype; |
| } |
| |
| public byte[] getMembershipId() { |
| return membershipId; |
| } |
| |
| public Member getMember() { |
| return member; |
| } |
| |
| public void setMember(Member local) { |
| this.member = local; |
| } |
| |
| @Override |
| public String toString() { |
| return "MemberMessage[" + "name=" + new String(membershipId) + "; type=" + getTypeDesc() + "; member=" + |
| member + ']'; |
| } |
| |
| protected String getTypeDesc() { |
| return switch (msgtype) { |
| case MSG_START -> "MSG_START"; |
| case MSG_STOP -> "MSG_STOP"; |
| case MSG_PING -> "MSG_PING"; |
| default -> "UNKNOWN"; |
| }; |
| } |
| } |
| |
| protected class PingThread extends Thread { |
| @Override |
| public void run() { |
| while (running) { |
| try { |
| sleep(pingInterval); |
| ping(); |
| } catch (InterruptedException ix) { |
| // Ignore |
| } catch (Exception e) { |
| log.warn(sm.getString("staticMembershipProvider.pingThread.failed"), e); |
| } |
| } |
| } |
| } |
| } |