/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Before;
import org.junit.Test;
/**
 * File append tests for HDFS-200 and HDFS-142, specifically focused on
 * using append()/sync() to recover block information from a crashed writer.
 */
public class TestFileAppend4 {
static final Log LOG = LogFactory.getLog(TestFileAppend4.class);
static final long BLOCK_SIZE = 1024;
static final long BBW_SIZE = 500; // don't align on bytes/checksum
static final Object [] NO_ARGS = new Object []{};
Configuration conf;
MiniDFSCluster cluster;
Path file1;
FSDataOutputStream stm;
boolean simulatedStorage = false;
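// Instance initializer: turn logging for the NameNode, lease manager,
// FSNamesystem, DataNode, and DFSClient up to ALL so failures in these
// tests are easier to diagnose from the logs.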
{
((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
}
@Before
public void setUp() throws Exception {
this.conf = new Configuration();
if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
}
conf.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
// lower heartbeat interval for fast recognition of DN death
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
1000);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
// handle under-replicated blocks quickly (for replication asserts)
conf.setInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 5);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
// handle failures in the DFSClient pipeline quickly
// (for cluster.shutdown(); fs.close() idiom)
conf.setInt("ipc.client.connect.max.retries", 1);
}
/**
 * Recover the file's lease.
 * Try to open the file in append mode. Doing this takes hold of the file
 * that the crashed writer was writing to. Once we have it, close the
 * stream; this allows a subsequent reader to see data up to the last sync.
 * NOTE: This is the same algorithm that HBase uses for file recovery.
 * @param fs filesystem to recover the lease through
 * @throws Exception if the lease is not recovered within the retry window
 */
private void recoverFile(final FileSystem fs) throws Exception {
LOG.info("Recovering File Lease");
// set the soft limit to be 1 second so that the
// namenode triggers lease recovery upon append request
cluster.setLeasePeriod(1000, FSConstants.LEASE_HARDLIMIT_PERIOD);
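// the hard limit is left at its default; only soft-limit expiry is exercised here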
// Retry the append for up to ~60 seconds while the NameNode recovers the lease
int tries = 60;
boolean recovered = false;
FSDataOutputStream out = null;
while (!recovered && tries-- > 0) {
try {
out = fs.append(file1);
LOG.info("Successfully opened for appends");
recovered = true;
} catch (IOException e) {
LOG.info("Failed open for append, waiting on lease recovery");
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
// ignore it and try again
}
}
}
if (out != null) {
out.close();
}
if (!recovered) {
fail("Recovery should take < 1 min");
}
LOG.info("Past out lease recovery");
}
/**
* Test case that stops a writer after finalizing a block but
* before calling completeFile, and then tries to recover
* the lease from another thread.
*/
@Test(timeout=60000)
public void testRecoverFinalizedBlock() throws Throwable {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build();
try {
cluster.waitActive();
NameNode preSpyNN = cluster.getNameNode();
NameNode spyNN = spy(preSpyNN);
// Delay completeFile
GenericTestUtils.DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
doAnswer(delayer).when(spyNN).complete(
anyString(), anyString(), (ExtendedBlock)anyObject());
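// Any client created against spyNN will have its complete() call held at
// the DelayAnswer latch until proceed() is invoked below.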
DFSClient client = new DFSClient(null, spyNN, conf, null);
file1 = new Path("/testRecoverFinalized");
final OutputStream stm = client.create("/testRecoverFinalized", true);
// write a partial block of data
AppendTestUtil.write(stm, 0, 4096);
final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
Thread t = new Thread() {
public void run() {
try {
stm.close();
} catch (Throwable t) {
err.set(t);
}
}};
t.start();
LOG.info("Waiting for close to get to latch...");
delayer.waitForCall();
// At this point, the block is finalized on the DNs, but the file
// has not been completed in the NN.
// Lose the leases
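// Stop renewing the lease so that, once recoverFile() shortens the soft
// limit, another client can take the lease over.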
LOG.info("Killing lease checker");
client.leaserenewer.interruptAndJoin();
FileSystem fs1 = cluster.getFileSystem();
FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(
fs1.getConf());
LOG.info("Recovering file");
recoverFile(fs2);
LOG.info("Telling close to proceed.");
delayer.proceed();
LOG.info("Waiting for close to finish.");
t.join();
LOG.info("Close finished.");
// We expect that close will get a "File is not open"
// error.
Throwable thrownByClose = err.get();
assertNotNull(thrownByClose);
assertTrue(thrownByClose instanceof IOException);
if (!thrownByClose.getMessage().contains(
"No lease on /testRecoverFinalized")) {
// not the expected lease error; rethrow so the test fails with the real cause
throw thrownByClose;
}
} finally {
cluster.shutdown();
}
}
/**
 * Test case that stops a writer after finalizing a block but
 * before calling completeFile, recovers the file from another writer,
 * starts writing from that writer, and then has the old lease holder
 * call completeFile.
 */
@Test(timeout=60000)
public void testCompleteOtherLeaseHoldersFile() throws Throwable {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build();
try {
cluster.waitActive();
NameNode preSpyNN = cluster.getNameNode();
NameNode spyNN = spy(preSpyNN);
// Delay completeFile
GenericTestUtils.DelayAnswer delayer =
new GenericTestUtils.DelayAnswer(LOG);
doAnswer(delayer).when(spyNN).complete(anyString(), anyString(),
(ExtendedBlock) anyObject());
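// As above, the spied NameNode holds this client's complete() call at the
// latch until proceed() is invoked.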
DFSClient client = new DFSClient(null, spyNN, conf, null);
file1 = new Path("/testCompleteOtherLease");
final OutputStream stm = client.create("/testCompleteOtherLease", true);
// write a partial block of data
AppendTestUtil.write(stm, 0, 4096);
final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
Thread t = new Thread() {
public void run() {
try {
stm.close();
} catch (Throwable t) {
err.set(t);
}
}};
t.start();
LOG.info("Waiting for close to get to latch...");
delayer.waitForCall();
// At this point, the block is finalized on the DNs, but the file
// has not been completed in the NN.
// Lose the leases
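// Stop lease renewal so the recovering writer (fs2) can take over the lease.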
LOG.info("Killing lease checker");
client.leaserenewer.interruptAndJoin();
FileSystem fs1 = cluster.getFileSystem();
FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(
fs1.getConf());
LOG.info("Recovering file");
recoverFile(fs2);
LOG.info("Opening file for append from new fs");
FSDataOutputStream appenderStream = fs2.append(file1);
LOG.info("Writing some data from new appender");
AppendTestUtil.write(appenderStream, 0, 4096);
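// fs2 now holds the lease, so the original writer's pending complete()
// should be rejected with a lease mismatch.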
LOG.info("Telling old close to proceed.");
delayer.proceed();
LOG.info("Waiting for close to finish.");
t.join();
LOG.info("Close finished.");
// We expect that close will get a "Lease mismatch"
// error.
Throwable thrownByClose = err.get();
assertNotNull(thrownByClose);
assertTrue(thrownByClose instanceof IOException);
if (!thrownByClose.getMessage().contains(
"Lease mismatch")) {
// not the expected lease error; rethrow so the test fails with the real cause
throw thrownByClose;
}
// The appender should be able to close properly
appenderStream.close();
} finally {
cluster.shutdown();
}
}
}