blob: 4a1101da36f89609aa491716304e74a32cfd45e3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.knox.gateway.services.topology.impl;
import org.apache.commons.digester3.Digester;
import org.apache.commons.digester3.binder.DigesterLoader;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.monitor.FileAlterationListener;
import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.apache.knox.gateway.GatewayMessages;
import org.apache.knox.gateway.GatewayServer;
import org.apache.knox.gateway.audit.api.Action;
import org.apache.knox.gateway.audit.api.ActionOutcome;
import org.apache.knox.gateway.audit.api.AuditServiceFactory;
import org.apache.knox.gateway.audit.api.Auditor;
import org.apache.knox.gateway.audit.api.ResourceType;
import org.apache.knox.gateway.audit.log4j.audit.AuditConstants;
import org.apache.knox.gateway.config.GatewayConfig;
import org.apache.knox.gateway.i18n.messages.MessagesFactory;
import org.apache.knox.gateway.service.definition.ServiceDefinition;
import org.apache.knox.gateway.service.definition.ServiceDefinitionChangeListener;
import org.apache.knox.gateway.services.ServiceType;
import org.apache.knox.gateway.services.ServiceLifecycleException;
import org.apache.knox.gateway.services.config.client.RemoteConfigurationRegistryClient;
import org.apache.knox.gateway.services.security.AliasService;
import org.apache.knox.gateway.services.topology.TopologyService;
import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService;
import org.apache.knox.gateway.topology.Service;
import org.apache.knox.gateway.topology.Topology;
import org.apache.knox.gateway.topology.TopologyEvent;
import org.apache.knox.gateway.topology.TopologyListener;
import org.apache.knox.gateway.topology.TopologyMonitor;
import org.apache.knox.gateway.topology.TopologyProvider;
import org.apache.knox.gateway.topology.Version;
import org.apache.knox.gateway.topology.builder.TopologyBuilder;
import org.apache.knox.gateway.topology.discovery.ClusterConfigurationMonitor;
import org.apache.knox.gateway.topology.discovery.ServiceDiscovery;
import org.apache.knox.gateway.topology.monitor.RemoteConfigurationMonitor;
import org.apache.knox.gateway.topology.monitor.RemoteConfigurationMonitorFactory;
import org.apache.knox.gateway.topology.simple.ProviderConfigurationParser;
import org.apache.knox.gateway.topology.simple.SimpleDescriptor;
import org.apache.knox.gateway.topology.simple.SimpleDescriptorFactory;
import org.apache.knox.gateway.topology.simple.SimpleDescriptorHandler;
import org.apache.knox.gateway.topology.validation.TopologyValidator;
import org.apache.knox.gateway.topology.xml.AmbariFormatXmlTopologyRules;
import org.apache.knox.gateway.topology.xml.KnoxFormatXmlTopologyRules;
import org.apache.knox.gateway.util.ServiceDefinitionsLoader;
import org.eclipse.persistence.jaxb.JAXBContextProperties;
import org.xml.sax.SAXException;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import static org.apache.commons.digester3.binder.DigesterLoader.newLoader;
public class DefaultTopologyService extends FileAlterationListenerAdaptor implements TopologyService, TopologyMonitor,
TopologyProvider, FileFilter, FileAlterationListener, ServiceDefinitionChangeListener {
private static final JAXBContext jaxbContext = getJAXBContext();
private static Auditor auditor = AuditServiceFactory.getAuditService().getAuditor(
AuditConstants.DEFAULT_AUDITOR_NAME, AuditConstants.KNOX_SERVICE_NAME,
AuditConstants.KNOX_COMPONENT_NAME);
private static final List<String> SUPPORTED_TOPOLOGY_FILE_EXTENSIONS = Arrays.asList("xml", "conf");
private static GatewayMessages log = MessagesFactory.get(GatewayMessages.class);
private static DigesterLoader digesterLoader = newLoader(new KnoxFormatXmlTopologyRules(),
new AmbariFormatXmlTopologyRules());
private List<FileAlterationMonitor> monitors = new ArrayList<>();
private File topologiesDirectory;
private File sharedProvidersDirectory;
private File descriptorsDirectory;
private DescriptorsMonitor descriptorsMonitor;
private Set<TopologyListener> listeners;
private Map<File, Topology> topologies;
private AliasService aliasService;
private RemoteConfigurationMonitor remoteMonitor;
private GatewayConfig config;
private static JAXBContext getJAXBContext() {
String pkgName = Topology.class.getPackage().getName();
String bindingFile = pkgName.replace(".", "/") + "/topology_binding-xml.xml";
Map<String, Object> properties = new HashMap<>(1);
properties.put(JAXBContextProperties.OXM_METADATA_SOURCE, bindingFile);
try {
return JAXBContext.newInstance(pkgName, Topology.class.getClassLoader(), properties);
} catch (JAXBException e) {
throw new IllegalStateException(e);
}
}
private Topology loadTopology(File file) throws IOException, SAXException, InterruptedException {
final long TIMEOUT = 250; //ms
final long DELAY = 50; //ms
log.loadingTopologyFile(file.getAbsolutePath());
Topology topology;
long start = System.currentTimeMillis();
while (true) {
try {
topology = loadTopologyAttempt(file);
break;
} catch (IOException | SAXException e) {
if (System.currentTimeMillis() - start < TIMEOUT) {
log.failedToLoadTopologyRetrying(file.getAbsolutePath(), Long.toString(DELAY), e);
Thread.sleep(DELAY);
} else {
throw e;
}
}
}
return topology;
}
private Topology loadTopologyAttempt(File file) throws IOException, SAXException {
Topology topology;
Digester digester = digesterLoader.newDigester();
TopologyBuilder topologyBuilder = digester.parse(FileUtils.openInputStream(file));
if (null == topologyBuilder) {
return null;
}
topology = topologyBuilder.build();
topology.setUri(file.toURI());
topology.setName(FilenameUtils.removeExtension(file.getName()));
topology.setTimestamp(file.lastModified());
return topology;
}
private void redeployTopology(Topology topology) {
File topologyFile = new File(topology.getUri());
try {
TopologyValidator tv = new TopologyValidator(topology);
if(!tv.validateTopology()) {
if(config != null && config.isTopologyValidationEnabled()) {
/* If strict validation enabled we fail */
throw new SAXException(tv.getErrorString());
} else {
/* Log and move on */
log.failedToValidateTopology(topology.getName(), tv.getErrorString());
}
}
long start = System.currentTimeMillis();
long limit = 1000L; // One second.
long elapsed = 1;
while (elapsed <= limit) {
try {
long origTimestamp = topologyFile.lastModified();
long setTimestamp = Math.max(System.currentTimeMillis(), topologyFile.lastModified() + elapsed);
if(topologyFile.setLastModified(setTimestamp)) {
long newTimstamp = topologyFile.lastModified();
if(newTimstamp > origTimestamp) {
break;
} else {
Thread.sleep(10);
elapsed = System.currentTimeMillis() - start;
continue;
}
} else {
auditor.audit(Action.REDEPLOY, topology.getName(), ResourceType.TOPOLOGY,
ActionOutcome.FAILURE);
log.failedToRedeployTopology(topology.getName());
break;
}
} catch (InterruptedException e) {
auditor.audit(Action.REDEPLOY, topology.getName(), ResourceType.TOPOLOGY, ActionOutcome.FAILURE);
log.failedToRedeployTopology(topology.getName(), e);
Thread.currentThread().interrupt();
}
}
} catch (SAXException e) {
auditor.audit(Action.REDEPLOY, topology.getName(), ResourceType.TOPOLOGY, ActionOutcome.FAILURE);
log.failedToRedeployTopology(topology.getName(), e);
}
}
private List<TopologyEvent> createChangeEvents(
Map<File, Topology> oldTopologies,
Map<File, Topology> newTopologies) {
ArrayList<TopologyEvent> events = new ArrayList<>();
// Go through the old topologies and find anything that was deleted.
for (Entry<File, Topology> oldTopology : oldTopologies.entrySet()) {
if (!newTopologies.containsKey(oldTopology.getKey())) {
events.add(new TopologyEvent(TopologyEvent.Type.DELETED, oldTopology.getValue()));
}
}
// Go through the new topologies and figure out what was updated vs added.
for (Entry<File, Topology> newTopology : newTopologies.entrySet()) {
if (oldTopologies.containsKey(newTopology.getKey())) {
Topology oldTopology = oldTopologies.get(newTopology.getKey());
if (newTopology.getValue().getTimestamp() > oldTopology.getTimestamp()) {
events.add(new TopologyEvent(TopologyEvent.Type.UPDATED, newTopology.getValue()));
}
} else {
events.add(new TopologyEvent(TopologyEvent.Type.CREATED, newTopology.getValue()));
}
}
return events;
}
private File calculateAbsoluteTopologiesDir(GatewayConfig config) {
File topoDir = new File(config.getGatewayTopologyDir());
topoDir = topoDir.getAbsoluteFile();
return topoDir;
}
private File calculateAbsoluteConfigDir(GatewayConfig config) {
File configDir;
String path = config.getGatewayConfDir();
configDir = (path != null) ? new File(path) : (new File(config.getGatewayTopologyDir())).getParentFile();
return configDir.getAbsoluteFile();
}
private void initListener(FileAlterationMonitor monitor,
File directory,
FileFilter filter,
FileAlterationListener listener) {
monitors.add(monitor);
FileAlterationObserver observer = new FileAlterationObserver(directory, filter);
observer.addListener(listener);
monitor.addObserver(observer);
}
private void initListener(File directory, FileFilter filter, FileAlterationListener listener) {
// Increasing the monitoring interval to 5 seconds as profiling has shown
// this is rather expensive in terms of generated garbage objects.
initListener(new FileAlterationMonitor(5000L), directory, filter, listener);
}
private Map<File, Topology> loadTopologies(File directory) {
Map<File, Topology> map = new HashMap<>();
if (directory.isDirectory() && directory.canRead()) {
File[] existingTopologies = directory.listFiles(this);
if (existingTopologies != null) {
for (File file : existingTopologies) {
try {
Topology loadTopology = loadTopology(file);
if (null != loadTopology) {
map.put(file, loadTopology);
} else {
auditor.audit(Action.LOAD, file.getAbsolutePath(), ResourceType.TOPOLOGY,
ActionOutcome.FAILURE);
log.failedToLoadTopology(file.getAbsolutePath());
}
} catch (Exception e) {
// Maybe it makes sense to throw exception
auditor.audit(Action.LOAD, file.getAbsolutePath(), ResourceType.TOPOLOGY,
ActionOutcome.FAILURE);
log.failedToLoadTopology(file.getAbsolutePath(), e);
}
}
}
}
return map;
}
public void setAliasService(AliasService as) {
this.aliasService = as;
}
@Override
public void deployTopology(Topology t){
try {
File temp = new File(topologiesDirectory.getAbsolutePath() + "/" + t.getName() + ".xml.temp");
Marshaller mr = jaxbContext.createMarshaller();
mr.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
mr.marshal(t, temp);
File topology = new File(topologiesDirectory.getAbsolutePath() + "/" + t.getName() + ".xml");
if(!temp.renameTo(topology)) {
FileUtils.forceDelete(temp);
throw new IOException("Could not rename temp file");
}
// This code will check if the topology is valid, and retrieve the errors if it is not.
TopologyValidator validator = new TopologyValidator( topology.getAbsolutePath() );
if( !validator.validateTopology() ){
throw new SAXException( validator.getErrorString() );
}
} catch (JAXBException | SAXException | IOException e) {
auditor.audit(Action.DEPLOY, t.getName(), ResourceType.TOPOLOGY, ActionOutcome.FAILURE);
log.failedToDeployTopology(t.getName(), e);
}
reloadTopologies();
}
@Override
public void redeployTopologies(String topologyName) {
for (Topology topology : getTopologies()) {
if (topologyName == null || topologyName.equals(topology.getName())) {
redeployTopology(topology);
}
}
}
@Override
public void reloadTopologies() {
try {
synchronized (this) {
Map<File, Topology> oldTopologies = topologies;
Map<File, Topology> newTopologies = loadTopologies(topologiesDirectory);
List<TopologyEvent> events = createChangeEvents(oldTopologies, newTopologies);
topologies = newTopologies;
notifyChangeListeners(events);
}
} catch (Exception e) {
// Maybe it makes sense to throw exception
log.failedToReloadTopologies(e);
}
}
@Override
public void deleteTopology(Topology t) {
File topoDir = topologiesDirectory;
if(topoDir.isDirectory() && topoDir.canRead()) {
for (File f : listFiles(topoDir)) {
String fName = FilenameUtils.getBaseName(f.getName());
if(fName.equals(t.getName())) {
f.delete();
}
}
}
reloadTopologies();
}
private void notifyChangeListeners(List<TopologyEvent> events) {
for (TopologyListener listener : listeners) {
try {
listener.handleTopologyEvent(events);
} catch (RuntimeException e) {
auditor.audit(Action.LOAD, "Topology_Event", ResourceType.TOPOLOGY, ActionOutcome.FAILURE);
log.failedToHandleTopologyEvents(e);
}
}
}
@Override
public Map<String, List<String>> getServiceTestURLs(Topology t, GatewayConfig config) {
File tFile = null;
Map<String, List<String>> urls = new HashMap<>();
if (topologiesDirectory.isDirectory() && topologiesDirectory.canRead()) {
for (File f : listFiles(topologiesDirectory)) {
if (FilenameUtils.removeExtension(f.getName()).equals(t.getName())) {
tFile = f;
}
}
}
Set<ServiceDefinition> defs;
if(tFile != null) {
defs = ServiceDefinitionsLoader.getServiceDefinitions(new File(config.getGatewayServicesDir()));
for(ServiceDefinition def : defs) {
urls.put(def.getRole(), def.getTestURLs());
}
}
return urls;
}
@Override
public Collection<Topology> getTopologies() {
Map<File, Topology> map = topologies;
return Collections.unmodifiableCollection(map.values());
}
@Override
public boolean deployProviderConfiguration(String name, String content) {
boolean result;
// Whether the remote configuration registry is being employed or not, write the file locally
result = writeConfig(sharedProvidersDirectory, name, content);
// If the remote configuration registry is being employed, persist it there also
if (remoteMonitor != null) {
RemoteConfigurationRegistryClient client = remoteMonitor.getClient();
if (client != null) {
String entryPath = "/knox/config/shared-providers/" + name;
client.createEntry(entryPath, content);
result = (client.getEntryData(entryPath) != null);
}
}
return result;
}
@Override
public Collection<File> getProviderConfigurations() {
List<File> providerConfigs = new ArrayList<>();
for (File providerConfig : listFiles(sharedProvidersDirectory)) {
if (SharedProviderConfigMonitor.SUPPORTED_EXTENSIONS.contains(FilenameUtils.getExtension(providerConfig.getName()))) {
providerConfigs.add(providerConfig);
}
}
return providerConfigs;
}
@Override
public boolean deleteProviderConfiguration(String name) {
return deleteProviderConfiguration(name, false);
}
@Override
public boolean deleteProviderConfiguration(String name, boolean force) {
boolean result = false;
// Determine if the file exists, and if so, if there are any descriptors referencing it
boolean hasReferences = false;
File providerConfig = getExistingFile(sharedProvidersDirectory, name);
if (providerConfig != null) {
List<String> references = descriptorsMonitor.getReferencingDescriptors(providerConfig.getAbsolutePath());
hasReferences = !references.isEmpty();
} else {
result = true; // If it already does NOT exist, then the delete effectively succeeded
}
// If the local file does not exist, or it does exist and there are NOT any referencing descriptors
if (force || (providerConfig == null || !hasReferences)) {
// If the remote config monitor is configured, attempt to delete the provider configuration from the remote
// registry, even if it does not exist locally.
deleteRemoteEntry("/knox/config/shared-providers", name);
// Whether the remote configuration registry is being employed or not, delete the local file if it exists
result = providerConfig == null || !providerConfig.exists() || providerConfig.delete();
} else {
log.preventedDeletionOfSharedProviderConfiguration(providerConfig.getAbsolutePath());
}
return result;
}
@Override
public boolean deployDescriptor(String name, String content) {
boolean result;
// Whether the remote configuration registry is being employed or not, write the file locally
result = writeConfig(descriptorsDirectory, name, content);
// If the remote configuration registry is being employed, persist it there also
if (remoteMonitor != null) {
RemoteConfigurationRegistryClient client = remoteMonitor.getClient();
if (client != null) {
String entryPath = "/knox/config/descriptors/" + name;
client.createEntry(entryPath, content);
result = (client.getEntryData(entryPath) != null);
}
}
return result;
}
@Override
public Collection<File> getDescriptors() {
List<File> descriptors = new ArrayList<>();
for (File descriptor : listFiles(descriptorsDirectory)) {
if (DescriptorsMonitor.SUPPORTED_EXTENSIONS.contains(FilenameUtils.getExtension(descriptor.getName()))) {
descriptors.add(descriptor);
}
}
return descriptors;
}
@Override
public boolean deleteDescriptor(String name) {
boolean result;
// If the remote config monitor is configured, delete the descriptor from the remote registry
deleteRemoteEntry("/knox/config/descriptors", name);
// Whether the remote configuration registry is being employed or not, delete the local file
File descriptor = getExistingFile(descriptorsDirectory, name);
result = (descriptor == null) || descriptor.delete();
return result;
}
@Override
public void addTopologyChangeListener(TopologyListener listener) {
listeners.add(listener);
}
@Override
public void onServiceDefinitionChange(String name, String role, String version) {
getTopologies().stream().filter(topology -> topology.getServices().stream().anyMatch(service -> isRelevantService(service, role, name, version))).forEach(topology -> {
log.redeployingTopologyOnServiceDefinitionChange(topology.getName(), name, role, version);
redeployTopology(topology);
});
}
private boolean isRelevantService(Service service, String role, String name, String version) {
return service.getRole().equalsIgnoreCase(role)
&& (service.getName() == null || service.getName().equalsIgnoreCase(name) && (service.getVersion() == null || service.getVersion().equals(new Version(version))));
}
@Override
public void startMonitor() throws Exception {
// Start the local configuration monitors
for (FileAlterationMonitor monitor : monitors) {
monitor.start();
}
// Start the remote configuration monitor, if it has been initialized
if (remoteMonitor != null) {
try {
remoteMonitor.start();
} catch (Exception e) {
log.remoteConfigurationMonitorStartFailure(remoteMonitor.getClass().getTypeName(), e.getLocalizedMessage());
}
}
}
@Override
public void stopMonitor() throws Exception {
// Stop the local configuration monitors
for (FileAlterationMonitor monitor : monitors) {
monitor.stop();
}
// Stop the remote configuration monitor, if it has been initialized
if (remoteMonitor != null) {
remoteMonitor.stop();
}
}
@Override
public boolean accept(File file) {
boolean accept = false;
if (!file.isDirectory() && file.canRead()) {
String extension = FilenameUtils.getExtension(file.getName());
if (SUPPORTED_TOPOLOGY_FILE_EXTENSIONS.contains(extension)) {
accept = true;
}
}
return accept;
}
@Override
public void onFileCreate(File file) {
onFileChange(file);
}
@Override
public void onFileDelete(java.io.File file) {
onFileChange(file);
}
@Override
public void onFileChange(File file) {
reloadTopologies();
}
@Override
public void stop() {
}
@Override
public void start() {
// Register a cluster configuration monitor listener for change notifications
ClusterConfigurationMonitorService ccms =
GatewayServer.getGatewayServices().getService(ServiceType.CLUSTER_CONFIGURATION_MONITOR_SERVICE);
ccms.addListener(new TopologyDiscoveryTrigger(this, ccms));
}
@Override
public void init(GatewayConfig config, Map<String, String> options) throws ServiceLifecycleException {
this.config = config;
String gatewayConfDir = config.getGatewayConfDir();
if (gatewayConfDir != null) {
System.setProperty(ServiceDiscovery.CONFIG_DIR_PROPERTY, gatewayConfDir);
}
try {
listeners = new HashSet<>();
topologies = new HashMap<>();
topologiesDirectory = calculateAbsoluteTopologiesDir(config);
File configDirectory = calculateAbsoluteConfigDir(config);
descriptorsDirectory = new File(configDirectory, "descriptors");
sharedProvidersDirectory = new File(configDirectory, "shared-providers");
// Add support for conf/topologies
initListener(topologiesDirectory, this, this);
// Add support for conf/descriptors
descriptorsMonitor = new DescriptorsMonitor(config, topologiesDirectory, aliasService);
initListener(descriptorsDirectory,
descriptorsMonitor,
descriptorsMonitor);
log.monitoringDescriptorChangesInDirectory(descriptorsDirectory.getAbsolutePath());
// Add support for conf/shared-providers
SharedProviderConfigMonitor spm = new SharedProviderConfigMonitor(descriptorsMonitor, descriptorsDirectory);
initListener(sharedProvidersDirectory, spm, spm);
log.monitoringProviderConfigChangesInDirectory(sharedProvidersDirectory.getAbsolutePath());
// For all the descriptors currently in the descriptors dir at start-up time, determine if topology regeneration
// is required.
// This happens prior to the start-up loading of the topologies.
String[] descriptorFilenames = descriptorsDirectory.list();
if (descriptorFilenames != null) {
for (String descriptorFilename : descriptorFilenames) {
if (DescriptorsMonitor.isDescriptorFile(descriptorFilename)) {
String topologyName = FilenameUtils.getBaseName(descriptorFilename);
File existingDescriptorFile = getExistingFile(descriptorsDirectory, topologyName);
// If there isn't a corresponding topology file, or if the descriptor has been modified since the
// corresponding topology file was generated, then trigger generation of one
File matchingTopologyFile = getExistingFile(topologiesDirectory, topologyName);
if (matchingTopologyFile == null || matchingTopologyFile.lastModified() < existingDescriptorFile.lastModified()) {
descriptorsMonitor.onFileChange(existingDescriptorFile);
} else {
// If regeneration is NOT required, then we at least need to report the provider configuration
// reference relationship (KNOX-1144)
String normalizedDescriptorPath = FilenameUtils.normalize(existingDescriptorFile.getAbsolutePath());
// Parse the descriptor to determine the provider config reference
SimpleDescriptor sd = SimpleDescriptorFactory.parse(normalizedDescriptorPath);
if (sd != null) {
File referencedProviderConfig =
getExistingFile(sharedProvidersDirectory, FilenameUtils.getBaseName(sd.getProviderConfig()));
if (referencedProviderConfig != null) {
List<String> references =
descriptorsMonitor.getReferencingDescriptors(referencedProviderConfig.getAbsolutePath());
if (!references.contains(normalizedDescriptorPath)) {
references.add(normalizedDescriptorPath);
}
}
}
}
}
}
}
// Initialize the remote configuration monitor, if it has been configured
remoteMonitor = RemoteConfigurationMonitorFactory.get(config);
} catch (IOException io) {
throw new ServiceLifecycleException(io.getMessage(), io);
}
}
/**
* Delete the entry in the remote configuration registry, which matches the specified resource name.
*
* @param entryParent The remote registry path in which the entry exists.
* @param name The name of the entry (typically without any file extension).
*
* @return true, if the entry is deleted, or did not exist; otherwise, false.
*/
private boolean deleteRemoteEntry(String entryParent, String name) {
boolean result = true;
if (remoteMonitor != null) {
RemoteConfigurationRegistryClient client = remoteMonitor.getClient();
if (client != null) {
List<String> existingProviderConfigs = client.listChildEntries(entryParent);
for (String entryName : existingProviderConfigs) {
if (FilenameUtils.getBaseName(entryName).equals(name)) {
String entryPath = entryParent + "/" + entryName;
client.deleteEntry(entryPath);
result = !client.entryExists(entryPath);
if (!result) {
log.failedToDeletedRemoteConfigFile("descriptor", name);
}
break;
}
}
}
}
return result;
}
/**
* Utility method for listing the files in the specified directory.
* This method is "nicer" than the File#listFiles() because it will not return null.
*
* @param directory The directory whose files should be returned.
*
* @return A List of the Files on the directory.
*/
private static List<File> listFiles(File directory) {
List<File> result;
File[] files = directory.listFiles();
if (files != null) {
result = Arrays.asList(files);
} else {
result = Collections.emptyList();
}
return result;
}
/**
* Search for a file in the specified directory whose base name (filename without extension) matches the
* specified basename.
*
* @param directory The directory in which to search.
* @param basename The basename of interest.
*
* @return The matching File
*/
private static File getExistingFile(File directory, String basename) {
File match = null;
for (File file : listFiles(directory)) {
if (FilenameUtils.getBaseName(file.getName()).equals(basename)) {
match = file;
break;
}
}
return match;
}
/**
* Write the specified content to a file.
*
* @param dest The destination directory.
* @param name The name of the file.
* @param content The contents of the file.
*
* @return true, if the write succeeds; otherwise, false.
*/
private static boolean writeConfig(File dest, String name, String content) {
boolean result = false;
File destFile = new File(dest, name);
try {
FileUtils.writeStringToFile(destFile, content, StandardCharsets.UTF_8);
log.wroteConfigurationFile(destFile.getAbsolutePath());
result = true;
} catch (IOException e) {
log.failedToWriteConfigurationFile(destFile.getAbsolutePath(), e);
}
return result;
}
/**
* Change handler for simple descriptors
*/
public static class DescriptorsMonitor extends FileAlterationListenerAdaptor
implements FileFilter {
static final List<String> SUPPORTED_EXTENSIONS = new ArrayList<>();
static {
SUPPORTED_EXTENSIONS.add("json");
SUPPORTED_EXTENSIONS.add("yml");
SUPPORTED_EXTENSIONS.add("yaml");
}
private GatewayConfig gatewayConfig;
private File topologiesDir;
private AliasService aliasService;
private Map<String, List<String>> providerConfigReferences = new HashMap<>();
static boolean isDescriptorFile(String filename) {
return SUPPORTED_EXTENSIONS.contains(FilenameUtils.getExtension(filename));
}
public DescriptorsMonitor(GatewayConfig config, File topologiesDir, AliasService aliasService) {
this.gatewayConfig = config;
this.topologiesDir = topologiesDir;
this.aliasService = aliasService;
}
List<String> getReferencingDescriptors(String providerConfigPath) {
String normalizedPath = FilenameUtils.normalize(providerConfigPath);
return providerConfigReferences.computeIfAbsent(normalizedPath, p -> new ArrayList<>());
}
@Override
public void onFileCreate(File file) {
onFileChange(file);
}
@Override
public void onFileDelete(File file) {
// For simple descriptors, we need to make sure to delete any corresponding full topology descriptors to trigger undeployment
for (String ext : DefaultTopologyService.SUPPORTED_TOPOLOGY_FILE_EXTENSIONS) {
File topologyFile =
new File(topologiesDir, FilenameUtils.getBaseName(file.getName()) + "." + ext);
if (topologyFile.exists()) {
log.deletingTopologyForDescriptorDeletion(topologyFile.getName(), file.getName());
topologyFile.delete();
}
}
String normalizedFilePath = FilenameUtils.normalize(file.getAbsolutePath());
String reference = null;
for (Map.Entry<String, List<String>> entry : providerConfigReferences.entrySet()) {
if (entry.getValue().contains(normalizedFilePath)) {
reference = entry.getKey();
break;
}
}
if (reference != null) {
providerConfigReferences.get(reference).remove(normalizedFilePath);
log.removedProviderConfigurationReference(normalizedFilePath, reference);
}
}
@Override
public void onFileChange(File file) {
try {
// When a simple descriptor has been created or modified, generate the new topology descriptor
Map<String, File> result = SimpleDescriptorHandler.handle(gatewayConfig, file, topologiesDir, aliasService);
log.generatedTopologyForDescriptorChange(result.get(SimpleDescriptorHandler.RESULT_TOPOLOGY).getName(),
file.getName());
// Add the provider config reference relationship for handling updates to the provider config
String providerConfig =
FilenameUtils.normalize(result.get(SimpleDescriptorHandler.RESULT_REFERENCE).getAbsolutePath());
if (!providerConfigReferences.containsKey(providerConfig)) {
providerConfigReferences.put(providerConfig, new ArrayList<>());
}
List<String> refs = providerConfigReferences.get(providerConfig);
String descriptorName = FilenameUtils.normalize(file.getAbsolutePath());
if (!refs.contains(descriptorName)) {
// Need to check if descriptor had previously referenced another provider config, so it can be removed
for (List<String> descs : providerConfigReferences.values()) {
descs.remove(descriptorName);
}
// Add the current reference relationship
refs.add(descriptorName);
log.addedProviderConfigurationReference(descriptorName, providerConfig);
}
} catch (IllegalArgumentException e) {
log.simpleDescriptorHandlingError(file.getName(), e);
// If the referenced provider configuration is invalid, remove any existing reference relationships for the
// referencing descriptor.
String descriptorName = FilenameUtils.normalize(file.getAbsolutePath());
// Need to check if descriptor had previously referenced another provider config, so it can be removed
for (List<String> descs : providerConfigReferences.values()) {
descs.remove(descriptorName);
}
} catch (Exception e) {
log.simpleDescriptorHandlingError(file.getName(), e);
}
}
@Override
public boolean accept(File file) {
boolean accept = false;
if (!file.isDirectory() && file.canRead()) {
String extension = FilenameUtils.getExtension(file.getName());
if (SUPPORTED_EXTENSIONS.contains(extension)) {
accept = true;
}
}
return accept;
}
}
/**
* Change handler for shared provider configurations
*/
public static class SharedProviderConfigMonitor extends FileAlterationListenerAdaptor implements FileFilter {
static final List<String> SUPPORTED_EXTENSIONS = ProviderConfigurationParser.SUPPORTED_EXTENSIONS;
private DescriptorsMonitor descriptorsMonitor;
private File descriptorsDir;
SharedProviderConfigMonitor(DescriptorsMonitor descMonitor, File descriptorsDir) {
this.descriptorsMonitor = descMonitor;
this.descriptorsDir = descriptorsDir;
}
@Override
public void onFileCreate(File file) {
onFileChange(file);
}
@Override
public void onFileDelete(File file) {
onFileChange(file);
}
@Override
public void onFileChange(File file) {
// For shared provider configuration, we need to update any simple descriptors that reference it
for (File descriptor : getReferencingDescriptors(file)) {
descriptor.setLastModified(System.currentTimeMillis());
}
}
private List<File> getReferencingDescriptors(File sharedProviderConfig) {
List<File> references = new ArrayList<>();
for (File descriptor : listFiles(descriptorsDir)) {
if (DescriptorsMonitor.SUPPORTED_EXTENSIONS.contains(FilenameUtils.getExtension(descriptor.getName()))) {
for (String reference : descriptorsMonitor.getReferencingDescriptors(FilenameUtils.normalize(sharedProviderConfig.getAbsolutePath()))) {
references.add(new File(reference));
}
}
}
return references;
}
@Override
public boolean accept(File file) {
boolean accept = false;
if (!file.isDirectory() && file.canRead()) {
String extension = FilenameUtils.getExtension(file.getName());
if (SUPPORTED_EXTENSIONS.contains(extension)) {
accept = true;
}
}
return accept;
}
}
/**
* Listener for Ambari config change events, which will trigger re-generation (including re-discovery) of the
* affected topologies.
*/
private static class TopologyDiscoveryTrigger implements ClusterConfigurationMonitor.ConfigurationChangeListener {
private TopologyService topologyService;
private ClusterConfigurationMonitorService ccms;
TopologyDiscoveryTrigger(TopologyService topologyService, ClusterConfigurationMonitorService ccms) {
this.topologyService = topologyService;
this.ccms = ccms;
}
@Override
public void onConfigurationChange(String source, String clusterName) {
log.noticedClusterConfigurationChange(source, clusterName);
try {
boolean affectedDescriptors = false;
// Identify any descriptors associated with the cluster configuration change
for (File descriptor : topologyService.getDescriptors()) {
String descriptorContent = FileUtils.readFileToString(descriptor, StandardCharsets.UTF_8);
if (descriptorContent.contains(source) && descriptorContent.contains(clusterName)) {
affectedDescriptors = true;
log.triggeringTopologyRegeneration(source, clusterName, descriptor.getAbsolutePath());
// 'Touch' the descriptor to trigger re-generation of the associated topology
descriptor.setLastModified(System.currentTimeMillis());
}
}
if (!affectedDescriptors) {
// If not descriptors are affected by this configuration, then clear the cache to prevent future notifications
ccms.clearCache(source, clusterName);
}
} catch (Exception e) {
log.errorRespondingToConfigChange(source, clusterName, e);
}
}
}
}