| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.tools; |
| |
| import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY; |
| |
| import com.google.common.base.Supplier; |
| import com.google.common.collect.Lists; |
| |
| import org.apache.commons.lang.text.StrBuilder; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.conf.ReconfigurationUtil; |
| import org.apache.hadoop.fs.ChecksumException; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hdfs.DFSClient; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DFSTestUtil; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlocks; |
| import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; |
| import org.apache.hadoop.hdfs.server.common.Storage; |
| import org.apache.hadoop.hdfs.server.datanode.DataNode; |
| import org.apache.hadoop.hdfs.server.datanode.StorageLocation; |
| import org.apache.hadoop.hdfs.server.namenode.NameNode; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.test.PathUtils; |
| import org.apache.hadoop.util.ToolRunner; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.PrintStream; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Scanner; |
| import java.util.concurrent.TimeoutException; |
| |
| import static org.hamcrest.CoreMatchers.allOf; |
| import static org.hamcrest.CoreMatchers.anyOf; |
| import static org.hamcrest.CoreMatchers.is; |
| import static org.hamcrest.CoreMatchers.not; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertThat; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.hamcrest.CoreMatchers.containsString; |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| /** |
| * set/clrSpaceQuote are tested in {@link org.apache.hadoop.hdfs.TestQuota}. |
| */ |
| public class TestDFSAdmin { |
| private static final Log LOG = LogFactory.getLog(TestDFSAdmin.class); |
| private Configuration conf = null; |
| private MiniDFSCluster cluster; |
| private DFSAdmin admin; |
| private DataNode datanode; |
| private NameNode namenode; |
| private final ByteArrayOutputStream out = new ByteArrayOutputStream(); |
| private final ByteArrayOutputStream err = new ByteArrayOutputStream(); |
| private static final PrintStream OLD_OUT = System.out; |
| private static final PrintStream OLD_ERR = System.err; |
| |
| @Before |
| public void setUp() throws Exception { |
| conf = new Configuration(); |
| conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 3); |
| restartCluster(); |
| |
| admin = new DFSAdmin(); |
| } |
| |
| private void redirectStream() { |
| System.setOut(new PrintStream(out)); |
| System.setErr(new PrintStream(err)); |
| } |
| |
| private void resetStream() { |
| out.reset(); |
| err.reset(); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| try { |
| System.out.flush(); |
| System.err.flush(); |
| } finally { |
| System.setOut(OLD_OUT); |
| System.setErr(OLD_ERR); |
| } |
| |
| if (cluster != null) { |
| cluster.shutdown(); |
| cluster = null; |
| } |
| |
| resetStream(); |
| } |
| |
| private void restartCluster() throws IOException { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); |
| cluster.waitActive(); |
| datanode = cluster.getDataNodes().get(0); |
| namenode = cluster.getNameNode(); |
| } |
| |
| private void getReconfigurableProperties(String nodeType, String address, |
| final List<String> outs, final List<String> errs) throws IOException { |
| reconfigurationOutErrFormatter("getReconfigurableProperties", nodeType, |
| address, outs, errs); |
| } |
| |
| private void getReconfigurationStatus(String nodeType, String address, |
| final List<String> outs, final List<String> errs) throws IOException { |
| reconfigurationOutErrFormatter("getReconfigurationStatus", nodeType, |
| address, outs, errs); |
| } |
| |
| private void reconfigurationOutErrFormatter(String methodName, |
| String nodeType, String address, final List<String> outs, |
| final List<String> errs) throws IOException { |
| ByteArrayOutputStream bufOut = new ByteArrayOutputStream(); |
| PrintStream outStream = new PrintStream(bufOut); |
| ByteArrayOutputStream bufErr = new ByteArrayOutputStream(); |
| PrintStream errStream = new PrintStream(bufErr); |
| |
| if (methodName.equals("getReconfigurableProperties")) { |
| admin.getReconfigurableProperties( |
| nodeType, |
| address, |
| outStream, |
| errStream); |
| } else if (methodName.equals("getReconfigurationStatus")) { |
| admin.getReconfigurationStatus(nodeType, address, outStream, errStream); |
| } else if (methodName.equals("startReconfiguration")) { |
| admin.startReconfiguration(nodeType, address, outStream, errStream); |
| } |
| |
| scanIntoList(bufOut, outs); |
| scanIntoList(bufErr, errs); |
| } |
| |
| private static void scanIntoList( |
| final ByteArrayOutputStream baos, |
| final List<String> list) { |
| final Scanner scanner = new Scanner(baos.toString()); |
| while (scanner.hasNextLine()) { |
| list.add(scanner.nextLine()); |
| } |
| scanner.close(); |
| } |
| |
| @Test(timeout = 30000) |
| public void testGetDatanodeInfo() throws Exception { |
| redirectStream(); |
| final DFSAdmin dfsAdmin = new DFSAdmin(conf); |
| |
| for (int i = 0; i < cluster.getDataNodes().size(); i++) { |
| resetStream(); |
| final DataNode dn = cluster.getDataNodes().get(i); |
| final String addr = String.format( |
| "%s:%d", |
| dn.getXferAddress().getHostString(), |
| dn.getIpcPort()); |
| final int ret = ToolRunner.run(dfsAdmin, |
| new String[]{"-getDatanodeInfo", addr}); |
| assertEquals(0, ret); |
| |
| /* collect outputs */ |
| final List<String> outs = Lists.newArrayList(); |
| scanIntoList(out, outs); |
| /* verify results */ |
| assertEquals( |
| "One line per DataNode like: Uptime: XXX, Software version: x.y.z," |
| + " Config version: core-x.y.z,hdfs-x", |
| 1, outs.size()); |
| assertThat(outs.get(0), |
| is(allOf(containsString("Uptime:"), |
| containsString("Software version"), |
| containsString("Config version")))); |
| } |
| } |
| |
| @Test(timeout = 30000) |
| public void testGetVolumeReport() throws Exception { |
| redirectStream(); |
| final DFSAdmin dfsAdmin = new DFSAdmin(conf); |
| |
| for (int i = 0; i < cluster.getDataNodes().size(); i++) { |
| resetStream(); |
| final DataNode dn = cluster.getDataNodes().get(i); |
| final String addr = String.format("%s:%d", dn.getXferAddress() |
| .getHostString(), dn.getIpcPort()); |
| final int ret = ToolRunner.run(dfsAdmin, new String[] { |
| "-getVolumeReport", addr }); |
| assertEquals(0, ret); |
| |
| /* collect outputs */ |
| final List<String> outs = Lists.newArrayList(); |
| scanIntoList(out, outs); |
| assertEquals(outs.get(0), "Active Volumes : 2"); |
| } |
| } |
| /** |
| * Test that if datanode is not reachable, some DFSAdmin commands will fail |
| * elegantly with non-zero ret error code along with exception error message. |
| */ |
| @Test(timeout = 60000) |
| public void testDFSAdminUnreachableDatanode() throws Exception { |
| redirectStream(); |
| final DFSAdmin dfsAdmin = new DFSAdmin(conf); |
| for (String command : new String[]{"-getDatanodeInfo", |
| "-evictWriters", "-getBalancerBandwidth"}) { |
| // Connecting to Xfer port instead of IPC port will get |
| // Datanode unreachable. java.io.EOFException |
| final String dnDataAddr = datanode.getXferAddress().getHostString() + ":" |
| + datanode.getXferPort(); |
| resetStream(); |
| final List<String> outs = Lists.newArrayList(); |
| final int ret = ToolRunner.run(dfsAdmin, |
| new String[]{command, dnDataAddr}); |
| assertEquals(-1, ret); |
| |
| scanIntoList(out, outs); |
| assertTrue("Unexpected " + command + " stdout: " + out, outs.isEmpty()); |
| assertTrue("Unexpected " + command + " stderr: " + err, |
| err.toString().contains("Exception")); |
| } |
| } |
| |
| @Test(timeout = 30000) |
| public void testDataNodeGetReconfigurableProperties() throws IOException { |
| final int port = datanode.getIpcPort(); |
| final String address = "localhost:" + port; |
| final List<String> outs = Lists.newArrayList(); |
| final List<String> errs = Lists.newArrayList(); |
| getReconfigurableProperties("datanode", address, outs, errs); |
| assertEquals(3, outs.size()); |
| assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1)); |
| } |
| |
| /** |
| * Test reconfiguration and check the status outputs. |
| * @param expectedSuccuss set true if the reconfiguration task should success. |
| * @throws IOException |
| * @throws InterruptedException |
| * @throws TimeoutException |
| */ |
| private void testDataNodeGetReconfigurationStatus(boolean expectedSuccuss) |
| throws IOException, InterruptedException, TimeoutException { |
| ReconfigurationUtil ru = mock(ReconfigurationUtil.class); |
| datanode.setReconfigurationUtil(ru); |
| |
| List<ReconfigurationUtil.PropertyChange> changes = |
| new ArrayList<>(); |
| File newDir = new File(cluster.getDataDirectory(), "data_new"); |
| if (expectedSuccuss) { |
| newDir.mkdirs(); |
| } else { |
| // Inject failure. |
| newDir.createNewFile(); |
| } |
| changes.add(new ReconfigurationUtil.PropertyChange( |
| DFS_DATANODE_DATA_DIR_KEY, newDir.toString(), |
| datanode.getConf().get(DFS_DATANODE_DATA_DIR_KEY))); |
| changes.add(new ReconfigurationUtil.PropertyChange( |
| "randomKey", "new123", "old456")); |
| when(ru.parseChangedProperties(any(Configuration.class), |
| any(Configuration.class))).thenReturn(changes); |
| |
| final int port = datanode.getIpcPort(); |
| final String address = "localhost:" + port; |
| |
| assertThat(admin.startReconfiguration("datanode", address), is(0)); |
| |
| final List<String> outs = Lists.newArrayList(); |
| final List<String> errs = Lists.newArrayList(); |
| awaitReconfigurationFinished("datanode", address, outs, errs); |
| |
| if (expectedSuccuss) { |
| assertThat(outs.size(), is(4)); |
| } else { |
| assertThat(outs.size(), is(6)); |
| } |
| |
| List<StorageLocation> locations = DataNode.getStorageLocations( |
| datanode.getConf()); |
| if (expectedSuccuss) { |
| assertThat(locations.size(), is(1)); |
| assertThat(locations.get(0).getFile(), is(newDir)); |
| // Verify the directory is appropriately formatted. |
| assertTrue(new File(newDir, Storage.STORAGE_DIR_CURRENT).isDirectory()); |
| } else { |
| assertTrue(locations.isEmpty()); |
| } |
| |
| int offset = 1; |
| if (expectedSuccuss) { |
| assertThat(outs.get(offset), |
| containsString("SUCCESS: Changed property " + |
| DFS_DATANODE_DATA_DIR_KEY)); |
| } else { |
| assertThat(outs.get(offset), |
| containsString("FAILED: Change property " + |
| DFS_DATANODE_DATA_DIR_KEY)); |
| } |
| assertThat(outs.get(offset + 1), |
| is(allOf(containsString("From:"), containsString("data1"), |
| containsString("data2")))); |
| assertThat(outs.get(offset + 2), |
| is(not(anyOf(containsString("data1"), containsString("data2"))))); |
| assertThat(outs.get(offset + 2), |
| is(allOf(containsString("To"), containsString("data_new")))); |
| } |
| |
| @Test(timeout = 30000) |
| public void testDataNodeGetReconfigurationStatus() throws IOException, |
| InterruptedException, TimeoutException { |
| testDataNodeGetReconfigurationStatus(true); |
| restartCluster(); |
| testDataNodeGetReconfigurationStatus(false); |
| } |
| |
| @Test(timeout = 30000) |
| public void testNameNodeGetReconfigurableProperties() throws IOException { |
| final String address = namenode.getHostAndPort(); |
| final List<String> outs = Lists.newArrayList(); |
| final List<String> errs = Lists.newArrayList(); |
| getReconfigurableProperties("namenode", address, outs, errs); |
| assertEquals(6, outs.size()); |
| assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(1)); |
| assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(2)); |
| assertEquals(errs.size(), 0); |
| } |
| |
| void awaitReconfigurationFinished(final String nodeType, |
| final String address, final List<String> outs, final List<String> errs) |
| throws TimeoutException, IOException, InterruptedException { |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| @Override |
| public Boolean get() { |
| outs.clear(); |
| errs.clear(); |
| try { |
| getReconfigurationStatus(nodeType, address, outs, errs); |
| } catch (IOException e) { |
| LOG.error(String.format( |
| "call getReconfigurationStatus on %s[%s] failed.", nodeType, |
| address), e); |
| } |
| return !outs.isEmpty() && outs.get(0).contains("finished"); |
| |
| } |
| }, 100, 100 * 100); |
| } |
| |
| @Test(timeout = 30000) |
| public void testPrintTopology() throws Exception { |
| redirectStream(); |
| |
| /* init conf */ |
| final Configuration dfsConf = new HdfsConfiguration(); |
| final File baseDir = new File( |
| PathUtils.getTestDir(getClass()), |
| GenericTestUtils.getMethodName()); |
| dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); |
| |
| final int numDn = 4; |
| final String[] racks = { |
| "/d1/r1", "/d1/r2", |
| "/d2/r1", "/d2/r2"}; |
| |
| /* init cluster using topology */ |
| try (MiniDFSCluster miniCluster = new MiniDFSCluster.Builder(dfsConf) |
| .numDataNodes(numDn).racks(racks).build()) { |
| |
| miniCluster.waitActive(); |
| assertEquals(numDn, miniCluster.getDataNodes().size()); |
| final DFSAdmin dfsAdmin = new DFSAdmin(dfsConf); |
| |
| resetStream(); |
| final int ret = ToolRunner.run(dfsAdmin, new String[] {"-printTopology"}); |
| |
| /* collect outputs */ |
| final List<String> outs = Lists.newArrayList(); |
| scanIntoList(out, outs); |
| |
| /* verify results */ |
| assertEquals(0, ret); |
| assertEquals( |
| "There should be three lines per Datanode: the 1st line is" |
| + " rack info, 2nd node info, 3rd empty line. The total" |
| + " should be as a result of 3 * numDn.", |
| 12, outs.size()); |
| assertThat(outs.get(0), |
| is(allOf(containsString("Rack:"), containsString("/d1/r1")))); |
| assertThat(outs.get(3), |
| is(allOf(containsString("Rack:"), containsString("/d1/r2")))); |
| assertThat(outs.get(6), |
| is(allOf(containsString("Rack:"), containsString("/d2/r1")))); |
| assertThat(outs.get(9), |
| is(allOf(containsString("Rack:"), containsString("/d2/r2")))); |
| } |
| } |
| |
| @Test(timeout = 30000) |
| public void testNameNodeGetReconfigurationStatus() throws IOException, |
| InterruptedException, TimeoutException { |
| ReconfigurationUtil ru = mock(ReconfigurationUtil.class); |
| namenode.setReconfigurationUtil(ru); |
| final String address = namenode.getHostAndPort(); |
| |
| List<ReconfigurationUtil.PropertyChange> changes = |
| new ArrayList<>(); |
| changes.add(new ReconfigurationUtil.PropertyChange( |
| DFS_HEARTBEAT_INTERVAL_KEY, String.valueOf(6), |
| namenode.getConf().get(DFS_HEARTBEAT_INTERVAL_KEY))); |
| changes.add(new ReconfigurationUtil.PropertyChange( |
| "randomKey", "new123", "old456")); |
| when(ru.parseChangedProperties(any(Configuration.class), |
| any(Configuration.class))).thenReturn(changes); |
| assertThat(admin.startReconfiguration("namenode", address), is(0)); |
| |
| final List<String> outs = Lists.newArrayList(); |
| final List<String> errs = Lists.newArrayList(); |
| awaitReconfigurationFinished("namenode", address, outs, errs); |
| |
| // verify change |
| assertEquals( |
| DFS_HEARTBEAT_INTERVAL_KEY + " has wrong value", |
| 6, |
| namenode |
| .getConf() |
| .getLong(DFS_HEARTBEAT_INTERVAL_KEY, |
| DFS_HEARTBEAT_INTERVAL_DEFAULT)); |
| assertEquals(DFS_HEARTBEAT_INTERVAL_KEY + " has wrong value", |
| 6, |
| namenode |
| .getNamesystem() |
| .getBlockManager() |
| .getDatanodeManager() |
| .getHeartbeatInterval()); |
| |
| int offset = 1; |
| assertThat(outs.get(offset), containsString("SUCCESS: Changed property " |
| + DFS_HEARTBEAT_INTERVAL_KEY)); |
| assertThat(outs.get(offset + 1), |
| is(allOf(containsString("From:"), containsString("3")))); |
| assertThat(outs.get(offset + 2), |
| is(allOf(containsString("To:"), containsString("6")))); |
| } |
| |
| private static String scanIntoString(final ByteArrayOutputStream baos) { |
| final StrBuilder sb = new StrBuilder(); |
| final Scanner scanner = new Scanner(baos.toString()); |
| while (scanner.hasNextLine()) { |
| sb.appendln(scanner.nextLine()); |
| } |
| scanner.close(); |
| return sb.toString(); |
| } |
| |
| @Test(timeout = 120000) |
| public void testReportCommand() throws Exception { |
| redirectStream(); |
| |
| /* init conf */ |
| final Configuration dfsConf = new HdfsConfiguration(); |
| dfsConf.setInt( |
| DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, |
| 500); // 0.5s |
| dfsConf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1); |
| final Path baseDir = new Path( |
| PathUtils.getTestDir(getClass()).getAbsolutePath(), |
| GenericTestUtils.getMethodName()); |
| dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.toString()); |
| |
| final int numDn = 3; |
| |
| /* init cluster */ |
| try(MiniDFSCluster miniCluster = new MiniDFSCluster |
| .Builder(dfsConf) |
| .numDataNodes(numDn).build()) { |
| |
| miniCluster.waitActive(); |
| assertEquals(numDn, miniCluster.getDataNodes().size()); |
| |
| /* local vars */ |
| final DFSAdmin dfsAdmin = new DFSAdmin(dfsConf); |
| final DFSClient client = miniCluster.getFileSystem().getClient(); |
| |
| /* run and verify report command */ |
| resetStream(); |
| assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"})); |
| verifyNodesAndCorruptBlocks(numDn, numDn, 0, client); |
| |
| /* shut down one DN */ |
| final List<DataNode> datanodes = miniCluster.getDataNodes(); |
| final DataNode last = datanodes.get(datanodes.size() - 1); |
| last.shutdown(); |
| miniCluster.setDataNodeDead(last.getDatanodeId()); |
| |
| /* run and verify report command */ |
| assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"})); |
| verifyNodesAndCorruptBlocks(numDn, numDn - 1, 0, client); |
| |
| /* corrupt one block */ |
| final short replFactor = 1; |
| final long fileLength = 512L; |
| final FileSystem fs = miniCluster.getFileSystem(); |
| final Path file = new Path(baseDir, "/corrupted"); |
| DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L); |
| DFSTestUtil.waitReplication(fs, file, replFactor); |
| |
| final ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file); |
| final int blockFilesCorrupted = miniCluster |
| .corruptBlockOnDataNodes(block); |
| assertEquals("Fail to corrupt all replicas for block " + block, |
| replFactor, blockFilesCorrupted); |
| |
| try { |
| IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(), |
| conf, true); |
| fail("Should have failed to read the file with corrupted blocks."); |
| } catch (ChecksumException ignored) { |
| // expected exception reading corrupt blocks |
| } |
| |
| /* |
| * Increase replication factor, this should invoke transfer request. |
| * Receiving datanode fails on checksum and reports it to namenode |
| */ |
| fs.setReplication(file, (short) (replFactor + 1)); |
| |
| /* get block details and check if the block is corrupt */ |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| @Override |
| public Boolean get() { |
| LocatedBlocks blocks = null; |
| try { |
| miniCluster.triggerBlockReports(); |
| blocks = client.getNamenode().getBlockLocations(file.toString(), 0, |
| Long.MAX_VALUE); |
| } catch (IOException e) { |
| return false; |
| } |
| return blocks != null && blocks.get(0).isCorrupt(); |
| } |
| }, 1000, 60000); |
| |
| BlockManagerTestUtil.updateState( |
| miniCluster.getNameNode().getNamesystem().getBlockManager()); |
| |
| /* run and verify report command */ |
| resetStream(); |
| assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"})); |
| verifyNodesAndCorruptBlocks(numDn, numDn - 1, 1, client); |
| } |
| } |
| |
| @Test(timeout = 300000L) |
| public void testListOpenFiles() throws Exception { |
| redirectStream(); |
| |
| final Configuration dfsConf = new HdfsConfiguration(); |
| dfsConf.setInt( |
| DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); |
| dfsConf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1); |
| dfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES, 5); |
| final Path baseDir = new Path( |
| PathUtils.getTestDir(getClass()).getAbsolutePath(), |
| GenericTestUtils.getMethodName()); |
| dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.toString()); |
| |
| final int numDataNodes = 3; |
| final int numClosedFiles = 25; |
| final int numOpenFiles = 15; |
| |
| try(MiniDFSCluster miniCluster = new MiniDFSCluster |
| .Builder(dfsConf) |
| .numDataNodes(numDataNodes).build()) { |
| final short replFactor = 1; |
| final long fileLength = 512L; |
| final FileSystem fs = miniCluster.getFileSystem(); |
| final Path parentDir = new Path("/tmp/files/"); |
| |
| fs.mkdirs(parentDir); |
| HashSet<Path> closedFileSet = new HashSet<>(); |
| for (int i = 0; i < numClosedFiles; i++) { |
| Path file = new Path(parentDir, "closed-file-" + i); |
| DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L); |
| closedFileSet.add(file); |
| } |
| |
| HashMap<Path, FSDataOutputStream> openFilesMap = new HashMap<>(); |
| for (int i = 0; i < numOpenFiles; i++) { |
| Path file = new Path(parentDir, "open-file-" + i); |
| DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L); |
| FSDataOutputStream outputStream = fs.append(file); |
| openFilesMap.put(file, outputStream); |
| } |
| |
| final DFSAdmin dfsAdmin = new DFSAdmin(dfsConf); |
| assertEquals(0, ToolRunner.run(dfsAdmin, |
| new String[]{"-listOpenFiles"})); |
| verifyOpenFilesListing(closedFileSet, openFilesMap); |
| |
| for (int count = 0; count < numOpenFiles; count++) { |
| closedFileSet.addAll(DFSTestUtil.closeOpenFiles(openFilesMap, 1)); |
| resetStream(); |
| assertEquals(0, ToolRunner.run(dfsAdmin, |
| new String[]{"-listOpenFiles"})); |
| verifyOpenFilesListing(closedFileSet, openFilesMap); |
| } |
| } |
| } |
| |
| private void verifyOpenFilesListing(HashSet<Path> closedFileSet, |
| HashMap<Path, FSDataOutputStream> openFilesMap) { |
| final String outStr = scanIntoString(out); |
| LOG.info("dfsadmin -listOpenFiles output: \n" + out); |
| for (Path closedFilePath : closedFileSet) { |
| assertThat(outStr, not(containsString(closedFilePath.toString() + |
| System.lineSeparator()))); |
| } |
| for (Path openFilePath : openFilesMap.keySet()) { |
| assertThat(outStr, is(containsString(openFilePath.toString() + |
| System.lineSeparator()))); |
| } |
| } |
| |
| private void verifyNodesAndCorruptBlocks( |
| final int numDn, |
| final int numLiveDn, |
| final int numCorruptBlocks, |
| final DFSClient client) throws IOException { |
| |
| /* init vars */ |
| final String outStr = scanIntoString(out); |
| final String expectedLiveNodesStr = String.format( |
| "Live datanodes (%d)", |
| numLiveDn); |
| final String expectedCorruptedBlocksStr = String.format( |
| "Blocks with corrupt replicas: %d", |
| numCorruptBlocks); |
| |
| /* verify nodes and corrupt blocks */ |
| assertThat(outStr, is(allOf( |
| containsString(expectedLiveNodesStr), |
| containsString(expectedCorruptedBlocksStr)))); |
| |
| assertEquals( |
| numDn, |
| client.getDatanodeStorageReport(DatanodeReportType.ALL).length); |
| assertEquals( |
| numLiveDn, |
| client.getDatanodeStorageReport(DatanodeReportType.LIVE).length); |
| assertEquals( |
| numDn - numLiveDn, |
| client.getDatanodeStorageReport(DatanodeReportType.DEAD).length); |
| assertEquals(numCorruptBlocks, client.getCorruptBlocksCount()); |
| } |
| } |