/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.distributed.internal;

import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_MANAGER;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_POST_PROCESSOR;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileFilter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.TransformerException;

import joptsimple.internal.Strings;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.DirectoryFileFilter;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.configuration.CacheConfig;
import org.apache.geode.distributed.ConfigurationPersistenceService;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.internal.cache.ClusterConfigurationLoader;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegionArguments;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.cache.persistence.PersistentMemberManager;
import org.apache.geode.internal.cache.persistence.PersistentMemberPattern;
import org.apache.geode.internal.cache.xmlcache.CacheXmlGenerator;
import org.apache.geode.internal.config.JAXBService;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.management.internal.configuration.callbacks.ConfigurationChangeListener;
import org.apache.geode.management.internal.configuration.domain.Configuration;
import org.apache.geode.management.internal.configuration.domain.SharedConfigurationStatus;
import org.apache.geode.management.internal.configuration.domain.XmlEntity;
import org.apache.geode.management.internal.configuration.messages.ConfigurationResponse;
import org.apache.geode.management.internal.configuration.messages.SharedConfigurationStatusResponse;
import org.apache.geode.management.internal.configuration.utils.XmlUtils;

public class InternalConfigurationPersistenceService implements ConfigurationPersistenceService {
  private static final Logger logger = LogService.getLogger();

  /**
   * Name of the directory where the shared configuration artifacts are stored
   */
  public static final String CLUSTER_CONFIG_ARTIFACTS_DIR_NAME = "cluster_config";

  private static final String CLUSTER_CONFIG_DISK_STORE_NAME = "cluster_config";

  public static final String CLUSTER_CONFIG_DISK_DIR_PREFIX = "ConfigDiskDir_";

  /**
   * Name of the lock service used for shared configuration
   */
  private static final String SHARED_CONFIG_LOCK_SERVICE_NAME = "__CLUSTER_CONFIG_LS";

  /**
   * Name of the lock for locking the shared configuration
   */
  private static final String SHARED_CONFIG_LOCK_NAME = "__CLUSTER_CONFIG_LOCK";

  /**
   * Name of the region which is used to store the configuration information
   */
  public static final String CONFIG_REGION_NAME = "_ConfigurationRegion";
  private static final String CACHE_CONFIG_VERSION = "1.0";

  private final Path configDirPath;
  private final Path configDiskDirPath;

  private final Set<PersistentMemberPattern> newerSharedConfigurationLocatorInfo = new HashSet<>();
  private final AtomicReference<SharedConfigurationStatus> status = new AtomicReference<>();

  private final InternalCache cache;
  private final DistributedLockService sharedConfigLockingService;
  private final JAXBService jaxbService;

  public InternalConfigurationPersistenceService(InternalCache cache, Path workingDirectory,
      JAXBService jaxbService) {
    this(cache,
        sharedConfigLockService(cache.getDistributedSystem()),
        jaxbService,
        workingDirectory.resolve(CLUSTER_CONFIG_ARTIFACTS_DIR_NAME),
        workingDirectory
            .resolve(CLUSTER_CONFIG_DISK_DIR_PREFIX + cache.getDistributedSystem().getName()));
  }

  @VisibleForTesting
  public InternalConfigurationPersistenceService(JAXBService jaxbService) {
    this(null, null, jaxbService, null, null);
  }

  private InternalConfigurationPersistenceService(InternalCache cache,
      DistributedLockService sharedConfigLockingService, JAXBService jaxbService,
      Path configDirPath, Path configDiskDirPath) {
    this.cache = cache;
    this.configDirPath = configDirPath;
    this.configDiskDirPath = configDiskDirPath;
    this.sharedConfigLockingService = sharedConfigLockingService;
    status.set(SharedConfigurationStatus.NOT_STARTED);
    this.jaxbService = jaxbService;
  }

  /**
   * Gets or creates (if not created) shared configuration lock service
   */
  private static DistributedLockService sharedConfigLockService(DistributedSystem ds) {
    DistributedLockService sharedConfigDls =
        DLockService.getServiceNamed(SHARED_CONFIG_LOCK_SERVICE_NAME);
    try {
      if (sharedConfigDls == null) {
        sharedConfigDls = DLockService.create(SHARED_CONFIG_LOCK_SERVICE_NAME,
            (InternalDistributedSystem) ds, true, true);
      }
    } catch (IllegalArgumentException ignore) {
      return DLockService.getServiceNamed(SHARED_CONFIG_LOCK_SERVICE_NAME);
    }
    return sharedConfigDls;
  }

  public JAXBService getJaxbService() {
    return jaxbService;
  }

  /**
   * Adds/replaces the xml entity in the shared configuration we don't need to trigger the change
   * listener for this modification, so it's ok to operate on the original configuration object
   */
  public void addXmlEntity(XmlEntity xmlEntity, String[] groups) {
    lockSharedConfiguration();
    try {
      Region<String, Configuration> configRegion = getConfigurationRegion();
      if (groups == null || groups.length == 0) {
        groups = new String[] {ConfigurationPersistenceService.CLUSTER_CONFIG};
      }
      for (String group : groups) {
        Configuration configuration = configRegion.get(group);
        if (configuration == null) {
          configuration = new Configuration(group);
        }
        String xmlContent = configuration.getCacheXmlContent();
        if (xmlContent == null || xmlContent.isEmpty()) {
          xmlContent = generateInitialXmlContent();
        }
        try {
          final Document doc = XmlUtils.createAndUpgradeDocumentFromXml(xmlContent);
          XmlUtils.addNewNode(doc, xmlEntity);
          configuration.setCacheXmlContent(XmlUtils.prettyXml(doc));
          configRegion.put(group, configuration);
        } catch (Exception e) {
          logger.error("error updating cluster configuration for group {}", group, e);
        }
      }
    } finally {
      unlockSharedConfiguration();
    }
  }

  /**
   * Deletes the xml entity from the shared configuration.
   */
  public void deleteXmlEntity(final XmlEntity xmlEntity, String[] groups) {
    lockSharedConfiguration();
    try {
      Region<String, Configuration> configRegion = getConfigurationRegion();
      // No group is specified, so delete in every single group if it exists.
      if (groups == null) {
        Set<String> groupSet = configRegion.keySet();
        groups = groupSet.toArray(new String[groupSet.size()]);
      }
      for (String group : groups) {
        Configuration configuration = configRegion.get(group);
        if (configuration != null) {
          String xmlContent = configuration.getCacheXmlContent();
          try {
            if (xmlContent != null && !xmlContent.isEmpty()) {
              Document doc = XmlUtils.createAndUpgradeDocumentFromXml(xmlContent);
              XmlUtils.deleteNode(doc, xmlEntity);
              configuration.setCacheXmlContent(XmlUtils.prettyXml(doc));
              configRegion.put(group, configuration);
            }
          } catch (Exception e) {
            logger.error("error updating cluster configuration for group {}", group, e);
          }
        }
      }
    } finally {
      unlockSharedConfiguration();
    }
  }

  /**
   * we don't need to trigger the change listener for this modification, so it's ok to operate on
   * the original configuration object
   */
  public void modifyXmlAndProperties(Properties properties, XmlEntity xmlEntity, String[] groups) {
    lockSharedConfiguration();
    try {
      if (groups == null) {
        groups = new String[] {ConfigurationPersistenceService.CLUSTER_CONFIG};
      }
      Region<String, Configuration> configRegion = getConfigurationRegion();
      for (String group : groups) {
        Configuration configuration = configRegion.get(group);
        if (configuration == null) {
          configuration = new Configuration(group);
        }

        if (xmlEntity != null) {
          String xmlContent = configuration.getCacheXmlContent();
          if (xmlContent == null || xmlContent.isEmpty()) {
            StringWriter sw = new StringWriter();
            PrintWriter pw = new PrintWriter(sw);
            CacheXmlGenerator.generateDefault(pw);
            xmlContent = sw.toString();
          }
          try {
            Document doc = XmlUtils.createAndUpgradeDocumentFromXml(xmlContent);
            // Modify the cache attributes
            XmlUtils.modifyRootAttributes(doc, xmlEntity);
            // Change the xml content of the configuration and put it the config region
            configuration.setCacheXmlContent(XmlUtils.prettyXml(doc));
          } catch (Exception e) {
            logger.error("error updating cluster configuration for group {}", group, e);
          }
        }

        if (properties != null) {
          configuration.getGemfireProperties().putAll(properties);
        }
        configRegion.put(group, configuration);
      }
    } finally {
      unlockSharedConfiguration();
    }
  }

  /**
   * Add jar information into the shared configuration and save the jars in the file system used
   * when deploying jars
   */
  public void addJarsToThisLocator(List<String> jarFullPaths, String[] groups) throws IOException {
    lockSharedConfiguration();
    try {
      if (groups == null) {
        groups = new String[] {ConfigurationPersistenceService.CLUSTER_CONFIG};
      }
      Region<String, Configuration> configRegion = getConfigurationRegion();
      for (String group : groups) {
        Configuration configuration = configRegion.get(group);

        if (configuration == null) {
          configuration = new Configuration(group);
          createConfigDirIfNecessary(group);
        }

        Path groupDir = configDirPath.resolve(group);
        Set<String> jarNames = new HashSet<>();
        for (String jarFullPath : jarFullPaths) {
          File stagedJar = new File(jarFullPath);
          jarNames.add(stagedJar.getName());
          Path filePath = groupDir.resolve(stagedJar.getName());
          FileUtils.copyFile(stagedJar, filePath.toFile());
        }

        // update the record after writing the jars to the file system, since the listener
        // will need the jars on file to upload to other locators. Need to update the jars
        // using a new copy of the Configuration so that the change listener will pick up the jar
        // name changes.
        String memberId = cache.getMyId().getId();

        Configuration configurationCopy = new Configuration(configuration);
        configurationCopy.addJarNames(jarNames);
        configRegion.put(group, configurationCopy, memberId);
      }
    } finally {
      unlockSharedConfiguration();
    }
  }

  /**
   * Removes the jar files from the shared configuration. used when undeploy jars
   *
   * @param jarNames Names of the jar files.
   * @param groups Names of the groups which had the jar file deployed.
   * @return true on success.
   */
  public boolean removeJars(final String[] jarNames, String[] groups) {
    lockSharedConfiguration();
    boolean success = true;
    try {
      Region<String, Configuration> configRegion = getConfigurationRegion();
      if (groups == null) {
        groups = configRegion.keySet().stream().toArray(String[]::new);
      }
      for (String group : groups) {
        Configuration configuration = configRegion.get(group);
        if (configuration == null) {
          break;
        }

        for (String jarRemoved : jarNames) {
          File jar = getPathToJarOnThisLocator(group, jarRemoved).toFile();
          if (jar.exists()) {
            try {
              FileUtils.forceDelete(jar);
            } catch (IOException e) {
              logger.error(
                  "Exception occurred while attempting to delete a jar from the filesystem: {}",
                  jarRemoved, e);
            }
          }
        }

        Configuration configurationCopy = new Configuration(configuration);
        configurationCopy.removeJarNames(jarNames);
        configRegion.put(group, configurationCopy);
      }
    } catch (Exception e) {
      logger.info("Exception occurred while deleting the jar files", e);
      success = false;
    } finally {
      unlockSharedConfiguration();
    }
    return success;
  }

  // Only used when a locator is initially starting up
  public void downloadJarFromOtherLocators(String groupName, String jarName)
      throws IllegalStateException, IOException {
    logger.info("Getting Jar files from other locators");
    DistributionManager dm = cache.getDistributionManager();
    DistributedMember me = cache.getMyId();
    List<DistributedMember> locators =
        new ArrayList<>(dm.getAllHostedLocatorsWithSharedConfiguration().keySet());
    locators.remove(me);

    createConfigDirIfNecessary(groupName);

    if (locators.isEmpty()) {
      throw new IllegalStateException(
          "Request to download jar " + jarName + " but no other locators are present");
    }

    downloadJarFromLocator(groupName, jarName, locators.get(0));
  }

  // used in the cluster config change listener when jarnames are changed in the internal region
  public void downloadJarFromLocator(String groupName, String jarName,
      DistributedMember sourceLocator) throws IllegalStateException, IOException {
    logger.info("Downloading jar {} from locator {}", jarName, sourceLocator.getName());

    createConfigDirIfNecessary(groupName);

    File jarFile = downloadJar(sourceLocator, groupName, jarName);

    File jarToWrite = getPathToJarOnThisLocator(groupName, jarName).toFile();
    Files.copy(jarFile.toPath(), jarToWrite.toPath(), StandardCopyOption.REPLACE_EXISTING);
  }

  /**
   * Retrieve a deployed jar from a locator. The retrieved file is staged in a temporary location.
   *
   * @param locator the DistributedMember
   * @param groupName the group to use when retrieving the jar
   * @param jarName the name of the deployed jar
   * @return a File referencing the downloaded jar. The File is downloaded to a temporary location.
   */
  public File downloadJar(DistributedMember locator, String groupName, String jarName)
      throws IOException {
    ClusterConfigurationLoader loader = new ClusterConfigurationLoader();
    return loader.downloadJar(locator, groupName, jarName);
  }

  /**
   * Creates the shared configuration service
   *
   * @param loadSharedConfigFromDir when set to true, loads the configuration from the share_config
   *        directory
   */
  void initSharedConfiguration(boolean loadSharedConfigFromDir) throws IOException {
    status.set(SharedConfigurationStatus.STARTED);
    Region<String, Configuration> configRegion = getConfigurationRegion();
    lockSharedConfiguration();
    try {
      removeInvalidXmlConfigurations(configRegion);
      if (loadSharedConfigFromDir) {
        logger.info("Reading cluster configuration from '{}' directory",
            InternalConfigurationPersistenceService.CLUSTER_CONFIG_ARTIFACTS_DIR_NAME);
        loadSharedConfigurationFromDir(configDirPath.toFile());
      } else {
        persistSecuritySettings(configRegion);
        // for those groups that have jar files, need to download the jars from other locators
        // if it doesn't exist yet
        for (Entry<String, Configuration> stringConfigurationEntry : configRegion.entrySet()) {
          Configuration config = stringConfigurationEntry.getValue();
          for (String jar : config.getJarNames()) {
            if (!getPathToJarOnThisLocator(stringConfigurationEntry.getKey(), jar).toFile()
                .exists()) {
              downloadJarFromOtherLocators(stringConfigurationEntry.getKey(), jar);
            }
          }
        }
      }
    } finally {
      unlockSharedConfiguration();
    }

    status.set(SharedConfigurationStatus.RUNNING);
  }

  void removeInvalidXmlConfigurations(Region<String, Configuration> configRegion)
      throws IOException {
    for (Map.Entry<String, Configuration> entry : configRegion.entrySet()) {
      String group = entry.getKey();
      Configuration configuration = entry.getValue();
      String configurationXml = configuration.getCacheXmlContent();
      if (configurationXml != null && !configurationXml.isEmpty()) {
        try {
          Document document = XmlUtils.createDocumentFromXml(configurationXml);
          boolean removedInvalidReceivers = removeInvalidGatewayReceivers(document);
          boolean removedDuplicateReceivers = removeDuplicateGatewayReceivers(document);
          if (removedInvalidReceivers || removedDuplicateReceivers) {
            configuration.setCacheXmlContent(XmlUtils.prettyXml(document));
            configRegion.put(group, configuration);
          }
        } catch (SAXException | TransformerException | ParserConfigurationException e) {
          throw new IOException("Unable to parse existing cluster configuration from disk. ", e);
        }
      }
    }
  }

  boolean removeInvalidGatewayReceivers(Document document) throws TransformerException {
    boolean modified = false;
    NodeList receiverNodes = document.getElementsByTagName("gateway-receiver");
    for (int i = receiverNodes.getLength() - 1; i >= 0; i--) {
      Element receiverElement = (Element) receiverNodes.item(i);

      // Check hostname-for-senders
      String hostNameForSenders = receiverElement.getAttribute("hostname-for-senders");
      if (StringUtils.isNotBlank(hostNameForSenders)) {
        receiverElement.getParentNode().removeChild(receiverElement);
        logger.info("Removed invalid cluster configuration gateway-receiver element="
            + XmlUtils.prettyXml(receiverElement));
        modified = true;
      }

      // Check bind-address
      String bindAddress = receiverElement.getAttribute("bind-address");
      if (StringUtils.isNotBlank(bindAddress) && !bindAddress.equals("0.0.0.0")) {
        receiverElement.getParentNode().removeChild(receiverElement);
        logger.info("Removed invalid cluster configuration gateway-receiver element="
            + XmlUtils.prettyXml(receiverElement));
        modified = true;
      }
    }
    return modified;
  }

  boolean removeDuplicateGatewayReceivers(Document document) throws TransformerException {
    boolean modified = false;
    NodeList receiverNodes = document.getElementsByTagName("gateway-receiver");
    while (receiverNodes.getLength() > 1) {
      Element receiverElement = (Element) receiverNodes.item(0);
      receiverElement.getParentNode().removeChild(receiverElement);
      logger.info("Removed duplicate cluster configuration gateway-receiver element="
          + XmlUtils.prettyXml(receiverElement));
      modified = true;
      receiverNodes = document.getElementsByTagName("gateway-receiver");
    }
    return modified;
  }

  private void persistSecuritySettings(final Region<String, Configuration> configRegion) {
    Properties securityProps = cache.getDistributedSystem().getSecurityProperties();

    Configuration clusterPropertiesConfig =
        configRegion.get(ConfigurationPersistenceService.CLUSTER_CONFIG);
    if (clusterPropertiesConfig == null) {
      clusterPropertiesConfig = new Configuration(ConfigurationPersistenceService.CLUSTER_CONFIG);
      configRegion.put(ConfigurationPersistenceService.CLUSTER_CONFIG, clusterPropertiesConfig);
    }
    // put security-manager and security-post-processor in the cluster config
    Properties clusterProperties = clusterPropertiesConfig.getGemfireProperties();

    if (securityProps.containsKey(SECURITY_MANAGER)) {
      clusterProperties.setProperty(SECURITY_MANAGER, securityProps.getProperty(SECURITY_MANAGER));
    }
    if (securityProps.containsKey(SECURITY_POST_PROCESSOR)) {
      clusterProperties.setProperty(SECURITY_POST_PROCESSOR,
          securityProps.getProperty(SECURITY_POST_PROCESSOR));
    }
  }

  /**
   * Creates a ConfigurationResponse based on the configRequest, configuration response contains the
   * requested shared configuration This method locks the ConfigurationPersistenceService
   */
  public ConfigurationResponse createConfigurationResponse(Set<String> groups) {
    ConfigurationResponse configResponse = null;

    boolean isLocked = lockSharedConfiguration();
    try {
      if (isLocked) {
        configResponse = new ConfigurationResponse();
        groups.add(ConfigurationPersistenceService.CLUSTER_CONFIG);
        logger.info("Building up configuration response with following configurations: {}", groups);

        for (String group : groups) {
          Configuration configuration = getConfiguration(group);
          configResponse.addConfiguration(configuration);
          if (configuration != null) {
            configResponse.addJar(group, configuration.getJarNames());
          }
        }

        return configResponse;
      }
    } finally {
      unlockSharedConfiguration();
    }
    return configResponse;
  }

  /**
   * Create a response containing the status of the Shared configuration and information about other
   * locators containing newer shared configuration data (if at all)
   *
   * @return {@link SharedConfigurationStatusResponse} containing the
   *         {@link SharedConfigurationStatus}
   */
  SharedConfigurationStatusResponse createStatusResponse() {
    SharedConfigurationStatusResponse response = new SharedConfigurationStatusResponse();
    response.setStatus(getStatus());
    response.addWaitingLocatorInfo(newerSharedConfigurationLocatorInfo);
    return response;
  }

  /**
   * For tests only. TODO: clean this up and remove from production code
   * <p>
   * Throws {@code AssertionError} wrapping any exception thrown by operation.
   */
  public void destroySharedConfiguration() {
    try {
      Region<String, Configuration> configRegion = getConfigurationRegion();
      if (configRegion != null) {
        configRegion.destroyRegion();
      }
      DiskStore configDiskStore = cache.findDiskStore(CLUSTER_CONFIG_ARTIFACTS_DIR_NAME);
      if (configDiskStore != null) {
        configDiskStore.destroy();
      }
      FileUtils.deleteDirectory(configDirPath.toFile());
    } catch (Exception exception) {
      throw new AssertionError(exception);
    }
  }

  public Path getPathToJarOnThisLocator(String groupName, String jarName) {
    return configDirPath.resolve(groupName).resolve(jarName);
  }

  public Configuration getConfiguration(String groupName) {
    return getConfigurationRegion().get(groupName);
  }

  public void setConfiguration(String groupName, Configuration configuration) {
    getConfigurationRegion().put(groupName, configuration);
  }

  public boolean hasXmlConfiguration() {
    Region<String, Configuration> configRegion = getConfigurationRegion();
    return configRegion.values().stream().anyMatch(c -> c.getCacheXmlContent() != null);
  }

  public Map<String, Configuration> getEntireConfiguration() {
    Set<String> keys = getConfigurationRegion().keySet();
    return getConfigurationRegion().getAll(keys);
  }

  public Set<String> getGroups() {
    return getConfigurationRegion().keySet();
  }

  public Path getClusterConfigDirPath() {
    return configDirPath;
  }

  /**
   * Gets the current status of the ConfigurationPersistenceService If the status is started , it
   * determines if the shared configuration is waiting for new configuration on other locators
   *
   * @return {@link SharedConfigurationStatus}
   */
  public SharedConfigurationStatus getStatus() {
    SharedConfigurationStatus scStatus = status.get();
    if (scStatus == SharedConfigurationStatus.STARTED) {
      PersistentMemberManager pmm = cache.getPersistentMemberManager();
      Map<String, Set<PersistentMemberID>> waitingRegions = pmm.getWaitingRegions();
      if (!waitingRegions.isEmpty()) {
        status.compareAndSet(SharedConfigurationStatus.STARTED,
            SharedConfigurationStatus.WAITING);
        Set<PersistentMemberID> persMemIds =
            waitingRegions.get(Region.SEPARATOR_CHAR + CONFIG_REGION_NAME);
        for (PersistentMemberID persMemId : persMemIds) {
          newerSharedConfigurationLocatorInfo.add(new PersistentMemberPattern(persMemId));
        }
      }
    }
    return status.get();
  }

  // configDir is the dir that has all the groups structure underneath it.
  public void loadSharedConfigurationFromDir(File configDir) throws IOException {
    lockSharedConfiguration();
    try {
      File[] groupNames = configDir.listFiles((FileFilter) DirectoryFileFilter.INSTANCE);
      boolean needToCopyJars = true;
      if (configDir.getAbsolutePath().equals(configDirPath)) {
        needToCopyJars = false;
      }

      logger.info("loading the cluster configuration: ");
      Map<String, Configuration> sharedConfiguration = new HashMap<>();
      for (File groupName : groupNames) {
        Configuration configuration = readConfiguration(groupName);
        logger.info(configuration.getConfigName() + " xml content: \n"
            + configuration.getCacheXmlContent());
        logger.info(configuration.getConfigName() + " properties: "
            + configuration.getGemfireProperties().size());
        logger.info(configuration.getConfigName() + " jars: "
            + Strings.join(configuration.getJarNames(), ", "));
        sharedConfiguration.put(groupName.getName(), configuration);
        if (needToCopyJars && configuration.getJarNames().size() > 0) {
          Path groupDirPath = createConfigDirIfNecessary(configuration.getConfigName()).toPath();
          for (String jarName : configuration.getJarNames()) {
            Files.copy(groupName.toPath().resolve(jarName), groupDirPath.resolve(jarName));
          }
        }
      }
      Region<String, Configuration> clusterRegion = getConfigurationRegion();
      clusterRegion.clear();

      String memberId = cache.getMyId().getId();
      clusterRegion.putAll(sharedConfiguration, memberId);

      // Overwrite the security settings using the locator's properties, ignoring whatever
      // in the import
      persistSecuritySettings(clusterRegion);
    } finally {
      unlockSharedConfiguration();
    }
  }

  // Write the content of xml and properties into the file system for exporting purpose
  public void writeConfigToFile(final Configuration configuration, File rootDir)
      throws IOException {
    File configDir = createConfigDirIfNecessary(rootDir, configuration.getConfigName());

    File propsFile = new File(configDir, configuration.getPropertiesFileName());
    BufferedWriter bw = new BufferedWriter(new FileWriter(propsFile));
    configuration.getGemfireProperties().store(bw, null);
    bw.close();

    File xmlFile = new File(configDir, configuration.getCacheXmlFileName());
    FileUtils.writeStringToFile(xmlFile, configuration.getCacheXmlContent(), "UTF-8");

    // copy the jars if the rootDir is different than the configDirPath
    if (rootDir.getAbsolutePath().equals(configDirPath)) {
      return;
    }

    File locatorConfigDir = configDirPath.resolve(configuration.getConfigName()).toFile();
    if (locatorConfigDir.exists()) {
      File[] jarFiles = locatorConfigDir.listFiles(x -> x.getName().endsWith(".jar"));
      for (File file : jarFiles) {
        Files.copy(file.toPath(), configDir.toPath().resolve(file.getName()));
      }
    }
  }

  public boolean lockSharedConfiguration() {
    return sharedConfigLockingService.lock(SHARED_CONFIG_LOCK_NAME, -1, -1);
  }

  public void unlockSharedConfiguration() {
    sharedConfigLockingService.unlock(SHARED_CONFIG_LOCK_NAME);
  }

  /**
   * Gets the region containing the shared configuration data. The region is created , if it does
   * not exist already. Note : this could block if this locator contains stale persistent
   * configuration data.
   *
   * @return {@link Region} ConfigurationRegion, this should never be null
   */
  public Region<String, Configuration> getConfigurationRegion() {
    Region<String, Configuration> configRegion = cache.getRegion(CONFIG_REGION_NAME);

    try {
      if (configRegion == null) {
        File diskDir = configDiskDirPath.toFile();

        if (!diskDir.exists() && !diskDir.mkdirs()) {
          throw new IOException("Cannot create directory at " + configDiskDirPath);
        }

        File[] diskDirs = {diskDir};
        cache.createDiskStoreFactory().setDiskDirs(diskDirs).setAutoCompact(true)
            .setMaxOplogSize(10).create(CLUSTER_CONFIG_DISK_STORE_NAME);

        AttributesFactory<String, Configuration> regionAttrsFactory = new AttributesFactory<>();
        regionAttrsFactory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
        regionAttrsFactory.setCacheListener(new ConfigurationChangeListener(this, cache));
        regionAttrsFactory.setDiskStoreName(CLUSTER_CONFIG_DISK_STORE_NAME);
        regionAttrsFactory.setScope(Scope.DISTRIBUTED_ACK);
        InternalRegionArguments internalArgs = new InternalRegionArguments();
        internalArgs.setIsUsedForMetaRegion(true);
        internalArgs.setMetaRegionWithTransactions(false);

        configRegion = cache.createVMRegion(CONFIG_REGION_NAME, regionAttrsFactory.create(),
            internalArgs);
      }
    } catch (RuntimeException e) {
      if (configRegion == null) {
        status.set(SharedConfigurationStatus.STOPPED);
      }
      // throw RuntimeException as is
      throw e;
    } catch (Exception e) {
      if (configRegion == null) {
        status.set(SharedConfigurationStatus.STOPPED);
      }
      // turn all other exceptions into runtime exceptions
      throw new RuntimeException("Error occurred while initializing cluster configuration", e);
    }

    return configRegion;
  }

  /**
   * Reads the configuration information from the shared configuration directory and returns a
   * {@link Configuration} object
   *
   * @return {@link Configuration}
   */
  private Configuration readConfiguration(File groupConfigDir) throws IOException {
    Configuration configuration = new Configuration(groupConfigDir.getName());
    File cacheXmlFull = new File(groupConfigDir, configuration.getCacheXmlFileName());
    File propertiesFull = new File(groupConfigDir, configuration.getPropertiesFileName());

    configuration.setCacheXmlFile(cacheXmlFull);
    configuration.setPropertiesFile(propertiesFull);

    Set<String> jarFileNames = Arrays.stream(groupConfigDir.list())
        .filter((String filename) -> filename.endsWith(".jar")).collect(Collectors.toSet());
    configuration.addJarNames(jarFileNames);
    return configuration;
  }

  /**
   * Creates a directory for this configuration if it doesn't already exist.
   */
  private File createConfigDirIfNecessary(final String configName) throws IOException {
    return createConfigDirIfNecessary(configDirPath.toFile(), configName);
  }

  private File createConfigDirIfNecessary(File clusterConfigDir, final String configName)
      throws IOException {
    if (!clusterConfigDir.exists() && !clusterConfigDir.mkdirs()) {
      throw new IOException("Cannot create directory : " + configDirPath);
    }
    Path configDirPath = clusterConfigDir.toPath().resolve(configName);

    File configDir = configDirPath.toFile();
    if (!configDir.exists() && !configDir.mkdir()) {
      throw new IOException("Cannot create directory : " + configDirPath);
    }

    return configDir;
  }

  private String generateInitialXmlContent() {
    StringWriter sw = new StringWriter();
    PrintWriter pw = new PrintWriter(sw);
    CacheXmlGenerator.generateDefault(pw);
    return sw.toString();
  }

  @Override
  public CacheConfig getCacheConfig(String group) {
    return getCacheConfig(group, false);
  }


  @Override
  public CacheConfig getCacheConfig(String group, boolean createNew) {
    if (group == null) {
      group = CLUSTER_CONFIG;
    }
    Configuration configuration = getConfiguration(group);
    if (configuration == null) {
      if (createNew) {
        return new CacheConfig(CACHE_CONFIG_VERSION);
      }
      return null;
    }
    String xmlContent = configuration.getCacheXmlContent();
    // group existed, so we should create a blank one to start with
    if (xmlContent == null || xmlContent.isEmpty()) {
      if (createNew) {
        return new CacheConfig(CACHE_CONFIG_VERSION);
      }
      return null;
    }

    return jaxbService.unMarshall(xmlContent);
  }

  @Override
  public void updateCacheConfig(String group, UnaryOperator<CacheConfig> mutator) {
    if (group == null) {
      group = CLUSTER_CONFIG;
    }
    lockSharedConfiguration();
    try {
      CacheConfig cacheConfig = getCacheConfig(group, true);
      cacheConfig = mutator.apply(cacheConfig);
      if (cacheConfig == null) {
        // mutator returns a null config, indicating no change needs to be persisted
        return;
      }
      Configuration configuration = getConfiguration(group);
      if (configuration == null) {
        configuration = new Configuration(group);
      }
      configuration.setCacheXmlContent(jaxbService.marshall(cacheConfig));
      getConfigurationRegion().put(group, configuration);
    } finally {
      unlockSharedConfiguration();
    }
  }
}
