/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.distributed.internal;

import static org.apache.commons.lang3.exception.ExceptionUtils.getStackTrace;
import static org.apache.geode.distributed.ConfigurationProperties.BIND_ADDRESS;
import static org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FILE;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.internal.DistributionConfig.GEMFIRE_PREFIX;
import static org.apache.geode.internal.admin.remote.DistributionLocatorId.asDistributionLocatorIds;

import java.io.File;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.Logger;

import org.apache.geode.CancelException;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.client.internal.locator.ClientConnectionRequest;
import org.apache.geode.cache.client.internal.locator.ClientReplacementRequest;
import org.apache.geode.cache.client.internal.locator.GetAllServersRequest;
import org.apache.geode.cache.client.internal.locator.LocatorListRequest;
import org.apache.geode.cache.client.internal.locator.LocatorStatusRequest;
import org.apache.geode.cache.client.internal.locator.QueueConnectionRequest;
import org.apache.geode.cache.client.internal.locator.wan.LocatorMembershipListener;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.InternalDistributedSystem.ConnectListener;
import org.apache.geode.distributed.internal.membership.MemberFactory;
import org.apache.geode.distributed.internal.membership.QuorumChecker;
import org.apache.geode.distributed.internal.membership.gms.NetLocator;
import org.apache.geode.distributed.internal.membership.gms.locator.PeerLocatorRequest;
import org.apache.geode.distributed.internal.tcpserver.LocatorCancelException;
import org.apache.geode.distributed.internal.tcpserver.TcpClient;
import org.apache.geode.distributed.internal.tcpserver.TcpServer;
import org.apache.geode.internal.GemFireVersion;
import org.apache.geode.internal.admin.remote.DistributionLocatorId;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.HttpService;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalCacheBuilder;
import org.apache.geode.internal.cache.tier.sockets.TcpServerFactory;
import org.apache.geode.internal.cache.wan.WANServiceProvider;
import org.apache.geode.internal.config.JAXBService;
import org.apache.geode.internal.logging.InternalLogWriter;
import org.apache.geode.internal.logging.LogConfig;
import org.apache.geode.internal.logging.LogConfigListener;
import org.apache.geode.internal.logging.LogConfigSupplier;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LogWriterFactory;
import org.apache.geode.internal.logging.LoggingSession;
import org.apache.geode.internal.logging.LoggingThread;
import org.apache.geode.internal.logging.NullLoggingSession;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.internal.statistics.StatisticsConfig;
import org.apache.geode.management.api.ClusterManagementService;
import org.apache.geode.management.internal.AgentUtil;
import org.apache.geode.management.internal.JmxManagerLocator;
import org.apache.geode.management.internal.JmxManagerLocatorRequest;
import org.apache.geode.management.internal.api.LocatorClusterManagementService;
import org.apache.geode.management.internal.configuration.domain.SharedConfigurationStatus;
import org.apache.geode.management.internal.configuration.handlers.ClusterManagementServiceInfoRequestHandler;
import org.apache.geode.management.internal.configuration.handlers.SharedConfigurationStatusRequestHandler;
import org.apache.geode.management.internal.configuration.messages.ClusterManagementServiceInfoRequest;
import org.apache.geode.management.internal.configuration.messages.SharedConfigurationStatusRequest;
import org.apache.geode.management.internal.configuration.messages.SharedConfigurationStatusResponse;

/**
 * Provides the implementation of a distribution {@code Locator} as well as internal-only
 * functionality.
 *
 * <p>
 * This class has APIs that perform essentially three layers of services. At the bottom layer is the
 * JGroups location service. On top of that you can start a distributed system. And then on top of
 * that you can start server location services.
 *
 * <p>
 * Server Location Service DistributedSystem Peer Location Service
 *
 * <p>
 * The startLocator() methods provide a way to start all three services in one call. Otherwise, the
 * services can be started independently {@code  locator = createLocator();
 * locator.startPeerLocation(); locator.startDistributeSystem();}
 *
 * @since GemFire 4.0
 */
public class InternalLocator extends Locator implements ConnectListener, LogConfigSupplier {
  private static final Logger logger = LogService.getLogger();

  /**
   * system property name for forcing an locator distribution manager type
   */
  public static final String FORCE_LOCATOR_DM_TYPE = "Locator.forceLocatorDMType";

  /**
   * system property name for inhibiting DM banner
   */
  public static final String INHIBIT_DM_BANNER = "Locator.inhibitDMBanner";

  /**
   * system property name for forcing locators to be preferred as coordinators
   */
  public static final String LOCATORS_PREFERRED_AS_COORDINATORS =
      GEMFIRE_PREFIX + "disable-floating-coordinator";

  /**
   * the locator hosted by this JVM. As of 7.0 it is a singleton.
   *
   * GuardedBy must synchronize on locatorLock
   */
  @MakeNotStatic
  private static InternalLocator locator;

  private static final Object locatorLock = new Object();

  /**
   * The tcp server responding to locator requests
   */
  private final TcpServer server;

  /**
   * @since GemFire 5.7
   */
  private final PrimaryHandler handler;
  private final LocatorMembershipListener locatorListener;
  private final AtomicBoolean shutdownHandled = new AtomicBoolean(false);
  private final LoggingSession loggingSession;
  private final Set<LogConfigListener> logConfigListeners = new HashSet<>();
  private final LocatorStats locatorStats;
  private final Path workingDirectory;

  /**
   * whether the locator was stopped during forced-disconnect processing but a reconnect will occur
   */
  private volatile boolean stoppedForReconnect;
  private volatile boolean reconnected;

  /**
   * whether the locator was stopped during forced-disconnect processing
   */
  private volatile boolean forcedDisconnect;
  private volatile boolean isSharedConfigurationStarted;
  private volatile Thread restartThread;

  /**
   * The distributed system owned by this locator, if any. Note that if a ds already exists because
   * the locator is being colocated in a normal member this field will be null.
   */
  private InternalDistributedSystem internalDistributedSystem;

  /**
   * The cache owned by this locator, if any. Note that if a cache already exists because the
   * locator is being colocated in a normal member this field will be null.
   */
  private InternalCache internalCache;

  /**
   * product use logging
   */
  private ProductUseLog productUseLog;
  private boolean peerLocator;
  private ServerLocator serverLocator;
  private Properties env;

  /**
   * the TcpHandler used for peer location
   */
  private NetLocator netLocator;
  private DistributionConfigImpl distributionConfig;
  private WanLocatorDiscoverer locatorDiscoverer;
  private InternalConfigurationPersistenceService configurationPersistenceService;
  private LocatorClusterManagementService clusterManagementService;

  public static InternalLocator getLocator() {
    synchronized (locatorLock) {
      return locator;
    }
  }

  public static boolean hasLocator() {
    synchronized (locatorLock) {
      return locator != null;
    }
  }

  private static void removeLocator(InternalLocator locator) {
    if (locator == null) {
      return;
    }
    synchronized (locatorLock) {
      locator.loggingSession.stopSession();
      locator.loggingSession.shutdown();
      if (locator.equals(InternalLocator.locator)) {
        InternalLocator.locator = null;
      }
    }
  }

  /**
   * Create a locator that listens on a given port. This locator will not have peer or server
   * location services available until they are started by calling startServerLocation or
   * startPeerLocation on the locator object.
   *
   * @param port the tcp/ip port to listen on
   * @param loggingSession the LoggingSession to use, may be a NullLoggingSession which does
   *        nothing
   * @param logFile the file that log messages should be written to
   * @param logWriter a log writer that should be used (logFile parameter is ignored)
   * @param securityLogWriter the logWriter to be used for security related log messages
   * @param distributedSystemProperties optional properties to configure the distributed system
   *        (e.g., mcast addr/port, other locators)
   * @param startDistributedSystem if true then this locator will also start its own ds
   *
   * @deprecated Please use
   *             {@link #createLocator(int, LoggingSession, File, InternalLogWriter, InternalLogWriter, InetAddress, String, Properties, Path)}
   *             instead.
   */
  @Deprecated
  public static InternalLocator createLocator(int port, LoggingSession loggingSession, File logFile,
      InternalLogWriter logWriter, InternalLogWriter securityLogWriter, InetAddress bindAddress,
      String hostnameForClients, Properties distributedSystemProperties,
      boolean startDistributedSystem) {
    return createLocator(port, loggingSession, logFile, logWriter, securityLogWriter, bindAddress,
        hostnameForClients, distributedSystemProperties,
        Paths.get(System.getProperty("user.dir")));
  }

  /**
   * Create a locator that listens on a given port. This locator will not have peer or server
   * location services available until they are started by calling startServerLocation or
   * startPeerLocation on the locator object.
   *
   * @param port the tcp/ip port to listen on
   * @param loggingSession the LoggingSession to use, may be a NullLoggingSession which does
   *        nothing
   * @param logFile the file that log messages should be written to
   * @param logWriter a log writer that should be used (logFile parameter is ignored)
   * @param securityLogWriter the logWriter to be used for security related log messages
   * @param distributedSystemProperties optional properties to configure the distributed system
   *        (e.g., mcast addr/port, other locators)
   * @param workingDirectory the working directory to use for any files
   */
  public static InternalLocator createLocator(int port, LoggingSession loggingSession, File logFile,
      InternalLogWriter logWriter, InternalLogWriter securityLogWriter, InetAddress bindAddress,
      String hostnameForClients, Properties distributedSystemProperties, Path workingDirectory) {
    synchronized (locatorLock) {
      if (hasLocator()) {
        throw new IllegalStateException(
            "A locator can not be created because one already exists in this JVM.");
      }
      InternalLocator locator =
          new InternalLocator(port, loggingSession, logFile, logWriter, securityLogWriter,
              bindAddress, hostnameForClients, distributedSystemProperties, null,
              workingDirectory);
      InternalLocator.locator = locator;
      return locator;
    }
  }

  private static void setLocator(InternalLocator locator) {
    synchronized (locatorLock) {
      if (InternalLocator.locator != null && InternalLocator.locator != locator) {
        throw new IllegalStateException(
            "A locator can not be created because one already exists in this JVM.");
      }

      InternalLocator.locator = locator;
    }
  }

  /**
   * Creates a distribution locator that runs in this VM on the given port and bind address in the
   * default working directory (Java "user.dir" System Property).
   *
   * <p>
   * This is for internal use only as it does not create a distributed system unless told to do so.
   *
   * @param port the tcp/ip port to listen on
   * @param logFile the file that log messages should be written to
   * @param logWriter a log writer that should be used (logFile parameter is ignored)
   * @param securityLogWriter the logWriter to be used for security related log messages
   * @param startDistributedSystem if true, a distributed system is started
   * @param distributedSystemProperties optional properties to configure the distributed system
   *        (e.g., mcast
   *        addr/port, other locators)
   * @param hostnameForClients the name to give to clients for connecting to this locator
   */
  public static InternalLocator startLocator(int port, File logFile, InternalLogWriter logWriter,
      InternalLogWriter securityLogWriter, InetAddress bindAddress, boolean startDistributedSystem,
      Properties distributedSystemProperties, String hostnameForClients)
      throws IOException {
    return startLocator(port, logFile, logWriter, securityLogWriter, bindAddress,
        startDistributedSystem, distributedSystemProperties, hostnameForClients,
        Paths.get(System.getProperty("user.dir")));
  }

  /**
   * Creates a distribution locator that runs in this VM on the given port and bind address in the
   * specified working directory.
   *
   * <p>
   * This is for internal use only as it does not create a distributed system unless told to do so.
   *
   * @param port the tcp/ip port to listen on
   * @param logFile the file that log messages should be written to
   * @param logWriter a log writer that should be used (logFile parameter is ignored)
   * @param securityLogWriter the logWriter to be used for security related log messages
   * @param startDistributedSystem if true, a distributed system is started
   * @param distributedSystemProperties optional properties to configure the distributed system
   *        (e.g., mcast
   *        addr/port, other locators)
   * @param hostnameForClients the name to give to clients for connecting to this locator
   * @param workingDirectory the working directory to use for any files
   */
  public static InternalLocator startLocator(int port, File logFile, InternalLogWriter logWriter,
      InternalLogWriter securityLogWriter, InetAddress bindAddress, boolean startDistributedSystem,
      Properties distributedSystemProperties, String hostnameForClients, Path workingDirectory)
      throws IOException {
    System.setProperty(FORCE_LOCATOR_DM_TYPE, "true");
    InternalLocator newLocator = null;

    boolean startedLocator = false;
    try {

      // if startDistributedSystem is true then Locator uses a NullLoggingSession (does nothing)
      LoggingSession loggingSession =
          startDistributedSystem ? NullLoggingSession.create() : LoggingSession.create();

      newLocator = createLocator(port, loggingSession, logFile, logWriter, securityLogWriter,
          bindAddress, hostnameForClients, distributedSystemProperties,
          workingDirectory);

      loggingSession.createSession(newLocator);
      loggingSession.startSession();

      try {
        newLocator.startPeerLocation();

        if (startDistributedSystem) {

          try {
            newLocator.startDistributedSystem();
          } catch (RuntimeException e) {
            newLocator.stop();
            throw e;
          }

          InternalDistributedSystem system = newLocator.internalDistributedSystem;
          if (system != null) {
            system.getDistributionManager().addHostedLocators(system.getDistributedMember(),
                getLocatorStrings(), newLocator.isSharedConfigurationEnabled());
          }
        }
      } catch (LocatorCancelException e) {
        newLocator.stop();
        throw e;
      }

      InternalDistributedSystem system = InternalDistributedSystem.getConnectedInstance();
      if (system != null) {
        try {
          newLocator.startServerLocation(system);
        } catch (RuntimeException e) {
          newLocator.stop();
          throw e;
        }
      }

      newLocator.endStartLocator(null);
      startedLocator = true;
      return newLocator;

    } finally {
      System.clearProperty(FORCE_LOCATOR_DM_TYPE);

      if (!startedLocator) {
        removeLocator(newLocator);
      }
    }
  }

  /**
   * Determines if this VM is a locator which must ignore a shutdown.
   *
   * @return true if this VM is a locator which should ignore a shutdown, false if it is a normal
   *         member.
   */
  public static boolean isDedicatedLocator() {
    InternalLocator internalLocator = getLocator();
    if (internalLocator == null) {
      return false;
    }

    InternalDistributedSystem system = internalLocator.internalDistributedSystem;
    if (system == null) {
      return false;
    }
    DistributionManager distributionManager = system.getDistributionManager();
    if (distributionManager.isLoner()) {
      return false;
    }
    ClusterDistributionManager clusterDistributionManager =
        (ClusterDistributionManager) system.getDistributionManager();
    return clusterDistributionManager.getDMType() == ClusterDistributionManager.LOCATOR_DM_TYPE;
  }

  /**
   * Creates a new {@code Locator} with the given port, log file, logWriter, and bind address.
   *
   * @param port the tcp/ip port to listen on
   * @param logFile the file that log messages should be written to
   * @param logWriter a log writer that should be used (logFile parameter is ignored)
   * @param securityLogWriter the log writer to be used for security related log messages
   * @param hostnameForClients the name to give to clients for connecting to this locator
   * @param distributedSystemProperties optional properties to configure the distributed system
   *        (e.g., mcast addr/port, other locators)
   * @param distributionConfig the config if being called from a distributed system; otherwise null.
   * @param workingDirectory the working directory to use for files
   */
  @VisibleForTesting
  InternalLocator(int port, LoggingSession loggingSession, File logFile,
      InternalLogWriter logWriter, InternalLogWriter securityLogWriter, InetAddress bindAddress,
      String hostnameForClients, Properties distributedSystemProperties,
      DistributionConfigImpl distributionConfig, Path workingDirectory) {
    this.logFile = logFile;
    this.bindAddress = bindAddress;
    this.hostnameForClients = hostnameForClients;

    this.workingDirectory = workingDirectory;

    this.distributionConfig = distributionConfig;

    env = new Properties();

    // set bind-address explicitly only if not wildcard and let any explicit
    // value in distributedSystemProperties take precedence
    if (bindAddress != null && !bindAddress.isAnyLocalAddress()) {
      env.setProperty(BIND_ADDRESS, bindAddress.getHostAddress());
    }

    if (distributedSystemProperties != null) {
      env.putAll(distributedSystemProperties);
    }
    env.setProperty(CACHE_XML_FILE, "");

    // create a DC so that all of the lookup rules, gemfire.properties, etc,
    // are considered and we have a config object we can trust
    if (this.distributionConfig == null) {
      this.distributionConfig = new DistributionConfigImpl(env);
      env.clear();
      env.putAll(this.distributionConfig.getProps());
    }

    boolean hasLogFileButConfigDoesNot =
        this.logFile != null && this.distributionConfig.getLogFile()
            .toString().equals(DistributionConfig.DEFAULT_LOG_FILE.toString());
    if (logWriter == null && hasLogFileButConfigDoesNot) {
      // LOG: this is(was) a hack for when logFile and config don't match -- if config specifies a
      // different log-file things will break!
      this.distributionConfig.unsafeSetLogFile(this.logFile);
    }

    if (loggingSession == null) {
      throw new Error("LoggingSession must not be null");
    }
    this.loggingSession = loggingSession;

    // LOG: create LogWriters for GemFireTracer (or use whatever was passed in)
    if (logWriter == null) {
      LogWriterFactory.createLogWriterLogger(this.distributionConfig, false);
      if (logger.isDebugEnabled()) {
        logger.debug("LogWriter for locator is created.");
      }
    }

    if (securityLogWriter == null) {
      securityLogWriter = LogWriterFactory.createLogWriterLogger(this.distributionConfig, true);
      securityLogWriter.fine("SecurityLogWriter for locator is created.");
    }

    SocketCreatorFactory.setDistributionConfig(this.distributionConfig);

    locatorListener = WANServiceProvider.createLocatorMembershipListener();
    if (locatorListener != null) {
      // We defer setting the port until the handler is init'd - that way we'll have an actual port
      // in the case where we're starting with port = 0.
      locatorListener.setConfig(getConfig());
    }
    handler = new PrimaryHandler(this, locatorListener);

    locatorStats = new LocatorStats();

    server = new TcpServerFactory().makeTcpServer(port, this.bindAddress, null,
        this.distributionConfig, handler, new DelayedPoolStatHelper(), toString(), this);
  }

  public boolean isSharedConfigurationEnabled() {
    return distributionConfig.getEnableClusterConfiguration();
  }

  private boolean loadFromSharedConfigDir() {
    return distributionConfig.getLoadClusterConfigFromDir();
  }

  public boolean isSharedConfigurationRunning() {
    return configurationPersistenceService != null
        && configurationPersistenceService.getStatus() == SharedConfigurationStatus.RUNNING;
  }

  public LocatorMembershipListener getLocatorMembershipListener() {
    return locatorListener;
  }

  /**
   * @deprecated Please use {@link #getLocatorMembershipListener()} instead.
   */
  @Deprecated
  public LocatorMembershipListener getlocatorMembershipListener() {
    return getLocatorMembershipListener();
  }

  private void startTcpServer() throws IOException {
    logger.info("Starting {}", this);
    server.start();
  }

  public InternalConfigurationPersistenceService getConfigurationPersistenceService() {
    return configurationPersistenceService;
  }

  public DistributionConfigImpl getConfig() {
    return distributionConfig;
  }

  public InternalCache getCache() {
    if (internalCache == null) {
      return GemFireCacheImpl.getInstance();
    }
    return internalCache;
  }

  /**
   * Start peer location in this locator. If you plan on starting a distributed system later, this
   * method should be called first so that the distributed system can use this locator.
   *
   * @return returns the port that the locator to which the locator is bound
   * @since GemFire 5.7
   */
  int startPeerLocation() throws IOException {
    if (isPeerLocator()) {
      throw new IllegalStateException(
          String.format("Peer location is already running for %s", this));
    }
    logger.info("Starting peer location for {}", this);

    String locatorsConfigValue = distributionConfig.getLocators();

    // check for settings that would require only locators to hold the
    // coordinator - e.g., security and network-partition detection
    boolean locatorsAreCoordinators;
    boolean networkPartitionDetectionEnabled =
        distributionConfig.getEnableNetworkPartitionDetection();
    String securityUDPDHAlgo = distributionConfig.getSecurityUDPDHAlgo();
    if (networkPartitionDetectionEnabled) {
      locatorsAreCoordinators = true;
    } else {
      // check if security is enabled
      String prop = distributionConfig.getSecurityPeerAuthInit();
      locatorsAreCoordinators = prop != null && !prop.isEmpty();
      if (!locatorsAreCoordinators) {
        locatorsAreCoordinators = Boolean.getBoolean(LOCATORS_PREFERRED_AS_COORDINATORS);
      }
    }

    netLocator = MemberFactory.newLocatorHandler(bindAddress, locatorsConfigValue,
        locatorsAreCoordinators, networkPartitionDetectionEnabled, locatorStats, securityUDPDHAlgo,
        workingDirectory);
    handler.addHandler(PeerLocatorRequest.class, netLocator);
    peerLocator = true;
    if (!server.isAlive()) {
      startTcpServer();
    }
    int boundPort = server.getPort();
    File productUseFile = workingDirectory.resolve("locator" + boundPort + "views.log").toFile();
    productUseLog = new ProductUseLog(productUseFile);

    return boundPort;
  }

  /**
   * @return the TcpHandler for peer to peer discovery
   */
  public NetLocator getLocatorHandler() {
    return netLocator;
  }

  /**
   * For backward-compatibility we retain this method
   *
   * @deprecated use a form of the method that does not have peerLocator/serverLocator parameters
   */
  @Deprecated
  public static InternalLocator startLocator(int locatorPort, File logFile,
      InternalLogWriter logWriter, InternalLogWriter securityLogWriter, InetAddress bindAddress,
      Properties distributedSystemProperties, boolean peerLocator, boolean serverLocator,
      String hostnameForClients, boolean b1) throws IOException {
    return startLocator(locatorPort, logFile, logWriter, securityLogWriter, bindAddress, true,
        distributedSystemProperties, hostnameForClients);
  }

  /**
   * Start a distributed system whose life cycle is managed by this locator. When the locator is
   * stopped, this distributed system will be disconnected. If a distributed system already exists,
   * this method will have no affect.
   *
   * @since GemFire 5.7
   */
  private void startDistributedSystem() throws IOException {
    InternalDistributedSystem existing = InternalDistributedSystem.getConnectedInstance();
    if (existing != null) {
      // LOG: changed from config to info
      logger.info("Using existing distributed system: {}", existing);
      startCache(existing);
    } else {

      StringBuilder sb = new StringBuilder(100);
      if (bindAddress != null) {
        sb.append(bindAddress.getHostAddress());
      } else {
        sb.append(SocketCreator.getLocalHost().getHostAddress());
      }
      sb.append('[').append(getPort()).append(']');
      String thisLocator = sb.toString();

      if (peerLocator) {
        // append this locator to the locators list from the config properties
        boolean setLocatorsProp = false;
        String locatorsConfigValue = distributionConfig.getLocators();
        if (StringUtils.isNotBlank(locatorsConfigValue)) {
          if (!locatorsConfigValue.contains(thisLocator)) {
            locatorsConfigValue = locatorsConfigValue + ',' + thisLocator;
            setLocatorsProp = true;
          }
        } else {
          locatorsConfigValue = thisLocator;
          setLocatorsProp = true;
        }
        if (setLocatorsProp) {
          Properties updateEnv = new Properties();
          updateEnv.setProperty(LOCATORS, locatorsConfigValue);
          distributionConfig.setApiProps(updateEnv);
          String locatorsPropertyName = GEMFIRE_PREFIX + LOCATORS;
          if (System.getProperty(locatorsPropertyName) != null) {
            System.setProperty(locatorsPropertyName, locatorsConfigValue);
          }
        }
        // No longer default mcast-port to zero.
      }

      Properties distributedSystemProperties = new Properties();
      // LogWriterAppender is now shared via that class
      // using a DistributionConfig earlier in this method
      distributedSystemProperties.put(DistributionConfig.DS_CONFIG_NAME, distributionConfig);

      logger.info("Starting distributed system");

      internalDistributedSystem =
          (InternalDistributedSystem) DistributedSystem.connect(distributedSystemProperties);

      if (peerLocator) {
        netLocator.setMembershipManager(internalDistributedSystem.getDM().getMembershipManager());
      }

      internalDistributedSystem.addDisconnectListener(sys -> stop(false, false, false));

      startCache(internalDistributedSystem);

      logger.info("Locator started on {}", thisLocator);

      internalDistributedSystem.setDependentLocator(this);
    }
  }

  private void startCache(DistributedSystem system) throws IOException {
    InternalCache internalCache = GemFireCacheImpl.getInstance();
    if (internalCache == null) {
      logger.info("Creating cache for locator.");
      this.internalCache = new InternalCacheBuilder(system.getProperties())
          .create((InternalDistributedSystem) system);
      internalCache = this.internalCache;
    } else {
      logger.info("Using existing cache for locator.");
      ((InternalDistributedSystem) system).handleResourceEvent(ResourceEvent.LOCATOR_START, this);
    }
    startJmxManagerLocationService(internalCache);

    startClusterManagementService();
  }

  private void startClusterManagementService() throws IOException {
    startConfigurationPersistenceService();

    if (internalCache == null) {
      return;
    }

    clusterManagementService = new LocatorClusterManagementService(locator.internalCache,
        locator.configurationPersistenceService);

    // start management rest service
    AgentUtil agentUtil = new AgentUtil(GemFireVersion.getGemFireVersion());

    // Find the V2 Management rest WAR file
    String gemfireManagementWar = agentUtil.findWarLocation("geode-web-management");
    if (gemfireManagementWar == null) {
      logger.info(
          "Unable to find GemFire V2 Management REST API WAR file; the Management REST Interface for Geode will not be accessible.");
      return;
    }

    Pair<String, Object> securityServiceAttribute =
        new ImmutablePair<>(HttpService.SECURITY_SERVICE_SERVLET_CONTEXT_PARAM,
            internalCache.getSecurityService());
    Pair<String, Object> clusterManagementServiceAttribute =
        new ImmutablePair<>(HttpService.CLUSTER_MANAGEMENT_SERVICE_CONTEXT_PARAM,
            clusterManagementService);

    if (distributionConfig.getEnableManagementRestService()) {
      internalCache.getHttpService().ifPresent(x -> {
        try {
          logger.info("Geode Property {}=true Geode Management Rest Service is enabled.",
              ConfigurationProperties.ENABLE_MANAGEMENT_REST_SERVICE);
          x.addWebApplication("/management", gemfireManagementWar, securityServiceAttribute,
              clusterManagementServiceAttribute);
        } catch (Throwable e) {
          logger.warn("Unable to start management service: {}", e.getMessage());
        }
      });
    } else {
      logger.info("Geode Property {}=false Geode Management Rest Service is disabled.",
          ConfigurationProperties.ENABLE_MANAGEMENT_REST_SERVICE);
    }
  }

  /**
   * End the initialization of the locator. This method should be called once the location services
   * and distributed system are started.
   *
   * @param distributedSystem The distributed system to use for the statistics.
   * @since GemFire 5.7
   */
  void endStartLocator(InternalDistributedSystem distributedSystem) {
    env = null;

    if (distributedSystem == null) {
      distributedSystem = InternalDistributedSystem.getConnectedInstance();
    }

    if (distributedSystem != null) {
      onConnect(distributedSystem);
    } else {
      InternalDistributedSystem.addConnectListener(this);
    }

    locatorDiscoverer = WANServiceProvider.createLocatorDiscoverer();
    if (locatorDiscoverer != null) {
      locatorDiscoverer.discover(getPort(), distributionConfig, locatorListener,
          hostnameForClients);
    }
  }

  /**
   * Start server location services in this locator. Server location can only be started once there
   * is a running distributed system.
   *
   * @param distributedSystem The distributed system which the server location services should use.
   *        If null, the method will try to find an already connected distributed system.
   * @since GemFire 5.7
   */
  void startServerLocation(InternalDistributedSystem distributedSystem) throws IOException {
    if (isServerLocator()) {
      throw new IllegalStateException(
          String.format("Server location is already running for %s", this));
    }
    logger.info("Starting server location for {}", this);

    if (distributedSystem == null) {
      distributedSystem = InternalDistributedSystem.getConnectedInstance();
      if (distributedSystem == null) {
        throw new IllegalStateException(
            "Since server location is enabled the distributed system must be connected.");
      }
    }

    ServerLocator serverLocator = new ServerLocator(getPort(), bindAddress, hostnameForClients,
        logFile, productUseLog, getConfig().getName(), distributedSystem, locatorStats);
    handler.addHandler(LocatorListRequest.class, serverLocator);
    handler.addHandler(ClientConnectionRequest.class, serverLocator);
    handler.addHandler(QueueConnectionRequest.class, serverLocator);
    handler.addHandler(ClientReplacementRequest.class, serverLocator);
    handler.addHandler(GetAllServersRequest.class, serverLocator);
    handler.addHandler(LocatorStatusRequest.class, serverLocator);
    this.serverLocator = serverLocator;
    if (!server.isAlive()) {
      startTcpServer();
    }
    // the product use is not guaranteed to be initialized until the server is started, so
    // the last thing we do is tell it to start logging
    productUseLog.monitorUse(distributedSystem);
  }

  /**
   * Stop this locator.
   */
  @Override
  public void stop() {
    stop(false, false, true);
  }

  /**
   * Stop this locator
   *
   * @param stopForReconnect - stopping for distributed system reconnect
   * @param waitForDisconnect - wait up to 60 seconds for the locator to completely stop
   */
  public void stop(boolean forcedDisconnect, boolean stopForReconnect, boolean waitForDisconnect) {
    boolean isDebugEnabled = logger.isDebugEnabled();

    stoppedForReconnect = stopForReconnect;
    this.forcedDisconnect = forcedDisconnect;

    if (server.isShuttingDown()) {
      // fix for bug 46156
      // If we are already shutting down don't do all of this again.
      // But, give the server a bit of time to shut down so a new
      // locator can be created, if desired, when this method returns
      if (!stopForReconnect && waitForDisconnect) {
        long endOfWait = System.currentTimeMillis() + 60000;
        if (isDebugEnabled && server.isAlive()) {
          logger.debug("sleeping to wait for the locator server to shut down...");
        }

        while (server.isAlive() && System.currentTimeMillis() < endOfWait) {
          try {
            Thread.sleep(500);
          } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
            return;
          }
        }

        if (isDebugEnabled) {
          if (server.isAlive()) {
            logger.debug(
                "60 seconds have elapsed waiting for the locator server to shut down - terminating wait and returning");
          } else {
            logger.debug("the locator server has shut down");
          }
        }
      }
      return;
    }

    if (locatorDiscoverer != null) {
      locatorDiscoverer.stop();
      locatorDiscoverer = null;
    }

    if (server.isAlive()) {
      logger.info("Stopping {}", this);
      try {
        new TcpClient().stop(bindAddress, getPort());
      } catch (ConnectException ignore) {
        // must not be running
      }

      boolean interrupted = Thread.interrupted();
      try {
        // TcpServer up to SHUTDOWN_WAIT_TIME for its executor pool to shut down.
        // We wait 2 * SHUTDOWN_WAIT_TIME here to account for that shutdown, and then our own.
        server.join(TcpServer.SHUTDOWN_WAIT_TIME * 2);

      } catch (InterruptedException ex) {
        interrupted = true;
        logger.warn("Interrupted while stopping {}", this, ex);

        // Continue running -- doing our best to stop everything...
      } finally {
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }

      if (server.isAlive()) {
        logger.fatal("Could not stop {} in 60 seconds", this);
      }
    }

    removeLocator(this);

    handleShutdown();
    logger.info("{} is stopped", this);

    if (stoppedForReconnect) {
      if (internalDistributedSystem != null) {
        launchRestartThread();
      }
    }
  }

  /**
   * answers whether this locator is currently stopped
   */
  public boolean isStopped() {
    return server == null || !server.isAlive();
  }

  void handleShutdown() {
    if (!shutdownHandled.compareAndSet(false, true)) {
      // already shutdown
      return;
    }
    if (productUseLog != null) {
      productUseLog.close();
    }
    if (internalDistributedSystem != null) {
      internalDistributedSystem.setDependentLocator(null);
    }

    if (internalCache != null && !stoppedForReconnect && !forcedDisconnect) {
      logger.info("Closing locator's cache");
      try {
        internalCache.close();
      } catch (RuntimeException ex) {
        logger.info("Could not close locator's cache because: {}", ex.getMessage(), ex);
      }
    }

    if (locatorStats != null) {
      locatorStats.close();
    }

    if (locatorListener != null) {
      locatorListener.clearLocatorInfo();
    }

    isSharedConfigurationStarted = false;
    if (internalDistributedSystem != null && !forcedDisconnect) {
      if (internalDistributedSystem.isConnected()) {
        logger.info("Disconnecting distributed system for {}", this);
        internalDistributedSystem.disconnect();
      }
    }
  }

  /**
   * Waits for a locator to be told to stop.
   *
   * @throws InterruptedException thrown if the thread is interrupted
   */
  public void waitToStop() throws InterruptedException {
    boolean restarted;
    do {
      DistributedSystem system = internalDistributedSystem;
      restarted = false;
      server.join();
      if (stoppedForReconnect) {
        logger.info("waiting for distributed system to disconnect...");
        while (system.isConnected()) {
          Thread.sleep(5000);
        }
        logger.info("waiting for distributed system to reconnect...");
        try {
          restarted = system.waitUntilReconnected(-1, TimeUnit.SECONDS);
        } catch (CancelException e) {
          // reconnect attempt failed
        }
        if (restarted) {
          logger.info("system restarted");
        } else {
          logger.info("system was not restarted");
        }
        Thread restartThread = this.restartThread;
        if (restartThread != null) {
          logger.info("waiting for services to restart...");
          restartThread.join();
          this.restartThread = null;
          logger.info("done waiting for services to restart");
        }
      }
    } while (restarted);
  }

  /**
   * launch a thread that will restart location services
   */
  private void launchRestartThread() {
    String threadName = "Location services restart thread";
    restartThread = new LoggingThread(threadName, () -> {
      boolean restarted = false;
      try {
        restarted = attemptReconnect();
        logger.info("attemptReconnect returned {}", restarted);
      } catch (InterruptedException e) {
        logger.info("attempt to restart location services was interrupted", e);
      } catch (IOException e) {
        logger.info("attempt to restart location services terminated", e);
      } finally {
        if (!restarted) {
          stoppedForReconnect = false;
        }
        reconnected = restarted;
      }
      restartThread = null;
    });
    restartThread.start();
  }

  public boolean isReconnected() {
    return reconnected;
  }

  /**
   * reconnects the locator to a restarting DistributedSystem. If quorum checks are enabled this
   * will start peer location services before a distributed system is available if the quorum check
   * succeeds. It will then wait for the system to finish reconnecting before returning. If quorum
   * checks are not being done this merely waits for the distributed system to reconnect and then
   * starts location services.
   *
   * @return true if able to reconnect the locator to the new distributed system
   */
  private boolean attemptReconnect() throws InterruptedException, IOException {
    boolean restarted = false;
    if (stoppedForReconnect) {
      logger.info("attempting to restart locator");
      boolean tcpServerStarted = false;
      InternalDistributedSystem system = internalDistributedSystem;
      long waitTime = system.getConfig().getMaxWaitTimeForReconnect() / 2;
      QuorumChecker quorumChecker = null;

      while (system.getReconnectedSystem() == null && !system.isReconnectCancelled()) {
        if (quorumChecker == null) {
          quorumChecker = internalDistributedSystem.getQuorumChecker();
          if (quorumChecker != null) {
            logger.info("The distributed system returned this quorum checker: {}", quorumChecker);
          }
        }

        if (quorumChecker != null && !tcpServerStarted) {
          boolean start = quorumChecker
              .checkForQuorum(3L * internalDistributedSystem.getConfig().getMemberTimeout());
          if (start) {
            // start up peer location. server location is started after the DS finishes reconnecting
            logger.info("starting peer location");
            if (locatorListener != null) {
              locatorListener.clearLocatorInfo();
            }
            stoppedForReconnect = false;
            internalDistributedSystem = null;
            internalCache = null;
            restartWithoutSystem();
            tcpServerStarted = true;
            setLocator(this);
          }
        }

        try {
          system.waitUntilReconnected(waitTime, TimeUnit.MILLISECONDS);
        } catch (CancelException e) {
          logger.info("Attempt to reconnect failed and further attempts have been terminated");
          stoppedForReconnect = false;
          return false;
        }
      }

      InternalDistributedSystem newSystem =
          (InternalDistributedSystem) system.getReconnectedSystem();
      if (newSystem != null) {
        if (!tcpServerStarted) {
          if (locatorListener != null) {
            locatorListener.clearLocatorInfo();
          }
          stoppedForReconnect = false;
        }

        try {
          restartWithSystem(newSystem, GemFireCacheImpl.getInstance());
        } catch (CancelException e) {
          stoppedForReconnect = true;
          return false;
        }

        setLocator(this);
        restarted = true;
      }
    }

    logger.info("restart thread exiting.  Service was {}restarted", restarted ? "" : "not ");
    return restarted;
  }

  private void restartWithoutSystem() throws IOException {
    synchronized (locatorLock) {
      if (locator != this && hasLocator()) {
        throw new IllegalStateException(
            "A locator can not be created because one already exists in this JVM.");
      }
      internalDistributedSystem = null;
      internalCache = null;

      logger.info("Locator restart: initializing TcpServer peer location services");
      server.restarting(null, null, null);

      if (productUseLog.isClosed()) {
        productUseLog.reopen();
      }

      if (!server.isAlive()) {
        logger.info("Locator restart: starting TcpServer");
        startTcpServer();
      }
    }
  }

  private void restartWithSystem(InternalDistributedSystem newSystem, InternalCache newCache)
      throws IOException {
    synchronized (locatorLock) {
      if (locator != this && hasLocator()) {
        throw new IllegalStateException(
            "A locator can not be created because one already exists in this JVM.");
      }
    }
    internalDistributedSystem = newSystem;
    internalCache = newCache;
    internalDistributedSystem.setDependentLocator(this);
    logger.info("Locator restart: initializing TcpServer");

    try {
      server.restarting(newSystem, newCache, configurationPersistenceService);
    } catch (CancelException e) {
      internalDistributedSystem = null;
      internalCache = null;
      logger.info("Locator restart: attempt to restart location services failed", e);
      throw e;
    }

    if (productUseLog.isClosed()) {
      productUseLog.reopen();
    }
    productUseLog.monitorUse(newSystem);

    if (isSharedConfigurationEnabled()) {
      configurationPersistenceService =
          new InternalConfigurationPersistenceService(newCache, workingDirectory,
              JAXBService.create());
      startClusterManagementService();
    }

    if (!server.isAlive()) {
      logger.info("Locator restart: starting TcpServer");
      startTcpServer();
    }

    logger.info("Locator restart: initializing JMX manager");
    startJmxManagerLocationService(newCache);
    endStartLocator(internalDistributedSystem);
    logger.info("Locator restart completed");

    server.restartCompleted(newSystem);
  }

  public ClusterManagementService getClusterManagementService() {
    return clusterManagementService;
  }

  @Override
  public DistributedSystem getDistributedSystem() {
    if (internalDistributedSystem == null) {
      return InternalDistributedSystem.getAnyInstance();
    }

    return internalDistributedSystem;
  }

  @Override
  public boolean isPeerLocator() {
    return peerLocator;
  }

  @Override
  public boolean isServerLocator() {
    return serverLocator != null;
  }

  /**
   * Returns null if no server locator; otherwise returns the advisee that represents the server
   * locator.
   */
  public ServerLocator getServerLocatorAdvisee() {
    return serverLocator;
  }

  /**
   * Return the port on which the locator is actually listening. If called before the locator has
   * actually started, this method will return null.
   *
   * @return the port the locator is listening on or null if it has not yet been started
   */
  @Override
  public Integer getPort() {
    if (server != null) {
      return server.getPort();
    }
    return null;
  }

  @Override
  public LogConfig getLogConfig() {
    return distributionConfig;
  }

  @Override
  public StatisticsConfig getStatisticsConfig() {
    return distributionConfig;
  }

  @Override
  public void addLogConfigListener(LogConfigListener logConfigListener) {
    logConfigListeners.add(logConfigListener);
  }

  @Override
  public void removeLogConfigListener(LogConfigListener logConfigListener) {
    logConfigListeners.remove(logConfigListener);
  }

  public SharedConfigurationStatusResponse getSharedConfigurationStatus() {
    ExecutorService waitingPoolExecutor =
        internalCache.getDistributionManager().getWaitingThreadPool();
    Future<SharedConfigurationStatusResponse> statusFuture =
        waitingPoolExecutor.submit(new FetchSharedConfigStatus());
    SharedConfigurationStatusResponse response;

    try {
      response = statusFuture.get(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      logger.info("Exception occurred while fetching the status {}", getStackTrace(e));
      response = new SharedConfigurationStatusResponse();
      response.setStatus(SharedConfigurationStatus.UNDETERMINED);
    }
    return response;
  }

  @Override
  public void onConnect(InternalDistributedSystem sys) {
    try {
      locatorStats.hookupStats(sys,
          SocketCreator.getLocalHost().getCanonicalHostName() + '-' + server.getBindAddress());
    } catch (UnknownHostException e) {
      logger.warn(e);
    }
  }

  /**
   * Returns collection of locator strings representing every locator instance hosted by this
   * member.
   *
   * @see #getLocators()
   */
  public static Collection<String> getLocatorStrings() {
    Collection<String> locatorStrings;
    try {
      Collection<DistributionLocatorId> locatorIds = asDistributionLocatorIds(getLocators());
      locatorStrings = DistributionLocatorId.asStrings(locatorIds);
    } catch (UnknownHostException ignored) {
      locatorStrings = null;
    }
    if (locatorStrings == null || locatorStrings.isEmpty()) {
      return null;
    }
    return locatorStrings;
  }

  private void startConfigurationPersistenceService() throws IOException {
    installRequestHandlers();

    if (!distributionConfig.getEnableClusterConfiguration()) {
      logger.info("Cluster configuration service is disabled");
      return;
    }

    if (!distributionConfig.getJmxManager()) {
      throw new IllegalStateException(
          "Cannot start cluster configuration without jmx-manager=true");
    }

    if (isSharedConfigurationStarted) {
      logger.info("Cluster configuration service is already started.");
      return;
    }

    if (!isDedicatedLocator()) {
      logger.info(
          "Cluster configuration service not enabled as it is only supported in dedicated locators");
      return;
    }


    if (locator.configurationPersistenceService == null) {
      // configurationPersistenceService will already be created in case of auto-reconnect
      locator.configurationPersistenceService =
          new InternalConfigurationPersistenceService(locator.internalCache, workingDirectory,
              JAXBService.create());
    }
    locator.configurationPersistenceService
        .initSharedConfiguration(locator.loadFromSharedConfigDir());
    logger.info(
        "Cluster configuration service start up completed successfully and is now running ....");
    isSharedConfigurationStarted = true;
  }

  public void startJmxManagerLocationService(InternalCache internalCache) {
    if (internalCache.getJmxManagerAdvisor() != null) {
      if (!handler.isHandled(JmxManagerLocatorRequest.class)) {
        handler.addHandler(JmxManagerLocatorRequest.class,
            new JmxManagerLocator(internalCache));
      }
    }
  }

  private void installRequestHandlers() {
    if (!handler.isHandled(SharedConfigurationStatusRequest.class)) {
      handler.addHandler(SharedConfigurationStatusRequest.class,
          new SharedConfigurationStatusRequestHandler());
      logger.info("SharedConfigStatusRequestHandler installed");
    }

    if (!handler.isHandled(ClusterManagementServiceInfoRequest.class)) {
      handler.addHandler(ClusterManagementServiceInfoRequest.class,
          new ClusterManagementServiceInfoRequestHandler());
      logger.info("ClusterManagementServiceInfoRequestHandler installed");
    }
  }

  public boolean hasHandlerForClass(Class messageClass) {
    return handler.isHandled(messageClass);
  }

  class FetchSharedConfigStatus implements Callable<SharedConfigurationStatusResponse> {

    @Override
    public SharedConfigurationStatusResponse call() throws InterruptedException {
      InternalLocator locator = InternalLocator.this;

      SharedConfigurationStatusResponse response;
      if (locator.configurationPersistenceService != null) {
        response = locator.configurationPersistenceService.createStatusResponse();
      } else {
        response = new SharedConfigurationStatusResponse();
        response.setStatus(SharedConfigurationStatus.UNDETERMINED);
      }
      return response;
    }
  }

  /**
   * A helper object so that the TcpServer can record its stats to the proper place. Stats are only
   * recorded if a distributed system is started.
   */
  protected class DelayedPoolStatHelper implements PoolStatHelper {

    @Override
    public void startJob() {
      locatorStats.incRequestInProgress(1);

    }

    @Override
    public void endJob() {
      locatorStats.incRequestInProgress(-1);
    }
  }
}
