| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hdfs.server.datanode; |
| |
| import com.google.common.base.Joiner; |
| import com.google.common.collect.Lists; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.conf.ReconfigurationException; |
| import org.apache.hadoop.fs.BlockLocation; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.StorageType; |
| import org.apache.hadoop.hdfs.BlockMissingException; |
| import org.apache.hadoop.hdfs.DFSClient; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DFSTestUtil; |
| import org.apache.hadoop.hdfs.DistributedFileSystem; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.hdfs.MiniDFSNNTopology; |
| import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlock; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlocks; |
| import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; |
| import org.apache.hadoop.hdfs.server.common.Storage; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; |
| import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; |
| import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; |
| import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; |
| import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; |
| import org.apache.hadoop.io.MultipleIOException; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.util.Time; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Test; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.concurrent.BrokenBarrierException; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.CyclicBarrier; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.stream.Collectors; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.mockito.Mockito; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; |
| import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows; |
| import static org.hamcrest.CoreMatchers.anyOf; |
| import static org.hamcrest.CoreMatchers.containsString; |
| import static org.hamcrest.CoreMatchers.not; |
| import static org.hamcrest.core.Is.is; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertThat; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Matchers.anyString; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.timeout; |
| |
| public class TestDataNodeHotSwapVolumes { |
| private static final Log LOG = LogFactory.getLog( |
| TestDataNodeHotSwapVolumes.class); |
| private static final int BLOCK_SIZE = 512; |
| private static final int DEFAULT_STORAGES_PER_DATANODE = 2; |
| private MiniDFSCluster cluster; |
| private Configuration conf; |
| |
  /** Shuts down any running mini cluster after each test case. */
  @After
  public void tearDown() {
    shutdown();
  }
| |
  /**
   * Starts a federated mini cluster with the default number of storage
   * directories per DataNode ({@link #DEFAULT_STORAGES_PER_DATANODE}).
   *
   * @param numNameNodes number of federated NameNodes to start.
   * @param numDataNodes number of DataNodes to start.
   */
  private void startDFSCluster(int numNameNodes, int numDataNodes)
      throws IOException {
    startDFSCluster(numNameNodes, numDataNodes, DEFAULT_STORAGES_PER_DATANODE);
  }
| |
| private void startDFSCluster(int numNameNodes, int numDataNodes, |
| int storagePerDataNode) throws IOException { |
| shutdown(); |
| conf = new Configuration(); |
| conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); |
| |
| /* |
| * Lower the DN heartbeat, DF rate, and recheck interval to one second |
| * so state about failures and datanode death propagates faster. |
| */ |
| conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); |
| conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000); |
| conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, |
| 1000); |
| /* Allow 1 volume failure */ |
| conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1); |
| conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, |
| 0, TimeUnit.MILLISECONDS); |
| |
| MiniDFSNNTopology nnTopology = |
| MiniDFSNNTopology.simpleFederatedTopology(numNameNodes); |
| |
| cluster = new MiniDFSCluster.Builder(conf) |
| .nnTopology(nnTopology) |
| .numDataNodes(numDataNodes) |
| .storagesPerDatanode(storagePerDataNode) |
| .build(); |
| cluster.waitActive(); |
| } |
| |
| private void shutdown() { |
| if (cluster != null) { |
| cluster.shutdown(); |
| cluster = null; |
| } |
| } |
| |
| private void createFile(Path path, int numBlocks) |
| throws IOException, InterruptedException, TimeoutException { |
| final short replicateFactor = 1; |
| createFile(path, numBlocks, replicateFactor); |
| } |
| |
| private void createFile(Path path, int numBlocks, short replicateFactor) |
| throws IOException, InterruptedException, TimeoutException { |
| createFile(0, path, numBlocks, replicateFactor); |
| } |
| |
| private void createFile(int fsIdx, Path path, int numBlocks) |
| throws IOException, InterruptedException, TimeoutException { |
| final short replicateFactor = 1; |
| createFile(fsIdx, path, numBlocks, replicateFactor); |
| } |
| |
| private void createFile(int fsIdx, Path path, int numBlocks, |
| short replicateFactor) |
| throws IOException, TimeoutException, InterruptedException { |
| final int seed = 0; |
| final DistributedFileSystem fs = cluster.getFileSystem(fsIdx); |
| DFSTestUtil.createFile(fs, path, BLOCK_SIZE * numBlocks, |
| replicateFactor, seed); |
| DFSTestUtil.waitReplication(fs, path, replicateFactor); |
| } |
| |
| /** |
| * Verify whether a file has enough content. |
| */ |
| private static void verifyFileLength(FileSystem fs, Path path, int numBlocks) |
| throws IOException { |
| FileStatus status = fs.getFileStatus(path); |
| assertEquals(numBlocks * BLOCK_SIZE, status.getLen()); |
| } |
| |
| /** Return the number of replicas for a given block in the file. */ |
| private static int getNumReplicas(FileSystem fs, Path file, |
| int blockIdx) throws IOException { |
| BlockLocation locs[] = fs.getFileBlockLocations(file, 0, Long.MAX_VALUE); |
| return locs.length < blockIdx + 1 ? 0 : locs[blockIdx].getNames().length; |
| } |
| |
| /** |
| * Wait the block to have the exact number of replicas as expected. |
| */ |
| private static void waitReplication(FileSystem fs, Path file, int blockIdx, |
| int numReplicas) |
| throws IOException, TimeoutException, InterruptedException { |
| int attempts = 50; // Wait 5 seconds. |
| while (attempts > 0) { |
| int actualReplicas = getNumReplicas(fs, file, blockIdx); |
| if (actualReplicas == numReplicas) { |
| return; |
| } |
| System.out.printf("Block %d of file %s has %d replicas (desired %d).\n", |
| blockIdx, file.toString(), actualReplicas, numReplicas); |
| Thread.sleep(100); |
| attempts--; |
| } |
| throw new TimeoutException("Timed out waiting the " + blockIdx + "-th block" |
| + " of " + file + " to have " + numReplicas + " replicas."); |
| } |
| |
| /** Parses data dirs from DataNode's configuration. */ |
| private static List<String> getDataDirs(DataNode datanode) { |
| return new ArrayList<String>(datanode.getConf().getTrimmedStringCollection( |
| DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY)); |
| } |
| |
  /** Force the DataNode to report missing blocks immediately. */
  private static void triggerDeleteReport(DataNode datanode)
      throws IOException {
    // Schedule a full block report without delay, then flush any pending
    // incremental deletion reports so the NameNode learns about removed
    // replicas right away instead of at the next heartbeat.
    datanode.scheduleAllBlockReport(0);
    DataNodeTestUtils.triggerDeletionReport(datanode);
  }
| |
  /**
   * Verifies that DataNode#parseChangedVolumes correctly classifies a new
   * data-dir string into added, deactivated and unchanged locations.
   */
  @Test
  public void testParseChangedVolumes() throws IOException {
    startDFSCluster(1, 1);
    DataNode dn = cluster.getDataNodes().get(0);
    Configuration conf = dn.getConf();

    // Parse the DataNode's currently configured data directories.
    String oldPaths = conf.get(DFS_DATANODE_DATA_DIR_KEY);
    List<StorageLocation> oldLocations = new ArrayList<StorageLocation>();
    for (String path : oldPaths.split(",")) {
      oldLocations.add(StorageLocation.parse(path));
    }
    assertFalse(oldLocations.isEmpty());

    // Keep only the first existing volume and add two brand-new paths.
    String newPaths = new File(oldLocations.get(0).getUri()).getAbsolutePath() +
        ",/foo/path1,/foo/path2";

    DataNode.ChangedVolumes changedVolumes =
        dn.parseChangedVolumes(newPaths);
    // The two /foo paths must be classified as additions.
    List<StorageLocation> newVolumes = changedVolumes.newLocations;
    assertEquals(2, newVolumes.size());
    assertEquals(new File("/foo/path1").getAbsolutePath(),
        new File(newVolumes.get(0).getUri()).getAbsolutePath());
    assertEquals(new File("/foo/path2").getAbsolutePath(),
        new File(newVolumes.get(1).getUri()).getAbsolutePath());

    // The dropped second original volume must be marked for deactivation.
    List<StorageLocation> removedVolumes = changedVolumes.deactivateLocations;
    assertEquals(1, removedVolumes.size());
    assertEquals(oldLocations.get(1).getNormalizedUri(),
        removedVolumes.get(0).getNormalizedUri());

    // The retained first volume must be reported as unchanged.
    assertEquals(1, changedVolumes.unchangedLocations.size());
    assertEquals(oldLocations.get(0).getNormalizedUri(),
        changedVolumes.unchangedLocations.get(0).getNormalizedUri());
  }
| |
  /** An empty data-dir string must be rejected with an IOException. */
  @Test
  public void testParseChangedVolumesFailures() throws IOException {
    startDFSCluster(1, 1);
    DataNode dn = cluster.getDataNodes().get(0);
    try {
      dn.parseChangedVolumes("");
      fail("Should throw IOException: empty inputs.");
    } catch (IOException e) {
      GenericTestUtils.assertExceptionContains("No directory is specified.", e);
    }
  }
| |
  /**
   * Reconfiguring an existing volume with a different storage type must be
   * rejected by parseChangedVolumes.
   */
  @Test
  public void testParseStorageTypeChanges() throws IOException {
    startDFSCluster(1, 1);
    DataNode dn = cluster.getDataNodes().get(0);
    Configuration conf = dn.getConf();
    List<StorageLocation> oldLocations = DataNode.getStorageLocations(conf);

    // Change storage type of an existing StorageLocation
    String newLoc = String.format("[%s]%s", StorageType.SSD,
        oldLocations.get(1).getUri());
    String newDataDirs = oldLocations.get(0).toString() + "," + newLoc;

    try {
      dn.parseChangedVolumes(newDataDirs);
      fail("should throw IOE because storage type changes.");
    } catch (IOException e) {
      GenericTestUtils.assertExceptionContains(
          "Changing storage type is not allowed", e);
    }
  }
| |
  /** Add volumes to the first DataNode. */
  private void addVolumes(int numNewVolumes)
      throws InterruptedException, IOException, ReconfigurationException {
    // A latch with an initial count of zero never blocks, so verification
    // in the two-argument overload proceeds immediately.
    addVolumes(numNewVolumes, new CountDownLatch(0));
  }
| |
  /**
   * Adds {@code numNewVolumes} fresh storage directories to the first
   * DataNode through reconfiguration, then verifies the effective
   * configuration and that each new directory was formatted.
   *
   * @param numNewVolumes number of new volumes to add.
   * @param waitLatch awaited after reconfiguration returns; callers that
   *     mock addVolume to run asynchronously use this to delay verification
   *     until all add operations have completed.
   */
  private void addVolumes(int numNewVolumes, CountDownLatch waitLatch)
      throws ReconfigurationException, IOException, InterruptedException {
    DataNode dn = cluster.getDataNodes().get(0); // First DataNode.
    Configuration conf = dn.getConf();
    String oldDataDir = conf.get(DFS_DATANODE_DATA_DIR_KEY);

    List<File> newVolumeDirs = new ArrayList<File>();
    StringBuilder newDataDirBuf = new StringBuilder(oldDataDir);
    // Start searching just past the indices the existing dirs occupy.
    int startIdx = oldDataDir.split(",").length + 1;
    // Find the first available (non-taken) directory name for data volume.
    while (true) {
      File volumeDir = cluster.getInstanceStorageDir(0, startIdx);
      if (!volumeDir.exists()) {
        break;
      }
      startIdx++;
    }
    // Create the new volume directories on disk and append each one to the
    // comma-separated data-dir configuration value.
    for (int i = startIdx; i < startIdx + numNewVolumes; i++) {
      File volumeDir = cluster.getInstanceStorageDir(0, i);
      newVolumeDirs.add(volumeDir);
      volumeDir.mkdirs();
      newDataDirBuf.append(",");
      newDataDirBuf.append(
          StorageLocation.parse(volumeDir.toString()).toString());
    }

    String newDataDir = newDataDirBuf.toString();
    // reconfigurePropertyImpl returns the effective value; it must match
    // what the DataNode's own conf now reports.
    assertThat(
        "DN did not update its own config",
        dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDataDir),
        is(conf.get(DFS_DATANODE_DATA_DIR_KEY)));

    // Await on the latch for needed operations to complete
    waitLatch.await();

    // Verify the configuration value is appropriately set.
    String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(",");
    String[] expectDataDirs = newDataDir.split(",");
    assertEquals(expectDataDirs.length, effectiveDataDirs.length);
    List<StorageLocation> expectedStorageLocations = new ArrayList<>();
    List<StorageLocation> effectiveStorageLocations = new ArrayList<>();
    for (int i = 0; i < expectDataDirs.length; i++) {
      StorageLocation expectLocation = StorageLocation.parse(expectDataDirs[i]);
      StorageLocation effectiveLocation = StorageLocation
          .parse(effectiveDataDirs[i]);
      expectedStorageLocations.add(expectLocation);
      effectiveStorageLocations.add(effectiveLocation);
    }
    // Compare order-insensitively: sort both lists by their string form
    // before asserting equality.
    Comparator<StorageLocation> comparator = new Comparator<StorageLocation>() {

      @Override
      public int compare(StorageLocation o1, StorageLocation o2) {
        return o1.toString().compareTo(o2.toString());
      }

    };
    Collections.sort(expectedStorageLocations, comparator);
    Collections.sort(effectiveStorageLocations, comparator);
    assertEquals("Effective volumes doesnt match expected",
        expectedStorageLocations, effectiveStorageLocations);

    // Check that all newly created volumes are appropriately formatted.
    for (File volumeDir : newVolumeDirs) {
      File curDir = new File(volumeDir, "current");
      assertTrue(curDir.exists());
      assertTrue(curDir.isDirectory());
    }
  }
| |
| private List<List<Integer>> getNumBlocksReport(int namesystemIdx) { |
| List<List<Integer>> results = new ArrayList<List<Integer>>(); |
| final String bpid = cluster.getNamesystem(namesystemIdx).getBlockPoolId(); |
| List<Map<DatanodeStorage, BlockListAsLongs>> blockReports = |
| cluster.getAllBlockReports(bpid); |
| for (Map<DatanodeStorage, BlockListAsLongs> datanodeReport : blockReports) { |
| List<Integer> numBlocksPerDN = new ArrayList<Integer>(); |
| for (BlockListAsLongs blocks : datanodeReport.values()) { |
| numBlocksPerDN.add(blocks.getNumberOfBlocks()); |
| } |
| results.add(numBlocksPerDN); |
| } |
| return results; |
| } |
| |
| /** |
| * Test adding one volume on a running MiniDFSCluster with only one NameNode. |
| */ |
| @Test(timeout=60000) |
| public void testAddOneNewVolume() |
| throws IOException, ReconfigurationException, |
| InterruptedException, TimeoutException { |
| startDFSCluster(1, 1); |
| String bpid = cluster.getNamesystem().getBlockPoolId(); |
| final int numBlocks = 10; |
| |
| addVolumes(1); |
| |
| Path testFile = new Path("/test"); |
| createFile(testFile, numBlocks); |
| |
| List<Map<DatanodeStorage, BlockListAsLongs>> blockReports = |
| cluster.getAllBlockReports(bpid); |
| assertEquals(1, blockReports.size()); // 1 DataNode |
| assertEquals(3, blockReports.get(0).size()); // 3 volumes |
| |
| // FSVolumeList uses Round-Robin block chooser by default. Thus the new |
| // blocks should be evenly located in all volumes. |
| int minNumBlocks = Integer.MAX_VALUE; |
| int maxNumBlocks = Integer.MIN_VALUE; |
| for (BlockListAsLongs blockList : blockReports.get(0).values()) { |
| minNumBlocks = Math.min(minNumBlocks, blockList.getNumberOfBlocks()); |
| maxNumBlocks = Math.max(maxNumBlocks, blockList.getNumberOfBlocks()); |
| } |
| assertTrue(Math.abs(maxNumBlocks - maxNumBlocks) <= 1); |
| verifyFileLength(cluster.getFileSystem(), testFile, numBlocks); |
| } |
| |
  /**
   * Adds five volumes while a file already exists, then appends to the file
   * and verifies the appended blocks landed on the new volumes (one each)
   * while the original volumes kept their counts.
   */
  @Test(timeout=60000)
  public void testAddVolumesDuringWrite()
      throws IOException, InterruptedException, TimeoutException,
      ReconfigurationException {
    startDFSCluster(1, 1);
    int numVolumes = cluster.getStoragesPerDatanode();
    String bpid = cluster.getNamesystem().getBlockPoolId();
    Path testFile = new Path("/test");

    // Each volume has 2 blocks
    int initialBlockCount = numVolumes * 2;
    createFile(testFile, initialBlockCount);

    // Hot-swap in five additional volumes while the cluster is live.
    int newVolumeCount = 5;
    addVolumes(newVolumeCount);
    numVolumes += newVolumeCount;

    int additionalBlockCount = 9;
    int totalBlockCount = initialBlockCount + additionalBlockCount;

    // Continue to write the same file, thus the new volumes will have blocks.
    DFSTestUtil.appendFile(cluster.getFileSystem(), testFile,
        BLOCK_SIZE * additionalBlockCount);
    verifyFileLength(cluster.getFileSystem(), testFile, totalBlockCount);

    // After appending data, each new volume added should
    // have 1 block each.
    List<Integer> expectedNumBlocks = Arrays.asList(1, 1, 1, 1, 1, 4, 4);

    // Collect per-storage block counts from the single DataNode and compare
    // them order-insensitively with the expected distribution.
    List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
        cluster.getAllBlockReports(bpid);
    assertEquals(1, blockReports.size()); // 1 DataNode
    assertEquals(numVolumes, blockReports.get(0).size()); // 7 volumes
    Map<DatanodeStorage, BlockListAsLongs> dnReport =
        blockReports.get(0);
    List<Integer> actualNumBlocks = new ArrayList<Integer>();
    for (BlockListAsLongs blockList : dnReport.values()) {
      actualNumBlocks.add(blockList.getNumberOfBlocks());
    }
    Collections.sort(actualNumBlocks);
    assertEquals(expectedNumBlocks, actualNumBlocks);
  }
| |
  /**
   * Stress test: adds 40 volumes whose addVolume() calls are made
   * asynchronous via a Mockito spy (with random start delays), while another
   * thread repeatedly lists the DataNode's storage directories. Verifies no
   * errors occurred in either activity and that all volumes end up usable.
   */
  @Test(timeout=180000)
  public void testAddVolumesConcurrently()
      throws IOException, InterruptedException, TimeoutException,
      ReconfigurationException {
    startDFSCluster(1, 1, 10);
    int numVolumes = cluster.getStoragesPerDatanode();
    String blockPoolId = cluster.getNamesystem().getBlockPoolId();
    Path testFile = new Path("/test");

    // Each volume has 2 blocks
    int initialBlockCount = numVolumes * 2;
    createFile(testFile, initialBlockCount);

    DataNode dn = cluster.getDataNodes().get(0);
    // Spy the dataset so addVolume() can be intercepted below.
    final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
    dn.data = Mockito.spy(data);

    final int newVolumeCount = 40;
    List<Thread> addVolumeDelayedThreads = new ArrayList<>();
    AtomicBoolean addVolumeError = new AtomicBoolean(false);
    AtomicBoolean listStorageError = new AtomicBoolean(false);
    CountDownLatch addVolumeCompletionLatch =
        new CountDownLatch(newVolumeCount);

    // Thread to list all storage available at DataNode,
    // when the volumes are being added in parallel.
    // NOTE(review): this outer loop only runs while getCount() differs from
    // newVolumeCount, so it exits immediately unless a countDown() has
    // already happened when the check is made. Confirm the condition was not
    // intended to be getCount() != 0, which would keep listing for the whole
    // add-volume window.
    final Thread listStorageThread = new Thread(new Runnable() {
      @Override
      public void run() {
        while (addVolumeCompletionLatch.getCount() != newVolumeCount) {
          int i = 0;
          while(i++ < 1000) {
            try {
              dn.getStorage().listStorageDirectories();
            } catch (Exception e) {
              listStorageError.set(true);
              LOG.error("Error listing storage: " + e);
            }
          }
        }
      }
    });
    listStorageThread.start();

    // FsDatasetImpl addVolume mocked to perform the operation asynchronously
    doAnswer(new Answer<Object>() {
      @Override
      public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
        final Random r = new Random();
        Thread addVolThread =
            new Thread(new Runnable() {
              @Override
              public void run() {
                try {
                  r.setSeed(Time.now());
                  // Let 50% of add volume operations
                  // start after an initial delay.
                  if (r.nextInt(10) > 4) {
                    int s = r.nextInt(10) + 1;
                    Thread.sleep(s * 100);
                  }
                  invocationOnMock.callRealMethod();
                } catch (Throwable throwable) {
                  addVolumeError.set(true);
                  LOG.error("Error adding volume: " + throwable);
                } finally {
                  // Always count down so the waiting latch cannot hang even
                  // when the real addVolume fails.
                  addVolumeCompletionLatch.countDown();
                }
              }
            });
        addVolumeDelayedThreads.add(addVolThread);
        addVolThread.start();
        return null;
      }
    }).when(dn.data).addVolume(any(StorageLocation.class), any(List.class));

    addVolumes(newVolumeCount, addVolumeCompletionLatch);
    numVolumes += newVolumeCount;

    // Wait for all addVolume and listStorage Threads to complete
    for (Thread t : addVolumeDelayedThreads) {
      t.join();
    }
    listStorageThread.join();

    // Verify errors while adding volumes and listing storage directories
    Assert.assertEquals("Error adding volumes!", false, addVolumeError.get());
    Assert.assertEquals("Error listing storage!",
        false, listStorageError.get());

    int additionalBlockCount = 9;
    int totalBlockCount = initialBlockCount + additionalBlockCount;

    // Continue to write the same file, thus the new volumes will have blocks.
    DFSTestUtil.appendFile(cluster.getFileSystem(), testFile,
        BLOCK_SIZE * additionalBlockCount);
    verifyFileLength(cluster.getFileSystem(), testFile, totalBlockCount);

    List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
        cluster.getAllBlockReports(blockPoolId);
    assertEquals(1, blockReports.size());
    assertEquals(numVolumes, blockReports.get(0).size());
  }
| |
  /**
   * Adds volumes on a federated cluster and checks both namespaces see them:
   * the first namespace (which receives an append) uses the new volumes,
   * while the second namespace sees them as empty storages.
   */
  @Test(timeout=60000)
  public void testAddVolumesToFederationNN()
      throws IOException, TimeoutException, InterruptedException,
      ReconfigurationException {
    // Starts a cluster with 2 federated NameNodes and 1 DataNode that has
    // 2 volumes (the defaults from startDFSCluster).
    final int numNameNodes = 2;
    final int numDataNodes = 1;
    startDFSCluster(numNameNodes, numDataNodes);
    Path testFile = new Path("/test");
    // Create a file on the first namespace with 4 blocks.
    createFile(0, testFile, 4);
    // Create a file on the second namespace with 4 blocks.
    createFile(1, testFile, 4);

    // Add 2 volumes to the first DataNode.
    final int numNewVolumes = 2;
    addVolumes(numNewVolumes);

    // Append to the file on the first namespace.
    DFSTestUtil.appendFile(cluster.getFileSystem(0), testFile, BLOCK_SIZE * 8);

    List<List<Integer>> actualNumBlocks = getNumBlocksReport(0);
    assertEquals(cluster.getDataNodes().size(), actualNumBlocks.size());
    List<Integer> blocksOnFirstDN = actualNumBlocks.get(0);
    Collections.sort(blocksOnFirstDN);
    assertEquals(Arrays.asList(2, 2, 4, 4), blocksOnFirstDN);

    // Verify the second namespace also has the new volumes and they are empty.
    actualNumBlocks = getNumBlocksReport(1);
    assertEquals(4, actualNumBlocks.get(0).size());
    assertEquals(numNewVolumes,
        Collections.frequency(actualNumBlocks.get(0), 0));
  }
| |
  /**
   * Removes one volume from the DataNode and checks that blocks on the
   * removed volume become unreadable (replication 1), that the removed
   * volume's lock file is released, and that new writes still work on the
   * remaining volume.
   */
  @Test(timeout=60000)
  public void testRemoveOneVolume()
      throws ReconfigurationException, InterruptedException, TimeoutException,
      IOException {
    startDFSCluster(1, 1);
    final short replFactor = 1;
    Path testFile = new Path("/test");
    createFile(testFile, 10, replFactor);

    DataNode dn = cluster.getDataNodes().get(0);
    Collection<String> oldDirs = getDataDirs(dn);
    String newDirs = oldDirs.iterator().next(); // Keep the first volume.
    assertThat(
        "DN did not update its own config",
        dn.reconfigurePropertyImpl(
            DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs),
        is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
    // Every removed volume's storage lock must have been released.
    assertFileLocksReleased(
        new ArrayList<String>(oldDirs).subList(1, oldDirs.size()));
    dn.scheduleAllBlockReport(0);

    // With replication 1, blocks that lived on the removed volume are gone,
    // so reading the file must fail.
    try {
      DFSTestUtil.readFile(cluster.getFileSystem(), testFile);
      fail("Expect to throw BlockMissingException.");
    } catch (BlockMissingException e) {
      GenericTestUtils.assertExceptionContains("Could not obtain block", e);
    }

    // New writes must still succeed on the remaining volume.
    Path newFile = new Path("/newFile");
    createFile(newFile, 6);

    String bpid = cluster.getNamesystem().getBlockPoolId();
    List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
        cluster.getAllBlockReports(bpid);
    assertEquals((int)replFactor, blockReports.size());

    BlockListAsLongs blocksForVolume1 =
        blockReports.get(0).values().iterator().next();
    // The first volume has half of the testFile and full of newFile.
    assertEquals(10 / 2 + 6, blocksForVolume1.getNumberOfBlocks());
  }
| |
  /**
   * Removes the volume that holds a replica of the test file's second block
   * and verifies the NameNode re-replicates it back to full replication.
   */
  @Test(timeout=60000)
  public void testReplicatingAfterRemoveVolume()
      throws InterruptedException, TimeoutException, IOException,
      ReconfigurationException {
    startDFSCluster(1, 2);

    final FileSystem fs = cluster.getFileSystem();
    final short replFactor = 2;
    Path testFile = new Path("/test");
    createFile(testFile, 4, replFactor);

    DataNode dn = cluster.getDataNodes().get(0);
    Collection<String> oldDirs = getDataDirs(dn);
    // Findout the storage with block and remove it
    ExtendedBlock block =
        DFSTestUtil.getAllBlocks(fs, testFile).get(1).getBlock();
    FsVolumeSpi volumeWithBlock = dn.getFSDataset().getVolume(block);
    String dirWithBlock = "[" + volumeWithBlock.getStorageType() + "]" +
        volumeWithBlock.getStorageLocation().getUri();
    String newDirs = dirWithBlock;
    // Keep any other configured directory, so that the volume holding the
    // block is the one removed by the reconfiguration below.
    for (String dir : oldDirs) {
      if (dirWithBlock.startsWith(dir)) {
        continue;
      }
      newDirs = dir;
      break;
    }
    assertThat(
        "DN did not update its own config",
        dn.reconfigurePropertyImpl(
            DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs),
        is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
    oldDirs.remove(newDirs);
    assertFileLocksReleased(oldDirs);

    triggerDeleteReport(dn);

    // Replication first drops to 1 for the affected block, then the
    // NameNode restores it to the requested factor.
    waitReplication(fs, testFile, 1, 1);
    DFSTestUtil.waitReplication(fs, testFile, replFactor);
  }
| |
  /**
   * Adds four new volumes where two (new_vol0, new_vol2) are pre-created as
   * plain files so addVolume() fails for them, then verifies the failed
   * volumes leave no trace in the dataset, the DataStorage, or the effective
   * configuration.
   */
  @Test
  public void testAddVolumeFailures() throws IOException {
    startDFSCluster(1, 1);
    final String dataDir = cluster.getDataDirectory();

    DataNode dn = cluster.getDataNodes().get(0);
    List<String> newDirs = Lists.newArrayList();
    final int NUM_NEW_DIRS = 4;
    for (int i = 0; i < NUM_NEW_DIRS; i++) {
      File newVolume = new File(dataDir, "new_vol" + i);
      newDirs.add(newVolume.toString());
      if (i % 2 == 0) {
        // Make addVolume() fail.
        newVolume.createNewFile();
      }
    }

    String newValue = dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY) + "," +
        Joiner.on(",").join(newDirs);
    try {
      dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newValue);
      fail("Expect to throw IOException.");
    } catch (ReconfigurationException e) {
      // The cause message carries one line per failed volume.
      String errorMessage = e.getCause().getMessage();
      String messages[] = errorMessage.split("\\r?\\n");
      assertEquals(2, messages.length);
      assertThat(messages[0], containsString("new_vol0"));
      assertThat(messages[1], containsString("new_vol2"));
    }

    // Make sure that vol0 and vol2's metadata are not left in memory.
    FsDatasetSpi<?> dataset = dn.getFSDataset();
    try (FsDatasetSpi.FsVolumeReferences volumes =
        dataset.getFsVolumeReferences()) {
      for (FsVolumeSpi volume : volumes) {
        assertThat(new File(volume.getStorageLocation().getUri()).toString(),
            is(not(anyOf(is(newDirs.get(0)), is(newDirs.get(2))))));
      }
    }
    DataStorage storage = dn.getStorage();
    for (int i = 0; i < storage.getNumStorageDirs(); i++) {
      Storage.StorageDirectory sd = storage.getStorageDir(i);
      assertThat(sd.getRoot().toString(),
          is(not(anyOf(is(newDirs.get(0)), is(newDirs.get(2))))));
    }

    // The newly effective conf does not have vol0 and vol2.
    String[] effectiveVolumes =
        dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY).split(",");
    assertEquals(4, effectiveVolumes.length);
    for (String ev : effectiveVolumes) {
      assertThat(
          new File(StorageLocation.parse(ev).getUri()).getCanonicalPath(),
          is(not(anyOf(is(newDirs.get(0)), is(newDirs.get(2)))))
      );
    }
  }
| |
| /** |
| * Asserts that the storage lock file in each given directory has been |
| * released. This method works by trying to acquire the lock file itself. If |
| * locking fails here, then the main code must have failed to release it. |
| * |
| * @param dirs every storage directory to check |
| * @throws IOException if there is an unexpected I/O error |
| */ |
| private static void assertFileLocksReleased(Collection<String> dirs) |
| throws IOException { |
| for (String dir: dirs) { |
| try { |
| FsDatasetTestUtil.assertFileLockReleased(dir); |
| } catch (IOException e) { |
| LOG.warn(e); |
| } |
| } |
| } |
| |
| @Test(timeout=600000) |
| public void testRemoveVolumeBeingWritten() |
| throws InterruptedException, TimeoutException, ReconfigurationException, |
| IOException, BrokenBarrierException { |
| // test against removing volumes on the different DataNode on the pipeline. |
| for (int i = 0; i < 3; i++) { |
| testRemoveVolumeBeingWrittenForDatanode(i); |
| } |
| } |
| |
| /** |
| * Test the case that remove a data volume on a particular DataNode when the |
| * volume is actively being written. |
| * @param dataNodeIdx the index of the DataNode to remove a volume. |
| */ |
  /**
   * Hot-swaps away the volume that holds an in-flight block on one DataNode
   * of a 3-node write pipeline, while a client write is paused mid-block,
   * then verifies that the reconfiguration succeeds, the write completes,
   * the volume count drops to one, the file is fully replicated/readable,
   * and the DataNode keeps accepting new data afterwards.
   *
   * Coordination: a fault injector blocks the ack path of each pipeline
   * DataNode on a shared {@code CyclicBarrier(4)} (presumably 3 pipeline ack
   * threads + the reconfiguration thread — confirm against the injector's
   * call sites), so the volume removal is guaranteed to race with active
   * writers holding volume references.
   *
   * @param dataNodeIdx index into the block's replica locations selecting
   *                    which pipeline DataNode has its volume removed
   */
  private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx)
      throws IOException, ReconfigurationException, TimeoutException,
      InterruptedException, BrokenBarrierException {
    startDFSCluster(1, 4);

    final short REPLICATION = 3;
    final DistributedFileSystem fs = cluster.getFileSystem();
    final DFSClient client = fs.getClient();
    final Path testFile = new Path("/test");
    FSDataOutputStream out = fs.create(testFile, REPLICATION);

    // Write (and later flush) half a block so the block is open and
    // mid-write when the volume removal starts.
    Random rb = new Random(0);
    byte[] writeBuf = new byte[BLOCK_SIZE / 2];  // half of the block.
    rb.nextBytes(writeBuf);
    out.write(writeBuf);
    out.hflush();

    // Resolve the DataNode chosen by dataNodeIdx from the block's replica
    // locations; the location name is "host:xferPort".
    BlockLocation[] blocks = fs.getFileBlockLocations(testFile, 0, BLOCK_SIZE);
    String[] dataNodeNames = blocks[0].getNames();
    String dataNodeName = dataNodeNames[dataNodeIdx];
    int xferPort = Integer.parseInt(dataNodeName.split(":")[1]);
    DataNode dn = null;
    for (DataNode dataNode : cluster.getDataNodes()) {
      if (dataNode.getXferPort() == xferPort) {
        dn = dataNode;
        break;
      }
    }
    assertNotNull(dn);

    // Barrier shared by the pipeline ack threads and the reconfig thread;
    // 'done' releases the injector once reconfiguration has finished so
    // later writes are not delayed.
    final CyclicBarrier barrier = new CyclicBarrier(4);
    final AtomicBoolean done = new AtomicBoolean(false);
    DataNodeFaultInjector newInjector = new DataNodeFaultInjector() {
      public void logDelaySendingAckToUpstream(
          final String upstreamAddr, final long delayMs) throws IOException {
        try {
          // Make all streams which hold the volume references to wait the
          // reconfiguration thread to start.
          // It should only block IO during the period of reconfiguration
          // task running.
          if (!done.get()) {
            barrier.await();
            // Add delays to allow the reconfiguration thread starts before
            // IO finish.
            Thread.sleep(1000);
          }
        } catch (InterruptedException | BrokenBarrierException e) {
          throw new IOException(e);
        }
      }
    };
    // Keep the previous injector so it can be restored in the finally block.
    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();

    try {
      DataNodeFaultInjector.set(newInjector);

      // Compute the new data-dir list: every current dir except the one
      // hosting the open block's volume — that volume gets removed.
      List<String> oldDirs = getDataDirs(dn);
      LocatedBlocks lbs = client.getLocatedBlocks("/test", 0);
      LocatedBlock block = lbs.get(0);
      FsVolumeImpl volume =
          (FsVolumeImpl) dn.getFSDataset().getVolume(block.getBlock());
      final String newDirs = oldDirs.stream()
          .filter((d) -> !d.contains(volume.getStorageLocation().toString()))
          .collect(Collectors.joining(","));
      // Exceptions thrown on the reconfig thread are collected and rethrown
      // on the test thread after join().
      final List<IOException> exceptions = new ArrayList<>();
      final DataNode dataNode = dn;
      // Second barrier: makes the reconfig thread wait until the test thread
      // has issued the second write, so writers are in flight when the
      // reconfiguration runs.
      final CyclicBarrier reconfigBarrier = new CyclicBarrier(2);

      Thread reconfigThread = new Thread(() -> {
        try {
          reconfigBarrier.await();

          // Wake up writing threads on the pipeline to finish the block.
          barrier.await();

          assertThat(
              "DN did not update its own config",
              dataNode.reconfigurePropertyImpl(
                  DFS_DATANODE_DATA_DIR_KEY, newDirs),
              is(dataNode.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
          done.set(true);
        } catch (ReconfigurationException |
            InterruptedException |
            BrokenBarrierException e) {
          exceptions.add(new IOException(e));
        }
      });
      reconfigThread.start();

      // Write more data to make sure the stream threads wait on the barrier.
      rb.nextBytes(writeBuf);
      out.write(writeBuf);
      reconfigBarrier.await();
      out.hflush();
      out.close();

      reconfigThread.join();

      if (!exceptions.isEmpty()) {
        throw MultipleIOException.createIOException(exceptions);
      }
    } finally {
      // Always restore the original injector so other tests are unaffected.
      DataNodeFaultInjector.set(oldInjector);
    }

    // Verify if the data directory reconfigure was successful
    FsDatasetSpi<? extends FsVolumeSpi> fsDatasetSpi = dn.getFSDataset();
    try (FsDatasetSpi.FsVolumeReferences fsVolumeReferences = fsDatasetSpi
        .getFsVolumeReferences()) {
      for (int i =0; i < fsVolumeReferences.size(); i++) {
        System.out.println("Vol: " +
            fsVolumeReferences.get(i).getBaseURI().toString());
      }
      assertEquals("Volume remove wasn't successful.",
          1, fsVolumeReferences.size());
    }

    // Verify the file has sufficient replications.
    DFSTestUtil.waitReplication(fs, testFile, REPLICATION);
    // Read the content back
    byte[] content = DFSTestUtil.readFileBuffer(fs, testFile);
    assertEquals(BLOCK_SIZE, content.length);

    // Write more files to make sure that the DataNode that has removed volume
    // is still alive to receive data.
    for (int i = 0; i < 10; i++) {
      final Path file = new Path("/after-" + i);
      try (FSDataOutputStream fout = fs.create(file, REPLICATION)) {
        rb.nextBytes(writeBuf);
        fout.write(writeBuf);
      }
    }

    // The surviving volume must have absorbed the new blocks: still exactly
    // one volume, and its block iterator yields more than one block.
    try (FsDatasetSpi.FsVolumeReferences fsVolumeReferences = fsDatasetSpi
        .getFsVolumeReferences()) {
      assertEquals("Volume remove wasn't successful.",
          1, fsVolumeReferences.size());
      FsVolumeSpi volume = fsVolumeReferences.get(0);
      String bpid = cluster.getNamesystem().getBlockPoolId();
      FsVolumeSpi.BlockIterator blkIter = volume.newBlockIterator(bpid, "test");
      int blockCount = 0;
      while (!blkIter.atEnd()) {
        blkIter.nextBlock();
        blockCount++;
      }
      assertTrue(String.format("DataNode(%d) should have more than 1 blocks",
          dataNodeIdx), blockCount > 1);
    }
  }
| |
| @Test(timeout=60000) |
| public void testAddBackRemovedVolume() |
| throws IOException, TimeoutException, InterruptedException, |
| ReconfigurationException { |
| startDFSCluster(1, 2); |
| // Create some data on every volume. |
| createFile(new Path("/test"), 32); |
| |
| DataNode dn = cluster.getDataNodes().get(0); |
| Configuration conf = dn.getConf(); |
| String oldDataDir = conf.get(DFS_DATANODE_DATA_DIR_KEY); |
| String keepDataDir = oldDataDir.split(",")[0]; |
| String removeDataDir = oldDataDir.split(",")[1]; |
| |
| assertThat( |
| "DN did not update its own config", |
| dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, keepDataDir), |
| is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY))); |
| for (int i = 0; i < cluster.getNumNameNodes(); i++) { |
| String bpid = cluster.getNamesystem(i).getBlockPoolId(); |
| BlockPoolSliceStorage bpsStorage = |
| dn.getStorage().getBPStorage(bpid); |
| // Make sure that there is no block pool level storage under removeDataDir. |
| for (int j = 0; j < bpsStorage.getNumStorageDirs(); j++) { |
| Storage.StorageDirectory sd = bpsStorage.getStorageDir(j); |
| assertFalse(sd.getRoot().getAbsolutePath().startsWith( |
| new File(removeDataDir).getAbsolutePath() |
| )); |
| } |
| assertEquals(dn.getStorage().getBPStorage(bpid).getNumStorageDirs(), 1); |
| } |
| |
| // Bring the removed directory back. It only successes if all metadata about |
| // this directory were removed from the previous step. |
| assertThat( |
| "DN did not update its own config", |
| dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, oldDataDir), |
| is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY))); |
| } |
| |
| /** |
| * Verify that {@link DataNode#checkDiskError()} removes all metadata in |
| * DataNode upon a volume failure. Thus we can run reconfig on the same |
| * configuration to reload the new volume on the same directory as the failed one. |
| */ |
| @Test(timeout=60000) |
| public void testDirectlyReloadAfterCheckDiskError() |
| throws Exception { |
| // The test uses DataNodeTestUtils#injectDataDirFailure() to simulate |
| // volume failures which is currently not supported on Windows. |
| assumeNotWindows(); |
| |
| startDFSCluster(1, 2); |
| createFile(new Path("/test"), 32, (short)2); |
| |
| DataNode dn = cluster.getDataNodes().get(0); |
| final String oldDataDir = dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY); |
| File dirToFail = cluster.getInstanceStorageDir(0, 0); |
| |
| FsVolumeImpl failedVolume = DataNodeTestUtils.getVolume(dn, dirToFail); |
| assertTrue("No FsVolume was found for " + dirToFail, |
| failedVolume != null); |
| long used = failedVolume.getDfsUsed(); |
| |
| DataNodeTestUtils.injectDataDirFailure(dirToFail); |
| // Call and wait DataNode to detect disk failure. |
| DataNodeTestUtils.waitForDiskError(dn, failedVolume); |
| |
| createFile(new Path("/test1"), 32, (short)2); |
| assertEquals(used, failedVolume.getDfsUsed()); |
| |
| DataNodeTestUtils.restoreDataDirFromFailure(dirToFail); |
| LOG.info("reconfiguring DN "); |
| assertThat( |
| "DN did not update its own config", |
| dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, oldDataDir), |
| is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY))); |
| |
| createFile(new Path("/test2"), 32, (short)2); |
| FsVolumeImpl restoredVolume = DataNodeTestUtils.getVolume(dn, dirToFail); |
| assertTrue(restoredVolume != null); |
| assertTrue(restoredVolume != failedVolume); |
| // More data has been written to this volume. |
| assertTrue(restoredVolume.getDfsUsed() > used); |
| } |
| |
| /** Test that a full block report is sent after hot swapping volumes */ |
| @Test(timeout=100000) |
| public void testFullBlockReportAfterRemovingVolumes() |
| throws IOException, ReconfigurationException { |
| |
| Configuration conf = new Configuration(); |
| conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); |
| |
| // Similar to TestTriggerBlockReport, set a really long value for |
| // dfs.heartbeat.interval, so that incremental block reports and heartbeats |
| // won't be sent during this test unless they're triggered |
| // manually. |
| conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10800000L); |
| conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1080L); |
| |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); |
| cluster.waitActive(); |
| |
| final DataNode dn = cluster.getDataNodes().get(0); |
| DatanodeProtocolClientSideTranslatorPB spy = |
| InternalDataNodeTestUtils.spyOnBposToNN(dn, cluster.getNameNode()); |
| |
| // Remove a data dir from datanode |
| File dataDirToKeep = cluster.getInstanceStorageDir(0, 0); |
| assertThat( |
| "DN did not update its own config", |
| dn.reconfigurePropertyImpl( |
| DFS_DATANODE_DATA_DIR_KEY, dataDirToKeep.toString()), |
| is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY))); |
| |
| // We should get 1 full report |
| Mockito.verify(spy, timeout(60000).times(1)).blockReport( |
| any(DatanodeRegistration.class), |
| anyString(), |
| any(StorageBlockReport[].class), |
| any(BlockReportContext.class)); |
| } |
| } |