/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.channels.FileChannel;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.io.IOUtils;

import junit.framework.TestCase;

/**
 * This test verifies that block verification occurs on the datanode.
 */
public class TestDatanodeBlockScanner extends TestCase {

  private static final Log LOG =
      LogFactory.getLog(TestDatanodeBlockScanner.class);

  private static final Pattern pattern =
      Pattern.compile(".*?(blk_[-]*\\d+).*?scan time\\s*:\\s*(\\d+)");

  private static final Pattern pattern_blockVerify =
      Pattern.compile(".*?(SCAN_PERIOD)\\s*:\\s*(\\d+.*?)");

  /**
   * Connects to the datanode and fetches block verification data.
   * It repeats this until the given block has a verification time > 0.
   */
  private static long waitForVerification(DatanodeInfo dn, FileSystem fs,
                                          Path file, int blocksValidated)
                                          throws IOException {
    URL url = new URL("http://localhost:" + dn.getInfoPort() +
                      "/blockScannerReport?listblocks");
    long lastWarnTime = System.currentTimeMillis();
    long verificationTime = 0;
    String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();

    while (verificationTime <= 0) {
      String response = DFSTestUtil.urlGet(url);
      if (blocksValidated >= 0) {
        for (Matcher matcher = pattern_blockVerify.matcher(response);
             matcher.find();) {
          if (block.equals(matcher.group(1))) {
            assertEquals(1, blocksValidated);
            break;
          }
        }
      }
      for (Matcher matcher = pattern.matcher(response); matcher.find();) {
        if (block.equals(matcher.group(1))) {
          verificationTime = Long.parseLong(matcher.group(2));
          break;
        }
      }
      if (verificationTime <= 0) {
        long now = System.currentTimeMillis();
        if ((now - lastWarnTime) >= 5 * 1000) {
          LOG.info("Waiting for verification of " + block);
          lastWarnTime = now;
        }
        try {
          Thread.sleep(500);
        } catch (InterruptedException ignored) {
        }
      }
    }
    return verificationTime;
  }
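
  /**
   * Writes a file, restarts the cluster, and waits until the datanode block
   * scanner reports a positive verification time for the file's first block.
   * Then creates a second file and reads it back, which verifies checksums
   * on the client side, and again waits for the block to show up as
   * verified.
   */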
  public void testDatanodeBlockScanner() throws IOException {
    long startTime = System.currentTimeMillis();

    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
    cluster.waitActive();

    FileSystem fs = cluster.getFileSystem();
    Path file1 = new Path("/tmp/testBlockVerification/file1");
    Path file2 = new Path("/tmp/testBlockVerification/file2");

    /*
     * Write the first file and restart the cluster.
     */
    DFSTestUtil.createFile(fs, file1, 10, (short)1, 0);
    cluster.shutdown();
    cluster = new MiniDFSCluster(conf, 1, false, null);
    cluster.waitActive();

    DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
                                        cluster.getNameNodePort()), conf);
    fs = cluster.getFileSystem();
    DatanodeInfo dn = dfsClient.datanodeReport(DatanodeReportType.LIVE)[0];

    /*
     * The cluster restarted. The block should be verified by now.
     */
    assertTrue(waitForVerification(dn, fs, file1, 1) > startTime);

    /*
     * Create a new file and read the block. The block should be marked
     * verified since the client reads the block and verifies the checksum.
     */
    DFSTestUtil.createFile(fs, file2, 10, (short)1, 0);
    IOUtils.copyBytes(fs.open(file2), new IOUtils.NullOutputStream(),
                      conf, true);
    assertTrue(waitForVerification(dn, fs, file2, 2) > startTime);

    cluster.shutdown();
  }
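
  /**
   * Corrupts the given replica of a block by overwriting a few bytes at a
   * random offset of its on-disk block file. Each MiniDFSCluster datanode
   * has two storage directories, so both are checked.
   *
   * @return true if a block file was found and corrupted
   */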
  public static boolean corruptReplica(String blockName, int replica) throws IOException {
    Random random = new Random();
    File baseDir = new File(MiniDFSCluster.getBaseDirectory(), "data");
    boolean corrupted = false;
    for (int i = replica * 2; i < replica * 2 + 2; i++) {
      File blockFile = new File(baseDir, "data" + (i + 1) +
                                MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
      if (blockFile.exists()) {
        // Corrupt the replica by writing "BADBAD" at a random offset
        RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
        FileChannel channel = raFile.getChannel();
        String badString = "BADBAD";
        int rand = random.nextInt((int)channel.size() / 2);
        raFile.seek(rand);
        raFile.write(badString.getBytes());
        raFile.close();
        corrupted = true;
      }
    }
    return corrupted;
  }
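
  /**
   * Verifies the namenode's corruption-handling policy: with one corrupt
   * replica out of three the block is not marked corrupt (the corrupt
   * replica is simply left out of the returned locations), but once all
   * three replicas are corrupt the block is marked corrupt and all of its
   * replicas are returned to the client.
   */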
  public void testBlockCorruptionPolicy() throws IOException {
    Configuration conf = new HdfsConfiguration();
    conf.setLong("dfs.blockreport.intervalMsec", 1000L);
    Random random = new Random();
    FileSystem fs = null;
    DFSClient dfsClient = null;
    LocatedBlocks blocks = null;
    int blockCount = 0;
    int rand = random.nextInt(3);

    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
    cluster.waitActive();
    fs = cluster.getFileSystem();
    Path file1 = new Path("/tmp/testBlockVerification/file1");
    DFSTestUtil.createFile(fs, file1, 1024, (short)3, 0);
    String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();

    dfsClient = new DFSClient(new InetSocketAddress("localhost",
                              cluster.getNameNodePort()), conf);
    do {
      blocks = dfsClient.getNamenode().
          getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
      blockCount = blocks.get(0).getLocations().length;
      try {
        LOG.info("Looping until expected blockCount of 3 is received: " + blockCount);
        Thread.sleep(1000);
      } catch (InterruptedException ignore) {
      }
    } while (blockCount != 3);
    assertFalse(blocks.get(0).isCorrupt());

    // Corrupt a random replica of the block
    assertTrue(corruptReplica(block, rand));

    // Restart the datanode so that the corrupt replica is reported
    cluster.restartDataNode(rand);

    // We have 2 good replicas and the block is not marked corrupt
    do {
      blocks = dfsClient.getNamenode().
          getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
      blockCount = blocks.get(0).getLocations().length;
      try {
        LOG.info("Looping until expected blockCount of 2 is received: " + blockCount);
        Thread.sleep(1000);
      } catch (InterruptedException ignore) {
      }
    } while (blockCount != 2);
    assertFalse(blocks.get(0).isCorrupt());

    // Corrupt all replicas. Now the block should be marked as corrupt
    // and we should get back all of its replicas
    assertTrue(corruptReplica(block, 0));
    assertTrue(corruptReplica(block, 1));
    assertTrue(corruptReplica(block, 2));

    // Read the file to trigger reportBadBlocks by the client
    try {
      IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(),
                        conf, true);
    } catch (IOException e) {
      // Ignore exception; the read is expected to fail on corrupt replicas
    }

    // The block should now be marked as corrupt and all of its replicas
    // returned
    do {
      blocks = dfsClient.getNamenode().
          getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
      blockCount = blocks.get(0).getLocations().length;
      try {
        LOG.info("Looping until expected blockCount of 3 is received: " + blockCount);
        Thread.sleep(1000);
      } catch (InterruptedException ignore) {
      }
    } while (blockCount != 3);
    assertTrue(blocks.get(0).isCorrupt());

    cluster.shutdown();
  }

  /**
   * testBlockCorruptionRecoveryPolicy.
   * This tests recovery of corrupt replicas, first for one corrupt replica
   * then for two. The test invokes blockCorruptionRecoveryPolicy, which:
   * 1. Creates a block with the desired number of replicas.
   * 2. Corrupts the desired number of replicas and restarts the datanodes
   *    containing the corrupt replicas. Additionally, we also read the
   *    block in case restarting does not report the corrupt replicas.
   *    Restarting or reading from the datanode triggers reportBadBlocks
   *    to the namenode, which adds the replicas to corruptReplicasMap
   *    and neededReplication.
   * 3. The test waits until all corrupt replicas are reported; meanwhile
   *    re-replication brings the block back to a healthy state.
   * 4. The test again waits until the block is reported with the expected
   *    number of good replicas.
   */
  public void testBlockCorruptionRecoveryPolicy() throws IOException {
    // Test recovery of 1 corrupt replica
    LOG.info("Testing corrupt replica recovery for one corrupt replica");
    blockCorruptionRecoveryPolicy(4, (short)3, 1);

    // Test recovery of 2 corrupt replicas
    LOG.info("Testing corrupt replica recovery for two corrupt replicas");
    blockCorruptionRecoveryPolicy(5, (short)3, 2);
  }
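
  /**
   * Creates a file with the given replication, corrupts numCorruptReplicas
   * replicas of its first block, restarts the affected datanodes (and reads
   * the file as a fallback) so the corruption is reported, then waits for
   * the namenode to re-replicate the block and invalidate the corrupt
   * replicas.
   */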
  private void blockCorruptionRecoveryPolicy(int numDataNodes,
                                             short numReplicas,
                                             int numCorruptReplicas)
      throws IOException {
    Configuration conf = new HdfsConfiguration();
    conf.setLong("dfs.blockreport.intervalMsec", 30L);
    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 30);
    conf.setLong("dfs.heartbeat.interval", 30L);
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
    FileSystem fs = null;
    DFSClient dfsClient = null;
    LocatedBlocks blocks = null;
    int replicaCount = 0;

    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
    cluster.waitActive();
    fs = cluster.getFileSystem();
    Path file1 = new Path("/tmp/testBlockCorruptRecovery/file");
    DFSTestUtil.createFile(fs, file1, 1024, numReplicas, 0);
    Block blk = DFSTestUtil.getFirstBlock(fs, file1);
    String block = blk.getBlockName();
    dfsClient = new DFSClient(new InetSocketAddress("localhost",
                              cluster.getNameNodePort()), conf);

    blocks = dfsClient.getNamenode().
        getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
    replicaCount = blocks.get(0).getLocations().length;

    // Wait until the block is replicated to numReplicas
    while (replicaCount != numReplicas) {
      try {
        LOG.info("Looping until expected replicaCount of " + numReplicas +
                 " is reached");
        Thread.sleep(1000);
      } catch (InterruptedException ignore) {
      }
      blocks = dfsClient.getNamenode().
          getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
      replicaCount = blocks.get(0).getLocations().length;
    }
    assertFalse(blocks.get(0).isCorrupt());

    // Corrupt numCorruptReplicas replicas of the block
    int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
    for (int i = 0, j = 0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
      if (corruptReplica(block, i)) {
        corruptReplicasDNIDs[j++] = i;
      }
    }

    // Restart the datanodes containing the corrupt replicas
    // so they are reported to the namenode and re-replicated
    for (int i = 0; i < numCorruptReplicas; i++) {
      cluster.restartDataNode(corruptReplicasDNIDs[i]);
    }

    // Loop until all corrupt replicas are reported
    int corruptReplicaSize = cluster.getNamesystem().
        numCorruptReplicas(blk);
    while (corruptReplicaSize != numCorruptReplicas) {
      try {
        IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(),
                          conf, true);
      } catch (IOException e) {
        // Ignore exception; reading a corrupt replica may fail
      }
      try {
        LOG.info("Looping until expected " + numCorruptReplicas + " are " +
                 "reported. Current reported " + corruptReplicaSize);
        Thread.sleep(1000);
      } catch (InterruptedException ignore) {
      }
      corruptReplicaSize = cluster.getNamesystem().
          numCorruptReplicas(blk);
    }

    // Loop until the block recovers after re-replication
    blocks = dfsClient.getNamenode().
        getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
    replicaCount = blocks.get(0).getLocations().length;
    while (replicaCount != numReplicas) {
      try {
        LOG.info("Looping until block gets rereplicated to " + numReplicas);
        Thread.sleep(1000);
      } catch (InterruptedException ignore) {
      }
      blocks = dfsClient.getNamenode().
          getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
      replicaCount = blocks.get(0).getLocations().length;
    }

    // Make sure the corrupt replicas are invalidated and removed from
    // corruptReplicasMap
    corruptReplicaSize = cluster.getNamesystem().
        numCorruptReplicas(blk);
    while (corruptReplicaSize != 0 || replicaCount != numReplicas) {
      try {
        LOG.info("Looping until corrupt replica is invalidated");
        Thread.sleep(1000);
      } catch (InterruptedException ignore) {
      }
      corruptReplicaSize = cluster.getNamesystem().
          numCorruptReplicas(blk);
      blocks = dfsClient.getNamenode().
          getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
      replicaCount = blocks.get(0).getLocations().length;
    }

    // Make sure the block is healthy
    assertEquals(0, corruptReplicaSize);
    assertEquals(numReplicas, replicaCount);
    assertFalse(blocks.get(0).isCorrupt());
    cluster.shutdown();
  }

  /** Test if the NameNode handles truncated blocks in the block report. */
  public void testTruncatedBlockReport() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    final short REPLICATION_FACTOR = (short)2;

    MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION_FACTOR, true, null);
    cluster.waitActive();
    FileSystem fs = cluster.getFileSystem();
    try {
      final Path fileName = new Path("/file1");
      DFSTestUtil.createFile(fs, fileName, 1, REPLICATION_FACTOR, 0);
      DFSTestUtil.waitReplication(fs, fileName, REPLICATION_FACTOR);

      String block = DFSTestUtil.getFirstBlock(fs, fileName).getBlockName();

      // Truncate a replica of the block
      changeReplicaLength(block, 0, -1);

      // Restart the cluster
      cluster.shutdown();
      cluster = new MiniDFSCluster(
          0, conf, REPLICATION_FACTOR, false, true, null, null, null);
      cluster.startDataNodes(conf, 1, true, null, null);
      cluster.waitActive();  // now we have 3 datanodes

      // Wait for the truncated block to be detected and re-replicated
      DFSTestUtil.waitReplication(
          cluster.getFileSystem(), fileName, REPLICATION_FACTOR);

      // Make sure that the truncated block will be deleted
      waitForBlockDeleted(block, 0);
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Change the length of a block replica at datanode dnIndex by lenDelta
   * bytes.
   */
  static boolean changeReplicaLength(String blockName, int dnIndex, int lenDelta) throws IOException {
    File baseDir = new File(MiniDFSCluster.getBaseDirectory(), "data");
    for (int i = dnIndex * 2; i < dnIndex * 2 + 2; i++) {
      File blockFile = new File(baseDir, "data" + (i + 1) +
                                MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
      if (blockFile.exists()) {
        RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
        raFile.setLength(raFile.length() + lenDelta);
        raFile.close();
        return true;
      }
    }
    return false;
  }
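
  /**
   * Polls both storage directories of the given datanode until the block
   * file has been removed from each of them.
   */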
  private static void waitForBlockDeleted(String blockName, int dnIndex)
      throws IOException, InterruptedException {
    File baseDir = new File(MiniDFSCluster.getBaseDirectory(), "data");
    File blockFile1 = new File(baseDir, "data" + (2 * dnIndex + 1) +
                               MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
    File blockFile2 = new File(baseDir, "data" + (2 * dnIndex + 2) +
                               MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
    while (blockFile1.exists() || blockFile2.exists()) {
      Thread.sleep(100);
    }
  }
}