blob: 5c5b1328800da4e3de1b169608796115102daaf7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEALTH_MONITOR_TIMEOUT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS_DEFAULT;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.Time;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
/**
* The {@link Router} periodically checks the state of a Namenode (usually on
* the same server) and reports its high availability (HA) state and
* load/space status to the
* {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService}.
* Note that this is an optional role as a Router can be independent of any
* subcluster.
* <p>
* For performance with Namenode HA, the Router uses the high availability state
* information in the State Store to forward the request to the Namenode that is
* most likely to be active.
* <p>
* Note that this service can be embedded into the Namenode itself to simplify
* the operation.
*/
public class NamenodeHeartbeatService extends PeriodicService {
/** Logger for this service. */
private static final Logger LOG =
LoggerFactory.getLogger(NamenodeHeartbeatService.class);
/** Configuration for the heartbeat. */
private Configuration conf;
/** Resolver where the status report of the monitored Namenode is registered. */
private final ActiveNamenodeResolver resolver;
/** Identifier of the nameservice of the monitored Namenode. */
private final String nameserviceId;
/**
 * Identifier of the monitored Namenode. It is either the id given at
 * construction or, when a resolved host was given, that id and the host
 * joined by "-". May be null or empty.
 */
private final String namenodeId;
/** Namenode HA target. */
private NNHAServiceTarget localTarget;
/** Cache HA protocol. */
private HAServiceProtocol localTargetHAProtocol;
/** Cache NN protocol. */
private NamenodeProtocol namenodeProtocol;
/** Cache Client protocol. */
private ClientProtocol clientProtocol;
/** RPC address for the namenode. */
private String rpcAddress;
/** Service RPC address for the namenode. */
private String serviceAddress;
/** Lifeline RPC address for the namenode. */
private String lifelineAddress;
/** HTTP address for the namenode. */
private String webAddress;
/** Connection factory for JMX calls. */
private URLConnectionFactory connectionFactory;
/** URL scheme to use for JMX calls. */
private String scheme;
/** Frequency of updates to JMX report. */
private long updateJmxIntervalMs;
/** Timestamp of last attempt to update JMX report. */
private long lastJmxUpdateAttempt;
/** Result of the last successful FsNamesystemMetrics report. */
private JSONArray fsNamesystemMetrics;
/** Result of the last successful NamenodeInfoMetrics report. */
private JSONArray namenodeInfoMetrics;
/** Resolved hostname of the namenode; null unless given at construction. */
private String resolvedHost;
/** Namenode identifier used to look up the addresses in the configuration. */
private String originalNnId;
/** Timeout in milliseconds used when creating the HA health monitor proxy. */
private int healthMonitorTimeoutMs = (int) DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT;
/**
 * Build a heartbeat service that tracks a single Namenode.
 *
 * @param resolver Namenode resolver service to handle NN registration.
 * @param nsId Identifier of the nameservice.
 * @param nnId Identifier of the namenode in HA.
 */
public NamenodeHeartbeatService(
    ActiveNamenodeResolver resolver, String nsId, String nnId) {
  // Service name: simple class name followed by the non-null identifiers.
  super(getNnHeartBeatServiceName(nsId, nnId));
  this.namenodeId = nnId;
  this.nameserviceId = nsId;
  this.resolver = resolver;
}
/**
 * Build a heartbeat service that tracks a single Namenode reached through
 * an already resolved host.
 *
 * @param resolver Namenode resolver service to handle NN registration.
 * @param nsId Identifier of the nameservice.
 * @param nnId Identifier of the namenode in HA.
 * @param resolvedHost resolvedHostname for this specific namenode.
 */
public NamenodeHeartbeatService(
    ActiveNamenodeResolver resolver, String nsId, String nnId, String resolvedHost) {
  super(getNnHeartBeatServiceName(nsId, nnId));
  this.resolver = resolver;
  this.nameserviceId = nsId;
  // Keep the original nnId: it is the one used to read the ports from the
  // configuration.
  this.originalNnId = nnId;
  this.resolvedHost = resolvedHost;
  // Unique identifier made of the original nnId and the resolved host.
  this.namenodeId = nnId + "-" + resolvedHost;
}
/**
 * Initialize the service from the configuration: resolve the RPC, service
 * RPC, lifeline and web addresses of the monitored Namenode, build the HA
 * target when a Namenode identifier is set, and read the heartbeat
 * interval, the health monitor timeout and the JMX refresh interval.
 *
 * @param configuration Configuration to initialize the service with.
 * @throws Exception If the initialization fails.
 */
@Override
protected void serviceInit(Configuration configuration) throws Exception {
  this.conf = DFSHAAdmin.addSecurityConfiguration(configuration);

  String nnDesc = nameserviceId;
  if (this.namenodeId != null && !this.namenodeId.isEmpty()) {
    nnDesc += "-" + namenodeId;
  } else {
    this.localTarget = null;
  }

  // Without a resolved host, the id used for the configuration lookups is
  // the Namenode id itself.
  if (originalNnId == null) {
    originalNnId = namenodeId;
  }

  // Get the RPC address for the clients to connect
  this.rpcAddress = getRpcAddress(conf, nameserviceId, originalNnId);

  // Get the Service RPC address for monitoring
  this.serviceAddress =
      DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, originalNnId);
  if (this.serviceAddress == null) {
    LOG.error("Cannot locate RPC service address for NN {}, " +
        "using RPC address {}", nnDesc, this.rpcAddress);
    this.serviceAddress = this.rpcAddress;
  }

  // Get the Lifeline RPC address for faster monitoring
  this.lifelineAddress =
      DFSUtil.getNamenodeLifelineAddr(conf, nameserviceId, originalNnId);
  if (this.lifelineAddress == null) {
    this.lifelineAddress = this.serviceAddress;
  }

  // Get the Web address for UI
  this.webAddress =
      DFSUtil.getNamenodeWebAddr(conf, nameserviceId, originalNnId);

  if (resolvedHost != null) {
    // Get the addresses from resolvedHost plus the configured ports.
    rpcAddress = resolvedHost + ":"
        + NetUtils.getPortFromHostPortString(rpcAddress);
    serviceAddress = resolvedHost + ":"
        + NetUtils.getPortFromHostPortString(serviceAddress);
    lifelineAddress = resolvedHost + ":"
        + NetUtils.getPortFromHostPortString(lifelineAddress);
    webAddress = resolvedHost + ":"
        + NetUtils.getPortFromHostPortString(webAddress);
  }

  LOG.info("{} RPC address: {}", nnDesc, rpcAddress);
  LOG.info("{} Service RPC address: {}", nnDesc, serviceAddress);
  LOG.info("{} Lifeline RPC address: {}", nnDesc, lifelineAddress);
  LOG.info("{} Web address: {}", nnDesc, webAddress);

  if (this.namenodeId != null && !this.namenodeId.isEmpty()) {
    this.localTarget = new NNHAServiceTarget(
        conf, nameserviceId, namenodeId, serviceAddress, lifelineAddress);
  }

  this.connectionFactory =
      URLConnectionFactory.newDefaultURLConnectionFactory(conf);

  this.scheme =
      DFSUtil.getHttpPolicy(conf).isHttpEnabled() ? "http" : "https";

  this.setIntervalMs(conf.getLong(
      DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
      DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT));

  long timeoutMs = conf.getTimeDuration(DFS_ROUTER_HEALTH_MONITOR_TIMEOUT,
      DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
  if (timeoutMs < 0) {
    LOG.warn("Invalid value {} configured for {} should be greater than or equal to 0. " +
        "Using value of : 0ms instead.", timeoutMs, DFS_ROUTER_HEALTH_MONITOR_TIMEOUT);
    this.healthMonitorTimeoutMs = 0;
  } else if (timeoutMs > Integer.MAX_VALUE) {
    // The timeout is stored as an int: a plain narrowing cast would wrap
    // around (possibly to a negative value), so clamp it instead.
    LOG.warn("Value {} configured for {} is too large. " +
        "Using value of : {}ms instead.",
        timeoutMs, DFS_ROUTER_HEALTH_MONITOR_TIMEOUT, Integer.MAX_VALUE);
    this.healthMonitorTimeoutMs = Integer.MAX_VALUE;
  } else {
    this.healthMonitorTimeoutMs = (int) timeoutMs;
  }

  this.updateJmxIntervalMs = conf.getTimeDuration(DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS,
      DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS_DEFAULT, TimeUnit.MILLISECONDS);

  super.serviceInit(configuration);
}
/**
 * Periodic task: refresh the state of the monitored Namenode using the
 * login user credentials. An IOException is logged and not propagated.
 */
@Override
public void periodicInvoke() {
  final PrivilegedExceptionAction<Void> refresh = () -> {
    updateState();
    return null;
  };
  try {
    // Run using the login user credentials
    SecurityUtil.doAsLoginUser(refresh);
  } catch (IOException ioe) {
    LOG.error("Cannot update namenode state", ioe);
  }
}
/**
 * Get the RPC address for a Namenode.
 * <p>
 * When neither identifier is given, the plain RPC address key is read.
 * Otherwise the key suffixed with the identifiers is read and, if it is not
 * set, the address is taken from the RPC addresses of the nameservice.
 *
 * @param conf Configuration.
 * @param nsId Name service identifier.
 * @param nnId Name node identifier.
 * @return RPC address in format hostname:1234, or null if none is found.
 */
private static String getRpcAddress(
    Configuration conf, String nsId, String nnId) {

  // Get it from the regular RPC setting
  String confKey = DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
  String ret = conf.get(confKey);

  if (nsId != null || nnId != null) {
    // Get it for the proper nameservice and namenode
    confKey = DFSUtil.addKeySuffixes(confKey, nsId, nnId);
    ret = conf.get(confKey);

    // If not available, get it from the map
    if (ret == null) {
      Map<String, InetSocketAddress> rpcAddresses =
          DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null);
      InetSocketAddress sockAddr = null;
      if (nnId != null) {
        sockAddr = rpcAddresses.get(nnId);
      } else if (rpcAddresses.size() == 1) {
        // Get the only namenode in the namespace
        sockAddr = rpcAddresses.values().iterator().next();
      }
      if (sockAddr != null) {
        // getAddress() is null for an unresolved socket address; fall back
        // to the host string it was created with instead of failing.
        InetAddress addr = sockAddr.getAddress();
        String host =
            addr != null ? addr.getHostName() : sockAddr.getHostString();
        ret = host + ":" + sockAddr.getPort();
      }
    }
  }
  return ret;
}
/**
 * Fetch the current status of the Namenode and register it with the
 * resolver. Nothing is registered when the Namenode is expected to be in HA
 * but its HA state could not be fetched.
 */
private void updateState() {
  final NamenodeStatusReport status = getNamenodeStatusReport();

  if (!status.registrationValid()) {
    // Not operational
    LOG.error("Namenode is not operational: {}", getNamenodeDesc());
  } else {
    final boolean haKnown = status.haStateValid();
    if (!haKnown && localTarget != null) {
      // block info available, HA status should be available, but was not
      // fetched do nothing and let the current state stand
      return;
    }
    if (LOG.isDebugEnabled()) {
      if (haKnown) {
        // block and HA status available
        LOG.debug("Received service state: {} from HA namenode: {}",
            status.getState(), getNamenodeDesc());
      } else {
        // block info available, HA status not expected
        LOG.debug(
            "Reporting non-HA namenode as operational: {}", getNamenodeDesc());
      }
    }
  }

  try {
    final boolean registered = resolver.registerNamenode(status);
    if (!registered) {
      LOG.warn("Cannot register namenode {}", status);
    }
  } catch (Exception ex) {
    LOG.error("Cannot register namenode {} in the State Store", getNamenodeDesc(), ex);
  }
}
/**
 * Build the status report for the Namenode monitored by this heartbeater.
 * The namespace information is fetched first; safemode, JMX statistics and
 * HA status are only collected when the registration is valid. Failures are
 * logged and the report collected so far is returned.
 *
 * @return Namenode status report.
 */
protected NamenodeStatusReport getNamenodeStatusReport() {
  final NamenodeStatusReport status = new NamenodeStatusReport(
      nameserviceId, namenodeId, rpcAddress, serviceAddress,
      lifelineAddress, scheme, webAddress);

  try {
    LOG.debug("Probing NN at service address: {}", serviceAddress);
    final URI nnUri = new URI("hdfs://" + serviceAddress);

    // Read the filesystem info from RPC (required)
    updateNameSpaceInfoParameters(nnUri, status);

    if (status.registrationValid()) {
      // Check for safemode from the client protocol. Currently optional, but
      // should be required at some point for QoS
      updateSafeModeParameters(nnUri, status);

      // Read the stats from JMX (optional)
      updateJMXParameters(webAddress, status);

      // Try to get the HA status
      updateHAStatusParameters(status);
    }
  } catch (IOException ioe) {
    LOG.error("Cannot communicate with {}: {}",
        getNamenodeDesc(), ioe.getMessage());
  } catch (Throwable t) {
    // Generic error that we don't know about
    LOG.error("Unexpected exception while communicating with {}: {}",
        getNamenodeDesc(), t.getMessage(), t);
  }
  return status;
}
/**
 * Get the HA target of the monitored Namenode.
 * @return Namenode HA target, or null if there is none.
 */
@VisibleForTesting
NNHAServiceTarget getLocalTarget() {
  return localTarget;
}
/**
 * Get the description of the Namenode to monitor: the nameservice
 * identifier, followed by "-" and the Namenode identifier when one is set,
 * then ":" and the service address.
 *
 * @return Description of the Namenode to monitor.
 */
public String getNamenodeDesc() {
  final boolean hasNamenodeId = namenodeId != null && !namenodeId.isEmpty();
  final String id =
      hasNamenodeId ? nameserviceId + "-" + namenodeId : nameserviceId;
  return id + ":" + serviceAddress;
}
/**
 * Build the name of the heartbeat service: the simple class name followed
 * by the nameservice and Namenode identifiers, each preceded by a space and
 * skipped when null.
 *
 * @param nsId Identifier of the nameservice.
 * @param nnId Identifier of the namenode.
 * @return Name of the service.
 */
private static String getNnHeartBeatServiceName(String nsId, String nnId) {
  final StringBuilder name =
      new StringBuilder(NamenodeHeartbeatService.class.getSimpleName());
  if (nsId != null) {
    name.append(" ").append(nsId);
  }
  if (nnId != null) {
    name.append(" ").append(nnId);
  }
  return name.toString();
}
/**
 * Fetch the namespace information of a Namenode via RPC and store it in the
 * report. The Namenode protocol proxy is created on first use and cached;
 * the cached proxy is dropped when an IOException occurs.
 *
 * @param serviceURI Server address of the Namenode to monitor.
 * @param report Namenode status report updating with namespace information data.
 * @throws IOException Propagated to the caller: RBF needs the namespace
 *           information to identify this NS, so there is no point in
 *           collecting anything else from the Namenode when it is missing.
 */
private void updateNameSpaceInfoParameters(URI serviceURI,
    NamenodeStatusReport report) throws IOException {
  try {
    NamenodeProtocol proxy = this.namenodeProtocol;
    if (proxy == null) {
      proxy = NameNodeProxies.createProxy(this.conf, serviceURI,
          NamenodeProtocol.class).getProxy();
      this.namenodeProtocol = proxy;
    }
    if (proxy == null) {
      return;
    }
    final NamespaceInfo nsInfo = proxy.versionRequest();
    if (nsInfo != null) {
      report.setNamespaceInfo(nsInfo);
    }
  } catch (IOException ioe) {
    // Drop the cached proxy so the next call creates a new one.
    this.namenodeProtocol = null;
    throw ioe;
  }
}
/**
 * Fetch the safemode state of a Namenode via RPC and store it in the report.
 * Safemode is just one attribute of the Namenode and is not needed by RBF to
 * identify it, so any failure is logged and ignored to let the caller keep
 * collecting the rest of the information. The client protocol proxy is
 * created on first use and cached; it is dropped on failure.
 *
 * @param serviceURI Server address of the Namenode to monitor.
 * @param report Namenode status report updating with safemode information data.
 */
private void updateSafeModeParameters(URI serviceURI, NamenodeStatusReport report) {
  try {
    ClientProtocol client = this.clientProtocol;
    if (client == null) {
      client = NameNodeProxies
          .createProxy(this.conf, serviceURI, ClientProtocol.class)
          .getProxy();
      this.clientProtocol = client;
    }
    if (client != null) {
      report.setSafeMode(
          client.setSafeMode(SafeModeAction.SAFEMODE_GET, false));
    }
  } catch (Exception ex) {
    LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), ex);
    // Drop the cached proxy so the next call creates a new one.
    this.clientProtocol = null;
  }
}
/**
 * Add the JMX statistics of a Namenode to the report. The metrics are
 * fetched again from the Namenode only when a refresh is due; otherwise the
 * values kept from the previous fetch are used. Failures are logged and
 * ignored.
 *
 * @param address Web interface of the Namenode to monitor.
 * @param report Namenode status report to update with JMX data.
 */
private void updateJMXParameters(
    String address, NamenodeStatusReport report) {
  try {
    // TODO part of this should be moved to its own utility
    final boolean refreshDue = shouldUpdateJmx();
    if (refreshDue) {
      // Record the attempt before fetching.
      this.lastJmxUpdateAttempt = Time.monotonicNow();
      getFsNamesystemMetrics(address);
      getNamenodeInfoMetrics(address);
    }
    populateFsNamesystemMetrics(this.fsNamesystemMetrics, report);
    populateNamenodeInfoMetrics(this.namenodeInfoMetrics, report);
  } catch (Exception ex) {
    LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), ex);
  }
}
/**
 * Get the HA status for a Namenode via RPC and add it to the report.
 * Nothing is done when there is no HA target. The health monitor proxy is
 * created on first use and cached; it is dropped on any failure. When the
 * failure reports that HA is not enabled, the HA target is cleared too.
 *
 * @param report Namenode status report updating with HA status information data.
 */
private void updateHAStatusParameters(NamenodeStatusReport report) {
  if (localTarget == null) {
    return;
  }
  try {
    // Determine if NN is active
    if (localTargetHAProtocol == null) {
      localTargetHAProtocol = localTarget.getHealthMonitorProxy(
          conf, this.healthMonitorTimeoutMs);
      LOG.debug("Get HA status with address {}", lifelineAddress);
    }
    HAServiceStatus status = localTargetHAProtocol.getServiceStatus();
    report.setHAServiceState(status.getState());
  } catch (Throwable e) {
    // The message of a Throwable may be null; guard against it so that the
    // handling below (in particular dropping the cached proxy) always runs.
    final String message = e.getMessage();
    if (message != null
        && message.startsWith("HA for namenode is not enabled")) {
      LOG.error("HA for {} is not enabled", getNamenodeDesc());
      localTarget = null;
    } else {
      // Failed to fetch HA status, ignoring failure
      LOG.error("Cannot fetch HA status for {}", getNamenodeDesc(), e);
    }
    localTargetHAProtocol = null;
  }
}
/**
 * Evaluates whether the JMX report should be refreshed by
 * calling the Namenode, based on the following conditions:
 * 1. JMX updates must be enabled (the interval is not negative).
 * 2. More than the configured interval has elapsed since the last attempt
 *    to update JMX.
 *
 * @return true if the JMX report should be fetched again.
 */
private boolean shouldUpdateJmx() {
  return this.updateJmxIntervalMs >= 0
      && Time.monotonicNow() - this.lastJmxUpdateAttempt > this.updateJmxIntervalMs;
}
/**
 * Fetches the NameNodeInfo metrics from the namenode over JMX and keeps the
 * result for later use.
 *
 * @param address Web interface of the Namenode to monitor.
 */
private void getNamenodeInfoMetrics(String address) {
  this.namenodeInfoMetrics = FederationUtil.getJmx(
      "Hadoop:service=NameNode,name=NameNodeInfo",
      address, connectionFactory, scheme);
}
/**
 * Populates NamenodeInfo metrics into report. Only the first element of the
 * array is considered, and only if it is the NameNodeInfo bean.
 *
 * @param aux NamenodeInfo metrics from namenode.
 * @param report Namenode status report to update with JMX data.
 * @throws JSONException When an invalid JSONObject is found
 */
private void populateNamenodeInfoMetrics(JSONArray aux, NamenodeStatusReport report)
    throws JSONException {
  if (aux == null || aux.length() <= 0) {
    return;
  }
  final JSONObject bean = aux.getJSONObject(0);
  final String beanName = bean.getString("name");
  if (!"Hadoop:service=NameNode,name=NameNodeInfo".equals(beanName)) {
    return;
  }
  report.setNamenodeInfo(
      bean.optInt("CorruptFilesCount"),
      bean.optLong("NumberOfMissingBlocksWithReplicationFactorOne"),
      bean.optLong("HighestPriorityLowRedundancyReplicatedBlocks"),
      bean.optLong("HighestPriorityLowRedundancyECBlocks"));
}
/**
 * Fetches the FSNamesystem* metrics from the namenode over JMX and keeps the
 * result for later use.
 *
 * @param address Web interface of the Namenode to monitor.
 */
private void getFsNamesystemMetrics(String address) {
  this.fsNamesystemMetrics = FederationUtil.getJmx(
      "Hadoop:service=NameNode,name=FSNamesystem*",
      address, connectionFactory, scheme);
}
/**
 * Populates FSNamesystem* metrics into report. The FSNamesystemState bean
 * provides the datanode information and the FSNamesystem bean provides the
 * namesystem information; any other bean is skipped.
 *
 * @param aux FSNamesystem* metrics from namenode.
 * @param report Namenode status report to update with JMX data.
 * @throws JSONException When invalid JSONObject is found.
 */
private void populateFsNamesystemMetrics(JSONArray aux, NamenodeStatusReport report)
    throws JSONException {
  if (aux == null) {
    return;
  }
  for (int idx = 0; idx < aux.length(); idx++) {
    final JSONObject bean = aux.getJSONObject(idx);
    switch (bean.getString("name")) {
    case "Hadoop:service=NameNode,name=FSNamesystemState":
      report.setDatanodeInfo(
          bean.getInt("NumLiveDataNodes"),
          bean.getInt("NumDeadDataNodes"),
          bean.getInt("NumStaleDataNodes"),
          bean.getInt("NumDecommissioningDataNodes"),
          bean.getInt("NumDecomLiveDataNodes"),
          bean.getInt("NumDecomDeadDataNodes"),
          bean.optInt("NumInMaintenanceLiveDataNodes"),
          bean.optInt("NumInMaintenanceDeadDataNodes"),
          bean.optInt("NumEnteringMaintenanceDataNodes"),
          bean.optLong("ScheduledReplicationBlocks"));
      break;
    case "Hadoop:service=NameNode,name=FSNamesystem":
      report.setNamesystemInfo(
          bean.getLong("CapacityRemaining"),
          bean.getLong("CapacityTotal"),
          bean.getLong("FilesTotal"),
          bean.getLong("BlocksTotal"),
          bean.getLong("MissingBlocks"),
          bean.getLong("PendingReplicationBlocks"),
          bean.getLong("UnderReplicatedBlocks"),
          bean.getLong("PendingDeletionBlocks"),
          bean.optLong("ProvidedCapacityTotal"),
          bean.getInt("PendingSPSPaths"));
      break;
    default:
      // Not a bean this report uses.
      break;
    }
  }
}
/**
 * Stop the service: destroy the connection factory used for the JMX calls,
 * if one was created, and then stop the parent service.
 *
 * @throws Exception If the service cannot be stopped.
 */
@Override
protected void serviceStop() throws Exception {
  LOG.info("Stopping NamenodeHeartbeat service for, NS {} NN {} ",
      this.nameserviceId, this.namenodeId);
  final URLConnectionFactory factory = this.connectionFactory;
  if (factory != null) {
    factory.destroy();
  }
  super.serviceStop();
}
}