[INLONG-7294][Manager] Fix the problem of suspend, restart, delete sort task failed (#7295)
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index 7eae0e9..d898d5d 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -89,15 +89,13 @@
Map<String, String> kvConf = new HashMap<>();
extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
- if (StringUtils.isEmpty(sortExt)) {
- log.warn("no need to delete sort for groupId={}, as the sort properties is empty", groupId);
- return ListenerResult.success();
+ if (StringUtils.isNotEmpty(sortExt)) {
+ Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+ JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
+ });
+ kvConf.putAll(result);
}
- Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
- JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
- });
- kvConf.putAll(result);
String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
if (StringUtils.isBlank(jobId)) {
String message = String.format("sort job id is empty for groupId=%s", groupId);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
index 6400641..54d4f23 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
@@ -94,16 +94,13 @@
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
- if (StringUtils.isEmpty(sortExt)) {
- log.warn("no need to delete sort for groupId={} streamId={}, as the sort properties is empty",
- groupId, streamId);
- return ListenerResult.success();
+ if (StringUtils.isNotEmpty(sortExt)) {
+ Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+ JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
+ });
+ kvConf.putAll(result);
}
- Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
- JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
- });
- kvConf.putAll(result);
String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
if (StringUtils.isBlank(jobId)) {
String message = String.format("sort job id is empty for groupId=%s streamId=%s", groupId, streamId);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 0f1fbe4..57238a3 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -90,17 +90,13 @@
Map<String, String> kvConf = new HashMap<>();
extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
- if (StringUtils.isEmpty(sortExt)) {
- String message = String.format("restart sort failed for groupId [%s], as the sort properties is empty",
- groupId);
- log.error(message);
- return ListenerResult.fail(message);
+ if (StringUtils.isNotEmpty(sortExt)) {
+ Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+ JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
+ });
+ kvConf.putAll(result);
}
- Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
- JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
- });
- kvConf.putAll(result);
String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
if (StringUtils.isBlank(jobId)) {
String message = String.format("sort job id is empty for groupId [%s]", groupId);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index 5ca86cc..3228544 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -99,18 +99,13 @@
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
- if (StringUtils.isEmpty(sortExt)) {
- String message = String.format(
- "restart sort failed for groupId [%s] and streamId [%s], as the sort properties is empty",
- groupId, streamId);
- log.error(message);
- return ListenerResult.fail(message);
+ if (StringUtils.isNotEmpty(sortExt)) {
+ Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+ JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
+ });
+ kvConf.putAll(result);
}
- Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
- JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
- });
- kvConf.putAll(result);
String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
if (StringUtils.isBlank(jobId)) {
String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index d8a17d2..fd68e36 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -89,17 +89,13 @@
Map<String, String> kvConf = new HashMap<>();
extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
- if (StringUtils.isEmpty(sortExt)) {
- String message = String.format("suspend sort failed for groupId [%s], as the sort properties is empty",
- groupId);
- log.error(message);
- return ListenerResult.fail(message);
+ if (StringUtils.isNotEmpty(sortExt)) {
+ Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+ JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
+ });
+ kvConf.putAll(result);
}
- Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
- JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
- });
- kvConf.putAll(result);
String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
if (StringUtils.isBlank(jobId)) {
String message = String.format("sort job id is empty for groupId [%s]", groupId);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
index 672590d..b3f7821 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
@@ -95,18 +95,13 @@
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
- if (StringUtils.isEmpty(sortExt)) {
- String message = String.format(
- "suspend sort failed for groupId [%s] streamId [%s], as the sort properties is empty",
- groupId, streamId);
- log.error(message);
- return ListenerResult.fail(message);
+ if (StringUtils.isNotEmpty(sortExt)) {
+ Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+ JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
+ });
+ kvConf.putAll(result);
}
- Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
- JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() {
- });
- kvConf.putAll(result);
String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
if (StringUtils.isBlank(jobId)) {
String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);