| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs; |
| |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.Matchers.anyObject; |
| import static org.mockito.Matchers.anyString; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.spy; |
| |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.commons.logging.impl.Log4JLogger; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hdfs.protocol.Block; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.FSConstants; |
| import org.apache.hadoop.hdfs.server.datanode.DataNode; |
| import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; |
| import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; |
| import org.apache.hadoop.hdfs.server.namenode.LeaseManager; |
| import org.apache.hadoop.hdfs.server.namenode.NameNode; |
| import org.apache.log4j.Level; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
/**
 * File append tests for HDFS-200 and HDFS-142, specifically focused on
 * using append()/sync() to recover block information from a crashed writer.
 */
| public class TestFileAppend4 { |
| static final Log LOG = LogFactory.getLog(TestFileAppend4.class); |
| static final long BLOCK_SIZE = 1024; |
| static final long BBW_SIZE = 500; // don't align on bytes/checksum |
| |
| static final Object [] NO_ARGS = new Object []{}; |
| |
| Configuration conf; |
| MiniDFSCluster cluster; |
| Path file1; |
| FSDataOutputStream stm; |
| boolean simulatedStorage = false; |
| |
| { |
| ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL); |
| ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL); |
| ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL); |
| ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL); |
| ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL); |
| } |
| |
| @Before |
| public void setUp() throws Exception { |
| this.conf = new Configuration(); |
| if (simulatedStorage) { |
| conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); |
| } |
| conf.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true); |
| |
| // lower heartbeat interval for fast recognition of DN death |
| conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, |
| 1000); |
| conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); |
| conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000); |
| // handle under-replicated blocks quickly (for replication asserts) |
| conf.setInt( |
| DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 5); |
| conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); |
| |
| // handle failures in the DFSClient pipeline quickly |
| // (for cluster.shutdown(); fs.close() idiom) |
| conf.setInt("ipc.client.connect.max.retries", 1); |
| } |
| |
| /* |
| * Recover file. |
| * Try and open file in append mode. |
| * Doing this, we get a hold of the file that crashed writer |
| * was writing to. Once we have it, close it. This will |
| * allow subsequent reader to see up to last sync. |
| * NOTE: This is the same algorithm that HBase uses for file recovery |
| * @param fs |
| * @throws Exception |
| */ |
| private void recoverFile(final FileSystem fs) throws Exception { |
| LOG.info("Recovering File Lease"); |
| |
| // set the soft limit to be 1 second so that the |
| // namenode triggers lease recovery upon append request |
| cluster.setLeasePeriod(1000, FSConstants.LEASE_HARDLIMIT_PERIOD); |
| |
| // Trying recovery |
| int tries = 60; |
| boolean recovered = false; |
| FSDataOutputStream out = null; |
| while (!recovered && tries-- > 0) { |
| try { |
| out = fs.append(file1); |
| LOG.info("Successfully opened for appends"); |
| recovered = true; |
| } catch (IOException e) { |
| LOG.info("Failed open for append, waiting on lease recovery"); |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException ex) { |
| // ignore it and try again |
| } |
| } |
| } |
| if (out != null) { |
| out.close(); |
| } |
| if (!recovered) { |
| fail("Recovery should take < 1 min"); |
| } |
| LOG.info("Past out lease recovery"); |
| } |
| |
| /** |
| * Test case that stops a writer after finalizing a block but |
| * before calling completeFile, and then tries to recover |
| * the lease from another thread. |
| */ |
| @Test(timeout=60000) |
| public void testRecoverFinalizedBlock() throws Throwable { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build(); |
| |
| try { |
| cluster.waitActive(); |
| NameNode preSpyNN = cluster.getNameNode(); |
| NameNode spyNN = spy(preSpyNN); |
| |
| // Delay completeFile |
| DelayAnswer delayer = new DelayAnswer(); |
| doAnswer(delayer).when(spyNN).complete( |
| anyString(), anyString(), (ExtendedBlock)anyObject()); |
| |
| DFSClient client = new DFSClient(null, spyNN, conf, null); |
| file1 = new Path("/testRecoverFinalized"); |
| final OutputStream stm = client.create("/testRecoverFinalized", true); |
| |
| // write 1/2 block |
| AppendTestUtil.write(stm, 0, 4096); |
| final AtomicReference<Throwable> err = new AtomicReference<Throwable>(); |
| Thread t = new Thread() { |
| public void run() { |
| try { |
| stm.close(); |
| } catch (Throwable t) { |
| err.set(t); |
| } |
| }}; |
| t.start(); |
| LOG.info("Waiting for close to get to latch..."); |
| delayer.waitForCall(); |
| |
| // At this point, the block is finalized on the DNs, but the file |
| // has not been completed in the NN. |
| // Lose the leases |
| LOG.info("Killing lease checker"); |
| client.leaserenewer.interruptAndJoin(); |
| |
| FileSystem fs1 = cluster.getFileSystem(); |
| FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername( |
| fs1.getConf()); |
| |
| LOG.info("Recovering file"); |
| recoverFile(fs2); |
| |
| LOG.info("Telling close to proceed."); |
| delayer.proceed(); |
| LOG.info("Waiting for close to finish."); |
| t.join(); |
| LOG.info("Close finished."); |
| |
| // We expect that close will get a "File is not open" |
| // error. |
| Throwable thrownByClose = err.get(); |
| assertNotNull(thrownByClose); |
| assertTrue(thrownByClose instanceof IOException); |
| if (!thrownByClose.getMessage().contains( |
| "No lease on /testRecoverFinalized")) |
| throw thrownByClose; |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * Test case that stops a writer after finalizing a block but |
| * before calling completeFile, recovers a file from another writer, |
| * starts writing from that writer, and then has the old lease holder |
| * call completeFile |
| */ |
| @Test(timeout=60000) |
| public void testCompleteOtherLeaseHoldersFile() throws Throwable { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build(); |
| |
| try { |
| cluster.waitActive(); |
| NameNode preSpyNN = cluster.getNameNode(); |
| NameNode spyNN = spy(preSpyNN); |
| |
| // Delay completeFile |
| DelayAnswer delayer = new DelayAnswer(); |
| doAnswer(delayer).when(spyNN).complete(anyString(), anyString(), |
| (ExtendedBlock) anyObject()); |
| |
| DFSClient client = new DFSClient(null, spyNN, conf, null); |
| file1 = new Path("/testCompleteOtherLease"); |
| final OutputStream stm = client.create("/testCompleteOtherLease", true); |
| |
| // write 1/2 block |
| AppendTestUtil.write(stm, 0, 4096); |
| final AtomicReference<Throwable> err = new AtomicReference<Throwable>(); |
| Thread t = new Thread() { |
| public void run() { |
| try { |
| stm.close(); |
| } catch (Throwable t) { |
| err.set(t); |
| } |
| }}; |
| t.start(); |
| LOG.info("Waiting for close to get to latch..."); |
| delayer.waitForCall(); |
| |
| // At this point, the block is finalized on the DNs, but the file |
| // has not been completed in the NN. |
| // Lose the leases |
| LOG.info("Killing lease checker"); |
| client.leaserenewer.interruptAndJoin(); |
| |
| FileSystem fs1 = cluster.getFileSystem(); |
| FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername( |
| fs1.getConf()); |
| |
| LOG.info("Recovering file"); |
| recoverFile(fs2); |
| |
| LOG.info("Opening file for append from new fs"); |
| FSDataOutputStream appenderStream = fs2.append(file1); |
| |
| LOG.info("Writing some data from new appender"); |
| AppendTestUtil.write(appenderStream, 0, 4096); |
| |
| LOG.info("Telling old close to proceed."); |
| delayer.proceed(); |
| LOG.info("Waiting for close to finish."); |
| t.join(); |
| LOG.info("Close finished."); |
| |
| // We expect that close will get a "Lease mismatch" |
| // error. |
| Throwable thrownByClose = err.get(); |
| assertNotNull(thrownByClose); |
| assertTrue(thrownByClose instanceof IOException); |
| if (!thrownByClose.getMessage().contains( |
| "Lease mismatch")) |
| throw thrownByClose; |
| |
| // The appender should be able to close properly |
| appenderStream.close(); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * Mockito answer helper that triggers one latch as soon as the |
| * method is called, then waits on another before continuing. |
| */ |
| private static class DelayAnswer implements Answer<Object> { |
| private final CountDownLatch fireLatch = new CountDownLatch(1); |
| private final CountDownLatch waitLatch = new CountDownLatch(1); |
| |
| /** |
| * Wait until the method is called. |
| */ |
| public void waitForCall() throws InterruptedException { |
| fireLatch.await(); |
| } |
| |
| /** |
| * Tell the method to proceed. |
| * This should only be called after waitForCall() |
| */ |
| public void proceed() { |
| waitLatch.countDown(); |
| } |
| |
| public Object answer(InvocationOnMock invocation) throws Throwable { |
| LOG.info("DelayAnswer firing fireLatch"); |
| fireLatch.countDown(); |
| try { |
| LOG.info("DelayAnswer waiting on waitLatch"); |
| waitLatch.await(); |
| LOG.info("DelayAnswer delay complete"); |
| } catch (InterruptedException ie) { |
| throw new IOException("Interrupted waiting on latch", ie); |
| } |
| return invocation.callRealMethod(); |
| } |
| } |
| } |