/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.blockmanagement.SlowDiskTracker.DiskLatency;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.FakeTimer;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Tests for {@link SlowDiskTracker}, which aggregates slow disk reports
* received from DataNodes and exposes the slowest disks as a JSON report.
*/
public class TestSlowDiskTracker {
public static final Logger LOG = LoggerFactory.getLogger(
TestSlowDiskTracker.class);
/**
* Set a timeout for every test case.
*/
@Rule
public Timeout testTimeout = new Timeout(300_000);
private static Configuration conf;
private SlowDiskTracker tracker;
private FakeTimer timer;
private long reportValidityMs;
private static final long OUTLIERS_REPORT_INTERVAL = 1000;
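/*
* Configure a short heartbeat interval, 100% file IO profiling sampling,
* and a short outliers report interval so that slow disk reports are
* generated and propagated quickly in tests.
*/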
static {
conf = new HdfsConfiguration();
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1L);
conf.setInt(DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY, 100);
conf.setTimeDuration(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
OUTLIERS_REPORT_INTERVAL, TimeUnit.MILLISECONDS);
}
@Before
public void setup() {
timer = new FakeTimer();
tracker = new SlowDiskTracker(conf, timer);
reportValidityMs = tracker.getReportValidityMs();
}
@Test
public void testDataNodeHeartbeatSlowDiskReport() throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.build();
try {
DataNode dn1 = cluster.getDataNodes().get(0);
DataNode dn2 = cluster.getDataNodes().get(1);
NameNode nn = cluster.getNameNode(0);
DatanodeManager datanodeManager = nn.getNamesystem().getBlockManager()
.getDatanodeManager();
final SlowDiskTracker slowDiskTracker = datanodeManager.getSlowDiskTracker();
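// Lengthen report validity so the injected reports do not expire while
// the test waits for them to reach the NameNode.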
slowDiskTracker.setReportValidityMs(OUTLIERS_REPORT_INTERVAL * 100);
dn1.getDiskMetrics().addSlowDiskForTesting("disk1", ImmutableMap.of(
DiskOp.WRITE, 1.3));
dn1.getDiskMetrics().addSlowDiskForTesting("disk2", ImmutableMap.of(
DiskOp.READ, 1.6, DiskOp.WRITE, 1.1));
dn2.getDiskMetrics().addSlowDiskForTesting("disk1", ImmutableMap.of(
DiskOp.METADATA, 0.8));
dn2.getDiskMetrics().addSlowDiskForTesting("disk2", ImmutableMap.of(
DiskOp.WRITE, 1.3));
String dn1ID = dn1.getDatanodeId().getIpcAddr(false);
String dn2ID = dn2.getDatanodeId().getIpcAddr(false);
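// The tracker keys each report by "<datanodeID>:<diskName>", using the
// DataNode's IPC address as the datanode ID.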
// Sleep for one outliers report interval so the DataNodes send their
// slow disk reports to the NameNode.
Thread.sleep(OUTLIERS_REPORT_INTERVAL);
// Wait for the NN to receive reports from all DNs.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return (slowDiskTracker.getSlowDisksReport().size() == 4);
}
}, 1000, 100000);
Map<String, DiskLatency> slowDisksReport = getSlowDisksReportForTesting(
slowDiskTracker);
assertThat(slowDisksReport.size(), is(4));
assertTrue(Math.abs(slowDisksReport.get(dn1ID + ":disk1")
.getLatency(DiskOp.WRITE) - 1.3) < 0.0000001);
assertTrue(Math.abs(slowDisksReport.get(dn1ID + ":disk2")
.getLatency(DiskOp.READ) - 1.6) < 0.0000001);
assertTrue(Math.abs(slowDisksReport.get(dn1ID + ":disk2")
.getLatency(DiskOp.WRITE) - 1.1) < 0.0000001);
assertTrue(Math.abs(slowDisksReport.get(dn2ID + ":disk1")
.getLatency(DiskOp.METADATA) - 0.8) < 0.0000001);
assertTrue(Math.abs(slowDisksReport.get(dn2ID + ":disk2")
.getLatency(DiskOp.WRITE) - 1.3) < 0.0000001);
// Test the slow disk report JSON string
ArrayList<DiskLatency> jsonReport = getAndDeserializeJson(
slowDiskTracker.getSlowDiskReportAsJsonString());
assertThat(jsonReport.size(), is(4));
assertTrue(isDiskInReports(jsonReport, dn1ID, "disk1", DiskOp.WRITE, 1.3));
assertTrue(isDiskInReports(jsonReport, dn1ID, "disk2", DiskOp.READ, 1.6));
assertTrue(isDiskInReports(jsonReport, dn1ID, "disk2", DiskOp.WRITE, 1.1));
assertTrue(isDiskInReports(jsonReport, dn2ID, "disk1", DiskOp.METADATA,
0.8));
assertTrue(isDiskInReports(jsonReport, dn2ID, "disk2", DiskOp.WRITE, 1.3));
} finally {
cluster.shutdown();
}
}
/**
* Edge case: there are no reports to retrieve.
*/
@Test
public void testEmptyReports() {
tracker.updateSlowDiskReportAsync(timer.monotonicNow());
assertTrue(getSlowDisksReportForTesting(tracker).isEmpty());
}
@Test
public void testReportsAreRetrieved() throws Exception {
addSlowDiskForTesting("dn1", "disk1",
ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
addSlowDiskForTesting("dn1", "disk2",
ImmutableMap.of(DiskOp.READ, 1.3));
addSlowDiskForTesting("dn2", "disk2",
ImmutableMap.of(DiskOp.READ, 1.1));
tracker.updateSlowDiskReportAsync(timer.monotonicNow());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return !tracker.getSlowDisksReport().isEmpty();
}
}, 500, 5000);
Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker);
assertThat(reports.size(), is(3));
assertTrue(Math.abs(reports.get("dn1:disk1")
.getLatency(DiskOp.METADATA) - 1.1) < 0.0000001);
assertTrue(Math.abs(reports.get("dn1:disk1")
.getLatency(DiskOp.READ) - 1.8) < 0.0000001);
assertTrue(Math.abs(reports.get("dn1:disk2")
.getLatency(DiskOp.READ) - 1.3) < 0.0000001);
assertTrue(Math.abs(reports.get("dn2:disk2")
.getLatency(DiskOp.READ) - 1.1) < 0.0000001);
}
/**
* Test that when all reports are expired, we get back nothing.
*/
@Test
public void testAllReportsAreExpired() throws Exception {
addSlowDiskForTesting("dn1", "disk1",
ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
addSlowDiskForTesting("dn1", "disk2",
ImmutableMap.of(DiskOp.READ, 1.3));
addSlowDiskForTesting("dn2", "disk2",
ImmutableMap.of(DiskOp.WRITE, 1.1));
// Advancing the timer by just 1ms should not expire any reports.
timer.advance(1);
tracker.updateSlowDiskReportAsync(timer.monotonicNow());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return !tracker.getSlowDisksReport().isEmpty();
}
}, 500, 5000);
Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker);
assertThat(reports.size(), is(3));
assertTrue(Math.abs(reports.get("dn1:disk1")
.getLatency(DiskOp.METADATA) - 1.1) < 0.0000001);
assertTrue(Math.abs(reports.get("dn1:disk1")
.getLatency(DiskOp.READ) - 1.8) < 0.0000001);
assertTrue(Math.abs(reports.get("dn1:disk2")
.getLatency(DiskOp.READ) - 1.3) < 0.0000001);
assertTrue(Math.abs(reports.get("dn2:disk2")
.getLatency(DiskOp.WRITE) - 1.1) < 0.0000001);
// All reports should expire once reportValidityMs has elapsed.
timer.advance(reportValidityMs);
tracker.updateSlowDiskReportAsync(timer.monotonicNow());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return tracker.getSlowDisksReport().isEmpty();
}
}, 500, 3000);
reports = getSlowDisksReportForTesting(tracker);
assertThat(reports.size(), is(0));
}
/**
* Test the case when a subset of reports has expired.
* Ensure that we only get back non-expired reports.
*/
@Test
public void testSomeReportsAreExpired() throws Exception {
addSlowDiskForTesting("dn1", "disk1",
ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
addSlowDiskForTesting("dn1", "disk2",
ImmutableMap.of(DiskOp.READ, 1.3));
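// Expire the first two reports, then add a fresh one.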
timer.advance(reportValidityMs);
addSlowDiskForTesting("dn2", "disk2",
ImmutableMap.of(DiskOp.WRITE, 1.1));
tracker.updateSlowDiskReportAsync(timer.monotonicNow());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return !tracker.getSlowDisksReport().isEmpty();
}
}, 500, 5000);
Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker);
assertThat(reports.size(), is(1));
assertTrue(Math.abs(reports.get("dn2:disk2")
.getLatency(DiskOp.WRITE) - 1.1) < 0.0000001);
}
/**
* Test the case when an expired report is replaced by a valid one.
*/
@Test
public void testReplacement() throws Exception {
addSlowDiskForTesting("dn1", "disk1",
ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
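// Let the first report expire, then submit a replacement for the same disk.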
timer.advance(reportValidityMs);
addSlowDiskForTesting("dn1", "disk1",
ImmutableMap.of(DiskOp.READ, 1.4));
tracker.updateSlowDiskReportAsync(timer.monotonicNow());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return !tracker.getSlowDisksReport().isEmpty();
}
}, 500, 5000);
Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker);
assertThat(reports.size(), is(1));
assertNull(reports.get("dn1:disk1").getLatency(DiskOp.METADATA));
assertTrue(Math.abs(reports.get("dn1:disk1")
.getLatency(DiskOp.READ) - 1.4) < 0.0000001);
}
@Test
public void testGetJson() throws Exception {
addSlowDiskForTesting("dn1", "disk1",
ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
addSlowDiskForTesting("dn1", "disk2",
ImmutableMap.of(DiskOp.READ, 1.3));
addSlowDiskForTesting("dn2", "disk2",
ImmutableMap.of(DiskOp.WRITE, 1.1));
addSlowDiskForTesting("dn3", "disk1",
ImmutableMap.of(DiskOp.WRITE, 1.1));
tracker.updateSlowDiskReportAsync(timer.monotonicNow());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return tracker.getSlowDiskReportAsJsonString() != null;
}
}, 500, 5000);
ArrayList<DiskLatency> jsonReport = getAndDeserializeJson(
tracker.getSlowDiskReportAsJsonString());
// Ensure the deserialized report contents are what we expect.
assertThat(jsonReport.size(), is(4));
assertTrue(isDiskInReports(jsonReport, "dn1", "disk1", DiskOp.METADATA,
1.1));
assertTrue(isDiskInReports(jsonReport, "dn1", "disk1", DiskOp.READ, 1.8));
assertTrue(isDiskInReports(jsonReport, "dn1", "disk2", DiskOp.READ, 1.3));
assertTrue(isDiskInReports(jsonReport, "dn2", "disk2", DiskOp.WRITE, 1.1));
assertTrue(isDiskInReports(jsonReport, "dn3", "disk1", DiskOp.WRITE, 1.1));
}
@Test
public void testGetJsonSizeIsLimited() throws Exception {
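// Add eight slow disks; only the five with the highest latencies should
// make it into the JSON report.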
addSlowDiskForTesting("dn1", "disk1",
ImmutableMap.of(DiskOp.READ, 1.1));
addSlowDiskForTesting("dn1", "disk2",
ImmutableMap.of(DiskOp.READ, 1.2));
addSlowDiskForTesting("dn1", "disk3",
ImmutableMap.of(DiskOp.READ, 1.3));
addSlowDiskForTesting("dn2", "disk1",
ImmutableMap.of(DiskOp.READ, 1.4));
addSlowDiskForTesting("dn2", "disk2",
ImmutableMap.of(DiskOp.READ, 1.5));
addSlowDiskForTesting("dn3", "disk1",
ImmutableMap.of(DiskOp.WRITE, 1.6));
addSlowDiskForTesting("dn3", "disk2",
ImmutableMap.of(DiskOp.READ, 1.7));
addSlowDiskForTesting("dn3", "disk3",
ImmutableMap.of(DiskOp.READ, 1.2));
tracker.updateSlowDiskReportAsync(timer.monotonicNow());
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return tracker.getSlowDiskReportAsJsonString() != null;
}
}, 500, 5000);
ArrayList<DiskLatency> jsonReport = getAndDeserializeJson(
tracker.getSlowDiskReportAsJsonString());
// Ensure that only the top 5 highest latencies are in the report.
assertThat(jsonReport.size(), is(5));
assertTrue(isDiskInReports(jsonReport, "dn3", "disk2", DiskOp.READ, 1.7));
assertTrue(isDiskInReports(jsonReport, "dn3", "disk1", DiskOp.WRITE, 1.6));
assertTrue(isDiskInReports(jsonReport, "dn2", "disk2", DiskOp.READ, 1.5));
assertTrue(isDiskInReports(jsonReport, "dn2", "disk1", DiskOp.READ, 1.4));
assertTrue(isDiskInReports(jsonReport, "dn1", "disk3", DiskOp.READ, 1.3));
// The remaining disks should not be in the report.
assertFalse(isDiskInReports(jsonReport, "dn1", "disk1", DiskOp.READ, 1.1));
assertFalse(isDiskInReports(jsonReport, "dn1", "disk2", DiskOp.READ, 1.2));
assertFalse(isDiskInReports(jsonReport, "dn3", "disk3", DiskOp.READ, 1.2));
}
@Test
public void testEmptyReport() throws Exception {
addSlowDiskForTesting("dn1", "disk1",
ImmutableMap.of(DiskOp.READ, 1.1));
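// Expire the only report; the JSON report should then be empty (null).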
timer.advance(reportValidityMs);
tracker.updateSlowDiskReportAsync(timer.monotonicNow());
Thread.sleep(OUTLIERS_REPORT_INTERVAL * 2);
assertNull(tracker.getSlowDiskReportAsJsonString());
}
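/**
* Returns true if the given datanode/disk pair appears in the reports with
* the expected latency for the given disk operation.
*/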
private boolean isDiskInReports(ArrayList<DiskLatency> reports,
String dataNodeID, String disk, DiskOp diskOp, double latency) {
String diskID = SlowDiskTracker.getSlowDiskIDForReport(dataNodeID, disk);
for (DiskLatency diskLatency : reports) {
if (diskLatency.getSlowDiskID().equals(diskID)) {
if (diskLatency.getLatency(diskOp) == null) {
return false;
}
if (Math.abs(diskLatency.getLatency(diskOp) - latency) < 0.0000001) {
return true;
}
}
}
return false;
}
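/**
* Deserializes the tracker's JSON report into a list of DiskLatency objects.
*/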
private ArrayList<DiskLatency> getAndDeserializeJson(
final String json) throws IOException {
return (new ObjectMapper()).readValue(json,
new TypeReference<ArrayList<DiskLatency>>() {});
}
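/**
* Injects a slow disk report for a single disk on the given datanode
* directly into the tracker.
*/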
private void addSlowDiskForTesting(String dnID, String disk,
Map<DiskOp, Double> latencies) {
Map<String, Map<DiskOp, Double>> slowDisk = Maps.newHashMap();
slowDisk.put(disk, latencies);
SlowDiskReports slowDiskReport = SlowDiskReports.create(slowDisk);
tracker.addSlowDiskReport(dnID, slowDiskReport);
}
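/**
* Converts the tracker's report into a map keyed by slow disk ID for easy
* lookup in assertions.
*/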
Map<String, DiskLatency> getSlowDisksReportForTesting(
SlowDiskTracker slowDiskTracker) {
Map<String, DiskLatency> slowDisksMap = Maps.newHashMap();
for (DiskLatency diskLatency : slowDiskTracker.getSlowDisksReport()) {
slowDisksMap.put(diskLatency.getSlowDiskID(), diskLatency);
}
return slowDisksMap;
}
}