/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.geode.distributed.internal;

import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.internal.DistributionConfig.GEMFIRE_PREFIX;

import java.io.File;
import java.io.IOException;
import java.io.Reader;
import java.lang.reflect.Array;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.SortedSet;
import java.util.StringTokenizer;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import io.micrometer.core.instrument.MeterRegistry;
import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.ForcedDisconnectException;
import org.apache.geode.GemFireConfigException;
import org.apache.geode.GemFireIOException;
import org.apache.geode.LogWriter;
import org.apache.geode.StatisticDescriptor;
import org.apache.geode.Statistics;
import org.apache.geode.StatisticsType;
import org.apache.geode.SystemConnectException;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.DurableClientAttributes;
import org.apache.geode.distributed.internal.locks.GrantorRequestProcessor;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.MembershipManager;
import org.apache.geode.distributed.internal.membership.QuorumChecker;
import org.apache.geode.distributed.internal.membership.gms.messenger.MembershipInformation;
import org.apache.geode.distributed.internal.membership.gms.mgr.GMSMembershipManager;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.DSFIDFactory;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.InternalInstantiator;
import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.admin.remote.DistributionLocatorId;
import org.apache.geode.internal.alerting.AlertLevel;
import org.apache.geode.internal.alerting.AlertMessaging;
import org.apache.geode.internal.alerting.AlertingService;
import org.apache.geode.internal.alerting.AlertingSession;
import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalCacheBuilder;
import org.apache.geode.internal.cache.execute.FunctionServiceStats;
import org.apache.geode.internal.cache.execute.FunctionStats;
import org.apache.geode.internal.cache.execute.InternalFunctionService;
import org.apache.geode.internal.cache.tier.sockets.EncryptorImpl;
import org.apache.geode.internal.cache.xmlcache.CacheServerCreation;
import org.apache.geode.internal.config.ClusterConfigurationNotAvailableException;
import org.apache.geode.internal.logging.InternalLogWriter;
import org.apache.geode.internal.logging.LogConfig;
import org.apache.geode.internal.logging.LogConfigListener;
import org.apache.geode.internal.logging.LogConfigSupplier;
import org.apache.geode.internal.logging.LogFile;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LogWriterFactory;
import org.apache.geode.internal.logging.LoggingSession;
import org.apache.geode.internal.logging.LoggingThread;
import org.apache.geode.internal.logging.NullLoggingSession;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.internal.offheap.MemoryAllocator;
import org.apache.geode.internal.offheap.OffHeapStorage;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.security.SecurityServiceFactory;
import org.apache.geode.internal.statistics.DummyStatisticsRegistry;
import org.apache.geode.internal.statistics.GemFireStatSampler;
import org.apache.geode.internal.statistics.StatisticsConfig;
import org.apache.geode.internal.statistics.StatisticsManager;
import org.apache.geode.internal.statistics.StatisticsManagerFactory;
import org.apache.geode.internal.statistics.StatisticsRegistry;
import org.apache.geode.internal.statistics.platform.LinuxProcFsStatistics;
import org.apache.geode.internal.tcp.ConnectionTable;
import org.apache.geode.internal.util.JavaWorkarounds;
import org.apache.geode.management.ManagementException;
import org.apache.geode.pdx.internal.TypeRegistry;
import org.apache.geode.security.GemFireSecurityException;
import org.apache.geode.security.PostProcessor;
import org.apache.geode.security.SecurityManager;

/**
 * The concrete implementation of {@link DistributedSystem} that provides internal-only
 * functionality.
 *
 * @since GemFire 3.0
 */
public class InternalDistributedSystem extends DistributedSystem
    implements LogConfigSupplier {

  /**
   * True if the user is allowed to lock memory even when memory resources appear to be
   * overcommitted. Read once from the system property {@code GEMFIRE_PREFIX +
   * "Cache.ALLOW_MEMORY_OVERCOMMIT"}. When false, {@code initialize} throws instead of only
   * logging a warning if the available memory is smaller than the amount to be locked.
   */
  private static final boolean ALLOW_MEMORY_LOCK_WHEN_OVERCOMMITTED =
      Boolean.getBoolean(GEMFIRE_PREFIX + "Cache.ALLOW_MEMORY_OVERCOMMIT");
  private static final Logger logger = LogService.getLogger();

  /**
   * Name of the boolean system property that is read into the {@code disableManagement} field;
   * when that field is true, {@code handleResourceEvent} returns without notifying listeners.
   */
  private static final String DISABLE_MANAGEMENT_PROPERTY =
      GEMFIRE_PREFIX + "disableManagement";

  /**
   * Name of the boolean system property that makes {@code connectInternal} always build a new
   * system instead of reusing an existing one.
   */
  public static final String ALLOW_MULTIPLE_SYSTEMS_PROPERTY =
      GEMFIRE_PREFIX + "ALLOW_MULTIPLE_SYSTEMS";

  /**
   * If an auto-reconnect is in progress, this holds a reference to the system that is attempting
   * to reconnect.
   */
  @MakeNotStatic
  public static volatile DistributedSystem systemAttemptingReconnect;

  /**
   * Default creation-stack generator: returns {@code null} for every configuration, i.e. no
   * creation stack is recorded.
   */
  @Immutable
  public static final CreationStackGenerator DEFAULT_CREATION_STACK_GENERATOR = config -> null;

  // The generator held here is consulted by the constructor to produce the creation stack.
  // The following is overridden from DistributedTestCase to fix #51058.
  @MutableForTesting
  public static final AtomicReference<CreationStackGenerator> TEST_CREATION_STACK_GENERATOR =
      new AtomicReference<>(DEFAULT_CREATION_STACK_GENERATOR);

  /**
   * A value of Boolean.TRUE will identify a thread being used to execute
   * disconnectListeners. {@link #addDisconnectListener} will not throw ShutdownException if the
   * value is Boolean.TRUE. The initial value for every thread is Boolean.FALSE.
   */
  private final ThreadLocal<Boolean> isDisconnectThread =
      ThreadLocal.withInitial(() -> Boolean.FALSE);

  /**
   * Statistics manager for this system; created in the constructor by the supplied
   * {@code StatisticsManagerFactory} and used by the {@code create*Counter}/{@code create*Gauge}
   * methods.
   */
  private final StatisticsManager statisticsManager;

  /**
   * The distribution manager that is used to communicate with the distributed system. It is
   * assigned in {@code initialize} and can also be set through {@code setDM}; it is null until
   * then.
   */
  protected DistributionManager dm;

  /** Grantor request context created in the constructor from this system's {@code stopper}. */
  private final GrantorRequestProcessor.GrantorRequestContext grc;

  /**
   * Services provided by other modules, keyed by the value of
   * {@code DistributedSystemService.getInterface()}; populated by {@code initializeServices}.
   */
  private Map<Class, DistributedSystemService> services = new HashMap<>();

  /**
   * Returns a distributed system for the given properties.
   *
   * <p>
   * If the experimental multiple-system feature is enabled (system property
   * {@link #ALLOW_MULTIPLE_SYSTEMS_PROPERTY}), a new system is always built and returned; on that
   * path the new system is not passed to {@code addSystem}.
   *
   * <p>
   * Otherwise, while holding {@code existingSystemsLock}, either an existing system is returned
   * (after validating that it was created with the same properties) or a new
   * InternalDistributedSystem is built, registered with {@code addSystem} and returned.
   *
   * @param config the connection properties; {@code null} is treated as empty properties
   * @param securityConfig the security configuration handed to the {@code Builder}
   * @return an existing suitable system or a newly built one
   * @throws AssertionError if the single existing system is neither disconnecting nor connected
   */
  public static InternalDistributedSystem connectInternal(Properties config,
      SecurityConfig securityConfig) {
    if (config == null) {
      config = new Properties();
    }

    if (Boolean.getBoolean(ALLOW_MULTIPLE_SYSTEMS_PROPERTY)) {
      return new Builder(config)
          .setSecurityConfig(securityConfig)
          .build();
    }

    synchronized (existingSystemsLock) {
      if (ClusterDistributionManager.isDedicatedAdminVM()) {
        // For a dedicated admin VM, check to see if there is already
        // a connect that will suit our purposes.
        InternalDistributedSystem existingSystem =
            (InternalDistributedSystem) getConnection(config);
        if (existingSystem != null) {
          return existingSystem;
        }

      } else {
        // Loop for as long as there is an existing system that is still disconnecting and is
        // not a reconnecting system; each pass re-reads both flags from the existing system.
        boolean existingSystemDisconnecting = true;
        boolean isReconnecting = false;
        while (!existingSystems.isEmpty() && existingSystemDisconnecting && !isReconnecting) {
          Assert.assertTrue(existingSystems.size() == 1);

          InternalDistributedSystem existingSystem = existingSystems.get(0);
          existingSystemDisconnecting = existingSystem.isDisconnecting();
          // a reconnecting DS will block on GemFireCache.class and a ReconnectThread
          // holds that lock and invokes this method, so we break out of the loop
          // if we detect this condition
          isReconnecting = existingSystem.isReconnectingDS();
          if (existingSystemDisconnecting) {
            // Clear and remember the interrupt flag so that the wait below is not aborted
            // immediately; the flag is restored in the finally block.
            boolean interrupted = Thread.interrupted();
            try {
              // no notify for existingSystemsLock, just to release the sync
              existingSystemsLock.wait(50);
            } catch (InterruptedException ex) {
              interrupted = true;
            } finally {
              if (interrupted) {
                Thread.currentThread().interrupt();
              }
            }
          } else if (existingSystem.isConnected()) {
            existingSystem.validateSameProperties(config, existingSystem.isConnected());
            return existingSystem;
          } else {
            throw new AssertionError(
                "system should not have both disconnecting==false and isConnected==false");
          }
        }
      }

      // Make a new connection to the distributed system
      InternalDistributedSystem newSystem = new Builder(config)
          .setSecurityConfig(securityConfig)
          .build();
      addSystem(newSystem);
      return newSystem;
    }
  }

  /**
   * Returns the grantor request context that was created for this system in its constructor.
   *
   * @return this system's {@code GrantorRequestContext}
   */
  public GrantorRequestProcessor.GrantorRequestContext getGrantorRequestContext() {
    return this.grc;
  }

  /**
   * Numeric id that identifies this node in a DistributedSystem. Assigned in {@code initialize}
   * from {@code dm.getMembershipPort()}.
   */
  private long id;

  /**
   * The log writer used to log information messages. Created in {@code initialize} if it is
   * still null at that point.
   */
  @Deprecated
  protected InternalLogWriter logWriter = null;

  /**
   * The log writer used to log security related messages. Created in {@code initialize} if it is
   * still null at that point.
   */
  @Deprecated
  protected InternalLogWriter securityLogWriter = null;

  /**
   * Distributed System clock. Created in {@code initialize}.
   */
  private DSClock clock;

  /**
   * Time this system was created, taken from {@code System.currentTimeMillis()} in the
   * constructor.
   */
  private final long startTime;

  /**
   * Guards writes to {@link #isConnected}
   */
  private final Object isConnectedMutex = new Object();

  /**
   * Is this <code>DistributedSystem</code> connected to a distributed system?
   * <p>
   * Concurrency: volatile for reads and protected by synchronization of {@link #isConnectedMutex}
   * for writes. Note that {@link #isConnected()} applies additional checks on top of this flag,
   * whereas {@link #isDisconnected()} reports the negation of this flag alone.
   */
  protected volatile boolean isConnected;

  /**
   * Set to true if this distributed system is a singleton; it will always be the only member of the
   * system. {@code initialize} sets it when the configured locators string is empty and the
   * mcast port is zero.
   */
  private boolean isLoner = false;

  /**
   * The sampler for this DistributedSystem. Assigned by {@code startSampler}; stays null when
   * statistics are disabled.
   */
  private GemFireStatSampler sampler = null;

  /**
   * A set of listeners that are invoked when this connection to the distributed system is
   * disconnected. A {@code LinkedHashSet} is used because the set needs to be ordered.
   */
  private final Set<DisconnectListener> disconnectListeners = new LinkedHashSet<>();

  /**
   * Set of listeners that are invoked whenever a connection is created to the distributed system.
   * A {@code LinkedHashSet} is used because the set needs to be ordered.
   */
  @MakeNotStatic
  private static final Set<ConnectListener> connectListeners = new LinkedHashSet<>();

  /**
   * Auto-reconnect listeners.
   */
  @MakeNotStatic
  private static final List<ReconnectListener> reconnectListeners = new ArrayList<>();

  /**
   * Whether this DS is one created to reconnect to the distributed system after a
   * forced-disconnect. This state is cleared once reconnect is successful. The initial value
   * comes from {@code ConnectionConfig.isReconnecting()} in the constructor.
   */
  private boolean isReconnectingDS;

  /**
   * During a reconnect attempt this is used to perform quorum checks before allowing a location
   * service to be started up in this JVM. If quorum checks fail then we delay starting location
   * services until a live locator can be contacted. The value comes from
   * {@code ConnectionConfig.quorumChecker()} in the constructor and may be null.
   */
  private QuorumChecker quorumChecker;


  /**
   * Name of the distributed system shutdown hook. Due to Bug 38407, be careful about moving this
   * to another class.
   */
  public static final String SHUTDOWN_HOOK_NAME = "Distributed system shutdown hook";
  /**
   * Name of a system property that prevents shutdown hooks from being registered with the VM.
   * This relates to bug 38407.
   */
  public static final String DISABLE_SHUTDOWN_HOOK_PROPERTY =
      GEMFIRE_PREFIX + "disableShutdownHook";

  /**
   * Name of a system property that requests appending to an existing log file instead of
   * truncating it.
   */
  private static final String APPEND_TO_LOG_FILE = GEMFIRE_PREFIX + "append-log";

  //////////////////// Configuration Fields ////////////////////

  /**
   * The config object used to create this distributed system
   */
  private final DistributionConfig originalConfig;

  /**
   * True when the boolean system property {@code GEMFIRE_PREFIX + "statsDisabled"} is set. It is
   * passed to the statistics manager factory and makes {@code startSampler} a no-op.
   */
  private final boolean statsDisabled =
      Boolean.getBoolean(GEMFIRE_PREFIX + "statsDisabled");

  /**
   * The config object to which most configuration work is delegated. Created in
   * {@code initialize}, so it is null before that method has run.
   */
  private DistributionConfig config;

  /** Initialised in the constructor from {@code originalConfig.getConserveSockets()}. */
  private volatile boolean shareSockets;

  /**
   * if this distributed system starts a locator, it is stored here
   */
  private InternalLocator startedLocator;

  /** Listeners notified by {@code handleResourceEvent}. */
  private final List<ResourceEventsListener> resourceListeners = new CopyOnWriteArrayList<>();

  /** When true, {@code handleResourceEvent} returns without notifying any listener. */
  private final boolean disableManagement = Boolean.getBoolean(DISABLE_MANAGEMENT_PROPERTY);

  /**
   * Stack trace showing the creation of this instance of InternalDistributedSystem. Produced by
   * the generator held in {@link #TEST_CREATION_STACK_GENERATOR}; the default generator yields
   * null.
   */
  private final Throwable creationStack;

  /** Created in {@code initialize}; can be replaced through {@code setSecurityService}. */
  private volatile SecurityService securityService;

  /**
   * Used at client side, indicates whether the 'delta-propagation' property is enabled on the DS
   * this client is connected to. This variable is used to decide whether to send delta bytes or
   * full value to the server for a delta-update operation.
   */
  private boolean deltaEnabledOnServer = true;

  private final AlertingSession alertingSession;
  private final AlertingService alertingService;

  private final LoggingSession loggingSession;
  private final Set<LogConfigListener> logConfigListeners = new HashSet<>();

  /**
   * Reports the current value of the delta-enabled-on-server flag.
   *
   * @return the value last stored by {@link #setDeltaEnabledOnServer(boolean)}, or {@code true}
   *         if it was never changed
   */
  public boolean isDeltaEnabledOnServer() {
    return this.deltaEnabledOnServer;
  }

  /**
   * Stores the delta-enabled-on-server flag.
   *
   * @param deltaEnabledOnServer the new value returned by {@link #isDeltaEnabledOnServer()}
   */
  public void setDeltaEnabledOnServer(boolean deltaEnabledOnServer) {
    this.deltaEnabledOnServer = deltaEnabledOnServer;
  }

  /**
   * Removes the given system by delegating to {@code DistributedSystem.removeSystem}.
   *
   * @param oldSystem the system to remove
   */
  public static void removeSystem(InternalDistributedSystem oldSystem) {
    DistributedSystem.removeSystem(oldSystem);
  }

  /**
   * Obtains a connection to the distributed system that is suitable for administration, where
   * the rules about reusing existing connections are less strict. This simply forwards to
   * {@code DistributedSystem.connectForAdmin}.
   *
   * @param props the connection properties
   * @return the distributed system returned by the superclass method
   * @since GemFire 4.0
   */
  public static DistributedSystem connectForAdmin(Properties props) {
    return DistributedSystem.connectForAdmin(props);
  }

  /**
   * Looks up the connected distributed system for this VM while holding
   * {@code existingSystemsLock}. Only the first entry of {@code existingSystems} is examined.
   *
   * @return the first existing system if it reports {@link #isConnected()}, otherwise
   *         {@code null}
   * @since GemFire 5.0
   */
  public static InternalDistributedSystem getConnectedInstance() {
    synchronized (existingSystemsLock) {
      if (existingSystems.isEmpty()) {
        return null;
      }
      InternalDistributedSystem first = existingSystems.get(0);
      return first.isConnected() ? first : null;
    }
  }

  /**
   * Returns the system obtained from {@code getAnyInstance()} if it is connected. Unlike
   * {@link #getConnectedInstance()}, this method does not itself synchronize on
   * {@code existingSystemsLock}. The name is historical: per the original note, this lookup is
   * no longer unsafe since existingSystems uses copy-on-write.
   *
   * @return the current system if it reports {@link #isConnected()}, otherwise {@code null}
   * @since GemFire 5.0
   */
  public static InternalDistributedSystem unsafeGetConnectedInstance() {
    InternalDistributedSystem candidate = getAnyInstance();
    return (candidate != null && candidate.isConnected()) ? candidate : null;
  }

  /**
   * Fetches the distribution statistics of the system returned by {@code getAnyInstance()}.
   *
   * @return distribution stats, or null if there is no distributed system available or it has no
   *         distribution manager
   */
  public static DMStats getDMStats() {
    InternalDistributedSystem system = getAnyInstance();
    if (system == null || system.dm == null) {
      return null;
    }
    return system.dm.getStats();
  }

  /**
   * Fetches the log writer of the system returned by {@code getAnyInstance()}.
   *
   * @return a log writer, or null if there is no distributed system available or it has no log
   *         writer yet
   */
  @Deprecated
  public static LogWriter getLogger() {
    InternalDistributedSystem system = getAnyInstance();
    return system == null ? null : system.logWriter;
  }

  /**
   * Static accessor for the internal log writer of the system returned by
   * {@code getAnyInstance()}.
   *
   * @return that system's log writer (possibly null), or null if there is no system
   */
  @Deprecated
  public static InternalLogWriter getStaticInternalLogWriter() {
    InternalDistributedSystem system = getAnyInstance();
    if (system == null) {
      return null;
    }
    return system.logWriter;
  }

  /**
   * Returns this system's internal log writer.
   *
   * @return the log writer field of this instance; null until one has been assigned
   */
  @Deprecated
  public InternalLogWriter getInternalLogWriter() {
    return this.logWriter;
  }

  /**
   * Returns the security log writer of the system returned by {@code getAnyInstance()}.
   *
   * <p>
   * NOTE(review): although this is an instance method, the writer is looked up through
   * {@code getAnyInstance()} rather than on {@code this}, unlike
   * {@link #getInternalLogWriter()}. Confirm whether that is intentional before changing it.
   *
   * @return that system's security log writer (possibly null), or null if there is no system
   */
  @Deprecated
  public InternalLogWriter getSecurityInternalLogWriter() {
    InternalDistributedSystem system = getAnyInstance();
    return system == null ? null : system.securityLogWriter;
  }

  /**
   * Sets the shared reconnect-attempt counter back to zero so that a new round of reconnect
   * attempts starts counting from the beginning.
   */
  private static void resetReconnectAttemptCounter() {
    reconnectAttemptCounter.set(0);
  }

  /**
   * Creates a new {@code InternalDistributedSystem} with the given configuration.
   *
   * <p>
   * See {@link #connect} for a list of exceptions that may be thrown.
   *
   * @param config the configuration for the connection
   * @param statisticsManagerFactory creates the statistics manager for this member
   */
  private InternalDistributedSystem(ConnectionConfig config,
      StatisticsManagerFactory statisticsManagerFactory) {
    alertingSession = AlertingSession.create();
    alertingService = new AlertingService();
    loggingSession = LoggingSession.create();

    // register DSFID types first; invoked explicitly so that all message type
    // initializations do not happen in first deserialization on a possibly
    // "precious" thread
    DSFIDFactory.registerTypes();

    originalConfig = config.distributionConfig();
    isReconnectingDS = config.isReconnecting();
    quorumChecker = config.quorumChecker();

    ((DistributionConfigImpl) originalConfig).checkForDisallowedDefaults(); // throws
                                                                            // IllegalStateEx
    shareSockets = originalConfig.getConserveSockets();
    startTime = System.currentTimeMillis();
    grc = new GrantorRequestProcessor.GrantorRequestContext(stopper);

    creationStack =
        TEST_CREATION_STACK_GENERATOR.get().generateCreationStack(originalConfig);

    statisticsManager =
        statisticsManagerFactory.create(originalConfig.getName(), startTime, statsDisabled);
  }

  /**
   * Returns the security service currently associated with this system.
   *
   * @return the security service; null until one has been created or set
   */
  public SecurityService getSecurityService() {
    return this.securityService;
  }

  /**
   * Replaces the security service associated with this system.
   *
   * @param securityService the security service to use from now on
   */
  public void setSecurityService(SecurityService securityService) {
    this.securityService = securityService;
  }

  /**
   * Adds a resource-events listener to this system's listener list.
   *
   * @param listener listener to be added
   */
  public void addResourceListener(ResourceEventsListener listener) {
    this.resourceListeners.add(listener);
  }

  /**
   * Removes a resource-events listener from this system's listener list.
   *
   * @param listener listener to be removed
   */
  public void removeResourceListener(ResourceEventsListener listener) {
    this.resourceListeners.remove(listener);
  }

  /**
   * Exposes the resource-events listeners of this system. The returned list is the internal
   * list itself, not a copy.
   *
   * @return the listeners registered with the system
   */
  public List<ResourceEventsListener> getResourceListeners() {
    return this.resourceListeners;
  }

  /**
   * Forwards a resource event to the registered resource listeners. Nothing happens when
   * management is disabled or when no listener is registered.
   *
   * @param event Resource event
   * @param resource resource on which event is generated
   */
  public void handleResourceEvent(ResourceEvent event, Object resource) {
    if (!disableManagement && !resourceListeners.isEmpty()) {
      notifyResourceEventListeners(event, resource);
    }
  }

  /**
   * Tells whether this system is a loner (for testing).
   *
   * @return true if system is a loner
   */
  public boolean isLoner() {
    return this.isLoner;
  }

  /**
   * Off-heap memory allocator of this system; assigned in {@code initialize} from
   * {@code OffHeapStorage.createOffHeapStorage}.
   */
  private MemoryAllocator offHeapStore = null;

  /**
   * Returns the off-heap memory allocator of this system.
   *
   * @return the allocator; null until {@code initialize} has assigned one
   */
  public MemoryAllocator getOffHeapStore() {
    return this.offHeapStore;
  }

  /**
   * Discovers every {@code DistributedSystemService} through the service loader mechanism,
   * initializes each one with this system, and records it under the interface it reports.
   */
  private void initializeServices() {
    for (DistributedSystemService discovered : ServiceLoader
        .load(DistributedSystemService.class)) {
      discovered.init(this);
      this.services.put(discovered.getInterface(), discovered);
    }
  }


  /**
   * Initializes this connection to a distributed system with the current configuration state.
   *
   * <p>
   * In order, this method: decides whether the system is a loner, creates the runtime
   * configuration and the security service, starts the logging session and the legacy log
   * writers, initializes services and serialization, sets up security keys, creates off-heap
   * storage, optionally locks memory, starts an embedded locator if one is configured, creates
   * the distribution manager, finishes locator startup, starts the statistics sampler and the
   * alerting session, and finally resets the reconnect bookkeeping.
   *
   * <p>
   * If any {@code RuntimeException} escapes the main body, the runtime configuration is closed
   * and the exception is rethrown.
   *
   * @param securityManager passed to {@code SecurityServiceFactory.create}
   * @param postProcessor passed to {@code SecurityServiceFactory.create}
   * @throws GemFireConfigException if locators are empty while the mcast port is non-zero
   * @throws GemFireSecurityException if the client-authentication keys cannot be initialized
   * @throws IllegalStateException if memory locking is requested, available memory is
   *         insufficient, and overcommit is not allowed
   * @throws SystemConnectException if locator startup is interrupted or the system disconnects
   *         while the membership port is being read
   * @throws GemFireIOException if finishing the locator start fails with an I/O error
   */
  private void initialize(SecurityManager securityManager, PostProcessor postProcessor) {
    if (originalConfig.getLocators().equals("")) {
      if (originalConfig.getMcastPort() != 0) {
        throw new GemFireConfigException("The " + LOCATORS + " attribute can not be empty when the "
            + MCAST_PORT + " attribute is non-zero.");
      } else {
        // no distribution
        isLoner = true;
      }
    }

    config = new RuntimeDistributionConfigImpl(this);

    securityService = SecurityServiceFactory.create(
        config.getSecurityProps(),
        securityManager, postProcessor);

    // A non-loner system counts as reconnecting when earlier reconnect attempts were recorded.
    if (!isLoner) {
      attemptingToReconnect = (reconnectAttemptCounter.get() > 0);
    }
    try {
      SocketCreatorFactory.setDistributionConfig(config);

      // The banner and the configuration are logged only when this is not a reconnect attempt.
      boolean logBanner = !attemptingToReconnect;
      boolean logConfiguration = !attemptingToReconnect;
      loggingSession.createSession(this, logBanner, logConfiguration);

      // LOG: create LogWriterLogger(s) for backwards compatibility of getLogWriter and
      // getSecurityLogWriter
      if (logWriter == null) {
        logWriter =
            LogWriterFactory.createLogWriterLogger(config, false);
        logWriter.fine("LogWriter is created.");
      }

      // logWriter.info("Created log writer for IDS@"+System.identityHashCode(this));

      if (securityLogWriter == null) {
        // LOG: whole new LogWriterLogger instance for security
        securityLogWriter =
            LogWriterFactory.createLogWriterLogger(config, true);
        securityLogWriter.fine("SecurityLogWriter is created.");
      }

      loggingSession.startSession();

      clock = new DSClock(isLoner);

      if (attemptingToReconnect && logger.isDebugEnabled()) {
        logger.debug(
            "This thread is initializing a new DistributedSystem in order to reconnect to other members");
      }
      // Note we need loners to load the license in case they are a
      // cache server and will need to enforce the member limit
      if (Boolean.getBoolean(InternalLocator.FORCE_LOCATOR_DM_TYPE)) {
        locatorDMTypeForced = true;
      }

      initializeServices();
      InternalDataSerializer.initialize(config, services.values());

      // Initialize the Diffie-Hellman and public/private keys
      try {
        EncryptorImpl.initCertsMap(config.getSecurityProps());
        EncryptorImpl.initPrivateKey(config.getSecurityProps());
        EncryptorImpl.initDHKeys(config);
      } catch (Exception ex) {
        throw new GemFireSecurityException(
            "Problem in initializing keys for client authentication",
            ex);
      }

      final long offHeapMemorySize =
          OffHeapStorage.parseOffHeapMemorySize(getConfig().getOffHeapMemorySize());

      offHeapStore = OffHeapStorage.createOffHeapStorage(this, offHeapMemorySize, this);

      // Note: this can only happen on a linux system
      if (getConfig().getLockMemory()) {
        // This calculation is not exact, but seems fairly close. So far we have
        // not loaded much into the heap and the current RSS usage is already
        // included the available memory calculation.
        long avail = LinuxProcFsStatistics.getAvailableMemory(logger);
        long size = offHeapMemorySize + Runtime.getRuntime().totalMemory();
        if (avail < size) {
          if (ALLOW_MEMORY_LOCK_WHEN_OVERCOMMITTED) {
            logger.warn(
                "System memory appears to be over committed by {} bytes.  You may experience instability, performance issues, or terminated processes due to the Linux OOM killer.",
                size - avail);
          } else {
            throw new IllegalStateException(
                String.format(
                    "Insufficient free memory (%s) when attempting to lock %s bytes.  Either reduce the amount of heap or off-heap memory requested or free up additional system memory.  You may also specify -Dgemfire.Cache.ALLOW_MEMORY_OVERCOMMIT=true on the command-line to override the constraint check.",
                    avail, size));
          }
        }

        logger.info("Locking memory. This may take a while...");
        GemFireCacheImpl.lockMemory();
        logger.info("Finished locking memory.");
      }

      // First attempt to start a configured locator. At this point isConnected is still false,
      // so during a reconnect startInitLocator may run its quorum check and decline to start.
      try {
        startInitLocator();
      } catch (InterruptedException e) {
        throw new SystemConnectException("Startup has been interrupted", e);
      }

      synchronized (isConnectedMutex) {
        isConnected = true;
      }

      if (!isLoner) {
        try {
          if (quorumChecker != null) {
            quorumChecker.suspend();
          }
          dm = ClusterDistributionManager.create(this);
          // fix bug #46324
          if (InternalLocator.hasLocator()) {
            InternalLocator locator = InternalLocator.getLocator();
            getDistributionManager().addHostedLocators(getDistributedMember(),
                InternalLocator.getLocatorStrings(), locator.isSharedConfigurationEnabled());
          }
        } finally {
          // The quorum checker is resumed only when no distribution manager was created.
          if (dm == null && quorumChecker != null) {
            quorumChecker.resume();
          }
          // NOTE(review): setDisconnected() runs on both the success and the failure path;
          // isConnected is assigned true again below once the membership port has been read.
          setDisconnected();
        }
      } else {
        dm = new LonerDistributionManager(this, logWriter);
      }

      Assert.assertTrue(dm != null);
      Assert.assertTrue(dm.getSystem() == this);

      try {
        id = dm.getMembershipPort();
      } catch (DistributedSystemDisconnectedException e) {
        // bug #48144 - The dm's channel threw an NPE. It now throws this exception
        // but during startup we should instead throw a SystemConnectException
        throw new SystemConnectException(
            "Distributed system has disconnected during startup.",
            e);
      }

      synchronized (isConnectedMutex) {
        isConnected = true;
      }
      // Second attempt: when reconnecting and no locator has been recorded yet, try again now
      // that isConnected is true.
      if (attemptingToReconnect && (startedLocator == null)) {
        try {
          startInitLocator();
        } catch (InterruptedException e) {
          throw new SystemConnectException("Startup has been interrupted", e);
        }
      }
      try {
        endInitLocator();
      } catch (IOException e) {
        throw new GemFireIOException("Problem finishing a locator service start", e);
      }

      startSampler();

      alertingSession.createSession(new AlertMessaging(this));
      alertingSession.startSession();

      // Log any instantiators that were registered before the log writer
      // was created
      InternalInstantiator.logInstantiators();
    } catch (RuntimeException ex) {
      config.close();
      throw ex;
    }

    // Record whether this initialization was a reconnect, then clear the reconnect state.
    reconnected = attemptingToReconnect;
    attemptingToReconnect = false;
    reconnectAttemptCounter.set(0);
  }

  /**
   * Creates and starts the statistics sampler unless statistics are disabled. When the logging
   * session exposes a log file, the sampler is created with it; otherwise the sampler is created
   * without one.
   */
  private void startSampler() {
    if (!statsDisabled) {
      Optional<LogFile> sessionLogFile = loggingSession.getLogFile();
      if (sessionLogFile.isPresent()) {
        sampler = new GemFireStatSampler(this, sessionLogFile.get());
      } else {
        sampler = new GemFireStatSampler(this);
      }
      sampler.start();
    }
  }

  /**
   * Starts the locator named by the start-locator setting of the original configuration, if any,
   * and begins peer location on it. Nothing is done when that setting is empty.
   *
   * <p>
   * While reconnecting and not yet connected, a quorum check (if a quorum checker is available)
   * must pass first; otherwise the method returns without starting the locator. If peer location
   * cannot be started, the locator that was just created is stopped again.
   *
   * @throws InterruptedException declared by this method; propagated from the calls it makes
   * @throws GemFireIOException if an I/O error occurs while creating or starting the locator
   * @since GemFire 5.7
   */
  private void startInitLocator() throws InterruptedException {
    final String startLocatorSpec = originalConfig.getStartLocator();
    if (startLocatorSpec.isEmpty()) {
      return;
    }

    // when reconnecting we don't want to join with a colocated locator unless
    // there is a quorum of the old members available
    if (attemptingToReconnect && !isConnected && quorumChecker != null) {
      logger.info("performing a quorum check to see if location services can be started early");
      boolean haveQuorum = quorumChecker.checkForQuorum(3L * config.getMemberTimeout());
      if (!haveQuorum) {
        logger.info("quorum check failed - not allowing location services to start early");
        return;
      }
      logger.info("Quorum check passed - allowing location services to start early");
    }

    final DistributionLocatorId locatorId = new DistributionLocatorId(startLocatorSpec);
    try {
      startedLocator = InternalLocator.createLocator(
          locatorId.getPort(),
          NullLoggingSession.create(),
          null,
          logWriter,
          securityLogWriter,
          locatorId.getHost().getAddress(),
          locatorId.getHostnameForClients(),
          originalConfig.toProperties(),
          false);

      // if locator is started this way, cluster config is not enabled, set the flag correctly
      startedLocator.getConfig().setEnableClusterConfiguration(false);

      boolean peerLocationStarted = false;
      try {
        startedLocator.startPeerLocation();
        peerLocationStarted = true;
      } finally {
        if (!peerLocationStarted) {
          startedLocator.stop();
        }
      }
    } catch (IOException e) {
      throw new GemFireIOException("Problem starting a locator service", e);
    }
  }

  /**
   * Completes startup of the locator recorded in {@code startedLocator}, if there is one, by
   * starting server location and then ending the locator start. If either step fails, the
   * locator is stopped before the failure propagates.
   *
   * @throws IOException declared by this method; propagated from the locator calls
   * @since GemFire 5.7
   */
  private void endInitLocator() throws IOException {
    final InternalLocator locator = startedLocator;
    if (locator == null) {
      return;
    }

    boolean completed = false;
    try {
      locator.startServerLocation(this);
      locator.endStartLocator(this);
      completed = true;
    } finally {
      if (!completed) {
        locator.stop();
      }
    }
  }

  /**
   * Records the given locator as a dependent of this distributed system by storing it in
   * {@code startedLocator}.
   *
   * @param theLocator the locator to record
   */
  void setDependentLocator(InternalLocator theLocator) {
    this.startedLocator = theLocator;
  }

  /**
   * Installs the distribution manager of this system. Used by DistributionManager to fix bug
   * 33362.
   *
   * @param dm the distribution manager to store
   */
  void setDM(DistributionManager dm) {
    this.dm = dm;
  }

  /**
   * Checks whether or not this connection to a distributed system is closed.
   *
   * <p>
   * {@link #isConnected()} also reports {@code false} when no distribution manager has been
   * installed yet, so the root cause is only looked up when a distribution manager is present;
   * otherwise the exception is thrown without a cause.
   *
   * @throws DistributedSystemDisconnectedException This connection has been
   *         {@link #disconnect(boolean, String, boolean) disconnected}
   */
  private void checkConnected() {
    if (isConnected()) {
      return;
    }
    // Read the field once: it may be null here, and dereferencing it would replace the intended
    // DistributedSystemDisconnectedException with a NullPointerException.
    DistributionManager currentDm = dm;
    Throwable rootCause = currentDm == null ? null : currentDm.getRootCause();
    throw new DistributedSystemDisconnectedException(
        "This connection to a distributed system has been disconnected.",
        rootCause);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Reports {@code false} when there is no distribution manager, when the distribution manager's
   * cancel criterion reports a cancel in progress, or when a disconnect has been started;
   * otherwise reports the {@code isConnected} flag.
   */
  @Override
  public boolean isConnected() {
    return dm != null
        && !dm.getCancelCriterion().isCancelInProgress()
        && !isDisconnecting
        && isConnected;
  }

  /**
   * This method was introduced so we can deterministically query whether the distributed
   * system has fully disconnected or not. The {@link #isConnected()} method will return false if a
   * disconnection/cancellation is in progress, so it does not provide a reliable way to
   * query if the distributed system is fully disconnected or not.
   *
   * @return the negation of the {@code isConnected} flag
   */
  public boolean isDisconnected() {
    return !isConnected;
  }

  /**
   * Returns the statistics manager that the statistics factory methods of this class delegate to.
   */
  public StatisticsManager getStatisticsManager() {
    return statisticsManager;
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to {@link #createLongCounter(String, String, String)} with the same arguments.
   */
  @Override
  public StatisticDescriptor createIntCounter(String name,
      String description,
      String units) {
    return createLongCounter(name, description, units);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   */
  @Override
  public StatisticDescriptor createLongCounter(String name,
      String description,
      String units) {
    return statisticsManager.createLongCounter(name, description, units);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   */
  @Override
  public StatisticDescriptor createDoubleCounter(String name,
      String description,
      String units) {
    return statisticsManager.createDoubleCounter(name, description, units);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to {@link #createLongGauge(String, String, String)} with the same arguments.
   */
  @Override
  public StatisticDescriptor createIntGauge(String name,
      String description,
      String units) {
    return createLongGauge(name, description, units);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   */
  @Override
  public StatisticDescriptor createLongGauge(String name,
      String description,
      String units) {
    return statisticsManager.createLongGauge(name, description, units);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   */
  @Override
  public StatisticDescriptor createDoubleGauge(String name,
      String description,
      String units) {
    return statisticsManager.createDoubleGauge(name, description, units);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to {@link #createLongCounter(String, String, String, boolean)} with the same
   * arguments.
   */
  @Override
  public StatisticDescriptor createIntCounter(String name,
      String description,
      String units, boolean largerBetter) {
    return createLongCounter(name, description, units, largerBetter);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   */
  @Override
  public StatisticDescriptor createLongCounter(String name,
      String description,
      String units, boolean largerBetter) {
    return statisticsManager.createLongCounter(name, description, units, largerBetter);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   */
  @Override
  public StatisticDescriptor createDoubleCounter(String name,
      String description,
      String units,
      boolean largerBetter) {
    return statisticsManager.createDoubleCounter(name, description, units, largerBetter);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to {@link #createLongGauge(String, String, String, boolean)} with the same
   * arguments.
   */
  @Override
  public StatisticDescriptor createIntGauge(String name,
      String description,
      String units, boolean largerBetter) {
    return createLongGauge(name, description, units, largerBetter);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   */
  @Override
  public StatisticDescriptor createLongGauge(String name,
      String description,
      String units, boolean largerBetter) {
    return statisticsManager.createLongGauge(name, description, units, largerBetter);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   */
  @Override
  public StatisticDescriptor createDoubleGauge(String name,
      String description,
      String units, boolean largerBetter) {
    return statisticsManager.createDoubleGauge(name, description, units, largerBetter);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   */
  @Override
  public StatisticsType createType(String name, String description,
      StatisticDescriptor[] stats) {
    return statisticsManager.createType(name, description, stats);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   */
  @Override
  public StatisticsType findType(String name) {
    return statisticsManager.findType(name);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   *
   * @throws IOException declared for the delegated call
   */
  @Override
  public StatisticsType[] createTypesFromXml(Reader reader)
      throws IOException {
    return statisticsManager.createTypesFromXml(reader);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   */
  @Override
  public Statistics createStatistics(StatisticsType type) {
    return statisticsManager.createStatistics(type);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   */
  @Override
  public Statistics createStatistics(StatisticsType type,
      String textId) {
    return statisticsManager.createStatistics(type, textId);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   */
  @Override
  public Statistics createStatistics(StatisticsType type,
      String textId, long numericId) {
    return statisticsManager.createStatistics(type, textId, numericId);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   */
  @Override
  public Statistics createAtomicStatistics(StatisticsType type) {
    return statisticsManager.createAtomicStatistics(type);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   */
  @Override
  public Statistics createAtomicStatistics(StatisticsType type,
      String textId) {
    return statisticsManager.createAtomicStatistics(type, textId);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   */
  @Override
  public Statistics createAtomicStatistics(StatisticsType type,
      String textId, long numericId) {
    return statisticsManager.createAtomicStatistics(type, textId, numericId);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   */
  @Override
  public Statistics[] findStatisticsByType(StatisticsType type) {
    return statisticsManager.findStatisticsByType(type);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   */
  @Override
  public Statistics[] findStatisticsByTextId(String textId) {
    return statisticsManager.findStatisticsByTextId(textId);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the {@code statisticsManager} method of the same name.
   */
  @Override
  public Statistics[] findStatisticsByNumericId(long numericId) {
    return statisticsManager.findStatisticsByNumericId(numericId);
  }

  /**
   * {@inheritDoc}
   * <p>
   * The name is read from the configuration returned by {@link #getOriginalConfig()}.
   */
  @Override
  public String getName() {
    return getOriginalConfig().getName();
  }

  /**
   * {@inheritDoc}
   * <p>
   * Returns the value of the {@code id} field.
   */
  @Override
  public long getId() {
    return id;
  }

  /**
   * Returns the value of the {@code startTime} field, which {@link #toString()} renders as a
   * {@link Date}.
   */
  public long getStartTime() {
    return startTime;
  }

  /**
   * Cancel criterion that defers to the DM's cancel criterion. If we don't have a DM, we're dead.
   */
  protected class Stopper extends CancelCriterion {

    /**
     * Runs {@code checkFailure()} and then reports {@code "No dm"} when there is no distribution
     * manager, or otherwise whatever the distribution manager's cancel criterion reports.
     */
    @Override
    public String cancelInProgress() {
      checkFailure();
      return dm == null ? "No dm" : dm.getCancelCriterion().cancelInProgress();
    }

    /**
     * Produces a {@link DistributedSystemDisconnectedException} wrapping {@code e} when there is
     * no distribution manager, or otherwise the exception generated by the distribution manager's
     * cancel criterion.
     */
    @Override
    public RuntimeException generateCancelledException(Throwable e) {
      return dm == null
          ? new DistributedSystemDisconnectedException("no dm", e)
          : dm.getCancelCriterion().generateCancelledException(e);
    }
  }

  /**
   * Handles all cancellation queries for this distributed system; returned by
   * {@link #getCancelCriterion()}.
   */
  private final Stopper stopper = new Stopper();

  /**
   * {@inheritDoc}
   * <p>
   * Returns this system's {@link Stopper} instance.
   */
  @Override
  public CancelCriterion getCancelCriterion() {
    return stopper;
  }

  /**
   * Reports {@code true} when there is no distribution manager, when the distribution manager's
   * cancel criterion reports a cancel in progress, or when this system is no longer connected;
   * otherwise reports the {@code isDisconnecting} flag.
   */
  public boolean isDisconnecting() {
    return dm == null
        || dm.getCancelCriterion().isCancelInProgress()
        || !isConnected
        || isDisconnecting;
  }

  /**
   * {@inheritDoc}
   * <p>
   * Returns the {@code logWriter} field.
   */
  @Override
  public LogWriter getLogWriter() {
    return logWriter;
  }

  /**
   * Returns the {@code clock} field of this distributed system.
   */
  public DSClock getClock() {
    return clock;
  }

  /**
   * {@inheritDoc}
   * <p>
   * Returns the {@code securityLogWriter} field.
   */
  @Override
  public LogWriter getSecurityLogWriter() {
    return securityLogWriter;
  }

  /**
   * Returns the stat sampler. The {@code sampler} field is set to {@code null} by
   * {@link #disconnect(boolean, String, boolean)} after the sampler has been stopped, so this can
   * return {@code null}.
   */
  public GemFireStatSampler getStatSampler() {
    return sampler;
  }

  /**
   * Has this system started the disconnect process? Set to {@code true} by
   * {@link #disconnect(boolean, String, boolean)} and by {@link #emergencyClose()}.
   */
  protected volatile boolean isDisconnecting = false;

  /**
   * Disconnects this VM from the distributed system. Equivalent to calling
   * {@link #disconnect(boolean, String, boolean)} without preparing for a reconnect, with the
   * reason "normal disconnect", and with {@code keepAlive} set to {@code false}.
   */
  @Override
  public void disconnect() {
    disconnect(false, "normal disconnect", false);
  }

  /**
   * Disconnects this member from the distributed system when an internal error has caused
   * distribution to fail (e.g., this member was shunned). When the distribution manager's root
   * cause is a {@code ForcedDisconnectException} and this is not a reconnecting system, a
   * reconnect is attempted first; the plain disconnect only runs if that did not rejoin.
   *
   * @param reason a string describing why the disconnect is occurring
   * @param shunned whether this member was shunned by the membership coordinator
   */
  public void disconnect(String reason, boolean shunned) {
    final boolean forced = dm.getRootCause() instanceof ForcedDisconnectException;
    reconnected = false;

    boolean rejoined = false;
    if (forced && !isReconnectingDS) {
      forcedDisconnect = true;
      resetReconnectAttemptCounter();
      rejoined = tryReconnect(true, reason, GemFireCacheImpl.getInstance());
    }
    if (rejoined) {
      return;
    }
    disconnect(false, reason, shunned);
  }

  /**
   * This is how much time, in milliseconds, to allow a disconnect listener to run before we
   * interrupt it. Read from the system property {@code DistributionManager.DISCONNECT_WAIT};
   * defaults to 10 seconds.
   */
  private static final long MAX_DISCONNECT_WAIT =
      Long.getLong("DistributionManager.DISCONNECT_WAIT", 10 * 1000);

  /**
   * Runs a disconnect listener on its own thread and waits up to {@link #MAX_DISCONNECT_WAIT}
   * milliseconds for it. A listener that is still running after that is interrupted and given the
   * same amount of time again; if it still has not finished, a warning is logged and the method
   * returns anyway.
   *
   * @param dc the listener to run
   */
  private void runDisconnect(final DisconnectListener dc) {
    // The listener runs on a dedicated thread so that this thread can bound the wait
    final Thread listenerThread = new LoggingThread(dc.toString(), false, () -> {
      try {
        isDisconnectThread.set(Boolean.TRUE);
        dc.onDisconnect(InternalDistributedSystem.this);
      } catch (CancelException e) {
        if (logger.isDebugEnabled()) {
          logger.debug("Disconnect listener <{}> thwarted by cancellation: {}", dc, e,
              traceException(e));
        }
      }
    });

    try {
      listenerThread.start();
      listenerThread.join(MAX_DISCONNECT_WAIT);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      logger.warn("Interrupted while processing disconnect listener", e);
    }

    if (!listenerThread.isAlive()) {
      return;
    }

    // Make sure the listener gets the cue to die
    logger.warn("Disconnect listener still running: {}", dc);
    listenerThread.interrupt();

    try {
      listenerThread.join(MAX_DISCONNECT_WAIT);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }

    if (listenerThread.isAlive()) {
      logger.warn("Disconnect listener ignored its interrupt: {}", dc);
    }
  }

  /**
   * Returns {@code e} when trace logging is enabled and {@code null} otherwise, so that callers
   * can pass the result as the trailing argument of a log call.
   */
  private Exception traceException(CancelException e) {
    return logger.isTraceEnabled() ? e : null;
  }

  /**
   * Returns the current value held by {@code isDisconnectThread}.
   */
  // NOTE(review): isDisconnectThread is presumably a per-thread flag - confirm against its
  // declaration, which is outside this part of the file.
  public boolean isDisconnectThread() {
    return isDisconnectThread.get();
  }

  /**
   * Sets {@code isDisconnectThread} to {@code Boolean.TRUE}.
   */
  public void setIsDisconnectThread() {
    isDisconnectThread.set(Boolean.TRUE);
  }

  /**
   * Runs a disconnect listener directly on the calling thread, i.e. in the same thread sequence
   * as the reconnect. A {@link DistributedSystemDisconnectedException} thrown by the listener is
   * only logged at debug level.
   *
   * @param dc the listener to run
   */
  private void runDisconnectForReconnect(final DisconnectListener dc) {
    try {
      dc.onDisconnect(this);
    } catch (DistributedSystemDisconnectedException e) {
      if (!logger.isDebugEnabled()) {
        return;
      }
      logger.debug("Disconnect listener <{}> thwarted by shutdown: {}", dc, e, traceException(e));
    }
  }

  /**
   * Drains {@code disconnectListeners}, running each listener after it has been removed from the
   * collection. Listeners that are also {@link ShutdownListener}s are collected and returned so
   * that they can be shut down later.
   *
   * @param doReconnect whether a reconnect will be done; if so, listeners run on the calling
   *        thread, otherwise each runs on its own thread with a timeout
   * @return a collection of shutdownListeners
   */
  private HashSet<ShutdownListener> doDisconnects(boolean doReconnect) {
    // Make a pass over the disconnect listeners, asking them _politely_ to clean up.
    final HashSet<ShutdownListener> reaped = new HashSet<>();
    while (true) {
      final DisconnectListener next;
      synchronized (disconnectListeners) {
        final Iterator<DisconnectListener> remaining = disconnectListeners.iterator();
        if (!remaining.hasNext()) {
          return reaped;
        }
        next = remaining.next();
        if (next instanceof ShutdownListener) {
          reaped.add((ShutdownListener) next);
        }
        remaining.remove();
      }

      // the listener itself is invoked outside the lock
      if (doReconnect) {
        runDisconnectForReconnect(next);
      } else {
        runDisconnect(next);
      }
    }
  }

  /**
   * Process the shutdown listeners. It is essential that the DM has been shut down before calling
   * this step, to ensure that no new listeners are registering.
   * <p>
   * First the given listeners are shut down; then any disconnect listeners that were registered
   * in the meantime are drained, run, and (if they are shutdown listeners) shut down as well.
   * Failures of individual listeners are logged and do not stop the processing, except for
   * {@link VirtualMachineError}s, which are rethrown.
   *
   * @param shutdownListeners shutdown listeners initially registered with us; may be null, in
   *        which case nothing is done
   */
  private void doShutdownListeners(HashSet<ShutdownListener> shutdownListeners) {
    if (shutdownListeners == null) {
      return;
    }

    // Process any shutdown listeners we reaped during first pass
    for (ShutdownListener reaped : shutdownListeners) {
      try {
        reaped.onShutdown(this);
      } catch (VirtualMachineError fatal) {
        SystemFailure.initiateFailure(fatal);
        // If this ever returns, rethrow the error. We're poisoned
        // now, so don't let this thread continue.
        throw fatal;
      } catch (Throwable failure) {
        // Whenever you catch Error or Throwable, you must also
        // catch VirtualMachineError (see above). However, there is
        // _still_ a possibility that you are dealing with a cascading
        // error condition, so you also need to check to see if the JVM
        // is still usable:
        SystemFailure.checkFailure();
        // things could break since we continue, but we want to disconnect!
        logger.fatal(String.format("ShutdownListener < %s > threw...", reaped), failure);
      }
    }

    // During the window while we were running disconnect listeners, new
    // disconnect listeners may have appeared. After messagingDisabled is
    // set, no new ones will be created. However, we must process any
    // that appeared in the interim.
    while (true) {
      // Pluck next listener from the list
      final DisconnectListener late;
      synchronized (disconnectListeners) {
        final Iterator<DisconnectListener> remaining = disconnectListeners.iterator();
        if (!remaining.hasNext()) {
          break;
        }
        late = remaining.next();
        remaining.remove();
      }

      runDisconnect(late);

      // Run the shutdown, if any
      if (!(late instanceof ShutdownListener)) {
        continue;
      }
      try {
        ((ShutdownListener) late).onShutdown(this);
      } catch (VirtualMachineError fatal) {
        SystemFailure.initiateFailure(fatal);
        // If this ever returns, rethrow the error. We're poisoned
        // now, so don't let this thread continue.
        throw fatal;
      } catch (Throwable failure) {
        // Same cascading-failure check as for the reaped listeners above
        SystemFailure.checkFailure();
        // things could break since we continue, but we want to disconnect!
        logger.fatal("DisconnectListener/Shutdown threw...", failure);
      }
    }
  }

  /**
   * Breaks any potential circularity in {@link #loadEmergencyClasses()}: set to {@code true}
   * before the emergency classes are loaded so that a nested call returns immediately.
   */
  @MakeNotStatic
  private static volatile boolean emergencyClassesLoaded = false;

  /**
   * Ensure that the MembershipManager class gets loaded. Does nothing if the
   * {@code emergencyClassesLoaded} flag is already set.
   *
   * @see SystemFailure#loadEmergencyClasses()
   */
  public static void loadEmergencyClasses() {
    if (!emergencyClassesLoaded) {
      // flag first, so a nested call made while loading returns immediately
      emergencyClassesLoaded = true;
      GMSMembershipManager.loadEmergencyClasses();
    }
  }

  /**
   * Closes the membership manager, if there is one, and marks this system as disconnected and
   * disconnecting. The distribution manager itself is kept and its root cause is set to
   * {@link SystemFailure#getFailure()}; all disconnect listeners are discarded without being run.
   *
   * @see SystemFailure#emergencyClose()
   */
  public void emergencyClose() {
    if (dm != null) {
      MembershipManager mm = dm.getMembershipManager();
      if (mm != null) {
        mm.emergencyClose();
      }
    }

    // Garbage collection
    // Leave dm alone; its CancelCriterion will help people die
    isConnected = false;
    if (dm != null) {
      dm.setRootCause(SystemFailure.getFailure());
    }
    isDisconnecting = true;
    // listeners are dropped, not invoked
    disconnectListeners.clear();
  }

  /**
   * Clears the {@code isConnected} flag while holding {@code isConnectedMutex} and wakes every
   * thread waiting on that mutex (see {@link #waitDisconnected()}).
   */
  private void setDisconnected() {
    synchronized (isConnectedMutex) {
      isConnected = false;
      isConnectedMutex.notifyAll();
    }
  }

  /**
   * Blocks on {@code isConnectedMutex} until the {@code isConnected} flag has been cleared (see
   * {@link #setDisconnected()}). Interrupts do not end the wait: an interrupt is logged and the
   * thread's interrupt status is restored before the flag is checked again.
   */
  private void waitDisconnected() {
    synchronized (isConnectedMutex) {
      while (isConnected) {
        // clear (and remember) a pending interrupt so that wait() does not throw immediately
        boolean interrupted = Thread.interrupted();
        try {
          isConnectedMutex.wait();
        } catch (InterruptedException e) {
          interrupted = true;
          getLogWriter()
              .warning("Disconnect wait interrupted", e);
        } finally {
          // restore the interrupt status for the caller
          if (interrupted) {
            Thread.currentThread().interrupt();
          }
        }
      } // while
    }
  }

  /**
   * Disconnects this VM from the distributed system. Shuts down the distribution manager.
   * <p>
   * The steps, in order: clear the reconnect listeners and cancel any reconnect (unless preparing
   * for a reconnect); close the cache; mark this system as disconnecting; close the security
   * service; run the disconnect listeners; close the distribution manager and stop any locator
   * started by this system; run the shutdown listeners; close function statistics, the stat
   * sampler and the logging session; finally close the off-heap store, remove this system from
   * the existing systems, close the configuration and mark this system as disconnected.
   * <p>
   * If a disconnect is already in progress, this call waits for it to complete and returns.
   * Disconnect and shutdown listeners are not run when this is called on the shutdown hook
   * thread.
   *
   * @param preparingForReconnect true if called by a reconnect operation
   * @param reason the reason the disconnect is being performed
   * @param keepAlive true if user requested durable subscriptions are to be retained at server.
   */
  protected void disconnect(boolean preparingForReconnect, String reason, boolean keepAlive) {
    boolean isShutdownHook = (shutdownHook != null) && (Thread.currentThread() == shutdownHook);

    if (!preparingForReconnect) {
      // logger.info("disconnecting IDS@"+System.identityHashCode(this));
      synchronized (reconnectListeners) {
        reconnectListeners.clear();
      }
      cancelReconnect();
    }

    final boolean isDebugEnabled = logger.isDebugEnabled();
    try {
      HashSet<ShutdownListener> shutdownListeners = null;
      try {
        if (isDebugEnabled) {
          logger.debug("DistributedSystem.disconnect invoked on {}", this);
        }
        synchronized (GemFireCacheImpl.class) {
          // bug 36955, 37014: don't use a disconnect listener on the cache;
          // it takes too long.
          //
          // However, make sure cache is completely closed before starting
          // the distributed system close.
          InternalCache currentCache = getCache();
          if (currentCache != null && !currentCache.isClosed()) {
            isDisconnectThread.set(Boolean.TRUE); // bug #42663 - this must be set while
                                                  // closing the cache
            try {
              currentCache.close(reason, dm.getRootCause(), keepAlive, true); // fix for 42150
            } catch (VirtualMachineError e) {
              SystemFailure.initiateFailure(e);
              throw e;
            } catch (Throwable e) {
              SystemFailure.checkFailure();
              // Whenever you catch Error or Throwable, you must also
              // check for fatal JVM error (see above). A failure to close the
              // cache is only logged so that the disconnect can continue.
              logger.warn(
                  "Exception trying to close cache",
                  e);
            } finally {
              isDisconnectThread.set(Boolean.FALSE);
            }
          }

          // While still holding the lock, make sure this instance is
          // marked as shutting down
          synchronized (this) {
            if (isDisconnecting) {
              // It's already started, but don't return
              // to the caller until it has completed.
              waitDisconnected();
              return;
            } // isDisconnecting
            isDisconnecting = true;

            if (!preparingForReconnect) {
              // move cancelReconnect above this synchronized block fix for bug 35202
              if (reconnectDS != null) {
                // break recursion
                if (isDebugEnabled) {
                  logger.debug("disconnecting reconnected DS: {}", reconnectDS);
                }
                InternalDistributedSystem r = reconnectDS;
                reconnectDS = null;
                r.disconnect(false, null, false);
              }
            } // !reconnect
          } // synchronized (this)
        } // synchronized (GemFireCache.class)

        securityService.close();

        // disconnect listeners are skipped when running on the shutdown hook thread
        if (!isShutdownHook) {
          shutdownListeners = doDisconnects(attemptingToReconnect);
        }

        if (!attemptingToReconnect) {
          alertingSession.stopSession();
        }

      } finally { // be ABSOLUTELY CERTAIN that dm closed
        try {
          // Do the bulk of the close...
          dm.close();
          // we close the locator after the DM so that when split-brain detection
          // is enabled, loss of the locator doesn't cause the DM to croak
          if (startedLocator != null) {
            startedLocator.stop(forcedDisconnect, preparingForReconnect, false);
            startedLocator = null;
          }
        } finally { // timer canceled
          // bug 38501: this has to happen *after*
          // the DM is closed :-(
          if (!preparingForReconnect) {
            SystemTimer.cancelSwarm(this);
          }
        } // finally timer cancelled
      } // finally dm closed

      // shutdown listeners run only after the DM has been closed
      if (!isShutdownHook) {
        doShutdownListeners(shutdownListeners);
      }

      // closing the Aggregate stats
      if (functionServiceStats != null) {
        functionServiceStats.close();
      }
      // closing individual function stats
      for (FunctionStats functionstats : functionExecutionStatsMap.values()) {
        functionstats.close();
      }

      InternalFunctionService.unregisterAllFunctions();

      if (sampler != null) {
        sampler.stop();
        sampler = null;
      }

      if (!attemptingToReconnect) {
        loggingSession.stopSession();
      }

      EventID.unsetDS();

    } finally {
      // The remaining cleanup is nested in finally blocks so that each step still runs when an
      // earlier one throws
      try {
        if (getOffHeapStore() != null) {
          getOffHeapStore().close();
        }
      } finally {
        try {
          removeSystem(this);
          if (!attemptingToReconnect) {
            loggingSession.shutdown();
          }
          alertingSession.shutdown();
          // Close the config object
          config.close();
        } finally {
          // Finally, mark ourselves as disconnected
          setDisconnected();
          SystemFailure.stopThreads();
        }
      }
    }
  }

  /**
   * Returns the distribution manager for accessing this distributed system.
   *
   * @throws DistributedSystemDisconnectedException if {@link #isConnected()} reports that this
   *         system is not connected
   */
  public DistributionManager getDistributionManager() {
    checkConnected();
    return dm;
  }

  /**
   * Returns the distribution manager without checking whether this system is connected, so this
   * can also return null.
   */
  public DistributionManager getDM() {
    return dm;
  }

  /**
   * If this DistributedSystem is attempting to reconnect to the distributed system this will return
   * the quorum checker created by the old MembershipManager for checking to see if a quorum of old
   * members can be reached.
   *
   * @return the quorum checking service held in the {@code quorumChecker} field
   */
  QuorumChecker getQuorumChecker() {
    return quorumChecker;
  }

  /**
   * Returns true if this DS has been attempting to reconnect but the attempt has been cancelled.
   *
   * @return the value of the {@code reconnectCancelled} flag
   */
  public boolean isReconnectCancelled() {
    return reconnectCancelled;
  }

  /**
   * Returns whether or not this distributed system has the same configuration as the given set of
   * properties. The comparison is made between the original configuration of this system and a
   * configuration produced from {@code props}.
   *
   * @param props the properties to compare against
   * @param isConnected passed through to {@code DistributionConfigImpl.produce}
   * @see DistributedSystem#connect
   */
  public boolean sameAs(Properties props, boolean isConnected) {
    return originalConfig.sameAs(DistributionConfigImpl.produce(props, isConnected));
  }

  /**
   * Reports the calling thread's registration from
   * {@code ConnectionTable.getThreadOwnsResourcesRegistration()}. When the thread has no
   * registration, the default is the negation of {@code shareSockets}.
   */
  public boolean threadOwnsResources() {
    final Boolean registration = ConnectionTable.getThreadOwnsResourcesRegistration();
    // null means the thread does not have a preference, so return the default
    return registration == null ? !shareSockets : registration;
  }

  /**
   * Returns whether or not the given configuration properties refer to the same distributed system
   * as this <code>InternalDistributedSystem</code> connection. Two configurations refer to the
   * same system when their bind addresses are equal and their locator lists are equal, either
   * literally or after canonicalization.
   *
   * @param props the configuration properties to compare against
   * @since GemFire 4.0
   */
  public boolean sameSystemAs(Properties props) {
    final DistributionConfig theirs = DistributionConfigImpl.produce(props);
    final DistributionConfig mine = getConfig();

    if (!mine.getBindAddress().equals(theirs.getBindAddress())) {
      return false;
    }

    final String myLocators = mine.getLocators();
    final String theirLocators = theirs.getLocators();

    // quick check: identical strings need no canonicalization
    if (myLocators.equals(theirLocators)) {
      return true;
    }

    return canonicalizeLocators(myLocators).equals(canonicalizeLocators(theirLocators));
  }

  /**
   * Canonicalizes a locators string so that they may be compared. Each comma-separated locator
   * is rewritten as {@code address[port]}, using its bind address when one is given and its host
   * name otherwise; the rewritten entries are then de-duplicated, sorted and joined with commas.
   *
   * @param locators a comma-separated list of locators
   * @return the canonical form; the empty string when {@code locators} contains no entries
   * @since GemFire 4.0
   */
  private static String canonicalizeLocators(String locators) {
    SortedSet<String> sorted = new TreeSet<>();
    StringTokenizer st = new StringTokenizer(locators, ",");
    while (st.hasMoreTokens()) {
      DistributionLocatorId locId = new DistributionLocatorId(st.nextToken());
      StringBuilder canonical = new StringBuilder();
      String addr = locId.getBindAddress();
      if (addr != null && !addr.trim().isEmpty()) {
        canonical.append(addr);
      } else {
        canonical.append(locId.getHostName());
      }
      canonical.append("[").append(locId.getPort()).append("]");
      sorted.add(canonical.toString());
    }

    // The sorted set iterates in natural order, which is the order the entries are joined in
    return String.join(",", sorted);
  }

  /**
   * Returns the current configuration of this distributed system. For the configuration this
   * system was created with, see {@link #getOriginalConfig()}.
   */
  public DistributionConfig getConfig() {
    return config;
  }

  /**
   * Returns the {@code alertingService} field of this distributed system.
   */
  public AlertingService getAlertingService() {
    return alertingService;
  }

  /**
   * {@inheritDoc}
   * <p>
   * Returns the current configuration object (the same object as {@link #getConfig()}).
   */
  @Override
  public LogConfig getLogConfig() {
    return config;
  }

  /**
   * {@inheritDoc}
   * <p>
   * Returns the current configuration object (the same object as {@link #getConfig()}).
   */
  @Override
  public StatisticsConfig getStatisticsConfig() {
    return config;
  }

  /**
   * {@inheritDoc}
   * <p>
   * Adds the listener to {@code logConfigListeners}; it is then notified by
   * {@link #logConfigChanged()}.
   */
  @Override
  public void addLogConfigListener(LogConfigListener logConfigListener) {
    logConfigListeners.add(logConfigListener);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Removes the listener from {@code logConfigListeners}.
   */
  @Override
  public void removeLogConfigListener(LogConfigListener logConfigListener) {
    logConfigListeners.remove(logConfigListener);
  }

  /**
   * Returns the log file reported by the logging session of this distributed system.
   */
  public Optional<LogFile> getLogFile() {
    return loggingSession.getLogFile();
  }

  /**
   * Calls {@code configChanged()} on every listener in {@code logConfigListeners}.
   */
  void logConfigChanged() {
    for (LogConfigListener listener : logConfigListeners) {
      listener.configChanged();
    }
  }

  /**
   * Returns the string value of the distribution manager's id.
   */
  // NOTE(review): dm is dereferenced without a null check here, unlike isConnected() - confirm
  // that callers only use this once a distribution manager has been set.
  @Override
  public String getMemberId() {
    return String.valueOf(dm.getId());
  }

  /**
   * {@inheritDoc}
   * <p>
   * Returns the distribution manager's id.
   */
  @Override
  public InternalDistributedMember getDistributedMember() {
    return dm.getId();
  }

  /**
   * {@inheritDoc}
   * <p>
   * Returns the distribution manager's set of all other members, cast (unchecked) to a set of
   * {@link DistributedMember}.
   */
  @Override
  @SuppressWarnings("unchecked")
  public Set<DistributedMember> getAllOtherMembers() {
    return (Set) dm.getAllOtherMembers();
  }

  /**
   * {@inheritDoc}
   * <p>
   * Forwards to the distribution manager's method of the same name.
   */
  @Override
  public Set<DistributedMember> getGroupMembers(String group) {
    return dm.getGroupMembers(group);
  }


  /**
   * {@inheritDoc}
   * <p>
   * Searches the distribution manager's ids (including admin members) and collects every member
   * whose own address equals {@code address} or whose equivalent addresses, as reported by the
   * distribution manager, contain it.
   */
  @Override
  public Set<DistributedMember> findDistributedMembers(InetAddress address) {
    final Set<DistributedMember> found = new HashSet<>(2);

    for (InternalDistributedMember candidate : dm.getDistributionManagerIdsIncludingAdmin()) {
      final Set<InetAddress> equivalents = dm.getEquivalents(candidate.getInetAddress());
      // The passed in address matches if it is the member's own address or one of its
      // equivalents.
      final boolean sameHost =
          address.equals(candidate.getInetAddress()) || equivalents.contains(address);
      if (sameHost) {
        found.add(candidate);
      }
    }

    return found;
  }

  /**
   * {@inheritDoc}
   * <p>
   * Returns the first of the distribution manager's ids (including admin members) whose name
   * equals {@code name}, or {@code null} when there is none.
   */
  @Override
  public DistributedMember findDistributedMember(String name) {
    DistributedMember match = null;
    for (DistributedMember candidate : dm.getDistributionManagerIdsIncludingAdmin()) {
      if (candidate.getName().equals(name)) {
        match = candidate;
        break;
      }
    }
    return match;
  }

  /**
   * Returns the configuration this distributed system was created with. For the current
   * configuration, see {@link #getConfig()}.
   */
  public DistributionConfig getOriginalConfig() {
    return originalConfig;
  }

  /////////////////////// Utility Methods ///////////////////////

  /**
   * Since {@link DistributedSystem#connect} guarantees that there is a canonical instance of
   * <code>DistributedSystem</code> for each configuration, we can use the default implementation of
   * <code>equals</code>.
   *
   * @return the result of the superclass implementation
   * @see #sameAs
   */
  @Override
  public boolean equals(Object o) {
    return super.equals(o);
  }

  /**
   * Since we use the default implementation of {@link #equals equals}, we can use the default
   * implementation of <code>hashCode</code>.
   *
   * @return the result of the superclass implementation
   */
  @Override
  public int hashCode() {
    return super.hashCode();
  }

  /**
   * Returns a string describing this connection to distributed system (including highlights of its
   * configuration): the name (if not empty), the identity hash code, the multicast port or the
   * locators, the log destination, the start time and, when not connected, a "(closed)" marker.
   */
  @Override
  public String toString() {
    final StringBuilder text = new StringBuilder();
    text.append("Connected ");

    final String systemName = getName();
    if (systemName != null && !systemName.isEmpty()) {
      text.append("\"").append(systemName).append("\" ");
    }
    text.append("(id=").append(Integer.toHexString(System.identityHashCode(this))).append(") ");

    text.append("to distributed system using ");
    final int mcastPort = config.getMcastPort();
    if (mcastPort == 0) {
      text.append("locators \"").append(config.getLocators()).append("\" ");
    } else {
      text.append("multicast port ").append(mcastPort).append(" ");
    }

    final File logFile = config.getLogFile();
    text.append("logging to ");
    final boolean logsToFile = logFile != null && !logFile.equals(new File(""));
    if (logsToFile) {
      text.append(logFile).append(" ");
    } else {
      text.append("standard out ");
    }

    text.append(" started at ").append((new Date(startTime)).toString());

    if (!isConnected()) {
      text.append(" (closed)");
    }

    return text.toString().trim();
  }

  // There can be a large number of function execution stats, so they are kept in a map, keyed by
  // text id, to make them immediately accessible.
  private final ConcurrentHashMap<String, FunctionStats> functionExecutionStatsMap =
      new ConcurrentHashMap<>();
  // Aggregate function service stats; created on first use by getFunctionServiceStats()
  private FunctionServiceStats functionServiceStats = null;

  /**
   * Returns the function stats registered under the given text id, creating and registering them
   * on first use. When {@code statsDisabled} is set, {@code FunctionStats.dummy} is returned
   * instead and nothing is registered.
   *
   * @param textId the key under which the stats are kept
   */
  public FunctionStats getFunctionStats(String textId) {
    if (statsDisabled) {
      return FunctionStats.dummy;
    }
    return JavaWorkarounds.computeIfAbsent(functionExecutionStatsMap, textId,
        key -> new FunctionStats(this, key));
  }


  /**
   * Returns the aggregate function service stats, creating them with the text id
   * "FunctionExecution" on the first call. The method is synchronized on this instance.
   */
  public synchronized FunctionServiceStats getFunctionServiceStats() {
    if (functionServiceStats == null) {
      functionServiceStats = new FunctionServiceStats(this, "FunctionExecution");
    }
    return functionServiceStats;
  }


  /**
   * Makes note of a <code>ConnectListener</code> whose <code>onConnect</code> method will be
   * invoked when a connection is created to a distributed system.
   * <p>
   * The listener is added while holding both {@code existingSystemsLock} and the
   * {@code connectListeners} monitor.
   *
   * @param listener the listener to register
   * @return the {@code existingSystems} list itself (not a copy) holding the currently existing
   *         system connections
   */
  public static List addConnectListener(ConnectListener listener) {
    synchronized (existingSystemsLock) {
      synchronized (connectListeners) {
        connectListeners.add(listener);
        return existingSystems;
      }
    }
  }

  /**
   * Removes a <code>ConnectListener</code> previously registered with
   * {@link #addConnectListener(ConnectListener)}, using the same lock order as registration.
   *
   * @param listener the listener to remove
   */
  public static void removeConnectListener(ConnectListener listener) {
    synchronized (existingSystemsLock) {
      synchronized (connectListeners) {
        connectListeners.remove(listener);
      }
    }
  }

  /**
   * Makes note of a <code>ReconnectListener</code> whose <code>onReconnect</code> method will be
   * invoked when a connection is recreated to a distributed system during auto-reconnect.
   * <p>
   * The ReconnectListener set is cleared after a disconnect.
   *
   * @param listener the listener to register
   */
  public static void addReconnectListener(ReconnectListener listener) {
    synchronized (existingSystemsLock) {
      synchronized (reconnectListeners) {
        reconnectListeners.add(listener);
      }
    }
  }

  /**
   * Notifies all registered <code>ConnectListener</code>s that a connection to a distributed system
   * has been created. The listeners are invoked while holding the {@code connectListeners}
   * monitor. A listener failure is logged at severe level and does not prevent the remaining
   * listeners from being notified, except for a {@link VirtualMachineError}, which is rethrown.
   *
   * @param sys the newly connected system, passed to each listener
   */
  private static void notifyConnectListeners(InternalDistributedSystem sys) {
    synchronized (connectListeners) {
      for (ConnectListener registered : connectListeners) {
        try {
          registered.onConnect(sys);
        } catch (VirtualMachineError fatal) {
          SystemFailure.initiateFailure(fatal);
          // If this ever returns, rethrow the error. We're poisoned
          // now, so don't let this thread continue.
          throw fatal;
        } catch (Throwable failure) {
          // Whenever you catch Error or Throwable, you must also
          // catch VirtualMachineError (see above). However, there is
          // _still_ a possibility that you are dealing with a cascading
          // error condition, so you also need to check to see if the JVM
          // is still usable:
          SystemFailure.checkFailure();
          sys.getLogWriter().severe("ConnectListener threw...", failure);
        }
      }
    }
  }

  /**
   * Notifies all registered <code>ReconnectListener</code>s about an auto-reconnect. Listeners are
   * copied while holding the listener lock and invoked after it has been released.
   *
   * @param oldsys the distributed system that is being replaced
   * @param newsys the recreated distributed system; only passed to listeners when
   *        <code>starting</code> is false
   * @param starting true to invoke <code>reconnecting</code> (an attempt is beginning), false to
   *        invoke <code>onReconnect</code> (the reconnect has completed)
   */
  private static void notifyReconnectListeners(InternalDistributedSystem oldsys,
      InternalDistributedSystem newsys, boolean starting) {
    List<ReconnectListener> listeners;
    synchronized (reconnectListeners) {
      listeners = new ArrayList<>(reconnectListeners);
    }
    for (ReconnectListener listener : listeners) {
      try {
        if (starting) {
          listener.reconnecting(oldsys);
        } else {
          listener.onReconnect(oldsys, newsys);
        }
      } catch (VirtualMachineError err) {
        SystemFailure.initiateFailure(err);
        // If this ever returns, rethrow the error. We're poisoned
        // now, so don't let this thread continue.
        throw err;
      } catch (Throwable t) {
        // Whenever you catch Error or Throwable, you must also
        // catch VirtualMachineError (see above). However, there is
        // _still_ a possibility that you are dealing with a cascading
        // error condition, so you also need to check to see if the JVM
        // is still usable:
        SystemFailure.checkFailure();
        logger.fatal("ReconnectListener threw...", t);
      }
    }
  }

  /**
   * Delivers a resource event to every resource event listener. Failures raised by a listener are
   * logged and do not stop delivery to the remaining listeners, with two exceptions: a
   * <code>GemFireSecurityException</code> or <code>ManagementException</code> raised for
   * <code>ResourceEvent.CACHE_CREATE</code> is rethrown, and a <code>VirtualMachineError</code> is
   * always rethrown.
   *
   * @param event Enumeration depicting particular resource event
   * @param resource the actual resource object.
   */
  private void notifyResourceEventListeners(ResourceEvent event, Object resource) {
    for (ResourceEventsListener resourceListener : resourceListeners) {
      try {
        resourceListener.handleEvent(event, resource);
      } catch (CancelException cancelled) {
        // cancellation is not an error here; note it and move on to the next listener
        logger.info("Skipping notifyResourceEventListeners for {} due to cancellation", event);
      } catch (GemFireSecurityException | ManagementException rejected) {
        if (event != ResourceEvent.CACHE_CREATE) {
          logger.warn(rejected.getMessage(), rejected);
        } else {
          throw rejected;
        }
      } catch (Exception failure) {
        logger.warn(failure.getMessage(), failure);
      } catch (VirtualMachineError vmError) {
        SystemFailure.initiateFailure(vmError);
        throw vmError;
      } catch (Throwable unexpected) {
        SystemFailure.checkFailure();
        logger.warn(unexpected.getMessage(), unexpected);
      }
    }
  }

  /**
   * Registers a <code>DisconnectListener</code> whose <code>onDisconnect</code> method will be
   * invoked when this connection to the distributed system is disconnected.
   *
   * @param listener the listener to register
   * @throws DistributedSystemDisconnectedException if a cancellation is already in progress and
   *         the caller is not the disconnect thread; the listener is not left registered
   */
  public void addDisconnectListener(DisconnectListener listener) {
    synchronized (disconnectListeners) {
      disconnectListeners.add(listener);

      if (isDisconnectThread.get()) {
        return;
      }

      // Don't add disconnect listener after messaging has been disabled.
      // The test is made _after_ adding the listener to narrow the window.
      // It's possible to miss it still and never invoke the listener, but
      // other shutdown conditions will presumably get flagged.
      final String cancelReason = stopper.cancelInProgress();
      if (cancelReason == null) {
        return;
      }

      disconnectListeners.remove(listener); // don't leave in the list!
      throw new DistributedSystemDisconnectedException(
          String.format("No listeners permitted after shutdown: %s", cancelReason),
          dm.getRootCause());
    }
  }

  /**
   * Removes a <code>DisconnectListener</code> from the list of listeners that will be notified when
   * this connection to the distributed system is disconnected.
   *
   * @param listener the listener to remove
   */
  public void removeDisconnectListener(DisconnectListener listener) {
    synchronized (disconnectListeners) {
      disconnectListeners.remove(listener);
    }
  }

  /**
   * Returns the first entry of the existing-systems list, or <code>null</code> when that list is
   * empty.
   *
   * @return an existing <code>InternalDistributedSystem</code>, or <code>null</code> if none
   */
  public static InternalDistributedSystem getAnyInstance() {
    // read the field once so the emptiness check and the lookup use the same list object
    final List systems = existingSystems;
    return systems.isEmpty() ? null : (InternalDistributedSystem) systems.get(0);
  }

  /**
   * Test hook
   *
   * @return the list of existing systems (the field itself, not a copy)
   */
  public static List getExistingSystems() {
    return existingSystems;
  }

  /**
   * Returns the configuration of this distributed system as <code>Properties</code>.
   */
  @Override
  public Properties getProperties() {
    final Properties configAsProperties = config.toProperties();
    return configAsProperties;
  }

  /**
   * Returns the security properties held by the configuration of this distributed system.
   */
  @Override
  public Properties getSecurityProperties() {
    final Properties securityProperties = config.getSecurityProps();
    return securityProperties;
  }

  /**
   * Shutdown hook that ensures we are disconnected if an application VM shuts down without first
   * calling disconnect itself. Remains <code>null</code> when the hook is disabled through the
   * <code>DISABLE_SHUTDOWN_HOOK_PROPERTY</code> system property.
   */
  @Immutable
  public static final Thread shutdownHook;

  static {
    // Build and register the hook; the final field is assigned exactly once in the finally block
    // so that it is set even if registration fails.
    Thread hook = null;
    try {
      // Added for bug 38407
      final boolean hookDisabled = Boolean.getBoolean(DISABLE_SHUTDOWN_HOOK_PROPERTY);
      if (!hookDisabled) {
        hook = new LoggingThread(SHUTDOWN_HOOK_NAME, false, () -> {
          InternalDistributedSystem system = InternalDistributedSystem.getAnyInstance();
          setThreadsSocketPolicy(true /* conserve sockets */);
          if (system == null || !system.isConnected()) {
            return;
          }
          logger.info("VM is exiting - shutting down distributed system");
          DurableClientAttributes durableAttributes =
              system.getDistributedMember().getDurableClientAttributes();

          // keep-alive is driven by whether a non-empty durable client id is present
          boolean isDurableClient = durableAttributes != null
              && durableAttributes.getId() != null
              && !durableAttributes.getId().isEmpty();

          // An unclean shutdown through the membership manager was the approach wanted for 5.7,
          // but it caused shutdown issues in PR/dlock (see bug 39287), so a normal disconnect
          // is performed instead.
          system.disconnect(false, "normal disconnect", isDurableClient);
        });
        Runtime.getRuntime().addShutdownHook(hook);
      }
    } finally {
      shutdownHook = hook;
    }
  }
  /////////////////////// Inner Classes ///////////////////////

  /**
   * A listener that gets invoked before this connection to the distributed system is disconnected.
   */
  public interface DisconnectListener {

    /**
     * Invoked before a connection to the distributed system is disconnected.
     *
     * @param sys the system we are disconnecting from
     */
    void onDisconnect(InternalDistributedSystem sys);

  }

  /**
   * A listener that gets invoked before and after a successful auto-reconnect.
   */
  public interface ReconnectListener {

    /**
     * Invoked when a reconnect attempt is initiated.
     *
     * @param oldSystem the old DS, which is in a partially disconnected state and cannot be used
     *        for messaging
     */
    void reconnecting(InternalDistributedSystem oldSystem);

    /**
     * Invoked after a reconnect to the distributed system.
     *
     * @param oldSystem the old DS
     * @param newSystem the new DS
     */
    void onReconnect(InternalDistributedSystem oldSystem, InternalDistributedSystem newSystem);
  }

  /**
   * A listener that gets invoked after this connection to the distributed system is disconnected.
   * Because it extends {@link DisconnectListener}, implementations also receive
   * <code>onDisconnect</code>.
   */
  public interface ShutdownListener extends DisconnectListener {

    /**
     * Invoked after the connection to the distributed system has been disconnected.
     *
     * @param sys the system that has been disconnected
     */
    void onShutdown(InternalDistributedSystem sys);
  }

  /**
   * Number of reconnect attempts that have been made so far. It is reset to zero once a
   * reconnected cache reports no missing reliable regions.
   */
  @MakeNotStatic
  private static final AtomicInteger reconnectAttemptCounter = new AtomicInteger();

  /**
   * Boolean indicating if DS needs to reconnect and reconnect is in progress.
   */
  private volatile boolean attemptingToReconnect = false;

  /**
   * Boolean indicating this DS joined through a reconnect attempt.
   */
  private volatile boolean reconnected = false;

  /**
   * If reconnect fails due to an exception it will be in this field.
   */
  private Exception reconnectException;

  /**
   * Boolean indicating that this member has been shunned by other members or a network partition
   * has occurred.
   */
  private volatile boolean forcedDisconnect = false;

  /**
   * Used to keep track of the DS created by doing a reconnect on this.
   */
  private volatile InternalDistributedSystem reconnectDS;
  /**
   * Was this distributed system started with FORCE_LOCATOR_DM_TYPE=true? We need to know when
   * reconnecting.
   */
  private boolean locatorDMTypeForced;


  /**
   * Tells whether a reconnect of the distributed system is under way. The answer is false when no
   * reconnect is being attempted or when reconnecting has been cancelled; otherwise it is true for
   * as long as no connected replacement system exists. A true result means that this instance of
   * the DS is disconnected and unusable.
   */
  @Override
  public boolean isReconnecting() {
    // the replacement system is sampled before the flags are examined
    final InternalDistributedSystem replacement = reconnectDS;
    if (!attemptingToReconnect || reconnectCancelled) {
      return false;
    }
    return replacement == null || !replacement.isConnected();
  }


  /**
   * Tells whether this instance was created for one of the connection attempts of a reconnect that
   * is in progress. Once the connection succeeds the state is cleared and this method returns
   * false.
   */
  boolean isReconnectingDS() {
    return this.isReconnectingDS;
  }

  /**
   * Returns the membership information held by the quorum checker of the old distributed system,
   * or <code>null</code> when there is no quorum checker. This is used to connect the new DM to
   * the distributed system through RemoteTransportConfig when isReconnectingDS returns true.
   */
  MembershipInformation oldDSMembershipInfo() {
    return quorumChecker == null ? null : quorumChecker.getMembershipInfo();
  }

  /**
   * Tells whether this DS joined the distributed system through a reconnect, i.e. after a forced
   * disconnect or loss of required-roles.
   */
  public boolean reconnected() {
    return this.reconnected;
  }

  /**
   * Tells whether this DS has been kicked out of the distributed system.
   */
  public boolean forcedDisconnect() {
    return this.forcedDisconnect;
  }

  /**
   * If true then this DS will never reconnect. While it is set, {@link #isReconnecting()} returns
   * false.
   */
  private volatile boolean reconnectCancelled = false;

  /**
   * Make sure this instance of DS never does a reconnect. Also if reconnect is in progress cancel
   * it by waking up the threads waiting on the reconnect lock.
   */
  private void cancelReconnect() {
    // Sample the state before raising the flag: isReconnecting() reports false as soon as
    // reconnectCancelled is set, which would make the notification below unreachable.
    final boolean wasReconnecting = isReconnecting();
    reconnectCancelled = true;
    if (wasReconnecting) {
      synchronized (reconnectLock) { // should the synchronized be first on this and
        // then on this.reconnectLock.
        reconnectLock.notifyAll();
      }
    }
  }

  /**
   * Monitor used to wait for and signal reconnect progress. This lock must be acquired *after*
   * locking any GemFireCache.
   */
  private final Object reconnectLock = new Object();

  /**
   * Attempts to reconnect to the distributed system after a forced disconnect or on role loss, if
   * configured to reconnect.
   *
   * @param forcedDisconnect true if the reconnect is triggered by a forced disconnect
   * @param reason the reason handed to the reconnect logic
   * @param oldCache cache that has apparently failed
   * @return true if a reconnected distributed system exists and is connected when this method
   *         finishes
   */
  public boolean tryReconnect(boolean forcedDisconnect, String reason, InternalCache oldCache) {
    final boolean debugEnabled = logger.isDebugEnabled();
    if (forcedDisconnect && isReconnectingDS) {
      return false;
    }
    synchronized (InternalCacheBuilder.class) {
      synchronized (GemFireCacheImpl.class) {
        // bug 39329: must lock reconnectLock *after* the cache
        synchronized (reconnectLock) {
          final boolean nothingMissing = !forcedDisconnect && !oldCache.isClosed()
              && oldCache.getCachePerfStats().getReliableRegionsMissing() == 0;
          if (nothingMissing) {
            if (debugEnabled) {
              logger.debug("tryReconnect: No required roles are missing.");
            }
            return false;
          }

          if (debugEnabled) {
            logger.debug("tryReconnect: forcedDisconnect={}", forcedDisconnect);
          }
          if (forcedDisconnect && config.getDisableAutoReconnect()) {
            if (debugEnabled) {
              logger.debug("tryReconnect: auto reconnect after forced disconnect is disabled");
            }
            return false;
          }

          reconnect(forcedDisconnect, reason);
          return reconnectDS != null && reconnectDS.isConnected();
        } // synchronized reconnectLock
      } // synchronized GemFireCacheImpl.class
    } // synchronized InternalCacheBuilder.class
  }


  /**
   * Returns the current value of the reconnect attempt counter. Test method used by DUnit.
   */
  public static int getReconnectAttemptCounter() {
    final int attempts = reconnectAttemptCounter.get();
    return attempts;
  }

  /**
   * A reconnect is tried when gemfire is configured to reconnect in case of a required role loss.
   * The reconnect will try reconnecting to the distributed system every max-time-out milliseconds
   * for max-number-of-tries configured in gemfire.properties file. It uses the cache.xml file to
   * initialize the cache and create regions.
   * <p>
   * This method waits on <code>reconnectLock</code> between attempts, so the caller must hold that
   * lock.
   *
   * @param forcedDisconnect true if reconnecting after a forced disconnect, in which case
   *        max-tries is ignored
   * @param reason the reason passed on when disconnecting the old distributed system
   */
  private void reconnect(boolean forcedDisconnect, String reason) {

    // Collect all the state for cache
    // Collect all the state for Regions
    // Close the cache,
    // loop trying to connect, waiting before each attempt
    //
    // If reconnecting for lost-roles the reconnected system's cache will decide
    // whether the reconnected system should stay up. After max-tries we will
    // give up.
    //
    // If reconnecting for forced-disconnect we ignore max-tries and keep attempting
    // to join the distributed system until successful

    attemptingToReconnect = true;
    InternalDistributedSystem ids = InternalDistributedSystem.getAnyInstance();
    if (ids == null) {
      ids = this;
    }

    // first save the current cache description. This is created by
    // the membership manager when forced-disconnect starts. If we're
    // reconnecting for lost roles then this will be null
    String cacheXML = null;
    List<CacheServerCreation> cacheServerCreation = null;
    Set<MeterRegistry> meterRegistries = null;

    InternalCache cache = GemFireCacheImpl.getInstance();
    if (cache != null) {
      cacheXML = cache.getCacheConfig().getCacheXMLDescription();
      cacheServerCreation = cache.getCacheConfig().getCacheServerCreation();
      meterRegistries = cache.getMeterSubregistries();
    }

    DistributionConfig oldConfig = ids.getConfig();
    Properties configProps = config.toProperties();
    configProps.putAll(config.toSecurityProperties());

    int timeOut = oldConfig.getMaxWaitTimeForReconnect();
    int memberTimeout = oldConfig.getMemberTimeout();
    // we need to make sure that a surviving member is able
    // to take over coordination before trying to auto-reconnect.
    // failure detection can take 4 member-timeout intervals
    // so we set that as a minimum. (suspect, check suspect, final check, send new view)
    final int intervalsAllowedForFailureDetection = 4;
    timeOut = Math.max(timeOut, memberTimeout * intervalsAllowedForFailureDetection);

    int maxTries = oldConfig.getMaxNumReconnectTries();

    final boolean isDebugEnabled = logger.isDebugEnabled();

    if (Thread.currentThread().getName().equals("DisconnectThread")) {
      if (isDebugEnabled) {
        logger.debug("changing thread name to ReconnectThread");
      }
      Thread.currentThread().setName("ReconnectThread");
    }

    // get the membership manager for quorum checks
    MembershipManager mbrMgr = dm.getMembershipManager();
    quorumChecker = mbrMgr.getQuorumChecker();
    if (logger.isDebugEnabled()) {
      if (quorumChecker == null) {
        logger.debug("No quorum checks will be performed during reconnect attempts");
      } else {
        logger.debug("Initialized quorum checking service: {}", quorumChecker);
      }
    }

    // LOG:CLEANUP: deal with reconnect and INHIBIT_DM_BANNER -- this should be ok now
    // Both system properties are forced to "true" for the duration of the reconnect when they
    // were unset; the finally block below restores their previous state.
    String appendToLogFile = System.getProperty(APPEND_TO_LOG_FILE);
    if (appendToLogFile == null) {
      System.setProperty(APPEND_TO_LOG_FILE, "true");
    }
    String inhibitBanner = System.getProperty(InternalLocator.INHIBIT_DM_BANNER);
    if (inhibitBanner == null) {
      System.setProperty(InternalLocator.INHIBIT_DM_BANNER, "true");
    }
    if (forcedDisconnect) {
      systemAttemptingReconnect = this;
    }
    try {
      while (reconnectDS == null || !reconnectDS.isConnected()) {
        if (isReconnectCancelled()) {
          break;
        }

        if (!forcedDisconnect) {
          if (isDebugEnabled) {
            logger.debug("Max number of tries : {} and max time out : {}", maxTries, timeOut);
          }
          if (reconnectAttemptCounter.get() >= maxTries) {
            if (isDebugEnabled) {
              logger.debug(
                  "Stopping the checkrequiredrole thread because reconnect : {} reached the max number of reconnect tries : {}",
                  reconnectAttemptCounter, maxTries);
            }
            InternalCache internalCache = dm.getCache();
            if (internalCache == null) {
              throw new CacheClosedException(
                  "Some required roles missing");
            } else {
              throw internalCache.getCacheClosedException(
                  "Some required roles missing");
            }
          }
        }

        reconnectAttemptCounter.getAndIncrement();

        if (isReconnectCancelled()) {
          return;
        }

        logger.info("Disconnecting old DistributedSystem to prepare for a reconnect attempt");

        try {
          disconnect(true, reason, false);
        } catch (Exception ee) {
          logger.warn("Exception disconnecting for reconnect", ee);
        }

        TypeRegistry.init();

        // pause before the attempt; a notification on reconnectLock ends the pause early
        try {
          reconnectLock.wait(timeOut);
        } catch (InterruptedException e) {
          logger.warn("Waiting thread for reconnect got interrupted.");
          Thread.currentThread().interrupt();
          return;
        }

        if (isReconnectCancelled()) {
          return;
        }


        logger.info(
            "Attempting to reconnect to the distributed system.  This is attempt #{}.",
            reconnectAttemptCounter);

        int saveNumberOfTries = reconnectAttemptCounter.get();
        try {
          // notify listeners of each attempt and then again after successful
          notifyReconnectListeners(this, reconnectDS, true);

          if (locatorDMTypeForced) {
            System.setProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE, "true");
          }

          configProps.put(DistributionConfig.DS_RECONNECTING_NAME, Boolean.TRUE);
          if (quorumChecker != null) {
            configProps.put(DistributionConfig.DS_QUORUM_CHECKER_NAME, quorumChecker);
          }

          InternalDistributedSystem newDS = null;
          if (isReconnectCancelled()) {
            return;
          }

          try {

            newDS = (InternalDistributedSystem) connect(configProps);

          } catch (CancelException e) {
            if (isReconnectCancelled()) {
              return;
            } else {
              throw e;
            }
          } finally {
            if (newDS == null && quorumChecker != null) {
              // make sure the quorum checker is listening for messages from former members
              quorumChecker.resume();
            }
          }

          if (reconnectCancelled) {
            newDS.disconnect();
            continue;
          }

          reconnectDS = newDS;
        } catch (SystemConnectException e) {
          logger.debug("Attempt to reconnect failed with SystemConnectException");

          if (e.getMessage().contains("Rejecting the attempt of a member using an older version")) {
            logger.warn("Exception occurred while trying to connect the system during reconnect",
                e);
            attemptingToReconnect = false;
            reconnectException = e;
            return;
          }
          logger.warn("Caught SystemConnectException in reconnect", e);
          continue;
        } catch (GemFireConfigException e) {
          logger.warn("Caught GemFireConfigException in reconnect", e);
          continue;
        } catch (Exception e) {
          logger.warn("Exception occurred while trying to connect the system during reconnect",
              e);
          attemptingToReconnect = false;
          reconnectException = e;
          return;
        } finally {
          if (locatorDMTypeForced) {
            System.getProperties().remove(InternalLocator.FORCE_LOCATOR_DM_TYPE);
          }
          reconnectAttemptCounter.set(saveNumberOfTries);
        }


        DistributionManager newDM = reconnectDS.getDistributionManager();
        if (newDM instanceof ClusterDistributionManager) {
          // Admin systems don't carry a cache, but for others we can now create
          // a cache
          if (newDM.getDMType() != ClusterDistributionManager.ADMIN_ONLY_DM_TYPE) {
            boolean retry;
            do {
              retry = false;
              try {
                InternalCacheBuilder cacheBuilder = new InternalCacheBuilder()
                    .setCacheXMLDescription(cacheXML);
                // meterRegistries is still null when no cache existed at the start of this
                // method; there is nothing to carry over in that case
                if (meterRegistries != null) {
                  for (MeterRegistry meterRegistry : meterRegistries) {
                    cacheBuilder.addMeterSubregistry(meterRegistry);
                  }
                }
                cache = cacheBuilder.create(reconnectDS);

                if (!cache.isClosed()) {
                  createAndStartCacheServers(cacheServerCreation, cache);
                  if (cache.getCachePerfStats().getReliableRegionsMissing() == 0) {
                    reconnectAttemptCounter.set(0);
                  }
                }

              } catch (GemFireConfigException e) {
                if (e.getCause() instanceof ClusterConfigurationNotAvailableException) {
                  retry = true;
                  logger.info("Reconnected to the cluster but the cluster configuration service "
                      + "isn't available - will retry creating the cache");
                  try {
                    Thread.sleep(5000);
                  } catch (InterruptedException e1) {
                    reconnectCancelled = true;
                    reconnectException = e;
                    break;
                  }
                }
              } catch (Exception e) {
                // We need to give up because we'll probably get the same exception in
                // the next attempt to build the cache.
                logger.warn(
                    "Exception occurred while trying to create the cache during reconnect.  Auto-reconnect is terminating.",
                    e);
                reconnectCancelled = true;
                reconnectException = e;
                break;
              }
            } while (retry);
          }
        }

        if (reconnectDS != null && reconnectDS.isConnected()) {
          // make sure the new DS and cache are stable before exiting this loop
          try {
            Thread.sleep(config.getMemberTimeout() * 3L);
          } catch (InterruptedException e) {
            logger.info("Reconnect thread has been interrupted - exiting");
            Thread.currentThread().interrupt();
            reconnectCancelled = true;
            reconnectException = e;
            return;
          }
        }
      } // while()

      if (isReconnectCancelled()) {
        if (reconnectDS != null) {
          reconnectDS.disconnect();
        }
      } else {
        reconnectDS.isReconnectingDS = false;
        if (reconnectDS.isConnected()) {
          notifyReconnectListeners(this, reconnectDS, false);
        }
      }

    } finally {
      systemAttemptingReconnect = null;
      attemptingToReconnect = false;
      // restore the system properties to the state they had before the reconnect
      if (appendToLogFile == null) {
        System.getProperties().remove(APPEND_TO_LOG_FILE);
      } else {
        System.setProperty(APPEND_TO_LOG_FILE, appendToLogFile);
      }
      if (inhibitBanner == null) {
        System.getProperties().remove(InternalLocator.INHIBIT_DM_BANNER);
      } else {
        System.setProperty(InternalLocator.INHIBIT_DM_BANNER, inhibitBanner);
      }
      dm.getMembershipManager().setReconnectCompleted(true);
      InternalDistributedSystem newds = reconnectDS;
      if (newds != null) {
        newds.getDM().getMembershipManager().setReconnectCompleted(true);
      }
      if (quorumChecker != null) {
        mbrMgr.releaseQuorumChecker(quorumChecker, reconnectDS);
      }
    }

    if (isReconnectCancelled()) {
      logger.debug("reconnect can no longer be done because of an explicit disconnect");
      if (reconnectDS != null) {
        reconnectDS.disconnect();
      }
      attemptingToReconnect = false;
    } else if (reconnectDS != null && reconnectDS.isConnected()) {
      logger.info("Reconnect completed.\nNew DistributedSystem is {}\nNew Cache is {}", reconnectDS,
          cache);
    }
  }


  /**
   * Recreates and starts cache servers after an auto-reconnect. When the cache has no cache server
   * and saved server descriptions are available, a server is added for each description. Every
   * server of the cache that is not running is then started.
   *
   * @param cacheServerCreation the saved cache server descriptions, may be null
   * @param cache the cache that owns the servers
   * @throws GemFireIOException if starting a server fails with an <code>IOException</code>
   */
  public void createAndStartCacheServers(List<CacheServerCreation> cacheServerCreation,
      InternalCache cache) {

    // if there used to be a cache server but now there isn't one we need
    // to recreate it.
    final boolean mustRecreate = cache.getCacheServers().isEmpty() && cacheServerCreation != null;
    if (mustRecreate) {
      for (CacheServerCreation savedServer : cacheServerCreation) {
        CacheServerImpl recreated = (CacheServerImpl) cache.addCacheServer();
        recreated.configureFrom(savedServer);
      }
    }

    // fetch the servers again so that the ones added above are included
    for (CacheServer cacheServer : cache.getCacheServers()) {
      try {
        if (!cacheServer.isRunning()) {
          cacheServer.start();
        }
      } catch (IOException startFailure) {
        throw new GemFireIOException(
            String.format("While starting cache server %s", cacheServer),
            startFailure);
      }
    }

  }

  /**
   * Validates that the configuration provided is the same as the configuration for this
   * InternalDistributedSystem. On a mismatch the exception message lists every attribute of the
   * existing configuration and marks those whose wanted value differs.
   *
   * @param propsToCheck the Properties instance to compare with the existing Properties
   * @param isConnected passed through to the configuration comparison
   *
   * @throws IllegalStateException when the configuration is not the same; the creation stack, if
   *         one was recorded, is attached as the cause
   */
  public void validateSameProperties(Properties propsToCheck, boolean isConnected) {
    if (sameAs(propsToCheck, isConnected)) {
      return;
    }

    StringBuilder sb = new StringBuilder();

    DistributionConfig wanted = DistributionConfigImpl.produce(propsToCheck);

    String[] validAttributeNames = originalConfig.getAttributeNames();
    for (String attName : validAttributeNames) {
      Object expectedAtt = wanted.getAttributeObject(attName);
      Object actualAtt = originalConfig.getAttributeObject(attName);

      // arrays have no useful toString(), so both sides are rendered element by element
      final boolean isArray = actualAtt.getClass().isArray();
      String expectedAttStr = isArray ? arrayToString(expectedAtt) : expectedAtt.toString();
      String actualAttStr = isArray ? arrayToString(actualAtt) : actualAtt.toString();

      sb.append("  ");
      sb.append(attName);
      sb.append("=\"");
      sb.append(actualAttStr);
      sb.append("\"");
      if (!expectedAttStr.equals(actualAttStr)) {
        // report the same rendering that was compared, not the raw object
        sb.append(" ***(wanted \"");
        sb.append(expectedAttStr);
        sb.append("\")***");
      }

      sb.append("\n");
    }

    final String message = String.format(
        "A connection to a distributed system already exists in this VM.  It has the following configuration:%s",
        sb.toString());
    if (creationStack == null) {
      throw new IllegalStateException(message);
    } else {
      throw new IllegalStateException(message, creationStack);
    }
  }

  /**
   * Renders an array as <code>[e0,e1,...]</code> using the string form of each element. Works for
   * object arrays and primitive arrays alike.
   *
   * @param obj the array to render
   * @return the rendered array; <code>"-not-array-object-"</code> if <code>obj</code> is
   *         <code>null</code> or not an array. <code>null</code> elements are rendered as
   *         <code>null</code>.
   */
  public static String arrayToString(Object obj) {
    if (obj == null || !obj.getClass().isArray()) {
      return "-not-array-object-";
    }
    StringBuilder buff = new StringBuilder("[");
    int arrayLength = Array.getLength(obj);
    for (int i = 0; i < arrayLength; i++) {
      if (i > 0) {
        buff.append(",");
      }
      // String.valueOf tolerates null elements, unlike calling toString() on them
      buff.append(String.valueOf(Array.get(obj, i)));
    }
    buff.append("]");

    return buff.toString();
  }

  /**
   * Returns the current value of the share-sockets flag of this distributed system.
   */
  public boolean isShareSockets() {
    return this.shareSockets;
  }

  /**
   * Sets the share-sockets flag of this distributed system.
   *
   * @param shareSockets the new value of the flag
   */
  public void setShareSockets(boolean shareSockets) {
    this.shareSockets = shareSockets;
  }

  /**
   * A listener that gets invoked whenever a connection is created to a distributed system.
   */
  public interface ConnectListener {

    /**
     * Invoked after a connection to the distributed system is created.
     *
     * @param sys the newly created connection
     */
    void onConnect(InternalDistributedSystem sys);
  }

  /**
   * Tells whether an alert listener exists for the given member at the WARNING alert level.
   *
   * @param member the member to check
   */
  public boolean hasAlertListenerFor(DistributedMember member) {
    final int warningSeverity = AlertLevel.WARNING.intLevel();
    return hasAlertListenerFor(member, warningSeverity);
  }

  /**
   * Tells whether the alerting service has an alert listener for the given member at the alert
   * level that corresponds to the given severity.
   *
   * @param member the member to check
   * @param severity the integer alert level
   */
  public boolean hasAlertListenerFor(DistributedMember member, int severity) {
    return this.alertingService.hasAlertListener(member, AlertLevel.find(severity));
  }

  /**
   * Delegates to <code>DistributedSystem.setEnableAdministrationOnly</code>; see
   * {@link org.apache.geode.admin.AdminDistributedSystemFactory}
   *
   * @param adminOnly the value passed on to <code>DistributedSystem</code>
   * @since GemFire 5.7
   */
  public static void setEnableAdministrationOnly(boolean adminOnly) {
    DistributedSystem.setEnableAdministrationOnly(adminOnly);
  }

  /**
   * Delegates to <code>DistributedSystem.setEnableAdministrationOnly</code>, exactly like
   * {@link #setEnableAdministrationOnly(boolean)}.
   *
   * @param adminOnly the value passed on to <code>DistributedSystem</code>
   */
  public static void setCommandLineAdmin(boolean adminOnly) {
    DistributedSystem.setEnableAdministrationOnly(adminOnly);
  }

  /**
   * Waits for an in-progress reconnect to finish.
   *
   * @param time the maximum time to wait; a negative value waits without a deadline and zero does
   *        not wait at all
   * @param units the unit of <code>time</code>
   * @return true if no reconnect is being attempted any more and a connected, reconnected system
   *         exists
   * @throws InterruptedException if the thread is interrupted while waiting
   * @throws DistributedSystemDisconnectedException if the reconnect attempts terminated with an
   *         exception
   */
  @Override
  public boolean waitUntilReconnected(long time, TimeUnit units) throws InterruptedException {
    final long sleepTime = 1000;
    final long endTime;
    if (time < 0) {
      endTime = Long.MAX_VALUE;
    } else {
      final long now = System.currentTimeMillis();
      final long timeoutMillis = TimeUnit.MILLISECONDS.convert(time, units);
      // saturate instead of overflowing into the past for very large timeouts
      endTime = timeoutMillis > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + timeoutMillis;
    }
    synchronized (reconnectLock) {
      while (isReconnecting()) {
        if (reconnectCancelled || time == 0) {
          break;
        }
        final long remaining = endTime - System.currentTimeMillis();
        if (remaining <= 0) {
          break;
        }
        // wait in slices so that the state is re-checked regularly, but never past the deadline
        reconnectLock.wait(Math.min(sleepTime, remaining));
      }

      if (reconnectException != null) {
        throw new DistributedSystemDisconnectedException(
            "Reconnect attempts terminated due to exception", reconnectException);
      }
      InternalDistributedSystem recon = reconnectDS;
      return !attemptingToReconnect && recon != null && recon.isConnected();
    }
  }

  /**
   * Returns the distributed system created by reconnecting this one, or <code>null</code> if there
   * is none.
   */
  @Override
  public DistributedSystem getReconnectedSystem() {
    return this.reconnectDS;
  }

  /**
   * Cancels reconnecting, wakes up the threads waiting on the reconnect lock and disconnects this
   * distributed system.
   */
  @Override
  public void stopReconnecting() {
    reconnectCancelled = true;
    synchronized (reconnectLock) {
      // several threads can wait on this lock (the reconnect thread and callers of
      // waitUntilReconnected), so wake all of them rather than an arbitrary one
      reconnectLock.notifyAll();
    }
    disconnect(false, "stopReconnecting was invoked", false);
    attemptingToReconnect = false;
  }

  /**
   * Cancels reconnecting and wakes up the threads waiting on the reconnect lock, without
   * disconnecting this distributed system.
   */
  public void stopReconnectingNoDisconnect() {
    reconnectCancelled = true;
    synchronized (reconnectLock) {
      // several threads can wait on this lock (the reconnect thread and callers of
      // waitUntilReconnected), so wake all of them rather than an arbitrary one
      reconnectLock.notifyAll();
    }
    attemptingToReconnect = false;
  }

  /**
   * Provides hook for dunit to generate and store a detailed creation stack trace that includes the
   * keys/values of DistributionConfig including security related attributes without introducing
   * Privacy Violations that Fortify will complain about.
   */
  public interface CreationStackGenerator {

    /**
     * Generates the creation stack for the given configuration.
     *
     * @param config the configuration of the distributed system being created
     * @return the generated creation stack
     */
    Throwable generateCreationStack(final DistributionConfig config);
  }

  /**
   * Hands the given cache to the distribution manager held in {@code dm}.
   *
   * <p>
   * NOTE(review): unlike {@link #getCache()}, this does not guard against {@code dm} being
   * {@code null} — confirm callers only invoke it once a distribution manager is set.
   *
   * @param instance the cache passed unchanged to {@code dm.setCache}
   */
  public void setCache(InternalCache instance) {
    dm.setCache(instance);
  }

  /**
   * Returns the cache known to the distribution manager.
   *
   * @return the result of {@code dm.getCache()}, or {@code null} when no distribution manager is
   *         set
   */
  public InternalCache getCache() {
    if (dm == null) {
      return null;
    }
    return dm.getCache();
  }

  /**
   * Creates the factory used when no other {@link StatisticsManagerFactory} is supplied. The
   * returned factory produces a {@link DummyStatisticsRegistry} when statistics are disabled and
   * a {@link StatisticsRegistry} otherwise.
   */
  private static StatisticsManagerFactory defaultStatisticsManagerFactory() {
    return (name, startTime, statsDisabled) -> statsDisabled
        ? new DummyStatisticsRegistry(name, startTime)
        : new StatisticsRegistry(name, startTime);
  }

  /**
   * Builder that creates and initializes an {@link InternalDistributedSystem} from a set of
   * configuration properties and an optional {@link SecurityConfig}.
   */
  public static class Builder {

    private final Properties configProperties;

    private SecurityConfig securityConfig;

    /**
     * @param configProperties properties used to create the connection configuration
     */
    public Builder(Properties configProperties) {
      this.configProperties = configProperties;
    }

    /**
     * Sets the security configuration to use when initializing the system.
     *
     * @param securityConfig the security configuration; when left {@code null}, {@link #build()}
     *        substitutes {@code new SecurityConfig(null, null)}
     * @return this builder
     */
    public Builder setSecurityConfig(SecurityConfig securityConfig) {
      this.securityConfig = securityConfig;
      return this;
    }

    /**
     * Builds and initializes new instance of InternalDistributedSystem.
     *
     * <p>
     * The SystemFailure threads are started before the system is constructed and are stopped
     * again if construction, initialization or listener notification does not complete.
     *
     * @return the initialized system, after connect listeners have been notified
     */
    public InternalDistributedSystem build() {
      if (securityConfig == null) {
        securityConfig = new SecurityConfig(null, null);
      }

      InternalDataSerializer.checkSerializationVersion();

      // flipped to true only once every step below has finished
      boolean completed = false;
      try {
        SystemFailure.startThreads();

        ConnectionConfigImpl connectionConfig = new ConnectionConfigImpl(configProperties);
        InternalDistributedSystem system =
            new InternalDistributedSystem(connectionConfig, defaultStatisticsManagerFactory());

        system.initialize(securityConfig.getSecurityManager(), securityConfig.getPostProcessor());
        notifyConnectListeners(system);

        completed = true;
        return system;
      } finally {
        if (!completed) {
          SystemFailure.stopThreads();
        }
      }
    }
  }

  /**
   * Test-only builder that constructs an {@link InternalDistributedSystem} without running its
   * initialization. The distribution manager and statistics manager factory can be replaced.
   */
  @VisibleForTesting
  public static class BuilderForTesting {

    private final Properties configProperties;

    private DistributionManager distributionManager;
    private StatisticsManagerFactory statisticsManagerFactory = defaultStatisticsManagerFactory();

    /**
     * @param configProperties properties used to create the connection configuration
     */
    public BuilderForTesting(Properties configProperties) {
      this.configProperties = configProperties;
    }

    /**
     * Sets the distribution manager assigned to the built system.
     *
     * @param distributionManager the manager to install; stays {@code null} if never set
     * @return this builder
     */
    public BuilderForTesting setDistributionManager(DistributionManager distributionManager) {
      this.distributionManager = distributionManager;
      return this;
    }

    /**
     * Replaces the default statistics manager factory.
     *
     * @param statisticsManagerFactory the factory passed to the system's constructor
     * @return this builder
     */
    public BuilderForTesting setStatisticsManagerFactory(
        StatisticsManagerFactory statisticsManagerFactory) {
      this.statisticsManagerFactory = statisticsManagerFactory;
      return this;
    }

    /**
     * Builds instance without initializing it for testing.
     *
     * <p>
     * The config, distribution manager and connected flag are assigned directly on the new
     * instance.
     *
     * @return the new, uninitialized system with {@code isConnected} set to {@code true}
     */
    public InternalDistributedSystem build() {
      InternalDistributedSystem system = new InternalDistributedSystem(
          new ConnectionConfigImpl(configProperties), statisticsManagerFactory);

      system.config = new RuntimeDistributionConfigImpl(system);
      system.dm = distributionManager;
      system.isConnected = true;

      return system;
    }
  }
}
