| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hdfs.server.datanode; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeys; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.ha.HAServiceProtocol; |
| import org.apache.hadoop.hdfs.AppendTestUtil; |
| import org.apache.hadoop.hdfs.DFSClient; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DFSTestUtil; |
| import org.apache.hadoop.hdfs.DistributedFileSystem; |
| import org.apache.hadoop.hdfs.HdfsConfiguration; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.hdfs.StripedFileTestUtil; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlock; |
| import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; |
| import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; |
| import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; |
| import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; |
| import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; |
| import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; |
| import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; |
| import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; |
| import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.util.AutoCloseableLock; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TestName; |
| import org.mockito.Mockito; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.slf4j.event.Level; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Random; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY; |
| import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY; |
| import static org.junit.Assert.assertEquals; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.ArgumentMatchers.anyLong; |
| import static org.mockito.ArgumentMatchers.anyString; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.when; |
| |
| /** |
| * Test part 2 for sync all replicas in block recovery. |
| */ |
| public class TestBlockRecovery2 { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TestBlockRecovery2.class); |
| |
| private static final String DATA_DIR = |
| MiniDFSCluster.getBaseDirectory() + "data"; |
| |
| private DataNode dn; |
| private Configuration conf; |
| private boolean tearDownDone; |
| |
| private final static String CLUSTER_ID = "testClusterID"; |
| private final static String POOL_ID = "BP-TEST"; |
| private final static InetSocketAddress NN_ADDR = new InetSocketAddress( |
| "localhost", 5020); |
| |
| @Rule |
| public TestName currentTestName = new TestName(); |
| |
| static { |
| GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.TRACE); |
| GenericTestUtils.setLogLevel(LOG, Level.TRACE); |
| } |
| |
| /** |
| * Starts an instance of DataNode. |
| * @throws IOException |
| */ |
| @Before |
| public void startUp() throws IOException { |
| tearDownDone = false; |
| conf = new HdfsConfiguration(); |
| conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR); |
| conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0"); |
| conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); |
| conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0"); |
| conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); |
| FileSystem.setDefaultUri(conf, |
| "hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort()); |
| List<StorageLocation> locations = new ArrayList<>(); |
| File dataDir = new File(DATA_DIR); |
| FileUtil.fullyDelete(dataDir); |
| dataDir.mkdirs(); |
| StorageLocation location = StorageLocation.parse(dataDir.getPath()); |
| locations.add(location); |
| final DatanodeProtocolClientSideTranslatorPB namenode = |
| mock(DatanodeProtocolClientSideTranslatorPB.class); |
| |
| Mockito.doAnswer( |
| (Answer<DatanodeRegistration>) invocation -> |
| (DatanodeRegistration) invocation.getArguments()[0]) |
| .when(namenode) |
| .registerDatanode(Mockito.any(DatanodeRegistration.class)); |
| |
| when(namenode.versionRequest()) |
| .thenReturn(new NamespaceInfo(1, CLUSTER_ID, POOL_ID, 1L)); |
| |
| when(namenode.sendHeartbeat( |
| Mockito.any(), |
| Mockito.any(), |
| Mockito.anyLong(), |
| Mockito.anyLong(), |
| Mockito.anyInt(), |
| Mockito.anyInt(), |
| Mockito.anyInt(), |
| Mockito.any(), |
| Mockito.anyBoolean(), |
| Mockito.any(), |
| Mockito.any())) |
| .thenReturn(new HeartbeatResponse( |
| new DatanodeCommand[0], |
| new NNHAStatusHeartbeat(HAServiceProtocol.HAServiceState.ACTIVE, 1), |
| null, ThreadLocalRandom.current().nextLong() | 1L)); |
| |
| dn = new DataNode(conf, locations, null, null) { |
| @Override |
| DatanodeProtocolClientSideTranslatorPB connectToNN( |
| InetSocketAddress nnAddr) throws IOException { |
| Assert.assertEquals(NN_ADDR, nnAddr); |
| return namenode; |
| } |
| }; |
| // Trigger a heartbeat so that it acknowledges the NN as active. |
| dn.getAllBpOs().get(0).triggerHeartbeatForTests(); |
| waitForActiveNN(); |
| } |
| |
| /** |
| * Wait for active NN up to 15 seconds. |
| */ |
| private void waitForActiveNN() { |
| try { |
| GenericTestUtils.waitFor(() -> |
| dn.getAllBpOs().get(0).getActiveNN() != null, 1000, 15 * 1000); |
| } catch (TimeoutException e) { |
| // Here its not failing, will again do the assertions for activeNN after |
| // this waiting period and fails there if BPOS has not acknowledged |
| // any NN as active. |
| LOG.warn("Failed to get active NN", e); |
| } catch (InterruptedException e) { |
| LOG.warn("InterruptedException while waiting to see active NN", e); |
| } |
| Assert.assertNotNull("Failed to get ActiveNN", |
| dn.getAllBpOs().get(0).getActiveNN()); |
| } |
| |
| /** |
| * Cleans the resources and closes the instance of datanode. |
| * @throws IOException if an error occurred |
| */ |
| @After |
| public void tearDown() throws IOException { |
| if (!tearDownDone && dn != null) { |
| try { |
| dn.shutdown(); |
| } catch(Exception e) { |
| LOG.error("Cannot close: ", e); |
| } finally { |
| File dir = new File(DATA_DIR); |
| if (dir.exists()) { |
| Assert.assertTrue( |
| "Cannot delete data-node dirs", FileUtil.fullyDelete(dir)); |
| } |
| } |
| tearDownDone = true; |
| } |
| } |
| |
| /** |
| * Test to verify the race between finalizeBlock and Lease recovery. |
| * |
| * @throws Exception |
| */ |
| @Test(timeout = 20000) |
| public void testRaceBetweenReplicaRecoveryAndFinalizeBlock() |
| throws Exception { |
| // Stop the Mocked DN started in startup() |
| tearDown(); |
| |
| Configuration configuration = new HdfsConfiguration(); |
| configuration.set( |
| DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, "1000"); |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder(configuration) |
| .numDataNodes(1).build(); |
| try { |
| cluster.waitClusterUp(); |
| DistributedFileSystem fs = cluster.getFileSystem(); |
| Path path = new Path("/test"); |
| FSDataOutputStream out = fs.create(path); |
| out.writeBytes("data"); |
| out.hsync(); |
| |
| List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs.open(path)); |
| final LocatedBlock block = blocks.get(0); |
| final DataNode dataNode = cluster.getDataNodes().get(0); |
| |
| final AtomicBoolean recoveryInitResult = new AtomicBoolean(true); |
| Thread recoveryThread = new Thread(() -> { |
| try { |
| DatanodeInfo[] locations = block.getLocations(); |
| final BlockRecoveryCommand.RecoveringBlock recoveringBlock = |
| new BlockRecoveryCommand.RecoveringBlock(block.getBlock(), |
| locations, block.getBlock().getGenerationStamp() + 1); |
| try(AutoCloseableLock lock = dataNode.data.acquireDatasetLock()) { |
| Thread.sleep(2000); |
| dataNode.initReplicaRecovery(recoveringBlock); |
| } |
| } catch (Exception e) { |
| recoveryInitResult.set(false); |
| } |
| }); |
| recoveryThread.start(); |
| try { |
| out.close(); |
| } catch (IOException e) { |
| Assert.assertTrue("Writing should fail", |
| e.getMessage().contains("are bad. Aborting...")); |
| } finally { |
| recoveryThread.join(); |
| } |
| Assert.assertTrue("Recovery should be initiated successfully", |
| recoveryInitResult.get()); |
| |
| dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock() |
| .getGenerationStamp() + 1, block.getBlock().getBlockId(), |
| block.getBlockSize()); |
| } finally { |
| if (null != cluster) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| /** |
| * Test for block recovery timeout. All recovery attempts will be delayed |
| * and the first attempt will be lost to trigger recovery timeout and retry. |
| */ |
| @Test(timeout = 300000L) |
| public void testRecoveryTimeout() throws Exception { |
| tearDown(); // Stop the Mocked DN started in startup() |
| final Random r = new Random(); |
| |
| // Make sure first commitBlockSynchronization call from the DN gets lost |
| // for the recovery timeout to expire and new recovery attempt |
| // to be started. |
| GenericTestUtils.SleepAnswer delayer = |
| new GenericTestUtils.SleepAnswer(3000) { |
| private final AtomicBoolean callRealMethod = new AtomicBoolean(); |
| |
| @Override |
| public Object answer(InvocationOnMock invocation) throws Throwable { |
| boolean interrupted = false; |
| try { |
| Thread.sleep(r.nextInt(3000) + 6000); |
| } catch (InterruptedException ie) { |
| interrupted = true; |
| } |
| try { |
| if (callRealMethod.get()) { |
| return invocation.callRealMethod(); |
| } |
| callRealMethod.set(true); |
| return null; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| }; |
| TestBlockRecovery.testRecoveryWithDatanodeDelayed(delayer); |
| } |
| |
| /** |
| * Test for block recovery taking longer than the heartbeat interval. |
| */ |
| @Test(timeout = 300000L) |
| public void testRecoverySlowerThanHeartbeat() throws Exception { |
| tearDown(); // Stop the Mocked DN started in startup() |
| |
| GenericTestUtils.SleepAnswer delayer = |
| new GenericTestUtils.SleepAnswer(3000, 6000); |
| TestBlockRecovery.testRecoveryWithDatanodeDelayed(delayer); |
| } |
| |
| @Test(timeout = 60000) |
| public void testEcRecoverBlocks() throws Throwable { |
| // Stop the Mocked DN started in startup() |
| tearDown(); |
| ErasureCodingPolicy ecPolicy = StripedFileTestUtil.getDefaultECPolicy(); |
| MiniDFSCluster cluster = |
| new MiniDFSCluster.Builder(conf).numDataNodes(8).build(); |
| |
| try { |
| cluster.waitActive(); |
| NamenodeProtocols preSpyNN = cluster.getNameNodeRpc(); |
| NamenodeProtocols spyNN = spy(preSpyNN); |
| |
| // Delay completeFile |
| GenericTestUtils.DelayAnswer delayer = |
| new GenericTestUtils.DelayAnswer(LOG); |
| doAnswer(delayer).when(spyNN).complete(anyString(), anyString(), any(), |
| anyLong()); |
| String topDir = "/myDir"; |
| DFSClient client = new DFSClient(null, spyNN, conf, null); |
| Path file = new Path(topDir + "/testECLeaseRecover"); |
| client.mkdirs(topDir, null, false); |
| client.enableErasureCodingPolicy(ecPolicy.getName()); |
| client.setErasureCodingPolicy(topDir, ecPolicy.getName()); |
| OutputStream stm = client.create(file.toString(), true); |
| |
| // write 5MB File |
| AppendTestUtil.write(stm, 0, 1024 * 1024 * 5); |
| final AtomicReference<Throwable> err = new AtomicReference<>(); |
| Thread t = new Thread(() -> { |
| try { |
| stm.close(); |
| } catch (Throwable t1) { |
| err.set(t1); |
| } |
| }); |
| t.start(); |
| |
| // Waiting for close to get to latch |
| delayer.waitForCall(); |
| GenericTestUtils.waitFor(() -> { |
| try { |
| return client.getNamenode().recoverLease(file.toString(), |
| client.getClientName()); |
| } catch (IOException e) { |
| return false; |
| } |
| }, 5000, 24000); |
| delayer.proceed(); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * Test that block will be recovered even if there are less than the |
| * specified minReplication datanodes involved in its recovery. |
| * |
| * Check that, after recovering, the block will be successfully replicated. |
| */ |
| @Test(timeout = 300000L) |
| public void testRecoveryWillIgnoreMinReplication() throws Exception { |
| tearDown(); // Stop the Mocked DN started in startup() |
| |
| final int blockSize = 4096; |
| final int numReplicas = 3; |
| final String filename = "/testIgnoreMinReplication"; |
| final Path filePath = new Path(filename); |
| Configuration configuration = new HdfsConfiguration(); |
| configuration.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000); |
| configuration.setInt(DFS_NAMENODE_REPLICATION_MIN_KEY, 2); |
| configuration.setLong(DFS_BLOCK_SIZE_KEY, blockSize); |
| MiniDFSCluster cluster = null; |
| |
| try { |
| cluster = new MiniDFSCluster.Builder(configuration).numDataNodes(5) |
| .build(); |
| cluster.waitActive(); |
| final DistributedFileSystem dfs = cluster.getFileSystem(); |
| final FSNamesystem fsn = cluster.getNamesystem(); |
| |
| // Create a file and never close the output stream to trigger recovery |
| FSDataOutputStream out = dfs.create(filePath, (short) numReplicas); |
| out.write(AppendTestUtil.randomBytes(0, blockSize)); |
| out.hsync(); |
| |
| DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", |
| cluster.getNameNodePort()), configuration); |
| LocatedBlock blk = dfsClient.getNamenode(). |
| getBlockLocations(filename, 0, blockSize). |
| getLastLocatedBlock(); |
| |
| // Kill 2 out of 3 datanodes so that only 1 alive, thus < minReplication |
| List<DatanodeInfo> dataNodes = Arrays.asList(blk.getLocations()); |
| assertEquals(dataNodes.size(), numReplicas); |
| for (DatanodeInfo dataNode : dataNodes.subList(0, numReplicas - 1)) { |
| cluster.stopDataNode(dataNode.getName()); |
| } |
| |
| GenericTestUtils.waitFor(() -> fsn.getNumDeadDataNodes() == 2, |
| 300, 300000); |
| |
| // Make sure hard lease expires to trigger replica recovery |
| cluster.setLeasePeriod(100L, 100L); |
| |
| // Wait for recovery to succeed |
| GenericTestUtils.waitFor(() -> { |
| try { |
| return dfs.isFileClosed(filePath); |
| } catch (IOException e) { |
| LOG.info("Something went wrong.", e); |
| } |
| return false; |
| }, 300, 300000); |
| |
| // Wait for the block to be replicated |
| DFSTestUtil.waitForReplication(cluster, DFSTestUtil.getFirstBlock( |
| dfs, filePath), 1, numReplicas, 0); |
| |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| } |
| |
| } |