blob: 985774bcdcccabc08238cda7c47bf9bc70edf9e1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.raid.Decoder;
import org.apache.hadoop.raid.RaidNode;
import org.apache.hadoop.raid.ReedSolomonDecoder;
import org.apache.hadoop.raid.XORDecoder;
import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;
import org.apache.hadoop.util.ReflectionUtils;
/**
* This is an implementation of the Hadoop RAID Filesystem. This FileSystem
* wraps an instance of the DistributedFileSystem.
* If a file is corrupted, this FileSystem uses the parity blocks to
* regenerate the bad block.
*/
public class DistributedRaidFileSystem extends FilterFileSystem {
// these are alternate locations that can be used for read-only access
DecodeInfo[] alternates;
Configuration conf;
int stripeLength;
DistributedRaidFileSystem() throws IOException {
}
DistributedRaidFileSystem(FileSystem fs) throws IOException {
super(fs);
alternates = null;
stripeLength = 0;
}
// Information required for decoding a source file
static private class DecodeInfo {
final Path destPath;
final ErasureCodeType type;
final Configuration conf;
final int stripeLength;
private DecodeInfo(Configuration conf, ErasureCodeType type, Path destPath) {
this.conf = conf;
this.type = type;
this.destPath = destPath;
this.stripeLength = RaidNode.getStripeLength(conf);
}
Decoder createDecoder() {
if (this.type == ErasureCodeType.XOR) {
return new XORDecoder(conf, stripeLength);
} else if (this.type == ErasureCodeType.RS) {
return new ReedSolomonDecoder(conf, stripeLength,
RaidNode.rsParityLength(conf));
}
return null;
}
}
/* Initialize a Raid FileSystem
*/
public void initialize(URI name, Configuration conf) throws IOException {
this.conf = conf;
Class<?> clazz = conf.getClass("fs.raid.underlyingfs.impl",
DistributedFileSystem.class);
if (clazz == null) {
throw new IOException("No FileSystem for fs.raid.underlyingfs.impl.");
}
this.fs = (FileSystem)ReflectionUtils.newInstance(clazz, null);
super.initialize(name, conf);
// find stripe length configured
stripeLength = RaidNode.getStripeLength(conf);
if (stripeLength == 0) {
LOG.info("dfs.raid.stripeLength is incorrectly defined to be " +
stripeLength + " Ignoring...");
return;
}
// Put XOR and RS in alternates
alternates= new DecodeInfo[2];
Path xorPath = RaidNode.xorDestinationPath(conf, fs);
alternates[0] = new DecodeInfo(conf, ErasureCodeType.XOR, xorPath);
Path rsPath = RaidNode.rsDestinationPath(conf, fs);
alternates[1] = new DecodeInfo(conf, ErasureCodeType.RS, rsPath);
}
/*
* Returns the underlying filesystem
*/
public FileSystem getFileSystem() throws IOException {
return fs;
}
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
ExtFSDataInputStream fd = new ExtFSDataInputStream(conf, this, alternates, f,
stripeLength, bufferSize);
return fd;
}
public void close() throws IOException {
if (fs != null) {
try {
fs.close();
} catch(IOException ie) {
//this might already be closed, ignore
}
}
super.close();
}
/**
* Layered filesystem input stream. This input stream tries reading
* from alternate locations if it encoumters read errors in the primary location.
*/
private static class ExtFSDataInputStream extends FSDataInputStream {
private static class UnderlyingBlock {
// File that holds this block. Need not be the same as outer file.
public Path path;
// Offset within path where this block starts.
public long actualFileOffset;
// Offset within the outer file where this block starts.
public long originalFileOffset;
// Length of the block (length <= blk sz of outer file).
public long length;
public UnderlyingBlock(Path path, long actualFileOffset,
long originalFileOffset, long length) {
this.path = path;
this.actualFileOffset = actualFileOffset;
this.originalFileOffset = originalFileOffset;
this.length = length;
}
}
/**
* Create an input stream that wraps all the reads/positions/seeking.
*/
private static class ExtFsInputStream extends FSInputStream {
// Extents of "good" underlying data that can be read.
private UnderlyingBlock[] underlyingBlocks;
private long currentOffset;
private FSDataInputStream currentStream;
private UnderlyingBlock currentBlock;
private byte[] oneBytebuff = new byte[1];
private int nextLocation;
private DistributedRaidFileSystem lfs;
private Path path;
private FileStatus stat;
private final DecodeInfo[] alternates;
private final int buffersize;
private final Configuration conf;
private final int stripeLength;
ExtFsInputStream(Configuration conf, DistributedRaidFileSystem lfs,
DecodeInfo[] alternates, Path path, int stripeLength, int buffersize)
throws IOException {
this.path = path;
this.nextLocation = 0;
// Construct array of blocks in file.
this.stat = lfs.getFileStatus(path);
long numBlocks = (this.stat.getLen() % this.stat.getBlockSize() == 0) ?
this.stat.getLen() / this.stat.getBlockSize() :
1 + this.stat.getLen() / this.stat.getBlockSize();
this.underlyingBlocks = new UnderlyingBlock[(int)numBlocks];
for (int i = 0; i < numBlocks; i++) {
long actualFileOffset = i * stat.getBlockSize();
long originalFileOffset = i * stat.getBlockSize();
long length = Math.min(
stat.getBlockSize(), stat.getLen() - originalFileOffset);
this.underlyingBlocks[i] = new UnderlyingBlock(
path, actualFileOffset, originalFileOffset, length);
}
this.currentOffset = 0;
this.currentBlock = null;
this.alternates = alternates;
this.buffersize = buffersize;
this.conf = conf;
this.lfs = lfs;
this.stripeLength = stripeLength;
// Open a stream to the first block.
openCurrentStream();
}
private void closeCurrentStream() throws IOException {
if (currentStream != null) {
currentStream.close();
currentStream = null;
}
}
/**
* Open a stream to the file containing the current block
* and seek to the appropriate offset
*/
private void openCurrentStream() throws IOException {
int blockIdx = (int)(currentOffset/stat.getBlockSize());
UnderlyingBlock block = underlyingBlocks[blockIdx];
// If the current path is the same as we want.
if (currentBlock == block ||
currentBlock != null && currentBlock.path == block.path) {
// If we have a valid stream, nothing to do.
if (currentStream != null) {
currentBlock = block;
return;
}
} else {
closeCurrentStream();
}
currentBlock = block;
currentStream = lfs.fs.open(currentBlock.path, buffersize);
long offset = block.actualFileOffset +
(currentOffset - block.originalFileOffset);
currentStream.seek(offset);
}
/**
* Returns the number of bytes available in the current block.
*/
private int blockAvailable() {
return (int) (currentBlock.length -
(currentOffset - currentBlock.originalFileOffset));
}
@Override
public synchronized int available() throws IOException {
// Application should not assume that any bytes are buffered here.
nextLocation = 0;
return Math.min(blockAvailable(), currentStream.available());
}
@Override
public synchronized void close() throws IOException {
closeCurrentStream();
super.close();
}
@Override
public boolean markSupported() { return false; }
@Override
public void mark(int readLimit) {
// Mark and reset are not supported.
nextLocation = 0;
}
@Override
public void reset() throws IOException {
// Mark and reset are not supported.
nextLocation = 0;
}
@Override
public synchronized int read() throws IOException {
int value = read(oneBytebuff);
if (value < 0) {
return value;
} else {
return oneBytebuff[0];
}
}
@Override
public synchronized int read(byte[] b) throws IOException {
int value = read(b, 0, b.length);
nextLocation = 0;
return value;
}
@Override
public synchronized int read(byte[] b, int offset, int len)
throws IOException {
while (true) {
openCurrentStream();
try{
int limit = Math.min(blockAvailable(), len);
int value = currentStream.read(b, offset, limit);
currentOffset += value;
nextLocation = 0;
return value;
} catch (BlockMissingException e) {
setAlternateLocations(e, currentOffset);
} catch (ChecksumException e) {
setAlternateLocations(e, currentOffset);
}
}
}
@Override
public synchronized int read(long position, byte[] b, int offset, int len)
throws IOException {
long oldPos = currentOffset;
seek(position);
try {
return read(b, offset, len);
} finally {
seek(oldPos);
}
}
@Override
public synchronized long skip(long n) throws IOException {
long skipped = 0;
while (skipped < n) {
int val = read();
if (val < 0) {
break;
}
skipped++;
}
nextLocation = 0;
return skipped;
}
@Override
public synchronized long getPos() throws IOException {
nextLocation = 0;
return currentOffset;
}
@Override
public synchronized void seek(long pos) throws IOException {
if (pos != currentOffset) {
closeCurrentStream();
currentOffset = pos;
openCurrentStream();
}
nextLocation = 0;
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
seek(targetPos);
boolean value = currentStream.seekToNewSource(currentStream.getPos());
nextLocation = 0;
return value;
}
/**
* position readable again.
*/
@Override
public void readFully(long pos, byte[] b, int offset, int length)
throws IOException {
long oldPos = currentOffset;
seek(pos);
try {
while (true) {
// This loop retries reading until successful. Unrecoverable errors
// cause exceptions.
// currentOffset is changed by read().
try {
while (length > 0) {
int n = read(b, offset, length);
if (n < 0) {
throw new IOException("Premature EOF");
}
offset += n;
length -= n;
}
nextLocation = 0;
return;
} catch (BlockMissingException e) {
setAlternateLocations(e, currentOffset);
} catch (ChecksumException e) {
setAlternateLocations(e, currentOffset);
}
}
} finally {
seek(oldPos);
}
}
@Override
public void readFully(long pos, byte[] b) throws IOException {
readFully(pos, b, 0, b.length);
nextLocation = 0;
}
/**
* Extract good block from RAID
* @throws IOException if all alternate locations are exhausted
*/
private void setAlternateLocations(IOException curexp, long offset)
throws IOException {
while (alternates != null && nextLocation < alternates.length) {
try {
int idx = nextLocation++;
// Start offset of block.
long corruptOffset =
(offset / stat.getBlockSize()) * stat.getBlockSize();
// Make sure we use DFS and not DistributedRaidFileSystem for unRaid.
Configuration clientConf = new Configuration(conf);
Class<?> clazz = conf.getClass("fs.raid.underlyingfs.impl",
DistributedFileSystem.class);
clientConf.set("fs.hdfs.impl", clazz.getName());
// Disable caching so that a previously cached RaidDfs is not used.
clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
Path npath = RaidNode.unRaidCorruptBlock(clientConf, path,
alternates[idx].destPath,
alternates[idx].createDecoder(),
stripeLength, corruptOffset);
if (npath == null)
continue;
try {
String outdir = conf.get("fs.raid.recoverylogdir");
if (outdir != null) {
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
java.util.Date date = new java.util.Date();
String fname = path.getName() + dateFormat.format(date) +
(new Random()).nextInt() + ".txt";
Path outputunraid = new Path(outdir, fname);
FileSystem fs = outputunraid.getFileSystem(conf);
FSDataOutputStream dout = fs.create(outputunraid);
PrintStream ps = new PrintStream(dout);
ps.println("Recovery attempt log");
ps.println("Source path : " + path );
ps.println("Alternate path : " + alternates[idx].destPath);
ps.println("Stripe lentgh : " + stripeLength);
ps.println("Corrupt offset : " + corruptOffset);
String output = (npath==null) ? "UNSUCCESSFUL" : npath.toString();
ps.println("Output from unRaid : " + output);
ps.close();
}
} catch (Exception exc) {
LOG.info("Error while creating recovery log: " + exc);
}
closeCurrentStream();
LOG.info("Using block at offset " + corruptOffset + " from " +
npath);
currentBlock.path = npath;
currentBlock.actualFileOffset = 0; // Single block in file.
// Dont change currentOffset, in case the user had done a seek?
openCurrentStream();
return;
} catch (Exception e) {
LOG.info("Error in using alternate path " + path + ". " + e +
" Ignoring...");
}
}
throw curexp;
}
/**
* The name of the file system that is immediately below the
* DistributedRaidFileSystem. This is specified by the
* configuration parameter called fs.raid.underlyingfs.impl.
* If this parameter is not specified in the configuration, then
* the default class DistributedFileSystem is returned.
* @param conf the configuration object
* @return the filesystem object immediately below DistributedRaidFileSystem
* @throws IOException if all alternate locations are exhausted
*/
private FileSystem getUnderlyingFileSystem(Configuration conf) {
Class<?> clazz = conf.getClass("fs.raid.underlyingfs.impl", DistributedFileSystem.class);
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
return fs;
}
}
/**
* constructor for ext input stream.
* @param fs the underlying filesystem
* @param p the path in the underlying file system
* @param buffersize the size of IO
* @throws IOException
*/
public ExtFSDataInputStream(Configuration conf, DistributedRaidFileSystem lfs,
DecodeInfo[] alternates, Path p, int stripeLength, int buffersize) throws IOException {
super(new ExtFsInputStream(conf, lfs, alternates, p, stripeLength, buffersize));
}
}
}