Allow block reconstruction pending timeout to be refreshable (#4567)

Reviewed-by: Hiroyuki Adachi <hadachi@yahoo-corp.jp>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 4df4b40..dfe48f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1067,6 +1067,26 @@
     blocksReplWorkMultiplier = newVal;
   }
 
+  /**
+   * Updates the value used for pendingReconstruction timeout, which is set by
+   * {@code DFSConfigKeys.
+   *     DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY} initially.
+   *
+   * @param newVal - Must be a positive non-zero integer.
+   */
+  public void setReconstructionPendingTimeout(int newVal) {
+    ensurePositiveInt(newVal,
+        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY);
+    pendingReconstruction.setTimeout(newVal * 1000L);
+  }
+
+  /** Returns the current setting for pendingReconstruction timeout, set by
+   * {@code DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY}.
+   */
+  public int getReconstructionPendingTimeout() {
+    return (int)(pendingReconstruction.getTimeout() / 1000L);
+  }
+
   public int getDefaultStorageNum(BlockInfo block) {
     switch (block.getBlockType()) {
     case STRIPED: return ((BlockInfoStriped) block).getRealTotalBlockNum();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java
index 3e56606..6c3b4c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java
@@ -76,6 +76,14 @@
     timerThread.start();
   }
 
+  public void setTimeout(long timeoutPeriod) {
+    this.timeout = timeoutPeriod;
+  }
+
+  public long getTimeout() {
+    return this.timeout;
+  }
+
   /**
    * Add a block to the list of pending reconstructions
    * @param block The corresponding block
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 9e397b9..3d3b65d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -203,6 +203,8 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT;
 
 import static org.apache.hadoop.util.ExitUtil.terminate;
 import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
@@ -350,7 +352,8 @@
           DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
           DFS_BLOCK_INVALIDATE_LIMIT_KEY,
           DFS_DATANODE_PEER_STATS_ENABLED_KEY,
-          DFS_DATANODE_MAX_NODES_TO_REPORT_KEY));
+          DFS_DATANODE_MAX_NODES_TO_REPORT_KEY,
+          DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY));
 
   private static final String USAGE = "Usage: hdfs namenode ["
       + StartupOption.BACKUP.getName() + "] | \n\t["
@@ -2301,7 +2304,8 @@
     } else if (property.equals(DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY)
         || property.equals(DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY)
         || property.equals(
-            DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)) {
+            DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)
+        || property.equals(DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY)) {
       return reconfReplicationParameters(newVal, property);
     } else if (property.equals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY) || property
         .equals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY)) {
@@ -2347,6 +2351,14 @@
                 DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT,
                 newVal));
         newSetting = bm.getBlocksReplWorkMultiplier();
+      } else if (
+          property.equals(
+              DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY)) {
+        bm.setReconstructionPendingTimeout(
+            adjustNewVal(
+                DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT,
+                newVal));
+        newSetting = bm.getReconstructionPendingTimeout();
       } else {
         throw new IllegalArgumentException("Unexpected property " +
             property + " in reconfReplicationParameters");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshNamenodeReplicationConfig.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshNamenodeReplicationConfig.java
index 8dc81f8..8336a432 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshNamenodeReplicationConfig.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshNamenodeReplicationConfig.java
@@ -49,6 +49,9 @@
     config.setInt(
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
         12);
+    config.setInt(
+        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
+        300);
 
     cluster = new MiniDFSCluster.Builder(config)
         .nnTopology(MiniDFSNNTopology.simpleSingleNN(0, 0))
@@ -72,6 +75,7 @@
     assertEquals(8, bm.getMaxReplicationStreams());
     assertEquals(10, bm.getReplicationStreamsHardLimit());
     assertEquals(12, bm.getBlocksReplWorkMultiplier());
+    assertEquals(300, bm.getReconstructionPendingTimeout());
 
     cluster.getNameNode().reconfigurePropertyImpl(
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, "20");
@@ -81,10 +85,14 @@
     cluster.getNameNode().reconfigurePropertyImpl(
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
         "24");
+    cluster.getNameNode().reconfigurePropertyImpl(
+        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
+        "180");
 
     assertEquals(20, bm.getMaxReplicationStreams());
     assertEquals(22, bm.getReplicationStreamsHardLimit());
     assertEquals(24, bm.getBlocksReplWorkMultiplier());
+    assertEquals(180, bm.getReconstructionPendingTimeout());
   }
 
   /**
@@ -96,7 +104,8 @@
     String[] keys = new String[]{
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
-        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
+        DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY
     };
 
     // Ensure we cannot set any of the parameters negative
@@ -112,6 +121,7 @@
     assertEquals(8, bm.getMaxReplicationStreams());
     assertEquals(10, bm.getReplicationStreamsHardLimit());
     assertEquals(12, bm.getBlocksReplWorkMultiplier());
+    assertEquals(300, bm.getReconstructionPendingTimeout());
 
     for (String key : keys) {
       ReconfigurationException e =
@@ -126,6 +136,7 @@
     assertEquals(8, bm.getMaxReplicationStreams());
     assertEquals(10, bm.getReplicationStreamsHardLimit());
     assertEquals(12, bm.getBlocksReplWorkMultiplier());
+    assertEquals(300, bm.getReconstructionPendingTimeout());
 
     // Ensure none of the parameters can be set to a string value
     for (String key : keys) {
@@ -139,5 +150,6 @@
     assertEquals(8, bm.getMaxReplicationStreams());
     assertEquals(10, bm.getReplicationStreamsHardLimit());
     assertEquals(12, bm.getBlocksReplWorkMultiplier());
+    assertEquals(300, bm.getReconstructionPendingTimeout());
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index 3df873a..99e4b34 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -438,7 +438,7 @@
     final List<String> outs = Lists.newArrayList();
     final List<String> errs = Lists.newArrayList();
     getReconfigurableProperties("namenode", address, outs, errs);
-    assertEquals(19, outs.size());
+    assertEquals(20, outs.size());
     assertTrue(outs.get(0).contains("Reconfigurable properties:"));
     assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY, outs.get(1));
     assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(2));
@@ -1266,4 +1266,4 @@
         outs.get(8));
   }
 
-}
\ No newline at end of file
+}