/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.knox.gateway.services.topology.impl;

import org.apache.commons.digester3.Digester;
import org.apache.commons.digester3.binder.DigesterLoader;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.monitor.FileAlterationListener;
import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.apache.knox.gateway.GatewayMessages;
import org.apache.knox.gateway.GatewayServer;
import org.apache.knox.gateway.audit.api.Action;
import org.apache.knox.gateway.audit.api.ActionOutcome;
import org.apache.knox.gateway.audit.api.AuditServiceFactory;
import org.apache.knox.gateway.audit.api.Auditor;
import org.apache.knox.gateway.audit.api.ResourceType;
import org.apache.knox.gateway.audit.log4j.audit.AuditConstants;
import org.apache.knox.gateway.config.GatewayConfig;
import org.apache.knox.gateway.i18n.messages.MessagesFactory;
import org.apache.knox.gateway.service.definition.ServiceDefinition;
import org.apache.knox.gateway.service.definition.ServiceDefinitionChangeListener;
import org.apache.knox.gateway.services.ServiceType;
import org.apache.knox.gateway.services.ServiceLifecycleException;
import org.apache.knox.gateway.services.config.client.RemoteConfigurationRegistryClient;
import org.apache.knox.gateway.services.security.AliasService;
import org.apache.knox.gateway.services.topology.TopologyService;
import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService;
import org.apache.knox.gateway.topology.Service;
import org.apache.knox.gateway.topology.Topology;
import org.apache.knox.gateway.topology.TopologyEvent;
import org.apache.knox.gateway.topology.TopologyListener;
import org.apache.knox.gateway.topology.TopologyMonitor;
import org.apache.knox.gateway.topology.TopologyProvider;
import org.apache.knox.gateway.topology.Version;
import org.apache.knox.gateway.topology.builder.TopologyBuilder;
import org.apache.knox.gateway.topology.discovery.ClusterConfigurationMonitor;
import org.apache.knox.gateway.topology.discovery.ServiceDiscovery;
import org.apache.knox.gateway.topology.monitor.RemoteConfigurationMonitor;
import org.apache.knox.gateway.topology.monitor.RemoteConfigurationMonitorFactory;
import org.apache.knox.gateway.topology.simple.ProviderConfigurationParser;
import org.apache.knox.gateway.topology.simple.SimpleDescriptor;
import org.apache.knox.gateway.topology.simple.SimpleDescriptorFactory;
import org.apache.knox.gateway.topology.simple.SimpleDescriptorHandler;
import org.apache.knox.gateway.topology.validation.TopologyValidator;
import org.apache.knox.gateway.topology.xml.AmbariFormatXmlTopologyRules;
import org.apache.knox.gateway.topology.xml.KnoxFormatXmlTopologyRules;
import org.apache.knox.gateway.util.ServiceDefinitionsLoader;
import org.eclipse.persistence.jaxb.JAXBContextProperties;
import org.xml.sax.SAXException;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import static org.apache.commons.digester3.binder.DigesterLoader.newLoader;

public class DefaultTopologyService extends FileAlterationListenerAdaptor implements TopologyService, TopologyMonitor,
    TopologyProvider, FileFilter, FileAlterationListener, ServiceDefinitionChangeListener {

  private static final JAXBContext jaxbContext = getJAXBContext();

  private static Auditor auditor = AuditServiceFactory.getAuditService().getAuditor(
    AuditConstants.DEFAULT_AUDITOR_NAME, AuditConstants.KNOX_SERVICE_NAME,
    AuditConstants.KNOX_COMPONENT_NAME);

  private static final List<String> SUPPORTED_TOPOLOGY_FILE_EXTENSIONS = Arrays.asList("xml", "conf");

  private static GatewayMessages log = MessagesFactory.get(GatewayMessages.class);
  private static DigesterLoader digesterLoader = newLoader(new KnoxFormatXmlTopologyRules(),
      new AmbariFormatXmlTopologyRules());
  private List<FileAlterationMonitor> monitors = new ArrayList<>();
  private File topologiesDirectory;
  private File sharedProvidersDirectory;
  private File descriptorsDirectory;

  private DescriptorsMonitor descriptorsMonitor;

  private Set<TopologyListener> listeners;
  private Map<File, Topology> topologies;
  private AliasService aliasService;

  private RemoteConfigurationMonitor remoteMonitor;

  private GatewayConfig config;

  private static JAXBContext getJAXBContext() {
    String pkgName = Topology.class.getPackage().getName();
    String bindingFile = pkgName.replace(".", "/") + "/topology_binding-xml.xml";

    Map<String, Object> properties = new HashMap<>(1);
    properties.put(JAXBContextProperties.OXM_METADATA_SOURCE, bindingFile);
    try {
      return JAXBContext.newInstance(pkgName, Topology.class.getClassLoader(), properties);
    } catch (JAXBException e) {
      throw new IllegalStateException(e);
    }
  }

  private Topology loadTopology(File file) throws IOException, SAXException, InterruptedException {
    final long TIMEOUT = 250; //ms
    final long DELAY = 50; //ms
    log.loadingTopologyFile(file.getAbsolutePath());
    Topology topology;
    long start = System.currentTimeMillis();
    while (true) {
      try {
        topology = loadTopologyAttempt(file);
        break;
      } catch (IOException | SAXException e) {
        if (System.currentTimeMillis() - start < TIMEOUT) {
          log.failedToLoadTopologyRetrying(file.getAbsolutePath(), Long.toString(DELAY), e);
          Thread.sleep(DELAY);
        } else {
          throw e;
        }
      }
    }
    return topology;
  }

  private Topology loadTopologyAttempt(File file) throws IOException, SAXException {
    Topology topology;
    Digester digester = digesterLoader.newDigester();
    TopologyBuilder topologyBuilder = digester.parse(FileUtils.openInputStream(file));
    if (null == topologyBuilder) {
      return null;
    }
    topology = topologyBuilder.build();
    topology.setUri(file.toURI());
    topology.setName(FilenameUtils.removeExtension(file.getName()));
    topology.setTimestamp(file.lastModified());
    return topology;
  }

  private void redeployTopology(Topology topology) {
    File topologyFile = new File(topology.getUri());
    try {
      TopologyValidator tv = new TopologyValidator(topology);

      if(!tv.validateTopology()) {
        if(config != null && config.isTopologyValidationEnabled()) {
          /* If strict validation enabled we fail */
          throw new SAXException(tv.getErrorString());
        } else {
          /* Log and move on */
          log.failedToValidateTopology(topology.getName(), tv.getErrorString());
        }
      }

      long start = System.currentTimeMillis();
      long limit = 1000L; // One second.
      long elapsed = 1;
      while (elapsed <= limit) {
        try {
          long origTimestamp = topologyFile.lastModified();
          long setTimestamp = Math.max(System.currentTimeMillis(), topologyFile.lastModified() + elapsed);
          if(topologyFile.setLastModified(setTimestamp)) {
            long newTimstamp = topologyFile.lastModified();
            if(newTimstamp > origTimestamp) {
              break;
            } else {
              Thread.sleep(10);
              elapsed = System.currentTimeMillis() - start;
              continue;
            }
          } else {
            auditor.audit(Action.REDEPLOY, topology.getName(), ResourceType.TOPOLOGY,
                ActionOutcome.FAILURE);
            log.failedToRedeployTopology(topology.getName());
            break;
          }
        } catch (InterruptedException e) {
          auditor.audit(Action.REDEPLOY, topology.getName(), ResourceType.TOPOLOGY, ActionOutcome.FAILURE);
          log.failedToRedeployTopology(topology.getName(), e);
          Thread.currentThread().interrupt();
        }
      }
    } catch (SAXException e) {
      auditor.audit(Action.REDEPLOY, topology.getName(), ResourceType.TOPOLOGY, ActionOutcome.FAILURE);
      log.failedToRedeployTopology(topology.getName(), e);
    }
  }

  private List<TopologyEvent> createChangeEvents(
      Map<File, Topology> oldTopologies,
      Map<File, Topology> newTopologies) {
    ArrayList<TopologyEvent> events = new ArrayList<>();
    // Go through the old topologies and find anything that was deleted.
    for (Entry<File, Topology> oldTopology : oldTopologies.entrySet()) {
      if (!newTopologies.containsKey(oldTopology.getKey())) {
        events.add(new TopologyEvent(TopologyEvent.Type.DELETED, oldTopology.getValue()));
      }
    }
    // Go through the new topologies and figure out what was updated vs added.
    for (Entry<File, Topology> newTopology : newTopologies.entrySet()) {
      if (oldTopologies.containsKey(newTopology.getKey())) {
        Topology oldTopology = oldTopologies.get(newTopology.getKey());
        if (newTopology.getValue().getTimestamp() > oldTopology.getTimestamp()) {
          events.add(new TopologyEvent(TopologyEvent.Type.UPDATED, newTopology.getValue()));
        }
      } else {
        events.add(new TopologyEvent(TopologyEvent.Type.CREATED, newTopology.getValue()));
      }
    }
    return events;
  }

  private File calculateAbsoluteTopologiesDir(GatewayConfig config) {
    File topoDir = new File(config.getGatewayTopologyDir());
    topoDir = topoDir.getAbsoluteFile();
    return topoDir;
  }

  private File calculateAbsoluteConfigDir(GatewayConfig config) {
    File configDir;

    String path = config.getGatewayConfDir();
    configDir = (path != null) ? new File(path) : (new File(config.getGatewayTopologyDir())).getParentFile();

    return configDir.getAbsoluteFile();
  }

  private void initListener(FileAlterationMonitor  monitor,
                            File                   directory,
                            FileFilter             filter,
                            FileAlterationListener listener) {
    monitors.add(monitor);
    FileAlterationObserver observer = new FileAlterationObserver(directory, filter);
    observer.addListener(listener);
    monitor.addObserver(observer);
  }

  private void initListener(File directory, FileFilter filter, FileAlterationListener listener) {
    // Increasing the monitoring interval to 5 seconds as profiling has shown
    // this is rather expensive in terms of generated garbage objects.
    initListener(new FileAlterationMonitor(5000L), directory, filter, listener);
  }

  private Map<File, Topology> loadTopologies(File directory) {
    Map<File, Topology> map = new HashMap<>();
    if (directory.isDirectory() && directory.canRead()) {
      File[] existingTopologies = directory.listFiles(this);
      if (existingTopologies != null) {
        for (File file : existingTopologies) {
          try {
            Topology loadTopology = loadTopology(file);
            if (null != loadTopology) {
              map.put(file, loadTopology);
            } else {
              auditor.audit(Action.LOAD, file.getAbsolutePath(), ResourceType.TOPOLOGY,
                      ActionOutcome.FAILURE);
              log.failedToLoadTopology(file.getAbsolutePath());
            }
          } catch (Exception e) {
            // Maybe it makes sense to throw exception
            auditor.audit(Action.LOAD, file.getAbsolutePath(), ResourceType.TOPOLOGY,
                    ActionOutcome.FAILURE);
            log.failedToLoadTopology(file.getAbsolutePath(), e);
          }
        }
      }
    }
    return map;
  }

  public void setAliasService(AliasService as) {
    this.aliasService = as;
  }

  @Override
  public void deployTopology(Topology t){

    try {
      File temp = new File(topologiesDirectory.getAbsolutePath() + "/" + t.getName() + ".xml.temp");
      Marshaller mr = jaxbContext.createMarshaller();

      mr.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
      mr.marshal(t, temp);

      File topology = new File(topologiesDirectory.getAbsolutePath() + "/" + t.getName() + ".xml");
      if(!temp.renameTo(topology)) {
        FileUtils.forceDelete(temp);
        throw new IOException("Could not rename temp file");
      }

      // This code will check if the topology is valid, and retrieve the errors if it is not.
      TopologyValidator validator = new TopologyValidator( topology.getAbsolutePath() );
      if( !validator.validateTopology() ){
        throw new SAXException( validator.getErrorString() );
      }


    } catch (JAXBException | SAXException | IOException e) {
      auditor.audit(Action.DEPLOY, t.getName(), ResourceType.TOPOLOGY, ActionOutcome.FAILURE);
      log.failedToDeployTopology(t.getName(), e);
    }
    reloadTopologies();
  }

  @Override
  public void redeployTopologies(String topologyName) {

    for (Topology topology : getTopologies()) {
      if (topologyName == null || topologyName.equals(topology.getName())) {
        redeployTopology(topology);
      }
    }

  }

  @Override
  public void reloadTopologies() {
    try {
      synchronized (this) {
        Map<File, Topology> oldTopologies = topologies;
        Map<File, Topology> newTopologies = loadTopologies(topologiesDirectory);
        List<TopologyEvent> events = createChangeEvents(oldTopologies, newTopologies);
        topologies = newTopologies;
        notifyChangeListeners(events);
      }
    } catch (Exception e) {
      // Maybe it makes sense to throw exception
      log.failedToReloadTopologies(e);
    }
  }

  @Override
  public void deleteTopology(Topology t) {
    File topoDir = topologiesDirectory;

    if(topoDir.isDirectory() && topoDir.canRead()) {
      for (File f : listFiles(topoDir)) {
        String fName = FilenameUtils.getBaseName(f.getName());
        if(fName.equals(t.getName())) {
          f.delete();
        }
      }
    }
    reloadTopologies();
  }

  private void notifyChangeListeners(List<TopologyEvent> events) {
    for (TopologyListener listener : listeners) {
      try {
        listener.handleTopologyEvent(events);
      } catch (RuntimeException e) {
        auditor.audit(Action.LOAD, "Topology_Event", ResourceType.TOPOLOGY, ActionOutcome.FAILURE);
        log.failedToHandleTopologyEvents(e);
      }
    }
  }

  @Override
  public Map<String, List<String>> getServiceTestURLs(Topology t, GatewayConfig config) {
    File tFile = null;
    Map<String, List<String>> urls = new HashMap<>();
    if (topologiesDirectory.isDirectory() && topologiesDirectory.canRead()) {
      for (File f : listFiles(topologiesDirectory)) {
        if (FilenameUtils.removeExtension(f.getName()).equals(t.getName())) {
          tFile = f;
        }
      }
    }
    Set<ServiceDefinition> defs;
    if(tFile != null) {
      defs = ServiceDefinitionsLoader.getServiceDefinitions(new File(config.getGatewayServicesDir()));

      for(ServiceDefinition def : defs) {
        urls.put(def.getRole(), def.getTestURLs());
      }
    }
    return urls;
  }

  @Override
  public Collection<Topology> getTopologies() {
    Map<File, Topology> map = topologies;
    return Collections.unmodifiableCollection(map.values());
  }

  @Override
  public boolean deployProviderConfiguration(String name, String content) {
    boolean result;

    // Whether the remote configuration registry is being employed or not, write the file locally
    result =  writeConfig(sharedProvidersDirectory, name, content);

    // If the remote configuration registry is being employed, persist it there also
    if (remoteMonitor != null) {
      RemoteConfigurationRegistryClient client = remoteMonitor.getClient();
      if (client != null) {
        String entryPath = "/knox/config/shared-providers/" + name;
        client.createEntry(entryPath, content);
        result = (client.getEntryData(entryPath) != null);
      }
    }

    return result;
  }

  @Override
  public Collection<File> getProviderConfigurations() {
    List<File> providerConfigs = new ArrayList<>();
    for (File providerConfig : listFiles(sharedProvidersDirectory)) {
      if (SharedProviderConfigMonitor.SUPPORTED_EXTENSIONS.contains(FilenameUtils.getExtension(providerConfig.getName()))) {
        providerConfigs.add(providerConfig);
      }
    }
    return providerConfigs;
  }

  @Override
  public boolean deleteProviderConfiguration(String name) {
    return deleteProviderConfiguration(name, false);
  }

  @Override
  public boolean deleteProviderConfiguration(String name, boolean force) {
    boolean result = false;

    // Determine if the file exists, and if so, if there are any descriptors referencing it
    boolean hasReferences = false;
    File providerConfig = getExistingFile(sharedProvidersDirectory, name);
    if (providerConfig != null) {
      List<String> references = descriptorsMonitor.getReferencingDescriptors(providerConfig.getAbsolutePath());
      hasReferences = !references.isEmpty();
    } else {
      result = true; // If it already does NOT exist, then the delete effectively succeeded
    }

    // If the local file does not exist, or it does exist and there are NOT any referencing descriptors
    if (force || (providerConfig == null || !hasReferences)) {

      // If the remote config monitor is configured, attempt to delete the provider configuration from the remote
      // registry, even if it does not exist locally.
      deleteRemoteEntry("/knox/config/shared-providers", name);

      // Whether the remote configuration registry is being employed or not, delete the local file if it exists
      result = providerConfig == null || !providerConfig.exists() || providerConfig.delete();

    } else {
      log.preventedDeletionOfSharedProviderConfiguration(providerConfig.getAbsolutePath());
    }

    return result;
  }

  @Override
  public boolean deployDescriptor(String name, String content) {
    boolean result;

    // Whether the remote configuration registry is being employed or not, write the file locally
    result = writeConfig(descriptorsDirectory, name, content);

    // If the remote configuration registry is being employed, persist it there also
    if (remoteMonitor != null) {
      RemoteConfigurationRegistryClient client = remoteMonitor.getClient();
      if (client != null) {
        String entryPath = "/knox/config/descriptors/" + name;
        client.createEntry(entryPath, content);
        result = (client.getEntryData(entryPath) != null);
      }
    }

    return result;
  }

  @Override
  public Collection<File> getDescriptors() {
    List<File> descriptors = new ArrayList<>();
    for (File descriptor : listFiles(descriptorsDirectory)) {
      if (DescriptorsMonitor.SUPPORTED_EXTENSIONS.contains(FilenameUtils.getExtension(descriptor.getName()))) {
        descriptors.add(descriptor);
      }
    }
    return descriptors;
  }

  @Override
  public boolean deleteDescriptor(String name) {
    boolean result;

    // If the remote config monitor is configured, delete the descriptor from the remote registry
    deleteRemoteEntry("/knox/config/descriptors", name);

    // Whether the remote configuration registry is being employed or not, delete the local file
    File descriptor = getExistingFile(descriptorsDirectory, name);
    result = (descriptor == null) || descriptor.delete();

    return result;
  }

  @Override
  public void addTopologyChangeListener(TopologyListener listener) {
    listeners.add(listener);
  }

  @Override
  public void onServiceDefinitionChange(String name, String role, String version) {
    getTopologies().stream().filter(topology -> topology.getServices().stream().anyMatch(service -> isRelevantService(service, role, name, version))).forEach(topology -> {
      log.redeployingTopologyOnServiceDefinitionChange(topology.getName(), name, role, version);
      redeployTopology(topology);
    });
  }

  private boolean isRelevantService(Service service, String role, String name, String version) {
    return service.getRole().equalsIgnoreCase(role)
        && (service.getName() == null || service.getName().equalsIgnoreCase(name) && (service.getVersion() == null || service.getVersion().equals(new Version(version))));
  }

  @Override
  public void startMonitor() throws Exception {
    // Start the local configuration monitors
    for (FileAlterationMonitor monitor : monitors) {
      monitor.start();
    }

    // Start the remote configuration monitor, if it has been initialized
    if (remoteMonitor != null) {
      try {
        remoteMonitor.start();
      } catch (Exception e) {
        log.remoteConfigurationMonitorStartFailure(remoteMonitor.getClass().getTypeName(), e.getLocalizedMessage());
      }
    }
  }

  @Override
  public void stopMonitor() throws Exception {
    // Stop the local configuration monitors
    for (FileAlterationMonitor monitor : monitors) {
      monitor.stop();
    }

    // Stop the remote configuration monitor, if it has been initialized
    if (remoteMonitor != null) {
      remoteMonitor.stop();
    }
  }

  @Override
  public boolean accept(File file) {
    boolean accept = false;
    if (!file.isDirectory() && file.canRead()) {
      String extension = FilenameUtils.getExtension(file.getName());
      if (SUPPORTED_TOPOLOGY_FILE_EXTENSIONS.contains(extension)) {
        accept = true;
      }
    }
    return accept;
  }

  @Override
  public void onFileCreate(File file) {
    onFileChange(file);
  }

  @Override
  public void onFileDelete(java.io.File file) {
    onFileChange(file);
  }

  @Override
  public void onFileChange(File file) {
    reloadTopologies();
  }

  @Override
  public void stop() {

  }

  @Override
  public void start() {
    // Register a cluster configuration monitor listener for change notifications
    ClusterConfigurationMonitorService ccms =
                  GatewayServer.getGatewayServices().getService(ServiceType.CLUSTER_CONFIGURATION_MONITOR_SERVICE);
    ccms.addListener(new TopologyDiscoveryTrigger(this, ccms));
  }

  @Override
  public void init(GatewayConfig config, Map<String, String> options) throws ServiceLifecycleException {

    this.config = config;
    String gatewayConfDir = config.getGatewayConfDir();
    if (gatewayConfDir != null) {
      System.setProperty(ServiceDiscovery.CONFIG_DIR_PROPERTY, gatewayConfDir);
    }

    try {
      listeners  = new HashSet<>();
      topologies = new HashMap<>();

      topologiesDirectory = calculateAbsoluteTopologiesDir(config);

      File configDirectory = calculateAbsoluteConfigDir(config);
      descriptorsDirectory = new File(configDirectory, "descriptors");
      sharedProvidersDirectory = new File(configDirectory, "shared-providers");

      // Add support for conf/topologies
      initListener(topologiesDirectory, this, this);

      // Add support for conf/descriptors
      descriptorsMonitor = new DescriptorsMonitor(config, topologiesDirectory, aliasService);
      initListener(descriptorsDirectory,
                   descriptorsMonitor,
                   descriptorsMonitor);
      log.monitoringDescriptorChangesInDirectory(descriptorsDirectory.getAbsolutePath());

      // Add support for conf/shared-providers
      SharedProviderConfigMonitor spm = new SharedProviderConfigMonitor(descriptorsMonitor, descriptorsDirectory);
      initListener(sharedProvidersDirectory, spm, spm);
      log.monitoringProviderConfigChangesInDirectory(sharedProvidersDirectory.getAbsolutePath());

      // For all the descriptors currently in the descriptors dir at start-up time, determine if topology regeneration
      // is required.
      // This happens prior to the start-up loading of the topologies.
      String[] descriptorFilenames =  descriptorsDirectory.list();
      if (descriptorFilenames != null) {
        for (String descriptorFilename : descriptorFilenames) {
          if (DescriptorsMonitor.isDescriptorFile(descriptorFilename)) {
            String topologyName = FilenameUtils.getBaseName(descriptorFilename);
            File existingDescriptorFile = getExistingFile(descriptorsDirectory, topologyName);
            if (existingDescriptorFile != null) {
              // If there isn't a corresponding topology file, or if the descriptor has been modified since the
              // corresponding topology file was generated, then trigger generation of one
              File matchingTopologyFile = getExistingFile(topologiesDirectory, topologyName);
              if (matchingTopologyFile == null || matchingTopologyFile.lastModified() < existingDescriptorFile.lastModified()) {
                descriptorsMonitor.onFileChange(existingDescriptorFile);
              } else {
                // If regeneration is NOT required, then we at least need to report the provider configuration
                // reference relationship (KNOX-1144)
                String normalizedDescriptorPath = FilenameUtils.normalize(existingDescriptorFile.getAbsolutePath());

                // Parse the descriptor to determine the provider config reference
                SimpleDescriptor sd = SimpleDescriptorFactory.parse(normalizedDescriptorPath);
                if (sd != null) {
                  File referencedProviderConfig =
                             getExistingFile(sharedProvidersDirectory, FilenameUtils.getBaseName(sd.getProviderConfig()));
                  if (referencedProviderConfig != null) {
                    List<String> references =
                           descriptorsMonitor.getReferencingDescriptors(referencedProviderConfig.getAbsolutePath());
                    if (!references.contains(normalizedDescriptorPath)) {
                      references.add(normalizedDescriptorPath);
                    }
                  }
                }
              }
            }
          }
        }
      }

      // Initialize the remote configuration monitor, if it has been configured
      remoteMonitor = RemoteConfigurationMonitorFactory.get(config);

    } catch (IOException io) {
      throw new ServiceLifecycleException(io.getMessage(), io);
    }
  }

  /**
   * Delete the entry in the remote configuration registry, which matches the specified resource name.
   *
   * @param entryParent The remote registry path in which the entry exists.
   * @param name        The name of the entry (typically without any file extension).
   *
   * @return true, if the entry is deleted, or did not exist; otherwise, false.
   */
  private boolean deleteRemoteEntry(String entryParent, String name) {
    boolean result = true;

    if (remoteMonitor != null) {
      RemoteConfigurationRegistryClient client = remoteMonitor.getClient();
      if (client != null) {
        List<String> existingProviderConfigs = client.listChildEntries(entryParent);
        for (String entryName : existingProviderConfigs) {
          if (FilenameUtils.getBaseName(entryName).equals(name)) {
            String entryPath = entryParent + "/" + entryName;
            client.deleteEntry(entryPath);
            result = !client.entryExists(entryPath);
            if (!result) {
              log.failedToDeletedRemoteConfigFile("descriptor", name);
            }
            break;
          }
        }
      }
    }

    return result;
  }

  /**
   * Utility method for listing the files in the specified directory.
   * This method is "nicer" than the File#listFiles() because it will not return null.
   *
   * @param directory The directory whose files should be returned.
   *
   * @return A List of the Files on the directory.
   */
  private static List<File> listFiles(File directory) {
    List<File> result;
    File[] files = directory.listFiles();
    if (files != null) {
      result = Arrays.asList(files);
    } else {
      result = Collections.emptyList();
    }
    return result;
  }

  /**
   * Search for a file in the specified directory whose base name (filename without extension) matches the
   * specified basename.
   *
   * @param directory The directory in which to search.
   * @param basename  The basename of interest.
   *
   * @return The matching File
   */
  private static File getExistingFile(File directory, String basename) {
    File match = null;
    for (File file : listFiles(directory)) {
      if (FilenameUtils.getBaseName(file.getName()).equals(basename)) {
        match = file;
        break;
      }
    }
    return match;
  }

  /**
   * Write the specified content to a file.
   *
   * @param dest    The destination directory.
   * @param name    The name of the file.
   * @param content The contents of the file.
   *
   * @return true, if the write succeeds; otherwise, false.
   */
  private static boolean writeConfig(File dest, String name, String content) {
    boolean result = false;

    File destFile = new File(dest, name);
    try {
      FileUtils.writeStringToFile(destFile, content, StandardCharsets.UTF_8);
      log.wroteConfigurationFile(destFile.getAbsolutePath());
      result = true;
    } catch (IOException e) {
      log.failedToWriteConfigurationFile(destFile.getAbsolutePath(), e);
    }

    return result;
  }


  /**
   * Change handler for simple descriptors
   */
  public static class DescriptorsMonitor extends FileAlterationListenerAdaptor
                                          implements FileFilter {

    static final List<String> SUPPORTED_EXTENSIONS = new ArrayList<>();
    static {
      SUPPORTED_EXTENSIONS.add("json");
      SUPPORTED_EXTENSIONS.add("yml");
      SUPPORTED_EXTENSIONS.add("yaml");
    }

    private GatewayConfig gatewayConfig;

    private File topologiesDir;

    private AliasService aliasService;

    private Map<String, List<String>> providerConfigReferences = new HashMap<>();


    static boolean isDescriptorFile(String filename) {
      return SUPPORTED_EXTENSIONS.contains(FilenameUtils.getExtension(filename));
    }

    public DescriptorsMonitor(GatewayConfig config, File topologiesDir, AliasService aliasService) {
      this.gatewayConfig  = config;
      this.topologiesDir  = topologiesDir;
      this.aliasService   = aliasService;
    }

    List<String> getReferencingDescriptors(String providerConfigPath) {
      String normalizedPath = FilenameUtils.normalize(providerConfigPath);
      return providerConfigReferences.computeIfAbsent(normalizedPath, p -> new ArrayList<>());
    }

    @Override
    public void onFileCreate(File file) {
      onFileChange(file);
    }

    @Override
    public void onFileDelete(File file) {
      // For simple descriptors, we need to make sure to delete any corresponding full topology descriptors to trigger undeployment
      for (String ext : DefaultTopologyService.SUPPORTED_TOPOLOGY_FILE_EXTENSIONS) {
        File topologyFile =
                new File(topologiesDir, FilenameUtils.getBaseName(file.getName()) + "." + ext);
        if (topologyFile.exists()) {
          log.deletingTopologyForDescriptorDeletion(topologyFile.getName(), file.getName());
          topologyFile.delete();
        }
      }

      String normalizedFilePath = FilenameUtils.normalize(file.getAbsolutePath());
      String reference = null;
      for (Map.Entry<String, List<String>> entry : providerConfigReferences.entrySet()) {
        if (entry.getValue().contains(normalizedFilePath)) {
          reference = entry.getKey();
          break;
        }
      }

      if (reference != null) {
        providerConfigReferences.get(reference).remove(normalizedFilePath);
        log.removedProviderConfigurationReference(normalizedFilePath, reference);
      }
    }

    @Override
    public void onFileChange(File file) {
      try {
        // When a simple descriptor has been created or modified, generate the new topology descriptor
        Map<String, File> result = SimpleDescriptorHandler.handle(gatewayConfig, file, topologiesDir, aliasService);
        log.generatedTopologyForDescriptorChange(result.get(SimpleDescriptorHandler.RESULT_TOPOLOGY).getName(),
            file.getName());

        // Add the provider config reference relationship for handling updates to the provider config
        String providerConfig =
            FilenameUtils.normalize(result.get(SimpleDescriptorHandler.RESULT_REFERENCE).getAbsolutePath());
        if (!providerConfigReferences.containsKey(providerConfig)) {
          providerConfigReferences.put(providerConfig, new ArrayList<>());
        }
        List<String> refs = providerConfigReferences.get(providerConfig);
        String descriptorName = FilenameUtils.normalize(file.getAbsolutePath());
        if (!refs.contains(descriptorName)) {
          // Need to check if descriptor had previously referenced another provider config, so it can be removed
          for (List<String> descs : providerConfigReferences.values()) {
            descs.remove(descriptorName);
          }

          // Add the current reference relationship
          refs.add(descriptorName);
          log.addedProviderConfigurationReference(descriptorName, providerConfig);
        }
      } catch (IllegalArgumentException e) {
        log.simpleDescriptorHandlingError(file.getName(), e);

        // If the referenced provider configuration is invalid, remove any existing reference relationships for the
        // referencing descriptor.
        String descriptorName = FilenameUtils.normalize(file.getAbsolutePath());
        // Need to check if descriptor had previously referenced another provider config, so it can be removed
        for (List<String> descs : providerConfigReferences.values()) {
          descs.remove(descriptorName);
        }
      } catch (Exception e) {
        log.simpleDescriptorHandlingError(file.getName(), e);
      }
    }

    @Override
    public boolean accept(File file) {
      boolean accept = false;
      if (!file.isDirectory() && file.canRead()) {
        String extension = FilenameUtils.getExtension(file.getName());
        if (SUPPORTED_EXTENSIONS.contains(extension)) {
          accept = true;
        }
      }
      return accept;
    }
  }

  /**
   * Change handler for shared provider configurations
   */
  public static class SharedProviderConfigMonitor extends FileAlterationListenerAdaptor implements FileFilter {

    static final List<String> SUPPORTED_EXTENSIONS = ProviderConfigurationParser.SUPPORTED_EXTENSIONS;

    private DescriptorsMonitor descriptorsMonitor;
    private File descriptorsDir;


    SharedProviderConfigMonitor(DescriptorsMonitor descMonitor, File descriptorsDir) {
      this.descriptorsMonitor = descMonitor;
      this.descriptorsDir     = descriptorsDir;
    }

    @Override
    public void onFileCreate(File file) {
      onFileChange(file);
    }

    @Override
    public void onFileDelete(File file) {
      onFileChange(file);
    }

    @Override
    public void onFileChange(File file) {
      // For shared provider configuration, we need to update any simple descriptors that reference it
      for (File descriptor : getReferencingDescriptors(file)) {
        descriptor.setLastModified(System.currentTimeMillis());
      }
    }

    private List<File> getReferencingDescriptors(File sharedProviderConfig) {
      List<File> references = new ArrayList<>();

      for (File descriptor : listFiles(descriptorsDir)) {
        if (DescriptorsMonitor.SUPPORTED_EXTENSIONS.contains(FilenameUtils.getExtension(descriptor.getName()))) {
          for (String reference : descriptorsMonitor.getReferencingDescriptors(FilenameUtils.normalize(sharedProviderConfig.getAbsolutePath()))) {
            references.add(new File(reference));
          }
        }
      }

      return references;
    }

    @Override
    public boolean accept(File file) {
      boolean accept = false;
      if (!file.isDirectory() && file.canRead()) {
        String extension = FilenameUtils.getExtension(file.getName());
        if (SUPPORTED_EXTENSIONS.contains(extension)) {
          accept = true;
        }
      }
      return accept;
    }
  }

  /**
   * Listener for Ambari config change events, which will trigger re-generation (including re-discovery) of the
   * affected topologies.
   */
  private static class TopologyDiscoveryTrigger implements ClusterConfigurationMonitor.ConfigurationChangeListener {

    private TopologyService topologyService;
    private ClusterConfigurationMonitorService ccms;

    TopologyDiscoveryTrigger(TopologyService topologyService, ClusterConfigurationMonitorService ccms) {
      this.topologyService = topologyService;
      this.ccms = ccms;
    }

    @Override
    public void onConfigurationChange(String source, String clusterName) {
      log.noticedClusterConfigurationChange(source, clusterName);
      try {
        boolean affectedDescriptors = false;
        // Identify any descriptors associated with the cluster configuration change
        for (File descriptor : topologyService.getDescriptors()) {
          String descriptorContent = FileUtils.readFileToString(descriptor, StandardCharsets.UTF_8);
          if (descriptorContent.contains(source) && descriptorContent.contains(clusterName)) {
            affectedDescriptors = true;
            log.triggeringTopologyRegeneration(source, clusterName, descriptor.getAbsolutePath());
            // 'Touch' the descriptor to trigger re-generation of the associated topology
            descriptor.setLastModified(System.currentTimeMillis());
          }
        }

        if (!affectedDescriptors) {
          // If not descriptors are affected by this configuration, then clear the cache to prevent future notifications
          ccms.clearCache(source, clusterName);
        }
      } catch (Exception e) {
        log.errorRespondingToConfigChange(source, clusterName, e);
      }
    }
  }

}
