// blob: 9638f71ef82861bd05783e66ae3e5e3cc6d9da6e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.junit.Assert;
import org.junit.Test;
/**
* This test ensures that all types of datanode report work correctly.
*/
public class TestDatanodeReport {
  static final Logger LOG = LoggerFactory.getLogger(TestDatanodeReport.class);
  private static final int NUM_OF_DATANODES = 4;
  /**
   * Generous upper bound on how long to wait for the namenode to report a
   * stopped datanode as dead before the test is failed instead of hanging.
   */
  private static final long DEAD_NODE_WAIT_TIMEOUT_MS = 60L * 1000L;
  /** Pause between two consecutive polls of the dead-datanode report. */
  private static final long DEAD_NODE_POLL_INTERVAL_MS = 500L;

  /**
   * Configuration for the test being run. It is an instance field rather
   * than a static one so that keys set by one test (for example the hosts
   * provider class and the host file locations) cannot leak into the other
   * tests: JUnit 4 creates a fresh instance of this class per test method.
   */
  private final Configuration conf = new HdfsConfiguration();

  /**
   * This test verifies that the upgrade domain reported for a datanode
   * follows the value written to the JSON host file, including when the
   * value is cleared and when it is changed.
   */
  @Test
  public void testDatanodeReportWithUpgradeDomain() throws Exception {
    conf.setInt(
        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); // 0.5s
    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
    conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
        CombinedHostFileManager.class, HostConfigManager.class);
    final HostsFileWriter hostsFileWriter = new HostsFileWriter();
    hostsFileWriter.initialize(conf, "temp/datanodeReport");

    try {
      final MiniDFSCluster cluster =
          new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      try {
        // wait until the cluster is up
        cluster.waitActive();
        // Obtained inside the try block so that a failure here still shuts
        // the cluster down.
        final DFSClient client = cluster.getFileSystem().dfs;

        final DatanodeAdminProperties datanode = new DatanodeAdminProperties();
        datanode.setHostName(
            cluster.getDataNodes().get(0).getDatanodeId().getHostName());

        refreshAndAssertUpgradeDomain(
            cluster, client, hostsFileWriter, datanode, "ud1");
        refreshAndAssertUpgradeDomain(
            cluster, client, hostsFileWriter, datanode, null);
        refreshAndAssertUpgradeDomain(
            cluster, client, hostsFileWriter, datanode, "ud2");
      } finally {
        cluster.shutdown();
      }
    } finally {
      // Remove the host files written for this test.
      hostsFileWriter.cleanup();
    }
  }

  /**
   * Sets {@code upgradeDomain} on {@code datanode}, rewrites the include
   * host file with that single entry, asks the namenode to refresh its node
   * list and asserts that the ALL datanode report then contains exactly one
   * datanode carrying that upgrade domain.
   *
   * @param cluster the running mini cluster
   * @param client client used to fetch the datanode report
   * @param hostsFileWriter writer owning the include host file
   * @param datanode admin properties of the only datanode in the cluster
   * @param upgradeDomain expected upgrade domain, may be {@code null}
   */
  private void refreshAndAssertUpgradeDomain(MiniDFSCluster cluster,
      DFSClient client, HostsFileWriter hostsFileWriter,
      DatanodeAdminProperties datanode, String upgradeDomain)
      throws Exception {
    datanode.setUpgradeDomain(upgradeDomain);
    hostsFileWriter.initIncludeHosts(
        new DatanodeAdminProperties[]{datanode});
    cluster.getNamesystem().getBlockManager().getDatanodeManager()
        .refreshNodes(conf);

    final DatanodeInfo[] all = client.datanodeReport(DatanodeReportType.ALL);
    // Fail with a clear message rather than an index error on an empty report.
    assertEquals(1, all.length);
    assertEquals(upgradeDomain, all[0].getUpgradeDomain());
  }

  /**
   * This test exercises the different types of datanode report (ALL, LIVE
   * and DEAD) before and after one datanode is shut down, and checks the
   * ExpiredHeartbeats counter afterwards.
   */
  @Test
  public void testDatanodeReport() throws Exception {
    conf.setInt(
        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); // 0.5s
    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(NUM_OF_DATANODES).build();
    try {
      // wait until the cluster is up
      cluster.waitActive();
      final String bpid = cluster.getNamesystem().getBlockPoolId();
      final List<DataNode> datanodes = cluster.getDataNodes();
      final DFSClient client = cluster.getFileSystem().dfs;

      assertReports(NUM_OF_DATANODES, DatanodeReportType.ALL,
          client, datanodes, bpid);
      assertReports(NUM_OF_DATANODES, DatanodeReportType.LIVE,
          client, datanodes, bpid);
      assertReports(0, DatanodeReportType.DEAD, client, datanodes, bpid);

      // bring down one datanode
      final DataNode last = datanodes.get(datanodes.size() - 1);
      LOG.info("XXX shutdown datanode {}", last.getDatanodeUuid());
      last.shutdown();

      waitForDeadDatanodes(client, 1);

      // Storage details are not compared after the shutdown (bpid == null).
      assertReports(NUM_OF_DATANODES, DatanodeReportType.ALL,
          client, datanodes, null);
      assertReports(NUM_OF_DATANODES - 1, DatanodeReportType.LIVE,
          client, datanodes, null);
      assertReports(1, DatanodeReportType.DEAD, client, datanodes, null);

      Thread.sleep(5000);
      assertCounter("ExpiredHeartbeats", 1, getMetrics("FSNamesystem"));
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Polls the DEAD datanode report until it contains exactly
   * {@code expected} entries. The wait is bounded: the test fails once
   * {@link #DEAD_NODE_WAIT_TIMEOUT_MS} has elapsed, and an interrupt is
   * propagated to the caller instead of being swallowed.
   *
   * @param client client used to fetch the datanode report
   * @param expected number of dead datanodes to wait for
   * @throws IOException if fetching the report fails
   * @throws InterruptedException if interrupted while sleeping between polls
   */
  private static void waitForDeadDatanodes(DFSClient client, int expected)
      throws IOException, InterruptedException {
    final long deadline =
        System.nanoTime() + DEAD_NODE_WAIT_TIMEOUT_MS * 1000L * 1000L;
    int dead = client.datanodeReport(DatanodeReportType.DEAD).length;
    while (dead != expected) {
      // Subtraction keeps the comparison correct if nanoTime wraps around.
      if (System.nanoTime() - deadline >= 0) {
        Assert.fail("Timed out after " + DEAD_NODE_WAIT_TIMEOUT_MS
            + " ms waiting for " + expected
            + " dead datanode(s); last report had " + dead);
      }
      Thread.sleep(DEAD_NODE_POLL_INTERVAL_MS);
      dead = client.datanodeReport(DatanodeReportType.DEAD).length;
    }
  }

  /**
   * This test deletes the block files of a written file on the datanodes
   * and verifies that, after heartbeats are triggered, the located block
   * eventually reports no locations.
   */
  @Test
  public void testDatanodeReportMissingBlock() throws Exception {
    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
    conf.setLong(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 1);
    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(NUM_OF_DATANODES).build();
    try {
      // wait until the cluster is up
      cluster.waitActive();
      final DistributedFileSystem fs = cluster.getFileSystem();
      final Path p = new Path("/testDatanodeReportMissingBlock");
      DFSTestUtil.writeFile(fs, p, "testdata");

      LocatedBlock lb =
          fs.getClient().getLocatedBlocks(p.toString(), 0).get(0);
      assertEquals(3, lb.getLocations().length);
      final ExtendedBlock b = lb.getBlock();
      cluster.corruptBlockOnDataNodesByDeletingBlockFile(b);
      try {
        DFSTestUtil.readFile(fs, p);
        Assert.fail("Must throw exception as the block doesn't exists on disk");
      } catch (IOException expected) {
        // expected: all datanodes holding the block are bad
      }
      cluster.triggerHeartbeats(); // IBR delete ack

      // One initial check plus at most maxRetries re-checks, 2s apart.
      final int maxRetries = 7;
      for (int retries = 0;; retries++) {
        lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0);
        if (lb.getLocations().length == 0) {
          break;
        }
        if (retries >= maxRetries) {
          Assert.fail("getLocatedBlocks failed after 7 retries");
        }
        Thread.sleep(2000);
      }
    } finally {
      cluster.shutdown();
    }
  }

  /** Orders storage reports by the storage ID of their storage. */
  final static Comparator<StorageReport> CMP = new Comparator<StorageReport>() {
    @Override
    public int compare(StorageReport left, StorageReport right) {
      return left.getStorage().getStorageID().compareTo(
          right.getStorage().getStorageID());
    }
  };

  /**
   * Asserts that both the datanode report and the datanode storage report
   * of the given type contain {@code numDatanodes} entries, that the two
   * reports list the same datanodes in the same order, and that every
   * reported datanode is present in {@code datanodes}. When {@code bpid} is
   * not {@code null}, the storage IDs reported for each datanode are also
   * compared with those of the datanode's own dataset for that block pool.
   *
   * @param numDatanodes expected number of entries in each report
   * @param type report type to request
   * @param client client used to fetch the reports
   * @param datanodes datanodes of the cluster under test
   * @param bpid block pool ID, or {@code null} to skip the storage check
   * @throws IOException if fetching a report fails
   */
  static void assertReports(int numDatanodes, DatanodeReportType type,
      DFSClient client, List<DataNode> datanodes, String bpid)
      throws IOException {
    final DatanodeInfo[] infos = client.datanodeReport(type);
    assertEquals(numDatanodes, infos.length);
    final DatanodeStorageReport[] reports =
        client.getDatanodeStorageReport(type);
    assertEquals(numDatanodes, reports.length);

    for (int i = 0; i < infos.length; i++) {
      assertEquals(infos[i], reports[i].getDatanodeInfo());

      // Looked up even when bpid is null: findDatanode throws if the
      // reported datanode is not part of the cluster.
      final DataNode d = findDatanode(infos[i].getDatanodeUuid(), datanodes);

      if (bpid != null) {
        // check storage
        final StorageReport[] computed = reports[i].getStorageReports();
        Arrays.sort(computed, CMP);
        final StorageReport[] expected =
            d.getFSDataset().getStorageReports(bpid);
        Arrays.sort(expected, CMP);

        assertEquals(expected.length, computed.length);
        for (int j = 0; j < expected.length; j++) {
          assertEquals(expected[j].getStorage().getStorageID(),
              computed[j].getStorage().getStorageID());
        }
      }
    }
  }

  /**
   * Returns the datanode from {@code datanodes} whose UUID equals
   * {@code id}.
   *
   * @param id datanode UUID to look for
   * @param datanodes candidates to search
   * @return the matching datanode
   * @throws IllegalStateException if no datanode in the list has that UUID
   */
  static DataNode findDatanode(String id, List<DataNode> datanodes) {
    for (DataNode d : datanodes) {
      if (d.getDatanodeUuid().equals(id)) {
        return d;
      }
    }
    throw new IllegalStateException("Datnode " + id + " not in datanode list: "
        + datanodes);
  }
}