| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.namenode.snapshot; |
| |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.EnumSet; |
| import java.util.HashSet; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileChecksum; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hdfs.DFSConfigKeys; |
| import org.apache.hadoop.hdfs.DFSOutputStream; |
| import org.apache.hadoop.hdfs.DFSTestUtil; |
| import org.apache.hadoop.hdfs.DistributedFileSystem; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; |
| import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants; |
| import org.apache.hadoop.hdfs.server.namenode.NameNode; |
| import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; |
| import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; |
| import org.apache.hadoop.util.Time; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| public class TestOpenFilesWithSnapshot { |
| private static final Log LOG = |
| LogFactory.getLog(TestOpenFilesWithSnapshot.class.getName()); |
| private final Configuration conf = new Configuration(); |
| MiniDFSCluster cluster = null; |
| DistributedFileSystem fs = null; |
| |
| private static final long SEED = 0; |
| private static final short REPLICATION = 3; |
| private static final long BLOCKSIZE = 1024; |
| private static final long BUFFERLEN = BLOCKSIZE / 2; |
| private static final long FILELEN = BLOCKSIZE * 2; |
| |
| @Before |
| public void setup() throws IOException { |
| conf.setBoolean( |
| DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES, true); |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); |
| conf.set("dfs.blocksize", "1048576"); |
| fs = cluster.getFileSystem(); |
| } |
| |
| @After |
| public void teardown() throws IOException { |
| if (fs != null) { |
| fs.close(); |
| fs = null; |
| } |
| if (cluster != null) { |
| cluster.shutdown(); |
| cluster = null; |
| } |
| |
| } |
| |
| @Test |
| public void testUCFileDeleteWithSnapShot() throws Exception { |
| Path path = new Path("/test"); |
| doWriteAndAbort(fs, path); |
| |
| // delete files separately |
| fs.delete(new Path("/test/test/test2"), true); |
| fs.delete(new Path("/test/test/test3"), true); |
| restartNameNode(); |
| } |
| |
| @Test |
| public void testParentDirWithUCFileDeleteWithSnapShot() throws Exception { |
| Path path = new Path("/test"); |
| doWriteAndAbort(fs, path); |
| |
| // delete parent directory |
| fs.delete(new Path("/test/test"), true); |
| restartNameNode(); |
| } |
| |
| @Test |
| public void testWithCheckpoint() throws Exception { |
| Path path = new Path("/test"); |
| doWriteAndAbort(fs, path); |
| fs.delete(new Path("/test/test"), true); |
| restartNameNode(); |
| |
| // read snapshot file after restart |
| String test2snapshotPath = Snapshot.getSnapshotPath(path.toString(), |
| "s1/test/test2"); |
| DFSTestUtil.readFile(fs, new Path(test2snapshotPath)); |
| String test3snapshotPath = Snapshot.getSnapshotPath(path.toString(), |
| "s1/test/test3"); |
| DFSTestUtil.readFile(fs, new Path(test3snapshotPath)); |
| } |
| |
| @Test |
| public void testFilesDeletionWithCheckpoint() throws Exception { |
| Path path = new Path("/test"); |
| doWriteAndAbort(fs, path); |
| fs.delete(new Path("/test/test/test2"), true); |
| fs.delete(new Path("/test/test/test3"), true); |
| restartNameNode(); |
| |
| // read snapshot file after restart |
| String test2snapshotPath = Snapshot.getSnapshotPath(path.toString(), |
| "s1/test/test2"); |
| DFSTestUtil.readFile(fs, new Path(test2snapshotPath)); |
| String test3snapshotPath = Snapshot.getSnapshotPath(path.toString(), |
| "s1/test/test3"); |
| DFSTestUtil.readFile(fs, new Path(test3snapshotPath)); |
| } |
| |
| private void doWriteAndAbort(DistributedFileSystem fs, Path path) |
| throws IOException { |
| fs.mkdirs(path); |
| fs.allowSnapshot(path); |
| DFSTestUtil |
| .createFile(fs, new Path("/test/test1"), 100, (short) 2, 100024L); |
| DFSTestUtil |
| .createFile(fs, new Path("/test/test2"), 100, (short) 2, 100024L); |
| Path file = new Path("/test/test/test2"); |
| FSDataOutputStream out = fs.create(file); |
| for (int i = 0; i < 2; i++) { |
| long count = 0; |
| while (count < 1048576) { |
| out.writeBytes("hell"); |
| count += 4; |
| } |
| } |
| ((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet |
| .of(SyncFlag.UPDATE_LENGTH)); |
| DFSTestUtil.abortStream((DFSOutputStream) out.getWrappedStream()); |
| Path file2 = new Path("/test/test/test3"); |
| FSDataOutputStream out2 = fs.create(file2); |
| for (int i = 0; i < 2; i++) { |
| long count = 0; |
| while (count < 1048576) { |
| out2.writeBytes("hell"); |
| count += 4; |
| } |
| } |
| ((DFSOutputStream) out2.getWrappedStream()).hsync(EnumSet |
| .of(SyncFlag.UPDATE_LENGTH)); |
| DFSTestUtil.abortStream((DFSOutputStream) out2.getWrappedStream()); |
| fs.createSnapshot(path, "s1"); |
| } |
| |
| @Test |
| public void testOpenFilesWithMultipleSnapshots() throws Exception { |
| doTestMultipleSnapshots(true); |
| } |
| |
| @Test |
| public void testOpenFilesWithMultipleSnapshotsWithoutCheckpoint() |
| throws Exception { |
| doTestMultipleSnapshots(false); |
| } |
| |
| private void doTestMultipleSnapshots(boolean saveNamespace) |
| throws IOException { |
| Path path = new Path("/test"); |
| doWriteAndAbort(fs, path); |
| fs.createSnapshot(path, "s2"); |
| fs.delete(new Path("/test/test"), true); |
| fs.deleteSnapshot(path, "s2"); |
| cluster.triggerBlockReports(); |
| if (saveNamespace) { |
| NameNode nameNode = cluster.getNameNode(); |
| NameNodeAdapter.enterSafeMode(nameNode, false); |
| NameNodeAdapter.saveNamespace(nameNode); |
| NameNodeAdapter.leaveSafeMode(nameNode); |
| } |
| cluster.restartNameNode(true); |
| } |
| |
| @Test |
| public void testOpenFilesWithRename() throws Exception { |
| Path path = new Path("/test"); |
| doWriteAndAbort(fs, path); |
| |
| // check for zero sized blocks |
| Path fileWithEmptyBlock = new Path("/test/test/test4"); |
| fs.create(fileWithEmptyBlock); |
| NamenodeProtocols nameNodeRpc = cluster.getNameNodeRpc(); |
| String clientName = fs.getClient().getClientName(); |
| // create one empty block |
| nameNodeRpc.addBlock(fileWithEmptyBlock.toString(), clientName, null, null, |
| HdfsConstants.GRANDFATHER_INODE_ID, null, null); |
| fs.createSnapshot(path, "s2"); |
| |
| fs.rename(new Path("/test/test"), new Path("/test/test-renamed")); |
| fs.delete(new Path("/test/test-renamed"), true); |
| restartNameNode(); |
| } |
| |
| private void createFile(final Path filePath) throws IOException { |
| DFSTestUtil.createFile(fs, filePath, (int) BUFFERLEN, |
| FILELEN, BLOCKSIZE, REPLICATION, SEED); |
| } |
| |
| private int writeToStream(final FSDataOutputStream outputStream, byte[] buf) |
| throws IOException { |
| outputStream.write(buf); |
| ((HdfsDataOutputStream)outputStream).hsync( |
| EnumSet.of(SyncFlag.UPDATE_LENGTH)); |
| return buf.length; |
| } |
| |
| /** |
| * Test open files under snapshot directories are getting captured |
| * in snapshots as a truly immutable copy. Verify open files outside |
| * of snapshot directory not getting affected. |
| * |
| * \- level_0_A |
| * \- level_1_C |
| * +- appA.log (open file, not under snap root) |
| * \- level_2_E (Snapshottable Dir) |
| * \- level_3_G |
| * +- flume.log (open file, under snap root) |
| * \- level_0_B |
| * +- appB.log (open file, not under snap root) |
| * \- level_2_D (Snapshottable Dir) |
| * +- hbase.log (open file, under snap root) |
| */ |
| @Test (timeout = 120000) |
| public void testPointInTimeSnapshotCopiesForOpenFiles() throws Exception { |
| conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES, |
| true); |
| // Construct the directory tree |
| final Path level0A = new Path("/level_0_A"); |
| final Path level0B = new Path("/level_0_B"); |
| final Path level1C = new Path(level0A, "level_1_C"); |
| final Path level1D = new Path(level0B, "level_1_D"); |
| final Path level2E = new Path(level1C, "level_2_E"); |
| final Path level3G = new Path(level2E, "level_3_G"); |
| Set<Path> dirPaths = new HashSet<>(Arrays.asList(level0A, level0B, |
| level1C, level1D, level2E, level3G)); |
| for (Path dirPath : dirPaths) { |
| fs.mkdirs(dirPath); |
| } |
| |
| // String constants |
| final Path flumeSnapRootDir = level2E; |
| final Path hbaseSnapRootDir = level1D; |
| final String flumeFileName = "flume.log"; |
| final String hbaseFileName = "hbase.log"; |
| final String appAFileName = "appA.log"; |
| final String appBFileName = "appB.log"; |
| final String flumeSnap1Name = "flume_snap_s1"; |
| final String flumeSnap2Name = "flume_snap_s2"; |
| final String flumeSnap3Name = "flume_snap_s3"; |
| final String hbaseSnap1Name = "hbase_snap_s1"; |
| final String hbaseSnap2Name = "hbase_snap_s2"; |
| final String hbaseSnap3Name = "hbase_snap_s3"; |
| final String flumeRelPathFromSnapDir = "level_3_G/" + flumeFileName; |
| |
| // Create files and open a stream |
| final Path flumeFile = new Path(level3G, flumeFileName); |
| createFile(flumeFile); |
| FSDataOutputStream flumeOutputStream = fs.append(flumeFile); |
| |
| final Path hbaseFile = new Path(level1D, hbaseFileName); |
| createFile(hbaseFile); |
| FSDataOutputStream hbaseOutputStream = fs.append(hbaseFile); |
| |
| final Path appAFile = new Path(level1C, appAFileName); |
| createFile(appAFile); |
| FSDataOutputStream appAOutputStream = fs.append(appAFile); |
| |
| final Path appBFile = new Path(level0B, appBFileName); |
| createFile(appBFile); |
| FSDataOutputStream appBOutputStream = fs.append(appBFile); |
| |
| final long appAFileInitialLength = fs.getFileStatus(appAFile).getLen(); |
| final long appBFileInitialLength = fs.getFileStatus(appBFile).getLen(); |
| |
| // Create Snapshot S1 |
| final Path flumeS1Dir = SnapshotTestHelper.createSnapshot( |
| fs, flumeSnapRootDir, flumeSnap1Name); |
| final Path flumeS1Path = new Path(flumeS1Dir, flumeRelPathFromSnapDir); |
| final Path hbaseS1Dir = SnapshotTestHelper.createSnapshot( |
| fs, hbaseSnapRootDir, hbaseSnap1Name); |
| final Path hbaseS1Path = new Path(hbaseS1Dir, hbaseFileName); |
| |
| final long flumeFileLengthAfterS1 = fs.getFileStatus(flumeFile).getLen(); |
| final long hbaseFileLengthAfterS1 = fs.getFileStatus(hbaseFile).getLen(); |
| |
| // Verify if Snap S1 file lengths are same as the the live ones |
| Assert.assertEquals(flumeFileLengthAfterS1, |
| fs.getFileStatus(flumeS1Path).getLen()); |
| Assert.assertEquals(hbaseFileLengthAfterS1, |
| fs.getFileStatus(hbaseS1Path).getLen()); |
| Assert.assertEquals(appAFileInitialLength, |
| fs.getFileStatus(appAFile).getLen()); |
| Assert.assertEquals(appBFileInitialLength, |
| fs.getFileStatus(appBFile).getLen()); |
| |
| long flumeFileWrittenDataLength = flumeFileLengthAfterS1; |
| long hbaseFileWrittenDataLength = hbaseFileLengthAfterS1; |
| long appAFileWrittenDataLength = appAFileInitialLength; |
| |
| int newWriteLength = (int) (BLOCKSIZE * 1.5); |
| byte[] buf = new byte[newWriteLength]; |
| Random random = new Random(); |
| random.nextBytes(buf); |
| |
| // Write more data to flume and hbase files only |
| flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf); |
| hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf); |
| |
| // Create Snapshot S2 |
| final Path flumeS2Dir = SnapshotTestHelper.createSnapshot( |
| fs, flumeSnapRootDir, flumeSnap2Name); |
| final Path flumeS2Path = new Path(flumeS2Dir, flumeRelPathFromSnapDir); |
| final Path hbaseS2Dir = SnapshotTestHelper.createSnapshot( |
| fs, hbaseSnapRootDir, hbaseSnap2Name); |
| final Path hbaseS2Path = new Path(hbaseS2Dir, hbaseFileName); |
| |
| // Verify live files lengths are same as all data written till now |
| final long flumeFileLengthAfterS2 = fs.getFileStatus(flumeFile).getLen(); |
| final long hbaseFileLengthAfterS2 = fs.getFileStatus(hbaseFile).getLen(); |
| Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS2); |
| Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS2); |
| |
| // Verify if Snap S2 file lengths are same as the live ones |
| Assert.assertEquals(flumeFileLengthAfterS2, |
| fs.getFileStatus(flumeS2Path).getLen()); |
| Assert.assertEquals(hbaseFileLengthAfterS2, |
| fs.getFileStatus(hbaseS2Path).getLen()); |
| Assert.assertEquals(appAFileInitialLength, |
| fs.getFileStatus(appAFile).getLen()); |
| Assert.assertEquals(appBFileInitialLength, |
| fs.getFileStatus(appBFile).getLen()); |
| |
| // Write more data to appA file only |
| newWriteLength = (int) (BLOCKSIZE * 2.5); |
| buf = new byte[newWriteLength]; |
| random.nextBytes(buf); |
| appAFileWrittenDataLength += writeToStream(appAOutputStream, buf); |
| |
| // Verify other open files are not affected in their snapshots |
| Assert.assertEquals(flumeFileLengthAfterS2, |
| fs.getFileStatus(flumeS2Path).getLen()); |
| Assert.assertEquals(appAFileWrittenDataLength, |
| fs.getFileStatus(appAFile).getLen()); |
| |
| // Write more data to flume file only |
| newWriteLength = (int) (BLOCKSIZE * 2.5); |
| buf = new byte[newWriteLength]; |
| random.nextBytes(buf); |
| flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf); |
| |
| // Create Snapshot S3 |
| final Path flumeS3Dir = SnapshotTestHelper.createSnapshot( |
| fs, flumeSnapRootDir, flumeSnap3Name); |
| final Path flumeS3Path = new Path(flumeS3Dir, flumeRelPathFromSnapDir); |
| final Path hbaseS3Dir = SnapshotTestHelper.createSnapshot( |
| fs, hbaseSnapRootDir, hbaseSnap3Name); |
| final Path hbaseS3Path = new Path(hbaseS3Dir, hbaseFileName); |
| |
| // Verify live files lengths are same as all data written till now |
| final long flumeFileLengthAfterS3 = fs.getFileStatus(flumeFile).getLen(); |
| final long hbaseFileLengthAfterS3 = fs.getFileStatus(hbaseFile).getLen(); |
| Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS3); |
| Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS3); |
| |
| // Verify if Snap S3 file lengths are same as the live ones |
| Assert.assertEquals(flumeFileLengthAfterS3, |
| fs.getFileStatus(flumeS3Path).getLen()); |
| Assert.assertEquals(hbaseFileLengthAfterS3, |
| fs.getFileStatus(hbaseS3Path).getLen()); |
| Assert.assertEquals(appAFileWrittenDataLength, |
| fs.getFileStatus(appAFile).getLen()); |
| Assert.assertEquals(appBFileInitialLength, |
| fs.getFileStatus(appBFile).getLen()); |
| |
| // Verify old flume snapshots have point-in-time / frozen file lengths |
| // even after the live file have moved forward. |
| Assert.assertEquals(flumeFileLengthAfterS1, |
| fs.getFileStatus(flumeS1Path).getLen()); |
| Assert.assertEquals(flumeFileLengthAfterS2, |
| fs.getFileStatus(flumeS2Path).getLen()); |
| Assert.assertEquals(flumeFileLengthAfterS3, |
| fs.getFileStatus(flumeS3Path).getLen()); |
| |
| // Verify old hbase snapshots have point-in-time / frozen file lengths |
| // even after the live files have moved forward. |
| Assert.assertEquals(hbaseFileLengthAfterS1, |
| fs.getFileStatus(hbaseS1Path).getLen()); |
| Assert.assertEquals(hbaseFileLengthAfterS2, |
| fs.getFileStatus(hbaseS2Path).getLen()); |
| Assert.assertEquals(hbaseFileLengthAfterS3, |
| fs.getFileStatus(hbaseS3Path).getLen()); |
| |
| flumeOutputStream.close(); |
| hbaseOutputStream.close(); |
| appAOutputStream.close(); |
| appBOutputStream.close(); |
| } |
| |
| /** |
| * Test snapshot capturing open files and verify the same |
| * across NameNode restarts. |
| */ |
| @Test (timeout = 120000) |
| public void testSnapshotsForOpenFilesWithNNRestart() throws Exception { |
| // Construct the directory tree |
| final Path level0A = new Path("/level_0_A"); |
| final Path flumeSnapRootDir = level0A; |
| final String flumeFileName = "flume.log"; |
| final String flumeSnap1Name = "flume_snap_1"; |
| final String flumeSnap2Name = "flume_snap_2"; |
| |
| // Create files and open a stream |
| final Path flumeFile = new Path(level0A, flumeFileName); |
| createFile(flumeFile); |
| FSDataOutputStream flumeOutputStream = fs.append(flumeFile); |
| |
| // Create Snapshot S1 |
| final Path flumeS1Dir = SnapshotTestHelper.createSnapshot( |
| fs, flumeSnapRootDir, flumeSnap1Name); |
| final Path flumeS1Path = new Path(flumeS1Dir, flumeFileName); |
| final long flumeFileLengthAfterS1 = fs.getFileStatus(flumeFile).getLen(); |
| |
| // Verify if Snap S1 file length is same as the the live one |
| Assert.assertEquals(flumeFileLengthAfterS1, |
| fs.getFileStatus(flumeS1Path).getLen()); |
| |
| long flumeFileWrittenDataLength = flumeFileLengthAfterS1; |
| int newWriteLength = (int) (BLOCKSIZE * 1.5); |
| byte[] buf = new byte[newWriteLength]; |
| Random random = new Random(); |
| random.nextBytes(buf); |
| |
| // Write more data to flume file |
| flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf); |
| |
| // Create Snapshot S2 |
| final Path flumeS2Dir = SnapshotTestHelper.createSnapshot( |
| fs, flumeSnapRootDir, flumeSnap2Name); |
| final Path flumeS2Path = new Path(flumeS2Dir, flumeFileName); |
| |
| // Verify live files length is same as all data written till now |
| final long flumeFileLengthAfterS2 = fs.getFileStatus(flumeFile).getLen(); |
| Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS2); |
| |
| // Verify if Snap S2 file length is same as the live one |
| Assert.assertEquals(flumeFileLengthAfterS2, |
| fs.getFileStatus(flumeS2Path).getLen()); |
| |
| // Write more data to flume file |
| flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf); |
| |
| // Verify old flume snapshots have point-in-time / frozen file lengths |
| // even after the live file have moved forward. |
| Assert.assertEquals(flumeFileLengthAfterS1, |
| fs.getFileStatus(flumeS1Path).getLen()); |
| Assert.assertEquals(flumeFileLengthAfterS2, |
| fs.getFileStatus(flumeS2Path).getLen()); |
| |
| // Restart the NameNode |
| restartNameNode(); |
| cluster.waitActive(); |
| |
| // Verify live file length hasn't changed after NN restart |
| Assert.assertEquals(flumeFileWrittenDataLength, |
| fs.getFileStatus(flumeFile).getLen()); |
| |
| // Verify old flume snapshots have point-in-time / frozen file lengths |
| // after NN restart and live file moved forward. |
| Assert.assertEquals(flumeFileLengthAfterS1, |
| fs.getFileStatus(flumeS1Path).getLen()); |
| Assert.assertEquals(flumeFileLengthAfterS2, |
| fs.getFileStatus(flumeS2Path).getLen()); |
| |
| flumeOutputStream.close(); |
| } |
| |
| /** |
| * Test snapshot capturing open files when an open file with active lease |
| * is deleted by the client. |
| */ |
| @Test (timeout = 120000) |
| public void testSnapshotsForOpenFilesAndDeletion() throws Exception { |
| // Construct the directory tree |
| final Path snapRootDir = new Path("/level_0_A"); |
| final String flumeFileName = "flume.log"; |
| final String hbaseFileName = "hbase.log"; |
| final String snap1Name = "snap_1"; |
| final String snap2Name = "snap_2"; |
| final String snap3Name = "snap_3"; |
| |
| // Create files and open streams |
| final Path flumeFile = new Path(snapRootDir, flumeFileName); |
| createFile(flumeFile); |
| final Path hbaseFile = new Path(snapRootDir, hbaseFileName); |
| createFile(hbaseFile); |
| FSDataOutputStream flumeOutputStream = fs.append(flumeFile); |
| FSDataOutputStream hbaseOutputStream = fs.append(hbaseFile); |
| |
| // Create Snapshot S1 |
| final Path snap1Dir = SnapshotTestHelper.createSnapshot( |
| fs, snapRootDir, snap1Name); |
| final Path flumeS1Path = new Path(snap1Dir, flumeFileName); |
| final long flumeFileLengthAfterS1 = fs.getFileStatus(flumeFile).getLen(); |
| final Path hbaseS1Path = new Path(snap1Dir, hbaseFileName); |
| final long hbaseFileLengthAfterS1 = fs.getFileStatus(hbaseFile).getLen(); |
| |
| // Verify if Snap S1 file length is same as the the current versions |
| Assert.assertEquals(flumeFileLengthAfterS1, |
| fs.getFileStatus(flumeS1Path).getLen()); |
| Assert.assertEquals(hbaseFileLengthAfterS1, |
| fs.getFileStatus(hbaseS1Path).getLen()); |
| |
| long flumeFileWrittenDataLength = flumeFileLengthAfterS1; |
| long hbaseFileWrittenDataLength = hbaseFileLengthAfterS1; |
| int newWriteLength = (int) (BLOCKSIZE * 1.5); |
| byte[] buf = new byte[newWriteLength]; |
| Random random = new Random(); |
| random.nextBytes(buf); |
| |
| // Write more data to files |
| flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf); |
| hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf); |
| |
| // Create Snapshot S2 |
| final Path snap2Dir = SnapshotTestHelper.createSnapshot( |
| fs, snapRootDir, snap2Name); |
| final Path flumeS2Path = new Path(snap2Dir, flumeFileName); |
| final Path hbaseS2Path = new Path(snap2Dir, hbaseFileName); |
| |
| // Verify current files length are same as all data written till now |
| final long flumeFileLengthAfterS2 = fs.getFileStatus(flumeFile).getLen(); |
| Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS2); |
| final long hbaseFileLengthAfterS2 = fs.getFileStatus(hbaseFile).getLen(); |
| Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS2); |
| |
| // Verify if Snap S2 file length is same as the current versions |
| Assert.assertEquals(flumeFileLengthAfterS2, |
| fs.getFileStatus(flumeS2Path).getLen()); |
| Assert.assertEquals(hbaseFileLengthAfterS2, |
| fs.getFileStatus(hbaseS2Path).getLen()); |
| |
| // Write more data to open files |
| writeToStream(flumeOutputStream, buf); |
| hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf); |
| |
| // Verify old snapshots have point-in-time/frozen file |
| // lengths even after the current versions have moved forward. |
| Assert.assertEquals(flumeFileLengthAfterS1, |
| fs.getFileStatus(flumeS1Path).getLen()); |
| Assert.assertEquals(flumeFileLengthAfterS2, |
| fs.getFileStatus(flumeS2Path).getLen()); |
| Assert.assertEquals(hbaseFileLengthAfterS1, |
| fs.getFileStatus(hbaseS1Path).getLen()); |
| Assert.assertEquals(hbaseFileLengthAfterS2, |
| fs.getFileStatus(hbaseS2Path).getLen()); |
| |
| // Delete flume current file. Snapshots should |
| // still have references to flume file. |
| boolean flumeFileDeleted = fs.delete(flumeFile, true); |
| Assert.assertTrue(flumeFileDeleted); |
| Assert.assertFalse(fs.exists(flumeFile)); |
| Assert.assertTrue(fs.exists(flumeS1Path)); |
| Assert.assertTrue(fs.exists(flumeS2Path)); |
| |
| SnapshotTestHelper.createSnapshot(fs, snapRootDir, "tmp_snap"); |
| fs.deleteSnapshot(snapRootDir, "tmp_snap"); |
| |
| // Delete snap_2. snap_1 still has reference to |
| // the flume file. |
| fs.deleteSnapshot(snapRootDir, snap2Name); |
| Assert.assertFalse(fs.exists(flumeS2Path)); |
| Assert.assertTrue(fs.exists(flumeS1Path)); |
| |
| // Delete snap_1. Now all traces of flume file |
| // is gone. |
| fs.deleteSnapshot(snapRootDir, snap1Name); |
| Assert.assertFalse(fs.exists(flumeS2Path)); |
| Assert.assertFalse(fs.exists(flumeS1Path)); |
| |
| // Create Snapshot S3 |
| final Path snap3Dir = SnapshotTestHelper.createSnapshot( |
| fs, snapRootDir, snap3Name); |
| final Path hbaseS3Path = new Path(snap3Dir, hbaseFileName); |
| |
| // Verify live files length is same as all data written till now |
| final long hbaseFileLengthAfterS3 = fs.getFileStatus(hbaseFile).getLen(); |
| Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS3); |
| |
| // Write more data to open files |
| hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf); |
| |
| // Verify old snapshots have point-in-time/frozen file |
| // lengths even after the flume open file is deleted and |
| // the hbase live file has moved forward. |
| Assert.assertEquals(hbaseFileLengthAfterS3, |
| fs.getFileStatus(hbaseS3Path).getLen()); |
| Assert.assertEquals(hbaseFileWrittenDataLength, |
| fs.getFileStatus(hbaseFile).getLen()); |
| |
| hbaseOutputStream.close(); |
| } |
| |
| /** |
| * Verify if the NameNode can restart properly after an OpenForWrite |
| * file and the only snapshot it was present in were deleted. |
| * |
| * @throws Exception |
| */ |
| @Test (timeout = 600000) |
| public void testOpenFileDeletionAndNNRestart() throws Exception { |
| // Construct the directory tree |
| final Path snapRootDir = new Path("/level_0_A/test"); |
| final String hbaseFileName = "hbase.log"; |
| final String snap1Name = "snap_1"; |
| |
| // Create a file with few blocks. Get its output stream |
| // for append. |
| final Path hbaseFile = new Path(snapRootDir, hbaseFileName); |
| createFile(hbaseFile); |
| FSDataOutputStream hbaseOutputStream = fs.append(hbaseFile); |
| |
| int newWriteLength = (int) (BLOCKSIZE * 1.5); |
| byte[] buf = new byte[newWriteLength]; |
| Random random = new Random(); |
| random.nextBytes(buf); |
| |
| // Write more data to the file |
| writeToStream(hbaseOutputStream, buf); |
| |
| // Take a snapshot while the file is open for write |
| final Path snap1Dir = SnapshotTestHelper.createSnapshot( |
| fs, snapRootDir, snap1Name); |
| LOG.info("Open file status in snap: " + |
| fs.getFileStatus(new Path(snap1Dir, hbaseFileName))); |
| |
| // Delete the open file and the snapshot while |
| // its output stream is still open. |
| fs.delete(hbaseFile, true); |
| fs.deleteSnapshot(snapRootDir, snap1Name); |
| Assert.assertFalse(fs.exists(hbaseFile)); |
| |
| // Verify file existence after the NameNode restart |
| cluster.restartNameNode(); |
| cluster.waitActive(); |
| Assert.assertFalse(fs.exists(hbaseFile)); |
| } |
| |
| /** |
| * Test client writing to open files are not interrupted when snapshots |
| * that captured open files get deleted. |
| */ |
| @Test (timeout = 240000) |
| public void testOpenFileWritingAcrossSnapDeletion() throws Exception { |
| final Path snapRootDir = new Path("/level_0_A"); |
| final String flumeFileName = "flume.log"; |
| final String hbaseFileName = "hbase.log"; |
| final String snap1Name = "snap_1"; |
| final String snap2Name = "snap_2"; |
| final String snap3Name = "snap_3"; |
| |
| // Create files and open streams |
| final Path flumeFile = new Path(snapRootDir, flumeFileName); |
| FSDataOutputStream flumeOut = fs.create(flumeFile, false, |
| 8000, (short)3, 1048576); |
| flumeOut.close(); |
| final Path hbaseFile = new Path(snapRootDir, hbaseFileName); |
| FSDataOutputStream hbaseOut = fs.create(hbaseFile, false, |
| 8000, (short)3, 1048576); |
| hbaseOut.close(); |
| |
| final AtomicBoolean writerError = new AtomicBoolean(false); |
| final CountDownLatch startLatch = new CountDownLatch(1); |
| final CountDownLatch deleteLatch = new CountDownLatch(1); |
| Thread t = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| FSDataOutputStream flumeOutputStream = fs.append(flumeFile, 8000); |
| FSDataOutputStream hbaseOutputStream = fs.append(hbaseFile, 8000); |
| byte[] bytes = new byte[(int) (1024 * 0.2)]; |
| Random r = new Random(Time.now()); |
| |
| for (int i = 0; i < 200000; i++) { |
| r.nextBytes(bytes); |
| flumeOutputStream.write(bytes); |
| if (hbaseOutputStream != null) { |
| hbaseOutputStream.write(bytes); |
| } |
| if (i == 50000) { |
| startLatch.countDown(); |
| } else if (i == 100000) { |
| deleteLatch.countDown(); |
| } else if (i == 150000) { |
| hbaseOutputStream.hsync(); |
| fs.delete(hbaseFile, true); |
| try { |
| hbaseOutputStream.close(); |
| } catch (Exception e) { |
| // since the file is deleted before the open stream close, |
| // it might throw FileNotFoundException. Ignore the |
| // expected exception. |
| } |
| hbaseOutputStream = null; |
| } else if (i % 5000 == 0) { |
| LOG.info("Write pos: " + flumeOutputStream.getPos() |
| + ", size: " + fs.getFileStatus(flumeFile).getLen() |
| + ", loop: " + (i + 1)); |
| } |
| } |
| } catch (Exception e) { |
| LOG.warn("Writer error: " + e); |
| writerError.set(true); |
| } |
| } |
| }); |
| t.start(); |
| |
| startLatch.await(); |
| final Path snap1Dir = SnapshotTestHelper.createSnapshot( |
| fs, snapRootDir, snap1Name); |
| final Path flumeS1Path = new Path(snap1Dir, flumeFileName); |
| LOG.info("Snap1 file status: " + fs.getFileStatus(flumeS1Path)); |
| LOG.info("Current file status: " + fs.getFileStatus(flumeFile)); |
| |
| deleteLatch.await(); |
| LOG.info("Snap1 file status: " + fs.getFileStatus(flumeS1Path)); |
| LOG.info("Current file status: " + fs.getFileStatus(flumeFile)); |
| |
| // Verify deletion of snapshot which had the under construction file |
| // captured is not truncating the under construction file and the thread |
| // writing to the same file not crashing on newer block allocations. |
| LOG.info("Deleting " + snap1Name); |
| fs.deleteSnapshot(snapRootDir, snap1Name); |
| |
| // Verify creation and deletion of snapshot newer than the oldest |
| // snapshot is not crashing the thread writing to under construction file. |
| SnapshotTestHelper.createSnapshot(fs, snapRootDir, snap2Name); |
| SnapshotTestHelper.createSnapshot(fs, snapRootDir, snap3Name); |
| fs.deleteSnapshot(snapRootDir, snap3Name); |
| fs.deleteSnapshot(snapRootDir, snap2Name); |
| SnapshotTestHelper.createSnapshot(fs, snapRootDir, "test"); |
| |
| t.join(); |
| Assert.assertFalse("Client encountered writing error!", writerError.get()); |
| |
| restartNameNode(); |
| cluster.waitActive(); |
| } |
| |
| /** |
| * Verify snapshots with open files captured are safe even when the |
| * 'current' version of the file is truncated and appended later. |
| */ |
| @Test (timeout = 120000) |
| public void testOpenFilesSnapChecksumWithTrunkAndAppend() throws Exception { |
| conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_CAPTURE_OPENFILES, |
| true); |
| // Construct the directory tree |
| final Path dir = new Path("/A/B/C"); |
| fs.mkdirs(dir); |
| |
| // String constants |
| final Path hbaseSnapRootDir = dir; |
| final String hbaseFileName = "hbase.wal"; |
| final String hbaseSnap1Name = "hbase_snap_s1"; |
| final String hbaseSnap2Name = "hbase_snap_s2"; |
| final String hbaseSnap3Name = "hbase_snap_s3"; |
| final String hbaseSnap4Name = "hbase_snap_s4"; |
| |
| // Create files and open a stream |
| final Path hbaseFile = new Path(dir, hbaseFileName); |
| createFile(hbaseFile); |
| final FileChecksum hbaseWALFileCksum0 = |
| fs.getFileChecksum(hbaseFile); |
| FSDataOutputStream hbaseOutputStream = fs.append(hbaseFile); |
| |
| // Create Snapshot S1 |
| final Path hbaseS1Dir = SnapshotTestHelper.createSnapshot( |
| fs, hbaseSnapRootDir, hbaseSnap1Name); |
| final Path hbaseS1Path = new Path(hbaseS1Dir, hbaseFileName); |
| final FileChecksum hbaseFileCksumS1 = fs.getFileChecksum(hbaseS1Path); |
| |
| // Verify if Snap S1 checksum is same as the current version one |
| Assert.assertEquals("Live and snap1 file checksum doesn't match!", |
| hbaseWALFileCksum0, fs.getFileChecksum(hbaseS1Path)); |
| |
| int newWriteLength = (int) (BLOCKSIZE * 1.5); |
| byte[] buf = new byte[newWriteLength]; |
| Random random = new Random(); |
| random.nextBytes(buf); |
| writeToStream(hbaseOutputStream, buf); |
| |
| // Create Snapshot S2 |
| final Path hbaseS2Dir = SnapshotTestHelper.createSnapshot( |
| fs, hbaseSnapRootDir, hbaseSnap2Name); |
| final Path hbaseS2Path = new Path(hbaseS2Dir, hbaseFileName); |
| final FileChecksum hbaseFileCksumS2 = fs.getFileChecksum(hbaseS2Path); |
| |
| // Verify if the s1 checksum is still the same |
| Assert.assertEquals("Snap file checksum has changed!", |
| hbaseFileCksumS1, fs.getFileChecksum(hbaseS1Path)); |
| // Verify if the s2 checksum is different from the s1 checksum |
| Assert.assertNotEquals("Snap1 and snap2 file checksum should differ!", |
| hbaseFileCksumS1, hbaseFileCksumS2); |
| |
| newWriteLength = (int) (BLOCKSIZE * 2.5); |
| buf = new byte[newWriteLength]; |
| random.nextBytes(buf); |
| writeToStream(hbaseOutputStream, buf); |
| |
| // Create Snapshot S3 |
| final Path hbaseS3Dir = SnapshotTestHelper.createSnapshot( |
| fs, hbaseSnapRootDir, hbaseSnap3Name); |
| final Path hbaseS3Path = new Path(hbaseS3Dir, hbaseFileName); |
| FileChecksum hbaseFileCksumS3 = fs.getFileChecksum(hbaseS3Path); |
| |
| // Record the checksum for the before truncate current file |
| hbaseOutputStream.close(); |
| final FileChecksum hbaseFileCksumBeforeTruncate = |
| fs.getFileChecksum(hbaseFile); |
| Assert.assertEquals("Snap3 and before truncate file checksum should match!", |
| hbaseFileCksumBeforeTruncate, hbaseFileCksumS3); |
| |
| // Truncate the current file and record the after truncate checksum |
| long currentFileLen = fs.getFileStatus(hbaseFile).getLen(); |
| boolean fileTruncated = fs.truncate(hbaseFile, currentFileLen / 2); |
| Assert.assertTrue("File truncation failed!", fileTruncated); |
| final FileChecksum hbaseFileCksumAfterTruncate = |
| fs.getFileChecksum(hbaseFile); |
| |
| Assert.assertNotEquals("Snap3 and after truncate checksum shouldn't match!", |
| hbaseFileCksumS3, hbaseFileCksumAfterTruncate); |
| |
| // Append more data to the current file |
| hbaseOutputStream = fs.append(hbaseFile); |
| newWriteLength = (int) (BLOCKSIZE * 5.5); |
| buf = new byte[newWriteLength]; |
| random.nextBytes(buf); |
| writeToStream(hbaseOutputStream, buf); |
| |
| // Create Snapshot S4 |
| final Path hbaseS4Dir = SnapshotTestHelper.createSnapshot( |
| fs, hbaseSnapRootDir, hbaseSnap4Name); |
| final Path hbaseS4Path = new Path(hbaseS4Dir, hbaseFileName); |
| final FileChecksum hbaseFileCksumS4 = fs.getFileChecksum(hbaseS4Path); |
| |
| // Record the checksum for the current file after append |
| hbaseOutputStream.close(); |
| final FileChecksum hbaseFileCksumAfterAppend = |
| fs.getFileChecksum(hbaseFile); |
| |
| Assert.assertEquals("Snap4 and after append file checksum should match!", |
| hbaseFileCksumAfterAppend, hbaseFileCksumS4); |
| |
| // Recompute checksum for S3 path and verify it has not changed |
| hbaseFileCksumS3 = fs.getFileChecksum(hbaseS3Path); |
| Assert.assertEquals("Snap3 and before truncate file checksum should match!", |
| hbaseFileCksumBeforeTruncate, hbaseFileCksumS3); |
| } |
| |
| private void restartNameNode() throws Exception { |
| cluster.triggerBlockReports(); |
| NameNode nameNode = cluster.getNameNode(); |
| NameNodeAdapter.enterSafeMode(nameNode, false); |
| NameNodeAdapter.saveNamespace(nameNode); |
| NameNodeAdapter.leaveSafeMode(nameNode); |
| cluster.restartNameNode(true); |
| } |
| } |