blob: 821eb8c38722d25fc206b0d1948c3db930a2d9fd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.state.alert;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.ClusterNotFoundException;
import org.apache.ambari.server.agent.AlertDefinitionCommand;
import org.apache.ambari.server.controller.RootComponent;
import org.apache.ambari.server.controller.RootService;
import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.ConfigHelper;
import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.Service;
import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.commons.codec.binary.Hex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
/**
 * The {@link AlertDefinitionHash} class is used to generate an MD5 hash for a
 * list of {@link AlertDefinitionEntity}s. It is used in order to represent the
 * state of a group of definitions by using
 * {@link AlertDefinitionEntity#getHash()}.
 * <p/>
 * Computed per-host/per-cluster hashes are cached in memory until one of the
 * {@code invalidate*} methods clears them.
 */
@Singleton
public class AlertDefinitionHash {

  /**
   * Logger.
   */
  private final static Logger LOG = LoggerFactory.getLogger(AlertDefinitionHash.class);

  /**
   * The hash returned when there are no definitions to hash.
   */
  public static final String NULL_MD5_HASH = "37a6259cc0c1dae299a7866489dff0bd";

  /**
   * DAO for retrieving {@link AlertDefinitionEntity} instances.
   */
  @Inject
  private AlertDefinitionDAO m_definitionDao;

  /**
   * Used to coerce {@link AlertDefinitionEntity} into {@link AlertDefinition}.
   */
  @Inject
  private AlertDefinitionFactory m_factory;

  /**
   * All clusters.
   */
  @Inject
  private Provider<Clusters> m_clusters;

  /**
   * Used to add configurations to the {@link AlertDefinitionCommand} instances
   * so that alerts can be scheduled to run with access to the properties they
   * need.
   */
  @Inject
  private Provider<ConfigHelper> m_configHelper;

  /**
   * Due to the nature of the asynchronous events for alerts and Ambari, this
   * lock will ensure that only a single writer is writing to the
   * {@code ActionQueue}.
   */
  private final ReentrantLock m_actionQueueLock = new ReentrantLock();

  /**
   * The hashes for all hosts for any cluster. The key is the hostname and the
   * value is a map between cluster name and hash.
   */
  private final ConcurrentMap<String, ConcurrentMap<String, String>> m_hashes =
      new ConcurrentHashMap<>();

  /**
   * Gets a unique hash value representing all of the alert definitions that
   * should be scheduled to run on a given host.
   * <p/>
   * This will not include alert definitions where the type is defined as
   * {@link SourceType#AGGREGATE} since aggregate definitions are not scheduled
   * to run on agent hosts.
   * <p/>
   * Hash values from this method are cached per host and cluster.
   *
   * @param clusterName
   *          the cluster name (not {@code null}).
   * @param hostName
   *          the host name (not {@code null}).
   * @return the unique hash or {@value #NULL_MD5_HASH} if none.
   */
  public String getHash(String clusterName, String hostName) {
    ConcurrentMap<String, String> clusterMapping =
        m_hashes.computeIfAbsent(hostName, key -> new ConcurrentHashMap<>());

    // the inner map is keyed by cluster name (see the put below)
    String hash = clusterMapping.get(clusterName);
    if (null != hash) {
      return hash;
    }

    // computed outside of any map lock on purpose: hashing performs DAO lookups
    hash = hash(clusterName, hostName);
    clusterMapping.put(clusterName, hash);
    return hash;
  }

  /**
   * Invalidate all cached hashes causing subsequent lookups to recalculate.
   */
  public void invalidateAll() {
    m_hashes.clear();
  }

  /**
   * Invalidates the cached hash for the specified agent host across all
   * clusters.
   *
   * @param hostName
   *          the host to invalidate the cache for (not {@code null}).
   */
  public void invalidate(String hostName) {
    m_hashes.remove(hostName);
  }

  /**
   * Invalidates the cached hash for the specified agent host in the specified
   * cluster.
   *
   * @param clusterName
   *          the name of the cluster (not {@code null}).
   * @param hostName
   *          the host to invalidate the cache for (not {@code null}).
   */
  public void invalidate(String clusterName, String hostName) {
    Map<String, String> clusterMapping = m_hashes.get(hostName);
    if (null != clusterMapping) {
      clusterMapping.remove(clusterName);
    }
  }

  /**
   * Gets whether the alert definition hash for the specified host has been
   * calculated and cached for the specified cluster.
   *
   * @param clusterName
   *          the cluster name (may be {@code null}, in which case
   *          {@code false} is returned).
   * @param hostName
   *          the host (may be {@code null}, in which case {@code false} is
   *          returned).
   * @return {@code true} if the hash was calculated; {@code false} otherwise.
   */
  public boolean isHashCached(String clusterName, String hostName) {
    if (null == clusterName || null == hostName) {
      return false;
    }

    Map<String, String> clusterMapping = m_hashes.get(hostName);
    return null != clusterMapping && clusterMapping.containsKey(clusterName);
  }

  /**
   * Gets the alert definitions for the specified host. This will include the
   * following types of alert definitions:
   * <ul>
   * <li>Service/Component alerts</li>
   * <li>Service alerts where the host is a MASTER</li>
   * <li>Host alerts that are not bound to a service</li>
   * </ul>
   *
   * @param clusterName
   *          the cluster name (not {@code null}).
   * @param hostName
   *          the host name (not {@code null}).
   * @return the alert definitions for the host, or an empty list (never
   *         {@code null}).
   */
  public List<AlertDefinition> getAlertDefinitions(String clusterName, String hostName) {
    return coerce(getAlertDefinitionEntities(clusterName, hostName));
  }

  /**
   * Gets the alert definitions for the host with the given ID in every cluster
   * the host belongs to.
   *
   * @param hostId
   *          the ID of the host.
   * @return a map of cluster ID to a map of definition ID to definition (never
   *         {@code null}).
   * @throws AmbariException
   *           if the host or its clusters cannot be looked up.
   */
  public Map<Long, Map<Long, AlertDefinition>> getAlertDefinitions(Long hostId) throws AmbariException {
    Map<Long, Map<Long, AlertDefinition>> result = new HashMap<>();
    String hostName = m_clusters.get().getHostById(hostId).getHostName();
    for (Cluster cluster : m_clusters.get().getClustersForHost(hostName)) {
      List<AlertDefinition> alertDefinitions = getAlertDefinitions(cluster.getClusterName(), hostName);
      result.put(cluster.getClusterId(), mapById(alertDefinitions));
    }
    return result;
  }

  /**
   * Gets the alert definitions bound to the given service/component pair.
   *
   * @param clusterId
   *          the cluster ID.
   * @param serviceName
   *          the service name.
   * @param componentName
   *          the component name.
   * @return a map of definition ID to definition (never {@code null}).
   */
  public Map<Long, AlertDefinition> findByServiceComponent(long clusterId, String serviceName, String componentName) {
    return mapById(coerce(m_definitionDao.findByServiceComponent(clusterId, serviceName, componentName)));
  }

  /**
   * Gets the service-level alert definitions for the given services, as
   * returned by {@link AlertDefinitionDAO#findByServiceMaster}.
   *
   * @param clusterId
   *          the cluster ID.
   * @param serviceName
   *          the service names.
   * @return a map of definition ID to definition (never {@code null}).
   */
  public Map<Long, AlertDefinition> findByServiceMaster(long clusterId, String... serviceName) {
    return mapById(coerce(m_definitionDao.findByServiceMaster(clusterId, Sets.newHashSet(serviceName))));
  }

  /**
   * Invalidate the hashes of any host that would be affected by the specified
   * definition. If the definition is an {@link SourceType#AGGREGATE}, this will
   * return an empty set since aggregates do not affect hosts.
   *
   * @param definition
   *          the definition to use to find the hosts to invalidate (not
   *          {@code null}).
   * @return the hosts that were invalidated, or an empty set (never
   *         {@code null}).
   */
  public Set<String> invalidateHosts(AlertDefinitionEntity definition) {
    return invalidateHosts(definition.getClusterId(),
        definition.getSourceType(),
        definition.getDefinitionName(), definition.getServiceName(),
        definition.getComponentName());
  }

  /**
   * Invalidate the hashes of any host that would be affected by the specified
   * definition. If the definition is an {@link SourceType#AGGREGATE}, this will
   * return an empty set since aggregates do not affect hosts.
   *
   * @param definition
   *          the definition to use to find the hosts to invalidate (not
   *          {@code null}).
   * @return the hosts that were invalidated, or an empty set (never
   *         {@code null}).
   */
  public Set<String> invalidateHosts(AlertDefinition definition) {
    return invalidateHosts(definition.getClusterId(),
        definition.getSource().getType(), definition.getName(),
        definition.getServiceName(), definition.getComponentName());
  }

  /**
   * Invalidate the hashes of any host that would be affected by the specified
   * definition. If the definition is an {@link SourceType#AGGREGATE}, this will
   * return an empty set since aggregates do not affect hosts.
   *
   * @param clusterId
   *          the cluster ID
   * @param definitionSourceType
   *          the type of alert definition
   * @param definitionName
   *          the definition unique name.
   * @param definitionServiceName
   *          the definition's service name.
   * @param definitionComponentName
   *          the definition's component name.
   * @return the hosts that were invalidated, or an empty set (never
   *         {@code null}).
   */
  private Set<String> invalidateHosts(long clusterId,
      SourceType definitionSourceType, String definitionName,
      String definitionServiceName, String definitionComponentName) {

    Cluster cluster;
    try {
      cluster = m_clusters.get().getClusterById(clusterId);
    } catch (Exception exception) {
      LOG.error("Unable to lookup cluster with ID {}", clusterId, exception);
      return Collections.emptySet();
    }

    if (null == cluster) {
      LOG.warn("Unable to lookup cluster with ID {}", clusterId);
      return Collections.emptySet();
    }

    String clusterName = cluster.getClusterName();

    // determine which hosts in the cluster would be affected by a change
    // to the specified definition; pass in the definition source type
    // to check for AGGREGATE
    Set<String> affectedHosts = getAssociatedHosts(cluster,
        definitionSourceType, definitionName,
        definitionServiceName, definitionComponentName);

    // invalidate all returned hosts
    for (String hostName : affectedHosts) {
      invalidate(clusterName, hostName);
    }

    return affectedHosts;
  }

  /**
   * Gets the hosts that are associated with the specified definition. Each host
   * returned is expected to be capable of running the alert. A change to the
   * definition would entail contacting each returned host and invalidating
   * their current alert definitions.
   * <p/>
   * If the definition is an {@link SourceType#AGGREGATE}, this will return an
   * empty set since aggregates do not affect hosts.
   *
   * @param cluster
   *          the cluster the definition belongs to (not {@code null}).
   * @param definitionSourceType
   *          the type of the definition.
   * @param definitionName
   *          the definition name (used for logging).
   * @param definitionServiceName
   *          the definition's service name.
   * @param definitionComponentName
   *          the definition's component name.
   * @return a set of all associated hosts or an empty set, never {@code null}.
   */
  public Set<String> getAssociatedHosts(Cluster cluster,
      SourceType definitionSourceType, String definitionName,
      String definitionServiceName, String definitionComponentName) {

    if (definitionSourceType == SourceType.AGGREGATE) {
      return Collections.emptySet();
    }

    String clusterName = cluster.getClusterName();
    Map<String, Host> hosts = m_clusters.get().getHostsForCluster(clusterName);
    Set<String> affectedHosts = new HashSet<>();

    String ambariServiceName = RootService.AMBARI.name();
    String agentComponentName = RootComponent.AMBARI_AGENT.name();

    // intercept host agent alerts; they affect all hosts
    if (ambariServiceName.equals(definitionServiceName)
        && agentComponentName.equals(definitionComponentName)) {
      affectedHosts.addAll(hosts.keySet());
      return affectedHosts;
    }

    // ignore other AMBARI components as they are server-side only
    if (ambariServiceName.equalsIgnoreCase(definitionServiceName)) {
      return Collections.emptySet();
    }

    // find all hosts that have the matching service and component
    for (String hostName : hosts.keySet()) {
      List<ServiceComponentHost> hostComponents = cluster.getServiceComponentHosts(hostName);
      if (null == hostComponents || hostComponents.isEmpty()) {
        continue;
      }

      // if a host has a matching service/component, it is affected; one match
      // is enough
      for (ServiceComponentHost component : hostComponents) {
        if (component.getServiceName().equals(definitionServiceName)
            && component.getServiceComponentName().equals(definitionComponentName)) {
          affectedHosts.add(hostName);
          break;
        }
      }
    }

    // get the service that this alert definition is associated with
    Service service = cluster.getServices().get(definitionServiceName);
    if (null == service) {
      LOG.warn("The alert definition {} has an unknown service of {}",
          definitionName, definitionServiceName);
      return affectedHosts;
    }

    // get all master components of the definition's service; any hosts that
    // run the master should be invalidated as well
    Map<String, ServiceComponent> components = service.getServiceComponents();
    if (null != components) {
      for (ServiceComponent component : components.values()) {
        if (component.isMasterComponent()) {
          Map<String, ServiceComponentHost> componentHosts = component.getServiceComponentHosts();
          if (null != componentHosts) {
            affectedHosts.addAll(componentHosts.keySet());
          }
        }
      }
    }

    return affectedHosts;
  }

  /**
   * Enqueue {@link AlertDefinitionCommand}s for every host in the cluster so
   * that they will receive a payload of alert definitions that they should be
   * running.
   * <p/>
   * This method is typically called after {@link #invalidateAll()} has caused a
   * cache invalidation of all alert definitions.
   *
   * @param clusterId
   *          the ID of the cluster.
   */
  public void enqueueAgentCommands(long clusterId) {
    try {
      Cluster cluster = m_clusters.get().getClusterById(clusterId);
      if (null == cluster) {
        LOG.warn("Unable to lookup cluster with ID {} for alert definition commands", clusterId);
        return;
      }

      Collection<Host> hosts = cluster.getHosts();
      List<String> hostNames = new ArrayList<>(hosts.size());
      for (Host host : hosts) {
        hostNames.add(host.getHostName());
      }

      enqueueAgentCommands(cluster, cluster.getClusterName(), hostNames);
    } catch (AmbariException ae) {
      LOG.error("Unable to lookup cluster for alert definition commands", ae);
    }
  }

  /**
   * Enqueue {@link AlertDefinitionCommand}s for every host specified so that
   * they will receive a payload of alert definitions that they should be
   * running.
   * <p/>
   * This method is typically called after
   * {@link #invalidateHosts(AlertDefinitionEntity)} has caused a cache
   * invalidation of the alert definition hash.
   *
   * @param clusterId
   *          the ID of the cluster.
   * @param hosts
   *          the hosts to push {@link AlertDefinitionCommand}s for.
   */
  public void enqueueAgentCommands(long clusterId, Collection<String> hosts) {
    Cluster cluster = null;
    try {
      cluster = m_clusters.get().getClusterById(clusterId);
    } catch (AmbariException ae) {
      LOG.error("Unable to lookup cluster for alert definition commands", ae);
    }

    // a missing cluster results in a null name, which the delegate logs and skips
    String clusterName = (null == cluster) ? null : cluster.getClusterName();
    enqueueAgentCommands(cluster, clusterName, hosts);
  }

  /**
   * Enqueue {@link AlertDefinitionCommand}s for every host specified so that
   * they will receive a payload of alert definitions that they should be
   * running.
   * <p/>
   * This method is typically called after
   * {@link #invalidateHosts(AlertDefinitionEntity)} has caused a cache
   * invalidation of the alert definition hash.
   *
   * @param cluster
   *          the cluster (may be {@code null}, in which case nothing is done).
   * @param clusterName
   *          the name of the cluster (may be {@code null}, in which case
   *          nothing is done).
   * @param hosts
   *          the hosts to push {@link AlertDefinitionCommand}s for.
   */
  private void enqueueAgentCommands(Cluster cluster, String clusterName, Collection<String> hosts) {
    if (null == cluster || null == clusterName) {
      LOG.warn("Unable to create alert definition agent commands because of a null cluster name");
      return;
    }

    if (null == hosts || hosts.isEmpty()) {
      return;
    }

    // acquire before entering try so that unlock() only runs if we hold the lock
    m_actionQueueLock.lock();
    try {
      for (String hostName : hosts) {
        List<AlertDefinition> definitions = getAlertDefinitions(clusterName, hostName);
        String hash = getHash(clusterName, hostName);

        Host host = cluster.getHost(hostName);
        String publicHostName = (null == host) ? hostName : host.getPublicHostName();

        AlertDefinitionCommand command = new AlertDefinitionCommand(
            clusterName, hostName, publicHostName, hash, definitions);

        try {
          command.addConfigs(m_configHelper.get(), cluster);
        } catch (AmbariException ae) {
          LOG.warn("Unable to add configurations to alert definition command",
              ae);
        }

        // TODO implement alert execution commands logic
        //m_actionQueue.enqueue(hostName, command);
      }
    } finally {
      m_actionQueueLock.unlock();
    }
  }

  /**
   * Calculates a unique hash value representing all of the alert definitions
   * that should be scheduled to run on a given host. Alerts of type
   * {@link SourceType#AGGREGATE} are not included in the hash since they are
   * not run on the agents.
   *
   * @param clusterName
   *          the cluster name (not {@code null}).
   * @param hostName
   *          the host name (not {@code null}).
   * @return the unique hash or {@value #NULL_MD5_HASH} if none.
   */
  private String hash(String clusterName, String hostName) {
    // strip AGGREGATE types BEFORE checking for emptiness so that a host whose
    // only definitions are aggregates still yields NULL_MD5_HASH rather than
    // the digest of zero bytes; sort so the digest input order is stable
    List<String> uuids = getAlertDefinitionEntities(clusterName, hostName).stream()
        .filter(definition -> !SourceType.AGGREGATE.equals(definition.getSourceType()))
        .map(AlertDefinitionEntity::getHash)
        .sorted()
        .collect(Collectors.toList());

    // no definitions found for this host, don't bother hashing
    if (uuids.isEmpty()) {
      return NULL_MD5_HASH;
    }

    try {
      MessageDigest digest = MessageDigest.getInstance("MD5");
      for (String uuid : uuids) {
        // explicit charset so the hash does not depend on the platform default
        digest.update(uuid.getBytes(StandardCharsets.UTF_8));
      }

      return Hex.encodeHexString(digest.digest());
    } catch (NoSuchAlgorithmException nsae) {
      LOG.warn("Unable to calculate MD5 alert definition hash", nsae);
      return NULL_MD5_HASH;
    }
  }

  /**
   * Gets the alert definition entities for the specified host. This will include the
   * following types of alert definitions:
   * <ul>
   * <li>Service/Component alerts</li>
   * <li>Service alerts where the host is a MASTER except AGGREGATE alerts</li>
   * <li>Host alerts that are not bound to a service</li>
   * </ul>
   *
   * @param clusterName
   *          the cluster name (not {@code null}).
   * @param hostName
   *          the host name (not {@code null}).
   * @return the alert definitions for the host, or an empty set (never
   *         {@code null}).
   */
  private Set<AlertDefinitionEntity> getAlertDefinitionEntities(
      String clusterName, String hostName) {

    Set<AlertDefinitionEntity> definitions = new HashSet<>();

    try {
      Cluster cluster = m_clusters.get().getCluster(clusterName);
      if (null == cluster) {
        return Collections.emptySet();
      }

      long clusterId = cluster.getClusterId();

      // services and components; the master lookup below is skipped only when
      // the host reports an empty (non-null) component list
      List<ServiceComponentHost> serviceComponents = cluster.getServiceComponentHosts(hostName);
      if (null == serviceComponents || !serviceComponents.isEmpty()) {
        if (null != serviceComponents) {
          for (ServiceComponentHost serviceComponent : serviceComponents) {
            String serviceName = serviceComponent.getServiceName();
            String componentName = serviceComponent.getServiceComponentName();

            // add all alerts for this service/component pair
            definitions.addAll(m_definitionDao.findByServiceComponent(clusterId, serviceName, componentName));
          }
        }

        // add all service scoped alerts for services mastered on this host
        Set<String> services = getServicesWithMasterOnHost(cluster, hostName);
        if (!services.isEmpty()) {
          definitions.addAll(m_definitionDao.findByServiceMaster(clusterId,
              services));
        }
      }

      // add any alerts not bound to a service (host level alerts)
      definitions.addAll(m_definitionDao.findAgentScoped(clusterId));
    } catch (ClusterNotFoundException clusterNotFound) {
      LOG.warn("Unable to get alert definitions for the missing cluster {}",
          clusterName);

      return Collections.emptySet();
    } catch (AmbariException ambariException) {
      LOG.error("Unable to get alert definitions", ambariException);
      return Collections.emptySet();
    }

    return definitions;
  }

  /**
   * Gets the names of all services in the cluster that have at least one
   * MASTER component installed on the specified host.
   *
   * @param cluster
   *          the cluster (not {@code null}).
   * @param hostName
   *          the host name (not {@code null}).
   * @return the service names, or an empty set (never {@code null}).
   */
  private Set<String> getServicesWithMasterOnHost(Cluster cluster, String hostName) {
    Set<String> services = new HashSet<>();
    for (Service service : cluster.getServices().values()) {
      Map<String, ServiceComponent> components = service.getServiceComponents();
      if (null == components) {
        continue;
      }

      for (ServiceComponent component : components.values()) {
        if (!component.isMasterComponent()) {
          continue;
        }

        Map<String, ServiceComponentHost> hosts = component.getServiceComponentHosts();
        if (null != hosts && hosts.containsKey(hostName)) {
          services.add(service.getName());
          break;
        }
      }
    }
    return services;
  }

  /**
   * Converts entities into {@link AlertDefinition}s via the injected factory.
   *
   * @param entities
   *          the entities to convert (not {@code null}).
   * @return the converted definitions, in iteration order of {@code entities}.
   */
  private List<AlertDefinition> coerce(Collection<AlertDefinitionEntity> entities) {
    return entities.stream()
        .map(m_factory::coerce)
        .collect(Collectors.toList());
  }

  /**
   * Indexes definitions by their definition ID.
   *
   * @param definitions
   *          the definitions to index (not {@code null}).
   * @return a map of definition ID to definition.
   */
  private static Map<Long, AlertDefinition> mapById(Collection<AlertDefinition> definitions) {
    return definitions.stream()
        .collect(Collectors.toMap(AlertDefinition::getDefinitionId, Function.identity()));
  }
}