| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.geode.distributed.internal; |
| |
| import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; |
| import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; |
| import static org.apache.geode.distributed.internal.DistributionConfig.GEMFIRE_PREFIX; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.Reader; |
| import java.lang.reflect.Array; |
| import java.net.InetAddress; |
| import java.util.ArrayList; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Properties; |
| import java.util.ServiceLoader; |
| import java.util.Set; |
| import java.util.SortedSet; |
| import java.util.StringTokenizer; |
| import java.util.TreeSet; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import io.micrometer.core.instrument.MeterRegistry; |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelCriterion; |
| import org.apache.geode.CancelException; |
| import org.apache.geode.ForcedDisconnectException; |
| import org.apache.geode.GemFireConfigException; |
| import org.apache.geode.GemFireIOException; |
| import org.apache.geode.LogWriter; |
| import org.apache.geode.StatisticDescriptor; |
| import org.apache.geode.Statistics; |
| import org.apache.geode.StatisticsType; |
| import org.apache.geode.SystemConnectException; |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.annotations.Immutable; |
| import org.apache.geode.annotations.VisibleForTesting; |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.annotations.internal.MutableForTesting; |
| import org.apache.geode.cache.CacheClosedException; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.DistributedSystem; |
| import org.apache.geode.distributed.DistributedSystemDisconnectedException; |
| import org.apache.geode.distributed.DurableClientAttributes; |
| import org.apache.geode.distributed.internal.locks.GrantorRequestProcessor; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.distributed.internal.membership.MembershipManager; |
| import org.apache.geode.distributed.internal.membership.QuorumChecker; |
| import org.apache.geode.distributed.internal.membership.gms.messenger.MembershipInformation; |
| import org.apache.geode.distributed.internal.membership.gms.mgr.GMSMembershipManager; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.DSFIDFactory; |
| import org.apache.geode.internal.InternalDataSerializer; |
| import org.apache.geode.internal.InternalInstantiator; |
| import org.apache.geode.internal.SystemTimer; |
| import org.apache.geode.internal.admin.remote.DistributionLocatorId; |
| import org.apache.geode.internal.alerting.AlertLevel; |
| import org.apache.geode.internal.alerting.AlertMessaging; |
| import org.apache.geode.internal.alerting.AlertingService; |
| import org.apache.geode.internal.alerting.AlertingSession; |
| import org.apache.geode.internal.cache.CacheServerImpl; |
| import org.apache.geode.internal.cache.EventID; |
| import org.apache.geode.internal.cache.GemFireCacheImpl; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.InternalCacheBuilder; |
| import org.apache.geode.internal.cache.execute.FunctionServiceStats; |
| import org.apache.geode.internal.cache.execute.FunctionStats; |
| import org.apache.geode.internal.cache.execute.InternalFunctionService; |
| import org.apache.geode.internal.cache.tier.sockets.EncryptorImpl; |
| import org.apache.geode.internal.cache.xmlcache.CacheServerCreation; |
| import org.apache.geode.internal.config.ClusterConfigurationNotAvailableException; |
| import org.apache.geode.internal.logging.InternalLogWriter; |
| import org.apache.geode.internal.logging.LogConfig; |
| import org.apache.geode.internal.logging.LogConfigListener; |
| import org.apache.geode.internal.logging.LogConfigSupplier; |
| import org.apache.geode.internal.logging.LogFile; |
| import org.apache.geode.internal.logging.LogService; |
| import org.apache.geode.internal.logging.LogWriterFactory; |
| import org.apache.geode.internal.logging.LoggingSession; |
| import org.apache.geode.internal.logging.LoggingThread; |
| import org.apache.geode.internal.logging.NullLoggingSession; |
| import org.apache.geode.internal.net.SocketCreatorFactory; |
| import org.apache.geode.internal.offheap.MemoryAllocator; |
| import org.apache.geode.internal.offheap.OffHeapStorage; |
| import org.apache.geode.internal.security.SecurityService; |
| import org.apache.geode.internal.security.SecurityServiceFactory; |
| import org.apache.geode.internal.statistics.DummyStatisticsRegistry; |
| import org.apache.geode.internal.statistics.GemFireStatSampler; |
| import org.apache.geode.internal.statistics.StatisticsConfig; |
| import org.apache.geode.internal.statistics.StatisticsManager; |
| import org.apache.geode.internal.statistics.StatisticsManagerFactory; |
| import org.apache.geode.internal.statistics.StatisticsRegistry; |
| import org.apache.geode.internal.statistics.platform.LinuxProcFsStatistics; |
| import org.apache.geode.internal.tcp.ConnectionTable; |
| import org.apache.geode.internal.util.JavaWorkarounds; |
| import org.apache.geode.management.ManagementException; |
| import org.apache.geode.pdx.internal.TypeRegistry; |
| import org.apache.geode.security.GemFireSecurityException; |
| import org.apache.geode.security.PostProcessor; |
| import org.apache.geode.security.SecurityManager; |
| |
| /** |
| * The concrete implementation of {@link DistributedSystem} that provides internal-only |
| * functionality. |
| * |
| * @since GemFire 3.0 |
| */ |
| public class InternalDistributedSystem extends DistributedSystem |
| implements LogConfigSupplier { |
| |
| /** |
| * True if the user is allowed lock when memory resources appear to be overcommitted. |
| */ |
| private static final boolean ALLOW_MEMORY_LOCK_WHEN_OVERCOMMITTED = |
| Boolean.getBoolean(GEMFIRE_PREFIX + "Cache.ALLOW_MEMORY_OVERCOMMIT"); |
| private static final Logger logger = LogService.getLogger(); |
| |
| private static final String DISABLE_MANAGEMENT_PROPERTY = |
| GEMFIRE_PREFIX + "disableManagement"; |
| |
| public static final String ALLOW_MULTIPLE_SYSTEMS_PROPERTY = |
| GEMFIRE_PREFIX + "ALLOW_MULTIPLE_SYSTEMS"; |
| |
| /** |
| * If auto-reconnect is going on this will hold a reference to it |
| */ |
| @MakeNotStatic |
| public static volatile DistributedSystem systemAttemptingReconnect; |
| |
| @Immutable |
| public static final CreationStackGenerator DEFAULT_CREATION_STACK_GENERATOR = config -> null; |
| |
| // the following is overridden from DistributedTestCase to fix #51058 |
| @MutableForTesting |
| public static final AtomicReference<CreationStackGenerator> TEST_CREATION_STACK_GENERATOR = |
| new AtomicReference<>(DEFAULT_CREATION_STACK_GENERATOR); |
| |
| /** |
| * A value of Boolean.TRUE will identify a thread being used to execute |
| * disconnectListeners. {@link #addDisconnectListener} will not throw ShutdownException if the |
| * value is Boolean.TRUE. |
| */ |
| private final ThreadLocal<Boolean> isDisconnectThread = |
| ThreadLocal.withInitial(() -> Boolean.FALSE); |
| |
| private final StatisticsManager statisticsManager; |
| |
| /** |
| * The distribution manager that is used to communicate with the distributed system. |
| */ |
| protected DistributionManager dm; |
| |
| private final GrantorRequestProcessor.GrantorRequestContext grc; |
| |
| /** services provided by other modules */ |
| private Map<Class, DistributedSystemService> services = new HashMap<>(); |
| |
| /** |
| * If the experimental multiple-system feature is enabled, always create a new system. |
| * |
| * <p> |
| * Otherwise, create a new InternalDistributedSystem with the given properties, or connect to an |
| * existing one with the same properties. |
| */ |
| public static InternalDistributedSystem connectInternal(Properties config, |
| SecurityConfig securityConfig) { |
| if (config == null) { |
| config = new Properties(); |
| } |
| |
| if (Boolean.getBoolean(ALLOW_MULTIPLE_SYSTEMS_PROPERTY)) { |
| return new Builder(config) |
| .setSecurityConfig(securityConfig) |
| .build(); |
| } |
| |
| synchronized (existingSystemsLock) { |
| if (ClusterDistributionManager.isDedicatedAdminVM()) { |
| // For a dedicated admin VM, check to see if there is already |
| // a connect that will suit our purposes. |
| InternalDistributedSystem existingSystem = |
| (InternalDistributedSystem) getConnection(config); |
| if (existingSystem != null) { |
| return existingSystem; |
| } |
| |
| } else { |
| boolean existingSystemDisconnecting = true; |
| boolean isReconnecting = false; |
| while (!existingSystems.isEmpty() && existingSystemDisconnecting && !isReconnecting) { |
| Assert.assertTrue(existingSystems.size() == 1); |
| |
| InternalDistributedSystem existingSystem = existingSystems.get(0); |
| existingSystemDisconnecting = existingSystem.isDisconnecting(); |
| // a reconnecting DS will block on GemFireCache.class and a ReconnectThread |
| // holds that lock and invokes this method, so we break out of the loop |
| // if we detect this condition |
| isReconnecting = existingSystem.isReconnectingDS(); |
| if (existingSystemDisconnecting) { |
| boolean interrupted = Thread.interrupted(); |
| try { |
| // no notify for existingSystemsLock, just to release the sync |
| existingSystemsLock.wait(50); |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } else if (existingSystem.isConnected()) { |
| existingSystem.validateSameProperties(config, existingSystem.isConnected()); |
| return existingSystem; |
| } else { |
| throw new AssertionError( |
| "system should not have both disconnecting==false and isConnected==false"); |
| } |
| } |
| } |
| |
| // Make a new connection to the distributed system |
| InternalDistributedSystem newSystem = new Builder(config) |
| .setSecurityConfig(securityConfig) |
| .build(); |
| addSystem(newSystem); |
| return newSystem; |
| } |
| } |
| |
| public GrantorRequestProcessor.GrantorRequestContext getGrantorRequestContext() { |
| return grc; |
| } |
| |
| /** |
| * Numeric id that identifies this node in a DistributedSystem |
| */ |
| private long id; |
| |
| /** |
| * The log writer used to log information messages |
| */ |
| @Deprecated |
| protected InternalLogWriter logWriter = null; |
| |
| /** |
| * The log writer used to log security related messages |
| */ |
| @Deprecated |
| protected InternalLogWriter securityLogWriter = null; |
| |
| /** |
| * Distributed System clock |
| */ |
| private DSClock clock; |
| |
| /** |
| * Time this system was created |
| */ |
| private final long startTime; |
| |
| /** |
| * Guards access to {@link #isConnected} |
| */ |
| private final Object isConnectedMutex = new Object(); |
| |
| /** |
| * Is this <code>DistributedSystem</code> connected to a distributed system? |
| * <p> |
| * Concurrency: volatile for reads and protected by synchronization of {@link #isConnectedMutex} |
| * for writes |
| */ |
| protected volatile boolean isConnected; |
| |
| /** |
| * Set to true if this distributed system is a singleton; it will always be the only member of the |
| * system. |
| */ |
| private boolean isLoner = false; |
| |
| /** |
| * The sampler for this DistributedSystem. |
| */ |
| private GemFireStatSampler sampler = null; |
| |
| /** |
| * A set of listeners that are invoked when this connection to the distributed system is |
| * disconnected |
| */ |
| private final Set<DisconnectListener> disconnectListeners = new LinkedHashSet<>(); // needs to be |
| // ordered |
| |
| /** |
| * Set of listeners that are invoked whenever a connection is created to the distributed system |
| */ |
| // needs to be ordered |
| @MakeNotStatic |
| private static final Set<ConnectListener> connectListeners = new LinkedHashSet<>(); |
| |
| /** |
| * auto-reconnect listeners |
| */ |
| @MakeNotStatic |
| private static final List<ReconnectListener> reconnectListeners = new ArrayList<>(); |
| |
| /** |
| * whether this DS is one created to reconnect to the distributed system after a |
| * forced-disconnect. This state is cleared once reconnect is successful. |
| */ |
| private boolean isReconnectingDS; |
| |
| /** |
| * During a reconnect attempt this is used to perform quorum checks before allowing a location |
| * service to be started up in this JVM. If quorum checks fail then we delay starting location |
| * services until a live locator can be contacted. |
| */ |
| private QuorumChecker quorumChecker; |
| |
| |
| /** |
| * Due to Bug 38407, be careful about moving this to another class. |
| */ |
| public static final String SHUTDOWN_HOOK_NAME = "Distributed system shutdown hook"; |
| /** |
| * A property to prevent shutdown hooks from being registered with the VM. This is regarding bug |
| * 38407 |
| */ |
| public static final String DISABLE_SHUTDOWN_HOOK_PROPERTY = |
| GEMFIRE_PREFIX + "disableShutdownHook"; |
| |
| /** |
| * A property to append to existing log-file instead of truncating it. |
| */ |
| private static final String APPEND_TO_LOG_FILE = GEMFIRE_PREFIX + "append-log"; |
| |
| //////////////////// Configuration Fields //////////////////// |
| |
| /** |
| * The config object used to create this distributed system |
| */ |
| private final DistributionConfig originalConfig; |
| |
| private final boolean statsDisabled = |
| Boolean.getBoolean(GEMFIRE_PREFIX + "statsDisabled"); |
| |
| /** |
| * The config object to which most configuration work is delegated |
| */ |
| private DistributionConfig config; |
| |
| private volatile boolean shareSockets; |
| |
| /** |
| * if this distributed system starts a locator, it is stored here |
| */ |
| private InternalLocator startedLocator; |
| |
| private final List<ResourceEventsListener> resourceListeners = new CopyOnWriteArrayList<>(); |
| |
| private final boolean disableManagement = Boolean.getBoolean(DISABLE_MANAGEMENT_PROPERTY); |
| |
| /** |
| * Stack trace showing the creation of this instance of InternalDistributedSystem. |
| */ |
| private final Throwable creationStack; |
| |
| private volatile SecurityService securityService; |
| |
| /** |
| * Used at client side, indicates whether the 'delta-propagation' property is enabled on the DS |
| * this client is connected to. This variable is used to decide whether to send delta bytes or |
| * full value to the server for a delta-update operation. |
| */ |
| private boolean deltaEnabledOnServer = true; |
| |
| private final AlertingSession alertingSession; |
| private final AlertingService alertingService; |
| |
| private final LoggingSession loggingSession; |
| private final Set<LogConfigListener> logConfigListeners = new HashSet<>(); |
| |
| public boolean isDeltaEnabledOnServer() { |
| return deltaEnabledOnServer; |
| } |
| |
| public void setDeltaEnabledOnServer(boolean deltaEnabledOnServer) { |
| this.deltaEnabledOnServer = deltaEnabledOnServer; |
| } |
| |
| public static void removeSystem(InternalDistributedSystem oldSystem) { |
| DistributedSystem.removeSystem(oldSystem); |
| } |
| |
| /** |
| * Returns a connection to the distributed system that is suitable for administration. For |
| * administration, we are not as strict when it comes to existing connections. |
| * |
| * @since GemFire 4.0 |
| */ |
| public static DistributedSystem connectForAdmin(Properties props) { |
| return DistributedSystem.connectForAdmin(props); |
| } |
| |
| /** |
| * Returns a connected distributed system for this VM, or null if there is no connected |
| * distributed system in this VM. This method synchronizes on the existingSystems collection. |
| * <p> |
| * <p> |
| * author bruce |
| * |
| * @since GemFire 5.0 |
| */ |
| public static InternalDistributedSystem getConnectedInstance() { |
| InternalDistributedSystem result = null; |
| synchronized (existingSystemsLock) { |
| if (!existingSystems.isEmpty()) { |
| InternalDistributedSystem existingSystem = existingSystems.get(0); |
| if (existingSystem.isConnected()) { |
| result = existingSystem; |
| } |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * Returns the current distributed system, if there is one. Note: this method is no longer unsafe |
| * size existingSystems uses copy-on-write. |
| * <p> |
| * author bruce |
| * |
| * @since GemFire 5.0 |
| */ |
| public static InternalDistributedSystem unsafeGetConnectedInstance() { |
| InternalDistributedSystem result = getAnyInstance(); |
| if (result != null) { |
| if (!result.isConnected()) { |
| result = null; |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * @return distribution stats, or null if there is no distributed system available |
| */ |
| public static DMStats getDMStats() { |
| InternalDistributedSystem sys = getAnyInstance(); |
| if (sys != null && sys.dm != null) { |
| return sys.dm.getStats(); |
| } |
| return null; |
| } |
| |
| /** |
| * @return a log writer, or null if there is no distributed system available |
| */ |
| @Deprecated |
| public static LogWriter getLogger() { |
| InternalDistributedSystem sys = getAnyInstance(); |
| if (sys != null && sys.logWriter != null) { |
| return sys.logWriter; |
| } |
| return null; |
| } |
| |
| @Deprecated |
| public static InternalLogWriter getStaticInternalLogWriter() { |
| InternalDistributedSystem sys = getAnyInstance(); |
| if (sys != null) { |
| return sys.logWriter; |
| } |
| return null; |
| } |
| |
| @Deprecated |
| public InternalLogWriter getInternalLogWriter() { |
| return logWriter; |
| } |
| |
| @Deprecated |
| public InternalLogWriter getSecurityInternalLogWriter() { |
| InternalDistributedSystem sys = getAnyInstance(); |
| if (sys != null) { |
| return sys.securityLogWriter; |
| } |
| return null; |
| } |
| |
| /** |
| * reset the reconnectAttempt counter for a new go at reconnecting |
| */ |
| private static void resetReconnectAttemptCounter() { |
| reconnectAttemptCounter.set(0); |
| } |
| |
| /** |
| * Creates a new {@code InternalDistributedSystem} with the given configuration. |
| * |
| * <p> |
| * See {@link #connect} for a list of exceptions that may be thrown. |
| * |
| * @param config the configuration for the connection |
| * @param statisticsManagerFactory creates the statistics manager for this member |
| */ |
| private InternalDistributedSystem(ConnectionConfig config, |
| StatisticsManagerFactory statisticsManagerFactory) { |
| alertingSession = AlertingSession.create(); |
| alertingService = new AlertingService(); |
| loggingSession = LoggingSession.create(); |
| |
| // register DSFID types first; invoked explicitly so that all message type |
| // initializations do not happen in first deserialization on a possibly |
| // "precious" thread |
| DSFIDFactory.registerTypes(); |
| |
| originalConfig = config.distributionConfig(); |
| isReconnectingDS = config.isReconnecting(); |
| quorumChecker = config.quorumChecker(); |
| |
| ((DistributionConfigImpl) originalConfig).checkForDisallowedDefaults(); // throws |
| // IllegalStateEx |
| shareSockets = originalConfig.getConserveSockets(); |
| startTime = System.currentTimeMillis(); |
| grc = new GrantorRequestProcessor.GrantorRequestContext(stopper); |
| |
| creationStack = |
| TEST_CREATION_STACK_GENERATOR.get().generateCreationStack(originalConfig); |
| |
| statisticsManager = |
| statisticsManagerFactory.create(originalConfig.getName(), startTime, statsDisabled); |
| } |
| |
| public SecurityService getSecurityService() { |
| return securityService; |
| } |
| |
| public void setSecurityService(SecurityService securityService) { |
| this.securityService = securityService; |
| } |
| |
| /** |
| * Registers a listener to the system |
| * |
| * @param listener listener to be added |
| */ |
| public void addResourceListener(ResourceEventsListener listener) { |
| resourceListeners.add(listener); |
| } |
| |
| /** |
| * Un-Registers a listener to the system |
| * |
| * @param listener listener to be removed |
| */ |
| public void removeResourceListener(ResourceEventsListener listener) { |
| resourceListeners.remove(listener); |
| } |
| |
| /** |
| * @return the listeners registered with the system |
| */ |
| public List<ResourceEventsListener> getResourceListeners() { |
| return resourceListeners; |
| } |
| |
| /** |
| * Handles a particular event associated with a resource |
| * |
| * @param event Resource event |
| * @param resource resource on which event is generated |
| */ |
| public void handleResourceEvent(ResourceEvent event, Object resource) { |
| if (disableManagement) { |
| return; |
| } |
| if (resourceListeners.size() == 0) { |
| return; |
| } |
| notifyResourceEventListeners(event, resource); |
| } |
| |
| /** |
| * Returns true if system is a loner (for testing) |
| */ |
| public boolean isLoner() { |
| return isLoner; |
| } |
| |
| private MemoryAllocator offHeapStore = null; |
| |
| public MemoryAllocator getOffHeapStore() { |
| return offHeapStore; |
| } |
| |
| /** |
| * Initialize any services that provided as extensions to the cache using the service loader |
| * mechanism. |
| */ |
| private void initializeServices() { |
| ServiceLoader<DistributedSystemService> loader = |
| ServiceLoader.load(DistributedSystemService.class); |
| for (DistributedSystemService service : loader) { |
| service.init(this); |
| services.put(service.getInterface(), service); |
| } |
| } |
| |
| |
| /** |
| * Initializes this connection to a distributed system with the current configuration state. |
| */ |
| private void initialize(SecurityManager securityManager, PostProcessor postProcessor) { |
| if (originalConfig.getLocators().equals("")) { |
| if (originalConfig.getMcastPort() != 0) { |
| throw new GemFireConfigException("The " + LOCATORS + " attribute can not be empty when the " |
| + MCAST_PORT + " attribute is non-zero."); |
| } else { |
| // no distribution |
| isLoner = true; |
| } |
| } |
| |
| config = new RuntimeDistributionConfigImpl(this); |
| |
| securityService = SecurityServiceFactory.create( |
| config.getSecurityProps(), |
| securityManager, postProcessor); |
| |
| if (!isLoner) { |
| attemptingToReconnect = (reconnectAttemptCounter.get() > 0); |
| } |
| try { |
| SocketCreatorFactory.setDistributionConfig(config); |
| |
| boolean logBanner = !attemptingToReconnect; |
| boolean logConfiguration = !attemptingToReconnect; |
| loggingSession.createSession(this, logBanner, logConfiguration); |
| |
| // LOG: create LogWriterLogger(s) for backwards compatibility of getLogWriter and |
| // getSecurityLogWriter |
| if (logWriter == null) { |
| logWriter = |
| LogWriterFactory.createLogWriterLogger(config, false); |
| logWriter.fine("LogWriter is created."); |
| } |
| |
| // logWriter.info("Created log writer for IDS@"+System.identityHashCode(this)); |
| |
| if (securityLogWriter == null) { |
| // LOG: whole new LogWriterLogger instance for security |
| securityLogWriter = |
| LogWriterFactory.createLogWriterLogger(config, true); |
| securityLogWriter.fine("SecurityLogWriter is created."); |
| } |
| |
| loggingSession.startSession(); |
| |
| clock = new DSClock(isLoner); |
| |
| if (attemptingToReconnect && logger.isDebugEnabled()) { |
| logger.debug( |
| "This thread is initializing a new DistributedSystem in order to reconnect to other members"); |
| } |
| // Note we need loners to load the license in case they are a |
| // cache server and will need to enforce the member limit |
| if (Boolean.getBoolean(InternalLocator.FORCE_LOCATOR_DM_TYPE)) { |
| locatorDMTypeForced = true; |
| } |
| |
| initializeServices(); |
| InternalDataSerializer.initialize(config, services.values()); |
| |
| // Initialize the Diffie-Hellman and public/private keys |
| try { |
| EncryptorImpl.initCertsMap(config.getSecurityProps()); |
| EncryptorImpl.initPrivateKey(config.getSecurityProps()); |
| EncryptorImpl.initDHKeys(config); |
| } catch (Exception ex) { |
| throw new GemFireSecurityException( |
| "Problem in initializing keys for client authentication", |
| ex); |
| } |
| |
| final long offHeapMemorySize = |
| OffHeapStorage.parseOffHeapMemorySize(getConfig().getOffHeapMemorySize()); |
| |
| offHeapStore = OffHeapStorage.createOffHeapStorage(this, offHeapMemorySize, this); |
| |
| // Note: this can only happen on a linux system |
| if (getConfig().getLockMemory()) { |
| // This calculation is not exact, but seems fairly close. So far we have |
| // not loaded much into the heap and the current RSS usage is already |
| // included the available memory calculation. |
| long avail = LinuxProcFsStatistics.getAvailableMemory(logger); |
| long size = offHeapMemorySize + Runtime.getRuntime().totalMemory(); |
| if (avail < size) { |
| if (ALLOW_MEMORY_LOCK_WHEN_OVERCOMMITTED) { |
| logger.warn( |
| "System memory appears to be over committed by {} bytes. You may experience instability, performance issues, or terminated processes due to the Linux OOM killer.", |
| size - avail); |
| } else { |
| throw new IllegalStateException( |
| String.format( |
| "Insufficient free memory (%s) when attempting to lock %s bytes. Either reduce the amount of heap or off-heap memory requested or free up additional system memory. You may also specify -Dgemfire.Cache.ALLOW_MEMORY_OVERCOMMIT=true on the command-line to override the constraint check.", |
| avail, size)); |
| } |
| } |
| |
| logger.info("Locking memory. This may take a while..."); |
| GemFireCacheImpl.lockMemory(); |
| logger.info("Finished locking memory."); |
| } |
| |
| try { |
| startInitLocator(); |
| } catch (InterruptedException e) { |
| throw new SystemConnectException("Startup has been interrupted", e); |
| } |
| |
| synchronized (isConnectedMutex) { |
| isConnected = true; |
| } |
| |
| if (!isLoner) { |
| try { |
| if (quorumChecker != null) { |
| quorumChecker.suspend(); |
| } |
| dm = ClusterDistributionManager.create(this); |
| // fix bug #46324 |
| if (InternalLocator.hasLocator()) { |
| InternalLocator locator = InternalLocator.getLocator(); |
| getDistributionManager().addHostedLocators(getDistributedMember(), |
| InternalLocator.getLocatorStrings(), locator.isSharedConfigurationEnabled()); |
| } |
| } finally { |
| if (dm == null && quorumChecker != null) { |
| quorumChecker.resume(); |
| } |
| setDisconnected(); |
| } |
| } else { |
| dm = new LonerDistributionManager(this, logWriter); |
| } |
| |
| Assert.assertTrue(dm != null); |
| Assert.assertTrue(dm.getSystem() == this); |
| |
| try { |
| id = dm.getMembershipPort(); |
| } catch (DistributedSystemDisconnectedException e) { |
| // bug #48144 - The dm's channel threw an NPE. It now throws this exception |
| // but during startup we should instead throw a SystemConnectException |
| throw new SystemConnectException( |
| "Distributed system has disconnected during startup.", |
| e); |
| } |
| |
| synchronized (isConnectedMutex) { |
| isConnected = true; |
| } |
| if (attemptingToReconnect && (startedLocator == null)) { |
| try { |
| startInitLocator(); |
| } catch (InterruptedException e) { |
| throw new SystemConnectException("Startup has been interrupted", e); |
| } |
| } |
| try { |
| endInitLocator(); |
| } catch (IOException e) { |
| throw new GemFireIOException("Problem finishing a locator service start", e); |
| } |
| |
| startSampler(); |
| |
| alertingSession.createSession(new AlertMessaging(this)); |
| alertingSession.startSession(); |
| |
| // Log any instantiators that were registered before the log writer |
| // was created |
| InternalInstantiator.logInstantiators(); |
| } catch (RuntimeException ex) { |
| config.close(); |
| throw ex; |
| } |
| |
| reconnected = attemptingToReconnect; |
| attemptingToReconnect = false; |
| reconnectAttemptCounter.set(0); |
| } |
| |
| private void startSampler() { |
| if (statsDisabled) { |
| return; |
| } |
| sampler = loggingSession.getLogFile() |
| .map(logFile -> new GemFireStatSampler(this, logFile)) |
| .orElseGet(() -> new GemFireStatSampler(this)); |
| sampler.start(); |
| } |
| |
| /** |
| * @since GemFire 5.7 |
| */ |
| private void startInitLocator() throws InterruptedException { |
| String locatorString = originalConfig.getStartLocator(); |
| if (locatorString.length() == 0) { |
| return; |
| } |
| |
| // when reconnecting we don't want to join with a colocated locator unless |
| // there is a quorum of the old members available |
| if (attemptingToReconnect && !isConnected) { |
| if (quorumChecker != null) { |
| logger.info("performing a quorum check to see if location services can be started early"); |
| if (!quorumChecker.checkForQuorum(3L * config.getMemberTimeout())) { |
| logger.info("quorum check failed - not allowing location services to start early"); |
| return; |
| } |
| logger.info("Quorum check passed - allowing location services to start early"); |
| } |
| } |
| DistributionLocatorId locId = new DistributionLocatorId(locatorString); |
| try { |
| startedLocator = |
| InternalLocator.createLocator(locId.getPort(), NullLoggingSession.create(), null, |
| logWriter, securityLogWriter, locId.getHost().getAddress(), |
| locId.getHostnameForClients(), originalConfig.toProperties(), false); |
| |
| // if locator is started this way, cluster config is not enabled, set the flag correctly |
| startedLocator.getConfig().setEnableClusterConfiguration(false); |
| |
| boolean startedPeerLocation = false; |
| try { |
| startedLocator.startPeerLocation(); |
| startedPeerLocation = true; |
| } finally { |
| if (!startedPeerLocation) { |
| startedLocator.stop(); |
| } |
| } |
| } catch (IOException e) { |
| throw new GemFireIOException( |
| "Problem starting a locator service", |
| e); |
| } |
| } |
| |
| /** |
| * @since GemFire 5.7 |
| */ |
| private void endInitLocator() throws IOException { |
| InternalLocator loc = startedLocator; |
| if (loc != null) { |
| boolean finished = false; |
| try { |
| loc.startServerLocation(this); |
| loc.endStartLocator(this); |
| finished = true; |
| } finally { |
| if (!finished) { |
| loc.stop(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * record a locator as a dependent of this distributed system |
| */ |
| void setDependentLocator(InternalLocator theLocator) { |
| startedLocator = theLocator; |
| } |
| |
| /** |
| * Used by DistributionManager to fix bug 33362 |
| */ |
| void setDM(DistributionManager dm) { |
| this.dm = dm; |
| } |
| |
| /** |
| * Checks whether or not this connection to a distributed system is closed. |
| * |
| * @throws DistributedSystemDisconnectedException This connection has been |
| * {@link #disconnect(boolean, String, boolean) disconnected} |
| */ |
| private void checkConnected() { |
| if (!isConnected()) { |
| throw new DistributedSystemDisconnectedException( |
| "This connection to a distributed system has been disconnected.", |
| dm.getRootCause()); |
| } |
| } |
| |
| @Override |
| public boolean isConnected() { |
| if (dm == null) { |
| return false; |
| } |
| if (dm.getCancelCriterion().isCancelInProgress()) { |
| return false; |
| } |
| if (isDisconnecting) { |
| return false; |
| } |
| return isConnected; |
| } |
| |
| /* |
| * This method was introduced so we can deterministically query whether the distributed |
| * system has fully disconnected or not. The isConnected() method will return false if a |
| * disconnection/cancellation is in progress, so it does not provide a reliable way to |
| * query if the distributed system is fully disconnected or not. |
| */ |
| public boolean isDisconnected() { |
| return !isConnected; |
| } |
| |
| public StatisticsManager getStatisticsManager() { |
| return statisticsManager; |
| } |
| |
| @Override |
| public StatisticDescriptor createIntCounter(String name, |
| String description, |
| String units) { |
| return createLongCounter(name, description, units); |
| } |
| |
| @Override |
| public StatisticDescriptor createLongCounter(String name, |
| String description, |
| String units) { |
| return statisticsManager.createLongCounter(name, description, units); |
| } |
| |
| @Override |
| public StatisticDescriptor createDoubleCounter(String name, |
| String description, |
| String units) { |
| return statisticsManager.createDoubleCounter(name, description, units); |
| } |
| |
| @Override |
| public StatisticDescriptor createIntGauge(String name, |
| String description, |
| String units) { |
| return createLongGauge(name, description, units); |
| } |
| |
| @Override |
| public StatisticDescriptor createLongGauge(String name, |
| String description, |
| String units) { |
| return statisticsManager.createLongGauge(name, description, units); |
| } |
| |
| @Override |
| public StatisticDescriptor createDoubleGauge(String name, |
| String description, |
| String units) { |
| return statisticsManager.createDoubleGauge(name, description, units); |
| } |
| |
| @Override |
| public StatisticDescriptor createIntCounter(String name, |
| String description, |
| String units, boolean largerBetter) { |
| return createLongCounter(name, description, units, largerBetter); |
| } |
| |
| @Override |
| public StatisticDescriptor createLongCounter(String name, |
| String description, |
| String units, boolean largerBetter) { |
| return statisticsManager.createLongCounter(name, description, units, largerBetter); |
| } |
| |
| @Override |
| public StatisticDescriptor createDoubleCounter(String name, |
| String description, |
| String units, |
| boolean largerBetter) { |
| return statisticsManager.createDoubleCounter(name, description, units, largerBetter); |
| } |
| |
| @Override |
| public StatisticDescriptor createIntGauge(String name, |
| String description, |
| String units, boolean largerBetter) { |
| return createLongGauge(name, description, units, largerBetter); |
| } |
| |
| @Override |
| public StatisticDescriptor createLongGauge(String name, |
| String description, |
| String units, boolean largerBetter) { |
| return statisticsManager.createLongGauge(name, description, units, largerBetter); |
| } |
| |
| @Override |
| public StatisticDescriptor createDoubleGauge(String name, |
| String description, |
| String units, boolean largerBetter) { |
| return statisticsManager.createDoubleGauge(name, description, units, largerBetter); |
| } |
| |
| @Override |
| public StatisticsType createType(String name, String description, |
| StatisticDescriptor[] stats) { |
| return statisticsManager.createType(name, description, stats); |
| } |
| |
| @Override |
| public StatisticsType findType(String name) { |
| return statisticsManager.findType(name); |
| } |
| |
| @Override |
| public StatisticsType[] createTypesFromXml(Reader reader) |
| throws IOException { |
| return statisticsManager.createTypesFromXml(reader); |
| } |
| |
| @Override |
| public Statistics createStatistics(StatisticsType type) { |
| return statisticsManager.createStatistics(type); |
| } |
| |
| @Override |
| public Statistics createStatistics(StatisticsType type, |
| String textId) { |
| return statisticsManager.createStatistics(type, textId); |
| } |
| |
| @Override |
| public Statistics createStatistics(StatisticsType type, |
| String textId, long numericId) { |
| return statisticsManager.createStatistics(type, textId, numericId); |
| } |
| |
| @Override |
| public Statistics createAtomicStatistics(StatisticsType type) { |
| return statisticsManager.createAtomicStatistics(type); |
| } |
| |
| @Override |
| public Statistics createAtomicStatistics(StatisticsType type, |
| String textId) { |
| return statisticsManager.createAtomicStatistics(type, textId); |
| } |
| |
| @Override |
| public Statistics createAtomicStatistics(StatisticsType type, |
| String textId, long numericId) { |
| return statisticsManager.createAtomicStatistics(type, textId, numericId); |
| } |
| |
| @Override |
| public Statistics[] findStatisticsByType(StatisticsType type) { |
| return statisticsManager.findStatisticsByType(type); |
| } |
| |
| @Override |
| public Statistics[] findStatisticsByTextId(String textId) { |
| return statisticsManager.findStatisticsByTextId(textId); |
| } |
| |
| @Override |
| public Statistics[] findStatisticsByNumericId(long numericId) { |
| return statisticsManager.findStatisticsByNumericId(numericId); |
| } |
| |
| @Override |
| public String getName() { |
| return getOriginalConfig().getName(); |
| } |
| |
| @Override |
| public long getId() { |
| return id; |
| } |
| |
| public long getStartTime() { |
| return startTime; |
| } |
| |
| /** |
| * This class defers to the DM. If we don't have a DM, we're dead. |
| */ |
| protected class Stopper extends CancelCriterion { |
| |
| @Override |
| public String cancelInProgress() { |
| checkFailure(); |
| if (dm == null) { |
| return "No dm"; |
| } |
| return dm.getCancelCriterion().cancelInProgress(); |
| } |
| |
| @Override |
| public RuntimeException generateCancelledException(Throwable e) { |
| if (dm == null) { |
| return new DistributedSystemDisconnectedException("no dm", e); |
| } |
| return dm.getCancelCriterion().generateCancelledException(e); |
| } |
| } |
| |
| /** |
| * Handles all cancellation queries for this distributed system |
| */ |
| private final Stopper stopper = new Stopper(); |
| |
| @Override |
| public CancelCriterion getCancelCriterion() { |
| return stopper; |
| } |
| |
| public boolean isDisconnecting() { |
| if (dm == null) { |
| return true; |
| } |
| if (dm.getCancelCriterion().isCancelInProgress()) { |
| return true; |
| } |
| if (!isConnected) { |
| return true; |
| } |
| return isDisconnecting; |
| } |
| |
| @Override |
| public LogWriter getLogWriter() { |
| return logWriter; |
| } |
| |
| public DSClock getClock() { |
| return clock; |
| } |
| |
| @Override |
| public LogWriter getSecurityLogWriter() { |
| return securityLogWriter; |
| } |
| |
| /** |
| * Returns the stat sampler |
| */ |
| public GemFireStatSampler getStatSampler() { |
| return sampler; |
| } |
| |
| /** |
| * Has this system started the disconnect process? |
| */ |
| protected volatile boolean isDisconnecting = false; |
| |
| /** |
| * Disconnects this VM from the distributed system. Shuts down the distribution manager, and if |
| * necessary, |
| */ |
| @Override |
| public void disconnect() { |
| disconnect(false, |
| "normal disconnect", false); |
| } |
| |
| /** |
| * Disconnects this member from the distributed system when an internal error has caused |
| * distribution to fail (e.g., this member was shunned) |
| * |
| * @param reason a string describing why the disconnect is occurring |
| * @param shunned whether this member was shunned by the membership coordinator |
| */ |
| public void disconnect(String reason, boolean shunned) { |
| boolean isForcedDisconnect = dm.getRootCause() instanceof ForcedDisconnectException; |
| boolean rejoined = false; |
| reconnected = false; |
| if (isForcedDisconnect && !isReconnectingDS) { |
| forcedDisconnect = true; |
| resetReconnectAttemptCounter(); |
| rejoined = tryReconnect(true, reason, GemFireCacheImpl.getInstance()); |
| } |
| if (!rejoined) { |
| disconnect(false, reason, shunned); |
| } |
| } |
| |
| /** |
| * This is how much time, in milliseconds to allow a disconnect listener to run before we |
| * interrupt it. |
| */ |
| private static final long MAX_DISCONNECT_WAIT = |
| Long.getLong("DistributionManager.DISCONNECT_WAIT", 10 * 1000); |
| |
| /** |
| * Run a disconnect listener, checking for errors and honoring the timeout |
| * {@link #MAX_DISCONNECT_WAIT}. |
| * |
| * @param dc the listener to run |
| */ |
| private void runDisconnect(final DisconnectListener dc) { |
| // Create a general handler for running the disconnect |
| // Launch it and wait a little bit |
| Thread t = new LoggingThread(dc.toString(), false, () -> { |
| try { |
| isDisconnectThread.set(Boolean.TRUE); |
| dc.onDisconnect(InternalDistributedSystem.this); |
| } catch (CancelException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Disconnect listener <{}> thwarted by cancellation: {}", dc, e, |
| traceException(e)); |
| } |
| } |
| }); |
| try { |
| t.start(); |
| t.join(MAX_DISCONNECT_WAIT); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| logger.warn("Interrupted while processing disconnect listener", |
| e); |
| } |
| |
| // Make sure the listener gets the cue to die |
| if (t.isAlive()) { |
| logger.warn("Disconnect listener still running: {}", dc); |
| t.interrupt(); |
| |
| try { |
| t.join(MAX_DISCONNECT_WAIT); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| |
| if (t.isAlive()) { |
| logger.warn("Disconnect listener ignored its interrupt: {}", |
| dc); |
| } |
| } |
| |
| } |
| |
| private Exception traceException(CancelException e) { |
| return logger.isTraceEnabled() ? e : null; |
| } |
| |
| public boolean isDisconnectThread() { |
| return isDisconnectThread.get(); |
| } |
| |
| public void setIsDisconnectThread() { |
| isDisconnectThread.set(Boolean.TRUE); |
| } |
| |
| /** |
| * Run a disconnect listener in the same thread sequence as the reconnect. |
| * |
| * @param dc the listener to run |
| */ |
| |
| private void runDisconnectForReconnect(final DisconnectListener dc) { |
| try { |
| dc.onDisconnect(this); |
| } catch (DistributedSystemDisconnectedException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Disconnect listener <{}> thwarted by shutdown: {}", dc, e, traceException(e)); |
| } |
| } |
| } |
| |
| /** |
| * Disconnect cache, run disconnect listeners. |
| * |
| * @param doReconnect whether a reconnect will be done |
| * @return a collection of shutdownListeners |
| */ |
| private HashSet<ShutdownListener> doDisconnects(boolean doReconnect) { |
| // Make a pass over the disconnect listeners, asking them _politely_ |
| // to clean up. |
| HashSet<ShutdownListener> shutdownListeners = new HashSet<>(); |
| for (;;) { |
| DisconnectListener listener; |
| synchronized (disconnectListeners) { |
| Iterator<DisconnectListener> itr = disconnectListeners.iterator(); |
| if (!itr.hasNext()) { |
| return shutdownListeners; |
| } |
| listener = itr.next(); |
| if (listener instanceof ShutdownListener) { |
| shutdownListeners.add((ShutdownListener) listener); |
| } |
| itr.remove(); |
| } // synchronized |
| |
| if (doReconnect) { |
| runDisconnectForReconnect(listener); |
| } else { |
| runDisconnect(listener); |
| } |
| } // for |
| } |
| |
| /** |
| * Process the shutdown listeners. It is essential that the DM has been shut down before calling |
| * this step, to ensure that no new listeners are registering. |
| * |
| * @param shutdownListeners shutdown listeners initially registered with us |
| */ |
| private void doShutdownListeners(HashSet<ShutdownListener> shutdownListeners) { |
| if (shutdownListeners == null) { |
| return; |
| } |
| |
| // Process any shutdown listeners we reaped during first pass |
| for (ShutdownListener shutdownListener : shutdownListeners) { |
| try { |
| shutdownListener.onShutdown(this); |
| } catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| // things could break since we continue, but we want to disconnect! |
| logger.fatal(String.format("ShutdownListener < %s > threw...", shutdownListener), t); |
| } |
| } |
| |
| // During the window while we were running disconnect listeners, new |
| // disconnect listeners may have appeared. After messagingDisabled is |
| // set, no new ones will be created. However, we must process any |
| // that appeared in the interim. |
| for (;;) { |
| // Pluck next listener from the list |
| DisconnectListener dcListener; |
| ShutdownListener sdListener = null; |
| synchronized (disconnectListeners) { |
| Iterator<DisconnectListener> itr = disconnectListeners.iterator(); |
| if (!itr.hasNext()) { |
| break; |
| } |
| dcListener = itr.next(); |
| itr.remove(); |
| if (dcListener instanceof ShutdownListener) { |
| sdListener = (ShutdownListener) dcListener; |
| } |
| } |
| |
| runDisconnect(dcListener); |
| |
| // Run the shutdown, if any |
| if (sdListener != null) { |
| try { |
| sdListener.onShutdown(this); |
| } catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| // things could break since we continue, but we want to disconnect! |
| logger.fatal("DisconnectListener/Shutdown threw...", t); |
| } |
| } |
| } // for |
| } |
| |
| /** |
| * break any potential circularity in {@link #loadEmergencyClasses()} |
| */ |
| @MakeNotStatic |
| private static volatile boolean emergencyClassesLoaded = false; |
| |
| /** |
| * Ensure that the MembershipManager class gets loaded. |
| * |
| * @see SystemFailure#loadEmergencyClasses() |
| */ |
| public static void loadEmergencyClasses() { |
| if (emergencyClassesLoaded) { |
| return; |
| } |
| emergencyClassesLoaded = true; |
| GMSMembershipManager.loadEmergencyClasses(); |
| } |
| |
| /** |
| * Closes the membership manager |
| * |
| * @see SystemFailure#emergencyClose() |
| */ |
| public void emergencyClose() { |
| if (dm != null) { |
| MembershipManager mm = dm.getMembershipManager(); |
| if (mm != null) { |
| mm.emergencyClose(); |
| } |
| } |
| |
| // Garbage collection |
| // Leave dm alone; its CancelCriterion will help people die |
| isConnected = false; |
| if (dm != null) { |
| dm.setRootCause(SystemFailure.getFailure()); |
| } |
| isDisconnecting = true; |
| disconnectListeners.clear(); |
| } |
| |
| private void setDisconnected() { |
| synchronized (isConnectedMutex) { |
| isConnected = false; |
| isConnectedMutex.notifyAll(); |
| } |
| } |
| |
| private void waitDisconnected() { |
| synchronized (isConnectedMutex) { |
| while (isConnected) { |
| boolean interrupted = Thread.interrupted(); |
| try { |
| isConnectedMutex.wait(); |
| } catch (InterruptedException e) { |
| interrupted = true; |
| getLogWriter() |
| .warning("Disconnect wait interrupted", e); |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // while |
| } |
| } |
| |
| /** |
| * Disconnects this VM from the distributed system. Shuts down the distribution manager. |
| * |
| * @param preparingForReconnect true if called by a reconnect operation |
| * @param reason the reason the disconnect is being performed |
| * @param keepAlive true if user requested durable subscriptions are to be retained at server. |
| */ |
| protected void disconnect(boolean preparingForReconnect, String reason, boolean keepAlive) { |
| boolean isShutdownHook = (shutdownHook != null) && (Thread.currentThread() == shutdownHook); |
| |
| if (!preparingForReconnect) { |
| // logger.info("disconnecting IDS@"+System.identityHashCode(this)); |
| synchronized (reconnectListeners) { |
| reconnectListeners.clear(); |
| } |
| cancelReconnect(); |
| } |
| |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| try { |
| HashSet<ShutdownListener> shutdownListeners = null; |
| try { |
| if (isDebugEnabled) { |
| logger.debug("DistributedSystem.disconnect invoked on {}", this); |
| } |
| synchronized (GemFireCacheImpl.class) { |
| // bug 36955, 37014: don't use a disconnect listener on the cache; |
| // it takes too long. |
| // |
| // However, make sure cache is completely closed before starting |
| // the distributed system close. |
| InternalCache currentCache = getCache(); |
| if (currentCache != null && !currentCache.isClosed()) { |
| isDisconnectThread.set(Boolean.TRUE); // bug #42663 - this must be set while |
| // closing the cache |
| try { |
| currentCache.close(reason, dm.getRootCause(), keepAlive, true); // fix for 42150 |
| } catch (VirtualMachineError e) { |
| SystemFailure.initiateFailure(e); |
| throw e; |
| } catch (Throwable e) { |
| SystemFailure.checkFailure(); |
| // Whenever you catch Error or Throwable, you must also |
| // check for fatal JVM error (see above). However, there is |
| logger.warn( |
| "Exception trying to close cache", |
| e); |
| } finally { |
| isDisconnectThread.set(Boolean.FALSE); |
| } |
| } |
| |
| // While still holding the lock, make sure this instance is |
| // marked as shutting down |
| synchronized (this) { |
| if (isDisconnecting) { |
| // It's already started, but don't return |
| // to the caller until it has completed. |
| waitDisconnected(); |
| return; |
| } // isDisconnecting |
| isDisconnecting = true; |
| |
| if (!preparingForReconnect) { |
| // move cancelReconnect above this synchronized block fix for bug 35202 |
| if (reconnectDS != null) { |
| // break recursion |
| if (isDebugEnabled) { |
| logger.debug("disconnecting reconnected DS: {}", reconnectDS); |
| } |
| InternalDistributedSystem r = reconnectDS; |
| reconnectDS = null; |
| r.disconnect(false, null, false); |
| } |
| } // !reconnect |
| } // synchronized (this) |
| } // synchronized (GemFireCache.class) |
| |
| securityService.close(); |
| |
| if (!isShutdownHook) { |
| shutdownListeners = doDisconnects(attemptingToReconnect); |
| } |
| |
| if (!attemptingToReconnect) { |
| alertingSession.stopSession(); |
| } |
| |
| } finally { // be ABSOLUTELY CERTAIN that dm closed |
| try { |
| // Do the bulk of the close... |
| dm.close(); |
| // we close the locator after the DM so that when split-brain detection |
| // is enabled, loss of the locator doesn't cause the DM to croak |
| if (startedLocator != null) { |
| startedLocator.stop(forcedDisconnect, preparingForReconnect, false); |
| startedLocator = null; |
| } |
| } finally { // timer canceled |
| // bug 38501: this has to happen *after* |
| // the DM is closed :-( |
| if (!preparingForReconnect) { |
| SystemTimer.cancelSwarm(this); |
| } |
| } // finally timer cancelled |
| } // finally dm closed |
| |
| if (!isShutdownHook) { |
| doShutdownListeners(shutdownListeners); |
| } |
| |
| // closing the Aggregate stats |
| if (functionServiceStats != null) { |
| functionServiceStats.close(); |
| } |
| // closing individual function stats |
| for (FunctionStats functionstats : functionExecutionStatsMap.values()) { |
| functionstats.close(); |
| } |
| |
| InternalFunctionService.unregisterAllFunctions(); |
| |
| if (sampler != null) { |
| sampler.stop(); |
| sampler = null; |
| } |
| |
| if (!attemptingToReconnect) { |
| loggingSession.stopSession(); |
| } |
| |
| EventID.unsetDS(); |
| |
| } finally { |
| try { |
| if (getOffHeapStore() != null) { |
| getOffHeapStore().close(); |
| } |
| } finally { |
| try { |
| removeSystem(this); |
| if (!attemptingToReconnect) { |
| loggingSession.shutdown(); |
| } |
| alertingSession.shutdown(); |
| // Close the config object |
| config.close(); |
| } finally { |
| // Finally, mark ourselves as disconnected |
| setDisconnected(); |
| SystemFailure.stopThreads(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Returns the distribution manager for accessing this distributed system. |
| */ |
| public DistributionManager getDistributionManager() { |
| checkConnected(); |
| return dm; |
| } |
| |
| /** |
| * Returns the distribution manager without checking for connected or not so can also return null. |
| */ |
| public DistributionManager getDM() { |
| return dm; |
| } |
| |
| /** |
| * If this DistributedSystem is attempting to reconnect to the distributed system this will return |
| * the quorum checker created by the old MembershipManager for checking to see if a quorum of old |
| * members can be reached. |
| * |
| * @return the quorum checking service |
| */ |
| QuorumChecker getQuorumChecker() { |
| return quorumChecker; |
| } |
| |
| /** |
| * Returns true if this DS has been attempting to reconnect but the attempt has been cancelled. |
| */ |
| public boolean isReconnectCancelled() { |
| return reconnectCancelled; |
| } |
| |
| /** |
| * Returns whether or not this distributed system has the same configuration as the given set of |
| * properties. |
| * |
| * @see DistributedSystem#connect |
| */ |
| public boolean sameAs(Properties props, boolean isConnected) { |
| return originalConfig.sameAs(DistributionConfigImpl.produce(props, isConnected)); |
| } |
| |
| public boolean threadOwnsResources() { |
| Boolean b = ConnectionTable.getThreadOwnsResourcesRegistration(); |
| if (b == null) { |
| // thread does not have a preference so return default |
| return !shareSockets; |
| } else { |
| return b; |
| } |
| } |
| |
| /** |
| * Returns whether or not the given configuration properties refer to the same distributed system |
| * as this <code>InternalDistributedSystem</code> connection. |
| * |
| * @since GemFire 4.0 |
| */ |
| public boolean sameSystemAs(Properties props) { |
| DistributionConfig other = DistributionConfigImpl.produce(props); |
| DistributionConfig me = getConfig(); |
| |
| if (!me.getBindAddress().equals(other.getBindAddress())) { |
| return false; |
| } |
| |
| // locators |
| String myLocators = me.getLocators(); |
| String otherLocators = other.getLocators(); |
| |
| // quick check |
| if (myLocators.equals(otherLocators)) { |
| return true; |
| |
| } else { |
| myLocators = canonicalizeLocators(myLocators); |
| otherLocators = canonicalizeLocators(otherLocators); |
| |
| return myLocators.equals(otherLocators); |
| } |
| } |
| |
| /** |
| * Canonicalizes a locators string so that they may be compared. |
| * |
| * @since GemFire 4.0 |
| */ |
| private static String canonicalizeLocators(String locators) { |
| SortedSet<String> sorted = new TreeSet<>(); |
| StringTokenizer st = new StringTokenizer(locators, ","); |
| while (st.hasMoreTokens()) { |
| String l = st.nextToken(); |
| StringBuilder canonical = new StringBuilder(); |
| DistributionLocatorId locId = new DistributionLocatorId(l); |
| String addr = locId.getBindAddress(); |
| if (addr != null && addr.trim().length() > 0) { |
| canonical.append(addr); |
| } else { |
| canonical.append(locId.getHostName()); |
| } |
| canonical.append("["); |
| canonical.append(locId.getPort()); |
| canonical.append("]"); |
| sorted.add(canonical.toString()); |
| } |
| |
| StringBuilder sb = new StringBuilder(); |
| for (Iterator iter = sorted.iterator(); iter.hasNext();) { |
| sb.append((String) iter.next()); |
| if (iter.hasNext()) { |
| sb.append(","); |
| } |
| } |
| return sb.toString(); |
| } |
| |
| /** |
| * Returns the current configuration of this distributed system. |
| */ |
| public DistributionConfig getConfig() { |
| return config; |
| } |
| |
| public AlertingService getAlertingService() { |
| return alertingService; |
| } |
| |
| @Override |
| public LogConfig getLogConfig() { |
| return config; |
| } |
| |
| @Override |
| public StatisticsConfig getStatisticsConfig() { |
| return config; |
| } |
| |
| @Override |
| public void addLogConfigListener(LogConfigListener logConfigListener) { |
| logConfigListeners.add(logConfigListener); |
| } |
| |
| @Override |
| public void removeLogConfigListener(LogConfigListener logConfigListener) { |
| logConfigListeners.remove(logConfigListener); |
| } |
| |
| public Optional<LogFile> getLogFile() { |
| return loggingSession.getLogFile(); |
| } |
| |
| void logConfigChanged() { |
| for (LogConfigListener listener : logConfigListeners) { |
| listener.configChanged(); |
| } |
| } |
| |
| /** |
| * Returns the string value of the distribution manager's id. |
| */ |
| @Override |
| public String getMemberId() { |
| return String.valueOf(dm.getId()); |
| } |
| |
| @Override |
| public InternalDistributedMember getDistributedMember() { |
| return dm.getId(); |
| } |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| public Set<DistributedMember> getAllOtherMembers() { |
| return (Set) dm.getAllOtherMembers(); |
| } |
| |
| @Override |
| public Set<DistributedMember> getGroupMembers(String group) { |
| return dm.getGroupMembers(group); |
| } |
| |
| |
| @Override |
| public Set<DistributedMember> findDistributedMembers(InetAddress address) { |
| Set<InternalDistributedMember> allMembers = dm.getDistributionManagerIdsIncludingAdmin(); |
| Set<DistributedMember> results = new HashSet<>(2); |
| |
| // Search through the set of all members |
| for (InternalDistributedMember member : allMembers) { |
| |
| Set<InetAddress> equivalentAddresses = dm.getEquivalents(member.getInetAddress()); |
| // Check to see if the passed in address is matches one of the addresses on |
| // the given member. |
| if (address.equals(member.getInetAddress()) || equivalentAddresses.contains(address)) { |
| results.add(member); |
| } |
| } |
| |
| return results; |
| } |
| |
| @Override |
| public DistributedMember findDistributedMember(String name) { |
| for (DistributedMember member : dm.getDistributionManagerIdsIncludingAdmin()) { |
| if (member.getName().equals(name)) { |
| return member; |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Returns the configuration this distributed system was created with. |
| */ |
| public DistributionConfig getOriginalConfig() { |
| return originalConfig; |
| } |
| |
| /////////////////////// Utility Methods /////////////////////// |
| |
| /** |
| * Since {@link DistributedSystem#connect} guarantees that there is a canonical instance of |
| * <code>DistributedSystem</code> for each configuration, we can use the default implementation of |
| * <code>equals</code>. |
| * |
| * @see #sameAs |
| */ |
| @Override |
| public boolean equals(Object o) { |
| return super.equals(o); |
| } |
| |
| /** |
| * Since we use the default implementation of {@link #equals equals}, we can use the default |
| * implementation of <code>hashCode</code>. |
| */ |
| @Override |
| public int hashCode() { |
| return super.hashCode(); |
| } |
| |
| /** |
| * Returns a string describing this connection to distributed system (including highlights of its |
| * configuration). |
| */ |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("Connected "); |
| String name = getName(); |
| if (name != null && !name.equals("")) { |
| sb.append("\""); |
| sb.append(name); |
| sb.append("\" "); |
| } |
| sb.append("(id="); |
| sb.append(Integer.toHexString(System.identityHashCode(this))); |
| sb.append(") "); |
| |
| sb.append("to distributed system using "); |
| int port = config.getMcastPort(); |
| if (port != 0) { |
| sb.append("multicast port "); |
| sb.append(port); |
| sb.append(" "); |
| |
| } else { |
| sb.append("locators \""); |
| sb.append(config.getLocators()); |
| sb.append("\" "); |
| } |
| |
| File logFile = config.getLogFile(); |
| sb.append("logging to "); |
| if (logFile == null || logFile.equals(new File(""))) { |
| sb.append("standard out "); |
| |
| } else { |
| sb.append(logFile); |
| sb.append(" "); |
| } |
| |
| sb.append(" started at "); |
| sb.append((new Date(startTime)).toString()); |
| |
| if (!isConnected()) { |
| sb.append(" (closed)"); |
| } |
| |
| return sb.toString().trim(); |
| } |
| |
| // As the function execution stats can be lot in number, its better to put |
| // them in a map so that it will be accessible immediately |
| private final ConcurrentHashMap<String, FunctionStats> functionExecutionStatsMap = |
| new ConcurrentHashMap<>(); |
| private FunctionServiceStats functionServiceStats = null; |
| |
| public FunctionStats getFunctionStats(String textId) { |
| if (statsDisabled) { |
| return FunctionStats.dummy; |
| } |
| return JavaWorkarounds.computeIfAbsent(functionExecutionStatsMap, textId, |
| key -> new FunctionStats(this, key)); |
| } |
| |
| |
| public synchronized FunctionServiceStats getFunctionServiceStats() { |
| if (functionServiceStats == null) { |
| functionServiceStats = new FunctionServiceStats(this, "FunctionExecution"); |
| } |
| return functionServiceStats; |
| } |
| |
| |
| /** |
| * Makes note of a <code>ConnectListener</code> whose <code>onConnect</code> method will be |
| * invoked when a connection is created to a distributed system. |
| * |
| * @return set of currently existing system connections |
| */ |
| public static List addConnectListener(ConnectListener listener) { |
| synchronized (existingSystemsLock) { |
| synchronized (connectListeners) { |
| connectListeners.add(listener); |
| return existingSystems; |
| } |
| } |
| } |
| |
| public static void removeConnectListener(ConnectListener listener) { |
| synchronized (existingSystemsLock) { |
| synchronized (connectListeners) { |
| connectListeners.remove(listener); |
| } |
| } |
| } |
| |
| /** |
| * Makes note of a <code>ReconnectListener</code> whose <code>onReconnect</code> method will be |
| * invoked when a connection is recreated to a distributed system during auto-reconnect. |
| * <p> |
| * <p> |
| * The ReconnectListener set is cleared after a disconnect. |
| */ |
| public static void addReconnectListener(ReconnectListener listener) { |
| synchronized (existingSystemsLock) { |
| synchronized (reconnectListeners) { |
| reconnectListeners.add(listener); |
| } |
| } |
| } |
| |
| /** |
| * Notifies all registered <code>ConnectListener</code>s that a connection to a distributed system |
| * has been created. |
| */ |
| private static void notifyConnectListeners(InternalDistributedSystem sys) { |
| synchronized (connectListeners) { |
| for (ConnectListener listener : connectListeners) { |
| try { |
| listener.onConnect(sys); |
| } catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| sys.getLogWriter() |
| .severe("ConnectListener threw...", t); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Notifies all registered <code>ReconnectListener</code>s that a connection to a distributed |
| * system has been recreated. |
| */ |
| private static void notifyReconnectListeners(InternalDistributedSystem oldsys, |
| InternalDistributedSystem newsys, boolean starting) { |
| List<ReconnectListener> listeners; |
| synchronized (reconnectListeners) { |
| listeners = new ArrayList<>(reconnectListeners); |
| } |
| for (ReconnectListener listener : listeners) { |
| try { |
| if (starting) { |
| listener.reconnecting(oldsys); |
| } else { |
| listener.onReconnect(oldsys, newsys); |
| } |
| } catch (Throwable t) { |
| Error err; |
| if (t instanceof OutOfMemoryError || t instanceof UnknownError) { |
| err = (Error) t; |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } |
| // Whenever you catch Error or Throwable, you must also |
| // check for fatal JVM error (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| logger.fatal("ConnectListener threw...", t); |
| } |
| } |
| } |
| |
| /** |
| * Notifies all resource event listeners. All exceptions are caught here and only a warning |
| * message is printed in the log |
| * |
| * @param event Enumeration depicting particular resource event |
| * @param resource the actual resource object. |
| */ |
| private void notifyResourceEventListeners(ResourceEvent event, Object resource) { |
| for (ResourceEventsListener listener : resourceListeners) { |
| try { |
| listener.handleEvent(event, resource); |
| } catch (CancelException e) { |
| // ignore |
| logger.info("Skipping notifyResourceEventListeners for {} due to cancellation", event); |
| } catch (GemFireSecurityException | ManagementException ex) { |
| if (event == ResourceEvent.CACHE_CREATE) { |
| throw ex; |
| } else { |
| logger.warn(ex.getMessage(), ex); |
| } |
| } catch (Exception err) { |
| logger.warn(err.getMessage(), err); |
| } catch (VirtualMachineError e) { |
| SystemFailure.initiateFailure(e); |
| throw e; |
| } catch (Throwable t) { |
| SystemFailure.checkFailure(); |
| logger.warn(t.getMessage(), t); |
| } |
| } |
| } |
| |
| /** |
| * Makes note of a <code>DisconnectListener</code> whose <code>onDisconnect</code> method will be |
| * invoked when this connection to the distributed system is disconnected. |
| */ |
| public void addDisconnectListener(DisconnectListener listener) { |
| synchronized (disconnectListeners) { |
| disconnectListeners.add(listener); |
| |
| boolean disconnectThreadBoolean = isDisconnectThread.get(); |
| |
| if (!disconnectThreadBoolean) { |
| // Don't add disconnect listener after messaging has been disabled. |
| // Do this test _after_ adding the listener to narrow the window. |
| // It's possible to miss it still and never invoke the listener, but |
| // other shutdown conditions will presumably get flagged. |
| String reason = stopper.cancelInProgress(); |
| if (reason != null) { |
| disconnectListeners.remove(listener); // don't leave in the list! |
| throw new DistributedSystemDisconnectedException( |
| String.format("No listeners permitted after shutdown: %s", |
| reason), |
| dm.getRootCause()); |
| } |
| } |
| } // synchronized |
| } |
| |
| /** |
| * Removes a <code>DisconnectListener</code> from the list of listeners that will be notified when |
| * this connection to the distributed system is disconnected. |
| */ |
| public void removeDisconnectListener(DisconnectListener listener) { |
| synchronized (disconnectListeners) { |
| disconnectListeners.remove(listener); |
| } |
| } |
| |
| /** |
| * Returns any existing <code>InternalDistributedSystem</code> instance. Returns <code>null</code> |
| * if no instance exists. |
| */ |
| public static InternalDistributedSystem getAnyInstance() { |
| List l = existingSystems; |
| if (l.isEmpty()) { |
| return null; |
| } else { |
| return (InternalDistributedSystem) l.get(0); |
| } |
| } |
| |
| /** |
| * Test hook |
| */ |
| public static List getExistingSystems() { |
| return existingSystems; |
| } |
| |
| @Override |
| public Properties getProperties() { |
| return config.toProperties(); |
| } |
| |
| @Override |
| public Properties getSecurityProperties() { |
| return config.getSecurityProps(); |
| } |
| |
| /** |
| * Installs a shutdown hook to ensure that we are disconnected if an application VM shuts down |
| * without first calling disconnect itself. |
| */ |
| @Immutable |
| public static final Thread shutdownHook; |
| |
| static { |
| // Create a shutdown hook to cleanly close connection if |
| // VM shuts down with an open connection. |
| Thread tmp_shutdownHook = null; |
| try { |
| // Added for bug 38407 |
| if (!Boolean.getBoolean(DISABLE_SHUTDOWN_HOOK_PROPERTY)) { |
| tmp_shutdownHook = new LoggingThread(SHUTDOWN_HOOK_NAME, false, () -> { |
| InternalDistributedSystem ds = InternalDistributedSystem.getAnyInstance(); |
| setThreadsSocketPolicy(true /* conserve sockets */); |
| if (ds != null && ds.isConnected()) { |
| logger.info("VM is exiting - shutting down distributed system"); |
| DurableClientAttributes dca = ds.getDistributedMember() |
| .getDurableClientAttributes(); |
| boolean isDurableClient = false; |
| |
| if (dca != null) { |
| isDurableClient = (!(dca.getId() == null || dca.getId().isEmpty())); |
| } |
| |
| ds.disconnect(false, |
| "normal disconnect", |
| isDurableClient/* keep alive drive from this */); |
| // this was how we wanted to do it for 5.7, but there were shutdown |
| // issues in PR/dlock (see bug 39287) |
| // InternalDistributedSystem ids = (InternalDistributedSystem)ds; |
| // if (ids.getDistributionManager() != null && |
| // ids.getDistributionManager().getMembershipManager() != null) { |
| // ids.getDistributionManager().getMembershipManager() |
| // .uncleanShutdown("VM is exiting", null); |
| // } |
| } |
| }); |
| Runtime.getRuntime().addShutdownHook(tmp_shutdownHook); |
| } |
| } finally { |
| shutdownHook = tmp_shutdownHook; |
| } |
| } |
| /////////////////////// Inner Classes /////////////////////// |
| |
| /** |
| * A listener that gets invoked before this connection to the distributed system is disconnected. |
| */ |
| public interface DisconnectListener { |
| |
| /** |
| * Invoked before a connection to the distributed system is disconnected. |
| * |
| * @param sys the the system we are disconnecting from process should take before returning. |
| */ |
| void onDisconnect(InternalDistributedSystem sys); |
| |
| } |
| |
| /** |
| * A listener that gets invoked before and after a successful auto-reconnect |
| */ |
| public interface ReconnectListener { |
| |
| /** |
| * Invoked when reconnect attempts are initiated |
| * |
| * @param oldSystem the old DS, which is in a partially disconnected state and cannot be used |
| * for messaging |
| */ |
| void reconnecting(InternalDistributedSystem oldSystem); |
| |
| /** |
| * Invoked after a reconnect to the distributed system |
| * |
| * @param oldSystem the old DS |
| * @param newSystem the new DS |
| */ |
| void onReconnect(InternalDistributedSystem oldSystem, InternalDistributedSystem newSystem); |
| } |
| |
| /** |
| * A listener that gets invoked after this connection to the distributed system is disconnected |
| */ |
| public interface ShutdownListener extends DisconnectListener { |
| |
| /** |
| * Invoked after the connection to the distributed system has been disconnected |
| * |
| */ |
| void onShutdown(InternalDistributedSystem sys); |
| } |
| |
| /** |
| * Integer representing number of tries already made to reconnect and that failed. |
| */ |
| @MakeNotStatic |
| private static final AtomicInteger reconnectAttemptCounter = new AtomicInteger(); |
| |
| /** |
| * Boolean indicating if DS needs to reconnect and reconnect is in progress. |
| */ |
| private volatile boolean attemptingToReconnect = false; |
| |
| /** |
| * Boolean indicating this DS joined through a reconnect attempt |
| */ |
| private volatile boolean reconnected = false; |
| |
| /** |
| * If reconnect fails due to an exception it will be in this field |
| */ |
| private Exception reconnectException; |
| |
| /** |
| * Boolean indicating that this member has been shunned by other members or a network partition |
| * has occurred |
| */ |
| private volatile boolean forcedDisconnect = false; |
| |
| /** |
| * Used to keep track of the DS created by doing an reconnect on this. |
| */ |
| private volatile InternalDistributedSystem reconnectDS; |
| /** |
| * Was this distributed system started with FORCE_LOCATOR_DM_TYPE=true? We need to know when |
| * reconnecting. |
| */ |
| private boolean locatorDMTypeForced; |
| |
| |
| /** |
| * Returns true if we are reconnecting the distributed system or reconnect has completed. If this |
| * returns true it means that this instance of the DS is now disconnected and unusable. |
| */ |
| @Override |
| public boolean isReconnecting() { |
| InternalDistributedSystem rds = reconnectDS; |
| if (!attemptingToReconnect) { |
| return false; |
| } |
| if (reconnectCancelled) { |
| return false; |
| } |
| return (rds == null || !rds.isConnected()); |
| } |
| |
| |
| /** |
| * Returns true if we are reconnecting the distributed system and this instance was created for |
| * one of the connection attempts. If the connection succeeds this state is cleared and this |
| * method will commence to return false. |
| */ |
| boolean isReconnectingDS() { |
| return isReconnectingDS; |
| } |
| |
| /** |
| * returns the membership socket of the old distributed system, if available, when |
| * isReconnectingDS returns true. This is used to connect the new DM to the distributed system |
| * through RemoteTransportConfig. |
| */ |
| MembershipInformation oldDSMembershipInfo() { |
| if (quorumChecker != null) { |
| return quorumChecker.getMembershipInfo(); |
| } |
| return null; |
| } |
| |
| /** |
| * Returns true if this DS reconnected to the distributed system after a forced disconnect or loss |
| * of required-roles |
| */ |
| public boolean reconnected() { |
| return reconnected; |
| } |
| |
| /** |
| * Returns true if this DS has been kicked out of the distributed system |
| */ |
| public boolean forcedDisconnect() { |
| return forcedDisconnect; |
| } |
| |
| /** |
| * If true then this DS will never reconnect. |
| */ |
| private volatile boolean reconnectCancelled = false; |
| |
| /** |
| * Make sure this instance of DS never does a reconnect. Also if reconnect is in progress cancel |
| * it. |
| */ |
| private void cancelReconnect() { |
| reconnectCancelled = true; |
| if (isReconnecting()) { |
| synchronized (reconnectLock) { // should the synchronized be first on this and |
| // then on this.reconnectLock. |
| reconnectLock.notifyAll(); |
| } |
| } |
| } |
| |
| /** |
| * This lock must be acquired *after* locking any GemFireCache. |
| */ |
| private final Object reconnectLock = new Object(); |
| |
| /** |
| * Tries to reconnect to the distributed system on role loss if configure to reconnect. |
| * |
| * @param oldCache cache that has apparently failed |
| */ |
| public boolean tryReconnect(boolean forcedDisconnect, String reason, InternalCache oldCache) { |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| if (isReconnectingDS && forcedDisconnect) { |
| return false; |
| } |
| synchronized (InternalCacheBuilder.class) { |
| synchronized (GemFireCacheImpl.class) { |
| // bug 39329: must lock reconnectLock *after* the cache |
| synchronized (reconnectLock) { |
| if (!forcedDisconnect && !oldCache.isClosed() |
| && oldCache.getCachePerfStats().getReliableRegionsMissing() == 0) { |
| if (isDebugEnabled) { |
| logger.debug("tryReconnect: No required roles are missing."); |
| } |
| return false; |
| } |
| |
| if (isDebugEnabled) { |
| logger.debug("tryReconnect: forcedDisconnect={}", forcedDisconnect); |
| } |
| if (forcedDisconnect) { |
| if (config.getDisableAutoReconnect()) { |
| if (isDebugEnabled) { |
| logger.debug("tryReconnect: auto reconnect after forced disconnect is disabled"); |
| } |
| return false; |
| } |
| } |
| reconnect(forcedDisconnect, reason); |
| return reconnectDS != null && reconnectDS.isConnected(); |
| } // synchronized reconnectLock |
| } // synchronized cache |
| } // synchronized CacheFactory.class |
| } |
| |
| |
| /** |
| * Returns the value for the number of time reconnect has been tried. Test method used by DUnit. |
| */ |
| public static int getReconnectAttemptCounter() { |
| return reconnectAttemptCounter.get(); |
| } |
| |
| /** |
| * A reconnect is tried when gemfire is configured to reconnect in case of a required role loss. |
| * The reconnect will try reconnecting to the distributed system every max-time-out millseconds |
| * for max-number-of-tries configured in gemfire.properties file. It uses the cache.xml file to |
| * intialize the cache and create regions. |
| */ |
| private void reconnect(boolean forcedDisconnect, String reason) { |
| |
| // Collect all the state for cache |
| // Collect all the state for Regions |
| // Close the cache, |
| // loop trying to connect, waiting before each attempt |
| // |
| // If reconnecting for lost-roles the reconnected system's cache will decide |
| // whether the reconnected system should stay up. After max-tries we will |
| // give up. |
| // |
| // If reconnecting for forced-disconnect we ignore max-tries and keep attempting |
| // to join the distributed system until successful |
| |
| attemptingToReconnect = true; |
| InternalDistributedSystem ids = InternalDistributedSystem.getAnyInstance(); |
| if (ids == null) { |
| ids = this; |
| } |
| |
| // first save the current cache description. This is created by |
| // the membership manager when forced-disconnect starts. If we're |
| // reconnecting for lost roles then this will be null |
| String cacheXML = null; |
| List<CacheServerCreation> cacheServerCreation = null; |
| Set<MeterRegistry> meterRegistries = null; |
| |
| InternalCache cache = GemFireCacheImpl.getInstance(); |
| if (cache != null) { |
| cacheXML = cache.getCacheConfig().getCacheXMLDescription(); |
| cacheServerCreation = cache.getCacheConfig().getCacheServerCreation(); |
| meterRegistries = cache.getMeterSubregistries(); |
| } |
| |
| DistributionConfig oldConfig = ids.getConfig(); |
| Properties configProps = config.toProperties(); |
| configProps.putAll(config.toSecurityProperties()); |
| |
| int timeOut = oldConfig.getMaxWaitTimeForReconnect(); |
| int memberTimeout = oldConfig.getMemberTimeout(); |
| // we need to make sure that a surviving member is able |
| // to take over coordination before trying to auto-reconnect. |
| // failure detection can take 4 member-timeout intervals |
| // so we set that as a minimum. (suspect, check suspect, final check, send new view) |
| final int intervalsAllowedForFailureDetection = 4; |
| timeOut = Math.max(timeOut, memberTimeout * intervalsAllowedForFailureDetection); |
| |
| int maxTries = oldConfig.getMaxNumReconnectTries(); |
| |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| |
| if (Thread.currentThread().getName().equals("DisconnectThread")) { |
| if (isDebugEnabled) { |
| logger.debug("changing thread name to ReconnectThread"); |
| } |
| Thread.currentThread().setName("ReconnectThread"); |
| } |
| |
| // get the membership manager for quorum checks |
| MembershipManager mbrMgr = dm.getMembershipManager(); |
| quorumChecker = mbrMgr.getQuorumChecker(); |
| if (logger.isDebugEnabled()) { |
| if (quorumChecker == null) { |
| logger.debug("No quorum checks will be performed during reconnect attempts"); |
| } else { |
| logger.debug("Initialized quorum checking service: {}", quorumChecker); |
| } |
| } |
| |
| // LOG:CLEANUP: deal with reconnect and INHIBIT_DM_BANNER -- this should be ok now |
| String appendToLogFile = System.getProperty(APPEND_TO_LOG_FILE); |
| if (appendToLogFile == null) { |
| System.setProperty(APPEND_TO_LOG_FILE, "true"); |
| } |
| String inhibitBanner = System.getProperty(InternalLocator.INHIBIT_DM_BANNER); |
| if (inhibitBanner == null) { |
| System.setProperty(InternalLocator.INHIBIT_DM_BANNER, "true"); |
| } |
| if (forcedDisconnect) { |
| systemAttemptingReconnect = this; |
| } |
| try { |
| while (reconnectDS == null || !reconnectDS.isConnected()) { |
| if (isReconnectCancelled()) { |
| break; |
| } |
| |
| if (!forcedDisconnect) { |
| if (isDebugEnabled) { |
| logger.debug("Max number of tries : {} and max time out : {}", maxTries, timeOut); |
| } |
| if (reconnectAttemptCounter.get() >= maxTries) { |
| if (isDebugEnabled) { |
| logger.debug( |
| "Stopping the checkrequiredrole thread because reconnect : {} reached the max number of reconnect tries : {}", |
| reconnectAttemptCounter, maxTries); |
| } |
| InternalCache internalCache = dm.getCache(); |
| if (internalCache == null) { |
| throw new CacheClosedException( |
| "Some required roles missing"); |
| } else { |
| throw internalCache.getCacheClosedException( |
| "Some required roles missing"); |
| } |
| } |
| } |
| |
| reconnectAttemptCounter.getAndIncrement(); |
| |
| if (isReconnectCancelled()) { |
| return; |
| } |
| |
| logger.info("Disconnecting old DistributedSystem to prepare for a reconnect attempt"); |
| |
| try { |
| disconnect(true, reason, false); |
| } catch (Exception ee) { |
| logger.warn("Exception disconnecting for reconnect", ee); |
| } |
| |
| TypeRegistry.init(); |
| |
| try { |
| reconnectLock.wait(timeOut); |
| } catch (InterruptedException e) { |
| logger.warn("Waiting thread for reconnect got interrupted."); |
| Thread.currentThread().interrupt(); |
| return; |
| } |
| |
| if (isReconnectCancelled()) { |
| return; |
| } |
| |
| |
| logger.info( |
| "Attempting to reconnect to the distributed system. This is attempt #{}.", |
| reconnectAttemptCounter); |
| |
| int saveNumberOfTries = reconnectAttemptCounter.get(); |
| try { |
| // notify listeners of each attempt and then again after successful |
| notifyReconnectListeners(this, reconnectDS, true); |
| |
| if (locatorDMTypeForced) { |
| System.setProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE, "true"); |
| } |
| |
| configProps.put(DistributionConfig.DS_RECONNECTING_NAME, Boolean.TRUE); |
| if (quorumChecker != null) { |
| configProps.put(DistributionConfig.DS_QUORUM_CHECKER_NAME, quorumChecker); |
| } |
| |
| InternalDistributedSystem newDS = null; |
| if (isReconnectCancelled()) { |
| return; |
| } |
| |
| try { |
| |
| newDS = (InternalDistributedSystem) connect(configProps); |
| |
| } catch (CancelException e) { |
| if (isReconnectCancelled()) { |
| return; |
| } else { |
| throw e; |
| } |
| } finally { |
| if (newDS == null && quorumChecker != null) { |
| // make sure the quorum checker is listening for messages from former members |
| quorumChecker.resume(); |
| } |
| } |
| |
| if (reconnectCancelled) { |
| newDS.disconnect(); |
| continue; |
| } |
| |
| reconnectDS = newDS; |
| } catch (SystemConnectException e) { |
| logger.debug("Attempt to reconnect failed with SystemConnectException"); |
| |
| if (e.getMessage().contains("Rejecting the attempt of a member using an older version")) { |
| logger.warn("Exception occurred while trying to connect the system during reconnect", |
| e); |
| attemptingToReconnect = false; |
| reconnectException = e; |
| return; |
| } |
| logger.warn("Caught SystemConnectException in reconnect", e); |
| continue; |
| } catch (GemFireConfigException e) { |
| logger.warn("Caught GemFireConfigException in reconnect", e); |
| continue; |
| } catch (Exception e) { |
| logger.warn("Exception occurred while trying to connect the system during reconnect", |
| e); |
| attemptingToReconnect = false; |
| reconnectException = e; |
| return; |
| } finally { |
| if (locatorDMTypeForced) { |
| System.getProperties().remove(InternalLocator.FORCE_LOCATOR_DM_TYPE); |
| } |
| reconnectAttemptCounter.set(saveNumberOfTries); |
| } |
| |
| |
| DistributionManager newDM = reconnectDS.getDistributionManager(); |
| if (newDM instanceof ClusterDistributionManager) { |
| // Admin systems don't carry a cache, but for others we can now create |
| // a cache |
| if (newDM.getDMType() != ClusterDistributionManager.ADMIN_ONLY_DM_TYPE) { |
| boolean retry; |
| do { |
| retry = false; |
| try { |
| InternalCacheBuilder cacheBuilder = new InternalCacheBuilder() |
| .setCacheXMLDescription(cacheXML); |
| for (MeterRegistry meterRegistry : meterRegistries) { |
| cacheBuilder.addMeterSubregistry(meterRegistry); |
| } |
| cache = cacheBuilder.create(reconnectDS); |
| |
| if (!cache.isClosed()) { |
| createAndStartCacheServers(cacheServerCreation, cache); |
| if (cache.getCachePerfStats().getReliableRegionsMissing() == 0) { |
| reconnectAttemptCounter.set(0); |
| } |
| } |
| |
| } catch (GemFireConfigException e) { |
| if (e.getCause() instanceof ClusterConfigurationNotAvailableException) { |
| retry = true; |
| logger.info("Reconnected to the cluster but the cluster configuration service " |
| + "isn't available - will retry creating the cache"); |
| try { |
| Thread.sleep(5000); |
| } catch (InterruptedException e1) { |
| reconnectCancelled = true; |
| reconnectException = e; |
| break; |
| } |
| } |
| } catch (Exception e) { |
| // We need to give up because we'll probably get the same exception in |
| // the next attempt to build the cache. |
| logger.warn( |
| "Exception occurred while trying to create the cache during reconnect. Auto-reconnect is terminating.", |
| e); |
| reconnectCancelled = true; |
| reconnectException = e; |
| break; |
| } |
| } while (retry); |
| } |
| } |
| |
| if (reconnectDS != null && reconnectDS.isConnected()) { |
| // make sure the new DS and cache are stable before exiting this loop |
| try { |
| Thread.sleep(config.getMemberTimeout() * 3L); |
| } catch (InterruptedException e) { |
| logger.info("Reconnect thread has been interrupted - exiting"); |
| Thread.currentThread().interrupt(); |
| reconnectCancelled = true; |
| reconnectException = e; |
| return; |
| } |
| } |
| } // while() |
| |
| if (isReconnectCancelled()) { |
| if (reconnectDS != null) { |
| reconnectDS.disconnect(); |
| } |
| } else { |
| reconnectDS.isReconnectingDS = false; |
| if (reconnectDS.isConnected()) { |
| notifyReconnectListeners(this, reconnectDS, false); |
| } |
| } |
| |
| } finally { |
| systemAttemptingReconnect = null; |
| attemptingToReconnect = false; |
| if (appendToLogFile == null) { |
| System.getProperties().remove(APPEND_TO_LOG_FILE); |
| } else { |
| System.setProperty(APPEND_TO_LOG_FILE, appendToLogFile); |
| } |
| if (inhibitBanner == null) { |
| System.getProperties().remove(InternalLocator.INHIBIT_DM_BANNER); |
| } else { |
| System.setProperty(InternalLocator.INHIBIT_DM_BANNER, inhibitBanner); |
| } |
| dm.getMembershipManager().setReconnectCompleted(true); |
| InternalDistributedSystem newds = reconnectDS; |
| if (newds != null) { |
| newds.getDM().getMembershipManager().setReconnectCompleted(true); |
| } |
| if (quorumChecker != null) { |
| mbrMgr.releaseQuorumChecker(quorumChecker, reconnectDS); |
| } |
| } |
| |
| if (isReconnectCancelled()) { |
| logger.debug("reconnect can no longer be done because of an explicit disconnect"); |
| if (reconnectDS != null) { |
| reconnectDS.disconnect(); |
| } |
| attemptingToReconnect = false; |
| } else if (reconnectDS != null && reconnectDS.isConnected()) { |
| logger.info("Reconnect completed.\nNew DistributedSystem is {}\nNew Cache is {}", reconnectDS, |
| cache); |
| } |
| } |
| |
| |
| /** |
| * after an auto-reconnect we may need to recreate a cache server and start it |
| */ |
| public void createAndStartCacheServers(List<CacheServerCreation> cacheServerCreation, |
| InternalCache cache) { |
| |
| List<CacheServer> servers = cache.getCacheServers(); |
| |
| // if there used to be a cache server but now there isn't one we need |
| // to recreate it. |
| if (servers.isEmpty() && cacheServerCreation != null) { |
| for (CacheServerCreation bridge : cacheServerCreation) { |
| CacheServerImpl impl = (CacheServerImpl) cache.addCacheServer(); |
| impl.configureFrom(bridge); |
| } |
| } |
| |
| servers = cache.getCacheServers(); |
| for (CacheServer server : servers) { |
| try { |
| if (!server.isRunning()) { |
| server.start(); |
| } |
| } catch (IOException ex) { |
| throw new GemFireIOException( |
| String.format("While starting cache server %s", server), |
| ex); |
| } |
| } |
| |
| } |
| |
| /** |
| * Validates that the configuration provided is the same as the configuration for this |
| * InternalDistributedSystem |
| * |
| * @param propsToCheck the Properties instance to compare with the existing Properties |
| * |
| * @throws IllegalStateException when the configuration is not the same other returns |
| */ |
| public void validateSameProperties(Properties propsToCheck, boolean isConnected) { |
| if (!sameAs(propsToCheck, isConnected)) { |
| StringBuilder sb = new StringBuilder(); |
| |
| DistributionConfig wanted = DistributionConfigImpl.produce(propsToCheck); |
| |
| String[] validAttributeNames = originalConfig.getAttributeNames(); |
| for (String attName : validAttributeNames) { |
| Object expectedAtt = wanted.getAttributeObject(attName); |
| String expectedAttStr = expectedAtt.toString(); |
| Object actualAtt = originalConfig.getAttributeObject(attName); |
| String actualAttStr = actualAtt.toString(); |
| sb.append(" "); |
| sb.append(attName); |
| sb.append("=\""); |
| if (actualAtt.getClass().isArray()) { |
| actualAttStr = arrayToString(actualAtt); |
| expectedAttStr = arrayToString(expectedAtt); |
| } |
| |
| sb.append(actualAttStr); |
| sb.append("\""); |
| if (!expectedAttStr.equals(actualAttStr)) { |
| sb.append(" ***(wanted \""); |
| sb.append(expectedAtt); |
| sb.append("\")***"); |
| } |
| |
| sb.append("\n"); |
| } |
| |
| if (creationStack == null) { |
| throw new IllegalStateException( |
| String.format( |
| "A connection to a distributed system already exists in this VM. It has the following configuration:%s", |
| sb.toString())); |
| } else { |
| throw new IllegalStateException( |
| String.format( |
| "A connection to a distributed system already exists in this VM. It has the following configuration:%s", |
| sb.toString()), |
| creationStack); |
| } |
| } |
| } |
| |
| public static String arrayToString(Object obj) { |
| if (!obj.getClass().isArray()) { |
| return "-not-array-object-"; |
| } |
| StringBuilder buff = new StringBuilder("["); |
| int arrayLength = Array.getLength(obj); |
| for (int i = 0; i < arrayLength - 1; i++) { |
| buff.append(Array.get(obj, i).toString()); |
| buff.append(","); |
| } |
| if (arrayLength > 0) { |
| buff.append(Array.get(obj, arrayLength - 1).toString()); |
| } |
| buff.append("]"); |
| |
| return buff.toString(); |
| } |
| |
| public boolean isShareSockets() { |
| return shareSockets; |
| } |
| |
| public void setShareSockets(boolean shareSockets) { |
| this.shareSockets = shareSockets; |
| } |
| |
| /** |
| * A listener that gets invoked whenever a connection is created to a distributed system |
| */ |
| public interface ConnectListener { |
| |
| /** |
| * Invoked after a connection to the distributed system is created |
| */ |
| void onConnect(InternalDistributedSystem sys); |
| } |
| |
| public boolean hasAlertListenerFor(DistributedMember member) { |
| return hasAlertListenerFor(member, AlertLevel.WARNING.intLevel()); |
| } |
| |
| public boolean hasAlertListenerFor(DistributedMember member, int severity) { |
| return alertingService.hasAlertListener(member, AlertLevel.find(severity)); |
| } |
| |
| /** |
| * see {@link org.apache.geode.admin.AdminDistributedSystemFactory} |
| * |
| * @since GemFire 5.7 |
| */ |
| public static void setEnableAdministrationOnly(boolean adminOnly) { |
| DistributedSystem.setEnableAdministrationOnly(adminOnly); |
| } |
| |
| public static void setCommandLineAdmin(boolean adminOnly) { |
| DistributedSystem.setEnableAdministrationOnly(adminOnly); |
| } |
| |
| @Override |
| public boolean waitUntilReconnected(long time, TimeUnit units) throws InterruptedException { |
| int sleepTime = 1000; |
| long endTime = System.currentTimeMillis(); |
| if (time < 0) { |
| endTime = Long.MAX_VALUE; |
| } else { |
| endTime += TimeUnit.MILLISECONDS.convert(time, units); |
| } |
| synchronized (reconnectLock) { |
| while (isReconnecting()) { |
| if (reconnectCancelled) { |
| break; |
| } |
| if (time != 0) { |
| reconnectLock.wait(sleepTime); |
| } |
| if (time == 0 || System.currentTimeMillis() > endTime) { |
| break; |
| } |
| } |
| |
| if (reconnectException != null) { |
| throw new DistributedSystemDisconnectedException( |
| "Reconnect attempts terminated due to exception", reconnectException); |
| } |
| InternalDistributedSystem recon = reconnectDS; |
| return !attemptingToReconnect && recon != null && recon.isConnected(); |
| } |
| } |
| |
| @Override |
| public DistributedSystem getReconnectedSystem() { |
| return reconnectDS; |
| } |
| |
| @Override |
| public void stopReconnecting() { |
| reconnectCancelled = true; |
| synchronized (reconnectLock) { |
| reconnectLock.notify(); |
| } |
| disconnect(false, "stopReconnecting was invoked", false); |
| attemptingToReconnect = false; |
| } |
| |
| public void stopReconnectingNoDisconnect() { |
| reconnectCancelled = true; |
| synchronized (reconnectLock) { |
| reconnectLock.notify(); |
| } |
| attemptingToReconnect = false; |
| } |
| |
| /** |
| * Provides hook for dunit to generate and store a detailed creation stack trace that includes the |
| * keys/values of DistributionConfig including security related attributes without introducing |
| * Privacy Violations that Fortify will complain about. |
| * </p> |
| */ |
| public interface CreationStackGenerator { |
| |
| Throwable generateCreationStack(final DistributionConfig config); |
| } |
| |
| public void setCache(InternalCache instance) { |
| dm.setCache(instance); |
| } |
| |
| public InternalCache getCache() { |
| return dm == null ? null : dm.getCache(); |
| } |
| |
| private static StatisticsManagerFactory defaultStatisticsManagerFactory() { |
| return (name, startTime, statsDisabled) -> { |
| if (statsDisabled) { |
| return new DummyStatisticsRegistry(name, startTime); |
| } else { |
| return new StatisticsRegistry(name, startTime); |
| } |
| }; |
| } |
| |
| public static class Builder { |
| |
| private final Properties configProperties; |
| |
| private SecurityConfig securityConfig; |
| |
| public Builder(Properties configProperties) { |
| this.configProperties = configProperties; |
| } |
| |
| public Builder setSecurityConfig(SecurityConfig securityConfig) { |
| this.securityConfig = securityConfig; |
| return this; |
| } |
| |
| /** |
| * Builds and initializes new instance of InternalDistributedSystem. |
| */ |
| public InternalDistributedSystem build() { |
| if (securityConfig == null) { |
| securityConfig = new SecurityConfig(null, null); |
| } |
| |
| boolean stopThreads = true; |
| InternalDataSerializer.checkSerializationVersion(); |
| try { |
| SystemFailure.startThreads(); |
| InternalDistributedSystem newSystem = |
| new InternalDistributedSystem(new ConnectionConfigImpl( |
| configProperties), defaultStatisticsManagerFactory()); |
| newSystem |
| .initialize(securityConfig.getSecurityManager(), securityConfig.getPostProcessor()); |
| notifyConnectListeners(newSystem); |
| stopThreads = false; |
| return newSystem; |
| } finally { |
| if (stopThreads) { |
| SystemFailure.stopThreads(); |
| } |
| } |
| } |
| } |
| |
| @VisibleForTesting |
| public static class BuilderForTesting { |
| |
| private final Properties configProperties; |
| |
| private DistributionManager distributionManager; |
| private StatisticsManagerFactory statisticsManagerFactory = defaultStatisticsManagerFactory(); |
| |
| public BuilderForTesting(Properties configProperties) { |
| this.configProperties = configProperties; |
| } |
| |
| public BuilderForTesting setDistributionManager(DistributionManager distributionManager) { |
| this.distributionManager = distributionManager; |
| return this; |
| } |
| |
| public BuilderForTesting setStatisticsManagerFactory( |
| StatisticsManagerFactory statisticsManagerFactory) { |
| this.statisticsManagerFactory = statisticsManagerFactory; |
| return this; |
| } |
| |
| /** |
| * Builds instance without initializing it for testing. |
| */ |
| public InternalDistributedSystem build() { |
| ConnectionConfigImpl connectionConfig = new ConnectionConfigImpl(configProperties); |
| |
| InternalDistributedSystem internalDistributedSystem = |
| new InternalDistributedSystem(connectionConfig, statisticsManagerFactory); |
| |
| internalDistributedSystem.config = |
| new RuntimeDistributionConfigImpl(internalDistributedSystem); |
| internalDistributedSystem.dm = distributionManager; |
| internalDistributedSystem.isConnected = true; |
| |
| return internalDistributedSystem; |
| } |
| } |
| } |