blob: e64d89d7b4bd246d16a845d7a7564add82d0784a [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
package org.apache.geode.distributed.internal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.ForcedDisconnectException;
import org.apache.geode.IncompatibleSystemException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.InvalidDeltaException;
import org.apache.geode.SystemConnectException;
import org.apache.geode.SystemFailure;
import org.apache.geode.ToDataException;
import org.apache.geode.admin.GemFireHealthConfig;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.Role;
import org.apache.geode.distributed.internal.locks.ElderState;
import org.apache.geode.distributed.internal.membership.DistributedMembershipListener;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.MemberFactory;
import org.apache.geode.distributed.internal.membership.MembershipManager;
import org.apache.geode.distributed.internal.membership.NetView;
import org.apache.geode.distributed.internal.membership.adapter.auth.GMSAuthenticator;
import org.apache.geode.distributed.internal.membership.gms.messages.ViewAckMessage;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.NanoTimer;
import org.apache.geode.internal.OSProcess;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.admin.remote.AdminConsoleDisconnectMessage;
import org.apache.geode.internal.admin.remote.RemoteGfManagerAgent;
import org.apache.geode.internal.admin.remote.RemoteTransportConfig;
import org.apache.geode.internal.alerting.AlertingService;
import org.apache.geode.internal.cache.InitialImageOperation;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingExecutors;
import org.apache.geode.internal.logging.LoggingThread;
import org.apache.geode.internal.logging.LoggingUncaughtExceptionHandler;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.geode.internal.monitoring.ThreadsMonitoringImpl;
import org.apache.geode.internal.monitoring.ThreadsMonitoringImplDummy;
import org.apache.geode.internal.sequencelog.MembershipLogger;
import org.apache.geode.internal.tcp.Connection;
import org.apache.geode.internal.tcp.ConnectionTable;
import org.apache.geode.internal.tcp.ReenteredConnectException;
* The <code>DistributionManager</code> uses a {@link MembershipManager} to distribute
* {@link DistributionMessage messages}. It also reports on who is currently in the distributed
* system and tracks the elder member for the distributed lock service. You may also register a
* membership listener with the DistributionManager to receive notification of changes in
* membership.
* <P>
* Code that wishes to send a {@link DistributionMessage} must get the
* <code>DistributionManager</code> and invoke {@link #putOutgoing}.
* <P>
* @see DistributionMessage#process
* @see IgnoredByManager
public class ClusterDistributionManager implements DistributionManager {
private static final Logger logger = LogService.getLogger();
private static final boolean DEBUG_NO_ACKNOWLEDGEMENTS =
* maximum time, in milliseconds, to wait for all threads to exit
private static final int MAX_STOP_TIME = 20000;
* Time to sleep, in milliseconds, while polling to see if threads have finished
private static final int STOP_PAUSE_TIME = 1000;
* Maximum number of interrupt attempts to stop a thread
private static final int MAX_STOP_ATTEMPTS = 10;
private static final boolean SYNC_EVENTS = Boolean.getBoolean("DistributionManager.syncEvents");
* Flag indicating whether to use single Serial-Executor thread or Multiple Serial-executor
* thread,
private static final boolean MULTI_SERIAL_EXECUTORS =
private static final int MAX_WAITING_THREADS =
Integer.getInteger("DistributionManager.MAX_WAITING_THREADS", Integer.MAX_VALUE);
private static final int MAX_PR_META_DATA_CLEANUP_THREADS =
Integer.getInteger("DistributionManager.MAX_PR_META_DATA_CLEANUP_THREADS", 1);
public static final int MAX_THREADS =
Integer.getInteger("DistributionManager.MAX_THREADS", 100);
private static final int MAX_PR_THREADS = Integer.getInteger("DistributionManager.MAX_PR_THREADS",
Math.max(Runtime.getRuntime().availableProcessors() * 4, 16));
public static final int MAX_FE_THREADS = Integer.getInteger("DistributionManager.MAX_FE_THREADS",
Math.max(Runtime.getRuntime().availableProcessors() * 4, 16));
private static final int INCOMING_QUEUE_LIMIT =
Integer.getInteger("DistributionManager.INCOMING_QUEUE_LIMIT", 80000);
/** Throttling based on the Queue byte size */
private static final double THROTTLE_PERCENT = (double) (Integer
.getInteger("DistributionManager.SERIAL_QUEUE_THROTTLE_PERCENT", 75)) / 100;
private static final int SERIAL_QUEUE_BYTE_LIMIT = Integer
.getInteger("DistributionManager.SERIAL_QUEUE_BYTE_LIMIT", (40 * (1024 * 1024)));
private static final int SERIAL_QUEUE_THROTTLE =
private static final int TOTAL_SERIAL_QUEUE_BYTE_LIMIT =
Integer.getInteger("DistributionManager.TOTAL_SERIAL_QUEUE_BYTE_LIMIT", (80 * (1024 * 1024)));
private static final int TOTAL_SERIAL_QUEUE_THROTTLE =
/** Throttling based on the Queue item size */
private static final int SERIAL_QUEUE_SIZE_LIMIT =
Integer.getInteger("DistributionManager.SERIAL_QUEUE_SIZE_LIMIT", 20000);
private static final int SERIAL_QUEUE_SIZE_THROTTLE =
/** Max number of serial Queue executors, in case of multi-serial-queue executor */
private static final int MAX_SERIAL_QUEUE_THREAD =
Integer.getInteger("DistributionManager.MAX_SERIAL_QUEUE_THREAD", 20);
"Function Execution Processor";
/** The DM type for regular distribution managers */
public static final int NORMAL_DM_TYPE = 10;
/** The DM type for locator distribution managers */
public static final int LOCATOR_DM_TYPE = 11;
/** The DM type for Console (admin-only) distribution managers */
public static final int ADMIN_ONLY_DM_TYPE = 12;
/** The DM type for stand-alone members */
public static final int LONER_DM_TYPE = 13;
* @see org.apache.geode.distributed.internal.PooledDistributionMessage
public static final int STANDARD_EXECUTOR = 73;
* @see org.apache.geode.distributed.internal.SerialDistributionMessage
public static final int SERIAL_EXECUTOR = 74;
* @see org.apache.geode.distributed.internal.HighPriorityDistributionMessage
public static final int HIGH_PRIORITY_EXECUTOR = 75;
// 76 not in use
* @see org.apache.geode.internal.cache.InitialImageOperation
public static final int WAITING_POOL_EXECUTOR = 77;
* @see org.apache.geode.internal.cache.InitialImageOperation
public static final int PARTITIONED_REGION_EXECUTOR = 78;
* Executor for view related messages
* @see ViewAckMessage
public static final int VIEW_EXECUTOR = 79;
public static final int REGION_FUNCTION_EXECUTION_EXECUTOR = 80;
/** Is this node running an AdminDistributedSystem? */
private static volatile boolean isDedicatedAdminVM = false;
private static final ThreadLocal<Boolean> isStartupThread = new ThreadLocal<>();
* Identifier for function execution threads and any of their children
private static final InheritableThreadLocal<Boolean> isFunctionExecutionThread =
new InheritableThreadLocal<Boolean>() {
protected Boolean initialValue() {
return Boolean.FALSE;
///////////////////// Instance Fields //////////////////////
private final Stopper stopper = new Stopper(this);
/** The id of this distribution manager */
private final InternalDistributedMember localAddress;
* The distribution manager type of this dm; set in its constructor.
private final int dmType;
* The <code>MembershipListener</code>s that are registered on this manager.
private final ConcurrentMap<MembershipListener, Boolean> membershipListeners;
private final ClusterElderManager clusterElderManager = new ClusterElderManager(this);
* The <code>MembershipListener</code>s that are registered on this manager for ALL members.
* @since GemFire 5.7
private volatile Set<MembershipListener> allMembershipListeners = Collections.emptySet();
* A lock to hold while adding and removing all membership listeners.
* @since GemFire 5.7
private final Object allMembershipListenersLock = new Object();
/** A queue of MemberEvent instances */
private final BlockingQueue<MemberEvent> membershipEventQueue = new LinkedBlockingQueue<>();
/** Used to invoke registered membership listeners in the background. */
private Thread memberEventThread;
/** A brief description of this DistributionManager */
protected final String description;
/** Statistics about distribution */
protected final DistributionStats stats;
/** Did an exception occur in one of the DM threads? */
private boolean exceptionInThreads;
private volatile boolean shutdownMsgSent = false;
/** Set to true when this manager is being shutdown */
private volatile boolean closeInProgress = false;
private volatile boolean receivedStartupResponse = false;
private volatile String rejectionMessage = null;
private MembershipManager membershipManager;
* The (non-admin-only) members of the distributed system. This is a map of memberid->memberid for
* fast access to canonical ID references. All accesses to this field must be synchronized on
* {@link #membersLock}.
private Map<InternalDistributedMember, InternalDistributedMember> members =
* All (admin and non-admin) members of the distributed system. All accesses to this field must be
* synchronized on {@link #membersLock}.
private Set<InternalDistributedMember> membersAndAdmin = Collections.emptySet();
* Map of all locator members of the distributed system. The value is a collection of locator
* strings that are hosted in that member. All accesses to this field must be synchronized on
* {@link #membersLock}.
private Map<InternalDistributedMember, Collection<String>> hostedLocatorsAll =
* Map of all locator members of the distributed system which have the shared configuration. The
* value is a collection of locator strings that are hosted in that member. All accesses to this
* field must be synchronized on {@link #membersLock}.
private Map<InternalDistributedMember, Collection<String>> hostedLocatorsWithSharedConfiguration =
* The lock held while accessing the field references to the following:<br>
* 1) {@link #members}<br>
* 2) {@link #membersAndAdmin}<br>
* 3) {@link #hostedLocatorsAll}<br>
* 4) {@link #hostedLocatorsWithSharedConfiguration}<br>
private final Object membersLock = new Object();
* The lock held while writing {@link #adminConsoles}.
private final Object adminConsolesLock = new Object();
* The ids of all known admin consoles Uses Copy on Write. Writers must sync on adminConsolesLock.
* Readers don't need to sync.
private volatile Set<InternalDistributedMember> adminConsoles = Collections.emptySet();
/** Message processing thread pool */
private ExecutorService threadPool;
* High Priority processing thread pool, used for initializing messages such as UpdateAttributes
* and CreateRegion messages
private ExecutorService highPriorityPool;
* Waiting Pool, used for messages that may have to wait on something. Use this separate pool with
* an unbounded queue so that waiting runnables don't get in the way of other processing threads.
* Used for threads that will most likely have to wait for a region to be finished initializing
* before it can proceed
private ExecutorService waitingPool;
private ExecutorService prMetaDataCleanupThreadPool;
* Thread used to decouple {@link org.apache.geode.internal.cache.partitioned.PartitionMessage}s
* from {@link org.apache.geode.internal.cache.DistributedCacheOperation}s </b>
private ExecutorService partitionedRegionThread;
private ExecutorService partitionedRegionPool;
/** Function Execution executors */
private ExecutorService functionExecutionThread;
private ExecutorService functionExecutionPool;
/** Message processing executor for serial, ordered, messages. */
private ExecutorService serialThread;
* Message processing executor for view messages
* @see ViewAckMessage
private ExecutorService viewThread;
* If using a throttling queue for the serialThread, we cache the queue here so we can see if
* delivery would block
private ThrottlingMemLinkedQueueWithDMStats<Runnable> serialQueue;
* Thread Monitor mechanism to monitor system threads
* @see org.apache.geode.internal.monitoring.ThreadsMonitoring
private final ThreadsMonitoring threadMonitor;
/** a map keyed on InternalDistributedMember, to direct channels to other systems */
// protected final Map channelMap = CFactory.createCM();
private volatile boolean readyForMessages = false;
* Set to true once this DM is ready to send messages. Note that it is always ready to send the
* startup message.
private volatile boolean readyToSendMsgs = false;
private final Object readyToSendMsgsLock = new Object();
/** Is this distribution manager closed? */
private volatile boolean closed = false;
* The distributed system to which this distribution manager is connected.
private InternalDistributedSystem system;
/** The remote transport configuration for this dm */
private RemoteTransportConfig transport;
* The administration agent associated with this distribution manager.
private volatile RemoteGfManagerAgent agent;
private SerialQueuedExecutorPool serialQueuedExecutorPool;
* TODO why does the distribution manager arbitrate GII operations? That should be a Cache
* function
private final Semaphore parallelGIIs = new Semaphore(InitialImageOperation.MAX_PARALLEL_GIIS);
* Map of InetAddress to HashSets of InetAddress, to define equivalences between network interface
* cards and hosts.
private final HashMap<InetAddress, Set<InetAddress>> equivalentHosts = new HashMap<>();
private int distributedSystemId;
private final Map<InternalDistributedMember, String> redundancyZones =
Collections.synchronizedMap(new HashMap<>());
private boolean enforceUniqueZone = false;
* root cause of forcibly shutting down the distribution manager
private volatile Throwable rootCause = null;
* @see #closeInProgress
private final Object shutdownMutex = new Object();
private final AlertingService alertingService;
////////////////////// Static Methods //////////////////////
* Is the current thread used for executing Functions?
public static Boolean isFunctionExecutionThread() {
return isFunctionExecutionThread.get();
* Creates a new distribution manager and discovers the other members of the distributed system.
* Note that it does not check to see whether or not this VM already has a distribution manager.
* @param system The distributed system to which this distribution manager will send messages.
static ClusterDistributionManager create(InternalDistributedSystem system) {
ClusterDistributionManager distributionManager = null;
boolean beforeJoined = true;
try {
int vmKind;
if (Boolean.getBoolean(InternalLocator.FORCE_LOCATOR_DM_TYPE)) {
// if this DM is starting for a locator, set it to be a locator DM
} else if (isDedicatedAdminVM()) {
} else {
RemoteTransportConfig transport = new RemoteTransportConfig(system.getConfig(), vmKind);
long start = System.currentTimeMillis();
distributionManager =
new ClusterDistributionManager(system, transport, system.getAlertingService());
beforeJoined = false; // we have now joined the system
InternalDistributedMember id = distributionManager.getDistributionManagerId();
if (!"".equals(id.getName())) {
for (InternalDistributedMember m : distributionManager
.getViewMembers()) {
if (m.equals(id)) {
// I'm counting on the members returned by getViewMembers being ordered such that
// members that joined before us will precede us AND members that join after us
// will succeed us.
// SO once we find ourselves break out of this loop.
if (id.getName().equals(m.getName())) {
if (distributionManager.getMembershipManager().verifyMember(m,
"member is using the name of " + id)) {
throw new IncompatibleSystemException("Member " + id
+ " could not join this distributed system because the existing member " + m
+ " used the same name. Set the \"name\" gemfire property to a unique value.");
distributionManager.addNewMember(id); // add ourselves
// Send out a StartupMessage to the other members.
StartupOperation op = new StartupOperation(distributionManager, transport);
try {
if (!distributionManager.sendStartupMessage(op)) {
// Well we didn't hear back from anyone else. We assume that
// we're the first one.
if (distributionManager.getOtherDistributionManagerIds().size() == 0) {"Did not hear back from any other system. I am the first one.");
} else if (transport.isMcastEnabled()) {
// perform a multicast ping test
if (!distributionManager.testMulticast()) {
"Did not receive a startup response but other members exist. Multicast does not seem to be working.");
} catch (InterruptedException ex) {
// This is ALWAYS bad; don't consult a CancelCriterion.
throw new InternalGemFireException(
"Interrupted while waiting for first StartupResponseMessage",
} catch (IncompatibleSystemException ex) {
logger.fatal(ex.getMessage(), ex);
throw ex;
} finally {
if (logger.isInfoEnabled()) {
long delta = System.currentTimeMillis() - start;,
"DistributionManager {} started on {}. There were {} other DMs. others: {} {} {}",
distributionManager.getDistributionManagerId(), transport,
(logger.isInfoEnabled(LogMarker.DM_MARKER) ? " (took " + delta + " ms)" : ""),
((distributionManager.getDMType() == ADMIN_ONLY_DM_TYPE) ? " (admin only)"
: (distributionManager.getDMType() == LOCATOR_DM_TYPE) ? " (locator)" : ""));
return distributionManager;
} catch (RuntimeException r) {
if (distributionManager != null) {
if (logger.isDebugEnabled()) {
logger.debug("cleaning up incompletely started DistributionManager due to exception", r);
throw r;
/////////////////////// Constructors ///////////////////////
* Creates a new <code>DistributionManager</code> by initializing itself, creating the membership
* manager and executors
* @param transport The configuration for the communications transport
private ClusterDistributionManager(RemoteTransportConfig transport,
InternalDistributedSystem system, AlertingService alertingService) {
this.system = system;
this.transport = transport;
this.alertingService = alertingService;
dmType = transport.getVmKind();
membershipListeners = new ConcurrentHashMap<>();
distributedSystemId = system.getConfig().getDistributedSystemId();
long statId = OSProcess.getId();
stats = new DistributionStats(system, statId);
DistributionStats.enableClockStats = system.getConfig().getEnableTimeStatistics();
exceptionInThreads = false;
Properties nonDefault = new Properties();
DistributionConfigImpl distributionConfigImpl = new DistributionConfigImpl(nonDefault);
if (distributionConfigImpl.getThreadMonitorEnabled()) {
threadMonitor = new ThreadsMonitoringImpl(system);"[ThreadsMonitor] a New Monitor object and process were created.\n");
} else {
threadMonitor = new ThreadsMonitoringImplDummy();"[ThreadsMonitor] Monitoring is disabled and will not be run.\n");
boolean finishedConstructor = false;
try {
if (logger.isInfoEnabled(LogMarker.DM_MARKER)) {,
"Serial Queue info :" + " THROTTLE_PERCENT: " + THROTTLE_PERCENT
// when TCP/IP is disabled we can't throttle the serial queue or we run the risk of
// distributed deadlock when we block the UDP reader thread
boolean throttlingDisabled = system.getConfig().getDisableTcp();
serialQueuedExecutorPool =
new SerialQueuedExecutorPool(stats, throttlingDisabled, threadMonitor);
BlockingQueue<Runnable> poolQueue;
poolQueue = new OverflowQueueWithDMStats<>(stats.getSerialQueueHelper());
} else {
serialQueue =
new ThrottlingMemLinkedQueueWithDMStats<>(TOTAL_SERIAL_QUEUE_BYTE_LIMIT,
poolQueue = serialQueue;
serialThread = LoggingExecutors.newSerialThreadPool("Serial Message Processor",
thread -> stats.incSerialThreadStarts(),
this::doSerialThread, stats.getSerialProcessorHelper(),
threadMonitor, poolQueue);
viewThread =
LoggingExecutors.newSerialThreadPoolWithUnlimitedFeed("View Message Processor",
thread -> stats.incViewThreadStarts(), this::doViewThread,
stats.getViewProcessorHelper(), threadMonitor);
threadPool =
LoggingExecutors.newThreadPoolWithFeedStatistics("Pooled Message Processor ",
thread -> stats.incProcessingThreadStarts(), this::doProcessingThread,
MAX_THREADS, stats.getNormalPoolHelper(), threadMonitor,
INCOMING_QUEUE_LIMIT, stats.getOverflowQueueHelper());
highPriorityPool = LoggingExecutors.newThreadPoolWithFeedStatistics(
"Pooled High Priority Message Processor ",
thread -> stats.incHighPriorityThreadStarts(), this::doHighPriorityThread,
MAX_THREADS, stats.getHighPriorityPoolHelper(), threadMonitor,
INCOMING_QUEUE_LIMIT, stats.getHighPriorityQueueHelper());
BlockingQueue<Runnable> poolQueue;
// no need for a queue since we have infinite threads
poolQueue = new SynchronousQueue<>();
} else {
poolQueue = new OverflowQueueWithDMStats<>(stats.getWaitingQueueHelper());
waitingPool = LoggingExecutors.newThreadPool("Pooled Waiting Message Processor ",
thread -> stats.incWaitingThreadStarts(), this::doWaitingThread,
MAX_WAITING_THREADS, stats.getWaitingPoolHelper(), threadMonitor, poolQueue);
// should this pool using the waiting pool stats?
prMetaDataCleanupThreadPool =
LoggingExecutors.newThreadPoolWithFeedStatistics("PrMetaData cleanup Message Processor ",
thread -> stats.incWaitingThreadStarts(), this::doWaitingThread,
MAX_PR_META_DATA_CLEANUP_THREADS, stats.getWaitingPoolHelper(), threadMonitor,
0, stats.getWaitingQueueHelper());
if (MAX_PR_THREADS > 1) {
partitionedRegionPool =
LoggingExecutors.newThreadPoolWithFeedStatistics("PartitionedRegion Message Processor",
thread -> stats.incPartitionedRegionThreadStarts(), this::doPartitionRegionThread,
MAX_PR_THREADS, stats.getPartitionedRegionPoolHelper(), threadMonitor,
INCOMING_QUEUE_LIMIT, stats.getPartitionedRegionQueueHelper());
} else {
partitionedRegionThread = LoggingExecutors.newSerialThreadPoolWithFeedStatistics(
"PartitionedRegion Message Processor",
thread -> stats.incPartitionedRegionThreadStarts(), this::doPartitionRegionThread,
stats.getPartitionedRegionPoolHelper(), threadMonitor,
INCOMING_QUEUE_LIMIT, stats.getPartitionedRegionQueueHelper());
if (MAX_FE_THREADS > 1) {
functionExecutionPool =
thread -> stats.incFunctionExecutionThreadStarts(), this::doFunctionExecutionThread,
MAX_FE_THREADS, stats.getFunctionExecutionPoolHelper(), threadMonitor,
INCOMING_QUEUE_LIMIT, stats.getFunctionExecutionQueueHelper());
} else {
functionExecutionThread =
thread -> stats.incFunctionExecutionThreadStarts(), this::doFunctionExecutionThread,
stats.getFunctionExecutionPoolHelper(), threadMonitor,
INCOMING_QUEUE_LIMIT, stats.getFunctionExecutionQueueHelper());
memberEventThread =
new LoggingThread("DM-MemberEventInvoker", new MemberEventInvoker());
StringBuilder sb = new StringBuilder(" (took ");
// connect to the cluster
long start = System.currentTimeMillis();
DMListener l = new DMListener(this);
membershipManager = MemberFactory.newMembershipManager(l, transport,
new GMSAuthenticator(system.getSecurityProperties(), system.getSecurityService(),
system.getSecurityLogWriter(), system.getInternalLogWriter()),
sb.append(System.currentTimeMillis() - start);
localAddress = membershipManager.getLocalMember();
sb.append(" ms)");"Starting DistributionManager {}. {}",
new Object[] {localAddress,
(logger.isInfoEnabled(LogMarker.DM_MARKER) ? sb.toString() : "")});
description = "Distribution manager on " + localAddress + " started at "
+ (new Date(System.currentTimeMillis())).toString();
finishedConstructor = true;
} finally {
if (!finishedConstructor) {
askThreadsToStop(); // fix for bug 42039
private void doFunctionExecutionThread(Runnable command) {
try {
} finally {
private void doProcessingThread(Runnable command) {
try {
} finally {
private void doHighPriorityThread(Runnable command) {
try {
} finally {
private void doWaitingThread(Runnable command) {
try {
} finally {
private void doPartitionRegionThread(Runnable command) {
try {
} finally {
private void doViewThread(Runnable command) {
try {
} finally {
private void doSerialThread(Runnable command) {
try {
} finally {
* Creates a new distribution manager
* @param system The distributed system to which this distribution manager will send messages.
private ClusterDistributionManager(InternalDistributedSystem system,
RemoteTransportConfig transport, AlertingService alertingService) {
this(transport, system, alertingService);
boolean finishedConstructor = false;
try {
// Allow events to start being processed.
for (;;) {
boolean interrupted = Thread.interrupted();
try {
} catch (InterruptedException e) {
interrupted = true;
} finally {
if (interrupted) {
finishedConstructor = true;
} finally {
if (!finishedConstructor) {
askThreadsToStop(); // fix for bug 42039
* Is this VM dedicated to administration (like a GUI console or a JMX agent)? If so, then it
* creates {@link #ADMIN_ONLY_DM_TYPE} type distribution managers.
* @since GemFire 4.0
public static boolean isDedicatedAdminVM() {
return isDedicatedAdminVM;
public static void setIsDedicatedAdminVM(boolean isDedicatedAdminVM) {
ClusterDistributionManager.isDedicatedAdminVM = isDedicatedAdminVM;
private static Boolean isStartupThread() {
return isStartupThread.get();
private static void setIsStartupThread() {
//////////////////// Instance Methods /////////////////////
private void runUntilShutdown(Runnable r) {
try {;
} catch (CancelException e) {
if (logger.isTraceEnabled()) {
logger.trace("Caught shutdown exception", e);
} catch (VirtualMachineError err) {
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
if (shouldInhibitMembershipWarnings()) {
logger.debug("Caught unusual exception during shutdown: {}", t.getMessage(), t);
} else {
logger.warn("Task failed with exception", t);
* Returns true if the two members are on the same equivalent host based on overlapping IP
* addresses collected for all NICs during exchange of startup messages.
* @param member1 First member
* @param member2 Second member
public boolean areOnEquivalentHost(InternalDistributedMember member1,
InternalDistributedMember member2) {
Set<InetAddress> equivalents1 = getEquivalents(member1.getInetAddress());
return equivalents1.contains(member2.getInetAddress());
* Set the host equivalencies for a given host. This overrides any previous information in the
* tables.
* @param equivs list of InetAddress's that all point at same host
void setEquivalentHosts(Set<InetAddress> equivs) {
Iterator<InetAddress> it = equivs.iterator();
synchronized (equivalentHosts) {
while (it.hasNext()) {
equivalentHosts.put(, Collections.unmodifiableSet(equivs));
* Return all of the InetAddress's that are equivalent to the given one (same host)
* @param in host to match up
* @return all the addresses thus equivalent
public Set<InetAddress> getEquivalents(InetAddress in) {
Set<InetAddress> result;
synchronized (equivalentHosts) {
result = equivalentHosts.get(in);
// DS 11/25/08 - It appears that when using VPN, the distributed member
// id is the vpn address, but that doesn't show up in the equivalents.
if (result == null) {
result = Collections.singleton(in);
return result;
void setRedundancyZone(InternalDistributedMember member, String redundancyZone) {
if (redundancyZone != null && !redundancyZone.equals("")) {
redundancyZones.put(member, redundancyZone);
if (member != getDistributionManagerId()) {
String relationship = areInSameZone(getDistributionManagerId(), member) ? "" : "not ";
Object[] logArgs = new Object[] {member, relationship};"Member {} is {} equivalent or in the same redundancy zone.",
* Set the flag indicating that we should enforce unique zones. If we are already enforcing unique
* zones, keep it that way.
void setEnforceUniqueZone(boolean enforceUniqueZone) {
this.enforceUniqueZone |= enforceUniqueZone;
public boolean enforceUniqueZone() {
return enforceUniqueZone;
public String getRedundancyZone(InternalDistributedMember member) {
return redundancyZones.get(member);
* Asserts that distributionManagerType is LOCAL, GEMFIRE, or ADMIN_ONLY. Also asserts that the
* distributionManagerId (jgroups DistributedMember) has a VmKind that matches.
private void assertDistributionManagerType() {
// Assert that dmType is one of the three DM types...
int theDmType = getDMType();
switch (theDmType) {
Assert.assertTrue(false, "unknown distribution manager type");
// Assert InternalDistributedMember VmKind matches this DistributionManagerType...
final InternalDistributedMember theId = getDistributionManagerId();
final int vmKind = theId.getVmKind();
if (theDmType != vmKind) {
"InternalDistributedMember has a vmKind of " + vmKind + " instead of " + theDmType);
public int getDMType() {
return dmType;
public List<InternalDistributedMember> getViewMembers() {
return membershipManager.getView().getMembers();
private boolean testMulticast() {
return membershipManager.testMulticast();
* Need to do this outside the constructor so that the child constructor can finish.
private void startThreads() {
system.setDM(this); // fix for bug 33362
if (memberEventThread != null)
try {
// And the distinguished guests today are...
NetView v = membershipManager.getView();"Initial (distribution manager) view, {}",
// Add them all to our view
for (InternalDistributedMember internalDistributedMember : v.getMembers()) {
} catch (Exception ex) {
throw new InternalGemFireException(
"Could not process initial view",
try {
getWaitingThreadPool().execute(() -> {
// call in background since it might need to send a reply
// and we are not ready to send messages until startup is finished
} catch (VirtualMachineError err) {
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
logger.fatal("Uncaught exception calling readyForMessages", t);
private void readyForMessages() {
synchronized (this) {
readyForMessages = true;
private void waitUntilReadyForMessages() {
if (readyForMessages)
synchronized (this) {
while (!readyForMessages) {
boolean interrupted = Thread.interrupted();
try {
} catch (InterruptedException e) {
interrupted = true;
} finally {
if (interrupted) {
} // synchronized
* Call when the DM is ready to send messages.
private void readyToSendMsgs() {
synchronized (readyToSendMsgsLock) {
readyToSendMsgs = true;
* Return when DM is ready to send out messages.
* @param msg the messsage that is currently being sent
private void waitUntilReadyToSendMsgs(DistributionMessage msg) {
if (readyToSendMsgs) {
// another process may have been started in the same view, so we need
// to be responsive to startup messages and be able to send responses
if (msg instanceof AdminMessageType) {
if (isStartupThread() == Boolean.TRUE) {
// let the startup thread send messages
// the only case I know of that does this is if we happen to log a
// message during startup and an alert listener has registered.
synchronized (readyToSendMsgsLock) {
while (!readyToSendMsgs) {
boolean interrupted = Thread.interrupted();
try {
} catch (InterruptedException e) {
interrupted = true;
} finally {
if (interrupted) {
} // for
} // synchronized
public void forceUDPMessagingForCurrentThread() {
public void releaseUDPMessagingForCurrentThread() {
* Did an exception occur in one of the threads launched by this distribution manager?
public boolean exceptionInThreads() {
return exceptionInThreads
|| LoggingUncaughtExceptionHandler.getUncaughtExceptionsCount() > 0;
* Clears the boolean that determines whether or not an exception occurred in one of the worker
* threads. This method should be used for testing purposes only!
public void clearExceptionInThreads() {
exceptionInThreads = false;
* Returns the current "cache time" in milliseconds since the epoch. The "cache time" takes into
* account skew among the local clocks on the various machines involved in the cache.
public long cacheTimeMillis() {
return system.getClock().cacheTimeMillis();
public DistributedMember getMemberWithName(String name) {
for (DistributedMember id : members.values()) {
if (Objects.equals(id.getName(), name)) {
return id;
if (Objects.equals(localAddress.getName(), name)) {
return localAddress;
return null;
* Returns the id of this distribution manager.
public InternalDistributedMember getDistributionManagerId() {
return localAddress;
* Returns an unmodifiable set containing the identities of all of the known (non-admin-only)
* distribution managers.
public Set<InternalDistributedMember> getDistributionManagerIds() {
// access to members synchronized under membersLock in order to
// ensure serialization
synchronized (membersLock) {
return members.keySet();
* Adds the entry in {@link #hostedLocatorsAll} for a member with one or more hosted locators. The
* value is a collection of host[port] strings. If a bind-address was used for a locator then the
* form is bind-addr[port].
* @since GemFire 6.6.3
public void addHostedLocators(InternalDistributedMember member, Collection<String> locators,
boolean isSharedConfigurationEnabled) {
synchronized (membersLock) {
if (locators == null || locators.isEmpty()) {
throw new IllegalArgumentException("Cannot use empty collection of locators");
if (hostedLocatorsAll.isEmpty()) {
hostedLocatorsAll = new HashMap<>();
Map<InternalDistributedMember, Collection<String>> tmp =
new HashMap<>(hostedLocatorsAll);
tmp.put(member, locators);
tmp = Collections.unmodifiableMap(tmp);
hostedLocatorsAll = tmp;
if (isSharedConfigurationEnabled) {
if (hostedLocatorsWithSharedConfiguration.isEmpty()) {
hostedLocatorsWithSharedConfiguration = new HashMap<>();
tmp = new HashMap<>(hostedLocatorsWithSharedConfiguration);
tmp.put(member, locators);
tmp = Collections.unmodifiableMap(tmp);
hostedLocatorsWithSharedConfiguration = tmp;
private void removeHostedLocators(InternalDistributedMember member) {
synchronized (membersLock) {
if (hostedLocatorsAll.containsKey(member)) {
Map<InternalDistributedMember, Collection<String>> tmp =
new HashMap<>(hostedLocatorsAll);
if (tmp.isEmpty()) {
tmp = Collections.emptyMap();
} else {
tmp = Collections.unmodifiableMap(tmp);
hostedLocatorsAll = tmp;
if (hostedLocatorsWithSharedConfiguration.containsKey(member)) {
Map<InternalDistributedMember, Collection<String>> tmp =
new HashMap<>(
if (tmp.isEmpty()) {
tmp = Collections.emptyMap();
} else {
tmp = Collections.unmodifiableMap(tmp);
hostedLocatorsWithSharedConfiguration = tmp;
* Gets the value in {@link #hostedLocatorsAll} for a member with one or more hosted locators. The
* value is a collection of host[port] strings. If a bind-address was used for a locator then the
* form is bind-addr[port].
* @since GemFire 6.6.3
public Collection<String> getHostedLocators(InternalDistributedMember member) {
synchronized (membersLock) {
return hostedLocatorsAll.get(member);
* Returns a copy of the map of all members hosting locators. The key is the member, and the value
* is a collection of host[port] strings. If a bind-address was used for a locator then the form
* is bind-addr[port].
* The member is the vm that hosts one or more locator, if another locator starts up linking to
* this locator, it will put that member in this map as well, and this member will in the map on
* the other locato vm as well.
* The keyset of the map are the locator vms in this cluster.
* the value is a collection of strings in case one vm can have multiple locators ????
* @since GemFire 6.6.3
public Map<InternalDistributedMember, Collection<String>> getAllHostedLocators() {
synchronized (membersLock) {
return hostedLocatorsAll;
* Returns a copy of the map of all members hosting locators with shared configuration. The key is
* the member, and the value is a collection of host[port] strings. If a bind-address was used for
* a locator then the form is bind-addr[port].
* @since GemFire 8.0
public Map<InternalDistributedMember, Collection<String>> getAllHostedLocatorsWithSharedConfiguration() {
synchronized (membersLock) {
return hostedLocatorsWithSharedConfiguration;
* Returns an unmodifiable set containing the identities of all of the known (including admin)
* distribution managers.
public Set<InternalDistributedMember> getDistributionManagerIdsIncludingAdmin() {
// access to members synchronized under membersLock in order to
// ensure serialization
synchronized (membersLock) {
return membersAndAdmin;
* Returns a private-memory list containing the identities of all the other known distribution
* managers not including me.
public Set<InternalDistributedMember> getOtherDistributionManagerIds() {
// We return a modified copy of the list, so
// collect the old list and copy under the lock.
Set<InternalDistributedMember> result = new HashSet<>(getDistributionManagerIds());
InternalDistributedMember me = getDistributionManagerId();
// It's okay for my own id to not be in the list of all ids yet.
return result;
public Set<InternalDistributedMember> getOtherNormalDistributionManagerIds() {
// We return a modified copy of the list, so
// collect the old list and copy under the lock.
Set<InternalDistributedMember> result = new HashSet<>(getNormalDistributionManagerIds());
InternalDistributedMember me = getDistributionManagerId();
// It's okay for my own id to not be in the list of all ids yet.
return result;
public InternalDistributedMember getCanonicalId(DistributedMember id) {
// the members set is copy-on-write, so it is safe to iterate over it
InternalDistributedMember result = members.get(id);
if (result == null) {
return (InternalDistributedMember) id;
return result;
* Add a membership listener and return other DistributionManagerIds as an atomic operation
public Set<InternalDistributedMember> addMembershipListenerAndGetDistributionManagerIds(
MembershipListener l) {
// switched sync order to fix bug 30360
synchronized (membersLock) {
// Don't let the members come and go while we are adding this
// listener. This ensures that the listener (probably a
// ReplyProcessor) gets a consistent view of the members.
// Note it is ok to return the members set
// because we will never modify the returned set.
return members.keySet();
private void addNewMember(InternalDistributedMember member) {
// This is the place to cleanup the zombieMembers
int vmType = member.getVmKind();
switch (vmType) {
throw new InternalGemFireError(String.format("Unknown member type: %s",
* Returns the identity of this <code>DistributionManager</code>
public InternalDistributedMember getId() {
return localAddress;
public long getMembershipPort() {
return localAddress.getPort();
public Set<InternalDistributedMember> putOutgoing(final DistributionMessage msg) {
try {
DistributionMessageObserver observer = DistributionMessageObserver.getInstance();
if (observer != null) {
observer.beforeSendMessage(this, msg);
return sendMessage(msg);
} catch (NotSerializableException e) {
throw new InternalGemFireException(e);
public String toString() {
return description;
* Informs other members that this dm is shutting down. Stops the pusher, puller, and processor
* threads and closes the connection to the transport layer.
protected void shutdown() {
// Make sure only one thread initiates shutdown...
synchronized (shutdownMutex) {
if (closeInProgress) {
closeInProgress = true;
} // synchronized
// [bruce] log shutdown at info level and with ID to balance the
// "Starting" message. recycleConn.conf is hard to debug w/o this
final String exceptionStatus = (exceptionInThreads()
? "At least one Exception occurred."
: "");"Shutting down DistributionManager {}. {}",
new Object[] {localAddress, exceptionStatus});
final long start = System.currentTimeMillis();
try {
if (rootCause instanceof ForcedDisconnectException) {
if (logger.isDebugEnabled()) {
"inhibiting sending of shutdown message to other members due to forced-disconnect");
} else {
// Don't block indefinitely trying to send the shutdown message, in
// case other VMs in the system are ill-behaved. (bug 34710)
final Runnable r = () -> {
try {
} catch (final CancelException e) {
// We were terminated.
logger.debug("Cancelled during shutdown message", e);
final Thread t =
new LoggingThread(String.format("Shutdown Message Thread for %s",
localAddress), false, r);
boolean interrupted = Thread.interrupted();
try {
t.join(MAX_STOP_TIME / 4);
} catch (final InterruptedException e) {
interrupted = true;
logger.warn("Interrupted sending shutdown message to peers",
} finally {
if (interrupted) {
if (t.isAlive()) {
logger.warn("Failed sending shutdown message to peers (timeout)");
} finally {
shutdownMsgSent = true; // in case sendShutdownMessage failed....
try {
} finally {
final Long delta = System.currentTimeMillis() - start;"DistributionManager stopped in {}ms.", delta);
private void askThreadsToStop() {
// Stop executors after they have finished
ExecutorService es;
es = serialThread;
if (es != null) {
es = viewThread;
if (es != null) {
// Hmmm...OK, I'll let any view events currently in the queue be
// processed. Not sure it's very important whether they get
// handled...
if (serialQueuedExecutorPool != null) {
es = functionExecutionThread;
if (es != null) {
es = functionExecutionPool;
if (es != null) {
es = partitionedRegionThread;
if (es != null) {
es = partitionedRegionPool;
if (es != null) {
es = highPriorityPool;
if (es != null) {
es = waitingPool;
if (es != null) {
es = prMetaDataCleanupThreadPool;
if (es != null) {
es = threadPool;
if (es != null) {
Thread th = memberEventThread;
if (th != null)
private void waitForThreadsToStop(long timeInMillis) throws InterruptedException {
long start = System.currentTimeMillis();
long remaining = timeInMillis;
ExecutorService[] allExecutors = new ExecutorService[] {serialThread, viewThread,
functionExecutionThread, functionExecutionPool, partitionedRegionThread,
partitionedRegionPool, highPriorityPool, waitingPool,
prMetaDataCleanupThreadPool, threadPool};
for (ExecutorService es : allExecutors) {
if (es != null) {
es.awaitTermination(remaining, TimeUnit.MILLISECONDS);
remaining = timeInMillis - (System.currentTimeMillis() - start);
if (remaining <= 0) {
serialQueuedExecutorPool.awaitTermination(remaining, TimeUnit.MILLISECONDS);
remaining = timeInMillis - (System.currentTimeMillis() - start);
if (remaining <= 0) {
Thread th = memberEventThread;
if (th != null) {
th.interrupt(); // bug #43452 - this thread sometimes eats interrupts, so we interrupt it
// again here
* Cheap tool to kill a referenced thread
* @param t the thread to kill
private void clobberThread(Thread t) {
if (t == null)
if (t.isAlive()) {
logger.warn("Forcing thread stop on < {} >", t);
// Start by being nice.
// we could be more violent here...
// t.stop();
try {
for (int i = 0; i < MAX_STOP_ATTEMPTS && t.isAlive(); i++) {
} catch (InterruptedException ex) {
logger.debug("Interrupted while attempting to terminate threads.");
// just keep going
if (t.isAlive()) {
logger.warn("Thread refused to die: {}", t);
* Cheap tool to examine an executor to see if it is still working
* @return true if executor is still active
private boolean executorAlive(ExecutorService tpe, String name) {
if (tpe == null) {
return false;
} else {
int ac = ((ThreadPoolExecutor) tpe).getActiveCount();
// boolean result = tpe.getActiveCount() > 0;
if (ac > 0) {
if (logger.isDebugEnabled()) {
logger.debug("Still waiting for {} threads in '{}' pool to exit", ac, name);
return true;
} else {
return false;
* Wait for the ancillary queues to exit. Kills them if they are still around.
private void forceThreadsToStop() {
long endTime = System.currentTimeMillis() + MAX_STOP_TIME;
StringBuilder culprits;
for (;;) {
boolean stillAlive = false;
culprits = new StringBuilder();
if (executorAlive(serialThread, "serial thread")) {
stillAlive = true;
culprits.append(" serial thread;");
if (executorAlive(viewThread, "view thread")) {
stillAlive = true;
culprits.append(" view thread;");
if (executorAlive(partitionedRegionThread, "partitioned region thread")) {
stillAlive = true;
culprits.append(" partitioned region thread;");
if (executorAlive(partitionedRegionPool, "partitioned region pool")) {
stillAlive = true;
culprits.append(" partitioned region pool;");
if (executorAlive(highPriorityPool, "high priority pool")) {
stillAlive = true;
culprits.append(" high priority pool;");
if (executorAlive(waitingPool, "waiting pool")) {
stillAlive = true;
culprits.append(" waiting pool;");
if (executorAlive(prMetaDataCleanupThreadPool, "prMetaDataCleanupThreadPool")) {
stillAlive = true;
culprits.append(" special waiting pool;");
if (executorAlive(threadPool, "thread pool")) {
stillAlive = true;
culprits.append(" thread pool;");
if (!stillAlive)
long now = System.currentTimeMillis();
if (now >= endTime)
try {
} catch (InterruptedException e) {
// Desperation, the shutdown thread is being killed. Don't
// consult a CancelCriterion.
logger.warn("Interrupted during shutdown", e);
} // for
logger.warn("Daemon threads are slow to stop; culprits include: {}",
// Kill with no mercy
if (serialThread != null) {
if (viewThread != null) {
if (functionExecutionThread != null) {
if (functionExecutionPool != null) {
if (partitionedRegionThread != null) {
if (partitionedRegionPool != null) {
if (highPriorityPool != null) {
if (waitingPool != null) {
if (prMetaDataCleanupThreadPool != null) {
if (threadPool != null) {
Thread th = memberEventThread;
if (th != null) {
private volatile boolean shutdownInProgress = false;
/** guard for membershipViewIdAcknowledged */
private final Object membershipViewIdGuard = new Object();
/** the latest view ID that has been processed by all membership listeners */
private long membershipViewIdAcknowledged;
public boolean shutdownInProgress() {
return shutdownInProgress;
* Stops the pusher, puller and processor threads and closes the connection to the transport
* layer. This should only be used from shutdown() or from the dm initialization code
private void uncleanShutdown(boolean beforeJoined) {
try {
closeInProgress = true; // set here also to fix bug 36736
shutdownInProgress = true;
if (membershipManager != null) {
// wait a moment before asking threads to terminate
try {
} catch (InterruptedException ie) {
// No need to reset interrupt bit, we're really trying to quit...
} // try
finally {
// ABSOLUTELY ESSENTIAL that we close the distribution channel!
try {
// For safety, but channel close in a finally AFTER this...
if (stats != null) {
try {
} catch (InterruptedException ie) {
// No need to reset interrupt bit, we're really trying to quit...
} finally {
if (membershipManager != null) {"Now closing distribution for {}",
public InternalDistributedSystem getSystem() {
return system;
public AlertingService getAlertingService() {
return alertingService;
* Returns the transport configuration for this distribution manager
RemoteTransportConfig getTransport() {
return transport;
public void addMembershipListener(MembershipListener l) {
membershipListeners.putIfAbsent(l, Boolean.TRUE);
public void removeMembershipListener(MembershipListener l) {
public Collection<MembershipListener> getMembershipListeners() {
return Collections.unmodifiableSet(membershipListeners.keySet());
* Adds a <code>MembershipListener</code> to this distribution manager.
private void addAllMembershipListener(MembershipListener l) {
synchronized (allMembershipListenersLock) {
Set<MembershipListener> newAllMembershipListeners =
new HashSet<>(allMembershipListeners);
allMembershipListeners = newAllMembershipListeners;
public void removeAllMembershipListener(MembershipListener l) {
synchronized (allMembershipListenersLock) {
Set<MembershipListener> newAllMembershipListeners =
new HashSet<>(allMembershipListeners);
if (!newAllMembershipListeners.remove(l)) {
// There seems to be a race condition in which
// multiple departure events can be registered
// on the same peer. We regard this as benign.
// FIXME when membership events become sane again
// String s = "MembershipListener was never registered";
// throw new IllegalArgumentException(s);
allMembershipListeners = newAllMembershipListeners;
private boolean shouldInhibitMembershipWarnings() {
if (isCloseInProgress()) {
return true;
InternalDistributedSystem ds = getSystem();
return ds != null && ds.isDisconnecting();
* Returns true if this distribution manager has initiated shutdown
public boolean isCloseInProgress() {
return closeInProgress;
private void handleViewInstalledEvent(ViewInstalledEvent ev) {
synchronized (membershipViewIdGuard) {
membershipViewIdAcknowledged = ev.getViewId();
* This stalls waiting for the current membership view (as seen by the membership manager) to be
* acknowledged by all membership listeners
void waitForViewInstallation(long id) throws InterruptedException {
if (id <= membershipViewIdAcknowledged) {
synchronized (membershipViewIdGuard) {
while (membershipViewIdAcknowledged < id && !stopper.isCancelInProgress()) {
if (logger.isDebugEnabled()) {
logger.debug("waiting for view {}. Current DM view processed by all listeners is {}", id,
private void handleMemberEvent(MemberEvent ev) {
* This thread processes member events as they occur.
* @see ClusterDistributionManager.MemberCrashedEvent
* @see ClusterDistributionManager.MemberJoinedEvent
* @see ClusterDistributionManager.MemberDepartedEvent
protected class MemberEventInvoker implements Runnable {
public void run() {
for (;;) {
// bug 41539 - member events need to be delivered during shutdown
// or reply processors may hang waiting for replies from
// departed members
// if (getCancelCriterion().isCancelInProgress()) {
// break; // no message, just quit
// }
if (!system.isConnected
&& isClosed()) {
try {
MemberEvent ev =
} catch (InterruptedException e) {
if (shouldInhibitMembershipWarnings()) {
if (logger.isTraceEnabled()) {
logger.trace("MemberEventInvoker: InterruptedException during shutdown");
} else {
logger.warn("Unexpected InterruptedException", e);
} catch (DistributedSystemDisconnectedException e) {
} catch (CancelException e) {
if (shouldInhibitMembershipWarnings()) {
if (logger.isTraceEnabled()) {
logger.trace("MemberEventInvoker: cancelled");
} else {
logger.warn("Unexpected cancellation", e);
} catch (Exception e) {
"Uncaught exception processing member event",
} // for
if (logger.isTraceEnabled()) {
logger.trace("MemberEventInvoker on {} stopped", ClusterDistributionManager.this);
private void addMemberEvent(MemberEvent ev) {
} else {
boolean interrupted = Thread.interrupted();
try {
} catch (InterruptedException ex) {
interrupted = true;
handleMemberEvent(ev); // FIXME why???
} finally {
if (interrupted) {
public void close() {
if (!closed) {
shutdown();"Marking DistributionManager {} as closed.",
closed = true;
public void throwIfDistributionStopped() {
if (shutdownMsgSent) {
throw new DistributedSystemDisconnectedException(
"Message distribution has terminated",
* Returns true if this distribution manager has been closed.
public boolean isClosed() {
return closed;
public void addAdminConsole(InternalDistributedMember theId) {"New administration member detected at {}.", theId);
synchronized (adminConsolesLock) {
HashSet<InternalDistributedMember> tmp = new HashSet<>(adminConsoles);
adminConsoles = Collections.unmodifiableSet(tmp);
public DMStats getStats() {
return stats;
public DistributionConfig getConfig() {
DistributionConfig result = null;
InternalDistributedSystem sys = getSystem();
if (sys != null) {
result = system.getConfig();
return result;
public Set<InternalDistributedMember> getAllOtherMembers() {
Set<InternalDistributedMember> result =
new HashSet<>(getDistributionManagerIdsIncludingAdmin());
return result;
public void retainMembersWithSameOrNewerVersion(Collection<InternalDistributedMember> members,
Version version) {
members.removeIf(id -> id.getVersionObject().compareTo(version) < 0);
public void removeMembersWithSameOrNewerVersion(Collection<InternalDistributedMember> members,
Version version) {
members.removeIf(id -> id.getVersionObject().compareTo(version) >= 0);
public Set<InternalDistributedMember> addAllMembershipListenerAndGetAllIds(MembershipListener l) {
MembershipManager mgr = membershipManager;
try {
synchronized (membersLock) {
// Don't let the members come and go while we are adding this
// listener. This ensures that the listener (probably a
// ReplyProcessor) gets a consistent view of the members.
return getDistributionManagerIdsIncludingAdmin();
} finally {
* Sends a startup message and waits for a response. Returns true if response received; false if
* it timed out or there are no peers.
private boolean sendStartupMessage(StartupOperation startupOperation)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
receivedStartupResponse = false;
boolean ok;
// Be sure to add ourself to the equivalencies list!
Set<InetAddress> equivs = StartupMessage.getMyAddresses(this);
if (equivs == null || equivs.size() == 0) {
// no network interface
equivs = new HashSet<>();
try {
} catch (UnknownHostException e) {
// can't even get localhost
if (getViewMembers().size() > 1) {
throw new SystemConnectException(
"Unable to examine network cards and other members exist");
String redundancyZone = getConfig().getRedundancyZone();
if (redundancyZone != null && !redundancyZone.equals("")) {
setRedundancyZone(getDistributionManagerId(), redundancyZone);
if (logger.isDebugEnabled()) {
StringBuffer sb = new StringBuffer();
sb.append("Equivalent IPs for this host: ");
Iterator it = equivs.iterator();
while (it.hasNext()) {
InetAddress in = (InetAddress);
if (it.hasNext()) {
sb.append(", ");
} // while
// we need to send this to everyone else; even admin vm
Set<InternalDistributedMember> allOthers = new HashSet<>(getViewMembers());
if (allOthers.isEmpty()) {
return false; // no peers, we are alone.
try {
ok = startupOperation.sendStartupMessage(allOthers, equivs, redundancyZone,
} catch (Exception re) {
throw new SystemConnectException(
"One or more peers generated exceptions during connection attempt",
if (rejectionMessage != null) {
throw new IncompatibleSystemException(rejectionMessage);
boolean receivedAny = receivedStartupResponse;
if (!ok) { // someone didn't reply
int unresponsiveCount;
synchronized (unfinishedStartupsLock) {
if (unfinishedStartups == null)
unresponsiveCount = 0;
unresponsiveCount = unfinishedStartups.size();
if (unresponsiveCount != 0) {
if (Boolean.getBoolean("DistributionManager.requireAllStartupResponses")) {
throw new SystemConnectException(
String.format("No startup replies from: %s",
} // synchronized
// Bug 35887:
// If there are other members, we must receive at least _one_ response
if (allOthers.size() != 0) { // there exist others
if (!receivedAny) { // and none responded
StringBuilder sb = new StringBuilder();
Iterator itt = allOthers.iterator();
while (itt.hasNext()) {
Object m =;
if (itt.hasNext())
sb.append(", ");
printStacks(allOthers, false);
throw new SystemConnectException(
"Received no connection acknowledgments from any of the %s senior cache members: %s",
Integer.toString(allOthers.size()), sb.toString()));
} // and none responded
} // there exist others
InternalDistributedMember e = clusterElderManager.getElderId();
if (e != null) { // an elder exists
boolean unresponsiveElder;
synchronized (unfinishedStartupsLock) {
if (unfinishedStartups == null)
unresponsiveElder = false;
unresponsiveElder = unfinishedStartups.contains(e);
if (unresponsiveElder) {
"Forcing an elder join event since a startup response was not received from elder {}.",
} // an elder exists
} // someone didn't reply
return receivedAny;
* List of InternalDistributedMember's that we have not received startup replies from. If null, we
* have not finished sending the startup message.
* <p>
* Must be synchronized using {@link #unfinishedStartupsLock}
private Set<InternalDistributedMember> unfinishedStartups = null;
* Synchronization for {@link #unfinishedStartups}
private final Object unfinishedStartupsLock = new Object();
public void setUnfinishedStartups(Collection<InternalDistributedMember> s) {
synchronized (unfinishedStartupsLock) {
Assert.assertTrue(unfinishedStartups == null, "Set unfinished startups twice");
unfinishedStartups = new HashSet<>(s);
// OK, I don't _quite_ trust the list to be current, so let's
// prune it here.
Iterator it = unfinishedStartups.iterator();
synchronized (membersLock) {
while (it.hasNext()) {
InternalDistributedMember m = (InternalDistributedMember);
if (!isCurrentMember(m)) {
} // while
} // synchronized
public void removeUnfinishedStartup(InternalDistributedMember m, boolean departed) {
synchronized (unfinishedStartupsLock) {
if (logger.isDebugEnabled()) {
logger.debug("removeUnfinishedStartup for {} wtih {}", m, unfinishedStartups);
if (unfinishedStartups == null)
return; // not yet done with startup
if (!unfinishedStartups.remove(m))
String msg;
if (departed) {
msg =
"Stopped waiting for startup reply from <{}> because the peer departed the view.";
} else {
msg =
"Stopped waiting for startup reply from <{}> because the reply was finally received.";
}, m);
int numLeft = unfinishedStartups.size();
if (numLeft != 0) {"Still awaiting {} response(s) from: {}.",
new Object[] {numLeft, unfinishedStartups});
} // synchronized
* Processes the first startup response.
* @see StartupResponseMessage#process
void processStartupResponse(InternalDistributedMember sender, String theRejectionMessage) {
removeUnfinishedStartup(sender, false);
synchronized (this) {
if (!receivedStartupResponse) {
// only set the cacheTimeDelta once
receivedStartupResponse = true;
if (theRejectionMessage != null && rejectionMessage == null) {
// remember the first non-null rejection. This fixes bug 33266
rejectionMessage = theRejectionMessage;
private String prettifyReason(String r) {
final String str = "";
if (r.startsWith(str)) {
return r.substring(str.length());
return r;
* Returns true if id was removed. Returns false if it was not in the list of managers.
private boolean removeManager(InternalDistributedMember theId, boolean crashed, String p_reason) {
String reason = p_reason;
boolean result = false; // initialization shouldn't be required, but...
// Test once before acquiring the lock, fault tolerance for potentially
// recursive (and deadlock) conditions -- bug33626
// Note that it is always safe to _read_ {@link members} without locking
if (isCurrentMember(theId)) {
// Destroy underlying member's resources
reason = prettifyReason(reason);
synchronized (membersLock) {
if (logger.isDebugEnabled()) {
logger.debug("DistributionManager: removing member <{}>; crashed {}; reason = {}", theId,
crashed, reason);
Map<InternalDistributedMember, InternalDistributedMember> tmp = new HashMap<>(members);
if (tmp.remove(theId) != null) {
// Note we don't modify in place. This allows reader to get snapshots
// without locking.
if (tmp.isEmpty()) {
tmp = Collections.emptyMap();
} else {
tmp = Collections.unmodifiableMap(tmp);
members = tmp;
result = true;
} else {
result = false;
// Don't get upset since this can happen twice due to
// an explicit remove followed by an implicit one caused
// by a JavaGroup view change
Set<InternalDistributedMember> tmp2 = new HashSet<>(membersAndAdmin);
if (tmp2.remove(theId)) {
if (tmp2.isEmpty()) {
tmp2 = Collections.emptySet();
} else {
tmp2 = Collections.unmodifiableSet(tmp2);
membersAndAdmin = tmp2;
} // synchronized
} // if
return result;
* Makes note of a new distribution manager that has started up in the distributed cache. Invokes
* the appropriately listeners.
* @param theId The id of the distribution manager starting up
private void handleManagerStartup(InternalDistributedMember theId) {
HashMap<InternalDistributedMember, InternalDistributedMember> tmp;
synchronized (membersLock) {
// Note test is under membersLock
if (members.containsKey(theId)) {
return; // already accounted for
// Note we don't modify in place. This allows reader to get snapshots
// without locking.
tmp = new HashMap<>(members);
tmp.put(theId, theId);
members = Collections.unmodifiableMap(tmp);
Set<InternalDistributedMember> stmp = new HashSet<>(membersAndAdmin);
membersAndAdmin = Collections.unmodifiableSet(stmp);
} // synchronized
if (theId.getVmKind() != ClusterDistributionManager.LOCATOR_DM_TYPE) {
}"Admitting member <{}>. Now there are {} non-admin member(s).",
theId, tmp.size());
addMemberEvent(new MemberJoinedEvent(theId));
public boolean isCurrentMember(DistributedMember id) {
Set m;
synchronized (membersLock) {
// access to members synchronized under membersLock in order to
// ensure serialization
m = membersAndAdmin;
return m.contains(id);
* Makes note of a new console that has started up in the distributed cache.
private void handleConsoleStartup(InternalDistributedMember theId) {
// if we have an all listener then notify it NOW.
HashSet<InternalDistributedMember> tmp;
synchronized (membersLock) {
// Note test is under membersLock
if (membersAndAdmin.contains(theId))
return; // already accounted for
// Note we don't modify in place. This allows reader to get snapshots
// without locking.
tmp = new HashSet<>(membersAndAdmin);
membersAndAdmin = Collections.unmodifiableSet(tmp);
} // synchronized
for (MembershipListener listener : allMembershipListeners) {
listener.memberJoined(this, theId);
}"DMMembership: Admitting new administration member < {} >.",
// Note that we don't add the member to the list of admin consoles until
// we receive a message from them.
* Process an incoming distribution message. This includes scheduling it correctly based on the
* message's nioPriority (executor type)
private void handleIncomingDMsg(DistributionMessage message) {
if (logger.isDebugEnabled()) {
logger.debug("Received message '{}' from <{}>", message, message.getSender());
* Makes note of a console that has shut down.
* @param theId The id of the console shutting down
* @param crashed only true if we detect this id to be gone from a javagroup view
* @see AdminConsoleDisconnectMessage#process
public void handleConsoleShutdown(InternalDistributedMember theId, boolean crashed,
String reason) {
boolean removedConsole = false;
boolean removedMember = false;
synchronized (membersLock) {
// to fix bug 39747 we can only remove this member from
// membersAndAdmin if it is not in members.
// This happens when we have an admin member colocated with a normal DS.
// In this case we need for the normal DS to shutdown or crash.
if (!members.containsKey(theId)) {
if (logger.isDebugEnabled())
logger.debug("DistributionManager: removing admin member <{}>; crashed = {}; reason = {}",
theId, crashed, reason);
Set<InternalDistributedMember> tmp = new HashSet<>(membersAndAdmin);
if (tmp.remove(theId)) {
// Note we don't modify in place. This allows reader to get snapshots
// without locking.
if (tmp.isEmpty()) {
tmp = Collections.emptySet();
} else {
tmp = Collections.unmodifiableSet(tmp);
membersAndAdmin = tmp;
removedMember = true;
} else {
// Don't get upset since this can happen twice due to
// an explicit remove followed by an implicit one caused
// by a JavaGroup view change
synchronized (adminConsolesLock) {
if (adminConsoles.contains(theId)) {
removedConsole = true;
Set<InternalDistributedMember> tmp = new HashSet<>(adminConsoles);
if (tmp.isEmpty()) {
tmp = Collections.emptySet();
} else {
tmp = Collections.unmodifiableSet(tmp);
adminConsoles = tmp;
if (removedMember) {
for (MembershipListener listener : allMembershipListeners) {
listener.memberDeparted(this, theId, crashed);
if (removedConsole) {
String msg;
if (crashed) {
msg = "Administration member at {} crashed: {}";
} else {
msg = "Administration member at {} closed: {}";
}, new Object[] {theId, reason});
void shutdownMessageReceived(InternalDistributedMember theId, String reason) {
membershipManager.shutdownMessageReceived(theId, reason);
handleManagerDeparture(theId, false,
"shutdown message received");
public void handleManagerDeparture(InternalDistributedMember theId, boolean p_crashed,
String p_reason) {
int vmType = theId.getVmKind();
if (vmType == ADMIN_ONLY_DM_TYPE) {
removeUnfinishedStartup(theId, true);
handleConsoleShutdown(theId, p_crashed, p_reason);
// not an admin VM...
if (!isCurrentMember(theId)) {
return; // fault tolerance
removeUnfinishedStartup(theId, true);
if (removeManager(theId, p_crashed, p_reason)) {
if (theId.getVmKind() != ClusterDistributionManager.LOCATOR_DM_TYPE) {
String msg;
if (p_crashed && !shouldInhibitMembershipWarnings()) {
msg =
"Member at {} unexpectedly left the distributed cache: {}";
addMemberEvent(new MemberCrashedEvent(theId, p_reason));
} else {
msg =
"Member at {} gracefully left the distributed cache: {}";
addMemberEvent(new MemberDepartedEvent(theId, p_reason));
}, new Object[] {theId, prettifyReason(p_reason)});
// Remove this manager from the serialQueueExecutor.
if (serialQueuedExecutorPool != null) {
private void handleManagerSuspect(InternalDistributedMember suspect,
InternalDistributedMember whoSuspected, String reason) {
if (!isCurrentMember(suspect)) {
return; // fault tolerance
int vmType = suspect.getVmKind();
if (vmType == ADMIN_ONLY_DM_TYPE) {
addMemberEvent(new MemberSuspectEvent(suspect, whoSuspected, reason));
private void handleViewInstalled(NetView view) {
addMemberEvent(new ViewInstalledEvent(view));
private void handleQuorumLost(Set<InternalDistributedMember> failures,
List<InternalDistributedMember> remaining) {
addMemberEvent(new QuorumLostEvent(failures, remaining));
* Sends the shutdown message. Not all DistributionManagers need to do this.
private void sendShutdownMessage() {
if (getDMType() == ADMIN_ONLY_DM_TYPE && Locator.getLocators().size() == 0) {
// [bruce] changed above "if" to have ShutdownMessage sent by locators.
// Otherwise the system can hang because an admin member does not trigger
// member-left notification unless a new view is received showing the departure.
// If two locators are simultaneously shut down this may not occur.
ShutdownMessage m = new ShutdownMessage();
InternalDistributedMember theId = getDistributionManagerId();
Set<InternalDistributedMember> allOthers = new HashSet<>(getViewMembers());
// Address recipient = (Address) m.getRecipient();
if (logger.isTraceEnabled()) {
logger.trace("{} Sending {} to {}", getDistributionManagerId(), m,
try {
// m.resetTimestamp(); // nanotimers across systems don't match
long startTime = DistributionStats.getStatTime();
sendViaMembershipManager(m.getRecipients(), m, this, stats);
if (DistributionStats.enableClockStats) {
stats.incSentMessagesTime(DistributionStats.getStatTime() - startTime);
} catch (CancelException e) {
logger.debug(String.format("CancelException caught sending shutdown: %s", e.getMessage()), e);
} catch (Exception ex2) {
logger.fatal("While sending shutdown message", ex2);
} finally {
// Even if the message wasn't sent, *lie* about it, so that
// everyone believes that message distribution is done.
shutdownMsgSent = true;
* Returns the executor for the given type of processor.
public Executor getExecutor(int processorType, InternalDistributedMember sender) {
switch (processorType) {
return getThreadPool();
return getSerialExecutor(sender);
return viewThread;
return getHighPriorityThreadPool();
return getWaitingThreadPool();
return getPartitionedRegionExcecutor();
return getFunctionExecutor();
throw new InternalGemFireError(String.format("unknown processor type %s",
* Actually does the work of sending a message out over the distribution channel.
* @param message the message to send
* @return list of recipients that did not receive the message because they left the view (null if
* all received it or it was sent to {@link DistributionMessage#ALL_RECIPIENTS}.
* @throws NotSerializableException If <code>message</code> cannot be serialized
Set<InternalDistributedMember> sendOutgoing(DistributionMessage message)
throws NotSerializableException {
long startTime = DistributionStats.getStatTime();
Set<InternalDistributedMember> result =
sendViaMembershipManager(message.getRecipients(), message, this, stats);
long endTime = 0L;
if (DistributionStats.enableClockStats) {
endTime = NanoTimer.getTime();
boolean sentToAll = message.forAll();
if (sentToAll) {
if (DistributionStats.enableClockStats) {
stats.incBroadcastMessagesTime(endTime - startTime);
if (DistributionStats.enableClockStats) {
stats.incSentMessagesTime(endTime - startTime);
stats.incDistributeMessageTime(endTime - message.getTimestamp());
return result;
* @return recipients who did not receive the message
* @throws NotSerializableException If <codE>message</code> cannot be serialized
private Set<InternalDistributedMember> sendMessage(DistributionMessage message)
throws NotSerializableException {
try {
// Verify we're not too far into the shutdown
// avoid race condition during startup
return sendOutgoing(message);
} catch (NotSerializableException | ToDataException | ReenteredConnectException
| InvalidDeltaException | CancelException ex) {
throw ex;
} catch (Exception ex) {
exceptionInThreads = true;
String receiver = "NULL";
if (message != null) {
receiver = message.getRecipientsDescription();
logger.fatal(String.format("While pushing message <%s> to %s", message, receiver), ex);
if (message == null || message.forAll()) {
return null;
return new HashSet<>(Arrays.asList(message.getRecipients()));
* @return list of recipients who did not receive the message because they left the view (null if
* all received it or it was sent to {@link DistributionMessage#ALL_RECIPIENTS}).
* @throws NotSerializableException If content cannot be serialized
private Set<InternalDistributedMember> sendViaMembershipManager(
InternalDistributedMember[] destinations,
DistributionMessage content, ClusterDistributionManager dm, DistributionStats stats)
throws NotSerializableException {
if (membershipManager == null) {
logger.warn("Attempting a send to a disconnected DistributionManager");
if (destinations.length == 1 && destinations[0] == DistributionMessage.ALL_RECIPIENTS)
return null;
HashSet<InternalDistributedMember> result = new HashSet<>();
Collections.addAll(result, destinations);
return result;
return membershipManager.send(destinations, content, stats);
* Schedule a given message appropriately, depending upon its executor kind.
private void scheduleIncomingMessage(DistributionMessage message) {
* Potential race condition between starting up and getting other distribution manager ids -- DM
* will only be initialized upto the point at which it called startThreads
public InternalDistributedMember getElderId() throws DistributedSystemDisconnectedException {
return clusterElderManager.getElderId();
public boolean isElder() {
return clusterElderManager.isElder();
public boolean isLoner() {
return false;
public ElderState getElderState(boolean waitToBecomeElder) throws InterruptedException {
return clusterElderManager.getElderState(waitToBecomeElder);
* Waits until elder if newElder or newElder is no longer a member
* @return true if newElder is the elder; false if it is no longer a member or we are the elder.
public boolean waitForElder(final InternalDistributedMember desiredElder)
throws InterruptedException {
return clusterElderManager.waitForElder(desiredElder);
public ExecutorService getThreadPool() {
return threadPool;
public ExecutorService getHighPriorityThreadPool() {
return highPriorityPool;
public ExecutorService getWaitingThreadPool() {
return waitingPool;
public ExecutorService getPrMetaDataCleanupThreadPool() {
return prMetaDataCleanupThreadPool;
private Executor getPartitionedRegionExcecutor() {
if (partitionedRegionThread != null) {
return partitionedRegionThread;
} else {
return partitionedRegionPool;
public Executor getFunctionExecutor() {
if (functionExecutionThread != null) {
return functionExecutionThread;
} else {
return functionExecutionPool;
private Executor getSerialExecutor(InternalDistributedMember sender) {
return serialQueuedExecutorPool.getThrottledSerialExecutor(sender);
} else {
return serialThread;
/** returns the serialThread's queue if throttling is being used, null if not */
public OverflowQueueWithDMStats<Runnable> getSerialQueue(InternalDistributedMember sender) {
return serialQueuedExecutorPool.getSerialQueue(sender);
} else {
return serialQueue;
public ThreadsMonitoring getThreadMonitoring() {
return threadMonitor;
* Sets the administration agent associated with this distribution manager.
public void setAgent(RemoteGfManagerAgent agent) {
// Don't let the agent be set twice. There should be a one-to-one
// correspondence between admin agent and distribution manager.
if (agent != null) {
if (this.agent != null) {
throw new IllegalStateException(
"There is already an Admin Agent associated with this distribution manager.");
} else {
if (this.agent == null) {
throw new IllegalStateException(
"There was never an Admin Agent associated with this distribution manager.");
this.agent = agent;
* Returns the agent that owns this distribution manager. (in ConsoleDistributionManager)
public RemoteGfManagerAgent getAgent() {
return agent;
* Returns a description of the distribution configuration used for this distribution manager. (in
* ConsoleDistributionManager)
* @return <code>null</code> if no admin {@linkplain #getAgent agent} is associated with this
* distribution manager
public String getDistributionConfigDescription() {
if (agent == null) {
return null;
} else {
return agent.getTransport().toString();
/* -----------------------------Health Monitor------------------------------ */
private final ConcurrentMap<InternalDistributedMember, HealthMonitor> hmMap =
new ConcurrentHashMap<>();
private volatile InternalCache cache;
* Returns the health monitor for this distribution manager and owner.
* @param owner the agent that owns the returned monitor
* @return the health monitor created by the owner; <code>null</code> if the owner has now created
* a monitor.
* @since GemFire 3.5
public HealthMonitor getHealthMonitor(InternalDistributedMember owner) {
return hmMap.get(owner);
* Returns the health monitor for this distribution manager.
* @param owner the agent that owns the created monitor
* @param cfg the configuration to use when creating the monitor
* @since GemFire 3.5
public void createHealthMonitor(InternalDistributedMember owner, GemFireHealthConfig cfg) {
if (closeInProgress) {
final HealthMonitor hm = getHealthMonitor(owner);
if (hm != null) {
HealthMonitorImpl newHm = new HealthMonitorImpl(owner, cfg, this);
hmMap.put(owner, newHm);
* Remove a monitor that was previously created.
* @param owner the agent that owns the monitor to remove
public void removeHealthMonitor(InternalDistributedMember owner, int theId) {
final HealthMonitor hm = getHealthMonitor(owner);
if (hm != null && hm.getId() == theId) {
private void removeAllHealthMonitors() {
Iterator it = hmMap.values().iterator();
while (it.hasNext()) {
HealthMonitor hm = (HealthMonitor);
public Set<InternalDistributedMember> getAdminMemberSet() {
return adminConsoles;
/** Returns count of members filling the specified role */
public int getRoleCount(Role role) {
int count = 0;
Set<InternalDistributedMember> mbrs = getDistributionManagerIds();
for (InternalDistributedMember mbr : mbrs) {
Set<Role> roles = (mbr).getRoles();
for (Role mbrRole : roles) {
if (mbrRole.equals(role)) {
return count;
/** Returns true if at least one member is filling the specified role */
public boolean isRolePresent(Role role) {
Set<InternalDistributedMember> mbrs = getDistributionManagerIds();
for (InternalDistributedMember mbr : mbrs) {
Set<Role> roles = mbr.getRoles();
for (Role mbrRole : roles) {
if ((mbrRole).equals(role)) {
return true;
return false;
/** Returns a set of all roles currently in the distributed system. */
public Set<Role> getAllRoles() {
Set<Role> allRoles = new HashSet<>();
Set<InternalDistributedMember> mbrs = getDistributionManagerIds();
for (InternalDistributedMember mbr : mbrs) {
return allRoles;
* Returns the membership manager for this distributed system. The membership manager owns the
* membership set and handles all communications. The manager should NOT be used to bypass
* DistributionManager to send or receive messages.
* <p>
* This method was added to allow hydra to obtain thread-local data for transport from one thread
* to another.
public MembershipManager getMembershipManager() {
// NOTE: do not add cancellation checks here. This method is
// used during auto-reconnect after the DS has been closed
return membershipManager;
////////////////////// Inner Classes //////////////////////
* This class is used for DM's multi serial executor. The serial messages are managed/executed by
* multiple serial thread. This class takes care of executing messages related to a sender using
* the same thread.
private static class SerialQueuedExecutorPool {
/** To store the serial threads */
final ConcurrentMap<Integer, ExecutorService> serialQueuedExecutorMap =
new ConcurrentHashMap<>(MAX_SERIAL_QUEUE_THREAD);
/** To store the queue associated with thread */
final Map<Integer, OverflowQueueWithDMStats<Runnable>> serialQueuedMap =
/** Holds mapping between sender to the serial thread-id */
final Map<InternalDistributedMember, Integer> senderToSerialQueueIdMap = new HashMap<>();
* Holds info about unused thread, a thread is marked unused when the member associated with it
* has left distribution system.
final ArrayList<Integer> threadMarkedForUse = new ArrayList<>();
final DistributionStats stats;
final boolean throttlingDisabled;
final ThreadsMonitoring threadMonitoring;
SerialQueuedExecutorPool(DistributionStats stats,
boolean throttlingDisabled, ThreadsMonitoring tMonitoring) {
this.stats = stats;
this.throttlingDisabled = throttlingDisabled;
threadMonitoring = tMonitoring;
* Returns an id of the thread in serialQueuedExecutorMap, thats mapped to the given seder.
* @param createNew boolean flag to indicate whether to create a new id, if id doesnot exists.
private Integer getQueueId(InternalDistributedMember sender, boolean createNew) {
// Create a new Id.
Integer queueId;
synchronized (senderToSerialQueueIdMap) {
// Check if there is a executor associated with this sender.
queueId = senderToSerialQueueIdMap.get(sender);
if (!createNew || queueId != null) {
return queueId;
// Create new.
// Check if any threads are availabe that is marked for Use.
if (!threadMarkedForUse.isEmpty()) {
queueId = threadMarkedForUse.remove(0);
// If Map is full, use the threads in round-robin fashion.
if (queueId == null) {
queueId = (serialQueuedExecutorMap.size() + 1) % MAX_SERIAL_QUEUE_THREAD;
senderToSerialQueueIdMap.put(sender, queueId);
return queueId;
* Returns the queue associated with this sender. Used in FlowControl for throttling (based on
* queue size).
OverflowQueueWithDMStats<Runnable> getSerialQueue(InternalDistributedMember sender) {
Integer queueId = getQueueId(sender, false);
if (queueId == null) {
return null;
return serialQueuedMap.get(queueId);
* Returns the serial queue executor, before returning the thread this applies throttling, based
* on the total serial queue size (total - sum of all the serial queue size). The throttling is
* applied during put event, this doesnt block the extract operation on the queue.
ExecutorService getThrottledSerialExecutor(
InternalDistributedMember sender) {
ExecutorService executor = getSerialExecutor(sender);
// Get the total serial queue size.
long totalSerialQueueMemSize = stats.getInternalSerialQueueBytes();
// for tcp socket reader threads, this code throttles the thread
// to keep the sender-side from overwhelming the receiver.
// UDP readers are throttled in the FC protocol, which queries
// the queue to see if it should throttle
if (stats.getInternalSerialQueueBytes() > TOTAL_SERIAL_QUEUE_THROTTLE
&& !DistributionMessage.isPreciousThread()) {
do {
boolean interrupted = Thread.interrupted();
try {
float throttlePercent = (float) (totalSerialQueueMemSize - TOTAL_SERIAL_QUEUE_THROTTLE)
int sleep = (int) (100.0 * throttlePercent);
sleep = Math.max(sleep, 1);
} catch (InterruptedException ex) {
interrupted = true;
// FIXME-InterruptedException
// Perhaps we should return null here?
} finally {
if (interrupted) {
} while (stats.getInternalSerialQueueBytes() >= TOTAL_SERIAL_QUEUE_BYTE_LIMIT);
return executor;
* Returns the serial queue executor for the given sender.
ExecutorService getSerialExecutor(InternalDistributedMember sender) {
ExecutorService executor;
Integer queueId = getQueueId(sender, true);
if ((executor =
serialQueuedExecutorMap.get(queueId)) != null) {
return executor;
// If executor doesn't exists for this sender, create one.
executor = createSerialExecutor(queueId);
serialQueuedExecutorMap.put(queueId, executor);
if (logger.isDebugEnabled()) {
"Created Serial Queued Executor With queueId {}. Total number of live Serial Threads :{}",
queueId, serialQueuedExecutorMap.size());
return executor;
* Creates a serial queue executor.
private ExecutorService createSerialExecutor(final Integer id) {
OverflowQueueWithDMStats<Runnable> poolQueue;
if (SERIAL_QUEUE_BYTE_LIMIT == 0 || throttlingDisabled) {
poolQueue = new OverflowQueueWithDMStats<>(stats.getSerialQueueHelper());
} else {
poolQueue = new ThrottlingMemLinkedQueueWithDMStats<>(SERIAL_QUEUE_BYTE_LIMIT,
serialQueuedMap.put(id, poolQueue);
return LoggingExecutors.newSerialThreadPool("Pooled Serial Message Processor" + id + "-",
thread -> stats.incSerialPooledThreadStarts(), this::doSerialPooledThread,
stats.getSerialPooledProcessorHelper(), threadMonitoring, poolQueue);
private void doSerialPooledThread(Runnable command) {
try {;
} finally {
* Does cleanup relating to this member. And marks the serial executor associated with this
* member for re-use.
private void handleMemberDeparture(InternalDistributedMember member) {
Integer queueId = getQueueId(member, false);
if (queueId == null) {
boolean isUsed = false;
synchronized (senderToSerialQueueIdMap) {
// Check if any other members are using the same executor.
for (Integer value : senderToSerialQueueIdMap.values()) {
if (value.equals(queueId)) {
isUsed = true;
// If not used mark this as unused.
if (!isUsed) {
if (logger.isInfoEnabled(LogMarker.DM_MARKER)),
"Marking the SerialQueuedExecutor with id : {} used by the member {} to be unused.",
new Object[] {queueId, member});
private void awaitTermination(long time, TimeUnit unit) throws InterruptedException {
long remainingNanos = unit.toNanos(time);
long start = System.nanoTime();
for (ExecutorService executor : serialQueuedExecutorMap.values()) {
executor.awaitTermination(remainingNanos, TimeUnit.NANOSECONDS);
remainingNanos = (System.nanoTime() - start);
if (remainingNanos <= 0) {
private void shutdown() {
for (ExecutorService executor : serialQueuedExecutorMap
.values()) {
* This is the listener implementation for responding from events from the Membership Manager.
private class DMListener implements DistributedMembershipListener {
ClusterDistributionManager dm;
DMListener(ClusterDistributionManager dm) { = dm;
public void membershipFailure(String reason, Throwable t) {
exceptionInThreads = true;
rootCause = t;
getSystem().disconnect(reason, true);
public void messageReceived(DistributionMessage message) {
public void newMemberConnected(InternalDistributedMember member) {
// Do not elect the elder here as surprise members invoke this callback
// without holding the view lock. That can cause a race condition and
// subsequent deadlock (#45566). Elder selection is now done when a view
// is installed.
public void memberDeparted(InternalDistributedMember theId, boolean crashed, String reason) {
boolean wasAdmin = getAdminMemberSet().contains(theId);
if (wasAdmin) {
// Pretend we received an AdminConsoleDisconnectMessage from the console that
// is no longer in the JavaGroup view.
// It must have died without sending a ShutdownMessage.
// This fixes bug 28454.
AdminConsoleDisconnectMessage message = new AdminConsoleDisconnectMessage();
message.setIgnoreAlertListenerRemovalFailure(true); // we don't know if it was a listener so
// don't issue a warning
message.setReason(reason); // added for #37950
dm.handleManagerDeparture(theId, crashed, reason);
public void memberSuspect(InternalDistributedMember suspect,
InternalDistributedMember whoSuspected, String reason) {
dm.handleManagerSuspect(suspect, whoSuspected, reason);
public void viewInstalled(NetView view) {
public void quorumLost(Set<InternalDistributedMember> failures,
List<InternalDistributedMember> remaining) {
dm.handleQuorumLost(failures, remaining);
public ClusterDistributionManager getDM() {
return dm;
public void saveConfig() {
if (!getConfig().getDisableAutoReconnect()) {
private abstract static class MemberEvent {
private InternalDistributedMember id;
MemberEvent(InternalDistributedMember id) { = id;
public InternalDistributedMember getId() {
return id;
void handleEvent(ClusterDistributionManager manager) {
handleEvent(manager, manager.membershipListeners.keySet());
handleEvent(manager, manager.allMembershipListeners);
protected abstract void handleEvent(ClusterDistributionManager manager,
MembershipListener listener);
private void handleEvent(ClusterDistributionManager manager,
Set<MembershipListener> membershipListeners) {
for (MembershipListener listener : membershipListeners) {
try {
handleEvent(manager, listener);
} catch (CancelException e) {
if (manager.shouldInhibitMembershipWarnings()) {
if (logger.isTraceEnabled()) {
logger.trace("MemberEventInvoker: cancelled");
} else {
logger.warn("Unexpected cancellation", e);
} catch (VirtualMachineError err) {
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
logger.warn(String.format("Exception while calling membership listener for event: %s",
* This is an event reflecting that a InternalDistributedMember has joined the system.
private static class MemberJoinedEvent extends MemberEvent {
MemberJoinedEvent(InternalDistributedMember id) {
public String toString() {
return "member " + getId() + " joined";
protected void handleEvent(ClusterDistributionManager manager, MembershipListener listener) {
listener.memberJoined(manager, getId());
* This is an event reflecting that a InternalDistributedMember has left the system.
private static class MemberDepartedEvent extends MemberEvent {
String reason;
MemberDepartedEvent(InternalDistributedMember id, String r) {
reason = r;
public String toString() {
return "member " + getId() + " departed (" + reason + ")";
protected void handleEvent(ClusterDistributionManager manager, MembershipListener listener) {
listener.memberDeparted(manager, getId(), false);
* This is an event reflecting that a InternalDistributedMember has left the system in an
* unexpected way.
private static class MemberCrashedEvent extends MemberEvent {
String reason;
MemberCrashedEvent(InternalDistributedMember id, String r) {
reason = r;
public String toString() {
return "member " + getId() + " crashed: " + reason;
protected void handleEvent(ClusterDistributionManager manager, MembershipListener listener) {
listener.memberDeparted(manager, getId(), true/* crashed */);
* This is an event reflecting that a InternalDistributedMember may be missing but has not yet
* left the system.
private static class MemberSuspectEvent extends MemberEvent {
InternalDistributedMember whoSuspected;
String reason;
MemberSuspectEvent(InternalDistributedMember suspect, InternalDistributedMember whoSuspected,
String reason) {
this.whoSuspected = whoSuspected;
this.reason = reason;
public InternalDistributedMember whoSuspected() {
return whoSuspected;
public String getReason() {
return reason;
public String toString() {
return "member " + getId() + " suspected by: " + whoSuspected + " reason: " + reason;
protected void handleEvent(ClusterDistributionManager manager, MembershipListener listener) {
listener.memberSuspect(manager, getId(), whoSuspected(), reason);
private static class ViewInstalledEvent extends MemberEvent {
NetView view;
ViewInstalledEvent(NetView view) {
this.view = view;
public long getViewId() {
return view.getViewId();
public String toString() {
return "view installed: " + view;
public void handleEvent(ClusterDistributionManager manager) {
protected void handleEvent(ClusterDistributionManager manager, MembershipListener listener) {
throw new UnsupportedOperationException();
private static class QuorumLostEvent extends MemberEvent {
Set<InternalDistributedMember> failures;
List<InternalDistributedMember> remaining;
QuorumLostEvent(Set<InternalDistributedMember> failures,
List<InternalDistributedMember> remaining) {
this.failures = failures;
this.remaining = remaining;
public Set<InternalDistributedMember> getFailures() {
return failures;
public List<InternalDistributedMember> getRemaining() {
return remaining;
public String toString() {
return "quorum lost. failures=" + failures + "; remaining=" + remaining;
protected void handleEvent(ClusterDistributionManager manager, MembershipListener listener) {
listener.quorumLost(manager, getFailures(), getRemaining());
* (non-Javadoc)
* @see org.apache.geode.distributed.internal.DM#getRootCause()
public Throwable getRootCause() {
return rootCause;
* (non-Javadoc)
* @see org.apache.geode.distributed.internal.DM#setRootCause(java.lang.Throwable)
public void setRootCause(Throwable t) {
rootCause = t;
* (non-Javadoc)
* @see org.apache.geode.distributed.internal.DM#getMembersOnThisHost()
* @since GemFire 5.9
public Set<InternalDistributedMember> getMembersInThisZone() {
return getMembersInSameZone(getDistributionManagerId());
public Set<InternalDistributedMember> getMembersInSameZone(
InternalDistributedMember targetMember) {
Set<InternalDistributedMember> buddyMembers = new HashSet<>();
if (!redundancyZones.isEmpty()) {
synchronized (redundancyZones) {
String targetZone = redundancyZones.get(targetMember);
for (Map.Entry<InternalDistributedMember, String> entry : redundancyZones.entrySet()) {
if (entry.getValue().equals(targetZone)) {
} else {
Set<InetAddress> targetAddrs = getEquivalents(targetMember.getInetAddress());
for (InternalDistributedMember o : getDistributionManagerIds()) {
if (!Collections.disjoint(targetAddrs, getEquivalents(o.getInetAddress()))) {
return buddyMembers;
public boolean areInSameZone(InternalDistributedMember member1,
InternalDistributedMember member2) {
if (!redundancyZones.isEmpty()) {
String zone1 = redundancyZones.get(member1);
String zone2 = redundancyZones.get(member2);
return zone1 != null && zone1.equals(zone2);
} else {
return areOnEquivalentHost(member1, member2);
public void acquireGIIPermitUninterruptibly() {
public void releaseGIIPermit() {
public void setDistributedSystemId(int distributedSystemId) {
if (distributedSystemId != -1) {
this.distributedSystemId = distributedSystemId;
public int getDistributedSystemId() {
return distributedSystemId;
* this causes the given InternalDistributedMembers to log thread dumps. If useNative is true we
* attempt to use OSProcess native code for the dumps. This goes to stdout instead of the
* system.log files.
public void printStacks(Collection<InternalDistributedMember> ids, boolean useNative) {
Set<InternalDistributedMember> requiresMessage = new HashSet<>();
if (ids.contains(localAddress)) {
OSProcess.printStacks(0, useNative);
if (useNative) {
} else {
for (InternalDistributedMember mbr : ids) {
if (mbr.getProcessId() > 0
&& mbr.getInetAddress().equals(localAddress.getInetAddress())) {
if (!mbr.equals(localAddress)) {
if (!OSProcess.printStacks(mbr.getProcessId(), false)) {
} else {
if (requiresMessage.size() > 0) {
HighPriorityAckedMessage msg = new HighPriorityAckedMessage();
msg.dumpStacks(requiresMessage, useNative, false);
public Set<DistributedMember> getGroupMembers(String group) {
HashSet<DistributedMember> result = null;
for (DistributedMember m : getDistributionManagerIdsIncludingAdmin()) {
if (m.getGroups().contains(group)) {
if (result == null) {
result = new HashSet<>();
if (result == null) {
return Collections.emptySet();
} else {
return result;
public Set<InternalDistributedMember> getNormalDistributionManagerIds() {
// access to members synchronized under membersLock in order to
// ensure serialization
synchronized (membersLock) {
HashSet<InternalDistributedMember> result = new HashSet<>();
for (InternalDistributedMember m : members.keySet()) {
if (m.getVmKind() != ClusterDistributionManager.LOCATOR_DM_TYPE) {
return result;
/** test method to get the member IDs of all locators in the distributed system */
public Set<InternalDistributedMember> getLocatorDistributionManagerIds() {
// access to members synchronized under membersLock in order to
// ensure serialization
synchronized (membersLock) {
HashSet<InternalDistributedMember> result = new HashSet<>();
for (InternalDistributedMember m : members.keySet()) {
if (m.getVmKind() == ClusterDistributionManager.LOCATOR_DM_TYPE) {
return result;
public void setCache(InternalCache instance) {
cache = instance;
public InternalCache getCache() {
return cache;
public InternalCache getExistingCache() {
InternalCache result = cache;
if (result == null) {
throw new CacheClosedException(
"A cache has not yet been created.");
if (result.isClosed()) {
throw result.getCacheClosedException(
"The cache has been closed.", null);
return result;
private static class Stopper extends CancelCriterion {
private ClusterDistributionManager dm;
Stopper(ClusterDistributionManager dm) { = dm;
public String cancelInProgress() {
// remove call to validateDM() to fix bug 38356
if (dm.shutdownMsgSent) {
return String.format("%s: Message distribution has terminated",
if (dm.rootCause != null) {
return dm.toString() + ": " + dm.rootCause.getMessage();
// Nope.
return null;
public RuntimeException generateCancelledException(Throwable e) {
String reason = cancelInProgress();
if (reason == null) {
return null;
Throwable rc = dm.rootCause; // volatile read
if (rc == null) {
// No root cause, specify the one given and be done with it.
return new DistributedSystemDisconnectedException(reason, e);
if (e == null) {
// Caller did not specify any root cause, so just use our own.
return new DistributedSystemDisconnectedException(reason, rc);
// Attempt to stick rootCause at tail end of the exception chain.
Throwable nt = e;
while (nt.getCause() != null) {
nt = nt.getCause();
if (nt == rc) {
// Root cause already in place; we're done
return new DistributedSystemDisconnectedException(reason, e);
try {
return new DistributedSystemDisconnectedException(reason, e);
} catch (IllegalStateException e2) {
// Bug 39496 (Jrockit related) Give up. The following
// error is not entirely sane but gives the correct general picture.
return new DistributedSystemDisconnectedException(reason, rc);
public CancelCriterion getCancelCriterion() {
return stopper;