[INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task (#10715)

* [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task

* [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
index ccefd5c..6b19648 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
@@ -18,9 +18,7 @@
 package org.apache.inlong.manager.service.listener.source;
 
 import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.enums.TaskEvent;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
@@ -33,15 +31,11 @@
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
 
-import com.google.common.collect.Lists;
-import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Event listener of operate resources, such as delete, stop, restart sources.
@@ -67,68 +61,22 @@
         InlongGroupInfo groupInfo = getGroupInfo(context.getProcessForm());
         final String groupId = groupInfo.getInlongGroupId();
         List<InlongStreamBriefInfo> streamResponses = streamService.listBriefWithSink(groupId);
-        List<StreamSource> unOperatedSources = Lists.newArrayList();
-        streamResponses.forEach(stream -> operateStreamSources(groupId, stream.getInlongStreamId(),
-                context.getOperator(), unOperatedSources));
-
-        if (CollectionUtils.isNotEmpty(unOperatedSources)) {
-            GroupOperateType operateType = getOperateType(context.getProcessForm());
-            StringBuilder builder = new StringBuilder("Unsupported operate ").append(operateType).append(" for (");
-            unOperatedSources.forEach(source -> builder.append(" ").append(source.getSourceName()).append(" "));
-            String errMsg = builder.append(")").toString();
-            throw new WorkflowListenerException(errMsg);
-        }
-
+        streamResponses
+                .forEach(stream -> operateStreamSources(groupId, stream.getInlongStreamId(), context.getOperator()));
         return ListenerResult.success();
     }
 
     /**
      * Operate stream sources, such as delete, stop, restart.
      */
-    protected void operateStreamSources(String groupId, String streamId, String operator,
-            List<StreamSource> unOperatedSources) {
+    protected void operateStreamSources(String groupId, String streamId, String operator) {
         List<StreamSource> sources = streamSourceService.listSource(groupId, streamId);
         sources.forEach(source -> {
-            if (checkIfOp(source, unOperatedSources)) {
-                operateStreamSource(source.genSourceRequest(), operator);
-            }
+            operateStreamSource(source.genSourceRequest(), operator);
         });
     }
 
     /**
-     * Check source status.
-     */
-    @SneakyThrows
-    public boolean checkIfOp(StreamSource streamSource, List<StreamSource> unOperatedSources) {
-        for (int retry = 0; retry < 60; retry++) {
-            int status = streamSource.getStatus();
-            SourceStatus sourceStatus = SourceStatus.forCode(status);
-            // template sources are filtered and processed in corresponding subclass listeners
-            if (sourceStatus == SourceStatus.SOURCE_NORMAL || sourceStatus == SourceStatus.SOURCE_STOP
-                    || sourceStatus == SourceStatus.HEARTBEAT_TIMEOUT
-                    || CollectionUtils.isNotEmpty(streamSource.getDataAddTaskList())) {
-                return true;
-            } else if (sourceStatus == SourceStatus.SOURCE_FAILED || sourceStatus == SourceStatus.SOURCE_DISABLE) {
-                return false;
-            } else {
-                log.warn("stream source={} cannot be operated for status={}", streamSource, sourceStatus);
-                TimeUnit.SECONDS.sleep(5);
-                streamSource = streamSourceService.get(streamSource.getId());
-            }
-        }
-        SourceStatus sourceStatus = SourceStatus.forCode(streamSource.getStatus());
-        if (sourceStatus != SourceStatus.SOURCE_NORMAL
-                && sourceStatus != SourceStatus.SOURCE_STOP
-                && sourceStatus != SourceStatus.SOURCE_DISABLE
-                && sourceStatus != SourceStatus.SOURCE_FAILED
-                && sourceStatus != SourceStatus.HEARTBEAT_TIMEOUT) {
-            log.error("stream source ={} cannot be operated for status={}", streamSource, sourceStatus);
-            unOperatedSources.add(streamSource);
-        }
-        return false;
-    }
-
-    /**
      * Operate stream sources ,such as delete, stop, restart.
      */
     public abstract void operateStreamSource(SourceRequest sourceRequest, String operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 2ecd79f..7c4c0c4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -277,10 +277,6 @@
         if (curState == SourceStatus.SOURCE_STOP) {
             return;
         }
-        if (!SourceStatus.isAllowedTransition(curState, nextState)) {
-            throw new BusinessException(String.format("current source status=%s for id=%s is not allowed to stop",
-                    existEntity.getStatus(), existEntity.getId()));
-        }
         StreamSourceEntity curEntity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
         curEntity.setPreviousStatus(curState.getCode());
         curEntity.setStatus(nextState.getCode());
@@ -300,10 +296,6 @@
         StreamSourceEntity existEntity = sourceMapper.selectByIdForUpdate(request.getId());
         SourceStatus curState = SourceStatus.forCode(existEntity.getStatus());
         SourceStatus nextState = SourceStatus.TO_BE_ISSUED_ACTIVE;
-        if (!SourceStatus.isAllowedTransition(curState, nextState)) {
-            throw new BusinessException(String.format("current source status=%s for id=%s is not allowed to restart",
-                    existEntity.getStatus(), existEntity.getId()));
-        }
         StreamSourceEntity curEntity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
         curEntity.setPreviousStatus(curState.getCode());
         curEntity.setStatus(nextState.getCode());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index aab60a8..ca6865b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -343,11 +343,6 @@
                 || SourceType.AUTO_PUSH.equals(entity.getSourceType())) {
             nextStatus = SourceStatus.SOURCE_DISABLE;
         }
-        if (!SourceStatus.isAllowedTransition(curStatus, nextStatus)) {
-            throw new BusinessException(
-                    String.format("current source status=%s for id=%s is not allowed to delete", entity.getStatus(),
-                            entity.getId()));
-        }
 
         entity.setPreviousStatus(curStatus.getCode());
         entity.setStatus(nextStatus.getCode());
@@ -360,7 +355,9 @@
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
         }
         sourceFieldMapper.deleteAll(id);
-
+        SourceRequest request = CommonBeanUtils.copyProperties(entity, SourceRequest::new, true);
+        StreamSourceOperator sourceOperator = operatorFactory.getInstance(request.getSourceType());
+        sourceOperator.updateAgentTaskConfig(request, operator);
         LOGGER.info("success to delete source for id={} by user={}", id, operator);
         return true;
     }