| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; |
| |
| import com.google.common.base.Supplier; |
| import com.google.common.collect.Lists; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileSystemTestHelper; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.StorageType; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DFSTestUtil; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.hdfs.protocol.Block; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlock; |
| import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; |
| import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; |
| import org.apache.hadoop.hdfs.server.common.Storage; |
| import org.apache.hadoop.hdfs.server.common.StorageInfo; |
| import org.apache.hadoop.hdfs.server.datanode.BlockScanner; |
| import org.apache.hadoop.hdfs.server.datanode.DNConf; |
| import org.apache.hadoop.hdfs.server.datanode.DataNode; |
| import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; |
| import org.apache.hadoop.hdfs.server.datanode.DataStorage; |
| import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; |
| import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; |
| import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler; |
| import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; |
| import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry; |
| import org.apache.hadoop.hdfs.server.datanode.StorageLocation; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; |
| import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; |
| import org.apache.hadoop.io.MultipleIOException; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.util.FakeTimer; |
| import org.apache.hadoop.util.StringUtils; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.Matchers; |
| import org.mockito.Mockito; |
| |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStreamWriter; |
| import java.io.Writer; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; |
| import static org.hamcrest.core.Is.is; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertThat; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertSame; |
| import static org.junit.Assert.fail; |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Matchers.anyListOf; |
| import static org.mockito.Matchers.anyString; |
| import static org.mockito.Matchers.eq; |
| import static org.mockito.Mockito.doReturn; |
| import static org.mockito.Mockito.doThrow; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.when; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class TestFsDatasetImpl { |
| Logger LOG = LoggerFactory.getLogger(TestFsDatasetImpl.class); |
| private static final String BASE_DIR = |
| new FileSystemTestHelper().getTestRootDir(); |
| private static final int NUM_INIT_VOLUMES = 2; |
| private static final String CLUSTER_ID = "cluser-id"; |
| private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"}; |
| |
| // Use to generate storageUuid |
| private static final DataStorage dsForStorageUuid = new DataStorage( |
| new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE)); |
| |
| private Configuration conf; |
| private DataNode datanode; |
| private DataStorage storage; |
| private FsDatasetImpl dataset; |
| |
| private final static String BLOCKPOOL = "BP-TEST"; |
| |
| private static Storage.StorageDirectory createStorageDirectory(File root) { |
| Storage.StorageDirectory sd = new Storage.StorageDirectory(root); |
| DataStorage.createStorageID(sd, false); |
| return sd; |
| } |
| |
| private static void createStorageDirs(DataStorage storage, Configuration conf, |
| int numDirs) throws IOException { |
| List<Storage.StorageDirectory> dirs = |
| new ArrayList<Storage.StorageDirectory>(); |
| List<String> dirStrings = new ArrayList<String>(); |
| FileUtils.deleteDirectory(new File(BASE_DIR)); |
| for (int i = 0; i < numDirs; i++) { |
| File loc = new File(BASE_DIR + "/data" + i); |
| dirStrings.add(new Path(loc.toString()).toUri().toString()); |
| loc.mkdirs(); |
| dirs.add(createStorageDirectory(loc)); |
| when(storage.getStorageDir(i)).thenReturn(dirs.get(i)); |
| } |
| |
| String dataDir = StringUtils.join(",", dirStrings); |
| conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir); |
| when(storage.dirIterator()).thenReturn(dirs.iterator()); |
| when(storage.getNumStorageDirs()).thenReturn(numDirs); |
| } |
| |
| private int getNumVolumes() { |
| try (FsDatasetSpi.FsVolumeReferences volumes = |
| dataset.getFsVolumeReferences()) { |
| return volumes.size(); |
| } catch (IOException e) { |
| return 0; |
| } |
| } |
| |
  /**
   * Builds the common fixture: a mocked DataNode/DataStorage pair backing a
   * real {@link FsDatasetImpl} with {@code NUM_INIT_VOLUMES} storage
   * directories, with every block pool in {@code BLOCK_POOL_IDS} registered.
   */
  @Before
  public void setUp() throws IOException {
    datanode = mock(DataNode.class);
    storage = mock(DataStorage.class);
    this.conf = new Configuration();
    // Scan period 0 keeps the block scanner disabled for these tests
    // (see the disabledBlockScanner stub below).
    this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);

    // Stub out everything FsDatasetImpl pulls from its DataNode.
    final FileIoProvider fileIoProvider = new FileIoProvider(conf, null);
    when(datanode.getFileIoProvider()).thenReturn(fileIoProvider);
    when(datanode.getConf()).thenReturn(conf);
    final DNConf dnConf = new DNConf(datanode);
    when(datanode.getDnConf()).thenReturn(dnConf);
    final BlockScanner disabledBlockScanner = new BlockScanner(datanode);
    when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
    final ShortCircuitRegistry shortCircuitRegistry =
        new ShortCircuitRegistry(conf);
    when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry);

    createStorageDirs(storage, conf, NUM_INIT_VOLUMES);
    dataset = new FsDatasetImpl(datanode, storage, conf);
    for (String bpid : BLOCK_POOL_IDS) {
      dataset.addBlockPool(bpid, conf);
    }

    // Sanity-check the fixture before each test runs.
    assertEquals(NUM_INIT_VOLUMES, getNumVolumes());
    assertEquals(0, dataset.getNumFailedVolumes());
  }
| |
| @Test |
| public void testAddVolumes() throws IOException { |
| final int numNewVolumes = 3; |
| final int numExistingVolumes = getNumVolumes(); |
| final int totalVolumes = numNewVolumes + numExistingVolumes; |
| Set<String> expectedVolumes = new HashSet<String>(); |
| List<NamespaceInfo> nsInfos = Lists.newArrayList(); |
| for (String bpid : BLOCK_POOL_IDS) { |
| nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1)); |
| } |
| for (int i = 0; i < numNewVolumes; i++) { |
| String path = BASE_DIR + "/newData" + i; |
| String pathUri = new Path(path).toUri().toString(); |
| expectedVolumes.add(new File(pathUri).toString()); |
| StorageLocation loc = StorageLocation.parse(pathUri); |
| Storage.StorageDirectory sd = createStorageDirectory(new File(path)); |
| DataStorage.VolumeBuilder builder = |
| new DataStorage.VolumeBuilder(storage, sd); |
| when(storage.prepareVolume(eq(datanode), eq(loc.getFile()), |
| anyListOf(NamespaceInfo.class))) |
| .thenReturn(builder); |
| |
| dataset.addVolume(loc, nsInfos); |
| } |
| |
| assertEquals(totalVolumes, getNumVolumes()); |
| assertEquals(totalVolumes, dataset.storageMap.size()); |
| |
| Set<String> actualVolumes = new HashSet<String>(); |
| try (FsDatasetSpi.FsVolumeReferences volumes = |
| dataset.getFsVolumeReferences()) { |
| for (int i = 0; i < numNewVolumes; i++) { |
| actualVolumes.add(volumes.get(numExistingVolumes + i).getBasePath()); |
| } |
| } |
| assertEquals(actualVolumes.size(), expectedVolumes.size()); |
| assertTrue(actualVolumes.containsAll(expectedVolumes)); |
| } |
| |
| @Test |
| public void testAddVolumeWithSameStorageUuid() throws IOException { |
| HdfsConfiguration conf = new HdfsConfiguration(); |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(1).build(); |
| try { |
| cluster.waitActive(); |
| assertTrue(cluster.getDataNodes().get(0).isConnectedToNN( |
| cluster.getNameNode().getServiceRpcAddress())); |
| |
| MiniDFSCluster.DataNodeProperties dn = cluster.stopDataNode(0); |
| File vol0 = cluster.getStorageDir(0, 0); |
| File vol1 = cluster.getStorageDir(0, 1); |
| Storage.StorageDirectory sd0 = new Storage.StorageDirectory(vol0); |
| Storage.StorageDirectory sd1 = new Storage.StorageDirectory(vol1); |
| FileUtils.copyFile(sd0.getVersionFile(), sd1.getVersionFile()); |
| |
| cluster.restartDataNode(dn, true); |
| cluster.waitActive(); |
| assertFalse(cluster.getDataNodes().get(0).isConnectedToNN( |
| cluster.getNameNode().getServiceRpcAddress())); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| @Test(timeout = 30000) |
| public void testRemoveVolumes() throws IOException { |
| // Feed FsDataset with block metadata. |
| final int NUM_BLOCKS = 100; |
| for (int i = 0; i < NUM_BLOCKS; i++) { |
| String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length]; |
| ExtendedBlock eb = new ExtendedBlock(bpid, i); |
| try (ReplicaHandler replica = |
| dataset.createRbw(StorageType.DEFAULT, eb, false)) { |
| } |
| } |
| final String[] dataDirs = |
| conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(","); |
| final String volumePathToRemove = dataDirs[0]; |
| Set<File> volumesToRemove = new HashSet<>(); |
| volumesToRemove.add(StorageLocation.parse(volumePathToRemove).getFile() |
| .getAbsoluteFile()); |
| |
| dataset.removeVolumes(volumesToRemove, true); |
| int expectedNumVolumes = dataDirs.length - 1; |
| assertEquals("The volume has been removed from the volumeList.", |
| expectedNumVolumes, getNumVolumes()); |
| assertEquals("The volume has been removed from the storageMap.", |
| expectedNumVolumes, dataset.storageMap.size()); |
| |
| try { |
| dataset.asyncDiskService.execute(volumesToRemove.iterator().next(), |
| new Runnable() { |
| @Override |
| public void run() {} |
| }); |
| fail("Expect RuntimeException: the volume has been removed from the " |
| + "AsyncDiskService."); |
| } catch (RuntimeException e) { |
| GenericTestUtils.assertExceptionContains("Cannot find root", e); |
| } |
| |
| int totalNumReplicas = 0; |
| for (String bpid : dataset.volumeMap.getBlockPoolList()) { |
| totalNumReplicas += dataset.volumeMap.size(bpid); |
| } |
| assertEquals("The replica infos on this volume has been removed from the " |
| + "volumeMap.", NUM_BLOCKS / NUM_INIT_VOLUMES, |
| totalNumReplicas); |
| } |
| |
| @Test(timeout = 5000) |
| public void testRemoveNewlyAddedVolume() throws IOException { |
| final int numExistingVolumes = getNumVolumes(); |
| List<NamespaceInfo> nsInfos = new ArrayList<>(); |
| for (String bpid : BLOCK_POOL_IDS) { |
| nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1)); |
| } |
| String newVolumePath = BASE_DIR + "/newVolumeToRemoveLater"; |
| StorageLocation loc = StorageLocation.parse(newVolumePath); |
| |
| Storage.StorageDirectory sd = createStorageDirectory(new File(newVolumePath)); |
| DataStorage.VolumeBuilder builder = |
| new DataStorage.VolumeBuilder(storage, sd); |
| when(storage.prepareVolume(eq(datanode), eq(loc.getFile()), |
| anyListOf(NamespaceInfo.class))) |
| .thenReturn(builder); |
| |
| dataset.addVolume(loc, nsInfos); |
| assertEquals(numExistingVolumes + 1, getNumVolumes()); |
| |
| when(storage.getNumStorageDirs()).thenReturn(numExistingVolumes + 1); |
| when(storage.getStorageDir(numExistingVolumes)).thenReturn(sd); |
| Set<File> volumesToRemove = new HashSet<>(); |
| volumesToRemove.add(loc.getFile().getAbsoluteFile()); |
| dataset.removeVolumes(volumesToRemove, true); |
| assertEquals(numExistingVolumes, getNumVolumes()); |
| } |
| |
| @Test |
| public void testAddVolumeFailureReleasesInUseLock() throws IOException { |
| FsDatasetImpl spyDataset = spy(dataset); |
| FsVolumeImpl mockVolume = mock(FsVolumeImpl.class); |
| File badDir = new File(BASE_DIR, "bad").getAbsoluteFile(); |
| badDir.mkdirs(); |
| doReturn(mockVolume).when(spyDataset) |
| .createFsVolume(anyString(), any(File.class), any(StorageType.class)); |
| doThrow(new IOException("Failed to getVolumeMap()")) |
| .when(mockVolume).getVolumeMap( |
| anyString(), |
| any(ReplicaMap.class), |
| any(RamDiskReplicaLruTracker.class)); |
| |
| Storage.StorageDirectory sd = createStorageDirectory(badDir); |
| sd.lock(); |
| DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd); |
| when(storage.prepareVolume(eq(datanode), eq(badDir.getAbsoluteFile()), |
| Matchers.<List<NamespaceInfo>>any())) |
| .thenReturn(builder); |
| |
| StorageLocation location = StorageLocation.parse(badDir.toString()); |
| List<NamespaceInfo> nsInfos = Lists.newArrayList(); |
| for (String bpid : BLOCK_POOL_IDS) { |
| nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1)); |
| } |
| |
| try { |
| spyDataset.addVolume(location, nsInfos); |
| fail("Expect to throw MultipleIOException"); |
| } catch (MultipleIOException e) { |
| } |
| |
| FsDatasetTestUtil.assertFileLockReleased(badDir.toString()); |
| } |
| |
| @Test |
| public void testDeletingBlocks() throws IOException { |
| HdfsConfiguration conf = new HdfsConfiguration(); |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); |
| try { |
| cluster.waitActive(); |
| DataNode dn = cluster.getDataNodes().get(0); |
| |
| FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn); |
| ds.addBlockPool(BLOCKPOOL, conf); |
| FsVolumeImpl vol; |
| try (FsDatasetSpi.FsVolumeReferences volumes = ds.getFsVolumeReferences()) { |
| vol = (FsVolumeImpl)volumes.get(0); |
| } |
| |
| ExtendedBlock eb; |
| ReplicaInfo info; |
| List<Block> blockList = new ArrayList<>(); |
| for (int i = 1; i <= 63; i++) { |
| eb = new ExtendedBlock(BLOCKPOOL, i, 1, 1000 + i); |
| cluster.getFsDatasetTestUtils(0).createFinalizedReplica(eb); |
| blockList.add(eb.getLocalBlock()); |
| } |
| ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0])); |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| // Nothing to do |
| } |
| assertTrue(ds.isDeletingBlock(BLOCKPOOL, blockList.get(0).getBlockId())); |
| |
| blockList.clear(); |
| eb = new ExtendedBlock(BLOCKPOOL, 64, 1, 1064); |
| cluster.getFsDatasetTestUtils(0).createFinalizedReplica(eb); |
| blockList.add(eb.getLocalBlock()); |
| ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0])); |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| // Nothing to do |
| } |
| assertFalse(ds.isDeletingBlock(BLOCKPOOL, blockList.get(0).getBlockId())); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| @Test |
| public void testDuplicateReplicaResolution() throws IOException { |
| FsVolumeImpl fsv1 = Mockito.mock(FsVolumeImpl.class); |
| FsVolumeImpl fsv2 = Mockito.mock(FsVolumeImpl.class); |
| |
| File f1 = new File("d1/block"); |
| File f2 = new File("d2/block"); |
| |
| ReplicaInfo replicaOlder = new FinalizedReplica(1,1,1,fsv1,f1); |
| ReplicaInfo replica = new FinalizedReplica(1,2,2,fsv1,f1); |
| ReplicaInfo replicaSame = new FinalizedReplica(1,2,2,fsv1,f1); |
| ReplicaInfo replicaNewer = new FinalizedReplica(1,3,3,fsv1,f1); |
| |
| ReplicaInfo replicaOtherOlder = new FinalizedReplica(1,1,1,fsv2,f2); |
| ReplicaInfo replicaOtherSame = new FinalizedReplica(1,2,2,fsv2,f2); |
| ReplicaInfo replicaOtherNewer = new FinalizedReplica(1,3,3,fsv2,f2); |
| |
| // equivalent path so don't remove either |
| assertNull(BlockPoolSlice.selectReplicaToDelete(replicaSame, replica)); |
| assertNull(BlockPoolSlice.selectReplicaToDelete(replicaOlder, replica)); |
| assertNull(BlockPoolSlice.selectReplicaToDelete(replicaNewer, replica)); |
| |
| // keep latest found replica |
| assertSame(replica, |
| BlockPoolSlice.selectReplicaToDelete(replicaOtherSame, replica)); |
| assertSame(replicaOtherOlder, |
| BlockPoolSlice.selectReplicaToDelete(replicaOtherOlder, replica)); |
| assertSame(replica, |
| BlockPoolSlice.selectReplicaToDelete(replicaOtherNewer, replica)); |
| } |
| |
| @Test |
| public void testLoadingDfsUsedForVolumes() throws IOException, |
| InterruptedException { |
| long waitIntervalTime = 5000; |
| // Initialize the cachedDfsUsedIntervalTime larger than waitIntervalTime |
| // to avoid cache-dfsused time expired |
| long cachedDfsUsedIntervalTime = waitIntervalTime + 1000; |
| conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS, |
| cachedDfsUsedIntervalTime); |
| |
| long cacheDfsUsed = 1024; |
| long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime); |
| |
| assertEquals(cacheDfsUsed, dfsUsed); |
| } |
| |
| @Test |
| public void testLoadingDfsUsedForVolumesExpired() throws IOException, |
| InterruptedException { |
| long waitIntervalTime = 5000; |
| // Initialize the cachedDfsUsedIntervalTime smaller than waitIntervalTime |
| // to make cache-dfsused time expired |
| long cachedDfsUsedIntervalTime = waitIntervalTime - 1000; |
| conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS, |
| cachedDfsUsedIntervalTime); |
| |
| long cacheDfsUsed = 1024; |
| long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime); |
| |
| // Because the cache-dfsused expired and the dfsUsed will be recalculated |
| assertTrue(cacheDfsUsed != dfsUsed); |
| } |
| |
| private long getDfsUsedValueOfNewVolume(long cacheDfsUsed, |
| long waitIntervalTime) throws IOException, InterruptedException { |
| List<NamespaceInfo> nsInfos = Lists.newArrayList(); |
| nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, BLOCK_POOL_IDS[0], 1)); |
| |
| String CURRENT_DIR = "current"; |
| String DU_CACHE_FILE = BlockPoolSlice.DU_CACHE_FILE; |
| String path = BASE_DIR + "/newData0"; |
| String pathUri = new Path(path).toUri().toString(); |
| StorageLocation loc = StorageLocation.parse(pathUri); |
| Storage.StorageDirectory sd = createStorageDirectory(new File(path)); |
| DataStorage.VolumeBuilder builder = |
| new DataStorage.VolumeBuilder(storage, sd); |
| when( |
| storage.prepareVolume(eq(datanode), eq(loc.getFile()), |
| anyListOf(NamespaceInfo.class))).thenReturn(builder); |
| |
| String cacheFilePath = |
| String.format("%s/%s/%s/%s/%s", path, CURRENT_DIR, BLOCK_POOL_IDS[0], |
| CURRENT_DIR, DU_CACHE_FILE); |
| File outFile = new File(cacheFilePath); |
| |
| if (!outFile.getParentFile().exists()) { |
| outFile.getParentFile().mkdirs(); |
| } |
| |
| if (outFile.exists()) { |
| outFile.delete(); |
| } |
| |
| FakeTimer timer = new FakeTimer(); |
| try { |
| try (Writer out = |
| new OutputStreamWriter(new FileOutputStream(outFile), |
| StandardCharsets.UTF_8)) { |
| // Write the dfsUsed value and the time to cache file |
| out.write(Long.toString(cacheDfsUsed) + " " |
| + Long.toString(timer.now())); |
| out.flush(); |
| } |
| } catch (IOException ioe) { |
| } |
| |
| dataset.setTimer(timer); |
| timer.advance(waitIntervalTime); |
| dataset.addVolume(loc, nsInfos); |
| |
| // Get the last volume which was just added before |
| FsVolumeImpl newVolume; |
| try (FsDatasetSpi.FsVolumeReferences volumes = |
| dataset.getFsVolumeReferences()) { |
| newVolume = (FsVolumeImpl) volumes.get(volumes.size() - 1); |
| } |
| long dfsUsed = newVolume.getDfsUsed(); |
| |
| return dfsUsed; |
| } |
| |
  /**
   * Verifies that a volume can be removed while a replica on it is still
   * being written and a block report is requested concurrently: the block
   * report must still be served and the removal must complete without
   * deadlocking. Three threads are coordinated with latches:
   * ResponderThread writes/finalizes the block, BlockReportThread fetches a
   * block report, VolRemoveThread removes the volume hosting the block.
   */
  @Test(timeout = 30000)
  public void testRemoveVolumeBeingWritten() throws Exception {
    // Will write and remove on dn0.
    final ExtendedBlock eb = new ExtendedBlock(BLOCK_POOL_IDS[0], 0);
    final CountDownLatch startFinalizeLatch = new CountDownLatch(1);
    final CountDownLatch blockReportReceivedLatch = new CountDownLatch(1);
    final CountDownLatch volRemoveStartedLatch = new CountDownLatch(1);
    final CountDownLatch volRemoveCompletedLatch = new CountDownLatch(1);
    // Requests a block report once the volume removal has been kicked off.
    class BlockReportThread extends Thread {
      public void run() {
        // Lets wait for the volume remove process to start
        try {
          volRemoveStartedLatch.await();
        } catch (Exception e) {
          LOG.info("Unexpected exception when waiting for vol removal:", e);
        }
        LOG.info("Getting block report");
        dataset.getBlockReports(eb.getBlockPoolId());
        LOG.info("Successfully received block report");
        blockReportReceivedLatch.countDown();
      }
    }

    // Creates an RBW replica (which pins a reference to its volume) and
    // finalizes it only after the block report has been received.
    class ResponderThread extends Thread {
      public void run() {
        try (ReplicaHandler replica = dataset
            .createRbw(StorageType.DEFAULT, eb, false)) {
          LOG.info("CreateRbw finished");
          startFinalizeLatch.countDown();

          // Slow down while we're holding the reference to the volume.
          // As we finalize a block, the volume is removed in parallel.
          // Ignore any interrupts coming out of volume shutdown.
          try {
            Thread.sleep(1000);
          } catch (InterruptedException ie) {
            LOG.info("Ignoring ", ie);
          }

          // Lets wait for the other thread finish getting block report
          blockReportReceivedLatch.await();

          dataset.finalizeBlock(eb, false);
          LOG.info("FinalizeBlock finished");
        } catch (Exception e) {
          LOG.warn("Exception caught. This should not affect the test", e);
        }
      }
    }

    // Removes the volume that hosts the replica being written.
    class VolRemoveThread extends Thread {
      public void run() {
        Set<File> volumesToRemove = new HashSet<>();
        try {
          volumesToRemove.add(StorageLocation.parse(
              dataset.getVolume(eb).getBasePath()).getFile().getAbsoluteFile());
        } catch (Exception e) {
          LOG.info("Problem preparing volumes to remove: ", e);
          Assert.fail("Exception in remove volume thread, check log for " +
              "details.");
        }
        LOG.info("Removing volume " + volumesToRemove);
        dataset.removeVolumes(volumesToRemove, true);
        volRemoveCompletedLatch.countDown();
        LOG.info("Removed volume " + volumesToRemove);
      }
    }

    // Start the volume write operation
    ResponderThread responderThread = new ResponderThread();
    responderThread.start();
    startFinalizeLatch.await();

    // Start the block report get operation
    final BlockReportThread blockReportThread = new BlockReportThread();
    blockReportThread.start();

    // Start the volume remove operation
    VolRemoveThread volRemoveThread = new VolRemoveThread();
    volRemoveThread.start();

    // Let volume write and remove operation be
    // blocked for few seconds
    Thread.sleep(2000);

    // Signal block report receiver and volume writer
    // thread to complete their operations so that vol
    // remove can proceed
    volRemoveStartedLatch.countDown();

    // Verify if block report can be received
    // when volume is in use and also being removed
    blockReportReceivedLatch.await();

    // Verify if volume can be removed safely when there
    // are read/write operation in-progress
    volRemoveCompletedLatch.await();
  }
| |
| /** |
| * Tests stopping all the active DataXceiver thread on volume failure event. |
| * @throws Exception |
| */ |
| @Test |
| public void testCleanShutdownOfVolume() throws Exception { |
| MiniDFSCluster cluster = null; |
| try { |
| Configuration config = new HdfsConfiguration(); |
| config.setLong( |
| DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 1000); |
| config.setTimeDuration( |
| DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, 0, |
| TimeUnit.MILLISECONDS); |
| config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1); |
| |
| cluster = new MiniDFSCluster.Builder(config, |
| GenericTestUtils.getRandomizedTestDir()).numDataNodes(1).build(); |
| cluster.waitActive(); |
| FileSystem fs = cluster.getFileSystem(); |
| DataNode dataNode = cluster.getDataNodes().get(0); |
| Path filePath = new Path("test.dat"); |
| // Create a file and keep the output stream unclosed. |
| FSDataOutputStream out = fs.create(filePath, (short) 1); |
| out.write(1); |
| out.hflush(); |
| |
| ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath); |
| final FsVolumeImpl volume = (FsVolumeImpl) dataNode.getFSDataset(). |
| getVolume(block); |
| File finalizedDir = volume.getFinalizedDir(cluster.getNamesystem() |
| .getBlockPoolId()); |
| LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0); |
| DatanodeInfo info = lb.getLocations()[0]; |
| |
| if (finalizedDir.exists()) { |
| // Remove write and execute access so that checkDiskErrorThread detects |
| // this volume is bad. |
| finalizedDir.setExecutable(false); |
| assertTrue(FileUtil.setWritable(finalizedDir, false)); |
| } |
| Assert.assertTrue("Reference count for the volume should be greater " |
| + "than 0", volume.getReferenceCount() > 0); |
| // Invoke the synchronous checkDiskError method |
| dataNode.checkDiskError(); |
| // Sleep for 1 second so that datanode can interrupt and cluster clean up |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| @Override public Boolean get() { |
| return volume.getReferenceCount() == 0; |
| } |
| }, 100, 1000); |
| assertThat(dataNode.getFSDataset().getNumFailedVolumes(), is(1)); |
| |
| try { |
| out.close(); |
| Assert.fail("This is not a valid code path. " |
| + "out.close should have thrown an exception."); |
| } catch (IOException ioe) { |
| GenericTestUtils.assertExceptionContains(info.getXferAddr(), ioe); |
| } |
| finalizedDir.setExecutable(true); |
| assertTrue(FileUtil.setWritable(finalizedDir, true)); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| @Test(timeout = 30000) |
| public void testReportBadBlocks() throws Exception { |
| boolean threwException = false; |
| MiniDFSCluster cluster = null; |
| try { |
| Configuration config = new HdfsConfiguration(); |
| cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build(); |
| cluster.waitActive(); |
| |
| Assert.assertEquals(0, cluster.getNamesystem().getCorruptReplicaBlocks()); |
| DataNode dataNode = cluster.getDataNodes().get(0); |
| ExtendedBlock block = |
| new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(), 0); |
| try { |
| // Test the reportBadBlocks when the volume is null |
| dataNode.reportBadBlocks(block); |
| } catch (NullPointerException npe) { |
| threwException = true; |
| } |
| Thread.sleep(3000); |
| Assert.assertFalse(threwException); |
| Assert.assertEquals(0, cluster.getNamesystem().getCorruptReplicaBlocks()); |
| |
| FileSystem fs = cluster.getFileSystem(); |
| Path filePath = new Path("testData"); |
| DFSTestUtil.createFile(fs, filePath, 1, (short) 1, 0); |
| |
| block = DFSTestUtil.getFirstBlock(fs, filePath); |
| // Test for the overloaded method reportBadBlocks |
| dataNode.reportBadBlocks(block, dataNode.getFSDataset() |
| .getFsVolumeReferences().get(0)); |
| Thread.sleep(3000); |
| BlockManagerTestUtil.updateState(cluster.getNamesystem() |
| .getBlockManager()); |
| // Verify the bad block has been reported to namenode |
| Assert.assertEquals(1, cluster.getNamesystem().getCorruptReplicaBlocks()); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| } |