/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED;
import static org.apache.hadoop.hdfs.MiniDFSCluster.HDFS_MINIDFS_BASEDIR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.io.StringWriter;
import java.io.Writer;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.ErasureCodingResult;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.ReplicationResult;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.Result;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.tools.DFSck;
import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.LoggerFactory;
/**
* A JUnit test for doing fsck.
*/
public class TestFsck {
private static final org.slf4j.Logger LOG =
LoggerFactory.getLogger(TestFsck.class.getName());
// Pattern for a successful fsck audit event:
// allowed=true ugi=name ip=/address cmd=fsck src=/ dst=null perm=null proto=...
static final Pattern FSCK_PATTERN = Pattern.compile(
"allowed=.*?\\s" +
"ugi=.*?\\s" +
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
"cmd=fsck\\ssrc=\\/\\sdst=null\\s" +
"perm=null\\s" + "proto=.*");
static final Pattern GET_FILE_INFO_PATTERN = Pattern.compile(
"allowed=.*?\\s" +
"ugi=.*?\\s" +
"ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" +
"cmd=getfileinfo\\ssrc=\\/\\sdst=null\\s" +
"perm=null\\s" + "proto=.*");
static final Pattern NUM_MISSING_BLOCKS_PATTERN = Pattern.compile(
".*Missing blocks:\t\t(\\d*).*");
static final Pattern NUM_CORRUPT_BLOCKS_PATTERN = Pattern.compile(
".*Corrupt blocks:\t\t(\\d*).*");
private static final String LINE_SEPARATOR = System.lineSeparator();
private static LogCapturer auditLogCapture;
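/**
 * Runs fsck with the given arguments and returns the captured tool output.
 * The exit code is asserted against expectedErrCode only when checkErrorCode
 * is true, since some callers poll fsck repeatedly until the cluster reaches
 * the expected state. The FSPermissionChecker log level is raised to TRACE
 * for the duration of the run.
 */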
public static String runFsck(Configuration conf, int expectedErrCode,
boolean checkErrorCode, String... path)
throws Exception {
ByteArrayOutputStream bStream = new ByteArrayOutputStream();
PrintStream out = new PrintStream(bStream, true);
GenericTestUtils.setLogLevel(
FSPermissionChecker.LOG, org.slf4j.event.Level.TRACE);
int errCode = ToolRunner.run(new DFSck(conf, out), path);
LOG.info("OUTPUT = " + bStream.toString());
if (checkErrorCode) {
assertEquals(expectedErrCode, errCode);
}
GenericTestUtils.setLogLevel(
FSPermissionChecker.LOG, org.slf4j.event.Level.INFO);
return bStream.toString();
}
private MiniDFSCluster cluster = null;
private Configuration conf = null;
@BeforeClass
public static void beforeClass() {
auditLogCapture = LogCapturer.captureLogs(FSNamesystem.AUDIT_LOG);
}
@AfterClass
public static void afterClass() {
auditLogCapture.stopCapturing();
}
@Before
public void setUp() throws Exception {
conf = new Configuration();
conf.setBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
false);
}
@After
public void tearDown() throws Exception {
shutdownCluster();
}
private void shutdownCluster() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
/** do fsck. */
@Test
public void testFsck() throws Exception {
DFSTestUtil util = new DFSTestUtil.Builder().setName("TestFsck").
setNumFiles(20).build();
FileSystem fs = null;
final long precision = 1L;
conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY,
precision);
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(4).build();
fs = cluster.getFileSystem();
final String fileName = "/srcdat";
util.createFiles(fs, fileName);
util.waitReplication(fs, fileName, (short)3);
final Path file = new Path(fileName);
long aTime = fs.getFileStatus(file).getAccessTime();
Thread.sleep(precision);
String outStr = runFsck(conf, 0, true, "/");
verifyAuditLogs();
assertEquals(aTime, fs.getFileStatus(file).getAccessTime());
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
shutdownCluster();
// restart the cluster; bring up namenode but not the data nodes
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0).format(false).build();
outStr = runFsck(conf, 1, true, "/");
// expect the result is corrupt
assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
System.out.println(outStr);
// bring up data nodes & cleanup cluster
cluster.startDataNodes(conf, 4, true, null, null);
cluster.waitActive();
cluster.waitClusterUp();
fs = cluster.getFileSystem();
util.cleanup(fs, "/srcdat");
}
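/**
 * Scans the captured audit log and asserts that it contains at least two
 * successful getfileinfo events and at least one fsck event matching the
 * expected audit patterns.
 */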
private void verifyAuditLogs() {
String[] auditLogOutputLines = auditLogCapture.getOutput().split("\\n");
int fileStatusSuccess = 0;
int fsckCount = 0;
for (String auditLogLine : auditLogOutputLines) {
if (!auditLogLine.contains("allowed=")) {
continue;
}
String extractedAuditLog = "allowed=" + auditLogLine.split("allowed=")[1];
LOG.info("Line: {}", extractedAuditLog);
if (extractedAuditLog.contains("cmd=getfileinfo") && GET_FILE_INFO_PATTERN.matcher(
extractedAuditLog).matches()) {
fileStatusSuccess++;
} else if (FSCK_PATTERN.matcher(extractedAuditLog).matches()) {
fsckCount++;
}
}
if (fileStatusSuccess < 2) {
throw new AssertionError(
"getfileinfo cmd should occur at least 2 times. Actual count: " + fileStatusSuccess);
}
if (fsckCount < 1) {
throw new AssertionError("fsck should be present at least once. Actual count: " + fsckCount);
}
}
@Test
public void testFsckNonExistent() throws Exception {
DFSTestUtil util = new DFSTestUtil.Builder().setName("TestFsck").
setNumFiles(20).build();
FileSystem fs = null;
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(4).build();
fs = cluster.getFileSystem();
util.createFiles(fs, "/srcdat");
util.waitReplication(fs, "/srcdat", (short)3);
String outStr = runFsck(conf, 0, true, "/non-existent");
assertFalse(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
System.out.println(outStr);
util.cleanup(fs, "/srcdat");
}
/** Test fsck with permission set on inodes. */
@Test
public void testFsckPermission() throws Exception {
final DFSTestUtil util = new DFSTestUtil.Builder().
setName(getClass().getSimpleName()).setNumFiles(20).build();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
// Create a cluster with the current user, write some files
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(4).build();
final MiniDFSCluster c2 = cluster;
final String dir = "/dfsck";
final Path dirpath = new Path(dir);
final FileSystem fs = c2.getFileSystem();
util.createFiles(fs, dir);
util.waitReplication(fs, dir, (short) 3);
fs.setPermission(dirpath, new FsPermission((short) 0700));
// run DFSck as another user, should fail with permission issue
UserGroupInformation fakeUGI = UserGroupInformation.createUserForTesting(
"ProbablyNotARealUserName", new String[] {"ShangriLa"});
fakeUGI.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
System.out.println(runFsck(conf, -1, true, dir));
return null;
}
});
// set permission and try DFSck again as the fake user, should succeed
fs.setPermission(dirpath, new FsPermission((short) 0777));
fakeUGI.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
final String outStr = runFsck(conf, 0, true, dir);
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
return null;
}
});
util.cleanup(fs, dir);
}
@Test
public void testFsckMove() throws Exception {
final int dfsBlockSize = 1024;
final int numDatanodes = 4;
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, dfsBlockSize);
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
DFSTestUtil util = new DFSTestUtil("TestFsck", 5, 3,
(5 * dfsBlockSize) + (dfsBlockSize - 1), 5 * dfsBlockSize);
FileSystem fs = null;
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(numDatanodes).build();
String topDir = "/srcdat";
fs = cluster.getFileSystem();
cluster.waitActive();
util.createFiles(fs, topDir);
util.waitReplication(fs, topDir, (short)3);
String outStr = runFsck(conf, 0, true, "/");
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
cluster.getNameNodePort()), conf);
String[] fileNames = util.getFileNames(topDir);
CorruptedTestFile[] ctFiles = new CorruptedTestFile[]{
new CorruptedTestFile(fileNames[0], new HashSet<>(Arrays.asList(0)),
dfsClient, numDatanodes, dfsBlockSize),
new CorruptedTestFile(fileNames[1], new HashSet<>(Arrays.asList(2, 3)),
dfsClient, numDatanodes, dfsBlockSize),
new CorruptedTestFile(fileNames[2], new HashSet<>(Arrays.asList(4)),
dfsClient, numDatanodes, dfsBlockSize),
new CorruptedTestFile(fileNames[3], new HashSet<>(Arrays.asList(0, 1, 2, 3)),
dfsClient, numDatanodes, dfsBlockSize),
new CorruptedTestFile(fileNames[4], new HashSet<>(Arrays.asList(1, 2, 3, 4)),
dfsClient, numDatanodes, dfsBlockSize)
};
int totalMissingBlocks = 0;
for (CorruptedTestFile ctFile : ctFiles) {
totalMissingBlocks += ctFile.getTotalMissingBlocks();
}
for (CorruptedTestFile ctFile : ctFiles) {
ctFile.removeBlocks(cluster);
}
// Wait for fsck to discover all the missing blocks
while (true) {
outStr = runFsck(conf, 1, false, "/");
String numMissing = null;
String numCorrupt = null;
for (String line : outStr.split(LINE_SEPARATOR)) {
Matcher m = NUM_MISSING_BLOCKS_PATTERN.matcher(line);
if (m.matches()) {
numMissing = m.group(1);
}
m = NUM_CORRUPT_BLOCKS_PATTERN.matcher(line);
if (m.matches()) {
numCorrupt = m.group(1);
}
if (numMissing != null && numCorrupt != null) {
break;
}
}
if (numMissing == null || numCorrupt == null) {
throw new IOException("failed to find number of missing or corrupt" +
" blocks in fsck output.");
}
if (numMissing.equals(Integer.toString(totalMissingBlocks))) {
assertEquals("0", numCorrupt);
assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
break;
}
try {
Thread.sleep(100);
} catch (InterruptedException ignore) {
}
}
// Copy the non-corrupt blocks of corruptFileName to lost+found.
outStr = runFsck(conf, 1, false, "/", "-move");
LOG.info("WATERMELON: outStr = " + outStr);
assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
// Make sure that we properly copied the block files from the DataNodes
// to lost+found
for (CorruptedTestFile ctFile : ctFiles) {
ctFile.checkSalvagedRemains();
}
// Fix the filesystem by removing corruptFileName
outStr = runFsck(conf, 1, true, "/", "-delete");
assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
// Check to make sure we have a healthy filesystem
outStr = runFsck(conf, 0, true, "/");
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
util.cleanup(fs, topDir);
}
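/**
 * A test file whose selected blocks can be removed or overwritten on every
 * DataNode; the original contents are cached up front so the data salvaged
 * by "fsck -move" can be verified afterwards.
 */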
private static class CorruptedTestFile {
private final String name;
private final Set<Integer> blocksToCorrupt;
private final DFSClient dfsClient;
private final int numDataNodes;
private final int blockSize;
private final byte[] initialContents;
CorruptedTestFile(String name, Set<Integer> blocksToCorrupt,
DFSClient dfsClient, int numDataNodes, int blockSize)
throws IOException {
this.name = name;
this.blocksToCorrupt = blocksToCorrupt;
this.dfsClient = dfsClient;
this.numDataNodes = numDataNodes;
this.blockSize = blockSize;
this.initialContents = cacheInitialContents();
}
public int getTotalMissingBlocks() {
return blocksToCorrupt.size();
}
private byte[] cacheInitialContents() throws IOException {
HdfsFileStatus status = dfsClient.getFileInfo(name);
byte[] content = new byte[(int)status.getLen()];
try (DFSInputStream in = dfsClient.open(name)) {
IOUtils.readFully(in, content, 0, content.length);
}
return content;
}
public void removeBlocks(MiniDFSCluster cluster)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException {
for (int corruptIdx : blocksToCorrupt) {
// Corrupt a block by deleting it
ExtendedBlock block = dfsClient.getNamenode().getBlockLocations(
name, blockSize * corruptIdx, Long.MAX_VALUE).get(0).getBlock();
for (int i = 0; i < numDataNodes; i++) {
File blockFile = cluster.getBlockFile(i, block);
if (blockFile != null && blockFile.exists()) {
assertTrue(blockFile.delete());
}
}
}
}
public void corruptBlocks(MiniDFSCluster cluster) throws IOException {
for (int corruptIdx : blocksToCorrupt) {
// Corrupt a block by deleting it
ExtendedBlock block = dfsClient.getNamenode().getBlockLocations(name,
blockSize * corruptIdx, Long.MAX_VALUE).get(0).getBlock();
for (int i = 0; i < numDataNodes; i++) {
File blockFile = cluster.getBlockFile(i, block);
if (blockFile != null && blockFile.exists()) {
try (FileOutputStream blockFileStream =
new FileOutputStream(blockFile, false)) {
blockFileStream.write("corrupt".getBytes());
}
LOG.info("Corrupted block file " + blockFile);
}
}
}
}
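/**
 * Verifies the data salvaged by "fsck -move": each contiguous run of
 * surviving blocks is expected in a separate chain file opened as
 * "/lost+found" + name + "/" + chainIdx, with contents matching the bytes
 * cached before the blocks were removed.
 */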
public void checkSalvagedRemains() throws IOException {
int chainIdx = 0;
HdfsFileStatus status = dfsClient.getFileInfo(name);
long length = status.getLen();
int numBlocks = (int)((length + blockSize - 1) / blockSize);
DFSInputStream in = null;
byte[] blockBuffer = new byte[blockSize];
try {
for (int blockIdx = 0; blockIdx < numBlocks; blockIdx++) {
if (blocksToCorrupt.contains(blockIdx)) {
if (in != null) {
in.close();
in = null;
}
continue;
}
if (in == null) {
in = dfsClient.open("/lost+found" + name + "/" + chainIdx);
chainIdx++;
}
int len = blockBuffer.length;
if (blockIdx == (numBlocks - 1)) {
// The last block might not be full-length
len = (int)(in.getFileLength() % blockSize);
if (len == 0) {
len = blockBuffer.length;
}
}
IOUtils.readFully(in, blockBuffer, 0, len);
int startIdx = blockIdx * blockSize;
for (int i = 0; i < len; i++) {
if (initialContents[startIdx + i] != blockBuffer[i]) {
throw new IOException("salvaged file " + name + " differed " +
"from what we expected on block " + blockIdx);
}
}
}
} finally {
IOUtils.cleanupWithLogger(null, in);
}
}
}
@Test
public void testFsckMoveAndDelete() throws Exception {
final int maxMoveTries = 5;
DFSTestUtil util = new DFSTestUtil.Builder().
setName("TestFsckMoveAndDelete").setNumFiles(5).build();
FileSystem fs = null;
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(4).build();
String topDir = "/srcdat";
fs = cluster.getFileSystem();
cluster.waitActive();
util.createFiles(fs, topDir);
util.waitReplication(fs, topDir, (short)3);
String outStr = runFsck(conf, 0, true, "/");
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
// Corrupt a block by deleting it
String[] fileNames = util.getFileNames(topDir);
DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
cluster.getNameNodePort()), conf);
String corruptFileName = fileNames[0];
ExtendedBlock block = dfsClient.getNamenode().getBlockLocations(
corruptFileName, 0, Long.MAX_VALUE).get(0).getBlock();
for (int i=0; i<4; i++) {
File blockFile = cluster.getBlockFile(i, block);
if (blockFile != null && blockFile.exists()) {
assertTrue(blockFile.delete());
}
}
// We expect the filesystem to be corrupted
outStr = runFsck(conf, 1, false, "/");
while (!outStr.contains(NamenodeFsck.CORRUPT_STATUS)) {
try {
Thread.sleep(100);
} catch (InterruptedException ignore) {
}
outStr = runFsck(conf, 1, false, "/");
}
// After a fsck -move, the corrupted file should still exist.
for (int i = 0; i < maxMoveTries; i++) {
outStr = runFsck(conf, 1, true, "/", "-move");
assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
String[] newFileNames = util.getFileNames(topDir);
boolean found = false;
for (String f : newFileNames) {
if (f.equals(corruptFileName)) {
found = true;
break;
}
}
assertTrue(found);
}
// Fix the filesystem by moving the corrupted files to lost+found and deleting them
outStr = runFsck(conf, 1, true, "/", "-move", "-delete");
assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
// Check to make sure we have a healthy filesystem
outStr = runFsck(conf, 0, true, "/");
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
util.cleanup(fs, topDir);
}
@Test
public void testFsckOpenFiles() throws Exception {
DFSTestUtil util = new DFSTestUtil.Builder().setName("TestFsck").
setNumFiles(4).build();
FileSystem fs = null;
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(4).build();
String topDir = "/srcdat";
String randomString = "HADOOP ";
fs = cluster.getFileSystem();
cluster.waitActive();
util.createFiles(fs, topDir);
util.waitReplication(fs, topDir, (short)3);
String outStr = runFsck(conf, 0, true, "/");
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
// Open a file for writing and do not close for now
Path openFile = new Path(topDir + "/openFile");
FSDataOutputStream out = fs.create(openFile);
int writeCount = 0;
while (writeCount != 100) {
out.write(randomString.getBytes());
writeCount++;
}
((DFSOutputStream) out.getWrappedStream()).hflush();
// We expect the filesystem to be HEALTHY and show one open file
outStr = runFsck(conf, 0, true, topDir);
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
assertFalse(outStr.contains("OPENFORWRITE"));
// Use -openforwrite option to list open files
outStr = runFsck(conf, 0, true, topDir, "-files", "-blocks",
"-locations", "-openforwrite");
System.out.println(outStr);
assertTrue(outStr.contains("OPENFORWRITE"));
assertTrue(outStr.contains("Under Construction Block:"));
assertTrue(outStr.contains("openFile"));
// Close the file
out.close();
// Now, fsck should show HEALTHY fs and should not show any open files
outStr = runFsck(conf, 0, true, topDir);
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
assertFalse(outStr.contains("OPENFORWRITE"));
assertFalse(outStr.contains("Under Construction Block:"));
util.cleanup(fs, topDir);
}
@Test
public void testFsckOpenECFiles() throws Exception {
DFSTestUtil util = new DFSTestUtil.Builder().setName("TestFsckECFile").
setNumFiles(4).build();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
ErasureCodingPolicy ecPolicy =
StripedFileTestUtil.getDefaultECPolicy();
final int dataBlocks = ecPolicy.getNumDataUnits();
final int cellSize = ecPolicy.getCellSize();
final int numAllUnits = dataBlocks + ecPolicy.getNumParityUnits();
int blockSize = 2 * cellSize;
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir).numDataNodes(
numAllUnits + 1).build();
String topDir = "/myDir";
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
fs.enableErasureCodingPolicy(ecPolicy.getName());
util.createFiles(fs, topDir);
// set topDir to EC when it has replicated files
cluster.getFileSystem().getClient().setErasureCodingPolicy(
topDir, ecPolicy.getName());
// create a new file under topDir
DFSTestUtil.createFile(fs, new Path(topDir, "ecFile"), 1024, (short) 1, 0L);
// Open an EC file for writing and do not close it for now
Path openFile = new Path(topDir + "/openECFile");
FSDataOutputStream out = fs.create(openFile);
int blockGroupSize = dataBlocks * blockSize;
// data size is more than 1 block group and less than 2 block groups
byte[] randomBytes = new byte[2 * blockGroupSize - cellSize];
int seed = 42;
new Random(seed).nextBytes(randomBytes);
out.write(randomBytes);
// make sure fsck can correctly handle mixed EC/replicated files
runFsck(conf, 0, true, topDir, "-files", "-blocks", "-openforwrite");
// We expect the filesystem to be HEALTHY and show one open file
String outStr = runFsck(conf, 0, true, openFile.toString(), "-files",
"-blocks", "-openforwrite");
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
assertTrue(outStr.contains("OPENFORWRITE"));
assertTrue(outStr.contains("Live_repl=" + numAllUnits));
assertTrue(outStr.contains("Expected_repl=" + numAllUnits));
// Use -openforwrite option to list open files
outStr = runFsck(conf, 0, true, openFile.toString(), "-files", "-blocks",
"-locations", "-openforwrite", "-replicaDetails");
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
assertTrue(outStr.contains("OPENFORWRITE"));
assertTrue(outStr.contains("Live_repl=" + numAllUnits));
assertTrue(outStr.contains("Expected_repl=" + numAllUnits));
assertTrue(outStr.contains("Under Construction Block:"));
// check reported blockIDs of internal blocks
LocatedStripedBlock lsb = (LocatedStripedBlock)fs.getClient()
.getLocatedBlocks(openFile.toString(), 0, cellSize * dataBlocks).get(0);
long groupId = lsb.getBlock().getBlockId();
byte[] indices = lsb.getBlockIndices();
DatanodeInfo[] locs = lsb.getLocations();
long blockId;
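// An internal block of a striped group has ID = block group ID + its block
// index, so each expected "blk_<id>:<location>" string can be derived from
// the group ID reported for the block group.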
for (int i = 0; i < indices.length; i++) {
blockId = groupId + indices[i];
String str = "blk_" + blockId + ":" + locs[i];
assertTrue(outStr.contains(str));
}
// check that the output for under-construction blocks doesn't include blockIDs
String regex = ".*Expected_repl=" + numAllUnits + "(.*)\nStatus:.*";
Pattern p = Pattern.compile(regex, Pattern.DOTALL);
Matcher m = p.matcher(outStr);
assertTrue(m.find());
String ucBlockOutput = m.group(1);
assertFalse(ucBlockOutput.contains("blk_"));
// Close the file
out.close();
// Now, fsck should show HEALTHY fs and should not show any open files
outStr = runFsck(conf, 0, true, openFile.toString(), "-files", "-blocks",
"-locations", "-racks", "-replicaDetails");
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
assertFalse(outStr.contains("OPENFORWRITE"));
assertFalse(outStr.contains("Under Construction Block:"));
assertFalse(outStr.contains("Expected_repl=" + numAllUnits));
assertTrue(outStr.contains("Live_repl=" + numAllUnits));
util.cleanup(fs, topDir);
}
@Test
public void testCorruptBlock() throws Exception {
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
// Set short retry timeouts so this test runs faster
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
FileSystem fs = null;
DFSClient dfsClient = null;
LocatedBlocks blocks = null;
int replicaCount = 0;
Random random = new Random();
String outStr = null;
short factor = 1;
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
Path file1 = new Path("/testCorruptBlock");
DFSTestUtil.createFile(fs, file1, 1024, factor, 0);
// Wait until file replication has completed
DFSTestUtil.waitReplication(fs, file1, factor);
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
// Make sure filesystem is in healthy state
outStr = runFsck(conf, 0, true, "/");
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
// corrupt replicas
File blockFile = cluster.getBlockFile(0, block);
if (blockFile != null && blockFile.exists()) {
RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
FileChannel channel = raFile.getChannel();
String badString = "BADBAD";
int rand = random.nextInt((int) channel.size()/2);
raFile.seek(rand);
raFile.write(badString.getBytes());
raFile.close();
}
// Read the file to trigger reportBadBlocks
try {
IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(), conf,
true);
} catch (IOException ie) {
assertTrue(ie instanceof ChecksumException);
}
dfsClient = new DFSClient(new InetSocketAddress("localhost",
cluster.getNameNodePort()), conf);
blocks = dfsClient.getNamenode().
getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
replicaCount = blocks.get(0).getLocations().length;
while (replicaCount != factor) {
try {
Thread.sleep(100);
} catch (InterruptedException ignore) {
}
blocks = dfsClient.getNamenode().
getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
replicaCount = blocks.get(0).getLocations().length;
}
assertTrue(blocks.get(0).isCorrupt());
// Check if fsck reports the same
outStr = runFsck(conf, 1, true, "/");
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
assertTrue(outStr.contains("testCorruptBlock"));
}
@Test
public void testUnderMinReplicatedBlock() throws Exception {
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
// Set short retry timeouts so this test runs faster
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
// Set minReplication to 2
short minReplication = 2;
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, minReplication);
FileSystem fs = null;
DFSClient dfsClient = null;
LocatedBlocks blocks = null;
int replicaCount = 0;
Random random = new Random();
String outStr = null;
short factor = 1;
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(2).build();
cluster.waitActive();
fs = cluster.getFileSystem();
Path file1 = new Path("/testUnderMinReplicatedBlock");
DFSTestUtil.createFile(fs, file1, 1024, minReplication, 0);
// Wait until file replication has completed
DFSTestUtil.waitReplication(fs, file1, minReplication);
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
// Make sure filesystem is in healthy state
outStr = runFsck(conf, 0, true, "/");
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
// corrupt the first replica
File blockFile = cluster.getBlockFile(0, block);
if (blockFile != null && blockFile.exists()) {
RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
FileChannel channel = raFile.getChannel();
String badString = "BADBAD";
int rand = random.nextInt((int) channel.size()/2);
raFile.seek(rand);
raFile.write(badString.getBytes());
raFile.close();
}
dfsClient = new DFSClient(new InetSocketAddress("localhost",
cluster.getNameNodePort()), conf);
blocks = dfsClient.getNamenode().
getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
replicaCount = blocks.get(0).getLocations().length;
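// Poll until the corrupted replica is reported bad and dropped from the
// block's locations, leaving a single live replica (factor), which is
// below the configured minReplication of 2.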
while (replicaCount != factor) {
try {
Thread.sleep(100);
// Read the file to trigger reportBadBlocks
try {
IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(),
conf, true);
} catch (IOException ie) {
assertTrue(ie instanceof ChecksumException);
}
System.out.println("sleep in try: replicaCount=" + replicaCount
+ " factor=" + factor);
} catch (InterruptedException ignore) {
}
blocks = dfsClient.getNamenode().
getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
replicaCount = blocks.get(0).getLocations().length;
}
// Check if fsck reports the same
outStr = runFsck(conf, 0, true, "/");
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
assertTrue(outStr.contains("UNDER MIN REPL'D BLOCKS:\t1 (100.0 %)"));
assertTrue(outStr.contains("MINIMAL BLOCK REPLICATION:\t2"));
}
@Test(timeout = 90000)
public void testFsckReplicaDetails() throws Exception {
final short replFactor = 1;
short numDn = 1;
final long blockSize = 512;
final long fileSize = 1024;
String[] racks = {"/rack1"};
String[] hosts = {"host1"};
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
DistributedFileSystem dfs;
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(numDn).hosts(hosts).racks(racks).build();
cluster.waitClusterUp();
dfs = cluster.getFileSystem();
// create files
final String testFile = new String("/testfile");
final Path path = new Path(testFile);
DFSTestUtil.createFile(dfs, path, fileSize, replFactor, 1000L);
DFSTestUtil.waitReplication(dfs, path, replFactor);
// make sure datanode that has replica is fine before decommission
String fsckOut = runFsck(conf, 0, true, testFile, "-files",
"-maintenance", "-blocks", "-replicaDetails");
assertTrue(fsckOut.contains(NamenodeFsck.HEALTHY_STATUS));
assertTrue(fsckOut.contains("(LIVE)"));
assertFalse(fsckOut.contains("(ENTERING MAINTENANCE)"));
assertFalse(fsckOut.contains("(IN MAINTENANCE)"));
// decommission datanode
FSNamesystem fsn = cluster.getNameNode().getNamesystem();
BlockManager bm = fsn.getBlockManager();
final DatanodeManager dnm = bm.getDatanodeManager();
DatanodeDescriptor dnDesc0 = dnm.getDatanode(
cluster.getDataNodes().get(0).getDatanodeId());
dnm.getDatanodeAdminManager().startDecommission(dnDesc0);
final String dn0Name = dnDesc0.getXferAddr();
// check the replica status while decommissioning
fsckOut = runFsck(conf, 0, true, testFile, "-files",
"-maintenance", "-blocks", "-replicaDetails");
assertTrue(fsckOut.contains("(DECOMMISSIONING)"));
assertFalse(fsckOut.contains("(ENTERING MAINTENANCE)"));
assertFalse(fsckOut.contains("(IN MAINTENANCE)"));
// Start 2nd DataNode
cluster.startDataNodes(conf, 1, true, null,
new String[] {"/rack2"}, new String[] {"host2"}, null, false);
// Wait for decommission to start
final AtomicBoolean checkDecommissionInProgress =
new AtomicBoolean(false);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
DatanodeInfo datanodeInfo = null;
try {
for (DatanodeInfo info : dfs.getDataNodeStats()) {
if (dn0Name.equals(info.getXferAddr())) {
datanodeInfo = info;
}
}
if (!checkDecommissionInProgress.get() && datanodeInfo != null
&& datanodeInfo.isDecommissionInProgress()) {
checkDecommissionInProgress.set(true);
}
if (datanodeInfo != null && datanodeInfo.isDecommissioned()) {
return true;
}
} catch (Exception e) {
LOG.warn("Unexpected exception: " + e);
return false;
}
return false;
}
}, 500, 30000);
// check the replica status after decommission is done
fsckOut = runFsck(conf, 0, true, testFile, "-files",
"-maintenance", "-blocks", "-replicaDetails");
assertTrue(fsckOut.contains("(DECOMMISSIONED)"));
assertFalse(fsckOut.contains("(ENTERING MAINTENANCE)"));
assertFalse(fsckOut.contains("(IN MAINTENANCE)"));
DatanodeDescriptor dnDesc1 = dnm.getDatanode(
cluster.getDataNodes().get(1).getDatanodeId());
final String dn1Name = dnDesc1.getXferAddr();
dnm.getDatanodeAdminManager().startMaintenance(dnDesc1, Long.MAX_VALUE);
// check the replica status while entering maintenance
fsckOut = runFsck(conf, 0, true, testFile, "-files",
"-maintenance", "-blocks", "-replicaDetails");
assertTrue(fsckOut.contains("(DECOMMISSIONED)"));
assertTrue(fsckOut.contains("(ENTERING MAINTENANCE)"));
assertFalse(fsckOut.contains("(IN MAINTENANCE)"));
// check entering maintenance replicas are printed only when requested
fsckOut = runFsck(conf, 0, true, testFile, "-files",
"-blocks", "-replicaDetails");
assertTrue(fsckOut.contains("(DECOMMISSIONED)"));
assertFalse(fsckOut.contains("(ENTERING MAINTENANCE)"));
assertFalse(fsckOut.contains("(IN MAINTENANCE)"));
// Start 3rd DataNode
cluster.startDataNodes(conf, 1, true, null,
new String[] {"/rack3"}, new String[] {"host3"}, null, false);
// Wait for the 2nd node to reach the in-maintenance state
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
DatanodeInfo dnInfo = null;
try {
for (DatanodeInfo info : dfs.getDataNodeStats()) {
if (dn1Name.equals(info.getXferAddr())) {
dnInfo = info;
}
}
if (dnInfo != null && dnInfo.isInMaintenance()) {
return true;
}
} catch (Exception e) {
LOG.warn("Unexpected exception: " + e);
return false;
}
return false;
}
}, 500, 30000);
// check the replica status after the 2nd node is in maintenance
fsckOut = runFsck(conf, 0, true, testFile, "-files",
"-maintenance", "-blocks", "-replicaDetails");
assertTrue(fsckOut.contains("(DECOMMISSIONED)"));
assertFalse(fsckOut.contains("(ENTERING MAINTENANCE)"));
assertTrue(fsckOut.contains("(IN MAINTENANCE)"));
// check in maintenance replicas are not printed when not requested
fsckOut = runFsck(conf, 0, true, testFile, "-files",
"-blocks", "-replicaDetails");
assertTrue(fsckOut.contains("(DECOMMISSIONED)"));
assertFalse(fsckOut.contains("(ENTERING MAINTENANCE)"));
assertFalse(fsckOut.contains("(IN MAINTENANCE)"));
}
/** Test if fsck can return -1 in case of failure.
*
* @throws Exception
*/
@Test
public void testFsckError() throws Exception {
// bring up a one-node cluster
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir).build();
String fileName = "/test.txt";
Path filePath = new Path(fileName);
FileSystem fs = cluster.getFileSystem();
// create a one-block file
DFSTestUtil.createFile(fs, filePath, 1L, (short)1, 1L);
DFSTestUtil.waitReplication(fs, filePath, (short)1);
// intentionally corrupt NN data structure
INodeFile node = (INodeFile) cluster.getNamesystem().dir.getINode(
fileName, DirOp.READ);
final BlockInfo[] blocks = node.getBlocks();
assertEquals(1, blocks.length);
blocks[0].setNumBytes(-1L); // set the block length to be negative
// run fsck and expect a failure with -1 as the error code
String outStr = runFsck(conf, -1, true, fileName);
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.FAILURE_STATUS));
// clean up file system
fs.delete(filePath, true);
}
/** Check that the fsck -list-corruptfileblocks option works properly. */
@Test
public void testFsckListCorruptFilesBlocks() throws Exception {
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
FileSystem fs = null;
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir).build();
cluster.waitActive();
fs = cluster.getFileSystem();
DFSTestUtil util = new DFSTestUtil.Builder().
setName("testGetCorruptFiles").setNumFiles(3).setMaxLevels(1).
setMaxSize(1024).build();
util.createFiles(fs, "/corruptData", (short) 1);
util.waitReplication(fs, "/corruptData", (short) 1);
String outStr = runFsck(conf, 0, false, "/corruptData",
"-list-corruptfileblocks");
System.out.println("1. good fsck out: " + outStr);
assertTrue(outStr.contains("has 0 CORRUPT blocks"));
// delete the blocks
final String bpid = cluster.getNamesystem().getBlockPoolId();
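// Walk every DataNode storage directory and delete both the block files
// and their metadata files so the files' blocks become corrupt.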
for (int i=0; i<4; i++) {
for (int j=0; j<=1; j++) {
File storageDir = cluster.getInstanceStorageDir(i, j);
File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
List<File> metadataFiles = MiniDFSCluster.getAllBlockMetadataFiles(
dataDir);
if (metadataFiles == null) {
continue;
}
for (File metadataFile : metadataFiles) {
File blockFile = Block.metaToBlockFile(metadataFile);
assertTrue("Cannot remove file.", blockFile.delete());
assertTrue("Cannot remove file.", metadataFile.delete());
}
}
}
waitForCorruptionBlocks(3, "/corruptData");
outStr = runFsck(conf, -1, true, "/corruptData", "-list-corruptfileblocks");
System.out.println("2. bad fsck out: " + outStr);
assertTrue(outStr.contains("has 3 CORRUPT blocks"));
// Do a listing on a dir which doesn't have any corrupt blocks and validate
util.createFiles(fs, "/goodData");
outStr = runFsck(conf, 0, true, "/goodData", "-list-corruptfileblocks");
System.out.println("3. good fsck out: " + outStr);
assertTrue(outStr.contains("has 0 CORRUPT blocks"));
util.cleanup(fs, "/goodData");
// validate that a directory whose name is a prefix of the corrupt one reports no corrupt blocks
util.createFiles(fs, "/corruptDa");
outStr = runFsck(conf, 0, true, "/corruptDa", "-list-corruptfileblocks");
assertTrue(outStr.contains("has 0 CORRUPT blocks"));
util.cleanup(fs, "/corruptData");
util.cleanup(fs, "/corruptDa");
}
/**
* Test that running the fsck command with illegal arguments prints the
* proper usage.
*/
@Test
public void testToCheckTheFsckCommandOnIllegalArguments() throws Exception {
// bring up a one-node cluster
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir).build();
String fileName = "/test.txt";
Path filePath = new Path(fileName);
FileSystem fs = cluster.getFileSystem();
// create a one-block file
DFSTestUtil.createFile(fs, filePath, 1L, (short) 1, 1L);
DFSTestUtil.waitReplication(fs, filePath, (short) 1);
// passing illegal option
String outStr = runFsck(conf, -1, true, fileName, "-thisIsNotAValidFlag");
System.out.println(outStr);
assertFalse(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
// passing multiple paths as arguments
outStr = runFsck(conf, -1, true, "/", fileName);
System.out.println(outStr);
assertFalse(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
// clean up file system
fs.delete(filePath, true);
}
/**
* Tests that the numbers of missing and expected block replicas are
* correct.
* @throws IOException
*/
@Test
public void testFsckMissingReplicas() throws IOException {
// Desired replication factor
// Set this higher than numReplicas so it's under-replicated
final short replFactor = 2;
// Number of replicas to actually start
final short numReplicas = 1;
// Number of blocks to write
final short numBlocks = 3;
// Set a small-ish blocksize
final long blockSize = 512;
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
DistributedFileSystem dfs = null;
// Startup a minicluster
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(numReplicas).build();
assertNotNull("Failed Cluster Creation", cluster);
cluster.waitClusterUp();
dfs = cluster.getFileSystem();
assertNotNull("Failed to get FileSystem", dfs);
// Create a file that will be intentionally under-replicated
final String pathString = new String("/testfile");
final Path path = new Path(pathString);
long fileLen = blockSize * numBlocks;
DFSTestUtil.createFile(dfs, path, fileLen, replFactor, 1);
// Get the namenode and network topology so fsck can run in-process
NameNode namenode = cluster.getNameNode();
NetworkTopology nettop = cluster.getNamesystem().getBlockManager()
.getDatanodeManager().getNetworkTopology();
Map<String, String[]> pmap = new HashMap<String, String[]>();
Writer result = new StringWriter();
PrintWriter out = new PrintWriter(result, true);
InetAddress remoteAddress = InetAddress.getLocalHost();
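// Build a NamenodeFsck directly (instead of going through the DFSck tool)
// so the test can invoke check() and inspect the Result counters.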
NamenodeFsck fsck = new NamenodeFsck(conf, namenode, nettop, pmap, out,
numReplicas, remoteAddress);
// Run the fsck and check the Result
final HdfsFileStatus file =
namenode.getRpcServer().getFileInfo(pathString);
assertNotNull(file);
Result replRes = new ReplicationResult(conf);
Result ecRes = new ErasureCodingResult(conf);
fsck.check(pathString, file, replRes, ecRes);
// Also print the output from the fsck, for ex post facto sanity checks
System.out.println(result.toString());
assertEquals((numBlocks * replFactor) - (numBlocks * numReplicas),
replRes.missingReplicas);
assertEquals(numBlocks * replFactor, replRes.numExpectedReplicas);
}
/**
* Tests that the number of mis-replicated blocks is correct.
* @throws IOException
*/
@Test
public void testFsckMisPlacedReplicas() throws IOException {
// Desired replication factor
final short replFactor = 2;
// Number of replicas to actually start
short numDn = 2;
// Number of blocks to write
final short numBlocks = 3;
// Set a small-ish blocksize
final long blockSize = 512;
String[] racks = {"/rack1", "/rack1"};
String[] hosts = {"host1", "host2"};
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
DistributedFileSystem dfs = null;
// Startup a minicluster
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(numDn).hosts(hosts).racks(racks).build();
assertNotNull("Failed Cluster Creation", cluster);
cluster.waitClusterUp();
dfs = cluster.getFileSystem();
assertNotNull("Failed to get FileSystem", dfs);
// Create a file that will be intentionally under-replicated
final String pathString = new String("/testfile");
final Path path = new Path(pathString);
long fileLen = blockSize * numBlocks;
DFSTestUtil.createFile(dfs, path, fileLen, replFactor, 1);
// Get the namenode and network topology so fsck can run in-process
NameNode namenode = cluster.getNameNode();
NetworkTopology nettop = cluster.getNamesystem().getBlockManager()
.getDatanodeManager().getNetworkTopology();
// Add a new node on a different rack, so the previous blocks' replicas
// are considered misplaced
nettop.add(DFSTestUtil.getDatanodeDescriptor("/rack2", "/host3"));
numDn++;
Map<String, String[]> pmap = new HashMap<String, String[]>();
Writer result = new StringWriter();
PrintWriter out = new PrintWriter(result, true);
InetAddress remoteAddress = InetAddress.getLocalHost();
NamenodeFsck fsck = new NamenodeFsck(conf, namenode, nettop, pmap, out,
numDn, remoteAddress);
// Run the fsck and check the Result
final HdfsFileStatus file =
namenode.getRpcServer().getFileInfo(pathString);
assertNotNull(file);
Result replRes = new ReplicationResult(conf);
Result ecRes = new ErasureCodingResult(conf);
fsck.check(pathString, file, replRes, ecRes);
// check misReplicatedBlock number.
assertEquals(numBlocks, replRes.numMisReplicatedBlocks);
}
/** Test fsck with FileNotFound. */
@Test
public void testFsckFileNotFound() throws Exception {
// Number of replicas to actually start
final short numReplicas = 1;
NameNode namenode = mock(NameNode.class);
NetworkTopology nettop = mock(NetworkTopology.class);
Map<String, String[]> pmap = new HashMap<>();
Writer result = new StringWriter();
PrintWriter out = new PrintWriter(result, true);
InetAddress remoteAddress = InetAddress.getLocalHost();
FSNamesystem fsName = mock(FSNamesystem.class);
FSDirectory fsd = mock(FSDirectory.class);
BlockManager blockManager = mock(BlockManager.class);
DatanodeManager dnManager = mock(DatanodeManager.class);
INodesInPath iip = mock(INodesInPath.class);
when(namenode.getNamesystem()).thenReturn(fsName);
when(fsName.getBlockManager()).thenReturn(blockManager);
when(fsName.getFSDirectory()).thenReturn(fsd);
when(fsd.getFSNamesystem()).thenReturn(fsName);
when(fsd.resolvePath(any(), anyString(), any(DirOp.class))).thenReturn(iip);
when(blockManager.getDatanodeManager()).thenReturn(dnManager);
NamenodeFsck fsck = new NamenodeFsck(conf, namenode, nettop, pmap, out,
numReplicas, remoteAddress);
String pathString = "/tmp/testFile";
HdfsFileStatus file = new HdfsFileStatus.Builder()
.length(123L)
.replication(1)
.blocksize(128 * 1024L)
.mtime(123123123L)
.atime(123123120L)
.perm(FsPermission.getDefault())
.owner("foo")
.group("bar")
.path(DFSUtil.string2Bytes(pathString))
.fileId(312321L)
.children(1)
.storagePolicy(HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED)
.build();
Result replRes = new ReplicationResult(conf);
Result ecRes = new ErasureCodingResult(conf);
try {
fsck.check(pathString, file, replRes, ecRes);
} catch (Exception e) {
fail("Unexpected exception " + e.getMessage());
}
assertTrue(replRes.isHealthy());
}
/** Test fsck with symlinks in the filesystem. */
@Test
public void testFsckSymlink() throws Exception {
final DFSTestUtil util = new DFSTestUtil.Builder().
setName(getClass().getSimpleName()).setNumFiles(1).build();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
FileSystem fs = null;
final long precision = 1L;
conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY,
precision);
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(4).build();
fs = cluster.getFileSystem();
final String fileName = "/srcdat";
util.createFiles(fs, fileName);
final FileContext fc = FileContext.getFileContext(
cluster.getConfiguration(0));
final Path file = new Path(fileName);
final Path symlink = new Path("/srcdat-symlink");
fc.createSymlink(file, symlink, false);
util.waitReplication(fs, fileName, (short)3);
long aTime = fc.getFileStatus(symlink).getAccessTime();
Thread.sleep(precision);
String outStr = runFsck(conf, 0, true, "/");
verifyAuditLogs();
assertEquals(aTime, fc.getFileStatus(symlink).getAccessTime());
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
assertTrue(outStr.contains("Total symlinks:\t\t1"));
util.cleanup(fs, fileName);
}
/**
* Test for including the snapshot files in fsck report.
*/
@Test
public void testFsckForSnapshotFiles() throws Exception {
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir).numDataNodes(1)
.build();
String runFsck = runFsck(conf, 0, true, "/", "-includeSnapshots",
"-files");
assertTrue(runFsck.contains("HEALTHY"));
final String fileName = "/srcdat";
DistributedFileSystem hdfs = cluster.getFileSystem();
Path file1 = new Path(fileName);
DFSTestUtil.createFile(hdfs, file1, 1024, (short) 1, 1000L);
hdfs.allowSnapshot(new Path("/"));
hdfs.createSnapshot(new Path("/"), "mySnapShot");
runFsck = runFsck(conf, 0, true, "/", "-includeSnapshots", "-files");
assertTrue(runFsck.contains("/.snapshot/mySnapShot/srcdat"));
runFsck = runFsck(conf, 0, true, "/", "-files");
assertFalse(runFsck.contains("mySnapShot"));
}
/**
* Test for blockIdCK.
*/
@Test
public void testBlockIdCK() throws Exception {
final short replFactor = 2;
short numDn = 2;
final long blockSize = 512;
String[] racks = {"/rack1", "/rack2"};
String[] hosts = {"host1", "host2"};
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
DistributedFileSystem dfs = null;
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(numDn).hosts(hosts).racks(racks).build();
assertNotNull("Failed Cluster Creation", cluster);
cluster.waitClusterUp();
dfs = cluster.getFileSystem();
assertNotNull("Failed to get FileSystem", dfs);
DFSTestUtil util = new DFSTestUtil.Builder().
setName(getClass().getSimpleName()).setNumFiles(1).build();
//create files
final String pathString = new String("/testfile");
final Path path = new Path(pathString);
util.createFile(dfs, path, 1024, replFactor, 1000L);
util.waitReplication(dfs, path, replFactor);
StringBuilder sb = new StringBuilder();
for (LocatedBlock lb: util.getAllBlocks(dfs, path)){
sb.append(lb.getBlock().getLocalBlock().getBlockName()+" ");
}
String[] bIds = sb.toString().split(" ");
//run fsck
//illegal input test
String runFsckResult = runFsck(conf, 0, true, "/", "-blockId",
"not_a_block_id");
assertTrue(runFsckResult.contains("Incorrect blockId format:"));
//general test
runFsckResult = runFsck(conf, 0, true, "/", "-blockId", sb.toString());
assertTrue(runFsckResult.contains(bIds[0]));
assertTrue(runFsckResult.contains(bIds[1]));
assertTrue(runFsckResult.contains(
"Block replica on datanode/rack: host1/rack1 is HEALTHY"));
assertTrue(runFsckResult.contains(
"Block replica on datanode/rack: host2/rack2 is HEALTHY"));
}
/**
* Test for blockIdCK with datanode decommission.
*/
@Test
public void testBlockIdCKDecommission() throws Exception {
final short replFactor = 1;
short numDn = 2;
final long blockSize = 512;
boolean checkDecommissionInProgress = false;
String[] racks = {"/rack1", "/rack2"};
String[] hosts = {"host1", "host2"};
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
DistributedFileSystem dfs;
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(numDn).hosts(hosts).racks(racks).build();
assertNotNull("Failed Cluster Creation", cluster);
cluster.waitClusterUp();
dfs = cluster.getFileSystem();
assertNotNull("Failed to get FileSystem", dfs);
DFSTestUtil util = new DFSTestUtil.Builder().
setName(getClass().getSimpleName()).setNumFiles(1).build();
//create files
final String pathString = new String("/testfile");
final Path path = new Path(pathString);
util.createFile(dfs, path, 1024, replFactor, 1000L);
util.waitReplication(dfs, path, replFactor);
StringBuilder sb = new StringBuilder();
for (LocatedBlock lb: util.getAllBlocks(dfs, path)){
sb.append(lb.getBlock().getLocalBlock().getBlockName()+" ");
}
String[] bIds = sb.toString().split(" ");
//make sure datanode that has replica is fine before decommission
String outStr = runFsck(conf, 0, true, "/", "-blockId", bIds[0]);
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
//decommission datanode
FSNamesystem fsn = cluster.getNameNode().getNamesystem();
BlockManager bm = fsn.getBlockManager();
ExtendedBlock eb = util.getFirstBlock(dfs, path);
BlockCollection bc = null;
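// Resolve the block's owning BlockCollection under the namesystem write
// lock before picking the datanode to decommission.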
try {
fsn.writeLock();
BlockInfo bi = bm.getStoredBlock(eb.getLocalBlock());
bc = fsn.getBlockCollection(bi);
} finally {
fsn.writeUnlock();
}
DatanodeDescriptor dn = bc.getBlocks()[0].getDatanode(0);
bm.getDatanodeManager().getDatanodeAdminManager().startDecommission(dn);
String dnName = dn.getXferAddr();
//wait for decommission start
DatanodeInfo datanodeInfo = null;
int count = 0;
do {
Thread.sleep(2000);
for (DatanodeInfo info : dfs.getDataNodeStats()) {
if (dnName.equals(info.getXferAddr())) {
datanodeInfo = info;
}
}
//check decommissioning only once
if (!checkDecommissionInProgress && datanodeInfo != null
&& datanodeInfo.isDecommissionInProgress()) {
String fsckOut = runFsck(conf, 3, true, "/", "-blockId", bIds[0]);
assertTrue(fsckOut.contains(NamenodeFsck.DECOMMISSIONING_STATUS));
checkDecommissionInProgress = true;
}
} while (datanodeInfo != null && !datanodeInfo.isDecommissioned());
//check decommissioned
String fsckOut = runFsck(conf, 2, true, "/", "-blockId", bIds[0]);
assertTrue(fsckOut.contains(NamenodeFsck.DECOMMISSIONED_STATUS));
}
/**
* Test for blockIdCK with datanode maintenance.
*/
@Test(timeout = 90000)
public void testBlockIdCKMaintenance() throws Exception {
final short replFactor = 2;
short numDn = 2;
final long blockSize = 512;
String[] hosts = {"host1", "host2"};
String[] racks = {"/rack1", "/rack2"};
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replFactor);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, replFactor);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY,
replFactor);
DistributedFileSystem dfs;
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(numDn)
.hosts(hosts)
.racks(racks)
.build();
assertNotNull("Failed Cluster Creation", cluster);
cluster.waitClusterUp();
dfs = cluster.getFileSystem();
assertNotNull("Failed to get FileSystem", dfs);
DFSTestUtil util = new DFSTestUtil.Builder().
setName(getClass().getSimpleName()).setNumFiles(1).build();
//create files
final String pathString = new String("/testfile");
final Path path = new Path(pathString);
util.createFile(dfs, path, 1024, replFactor, 1000L);
util.waitReplication(dfs, path, replFactor);
StringBuilder sb = new StringBuilder();
for (LocatedBlock lb: util.getAllBlocks(dfs, path)){
sb.append(lb.getBlock().getLocalBlock().getBlockName()+" ");
}
String[] bIds = sb.toString().split(" ");
//make sure datanode that has replica is fine before maintenance
String outStr = runFsck(conf, 0, true, "/",
"-maintenance", "-blockId", bIds[0]);
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
FSNamesystem fsn = cluster.getNameNode().getNamesystem();
BlockManager bm = fsn.getBlockManager();
DatanodeManager dnm = bm.getDatanodeManager();
DatanodeDescriptor dn = dnm.getDatanode(cluster.getDataNodes().get(0)
.getDatanodeId());
bm.getDatanodeManager().getDatanodeAdminManager().startMaintenance(dn,
Long.MAX_VALUE);
final String dnName = dn.getXferAddr();
//wait for the node to enter maintenance state
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
DatanodeInfo datanodeInfo = null;
try {
for (DatanodeInfo info : dfs.getDataNodeStats()) {
if (dnName.equals(info.getXferAddr())) {
datanodeInfo = info;
}
}
if (datanodeInfo != null && datanodeInfo.isEnteringMaintenance()) {
String fsckOut = runFsck(conf, 5, false, "/",
"-maintenance", "-blockId", bIds[0]);
assertTrue(fsckOut.contains(
NamenodeFsck.ENTERING_MAINTENANCE_STATUS));
return true;
}
} catch (Exception e) {
          LOG.warn("Unexpected exception", e);
return false;
}
return false;
}
}, 500, 30000);
    // Start a 3rd DataNode so the entering-maintenance replica can be
    // re-replicated, letting the node complete its transition
cluster.startDataNodes(conf, 1, true, null,
new String[] {"/rack3"}, new String[] {"host3"}, null, false);
    // Wait for the 1st node to reach the in-maintenance state
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
DatanodeInfo datanodeInfo = null;
for (DatanodeInfo info : dfs.getDataNodeStats()) {
if (dnName.equals(info.getXferAddr())) {
datanodeInfo = info;
}
}
if (datanodeInfo != null && datanodeInfo.isInMaintenance()) {
return true;
}
} catch (Exception e) {
          LOG.warn("Unexpected exception", e);
return false;
}
return false;
}
}, 500, 30000);
    // check that the in-maintenance node is reported
String fsckOut = runFsck(conf, 4, false, "/",
"-maintenance", "-blockId", bIds[0]);
assertTrue(fsckOut.contains(NamenodeFsck.IN_MAINTENANCE_STATUS));
    // in-maintenance nodes should not be printed when not requested
fsckOut = runFsck(conf, 4, false, "/", "-blockId", bIds[0]);
assertFalse(fsckOut.contains(NamenodeFsck.IN_MAINTENANCE_STATUS));
}
/**
* Test for blockIdCK with block corruption.
*/
@Test
public void testBlockIdCKCorruption() throws Exception {
short numDn = 1;
final long blockSize = 512;
Random random = new Random();
ExtendedBlock block;
short repFactor = 1;
String[] racks = {"/rack1"};
String[] hosts = {"host1"};
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
// Set short retry timeouts so this test runs faster
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(numDn).hosts(hosts).racks(racks).build();
assertNotNull("Failed Cluster Creation", cluster);
cluster.waitClusterUp();
    DistributedFileSystem dfs = cluster.getFileSystem();
assertNotNull("Failed to get FileSystem", dfs);
DFSTestUtil util = new DFSTestUtil.Builder().
setName(getClass().getSimpleName()).setNumFiles(1).build();
//create files
    final String pathString = "/testfile";
final Path path = new Path(pathString);
util.createFile(dfs, path, 1024, repFactor, 1000L);
util.waitReplication(dfs, path, repFactor);
StringBuilder sb = new StringBuilder();
    for (LocatedBlock lb : util.getAllBlocks(dfs, path)) {
      sb.append(lb.getBlock().getLocalBlock().getBlockName() + " ");
    }
String[] bIds = sb.toString().split(" ");
    // make sure the block is healthy before we corrupt it
String outStr = runFsck(conf, 0, true, "/", "-blockId", bIds[0]);
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
// corrupt replicas
block = DFSTestUtil.getFirstBlock(dfs, path);
File blockFile = cluster.getBlockFile(0, block);
    if (blockFile != null && blockFile.exists()) {
      // overwrite a few bytes at a random offset in the first half of the
      // block file so its checksum no longer matches
      try (RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw")) {
        String badString = "BADBAD";
        int rand = random.nextInt((int) raFile.length() / 2);
        raFile.seek(rand);
        raFile.write(badString.getBytes());
      }
    }
util.waitCorruptReplicas(dfs, cluster.getNamesystem(), path, block, 1);
outStr = runFsck(conf, 1, false, "/", "-blockId", block.getBlockName());
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
}
private void writeFile(final DistributedFileSystem dfs,
Path dir, String fileName) throws IOException {
    Path filePath = new Path(dir, fileName);
final FSDataOutputStream out = dfs.create(filePath);
out.writeChars("teststring");
out.close();
}
private void writeFile(final DistributedFileSystem dfs,
String dirName, String fileName, String storagePolicy)
throws IOException {
Path dirPath = new Path(dirName);
dfs.mkdirs(dirPath);
dfs.setStoragePolicy(dirPath, storagePolicy);
writeFile(dfs, dirPath, fileName);
}
/**
* Test storage policy display.
*/
@Test
public void testStoragePoliciesCK() throws Exception {
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(3)
.storageTypes(
new StorageType[] {StorageType.DISK, StorageType.ARCHIVE})
.build();
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
writeFile(dfs, "/testhot", "file", "HOT");
writeFile(dfs, "/testwarm", "file", "WARM");
writeFile(dfs, "/testcold", "file", "COLD");
String outStr = runFsck(conf, 0, true, "/", "-storagepolicies");
assertTrue(outStr.contains("DISK:3(HOT)"));
assertTrue(outStr.contains("DISK:1,ARCHIVE:2(WARM)"));
assertTrue(outStr.contains("ARCHIVE:3(COLD)"));
assertTrue(outStr.contains("All blocks satisfy specified storage policy."));
dfs.setStoragePolicy(new Path("/testhot"), "COLD");
dfs.setStoragePolicy(new Path("/testwarm"), "COLD");
outStr = runFsck(conf, 0, true, "/", "-storagepolicies");
assertTrue(outStr.contains("DISK:3(HOT)"));
assertTrue(outStr.contains("DISK:1,ARCHIVE:2(WARM)"));
assertTrue(outStr.contains("ARCHIVE:3(COLD)"));
assertFalse(outStr.contains(
"All blocks satisfy specified storage policy."));
}
/**
   * Test that blocks on decommissioning hosts are not shown as missing.
*/
@Test
public void testFsckWithDecommissionedReplicas() throws Exception {
final short replFactor = 1;
short numDn = 2;
final long blockSize = 512;
final long fileSize = 1024;
boolean checkDecommissionInProgress = false;
String[] racks = {"/rack1", "/rack2"};
String[] hosts = {"host1", "host2"};
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
DistributedFileSystem dfs;
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(numDn).hosts(hosts).racks(racks).build();
assertNotNull("Failed Cluster Creation", cluster);
cluster.waitClusterUp();
dfs = cluster.getFileSystem();
assertNotNull("Failed to get FileSystem", dfs);
DFSTestUtil util = new DFSTestUtil.Builder().
setName(getClass().getSimpleName()).setNumFiles(1).build();
//create files
    final String testFile = "/testfile";
final Path path = new Path(testFile);
util.createFile(dfs, path, fileSize, replFactor, 1000L);
util.waitReplication(dfs, path, replFactor);
    // make sure the replica's datanode is healthy before decommission
String outStr = runFsck(conf, 0, true, testFile);
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
// decommission datanode
FSNamesystem fsn = cluster.getNameNode().getNamesystem();
BlockManager bm = fsn.getBlockManager();
ExtendedBlock eb = util.getFirstBlock(dfs, path);
BlockCollection bc = null;
try {
fsn.writeLock();
BlockInfo bi = bm.getStoredBlock(eb.getLocalBlock());
bc = fsn.getBlockCollection(bi);
} finally {
fsn.writeUnlock();
}
DatanodeDescriptor dn = bc.getBlocks()[0]
.getDatanode(0);
bm.getDatanodeManager().getDatanodeAdminManager().startDecommission(dn);
String dnName = dn.getXferAddr();
// wait for decommission start
DatanodeInfo datanodeInfo = null;
do {
Thread.sleep(2000);
for (DatanodeInfo info : dfs.getDataNodeStats()) {
if (dnName.equals(info.getXferAddr())) {
datanodeInfo = info;
}
}
      // while decommission is in progress, the replica should be reported
      // healthy (exit code 0) rather than corrupt (1); check this only once
      if (!checkDecommissionInProgress && datanodeInfo != null
          && datanodeInfo.isDecommissionInProgress()) {
        runFsck(conf, 0, true, testFile);
        checkDecommissionInProgress = true;
      }
} while (datanodeInfo != null && !datanodeInfo.isDecommissioned());
    // after decommission completes, the replica should still be reported
    // healthy (exit code 0)
    runFsck(conf, 0, true, testFile);
}
/**
   * Test that blocks on maintenance hosts are not shown as missing.
*/
@Test (timeout = 90000)
public void testFsckWithMaintenanceReplicas() throws Exception {
final short replFactor = 2;
short numDn = 2;
final long blockSize = 512;
String[] hosts = {"host1", "host2"};
String[] racks = {"/rack1", "/rack2"};
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replFactor);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, replFactor);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY,
replFactor);
DistributedFileSystem dfs;
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(numDn)
.hosts(hosts)
.racks(racks)
.build();
assertNotNull("Failed Cluster Creation", cluster);
cluster.waitClusterUp();
dfs = cluster.getFileSystem();
assertNotNull("Failed to get FileSystem", dfs);
DFSTestUtil util = new DFSTestUtil.Builder().
setName(getClass().getSimpleName()).setNumFiles(1).build();
//create files
    final String testFile = "/testfile";
final Path path = new Path(testFile);
util.createFile(dfs, path, 1024, replFactor, 1000L);
util.waitReplication(dfs, path, replFactor);
StringBuilder sb = new StringBuilder();
    for (LocatedBlock lb : util.getAllBlocks(dfs, path)) {
      sb.append(lb.getBlock().getLocalBlock().getBlockName() + " ");
    }
String[] bIds = sb.toString().split(" ");
    // make sure the replica's datanode is healthy before maintenance
String outStr = runFsck(conf, 0, true, testFile);
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
FSNamesystem fsn = cluster.getNameNode().getNamesystem();
BlockManager bm = fsn.getBlockManager();
DatanodeManager dnm = bm.getDatanodeManager();
DatanodeDescriptor dn = dnm.getDatanode(cluster.getDataNodes().get(0)
.getDatanodeId());
bm.getDatanodeManager().getDatanodeAdminManager().startMaintenance(dn,
Long.MAX_VALUE);
final String dnName = dn.getXferAddr();
//wait for the node to enter maintenance state
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
DatanodeInfo datanodeInfo = null;
try {
for (DatanodeInfo info : dfs.getDataNodeStats()) {
if (dnName.equals(info.getXferAddr())) {
datanodeInfo = info;
}
}
if (datanodeInfo != null && datanodeInfo.isEnteringMaintenance()) {
// verify fsck returns Healthy status
String fsckOut = runFsck(conf, 0, true, testFile, "-maintenance");
assertTrue(fsckOut.contains(NamenodeFsck.HEALTHY_STATUS));
return true;
}
} catch (Exception e) {
          LOG.warn("Unexpected exception", e);
return false;
}
return false;
}
}, 500, 30000);
    // Start a 3rd DataNode and wait for the node to reach the
    // in-maintenance state
cluster.startDataNodes(conf, 1, true, null,
new String[] {"/rack3"}, new String[] {"host3"}, null, false);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
DatanodeInfo datanodeInfo = null;
try {
for (DatanodeInfo info : dfs.getDataNodeStats()) {
if (dnName.equals(info.getXferAddr())) {
datanodeInfo = info;
}
}
if (datanodeInfo != null && datanodeInfo.isInMaintenance()) {
return true;
}
} catch (Exception e) {
          LOG.warn("Unexpected exception", e);
return false;
}
return false;
}
}, 500, 30000);
// verify fsck returns Healthy status
String fsckOut = runFsck(conf, 0, true, testFile, "-maintenance");
assertTrue(fsckOut.contains(NamenodeFsck.HEALTHY_STATUS));
// verify fsck returns Healthy status even without maintenance option
fsckOut = runFsck(conf, 0, true, testFile);
assertTrue(fsckOut.contains(NamenodeFsck.HEALTHY_STATUS));
}
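  /**
   * Test fsck on a cluster containing both replicated and erasure coded
   * files, and verify that running fsck does not change access times.
   */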
@Test
public void testECFsck() throws Exception {
final long precision = 1L;
conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY,
precision);
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
int dataBlocks = StripedFileTestUtil.getDefaultECPolicy().getNumDataUnits();
int parityBlocks =
StripedFileTestUtil.getDefaultECPolicy().getNumParityUnits();
int totalSize = dataBlocks + parityBlocks;
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(totalSize).build();
    DistributedFileSystem fs = cluster.getFileSystem();
fs.enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
// create a contiguous file
Path replDirPath = new Path("/replicated");
Path replFilePath = new Path(replDirPath, "replfile");
final short factor = 3;
DFSTestUtil.createFile(fs, replFilePath, 1024, factor, 0);
DFSTestUtil.waitReplication(fs, replFilePath, factor);
// create a large striped file
Path ecDirPath = new Path("/striped");
Path largeFilePath = new Path(ecDirPath, "largeFile");
DFSTestUtil.createStripedFile(cluster, largeFilePath, ecDirPath, 1, 2,
true);
// create a small striped file
Path smallFilePath = new Path(ecDirPath, "smallFile");
DFSTestUtil.writeFile(fs, smallFilePath, "hello world!");
long replTime = fs.getFileStatus(replFilePath).getAccessTime();
long ecTime = fs.getFileStatus(largeFilePath).getAccessTime();
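    // sleep past the access time precision so an atime update by fsck would
    // be visible; the asserts below verify the access times are unchanged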
Thread.sleep(precision);
String outStr = runFsck(conf, 0, true, "/");
verifyAuditLogs();
assertEquals(replTime, fs.getFileStatus(replFilePath).getAccessTime());
assertEquals(ecTime, fs.getFileStatus(largeFilePath).getAccessTime());
System.out.println(outStr);
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
shutdownCluster();
// restart the cluster; bring up namenode but not the data nodes
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0).format(false).build();
outStr = runFsck(conf, 1, true, "/", "-files", "-blocks");
// expect the result is corrupt
assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
String[] outLines = outStr.split("\\r?\\n");
for (String line: outLines) {
if (line.contains(largeFilePath.toString())) {
final HdfsFileStatus file = cluster.getNameNode().getRpcServer().
getFileInfo(largeFilePath.toString());
assertTrue(line.contains("policy=" +
file.getErasureCodingPolicy().getName()));
} else if (line.contains(replFilePath.toString())) {
assertTrue(line.contains("replication=" + cluster.getFileSystem().
getFileStatus(replFilePath).getReplication()));
}
}
System.out.println(outStr);
}
/**
   * Test that corrupt snapshot files are listed with full directory paths.
*/
@Test
public void testFsckListCorruptSnapshotFiles() throws Exception {
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
final short replFactor = 1;
int numFiles = 3;
int numSnapshots = 0;
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir).build();
cluster.waitActive();
    DistributedFileSystem hdfs = cluster.getFileSystem();
DFSTestUtil util = new DFSTestUtil.Builder().
setName("testGetCorruptFiles").setNumFiles(numFiles).setMaxLevels(1).
setMaxSize(1024).build();
util.createFiles(hdfs, "/corruptData", (short) 1);
final Path fp = new Path("/corruptData/file");
util.createFile(hdfs, fp, 1024, replFactor, 1000L);
numFiles++;
util.waitReplication(hdfs, "/corruptData", (short) 1);
hdfs.allowSnapshot(new Path("/corruptData"));
hdfs.createSnapshot(new Path("/corruptData"), "mySnapShot");
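    // every file created so far is also captured in the snapshot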
numSnapshots = numFiles;
String outStr =
runFsck(conf, 0, false, "/corruptData", "-list-corruptfileblocks");
System.out.println("1. good fsck out: " + outStr);
assertTrue(outStr.contains("has 0 CORRUPT blocks"));
// delete the blocks
final String bpid = cluster.getNamesystem().getBlockPoolId();
    for (int i = 0; i < numFiles; i++) {
      for (int j = 0; j <= 1; j++) {
File storageDir = cluster.getInstanceStorageDir(i, j);
File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
List<File> metadataFiles = MiniDFSCluster.getAllBlockMetadataFiles(
dataDir);
if (metadataFiles == null) {
continue;
}
for (File metadataFile : metadataFiles) {
File blockFile = Block.metaToBlockFile(metadataFile);
assertTrue("Cannot remove file.", blockFile.delete());
assertTrue("Cannot remove file.", metadataFile.delete());
}
}
}
// Delete file when it has a snapshot
hdfs.delete(fp, false);
numFiles--;
waitForCorruptionBlocks(numSnapshots, "/corruptData");
// with -includeSnapshots all files are reported
outStr = runFsck(conf, -1, true, "/corruptData",
"-list-corruptfileblocks", "-includeSnapshots");
System.out.println("2. bad fsck include snapshot out: " + outStr);
assertTrue(outStr
.contains("has " + (numFiles + numSnapshots) + " CORRUPT blocks"));
assertTrue(outStr.contains("/.snapshot/"));
// without -includeSnapshots only non-snapshots are reported
outStr =
runFsck(conf, -1, true, "/corruptData", "-list-corruptfileblocks");
System.out.println("3. bad fsck exclude snapshot out: " + outStr);
assertTrue(outStr.contains("has " + numFiles + " CORRUPT blocks"));
assertFalse(outStr.contains("/.snapshot/"));
}
/**
* Wait for the namenode to see the corruption.
   * @param corruptBlocks The expected number of corrupt file blocks
   * @param path The directory path under which the corrupt file blocks exist
   * @throws Exception if the expected count is not reached in time
*/
private void waitForCorruptionBlocks(int corruptBlocks, String path)
throws Exception {
GenericTestUtils.waitFor(() -> {
try {
final NamenodeProtocols namenode = cluster.getNameNodeRpc();
CorruptFileBlocks corruptFileBlocks =
namenode.listCorruptFileBlocks(path, null);
int numCorrupt = corruptFileBlocks.getFiles().length;
if (numCorrupt == corruptBlocks) {
return true;
}
} catch (Exception e) {
LOG.error("Exception while getting Corrupt file blocks", e);
}
return false;
}, 100, 10000);
}
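  /**
   * Test that fsck -move salvages the readable blocks of a corrupt file
   * into lost+found.
   */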
@Test (timeout = 300000)
public void testFsckMoveAfterCorruption() throws Exception {
final int dfsBlockSize = 512 * 1024;
final int numDatanodes = 1;
final int replication = 1;
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, dfsBlockSize);
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replication);
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir).build();
DistributedFileSystem dfs = cluster.getFileSystem();
cluster.waitActive();
final String srcDir = "/srcdat";
final DFSTestUtil util = new DFSTestUtil.Builder().setName("TestFsck")
.setMinSize(dfsBlockSize * 2).setMaxSize(dfsBlockSize * 3)
.setNumFiles(1).build();
util.createFiles(dfs, srcDir, (short) replication);
final String[] fileNames = util.getFileNames(srcDir);
LOG.info("Created files: " + Arrays.toString(fileNames));
// Run fsck here. The output is automatically logged for easier debugging
String outStr = runFsck(conf, 0, true, "/", "-files", "-blocks");
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
// Corrupt the first block
final DFSClient dfsClient = new DFSClient(
new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
final String blockFileToCorrupt = fileNames[0];
final CorruptedTestFile ctf = new CorruptedTestFile(blockFileToCorrupt,
new HashSet<>(Arrays.asList(0)), dfsClient, numDatanodes, dfsBlockSize);
ctf.corruptBlocks(cluster);
// Wait for fsck to discover all the missing blocks
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
final String str = runFsck(conf, 1, false, "/");
String numCorrupt = null;
for (String line : str.split(LINE_SEPARATOR)) {
Matcher m = NUM_CORRUPT_BLOCKS_PATTERN.matcher(line);
if (m.matches()) {
numCorrupt = m.group(1);
break;
}
}
if (numCorrupt == null) {
Assert.fail("Cannot find corrupt blocks count in fsck output.");
}
if (Integer.parseInt(numCorrupt) == ctf.getTotalMissingBlocks()) {
assertTrue(str.contains(NamenodeFsck.CORRUPT_STATUS));
return true;
}
} catch (Exception e) {
LOG.error("Exception caught", e);
Assert.fail("Caught unexpected exception.");
}
return false;
}
}, 1000, 60000);
runFsck(conf, 1, true, "/", "-files", "-blocks", "-racks");
LOG.info("Moving blocks to lost+found");
// Fsck will return error since we corrupted a block
runFsck(conf, 1, false, "/", "-move");
final List<LocatedFileStatus> retVal = new ArrayList<>();
final RemoteIterator<LocatedFileStatus> iter =
dfs.listFiles(new Path("/lost+found"), true);
while (iter.hasNext()) {
retVal.add(iter.next());
}
LOG.info("Items in lost+found: " + retVal);
// Expect all good blocks moved, only corrupted block skipped.
long totalLength = 0;
for (LocatedFileStatus lfs: retVal) {
totalLength += lfs.getLen();
}
Assert.assertTrue("Nothing is moved to lost+found!", totalLength > 0);
util.cleanup(dfs, srcDir);
}
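  /**
   * Test fsck display of datanode upgrade domains.
   */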
@Test(timeout = 60000)
public void testFsckUpgradeDomain() throws Exception {
testUpgradeDomain(false, false);
testUpgradeDomain(false, true);
testUpgradeDomain(true, false);
testUpgradeDomain(true, true);
}
private void testUpgradeDomain(boolean defineUpgradeDomain,
boolean displayUpgradeDomain) throws Exception {
final short replFactor = 1;
final short numDN = 1;
final long blockSize = 512;
final long fileSize = 1024;
final String upgradeDomain = "ud1";
final String[] racks = {"/rack1"};
final String[] hosts = {"127.0.0.1"};
HostsFileWriter hostsFileWriter = new HostsFileWriter();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replFactor);
conf.set(HDFS_MINIDFS_BASEDIR, GenericTestUtils.getRandomizedTempPath());
if (defineUpgradeDomain) {
conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY,
CombinedHostFileManager.class, HostConfigManager.class);
hostsFileWriter.initialize(conf, "temp/fsckupgradedomain");
}
DistributedFileSystem dfs;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDN).
hosts(hosts).racks(racks).build();
cluster.waitClusterUp();
dfs = cluster.getFileSystem();
// Configure the upgrade domain on the datanode
if (defineUpgradeDomain) {
DatanodeAdminProperties dnProp = new DatanodeAdminProperties();
DatanodeID datanodeID = cluster.getDataNodes().get(0).getDatanodeId();
dnProp.setHostName(datanodeID.getHostName());
dnProp.setPort(datanodeID.getXferPort());
dnProp.setUpgradeDomain(upgradeDomain);
hostsFileWriter.initIncludeHosts(new DatanodeAdminProperties[]{dnProp});
      cluster.getNamesystem(0).getBlockManager().getDatanodeManager()
          .refreshNodes(conf);
}
// create files
    final String testFile = "/testfile";
final Path path = new Path(testFile);
DFSTestUtil.createFile(dfs, path, fileSize, replFactor, 1000L);
DFSTestUtil.waitReplication(dfs, path, replFactor);
try {
String fsckOut = runFsck(conf, 0, true, testFile, "-files", "-blocks",
displayUpgradeDomain ? "-upgradedomains" : "-locations");
assertTrue(fsckOut.contains(NamenodeFsck.HEALTHY_STATUS));
String udValue = defineUpgradeDomain ? upgradeDomain :
NamenodeFsck.UNDEFINED;
assertEquals(displayUpgradeDomain,
fsckOut.contains("(ud=" + udValue + ")"));
} finally {
if (defineUpgradeDomain) {
hostsFileWriter.cleanup();
}
}
}
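  /**
   * Test fsck on an erasure coded file with more corrupt internal blocks
   * than the policy's parity count, making it unrecoverable.
   */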
@Test (timeout = 300000)
public void testFsckCorruptECFile() throws Exception {
int dataBlocks = StripedFileTestUtil.getDefaultECPolicy().getNumDataUnits();
int parityBlocks =
StripedFileTestUtil.getDefaultECPolicy().getNumParityUnits();
int cellSize = StripedFileTestUtil.getDefaultECPolicy().getCellSize();
int totalSize = dataBlocks + parityBlocks;
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(totalSize).build();
    DistributedFileSystem fs = cluster.getFileSystem();
fs.enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
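    // map each datanode's IPC port to its index in the cluster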
Map<Integer, Integer> dnIndices = new HashMap<>();
ArrayList<DataNode> dnList = cluster.getDataNodes();
for (int i = 0; i < totalSize; i++) {
dnIndices.put(dnList.get(i).getIpcPort(), i);
}
// create file
Path ecDirPath = new Path("/striped");
fs.mkdir(ecDirPath, FsPermission.getDirDefault());
fs.getClient().setErasureCodingPolicy(ecDirPath.toString(),
StripedFileTestUtil.getDefaultECPolicy().getName());
Path file = new Path(ecDirPath, "corrupted");
final int length = cellSize * dataBlocks;
final byte[] bytes = StripedFileTestUtil.generateBytes(length);
DFSTestUtil.writeFile(fs, file, bytes);
LocatedStripedBlock lsb = (LocatedStripedBlock)fs.getClient()
.getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(lsb,
cellSize, dataBlocks, parityBlocks);
    // corrupt one more internal block than the number of parity units,
    // making the block group unrecoverable
    for (int i = 0; i < parityBlocks + 1; i++) {
      int ipcPort = blks[i].getLocations()[0].getIpcPort();
      int dnIndex = dnIndices.get(ipcPort);
      File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
      File blkFile = MiniDFSCluster.getBlockFile(storageDir,
          blks[i].getBlock());
      Assert.assertTrue("Block file does not exist", blkFile.exists());
      try (FileOutputStream out = new FileOutputStream(blkFile)) {
        out.write("corruption".getBytes());
      }
    }
    // disable heartbeats from the DNs so that the corrupt block records are
    // retained in the NameNode
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
}
// Read the file to trigger reportBadBlocks
try {
IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(), conf,
true);
} catch (IOException ie) {
assertTrue(ie.getMessage().contains(
"missingChunksNum=" + (parityBlocks + 1)));
}
waitForUnrecoverableBlockGroup(conf);
String outStr = runFsck(conf, 1, true, "/");
assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
assertTrue(outStr.contains("Under-erasure-coded block groups:\t0"));
outStr = runFsck(conf, -1, true, "/", "-list-corruptfileblocks");
assertTrue(outStr.contains("has 1 CORRUPT blocks"));
}
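  /**
   * Test fsck on an erasure coded file with more missing internal blocks
   * than the policy's parity count, making it unrecoverable.
   */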
@Test (timeout = 300000)
public void testFsckMissingECFile() throws Exception {
int dataBlocks = StripedFileTestUtil.getDefaultECPolicy().getNumDataUnits();
int parityBlocks =
StripedFileTestUtil.getDefaultECPolicy().getNumParityUnits();
int cellSize = StripedFileTestUtil.getDefaultECPolicy().getCellSize();
int totalSize = dataBlocks + parityBlocks;
File builderBaseDir = new File(GenericTestUtils.getRandomizedTempPath());
cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
.numDataNodes(totalSize).build();
    DistributedFileSystem fs = cluster.getFileSystem();
fs.enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
// create file
Path ecDirPath = new Path("/striped");
fs.mkdir(ecDirPath, FsPermission.getDirDefault());
fs.getClient().setErasureCodingPolicy(ecDirPath.toString(),
StripedFileTestUtil.getDefaultECPolicy().getName());
Path file = new Path(ecDirPath, "missing");
final int length = cellSize * dataBlocks;
final byte[] bytes = StripedFileTestUtil.generateBytes(length);
DFSTestUtil.writeFile(fs, file, bytes);
    // stop one more datanode than the number of parity units, making the
    // block group unrecoverable
ArrayList<DataNode> dns = cluster.getDataNodes();
DatanodeID dnId;
for (int i = 0; i < parityBlocks + 1; i++) {
dnId = dns.get(i).getDatanodeId();
cluster.stopDataNode(dnId.getXferAddr());
cluster.setDataNodeDead(dnId);
}
waitForUnrecoverableBlockGroup(conf);
String outStr = runFsck(conf, 1, true, "/", "-files", "-blocks",
"-locations");
assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
assertTrue(outStr.contains("Live_repl=" + (dataBlocks - 1)));
assertTrue(outStr.contains("Under-erasure-coded block groups:\t0"));
outStr = runFsck(conf, -1, true, "/", "-list-corruptfileblocks");
assertTrue(outStr.contains("has 1 CORRUPT blocks"));
}
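  /**
   * Test that fsck -blockId reports redundant internal blocks of an
   * erasure coded block group.
   */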
@Test
public void testFsckECBlockIdRedundantInternalBlocks() throws Exception {
    final int dataBlocks =
        StripedFileTestUtil.getDefaultECPolicy().getNumDataUnits();
    final int parityBlocks =
        StripedFileTestUtil.getDefaultECPolicy().getNumParityUnits();
    final int cellSize =
        StripedFileTestUtil.getDefaultECPolicy().getCellSize();
    final short groupSize = (short) (dataBlocks + parityBlocks);
    final File builderBaseDir =
        new File(GenericTestUtils.getRandomizedTempPath());
    final Path dirPath = new Path("/ec_dir");
    final Path filePath = new Path(dirPath, "file");
    conf.setInt(
        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
    cluster = new MiniDFSCluster.Builder(conf, builderBaseDir)
        .numDataNodes(groupSize + 1).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
fs.enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
try {
      fs.mkdirs(dirPath);
      fs.setErasureCodingPolicy(dirPath,
          StripedFileTestUtil.getDefaultECPolicy().getName());
      DFSTestUtil.createFile(fs, filePath, cellSize * dataBlocks * 2,
          (short) 1, 0L);
      LocatedBlocks blks =
          fs.getClient().getLocatedBlocks(filePath.toString(), 0);
      LocatedStripedBlock block =
          (LocatedStripedBlock) blks.getLastLocatedBlock();
Assert.assertEquals(groupSize, block.getLocations().length);
      // general case: all internal blocks live, none redundant
      String runFsckResult = runFsck(conf, 0, true, "/",
          "-blockId", block.getBlock().getBlockName());
      assertTrue(runFsckResult.contains(block.getBlock().getBlockName()));
      assertTrue(runFsckResult.contains(
          "No. of Expected Replica: " + groupSize));
      assertTrue(runFsckResult.contains(
          "No. of live Replica: " + groupSize));
      assertTrue(runFsckResult.contains("No. of redundant Replica: " + 0));
      // stop a datanode so one internal block is reconstructed on another
      // node
      DatanodeInfo dnToStop = block.getLocations()[0];
      MiniDFSCluster.DataNodeProperties dnProp =
          cluster.stopDataNode(dnToStop.getXferAddr());
      cluster.setDataNodeDead(dnToStop);
// wait for reconstruction to happen.
DFSTestUtil.waitForReplication(fs, filePath, groupSize, 15 * 1000);
      // bring the dn back: the group now has groupSize + 1 internal blocks
cluster.restartDataNode(dnProp);
cluster.waitActive();
blks = fs.getClient().getLocatedBlocks(filePath.toString(), 0);
block = (LocatedStripedBlock) blks.getLastLocatedBlock();
Assert.assertEquals(groupSize + 1, block.getLocations().length);
      // after the restart the block group has one redundant internal block
      runFsckResult = runFsck(conf, 0, true, "/",
          "-blockId", block.getBlock().getBlockName());
      assertTrue(runFsckResult.contains(block.getBlock().getBlockName()));
      assertTrue(runFsckResult.contains(
          "No. of Expected Replica: " + groupSize));
      assertTrue(runFsckResult.contains(
          "No. of live Replica: " + groupSize));
      assertTrue(runFsckResult.contains("No. of redundant Replica: " + 1));
} finally {
cluster.shutdown();
}
}
private void waitForUnrecoverableBlockGroup(Configuration configuration)
throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
ByteArrayOutputStream bStream = new ByteArrayOutputStream();
PrintStream out = new PrintStream(bStream, true);
ToolRunner.run(new DFSck(configuration, out), new String[] {"/"});
String outStr = bStream.toString();
if (outStr.contains("UNRECOVERABLE BLOCK GROUPS")) {
return true;
}
} catch (Exception e) {
LOG.error("Exception caught", e);
Assert.fail("Caught unexpected exception.");
}
return false;
}
}, 1000, 60000);
}
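  /**
   * Test that fsck still reports the file system as healthy when one
   * replica is corrupt but a healthy replica remains.
   */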
@Test(timeout = 300000)
public void testFsckCorruptWhenOneReplicaIsCorrupt()
throws Exception {
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf,
new File(GenericTestUtils.getRandomizedTempPath()))
.nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(2)
.build()) {
cluster.waitActive();
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
cluster.transitionToActive(0);
String filePath = "/appendTest";
Path fileName = new Path(filePath);
DFSTestUtil.createFile(fs, fileName, 512, (short) 2, 0);
DFSTestUtil.waitReplication(fs, fileName, (short) 2);
Assert.assertTrue("File not created", fs.exists(fileName));
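      // stop one datanode, then append so the stopped node's replica falls
      // behind the new generation stamp and is flagged corrupt on restart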
cluster.getDataNodes().get(1).shutdown();
DFSTestUtil.appendFile(fs, fileName, "appendCorruptBlock");
cluster.restartDataNode(1, true);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() {
return (
cluster.getNameNode(0).getNamesystem().getCorruptReplicaBlocks()
> 0);
}
}, 100, 5000);
DFSTestUtil.appendFile(fs, fileName, "appendCorruptBlock");
runFsck(cluster.getConfiguration(0), 0, true, "/");
}
}
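  /**
   * Test that a non-privileged user cannot list corrupt file blocks.
   */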
@Test
public void testFsckNonPrivilegedListCorrupt() throws Exception {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
    UserGroupInformation ugi = UserGroupInformation
        .createUserForTesting("systest", new String[]{""});
ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
String path = "/";
String outStr =
runFsck(conf, -1, true, path, "-list-corruptfileblocks");
assertFalse(outStr.contains(
"The list of corrupt blocks under path '" + path + "' are:"));
assertFalse(
outStr.contains("The filesystem under path '" + path + "' has "));
assertTrue(outStr
.contains("Failed to open path '" + path + "': Permission denied"));
return null;
}
});
}
}