Merge remote-tracking branch 'origin/trunk' into HDFS-8966
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index dbe0726..0a19007 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -94,6 +94,7 @@
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
+import org.apache.hadoop.hdfs.util.RwLock;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
@@ -112,7 +113,7 @@
* Keeps information related to the blocks stored in the Hadoop cluster.
*/
@InterfaceAudience.Private
-public class BlockManager implements BlockStatsMXBean {
+public class BlockManager implements RwLock, BlockStatsMXBean {
public static final Logger LOG = LoggerFactory.getLogger(BlockManager.class);
public static final Logger blockLog = NameNode.blockStateChangeLog;
@@ -125,6 +126,7 @@
private final Namesystem namesystem;
+ private final BlockManagerLock lock;
private final DatanodeManager datanodeManager;
private final HeartbeatManager heartbeatManager;
private final BlockTokenSecretManager blockTokenSecretManager;
@@ -302,6 +304,7 @@
public BlockManager(final Namesystem namesystem, final Configuration conf)
throws IOException {
this.namesystem = namesystem;
+ this.lock = new BlockManagerLock(namesystem);
datanodeManager = new DatanodeManager(this, namesystem, conf);
heartbeatManager = datanodeManager.getHeartbeatManager();
@@ -519,7 +522,7 @@
/** Dump meta data to out. */
public void metaSave(PrintWriter out) {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
datanodeManager.fetchDatanodes(live, dead, false);
@@ -551,7 +554,7 @@
// Dump all datanodes
getDatanodeManager().datanodeDump(out);
}
-
+
/**
* Dump the metadata for the given block in a human-readable
* form.
@@ -580,12 +583,12 @@
out.print(fileName + ": ");
}
// l: == live:, d: == decommissioned c: == corrupt e: == excess
- out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
+ out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
" (replicas:" +
" l: " + numReplicas.liveReplicas() +
" d: " + numReplicas.decommissionedAndDecommissioning() +
" c: " + numReplicas.corruptReplicas() +
- " e: " + numReplicas.excessReplicas() + ") ");
+ " e: " + numReplicas.excessReplicas() + ") ");
Collection<DatanodeDescriptor> corruptNodes =
corruptReplicas.getNodes(block);
@@ -956,7 +959,7 @@
final boolean inSnapshot, FileEncryptionInfo feInfo,
ErasureCodingPolicy ecPolicy)
throws IOException {
- assert namesystem.hasReadLock();
+ assert hasReadLock();
if (blocks == null) {
return null;
} else if (blocks.length == 0) {
@@ -990,6 +993,41 @@
}
}
+ @Override
+ public void readLock() {
+ lock.readLock().lock();
+ }
+
+ @Override
+ public void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ @Override
+ public boolean hasReadLock() {
+ return lock.hasReadLock();
+ }
+
+ @Override
+ public boolean hasWriteLock() {
+ return lock.hasWriteLock();
+ }
+
+ @Override
+ public void writeLock() {
+ lock.writeLock().lock();
+ }
+
+ @Override
+ public void writeLockInterruptibly() throws InterruptedException {
+ lock.writeLock().lockInterruptibly();
+ }
+
+ @Override
+ public void writeUnlock() {
+ lock.writeLock().unlock();
+ }
+
/** @return current access keys. */
public ExportedBlockKeys getBlockKeys() {
return isBlockTokenEnabled()? blockTokenSecretManager.exportKeys()
@@ -1105,12 +1143,12 @@
public BlocksWithLocations getBlocks(DatanodeID datanode, long size
) throws IOException {
namesystem.checkOperation(OperationCategory.READ);
- namesystem.readLock();
+ readLock();
try {
namesystem.checkOperation(OperationCategory.READ);
return getBlocksWithLocations(datanode, size);
} finally {
- namesystem.readUnlock();
+ readUnlock();
}
}
@@ -1173,7 +1211,7 @@
/** Remove the blocks associated to the given DatanodeStorageInfo. */
void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
while(it.hasNext()) {
@@ -1250,7 +1288,7 @@
*/
public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
final DatanodeInfo dn, String storageID, String reason) throws IOException {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
final Block reportedBlock = blk.getLocalBlock();
final BlockInfo storedBlock = getStoredBlock(reportedBlock);
if (storedBlock == null) {
@@ -1428,13 +1466,13 @@
*/
int computeBlockRecoveryWork(int blocksToProcess) {
List<List<BlockInfo>> blocksToReplicate = null;
- namesystem.writeLock();
+ writeLock();
try {
// Choose the blocks to be replicated
blocksToReplicate = neededReplications
.chooseUnderReplicatedBlocks(blocksToProcess);
} finally {
- namesystem.writeUnlock();
+ writeUnlock();
}
return computeRecoveryWorkForBlocks(blocksToReplicate);
}
@@ -1452,7 +1490,7 @@
List<BlockRecoveryWork> recovWork = new LinkedList<>();
// Step 1: categorize at-risk blocks into replication and EC tasks
- namesystem.writeLock();
+ writeLock();
try {
synchronized (neededReplications) {
for (int priority = 0; priority < blocksToRecover.size(); priority++) {
@@ -1465,7 +1503,7 @@
}
}
} finally {
- namesystem.writeUnlock();
+ writeUnlock();
}
// Step 2: choose target nodes for each recovery task
@@ -1487,7 +1525,7 @@
}
// Step 3: add tasks to the DN
- namesystem.writeLock();
+ writeLock();
try {
for(BlockRecoveryWork rw : recovWork){
final DatanodeStorageInfo[] targets = rw.getTargets();
@@ -1503,7 +1541,7 @@
}
}
} finally {
- namesystem.writeUnlock();
+ writeUnlock();
}
if (blockLog.isInfoEnabled()) {
@@ -1882,7 +1920,7 @@
private void processPendingReplications() {
BlockInfo[] timedOutItems = pendingReplications.getTimedOutBlocks();
if (timedOutItems != null) {
- namesystem.writeLock();
+ writeLock();
try {
for (int i = 0; i < timedOutItems.length; i++) {
/*
@@ -1900,7 +1938,7 @@
}
}
} finally {
- namesystem.writeUnlock();
+ writeUnlock();
}
/* If we know the target datanodes where the replication timedout,
* we could invoke decBlocksScheduled() on it. Its ok for now.
@@ -1909,7 +1947,7 @@
}
public long requestBlockReportLeaseId(DatanodeRegistration nodeReg) {
- assert namesystem.hasReadLock();
+ assert hasReadLock();
DatanodeDescriptor node = null;
try {
node = datanodeManager.getDatanode(nodeReg);
@@ -1970,7 +2008,7 @@
final DatanodeStorage storage,
final BlockListAsLongs newReport, BlockReportContext context,
boolean lastStorageInRpc) throws IOException {
- namesystem.writeLock();
+ writeLock();
final long startTime = Time.monotonicNow(); //after acquiring write lock
final long endTime;
DatanodeDescriptor node;
@@ -2046,7 +2084,7 @@
}
} finally {
endTime = Time.monotonicNow();
- namesystem.writeUnlock();
+ writeUnlock();
}
if (invalidatedBlocks != null) {
@@ -2073,7 +2111,7 @@
LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
"longer exists on the DataNode.",
Long.toHexString(context.getReportId()), zombie.getStorageID());
- assert(namesystem.hasWriteLock());
+ assert hasWriteLock();
Iterator<BlockInfo> iter = zombie.getBlockIterator();
int prevBlocks = zombie.numBlocks();
while (iter.hasNext()) {
@@ -2107,7 +2145,7 @@
long startTimeRescanPostponedMisReplicatedBlocks = Time.monotonicNow();
long startPostponedMisReplicatedBlocksCount =
getPostponedMisreplicatedBlocksCount();
- namesystem.writeLock();
+ writeLock();
try {
// blocksPerRescan is the configured number of blocks per rescan.
// Randomly select blocksPerRescan consecutive blocks from the HashSet
@@ -2160,7 +2198,7 @@
}
}
} finally {
- namesystem.writeUnlock();
+ writeUnlock();
long endPostponedMisReplicatedBlocksCount =
getPostponedMisreplicatedBlocksCount();
LOG.info("Rescan of postponedMisreplicatedBlocks completed in " +
@@ -2222,7 +2260,7 @@
BlockInfo block,
long oldGenerationStamp, long oldNumBytes,
DatanodeStorageInfo[] newStorages) throws IOException {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
BlockToMarkCorrupt b = null;
if (block.getGenerationStamp() != oldGenerationStamp) {
b = new BlockToMarkCorrupt(oldBlock, block, oldGenerationStamp,
@@ -2270,7 +2308,7 @@
final DatanodeStorageInfo storageInfo,
final BlockListAsLongs report) throws IOException {
if (report == null) return;
- assert (namesystem.hasWriteLock());
+    assert hasWriteLock();
assert (storageInfo.getBlockReportCount() == 0);
for (BlockReportReplica iblk : report) {
@@ -2708,7 +2746,7 @@
private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
DatanodeStorageInfo storageInfo)
throws IOException {
- assert (storedBlock != null && namesystem.hasWriteLock());
+ assert (storedBlock != null && hasWriteLock());
if (!namesystem.isInStartupSafeMode()
|| isPopulatingReplQueues()) {
addStoredBlock(storedBlock, reported, storageInfo, null, false);
@@ -2743,7 +2781,7 @@
DatanodeDescriptor delNodeHint,
boolean logEveryBlock)
throws IOException {
- assert block != null && namesystem.hasWriteLock();
+ assert block != null && hasWriteLock();
BlockInfo storedBlock;
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
if (!block.isComplete()) {
@@ -2905,7 +2943,7 @@
* over or under replicated. Place it into the respective queue.
*/
public void processMisReplicatedBlocks() {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
stopReplicationInitializer();
neededReplications.clear();
replicationQueuesInitializer = new Daemon() {
@@ -2962,7 +3000,7 @@
while (namesystem.isRunning() && !Thread.currentThread().isInterrupted()) {
int processed = 0;
- namesystem.writeLockInterruptibly();
+ writeLockInterruptibly();
try {
while (processed < numBlocksPerIteration && blocksItr.hasNext()) {
BlockInfo block = blocksItr.next();
@@ -3016,7 +3054,7 @@
break;
}
} finally {
- namesystem.writeUnlock();
+ writeUnlock();
// Make sure it is out of the write lock for sufficiently long time.
Thread.sleep(sleepDuration);
}
@@ -3114,7 +3152,7 @@
private void processOverReplicatedBlock(final BlockInfo block,
final short replication, final DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint) {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
if (addedNode == delNodeHint) {
delNodeHint = null;
}
@@ -3152,7 +3190,7 @@
BlockInfo storedBlock, short replication,
DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint) {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
// first form a rack to datanodes map and
BlockCollection bc = getBlockCollection(storedBlock);
if (storedBlock.isStriped()) {
@@ -3290,7 +3328,7 @@
}
private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
LightWeightHashSet<BlockInfo> excessBlocks = excessReplicateMap.get(
dn.getDatanodeUuid());
if (excessBlocks == null) {
@@ -3321,7 +3359,7 @@
*/
public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
blockLog.debug("BLOCK* removeStoredBlock: {} from {}", storedBlock, node);
- assert (namesystem.hasWriteLock());
+ assert hasWriteLock();
{
if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
@@ -3498,7 +3536,7 @@
*/
public void processIncrementalBlockReport(final DatanodeID nodeID,
final StorageReceivedDeletedBlocks srdb) throws IOException {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
int received = 0;
int deleted = 0;
int receiving = 0;
@@ -3706,7 +3744,7 @@
}
public void removeBlock(BlockInfo block) {
- assert namesystem.hasWriteLock();
+ assert hasWriteLock();
// No need to ACK blocks that are being removed entirely
// from the namespace, since the removal of the associated
// file already removes them from the block map below.
@@ -3740,7 +3778,7 @@
/** updates a block in under replication queue */
private void updateNeededReplications(final BlockInfo block,
final int curReplicasDelta, int expectedReplicasDelta) {
- namesystem.writeLock();
+ writeLock();
try {
if (!isPopulatingReplQueues()) {
return;
@@ -3758,7 +3796,7 @@
repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
}
} finally {
- namesystem.writeUnlock();
+ writeUnlock();
}
}
@@ -3820,7 +3858,7 @@
private int invalidateWorkForOneNode(DatanodeInfo dn) {
final List<Block> toInvalidate;
- namesystem.writeLock();
+ writeLock();
try {
// blocks should not be replicated or removed if safe mode is on
if (namesystem.isInSafeMode()) {
@@ -3844,7 +3882,7 @@
return 0;
}
} finally {
- namesystem.writeUnlock();
+ writeUnlock();
}
blockLog.debug("BLOCK* {}: ask {} to delete {}", getClass().getSimpleName(),
dn, toInvalidate);
@@ -4047,12 +4085,12 @@
int workFound = this.computeBlockRecoveryWork(blocksToProcess);
// Update counters
- namesystem.writeLock();
+ writeLock();
try {
this.updateState();
this.scheduledReplicationBlocksCount = workFound;
} finally {
- namesystem.writeUnlock();
+ writeUnlock();
}
workFound += this.computeInvalidateWork(nodesToProcess);
return workFound;
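
With this change, block-management code takes the lock through the BlockManager itself instead of through the Namesystem. A minimal sketch of the caller pattern that the rest of this patch converts to (illustrative only; `bm` stands for any BlockManager reference):

    bm.writeLock();
    try {
      // mutate block-management state under the write lock
    } finally {
      bm.writeUnlock();
    }

Since BlockManagerLock (added below) delegates to the namesystem's own lock, this is purely an API change at this stage; the runtime locking behavior is unchanged.
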
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerLock.java
new file mode 100644
index 0000000..18dc201
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerLock.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
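+/**
+ * A read/write lock guarding block-management state. For now it delegates to
+ * the namesystem's coarse ReentrantReadWriteLock, so BlockManager callers and
+ * FSNamesystem callers keep synchronizing on the same underlying lock.
+ */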
+class BlockManagerLock implements ReadWriteLock {
+ private final ReentrantReadWriteLock coarseLock;
+
+ BlockManagerLock(Namesystem ns) {
+ this.coarseLock = ns.getLockImplementation();
+ }
+
+ @Override
+ public Lock readLock() {
+ return coarseLock.readLock();
+ }
+
+ @Override
+ public Lock writeLock() {
+ return coarseLock.writeLock();
+ }
+
+ boolean hasReadLock() {
+ return hasWriteLock() || coarseLock.getReadHoldCount() > 0;
+ }
+
+ boolean hasWriteLock() {
+ return coarseLock.isWriteLockedByCurrentThread();
+ }
+}
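
BlockManagerLock wraps a ReentrantReadWriteLock, so the hold-count checks above follow the standard java.util.concurrent semantics: holding the write lock does not raise the read hold count, which is why hasReadLock() checks hasWriteLock() first. A self-contained sketch of those semantics (plain JDK, no HDFS types):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class HoldCountDemo {
      public static void main(String[] args) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        lock.writeLock().lock();
        try {
          // this is what hasWriteLock() checks
          System.out.println(lock.isWriteLockedByCurrentThread()); // true
          // stays 0 while only the write lock is held, which is why
          // hasReadLock() must consult hasWriteLock() as well
          System.out.println(lock.getReadHoldCount());             // 0
        } finally {
          lock.writeLock().unlock();
        }
      }
    }
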
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index 2f81ddf..54bcffd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -55,7 +55,6 @@
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-;
/**
* Scans the namesystem, scheduling blocks to be cached as appropriate.
@@ -218,7 +217,7 @@
* after are not atomic.
*/
public void waitForRescanIfNeeded() {
- Preconditions.checkArgument(!namesystem.hasWriteLock(),
+ Preconditions.checkArgument(!blockManager.hasWriteLock(),
"Must not hold the FSN write lock when waiting for a rescan.");
Preconditions.checkArgument(lock.isHeldByCurrentThread(),
"Must hold the CRM lock when waiting for a rescan.");
@@ -263,7 +262,7 @@
*/
@Override
public void close() throws IOException {
- Preconditions.checkArgument(namesystem.hasWriteLock());
+ Preconditions.checkArgument(blockManager.hasWriteLock());
lock.lock();
try {
if (shutdown) return;
@@ -285,7 +284,7 @@
scannedDirectives = 0;
scannedBlocks = 0;
try {
- namesystem.writeLock();
+ blockManager.writeLock();
try {
lock.lock();
if (shutdown) {
@@ -302,7 +301,7 @@
rescanCachedBlockMap();
blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
} finally {
- namesystem.writeUnlock();
+ blockManager.writeUnlock();
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index b32092d..3357fab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -540,7 +540,7 @@
* @param nodeInfo datanode descriptor.
*/
private void removeDatanode(DatanodeDescriptor nodeInfo) {
- assert namesystem.hasWriteLock();
+ assert blockManager.hasWriteLock();
heartbeatManager.removeDatanode(nodeInfo);
blockManager.removeBlocksAssociatedTo(nodeInfo);
networktopology.remove(nodeInfo);
@@ -559,7 +559,7 @@
*/
public void removeDatanode(final DatanodeID node
) throws UnregisteredNodeException {
- namesystem.writeLock();
+ blockManager.writeLock();
try {
final DatanodeDescriptor descriptor = getDatanode(node);
if (descriptor != null) {
@@ -569,7 +569,7 @@
+ node + " does not exist");
}
} finally {
- namesystem.writeUnlock();
+ blockManager.writeUnlock();
}
}
@@ -993,12 +993,12 @@
*/
public void refreshNodes(final Configuration conf) throws IOException {
refreshHostsReader(conf);
- namesystem.writeLock();
+ blockManager.writeLock();
try {
refreshDatanodes();
countSoftwareVersions();
} finally {
- namesystem.writeUnlock();
+ blockManager.writeUnlock();
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index 42810350..5ce5a83 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -366,12 +366,12 @@
numBlocksChecked = 0;
numNodesChecked = 0;
// Check decom progress
- namesystem.writeLock();
+ blockManager.writeLock();
try {
processPendingNodes();
check();
} finally {
- namesystem.writeUnlock();
+ blockManager.writeUnlock();
}
if (numBlocksChecked + numNodesChecked > 0) {
LOG.info("Checked {} blocks and {} nodes this tick", numBlocksChecked,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index d0369aa..24ad6dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -351,20 +351,20 @@
}
if (dead != null) {
-      // acquire the fsnamesystem lock, and then remove the dead node.
+      // acquire the block manager lock, and then remove the dead node.
- namesystem.writeLock();
+ blockManager.writeLock();
try {
dm.removeDeadDatanode(dead);
} finally {
- namesystem.writeUnlock();
+ blockManager.writeUnlock();
}
}
if (failedStorage != null) {
-      // acquire the fsnamesystem lock, and remove blocks on the storage.
+      // acquire the block manager lock, and remove blocks on the storage.
- namesystem.writeLock();
+ blockManager.writeLock();
try {
blockManager.removeBlocksAssociatedTo(failedStorage);
} finally {
- namesystem.writeUnlock();
+ blockManager.writeUnlock();
}
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index 4fd9ca8..3e2443b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -928,7 +928,7 @@
public final void processCacheReport(final DatanodeID datanodeID,
final List<Long> blockIds) throws IOException {
- namesystem.writeLock();
+ blockManager.writeLock();
final long startTime = Time.monotonicNow();
final long endTime;
try {
@@ -942,7 +942,7 @@
processCacheReportImpl(datanode, blockIds);
} finally {
endTime = Time.monotonicNow();
- namesystem.writeUnlock();
+ blockManager.writeUnlock();
}
// Log the block report processing stats from Namenode perspective
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 65b40c8..dcedcc4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -6293,6 +6293,11 @@
return haContext;
}
+ @Override
+ public ReentrantReadWriteLock getLockImplementation() {
+ return fsLock.coarseLock;
+ }
+
@Override // NameNodeMXBean
public String getCorruptFiles() {
List<String> list = new ArrayList<String>();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index b1012c2..89fe678 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@ -30,6 +30,8 @@
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.AccessControlException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
/** Namesystem operations. */
@InterfaceAudience.Private
public interface Namesystem extends RwLock, SafeMode {
@@ -67,4 +69,6 @@
CacheManager getCacheManager();
HAContext getHAContext();
+
+ ReentrantReadWriteLock getLockImplementation();
}
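
One consequence for tests: a Mockito-mocked Namesystem returns null from getLockImplementation(), so a BlockManager built on such a mock starts out with an unusable internal lock. The test changes below work around this by swapping a mocked BlockManagerLock into the private lock field via Whitebox. An alternative sketch (an assumption, not what this patch does) would be to hand the mock a real lock before constructing the BlockManager:

    Namesystem mockNS = mock(Namesystem.class);
    when(mockNS.getLockImplementation())
        .thenReturn(new ReentrantReadWriteLock());
    BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
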
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index 64d80bd..1ae6c2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -172,10 +172,10 @@
* @param dnName the name of the DataNode
*/
public static void noticeDeadDatanode(NameNode nn, String dnName) {
- FSNamesystem namesystem = nn.getNamesystem();
- namesystem.writeLock();
+ final BlockManager bm = nn.getNamesystem().getBlockManager();
+ bm.writeLock();
try {
- DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager();
+ DatanodeManager dnm = bm.getDatanodeManager();
HeartbeatManager hbm = dnm.getHeartbeatManager();
DatanodeDescriptor[] dnds = hbm.getDatanodes();
DatanodeDescriptor theDND = null;
@@ -191,7 +191,7 @@
hbm.heartbeatCheck();
}
} finally {
- namesystem.writeUnlock();
+ bm.writeUnlock();
}
}
@@ -220,18 +220,17 @@
* Call heartbeat check function of HeartbeatManager and get
* under replicated blocks count within write lock to make sure
* computeDatanodeWork doesn't interfere.
- * @param namesystem the FSNamesystem
* @param bm the BlockManager to manipulate
* @return the number of under replicated blocks
*/
public static int checkHeartbeatAndGetUnderReplicatedBlocksCount(
- FSNamesystem namesystem, BlockManager bm) {
- namesystem.writeLock();
+ BlockManager bm) {
+ bm.writeLock();
try {
bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
return bm.getUnderReplicatedNotMissingBlocks();
} finally {
- namesystem.writeUnlock();
+ bm.writeUnlock();
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 16d482e..76b4f14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -23,6 +23,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
@@ -34,6 +35,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
+import java.util.concurrent.locks.Lock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -63,12 +65,12 @@
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Lists;
+import org.mockito.internal.util.reflection.Whitebox;
public class TestBlockManager {
private DatanodeStorageInfo[] storages;
@@ -97,11 +99,16 @@
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
"need to set a dummy value here so it assumes a multi-rack cluster");
- fsn = Mockito.mock(FSNamesystem.class);
- Mockito.doReturn(true).when(fsn).hasWriteLock();
- Mockito.doReturn(true).when(fsn).hasReadLock();
- Mockito.doReturn(true).when(fsn).isRunning();
+ fsn = mock(FSNamesystem.class);
+ doReturn(true).when(fsn).isRunning();
+ Lock lockImpl = mock(Lock.class);
+ BlockManagerLock lock = mock(BlockManagerLock.class);
bm = new BlockManager(fsn, conf);
+ Whitebox.setInternalState(bm, "lock", lock);
+ doReturn(true).when(lock).hasWriteLock();
+ doReturn(true).when(lock).hasReadLock();
+ doReturn(lockImpl).when(lock).readLock();
+ doReturn(lockImpl).when(lock).writeLock();
final String[] racks = {
"/rackA",
"/rackA",
@@ -438,9 +445,9 @@
private BlockInfo addBlockOnNodes(long blockId, List<DatanodeDescriptor> nodes) {
long inodeId = ++mockINodeId;
- BlockCollection bc = Mockito.mock(BlockCollection.class);
- Mockito.doReturn(inodeId).when(bc).getId();
- Mockito.doReturn(bc).when(fsn).getBlockCollection(inodeId);
+ BlockCollection bc = mock(BlockCollection.class);
+ doReturn(inodeId).when(bc).getId();
+ doReturn(bc).when(fsn).getBlockCollection(inodeId);
BlockInfo blockInfo = blockOnNodes(blockId, nodes);
blockInfo.setReplication((short) 3);
@@ -749,7 +756,7 @@
Block block = new Block(blkId);
BlockInfo blockInfo =
new BlockInfoContiguous(block, (short) 3);
- BlockCollection bc = Mockito.mock(BlockCollection.class);
+ BlockCollection bc = mock(BlockCollection.class);
long inodeId = ++mockINodeId;
doReturn(inodeId).when(bc).getId();
bm.blocksMap.addBlockCollection(blockInfo, bc);
@@ -761,7 +768,7 @@
Block block = new Block(blkId);
BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3);
blockInfo.convertToBlockUnderConstruction(UNDER_CONSTRUCTION, null);
- BlockCollection bc = Mockito.mock(BlockCollection.class);
+ BlockCollection bc = mock(BlockCollection.class);
long inodeId = ++mockINodeId;
doReturn(inodeId).when(bc).getId();
bm.blocksMap.addBlockCollection(blockInfo, bc);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index b55a716..8bad600 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -70,6 +70,7 @@
private static DatanodeManager mockDatanodeManager(
FSNamesystem fsn, Configuration conf) throws IOException {
BlockManager bm = Mockito.mock(BlockManager.class);
+ Mockito.doReturn(true).when(bm).hasWriteLock();
BlockReportLeaseManager blm = new BlockReportLeaseManager(conf);
Mockito.when(bm.getBlockReportLeaseManager()).thenReturn(blm);
DatanodeManager dm = new DatanodeManager(bm, fsn, conf);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 37fcf34..8e9ce12 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -36,6 +36,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.Lock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
@@ -66,6 +67,7 @@
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.mockito.internal.util.reflection.Whitebox;
@RunWith(Parameterized.class)
public class TestReplicationPolicy extends BaseReplicationPolicyTest {
@@ -1221,9 +1223,11 @@
public void testAddStoredBlockDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
- when(mockNS.hasWriteLock()).thenReturn(true);
- when(mockNS.hasReadLock()).thenReturn(true);
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
+ BlockManagerLock lock = mock(BlockManagerLock.class);
+ when(lock.hasWriteLock()).thenReturn(true);
+ when(lock.hasReadLock()).thenReturn(true);
+ Whitebox.setInternalState(bm, "lock", lock);
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
@@ -1270,9 +1274,12 @@
testConvertLastBlockToUnderConstructionDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
- when(mockNS.hasReadLock()).thenReturn(true);
-
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
+ BlockManagerLock lock = mock(BlockManagerLock.class);
+ Lock impl = mock(Lock.class);
+ when(lock.hasReadLock()).thenReturn(true);
+ when(lock.writeLock()).thenReturn(impl);
+ Whitebox.setInternalState(bm, "lock", lock);
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
long blkID1 = ThreadLocalRandom.current().nextLong();
@@ -1342,9 +1349,12 @@
public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
- when(mockNS.hasReadLock()).thenReturn(true);
-
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
+ BlockManagerLock lock = mock(BlockManagerLock.class);
+ Lock impl = mock(Lock.class);
+ when(lock.hasReadLock()).thenReturn(true);
+ when(lock.writeLock()).thenReturn(impl);
+ Whitebox.setInternalState(bm, "lock", lock);
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 2c4fcc5..98519c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -393,8 +393,7 @@
// underReplicatedBlocks are due to failed volumes
int underReplicatedBlocks =
- BlockManagerTestUtil.checkHeartbeatAndGetUnderReplicatedBlocksCount(
- cluster.getNamesystem(), bm);
+ BlockManagerTestUtil.checkHeartbeatAndGetUnderReplicatedBlocksCount(bm);
assertTrue("There is no under replicated block after volume failure",
underReplicatedBlocks > 0);
}