| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs; |
| |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hdfs.protocol.Block; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlock; |
| import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; |
| import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; |
| import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; |
| import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; |
| import org.apache.hadoop.hdfs.server.datanode.DataNode; |
| import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; |
| import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; |
| import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; |
| import org.apache.hadoop.hdfs.util.StripedBlockUtil; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.Timeout; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| |
| import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.CELL_SIZE; |
| import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_DATA_UNITS; |
| import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_PARITY_UNITS; |
| import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.findFirstDataNode; |
| import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.initializeCluster; |
| import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.tearDownCluster; |
| |
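/**
 * Tests that read striped (erasure-coded) files and verify that block
 * corruption and invalidation are correctly reported to and tracked by the
 * NameNode.
 */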
| public class TestReadStripedFileWithDecoding { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TestReadStripedFileWithDecoding.class); |
| |
| private MiniDFSCluster cluster; |
| private DistributedFileSystem dfs; |
| |
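  // Fail any test case that runs longer than five minutes.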
| @Rule |
| public Timeout globalTimeout = new Timeout(300000); |
| |
| @Before |
| public void setup() throws IOException { |
| cluster = initializeCluster(); |
| dfs = cluster.getFileSystem(); |
| } |
| |
| @After |
| public void tearDown() throws IOException { |
| tearDownCluster(cluster); |
| } |
| |
| /** |
| * After reading a corrupted block, make sure the client can correctly report |
| * the corruption to the NameNode. |
| */ |
| @Test |
| public void testReportBadBlock() throws IOException { |
| // create file |
| final Path file = new Path("/corrupted"); |
    final int length = 10; // same length as the "corruption" payload below
| final byte[] bytes = StripedFileTestUtil.generateBytes(length); |
| DFSTestUtil.writeFile(dfs, file, bytes); |
| |
| // corrupt the first data block |
    int dnIndex = findFirstDataNode(cluster, dfs, file,
        CELL_SIZE * NUM_DATA_UNITS);
| Assert.assertNotEquals(-1, dnIndex); |
| LocatedStripedBlock slb = (LocatedStripedBlock) dfs.getClient() |
| .getLocatedBlocks(file.toString(), 0, CELL_SIZE * NUM_DATA_UNITS) |
| .get(0); |
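    // Parse the striped block group into its internal data and parity blocks.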
| final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb, |
| CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS); |
| // find the first block file |
| File storageDir = cluster.getInstanceStorageDir(dnIndex, 0); |
| File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock()); |
| Assert.assertTrue("Block file does not exist", blkFile.exists()); |
| // corrupt the block file |
    LOG.info("Deliberately corrupting file {}", blkFile.getName());
| try (FileOutputStream out = new FileOutputStream(blkFile)) { |
| out.write("corruption".getBytes()); |
| } |
| |
    // Disable heartbeats from the DataNodes so that the corrupted block
    // record is kept on the NameNode.
| for (DataNode dn : cluster.getDataNodes()) { |
| DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true); |
| } |
| |
| try { |
      // Do a stateful read. The client should detect the corrupted internal
      // block, decode the lost data from parity, and report the bad block.
| StripedFileTestUtil.verifyStatefulRead(dfs, file, length, bytes, |
| ByteBuffer.allocate(1024)); |
| |
| // check whether the corruption has been reported to the NameNode |
| final FSNamesystem ns = cluster.getNamesystem(); |
| final BlockManager bm = ns.getBlockManager(); |
| BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString()) |
| .asFile().getBlocks())[0]; |
| Assert.assertEquals(1, bm.getCorruptReplicas(blockInfo).size()); |
| } finally { |
| for (DataNode dn : cluster.getDataNodes()) { |
| DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false); |
| } |
| } |
| } |
| |
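  /**
   * After deleting a striped file, verify that its blocks on a DataNode with
   * heartbeats disabled are tracked in the NameNode's invalidate-blocks set.
   */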
| @Test |
| public void testInvalidateBlock() throws IOException { |
| final Path file = new Path("/invalidate"); |
| final int length = 10; |
| final byte[] bytes = StripedFileTestUtil.generateBytes(length); |
| DFSTestUtil.writeFile(dfs, file, bytes); |
| |
| int dnIndex = findFirstDataNode(cluster, dfs, file, |
| CELL_SIZE * NUM_DATA_UNITS); |
| Assert.assertNotEquals(-1, dnIndex); |
| LocatedStripedBlock slb = (LocatedStripedBlock) dfs.getClient() |
| .getLocatedBlocks(file.toString(), 0, CELL_SIZE * NUM_DATA_UNITS) |
| .get(0); |
| final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb, |
| CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS); |
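    // Local block of the first internal block; after the delete it should
    // appear in the NameNode's invalidate set for this DataNode.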
| final Block b = blks[0].getBlock().getLocalBlock(); |
| |
| DataNode dn = cluster.getDataNodes().get(dnIndex); |
    // Disable heartbeats from this DataNode so that the invalidated block
    // record is kept on the NameNode until the heartbeat expires and the NN
    // marks the DataNode as dead.
| DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true); |
| |
| try { |
| // delete the file |
| dfs.delete(file, true); |
      // check that the block has been added to invalidateBlocks
| final FSNamesystem fsn = cluster.getNamesystem(); |
| final BlockManager bm = fsn.getBlockManager(); |
| DatanodeDescriptor dnd = |
| NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId()); |
| Assert.assertTrue(bm.containsInvalidateBlock( |
| blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b)); |
| } finally { |
| DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false); |
| } |
| } |
| } |