MAPREDUCE-1819. RaidNode is now smarter in submitting Raid jobs. (Ramkumar
Vadali via schen)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1021873 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cfdf74..56aa6ce 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -133,6 +133,9 @@
MAPREDUCE-1517. Supports streaming job to run in the background. (Bochun Bai
via amareshwari)
+ MAPREDUCE-1819. RaidNode is now smarter in submitting Raid jobs. (Ramkumar
+ Vadali via schen)
+
OPTIMIZATIONS
MAPREDUCE-1354. Enhancements to JobTracker for better performance and
@@ -325,7 +328,7 @@
MAPREDUCE-2095. Fixes Gridmix to run from compressed traces. (Ranjit
Mathew via amareshwari)
- MAPREDUCE-1980. DistributedRaidFileSystem now handles ChecksumException
+ MAPREDUCE-1908. DistributedRaidFileSystem now handles ChecksumException
correctly. (Ramkumar Vadali via schen)
Release 0.21.1 - Unreleased
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
index 49defb0..fd1c33e 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
@@ -35,7 +35,9 @@
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.raid.Decoder;
import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.raid.XORDecoder;
import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -94,7 +96,7 @@
}
// find stripe length configured
- stripeLength = conf.getInt("hdfs.raid.stripeLength", RaidNode.DEFAULT_STRIPE_LENGTH);
+ stripeLength = RaidNode.getStripeLength(conf);
if (stripeLength == 0) {
LOG.info("dfs.raid.stripeLength is incorrectly defined to be " +
stripeLength + " Ignoring...");
@@ -343,9 +345,10 @@
clientConf.set("fs.hdfs.impl", clazz.getName());
// Disable caching so that a previously cached RaidDfs is not used.
clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
- Path npath = RaidNode.unRaid(clientConf, path,
- alternates[idx], stripeLength,
- corruptOffset);
+ Decoder decoder =
+ new XORDecoder(clientConf, RaidNode.getStripeLength(clientConf));
+ Path npath = RaidNode.unRaid(clientConf, path, alternates[idx],
+ decoder, stripeLength, corruptOffset);
FileSystem fs1 = getUnderlyingFileSystem(conf);
fs1.initialize(npath.toUri(), conf);
LOG.info("Opening alternate path " + npath + " at offset " + curpos);
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
index ecf63c4..deb9865 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
@@ -56,6 +56,10 @@
public static final long HAR_PARTFILE_SIZE = 10 * 1024 * 1024 * 1024l;
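+ /** Default cap on the number of raiding jobs running at once per policy. */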
+ public static final int DISTRAID_MAX_JOBS = 10;
+
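+ /** Default cap on the number of files a single raiding job may process. */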
+ public static final int DISTRAID_MAX_FILES = 10000;
+
/**
* Time to wait after the config file has been modified before reloading it
* (this is done to prevent loading a file that hasn't been fully written).
@@ -71,6 +75,9 @@
private long reloadInterval = RELOAD_INTERVAL;
private long periodicity; // time between runs of all policies
private long harPartfileSize;
+ private int maxJobsPerPolicy; // Max no. of jobs running simultaneously for
+ // a policy.
+ private int maxFilesPerJob; // Max no. of files raided by a job.
// Reload the configuration
private boolean doReload;
@@ -88,6 +95,10 @@
this.reloadInterval = conf.getLong("raid.config.reload.interval", RELOAD_INTERVAL);
this.periodicity = conf.getLong("raid.policy.rescan.interval", RESCAN_INTERVAL);
this.harPartfileSize = conf.getLong("raid.har.partfile.size", HAR_PARTFILE_SIZE);
+ this.maxJobsPerPolicy = conf.getInt("raid.distraid.max.jobs",
+ DISTRAID_MAX_JOBS);
+ this.maxFilesPerJob = conf.getInt("raid.distraid.max.files",
+ DISTRAID_MAX_FILES);
if (configFileName == null) {
String msg = "No raid.config.file given in conf - " +
"the Hadoop Raid utility cannot run. Aborting....";
@@ -306,6 +317,14 @@
return harPartfileSize;
}
+ public synchronized int getMaxJobsPerPolicy() {
+ return maxJobsPerPolicy;
+ }
+
+ public synchronized int getMaxFilesPerJob() {
+ return maxFilesPerJob;
+ }
+
/**
* Get a collection of all policies
*/
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/Decoder.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/Decoder.java
new file mode 100644
index 0000000..bdb062a
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/Decoder.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.BlockMissingException;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Represents a generic decoder that can be used to read a file with
+ * corrupt blocks by using the parity file.
+ * This is an abstract class; concrete subclasses need to implement
+ * fixErasedBlock.
+ */
+public abstract class Decoder {
+ public static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.Decoder");
+ protected Configuration conf;
+ protected int stripeSize;
+ protected int paritySize;
+ protected Random rand;
+ protected int bufSize;
+ protected byte[][] readBufs;
+ protected byte[][] writeBufs;
+
+ Decoder(Configuration conf, int stripeSize, int paritySize) {
+ this.conf = conf;
+ this.stripeSize = stripeSize;
+ this.paritySize = paritySize;
+ this.rand = new Random();
+ this.bufSize = conf.getInt("raid.decoder.bufsize", 1024 * 1024);
+ this.readBufs = new byte[stripeSize + paritySize][];
+ this.writeBufs = new byte[paritySize][];
+ allocateBuffers();
+ }
+
+ private void allocateBuffers() {
+ for (int i = 0; i < stripeSize + paritySize; i++) {
+ readBufs[i] = new byte[bufSize];
+ }
+ for (int i = 0; i < paritySize; i++) {
+ writeBufs[i] = new byte[bufSize];
+ }
+ }
+
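+ // Keep the buffer no larger than a block; if the block size is not a
+ // multiple of the buffer size, fall back to a smaller heuristic size
+ // (roughly blockSize/256, clamped to the range [1024 bytes, 1MB]).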
+ private void configureBuffers(long blockSize) {
+ if ((long)bufSize > blockSize) {
+ bufSize = (int)blockSize;
+ allocateBuffers();
+ } else if (blockSize % bufSize != 0) {
+ bufSize = (int)(blockSize / 256L); // heuristic.
+ if (bufSize == 0) {
+ bufSize = 1024;
+ }
+ bufSize = Math.min(bufSize, 1024 * 1024);
+ allocateBuffers();
+ }
+ }
+
+ /**
+ * The interface to generate a decoded file using the good portion of the
+ * source file and the parity file.
+ * @param fs The filesystem containing the source file.
+ * @param srcFile The damaged source file.
+ * @param parityFs The filesystem containing the parity file. This could be
+ * different from fs in case the parity file is part of a HAR archive.
+ * @param parityFile The parity file.
+ * @param errorOffset Known location of error in the source file. There could
+ * be additional errors in the source file that are discovered during
+ * the decode process.
+ * @param decodedFile The decoded file. This will have the exact same contents
+ * as the source file on success.
+ */
+ public void decodeFile(
+ FileSystem fs, Path srcFile, FileSystem parityFs, Path parityFile,
+ long errorOffset, Path decodedFile) throws IOException {
+
+ LOG.info("Create " + decodedFile + " for error at " +
+ srcFile + ":" + errorOffset);
+ FileStatus srcStat = fs.getFileStatus(srcFile);
+ long blockSize = srcStat.getBlockSize();
+ configureBuffers(blockSize);
+ // Move the offset to the start of the block.
+ errorOffset = (errorOffset / blockSize) * blockSize;
+
+ // Create the decoded file.
+ FSDataOutputStream out = fs.create(
+ decodedFile, false, conf.getInt("io.file.buffer.size", 64 * 1024),
+ srcStat.getReplication(), srcStat.getBlockSize());
+
+ // Open the source file.
+ FSDataInputStream in = fs.open(
+ srcFile, conf.getInt("io.file.buffer.size", 64 * 1024));
+
+ // Start copying data block-by-block.
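+ // Good blocks are copied verbatim; if a copy hits a missing block or
+ // checksum error partway through, bytesAlreadyCopied records how much
+ // of the block was already written so recovery can skip those bytes.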
+ for (long offset = 0; offset < srcStat.getLen(); offset += blockSize) {
+ long limit = Math.min(blockSize, srcStat.getLen() - offset);
+ long bytesAlreadyCopied = 0;
+ if (offset != errorOffset) {
+ try {
+ in = fs.open(
+ srcFile, conf.getInt("io.file.buffer.size", 64 * 1024));
+ in.seek(offset);
+ RaidUtils.copyBytes(in, out, readBufs[0], limit);
+ assert(out.getPos() == offset + limit);
+ LOG.info("Copied till " + out.getPos() + " from " + srcFile);
+ continue;
+ } catch (BlockMissingException e) {
+ LOG.info("Encountered BME at " + srcFile + ":" + offset);
+ bytesAlreadyCopied = out.getPos() - offset;
+ } catch (ChecksumException e) {
+ LOG.info("Encountered CE at " + srcFile + ":" + offset);
+ bytesAlreadyCopied = out.getPos() - offset;
+ }
+ }
+ // If we are here offset == errorOffset or we got an exception.
+ // Recover the block starting at offset.
+ fixErasedBlock(fs, srcFile, parityFs, parityFile, blockSize, offset,
+ bytesAlreadyCopied, limit, out);
+ }
+ out.close();
+
+ try {
+ fs.setOwner(decodedFile, srcStat.getOwner(), srcStat.getGroup());
+ fs.setPermission(decodedFile, srcStat.getPermission());
+ fs.setTimes(decodedFile, srcStat.getModificationTime(),
+ srcStat.getAccessTime());
+ } catch (Exception exc) {
+ LOG.info("Didn't manage to copy meta information because of " + exc +
+ " Ignoring...");
+ }
+
+ }
+
+ /**
+ * Recovers a corrupt block to a local file.
+ *
+ * @param srcFs The filesystem containing the source file.
+ * @param srcPath The damaged source file.
+ * @param parityFs The filesystem containing the parity file. This could be
+ * different from srcFs when the parity file is part of a HAR archive.
+ * @param parityPath The parity file.
+ * @param blockSize The block size of the file.
+ * @param blockOffset Known location of error in the source file. There could
+ * be additional errors in the source file that are discovered during
+ * the decode process.
+ * @param localBlockFile The file to write the block to.
+ * @param limit The maximum number of bytes to be written out.
+ * This is to prevent writing beyond the end of the file.
+ */
+ public void recoverBlockToFile(
+ FileSystem srcFs, Path srcPath, FileSystem parityFs, Path parityPath,
+ long blockSize, long blockOffset, File localBlockFile, long limit)
+ throws IOException {
+ OutputStream out = new FileOutputStream(localBlockFile);
+ fixErasedBlock(srcFs, srcPath, parityFs, parityPath,
+ blockSize, blockOffset, 0, limit, out);
+ out.close();
+ }
+
+ /**
+ * Implementation-specific mechanism of writing a fixed block.
+ * @param fs The filesystem containing the source file.
+ * @param srcFile The damaged source file.
+ * @param parityFs The filesystem containing the parity file. This could be
+ * different from fs in case the parity file is part of a HAR archive.
+ * @param parityFile The parity file.
+ * @param blockSize The maximum size of a block.
+ * @param errorOffset Known location of error in the source file. There could
+ * be additional errors in the source file that are discovered during
+ * the decode process.
+ * @param bytesToSkip After the block is generated, this many bytes should be
+ * skipped before writing to the output. This is needed because the
+ * output may have a portion of the block written from the source file
+ * before a new corruption is discovered in the block.
+ * @param limit The maximum number of bytes to be written out, including
+ * bytesToSkip. This is to prevent writing beyond the end of the file.
+ * @param out The output.
+ */
+ protected abstract void fixErasedBlock(
+ FileSystem fs, Path srcFile, FileSystem parityFs, Path parityFile,
+ long blockSize, long errorOffset, long bytesToSkip, long limit,
+ OutputStream out) throws IOException;
+}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
new file mode 100644
index 0000000..180d71f
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Implements depth-first traversal using a Stack object. The traversal
+ * can be stopped at any time and the state of traversal is saved.
+ */
+public class DirectoryTraversal {
+ public static final Log LOG =
+ LogFactory.getLog("org.apache.hadoop.raid.DirectoryTraversal");
+
+ private FileSystem fs;
+ private List<FileStatus> paths;
+ private int pathIdx = 0; // Next path to process.
+ private Stack<Node> stack = new Stack<Node>();
+
+ /**
+ * Represents a directory node in directory traversal.
+ */
+ static class Node {
+ private FileStatus path; // Path that this node represents.
+ private FileStatus[] elements; // Elements in the node.
+ private int idx = 0;
+
+ public Node(FileStatus path, FileStatus[] elements) {
+ this.path = path;
+ this.elements = elements;
+ }
+
+ public boolean hasNext() {
+ return idx < elements.length;
+ }
+
+ public FileStatus next() {
+ return elements[idx++];
+ }
+
+ public FileStatus path() {
+ return this.path;
+ }
+ }
+
+ /**
+ * Constructor.
+ * @param fs The filesystem to use.
+ * @param startPaths A list of paths that need to be traversed
+ */
+ public DirectoryTraversal(FileSystem fs, List<FileStatus> startPaths) {
+ this.fs = fs;
+ paths = startPaths;
+ pathIdx = 0;
+ }
+
+ /**
+ * Choose some files to RAID.
+ * @param conf Configuration to use.
+ * @param targetRepl Target replication for raided source files.
+ * @param raidDestPrefix Prefix of the destination (parity) path.
+ * @param modTimePeriod Minimum time since last modification before a file
+ * is selected for raiding.
+ * @param limit Limit on the number of files to choose.
+ * @return list of files to RAID.
+ * @throws IOException
+ */
+ public List<FileStatus> selectFilesToRaid(
+ Configuration conf, int targetRepl, Path raidDestPrefix,
+ long modTimePeriod, int limit) throws IOException {
+ List<FileStatus> selected = new LinkedList<FileStatus>();
+ int numSelected = 0;
+
+ long now = System.currentTimeMillis();
+ while (numSelected < limit) {
+ FileStatus next = getNextFile();
+ if (next == null) {
+ break;
+ }
+ // We have the next file, do we want to select it?
+ // Skip the source file if it has two blocks or fewer.
+ long blockSize = next.getBlockSize();
+ if (2 * blockSize >= next.getLen()) {
+ continue;
+ }
+
+ boolean select = false;
+ try {
+ Object ppair = RaidNode.getParityFile(
+ raidDestPrefix, next.getPath(), conf);
+ // Is there a valid parity file?
+ if (ppair != null) {
+ // Is the source at the target replication?
+ if (next.getReplication() != targetRepl) {
+ // Select the file so that its replication can be set.
+ select = true;
+ } else {
+ // Nothing to do, don't select the file.
+ select = false;
+ }
+ } else if (next.getModificationTime() + modTimePeriod < now) {
+ // No parity file exists; select the file only if it has not been
+ // modified within the last modTimePeriod.
+ select = true;
+ }
+ } catch (java.io.FileNotFoundException e) {
+ select = true; // destination file does not exist
+ }
+ if (select) {
+ selected.add(next);
+ numSelected++;
+ }
+ }
+
+ return selected;
+ }
+
+ /**
+ * Return the next file.
+ * @throws IOException
+ */
+ public FileStatus getNextFile() throws IOException {
+ // Check if traversal is done.
+ while (!doneTraversal()) {
+ // If traversal is not done, check if the stack is not empty.
+ while (!stack.isEmpty()) {
+ // If the stack is not empty, look at the top node.
+ Node node = stack.peek();
+ // Check if the top node has an element.
+ if (node.hasNext()) {
+ FileStatus element = node.next();
+ // Is the next element a directory.
+ if (!element.isDir()) {
+ // It is a file, return it.
+ return element;
+ }
+ // Next element is a directory, push it on to the stack and
+ // continue
+ try {
+ pushNewNode(element);
+ } catch (FileNotFoundException e) {
+ // Ignore and move to the next element.
+ }
+ continue;
+ } else {
+ // Top node has no next element, pop it and continue.
+ stack.pop();
+ continue;
+ }
+ }
+ // If the stack is empty, do we have more paths?
+ while (!paths.isEmpty()) {
+ FileStatus next = paths.remove(0);
+ pathIdx++;
+ if (!next.isDir()) {
+ return next;
+ }
+ try {
+ pushNewNode(next);
+ } catch (FileNotFoundException e) {
+ continue;
+ }
+ break;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Gets the next directory in the tree. The algorithm returns deeper directories
+ * first.
+ * @return A FileStatus representing the directory.
+ * @throws IOException
+ */
+ public FileStatus getNextDirectory() throws IOException {
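+ // A directory is returned only after all of its children have been
+ // visited (post-order), which is why deeper directories come first.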
+ // Check if traversal is done.
+ while (!doneTraversal()) {
+ // If traversal is not done, check if the stack is not empty.
+ while (!stack.isEmpty()) {
+ // If the stack is not empty, look at the top node.
+ Node node = stack.peek();
+ // Check if the top node has an element.
+ if (node.hasNext()) {
+ FileStatus element = node.next();
+ // Is the next element a directory.
+ if (element.isDir()) {
+ // Next element is a directory, push it on to the stack and
+ // continue
+ try {
+ pushNewNode(element);
+ } catch (FileNotFoundException e) {
+ // Ignore and move to the next element.
+ }
+ continue;
+ }
+ } else {
+ stack.pop();
+ return node.path;
+ }
+ }
+ // If the stack is empty, do we have more paths?
+ while (!paths.isEmpty()) {
+ FileStatus next = paths.remove(0);
+ pathIdx++;
+ if (next.isDir()) {
+ try {
+ pushNewNode(next);
+ } catch (FileNotFoundException e) {
+ continue;
+ }
+ break;
+ }
+ }
+ }
+ return null;
+ }
+
+ private void pushNewNode(FileStatus stat) throws IOException {
+ if (!stat.isDir()) {
+ return;
+ }
+ Path p = stat.getPath();
+ LOG.info("Traversing to directory " + p);
+ FileStatus[] elements = fs.listStatus(p);
+ Node newNode = new Node(stat, (elements == null ? new FileStatus[0] : elements));
+ stack.push(newNode);
+ }
+
+ public boolean doneTraversal() {
+ return paths.isEmpty() && stack.isEmpty();
+ }
+}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
index 7130e7a..85f4aca 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
@@ -34,17 +34,21 @@
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.raid.RaidNode.Statistics;
import org.apache.hadoop.raid.protocol.PolicyInfo;
import org.apache.hadoop.util.StringUtils;
@@ -111,6 +115,11 @@
List<RaidPolicyPathPair> raidPolicyPathPairList = new ArrayList<RaidPolicyPathPair>();
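+ // Handle to the asynchronously submitted map-reduce job, plus state for
+ // tracking task-completion events and progress reports in checkComplete().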
+ private JobClient jobClient;
+ private RunningJob runningJob;
+ private int jobEventCounter = 0;
+ private String lastReport = null;
+
/** Responsible for generating splits of the src file list. */
static class DistRaidInputFormat implements InputFormat<Text, PolicyInfo> {
/** Do nothing. */
@@ -184,6 +193,7 @@
private int failcount = 0;
private int succeedcount = 0;
private Statistics st = null;
+ private Reporter reporter = null;
private String getCountString() {
return "Succeeded: " + succeedcount + " Failed: " + failcount;
@@ -200,6 +210,7 @@
public void map(Text key, PolicyInfo policy,
OutputCollector<WritableComparable, Text> out, Reporter reporter)
throws IOException {
+ this.reporter = reporter;
try {
LOG.info("Raiding file=" + key.toString() + " policy=" + policy);
Path p = new Path(key.toString());
@@ -268,30 +279,71 @@
private static int getMapCount(int srcCount, int numNodes) {
int numMaps = (int) (srcCount / OP_PER_MAP);
numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
- return Math.max(numMaps, 1);
+ return Math.max(numMaps, MAX_MAPS_PER_NODE);
}
- /** invokes mapred job do parallel raiding */
- public void doDistRaid() throws IOException {
- if (raidPolicyPathPairList.size() == 0) {
- LOG.info("DistRaid has no paths to raid.");
- return;
+ /** Invokes a map-reduce job to do parallel raiding.
+ * @return true if the job was started, false otherwise
+ */
+ public boolean startDistRaid() throws IOException {
+ assert(raidPolicyPathPairList.size() > 0);
+ if (setup()) {
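+ // submitJob returns without waiting for the job to finish; completion
+ // is tracked later via checkComplete(), unlike the old blocking
+ // JobClient.runJob path.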
+ this.jobClient = new JobClient(jobconf);
+ this.runningJob = this.jobClient.submitJob(jobconf);
+ LOG.info("Job Started: " + runningJob.getID());
+ return true;
}
- try {
- if (setup()) {
- JobClient.runJob(jobconf);
- }
- } finally {
- // delete job directory
- final String jobdir = jobconf.get(JOB_DIR_LABEL);
- if (jobdir != null) {
- final Path jobpath = new Path(jobdir);
- jobpath.getFileSystem(jobconf).delete(jobpath, true);
- }
- }
- raidPolicyPathPairList.clear();
+ return false;
}
+ /** Checks if the map-reduce job has completed.
+ *
+ * @return true if the job completed, false otherwise.
+ * @throws IOException
+ */
+ public boolean checkComplete() throws IOException {
+ JobID jobID = runningJob.getID();
+ if (runningJob.isComplete()) {
+ // delete job directory
+ final String jobdir = jobconf.get(JOB_DIR_LABEL);
+ if (jobdir != null) {
+ final Path jobpath = new Path(jobdir);
+ jobpath.getFileSystem(jobconf).delete(jobpath, true);
+ }
+ if (runningJob.isSuccessful()) {
+ LOG.info("Job Complete(Succeeded): " + jobID);
+ } else {
+ LOG.info("Job Complete(Failed): " + jobID);
+ }
+ raidPolicyPathPairList.clear();
+ Counters ctrs = runningJob.getCounters();
+ long filesRaided = ctrs.findCounter(Counter.FILES_SUCCEEDED).getValue();
+ long filesFailed = ctrs.findCounter(Counter.FILES_FAILED).getValue();
+ return true;
+ } else {
+ String report = (" job " + jobID +
+ " map " + StringUtils.formatPercent(runningJob.mapProgress(), 0)+
+ " reduce " + StringUtils.formatPercent(runningJob.reduceProgress(), 0));
+ if (!report.equals(lastReport)) {
+ LOG.info(report);
+ lastReport = report;
+ }
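+ // Fetch only events not yet seen; jobEventCounter tracks how many
+ // completion events have been consumed across calls.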
+ TaskCompletionEvent[] events =
+ runningJob.getTaskCompletionEvents(jobEventCounter);
+ jobEventCounter += events.length;
+ for(TaskCompletionEvent event : events) {
+ if (event.getTaskStatus() == TaskCompletionEvent.Status.FAILED) {
+ LOG.info(" Job " + jobID + " " + event.toString());
+ }
+ }
+ return false;
+ }
+ }
+
+ public boolean successful() throws IOException {
+ return runningJob.isSuccessful();
+ }
+
/**
* set up input file which has the list of input files.
*
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java
new file mode 100644
index 0000000..e3d91b0
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Represents a generic encoder that can generate a parity file for a source
+ * file.
+ * This is an abstract class; concrete subclasses need to implement
+ * encodeFileImpl.
+ */
+public abstract class Encoder {
+ public static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.Encoder");
+ protected Configuration conf;
+ protected int stripeSize;
+ protected int paritySize;
+ protected Random rand;
+ protected int bufSize;
+ protected byte[][] readBufs;
+ protected byte[][] writeBufs;
+
+ /**
+ * A class that acts as a sink for data, similar to /dev/null.
+ */
+ static class NullOutputStream extends OutputStream {
+ public void write(byte[] b) throws IOException {}
+ public void write(int b) throws IOException {}
+ public void write(byte[] b, int off, int len) throws IOException {}
+ }
+
+ Encoder(
+ Configuration conf, int stripeSize, int paritySize) {
+ this.conf = conf;
+ this.stripeSize = stripeSize;
+ this.paritySize = paritySize;
+ this.rand = new Random();
+ this.bufSize = conf.getInt("raid.encoder.bufsize", 1024 * 1024);
+ this.readBufs = new byte[stripeSize][];
+ this.writeBufs = new byte[paritySize][];
+ allocateBuffers();
+ }
+
+ private void allocateBuffers() {
+ for (int i = 0; i < stripeSize; i++) {
+ readBufs[i] = new byte[bufSize];
+ }
+ for (int i = 0; i < paritySize; i++) {
+ writeBufs[i] = new byte[bufSize];
+ }
+ }
+
+ private void configureBuffers(long blockSize) {
+ if ((long)bufSize > blockSize) {
+ bufSize = (int)blockSize;
+ allocateBuffers();
+ } else if (blockSize % bufSize != 0) {
+ bufSize = (int)(blockSize / 256L); // heuristic.
+ if (bufSize == 0) {
+ bufSize = 1024;
+ }
+ bufSize = Math.min(bufSize, 1024 * 1024);
+ allocateBuffers();
+ }
+ }
+
+ /**
+ * The interface to use to generate a parity file.
+ * This method can be called multiple times with the same Encoder object,
+ * thus allowing reuse of the buffers allocated by the Encoder object.
+ *
+ * @param fs The filesystem containing the source file.
+ * @param srcFile The source file.
+ * @param parityFile The parity file to be generated.
+ */
+ public void encodeFile(FileSystem fs, Path srcFile, Path parityFile,
+ short parityRepl, Progressable reporter) throws IOException {
+ FileStatus srcStat = fs.getFileStatus(srcFile);
+ long srcSize = srcStat.getLen();
+ long blockSize = srcStat.getBlockSize();
+
+ configureBuffers(blockSize);
+
+ // Create a tmp file to which we will write first.
+ Path parityTmp = new Path(conf.get("fs.raid.tmpdir", "/tmp/raid") +
+ parityFile.toUri().getPath() +
+ "." + rand.nextLong() + ".tmp");
+ FSDataOutputStream out = fs.create(
+ parityTmp,
+ true,
+ conf.getInt("io.file.buffer.size", 64 * 1024),
+ parityRepl,
+ blockSize);
+
+ try {
+ encodeFileToStream(fs, srcFile, srcSize, blockSize, out, reporter);
+ out.close();
+ out = null;
+ LOG.info("Wrote temp parity file " + parityTmp);
+
+ // delete destination if exists
+ if (fs.exists(parityFile)){
+ fs.delete(parityFile, false);
+ }
+ fs.mkdirs(parityFile.getParent());
+ if (!fs.rename(parityTmp, parityFile)) {
+ String msg = "Unable to rename file " + parityTmp + " to " + parityFile;
+ throw new IOException (msg);
+ }
+ LOG.info("Wrote parity file " + parityFile);
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ fs.delete(parityTmp, false);
+ }
+ }
+
+ /**
+ * Recovers a corrupt block in a parity file to a local file.
+ *
+ * The encoder generates paritySize parity blocks for a source file stripe.
+ * Since we want only one of the parity blocks, this function creates
+ * null outputs for the blocks to be discarded.
+ *
+ * @param fs The filesystem in which both srcFile and parityFile reside.
+ * @param srcFile The source file.
+ * @param srcSize The size of the source file.
+ * @param blockSize The block size for the source/parity files.
+ * @param corruptOffset The location of corruption in the parity file.
+ * @param localBlockFile The destination for the recovered block.
+ */
+ public void recoverParityBlockToFile(
+ FileSystem fs,
+ Path srcFile, long srcSize, long blockSize,
+ Path parityFile, long corruptOffset,
+ File localBlockFile) throws IOException {
+ OutputStream out = new FileOutputStream(localBlockFile);
+ try {
+ recoverParityBlockToStream(fs, srcFile, srcSize, blockSize, parityFile,
+ corruptOffset, out);
+ } finally {
+ out.close();
+ }
+ }
+
+ /**
+ * Recovers a corrupt block in a parity file to an output stream.
+ *
+ * The encoder generates paritySize parity blocks for a source file stripe.
+ * Since we want only one of the parity blocks, this function creates
+ * null outputs for the blocks to be discarded.
+ *
+ * @param fs The filesystem in which both srcFile and parityFile reside.
+ * @param srcFile The source file.
+ * @param srcSize The size of the source file.
+ * @param blockSize The block size for the source/parity files.
+ * @param corruptOffset The location of corruption in the parity file.
+ * @param out The destination for the recovered block.
+ */
+ public void recoverParityBlockToStream(
+ FileSystem fs,
+ Path srcFile, long srcSize, long blockSize,
+ Path parityFile, long corruptOffset,
+ OutputStream out) throws IOException {
+ LOG.info("Recovering parity block" + parityFile + ":" + corruptOffset);
+ // Get the start offset of the corrupt block.
+ corruptOffset = (corruptOffset / blockSize) * blockSize;
+ // Output streams to each block in the parity file stripe.
+ OutputStream[] outs = new OutputStream[paritySize];
+ long indexOfCorruptBlockInParityStripe =
+ (corruptOffset / blockSize) % paritySize;
+ LOG.info("Index of corrupt block in parity stripe: " +
+ indexOfCorruptBlockInParityStripe);
+ // Create a real output stream for the block we want to recover,
+ // and create null streams for the rest.
+ for (int i = 0; i < paritySize; i++) {
+ if (indexOfCorruptBlockInParityStripe == i) {
+ outs[i] = out;
+ } else {
+ outs[i] = new NullOutputStream();
+ }
+ }
+ // Get the stripe index and start offset of stripe.
+ long stripeIdx = corruptOffset / (paritySize * blockSize);
+ long stripeStart = stripeIdx * blockSize * stripeSize;
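+ // The parity file has paritySize blocks per stripe while the source
+ // has stripeSize blocks per stripe, hence the different multipliers.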
+
+ // Get input streams to each block in the source file stripe.
+ InputStream[] blocks = stripeInputs(fs, srcFile, stripeStart,
+ srcSize, blockSize);
+ LOG.info("Starting recovery by using source stripe " +
+ srcFile + ":" + stripeStart);
+ // Read the data from the blocks and write to the parity file.
+ encodeStripe(blocks, stripeStart, blockSize, outs, Reporter.NULL);
+ }
+
+ /**
+ * Encodes a source file into a single parity stream.
+ *
+ * The encoder generates paritySize parity blocks for each source stripe.
+ * Since only one output stream is provided, the remaining parity blocks
+ * are staged in local temp files before being copied to the output.
+ *
+ * @param fs The filesystem in which both srcFile and parityFile reside.
+ * @param srcFile The source file.
+ * @param srcSize The size of the source file.
+ * @param blockSize The block size for the source/parity files.
+ * @param out The destination for the generated parity data.
+ * @param reporter Progress indicator.
+ */
+ private void encodeFileToStream(FileSystem fs, Path srcFile, long srcSize,
+ long blockSize, OutputStream out, Progressable reporter) throws IOException {
+ OutputStream[] tmpOuts = new OutputStream[paritySize];
+ // One parity block can be written directly to out, rest to local files.
+ tmpOuts[0] = out;
+ File[] tmpFiles = new File[paritySize - 1];
+ for (int i = 0; i < paritySize - 1; i++) {
+ tmpFiles[i] = File.createTempFile("parity", "_" + i);
+ LOG.info("Created tmp file " + tmpFiles[i]);
+ tmpFiles[i].deleteOnExit();
+ }
+ try {
+ // Loop over stripes in the file.
+ for (long stripeStart = 0; stripeStart < srcSize;
+ stripeStart += blockSize * stripeSize) {
+ reporter.progress();
+ LOG.info("Starting encoding of stripe " + srcFile + ":" + stripeStart);
+ // Create input streams for blocks in the stripe.
+ InputStream[] blocks = stripeInputs(fs, srcFile, stripeStart,
+ srcSize, blockSize);
+ // Create output streams to the temp files.
+ for (int i = 0; i < paritySize - 1; i++) {
+ tmpOuts[i + 1] = new FileOutputStream(tmpFiles[i]);
+ }
+ // Call the implementation of encoding.
+ encodeStripe(blocks, stripeStart, blockSize, tmpOuts, reporter);
+ // Close output streams to the temp files and write the temp files
+ // to the output provided.
+ for (int i = 0; i < paritySize - 1; i++) {
+ tmpOuts[i + 1].close();
+ tmpOuts[i + 1] = null;
+ InputStream in = new FileInputStream(tmpFiles[i]);
+ RaidUtils.copyBytes(in, out, writeBufs[i], blockSize);
+ reporter.progress();
+ }
+ }
+ } finally {
+ for (int i = 0; i < paritySize - 1; i++) {
+ if (tmpOuts[i + 1] != null) {
+ tmpOuts[i + 1].close();
+ }
+ tmpFiles[i].delete();
+ LOG.info("Deleted tmp file " + tmpFiles[i]);
+ }
+ }
+ }
+
+ /**
+ * Return input streams for each block in a source file's stripe.
+ * @param fs The filesystem where the file resides.
+ * @param srcFile The source file.
+ * @param stripeStartOffset The start offset of the stripe.
+ * @param srcSize The size of the source file.
+ * @param blockSize The block size for the source file.
+ */
+ protected InputStream[] stripeInputs(
+ FileSystem fs,
+ Path srcFile,
+ long stripeStartOffset,
+ long srcSize,
+ long blockSize
+ ) throws IOException {
+ InputStream[] blocks = new InputStream[stripeSize];
+ for (int i = 0; i < stripeSize; i++) {
+ long seekOffset = stripeStartOffset + i * blockSize;
+ if (seekOffset < srcSize) {
+ FSDataInputStream in = fs.open(
+ srcFile, conf.getInt("io.file.buffer.size", 64 * 1024));
+ in.seek(seekOffset);
+ LOG.info("Opening stream at " + srcFile + ":" + seekOffset);
+ blocks[i] = in;
+ } else {
+ LOG.info("Using zeros at offset " + seekOffset);
+ // We have no src data at this offset.
+ blocks[i] = new RaidUtils.ZeroInputStream(
+ seekOffset + blockSize);
+ }
+ }
+ return blocks;
+ }
+
+ /**
+ * The implementation of generating parity data for a stripe.
+ *
+ * @param blocks The streams to blocks in the stripe.
+ * @param stripeStartOffset The start offset of the stripe.
+ * @param blockSize The maximum size of a block.
+ * @param outs output streams to the parity blocks.
+ * @param reporter progress indicator.
+ */
+ protected abstract void encodeStripe(
+ InputStream[] blocks,
+ long stripeStartOffset,
+ long blockSize,
+ OutputStream[] outs,
+ Progressable reporter) throws IOException;
+}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/JobMonitor.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/JobMonitor.java
new file mode 100644
index 0000000..e01fcba
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/JobMonitor.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Periodically monitors the status of jobs registered with it.
+ *
+ * Jobs that are submitted for the same policy name are kept in the same list,
+ * and the list itself is kept in a map that has the policy name as the key and
+ * the list as value.
+ */
+class JobMonitor implements Runnable {
+ public static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.JobMonitor");
+
+ volatile boolean running = true;
+
+ private Map<String, List<DistRaid>> jobs;
+ private long jobMonitorInterval;
+ private volatile long jobsMonitored = 0;
+ private volatile long jobsSucceeded = 0;
+
+ public JobMonitor(Configuration conf) {
+ jobMonitorInterval = conf.getLong("raid.jobmonitor.interval", 60000);
+ jobs = new HashMap<String, List<DistRaid>>();
+ }
+
+ public void run() {
+ while (running) {
+ try {
+ LOG.info("JobMonitor thread continuing to run...");
+ doMonitor();
+ } catch (Throwable e) {
+ LOG.error("JobMonitor encountered exception " +
+ StringUtils.stringifyException(e));
+ // All expected exceptions are caught by doMonitor(). It is better
+ // to exit now; this will prevent RaidNode from submitting more jobs,
+ // since the number of running jobs will never decrease.
+ return;
+ }
+ }
+ }
+
+ /**
+ * Periodically checks status of running map-reduce jobs.
+ */
+ public void doMonitor() {
+ while (running) {
+ String[] keys = null;
+ // Make a copy of the names of the current jobs.
+ synchronized(jobs) {
+ keys = jobs.keySet().toArray(new String[0]);
+ }
+
+ // Check all the jobs. We do not want to block access to `jobs`
+ // because that will prevent new jobs from being added.
+ // This is safe because JobMonitor.run is the only code that can
+ // remove a job from `jobs`. Thus all elements in `keys` will have
+ // valid values.
+ Map<String, List<DistRaid>> finishedJobs =
+ new HashMap<String, List<DistRaid>>();
+
+ for (String key: keys) {
+ // For each policy being monitored, get the list of jobs running.
+ DistRaid[] jobListCopy = null;
+ synchronized(jobs) {
+ List<DistRaid> jobList = jobs.get(key);
+ synchronized(jobList) {
+ jobListCopy = jobList.toArray(new DistRaid[jobList.size()]);
+ }
+ }
+ // The code that actually contacts the JobTracker is not synchronized,
+ // it uses copies of the list of jobs.
+ for (DistRaid job: jobListCopy) {
+ // Check each running job.
+ try {
+ boolean complete = job.checkComplete();
+ if (complete) {
+ addJob(finishedJobs, key, job);
+ if (job.successful()) {
+ jobsSucceeded++;
+ }
+ }
+ } catch (IOException ioe) {
+ // If there was an error, consider the job finished.
+ addJob(finishedJobs, key, job);
+ }
+ }
+ }
+
+ if (finishedJobs.size() > 0) {
+ for (String key: finishedJobs.keySet()) {
+ List<DistRaid> finishedJobList = finishedJobs.get(key);
+ // Iterate through finished jobs and remove from jobs.
+ // removeJob takes care of locking.
+ for (DistRaid job: finishedJobList) {
+ removeJob(jobs, key, job);
+ }
+ }
+ }
+
+ try {
+ Thread.sleep(jobMonitorInterval);
+ } catch (InterruptedException ie) {
+ }
+ }
+ }
+
+ public int runningJobsCount(String key) {
+ int count = 0;
+ synchronized(jobs) {
+ if (jobs.containsKey(key)) {
+ List<DistRaid> jobList = jobs.get(key);
+ synchronized(jobList) {
+ count = jobList.size();
+ }
+ }
+ }
+ return count;
+ }
+
+ public void monitorJob(String key, DistRaid job) {
+ addJob(jobs, key, job);
+ jobsMonitored++;
+ }
+
+ public long jobsMonitored() {
+ return this.jobsMonitored;
+ }
+
+ public long jobsSucceeded() {
+ return this.jobsSucceeded;
+ }
+
+ private static void addJob(Map<String, List<DistRaid>> jobsMap,
+ String jobName, DistRaid job) {
+ synchronized(jobsMap) {
+ List<DistRaid> list = null;
+ if (jobsMap.containsKey(jobName)) {
+ list = jobsMap.get(jobName);
+ } else {
+ list = new LinkedList<DistRaid>();
+ jobsMap.put(jobName, list);
+ }
+ synchronized(list) {
+ list.add(job);
+ }
+ }
+ }
+
+ private static void removeJob(Map<String, List<DistRaid>> jobsMap,
+ String jobName, DistRaid job) {
+ synchronized(jobsMap) {
+ if (jobsMap.containsKey(jobName)) {
+ List<DistRaid> list = jobsMap.get(jobName);
+ synchronized(list) {
+ for (Iterator<DistRaid> it = list.iterator(); it.hasNext(); ) {
+ DistRaid val = it.next();
+ if (val == job) {
+ it.remove();
+ }
+ }
+ if (list.size() == 0) {
+ jobsMap.remove(jobName);
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/ParityInputStream.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/ParityInputStream.java
new file mode 100644
index 0000000..b30cfcf
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/ParityInputStream.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Wraps over multiple input streams and provides an input stream that is
+ * an XOR of the streams.
+ */
+class ParityInputStream extends InputStream {
+ private static final int DEFAULT_BUFSIZE = 5*1024*1024;
+ private InputStream[] streams;
+ private byte[] xor;
+ private byte[] buf;
+ private int bufSize;
+ private long remaining;
+ private int available = 0;
+ private int readPos = 0;
+
+ public ParityInputStream(
+ InputStream[] streams, long parityBlockSize, byte[] buf, byte[] xor) {
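+ // buf and xor are caller-supplied, which lets callers reuse the same
+ // allocations across repeated parity computations instead of
+ // re-allocating per stripe.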
+ assert buf.length == xor.length;
+ bufSize = buf.length;
+ this.streams = streams;
+ remaining = parityBlockSize;
+ this.buf = buf;
+ this.xor = xor;
+ }
+
+ @Override
+ public int read() throws IOException {
+ makeAvailable();
+ if (available == 0) {
+ return -1;
+ }
+ // Mask to keep the InputStream.read() contract: 0-255, or -1 for EOF.
+ int ret = xor[readPos] & 0xff;
+ readPos++;
+ available--;
+ return ret;
+ }
+
+ @Override
+ public int read(byte b[], int off, int len) throws IOException {
+ makeAvailable();
+ if (available == 0) {
+ return -1;
+ }
+ int ret = Math.min(len, available);
+ for (int i = 0; i < ret; ++i) {
+ b[off+i] = xor[readPos+i];
+ }
+ readPos += ret;
+ available -= ret;
+ return ret;
+ }
+
+ public void close() throws IOException {
+ for (InputStream i: streams) {
+ i.close();
+ }
+ }
+
+ /**
+ * Send the contents of the stream to the sink.
+ * @param sink
+ * @param reporter
+ * @throws IOException
+ */
+ public void drain(OutputStream sink, Progressable reporter)
+ throws IOException {
+
+ while (true) {
+ makeAvailable();
+ if (available == 0) {
+ break;
+ }
+ sink.write(xor, readPos, available);
+ available = 0;
+ if (reporter != null) {
+ reporter.progress();
+ }
+ }
+ }
+
+ /**
+ * Make some bytes available for reading in the internal buffer.
+ * @throws IOException
+ */
+ private void makeAvailable() throws IOException {
+ if (available > 0 || remaining <= 0) {
+ return;
+ }
+ // Read some bytes from the first stream.
+ int xorlen = (int)Math.min(remaining, bufSize);
+ readExact(streams[0], xor, xorlen);
+
+ // Read bytes from all the other streams and xor them.
+ for (int i = 1; i < streams.length; i++) {
+ readExact(streams[i], buf, xorlen);
+
+ for (int j = 0; j < xorlen; j++) {
+ xor[j] ^= buf[j];
+ }
+ }
+
+ remaining -= xorlen;
+ available = xorlen;
+ readPos = 0;
+ }
+
+ private static void readExact(InputStream in, byte[] bufs, int toRead)
+ throws IOException {
+ int tread = 0;
+ while (tread < toRead) {
+ int read = in.read(bufs, tread, toRead - tread);
+ if (read == -1) {
+ // If the stream ends, fill in zeros.
+ Arrays.fill(bufs, tread, toRead, (byte)0);
+ tread = toRead;
+ } else {
+ tread += read;
+ }
+ }
+ assert tread == toRead;
+ }
+
+}
+
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
index af226a5..90f8f3e 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
@@ -21,10 +21,12 @@
import java.io.IOException;
import java.io.FileNotFoundException;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.LinkedList;
import java.util.Iterator;
import java.util.Arrays;
+import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.HashSet;
@@ -73,7 +75,9 @@
public static final long SLEEP_TIME = 10000L; // 10 seconds
public static final int DEFAULT_PORT = 60000;
public static final int DEFAULT_STRIPE_LENGTH = 5; // default value of stripe length
+ public static final String STRIPE_LENGTH_KEY = "hdfs.raid.stripeLength";
public static final String DEFAULT_RAID_LOCATION = "/raid";
+ public static final String RAID_LOCATION_KEY = "hdfs.raid.locations";
public static final String HAR_SUFFIX = "_raid.har";
/** RPC server */
@@ -101,6 +105,10 @@
/** Daemon thread to har raid directories */
Daemon harThread = null;
+ /** Daemon thread to monitor distributed raid job progress */
+ JobMonitor jobMonitor = null;
+ Daemon jobMonitorThread = null;
+
/** Whether to raid locally instead of using a distributed job */
boolean isRaidLocal = false;
@@ -168,6 +176,7 @@
try {
initialize(conf);
} catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
this.stop();
throw e;
} catch (Exception e) {
@@ -193,6 +202,7 @@
try {
if (server != null) server.join();
if (triggerThread != null) triggerThread.join();
+ if (jobMonitorThread != null) jobMonitorThread.join();
if (purgeThread != null) purgeThread.join();
} catch (InterruptedException ie) {
// do nothing
@@ -210,6 +220,8 @@
running = false;
if (server != null) server.stop();
if (triggerThread != null) triggerThread.interrupt();
+ if (jobMonitor != null) jobMonitor.running = false;
+ if (jobMonitorThread != null) jobMonitorThread.interrupt();
if (purgeThread != null) purgeThread.interrupt();
}
@@ -252,6 +264,10 @@
running = true;
this.server.start(); // start RPC server
+ this.jobMonitor = new JobMonitor(conf);
+ this.jobMonitorThread = new Daemon(this.jobMonitor);
+ this.jobMonitorThread.start();
+
// start the daemon thread to fire policies appropriately
this.triggerThread = new Daemon(new TriggerMonitor());
this.triggerThread.start();
@@ -282,22 +298,15 @@
LOG.info("Recover File for " + inStr + " for corrupt offset " + corruptOffset);
Path inputPath = new Path(inStr);
Path srcPath = inputPath.makeQualified(inputPath.getFileSystem(conf));
- PolicyInfo info = findMatchingPolicy(srcPath);
- if (info != null) {
+ // find stripe length from config
+ int stripeLength = getStripeLength(conf);
- // find stripe length from config
- int stripeLength = getStripeLength(conf, info);
-
- // create destination path prefix
- String destPrefix = getDestinationPath(conf, info);
- Path destPath = new Path(destPrefix.trim());
- FileSystem fs = FileSystem.get(destPath.toUri(), conf);
- destPath = destPath.makeQualified(fs);
-
- Path unraided = unRaid(conf, srcPath, destPath, stripeLength, corruptOffset);
- if (unraided != null) {
- return unraided.toString();
- }
+ Path destPref = getDestinationPath(conf);
+ Decoder decoder = new XORDecoder(conf, RaidNode.getStripeLength(conf));
+ Path unraided = unRaid(conf, srcPath, destPref, decoder,
+ stripeLength, corruptOffset);
+ if (unraided != null) {
+ return unraided.toString();
}
return null;
}
@@ -306,6 +315,11 @@
* Periodically checks to see which policies should be fired.
*/
class TriggerMonitor implements Runnable {
+
+ private Map<String, Long> scanTimes = new HashMap<String, Long>();
+ private Map<String, DirectoryTraversal> scanState =
+ new HashMap<String, DirectoryTraversal>();
+
/**
*/
public void run() {
@@ -320,6 +334,109 @@
}
}
+ /**
+ * Should we select more files for a policy?
+ */
+ private boolean shouldSelectFiles(PolicyInfo info) {
+ String policyName = info.getName();
+ int runningJobsCount = jobMonitor.runningJobsCount(policyName);
+ // Is there a scan in progress for this policy?
+ if (scanState.containsKey(policyName)) {
+ int maxJobsPerPolicy = configMgr.getMaxJobsPerPolicy();
+
+ // If there is a scan in progress for this policy, we can have
+ // up to maxJobsPerPolicy running jobs.
+ return (runningJobsCount < maxJobsPerPolicy);
+ } else {
+ // If there isn't a scan in progress for this policy, we don't
+ // want to start a fresh scan if there is even one running job.
+ if (runningJobsCount >= 1) {
+ return false;
+ }
+ // Check the time of the last full traversal before starting a fresh
+ // traversal.
+ if (scanTimes.containsKey(policyName)) {
+ long lastScan = scanTimes.get(policyName);
+ return (now() > lastScan + configMgr.getPeriodicity());
+ } else {
+ return true;
+ }
+ }
+ }
+
+ /**
+ * Returns a list of pathnames that need raiding.
+ * The list of paths could be obtained by resuming a previously suspended
+ * traversal.
+ * The number of paths returned is limited by raid.distraid.max.files.
+ */
+ private List<FileStatus> selectFiles(PolicyInfo info) throws IOException {
+ Path destPrefix = getDestinationPath(conf);
+ String policyName = info.getName();
+ Path srcPath = info.getSrcPath();
+ long modTimePeriod = 0;
+ String str = info.getProperty("modTimePeriod");
+ if (str != null) {
+ modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod"));
+ }
+ short srcReplication = 0;
+ str = info.getProperty("srcReplication");
+ if (str != null) {
+ srcReplication = Short.parseShort(info.getProperty("srcReplication"));
+ }
+
+ // Max number of files returned.
+ int selectLimit = configMgr.getMaxFilesPerJob();
+ int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
+
+ // If we have a pending traversal, resume it.
+ if (scanState.containsKey(policyName)) {
+ DirectoryTraversal dt = scanState.get(policyName);
+ LOG.info("Resuming traversal for policy " + policyName);
+ List<FileStatus> returnSet = dt.selectFilesToRaid(
+ conf, targetRepl, destPrefix, modTimePeriod, selectLimit);
+ if (dt.doneTraversal()) {
+ scanState.remove(policyName);
+ }
+ return returnSet;
+ }
+
+ // Expand destination prefix path.
+ String destpstr = destPrefix.toString();
+ if (!destpstr.endsWith(Path.SEPARATOR)) {
+ destpstr += Path.SEPARATOR;
+ }
+
+ List<FileStatus> returnSet = new LinkedList<FileStatus>();
+
+ FileSystem fs = srcPath.getFileSystem(conf);
+ FileStatus[] gpaths = fs.globStatus(srcPath);
+ if (gpaths != null) {
+ List<FileStatus> selectedPaths = new LinkedList<FileStatus>();
+ for (FileStatus onepath: gpaths) {
+ String pathstr = onepath.getPath().makeQualified(fs).toString();
+ if (!pathstr.endsWith(Path.SEPARATOR)) {
+ pathstr += Path.SEPARATOR;
+ }
+ if (pathstr.startsWith(destpstr) || destpstr.startsWith(pathstr)) {
+ LOG.info("Skipping source " + pathstr +
+ " because it conflicts with raid directory " + destpstr);
+ } else {
+ selectedPaths.add(onepath);
+ }
+ }
+
+ // Set the time for a new traversal.
+ scanTimes.put(policyName, now());
+ DirectoryTraversal dt = new DirectoryTraversal(fs, selectedPaths);
+ returnSet = dt.selectFilesToRaid(
+ conf, targetRepl, destPrefix, modTimePeriod, selectLimit);
+ if (!dt.doneTraversal()) {
+ scanState.put(policyName, dt);
+ }
+ }
+ return returnSet;
+ }
/**
* Keep processing policies.
@@ -328,18 +445,11 @@
private void doProcess() throws IOException, InterruptedException {
PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
- long prevExec = 0;
- DistRaid dr = null;
while (running) {
+ Thread.sleep(SLEEP_TIME);
- boolean reload = configMgr.reloadConfigsIfNecessary();
- while(!reload && now() < prevExec + configMgr.getPeriodicity()){
- Thread.sleep(SLEEP_TIME);
- reload = configMgr.reloadConfigsIfNecessary();
- }
+ configMgr.reloadConfigsIfNecessary();
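+ // Sleeping only SLEEP_TIME per iteration (instead of waiting out a
+ // full rescan period) is safe because shouldSelectFiles() gates new
+ // scans on the last scan time and the number of jobs already running.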
- prevExec = now();
-
// activate all categories
Collection<PolicyList> all = configMgr.getAllPolicies();
@@ -348,35 +458,18 @@
PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
Arrays.sort(sorted, lexi);
- if (!isRaidLocal) {
- dr = new DistRaid(conf);
- }
- // paths we have processed so far
- List<String> processed = new LinkedList<String>();
-
for (PolicyList category : sorted) {
for (PolicyInfo info: category.getAll()) {
- long modTimePeriod = 0;
- short srcReplication = 0;
- String str = info.getProperty("modTimePeriod");
- if (str != null) {
- modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod"));
- }
- str = info.getProperty("srcReplication");
- if (str != null) {
- srcReplication = Short.parseShort(info.getProperty("srcReplication"));
+ if (!shouldSelectFiles(info)) {
+ continue;
}
LOG.info("Triggering Policy Filter " + info.getName() +
" " + info.getSrcPath());
List<FileStatus> filteredPaths = null;
- try {
- filteredPaths = selectFiles(conf, info.getSrcPath(),
- getDestinationPath(conf, info),
- modTimePeriod,
- srcReplication,
- prevExec);
+ try {
+ filteredPaths = selectFiles(info);
} catch (Exception e) {
LOG.info("Exception while invoking filter on policy " + info.getName() +
" srcPath " + info.getSrcPath() +
@@ -389,95 +482,41 @@
continue;
}
- // If any of the filtered path has already been accepted
- // by a previous policy, then skip it.
- for (Iterator<FileStatus> iter = filteredPaths.iterator(); iter.hasNext();) {
- String fs = iter.next().getPath().toString() + "/";
- for (String p : processed) {
- if (p.startsWith(fs)) {
- iter.remove();
- break;
+ // Apply the action on accepted paths
+ LOG.info("Triggering Policy Action " + info.getName() +
+ " " + info.getSrcPath());
+ try {
+ if (isRaidLocal){
+ doRaid(conf, info, filteredPaths);
+ }
+ else{
+ // We already checked that no job for this policy is running,
+ // so we can start a new one.
+ DistRaid dr = new DistRaid(conf);
+ // Add paths for distributed raiding.
+ dr.addRaidPaths(info, filteredPaths);
+ boolean started = dr.startDistRaid();
+ if (started) {
+ jobMonitor.monitorJob(info.getName(), dr);
}
}
- }
-
- // Apply the action on accepted paths
- LOG.info("Triggering Policy Action " + info.getName());
- try {
- if (isRaidLocal){
- doRaid(conf, info, filteredPaths);
- }
- else{
- //add paths for distributed raiding
- dr.addRaidPaths(info, filteredPaths);
- }
} catch (Exception e) {
LOG.info("Exception while invoking action on policy " + info.getName() +
" srcPath " + info.getSrcPath() +
" exception " + StringUtils.stringifyException(e));
continue;
}
-
- // add these paths to processed paths
- for (Iterator<FileStatus> iter = filteredPaths.iterator(); iter.hasNext();) {
- String p = iter.next().getPath().toString() + "/";
- processed.add(p);
- }
- }
- }
- processed.clear(); // free up memory references before yielding
-
- //do the distributed raiding
- if (!isRaidLocal) {
- dr.doDistRaid();
- }
- }
- }
- }
-
- /**
- * Returns the policy that matches the specified path.
- * The method below finds the first policy that matches an input path. Since different
- * policies with different purposes and destinations might be associated with the same input
- * path, we should be skeptical about the places using the method and we should try to change
- * the code to avoid it.
- */
- private PolicyInfo findMatchingPolicy(Path inpath) throws IOException {
- PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
- Collection<PolicyList> all = configMgr.getAllPolicies();
-
- // sort all policies by reverse lexicographical order. This is needed
- // to make the nearest policy take precedence.
- PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
- Arrays.sort(sorted, lexi);
-
- // loop through all categories of policies.
- for (PolicyList category : sorted) {
- PolicyInfo first = category.getAll().iterator().next();
- if (first != null) {
- Path[] srcPaths = first.getSrcPathExpanded(); // input src paths unglobbed
- if (srcPaths == null) {
- continue;
- }
-
- for (Path src: srcPaths) {
- if (inpath.toString().startsWith(src.toString())) {
- // if the srcpath is a prefix of the specified path
- // we have a match!
- return first;
}
}
}
}
- return null; // no matching policies
}
-
static private Path getOriginalParityFile(Path destPathPrefix, Path srcPath) {
return new Path(destPathPrefix, makeRelative(srcPath));
}
- private static class ParityFilePair {
+ static class ParityFilePair {
private Path path;
private FileSystem fs;
@@ -506,11 +545,19 @@
* @return Path object representing the parity file of the source
* @throws IOException
*/
- static private ParityFilePair getParityFile(Path destPathPrefix, Path srcPath, Configuration conf) throws IOException {
+ static ParityFilePair getParityFile(Path destPathPrefix, Path srcPath, Configuration conf) throws IOException {
Path srcParent = srcPath.getParent();
FileSystem fsDest = destPathPrefix.getFileSystem(conf);
-
+ FileSystem fsSrc = srcPath.getFileSystem(conf);
+
+ FileStatus srcStatus = null;
+ try {
+ srcStatus = fsSrc.getFileStatus(srcPath);
+ } catch (java.io.FileNotFoundException e) {
+ return null;
+ }
+
Path outDir = destPathPrefix;
if (srcParent != null) {
if (srcParent.getParent() == null) {
@@ -520,36 +567,36 @@
}
}
+
+ // CASE 1: CHECK HAR - Must be checked first because the har is created
+ // after the parity file, and returning the parity file could result in
+ // an error while reading it.
+ Path outPath = getOriginalParityFile(destPathPrefix, srcPath);
String harDirName = srcParent.getName() + HAR_SUFFIX;
Path HarPath = new Path(outDir,harDirName);
- Path outPath = getOriginalParityFile(destPathPrefix, srcPath);
-
- if (!fsDest.exists(HarPath)) { // case 1: no HAR file
- return new ParityFilePair(outPath,fsDest);
+ if (fsDest.exists(HarPath)) {
+ URI HarPathUri = HarPath.toUri();
+ Path inHarPath = new Path("har://",HarPathUri.getPath()+"/"+outPath.toUri().getPath());
+ FileSystem fsHar = new HarFileSystem(fsDest);
+ fsHar.initialize(inHarPath.toUri(), conf);
+ if (fsHar.exists(inHarPath)) {
+ FileStatus inHar = fsHar.getFileStatus(inHarPath);
+ if (inHar.getModificationTime() == srcStatus.getModificationTime()) {
+ return new ParityFilePair(inHarPath,fsHar);
+ }
+ }
+ }
+
+ //CASE 2: CHECK PARITY
+ try {
+ FileStatus outHar = fsDest.getFileStatus(outPath);
+ if (outHar.getModificationTime() == srcStatus.getModificationTime()) {
+ return new ParityFilePair(outPath,fsDest);
+ }
+ } catch (java.io.FileNotFoundException e) {
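+ // Parity file does not exist outside a har; fall through to return null.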
}
- URI HarPathUri = HarPath.toUri();
- Path inHarPath = new Path("har://",HarPathUri.getPath()+"/"+outPath.toUri().getPath());
- FileSystem fsHar = new HarFileSystem(fsDest);
- fsHar.initialize(inHarPath.toUri(), conf);
-
- if (!fsHar.exists(inHarPath)) { // case 2: no file inside HAR
- return new ParityFilePair(outPath,fsDest);
- }
-
- if (! fsDest.exists(outPath)) { // case 3: only inside HAR
- return new ParityFilePair(inHarPath,fsHar);
- }
-
- // both inside and outside HAR. Should return most recent
- FileStatus inHar = fsHar.getFileStatus(inHarPath);
- FileStatus outHar = fsDest.getFileStatus(outPath);
-
- if (inHar.getModificationTime() >= outHar.getModificationTime()) {
- return new ParityFilePair(inHarPath,fsHar);
- }
-
- return new ParityFilePair(outPath,fsDest);
+ return null; // NULL if no parity file
}
private ParityFilePair getParityFile(Path destPathPrefix, Path srcPath) throws IOException {
@@ -558,108 +605,6 @@
}
- /**
- * Returns a list of pathnames that needs raiding.
- */
- List<FileStatus> selectFiles(Configuration conf, Path p, String destPrefix,
- long modTimePeriod, short srcReplication, long now) throws IOException {
-
- List<FileStatus> returnSet = new LinkedList<FileStatus>();
-
- // expand destination prefix path
- Path destp = new Path(destPrefix.trim());
- FileSystem fs = FileSystem.get(destp.toUri(), conf);
- destp = destp.makeQualified(fs);
-
- // Expand destination prefix path.
- String destpstr = destp.toString();
- if (!destpstr.endsWith(Path.SEPARATOR)) {
- destpstr += Path.SEPARATOR;
- }
-
- fs = p.getFileSystem(conf);
- FileStatus[] gpaths = fs.globStatus(p);
- if (gpaths != null) {
- for (FileStatus onepath: gpaths) {
- String pathstr = onepath.getPath().makeQualified(fs).toString();
- if (!pathstr.endsWith(Path.SEPARATOR)) {
- pathstr += Path.SEPARATOR;
- }
- if (pathstr.startsWith(destpstr) || destpstr.startsWith(pathstr)) {
- LOG.info("Skipping source " + pathstr +
- " because it conflicts with raid directory " + destpstr);
- } else {
- recurse(fs, conf, destp, onepath, returnSet, modTimePeriod, srcReplication, now);
- }
- }
- }
- return returnSet;
- }
-
- /**
- * Pick files that need to be RAIDed.
- */
- private void recurse(FileSystem srcFs,
- Configuration conf,
- Path destPathPrefix,
- FileStatus src,
- List<FileStatus> accept,
- long modTimePeriod,
- short srcReplication,
- long now) throws IOException {
- Path path = src.getPath();
- FileStatus[] files = null;
- try {
- files = srcFs.listStatus(path);
- } catch (java.io.FileNotFoundException e) {
- // ignore error because the file could have been deleted by an user
- LOG.info("FileNotFound " + path + " " + StringUtils.stringifyException(e));
- } catch (IOException e) {
- throw e;
- }
-
- // If the modTime of the raid file is later than the modtime of the
- // src file and the src file has not been modified
- // recently, then that file is a candidate for RAID.
-
- if (src.isFile()) {
-
- // if the source file has fewer than or equal to 2 blocks, then no need to RAID
- long blockSize = src.getBlockSize();
- if (2 * blockSize >= src.getLen()) {
- return;
- }
-
- // check if destination path already exists. If it does and it's modification time
- // does not match the modTime of the source file, then recalculate RAID
- boolean add = false;
- try {
- ParityFilePair ppair = getParityFile(destPathPrefix, path);
- Path outpath = ppair.getPath();
- FileSystem outFs = ppair.getFileSystem();
- FileStatus ostat = outFs.getFileStatus(outpath);
- if (ostat.getModificationTime() != src.getModificationTime() &&
- src.getModificationTime() + modTimePeriod < now) {
- add = true;
- }
- } catch (java.io.FileNotFoundException e) {
- add = true; // destination file does not exist
- }
-
- if (add) {
- accept.add(src);
- }
- return;
-
- } else if (files != null) {
- for (FileStatus one:files) {
- if (!one.getPath().getName().endsWith(HAR_SUFFIX)){
- recurse(srcFs, conf, destPathPrefix, one, accept, modTimePeriod, srcReplication, now);
- }
- }
- }
- }
-
/**
* RAID a list of files.
@@ -668,8 +613,8 @@
throws IOException {
int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
- int stripeLength = getStripeLength(conf, info);
- String destPrefix = getDestinationPath(conf, info);
+ int stripeLength = getStripeLength(conf);
+ Path destPref = getDestinationPath(conf);
String simulate = info.getProperty("simulate");
boolean doSimulate = simulate == null ? false : Boolean
.parseBoolean(simulate);
@@ -677,13 +622,9 @@
Statistics statistics = new Statistics();
int count = 0;
- Path p = new Path(destPrefix.trim());
- FileSystem fs = FileSystem.get(p.toUri(), conf);
- p = p.makeQualified(fs);
-
for (FileStatus s : paths) {
- doRaid(conf, s, p, statistics, null, doSimulate, targetRepl, metaRepl,
- stripeLength);
+ doRaid(conf, s, destPref, statistics, Reporter.NULL, doSimulate, targetRepl,
+ metaRepl, stripeLength);
if (count % 1000 == 0) {
LOG.info("RAID statistics " + statistics.toString());
}
@@ -701,23 +642,16 @@
FileStatus src, Statistics statistics, Reporter reporter) throws IOException {
int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
- int stripeLength = getStripeLength(conf, info);
- String destPrefix = getDestinationPath(conf, info);
+ int stripeLength = getStripeLength(conf);
+ Path destPref = getDestinationPath(conf);
String simulate = info.getProperty("simulate");
boolean doSimulate = simulate == null ? false : Boolean
.parseBoolean(simulate);
- int count = 0;
-
- Path p = new Path(destPrefix.trim());
- FileSystem fs = FileSystem.get(p.toUri(), conf);
- p = p.makeQualified(fs);
-
- doRaid(conf, src, p, statistics, reporter, doSimulate, targetRepl, metaRepl,
- stripeLength);
+ doRaid(conf, src, destPref, statistics, reporter, doSimulate,
+ targetRepl, metaRepl, stripeLength);
}
-
-
+
/**
* RAID an individual file
*/
@@ -784,25 +718,11 @@
Path destPathPrefix, BlockLocation[] locations,
int metaRepl, int stripeLength) throws IOException {
- // two buffers for generating parity
- Random rand = new Random();
- int bufSize = 5 * 1024 * 1024; // 5 MB
- byte[] bufs = new byte[bufSize];
- byte[] xor = new byte[bufSize];
-
Path inpath = stat.getPath();
- long blockSize = stat.getBlockSize();
- long fileSize = stat.getLen();
-
- // create output tmp path
Path outpath = getOriginalParityFile(destPathPrefix, inpath);
FileSystem outFs = outpath.getFileSystem(conf);
-
- Path tmppath = new Path(conf.get("fs.raid.tmpdir", "/tmp/raid") +
- outpath.toUri().getPath() + "." +
- rand.nextLong() + ".tmp");
- // if the parity file is already upto-date, then nothing to do
+ // If the parity file is already up-to-date, there is nothing to do.
try {
FileStatus stmp = outFs.getFileStatus(outpath);
if (stmp.getModificationTime() == stat.getModificationTime()) {
@@ -812,66 +732,11 @@
}
} catch (IOException e) {
// ignore errors because the raid file might not exist yet.
- }
-
- LOG.info("Parity file for " + inpath + "(" + locations.length + ") is " + outpath);
- FSDataOutputStream out = outFs.create(tmppath,
- true,
- conf.getInt("io.file.buffer.size", 64 * 1024),
- (short)metaRepl,
- blockSize);
-
- try {
-
- // loop once for every stripe length
- for (int startBlock = 0; startBlock < locations.length;) {
-
- // report progress to Map-reduce framework
- if (reporter != null) {
- reporter.progress();
- }
- int blocksLeft = locations.length - startBlock;
- int stripe = Math.min(stripeLength, blocksLeft);
- LOG.info(" startBlock " + startBlock + " stripe " + stripe);
-
- // open a new file descriptor for each block in this stripe.
- // make each fd point to the beginning of each block in this stripe.
- FSDataInputStream[] ins = new FSDataInputStream[stripe];
- for (int i = 0; i < stripe; i++) {
- ins[i] = inFs.open(inpath, bufSize);
- ins[i].seek(blockSize * (startBlock + i));
- }
-
- generateParity(ins,out,blockSize,bufs,xor, reporter);
-
- // close input file handles
- for (int i = 0; i < ins.length; i++) {
- ins[i].close();
- }
-
- // increment startBlock to point to the first block to be processed
- // in the next iteration
- startBlock += stripe;
- }
- out.close();
- out = null;
-
- // delete destination if exists
- if (outFs.exists(outpath)){
- outFs.delete(outpath, false);
- }
- // rename tmppath to the real parity filename
- outFs.mkdirs(outpath.getParent());
- if (!outFs.rename(tmppath, outpath)) {
- String msg = "Unable to rename tmp file " + tmppath + " to " + outpath;
- LOG.warn(msg);
- throw new IOException (msg);
- }
- } finally {
- // remove the tmp file if it still exists
- outFs.delete(tmppath, false);
}
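+ // Parity generation is now delegated to the pluggable XOR encoder.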
+ XOREncoder encoder = new XOREncoder(conf, stripeLength);
+ encoder.encodeFile(inFs, inpath, outpath, (short)metaRepl, reporter);
+
// set the modification time of the RAID file. This is done so that the modTime of the
// RAID file reflects that contents of the source file that it has RAIDed. This should
// also work for files that are being appended to. This is necessary because the time on
@@ -880,255 +745,39 @@
outFs.setTimes(outpath, stat.getModificationTime(), -1);
FileStatus outstat = outFs.getFileStatus(outpath);
- LOG.info("Source file " + inpath + " of size " + fileSize +
+ FileStatus inStat = inFs.getFileStatus(inpath);
+ LOG.info("Source file " + inpath + " of size " + inStat.getLen() +
" Parity file " + outpath + " of size " + outstat.getLen() +
" src mtime " + stat.getModificationTime() +
" parity mtime " + outstat.getModificationTime());
}
- private static int readInputUntilEnd(FSDataInputStream ins, byte[] bufs, int toRead)
- throws IOException {
-
- int tread = 0;
-
- while (tread < toRead) {
- int read = ins.read(bufs, tread, toRead - tread);
- if (read == -1) {
- return tread;
- } else {
- tread += read;
- }
- }
-
- return tread;
- }
-
- private static void generateParity(FSDataInputStream[] ins, FSDataOutputStream fout,
- long parityBlockSize, byte[] bufs, byte[] xor, Reporter reporter) throws IOException {
-
- int bufSize;
- if ((bufs == null) || (bufs.length == 0)){
- bufSize = 5 * 1024 * 1024; // 5 MB
- bufs = new byte[bufSize];
- } else {
- bufSize = bufs.length;
- }
- if ((xor == null) || (xor.length != bufs.length)){
- xor = new byte[bufSize];
- }
-
- int xorlen = 0;
-
- // this loop processes all good blocks in selected stripe
- long remaining = parityBlockSize;
-
- while (remaining > 0) {
- int toRead = (int)Math.min(remaining, bufSize);
-
- if (ins.length > 0) {
- xorlen = readInputUntilEnd(ins[0], xor, toRead);
- }
-
- // read all remaining blocks and xor them into the buffer
- for (int i = 1; i < ins.length; i++) {
-
- // report progress to Map-reduce framework
- if (reporter != null) {
- reporter.progress();
- }
-
- int actualRead = readInputUntilEnd(ins[i], bufs, toRead);
-
- int j;
- int xorlimit = (int) Math.min(xorlen,actualRead);
- for (j = 0; j < xorlimit; j++) {
- xor[j] ^= bufs[j];
- }
- if ( actualRead > xorlen ){
- for (; j < actualRead; j++) {
- xor[j] = bufs[j];
- }
- xorlen = actualRead;
- }
-
- }
-
- if (xorlen < toRead) {
- Arrays.fill(bufs, xorlen, toRead, (byte) 0);
- }
-
- // write this to the tmp file
- fout.write(xor, 0, toRead);
- remaining -= toRead;
- }
-
- }
-
/**
- * Extract a good block from the parity block. This assumes that the corruption
- * is in the main file and the parity file is always good.
+ * Extract a good block from the parity block. This assumes that the
+ * corruption is in the main file and the parity file is always good.
*/
- public static Path unRaid(Configuration conf, Path srcPath, Path destPathPrefix,
- int stripeLength, long corruptOffset) throws IOException {
+ public static Path unRaid(Configuration conf, Path srcPath,
+ Path destPathPrefix, Decoder decoder, int stripeLength,
+ long corruptOffset) throws IOException {
- // extract block locations, size etc from source file
- Random rand = new Random();
- FileSystem srcFs = srcPath.getFileSystem(conf);
- FileStatus srcStat = srcFs.getFileStatus(srcPath);
- long blockSize = srcStat.getBlockSize();
- long fileSize = srcStat.getLen();
-
- // find the stripe number where the corrupted offset lies
- long snum = corruptOffset / (stripeLength * blockSize);
- long startOffset = snum * stripeLength * blockSize;
- long corruptBlockInStripe = (corruptOffset - startOffset)/blockSize;
- long corruptBlockSize = Math.min(blockSize, fileSize - startOffset);
-
- LOG.info("Start offset of relevent stripe = " + startOffset +
- " corruptBlockInStripe " + corruptBlockInStripe);
-
- // open file descriptors to read all good blocks of the file
- FSDataInputStream[] instmp = new FSDataInputStream[stripeLength];
- int numLength = 0;
- for (int i = 0; i < stripeLength; i++) {
- if (i == corruptBlockInStripe) {
- continue; // do not open corrupt block
- }
- if (startOffset + i * blockSize >= fileSize) {
- LOG.info("Stop offset of relevent stripe = " +
- startOffset + i * blockSize);
- break;
- }
- instmp[numLength] = srcFs.open(srcPath);
- instmp[numLength].seek(startOffset + i * blockSize);
- numLength++;
+ // Test if parity file exists
+ ParityFilePair ppair = getParityFile(destPathPrefix, srcPath, conf);
+ if (ppair == null) {
+ return null;
}
- // create array of inputstream, allocate one extra slot for
- // parity file. numLength could be smaller than stripeLength
- // if we are processing the last partial stripe on a file.
- numLength += 1;
- FSDataInputStream[] ins = new FSDataInputStream[numLength];
- for (int i = 0; i < numLength-1; i++) {
- ins[i] = instmp[i];
- }
- LOG.info("Decompose a total of " + numLength + " blocks.");
-
- // open and seek to the appropriate offset in parity file.
- ParityFilePair ppair = getParityFile(destPathPrefix, srcPath, conf);
- Path parityFile = ppair.getPath();
- FileSystem parityFs = ppair.getFileSystem();
- LOG.info("Parity file for " + srcPath + " is " + parityFile);
- ins[numLength-1] = parityFs.open(parityFile);
- ins[numLength-1].seek(snum * blockSize);
- LOG.info("Parity file " + parityFile +
- " seeking to relevent block at offset " +
- ins[numLength-1].getPos());
-
- // create a temporary filename in the source filesystem
- // do not overwrite an existing tmp file. Make it fail for now.
- // We need to generate a unique name for this tmp file later on.
- Path tmpFile = null;
- FSDataOutputStream fout = null;
- FileSystem destFs = destPathPrefix.getFileSystem(conf);
- int retry = 5;
- try {
- tmpFile = new Path(conf.get("fs.raid.tmpdir", "/tmp/raid") + "/" +
- rand.nextInt());
- fout = destFs.create(tmpFile, false);
- } catch (IOException e) {
- if (retry-- <= 0) {
- LOG.info("Unable to create temporary file " + tmpFile +
- " Aborting....");
- throw e;
- }
- LOG.info("Unable to create temporary file " + tmpFile +
- "Retrying....");
- }
- LOG.info("Created recovered block file " + tmpFile);
-
- // buffers for generating parity bits
- int bufSize = 5 * 1024 * 1024; // 5 MB
- byte[] bufs = new byte[bufSize];
- byte[] xor = new byte[bufSize];
-
- generateParity(ins,fout,corruptBlockSize,bufs,xor,null);
-
- // close all files
- fout.close();
- for (int i = 0; i < ins.length; i++) {
- ins[i].close();
- }
-
- // Now, reopen the source file and the recovered block file
- // and copy all relevant data to new file
final Path recoveryDestination =
new Path(conf.get("fs.raid.tmpdir", "/tmp/raid"));
+ FileSystem destFs = recoveryDestination.getFileSystem(conf);
final Path recoveredPrefix =
destFs.makeQualified(new Path(recoveryDestination, makeRelative(srcPath)));
final Path recoveredPath =
- new Path(recoveredPrefix + "." + rand.nextLong() + ".recovered");
+ new Path(recoveredPrefix + "." + new Random().nextLong() + ".recovered");
LOG.info("Creating recovered file " + recoveredPath);
- FSDataInputStream sin = srcFs.open(srcPath);
- FSDataOutputStream out = destFs.create(recoveredPath, false,
- conf.getInt("io.file.buffer.size", 64 * 1024),
- srcStat.getReplication(),
- srcStat.getBlockSize());
-
- FSDataInputStream bin = destFs.open(tmpFile);
- long recoveredSize = 0;
-
- // copy all the good blocks (upto the corruption)
- // from source file to output file
- long remaining = corruptOffset / blockSize * blockSize;
- while (remaining > 0) {
- int toRead = (int)Math.min(remaining, bufSize);
- sin.readFully(bufs, 0, toRead);
- out.write(bufs, 0, toRead);
- remaining -= toRead;
- recoveredSize += toRead;
- }
- LOG.info("Copied upto " + recoveredSize + " from src file. ");
-
- // copy recovered block to output file
- remaining = corruptBlockSize;
- while (recoveredSize < fileSize &&
- remaining > 0) {
- int toRead = (int)Math.min(remaining, bufSize);
- bin.readFully(bufs, 0, toRead);
- out.write(bufs, 0, toRead);
- remaining -= toRead;
- recoveredSize += toRead;
- }
- LOG.info("Copied upto " + recoveredSize + " from recovered-block file. ");
-
- // skip bad block in src file
- if (recoveredSize < fileSize) {
- sin.seek(sin.getPos() + corruptBlockSize);
- }
-
- // copy remaining good data from src file to output file
- while (recoveredSize < fileSize) {
- int toRead = (int)Math.min(fileSize - recoveredSize, bufSize);
- sin.readFully(bufs, 0, toRead);
- out.write(bufs, 0, toRead);
- recoveredSize += toRead;
- }
- out.close();
- LOG.info("Completed writing " + recoveredSize + " bytes into " +
- recoveredPath);
-
- sin.close();
- bin.close();
-
- // delete the temporary block file that was created.
- destFs.delete(tmpFile, false);
- LOG.info("Deleted temporary file " + tmpFile);
-
- // copy the meta information from source path to the newly created
- // recovered path
- copyMetaInformation(destFs, srcStat, recoveredPath);
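+ // The decoder reads the parity data and the surviving source blocks and
+ // writes a recovered copy of the file at recoveredPath.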
+ FileSystem srcFs = srcPath.getFileSystem(conf);
+ decoder.decodeFile(srcFs, srcPath, ppair.getFileSystem(),
+ ppair.getPath(), corruptOffset, recoveredPath);
return recoveredPath;
}
@@ -1179,35 +828,22 @@
PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
Arrays.sort(sorted, lexi);
- // paths we have processed so far
- Set<Path> processed = new HashSet<Path>();
-
for (PolicyList category : sorted) {
for (PolicyInfo info: category.getAll()) {
try {
// expand destination prefix path
- String destinationPrefix = getDestinationPath(conf, info);
- Path destPref = new Path(destinationPrefix.trim());
- FileSystem destFs = FileSystem.get(destPref.toUri(), conf);
- destPref = destFs.makeQualified(destPref);
+ Path destPref = getDestinationPath(conf);
+ FileSystem destFs = destPref.getFileSystem(conf);
//get srcPaths
Path[] srcPaths = info.getSrcPathExpanded();
- if ( srcPaths != null ){
+ if (srcPaths != null) {
for (Path srcPath: srcPaths) {
// expand destination prefix
Path destPath = getOriginalParityFile(destPref, srcPath);
- // if this destination path has already been processed as part
- // of another policy, then nothing more to do
- if (processed.contains(destPath)) {
- LOG.info("Obsolete parity files for policy " +
- info.getName() + " has already been procesed.");
- continue;
- }
-
FileSystem srcFs = info.getSrcPath().getFileSystem(conf);
FileStatus stat = null;
try {
@@ -1221,12 +857,8 @@
recursePurge(srcFs, destFs, destPref.toUri().getPath(), stat);
}
- // this destination path has already been processed
- processed.add(destPath);
-
}
}
-
} catch (Exception e) {
LOG.warn("Ignoring Exception while processing policy " +
info.getName() + " " +
@@ -1342,10 +974,8 @@
try {
long cutoff = now() - ( Long.parseLong(str) * 24L * 3600000L );
- String destinationPrefix = getDestinationPath(conf, info);
- Path destPref = new Path(destinationPrefix.trim());
+ Path destPref = getDestinationPath(conf);
FileSystem destFs = destPref.getFileSystem(conf);
- destPref = destFs.makeQualified(destPref);
//get srcPaths
Path[] srcPaths = info.getSrcPathExpanded();
@@ -1407,7 +1037,11 @@
recurseHar(info, destFs, one, destPrefix, srcFs, cutoff, tmpHarPath);
shouldHar = false;
} else if (one.getModificationTime() > cutoff ) {
- shouldHar = false;
+ if (shouldHar) {
+ LOG.info("Cannot archive " + destPath +
+ " because " + one.getPath() + " was modified after cutoff");
+ shouldHar = false;
+ }
}
}
@@ -1433,6 +1067,7 @@
}
if ( shouldHar ) {
+ LOG.info("Archiving " + dest.getPath() + " to " + tmpHarPath );
singleHar(destFs, dest, tmpHarPath);
}
}
@@ -1493,56 +1128,25 @@
LOG.info("Leaving Har thread.");
}
-
- }
-
- /**
- * If the config file has an entry for hdfs.raid.locations, then that overrides
- * destination path specified in the raid policy file
- */
- static private String getDestinationPath(Configuration conf, PolicyInfo info) {
- String locs = conf.get("hdfs.raid.locations");
- if (locs != null) {
- return locs;
- }
- locs = info.getDestinationPath();
- if (locs == null) {
- return DEFAULT_RAID_LOCATION;
- }
- return locs;
}
/**
- * If the config file has an entry for hdfs.raid.stripeLength, then use that
- * specified in the raid policy file
+ * Return the path prefix that stores the parity files
*/
- static private int getStripeLength(Configuration conf, PolicyInfo info)
- throws IOException {
- int len = conf.getInt("hdfs.raid.stripeLength", 0);
- if (len != 0) {
- return len;
- }
- String str = info.getProperty("stripeLength");
- if (str == null) {
- String msg = "hdfs.raid.stripeLength is not defined." +
- " Using a default " + DEFAULT_STRIPE_LENGTH;
- LOG.info(msg);
- return DEFAULT_STRIPE_LENGTH;
- }
- return Integer.parseInt(str);
+ static Path getDestinationPath(Configuration conf)
+ throws IOException {
+ String loc = conf.get(RAID_LOCATION_KEY, DEFAULT_RAID_LOCATION);
+ Path p = new Path(loc.trim());
+ FileSystem fs = FileSystem.get(p.toUri(), conf);
+ p = p.makeQualified(fs);
+ return p;
}
/**
- * Copy the file owner, modtime, etc from srcPath to the recovered Path.
- * It is possiible that we might have to retrieve file persmissions,
- * quotas, etc too in future.
+ * Obtain stripe length from configuration
*/
- static private void copyMetaInformation(FileSystem fs, FileStatus stat,
- Path recoveredPath)
- throws IOException {
- fs.setOwner(recoveredPath, stat.getOwner(), stat.getGroup());
- fs.setPermission(recoveredPath, stat.getPermission());
- fs.setTimes(recoveredPath, stat.getModificationTime(), stat.getAccessTime());
+ public static int getStripeLength(Configuration conf) {
+ return conf.getInt(STRIPE_LENGTH_KEY, DEFAULT_STRIPE_LENGTH);
}
/**
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
new file mode 100644
index 0000000..6818168
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.IOUtils;
+
+public class RaidUtils {
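+ /**
+ * Fills buf completely from the stream. If EOF is reached first, the
+ * remainder is zero-filled when eofOK is true; otherwise an IOException
+ * ("Premature EOF") is thrown.
+ */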
+ public static void readTillEnd(InputStream in, byte[] buf, boolean eofOK)
+ throws IOException {
+ int toRead = buf.length;
+ int numRead = 0;
+ while (numRead < toRead) {
+ int nread = in.read(buf, numRead, toRead - numRead);
+ if (nread < 0) {
+ if (eofOK) {
+ // EOF hit, fill with zeros
+ Arrays.fill(buf, numRead, toRead, (byte)0);
+ numRead = toRead;
+ } else {
+ // EOF hit, throw.
+ throw new IOException("Premature EOF");
+ }
+ } else {
+ numRead += nread;
+ }
+ }
+ }
+
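+ /**
+ * Copies exactly count bytes from in to out, reading through buf, and
+ * throws if the input ends before count bytes have been read.
+ */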
+ public static void copyBytes(
+ InputStream in, OutputStream out, byte[] buf, long count)
+ throws IOException {
+ for (long bytesRead = 0; bytesRead < count; ) {
+ int toRead = Math.min(buf.length, (int)(count - bytesRead));
+ IOUtils.readFully(in, buf, 0, toRead);
+ bytesRead += toRead;
+ out.write(buf, 0, toRead);
+ }
+ }
+
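+ /**
+ * A stream that supplies endOffset bytes of zeros and supports seek and
+ * positioned reads, so it can stand in for block data that is all zeros.
+ */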
+ public static class ZeroInputStream extends InputStream
+ implements Seekable, PositionedReadable {
+ private long endOffset;
+ private long pos;
+
+ public ZeroInputStream(long endOffset) {
+ this.endOffset = endOffset;
+ this.pos = 0;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (pos < endOffset) {
+ pos++;
+ return 0;
+ }
+ return -1;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return (int)(endOffset - pos);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return pos;
+ }
+
+ @Override
+ public void seek(long seekOffset) throws IOException {
+ if (seekOffset < endOffset) {
+ pos = seekOffset;
+ } else {
+ throw new IOException("Illegal Offset" + pos);
+ }
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+
+ @Override
+ public int read(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ int count = 0;
+ for (; position < endOffset && count < length; position++) {
+ buffer[offset + count] = 0;
+ count++;
+ }
+ return count;
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ int count = 0;
+ for (; position < endOffset && count < length; position++) {
+ buffer[offset + count] = 0;
+ count++;
+ }
+ if (count < length) {
+ throw new IOException("Premature EOF");
+ }
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer) throws IOException {
+ readFully(position, buffer, 0, buffer.length);
+ }
+ }
+}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java
new file mode 100644
index 0000000..3dfe592
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+public class XORDecoder extends Decoder {
+ public static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.XORDecoder");
+
+ public XORDecoder(
+ Configuration conf, int stripeSize) {
+ super(conf, stripeSize, 1);
+ }
+
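+ /**
+ * Reconstructs the erased block by combining the stripe's parity block
+ * with every surviving source block in the stripe, then streams the
+ * recovered bytes (after skipping bytesToSkip, up to limit) into out.
+ */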
+ @Override
+ protected void fixErasedBlock(
+ FileSystem fs, Path srcFile, FileSystem parityFs, Path parityFile,
+ long blockSize, long errorOffset, long bytesToSkip, long limit,
+ OutputStream out) throws IOException {
+ LOG.info("Fixing block at " + srcFile + ":" + errorOffset +
+ ", skipping " + bytesToSkip + ", limit " + limit);
+ FileStatus srcStat = fs.getFileStatus(srcFile);
+ ArrayList<FSDataInputStream> xorinputs = new ArrayList<FSDataInputStream>();
+
+ FSDataInputStream parityFileIn = parityFs.open(parityFile);
+ parityFileIn.seek(parityOffset(errorOffset, blockSize));
+ xorinputs.add(parityFileIn);
+
+ long errorBlockOffset = (errorOffset / blockSize) * blockSize;
+ long[] srcOffsets = stripeOffsets(errorOffset, blockSize);
+ for (int i = 0; i < srcOffsets.length; i++) {
+ if (srcOffsets[i] == errorBlockOffset) {
+ LOG.info("Skipping block at " + srcFile + ":" + errorBlockOffset);
+ continue;
+ }
+ if (srcOffsets[i] < srcStat.getLen()) {
+ FSDataInputStream in = fs.open(srcFile);
+ in.seek(srcOffsets[i]);
+ xorinputs.add(in);
+ }
+ }
+ FSDataInputStream[] inputs = xorinputs.toArray(
+ new FSDataInputStream[xorinputs.size()]);
+ ParityInputStream recovered =
+ new ParityInputStream(inputs, limit, readBufs[0], writeBufs[0]);
+ recovered.skip(bytesToSkip);
+ recovered.drain(out, null);
+ }
+
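+ // Start offsets of all the blocks in the stripe containing errorOffset.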
+ protected long[] stripeOffsets(long errorOffset, long blockSize) {
+ long[] offsets = new long[stripeSize];
+ long stripeIdx = errorOffset / (blockSize * stripeSize);
+ long startOffsetOfStripe = stripeIdx * stripeSize * blockSize;
+ for (int i = 0; i < stripeSize; i++) {
+ offsets[i] = startOffsetOfStripe + i * blockSize;
+ }
+ return offsets;
+ }
+
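+ // Each stripe maps to one parity block; return its offset in the parity file.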
+ protected long parityOffset(long errorOffset, long blockSize) {
+ long stripeIdx = errorOffset / (blockSize * stripeSize);
+ return stripeIdx * blockSize;
+ }
+
+}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java
new file mode 100644
index 0000000..3d113e6
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+public class XOREncoder extends Encoder {
+ public static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.XOREncoder");
+ public XOREncoder(
+ Configuration conf, int stripeSize) {
+ super(conf, stripeSize, 1);
+ }
+
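+ /**
+ * XOR needs only a single parity stream: the ParityInputStream combines
+ * the source block streams and the result is drained into outs[0].
+ */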
+ @Override
+ protected void encodeStripe(
+ InputStream[] blocks,
+ long stripeStartOffset,
+ long blockSize,
+ OutputStream[] outs,
+ Progressable reporter) throws IOException {
+ LOG.info("Peforming XOR ");
+ ParityInputStream parityIn =
+ new ParityInputStream(blocks, blockSize, readBufs[0], writeBufs[0]);
+ try {
+ parityIn.drain(outs[0], reporter);
+ } finally {
+ parityIn.close();
+ }
+ }
+}
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java b/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
index f6e3aa6..3da2f15 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
@@ -47,11 +47,14 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.raid.protocol.PolicyInfo;
public class TestRaidDfs extends TestCase {
final static String TEST_DIR = new File(System.getProperty("test.build.data",
@@ -59,8 +62,7 @@
final static String CONFIG_FILE = new File(TEST_DIR,
"test-raid.xml").getAbsolutePath();
final static long RELOAD_INTERVAL = 1000;
- final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidNode");
- final Random rand = new Random();
+ final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidDfs");
final static int NUM_DATANODES = 3;
Configuration conf;
@@ -83,6 +85,9 @@
// scan all policies once every 5 second
conf.setLong("raid.policy.rescan.interval", 5000);
+ // make all deletions not go through Trash
+ conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
+
// do not use map-reduce cluster for Raiding
conf.setBoolean("fs.raidnode.local", true);
conf.set("raid.server.address", "localhost:0");
@@ -133,80 +138,148 @@
if (cnode != null) { cnode.stop(); cnode.join(); }
if (dfs != null) { dfs.shutdown(); }
}
+
+ private LocatedBlocks getBlockLocations(Path file, long length)
+ throws IOException {
+ DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
+ return dfs.getClient().namenode.getBlockLocations(file.toString(), 0, length);
+ }
+
+ private LocatedBlocks getBlockLocations(Path file)
+ throws IOException {
+ FileStatus stat = fileSys.getFileStatus(file);
+ return getBlockLocations(file, stat.getLen());
+ }
+
+ private DistributedRaidFileSystem getRaidFS() throws IOException {
+ DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
+ Configuration clientConf = new Configuration(conf);
+ clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
+ clientConf.set("fs.raid.underlyingfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+ clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+ URI dfsUri = dfs.getUri();
+ return (DistributedRaidFileSystem)FileSystem.get(dfsUri, clientConf);
+ }
+
+ public static void waitForFileRaided(
+ Log logger, FileSystem fileSys, Path file, Path destPath)
+ throws IOException, InterruptedException {
+ FileStatus parityStat = null;
+ String fileName = file.getName().toString();
+ // wait till file is raided
+ while (parityStat == null) {
+ logger.info("Waiting for files to be raided.");
+ try {
+ FileStatus[] listPaths = fileSys.listStatus(destPath);
+ if (listPaths != null) {
+ for (FileStatus f : listPaths) {
+ logger.info("File raided so far : " + f.getPath());
+ String found = f.getPath().getName();
+ if (fileName.equals(found)) {
+ parityStat = f;
+ break;
+ }
+ }
+ }
+ } catch (FileNotFoundException e) {
+ //ignore
+ }
+ Thread.sleep(1000); // keep waiting
+ }
+
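+ // Wait until the source file is closed (no block under construction).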
+ while (true) {
+ LocatedBlocks locations = null;
+ DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
+ locations = dfs.getClient().namenode.getBlockLocations(
+ file.toString(), 0, parityStat.getLen());
+ if (!locations.isUnderConstruction()) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
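+ // Wait until raiding has lowered the file's replication to 1.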
+ while (true) {
+ FileStatus stat = fileSys.getFileStatus(file);
+ if (stat.getReplication() == 1) break;
+ Thread.sleep(1000);
+ }
+ }
+
+ private void corruptBlockAndValidate(Path srcFile, Path destPath,
+ int[] listBlockNumToCorrupt, long blockSize, int numBlocks)
+ throws IOException, InterruptedException {
+ int repl = 1;
+ long crc = createTestFilePartialLastBlock(fileSys, srcFile, repl,
+ numBlocks, blockSize);
+ long length = fileSys.getFileStatus(srcFile).getLen();
+
+ waitForFileRaided(LOG, fileSys, srcFile, destPath);
+
+ // Corrupt the specified blocks of the file.
+ for (int blockNumToCorrupt : listBlockNumToCorrupt) {
+ LOG.info("Corrupt block " + blockNumToCorrupt + " of file " + srcFile);
+ LocatedBlocks locations = getBlockLocations(srcFile);
+ corruptBlock(srcFile, locations.get(blockNumToCorrupt).getBlock(),
+ NUM_DATANODES, true);
+ }
+
+ // Validate
+ DistributedRaidFileSystem raidfs = getRaidFS();
+ assertTrue(validateFile(raidfs, srcFile, length, crc));
+ }
/**
- * Test DFS Raid
+ * Create a file, corrupt a block in it and ensure that the file can be
+ * read through DistributedRaidFileSystem.
*/
public void testRaidDfs() throws Exception {
LOG.info("Test testRaidDfs started.");
+
long blockSize = 8192L;
- int stripeLength = 3;
+ int numBlocks = 8;
+ int repl = 1;
mySetup();
- Path file1 = new Path("/user/dhruba/raidtest/file1");
+
+ // Create an instance of the RaidNode
+ Configuration localConf = new Configuration(conf);
+ localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+ cnode = RaidNode.createRaidNode(null, localConf);
+
+ Path file = new Path("/user/dhruba/raidtest/file");
Path destPath = new Path("/destraid/user/dhruba/raidtest");
- long crc1 = createOldFile(fileSys, file1, 1, 7, blockSize);
- LOG.info("Test testPathFilter created test files");
-
- // create an instance of the RaidNode
- cnode = RaidNode.createRaidNode(null, conf);
-
+ int[][] corrupt = {{0}, {4}, {7}}; // first, middle and last blocks
try {
- FileStatus[] listPaths = null;
+ long crc = createTestFilePartialLastBlock(fileSys, file, repl,
+ numBlocks, blockSize);
+ long length = fileSys.getFileStatus(file).getLen();
+ waitForFileRaided(LOG, fileSys, file, destPath);
+ LocatedBlocks locations = getBlockLocations(file);
- // wait till file is raided
- while (listPaths == null || listPaths.length != 1) {
- LOG.info("Test testPathFilter waiting for files to be raided.");
- try {
- listPaths = fileSys.listStatus(destPath);
- } catch (FileNotFoundException e) {
- //ignore
- }
- Thread.sleep(1000); // keep waiting
+ for (int i = 0; i < corrupt.length; i++) {
+ int blockNumToCorrupt = corrupt[i][0];
+ LOG.info("Corrupt block " + blockNumToCorrupt + " of file");
+ corruptBlock(file, locations.get(blockNumToCorrupt).getBlock(),
+ NUM_DATANODES, false);
+ validateFile(getRaidFS(), file, length, crc);
}
- assertEquals(listPaths.length, 1); // all files raided
- LOG.info("Files raided so far : " + listPaths[0].getPath());
-
- // extract block locations from File system. Wait till file is closed.
- LocatedBlocks locations = null;
- DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
- while (true) {
- locations = dfs.getClient().getNamenode().getBlockLocations(file1.toString(),
- 0, listPaths[0].getLen());
- if (!locations.isUnderConstruction()) {
- break;
- }
- Thread.sleep(1000);
- }
-
- // filter all filesystem calls from client
- Configuration clientConf = new Configuration(conf);
- clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
- clientConf.set("fs.raid.underlyingfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
- URI dfsUri = dfs.getUri();
- FileSystem.closeAll();
- FileSystem raidfs = FileSystem.get(dfsUri, clientConf);
-
- assertTrue("raidfs not an instance of DistributedRaidFileSystem",raidfs instanceof DistributedRaidFileSystem);
-
- LOG.info("Corrupt first block of file");
- corruptBlock(file1, locations.get(0).getBlock(), NUM_DATANODES, false);
- validateFile(raidfs, file1, file1, crc1);
// Corrupt one more block. This is expected to fail.
- LOG.info("Corrupt second block of file");
- corruptBlock(file1, locations.get(1).getBlock(), NUM_DATANODES, false);
+ LOG.info("Corrupt one more block of file");
+ corruptBlock(file, locations.get(1).getBlock(), NUM_DATANODES, false);
try {
- validateFile(raidfs, file1, file1, crc1);
+ validateFile(getRaidFS(), file, length, crc);
fail("Expected exception ChecksumException not thrown!");
} catch (org.apache.hadoop.fs.ChecksumException e) {
}
} catch (Exception e) {
- LOG.info("testPathFilter Exception " + e + StringUtils.stringifyException(e));
+ LOG.info("testRaidDfs Exception " + e +
+ StringUtils.stringifyException(e));
throw e;
} finally {
+ if (cnode != null) { cnode.stop(); cnode.join(); }
myTearDown();
}
- LOG.info("Test testPathFilter completed.");
+ LOG.info("Test testRaidDfs completed.");
}
/**
@@ -217,7 +290,7 @@
try {
Path file = new Path("/user/raid/raidtest/file1");
- createOldFile(fileSys, file, 1, 7, 8192L);
+ createTestFile(fileSys, file, 1, 7, 8192L);
// filter all filesystem calls from client
Configuration clientConf = new Configuration(conf);
@@ -242,13 +315,15 @@
myTearDown();
}
}
-
+
//
// creates a file and populate it with random data. Returns its crc.
//
- private long createOldFile(FileSystem fileSys, Path name, int repl, int numBlocks, long blocksize)
+ public static long createTestFile(FileSystem fileSys, Path name, int repl,
+ int numBlocks, long blocksize)
throws IOException {
CRC32 crc = new CRC32();
+ Random rand = new Random();
FSDataOutputStream stm = fileSys.create(name, true,
fileSys.getConf().getInt("io.file.buffer.size", 4096),
(short)repl, blocksize);
@@ -264,19 +339,43 @@
}
//
+ // Creates a file with partially full last block. Populate it with random
+ // data. Returns its crc.
+ //
+ public static long createTestFilePartialLastBlock(
+ FileSystem fileSys, Path name, int repl, int numBlocks, long blocksize)
+ throws IOException {
+ CRC32 crc = new CRC32();
+ Random rand = new Random();
+ FSDataOutputStream stm = fileSys.create(name, true,
+ fileSys.getConf().getInt("io.file.buffer.size", 4096),
+ (short)repl, blocksize);
+ // Write whole blocks.
+ byte[] b = new byte[(int)blocksize];
+ for (int i = 1; i < numBlocks; i++) {
+ rand.nextBytes(b);
+ stm.write(b);
+ crc.update(b);
+ }
+ // Write partial block.
+ b = new byte[(int)blocksize/2 - 1];
+ rand.nextBytes(b);
+ stm.write(b);
+ crc.update(b);
+
+ stm.close();
+ return crc.getValue();
+ }
+ //
// validates that file matches the crc.
//
- private void validateFile(FileSystem fileSys, Path name1, Path name2, long crc)
+ public static boolean validateFile(FileSystem fileSys, Path name, long length,
+ long crc)
throws IOException {
- FileStatus stat1 = fileSys.getFileStatus(name1);
- FileStatus stat2 = fileSys.getFileStatus(name2);
- assertTrue(" Length of file " + name1 + " is " + stat1.getLen() +
- " is different from length of file " + name1 + " " + stat2.getLen(),
- stat1.getLen() == stat2.getLen());
-
+ long numRead = 0;
CRC32 newcrc = new CRC32();
- FSDataInputStream stm = fileSys.open(name2);
+ FSDataInputStream stm = fileSys.open(name);
final byte[] b = new byte[4192];
int num = 0;
while (num >= 0) {
@@ -284,19 +383,28 @@
if (num < 0) {
break;
}
+ numRead += num;
newcrc.update(b, 0, num);
}
stm.close();
+
+ if (numRead != length) {
+ LOG.info("Number of bytes read " + numRead +
+ " does not match file size " + length);
+ return false;
+ }
+
LOG.info(" Newcrc " + newcrc.getValue() + " old crc " + crc);
if (newcrc.getValue() != crc) {
- fail("CRC mismatch of files " + name1 + " with file " + name2);
+ LOG.info("CRC mismatch of file " + name + ": " + newcrc + " vs. " + crc);
}
+ return true;
}
/*
* The Data directories for a datanode
*/
- static private File[] getDataNodeDirs(int i) throws IOException {
+ private static File[] getDataNodeDirs(int i) throws IOException {
File base_dir = new File(System.getProperty("test.build.data"), "dfs/");
File data_dir = new File(base_dir, "data");
File dir1 = new File(data_dir, "data"+(2*i+1));
@@ -353,10 +461,32 @@
(numCorrupted + numDeleted) > 0);
}
- //
- // Corrupt specified block of file
- //
- void corruptBlock(Path file, Block blockNum) throws IOException {
- corruptBlock(file, blockNum, NUM_DATANODES, true);
+ public static void corruptBlock(Path file, Block blockNum,
+ int numDataNodes, long offset) throws IOException {
+ long id = blockNum.getBlockId();
+
+ // Deliberately corrupt each on-disk replica of the block by modifying
+ // an int at the given offset.
+ //
+ for (int i = 0; i < numDataNodes; i++) {
+ File[] dirs = getDataNodeDirs(i);
+
+ for (int j = 0; j < dirs.length; j++) {
+ File[] blocks = dirs[j].listFiles();
+ assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length >= 0));
+ for (int idx = 0; idx < blocks.length; idx++) {
+ if (blocks[idx].getName().startsWith("blk_" + id) &&
+ !blocks[idx].getName().endsWith(".meta")) {
+ // Corrupt
+ File f = blocks[idx];
+ RandomAccessFile raf = new RandomAccessFile(f, "rw");
+ raf.seek(offset);
+ int data = raf.readInt();
+ raf.seek(offset);
+ raf.writeInt(data+1);
+ LOG.info("Corrupted block " + blocks[idx]);
+ }
+ }
+ }
+ }
}
}
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
new file mode 100644
index 0000000..119cae6
--- /dev/null
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+public class TestDirectoryTraversal extends TestCase {
+ final static Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.TestDirectoryTraversal");
+ final static String TEST_DIR = new File(System.getProperty("test.build.data",
+ "build/contrib/raid/test/data")).getAbsolutePath();
+
+ MiniDFSCluster dfs = null;
+ FileSystem fs = null;
+ Configuration conf = null;
+
+ /**
+ * Test basic enumeration.
+ */
+ public void testEnumeration() throws IOException {
+ mySetup();
+
+ try {
+ Path topDir = new Path(TEST_DIR + "/testenumeration");
+
+ createTestTree(topDir);
+
+ LOG.info("Enumerating files");
+ List<FileStatus> startPaths = new LinkedList<FileStatus>();
+ startPaths.add(fs.getFileStatus(topDir));
+ DirectoryTraversal dt = new DirectoryTraversal(fs, startPaths);
+
+ List<FileStatus> selected = new LinkedList<FileStatus>();
+ while (true) {
+ FileStatus f = dt.getNextFile();
+ if (f == null) break;
+ assertEquals(false, f.isDir());
+ LOG.info(f.getPath());
+ selected.add(f);
+ }
+ assertEquals(5, selected.size());
+
+ LOG.info("Enumerating directories");
+ startPaths.clear();
+ startPaths.add(fs.getFileStatus(topDir));
+ dt = new DirectoryTraversal(fs, startPaths);
+ selected.clear();
+ while (true) {
+ FileStatus dir = dt.getNextDirectory();
+ if (dir == null) break;
+ assertEquals(true, dir.isDir());
+ LOG.info(dir.getPath());
+ selected.add(dir);
+ }
+ assertEquals(4, selected.size());
+ } finally {
+ myTearDown();
+ }
+ }
+
+ public void testSuspension() throws IOException {
+ mySetup();
+
+ try {
+ Path topDir = new Path(TEST_DIR + "/testenumeration");
+
+ createTestTree(topDir);
+
+ String top = topDir.toString();
+ List<FileStatus> startPaths = new LinkedList<FileStatus>();
+ startPaths.add(fs.getFileStatus(new Path(top + "/a")));
+ startPaths.add(fs.getFileStatus(new Path(top + "/b")));
+ DirectoryTraversal dt = new DirectoryTraversal(fs, startPaths);
+
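+ // selectFilesToRaid with a limit should suspend the traversal; a second
+ // call resumes it and returns the next batch of files.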
+ int limit = 2;
+ short targetRepl = 1;
+ Path raid = new Path("/raid");
+ List<FileStatus> selected = dt.selectFilesToRaid(conf, targetRepl, raid,
+ 0, limit);
+ for (FileStatus f: selected) {
+ LOG.info(f.getPath());
+ }
+ assertEquals(limit, selected.size());
+
+ selected = dt.selectFilesToRaid(conf, targetRepl, raid, 0, limit);
+ for (FileStatus f: selected) {
+ LOG.info(f.getPath());
+ }
+ assertEquals(limit, selected.size());
+ } finally {
+ myTearDown();
+ }
+ }
+
+ /**
+ * Creates a test directory tree.
+ * top
+ * / | \
+ * / | f5
+ * a b___
+ * / \ |\ \
+ * f1 f2 f3 f4 c
+ */
+ private void createTestTree(Path topDir) throws IOException {
+ String top = topDir.toString();
+ fs.delete(topDir, true);
+
+ fs.mkdirs(topDir);
+ fs.create(new Path(top + "/f5")).close();
+
+ fs.mkdirs(new Path(top + "/a"));
+ createTestFile(new Path(top + "/a/f1"));
+ createTestFile(new Path(top + "/a/f2"));
+
+ fs.mkdirs(new Path(top + "/b"));
+ fs.mkdirs(new Path(top + "/b/c"));
+ createTestFile(new Path(top + "/b/f3"));
+ createTestFile(new Path(top + "/b/f4"));
+ }
+
+ private void createTestFile(Path file) throws IOException {
+ long blockSize = 8192;
+ byte[] bytes = new byte[(int)blockSize];
+ FSDataOutputStream stm = fs.create(file, false, 4096, (short)1, blockSize);
+ stm.write(bytes);
+ stm.write(bytes);
+ stm.write(bytes);
+ stm.close();
+ FileStatus stat = fs.getFileStatus(file);
+ assertEquals(blockSize, stat.getBlockSize());
+ }
+
+ private void mySetup() throws IOException {
+ conf = new Configuration();
+ dfs = new MiniDFSCluster(conf, 6, true, null);
+ dfs.waitActive();
+ fs = dfs.getFileSystem();
+ }
+
+ private void myTearDown() {
+ if (dfs != null) { dfs.shutdown(); }
+ }
+}
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
index 01b9b95..09fe785 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
@@ -82,6 +82,7 @@
conf.setBoolean("fs.raidnode.local", local);
conf.set("raid.server.address", "localhost:0");
+ conf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
// create a dfs and map-reduce cluster
final int taskTrackers = 4;
@@ -101,12 +102,12 @@
/**
* create raid.xml file for RaidNode
*/
- private void mySetup(String srcPath, long targetReplication,
- long metaReplication, long stripeLength ) throws Exception {
+ private void mySetup(long targetReplication,
+ long metaReplication, long stripeLength) throws Exception {
FileWriter fileWriter = new FileWriter(CONFIG_FILE);
fileWriter.write("<?xml version=\"1.0\"?>\n");
String str = "<configuration> " +
- "<srcPath prefix=\"" + srcPath + "\"> " +
+ "<srcPath prefix=\"/user/test/raidtest\"> " +
"<policy name = \"RaidTest1\"> " +
"<destPath> /destraid</destPath> " +
"<property> " +
@@ -162,7 +163,6 @@
public void testRaidHar() throws Exception {
LOG.info("Test testRaidHar started.");
- String srcPaths [] = { "/user/test/raidtest", "/user/test/raid*" };
long blockSizes [] = {1024L};
long stripeLengths [] = {5};
long targetReplication = 1;
@@ -172,13 +172,11 @@
createClusters(true);
try {
- for (String srcPath : srcPaths) {
- for (long blockSize : blockSizes) {
- for (long stripeLength : stripeLengths) {
- doTestHar(iter, srcPath, targetReplication, metaReplication,
- stripeLength, blockSize, numBlock);
- iter++;
- }
+ for (long blockSize : blockSizes) {
+ for (long stripeLength : stripeLengths) {
+ doTestHar(iter, targetReplication, metaReplication,
+ stripeLength, blockSize, numBlock);
+ iter++;
}
}
} finally {
@@ -191,14 +189,14 @@
* Create parity file, delete original file and then validate that
* parity file is automatically deleted.
*/
- private void doTestHar(int iter, String srcPath, long targetReplication,
+ private void doTestHar(int iter, long targetReplication,
long metaReplication, long stripeLength,
long blockSize, int numBlock) throws Exception {
LOG.info("doTestHar started---------------------------:" + " iter " + iter +
" blockSize=" + blockSize + " stripeLength=" + stripeLength);
- mySetup(srcPath, targetReplication, metaReplication, stripeLength);
- RaidShell shell = null;
+ mySetup(targetReplication, metaReplication, stripeLength);
Path dir = new Path("/user/test/raidtest/subdir/");
+ Path file1 = new Path(dir + "/file" + iter);
RaidNode cnode = null;
try {
Path destPath = new Path("/destraid/user/test/raidtest/subdir");
@@ -211,21 +209,9 @@
LOG.info("doTestHar created test files for iteration " + iter);
// create an instance of the RaidNode
- cnode = RaidNode.createRaidNode(null, conf);
- int times = 10;
-
- while (times-- > 0) {
- try {
- shell = new RaidShell(conf, cnode.getListenerAddress());
- } catch (Exception e) {
- LOG.info("doTestHar unable to connect to " +
- cnode.getListenerAddress() + " retrying....");
- Thread.sleep(1000);
- continue;
- }
- break;
- }
- LOG.info("doTestHar created RaidShell.");
+ Configuration localConf = new Configuration(conf);
+ localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+ cnode = RaidNode.createRaidNode(null, localConf);
FileStatus[] listPaths = null;
int maxFilesFound = 0;
@@ -234,6 +220,7 @@
try {
listPaths = fileSys.listStatus(destPath);
int count = 0;
+ Path harPath = null;
int filesFound = 0;
if (listPaths != null) {
for (FileStatus s : listPaths) {
@@ -250,6 +237,7 @@
// files since some parity files might get deleted by the
// purge thread.
assertEquals(10, maxFilesFound);
+ harPath = s.getPath();
count++;
}
}
@@ -260,11 +248,12 @@
} catch (FileNotFoundException e) {
//ignore
}
- LOG.info("doTestHar waiting for files to be raided and parity files to be har'ed and deleted. Found " +
+ LOG.info("doTestHar waiting for files to be raided and parity files to be har'ed and deleted. Found " +
(listPaths == null ? "none" : listPaths.length));
Thread.sleep(1000); // keep waiting
+
}
-
+
fileSys.delete(dir, true);
// wait till raid file is deleted
int count = 1;
@@ -291,7 +280,6 @@
StringUtils.stringifyException(e));
throw e;
} finally {
- shell.close();
if (cnode != null) { cnode.stop(); cnode.join(); }
}
LOG.info("doTestHar completed:" + " blockSize=" + blockSize +
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
index 6613da0..1bb197a 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.GregorianCalendar;
import java.util.Iterator;
@@ -64,8 +65,11 @@
Configuration conf;
String namenode = null;
+ String hftp = null;
MiniDFSCluster dfs = null;
+ MiniMRCluster mr = null;
FileSystem fileSys = null;
+ String jobTrackerName = null;
/**
* create mapreduce and dfs clusters
@@ -75,6 +79,7 @@
new File(TEST_DIR).mkdirs(); // Make sure data directory exists
conf = new Configuration();
conf.set("raid.config.file", CONFIG_FILE);
+ conf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
conf.setBoolean("raid.config.reload", true);
conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
conf.setBoolean("dfs.permissions.enabled", true);
@@ -82,76 +87,138 @@
// scan all policies once every 5 second
conf.setLong("raid.policy.rescan.interval", 5000);
+    // make all deletions bypass the Trash
+ conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
+
// the RaidNode does the raiding inline (instead of submitting to map/reduce)
conf.setBoolean("fs.raidnode.local", local);
conf.set("raid.server.address", "localhost:0");
-
+
// create a dfs and map-reduce cluster
final int taskTrackers = 4;
final int jobTrackerPort = 60050;
- dfs = new MiniDFSCluster(conf, 3, true, null);
+ dfs = new MiniDFSCluster(conf, 6, true, null);
dfs.waitActive();
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().toString();
- }
-
- /**
- * create raid.xml file for RaidNode
- */
- private void mySetup(String path, short srcReplication, long targetReplication,
- long metaReplication, long stripeLength) throws Exception {
- FileWriter fileWriter = new FileWriter(CONFIG_FILE);
- fileWriter.write("<?xml version=\"1.0\"?>\n");
- String str = "<configuration> " +
- "<srcPath prefix=\"" + path + "\"> " +
- "<policy name = \"RaidTest1\"> " +
- "<destPath> /destraid</destPath> " +
- "<property> " +
- "<name>srcReplication</name> " +
- "<value>" + srcReplication + "</value> " +
- "<description> pick only files whole replFactor is greater than or equal to " +
- "</description> " +
- "</property> " +
- "<property> " +
- "<name>targetReplication</name> " +
- "<value>" + targetReplication + "</value> " +
- "<description>after RAIDing, decrease the replication factor of a file to this value." +
- "</description> " +
- "</property> " +
- "<property> " +
- "<name>metaReplication</name> " +
- "<value>" + metaReplication + "</value> " +
- "<description> replication factor of parity file" +
- "</description> " +
- "</property> " +
- "<property> " +
- "<name>stripeLength</name> " +
- "<value>" + stripeLength + "</value> " +
- "<description> the max number of blocks in a file to RAID together " +
- "</description> " +
- "</property> " +
- "<property> " +
- "<name>modTimePeriod</name> " +
- "<value>2000</value> " +
- "<description> time (milliseconds) after a file is modified to make it " +
- "a candidate for RAIDing " +
- "</description> " +
- "</property> " +
- "</policy>" +
- "</srcPath>" +
- "</configuration>";
- fileWriter.write(str);
- fileWriter.close();
+ mr = new MiniMRCluster(taskTrackers, namenode, 3);
+ jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+ hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
-
+ FileSystem.setDefaultUri(conf, namenode);
+ conf.set("mapred.job.tracker", jobTrackerName);
}
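+
+  /**
+   * Builds a raid.xml configuration containing one or more test policies
+   * and writes it to CONFIG_FILE via persist().
+   */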
+ class ConfigBuilder {
+ private List<String> policies;
+
+ public ConfigBuilder() {
+ policies = new java.util.ArrayList<String>();
+ }
+
+    public void addPolicy(String name, short srcReplication,
+        long targetReplication, long metaReplication, long stripeLength) {
+      addPolicy(name, "/user/dhruba/raidtest", srcReplication,
+          targetReplication, metaReplication, stripeLength);
+    }
+
+ public void addPolicy(String name, String path, short srcReplication,
+ long targetReplication, long metaReplication, long stripeLength) {
+ String str =
+ "<srcPath prefix=\"" + path + "\"> " +
+ "<policy name = \"" + name + "\"> " +
+ "<destPath> /destraid</destPath> " +
+ "<property> " +
+ "<name>srcReplication</name> " +
+ "<value>" + srcReplication + "</value> " +
+ "<description> pick only files whole replFactor is greater than or equal to " +
+ "</description> " +
+ "</property> " +
+ "<property> " +
+ "<name>targetReplication</name> " +
+ "<value>" + targetReplication + "</value> " +
+ "<description>after RAIDing, decrease the replication factor of a file to this value." +
+ "</description> " +
+ "</property> " +
+ "<property> " +
+ "<name>metaReplication</name> " +
+ "<value>" + metaReplication + "</value> " +
+ "<description> replication factor of parity file" +
+ "</description> " +
+ "</property> " +
+ "<property> " +
+ "<name>stripeLength</name> " +
+ "<value>" + stripeLength + "</value> " +
+ "<description> the max number of blocks in a file to RAID together " +
+ "</description> " +
+ "</property> " +
+ "<property> " +
+ "<name>modTimePeriod</name> " +
+ "<value>2000</value> " +
+ "<description> time (milliseconds) after a file is modified to make it " +
+ "a candidate for RAIDing " +
+ "</description> " +
+ "</property> " +
+ "</policy>" +
+ "</srcPath>";
+ policies.add(str);
+ }
+
+ public void persist() throws IOException {
+ FileWriter fileWriter = new FileWriter(CONFIG_FILE);
+ fileWriter.write("<?xml version=\"1.0\"?>\n");
+ fileWriter.write("<configuration>");
+ for (String policy: policies) {
+ fileWriter.write(policy);
+ }
+ fileWriter.write("</configuration>");
+ fileWriter.close();
+ }
+ }
+
/**
* stop clusters created earlier
*/
private void stopClusters() throws Exception {
+ if (mr != null) { mr.shutdown(); }
if (dfs != null) { dfs.shutdown(); }
}
@@ -168,8 +235,8 @@
int numBlock = 11;
int iter = 0;
+ createClusters(true);
try {
- createClusters(true);
for (long blockSize : blockSizes) {
for (long stripeLength : stripeLengths) {
doTestPathFilter(iter, targetReplication, metaReplication,
@@ -192,7 +259,10 @@
long blockSize, int numBlock) throws Exception {
LOG.info("doTestPathFilter started---------------------------:" + " iter " + iter +
" blockSize=" + blockSize + " stripeLength=" + stripeLength);
- mySetup("/user/dhruba/raidtest", (short)1, targetReplication, metaReplication, stripeLength);
+ ConfigBuilder cb = new ConfigBuilder();
+ cb.addPolicy("policy1", "/user/dhruba/raidtest", (short)1, targetReplication, metaReplication, stripeLength);
+ cb.persist();
+
RaidShell shell = null;
Path dir = new Path("/user/dhruba/raidtest/");
Path file1 = new Path(dir + "/file" + iter);
@@ -230,7 +300,9 @@
if (listPaths != null && listPaths.length == 1) {
for (FileStatus s : listPaths) {
LOG.info("doTestPathFilter found path " + s.getPath());
- if (!s.getPath().toString().endsWith(".tmp")) {
+ if (!s.getPath().toString().endsWith(".tmp") &&
+ fileSys.getFileStatus(file1).getReplication() ==
+ targetReplication) {
count++;
}
}
@@ -247,28 +319,29 @@
}
// assertEquals(listPaths.length, 1); // all files raided
LOG.info("doTestPathFilter all files found in Raid.");
+      Thread.sleep(20000); // Without this wait, the unit test crashes.
// check for error at beginning of file
if (numBlock >= 1) {
- LOG.info("Check error at beginning of file.");
+ LOG.info("doTestPathFilter Check error at beginning of file.");
simulateError(shell, fileSys, file1, crc1, 0);
}
// check for error at the beginning of second block
if (numBlock >= 2) {
- LOG.info("Check error at beginning of second block.");
+ LOG.info("doTestPathFilter Check error at beginning of second block.");
simulateError(shell, fileSys, file1, crc1, blockSize + 1);
}
// check for error at the middle of third block
if (numBlock >= 3) {
- LOG.info("Check error at middle of third block.");
+ LOG.info("doTestPathFilter Check error at middle of third block.");
simulateError(shell, fileSys, file1, crc1, 2 * blockSize + 10);
}
// check for error at the middle of second stripe
if (numBlock >= stripeLength + 1) {
- LOG.info("Check error at middle of second stripe.");
+ LOG.info("doTestPathFilter Check error at middle of second stripe.");
simulateError(shell, fileSys, file1, crc1,
stripeLength * blockSize + 100);
}
@@ -297,8 +370,9 @@
long stripeLength = 2;
long blockSize = 1024;
int numBlock = 3;
- mySetup("/user/dhruba/policytest", srcReplication, targetReplication, metaReplication, stripeLength);
- RaidShell shell = null;
+ ConfigBuilder cb = new ConfigBuilder();
+ cb.addPolicy("policy1", "/user/dhruba/policytest", (short)1, targetReplication, metaReplication, stripeLength);
+ cb.persist();
Path dir = new Path("/user/dhruba/policytest/");
Path file1 = new Path(dir + "/file1");
Path file2 = new Path(dir + "/file2");
@@ -309,21 +383,9 @@
fileSys.delete(destPath, true);
// create an instance of the RaidNode
- cnode = RaidNode.createRaidNode(null, conf);
- int times = 10;
-
- while (times-- > 0) {
- try {
- shell = new RaidShell(conf, cnode.getListenerAddress());
- } catch (Exception e) {
- LOG.info("doCheckPolicy unable to connect to " +
- cnode.getListenerAddress() + " retrying....");
- Thread.sleep(1000);
- continue;
- }
- break;
- }
- LOG.info("doCheckPolicy created RaidShell.");
+ Configuration localConf = new Configuration(conf);
+ localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+ cnode = RaidNode.createRaidNode(null, localConf);
// this file should be picked up RaidNode
long crc2 = createOldFile(fileSys, file2, 2, numBlock, blockSize);
@@ -338,7 +400,9 @@
if (listPaths != null && listPaths.length == 1) {
for (FileStatus s : listPaths) {
LOG.info("doCheckPolicy found path " + s.getPath());
- if (!s.getPath().toString().endsWith(".tmp")) {
+ if (!s.getPath().toString().endsWith(".tmp") &&
+ fileSys.getFileStatus(file2).getReplication() ==
+ targetReplication) {
count++;
firstmodtime = s.getModificationTime();
}
@@ -369,7 +433,9 @@
for (FileStatus s : listPaths) {
LOG.info("doCheckPolicy found path " + s.getPath() + " " + s.getModificationTime());
if (!s.getPath().toString().endsWith(".tmp") &&
- s.getModificationTime() > firstmodtime) {
+ s.getModificationTime() > firstmodtime &&
+ fileSys.getFileStatus(file2).getReplication() ==
+ targetReplication) {
count++;
}
}
@@ -389,7 +455,6 @@
StringUtils.stringifyException(e));
throw e;
} finally {
- shell.close();
if (cnode != null) { cnode.stop(); cnode.join(); }
LOG.info("doTestPathFilter delete file " + file1);
fileSys.delete(file1, false);
@@ -397,112 +462,93 @@
LOG.info("doCheckPolicy completed:");
}
+ private void createTestFiles(String path, String destpath) throws IOException {
+ long blockSize = 1024L;
+ Path dir = new Path(path);
+ Path destPath = new Path(destpath);
+ fileSys.delete(dir, true);
+ fileSys.delete(destPath, true);
+
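+    // Create ten 7-block files with replication 1 under the source path.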
+    for (int i = 0; i < 10; i++) {
+ Path file = new Path(path + "file" + i);
+ createOldFile(fileSys, file, 1, 7, blockSize);
+ }
+ }
+
/**
* Test dist Raid
*/
public void testDistRaid() throws Exception {
LOG.info("Test testDistRaid started.");
- long blockSize = 1024L;
long targetReplication = 2;
long metaReplication = 2;
long stripeLength = 3;
short srcReplication = 1;
+ createClusters(false);
+ ConfigBuilder cb = new ConfigBuilder();
+ cb.addPolicy("policy1", "/user/dhruba/raidtest", (short)1, targetReplication, metaReplication, stripeLength);
+ cb.addPolicy("policy2", "/user/dhruba/raidtest2", (short)1, targetReplication, metaReplication, stripeLength);
+ cb.persist();
+
+ RaidNode cnode = null;
try {
- createClusters(false);
- mySetup("/user/dhruba/raidtest", srcReplication, targetReplication, metaReplication, stripeLength);
+ createTestFiles("/user/dhruba/raidtest/", "/destraid/user/dhruba/raidtest");
+ createTestFiles("/user/dhruba/raidtest2/", "/destraid/user/dhruba/raidtest2");
LOG.info("Test testDistRaid created test files");
- Path dir = new Path("/user/dhruba/raidtest/");
- Path destPath = new Path("/destraid/user/dhruba/raidtest");
- fileSys.delete(dir, true);
- fileSys.delete(destPath, true);
-
- ConfigManager configMgr = new ConfigManager(conf);
- configMgr.reloadConfigsIfNecessary();
- LOG.info(" testDistRaid ConfigFile Loaded");
-
- // activate all categories
- Collection<PolicyList> all = configMgr.getAllPolicies();
- PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
- Iterator<PolicyInfo> pi = sorted[0].getAll().iterator();
- PolicyInfo p = pi.next();
- List<FileStatus> ll = new ArrayList<FileStatus>();
-
- for(int i = 0 ; i < 10; i++){
- Path file = new Path("/user/dhruba/raidtest/file"+i);
- createOldFile(fileSys, file, 1, 7, blockSize);
- FileStatus st = fileSys.getFileStatus(file);
- ll.add(st);
+ Configuration localConf = new Configuration(conf);
+ localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+ cnode = RaidNode.createRaidNode(null, localConf);
+ // Verify the policies are parsed correctly
+ for (PolicyList policyList : cnode.getAllPolicies()) {
+ for (PolicyInfo p : policyList.getAll()) {
+ if (p.getName().equals("policy1")) {
+ Path srcPath = new Path("/user/dhruba/raidtest");
+ assertTrue(p.getSrcPath().equals(
+ srcPath.makeQualified(srcPath.getFileSystem(conf))));
+ } else {
+ assertTrue(p.getName().equals("policy2"));
+ Path srcPath = new Path("/user/dhruba/raidtest2");
+ assertTrue(p.getSrcPath().equals(
+ srcPath.makeQualified(srcPath.getFileSystem(conf))));
+ }
+ assertEquals(targetReplication,
+ Integer.parseInt(p.getProperty("targetReplication")));
+ assertEquals(metaReplication,
+ Integer.parseInt(p.getProperty("metaReplication")));
+ assertEquals(stripeLength,
+ Integer.parseInt(p.getProperty("stripeLength")));
+ }
}
-
- DistRaid dr = new DistRaid(conf);
- dr.addRaidPaths(p, ll);
- dr.doDistRaid();
+
+ long start = System.currentTimeMillis();
+ final int MAX_WAITTIME = 300000;
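+      // Wait up to MAX_WAITTIME for the RaidNode to submit one job per policy.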
+ while (cnode.jobMonitor.jobsMonitored() < 2 &&
+ System.currentTimeMillis() - start < MAX_WAITTIME) {
+ Thread.sleep(1000);
+ }
+      assertEquals(2, cnode.jobMonitor.jobsMonitored());
+
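+      // Then wait for both jobs to complete successfully.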
+ start = System.currentTimeMillis();
+ while (cnode.jobMonitor.jobsSucceeded() < 2 &&
+ System.currentTimeMillis() - start < MAX_WAITTIME) {
+ Thread.sleep(1000);
+ }
+      assertEquals(2, cnode.jobMonitor.jobsSucceeded());
+
LOG.info("Test testDistRaid successful.");
} catch (Exception e) {
LOG.info("testDistRaid Exception " + e + StringUtils.stringifyException(e));
throw e;
} finally {
+ if (cnode != null) { cnode.stop(); cnode.join(); }
stopClusters();
}
LOG.info("Test testDistRaid completed.");
}
- /**
- * Test the case where the source and destination paths conflict.
- * @throws Exception
- */
- public void testConflictingPaths() throws Exception {
- LOG.info("Test testConflictingPaths started");
- long targetReplication = 2;
- long metaReplication = 2;
- long stripeLength = 3;
- short srcReplication = 1;
- long modTimePeriod = 0;
- try {
- createClusters(false);
- mySetup("/user/d/raidtest", srcReplication, targetReplication,
- metaReplication, stripeLength);
- // We dont need this to run, just need the object.
- RaidNode cnode = RaidNode.createRaidNode(null, conf);
- cnode.stop();
- cnode.join();
-
- createOldFile(fileSys, new Path("/user/d/raidtest/f1"), 2, 7, 8192L);
- LOG.info("Test testConflictingPaths created test files");
-
- long now = System.currentTimeMillis();
-
- // Test the regular case.
- LOG.info("Test testConflictingPaths testing the regular case");
- List<FileStatus> selected = cnode.selectFiles(conf,
- new Path("/user/d/raidtest*"), "/raid",
- modTimePeriod, srcReplication, now);
- assertTrue(selected.size() > 0);
-
- // Test the conflicting case: src under dest.
- LOG.info("Test testConflictingPaths testing src under dest");
- selected = cnode.selectFiles(conf,
- new Path("/user/d/raidtest*"), "/user/d",
- modTimePeriod, srcReplication, now);
- assertEquals(0, selected.size());
-
- // Test the conflicting case: dest under src.
- LOG.info("Test testConflictingPaths testing dest under src");
- selected = cnode.selectFiles(conf,
- new Path("/user/d*"), "/user/d/raidtest",
- modTimePeriod, srcReplication, now);
- assertEquals(0, selected.size());
-
- LOG.info("Test testConflictingPaths succeeded.");
- } finally {
- stopClusters();
- }
- LOG.info("Test testConflictingPaths completed.");
- }
-
//
// simulate a corruption at the specified offset and verify that everything is good
//
@@ -573,4 +619,50 @@
fail("CRC mismatch of files " + name1 + " with file " + name2);
}
}
+
+ public void testSuspendTraversal() throws Exception {
+ LOG.info("Test testSuspendTraversal started.");
+ long targetReplication = 2;
+ long metaReplication = 2;
+ long stripeLength = 3;
+ short srcReplication = 1;
+
+ createClusters(false);
+ ConfigBuilder cb = new ConfigBuilder();
+ cb.addPolicy("policy1", "/user/dhruba/raidtest", (short)1, targetReplication, metaReplication, stripeLength);
+ cb.persist();
+
+ RaidNode cnode = null;
+ try {
+ createTestFiles("/user/dhruba/raidtest/", "/destraid/user/dhruba/raidtest");
+ LOG.info("Test testSuspendTraversal created test files");
+
+ Configuration localConf = new Configuration(conf);
+ localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+ localConf.setInt("raid.distraid.max.files", 3);
+      final int numJobsExpected = 4; // 10 files at 3 per job: 4 jobs (3 + 3 + 3 + 1).
+ cnode = RaidNode.createRaidNode(null, localConf);
+
+      long start = System.currentTimeMillis();
+      final int MAX_WAITTIME = 300000;
+
+ while (cnode.jobMonitor.jobsSucceeded() < numJobsExpected &&
+ System.currentTimeMillis() - start < MAX_WAITTIME) {
+ Thread.sleep(1000);
+ }
+      assertEquals(numJobsExpected, cnode.jobMonitor.jobsMonitored());
+      assertEquals(numJobsExpected, cnode.jobMonitor.jobsSucceeded());
+
+ LOG.info("Test testSuspendTraversal successful.");
+
+ } catch (Exception e) {
+ LOG.info("testSuspendTraversal Exception " + e + StringUtils.stringifyException(e));
+ throw e;
+ } finally {
+ if (cnode != null) { cnode.stop(); cnode.join(); }
+ stopClusters();
+ }
+ LOG.info("Test testSuspendTraversal completed.");
+ }
}
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
index 311cbba..c2dcfdf 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
@@ -195,7 +195,6 @@
LOG.info("doTestPurge started---------------------------:" + " iter " + iter +
" blockSize=" + blockSize + " stripeLength=" + stripeLength);
mySetup(srcPath, targetReplication, metaReplication, stripeLength);
- RaidShell shell = null;
Path dir = new Path("/user/dhruba/raidtest/");
Path file1 = new Path(dir + "/file" + iter);
RaidNode cnode = null;
@@ -207,21 +206,9 @@
LOG.info("doTestPurge created test files for iteration " + iter);
// create an instance of the RaidNode
- cnode = RaidNode.createRaidNode(null, conf);
- int times = 10;
-
- while (times-- > 0) {
- try {
- shell = new RaidShell(conf, cnode.getListenerAddress());
- } catch (Exception e) {
- LOG.info("doTestPurge unable to connect to " +
- cnode.getListenerAddress() + " retrying....");
- Thread.sleep(1000);
- continue;
- }
- break;
- }
- LOG.info("doTestPurge created RaidShell.");
+ Configuration localConf = new Configuration(conf);
+ localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+ cnode = RaidNode.createRaidNode(null, localConf);
FileStatus[] listPaths = null;
// wait till file is raided
@@ -266,7 +253,6 @@
StringUtils.stringifyException(e));
throw e;
} finally {
- shell.close();
if (cnode != null) { cnode.stop(); cnode.join(); }
LOG.info("doTestPurge delete file " + file1);
fileSys.delete(file1, true);