blob: fb8d73c513bdef4f003588da99c317d12d0e02fa [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
import com.cloudera.api.swagger.EventsResourceApi;
import com.cloudera.api.swagger.RolesResourceApi;
import com.cloudera.api.swagger.ServicesResourceApi;
import com.cloudera.api.swagger.client.ApiClient;
import com.cloudera.api.swagger.client.ApiException;
import com.cloudera.api.swagger.model.ApiConfigList;
import com.cloudera.api.swagger.model.ApiEvent;
import com.cloudera.api.swagger.model.ApiEventAttribute;
import com.cloudera.api.swagger.model.ApiEventCategory;
import com.cloudera.api.swagger.model.ApiEventQueryResult;
import com.cloudera.api.swagger.model.ApiRole;
import com.cloudera.api.swagger.model.ApiRoleList;
import com.cloudera.api.swagger.model.ApiServiceConfig;
import org.apache.knox.gateway.GatewayServer;
import org.apache.knox.gateway.i18n.messages.MessagesFactory;
import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService;
import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig;
import org.apache.knox.gateway.topology.simple.SimpleDescriptor;
import org.apache.knox.gateway.topology.simple.SimpleDescriptorFactory;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.knox.gateway.topology.discovery.ClusterConfigurationMonitor.ConfigurationChangeListener;
public class PollingConfigurationAnalyzer implements Runnable {
// Name of the event attribute holding the command; looked up in isRelevantEvent()
private static final String COMMAND = "COMMAND";
// Name of the event attribute holding the command status; looked up in isRelevantEvent()
private static final String COMMAND_STATUS = "COMMAND_STATUS";
// Command status values that isRelevantEvent() accepts
private static final String STARTED_STATUS = "STARTED";
private static final String SUCCEEDED_STATUS = "SUCCEEDED";
// Command values that isRelevantEvent() accepts
private static final String RESTART_COMMAND = "Restart";
private static final String START_COMMAND = "Start";
// The format of the filter employed when start events are queried from ClouderaManager
// NOTE(review): the rest of this string expression is not visible here — confirm against the complete file
private static final String EVENTS_QUERY_FORMAT =
"category==" + ApiEventCategory.AUDIT_EVENT.getValue() +
// The format of the timestamp element of the start events query filter
private static final String EVENTS_QUERY_TIMESTAMP_FORMAT = ";timeOccurred=gt=%s";
// The default amount of time before "now" to check for start events the first time
private static final long DEFAULT_EVENT_QUERY_DEFAULT_TIMESTAMP_OFFSET = (60 * 60 * 1000); // one hour
// Polling interval (in seconds) used by the constructor that does not take an explicit interval
private static final int DEFAULT_POLLING_INTERVAL = 60;
// NOTE(review): the initializer of this field is not visible here — confirm against the complete file
private static final ClouderaManagerServiceDiscoveryMessages log =
// Fully-qualified cluster name delimiter
private static final String FQCN_DELIM = "::";
// Source of the monitored cluster names, discovery configs and previously-recorded service configurations
private ClusterConfigurationCache configCache;
// Single listener for configuration change events
private ConfigurationChangeListener changeListener;
// Passed to each DiscoveryApiClient created by getApiClient()
private AliasService aliasService;
private KeystoreService keystoreService;
// Resolved from the gateway services on first use; see getTopologyService()
private TopologyService topologyService;
// Accessed through getConfigMonitorService()
private ClusterConfigurationMonitorService ccms;
// Polling interval in seconds
private int interval;
// Cache of ClouderaManager API clients, keyed by discovery address
private final Map<String, DiscoveryApiClient> clients = new ConcurrentHashMap<>();
// Timestamp records of the most recent start event query per discovery address
private Map<String, String> eventQueryTimestamps = new ConcurrentHashMap<>();
// The amount of time before "now" to check for start events the first time
private long eventQueryDefaultTimestampOffset = DEFAULT_EVENT_QUERY_DEFAULT_TIMESTAMP_OFFSET;
// Loop flag: set by run(), cleared by stop()
// NOTE(review): not volatile — if stop() is invoked from a different thread than run(), confirm visibility
private boolean isActive;
/**
 * Create an analyzer that uses the default polling interval (DEFAULT_POLLING_INTERVAL).
 *
 * @param configCache     The cache of monitored cluster configurations.
 * @param aliasService    The alias service handed to the ClouderaManager API clients.
 * @param keystoreService The keystore service handed to the ClouderaManager API clients.
 * @param changeListener  The listener to notify when a configuration change is detected.
 */
PollingConfigurationAnalyzer(final ClusterConfigurationCache configCache,
final AliasService aliasService,
final KeystoreService keystoreService,
final ConfigurationChangeListener changeListener) {
// Delegate to the full constructor with the default interval
this(configCache, aliasService, keystoreService, changeListener, DEFAULT_POLLING_INTERVAL);
/**
 * Create an analyzer with an explicit polling interval.
 *
 * @param configCache     The cache of monitored cluster configurations.
 * @param aliasService    The alias service handed to the ClouderaManager API clients.
 * @param keystoreService The keystore service handed to the ClouderaManager API clients.
 * @param changeListener  The listener to notify when a configuration change is detected.
 * @param interval        The polling interval, in seconds.
 */
PollingConfigurationAnalyzer(final ClusterConfigurationCache configCache,
final AliasService aliasService,
final KeystoreService keystoreService,
final ConfigurationChangeListener changeListener,
int interval) {
this.configCache = configCache;
this.aliasService = aliasService;
this.keystoreService = keystoreService;
this.changeListener = changeListener;
this.interval = interval;
/**
 * Set the polling interval.
 *
 * @param interval The polling interval, in seconds.
 */
void setInterval(int interval) {
this.interval = interval;
/**
 * Request that the polling loop in run() end, by clearing the flag used as its loop condition.
 */
void stop() {
isActive = false;
/**
 * Sleep the current thread for the specified number of seconds.
 *
 * @param seconds The number of seconds to sleep.
 */
private void waitFor(long seconds) {
try {
Thread.sleep(seconds * 1000L);
// NOTE(review): the handler body is not visible here; nothing shown restores the thread's interrupt status — confirm
} catch (InterruptedException e) {
/**
 * Polling loop. While active, every cluster known to the configuration cache is checked: clusters no longer
 * referenced by any descriptor are queued for removal from monitoring; for the others, recent start events are
 * queried and the configuration of each started service is compared with its previously-recorded configuration.
 * The change listener is notified when a difference (or a service with no prior configuration) is found.
 *
 * NOTE(review): no call to waitFor(...) and no update of handledServiceTypes is visible in this body — confirm
 * against the complete file.
 */
public void run() {
isActive = true;
// Keep polling until stop() clears the flag
while (isActive) {
// Entries are of the form address + FQCN_DELIM + clusterName
List<String> clustersToStopMonitoring = new ArrayList<>();
// The cache maps each discovery address to the names of the clusters monitored at that address
for (Map.Entry<String, List<String>> entry : configCache.getClusterNames().entrySet()) {
String address = entry.getKey();
for (String clusterName : entry.getValue()) {
log.checkingClusterConfiguration(clusterName, address);
// Check here for existing descriptor references, and add to the removal list if there are not any
if (!clusterReferencesExist(address, clusterName)) {
clustersToStopMonitoring.add(address + FQCN_DELIM + clusterName);
// Configuration changes don't mean anything without corresponding service start/restarts. Therefore, monitor
// start events, and check the configuration only of the restarted service(s) to identify changes
// that should trigger re-discovery.
List<StartEvent> relevantEvents = getRelevantEvents(address, clusterName);
// If there are no recent start events, then nothing to do now
if (!relevantEvents.isEmpty()) {
boolean configHasChanged = false;
// If there are start events, then check the previously-recorded properties for the same service to
// identify if the configuration has changed
Map<String, ServiceConfigurationModel> serviceConfigurations =
configCache.getClusterServiceConfigurations(address, clusterName);
// Those services for which a start event has been handled
List<String> handledServiceTypes = new ArrayList<>();
for (StartEvent re : relevantEvents) {
String serviceType = re.getServiceType();
// Determine if we've already handled a start event for this service type
if (!handledServiceTypes.contains(serviceType)) {
// Get the previously-recorded configuration
ServiceConfigurationModel serviceConfig = serviceConfigurations.get(re.getServiceType());
if (serviceConfig != null) {
// Get the current config for the started service, and compare with the previously-recorded config
ServiceConfigurationModel currentConfig =
getCurrentServiceConfiguration(address, clusterName, re.getService());
if (currentConfig != null) {
try {
configHasChanged = hasConfigurationChanged(serviceConfig, currentConfig);
// A failed comparison is logged and does not end the polling loop
} catch (Exception e) {
log.errorAnalyzingCurrentServiceConfiguration(re.getService(), e);
} else {
// A new service (no prior config) represents a config change, since a descriptor may have referenced
// the "new" service, but discovery had previously not succeeded because the service had not been
// configured (appropriately) at that time.
configHasChanged = true;
if (configHasChanged) {
break; // No need to continue checking once we've identified one reason to perform discovery again
// If a change has occurred, notify the listeners
if (configHasChanged) {
notifyChangeListener(address, clusterName);
// Remove outdated entries from the cache
for (String fqcn : clustersToStopMonitoring) {
// NOTE(review): assumes neither part contains FQCN_DELIM ("::"); an address with an IPv6 literal would
// split into more than two parts — confirm
String[] parts = fqcn.split(FQCN_DELIM);
stopMonitoring(parts[0], parts[1]);
clustersToStopMonitoring.clear(); // reset the removal list
private TopologyService getTopologyService() {
if (topologyService == null) {
GatewayServices gws = GatewayServer.getGatewayServices();
if (gws != null) {
topologyService = gws.getService(ServiceType.TOPOLOGY_SERVICE);
return topologyService;
/**
 * Get the ClusterConfigurationMonitorService held in the ccms field.
 *
 * @return The value of ccms, which may be null.
 */
private ClusterConfigurationMonitorService getConfigMonitorService() {
if (ccms == null) {
GatewayServices gws = GatewayServer.getGatewayServices();
// NOTE(review): the statement that assigns ccms from gws is not visible here — confirm against the complete file
if (gws != null) {
return ccms;
* Determine if any descriptors reference the specified discovery source and cluster.
* @param source A discovery source
* @param clusterName A discovery cluster name
* @return true, if at least one descriptor references the specified discovery information; Otherwise, false.
private boolean clusterReferencesExist(final String source, final String clusterName) {
boolean remainingClusterRefs = false;
if (source != null && clusterName != null) {
TopologyService ts = getTopologyService();
if (ts != null) {
for (File f : ts.getDescriptors()) {
try {
SimpleDescriptor sd = SimpleDescriptorFactory.parse(f.toPath().toAbsolutePath().toString());
if (source.equals(sd.getDiscoveryAddress()) && clusterName.equals(sd.getCluster())) {
remainingClusterRefs = true;
} catch (IOException e) {
// Ignore these errors
} else {
remainingClusterRefs = true; // If the TopologyService is unavailable, assume references remain
return remainingClusterRefs;
* Stop monitoring the specified cluster for configuration changes.
* @param source The discovery source
* @param clusterName The name of the cluster
private void stopMonitoring(final String source, final String clusterName) {
ClusterConfigurationMonitorService ms = getConfigMonitorService();
if (ms != null) {
log.stoppingConfigMonitoring(source, clusterName);
ms.clearCache(source, clusterName);
* Notify the registered change listener.
* @param source The address of the ClouderaManager instance from which the cluster details were determined.
* @param clusterName The name of the cluster whose configuration details have changed.
private void notifyChangeListener(final String source, final String clusterName) {
if (changeListener != null) {
changeListener.onConfigurationChange(source, clusterName);
void setEventQueryTimestamp(final String address, final String cluster, final Instant timestamp) {
eventQueryTimestamps.put((address + ":" + cluster), timestamp.toString());
private String getEventQueryTimestamp(final String address, final String cluster) {
return eventQueryTimestamps.get(address + ":" + cluster);
* Get a DiscoveryApiClient for the ClouderaManager instance described by the specified discovery configuration.
* @param discoveryConfig The discovery configuration for interacting with a ClouderaManager instance.
private DiscoveryApiClient getApiClient(final ServiceDiscoveryConfig discoveryConfig) {
return clients.computeIfAbsent(discoveryConfig.getAddress(),
c -> new DiscoveryApiClient(discoveryConfig, aliasService, keystoreService));
/**
 * Get relevant events for the specified ClouderaManager cluster.
 *
 * @param address     The address of the ClouderaManager instance.
 * @param clusterName The name of the cluster.
 *
 * @return A List of StartEvent objects for service start events since the last time they were queried.
 */
private List<StartEvent> getRelevantEvents(final String address, final String clusterName) {
List<StartEvent> relevantEvents = new ArrayList<>();
// Get the last event query timestamp
String lastTimestamp = getEventQueryTimestamp(address, clusterName);
// If this is the first query, then define the last timestamp
if (lastTimestamp == null) {
// NOTE(review): the start of this expression is not visible here; presumably it is derived from
// eventQueryDefaultTimestampOffset — confirm against the complete file
lastTimestamp =, ChronoUnit.MILLIS).toString();
log.queryingRestartEventsFromCluster(clusterName, address, lastTimestamp);
// Record the new event query timestamp for this address/cluster
// NOTE(review): the timestamp argument is not visible here — confirm against the complete file
setEventQueryTimestamp(address, clusterName,;
// Query the event log from CM for service/cluster start events
// NOTE(review): the remaining queryEvents arguments are not visible here — confirm against the complete file
List<ApiEvent> events = queryEvents(getApiClient(configCache.getDiscoveryConfig(address, clusterName)),
// Keep only the events accepted by isRelevantEvent(), wrapped as StartEvent objects
for (ApiEvent event : events) {
if(isRelevantEvent(event)) {
relevantEvents.add(new StartEvent(event));
return relevantEvents;
private boolean isRelevantEvent(ApiEvent event) {
final Map<String, Object> attributeMap = getAttributeMap(event.getAttributes());
final String command = attributeMap.containsKey(COMMAND) ? (String) ((List<String>) attributeMap.get(COMMAND)).get(0) : "";
final String status = attributeMap.containsKey(COMMAND_STATUS) ? (String) ((List<String>) attributeMap.get(COMMAND_STATUS)).get(0) : "";
if ((START_COMMAND.equals(command) || RESTART_COMMAND.equals(command)) && (SUCCEEDED_STATUS.equals(status) || STARTED_STATUS.equals(status))) {
return true;
return false;
/**
 * Convert the specified event attributes to a Map.
 *
 * @param attributes The event attributes; may be null.
 *
 * @return An empty Map if attributes is null.
 */
private Map<String, Object> getAttributeMap(List<ApiEventAttribute> attributes) {
// NOTE(review): the non-null branch is truncated here; it appears to map each attribute to its values
// (ApiEventAttribute::getValues) — confirm the key mapping against the complete file
return attributes == null ? Collections.emptyMap() :, ApiEventAttribute::getValues));
/**
 * Query the ClouderaManager instance associated with the specified client for any service start events in the
 * specified cluster since the specified time.
 *
 * @param client      A ClouderaManager API client.
 * @param clusterName The name of the cluster for which events should be queried.
 * @param since       The ISO8601 timestamp indicating from which time to query.
 *
 * @return A List of ApiEvent objects representing the relevant events since the specified time.
 */
protected List<ApiEvent> queryEvents(final ApiClient client, final String clusterName, final String since) {
List<ApiEvent> events = new ArrayList<>();
// Setup the query for events
// The time filter is left empty when no "since" timestamp is given
String timeFilter =
(since != null) ? String.format(Locale.ROOT, EVENTS_QUERY_TIMESTAMP_FORMAT, since) : "";
// NOTE(review): the arguments of this format call are not visible here — confirm against the complete file
String queryString = String.format(Locale.ROOT,
try {
// NOTE(review): 20 and 0 look like a result limit and an offset — confirm against the EventsResourceApi docs.
// No statement copying eventsResult into events is visible here.
ApiEventQueryResult eventsResult = (new EventsResourceApi(client)).readEvents(20, queryString, 0);
// NOTE(review): the handler body is not visible here — confirm how API failures are reported
} catch (ApiException e) {
return events;
/**
 * Get the current configuration for the specified service.
 *
 * @param address     The address of the ClouderaManager instance.
 * @param clusterName The name of the cluster.
 * @param service     The name of the service.
 *
 * @return A ServiceConfigurationModel object with the configuration properties associated with the specified
 *         service.
 */
protected ServiceConfigurationModel getCurrentServiceConfiguration(final String address,
final String clusterName,
final String service) {
// Stays null unless every API call below succeeds
ServiceConfigurationModel currentConfig = null;
log.gettingCurrentClusterConfiguration(service, clusterName, address);
ApiClient apiClient = getApiClient(configCache.getDiscoveryConfig(address, clusterName));
ServicesResourceApi api = new ServicesResourceApi(apiClient);
try {
// Service-level configuration
ApiServiceConfig svcConfig = api.readServiceConfig(clusterName, service, "full");
// Role-level configuration, one entry per role of the service
Map<ApiRole, ApiConfigList> roleConfigs = new HashMap<>();
RolesResourceApi rolesApi = (new RolesResourceApi(apiClient));
ApiRoleList roles = rolesApi.readRoles(clusterName, service, "", "full");
for (ApiRole role : roles.getItems()) {
ApiConfigList config = rolesApi.readRoleConfig(clusterName, role.getName(), service, "full");
roleConfigs.put(role, config);
currentConfig = new ServiceConfigurationModel(svcConfig, roleConfigs);
// NOTE(review): the handler body is not visible here — confirm how API failures are reported
} catch (ApiException e) {
return currentConfig;
* Examine the ServiceConfigurationModel objects for significant differences.
* @param previous The previously-recorded service configuration properties.
* @param current The current service configuration properties.
* @return true, if the current service configuration values differ from those properties defined in the previous
* service configuration; Otherwise, false.
private boolean hasConfigurationChanged(final ServiceConfigurationModel previous,
final ServiceConfigurationModel current) {
boolean hasChanged = false;
// Compare the service configuration properties first
Map<String, String> previousProps = previous.getServiceProps();
Map<String, String> currentProps = current.getServiceProps();
for (String name : previousProps.keySet()) {
String prevValue = previousProps.get(name);
String currValue = currentProps.get(name);
if (!prevValue.equals(currValue)) {
log.serviceConfigurationPropertyHasChanged(name, prevValue, currValue);
hasChanged = true;
// If service config has not changed, check the role configuration properties
if (!hasChanged) {
Set<String> previousRoleTypes = previous.getRoleTypes();
Set<String> currentRoleTypes = current.getRoleTypes();
for (String roleType : previousRoleTypes) {
if (!currentRoleTypes.contains(roleType)) {
hasChanged = true;
} else {
previousProps = previous.getRoleProps(roleType);
currentProps = current.getRoleProps(roleType);
for (String name : previousProps.keySet()) {
String prevValue = previousProps.get(name);
String currValue = currentProps.get(name);
if (currValue == null) { // A missing/removed property
if (!(prevValue == null || "null".equals(prevValue))) {
log.roleConfigurationPropertyHasChanged(name, prevValue, "null");
hasChanged = true;
} else if (!currValue.equals(prevValue)) {
log.roleConfigurationPropertyHasChanged(name, prevValue, currValue);
hasChanged = true;
return hasChanged;
/**
 * Internal representation of a ClouderaManager service start event
 */
static final class StartEvent {
// Names of the event attributes of interest
private static final String ATTR_CLUSTER = "CLUSTER";
private static final String ATTR_SERVICE_TYPE = "SERVICE_TYPE";
private static final String ATTR_SERVICE = "SERVICE";
// The attribute names the constructor checks each event attribute against
private static List<String> attrsOfInterest = new ArrayList<>();
// NOTE(review): the body of this static initializer is not visible here; presumably it fills attrsOfInterest
// with the ATTR_* names — confirm against the complete file
static {
// The wrapped event
private ApiEvent auditEvent;
// Set from the first value of the corresponding event attribute; see setPropertyFromAttribute()
private String clusterName;
private String serviceType;
private String service;
// Wraps an audit event; any other event category is rejected with an IllegalArgumentException
StartEvent(final ApiEvent auditEvent) {
if (ApiEventCategory.AUDIT_EVENT != auditEvent.getCategory()) {
throw new IllegalArgumentException("Invalid event category " + auditEvent.getCategory().getValue());
this.auditEvent = auditEvent;
for (ApiEventAttribute attribute : auditEvent.getAttributes()) {
// NOTE(review): the statement guarded by this check is not visible here; presumably it calls
// setPropertyFromAttribute(attribute) — confirm against the complete file
if (attrsOfInterest.contains(attribute.getName())) {
// The time at which the wrapped event occurred, as reported by the event itself
String getTimestamp() {
return auditEvent.getTimeOccurred();
String getClusterName() {
return clusterName;
String getServiceType() {
return serviceType;
String getService() {
return service;
// Assigns a field from the first value of the attribute, selected by the attribute name
private void setPropertyFromAttribute(final ApiEventAttribute attribute) {
// NOTE(review): the case labels are not visible here; presumably ATTR_CLUSTER, ATTR_SERVICE_TYPE and
// ATTR_SERVICE in that order — confirm against the complete file
switch (attribute.getName()) {
clusterName = attribute.getValues().get(0);
serviceType = attribute.getValues().get(0);
service = attribute.getValues().get(0);