HDFS-15856: Make write pipeline retry times configurable. (#2721). Contributed by Qi Zhu
Reviewed-by: Ayush Saxena <ayushsaxena@apache.org>
Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org>
Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
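Usage note (illustrative, not part of the patch text): the new limit can be overridden per client in hdfs-site.xml, mirroring the hdfs-default.xml entry added below. For example, to allow up to 10 pipeline recovery attempts for the same packet:

<property>
  <name>dfs.client.pipeline.recovery.max-retries</name>
  <value>10</value>
</property>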
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index df5a479..96c86c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -529,6 +529,7 @@
private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS =
CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
private int lastCongestionBackoffTime;
+ private int maxPipelineRecoveryRetries;
protected final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
private final String[] favoredNodes;
@@ -557,6 +558,7 @@
this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry());
this.errorState = new ErrorState(conf.getDatanodeRestartTimeout());
this.addBlockFlags = flags;
+ this.maxPipelineRecoveryRetries = conf.getMaxPipelineRecoveryRetries();
}
/**
@@ -1263,14 +1265,18 @@
packetSendTime.clear();
}
- // If we had to recover the pipeline five times in a row for the
+ // If we had to recover the pipeline more than maxPipelineRecoveryRetries
+ // times in a row for the
// same packet, this client likely has corrupt data or is
// corrupting it during transmission.
- if (!errorState.isRestartingNode() && ++pipelineRecoveryCount > 5) {
+ if (!errorState.isRestartingNode() && ++pipelineRecoveryCount >
+ maxPipelineRecoveryRetries) {
LOG.warn("Error recovering pipeline for writing " +
- block + ". Already retried 5 times for the same packet.");
+ block + ". Already retried " + maxPipelineRecoveryRetries
+ + " times for the same packet.");
lastException.set(new IOException("Failing write. Tried pipeline " +
- "recovery 5 times without success."));
+ "recovery " + maxPipelineRecoveryRetries
+ + " times without success."));
streamerClosed = true;
return false;
}
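For illustration (a sketch, not part of this patch): a client can also raise the cap programmatically before opening a stream. This assumes the key name introduced in HdfsClientConfigKeys below; the output path and the value 10 are hypothetical.

import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PipelineRetryExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Allow up to 10 pipeline recovery attempts for the same packet
    // instead of the default of 5.
    conf.setInt("dfs.client.pipeline.recovery.max-retries", 10);
    try (FileSystem fs = FileSystem.get(conf);
         OutputStream out = fs.create(new Path("/tmp/pipeline-retry-example"))) {
      out.write("hello".getBytes(StandardCharsets.UTF_8));
    }
  }
}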
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index f858080..c17ad0e 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -83,6 +83,9 @@
"dfs.namenode.kerberos.principal";
String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
+ String DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES =
+ "dfs.client.pipeline.recovery.max-retries";
+ int DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES_DEFAULT = 5;
String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
String DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY =
"dfs.client.socket.send.buffer.size";
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index facbe70..f462dca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -107,6 +107,7 @@
private final int maxFailoverAttempts;
private final int maxRetryAttempts;
+ private final int maxPipelineRecoveryRetries;
private final int failoverSleepBaseMillis;
private final int failoverSleepMaxMillis;
private final int maxBlockAcquireFailures;
@@ -294,6 +295,10 @@
Preconditions.checkArgument(clientShortCircuitNum <= 5,
HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
"can't be more then 5.");
+ maxPipelineRecoveryRetries = conf.getInt(
+ HdfsClientConfigKeys.DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES,
+ HdfsClientConfigKeys.DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES_DEFAULT
+ );
}
private ByteArrayManager.Conf loadWriteByteArrayManagerConf(
@@ -699,6 +704,13 @@
}
/**
+ * @return the maxPipelineRecoveryRetries
+ */
+ public int getMaxPipelineRecoveryRetries() {
+ return maxPipelineRecoveryRetries;
+ }
+
+ /**
* Configuration for short-circuit reads.
*/
public static class ShortCircuitConf {
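A quick sanity check for the new getter (a sketch, not included in this patch; assumes DfsClientConf's public Configuration-based constructor):

Configuration conf = new Configuration();
conf.setInt(
    HdfsClientConfigKeys.DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES, 8);
DfsClientConf clientConf = new DfsClientConf(conf);
assert clientConf.getMaxPipelineRecoveryRetries() == 8;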
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 56c65b5..94ff3ec 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4371,6 +4371,15 @@
</property>
<property>
+ <name>dfs.client.pipeline.recovery.max-retries</name>
+ <value>5</value>
+ <description>
+ If the DFS client encounters errors in the write pipeline,
+ retry pipeline recovery up to the number of times defined by
+ this property before giving up.
+ </description>
+</property>
+
+<property>
<name>dfs.client.socket-timeout</name>
<value>60000</value>
<description>