HDFS-15856: Make write pipeline retry times configurable. (#2721). Contributed by Qi Zhu

Reviewed-by: Ayush Saxena <ayushsaxena@apache.org>
Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org>
Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
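
For reference, a minimal sketch of how a client could raise the new limit
programmatically before opening a writer; the class name, URI, and path
below are illustrative and not part of this change:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

    public class PipelineRetryExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Allow up to 10 pipeline recovery attempts for the same packet
        // instead of the default of 5.
        conf.setInt(
            HdfsClientConfigKeys.DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES, 10);
        // DfsClientConf reads the key when the client is constructed, so the
        // override must be set before FileSystem.get().
        try (FileSystem fs =
                 FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
             FSDataOutputStream out = fs.create(new Path("/tmp/example"))) {
          out.writeBytes("hello");
        }
      }
    }
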
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index df5a479..96c86c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -529,6 +529,7 @@
   private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS =
       CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
   private int lastCongestionBackoffTime;
+  private int maxPipelineRecoveryRetries;
 
   protected final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
   private final String[] favoredNodes;
@@ -557,6 +558,7 @@
     this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry());
     this.errorState = new ErrorState(conf.getDatanodeRestartTimeout());
     this.addBlockFlags = flags;
+    this.maxPipelineRecoveryRetries = conf.getMaxPipelineRecoveryRetries();
   }
 
   /**
@@ -1263,14 +1265,18 @@
       packetSendTime.clear();
     }
 
-    // If we had to recover the pipeline five times in a row for the
+    // If we had to recover the pipeline more times in a row than
+    // maxPipelineRecoveryRetries allows for the
     // same packet, this client likely has corrupt data or corrupting
     // during transmission.
-    if (!errorState.isRestartingNode() && ++pipelineRecoveryCount > 5) {
+    if (!errorState.isRestartingNode() && ++pipelineRecoveryCount >
+        maxPipelineRecoveryRetries) {
       LOG.warn("Error recovering pipeline for writing " +
-          block + ". Already retried 5 times for the same packet.");
+          block + ". Already retried " + maxPipelineRecoveryRetries
+          + " times for the same packet.");
       lastException.set(new IOException("Failing write. Tried pipeline " +
-          "recovery 5 times without success."));
+          "recovery " + maxPipelineRecoveryRetries
+          + " times without success."));
       streamerClosed = true;
       return false;
     }
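
Note that the comparison above is strict: with the key set to N, the client
tolerates N consecutive pipeline recoveries for the same packet and fails the
write on the next one. A standalone sketch of that counting behavior (the
class is illustrative, not actual DataStreamer code):

    public class RecoveryCountDemo {
      public static void main(String[] args) {
        final int maxPipelineRecoveryRetries = 5;  // mirrors the new default
        int pipelineRecoveryCount = 0;
        // Simulate repeated failed recoveries of the same packet.
        for (int attempt = 1; attempt <= 6; attempt++) {
          if (++pipelineRecoveryCount > maxPipelineRecoveryRetries) {
            System.out.println("Failing write: tried pipeline recovery "
                + maxPipelineRecoveryRetries + " times without success.");
            break;
          }
          System.out.println("Recovery attempt " + attempt + " tolerated.");
        }
      }
    }
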
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index f858080..c17ad0e 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -83,6 +83,9 @@
       "dfs.namenode.kerberos.principal";
   String  DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
   int     DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
+  String  DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES =
+      "dfs.client.pipeline.recovery.max-retries";
+  int     DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES_DEFAULT = 5;
   String  DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
   String  DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY =
       "dfs.client.socket.send.buffer.size";
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index facbe70..f462dca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -107,6 +107,7 @@
 
   private final int maxFailoverAttempts;
   private final int maxRetryAttempts;
+  private final int maxPipelineRecoveryRetries;
   private final int failoverSleepBaseMillis;
   private final int failoverSleepMaxMillis;
   private final int maxBlockAcquireFailures;
@@ -294,6 +295,10 @@
     Preconditions.checkArgument(clientShortCircuitNum <= 5,
             HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
                     "can't be more then 5.");
+    maxPipelineRecoveryRetries = conf.getInt(
+        HdfsClientConfigKeys.DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES,
+        HdfsClientConfigKeys.DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES_DEFAULT
+    );
   }
 
   private ByteArrayManager.Conf loadWriteByteArrayManagerConf(
@@ -699,6 +704,13 @@
   }
 
   /**
+   * @return the maxPipelineRecoveryRetries
+   */
+  public int getMaxPipelineRecoveryRetries() {
+    return maxPipelineRecoveryRetries;
+  }
+
+  /**
    * Configuration for short-circuit reads.
    */
   public static class ShortCircuitConf {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 56c65b5..94ff3ec 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4371,6 +4371,15 @@
 </property>
 
 <property>
+  <name>dfs.client.pipeline.recovery.max-retries</name>
+  <value>5</value>
+  <description>
+    If the DFS client encounters errors in the write pipeline,
+    it retries up to this many times before giving up.
+  </description>
+</property>
+
+<property>
   <name>dfs.client.socket-timeout</name>
   <value>60000</value>
   <description>