package org.apache.hadoop.gateway.topology.discovery.ambari;
import org.apache.hadoop.gateway.config.GatewayConfig;
import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
import org.apache.hadoop.gateway.topology.discovery.ClusterConfigurationMonitor;
import org.apache.hadoop.gateway.topology.discovery.ServiceDiscoveryConfig;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class AmbariConfigurationMonitor implements ClusterConfigurationMonitor {
private static final String TYPE = "Ambari";
private static final String CLUSTERS_DATA_DIR_NAME = "clusters";
private static final String PERSISTED_FILE_COMMENT = "Generated File. Do Not Edit!";
private static final String PROP_CLUSTER_PREFIX = "cluster.";
private static final String PROP_CLUSTER_SOURCE = PROP_CLUSTER_PREFIX + "source";
private static final String PROP_CLUSTER_NAME = PROP_CLUSTER_PREFIX + "name";
private static final String PROP_CLUSTER_USER = PROP_CLUSTER_PREFIX + "user";
private static final String PROP_CLUSTER_ALIAS = PROP_CLUSTER_PREFIX + "pwd.alias";
static final String INTERVAL_PROPERTY_NAME = "org.apache.hadoop.gateway.topology.discovery.ambari.monitor.interval";
private static final AmbariServiceDiscoveryMessages log = MessagesFactory.get(AmbariServiceDiscoveryMessages.class);
// Ambari address
// clusterName -> ServiceDiscoveryConfig
Map<String, Map<String, ServiceDiscoveryConfig>> clusterMonitorConfigurations = new HashMap<>();
// Ambari address
// clusterName
// configType -> version
Map<String, Map<String, Map<String, String>>> ambariClusterConfigVersions = new HashMap<>();
ReadWriteLock configVersionsLock = new ReentrantReadWriteLock();
private List<ConfigurationChangeListener> changeListeners = new ArrayList<>();
private AmbariClientCommon ambariClient;
PollingConfigAnalyzer internalMonitor;
GatewayConfig gatewayConfig = null;
static String getType() {
return TYPE;
AmbariConfigurationMonitor(GatewayConfig config, AliasService aliasService) {
this.gatewayConfig = config;
this.ambariClient = new AmbariClientCommon(aliasService);
this.internalMonitor = new PollingConfigAnalyzer(this);
// Override the default polling interval if it has been configured
int interval = config.getClusterMonitorPollingInterval(getType());
if (interval > 0) {
public void setPollingInterval(int interval) {
private void init() {
* Load any previously-persisted service discovery configurations.
* This is necessary for checking previously-deployed topologies.
private void loadDiscoveryConfiguration() {
File persistenceDir = getPersistenceDir();
if (persistenceDir != null) {
Collection<File> persistedConfigs = FileUtils.listFiles(persistenceDir, new String[]{"conf"}, false);
for (File persisted : persistedConfigs) {
Properties props = new Properties();
FileInputStream in = null;
try {
in = new FileInputStream(persisted);
addDiscoveryConfig(props.getProperty(PROP_CLUSTER_NAME), new ServiceDiscoveryConfig() {
public String getAddress() {
return props.getProperty(PROP_CLUSTER_SOURCE);
public String getUser() {
return props.getProperty(PROP_CLUSTER_USER);
public String getPasswordAlias() {
return props.getProperty(PROP_CLUSTER_ALIAS);
} catch (IOException e) {
log.failedToLoadClusterMonitorServiceDiscoveryConfig(getType(), e);
} finally {
if (in != null) {
try {
} catch (IOException e) {
* Load any previously-persisted cluster configuration version records, so the monitor will check
* previously-deployed topologies against the current cluster configuration.
private void loadClusterVersionData() {
File persistenceDir = getPersistenceDir();
if (persistenceDir != null) {
Collection<File> persistedConfigs = FileUtils.listFiles(persistenceDir, new String[]{"ver"}, false);
for (File persisted : persistedConfigs) {
Properties props = new Properties();
FileInputStream in = null;
try {
in = new FileInputStream(persisted);
String source = props.getProperty(PROP_CLUSTER_SOURCE);
String clusterName = props.getProperty(PROP_CLUSTER_NAME);
Map<String, String> configVersions = new HashMap<>();
for (String name : props.stringPropertyNames()) {
if (!name.startsWith(PROP_CLUSTER_PREFIX)) { // Ignore implementation-specific properties
configVersions.put(name, props.getProperty(name));
// Map the config versions to the cluster name
addClusterConfigVersions(source, clusterName, configVersions);
} catch (IOException e) {
log.failedToLoadClusterMonitorConfigVersions(getType(), e);
} finally {
if (in != null) {
try {
} catch (IOException e) {
private void persistDiscoveryConfiguration(String clusterName, ServiceDiscoveryConfig sdc) {
File persistenceDir = getPersistenceDir();
if (persistenceDir != null) {
Properties props = new Properties();
props.setProperty(PROP_CLUSTER_NAME, clusterName);
props.setProperty(PROP_CLUSTER_SOURCE, sdc.getAddress());
String username = sdc.getUser();
if (username != null) {
props.setProperty(PROP_CLUSTER_USER, username);
String pwdAlias = sdc.getPasswordAlias();
if (pwdAlias != null) {
props.setProperty(PROP_CLUSTER_ALIAS, pwdAlias);
persist(props, getDiscoveryConfigPersistenceFile(sdc.getAddress(), clusterName));
private void persistClusterVersionData(String address, String clusterName, Map<String, String> configVersions) {
File persistenceDir = getPersistenceDir();
if (persistenceDir != null) {
Properties props = new Properties();
props.setProperty(PROP_CLUSTER_NAME, clusterName);
props.setProperty(PROP_CLUSTER_SOURCE, address);
for (String name : configVersions.keySet()) {
props.setProperty(name, configVersions.get(name));
persist(props, getConfigVersionsPersistenceFile(address, clusterName));
private void persist(Properties props, File dest) {
FileOutputStream out = null;
try {
out = new FileOutputStream(dest);, PERSISTED_FILE_COMMENT);
} catch (Exception e) {
log.failedToPersistClusterMonitorData(getType(), dest.getAbsolutePath(), e);
} finally {
if (out != null) {
try {
} catch (IOException e) {
private File getPersistenceDir() {
File persistenceDir = null;
File dataDir = new File(gatewayConfig.getGatewayDataDir());
if (dataDir.exists()) {
File clustersDir = new File(dataDir, CLUSTERS_DATA_DIR_NAME);
if (!clustersDir.exists()) {
persistenceDir = clustersDir;
return persistenceDir;
private File getDiscoveryConfigPersistenceFile(String address, String clusterName) {
return getPersistenceFile(address, clusterName, "conf");
private File getConfigVersionsPersistenceFile(String address, String clusterName) {
return getPersistenceFile(address, clusterName, "ver");
private File getPersistenceFile(String address, String clusterName, String ext) {
String fileName = address.replace(":", "_").replace("/", "_") + "-" + clusterName + "." + ext;
return new File(getPersistenceDir(), fileName);
* Add cluster configuration details to the monitor's in-memory record.
* @param address An Ambari instance address.
* @param clusterName The name of a cluster associated with the Ambari instance.
* @param configVersions A Map of configuration types and their corresponding versions.
private void addClusterConfigVersions(String address, String clusterName, Map<String, String> configVersions) {
try {
ambariClusterConfigVersions.computeIfAbsent(address, k -> new HashMap<>())
.put(clusterName, configVersions);
} finally {
public void start() {
(new Thread(internalMonitor, "AmbariConfigurationMonitor")).start();
public void stop() {
public void addListener(ConfigurationChangeListener listener) {
* Add discovery configuration details for the specified cluster, so the monitor knows how to connect to check for
* changes.
* @param clusterName The name of the cluster.
* @param config The associated service discovery configuration.
void addDiscoveryConfig(String clusterName, ServiceDiscoveryConfig config) {
clusterMonitorConfigurations.computeIfAbsent(config.getAddress(), k -> new HashMap<>()).put(clusterName, config);
* Get the service discovery configuration associated with the specified Ambari instance and cluster.
* @param address An Ambari instance address.
* @param clusterName The name of a cluster associated with the Ambari instance.
* @return The associated ServiceDiscoveryConfig object.
ServiceDiscoveryConfig getDiscoveryConfig(String address, String clusterName) {
ServiceDiscoveryConfig config = null;
if (clusterMonitorConfigurations.containsKey(address)) {
config = clusterMonitorConfigurations.get(address).get(clusterName);
return config;
* Add cluster configuration data to the monitor, which it will use when determining if configuration has changed.
* @param cluster An AmbariCluster object.
* @param discoveryConfig The discovery configuration associated with the cluster.
void addClusterConfigVersions(AmbariCluster cluster, ServiceDiscoveryConfig discoveryConfig) {
String clusterName = cluster.getName();
// Register the cluster discovery configuration for the monitor connections
persistDiscoveryConfiguration(clusterName, discoveryConfig);
addDiscoveryConfig(clusterName, discoveryConfig);
// Build the set of configuration versions
Map<String, String> configVersions = new HashMap<>();
Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigs = cluster.getServiceConfigurations();
for (String serviceName : serviceConfigs.keySet()) {
Map<String, AmbariCluster.ServiceConfiguration> configTypeVersionMap = serviceConfigs.get(serviceName);
for (AmbariCluster.ServiceConfiguration config : configTypeVersionMap.values()) {
String configType = config.getType();
String version = config.getVersion();
configVersions.put(configType, version);
persistClusterVersionData(discoveryConfig.getAddress(), clusterName, configVersions);
addClusterConfigVersions(discoveryConfig.getAddress(), clusterName, configVersions);
* Remove the configuration record for the specified Ambari instance and cluster name.
* @param address An Ambari instance address.
* @param clusterName The name of a cluster associated with the Ambari instance.
* @return The removed data; A Map of configuration types and their corresponding versions.
Map<String, String> removeClusterConfigVersions(String address, String clusterName) {
Map<String, String> result = new HashMap<>();
try {
if (ambariClusterConfigVersions.containsKey(address)) {
} finally {
// Delete the associated persisted record
File persisted = getConfigVersionsPersistenceFile(address, clusterName);
if (persisted.exists()) {
return result;
* Get the cluster configuration details for the specified cluster and Ambari instance.
* @param address An Ambari instance address.
* @param clusterName The name of a cluster associated with the Ambari instance.
* @return A Map of configuration types and their corresponding versions.
Map<String, String> getClusterConfigVersions(String address, String clusterName) {
Map<String, String> result = new HashMap<>();
try {
if (ambariClusterConfigVersions.containsKey(address)) {
} finally {
return result;
* Get all the clusters the monitor knows about.
* @return A Map of Ambari instance addresses to associated cluster names.
Map<String, List<String>> getClusterNames() {
Map<String, List<String>> result = new HashMap<>();
try {
for (String address : ambariClusterConfigVersions.keySet()) {
List<String> clusterNames = new ArrayList<>();
result.put(address, clusterNames);
} finally {
return result;
* Notify registered change listeners.
* @param source The address of the Ambari instance from which the cluster details were determined.
* @param clusterName The name of the cluster whose configuration details have changed.
void notifyChangeListeners(String source, String clusterName) {
for (ConfigurationChangeListener listener : changeListeners) {
listener.onConfigurationChange(source, clusterName);
* Request the current active configuration version info from Ambari.
* @param address The Ambari instance address.
* @param clusterName The name of the cluster for which the details are desired.
* @return A Map of service configuration types and their corresponding versions.
Map<String, String> getUpdatedConfigVersions(String address, String clusterName) {
Map<String, String> configVersions = new HashMap<>();
ServiceDiscoveryConfig sdc = getDiscoveryConfig(address, clusterName);
if (sdc != null) {
Map<String, Map<String, AmbariCluster.ServiceConfiguration>> serviceConfigs =
ambariClient.getActiveServiceConfigurations(clusterName, sdc);
for (Map<String, AmbariCluster.ServiceConfiguration> serviceConfig : serviceConfigs.values()) {
for (AmbariCluster.ServiceConfiguration config : serviceConfig.values()) {
configVersions.put(config.getType(), config.getVersion());
return configVersions;
* The thread that polls Ambari for configuration details for clusters associated with discovered topologies,
* compares them with the current recorded values, and notifies any listeners when differences are discovered.
static final class PollingConfigAnalyzer implements Runnable {
private static final int DEFAULT_POLLING_INTERVAL = 60;
// Polling interval in seconds
private int interval = DEFAULT_POLLING_INTERVAL;
private AmbariConfigurationMonitor delegate;
private boolean isActive = false;
PollingConfigAnalyzer(AmbariConfigurationMonitor delegate) {
this.delegate = delegate;
this.interval = Integer.getInteger(INTERVAL_PROPERTY_NAME, PollingConfigAnalyzer.DEFAULT_POLLING_INTERVAL);
void setInterval(int interval) {
this.interval = interval;
void stop() {
isActive = false;
public void run() {
isActive = true;
while (isActive) {
for (Map.Entry<String, List<String>> entry : delegate.getClusterNames().entrySet()) {
String address = entry.getKey();
for (String clusterName : entry.getValue()) {
Map<String, String> configVersions = delegate.getClusterConfigVersions(address, clusterName);
if (configVersions != null && !configVersions.isEmpty()) {
Map<String, String> updatedVersions = delegate.getUpdatedConfigVersions(address, clusterName);
if (updatedVersions != null && !updatedVersions.isEmpty()) {
boolean configHasChanged = false;
// If the config sets don't match in size, then something has changed
if (updatedVersions.size() != configVersions.size()) {
configHasChanged = true;
} else {
// Perform the comparison of all the config versions
for (Map.Entry<String, String> configVersion : configVersions.entrySet()) {
if (!updatedVersions.get(configVersion.getKey()).equals(configVersion.getValue())) {
configHasChanged = true;
// If a change has occurred, notify the listeners
if (configHasChanged) {
delegate.notifyChangeListeners(address, clusterName);
try {
Thread.sleep(interval * 1000);
} catch (InterruptedException e) {
// Ignore