MAPREDUCE-2156. Raid-aware FSCK. (Patrick Kling via dhruba)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1041702 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 07a96a0..0a5e56a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,8 @@
MAPREDUCE-2172. Added test-patch.properties required by test-patch.sh (nigel)
+ MAPREDUCE-2156. Raid-aware FSCK. (Patrick Kling via dhruba)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/src/contrib/raid/README b/src/contrib/raid/README
index d8d00f1..d445319 100644
--- a/src/contrib/raid/README
+++ b/src/contrib/raid/README
@@ -165,8 +165,12 @@
start-raidnode-remote.sh (and do the equivalent thing for stop-mapred.sh and
stop-raidnode-remote.sh).
-Run fsckraid periodically (being developed as part of another JIRA). This valudates parity
-blocsk of a file.
+To validate the integrity of a raided file system, run the RaidShell fsck
+command as follows:
+$HADOOP_HOME/bin/hadoop org.apache.hadoop.raid.RaidShell -fsck [path]
+
+This will print a list of corrupt files (i.e., files which have lost too many
+blocks and can no longer be fixed by Raid).
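+
+For example, checking a single directory might look like this (the paths in
+the output are illustrative):
+
+$HADOOP_HOME/bin/hadoop org.apache.hadoop.raid.RaidShell -fsck /user/foo
+/user/foo/data/part-00001
+/user/foo/data/part-00007
+
+RaidShell prints one line per corrupt file and exits with the number of
+corrupt files found, so an exit code of 0 means no corrupt files.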
+
--------------------------------------------------------------------------------
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
index 80108c6..eadcb1f 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
@@ -19,9 +19,14 @@
package org.apache.hadoop.raid;
import java.io.IOException;
+import java.io.FileNotFoundException;
import java.util.Collection;
import java.util.Map;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import java.net.InetSocketAddress;
import javax.security.auth.login.LoginException;
@@ -39,6 +44,14 @@
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.HarFileSystem;
+
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
+import org.apache.hadoop.hdfs.RaidDFSUtil;
import org.apache.hadoop.raid.protocol.PolicyInfo;
import org.apache.hadoop.raid.protocol.PolicyList;
@@ -160,6 +173,7 @@
System.err.println(" [-help [cmd]]");
System.err.println(" [-recover srcPath1 corruptOffset]");
System.err.println(" [-recoverBlocks path1 path2...]");
+ System.err.println(" [-fsck [path]]");
System.err.println();
ToolRunner.printGenericCommandUsage(System.err);
}
@@ -191,6 +205,11 @@
printUsage(cmd);
return exitCode;
}
+ } else if ("-fsck".equals(cmd)) {
+      // argv[0] is the command itself, so at most one path may follow
+      if (argv.length > 2) {
+ printUsage(cmd);
+ return exitCode;
+ }
}
try {
@@ -204,6 +223,15 @@
initializeLocal(conf);
recoverBlocks(argv, i);
exitCode = 0;
+ } else if ("-fsck".equals(cmd)) {
+        if (argv.length == 1) {
+          // no path argument given: check the whole file system
+          exitCode = fsck("/");
+        } else {
+          // argv.length == 2: check the path passed
+          exitCode = fsck(argv[1]);
+        }
} else {
exitCode = -1;
System.err.println(cmd.substring(1) + ": Unknown command");
@@ -296,6 +324,325 @@
}
/**
+   * checks whether any stripe of a file has more corrupt blocks
+   * (data plus parity) than the parity can recover, in which case
+   * the file must be considered corrupt
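+   *
+   * for example, with XOR raid (one parity block per stripe), a stripe
+   * can tolerate one corrupt block among its data and parity blocks,
+   * but a second corrupt block in the same stripe makes the file
+   * corrupt; the tolerated count is raidInfo.parityBlocksPerStripe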
+ */
+ private boolean isFileCorrupt(final DistributedFileSystem dfs,
+ final Path filePath)
+ throws IOException {
+ // corruptBlocksPerStripe:
+ // map stripe # -> # of corrupt blocks in that stripe (data + parity)
+ HashMap<Integer, Integer> corruptBlocksPerStripe =
+ new LinkedHashMap<Integer, Integer>();
+
+ // read conf
+ final int stripeBlocks = RaidNode.getStripeLength(conf);
+
+ // figure out which blocks are missing/corrupted
+ final FileStatus fileStatus = dfs.getFileStatus(filePath);
+ final long blockSize = fileStatus.getBlockSize();
+ final long fileLength = fileStatus.getLen();
+ final long fileLengthInBlocks = (fileLength / blockSize) +
+ (((fileLength % blockSize) == 0) ? 0L : 1L);
+ final long fileStripes = (fileLengthInBlocks / stripeBlocks) +
+ (((fileLengthInBlocks % stripeBlocks) == 0) ? 0L : 1L);
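+    // (ceiling division: e.g., a 6-block file with stripe length 3 spans
+    // 2 stripes, while a 7-block file spans 3)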
+ final BlockLocation[] fileBlocks =
+ dfs.getFileBlockLocations(fileStatus, 0, fileLength);
+
+ // figure out which stripes these corrupted blocks belong to
+ for (BlockLocation fileBlock: fileBlocks) {
+ int blockNo = (int) (fileBlock.getOffset() / blockSize);
+ final int stripe = (int) (blockNo / stripeBlocks);
+ if (fileBlock.isCorrupt() ||
+ (fileBlock.getNames().length == 0 && fileBlock.getLength() > 0)) {
+ if (corruptBlocksPerStripe.get(stripe) == null) {
+ corruptBlocksPerStripe.put(stripe, 1);
+ } else {
+ corruptBlocksPerStripe.put(stripe, corruptBlocksPerStripe.
+ get(stripe) + 1);
+ }
+ LOG.info("file " + filePath.toString() + " corrupt in block " +
+ blockNo + "/" + fileLengthInBlocks + ", stripe " + stripe +
+ "/" + fileStripes);
+ } else {
+ LOG.info("file " + filePath.toString() + " OK in block " + blockNo +
+ "/" + fileLengthInBlocks + ", stripe " + stripe + "/" +
+ fileStripes);
+ }
+ }
+
+ RaidInfo raidInfo = getFileRaidInfo(dfs, filePath);
+
+ // now check parity blocks
+ if (raidInfo.raidType != RaidType.NONE) {
+ checkParityBlocks(filePath, corruptBlocksPerStripe, blockSize,
+ fileStripes, raidInfo);
+ }
+
+ final int maxCorruptBlocksPerStripe = raidInfo.parityBlocksPerStripe;
+
+ for (int corruptBlocksInStripe: corruptBlocksPerStripe.values()) {
+ if (corruptBlocksInStripe > maxCorruptBlocksPerStripe) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * holds the type of raid used for a particular file
+ */
+ private enum RaidType {
+ XOR,
+ RS,
+ NONE
+ }
+
+ /**
+ * holds raid type and parity file pair
+ */
+ private class RaidInfo {
+ public RaidInfo(final RaidType raidType,
+ final RaidNode.ParityFilePair parityPair,
+ final int parityBlocksPerStripe) {
+ this.raidType = raidType;
+ this.parityPair = parityPair;
+ this.parityBlocksPerStripe = parityBlocksPerStripe;
+ }
+ public final RaidType raidType;
+ public final RaidNode.ParityFilePair parityPair;
+ public final int parityBlocksPerStripe;
+ }
+
+ /**
+   * returns the raid type and parity file information for a given file
+ */
+ private RaidInfo getFileRaidInfo(final DistributedFileSystem dfs,
+ final Path filePath)
+ throws IOException {
+ // now look for the parity file
+ Path destPath = null;
+ RaidNode.ParityFilePair ppair = null;
+ try {
+ // look for xor parity file first
+ destPath = RaidNode.xorDestinationPath(conf);
+ ppair = RaidNode.getParityFile(destPath, filePath, conf);
+ } catch (FileNotFoundException ignore) {
+ }
+ if (ppair != null) {
+ return new RaidInfo(RaidType.XOR, ppair, 1);
+ } else {
+ // failing that, look for rs parity file
+ try {
+ destPath = RaidNode.rsDestinationPath(conf);
+ ppair = RaidNode.getParityFile(destPath, filePath, conf);
+ } catch (FileNotFoundException ignore) {
+ }
+ if (ppair != null) {
+ return new RaidInfo(RaidType.RS, ppair, RaidNode.rsParityLength(conf));
+ } else {
+ return new RaidInfo(RaidType.NONE, null, 0);
+ }
+ }
+ }
+
+ /**
+   * gets the parity blocks corresponding to a file:
+   * returns the parity blocks themselves if the parity file is on DFS,
+   * or the HAR part-file blocks containing the parity blocks
+   * if it is on a HAR file system
+ */
+ private BlockLocation[] getParityBlocks(final Path filePath,
+ final long blockSize,
+ final long fileStripes,
+ final RaidInfo raidInfo)
+ throws IOException {
+
+ final String parityPathStr = raidInfo.parityPair.getPath().toUri().
+ getPath();
+ FileSystem parityFS = raidInfo.parityPair.getFileSystem();
+
+ // get parity file metadata
+ FileStatus parityFileStatus = parityFS.
+ getFileStatus(new Path(parityPathStr));
+ long parityFileLength = parityFileStatus.getLen();
+
+ if (parityFileLength != fileStripes * raidInfo.parityBlocksPerStripe *
+ blockSize) {
+      throw new IOException("expected parity file of length " +
+ (fileStripes * raidInfo.parityBlocksPerStripe *
+ blockSize) +
+ " but got parity file of length " +
+ parityFileLength);
+ }
+
+ BlockLocation[] parityBlocks =
+ parityFS.getFileBlockLocations(parityFileStatus, 0L, parityFileLength);
+
+ if (parityFS instanceof DistributedFileSystem ||
+ parityFS instanceof DistributedRaidFileSystem) {
+ long parityBlockSize = parityFileStatus.getBlockSize();
+ if (parityBlockSize != blockSize) {
+ throw new IOException("file block size is " + blockSize +
+ " but parity file block size is " +
+ parityBlockSize);
+ }
+ } else if (parityFS instanceof HarFileSystem) {
+ LOG.info("HAR FS found");
+ } else {
+ LOG.warn("parity file system is not of a supported type");
+ }
+
+ return parityBlocks;
+ }
+
+ /**
+ * checks the parity blocks for a given file and modifies
+ * corruptBlocksPerStripe accordingly
+ */
+ private void checkParityBlocks(final Path filePath,
+ final HashMap<Integer, Integer>
+ corruptBlocksPerStripe,
+ final long blockSize,
+ final long fileStripes,
+ final RaidInfo raidInfo)
+ throws IOException {
+
+ // get the blocks of the parity file
+ // because of har, multiple blocks may be returned as one container block
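+    // (e.g., if the HAR block size is three times the parity block size,
+    // one container block covers three consecutive parity blocks; the
+    // offset arithmetic below maps each slice back to its stripe)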
+ BlockLocation[] containerBlocks = getParityBlocks(filePath, blockSize,
+ fileStripes, raidInfo);
+
+ long parityStripeLength = blockSize *
+ ((long) raidInfo.parityBlocksPerStripe);
+
+ long parityBlocksFound = 0L;
+
+ for (BlockLocation cb: containerBlocks) {
+ if (cb.getLength() % blockSize != 0) {
+ throw new IOException("container block size is not " +
+ "multiple of parity block size");
+ }
+      int blocksInContainer = (int) (cb.getLength() / blockSize);
+      LOG.info("found container with offset " + cb.getOffset() +
+               ", length " + cb.getLength() + " (" + blocksInContainer +
+               " parity blocks)");
+
+ for (long offset = cb.getOffset();
+ offset < cb.getOffset() + cb.getLength();
+ offset += blockSize) {
+ long block = offset / blockSize;
+
+ int stripe = (int) (offset / parityStripeLength);
+
+ if (stripe < 0) {
+ // before the beginning of the parity file
+ continue;
+ }
+ if (stripe >= fileStripes) {
+ // past the end of the parity file
+ break;
+ }
+
+ parityBlocksFound++;
+
+ if (cb.isCorrupt() ||
+ (cb.getNames().length == 0 && cb.getLength() > 0)) {
+ LOG.info("parity file for " + filePath.toString() +
+ " corrupt in block " + block +
+ ", stripe " + stripe + "/" + fileStripes);
+
+ if (corruptBlocksPerStripe.get(stripe) == null) {
+ corruptBlocksPerStripe.put(stripe, 1);
+ } else {
+ corruptBlocksPerStripe.put(stripe,
+ corruptBlocksPerStripe.get(stripe) +
+ 1);
+ }
+ } else {
+ LOG.info("parity file for " + filePath.toString() +
+ " OK in block " + block +
+ ", stripe " + stripe + "/" + fileStripes);
+ }
+ }
+ }
+
+ long parityBlocksExpected = raidInfo.parityBlocksPerStripe * fileStripes;
+    if (parityBlocksFound != parityBlocksExpected) {
+ throw new IOException("expected " + parityBlocksExpected +
+ " parity blocks but got " + parityBlocksFound);
+ }
+ }
+
+ /**
+ * checks the raided file system, prints a list of corrupt files to
+ * System.out and returns the number of corrupt files
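+   * (this count is also used as the command's exit code, so an exit
+   * code of 0 indicates that no corrupt files were found)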
+ */
+ public int fsck(final String path) throws IOException {
+
+ FileSystem fs = (new Path(path)).getFileSystem(conf);
+
+ // if we got a raid fs, get the underlying fs
+ if (fs instanceof DistributedRaidFileSystem) {
+ fs = ((DistributedRaidFileSystem) fs).getFileSystem();
+ }
+
+ // check that we have a distributed fs
+ if (!(fs instanceof DistributedFileSystem)) {
+ throw new IOException("expected DistributedFileSystem but got " +
+ fs.getClass().getName());
+ }
+ final DistributedFileSystem dfs = (DistributedFileSystem) fs;
+
+ // get conf settings
+ String xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath();
+ String rsPrefix = RaidNode.rsDestinationPath(conf).toUri().getPath();
+ if (!xorPrefix.endsWith("/")) {
+ xorPrefix = xorPrefix + "/";
+ }
+ if (!rsPrefix.endsWith("/")) {
+ rsPrefix = rsPrefix + "/";
+ }
+ LOG.info("prefixes: " + xorPrefix + ", " + rsPrefix);
+
+ // get a list of corrupted files (not considering parity blocks just yet)
+ // from the name node
+ // these are the only files we need to consider:
+ // if a file has no corrupted data blocks, it is OK even if some
+ // of its parity blocks are corrupted, so no further checking is
+ // necessary
+ final String[] files = RaidDFSUtil.getCorruptFiles(dfs);
+ final List<Path> corruptFileCandidates = new LinkedList<Path>();
+ for (final String f: files) {
+ final Path p = new Path(f);
+ // if this file is a parity file
+ // or if it does not start with the specified path,
+ // ignore it
+ if (!p.toString().startsWith(xorPrefix) &&
+ !p.toString().startsWith(rsPrefix) &&
+ p.toString().startsWith(path)) {
+ corruptFileCandidates.add(p);
+ }
+ }
+ // filter files marked for deletion
+ RaidUtils.filterTrash(conf, corruptFileCandidates);
+
+ int numberOfCorruptFiles = 0;
+
+ for (final Path corruptFileCandidate: corruptFileCandidates) {
+ if (isFileCorrupt(dfs, corruptFileCandidate)) {
+ System.out.println(corruptFileCandidate.toString());
+ numberOfCorruptFiles++;
+ }
+ }
+
+ return numberOfCorruptFiles;
+ }
+
+ /**
* main() has some simple utility methods
*/
public static void main(String argv[]) throws Exception {
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShellFsck.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShellFsck.java
new file mode 100644
index 0000000..973f6df
--- /dev/null
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShellFsck.java
@@ -0,0 +1,767 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.GregorianCalendar;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.zip.CRC32;
+
+import org.junit.Test;
+import org.junit.After;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
+import org.apache.hadoop.hdfs.TestRaidDfs;
+import org.apache.hadoop.hdfs.RaidDFSUtil;
+import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.raid.HarIndex;
+
+
+public class TestRaidShellFsck {
+ final static Log LOG =
+ LogFactory.getLog("org.apache.hadoop.raid.TestRaidShellFsck");
+ final static String TEST_DIR =
+ new File(System.
+ getProperty("test.build.data", "build/contrib/raid/test/data")).
+ getAbsolutePath();
+ final static String CONFIG_FILE = new File(TEST_DIR, "test-raid.xml").
+ getAbsolutePath();
+ final static long RELOAD_INTERVAL = 1000;
+ final static int NUM_DATANODES = 4;
+ final static int STRIPE_BLOCKS = 3; // number of blocks per stripe
+ final static int FILE_BLOCKS = 6; // number of blocks that file consists of
+ final static short REPL = 1; // replication factor before raiding
+  final static long BLOCK_SIZE = 8192L; // size of block in bytes
+ final static String DIR_PATH = "/user/pkling/raidtest";
+ final static Path FILE_PATH0 =
+ new Path("/user/pkling/raidtest/raidfsck.test");
+ final static Path FILE_PATH1 =
+ new Path("/user/pkling/raidtest/raidfsck2.test");
+ final static Path RAID_PATH = new Path("/destraid/user/pkling/raidtest");
+ final static String HAR_NAME = "raidtest_raid.har";
+ final static String RAID_DIR = "/destraid";
+
+ Configuration conf = null;
+ Configuration raidConf = null;
+ Configuration clientConf = null;
+ MiniDFSCluster cluster = null;
+ DistributedFileSystem dfs = null;
+ RaidNode rnode = null;
+
+
+ RaidShell shell = null;
+ String[] args = null;
+
+
+ /**
+ * creates a MiniDFS instance with a raided file in it
+ */
+ private void setUp(boolean doHar) throws IOException, ClassNotFoundException {
+
+ final int timeBeforeHar;
+ if (doHar) {
+ timeBeforeHar = 0;
+ } else {
+ timeBeforeHar = -1;
+ }
+
+
+ new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+ conf = new Configuration();
+
+ conf.set("raid.config.file", CONFIG_FILE);
+ conf.setBoolean("raid.config.reload", true);
+ conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
+
+    // scan all policies once every 5 seconds
+ conf.setLong("raid.policy.rescan.interval", 5000);
+
+ // make all deletions not go through Trash
+ conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
+
+ // do not use map-reduce cluster for Raiding
+ conf.set("raid.classname", "org.apache.hadoop.raid.LocalRaidNode");
+ // use local block fixer
+ conf.set("raid.blockfix.classname",
+ "org.apache.hadoop.raid.LocalBlockFixer");
+
+ conf.set("raid.server.address", "localhost:0");
+ conf.setInt("hdfs.raid.stripeLength", STRIPE_BLOCKS);
+ conf.set("hdfs.raid.locations", RAID_DIR);
+
+ conf.setInt("dfs.corruptfilesreturned.max", 500);
+
+ conf.setBoolean("dfs.permissions", false);
+
+ cluster = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
+ cluster.waitActive();
+ dfs = (DistributedFileSystem) cluster.getFileSystem();
+ String namenode = dfs.getUri().toString();
+
+ FileSystem.setDefaultUri(conf, namenode);
+
+ FileWriter fileWriter = new FileWriter(CONFIG_FILE);
+ fileWriter.write("<?xml version=\"1.0\"?>\n");
+ String str =
+ "<configuration> " +
+ " <srcPath prefix=\"" + DIR_PATH + "\"> " +
+ " <policy name = \"RaidTest1\"> " +
+ " <erasureCode>xor</erasureCode> " +
+ " <destPath> " + RAID_DIR + " </destPath> " +
+ " <property> " +
+ " <name>targetReplication</name> " +
+ " <value>1</value> " +
+ " <description>after RAIDing, decrease the replication " +
+ "factor of a file to this value.</description> " +
+ " </property> " +
+ " <property> " +
+ " <name>metaReplication</name> " +
+ " <value>1</value> " +
+ " <description> replication factor of parity file</description> " +
+ " </property> " +
+ " <property> " +
+ " <name>modTimePeriod</name> " +
+ " <value>2000</value> " +
+ " <description>time (milliseconds) after a file is modified " +
+ "to make it a candidate for RAIDing</description> " +
+ " </property> ";
+
+ if (timeBeforeHar >= 0) {
+ str +=
+ " <property> " +
+ " <name>time_before_har</name> " +
+ " <value>" + timeBeforeHar + "</value> " +
+ " <description> amount of time waited before har'ing parity " +
+ "files</description> " +
+ " </property> ";
+ }
+
+ str +=
+ " </policy>" +
+ " </srcPath>" +
+ "</configuration>";
+
+ fileWriter.write(str);
+ fileWriter.close();
+
+ createTestFile(FILE_PATH0);
+ createTestFile(FILE_PATH1);
+
+ Path[] filePaths = { FILE_PATH0, FILE_PATH1 };
+ raidTestFiles(RAID_PATH, filePaths, doHar);
+
+ clientConf = new Configuration(raidConf);
+ clientConf.set("fs.hdfs.impl",
+ "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
+ clientConf.set("fs.raid.underlyingfs.impl",
+ "org.apache.hadoop.hdfs.DistributedFileSystem");
+
+ // prepare shell and arguments
+ shell = new RaidShell(clientConf);
+ args = new String[2];
+ args[0] = "-fsck";
+ args[1] = DIR_PATH;
+
+ }
+
+ /**
+   * creates a test file consisting of random data
+ */
+ private void createTestFile(Path filePath) throws IOException {
+ Random rand = new Random();
+ FSDataOutputStream stm = dfs.create(filePath, true,
+ conf.getInt("io.file.buffer.size",
+ 4096), REPL, BLOCK_SIZE);
+
+ final byte[] b = new byte[(int) BLOCK_SIZE];
+ for (int i = 0; i < FILE_BLOCKS; i++) {
+ rand.nextBytes(b);
+ stm.write(b);
+ }
+ stm.close();
+ LOG.info("test file created");
+
+ }
+
+ /**
+   * raids the test files
+ */
+ private void raidTestFiles(Path raidPath, Path[] filePaths, boolean doHar)
+ throws IOException, ClassNotFoundException {
+ // create RaidNode
+ raidConf = new Configuration(conf);
+ raidConf.set(RaidNode.RAID_LOCATION_KEY, RAID_DIR);
+ raidConf.setInt("raid.blockfix.interval", 1000);
+ raidConf.setLong("har.block.size", BLOCK_SIZE * 3);
+ // the RaidNode does the raiding inline (instead of submitting to MR node)
+ conf.set("raid.classname", "org.apache.hadoop.raid.LocalRaidNode");
+ rnode = RaidNode.createRaidNode(null, raidConf);
+
+ for (Path filePath: filePaths) {
+ long waitStart = System.currentTimeMillis();
+ boolean raided = false;
+
+ Path parityFilePath = new Path(RAID_DIR,
+ filePath.toString().substring(1));
+
+ while (!raided) {
+ try {
+ FileStatus[] listPaths = dfs.listStatus(raidPath);
+ if (listPaths != null) {
+ if (doHar) {
+ // case with HAR
+ for (FileStatus f: listPaths) {
+ if (f.getPath().toString().endsWith(".har")) {
+ // check if the parity file is in the index
+ final Path indexPath = new Path(f.getPath(), "_index");
+ final FileStatus indexFileStatus =
+ dfs.getFileStatus(indexPath);
+ final HarIndex harIndex =
+ new HarIndex(dfs.open(indexPath), indexFileStatus.getLen());
+ final HarIndex.IndexEntry indexEntry =
+ harIndex.findEntryByFileName(parityFilePath.toString());
+ if (indexEntry != null) {
+ LOG.info("raid file " + parityFilePath.toString() +
+ " found in Har archive: " +
+ f.getPath().toString() +
+ " ts=" + indexEntry.mtime);
+ raided = true;
+ break;
+ }
+ }
+ }
+
+ } else {
+ // case without HAR
+ for (FileStatus f : listPaths) {
+ Path found = new Path(f.getPath().toUri().getPath());
+ if (parityFilePath.equals(found)) {
+ LOG.info("raid file found: " + f.getPath().toString());
+ raided = true;
+ break;
+ }
+ }
+ }
+ }
+ } catch (FileNotFoundException ignore) {
+ }
+ if (!raided) {
+ if (System.currentTimeMillis() > waitStart + 40000L) {
+          LOG.error("parity file not created after 40s");
+          throw new IOException("parity file not created after 40s");
+ } else {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {
+ }
+ }
+ }
+ }
+ }
+
+ rnode.stop();
+ rnode.join();
+ rnode = null;
+    LOG.info("test files raided");
+ }
+
+ /**
+ * sleeps for up to 20s until the number of corrupt files
+ * in the file system is equal to the number specified
+ */
+ private void waitUntilCorruptFileCount(DistributedFileSystem dfs,
+ int corruptFiles)
+ throws IOException {
+ long waitStart = System.currentTimeMillis();
+ while (RaidDFSUtil.getCorruptFiles(dfs).length != corruptFiles) {
+ try {
+ Thread.sleep(1000);
+      } catch (InterruptedException ignore) {
+      }
+
+ if (System.currentTimeMillis() > waitStart + 20000L) {
+ break;
+ }
+ }
+
+ int corruptFilesFound = RaidDFSUtil.getCorruptFiles(dfs).length;
+ if (corruptFilesFound != corruptFiles) {
+ throw new IOException("expected " + corruptFiles +
+ " corrupt files but got " +
+ corruptFilesFound);
+ }
+ }
+
+ /**
+ * removes a specified block from MiniDFS storage and reports it as corrupt
+ */
+ private void removeAndReportBlock(DistributedFileSystem blockDfs,
+ Path filePath,
+ LocatedBlock block)
+ throws IOException {
+ TestRaidDfs.corruptBlock(filePath, block.getBlock(), NUM_DATANODES, true);
+
+ // report deleted block to the name node
+ LocatedBlock[] toReport = { block };
+ blockDfs.getClient().getNamenode().reportBadBlocks(toReport);
+
+ }
+
+
+ /**
+ * removes a file block in the specified stripe
+ */
+ private void removeFileBlock(Path filePath, int stripe, int blockInStripe)
+ throws IOException {
+ LocatedBlocks fileBlocks = dfs.getClient().getNamenode().
+ getBlockLocations(filePath.toString(), 0, FILE_BLOCKS * BLOCK_SIZE);
+ if (fileBlocks.locatedBlockCount() != FILE_BLOCKS) {
+ throw new IOException("expected " + FILE_BLOCKS +
+ " file blocks but found " +
+ fileBlocks.locatedBlockCount());
+ }
+ if (blockInStripe >= STRIPE_BLOCKS) {
+ throw new IOException("blockInStripe is " + blockInStripe +
+ " but must be smaller than " + STRIPE_BLOCKS);
+ }
+ LocatedBlock block = fileBlocks.get(stripe * STRIPE_BLOCKS + blockInStripe);
+ removeAndReportBlock(dfs, filePath, block);
+    LOG.info("removed file " + filePath.toString() + " block " +
+             (stripe * STRIPE_BLOCKS + blockInStripe) + " in stripe " +
+             stripe);
+ }
+
+ /**
+ * removes a parity block in the specified stripe
+ */
+ private void removeParityBlock(Path filePath, int stripe) throws IOException {
+ // find parity file
+ Path destPath = new Path(RAID_DIR);
+ RaidNode.ParityFilePair ppair = null;
+
+ ppair = RaidNode.getParityFile(destPath, filePath, conf);
+ String parityPathStr = ppair.getPath().toUri().getPath();
+ LOG.info("parity path: " + parityPathStr);
+ FileSystem parityFS = ppair.getFileSystem();
+ if (!(parityFS instanceof DistributedFileSystem)) {
+ throw new IOException("parity file is not on distributed file system");
+ }
+ DistributedFileSystem parityDFS = (DistributedFileSystem) parityFS;
+
+ // now corrupt the block corresponding to the stripe selected
+ FileStatus parityFileStatus =
+ parityDFS.getFileStatus(new Path(parityPathStr));
+ long parityBlockSize = parityFileStatus.getBlockSize();
+ long parityFileLength = parityFileStatus.getLen();
+ long parityFileLengthInBlocks = (parityFileLength / parityBlockSize) +
+ (((parityFileLength % parityBlockSize) == 0) ? 0L : 1L);
+ if (parityFileLengthInBlocks <= stripe) {
+ throw new IOException("selected stripe " + stripe +
+ " but parity file only has " +
+ parityFileLengthInBlocks + " blocks");
+ }
+ if (parityBlockSize != BLOCK_SIZE) {
+ throw new IOException("file block size is " + BLOCK_SIZE +
+ " but parity file block size is " +
+ parityBlockSize);
+ }
+ LocatedBlocks parityFileBlocks = parityDFS.getClient().getNamenode().
+ getBlockLocations(parityPathStr, 0, parityFileLength);
+ if (parityFileBlocks.locatedBlockCount() != parityFileLengthInBlocks) {
+ throw new IOException("expected " + parityFileLengthInBlocks +
+ " parity file blocks but got " +
+ parityFileBlocks.locatedBlockCount() +
+ " blocks");
+ }
+ LocatedBlock parityFileBlock = parityFileBlocks.get(stripe);
+ removeAndReportBlock(parityDFS, new Path(parityPathStr), parityFileBlock);
+ LOG.info("removed parity file block/stripe " + stripe +
+ " for " + filePath.toString());
+
+ }
+
+ /**
+ * removes a block from the har part file
+ */
+ private void removeHarParityBlock(int block) throws IOException {
+ Path harPath = new Path(RAID_PATH, HAR_NAME);
+ FileStatus [] listPaths = dfs.listStatus(harPath);
+
+ boolean deleted = false;
+
+ for (FileStatus f: listPaths) {
+ if (f.getPath().getName().startsWith("part-")) {
+ final Path partPath = new Path(f.getPath().toUri().getPath());
+ final LocatedBlocks partBlocks = dfs.getClient().getNamenode().
+ getBlockLocations(partPath.toString(),
+ 0,
+ f.getLen());
+
+ if (partBlocks.locatedBlockCount() <= block) {
+ throw new IOException("invalid har block " + block);
+ }
+
+ final LocatedBlock partBlock = partBlocks.get(block);
+ removeAndReportBlock(dfs, partPath, partBlock);
+ LOG.info("removed block " + block + "/" +
+ partBlocks.locatedBlockCount() +
+ " of file " + partPath.toString() +
+ " block size " + partBlock.getBlockSize());
+ deleted = true;
+ break;
+ }
+ }
+
+ if (!deleted) {
+ throw new IOException("cannot find part file in " + harPath.toString());
+ }
+ }
+
+
+ /**
+ * returns the data directories for a data node
+ */
+  private File[] getDataDirs(int datanode) throws IOException {
+ File data_dir = new File(System.getProperty("test.build.data"),
+ "dfs/data/");
+ File dir1 = new File(data_dir, "data"+(2 * datanode + 1));
+ File dir2 = new File(data_dir, "data"+(2 * datanode + 2));
+ if (!(dir1.isDirectory() && dir2.isDirectory())) {
+ throw new IOException("data directories not found for data node " +
+ datanode + ": " + dir1.toString() + " " +
+ dir2.toString());
+ }
+
+ File[] dirs = new File[2];
+ dirs[0] = new File(dir1, "current");
+ dirs[1] = new File(dir2, "current");
+ return dirs;
+ }
+
+
+ /**
+ * checks fsck with no missing blocks
+ */
+ @Test
+ public void testClean() throws Exception {
+ LOG.info("testClean");
+ setUp(false);
+ int result = ToolRunner.run(shell, args);
+
+ assertTrue("fsck should return 0, but returns " +
+ Integer.toString(result), result == 0);
+ }
+
+
+ /**
+ * checks fsck with missing block in file block but not in parity block
+ */
+ @Test
+ public void testFileBlockMissing() throws Exception {
+ LOG.info("testFileBlockMissing");
+ setUp(false);
+ waitUntilCorruptFileCount(dfs, 0);
+ removeFileBlock(FILE_PATH0, 0, 0);
+ waitUntilCorruptFileCount(dfs, 1);
+
+ int result = ToolRunner.run(shell, args);
+
+ assertTrue("fsck should return 0, but returns " +
+ Integer.toString(result), result == 0);
+ }
+
+ /**
+ * checks fsck with missing block in parity block but not in file block
+ */
+ @Test
+ public void testParityBlockMissing() throws Exception {
+ LOG.info("testParityBlockMissing");
+ setUp(false);
+ waitUntilCorruptFileCount(dfs, 0);
+ removeParityBlock(FILE_PATH0, 0);
+ waitUntilCorruptFileCount(dfs, 1);
+
+ int result = ToolRunner.run(shell, args);
+
+ assertTrue("fsck should return 0, but returns " +
+ Integer.toString(result), result == 0);
+ }
+
+ /**
+ * checks fsck with missing block in both file block and parity block
+ * in different stripes
+ */
+ @Test
+ public void testFileBlockAndParityBlockMissingInDifferentStripes()
+ throws Exception {
+ LOG.info("testFileBlockAndParityBlockMissingInDifferentStripes");
+ setUp(false);
+ waitUntilCorruptFileCount(dfs, 0);
+ removeFileBlock(FILE_PATH0, 0, 0);
+ waitUntilCorruptFileCount(dfs, 1);
+ removeParityBlock(FILE_PATH0, 1);
+ waitUntilCorruptFileCount(dfs, 2);
+
+ int result = ToolRunner.run(shell, args);
+
+ assertTrue("fsck should return 0, but returns " +
+ Integer.toString(result), result == 0);
+ }
+
+ /**
+ * checks fsck with missing block in both file block and parity block
+ * in same stripe
+ */
+ @Test
+ public void testFileBlockAndParityBlockMissingInSameStripe()
+ throws Exception {
+ LOG.info("testFileBlockAndParityBlockMissingInSameStripe");
+ setUp(false);
+ waitUntilCorruptFileCount(dfs, 0);
+ removeParityBlock(FILE_PATH0, 1);
+ waitUntilCorruptFileCount(dfs, 1);
+ removeFileBlock(FILE_PATH0, 1, 0);
+ waitUntilCorruptFileCount(dfs, 2);
+
+ int result = ToolRunner.run(shell, args);
+
+ assertTrue("fsck should return 1, but returns " +
+ Integer.toString(result), result == 1);
+ }
+
+ /**
+ * checks fsck with two missing file blocks in same stripe
+ */
+ @Test
+ public void test2FileBlocksMissingInSameStripe()
+ throws Exception {
+ LOG.info("test2FileBlocksMissingInSameStripe");
+ setUp(false);
+ waitUntilCorruptFileCount(dfs, 0);
+ removeFileBlock(FILE_PATH0, 1, 1);
+ waitUntilCorruptFileCount(dfs, 1);
+ removeFileBlock(FILE_PATH0, 1, 0);
+ waitUntilCorruptFileCount(dfs, 1);
+
+ int result = ToolRunner.run(shell, args);
+
+ assertTrue("fsck should return 1, but returns " +
+ Integer.toString(result), result == 1);
+ }
+
+ /**
+ * checks fsck with two missing file blocks in different stripes
+ */
+ @Test
+ public void test2FileBlocksMissingInDifferentStripes()
+ throws Exception {
+ LOG.info("test2FileBlocksMissingInDifferentStripes");
+ setUp(false);
+ waitUntilCorruptFileCount(dfs, 0);
+ removeFileBlock(FILE_PATH0, 1, 1);
+ waitUntilCorruptFileCount(dfs, 1);
+ removeFileBlock(FILE_PATH0, 0, 0);
+ waitUntilCorruptFileCount(dfs, 1);
+
+ int result = ToolRunner.run(shell, args);
+
+ assertTrue("fsck should return 0, but returns " +
+ Integer.toString(result), result == 0);
+ }
+
+ /**
+ * checks fsck with file block missing (HAR)
+ * use 2 files to verify HAR offset logic in RaidShell fsck
+ * both files have one corrupt block, parity blocks are clean
+ *
+ * parity blocks in har (file.stripe):
+ * +-----+-----+-----+ +-----+
+ * | 0.0 | 0.1 | 1.0 | | 1.1 |
+ * +-----+-----+-----+ +-----+
+ * 0 1
+ *
+ */
+ @Test
+ public void testFileBlockMissingHar()
+ throws Exception {
+ LOG.info("testFileBlockMissingHar");
+ setUp(true);
+ waitUntilCorruptFileCount(dfs, 0);
+ removeFileBlock(FILE_PATH0, 1, 1);
+ removeFileBlock(FILE_PATH1, 1, 1);
+ waitUntilCorruptFileCount(dfs, 2);
+
+ int result = ToolRunner.run(shell, args);
+
+ assertTrue("fsck should return 0, but returns " +
+ Integer.toString(result), result == 0);
+ }
+
+ /**
+ * checks fsck with file block missing (HAR)
+ * use 2 files to verify HAR offset logic in RaidShell fsck
+ *
+ * parity blocks in har (file.stripe):
+ * +-----+-----+-----+ +-----+
+ * | 0.0 | 0.1 | 1.0 | | 1.1 |
+ * +-----+-----+-----+ +-----+
+ * 0 1
+ *
+ * corrupt file 0, stripe 0 file block 0
+ * corrupt file 0, stripe 1 file block 0
+ * corrupt file 1, stripe 0 file block 0
+ * corrupt file 1, stripe 1 file block 0
+ * corrupt har block 0
+ * both files should be corrupt
+ */
+ @Test
+ public void testFileBlockAndParityBlockMissingHar1()
+ throws Exception {
+ LOG.info("testFileBlockAndParityBlockMissingHar1");
+ setUp(true);
+ waitUntilCorruptFileCount(dfs, 0);
+ removeFileBlock(FILE_PATH0, 0, 0);
+ removeFileBlock(FILE_PATH0, 1, 0);
+ removeFileBlock(FILE_PATH1, 0, 0);
+ removeFileBlock(FILE_PATH1, 1, 0);
+ removeHarParityBlock(0);
+ waitUntilCorruptFileCount(dfs, 3);
+
+ int result = ToolRunner.run(shell, args);
+
+ assertTrue("fsck should return 2, but returns " +
+ Integer.toString(result), result == 2);
+ }
+
+ /**
+ * checks fsck with file block missing (HAR)
+ * use 2 files to verify HAR offset logic in RaidShell fsck
+ *
+ * parity blocks in har (file.stripe):
+ * +-----+-----+-----+ +-----+
+ * | 0.0 | 0.1 | 1.0 | | 1.1 |
+ * +-----+-----+-----+ +-----+
+ * 0 1
+ *
+ * corrupt file 0, stripe 0 file block 0
+ * corrupt file 0, stripe 1 file block 0
+ * corrupt file 1, stripe 0 file block 0
+ * corrupt file 1, stripe 1 file block 0
+ * corrupt har block 1
+   * only the second file (FILE_PATH1) should be corrupt
+ */
+ @Test
+ public void testFileBlockAndParityBlockMissingHar2()
+ throws Exception {
+ LOG.info("testFileBlockAndParityBlockMissingHar2");
+ setUp(true);
+ waitUntilCorruptFileCount(dfs, 0);
+ removeFileBlock(FILE_PATH0, 0, 0);
+ removeFileBlock(FILE_PATH0, 1, 0);
+ removeFileBlock(FILE_PATH1, 0, 0);
+ removeFileBlock(FILE_PATH1, 1, 0);
+ removeHarParityBlock(1);
+ waitUntilCorruptFileCount(dfs, 3);
+
+ int result = ToolRunner.run(shell, args);
+
+ assertTrue("fsck should return 1, but returns " +
+ Integer.toString(result), result == 1);
+ }
+
+ /**
+ * checks that fsck does not report corrupt file that is not in
+ * the specified path
+ */
+ @Test
+ public void testPathFilter()
+ throws Exception {
+ LOG.info("testPathFilter");
+ setUp(false);
+ waitUntilCorruptFileCount(dfs, 0);
+ removeParityBlock(FILE_PATH0, 1);
+ waitUntilCorruptFileCount(dfs, 1);
+ removeFileBlock(FILE_PATH0, 1, 0);
+ waitUntilCorruptFileCount(dfs, 2);
+
+ String[] otherArgs = new String[2];
+ otherArgs[0] = "-fsck";
+ otherArgs[1] = "/user/pkling/other";
+ int result = ToolRunner.run(shell, otherArgs);
+
+ assertTrue("fsck should return 0, but returns " +
+ Integer.toString(result), result == 0);
+ }
+
+
+ @After
+ public void tearDown() throws Exception {
+ if (rnode != null) {
+ rnode.stop();
+ rnode.join();
+ rnode = null;
+ }
+
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+
+ dfs = null;
+
+ LOG.info("Test cluster shut down");
+ }
+
+
+}
+