package org.apache.hadoop.hdfs;
import junit.framework.TestCase;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Level;
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
/**
* File append tests for HDFS-200 and HDFS-142, specifically focused on
* using append()/sync() to recover block information.
*/
public class TestFileAppend4 extends TestCase {
static final Log LOG = LogFactory.getLog(TestFileAppend4.class);
static final long BLOCK_SIZE = 1024;
static final long BBW_SIZE = 500; // deliberately not aligned to a checksum chunk boundary
static final Object [] NO_ARGS = new Object []{};
Configuration conf;
MiniDFSCluster cluster;
Path file1;
FSDataOutputStream stm;
boolean simulatedStorage = false;
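// raise the relevant HDFS log levels to ALL so lease recovery and pipeline
// behavior is easy to trace in the test logs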
{
((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
}
@Override
public void setUp() throws Exception {
this.conf = new Configuration();
if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
}
conf.setBoolean("dfs.support.append", true);
// lower heartbeat interval for fast recognition of DN death
conf.setInt("heartbeat.recheck.interval", 1000);
conf.setInt("dfs.heartbeat.interval", 1);
conf.setInt("dfs.socket.timeout", 5000);
// handle under-replicated blocks quickly (for replication asserts)
// conf.set("dfs.replication.pending.timeout.sec", Integer.toString(5));
conf.setInt("dfs.replication.pending.timeout.sec", 5);
conf.setInt("dfs.replication.interval", 1);
// handle failures in the DFSClient pipeline quickly
// (for cluster.shutdown(); fs.close() idiom)
conf.setInt("ipc.client.connect.max.retries", 1);
conf.setInt("dfs.client.block.recovery.retries", 1);
}
@Override
public void tearDown() throws Exception {
}
private void createFile(FileSystem whichfs, String filename,
int rep, long fileSize) throws Exception {
file1 = new Path(filename);
stm = whichfs.create(file1, true, (int)fileSize+1, (short)rep, BLOCK_SIZE);
LOG.info("Created file " + filename);
LOG.info("Writing " + fileSize + " bytes to " + file1);
AppendTestUtil.write(stm, 0, (int)fileSize);
}
private void assertFileSize(FileSystem whichfs, long expectedSize) throws Exception {
LOG.info("reading length of " + file1.getName() + " on namenode");
long realSize = whichfs.getFileStatus(file1).getLen();
assertTrue("unexpected file size! received=" + realSize
+ " , expected=" + expectedSize,
realSize == expectedSize);
}
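// The wrapped stream's getNumCurrentReplicas() is not exposed on the public
// OutputStream type, so it is looked up and invoked via reflection.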
private void assertNumCurrentReplicas(short rep) throws Exception {
OutputStream hdfs_out = stm.getWrappedStream();
Method r = hdfs_out.getClass().getMethod("getNumCurrentReplicas",
new Class<?> []{});
r.setAccessible(true);
int actualRepl = ((Integer)r.invoke(hdfs_out, NO_ARGS)).intValue();
assertTrue(file1 + " should be replicated to " + rep + " datanodes, not " +
actualRepl + ".", actualRepl == rep);
}
private void loseLeases(FileSystem whichfs) throws Exception {
LOG.info("leasechecker.interruptAndJoin()");
// lose the lease on the client
DistributedFileSystem dfs = (DistributedFileSystem)whichfs;
dfs.dfs.leasechecker.interruptAndJoin();
}
/**
* Recover the file.
* Try to open the file in append mode. Doing this gets hold of the file
* that the crashed writer was writing to. Once we have it, close it; this
* allows a subsequent reader to see data up to the last sync.
* NOTE: This is the same algorithm that HBase uses for file recovery.
* @param fs
* @throws Exception
*/
private void recoverFile(final FileSystem fs) throws Exception {
LOG.info("Recovering File Lease");
// set the soft limit to be 1 second so that the
// namenode triggers lease recovery upon append request
cluster.setLeasePeriod(1000, FSConstants.LEASE_HARDLIMIT_PERIOD);
// Trying recovery
int tries = 60;
boolean recovered = false;
FSDataOutputStream out = null;
while (!recovered && tries-- > 0) {
try {
out = fs.append(file1);
LOG.info("Successfully opened for appends");
recovered = true;
} catch (IOException e) {
LOG.info("Failed open for append, waiting on lease recovery");
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
// ignore it and try again
}
}
}
if (out != null) {
try {
out.close();
LOG.info("Successfully obtained lease");
} catch (IOException e) {
LOG.info("Unable to close file after opening for appends. " + e);
recovered = false;
}
// out.close();
}
if (!recovered) {
fail((tries > 0) ? "Recovery failed" : "Recovery should take < 1 min");
}
LOG.info("Past out lease recovery");
}
// Waits for all of the blocks to reach the expected replication
private void waitForBlockReplication(FileSystem whichfs, String filename,
int expected, long maxWaitSec)
throws IOException {
long start = System.currentTimeMillis();
//wait for all the blocks to be replicated;
LOG.info("Checking for block replication for " + filename);
int iters = 0;
while (true) {
boolean replOk = true;
BlockLocation[] bl = whichfs.getFileBlockLocations(
whichfs.getFileStatus(file1), 0, BLOCK_SIZE);
if(bl.length == 0) {
replOk = false;
}
for (BlockLocation b : bl) {
int actual = b.getNames().length;
if ( actual < expected ) {
LOG.info("Not enough replicas for " + b +
" yet. Expecting " + expected + ", got " +
actual + ".");
replOk = false;
break;
}
}
if (replOk) {
return;
}
iters++;
if (maxWaitSec > 0 &&
(System.currentTimeMillis() - start) > (maxWaitSec * 1000)) {
throw new IOException("Timedout while waiting for all blocks to " +
" be replicated for " + filename);
}
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {}
}
}
private void checkFile(FileSystem whichfs, long fileSize) throws Exception {
LOG.info("validating content from datanodes...");
AppendTestUtil.check(whichfs, file1, fileSize);
}
private void corruptDatanode(int dnNumber) throws Exception {
// locate the blocksBeingWritten directory of the given datanode
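// Assumes MiniDFSCluster's default layout under test.build.data, where
// datanode N keeps its block files under dfs/data/data(2N+1) and dfs/data/data(2N+2).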
File data_dir = new File(System.getProperty("test.build.data"),
"dfs/data/data" +
Integer.toString(dnNumber*2 + 1) +
"/blocksBeingWritten");
int corrupted = 0;
for (File block : data_dir.listFiles()) {
// only touch the actual data, not the metadata (with CRC)
if (block.getName().startsWith("blk_") &&
!block.getName().endsWith("meta")) {
RandomAccessFile file = new RandomAccessFile(block, "rw");
FileChannel channel = file.getChannel();
Random r = new Random();
long lastBlockSize = channel.size() % 512;
long position = channel.size() - lastBlockSize;
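// overwrite a random-length span of the final partial checksum chunk with
// random bytes; the .meta (CRC) file is left untouched, so the data no
// longer matches its checksum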
int length = r.nextInt((int)(channel.size() - position + 1));
byte[] buffer = new byte[length];
r.nextBytes(buffer);
channel.write(ByteBuffer.wrap(buffer), position);
System.out.println("Deliberately corrupting file " + block.getName() +
" at offset " + position +
" length " + length);
file.close();
++corrupted;
}
}
assertTrue("Should have some data in bbw to corrupt", corrupted > 0);
}
// test [1 bbw, 0 HDFS block]
public void testAppendSyncBbw() throws Exception {
LOG.info("START");
cluster = new MiniDFSCluster(conf, 1, true, null);
FileSystem fs1 = cluster.getFileSystem();
FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
try {
createFile(fs1, "/bbw.test", 1, BBW_SIZE);
stm.sync();
// empty before close()
assertFileSize(fs1, 0);
loseLeases(fs1);
recoverFile(fs2);
// close() should write recovered bbw to HDFS block
assertFileSize(fs2, BBW_SIZE);
checkFile(fs2, BBW_SIZE);
} finally {
fs2.close();
fs1.close();
cluster.shutdown();
}
LOG.info("STOP");
}
// test [1 bbw, 0 HDFS block] with cluster restart
public void testAppendSyncBbwClusterRestart() throws Exception {
LOG.info("START");
cluster = new MiniDFSCluster(conf, 1, true, null);
FileSystem fs1 = cluster.getFileSystem();
FileSystem fs2 = null;
try {
createFile(fs1, "/bbwRestart.test", 1, BBW_SIZE);
stm.sync();
// empty before close()
assertFileSize(fs1, 0);
cluster.shutdown();
fs1.close(); // same as: loseLeases()
LOG.info("STOPPED first instance of the cluster");
cluster = new MiniDFSCluster(conf, 1, false, null);
cluster.waitActive();
LOG.info("START second instance.");
fs2 = cluster.getFileSystem();
recoverFile(fs2);
// close() should write recovered bbw to HDFS block
assertFileSize(fs2, BBW_SIZE);
checkFile(fs2, BBW_SIZE);
} finally {
if(fs2 != null) {
fs2.close();
}
fs1.close();
cluster.shutdown();
}
LOG.info("STOP");
}
// test [3 bbw, 0 HDFS block] with cluster restart
// ** previous HDFS-142 patches hit a problem with multiple outstanding bbw files on a single disk **
public void testAppendSync2XBbwClusterRestart() throws Exception {
LOG.info("START");
cluster = new MiniDFSCluster(conf, 1, true, null);
// assumption: this MiniDFS starts up 1 datanode with 2 dirs to load balance
assertTrue(cluster.getDataNodes().get(0).getConf().get("dfs.data.dir").matches("[^,]+,[^,]*"));
FileSystem fs1 = cluster.getFileSystem();
FileSystem fs2 = null;
try {
// create 3 bbw files [so at least one dir has 2 files]
int[] files = new int[]{0,1,2};
Path[] paths = new Path[files.length];
FSDataOutputStream[] stms = new FSDataOutputStream[files.length];
for (int i : files ) {
createFile(fs1, "/bbwRestart" + i + ".test", 1, BBW_SIZE);
stm.sync();
assertFileSize(fs1, 0);
paths[i] = file1;
stms[i] = stm;
}
cluster.shutdown();
fs1.close(); // same as: loseLeases()
LOG.info("STOPPED first instance of the cluster");
cluster = new MiniDFSCluster(conf, 1, false, null);
cluster.waitActive();
LOG.info("START second instance.");
fs2 = cluster.getFileSystem();
// recover 3 bbw files
for (int i : files) {
file1 = paths[i];
recoverFile(fs2);
assertFileSize(fs2, BBW_SIZE);
checkFile(fs2, BBW_SIZE);
}
} finally {
if(fs2 != null) {
fs2.close();
}
fs1.close();
cluster.shutdown();
}
LOG.info("STOP");
}
// test [1 bbw, 1 HDFS block]
public void testAppendSyncBlockPlusBbw() throws Exception {
LOG.info("START");
cluster = new MiniDFSCluster(conf, 1, true, null);
FileSystem fs1 = cluster.getFileSystem();
FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
try {
createFile(fs1, "/blockPlusBbw.test", 1, BLOCK_SIZE + BBW_SIZE);
// 0 before sync()
assertFileSize(fs1, 0);
stm.sync();
// BLOCK_SIZE after sync()
assertFileSize(fs1, BLOCK_SIZE);
loseLeases(fs1);
recoverFile(fs2);
// close() should write recovered bbw to HDFS block
assertFileSize(fs2, BLOCK_SIZE + BBW_SIZE);
checkFile(fs2, BLOCK_SIZE + BBW_SIZE);
} finally {
stm = null;
fs2.close();
fs1.close();
cluster.shutdown();
}
LOG.info("STOP");
}
// we test different datanodes restarting to exercise
// the start, middle, & end of the DFSOutputStream pipeline
public void testAppendSyncReplication0() throws Exception {
replicationTest(0);
}
public void testAppendSyncReplication1() throws Exception {
replicationTest(1);
}
public void testAppendSyncReplication2() throws Exception {
replicationTest(2);
}
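// Scenario: write 1/2 block to 3 DNs, stop datanode badDN, write more and
// sync, restart the cluster, and verify that recovery keeps the replicas
// with the larger sequence number and eventually re-replicates back to 3.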
void replicationTest(int badDN) throws Exception {
LOG.info("START");
cluster = new MiniDFSCluster(conf, 3, true, null);
FileSystem fs1 = cluster.getFileSystem();
try {
int halfBlock = (int)BLOCK_SIZE/2;
short rep = 3; // replication
assertTrue(BLOCK_SIZE%4 == 0);
file1 = new Path("/appendWithReplication.dat");
// write 1/2 block & sync
stm = fs1.create(file1, true, (int)BLOCK_SIZE*2, rep, BLOCK_SIZE);
AppendTestUtil.write(stm, 0, halfBlock);
stm.sync();
assertNumCurrentReplicas(rep);
// close one of the datanodes
cluster.stopDataNode(badDN);
// write 1/4 block & sync
AppendTestUtil.write(stm, halfBlock, (int)BLOCK_SIZE/4);
stm.sync();
assertNumCurrentReplicas((short)(rep - 1));
// restart the cluster
/*
* we put the namenode in safe mode first so it doesn't process
* recoverBlock() commands from the remaining DFSClient as the datanodes
* are serially shut down
*/
cluster.getNameNode().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
cluster.shutdown();
fs1.close();
LOG.info("STOPPED first instance of the cluster");
cluster = new MiniDFSCluster(conf, 3, false, null);
cluster.getNameNode().getNamesystem().stallReplicationWork();
cluster.waitActive();
fs1 = cluster.getFileSystem();
LOG.info("START second instance.");
recoverFile(fs1);
// the 2 DNs with the larger sequence number should win
BlockLocation[] bl = fs1.getFileBlockLocations(
fs1.getFileStatus(file1), 0, BLOCK_SIZE);
assertTrue("Should have one block", bl.length == 1);
assertTrue("Should have 2 replicas for that block, not " +
bl[0].getNames().length, bl[0].getNames().length == 2);
assertFileSize(fs1, BLOCK_SIZE*3/4);
checkFile(fs1, BLOCK_SIZE*3/4);
// verify that, over time, the block has been replicated to 3 DN
cluster.getNameNode().getNamesystem().restartReplicationWork();
waitForBlockReplication(fs1, file1.toString(), 3, 20);
} finally {
fs1.close();
cluster.shutdown();
}
}
// we test different datanodes restarting to exercise
// the start, middle, & end of the DFSOutputStream pipeline
public void testAppendSyncChecksum0() throws Exception {
checksumTest(0);
}
public void testAppendSyncChecksum1() throws Exception {
checksumTest(1);
}
public void testAppendSyncChecksum2() throws Exception {
checksumTest(2);
}
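// Scenario: goodDN keeps a valid replica, deadDN is stopped before the final
// sync, and corruptDN's blocksBeingWritten data is corrupted while the
// cluster is down; recovery should use only goodDN's replica.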
void checksumTest(int goodDN) throws Exception {
int deadDN = (goodDN + 1) % 3;
int corruptDN = (goodDN + 2) % 3;
LOG.info("START");
cluster = new MiniDFSCluster(conf, 3, true, null);
FileSystem fs1 = cluster.getFileSystem();
try {
int halfBlock = (int)BLOCK_SIZE/2;
short rep = 3; // replication
assertTrue(BLOCK_SIZE%8 == 0);
file1 = new Path("/appendWithReplication.dat");
// write 1/2 block & sync
stm = fs1.create(file1, true, (int)BLOCK_SIZE*2, rep, BLOCK_SIZE);
AppendTestUtil.write(stm, 0, halfBlock);
stm.sync();
assertNumCurrentReplicas(rep);
// close one of the datanodes
cluster.stopDataNode(deadDN);
// write 1/4 block & sync
AppendTestUtil.write(stm, halfBlock, (int)BLOCK_SIZE/4);
stm.sync();
assertNumCurrentReplicas((short)(rep - 1));
// stop the cluster
cluster.getNameNode().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
cluster.shutdown();
fs1.close();
LOG.info("STOPPED first instance of the cluster");
// give the second datanode a bad CRC
corruptDatanode(corruptDN);
// restart the cluster
cluster = new MiniDFSCluster(conf, 3, false, null);
cluster.getNameNode().getNamesystem().stallReplicationWork();
cluster.waitActive();
fs1 = cluster.getFileSystem();
LOG.info("START second instance.");
// verify that only the good datanode's file is used
recoverFile(fs1);
BlockLocation[] bl = fs1.getFileBlockLocations(
fs1.getFileStatus(file1), 0, BLOCK_SIZE);
assertTrue("Should have one block", bl.length == 1);
assertTrue("Should have 1 replica for that block, not " +
bl[0].getNames().length, bl[0].getNames().length == 1);
assertTrue("The replica should be the datanode with the correct CRC",
cluster.getDataNodes().get(goodDN).getSelfAddr().toString()
.endsWith(bl[0].getNames()[0]) );
assertFileSize(fs1, BLOCK_SIZE*3/4);
// should fail checkFile() if data with the bad CRC was used
checkFile(fs1, BLOCK_SIZE*3/4);
// ensure proper re-replication
cluster.getNameNode().getNamesystem().restartReplicationWork();
waitForBlockReplication(fs1, file1.toString(), 3, 20);
} finally {
cluster.shutdown();
fs1.close();
}
}
// we test different datanodes dying and not coming back
public void testDnDeath0() throws Exception {
dnDeathTest(0);
}
public void testDnDeath1() throws Exception {
dnDeathTest(1);
}
public void testDnDeath2() throws Exception {
dnDeathTest(2);
}
/**
* Test case that writes and completes a file, and then
* tries to recover the file after the old primary
* DN has failed.
*/
void dnDeathTest(int badDN) throws Exception {
LOG.info("START");
cluster = new MiniDFSCluster(conf, 3, true, null);
FileSystem fs1 = cluster.getFileSystem();
try {
int halfBlock = (int)BLOCK_SIZE/2;
short rep = 3; // replication
assertTrue(BLOCK_SIZE%4 == 0);
file1 = new Path("/dnDeath.dat");
// write 1/2 block & close
stm = fs1.create(file1, true, (int)BLOCK_SIZE*2, rep, BLOCK_SIZE);
AppendTestUtil.write(stm, 0, halfBlock);
stm.close();
// close one of the datanodes
cluster.stopDataNode(badDN);
// Recover the lease
recoverFile(fs1);
checkFile(fs1, halfBlock);
} finally {
fs1.close();
cluster.shutdown();
}
}
/**
* Test case that stops a writer after finalizing a block but
* before calling completeFile, and then tries to recover
* the lease.
*/
public void testRecoverFinalizedBlock() throws Throwable {
cluster = new MiniDFSCluster(conf, 3, true, null);
try {
cluster.waitActive();
NameNode preSpyNN = cluster.getNameNode();
NameNode spyNN = spy(preSpyNN);
// Delay completeFile
DelayAnswer delayer = new DelayAnswer();
doAnswer(delayer).when(spyNN).complete(anyString(), anyString());
DFSClient client = new DFSClient(null, spyNN, conf, null);
file1 = new Path("/testRecoverFinalized");
final OutputStream stm = client.create("/testRecoverFinalized", true);
// write 1/2 block
AppendTestUtil.write(stm, 0, 4096);
final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
Thread t = new Thread() {
public void run() {
try {
stm.close();
} catch (Throwable t) {
err.set(t);
}
}};
t.start();
LOG.info("Waiting for close to get to latch...");
delayer.waitForCall();
// At this point, the block is finalized on the DNs, but the file
// has not been completed in the NN.
// Lose the leases
LOG.info("Killing lease checker");
client.leasechecker.interruptAndJoin();
FileSystem fs1 = cluster.getFileSystem();
FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(
fs1.getConf());
LOG.info("Recovering file");
recoverFile(fs2);
LOG.info("Telling close to proceed.");
delayer.proceed();
LOG.info("Waiting for close to finish.");
t.join();
LOG.info("Close finished.");
// We expect that close will get a "Could not complete write" error.
Throwable thrownByClose = err.get();
assertNotNull(thrownByClose);
assertTrue(thrownByClose instanceof IOException);
if (!thrownByClose.getMessage().contains("Could not complete write")) {
throw thrownByClose;
}
} finally {
cluster.shutdown();
}
}
/**
* Test case that stops a writer after finalizing a block but
* before calling completeFile, recovers a file from another writer,
* starts writing from that writer, and then has the old lease holder
* call completeFile
*/
public void testCompleteOtherLeaseHoldersFile() throws Throwable {
cluster = new MiniDFSCluster(conf, 3, true, null);
try {
cluster.waitActive();
NameNode preSpyNN = cluster.getNameNode();
NameNode spyNN = spy(preSpyNN);
// Delay completeFile
DelayAnswer delayer = new DelayAnswer();
doAnswer(delayer).when(spyNN).complete(anyString(), anyString());
DFSClient client = new DFSClient(null, spyNN, conf, null);
file1 = new Path("/testRecoverFinalized");
final OutputStream stm = client.create("/testRecoverFinalized", true);
// write 1/2 block
AppendTestUtil.write(stm, 0, 4096);
final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
Thread t = new Thread() {
public void run() {
try {
stm.close();
} catch (Throwable t) {
err.set(t);
}
}};
t.start();
LOG.info("Waiting for close to get to latch...");
delayer.waitForCall();
// At this point, the block is finalized on the DNs, but the file
// has not been completed in the NN.
// Lose the leases
LOG.info("Killing lease checker");
client.leasechecker.interruptAndJoin();
FileSystem fs1 = cluster.getFileSystem();
FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(
fs1.getConf());
LOG.info("Recovering file");
recoverFile(fs2);
LOG.info("Opening file for append from new fs");
FSDataOutputStream appenderStream = fs2.append(file1);
LOG.info("Writing some data from new appender");
AppendTestUtil.write(appenderStream, 0, 4096);
LOG.info("Telling old close to proceed.");
delayer.proceed();
LOG.info("Waiting for close to finish.");
t.join();
LOG.info("Close finished.");
// We expect that close will get a "Lease mismatch" error.
Throwable thrownByClose = err.get();
assertNotNull(thrownByClose);
assertTrue(thrownByClose instanceof IOException);
if (!thrownByClose.getMessage().contains("Lease mismatch")) {
throw thrownByClose;
}
// The appender should be able to close properly
appenderStream.close();
} finally {
cluster.shutdown();
}
}
/**
* Test for an intermittent failure of commitBlockSynchronization.
* This could happen if the DN crashed between calling updateBlocks
* and commitBlockSynchronization.
*/
public void testDatanodeFailsToCommit() throws Throwable {
LOG.info("START");
cluster = new MiniDFSCluster(conf, 1, true, null);
FileSystem fs1 = cluster.getFileSystem();
FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
try {
createFile(fs1, "/datanodeFailsCommit.test", 1, BBW_SIZE);
stm.sync();
loseLeases(fs1);
// Make the NN fail to commitBlockSynchronization one time
NameNode nn = cluster.getNameNode();
nn.namesystem = spy(nn.namesystem);
doAnswer(new ThrowNTimesAnswer(IOException.class, 1)).
when(nn.namesystem).
commitBlockSynchronization((Block)anyObject(), anyInt(), anyInt(),
anyBoolean(), anyBoolean(),
(DatanodeID[])anyObject());
recoverFile(fs2);
// close() should write recovered bbw to HDFS block
assertFileSize(fs2, BBW_SIZE);
checkFile(fs2, BBW_SIZE);
} finally {
fs2.close();
fs1.close();
cluster.shutdown();
}
LOG.info("STOP");
}
/**
* Test that when a DN starts up with bbws from a file that got
* removed or finalized when it was down, the block gets deleted.
*/
public void testBBWCleanupOnStartup() throws Throwable {
LOG.info("START");
cluster = new MiniDFSCluster(conf, 3, true, null);
FileSystem fs1 = cluster.getFileSystem();
try {
int halfBlock = (int) BLOCK_SIZE / 2;
short rep = 3; // replication
assertTrue(BLOCK_SIZE % 4 == 0);
file1 = new Path("/bbwCleanupOnStartup.dat");
// write 1/2 block & sync
stm = fs1.create(file1, true, (int) BLOCK_SIZE * 2, rep, BLOCK_SIZE);
AppendTestUtil.write(stm, 0, halfBlock);
stm.sync();
String dataDirs = cluster.getDataNodes().get(0).getConf().get("dfs.data.dir");
// close one of the datanodes
MiniDFSCluster.DataNodeProperties dnprops = cluster.stopDataNode(0);
stm.close();
List<File> bbwFilesAfterShutdown = getBBWFiles(dataDirs);
assertEquals(1, bbwFilesAfterShutdown.size());
assertTrue(cluster.restartDataNode(dnprops));
List<File> bbwFilesAfterRestart = null;
// Wait up to 10 heartbeats for the files to get removed - it should
// really happen after just a couple.
for (int i = 0; i < 10; i++) {
LOG.info("Waiting for heartbeat #" + i + " after DN restart");
cluster.waitForDNHeartbeat(0, 10000);
// Check if it has been deleted
bbwFilesAfterRestart = getBBWFiles(dataDirs);
if (bbwFilesAfterRestart.size() == 0) {
break;
}
}
assertEquals(0, bbwFilesAfterRestart.size());
} finally {
fs1.close();
cluster.shutdown();
}
}
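// Collects the block files (excluding .meta files) found under each
// configured data dir's blocksBeingWritten directory.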
private List<File> getBBWFiles(String dfsDataDirs) {
ArrayList<File> files = new ArrayList<File>();
for (String dirString : dfsDataDirs.split(",")) {
File dir = new File(dirString);
assertTrue("data dir " + dir + " should exist",
dir.exists());
File bbwDir = new File(dir, "blocksBeingWritten");
assertTrue("bbw dir " + bbwDir + " should eixst",
bbwDir.exists());
for (File blockFile : bbwDir.listFiles()) {
if (!blockFile.getName().endsWith(".meta")) {
files.add(blockFile);
}
}
}
return files;
}
/**
* Test for following sequence:
* 1. Client finishes writing a block, but does not allocate next one
* 2. Client loses lease
* 3. Recovery process starts, but commitBlockSynchronization not called yet
* 4. Client calls addBlock and continues writing
* 5. commitBlockSynchronization proceeds
* 6. Original client tries to write/close
*/
public void testRecoveryOnBlockBoundary() throws Throwable {
LOG.info("START");
cluster = new MiniDFSCluster(conf, 1, true, null);
FileSystem fs1 = cluster.getFileSystem();
final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
// Allow us to delay commitBlockSynchronization
DelayAnswer delayer = new DelayAnswer();
NameNode nn = cluster.getNameNode();
nn.namesystem = spy(nn.namesystem);
doAnswer(delayer).
when(nn.namesystem).
commitBlockSynchronization((Block) anyObject(), anyInt(), anyInt(),
anyBoolean(), anyBoolean(),
(DatanodeID[]) anyObject());
try {
file1 = new Path("/testWritingDuringRecovery.test");
stm = fs1.create(file1, true, (int) BLOCK_SIZE * 2, (short) 3, BLOCK_SIZE);
AppendTestUtil.write(stm, 0, (int) (BLOCK_SIZE));
stm.sync();
LOG.info("Losing lease");
loseLeases(fs1);
LOG.info("Triggering recovery in another thread");
final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
Thread recoverThread = new Thread() {
public void run() {
try {
recoverFile(fs2);
} catch (Throwable t) {
err.set(t);
}
}
};
recoverThread.start();
LOG.info("Waiting for recovery about to call commitBlockSynchronization");
delayer.waitForCall();
LOG.info("Continuing to write to stream");
AppendTestUtil.write(stm, 0, (int) (BLOCK_SIZE));
try {
stm.sync();
fail("Sync was allowed after recovery started");
} catch (IOException ioe) {
LOG.info("Got expected IOE trying to write to a file from the writer " +
"that lost its lease", ioe);
}
LOG.info("Written more to stream, allowing commit to proceed");
delayer.proceed();
LOG.info("Joining on recovery thread");
recoverThread.join();
if (err.get() != null) {
throw err.get();
}
LOG.info("Now that recovery has finished, still expect further writes to fail.");
try {
AppendTestUtil.write(stm, 0, (int) (BLOCK_SIZE));
stm.sync();
fail("Further writes after recovery finished did not fail!");
} catch (IOException ioe) {
LOG.info("Got expected exception", ioe);
}
LOG.info("Checking that file looks good");
// only the data from the first successful writes (one full block)
// should have been recovered
assertFileSize(fs2, BLOCK_SIZE);
checkFile(fs2, BLOCK_SIZE);
} finally {
try {
fs2.close();
fs1.close();
cluster.shutdown();
} catch (Throwable t) {
LOG.warn("Didn't close down cleanly", t);
}
}
LOG.info("STOP");
}
/**
* Mockito answer helper that triggers one latch as soon as the
* method is called, then waits on another before continuing.
*/
public static class DelayAnswer implements Answer {
private final CountDownLatch fireLatch = new CountDownLatch(1);
private final CountDownLatch waitLatch = new CountDownLatch(1);
/**
* Wait until the method is called.
*/
public void waitForCall() throws InterruptedException {
fireLatch.await();
}
/**
* Tell the method to proceed.
* This should only be called after waitForCall()
*/
public void proceed() {
waitLatch.countDown();
}
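/**
* Signal the test that the stubbed method was invoked, then block until
* proceed() is called before delegating to the real method.
*/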
public Object answer(InvocationOnMock invocation) throws Throwable {
LOG.info("DelayAnswer firing fireLatch");
fireLatch.countDown();
try {
LOG.info("DelayAnswer waiting on waitLatch");
waitLatch.await();
LOG.info("DelayAnswer delay complete");
} catch (InterruptedException ie) {
throw new IOException("Interrupted waiting on latch", ie);
}
return invocation.callRealMethod();
}
}
/**
* Mockito answer helper that will throw an exception a given number
* of times before eventually succeeding.
*/
private static class ThrowNTimesAnswer implements Answer {
private int numTimesToThrow;
private Class<? extends Throwable> exceptionClass;
public ThrowNTimesAnswer(Class<? extends Throwable> exceptionClass,
int numTimesToThrow) {
this.exceptionClass = exceptionClass;
this.numTimesToThrow = numTimesToThrow;
}
public Object answer(InvocationOnMock invocation) throws Throwable {
if (numTimesToThrow-- > 0) {
throw exceptionClass.newInstance();
}
return invocation.callRealMethod();
}
}
}