MAPREDUCE-2132. A command line option in RaidShell to fix blocks using raid
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1026137 13f79535-47bb-0310-9956-ffa450edef68
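The new RaidShell option can be exercised from the command line ("-recoverBlocks path1 path2...", see the RaidShell changes below) or driven programmatically through ToolRunner, mirroring what RaidShell.main() does. A minimal sketch (hypothetical class name and illustrative paths; assumes an HDFS configuration on the classpath):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.raid.RaidShell;
    import org.apache.hadoop.util.ToolRunner;

    public class RecoverBlocksExample {
      public static void main(String[] args) throws Exception {
        RaidShell shell = new RaidShell(new Configuration());
        // Illustrative paths; any raided HDFS files with corrupt blocks will do.
        int rc = ToolRunner.run(shell,
            new String[] {"-recoverBlocks", "/user/foo/file1", "/user/foo/file2"});
        shell.close();
        System.exit(rc);
      }
    }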
diff --git a/CHANGES.txt b/CHANGES.txt
index 79800b7..4ea8e40 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -138,6 +138,8 @@
MAPREDUCE-1819. RaidNode is now smarter in submitting Raid jobs. (Ramkumar
Vadali via schen)
+ MAPREDUCE-2132. A command line option in RaidShell to fix blocks using raid
+
OPTIMIZATIONS
MAPREDUCE-1354. Enhancements to JobTracker for better performance and
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java
new file mode 100644
index 0000000..364ac12
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/RaidDFSUtil.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+
+public abstract class RaidDFSUtil {
+ /**
+ * Returns the corrupt blocks in a file.
+ */
+ public static List<LocatedBlock> corruptBlocksInFile(
+ DistributedFileSystem dfs, String path, long offset, long length)
+ throws IOException {
+ List<LocatedBlock> corrupt = new LinkedList<LocatedBlock>();
+ LocatedBlocks locatedBlocks =
+ getBlockLocations(dfs, path, offset, length);
+ for (LocatedBlock b: locatedBlocks.getLocatedBlocks()) {
+ if (b.isCorrupt() ||
+ (b.getLocations().length == 0 && b.getBlockSize() > 0)) {
+ corrupt.add(b);
+ }
+ }
+ return corrupt;
+ }
+
+ public static LocatedBlocks getBlockLocations(
+ DistributedFileSystem dfs, String path, long offset, long length)
+ throws IOException {
+ return dfs.getClient().namenode.getBlockLocations(path, offset, length);
+ }
+}
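A minimal usage sketch for the helper above (hypothetical class name and path; assumes the default file system is a DistributedFileSystem):

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.RaidDFSUtil;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    public class CorruptBlockLister {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path file = new Path("/user/foo/data");  // illustrative path
        DistributedFileSystem dfs =
            (DistributedFileSystem) file.getFileSystem(conf);
        FileStatus stat = dfs.getFileStatus(file);
        // List the blocks the namenode reports as corrupt or unlocated.
        List<LocatedBlock> corrupt = RaidDFSUtil.corruptBlocksInFile(
            dfs, file.toUri().getPath(), 0, stat.getLen());
        for (LocatedBlock b : corrupt) {
          System.out.println("corrupt block " + b.getBlock() +
              " at offset " + b.getStartOffset());
        }
      }
    }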
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java
new file mode 100644
index 0000000..2790157
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.SocketOutputStream;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Reads a block from the disk and sends it to a recipient.
+ */
+public class RaidBlockSender implements java.io.Closeable, FSConstants {
+ public static final Log LOG = DataNode.LOG;
+ static final Log ClientTraceLog = DataNode.ClientTraceLog;
+
+ private Block block; // the block to read from
+
+ /** the replica to read from */
+ private final Replica replica = null;
+ /** The visible length of a replica. */
+ private final long replicaVisibleLength;
+
+ private InputStream blockIn; // data stream
+ private long blockInPosition = -1; // updated while using transferTo().
+ private DataInputStream checksumIn; // checksum datastream
+ private DataChecksum checksum; // checksum stream
+ private long offset; // starting position to read
+ private long endOffset; // ending position
+ private int bytesPerChecksum; // chunk size
+ private int checksumSize; // checksum size
+ private boolean corruptChecksumOk; // if a corrupted checksum is acceptable
+ private boolean chunkOffsetOK; // if need to send chunk offset
+ private long seqno; // sequence number of packet
+
+ private boolean transferToAllowed = true;
+ private boolean blockReadFully; //set when the whole block is read
+ private boolean verifyChecksum; //if true, checksum is verified while reading
+ private BlockTransferThrottler throttler;
+ private final String clientTraceFmt; // format of client trace log message
+
+ /**
+ * Minimum buffer used while sending data to clients. Used only if
+ * transferTo() is enabled. 64KB is not that large; it could be larger, but
+ * it is unclear whether a bigger buffer would improve things much.
+ */
+ private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
+ private volatile ChunkChecksum lastChunkChecksum = null;
+
+
+ public RaidBlockSender(Block block, long blockLength, long startOffset, long length,
+ boolean corruptChecksumOk, boolean chunkOffsetOK,
+ boolean verifyChecksum, boolean transferToAllowed,
+ DataInputStream metadataIn, InputStreamFactory streamFactory
+ ) throws IOException {
+ this(block, blockLength, startOffset, length,
+ corruptChecksumOk, chunkOffsetOK,
+ verifyChecksum, transferToAllowed,
+ metadataIn, streamFactory, null);
+ }
+
+ public RaidBlockSender(Block block, long blockLength, long startOffset, long length,
+ boolean corruptChecksumOk, boolean chunkOffsetOK,
+ boolean verifyChecksum, boolean transferToAllowed,
+ DataInputStream metadataIn, InputStreamFactory streamFactory,
+ String clientTraceFmt) throws IOException {
+ try {
+ this.block = block;
+ ChunkChecksum chunkChecksum = null;
+ this.chunkOffsetOK = chunkOffsetOK;
+ this.corruptChecksumOk = corruptChecksumOk;
+ this.verifyChecksum = verifyChecksum;
+ this.replicaVisibleLength = blockLength;
+ this.transferToAllowed = transferToAllowed;
+ this.clientTraceFmt = clientTraceFmt;
+
+ if ( !corruptChecksumOk || metadataIn != null) {
+ this.checksumIn = metadataIn;
+
+ // read and handle the common header here. For now just a version
+ BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
+ short version = header.getVersion();
+
+ if (version != FSDataset.METADATA_VERSION) {
+ LOG.warn("Wrong version (" + version + ") for metadata file for "
+ + block + " ignoring ...");
+ }
+ checksum = header.getChecksum();
+ } else {
+ LOG.warn("Could not find metadata file for " + block);
+ // This only decides the buffer size. Use BUFFER_SIZE?
+ checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
+ 16 * 1024);
+ }
+
+ /* If bytesPerChecksum is very large, then the metadata file
+ * is most likely corrupted. For now just truncate bytesPerChecksum to
+ * blockLength.
+ */
+ bytesPerChecksum = checksum.getBytesPerChecksum();
+ if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > replicaVisibleLength) {
+ checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
+ Math.max((int)replicaVisibleLength, 10*1024*1024));
+ bytesPerChecksum = checksum.getBytesPerChecksum();
+ }
+ checksumSize = checksum.getChecksumSize();
+
+ if (length < 0) {
+ length = replicaVisibleLength;
+ }
+
+ // end is either last byte on disk or the length for which we have a
+ // checksum
+ if (chunkChecksum != null) {
+ endOffset = chunkChecksum.getDataLength();
+ } else {
+ endOffset = blockLength;
+ }
+
+ if (startOffset < 0 || startOffset > endOffset
+ || (length + startOffset) > endOffset) {
+ String msg = " Offset " + startOffset + " and length " + length
+ + " don't match block " + block + " ( blockLen " + endOffset + " )";
+ LOG.warn("sendBlock() : " + msg);
+ throw new IOException(msg);
+ }
+
+ offset = (startOffset - (startOffset % bytesPerChecksum));
+ if (length >= 0) {
+ // Make sure endOffset points to the end of a checksummed chunk.
+ long tmpLen = startOffset + length;
+ if (tmpLen % bytesPerChecksum != 0) {
+ tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
+ }
+ if (tmpLen < endOffset) {
+ // will use on-disk checksum here since the end is a stable chunk
+ endOffset = tmpLen;
+ } else if (chunkChecksum != null) {
+ //in last chunk which is changing. flag that we need to use in-memory
+ // checksum
+ this.lastChunkChecksum = chunkChecksum;
+ }
+ }
+
+ // seek to the right offsets
+ if (offset > 0) {
+ long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
+ // note blockIn is created at the right offset below
+ if (checksumSkip > 0) {
+ // Should we use seek() for checksum file as well?
+ IOUtils.skipFully(checksumIn, checksumSkip);
+ }
+ }
+ seqno = 0;
+
+ blockIn = streamFactory.createStream(offset);
+ } catch (IOException ioe) {
+ IOUtils.closeStream(this);
+ IOUtils.closeStream(blockIn);
+ throw ioe;
+ }
+ }
+
+ /**
+ * close opened files.
+ */
+ public void close() throws IOException {
+ IOException ioe = null;
+ // close checksum file
+ if(checksumIn!=null) {
+ try {
+ checksumIn.close();
+ } catch (IOException e) {
+ ioe = e;
+ }
+ checksumIn = null;
+ }
+ // close data file
+ if(blockIn!=null) {
+ try {
+ blockIn.close();
+ } catch (IOException e) {
+ ioe = e;
+ }
+ blockIn = null;
+ }
+ // throw IOException if there is any
+ if(ioe!= null) {
+ throw ioe;
+ }
+ }
+
+ /**
+ * Converts an IOException (not subclasses) to SocketException.
+ * This is typically done to indicate to upper layers that the error
+ * was a socket error rather than often more serious exceptions like
+ * disk errors.
+ */
+ private static IOException ioeToSocketException(IOException ioe) {
+ if (ioe.getClass().equals(IOException.class)) {
+ // "se" could be a new class instead of SocketException.
+ IOException se = new SocketException("Original Exception : " + ioe);
+ se.initCause(ioe);
+ /* Change the stacktrace so that original trace is not truncated
+ * when printed.*/
+ se.setStackTrace(ioe.getStackTrace());
+ return se;
+ }
+ // otherwise just return the same exception.
+ return ioe;
+ }
+
+ /**
+ * Sends up to maxChunks chunks of data.
+ *
+ * When blockInPosition is >= 0, assumes 'out' is a
+ * {@link SocketOutputStream} and tries
+ * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
+ * send data (and updates blockInPosition).
+ */
+ private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out)
+ throws IOException {
+ // Sends multiple chunks in one packet with a single write().
+
+ int len = (int) Math.min(endOffset - offset,
+ (((long) bytesPerChecksum) * ((long) maxChunks)));
+ int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
+ int packetLen = len + numChunks*checksumSize + 4;
+ boolean lastDataPacket = offset + len == endOffset && len > 0;
+ pkt.clear();
+
+
+ PacketHeader header = new PacketHeader(
+ packetLen, offset, seqno, (len == 0), len);
+ header.putInBuffer(pkt);
+
+ int checksumOff = pkt.position();
+ int checksumLen = numChunks * checksumSize;
+ byte[] buf = pkt.array();
+
+ if (checksumSize > 0 && checksumIn != null) {
+ try {
+ checksumIn.readFully(buf, checksumOff, checksumLen);
+ } catch (IOException e) {
+ LOG.warn(" Could not read or failed to verify checksum for data" +
+ " at offset " + offset + " for block " + block + " got : "
+ + StringUtils.stringifyException(e));
+ IOUtils.closeStream(checksumIn);
+ checksumIn = null;
+ if (corruptChecksumOk) {
+ if (checksumLen > 0) {
+ // Just fill the checksum portion of the buffer with zeros.
+ Arrays.fill(buf, checksumOff, checksumOff + checksumLen, (byte) 0);
+ }
+ } else {
+ throw e;
+ }
+ }
+
+ // a write may still be in progress, so use the in-memory checksum for the last chunk
+ if (lastDataPacket && lastChunkChecksum != null) {
+ int start = checksumOff + checksumLen - checksumSize;
+ byte[] updatedChecksum = lastChunkChecksum.getChecksum();
+
+ if (updatedChecksum != null) {
+ System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
+ }
+ }
+ }
+
+ int dataOff = checksumOff + checksumLen;
+
+ if (blockInPosition < 0) {
+ //normal transfer
+ IOUtils.readFully(blockIn, buf, dataOff, len);
+
+ if (verifyChecksum) {
+ int dOff = dataOff;
+ int cOff = checksumOff;
+ int dLeft = len;
+
+ for (int i=0; i<numChunks; i++) {
+ checksum.reset();
+ int dLen = Math.min(dLeft, bytesPerChecksum);
+ checksum.update(buf, dOff, dLen);
+ if (!checksum.compare(buf, cOff)) {
+ long failedPos = offset + len -dLeft;
+ throw new ChecksumException("Checksum failed at " +
+ failedPos, failedPos);
+ }
+ dLeft -= dLen;
+ dOff += dLen;
+ cOff += checksumSize;
+ }
+ }
+ //writing is done below (mainly to handle IOException)
+ }
+
+ try {
+ if (blockInPosition >= 0) {
+ //use transferTo(). Checks on out and blockIn are already done.
+
+ SocketOutputStream sockOut = (SocketOutputStream)out;
+ //first write the packet
+ sockOut.write(buf, 0, dataOff);
+ // no need to flush. since we know out is not a buffered stream.
+
+ sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
+ blockInPosition, len);
+
+ blockInPosition += len;
+ } else {
+ // normal transfer
+ out.write(buf, 0, dataOff + len);
+ }
+
+ } catch (IOException e) {
+ /* exception while writing to the client (well, with transferTo(),
+ * it could also be while reading from the local file).
+ */
+ throw ioeToSocketException(e);
+ }
+
+ if (throttler != null) { // rebalancing so throttle
+ throttler.throttle(packetLen);
+ }
+
+ return len;
+ }
+
+ /**
+ * sendBlock() is used to read the block and its metadata and stream the data to
+ * either a client or to another datanode.
+ *
+ * @param out stream to which the block is written
+ * @param baseStream optional. if non-null, <code>out</code> is assumed to
+ * be a wrapper over this stream. This enables optimizations for
+ * sending the data, e.g.
+ * {@link SocketOutputStream#transferToFully(FileChannel,
+ * long, int)}.
+ * @param throttler for sending data.
+ * @return total bytes read, including CRC.
+ */
+ public long sendBlock(DataOutputStream out, OutputStream baseStream,
+ BlockTransferThrottler throttler) throws IOException {
+ if( out == null ) {
+ throw new IOException( "out stream is null" );
+ }
+ this.throttler = throttler;
+
+ long initialOffset = offset;
+ long totalRead = 0;
+ OutputStream streamForSendChunks = out;
+
+ final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
+ try {
+ try {
+ checksum.writeHeader(out);
+ if ( chunkOffsetOK ) {
+ out.writeLong( offset );
+ }
+ out.flush();
+ } catch (IOException e) { //socket error
+ throw ioeToSocketException(e);
+ }
+
+ int maxChunksPerPacket;
+ int pktSize = PacketHeader.PKT_HEADER_LEN;
+
+ if (transferToAllowed && !verifyChecksum &&
+ baseStream instanceof SocketOutputStream &&
+ blockIn instanceof FileInputStream) {
+
+ FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
+
+ // blockInPosition also indicates sendChunks() uses transferTo.
+ blockInPosition = fileChannel.position();
+ streamForSendChunks = baseStream;
+
+ // ensure a minimum buffer size.
+ maxChunksPerPacket = (Math.max(BUFFER_SIZE,
+ MIN_BUFFER_WITH_TRANSFERTO)
+ + bytesPerChecksum - 1)/bytesPerChecksum;
+
+ // allocate smaller buffer while using transferTo().
+ pktSize += checksumSize * maxChunksPerPacket;
+ } else {
+ maxChunksPerPacket = Math.max(1,
+ (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
+ pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
+ }
+
+ ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
+
+ while (endOffset > offset) {
+ long len = sendChunks(pktBuf, maxChunksPerPacket,
+ streamForSendChunks);
+ offset += len;
+ totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
+ checksumSize);
+ seqno++;
+ }
+ try {
+ // send an empty packet to mark the end of the block
+ sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);
+ out.flush();
+ } catch (IOException e) { //socket error
+ throw ioeToSocketException(e);
+ }
+ } finally {
+ if (clientTraceFmt != null) {
+ final long endTime = System.nanoTime();
+ ClientTraceLog.info(String.format(clientTraceFmt, totalRead, initialOffset, endTime - startTime));
+ }
+ close();
+ }
+
+ blockReadFully = initialOffset == 0 && offset >= replicaVisibleLength;
+
+ return totalRead;
+ }
+
+ boolean isBlockReadFully() {
+ return blockReadFully;
+ }
+
+ public static interface InputStreamFactory {
+ public InputStream createStream(long offset) throws IOException;
+ }
+
+ private static class BlockInputStreamFactory implements InputStreamFactory {
+ private final Block block;
+ private final FSDatasetInterface data;
+
+ private BlockInputStreamFactory(Block block, FSDatasetInterface data) {
+ this.block = block;
+ this.data = data;
+ }
+
+ @Override
+ public InputStream createStream(long offset) throws IOException {
+ return data.getBlockInputStream(block, offset);
+ }
+ }
+}
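RaidBlockSender reads block data through the InputStreamFactory interface declared near the end of the class; a file-backed implementation takes only a few lines. A sketch (hypothetical class name), analogous to the anonymous factory BlockFixer supplies below:

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.hdfs.server.datanode.RaidBlockSender;
    import org.apache.hadoop.io.IOUtils;

    public class LocalFileStreamFactory
        implements RaidBlockSender.InputStreamFactory {
      private final File blockFile;

      public LocalFileStreamFactory(File blockFile) {
        this.blockFile = blockFile;
      }

      @Override
      public InputStream createStream(long offset) throws IOException {
        InputStream in = new FileInputStream(blockFile);
        // Position the stream at the requested offset within the block file.
        IOUtils.skipFully(in, offset);
        return in;
      }
    }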
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
new file mode 100644
index 0000000..bd16ab7
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
@@ -0,0 +1,653 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.Random;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.channels.SocketChannel;
+
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.hdfs.server.datanode.RaidBlockSender;
+import org.apache.hadoop.io.Text;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.BlockMissingException;
+import org.apache.hadoop.hdfs.RaidDFSUtil;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.net.NetUtils;
+
+import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.raid.RaidUtils;
+
+/**
+ * This class fixes source file blocks using the parity file,
+ * and parity file blocks using the source file.
+ * It periodically fetches the list of corrupt files from the namenode,
+ * and figures out the location of the bad block by reading through
+ * the corrupt file.
+ */
+public class BlockFixer {
+ public static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.BlockFixer");
+ private java.util.HashMap<String, java.util.Date> history;
+ private int blockFixInterval = 60*1000; // 1min
+ private long numFilesFixed = 0;
+ private Configuration conf;
+ private String xorPrefix;
+ private XOREncoder xorEncoder;
+ private XORDecoder xorDecoder;
+
+ public BlockFixer(Configuration conf) throws IOException {
+ this.conf = conf;
+ history = new java.util.HashMap<String, java.util.Date>();
+ blockFixInterval = conf.getInt("raid.blockfix.interval",
+ blockFixInterval);
+ xorPrefix = RaidNode.getDestinationPath(conf).toUri().getPath();
+ int stripeLength = RaidNode.getStripeLength(conf);
+ xorEncoder = new XOREncoder(conf, stripeLength);
+ xorDecoder = new XORDecoder(conf, stripeLength);
+ }
+
+ public long filesFixed() {
+ return numFilesFixed;
+ }
+
+ void fixFile(Path srcPath) throws IOException {
+ if (RaidNode.isParityHarPartFile(srcPath)) {
+ processCorruptParityHarPartFile(srcPath);
+ return;
+ }
+
+ // The corrupted file is a XOR parity file
+ if (isXorParityFile(srcPath)) {
+ processCorruptParityFile(srcPath, xorEncoder);
+ return;
+ }
+
+ // The corrupted file is a source file
+
+ // Do we have a parity file for this file?
+ RaidNode.ParityFilePair ppair = null;
+ Decoder decoder = null;
+ Path destPath = null;
+ try {
+ destPath = RaidNode.getDestinationPath(conf);
+ ppair = RaidNode.getParityFile(destPath, srcPath, conf);
+ if (ppair != null) {
+ decoder = xorDecoder;
+ }
+ } catch (FileNotFoundException e) {
+ }
+ // If we have a parity file, process the file and fix it.
+ if (ppair != null) {
+ processCorruptFile(srcPath, destPath, decoder);
+ }
+ }
+
+ /**
+ * We maintain history of fixed files because a fixed file may appear in
+ * the list of corrupt files if we loop around too quickly.
+ * This function removes the old items in the history so that we can
+ * recognize files that have actually become corrupt since being fixed.
+ */
+ void purgeHistory() {
+ // Default history interval is 1 hour.
+ long historyInterval = conf.getLong(
+ "raid.blockfix.history.interval", 3600*1000);
+ java.util.Date cutOff = new java.util.Date(
+ System.currentTimeMillis()-historyInterval);
+ List<String> toRemove = new java.util.ArrayList<String>();
+
+ for (String key: history.keySet()) {
+ java.util.Date item = history.get(key);
+ if (item.before(cutOff)) {
+ toRemove.add(key);
+ }
+ }
+ for (String key: toRemove) {
+ LOG.info("Removing " + key + " from history");
+ history.remove(key);
+ }
+ }
+
+ /**
+ * @return A list of corrupt files as obtained from the namenode
+ */
+ List<Path> getCorruptFiles() throws IOException {
+ DistributedFileSystem dfs = getDFS(new Path("/"));
+
+ // TODO: need an RPC here.
+ // FileStatus[] files = dfs.getClient().namenode.getCorruptFiles();
+ FileStatus[] files = new FileStatus[0];
+ List<Path> corruptFiles = new LinkedList<Path>();
+ for (FileStatus f: files) {
+ Path p = f.getPath();
+ if (!history.containsKey(p.toString())) {
+ corruptFiles.add(p);
+ }
+ }
+ RaidUtils.filterTrash(conf, corruptFiles);
+ return corruptFiles;
+ }
+
+ /**
+ * Sorts source files ahead of parity files.
+ */
+ void sortCorruptFiles(List<Path> files) {
+ // TODO: We should first fix the files that have lost more blocks
+ Comparator<Path> comp = new Comparator<Path>() {
+ public int compare(Path p1, Path p2) {
+ if (isXorParityFile(p2)) {
+ // If p2 is a parity file, p1 is smaller.
+ return -1;
+ }
+ if (isXorParityFile(p1)) {
+ // If p1 is a parity file, p2 is smaller.
+ return 1;
+ }
+ // If both are source files, they are equal.
+ return 0;
+ }
+ };
+ Collections.sort(files, comp);
+ }
+
+
+ /**
+ * Returns a DistributedFileSystem hosting the path supplied.
+ */
+ private DistributedFileSystem getDFS(Path p) throws IOException {
+ return (DistributedFileSystem) p.getFileSystem(conf);
+ }
+
+ /**
+ * Reads through a corrupt source file fixing corrupt blocks on the way.
+ * @param srcPath Path identifying the corrupt file.
+ * @throws IOException
+ */
+ void processCorruptFile(Path srcPath, Path destPath, Decoder decoder)
+ throws IOException {
+ LOG.info("Processing corrupt file " + srcPath);
+
+ DistributedFileSystem srcFs = getDFS(srcPath);
+ FileStatus srcStat = srcFs.getFileStatus(srcPath);
+ long blockSize = srcStat.getBlockSize();
+ long srcFileSize = srcStat.getLen();
+ String uriPath = srcPath.toUri().getPath();
+
+ int numBlocksFixed = 0;
+ List<LocatedBlock> corrupt =
+ RaidDFSUtil.corruptBlocksInFile(srcFs, uriPath, 0, srcFileSize);
+ for (LocatedBlock lb: corrupt) {
+ Block corruptBlock = lb.getBlock();
+ long corruptOffset = lb.getStartOffset();
+
+ LOG.info("Found corrupt block " + corruptBlock +
+ ", offset " + corruptOffset);
+
+ final long blockContentsSize =
+ Math.min(blockSize, srcFileSize - corruptOffset);
+ File localBlockFile =
+ File.createTempFile(corruptBlock.getBlockName(), ".tmp");
+ localBlockFile.deleteOnExit();
+
+ try {
+ RaidNode.ParityFilePair parityPair = RaidNode.getParityFile(
+ destPath, srcPath, conf);
+
+ decoder.recoverBlockToFile(srcFs, srcPath, parityPair.getFileSystem(),
+ parityPair.getPath(), blockSize, corruptOffset, localBlockFile,
+ blockContentsSize);
+
+ // We have the contents of the block; send them.
+ DatanodeInfo datanode = chooseDatanode(lb.getLocations());
+ computeMetadataAndSendFixedBlock(
+ datanode, localBlockFile, lb, blockContentsSize);
+ numBlocksFixed++;
+
+ LOG.info("Adding " + srcPath + " to history");
+ history.put(srcPath.toString(), new java.util.Date());
+ } finally {
+ localBlockFile.delete();
+ }
+ }
+ LOG.info("Fixed " + numBlocksFixed + " blocks in " + srcPath);
+ numFilesFixed++;
+ }
+
+ boolean isXorParityFile(Path p) {
+ String pathStr = p.toUri().getPath();
+ if (pathStr.contains(RaidNode.HAR_SUFFIX)) {
+ return false;
+ }
+ return pathStr.startsWith(xorPrefix);
+ }
+
+ /**
+ * Reads through a parity file, fixing corrupt blocks on the way.
+ * This function uses the corresponding source file to regenerate parity
+ * file blocks.
+ */
+ void processCorruptParityFile(Path parityPath, Encoder encoder)
+ throws IOException {
+ LOG.info("Processing corrupt file " + parityPath);
+ Path srcPath = sourcePathFromParityPath(parityPath);
+ if (srcPath == null) {
+ LOG.warn("Unusable parity file " + parityPath);
+ return;
+ }
+
+ DistributedFileSystem parityFs = getDFS(parityPath);
+ FileStatus parityStat = parityFs.getFileStatus(parityPath);
+ long blockSize = parityStat.getBlockSize();
+ long parityFileSize = parityStat.getLen();
+ FileStatus srcStat = getDFS(srcPath).getFileStatus(srcPath);
+ long srcFileSize = srcStat.getLen();
+
+ // Check timestamp.
+ if (srcStat.getModificationTime() != parityStat.getModificationTime()) {
+ LOG.info("Mismatching timestamp for " + srcPath + " and " + parityPath +
+ ", moving on...");
+ return;
+ }
+
+ String uriPath = parityPath.toUri().getPath();
+ int numBlocksFixed = 0;
+ List<LocatedBlock> corrupt = RaidDFSUtil.corruptBlocksInFile(
+ parityFs, uriPath, 0, parityFileSize);
+ for (LocatedBlock lb: corrupt) {
+ Block corruptBlock = lb.getBlock();
+ long corruptOffset = lb.getStartOffset();
+
+ LOG.info("Found corrupt block " + corruptBlock +
+ ", offset " + corruptOffset);
+
+ File localBlockFile =
+ File.createTempFile(corruptBlock.getBlockName(), ".tmp");
+ localBlockFile.deleteOnExit();
+
+ try {
+ encoder.recoverParityBlockToFile(parityFs, srcPath, srcFileSize,
+ blockSize, parityPath, corruptOffset, localBlockFile);
+ // We have the contents of the block; send them.
+ DatanodeInfo datanode = chooseDatanode(lb.getLocations());
+ computeMetadataAndSendFixedBlock(
+ datanode, localBlockFile, lb, blockSize);
+
+ numBlocksFixed++;
+ LOG.info("Adding " + parityPath + " to history");
+ history.put(parityPath.toString(), new java.util.Date());
+ } finally {
+ localBlockFile.delete();
+ }
+ }
+ LOG.info("Fixed " + numBlocksFixed + " blocks in " + parityPath);
+ numFilesFixed++;
+ }
+
+ /**
+ * Reads through a parity HAR part file, fixing corrupt blocks on the way.
+ * A HAR block can contain many file blocks, as long as the HAR part file
+ * block size is a multiple of the file block size.
+ */
+ void processCorruptParityHarPartFile(Path partFile) throws IOException {
+ LOG.info("Processing corrupt file " + partFile);
+ // Get some basic information.
+ DistributedFileSystem dfs = getDFS(partFile);
+ FileStatus partFileStat = dfs.getFileStatus(partFile);
+ long partFileSize = partFileStat.getLen();
+ long partFileBlockSize = partFileStat.getBlockSize();
+ LOG.info(partFile + " has block size " + partFileBlockSize);
+
+ // Find the path to the index file.
+ // Parity file HARs are only one level deep, so the index file is at the
+ // same level as the part file.
+ String harDirectory = partFile.toUri().getPath(); // Temporarily.
+ harDirectory =
+ harDirectory.substring(0, harDirectory.lastIndexOf(Path.SEPARATOR));
+ Path indexFile = new Path(harDirectory + "/" + HarIndex.indexFileName);
+ FileStatus indexStat = dfs.getFileStatus(indexFile);
+ // Parses through the HAR index file.
+ HarIndex harIndex = new HarIndex(dfs.open(indexFile), indexStat.getLen());
+
+ String uriPath = partFile.toUri().getPath();
+ int numBlocksFixed = 0;
+ List<LocatedBlock> corrupt = RaidDFSUtil.corruptBlocksInFile(
+ dfs, uriPath, 0, partFileSize);
+ for (LocatedBlock lb: corrupt) {
+ Block corruptBlock = lb.getBlock();
+ long corruptOffset = lb.getStartOffset();
+
+ File localBlockFile =
+ File.createTempFile(corruptBlock.getBlockName(), ".tmp");
+ localBlockFile.deleteOnExit();
+ processCorruptParityHarPartBlock(
+ dfs, partFile, corruptBlock, corruptOffset, partFileStat, harIndex,
+ localBlockFile);
+ // Now we have recovered the part file block locally, send it.
+ try {
+ DatanodeInfo datanode = chooseDatanode(lb.getLocations());
+ computeMetadataAndSendFixedBlock(datanode, localBlockFile,
+ lb, localBlockFile.length());
+ numBlocksFixed++;
+
+ LOG.info("Adding " + partFile + " to history");
+ history.put(partFile.toString(), new java.util.Date());
+ } finally {
+ localBlockFile.delete();
+ }
+ }
+ LOG.info("Fixed " + numBlocksFixed + " blocks in " + partFile);
+ numFilesFixed++;
+ }
+
+ /**
+ * This fixes a single part file block by recovering in sequence each
+ * parity block in the part file block.
+ */
+ private void processCorruptParityHarPartBlock(
+ FileSystem dfs, Path partFile, Block corruptBlock, long corruptOffset,
+ FileStatus partFileStat, HarIndex harIndex, File localBlockFile)
+ throws IOException {
+ String partName = partFile.toUri().getPath(); // Temporarily.
+ partName = partName.substring(1 + partName.lastIndexOf(Path.SEPARATOR));
+
+ OutputStream out = new FileOutputStream(localBlockFile);
+
+ try {
+ // A HAR part file block could map to several parity files. We need to
+ // use all of them to recover this block.
+ final long corruptEnd = Math.min(corruptOffset + partFileStat.getBlockSize(),
+ partFileStat.getLen());
+ for (long offset = corruptOffset; offset < corruptEnd; ) {
+ HarIndex.IndexEntry entry = harIndex.findEntry(partName, offset);
+ if (entry == null) {
+ String msg = "Corrupt index file has no matching index entry for " +
+ partName + ":" + offset;
+ LOG.warn(msg);
+ throw new IOException(msg);
+ }
+ Path parityFile = new Path(entry.fileName);
+ Encoder encoder;
+ if (isXorParityFile(parityFile)) {
+ encoder = xorEncoder;
+ } else {
+ String msg = "Could not figure out parity file correctly";
+ LOG.warn(msg);
+ throw new IOException(msg);
+ }
+ Path srcFile = sourcePathFromParityPath(parityFile);
+ FileStatus srcStat = dfs.getFileStatus(srcFile);
+ if (srcStat.getModificationTime() != entry.mtime) {
+ String msg = "Modification times of " + parityFile + " and " + srcFile +
+ " do not match.";
+ LOG.warn(msg);
+ throw new IOException(msg);
+ }
+ long corruptOffsetInParity = offset - entry.startOffset;
+ LOG.info(partFile + ":" + offset + " maps to " +
+ parityFile + ":" + corruptOffsetInParity +
+ " and will be recovered from " + srcFile);
+ encoder.recoverParityBlockToStream(dfs, srcFile, srcStat.getLen(),
+ srcStat.getBlockSize(), parityFile, corruptOffsetInParity, out);
+ // Finished recovery of one parity block. Since a parity block has the
+ // same size as a source block, we can move offset by source block size.
+ offset += srcStat.getBlockSize();
+ LOG.info("Recovered " + srcStat.getBlockSize() + " part file bytes ");
+ if (offset > corruptEnd) {
+ String msg =
+ "Recovered block spills across part file blocks. Cannot continue...";
+ throw new IOException(msg);
+ }
+ }
+ } finally {
+ out.close();
+ }
+ }
+
+ /**
+ * Chooses a datanode to send a recovered block to. The datanode is chosen at
+ * random from the live datanodes.
+ * @param locationsToAvoid locations to avoid.
+ * @return The chosen datanode's DatanodeInfo.
+ * @throws IOException
+ */
+ private DatanodeInfo chooseDatanode(DatanodeInfo[] locationsToAvoid)
+ throws IOException {
+ DistributedFileSystem dfs = getDFS(new Path("/"));
+ DatanodeInfo[] live = dfs.getClient().datanodeReport(
+ DatanodeReportType.LIVE);
+ LOG.info("Choosing a datanode from " + live.length +
+ " live nodes while avoiding " + locationsToAvoid.length);
+ Random rand = new Random();
+ DatanodeInfo chosen = null;
+ int maxAttempts = 1000;
+ for (int i = 0; i < maxAttempts && chosen == null; i++) {
+ int idx = rand.nextInt(live.length);
+ chosen = live[idx];
+ for (DatanodeInfo avoid: locationsToAvoid) {
+ if (chosen.name.equals(avoid.name)) {
+ LOG.info("Avoiding " + avoid.name);
+ chosen = null;
+ break;
+ }
+ }
+ }
+ if (chosen == null) {
+ throw new IOException("Could not choose datanode");
+ }
+ LOG.info("Choosing datanode " + chosen.name);
+ return chosen;
+ }
+
+ /**
+ * Reads data from the data stream provided and computes metadata.
+ */
+ static DataInputStream computeMetadata(
+ Configuration conf, InputStream dataStream) throws IOException {
+ ByteArrayOutputStream mdOutBase = new ByteArrayOutputStream(1024*1024);
+ DataOutputStream mdOut = new DataOutputStream(mdOutBase);
+
+ // First, write out the version.
+ mdOut.writeShort(FSDataset.METADATA_VERSION);
+
+ // Create a checksum generator and write out its header.
+ int bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512);
+ DataChecksum sum = DataChecksum.newDataChecksum(
+ DataChecksum.CHECKSUM_CRC32,
+ bytesPerChecksum);
+ sum.writeHeader(mdOut);
+
+ // Buffer to read in a chunk of data.
+ byte[] buf = new byte[bytesPerChecksum];
+ // Buffer to store the checksum bytes.
+ byte[] chk = new byte[sum.getChecksumSize()];
+
+ // Read data till we reach the end of the input stream.
+ int bytesSinceFlush = 0;
+ while (true) {
+ // Read some bytes.
+ int bytesRead = dataStream.read(
+ buf, bytesSinceFlush, bytesPerChecksum-bytesSinceFlush);
+ if (bytesRead == -1) {
+ if (bytesSinceFlush > 0) {
+ boolean reset = true;
+ sum.writeValue(chk, 0, reset); // This also resets the sum.
+ // Write the checksum to the stream.
+ mdOut.write(chk, 0, chk.length);
+ bytesSinceFlush = 0;
+ }
+ break;
+ }
+ // Update the checksum.
+ sum.update(buf, bytesSinceFlush, bytesRead);
+ bytesSinceFlush += bytesRead;
+
+ // Flush the checksum if necessary.
+ if (bytesSinceFlush == bytesPerChecksum) {
+ boolean reset = true;
+ sum.writeValue(chk, 0, reset); // This also resets the sum.
+ // Write the checksum to the stream.
+ mdOut.write(chk, 0, chk.length);
+ bytesSinceFlush = 0;
+ }
+ }
+
+ byte[] mdBytes = mdOutBase.toByteArray();
+ return new DataInputStream(new ByteArrayInputStream(mdBytes));
+ }
+
+ private void computeMetadataAndSendFixedBlock(
+ DatanodeInfo datanode,
+ File localBlockFile, LocatedBlock block, long blockSize
+ ) throws IOException {
+
+ LOG.info("Computing metadata");
+ InputStream blockContents = null;
+ DataInputStream blockMetadata = null;
+ try {
+ blockContents = new FileInputStream(localBlockFile);
+ blockMetadata = computeMetadata(conf, blockContents);
+ blockContents.close();
+ // Reopen
+ blockContents = new FileInputStream(localBlockFile);
+ sendFixedBlock(datanode, blockContents, blockMetadata, block, blockSize);
+ } finally {
+ if (blockContents != null) {
+ blockContents.close();
+ blockContents = null;
+ }
+ if (blockMetadata != null) {
+ blockMetadata.close();
+ blockMetadata = null;
+ }
+ }
+ }
+
+ /**
+ * Send a generated block to a datanode.
+ * @param datanode The datanode to send the block to.
+ * @param blockContents Stream with the block contents.
+ * @param metadataIn Stream with the block metadata.
+ * @param block LocatedBlock identifying the block to be sent.
+ * @param blockSize size of the block.
+ * @throws IOException
+ */
+ private void sendFixedBlock(
+ DatanodeInfo datanode,
+ final InputStream blockContents, DataInputStream metadataIn,
+ LocatedBlock block, long blockSize
+ ) throws IOException {
+ InetSocketAddress target = NetUtils.createSocketAddr(datanode.name);
+ Socket sock = SocketChannel.open().socket();
+
+ int readTimeout = conf.getInt(
+ "raid.blockfix.read.timeout", HdfsConstants.READ_TIMEOUT);
+ NetUtils.connect(sock, target, readTimeout);
+ sock.setSoTimeout(readTimeout);
+
+ int writeTimeout = conf.getInt(
+ "raid.blockfix.write.timeout", HdfsConstants.WRITE_TIMEOUT);
+
+ OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
+ DataOutputStream out = new DataOutputStream(
+ new BufferedOutputStream(baseStream, FSConstants.SMALL_BUFFER_SIZE));
+
+ boolean corruptChecksumOk = false;
+ boolean chunkOffsetOK = false;
+ boolean verifyChecksum = true;
+ boolean transferToAllowed = false;
+
+ try {
+ LOG.info("Sending block " + block.getBlock() +
+ " from " + sock.getLocalSocketAddress().toString() +
+ " to " + sock.getRemoteSocketAddress().toString() +
+ " " + blockSize + " bytes");
+ RaidBlockSender blockSender = new RaidBlockSender(
+ block.getBlock(), blockSize, 0, blockSize,
+ corruptChecksumOk, chunkOffsetOK, verifyChecksum, transferToAllowed,
+ metadataIn, new RaidBlockSender.InputStreamFactory() {
+ @Override
+ public InputStream createStream(long offset) throws IOException {
+ // we are passing 0 as the offset above, so we can safely ignore
+ // the offset passed
+ return blockContents;
+ }
+ });
+
+ DatanodeInfo[] nodes = new DatanodeInfo[]{datanode};
+ DataTransferProtocol.Sender.opWriteBlock(
+ out, block.getBlock(), 1,
+ DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_CREATE,
+ 0, blockSize, 0, "", null, nodes, block.getBlockToken());
+ blockSender.sendBlock(out, baseStream, null);
+
+ LOG.info("Sent block " + block.getBlock() + " to " + datanode.name);
+ } finally {
+ out.close();
+ }
+ }
+
+ Path sourcePathFromParityPath(Path parityPath) {
+ String parityPathStr = parityPath.toUri().getPath();
+ if (parityPathStr.startsWith(xorPrefix)) {
+ // Remove the prefix to get the source file.
+ String src = parityPathStr.replaceFirst(xorPrefix, "");
+ return new Path(src);
+ }
+ return null;
+ }
+}
+
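A minimal sketch of driving the fixer directly for one file (fixFile() is package-private, so the caller must live in org.apache.hadoop.raid; the class name and path are illustrative):

    package org.apache.hadoop.raid;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class BlockFixerExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        BlockFixer fixer = new BlockFixer(conf);
        // Re-create the corrupt blocks of one raided file from its parity data
        // (or of a parity file from its source), then report progress.
        fixer.fixFile(new Path("/user/foo/data"));
        System.out.println("files fixed: " + fixer.filesFixed());
      }
    }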
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/HarIndex.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/HarIndex.java
new file mode 100644
index 0000000..42233d7
--- /dev/null
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/HarIndex.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.List;
+import java.util.LinkedList;
+
+import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Represents the contents of a HAR index file. The HAR is assumed to
+ * comprise RAID parity files only, with no directories.
+ */
+public class HarIndex {
+ public static final String indexFileName = "_index";
+ private List<IndexEntry> entries = new LinkedList<IndexEntry>();
+
+ /**
+ * Represents information in a single line of the HAR index file.
+ */
+ public static class IndexEntry {
+ String fileName; // Name of the file in the part file.
+ long startOffset; // Start offset within the part file.
+ long length; // Length of this file within the part file.
+ long mtime; // Modification time of the file.
+ String partFileName; // Name of the part file.
+
+ IndexEntry(String fileName, long startOffset, long length,
+ long mtime, String partFileName) {
+ this.fileName = fileName;
+ this.startOffset = startOffset;
+ this.length = length;
+ this.mtime = mtime;
+ this.partFileName = partFileName;
+ }
+
+ public String toString() {
+ return "fileName=" + fileName +
+ ", startOffset=" + startOffset +
+ ", length=" + length +
+ ", mtime=" + mtime +
+ ", partFileName=" + partFileName;
+ }
+ }
+
+ /**
+ * Constructor that reads the contents of the index file.
+ * @param in An input stream to the index file.
+ * @param max The size of the index file.
+ * @throws IOException
+ */
+ public HarIndex(InputStream in, long max) throws IOException {
+ LineReader lineReader = new LineReader(in);
+ Text text = new Text();
+ long nread = 0;
+ while (nread < max) {
+ int n = lineReader.readLine(text);
+ nread += n;
+ String line = text.toString();
+ try {
+ parseLine(line);
+ } catch (UnsupportedEncodingException e) {
+ throw new IOException("UnsupportedEncodingException after reading " +
+ nread + " bytes");
+ }
+ }
+ }
+
+ /**
+ * Parses each line and extracts relevant information.
+ * @param line
+ * @throws UnsupportedEncodingException
+ */
+ void parseLine(String line) throws UnsupportedEncodingException {
+ String[] splits = line.split(" ");
+
+ boolean isDir = "dir".equals(splits[1]);
+ if (!isDir && splits.length >= 6) {
+ String name = URLDecoder.decode(splits[0], "UTF-8");
+ String partName = URLDecoder.decode(splits[2], "UTF-8");
+ long startIndex = Long.parseLong(splits[3]);
+ long length = Long.parseLong(splits[4]);
+ String[] newsplits = URLDecoder.decode(splits[5],"UTF-8").split(" ");
+ if (newsplits != null && newsplits.length >= 5) {
+ long mtime = Long.parseLong(newsplits[0]);
+ IndexEntry entry = new IndexEntry(
+ name, startIndex, length, mtime, partName);
+ entries.add(entry);
+ }
+ }
+ }
+
+ /**
+ * Finds the index entry corresponding to a HAR partFile at an offset.
+ * @param partName The name of the part file (part-*).
+ * @param partFileOffset The offset into the part file.
+ * @return The entry corresponding to partName:partFileOffset.
+ */
+ public IndexEntry findEntry(String partName, long partFileOffset) {
+ for (IndexEntry e: entries) {
+ boolean nameMatch = partName.equals(e.partFileName);
+ boolean inRange = (partFileOffset >= e.startOffset) &&
+ (partFileOffset < e.startOffset + e.length);
+ if (nameMatch && inRange) {
+ return e;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Finds the index entry corresponding to a file in the archive
+ */
+ public IndexEntry findEntryByFileName(String fileName) {
+ for (IndexEntry e: entries) {
+ if (fileName.equals(e.fileName)) {
+ return e;
+ }
+ }
+ return null;
+ }
+
+}
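For reference, here is how one index line (taken from the test data added below) maps to an IndexEntry; note that the '+'-separated properties field is URL-decoded before the modification time is read:

    // "%2Ff1 file part-0 0 1024 1282018141145+1282018140822+420+hadoop+hadoop"
    //   fileName     = "/f1"            (URL-decoded)
    //   partFileName = "part-0"
    //   startOffset  = 0
    //   length       = 1024
    //   mtime        = 1282018141145    (first '+'-separated property)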
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
index 90f8f3e..06b2686 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
@@ -30,6 +30,8 @@
import java.util.Random;
import java.util.Set;
import java.util.HashSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.lang.Thread;
import java.net.InetSocketAddress;
import java.net.URI;
@@ -79,6 +81,9 @@
public static final String DEFAULT_RAID_LOCATION = "/raid";
public static final String RAID_LOCATION_KEY = "hdfs.raid.locations";
public static final String HAR_SUFFIX = "_raid.har";
+ public static final Pattern PARITY_HAR_PARTFILE_PATTERN =
+ Pattern.compile(".*" + HAR_SUFFIX + "/part-.*");
+
/** RPC server */
private Server server;
@@ -1149,6 +1154,11 @@
return conf.getInt(STRIPE_LENGTH_KEY, DEFAULT_STRIPE_LENGTH);
}
+ static boolean isParityHarPartFile(Path p) {
+ Matcher m = PARITY_HAR_PARTFILE_PATTERN.matcher(p.toUri().getPath());
+ return m.matches();
+ }
+
/**
* Returns current time.
*/
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
index d93844a..28dbee2 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
@@ -48,9 +48,13 @@
* A {@link RaidShell} that allows browsing configured raid policies.
*/
public class RaidShell extends Configured implements Tool {
+ static {
+ Configuration.addDefaultResource("hdfs-default.xml");
+ Configuration.addDefaultResource("hdfs-site.xml");
+ }
public static final Log LOG = LogFactory.getLog( "org.apache.hadoop.RaidShell");
public RaidProtocol raidnode;
- final RaidProtocol rpcRaidnode;
+ RaidProtocol rpcRaidnode;
private UserGroupInformation ugi;
volatile boolean clientRunning = true;
private Configuration conf;
@@ -62,28 +66,21 @@
* configuration options.
* @throws IOException
*/
- public RaidShell() throws IOException {
- this(new Configuration());
- }
-
- /**
- * The RaidShell connects to the specified RaidNode and performs basic
- * configuration options.
- * @param conf The Hadoop configuration
- * @throws IOException
- */
public RaidShell(Configuration conf) throws IOException {
- this(conf, RaidNode.getAddress(conf));
+ super(conf);
+ this.conf = conf;
}
- public RaidShell(Configuration conf, InetSocketAddress address) throws IOException {
- super(conf);
+ void initializeRpc(Configuration conf, InetSocketAddress address) throws IOException {
this.ugi = UserGroupInformation.getCurrentUser();
-
this.rpcRaidnode = createRPCRaidnode(address, conf, ugi);
this.raidnode = createRaidnode(rpcRaidnode);
}
-
+
+ void initializeLocal(Configuration conf) throws IOException {
+ this.ugi = UserGroupInformation.getCurrentUser();
+ }
+
public static RaidProtocol createRaidnode(Configuration conf) throws IOException {
return createRaidnode(RaidNode.getAddress(conf), conf);
}
@@ -92,7 +89,6 @@
Configuration conf) throws IOException {
return createRaidnode(createRPCRaidnode(raidNodeAddr, conf,
UserGroupInformation.getCurrentUser()));
-
}
private static RaidProtocol createRPCRaidnode(InetSocketAddress raidNodeAddr,
@@ -153,13 +149,17 @@
System.err.println("Usage: java RaidShell" +
" [-showConfig]");
} else if ("-recover".equals(cmd)) {
- System.err.println("Usage: java CronShell" +
+ System.err.println("Usage: java RaidShell" +
" [-recover srcPath1 corruptOffset]");
+ } else if ("-recoverBlocks".equals(cmd)) {
+ System.err.println("Usage: java RaidShell" +
+ " [-recoverBlocks path1 path2...]");
} else {
System.err.println("Usage: java RaidShell");
System.err.println(" [-showConfig ]");
System.err.println(" [-help [cmd]]");
System.err.println(" [-recover srcPath1 corruptOffset]");
+ System.err.println(" [-recoverBlocks path1 path2...]");
System.err.println();
ToolRunner.printGenericCommandUsage(System.err);
}
@@ -195,9 +195,15 @@
try {
if ("-showConfig".equals(cmd)) {
+ initializeRpc(conf, RaidNode.getAddress(conf));
exitCode = showConfig(cmd, argv, i);
} else if ("-recover".equals(cmd)) {
+ initializeRpc(conf, RaidNode.getAddress(conf));
exitCode = recoverAndPrint(cmd, argv, i);
+ } else if ("-recoverBlocks".equals(cmd)) {
+ initializeLocal(conf);
+ recoverBlocks(argv, i);
+ exitCode = 0;
} else {
exitCode = -1;
System.err.println(cmd.substring(1) + ": Unknown command");
@@ -278,29 +284,37 @@
}
return exitCode;
}
-
+
+ public void recoverBlocks(String[] args, int startIndex)
+ throws IOException {
+ LOG.info("Recovering blocks for " + (args.length - startIndex) + " files");
+ BlockFixer fixer = new BlockFixer(conf);
+ for (int i = startIndex; i < args.length; i++) {
+ String path = args[i];
+ fixer.fixFile(new Path(path));
+ }
+ }
+
/**
* main() has some simple utility methods
*/
public static void main(String argv[]) throws Exception {
RaidShell shell = null;
try {
- shell = new RaidShell();
+ shell = new RaidShell(new Configuration());
+ int res = ToolRunner.run(shell, argv);
+ System.exit(res);
} catch (RPC.VersionMismatch v) {
System.err.println("Version Mismatch between client and server" +
"... command aborted.");
System.exit(-1);
} catch (IOException e) {
- System.err.println("Bad connection to RaidNode. command aborted.");
+ System.err.
+ println("Bad connection to RaidNode or NameNode. Command aborted.");
+ System.err.println(e.getMessage());
System.exit(-1);
- }
-
- int res;
- try {
- res = ToolRunner.run(shell, argv);
} finally {
shell.close();
}
- System.exit(res);
}
}
diff --git a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
index 6818168..bde24fe 100644
--- a/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
+++ b/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
@@ -22,12 +22,32 @@
import java.io.OutputStream;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.IOUtils;
public class RaidUtils {
+ /**
+ * Removes files matching the trash file pattern.
+ */
+ public static void filterTrash(Configuration conf, List<Path> files) {
+ // Remove files under Trash.
+ String trashPattern = conf.get("raid.blockfixer.trash.pattern",
+ "^/user/.*/\\.Trash.*");
+ for (Iterator<Path> it = files.iterator(); it.hasNext(); ) {
+ String pathStr = it.next().toString();
+ if (Pattern.matches(trashPattern, pathStr)) {
+ it.remove();
+ }
+ }
+ }
+
public static void readTillEnd(InputStream in, byte[] buf, boolean eofOK)
throws IOException {
int toRead = buf.length;
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java b/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
index 3da2f15..5d7a60a 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
@@ -142,7 +142,8 @@
private LocatedBlocks getBlockLocations(Path file, long length)
throws IOException {
DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
- return dfs.getClient().namenode.getBlockLocations(file.toString(), 0, length);
+ return RaidDFSUtil.getBlockLocations(
+ dfs, file.toUri().getPath(), 0, length);
}
private LocatedBlocks getBlockLocations(Path file)
@@ -190,8 +191,8 @@
while (true) {
LocatedBlocks locations = null;
DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
- locations = dfs.getClient().namenode.getBlockLocations(
- file.toString(), 0, parityStat.getLen());
+ locations = RaidDFSUtil.getBlockLocations(
+ dfs, file.toUri().getPath(), 0, parityStat.getLen());
if (!locations.isUnderConstruction()) {
break;
}
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestHarIndexParser.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestHarIndexParser.java
new file mode 100644
index 0000000..8ece5a6
--- /dev/null
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestHarIndexParser.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.Charset;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TestHarIndexParser extends TestCase {
+ final static Log LOG = LogFactory.getLog(TestHarIndexParser.class);
+ File indexFile = null;
+
+ protected void setUp() throws FileNotFoundException, IOException {
+ LOG.info("TestHarIndexParser.setUp()");
+ indexFile = File.createTempFile("harindex", ".tmp");
+ indexFile.deleteOnExit();
+ OutputStreamWriter out = new OutputStreamWriter(
+ new FileOutputStream(indexFile),
+ Charset.forName("UTF-8"));
+ out.write("%2F dir 1282018162460+0+493+hadoop+hadoop 0 0 f1 f2 f3 f4\n");
+ out.write("%2Ff1 file part-0 0 1024 1282018141145+1282018140822+420+hadoop+hadoop\n");
+ out.write("%2Ff3 file part-0 2048 1024 1282018148590+1282018148255+420+hadoop+hadoop\n");
+ out.write("%2Ff2 file part-0 1024 1024 1282018144198+1282018143852+420+hadoop+hadoop\n");
+ out.write("%2Ff4 file part-1 0 1024000 1282018162959+1282018162460+420+hadoop+hadoop\n");
+ out.flush();
+ out.close();
+ }
+
+ protected void tearDown() {
+ LOG.info("TestHarIndexParser.tearDown()");
+ if (indexFile != null)
+ indexFile.delete();
+ }
+
+ public void testHarIndexParser()
+ throws UnsupportedEncodingException, IOException {
+ LOG.info("testHarIndexParser started.");
+ InputStream in = new FileInputStream(indexFile);
+ long size = indexFile.length();
+ HarIndex parser = new HarIndex(in, size);
+
+ HarIndex.IndexEntry entry = parser.findEntry("part-0", 2100);
+ assertEquals("/f3", entry.fileName);
+
+ LOG.info("testHarIndexParser finished.");
+ }
+}
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
index 1bb197a..e444711 100644
--- a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
@@ -278,18 +278,6 @@
cnode = RaidNode.createRaidNode(null, conf);
int times = 10;
- while (times-- > 0) {
- try {
- shell = new RaidShell(conf, cnode.getListenerAddress());
- } catch (Exception e) {
- LOG.info("doTestPathFilter unable to connect to " +
- cnode.getListenerAddress() + " retrying....");
- Thread.sleep(1000);
- continue;
- }
- break;
- }
- LOG.info("doTestPathFilter created RaidShell.");
FileStatus[] listPaths = null;
// wait till file is raided
@@ -322,6 +310,8 @@
Thread.sleep(20000); // Without this wait, unit test crashes
// check for error at beginning of file
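+ // Create a RaidShell and connect it to the RaidNode over RPC.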
+ shell = new RaidShell(conf);
+ shell.initializeRpc(conf, cnode.getListenerAddress());
if (numBlock >= 1) {
LOG.info("doTestPathFilter Check error at beginning of file.");
simulateError(shell, fileSys, file1, crc1, 0);
diff --git a/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java
new file mode 100644
index 0000000..d75b50b
--- /dev/null
+++ b/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.GregorianCalendar;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.zip.CRC32;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
+import org.apache.hadoop.hdfs.TestRaidDfs;
+import org.apache.hadoop.hdfs.RaidDFSUtil;
+import org.apache.hadoop.raid.RaidNode;
+
+
+public class TestRaidShell extends TestCase {
+ final static Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.TestRaidShell");
+ final static String TEST_DIR = new File(System.getProperty("test.build.data",
+ "build/contrib/raid/test/data")).getAbsolutePath();
+ final static String CONFIG_FILE = new File(TEST_DIR,
+ "test-raid.xml").getAbsolutePath();
+ final static long RELOAD_INTERVAL = 1000;
+ final static int NUM_DATANODES = 3;
+ Configuration conf;
+ String namenode = null;
+ MiniDFSCluster dfs = null;
+ String hftp = null;
+ FileSystem fileSys = null;
+ RaidNode cnode = null;
+ Random rand = new Random();
+
+ /**
+ * Create a file with three stripes, corrupt one block in each stripe,
+ * and use RaidShell -recoverBlocks to fix the corrupt blocks.
+ */
+ public void testBlockFix() throws Exception {
+ LOG.info("Test testBlockFix started.");
+ long blockSize = 8192L;
+ int stripeLength = 3;
+ mySetup(stripeLength, -1);
+ Path file1 = new Path("/user/dhruba/raidtest/file1");
+ Path destPath = new Path("/destraid/user/dhruba/raidtest");
+ Path parityFile = new Path(destPath, "file1");
+ long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
+ 1, 7, blockSize);
+ long file1Len = fileSys.getFileStatus(file1).getLen();
+ LOG.info("Test testBlockFix created test files");
+
+ // create an instance of the RaidNode
+ Configuration localConf = new Configuration(conf);
+ localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+ localConf.setInt("raid.blockfix.interval", 1000);
+ // the RaidNode does the raiding inline (instead of submitting to map/reduce)
+ localConf.setBoolean("fs.raidnode.local", true);
+ cnode = RaidNode.createRaidNode(null, localConf);
+
+ try {
+ TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
+ cnode.stop();
+ cnode.join();
+ cnode = null;
+
+ FileStatus srcStat = fileSys.getFileStatus(file1);
+ LocatedBlocks locations = RaidDFSUtil.getBlockLocations(
+ (DistributedFileSystem) fileSys, file1.toUri().getPath(),
+ 0, srcStat.getLen());
+
+ DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
+
+ // Corrupt blocks in different stripes. We can fix them.
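+ // With stripeLength 3 and 7 blocks, indexes 0, 4 and 6 each fall in a different stripe.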
+ int[] corruptBlockIdxs = new int[]{0, 4, 6};
+ for (int idx: corruptBlockIdxs) {
+ LOG.info("Corrupting block " + locations.get(idx).getBlock());
+ TestRaidDfs.corruptBlock(file1, locations.get(idx).getBlock(),
+ NUM_DATANODES, false); // corrupt block
+ long offset = idx * blockSize;
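+ // Reading the corrupted block should trigger a ChecksumException on the client.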
+ try {
+ readAndDiscard(fileSys, file1, offset, blockSize);
+ fail("Expected checksumexception not thrown");
+ } catch (ChecksumException e) {
+ LOG.info("Block at offset " + offset + " got expected exception");
+ }
+ }
+
+ String fileUriPath = file1.toUri().getPath();
+ waitForCorruptBlocks(corruptBlockIdxs.length, dfs, file1);
+
+ // Create RaidShell and fix the file.
+ RaidShell shell = new RaidShell(conf);
+ String[] args = new String[2];
+ args[0] = "-recoverBlocks";
+ args[1] = file1.toUri().getPath();
+ ToolRunner.run(shell, args);
+
+ waitForCorruptBlocks(0, dfs, file1);
+
+ assertTrue(TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
+
+ // Now corrupt and fix the parity file.
+ FileStatus parityStat = fileSys.getFileStatus(parityFile);
+ long parityCrc = getCRC(fileSys, parityFile);
+ locations = RaidDFSUtil.getBlockLocations(
+ dfs, parityFile.toUri().getPath(), 0, parityStat.getLen());
+ TestRaidDfs.corruptBlock(parityFile, locations.get(0).getBlock(),
+ NUM_DATANODES, false); // corrupt block
+ try {
+ readAndDiscard(fileSys, parityFile, 0, blockSize);
+ fail("Expected checksumexception not thrown");
+ } catch (ChecksumException e) {
+ LOG.info("Parity Block at offset 0 got expected exception");
+ }
+ waitForCorruptBlocks(1, dfs, parityFile);
+
+ args[1] = parityFile.toUri().getPath();
+ ToolRunner.run(shell, args);
+
+ waitForCorruptBlocks(0, dfs, parityFile);
+ assertEquals(parityCrc, getCRC(fileSys, parityFile));
+
+ } catch (Exception e) {
+ LOG.info("Test testBlockFix Exception " + e + StringUtils.stringifyException(e));
+ throw e;
+ } finally {
+ myTearDown();
+ }
+ LOG.info("Test testBlockFix completed.");
+ }
+
+ private void waitForCorruptBlocks(
+ int numCorruptBlocks, DistributedFileSystem dfs, Path file)
+ throws Exception {
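+ // Poll the corrupt-block count for this file until it matches, for up to two minutes.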
+ String path = file.toUri().getPath();
+ FileStatus stat = dfs.getFileStatus(file);
+ long start = System.currentTimeMillis();
+ long actual = 0;
+ do {
+ actual = RaidDFSUtil.corruptBlocksInFile(
+ dfs, path, 0, stat.getLen()).size();
+ if (actual == numCorruptBlocks) break;
+ if (System.currentTimeMillis() - start > 120000) break;
+ LOG.info("Waiting for " + numCorruptBlocks + " corrupt blocks in " +
+ path + ", found " + actual);
+ Thread.sleep(1000);
+ } while (true);
+ assertEquals(numCorruptBlocks, actual);
+ }
+
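+ // Returns a fresh, uncached DistributedFileSystem handle for the same namenode URI.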
+ private static DistributedFileSystem getDFS(
+ Configuration conf, FileSystem dfs) throws IOException {
+ Configuration clientConf = new Configuration(conf);
+ clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+ clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+ URI dfsUri = dfs.getUri();
+ FileSystem.closeAll();
+ return (DistributedFileSystem) FileSystem.get(dfsUri, clientConf);
+ }
+
+ private void mySetup(int stripeLength, int timeBeforeHar) throws Exception {
+
+ new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+ conf = new Configuration();
+
+ conf.set("raid.config.file", CONFIG_FILE);
+ conf.setBoolean("raid.config.reload", true);
+ conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
+
+ // scan all policies once every 5 seconds
+ conf.setLong("raid.policy.rescan.interval", 5000);
+
+ // make all deletions not go through Trash
+ conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
+
+ // do not use map-reduce cluster for Raiding
+ conf.setBoolean("fs.raidnode.local", true);
+ conf.set("raid.server.address", "localhost:0");
+ conf.setInt("hdfs.raid.stripeLength", stripeLength);
+ conf.set("hdfs.raid.locations", "/destraid");
+
+ dfs = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
+ dfs.waitActive();
+ fileSys = dfs.getFileSystem();
+ namenode = fileSys.getUri().toString();
+
+ FileSystem.setDefaultUri(conf, namenode);
+ hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
+
+
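+ // Write a raid policy file that XOR-raids /user/dhruba/raidtest into /destraid.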
+ FileWriter fileWriter = new FileWriter(CONFIG_FILE);
+ fileWriter.write("<?xml version=\"1.0\"?>\n");
+ String str = "<configuration> " +
+ "<srcPath prefix=\"/user/dhruba/raidtest\"> " +
+ "<policy name = \"RaidTest1\"> " +
+ "<erasureCode>xor</erasureCode> " +
+ "<destPath> /destraid</destPath> " +
+ "<property> " +
+ "<name>targetReplication</name> " +
+ "<value>1</value> " +
+ "<description>after RAIDing, decrease the replication factor of a file to this value." +
+ "</description> " +
+ "</property> " +
+ "<property> " +
+ "<name>metaReplication</name> " +
+ "<value>1</value> " +
+ "<description> replication factor of parity file" +
+ "</description> " +
+ "</property> " +
+ "<property> " +
+ "<name>modTimePeriod</name> " +
+ "<value>2000</value> " +
+ "<description> time (milliseconds) after a file is modified to make it " +
+ "a candidate for RAIDing " +
+ "</description> " +
+ "</property> ";
+ if (timeBeforeHar >= 0) {
+ str +=
+ "<property> " +
+ "<name>time_before_har</name> " +
+ "<value>" + timeBeforeHar + "</value> " +
+ "<description> amount of time waited before har'ing parity files" +
+ "</description> " +
+ "</property> ";
+ }
+
+ str +=
+ "</policy>" +
+ "</srcPath>" +
+ "</configuration>";
+ fileWriter.write(str);
+ fileWriter.close();
+ }
+
+ private void myTearDown() throws Exception {
+ if (cnode != null) { cnode.stop(); cnode.join(); }
+ if (dfs != null) { dfs.shutdown(); }
+ }
+
+ private long getCRC(FileSystem fs, Path p) throws IOException {
+ CRC32 crc = new CRC32();
+ FSDataInputStream stm = fs.open(p);
+ // Read the entire file byte-by-byte and accumulate its CRC32.
+ int b;
+ while ((b = stm.read()) >= 0) {
+ crc.update(b);
+ }
+ stm.close();
+ return crc.getValue();
+ }
+
+ private static void readAndDiscard(
+ FileSystem fs, Path p, long offset, long length) throws IOException {
+ // Read 'length' bytes starting at 'offset' and discard them; this forces
+ // client-side checksum verification of the blocks being read.
+ FSDataInputStream in = fs.open(p);
+ try {
+ in.seek(offset);
+ long count = 0;
+ for (int b = 0; b >= 0 && count < length; count++) {
+ b = in.read();
+ }
+ } finally {
+ in.close();
+ }
+ }
+}
+