| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.IOException; |
| import java.util.EnumSet; |
| import java.util.function.Supplier; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.crypto.CryptoProtocolVersion; |
| import org.apache.hadoop.fs.CreateFlag; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; |
| import org.apache.hadoop.hdfs.protocol.Block; |
| import org.apache.hadoop.hdfs.protocol.BlockType; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants; |
| import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlock; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlocks; |
| import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; |
| import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; |
| import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature; |
| import org.apache.hadoop.hdfs.server.datanode.DataNode; |
| import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol; |
| import org.apache.hadoop.hdfs.server.namenode.INodeFile; |
| import org.apache.hadoop.hdfs.server.namenode.LeaseManager; |
| import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; |
| import org.apache.hadoop.io.EnumSetWritable; |
| import org.apache.hadoop.ipc.RemoteException; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.util.DataChecksum; |
| import org.junit.After; |
| import org.junit.Test; |
| |
| public class TestLeaseRecovery { |
| static final int BLOCK_SIZE = 1024; |
| static final short REPLICATION_NUM = (short)3; |
| private static final long LEASE_PERIOD = 300L; |
| |
| private MiniDFSCluster cluster; |
| |
| @After |
| public void shutdown() throws IOException { |
| if (cluster != null) { |
| cluster.shutdown(); |
| cluster = null; |
| } |
| } |
| |
| static void checkMetaInfo(ExtendedBlock b, DataNode dn |
| ) throws IOException { |
| TestInterDatanodeProtocol.checkMetaInfo(b, dn); |
| } |
| |
| static int min(Integer... x) { |
| int m = x[0]; |
| for(int i = 1; i < x.length; i++) { |
| if (x[i] < m) { |
| m = x[i]; |
| } |
| } |
| return m; |
| } |
| |
| void waitLeaseRecovery(MiniDFSCluster cluster) { |
| cluster.setLeasePeriod(LEASE_PERIOD, LEASE_PERIOD); |
| // wait for the lease to expire |
| try { |
| Thread.sleep(2 * 3000); // 2 heartbeat intervals |
| } catch (InterruptedException e) { |
| } |
| } |
| |
| /** |
| * The following test first creates a file with a few blocks. |
| * It randomly truncates the replica of the last block stored in each datanode. |
| * Finally, it triggers block synchronization to synchronize all stored block. |
| */ |
| @Test |
| public void testBlockSynchronization() throws Exception { |
| final int ORG_FILE_SIZE = 3000; |
| Configuration conf = new HdfsConfiguration(); |
| conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build(); |
| cluster.waitActive(); |
| |
| //create a file |
| DistributedFileSystem dfs = cluster.getFileSystem(); |
| String filestr = "/foo"; |
| Path filepath = new Path(filestr); |
| DFSTestUtil.createFile(dfs, filepath, ORG_FILE_SIZE, REPLICATION_NUM, 0L); |
| assertTrue(dfs.exists(filepath)); |
| DFSTestUtil.waitReplication(dfs, filepath, REPLICATION_NUM); |
| |
| //get block info for the last block |
| LocatedBlock locatedblock = TestInterDatanodeProtocol.getLastLocatedBlock( |
| dfs.dfs.getNamenode(), filestr); |
| DatanodeInfo[] datanodeinfos = locatedblock.getLocations(); |
| assertEquals(REPLICATION_NUM, datanodeinfos.length); |
| |
| //connect to data nodes |
| DataNode[] datanodes = new DataNode[REPLICATION_NUM]; |
| for(int i = 0; i < REPLICATION_NUM; i++) { |
| datanodes[i] = cluster.getDataNode(datanodeinfos[i].getIpcPort()); |
| assertTrue(datanodes[i] != null); |
| } |
| |
| //verify Block Info |
| ExtendedBlock lastblock = locatedblock.getBlock(); |
| DataNode.LOG.info("newblocks=" + lastblock); |
| for(int i = 0; i < REPLICATION_NUM; i++) { |
| checkMetaInfo(lastblock, datanodes[i]); |
| } |
| |
| DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName); |
| cluster.getNameNodeRpc().append(filestr, dfs.dfs.clientName, |
| new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND))); |
| |
| // expire lease to trigger block recovery. |
| waitLeaseRecovery(cluster); |
| |
| Block[] updatedmetainfo = new Block[REPLICATION_NUM]; |
| long oldSize = lastblock.getNumBytes(); |
| lastblock = TestInterDatanodeProtocol.getLastLocatedBlock( |
| dfs.dfs.getNamenode(), filestr).getBlock(); |
| long currentGS = lastblock.getGenerationStamp(); |
| for(int i = 0; i < REPLICATION_NUM; i++) { |
| updatedmetainfo[i] = DataNodeTestUtils.getFSDataset(datanodes[i]).getStoredBlock( |
| lastblock.getBlockPoolId(), lastblock.getBlockId()); |
| assertEquals(lastblock.getBlockId(), updatedmetainfo[i].getBlockId()); |
| assertEquals(oldSize, updatedmetainfo[i].getNumBytes()); |
| assertEquals(currentGS, updatedmetainfo[i].getGenerationStamp()); |
| } |
| |
| // verify that lease recovery does not occur when namenode is in safemode |
| System.out.println("Testing that lease recovery cannot happen during safemode."); |
| filestr = "/foo.safemode"; |
| filepath = new Path(filestr); |
| dfs.create(filepath, (short)1); |
| cluster.getNameNodeRpc().setSafeMode( |
| HdfsConstants.SafeModeAction.SAFEMODE_ENTER, false); |
| assertTrue(dfs.dfs.exists(filestr)); |
| DFSTestUtil.waitReplication(dfs, filepath, (short)1); |
| waitLeaseRecovery(cluster); |
| // verify that we still cannot recover the lease |
| LeaseManager lm = NameNodeAdapter.getLeaseManager(cluster.getNamesystem()); |
| assertTrue("Found " + lm.countLease() + " lease, expected 1", lm.countLease() == 1); |
| cluster.getNameNodeRpc().setSafeMode( |
| HdfsConstants.SafeModeAction.SAFEMODE_LEAVE, false); |
| } |
| |
| /** |
| * Block Recovery when the meta file not having crcs for all chunks in block |
| * file |
| */ |
| @Test |
| public void testBlockRecoveryWithLessMetafile() throws Exception { |
| Configuration conf = new Configuration(); |
| conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, |
| UserGroupInformation.getCurrentUser().getShortUserName()); |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); |
| Path file = new Path("/testRecoveryFile"); |
| DistributedFileSystem dfs = cluster.getFileSystem(); |
| FSDataOutputStream out = dfs.create(file); |
| final int FILE_SIZE = 2 * 1024 * 1024; |
| int count = 0; |
| while (count < FILE_SIZE) { |
| out.writeBytes("Data"); |
| count += 4; |
| } |
| out.hsync(); |
| // abort the original stream |
| ((DFSOutputStream) out.getWrappedStream()).abort(); |
| |
| LocatedBlocks locations = cluster.getNameNodeRpc().getBlockLocations( |
| file.toString(), 0, count); |
| ExtendedBlock block = locations.get(0).getBlock(); |
| |
| // Calculate meta file size |
| // From DataNode.java, checksum size is given by: |
| // (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM * |
| // CHECKSUM_SIZE |
| final int CHECKSUM_SIZE = 4; // CRC32 & CRC32C |
| final int bytesPerChecksum = conf.getInt( |
| DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, |
| DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT); |
| final int metaFileSize = |
| (FILE_SIZE + bytesPerChecksum - 1) / bytesPerChecksum * CHECKSUM_SIZE + |
| 8; // meta file header is 8 bytes |
| final int newMetaFileSize = metaFileSize - CHECKSUM_SIZE; |
| |
| // Corrupt the block meta file by dropping checksum for bytesPerChecksum |
| // bytes. Lease recovery is expected to recover the uncorrupted file length. |
| cluster.truncateMeta(0, block, newMetaFileSize); |
| |
| // restart DN to make replica to RWR |
| DataNodeProperties dnProp = cluster.stopDataNode(0); |
| cluster.restartDataNode(dnProp, true); |
| |
| // try to recover the lease |
| DistributedFileSystem newdfs = (DistributedFileSystem) FileSystem |
| .newInstance(cluster.getConfiguration(0)); |
| count = 0; |
| while (++count < 10 && !newdfs.recoverLease(file)) { |
| Thread.sleep(1000); |
| } |
| assertTrue("File should be closed", newdfs.recoverLease(file)); |
| |
| // Verify file length after lease recovery. The new file length should not |
| // include the bytes with corrupted checksum. |
| final long expectedNewFileLen = FILE_SIZE - bytesPerChecksum; |
| final long newFileLen = newdfs.getFileStatus(file).getLen(); |
| assertEquals(newFileLen, expectedNewFileLen); |
| } |
| |
| /** |
| * Block/lease recovery should be retried with failed nodes from the second |
| * stage removed to avoid perpetual recovery failures. |
| */ |
| @Test |
| public void testBlockRecoveryRetryAfterFailedRecovery() throws Exception { |
| Configuration conf = new Configuration(); |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); |
| Path file = new Path("/testBlockRecoveryRetryAfterFailedRecovery"); |
| DistributedFileSystem dfs = cluster.getFileSystem(); |
| |
| // Create a file. |
| FSDataOutputStream out = dfs.create(file); |
| final int FILE_SIZE = 128 * 1024; |
| int count = 0; |
| while (count < FILE_SIZE) { |
| out.writeBytes("DE K9SUL"); |
| count += 8; |
| } |
| out.hsync(); |
| |
| // Abort the original stream. |
| ((DFSOutputStream) out.getWrappedStream()).abort(); |
| |
| LocatedBlocks locations = cluster.getNameNodeRpc().getBlockLocations( |
| file.toString(), 0, count); |
| ExtendedBlock block = locations.get(0).getBlock(); |
| |
| // Finalize one replica to simulate a partial close failure. |
| cluster.getDataNodes().get(0).getFSDataset().finalizeBlock(block, false); |
| // Delete the meta file to simulate a rename/move failure. |
| cluster.deleteMeta(0, block); |
| |
| // Try to recover the lease. |
| DistributedFileSystem newDfs = (DistributedFileSystem) FileSystem |
| .newInstance(cluster.getConfiguration(0)); |
| count = 0; |
| while (count++ < 15 && !newDfs.recoverLease(file)) { |
| Thread.sleep(1000); |
| } |
| // The lease should have been recovered. |
| assertTrue("File should be closed", newDfs.recoverLease(file)); |
| } |
| |
| /** |
| * Recover the lease on a file and append file from another client. |
| */ |
| @Test |
| public void testLeaseRecoveryAndAppend() throws Exception { |
| testLeaseRecoveryAndAppend(new Configuration()); |
| } |
| |
| /** |
| * Recover the lease on a file and append file from another client with |
| * ViewDFS enabled. |
| */ |
| @Test |
| public void testLeaseRecoveryAndAppendWithViewDFS() throws Exception { |
| Configuration conf = new Configuration(); |
| conf.set("fs.hdfs.impl", ViewDistributedFileSystem.class.getName()); |
| testLeaseRecoveryAndAppend(conf); |
| } |
| |
| private void testLeaseRecoveryAndAppend(Configuration conf) throws Exception { |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); |
| Path file = new Path("/testLeaseRecovery"); |
| DistributedFileSystem dfs = cluster.getFileSystem(); |
| |
| // create a file with 0 bytes |
| FSDataOutputStream out = dfs.create(file); |
| out.hflush(); |
| out.hsync(); |
| |
| // abort the original stream |
| ((DFSOutputStream) out.getWrappedStream()).abort(); |
| DistributedFileSystem newdfs = |
| (DistributedFileSystem) FileSystem.newInstance |
| (cluster.getConfiguration(0)); |
| |
| // Append to a file , whose lease is held by another client should fail |
| try { |
| newdfs.append(file); |
| fail("Append to a file(lease is held by another client) should fail"); |
| } catch (RemoteException e) { |
| assertTrue(e.getMessage().contains("file lease is currently owned")); |
| } |
| |
| // Lease recovery on first try should be successful |
| boolean recoverLease = newdfs.recoverLease(file); |
| assertTrue(recoverLease); |
| FSDataOutputStream append = newdfs.append(file); |
| append.write("test".getBytes()); |
| append.close(); |
| }finally{ |
| if (cluster != null) { |
| cluster.shutdown(); |
| cluster = null; |
| } |
| } |
| } |
| |
| /** |
| * HDFS-14498 - test lease can be recovered for a file where the final |
| * block was never registered with the DNs, and hence the IBRs will never |
| * be received. In this case the final block should be zero bytes and can |
| * be removed. |
| */ |
| @Test |
| public void testLeaseRecoveryEmptyCommittedLastBlock() throws Exception { |
| Configuration conf = new Configuration(); |
| DFSClient client = null; |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); |
| DistributedFileSystem dfs = cluster.getFileSystem(); |
| client = |
| new DFSClient(cluster.getNameNode().getServiceRpcAddress(), conf); |
| String file = "/test/f1"; |
| Path filePath = new Path(file); |
| |
| createCommittedNotCompleteFile(client, file, null, 1); |
| |
| INodeFile inode = cluster.getNamesystem().getFSDirectory() |
| .getINode(filePath.toString()).asFile(); |
| assertTrue(inode.isUnderConstruction()); |
| assertEquals(1, inode.numBlocks()); |
| assertNotNull(inode.getLastBlock()); |
| |
| // Ensure a different client cannot append the file |
| try { |
| dfs.append(filePath); |
| fail("Append to a file(lease is held by another client) should fail"); |
| } catch (RemoteException e) { |
| assertTrue(e.getMessage().contains("file lease is currently owned")); |
| } |
| |
| // Lease will not be recovered on the first try |
| assertEquals(false, client.recoverLease(file)); |
| for (int i=0; i < 10 && !client.recoverLease(file); i++) { |
| Thread.sleep(1000); |
| } |
| assertTrue(client.recoverLease(file)); |
| |
| inode = cluster.getNamesystem().getFSDirectory() |
| .getINode(filePath.toString()).asFile(); |
| assertTrue(!inode.isUnderConstruction()); |
| assertEquals(0, inode.numBlocks()); |
| assertNull(inode.getLastBlock()); |
| |
| // Ensure the recovered file can now be written |
| FSDataOutputStream append = dfs.append(filePath); |
| append.write("test".getBytes()); |
| append.close(); |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| cluster = null; |
| } |
| if (client != null) { |
| client.close(); |
| } |
| } |
| } |
| |
| /** |
| * HDFS-14498 - similar to testLeaseRecoveryEmptyCommittedLastBlock except |
| * we wait for the lease manager to recover the lease automatically. |
| */ |
| @Test |
| public void testLeaseManagerRecoversEmptyCommittedLastBlock() |
| throws Exception { |
| Configuration conf = new Configuration(); |
| DFSClient client = null; |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); |
| client = |
| new DFSClient(cluster.getNameNode().getServiceRpcAddress(), conf); |
| String file = "/test/f1"; |
| |
| createCommittedNotCompleteFile(client, file, null, 1); |
| waitLeaseRecovery(cluster); |
| |
| GenericTestUtils.waitFor(() -> { |
| String holder = NameNodeAdapter |
| .getLeaseHolderForPath(cluster.getNameNode(), file); |
| return holder == null; |
| }, 100, 10000); |
| |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| cluster = null; |
| } |
| if (client != null) { |
| client.close(); |
| } |
| } |
| } |
| |
| @Test |
| public void testAbortedRecovery() throws Exception { |
| Configuration conf = new Configuration(); |
| DFSClient client = null; |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); |
| client = |
| new DFSClient(cluster.getNameNode().getServiceRpcAddress(), conf); |
| final String file = "/test/f1"; |
| |
| HdfsFileStatus stat = client.getNamenode() |
| .create(file, new FsPermission("777"), client.clientName, |
| new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), |
| true, (short) 1, 1024 * 1024 * 128L, |
| new CryptoProtocolVersion[0], null, null); |
| |
| assertNotNull(NameNodeAdapter.getLeaseHolderForPath( |
| cluster.getNameNode(), file)); |
| |
| // Add a block to the file |
| ExtendedBlock block = client.getNamenode().addBlock( |
| file, client.clientName, null, new DatanodeInfo[0], stat.getFileId(), |
| new String[0], null).getBlock(); |
| |
| // update the pipeline to get a new genstamp. |
| ExtendedBlock updatedBlock = client.getNamenode() |
| .updateBlockForPipeline(block, client.clientName) |
| .getBlock(); |
| // fake that some data was maybe written. commit block sync will |
| // reconcile. |
| updatedBlock.setNumBytes(1234); |
| |
| // get the stored block and make it look like the DN sent a RBW IBR. |
| BlockManager bm = cluster.getNamesystem().getBlockManager(); |
| BlockInfo storedBlock = bm.getStoredBlock(block.getLocalBlock()); |
| BlockUnderConstructionFeature uc = |
| storedBlock.getUnderConstructionFeature(); |
| uc.setExpectedLocations(updatedBlock.getLocalBlock(), |
| uc.getExpectedStorageLocations(), BlockType.CONTIGUOUS); |
| |
| // complete the file w/o updatePipeline to simulate client failure. |
| client.getNamenode().complete(file, client.clientName, block, |
| stat.getFileId()); |
| |
| assertNotNull(NameNodeAdapter.getLeaseHolderForPath( |
| cluster.getNameNode(), file)); |
| |
| cluster.setLeasePeriod(LEASE_PERIOD, LEASE_PERIOD); |
| GenericTestUtils.waitFor(new Supplier<Boolean>() { |
| @Override |
| public Boolean get() { |
| String holder = NameNodeAdapter |
| .getLeaseHolderForPath(cluster.getNameNode(), file); |
| return holder == null; |
| } |
| }, 100, 20000); |
| // nothing was actually written so the block should be dropped. |
| assertTrue(storedBlock.isDeleted()); |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| cluster = null; |
| } |
| if (client != null) { |
| client.close(); |
| } |
| } |
| } |
| |
| @Test |
| public void testLeaseManagerRecoversCommittedLastBlockWithContent() |
| throws Exception { |
| Configuration conf = new Configuration(); |
| DFSClient client = null; |
| try { |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); |
| client = |
| new DFSClient(cluster.getNameNode().getServiceRpcAddress(), conf); |
| String file = "/test/f2"; |
| |
| byte[] bytesToWrite = new byte[1]; |
| bytesToWrite[0] = 123; |
| createCommittedNotCompleteFile(client, file, bytesToWrite, 3); |
| |
| waitLeaseRecovery(cluster); |
| |
| DistributedFileSystem hdfs = cluster.getFileSystem(); |
| |
| // Now the least has been recovered, attempt to append the file and then |
| // ensure the earlier written and newly written data can be read back. |
| FSDataOutputStream op = null; |
| try { |
| op = hdfs.append(new Path(file)); |
| op.write(23); |
| } finally { |
| if (op != null) { |
| op.close(); |
| } |
| } |
| |
| FSDataInputStream stream = null; |
| try { |
| stream = cluster.getFileSystem().open(new Path(file)); |
| assertEquals(123, stream.readByte()); |
| assertEquals(23, stream.readByte()); |
| } finally { |
| stream.close(); |
| } |
| |
| // Finally check there are no leases for the file and hence the file is |
| // closed. |
| GenericTestUtils.waitFor(() -> { |
| String holder = NameNodeAdapter |
| .getLeaseHolderForPath(cluster.getNameNode(), file); |
| return holder == null; |
| }, 100, 10000); |
| |
| } finally { |
| if (cluster != null) { |
| cluster.shutdown(); |
| cluster = null; |
| } |
| if (client != null) { |
| client.close(); |
| } |
| } |
| } |
| |
| private void createCommittedNotCompleteFile(DFSClient client, String file, |
| byte[] bytesToWrite, int repFactor) throws IOException { |
| HdfsFileStatus stat = client.getNamenode() |
| .create(file, new FsPermission("777"), client.clientName, |
| new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), |
| true, (short) repFactor, 1024 * 1024 * 128L, |
| new CryptoProtocolVersion[0], null, null); |
| // Add a block to the file |
| LocatedBlock blk = client.getNamenode() |
| .addBlock(file, client.clientName, null, |
| new DatanodeInfo[0], stat.getFileId(), new String[0], null); |
| ExtendedBlock finalBlock = blk.getBlock(); |
| if (bytesToWrite != null) { |
| // Here we create a output stream and then abort it so the block gets |
| // created on the datanode, but we never send the message to tell the DN |
| // to complete the block. This simulates the client crashing after it |
| // wrote the data, but before the file gets closed. |
| DFSOutputStream s = new DFSOutputStream(client, file, stat, |
| EnumSet.of(CreateFlag.CREATE), null, |
| DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512), |
| null, true); |
| s.start(); |
| s.write(bytesToWrite); |
| s.hflush(); |
| finalBlock = s.getBlock(); |
| s.abort(); |
| } |
| // Attempt to close the file. This will fail (return false) as the NN will |
| // be expecting the registered block to be reported from the DNs via IBR, |
| // but that will never happen, as we either did not write it, or we aborted |
| // the stream preventing the "close block" message to be sent to the DN. |
| boolean closed = client.getNamenode().complete( |
| file, client.clientName, finalBlock, stat.getFileId()); |
| assertEquals(false, closed); |
| } |
| |
| } |