/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.protocol.Block.BLOCK_FILE_PREFIX;
import static org.apache.hadoop.util.Shell.getMemlockLimit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Time;
import org.apache.log4j.SimpleLayout;
import org.apache.log4j.WriterAppender;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
/**
* Tests {@link DirectoryScanner} handling of differences between blocks on the
 * disk and blocks in memory.
*/
public class TestDirectoryScanner {
private static final Logger LOG =
LoggerFactory.getLogger(TestDirectoryScanner.class);
private static final int DEFAULT_GEN_STAMP = 9999;
private MiniDFSCluster cluster;
private String bpid;
private DFSClient client;
private FsDatasetSpi<? extends FsVolumeSpi> fds = null;
private DirectoryScanner scanner = null;
  private final Random rand = new Random();
private static final int BLOCK_LENGTH = 100;
public Configuration getConfiguration() {
Configuration configuration = new HdfsConfiguration();
configuration.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_LENGTH);
configuration.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
configuration.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
configuration.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
getMemlockLimit(Long.MAX_VALUE));
return configuration;
}
@Before
public void setup() {
LazyPersistTestCase.initCacheManipulator();
}
  /** Create a file with a length of <code>fileLen</code>. */
private List<LocatedBlock> createFile(String fileNamePrefix, long fileLen,
boolean isLazyPersist) throws IOException {
FileSystem fs = cluster.getFileSystem();
Path filePath = new Path("/" + fileNamePrefix + ".dat");
    DFSTestUtil.createFile(fs, filePath, isLazyPersist, 1024, fileLen,
        BLOCK_LENGTH, (short) 1, rand.nextLong(), false);
return client.getLocatedBlocks(filePath.toString(), 0, fileLen)
.getLocatedBlocks();
}
/** Truncate a block file. */
private long truncateBlockFile() throws IOException {
try (AutoCloseableLock lock = fds.acquireDatasetLockManager().writeLock(
LockLevel.BLOCK_POOl, bpid)) {
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
File f = new File(b.getBlockURI());
File mf = new File(b.getMetadataURI());
// Truncate a block file that has a corresponding metadata file
if (f.exists() && f.length() != 0 && mf.exists()) {
FileOutputStream s = null;
FileChannel channel = null;
try {
s = new FileOutputStream(f);
channel = s.getChannel();
channel.truncate(0);
LOG.info("Truncated block file " + f.getAbsolutePath());
return b.getBlockId();
} finally {
IOUtils.cleanupWithLogger(LOG, channel, s);
}
}
}
}
return 0;
}
  /** Delete a block file and return its block id. */
private long deleteBlockFile() {
try (AutoCloseableLock lock = fds.acquireDatasetLockManager().
writeLock(LockLevel.BLOCK_POOl, bpid)) {
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
File f = new File(b.getBlockURI());
File mf = new File(b.getMetadataURI());
        // Delete a block file that has a corresponding metadata file
if (f.exists() && mf.exists() && f.delete()) {
LOG.info("Deleting block file " + f.getAbsolutePath());
return b.getBlockId();
}
}
}
return 0;
}
  /** Delete a block metadata file and return its block id. */
private long deleteMetaFile() {
try (AutoCloseableLock lock = fds.acquireDatasetLockManager().
writeLock(LockLevel.BLOCK_POOl, bpid)) {
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
// Delete a metadata file
if (b.metadataExists() && b.deleteMetadata()) {
LOG.info("Deleting metadata " + b.getMetadataURI());
return b.getBlockId();
}
}
}
return 0;
}
  /**
   * Duplicate the given block on all volumes that do not already hold a copy.
   *
   * @param blockId id of the block to duplicate
   * @throws IOException if a copy fails
   */
private void duplicateBlock(long blockId) throws IOException {
try (AutoCloseableLock lock = fds.acquireDatasetLockManager().
writeLock(LockLevel.BLOCK_POOl, bpid)) {
ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
try (FsDatasetSpi.FsVolumeReferences volumes =
fds.getFsVolumeReferences()) {
for (FsVolumeSpi v : volumes) {
if (v.getStorageID().equals(b.getVolume().getStorageID())) {
continue;
}
// Volume without a copy of the block. Make a copy now.
File sourceBlock = new File(b.getBlockURI());
File sourceMeta = new File(b.getMetadataURI());
URI sourceRoot = b.getVolume().getStorageLocation().getUri();
URI destRoot = v.getStorageLocation().getUri();
String relativeBlockPath =
sourceRoot.relativize(sourceBlock.toURI()).getPath();
String relativeMetaPath =
sourceRoot.relativize(sourceMeta.toURI()).getPath();
File destBlock =
new File(new File(destRoot).toString(), relativeBlockPath);
File destMeta =
new File(new File(destRoot).toString(), relativeMetaPath);
destBlock.getParentFile().mkdirs();
FileUtils.copyFile(sourceBlock, destBlock);
FileUtils.copyFile(sourceMeta, destMeta);
if (destBlock.exists() && destMeta.exists()) {
LOG.info("Copied " + sourceBlock + " ==> " + destBlock);
LOG.info("Copied " + sourceMeta + " ==> " + destMeta);
}
}
}
}
}
/** Get a random blockId that is not used already. */
  private long getFreeBlockId() {
    long id;
    do {
      id = rand.nextLong();
    } while (FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, id) != null);
    return id;
  }
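  /** Return the on-disk file name for the given block id. */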
private String getBlockFile(long id) {
return BLOCK_FILE_PREFIX + id;
}
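  /**
   * Return the on-disk metadata file name for the given block id, using the
   * default generation stamp.
   */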
private String getMetaFile(long id) {
return BLOCK_FILE_PREFIX + id + "_" + DEFAULT_GEN_STAMP
+ Block.METADATA_EXTENSION;
}
/** Create a block file in a random volume. */
private long createBlockFile(long id) throws IOException {
try (
FsDatasetSpi.FsVolumeReferences volumes = fds.getFsVolumeReferences()) {
int numVolumes = volumes.size();
int index = rand.nextInt(numVolumes - 1);
File finalizedDir =
((FsVolumeImpl) volumes.get(index)).getFinalizedDir(bpid);
File file = new File(finalizedDir, getBlockFile(id));
if (file.createNewFile()) {
LOG.info("Created block file " + file.getName());
}
}
return id;
}
  /** Create a metafile in a random volume. */
private long createMetaFile() throws IOException {
long id = getFreeBlockId();
try (FsDatasetSpi.FsVolumeReferences refs = fds.getFsVolumeReferences()) {
int numVolumes = refs.size();
int index = rand.nextInt(numVolumes - 1);
File finalizedDir =
((FsVolumeImpl) refs.get(index)).getFinalizedDir(bpid);
File file = new File(finalizedDir, getMetaFile(id));
if (file.createNewFile()) {
LOG.info("Created metafile " + file.getName());
}
}
return id;
}
  /** Create a block file and corresponding metafile in a random volume. */
private long createBlockMetaFile(long id) throws IOException {
try (FsDatasetSpi.FsVolumeReferences refs = fds.getFsVolumeReferences()) {
int numVolumes = refs.size();
int index = rand.nextInt(numVolumes - 1);
File finalizedDir =
((FsVolumeImpl) refs.get(index)).getFinalizedDir(bpid);
File file = new File(finalizedDir, getBlockFile(id));
if (file.createNewFile()) {
LOG.info("Created block file " + file.getName());
// Create files with same prefix as block file but extension names
// such that during sorting, these files appear around meta file
// to test how DirectoryScanner handles extraneous files
String name1 = file.getAbsolutePath() + ".l";
String name2 = file.getAbsolutePath() + ".n";
file = new File(name1);
if (file.createNewFile()) {
LOG.info("Created extraneous file " + name1);
}
file = new File(name2);
if (file.createNewFile()) {
LOG.info("Created extraneous file " + name2);
}
file = new File(finalizedDir, getMetaFile(id));
if (file.createNewFile()) {
LOG.info("Created metafile " + file.getName());
}
}
}
return id;
}
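  /**
   * Run the scanner's reconcile pass and wait until the retained stats match
   * the expected values (see {@link #verifyStats}).
   */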
private void scan(long totalBlocks, int diffsize, long missingMetaFile,
long missingBlockFile, long missingMemoryBlocks, long mismatchBlocks)
throws IOException, InterruptedException, TimeoutException {
scan(totalBlocks, diffsize, missingMetaFile, missingBlockFile,
missingMemoryBlocks, mismatchBlocks, 0);
}
private void scan(long totalBlocks, int diffsize, long missingMetaFile,
long missingBlockFile, long missingMemoryBlocks, long mismatchBlocks,
long duplicateBlocks)
throws IOException, InterruptedException, TimeoutException {
scanner.reconcile();
GenericTestUtils.waitFor(() -> {
try {
verifyStats(totalBlocks, diffsize, missingMetaFile, missingBlockFile,
missingMemoryBlocks, mismatchBlocks, duplicateBlocks);
} catch (AssertionError ex) {
LOG.warn("Assertion Error", ex);
return false;
}
return true;
}, 100, 2000);
}
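  /**
   * Assert that the scanner's retained diff and the per-block-pool stats
   * match the expected counts.
   */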
private void verifyStats(long totalBlocks, int diffsize, long missingMetaFile,
long missingBlockFile, long missingMemoryBlocks, long mismatchBlocks,
long duplicateBlocks) {
Collection<FsVolumeSpi.ScanInfo> diff = scanner.diffs.getScanInfo(bpid);
assertEquals(diffsize, diff.size());
DirectoryScanner.Stats stats = scanner.stats.get(bpid);
assertNotNull(stats);
assertEquals(totalBlocks, stats.totalBlocks);
assertEquals(missingMetaFile, stats.missingMetaFile);
assertEquals(missingBlockFile, stats.missingBlockFile);
assertEquals(missingMemoryBlocks, stats.missingMemoryBlocks);
assertEquals(mismatchBlocks, stats.mismatchBlocks);
assertEquals(duplicateBlocks, stats.duplicateBlocks);
}
@Test(timeout = 300000)
public void testRetainBlockOnPersistentStorage() throws Exception {
Configuration conf = getConfiguration();
cluster = new MiniDFSCluster.Builder(conf)
.storageTypes(
new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
.numDataNodes(1).build();
try {
cluster.waitActive();
bpid = cluster.getNamesystem().getBlockPoolId();
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
client = cluster.getFileSystem().getClient();
scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
// Add a file with 1 block
List<LocatedBlock> blocks =
createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, false);
// Ensure no difference between volumeMap and disk.
scan(1, 0, 0, 0, 0, 0);
// Make a copy of the block on RAM_DISK and ensure that it is
// picked up by the scanner.
duplicateBlock(blocks.get(0).getBlock().getBlockId());
scan(2, 1, 0, 0, 0, 0, 1);
verifyStorageType(blocks.get(0).getBlock().getBlockId(), false);
scan(1, 0, 0, 0, 0, 0);
} finally {
if (scanner != null) {
scanner.shutdown();
scanner = null;
}
cluster.shutdown();
cluster = null;
}
}
  /**
   * Test that scanning a lone metadata file does not generate a spurious
   * "wrong folder structure" warning in the log.
   */
@Test(timeout=600000)
public void testScanDirectoryStructureWarn() throws Exception {
    // Add a logger stream to check what is printed to the log
ByteArrayOutputStream loggerStream = new ByteArrayOutputStream();
org.apache.log4j.Logger rootLogger =
org.apache.log4j.Logger.getRootLogger();
GenericTestUtils.setRootLogLevel(Level.INFO);
WriterAppender writerAppender =
new WriterAppender(new SimpleLayout(), loggerStream);
rootLogger.addAppender(writerAppender);
Configuration conf = getConfiguration();
cluster = new MiniDFSCluster
.Builder(conf)
.storageTypes(new StorageType[] {
StorageType.RAM_DISK, StorageType.DEFAULT })
.numDataNodes(1)
.build();
try {
cluster.waitActive();
bpid = cluster.getNamesystem().getBlockPoolId();
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
client = cluster.getFileSystem().getClient();
scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
      // Create a file on RAM_DISK
createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, true);
// Ensure no difference between volumeMap and disk.
scan(1, 0, 0, 0, 0, 0);
      // Delete the block file, leaving the meta file alone
deleteBlockFile();
      // Scan and ensure the folder-structure warning is not printed
scan(1, 1, 0, 1, 0, 0, 0);
      // Ensure the folder-structure warning does not appear while the
      // missing-block warning does
String logContent = new String(loggerStream.toByteArray());
String missingBlockWarn = "Deleted a metadata file" +
" for the deleted block";
String dirStructureWarnLog = " found in invalid directory." +
" Expected directory: ";
assertFalse("directory check print meaningless warning message",
logContent.contains(dirStructureWarnLog));
assertTrue("missing block warn log not appear",
logContent.contains(missingBlockWarn));
LOG.info("check pass");
    } finally {
      // Detach the test appender so later tests are unaffected.
      rootLogger.removeAppender(writerAppender);
      if (scanner != null) {
scanner.shutdown();
scanner = null;
}
cluster.shutdown();
cluster = null;
}
}
@Test(timeout = 300000)
public void testDeleteBlockOnTransientStorage() throws Exception {
Configuration conf = getConfiguration();
cluster = new MiniDFSCluster.Builder(conf)
.storageTypes(
new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
.numDataNodes(1).build();
try {
cluster.waitActive();
bpid = cluster.getNamesystem().getBlockPoolId();
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
client = cluster.getFileSystem().getClient();
scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
// Create a file on RAM_DISK
List<LocatedBlock> blocks =
createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, true);
// Ensure no difference between volumeMap and disk.
scan(1, 0, 0, 0, 0, 0);
// Make a copy of the block on DEFAULT storage and ensure that it is
// picked up by the scanner.
duplicateBlock(blocks.get(0).getBlock().getBlockId());
scan(2, 1, 0, 0, 0, 0, 1);
// Ensure that the copy on RAM_DISK was deleted.
verifyStorageType(blocks.get(0).getBlock().getBlockId(), false);
scan(1, 0, 0, 0, 0, 0);
} finally {
if (scanner != null) {
scanner.shutdown();
scanner = null;
}
cluster.shutdown();
cluster = null;
}
}
@Test(timeout = 600000)
public void testRegularBlock() throws Exception {
Configuration conf = getConfiguration();
cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
bpid = cluster.getNamesystem().getBlockPoolId();
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
client = cluster.getFileSystem().getClient();
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
      // Capture the NameNode's state change log to check bad block reports
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.
captureLogs(NameNode.stateChangeLog);
      // Add a file with 5 blocks
createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 5, false);
List<ReplicaInfo> infos = new ArrayList<>(FsDatasetTestUtil.getReplicas(fds, bpid));
ReplicaInfo lastReplica = infos.get(infos.size() - 1);
ReplicaInfo penultimateReplica = infos.get(infos.size() - 2);
String blockParent = new File(lastReplica.getBlockURI().getPath()).getParent();
File lastBlockFile = new File(blockParent, getBlockFile(lastReplica.getBlockId()));
File penultimateBlockFile = new File(blockParent,
getBlockFile(penultimateReplica.getBlockId()));
FileUtil.symLink(lastBlockFile.toString(), penultimateBlockFile.toString());
ExtendedBlock block = new ExtendedBlock(bpid, penultimateReplica.getBlockId());
scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
scanner.reconcile();
DirectoryScanner.Stats stats = scanner.stats.get(bpid);
assertNotNull(stats);
assertEquals(1, stats.mismatchBlocks);
      // Check the NameNode log for the reportBadBlocks message
String msg = "*DIR* reportBadBlocks for block: " + bpid + ":" +
getBlockFile(block.getBlockId());
assertTrue(logCapturer.getOutput().contains(msg));
} finally {
if (scanner != null) {
scanner.shutdown();
scanner = null;
}
cluster.shutdown();
}
}
@Test(timeout = 600000)
public void testDirectoryScanner() throws Exception {
// Run the test with and without parallel scanning
for (int parallelism = 1; parallelism < 3; parallelism++) {
runTest(parallelism);
}
}
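  /**
   * Run the full set of scanner scenarios (Test1 through Test14) with the
   * given scan parallelism.
   */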
public void runTest(int parallelism) throws Exception {
Configuration conf = getConfiguration();
cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
bpid = cluster.getNamesystem().getBlockPoolId();
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
client = cluster.getFileSystem().getClient();
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
parallelism);
scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
      // Add a file with 100 blocks
createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 100, false);
long totalBlocks = 100;
// Test1: No difference between volumeMap and disk
scan(100, 0, 0, 0, 0, 0);
// Test2: block metafile is missing
long blockId = deleteMetaFile();
scan(totalBlocks, 1, 1, 0, 0, 1);
verifyGenStamp(blockId, HdfsConstants.GRANDFATHER_GENERATION_STAMP);
scan(totalBlocks, 0, 0, 0, 0, 0);
// Test3: block file is missing
blockId = deleteBlockFile();
scan(totalBlocks, 1, 0, 1, 0, 0);
totalBlocks--;
verifyDeletion(blockId);
scan(totalBlocks, 0, 0, 0, 0, 0);
      // Test4: A block file exists for which there is no metafile and
      // no block in memory
blockId = createBlockFile(blockId);
totalBlocks++;
scan(totalBlocks, 1, 1, 0, 1, 0);
verifyAddition(blockId, HdfsConstants.GRANDFATHER_GENERATION_STAMP, 0);
scan(totalBlocks, 0, 0, 0, 0, 0);
      // Test5: A metafile exists for which there is no block file and
      // no block in memory
blockId = createMetaFile();
scan(totalBlocks + 1, 1, 0, 1, 1, 0);
File metafile = new File(getMetaFile(blockId));
      assertFalse(metafile.exists());
scan(totalBlocks, 0, 0, 0, 0, 0);
      // Test6: A block file and metafile exist for which there is no block
      // in memory
blockId = deleteBlockFile();
scan(totalBlocks, 1, 0, 1, 0, 0);
totalBlocks--;
verifyDeletion(blockId);
blockId = createBlockMetaFile(blockId);
totalBlocks++;
scan(totalBlocks, 1, 0, 0, 1, 0);
verifyAddition(blockId, DEFAULT_GEN_STAMP, 0);
scan(totalBlocks, 0, 0, 0, 0, 0);
// Test7: Delete bunch of metafiles
for (int i = 0; i < 10; i++) {
blockId = deleteMetaFile();
}
scan(totalBlocks, 10, 10, 0, 0, 10);
scan(totalBlocks, 0, 0, 0, 0, 0);
// Test8: Delete bunch of block files and record the ids.
List<Long> ids = new ArrayList<>();
for (int i = 0; i < 10; i++) {
ids.add(deleteBlockFile());
}
scan(totalBlocks, 10, 0, 10, 0, 0);
totalBlocks -= 10;
scan(totalBlocks, 0, 0, 0, 0, 0);
      // Test9: create a bunch of block files
for (int i = 0; i < 10; i++) {
blockId = createBlockFile(ids.get(i));
}
totalBlocks += 10;
scan(totalBlocks, 10, 10, 0, 10, 0);
scan(totalBlocks, 0, 0, 0, 0, 0);
// Test10: create a bunch of metafiles
for (int i = 0; i < 10; i++) {
blockId = createMetaFile();
}
scan(totalBlocks + 10, 10, 0, 10, 10, 0);
scan(totalBlocks, 0, 0, 0, 0, 0);
      // Test11: create a bunch of block files and meta files
ids.clear();
for (int i = 0; i < 10; i++) {
ids.add(deleteBlockFile());
}
scan(totalBlocks, 10, 0, 10, 0, 0);
totalBlocks -= 10;
for (int i = 0; i < 10; i++) {
blockId = createBlockMetaFile(ids.get(i));
}
totalBlocks += 10;
scan(totalBlocks, 10, 0, 0, 10, 0);
scan(totalBlocks, 0, 0, 0, 0, 0);
// Test12: truncate block files to test block length mismatch
for (int i = 0; i < 10; i++) {
truncateBlockFile();
}
scan(totalBlocks, 10, 0, 0, 0, 10);
scan(totalBlocks, 0, 0, 0, 0, 0);
// Test13: all the conditions combined
long blockId1 = deleteBlockFile();
long blockId2 = deleteBlockFile();
scan(totalBlocks, 2, 0, 2, 0, 0);
totalBlocks -= 2;
verifyDeletion(blockId1);
verifyDeletion(blockId2);
createMetaFile();
createBlockFile(blockId1);
createBlockMetaFile(blockId2);
deleteMetaFile();
deleteBlockFile();
truncateBlockFile();
scan(totalBlocks + 3, 6, 2, 2, 3, 2);
scan(totalBlocks + 1, 0, 0, 0, 0, 0);
// Test14: make sure no throttling is happening
assertTrue("Throttle appears to be engaged",
scanner.timeWaitingMs.get() < 10L);
assertTrue("Report complier threads logged no execution time",
scanner.timeRunningMs.get() > 0L);
scanner.shutdown();
assertFalse(scanner.getRunStatus());
} finally {
if (scanner != null) {
scanner.shutdown();
scanner = null;
}
cluster.shutdown();
}
}
/**
* Test that the timeslice throttle limits the report compiler thread's
* execution time correctly. We test by scanning a large block pool and
* comparing the time spent waiting to the time spent running.
*
* The block pool has to be large, or the ratio will be off. The throttle
* allows the report compiler thread to finish its current cycle when blocking
* it, so the ratio will always be a little lower than expected. The smaller
* the block pool, the further off the ratio will be.
*
* @throws Exception thrown on unexpected failure
*/
@Test
public void testThrottling() throws Exception {
Configuration conf = new Configuration(getConfiguration());
// We need lots of blocks so the report compiler threads have enough to
// keep them busy while we watch them.
int blocks = 20000;
int maxRetries = 3;
cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
bpid = cluster.getNamesystem().getBlockPoolId();
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
client = cluster.getFileSystem().getClient();
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
100);
final int maxBlocksPerFile =
(int) DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT;
int numBlocksToCreate = blocks;
while (numBlocksToCreate > 0) {
final int toCreate = Math.min(maxBlocksPerFile, numBlocksToCreate);
createFile(GenericTestUtils.getMethodName() + numBlocksToCreate,
BLOCK_LENGTH * toCreate, false);
numBlocksToCreate -= toCreate;
}
float ratio = 0.0f;
int retries = maxRetries;
while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) {
scanner = new DirectoryScanner(fds, conf);
ratio = runThrottleTest(blocks);
retries -= 1;
}
// Waiting should be about 9x running.
LOG.info("RATIO: " + ratio);
assertTrue("Throttle is too restrictive", ratio <= 10f);
assertTrue("Throttle is too permissive" + ratio, ratio >= 7f);
// Test with a different limit
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
200);
ratio = 0.0f;
retries = maxRetries;
while ((retries > 0) && ((ratio < 2.75f) || (ratio > 4.5f))) {
scanner = new DirectoryScanner(fds, conf);
ratio = runThrottleTest(blocks);
retries -= 1;
}
// Waiting should be about 4x running.
LOG.info("RATIO: " + ratio);
assertTrue("Throttle is too restrictive", ratio <= 4.5f);
assertTrue("Throttle is too permissive", ratio >= 2.75f);
// Test with more than 1 thread
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 3);
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
100);
ratio = 0.0f;
retries = maxRetries;
while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) {
scanner = new DirectoryScanner(fds, conf);
ratio = runThrottleTest(blocks);
retries -= 1;
}
// Waiting should be about 9x running.
LOG.info("RATIO: " + ratio);
assertTrue("Throttle is too restrictive", ratio <= 10f);
assertTrue("Throttle is too permissive", ratio >= 7f);
// Test with no limit
scanner = new DirectoryScanner(fds, getConfiguration());
scanner.setRetainDiffs(true);
scan(blocks, 0, 0, 0, 0, 0);
scanner.shutdown();
assertFalse(scanner.getRunStatus());
assertTrue("Throttle appears to be engaged",
scanner.timeWaitingMs.get() < 10L);
assertTrue("Report complier threads logged no execution time",
scanner.timeRunningMs.get() > 0L);
// Test with a 1ms limit. This also tests whether the scanner can be
// shutdown cleanly in mid stride.
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
1);
ratio = 0.0f;
retries = maxRetries;
ScheduledExecutorService interruptor =
Executors.newScheduledThreadPool(maxRetries);
try {
while ((retries > 0) && (ratio < 10)) {
scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
final AtomicLong nowMs = new AtomicLong();
          // Stop the scanner after 2 seconds because otherwise it will take
          // an eternity to complete its run
          interruptor.schedule(() -> {
            nowMs.set(Time.monotonicNow());
            scanner.shutdown();
          }, 2L, TimeUnit.SECONDS);
scanner.reconcile();
assertFalse(scanner.getRunStatus());
long finalMs = nowMs.get();
// If the scan didn't complete before the shutdown was run, check
// that the shutdown was timely
if (finalMs > 0) {
LOG.info("Scanner took " + (Time.monotonicNow() - finalMs)
+ "ms to shutdown");
assertTrue("Scanner took too long to shutdown",
Time.monotonicNow() - finalMs < 1000L);
}
ratio =
(float) scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
retries -= 1;
}
} finally {
interruptor.shutdown();
}
// We just want to test that it waits a lot, but it also runs some
LOG.info("RATIO: " + ratio);
assertTrue("Throttle is too permissive", ratio > 8);
assertTrue("Report complier threads logged no execution time",
scanner.timeRunningMs.get() > 0L);
// Test with a 0 limit, i.e. disabled
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
0);
scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
scan(blocks, 0, 0, 0, 0, 0);
scanner.shutdown();
assertFalse(scanner.getRunStatus());
assertTrue("Throttle appears to be engaged",
scanner.timeWaitingMs.get() < 10L);
assertTrue("Report complier threads logged no execution time",
scanner.timeRunningMs.get() > 0L);
// Test with a 1000 limit, i.e. disabled
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
1000);
scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
scan(blocks, 0, 0, 0, 0, 0);
scanner.shutdown();
assertFalse(scanner.getRunStatus());
assertTrue("Throttle appears to be engaged",
scanner.timeWaitingMs.get() < 10L);
assertTrue("Report complier threads logged no execution time",
scanner.timeRunningMs.get() > 0L);
// Test that throttle works from regular start
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
conf.setInt(
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
10);
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
scanner.start();
int count = 50;
while ((count > 0) && (scanner.timeWaitingMs.get() < 500L)) {
Thread.sleep(100L);
count -= 1;
}
scanner.shutdown();
assertFalse(scanner.getRunStatus());
assertTrue("Throttle does not appear to be engaged", count > 0);
} finally {
cluster.shutdown();
}
}
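  /**
   * Run one throttled scan and return the ratio of time the report compiler
   * threads spent waiting on the throttle to time spent running.
   */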
private float runThrottleTest(int blocks)
throws IOException, InterruptedException, TimeoutException {
scanner.setRetainDiffs(true);
scan(blocks, 0, 0, 0, 0, 0);
scanner.shutdown();
assertFalse(scanner.getRunStatus());
return (float) scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
}
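  /**
   * Verify that a replica discovered on disk was added to the in-memory map
   * with the expected file name, generation stamp, and length.
   */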
private void verifyAddition(long blockId, long genStamp, long size) {
final ReplicaInfo replicainfo;
replicainfo = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
assertNotNull(replicainfo);
// Added block has the same file as the one created by the test
File file = new File(getBlockFile(blockId));
assertEquals(file.getName(),
FsDatasetTestUtil.getFile(fds, bpid, blockId).getName());
// Generation stamp is same as that of created file
assertEquals(genStamp, replicainfo.getGenerationStamp());
// File size matches
assertEquals(size, replicainfo.getNumBytes());
}
private void verifyDeletion(long blockId) {
// Ensure block does not exist in memory
assertNull(FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId));
}
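  /** Verify the generation stamp recorded in memory for the given block. */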
private void verifyGenStamp(long blockId, long genStamp) {
final ReplicaInfo memBlock;
memBlock = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
assertNotNull(memBlock);
assertEquals(genStamp, memBlock.getGenerationStamp());
}
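  /** Verify whether the in-memory replica resides on transient storage. */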
private void verifyStorageType(long blockId, boolean expectTransient) {
final ReplicaInfo memBlock;
memBlock = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
assertNotNull(memBlock);
assertThat(memBlock.getVolume().isTransientStorage(), is(expectTransient));
}
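  /**
   * A minimal stub {@link FsVolumeSpi} used to construct
   * {@link FsVolumeSpi.ScanInfo} objects without a real dataset; most
   * operations are no-ops or unsupported.
   */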
private static class TestFsVolumeSpi implements FsVolumeSpi {
@Override
public String[] getBlockPoolList() {
return new String[0];
}
@Override
public FsVolumeReference obtainReference() throws ClosedChannelException {
return null;
}
@Override
public long getAvailable() throws IOException {
return 0;
}
public File getFinalizedDir(String bpid) throws IOException {
return new File("/base/current/" + bpid + "/finalized");
}
@Override
public StorageType getStorageType() {
return StorageType.DEFAULT;
}
@Override
public String getStorageID() {
return "";
}
@Override
public boolean isTransientStorage() {
return false;
}
@Override
public boolean isRAMStorage() {
return false;
}
@Override
public void reserveSpaceForReplica(long bytesToReserve) {
}
@Override
public void releaseReservedSpace(long bytesToRelease) {
}
@Override
public void releaseLockedMemory(long bytesToRelease) {
}
@Override
public BlockIterator newBlockIterator(String bpid, String name) {
throw new UnsupportedOperationException();
}
@Override
public BlockIterator loadBlockIterator(String bpid, String name)
throws IOException {
throw new UnsupportedOperationException();
}
@SuppressWarnings("rawtypes")
@Override
public FsDatasetSpi getDataset() {
throw new UnsupportedOperationException();
}
@Override
public StorageLocation getStorageLocation() {
return null;
}
@Override
public URI getBaseURI() {
return (new File("/base")).toURI();
}
@Override
public DF getUsageStats(Configuration conf) {
return null;
}
@Override
public byte[] loadLastPartialChunkChecksum(File blockFile, File metaFile)
throws IOException {
return null;
}
@Override
public void compileReport(String bpid,
Collection<ScanInfo> report, ReportCompiler reportCompiler)
throws InterruptedException, IOException {
}
@Override
public FileIoProvider getFileIoProvider() {
return null;
}
@Override
public DataNodeVolumeMetrics getMetrics() {
return null;
}
@Override
public VolumeCheckResult check(VolumeCheckContext context)
throws Exception {
return VolumeCheckResult.HEALTHY;
}
}
private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();
private final static String BPID_1 = "BP-783049782-127.0.0.1-1370971773491";
private final static String BPID_2 = "BP-367845636-127.0.0.1-5895645674231";
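  /**
   * Construct a {@link FsVolumeSpi.ScanInfo} and verify that the block and
   * meta file paths resolve against the given base directory.
   */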
void testScanInfoObject(long blockId, File baseDir, String blockFile,
String metaFile)
throws Exception {
FsVolumeSpi.ScanInfo scanInfo =
new FsVolumeSpi.ScanInfo(blockId, baseDir, blockFile, metaFile,
TEST_VOLUME);
assertEquals(blockId, scanInfo.getBlockId());
if (blockFile != null) {
assertEquals(new File(baseDir, blockFile).getAbsolutePath(),
scanInfo.getBlockFile().getAbsolutePath());
} else {
assertNull(scanInfo.getBlockFile());
}
if (metaFile != null) {
assertEquals(new File(baseDir, metaFile).getAbsolutePath(),
scanInfo.getMetaFile().getAbsolutePath());
} else {
assertNull(scanInfo.getMetaFile());
}
assertEquals(TEST_VOLUME, scanInfo.getVolume());
}
void testScanInfoObject(long blockId) throws Exception {
FsVolumeSpi.ScanInfo scanInfo =
new FsVolumeSpi.ScanInfo(blockId, null, null, null, null);
assertEquals(blockId, scanInfo.getBlockId());
assertNull(scanInfo.getBlockFile());
assertNull(scanInfo.getMetaFile());
}
@Test(timeout = 120000)
public void TestScanInfo() throws Exception {
testScanInfoObject(123,
new File(TEST_VOLUME.getFinalizedDir(BPID_1).getAbsolutePath()),
"blk_123", "blk_123__1001.meta");
testScanInfoObject(464,
new File(TEST_VOLUME.getFinalizedDir(BPID_1).getAbsolutePath()),
"blk_123", null);
testScanInfoObject(523,
new File(TEST_VOLUME.getFinalizedDir(BPID_1).getAbsolutePath()),
null, "blk_123__1009.meta");
testScanInfoObject(789, null, null, null);
testScanInfoObject(456);
testScanInfoObject(123,
new File(TEST_VOLUME.getFinalizedDir(BPID_2).getAbsolutePath()),
"blk_567", "blk_567__1004.meta");
}
  /**
   * Test exception handling during a directory scan. The directory scanner
   * should not abort scanning the remaining directories just because one of
   * them hit an error.
   */
@Test(timeout = 60000)
public void testExceptionHandlingWhileDirectoryScan() throws Exception {
Configuration conf = getConfiguration();
cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
bpid = cluster.getNamesystem().getBlockPoolId();
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
client = cluster.getFileSystem().getClient();
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
      // Add a file with 2 blocks
createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 2, false);
// Inject error on #getFinalizedDir() so that ReportCompiler#call() will
// hit exception while preparing the block info report list.
List<FsVolumeSpi> volumes = new ArrayList<>();
Iterator<FsVolumeSpi> iterator = fds.getFsVolumeReferences().iterator();
while (iterator.hasNext()) {
FsVolumeImpl volume = (FsVolumeImpl) iterator.next();
FsVolumeImpl spy = Mockito.spy(volume);
Mockito.doThrow(new IOException("Error while getFinalizedDir"))
.when(spy).getFinalizedDir(volume.getBlockPoolList()[0]);
volumes.add(spy);
}
FsVolumeReferences volReferences = new FsVolumeReferences(volumes);
FsDatasetSpi<? extends FsVolumeSpi> spyFds = Mockito.spy(fds);
Mockito.doReturn(volReferences).when(spyFds).getFsVolumeReferences();
scanner = new DirectoryScanner(spyFds, conf);
scanner.setRetainDiffs(true);
scanner.reconcile();
} finally {
if (scanner != null) {
scanner.shutdown();
scanner = null;
}
cluster.shutdown();
}
}
@Test
public void testDirectoryScannerInFederatedCluster() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration(getConfiguration());
// Create Federated cluster with two nameservices and one DN
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2))
.numDataNodes(1).build()) {
cluster.waitActive();
cluster.transitionToActive(1);
cluster.transitionToActive(3);
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
// Create one block in first nameservice
FileSystem fs = cluster.getFileSystem(1);
int bp1Files = 1;
writeFile(fs, bp1Files);
// Create two blocks in second nameservice
FileSystem fs2 = cluster.getFileSystem(3);
int bp2Files = 2;
writeFile(fs2, bp2Files);
// Call the Directory scanner
scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
scanner.reconcile();
// Check blocks in corresponding BP
GenericTestUtils.waitFor(() -> {
try {
bpid = cluster.getNamesystem(1).getBlockPoolId();
verifyStats(bp1Files, 0, 0, 0, 0, 0, 0);
bpid = cluster.getNamesystem(3).getBlockPoolId();
verifyStats(bp2Files, 0, 0, 0, 0, 0, 0);
} catch (AssertionError ex) {
return false;
}
return true;
}, 50, 2000);
} finally {
if (scanner != null) {
scanner.shutdown();
scanner = null;
}
}
}
private static final String SEP = System.getProperty("file.separator");
/**
* Test parsing LocalReplica. We should be able to find the replica's path
* even if the replica's dir doesn't match the idToBlockDir.
*/
@Test(timeout = 3000)
public void testLocalReplicaParsing() {
String baseDir = GenericTestUtils.getRandomizedTempPath();
long blkId = getRandomBlockId();
File blockDir = DatanodeUtil.idToBlockDir(new File(baseDir), blkId);
String subdir1 = new File(blockDir.getParent()).getName();
    // Test parsing a base dir without ./subdir/subdir components
LocalReplica.ReplicaDirInfo info =
LocalReplica.parseBaseDir(new File(baseDir), blkId);
assertEquals(baseDir, info.baseDirPath);
    assertFalse(info.hasSubidrs);
    // Test when the path has one subdir level and doesn't match idToBlockDir.
String pathWithOneSubdir = baseDir + SEP + subdir1;
info = LocalReplica.parseBaseDir(new File(pathWithOneSubdir), blkId);
assertEquals(pathWithOneSubdir, info.baseDirPath);
    assertFalse(info.hasSubidrs);
    // Test when the path includes a nonexistent subdir and doesn't match
    // idToBlockDir.
String badPath = baseDir + SEP + subdir1 + SEP + "subdir-not-exist";
info = LocalReplica.parseBaseDir(new File(badPath), blkId);
assertEquals(badPath, info.baseDirPath);
    assertFalse(info.hasSubidrs);
    // Test when the path matches idToBlockDir.
    info = LocalReplica.parseBaseDir(blockDir, blkId);
    assertEquals(baseDir, info.baseDirPath);
    assertTrue(info.hasSubidrs);
}
  /**
   * Test whether LocalReplica.updateWithReplica() can correct a wrongly
   * recorded replica location.
   */
@Test(timeout = 3000)
public void testLocalReplicaUpdateWithReplica() throws Exception {
String baseDir = GenericTestUtils.getRandomizedTempPath();
long blkId = getRandomBlockId();
File blockDir = DatanodeUtil.idToBlockDir(new File(baseDir), blkId);
String subdir2 = blockDir.getName();
String subdir1 = new File(blockDir.getParent()).getName();
String diskSub = subdir2.equals("subdir0") ? "subdir1" : "subdir0";
// the block file on disk
File diskBlockDir = new File(baseDir + SEP + subdir1 + SEP + diskSub);
File realBlkFile = new File(diskBlockDir, BLOCK_FILE_PREFIX + blkId);
// the block file in mem
File memBlockDir = blockDir;
LocalReplica localReplica = (LocalReplica) new ReplicaBuilder(
HdfsServerConstants.ReplicaState.FINALIZED)
.setDirectoryToUse(memBlockDir).setBlockId(blkId).build();
    // DirectoryScanner finds the inconsistent file and tries to make it right
StorageLocation sl = StorageLocation.parse(realBlkFile.toString());
localReplica.updateWithReplica(sl);
assertEquals(realBlkFile, localReplica.getBlockFile());
}
  public long getRandomBlockId() {
    // Mask the sign bit instead of calling Math.abs(), which returns a
    // negative value for Long.MIN_VALUE.
    return new Random().nextLong() & Long.MAX_VALUE;
  }
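  /** Create {@code numFiles} small files through the given file system. */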
private void writeFile(FileSystem fs, int numFiles) throws IOException {
final String fileName = "/" + GenericTestUtils.getMethodName();
for (int i = 0; i < numFiles; i++) {
final Path filePath = new Path(fileName + i);
DFSTestUtil.createFile(fs, filePath, 1, (short) 1, 0);
}
}
}