/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.raid;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.util.Collection;
import java.util.List;
import java.util.LinkedList;
import java.util.Iterator;
import java.util.Arrays;
import java.util.Random;
import java.util.Set;
import java.util.HashSet;
import java.lang.Thread;
import java.net.InetSocketAddress;
import org.xml.sax.SAXException;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.hadoop.ipc.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.raid.protocol.PolicyInfo;
import org.apache.hadoop.raid.protocol.PolicyList;
import org.apache.hadoop.raid.protocol.RaidProtocol;
import org.apache.hadoop.raid.protocol.RaidProtocol.ReturnStatus;
/**
 * The {@link RaidNode} daemon that implements {@link RaidProtocol}. It
 * periodically selects files that match the configured raid policies,
 * generates XOR parity files for them, and purges parity files whose
 * source files have been deleted.
 */
public class RaidNode implements RaidProtocol {
static{
Configuration.addDefaultResource("hdfs-default.xml");
Configuration.addDefaultResource("hdfs-site.xml");
Configuration.addDefaultResource("mapred-default.xml");
Configuration.addDefaultResource("mapred-site.xml");
}
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.raid.RaidNode");
public static final long SLEEP_TIME = 10000L; // 10 seconds
public static final int DEFAULT_PORT = 60000;
public static final int DEFAULT_STRIPE_LENGTH = 5; // default value of stripe length
public static final String DEFAULT_RAID_LOCATION = "/raid";
/** RPC server */
private Server server;
/** RPC server address */
private InetSocketAddress serverAddress = null;
/** set to true once a shutdown has been requested */
private boolean stopRequested = false;
/** Configuration Manager */
private ConfigManager configMgr;
/** hadoop configuration */
private Configuration conf;
protected boolean initialized; // Are we initialized?
protected volatile boolean running; // Are we running?
/** Daemon thread to trigger policies */
Daemon triggerThread = null;
/** Daemon thread to delete obsolete parity files */
Daemon purgeThread = null;
/** Whether to raid files locally rather than with a distributed MapReduce job */
boolean isRaidLocal = false;
// statistics about RAW hdfs blocks. This counts all replicas of a block.
public static class Statistics {
long numProcessedBlocks; // total blocks encountered in namespace
long processedSize; // disk space occupied by all blocks
long remainingSize; // total disk space post RAID
long numMetaBlocks; // total blocks in metafile
long metaSize; // total disk space for meta files
public void clear() {
numProcessedBlocks = 0;
processedSize = 0;
remainingSize = 0;
numMetaBlocks = 0;
metaSize = 0;
}
public String toString() {
long save = processedSize - (remainingSize + metaSize);
long savep = 0;
if (processedSize > 0) {
savep = (save * 100)/processedSize;
}
String msg = " numProcessedBlocks = " + numProcessedBlocks +
" processedSize = " + processedSize +
" postRaidSize = " + remainingSize +
" numMetaBlocks = " + numMetaBlocks +
" metaSize = " + metaSize +
" %save in raw disk space = " + savep;
return msg;
}
}
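// An illustrative savings calculation for the toString() figures above:
// a 10-block file with 128 MB blocks at replication 3 occupies
// processedSize = 30 block-sized units of raw disk. Raiding it with
// targetReplication 2 leaves remainingSize = 20 units; with stripeLength 5
// and metaReplication 2 the parity file has 2 blocks, so metaSize = 4
// units. save = 30 - (20 + 4) = 6, i.e. a 20% saving in raw disk space.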
// Startup options
static public enum StartupOption{
TEST ("-test"),
REGULAR ("-regular");
private String name = null;
private StartupOption(String arg) {this.name = arg;}
public String getName() {return name;}
}
/**
* Start RaidNode.
* <p>
* The raid-node can be started with one of the following startup options:
* <ul>
* <li>{@link StartupOption#REGULAR REGULAR} - normal raid node startup</li>
* </ul>
* The option is passed via configuration field:
* <tt>fs.raidnode.startup</tt>
*
* The conf will be modified to reflect the actual ports on which
* the RaidNode is up and running if the user passes the port as
* <code>zero</code> in the conf.
*
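* For example, a test can pick an ephemeral port and let the daemon report
* the address it actually bound to (an illustrative sketch; error handling
* omitted):
* <pre>
* Configuration conf = new Configuration();
* conf.set("raid.server.address", "localhost:0"); // port zero = ephemeral
* RaidNode raid = RaidNode.createRaidNode(null, conf);
* // the "RaidNode up at: ..." log line shows the actual port
* raid.stop();
* </pre>
*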
* @param conf configuration
* @throws IOException
*/
RaidNode(Configuration conf) throws IOException {
try {
initialize(conf);
} catch (IOException e) {
this.stop();
throw e;
} catch (Exception e) {
this.stop();
throw new IOException(e);
}
}
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException {
if (protocol.equals(RaidProtocol.class.getName())) {
return RaidProtocol.versionID;
} else {
throw new IOException("Unknown protocol to name node: " + protocol);
}
}
/**
* Wait for service to finish.
* (Normally, it runs forever.)
*/
public void join() {
try {
if (server != null) server.join();
if (triggerThread != null) triggerThread.join();
if (purgeThread != null) purgeThread.join();
} catch (InterruptedException ie) {
// do nothing
}
}
/**
* Stop all RaidNode threads and wait for all to finish.
*/
public void stop() {
if (stopRequested) {
return;
}
stopRequested = true;
running = false;
if (server != null) server.stop();
if (triggerThread != null) triggerThread.interrupt();
if (purgeThread != null) purgeThread.interrupt();
}
private static InetSocketAddress getAddress(String address) {
return NetUtils.createSocketAddr(address);
}
public static InetSocketAddress getAddress(Configuration conf) {
String nodeport = conf.get("raid.server.address");
if (nodeport == null) {
nodeport = "localhost:" + DEFAULT_PORT;
}
return getAddress(nodeport);
}
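// A hypothetical client-side sketch: given the same configuration, a
// client can obtain an RPC proxy to a running RaidNode at this address and
// invoke RaidProtocol methods on it (RPC.waitForProxy is assumed to be
// available, as elsewhere in Hadoop):
//
//   Configuration conf = new Configuration();
//   conf.set("raid.server.address", "raidhost:60000");
//   RaidProtocol raidnode = (RaidProtocol) RPC.waitForProxy(
//       RaidProtocol.class, RaidProtocol.versionID,
//       RaidNode.getAddress(conf), conf);
//   raidnode.recoverFile("/user/foo/part-00000", 0L);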
private void initialize(Configuration conf)
throws IOException, SAXException, InterruptedException, RaidConfigurationException,
ClassNotFoundException, ParserConfigurationException {
this.conf = conf;
InetSocketAddress socAddr = RaidNode.getAddress(conf);
int handlerCount = conf.getInt("fs.raidnode.handler.count", 10);
isRaidLocal = conf.getBoolean("fs.raidnode.local", false);
// read in the configuration
configMgr = new ConfigManager(conf);
// create rpc server
this.server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(),
handlerCount, false, conf);
// The rpc-server port can be ephemeral... ensure we have the correct info
this.serverAddress = this.server.getListenerAddress();
LOG.info("RaidNode up at: " + this.serverAddress);
initialized = true;
running = true;
this.server.start(); // start RPC server
// start the daemon thread that fires policies appropriately
this.triggerThread = new Daemon(new TriggerMonitor());
this.triggerThread.start();
// start the thread that deletes obsolete parity files
this.purgeThread = new Daemon(new PurgeMonitor());
this.purgeThread.start();
}
/**
* Implement RaidProtocol methods
*/
/** {@inheritDoc} */
public PolicyList[] getAllPolicies() throws IOException {
Collection<PolicyList> list = configMgr.getAllPolicies();
return list.toArray(new PolicyList[list.size()]);
}
/** {@inheritDoc} */
public ReturnStatus recoverFile(String inStr, long corruptOffset) throws IOException {
LOG.info("Recover File for " + inStr + " for corrupt offset " + corruptOffset);
Path inputPath = new Path(inStr);
Path srcPath = inputPath.makeQualified(inputPath.getFileSystem(conf));
PolicyInfo info = findMatchingPolicy(srcPath);
if (info != null) {
// find stripe length from config
int stripeLength = getStripeLength(conf, info);
// create destination path prefix
String destPrefix = getDestinationPath(conf, info);
Path destPath = new Path(destPrefix.trim());
FileSystem fs = FileSystem.get(destPath.toUri(), conf);
destPath = destPath.makeQualified(fs);
unRaid(conf, srcPath, destPath, stripeLength, corruptOffset);
}
return ReturnStatus.SUCCESS;
}
/**
* Periodically checks to see which policies should be fired.
*/
class TriggerMonitor implements Runnable {
public void run() {
while (running) {
try {
doProcess();
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
} finally {
LOG.info("Trigger thread continuing to run...");
}
}
}
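// The doProcess() loop below is driven by per-policy properties supplied
// by ConfigManager. An illustrative raid policy entry showing the
// properties this class reads (the exact XML schema is owned by
// ConfigManager and is only assumed here):
//
//   <srcPath prefix="hdfs://namenode:9000/user/warehouse">
//     <policy name="warehouse-policy">
//       <destPath>/raid</destPath>
//       <property> <name>srcReplication</name>    <value>3</value> </property>
//       <property> <name>targetReplication</name> <value>2</value> </property>
//       <property> <name>metaReplication</name>   <value>2</value> </property>
//       <property> <name>stripeLength</name>      <value>5</value> </property>
//       <property> <name>modTimePeriod</name>     <value>3600000</value> </property>
//     </policy>
//   </srcPath>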
/**
* Keep processing policies.
* If the config file has changed, then reload config file and start afresh.
*/
private void doProcess() throws IOException, InterruptedException {
PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
long prevExec = 0;
DistRaid dr = null;
while (running) {
boolean reload = configMgr.reloadConfigsIfNecessary();
while(!reload && now() < prevExec + configMgr.getPeriodicity()){
Thread.sleep(SLEEP_TIME);
reload = configMgr.reloadConfigsIfNecessary();
}
prevExec = now();
// activate all categories
Collection<PolicyList> all = configMgr.getAllPolicies();
// sort all policies by reverse lexicographical order. This is needed
// to make the nearest policy take precedence.
PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
Arrays.sort(sorted, lexi);
if (!isRaidLocal) {
dr = new DistRaid(conf);
}
// paths we have processed so far
List<String> processed = new LinkedList<String>();
for (PolicyList category : sorted) {
for (PolicyInfo info: category.getAll()) {
long modTimePeriod = 0;
short srcReplication = 0;
String str = info.getProperty("modTimePeriod");
if (str != null) {
  modTimePeriod = Long.parseLong(str);
}
str = info.getProperty("srcReplication");
if (str != null) {
  srcReplication = Short.parseShort(str);
}
LOG.info("Triggering Policy Filter " + info.getName() +
" " + info.getSrcPath());
List<FileStatus> filteredPaths = null;
try {
filteredPaths = selectFiles(conf, info.getSrcPath(),
getDestinationPath(conf, info),
modTimePeriod,
srcReplication,
prevExec);
} catch (Exception e) {
LOG.info("Exception while invoking filter on policy " + info.getName() +
" srcPath " + info.getSrcPath() +
" exception " + StringUtils.stringifyException(e));
continue;
}
if (filteredPaths == null || filteredPaths.size() == 0) {
LOG.info("No filtered paths for policy " + info.getName());
continue;
}
// Skip a filtered path if it, or a path beneath it, has already
// been accepted by an earlier policy.
for (Iterator<FileStatus> iter = filteredPaths.iterator(); iter.hasNext();) {
String fs = iter.next().getPath().toString() + "/";
for (String p : processed) {
if (p.startsWith(fs)) {
iter.remove();
break;
}
}
}
// Apply the action on accepted paths
LOG.info("Triggering Policy Action " + info.getName());
try {
if (isRaidLocal) {
  doRaid(conf, info, filteredPaths);
} else {
  // add paths for distributed raiding
  dr.addRaidPaths(info, filteredPaths);
}
} catch (Exception e) {
LOG.info("Exception while invoking action on policy " + info.getName() +
" srcPath " + info.getSrcPath() +
" exception " + StringUtils.stringifyException(e));
continue;
}
// add these paths to processed paths
for (Iterator<FileStatus> iter = filteredPaths.iterator(); iter.hasNext();) {
String p = iter.next().getPath().toString() + "/";
processed.add(p);
}
}
}
processed.clear(); // free up memory references before yielding
//do the distributed raiding
if (!isRaidLocal) {
dr.doDistRaid();
}
}
}
}
/**
 * Returns the first policy that matches the specified path. The policies
 * are sorted in reverse lexicographic order of their source paths, so for
 * an input like /user/foo/bar a policy rooted at /user/foo takes
 * precedence over one rooted at /user. Since different policies with
 * different purposes and destinations might be associated with the same
 * input path, we should be skeptical about the places using this method
 * and we should try to change the code to avoid it.
 */
private PolicyInfo findMatchingPolicy(Path inpath) throws IOException {
PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
Collection<PolicyList> all = configMgr.getAllPolicies();
// sort all policies by reverse lexicographical order. This is needed
// to make the nearest policy take precedence.
PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
Arrays.sort(sorted, lexi);
// loop through all categories of policies.
for (PolicyList category : sorted) {
Iterator<PolicyInfo> iter = category.getAll().iterator();
PolicyInfo first = iter.hasNext() ? iter.next() : null;
if (first != null) {
Path[] srcPaths = first.getSrcPathExpanded(); // glob-expanded input src paths
if (srcPaths == null) {
continue;
}
for (Path src: srcPaths) {
if (inpath.toString().startsWith(src.toString())) {
// if the srcpath is a prefix of the specified path
// we have a match!
return first;
}
}
}
}
return null; // no matching policies
}
/**
 * Returns a list of pathnames that need raiding.
 */
private List<FileStatus> selectFiles(Configuration conf, Path p, String destPrefix,
long modTimePeriod, short srcReplication, long now) throws IOException {
List<FileStatus> returnSet = new LinkedList<FileStatus>();
// expand destination prefix path
Path destp = new Path(destPrefix.trim());
FileSystem fs = FileSystem.get(destp.toUri(), conf);
destp = destp.makeQualified(fs);
fs = p.getFileSystem(conf);
FileStatus[] gpaths = fs.globStatus(p);
if (gpaths != null){
for (FileStatus onepath: gpaths) {
recurse(fs, conf, destp, onepath, returnSet, modTimePeriod, srcReplication, now);
}
}
return returnSet;
}
/**
 * Pick files that need to be RAIDed: a file qualifies if it spans more
 * than two blocks, its replication factor has not already been reduced
 * below srcReplication, and its parity file is missing or out of date.
 */
private void recurse(FileSystem srcFs,
Configuration conf,
Path destPathPrefix,
FileStatus src,
List<FileStatus> accept,
long modTimePeriod,
short srcReplication,
long now) throws IOException {
Path path = src.getPath();
FileStatus[] files = null;
try {
files = srcFs.listStatus(path);
} catch (java.io.FileNotFoundException e) {
// ignore the error because the file could have been deleted by a user
LOG.info("FileNotFound " + path + " " + StringUtils.stringifyException(e));
}
// If the modtime of the parity file differs from the modtime of the
// src file, and the src file has not been modified
// recently, then that file is a candidate for RAID.
if (!src.isDir()) { // a file
// if the source file has fewer than or equal to 2 blocks, then no need to RAID
long blockSize = src.getBlockSize();
if (2 * blockSize >= src.getLen()) {
return;
}
// If the replication factor of the source file is already below what is
// expected by RAID, then ignore it.
if (src.getReplication() < srcReplication) {
return;
}
// check if the destination path already exists. If it does and its
// modification time does not match the modtime of the source file, then
// recalculate RAID
boolean add = false;
try {
Path outpath = new Path(destPathPrefix, makeRelative(path));
FileSystem outFs = outpath.getFileSystem(conf);
FileStatus ostat = outFs.getFileStatus(outpath);
if (ostat.getModificationTime() != src.getModificationTime() &&
src.getModificationTime() + modTimePeriod < now) {
add = true;
}
} catch (java.io.FileNotFoundException e) {
add = true; // destination file does not exist
}
if (add) {
accept.add(src);
}
return;
} else if (files != null) {
for (FileStatus one:files) {
recurse(srcFs, conf, destPathPrefix, one, accept, modTimePeriod, srcReplication, now);
}
}
}
/**
* RAID a list of files.
*/
void doRaid(Configuration conf, PolicyInfo info, List<FileStatus> paths)
throws IOException {
int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
int stripeLength = getStripeLength(conf, info);
String destPrefix = getDestinationPath(conf, info);
String simulate = info.getProperty("simulate");
boolean doSimulate = Boolean.parseBoolean(simulate); // null parses as false
Statistics statistics = new Statistics();
int count = 0;
Path p = new Path(destPrefix.trim());
FileSystem fs = FileSystem.get(p.toUri(), conf);
p = p.makeQualified(fs);
for (FileStatus s : paths) {
doRaid(conf, s, p, statistics, null, doSimulate, targetRepl, metaRepl,
stripeLength);
if (count % 1000 == 0) {
LOG.info("RAID statistics " + statistics.toString());
}
count++;
}
LOG.info("RAID statistics " + statistics.toString());
}
/**
* RAID an individual file
*/
static public void doRaid(Configuration conf, PolicyInfo info,
FileStatus src, Statistics statistics, Reporter reporter) throws IOException {
int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
int stripeLength = getStripeLength(conf, info);
String destPrefix = getDestinationPath(conf, info);
String simulate = info.getProperty("simulate");
boolean doSimulate = Boolean.parseBoolean(simulate); // null parses as false
int count = 0;
Path p = new Path(destPrefix.trim());
FileSystem fs = FileSystem.get(p.toUri(), conf);
p = p.makeQualified(fs);
doRaid(conf, src, p, statistics, reporter, doSimulate, targetRepl, metaRepl,
stripeLength);
}
/**
* RAID an individual file
*/
static private void doRaid(Configuration conf, FileStatus stat, Path destPath,
Statistics statistics, Reporter reporter, boolean doSimulate,
int targetRepl, int metaRepl, int stripeLength)
throws IOException {
Path p = stat.getPath();
FileSystem srcFs = p.getFileSystem(conf);
// extract block locations from File system
BlockLocation[] locations = srcFs.getFileBlockLocations(stat, 0, stat.getLen());
// if the file has fewer than three blocks, then nothing to do
if (locations.length <= 2) {
return;
}
// add up the raw disk space occupied by this file
long diskSpace = 0;
for (BlockLocation l: locations) {
diskSpace += (l.getLength() * stat.getReplication());
}
statistics.numProcessedBlocks += locations.length;
statistics.processedSize += diskSpace;
// generate parity file
generateParityFile(conf, stat, reporter, srcFs, destPath, locations, metaRepl, stripeLength);
// reduce the replication factor of the source file
if (!doSimulate) {
if (!srcFs.setReplication(p, (short)targetRepl)) {
LOG.info("Error in reducing replication factor of file " + p + " to " + targetRepl);
statistics.remainingSize += diskSpace; // no change in disk space usage
return;
}
}
diskSpace = 0;
for (BlockLocation l: locations) {
diskSpace += (l.getLength() * targetRepl);
}
statistics.remainingSize += diskSpace;
// the metafile will have this many blocks
int numMeta = locations.length / stripeLength;
if (locations.length % stripeLength != 0) {
numMeta++;
}
// we create numMeta blocks for every file, each with metaRepl replicas
// (e.g. 12 source blocks with stripeLength 5 yield 3 meta blocks). The
// last block of the metafile might not be completely filled up, but we
// ignore that for now.
statistics.numMetaBlocks += (numMeta * metaRepl);
statistics.metaSize += (numMeta * metaRepl * stat.getBlockSize());
}
/**
* Create the parity file.
*/
static private void generateParityFile(Configuration conf, FileStatus stat,
Reporter reporter,
FileSystem inFs,
Path destPathPrefix, BlockLocation[] locations,
int metaRepl, int stripeLength) throws IOException {
// two buffers for generating parity
int bufSize = 5 * 1024 * 1024; // 5 MB
byte[] bufs = new byte[bufSize];
byte[] xor = new byte[bufSize];
Path inpath = stat.getPath();
long blockSize = stat.getBlockSize();
long fileSize = stat.getLen();
// create output tmp path
Path outpath = new Path(destPathPrefix, makeRelative(inpath));
Path tmppath = new Path(outpath + ".tmp");
FileSystem outFs = outpath.getFileSystem(conf);
// if the parity file is already up-to-date, then there is nothing to do
try {
FileStatus stmp = outFs.getFileStatus(outpath);
if (stmp.getModificationTime() == stat.getModificationTime()) {
LOG.info("Parity file for " + inpath + "(" + locations.length + ") is " + outpath +
" already up-to-date. Nothing more to do.");
return;
}
} catch (IOException e) {
// ignore errors because the parity file might not exist yet.
}
LOG.info("Parity file for " + inpath + "(" + locations.length + ") is " + outpath);
FSDataOutputStream out = outFs.create(tmppath,
true,
conf.getInt("io.file.buffer.size", 64 * 1024),
(short)metaRepl,
blockSize);
try {
// loop once for every stripe length
for (int startBlock = 0; startBlock < locations.length;) {
// report progress to Map-reduce framework
if (reporter != null) {
reporter.progress();
}
int blocksLeft = locations.length - startBlock;
int stripe = Math.min(stripeLength, blocksLeft);
LOG.info(" startBlock " + startBlock + " stripe " + stripe);
// open a new file descriptor for each block in this stripe.
// make each fd point to the beginning of each block in this stripe.
FSDataInputStream[] ins = new FSDataInputStream[stripe];
for (int i = 0; i < stripe; i++) {
ins[i] = inFs.open(inpath, bufSize);
ins[i].seek(blockSize * (startBlock + i));
}
generateParity(ins, out, blockSize, bufs, xor, reporter);
// close input file handles
for (int i = 0; i < ins.length; i++) {
ins[i].close();
}
// increment startBlock to point to the first block to be processed
// in the next iteration
startBlock += stripe;
}
out.close();
out = null;
// rename tmppath to the real parity filename
if (!outFs.rename(tmppath, outpath)) {
String msg = "Unable to rename tmp file " + tmppath + " to " + outpath;
LOG.warn(msg);
throw new IOException (msg);
}
} finally {
// remove the tmp file if it still exists
outFs.delete(tmppath, false);
}
// set the modification time of the RAID file. This is done so that the modTime of the
// RAID file reflects the contents of the source file that it has RAIDed. This should
// also work for files that are being appended to. This is necessary because the time
// on the destination namenode may not be synchronised with the timestamp of the
// source namenode.
outFs.setTimes(outpath, stat.getModificationTime(), -1);
FileStatus outstat = outFs.getFileStatus(outpath);
LOG.info("Source file " + inpath + " of size " + fileSize +
" Parity file " + outpath + " of size " + outstat.getLen() +
" src mtime " + stat.getModificationTime() +
" parity mtime " + outstat.getModificationTime());
}
private static int readInputUntilEnd(FSDataInputStream ins, byte[] bufs, int toRead)
throws IOException {
int tread = 0;
while (tread < toRead) {
int read = ins.read(bufs, tread, toRead - tread);
if (read == -1) {
return tread;
} else {
tread += read;
}
}
return tread;
}
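// The XOR parity scheme used below, in a nutshell: each parity byte is the
// XOR of the corresponding byte in every block of the stripe, so any single
// missing block can be rebuilt by XOR-ing the parity with the survivors.
// A self-contained sketch (illustrative, not part of this class):
//
//   byte[] b0 = {1, 2, 3}, b1 = {4, 5, 6}, b2 = {7, 8, 9};
//   byte[] parity = new byte[3], rebuilt = new byte[3];
//   for (int i = 0; i < 3; i++) {
//     parity[i] = (byte) (b0[i] ^ b1[i] ^ b2[i]);
//   }
//   // suppose b1 is lost; recover it from the parity and the survivors
//   for (int i = 0; i < 3; i++) {
//     rebuilt[i] = (byte) (parity[i] ^ b0[i] ^ b2[i]);
//   }
//   // rebuilt now holds {4, 5, 6} == b1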
private static void generateParity(FSDataInputStream[] ins, FSDataOutputStream fout,
long parityBlockSize, byte[] bufs, byte[] xor, Reporter reporter) throws IOException {
int bufSize;
if ((bufs == null) || (bufs.length == 0)){
bufSize = 5 * 1024 * 1024; // 5 MB
bufs = new byte[bufSize];
} else {
bufSize = bufs.length;
}
if ((xor == null) || (xor.length != bufs.length)){
xor = new byte[bufSize];
}
int xorlen = 0;
// this loop processes all good blocks in selected stripe
long remaining = parityBlockSize;
while (remaining > 0) {
int toRead = (int)Math.min(remaining, bufSize);
if (ins.length > 0) {
xorlen = readInputUntilEnd(ins[0], xor, toRead);
}
// read all remaining blocks and xor them into the buffer
for (int i = 1; i < ins.length; i++) {
// report progress to Map-reduce framework
if (reporter != null) {
reporter.progress();
}
int actualRead = readInputUntilEnd(ins[i], bufs, toRead);
int j;
int xorlimit = (int) Math.min(xorlen,actualRead);
for (j = 0; j < xorlimit; j++) {
xor[j] ^= bufs[j];
}
if ( actualRead > xorlen ){
for (; j < actualRead; j++) {
xor[j] = bufs[j];
}
xorlen = actualRead;
}
}
if (xorlen < toRead) {
// zero out the tail of the xor buffer that no input block reached
Arrays.fill(xor, xorlen, toRead, (byte) 0);
}
// write this to the tmp file
fout.write(xor, 0, toRead);
remaining -= toRead;
}
}
/**
* Extract a good block from the parity block. This assumes that the corruption
* is in the main file and the parity file is always good.
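* <p>
* A worked example with illustrative numbers: with stripeLength = 5 and a
* 128 MB blockSize, a corruptOffset of 700 MB falls in stripe
* snum = 700 / (5 * 128) = 1; that stripe starts at 640 MB, the corrupt
* block is block (700 - 640) / 128 = 0 of the stripe, and the matching
* parity block starts at snum * blockSize = 128 MB in the parity file.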
*/
public static Path unRaid(Configuration conf, Path srcPath, Path destPathPrefix,
int stripeLength, long corruptOffset) throws IOException {
// extract block locations, size etc from source file
Random rand = new Random();
FileSystem srcFs = srcPath.getFileSystem(conf);
FileStatus srcStat = srcFs.getFileStatus(srcPath);
BlockLocation[] locations = srcFs.getFileBlockLocations(srcStat, 0, srcStat.getLen());
long blockSize = srcStat.getBlockSize();
long fileSize = srcStat.getLen();
int totalBlocks = locations.length;
// find the stripe number where the corrupted offset lies
long snum = corruptOffset / (stripeLength * blockSize);
long startOffset = snum * stripeLength * blockSize;
long corruptBlockInStripe = (corruptOffset - startOffset)/blockSize;
long corruptBlockSize = Math.min(blockSize, fileSize - startOffset);
LOG.info("Start offset of relevent stripe = " + startOffset +
" corruptBlockInStripe " + corruptBlockInStripe);
// open file descriptors to read all good blocks of the file
FSDataInputStream[] instmp = new FSDataInputStream[stripeLength];
int numLength = 0;
for (int i = 0; i < stripeLength; i++) {
if (i == corruptBlockInStripe) {
continue; // do not open corrupt block
}
if (startOffset + i * blockSize >= fileSize) {
LOG.info("Stop offset of relevent stripe = " +
startOffset + i * blockSize);
break;
}
instmp[numLength] = srcFs.open(srcPath);
instmp[numLength].seek(startOffset + i * blockSize);
numLength++;
}
// create array of inputstream, allocate one extra slot for
// parity file. numLength could be smaller than stripeLength
// if we are processing the last partial stripe on a file.
numLength += 1;
FSDataInputStream[] ins = new FSDataInputStream[numLength];
for (int i = 0; i < numLength-1; i++) {
ins[i] = instmp[i];
}
LOG.info("Decompose a total of " + numLength + " blocks.");
// open and seek to the appropriate offset in parity file.
Path parityFile = new Path(destPathPrefix, makeRelative(srcPath));
FileSystem parityFs = parityFile.getFileSystem(conf);
LOG.info("Parity file for " + srcPath + " is " + parityFile);
ins[numLength-1] = parityFs.open(parityFile);
ins[numLength-1].seek(snum * blockSize);
LOG.info("Parity file " + parityFile +
" seeking to relevent block at offset " +
ins[numLength-1].getPos());
// create a temporary filename in the source filesystem
// do not overwrite an existing tmp file. Make it fail for now.
// We need to generate a unique name for this tmp file later on.
Path tmpFile = null;
FSDataOutputStream fout = null;
int retry = 5;
while (fout == null) {
try {
tmpFile = new Path("/tmp/dhruba/" + rand.nextInt());
fout = parityFs.create(tmpFile, false);
} catch (IOException e) {
if (--retry <= 0) {
LOG.info("Unable to create temporary file " + tmpFile +
" Aborting....");
throw e;
}
LOG.info("Unable to create temporary file " + tmpFile +
" Retrying....");
}
}
LOG.info("Created recovered block file " + tmpFile);
// buffers for generating parity bits
int bufSize = 5 * 1024 * 1024; // 5 MB
byte[] bufs = new byte[bufSize];
byte[] xor = new byte[bufSize];
generateParity(ins, fout, corruptBlockSize, bufs, xor, null);
// close all files
fout.close();
for (int i = 0; i < ins.length; i++) {
ins[i].close();
}
// Now, reopen the source file and the recovered block file
// and copy all relevant data to new file
Path recoveredPath = new Path(destPathPrefix, makeRelative(srcPath));
recoveredPath = new Path(recoveredPath + ".recovered");
LOG.info("Creating recovered file " + recoveredPath);
FSDataInputStream sin = srcFs.open(srcPath);
FSDataOutputStream out = parityFs.create(recoveredPath, false,
conf.getInt("io.file.buffer.size", 64 * 1024),
srcStat.getReplication(),
srcStat.getBlockSize());
FSDataInputStream bin = parityFs.open(tmpFile);
long recoveredSize = 0;
// copy all the good blocks (up to the corruption)
// from the source file to the output file
long remaining = corruptOffset / blockSize * blockSize;
while (remaining > 0) {
int toRead = (int)Math.min(remaining, bufSize);
sin.readFully(bufs, 0, toRead);
out.write(bufs, 0, toRead);
remaining -= toRead;
recoveredSize += toRead;
}
LOG.info("Copied upto " + recoveredSize + " from src file. ");
// copy recovered block to output file
remaining = corruptBlockSize;
while (recoveredSize < fileSize &&
remaining > 0) {
int toRead = (int)Math.min(remaining, bufSize);
bin.readFully(bufs, 0, toRead);
out.write(bufs, 0, toRead);
remaining -= toRead;
recoveredSize += toRead;
}
LOG.info("Copied upto " + recoveredSize + " from recovered-block file. ");
// skip bad block in src file
if (recoveredSize < fileSize) {
sin.seek(sin.getPos() + corruptBlockSize);
}
// copy remaining good data from src file to output file
while (recoveredSize < fileSize) {
int toRead = (int)Math.min(fileSize - recoveredSize, bufSize);
sin.readFully(bufs, 0, toRead);
out.write(bufs, 0, toRead);
recoveredSize += toRead;
}
out.close();
LOG.info("Completed writing " + recoveredSize + " bytes into " +
recoveredPath);
sin.close();
bin.close();
// delete the temporary block file that was created.
parityFs.delete(tmpFile, false);
LOG.info("Deleted temporary file " + tmpFile);
// copy the meta information from source path to the newly created
// recovered path
copyMetaInformation(parityFs, srcStat, recoveredPath);
return recoveredPath;
}
/**
* Periodically delete orphaned parity files.
*/
class PurgeMonitor implements Runnable {
public void run() {
while (running) {
try {
doPurge();
} catch (Exception e) {
LOG.error(StringUtils.stringifyException(e));
} finally {
LOG.info("Purge parity files thread continuing to run...");
}
}
}
/**
 * Delete orphaned files. This is done in a separate thread so as not
 * to burden the TriggerMonitor with scanning the
 * destination directories.
 */
private void doPurge() throws IOException, InterruptedException {
PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
long prevExec = 0;
while (running) {
// The config may be reloaded by the TriggerMonitor.
// This thread uses whatever config is currently active.
while(now() < prevExec + configMgr.getPeriodicity()){
Thread.sleep(SLEEP_TIME);
}
prevExec = now();
// fetch all categories
Collection<PolicyList> all = configMgr.getAllPolicies();
// sort all policies by reverse lexicographical order. This is
// needed to make the nearest policy take precedence.
PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
Arrays.sort(sorted, lexi);
// paths we have processed so far
Set<Path> processed = new HashSet<Path>();
for (PolicyList category : sorted) {
for (PolicyInfo info: category.getAll()) {
try {
// expand destination prefix path
String destinationPrefix = getDestinationPath(conf, info);
Path destp = new Path(destinationPrefix.trim());
FileSystem destFs = FileSystem.get(destp.toUri(), conf);
destp = destp.makeQualified(destFs);
destinationPrefix = destp.toUri().getPath(); // strip host:port
destp = new Path(destp, makeRelative(info.getSrcPath()));
// if this destination path has already been processed as part
// of another policy, then nothing more to do
if (processed.contains(destp)) {
LOG.info("Obsolete parity files for policy " +
info.getName() + " has already been procesed.");
continue;
}
FileSystem srcFs = info.getSrcPath().getFileSystem(conf);
FileStatus stat = null;
try {
stat = destFs.getFileStatus(destp);
} catch (FileNotFoundException e) {
// do nothing, leave stat = null;
}
if (stat != null) {
LOG.info("Purging obsolete parity files for policy " +
info.getName() + " " + destp);
recursePurge(srcFs, destFs, destinationPrefix, stat);
}
// this destination path has already been processed
processed.add(destp);
} catch (Exception e) {
LOG.warn("Ignoring Exception while processing policy " +
info.getName() + " " +
StringUtils.stringifyException(e));
}
}
}
}
}
/**
 * The destPrefix is the absolute pathname of the destinationPath
 * specified in the policy (without the host:port), e.g. a parity file
 * /raid/user/warehouse/foo with destPrefix /raid corresponds to the
 * source /user/warehouse/foo; if that source no longer exists, the
 * parity file is purged.
 */
private void recursePurge(FileSystem srcFs, FileSystem destFs,
String destPrefix, FileStatus dest)
throws IOException {
Path destPath = dest.getPath(); // pathname, no host:port
String destStr = destPath.toUri().getPath();
LOG.debug("Checking " + destPath + " prefix " + destPrefix);
// Verify the destPrefix is a prefix of the destPath
if (!destStr.startsWith(destPrefix)) {
LOG.error("Destination path " + destStr + " should have " +
destPrefix + " as its prefix.");
return;
}
String src = destStr.substring(destPrefix.length()); // startsWith was verified above
// if the source path does not exist, then delete the
// destination path as well
Path srcPath = new Path(src);
if (!srcFs.exists(srcPath)) {
boolean done = destFs.delete(destPath, false);
if (done) {
LOG.info("Purged path " + destPath );
} else {
LOG.info("Unable to purge path " + destPath );
}
return;
}
if (!dest.isDir()) {
return;
}
FileStatus[] files = destFs.listStatus(destPath);
if (files != null) {
for (FileStatus one:files) {
recursePurge(srcFs, destFs, destPrefix, one);
}
}
}
}
/**
 * If the config file has an entry for hdfs.raid.locations, then that
 * overrides the destination path specified in the raid policy file
 */
static private String getDestinationPath(Configuration conf, PolicyInfo info) {
String locs = conf.get("hdfs.raid.locations");
if (locs != null) {
return locs;
}
locs = info.getDestinationPath();
if (locs == null) {
return DEFAULT_RAID_LOCATION;
}
return locs;
}
/**
 * If the config file has an entry for hdfs.raid.stripeLength, then that
 * overrides the stripe length specified in the raid policy file
 */
static private int getStripeLength(Configuration conf, PolicyInfo info)
throws IOException {
int len = conf.getInt("hdfs.raid.stripeLength", 0);
if (len != 0) {
return len;
}
String str = info.getProperty("stripeLength");
if (str == null) {
String msg = "stripeLength is not defined in the policy or in" +
" hdfs.raid.stripeLength. Using the default " +
DEFAULT_STRIPE_LENGTH;
LOG.info(msg);
return DEFAULT_STRIPE_LENGTH;
}
return Integer.parseInt(str);
}
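// Both lookups above honor a cluster-wide override from the configuration.
// An illustrative sketch of forcing them (the values are made up):
//
//   Configuration conf = new Configuration();
//   conf.set("hdfs.raid.locations", "/raid");   // overrides the policy destination
//   conf.setInt("hdfs.raid.stripeLength", 10);  // overrides the policy stripeLength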
/**
 * Copy the file owner, modtime, etc. from srcPath to the recovered path.
 * It is possible that we might have to retrieve file permissions,
 * quotas, etc. too in future.
 */
static private void copyMetaInformation(FileSystem fs, FileStatus stat,
Path recoveredPath)
throws IOException {
fs.setOwner(recoveredPath, stat.getOwner(), stat.getGroup());
fs.setPermission(recoveredPath, stat.getPermission());
fs.setTimes(recoveredPath, stat.getModificationTime(), stat.getAccessTime());
}
/**
* Returns current time.
*/
static long now() {
return System.currentTimeMillis();
}
/**
 * Make an absolute path relative by stripping the leading /,
 * e.g. /user/foo/bar becomes user/foo/bar
 */
static private Path makeRelative(Path path) {
if (!path.isAbsolute()) {
return path;
}
String p = path.toUri().getPath();
String relative = p.substring(1);
return new Path(relative);
}
private static void printUsage() {
System.err.println("Usage: java RaidNode ");
}
private static StartupOption parseArguments(String args[]) {
int argsLen = (args == null) ? 0 : args.length;
StartupOption startOpt = StartupOption.REGULAR;
for(int i=0; i < argsLen; i++) {
String cmd = args[i]; // We have to parse command line args in future.
}
return startOpt;
}
/**
* Convert command line options to configuration parameters
*/
private static void setStartupOption(Configuration conf, StartupOption opt) {
conf.set("fs.raidnode.startup", opt.toString());
}
/**
* Create an instance of the RaidNode
*/
public static RaidNode createRaidNode(String argv[],
Configuration conf) throws IOException {
if (conf == null) {
conf = new Configuration();
}
StartupOption startOpt = parseArguments(argv);
if (startOpt == null) {
printUsage();
return null;
}
setStartupOption(conf, startOpt);
RaidNode node = new RaidNode(conf);
return node;
}
/**
 * Start the RaidNode daemon from the command line.
 */
public static void main(String argv[]) throws Exception {
try {
StringUtils.startupShutdownMessage(RaidNode.class, argv, LOG);
RaidNode raid = createRaidNode(argv, null);
if (raid != null) {
raid.join();
}
} catch (Throwable e) {
LOG.error(StringUtils.stringifyException(e));
System.exit(-1);
}
}
}