| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.EnumSet; |
| import java.util.List; |
| import java.util.concurrent.ThreadLocalRandom; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.BlockLocation; |
| import org.apache.hadoop.fs.CreateFlag; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.HardLink; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; |
| import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlock; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlocks; |
| import org.apache.hadoop.hdfs.server.datanode.DataNode; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.ipc.RemoteException; |
| import org.apache.hadoop.util.Time; |
| import org.junit.Assert; |
| import org.junit.Test; |
| |
| /** |
| * This class tests the building blocks that are needed to |
| * support HDFS appends. |
| */ |
| public class TestFileAppend{ |
| private static final long RANDOM_TEST_RUNTIME = 10000; |
| |
| private static byte[] fileContents = null; |
| |
| // |
| // writes to file but does not close it |
| // |
| private void writeFile(FSDataOutputStream stm) throws IOException { |
| byte[] buffer = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE); |
| stm.write(buffer); |
| } |
| |
| // |
| // verify that the data written to the full blocks are sane |
| // |
| private void checkFile(DistributedFileSystem fileSys, Path name, int repl) |
| throws IOException { |
| boolean done = false; |
| |
| // wait till all full blocks are confirmed by the datanodes. |
| while (!done) { |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) {;} |
| done = true; |
| BlockLocation[] locations = fileSys.getFileBlockLocations( |
| fileSys.getFileStatus(name), 0, AppendTestUtil.FILE_SIZE); |
| if (locations.length < AppendTestUtil.NUM_BLOCKS) { |
| System.out.println("Number of blocks found " + locations.length); |
| done = false; |
| continue; |
| } |
| for (int idx = 0; idx < AppendTestUtil.NUM_BLOCKS; idx++) { |
| if (locations[idx].getHosts().length < repl) { |
| System.out.println("Block index " + idx + " not yet replciated."); |
| done = false; |
| break; |
| } |
| } |
| } |
| byte[] expected = |
| new byte[AppendTestUtil.NUM_BLOCKS * AppendTestUtil.BLOCK_SIZE]; |
| System.arraycopy(fileContents, 0, expected, 0, expected.length); |
| // do a sanity check. Read the file |
| // do not check file status since the file is not yet closed. |
| AppendTestUtil.checkFullFile(fileSys, name, |
| AppendTestUtil.NUM_BLOCKS * AppendTestUtil.BLOCK_SIZE, |
| expected, "Read 1", false); |
| } |
| |
| @Test |
| public void testBreakHardlinksIfNeeded() throws IOException { |
| Configuration conf = new HdfsConfiguration(); |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); |
| FileSystem fs = cluster.getFileSystem(); |
| InetSocketAddress addr = new InetSocketAddress("localhost", |
| cluster.getNameNodePort()); |
| DFSClient client = new DFSClient(addr, conf); |
| try { |
| // create a new file, write to it and close it. |
| Path file1 = new Path("/filestatus.dat"); |
| FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1); |
| writeFile(stm); |
| stm.close(); |
| |
| // Get a handle to the datanode |
| DataNode[] dn = cluster.listDataNodes(); |
| assertTrue("There should be only one datanode but found " + dn.length, |
| dn.length == 1); |
| |
| LocatedBlocks locations = client.getNamenode().getBlockLocations( |
| file1.toString(), 0, Long.MAX_VALUE); |
| List<LocatedBlock> blocks = locations.getLocatedBlocks(); |
| final FsDatasetSpi<?> fsd = dn[0].getFSDataset(); |
| |
| // |
| // Create hard links for a few of the blocks |
| // |
| for (int i = 0; i < blocks.size(); i = i + 2) { |
| ExtendedBlock b = blocks.get(i).getBlock(); |
| final File f = FsDatasetTestUtil.getBlockFile( |
| fsd, b.getBlockPoolId(), b.getLocalBlock()); |
| File link = new File(f.toString() + ".link"); |
| System.out.println("Creating hardlink for File " + f + " to " + link); |
| HardLink.createHardLink(f, link); |
| } |
| |
| // Detach all blocks. This should remove hardlinks (if any) |
| for (int i = 0; i < blocks.size(); i++) { |
| ExtendedBlock b = blocks.get(i).getBlock(); |
| System.out.println("breakHardlinksIfNeeded detaching block " + b); |
| assertTrue("breakHardlinksIfNeeded(" + b + ") should have returned true", |
| FsDatasetTestUtil.breakHardlinksIfNeeded(fsd, b)); |
| } |
| |
| // Since the blocks were already detached earlier, these calls should |
| // return false |
| for (int i = 0; i < blocks.size(); i++) { |
| ExtendedBlock b = blocks.get(i).getBlock(); |
| System.out.println("breakHardlinksIfNeeded re-attempting to " + |
| "detach block " + b); |
| assertTrue("breakHardlinksIfNeeded(" + b + ") should have returned false", |
| FsDatasetTestUtil.breakHardlinksIfNeeded(fsd, b)); |
| } |
| } finally { |
| client.close(); |
| fs.close(); |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * Test a simple flush on a simple HDFS file. |
| * @throws IOException an exception might be thrown |
| */ |
| @Test |
| public void testSimpleFlush() throws IOException { |
| Configuration conf = new HdfsConfiguration(); |
| fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE); |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); |
| DistributedFileSystem fs = cluster.getFileSystem(); |
| try { |
| |
| // create a new file. |
| Path file1 = new Path("/simpleFlush.dat"); |
| FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1); |
| System.out.println("Created file simpleFlush.dat"); |
| |
| // write to file |
| int mid = AppendTestUtil.FILE_SIZE /2; |
| stm.write(fileContents, 0, mid); |
| stm.hflush(); |
| System.out.println("Wrote and Flushed first part of file."); |
| |
| // write the remainder of the file |
| stm.write(fileContents, mid, AppendTestUtil.FILE_SIZE - mid); |
| System.out.println("Written second part of file"); |
| stm.hflush(); |
| stm.hflush(); |
| System.out.println("Wrote and Flushed second part of file."); |
| |
| // verify that full blocks are sane |
| checkFile(fs, file1, 1); |
| |
| stm.close(); |
| System.out.println("Closed file."); |
| |
| // verify that entire file is good |
| AppendTestUtil.checkFullFile(fs, file1, AppendTestUtil.FILE_SIZE, |
| fileContents, "Read 2"); |
| |
| } catch (IOException e) { |
| System.out.println("Exception :" + e); |
| throw e; |
| } catch (Throwable e) { |
| System.out.println("Throwable :" + e); |
| e.printStackTrace(); |
| throw new IOException("Throwable : " + e); |
| } finally { |
| fs.close(); |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * Test that file data can be flushed. |
| * @throws IOException an exception might be thrown |
| */ |
| @Test |
| public void testComplexFlush() throws IOException { |
| Configuration conf = new HdfsConfiguration(); |
| fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE); |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); |
| DistributedFileSystem fs = cluster.getFileSystem(); |
| try { |
| |
| // create a new file. |
| Path file1 = new Path("/complexFlush.dat"); |
| FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1); |
| System.out.println("Created file complexFlush.dat"); |
| |
| int start = 0; |
| for (start = 0; (start + 29) < AppendTestUtil.FILE_SIZE; ) { |
| stm.write(fileContents, start, 29); |
| stm.hflush(); |
| start += 29; |
| } |
| stm.write(fileContents, start, AppendTestUtil.FILE_SIZE -start); |
| // need to make sure we completely write out all full blocks before |
| // the checkFile() call (see FSOutputSummer#flush) |
| stm.flush(); |
| // verify that full blocks are sane |
| checkFile(fs, file1, 1); |
| stm.close(); |
| |
| // verify that entire file is good |
| AppendTestUtil.checkFullFile(fs, file1, AppendTestUtil.FILE_SIZE, |
| fileContents, "Read 2"); |
| } catch (IOException e) { |
| System.out.println("Exception :" + e); |
| throw e; |
| } catch (Throwable e) { |
| System.out.println("Throwable :" + e); |
| e.printStackTrace(); |
| throw new IOException("Throwable : " + e); |
| } finally { |
| fs.close(); |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * FileNotFoundException is expected for appending to a non-exisiting file |
| * |
| * @throws FileNotFoundException as the result |
| */ |
| @Test(expected = FileNotFoundException.class) |
| public void testFileNotFound() throws IOException { |
| Configuration conf = new HdfsConfiguration(); |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); |
| FileSystem fs = cluster.getFileSystem(); |
| try { |
| Path file1 = new Path("/nonexistingfile.dat"); |
| fs.append(file1); |
| } finally { |
| fs.close(); |
| cluster.shutdown(); |
| } |
| } |
| |
| /** Test two consecutive appends on a file with a full block. */ |
| @Test |
| public void testAppendTwice() throws Exception { |
| Configuration conf = new HdfsConfiguration(); |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); |
| final FileSystem fs1 = cluster.getFileSystem(); |
| final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf); |
| try { |
| |
| final Path p = new Path("/testAppendTwice/foo"); |
| final int len = 1 << 16; |
| final byte[] fileContents = AppendTestUtil.initBuffer(len); |
| |
| { |
| // create a new file with a full block. |
| FSDataOutputStream out = fs2.create(p, true, 4096, (short)1, len); |
| out.write(fileContents, 0, len); |
| out.close(); |
| } |
| |
| //1st append does not add any data so that the last block remains full |
| //and the last block in INodeFileUnderConstruction is a BlockInfo |
| //but does not have a BlockUnderConstructionFeature. |
| fs2.append(p); |
| |
| //2nd append should get AlreadyBeingCreatedException |
| fs1.append(p); |
| Assert.fail(); |
| } catch(RemoteException re) { |
| AppendTestUtil.LOG.info("Got an exception:", re); |
| Assert.assertEquals(AlreadyBeingCreatedException.class.getName(), |
| re.getClassName()); |
| } finally { |
| fs2.close(); |
| fs1.close(); |
| cluster.shutdown(); |
| } |
| } |
| |
| /** Test two consecutive appends on a file with a full block. */ |
| @Test |
| public void testAppend2Twice() throws Exception { |
| Configuration conf = new HdfsConfiguration(); |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); |
| final DistributedFileSystem fs1 = cluster.getFileSystem(); |
| final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf); |
| try { |
| final Path p = new Path("/testAppendTwice/foo"); |
| final int len = 1 << 16; |
| final byte[] fileContents = AppendTestUtil.initBuffer(len); |
| |
| { |
| // create a new file with a full block. |
| FSDataOutputStream out = fs2.create(p, true, 4096, (short)1, len); |
| out.write(fileContents, 0, len); |
| out.close(); |
| } |
| |
| //1st append does not add any data so that the last block remains full |
| //and the last block in INodeFileUnderConstruction is a BlockInfo |
| //but does not have a BlockUnderConstructionFeature. |
| ((DistributedFileSystem) fs2).append(p, |
| EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null); |
| |
| // 2nd append should get AlreadyBeingCreatedException |
| fs1.append(p); |
| Assert.fail(); |
| } catch(RemoteException re) { |
| AppendTestUtil.LOG.info("Got an exception:", re); |
| Assert.assertEquals(AlreadyBeingCreatedException.class.getName(), |
| re.getClassName()); |
| } finally { |
| fs2.close(); |
| fs1.close(); |
| cluster.shutdown(); |
| } |
| } |
| |
| |
| @Test |
| public void testMultipleAppends() throws Exception { |
| final long startTime = Time.monotonicNow(); |
| final Configuration conf = new HdfsConfiguration(); |
| conf.setInt( |
| DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY, 1); |
| |
| final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) |
| .numDataNodes(4).build(); |
| final DistributedFileSystem fs = cluster.getFileSystem(); |
| try { |
| final Path p = new Path("/testMultipleAppend/foo"); |
| final int blockSize = 1 << 16; |
| final byte[] data = AppendTestUtil.initBuffer(blockSize); |
| |
| // create an empty file. |
| fs.create(p, true, 4096, (short)3, blockSize).close(); |
| |
| int fileLen = 0; |
| for(int i = 0; |
| i < 10 || Time.monotonicNow() - startTime < RANDOM_TEST_RUNTIME; |
| i++) { |
| int appendLen = ThreadLocalRandom.current().nextInt(100) + 1; |
| if (fileLen + appendLen > data.length) { |
| break; |
| } |
| |
| AppendTestUtil.LOG.info(i + ") fileLen=" + fileLen |
| + ", appendLen=" + appendLen); |
| final FSDataOutputStream out = fs.append(p); |
| out.write(data, fileLen, appendLen); |
| out.close(); |
| fileLen += appendLen; |
| } |
| |
| Assert.assertEquals(fileLen, fs.getFileStatus(p).getLen()); |
| final byte[] actual = new byte[fileLen]; |
| final FSDataInputStream in = fs.open(p); |
| in.readFully(actual); |
| in.close(); |
| for(int i = 0; i < fileLen; i++) { |
| Assert.assertEquals(data[i], actual[i]); |
| } |
| } finally { |
| fs.close(); |
| cluster.shutdown(); |
| } |
| } |
| |
| /** Tests appending after soft-limit expires. */ |
| @Test |
| public void testAppendAfterSoftLimit() |
| throws IOException, InterruptedException { |
| Configuration conf = new HdfsConfiguration(); |
| conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); |
| //Set small soft-limit for lease |
| final long softLimit = 1L; |
| final long hardLimit = 9999999L; |
| |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) |
| .build(); |
| cluster.setLeasePeriod(softLimit, hardLimit); |
| cluster.waitActive(); |
| |
| FileSystem fs = cluster.getFileSystem(); |
| FileSystem fs2 = new DistributedFileSystem(); |
| fs2.initialize(fs.getUri(), conf); |
| |
| final Path testPath = new Path("/testAppendAfterSoftLimit"); |
| final byte[] fileContents = AppendTestUtil.initBuffer(32); |
| |
| // create a new file without closing |
| FSDataOutputStream out = fs.create(testPath); |
| out.write(fileContents); |
| |
| //Wait for > soft-limit |
| Thread.sleep(250); |
| |
| try { |
| FSDataOutputStream appendStream2 = fs2.append(testPath); |
| appendStream2.write(fileContents); |
| appendStream2.close(); |
| assertEquals(fileContents.length, fs.getFileStatus(testPath).getLen()); |
| } finally { |
| fs.close(); |
| fs2.close(); |
| cluster.shutdown(); |
| } |
| } |
| |
| /** Tests appending after soft-limit expires. */ |
| @Test |
| public void testAppend2AfterSoftLimit() throws Exception { |
| Configuration conf = new HdfsConfiguration(); |
| conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); |
| //Set small soft-limit for lease |
| final long softLimit = 1L; |
| final long hardLimit = 9999999L; |
| |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) |
| .build(); |
| cluster.setLeasePeriod(softLimit, hardLimit); |
| cluster.waitActive(); |
| |
| DistributedFileSystem fs = cluster.getFileSystem(); |
| DistributedFileSystem fs2 = new DistributedFileSystem(); |
| fs2.initialize(fs.getUri(), conf); |
| |
| final Path testPath = new Path("/testAppendAfterSoftLimit"); |
| final byte[] fileContents = AppendTestUtil.initBuffer(32); |
| |
| // create a new file without closing |
| FSDataOutputStream out = fs.create(testPath); |
| out.write(fileContents); |
| |
| //Wait for > soft-limit |
| Thread.sleep(250); |
| |
| try { |
| FSDataOutputStream appendStream2 = fs2.append(testPath, |
| EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null); |
| appendStream2.write(fileContents); |
| appendStream2.close(); |
| assertEquals(fileContents.length, fs.getFileStatus(testPath).getLen()); |
| // make sure we now have 1 block since the first writer was revoked |
| LocatedBlocks blks = fs.getClient().getLocatedBlocks(testPath.toString(), |
| 0L); |
| assertEquals(1, blks.getLocatedBlocks().size()); |
| for (LocatedBlock blk : blks.getLocatedBlocks()) { |
| assertEquals(fileContents.length, blk.getBlockSize()); |
| } |
| } finally { |
| fs.close(); |
| fs2.close(); |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * Old replica of the block should not be accepted as valid for append/read |
| */ |
| @Test |
| public void testFailedAppendBlockRejection() throws Exception { |
| Configuration conf = new HdfsConfiguration(); |
| conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", |
| "false"); |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) |
| .build(); |
| DistributedFileSystem fs = null; |
| try { |
| fs = cluster.getFileSystem(); |
| Path path = new Path("/test"); |
| FSDataOutputStream out = fs.create(path); |
| out.writeBytes("hello\n"); |
| out.close(); |
| |
| // stop one datanode |
| DataNodeProperties dnProp = cluster.stopDataNode(0); |
| String dnAddress = dnProp.datanode.getXferAddress().toString(); |
| if (dnAddress.startsWith("/")) { |
| dnAddress = dnAddress.substring(1); |
| } |
| |
| // append again to bump genstamps |
| for (int i = 0; i < 2; i++) { |
| out = fs.append(path); |
| out.writeBytes("helloagain\n"); |
| out.close(); |
| } |
| |
| // re-open and make the block state as underconstruction |
| out = fs.append(path); |
| cluster.restartDataNode(dnProp, true); |
| // wait till the block report comes |
| Thread.sleep(2000); |
| // check the block locations, this should not contain restarted datanode |
| BlockLocation[] locations = fs.getFileBlockLocations(path, 0, |
| Long.MAX_VALUE); |
| String[] names = locations[0].getNames(); |
| for (String node : names) { |
| if (node.equals(dnAddress)) { |
| fail("Failed append should not be present in latest block locations."); |
| } |
| } |
| out.close(); |
| } finally { |
| IOUtils.closeStream(fs); |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * Old replica of the block should not be accepted as valid for append/read |
| */ |
| @Test |
| public void testMultiAppend2() throws Exception { |
| Configuration conf = new HdfsConfiguration(); |
| conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", |
| "false"); |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) |
| .build(); |
| DistributedFileSystem fs = null; |
| final String hello = "hello\n"; |
| try { |
| fs = cluster.getFileSystem(); |
| Path path = new Path("/test"); |
| FSDataOutputStream out = fs.create(path); |
| out.writeBytes(hello); |
| out.close(); |
| |
| // stop one datanode |
| DataNodeProperties dnProp = cluster.stopDataNode(0); |
| String dnAddress = dnProp.datanode.getXferAddress().toString(); |
| if (dnAddress.startsWith("/")) { |
| dnAddress = dnAddress.substring(1); |
| } |
| |
| // append again to bump genstamps |
| for (int i = 0; i < 2; i++) { |
| out = fs.append(path, |
| EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null); |
| out.writeBytes(hello); |
| out.close(); |
| } |
| |
| // re-open and make the block state as underconstruction |
| out = fs.append(path, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), |
| 4096, null); |
| cluster.restartDataNode(dnProp, true); |
| // wait till the block report comes |
| Thread.sleep(2000); |
| out.writeBytes(hello); |
| out.close(); |
| // check the block locations |
| LocatedBlocks blocks = fs.getClient().getLocatedBlocks(path.toString(), 0L); |
| // since we append the file 3 time, we should be 4 blocks |
| assertEquals(4, blocks.getLocatedBlocks().size()); |
| for (LocatedBlock block : blocks.getLocatedBlocks()) { |
| assertEquals(hello.length(), block.getBlockSize()); |
| } |
| StringBuilder sb = new StringBuilder(); |
| for (int i = 0; i < 4; i++) { |
| sb.append(hello); |
| } |
| final byte[] content = sb.toString().getBytes(); |
| AppendTestUtil.checkFullFile(fs, path, content.length, content, |
| "Read /test"); |
| |
| // restart namenode to make sure the editlog can be properly applied |
| cluster.restartNameNode(true); |
| cluster.waitActive(); |
| AppendTestUtil.checkFullFile(fs, path, content.length, content, |
| "Read /test"); |
| blocks = fs.getClient().getLocatedBlocks(path.toString(), 0L); |
| // since we append the file 3 time, we should be 4 blocks |
| assertEquals(4, blocks.getLocatedBlocks().size()); |
| for (LocatedBlock block : blocks.getLocatedBlocks()) { |
| assertEquals(hello.length(), block.getBlockSize()); |
| } |
| } finally { |
| IOUtils.closeStream(fs); |
| cluster.shutdown(); |
| } |
| } |
| |
| @Test(timeout = 10000) |
| public void testAppendCorruptedBlock() throws Exception { |
| Configuration conf = new HdfsConfiguration(); |
| conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024); |
| conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); |
| conf.setInt("dfs.min.replication", 1); |
| MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) |
| .build(); |
| try { |
| DistributedFileSystem fs = cluster.getFileSystem(); |
| Path fileName = new Path("/appendCorruptBlock"); |
| DFSTestUtil.createFile(fs, fileName, 512, (short) 1, 0); |
| DFSTestUtil.waitReplication(fs, fileName, (short) 1); |
| Assert.assertTrue("File not created", fs.exists(fileName)); |
| ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName); |
| cluster.corruptBlockOnDataNodes(block); |
| DFSTestUtil.appendFile(fs, fileName, "appendCorruptBlock"); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| } |