blob: 2e22894ea68ce002843eff9057b783b28ec79038 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal.membership.gms.fd;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.logging.log4j.Logger;
import org.jgroups.util.UUID;
import org.apache.geode.CancelException;
import org.apache.geode.GemFireConfigException;
import org.apache.geode.SystemConnectException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.NetView;
import org.apache.geode.distributed.internal.membership.gms.GMSMember;
import org.apache.geode.distributed.internal.membership.gms.ServiceConfig;
import org.apache.geode.distributed.internal.membership.gms.Services;
import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
import org.apache.geode.distributed.internal.membership.gms.messages.FinalCheckPassedMessage;
import org.apache.geode.distributed.internal.membership.gms.messages.HeartbeatMessage;
import org.apache.geode.distributed.internal.membership.gms.messages.HeartbeatRequestMessage;
import org.apache.geode.distributed.internal.membership.gms.messages.SuspectMembersMessage;
import org.apache.geode.distributed.internal.membership.gms.messages.SuspectRequest;
import org.apache.geode.internal.ConnectionWatcher;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.logging.LoggingExecutors;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.internal.security.SecurableCommunicationChannel;
/**
* Failure Detection
* <p>
* This class make sure that each member is alive and communicating to this member. To make sure
* that we create the ring of members based on current view. On this ring, each member make sure
* that next-member in ring is communicating with it. For that we record last message timestamp from
* next-member. And if it sees this member has not communicated in last period(member-timeout) then
* we check whether this member is still alive or not. Based on that we informed probable
* coordinators to remove that member from view.
* <p>
* It has {@link #suspect(InternalDistributedMember, String)} api, which can be used to initiate
* suspect processing for any member. First is checks whether the member is responding or not. Then
* it informs probable coordinators to remove that member from view.
* <p>
* It has {@link HealthMonitor#checkIfAvailable(InternalDistributedMember, String, boolean)} api to
* see if that member is
* alive. Then based on removal flag it initiates the suspect processing for that member.
*/
@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "NullableProblems"})
public class GMSHealthMonitor implements HealthMonitor {
private Services services;
private volatile NetView currentView;
private volatile InternalDistributedMember nextNeighbor;
long memberTimeout;
private volatile boolean isStopping = false;
private final AtomicInteger requestId = new AtomicInteger();
/**
* membership logger
*/
private static final Logger logger = Services.getLogger();
/**
* The number of recipients of periodic heartbeats. The recipients will be selected from the
* members that are likely to be monitoring this member.
*/
private static final int NUM_HEARTBEATS = Integer.getInteger("geode.heartbeat-recipients", 2);
/**
* Member activity will be recorded per interval/period. Timer task will set interval's starting
* time. Each interval will be member-timeout/LOGICAL_INTERVAL. LOGICAL_INTERVAL may be configured
* via a system property with a default of 2. At least 1 interval is needed.
*/
public static final int LOGICAL_INTERVAL =
Integer.getInteger("geode.logical-message-received-interval", 2);
/**
* stall time to wait for members leaving concurrently
*/
public static final long MEMBER_SUSPECT_COLLECTION_INTERVAL =
Long.getLong("geode.suspect-member-collection-interval", 200);
private volatile long currentTimeStamp;
/**
* this member's ID
*/
private InternalDistributedMember localAddress;
/**
* Timestamp at which we last had contact from a member
*/
final ConcurrentMap<InternalDistributedMember, TimeStamp> memberTimeStamps =
new ConcurrentHashMap<>();
/**
* Members currently being suspected and the view they were suspected in
*/
private final ConcurrentHashMap<InternalDistributedMember, NetView> suspectedMemberIds =
new ConcurrentHashMap<>();
/**
* Members undergoing final checks
*/
private final List<InternalDistributedMember> membersInFinalCheck =
Collections.synchronizedList(new ArrayList<>(30));
/**
* Replies to messages
*/
private final Map<Integer, Response> requestIdVsResponse = new ConcurrentHashMap<>();
/**
* Members suspected in a particular view
*/
private final Map<NetView, Set<SuspectRequest>> suspectRequestsInView = new HashMap<>();
private ScheduledExecutorService scheduler;
private ExecutorService checkExecutor;
/**
* to stop check scheduler
*/
private ScheduledFuture<?> monitorFuture;
/**
* test hook
*/
private volatile boolean playingDead = false;
/**
* test hook
*/
private volatile boolean beingSick = false;
// For TCP check
private ExecutorService serverSocketExecutor;
static final int OK = 0x7B;
static final int ERROR = 0x00;
private volatile int socketPort;
private volatile ServerSocket serverSocket;
/**
* Statistics about health monitor
*/
private DMStats stats;
/**
* Interval to run the Monitor task
*/
private long monitorInterval;
/**
* /**
* this class is to avoid garbage
*/
private static class TimeStamp {
private volatile long timeStamp;
TimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
public long getTime() {
return timeStamp;
}
public void setTime(long timeStamp) {
this.timeStamp = timeStamp;
}
}
/***
* This class sets start interval timestamp to record the activity of all members. That is used by
* {@link GMSHealthMonitor#contactedBy(InternalDistributedMember)} to record the activity of
* member.
*
* It initiates the suspect processing for next neighbour if it doesn't see any activity from that
* member in last interval(member-timeout)
*/
private class Monitor implements Runnable {
/**
* Here we use the same threshold for detecting JVM pauses as the StatSampler
*/
private final long MONITOR_DELAY_THRESHOLD =
Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "statSamplerDelayThreshold", 3000);
final long memberTimeoutInMillis;
public Monitor(long memberTimeout) {
memberTimeoutInMillis = memberTimeout;
}
@Override
public void run() {
if (GMSHealthMonitor.this.isStopping) {
return;
}
InternalDistributedMember neighbour = nextNeighbor;
long currentTime = System.currentTimeMillis();
// this is the start of interval to record member activity
GMSHealthMonitor.this.currentTimeStamp = currentTime;
long oldTimeStamp = currentTimeStamp;
currentTimeStamp = System.currentTimeMillis();
NetView myView = GMSHealthMonitor.this.currentView;
if (myView == null) {
return;
}
if (currentTimeStamp - oldTimeStamp > monitorInterval + MONITOR_DELAY_THRESHOLD) {
// delay in running this task - don't suspect anyone for a while
logger.info(
"Failure detector has noticed a JVM pause and is giving all members a heartbeat in view {}",
currentView);
for (InternalDistributedMember member : myView.getMembers()) {
contactedBy(member);
}
return;
}
if (neighbour != null) {
TimeStamp nextNeighborTS;
synchronized (GMSHealthMonitor.this) {
nextNeighborTS = GMSHealthMonitor.this.memberTimeStamps.get(neighbour);
}
if (nextNeighborTS == null) {
TimeStamp customTS = new TimeStamp(currentTime);
memberTimeStamps.put(neighbour, customTS);
return;
}
long interval = memberTimeoutInMillis / GMSHealthMonitor.LOGICAL_INTERVAL;
long lastTS = currentTime - nextNeighborTS.getTime();
if (lastTS + interval >= memberTimeoutInMillis) {
logger.debug("Checking member {} ", neighbour);
// now do check request for this member;
checkMember(neighbour);
}
}
}
}
/***
* Check thread waits on this object for response. It puts requestId in requestIdVsResponse map.
* Response will have requestId, which is used to get ResponseObject. Then it is used to notify
* waiting thread.
*/
private class Response {
private DistributionMessage responseMsg;
public DistributionMessage getResponseMsg() {
return responseMsg;
}
public void setResponseMsg(DistributionMessage responseMsg) {
this.responseMsg = responseMsg;
}
}
class ClientSocketHandler implements Runnable {
private final Socket socket;
public ClientSocketHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
socket.setTcpNoDelay(true);
DataInputStream in = new DataInputStream(socket.getInputStream());
OutputStream out = socket.getOutputStream();
@SuppressWarnings("UnusedAssignment")
short version = in.readShort();
int vmViewId = in.readInt();
long uuidLSBs = in.readLong();
long uuidMSBs = in.readLong();
GMSHealthMonitor.this.stats.incFinalCheckRequestsReceived();
GMSHealthMonitor.this.stats.incTcpFinalCheckRequestsReceived();
GMSMember gmbr = (GMSMember) GMSHealthMonitor.this.localAddress.getNetMember();
UUID myUUID = gmbr.getUUID();
// during reconnect or rapid restart we will have a zero viewId but there may still
// be an old ID in the membership view that we do not want to respond to
int myVmViewId = gmbr.getVmViewId();
if (playingDead) {
logger.debug("HealthMonitor: simulating sick member in health check");
} else if (uuidLSBs == myUUID.getLeastSignificantBits()
&& uuidMSBs == myUUID.getMostSignificantBits()
&& (vmViewId == myVmViewId || myVmViewId < 0)) {
logger.debug("HealthMonitor: sending OK reply");
out.write(OK);
out.flush();
socket.shutdownOutput();
GMSHealthMonitor.this.stats.incFinalCheckResponsesSent();
GMSHealthMonitor.this.stats.incTcpFinalCheckResponsesSent();
logger.debug("HealthMonitor: server replied OK.");
} else {
if (logger.isDebugEnabled()) {
logger.debug(
"HealthMonitor: sending ERROR reply - my UUID is {},{} received is {},{}. My viewID is {} received is {}",
Long.toHexString(myUUID.getMostSignificantBits()),
Long.toHexString(myUUID.getLeastSignificantBits()), Long.toHexString(uuidMSBs),
Long.toHexString(uuidLSBs), myVmViewId, vmViewId);
}
out.write(ERROR);
out.flush();
socket.shutdownOutput();
GMSHealthMonitor.this.stats.incFinalCheckResponsesSent();
GMSHealthMonitor.this.stats.incTcpFinalCheckResponsesSent();
logger.debug("HealthMonitor: server replied ERROR.");
}
} catch (IOException e) {
// this is expected if it is a connection-timeout or other failure
// to connect
} catch (RuntimeException e) {
logger.debug("Unexpected runtime exception", e);
throw e;
} catch (Error e) {
logger.debug("Unexpected error", e);
throw e;
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
// expected if the socket is already closed
}
}
}
}
}
public GMSHealthMonitor() {
}
@SuppressWarnings("EmptyMethod")
public static void loadEmergencyClasses() {}
/*
* Record the member activity for current time interval.
*/
@Override
public void contactedBy(InternalDistributedMember sender) {
contactedBy(sender, currentTimeStamp);
}
/**
* Record member activity at a specified time
*/
private void contactedBy(InternalDistributedMember sender, long timeStamp) {
TimeStamp cTS = new TimeStamp(timeStamp);
cTS = memberTimeStamps.putIfAbsent(sender, cTS);
if (cTS != null && cTS.getTime() < timeStamp) {
cTS.setTime(timeStamp);
}
if (suspectedMemberIds.containsKey(sender)) {
memberUnsuspected(sender);
setNextNeighbor(currentView, null);
}
}
private HeartbeatRequestMessage constructHeartbeatRequestMessage(
final InternalDistributedMember mbr) {
final int reqId = requestId.getAndIncrement();
final HeartbeatRequestMessage hrm = new HeartbeatRequestMessage(mbr, reqId);
hrm.setRecipient(mbr);
return hrm;
}
private void checkMember(final InternalDistributedMember mbr) {
final NetView cv = GMSHealthMonitor.this.currentView;
// as check may take time
setNextNeighbor(cv, mbr);
// we need to check this member
checkExecutor.execute(() -> {
boolean pinged;
try {
pinged = GMSHealthMonitor.this.doCheckMember(mbr, true);
} catch (CancelException e) {
return;
}
if (!pinged) {
String reason = "Member isn't responding to heartbeat requests";
memberSuspected(localAddress, mbr, reason);
initiateSuspicion(mbr, reason);
setNextNeighbor(currentView, null);
} else {
logger.trace("Setting next neighbor as member {} has responded.", mbr);
memberUnsuspected(mbr);
// back to previous one
setNextNeighbor(currentView, null);
}
});
}
private void initiateSuspicion(InternalDistributedMember mbr, String reason) {
if (services.getJoinLeave().isMemberLeaving(mbr)) {
return;
}
sendSuspectRequest(Collections.singletonList(new SuspectRequest(mbr, reason)));
}
/**
* This method sends heartbeat request to other member and waits for member-timeout time for
* response. If it doesn't see response then it returns false.
*/
private boolean doCheckMember(InternalDistributedMember member, boolean waitForResponse) {
if (playingDead || beingSick) {
// a member playingDead should not be sending messages to other
// members, so we avoid sending heartbeat requests or suspect
// messages by returning true.
return true;
}
long startTime = System.currentTimeMillis();
logger.debug("Requesting heartbeat from {}", member);
final HeartbeatRequestMessage hrm = constructHeartbeatRequestMessage(member);
Response pingResp = null;
if (waitForResponse) {
pingResp = new Response();
requestIdVsResponse.put(hrm.getRequestId(), pingResp);
} else {
hrm.clearRequestId();
}
try {
Set<InternalDistributedMember> membersNotReceivedMsg = this.services.getMessenger().send(hrm);
this.stats.incHeartbeatRequestsSent();
if (membersNotReceivedMsg != null && membersNotReceivedMsg.contains(member)) {
// member is not part of current view.
logger.trace("Member {} is not part of current view.", member);
} else if (waitForResponse) {
synchronized (pingResp) {
if (pingResp.getResponseMsg() == null) {
pingResp.wait(memberTimeout);
}
TimeStamp ts = memberTimeStamps.get(member);
if (ts != null && ts.getTime() > startTime) {
return true;
}
if (pingResp.getResponseMsg() == null) {
if (isStopping) {
return true;
}
logger.debug("no heartbeat response received from {} and no recent activity", member);
return false;
} else {
logger.trace("received heartbeat from {}", member);
this.stats.incHeartbeatsReceived();
if (ts != null) {
ts.setTime(System.currentTimeMillis());
}
return true;
}
}
}
} catch (InterruptedException e) {
logger.debug(
"GMSHealthMonitor checking thread interrupted, while waiting for response from member: {} .",
member);
} finally {
if (waitForResponse) {
requestIdVsResponse.remove(hrm.getRequestId());
}
}
return false;
}
/**
* During final check, establish TCP connection between current member and suspect member. And
* exchange PING/PONG message to see if the suspect member is still alive.
*
* @param suspectMember member that does not respond to HeartbeatRequestMessage
* @return true if successfully exchanged PING/PONG with TCP connection, otherwise false.
*/
boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port,
boolean retryIfConnectFails) {
Socket clientSocket = null;
// make sure we try to check on the member for the contracted memberTimeout period
// in case a timed socket.connect() returns immediately
long giveupTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(
services.getConfig().getMemberTimeout(), TimeUnit.MILLISECONDS);
boolean passed = false;
int iteration = 0;
do {
iteration++;
if (iteration > 1) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
try {
logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember,
suspectMember.getInetAddress(), port);
clientSocket =
SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER)
.connect(suspectMember.getInetAddress(), port, (int) memberTimeout,
new ConnectTimeoutTask(services.getTimer(), memberTimeout), false, -1, false);
clientSocket.setTcpNoDelay(true);
passed = doTCPCheckMember(suspectMember, clientSocket);
} catch (IOException e) {
// this is expected if it is a connection-timeout or other failure
// to connect
} catch (IllegalStateException | GemFireConfigException e) {
if (!isStopping) {
logger.trace("Unexpected exception", e);
}
} finally {
try {
if (clientSocket != null) {
clientSocket.setSoLinger(true, 0); // abort the connection
clientSocket.close();
}
} catch (IOException e) {
// expected
}
}
} while (retryIfConnectFails && !passed && !this.isShutdown()
&& System.nanoTime() < giveupTime);
return passed;
}
// Package protected for testing purposes
boolean doTCPCheckMember(InternalDistributedMember suspectMember, Socket clientSocket) {
try {
if (clientSocket.isConnected()) {
clientSocket.setSoTimeout((int) services.getConfig().getMemberTimeout());
InputStream in = clientSocket.getInputStream();
DataOutputStream out = new DataOutputStream(clientSocket.getOutputStream());
GMSMember gmbr = (GMSMember) suspectMember.getNetMember();
writeMemberToStream(gmbr, out);
this.stats.incFinalCheckRequestsSent();
this.stats.incTcpFinalCheckRequestsSent();
logger.debug("Connected to suspect member - reading response");
int b = in.read();
if (logger.isDebugEnabled()) {
logger.debug("Received {}",
(b == OK ? "OK" : (b == ERROR ? "ERROR" : "unknown response: " + b)));
}
if (b >= 0) {
this.stats.incFinalCheckResponsesReceived();
this.stats.incTcpFinalCheckResponsesReceived();
}
if (b == OK) {
TimeStamp ts = memberTimeStamps.get(suspectMember);
if (ts != null) {
ts.setTime(System.currentTimeMillis());
}
return true;
} else {
// received ERROR
return false;
}
} else {// cannot establish TCP connection with suspect member
return false;
}
} catch (SocketTimeoutException e) {
logger.debug("Availability check TCP/IP connection timed out for suspect member {}",
suspectMember);
return false;
} catch (IOException e) {
logger.trace("Unexpected exception", e);
}
return false;
}
void writeMemberToStream(GMSMember gmbr, DataOutputStream out) throws IOException {
out.writeShort(Version.CURRENT_ORDINAL);
out.writeInt(gmbr.getVmViewId());
out.writeLong(gmbr.getUuidLSBs());
out.writeLong(gmbr.getUuidMSBs());
out.flush();
}
@Override
public void suspect(InternalDistributedMember mbr, String reason) {
initiateSuspicion(mbr, reason);
}
@Override
public boolean checkIfAvailable(InternalDistributedMember mbr, String reason,
boolean initiateRemoval) {
return inlineCheckIfAvailable(localAddress, currentView, initiateRemoval,
mbr, reason);
}
@Override
public void start() {
scheduler = LoggingExecutors.newScheduledThreadPool("Geode Failure Detection Scheduler", 1);
checkExecutor = LoggingExecutors.newCachedThreadPool("Geode Failure Detection thread ", true);
Monitor m = this.new Monitor(memberTimeout);
monitorInterval = memberTimeout / LOGICAL_INTERVAL;
monitorFuture =
scheduler.scheduleAtFixedRate(m, monitorInterval, monitorInterval, TimeUnit.MILLISECONDS);
serverSocketExecutor =
LoggingExecutors.newCachedThreadPool("Geode Failure Detection Server thread ", true);
}
ServerSocket createServerSocket(InetAddress socketAddress, int[] portRange) {
ServerSocket serverSocket;
try {
serverSocket = SocketCreatorFactory
.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER)
.createServerSocketUsingPortRange(socketAddress, 50/* backlog */, true/* isBindAddress */,
false/* useNIO */, 65536/* tcpBufferSize */, portRange, false);
socketPort = serverSocket.getLocalPort();
} catch (IOException | SystemConnectException e) {
throw new GemFireConfigException(
"Unable to allocate a failure detection port in the membership-port range", e);
}
return serverSocket;
}
/**
* start the thread that listens for tcp/ip connections and responds to connection attempts
*/
private void startTcpServer(ServerSocket ssocket) {
// allocate a socket here so there are no race conditions between knowing the FD
// socket port and joining the system
serverSocketExecutor.execute(() -> {
logger.info("Started failure detection server thread on {}:{}.", ssocket.getInetAddress(),
socketPort);
Socket socket = null;
try {
while (!services.getCancelCriterion().isCancelInProgress()
&& !GMSHealthMonitor.this.isStopping) {
try {
socket = ssocket.accept();
if (GMSHealthMonitor.this.playingDead) {
continue;
}
serverSocketExecutor.execute(new ClientSocketHandler(socket));
} catch (RejectedExecutionException e) {
// this can happen during shutdown
} catch (IOException e) {
if (!isStopping) {
logger.trace("Unexpected exception", e);
}
try {
if (socket != null) {
socket.close();
}
} catch (IOException ioe) {
logger.trace("Unexpected exception", ioe);
}
}
}
logger.info("GMSHealthMonitor server thread exiting");
} finally {
// close the server socket
if (!ssocket.isClosed()) {
try {
ssocket.close();
} catch (IOException e) {
logger.debug("Unexpected exception", e);
}
}
}
});
}
/**
* start the thread that periodically sends a message to processes that might be watching this
* process
*/
private void startHeartbeatThread() {
checkExecutor.execute(new Runnable() {
@Override
public void run() {
Thread.currentThread().setName("Geode Heartbeat Sender");
sendPeriodicHeartbeats();
}
private void sendPeriodicHeartbeats() {
while (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
try {
Thread.sleep(memberTimeout / LOGICAL_INTERVAL);
} catch (InterruptedException e) {
return;
}
NetView v = currentView;
if (v != null) {
List<InternalDistributedMember> mbrs = v.getMembers();
int index = mbrs.indexOf(localAddress);
if (index < 0 || mbrs.size() < 2) {
continue;
}
if (!playingDead) {
sendHeartbeats(mbrs, index);
}
}
}
}
private void sendHeartbeats(List<InternalDistributedMember> mbrs, int startIndex) {
InternalDistributedMember coordinator = currentView.getCoordinator();
if (coordinator != null && !coordinator.equals(localAddress)) {
HeartbeatMessage message = new HeartbeatMessage(-1);
message.setRecipient(coordinator);
try {
if (isStopping) {
return;
}
services.getMessenger().sendUnreliably(message);
GMSHealthMonitor.this.stats.incHeartbeatsSent();
} catch (CancelException e) {
return;
}
}
int index = startIndex;
int numSent = 0;
for (;;) {
index--;
if (index < 0) {
index = mbrs.size() - 1;
}
InternalDistributedMember mbr = mbrs.get(index);
if (mbr.equals(localAddress)) {
break;
}
if (mbr.equals(coordinator)) {
continue;
}
if (isStopping) {
return;
}
HeartbeatMessage message = new HeartbeatMessage(-1);
message.setRecipient(mbr);
try {
services.getMessenger().sendUnreliably(message);
GMSHealthMonitor.this.stats.incHeartbeatsSent();
numSent++;
if (numSent >= NUM_HEARTBEATS) {
break;
}
} catch (CancelException e) {
return;
}
}
} // for (;;)
});
}
@Override
public synchronized void installView(NetView newView) {
synchronized (suspectRequestsInView) {
suspectRequestsInView.clear();
}
for (Iterator<InternalDistributedMember> it = memberTimeStamps.keySet().iterator(); it
.hasNext();) {
if (!newView.contains(it.next())) {
it.remove();
}
}
for (Iterator<InternalDistributedMember> it = suspectedMemberIds.keySet().iterator(); it
.hasNext();) {
if (!newView.contains(it.next())) {
it.remove();
}
}
currentView = newView;
setNextNeighbor(newView, null);
}
/**
* this method is primarily for tests. The current view should be pulled from JoinLeave or the
* MembershipManager (which includes surprise members)
*/
public synchronized NetView getView() {
return currentView;
}
/***
* This method sets next neighbour which it needs to watch in current view.
*
* if nextTo == null then it watches member next to it.
*
* It becomes null when we suspect current neighbour, during that time it watches member next to
* suspect member.
*/
protected synchronized void setNextNeighbor(NetView newView, InternalDistributedMember nextTo) {
if (newView == null) {
return;
}
if (nextTo == null) {
nextTo = localAddress;
}
List<InternalDistributedMember> allMembers = newView.getMembers();
if (allMembers.size() > 1 && suspectedMemberIds.size() >= allMembers.size() - 1) {
boolean nonSuspectFound = false;
for (InternalDistributedMember member : allMembers) {
if (member.equals(localAddress)) {
continue;
}
if (!suspectedMemberIds.containsKey(member)) {
nonSuspectFound = true;
break;
}
}
if (!nonSuspectFound) {
logger.info("All other members are suspect at this point");
nextNeighbor = null;
return;
}
}
int index = allMembers.indexOf(nextTo);
if (index != -1) {
int nextNeighborIndex = (index + 1) % allMembers.size();
InternalDistributedMember newNeighbor = allMembers.get(nextNeighborIndex);
if (suspectedMemberIds.containsKey(newNeighbor)) {
setNextNeighbor(newView, newNeighbor);
return;
}
InternalDistributedMember oldNeighbor = nextNeighbor;
if (oldNeighbor != newNeighbor) {
logger.debug("Failure detection is now watching " + newNeighbor);
nextNeighbor = newNeighbor;
}
}
if (nextNeighbor != null && nextNeighbor.equals(localAddress)) {
if (logger.isDebugEnabled()) {
logger.debug("Health monitor is unable to find a neighbor to watch. "
+ "Current suspects are {}", suspectedMemberIds);
}
nextNeighbor = null;
}
}
/** test method */
public InternalDistributedMember getNextNeighbor() {
return nextNeighbor;
}
@Override
public void init(Services s) {
isStopping = false;
services = s;
memberTimeout = s.getConfig().getMemberTimeout();
this.stats = services.getStatistics();
services.getMessenger().addHandler(HeartbeatRequestMessage.class,
this::processMessage);
services.getMessenger().addHandler(HeartbeatMessage.class,
this::processMessage);
services.getMessenger().addHandler(SuspectMembersMessage.class,
this::processMessage);
services.getMessenger().addHandler(FinalCheckPassedMessage.class,
this::processMessage);
}
@Override
public void started() {
setLocalAddress(services.getMessenger().getMemberID());
serverSocket = createServerSocket(localAddress.getInetAddress(),
services.getConfig().getMembershipPortRange());
startTcpServer(serverSocket);
startHeartbeatThread();
}
@Override
public void stop() {
stopServices();
}
private void stopServices() {
logger.debug("Stopping HealthMonitor");
isStopping = true;
if (monitorFuture != null) {
monitorFuture.cancel(true);
}
if (scheduler != null) {
scheduler.shutdown();
}
Collection<Response> val = requestIdVsResponse.values();
for (Response r : val) {
synchronized (r) {
r.notify();
}
}
if (checkExecutor != null) {
checkExecutor.shutdown();
}
stopServer();
}
void stopServer() {
if (serverSocketExecutor != null) {
if (serverSocket != null && !serverSocket.isClosed()) {
try {
serverSocket.close();
} catch (IOException e) {
logger.trace("Unexpected exception", e);
}
}
serverSocketExecutor.shutdownNow();
try {
serverSocketExecutor.awaitTermination(2000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
/***
* test method
*/
public boolean isShutdown() {
return scheduler.isShutdown() && checkExecutor.isShutdown()
&& serverSocketExecutor.isShutdown();
}
/**
* Test method - check to see if a member is under suspicion
*/
public boolean isSuspectMember(InternalDistributedMember m) {
return this.suspectedMemberIds.containsKey(m);
}
@Override
public void stopped() {
}
@Override
public void memberSuspected(InternalDistributedMember initiator,
InternalDistributedMember suspect, String reason) {
synchronized (suspectRequestsInView) {
suspectedMemberIds.put(suspect, currentView);
Collection<SuspectRequest> requests = suspectRequestsInView.get(currentView);
boolean found = false;
if (requests == null) {
requests = new HashSet<>();
requests.add(new SuspectRequest(suspect, reason));
}
for (SuspectRequest request : requests) {
if (suspect.equals(request.getSuspectMember())) {
found = true;
break;
}
}
if (!found) {
requests.add(new SuspectRequest(suspect, reason));
}
}
}
private void memberUnsuspected(InternalDistributedMember mbr) {
synchronized (suspectRequestsInView) {
if (suspectedMemberIds.remove(mbr) != null) {
logger.info("No longer suspecting {}", mbr);
}
Collection<SuspectRequest> suspectRequests = suspectRequestsInView.get(currentView);
if (suspectRequests != null) {
Collection<SuspectRequest> removals = new ArrayList<>(suspectRequests.size());
for (SuspectRequest suspectRequest : suspectRequests) {
if (mbr.equals(suspectRequest.getSuspectMember())) {
removals.add(suspectRequest);
}
}
suspectRequests.removeAll(removals);
}
}
}
@Override
public void beSick() {
this.beingSick = true;
}
@Override
public void playDead() {
this.playingDead = true;
}
@Override
public void beHealthy() {
this.beingSick = false;
this.playingDead = false;
}
@Override
public void emergencyClose() {
stopServices();
}
@Override
public void setLocalAddress(InternalDistributedMember idm) {
this.localAddress = idm;
}
void processMessage(HeartbeatRequestMessage m) {
if (isStopping) {
return;
}
if (beingSick || playingDead) {
logger.debug("sick member is ignoring check request");
return;
}
this.stats.incHeartbeatRequestsReceived();
if (this.isStopping) {
return;
}
// only respond if the intended recipient is this member
InternalDistributedMember me = localAddress;
if (me == null || me.getVmViewId() >= 0 && m.getTarget().equals(me)) {
HeartbeatMessage hm = new HeartbeatMessage(m.getRequestId());
hm.setRecipient(m.getSender());
Set<InternalDistributedMember> membersNotReceivedMsg = services.getMessenger().send(hm);
this.stats.incHeartbeatsSent();
if (membersNotReceivedMsg != null && membersNotReceivedMsg.contains(m.getSender())) {
logger.debug("Unable to send heartbeat to member: {}", m.getSender());
}
} else {
logger.debug("Ignoring heartbeat request intended for {}. My ID is {}", m.getTarget(), me);
}
}
void processMessage(HeartbeatMessage m) {
if (isStopping) {
return;
}
if (beingSick || playingDead) {
logger.debug("sick member is ignoring check response");
return;
}
this.stats.incHeartbeatsReceived();
if (m.getRequestId() >= 0) {
Response resp = requestIdVsResponse.get(m.getRequestId());
logger.trace("Got heartbeat from member {}. {}", m.getSender(),
(resp != null ? "Check thread still waiting" : "Check thread is not waiting"));
if (resp != null) {
synchronized (resp) {
resp.setResponseMsg(m);
resp.notify();
}
}
}
// we got heartbeat lets update timestamp
contactedBy(m.getSender(), System.currentTimeMillis());
}
/**
* Process a Suspect request from another member. This may cause this member to become the new
* membership coordinator. it will to final check on that member and then it will send remove
* request for that member
*/
void processMessage(SuspectMembersMessage incomingRequest) {
if (isStopping) {
return;
}
if (beingSick || playingDead) {
logger.debug("sick member is ignoring suspect message");
return;
}
this.stats.incSuspectsReceived();
NetView cv = currentView;
if (cv == null) {
return;
}
List<SuspectRequest> suspectRequests = incomingRequest.getMembers();
InternalDistributedMember sender = incomingRequest.getSender();
int viewId = sender.getVmViewId();
if (cv.getViewId() >= viewId && !cv.contains(incomingRequest.getSender())) {
logger.info("Membership ignoring suspect request for " + incomingRequest + " from non-member "
+ incomingRequest.getSender());
services.getJoinLeave().remove(sender,
"this process is initiating suspect processing but is no longer a member");
return;
}
// take care of any suspicion of this member by sending a heartbeat back
for (Iterator<SuspectRequest> it = incomingRequest.getMembers().iterator(); it.hasNext();) {
SuspectRequest req = it.next();
if (req.getSuspectMember().equals(localAddress)) {
HeartbeatMessage message = new HeartbeatMessage(-1);
message.setRecipient(sender);
try {
services.getMessenger().send(message);
this.stats.incHeartbeatsSent();
it.remove();
} catch (CancelException e) {
return;
}
}
}
logger.debug(
"Processing suspect requests {}\nproposed view is currently {}\nwith coordinator {}",
suspectRequests, cv, cv.getCoordinator());
if (cv.getCoordinator().equals(localAddress)) {
// This process is the membership coordinator and should perform a final check
logSuspectRequests(incomingRequest, sender);
checkIfAvailable(sender, suspectRequests, cv);
} else {
// Another process has raised suspicion - check to see if
// this process should become the membership coordinator if
// all current suspects are gone
NetView check = new NetView(cv, cv.getViewId() + 1);
ArrayList<SuspectRequest> membersToCheck = new ArrayList<>();
synchronized (suspectRequestsInView) {
recordSuspectRequests(suspectRequests, cv);
Set<SuspectRequest> suspectsInView = suspectRequestsInView.get(cv);
logger.debug("Current suspects are {}", suspectsInView);
for (final SuspectRequest sr : suspectsInView) {
check.remove(sr.getSuspectMember());
membersToCheck.add(sr);
}
}
List membersLeaving = new ArrayList();
for (InternalDistributedMember member : cv.getMembers()) {
if (services.getJoinLeave().isMemberLeaving(member)) {
membersLeaving.add(member);
}
}
if (!membersLeaving.isEmpty()) {
logger.debug("Current leave requests are {}", membersLeaving);
check.removeAll(membersLeaving);
}
logger.trace(
"Proposed view with suspects & leaving members removed is {}\nwith coordinator {}\nmy address is {}",
check,
check.getCoordinator(), localAddress);
InternalDistributedMember coordinator = check.getCoordinator();
if (coordinator != null && coordinator.equals(localAddress)) {
// new coordinator
logSuspectRequests(incomingRequest, sender);
checkIfAvailable(sender, membersToCheck, cv);
}
}
}
void processMessage(FinalCheckPassedMessage m) {
if (isStopping) {
return;
}
contactedBy(m.getSuspect());
}
private void logSuspectRequests(SuspectMembersMessage incomingRequest,
InternalDistributedMember sender) {
for (SuspectRequest req : incomingRequest.getMembers()) {
String who = sender.equals(localAddress) ? "myself" : sender.toString();
logger.info("received suspect message from {} for {}: {}", who, req.getSuspectMember(),
req.getReason());
}
}
/***
* This method make sure that records suspectRequest. We need to make sure this on preferred
* coordinators, as elder coordinator might be in suspected list next.
*/
private void recordSuspectRequests(List<SuspectRequest> suspectRequests, NetView cv) {
// record suspect requests
Set<SuspectRequest> suspectedMembers;
synchronized (suspectRequestsInView) {
suspectedMembers = suspectRequestsInView.get(cv);
if (suspectedMembers == null) {
suspectedMembers = new HashSet<>();
suspectRequestsInView.put(cv, suspectedMembers);
}
suspectedMembers.addAll(suspectRequests);
}
}
/**
* performs a "final" health check on the member. If failure-detection socket information is
* available for the member (in the view) then we attempt to connect to its socket and ask if it's
* the expected member. Otherwise we send a heartbeat request and wait for a reply.
*/
private void checkIfAvailable(final InternalDistributedMember initiator,
List<SuspectRequest> sMembers, final NetView cv) {
for (final SuspectRequest sr : sMembers) {
final InternalDistributedMember mbr = sr.getSuspectMember();
if (!cv.contains(mbr) || membersInFinalCheck.contains(mbr)) {
continue;
}
if (mbr.equals(localAddress)) {
continue;// self
}
final String reason = sr.getReason();
logger.debug("Scheduling availability check for member {}; reason={}", mbr, reason);
// its a coordinator
checkExecutor.execute(() -> {
try {
inlineCheckIfAvailable(initiator, cv, true, mbr, reason);
} catch (CancelException e) {
// shutting down
} catch (Exception e) {
logger.info("Unexpected exception while verifying member", e);
}
});
}
}
protected boolean inlineCheckIfAvailable(final InternalDistributedMember initiator,
final NetView cv, boolean forceRemovalIfCheckFails, final InternalDistributedMember mbr,
final String reason) {
if (services.getJoinLeave().isMemberLeaving(mbr)) {
return false;
}
boolean failed = false;
logger.info("Performing availability check for suspect member {} reason={}", mbr, reason);
membersInFinalCheck.add(mbr);
setNextNeighbor(currentView, mbr);
try {
services.memberSuspected(initiator, mbr, reason);
long startTime = System.currentTimeMillis();
// for some reason we used to update the timestamp for the member
// with the startTime, but we don't want to do that because it looks
// like a heartbeat has been received
boolean pinged;
int port = cv.getFailureDetectionPort(mbr);
if (port <= 0) {
logger.info("Unable to locate failure detection port - requesting a heartbeat");
if (logger.isDebugEnabled()) {
logger.debug("\ncurrent view: {}\nports: {}", cv,
Arrays.toString(cv.getFailureDetectionPorts()));
}
pinged = GMSHealthMonitor.this.doCheckMember(mbr, true);
GMSHealthMonitor.this.stats.incFinalCheckRequestsSent();
GMSHealthMonitor.this.stats.incUdpFinalCheckRequestsSent();
if (pinged) {
GMSHealthMonitor.this.stats.incFinalCheckResponsesReceived();
GMSHealthMonitor.this.stats.incUdpFinalCheckResponsesReceived();
}
} else {
// this will just send heartbeat request, it will not wait for response
// if we will get heartbeat then it will change the timestamp, which we are
// checking below in case of tcp check failure..
doCheckMember(mbr, false);
// now, while waiting for a heartbeat, try connecting to the suspect's failure detection
// port
final boolean retryIfConnectFails = forceRemovalIfCheckFails;
pinged = doTCPCheckMember(mbr, port, retryIfConnectFails);
}
if (!pinged && !isStopping) {
failed = true;
TimeStamp ts = memberTimeStamps.get(mbr);
if (ts == null || ts.getTime() < startTime) {
logger.info("Availability check failed for member {}", mbr);
// if the final check fails & this VM is the coordinator we don't need to do another final
// check
if (forceRemovalIfCheckFails) {
logger.info("Requesting removal of suspect member {}", mbr);
services.getJoinLeave().remove(mbr, reason);
// make sure it is still suspected
memberSuspected(localAddress, mbr, reason);
} else {
// if this node can survive an availability check then initiate suspicion about
// the node that failed the availability check
if (doTCPCheckMember(localAddress, this.socketPort, false)) {
membersInFinalCheck.remove(mbr);
// tell peers about this member and then perform another availability check
memberSuspected(localAddress, mbr, reason);
initiateSuspicion(mbr, reason);
SuspectMembersMessage suspectMembersMessage =
new SuspectMembersMessage(Collections.singletonList(localAddress),
Collections
.singletonList(new SuspectRequest(mbr, "failed availability check")));
suspectMembersMessage.setSender(localAddress);
logger.debug("Performing local processing on suspect request");
processMessage(suspectMembersMessage);
} else {
logger.info(
"Self-check for availability failed - will not continue to suspect {} for now",
mbr);
failed = false;
}
}
} else {
logger.info(
"Availability check failed but detected recent message traffic for suspect member "
+ mbr);
failed = false;
}
}
if (!failed) {
if (!isStopping && !initiator.equals(localAddress)
&& initiator.getVersionObject().compareTo(Version.GEODE_1_3_0) >= 0) {
// let the sender know that it's okay to monitor this member again
FinalCheckPassedMessage message = new FinalCheckPassedMessage(initiator, mbr);
services.getMessenger().send(message);
}
logger.info("Availability check passed for suspect member " + mbr);
}
} finally {
if (!failed) {
memberUnsuspected(mbr);
setNextNeighbor(currentView, null);
}
membersInFinalCheck.remove(mbr);
}
return !failed;
}
@Override
public void memberShutdown(DistributedMember mbr, String reason) {}
@Override
public int getFailureDetectionPort() {
return this.socketPort;
}
private void sendSuspectRequest(final List<SuspectRequest> requests) {
logger.debug("Sending suspect request for members {}", requests);
List<InternalDistributedMember> recipients;
if (currentView.size() > ServiceConfig.SMALL_CLUSTER_SIZE) {
HashSet<InternalDistributedMember> filter = new HashSet<>();
for (Enumeration<InternalDistributedMember> e = suspectedMemberIds.keys(); e
.hasMoreElements();) {
filter.add(e.nextElement());
}
filter.addAll(
requests.stream().map(SuspectRequest::getSuspectMember).collect(Collectors.toList()));
recipients =
currentView.getPreferredCoordinators(filter, services.getJoinLeave().getMemberID(),
ServiceConfig.SMALL_CLUSTER_SIZE + 1);
} else {
recipients = currentView.getMembers();
}
logger.trace("Sending suspect messages to {}", recipients);
SuspectMembersMessage smm = new SuspectMembersMessage(recipients, requests);
smm.setSender(localAddress);
Set<InternalDistributedMember> failedRecipients;
try {
failedRecipients = services.getMessenger().send(smm);
this.stats.incSuspectsSent();
} catch (CancelException e) {
return;
}
if (failedRecipients != null && failedRecipients.size() > 0) {
logger.trace("Unable to send suspect message to {}", failedRecipients);
}
logger.trace("Processing suspect message locally");
processMessage(smm);
}
private static class ConnectTimeoutTask extends TimerTask implements ConnectionWatcher {
final Timer scheduler;
Socket socket;
final long timeout;
ConnectTimeoutTask(Timer scheduler, long timeout) {
this.scheduler = scheduler;
this.timeout = timeout;
}
@Override
public void beforeConnect(Socket socket) {
this.socket = socket;
scheduler.schedule(this, timeout);
}
@Override
public void afterConnect(Socket socket) {
cancel();
}
@Override
public void run() {
try {
if (socket != null) {
socket.close();
}
} catch (IOException e) {
// ignored - nothing useful to do here
}
}
}
public DMStats getStats() {
return this.stats;
}
}