// blob: 65b2f8c3f93232d708561396abe84ee0c7a61a2f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.snapshot;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClientAdapter;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
 * Test snapshot functionalities while file appending.
 *
 * <p>Every test method runs against a fresh {@link MiniDFSCluster} that is
 * started in {@link #setUp()} with {@link #REPLICATION} datanodes and a block
 * size of {@link #BLOCKSIZE} bytes, and is shut down in {@link #tearDown()}.
 */
public class TestINodeFileUnderConstructionWithSnapshot {
  // Instance initializer: runs each time an instance of this test class is
  // constructed, before any @Before method.
  {
    GenericTestUtils.setLogLevel(INode.LOG, Level.ALL);
    SnapshotTestHelper.disableLogs();
  }

  /** Seed used wherever the tests generate file content. */
  static final long seed = 0;
  /** Replication factor of created files; also the number of datanodes. */
  static final short REPLICATION = 3;
  /** Block size configured for the cluster; also the unit of every write. */
  static final int BLOCKSIZE = 1024;

  /** Directory that most tests write into and take snapshots of. */
  private final Path dir = new Path("/TestSnapshot");

  Configuration conf;
  MiniDFSCluster cluster;
  FSNamesystem fsn;
  DistributedFileSystem hdfs;
  FSDirectory fsdir;

  /**
   * Starts the mini cluster, caches handles to the namesystem, the directory
   * tree and the file system, and creates {@link #dir}.
   */
  @Before
  public void setUp() throws Exception {
    conf = new Configuration();
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
        .build();
    cluster.waitActive();

    fsn = cluster.getNamesystem();
    fsdir = fsn.getFSDirectory();
    hdfs = cluster.getFileSystem();
    hdfs.mkdirs(dir);
  }

  /** Shuts the mini cluster down if {@link #setUp()} managed to create it. */
  @After
  public void tearDown() throws Exception {
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
  }

  /**
   * Test snapshot after file appending
   */
  @Test (timeout=60000)
  public void testSnapshotAfterAppending() throws Exception {
    Path file = new Path(dir, "file");

    // 1. create snapshot --> create file --> append
    SnapshotTestHelper.createSnapshot(hdfs, dir, "s0");
    DFSTestUtil.createFile(hdfs, file, BLOCKSIZE, REPLICATION, seed);
    DFSTestUtil.appendFile(hdfs, file, BLOCKSIZE);

    // check corresponding inode: BLOCKSIZE from the create plus BLOCKSIZE
    // from the append
    INodeFile fileNode = (INodeFile) fsdir.getINode(file.toString());
    assertEquals(BLOCKSIZE * 2, fileNode.computeFileSize());

    // 2. create snapshot --> modify the file --> append
    hdfs.createSnapshot(dir, "s1");
    hdfs.setReplication(file, (short) (REPLICATION - 1));
    DFSTestUtil.appendFile(hdfs, file, BLOCKSIZE);

    // check corresponding inodes
    fileNode = (INodeFile) fsdir.getINode(file.toString());
    assertEquals(REPLICATION - 1, fileNode.getFileReplication());
    assertEquals(BLOCKSIZE * 3, fileNode.computeFileSize());

    // 3. create snapshot --> append
    hdfs.createSnapshot(dir, "s2");
    DFSTestUtil.appendFile(hdfs, file, BLOCKSIZE);

    // check corresponding inodes
    fileNode = (INodeFile) fsdir.getINode(file.toString());
    assertEquals(REPLICATION - 1, fileNode.getFileReplication());
    assertEquals(BLOCKSIZE * 4, fileNode.computeFileSize());
  }

  /**
   * Opens {@code file} for append, writes {@code length} pseudo-random bytes
   * and returns the stream still open.
   *
   * @param file existing file to append to
   * @param length number of bytes to write
   * @return the open append stream; the caller is responsible for closing it
   * @throws IOException if opening the stream or writing to it fails
   */
  private HdfsDataOutputStream appendFileWithoutClosing(Path file, int length)
      throws IOException {
    byte[] toAppend = new byte[length];
    // Seeded so the appended bytes are the same on every run.
    Random random = new Random(seed);
    random.nextBytes(toAppend);
    HdfsDataOutputStream out = (HdfsDataOutputStream) hdfs.append(file);
    out.write(toAppend);
    return out;
  }

  /**
   * Test snapshot during file appending, before the corresponding
   * {@link FSDataOutputStream} instance closes.
   */
  @Test (timeout=60000)
  public void testSnapshotWhileAppending() throws Exception {
    Path file = new Path(dir, "file");
    DFSTestUtil.createFile(hdfs, file, BLOCKSIZE, REPLICATION, seed);

    // 1. append without closing stream --> create snapshot
    HdfsDataOutputStream out = appendFileWithoutClosing(file, BLOCKSIZE);
    out.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
    SnapshotTestHelper.createSnapshot(hdfs, dir, "s0");
    out.close();

    // check: the current file has size BLOCKSIZE*2; remember the directory's
    // latest diff (taken for s0) for the snapshot-relative checks below
    INodeFile fileNode = (INodeFile) fsdir.getINode(file.toString());
    assertEquals(BLOCKSIZE * 2, fileNode.computeFileSize());
    INodeDirectory dirNode = fsdir.getINode(dir.toString()).asDirectory();
    DirectoryDiff last = dirNode.getDiffs().getLast();

    // 2. append without closing stream
    out = appendFileWithoutClosing(file, BLOCKSIZE);
    out.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));

    // re-check: the size seen through s0 must still be BLOCKSIZE*2
    assertEquals(BLOCKSIZE * 2, fileNode.computeFileSize(last.getSnapshotId()));

    // 3. take snapshot --> close stream
    hdfs.createSnapshot(dir, "s1");
    out.close();

    // check: the size seen through the latest diff (s1) is BLOCKSIZE*3
    fileNode = (INodeFile) fsdir.getINode(file.toString());
    dirNode = fsdir.getINode(dir.toString()).asDirectory();
    last = dirNode.getDiffs().getLast();
    assertTrue(fileNode.isWithSnapshot());
    assertEquals(BLOCKSIZE * 3, fileNode.computeFileSize(last.getSnapshotId()));

    // 4. modify file --> append without closing stream --> take snapshot -->
    // close stream
    hdfs.setReplication(file, (short) (REPLICATION - 1));
    out = appendFileWithoutClosing(file, BLOCKSIZE);
    hdfs.createSnapshot(dir, "s2");
    out.close();

    // re-check: `last` still refers to s1's diff, whose size must be unchanged
    assertEquals(BLOCKSIZE * 3, fileNode.computeFileSize(last.getSnapshotId()));
  }

  /**
   * call DFSClient#callGetBlockLocations(...) for snapshot file. Make sure only
   * blocks within the size range are returned.
   */
  @Test (timeout=60000)
  public void testGetBlockLocations() throws Exception {
    final Path root = new Path("/");
    final Path file = new Path("/file");
    DFSTestUtil.createFile(hdfs, file, BLOCKSIZE, REPLICATION, seed);

    // take a snapshot on root
    SnapshotTestHelper.createSnapshot(hdfs, root, "s1");

    final Path fileInSnapshot = SnapshotTestHelper.getSnapshotPath(root,
        "s1", file.getName());
    FileStatus status = hdfs.getFileStatus(fileInSnapshot);
    // make sure we record the size for the file
    assertEquals(BLOCKSIZE, status.getLen());

    // append data to file
    DFSTestUtil.appendFile(hdfs, file, BLOCKSIZE - 1);
    status = hdfs.getFileStatus(fileInSnapshot);
    // the size of snapshot file should still be BLOCKSIZE
    assertEquals(BLOCKSIZE, status.getLen());
    // the size of the file should be (2 * BLOCKSIZE - 1)
    status = hdfs.getFileStatus(file);
    assertEquals(BLOCKSIZE * 2 - 1, status.getLen());

    // call DFSClient#callGetBlockLocations for the file in snapshot
    LocatedBlocks blocks = DFSClientAdapter.callGetBlockLocations(
        cluster.getNameNodeRpc(), fileInSnapshot.toString(), 0, Long.MAX_VALUE);
    List<LocatedBlock> blockList = blocks.getLocatedBlocks();

    // should be only one block
    assertEquals(BLOCKSIZE, blocks.getFileLength());
    assertEquals(1, blockList.size());

    // check the last block
    LocatedBlock lastBlock = blocks.getLastLocatedBlock();
    assertEquals(0, lastBlock.getStartOffset());
    assertEquals(BLOCKSIZE, lastBlock.getBlockSize());

    // take another snapshot
    SnapshotTestHelper.createSnapshot(hdfs, root, "s2");
    final Path fileInSnapshot2 = SnapshotTestHelper.getSnapshotPath(root,
        "s2", file.getName());

    // append data to file without closing
    HdfsDataOutputStream out = appendFileWithoutClosing(file, BLOCKSIZE);
    out.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));

    status = hdfs.getFileStatus(fileInSnapshot2);
    // the size of snapshot file should be BLOCKSIZE*2-1
    assertEquals(BLOCKSIZE * 2 - 1, status.getLen());
    // the size of the file should be (3 * BLOCKSIZE - 1)
    status = hdfs.getFileStatus(file);
    assertEquals(BLOCKSIZE * 3 - 1, status.getLen());

    // the snapshot copy must look like a closed file even though the live
    // file is still being written
    blocks = DFSClientAdapter.callGetBlockLocations(cluster.getNameNodeRpc(),
        fileInSnapshot2.toString(), 0, Long.MAX_VALUE);
    assertFalse(blocks.isUnderConstruction());
    assertTrue(blocks.isLastBlockComplete());
    blockList = blocks.getLocatedBlocks();

    // should be 2 blocks
    assertEquals(BLOCKSIZE * 2 - 1, blocks.getFileLength());
    assertEquals(2, blockList.size());

    // check the last block
    lastBlock = blocks.getLastLocatedBlock();
    assertEquals(BLOCKSIZE, lastBlock.getStartOffset());
    assertEquals(BLOCKSIZE, lastBlock.getBlockSize());

    // a zero-length request starting at offset BLOCKSIZE is expected to
    // return exactly one block
    blocks = DFSClientAdapter.callGetBlockLocations(cluster.getNameNodeRpc(),
        fileInSnapshot2.toString(), BLOCKSIZE, 0);
    blockList = blocks.getLocatedBlocks();
    assertEquals(1, blockList.size());

    // check blocks for file being written
    blocks = DFSClientAdapter.callGetBlockLocations(cluster.getNameNodeRpc(),
        file.toString(), 0, Long.MAX_VALUE);
    blockList = blocks.getLocatedBlocks();
    assertEquals(3, blockList.size());
    assertTrue(blocks.isUnderConstruction());
    assertFalse(blocks.isLastBlockComplete());

    lastBlock = blocks.getLastLocatedBlock();
    assertEquals(BLOCKSIZE * 2, lastBlock.getStartOffset());
    assertEquals(BLOCKSIZE - 1, lastBlock.getBlockSize());
    out.close();
  }

  /**
   * Runs the lease checks for a file that is still open for append, has been
   * captured in a snapshot, and whose parent directory has since been deleted.
   *
   * <p>There are no explicit assertions: the test passes when none of the
   * steps throws. The lease periods are shortened for the duration of the
   * test and restored to the {@link HdfsConstants} defaults afterwards.
   */
  @Test (timeout=60000)
  public void testLease() throws Exception {
    try {
      // soft limit 100, hard limit 200 (presumably milliseconds -- the same
      // argument order as the restore call in the finally block)
      NameNodeAdapter.setLeasePeriod(fsn, 100, 200);
      final Path foo = new Path(dir, "foo");
      final Path bar = new Path(foo, "bar");
      DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPLICATION, seed);
      // The stream is intentionally left open so the file is still open for
      // append when the lease checks run.
      HdfsDataOutputStream out = appendFileWithoutClosing(bar, 100);
      out.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
      SnapshotTestHelper.createSnapshot(hdfs, dir, "s0");

      hdfs.delete(foo, true);
      // wait well past the shortened lease periods set above
      Thread.sleep(1000);

      // Acquire the lock before entering try so that the finally block only
      // ever releases a lock that was actually taken.
      fsn.writeLock();
      try {
        NameNodeAdapter.getLeaseManager(fsn).runLeaseChecks();
      } finally {
        fsn.writeUnlock();
      }
    } finally {
      NameNodeAdapter.setLeasePeriod(fsn, HdfsConstants.LEASE_SOFTLIMIT_PERIOD,
          HdfsConstants.LEASE_HARDLIMIT_PERIOD);
    }
  }
}