blob: 5b30b738c7ab528a80d5b78dfcdba58526c2a93e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.primitives.Ints;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
/**
* This class aggregates information from {@link SlowPeerReports} received via
* heartbeats.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class SlowPeerTracker {
public static final Logger LOG =
LoggerFactory.getLogger(SlowPeerTracker.class);
/**
* Time duration after which a report is considered stale. This is
* set to DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY * 3 i.e.
* maintained for at least two successive reports.
*/
private final long reportValidityMs;
/**
* Timer object for querying the current time. Separated out for
* unit testing.
*/
private final Timer timer;
/**
* ObjectWriter to convert JSON reports to String.
*/
private static final ObjectWriter WRITER = new ObjectMapper().writer();
/**
* Number of nodes to include in JSON report. We will return nodes with
* the highest number of votes from peers.
*/
private static final int MAX_NODES_TO_REPORT = 5;
/**
* Information about peers that have reported a node as being slow.
* Each outer map entry is a map of (DatanodeId) {@literal ->} (timestamp),
* mapping reporting nodes to the timestamp of the last report from
* that node.
*
* DatanodeId could be the DataNodeId or its address. We
* don't care as long as the caller uses it consistently.
*
* Stale reports are not evicted proactively and can potentially
* hang around forever.
*/
private final ConcurrentMap<String, ConcurrentMap<String, Long>>
allReports;
public SlowPeerTracker(Configuration conf, Timer timer) {
this.timer = timer;
this.allReports = new ConcurrentHashMap<>();
this.reportValidityMs = conf.getTimeDuration(
DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS) * 3;
}
/**
* Add a new report. DatanodeIds can be the DataNodeIds or addresses
* We don't care as long as the caller is consistent.
*
* @param reportingNode DataNodeId of the node reporting on its peer.
* @param slowNode DataNodeId of the peer suspected to be slow.
*/
public void addReport(String slowNode,
String reportingNode) {
ConcurrentMap<String, Long> nodeEntries = allReports.get(slowNode);
if (nodeEntries == null) {
// putIfAbsent guards against multiple writers.
allReports.putIfAbsent(slowNode, new ConcurrentHashMap<>());
nodeEntries = allReports.get(slowNode);
}
// Replace the existing entry from this node, if any.
nodeEntries.put(reportingNode, timer.monotonicNow());
}
/**
* Retrieve the non-expired reports that mark a given DataNode
* as slow. Stale reports are excluded.
*
* @param slowNode target node Id.
* @return set of reports which implicate the target node as being slow.
*/
public Set<String> getReportsForNode(String slowNode) {
final ConcurrentMap<String, Long> nodeEntries =
allReports.get(slowNode);
if (nodeEntries == null || nodeEntries.isEmpty()) {
return Collections.emptySet();
}
return filterNodeReports(nodeEntries, timer.monotonicNow());
}
/**
* Retrieve all reports for all nodes. Stale reports are excluded.
*
* @return map from SlowNodeId {@literal ->} (set of nodes reporting peers).
*/
public Map<String, SortedSet<String>> getReportsForAllDataNodes() {
if (allReports.isEmpty()) {
return ImmutableMap.of();
}
final Map<String, SortedSet<String>> allNodesValidReports = new HashMap<>();
final long now = timer.monotonicNow();
for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
allReports.entrySet()) {
SortedSet<String> validReports = filterNodeReports(entry.getValue(), now);
if (!validReports.isEmpty()) {
allNodesValidReports.put(entry.getKey(), validReports);
}
}
return allNodesValidReports;
}
/**
* Filter the given reports to return just the valid ones.
*
* @param reports
* @param now
* @return
*/
private SortedSet<String> filterNodeReports(
ConcurrentMap<String, Long> reports, long now) {
final SortedSet<String> validReports = new TreeSet<>();
for (Map.Entry<String, Long> entry : reports.entrySet()) {
if (now - entry.getValue() < reportValidityMs) {
validReports.add(entry.getKey());
}
}
return validReports;
}
/**
* Retrieve all valid reports as a JSON string.
* @return serialized representation of valid reports. null if
* serialization failed.
*/
public String getJson() {
Collection<ReportForJson> validReports = getJsonReports(
MAX_NODES_TO_REPORT);
try {
return WRITER.writeValueAsString(validReports);
} catch (JsonProcessingException e) {
// Failed to serialize. Don't log the exception call stack.
LOG.debug("Failed to serialize statistics" + e);
return null;
}
}
/**
* This structure is a thin wrapper over reports to make Json
* [de]serialization easy.
*/
public static class ReportForJson {
@JsonProperty("SlowNode")
final private String slowNode;
@JsonProperty("ReportingNodes")
final private SortedSet<String> reportingNodes;
public ReportForJson(
@JsonProperty("SlowNode") String slowNode,
@JsonProperty("ReportingNodes") SortedSet<String> reportingNodes) {
this.slowNode = slowNode;
this.reportingNodes = reportingNodes;
}
public String getSlowNode() {
return slowNode;
}
public SortedSet<String> getReportingNodes() {
return reportingNodes;
}
}
/**
* Retrieve reports in a structure for generating JSON, limiting the
* output to the top numNodes nodes i.e nodes with the most reports.
* @param numNodes number of nodes to return. This is to limit the
* size of the generated JSON.
*/
private Collection<ReportForJson> getJsonReports(int numNodes) {
if (allReports.isEmpty()) {
return Collections.emptyList();
}
final PriorityQueue<ReportForJson> topNReports =
new PriorityQueue<>(allReports.size(),
new Comparator<ReportForJson>() {
@Override
public int compare(ReportForJson o1, ReportForJson o2) {
return Ints.compare(o1.reportingNodes.size(),
o2.reportingNodes.size());
}
});
final long now = timer.monotonicNow();
for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
allReports.entrySet()) {
SortedSet<String> validReports = filterNodeReports(
entry.getValue(), now);
if (!validReports.isEmpty()) {
if (topNReports.size() < numNodes) {
topNReports.add(new ReportForJson(entry.getKey(), validReports));
} else if (topNReports.peek().getReportingNodes().size() <
validReports.size()){
// Remove the lowest element
topNReports.poll();
topNReports.add(new ReportForJson(entry.getKey(), validReports));
}
}
}
return topNReports;
}
@VisibleForTesting
long getReportValidityMs() {
return reportValidityMs;
}
}