blob: e0b5ab8b6c78adee6a3834431eb5d7881e070a17 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerArray;

import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.SystemTimer.SystemTimerTask;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.CacheClientStatus;
import org.apache.geode.internal.cache.IncomingGatewayStatus;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.TXId;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.tier.Acceptor;
import org.apache.geode.internal.concurrent.ConcurrentHashSet;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadGroup;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
/**
 * Class <code>ClientHealthMonitor</code> is a server-side singleton that monitors the health of
 * clients by looking at their heartbeats. If too much time elapses between heartbeats, the monitor
 * determines that the client is dead and terminates its <code>ServerConnection</code>s.
 *
 * @since GemFire 4.2.3
 */
public class ClientHealthMonitor {
  private static final Logger logger = LogService.getLogger();

  /**
   * The map of known clients: <code>ClientProxyMembershipID</code> to the <code>Long</code> time
   * (from <code>System.currentTimeMillis()</code>) of that client's latest heartbeat.
   *
   * <p>
   * Membership changes are copy-on-write: registering or unregistering a client builds a new map
   * while holding {@link #_clientHeartbeatsLock} and publishes it through this volatile field.
   * Heartbeat timestamps, however, are updated in place by {@link #receivedPing}. For that reason
   * every copy is a {@link ConcurrentHashMap}, so those in-place updates are safely visible to the
   * monitor thread, which iterates the map without holding the lock.
   */
  protected volatile Map _clientHeartbeats = Collections.EMPTY_MAP;

  /**
   * An object used to lock the map of known clients.
   */
  protected final Object _clientHeartbeatsLock = new Object();

  /**
   * The map of known client threads: <code>ClientProxyMembershipID</code> to the
   * <code>Set</code> of that client's <code>ServerConnection</code>s. Guarded by
   * {@link #_clientThreadsLock}.
   */
  protected final Map _clientThreads;

  /**
   * An object used to lock the map of client threads.
   */
  private final Object _clientThreadsLock = new Object();

  /**
   * The GemFire <code>Cache</code>.
   */
  private final InternalCache _cache;

  /**
   * A thread that validates client connections, or <code>null</code> if monitoring is disabled
   * because the maximum time between pings is not positive.
   */
  private final ClientHealthMonitorThread _clientMonitor;

  /**
   * The singleton <code>ClientHealthMonitor</code> instance.
   *
   * <p>
   * It is written under the class lock by {@link #createInstance} and {@link #shutdownInstance()}.
   * It is read without synchronization by {@link #getInstance()}, so it is volatile to ensure
   * readers see a fully published value.
   */
  static volatile ClientHealthMonitor _instance;

  /**
   * Reference count in the event that multiple bridge servers are using the health monitor.
   * Guarded by the class lock.
   */
  private static int refCount = 0;

  /**
   * The interval, in milliseconds, between client monitor iterations.
   */
  protected static final long CLIENT_MONITOR_INTERVAL = 1000;

  /**
   * Statistics updated when clients register and unregister; may be <code>null</code>.
   */
  private final CacheClientNotifierStats stats;

  /**
   * Used to track the number of handshakes in a VM; primary use is license enforcement.
   *
   * <p>
   * Note: these were moved from static fields in ServerConnection so that they will be cleaned up
   * when the client health monitor is shut down.
   */
  private final HashMap cleanupTable = new HashMap();

  private final HashMap cleanupProxyIdTable = new HashMap();

  /**
   * Gives, version-wise, the number of clients connected to the cache servers in this cache, which
   * are capable of processing received deltas.
   *
   * NOTE: It does not necessarily give the actual number of clients per version connected to the
   * cache servers in this cache.
   *
   * @see CacheClientNotifier#addClientProxy(CacheClientProxy)
   */
  AtomicIntegerArray numOfClientsPerVersion = new AtomicIntegerArray(Version.HIGHEST_VERSION + 1);

  /**
   * Factory method to construct or return the singleton <code>ClientHealthMonitor</code> instance.
   * Each call increments the reference count that {@link #shutdownInstance()} decrements.
   *
   * @param cache The GemFire <code>Cache</code>
   * @param maximumTimeBetweenPings The maximum time allowed between pings before determining the
   *        client has died and terminating its connections. A non-positive value disables the
   *        monitor thread.
   * @param stats statistics to update on client registration; may be <code>null</code>
   * @return The singleton <code>ClientHealthMonitor</code> instance
   */
  public static ClientHealthMonitor getInstance(InternalCache cache, int maximumTimeBetweenPings,
      CacheClientNotifierStats stats) {
    createInstance(cache, maximumTimeBetweenPings, stats);
    return _instance;
  }

  /**
   * Factory method to return the singleton <code>ClientHealthMonitor</code> instance.
   *
   * @return the singleton <code>ClientHealthMonitor</code> instance, or <code>null</code> if none
   *         has been created or it has been shut down
   */
  public static ClientHealthMonitor getInstance() {
    return _instance;
  }

  /**
   * Releases one reference to the singleton <code>ClientHealthMonitor</code> instance. When the
   * last reference is released, the monitor thread is stopped and joined, and the singleton is
   * cleared.
   */
  public static synchronized void shutdownInstance() {
    if (_instance == null) {
      // Nothing to release. Decrementing here would let an unmatched call drive refCount negative,
      // which would make a later shutdownInstance() stop a monitor that still has users.
      return;
    }
    refCount--;
    if (refCount > 0) {
      return;
    }
    _instance.shutdown();
    boolean interrupted = false; // Don't clear, let join fail if already interrupted
    try {
      if (_instance._clientMonitor != null) {
        _instance._clientMonitor.join();
      }
    } catch (InterruptedException e) {
      interrupted = true;
      if (logger.isDebugEnabled()) {
        logger.debug(":Interrupted joining with the ClientHealthMonitor Thread", e);
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
    _instance = null;
    refCount = 0;
  }

  /**
   * Creates the copy used when publishing a new heartbeat map.
   *
   * A {@link ConcurrentHashMap} is used so that the in-place timestamp updates made by
   * {@link #receivedPing} are safely visible to the monitor thread. Keys and values must therefore
   * be non-null.
   *
   * @param source the current heartbeat map
   * @return a new, mutable copy of <code>source</code>
   */
  private static Map newHeartbeatMap(Map source) {
    return new ConcurrentHashMap(source);
  }

  /**
   * Registers a new client to be monitored. Does nothing if the client is already registered.
   *
   * @param proxyID The id of the client to be registered; must not be <code>null</code>
   */
  public void registerClient(ClientProxyMembershipID proxyID) {
    boolean registerClient = false;
    synchronized (_clientHeartbeatsLock) {
      Map oldClientHeartbeats = this._clientHeartbeats;
      if (!oldClientHeartbeats.containsKey(proxyID)) {
        // Copy-on-write: publish a new map rather than mutating the one readers may be iterating.
        Map newClientHeartbeats = newHeartbeatMap(oldClientHeartbeats);
        newClientHeartbeats.put(proxyID, Long.valueOf(System.currentTimeMillis()));
        this._clientHeartbeats = newClientHeartbeats;
        registerClient = true;
      }
    }
    if (registerClient) {
      if (this.stats != null) {
        this.stats.incClientRegisterRequests();
      }
      if (logger.isDebugEnabled()) {
        logger.debug(LocalizedMessage.create(
            LocalizedStrings.ClientHealthMonitor_CLIENTHEALTHMONITOR_REGISTERING_CLIENT_WITH_MEMBER_ID_0,
            new Object[] {proxyID}));
      }
    }
  }

  /**
   * Removes the client from the heartbeat map. If it was present, this logs the removal, updates
   * the stats, and expires the client's transaction states.
   *
   * @param proxyID The id of the client to be unregistered
   * @param clientDisconnectedCleanly whether the client disconnected cleanly; if not, a warning is
   *        logged
   * @param clientDisconnectException the cause of an unclean disconnect; may be <code>null</code>
   */
  private void unregisterClient(ClientProxyMembershipID proxyID, boolean clientDisconnectedCleanly,
      Throwable clientDisconnectException) {
    boolean unregisterClient = false;
    synchronized (_clientHeartbeatsLock) {
      Map oldClientHeartbeats = this._clientHeartbeats;
      if (oldClientHeartbeats.containsKey(proxyID)) {
        unregisterClient = true;
        // Copy-on-write: publish a new map rather than mutating the one readers may be iterating.
        Map newClientHeartbeats = newHeartbeatMap(oldClientHeartbeats);
        newClientHeartbeats.remove(proxyID);
        this._clientHeartbeats = newClientHeartbeats;
      }
    }
    if (unregisterClient) {
      if (clientDisconnectedCleanly) {
        if (logger.isDebugEnabled()) {
          logger.debug(LocalizedMessage.create(
              LocalizedStrings.ClientHealthMonitor_CLIENTHEALTHMONITOR_UNREGISTERING_CLIENT_WITH_MEMBER_ID_0,
              new Object[] {proxyID}));
        }
      } else {
        logger.warn(LocalizedMessage.create(
            LocalizedStrings.ClientHealthMonitor_CLIENTHEALTHMONITOR_UNREGISTERING_CLIENT_WITH_MEMBER_ID_0_DUE_TO_1,
            new Object[] {proxyID, clientDisconnectException == null ? "Unknown reason"
                : clientDisconnectException.getLocalizedMessage()}));
      }
      if (this.stats != null) {
        this.stats.incClientUnRegisterRequests();
      }
      expireTXStates(proxyID);
    }
  }

  /**
   * Unregisters a client to be monitored.
   *
   * @param proxyID The id of the client to be unregistered
   * @param acceptor non-null if the call is from a <code>ServerConnection</code> (as opposed to a
   *        <code>CacheClientProxy</code>). When non-null, the client is also unregistered from the
   *        acceptor's <code>CacheClientNotifier</code>, if it has one.
   * @param clientDisconnectedCleanly Whether the client disconnected cleanly or crashed
   * @param clientDisconnectException the cause of an unclean disconnect; may be <code>null</code>
   */
  public void unregisterClient(ClientProxyMembershipID proxyID, AcceptorImpl acceptor,
      boolean clientDisconnectedCleanly, Throwable clientDisconnectException) {
    unregisterClient(proxyID, clientDisconnectedCleanly, clientDisconnectException);
    // Unregister any CacheClientProxy instances associated with this member id
    // if this method was invoked from a ServerConnection.
    if (acceptor != null) {
      CacheClientNotifier ccn = acceptor.getCacheClientNotifier();
      if (ccn != null) {
        try {
          ccn.unregisterClient(proxyID, clientDisconnectedCleanly);
        } catch (CancelException ignore) {
          // Best effort: cancellation is in progress (presumably the cache is closing), so there
          // is nothing further to clean up here.
        }
      }
    }
  }

  /**
   * Transactions scheduled for delayed removal by {@link #expireTXStates}. This is only tracked
   * (non-null) when the <code>trackScheduledToBeRemovedTx</code> system property is set.
   */
  private final Set<TXId> scheduledToBeRemovedTx =
      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "trackScheduledToBeRemovedTx")
          ? new ConcurrentHashSet<TXId>() : null;

  /**
   * Provides a test hook to track client transactions to be removed.
   *
   * @return the tracked transaction ids, or <code>null</code> if tracking is disabled
   */
  public Set<TXId> getScheduledToBeRemovedTx() {
    return scheduledToBeRemovedTx;
  }

  /**
   * Expires the transaction states for the given client.
   *
   * This uses the transactionTimeToLive setting (in seconds) from the TXManagerImpl. If that
   * setting is non-positive, the states are expired immediately; otherwise their removal is
   * scheduled on the cache's CCP timer after that delay. Nothing is done if the cache is closed.
   *
   * @param proxyID the client whose transactions should be expired
   */
  private void expireTXStates(ClientProxyMembershipID proxyID) {
    final TXManagerImpl txMgr = (TXManagerImpl) this._cache.getCacheTransactionManager();
    final Set<TXId> txids =
        txMgr.getTransactionsForClient((InternalDistributedMember) proxyID.getDistributedMember());
    if (this._cache.isClosed()) {
      return;
    }
    // Multiply as long: an int product overflows for TTLs above ~24.8 days and would turn
    // negative, causing immediate removal instead of a delayed one.
    long timeout = txMgr.getTransactionTimeToLive() * 1000L;
    if (!txids.isEmpty()) {
      if (logger.isDebugEnabled()) {
        logger.debug("expiring {} transaction contexts for {} timeout={}", txids.size(), proxyID,
            timeout / 1000);
      }
      if (timeout <= 0) {
        txMgr.removeTransactions(txids, true);
      } else {
        if (scheduledToBeRemovedTx != null) {
          scheduledToBeRemovedTx.addAll(txids);
        }
        SystemTimerTask task = new SystemTimerTask() {
          @Override
          public void run2() {
            try {
              txMgr.removeTransactions(txids, true);
            } finally {
              // These ids are no longer scheduled, whether or not removal succeeded.
              if (scheduledToBeRemovedTx != null) {
                scheduledToBeRemovedTx.removeAll(txids);
              }
            }
          }
        };
        this._cache.getCCPTimer().schedule(task, timeout);
      }
    }
  }

  /**
   * Terminates all of the client's <code>ServerConnection</code>s and then unregisters the
   * client, treating it as an unclean disconnect.
   *
   * @param proxyID the client to remove
   * @param t the cause, used in the warning logged on unregistration; may be <code>null</code>
   */
  public void removeAllConnectionsAndUnregisterClient(ClientProxyMembershipID proxyID,
      Throwable t) {
    // Remove all connections
    cleanupClientThreads(proxyID, false);
    unregisterClient(proxyID, false, t);
  }

  /**
   * Adds a <code>ServerConnection</code> to the client's processing threads.
   *
   * @param proxyID The membership id of the client to be updated
   * @param connection The thread processing client requests
   */
  public void addConnection(ClientProxyMembershipID proxyID, ServerConnection connection) {
    synchronized (_clientThreadsLock) {
      Set serverConnections = (Set) this._clientThreads.get(proxyID);
      if (serverConnections == null) {
        serverConnections = new HashSet();
        this._clientThreads.put(proxyID, serverConnections);
      }
      serverConnections.add(connection);
    }
  }

  /**
   * Removes a <code>ServerConnection</code> from the client's processing threads. The client's
   * entry is dropped once it has no remaining connections.
   *
   * @param proxyID The id of the client to be updated
   * @param connection The thread processing client requests
   */
  public void removeConnection(ClientProxyMembershipID proxyID, ServerConnection connection) {
    synchronized (_clientThreadsLock) {
      Set serverConnections = (Set) this._clientThreads.get(proxyID);
      if (serverConnections != null) { // fix for bug 35343
        serverConnections.remove(connection);
        if (serverConnections.isEmpty()) {
          this._clientThreads.remove(proxyID);
        }
      }
    }
  }

  /**
   * Processes a received ping for a client. An unknown client is registered; a known client has
   * its latest-heartbeat time refreshed. This does nothing when monitoring is disabled.
   *
   * @param proxyID The id of the client from which the ping was received
   */
  public void receivedPing(ClientProxyMembershipID proxyID) {
    if (this._clientMonitor == null) {
      return;
    }
    if (logger.isTraceEnabled()) {
      logger.trace("ClientHealthMonitor: Received ping from client with member id {}", proxyID);
    }
    synchronized (_clientHeartbeatsLock) {
      if (!this._clientHeartbeats.containsKey(proxyID)) {
        registerClient(proxyID);
      } else {
        // Non-structural, in-place update (avoids copying the map on every ping). The map is a
        // ConcurrentHashMap created by newHeartbeatMap, so the monitor thread sees this write.
        this._clientHeartbeats.put(proxyID, Long.valueOf(System.currentTimeMillis()));
      }
    }
  }

  /**
   * Returns a modifiable map (changes do not affect this class) of client membership id to
   * connection information.
   *
   * This is different from the map contained in this class, since here the key is the client
   * membership id and not the proxyID. A given client can have multiple proxies; their connection
   * counts are summed.
   *
   * @param filterProxies Set identifying the connection proxies which should be fetched. These
   *        connection proxies may be from the same client member or different ones. If it is
   *        <code>null</code>, the connections of all proxies are fetched.
   * @return map of membership id (<code>String</code>) to <code>Object[] {String
   *         clientDescription, Integer connectionCount}</code>
   */
  public Map getConnectedClients(Set filterProxies) {
    // KEY = client membership id, VALUE = Object[] {clientDescription, connectionCount (Integer)}
    Map map = new HashMap();
    synchronized (_clientThreadsLock) {
      Iterator connectedClients = this._clientThreads.entrySet().iterator();
      while (connectedClients.hasNext()) {
        Map.Entry entry = (Map.Entry) connectedClients.next();
        // proxyID includes FQDN
        ClientProxyMembershipID proxyID = (ClientProxyMembershipID) entry.getKey();
        if (filterProxies == null || filterProxies.contains(proxyID)) {
          String membershipID = null;
          Set connections = (Set) entry.getValue();
          int socketPort = 0;
          InetAddress socketAddress = null;
          // Describe the client using its first connection; a client's connections share the
          // same client address and member id.
          Iterator serverConnections = connections.iterator();
          if (serverConnections.hasNext()) {
            ServerConnection sc = (ServerConnection) serverConnections.next();
            socketPort = sc.getSocketPort();
            socketAddress = sc.getSocketAddress();
            membershipID = sc.getMembershipID();
          }
          int connectionCount = connections.size();
          String clientString;
          if (socketAddress == null) {
            clientString = "client member id=" + membershipID;
          } else {
            clientString = "host name=" + socketAddress.toString() + " host ip="
                + socketAddress.getHostAddress() + " client port=" + socketPort
                + " client member id=" + membershipID;
          }
          Object[] data = (Object[]) map.get(membershipID);
          if (data == null) {
            map.put(membershipID, new Object[] {clientString, Integer.valueOf(connectionCount)});
          } else {
            // Another proxy of the same member: accumulate its connection count.
            data[1] = Integer.valueOf(((Integer) data[1]).intValue() + connectionCount);
          }
        }
      }
    }
    return map;
  }

  /**
   * This method returns the CacheClientStatus for all the clients that are connected to this
   * server. This method returns all clients irrespective of whether subscription is enabled or not.
   * Only clients with at least one connection in a client/server communication mode are included.
   *
   * @return Map of ClientProxyMembershipID against CacheClientStatus objects.
   */
  public Map getStatusForAllClients() {
    Map result = new HashMap();
    synchronized (_clientThreadsLock) {
      Iterator connectedClients = this._clientThreads.entrySet().iterator();
      while (connectedClients.hasNext()) {
        Map.Entry entry = (Map.Entry) connectedClients.next();
        ClientProxyMembershipID proxyID = (ClientProxyMembershipID) entry.getKey();
        CacheClientStatus cci = new CacheClientStatus(proxyID);
        Set connections = (Set) entry.getValue();
        if (connections != null) {
          Iterator connectionsIterator = connections.iterator();
          while (connectionsIterator.hasNext()) {
            ServerConnection sc = (ServerConnection) connectionsIterator.next();
            byte communicationMode = sc.getCommunicationMode();
            /* Check for all modes that could be used for Client-Server communication */
            if (communicationMode == Acceptor.CLIENT_TO_SERVER
                || communicationMode == Acceptor.PRIMARY_SERVER_TO_CLIENT
                || communicationMode == Acceptor.SECONDARY_SERVER_TO_CLIENT
                || communicationMode == Acceptor.CLIENT_TO_SERVER_FOR_QUEUE) {
              // each ServerConnection has the same member id
              cci.setMemberId(sc.getMembershipID());
              cci.setNumberOfConnections(connections.size());
              result.put(proxyID, cci);
              break;
            }
          }
        }
      }
    }
    return result;
  }

  /**
   * Fills in connection details for each entry of the given map.
   *
   * For each <code>ClientProxyMembershipID</code> key that has known connections, this sets the
   * <code>CacheClientStatus</code> value's connection count, member id, socket ports and socket
   * addresses.
   *
   * @param allClients map of <code>ClientProxyMembershipID</code> to
   *        <code>CacheClientStatus</code>; its values are updated in place
   */
  public void fillInClientInfo(Map allClients) {
    // The allClients parameter includes only actual clients (not remote
    // gateways). This monitor will include remote gateway connections,
    // so weed those out.
    synchronized (_clientThreadsLock) {
      Iterator allClientsIterator = allClients.entrySet().iterator();
      while (allClientsIterator.hasNext()) {
        Map.Entry entry = (Map.Entry) allClientsIterator.next();
        // proxyID includes FQDN
        ClientProxyMembershipID proxyID = (ClientProxyMembershipID) entry.getKey();
        CacheClientStatus cci = (CacheClientStatus) entry.getValue();
        Set connections = (Set) this._clientThreads.get(proxyID);
        if (connections != null) {
          String memberId = null;
          cci.setNumberOfConnections(connections.size());
          List socketPorts = new ArrayList();
          List socketAddresses = new ArrayList();
          Iterator connectionsIterator = connections.iterator();
          while (connectionsIterator.hasNext()) {
            ServerConnection sc = (ServerConnection) connectionsIterator.next();
            socketPorts.add(Integer.valueOf(sc.getSocketPort()));
            socketAddresses.add(sc.getSocketAddress());
            memberId = sc.getMembershipID(); // each ServerConnection has the same member id
          }
          cci.setMemberId(memberId);
          cci.setSocketPorts(socketPorts);
          cci.setSocketAddresses(socketAddresses);
        }
      }
    }
  }

  /**
   * Returns an <code>IncomingGatewayStatus</code> for every connection in
   * <code>GATEWAY_TO_GATEWAY</code> communication mode, keyed by the sender's DS membership.
   *
   * @return map of DS membership to <code>IncomingGatewayStatus</code>
   */
  public Map getConnectedIncomingGateways() {
    Map connectedIncomingGateways = new HashMap();
    synchronized (_clientThreadsLock) {
      Iterator connectedClients = this._clientThreads.entrySet().iterator();
      while (connectedClients.hasNext()) {
        Map.Entry entry = (Map.Entry) connectedClients.next();
        ClientProxyMembershipID proxyID = (ClientProxyMembershipID) entry.getKey();
        Set connections = (Set) entry.getValue();
        Iterator connectionsIterator = connections.iterator();
        while (connectionsIterator.hasNext()) {
          ServerConnection sc = (ServerConnection) connectionsIterator.next();
          if (sc.getCommunicationMode() == Acceptor.GATEWAY_TO_GATEWAY) {
            IncomingGatewayStatus status = new IncomingGatewayStatus(proxyID.getDSMembership(),
                sc.getSocketAddress(), sc.getSocketPort());
            connectedIncomingGateways.put(proxyID.getDSMembership(), status);
          }
        }
      }
    }
    return connectedIncomingGateways;
  }

  /**
   * Removes the client's entry from the client-thread map and terminates each of its
   * <code>ServerConnection</code>s.
   *
   * @param proxyID the client whose connections should be terminated
   * @param timedOut passed through to <code>ServerConnection.handleTermination</code>
   * @return <code>true</code> if the client had any tracked connections
   */
  protected boolean cleanupClientThreads(ClientProxyMembershipID proxyID, boolean timedOut) {
    Set serverConnections;
    synchronized (this._clientThreadsLock) {
      // It is ok to use the set after releasing the sync
      // because it has been removed from the map while holding
      // the sync.
      serverConnections = (Set) this._clientThreads.remove(proxyID);
    } // end sync here to fix bug 37576 and 36740
    if (serverConnections == null) { // fix for bug 35343
      return false;
    }
    for (Iterator it = serverConnections.iterator(); it.hasNext();) {
      ServerConnection serverConnection = (ServerConnection) it.next();
      serverConnection.handleTermination(timedOut);
    }
    return true;
  }

  /**
   * Returns whether any of the client's <code>ServerConnection</code>s is currently processing a
   * message.
   *
   * @param proxyID the client to check
   * @return <code>true</code> if at least one connection reports it is processing a message
   */
  protected boolean isAnyThreadProcessingMessage(ClientProxyMembershipID proxyID) {
    synchronized (this._clientThreadsLock) {
      Set serverConnections = (Set) this._clientThreads.get(proxyID);
      if (serverConnections != null) {
        for (Iterator it = serverConnections.iterator(); it.hasNext();) {
          ServerConnection serverConnection = (ServerConnection) it.next();
          if (serverConnection.isProcessingMessage()) {
            return true;
          }
        }
      }
    }
    return false;
  }

  /**
   * Terminates and removes each of the client's <code>ServerConnection</code>s that reports it
   * has been timed out on the client.
   *
   * @param proxyID the client whose connections should be validated
   */
  protected void validateThreads(ClientProxyMembershipID proxyID) {
    Set serverConnections = null;
    synchronized (this._clientThreadsLock) {
      serverConnections = (Set) this._clientThreads.get(proxyID);
      if (serverConnections != null) {
        serverConnections = new HashSet(serverConnections);
      }
    }
    // release sync and operate on copy to fix bug 37675
    if (serverConnections != null) {
      for (Iterator it = serverConnections.iterator(); it.hasNext();) {
        ServerConnection serverConnection = (ServerConnection) it.next();
        if (serverConnection.hasBeenTimedOutOnClient()) {
          logger.warn(LocalizedMessage.create(
              LocalizedStrings.ClientHealtMonitor_0_IS_BEING_TERMINATED_BECAUSE_ITS_CLIENT_TIMEOUT_OF_1_HAS_EXPIRED,
              new Object[] {serverConnection,
                  Integer.valueOf(serverConnection.getClientReadTimeout())}));
          try {
            // Terminate rather than interrupt: not all the code in a ServerConnection correctly
            // handles interrupt. In particular it is possible to be doing p2p distribution and
            // to have sent a message to one peer but to never send it to another due to
            // interrupt.
            serverConnection.handleTermination(true);
          } finally {
            // Just to be sure we clean it up.
            // This call probably isn't needed.
            removeConnection(proxyID, serverConnection);
          }
        }
      }
    }
  }

  /**
   * Returns the map of known clients (client id to latest heartbeat time).
   *
   * The returned map is the monitor's current internal map, not a copy. It is replaced whenever a
   * client is registered or unregistered, so callers should treat it as read-only.
   *
   * @return the map of known clients
   */
  public Map getClientHeartbeats() {
    return this._clientHeartbeats;
  }

  /**
   * Shuts down this <code>ClientHealthMonitor</code> by stopping its monitor thread, if any.
   */
  protected synchronized void shutdown() {
    // Stop the client monitor
    if (this._clientMonitor != null) {
      this._clientMonitor.stopMonitoring();
    }
  }

  /**
   * Creates the singleton <code>ClientHealthMonitor</code> instance if it does not exist, and
   * increments the reference count in either case.
   *
   * @param cache The GemFire <code>Cache</code>
   * @param maximumTimeBetweenPings The maximum time allowed between pings before determining the
   *        client has died. A non-positive value disables the monitor thread.
   * @param stats statistics to update on client registration; may be <code>null</code>
   */
  protected static synchronized void createInstance(InternalCache cache,
      int maximumTimeBetweenPings, CacheClientNotifierStats stats) {
    refCount++;
    if (_instance != null) {
      return;
    }
    _instance = new ClientHealthMonitor(cache, maximumTimeBetweenPings, stats);
  }

  /**
   * Constructor. Starts the monitor thread when <code>maximumTimeBetweenPings</code> is positive.
   *
   * @param cache The GemFire <code>Cache</code>
   * @param maximumTimeBetweenPings The maximum time allowed between pings before determining the
   *        client has died
   * @param stats statistics to update on client registration; may be <code>null</code>
   */
  private ClientHealthMonitor(InternalCache cache, int maximumTimeBetweenPings,
      CacheClientNotifierStats stats) {
    // Set the Cache
    this._cache = cache;
    // Assign every field before starting the monitor thread: Thread.start() only guarantees
    // visibility of writes made before it is called.
    this.stats = stats;
    // Initialize the client threads map
    this._clientThreads = new HashMap();
    if (maximumTimeBetweenPings > 0) {
      if (logger.isDebugEnabled()) {
        logger.debug("{}: Initializing client health monitor thread", this);
      }
      this._clientMonitor = new ClientHealthMonitorThread(maximumTimeBetweenPings);
      this._clientMonitor.start();
    } else {
      // LOG:CONFIG: changed from config to info
      logger.info(LocalizedMessage.create(
          LocalizedStrings.ClientHealthMonitor_CLIENT_HEALTH_MONITOR_THREAD_DISABLED_DUE_TO_MAXIMUMTIMEBETWEENPINGS_SETTING__0,
          maximumTimeBetweenPings));
      this._clientMonitor = null;
    }
  }

  /**
   * Returns a brief description of this <code>ClientHealthMonitor</code>
   *
   * @since GemFire 5.1
   */
  @Override
  public String toString() {
    return "ClientHealthMonitor@" + Integer.toHexString(System.identityHashCode(this));
  }

  public Map getCleanupProxyIdTable() {
    return cleanupProxyIdTable;
  }

  public Map getCleanupTable() {
    return cleanupTable;
  }

  /**
   * @return the count recorded in {@link #numOfClientsPerVersion} for exactly the given version
   */
  public int getNumberOfClientsAtVersion(Version version) {
    return numOfClientsPerVersion.get(version.ordinal());
  }

  /**
   * @return the sum of the counts recorded in {@link #numOfClientsPerVersion} for the given
   *         version and every higher ordinal
   */
  public int getNumberOfClientsAtOrAboveVersion(Version version) {
    int number = 0;
    for (int i = version.ordinal(); i < numOfClientsPerVersion.length(); i++) {
      number += numOfClientsPerVersion.get(i);
    }
    return number;
  }

  /**
   * @return whether any clients at or above <code>Version.GFE_61</code> have been counted
   */
  public boolean hasDeltaClients() {
    return getNumberOfClientsAtOrAboveVersion(Version.GFE_61) > 0;
  }

  /**
   * Class <code>ClientHealthMonitorThread</code> is a <code>Thread</code> that verifies all clients
   * are still alive.
   */
  class ClientHealthMonitorThread extends Thread {

    /**
     * The maximum time, in milliseconds, allowed between pings before determining the client has
     * died and terminating its connections.
     */
    protected final int _maximumTimeBetweenPings;

    /**
     * Whether the monitor is stopped
     */
    protected volatile boolean _isStopped = false;

    /**
     * Constructor.
     *
     * @param maximumTimeBetweenPings The maximum time allowed between pings before determining the
     *        client has died and terminating its connections
     */
    protected ClientHealthMonitorThread(int maximumTimeBetweenPings) {
      super(LoggingThreadGroup.createThreadGroup("ClientHealthMonitor Thread Group", logger),
          "ClientHealthMonitor Thread");
      setDaemon(true);

      // Set the client connection timeout
      this._maximumTimeBetweenPings = maximumTimeBetweenPings;
      // LOG:CONFIG: changed from config to info
      logger.info(LocalizedMessage.create(
          LocalizedStrings.ClientHealthMonitor_CLIENTHEALTHMONITORTHREAD_MAXIMUM_ALLOWED_TIME_BETWEEN_PINGS_0,
          this._maximumTimeBetweenPings));
      if (maximumTimeBetweenPings == 0) {
        if (logger.isDebugEnabled()) {
          logger.debug("zero ping interval detected", new Exception(
              LocalizedStrings.ClientHealthMonitor_STACK_TRACE_0.toLocalizedString()));
        }
      }
    }

    /**
     * Notifies the monitor to stop monitoring by setting the stop flag and interrupting the thread.
     */
    protected synchronized void stopMonitoring() {
      if (logger.isDebugEnabled()) {
        logger.debug("{}: Stopping monitoring", ClientHealthMonitor.this);
      }
      this._isStopped = true;
      this.interrupt();
      if (logger.isDebugEnabled()) {
        logger.debug("{}: Stopped dispatching", ClientHealthMonitor.this);
      }
    }

    /**
     * Returns whether the dispatcher is stopped
     *
     * @return whether the dispatcher is stopped
     */
    protected boolean isStopped() {
      return this._isStopped;
    }

    /**
     * Runs the monitor by iterating the map of clients and testing the latest ping time received
     * against the current time, once every <code>CLIENT_MONITOR_INTERVAL</code> milliseconds, until
     * stopped or unexpectedly interrupted.
     */
    @Override
    public void run() {
      if (logger.isDebugEnabled()) {
        logger.debug("{}: Beginning to monitor clients", ClientHealthMonitor.this);
      }

      while (!this._isStopped) {
        SystemFailure.checkFailure();
        try {
          Thread.sleep(CLIENT_MONITOR_INTERVAL);
          if (logger.isTraceEnabled()) {
            logger.trace("Monitoring {} client(s)", getClientHeartbeats().size());
          }

          // Get the current time
          long currentTime = System.currentTimeMillis();
          if (logger.isTraceEnabled()) {
            logger.trace("{} starting sweep at {}", ClientHealthMonitor.this, currentTime);
          }

          // Iterate through the clients and verify that they are all still
          // alive
          for (Iterator i = getClientHeartbeats().entrySet().iterator(); i.hasNext();) {
            Map.Entry entry = (Map.Entry) i.next();
            ClientProxyMembershipID proxyID = (ClientProxyMembershipID) entry.getKey();
            // Validate all ServerConnection threads. If a thread has been
            // processing a message for more than the socket timeout time,
            // close it since the client will have timed out and resent.
            validateThreads(proxyID);

            Long latestHeartbeatValue = (Long) entry.getValue();
            // Compare the current value with the current time if it is not null
            // If it is null, that means that the client was just registered
            // and has not done a heartbeat yet.
            if (latestHeartbeatValue != null) {
              long latestHeartbeat = latestHeartbeatValue.longValue();
              if (logger.isTraceEnabled()) {
                logger.trace(
                    "{} ms have elapsed since the latest heartbeat for client with member id {}",
                    (currentTime - latestHeartbeat), proxyID);
              }

              if ((currentTime - latestHeartbeat) > this._maximumTimeBetweenPings) {
                // This client has been idle for too long. Determine whether
                // any of its ServerConnection threads are currently processing
                // a message. If so, let it go. If not, disconnect it.
                if (isAnyThreadProcessingMessage(proxyID)) {
                  if (logger.isDebugEnabled()) {
                    logger.debug(
                        "Monitoring client with member id {}. It has been {} ms since the latest heartbeat. This client would have been terminated but at least one of its threads is processing a message.",
                        entry.getKey(), (currentTime - latestHeartbeat));
                  }
                } else {
                  if (cleanupClientThreads(proxyID, true)) {
                    logger.warn(LocalizedMessage.create(
                        LocalizedStrings.ClientHealthMonitor_MONITORING_CLIENT_WITH_MEMBER_ID_0_IT_HAD_BEEN_1_MS_SINCE_THE_LATEST_HEARTBEAT_MAX_INTERVAL_IS_2_TERMINATED_CLIENT,
                        new Object[] {entry.getKey(), currentTime - latestHeartbeat,
                            this._maximumTimeBetweenPings}));
                  }
                }
              } else {
                if (logger.isTraceEnabled()) {
                  logger.trace(
                      "Monitoring client with member id {}. It has been {} ms since the latest heartbeat. This client is healthy.",
                      entry.getKey(), (currentTime - latestHeartbeat));
                }
              }
            }
          }
        } catch (InterruptedException e) {
          // no need to reset the bit; we're exiting
          if (this._isStopped) {
            break;
          }
          logger.warn(LocalizedMessage
              .create(LocalizedStrings.ClientHealthMonitor_UNEXPECTED_INTERRUPT_EXITING), e);
          break;
        } catch (Exception e) {
          // An exception occurred while monitoring the clients. If the monitor
          // is not stopped, log it and continue processing.
          if (!this._isStopped) {
            logger.fatal(LocalizedMessage.create(
                LocalizedStrings.ClientHealthMonitor_0_AN_UNEXPECTED_EXCEPTION_OCCURRED,
                ClientHealthMonitor.this), e);
          }
        }
      } // while
    }
  } // ClientHealthMonitorThread
}