/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.blockmanagement;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.hdfs.DFSConfigKeys
    .DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys
    .DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp;
import org.apache.hadoop.hdfs.server.blockmanagement.SlowDiskTracker
    .DiskLatency;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.FakeTimer;

import com.google.common.base.Supplier;
import com.google.common.collect.Maps;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Tests for {@link SlowDiskTracker}.
 */
public class TestSlowDiskTracker {
  public static final Logger LOG = LoggerFactory.getLogger(
      TestSlowDiskTracker.class);

  /**
   * Set a timeout for every test case.
   */
  @Rule
  public Timeout testTimeout = new Timeout(300_000);

  private static Configuration conf;
  private SlowDiskTracker tracker;
  private FakeTimer timer;
  private long reportValidityMs;
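  // Interval at which the DataNodes in these tests report disk outliers.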
  private static final long OUTLIERS_REPORT_INTERVAL = 1000;

  static {
    conf = new HdfsConfiguration();
    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1L);
    conf.setInt(DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY, 100);
    conf.setTimeDuration(DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
        OUTLIERS_REPORT_INTERVAL, TimeUnit.MILLISECONDS);
  }

  @Before
  public void setup() {
    timer = new FakeTimer();
    tracker = new SlowDiskTracker(conf, timer);
    reportValidityMs = tracker.getReportValidityMs();
  }

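  /**
   * Bring up a two-DataNode MiniDFSCluster, inject slow disk latencies on
   * both DataNodes, and verify that the NameNode's SlowDiskTracker receives
   * and aggregates all of the reported disks, both through the in-memory
   * report and through the JSON report string.
   */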
  @Test
  public void testDataNodeHeartbeatSlowDiskReport() throws Exception {
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
        .build();
    try {
      DataNode dn1 = cluster.getDataNodes().get(0);
      DataNode dn2 = cluster.getDataNodes().get(1);
      NameNode nn = cluster.getNameNode(0);

      DatanodeManager datanodeManager = nn.getNamesystem().getBlockManager()
          .getDatanodeManager();
      final SlowDiskTracker slowDiskTracker =
          datanodeManager.getSlowDiskTracker();
      slowDiskTracker.setReportValidityMs(OUTLIERS_REPORT_INTERVAL * 100);

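      // Inject synthetic slow disk latencies into both DataNodes' metrics.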
      dn1.getDiskMetrics().addSlowDiskForTesting("disk1", ImmutableMap.of(
          DiskOp.WRITE, 1.3));
      dn1.getDiskMetrics().addSlowDiskForTesting("disk2", ImmutableMap.of(
          DiskOp.READ, 1.6, DiskOp.WRITE, 1.1));
      dn2.getDiskMetrics().addSlowDiskForTesting("disk1", ImmutableMap.of(
          DiskOp.METADATA, 0.8));
      dn2.getDiskMetrics().addSlowDiskForTesting("disk2", ImmutableMap.of(
          DiskOp.WRITE, 1.3));

      String dn1ID = dn1.getDatanodeId().getIpcAddr(false);
      String dn2ID = dn2.getDatanodeId().getIpcAddr(false);

      // Sleep for one reporting interval so the DataNodes have a chance to
      // send their slow disk reports to the NameNode.
      Thread.sleep(OUTLIERS_REPORT_INTERVAL);

      // Wait for NN to receive reports from all DNs
      GenericTestUtils.waitFor(new Supplier<Boolean>() {
        @Override
        public Boolean get() {
          return (slowDiskTracker.getSlowDisksReport().size() == 4);
        }
      }, 1000, 100000);

      Map<String, DiskLatency> slowDisksReport = getSlowDisksReportForTesting(
          slowDiskTracker);

      assertThat(slowDisksReport.size(), is(4));
      assertTrue(Math.abs(slowDisksReport.get(dn1ID + ":disk1")
          .getLatency(DiskOp.WRITE) - 1.3) < 0.0000001);
      assertTrue(Math.abs(slowDisksReport.get(dn1ID + ":disk2")
          .getLatency(DiskOp.READ) - 1.6) < 0.0000001);
      assertTrue(Math.abs(slowDisksReport.get(dn1ID + ":disk2")
          .getLatency(DiskOp.WRITE) - 1.1) < 0.0000001);
      assertTrue(Math.abs(slowDisksReport.get(dn2ID + ":disk1")
          .getLatency(DiskOp.METADATA) - 0.8) < 0.0000001);
      assertTrue(Math.abs(slowDisksReport.get(dn2ID + ":disk2")
          .getLatency(DiskOp.WRITE) - 1.3) < 0.0000001);

      // Test the slow disk report JSON string
      ArrayList<DiskLatency> jsonReport = getAndDeserializeJson(
          slowDiskTracker.getSlowDiskReportAsJsonString());

      assertThat(jsonReport.size(), is(4));
      assertTrue(isDiskInReports(jsonReport, dn1ID, "disk1", DiskOp.WRITE, 1.3));
      assertTrue(isDiskInReports(jsonReport, dn1ID, "disk2", DiskOp.READ, 1.6));
      assertTrue(isDiskInReports(jsonReport, dn1ID, "disk2", DiskOp.WRITE, 1.1));
      assertTrue(isDiskInReports(jsonReport, dn2ID, "disk1", DiskOp.METADATA,
          0.8));
      assertTrue(isDiskInReports(jsonReport, dn2ID, "disk2", DiskOp.WRITE, 1.3));
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Edge case: there are no reports to retrieve.
   */
  @Test
  public void testEmptyReports() {
    tracker.updateSlowDiskReportAsync(timer.monotonicNow());
    assertTrue(getSlowDisksReportForTesting(tracker).isEmpty());
  }

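  /**
   * Verify that valid reports from multiple DataNodes are retrievable and
   * that the reported latencies are preserved per disk and per operation.
   */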
  @Test
  public void testReportsAreRetrieved() throws Exception {
    addSlowDiskForTesting("dn1", "disk1",
        ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
    addSlowDiskForTesting("dn1", "disk2",
        ImmutableMap.of(DiskOp.READ, 1.3));
    addSlowDiskForTesting("dn2", "disk2",
        ImmutableMap.of(DiskOp.READ, 1.1));

    tracker.updateSlowDiskReportAsync(timer.monotonicNow());

    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return !tracker.getSlowDisksReport().isEmpty();
      }
    }, 500, 5000);

    Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker);

    assertThat(reports.size(), is(3));
    assertTrue(Math.abs(reports.get("dn1:disk1")
        .getLatency(DiskOp.METADATA) - 1.1) < 0.0000001);
    assertTrue(Math.abs(reports.get("dn1:disk1")
        .getLatency(DiskOp.READ) - 1.8) < 0.0000001);
    assertTrue(Math.abs(reports.get("dn1:disk2")
        .getLatency(DiskOp.READ) - 1.3) < 0.0000001);
    assertTrue(Math.abs(reports.get("dn2:disk2")
        .getLatency(DiskOp.READ) - 1.1) < 0.0000001);
  }

  /**
   * Test that when all reports are expired, we get back nothing.
   */
  @Test
  public void testAllReportsAreExpired() throws Exception {
    addSlowDiskForTesting("dn1", "disk1",
        ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
    addSlowDiskForTesting("dn1", "disk2",
        ImmutableMap.of(DiskOp.READ, 1.3));
    addSlowDiskForTesting("dn2", "disk2",
        ImmutableMap.of(DiskOp.WRITE, 1.1));

    // No reports should expire after 1ms.
    timer.advance(1);
    tracker.updateSlowDiskReportAsync(timer.monotonicNow());

    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return !tracker.getSlowDisksReport().isEmpty();
      }
    }, 500, 5000);

    Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker);

    assertThat(reports.size(), is(3));
    assertTrue(Math.abs(reports.get("dn1:disk1")
        .getLatency(DiskOp.METADATA) - 1.1) < 0.0000001);
    assertTrue(Math.abs(reports.get("dn1:disk1")
        .getLatency(DiskOp.READ) - 1.8) < 0.0000001);
    assertTrue(Math.abs(reports.get("dn1:disk2")
        .getLatency(DiskOp.READ) - 1.3) < 0.0000001);
    assertTrue(Math.abs(reports.get("dn2:disk2")
        .getLatency(DiskOp.WRITE) - 1.1) < 0.0000001);

    // All reports should have expired after reportValidityMs.
    timer.advance(reportValidityMs);
    tracker.updateSlowDiskReportAsync(timer.monotonicNow());

    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return tracker.getSlowDisksReport().isEmpty();
      }
    }, 500, 3000);

    reports = getSlowDisksReportForTesting(tracker);

    assertThat(reports.size(), is(0));
  }

  /**
   * Test the case when a subset of reports has expired.
   * Ensure that we only get back non-expired reports.
   */
  @Test
  public void testSomeReportsAreExpired() throws Exception {
    addSlowDiskForTesting("dn1", "disk1",
        ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
    addSlowDiskForTesting("dn1", "disk2",
        ImmutableMap.of(DiskOp.READ, 1.3));
    timer.advance(reportValidityMs);
    addSlowDiskForTesting("dn2", "disk2",
        ImmutableMap.of(DiskOp.WRITE, 1.1));

    tracker.updateSlowDiskReportAsync(timer.monotonicNow());

    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return !tracker.getSlowDisksReport().isEmpty();
      }
    }, 500, 5000);

    Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker);

    assertThat(reports.size(), is(1));
    assertTrue(Math.abs(reports.get("dn2:disk2")
        .getLatency(DiskOp.WRITE) - 1.1) < 0.0000001);
  }

  /**
   * Test the case when an expired report is replaced by a valid one.
   */
  @Test
  public void testReplacement() throws Exception {
    addSlowDiskForTesting("dn1", "disk1",
        ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
    timer.advance(reportValidityMs);
    addSlowDiskForTesting("dn1", "disk1",
        ImmutableMap.of(DiskOp.READ, 1.4));

    tracker.updateSlowDiskReportAsync(timer.monotonicNow());

    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return !tracker.getSlowDisksReport().isEmpty();
      }
    }, 500, 5000);

    Map<String, DiskLatency> reports = getSlowDisksReportForTesting(tracker);

    assertThat(reports.size(), is(1));
    assertTrue(reports.get("dn1:disk1").getLatency(DiskOp.METADATA) == null);
    assertTrue(Math.abs(reports.get("dn1:disk1")
        .getLatency(DiskOp.READ) - 1.4) < 0.0000001);
  }

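  /**
   * Verify that the tracker can serialize its aggregated report as JSON and
   * that the deserialized report contains every injected slow disk.
   */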
  @Test
  public void testGetJson() throws Exception {
    addSlowDiskForTesting("dn1", "disk1",
        ImmutableMap.of(DiskOp.METADATA, 1.1, DiskOp.READ, 1.8));
    addSlowDiskForTesting("dn1", "disk2",
        ImmutableMap.of(DiskOp.READ, 1.3));
    addSlowDiskForTesting("dn2", "disk2",
        ImmutableMap.of(DiskOp.WRITE, 1.1));
    addSlowDiskForTesting("dn3", "disk1",
        ImmutableMap.of(DiskOp.WRITE, 1.1));

    tracker.updateSlowDiskReportAsync(timer.monotonicNow());

    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return tracker.getSlowDiskReportAsJsonString() != null;
      }
    }, 500, 5000);

    ArrayList<DiskLatency> jsonReport = getAndDeserializeJson(
        tracker.getSlowDiskReportAsJsonString());

    // Ensure the deserialized report contains the expected entries.
    assertThat(jsonReport.size(), is(4));
    assertTrue(isDiskInReports(jsonReport, "dn1", "disk1", DiskOp.METADATA,
        1.1));
    assertTrue(isDiskInReports(jsonReport, "dn1", "disk1", DiskOp.READ, 1.8));
    assertTrue(isDiskInReports(jsonReport, "dn1", "disk2", DiskOp.READ, 1.3));
    assertTrue(isDiskInReports(jsonReport, "dn2", "disk2", DiskOp.WRITE, 1.1));
    assertTrue(isDiskInReports(jsonReport, "dn3", "disk1", DiskOp.WRITE, 1.1));
  }

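  /**
   * Verify that the JSON report is limited to the five disks with the
   * highest latencies and that the lower-latency disks are excluded.
   */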
  @Test
  public void testGetJsonSizeIsLimited() throws Exception {
    addSlowDiskForTesting("dn1", "disk1",
        ImmutableMap.of(DiskOp.READ, 1.1));
    addSlowDiskForTesting("dn1", "disk2",
        ImmutableMap.of(DiskOp.READ, 1.2));
    addSlowDiskForTesting("dn1", "disk3",
        ImmutableMap.of(DiskOp.READ, 1.3));
    addSlowDiskForTesting("dn2", "disk1",
        ImmutableMap.of(DiskOp.READ, 1.4));
    addSlowDiskForTesting("dn2", "disk2",
        ImmutableMap.of(DiskOp.READ, 1.5));
    addSlowDiskForTesting("dn3", "disk1",
        ImmutableMap.of(DiskOp.WRITE, 1.6));
    addSlowDiskForTesting("dn3", "disk2",
        ImmutableMap.of(DiskOp.READ, 1.7));
    addSlowDiskForTesting("dn3", "disk3",
        ImmutableMap.of(DiskOp.READ, 1.2));

    tracker.updateSlowDiskReportAsync(timer.monotonicNow());

    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return tracker.getSlowDiskReportAsJsonString() != null;
      }
    }, 500, 5000);

    ArrayList<DiskLatency> jsonReport = getAndDeserializeJson(
        tracker.getSlowDiskReportAsJsonString());

    // Ensure that only the top 5 highest latencies are in the report.
    assertThat(jsonReport.size(), is(5));
    assertTrue(isDiskInReports(jsonReport, "dn3", "disk2", DiskOp.READ, 1.7));
    assertTrue(isDiskInReports(jsonReport, "dn3", "disk1", DiskOp.WRITE, 1.6));
    assertTrue(isDiskInReports(jsonReport, "dn2", "disk2", DiskOp.READ, 1.5));
    assertTrue(isDiskInReports(jsonReport, "dn2", "disk1", DiskOp.READ, 1.4));
    assertTrue(isDiskInReports(jsonReport, "dn1", "disk3", DiskOp.READ, 1.3));

    // The remaining lower-latency disks should not be in the report.
    assertFalse(isDiskInReports(jsonReport, "dn1", "disk1", DiskOp.READ, 1.1));
    assertFalse(isDiskInReports(jsonReport, "dn1", "disk2", DiskOp.READ, 1.2));
    assertFalse(isDiskInReports(jsonReport, "dn3", "disk3", DiskOp.READ, 1.2));
  }

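  /**
   * Verify that no JSON report is generated when the only report has already
   * expired.
   */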
  @Test
  public void testEmptyReport() throws Exception {
    addSlowDiskForTesting("dn1", "disk1",
        ImmutableMap.of(DiskOp.READ, 1.1));
    timer.advance(reportValidityMs);

    tracker.updateSlowDiskReportAsync(timer.monotonicNow());
    Thread.sleep(OUTLIERS_REPORT_INTERVAL * 2);

    assertTrue(tracker.getSlowDiskReportAsJsonString() == null);
  }

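  /**
   * Returns true if the given report contains an entry for the specified
   * DataNode and disk whose latency for the given operation matches the
   * expected value within a small tolerance.
   */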
  private boolean isDiskInReports(ArrayList<DiskLatency> reports,
      String dataNodeID, String disk, DiskOp diskOp, double latency) {
    String diskID = SlowDiskTracker.getSlowDiskIDForReport(dataNodeID, disk);
    for (DiskLatency diskLatency : reports) {
      if (diskLatency.getSlowDiskID().equals(diskID)) {
        if (diskLatency.getLatency(diskOp) == null) {
          return false;
        }
        if (Math.abs(diskLatency.getLatency(diskOp) - latency) < 0.0000001) {
          return true;
        }
      }
    }
    return false;
  }

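  /**
   * Deserializes a slow disk report JSON string into a list of
   * {@link DiskLatency} entries.
   */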
  private ArrayList<DiskLatency> getAndDeserializeJson(
      final String json) throws IOException {
    return (new ObjectMapper()).readValue(json,
        new TypeReference<ArrayList<DiskLatency>>() {});
  }

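  /**
   * Adds a single-disk slow disk report for the given DataNode directly to
   * the tracker under test.
   */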
  private void addSlowDiskForTesting(String dnID, String disk,
      Map<DiskOp, Double> latencies) {
    Map<String, Map<DiskOp, Double>> slowDisk = Maps.newHashMap();
    slowDisk.put(disk, latencies);
    SlowDiskReports slowDiskReport = SlowDiskReports.create(slowDisk);
    tracker.addSlowDiskReport(dnID, slowDiskReport);
  }

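  /**
   * Returns the tracker's current slow disks report keyed by slow disk ID.
   */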
  Map<String, DiskLatency> getSlowDisksReportForTesting(
      SlowDiskTracker slowDiskTracker) {
    Map<String, DiskLatency> slowDisksMap = Maps.newHashMap();
    for (DiskLatency diskLatency : slowDiskTracker.getSlowDisksReport()) {
      slowDisksMap.put(diskLatency.getSlowDiskID(), diskLatency);
    }
    return slowDisksMap;
  }
}