/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.datanode;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS;
import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER;
import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import com.google.common.base.Supplier;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.ScanResultHandler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.Statistics;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
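
/**
* End-to-end tests for the datanode {@link BlockScanner} and its per-volume
* {@link VolumeScanner} threads: volume block iteration, scan scheduling and
* rate limiting, corrupt and suspect block handling, and cursor persistence
* across datanode restarts.
*/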
public class TestBlockScanner {
public static final Logger LOG =
LoggerFactory.getLogger(TestBlockScanner.class);
@Before
public void before() {
BlockScanner.Conf.allowUnitTestSettings = true;
GenericTestUtils.setLogLevel(BlockScanner.LOG, Level.ALL);
GenericTestUtils.setLogLevel(VolumeScanner.LOG, Level.ALL);
GenericTestUtils.setLogLevel(FsVolumeImpl.LOG, Level.ALL);
}
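/**
* Disable the block scanner by setting its target bandwidth to zero bytes
* per second, which BlockScanner#isEnabled reports as disabled.
*/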
private static void disableBlockScanner(Configuration conf) {
conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 0L);
}
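/**
* Test harness that starts a single-datanode, single-storage MiniDFSCluster
* (optionally federated across several nameservices) and exposes what the
* tests need: the per-nameservice file systems and block pool ids, the
* datanode, its BlockScanner, and its volumes.
*
* A minimal usage sketch, using only the methods defined below:
* <pre>
* TestContext ctx = new TestContext(conf, 1);
* try {
*   ctx.createFiles(0, 10, 1);
*   ExtendedBlock block = ctx.getFileBlock(0, 0);
* } finally {
*   ctx.close();
* }
* </pre>
*/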
private static class TestContext implements Closeable {
final int numNameServices;
final MiniDFSCluster cluster;
final DistributedFileSystem[] dfs;
final String[] bpids;
final DataNode datanode;
final BlockScanner blockScanner;
final FsDatasetSpi<? extends FsVolumeSpi> data;
final FsDatasetSpi.FsVolumeReferences volumes;
TestContext(Configuration conf, int numNameServices) throws Exception {
this.numNameServices = numNameServices;
File basedir = new File(GenericTestUtils.getRandomizedTempPath());
MiniDFSCluster.Builder bld = new MiniDFSCluster.Builder(conf, basedir).
numDataNodes(1).
storagesPerDatanode(1);
if (numNameServices > 1) {
bld.nnTopology(MiniDFSNNTopology.
simpleFederatedTopology(numNameServices));
}
cluster = bld.build();
cluster.waitActive();
dfs = new DistributedFileSystem[numNameServices];
for (int i = 0; i < numNameServices; i++) {
dfs[i] = cluster.getFileSystem(i);
}
bpids = new String[numNameServices];
for (int i = 0; i < numNameServices; i++) {
bpids[i] = cluster.getNamesystem(i).getBlockPoolId();
}
datanode = cluster.getDataNodes().get(0);
blockScanner = datanode.getBlockScanner();
for (int i = 0; i < numNameServices; i++) {
dfs[i].mkdirs(new Path("/test"));
}
data = datanode.getFSDataset();
volumes = data.getFsVolumeReferences();
}
@Override
public void close() throws IOException {
volumes.close();
if (cluster != null) {
for (int i = 0; i < numNameServices; i++) {
dfs[i].delete(new Path("/test"), true);
dfs[i].close();
}
cluster.shutdown();
}
}
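/**
* Create {@code numFiles} files of {@code length} bytes each under /test in
* the given nameservice, with replication 1 and a fixed seed.
*/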
public void createFiles(int nsIdx, int numFiles, int length)
throws Exception {
for (int blockIdx = 0; blockIdx < numFiles; blockIdx++) {
DFSTestUtil.createFile(dfs[nsIdx], getPath(blockIdx), length,
(short)1, 123L);
}
}
public Path getPath(int fileIdx) {
return new Path("/test/" + fileIdx);
}
public ExtendedBlock getFileBlock(int nsIdx, int fileIdx)
throws Exception {
return DFSTestUtil.getFirstBlock(dfs[nsIdx], getPath(fileIdx));
}
public MaterializedReplica getMaterializedReplica(int nsIdx, int fileIdx)
throws Exception {
return cluster.getMaterializedReplica(0, getFileBlock(nsIdx, fileIdx));
}
}
/**
* Test iterating through a bunch of blocks in a volume using a volume
* iterator.
* <p>
* We save the iterator about a third of the way through the blocks, rewind
* it about halfway through, then reload the saved iterator about two thirds
* of the way through and verify that it resumes at the block where it was
* saved.
*
* @param numFiles The number of files to create.
* @param maxStaleness The maximum staleness to allow with the iterator.
* @throws Exception
*/
private void testVolumeIteratorImpl(int numFiles,
long maxStaleness) throws Exception {
Configuration conf = new Configuration();
disableBlockScanner(conf);
TestContext ctx = new TestContext(conf, 1);
ctx.createFiles(0, numFiles, 1);
assertEquals(1, ctx.volumes.size());
FsVolumeSpi volume = ctx.volumes.get(0);
ExtendedBlock savedBlock = null, loadedBlock = null;
boolean testedRewind = false, testedSave = false, testedLoad = false;
int blocksProcessed = 0, savedBlocksProcessed = 0;
try {
List<BPOfferService> bpos = ctx.datanode.getAllBpOs();
assertEquals(1, bpos.size());
BlockIterator iter = volume.newBlockIterator(ctx.bpids[0], "test");
assertEquals(ctx.bpids[0], iter.getBlockPoolId());
iter.setMaxStalenessMs(maxStaleness);
while (true) {
HashSet<ExtendedBlock> blocks = new HashSet<ExtendedBlock>();
for (int blockIdx = 0; blockIdx < numFiles; blockIdx++) {
blocks.add(ctx.getFileBlock(0, blockIdx));
}
while (true) {
ExtendedBlock block = iter.nextBlock();
if (block == null) {
break;
}
blocksProcessed++;
LOG.info("BlockIterator for {} found block {}, blocksProcessed = {}",
volume, block, blocksProcessed);
if (testedSave && (savedBlock == null)) {
savedBlock = block;
}
if (testedLoad && (loadedBlock == null)) {
loadedBlock = block;
// The block that we get back right after loading the iterator
// should be the same block we got back right after saving
// the iterator.
assertEquals(savedBlock, loadedBlock);
}
boolean blockRemoved = blocks.remove(block);
assertTrue("Found unknown block " + block, blockRemoved);
if (blocksProcessed > (numFiles / 3)) {
if (!testedSave) {
LOG.info("Processed {} blocks out of {}. Saving iterator.",
blocksProcessed, numFiles);
iter.save();
testedSave = true;
savedBlocksProcessed = blocksProcessed;
}
}
if (blocksProcessed > (numFiles / 2)) {
if (!testedRewind) {
LOG.info("Processed {} blocks out of {}. Rewinding iterator.",
blocksProcessed, numFiles);
iter.rewind();
break;
}
}
if (blocksProcessed > ((2 * numFiles) / 3)) {
if (!testedLoad) {
LOG.info("Processed {} blocks out of {}. Loading iterator.",
blocksProcessed, numFiles);
iter = volume.loadBlockIterator(ctx.bpids[0], "test");
iter.setMaxStalenessMs(maxStaleness);
break;
}
}
}
if (!testedRewind) {
testedRewind = true;
blocksProcessed = 0;
LOG.info("Starting again at the beginning...");
continue;
}
if (!testedLoad) {
testedLoad = true;
blocksProcessed = savedBlocksProcessed;
LOG.info("Starting again at the load point...");
continue;
}
assertEquals(numFiles, blocksProcessed);
break;
}
} finally {
ctx.close();
}
}
@Test(timeout=60000)
public void testVolumeIteratorWithoutCaching() throws Exception {
testVolumeIteratorImpl(5, 0);
}
@Test(timeout=300000)
public void testVolumeIteratorWithCaching() throws Exception {
testVolumeIteratorImpl(600, 100);
}
@Test(timeout=60000)
public void testDisableVolumeScanner() throws Exception {
Configuration conf = new Configuration();
disableBlockScanner(conf);
TestContext ctx = new TestContext(conf, 1);
try {
Assert.assertFalse(ctx.datanode.getBlockScanner().isEnabled());
} finally {
ctx.close();
}
}
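/**
* A ScanResultHandler for tests. Each volume gets an Info record in a shared
* map; a test flips {@code shouldRun} to release the scanner thread,
* optionally throttles it with a Semaphore, and then inspects the good and
* bad block sets the handler accumulates. The typical pattern, as used by
* the tests below:
* <pre>
* conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
*     TestScanResultHandler.class.getName());
* ...
* TestScanResultHandler.Info info =
*     TestScanResultHandler.getInfo(ctx.volumes.get(0));
* synchronized (info) {
*   info.shouldRun = true;
*   info.notify();
* }
* </pre>
*/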
public static class TestScanResultHandler extends ScanResultHandler {
static class Info {
boolean shouldRun = false;
final Set<ExtendedBlock> badBlocks = new HashSet<ExtendedBlock>();
final Set<ExtendedBlock> goodBlocks = new HashSet<ExtendedBlock>();
long blocksScanned = 0;
Semaphore sem = null;
@Override
public String toString() {
final StringBuilder bld = new StringBuilder();
bld.append("ScanResultHandler.Info{");
bld.append("shouldRun=").append(shouldRun).append(", ");
bld.append("blocksScanned=").append(blocksScanned).append(", ");
bld.append("sem#availablePermits=").append(sem.availablePermits()).
append(", ");
bld.append("badBlocks=").append(badBlocks).append(", ");
bld.append("goodBlocks=").append(goodBlocks);
bld.append("}");
return bld.toString();
}
}
private VolumeScanner scanner;
final static ConcurrentHashMap<String, Info> infos =
new ConcurrentHashMap<String, Info>();
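/**
* Get the Info record for a volume, keyed by its storage ID, creating it
* atomically on first use.
*/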
static Info getInfo(FsVolumeSpi volume) {
Info newInfo = new Info();
Info prevInfo = infos.
putIfAbsent(volume.getStorageID(), newInfo);
return prevInfo == null ? newInfo : prevInfo;
}
@Override
public void setup(VolumeScanner scanner) {
this.scanner = scanner;
Info info = getInfo(scanner.volume);
LOG.info("about to start scanning.");
synchronized (info) {
while (!info.shouldRun) {
try {
info.wait();
} catch (InterruptedException e) {
// Ignore the interrupt and re-check shouldRun.
}
}
}
LOG.info("starting scanning.");
}
@Override
public void handle(ExtendedBlock block, IOException e) {
LOG.info("handling block {} (exception {})", block, e);
Info info = getInfo(scanner.volume);
Semaphore sem;
synchronized (info) {
sem = info.sem;
}
if (sem != null) {
try {
sem.acquire();
} catch (InterruptedException ie) {
throw new RuntimeException("interrupted");
}
}
synchronized (info) {
if (!info.shouldRun) {
throw new RuntimeException("stopping volumescanner thread.");
}
if (e == null) {
info.goodBlocks.add(block);
} else {
info.badBlocks.add(block);
}
info.blocksScanned++;
}
}
}
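/**
* Create 10 one-byte files and wait until the scanner has seen every block.
* With {@code rescan}, the internal scan period is shortened to 100 ms so
* that every block is scanned at least twice; otherwise the period is 100
* hours, so each block is scanned exactly once and the volume statistics
* can be checked deterministically.
*/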
private void testScanAllBlocksImpl(final boolean rescan) throws Exception {
Configuration conf = new Configuration();
conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 1048576L);
if (rescan) {
conf.setLong(INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS, 100L);
} else {
conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
}
conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
TestScanResultHandler.class.getName());
final TestContext ctx = new TestContext(conf, 1);
final int NUM_EXPECTED_BLOCKS = 10;
ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 1);
final Set<ExtendedBlock> expectedBlocks = new HashSet<ExtendedBlock>();
for (int i = 0; i < NUM_EXPECTED_BLOCKS; i++) {
expectedBlocks.add(ctx.getFileBlock(0, i));
}
TestScanResultHandler.Info info =
TestScanResultHandler.getInfo(ctx.volumes.get(0));
synchronized (info) {
info.shouldRun = true;
info.notify();
}
GenericTestUtils.waitFor(new Supplier<Boolean>(){
@Override
public Boolean get() {
TestScanResultHandler.Info info =
TestScanResultHandler.getInfo(ctx.volumes.get(0));
int numFoundBlocks = 0;
StringBuilder foundBlocksBld = new StringBuilder();
String prefix = "";
synchronized (info) {
for (ExtendedBlock block : info.goodBlocks) {
assertTrue(expectedBlocks.contains(block));
numFoundBlocks++;
foundBlocksBld.append(prefix).append(block);
prefix = ", ";
}
LOG.info("numFoundBlocks = {}. blocksScanned = {}. Found blocks {}",
numFoundBlocks, info.blocksScanned, foundBlocksBld.toString());
if (rescan) {
return (numFoundBlocks == NUM_EXPECTED_BLOCKS) &&
(info.blocksScanned >= 2 * NUM_EXPECTED_BLOCKS);
} else {
return numFoundBlocks == NUM_EXPECTED_BLOCKS;
}
}
}
}, 10, 60000);
if (!rescan) {
synchronized (info) {
assertEquals(NUM_EXPECTED_BLOCKS, info.blocksScanned);
}
Statistics stats = ctx.blockScanner.getVolumeStats(
ctx.volumes.get(0).getStorageID());
assertEquals(5 * NUM_EXPECTED_BLOCKS, stats.bytesScannedInPastHour);
assertEquals(NUM_EXPECTED_BLOCKS, stats.blocksScannedSinceRestart);
assertEquals(NUM_EXPECTED_BLOCKS, stats.blocksScannedInCurrentPeriod);
assertEquals(0, stats.scanErrorsSinceRestart);
assertEquals(1, stats.scansSinceRestart);
}
ctx.close();
}
/**
* Test scanning all blocks. Set the scan period high enough that
* we shouldn't rescan any block during this test.
*/
@Test(timeout=60000)
public void testScanAllBlocksNoRescan() throws Exception {
testScanAllBlocksImpl(false);
}
/**
* Test scanning all blocks. Set the scan period low enough that
* we should rescan all blocks at least twice during this test.
*/
@Test(timeout=60000)
public void testScanAllBlocksWithRescan() throws Exception {
testScanAllBlocksImpl(true);
}
/**
* Test that we don't scan too many blocks per second.
*/
@Test(timeout=120000)
public void testScanRateLimit() throws Exception {
Configuration conf = new Configuration();
// Limit scan bytes per second dramatically
conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 4096L);
// Scan continuously
conf.setLong(INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS, 1L);
conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
TestScanResultHandler.class.getName());
final TestContext ctx = new TestContext(conf, 1);
final int NUM_EXPECTED_BLOCKS = 5;
ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 4096);
final TestScanResultHandler.Info info =
TestScanResultHandler.getInfo(ctx.volumes.get(0));
long startMs = Time.monotonicNow();
synchronized (info) {
info.shouldRun = true;
info.notify();
}
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (info) {
return info.blocksScanned > 0;
}
}
}, 1, 30000);
Thread.sleep(2000);
synchronized (info) {
long endMs = Time.monotonicNow();
// Should scan no more than one block a second.
long seconds = ((endMs + 999 - startMs) / 1000);
long maxBlocksScanned = seconds * 1;
assertTrue("The number of blocks scanned is too large. Scanned " +
info.blocksScanned + " blocks; only expected to scan at most " +
maxBlocksScanned + " in " + seconds + " seconds.",
info.blocksScanned <= maxBlocksScanned);
}
ctx.close();
}
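/**
* Test that a replica corrupted on disk is reported through the scan result
* handler as a bad block, while all other blocks are reported good.
*/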
@Test(timeout=120000)
public void testCorruptBlockHandling() throws Exception {
Configuration conf = new Configuration();
conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
TestScanResultHandler.class.getName());
final TestContext ctx = new TestContext(conf, 1);
final int NUM_EXPECTED_BLOCKS = 5;
final int CORRUPT_INDEX = 3;
ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 4);
ExtendedBlock badBlock = ctx.getFileBlock(0, CORRUPT_INDEX);
ctx.cluster.corruptBlockOnDataNodes(badBlock);
final TestScanResultHandler.Info info =
TestScanResultHandler.getInfo(ctx.volumes.get(0));
synchronized (info) {
info.shouldRun = true;
info.notify();
}
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (info) {
return info.blocksScanned == NUM_EXPECTED_BLOCKS;
}
}
}, 3, 30000);
synchronized (info) {
assertTrue(info.badBlocks.contains(badBlock));
for (int i = 0; i < NUM_EXPECTED_BLOCKS; i++) {
if (i != CORRUPT_INDEX) {
ExtendedBlock block = ctx.getFileBlock(0, i);
assertTrue(info.goodBlocks.contains(block));
}
}
}
ctx.close();
}
/**
* Test that we save the scan cursor when shutting down the datanode, and
* restart scanning from there when the datanode is restarted.
*/
@Test(timeout=120000)
public void testDatanodeCursor() throws Exception {
Configuration conf = new Configuration();
conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
TestScanResultHandler.class.getName());
conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
final TestContext ctx = new TestContext(conf, 1);
final int NUM_EXPECTED_BLOCKS = 10;
ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 1);
final TestScanResultHandler.Info info =
TestScanResultHandler.getInfo(ctx.volumes.get(0));
synchronized (info) {
info.sem = new Semaphore(5);
info.shouldRun = true;
info.notify();
}
// Scan the first 5 blocks
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (info) {
return info.blocksScanned == 5;
}
}
}, 3, 30000);
synchronized (info) {
assertEquals(5, info.goodBlocks.size());
assertEquals(5, info.blocksScanned);
info.shouldRun = false;
}
ctx.datanode.shutdown();
String vPath = ctx.volumes.get(0).getBasePath();
File cursorPath = new File(new File(new File(vPath, "current"),
ctx.bpids[0]), "scanner.cursor");
assertTrue("Failed to find cursor save file in " +
cursorPath.getAbsolutePath(), cursorPath.exists());
Set<ExtendedBlock> prevGoodBlocks = new HashSet<ExtendedBlock>();
synchronized (info) {
info.sem = new Semaphore(4);
prevGoodBlocks.addAll(info.goodBlocks);
info.goodBlocks.clear();
}
// The block that we were scanning when we shut down the DN won't get
// recorded.
// After restarting the datanode, we should scan the next 4 blocks.
ctx.cluster.restartDataNode(0);
synchronized (info) {
info.shouldRun = true;
info.notify();
}
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (info) {
if (info.blocksScanned != 9) {
LOG.info("Waiting for blocksScanned to reach 9. It is at {}",
info.blocksScanned);
}
return info.blocksScanned == 9;
}
}
}, 3, 30000);
synchronized (info) {
assertEquals(4, info.goodBlocks.size());
info.goodBlocks.addAll(prevGoodBlocks);
assertEquals(9, info.goodBlocks.size());
assertEquals(9, info.blocksScanned);
}
ctx.datanode.shutdown();
// After restarting the datanode, we should not scan any more blocks.
// This is because we reached the end of the block pool earlier, and
// the scan period is much, much longer than the test time.
synchronized (info) {
info.sem = null;
info.shouldRun = false;
info.goodBlocks.clear();
}
ctx.cluster.restartDataNode(0);
synchronized (info) {
info.shouldRun = true;
info.notify();
}
Thread.sleep(3000);
synchronized (info) {
assertTrue(info.goodBlocks.isEmpty());
}
ctx.close();
}
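/**
* Test scanning a datanode that serves three federated block pools. The
* scanner should reach the end of every block pool (three scans since
* restart) and account for each block and byte exactly once.
*/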
@Test(timeout=120000)
public void testMultipleBlockPoolScanning() throws Exception {
Configuration conf = new Configuration();
conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
TestScanResultHandler.class.getName());
final TestContext ctx = new TestContext(conf, 3);
// We scan 5 bytes per file (1 byte in file, 4 bytes of checksum)
final int BYTES_SCANNED_PER_FILE = 5;
int TOTAL_FILES = 16;
ctx.createFiles(0, TOTAL_FILES, 1);
// start scanning
final TestScanResultHandler.Info info =
TestScanResultHandler.getInfo(ctx.volumes.get(0));
synchronized (info) {
info.shouldRun = true;
info.notify();
}
// Wait for all the block pools to be scanned.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (info) {
Statistics stats = ctx.blockScanner.getVolumeStats(
ctx.volumes.get(0).getStorageID());
if (stats.scansSinceRestart < 3) {
LOG.info("Waiting for scansSinceRestart to reach 3 (it is {})",
stats.scansSinceRestart);
return false;
}
if (!stats.eof) {
LOG.info("Waiting for eof.");
return false;
}
return true;
}
}
}, 3, 30000);
Statistics stats = ctx.blockScanner.getVolumeStats(
ctx.volumes.get(0).getStorageID());
assertEquals(TOTAL_FILES, stats.blocksScannedSinceRestart);
assertEquals(BYTES_SCANNED_PER_FILE * TOTAL_FILES,
stats.bytesScannedInPastHour);
ctx.close();
}
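/**
* FsVolumeImpl#nextSorted should return the first entry in a sorted list
* that is strictly greater than the given key; a null or empty key starts
* from the beginning, and a key past the last entry yields null.
*/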
@Test(timeout=120000)
public void testNextSorted() throws Exception {
List<String> arr = new LinkedList<String>();
arr.add("1");
arr.add("3");
arr.add("5");
arr.add("7");
Assert.assertEquals("3", FsVolumeImpl.nextSorted(arr, "2"));
Assert.assertEquals("3", FsVolumeImpl.nextSorted(arr, "1"));
Assert.assertEquals("1", FsVolumeImpl.nextSorted(arr, ""));
Assert.assertEquals("1", FsVolumeImpl.nextSorted(arr, null));
Assert.assertEquals(null, FsVolumeImpl.nextSorted(arr, "9"));
}
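/**
* Unit-test the scan scheduling arithmetic directly. The scanner's hourly
* budget is targetBytesPerSec * 3600; for example, at 100000 bytes/s the
* budget is 360,000,000 bytes, so 354,000,000 bytes scanned in the past
* hour still permits scanning, while 365,000,000 does not.
*/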
@Test(timeout=120000)
public void testCalculateNeededBytesPerSec() throws Exception {
// If we didn't check anything the last hour, we should scan now.
Assert.assertTrue(
VolumeScanner.calculateShouldScan("test", 100, 0, 0, 60));
// If, on average, we checked 101 bytes/s during the last hour,
// stop checking now.
Assert.assertFalse(VolumeScanner.
calculateShouldScan("test", 100, 101 * 3600, 1000, 5000));
// Target is 1 byte / s, but we didn't scan anything in the last minute.
// Should scan now.
Assert.assertTrue(VolumeScanner.
calculateShouldScan("test", 1, 3540, 0, 60));
// Target is 100000 bytes / s, but we didn't scan anything in the last
// minute. Should scan now.
Assert.assertTrue(VolumeScanner.
calculateShouldScan("test", 100000L, 354000000L, 0, 60));
Assert.assertFalse(VolumeScanner.
calculateShouldScan("test", 100000L, 365000000L, 0, 60));
}
/**
* Test that we can mark certain blocks as suspect, and get them quickly
* rescanned that way. See HDFS-7686 and HDFS-7548.
*/
@Test(timeout=120000)
public void testMarkSuspectBlock() throws Exception {
Configuration conf = new Configuration();
// Set a really long scan period.
conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
TestScanResultHandler.class.getName());
conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
final TestContext ctx = new TestContext(conf, 1);
final int NUM_EXPECTED_BLOCKS = 10;
ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 1);
final TestScanResultHandler.Info info =
TestScanResultHandler.getInfo(ctx.volumes.get(0));
String storageID = ctx.volumes.get(0).getStorageID();
synchronized (info) {
info.sem = new Semaphore(4);
info.shouldRun = true;
info.notify();
}
// Scan the first 4 blocks
LOG.info("Waiting for the first 4 blocks to be scanned.");
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (info) {
if (info.blocksScanned >= 4) {
LOG.info("info = {}. blockScanned has now reached 4.", info);
return true;
} else {
LOG.info("info = {}. Waiting for blockScanned to reach 4.", info);
return false;
}
}
}
}, 50, 30000);
// We should have scanned 4 blocks
synchronized (info) {
assertEquals("Expected 4 good blocks.", 4, info.goodBlocks.size());
info.goodBlocks.clear();
assertEquals("Expected 4 blocksScanned", 4, info.blocksScanned);
assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size());
info.blocksScanned = 0;
}
ExtendedBlock first = ctx.getFileBlock(0, 0);
ctx.datanode.getBlockScanner().markSuspectBlock(storageID, first);
// When we increment the semaphore, the TestScanResultHandler will finish
// adding the block that it was scanning previously (the 5th block).
// We increment the semaphore twice so that the handler will also
// get a chance to see the suspect block which we just requested the
// VolumeScanner to process.
info.sem.release(2);
LOG.info("Waiting for 2 more blocks to be scanned.");
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (info) {
if (info.blocksScanned >= 2) {
LOG.info("info = {}. blockScanned has now reached 2.", info);
return true;
} else {
LOG.info("info = {}. Waiting for blockScanned to reach 2.", info);
return false;
}
}
}
}, 50, 30000);
synchronized (info) {
assertTrue("Expected block " + first + " to have been scanned.",
info.goodBlocks.contains(first));
assertEquals(2, info.goodBlocks.size());
info.goodBlocks.clear();
assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size());
assertEquals(2, info.blocksScanned);
info.blocksScanned = 0;
}
// Re-mark the same block as suspect.
ctx.datanode.getBlockScanner().markSuspectBlock(storageID, first);
info.sem.release(10);
LOG.info("Waiting for 5 more blocks to be scanned.");
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (info) {
if (info.blocksScanned >= 5) {
LOG.info("info = {}. blockScanned has now reached 5.", info);
return true;
} else {
LOG.info("info = {}. Waiting for blockScanned to reach 5.", info);
return false;
}
}
}
}, 50, 30000);
synchronized (info) {
assertEquals(5, info.goodBlocks.size());
assertEquals(0, info.badBlocks.size());
assertEquals(5, info.blocksScanned);
// We should not have rescanned the "suspect block",
// because it was recently rescanned by the suspect block system.
// This is a test of the "suspect block" rate limiting.
Assert.assertFalse("We should not " +
"have rescanned block " + first + ", because it should have been " +
"in recentSuspectBlocks.", info.goodBlocks.contains(first));
info.blocksScanned = 0;
}
ctx.close();
}
/**
* Test that blocks which are in the wrong location are ignored.
*/
@Test(timeout=120000)
public void testIgnoreMisplacedBlock() throws Exception {
Configuration conf = new Configuration();
// Set a really long scan period.
conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
TestScanResultHandler.class.getName());
conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
final TestContext ctx = new TestContext(conf, 1);
final int NUM_FILES = 4;
ctx.createFiles(0, NUM_FILES, 5);
MaterializedReplica unreachableReplica = ctx.getMaterializedReplica(0, 1);
ExtendedBlock unreachableBlock = ctx.getFileBlock(0, 1);
unreachableReplica.makeUnreachable();
final TestScanResultHandler.Info info =
TestScanResultHandler.getInfo(ctx.volumes.get(0));
String storageID = ctx.volumes.get(0).getStorageID();
synchronized (info) {
info.sem = new Semaphore(NUM_FILES);
info.shouldRun = true;
info.notify();
}
// Wait for the 3 reachable blocks to be scanned. The misplaced block
// should be ignored entirely.
LOG.info("Waiting for the blocks to be scanned.");
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (info) {
if (info.blocksScanned >= NUM_FILES - 1) {
LOG.info("info = {}. blockScanned has now reached " +
info.blocksScanned, info);
return true;
} else {
LOG.info("info = {}. Waiting for blockScanned to reach " +
(NUM_FILES - 1), info);
return false;
}
}
}
}, 50, 30000);
// We should have scanned the 3 reachable blocks, ignoring the misplaced one
synchronized (info) {
assertFalse(info.goodBlocks.contains(unreachableBlock));
assertFalse(info.badBlocks.contains(unreachableBlock));
assertEquals("Expected 3 good blocks.", 3, info.goodBlocks.size());
info.goodBlocks.clear();
assertEquals("Expected 3 blocksScanned", 3, info.blocksScanned);
assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size());
info.blocksScanned = 0;
}
info.sem.release(1);
ctx.close();
}
/**
* Test concurrent append and scan.
* @throws Exception
*/
@Test(timeout=120000)
public void testAppendWhileScanning() throws Exception {
GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
Configuration conf = new Configuration();
// throttle the block scanner: 1MB per second
conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 1048576);
// Set a really long scan period.
conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
TestScanResultHandler.class.getName());
conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
final int numExpectedFiles = 1;
final int numExpectedBlocks = 1;
final int numNameServices = 1;
// The initial file length cannot be too small. Otherwise the checksum
// file stream buffer will be pre-filled, and BlockSender will not see
// the updated checksum.
final int initialFileLength = 2*1024*1024+100;
final TestContext ctx = new TestContext(conf, numNameServices);
// create one file, with one block.
ctx.createFiles(0, numExpectedFiles, initialFileLength);
final TestScanResultHandler.Info info =
TestScanResultHandler.getInfo(ctx.volumes.get(0));
String storageID = ctx.volumes.get(0).getStorageID();
synchronized (info) {
info.sem = new Semaphore(numExpectedBlocks*2);
info.shouldRun = true;
info.notify();
}
// VolumeScanner scans the first block when DN starts.
// Due to throttler, this should take approximately 2 seconds.
waitForRescan(info, numExpectedBlocks);
// Update the throttler to schedule a rescan immediately. This rate must
// be larger than the initial file length; otherwise the throttler
// prevents an immediate rescan.
conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND,
initialFileLength+32*1024);
BlockScanner.Conf newConf = new BlockScanner.Conf(conf);
ctx.datanode.getBlockScanner().setConf(newConf);
// schedule the first block for scanning
ExtendedBlock first = ctx.getFileBlock(0, 0);
ctx.datanode.getBlockScanner().markSuspectBlock(storageID, first);
// append the file before VolumeScanner completes scanning the block,
// which takes approximately 2 seconds to complete.
FileSystem fs = ctx.cluster.getFileSystem();
FSDataOutputStream os = fs.append(ctx.getPath(0));
long seed = -1;
int size = 200;
final byte[] bytes = AppendTestUtil.randomBytes(seed, size);
os.write(bytes);
os.hflush();
os.close();
// verify that volume scanner does not find bad blocks after append.
waitForRescan(info, numExpectedBlocks);
GenericTestUtils.setLogLevel(DataNode.LOG, Level.INFO);
ctx.close();
}
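/**
* Wait until the scanner has scanned {@code numExpectedBlocks} blocks,
* verify that they were all good, and reset the counters for the next pass.
*/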
private void waitForRescan(final TestScanResultHandler.Info info,
final int numExpectedBlocks)
throws TimeoutException, InterruptedException {
LOG.info("Waiting for the first 1 blocks to be scanned.");
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (info) {
if (info.blocksScanned >= numExpectedBlocks) {
LOG.info("info = {}. blockScanned has now reached 1.", info);
return true;
} else {
LOG.info("info = {}. Waiting for blockScanned to reach 1.", info);
return false;
}
}
}
}, 1000, 30000);
synchronized (info) {
assertEquals("Expected 1 good block.",
numExpectedBlocks, info.goodBlocks.size());
info.goodBlocks.clear();
assertEquals("Expected 1 blocksScanned",
numExpectedBlocks, info.blocksScanned);
assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size());
info.blocksScanned = 0;
}
}
}