blob: 970bd4fdfeea5f65f03fc9b509cd7b912d54f20b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.metrics;
import static org.apache.hadoop.util.Time.now;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
import org.codehaus.jettison.json.JSONObject;
import org.eclipse.jetty.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Implementation of the Router metrics collector.
*/
public class FederationMetrics implements FederationMBean {
private static final Logger LOG =
LoggerFactory.getLogger(FederationMetrics.class);
/** Format for a date. */
private static final String DATE_FORMAT = "yyyy/MM/dd HH:mm:ss";
/** Prevent holding the page from load too long. */
private static final long TIME_OUT = TimeUnit.SECONDS.toMillis(1);
/** Router interface. */
private final Router router;
/** FederationState JMX bean. */
private ObjectName beanName;
/** Resolve the namenode for each namespace. */
private final ActiveNamenodeResolver namenodeResolver;
/** State store. */
private final StateStoreService stateStore;
/** Membership state store. */
private MembershipStore membershipStore;
/** Mount table store. */
private MountTableStore mountTableStore;
/** Router state store. */
private RouterStore routerStore;
public FederationMetrics(Router router) throws IOException {
this.router = router;
try {
StandardMBean bean = new StandardMBean(this, FederationMBean.class);
this.beanName = MBeans.register("Router", "FederationState", bean);
LOG.info("Registered Router MBean: {}", this.beanName);
} catch (NotCompliantMBeanException e) {
throw new RuntimeException("Bad Router MBean setup", e);
}
// Resolve namenode for each nameservice
this.namenodeResolver = this.router.getNamenodeResolver();
// State store interfaces
this.stateStore = this.router.getStateStore();
if (this.stateStore == null) {
LOG.error("State store not available");
} else {
this.membershipStore = stateStore.getRegisteredRecordStore(
MembershipStore.class);
this.mountTableStore = stateStore.getRegisteredRecordStore(
MountTableStore.class);
this.routerStore = stateStore.getRegisteredRecordStore(
RouterStore.class);
}
}
/**
* Unregister the JMX beans.
*/
public void close() {
if (this.beanName != null) {
MBeans.unregister(beanName);
}
}
@Override
public String getNamenodes() {
final Map<String, Map<String, Object>> info = new LinkedHashMap<>();
try {
// Get the values from the store
GetNamenodeRegistrationsRequest request =
GetNamenodeRegistrationsRequest.newInstance();
GetNamenodeRegistrationsResponse response =
membershipStore.getNamenodeRegistrations(request);
// Order the namenodes
final List<MembershipState> namenodes = response.getNamenodeMemberships();
if (namenodes == null || namenodes.size() == 0) {
return JSON.toString(info);
}
List<MembershipState> namenodesOrder = new ArrayList<>(namenodes);
Collections.sort(namenodesOrder, MembershipState.NAME_COMPARATOR);
// Dump namenodes information into JSON
for (MembershipState namenode : namenodesOrder) {
Map<String, Object> innerInfo = new HashMap<>();
Map<String, Object> map = getJson(namenode);
innerInfo.putAll(map);
long dateModified = namenode.getDateModified();
long lastHeartbeat = getSecondsSince(dateModified);
innerInfo.put("lastHeartbeat", lastHeartbeat);
MembershipStats stats = namenode.getStats();
long used = stats.getTotalSpace() - stats.getAvailableSpace();
innerInfo.put("used", used);
info.put(namenode.getNamenodeKey(),
Collections.unmodifiableMap(innerInfo));
}
} catch (IOException e) {
LOG.error("Enable to fetch json representation of namenodes {}",
e.getMessage());
return "{}";
}
return JSON.toString(info);
}
@Override
public String getNameservices() {
final Map<String, Map<String, Object>> info = new LinkedHashMap<>();
try {
final List<MembershipState> namenodes = getActiveNamenodeRegistrations();
List<MembershipState> namenodesOrder = new ArrayList<>(namenodes);
Collections.sort(namenodesOrder, MembershipState.NAME_COMPARATOR);
// Dump namenodes information into JSON
for (MembershipState namenode : namenodesOrder) {
Map<String, Object> innerInfo = new HashMap<>();
Map<String, Object> map = getJson(namenode);
innerInfo.putAll(map);
long dateModified = namenode.getDateModified();
long lastHeartbeat = getSecondsSince(dateModified);
innerInfo.put("lastHeartbeat", lastHeartbeat);
MembershipStats stats = namenode.getStats();
long used = stats.getTotalSpace() - stats.getAvailableSpace();
innerInfo.put("used", used);
info.put(namenode.getNamenodeKey(),
Collections.unmodifiableMap(innerInfo));
}
} catch (IOException e) {
LOG.error("Cannot retrieve nameservices for JMX: {}", e.getMessage());
return "{}";
}
return JSON.toString(info);
}
@Override
public String getMountTable() {
final List<Map<String, Object>> info = new LinkedList<>();
try {
// Get all the mount points in order
GetMountTableEntriesRequest request =
GetMountTableEntriesRequest.newInstance("/");
GetMountTableEntriesResponse response =
mountTableStore.getMountTableEntries(request);
final List<MountTable> mounts = response.getEntries();
List<MountTable> orderedMounts = new ArrayList<>(mounts);
Collections.sort(orderedMounts, MountTable.SOURCE_COMPARATOR);
// Dump mount table entries information into JSON
for (MountTable entry : orderedMounts) {
// Sumarize destinations
Set<String> nameservices = new LinkedHashSet<>();
Set<String> paths = new LinkedHashSet<>();
for (RemoteLocation location : entry.getDestinations()) {
nameservices.add(location.getNameserviceId());
paths.add(location.getDest());
}
Map<String, Object> map = getJson(entry);
// We add some values with a cleaner format
map.put("dateCreated", getDateString(entry.getDateCreated()));
map.put("dateModified", getDateString(entry.getDateModified()));
Map<String, Object> innerInfo = new HashMap<>();
innerInfo.putAll(map);
innerInfo.put("nameserviceId", StringUtils.join(",", nameservices));
innerInfo.put("path", StringUtils.join(",", paths));
if (nameservices.size() > 1) {
innerInfo.put("order", entry.getDestOrder().toString());
} else {
innerInfo.put("order", "");
}
innerInfo.put("readonly", entry.isReadOnly());
info.add(Collections.unmodifiableMap(innerInfo));
}
} catch (IOException e) {
LOG.error(
"Cannot generate JSON of mount table from store: {}", e.getMessage());
return "[]";
}
return JSON.toString(info);
}
@Override
public String getRouters() {
final Map<String, Map<String, Object>> info = new LinkedHashMap<>();
try {
// Get all the routers in order
GetRouterRegistrationsRequest request =
GetRouterRegistrationsRequest.newInstance();
GetRouterRegistrationsResponse response =
routerStore.getRouterRegistrations(request);
final List<RouterState> routers = response.getRouters();
List<RouterState> routersOrder = new ArrayList<>(routers);
Collections.sort(routersOrder);
// Dump router information into JSON
for (RouterState record : routersOrder) {
Map<String, Object> innerInfo = new HashMap<>();
Map<String, Object> map = getJson(record);
innerInfo.putAll(map);
long dateModified = record.getDateModified();
long lastHeartbeat = getSecondsSince(dateModified);
innerInfo.put("lastHeartbeat", lastHeartbeat);
StateStoreVersion stateStoreVersion = record.getStateStoreVersion();
if (stateStoreVersion == null) {
LOG.error("Cannot get State Store versions");
} else {
setStateStoreVersions(innerInfo, stateStoreVersion);
}
info.put(record.getPrimaryKey(),
Collections.unmodifiableMap(innerInfo));
}
} catch (IOException e) {
LOG.error("Cannot get Routers JSON from the State Store", e);
return "{}";
}
return JSON.toString(info);
}
/**
* Populate the map with the State Store versions.
*
* @param innerInfo Map with the information.
* @param version State Store versions.
*/
private static void setStateStoreVersions(
Map<String, Object> map, StateStoreVersion version) {
long membershipVersion = version.getMembershipVersion();
String lastMembershipUpdate = getDateString(membershipVersion);
map.put("lastMembershipUpdate", lastMembershipUpdate);
long mountTableVersion = version.getMountTableVersion();
String lastMountTableDate = getDateString(mountTableVersion);
map.put("lastMountTableUpdate", lastMountTableDate);
}
@Override
public long getTotalCapacity() {
return getNameserviceAggregatedLong(MembershipStats::getTotalSpace);
}
@Override
public long getRemainingCapacity() {
return getNameserviceAggregatedLong(MembershipStats::getAvailableSpace);
}
@Override
public long getProvidedSpace() {
return getNameserviceAggregatedLong(MembershipStats::getProvidedSpace);
}
@Override
public long getUsedCapacity() {
return getTotalCapacity() - getRemainingCapacity();
}
@Override
public int getNumNameservices() {
try {
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
return nss.size();
} catch (IOException e) {
LOG.error(
"Cannot fetch number of expired registrations from the store: {}",
e.getMessage());
return 0;
}
}
@Override
public int getNumNamenodes() {
try {
GetNamenodeRegistrationsRequest request =
GetNamenodeRegistrationsRequest.newInstance();
GetNamenodeRegistrationsResponse response =
membershipStore.getNamenodeRegistrations(request);
List<MembershipState> memberships = response.getNamenodeMemberships();
return memberships.size();
} catch (IOException e) {
LOG.error("Cannot retrieve numNamenodes for JMX: {}", e.getMessage());
return 0;
}
}
@Override
public int getNumExpiredNamenodes() {
try {
GetNamenodeRegistrationsRequest request =
GetNamenodeRegistrationsRequest.newInstance();
GetNamenodeRegistrationsResponse response =
membershipStore.getExpiredNamenodeRegistrations(request);
List<MembershipState> expiredMemberships =
response.getNamenodeMemberships();
return expiredMemberships.size();
} catch (IOException e) {
LOG.error(
"Cannot retrieve numExpiredNamenodes for JMX: {}", e.getMessage());
return 0;
}
}
@Override
public int getNumLiveNodes() {
return getNameserviceAggregatedInt(
MembershipStats::getNumOfActiveDatanodes);
}
@Override
public int getNumDeadNodes() {
return getNameserviceAggregatedInt(MembershipStats::getNumOfDeadDatanodes);
}
@Override
public int getNumDecommissioningNodes() {
return getNameserviceAggregatedInt(
MembershipStats::getNumOfDecommissioningDatanodes);
}
@Override
public int getNumDecomLiveNodes() {
return getNameserviceAggregatedInt(
MembershipStats::getNumOfDecomActiveDatanodes);
}
@Override
public int getNumDecomDeadNodes() {
return getNameserviceAggregatedInt(
MembershipStats::getNumOfDecomDeadDatanodes);
}
@Override // NameNodeMXBean
public String getNodeUsage() {
float median = 0;
float max = 0;
float min = 0;
float dev = 0;
final Map<String, Map<String, Object>> info = new HashMap<>();
try {
RouterRpcServer rpcServer = this.router.getRpcServer();
DatanodeInfo[] live = rpcServer.getDatanodeReport(
DatanodeReportType.LIVE, false, TIME_OUT);
if (live.length > 0) {
float totalDfsUsed = 0;
float[] usages = new float[live.length];
int i = 0;
for (DatanodeInfo dn : live) {
usages[i++] = dn.getDfsUsedPercent();
totalDfsUsed += dn.getDfsUsedPercent();
}
totalDfsUsed /= live.length;
Arrays.sort(usages);
median = usages[usages.length / 2];
max = usages[usages.length - 1];
min = usages[0];
for (i = 0; i < usages.length; i++) {
dev += (usages[i] - totalDfsUsed) * (usages[i] - totalDfsUsed);
}
dev = (float) Math.sqrt(dev / usages.length);
}
} catch (IOException e) {
LOG.error("Cannot get the live nodes: {}", e.getMessage());
}
final Map<String, Object> innerInfo = new HashMap<>();
innerInfo.put("min", StringUtils.format("%.2f%%", min));
innerInfo.put("median", StringUtils.format("%.2f%%", median));
innerInfo.put("max", StringUtils.format("%.2f%%", max));
innerInfo.put("stdDev", StringUtils.format("%.2f%%", dev));
info.put("nodeUsage", innerInfo);
return JSON.toString(info);
}
@Override
public long getNumBlocks() {
return getNameserviceAggregatedLong(MembershipStats::getNumOfBlocks);
}
@Override
public long getNumOfMissingBlocks() {
return getNameserviceAggregatedLong(MembershipStats::getNumOfBlocksMissing);
}
@Override
public long getNumOfBlocksPendingReplication() {
return getNameserviceAggregatedLong(
MembershipStats::getNumOfBlocksPendingReplication);
}
@Override
public long getNumOfBlocksUnderReplicated() {
return getNameserviceAggregatedLong(
MembershipStats::getNumOfBlocksUnderReplicated);
}
@Override
public long getNumOfBlocksPendingDeletion() {
return getNameserviceAggregatedLong(
MembershipStats::getNumOfBlocksPendingDeletion);
}
@Override
public long getNumFiles() {
return getNameserviceAggregatedLong(MembershipStats::getNumOfFiles);
}
@Override
public String getRouterStarted() {
long startTime = this.router.getStartTime();
return new Date(startTime).toString();
}
@Override
public String getVersion() {
return VersionInfo.getVersion() + ", r" + VersionInfo.getRevision();
}
@Override
public String getCompiledDate() {
return VersionInfo.getDate();
}
@Override
public String getCompileInfo() {
return VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from "
+ VersionInfo.getBranch();
}
@Override
public String getHostAndPort() {
InetSocketAddress address = this.router.getHttpServerAddress();
if (address != null) {
try {
String hostname = InetAddress.getLocalHost().getHostName();
int port = address.getPort();
return hostname + ":" + port;
} catch (UnknownHostException ignored) { }
}
return "Unknown";
}
@Override
public String getRouterId() {
return this.router.getRouterId();
}
@Override
public String getClusterId() {
try {
Collection<String> clusterIds =
getNamespaceInfo(FederationNamespaceInfo::getClusterId);
return clusterIds.toString();
} catch (IOException e) {
LOG.error("Cannot fetch cluster ID metrics: {}", e.getMessage());
return "";
}
}
@Override
public String getBlockPoolId() {
try {
Collection<String> blockpoolIds =
getNamespaceInfo(FederationNamespaceInfo::getBlockPoolId);
return blockpoolIds.toString();
} catch (IOException e) {
LOG.error("Cannot fetch block pool ID metrics: {}", e.getMessage());
return "";
}
}
@Override
public String getRouterStatus() {
return "RUNNING";
}
/**
* Build a set of unique values found in all namespaces.
*
* @param f Method reference of the appropriate FederationNamespaceInfo
* getter function
* @return Set of unique string values found in all discovered namespaces.
* @throws IOException if the query could not be executed.
*/
private Collection<String> getNamespaceInfo(
Function<FederationNamespaceInfo, String> f) throws IOException {
GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
GetNamespaceInfoResponse response =
membershipStore.getNamespaceInfo(request);
return response.getNamespaceInfo().stream()
.map(f)
.collect(Collectors.toSet());
}
/**
* Get the aggregated value for a method for all nameservices.
* @param f Method reference
* @return Aggregated integer.
*/
private int getNameserviceAggregatedInt(ToIntFunction<MembershipStats> f) {
try {
return getActiveNamenodeRegistrations().stream()
.map(MembershipState::getStats)
.collect(Collectors.summingInt(f));
} catch (IOException e) {
LOG.error("Unable to extract metrics: {}", e.getMessage());
return 0;
}
}
/**
* Get the aggregated value for a method for all nameservices.
* @param f Method reference
* @return Aggregated long.
*/
private long getNameserviceAggregatedLong(ToLongFunction<MembershipStats> f) {
try {
return getActiveNamenodeRegistrations().stream()
.map(MembershipState::getStats)
.collect(Collectors.summingLong(f));
} catch (IOException e) {
LOG.error("Unable to extract metrics: {}", e.getMessage());
return 0;
}
}
/**
* Fetches the most active namenode memberships for all known nameservices.
* The fetched membership may not or may not be active. Excludes expired
* memberships.
* @throws IOException if the query could not be performed.
* @return List of the most active NNs from each known nameservice.
*/
private List<MembershipState> getActiveNamenodeRegistrations()
throws IOException {
List<MembershipState> resultList = new ArrayList<>();
GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
GetNamespaceInfoResponse response =
membershipStore.getNamespaceInfo(request);
for (FederationNamespaceInfo nsInfo : response.getNamespaceInfo()) {
// Fetch the most recent namenode registration
String nsId = nsInfo.getNameserviceId();
List<? extends FederationNamenodeContext> nns =
namenodeResolver.getNamenodesForNameserviceId(nsId);
if (nns != null) {
FederationNamenodeContext nn = nns.get(0);
if (nn != null && nn instanceof MembershipState) {
resultList.add((MembershipState) nn);
}
}
}
return resultList;
}
/**
* Get time as a date string.
* @param time Seconds since 1970.
* @return String representing the date.
*/
@VisibleForTesting
static String getDateString(long time) {
if (time <= 0) {
return "-";
}
Date date = new Date(time);
SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
return sdf.format(date);
}
/**
* Get the number of seconds passed since a date.
*
* @param timeMs to use as a reference.
* @return Seconds since the date.
*/
private static long getSecondsSince(long timeMs) {
if (timeMs < 0) {
return -1;
}
return (now() - timeMs) / 1000;
}
/**
* Get JSON for this record.
*
* @return Map representing the data for the JSON representation.
*/
private static Map<String, Object> getJson(BaseRecord record) {
Map<String, Object> json = new HashMap<>();
Map<String, Class<?>> fields = getFields(record);
for (String fieldName : fields.keySet()) {
if (!fieldName.equalsIgnoreCase("proto")) {
try {
Object value = getField(record, fieldName);
if (value instanceof BaseRecord) {
BaseRecord recordField = (BaseRecord) value;
json.putAll(getJson(recordField));
} else {
json.put(fieldName, value == null ? JSONObject.NULL : value);
}
} catch (Exception e) {
throw new IllegalArgumentException(
"Cannot serialize field " + fieldName + " into JSON");
}
}
}
return json;
}
/**
* Returns all serializable fields in the object.
*
* @return Map with the fields.
*/
private static Map<String, Class<?>> getFields(BaseRecord record) {
Map<String, Class<?>> getters = new HashMap<>();
for (Method m : record.getClass().getDeclaredMethods()) {
if (m.getName().startsWith("get")) {
try {
Class<?> type = m.getReturnType();
char[] c = m.getName().substring(3).toCharArray();
c[0] = Character.toLowerCase(c[0]);
String key = new String(c);
getters.put(key, type);
} catch (Exception e) {
LOG.error("Cannot execute getter {} on {}", m.getName(), record);
}
}
}
return getters;
}
/**
* Fetches the value for a field name.
*
* @param fieldName the legacy name of the field.
* @return The field data or null if not found.
*/
private static Object getField(BaseRecord record, String fieldName) {
Object result = null;
Method m = locateGetter(record, fieldName);
if (m != null) {
try {
result = m.invoke(record);
} catch (Exception e) {
LOG.error("Cannot get field {} on {}", fieldName, record);
}
}
return result;
}
/**
* Finds the appropriate getter for a field name.
*
* @param fieldName The legacy name of the field.
* @return The matching getter or null if not found.
*/
private static Method locateGetter(BaseRecord record, String fieldName) {
for (Method m : record.getClass().getMethods()) {
if (m.getName().equalsIgnoreCase("get" + fieldName)) {
return m;
}
}
return null;
}
}