blob: e63da6cfca264b8050eccb1d6325191ff9e40553 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.*;
public class TestLeaseManager {
@Rule
public Timeout timeout = new Timeout(300000);
public static long maxLockHoldToReleaseLeaseMs = 100;
@Test
public void testRemoveLeases() throws Exception {
FSNamesystem fsn = mock(FSNamesystem.class);
LeaseManager lm = new LeaseManager(fsn);
ArrayList<Long> ids = Lists.newArrayList(INodeId.ROOT_INODE_ID + 1,
INodeId.ROOT_INODE_ID + 2, INodeId.ROOT_INODE_ID + 3,
INodeId.ROOT_INODE_ID + 4);
for (long id : ids) {
lm.addLease("foo", id);
}
assertEquals(4, lm.getINodeIdWithLeases().size());
for (long id : ids) {
lm.removeLease(id);
}
assertEquals(0, lm.getINodeIdWithLeases().size());
}
/** Check that LeaseManager.checkLease release some leases
*/
@Test
public void testCheckLease() throws InterruptedException {
LeaseManager lm = new LeaseManager(makeMockFsNameSystem());
final long numLease = 100;
final long expiryTime = 0;
final long waitTime = expiryTime + 1;
//Make sure the leases we are going to add exceed the hard limit
lm.setLeasePeriod(expiryTime, expiryTime);
for (long i = 0; i <= numLease - 1; i++) {
//Add some leases to the LeaseManager
lm.addLease("holder"+i, INodeId.ROOT_INODE_ID + i);
}
assertEquals(numLease, lm.countLease());
Thread.sleep(waitTime);
//Initiate a call to checkLease. This should exit within the test timeout
lm.checkLeases();
assertTrue(lm.countLease() < numLease);
}
/**
* Test whether the internal lease holder name is updated properly.
*/
@Test
public void testInternalLeaseHolder() throws Exception {
LeaseManager lm = new LeaseManager(makeMockFsNameSystem());
// Set the hard lease limit to 500ms.
lm.setLeasePeriod(100L, 500L);
String holder = lm.getInternalLeaseHolder();
Thread.sleep(1000);
assertNotEquals(holder, lm.getInternalLeaseHolder());
}
@Test
public void testCountPath() {
LeaseManager lm = new LeaseManager(makeMockFsNameSystem());
lm.addLease("holder1", 1);
assertThat(lm.countPath(), is(1L));
lm.addLease("holder2", 2);
assertThat(lm.countPath(), is(2L));
lm.addLease("holder2", 2); // Duplicate addition
assertThat(lm.countPath(), is(2L));
assertThat(lm.countPath(), is(2L));
// Remove a couple of non-existing leases. countPath should not change.
lm.removeLease("holder2", stubInodeFile(3));
lm.removeLease("InvalidLeaseHolder", stubInodeFile(1));
assertThat(lm.countPath(), is(2L));
INodeFile file = stubInodeFile(1);
lm.reassignLease(lm.getLease(file), file, "holder2");
assertThat(lm.countPath(), is(2L)); // Count unchanged on reassign
lm.removeLease("holder2", stubInodeFile(2)); // Remove existing
assertThat(lm.countPath(), is(1L));
}
/**
* Make sure the lease is restored even if only the inode has the record.
*/
@Test
public void testLeaseRestorationOnRestart() throws Exception {
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(new HdfsConfiguration())
.numDataNodes(1).build();
DistributedFileSystem dfs = cluster.getFileSystem();
// Create an empty file
String path = "/testLeaseRestorationOnRestart";
FSDataOutputStream out = dfs.create(new Path(path));
// Remove the lease from the lease manager, but leave it in the inode.
FSDirectory dir = cluster.getNamesystem().getFSDirectory();
INodeFile file = dir.getINode(path).asFile();
cluster.getNamesystem().leaseManager.removeLease(
file.getFileUnderConstructionFeature().getClientName(), file);
// Save a fsimage.
dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
cluster.getNameNodeRpc().saveNamespace();
dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
// Restart the namenode.
cluster.restartNameNode(true);
// Check whether the lease manager has the lease
dir = cluster.getNamesystem().getFSDirectory();
file = dir.getINode(path).asFile();
assertTrue("Lease should exist.",
cluster.getNamesystem().leaseManager.getLease(file) != null);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test leased files counts from
* {@link LeaseManager#getINodeWithLeases()},
* {@link LeaseManager#getINodeIdWithLeases()} and
* {@link LeaseManager#getINodeWithLeases(INodeDirectory)}.
*/
@Test (timeout = 60000)
public void testInodeWithLeases() throws Exception {
FSNamesystem fsNamesystem = makeMockFsNameSystem();
FSDirectory fsDirectory = fsNamesystem.getFSDirectory();
LeaseManager lm = new LeaseManager(fsNamesystem);
Set<Long> iNodeIds = new HashSet<>(Arrays.asList(
INodeId.ROOT_INODE_ID + 1,
INodeId.ROOT_INODE_ID + 2,
INodeId.ROOT_INODE_ID + 3,
INodeId.ROOT_INODE_ID + 4
));
final PermissionStatus perm = PermissionStatus.createImmutable(
"user", "group", FsPermission.createImmutable((short)0755));
INodeDirectory rootInodeDirectory = new INodeDirectory(
HdfsConstants.GRANDFATHER_INODE_ID, DFSUtil.string2Bytes(""),
perm, 0L);
when(fsDirectory.getRoot()).thenReturn(rootInodeDirectory);
verifyINodeLeaseCounts(lm, rootInodeDirectory, 0, 0, 0);
for (Long iNodeId : iNodeIds) {
INodeFile iNodeFile = stubInodeFile(iNodeId);
iNodeFile.setParent(rootInodeDirectory);
when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile);
lm.addLease("holder_" + iNodeId, iNodeId);
}
verifyINodeLeaseCounts(lm, rootInodeDirectory, iNodeIds.size(),
iNodeIds.size(), iNodeIds.size());
for (Long iNodeId : iNodeIds) {
lm.removeLease(iNodeId);
}
verifyINodeLeaseCounts(lm, rootInodeDirectory, 0, 0, 0);
}
/**
* Test leased files counts at various scale from
* {@link LeaseManager#getINodeWithLeases()},
* {@link LeaseManager#getINodeIdWithLeases()} and
* {@link LeaseManager#getINodeWithLeases(INodeDirectory)}.
*/
@Test (timeout = 240000)
public void testInodeWithLeasesAtScale() throws Exception {
FSNamesystem fsNamesystem = makeMockFsNameSystem();
FSDirectory fsDirectory = fsNamesystem.getFSDirectory();
LeaseManager lm = new LeaseManager(fsNamesystem);
final PermissionStatus perm = PermissionStatus.createImmutable(
"user", "group", FsPermission.createImmutable((short)0755));
INodeDirectory rootInodeDirectory = new INodeDirectory(
HdfsConstants.GRANDFATHER_INODE_ID, DFSUtil.string2Bytes(""),
perm, 0L);
when(fsDirectory.getRoot()).thenReturn(rootInodeDirectory);
// Case 1: No open files
int scale = 0;
testInodeWithLeasesAtScaleImpl(lm, fsDirectory, rootInodeDirectory, scale);
for (int workerCount = 1;
workerCount <= LeaseManager.INODE_FILTER_WORKER_COUNT_MAX / 2;
workerCount++) {
// Case 2: Open files count is half of worker task size
scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN / 2;
testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
rootInodeDirectory, scale);
// Case 3: Open files count is 1 less of worker task size
scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN - 1;
testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
rootInodeDirectory, scale);
// Case 4: Open files count is equal to worker task size
scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN;
testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
rootInodeDirectory, scale);
// Case 5: Open files count is 1 more than worker task size
scale = workerCount * LeaseManager.INODE_FILTER_WORKER_TASK_MIN + 1;
testInodeWithLeasesAtScaleImpl(lm, fsDirectory,
rootInodeDirectory, scale);
}
// Case 6: Open files count is way more than worker count
scale = 1279;
testInodeWithLeasesAtScaleImpl(lm, fsDirectory, rootInodeDirectory, scale);
}
private void testInodeWithLeasesAtScaleImpl(final LeaseManager leaseManager,
final FSDirectory fsDirectory, INodeDirectory ancestorDirectory,
int scale) throws IOException {
verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0);
Set<Long> iNodeIds = new HashSet<>();
for (int i = 0; i < scale; i++) {
iNodeIds.add(INodeId.ROOT_INODE_ID + i);
}
for (Long iNodeId : iNodeIds) {
INodeFile iNodeFile = stubInodeFile(iNodeId);
iNodeFile.setParent(ancestorDirectory);
when(fsDirectory.getInode(iNodeId)).thenReturn(iNodeFile);
leaseManager.addLease("holder_" + iNodeId, iNodeId);
}
verifyINodeLeaseCounts(leaseManager, ancestorDirectory, iNodeIds.size(),
iNodeIds.size(), iNodeIds.size());
leaseManager.removeAllLeases();
verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0);
}
/**
* Verify leased INode details across lease get and release from
* {@link LeaseManager#getINodeIdWithLeases()} and
* {@link LeaseManager#getINodeWithLeases(INodeDirectory)}.
*/
@Test (timeout = 60000)
public void testInodeWithLeasesForAncestorDir() throws Exception {
FSNamesystem fsNamesystem = makeMockFsNameSystem();
FSDirectory fsDirectory = fsNamesystem.getFSDirectory();
LeaseManager lm = new LeaseManager(fsNamesystem);
final PermissionStatus perm = PermissionStatus.createImmutable(
"user", "group", FsPermission.createImmutable((short)0755));
INodeDirectory rootInodeDirectory = new INodeDirectory(
HdfsConstants.GRANDFATHER_INODE_ID, DFSUtil.string2Bytes(""),
perm, 0L);
when(fsDirectory.getRoot()).thenReturn(rootInodeDirectory);
AtomicInteger inodeIds = new AtomicInteger(
(int) (HdfsConstants.GRANDFATHER_INODE_ID + 1234));
String[] pathTree = new String[] {
"/root.log",
"/ENG/a/a1.log",
"/ENG/a/b/b1.log",
"/ENG/a/b/c/c1.log",
"/ENG/a/b/c/c2.log",
"/OPS/m/m1.log",
"/OPS/m/n/n1.log",
"/OPS/m/n/n2.log"
};
Map<String, INode> pathINodeMap = createINodeTree(rootInodeDirectory,
pathTree, inodeIds);
assertEquals(0, lm.getINodeIdWithLeases().size());
for (Entry<String, INode> entry : pathINodeMap.entrySet()) {
long iNodeId = entry.getValue().getId();
when(fsDirectory.getInode(iNodeId)).thenReturn(entry.getValue());
if (entry.getKey().contains("log")) {
lm.addLease("holder_" + iNodeId, iNodeId);
}
}
assertEquals(pathTree.length, lm.getINodeIdWithLeases().size());
assertEquals(pathTree.length, lm.getINodeWithLeases().size());
assertEquals(pathTree.length, lm.getINodeWithLeases(
rootInodeDirectory).size());
// reset
lm.removeAllLeases();
Set<String> filesLeased = new HashSet<>(
Arrays.asList("root.log", "a1.log", "c1.log", "n2.log"));
for (String fileName : filesLeased) {
lm.addLease("holder", pathINodeMap.get(fileName).getId());
}
assertEquals(filesLeased.size(), lm.getINodeIdWithLeases().size());
assertEquals(filesLeased.size(), lm.getINodeWithLeases().size());
Set<INodesInPath> iNodeWithLeases = lm.getINodeWithLeases();
for (INodesInPath iNodesInPath : iNodeWithLeases) {
String leasedFileName = DFSUtil.bytes2String(
iNodesInPath.getLastLocalName());
assertTrue(filesLeased.contains(leasedFileName));
}
assertEquals(filesLeased.size(),
lm.getINodeWithLeases(rootInodeDirectory).size());
assertEquals(filesLeased.size() - 2,
lm.getINodeWithLeases(pathINodeMap.get("ENG").asDirectory()).size());
assertEquals(filesLeased.size() - 2,
lm.getINodeWithLeases(pathINodeMap.get("a").asDirectory()).size());
assertEquals(filesLeased.size() - 3,
lm.getINodeWithLeases(pathINodeMap.get("c").asDirectory()).size());
assertEquals(filesLeased.size() - 3,
lm.getINodeWithLeases(pathINodeMap.get("OPS").asDirectory()).size());
assertEquals(filesLeased.size() - 3,
lm.getINodeWithLeases(pathINodeMap.get("n").asDirectory()).size());
lm.removeLease(pathINodeMap.get("n2.log").getId());
assertEquals(filesLeased.size() - 1,
lm.getINodeWithLeases(rootInodeDirectory).size());
assertEquals(filesLeased.size() - 4,
lm.getINodeWithLeases(pathINodeMap.get("n").asDirectory()).size());
lm.removeAllLeases();
filesLeased.clear();
assertEquals(filesLeased.size(),
lm.getINodeWithLeases(rootInodeDirectory).size());
}
private void verifyINodeLeaseCounts(final LeaseManager leaseManager,
INodeDirectory ancestorDirectory, int iNodeIdWithLeaseCount,
int iNodeWithLeaseCount, int iNodeUnderAncestorLeaseCount)
throws IOException {
assertEquals(iNodeIdWithLeaseCount,
leaseManager.getINodeIdWithLeases().size());
assertEquals(iNodeWithLeaseCount,
leaseManager.getINodeWithLeases().size());
assertEquals(iNodeUnderAncestorLeaseCount,
leaseManager.getINodeWithLeases(ancestorDirectory).size());
}
private Map<String, INode> createINodeTree(INodeDirectory parentDir,
String[] pathTree, AtomicInteger inodeId)
throws QuotaExceededException {
HashMap<String, INode> pathINodeMap = new HashMap<>();
for (String path : pathTree) {
byte[][] components = INode.getPathComponents(path);
FsPermission perm = FsPermission.createImmutable((short) 0755);
PermissionStatus permStatus =
PermissionStatus.createImmutable("", "", perm);
INodeDirectory prev = parentDir;
INodeDirectory dir = null;
for (int i = 0; i < components.length - 1; i++) {
byte[] component = components[i];
if (component.length == 0) {
continue;
}
INode existingChild = prev.getChild(
component, Snapshot.CURRENT_STATE_ID);
if (existingChild == null) {
String dirName = DFSUtil.bytes2String(component);
dir = new INodeDirectory(inodeId.incrementAndGet(), component,
permStatus, 0);
prev.addChild(dir, false, Snapshot.CURRENT_STATE_ID);
pathINodeMap.put(dirName, dir);
prev = dir;
} else {
assertTrue(existingChild.isDirectory());
prev = existingChild.asDirectory();
}
}
PermissionStatus p = new PermissionStatus(
"user", "group", new FsPermission((short) 0777));
byte[] fileNameBytes = components[components.length - 1];
String fileName = DFSUtil.bytes2String(fileNameBytes);
INodeFile iNodeFile = new INodeFile(
inodeId.incrementAndGet(), fileNameBytes,
p, 0L, 0L, BlockInfo.EMPTY_ARRAY, (short) 1, 1L);
iNodeFile.setParent(prev);
pathINodeMap.put(fileName, iNodeFile);
}
return pathINodeMap;
}
private static FSNamesystem makeMockFsNameSystem() {
FSDirectory dir = mock(FSDirectory.class);
FSNamesystem fsn = mock(FSNamesystem.class);
when(fsn.isRunning()).thenReturn(true);
when(fsn.hasReadLock()).thenReturn(true);
when(fsn.hasWriteLock()).thenReturn(true);
when(fsn.getFSDirectory()).thenReturn(dir);
when(fsn.getMaxLockHoldToReleaseLeaseMs()).thenReturn(maxLockHoldToReleaseLeaseMs);
return fsn;
}
private static INodeFile stubInodeFile(long inodeId) {
PermissionStatus p = new PermissionStatus(
"dummy", "dummy", new FsPermission((short) 0777));
return new INodeFile(
inodeId, new String("foo-" + inodeId).getBytes(), p, 0L, 0L,
BlockInfo.EMPTY_ARRAY, (short) 1, 1L);
}
}