HDFS-8220. Erasure Coding: StripedDataStreamer fails to handle the blocklocations which doesn't satisfy BlockGroupSize. Contributed by Rakesh R.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 173bf9b..f1fb7a9 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -397,3 +397,6 @@
 
     HDFS-8854. Erasure coding: add ECPolicy to replace schema+cellSize in 
     hadoop-hdfs. (Walter Su via zhz)
+
+    HDFS-8220. Erasure Coding: StripedDataStreamer fails to handle the
+    blocklocations which doesn't satisfy BlockGroupSize. (Rakesh R via zhz)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index 2d51dc4..f533bf9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdfs.DFSStripedOutputStream.MultipleBlockingQueue;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -167,18 +168,33 @@
 
         final LocatedBlock lb = StripedDataStreamer.super.locateFollowingBlock(
             excludedNodes);
+        if (lb.getLocations().length < HdfsConstants.NUM_DATA_BLOCKS) {
+          throw new IOException(
+              "Failed to get datablocks number of nodes from namenode: blockGroupSize= "
+                  + (HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS)
+                  + ", blocks.length= " + lb.getLocations().length);
+        }
         final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
             (LocatedStripedBlock)lb,
             BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
 
         for (int i = 0; i < blocks.length; i++) {
-          if (!coordinator.getStripedDataStreamer(i).isFailed()) {
-            if (blocks[i] == null) {
-              getLastException().set(
-                  new IOException("Failed to get following block, i=" + i));
-            } else {
-              followingBlocks.offer(i, blocks[i]);
-            }
+          StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
+          if (si.isFailed()) {
+            continue; // skipping failed data streamer
+          }
+          if (blocks[i] == null) {
+            // Set exception and close streamer as there is no block locations
+            // found for the parity block.
+            LOG.warn("Failed to get block location for parity block, index="
+                + i);
+            si.getLastException().set(
+                new IOException("Failed to get following block, i=" + i));
+            si.setFailed(true);
+            si.endBlock();
+            si.close(true);
+          } else {
+            followingBlocks.offer(i, blocks[i]);
           }
         }
       }
@@ -199,7 +215,11 @@
             .parseStripedBlockGroup((LocatedStripedBlock) updated,
                 BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
         for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
-          final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock();
+          StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
+          if (si.isFailed()) {
+            continue; // skipping failed data streamer
+          }
+          final ExtendedBlock bi = si.getBlock();
           if (bi != null) {
             final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS),
                 null, null, null, -1, updated.isCorrupt(), null);
@@ -225,7 +245,11 @@
         final ExtendedBlock newBG = newBlock(bg, newGS);
         final ExtendedBlock updated = callUpdatePipeline(bg, newBG);
         for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
-          final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock();
+          StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
+          if (si.isFailed()) {
+            continue; // skipping failed data streamer
+          }
+          final ExtendedBlock bi = si.getBlock();
           updateBlocks.offer(i, newBlock(bi, updated.getGenerationStamp()));
         }
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index fed9f16..f65d0c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -33,6 +35,7 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -40,6 +43,7 @@
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.security.token.Token;
@@ -145,6 +149,86 @@
     }
   }
 
+  @Test(timeout = 90000)
+  public void testAddBlockWhenNoSufficientDataBlockNumOfNodes()
+      throws IOException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    try {
+      setup(conf);
+      ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+      // Stop half of the datanodes so that the namenode cannot return
+      // enough locations to cover the data blocks of a block group.
+      int killDns = dataNodes.size() / 2;
+      int numDatanodes = dataNodes.size() - killDns;
+      for (int i = 0; i < killDns; i++) {
+        cluster.stopDataNode(i);
+      }
+      cluster.restartNameNodes();
+      cluster.triggerHeartbeats();
+      DatanodeInfo[] info = dfs.getClient().datanodeReport(
+          DatanodeReportType.LIVE);
+      assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
+      final Path dirFile = new Path(dir, "ecfile");
+      FSDataOutputStream out = null;
+      try {
+        out = dfs.create(dirFile, true);
+        out.write("something".getBytes());
+        out.flush();
+        out.close();
+        Assert.fail("Failed to validate available dns against blkGroupSize");
+      } catch (IOException ioe) {
+        // expected: the write must fail; verify the reported cause
+        GenericTestUtils.assertExceptionContains("Failed: the number of "
+            + "remaining blocks = 5 < the number of data blocks = 6", ioe);
+        DFSStripedOutputStream dfsout = (DFSStripedOutputStream) out
+            .getWrappedStream();
+
+        // fetch the leading streamer (index 0) and check its last exception
+        StripedDataStreamer datastreamer = dfsout.getStripedDataStreamer(0);
+        try {
+          datastreamer.getLastException().check(true);
+          Assert.fail("Failed to validate available dns against blkGroupSize");
+        } catch (IOException le) {
+          GenericTestUtils.assertExceptionContains(
+              "Failed to get datablocks number of nodes from"
+                  + " namenode: blockGroupSize= 9, blocks.length= "
+                  + numDatanodes, le);
+        }
+      }
+    } finally {
+      tearDown();
+    }
+  }
+
+  @Test(timeout = 90000)
+  public void testAddBlockWhenNoSufficientParityNumOfNodes() throws IOException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    try {
+      setup(conf);
+      ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+      // stop a few datanodes so that not all parity blocks can be written
+      int killDns = (NUM_PARITY_BLOCKS - 1);
+      int numDatanodes = dataNodes.size() - killDns;
+      for (int i = 0; i < killDns; i++) {
+        cluster.stopDataNode(i);
+      }
+      cluster.restartNameNodes();
+      cluster.triggerHeartbeats();
+      DatanodeInfo[] info = dfs.getClient().datanodeReport(
+          DatanodeReportType.LIVE);
+      assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
+      Path srcPath = new Path(dir, "testAddBlockWhenNoSufficientParityNodes");
+      int fileLength = HdfsConstants.BLOCK_STRIPED_CELL_SIZE - 1000;
+      final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
+      DFSTestUtil.writeFile(dfs, srcPath, new String(expected));
+      StripedFileTestUtil.verifySeek(dfs, srcPath, fileLength);
+    } finally {
+      tearDown();
+    }
+  }
+
   private void runTest(final Path p, final int length, final int killPos,
       final int dnIndex, final boolean tokenExpire) throws Exception {
     LOG.info("p=" + p + ", length=" + length + ", killPos=" + killPos