Revert "HDFS-10301. Interleaving processing of storages from repeated block reports causes false zombie storage detection, removes valid blocks. Contributed by Vinitha Gankidi."
This reverts commit 85a20508bd04851d47c24b7562ec2927d5403446.
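
For context, this backs out the split-report protocol change from
HDFS-10301 (the STORAGE_REPORT sentinel in BlockListAsLongs and the
per-report storage-ID set carried by the last RPC) and restores the
earlier zombie-storage detection: the NameNode stamps each
DatanodeStorageInfo with the ID of the block report that updated it
and, once every RPC of the report has been seen, prunes any storage
that is missing the current stamp.

The restored scheme, as a minimal standalone sketch (hypothetical
ZombiePruneSketch class with simplified types, for illustration only;
the real logic is in DatanodeDescriptor and DatanodeStorageInfo in the
hunks below):

    import java.util.ArrayList;
    import java.util.BitSet;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    /** Simplified sketch; not the actual HDFS classes. */
    public class ZombiePruneSketch {

      /** Stand-in for DatanodeStorageInfo: a storage and its stamp. */
      static class StorageInfo {
        final String storageId;
        long lastBlockReportId; // stamped when this storage's report is processed
        StorageInfo(String id) {
          this.storageId = id;
        }
      }

      // Stand-ins for the per-DatanodeDescriptor state restored here.
      private long curBlockReportId;
      private BitSet curBlockReportRpcsSeen;
      private final Map<String, StorageInfo> storageMap = new HashMap<>();

      /** Record one RPC of a split report; returns RPCs seen so far. */
      int updateBlockReportContext(long reportId, int totalRpcs, int curRpc) {
        if (curBlockReportId != reportId) {
          curBlockReportId = reportId;
          curBlockReportRpcsSeen = new BitSet(totalRpcs);
        }
        curBlockReportRpcsSeen.set(curRpc);
        return curBlockReportRpcsSeen.cardinality();
      }

      /** Prune storages whose stamp does not match the completed report. */
      List<StorageInfo> removeZombieStorages() {
        List<StorageInfo> zombies = new ArrayList<>();
        Iterator<StorageInfo> it = storageMap.values().iterator();
        while (it.hasNext()) {
          StorageInfo s = it.next();
          if (s.lastBlockReportId != curBlockReportId) {
            it.remove();
            zombies.add(s);
          }
          s.lastBlockReportId = 0; // reset the stamp for the next report
        }
        return zombies;
      }

      public static void main(String[] args) {
        ZombiePruneSketch node = new ZombiePruneSketch();
        node.storageMap.put("DS-1", new StorageInfo("DS-1"));
        node.storageMap.put("DS-2", new StorageInfo("DS-2"));
        long reportId = 0xabcdL;
        // A report split into two RPCs; only DS-1 is stamped.
        node.storageMap.get("DS-1").lastBlockReportId = reportId;
        node.updateBlockReportContext(reportId, 2, 0);
        if (node.updateBlockReportContext(reportId, 2, 1) >= 2) {
          // DS-2 never got this report's stamp: treated as a zombie.
          System.out.println("zombies pruned: "
              + node.removeZombieStorages().size());
        }
      }
    }

Note that the subject line describes the known flaw in this restored
scheme: interleaved processing of storages from repeated block reports
can reset the stamps mid-report and falsely mark live storages as
zombies, removing valid blocks.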
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
index 26340a9..26c7ffb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
@@ -63,34 +63,6 @@
public Iterator<BlockReportReplica> iterator() {
return Collections.emptyIterator();
}
- @Override
- public boolean isStorageReport() {
- return false;
- }
- };
-
- // STORAGE_REPORT is used to report all storages in the DN
- public static final BlockListAsLongs STORAGE_REPORT = new BlockListAsLongs() {
- @Override
- public int getNumberOfBlocks() {
- return -1;
- }
- @Override
- public ByteString getBlocksBuffer() {
- return ByteString.EMPTY;
- }
- @Override
- public long[] getBlockListAsLongs() {
- return EMPTY_LONGS;
- }
- @Override
- public Iterator<BlockReportReplica> iterator() {
- return Collections.emptyIterator();
- }
- @Override
- public boolean isStorageReport() {
- return true;
- }
};
/**
@@ -281,13 +253,6 @@
abstract public long[] getBlockListAsLongs();
/**
- * Return true for STORAGE_REPORT BlocksListsAsLongs.
- * Otherwise return false.
- * @return boolean
- */
- abstract public boolean isStorageReport();
-
- /**
* Returns a singleton iterator over blocks in the block report. Do not
* add the returned blocks to a collection.
* @return Iterator
@@ -427,11 +392,6 @@
}
@Override
- public boolean isStorageReport() {
- return false;
- }
-
- @Override
public Iterator<BlockReportReplica> iterator() {
return new Iterator<BlockReportReplica>() {
final BlockReportReplica block = new BlockReportReplica();
@@ -515,11 +475,6 @@
}
@Override
- public boolean isStorageReport() {
- return false;
- }
-
- @Override
public Iterator<BlockReportReplica> iterator() {
return new Iterator<BlockReportReplica>() {
private final BlockReportReplica block = new BlockReportReplica();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index d927b2a..349b018 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2138,7 +2138,7 @@
public boolean processReport(final DatanodeID nodeID,
final DatanodeStorage storage,
final BlockListAsLongs newReport,
- BlockReportContext context) throws IOException {
+ BlockReportContext context, boolean lastStorageInRpc) throws IOException {
namesystem.writeLock();
final long startTime = Time.monotonicNow(); //after acquiring write lock
final long endTime;
@@ -2189,14 +2189,30 @@
storageInfo.receivedBlockReport();
if (context != null) {
- if (context.getTotalRpcs() == context.getCurRpc() + 1) {
- long leaseId = this.getBlockReportLeaseManager().removeLease(node);
- BlockManagerFaultInjector.getInstance().
- removeBlockReportLease(node, leaseId);
+ storageInfo.setLastBlockReportId(context.getReportId());
+ if (lastStorageInRpc) {
+ int rpcsSeen = node.updateBlockReportContext(context);
+ if (rpcsSeen >= context.getTotalRpcs()) {
+ long leaseId = blockReportLeaseManager.removeLease(node);
+ BlockManagerFaultInjector.getInstance().
+ removeBlockReportLease(node, leaseId);
+ List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
+ if (zombies.isEmpty()) {
+ LOG.debug("processReport 0x{}: no zombie storages found.",
+ Long.toHexString(context.getReportId()));
+ } else {
+ for (DatanodeStorageInfo zombie : zombies) {
+ removeZombieReplicas(context, zombie);
+ }
+ }
+ node.clearBlockReportContext();
+ } else {
+ LOG.debug("processReport 0x{}: {} more RPCs remaining in this " +
+ "report.", Long.toHexString(context.getReportId()),
+ (context.getTotalRpcs() - rpcsSeen)
+ );
+ }
}
- LOG.debug("Processing RPC with index {} out of total {} RPCs in "
- + "processReport 0x{}", context.getCurRpc(),
- context.getTotalRpcs(), Long.toHexString(context.getReportId()));
}
} finally {
endTime = Time.monotonicNow();
@@ -2222,26 +2238,6 @@
return !node.hasStaleStorages();
}
- public void removeZombieStorages(DatanodeRegistration nodeReg,
- BlockReportContext context, Set<String> storageIDsInBlockReport)
- throws UnregisteredNodeException {
- namesystem.writeLock();
- DatanodeDescriptor node = this.getDatanodeManager().getDatanode(nodeReg);
- if (node != null) {
- List<DatanodeStorageInfo> zombies =
- node.removeZombieStorages(storageIDsInBlockReport);
- if (zombies.isEmpty()) {
- LOG.debug("processReport 0x{}: no zombie storages found.",
- Long.toHexString(context.getReportId()));
- } else {
- for (DatanodeStorageInfo zombie : zombies) {
- this.removeZombieReplicas(context, zombie);
- }
- }
- }
- namesystem.writeUnlock();
- }
-
private void removeZombieReplicas(BlockReportContext context,
DatanodeStorageInfo zombie) {
LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java
index 34e0949..7db05c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java
@@ -308,10 +308,10 @@
return false;
}
if (node.leaseId == 0) {
- LOG.warn("BR lease 0x{} is not found for DN {}, because the DN " +
+ LOG.warn("BR lease 0x{} is not valid for DN {}, because the DN " +
"is not in the pending set.",
Long.toHexString(id), dn.getDatanodeUuid());
- return true;
+ return false;
}
if (pruneIfExpired(monotonicNowMs, node)) {
LOG.warn("BR lease 0x{} is not valid for DN {}, because the lease " +
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index d807ab6..1646129 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -42,6 +43,7 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -152,6 +154,9 @@
public final DecommissioningStatus decommissioningStatus =
new DecommissioningStatus();
+ private long curBlockReportId = 0;
+
+ private BitSet curBlockReportRpcsSeen = null;
private final Map<String, DatanodeStorageInfo> storageMap =
new HashMap<>();
@@ -252,6 +257,20 @@
updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
}
+ public int updateBlockReportContext(BlockReportContext context) {
+ if (curBlockReportId != context.getReportId()) {
+ curBlockReportId = context.getReportId();
+ curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
+ }
+ curBlockReportRpcsSeen.set(context.getCurRpc());
+ return curBlockReportRpcsSeen.cardinality();
+ }
+
+ public void clearBlockReportContext() {
+ curBlockReportId = 0;
+ curBlockReportRpcsSeen = null;
+ }
+
public CachedBlocksList getPendingCached() {
return pendingCached;
}
@@ -315,8 +334,7 @@
}
}
- List<DatanodeStorageInfo>
- removeZombieStorages(Set<String> storageIDsInBlockReport) {
+ List<DatanodeStorageInfo> removeZombieStorages() {
List<DatanodeStorageInfo> zombies = null;
synchronized (storageMap) {
Iterator<Map.Entry<String, DatanodeStorageInfo>> iter =
@@ -324,13 +342,18 @@
while (iter.hasNext()) {
Map.Entry<String, DatanodeStorageInfo> entry = iter.next();
DatanodeStorageInfo storageInfo = entry.getValue();
- if (!storageIDsInBlockReport.contains(storageInfo.getStorageID())) {
+ if (storageInfo.getLastBlockReportId() != curBlockReportId) {
+ LOG.info("{} had lastBlockReportId 0x{} but curBlockReportId = 0x{}",
+ storageInfo.getStorageID(),
+ Long.toHexString(storageInfo.getLastBlockReportId()),
+ Long.toHexString(curBlockReportId));
iter.remove();
if (zombies == null) {
zombies = new LinkedList<>();
}
zombies.add(storageInfo);
}
+ storageInfo.setLastBlockReportId(0);
}
}
return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 1b7cd7c..843a8d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -98,6 +98,9 @@
private final FoldedTreeSet<BlockInfo> blocks = new FoldedTreeSet<>();
+ // The ID of the last full block report which updated this storage.
+ private long lastBlockReportId = 0;
+
/** The number of block reports received */
private int blockReportCount = 0;
@@ -162,6 +165,14 @@
this.blockPoolUsed = blockPoolUsed;
}
+ long getLastBlockReportId() {
+ return lastBlockReportId;
+ }
+
+ void setLastBlockReportId(long lastBlockReportId) {
+ this.lastBlockReportId = lastBlockReportId;
+ }
+
State getState() {
return this.state;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index f18cf0b..69989fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -367,36 +367,11 @@
} else {
// Send one block report per message.
for (int r = 0; r < reports.length; r++) {
- StorageBlockReport[] singleReport = {reports[r]};
- DatanodeCommand cmd;
- if (r != reports.length - 1) {
- cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(),
- singleReport, new BlockReportContext(reports.length, r,
- reportId, fullBrLeaseId, true));
- } else {
- StorageBlockReport[] lastSplitReport =
- new StorageBlockReport[perVolumeBlockLists.size()];
- // When block reports are split, the last RPC in the block report
- // has the information about all storages in the block report.
- // See HDFS-10301 for more details. To achieve this, the last RPC
- // has 'n' storage reports, where 'n' is the number of storages in
- // a DN. The actual block replicas are reported only for the
- // last/n-th storage.
- i = 0;
- for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair :
- perVolumeBlockLists.entrySet()) {
- lastSplitReport[i++] = new StorageBlockReport(
- kvPair.getKey(), BlockListAsLongs.STORAGE_REPORT);
- if (i == r) {
- lastSplitReport[i] = reports[r];
- break;
- }
- }
- cmd = bpNamenode.blockReport(
- bpRegistration, bpos.getBlockPoolId(), lastSplitReport,
- new BlockReportContext(reports.length, r, reportId,
- fullBrLeaseId, true));
- }
+ StorageBlockReport singleReport[] = { reports[r] };
+ DatanodeCommand cmd = bpNamenode.blockReport(
+ bpRegistration, bpos.getBlockPoolId(), singleReport,
+ new BlockReportContext(reports.length, r, reportId,
+ fullBrLeaseId, true));
numReportsSent++;
numRPCs++;
if (cmd != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 3f36fcc..6b52949 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1435,37 +1435,25 @@
boolean noStaleStorages = false;
for (int r = 0; r < reports.length; r++) {
final BlockListAsLongs blocks = reports[r].getBlocks();
- if (!blocks.isStorageReport()) {
- //
- // BlockManager.processReport accumulates information of prior calls
- // for the same node and storage, so the value returned by the last
- // call of this loop is the final updated value for noStaleStorage.
- //
- final int index = r;
- noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
- @Override
- public Boolean call()
- throws IOException {
- return bm.processReport(nodeReg, reports[index].getStorage(),
- blocks, context);
- }
- });
- metrics.incrStorageBlockReportOps();
- }
+ //
+ // BlockManager.processReport accumulates information of prior calls
+ // for the same node and storage, so the value returned by the last
+ // call of this loop is the final updated value for noStaleStorage.
+ //
+ final int index = r;
+ noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws IOException {
+ return bm.processReport(nodeReg, reports[index].getStorage(),
+ blocks, context, (index == reports.length - 1));
+ }
+ });
+ metrics.incrStorageBlockReportOps();
}
BlockManagerFaultInjector.getInstance().
incomingBlockReportRpc(nodeReg, context);
if (nn.getFSImage().isUpgradeFinalized() &&
- context.getTotalRpcs() == context.getCurRpc() + 1) {
- Set<String> storageIDsInBlockReport = new HashSet<>();
- for (StorageBlockReport report : reports) {
- storageIDsInBlockReport.add(report.getStorage().getStorageID());
- }
- bm.removeZombieStorages(nodeReg, context, storageIDsInBlockReport);
- }
-
- if (nn.getFSImage().isUpgradeFinalized() &&
!namesystem.isRollingUpgrade() &&
!nn.isStandbyState() &&
noStaleStorages) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 8c231d1..394fae9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -713,12 +713,12 @@
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
- BlockListAsLongs.EMPTY, null);
+ BlockListAsLongs.EMPTY, null, false);
assertEquals(1, ds.getBlockReportCount());
// send block report again, should NOT be processed
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
- BlockListAsLongs.EMPTY, null);
+ BlockListAsLongs.EMPTY, null, false);
assertEquals(1, ds.getBlockReportCount());
// re-register as if node restarted, should update existing node
@@ -729,7 +729,7 @@
// send block report, should be processed after restart
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
- BlockListAsLongs.EMPTY, null);
+ BlockListAsLongs.EMPTY, null, false);
// Reinitialize as registration with empty storage list pruned
// node.storageMap.
ds = node.getStorageInfos()[0];
@@ -758,7 +758,7 @@
reset(node);
doReturn(1).when(node).numBlocks();
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
- BlockListAsLongs.EMPTY, null);
+ BlockListAsLongs.EMPTY, null, false);
assertEquals(1, ds.getBlockReportCount());
}
@@ -832,7 +832,7 @@
assertEquals(0, ds.getBlockReportCount());
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
builder.build(),
- new BlockReportContext(1, 0, System.nanoTime(), 0, true));
+ new BlockReportContext(1, 0, System.nanoTime(), 0, true), false);
assertEquals(1, ds.getBlockReportCount());
// verify the storage info is correct
@@ -871,7 +871,8 @@
assertEquals(0, ds.getBlockReportCount());
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
generateReport(blocks),
- new BlockReportContext(1, 0, System.nanoTime(), 0, false));
+ new BlockReportContext(1, 0, System.nanoTime(), 0, false),
+ false);
assertEquals(1, ds.getBlockReportCount());
// verify the storage info is correct
for (BlockInfo block : blocks) {
@@ -881,7 +882,8 @@
// Send unsorted report
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
generateReport(blocks),
- new BlockReportContext(1, 0, System.nanoTime(), 0, false));
+ new BlockReportContext(1, 0, System.nanoTime(), 0, false),
+ false);
assertEquals(2, ds.getBlockReportCount());
// verify the storage info is correct
for (BlockInfo block : blocks) {
@@ -892,7 +894,8 @@
Collections.sort(blocks);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
generateReport(blocks),
- new BlockReportContext(1, 0, System.nanoTime(), 0, true));
+ new BlockReportContext(1, 0, System.nanoTime(), 0, true),
+ false);
assertEquals(3, ds.getBlockReportCount());
// verify the storage info is correct
for (BlockInfo block : blocks) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index be38afe..b11b48ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -19,40 +19,34 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.base.Supplier;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
+import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
@@ -61,6 +55,8 @@
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
+import java.util.Arrays;
+import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
@@ -372,68 +368,4 @@
cluster.shutdown();
}
}
-
- @Test(timeout=300000)
- public void testInterleavedFullBlockReports() throws Exception {
- Configuration conf = new HdfsConfiguration();
- conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
- 36000000L);
- int numStoragesPerDatanode = 6;
- final MiniDFSCluster cluster = new MiniDFSCluster
- .Builder(conf).numDataNodes(1)
- .storagesPerDatanode(numStoragesPerDatanode)
- .build();
- try {
- LOG.info("waiting for cluster to become active...");
- cluster.waitActive();
- // Get the datanode registration and the block reports
- DataNode dn = cluster.getDataNodes().get(0);
- final String blockPoolId = cluster.getNamesystem().getBlockPoolId();
- LOG.info("Block pool id: " + blockPoolId);
- final DatanodeRegistration dnR = dn.getDNRegistrationForBP(blockPoolId);
- Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
- dn.getFSDataset().getBlockReports(blockPoolId);
- final StorageBlockReport[] reports =
- new StorageBlockReport[perVolumeBlockLists.size()];
- int reportIndex = 0;
- for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair :
- perVolumeBlockLists.entrySet()) {
- DatanodeStorage dnStorage = kvPair.getKey();
- BlockListAsLongs blockList = kvPair.getValue();
- reports[reportIndex++] =
- new StorageBlockReport(dnStorage, blockList);
- }
- // Get the list of storage ids associated with the datanode
- // before the test
- BlockManager bm =
- cluster.getNameNode().getNamesystem().getBlockManager();
- final DatanodeDescriptor dnDescriptor = bm.getDatanodeManager().
- getDatanode(cluster.getDataNodes().get(0).getDatanodeUuid());
- DatanodeStorageInfo[] storageInfos = dnDescriptor.getStorageInfos();
- // Send the full block report concurrently using
- // numThreads=numStoragesPerDatanode
- ExecutorService executorService = Executors.
- newFixedThreadPool(numStoragesPerDatanode);
- List<Future<DatanodeCommand>> futureList =
- new ArrayList<>(numStoragesPerDatanode);
- for (int i = 0; i < numStoragesPerDatanode; i++) {
- futureList.add(executorService.submit(new Callable<DatanodeCommand>() {
- @Override
- public DatanodeCommand call() throws IOException {
- return cluster.getNameNodeRpc().blockReport(dnR, blockPoolId,
- reports, new BlockReportContext(1, 0, System.nanoTime(),
- 0L, true));
- }
- }));
- }
- for (Future<DatanodeCommand> future: futureList) {
- future.get();
- }
- executorService.shutdown();
- // Verify that the storages match before and after the test
- Assert.assertArrayEquals(storageInfos, dnDescriptor.getStorageInfos());
- } finally {
- cluster.shutdown();
- }
- }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
index f41c546..bf0e3c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
@@ -41,7 +41,6 @@
import org.mockito.Mockito;
import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.times;
@@ -89,34 +88,6 @@
blockCount * BLOCK_SIZE, BLOCK_SIZE, REPL_FACTOR, seed);
}
- private void verifyCapturedArgumentsSplit(
- ArgumentCaptor<StorageBlockReport[]> captor,
- int expectedReportsPerCall,
- int expectedTotalBlockCount) {
- List<StorageBlockReport[]> listOfReports = captor.getAllValues();
- int numBlocksReported = 0;
- int storageIndex = 0;
- int listOfReportsSize = listOfReports.size();
- for (StorageBlockReport[] reports : listOfReports) {
- if (storageIndex < (listOfReportsSize - 1)) {
- assertThat(reports.length, is(expectedReportsPerCall));
- } else {
- assertThat(reports.length, is(listOfReportsSize));
- }
- for (StorageBlockReport report : reports) {
- BlockListAsLongs blockList = report.getBlocks();
- if (!blockList.isStorageReport()) {
- numBlocksReported += blockList.getNumberOfBlocks();
- } else {
- assertEquals(blockList.getNumberOfBlocks(), -1);
- }
- }
- storageIndex++;
- }
-
- assert(numBlocksReported >= expectedTotalBlockCount);
- }
-
private void verifyCapturedArguments(
ArgumentCaptor<StorageBlockReport[]> captor,
int expectedReportsPerCall,
@@ -165,7 +136,7 @@
anyString(),
captor.capture(), Mockito.<BlockReportContext>anyObject());
- verifyCapturedArgumentsSplit(captor, 1, BLOCKS_IN_FILE);
+ verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
}
/**
@@ -229,7 +200,7 @@
anyString(),
captor.capture(), Mockito.<BlockReportContext>anyObject());
- verifyCapturedArgumentsSplit(captor, 1, BLOCKS_IN_FILE);
+ verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
index 524243b..791ee20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
@@ -20,7 +20,6 @@
import java.io.IOException;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@@ -35,32 +34,13 @@
@Override
protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
StorageBlockReport[] reports) throws IOException {
- for (int r = 0; r < reports.length; r++) {
- LOG.info("Sending block report for storage " +
- reports[r].getStorage().getStorageID());
- StorageBlockReport[] singletonReport = {reports[r]};
- if (r != reports.length - 1) {
- cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
- new BlockReportContext(reports.length, r, System.nanoTime(),
- 0L, true));
- } else {
- StorageBlockReport[] lastSplitReport =
- new StorageBlockReport[reports.length];
- // When block reports are split, send a dummy storage report for all
- // other storages in the blockreport along with the last storage report
- for (int i = 0; i <= r; i++) {
- if (i == r) {
- lastSplitReport[i] = reports[r];
- break;
- }
- lastSplitReport[i] =
- new StorageBlockReport(reports[i].getStorage(),
- BlockListAsLongs.STORAGE_REPORT);
- }
- cluster.getNameNodeRpc().blockReport(dnR, poolId, lastSplitReport,
- new BlockReportContext(reports.length, r, System.nanoTime(),
- 0L, true));
- }
+ int i = 0;
+ for (StorageBlockReport report : reports) {
+ LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
+ StorageBlockReport[] singletonReport = { report };
+ cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
+ new BlockReportContext(reports.length, i, System.nanoTime(), 0L, true));
+ i++;
}
}
}