| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; |
| |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.nio.file.Files; |
| import java.nio.file.Paths; |
| import java.util.concurrent.TimeoutException; |
| import java.util.function.Supplier; |
| |
| import org.apache.hadoop.fs.DF; |
| import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; |
| import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner; |
| import org.apache.hadoop.hdfs.server.datanode.LocalReplica; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.commons.lang3.RandomStringUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileSystemTestHelper; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.StorageType; |
| import org.apache.hadoop.hdfs.BlockReader; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DFSTestUtil; |
| import org.apache.hadoop.hdfs.DistributedFileSystem; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil; |
| import org.apache.hadoop.hdfs.protocol.Block; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlock; |
| import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; |
| import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; |
| import org.apache.hadoop.hdfs.server.common.Storage; |
| import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; |
| import org.apache.hadoop.hdfs.server.common.StorageInfo; |
| import org.apache.hadoop.hdfs.server.datanode.BlockScanner; |
| import org.apache.hadoop.hdfs.server.datanode.DNConf; |
| import org.apache.hadoop.hdfs.server.datanode.DataNode; |
| import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; |
| import org.apache.hadoop.hdfs.server.datanode.DataStorage; |
| import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; |
| import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler; |
| import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; |
| import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry; |
| import org.apache.hadoop.hdfs.server.datanode.StorageLocation; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; |
| import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; |
| import org.apache.hadoop.io.MultipleIOException; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.test.LambdaTestUtils; |
| import org.apache.hadoop.util.AutoCloseableLock; |
| import org.apache.hadoop.util.DiskChecker; |
| import org.apache.hadoop.util.FakeTimer; |
| import org.apache.hadoop.util.Lists; |
| import org.apache.hadoop.util.StringUtils; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.Mockito; |
| |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStreamWriter; |
| import java.io.Writer; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; |
| import static org.hamcrest.core.Is.is; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertThat; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertSame; |
| import static org.junit.Assert.fail; |
| import static org.mockito.ArgumentMatchers.anyList; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.ArgumentMatchers.anyString; |
| import static org.mockito.ArgumentMatchers.eq; |
| import static org.mockito.Mockito.doReturn; |
| import static org.mockito.Mockito.doThrow; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class TestFsDatasetImpl { |
  // NOTE(review): not "private static final" as SLF4J convention suggests;
  // left unchanged because tests elsewhere may reference it non-statically.
  Logger LOG = LoggerFactory.getLogger(TestFsDatasetImpl.class);
  // Root directory under which all test volumes are created.
  private static final String BASE_DIR =
      new FileSystemTestHelper().getTestRootDir();
  // Directory handed to DFS_DATANODE_REPLICA_CACHE_ROOT_DIR_KEY in setUp().
  private String replicaCacheRootDir = BASE_DIR + Path.SEPARATOR + "cache";
  // Number of volumes the fixture starts with (asserted at end of setUp()).
  private static final int NUM_INIT_VOLUMES = 2;
  // NOTE(review): "cluser-id" looks like a typo for "cluster-id", but it is
  // opaque test data, so the literal is deliberately left unchanged.
  private static final String CLUSTER_ID = "cluser-id";
  // The two block pools registered on the dataset in setUp().
  private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"};

  // Use to generate storageUuid
  private static final DataStorage dsForStorageUuid = new DataStorage(
      new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE));

  // Fixture state, rebuilt for every test in setUp().
  private Configuration conf;
  private DataNode datanode;
  private DataStorage storage;
  private FsDatasetImpl dataset;

  // Block pool id used by tests that run against a real MiniDFSCluster.
  private final static String BLOCKPOOL = "BP-TEST";
| |
| private static Storage.StorageDirectory createStorageDirectory(File root, |
| Configuration conf) |
| throws SecurityException, IOException { |
| Storage.StorageDirectory sd = new Storage.StorageDirectory( |
| StorageLocation.parse(root.toURI().toString())); |
| DataStorage.createStorageID(sd, false, conf); |
| return sd; |
| } |
| |
| private static void createStorageDirs(DataStorage storage, Configuration conf, |
| int numDirs) throws IOException { |
| List<Storage.StorageDirectory> dirs = |
| new ArrayList<Storage.StorageDirectory>(); |
| List<String> dirStrings = new ArrayList<String>(); |
| FileUtils.deleteDirectory(new File(BASE_DIR)); |
| for (int i = 0; i < numDirs; i++) { |
| File loc = new File(BASE_DIR + "/data" + i); |
| dirStrings.add(new Path(loc.toString()).toUri().toString()); |
| loc.mkdirs(); |
| dirs.add(createStorageDirectory(loc, conf)); |
| when(storage.getStorageDir(i)).thenReturn(dirs.get(i)); |
| } |
| |
| String dataDir = StringUtils.join(",", dirStrings); |
| conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir); |
| when(storage.dirIterator()).thenReturn(dirs.iterator()); |
| when(storage.getNumStorageDirs()).thenReturn(numDirs); |
| } |
| |
| private static StorageLocation createStorageWithStorageType(String subDir, |
| StorageType storageType, Configuration conf, DataStorage storage, |
| DataNode dataNode) throws IOException { |
| String archiveStorageType = "[" + storageType + "]"; |
| String path = BASE_DIR + subDir; |
| new File(path).mkdirs(); |
| String pathUri = new Path(path).toUri().toString(); |
| StorageLocation loc = StorageLocation.parse(archiveStorageType + pathUri); |
| Storage.StorageDirectory sd = new Storage.StorageDirectory( |
| loc); |
| DataStorage.createStorageID(sd, false, conf); |
| |
| DataStorage.VolumeBuilder builder = |
| new DataStorage.VolumeBuilder(storage, sd); |
| when(storage.prepareVolume(eq(dataNode), eq(loc), |
| anyList())) |
| .thenReturn(builder); |
| return loc; |
| } |
| |
| private int getNumVolumes() { |
| try (FsDatasetSpi.FsVolumeReferences volumes = |
| dataset.getFsVolumeReferences()) { |
| return volumes.size(); |
| } catch (IOException e) { |
| return 0; |
| } |
| } |
| |
| @Before |
| public void setUp() throws IOException { |
| datanode = mock(DataNode.class); |
| storage = mock(DataStorage.class); |
| this.conf = new Configuration(); |
| this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0); |
| this.conf.set(DFSConfigKeys.DFS_DATANODE_REPLICA_CACHE_ROOT_DIR_KEY, |
| replicaCacheRootDir); |
| |
| when(datanode.getConf()).thenReturn(conf); |
| final DNConf dnConf = new DNConf(datanode); |
| when(datanode.getDnConf()).thenReturn(dnConf); |
| final BlockScanner disabledBlockScanner = new BlockScanner(datanode); |
| when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner); |
| final ShortCircuitRegistry shortCircuitRegistry = |
| new ShortCircuitRegistry(conf); |
| when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry); |
| |
| createStorageDirs(storage, conf, NUM_INIT_VOLUMES); |
| dataset = new FsDatasetImpl(datanode, storage, conf); |
| for (String bpid : BLOCK_POOL_IDS) { |
| dataset.addBlockPool(bpid, conf); |
| } |
| |
| assertEquals(NUM_INIT_VOLUMES, getNumVolumes()); |
| assertEquals(0, dataset.getNumFailedVolumes()); |
| } |
| @Test(timeout=10000) |
| public void testReadLockEnabledByDefault() |
| throws Exception { |
| final FsDatasetSpi ds = dataset; |
| AtomicBoolean accessed = new AtomicBoolean(false); |
| CountDownLatch latch = new CountDownLatch(1); |
| CountDownLatch waiterLatch = new CountDownLatch(1); |
| |
| Thread holder = new Thread() { |
| public void run() { |
| try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { |
| latch.countDown(); |
| // wait for the waiter thread to access the lock. |
| waiterLatch.await(); |
| } catch (Exception e) { |
| } |
| } |
| }; |
| |
| Thread waiter = new Thread() { |
| public void run() { |
| try { |
| latch.await(); |
| } catch (InterruptedException e) { |
| waiterLatch.countDown(); |
| return; |
| } |
| try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { |
| accessed.getAndSet(true); |
| // signal the holder thread. |
| waiterLatch.countDown(); |
| } catch (Exception e) { |
| } |
| } |
| }; |
| waiter.start(); |
| holder.start(); |
| holder.join(); |
| waiter.join(); |
| // The holder thread is still holding the lock, but the waiter can still |
| // run as the lock is a shared read lock. |
| // Otherwise test will timeout with deadlock. |
| assertEquals(true, accessed.get()); |
| holder.interrupt(); |
| } |
| |
  /**
   * With {@code DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY} set to false, the
   * dataset lock must behave exclusively: a second reader blocks while the
   * first holds the lock. The test deliberately creates that blocked state,
   * confirms the waiter makes no progress, then releases the holder and
   * verifies the waiter finally runs.
   */
  @Test(timeout=20000)
  public void testReadLockCanBeDisabledByConfig()
      throws Exception {
    HdfsConfiguration conf = new HdfsConfiguration();
    conf.setBoolean(
        DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(1).build();
    try {
      AtomicBoolean accessed = new AtomicBoolean(false);
      cluster.waitActive();
      DataNode dn = cluster.getDataNodes().get(0);
      final FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);

      CountDownLatch latch = new CountDownLatch(1);
      CountDownLatch waiterLatch = new CountDownLatch(1);
      // Holder takes the lock and keeps it until waiterLatch is released.
      Thread holder = new Thread() {
        public void run() {
          try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
            latch.countDown();
            // wait for the waiter thread to access the lock.
            waiterLatch.await();
          } catch (Exception e) {
          }
        }
      };

      // Waiter tries the same lock; with read/write locking disabled it
      // must block behind the holder.
      Thread waiter = new Thread() {
        public void run() {
          try {
            // Wait for holder to get ds read lock.
            latch.await();
          } catch (InterruptedException e) {
            waiterLatch.countDown();
            return;
          }
          try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
            accessed.getAndSet(true);
            // signal the holder thread.
            waiterLatch.countDown();
          } catch (Exception e) {
          }
        }
      };
      waiter.start();
      holder.start();
      // Wait for sometime to make sure we are in deadlock,
      try {
        GenericTestUtils.waitFor(() ->
            accessed.get(),
            100, 10000);
        fail("Waiter thread should not execute.");
      } catch (TimeoutException e) {
        // Expected: the waiter never acquired the lock during the window.
      }
      // Release waiterLatch to exit deadlock.
      waiterLatch.countDown();
      holder.join();
      waiter.join();
      // After releasing waiterLatch the waiter
      // thread will be able to execute.
      assertTrue(accessed.get());
    } finally {
      cluster.shutdown();
    }
  }
| |
| @Test |
| public void testAddVolumes() throws IOException { |
| final int numNewVolumes = 3; |
| final int numExistingVolumes = getNumVolumes(); |
| final int totalVolumes = numNewVolumes + numExistingVolumes; |
| Set<String> expectedVolumes = new HashSet<String>(); |
| List<NamespaceInfo> nsInfos = Lists.newArrayList(); |
| for (String bpid : BLOCK_POOL_IDS) { |
| nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1)); |
| } |
| for (int i = 0; i < numNewVolumes; i++) { |
| String path = BASE_DIR + "/newData" + i; |
| String pathUri = new Path(path).toUri().toString(); |
| expectedVolumes.add(new File(pathUri).getAbsolutePath()); |
| StorageLocation loc = StorageLocation.parse(pathUri); |
| Storage.StorageDirectory sd = createStorageDirectory( |
| new File(path), conf); |
| DataStorage.VolumeBuilder builder = |
| new DataStorage.VolumeBuilder(storage, sd); |
| when(storage.prepareVolume(eq(datanode), eq(loc), |
| anyList())) |
| .thenReturn(builder); |
| |
| dataset.addVolume(loc, nsInfos); |
| LOG.info("expectedVolumes " + i + " is " + |
| new File(pathUri).getAbsolutePath()); |
| } |
| |
| assertEquals(totalVolumes, getNumVolumes()); |
| assertEquals(totalVolumes, dataset.storageMap.size()); |
| |
| Set<String> actualVolumes = new HashSet<String>(); |
| try (FsDatasetSpi.FsVolumeReferences volumes = |
| dataset.getFsVolumeReferences()) { |
| for (int i = 0; i < numNewVolumes; i++) { |
| String volumeName = volumes.get(numExistingVolumes + i).toString(); |
| actualVolumes.add(volumeName); |
| LOG.info("actualVolume " + i + " is " + volumeName); |
| } |
| } |
| assertEquals(actualVolumes.size(), expectedVolumes.size()); |
| assertTrue(actualVolumes.containsAll(expectedVolumes)); |
| } |
| |
  /**
   * When same-disk tiering is turned on, adding a second volume of the same
   * storage type on the same mount is a misconfiguration and must be
   * rejected with an IOException.
   */
  @Test
  public void testAddVolumeWithSameDiskTiering() throws IOException {
    // Rebuild the mocks locally so tiering can be enabled before the
    // dataset is constructed (setUp() already built one without it).
    datanode = mock(DataNode.class);
    storage = mock(DataStorage.class);
    this.conf = new Configuration();
    this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
    this.conf.set(DFSConfigKeys.DFS_DATANODE_REPLICA_CACHE_ROOT_DIR_KEY,
        replicaCacheRootDir);
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
        true);
    // Default split: 40% of the mount's capacity reserved for ARCHIVE.
    conf.setDouble(DFSConfigKeys
            .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
        0.4);

    when(datanode.getConf()).thenReturn(conf);
    final DNConf dnConf = new DNConf(datanode);
    when(datanode.getDnConf()).thenReturn(dnConf);
    final BlockScanner disabledBlockScanner = new BlockScanner(datanode);
    when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
    final ShortCircuitRegistry shortCircuitRegistry =
        new ShortCircuitRegistry(conf);
    when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry);

    createStorageDirs(storage, conf, 1);
    dataset = new FsDatasetImpl(datanode, storage, conf);

    List<NamespaceInfo> nsInfos = Lists.newArrayList();
    for (String bpid : BLOCK_POOL_IDS) {
      nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
    }
    // First ARCHIVE volume on the same mount as the DISK volume is allowed.
    StorageLocation archive = createStorageWithStorageType("archive1",
        StorageType.ARCHIVE, conf, storage, datanode);
    dataset.addVolume(archive, nsInfos);
    assertEquals(2, dataset.getVolumeCount());

    // The default 0.4 reservation should be reflected in the per-mount
    // capacity ratios (DISK gets the remaining 0.6).
    String mount = new DF(new File(archive.getUri()), conf).getMount();
    double archiveRatio = dataset.getMountVolumeMap()
        .getCapacityRatioByMountAndStorageType(mount, StorageType.ARCHIVE);
    double diskRatio = dataset.getMountVolumeMap()
        .getCapacityRatioByMountAndStorageType(mount, StorageType.DISK);
    assertEquals(0.4, archiveRatio, 0);
    assertEquals(0.6, diskRatio, 0);

    // Add second ARCHIVAL volume should fail fsDataSetImpl.
    try {
      dataset.addVolume(
          createStorageWithStorageType("archive2",
              StorageType.ARCHIVE, conf, storage, datanode), nsInfos);
      fail("Should throw exception for" +
          " same storage type already exists on same mount.");
    } catch (IOException e) {
      assertTrue(e.getMessage()
          .startsWith("Storage type ARCHIVE already exists on same mount:"));
    }
  }
| |
  /**
   * Verifies per-volume capacity-ratio configuration for same-disk tiering:
   * explicit ratios are honored, a missing counterpart volume inherits the
   * remaining capacity, and a ratio sum above 1 is rejected.
   */
  @Test
  public void testAddVolumeWithCustomizedCapacityRatio()
      throws IOException {
    // Rebuild the mocks locally so tiering can be enabled before the
    // dataset is constructed.
    datanode = mock(DataNode.class);
    storage = mock(DataStorage.class);
    this.conf = new Configuration();
    this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
    this.conf.set(DFSConfigKeys.DFS_DATANODE_REPLICA_CACHE_ROOT_DIR_KEY,
        replicaCacheRootDir);
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
        true);
    // Default ratio: should be overridden by the explicit config below.
    conf.setDouble(DFSConfigKeys
            .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
        0.5);

    // 1) Normal case, get capacity should return correct value.
    String archivedir = "/archive1";
    String diskdir = "/disk1";
    String configStr = "[0.3]file:" + BASE_DIR + archivedir
        + ", " + "[0.6]file:" + BASE_DIR + diskdir;

    conf.set(DFSConfigKeys
            .DFS_DATANODE_SAME_DISK_TIERING_CAPACITY_RATIO_PERCENTAGE,
        configStr);

    when(datanode.getConf()).thenReturn(conf);
    final DNConf dnConf = new DNConf(datanode);
    when(datanode.getDnConf()).thenReturn(dnConf);
    final BlockScanner disabledBlockScanner = new BlockScanner(datanode);
    when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
    final ShortCircuitRegistry shortCircuitRegistry =
        new ShortCircuitRegistry(conf);
    when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry);

    createStorageDirs(storage, conf, 0);

    dataset = createStorageWithCapacityRatioConfig(
        configStr, archivedir, diskdir);

    Path p = new Path("file:" + BASE_DIR);
    String mount = new DF(new File(p.toUri()), conf).getMount();
    double archiveRatio = dataset.getMountVolumeMap()
        .getCapacityRatioByMountAndStorageType(mount, StorageType.ARCHIVE);
    double diskRatio = dataset.getMountVolumeMap()
        .getCapacityRatioByMountAndStorageType(mount, StorageType.DISK);
    assertEquals(0.3, archiveRatio, 0);
    assertEquals(0.6, diskRatio, 0);

    // 2) Counter part volume should get rest of the capacity
    // without explicit config
    configStr = "[0.3]file:" + BASE_DIR + archivedir;
    dataset = createStorageWithCapacityRatioConfig(
        configStr, archivedir, diskdir);
    mount = new DF(new File(p.toUri()), conf).getMount();
    archiveRatio = dataset.getMountVolumeMap()
        .getCapacityRatioByMountAndStorageType(mount, StorageType.ARCHIVE);
    diskRatio = dataset.getMountVolumeMap()
        .getCapacityRatioByMountAndStorageType(mount, StorageType.DISK);
    assertEquals(0.3, archiveRatio, 0);
    assertEquals(0.7, diskRatio, 0);

    // 3) Add volume will fail if capacity ratio is > 1
    dataset = new FsDatasetImpl(datanode, storage, conf);
    configStr = "[0.3]file:" + BASE_DIR + archivedir
        + ", " + "[0.8]file:" + BASE_DIR + diskdir;

    try {
      createStorageWithCapacityRatioConfig(
          configStr, archivedir, diskdir);
      fail("Should fail add volume as capacity ratio sum is > 1");
    } catch (IOException e) {
      assertTrue(e.getMessage()
          .contains("Not enough capacity ratio left on mount"));
    }
  }
| |
| private FsDatasetImpl createStorageWithCapacityRatioConfig( |
| String configStr, String archivedir, String diskdir) |
| throws IOException { |
| conf.set(DFSConfigKeys |
| .DFS_DATANODE_SAME_DISK_TIERING_CAPACITY_RATIO_PERCENTAGE, configStr |
| ); |
| dataset = new FsDatasetImpl(datanode, storage, conf); |
| List<NamespaceInfo> nsInfos = Lists.newArrayList(); |
| for (String bpid : BLOCK_POOL_IDS) { |
| nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1)); |
| } |
| |
| StorageLocation archive = createStorageWithStorageType( |
| archivedir, StorageType.ARCHIVE, conf, storage, datanode); |
| |
| StorageLocation disk = createStorageWithStorageType( |
| diskdir, StorageType.DISK, conf, storage, datanode); |
| |
| dataset.addVolume(archive, nsInfos); |
| dataset.addVolume(disk, nsInfos); |
| assertEquals(2, dataset.getVolumeCount()); |
| return dataset; |
| } |
| |
  /**
   * Two volumes sharing the same storage UUID (simulated by copying one
   * VERSION file over the other) must prevent the DataNode from registering
   * with the NameNode after restart.
   */
  @Test
  public void testAddVolumeWithSameStorageUuid() throws IOException {
    HdfsConfiguration config = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(config)
        .numDataNodes(1).build();
    try {
      cluster.waitActive();
      assertTrue(cluster.getDataNodes().get(0).isConnectedToNN(
          cluster.getNameNode().getServiceRpcAddress()));

      MiniDFSCluster.DataNodeProperties dn = cluster.stopDataNode(0);
      File vol0 = cluster.getStorageDir(0, 0);
      File vol1 = cluster.getStorageDir(0, 1);
      Storage.StorageDirectory sd0 = new Storage.StorageDirectory(vol0);
      Storage.StorageDirectory sd1 = new Storage.StorageDirectory(vol1);
      // Duplicate vol0's identity onto vol1: both now claim the same
      // storage UUID.
      FileUtils.copyFile(sd0.getVersionFile(), sd1.getVersionFile());

      cluster.restartDataNode(dn, true);
      cluster.waitActive();
      // The restarted DN must fail to (re)connect to the NameNode.
      assertFalse(cluster.getDataNodes().get(0).isConnectedToNN(
          cluster.getNameNode().getServiceRpcAddress()));
    } finally {
      cluster.shutdown();
    }
  }
| |
  /**
   * Removes one of the two initial volumes and verifies that the volume
   * list, storage map, async disk service, and replica map all drop the
   * removed volume's state, and that the NameNode is notified for every
   * block that lived on it.
   */
  @Test(timeout = 30000)
  public void testRemoveOneVolume() throws IOException {
    // Feed FsDataset with block metadata.
    final int numBlocks = 100;
    for (int i = 0; i < numBlocks; i++) {
      // NOTE(review): "numBlocks %" is a constant index, so every block is
      // created in BLOCK_POOL_IDS[0]; "i %" may have been intended. The
      // assertions below (50 blocks per removed volume) depend on the
      // current distribution, so the code is left unchanged.
      String bpid = BLOCK_POOL_IDS[numBlocks % BLOCK_POOL_IDS.length];
      ExtendedBlock eb = new ExtendedBlock(bpid, i);
      ReplicaHandler replica = null;
      try {
        replica = dataset.createRbw(StorageType.DEFAULT, null, eb,
            false);
      } finally {
        if (replica != null) {
          replica.close();
        }
      }
    }

    // Remove one volume
    final String[] dataDirs =
        conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
    final String volumePathToRemove = dataDirs[0];
    Set<StorageLocation> volumesToRemove = new HashSet<>();
    volumesToRemove.add(StorageLocation.parse(volumePathToRemove));

    FsVolumeReferences volReferences = dataset.getFsVolumeReferences();
    FsVolumeImpl volumeToRemove = null;
    for (FsVolumeSpi vol: volReferences) {
      if (vol.getStorageLocation().equals(volumesToRemove.iterator().next())) {
        volumeToRemove = (FsVolumeImpl) vol;
      }
    }
    assertTrue(volumeToRemove != null);
    volReferences.close();
    dataset.removeVolumes(volumesToRemove, true);
    int expectedNumVolumes = dataDirs.length - 1;
    assertEquals("The volume has been removed from the volumeList.",
        expectedNumVolumes, getNumVolumes());
    assertEquals("The volume has been removed from the storageMap.",
        expectedNumVolumes, dataset.storageMap.size());

    // DataNode.notifyNamenodeDeletedBlock() should be called 50 times
    // as we deleted one volume that has 50 blocks
    verify(datanode, times(50))
        .notifyNamenodeDeletedBlock(any(), any());

    // Submitting async work for the removed volume must now be rejected.
    try {
      dataset.asyncDiskService.execute(volumeToRemove,
          new Runnable() {
            @Override
            public void run() {}
          });
      fail("Expect RuntimeException: the volume has been removed from the "
          + "AsyncDiskService.");
    } catch (RuntimeException e) {
      GenericTestUtils.assertExceptionContains("Cannot find volume", e);
    }

    int totalNumReplicas = 0;
    for (String bpid : dataset.volumeMap.getBlockPoolList()) {
      totalNumReplicas += dataset.volumeMap.size(bpid);
    }
    assertEquals("The replica infos on this volume has been removed from the "
        + "volumeMap.", numBlocks / NUM_INIT_VOLUMES,
        totalNumReplicas);
  }
| |
  /**
   * Removes both initial volumes at once and verifies that all per-volume
   * state is gone: no volumes, no storage-map entries, no async-disk-service
   * registration, and an empty replica map; the NameNode is notified once
   * per deleted block.
   */
  @Test(timeout = 30000)
  public void testRemoveTwoVolumes() throws IOException {
    // Feed FsDataset with block metadata.
    final int numBlocks = 100;
    for (int i = 0; i < numBlocks; i++) {
      // NOTE(review): "numBlocks %" is a constant index, so every block is
      // created in BLOCK_POOL_IDS[0]; "i %" may have been intended. Left
      // unchanged — see testRemoveOneVolume.
      String bpid = BLOCK_POOL_IDS[numBlocks % BLOCK_POOL_IDS.length];
      ExtendedBlock eb = new ExtendedBlock(bpid, i);
      ReplicaHandler replica = null;
      try {
        replica = dataset.createRbw(StorageType.DEFAULT, null, eb,
            false);
      } finally {
        if (replica != null) {
          replica.close();
        }
      }
    }

    // Remove two volumes
    final String[] dataDirs =
        conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
    Set<StorageLocation> volumesToRemove = new HashSet<>();
    volumesToRemove.add(StorageLocation.parse(dataDirs[0]));
    volumesToRemove.add(StorageLocation.parse(dataDirs[1]));

    FsVolumeReferences volReferences = dataset.getFsVolumeReferences();
    Set<FsVolumeImpl> volumes = new HashSet<>();
    for (FsVolumeSpi vol: volReferences) {
      for (StorageLocation volume : volumesToRemove) {
        if (vol.getStorageLocation().equals(volume)) {
          volumes.add((FsVolumeImpl) vol);
        }
      }
    }
    assertEquals(2, volumes.size());
    volReferences.close();

    dataset.removeVolumes(volumesToRemove, true);
    int expectedNumVolumes = dataDirs.length - 2;
    assertEquals("The volume has been removed from the volumeList.",
        expectedNumVolumes, getNumVolumes());
    assertEquals("The volume has been removed from the storageMap.",
        expectedNumVolumes, dataset.storageMap.size());

    // DataNode.notifyNamenodeDeletedBlock() should be called 100 times
    // as we deleted 2 volumes that have 100 blocks totally
    verify(datanode, times(100))
        .notifyNamenodeDeletedBlock(any(), any());

    // Async work for either removed volume must now be rejected.
    for (FsVolumeImpl volume : volumes) {
      try {
        dataset.asyncDiskService.execute(volume,
            new Runnable() {
              @Override
              public void run() {}
            });
        fail("Expect RuntimeException: the volume has been removed from the "
            + "AsyncDiskService.");
      } catch (RuntimeException e) {
        GenericTestUtils.assertExceptionContains("Cannot find volume", e);
      }
    }

    int totalNumReplicas = 0;
    for (String bpid : dataset.volumeMap.getBlockPoolList()) {
      totalNumReplicas += dataset.volumeMap.size(bpid);
    }
    assertEquals("The replica infos on this volume has been removed from the "
        + "volumeMap.", 0, totalNumReplicas);
  }
| |
| @Test(timeout = 5000) |
| public void testRemoveNewlyAddedVolume() throws IOException { |
| final int numExistingVolumes = getNumVolumes(); |
| List<NamespaceInfo> nsInfos = new ArrayList<>(); |
| for (String bpid : BLOCK_POOL_IDS) { |
| nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1)); |
| } |
| String newVolumePath = BASE_DIR + "/newVolumeToRemoveLater"; |
| StorageLocation loc = StorageLocation.parse(newVolumePath); |
| |
| Storage.StorageDirectory sd = createStorageDirectory( |
| new File(newVolumePath), conf); |
| DataStorage.VolumeBuilder builder = |
| new DataStorage.VolumeBuilder(storage, sd); |
| when(storage.prepareVolume(eq(datanode), eq(loc), |
| anyList())) |
| .thenReturn(builder); |
| |
| dataset.addVolume(loc, nsInfos); |
| assertEquals(numExistingVolumes + 1, getNumVolumes()); |
| |
| when(storage.getNumStorageDirs()).thenReturn(numExistingVolumes + 1); |
| when(storage.getStorageDir(numExistingVolumes)).thenReturn(sd); |
| Set<StorageLocation> volumesToRemove = new HashSet<>(); |
| volumesToRemove.add(loc); |
| dataset.removeVolumes(volumesToRemove, true); |
| assertEquals(numExistingVolumes, getNumVolumes()); |
| } |
| |
  /**
   * If adding a volume fails partway through (getVolumeMap() throws), the
   * in-use file lock taken on the storage directory must be released again.
   */
  @Test
  public void testAddVolumeFailureReleasesInUseLock() throws IOException {
    FsDatasetImpl spyDataset = spy(dataset);
    FsVolumeImpl mockVolume = mock(FsVolumeImpl.class);
    File badDir = new File(BASE_DIR, "bad");
    badDir.mkdirs();
    doReturn(mockVolume).when(spyDataset)
        .createFsVolume(anyString(), any(StorageDirectory.class),
            any(StorageLocation.class));
    // Inject the failure after the volume object has been created so the
    // cleanup path (lock release) is exercised.
    doThrow(new IOException("Failed to getVolumeMap()"))
      .when(mockVolume).getVolumeMap(
        anyString(),
        any(ReplicaMap.class),
        any(RamDiskReplicaLruTracker.class));

    // Lock the directory up-front to simulate the in-use state.
    Storage.StorageDirectory sd = createStorageDirectory(badDir, conf);
    sd.lock();
    DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd);
    when(storage.prepareVolume(eq(datanode),
        eq(StorageLocation.parse(badDir.toURI().toString())),
        anyList()))
        .thenReturn(builder);

    StorageLocation location = StorageLocation.parse(badDir.toString());
    List<NamespaceInfo> nsInfos = Lists.newArrayList();
    for (String bpid : BLOCK_POOL_IDS) {
      nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
    }

    try {
      spyDataset.addVolume(location, nsInfos);
      fail("Expect to throw MultipleIOException");
    } catch (MultipleIOException e) {
      // Expected: the injected getVolumeMap() failure surfaces here.
    }

    FsDatasetTestUtil.assertFileLockReleased(badDir.toString());
  }
| |
| @Test |
| public void testDeletingBlocks() throws IOException { |
| HdfsConfiguration conf = new HdfsConfiguration(); |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); |
| try { |
| cluster.waitActive(); |
| DataNode dn = cluster.getDataNodes().get(0); |
| |
| FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn); |
| ds.addBlockPool(BLOCKPOOL, conf); |
| FsVolumeImpl vol; |
| try (FsDatasetSpi.FsVolumeReferences volumes = ds.getFsVolumeReferences()) { |
| vol = (FsVolumeImpl)volumes.get(0); |
| } |
| |
| ExtendedBlock eb; |
| ReplicaInfo info; |
| List<Block> blockList = new ArrayList<>(); |
| for (int i = 1; i <= 63; i++) { |
| eb = new ExtendedBlock(BLOCKPOOL, i, 1, 1000 + i); |
| cluster.getFsDatasetTestUtils(0).createFinalizedReplica(eb); |
| blockList.add(eb.getLocalBlock()); |
| } |
| ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0])); |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| // Nothing to do |
| } |
| assertTrue(ds.isDeletingBlock(BLOCKPOOL, blockList.get(0).getBlockId())); |
| |
| blockList.clear(); |
| eb = new ExtendedBlock(BLOCKPOOL, 64, 1, 1064); |
| cluster.getFsDatasetTestUtils(0).createFinalizedReplica(eb); |
| blockList.add(eb.getLocalBlock()); |
| ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0])); |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| // Nothing to do |
| } |
| assertFalse(ds.isDeletingBlock(BLOCKPOOL, blockList.get(0).getBlockId())); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| @Test |
| public void testDuplicateReplicaResolution() throws IOException { |
| FsVolumeImpl fsv1 = Mockito.mock(FsVolumeImpl.class); |
| FsVolumeImpl fsv2 = Mockito.mock(FsVolumeImpl.class); |
| |
| File f1 = new File("d1/block"); |
| File f2 = new File("d2/block"); |
| |
| ReplicaInfo replicaOlder = new FinalizedReplica(1,1,1,fsv1,f1); |
| ReplicaInfo replica = new FinalizedReplica(1,2,2,fsv1,f1); |
| ReplicaInfo replicaSame = new FinalizedReplica(1,2,2,fsv1,f1); |
| ReplicaInfo replicaNewer = new FinalizedReplica(1,3,3,fsv1,f1); |
| |
| ReplicaInfo replicaOtherOlder = new FinalizedReplica(1,1,1,fsv2,f2); |
| ReplicaInfo replicaOtherSame = new FinalizedReplica(1,2,2,fsv2,f2); |
| ReplicaInfo replicaOtherNewer = new FinalizedReplica(1,3,3,fsv2,f2); |
| |
| // equivalent path so don't remove either |
| assertNull(BlockPoolSlice.selectReplicaToDelete(replicaSame, replica)); |
| assertNull(BlockPoolSlice.selectReplicaToDelete(replicaOlder, replica)); |
| assertNull(BlockPoolSlice.selectReplicaToDelete(replicaNewer, replica)); |
| |
| // keep latest found replica |
| assertSame(replica, |
| BlockPoolSlice.selectReplicaToDelete(replicaOtherSame, replica)); |
| assertSame(replicaOtherOlder, |
| BlockPoolSlice.selectReplicaToDelete(replicaOtherOlder, replica)); |
| assertSame(replica, |
| BlockPoolSlice.selectReplicaToDelete(replicaOtherNewer, replica)); |
| } |
| |
| @Test |
| public void testLoadingDfsUsedForVolumes() throws IOException, |
| InterruptedException { |
| long waitIntervalTime = 5000; |
| // Initialize the cachedDfsUsedIntervalTime larger than waitIntervalTime |
| // to avoid cache-dfsused time expired |
| long cachedDfsUsedIntervalTime = waitIntervalTime + 1000; |
| conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS, |
| cachedDfsUsedIntervalTime); |
| |
| long cacheDfsUsed = 1024; |
| long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime); |
| |
| assertEquals(cacheDfsUsed, dfsUsed); |
| } |
| |
| @Test |
| public void testLoadingDfsUsedForVolumesExpired() throws IOException, |
| InterruptedException { |
| long waitIntervalTime = 5000; |
| // Initialize the cachedDfsUsedIntervalTime smaller than waitIntervalTime |
| // to make cache-dfsused time expired |
| long cachedDfsUsedIntervalTime = waitIntervalTime - 1000; |
| conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS, |
| cachedDfsUsedIntervalTime); |
| |
| long cacheDfsUsed = 1024; |
| long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime); |
| |
| // Because the cache-dfsused expired and the dfsUsed will be recalculated |
| assertTrue(cacheDfsUsed != dfsUsed); |
| } |
| |
| private long getDfsUsedValueOfNewVolume(long cacheDfsUsed, |
| long waitIntervalTime) throws IOException, InterruptedException { |
| List<NamespaceInfo> nsInfos = Lists.newArrayList(); |
| nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, BLOCK_POOL_IDS[0], 1)); |
| |
| String CURRENT_DIR = "current"; |
| String DU_CACHE_FILE = BlockPoolSlice.DU_CACHE_FILE; |
| String path = BASE_DIR + "/newData0"; |
| String pathUri = new Path(path).toUri().toString(); |
| StorageLocation loc = StorageLocation.parse(pathUri); |
| Storage.StorageDirectory sd = createStorageDirectory(new File(path), conf); |
| DataStorage.VolumeBuilder builder = |
| new DataStorage.VolumeBuilder(storage, sd); |
| when( |
| storage.prepareVolume(eq(datanode), eq(loc), |
| anyList())).thenReturn(builder); |
| |
| String cacheFilePath = |
| String.format("%s/%s/%s/%s/%s", path, CURRENT_DIR, BLOCK_POOL_IDS[0], |
| CURRENT_DIR, DU_CACHE_FILE); |
| File outFile = new File(cacheFilePath); |
| |
| if (!outFile.getParentFile().exists()) { |
| outFile.getParentFile().mkdirs(); |
| } |
| |
| if (outFile.exists()) { |
| outFile.delete(); |
| } |
| |
| FakeTimer timer = new FakeTimer(); |
| try { |
| try (Writer out = |
| new OutputStreamWriter(new FileOutputStream(outFile), |
| StandardCharsets.UTF_8)) { |
| // Write the dfsUsed value and the time to cache file |
| out.write(Long.toString(cacheDfsUsed) + " " |
| + Long.toString(timer.now())); |
| out.flush(); |
| } |
| } catch (IOException ioe) { |
| } |
| |
| dataset.setTimer(timer); |
| timer.advance(waitIntervalTime); |
| dataset.addVolume(loc, nsInfos); |
| |
| // Get the last volume which was just added before |
| FsVolumeImpl newVolume; |
| try (FsDatasetSpi.FsVolumeReferences volumes = |
| dataset.getFsVolumeReferences()) { |
| newVolume = (FsVolumeImpl) volumes.get(volumes.size() - 1); |
| } |
| long dfsUsed = newVolume.getDfsUsed(); |
| |
| return dfsUsed; |
| } |
| |
  @Test(timeout = 60000)
  public void testRemoveVolumeBeingWritten() throws Exception {
    // Will write and remove on dn0.
    // Exercises three concurrent operations against the same volume: an
    // RBW write holding a volume reference, a block report, and removal of
    // that volume. The latches pin the interleaving so removal begins
    // while the writer still holds its reference.
    final ExtendedBlock eb = new ExtendedBlock(BLOCK_POOL_IDS[0], 0);
    // Counted down by ResponderThread once createRbw has returned.
    final CountDownLatch startFinalizeLatch = new CountDownLatch(1);
    // Counted down by BlockReportThread after getBlockReports completes.
    final CountDownLatch blockReportReceivedLatch = new CountDownLatch(1);
    // Counted down by the main thread to let the block report proceed.
    final CountDownLatch volRemoveStartedLatch = new CountDownLatch(1);
    // Counted down by VolRemoveThread after removeVolumes returns.
    final CountDownLatch volRemoveCompletedLatch = new CountDownLatch(1);
    class BlockReportThread extends Thread {
      public void run() {
        // Lets wait for the volume remove process to start
        try {
          volRemoveStartedLatch.await();
        } catch (Exception e) {
          LOG.info("Unexpected exception when waiting for vol removal:", e);
        }
        LOG.info("Getting block report");
        dataset.getBlockReports(eb.getBlockPoolId());
        LOG.info("Successfully received block report");
        blockReportReceivedLatch.countDown();
      }
    }

    class ResponderThread extends Thread {
      public void run() {
        // The try-with-resources keeps the ReplicaHandler (and thus the
        // volume reference) open across the sleep and the finalize call.
        try (ReplicaHandler replica = dataset
            .createRbw(StorageType.DEFAULT, null, eb, false)) {
          LOG.info("CreateRbw finished");
          startFinalizeLatch.countDown();

          // Slow down while we're holding the reference to the volume.
          // As we finalize a block, the volume is removed in parallel.
          // Ignore any interrupts coming out of volume shutdown.
          try {
            Thread.sleep(1000);
          } catch (InterruptedException ie) {
            LOG.info("Ignoring ", ie);
          }

          // Lets wait for the other thread finish getting block report
          blockReportReceivedLatch.await();

          dataset.finalizeBlock(eb, false);
          LOG.info("FinalizeBlock finished");
        } catch (Exception e) {
          // Exceptions here are expected once the volume disappears under
          // us; they must not fail the test.
          LOG.warn("Exception caught. This should not affect the test", e);
        }
      }
    }

    class VolRemoveThread extends Thread {
      public void run() {
        Set<StorageLocation> volumesToRemove = new HashSet<>();
        try {
          // Remove exactly the volume that holds the block being written.
          volumesToRemove.add(dataset.getVolume(eb).getStorageLocation());
        } catch (Exception e) {
          LOG.info("Problem preparing volumes to remove: ", e);
          Assert.fail("Exception in remove volume thread, check log for " +
              "details.");
        }
        LOG.info("Removing volume " + volumesToRemove);
        dataset.removeVolumes(volumesToRemove, true);
        volRemoveCompletedLatch.countDown();
        LOG.info("Removed volume " + volumesToRemove);
      }
    }

    // Start the volume write operation
    ResponderThread responderThread = new ResponderThread();
    responderThread.start();
    startFinalizeLatch.await();

    // Start the block report get operation
    final BlockReportThread blockReportThread = new BlockReportThread();
    blockReportThread.start();

    // Start the volume remove operation
    VolRemoveThread volRemoveThread = new VolRemoveThread();
    volRemoveThread.start();

    // Let volume write and remove operation be
    // blocked for few seconds
    Thread.sleep(2000);

    // Signal block report receiver and volume writer
    // thread to complete their operations so that vol
    // remove can proceed
    volRemoveStartedLatch.countDown();

    // Verify if block report can be received
    // when volume is in use and also being removed
    blockReportReceivedLatch.await();

    // Verify if volume can be removed safely when there
    // are read/write operation in-progress
    volRemoveCompletedLatch.await();
  }
| |
| /** |
| * Tests stopping all the active DataXceiver thread on volume failure event. |
| * @throws Exception |
| */ |
| @Test |
| public void testCleanShutdownOfVolume() throws Exception { |
| MiniDFSCluster cluster = null; |
| try { |
| Configuration config = new HdfsConfiguration(); |
| config.setLong( |
| DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 1000); |
| config.setTimeDuration( |
| DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, 0, |
| TimeUnit.MILLISECONDS); |
| config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1); |
| |
| cluster = new MiniDFSCluster.Builder(config, |
| GenericTestUtils.getRandomizedTestDir()).numDataNodes(1).build(); |
| cluster.waitActive(); |
| FileSystem fs = cluster.getFileSystem(); |
| DataNode dataNode = cluster.getDataNodes().get(0); |
| Path filePath = new Path("test.dat"); |
| // Create a file and keep the output stream unclosed. |
| FSDataOutputStream out = fs.create(filePath, (short) 1); |
| out.write(1); |
| out.hflush(); |
| |
| ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath); |
| final FsVolumeImpl volume = (FsVolumeImpl) dataNode.getFSDataset(). |
| getVolume(block); |
| File finalizedDir = volume.getFinalizedDir(cluster.getNamesystem() |
| .getBlockPoolId()); |
| LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0); |
| DatanodeInfo info = lb.getLocations()[0]; |
| |
| if (finalizedDir.exists()) { |
| // Remove write and execute access so that checkDiskErrorThread detects |
| // this volume is bad. |
| finalizedDir.setExecutable(false); |
| assertTrue(FileUtil.setWritable(finalizedDir, false)); |
| } |
| Assert.assertTrue("Reference count for the volume should be greater " |
| + "than 0", volume.getReferenceCount() > 0); |
| // Invoke the synchronous checkDiskError method |
| dataNode.checkDiskError(); |
| // Sleep for 1 second so that datanode can interrupt and cluster clean up |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| @Override public Boolean get() { |
| return volume.getReferenceCount() == 0; |
| } |
| }, 100, 1000); |
| assertThat(dataNode.getFSDataset().getNumFailedVolumes(), is(1)); |
| |
| try { |
| out.close(); |
| Assert.fail("This is not a valid code path. " |
| + "out.close should have thrown an exception."); |
| } catch (IOException ioe) { |
| GenericTestUtils.assertExceptionContains(info.getXferAddr(), ioe); |
| } |
| assertTrue(FileUtil.setWritable(finalizedDir, true)); |
| finalizedDir.setExecutable(true); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| @Test(timeout = 30000) |
| public void testReportBadBlocks() throws Exception { |
| boolean threwException = false; |
| MiniDFSCluster cluster = null; |
| try { |
| Configuration config = new HdfsConfiguration(); |
| cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build(); |
| cluster.waitActive(); |
| |
| Assert.assertEquals(0, cluster.getNamesystem().getCorruptReplicaBlocks()); |
| DataNode dataNode = cluster.getDataNodes().get(0); |
| ExtendedBlock block = |
| new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(), 0); |
| try { |
| // Test the reportBadBlocks when the volume is null |
| dataNode.reportBadBlocks(block); |
| } catch (NullPointerException npe) { |
| threwException = true; |
| } |
| Thread.sleep(3000); |
| Assert.assertFalse(threwException); |
| Assert.assertEquals(0, cluster.getNamesystem().getCorruptReplicaBlocks()); |
| |
| FileSystem fs = cluster.getFileSystem(); |
| Path filePath = new Path("testData"); |
| DFSTestUtil.createFile(fs, filePath, 1, (short) 1, 0); |
| |
| block = DFSTestUtil.getFirstBlock(fs, filePath); |
| // Test for the overloaded method reportBadBlocks |
| dataNode.reportBadBlocks(block, dataNode.getFSDataset() |
| .getFsVolumeReferences().get(0)); |
| Thread.sleep(3000); |
| BlockManagerTestUtil.updateState(cluster.getNamesystem() |
| .getBlockManager()); |
| // Verify the bad block has been reported to namenode |
| Assert.assertEquals(1, cluster.getNamesystem().getCorruptReplicaBlocks()); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * When moving blocks using hardLink or copy |
| * and append happened in the middle, |
| * block movement should fail and hardlink is removed. |
| */ |
| @Test(timeout = 30000) |
| public void testMoveBlockFailure() { |
| // Test copy |
| testMoveBlockFailure(conf); |
| // Test hardlink |
| conf.setBoolean(DFSConfigKeys |
| .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true); |
| conf.setDouble(DFSConfigKeys |
| .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5); |
| testMoveBlockFailure(conf); |
| } |
| |
| private void testMoveBlockFailure(Configuration config) { |
| MiniDFSCluster cluster = null; |
| try { |
| |
| cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(1) |
| .storageTypes( |
| new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}) |
| .storagesPerDatanode(2) |
| .build(); |
| FileSystem fs = cluster.getFileSystem(); |
| DataNode dataNode = cluster.getDataNodes().get(0); |
| |
| Path filePath = new Path("testData"); |
| long fileLen = 100; |
| ExtendedBlock block = createTestFile(fs, fileLen, filePath); |
| |
| FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset(); |
| ReplicaInfo newReplicaInfo = |
| createNewReplicaObjWithLink(block, fsDataSetImpl); |
| |
| // Append to file to update its GS |
| FSDataOutputStream out = fs.append(filePath, (short) 1); |
| out.write(100); |
| out.hflush(); |
| |
| // Call finalizeNewReplica |
| assertTrue(newReplicaInfo.blockDataExists()); |
| LOG.info("GenerationStamp of old replica: {}", |
| block.getGenerationStamp()); |
| LOG.info("GenerationStamp of new replica: {}", fsDataSetImpl |
| .getReplicaInfo(block.getBlockPoolId(), newReplicaInfo.getBlockId()) |
| .getGenerationStamp()); |
| LambdaTestUtils.intercept(IOException.class, "Generation Stamp " |
| + "should be monotonically increased.", |
| () -> fsDataSetImpl.finalizeNewReplica(newReplicaInfo, block)); |
| assertFalse(newReplicaInfo.blockDataExists()); |
| |
| validateFileLen(fs, fileLen, filePath); |
| } catch (Exception ex) { |
| LOG.info("Exception in testMoveBlockFailure ", ex); |
| fail("Exception while testing testMoveBlockFailure "); |
| } finally { |
| if (cluster.isClusterUp()) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| @Test(timeout = 30000) |
| public void testMoveBlockSuccess() { |
| MiniDFSCluster cluster = null; |
| try { |
| cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(1) |
| .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK}) |
| .storagesPerDatanode(2) |
| .build(); |
| FileSystem fs = cluster.getFileSystem(); |
| DataNode dataNode = cluster.getDataNodes().get(0); |
| |
| Path filePath = new Path("testData"); |
| DFSTestUtil.createFile(fs, filePath, 100, (short) 1, 0); |
| ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath); |
| |
| FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset(); |
| ReplicaInfo newReplicaInfo = createNewReplicaObj(block, fsDataSetImpl); |
| fsDataSetImpl.finalizeNewReplica(newReplicaInfo, block); |
| |
| } catch (Exception ex) { |
| LOG.info("Exception in testMoveBlockSuccess ", ex); |
| fail("MoveBlock operation should succeed"); |
| } finally { |
| if (cluster.isClusterUp()) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| /** |
| * Make sure datanode restart can clean up un-finalized links, |
| * if the block is not finalized yet. |
| */ |
| @Test(timeout = 30000) |
| public void testDnRestartWithHardLinkInTmp() { |
| MiniDFSCluster cluster = null; |
| try { |
| conf.setBoolean(DFSConfigKeys |
| .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true); |
| conf.setDouble(DFSConfigKeys |
| .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5); |
| cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(1) |
| .storageTypes( |
| new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}) |
| .storagesPerDatanode(2) |
| .build(); |
| FileSystem fs = cluster.getFileSystem(); |
| DataNode dataNode = cluster.getDataNodes().get(0); |
| |
| Path filePath = new Path("testData"); |
| long fileLen = 100; |
| |
| ExtendedBlock block = createTestFile(fs, fileLen, filePath); |
| |
| FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset(); |
| |
| ReplicaInfo oldReplicaInfo = fsDataSetImpl.getReplicaInfo(block); |
| ReplicaInfo newReplicaInfo = |
| createNewReplicaObjWithLink(block, fsDataSetImpl); |
| |
| // Link exists |
| assertTrue(Files.exists(Paths.get(newReplicaInfo.getBlockURI()))); |
| |
| cluster.restartDataNode(0); |
| cluster.waitDatanodeFullyStarted(cluster.getDataNodes().get(0), 60000); |
| cluster.triggerBlockReports(); |
| |
| // Un-finalized replica data (hard link) is deleted as they were in /tmp |
| assertFalse(Files.exists(Paths.get(newReplicaInfo.getBlockURI()))); |
| |
| // Old block is there. |
| assertTrue(Files.exists(Paths.get(oldReplicaInfo.getBlockURI()))); |
| |
| validateFileLen(fs, fileLen, filePath); |
| |
| } catch (Exception ex) { |
| LOG.info("Exception in testDnRestartWithHardLinkInTmp ", ex); |
| fail("Exception while testing testDnRestartWithHardLinkInTmp "); |
| } finally { |
| if (cluster.isClusterUp()) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| /** |
| * If new block is finalized and DN restarted, |
| * DiskScanner should clean up the hardlink correctly. |
| */ |
| @Test(timeout = 30000) |
| public void testDnRestartWithHardLink() { |
| MiniDFSCluster cluster = null; |
| try { |
| conf.setBoolean(DFSConfigKeys |
| .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true); |
| conf.setDouble(DFSConfigKeys |
| .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5); |
| cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(1) |
| .storageTypes( |
| new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}) |
| .storagesPerDatanode(2) |
| .build(); |
| FileSystem fs = cluster.getFileSystem(); |
| DataNode dataNode = cluster.getDataNodes().get(0); |
| |
| Path filePath = new Path("testData"); |
| long fileLen = 100; |
| |
| ExtendedBlock block = createTestFile(fs, fileLen, filePath); |
| |
| FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset(); |
| |
| final ReplicaInfo oldReplicaInfo = fsDataSetImpl.getReplicaInfo(block); |
| |
| fsDataSetImpl.finalizeNewReplica( |
| createNewReplicaObjWithLink(block, fsDataSetImpl), block); |
| |
| ReplicaInfo newReplicaInfo = fsDataSetImpl.getReplicaInfo(block); |
| |
| cluster.restartDataNode(0); |
| cluster.waitDatanodeFullyStarted(cluster.getDataNodes().get(0), 60000); |
| cluster.triggerBlockReports(); |
| |
| assertTrue(Files.exists(Paths.get(newReplicaInfo.getBlockURI()))); |
| assertTrue(Files.exists(Paths.get(oldReplicaInfo.getBlockURI()))); |
| |
| DirectoryScanner scanner = new DirectoryScanner( |
| cluster.getDataNodes().get(0).getFSDataset(), conf); |
| scanner.start(); |
| scanner.run(); |
| |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| @Override public Boolean get() { |
| return !Files.exists(Paths.get(oldReplicaInfo.getBlockURI())); |
| } |
| }, 100, 10000); |
| assertTrue(Files.exists(Paths.get(newReplicaInfo.getBlockURI()))); |
| |
| validateFileLen(fs, fileLen, filePath); |
| |
| } catch (Exception ex) { |
| LOG.info("Exception in testDnRestartWithHardLink ", ex); |
| fail("Exception while testing testDnRestartWithHardLink "); |
| } finally { |
| if (cluster.isClusterUp()) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| @Test(timeout = 30000) |
| public void testMoveBlockSuccessWithSameMountMove() { |
| MiniDFSCluster cluster = null; |
| try { |
| conf.setBoolean(DFSConfigKeys |
| .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true); |
| conf.setDouble(DFSConfigKeys |
| .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.5); |
| cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(1) |
| .storageTypes( |
| new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}) |
| .storagesPerDatanode(2) |
| .build(); |
| FileSystem fs = cluster.getFileSystem(); |
| DataNode dataNode = cluster.getDataNodes().get(0); |
| Path filePath = new Path("testData"); |
| long fileLen = 100; |
| |
| ExtendedBlock block = createTestFile(fs, fileLen, filePath); |
| |
| FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset(); |
| assertEquals(StorageType.DISK, |
| fsDataSetImpl.getReplicaInfo(block).getVolume().getStorageType()); |
| |
| FsDatasetImpl fsDataSetImplSpy = |
| spy((FsDatasetImpl) dataNode.getFSDataset()); |
| fsDataSetImplSpy.moveBlockAcrossStorage( |
| block, StorageType.ARCHIVE, null); |
| |
| // Make sure it is done thru hardlink |
| verify(fsDataSetImplSpy).moveBlock(any(), any(), any(), eq(true)); |
| |
| assertEquals(StorageType.ARCHIVE, |
| fsDataSetImpl.getReplicaInfo(block).getVolume().getStorageType()); |
| validateFileLen(fs, fileLen, filePath); |
| |
| } catch (Exception ex) { |
| LOG.info("Exception in testMoveBlockSuccessWithSameMountMove ", ex); |
| fail("testMoveBlockSuccessWithSameMountMove operation should succeed"); |
| } finally { |
| if (cluster.isClusterUp()) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| // Move should fail if the volume on same mount has no space. |
| @Test(timeout = 30000) |
| public void testMoveBlockWithSameMountMoveWithoutSpace() { |
| MiniDFSCluster cluster = null; |
| try { |
| conf.setBoolean(DFSConfigKeys |
| .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true); |
| conf.setDouble(DFSConfigKeys |
| .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.0); |
| cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(1) |
| .storageTypes( |
| new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}) |
| .storagesPerDatanode(2) |
| .build(); |
| FileSystem fs = cluster.getFileSystem(); |
| DataNode dataNode = cluster.getDataNodes().get(0); |
| Path filePath = new Path("testData"); |
| long fileLen = 100; |
| |
| ExtendedBlock block = createTestFile(fs, fileLen, filePath); |
| |
| FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset(); |
| assertEquals(StorageType.DISK, |
| fsDataSetImpl.getReplicaInfo(block).getVolume().getStorageType()); |
| |
| FsDatasetImpl fsDataSetImplSpy = |
| spy((FsDatasetImpl) dataNode.getFSDataset()); |
| fsDataSetImplSpy.moveBlockAcrossStorage( |
| block, StorageType.ARCHIVE, null); |
| |
| fail("testMoveBlockWithSameMountMoveWithoutSpace operation" + |
| " should failed"); |
| } catch (Exception ex) { |
| assertTrue(ex instanceof DiskChecker.DiskOutOfSpaceException); |
| } finally { |
| if (cluster.isClusterUp()) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| // More tests on shouldConsiderSameMountVolume. |
| @Test(timeout = 10000) |
| public void testShouldConsiderSameMountVolume() throws IOException { |
| FsVolumeImpl volume = new FsVolumeImplBuilder() |
| .setConf(conf) |
| .setDataset(dataset) |
| .setStorageID("storage-id") |
| .setStorageDirectory( |
| new StorageDirectory(StorageLocation.parse(BASE_DIR))) |
| .build(); |
| assertFalse(dataset.shouldConsiderSameMountVolume(volume, |
| StorageType.ARCHIVE, null)); |
| |
| conf.setBoolean(DFSConfigKeys |
| .DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true); |
| conf.setDouble(DFSConfigKeys |
| .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, |
| 0.5); |
| volume = new FsVolumeImplBuilder() |
| .setConf(conf) |
| .setDataset(dataset) |
| .setStorageID("storage-id") |
| .setStorageDirectory( |
| new StorageDirectory(StorageLocation.parse(BASE_DIR))) |
| .build(); |
| assertTrue(dataset.shouldConsiderSameMountVolume(volume, |
| StorageType.ARCHIVE, null)); |
| assertTrue(dataset.shouldConsiderSameMountVolume(volume, |
| StorageType.ARCHIVE, "")); |
| assertFalse(dataset.shouldConsiderSameMountVolume(volume, |
| StorageType.DISK, null)); |
| assertFalse(dataset.shouldConsiderSameMountVolume(volume, |
| StorageType.ARCHIVE, "target")); |
| } |
| |
| /** |
| * Create a new temporary replica of replicaInfo object in another volume. |
| * |
| * @param block - Extended Block |
| * @param fsDataSetImpl - FsDatasetImpl reference |
| * @throws IOException |
| */ |
| private ReplicaInfo createNewReplicaObj(ExtendedBlock block, FsDatasetImpl |
| fsDataSetImpl) throws IOException { |
| ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block); |
| FsVolumeSpi destVolume = getDestinationVolume(block, fsDataSetImpl); |
| return fsDataSetImpl.copyReplicaToVolume(block, replicaInfo, |
| destVolume.obtainReference()); |
| } |
| |
| /** |
| * Create a new temporary replica of replicaInfo object in another volume. |
| * |
| * @param block - Extended Block |
| * @param fsDataSetImpl - FsDatasetImpl reference |
| * @throws IOException |
| */ |
| private ReplicaInfo createNewReplicaObjWithLink(ExtendedBlock block, |
| FsDatasetImpl fsDataSetImpl) throws IOException { |
| ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block); |
| FsVolumeSpi destVolume = getDestinationVolume(block, fsDataSetImpl); |
| return fsDataSetImpl.moveReplicaToVolumeOnSameMount(block, replicaInfo, |
| destVolume.obtainReference()); |
| } |
| |
| private ExtendedBlock createTestFile(FileSystem fs, |
| long fileLen, Path filePath) throws IOException { |
| DFSTestUtil.createFile(fs, filePath, fileLen, (short) 1, 0); |
| return DFSTestUtil.getFirstBlock(fs, filePath); |
| } |
| |
| private void validateFileLen(FileSystem fs, |
| long fileLen, Path filePath) throws IOException { |
| // Read data file to make sure it is good. |
| InputStream in = fs.open(filePath); |
| int bytesCount = 0; |
| while (in.read() != -1) { |
| bytesCount++; |
| } |
| assertTrue(fileLen <= bytesCount); |
| } |
| |
| /** |
| * Finds a new destination volume for block. |
| * |
| * @param block - Extended Block |
| * @param fsDataSetImpl - FsDatasetImpl reference |
| * @throws IOException |
| */ |
| private FsVolumeSpi getDestinationVolume(ExtendedBlock block, FsDatasetImpl |
| fsDataSetImpl) throws IOException { |
| FsVolumeSpi destVolume = null; |
| final String srcStorageId = fsDataSetImpl.getVolume(block).getStorageID(); |
| try (FsVolumeReferences volumeReferences = |
| fsDataSetImpl.getFsVolumeReferences()) { |
| for (int i = 0; i < volumeReferences.size(); i++) { |
| if (!volumeReferences.get(i).getStorageID().equals(srcStorageId)) { |
| destVolume = volumeReferences.get(i); |
| break; |
| } |
| } |
| } |
| return destVolume; |
| } |
| |
| @Test(timeout = 3000000) |
| public void testBlockReadOpWhileMovingBlock() throws IOException { |
| MiniDFSCluster cluster = null; |
| try { |
| |
| // Setup cluster |
| conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); |
| cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(1) |
| .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK}) |
| .storagesPerDatanode(2) |
| .build(); |
| FileSystem fs = cluster.getFileSystem(); |
| DataNode dataNode = cluster.getDataNodes().get(0); |
| |
| // Create test file with ASCII data |
| Path filePath = new Path("/tmp/testData"); |
| String blockData = RandomStringUtils.randomAscii(512 * 4); |
| FSDataOutputStream fout = fs.create(filePath); |
| fout.writeBytes(blockData); |
| fout.close(); |
| assertEquals(blockData, DFSTestUtil.readFile(fs, filePath)); |
| |
| ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath); |
| BlockReaderTestUtil util = new BlockReaderTestUtil(cluster, new |
| HdfsConfiguration(conf)); |
| LocatedBlock blk = util.getFileBlocks(filePath, 512 * 2).get(0); |
| File[] blkFiles = cluster.getAllBlockFiles(block); |
| |
| // Part 1: Read partial data from block |
| LOG.info("Reading partial data for block {} before moving it: ", |
| blk.getBlock().toString()); |
| BlockReader blkReader = BlockReaderTestUtil.getBlockReader( |
| (DistributedFileSystem) fs, blk, 0, 512 * 2); |
| byte[] buf = new byte[512 * 2]; |
| blkReader.read(buf, 0, 512); |
| assertEquals(blockData.substring(0, 512), new String(buf, |
| StandardCharsets.US_ASCII).substring(0, 512)); |
| |
| // Part 2: Move block and than read remaining block |
| FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset(); |
| ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block); |
| FsVolumeSpi destVolume = getDestinationVolume(block, fsDataSetImpl); |
| assertNotNull("Destination volume should not be null.", destVolume); |
| fsDataSetImpl.moveBlock(block, replicaInfo, |
| destVolume.obtainReference(), false); |
| // Trigger block report to update block info in NN |
| cluster.triggerBlockReports(); |
| blkReader.read(buf, 512, 512); |
| assertEquals(blockData.substring(0, 512 * 2), new String(buf, |
| StandardCharsets.US_ASCII).substring(0, 512 * 2)); |
| blkReader = BlockReaderTestUtil.getBlockReader( |
| (DistributedFileSystem) fs, |
| blk, 0, blockData.length()); |
| buf = new byte[512 * 4]; |
| blkReader.read(buf, 0, 512 * 4); |
| assertEquals(blockData, new String(buf, StandardCharsets.US_ASCII)); |
| |
| // Part 3: 1. Close the block reader |
| // 2. Assert source block doesn't exist on initial volume |
| // 3. Assert new file location for block is different |
| // 4. Confirm client can read data from new location |
| blkReader.close(); |
| ExtendedBlock block2 = DFSTestUtil.getFirstBlock(fs, filePath); |
| File[] blkFiles2 = cluster.getAllBlockFiles(block2); |
| blk = util.getFileBlocks(filePath, 512 * 4).get(0); |
| blkReader = BlockReaderTestUtil.getBlockReader( |
| (DistributedFileSystem) fs, |
| blk, 0, blockData.length()); |
| blkReader.read(buf, 0, 512 * 4); |
| |
| assertFalse(Files.exists(Paths.get(blkFiles[0].getAbsolutePath()))); |
| assertNotEquals(blkFiles[0], blkFiles2[0]); |
| assertEquals(blockData, new String(buf, StandardCharsets.US_ASCII)); |
| |
| } finally { |
| if (cluster.isClusterUp()) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| @Test(timeout=30000) |
| public void testDataDirWithPercent() throws IOException { |
| String baseDir = new FileSystemTestHelper().getTestRootDir(); |
| File dataDir = new File(baseDir, "invalidFormatString-%z"); |
| dataDir.mkdirs(); |
| FsVolumeImpl volumeFixed = new FsVolumeImplBuilder() |
| .setConf(new HdfsConfiguration()) |
| .setDataset(dataset) |
| .setStorageID("storage-id") |
| .setStorageDirectory( |
| new StorageDirectory(StorageLocation.parse(dataDir.getPath()))) |
| .build(); |
| } |
| |
| @Test |
| public void testReplicaCacheFileToOtherPlace() throws IOException { |
| final String bpid = "bpid-0"; |
| for (int i = 0; i < 5; i++) { |
| ExtendedBlock eb = new ExtendedBlock(bpid, i); |
| dataset.createRbw(StorageType.DEFAULT, null, eb, false); |
| } |
| List<File> cacheFiles = new ArrayList<>(); |
| for (FsVolumeSpi vol: dataset.getFsVolumeReferences()) { |
| BlockPoolSlice bpSlice = ((FsVolumeImpl)vol).getBlockPoolSlice(bpid); |
| File cacheFile = new File(replicaCacheRootDir + Path.SEPARATOR + |
| bpSlice.getDirectory().getCanonicalPath() + Path.SEPARATOR + |
| DataStorage.STORAGE_DIR_CURRENT + Path.SEPARATOR + "replicas"); |
| cacheFiles.add(cacheFile); |
| } |
| dataset.shutdownBlockPool(bpid); |
| for (File f : cacheFiles) { |
| assertTrue(f.exists()); |
| } |
| } |
| |
| @Test |
| public void testGetMetadataLengthOfFinalizedReplica() throws IOException { |
| FsVolumeImpl fsv1 = Mockito.mock(FsVolumeImpl.class); |
| File blockDir = new File(BASE_DIR,"testFinalizedReplica/block"); |
| if (!blockDir.exists()) { |
| assertTrue(blockDir.mkdirs()); |
| } |
| long blockID = 1; |
| long genStamp = 2; |
| File metaFile = new File(blockDir,Block.BLOCK_FILE_PREFIX + |
| blockID + "_" + genStamp + Block.METADATA_EXTENSION); |
| |
| // create meta file on disk |
| OutputStream os = new FileOutputStream(metaFile); |
| os.write("TEST_META_SIZE".getBytes()); |
| os.close(); |
| long fileLength = metaFile.length(); |
| |
| ReplicaInfo replica = new FinalizedReplica( |
| blockID, 2, genStamp, fsv1, blockDir); |
| |
| long metaLength = replica.getMetadataLength(); |
| assertEquals(fileLength, metaLength); |
| |
| // Delete the meta file on disks, make sure we still can get the length |
| // from cached meta size. |
| metaFile.delete(); |
| metaLength = replica.getMetadataLength(); |
| assertEquals(fileLength, metaLength); |
| if (!blockDir.exists()) { |
| assertTrue(blockDir.delete()); |
| } |
| } |
| |
| @Test |
| public void testNotifyNamenodeMissingOrNewBlock() throws Exception { |
| long blockSize = 1024; |
| int heatbeatInterval = 1; |
| HdfsConfiguration c = new HdfsConfiguration(); |
| c.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, heatbeatInterval); |
| c.setLong(DFS_BLOCK_SIZE_KEY, blockSize); |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder(c). |
| numDataNodes(1).build(); |
| try { |
| cluster.waitActive(); |
| DFSTestUtil.createFile(cluster.getFileSystem(), new Path("/f1"), |
| blockSize, (short)1, 0); |
| String bpid = cluster.getNameNode().getNamesystem().getBlockPoolId(); |
| DataNode dn = cluster.getDataNodes().get(0); |
| FsDatasetSpi fsdataset = dn.getFSDataset(); |
| List<ReplicaInfo> replicaInfos = |
| fsdataset.getFinalizedBlocks(bpid); |
| assertEquals(1, replicaInfos.size()); |
| |
| ReplicaInfo replicaInfo = replicaInfos.get(0); |
| String blockPath = replicaInfo.getBlockURI().getPath(); |
| String metaPath = replicaInfo.getMetadataURI().getPath(); |
| String blockTempPath = blockPath + ".tmp"; |
| String metaTempPath = metaPath + ".tmp"; |
| File blockFile = new File(blockPath); |
| File blockTempFile = new File(blockTempPath); |
| File metaFile = new File(metaPath); |
| File metaTempFile = new File(metaTempPath); |
| |
| // remove block and meta file of the block |
| blockFile.renameTo(blockTempFile); |
| metaFile.renameTo(metaTempFile); |
| assertFalse(blockFile.exists()); |
| assertFalse(metaFile.exists()); |
| |
| FsVolumeSpi.ScanInfo info = new FsVolumeSpi.ScanInfo( |
| replicaInfo.getBlockId(), blockFile.getParentFile().getAbsoluteFile(), |
| blockFile.getName(), metaFile.getName(), replicaInfo.getVolume()); |
| fsdataset.checkAndUpdate(bpid, info); |
| |
| BlockManager blockManager = cluster.getNameNode(). |
| getNamesystem().getBlockManager(); |
| GenericTestUtils.waitFor(() -> |
| blockManager.getLowRedundancyBlocksCount() == 1, 100, 5000); |
| |
| // move the block and meta file back |
| blockTempFile.renameTo(blockFile); |
| metaTempFile.renameTo(metaFile); |
| |
| fsdataset.checkAndUpdate(bpid, info); |
| GenericTestUtils.waitFor(() -> |
| blockManager.getLowRedundancyBlocksCount() == 0, 100, 5000); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| @Test(timeout = 20000) |
| public void testReleaseVolumeRefIfExceptionThrown() throws IOException { |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder( |
| new HdfsConfiguration()).build(); |
| cluster.waitActive(); |
| FsVolumeImpl vol = (FsVolumeImpl) dataset.getFsVolumeReferences().get(0); |
| ExtendedBlock eb; |
| ReplicaInfo info; |
| int beforeCnt = 0; |
| try { |
| List<Block> blockList = new ArrayList<Block>(); |
| eb = new ExtendedBlock(BLOCKPOOL, 1, 1, 1001); |
| info = new FinalizedReplica( |
| eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile()); |
| dataset.volumeMap.add(BLOCKPOOL, info); |
| ((LocalReplica) info).getBlockFile().createNewFile(); |
| ((LocalReplica) info).getMetaFile().createNewFile(); |
| blockList.add(info); |
| |
| // Create a runtime exception. |
| dataset.asyncDiskService.shutdown(); |
| |
| beforeCnt = vol.getReferenceCount(); |
| dataset.invalidate(BLOCKPOOL, blockList.toArray(new Block[0])); |
| |
| } catch (RuntimeException re) { |
| int afterCnt = vol.getReferenceCount(); |
| assertEquals(beforeCnt, afterCnt); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| } |