blob: d684950426fcfef13cf3230b5e0b1c1f4a10af01 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.FakeTimer;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Tests for {@link FsDatasetImpl}.
 *
 * <p>{@link #setUp()} builds an {@code FsDatasetImpl} on top of a mocked
 * {@link DataNode} and a mocked {@link DataStorage} that reports
 * {@code NUM_INIT_VOLUMES} storage directories under {@code BASE_DIR}, and
 * adds every pool in {@code BLOCK_POOL_IDS} to it. Tests that need real block
 * I/O start their own {@link MiniDFSCluster} instead.
 */
public class TestFsDatasetImpl {
  static final Logger LOG = LoggerFactory.getLogger(TestFsDatasetImpl.class);
  private static final String BASE_DIR =
      new FileSystemTestHelper().getTestRootDir();
  private static final int NUM_INIT_VOLUMES = 2;
  private static final String CLUSTER_ID = "cluser-id";
  private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"};

  // Use to generate storageUuid
  private static final DataStorage dsForStorageUuid = new DataStorage(
      new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE));

  private Configuration conf;
  private DataNode datanode;
  private DataStorage storage;
  private FsDatasetImpl dataset;

  private final static String BLOCKPOOL = "BP-TEST";

  /**
   * Creates a {@link Storage.StorageDirectory} rooted at {@code root} and
   * calls {@link DataStorage#createStorageID} on it.
   *
   * @param root directory backing the storage directory
   * @param conf configuration passed to {@code createStorageID}
   * @return the new storage directory
   */
  private static Storage.StorageDirectory createStorageDirectory(File root,
      Configuration conf)
      throws SecurityException, IOException {
    Storage.StorageDirectory sd = new Storage.StorageDirectory(
        StorageLocation.parse(root.toURI().toString()));
    DataStorage.createStorageID(sd, false, conf);
    return sd;
  }

  /**
   * Wipes {@code BASE_DIR}, creates {@code numDirs} data directories under it,
   * stubs the mocked {@code storage} to return them, and writes their
   * comma-separated URIs to {@link DFSConfigKeys#DFS_DATANODE_DATA_DIR_KEY}.
   *
   * @param storage mocked {@link DataStorage} to stub
   * @param conf configuration that receives the data-dir list
   * @param numDirs number of directories to create
   */
  private static void createStorageDirs(DataStorage storage, Configuration conf,
      int numDirs) throws IOException {
    List<Storage.StorageDirectory> dirs = new ArrayList<>();
    List<String> dirStrings = new ArrayList<>();
    FileUtils.deleteDirectory(new File(BASE_DIR));
    for (int i = 0; i < numDirs; i++) {
      File loc = new File(BASE_DIR + "/data" + i);
      dirStrings.add(new Path(loc.toString()).toUri().toString());
      loc.mkdirs();
      dirs.add(createStorageDirectory(loc, conf));
      when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
    }
    String dataDir = StringUtils.join(",", dirStrings);
    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir);
    when(storage.dirIterator()).thenReturn(dirs.iterator());
    when(storage.getNumStorageDirs()).thenReturn(numDirs);
  }

  /**
   * @return the number of volumes currently in {@code dataset}, or 0 if the
   *     volume references could not be obtained or closed
   */
  private int getNumVolumes() {
    try (FsDatasetSpi.FsVolumeReferences volumes =
        dataset.getFsVolumeReferences()) {
      return volumes.size();
    } catch (IOException e) {
      return 0;
    }
  }

  @Before
  public void setUp() throws IOException {
    datanode = mock(DataNode.class);
    storage = mock(DataStorage.class);
    this.conf = new Configuration();
    this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
    when(datanode.getConf()).thenReturn(conf);
    final DNConf dnConf = new DNConf(datanode);
    when(datanode.getDnConf()).thenReturn(dnConf);
    final BlockScanner disabledBlockScanner = new BlockScanner(datanode);
    when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
    final ShortCircuitRegistry shortCircuitRegistry =
        new ShortCircuitRegistry(conf);
    when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry);

    createStorageDirs(storage, conf, NUM_INIT_VOLUMES);
    dataset = new FsDatasetImpl(datanode, storage, conf);
    for (String bpid : BLOCK_POOL_IDS) {
      dataset.addBlockPool(bpid, conf);
    }

    assertEquals(NUM_INIT_VOLUMES, getNumVolumes());
    assertEquals(0, dataset.getNumFailedVolumes());
  }

  /**
   * Adds three new volumes. Checks that they appear after the existing ones in
   * the volume list and that the storage map grows to match.
   */
  @Test
  public void testAddVolumes() throws IOException {
    final int numNewVolumes = 3;
    final int numExistingVolumes = getNumVolumes();
    final int totalVolumes = numNewVolumes + numExistingVolumes;
    Set<String> expectedVolumes = new HashSet<String>();
    List<NamespaceInfo> nsInfos = Lists.newArrayList();
    for (String bpid : BLOCK_POOL_IDS) {
      nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
    }
    for (int i = 0; i < numNewVolumes; i++) {
      String path = BASE_DIR + "/newData" + i;
      String pathUri = new Path(path).toUri().toString();
      expectedVolumes.add(new File(pathUri).getAbsolutePath());
      StorageLocation loc = StorageLocation.parse(pathUri);
      Storage.StorageDirectory sd = createStorageDirectory(
          new File(path), conf);
      DataStorage.VolumeBuilder builder =
          new DataStorage.VolumeBuilder(storage, sd);
      when(storage.prepareVolume(eq(datanode), eq(loc),
          anyListOf(NamespaceInfo.class)))
          .thenReturn(builder);

      dataset.addVolume(loc, nsInfos);
      LOG.info("expectedVolumes " + i + " is " +
          new File(pathUri).getAbsolutePath());
    }

    assertEquals(totalVolumes, getNumVolumes());
    assertEquals(totalVolumes, dataset.storageMap.size());

    Set<String> actualVolumes = new HashSet<String>();
    try (FsDatasetSpi.FsVolumeReferences volumes =
        dataset.getFsVolumeReferences()) {
      for (int i = 0; i < numNewVolumes; i++) {
        String volumeName = volumes.get(numExistingVolumes + i).toString();
        actualVolumes.add(volumeName);
        LOG.info("actualVolume " + i + " is " + volumeName);
      }
    }
    assertEquals(expectedVolumes.size(), actualVolumes.size());
    assertTrue(actualVolumes.containsAll(expectedVolumes));
  }

  /**
   * Copies the version file of the datanode's first volume over its second
   * volume, restarts the datanode, and asserts that it is then no longer
   * connected to the namenode.
   */
  @Test
  public void testAddVolumeWithSameStorageUuid() throws IOException {
    HdfsConfiguration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(1).build();
    try {
      cluster.waitActive();
      assertTrue(cluster.getDataNodes().get(0).isConnectedToNN(
          cluster.getNameNode().getServiceRpcAddress()));

      MiniDFSCluster.DataNodeProperties dn = cluster.stopDataNode(0);
      File vol0 = cluster.getStorageDir(0, 0);
      File vol1 = cluster.getStorageDir(0, 1);
      Storage.StorageDirectory sd0 = new Storage.StorageDirectory(vol0);
      Storage.StorageDirectory sd1 = new Storage.StorageDirectory(vol1);
      FileUtils.copyFile(sd0.getVersionFile(), sd1.getVersionFile());

      cluster.restartDataNode(dn, true);
      cluster.waitActive();
      assertFalse(cluster.getDataNodes().get(0).isConnectedToNN(
          cluster.getNameNode().getServiceRpcAddress()));
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Creates RBW replicas across both block pools and then removes the first
   * volume. Checks that the volume is dropped from the volume list, the
   * storage map, the async disk service, and the replica map.
   */
  @Test(timeout = 30000)
  public void testRemoveVolumes() throws IOException {
    // Feed FsDataset with block metadata.
    final int NUM_BLOCKS = 100;
    for (int i = 0; i < NUM_BLOCKS; i++) {
      // Use the loop index so the blocks are spread over every block pool.
      String bpid = BLOCK_POOL_IDS[i % BLOCK_POOL_IDS.length];
      ExtendedBlock eb = new ExtendedBlock(bpid, i);
      try (ReplicaHandler replica =
          dataset.createRbw(StorageType.DEFAULT, null, eb, false)) {
        // Only creating the replica matters; the handler is closed at once.
      }
    }
    final String[] dataDirs =
        conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
    final String volumePathToRemove = dataDirs[0];
    Set<StorageLocation> volumesToRemove = new HashSet<>();
    volumesToRemove.add(StorageLocation.parse(volumePathToRemove));

    // Look up the volume to remove; the references are released even if the
    // lookup throws.
    FsVolumeImpl volumeToRemove = null;
    try (FsVolumeReferences volReferences = dataset.getFsVolumeReferences()) {
      for (FsVolumeSpi vol : volReferences) {
        if (vol.getStorageLocation().equals(volumesToRemove.iterator().next())) {
          volumeToRemove = (FsVolumeImpl) vol;
        }
      }
    }
    assertNotNull(volumeToRemove);

    dataset.removeVolumes(volumesToRemove, true);
    int expectedNumVolumes = dataDirs.length - 1;
    assertEquals("The volume has been removed from the volumeList.",
        expectedNumVolumes, getNumVolumes());
    assertEquals("The volume has been removed from the storageMap.",
        expectedNumVolumes, dataset.storageMap.size());

    try {
      dataset.asyncDiskService.execute(volumeToRemove,
          new Runnable() {
            @Override
            public void run() {}
          });
      fail("Expect RuntimeException: the volume has been removed from the "
           + "AsyncDiskService.");
    } catch (RuntimeException e) {
      GenericTestUtils.assertExceptionContains("Cannot find volume", e);
    }

    int totalNumReplicas = 0;
    for (String bpid : dataset.volumeMap.getBlockPoolList()) {
      totalNumReplicas += dataset.volumeMap.size(bpid);
    }
    assertEquals("The replica infos on this volume has been removed from the "
                 + "volumeMap.", NUM_BLOCKS / NUM_INIT_VOLUMES,
                 totalNumReplicas);
  }

  /** Checks that a volume added at runtime can be removed again. */
  @Test(timeout = 5000)
  public void testRemoveNewlyAddedVolume() throws IOException {
    final int numExistingVolumes = getNumVolumes();
    List<NamespaceInfo> nsInfos = new ArrayList<>();
    for (String bpid : BLOCK_POOL_IDS) {
      nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
    }
    String newVolumePath = BASE_DIR + "/newVolumeToRemoveLater";
    StorageLocation loc = StorageLocation.parse(newVolumePath);

    Storage.StorageDirectory sd = createStorageDirectory(
        new File(newVolumePath), conf);
    DataStorage.VolumeBuilder builder =
        new DataStorage.VolumeBuilder(storage, sd);
    when(storage.prepareVolume(eq(datanode), eq(loc),
        anyListOf(NamespaceInfo.class)))
        .thenReturn(builder);

    dataset.addVolume(loc, nsInfos);
    assertEquals(numExistingVolumes + 1, getNumVolumes());

    when(storage.getNumStorageDirs()).thenReturn(numExistingVolumes + 1);
    when(storage.getStorageDir(numExistingVolumes)).thenReturn(sd);
    Set<StorageLocation> volumesToRemove = new HashSet<>();
    volumesToRemove.add(loc);
    dataset.removeVolumes(volumesToRemove, true);
    assertEquals(numExistingVolumes, getNumVolumes());
  }

  /**
   * Makes {@code addVolume} fail while loading the volume map. Checks that the
   * lock on the storage directory is released afterwards.
   */
  @Test
  public void testAddVolumeFailureReleasesInUseLock() throws IOException {
    FsDatasetImpl spyDataset = spy(dataset);
    FsVolumeImpl mockVolume = mock(FsVolumeImpl.class);
    File badDir = new File(BASE_DIR, "bad");
    badDir.mkdirs();
    doReturn(mockVolume).when(spyDataset)
        .createFsVolume(anyString(), any(StorageDirectory.class),
            any(StorageLocation.class));
    doThrow(new IOException("Failed to getVolumeMap()"))
      .when(mockVolume).getVolumeMap(
        anyString(),
        any(ReplicaMap.class),
        any(RamDiskReplicaLruTracker.class));

    Storage.StorageDirectory sd = createStorageDirectory(badDir, conf);
    sd.lock();
    DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd);
    when(storage.prepareVolume(eq(datanode),
        eq(StorageLocation.parse(badDir.toURI().toString())),
        Matchers.<List<NamespaceInfo>>any()))
        .thenReturn(builder);

    StorageLocation location = StorageLocation.parse(badDir.toString());
    List<NamespaceInfo> nsInfos = Lists.newArrayList();
    for (String bpid : BLOCK_POOL_IDS) {
      nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
    }

    try {
      spyDataset.addVolume(location, nsInfos);
      fail("Expect to throw MultipleIOException");
    } catch (MultipleIOException expected) {
      // Expected: the stubbed getVolumeMap() failure is reported here.
    }

    FsDatasetTestUtil.assertFileLockReleased(badDir.toString());
  }

  /**
   * Invalidates 63 finalized replicas and asserts the first one is still
   * reported by {@code isDeletingBlock}. Then invalidates one more replica and
   * asserts that it is not reported.
   * NOTE(review): the 63/64 split presumably matches a batching threshold in
   * FsDatasetImpl; confirm before changing the counts.
   */
  @Test
  public void testDeletingBlocks() throws IOException {
    HdfsConfiguration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
    try {
      cluster.waitActive();
      DataNode dn = cluster.getDataNodes().get(0);

      FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);
      ds.addBlockPool(BLOCKPOOL, conf);

      ExtendedBlock eb;
      List<Block> blockList = new ArrayList<>();
      for (int i = 1; i <= 63; i++) {
        eb = new ExtendedBlock(BLOCKPOOL, i, 1, 1000 + i);
        cluster.getFsDatasetTestUtils(0).createFinalizedReplica(eb);
        blockList.add(eb.getLocalBlock());
      }
      ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        // Keep the interrupt visible to the test runner.
        Thread.currentThread().interrupt();
      }
      assertTrue(ds.isDeletingBlock(BLOCKPOOL, blockList.get(0).getBlockId()));

      blockList.clear();
      eb = new ExtendedBlock(BLOCKPOOL, 64, 1, 1064);
      cluster.getFsDatasetTestUtils(0).createFinalizedReplica(eb);
      blockList.add(eb.getLocalBlock());
      ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        // Keep the interrupt visible to the test runner.
        Thread.currentThread().interrupt();
      }
      assertFalse(ds.isDeletingBlock(BLOCKPOOL, blockList.get(0).getBlockId()));
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Checks {@link BlockPoolSlice#selectReplicaToDelete} for replicas on the
   * same path and on different paths.
   */
  @Test
  public void testDuplicateReplicaResolution() throws IOException {
    FsVolumeImpl fsv1 = Mockito.mock(FsVolumeImpl.class);
    FsVolumeImpl fsv2 = Mockito.mock(FsVolumeImpl.class);

    File f1 = new File("d1/block");
    File f2 = new File("d2/block");

    ReplicaInfo replicaOlder = new FinalizedReplica(1,1,1,fsv1,f1);
    ReplicaInfo replica = new FinalizedReplica(1,2,2,fsv1,f1);
    ReplicaInfo replicaSame = new FinalizedReplica(1,2,2,fsv1,f1);
    ReplicaInfo replicaNewer = new FinalizedReplica(1,3,3,fsv1,f1);

    ReplicaInfo replicaOtherOlder = new FinalizedReplica(1,1,1,fsv2,f2);
    ReplicaInfo replicaOtherSame = new FinalizedReplica(1,2,2,fsv2,f2);
    ReplicaInfo replicaOtherNewer = new FinalizedReplica(1,3,3,fsv2,f2);

    // equivalent path so don't remove either
    assertNull(BlockPoolSlice.selectReplicaToDelete(replicaSame, replica));
    assertNull(BlockPoolSlice.selectReplicaToDelete(replicaOlder, replica));
    assertNull(BlockPoolSlice.selectReplicaToDelete(replicaNewer, replica));

    // keep latest found replica
    assertSame(replica,
        BlockPoolSlice.selectReplicaToDelete(replicaOtherSame, replica));
    assertSame(replicaOtherOlder,
        BlockPoolSlice.selectReplicaToDelete(replicaOtherOlder, replica));
    assertSame(replica,
        BlockPoolSlice.selectReplicaToDelete(replicaOtherNewer, replica));
  }

  /** The cached dfsUsed value is used while the cache has not expired. */
  @Test
  public void testLoadingDfsUsedForVolumes() throws IOException,
      InterruptedException {
    long waitIntervalTime = 5000;
    // Initialize the cachedDfsUsedIntervalTime larger than waitIntervalTime
    // to avoid cache-dfsused time expired
    long cachedDfsUsedIntervalTime = waitIntervalTime + 1000;
    conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
        cachedDfsUsedIntervalTime);

    long cacheDfsUsed = 1024;
    long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime);

    assertEquals(cacheDfsUsed, dfsUsed);
  }

  /** The cached dfsUsed value is not used once the cache has expired. */
  @Test
  public void testLoadingDfsUsedForVolumesExpired() throws IOException,
      InterruptedException {
    long waitIntervalTime = 5000;
    // Initialize the cachedDfsUsedIntervalTime smaller than waitIntervalTime
    // to make cache-dfsused time expired
    long cachedDfsUsedIntervalTime = waitIntervalTime - 1000;
    conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
        cachedDfsUsedIntervalTime);

    long cacheDfsUsed = 1024;
    long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime);

    // Because the cache-dfsused expired and the dfsUsed will be recalculated
    assertNotEquals(cacheDfsUsed, dfsUsed);
  }

  /**
   * Writes a dfsUsed cache file stamped with a {@link FakeTimer}'s current
   * time for a new volume. Advances the timer, adds the volume, and returns
   * the new volume's {@code getDfsUsed()}.
   *
   * @param cacheDfsUsed value written to the cache file
   * @param waitIntervalTime milliseconds to advance the fake timer by before
   *     adding the volume
   * @return {@code getDfsUsed()} of the newly added volume
   * @throws IOException if the cache file cannot be written or the volume
   *     cannot be added
   */
  private long getDfsUsedValueOfNewVolume(long cacheDfsUsed,
      long waitIntervalTime) throws IOException, InterruptedException {
    List<NamespaceInfo> nsInfos = Lists.newArrayList();
    nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, BLOCK_POOL_IDS[0], 1));

    String CURRENT_DIR = "current";
    String DU_CACHE_FILE = BlockPoolSlice.DU_CACHE_FILE;
    String path = BASE_DIR + "/newData0";
    String pathUri = new Path(path).toUri().toString();
    StorageLocation loc = StorageLocation.parse(pathUri);
    Storage.StorageDirectory sd = createStorageDirectory(new File(path), conf);
    DataStorage.VolumeBuilder builder =
        new DataStorage.VolumeBuilder(storage, sd);
    when(
        storage.prepareVolume(eq(datanode), eq(loc),
            anyListOf(NamespaceInfo.class))).thenReturn(builder);

    String cacheFilePath =
        String.format("%s/%s/%s/%s/%s", path, CURRENT_DIR, BLOCK_POOL_IDS[0],
            CURRENT_DIR, DU_CACHE_FILE);
    File outFile = new File(cacheFilePath);

    if (!outFile.getParentFile().exists()) {
      outFile.getParentFile().mkdirs();
    }

    if (outFile.exists()) {
      outFile.delete();
    }

    FakeTimer timer = new FakeTimer();
    // A write failure must fail the test. If it were ignored, no cache file
    // would exist and the "expired" test would pass without exercising the
    // cache at all.
    try (Writer out =
        new OutputStreamWriter(new FileOutputStream(outFile),
            StandardCharsets.UTF_8)) {
      // Write the dfsUsed value and the time to cache file
      out.write(Long.toString(cacheDfsUsed) + " "
          + Long.toString(timer.now()));
      out.flush();
    }

    dataset.setTimer(timer);
    timer.advance(waitIntervalTime);
    dataset.addVolume(loc, nsInfos);

    // Get the last volume which was just added before
    FsVolumeImpl newVolume;
    try (FsDatasetSpi.FsVolumeReferences volumes =
        dataset.getFsVolumeReferences()) {
      newVolume = (FsVolumeImpl) volumes.get(volumes.size() - 1);
    }
    long dfsUsed = newVolume.getDfsUsed();

    return dfsUsed;
  }

  /**
   * Removes a volume while another thread holds an RBW replica on it and a
   * third thread requests a block report. Checks that both the block report
   * and the removal complete.
   */
  @Test(timeout = 60000)
  public void testRemoveVolumeBeingWritten() throws Exception {
    // Will write and remove on dn0.
    final ExtendedBlock eb = new ExtendedBlock(BLOCK_POOL_IDS[0], 0);
    final CountDownLatch startFinalizeLatch = new CountDownLatch(1);
    final CountDownLatch blockReportReceivedLatch = new CountDownLatch(1);
    final CountDownLatch volRemoveStartedLatch = new CountDownLatch(1);
    final CountDownLatch volRemoveCompletedLatch = new CountDownLatch(1);
    class BlockReportThread extends Thread {
      @Override
      public void run() {
        // Lets wait for the volume remove process to start
        try {
          volRemoveStartedLatch.await();
        } catch (Exception e) {
          LOG.info("Unexpected exception when waiting for vol removal:", e);
        }
        LOG.info("Getting block report");
        dataset.getBlockReports(eb.getBlockPoolId());
        LOG.info("Successfully received block report");
        blockReportReceivedLatch.countDown();
      }
    }

    class ResponderThread extends Thread {
      @Override
      public void run() {
        try (ReplicaHandler replica = dataset
            .createRbw(StorageType.DEFAULT, null, eb, false)) {
          LOG.info("CreateRbw finished");
          startFinalizeLatch.countDown();

          // Slow down while we're holding the reference to the volume.
          // As we finalize a block, the volume is removed in parallel.
          // Ignore any interrupts coming out of volume shutdown.
          try {
            Thread.sleep(1000);
          } catch (InterruptedException ie) {
            LOG.info("Ignoring ", ie);
          }

          // Lets wait for the other thread finish getting block report
          blockReportReceivedLatch.await();

          dataset.finalizeBlock(eb, false);
          LOG.info("FinalizeBlock finished");
        } catch (Exception e) {
          LOG.warn("Exception caught. This should not affect the test", e);
        }
      }
    }

    class VolRemoveThread extends Thread {
      @Override
      public void run() {
        Set<StorageLocation> volumesToRemove = new HashSet<>();
        try {
          volumesToRemove.add(dataset.getVolume(eb).getStorageLocation());
        } catch (Exception e) {
          LOG.info("Problem preparing volumes to remove: ", e);
          Assert.fail("Exception in remove volume thread, check log for " +
              "details.");
        }
        LOG.info("Removing volume " + volumesToRemove);
        dataset.removeVolumes(volumesToRemove, true);
        volRemoveCompletedLatch.countDown();
        LOG.info("Removed volume " + volumesToRemove);
      }
    }

    // Start the volume write operation
    ResponderThread responderThread = new ResponderThread();
    responderThread.start();
    startFinalizeLatch.await();

    // Start the block report get operation
    final BlockReportThread blockReportThread = new BlockReportThread();
    blockReportThread.start();

    // Start the volume remove operation
    VolRemoveThread volRemoveThread = new VolRemoveThread();
    volRemoveThread.start();

    // Let volume write and remove operation be
    // blocked for few seconds
    Thread.sleep(2000);

    // Signal block report receiver and volume writer
    // thread to complete their operations so that vol
    // remove can proceed
    volRemoveStartedLatch.countDown();

    // Verify if block report can be received
    // when volume is in use and also being removed
    blockReportReceivedLatch.await();

    // Verify if volume can be removed safely when there
    // are read/write operation in-progress
    volRemoveCompletedLatch.await();
  }

  /**
   * Tests stopping all the active DataXceiver thread on volume failure event.
   * @throws Exception
   */
  @Test
  public void testCleanShutdownOfVolume() throws Exception {
    MiniDFSCluster cluster = null;
    // Declared here so its permissions can be restored in the finally block.
    File finalizedDir = null;
    try {
      Configuration config = new HdfsConfiguration();
      config.setLong(
          DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 1000);
      config.setTimeDuration(
          DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, 0,
          TimeUnit.MILLISECONDS);
      config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);

      cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
      cluster.waitActive();
      FileSystem fs = cluster.getFileSystem();
      DataNode dataNode = cluster.getDataNodes().get(0);
      Path filePath = new Path("test.dat");
      // Create a file and keep the output stream unclosed.
      FSDataOutputStream out = fs.create(filePath, (short) 1);
      out.write(1);
      out.hflush();

      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
      final FsVolumeImpl volume = (FsVolumeImpl) dataNode.getFSDataset().
          getVolume(block);
      finalizedDir = volume.getFinalizedDir(cluster.getNamesystem()
          .getBlockPoolId());
      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0);
      DatanodeInfo info = lb.getLocations()[0];

      if (finalizedDir.exists()) {
        // Remove write and execute access so that checkDiskErrorThread detects
        // this volume is bad.
        finalizedDir.setExecutable(false);
        finalizedDir.setWritable(false);
      }
      Assert.assertTrue("Reference count for the volume should be greater "
          + "than 0", volume.getReferenceCount() > 0);
      // Invoke the synchronous checkDiskError method
      dataNode.checkDiskError();
      // Poll every 100 ms, for up to 1 second, until every reference to the
      // failed volume has been released.
      GenericTestUtils.waitFor(new Supplier<Boolean>() {
        @Override public Boolean get() {
          return volume.getReferenceCount() == 0;
        }
      }, 100, 1000);
      assertThat(dataNode.getFSDataset().getNumFailedVolumes(), is(1));

      try {
        out.close();
        Assert.fail("This is not a valid code path. "
            + "out.close should have thrown an exception.");
      } catch (IOException ioe) {
        GenericTestUtils.assertExceptionContains(info.getXferAddr(), ioe);
      }
    } finally {
      // Restore permissions even when the test fails, so that an unwritable
      // directory is not left behind.
      if (finalizedDir != null) {
        finalizedDir.setWritable(true);
        finalizedDir.setExecutable(true);
      }
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

  /**
   * Checks that {@code reportBadBlocks} does not throw an NPE for a block with
   * no volume. Also checks that the overload taking a volume makes the
   * namenode count one corrupt replica.
   */
  @Test(timeout = 30000)
  public void testReportBadBlocks() throws Exception {
    boolean threwException = false;
    MiniDFSCluster cluster = null;
    try {
      Configuration config = new HdfsConfiguration();
      cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
      cluster.waitActive();

      Assert.assertEquals(0, cluster.getNamesystem().getCorruptReplicaBlocks());
      DataNode dataNode = cluster.getDataNodes().get(0);
      ExtendedBlock block =
          new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(), 0);
      try {
        // Test the reportBadBlocks when the volume is null
        dataNode.reportBadBlocks(block);
      } catch (NullPointerException npe) {
        threwException = true;
      }
      Thread.sleep(3000);
      Assert.assertFalse(threwException);
      Assert.assertEquals(0, cluster.getNamesystem().getCorruptReplicaBlocks());

      FileSystem fs = cluster.getFileSystem();
      Path filePath = new Path("testData");
      DFSTestUtil.createFile(fs, filePath, 1, (short) 1, 0);

      block = DFSTestUtil.getFirstBlock(fs, filePath);
      // Test for the overloaded method reportBadBlocks. Release the volume
      // references afterwards so no volume reference is leaked.
      try (FsDatasetSpi.FsVolumeReferences volumes =
          dataNode.getFSDataset().getFsVolumeReferences()) {
        dataNode.reportBadBlocks(block, volumes.get(0));
      }
      Thread.sleep(3000);
      BlockManagerTestUtil.updateState(cluster.getNamesystem()
          .getBlockManager());
      // Verify the bad block has been reported to namenode
      Assert.assertEquals(1, cluster.getNamesystem().getCorruptReplicaBlocks());
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

  /**
   * Copies a replica to another volume and then bumps the block's generation
   * stamp with an append. Checks that {@code finalizeNewReplica} rejects the
   * stale copy.
   */
  @Test(timeout = 30000)
  public void testMoveBlockFailure() {
    MiniDFSCluster cluster = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf)
          .numDataNodes(1)
          .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
          .storagesPerDatanode(2)
          .build();
      FileSystem fs = cluster.getFileSystem();
      DataNode dataNode = cluster.getDataNodes().get(0);

      Path filePath = new Path("testData");
      DFSTestUtil.createFile(fs, filePath, 100, (short) 1, 0);
      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);

      FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
      ReplicaInfo newReplicaInfo = createNewReplicaObj(block, fsDataSetImpl);

      // Append to file to update its GS
      FSDataOutputStream out = fs.append(filePath, (short) 1);
      out.write(100);
      out.hflush();

      // Call finalizeNewReplica
      LOG.info("GenerationStamp of old replica: {}",
          block.getGenerationStamp());
      LOG.info("GenerationStamp of new replica: {}", fsDataSetImpl
          .getReplicaInfo(block.getBlockPoolId(), newReplicaInfo.getBlockId())
          .getGenerationStamp());
      LambdaTestUtils.intercept(IOException.class, "Generation Stamp "
              + "should be monotonically increased.",
          () -> fsDataSetImpl.finalizeNewReplica(newReplicaInfo, block));
    } catch (Exception ex) {
      LOG.info("Exception in testMoveBlockFailure ", ex);
      fail("Exception while testing testMoveBlockFailure ");
    } finally {
      // cluster is null if build() failed.
      if (cluster != null && cluster.isClusterUp()) {
        cluster.shutdown();
      }
    }
  }

  /** Checks that a replica copied to another volume can be finalized. */
  @Test(timeout = 30000)
  public void testMoveBlockSuccess() {
    MiniDFSCluster cluster = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf)
          .numDataNodes(1)
          .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
          .storagesPerDatanode(2)
          .build();
      FileSystem fs = cluster.getFileSystem();
      DataNode dataNode = cluster.getDataNodes().get(0);

      Path filePath = new Path("testData");
      DFSTestUtil.createFile(fs, filePath, 100, (short) 1, 0);
      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);

      FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
      ReplicaInfo newReplicaInfo = createNewReplicaObj(block, fsDataSetImpl);
      fsDataSetImpl.finalizeNewReplica(newReplicaInfo, block);

    } catch (Exception ex) {
      LOG.info("Exception in testMoveBlockSuccess ", ex);
      fail("MoveBlock operation should succeed");
    } finally {
      // cluster is null if build() failed.
      if (cluster != null && cluster.isClusterUp()) {
        cluster.shutdown();
      }
    }
  }

  /**
   * Create a new temporary replica of replicaInfo object in another volume.
   *
   * @param block - Extended Block
   * @param fsDataSetImpl - FsDatasetImpl reference
   * @return the replica returned by {@code copyReplicaToVolume}
   * @throws IOException
   */
  private ReplicaInfo createNewReplicaObj(ExtendedBlock block, FsDatasetImpl
      fsDataSetImpl) throws IOException {
    ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block);
    FsVolumeSpi destVolume = getDestinationVolume(block, fsDataSetImpl);
    return fsDataSetImpl.copyReplicaToVolume(block, replicaInfo,
        destVolume.obtainReference());
  }

  /**
   * Finds a new destination volume for block.
   *
   * @param block - Extended Block
   * @param fsDataSetImpl - FsDatasetImpl reference
   * @return the first volume whose storage ID differs from the block's
   *     current volume, or {@code null} if there is no such volume
   * @throws IOException
   */
  private FsVolumeSpi getDestinationVolume(ExtendedBlock block, FsDatasetImpl
      fsDataSetImpl) throws IOException {
    FsVolumeSpi destVolume = null;
    final String srcStorageId = fsDataSetImpl.getVolume(block).getStorageID();
    try (FsVolumeReferences volumeReferences =
        fsDataSetImpl.getFsVolumeReferences()) {
      for (int i = 0; i < volumeReferences.size(); i++) {
        if (!volumeReferences.get(i).getStorageID().equals(srcStorageId)) {
          destVolume = volumeReferences.get(i);
          break;
        }
      }
    }
    return destVolume;
  }

  /**
   * Opens a block reader, reads part of the block, and moves the block to
   * another volume. Checks that the open reader can finish and that a new
   * reader sees the data at the block's new location.
   */
  @Test(timeout = 3000000)
  public void testBlockReadOpWhileMovingBlock() throws IOException {
    MiniDFSCluster cluster = null;
    try {

      // Setup cluster
      conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
      cluster = new MiniDFSCluster.Builder(conf)
          .numDataNodes(1)
          .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
          .storagesPerDatanode(2)
          .build();
      FileSystem fs = cluster.getFileSystem();
      DataNode dataNode = cluster.getDataNodes().get(0);

      // Create test file with ASCII data
      Path filePath = new Path("/tmp/testData");
      String blockData = RandomStringUtils.randomAscii(512 * 4);
      FSDataOutputStream fout = fs.create(filePath);
      fout.writeBytes(blockData);
      fout.close();
      assertEquals(blockData, DFSTestUtil.readFile(fs, filePath));

      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
      BlockReaderTestUtil util = new BlockReaderTestUtil(cluster, new
          HdfsConfiguration(conf));
      LocatedBlock blk = util.getFileBlocks(filePath, 512 * 2).get(0);
      File[] blkFiles = cluster.getAllBlockFiles(block);

      // Part 1: Read partial data from block
      LOG.info("Reading partial data for block {} before moving it: ",
          blk.getBlock().toString());
      BlockReader blkReader = BlockReaderTestUtil.getBlockReader(
          (DistributedFileSystem) fs, blk, 0, 512 * 2);
      byte[] buf = new byte[512 * 2];
      blkReader.read(buf, 0, 512);
      assertEquals(blockData.substring(0, 512), new String(buf,
          StandardCharsets.US_ASCII).substring(0, 512));

      // Part 2: Move block and then read remaining block
      FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
      ReplicaInfo replicaInfo = fsDataSetImpl.getReplicaInfo(block);
      FsVolumeSpi destVolume = getDestinationVolume(block, fsDataSetImpl);
      assertNotNull("Destination volume should not be null.", destVolume);
      fsDataSetImpl.moveBlock(block, replicaInfo, destVolume.obtainReference());
      // Trigger block report to update block info in NN
      cluster.triggerBlockReports();
      blkReader.read(buf, 512, 512);
      assertEquals(blockData.substring(0, 512 * 2), new String(buf,
          StandardCharsets.US_ASCII).substring(0, 512 * 2));
      blkReader = BlockReaderTestUtil.getBlockReader(
          (DistributedFileSystem) fs,
          blk, 0, blockData.length());
      buf = new byte[512 * 4];
      blkReader.read(buf, 0, 512 * 4);
      assertEquals(blockData, new String(buf, StandardCharsets.US_ASCII));

      // Part 3: 1. Close the block reader
      // 2. Assert source block doesn't exist on initial volume
      // 3. Assert new file location for block is different
      // 4. Confirm client can read data from new location
      blkReader.close();
      ExtendedBlock block2 = DFSTestUtil.getFirstBlock(fs, filePath);
      File[] blkFiles2 = cluster.getAllBlockFiles(block2);
      blk = util.getFileBlocks(filePath, 512 * 4).get(0);
      blkReader = BlockReaderTestUtil.getBlockReader(
          (DistributedFileSystem) fs,
          blk, 0, blockData.length());
      blkReader.read(buf, 0, 512 * 4);

      assertFalse(Files.exists(Paths.get(blkFiles[0].getAbsolutePath())));
      assertNotEquals(blkFiles[0], blkFiles2[0]);
      assertEquals(blockData, new String(buf, StandardCharsets.US_ASCII));
      // Release the last reader as well; previously it was left open.
      blkReader.close();

    } finally {
      // cluster is null if build() failed.
      if (cluster != null && cluster.isClusterUp()) {
        cluster.shutdown();
      }
    }
  }
}