[INLONG-8035][Manager] Fix Non-file tasks cannot be recovered from the heartbeat timeout state (#8037)
diff --git a/CHANGES.md b/CHANGES.md
index 496c5c5..3a14fb1 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,4 @@
+
# InLong Changelog
<!---
@@ -48,6 +49,7 @@
### Manager
| ISSUE | Summary |
|:-----------------------------------------------------------:|:-----------------------------------------------------------------------------------------------|
+| [INLONG-8035](https://github.com/apache/inlong/issues/8035) | [Bug][Manager] Non-file tasks cannot be recovered from the heartbeat timeout state |
| [INLONG-8021](https://github.com/apache/inlong/issues/8021) | [Improve][Manager] Periodically delete sources with inconsistent states |
| [INLONG-8006](https://github.com/apache/inlong/issues/8006) | [Improve][Manager] Set displayname for the auto-registered cluster |
| [INLONG-7999](https://github.com/apache/inlong/issues/7999) | [Improve][Manager] Support PostgreSQL data node |
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 53ccea2..e21c422 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -493,8 +493,8 @@
private void preTimeoutTasks(TaskRequest taskRequest) {
// If the agent report succeeds, restore the source status
- List<Integer> needUpdateIds = sourceMapper.selectHeartbeatTimeoutIds(Lists.newArrayList(SourceType.FILE),
- taskRequest.getAgentIp(), taskRequest.getClusterName());
+ List<Integer> needUpdateIds = sourceMapper.selectHeartbeatTimeoutIds(null, taskRequest.getAgentIp(),
+ taskRequest.getClusterName());
// restore state for all source by ip and type
if (CollectionUtils.isNotEmpty(needUpdateIds)) {
sourceMapper.rollbackTimeoutStatusByIds(needUpdateIds, null);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index 8f2ec1b..3ea0ec0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -23,7 +23,6 @@
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
import com.google.gson.Gson;
import lombok.Getter;
import lombok.SneakyThrows;
@@ -35,7 +34,6 @@
import org.apache.inlong.common.heartbeat.ComponentHeartbeat;
import org.apache.inlong.common.heartbeat.HeartbeatMsg;
import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ClusterStatus;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -180,8 +178,8 @@
// If the agent report succeeds, restore the source status
if (Objects.equals(clusterNode.getType(), ClusterType.AGENT)) {
// If the agent report succeeds, restore the source status
- List<Integer> needUpdateIds = sourceMapper.selectHeartbeatTimeoutIds(
- Lists.newArrayList(SourceType.FILE), heartbeat.getIp(), heartbeat.getClusterName());
+ List<Integer> needUpdateIds = sourceMapper.selectHeartbeatTimeoutIds(null, heartbeat.getIp(),
+ heartbeat.getClusterName());
// restore state for all source by ip and type
if (CollectionUtils.isNotEmpty(needUpdateIds)) {
sourceMapper.rollbackTimeoutStatusByIds(needUpdateIds, null);
diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index d705bf9..bb2f17e 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -87,12 +87,12 @@
sort.enable.zookeeper=false
# If turned on, synchronizing change the source status when the agent heartbeat times out
-source.update.enabled=true
+source.update.enabled=false
source.update.before.seconds=60
source.update.interval=60
# If turned on, tasks in the incorrect state are periodically deleted
-source.cleansing.enabled=true
+source.cleansing.enabled=false
source.cleansing.interval=600