HDFS-12886. Ignore minReplication for block recovery. Contributed by Lukas Majercak.

(cherry picked from commit 08ff1586d5d3e39f546200f9e696f62ea4cf000d)
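
Block recovery could stall when fewer than minReplication
(dfs.namenode.replication.min) replicas of a file's last block were
still live: commitOrCompleteLastBlock() would not complete the block,
so lease recovery could not close the file. With this change, a block
that has just finished recovery is completed forcibly regardless of
its replica count; the file is closed first and the block is then
re-replicated up to its target replication by the normal
reconstruction path.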
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 6b7175d..89264c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1001,6 +1001,13 @@
         addExpectedReplicasToPending(lastBlock);
       }
       completeBlock(lastBlock, iip, false);
+    } else if (pendingRecoveryBlocks.isUnderRecovery(lastBlock)) {
+      // We've just finished recovery for this block; complete it
+      // forcibly, disregarding the number of replicas. This
+      // intentionally ignores minReplication: the block is closed
+      // first and then replicated out.
+      completeBlock(lastBlock, iip, true);
+      updateNeededReconstructions(lastBlock, 1, 0);
     }
     return committed;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index e9bd7a8..07fd4ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -19,9 +19,14 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import org.apache.hadoop.hdfs.AppendTestUtil;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -42,6 +47,7 @@
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -1142,4 +1148,81 @@
       }
     }
   }
+
+  /**
+   * Test that a block will be recovered even if fewer than the
+   * configured minReplication datanodes participate in its recovery.
+   *
+   * Check that, after recovery, the block is successfully replicated.
+   */
+  @Test(timeout = 300000L)
+  public void testRecoveryWillIgnoreMinReplication() throws Exception {
+    tearDown(); // Stop the mocked DN started in startup()
+
+    final int blockSize = 4096;
+    final int numReplicas = 3;
+    final String filename = "/testIgnoreMinReplication";
+    final Path filePath = new Path(filename);
+    Configuration configuration = new HdfsConfiguration();
+    configuration.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
+    configuration.setInt(DFS_NAMENODE_REPLICATION_MIN_KEY, 2);
+    configuration.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
+    MiniDFSCluster cluster = null;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(configuration).numDataNodes(5)
+          .build();
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final FSNamesystem fsn = cluster.getNamesystem();
+
+      // Create a file and never close the output stream to trigger recovery
+      FSDataOutputStream out = dfs.create(filePath, (short) numReplicas);
+      out.write(AppendTestUtil.randomBytes(0, blockSize));
+      out.hsync();
+
+      DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
+          cluster.getNameNodePort()), configuration);
+      LocatedBlock blk = dfsClient.getNamenode().
+          getBlockLocations(filename, 0, blockSize).
+          getLastLocatedBlock();
+
+      // Kill 2 of the 3 datanodes so that only 1 is alive, below minReplication
+      List<DatanodeInfo> dataNodes = Arrays.asList(blk.getLocations());
+      assertEquals(numReplicas, dataNodes.size());
+      for (DatanodeInfo dataNode : dataNodes.subList(0, numReplicas - 1)) {
+        cluster.stopDataNode(dataNode.getName());
+      }
+
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return fsn.getNumDeadDataNodes() == 2;
+        }
+      }, 300, 300000);
+
+      // Make sure hard lease expires to trigger replica recovery
+      cluster.setLeasePeriod(100L, 100L);
+
+      // Wait for recovery to succeed
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            return dfs.isFileClosed(filePath);
+          } catch (IOException ignored) {}
+          return false;
+        }
+      }, 300, 300000);
+
+      // Wait for the block to be replicated
+      DFSTestUtil.waitForReplication(cluster, DFSTestUtil.getFirstBlock(
+          dfs, filePath), 1, numReplicas, 0);
+
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
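
To run the new test by itself (assuming a standard Hadoop source
checkout with Maven, and a surefire version that supports the
"Class#method" test selector):

  cd hadoop-hdfs-project/hadoop-hdfs
  mvn test -Dtest=TestBlockRecovery#testRecoveryWillIgnoreMinReplication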