blob: 45e6cc4ad1034412c5b67e5597a24e8f6e2438ad [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.CacheClientStatus;
import org.apache.geode.internal.cache.IncomingGatewayStatus;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.TXId;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.tier.Acceptor;
import org.apache.geode.internal.cache.tier.ServerSideHandshake;
import org.apache.geode.internal.lang.JavaWorkarounds;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* Class <code>ClientHealthMonitor</code> is a server-side singleton that monitors the health of
* clients by looking at their heartbeats. If too much time elapses between heartbeats, the monitor
* determines that the client is dead and interrupts its threads.
*
* @since GemFire 4.2.3
*/
public class ClientHealthMonitor {
private static final Logger logger = LogService.getLogger();
public static final String CLIENT_HEALTH_MONITOR_INTERVAL_PROPERTY =
"geode.client-health-monitor-interval";
/**
* The map of known clients and last time seen.
*/
private ConcurrentMap<ClientProxyMembershipID, AtomicLong> clientHeartbeats =
new ConcurrentHashMap<>();
/**
* The map of known clients and maximum time between pings.
*/
private ConcurrentMap<ClientProxyMembershipID, Integer> clientMaximumTimeBetweenPings =
new ConcurrentHashMap<>();
/**
* THe GemFire <code>Cache</code>
*/
private final InternalCache cache;
public int getMaximumTimeBetweenPings() {
return maximumTimeBetweenPings;
}
private volatile int maximumTimeBetweenPings;
/**
* A thread that validates client connections
*/
private final ClientHealthMonitorThread clientMonitor;
/**
* The singleton <code>CacheClientNotifier</code> instance
*/
@MakeNotStatic
private static ClientHealthMonitor instance;
/**
* Reference count in the event that multiple cache servers are using the health monitor
*/
@MakeNotStatic
private static int refCount = 0;
/**
* The interval between client monitor iterations
*/
private static final long DEFAULT_CLIENT_MONITOR_INTERVAL_IN_MILLIS = 1000;
private final CacheClientNotifierStats stats;
/**
* Used to track the number of handshakes in a VM primary use, license enforcement.
*
* note, these were moved from static fields in ServerConnection so that they will be cleaned up
* when the client health monitor is shutdown.
*/
private final HashMap<ServerSideHandshake, MutableInt> cleanupTable = new HashMap<>();
private final HashMap<ClientProxyMembershipID, MutableInt> cleanupProxyIdTable = new HashMap<>();
/**
* Used to track the connections for a particular client
*/
private final HashMap<ClientProxyMembershipID, ServerConnectionCollection> proxyIdConnections =
new HashMap<>();
/**
* Gives, version-wise, the number of clients connected to the cache servers in this cache, which
* are capable of processing received deltas.
*
* NOTE: It does not necessarily give the actual number of clients per version connected to the
* cache servers in this cache.
*
* @see CacheClientNotifier#addClientProxy(CacheClientProxy)
*/
AtomicIntegerArray numOfClientsPerVersion =
new AtomicIntegerArray(KnownVersion.HIGHEST_VERSION + 1);
public long getMonitorInterval() {
return monitorInterval;
}
private long monitorInterval;
/**
* Factory method to construct or return the singleton <code>ClientHealthMonitor</code> instance.
*
* @param cache The GemFire <code>Cache</code>
* @param maximumTimeBetweenPings The maximum time allowed between pings before determining the
* client has died and interrupting its sockets.
* @return The singleton <code>ClientHealthMonitor</code> instance
*/
public static ClientHealthMonitor getInstance(InternalCache cache, int maximumTimeBetweenPings,
CacheClientNotifierStats stats) {
createInstance(cache, maximumTimeBetweenPings, stats);
return instance;
}
/**
* Factory method to return the singleton <code>ClientHealthMonitor</code> instance.
*
* @return the singleton <code>ClientHealthMonitor</code> instance
*/
public static ClientHealthMonitor getInstance() {
return instance;
}
/**
* Shuts down the singleton <code>ClientHealthMonitor</code> instance.
*/
public static synchronized void shutdownInstance() {
refCount--;
if (instance == null)
return;
if (refCount > 0)
return;
instance.shutdown();
boolean interrupted = false; // Don't clear, let join fail if already interrupted
try {
if (instance.clientMonitor != null) {
instance.clientMonitor.join();
}
} catch (InterruptedException e) {
interrupted = true;
if (logger.isDebugEnabled()) {
logger.debug(":Interrupted joining with the ClientHealthMonitor Thread", e);
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
instance = null;
refCount = 0;
}
public void registerClient(ClientProxyMembershipID proxyID) {
registerClient(proxyID, maximumTimeBetweenPings);
}
/**
* Registers a new client to be monitored.
*
* @param proxyID The id of the client to be registered
*/
public void registerClient(ClientProxyMembershipID proxyID, int maximumTimeBetweenPings) {
if (!clientMaximumTimeBetweenPings.containsKey(proxyID)) {
clientMaximumTimeBetweenPings.putIfAbsent(proxyID, maximumTimeBetweenPings);
}
if (!clientHeartbeats.containsKey(proxyID)) {
if (null == clientHeartbeats.putIfAbsent(proxyID,
new AtomicLong(System.currentTimeMillis()))) {
// don't do this in computeIfAbsent because segment is locked while logging/stats
if (stats != null) {
stats.incClientRegisterRequests();
}
if (logger.isDebugEnabled()) {
logger.debug("ClientHealthMonitor: Registering client with member id {}", proxyID);
}
}
}
}
/**
* Takes care of unregistering from the _clientHeartBeats map.
*
* @param proxyID The id of the client to be unregistered
*/
private void unregisterClient(ClientProxyMembershipID proxyID, boolean clientDisconnectedCleanly,
Throwable clientDisconnectException) {
if (clientHeartbeats.remove(proxyID) != null) {
if (clientDisconnectedCleanly) {
if (logger.isDebugEnabled()) {
logger.debug("ClientHealthMonitor: Unregistering client with member id {}", proxyID);
}
} else {
logger.warn("ClientHealthMonitor: Unregistering client with member id {} due to: {}",
proxyID, clientDisconnectException == null ? "Unknown reason"
: clientDisconnectException.getLocalizedMessage());
}
if (stats != null) {
stats.incClientUnRegisterRequests();
}
expireTXStates(proxyID);
}
clientMaximumTimeBetweenPings.remove(proxyID);
}
/**
* Unregisters a client to be monitored.
*
* @param proxyID The id of the client to be unregistered
* @param acceptor non-null if the call is from a <code>ServerConnection</code> (as opposed to a
* <code>CacheClientProxy</code>).
* @param clientDisconnectedCleanly Whether the client disconnected cleanly or crashed
*/
void unregisterClient(ClientProxyMembershipID proxyID, Acceptor acceptor,
boolean clientDisconnectedCleanly, Throwable clientDisconnectException) {
unregisterClient(proxyID, clientDisconnectedCleanly, clientDisconnectException);
// Unregister any CacheClientProxy instances associated with this member id
// if this method was invoked from a ServerConnection and the client did
// not disconnect cleanly.
if (acceptor != null) {
CacheClientNotifier ccn = acceptor.getCacheClientNotifier();
if (ccn != null) {
try {
ccn.unregisterClient(proxyID, clientDisconnectedCleanly);
} catch (CancelException ignore) {
}
}
}
}
/**
* provide a test hook to track client transactions to be removed
*/
@SuppressWarnings("unused") // do not delete
public Set<TXId> getScheduledToBeRemovedTx() {
final TXManagerImpl txMgr = (TXManagerImpl) cache.getCacheTransactionManager();
return txMgr.getScheduledToBeRemovedTx();
}
/**
* expire the transaction states for the given client. This uses the transactionTimeToLive setting
* that is inherited from the TXManagerImpl. If that setting is non-positive we expire the states
* immediately
*
*/
private void expireTXStates(ClientProxyMembershipID proxyID) {
if (cache.isClosed()) {
return;
}
final TXManagerImpl txMgr = (TXManagerImpl) cache.getCacheTransactionManager();
if (null == txMgr) {
return;
}
final Set<TXId> txIds =
txMgr.getTransactionsForClient((InternalDistributedMember) proxyID.getDistributedMember());
if (!txIds.isEmpty()) {
txMgr.expireDisconnectedClientTransactions(txIds, true);
}
}
public void removeAllConnectionsAndUnregisterClient(ClientProxyMembershipID proxyID,
Throwable t) {
// Remove all connections
cleanupClientThreads(proxyID, false);
unregisterClient(proxyID, false, t);
}
/**
* Adds a <code>ServerConnection</code> to the client's processing threads
*
* @param proxyID The membership id of the client to be updated
* @param connection The thread processing client requests
*/
public ServerConnectionCollection addConnection(ClientProxyMembershipID proxyID,
ServerConnection connection) {
synchronized (proxyIdConnections) {
ServerConnectionCollection collection = getProxyIdCollection(proxyID);
collection.addConnection(connection);
return collection;
}
}
/**
* Removes a <code>ServerConnection</code> from the client's processing threads
*
* @param proxyID The id of the client to be updated
* @param connection The thread processing client requests
*/
void removeConnection(ClientProxyMembershipID proxyID, ServerConnection connection) {
synchronized (proxyIdConnections) {
ServerConnectionCollection collection = proxyIdConnections.get(proxyID);
if (collection != null) {
collection.removeConnection(connection);
if (collection.getConnections().isEmpty()) {
proxyIdConnections.remove(proxyID);
}
}
}
}
/**
* Processes a received ping for a client.
*
* @param proxyID The id of the client from which the ping was received
*/
public void receivedPing(ClientProxyMembershipID proxyID) {
if (clientMonitor == null) {
return;
}
if (logger.isTraceEnabled()) {
logger.trace("ClientHealthMonitor: Received ping from client with member id {}", proxyID);
}
AtomicLong heartbeat = clientHeartbeats.get(proxyID);
if (null == heartbeat) {
registerClient(proxyID, getMaximumTimeBetweenPings(proxyID));
} else {
heartbeat.set(System.currentTimeMillis());
}
}
/**
* Returns modifiable map (changes do not effect this class) of client membershipID to connection
* count. This is different from the map contained in this class as here the key is client
* membershipID & not the the proxyID. It is to be noted that a given client can have multiple
* proxies.
*
* @param filterProxies Set identifying the Connection proxies which should be fetched. These
* ConnectionProxies may be from same client member or different. If it is null this would
* mean to fetch the Connections of all the ConnectionProxy objects.
*/
public Map<String, Object[]> getConnectedClients(Set filterProxies) {
Map<String, Object[]> map = new HashMap<>(); // KEY=proxyID, VALUE=connectionCount (Integer)
synchronized (proxyIdConnections) {
for (Map.Entry<ClientProxyMembershipID, ServerConnectionCollection> entry : proxyIdConnections
.entrySet()) {
// proxyID includes FQDN
ClientProxyMembershipID proxyID = entry.getKey();
if (filterProxies == null || filterProxies.contains(proxyID)) {
String membershipID = null;
Set<ServerConnection> connections = entry.getValue().getConnections();
int socketPort = 0;
InetAddress socketAddress = null;
// Get data from one.
for (ServerConnection sc : connections) {
socketPort = sc.getSocketPort();
socketAddress = sc.getSocketAddress();
membershipID = sc.getMembershipID();
break;
}
int connectionCount = connections.size();
String clientString;
if (socketAddress == null) {
clientString = "client member id=" + membershipID;
} else {
clientString = "host name=" + socketAddress.toString() + " host ip="
+ socketAddress.getHostAddress() + " client port=" + socketPort
+ " client member id=" + membershipID;
}
Object[] data = map.get(membershipID);
if (data == null) {
map.put(membershipID, new Object[] {clientString, connectionCount});
} else {
data[1] = (Integer) data[1] + connectionCount;
}
}
}
}
return map;
}
/**
* This method returns the CacheClientStatus for all the clients that are connected to this
* server. This method returns all clients irrespective of whether subscription is enabled or not.
*
* @return Map of ClientProxyMembershipID against CacheClientStatus objects.
*/
public Map<ClientProxyMembershipID, CacheClientStatus> getStatusForAllClients() {
Map<ClientProxyMembershipID, CacheClientStatus> result = new HashMap<>();
synchronized (proxyIdConnections) {
for (Map.Entry<ClientProxyMembershipID, ServerConnectionCollection> entry : proxyIdConnections
.entrySet()) {
ClientProxyMembershipID proxyID = entry.getKey();
CacheClientStatus cci = new CacheClientStatus(proxyID);
Set<ServerConnection> connections = entry.getValue().getConnections();
if (connections != null) {
String memberId;
for (ServerConnection sc : connections) {
if (sc.isClientServerConnection()) {
// each ServerConnection has the same member id
memberId = sc.getMembershipID();
cci.setMemberId(memberId);
cci.setNumberOfConnections(connections.size());
result.put(proxyID, cci);
break;
}
}
}
}
}
return result;
}
public void fillInClientInfo(Map<ClientProxyMembershipID, CacheClientStatus> allClients) {
// The allClients parameter includes only actual clients (not remote
// gateways). This monitor will include remote gateway connections,
// so weed those out.
synchronized (proxyIdConnections) {
for (Map.Entry<ClientProxyMembershipID, CacheClientStatus> entry : allClients.entrySet()) {
// proxyID includes FQDN
ClientProxyMembershipID proxyID = entry.getKey();
CacheClientStatus cci = entry.getValue();
ServerConnectionCollection collection = proxyIdConnections.get(proxyID);
Set<ServerConnection> connections = collection != null ? collection.getConnections() : null;
if (connections != null) {
String memberId = null;
cci.setNumberOfConnections(connections.size());
List<Integer> socketPorts = new ArrayList<>();
List<InetAddress> socketAddresses = new ArrayList<>();
for (ServerConnection sc : connections) {
socketPorts.add(sc.getSocketPort());
socketAddresses.add(sc.getSocketAddress());
// each ServerConnection has the same member id
memberId = sc.getMembershipID();
}
cci.setMemberId(memberId);
cci.setSocketPorts(socketPorts);
cci.setSocketAddresses(socketAddresses);
}
}
}
}
public Map<String, IncomingGatewayStatus> getConnectedIncomingGateways() {
Map<String, IncomingGatewayStatus> connectedIncomingGateways = new HashMap<>();
synchronized (proxyIdConnections) {
for (Map.Entry<ClientProxyMembershipID, ServerConnectionCollection> entry : proxyIdConnections
.entrySet()) {
ClientProxyMembershipID proxyID = entry.getKey();
Set<ServerConnection> connections = entry.getValue().getConnections();
for (ServerConnection sc : connections) {
if (sc.getCommunicationMode().isWAN()) {
IncomingGatewayStatus status = new IncomingGatewayStatus(proxyID.getDSMembership(),
sc.getSocketAddress(), sc.getSocketPort());
connectedIncomingGateways.put(proxyID.getDSMembership(), status);
}
}
}
}
return connectedIncomingGateways;
}
private boolean cleanupClientThreads(ClientProxyMembershipID proxyID, boolean timedOut) {
boolean result = false;
Set<ServerConnection> serverConnections = null;
synchronized (proxyIdConnections) {
ServerConnectionCollection collection = proxyIdConnections.remove(proxyID);
if (collection != null) {
serverConnections = collection.getConnections();
}
}
{
if (serverConnections != null) {
result = true;
for (ServerConnection serverConnection : serverConnections) {
serverConnection.handleTermination(timedOut);
}
}
}
return result;
}
// This will return true if the proxyID is truly idle (or if no connections are found), or false
// if there was a active connection.
private boolean prepareToTerminateIfNoConnectionIsProcessing(ClientProxyMembershipID proxyID) {
synchronized (proxyIdConnections) {
ServerConnectionCollection collection = proxyIdConnections.get(proxyID);
if (collection == null) {
return true;
}
if (collection.connectionsProcessing.get() == 0) {
collection.isTerminating = true;
return true;
} else {
return false;
}
}
}
private void validateThreads(ClientProxyMembershipID proxyID) {
Set<ServerConnection> serverConnections;
synchronized (proxyIdConnections) {
ServerConnectionCollection collection = proxyIdConnections.get(proxyID);
serverConnections =
collection != null ? new HashSet<>(collection.getConnections()) : Collections.emptySet();
}
// release sync and operation on copy
for (ServerConnection serverConnection : serverConnections) {
if (serverConnection.hasBeenTimedOutOnClient()) {
logger.warn("{} is being terminated because its client timeout of {} has expired.",
serverConnection, serverConnection.getClientReadTimeout());
try {
serverConnection.handleTermination(true);
// Not all the code in a ServerConnection correctly
// handles interrupt. In particular it is possible to be doing
// p2p distribution and to have sent a message to one peer but
// to never send it to another due to interrupt.
// serverConnection.interruptOwner();
} finally {
// Just to be sure we clean it up.
// This call probably isn't needed.
removeConnection(proxyID, serverConnection);
}
}
}
}
/**
* Returns the map of known clients.
*
* @return the map of known clients
*
* Test hook only.
*/
@VisibleForTesting
Map<ClientProxyMembershipID, Long> getClientHeartbeats() {
return clientHeartbeats.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get()));
}
/**
* Shuts down the singleton <code>CacheClientNotifier</code> instance.
*/
protected synchronized void shutdown() {
// Stop the client monitor
if (clientMonitor != null) {
clientMonitor.stopMonitoring();
}
}
/**
* Creates the singleton <code>CacheClientNotifier</code> instance.
*
* @param cache The GemFire <code>Cache</code>
* @param maximumTimeBetweenPings The maximum time allowed between pings before determining the
* client has died and interrupting its sockets
*/
protected static synchronized void createInstance(InternalCache cache,
int maximumTimeBetweenPings, CacheClientNotifierStats stats) {
refCount++;
if (instance != null) {
return;
}
instance = new ClientHealthMonitor(cache, maximumTimeBetweenPings, stats);
}
/**
*
* Constructor.
*
* @param cache The GemFire <code>Cache</code>
* @param maximumTimeBetweenPings The maximum time allowed between pings before determining the
* client has died and interrupting its sockets
*/
private ClientHealthMonitor(InternalCache cache, int maximumTimeBetweenPings,
CacheClientNotifierStats stats) {
// Set the Cache
this.cache = cache;
this.maximumTimeBetweenPings = maximumTimeBetweenPings;
monitorInterval = Long.getLong(CLIENT_HEALTH_MONITOR_INTERVAL_PROPERTY,
DEFAULT_CLIENT_MONITOR_INTERVAL_IN_MILLIS);
logger.debug("Setting monitorInterval to {}", monitorInterval);
if (maximumTimeBetweenPings > 0) {
if (logger.isDebugEnabled()) {
logger.debug("{}: Initializing client health monitor thread", this);
}
clientMonitor = new ClientHealthMonitorThread(maximumTimeBetweenPings);
clientMonitor.start();
} else {
// LOG:CONFIG: changed from config to info
logger.info(
"Client health monitor thread disabled due to maximumTimeBetweenPings setting: {}",
maximumTimeBetweenPings);
clientMonitor = null;
}
this.stats = stats;
}
/**
* Returns a brief description of this <code>ClientHealthMonitor</code>
*
* @since GemFire 5.1
*/
@Override
public String toString() {
return "ClientHealthMonitor@" + Integer.toHexString(System.identityHashCode(this));
}
private ServerConnectionCollection getProxyIdCollection(ClientProxyMembershipID proxyID) {
return JavaWorkarounds.computeIfAbsent(proxyIdConnections, proxyID,
key -> new ServerConnectionCollection());
}
Map<ClientProxyMembershipID, MutableInt> getCleanupProxyIdTable() {
return cleanupProxyIdTable;
}
Map<ServerSideHandshake, MutableInt> getCleanupTable() {
return cleanupTable;
}
private int getNumberOfClientsAtOrAboveVersion(KnownVersion version) {
int number = 0;
for (int i = version.ordinal(); i < numOfClientsPerVersion.length(); i++) {
number += numOfClientsPerVersion.get(i);
}
return number;
}
public boolean hasDeltaClients() {
return getNumberOfClientsAtOrAboveVersion(KnownVersion.GFE_61) > 0;
}
private int getMaximumTimeBetweenPings(ClientProxyMembershipID proxyID) {
return clientMaximumTimeBetweenPings.getOrDefault(proxyID, maximumTimeBetweenPings);
}
/**
* Interface for changing the heartbeat timeout behavior in the ClientHealthMonitorThread, should
* only be used for testing
*/
interface HeartbeatTimeoutCheck {
boolean timedOut(long current, long lastHeartbeat, long interval);
}
@VisibleForTesting
void testUseCustomHeartbeatCheck(HeartbeatTimeoutCheck check) {
clientMonitor.overrideHeartbeatTimeoutCheck(check);
}
/**
* Class <code>ClientHealthMonitorThread</code> is a <code>Thread</code> that verifies all clients
* are still alive.
*/
class ClientHealthMonitorThread extends LoggingThread {
private HeartbeatTimeoutCheck checkHeartbeat = (long currentTime, long lastHeartbeat,
long allowedInterval) -> currentTime - lastHeartbeat > allowedInterval;
void overrideHeartbeatTimeoutCheck(HeartbeatTimeoutCheck newCheck) {
checkHeartbeat = newCheck;
}
/**
* The maximum time allowed between pings before determining the client has died and
* interrupting its sockets.
*/
final int _maximumTimeBetweenPings;
/**
* Whether the monitor is stopped
*/
volatile boolean _isStopped = false;
/**
* Constructor.
*
* @param maximumTimeBetweenPings The maximum time allowed between pings before determining the
* client has died and interrupting its sockets
*/
ClientHealthMonitorThread(int maximumTimeBetweenPings) {
super("ClientHealthMonitor Thread");
// Set the client connection timeout
_maximumTimeBetweenPings = maximumTimeBetweenPings;
// LOG:CONFIG: changed from config to info
logger.info("ClientHealthMonitorThread maximum allowed time between pings: {}",
_maximumTimeBetweenPings);
if (maximumTimeBetweenPings == 0) {
if (logger.isDebugEnabled()) {
logger.debug("zero ping interval detected", new Exception(
"stack trace"));
}
}
}
/**
* Notifies the monitor to stop monitoring.
*/
protected synchronized void stopMonitoring() {
if (logger.isDebugEnabled()) {
logger.debug("{}: Stopping monitoring", ClientHealthMonitor.this);
}
_isStopped = true;
interrupt();
if (logger.isDebugEnabled()) {
logger.debug("{}: Stopped dispatching", ClientHealthMonitor.this);
}
}
/**
* Returns whether the dispatcher is stopped
*
* @return whether the dispatcher is stopped
*/
protected boolean isStopped() {
return _isStopped;
}
/**
* Runs the monitor by iterating the map of clients and testing the latest ping time received
* against the current time.
*/
@Override
public void run() {
if (logger.isDebugEnabled()) {
logger.debug("{}: Beginning to monitor clients", ClientHealthMonitor.this);
}
while (!_isStopped) {
SystemFailure.checkFailure();
try {
Thread.sleep(monitorInterval);
if (logger.isTraceEnabled()) {
logger.trace("Monitoring {} client(s)", getClientHeartbeats().size());
}
// Get the current time
long currentTime = System.currentTimeMillis();
if (logger.isTraceEnabled()) {
logger.trace("{} starting sweep at {}", ClientHealthMonitor.this, currentTime);
}
// Iterate through the clients and verify that they are all still
// alive
for (Map.Entry<ClientProxyMembershipID, Long> entry : getClientHeartbeats().entrySet()) {
ClientProxyMembershipID proxyID = entry.getKey();
// Validate all ServerConnection threads. If a thread has been
// processing a message for more than the socket timeout time,
// close it it since the client will have timed out and resent.
validateThreads(proxyID);
Long latestHeartbeatValue = entry.getValue();
// Compare the current value with the current time if it is not null
// If it is null, that means that the client was just registered
// and has not done a heartbeat yet.
if (latestHeartbeatValue != null) {
long latestHeartbeat = latestHeartbeatValue;
if (logger.isTraceEnabled()) {
logger.trace(
"{} ms have elapsed since the latest heartbeat for client with member id {}",
(currentTime - latestHeartbeat), proxyID);
}
int maximumTimeBetweenPingsForClient = getMaximumTimeBetweenPings(proxyID);
if (checkHeartbeat.timedOut(currentTime, latestHeartbeat,
maximumTimeBetweenPingsForClient)) {
// This client has been idle for too long. Determine whether
// any of its ServerConnection threads are currently processing
// a message. If so, let it go. If not, disconnect it.
if (prepareToTerminateIfNoConnectionIsProcessing(proxyID)) {
if (cleanupClientThreads(proxyID, true)) {
logger.warn(
"Monitoring client with member id {}. It had been {} ms since the latest heartbeat. Max interval is {}. Terminated client.",
entry.getKey(), currentTime - latestHeartbeat,
maximumTimeBetweenPingsForClient);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(
"Monitoring client with member id {}. It has been {} ms since the latest heartbeat. This client would have been terminated but at least one of its threads is processing a message.",
entry.getKey(), (currentTime - latestHeartbeat));
}
}
} else {
if (logger.isTraceEnabled()) {
logger.trace(
"Monitoring client with member id {}. It has been {} ms since the latest heartbeat. This client is healthy.",
entry.getKey(), (currentTime - latestHeartbeat));
}
}
}
}
} catch (InterruptedException e) {
// no need to reset the bit; we're exiting
if (_isStopped) {
break;
}
logger.warn("Unexpected interrupt, exiting", e);
break;
} catch (Exception e) {
// An exception occurred while monitoring the clients. If the monitor
// is not stopped, log it and continue processing.
if (!_isStopped) {
logger.fatal(ClientHealthMonitor.this.toString() + ": An unexpected Exception occurred",
e);
}
}
} // while
}
} // ClientHealthMonitorThread
@VisibleForTesting
public static ClientHealthMonitorProvider singletonProvider() {
return ClientHealthMonitor::getInstance;
}
@VisibleForTesting
public static Supplier<ClientHealthMonitor> singletonGetter() {
return ClientHealthMonitor::getInstance;
}
@FunctionalInterface
@VisibleForTesting
public interface ClientHealthMonitorProvider {
ClientHealthMonitor get(InternalCache cache, int maximumTimeBetweenPings,
CacheClientNotifierStats stats);
}
}