| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.datanode; |
| |
| import static org.hamcrest.core.Is.is; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertThat; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assume.assumeTrue; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.net.Socket; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.conf.ReconfigurationException; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.FsTracer; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hdfs.BlockReader; |
| import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory; |
| import org.apache.hadoop.hdfs.ClientContext; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DFSTestUtil; |
| import org.apache.hadoop.hdfs.DFSUtilClient; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.hdfs.RemotePeerFactory; |
| import org.apache.hadoop.hdfs.client.impl.DfsClientConf; |
| import org.apache.hadoop.hdfs.net.Peer; |
| import org.apache.hadoop.hdfs.protocol.Block; |
| import org.apache.hadoop.hdfs.protocol.DatanodeID; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlock; |
| import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; |
| import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; |
| import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; |
| import org.apache.hadoop.hdfs.server.common.Storage; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; |
| import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; |
| import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; |
| import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.util.Shell; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.commons.io.filefilter.TrueFileFilter; |
| |
| import com.google.common.base.Supplier; |
| |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.internal.AssumptionViolatedException; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Fine-grained testing of block files and locations after a volume failure. |
| */ |
| public class TestDataNodeVolumeFailure { |
| private final static Logger LOG = LoggerFactory.getLogger( |
| TestDataNodeVolumeFailure.class); |
| final private int block_size = 512; |
| MiniDFSCluster cluster = null; |
| private Configuration conf; |
| final int dn_num = 2; |
| final int blocks_num = 30; |
| final short repl=2; |
| File dataDir = null; |
| File data_fail = null; |
| File failedDir = null; |
| private FileSystem fs; |
| |
| // per-block counts of meta files (physical files) and locs (NameNode locations) |
| private class BlockLocs { |
| public int num_files = 0; |
| public int num_locs = 0; |
| } |
| // block id to BlockLocs |
| final Map<String, BlockLocs> block_map = new HashMap<String, BlockLocs> (); |
| |
| @Before |
| public void setUp() throws Exception { |
| // bring up a cluster of 2 datanodes |
| conf = new HdfsConfiguration(); |
| conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, block_size); |
| // Allow a single volume failure (there are two volumes) |
| conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1); |
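| // Use a long heartbeat interval (30s) so heartbeats are effectively sent only when the test triggers them. |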
| conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30); |
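| // Allow disk checks to run back-to-back with no minimum gap between them. |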
| conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, |
| 0, TimeUnit.MILLISECONDS); |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dn_num).build(); |
| cluster.waitActive(); |
| fs = cluster.getFileSystem(); |
| dataDir = new File(cluster.getDataDirectory()); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| if(data_fail != null) { |
| FileUtil.setWritable(data_fail, true); |
| data_fail = null; |
| } |
| if(failedDir != null) { |
| FileUtil.setWritable(failedDir, true); |
| failedDir = null; |
| } |
| if(cluster != null) { |
| cluster.shutdown(); |
| cluster = null; |
| } |
| } |
| |
| /* |
| * Verify that the numbers of blocks and files are correct after a volume |
| * failure, and that we can replicate to both datanodes even after a single |
| * volume failure if the configuration parameter allows this. |
| */ |
| @Test(timeout = 120000) |
| public void testVolumeFailure() throws Exception { |
| System.out.println("Data dir: is " + dataDir.getPath()); |
| |
| |
| // Data dir structure is dataDir/data[1-4]/[current,tmp...] |
| // data1 and data2 belong to datanode 1, data3 and data4 to datanode 2 |
| String filename = "/test.txt"; |
| Path filePath = new Path(filename); |
| |
| // use only a small number of blocks to avoid creating subdirectories in the data dir |
| int filesize = block_size*blocks_num; |
| DFSTestUtil.createFile(fs, filePath, filesize, repl, 1L); |
| DFSTestUtil.waitReplication(fs, filePath, repl); |
| System.out.println("file " + filename + "(size " + |
| filesize + ") is created and replicated"); |
| |
| // fail the volume: delete its block files and make the directory non-writable |
| data_fail = new File(dataDir, "data3"); |
| failedDir = MiniDFSCluster.getFinalizedDir(data_fail, |
| cluster.getNamesystem().getBlockPoolId()); |
| if (failedDir.exists() && |
| //!FileUtil.fullyDelete(failedDir) |
| !deteteBlocks(failedDir) |
| ) { |
| throw new IOException("Could not delete hdfs directory '" + failedDir + "'"); |
| } |
| data_fail.setReadOnly(); |
| failedDir.setReadOnly(); |
| System.out.println("Deleteing " + failedDir.getPath() + "; exist=" + failedDir.exists()); |
| |
| // access all the blocks on the "failed" DataNode to make sure that the |
| // "failed" volume is actually read; that triggers the failure, block |
| // removal, and an "emergency" block report |
| triggerFailure(filename, filesize); |
| |
| // the DN should eventually have the latest volume failure information for the next heartbeat |
| final DataNode dn = cluster.getDataNodes().get(1); |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| @Override |
| public Boolean get() { |
| final VolumeFailureSummary summary = |
| dn.getFSDataset().getVolumeFailureSummary(); |
| return summary != null && |
| summary.getFailedStorageLocations() != null && |
| summary.getFailedStorageLocations().length == 1; |
| } |
| }, 10, 30 * 1000); |
| |
| // trigger DN to send heartbeat |
| DataNodeTestUtils.triggerHeartbeat(dn); |
| final BlockManager bm = cluster.getNamesystem().getBlockManager(); |
| // trigger the NN to handle the heartbeat |
| BlockManagerTestUtil.checkHeartbeat(bm); |
| // the NN should now have the latest volume failure information |
| assertEquals(1, cluster.getNamesystem().getVolumeFailuresTotal()); |
| |
| // verify number of blocks and files... |
| verify(filename, filesize); |
| |
| // create another file (with one volume failed). |
| System.out.println("creating file test1.txt"); |
| Path fileName1 = new Path("/test1.txt"); |
| DFSTestUtil.createFile(fs, fileName1, filesize, repl, 1L); |
| |
| // should be able to replicate to both nodes (2 DN, repl=2) |
| DFSTestUtil.waitReplication(fs, fileName1, repl); |
| System.out.println("file " + fileName1.getName() + |
| " is created and replicated"); |
| } |
| |
| /** |
| * Test that DataStorage and BlockPoolSliceStorage remove the failed volume |
| * after failure. |
| */ |
| @Test(timeout=150000) |
| public void testFailedVolumeBeingRemovedFromDataNode() |
| throws InterruptedException, IOException, TimeoutException { |
| // The test uses DataNodeTestUtils#injectDataDirFailure() to simulate |
| // volume failures, which is currently not supported on Windows. |
| assumeTrue(!Path.WINDOWS); |
| |
| Path file1 = new Path("/test1"); |
| DFSTestUtil.createFile(fs, file1, 1024, (short) 2, 1L); |
| DFSTestUtil.waitReplication(fs, file1, (short) 2); |
| |
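| // MiniDFSCluster lays out two data dirs per DataNode: DN i uses data(2*i+1) and data(2*i+2). |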
| File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1)); |
| DataNodeTestUtils.injectDataDirFailure(dn0Vol1); |
| DataNode dn0 = cluster.getDataNodes().get(0); |
| checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1)); |
| |
| // Verify dn0Vol1 has been completely removed from DN0. |
| // 1. dn0Vol1 is removed from DataStorage. |
| DataStorage storage = dn0.getStorage(); |
| assertEquals(1, storage.getNumStorageDirs()); |
| for (int i = 0; i < storage.getNumStorageDirs(); i++) { |
| Storage.StorageDirectory sd = storage.getStorageDir(i); |
| assertFalse(sd.getRoot().getAbsolutePath().startsWith( |
| dn0Vol1.getAbsolutePath() |
| )); |
| } |
| final String bpid = cluster.getNamesystem().getBlockPoolId(); |
| BlockPoolSliceStorage bpsStorage = storage.getBPStorage(bpid); |
| assertEquals(1, bpsStorage.getNumStorageDirs()); |
| for (int i = 0; i < bpsStorage.getNumStorageDirs(); i++) { |
| Storage.StorageDirectory sd = bpsStorage.getStorageDir(i); |
| assertFalse(sd.getRoot().getAbsolutePath().startsWith( |
| dn0Vol1.getAbsolutePath() |
| )); |
| } |
| |
| // 2. dn0Vol1 is removed from FsDataset |
| FsDatasetSpi<? extends FsVolumeSpi> data = dn0.getFSDataset(); |
| try (FsDatasetSpi.FsVolumeReferences vols = data.getFsVolumeReferences()) { |
| for (FsVolumeSpi volume : vols) { |
| assertNotEquals(new File(volume.getBasePath()).getAbsoluteFile(), |
| dn0Vol1.getAbsoluteFile()); |
| } |
| } |
| |
| // 3. all blocks on dn0Vol1 have been removed. |
| for (ReplicaInfo replica : FsDatasetTestUtil.getReplicas(data, bpid)) { |
| assertNotNull(replica.getVolume()); |
| assertNotEquals( |
| new File(replica.getVolume().getBasePath()).getAbsoluteFile(), |
| dn0Vol1.getAbsoluteFile()); |
| } |
| |
| // 4. dn0Vol1 is not in DN0's configuration and dataDirs anymore. |
| String[] dataDirStrs = |
| dn0.getConf().get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(","); |
| assertEquals(1, dataDirStrs.length); |
| assertFalse(dataDirStrs[0].contains(dn0Vol1.getAbsolutePath())); |
| } |
| |
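| /** |
| * Trigger an asynchronous disk check on the given volume and wait until |
| * the check completes, so callers observe the volume failure synchronously. |
| */ |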
| private static void checkDiskErrorSync(DataNode dn, FsVolumeSpi volume) |
| throws InterruptedException { |
| final long lastDiskErrorCheck = dn.getLastDiskErrorCheck(); |
| dn.checkDiskErrorAsync(volume); |
| // Wait 10 seconds for checkDiskError thread to finish and discover volume |
| // failures. |
| int count = 100; |
| while (count > 0 && dn.getLastDiskErrorCheck() == lastDiskErrorCheck) { |
| Thread.sleep(100); |
| count--; |
| } |
| assertTrue("Disk checking thread does not finish in 10 seconds", |
| count > 0); |
| } |
| |
| /** |
| * Test that the DataNode stops when the number of failed volumes exceeds |
| * dfs.datanode.failed.volumes.tolerated. |
| */ |
| @Test(timeout=10000) |
| public void testDataNodeShutdownAfterNumFailedVolumeExceedsTolerated() |
| throws InterruptedException, IOException { |
| if (Shell.WINDOWS) { |
| // The test uses DataNodeTestUtils#injectDataDirFailure() to simulate |
| // volume failures, which is currently not supported on Windows. |
| throw new AssumptionViolatedException("Expected Unix-like platform"); |
| } |
| |
| // make both data directories of dn0 fail |
| final File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1)); |
| final File dn0Vol2 = new File(dataDir, "data" + (2 * 0 + 2)); |
| DataNodeTestUtils.injectDataDirFailure(dn0Vol1, dn0Vol2); |
| DataNode dn0 = cluster.getDataNodes().get(0); |
| checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1)); |
| checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol2)); |
| |
| // DN0 should stop after the number of failed disks exceeds the tolerated |
| // value (1). |
| assertFalse(dn0.shouldRun()); |
| } |
| |
| /** |
| * Test that the DN does not shut down as long as failed volumes are |
| * hot-swapped out. |
| */ |
| @Test |
| public void testVolumeFailureRecoveredByHotSwappingVolume() |
| throws InterruptedException, ReconfigurationException, IOException { |
| if (Shell.WINDOWS) { |
| // The test uses DataNodeTestUtils#injectDataDirFailure() to simulate |
| // volume failures, which is currently not supported on Windows. |
| throw new AssumptionViolatedException("Expected Unix-like platform"); |
| } |
| |
| final File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1)); |
| final File dn0Vol2 = new File(dataDir, "data" + (2 * 0 + 2)); |
| final DataNode dn0 = cluster.getDataNodes().get(0); |
| final String oldDataDirs = dn0.getConf().get( |
| DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY); |
| |
| // Fail dn0Vol1 first. |
| DataNodeTestUtils.injectDataDirFailure(dn0Vol1); |
| checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1)); |
| |
| // Hot-swap out the failed volume. |
| String dataDirs = dn0Vol2.getPath(); |
| assertThat( |
| dn0.reconfigurePropertyImpl( |
| DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDirs), |
| is(dn0.getConf().get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY))); |
| |
| // Fix the failed volume dn0Vol1 and mount it back. |
| DataNodeTestUtils.restoreDataDirFromFailure(dn0Vol1); |
| assertThat( |
| dn0.reconfigurePropertyImpl( |
| DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, oldDataDirs), |
| is(dn0.getConf().get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY))); |
| |
| // Fail dn0Vol2. Now that dn0Vol1 has been fixed, DN0 has sufficient |
| // resources and should keep running. |
| DataNodeTestUtils.injectDataDirFailure(dn0Vol2); |
| checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol2)); |
| assertTrue(dn0.shouldRun()); |
| } |
| |
| /** |
| * Test that changing the number of volumes does not affect the disk |
| * failure tolerance. |
| */ |
| @Test |
| public void testTolerateVolumeFailuresAfterAddingMoreVolumes() |
| throws InterruptedException, ReconfigurationException, IOException { |
| if (Shell.WINDOWS) { |
| // The test uses DataNodeTestUtils#injectDataDirFailure() to simulate |
| // volume failures, which is currently not supported on Windows. |
| throw new AssumptionViolatedException("Expected Unix-like platform"); |
| } |
| |
| final File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1)); |
| final File dn0Vol2 = new File(dataDir, "data" + (2 * 0 + 2)); |
| final File dn0VolNew = new File(dataDir, "data_new"); |
| final DataNode dn0 = cluster.getDataNodes().get(0); |
| final String oldDataDirs = dn0.getConf().get( |
| DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY); |
| |
| // Add a new volume to DN0 |
| assertThat( |
| dn0.reconfigurePropertyImpl( |
| DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, |
| oldDataDirs + "," + dn0VolNew.getAbsolutePath()), |
| is(dn0.getConf().get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY))); |
| |
| // Fail dn0Vol1 first and hot swap it. |
| DataNodeTestUtils.injectDataDirFailure(dn0Vol1); |
| checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1)); |
| assertTrue(dn0.shouldRun()); |
| |
| // Fail dn0Vol2; dn0 should now stop, because only one disk failure is tolerated. |
| DataNodeTestUtils.injectDataDirFailure(dn0Vol2); |
| checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol2)); |
| assertFalse(dn0.shouldRun()); |
| } |
| |
| /** |
| * Test that there are under-replicated blocks after volume failures. |
| */ |
| @Test |
| public void testUnderReplicationAfterVolFailure() throws Exception { |
| // The test uses DataNodeTestUtils#injectDataDirFailure() to simulate |
| // volume failures, which is currently not supported on Windows. |
| assumeTrue(!Path.WINDOWS); |
| |
| // Bring up one more datanode |
| cluster.startDataNodes(conf, 1, true, null, null); |
| cluster.waitActive(); |
| |
| final BlockManager bm = cluster.getNamesystem().getBlockManager(); |
| |
| Path file1 = new Path("/test1"); |
| DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L); |
| DFSTestUtil.waitReplication(fs, file1, (short)3); |
| |
| // Fail the first volume on both datanodes |
| File dn1Vol1 = new File(dataDir, "data"+(2*0+1)); |
| File dn2Vol1 = new File(dataDir, "data"+(2*1+1)); |
| DataNodeTestUtils.injectDataDirFailure(dn1Vol1, dn2Vol1); |
| |
| Path file2 = new Path("/test2"); |
| DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L); |
| DFSTestUtil.waitReplication(fs, file2, (short)3); |
| |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| @Override |
| public Boolean get() { |
| // underReplicatedBlocks are due to failed volumes |
| int underReplicatedBlocks = BlockManagerTestUtil |
| .checkHeartbeatAndGetUnderReplicatedBlocksCount( |
| cluster.getNamesystem(), bm); |
| |
| if (underReplicatedBlocks > 0) { |
| return true; |
| } |
| LOG.info("There is no under replicated block after volume failure."); |
| |
| return false; |
| } |
| }, 500, 60000); |
| } |
| |
| /** |
| * Test that the DataNode fails to start when a volume has failed. |
| * |
| * We fail a volume by setting the parent directory non-writable. |
| */ |
| @Test (timeout = 120000) |
| public void testDataNodeFailToStartWithVolumeFailure() throws Exception { |
| // Method to simulate volume failures is currently not supported on Windows. |
| assumeTrue(!Path.WINDOWS); |
| |
| failedDir = new File(dataDir, "failedDir"); |
| assertTrue("Failed to fail a volume by setting it non-writable", |
| failedDir.mkdir() && failedDir.setReadOnly()); |
| |
| startNewDataNodeWithDiskFailure(new File(failedDir, "newDir1"), false); |
| } |
| |
| /** |
| * DataNode will start and tolerate one failing disk according to config. |
| * |
| * We fail a volume by setting the parent directory non-writable. |
| */ |
| @Test (timeout = 120000) |
| public void testDNStartAndTolerateOneVolumeFailure() throws Exception { |
| // Method to simulate volume failures is currently not supported on Windows. |
| assumeTrue(!Path.WINDOWS); |
| |
| failedDir = new File(dataDir, "failedDir"); |
| assertTrue("Failed to fail a volume by setting it non-writable", |
| failedDir.mkdir() && failedDir.setReadOnly()); |
| |
| startNewDataNodeWithDiskFailure(new File(failedDir, "newDir1"), true); |
| } |
| |
| /** |
| * Test that the DataNode won't start if a data directory is not |
| * readable/writable. |
| */ |
| @Test (timeout = 120000) |
| public void testDNFailToStartWithDataDirNonWritable() throws Exception { |
| // Method to simulate volume failures is currently not supported on Windows. |
| assumeTrue(!Path.WINDOWS); |
| |
| final File readOnlyDir = new File(dataDir, "nonWritable"); |
| assertTrue("Set the data dir permission non-writable", |
| readOnlyDir.mkdir() && readOnlyDir.setReadOnly()); |
| |
| startNewDataNodeWithDiskFailure(new File(readOnlyDir, "newDir1"), false); |
| } |
| |
| /** |
| * DataNode will start and tolerate one non-writable data directory |
| * according to config. |
| */ |
| @Test (timeout = 120000) |
| public void testDNStartAndTolerateOneDataDirNonWritable() throws Exception { |
| // Method to simulate volume failures is currently not supported on Windows. |
| assumeTrue(!Path.WINDOWS); |
| |
| final File readOnlyDir = new File(dataDir, "nonWritable"); |
| assertTrue("Set the data dir permission non-writable", |
| readOnlyDir.mkdir() && readOnlyDir.setReadOnly()); |
| startNewDataNodeWithDiskFailure(new File(readOnlyDir, "newDir1"), true); |
| } |
| |
| /** |
| * Start one new DataNode with one bad data directory and one healthy one. |
| * @param badDataDir bad data dir, either a failed disk or non-writable |
| * @param tolerated true if one volume failure should be tolerated |
| */ |
| private void startNewDataNodeWithDiskFailure(File badDataDir, |
| boolean tolerated) throws Exception { |
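| // The new DataNode gets the bad directory plus a fresh, healthy one (data5). |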
| final File data5 = new File(dataDir, "data5"); |
| final String newDirs = badDataDir.toString() + "," + data5.toString(); |
| final Configuration newConf = new Configuration(conf); |
| newConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs); |
| LOG.info("Setting dfs.datanode.data.dir for new DataNode as {}", newDirs); |
| newConf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, |
| tolerated ? 1 : 0); |
| |
| // bring up one more DataNode |
| assertEquals(repl, cluster.getDataNodes().size()); |
| |
| try { |
| cluster.startDataNodes(newConf, 1, false, null, null); |
| assertTrue("Failed to get expected IOException", tolerated); |
| } catch (IOException ioe) { |
| assertFalse("Unexpected IOException " + ioe, tolerated); |
| return; |
| } |
| |
| assertEquals(repl + 1, cluster.getDataNodes().size()); |
| |
| // create a new file; it should be able to replicate to 3 nodes |
| final Path p = new Path("/test1.txt"); |
| DFSTestUtil.createFile(fs, p, block_size * blocks_num, (short) 3, 1L); |
| DFSTestUtil.waitReplication(fs, p, (short) (repl + 1)); |
| } |
| |
| /** |
| * Verifies two things: |
| * 1. the number of locations of each block reported by the NameNode |
| * matches the number of actual block files |
| * 2. block files plus pending replications add up to the total number of |
| * blocks the file has, including replication (the HDFS file has 30 |
| * blocks, repl=2, so 60 in total) |
| * @param fn file name |
| * @param fs file size |
| * @throws IOException |
| */ |
| private void verify(String fn, int fs) throws IOException { |
| // now count how many physical block files there are |
| int totalReal = countRealBlocks(block_map); |
| System.out.println("countRealBlocks counted " + totalReal + " blocks"); |
| |
| // count how many blocks are stored in NN structures |
| int totalNN = countNNBlocks(block_map, fn, fs); |
| System.out.println("countNNBlocks counted " + totalNN + " blocks"); |
| |
| for(String bid : block_map.keySet()) { |
| BlockLocs bl = block_map.get(bid); |
| // System.out.println(bid + "->" + bl.num_files + "vs." + bl.num_locs); |
| // the number of physical files (1 or 2) should equal the number of |
| // datanodes in the block's list of locations |
| assertEquals("Num files should match num locations", |
| bl.num_files, bl.num_locs); |
| } |
| assertEquals("Num physical blocks should match num stored in the NN", |
| totalReal, totalNN); |
| |
| // now check the number of under-replicated blocks |
| FSNamesystem fsn = cluster.getNamesystem(); |
| // force update of all the metric counts by calling computeDatanodeWork |
| BlockManagerTestUtil.getComputedDatanodeWork(fsn.getBlockManager()); |
| // get all the counts |
| long underRepl = fsn.getUnderReplicatedBlocks(); |
| long pendRepl = fsn.getPendingReplicationBlocks(); |
| long totalRepl = underRepl + pendRepl; |
| System.out.println("underreplicated after = "+ underRepl + |
| " and pending repl =" + pendRepl + "; total underRepl = " + totalRepl); |
| |
| System.out.println("total blocks (real and replicating):" + |
| (totalReal + totalRepl) + " vs. all files blocks " + blocks_num*2); |
| |
| // real blocks plus blocks still being replicated should add up to the total block count |
| assertEquals("Incorrect total block count", |
| totalReal + totalRepl, blocks_num * repl); |
| } |
| |
| /** |
| * Access each block of the file on the 2nd DataNode until an access fails. |
| * @param path file path |
| * @param size file size |
| * @throws IOException |
| */ |
| private void triggerFailure(String path, long size) throws IOException { |
| NamenodeProtocols nn = cluster.getNameNodeRpc(); |
| List<LocatedBlock> locatedBlocks = |
| nn.getBlockLocations(path, 0, size).getLocatedBlocks(); |
| |
| for (LocatedBlock lb : locatedBlocks) { |
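| // access the replica at the block's second location (the "2nd DataNode") |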
| DatanodeInfo dinfo = lb.getLocations()[1]; |
| ExtendedBlock b = lb.getBlock(); |
| try { |
| accessBlock(dinfo, lb); |
| } catch (IOException e) { |
| System.out.println("Failure triggered, on block: " + b.getBlockId() + |
| "; corresponding volume should be removed by now"); |
| break; |
| } |
| } |
| } |
| |
| /** |
| * Simulate a volume failure by deleting all the block files under a |
| * directory. |
| * @param dir directory whose block files should be deleted |
| * @return true if all block files were deleted successfully |
| */ |
| private boolean deleteBlocks(File dir) { |
| Collection<File> fileList = FileUtils.listFiles(dir, |
| TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE); |
| for(File f : fileList) { |
| if(f.getName().startsWith(Block.BLOCK_FILE_PREFIX)) { |
| System.out.println("Deleting file " + f); |
| if (!f.delete()) { |
| return false; |
| } |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Try to access a block on a DataNode; throws an IOException on failure. |
| * @param datanode the DataNode holding a replica of the block |
| * @param lblock the block to read |
| * @throws IOException if the block cannot be read |
| */ |
| private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock) |
| throws IOException { |
| ExtendedBlock block = lblock.getBlock(); |
| InetSocketAddress targetAddr = |
| NetUtils.createSocketAddr(datanode.getXferAddr()); |
| |
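| // read the block directly from the target DataNode; an IOException here means the replica is not accessible |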
| BlockReader blockReader = new BlockReaderFactory(new DfsClientConf(conf)). |
| setInetSocketAddress(targetAddr). |
| setBlock(block). |
| setFileName(BlockReaderFactory.getFileName(targetAddr, |
| "test-blockpoolid", block.getBlockId())). |
| setBlockToken(lblock.getBlockToken()). |
| setStartOffset(0). |
| setLength(0). |
| setVerifyChecksum(true). |
| setClientName("TestDataNodeVolumeFailure"). |
| setDatanodeInfo(datanode). |
| setCachingStrategy(CachingStrategy.newDefaultStrategy()). |
| setClientCacheContext(ClientContext.getFromConf(conf)). |
| setConfiguration(conf). |
| setTracer(FsTracer.get(conf)). |
| setRemotePeerFactory(new RemotePeerFactory() { |
| @Override |
| public Peer newConnectedPeer(InetSocketAddress addr, |
| Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) |
| throws IOException { |
| Peer peer = null; |
| Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); |
| try { |
| sock.connect(addr, HdfsConstants.READ_TIMEOUT); |
| sock.setSoTimeout(HdfsConstants.READ_TIMEOUT); |
| peer = DFSUtilClient.peerFromSocket(sock); |
| } finally { |
| if (peer == null) { |
| IOUtils.closeSocket(sock); |
| } |
| } |
| return peer; |
| } |
| }). |
| build(); |
| blockReader.close(); |
| } |
| |
| /** |
| * Count the datanodes that hold copies of a file's blocks, according to |
| * the NameNode, and record the per-block location counts in the map. |
| * @param map block id to BlockLocs |
| * @param path file path |
| * @param size file size |
| * @return total number of block locations reported by the NameNode |
| * @throws IOException |
| */ |
| private int countNNBlocks(Map<String, BlockLocs> map, String path, long size) |
| throws IOException { |
| int total = 0; |
| |
| NamenodeProtocols nn = cluster.getNameNodeRpc(); |
| List<LocatedBlock> locatedBlocks = |
| nn.getBlockLocations(path, 0, size).getLocatedBlocks(); |
| //System.out.println("Number of blocks: " + locatedBlocks.size()); |
| |
| for(LocatedBlock lb : locatedBlocks) { |
| String blockId = ""+lb.getBlock().getBlockId(); |
| //System.out.print(blockId + ": "); |
| DatanodeInfo[] dn_locs = lb.getLocations(); |
| BlockLocs bl = map.get(blockId); |
| if(bl == null) { |
| bl = new BlockLocs(); |
| } |
| total += dn_locs.length; |
| bl.num_locs += dn_locs.length; |
| map.put(blockId, bl); |
| //System.out.println(); |
| } |
| return total; |
| } |
| |
| /** |
| * Look for real blocks by counting *.meta files in all the storage |
| * directories, recording per-block file counts in the map. |
| * @param map block id to BlockLocs |
| * @return total number of block meta files found |
| */ |
| private int countRealBlocks(Map<String, BlockLocs> map) { |
| int total = 0; |
| final String bpid = cluster.getNamesystem().getBlockPoolId(); |
| for(int i=0; i<dn_num; i++) { |
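| // each DataNode in the MiniDFSCluster has two storage directories |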
| for(int j=0; j<=1; j++) { |
| File storageDir = cluster.getInstanceStorageDir(i, j); |
| File dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid); |
| if(dir == null) { |
| System.out.println("dir is null for dn=" + i + " and data_dir=" + j); |
| continue; |
| } |
| |
| List<File> res = MiniDFSCluster.getAllBlockMetadataFiles(dir); |
| if(res == null) { |
| System.out.println("res is null for dir = " + dir + " i=" + i + " and j=" + j); |
| continue; |
| } |
| //System.out.println("for dn" + i + "." + j + ": " + dir + "=" + res.length+ " files"); |
| |
| //int ii = 0; |
| for(File f: res) { |
| String s = f.getName(); |
| // cut off "blk_-" at the beginning and ".meta" at the end |
| assertNotNull("Block file name should not be null", s); |
| String bid = s.substring(s.indexOf("_")+1, s.lastIndexOf("_")); |
| //System.out.println(ii++ + ". block " + s + "; id=" + bid); |
| BlockLocs val = map.get(bid); |
| if(val == null) { |
| val = new BlockLocs(); |
| } |
| val.num_files ++; // one more file for the block |
| map.put(bid, val); |
| |
| } |
| //System.out.println("dir1="+dir.getPath() + "blocks=" + res.length); |
| //System.out.println("dir2="+dir2.getPath() + "blocks=" + res2.length); |
| |
| total += res.size(); |
| } |
| } |
| return total; |
| } |
| } |