blob: b018277386fc72aa99a37f15b19a6cbea2ce00f0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.stack;
import java.lang.reflect.Type;
import java.net.MalformedURLException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.ConfigHelper;
import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.MaintenanceState;
import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.ambari.server.state.UpgradeContext;
import org.apache.ambari.server.state.UpgradeState;
import org.apache.ambari.server.state.stack.upgrade.Direction;
import org.apache.ambari.server.state.stack.upgrade.ExecuteHostType;
import org.apache.ambari.server.utils.HTTPUtils;
import org.apache.ambari.server.utils.HostAndPort;
import org.apache.ambari.server.utils.StageUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
public class MasterHostResolver {
private static final Logger LOG = LoggerFactory.getLogger(MasterHostResolver.class);
private final UpgradeContext m_upgradeContext;
private final Cluster m_cluster;
private final ConfigHelper m_configHelper;
public enum Service {
HDFS,
HBASE,
YARN,
OTHER;
public static Service fromString(String serviceName) {
try {
return valueOf(serviceName.toUpperCase());
} catch (Exception ignore) {
return OTHER;
}
}
}
/**
* Union of status for several services.
*/
protected enum Status {
ACTIVE,
STANDBY
}
/**
* Constructor.
*
* @param configHelper
* Configuration Helper
* @param upgradeContext
* the upgrade context
*/
public MasterHostResolver(Cluster cluster, ConfigHelper configHelper, UpgradeContext upgradeContext) {
m_configHelper = configHelper;
m_upgradeContext = upgradeContext;
m_cluster = cluster;
}
/**
* Gets the cluster that this instance of the {@link MasterHostResolver} is
* initialized with.
*
* @return the cluster (not {@code null}).
*/
public Cluster getCluster() {
return m_cluster;
}
/**
* Get the master hostname of the given service and component.
* @param serviceName Service
* @param componentName Component
* @return The hostname that is the master of the service and component if successful, null otherwise.
*/
public HostsType getMasterAndHosts(String serviceName, String componentName) {
if (serviceName == null || componentName == null) {
return null;
}
LinkedHashSet<String> componentHosts = new LinkedHashSet<>(m_cluster.getHosts(serviceName, componentName));
if (componentHosts.isEmpty()) {
return null;
}
HostsType hostsType = HostsType.normal(componentHosts);
try {
switch (Service.fromString(serviceName)) {
case HDFS:
if (componentName.equalsIgnoreCase("NAMENODE") && componentHosts.size() >= 2) {
try {
hostsType = HostsType.federated(nameSpaces(componentHosts), componentHosts);
} catch (ClassifyNameNodeException | IllegalArgumentException e) {
if (componentHosts.size() == 2) { // in HA mode guess if cannot determine active/standby
hostsType = HostsType.guessHighAvailability(componentHosts);
LOG.warn(
"Could not determine the active/standby states from NameNodes {}. Using {} as active and {} as standbys.",
componentHosts, hostsType.getMasters(), hostsType.getSecondaries());
} else {
// XXX fallback to HostsType.normal unsure how to handle this
LOG.warn("Could not determine the active/standby states of federated NameNode from NameNodes {}.", componentHosts);
}
}
}
break;
case YARN:
if (componentName.equalsIgnoreCase("RESOURCEMANAGER")) {
hostsType = resolveResourceManagers(getCluster(), componentHosts);
}
break;
case HBASE:
if (componentName.equalsIgnoreCase("HBASE_MASTER")) {
hostsType = resolveHBaseMasters(getCluster(), componentHosts);
}
break;
default:
break;
}
} catch (Exception err) {
LOG.error("Unable to get master and hosts for Component " + componentName + ". Error: " + err.getMessage(), err);
}
return filterHosts(hostsType, serviceName, componentName);
}
/**
* Gets hosts which match the supplied criteria.
*
* @param cluster
* @param executeHostType
* @param serviceName
* @param componentName
* @return
*/
public static Collection<Host> getCandidateHosts(Cluster cluster, ExecuteHostType executeHostType,
String serviceName, String componentName) {
Collection<Host> candidates = cluster.getHosts();
if (StringUtils.isNotBlank(serviceName) && StringUtils.isNotBlank(componentName)) {
List<ServiceComponentHost> schs = cluster.getServiceComponentHosts(serviceName,componentName);
candidates = schs.stream().map(sch -> sch.getHost()).collect(Collectors.toList());
}
if (candidates.isEmpty()) {
return candidates;
}
// figure out where to add the new component
List<Host> winners = Lists.newArrayList();
switch (executeHostType) {
case ALL:
winners.addAll(candidates);
break;
case FIRST:
winners.add(candidates.iterator().next());
break;
case MASTER:
winners.add(candidates.iterator().next());
break;
case ANY:
winners.add(candidates.iterator().next());
break;
}
return winners;
}
/**
* Filters the supplied list of hosts in the following ways:
* <ul>
* <li>Compares the versions of a HostComponent to the version for the
* resolver. Only versions that do not match are retained.</li>
* <li>Removes unhealthy hosts in maintenance mode from the list of healthy
* hosts</li>
* </ul>
*
* @param hostsType
* the hosts to resolve
* @param service
* the service name
* @param component
* the component name
* @return the modified hosts instance with filtered and unhealthy hosts
* filled
*/
private HostsType filterHosts(HostsType hostsType, String service, String component) {
try {
org.apache.ambari.server.state.Service svc = m_cluster.getService(service);
ServiceComponent sc = svc.getServiceComponent(component);
// !!! not really a fan of passing these around
List<ServiceComponentHost> unhealthyHosts = new ArrayList<>();
LinkedHashSet<String> upgradeHosts = new LinkedHashSet<>();
for (String hostName : hostsType.getHosts()) {
ServiceComponentHost sch = sc.getServiceComponentHost(hostName);
Host host = sch.getHost();
MaintenanceState maintenanceState = host.getMaintenanceState(sch.getClusterId());
// !!! FIXME: only rely on maintenance state once the upgrade endpoint
// is using the pre-req endpoint for determining if an upgrade is
// possible
if (maintenanceState != MaintenanceState.OFF) {
unhealthyHosts.add(sch);
continue;
}
if (sch.getUpgradeState() == UpgradeState.FAILED) {
upgradeHosts.add(hostName);
continue;
}
if(m_upgradeContext.getDirection() == Direction.UPGRADE){
RepositoryVersionEntity targetRepositoryVersion = m_upgradeContext.getRepositoryVersion();
if (!StringUtils.equals(targetRepositoryVersion.getVersion(), sch.getVersion())) {
upgradeHosts.add(hostName);
}
continue;
}
// it's a downgrade ...
RepositoryVersionEntity downgradeToRepositoryVersion = m_upgradeContext.getTargetRepositoryVersion(service);
String downgradeToVersion = downgradeToRepositoryVersion.getVersion();
if (!StringUtils.equals(downgradeToVersion, sch.getVersion())) {
upgradeHosts.add(hostName);
continue;
}
}
hostsType.unhealthy = unhealthyHosts;
hostsType.setHosts(upgradeHosts);
return hostsType;
} catch (AmbariException e) {
// !!! better not
LOG.warn("Could not determine host components to upgrade. Defaulting to saved hosts.", e);
return hostsType;
}
}
/**
* Determine if HDFS is present and it has NameNode High Availability.
* @return true if has NameNode HA, otherwise, false.
*/
public boolean isNameNodeHA() throws AmbariException {
Map<String, org.apache.ambari.server.state.Service> services = m_cluster.getServices();
if (services != null && services.containsKey("HDFS")) {
Set<String> secondaryNameNodeHosts = m_cluster.getHosts("HDFS", "SECONDARY_NAMENODE");
Set<String> nameNodeHosts = m_cluster.getHosts("HDFS", "NAMENODE");
if (secondaryNameNodeHosts.size() == 1 && nameNodeHosts.size() == 1) {
return false;
}
if (nameNodeHosts.size() > 1) {
return true;
}
throw new AmbariException("Unable to determine if cluster has NameNode HA.");
}
return false;
}
/**
* Get the NameNode NameSpaces (master->secondaries hosts).
* In each NameSpace there should be exactly 1 master and at least one secondary host.
*/
private List<HostsType.HighAvailabilityHosts> nameSpaces(Set<String> componentHosts) {
return NameService.fromConfig(m_configHelper, getCluster()).stream()
.map(each -> findMasterAndSecondaries(each, componentHosts))
.collect(Collectors.toList());
}
/**
* Find the master and secondary namenode(s) based on JMX NameNodeStatus.
*/
private HostsType.HighAvailabilityHosts findMasterAndSecondaries(NameService nameService, Set<String> componentHosts) throws ClassifyNameNodeException {
String master = null;
List<String> secondaries = new ArrayList<>();
for (NameService.NameNode nameNode : nameService.getNameNodes()) {
checkForDualNetworkCards(componentHosts, nameNode);
String state = queryJmxBeanValue(nameNode.getHost(), nameNode.getPort(), "Hadoop:service=NameNode,name=NameNodeStatus", "State", true, nameNode.isEncrypted());
if (Status.ACTIVE.toString().equalsIgnoreCase(state)) {
master = nameNode.getHost();
} else if (Status.STANDBY.toString().equalsIgnoreCase(state)) {
secondaries.add(nameNode.getHost());
} else {
LOG.error(String.format("Could not retrieve state for NameNode %s from property %s by querying JMX.", nameNode.getHost(), nameNode.getPropertyName()));
}
}
if (masterAndSecondariesAreFound(componentHosts, master, secondaries)) {
return new HostsType.HighAvailabilityHosts(master, secondaries);
}
throw new ClassifyNameNodeException(nameService);
}
private static void checkForDualNetworkCards(Set<String> componentHosts, NameService.NameNode nameNode) {
if (!componentHosts.contains(nameNode.getHost())) {
//This may happen when NN HA is configured on dual network card machines with public/private FQDNs.
LOG.error(
MessageFormat.format(
"Hadoop NameNode HA configuration {0} contains host {1} that does not exist in the NameNode hosts list {3}",
nameNode.getPropertyName(), nameNode.getHost(), componentHosts.toString()));
}
}
private static boolean masterAndSecondariesAreFound(Set<String> componentHosts, String master, List<String> secondaries) {
return master != null && secondaries.size() + 1 == componentHosts.size() && !secondaries.contains(master);
}
private HostAndPort parseHostPort(Cluster cluster, String propertyName, String configType) throws MalformedURLException {
String propertyValue = m_configHelper.getValueFromDesiredConfigurations(cluster, configType, propertyName);
HostAndPort hp = HTTPUtils.getHostAndPortFromProperty(propertyValue);
if (hp == null) {
throw new MalformedURLException("Could not parse host and port from " + propertyValue);
}
return hp;
}
/**
* Resolve the name of the Resource Manager master and convert the hostname to lowercase.
*/
private HostsType resolveResourceManagers(Cluster cluster, Set<String> hosts) throws MalformedURLException {
String master = null;
LinkedHashSet<String> orderedHosts = new LinkedHashSet<>(hosts);
// IMPORTANT, for RM, only the master returns jmx
HostAndPort hp = parseHostPort(cluster, "yarn.resourcemanager.webapp.address", ConfigHelper.YARN_SITE);
for (String hostname : hosts) {
String value = queryJmxBeanValue(hostname, hp.port,
"Hadoop:service=ResourceManager,name=RMNMInfo", "modelerType", true);
if (null != value) {
if (master != null) {
master = hostname.toLowerCase();
}
// Quick and dirty to make sure the master is last in the list
orderedHosts.remove(hostname.toLowerCase());
orderedHosts.add(hostname.toLowerCase());
}
}
return HostsType.from(master, null, orderedHosts);
}
/**
* Resolve the HBASE master and convert the hostname to lowercase.
*/
private HostsType resolveHBaseMasters(Cluster cluster, Set<String> hosts) throws AmbariException {
String master = null;
String secondary = null;
String hbaseMasterInfoPortProperty = "hbase.master.info.port";
String hbaseMasterInfoPortValue = m_configHelper.getValueFromDesiredConfigurations(cluster, ConfigHelper.HBASE_SITE, hbaseMasterInfoPortProperty);
if (hbaseMasterInfoPortValue == null || hbaseMasterInfoPortValue.isEmpty()) {
throw new AmbariException("Could not find property " + hbaseMasterInfoPortProperty);
}
final int hbaseMasterInfoPort = Integer.parseInt(hbaseMasterInfoPortValue);
for (String hostname : hosts) {
String value = queryJmxBeanValue(hostname, hbaseMasterInfoPort,
"Hadoop:service=HBase,name=Master,sub=Server", "tag.isActiveMaster", false);
if (null != value) {
Boolean bool = Boolean.valueOf(value);
if (bool.booleanValue()) {
master = hostname.toLowerCase();
} else {
secondary = hostname.toLowerCase();
}
}
}
return HostsType.from(master, secondary, new LinkedHashSet<>(hosts));
}
protected String queryJmxBeanValue(String hostname, int port, String beanName, String attributeName,
boolean asQuery) {
return queryJmxBeanValue(hostname, port, beanName, attributeName, asQuery, false);
}
/**
* Query the JMX attribute at http(s)://$server:$port/jmx?qry=$query or http(s)://$server:$port/jmx?get=$bean::$attribute
* @param hostname host name
* @param port port number
* @param beanName if asQuery is false, then search for this bean name
* @param attributeName if asQuery is false, then search for this attribute name
* @param asQuery whether to search bean or query
* @param encrypted true if using https instead of http.
* @return The jmx value.
*/
protected String queryJmxBeanValue(String hostname, int port, String beanName, String attributeName,
boolean asQuery, boolean encrypted) {
String protocol = encrypted ? "https://" : "http://";
String endPoint = protocol + (asQuery ?
String.format("%s:%s/jmx?qry=%s", hostname, port, beanName) :
String.format("%s:%s/jmx?get=%s::%s", hostname, port, beanName, attributeName));
String response = HTTPUtils.requestURL(endPoint);
if (null == response || response.isEmpty()) {
return null;
}
Type type = new TypeToken<Map<String, ArrayList<HashMap<String, String>>>>() {}.getType();
try {
Map<String, ArrayList<HashMap<String, String>>> jmxBeans =
StageUtils.getGson().fromJson(response, type);
return jmxBeans.get("beans").get(0).get(attributeName);
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Could not load JMX from {}/{} from {}", beanName, attributeName, hostname, e);
} else {
LOG.debug("Could not load JMX from {}/{} from {}", beanName, attributeName, hostname);
}
}
return null;
}
}