[#927] Improvement: improve the control of server heartbeat (#928)
### What changes were proposed in this pull request?
Eleminate rss.server.heartbeat.timeout replaced with rss.server.heartbeat.interval
### Why are the changes needed?
https://github.com/apache/incubator-uniffle/issues/927
### Does this PR introduce _any_ user-facing change?
Yes, `rss.server.heartbeat.timeout` will effect nothing
### How was this patch tested?
UnitTest
diff --git a/README.md b/README.md
index 2bbe512..a4246fb 100644
--- a/README.md
+++ b/README.md
@@ -187,7 +187,6 @@
rss.server.flush.threadPool.size 10
rss.server.buffer.capacity 40g
rss.server.read.buffer.capacity 20g
- rss.server.heartbeat.timeout 60000
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index c79dda6..61c2531 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -195,7 +195,7 @@
}
int requiredAssignmentShuffleServersNum = RssMRUtils.getRequiredShuffleServerNumber(conf);
- // retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
+ // retryInterval must bigger than `rss.server.heartbeat.interval`, or maybe it will return the same result
long retryInterval = conf.getLong(RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL,
RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE);
int retryTimes = conf.getInt(RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES,
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index f4740cf..27c2226 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -256,7 +256,7 @@
int requiredShuffleServerNumber = RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
- // retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
+ // retryInterval must bigger than `rss.server.heartbeat.interval`, or maybe it will return the same result
long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index d5fec8c..d0892ce 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -335,7 +335,7 @@
int requiredShuffleServerNumber = RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
- // retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
+ // retryInterval must bigger than `rss.server.heartbeat.interval`, or maybe it will return the same result
long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
int estimateTaskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
diff --git a/deploy/kubernetes/operator/examples/configuration.yaml b/deploy/kubernetes/operator/examples/configuration.yaml
index 16b551a..5fcb70b 100644
--- a/deploy/kubernetes/operator/examples/configuration.yaml
+++ b/deploy/kubernetes/operator/examples/configuration.yaml
@@ -64,7 +64,6 @@
rss.server.hdfs.base.path hdfs://${your-hdfs-path}
rss.server.health.check.enable false
rss.server.heartbeat.interval 10000
- rss.server.heartbeat.timeout 60000
rss.server.memory.shuffle.highWaterMark.percentage 70.0
rss.server.memory.shuffle.lowWaterMark.percentage 10.0
rss.server.pending.event.timeoutSec 600
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 4f80114..ce83b93 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -47,7 +47,6 @@
rss.server.flush.threadPool.size 10
rss.server.buffer.capacity 40g
rss.server.read.buffer.capacity 20g
- rss.server.heartbeat.timeout 60000
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
@@ -143,7 +142,6 @@
rss.server.flush.thread.alive 10
rss.server.buffer.capacity 40g
rss.server.read.buffer.capacity 20g
-rss.server.heartbeat.timeout 60000
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 9a450ca..566d99e 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -375,7 +375,6 @@
RemoteStorageInfo remoteStorage = new RemoteStorageInfo("");
ShuffleAssignmentsInfo response = null;
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
- int heartbeatTimeout = shuffleServerConf.getInteger("rss.server.heartbeat.timeout", 65000);
int heartbeatInterval = shuffleServerConf.getInteger("rss.server.heartbeat.interval", 1000);
Thread.sleep(heartbeatInterval * 2);
shuffleWriteClientImpl.registerCoordinators(COORDINATOR_QUORUM);
@@ -410,7 +409,7 @@
);
});
return shuffleAssignments;
- }, heartbeatTimeout, maxTryTime);
+ }, heartbeatInterval, maxTryTime);
assertNotNull(response);
}
diff --git a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
index ddb7d53..893b9c5 100644
--- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
+++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
@@ -51,13 +51,11 @@
private final ScheduledExecutorService service =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("startHeartBeat");
private final ExecutorService heartBeatExecutorService;
- private long heartBeatTimeout;
public RegisterHeartBeat(ShuffleServer shuffleServer) {
ShuffleServerConf conf = shuffleServer.getShuffleServerConf();
this.heartBeatInitialDelay = conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_DELAY);
this.heartBeatInterval = conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_INTERVAL);
- this.heartBeatTimeout = conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_TIMEOUT);
this.coordinatorQuorum = conf.getString(ShuffleServerConf.RSS_COORDINATOR_QUORUM);
CoordinatorClientFactory factory =
new CoordinatorClientFactory(conf.get(ShuffleServerConf.RSS_CLIENT_TYPE));
@@ -107,6 +105,7 @@
Map<String, StorageInfo> localStorageInfo,
int nettyPort) {
boolean sendSuccessfully = false;
+ // use `rss.server.heartbeat.interval` as the timeout option
RssSendHeartBeatRequest request = new RssSendHeartBeatRequest(
id,
ip,
@@ -115,7 +114,7 @@
preAllocatedMemory,
availableMemory,
eventNumInFlush,
- heartBeatTimeout,
+ heartBeatInterval,
tags,
isHealthy,
serverStatus,
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index a34c013..ecd1160 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -75,12 +75,6 @@
.defaultValue(10 * 1000L)
.withDescription("Heartbeat interval to Coordinator (ms)");
- public static final ConfigOption<Long> SERVER_HEARTBEAT_TIMEOUT = ConfigOptions
- .key("rss.server.heartbeat.timeout")
- .longType()
- .defaultValue(60 * 1000L)
- .withDescription("rss heartbeat interval ms");
-
public static final ConfigOption<Integer> SERVER_FLUSH_THREAD_POOL_SIZE = ConfigOptions
.key("rss.server.flush.threadPool.size")
.intType()
diff --git a/server/src/test/resources/server.conf b/server/src/test/resources/server.conf
index ca7a47b..585da76 100644
--- a/server/src/test/resources/server.conf
+++ b/server/src/test/resources/server.conf
@@ -24,6 +24,5 @@
rss.server.partition.buffer.size 128
rss.jetty.http.port 12345
rss.jetty.corePool.size 64
-rss.server.heartbeat.timeout 1
rss.server.write.timeout 2000
rss.server.shuffleBufferManager.trigger.flush.interval 500