blob: e64d89d7b4bd246d16a845d7a7564add82d0784a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal;
import java.io.NotSerializableException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.ForcedDisconnectException;
import org.apache.geode.IncompatibleSystemException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.InvalidDeltaException;
import org.apache.geode.SystemConnectException;
import org.apache.geode.SystemFailure;
import org.apache.geode.ToDataException;
import org.apache.geode.admin.GemFireHealthConfig;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.Role;
import org.apache.geode.distributed.internal.locks.ElderState;
import org.apache.geode.distributed.internal.membership.DistributedMembershipListener;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.MemberFactory;
import org.apache.geode.distributed.internal.membership.MembershipManager;
import org.apache.geode.distributed.internal.membership.NetView;
import org.apache.geode.distributed.internal.membership.adapter.auth.GMSAuthenticator;
import org.apache.geode.distributed.internal.membership.gms.messages.ViewAckMessage;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.NanoTimer;
import org.apache.geode.internal.OSProcess;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.admin.remote.AdminConsoleDisconnectMessage;
import org.apache.geode.internal.admin.remote.RemoteGfManagerAgent;
import org.apache.geode.internal.admin.remote.RemoteTransportConfig;
import org.apache.geode.internal.alerting.AlertingService;
import org.apache.geode.internal.cache.InitialImageOperation;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingExecutors;
import org.apache.geode.internal.logging.LoggingThread;
import org.apache.geode.internal.logging.LoggingUncaughtExceptionHandler;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.geode.internal.monitoring.ThreadsMonitoringImpl;
import org.apache.geode.internal.monitoring.ThreadsMonitoringImplDummy;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.sequencelog.MembershipLogger;
import org.apache.geode.internal.tcp.Connection;
import org.apache.geode.internal.tcp.ConnectionTable;
import org.apache.geode.internal.tcp.ReenteredConnectException;
/**
* The <code>DistributionManager</code> uses a {@link MembershipManager} to distribute
* {@link DistributionMessage messages}. It also reports on who is currently in the distributed
* system and tracks the elder member for the distributed lock service. You may also register a
* membership listener with the DistributionManager to receive notification of changes in
* membership.
*
* <P>
*
* Code that wishes to send a {@link DistributionMessage} must get the
* <code>DistributionManager</code> and invoke {@link #putOutgoing}.
*
* <P>
*
* @see DistributionMessage#process
* @see IgnoredByManager
*/
public class ClusterDistributionManager implements DistributionManager {
private static final Logger logger = LogService.getLogger();
/**
 * Debug flag read from the "DistributionManager.DEBUG_NO_ACKNOWLEDGEMENTS" system property
 * ({@link Boolean#getBoolean} semantics: true only when the property equals "true", ignoring case).
 */
private static final boolean DEBUG_NO_ACKNOWLEDGEMENTS =
Boolean.getBoolean("DistributionManager.DEBUG_NO_ACKNOWLEDGEMENTS");
/**
 * Maximum time, in milliseconds, to wait for all threads to exit.
 */
private static final int MAX_STOP_TIME = 20000;
/**
 * Time to sleep, in milliseconds, while polling to see if threads have finished.
 */
private static final int STOP_PAUSE_TIME = 1000;
/**
 * Maximum number of interrupt attempts to stop a thread.
 */
private static final int MAX_STOP_ATTEMPTS = 10;
/**
 * When true ("DistributionManager.syncEvents"), the constructor does not create the background
 * "DM-MemberEventInvoker" thread; membership events are presumably delivered synchronously instead.
 */
private static final boolean SYNC_EVENTS = Boolean.getBoolean("DistributionManager.syncEvents");
/**
 * Flag indicating whether to use a single Serial-Executor thread or multiple Serial-Executor
 * threads. Multiple executors are the default; setting the
 * "DistributionManager.singleSerialExecutor" property to true selects the single executor.
 */
private static final boolean MULTI_SERIAL_EXECUTORS =
!Boolean.getBoolean("DistributionManager.singleSerialExecutor");
/**
 * Maximum threads in the waiting pool. The default, Integer.MAX_VALUE, makes the constructor use a
 * SynchronousQueue hand-off instead of a backlog queue.
 */
private static final int MAX_WAITING_THREADS =
Integer.getInteger("DistributionManager.MAX_WAITING_THREADS", Integer.MAX_VALUE);
/** Maximum threads in the PR meta-data cleanup pool (default 1). */
private static final int MAX_PR_META_DATA_CLEANUP_THREADS =
Integer.getInteger("DistributionManager.MAX_PR_META_DATA_CLEANUP_THREADS", 1);
/** Maximum threads in each of the pooled and high-priority message processor pools. */
public static final int MAX_THREADS =
Integer.getInteger("DistributionManager.MAX_THREADS", 100);
/**
 * Maximum partitioned-region processor threads. A value of 1 or less selects a single serial
 * thread instead of a pool. Defaults to max(4 * available processors, 16).
 */
private static final int MAX_PR_THREADS = Integer.getInteger("DistributionManager.MAX_PR_THREADS",
Math.max(Runtime.getRuntime().availableProcessors() * 4, 16));
/**
 * Maximum function-execution processor threads. A value of 1 or less selects a single serial
 * thread instead of a pool. Defaults to max(4 * available processors, 16).
 */
public static final int MAX_FE_THREADS = Integer.getInteger("DistributionManager.MAX_FE_THREADS",
Math.max(Runtime.getRuntime().availableProcessors() * 4, 16));
/** Queue limit passed to the feed-statistics executors that the constructor creates. */
private static final int INCOMING_QUEUE_LIMIT =
Integer.getInteger("DistributionManager.INCOMING_QUEUE_LIMIT", 80000);
/*
 * Throttling based on the serial queue byte size. Each *_THROTTLE default is THROTTLE_PERCENT of
 * its matching *_LIMIT. Every value can be overridden with the system property
 * "DistributionManager.<NAME>".
 */

/** Fraction of a limit at which throttling starts; the property is a percentage (default 75). */
private static final double THROTTLE_PERCENT =
    Integer.getInteger("DistributionManager.SERIAL_QUEUE_THROTTLE_PERCENT", 75) / 100.0;

/**
 * Byte limit for an individual serial queue (default 40 MB). A value of 0 makes the constructor
 * use a non-throttling queue for the serial executor.
 */
private static final int SERIAL_QUEUE_BYTE_LIMIT =
    Integer.getInteger("DistributionManager.SERIAL_QUEUE_BYTE_LIMIT", 40 * 1024 * 1024);

/** Byte size at which an individual serial queue starts throttling. */
private static final int SERIAL_QUEUE_THROTTLE =
    Integer.getInteger("DistributionManager.SERIAL_QUEUE_THROTTLE",
        (int) (SERIAL_QUEUE_BYTE_LIMIT * THROTTLE_PERCENT));

/** Byte limit across all serial queues (default 80 MB). */
private static final int TOTAL_SERIAL_QUEUE_BYTE_LIMIT =
    Integer.getInteger("DistributionManager.TOTAL_SERIAL_QUEUE_BYTE_LIMIT", 80 * 1024 * 1024);

/**
 * Total byte size at which throttling starts. The default derives from the TOTAL limit, like every
 * other *_THROTTLE constant. Deriving it from the per-queue SERIAL_QUEUE_BYTE_LIMIT (a copy-paste
 * slip) started throttling the 80 MB total queue at only 30 MB.
 */
private static final int TOTAL_SERIAL_QUEUE_THROTTLE =
    Integer.getInteger("DistributionManager.TOTAL_SERIAL_QUEUE_THROTTLE",
        (int) (TOTAL_SERIAL_QUEUE_BYTE_LIMIT * THROTTLE_PERCENT));

/* Throttling based on the serial queue item count. */

/** Maximum number of items in a serial queue (default 20000). */
private static final int SERIAL_QUEUE_SIZE_LIMIT =
    Integer.getInteger("DistributionManager.SERIAL_QUEUE_SIZE_LIMIT", 20000);

/** Item count at which a serial queue starts throttling. */
private static final int SERIAL_QUEUE_SIZE_THROTTLE =
    Integer.getInteger("DistributionManager.SERIAL_QUEUE_SIZE_THROTTLE",
        (int) (SERIAL_QUEUE_SIZE_LIMIT * THROTTLE_PERCENT));

/** Max number of serial queue executors, in case of the multi-serial-queue executor. */
private static final int MAX_SERIAL_QUEUE_THREAD =
    Integer.getInteger("DistributionManager.MAX_SERIAL_QUEUE_THREAD", 20);
/** Thread-name prefix used for the function execution processor executors. */
static final String FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX =
"Function Execution Processor";
/** The DM type for regular distribution managers */
public static final int NORMAL_DM_TYPE = 10;
/** The DM type for locator distribution managers */
public static final int LOCATOR_DM_TYPE = 11;
/** The DM type for Console (admin-only) distribution managers */
public static final int ADMIN_ONLY_DM_TYPE = 12;
/** The DM type for stand-alone members */
public static final int LONER_DM_TYPE = 13;
/**
 * Executor id for pooled (standard) message processing.
 *
 * @see org.apache.geode.distributed.internal.PooledDistributionMessage
 */
public static final int STANDARD_EXECUTOR = 73;
/**
 * Executor id for serial, ordered message processing.
 *
 * @see org.apache.geode.distributed.internal.SerialDistributionMessage
 */
public static final int SERIAL_EXECUTOR = 74;
/**
 * Executor id for high-priority message processing.
 *
 * @see org.apache.geode.distributed.internal.HighPriorityDistributionMessage
 */
public static final int HIGH_PRIORITY_EXECUTOR = 75;
// 76 not in use
/**
 * Executor id for messages that may have to wait on something (the waiting pool).
 *
 * @see org.apache.geode.internal.cache.InitialImageOperation
 */
public static final int WAITING_POOL_EXECUTOR = 77;
/**
 * Executor id for partitioned-region messages.
 * NOTE(review): this @see duplicates WAITING_POOL_EXECUTOR's and may be a copy-paste; confirm.
 *
 * @see org.apache.geode.internal.cache.InitialImageOperation
 */
public static final int PARTITIONED_REGION_EXECUTOR = 78;
/**
 * Executor for view related messages
 *
 * @see ViewAckMessage
 */
public static final int VIEW_EXECUTOR = 79;
/** Executor id presumably used for region function execution messages; confirm at call sites. */
public static final int REGION_FUNCTION_EXECUTION_EXECUTOR = 80;
/**
 * Is this node running an AdminDistributedSystem? When true, create() chooses
 * ADMIN_ONLY_DM_TYPE (unless the locator DM type is forced).
 */
@MakeNotStatic
private static volatile boolean isDedicatedAdminVM = false;
/**
 * Per-thread marker set by setIsStartupThread() on the thread that constructs this manager; null
 * on threads where it was never set.
 */
@MakeNotStatic
private static final ThreadLocal<Boolean> isStartupThread = new ThreadLocal<>();
/**
 * Identifier for function execution threads and any of their children. The value is inheritable:
 * threads created by a marked thread start out marked. Unmarked threads see FALSE.
 */
@MakeNotStatic()
private static final InheritableThreadLocal<Boolean> isFunctionExecutionThread =
new InheritableThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return Boolean.FALSE;
}
};
///////////////////// Instance Fields //////////////////////
/** Stopper created for this manager (presumably backs getCancelCriterion(); confirm). */
private final Stopper stopper = new Stopper(this);
/** The id of this distribution manager; taken from the membership manager's local member. */
private final InternalDistributedMember localAddress;
/**
 * The distribution manager type of this dm; set in its constructor from the transport's VM kind.
 */
private final int dmType;
/**
 * The <code>MembershipListener</code>s that are registered on this manager.
 */
private final ConcurrentMap<MembershipListener, Boolean> membershipListeners;
/** Elder-member manager created for (and holding a reference to) this distribution manager. */
private final ClusterElderManager clusterElderManager = new ClusterElderManager(this);
/**
 * The <code>MembershipListener</code>s that are registered on this manager for ALL members.
 *
 * @since GemFire 5.7
 */
private volatile Set<MembershipListener> allMembershipListeners = Collections.emptySet();
/**
 * A lock to hold while adding and removing all membership listeners.
 *
 * @since GemFire 5.7
 */
private final Object allMembershipListenersLock = new Object();
/** A queue of MemberEvent instances (presumably drained by memberEventThread). */
private final BlockingQueue<MemberEvent> membershipEventQueue = new LinkedBlockingQueue<>();
/**
 * Used to invoke registered membership listeners in the background. Left null when SYNC_EVENTS is
 * set, because the constructor only creates it when SYNC_EVENTS is false.
 */
private Thread memberEventThread;
/** A brief description of this DistributionManager, including its member id and start time. */
protected final String description;
/** Statistics about distribution */
protected final DistributionStats stats;
/** Did an exception occur in one of the DM threads? */
private boolean exceptionInThreads;
/** Whether a shutdown message has been sent (per its name; writers are outside this view). */
private volatile boolean shutdownMsgSent = false;
/** Set to true when this manager is being shutdown */
private volatile boolean closeInProgress = false;
/** Whether a startup response was received (per its name; writers are outside this view). */
private volatile boolean receivedStartupResponse = false;
/** Rejection message received during startup, if any (per its name; confirm). */
private volatile String rejectionMessage = null;
/** Membership manager created by the constructor via MemberFactory. */
private MembershipManager membershipManager;
/**
 * The (non-admin-only) members of the distributed system. This is a map of memberid->memberid for
 * fast access to canonical ID references. All accesses to this field must be synchronized on
 * {@link #membersLock}.
 */
private Map<InternalDistributedMember, InternalDistributedMember> members =
Collections.emptyMap();
/**
 * All (admin and non-admin) members of the distributed system. All accesses to this field must be
 * synchronized on {@link #membersLock}.
 */
private Set<InternalDistributedMember> membersAndAdmin = Collections.emptySet();
/**
 * Map of all locator members of the distributed system. The value is a collection of locator
 * strings that are hosted in that member. All accesses to this field must be synchronized on
 * {@link #membersLock}.
 */
private Map<InternalDistributedMember, Collection<String>> hostedLocatorsAll =
Collections.emptyMap();
/**
 * Map of all locator members of the distributed system which have the shared configuration. The
 * value is a collection of locator strings that are hosted in that member. All accesses to this
 * field must be synchronized on {@link #membersLock}.
 */
private Map<InternalDistributedMember, Collection<String>> hostedLocatorsWithSharedConfiguration =
Collections.emptyMap();
/**
 * The lock held while accessing the field references to the following:<br>
 * 1) {@link #members}<br>
 * 2) {@link #membersAndAdmin}<br>
 * 3) {@link #hostedLocatorsAll}<br>
 * 4) {@link #hostedLocatorsWithSharedConfiguration}<br>
 */
private final Object membersLock = new Object();
/**
 * The lock held while writing {@link #adminConsoles}.
 */
private final Object adminConsolesLock = new Object();
/**
 * The ids of all known admin consoles. Uses copy-on-write: writers must sync on adminConsolesLock;
 * readers don't need to sync.
 */
private volatile Set<InternalDistributedMember> adminConsoles = Collections.emptySet();
/** Message processing thread pool */
private ExecutorService threadPool;
/**
 * High Priority processing thread pool, used for initializing messages such as UpdateAttributes
 * and CreateRegion messages
 */
private ExecutorService highPriorityPool;
/**
 * Waiting Pool, used for messages that may have to wait on something. Use this separate pool with
 * an unbounded queue so that waiting runnables don't get in the way of other processing threads.
 * Used for threads that will most likely have to wait for a region to be finished initializing
 * before it can proceed
 */
private ExecutorService waitingPool;
/** Pool for PR meta-data cleanup messages; it shares the waiting-pool statistics. */
private ExecutorService prMetaDataCleanupThreadPool;
/**
 * Thread used to decouple {@link org.apache.geode.internal.cache.partitioned.PartitionMessage}s
 * from {@link org.apache.geode.internal.cache.DistributedCacheOperation}s. Only created when
 * MAX_PR_THREADS is 1 or less; otherwise {@link #partitionedRegionPool} is used.
 *
 * @see #SERIAL_EXECUTOR
 */
private ExecutorService partitionedRegionThread;
/** Partitioned-region processor pool; only created when MAX_PR_THREADS is greater than 1. */
private ExecutorService partitionedRegionPool;
/**
 * Function Execution executors: the single thread is created when MAX_FE_THREADS is 1 or less,
 * otherwise the pool is created.
 */
private ExecutorService functionExecutionThread;
private ExecutorService functionExecutionPool;
/** Message processing executor for serial, ordered, messages. */
private ExecutorService serialThread;
/**
 * Message processing executor for view messages
 *
 * @see ViewAckMessage
 */
private ExecutorService viewThread;
/**
 * If using a throttling queue for the serialThread, we cache the queue here so we can see if
 * delivery would block. Stays null when SERIAL_QUEUE_BYTE_LIMIT is 0.
 */
private ThrottlingMemLinkedQueueWithDMStats<Runnable> serialQueue;
/**
 * Thread Monitor mechanism to monitor system threads
 *
 * @see org.apache.geode.internal.monitoring.ThreadsMonitoring
 */
private final ThreadsMonitoring threadMonitor;
/**
 * Whether this DM is ready to process incoming messages (per its name; writers are outside this
 * view).
 */
private volatile boolean readyForMessages = false;
/**
 * Set to true once this DM is ready to send messages. Note that it is always ready to send the
 * startup message.
 */
private volatile boolean readyToSendMsgs = false;
private final Object readyToSendMsgsLock = new Object();
/** Is this distribution manager closed? */
private volatile boolean closed = false;
/**
 * The distributed system to which this distribution manager is connected.
 */
private InternalDistributedSystem system;
/** The remote transport configuration for this dm */
private RemoteTransportConfig transport;
/**
 * The administration agent associated with this distribution manager.
 */
private volatile RemoteGfManagerAgent agent;
/** Multi-serial executor pool; only created when MULTI_SERIAL_EXECUTORS is true. */
private SerialQueuedExecutorPool serialQueuedExecutorPool;
/**
 * Limits parallel GII operations to InitialImageOperation.MAX_PARALLEL_GIIS permits.
 * TODO why does the distribution manager arbitrate GII operations? That should be a Cache
 * function
 */
private final Semaphore parallelGIIs = new Semaphore(InitialImageOperation.MAX_PARALLEL_GIIS);
/**
 * Map of InetAddress to HashSets of InetAddress, to define equivalences between network interface
 * cards and hosts. Guarded by synchronizing on the map itself.
 */
private final HashMap<InetAddress, Set<InetAddress>> equivalentHosts = new HashMap<>();
/** Distributed-system id, read from the system configuration in the constructor. */
private int distributedSystemId;
private final Map<InternalDistributedMember, String> redundancyZones =
Collections.synchronizedMap(new HashMap<>());
private boolean enforceUniqueZone = false;
/**
 * root cause of forcibly shutting down the distribution manager
 */
private volatile Throwable rootCause = null;
/**
 * Mutex associated with shutdown (per its name; users are outside this view).
 *
 * @see #closeInProgress
 */
private final Object shutdownMutex = new Object();
/** Alerting service supplied at construction. */
private final AlertingService alertingService;
////////////////////// Static Methods //////////////////////
/**
 * Reports whether the calling thread is marked as a function execution thread. The mark is either
 * set directly or inherited from the thread that created this one.
 *
 * @return {@code Boolean.TRUE} for marked threads; otherwise the initial value
 *         {@code Boolean.FALSE}
 */
public static Boolean isFunctionExecutionThread() {
  final Boolean marker = ClusterDistributionManager.isFunctionExecutionThread.get();
  return marker;
}
/**
 * Creates a new distribution manager and discovers the other members of the distributed system.
 * Note that it does not check to see whether or not this VM already has a distribution manager.
 *
 * <p>
 * The DM type is chosen in this order:
 * <ul>
 * <li>locator, if the InternalLocator.FORCE_LOCATOR_DM_TYPE property is true;</li>
 * <li>admin-only, if {@link #isDedicatedAdminVM()} is true;</li>
 * <li>normal, otherwise.</li>
 * </ul>
 * The manager is then constructed, which joins the cluster. The join is rejected if an
 * earlier-joined view member uses this member's non-empty name and verifyMember confirms that
 * member. Next, this member is added, the startup message is sent, and the manager is marked
 * ready to send messages. If a RuntimeException escapes, the partially started manager is
 * cleaned up with uncleanShutdown before the exception is rethrown.
 *
 * @param system The distributed system to which this distribution manager will send messages.
 * @return the started distribution manager
 * @throws IncompatibleSystemException if another member already uses this member's name, or if
 *         the startup exchange reports an incompatibility
 * @throws InternalGemFireException if interrupted while waiting for startup responses
 */
static ClusterDistributionManager create(InternalDistributedSystem system) {
ClusterDistributionManager distributionManager = null;
// Tells uncleanShutdown whether the failure happened before the membership join completed.
boolean beforeJoined = true;
try {
int vmKind;
if (Boolean.getBoolean(InternalLocator.FORCE_LOCATOR_DM_TYPE)) {
// if this DM is starting for a locator, set it to be a locator DM
vmKind = LOCATOR_DM_TYPE;
} else if (isDedicatedAdminVM()) {
vmKind = ADMIN_ONLY_DM_TYPE;
} else {
vmKind = NORMAL_DM_TYPE;
}
RemoteTransportConfig transport = new RemoteTransportConfig(system.getConfig(), vmKind);
transport.setIsReconnectingDS(system.isReconnectingDS());
transport.setOldDSMembershipInfo(system.oldDSMembershipInfo());
long start = System.currentTimeMillis();
distributionManager =
new ClusterDistributionManager(system, transport, system.getAlertingService());
distributionManager.assertDistributionManagerType();
beforeJoined = false; // we have now joined the system
{
// Enforce unique member names: compare against members that joined before us.
InternalDistributedMember id = distributionManager.getDistributionManagerId();
if (!"".equals(id.getName())) {
for (InternalDistributedMember m : distributionManager
.getViewMembers()) {
if (m.equals(id)) {
// I'm counting on the members returned by getViewMembers being ordered such that
// members that joined before us will precede us AND members that join after us
// will succeed us.
// SO once we find ourselves break out of this loop.
break;
}
if (id.getName().equals(m.getName())) {
if (distributionManager.getMembershipManager().verifyMember(m,
"member is using the name of " + id)) {
throw new IncompatibleSystemException("Member " + id
+ " could not join this distributed system because the existing member " + m
+ " used the same name. Set the \"name\" gemfire property to a unique value.");
}
}
}
}
distributionManager.addNewMember(id); // add ourselves
}
// Send out a StartupMessage to the other members.
StartupOperation op = new StartupOperation(distributionManager, transport);
try {
if (!distributionManager.sendStartupMessage(op)) {
// Well we didn't hear back from anyone else. We assume that
// we're the first one.
if (distributionManager.getOtherDistributionManagerIds().size() == 0) {
logger.info("Did not hear back from any other system. I am the first one.");
} else if (transport.isMcastEnabled()) {
// perform a multicast ping test
if (!distributionManager.testMulticast()) {
logger.warn(
"Did not receive a startup response but other members exist. Multicast does not seem to be working.");
}
}
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
// This is ALWAYS bad; don't consult a CancelCriterion.
throw new InternalGemFireException(
"Interrupted while waiting for first StartupResponseMessage",
ex);
} catch (IncompatibleSystemException ex) {
logger.fatal(ex.getMessage(), ex);
throw ex;
} finally {
// Whatever the outcome of the startup exchange, allow normal message sending.
distributionManager.readyToSendMsgs();
}
if (logger.isInfoEnabled()) {
long delta = System.currentTimeMillis() - start;
logger.info(LogMarker.DM_MARKER,
"DistributionManager {} started on {}. There were {} other DMs. others: {} {} {}",
distributionManager.getDistributionManagerId(), transport,
distributionManager.getOtherDistributionManagerIds().size(),
distributionManager.getOtherDistributionManagerIds(),
(logger.isInfoEnabled(LogMarker.DM_MARKER) ? " (took " + delta + " ms)" : ""),
((distributionManager.getDMType() == ADMIN_ONLY_DM_TYPE) ? " (admin only)"
: (distributionManager.getDMType() == LOCATOR_DM_TYPE) ? " (locator)" : ""));
// NOTE(review): the membership startup log entry is only written when INFO logging is
// enabled; confirm that gating is intended.
MembershipLogger.logStartup(distributionManager.getDistributionManagerId());
}
return distributionManager;
} catch (RuntimeException r) {
if (distributionManager != null) {
if (logger.isDebugEnabled()) {
logger.debug("cleaning up incompletely started DistributionManager due to exception", r);
}
distributionManager.uncleanShutdown(beforeJoined);
}
throw r;
}
}
/////////////////////// Constructors ///////////////////////
/**
 * Creates a new <code>DistributionManager</code> by initializing itself, creating the membership
 * manager and executors.
 *
 * <p>
 * Builds statistics, the thread monitor and every message-processing executor. It then connects
 * to the cluster through {@link MemberFactory#newMembershipManager} and calls
 * {@code postConnect()}. It does not start this manager's threads or membership event
 * processing. If construction does not complete, {@code askThreadsToStop()} is invoked (fix for
 * bug 42039), presumably to shut down executors that were already created.
 *
 * @param transport The configuration for the communications transport
 * @param system the distributed system this manager belongs to; its configuration supplies the
 *        distributed-system id and the time-statistics, thread-monitor and disable-tcp settings
 * @param alertingService the alerting service associated with this manager
 */
private ClusterDistributionManager(RemoteTransportConfig transport,
    InternalDistributedSystem system, AlertingService alertingService) {
  this.system = system;
  this.transport = transport;
  this.alertingService = alertingService;
  dmType = transport.getVmKind();
  membershipListeners = new ConcurrentHashMap<>();
  distributedSystemId = system.getConfig().getDistributedSystemId();
  long statId = OSProcess.getId();
  stats = new DistributionStats(system, statId);
  DistributionStats.enableClockStats = system.getConfig().getEnableTimeStatistics();
  exceptionInThreads = false;

  // Read the thread-monitor switch from the system's own configuration, like the other settings
  // above. A DistributionConfigImpl built from empty Properties would not see settings that
  // exist only in system.getConfig().
  if (system.getConfig().getThreadMonitorEnabled()) {
    threadMonitor = new ThreadsMonitoringImpl(system);
    logger.info("[ThreadsMonitor] a New Monitor object and process were created.\n");
  } else {
    threadMonitor = new ThreadsMonitoringImplDummy();
    logger.info("[ThreadsMonitor] Monitoring is disabled and will not be run.\n");
  }

  boolean finishedConstructor = false;
  try {
    if (MULTI_SERIAL_EXECUTORS) {
      if (logger.isInfoEnabled(LogMarker.DM_MARKER)) {
        logger.info(LogMarker.DM_MARKER,
            "Serial Queue info :" + " THROTTLE_PERCENT: " + THROTTLE_PERCENT
                + " SERIAL_QUEUE_BYTE_LIMIT :" + SERIAL_QUEUE_BYTE_LIMIT
                + " SERIAL_QUEUE_THROTTLE :" + SERIAL_QUEUE_THROTTLE
                + " TOTAL_SERIAL_QUEUE_BYTE_LIMIT :" + TOTAL_SERIAL_QUEUE_BYTE_LIMIT
                + " TOTAL_SERIAL_QUEUE_THROTTLE :" + TOTAL_SERIAL_QUEUE_THROTTLE
                + " SERIAL_QUEUE_SIZE_LIMIT :" + SERIAL_QUEUE_SIZE_LIMIT
                + " SERIAL_QUEUE_SIZE_THROTTLE :" + SERIAL_QUEUE_SIZE_THROTTLE);
      }
      // when TCP/IP is disabled we can't throttle the serial queue or we run the risk of
      // distributed deadlock when we block the UDP reader thread
      boolean throttlingDisabled = system.getConfig().getDisableTcp();
      serialQueuedExecutorPool =
          new SerialQueuedExecutorPool(stats, throttlingDisabled, threadMonitor);
    }

    {
      // A byte limit of 0 selects a plain overflow queue; otherwise cache the throttling queue
      // in serialQueue so callers can check whether delivery would block.
      BlockingQueue<Runnable> poolQueue;
      if (SERIAL_QUEUE_BYTE_LIMIT == 0) {
        poolQueue = new OverflowQueueWithDMStats<>(stats.getSerialQueueHelper());
      } else {
        serialQueue =
            new ThrottlingMemLinkedQueueWithDMStats<>(TOTAL_SERIAL_QUEUE_BYTE_LIMIT,
                TOTAL_SERIAL_QUEUE_THROTTLE, SERIAL_QUEUE_SIZE_LIMIT, SERIAL_QUEUE_SIZE_THROTTLE,
                stats.getSerialQueueHelper());
        poolQueue = serialQueue;
      }
      serialThread = LoggingExecutors.newSerialThreadPool("Serial Message Processor",
          thread -> stats.incSerialThreadStarts(),
          this::doSerialThread, stats.getSerialProcessorHelper(),
          threadMonitor, poolQueue);
    }

    viewThread =
        LoggingExecutors.newSerialThreadPoolWithUnlimitedFeed("View Message Processor",
            thread -> stats.incViewThreadStarts(), this::doViewThread,
            stats.getViewProcessorHelper(), threadMonitor);

    threadPool =
        LoggingExecutors.newThreadPoolWithFeedStatistics("Pooled Message Processor ",
            thread -> stats.incProcessingThreadStarts(), this::doProcessingThread,
            MAX_THREADS, stats.getNormalPoolHelper(), threadMonitor,
            INCOMING_QUEUE_LIMIT, stats.getOverflowQueueHelper());

    highPriorityPool = LoggingExecutors.newThreadPoolWithFeedStatistics(
        "Pooled High Priority Message Processor ",
        thread -> stats.incHighPriorityThreadStarts(), this::doHighPriorityThread,
        MAX_THREADS, stats.getHighPriorityPoolHelper(), threadMonitor,
        INCOMING_QUEUE_LIMIT, stats.getHighPriorityQueueHelper());

    {
      BlockingQueue<Runnable> poolQueue;
      if (MAX_WAITING_THREADS == Integer.MAX_VALUE) {
        // no need for a queue since we have infinite threads
        poolQueue = new SynchronousQueue<>();
      } else {
        poolQueue = new OverflowQueueWithDMStats<>(stats.getWaitingQueueHelper());
      }
      waitingPool = LoggingExecutors.newThreadPool("Pooled Waiting Message Processor ",
          thread -> stats.incWaitingThreadStarts(), this::doWaitingThread,
          MAX_WAITING_THREADS, stats.getWaitingPoolHelper(), threadMonitor, poolQueue);
    }

    // should this pool use the waiting pool stats?
    prMetaDataCleanupThreadPool =
        LoggingExecutors.newThreadPoolWithFeedStatistics("PrMetaData cleanup Message Processor ",
            thread -> stats.incWaitingThreadStarts(), this::doWaitingThread,
            MAX_PR_META_DATA_CLEANUP_THREADS, stats.getWaitingPoolHelper(), threadMonitor,
            0, stats.getWaitingQueueHelper());

    if (MAX_PR_THREADS > 1) {
      partitionedRegionPool =
          LoggingExecutors.newThreadPoolWithFeedStatistics("PartitionedRegion Message Processor",
              thread -> stats.incPartitionedRegionThreadStarts(), this::doPartitionRegionThread,
              MAX_PR_THREADS, stats.getPartitionedRegionPoolHelper(), threadMonitor,
              INCOMING_QUEUE_LIMIT, stats.getPartitionedRegionQueueHelper());
    } else {
      partitionedRegionThread = LoggingExecutors.newSerialThreadPoolWithFeedStatistics(
          "PartitionedRegion Message Processor",
          thread -> stats.incPartitionedRegionThreadStarts(), this::doPartitionRegionThread,
          stats.getPartitionedRegionPoolHelper(), threadMonitor,
          INCOMING_QUEUE_LIMIT, stats.getPartitionedRegionQueueHelper());
    }

    if (MAX_FE_THREADS > 1) {
      functionExecutionPool =
          LoggingExecutors.newFunctionThreadPoolWithFeedStatistics(
              FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX,
              thread -> stats.incFunctionExecutionThreadStarts(), this::doFunctionExecutionThread,
              MAX_FE_THREADS, stats.getFunctionExecutionPoolHelper(), threadMonitor,
              INCOMING_QUEUE_LIMIT, stats.getFunctionExecutionQueueHelper());
    } else {
      functionExecutionThread =
          LoggingExecutors.newSerialThreadPoolWithFeedStatistics(
              FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX,
              thread -> stats.incFunctionExecutionThreadStarts(), this::doFunctionExecutionThread,
              stats.getFunctionExecutionPoolHelper(), threadMonitor,
              INCOMING_QUEUE_LIMIT, stats.getFunctionExecutionQueueHelper());
    }

    if (!SYNC_EVENTS) {
      memberEventThread =
          new LoggingThread("DM-MemberEventInvoker", new MemberEventInvoker());
    }

    StringBuilder sb = new StringBuilder(" (took ");

    // connect to the cluster
    long start = System.currentTimeMillis();
    DMListener l = new DMListener(this);
    membershipManager = MemberFactory.newMembershipManager(l, transport,
        stats,
        new GMSAuthenticator(system.getSecurityProperties(), system.getSecurityService(),
            system.getSecurityLogWriter(), system.getInternalLogWriter()),
        system.getConfig());

    sb.append(System.currentTimeMillis() - start);

    localAddress = membershipManager.getLocalMember();

    membershipManager.postConnect();

    sb.append(" ms)");

    logger.info("Starting DistributionManager {}. {}", localAddress,
        logger.isInfoEnabled(LogMarker.DM_MARKER) ? sb.toString() : "");

    description = "Distribution manager on " + localAddress + " started at "
        + (new Date(System.currentTimeMillis())).toString();

    finishedConstructor = true;
  } finally {
    if (!finishedConstructor) {
      askThreadsToStop(); // fix for bug 42039
    }
  }
}
/**
 * Thread-body wrapper for the function execution executors. Before running {@code command} it
 * increments the function-execution thread stat and marks the thread through the inheritable
 * {@code isFunctionExecutionThread} flag. The stat is decremented when the thread body ends.
 */
private void doFunctionExecutionThread(Runnable command) {
  runWithReaderThreadSetup(() -> {
    stats.incFunctionExecutionThreads(1);
    isFunctionExecutionThread.set(Boolean.TRUE);
  }, () -> stats.incFunctionExecutionThreads(-1), command);
}

/** Thread-body wrapper for the pooled message processor; brackets it with the processing stat. */
private void doProcessingThread(Runnable command) {
  runWithReaderThreadSetup(() -> stats.incNumProcessingThreads(1),
      () -> stats.incNumProcessingThreads(-1), command);
}

/** Thread-body wrapper for the high-priority pool; brackets it with the high-priority stat. */
private void doHighPriorityThread(Runnable command) {
  runWithReaderThreadSetup(() -> stats.incHighPriorityThreads(1),
      () -> stats.incHighPriorityThreads(-1), command);
}

/** Thread-body wrapper for the waiting and PR meta-data cleanup pools; uses the waiting stat. */
private void doWaitingThread(Runnable command) {
  runWithReaderThreadSetup(() -> stats.incWaitingThreads(1),
      () -> stats.incWaitingThreads(-1), command);
}

/** Thread-body wrapper for the partitioned-region executors; uses the partitioned-region stat. */
private void doPartitionRegionThread(Runnable command) {
  runWithReaderThreadSetup(() -> stats.incPartitionedRegionThreads(1),
      () -> stats.incPartitionedRegionThreads(-1), command);
}

/** Thread-body wrapper for the view message executor; brackets it with the view-thread stat. */
private void doViewThread(Runnable command) {
  runWithReaderThreadSetup(() -> stats.incNumViewThreads(1),
      () -> stats.incNumViewThreads(-1), command);
}

/** Thread-body wrapper for the serial message executor; brackets it with the serial stat. */
private void doSerialThread(Runnable command) {
  runWithReaderThreadSetup(() -> stats.incNumSerialThreads(1),
      () -> stats.incNumSerialThreads(-1), command);
}

/**
 * Common body shared by every message-processing executor thread. It runs {@code onStart}. Then,
 * inside a try block, it calls {@link ConnectionTable#threadWantsSharedResources()} and
 * {@link Connection#makeReaderThread()}, and runs {@code command} through
 * {@link #runUntilShutdown(Runnable)}. The finally block always calls
 * {@link ConnectionTable#releaseThreadsSockets()} followed by {@code onFinish}.
 *
 * @param onStart per-executor bookkeeping run before the try block (e.g. a stats increment); if
 *        it throws, neither {@code command} nor {@code onFinish} runs
 * @param onFinish per-executor bookkeeping run last in the finally block (e.g. a stats decrement)
 * @param command the executor's thread body
 */
private void runWithReaderThreadSetup(Runnable onStart, Runnable onFinish, Runnable command) {
  onStart.run();
  try {
    ConnectionTable.threadWantsSharedResources();
    Connection.makeReaderThread();
    runUntilShutdown(command);
  } finally {
    ConnectionTable.releaseThreadsSockets();
    onFinish.run();
  }
}
/**
 * Creates a new distribution manager.
 *
 * <p>
 * Delegates to the constructor that builds the executors and connects to the cluster. Then it:
 * <ol>
 * <li>marks the calling thread as the startup thread;</li>
 * <li>starts this manager's threads;</li>
 * <li>enables membership event processing;</li>
 * <li>blocks until waitForEventProcessing() returns.</li>
 * </ol>
 * An interrupt during that wait causes a retry. The cancel criterion is checked before every
 * attempt, and the thread's interrupt status is restored afterwards. If construction does not
 * finish, askThreadsToStop() is called.
 *
 * @param system The distributed system to which this distribution manager will send messages.
 * @param transport The configuration for the communications transport
 * @param alertingService the alerting service associated with this manager
 */
private ClusterDistributionManager(InternalDistributedSystem system,
RemoteTransportConfig transport, AlertingService alertingService) {
this(transport, system, alertingService);
boolean finishedConstructor = false;
try {
setIsStartupThread();
startThreads();
// Allow events to start being processed.
membershipManager.startEventProcessing();
for (;;) {
getCancelCriterion().checkCancelInProgress(null);
// Clear any pending interrupt so the wait below can block; remember it for restoring.
boolean interrupted = Thread.interrupted();
try {
membershipManager.waitForEventProcessing();
break;
} catch (InterruptedException e) {
interrupted = true;
} finally {
// Restore interrupt status (also on the successful break path).
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
finishedConstructor = true;
} finally {
if (!finishedConstructor) {
askThreadsToStop(); // fix for bug 42039
}
}
}
/**
 * Reports whether this VM is dedicated to administration (like a GUI console or a JMX agent).
 * When it is, {@link #create} builds {@link #ADMIN_ONLY_DM_TYPE} type distribution managers,
 * unless the locator DM type is forced.
 *
 * @since GemFire 4.0
 */
public static boolean isDedicatedAdminVM() {
  return ClusterDistributionManager.isDedicatedAdminVM;
}

/**
 * Marks or unmarks this VM as dedicated to administration.
 *
 * @param dedicated the new value reported by {@link #isDedicatedAdminVM()}
 */
public static void setIsDedicatedAdminVM(boolean dedicated) {
  isDedicatedAdminVM = dedicated;
}

/**
 * Returns the startup marker of the calling thread.
 *
 * @return {@code Boolean.TRUE} once {@link #setIsStartupThread()} ran on this thread, otherwise
 *         {@code null}
 */
private static Boolean isStartupThread() {
  return ClusterDistributionManager.isStartupThread.get();
}

/** Marks the calling thread as the thread that is starting up this distribution manager. */
private static void setIsStartupThread() {
  isStartupThread.set(true);
}
//////////////////// Instance Methods /////////////////////
/**
 * Runs {@code r}, absorbing failures so that the calling pool thread survives.
 *
 * <ul>
 * <li>{@link CancelException} is expected during shutdown and only traced.</li>
 * <li>{@link VirtualMachineError} is reported to {@link SystemFailure} and rethrown.</li>
 * <li>Any other throwable is logged: at debug level while membership warnings are inhibited
 * (shutdown), otherwise as a warning.</li>
 * </ul>
 */
private void runUntilShutdown(Runnable r) {
  try {
    r.run();
  } catch (CancelException cancelled) {
    if (logger.isTraceEnabled()) {
      logger.trace("Caught shutdown exception", cancelled);
    }
  } catch (VirtualMachineError vmError) {
    SystemFailure.initiateFailure(vmError);
    // If initiateFailure ever returns the JVM is poisoned; don't let this thread continue.
    throw vmError;
  } catch (Throwable failure) {
    SystemFailure.checkFailure();
    if (!shouldInhibitMembershipWarnings()) {
      logger.warn("Task failed with exception", failure);
    } else {
      logger.debug("Caught unusual exception during shutdown: {}", failure.getMessage(), failure);
    }
  }
}
/**
 * Returns true if the two members are on the same equivalent host, judged by the IP addresses
 * collected for all NICs during the exchange of startup messages.
 *
 * @param member1 First member
 * @param member2 Second member
 * @return whether {@code member2}'s address is among {@code member1}'s equivalent addresses
 */
@Override
public boolean areOnEquivalentHost(InternalDistributedMember member1,
    InternalDistributedMember member2) {
  final InetAddress candidate = member2.getInetAddress();
  return getEquivalents(member1.getInetAddress()).contains(candidate);
}
/**
 * Sets the host equivalencies for a given host, overriding any previous entries for these
 * addresses.
 *
 * Every address in {@code equivs} is mapped to the same unmodifiable snapshot of the set. Because
 * it is a snapshot, later changes the caller makes to {@code equivs} do not alter the table.
 *
 * @param equivs set of InetAddresses that all point at the same host
 */
void setEquivalentHosts(Set<InetAddress> equivs) {
  // Built once, outside the loop: all aliases share one immutable snapshot.
  final Set<InetAddress> snapshot = Collections.unmodifiableSet(new HashSet<>(equivs));
  synchronized (equivalentHosts) {
    for (InetAddress address : snapshot) {
      equivalentHosts.put(address, snapshot);
    }
  }
}
/**
 * Returns all of the InetAddresses that are equivalent to the given one (same host).
 *
 * @param in host to match up
 * @return the recorded equivalence set, or a singleton containing only {@code in} when nothing
 *         is recorded for it
 */
@Override
public Set<InetAddress> getEquivalents(InetAddress in) {
  final Set<InetAddress> known;
  synchronized (equivalentHosts) {
    known = equivalentHosts.get(in);
  }
  // DS 11/25/08 - with a VPN the member id carries the VPN address, which doesn't show up in
  // the equivalents; treat such an address as equivalent only to itself.
  return known != null ? known : Collections.singleton(in);
}
/**
 * Records {@code member}'s redundancy zone (ignored when null or empty). For any member other
 * than this one, also logs whether it shares this member's zone.
 *
 * @param member the member whose zone is being recorded
 * @param redundancyZone the zone name; null or "" means no zone
 */
void setRedundancyZone(InternalDistributedMember member, String redundancyZone) {
  if (redundancyZone != null && !redundancyZone.isEmpty()) {
    redundancyZones.put(member, redundancyZone);
  }
  final InternalDistributedMember self = getDistributionManagerId();
  // Compare by value, not by reference: an equal member id obtained from a message is still us.
  if (!Objects.equals(member, self)) {
    final String relationship = areInSameZone(self, member) ? "" : "not ";
    logger.info("Member {} is {} equivalent or in the same redundancy zone.", member,
        relationship);
  }
}
/**
 * Sets the flag indicating that unique zones should be enforced. The flag is sticky: once
 * enforcement is on, passing {@code false} leaves it on.
 *
 * @param enforceUniqueZone whether enforcement is requested
 */
void setEnforceUniqueZone(boolean enforceUniqueZone) {
  if (enforceUniqueZone) {
    this.enforceUniqueZone = true;
  }
}
/** Returns whether unique-zone enforcement has been turned on. */
@Override
public boolean enforceUniqueZone() {
  return this.enforceUniqueZone;
}
/**
 * Returns the redundancy zone recorded for {@code member}.
 *
 * @param member the member to look up
 * @return the zone, or null if none was recorded
 */
public String getRedundancyZone(InternalDistributedMember member) {
  return this.redundancyZones.get(member);
}
/**
 * Asserts that the DM type is one of NORMAL, LONER, ADMIN_ONLY or LOCATOR. Also asserts that the
 * VM kind of this DM's {@link InternalDistributedMember} id equals that type.
 */
private void assertDistributionManagerType() {
  final int theDmType = getDMType();
  final boolean knownType = theDmType == NORMAL_DM_TYPE
      || theDmType == LONER_DM_TYPE
      || theDmType == ADMIN_ONLY_DM_TYPE
      || theDmType == LOCATOR_DM_TYPE;
  if (!knownType) {
    Assert.assertTrue(false, "unknown distribution manager type");
  }
  final int vmKind = getDistributionManagerId().getVmKind();
  if (vmKind != theDmType) {
    Assert.assertTrue(false,
        "InternalDistributedMember has a vmKind of " + vmKind + " instead of " + theDmType);
  }
}
/** Returns this distribution manager's type code. */
@Override
public int getDMType() {
  return this.dmType;
}
/** Returns the members of the membership manager's current view. */
@Override
public List<InternalDistributedMember> getViewMembers() {
  final NetView currentView = this.membershipManager.getView();
  return currentView.getMembers();
}
/** Delegates the multicast self-test to the membership manager. */
private boolean testMulticast() {
  return this.membershipManager.testMulticast();
}
/**
* Need to do this outside the constructor so that the child constructor can finish.
*
* Registers this DM with the system and starts the member-event thread (if any). Then adds every
* member of the membership manager's initial view. Finally, {@code readyForMessages()} is
* submitted to the waiting thread pool.
*
* @throws InternalGemFireException if processing the initial view throws an Exception
*/
private void startThreads() {
system.setDM(this); // fix for bug 33362
if (memberEventThread != null)
memberEventThread.start();
try {
// And the distinguished guests today are...
NetView v = membershipManager.getView();
logger.info("Initial (distribution manager) view, {}",
String.valueOf(v));
// Add them all to our view
for (InternalDistributedMember internalDistributedMember : v.getMembers()) {
addNewMember(internalDistributedMember);
}
} catch (Exception ex) {
throw new InternalGemFireException(
"Could not process initial view",
ex);
}
try {
getWaitingThreadPool().execute(() -> {
// call in background since it might need to send a reply
// and we are not ready to send messages until startup is finished
// The pool thread is marked as a startup thread so waitUntilReadyToSendMsgs lets it send.
setIsStartupThread();
readyForMessages();
});
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
// A failure to schedule readyForMessages is logged but not rethrown.
logger.fatal("Uncaught exception calling readyForMessages", t);
}
}
/**
 * Sets the ready-for-messages flag and wakes every thread blocked in
 * {@code waitUntilReadyForMessages()}. Then asks the membership manager to start event
 * processing.
 */
private void readyForMessages() {
  synchronized (this) {
    this.readyForMessages = true;
    this.notifyAll();
  }
  // Done outside the monitor, after waiters have been released.
  this.membershipManager.startEventProcessing();
}
/**
* Blocks the calling thread until {@code readyForMessages()} has set the ready flag.
*
* The flag is re-checked under the monitor on {@code this}, which {@code readyForMessages()}
* notifies. An interrupt does not end the wait: it is recorded, the stopper is consulted (which
* may throw if cancellation is in progress), and the interrupt status is restored before looping.
*
* NOTE(review): the unsynchronized fast-path read is only safe if {@code readyForMessages} is
* volatile — confirm at its declaration.
*/
private void waitUntilReadyForMessages() {
if (readyForMessages)
return;
synchronized (this) {
while (!readyForMessages) {
stopper.checkCancelInProgress(null);
// Clear the interrupt flag so wait() can block; it is re-asserted in finally.
boolean interrupted = Thread.interrupted();
try {
wait();
} catch (InterruptedException e) {
interrupted = true;
stopper.checkCancelInProgress(e);
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
} // synchronized
}
/**
 * Call when the DM is ready to send messages. Sets the flag and wakes every thread blocked in
 * {@code waitUntilReadyToSendMsgs}.
 */
private void readyToSendMsgs() {
  synchronized (this.readyToSendMsgsLock) {
    this.readyToSendMsgs = true;
    this.readyToSendMsgsLock.notifyAll();
  }
}
/**
 * Returns when the DM is ready to send out messages.
 *
 * The method returns immediately in three cases: the DM is already ready, the message is an
 * {@code AdminMessageType}, or the calling thread is marked as a startup thread. Otherwise it
 * waits on {@code readyToSendMsgsLock}. Interrupts do not end that wait; they are recorded, the
 * stopper is consulted, and the interrupt status is restored.
 *
 * @param msg the message that is currently being sent
 */
private void waitUntilReadyToSendMsgs(DistributionMessage msg) {
  if (readyToSendMsgs) {
    return;
  }
  // another process may have been started in the same view, so we need
  // to be responsive to startup messages and be able to send responses
  if (msg instanceof AdminMessageType) {
    return;
  }
  // Compare by value; the thread-local holds null unless setIsStartupThread() ran on this thread.
  if (Boolean.TRUE.equals(isStartupThread())) {
    // let the startup thread send messages
    // the only case I know of that does this is if we happen to log a
    // message during startup and an alert listener has registered.
    return;
  }
  synchronized (readyToSendMsgsLock) {
    while (!readyToSendMsgs) {
      stopper.checkCancelInProgress(null);
      boolean interrupted = Thread.interrupted();
      try {
        readyToSendMsgsLock.wait();
      } catch (InterruptedException e) {
        interrupted = true;
        stopper.checkCancelInProgress(e);
      } finally {
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
}
/** Delegates to the membership manager to force UDP messaging on the calling thread. */
@Override
public void forceUDPMessagingForCurrentThread() {
  this.membershipManager.forceUDPMessagingForCurrentThread();
}
/** Delegates to the membership manager to release forced UDP messaging on the calling thread. */
@Override
public void releaseUDPMessagingForCurrentThread() {
  this.membershipManager.releaseUDPMessagingForCurrentThread();
}
/**
 * Did an exception occur in one of the threads launched by this distribution manager?
 *
 * @return true if this DM's flag is set or the logging uncaught-exception handler has counted at
 *         least one uncaught exception
 */
@Override
public boolean exceptionInThreads() {
  if (exceptionInThreads) {
    return true;
  }
  return LoggingUncaughtExceptionHandler.getUncaughtExceptionsCount() > 0;
}
/**
 * Resets both signals read by {@link #exceptionInThreads()}: this DM's flag and the logging
 * handler's uncaught-exception count. This method should be used for testing purposes only!
 */
@Override
public void clearExceptionInThreads() {
  this.exceptionInThreads = false;
  LoggingUncaughtExceptionHandler.clearUncaughtExceptionsCount();
}
/**
 * Returns the current "cache time" in milliseconds since the epoch, as reported by the system's
 * clock. Cache time accounts for skew among the local clocks of the machines in the cache.
 */
@Override
public long cacheTimeMillis() {
  return this.system.getClock().cacheTimeMillis();
}
/**
 * Finds a member by name. Members in the members map are searched first, then this member's own
 * id.
 *
 * @param name the member name to match (null matches a member whose name is null)
 * @return the matching member, or null if there is none
 */
@Override
public DistributedMember getMemberWithName(String name) {
  for (DistributedMember candidate : members.values()) {
    if (Objects.equals(name, candidate.getName())) {
      return candidate;
    }
  }
  // Our own id may not be in the members map yet, so check it separately.
  return Objects.equals(name, localAddress.getName()) ? localAddress : null;
}
/**
 * Returns the id of this distribution manager (its local member address).
 */
@Override
public InternalDistributedMember getDistributionManagerId() {
  return this.localAddress;
}
/**
 * Returns the key set of the members map, i.e. the identities of all known (non-admin-only)
 * distribution managers. Callers must not modify it.
 */
@Override
public Set<InternalDistributedMember> getDistributionManagerIds() {
  // Read under membersLock so the access is serialized with membership changes.
  synchronized (this.membersLock) {
    return this.members.keySet();
  }
}
/**
 * Records in {@link #hostedLocatorsAll} that {@code member} hosts one or more locators. If
 * {@code isSharedConfigurationEnabled} is true, the same entry is also recorded in the
 * shared-configuration map. The value is a collection of host[port] strings; if a bind-address
 * was used for a locator then the form is bind-addr[port].
 *
 * Both maps are copy-on-write: a new unmodifiable map is published under {@code membersLock}
 * instead of mutating the current one.
 *
 * @param member the member hosting the locators
 * @param locators host[port] strings; must be non-null and non-empty
 * @param isSharedConfigurationEnabled whether to also record the member as a shared-config host
 * @throws IllegalArgumentException if {@code locators} is null or empty
 * @since GemFire 6.6.3
 */
@Override
public void addHostedLocators(InternalDistributedMember member, Collection<String> locators,
    boolean isSharedConfigurationEnabled) {
  // Pure argument validation; no need to hold the lock for it.
  if (locators == null || locators.isEmpty()) {
    throw new IllegalArgumentException("Cannot use empty collection of locators");
  }
  synchronized (membersLock) {
    Map<InternalDistributedMember, Collection<String>> all = new HashMap<>(hostedLocatorsAll);
    // Remove before put: HashMap.put keeps an existing equal key, and we want the caller's
    // member instance stored as the key.
    all.remove(member);
    all.put(member, locators);
    hostedLocatorsAll = Collections.unmodifiableMap(all);
    if (isSharedConfigurationEnabled) {
      Map<InternalDistributedMember, Collection<String>> withSharedConfig =
          new HashMap<>(hostedLocatorsWithSharedConfiguration);
      withSharedConfig.remove(member);
      withSharedConfig.put(member, locators);
      hostedLocatorsWithSharedConfiguration = Collections.unmodifiableMap(withSharedConfig);
    }
  }
}
/**
 * Removes {@code member} from both hosted-locator maps. Each map that contained it is replaced by
 * a new map: {@code Collections.emptyMap()} if nothing remains, otherwise an unmodifiable copy.
 *
 * @param member the member that no longer hosts locators
 */
private void removeHostedLocators(InternalDistributedMember member) {
  synchronized (membersLock) {
    if (hostedLocatorsAll.containsKey(member)) {
      final Map<InternalDistributedMember, Collection<String>> remaining =
          new HashMap<>(hostedLocatorsAll);
      remaining.remove(member);
      hostedLocatorsAll = remaining.isEmpty()
          ? Collections.emptyMap()
          : Collections.unmodifiableMap(remaining);
    }
    if (hostedLocatorsWithSharedConfiguration.containsKey(member)) {
      final Map<InternalDistributedMember, Collection<String>> remaining =
          new HashMap<>(hostedLocatorsWithSharedConfiguration);
      remaining.remove(member);
      hostedLocatorsWithSharedConfiguration = remaining.isEmpty()
          ? Collections.emptyMap()
          : Collections.unmodifiableMap(remaining);
    }
  }
}
/**
 * Gets the value in {@link #hostedLocatorsAll} for a member with one or more hosted locators.
 * The value is a collection of host[port] strings; if a bind-address was used for a locator then
 * the form is bind-addr[port].
 *
 * @param member the member to look up
 * @return the member's locator strings, or null if it hosts none
 * @since GemFire 6.6.3
 */
@Override
public Collection<String> getHostedLocators(InternalDistributedMember member) {
  synchronized (this.membersLock) {
    return this.hostedLocatorsAll.get(member);
  }
}
/**
 * Returns the map of all members hosting locators. The key is the member (a VM hosting one or
 * more locators) and the value is a collection of host[port] strings. If a bind-address was used
 * for a locator then the form is bind-addr[port].
 *
 * When another locator starts up and links to this one, its member is added to this map as
 * well, and this member is added to the corresponding map in the other locator's VM. The key set
 * is therefore the set of locator VMs in this cluster. The value is a collection because one VM
 * may host several locators.
 *
 * The map is not copied. The add/remove methods publish a fresh unmodifiable map on every
 * change, so the returned reference is a stable snapshot. NOTE(review): this assumes the field's
 * initial value is also immutable — confirm at its declaration.
 *
 * @since GemFire 6.6.3
 */
@Override
public Map<InternalDistributedMember, Collection<String>> getAllHostedLocators() {
  synchronized (this.membersLock) {
    return this.hostedLocatorsAll;
  }
}
/**
 * Returns the map of all members hosting locators with shared configuration. The key is the
 * member and the value is a collection of host[port] strings. If a bind-address was used for a
 * locator then the form is bind-addr[port].
 *
 * Like {@link #getAllHostedLocators()}, this returns the current copy-on-write snapshot, not a
 * fresh copy.
 *
 * @since GemFire 8.0
 */
@Override
public Map<InternalDistributedMember, Collection<String>> getAllHostedLocatorsWithSharedConfiguration() {
  synchronized (this.membersLock) {
    return this.hostedLocatorsWithSharedConfiguration;
  }
}
/**
 * Returns the set of identities of all known distribution managers, including admin-only ones.
 * Callers must not modify it.
 */
@Override
public Set<InternalDistributedMember> getDistributionManagerIdsIncludingAdmin() {
  // Read under membersLock so the access is serialized with membership changes.
  synchronized (this.membersLock) {
    return this.membersAndAdmin;
  }
}
/**
 * Returns a private, mutable copy of the known (non-admin-only) distribution manager ids,
 * excluding this member's own id.
 */
@Override
public Set<InternalDistributedMember> getOtherDistributionManagerIds() {
  final Set<InternalDistributedMember> others = new HashSet<>(getDistributionManagerIds());
  // Our own id may not be registered yet; removing an absent element is harmless.
  others.remove(getDistributionManagerId());
  return others;
}
/**
 * Returns a private, mutable copy of the normal distribution manager ids, excluding this member's
 * own id.
 */
@Override
public Set<InternalDistributedMember> getOtherNormalDistributionManagerIds() {
  final Set<InternalDistributedMember> others = new HashSet<>(getNormalDistributionManagerIds());
  // Our own id may not be registered yet; removing an absent element is harmless.
  others.remove(getDistributionManagerId());
  return others;
}
/**
 * Returns the canonical id instance stored in the members map for {@code id}. If the map has no
 * entry, {@code id} itself is returned, cast to {@link InternalDistributedMember}.
 */
@Override
public InternalDistributedMember getCanonicalId(DistributedMember id) {
  // the members map is copy-on-write, so reading it without the lock is safe
  final InternalDistributedMember canonical = members.get(id);
  return canonical != null ? canonical : (InternalDistributedMember) id;
}
/**
 * Adds a membership listener and returns the current distribution manager ids as one atomic
 * operation with respect to {@code membersLock}.
 *
 * @param l the listener to register
 * @return the members map's key set; callers must not modify it
 */
@Override
public Set<InternalDistributedMember> addMembershipListenerAndGetDistributionManagerIds(
    MembershipListener l) {
  // switched sync order to fix bug 30360
  synchronized (this.membersLock) {
    // Hold the lock so the listener (probably a ReplyProcessor) sees a membership set consistent
    // with the moment it was registered.
    this.addMembershipListener(l);
    return this.members.keySet();
  }
}
/**
 * Dispatches a newly seen member by VM kind. Admin-only members go to
 * {@code handleConsoleStartup}; locator and normal members go to {@code handleManagerStartup}.
 *
 * @param member the member to add
 * @throws InternalGemFireError if the member's VM kind is not recognised
 */
private void addNewMember(InternalDistributedMember member) {
  final int vmType = member.getVmKind();
  if (vmType == ADMIN_ONLY_DM_TYPE) {
    handleConsoleStartup(member);
  } else if (vmType == LOCATOR_DM_TYPE || vmType == NORMAL_DM_TYPE) {
    handleManagerStartup(member);
  } else {
    throw new InternalGemFireError(String.format("Unknown member type: %s", vmType));
  }
}
/**
 * Returns the identity of this <code>DistributionManager</code> (same value as
 * {@link #getDistributionManagerId()}).
 */
@Override
public InternalDistributedMember getId() {
  return this.localAddress;
}
/** Returns the port of this member's local address. */
@Override
public long getMembershipPort() {
  return this.localAddress.getPort();
}
/**
 * Sends {@code msg}, first notifying the registered {@link DistributionMessageObserver} if any.
 *
 * @param msg the message to send
 * @return the result of {@code sendMessage(msg)}
 * @throws InternalGemFireException wrapping a {@link NotSerializableException} from sending
 */
@Override
public Set<InternalDistributedMember> putOutgoing(final DistributionMessage msg) {
  try {
    final DistributionMessageObserver observer = DistributionMessageObserver.getInstance();
    if (observer == null) {
      return sendMessage(msg);
    }
    observer.beforeSendMessage(this, msg);
    return sendMessage(msg);
  } catch (NotSerializableException notSerializable) {
    throw new InternalGemFireException(notSerializable);
  }
}
/** Returns this distribution manager's description string. */
@Override
public String toString() {
  return this.description;
}
/**
* Informs other members that this dm is shutting down. Stops the pusher, puller, and processor
* threads and closes the connection to the transport layer.
*
* Only the first caller proceeds; later calls see {@code closeInProgress} and return
* immediately. Unless the root cause is a {@link ForcedDisconnectException}, the shutdown
* message is sent from a separate thread that is given at most {@code MAX_STOP_TIME / 4} ms.
* {@code shutdownMsgSent} is always set afterwards, and {@code uncleanShutdown(false)} performs
* the actual teardown.
*/
protected void shutdown() {
// Make sure only one thread initiates shutdown...
synchronized (shutdownMutex) {
if (closeInProgress) {
return;
}
closeInProgress = true;
} // synchronized
// [bruce] log shutdown at info level and with ID to balance the
// "Starting" message. recycleConn.conf is hard to debug w/o this
final String exceptionStatus = (exceptionInThreads()
? "At least one Exception occurred."
: "");
logger.info("Shutting down DistributionManager {}. {}",
new Object[] {localAddress, exceptionStatus});
final long start = System.currentTimeMillis();
try {
if (rootCause instanceof ForcedDisconnectException) {
if (logger.isDebugEnabled()) {
logger.debug(
"inhibiting sending of shutdown message to other members due to forced-disconnect");
}
} else {
// Don't block indefinitely trying to send the shutdown message, in
// case other VMs in the system are ill-behaved. (bug 34710)
final Runnable r = () -> {
try {
ConnectionTable.threadWantsSharedResources();
sendShutdownMessage();
} catch (final CancelException e) {
// We were terminated.
logger.debug("Cancelled during shutdown message", e);
}
};
final Thread t =
new LoggingThread(String.format("Shutdown Message Thread for %s",
localAddress), false, r);
t.start();
// Clear our interrupt flag so join() can wait; it is re-asserted in finally.
boolean interrupted = Thread.interrupted();
try {
t.join(MAX_STOP_TIME / 4);
} catch (final InterruptedException e) {
interrupted = true;
t.interrupt();
logger.warn("Interrupted sending shutdown message to peers",
e);
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
// Still running after the bounded join: give up on it.
if (t.isAlive()) {
t.interrupt();
logger.warn("Failed sending shutdown message to peers (timeout)");
}
}
} finally {
shutdownMsgSent = true; // in case sendShutdownMessage failed....
try {
uncleanShutdown(false);
} finally {
final Long delta = System.currentTimeMillis() - start;
logger.info("DistributionManager stopped in {}ms.", delta);
}
}
}
/**
 * Begins an orderly stop of this DM's threads.
 *
 * Closes the thread monitor and calls {@code shutdown()} on every non-null executor. That
 * refuses new work but lets already-queued tasks (including pending view events) finish. Finally
 * the member-event thread, if any, is interrupted.
 */
private void askThreadsToStop() {
  threadMonitor.close();
  for (ExecutorService executor : new ExecutorService[] {serialThread, viewThread}) {
    if (executor != null) {
      executor.shutdown();
    }
  }
  if (serialQueuedExecutorPool != null) {
    serialQueuedExecutorPool.shutdown();
  }
  for (ExecutorService executor : new ExecutorService[] {functionExecutionThread,
      functionExecutionPool, partitionedRegionThread, partitionedRegionPool, highPriorityPool,
      waitingPool, prMetaDataCleanupThreadPool, threadPool}) {
    if (executor != null) {
      executor.shutdown();
    }
  }
  final Thread eventThread = memberEventThread;
  if (eventThread != null) {
    eventThread.interrupt();
  }
}
/**
 * Waits up to {@code timeInMillis} in total for the executors, the serial queued executor pool
 * and the member-event thread to terminate. Returns early once the budget is spent.
 *
 * @param timeInMillis total time budget in milliseconds
 * @throws InterruptedException if interrupted while waiting
 */
private void waitForThreadsToStop(long timeInMillis) throws InterruptedException {
  long start = System.currentTimeMillis();
  long remaining = timeInMillis;
  ExecutorService[] allExecutors = new ExecutorService[] {serialThread, viewThread,
      functionExecutionThread, functionExecutionPool, partitionedRegionThread,
      partitionedRegionPool, highPriorityPool, waitingPool,
      prMetaDataCleanupThreadPool, threadPool};
  for (ExecutorService es : allExecutors) {
    if (es != null) {
      es.awaitTermination(remaining, TimeUnit.MILLISECONDS);
    }
    remaining = timeInMillis - (System.currentTimeMillis() - start);
    if (remaining <= 0) {
      return;
    }
  }
  // askThreadsToStop() treats this pool as optional; guard it here too instead of risking an
  // NPE that would skip forceThreadsToStop() in uncleanShutdown().
  if (serialQueuedExecutorPool != null) {
    serialQueuedExecutorPool.awaitTermination(remaining, TimeUnit.MILLISECONDS);
  }
  remaining = timeInMillis - (System.currentTimeMillis() - start);
  if (remaining <= 0) {
    return;
  }
  Thread th = memberEventThread;
  if (th != null) {
    // bug #43452 - this thread sometimes eats interrupts, so we interrupt it again here
    th.interrupt();
    th.join(remaining);
  }
}
/**
 * Cheap tool to kill a referenced thread. Interrupts it, then makes up to
 * {@code MAX_STOP_ATTEMPTS} rounds of join-then-interrupt (each join bounded by
 * {@code STOP_PAUSE_TIME}). Logs a warning if the thread is still alive afterwards.
 *
 * @param t the thread to kill; null or already-dead threads are ignored
 */
private void clobberThread(Thread t) {
  if (t == null || !t.isAlive()) {
    return;
  }
  logger.warn("Forcing thread stop on < {} >", t);
  // Start by being nice; Thread.stop() is deliberately not used.
  t.interrupt();
  try {
    int attempt = 0;
    while (attempt < MAX_STOP_ATTEMPTS && t.isAlive()) {
      t.join(STOP_PAUSE_TIME);
      t.interrupt();
      attempt++;
    }
  } catch (InterruptedException ex) {
    logger.debug("Interrupted while attempting to terminate threads.");
    // Preserve our interrupt status and carry on.
    Thread.currentThread().interrupt();
  }
  if (t.isAlive()) {
    logger.warn("Thread refused to die: {}", t);
  }
}
/**
 * Cheap tool to examine an executor to see if it is still working.
 *
 * The executor is cast to {@link ThreadPoolExecutor}; any other implementation would throw
 * ClassCastException.
 *
 * @param tpe the executor to examine, may be null
 * @param name pool name used in the debug message
 * @return true if the executor reports at least one active thread
 */
private boolean executorAlive(ExecutorService tpe, String name) {
  if (tpe == null) {
    return false;
  }
  final int active = ((ThreadPoolExecutor) tpe).getActiveCount();
  if (active <= 0) {
    return false;
  }
  if (logger.isDebugEnabled()) {
    logger.debug("Still waiting for {} threads in '{}' pool to exit", active, name);
  }
  return true;
}
/**
* Wait for the ancillary queues to exit. Kills them if they are still around.
*
* Polls the executors every {@code STOP_PAUSE_TIME} ms until none reports active threads, and
* returns as soon as that happens. If {@code MAX_STOP_TIME} ms elapse first, or this thread is
* interrupted, it logs the culprits, calls {@code shutdownNow()} on every non-null executor and
* clobbers the member-event thread.
*
* NOTE(review): the liveness poll does not include functionExecutionThread or
* functionExecutionPool, although both are forcibly shut down below — confirm intent.
*/
private void forceThreadsToStop() {
long endTime = System.currentTimeMillis() + MAX_STOP_TIME;
StringBuilder culprits;
for (;;) {
boolean stillAlive = false;
// Rebuilt on each pass so the final log lists only the last-observed culprits.
culprits = new StringBuilder();
if (executorAlive(serialThread, "serial thread")) {
stillAlive = true;
culprits.append(" serial thread;");
}
if (executorAlive(viewThread, "view thread")) {
stillAlive = true;
culprits.append(" view thread;");
}
if (executorAlive(partitionedRegionThread, "partitioned region thread")) {
stillAlive = true;
culprits.append(" partitioned region thread;");
}
if (executorAlive(partitionedRegionPool, "partitioned region pool")) {
stillAlive = true;
culprits.append(" partitioned region pool;");
}
if (executorAlive(highPriorityPool, "high priority pool")) {
stillAlive = true;
culprits.append(" high priority pool;");
}
if (executorAlive(waitingPool, "waiting pool")) {
stillAlive = true;
culprits.append(" waiting pool;");
}
if (executorAlive(prMetaDataCleanupThreadPool, "prMetaDataCleanupThreadPool")) {
stillAlive = true;
culprits.append(" special waiting pool;");
}
if (executorAlive(threadPool, "thread pool")) {
stillAlive = true;
culprits.append(" thread pool;");
}
// Everything drained on its own: no forced stop needed.
if (!stillAlive)
return;
long now = System.currentTimeMillis();
if (now >= endTime)
break;
try {
Thread.sleep(STOP_PAUSE_TIME);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// Desperation, the shutdown thread is being killed. Don't
// consult a CancelCriterion.
logger.warn("Interrupted during shutdown", e);
break;
}
} // for
logger.warn("Daemon threads are slow to stop; culprits include: {}",
culprits);
// Kill with no mercy
if (serialThread != null) {
serialThread.shutdownNow();
}
if (viewThread != null) {
viewThread.shutdownNow();
}
if (functionExecutionThread != null) {
functionExecutionThread.shutdownNow();
}
if (functionExecutionPool != null) {
functionExecutionPool.shutdownNow();
}
if (partitionedRegionThread != null) {
partitionedRegionThread.shutdownNow();
}
if (partitionedRegionPool != null) {
partitionedRegionPool.shutdownNow();
}
if (highPriorityPool != null) {
highPriorityPool.shutdownNow();
}
if (waitingPool != null) {
waitingPool.shutdownNow();
}
if (prMetaDataCleanupThreadPool != null) {
prMetaDataCleanupThreadPool.shutdownNow();
}
if (threadPool != null) {
threadPool.shutdownNow();
}
Thread th = memberEventThread;
if (th != null) {
clobberThread(th);
}
}
/** set to true by uncleanShutdown() once teardown has begun; read by shutdownInProgress() */
private volatile boolean shutdownInProgress = false;
/** guard for membershipViewIdAcknowledged; also the monitor that view-install waiters block on */
private final Object membershipViewIdGuard = new Object();
/**
* the latest view ID that has been processed by all membership listeners. Written under
* membershipViewIdGuard; NOTE(review): not volatile, so unsynchronized reads of this long are
* not guaranteed atomic (JLS 17.7).
*/
private long membershipViewIdAcknowledged;
/** Returns whether {@code uncleanShutdown} has begun tearing this DM down. */
@Override
public boolean shutdownInProgress() {
  return this.shutdownInProgress;
}
/**
* Stops the pusher, puller and processor threads and closes the connection to the transport
* layer. This should only be used from shutdown() or from the dm initialization code
*
* Order: mark close/shutdown in progress and remove health monitors; tell the membership manager
* it is shutting down; ask threads to stop and wait up to 1000 ms; then force any stragglers. In
* the finally clauses, close stats (with a 100 ms pause) and always disconnect the membership
* manager.
*
* @param beforeJoined passed through to {@code membershipManager.disconnect}
*/
private void uncleanShutdown(boolean beforeJoined) {
try {
closeInProgress = true; // set here also to fix bug 36736
removeAllHealthMonitors();
shutdownInProgress = true;
if (membershipManager != null) {
membershipManager.setShutdown();
}
askThreadsToStop();
// wait a moment before asking threads to terminate
try {
waitForThreadsToStop(1000);
} catch (InterruptedException ie) {
// No need to reset interrupt bit, we're really trying to quit...
}
forceThreadsToStop();
} // try
finally {
// ABSOLUTELY ESSENTIAL that we close the distribution channel!
try {
// For safety, but channel close in a finally AFTER this...
if (stats != null) {
stats.close();
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
// No need to reset interrupt bit, we're really trying to quit...
}
}
} finally {
if (membershipManager != null) {
logger.info("Now closing distribution for {}",
localAddress);
membershipManager.disconnect(beforeJoined);
}
}
}
}
/** Returns the distributed system this DM belongs to. */
@Override
public InternalDistributedSystem getSystem() {
  return this.system;
}
/** Returns the alerting service supplied when this DM was constructed. */
@Override
public AlertingService getAlertingService() {
  return this.alertingService;
}
/**
 * Returns the transport configuration for this distribution manager.
 */
RemoteTransportConfig getTransport() {
  return this.transport;
}
/** Registers {@code l}; registering the same listener again has no effect. */
@Override
public void addMembershipListener(MembershipListener l) {
  this.membershipListeners.putIfAbsent(l, Boolean.TRUE);
}
/** Unregisters {@code l}; unknown listeners are ignored. */
@Override
public void removeMembershipListener(MembershipListener l) {
  this.membershipListeners.remove(l);
}
/** Returns a read-only view of the registered membership listeners. */
@Override
public Collection<MembershipListener> getMembershipListeners() {
  final Set<MembershipListener> registered = this.membershipListeners.keySet();
  return Collections.unmodifiableSet(registered);
}
/**
 * Adds a <code>MembershipListener</code> to the all-members (including admin) listener set of
 * this distribution manager.
 *
 * Copy-on-write: a new set is published instead of mutating the existing one.
 */
private void addAllMembershipListener(MembershipListener l) {
  synchronized (allMembershipListenersLock) {
    final Set<MembershipListener> updated = new HashSet<>(allMembershipListeners);
    updated.add(l);
    allMembershipListeners = updated;
  }
}
/**
 * Removes {@code l} from the all-members listener set, publishing a new copy of the set.
 * Removing a listener that is not registered is silently accepted.
 */
@Override
public void removeAllMembershipListener(MembershipListener l) {
  synchronized (allMembershipListenersLock) {
    final Set<MembershipListener> updated = new HashSet<>(allMembershipListeners);
    // The remove() result is ignored on purpose: there seems to be a race in which multiple
    // departure events can be registered for the same peer, so a missing listener is benign.
    // FIXME when membership events become sane again
    updated.remove(l);
    allMembershipListeners = updated;
  }
}
/**
 * Returns true while this DM is closing or its distributed system is disconnecting. In that
 * state, unexpected exceptions are logged at low level instead of as warnings.
 */
private boolean shouldInhibitMembershipWarnings() {
  if (isCloseInProgress()) {
    return true;
  }
  final InternalDistributedSystem sys = getSystem();
  if (sys == null) {
    return false;
  }
  return sys.isDisconnecting();
}
/**
 * Returns true if this distribution manager has initiated shutdown.
 */
public boolean isCloseInProgress() {
  return this.closeInProgress;
}
/**
 * Records the event's view id as acknowledged and wakes threads blocked in
 * {@code waitForViewInstallation}.
 */
private void handleViewInstalledEvent(ViewInstalledEvent ev) {
  final long installedViewId = ev.getViewId();
  synchronized (this.membershipViewIdGuard) {
    this.membershipViewIdAcknowledged = installedViewId;
    this.membershipViewIdGuard.notifyAll();
  }
}
/**
 * Stalls until membership view {@code id} has been acknowledged by all membership listeners, or
 * until cancellation is found to be in progress when the condition is re-checked.
 *
 * The acknowledged id is a non-volatile {@code long}, so every read happens under
 * {@code membershipViewIdGuard}. An unsynchronized read could observe a torn value (JLS 17.7).
 *
 * @param id the view id to wait for
 * @throws InterruptedException if interrupted while waiting
 */
void waitForViewInstallation(long id) throws InterruptedException {
  synchronized (membershipViewIdGuard) {
    while (membershipViewIdAcknowledged < id && !stopper.isCancelInProgress()) {
      if (logger.isDebugEnabled()) {
        logger.debug("waiting for view {}. Current DM view processed by all listeners is {}", id,
            membershipViewIdAcknowledged);
      }
      membershipViewIdGuard.wait();
    }
  }
}
/** Lets the event apply itself to this distribution manager. */
private void handleMemberEvent(MemberEvent ev) {
  final ClusterDistributionManager target = this;
  ev.handleEvent(target);
}
/**
* This thread processes member events as they occur.
*
* Each iteration takes the next event from {@code membershipEventQueue} (blocking) and handles
* it. The loop ends when the system is no longer connected and this DM is closed (checked before
* each take), or on interrupt, DistributedSystemDisconnectedException or CancelException. Other
* exceptions are logged as fatal and the loop continues with the next event.
*
* @see ClusterDistributionManager.MemberCrashedEvent
* @see ClusterDistributionManager.MemberJoinedEvent
* @see ClusterDistributionManager.MemberDepartedEvent
*
*/
protected class MemberEventInvoker implements Runnable {
@Override
@SuppressWarnings("synthetic-access")
public void run() {
for (;;) {
SystemFailure.checkFailure();
// bug 41539 - member events need to be delivered during shutdown
// or reply processors may hang waiting for replies from
// departed members
// if (getCancelCriterion().isCancelInProgress()) {
// break; // no message, just quit
// }
if (!system.isConnected
&& isClosed()) {
break;
}
try {
MemberEvent ev =
membershipEventQueue.take();
handleMemberEvent(ev);
} catch (InterruptedException e) {
// Expected during shutdown (askThreadsToStop interrupts this thread); warn otherwise.
if (shouldInhibitMembershipWarnings()) {
if (logger.isTraceEnabled()) {
logger.trace("MemberEventInvoker: InterruptedException during shutdown");
}
} else {
logger.warn("Unexpected InterruptedException", e);
}
break;
} catch (DistributedSystemDisconnectedException e) {
break;
} catch (CancelException e) {
if (shouldInhibitMembershipWarnings()) {
if (logger.isTraceEnabled()) {
logger.trace("MemberEventInvoker: cancelled");
}
} else {
logger.warn("Unexpected cancellation", e);
}
break;
} catch (Exception e) {
// Keep the invoker alive; one bad event must not stop delivery of the rest.
logger.fatal(
"Uncaught exception processing member event",
e);
}
} // for
if (logger.isTraceEnabled()) {
logger.trace("MemberEventInvoker on {} stopped", ClusterDistributionManager.this);
}
}
}
/**
 * Delivers a member event. With {@code SYNC_EVENTS} it is handled on the calling thread;
 * otherwise it is queued for the member-event thread. If queueing is interrupted, the stopper is
 * consulted and the event is handled on the calling thread instead. The caller's interrupt
 * status is always preserved.
 */
private void addMemberEvent(MemberEvent ev) {
  if (SYNC_EVENTS) {
    handleMemberEvent(ev);
    return;
  }
  stopper.checkCancelInProgress(null);
  // Clear the interrupt flag so put() can block; it is restored in the finally block.
  boolean wasInterrupted = Thread.interrupted();
  try {
    membershipEventQueue.put(ev);
  } catch (InterruptedException ie) {
    wasInterrupted = true;
    stopper.checkCancelInProgress(ie);
    // Could not enqueue: fall back to handling the event synchronously.
    handleMemberEvent(ev); // FIXME why???
  } finally {
    if (wasInterrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Shuts this DM down (if not already closed), logs the shutdown and marks it closed. The
 * {@code closed} flag is set only after {@code shutdown()} returns.
 */
@Override
public void close() {
  if (closed) {
    return;
  }
  shutdown();
  logger.info("Marking DistributionManager {} as closed.", localAddress);
  MembershipLogger.logShutdown(localAddress);
  closed = true;
}
/**
 * Throws once the shutdown message has been sent (or its sending abandoned).
 *
 * @throws DistributedSystemDisconnectedException carrying the root cause, if distribution has
 *         stopped
 */
@Override
public void throwIfDistributionStopped() {
  if (!shutdownMsgSent) {
    return;
  }
  throw new DistributedSystemDisconnectedException("Message distribution has terminated",
      getRootCause());
}
/**
 * Returns true if this distribution manager has been closed, i.e. {@link #close()} completed.
 */
public boolean isClosed() {
  return this.closed;
}
/**
 * Records a newly detected administration member. The admin-console set is copy-on-write: a new
 * unmodifiable set is published under {@code adminConsolesLock}.
 */
@Override
public void addAdminConsole(InternalDistributedMember theId) {
  logger.info("New administration member detected at {}.", theId);
  synchronized (adminConsolesLock) {
    final Set<InternalDistributedMember> updated = new HashSet<>(adminConsoles);
    updated.add(theId);
    adminConsoles = Collections.unmodifiableSet(updated);
  }
}
/** Returns this DM's statistics object. */
@Override
public DMStats getStats() {
  return this.stats;
}
/**
 * Returns the distribution configuration of this DM's distributed system.
 *
 * @return the configuration, or null when there is no distributed system
 */
@Override
public DistributionConfig getConfig() {
  final InternalDistributedSystem sys = getSystem();
  // Dereference the same reference that was null-checked (previously the field was used).
  return sys == null ? null : sys.getConfig();
}
/**
 * Returns a private, mutable copy of all known members (including admin), excluding this
 * member's own id.
 */
@Override
public Set<InternalDistributedMember> getAllOtherMembers() {
  final InternalDistributedMember self = getDistributionManagerId();
  final Set<InternalDistributedMember> others =
      new HashSet<>(getDistributionManagerIdsIncludingAdmin());
  others.remove(self);
  return others;
}
/**
 * Removes, in place, every member whose version is older than {@code version}.
 *
 * @param members collection to filter; must support removal
 * @param version the minimum version to keep
 */
@Override
public void retainMembersWithSameOrNewerVersion(Collection<InternalDistributedMember> members,
    Version version) {
  members.removeIf(member -> member.getVersionObject().compareTo(version) < 0);
}
/**
 * Removes, in place, every member whose version is equal to or newer than {@code version}.
 *
 * @param members collection to filter; must support removal
 * @param version the threshold version; members at or above it are removed
 */
@Override
public void removeMembersWithSameOrNewerVersion(Collection<InternalDistributedMember> members,
    Version version) {
  members.removeIf(member -> member.getVersionObject().compareTo(version) >= 0);
}
/**
 * Registers {@code l} as an all-members listener and returns all member ids (including admin).
 * Both steps happen while holding the membership view write lock and {@code membersLock}, so the
 * listener (probably a ReplyProcessor) gets a consistent view of the members.
 *
 * @param l the listener to register
 * @return the all-members id set; callers must not modify it
 */
@Override
public Set<InternalDistributedMember> addAllMembershipListenerAndGetAllIds(MembershipListener l) {
  final MembershipManager manager = this.membershipManager;
  manager.getViewLock().writeLock().lock();
  try {
    synchronized (this.membersLock) {
      this.addAllMembershipListener(l);
      return this.getDistributionManagerIdsIncludingAdmin();
    }
  } finally {
    manager.getViewLock().writeLock().unlock();
  }
}
/**
 * Sends a startup message to every other member of the current view and waits for replies.
 *
 * <p>Before sending, this records this host's equivalent addresses and the redundancy-zone
 * settings. If some peers do not reply, missing replies are tolerated unless the system property
 * {@code DistributionManager.requireAllStartupResponses} is set. However, at least one reply is
 * required when peers exist. If the elder did not reply, a join event is forced for it.
 *
 * @param startupOperation the operation used to send the message and collect replies
 * @return {@code true} if a startup response was received; {@code false} if there are no peers
 *         (or, when every peer acknowledged, whatever the response flag says)
 * @throws InterruptedException if the calling thread is interrupted on entry
 * @throws SystemConnectException if the message could not be sent, if no peer replied, or if all
 *         replies are required and some are missing
 * @throws IncompatibleSystemException if a peer rejected this member
 */
private boolean sendStartupMessage(StartupOperation startupOperation)
    throws InterruptedException {
  if (Thread.interrupted()) {
    throw new InterruptedException();
  }
  receivedStartupResponse = false;
  boolean ok;
  // Be sure to add ourself to the equivalencies list!
  Set<InetAddress> equivs = StartupMessage.getMyAddresses(this);
  if (equivs == null || equivs.isEmpty()) {
    // no network interface
    equivs = new HashSet<>();
    try {
      equivs.add(SocketCreator.getLocalHost());
    } catch (UnknownHostException e) {
      // can't even get localhost
      if (getViewMembers().size() > 1) {
        throw new SystemConnectException(
            "Unable to examine network cards and other members exist");
      }
    }
  }
  setEquivalentHosts(equivs);
  setEnforceUniqueZone(getConfig().getEnforceUniqueHost());
  String redundancyZone = getConfig().getRedundancyZone();
  if (redundancyZone != null && !redundancyZone.equals("")) {
    setEnforceUniqueZone(true);
  }
  setRedundancyZone(getDistributionManagerId(), redundancyZone);
  if (logger.isDebugEnabled()) {
    logger.debug("Equivalent IPs for this host: {}", commaSeparated(equivs));
  }

  // we need to send this to everyone else; even admin vm
  Set<InternalDistributedMember> allOthers = new HashSet<>(getViewMembers());
  allOthers.remove(getDistributionManagerId());
  if (allOthers.isEmpty()) {
    return false; // no peers, we are alone.
  }
  try {
    ok = startupOperation.sendStartupMessage(allOthers, equivs, redundancyZone,
        enforceUniqueZone());
  } catch (Exception re) {
    throw new SystemConnectException(
        "One or more peers generated exceptions during connection attempt",
        re);
  }
  if (rejectionMessage != null) {
    throw new IncompatibleSystemException(rejectionMessage);
  }
  boolean receivedAny = receivedStartupResponse;
  if (!ok) { // someone didn't reply
    synchronized (unfinishedStartupsLock) {
      boolean someoneUnresponsive =
          unfinishedStartups != null && !unfinishedStartups.isEmpty();
      if (someoneUnresponsive
          && Boolean.getBoolean("DistributionManager.requireAllStartupResponses")) {
        throw new SystemConnectException(
            String.format("No startup replies from: %s",
                unfinishedStartups));
      }
    }
    // Bug 35887: other members exist (guaranteed by the early return above), so we must
    // receive at least _one_ response.
    if (!receivedAny) {
      if (DEBUG_NO_ACKNOWLEDGEMENTS) {
        printStacks(allOthers, false);
      }
      throw new SystemConnectException(
          String.format(
              "Received no connection acknowledgments from any of the %s senior cache members: %s",
              Integer.toString(allOthers.size()), commaSeparated(allOthers)));
    }
    InternalDistributedMember elder = clusterElderManager.getElderId();
    if (elder != null) { // an elder exists
      boolean unresponsiveElder;
      synchronized (unfinishedStartupsLock) {
        unresponsiveElder = unfinishedStartups != null && unfinishedStartups.contains(elder);
      }
      if (unresponsiveElder) {
        logger.warn(
            "Forcing an elder join event since a startup response was not received from elder {}.",
            elder);
        handleManagerStartup(elder);
      }
    }
  }
  return receivedAny;
}

/**
 * Joins the string forms of {@code items} with ", ", without surrounding brackets. Used for the
 * diagnostic messages above.
 */
private static String commaSeparated(Collection<?> items) {
  StringBuilder sb = new StringBuilder();
  boolean first = true;
  for (Object item : items) {
    if (!first) {
      sb.append(", ");
    }
    sb.append(item.toString());
    first = false;
  }
  return sb.toString();
}
/**
 * Members from which we have not yet received a startup reply. A {@code null} value means we
 * have not finished sending the startup message (setUnfinishedStartups has not been called).
 * <p>
 * Must be accessed only while holding {@link #unfinishedStartupsLock}.
 */
private Set<InternalDistributedMember> unfinishedStartups = null;
/**
 * Monitor guarding all reads and writes of {@link #unfinishedStartups}.
 */
private final Object unfinishedStartupsLock = new Object();
/**
 * Records the members we are still waiting on for a startup reply. This may be called only once.
 * Members that are no longer current are discarded immediately.
 *
 * @param s the members that have not yet replied; copied, not retained
 */
@Override
public void setUnfinishedStartups(Collection<InternalDistributedMember> s) {
  synchronized (unfinishedStartupsLock) {
    Assert.assertTrue(unfinishedStartups == null, "Set unfinished startups twice");
    unfinishedStartups = new HashSet<>(s);
    // The supplied collection may already be stale, so prune members that have left. membersLock
    // is held so membership cannot change while we prune.
    synchronized (membersLock) {
      unfinishedStartups.removeIf(member -> !isCurrentMember(member));
    }
  }
}
/**
 * Stops waiting for a startup reply from {@code m}. This does nothing if the startup message has
 * not been sent yet, or if {@code m} was not being waited on.
 *
 * @param m the member to stop waiting for
 * @param departed {@code true} if the member left the view; {@code false} if its reply arrived
 */
@Override
public void removeUnfinishedStartup(InternalDistributedMember m, boolean departed) {
  synchronized (unfinishedStartupsLock) {
    if (logger.isDebugEnabled()) {
      logger.debug("removeUnfinishedStartup for {} with {}", m, unfinishedStartups);
    }
    if (unfinishedStartups == null) {
      return; // not yet done with startup
    }
    if (!unfinishedStartups.remove(m)) {
      return; // we were not waiting on this member
    }
    if (departed) {
      logger.info(
          "Stopped waiting for startup reply from <{}> because the peer departed the view.", m);
    } else {
      logger.info(
          "Stopped waiting for startup reply from <{}> because the reply was finally received.",
          m);
    }
    int numLeft = unfinishedStartups.size();
    if (numLeft != 0) {
      logger.info("Still awaiting {} response(s) from: {}.", numLeft, unfinishedStartups);
    }
  }
}
/**
 * Handles a startup response from {@code sender}. It stops waiting on that member, notes that at
 * least one response arrived, and remembers the first non-null rejection message.
 *
 * @param sender the member that replied
 * @param theRejectionMessage the rejection text from that member, or {@code null} if accepted
 * @see StartupResponseMessage#process
 */
void processStartupResponse(InternalDistributedMember sender, String theRejectionMessage) {
  removeUnfinishedStartup(sender, false);
  synchronized (this) {
    // Setting the flag is idempotent; only the transition from false matters.
    receivedStartupResponse = true;
    // Keep only the first non-null rejection (fix for bug 33266).
    boolean isFirstRejection = rejectionMessage == null && theRejectionMessage != null;
    if (isFirstRejection) {
      rejectionMessage = theRejectionMessage;
    }
  }
}
/**
 * Strips a leading {@code "java.io.IOException:"} prefix from a departure reason so that log
 * messages are easier to read.
 *
 * @param r the raw reason; may be {@code null}
 * @return the reason without the prefix, or {@code r} unchanged (including {@code null})
 */
private String prettifyReason(String r) {
  final String ioExceptionPrefix = "java.io.IOException:";
  // Reasons are passed straight through from departure callbacks, so tolerate null here rather
  // than failing inside a logging call.
  if (r != null && r.startsWith(ioExceptionPrefix)) {
    return r.substring(ioExceptionPrefix.length());
  }
  return r;
}
/**
 * Removes a member from the {@code members} and {@code membersAndAdmin} snapshots, drops the
 * locators it hosted, and forgets its redundancy zone.
 *
 * @param theId the member to remove
 * @param crashed whether the member crashed (used here only for debug logging)
 * @param p_reason the departure reason (used here only for debug logging)
 * @return {@code true} if the member was removed from {@code members}; {@code false} if it was
 *         not in the list of managers
 */
private boolean removeManager(InternalDistributedMember theId, boolean crashed, String p_reason) {
  boolean removedFromMembers = false;
  // Test once before acquiring the lock. This gives fault tolerance against potentially
  // recursive (and deadlocking) calls -- bug33626.
  if (isCurrentMember(theId)) {
    final String reason = prettifyReason(p_reason);
    synchronized (membersLock) {
      if (logger.isDebugEnabled()) {
        logger.debug("DistributionManager: removing member <{}>; crashed {}; reason = {}", theId,
            crashed, reason);
      }
      // The published collections are never modified in place, so readers can take snapshots
      // without locking.
      Map<InternalDistributedMember, InternalDistributedMember> remainingMembers =
          new HashMap<>(members);
      removedFromMembers = remainingMembers.remove(theId) != null;
      if (removedFromMembers) {
        if (remainingMembers.isEmpty()) {
          remainingMembers = Collections.emptyMap();
        } else {
          remainingMembers = Collections.unmodifiableMap(remainingMembers);
        }
        members = remainingMembers;
      }
      // If the member was absent, don't get upset: an explicit remove followed by an implicit
      // one from a view change can legitimately happen.

      Set<InternalDistributedMember> remainingAll = new HashSet<>(membersAndAdmin);
      if (remainingAll.remove(theId)) {
        if (remainingAll.isEmpty()) {
          remainingAll = Collections.emptySet();
        } else {
          remainingAll = Collections.unmodifiableSet(remainingAll);
        }
        membersAndAdmin = remainingAll;
      }
      removeHostedLocators(theId);
    }
  }
  redundancyZones.remove(theId);
  return removedFromMembers;
}
/**
 * Admits a newly started distribution manager into the member snapshots and queues a
 * member-joined event for the listeners. It does nothing if the member is already known.
 *
 * @param theId the id of the distribution manager starting up
 */
private void handleManagerStartup(InternalDistributedMember theId) {
  int nonAdminCount;
  synchronized (membersLock) {
    // The membership test must happen under membersLock.
    if (members.containsKey(theId)) {
      return; // already accounted for
    }
    // Publish fresh copies rather than mutating, so readers can snapshot without locking.
    HashMap<InternalDistributedMember, InternalDistributedMember> grownMembers =
        new HashMap<>(members);
    grownMembers.put(theId, theId);
    members = Collections.unmodifiableMap(grownMembers);

    Set<InternalDistributedMember> grownAll = new HashSet<>(membersAndAdmin);
    grownAll.add(theId);
    membersAndAdmin = Collections.unmodifiableSet(grownAll);

    nonAdminCount = grownMembers.size();
  }
  if (theId.getVmKind() != ClusterDistributionManager.LOCATOR_DM_TYPE) {
    stats.incNodes(1);
  }
  logger.info("Admitting member <{}>. Now there are {} non-admin member(s).",
      theId, nonAdminCount);
  addMemberEvent(new MemberJoinedEvent(theId));
}
/**
 * Reports whether {@code id} is currently a member (admin members included).
 *
 * @param id the member to look up
 * @return {@code true} if {@code id} is in the current {@code membersAndAdmin} snapshot
 */
@Override
public boolean isCurrentMember(DistributedMember id) {
  Set<InternalDistributedMember> snapshot;
  synchronized (membersLock) {
    // Read the reference under membersLock to get a properly published snapshot; the set
    // itself is never modified after publication.
    snapshot = membersAndAdmin;
  }
  return snapshot.contains(id);
}
/**
 * Admits a newly started administration member into {@code membersAndAdmin} and notifies every
 * "all members" listener. It does nothing if the member is already known.
 *
 * @param theId the admin member that started
 */
private void handleConsoleStartup(InternalDistributedMember theId) {
  synchronized (membersLock) {
    // The membership test must happen under membersLock.
    if (membersAndAdmin.contains(theId)) {
      return; // already accounted for
    }
    // Publish a new copy instead of mutating, so readers can snapshot without locking.
    HashSet<InternalDistributedMember> withConsole = new HashSet<>(membersAndAdmin);
    withConsole.add(theId);
    membersAndAdmin = Collections.unmodifiableSet(withConsole);
  }
  allMembershipListeners.forEach(listener -> listener.memberJoined(this, theId));
  logger.info("DMMembership: Admitting new administration member < {} >.",
      theId);
  // The member is not added to adminConsoles here; that happens when we receive a message
  // from it.
}
/**
 * Updates the receive statistics for an incoming message and then schedules it on the executor
 * chosen by the message itself.
 *
 * @param message the message that was received
 */
private void handleIncomingDMsg(DistributionMessage message) {
  final long bytesRead = message.getBytesRead();
  stats.incReceivedMessages(1L);
  stats.incReceivedBytes(bytesRead);
  stats.incMessageChannelTime(message.resetTimestamp());
  if (logger.isDebugEnabled()) {
    logger.debug("Received message '{}' from <{}>", message, message.getSender());
  }
  scheduleIncomingMessage(message);
}
/**
 * Makes note of a console (admin member) that has shut down.
 * <p>
 * The member is removed from {@code membersAndAdmin} only when it is not also in
 * {@code members}. It is removed from {@code adminConsoles} if present. Membership listeners are
 * told it departed only if it was removed from {@code membersAndAdmin}.
 *
 * @param theId The id of the console shutting down
 * @param crashed only true if we detect this id to be gone from a javagroup view
 * @param reason the departure reason, used for logging
 *
 * @see AdminConsoleDisconnectMessage#process
 */
public void handleConsoleShutdown(InternalDistributedMember theId, boolean crashed,
String reason) {
boolean removedConsole = false;
boolean removedMember = false;
synchronized (membersLock) {
// to fix bug 39747 we can only remove this member from
// membersAndAdmin if it is not in members.
// This happens when we have an admin member colocated with a normal DS.
// In this case we need for the normal DS to shutdown or crash.
if (!members.containsKey(theId)) {
if (logger.isDebugEnabled())
logger.debug("DistributionManager: removing admin member <{}>; crashed = {}; reason = {}",
theId, crashed, reason);
Set<InternalDistributedMember> tmp = new HashSet<>(membersAndAdmin);
if (tmp.remove(theId)) {
// Note we don't modify in place. This allows reader to get snapshots
// without locking.
if (tmp.isEmpty()) {
tmp = Collections.emptySet();
} else {
tmp = Collections.unmodifiableSet(tmp);
}
membersAndAdmin = tmp;
removedMember = true;
} else {
// Don't get upset since this can happen twice due to
// an explicit remove followed by an implicit one caused
// by a JavaGroup view change
}
}
// Hosted locators are dropped whether or not the member was removed above.
removeHostedLocators(theId);
}
// Admin console bookkeeping is guarded by its own lock and uses the same copy-and-replace
// pattern as the member sets.
synchronized (adminConsolesLock) {
if (adminConsoles.contains(theId)) {
removedConsole = true;
Set<InternalDistributedMember> tmp = new HashSet<>(adminConsoles);
tmp.remove(theId);
if (tmp.isEmpty()) {
tmp = Collections.emptySet();
} else {
tmp = Collections.unmodifiableSet(tmp);
}
adminConsoles = tmp;
}
}
// Listeners are notified outside both locks.
if (removedMember) {
for (MembershipListener listener : allMembershipListeners) {
listener.memberDeparted(this, theId, crashed);
}
}
if (removedConsole) {
String msg;
if (crashed) {
msg = "Administration member at {} crashed: {}";
} else {
msg = "Administration member at {} closed: {}";
}
logger.info(msg, new Object[] {theId, reason});
}
redundancyZones.remove(theId);
}
/**
 * Handles a shutdown message from {@code theId}. The membership manager is informed first, and
 * then the member is treated as a graceful (non-crash) departure.
 *
 * @param theId the member that sent the shutdown message
 * @param reason the reason supplied with the message (passed to the membership manager)
 */
void shutdownMessageReceived(InternalDistributedMember theId, String reason) {
  final String departureReason = "shutdown message received";
  membershipManager.shutdownMessageReceived(theId, reason);
  handleManagerDeparture(theId, false, departureReason);
}
/**
 * Processes the departure of a member.
 * <p>
 * Admin-only members are handed to handleConsoleShutdown. Other current members are removed,
 * the node count is adjusted (locators are not counted), a crashed or departed event is queued,
 * and the serial executor pool is told about the departure.
 *
 * @param theId the departing member
 * @param p_crashed whether the member crashed
 * @param p_reason the departure reason
 */
@Override
public void handleManagerDeparture(InternalDistributedMember theId, boolean p_crashed,
    String p_reason) {
  alertingService.removeAlertListener(theId);
  if (theId.getVmKind() == ADMIN_ONLY_DM_TYPE) {
    removeUnfinishedStartup(theId, true);
    handleConsoleShutdown(theId, p_crashed, p_reason);
    return;
  }
  // Not an admin VM. Ignore members we no longer know about (fault tolerance).
  if (!isCurrentMember(theId)) {
    return;
  }
  removeUnfinishedStartup(theId, true);
  if (!removeManager(theId, p_crashed, p_reason)) {
    return;
  }
  if (theId.getVmKind() != ClusterDistributionManager.LOCATOR_DM_TYPE) {
    stats.incNodes(-1);
  }
  final boolean reportAsCrash = p_crashed && !shouldInhibitMembershipWarnings();
  final String msg;
  if (reportAsCrash) {
    msg = "Member at {} unexpectedly left the distributed cache: {}";
    addMemberEvent(new MemberCrashedEvent(theId, p_reason));
  } else {
    msg = "Member at {} gracefully left the distributed cache: {}";
    addMemberEvent(new MemberDepartedEvent(theId, p_reason));
  }
  logger.info(msg, new Object[] {theId, prettifyReason(p_reason)});
  // Remove this manager from the serialQueueExecutor.
  if (serialQueuedExecutorPool != null) {
    serialQueuedExecutorPool.handleMemberDeparture(theId);
  }
}
/**
 * Queues a suspect event for {@code suspect}. Members that are no longer current, and admin-only
 * members, are ignored.
 *
 * @param suspect the member under suspicion
 * @param whoSuspected the member that raised the suspicion
 * @param reason why it is suspected
 */
private void handleManagerSuspect(InternalDistributedMember suspect,
    InternalDistributedMember whoSuspected, String reason) {
  boolean ignore = !isCurrentMember(suspect) || suspect.getVmKind() == ADMIN_ONLY_DM_TYPE;
  if (!ignore) {
    addMemberEvent(new MemberSuspectEvent(suspect, whoSuspected, reason));
  }
}
/** Queues an event announcing that {@code view} has been installed. */
private void handleViewInstalled(NetView view) {
  final ViewInstalledEvent event = new ViewInstalledEvent(view);
  addMemberEvent(event);
}
/**
 * Queues an event reporting that quorum was lost.
 *
 * @param failures the members considered failed
 * @param remaining the members still present
 */
private void handleQuorumLost(Set<InternalDistributedMember> failures,
    List<InternalDistributedMember> remaining) {
  final QuorumLostEvent event = new QuorumLostEvent(failures, remaining);
  addMemberEvent(event);
}
/**
 * Sends a ShutdownMessage to every other member of the view.
 * <p>
 * This is skipped for admin-only members unless this JVM hosts a locator. Whatever the outcome,
 * {@code shutdownMsgSent} is set so that shutdown can proceed.
 */
private void sendShutdownMessage() {
  if (getDMType() == ADMIN_ONLY_DM_TYPE && Locator.getLocators().size() == 0) {
    // Locators must still send the ShutdownMessage. Otherwise the system can hang, because an
    // admin member does not trigger member-left notification unless a new view shows the
    // departure, which may not happen if two locators shut down at the same time.
    return;
  }
  ShutdownMessage m = new ShutdownMessage();
  InternalDistributedMember theId = getDistributionManagerId();
  m.setDistributionManagerId(theId);
  Set<InternalDistributedMember> allOthers = new HashSet<>(getViewMembers());
  allOthers.remove(getDistributionManagerId());
  m.setRecipients(allOthers);
  if (logger.isTraceEnabled()) {
    logger.trace("{} Sending {} to {}", getDistributionManagerId(), m,
        m.getRecipientsDescription());
  }
  try {
    // The message timestamp is deliberately not reset: nano timers are not comparable across
    // systems.
    long startTime = DistributionStats.getStatTime();
    sendViaMembershipManager(m.getRecipients(), m, this, stats);
    stats.incSentMessages(1L);
    if (DistributionStats.enableClockStats) {
      stats.incSentMessagesTime(DistributionStats.getStatTime() - startTime);
    }
  } catch (CancelException e) {
    // Parameterized, so the message is only formatted when debug logging is enabled. The
    // trailing throwable is logged as the exception.
    logger.debug("CancelException caught sending shutdown: {}", e.getMessage(), e);
  } catch (Exception ex2) {
    logger.fatal("While sending shutdown message", ex2);
  } finally {
    // Even if the message wasn't sent, *lie* about it, so that
    // everyone believes that message distribution is done.
    shutdownMsgSent = true;
  }
}
/**
 * Maps a processor type (one of the *_EXECUTOR constants) to the executor that runs messages of
 * that type.
 *
 * @param processorType the executor kind requested by a message
 * @param sender the sending member; used only to choose a serial executor
 * @return the matching executor
 * @throws InternalGemFireError if {@code processorType} is not recognized
 */
public Executor getExecutor(int processorType, InternalDistributedMember sender) {
  final Executor chosen;
  switch (processorType) {
    case STANDARD_EXECUTOR:
      chosen = getThreadPool();
      break;
    case SERIAL_EXECUTOR:
      chosen = getSerialExecutor(sender);
      break;
    case VIEW_EXECUTOR:
      chosen = viewThread;
      break;
    case HIGH_PRIORITY_EXECUTOR:
      chosen = getHighPriorityThreadPool();
      break;
    case WAITING_POOL_EXECUTOR:
      chosen = getWaitingThreadPool();
      break;
    case PARTITIONED_REGION_EXECUTOR:
      chosen = getPartitionedRegionExcecutor();
      break;
    case REGION_FUNCTION_EXECUTION_EXECUTOR:
      chosen = getFunctionExecutor();
      break;
    default:
      throw new InternalGemFireError(String.format("unknown processor type %s",
          processorType));
  }
  return chosen;
}
/**
 * Sends a message through the membership manager and updates the send statistics.
 *
 * @param message the message to send
 * @return recipients that did not receive the message because they left the view; {@code null}
 *         if all received it or it was sent to {@link DistributionMessage#ALL_RECIPIENTS}
 * @throws NotSerializableException If <code>message</code> cannot be serialized
 */
Set<InternalDistributedMember> sendOutgoing(DistributionMessage message)
    throws NotSerializableException {
  final long startTime = DistributionStats.getStatTime();
  final Set<InternalDistributedMember> notDelivered =
      sendViaMembershipManager(message.getRecipients(), message, this, stats);
  final long endTime = DistributionStats.enableClockStats ? NanoTimer.getTime() : 0L;

  if (message.forAll()) {
    stats.incBroadcastMessages(1L);
    if (DistributionStats.enableClockStats) {
      stats.incBroadcastMessagesTime(endTime - startTime);
    }
  }
  stats.incSentMessages(1L);
  if (DistributionStats.enableClockStats) {
    stats.incSentMessagesTime(endTime - startTime);
    stats.incDistributeMessageTime(endTime - message.getTimestamp());
  }
  return notDelivered;
}
/**
 * Sends a message after checking for cancellation and waiting until this manager is ready to send.
 * <p>
 * Serialization, reentrancy, delta and cancellation failures are rethrown. Any other failure is
 * logged as fatal, and the intended recipients are reported as undelivered.
 *
 * @param message the message to send
 * @return recipients who did not receive the message
 * @throws NotSerializableException If <code>message</code> cannot be serialized
 */
private Set<InternalDistributedMember> sendMessage(DistributionMessage message)
    throws NotSerializableException {
  try {
    // Verify we're not too far into the shutdown.
    stopper.checkCancelInProgress(null);
    // Avoid a race condition during startup.
    waitUntilReadyToSendMsgs(message);
    return sendOutgoing(message);
  } catch (NotSerializableException | ToDataException | ReenteredConnectException
      | InvalidDeltaException | CancelException ex) {
    throw ex;
  } catch (Exception ex) {
    exceptionInThreads = true;
    final String receiver = (message == null) ? "NULL" : message.getRecipientsDescription();
    logger.fatal(String.format("While pushing message <%s> to %s", message, receiver), ex);
    if (message != null && !message.forAll()) {
      return new HashSet<>(Arrays.asList(message.getRecipients()));
    }
    return null;
  }
}
/**
 * Hands a message to the membership manager for delivery.
 *
 * @param destinations the intended recipients
 * @param content the message
 * @param dm unused here; kept for signature compatibility
 * @param stats statistics passed to the membership manager
 * @return list of recipients who did not receive the message because they left the view (null if
 *         all received it or it was sent to {@link DistributionMessage#ALL_RECIPIENTS}). If this
 *         manager is disconnected, every destination is returned, except in the broadcast case,
 *         which returns null.
 * @throws NotSerializableException If content cannot be serialized
 */
private Set<InternalDistributedMember> sendViaMembershipManager(
    InternalDistributedMember[] destinations,
    DistributionMessage content, ClusterDistributionManager dm, DistributionStats stats)
    throws NotSerializableException {
  // Read the field once, so a concurrent disconnect cannot clear it between the null check and
  // the send.
  final MembershipManager mgr = membershipManager;
  if (mgr == null) {
    logger.warn("Attempting a send to a disconnected DistributionManager");
    if (destinations.length == 1 && destinations[0] == DistributionMessage.ALL_RECIPIENTS) {
      return null;
    }
    return new HashSet<>(Arrays.asList(destinations));
  }
  return mgr.send(destinations, content, stats);
}
/**
 * Schedules an incoming message on the executor that matches its kind, first blocking until this
 * manager is ready to process messages.
 *
 * @param message the message to schedule
 */
private void scheduleIncomingMessage(DistributionMessage message) {
  // Startup race: this manager may only be initialized up to the point where it called
  // startThreads, so wait until it is ready before scheduling.
  waitUntilReadyForMessages();
  final ClusterDistributionManager self = this;
  message.schedule(self);
}
/** Returns the current elder, as reported by the cluster elder manager. */
@Override
public InternalDistributedMember getElderId() throws DistributedSystemDisconnectedException {
  final InternalDistributedMember elder = clusterElderManager.getElderId();
  return elder;
}
/** Reports whether this member is the elder, according to the cluster elder manager. */
@Override
public boolean isElder() {
  final boolean amElder = clusterElderManager.isElder();
  return amElder;
}
/** A cluster distribution manager is never a loner, so this always returns {@code false}. */
@Override
public boolean isLoner() {
  final boolean loner = false;
  return loner;
}
/**
 * Delegates to the cluster elder manager to obtain the elder state.
 *
 * @param waitToBecomeElder passed through to the cluster elder manager
 */
@Override
public ElderState getElderState(boolean waitToBecomeElder) throws InterruptedException {
  final ElderState state = clusterElderManager.getElderState(waitToBecomeElder);
  return state;
}
/**
 * Waits until {@code desiredElder} becomes the elder, or until it is no longer a member.
 *
 * @param desiredElder the member expected to become elder
 * @return true if desiredElder is the elder; false if it is no longer a member or we are the elder
 * @throws InterruptedException if interrupted while waiting
 */
public boolean waitForElder(final InternalDistributedMember desiredElder)
    throws InterruptedException {
  final boolean becameElder = clusterElderManager.waitForElder(desiredElder);
  return becameElder;
}
/** Returns the standard message-processing thread pool. */
@Override
public ExecutorService getThreadPool() {
  final ExecutorService pool = threadPool;
  return pool;
}
/** Returns the thread pool used for high-priority messages. */
@Override
public ExecutorService getHighPriorityThreadPool() {
  final ExecutorService pool = highPriorityPool;
  return pool;
}
/** Returns the "waiting" thread pool. */
@Override
public ExecutorService getWaitingThreadPool() {
  final ExecutorService pool = waitingPool;
  return pool;
}
/** Returns the thread pool used for partitioned-region metadata cleanup. */
@Override
public ExecutorService getPrMetaDataCleanupThreadPool() {
  final ExecutorService pool = prMetaDataCleanupThreadPool;
  return pool;
}
/**
 * Returns the dedicated partitioned-region thread if one exists, and otherwise the
 * partitioned-region pool.
 */
private Executor getPartitionedRegionExcecutor() {
  if (partitionedRegionThread == null) {
    return partitionedRegionPool;
  }
  return partitionedRegionThread;
}
/**
 * Returns the dedicated function-execution thread if one exists, and otherwise the
 * function-execution pool.
 */
@Override
public Executor getFunctionExecutor() {
  if (functionExecutionThread == null) {
    return functionExecutionPool;
  }
  return functionExecutionThread;
}
/**
 * Returns the serial executor for {@code sender}. When MULTI_SERIAL_EXECUTORS is enabled this is
 * a per-sender (possibly throttling) executor from the pool; otherwise it is the single serial
 * thread.
 */
private Executor getSerialExecutor(InternalDistributedMember sender) {
  if (!MULTI_SERIAL_EXECUTORS) {
    return serialThread;
  }
  return serialQueuedExecutorPool.getThrottledSerialExecutor(sender);
}
/**
 * Returns the serial queue used for {@code sender}: the sender's pooled queue when
 * MULTI_SERIAL_EXECUTORS is enabled, and otherwise the single {@code serialQueue} field.
 */
public OverflowQueueWithDMStats<Runnable> getSerialQueue(InternalDistributedMember sender) {
  if (!MULTI_SERIAL_EXECUTORS) {
    return serialQueue;
  }
  return serialQueuedExecutorPool.getSerialQueue(sender);
}
/** Returns the thread monitor associated with this distribution manager. */
@Override
public ThreadsMonitoring getThreadMonitoring() {
  final ThreadsMonitoring monitor = threadMonitor;
  return monitor;
}
/**
 * Sets or clears the administration agent associated with this distribution manager.
 * <p>
 * Admin agents and distribution managers are one-to-one. An agent may only be set when none is
 * present, and may only be cleared (by passing {@code null}) when one is present.
 *
 * @param agent the agent to associate, or {@code null} to clear the existing association
 * @throws IllegalStateException if setting while already set, or clearing while unset
 */
public void setAgent(RemoteGfManagerAgent agent) {
  final boolean alreadyHasAgent = this.agent != null;
  if (agent != null && alreadyHasAgent) {
    throw new IllegalStateException(
        "There is already an Admin Agent associated with this distribution manager.");
  }
  if (agent == null && !alreadyHasAgent) {
    throw new IllegalStateException(
        "There was never an Admin Agent associated with this distribution manager.");
  }
  this.agent = agent;
}
/**
 * Returns the admin agent that owns this distribution manager, or {@code null} if none has been
 * set (see ConsoleDistributionManager).
 */
public RemoteGfManagerAgent getAgent() {
  final RemoteGfManagerAgent owner = agent;
  return owner;
}
/**
 * Describes the distribution configuration used by this distribution manager, using the admin
 * agent's transport (see ConsoleDistributionManager).
 *
 * @return the string form of the agent's transport, or <code>null</code> if no admin
 *         {@linkplain #getAgent agent} is associated with this distribution manager
 */
public String getDistributionConfigDescription() {
  if (agent != null) {
    return agent.getTransport().toString();
  }
  return null;
}
/* -----------------------------Health Monitor------------------------------ */
/**
 * Health monitors keyed by the member that owns them; at most one monitor per owner. Kept in a
 * ConcurrentMap because monitors are created, looked up, and removed by different methods.
 */
private final ConcurrentMap<InternalDistributedMember, HealthMonitor> hmMap =
new ConcurrentHashMap<>();
// NOTE(review): 'cache' is not referenced in this section of the file; presumably it is the
// InternalCache attached to this manager elsewhere -- confirm against its setter/getter.
private volatile InternalCache cache;
/**
 * Returns the health monitor that {@code owner} created on this distribution manager.
 *
 * @param owner the agent that owns the returned monitor
 * @return the health monitor created by the owner; <code>null</code> if the owner has not created
 *         a monitor
 * @since GemFire 3.5
 */
@Override
public HealthMonitor getHealthMonitor(InternalDistributedMember owner) {
  final HealthMonitor monitor = hmMap.get(owner);
  return monitor;
}
/**
 * Creates and starts a health monitor for {@code owner}, replacing (and stopping) any monitor it
 * previously created. It does nothing while a close is in progress.
 *
 * @param owner the agent that owns the created monitor
 * @param cfg the configuration to use when creating the monitor
 * @since GemFire 3.5
 */
@Override
public void createHealthMonitor(InternalDistributedMember owner, GemFireHealthConfig cfg) {
  if (closeInProgress) {
    return;
  }
  final HealthMonitor previous = getHealthMonitor(owner);
  if (previous != null) {
    previous.stop();
    hmMap.remove(owner);
  }
  final HealthMonitorImpl replacement = new HealthMonitorImpl(owner, cfg, this);
  replacement.start();
  hmMap.put(owner, replacement);
}
/**
 * Stops and removes the monitor owned by {@code owner}, but only if its id is {@code theId}.
 *
 * @param owner the agent that owns the monitor to remove
 * @param theId the id the monitor must have in order to be removed
 */
@Override
public void removeHealthMonitor(InternalDistributedMember owner, int theId) {
  final HealthMonitor hm = getHealthMonitor(owner);
  if (hm != null && hm.getId() == theId) {
    hm.stop();
    // Conditional remove: if createHealthMonitor installed a replacement for this owner after
    // the lookup above, leave the replacement in place.
    hmMap.remove(owner, hm);
  }
}
/** Stops every registered health monitor and removes it from {@code hmMap}. */
private void removeAllHealthMonitors() {
  // Use a typed iterator, which replaces the raw Iterator and cast; Iterator.remove is
  // supported by ConcurrentHashMap's values view.
  for (Iterator<HealthMonitor> it = hmMap.values().iterator(); it.hasNext();) {
    HealthMonitor hm = it.next();
    hm.stop();
    it.remove();
  }
}
/** Returns the current (unmodifiable) snapshot of known administration members. */
@Override
public Set<InternalDistributedMember> getAdminMemberSet() {
  final Set<InternalDistributedMember> consoles = adminConsoles;
  return consoles;
}
/**
 * Counts the distribution managers that fill {@code role}.
 *
 * @param role the role to look for
 * @return the number of members with at least one role equal to {@code role}
 */
@Override
public int getRoleCount(Role role) {
  return (int) getDistributionManagerIds().stream()
      .filter(member -> member.getRoles().stream().anyMatch(r -> r.equals(role)))
      .count();
}
/**
 * Reports whether any distribution manager fills {@code role}.
 *
 * @param role the role to look for
 * @return {@code true} if at least one member has a role equal to {@code role}
 */
@Override
public boolean isRolePresent(Role role) {
  return getDistributionManagerIds().stream()
      .anyMatch(member -> member.getRoles().stream().anyMatch(r -> r.equals(role)));
}
/**
 * Collects every role held by any distribution manager.
 *
 * @return a new mutable set containing the union of all members' roles
 */
@Override
public Set<Role> getAllRoles() {
  final Set<Role> union = new HashSet<>();
  getDistributionManagerIds().forEach(member -> union.addAll(member.getRoles()));
  return union;
}
/**
 * Returns the membership manager for this distributed system. It owns the membership set and
 * handles all communications, and must NOT be used to bypass the DistributionManager when
 * sending or receiving messages.
 * <p>
 * This method was added so that hydra can obtain thread-local data to carry from one thread to
 * another.
 */
@Override
public MembershipManager getMembershipManager() {
  // No cancellation checks here: auto-reconnect calls this after the DS has been closed.
  final MembershipManager mgr = membershipManager;
  return mgr;
}
////////////////////// Inner Classes //////////////////////
/**
* This class is used for DM's multi serial executor. The serial messages are managed/executed by
* multiple serial thread. This class takes care of executing messages related to a sender using
* the same thread.
*/
private static class SerialQueuedExecutorPool {
/** To store the serial threads */
final ConcurrentMap<Integer, ExecutorService> serialQueuedExecutorMap =
new ConcurrentHashMap<>(MAX_SERIAL_QUEUE_THREAD);
/** To store the queue associated with thread */
final Map<Integer, OverflowQueueWithDMStats<Runnable>> serialQueuedMap =
new HashMap<>(MAX_SERIAL_QUEUE_THREAD);
/** Holds mapping between sender to the serial thread-id */
final Map<InternalDistributedMember, Integer> senderToSerialQueueIdMap = new HashMap<>();
/**
* Holds info about unused thread, a thread is marked unused when the member associated with it
* has left distribution system.
*/
final ArrayList<Integer> threadMarkedForUse = new ArrayList<>();
final DistributionStats stats;
final boolean throttlingDisabled;
final ThreadsMonitoring threadMonitoring;
SerialQueuedExecutorPool(DistributionStats stats,
boolean throttlingDisabled, ThreadsMonitoring tMonitoring) {
this.stats = stats;
this.throttlingDisabled = throttlingDisabled;
threadMonitoring = tMonitoring;
}
/*
* Returns an id of the thread in serialQueuedExecutorMap, thats mapped to the given seder.
*
*
* @param createNew boolean flag to indicate whether to create a new id, if id doesnot exists.
*/
private Integer getQueueId(InternalDistributedMember sender, boolean createNew) {
// Create a new Id.
Integer queueId;
synchronized (senderToSerialQueueIdMap) {
// Check if there is a executor associated with this sender.
queueId = senderToSerialQueueIdMap.get(sender);
if (!createNew || queueId != null) {
return queueId;
}
// Create new.
// Check if any threads are availabe that is marked for Use.
if (!threadMarkedForUse.isEmpty()) {
queueId = threadMarkedForUse.remove(0);
}
// If Map is full, use the threads in round-robin fashion.
if (queueId == null) {
queueId = (serialQueuedExecutorMap.size() + 1) % MAX_SERIAL_QUEUE_THREAD;
}
senderToSerialQueueIdMap.put(sender, queueId);
}
return queueId;
}
/*
* Returns the queue associated with this sender. Used in FlowControl for throttling (based on
* queue size).
*/
OverflowQueueWithDMStats<Runnable> getSerialQueue(InternalDistributedMember sender) {
Integer queueId = getQueueId(sender, false);
if (queueId == null) {
return null;
}
return serialQueuedMap.get(queueId);
}
/*
* Returns the serial queue executor, before returning the thread this applies throttling, based
* on the total serial queue size (total - sum of all the serial queue size). The throttling is
* applied during put event, this doesnt block the extract operation on the queue.
*
*/
ExecutorService getThrottledSerialExecutor(
InternalDistributedMember sender) {
ExecutorService executor = getSerialExecutor(sender);
// Get the total serial queue size.
long totalSerialQueueMemSize = stats.getInternalSerialQueueBytes();
// for tcp socket reader threads, this code throttles the thread
// to keep the sender-side from overwhelming the receiver.
// UDP readers are throttled in the FC protocol, which queries
// the queue to see if it should throttle
if (stats.getInternalSerialQueueBytes() > TOTAL_SERIAL_QUEUE_THROTTLE
&& !DistributionMessage.isPreciousThread()) {
do {
boolean interrupted = Thread.interrupted();
try {
float throttlePercent = (float) (totalSerialQueueMemSize - TOTAL_SERIAL_QUEUE_THROTTLE)
/ (float) (TOTAL_SERIAL_QUEUE_BYTE_LIMIT - TOTAL_SERIAL_QUEUE_THROTTLE);
int sleep = (int) (100.0 * throttlePercent);
sleep = Math.max(sleep, 1);
Thread.sleep(sleep);
} catch (InterruptedException ex) {
interrupted = true;
// FIXME-InterruptedException
// Perhaps we should return null here?
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
stats.getSerialQueueHelper().incThrottleCount();
} while (stats.getInternalSerialQueueBytes() >= TOTAL_SERIAL_QUEUE_BYTE_LIMIT);
}
return executor;
}
/*
* Returns the serial queue executor for the given sender.
*/
ExecutorService getSerialExecutor(InternalDistributedMember sender) {
ExecutorService executor;
Integer queueId = getQueueId(sender, true);
if ((executor =
serialQueuedExecutorMap.get(queueId)) != null) {
return executor;
}
// If executor doesn't exists for this sender, create one.
executor = createSerialExecutor(queueId);
serialQueuedExecutorMap.put(queueId, executor);
if (logger.isDebugEnabled()) {
logger.debug(
"Created Serial Queued Executor With queueId {}. Total number of live Serial Threads :{}",
queueId, serialQueuedExecutorMap.size());
}
stats.incSerialPooledThread();
return executor;
}
/*
* Creates a serial queue executor.
*/
private ExecutorService createSerialExecutor(final Integer id) {
OverflowQueueWithDMStats<Runnable> poolQueue;
if (SERIAL_QUEUE_BYTE_LIMIT == 0 || throttlingDisabled) {
poolQueue = new OverflowQueueWithDMStats<>(stats.getSerialQueueHelper());
} else {
poolQueue = new ThrottlingMemLinkedQueueWithDMStats<>(SERIAL_QUEUE_BYTE_LIMIT,
SERIAL_QUEUE_THROTTLE, SERIAL_QUEUE_SIZE_LIMIT, SERIAL_QUEUE_SIZE_THROTTLE,
stats.getSerialQueueHelper());
}
serialQueuedMap.put(id, poolQueue);
return LoggingExecutors.newSerialThreadPool("Pooled Serial Message Processor" + id + "-",
thread -> stats.incSerialPooledThreadStarts(), this::doSerialPooledThread,
stats.getSerialPooledProcessorHelper(), threadMonitoring, poolQueue);
}
/**
 * Runs a single task on a serial pooled thread. Before the task, this calls
 * ConnectionTable.threadWantsSharedResources() and Connection.makeReaderThread(); afterwards it
 * always calls ConnectionTable.releaseThreadsSockets(), even if the task throws.
 */
private void doSerialPooledThread(Runnable task) {
  ConnectionTable.threadWantsSharedResources();
  Connection.makeReaderThread();
  try {
    task.run();
  } finally {
    // Release this thread's sockets regardless of how the task finished.
    ConnectionTable.releaseThreadsSockets();
  }
}
/*
 * Does cleanup relating to a departed member: removes its sender-to-queue-id mapping and, when no
 * other sender still maps to the same queue id, adds that id to threadMarkedForUse so the serial
 * executor can be re-used.
 */
private void handleMemberDeparture(InternalDistributedMember member) {
  Integer queueId = getQueueId(member, false);
  if (queueId == null) {
    return;
  }
  synchronized (senderToSerialQueueIdMap) {
    senderToSerialQueueIdMap.remove(member);
    // Another sender may still share this executor; only recycle it when none do.
    if (senderToSerialQueueIdMap.containsValue(queueId)) {
      return;
    }
    if (logger.isInfoEnabled(LogMarker.DM_MARKER)) {
      logger.info(LogMarker.DM_MARKER,
          "Marking the SerialQueuedExecutor with id : {} used by the member {} to be unused.",
          queueId, member);
    }
    threadMarkedForUse.add(queueId);
  }
}
/**
 * Waits for every serial executor to terminate, sharing one overall timeout across all of them:
 * each executor is only given whatever is left of the original budget.
 *
 * @param time the maximum total time to wait
 * @param unit the unit of {@code time}
 * @throws InterruptedException if interrupted while waiting
 */
private void awaitTermination(long time, TimeUnit unit) throws InterruptedException {
  final long totalNanos = unit.toNanos(time);
  final long start = System.nanoTime();
  for (ExecutorService executor : serialQueuedExecutorMap.values()) {
    // Remaining budget = total budget minus elapsed time (elapsed time alone is NOT the
    // remaining time). Computed as a difference to stay overflow-safe for huge timeouts.
    long remainingNanos = totalNanos - (System.nanoTime() - start);
    if (remainingNanos <= 0) {
      return;
    }
    executor.awaitTermination(remainingNanos, TimeUnit.NANOSECONDS);
  }
}
/** Invokes {@link ExecutorService#shutdown()} on every serial executor currently registered. */
private void shutdown() {
  Iterator<ExecutorService> executors = serialQueuedExecutorMap.values().iterator();
  while (executors.hasNext()) {
    executors.next().shutdown();
  }
}
}
/**
 * Listener that receives callbacks from the membership layer and forwards them to the owning
 * {@link ClusterDistributionManager}.
 */
private class DMListener implements DistributedMembershipListener {
  ClusterDistributionManager dm;

  DMListener(ClusterDistributionManager dm) {
    this.dm = dm;
  }

  /**
   * Records the failure as the root cause, flags that an exception occurred in threads, and asks
   * the distributed system to disconnect with the given reason.
   */
  @Override
  public void membershipFailure(String reason, Throwable t) {
    exceptionInThreads = true;
    rootCause = t;
    getSystem().disconnect(reason, true);
  }

  @Override
  public void messageReceived(DistributionMessage message) {
    handleIncomingDMsg(message);
  }

  @Override
  public void newMemberConnected(InternalDistributedMember member) {
    // Do not elect the elder here as surprise members invoke this callback
    // without holding the view lock. That can cause a race condition and
    // subsequent deadlock (#45566). Elder selection is now done when a view
    // is installed.
    dm.addNewMember(member);
  }

  @Override
  public void memberDeparted(InternalDistributedMember theId, boolean crashed, String reason) {
    boolean wasAdmin = getAdminMemberSet().contains(theId);
    if (wasAdmin) {
      // Pretend we received an AdminConsoleDisconnectMessage from the console that
      // is no longer in the JavaGroup view.
      // It must have died without sending a ShutdownMessage.
      // This fixes bug 28454.
      AdminConsoleDisconnectMessage message = new AdminConsoleDisconnectMessage();
      message.setSender(theId);
      message.setCrashed(crashed);
      message.setAlertListenerExpected(true);
      // We don't know if it was a listener, so don't issue a warning.
      message.setIgnoreAlertListenerRemovalFailure(true);
      message.setRecipient(localAddress);
      message.setReason(reason); // added for #37950
      handleIncomingDMsg(message);
    }
    dm.handleManagerDeparture(theId, crashed, reason);
  }

  @Override
  public void memberSuspect(InternalDistributedMember suspect,
      InternalDistributedMember whoSuspected, String reason) {
    dm.handleManagerSuspect(suspect, whoSuspected, reason);
  }

  @Override
  public void viewInstalled(NetView view) {
    dm.handleViewInstalled(view);
  }

  @Override
  public void quorumLost(Set<InternalDistributedMember> failures,
      List<InternalDistributedMember> remaining) {
    dm.handleQuorumLost(failures, remaining);
  }

  @Override
  public ClusterDistributionManager getDM() {
    return dm;
  }

  /**
   * Saves the cache XML for reconnect, unless auto-reconnect is disabled or no cache has been
   * created on this member.
   */
  @Override
  public void saveConfig() {
    if (getConfig().getDisableAutoReconnect()) {
      return;
    }
    // The cache field is null until a cache has been set on this manager; read it once so a
    // concurrent change cannot slip in between the check and the call.
    InternalCache currentCache = cache;
    if (currentCache != null) {
      currentCache.saveCacheXmlForReconnect();
    }
  }
}
/**
 * Base class for membership events. An event is delivered to each listener registered in
 * {@code manager.membershipListeners}, then to each listener in
 * {@code manager.allMembershipListeners}.
 */
private abstract static class MemberEvent {
  private InternalDistributedMember id;

  MemberEvent(InternalDistributedMember id) {
    this.id = id;
  }

  public InternalDistributedMember getId() {
    return id;
  }

  /** Dispatches this event to both listener collections of the given manager. */
  void handleEvent(ClusterDistributionManager manager) {
    handleEvent(manager, manager.membershipListeners.keySet());
    handleEvent(manager, manager.allMembershipListeners);
  }

  /** Delivers this event to a single listener; implemented by each concrete event type. */
  protected abstract void handleEvent(ClusterDistributionManager manager,
      MembershipListener listener);

  /** Notifies listeners in iteration order, stopping early if a listener reports cancellation. */
  private void handleEvent(ClusterDistributionManager manager,
      Set<MembershipListener> membershipListeners) {
    Iterator<MembershipListener> pending = membershipListeners.iterator();
    boolean keepGoing = true;
    while (keepGoing && pending.hasNext()) {
      keepGoing = notifyListener(manager, pending.next());
    }
  }

  /**
   * Invokes one listener, logging (rather than propagating) ordinary failures.
   *
   * @return false if a CancelException was raised and dispatch should stop, true otherwise
   */
  private boolean notifyListener(ClusterDistributionManager manager,
      MembershipListener listener) {
    try {
      handleEvent(manager, listener);
    } catch (CancelException e) {
      if (!manager.shouldInhibitMembershipWarnings()) {
        logger.warn("Unexpected cancellation", e);
      } else if (logger.isTraceEnabled()) {
        logger.trace("MemberEventInvoker: cancelled");
      }
      return false;
    } catch (VirtualMachineError err) {
      SystemFailure.initiateFailure(err);
      // If this ever returns, rethrow the error. We're poisoned
      // now, so don't let this thread continue.
      throw err;
    } catch (Throwable t) {
      // Whenever you catch Error or Throwable, you must also
      // catch VirtualMachineError (see above). However, there is
      // _still_ a possibility that you are dealing with a cascading
      // error condition, so you also need to check to see if the JVM
      // is still usable:
      SystemFailure.checkFailure();
      logger.warn(String.format("Exception while calling membership listener for event: %s",
          this), t);
    }
    return true;
  }
}
/**
 * Event reflecting that an InternalDistributedMember has joined the system; each listener is
 * notified through {@code memberJoined}.
 */
private static class MemberJoinedEvent extends MemberEvent {
  MemberJoinedEvent(InternalDistributedMember joiner) {
    super(joiner);
  }

  @Override
  public String toString() {
    return new StringBuilder("member ").append(getId()).append(" joined").toString();
  }

  @Override
  protected void handleEvent(ClusterDistributionManager manager, MembershipListener listener) {
    final InternalDistributedMember joined = getId();
    listener.memberJoined(manager, joined);
  }
}
/**
 * Event reflecting that an InternalDistributedMember has left the system; listeners are told
 * the member did not crash.
 */
private static class MemberDepartedEvent extends MemberEvent {
  String reason;

  MemberDepartedEvent(InternalDistributedMember id, String reason) {
    super(id);
    this.reason = reason;
  }

  @Override
  public String toString() {
    return new StringBuilder("member ").append(getId()).append(" departed (").append(reason)
        .append(")").toString();
  }

  @Override
  protected void handleEvent(ClusterDistributionManager manager, MembershipListener listener) {
    final boolean crashed = false;
    listener.memberDeparted(manager, getId(), crashed);
  }
}
/**
 * Event reflecting that an InternalDistributedMember has left the system in an unexpected way;
 * listeners are told the member crashed.
 */
private static class MemberCrashedEvent extends MemberEvent {
  String reason;

  MemberCrashedEvent(InternalDistributedMember id, String reason) {
    super(id);
    this.reason = reason;
  }

  @Override
  public String toString() {
    return "member ".concat(String.valueOf(getId())).concat(" crashed: ")
        .concat(String.valueOf(reason));
  }

  @Override
  protected void handleEvent(ClusterDistributionManager manager, MembershipListener listener) {
    final boolean crashed = true;
    listener.memberDeparted(manager, getId(), crashed);
  }
}
/**
 * Event reflecting that an InternalDistributedMember may be missing but has not yet left the
 * system. It carries the member that raised the suspicion and the reason it gave.
 */
private static class MemberSuspectEvent extends MemberEvent {
  InternalDistributedMember whoSuspected;
  String reason;

  MemberSuspectEvent(InternalDistributedMember suspect, InternalDistributedMember whoSuspected,
      String reason) {
    super(suspect);
    this.whoSuspected = whoSuspected;
    this.reason = reason;
  }

  /** Returns the member that raised the suspicion. */
  public InternalDistributedMember whoSuspected() {
    return whoSuspected;
  }

  /** Returns the reason given for the suspicion. */
  public String getReason() {
    return reason;
  }

  @Override
  public String toString() {
    return new StringBuilder("member ").append(getId()).append(" suspected by: ")
        .append(whoSuspected).append(" reason: ").append(reason).toString();
  }

  @Override
  protected void handleEvent(ClusterDistributionManager manager, MembershipListener listener) {
    final InternalDistributedMember suspect = getId();
    listener.memberSuspect(manager, suspect, whoSuspected(), getReason());
  }
}
/**
 * Event reflecting that a new membership view was installed. It is handed to the manager as a
 * whole rather than dispatched to individual listeners.
 */
private static class ViewInstalledEvent extends MemberEvent {
  NetView view;

  ViewInstalledEvent(NetView view) {
    super(null); // a view event is not about a single member
    this.view = view;
  }

  public long getViewId() {
    return view.getViewId();
  }

  @Override
  public String toString() {
    return "view installed: ".concat(String.valueOf(view));
  }

  /** Routes this event to the manager instead of the per-listener dispatch. */
  @Override
  public void handleEvent(ClusterDistributionManager manager) {
    manager.handleViewInstalledEvent(this);
  }

  /** Not supported: per-listener dispatch is bypassed by {@link #handleEvent(ClusterDistributionManager)}. */
  @Override
  protected void handleEvent(ClusterDistributionManager manager, MembershipListener listener) {
    throw new UnsupportedOperationException();
  }
}
/**
 * Event reflecting that quorum was lost. It carries the members that failed and those that
 * remain, and is delivered to listeners through {@code quorumLost}.
 */
private static class QuorumLostEvent extends MemberEvent {
  Set<InternalDistributedMember> failures;
  List<InternalDistributedMember> remaining;

  QuorumLostEvent(Set<InternalDistributedMember> failures,
      List<InternalDistributedMember> remaining) {
    super(null); // not tied to a single member
    this.failures = failures;
    this.remaining = remaining;
  }

  public Set<InternalDistributedMember> getFailures() {
    return failures;
  }

  public List<InternalDistributedMember> getRemaining() {
    return remaining;
  }

  @Override
  public String toString() {
    return new StringBuilder("quorum lost. failures=").append(failures).append("; remaining=")
        .append(remaining).toString();
  }

  @Override
  protected void handleEvent(ClusterDistributionManager manager, MembershipListener listener) {
    final Set<InternalDistributedMember> failed = getFailures();
    final List<InternalDistributedMember> survivors = getRemaining();
    listener.quorumLost(manager, failed, survivors);
  }
}
/**
 * Returns the recorded root cause for this distribution manager, or null if none has been
 * recorded. It is set through setRootCause and also by the membership-failure callback.
 */
@Override
public Throwable getRootCause() {
  return this.rootCause;
}
/**
 * Records {@code t} as the root cause for this distribution manager, replacing any previous
 * value.
 */
@Override
public void setRootCause(Throwable t) {
  this.rootCause = t;
}
/**
 * Returns the members that are in the same zone as this member, as determined by
 * getMembersInSameZone for this manager's own id.
 *
 * @since GemFire 5.9
 */
@Override
public Set<InternalDistributedMember> getMembersInThisZone() {
  final InternalDistributedMember self = getDistributionManagerId();
  return getMembersInSameZone(self);
}
/**
 * Returns the members sharing a zone with {@code targetMember}.
 *
 * <p>When redundancy zones are known, this is every member whose zone equals the target's zone.
 * If the target has no zone, the result is empty. When no redundancy zones are known, the result
 * is the target itself plus every distribution manager whose host addresses (as expanded by
 * getEquivalents) overlap the target's.
 */
@Override
public Set<InternalDistributedMember> getMembersInSameZone(
    InternalDistributedMember targetMember) {
  final Set<InternalDistributedMember> sameZone = new HashSet<>();
  if (redundancyZones.isEmpty()) {
    // No zones configured: fall back to host equivalence.
    sameZone.add(targetMember);
    final Set<InetAddress> targetHostAddrs = getEquivalents(targetMember.getInetAddress());
    for (InternalDistributedMember candidate : getDistributionManagerIds()) {
      final Set<InetAddress> candidateAddrs = getEquivalents(candidate.getInetAddress());
      if (!Collections.disjoint(targetHostAddrs, candidateAddrs)) {
        sameZone.add(candidate);
      }
    }
    return sameZone;
  }
  synchronized (redundancyZones) {
    final String zoneOfTarget = redundancyZones.get(targetMember);
    for (Map.Entry<InternalDistributedMember, String> zoned : redundancyZones.entrySet()) {
      final String zone = zoned.getValue();
      if (zone.equals(zoneOfTarget)) {
        sameZone.add(zoned.getKey());
      }
    }
  }
  return sameZone;
}
/**
 * Returns true if both members are in the same redundancy zone. A member with no zone never
 * matches. When no redundancy zones are known, this falls back to areOnEquivalentHost.
 */
@Override
public boolean areInSameZone(InternalDistributedMember member1,
    InternalDistributedMember member2) {
  if (redundancyZones.isEmpty()) {
    return areOnEquivalentHost(member1, member2);
  }
  final String firstZone = redundancyZones.get(member1);
  final String secondZone = redundancyZones.get(member2);
  return firstZone != null && firstZone.equals(secondZone);
}
/**
 * Acquires a permit from parallelGIIs without responding to interrupts, then increments the
 * initial-image-requests-in-progress statistic.
 */
@Override
public void acquireGIIPermitUninterruptibly() {
  this.parallelGIIs.acquireUninterruptibly();
  this.stats.incInitialImageRequestsInProgress(1);
}
/**
 * Decrements the initial-image-requests-in-progress statistic, then returns a permit to
 * parallelGIIs.
 */
@Override
public void releaseGIIPermit() {
  this.stats.incInitialImageRequestsInProgress(-1);
  this.parallelGIIs.release();
}
/**
 * Sets the distributed system id. A value of -1 is ignored and leaves the current id unchanged.
 */
public void setDistributedSystemId(int distributedSystemId) {
  if (distributedSystemId == -1) {
    return;
  }
  this.distributedSystemId = distributedSystemId;
}
/** Returns the distributed system id last accepted by setDistributedSystemId. */
@Override
public int getDistributedSystemId() {
  return this.distributedSystemId;
}
/**
 * Causes the given InternalDistributedMembers to log thread dumps. If useNative is true we
 * attempt to use OSProcess native code for the dumps. This goes to stdout instead of the
 * system.log files.
 *
 * <p>The local member, if included, is dumped directly via OSProcess. With {@code useNative},
 * every other member is sent a HighPriorityAckedMessage. Otherwise, members that report a
 * process id and share this member's address are dumped via OSProcess, and a message is used
 * only if that fails. All remaining members are sent a message.
 *
 * @param ids the members whose stacks should be dumped; this collection is not modified
 * @param useNative whether to request native stack dumps
 */
public void printStacks(Collection<InternalDistributedMember> ids, boolean useNative) {
  Set<InternalDistributedMember> requiresMessage = new HashSet<>();
  if (ids.contains(localAddress)) {
    OSProcess.printStacks(0, useNative);
  }
  if (useNative) {
    requiresMessage.addAll(ids);
    // The local member was handled above. Exclude it from the message recipients. The removal
    // must happen on our copy: removing it from the caller's collection after the copy neither
    // prevents the self-message nor is safe on unmodifiable collections.
    requiresMessage.remove(localAddress);
  } else {
    for (InternalDistributedMember mbr : ids) {
      if (mbr.getProcessId() > 0
          && mbr.getInetAddress().equals(localAddress.getInetAddress())) {
        // Same host: the local member was handled above. For others, try a direct OS-level
        // dump first and fall back to a message if that fails.
        if (!mbr.equals(localAddress) && !OSProcess.printStacks(mbr.getProcessId(), false)) {
          requiresMessage.add(mbr);
        }
      } else {
        requiresMessage.add(mbr);
      }
    }
  }
  if (!requiresMessage.isEmpty()) {
    HighPriorityAckedMessage msg = new HighPriorityAckedMessage();
    msg.dumpStacks(requiresMessage, useNative, false);
  }
}
/**
 * Returns every member, admin members included, whose groups contain {@code group}. When none
 * match, an empty immutable set is returned; otherwise a new HashSet is returned.
 */
@Override
public Set<DistributedMember> getGroupMembers(String group) {
  Set<DistributedMember> matching = Collections.emptySet();
  for (DistributedMember candidate : getDistributionManagerIdsIncludingAdmin()) {
    if (!candidate.getGroups().contains(group)) {
      continue;
    }
    if (matching.isEmpty()) {
      // First match: switch from the shared empty set to a mutable one.
      matching = new HashSet<>();
    }
    matching.add(candidate);
  }
  return matching;
}
/**
 * Returns a snapshot of the current members whose VM kind is not
 * {@code ClusterDistributionManager.LOCATOR_DM_TYPE}.
 */
@Override
public Set<InternalDistributedMember> getNormalDistributionManagerIds() {
  // access to members synchronized under membersLock in order to
  // ensure serialization
  synchronized (membersLock) {
    HashSet<InternalDistributedMember> nonLocators = new HashSet<>(members.keySet());
    nonLocators.removeIf(m -> m.getVmKind() == ClusterDistributionManager.LOCATOR_DM_TYPE);
    return nonLocators;
  }
}
/**
 * Test method to get the member IDs of all locators in the distributed system, i.e. a snapshot
 * of the current members whose VM kind is {@code ClusterDistributionManager.LOCATOR_DM_TYPE}.
 */
public Set<InternalDistributedMember> getLocatorDistributionManagerIds() {
  // access to members synchronized under membersLock in order to
  // ensure serialization
  synchronized (membersLock) {
    HashSet<InternalDistributedMember> locators = new HashSet<>(members.keySet());
    locators.removeIf(m -> m.getVmKind() != ClusterDistributionManager.LOCATOR_DM_TYPE);
    return locators;
  }
}
/** Associates the given cache instance with this distribution manager. */
@Override
public void setCache(InternalCache instance) {
  this.cache = instance;
}
/** Returns the cache associated with this manager; may be null if none has been set yet. */
@Override
public InternalCache getCache() {
  return this.cache;
}
/**
 * Returns the associated cache, verifying it is usable.
 *
 * @throws CacheClosedException if no cache has been created yet, or if the cache is closed
 */
@Override
public InternalCache getExistingCache() {
  final InternalCache existing = this.cache;
  if (existing == null) {
    throw new CacheClosedException(
        "A cache has not yet been created.");
  }
  existing.getCancelCriterion().checkCancelInProgress(null);
  if (!existing.isClosed()) {
    return existing;
  }
  throw existing.getCacheClosedException(
      "The cache has been closed.", null);
}
/**
 * Cancel criterion for a ClusterDistributionManager. It reports cancellation once a shutdown
 * message has been sent or once a root cause has been recorded on the manager.
 */
private static class Stopper extends CancelCriterion {
  private ClusterDistributionManager dm;

  Stopper(ClusterDistributionManager dm) {
    this.dm = dm;
  }

  @Override
  public String cancelInProgress() {
    checkFailure();
    // remove call to validateDM() to fix bug 38356
    if (dm.shutdownMsgSent) {
      return String.format("%s: Message distribution has terminated",
          dm.toString());
    }
    // Read the volatile field once so a concurrent reset to null cannot cause an NPE.
    Throwable rc = dm.rootCause;
    if (rc != null) {
      return dm.toString() + ": " + rc.getMessage();
    }
    // Nope.
    return null;
  }

  /**
   * Builds a DistributedSystemDisconnectedException for the current cancellation reason, trying
   * to attach the manager's root cause to the end of {@code e}'s cause chain.
   *
   * @param e the exception that triggered the check; may be null
   * @return the exception to throw, or null if no cancellation is in progress
   */
  @Override
  public RuntimeException generateCancelledException(Throwable e) {
    String reason = cancelInProgress();
    if (reason == null) {
      return null;
    }
    Throwable rc = dm.rootCause; // volatile read
    if (rc == null) {
      // No root cause, specify the one given and be done with it.
      return new DistributedSystemDisconnectedException(reason, e);
    }
    if (e == null) {
      // Caller did not specify any root cause, so just use our own.
      return new DistributedSystemDisconnectedException(reason, rc);
    }
    if (causeChainContains(e, rc)) {
      // Root cause already in place (anywhere in the chain); we're done.
      return new DistributedSystemDisconnectedException(reason, e);
    }
    // Attempt to stick rootCause at tail end of the exception chain.
    Throwable nt = causeChainTail(e);
    if (nt == null || causeChainContains(rc, nt)) {
      // Either e's chain is already cyclic (no tail), or rc's chain already reaches the tail.
      // In both cases attaching rc would create a cause cycle, which makes later chain walks
      // loop forever.
      return new DistributedSystemDisconnectedException(reason, e);
    }
    try {
      nt.initCause(rc);
      return new DistributedSystemDisconnectedException(reason, e);
    } catch (IllegalStateException e2) {
      // Bug 39496 (Jrockit related) Give up. The following
      // error is not entirely sane but gives the correct general picture.
      return new DistributedSystemDisconnectedException(reason, rc);
    }
  }

  /**
   * Returns true if {@code target} is {@code start} or appears (by identity) in its cause chain.
   * Safe against cyclic chains.
   */
  private static boolean causeChainContains(Throwable start, Throwable target) {
    Set<Throwable> visited = Collections.newSetFromMap(new java.util.IdentityHashMap<>());
    for (Throwable t = start; t != null && visited.add(t); t = t.getCause()) {
      if (t == target) {
        return true;
      }
    }
    return false;
  }

  /**
   * Returns the last throwable in the cause chain starting at {@code start} (never null input),
   * or null if the chain loops back on itself.
   */
  private static Throwable causeChainTail(Throwable start) {
    Set<Throwable> visited = Collections.newSetFromMap(new java.util.IdentityHashMap<>());
    Throwable t = start;
    while (visited.add(t)) {
      Throwable next = t.getCause();
      if (next == null) {
        return t;
      }
      t = next;
    }
    return null;
  }
}
/** Returns this manager's Stopper, which decides whether cancellation is in progress. */
@Override
public CancelCriterion getCancelCriterion() {
  return this.stopper;
}
}