Revert "HDFS-10301. Interleaving processing of storages from repeated block reports causes false zombie storage detection, removes valid blocks. Contributed by Vinitha Gankidi."

This reverts commit 85a20508bd04851d47c24b7562ec2927d5403446.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
index 26340a9..26c7ffb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
@@ -63,34 +63,6 @@
     public Iterator<BlockReportReplica> iterator() {
       return Collections.emptyIterator();
     }
-    @Override
-    public boolean isStorageReport() {
-      return false;
-    }
-  };
-
-  // STORAGE_REPORT is used to report all storages in the DN
-  public static final BlockListAsLongs STORAGE_REPORT = new BlockListAsLongs() {
-    @Override
-    public int getNumberOfBlocks() {
-      return -1;
-    }
-    @Override
-    public ByteString getBlocksBuffer() {
-      return ByteString.EMPTY;
-    }
-    @Override
-    public long[] getBlockListAsLongs() {
-      return EMPTY_LONGS;
-    }
-    @Override
-    public Iterator<BlockReportReplica> iterator() {
-      return Collections.emptyIterator();
-    }
-    @Override
-    public boolean isStorageReport() {
-      return true;
-    }
   };
 
   /**
@@ -281,13 +253,6 @@
   abstract public long[] getBlockListAsLongs();
 
   /**
-   * Return true for STORAGE_REPORT BlocksListsAsLongs.
-   * Otherwise return false.
-   * @return boolean
-   */
-  abstract public boolean isStorageReport();
-
-  /**
    * Returns a singleton iterator over blocks in the block report.  Do not
    * add the returned blocks to a collection.
    * @return Iterator
@@ -427,11 +392,6 @@
     }
 
     @Override
-    public boolean isStorageReport() {
-      return false;
-    }
-
-    @Override
     public Iterator<BlockReportReplica> iterator() {
       return new Iterator<BlockReportReplica>() {
         final BlockReportReplica block = new BlockReportReplica();
@@ -515,11 +475,6 @@
     }
 
     @Override
-    public boolean isStorageReport() {
-      return false;
-    }
-
-    @Override
     public Iterator<BlockReportReplica> iterator() {
       return new Iterator<BlockReportReplica>() {
         private final BlockReportReplica block = new BlockReportReplica();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index d927b2a..349b018 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2138,7 +2138,7 @@
   public boolean processReport(final DatanodeID nodeID,
       final DatanodeStorage storage,
       final BlockListAsLongs newReport,
-      BlockReportContext context) throws IOException {
+      BlockReportContext context, boolean lastStorageInRpc) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.monotonicNow(); //after acquiring write lock
     final long endTime;
@@ -2189,14 +2189,30 @@
       
       storageInfo.receivedBlockReport();
       if (context != null) {
-        if (context.getTotalRpcs() == context.getCurRpc() + 1) {
-          long leaseId = this.getBlockReportLeaseManager().removeLease(node);
-          BlockManagerFaultInjector.getInstance().
-              removeBlockReportLease(node, leaseId);
+        storageInfo.setLastBlockReportId(context.getReportId());
+        if (lastStorageInRpc) {
+          int rpcsSeen = node.updateBlockReportContext(context);
+          if (rpcsSeen >= context.getTotalRpcs()) {
+            long leaseId = blockReportLeaseManager.removeLease(node);
+            BlockManagerFaultInjector.getInstance().
+                removeBlockReportLease(node, leaseId);
+            List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
+            if (zombies.isEmpty()) {
+              LOG.debug("processReport 0x{}: no zombie storages found.",
+                  Long.toHexString(context.getReportId()));
+            } else {
+              for (DatanodeStorageInfo zombie : zombies) {
+                removeZombieReplicas(context, zombie);
+              }
+            }
+            node.clearBlockReportContext();
+          } else {
+            LOG.debug("processReport 0x{}: {} more RPCs remaining in this " +
+                    "report.", Long.toHexString(context.getReportId()),
+                (context.getTotalRpcs() - rpcsSeen)
+            );
+          }
         }
-        LOG.debug("Processing RPC with index {} out of total {} RPCs in "
-                + "processReport 0x{}", context.getCurRpc(),
-            context.getTotalRpcs(), Long.toHexString(context.getReportId()));
       }
     } finally {
       endTime = Time.monotonicNow();
@@ -2222,26 +2238,6 @@
     return !node.hasStaleStorages();
   }
 
-  public void removeZombieStorages(DatanodeRegistration nodeReg,
-      BlockReportContext context, Set<String> storageIDsInBlockReport)
-      throws UnregisteredNodeException {
-    namesystem.writeLock();
-    DatanodeDescriptor node = this.getDatanodeManager().getDatanode(nodeReg);
-    if (node != null) {
-      List<DatanodeStorageInfo> zombies =
-          node.removeZombieStorages(storageIDsInBlockReport);
-      if (zombies.isEmpty()) {
-        LOG.debug("processReport 0x{}: no zombie storages found.",
-            Long.toHexString(context.getReportId()));
-      } else {
-        for (DatanodeStorageInfo zombie : zombies) {
-          this.removeZombieReplicas(context, zombie);
-        }
-      }
-    }
-    namesystem.writeUnlock();
-  }
-
   private void removeZombieReplicas(BlockReportContext context,
       DatanodeStorageInfo zombie) {
     LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java
index 34e0949..7db05c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java
@@ -308,10 +308,10 @@
       return false;
     }
     if (node.leaseId == 0) {
-      LOG.warn("BR lease 0x{} is not found for DN {}, because the DN " +
+      LOG.warn("BR lease 0x{} is not valid for DN {}, because the DN " +
                "is not in the pending set.",
                Long.toHexString(id), dn.getDatanodeUuid());
-      return true;
+      return false;
     }
     if (pruneIfExpired(monotonicNowMs, node)) {
       LOG.warn("BR lease 0x{} is not valid for DN {}, because the lease " +
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index d807ab6..1646129 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -42,6 +43,7 @@
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -152,6 +154,9 @@
   public final DecommissioningStatus decommissioningStatus =
       new DecommissioningStatus();
 
+  private long curBlockReportId = 0;
+
+  private BitSet curBlockReportRpcsSeen = null;
 
   private final Map<String, DatanodeStorageInfo> storageMap =
       new HashMap<>();
@@ -252,6 +257,20 @@
     updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
   }
 
+  public int updateBlockReportContext(BlockReportContext context) {
+    if (curBlockReportId != context.getReportId()) {
+      curBlockReportId = context.getReportId();
+      curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
+    }
+    curBlockReportRpcsSeen.set(context.getCurRpc());
+    return curBlockReportRpcsSeen.cardinality();
+  }
+
+  public void clearBlockReportContext() {
+    curBlockReportId = 0;
+    curBlockReportRpcsSeen = null;
+  }
+
   public CachedBlocksList getPendingCached() {
     return pendingCached;
   }
@@ -315,8 +334,7 @@
     }
   }
 
-  List<DatanodeStorageInfo>
-      removeZombieStorages(Set<String> storageIDsInBlockReport) {
+  List<DatanodeStorageInfo> removeZombieStorages() {
     List<DatanodeStorageInfo> zombies = null;
     synchronized (storageMap) {
       Iterator<Map.Entry<String, DatanodeStorageInfo>> iter =
@@ -324,13 +342,18 @@
       while (iter.hasNext()) {
         Map.Entry<String, DatanodeStorageInfo> entry = iter.next();
         DatanodeStorageInfo storageInfo = entry.getValue();
-        if (!storageIDsInBlockReport.contains(storageInfo.getStorageID())) {
+        if (storageInfo.getLastBlockReportId() != curBlockReportId) {
+          LOG.info("{} had lastBlockReportId 0x{} but curBlockReportId = 0x{}",
+              storageInfo.getStorageID(),
+              Long.toHexString(storageInfo.getLastBlockReportId()),
+              Long.toHexString(curBlockReportId));
           iter.remove();
           if (zombies == null) {
             zombies = new LinkedList<>();
           }
           zombies.add(storageInfo);
         }
+        storageInfo.setLastBlockReportId(0);
       }
     }
     return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 1b7cd7c..843a8d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -98,6 +98,9 @@
 
   private final FoldedTreeSet<BlockInfo> blocks = new FoldedTreeSet<>();
 
+  // The ID of the last full block report which updated this storage.
+  private long lastBlockReportId = 0;
+
   /** The number of block reports received */
   private int blockReportCount = 0;
 
@@ -162,6 +165,14 @@
     this.blockPoolUsed = blockPoolUsed;
   }
 
+  long getLastBlockReportId() {
+    return lastBlockReportId;
+  }
+
+  void setLastBlockReportId(long lastBlockReportId) {
+    this.lastBlockReportId = lastBlockReportId;
+  }
+
   State getState() {
     return this.state;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index f18cf0b..69989fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -367,36 +367,11 @@
       } else {
         // Send one block report per message.
         for (int r = 0; r < reports.length; r++) {
-          StorageBlockReport[] singleReport = {reports[r]};
-          DatanodeCommand cmd;
-          if (r != reports.length - 1) {
-            cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(),
-                singleReport, new BlockReportContext(reports.length, r,
-                    reportId, fullBrLeaseId, true));
-          } else {
-            StorageBlockReport[] lastSplitReport =
-                new StorageBlockReport[perVolumeBlockLists.size()];
-            // When block reports are split, the last RPC in the block report
-            // has the information about all storages in the block report.
-            // See HDFS-10301 for more details. To achieve this, the last RPC
-            // has 'n' storage reports, where 'n' is the number of storages in
-            // a DN. The actual block replicas are reported only for the
-            // last/n-th storage.
-            i = 0;
-            for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair :
-                perVolumeBlockLists.entrySet()) {
-              lastSplitReport[i++] = new StorageBlockReport(
-                  kvPair.getKey(), BlockListAsLongs.STORAGE_REPORT);
-              if (i == r) {
-                lastSplitReport[i] = reports[r];
-                break;
-              }
-            }
-            cmd = bpNamenode.blockReport(
-                bpRegistration, bpos.getBlockPoolId(), lastSplitReport,
-                new BlockReportContext(reports.length, r, reportId,
-                    fullBrLeaseId, true));
-          }
+          StorageBlockReport singleReport[] = { reports[r] };
+          DatanodeCommand cmd = bpNamenode.blockReport(
+              bpRegistration, bpos.getBlockPoolId(), singleReport,
+              new BlockReportContext(reports.length, r, reportId,
+                  fullBrLeaseId, true));
           numReportsSent++;
           numRPCs++;
           if (cmd != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 3f36fcc..6b52949 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1435,37 +1435,25 @@
     boolean noStaleStorages = false;
     for (int r = 0; r < reports.length; r++) {
       final BlockListAsLongs blocks = reports[r].getBlocks();
-      if (!blocks.isStorageReport()) {
-        //
-        // BlockManager.processReport accumulates information of prior calls
-        // for the same node and storage, so the value returned by the last
-        // call of this loop is the final updated value for noStaleStorage.
-        //
-        final int index = r;
-        noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
-          @Override
-          public Boolean call()
-              throws IOException {
-            return bm.processReport(nodeReg, reports[index].getStorage(),
-                blocks, context);
-          }
-        });
-        metrics.incrStorageBlockReportOps();
-      }
+      //
+      // BlockManager.processReport accumulates information of prior calls
+      // for the same node and storage, so the value returned by the last
+      // call of this loop is the final updated value for noStaleStorage.
+      //
+      final int index = r;
+      noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws IOException {
+          return bm.processReport(nodeReg, reports[index].getStorage(),
+              blocks, context, (index == reports.length - 1));
+        }
+      });
+      metrics.incrStorageBlockReportOps();
     }
     BlockManagerFaultInjector.getInstance().
         incomingBlockReportRpc(nodeReg, context);
 
     if (nn.getFSImage().isUpgradeFinalized() &&
-        context.getTotalRpcs() == context.getCurRpc() + 1) {
-      Set<String> storageIDsInBlockReport = new HashSet<>();
-      for (StorageBlockReport report : reports) {
-        storageIDsInBlockReport.add(report.getStorage().getStorageID());
-      }
-      bm.removeZombieStorages(nodeReg, context, storageIDsInBlockReport);
-    }
-
-    if (nn.getFSImage().isUpgradeFinalized() &&
         !namesystem.isRollingUpgrade() &&
         !nn.isStandbyState() &&
         noStaleStorages) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 8c231d1..394fae9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -713,12 +713,12 @@
     reset(node);
     
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null);
+        BlockListAsLongs.EMPTY, null, false);
     assertEquals(1, ds.getBlockReportCount());
     // send block report again, should NOT be processed
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null);
+        BlockListAsLongs.EMPTY, null, false);
     assertEquals(1, ds.getBlockReportCount());
 
     // re-register as if node restarted, should update existing node
@@ -729,7 +729,7 @@
     // send block report, should be processed after restart
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-                     BlockListAsLongs.EMPTY, null);
+                     BlockListAsLongs.EMPTY, null, false);
     // Reinitialize as registration with empty storage list pruned
     // node.storageMap.
     ds = node.getStorageInfos()[0];
@@ -758,7 +758,7 @@
     reset(node);
     doReturn(1).when(node).numBlocks();
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null);
+        BlockListAsLongs.EMPTY, null, false);
     assertEquals(1, ds.getBlockReportCount());
   }
 
@@ -832,7 +832,7 @@
     assertEquals(0, ds.getBlockReportCount());
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
         builder.build(),
-        new BlockReportContext(1, 0, System.nanoTime(), 0, true));
+        new BlockReportContext(1, 0, System.nanoTime(), 0, true), false);
     assertEquals(1, ds.getBlockReportCount());
 
     // verify the storage info is correct
@@ -871,7 +871,8 @@
     assertEquals(0, ds.getBlockReportCount());
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
                      generateReport(blocks),
-                     new BlockReportContext(1, 0, System.nanoTime(), 0, false));
+                     new BlockReportContext(1, 0, System.nanoTime(), 0, false),
+                     false);
     assertEquals(1, ds.getBlockReportCount());
     // verify the storage info is correct
     for (BlockInfo block : blocks) {
@@ -881,7 +882,8 @@
     // Send unsorted report
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
                      generateReport(blocks),
-                     new BlockReportContext(1, 0, System.nanoTime(), 0, false));
+                     new BlockReportContext(1, 0, System.nanoTime(), 0, false),
+                     false);
     assertEquals(2, ds.getBlockReportCount());
     // verify the storage info is correct
     for (BlockInfo block : blocks) {
@@ -892,7 +894,8 @@
     Collections.sort(blocks);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
                      generateReport(blocks),
-                     new BlockReportContext(1, 0, System.nanoTime(), 0, true));
+                     new BlockReportContext(1, 0, System.nanoTime(), 0, true),
+                     false);
     assertEquals(3, ds.getBlockReportCount());
     // verify the storage info is correct
     for (BlockInfo block : blocks) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index be38afe..b11b48ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -19,40 +19,34 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import com.google.common.base.Supplier;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
@@ -61,6 +55,8 @@
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
+import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
@@ -372,68 +368,4 @@
       cluster.shutdown();
     }
   }
-
-  @Test(timeout=300000)
-  public void testInterleavedFullBlockReports() throws Exception {
-    Configuration conf = new HdfsConfiguration();
-    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
-        36000000L);
-    int numStoragesPerDatanode = 6;
-    final MiniDFSCluster cluster = new MiniDFSCluster
-        .Builder(conf).numDataNodes(1)
-        .storagesPerDatanode(numStoragesPerDatanode)
-        .build();
-    try {
-      LOG.info("waiting for cluster to become active...");
-      cluster.waitActive();
-      // Get the datanode registration and the block reports
-      DataNode dn = cluster.getDataNodes().get(0);
-      final String blockPoolId = cluster.getNamesystem().getBlockPoolId();
-      LOG.info("Block pool id: " + blockPoolId);
-      final DatanodeRegistration dnR = dn.getDNRegistrationForBP(blockPoolId);
-      Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
-          dn.getFSDataset().getBlockReports(blockPoolId);
-      final StorageBlockReport[] reports =
-          new StorageBlockReport[perVolumeBlockLists.size()];
-      int reportIndex = 0;
-      for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair :
-          perVolumeBlockLists.entrySet()) {
-        DatanodeStorage dnStorage = kvPair.getKey();
-        BlockListAsLongs blockList = kvPair.getValue();
-        reports[reportIndex++] =
-            new StorageBlockReport(dnStorage, blockList);
-      }
-      // Get the list of storage ids associated with the datanode
-      // before the test
-      BlockManager bm =
-          cluster.getNameNode().getNamesystem().getBlockManager();
-      final DatanodeDescriptor dnDescriptor = bm.getDatanodeManager().
-          getDatanode(cluster.getDataNodes().get(0).getDatanodeUuid());
-      DatanodeStorageInfo[] storageInfos = dnDescriptor.getStorageInfos();
-      // Send the full block report concurrently using
-      // numThreads=numStoragesPerDatanode
-      ExecutorService executorService = Executors.
-          newFixedThreadPool(numStoragesPerDatanode);
-      List<Future<DatanodeCommand>> futureList =
-          new ArrayList<>(numStoragesPerDatanode);
-      for (int i = 0; i < numStoragesPerDatanode; i++) {
-        futureList.add(executorService.submit(new Callable<DatanodeCommand>() {
-          @Override
-          public DatanodeCommand call() throws IOException {
-            return cluster.getNameNodeRpc().blockReport(dnR, blockPoolId,
-                 reports, new BlockReportContext(1, 0, System.nanoTime(),
-                     0L, true));
-          }
-        }));
-      }
-      for (Future<DatanodeCommand> future: futureList) {
-        future.get();
-      }
-      executorService.shutdown();
-      // Verify that the storages match before and after the test
-      Assert.assertArrayEquals(storageInfos, dnDescriptor.getStorageInfos());
-    } finally {
-      cluster.shutdown();
-    }
-  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
index f41c546..bf0e3c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
@@ -41,7 +41,6 @@
 import org.mockito.Mockito;
 
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.times;
@@ -89,34 +88,6 @@
         blockCount * BLOCK_SIZE, BLOCK_SIZE, REPL_FACTOR, seed);
   }
 
-  private void verifyCapturedArgumentsSplit(
-      ArgumentCaptor<StorageBlockReport[]> captor,
-      int expectedReportsPerCall,
-      int expectedTotalBlockCount) {
-    List<StorageBlockReport[]> listOfReports = captor.getAllValues();
-    int numBlocksReported = 0;
-    int storageIndex = 0;
-    int listOfReportsSize = listOfReports.size();
-    for (StorageBlockReport[] reports : listOfReports) {
-      if (storageIndex < (listOfReportsSize - 1)) {
-        assertThat(reports.length, is(expectedReportsPerCall));
-      } else {
-        assertThat(reports.length, is(listOfReportsSize));
-      }
-      for (StorageBlockReport report : reports) {
-        BlockListAsLongs blockList = report.getBlocks();
-        if (!blockList.isStorageReport()) {
-          numBlocksReported += blockList.getNumberOfBlocks();
-        } else {
-          assertEquals(blockList.getNumberOfBlocks(), -1);
-        }
-      }
-      storageIndex++;
-    }
-
-    assert(numBlocksReported >= expectedTotalBlockCount);
-  }
-
   private void verifyCapturedArguments(
       ArgumentCaptor<StorageBlockReport[]> captor,
       int expectedReportsPerCall,
@@ -165,7 +136,7 @@
         anyString(),
         captor.capture(), Mockito.<BlockReportContext>anyObject());
 
-    verifyCapturedArgumentsSplit(captor, 1, BLOCKS_IN_FILE);
+    verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
   }
 
   /**
@@ -229,7 +200,7 @@
         anyString(),
         captor.capture(), Mockito.<BlockReportContext>anyObject());
 
-    verifyCapturedArgumentsSplit(captor, 1, BLOCKS_IN_FILE);
+    verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
   }
 
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
index 524243b..791ee20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@@ -35,32 +34,13 @@
   @Override
   protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
       StorageBlockReport[] reports) throws IOException {
-    for (int r = 0; r < reports.length; r++) {
-      LOG.info("Sending block report for storage " +
-          reports[r].getStorage().getStorageID());
-      StorageBlockReport[] singletonReport = {reports[r]};
-      if (r != reports.length - 1) {
-        cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
-            new BlockReportContext(reports.length, r, System.nanoTime(),
-                0L, true));
-      } else {
-        StorageBlockReport[] lastSplitReport =
-            new StorageBlockReport[reports.length];
-        // When block reports are split, send a dummy storage report for all
-        // other storages in the blockreport along with the last storage report
-        for (int i = 0; i <= r; i++) {
-          if (i == r) {
-            lastSplitReport[i] = reports[r];
-            break;
-          }
-          lastSplitReport[i] =
-              new StorageBlockReport(reports[i].getStorage(),
-                  BlockListAsLongs.STORAGE_REPORT);
-        }
-        cluster.getNameNodeRpc().blockReport(dnR, poolId, lastSplitReport,
-            new BlockReportContext(reports.length, r, System.nanoTime(),
-                0L, true));
-      }
+    int i = 0;
+    for (StorageBlockReport report : reports) {
+      LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
+      StorageBlockReport[] singletonReport = { report };
+      cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
+          new BlockReportContext(reports.length, i, System.nanoTime(), 0L, true));
+      i++;
     }
   }
 }