[INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task (#10715)
* [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task
* [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
index ccefd5c..6b19648 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
@@ -18,9 +18,7 @@
package org.apache.inlong.manager.service.listener.source;
import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.enums.TaskEvent;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
@@ -33,15 +31,11 @@
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
-import com.google.common.collect.Lists;
-import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
-import java.util.concurrent.TimeUnit;
/**
* Event listener of operate resources, such as delete, stop, restart sources.
@@ -67,68 +61,22 @@
InlongGroupInfo groupInfo = getGroupInfo(context.getProcessForm());
final String groupId = groupInfo.getInlongGroupId();
List<InlongStreamBriefInfo> streamResponses = streamService.listBriefWithSink(groupId);
- List<StreamSource> unOperatedSources = Lists.newArrayList();
- streamResponses.forEach(stream -> operateStreamSources(groupId, stream.getInlongStreamId(),
- context.getOperator(), unOperatedSources));
-
- if (CollectionUtils.isNotEmpty(unOperatedSources)) {
- GroupOperateType operateType = getOperateType(context.getProcessForm());
- StringBuilder builder = new StringBuilder("Unsupported operate ").append(operateType).append(" for (");
- unOperatedSources.forEach(source -> builder.append(" ").append(source.getSourceName()).append(" "));
- String errMsg = builder.append(")").toString();
- throw new WorkflowListenerException(errMsg);
- }
-
+ streamResponses
+ .forEach(stream -> operateStreamSources(groupId, stream.getInlongStreamId(), context.getOperator()));
return ListenerResult.success();
}
/**
* Operate stream sources, such as delete, stop, restart.
*/
- protected void operateStreamSources(String groupId, String streamId, String operator,
- List<StreamSource> unOperatedSources) {
+ protected void operateStreamSources(String groupId, String streamId, String operator) {
List<StreamSource> sources = streamSourceService.listSource(groupId, streamId);
sources.forEach(source -> {
- if (checkIfOp(source, unOperatedSources)) {
- operateStreamSource(source.genSourceRequest(), operator);
- }
+ operateStreamSource(source.genSourceRequest(), operator);
});
}
/**
- * Check source status.
- */
- @SneakyThrows
- public boolean checkIfOp(StreamSource streamSource, List<StreamSource> unOperatedSources) {
- for (int retry = 0; retry < 60; retry++) {
- int status = streamSource.getStatus();
- SourceStatus sourceStatus = SourceStatus.forCode(status);
- // template sources are filtered and processed in corresponding subclass listeners
- if (sourceStatus == SourceStatus.SOURCE_NORMAL || sourceStatus == SourceStatus.SOURCE_STOP
- || sourceStatus == SourceStatus.HEARTBEAT_TIMEOUT
- || CollectionUtils.isNotEmpty(streamSource.getDataAddTaskList())) {
- return true;
- } else if (sourceStatus == SourceStatus.SOURCE_FAILED || sourceStatus == SourceStatus.SOURCE_DISABLE) {
- return false;
- } else {
- log.warn("stream source={} cannot be operated for status={}", streamSource, sourceStatus);
- TimeUnit.SECONDS.sleep(5);
- streamSource = streamSourceService.get(streamSource.getId());
- }
- }
- SourceStatus sourceStatus = SourceStatus.forCode(streamSource.getStatus());
- if (sourceStatus != SourceStatus.SOURCE_NORMAL
- && sourceStatus != SourceStatus.SOURCE_STOP
- && sourceStatus != SourceStatus.SOURCE_DISABLE
- && sourceStatus != SourceStatus.SOURCE_FAILED
- && sourceStatus != SourceStatus.HEARTBEAT_TIMEOUT) {
- log.error("stream source ={} cannot be operated for status={}", streamSource, sourceStatus);
- unOperatedSources.add(streamSource);
- }
- return false;
- }
-
- /**
* Operate stream sources, such as delete, stop, restart.
*/
public abstract void operateStreamSource(SourceRequest sourceRequest, String operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 2ecd79f..7c4c0c4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -277,10 +277,6 @@
if (curState == SourceStatus.SOURCE_STOP) {
return;
}
- if (!SourceStatus.isAllowedTransition(curState, nextState)) {
- throw new BusinessException(String.format("current source status=%s for id=%s is not allowed to stop",
- existEntity.getStatus(), existEntity.getId()));
- }
StreamSourceEntity curEntity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
curEntity.setPreviousStatus(curState.getCode());
curEntity.setStatus(nextState.getCode());
@@ -300,10 +296,6 @@
StreamSourceEntity existEntity = sourceMapper.selectByIdForUpdate(request.getId());
SourceStatus curState = SourceStatus.forCode(existEntity.getStatus());
SourceStatus nextState = SourceStatus.TO_BE_ISSUED_ACTIVE;
- if (!SourceStatus.isAllowedTransition(curState, nextState)) {
- throw new BusinessException(String.format("current source status=%s for id=%s is not allowed to restart",
- existEntity.getStatus(), existEntity.getId()));
- }
StreamSourceEntity curEntity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
curEntity.setPreviousStatus(curState.getCode());
curEntity.setStatus(nextState.getCode());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index aab60a8..ca6865b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -343,11 +343,6 @@
|| SourceType.AUTO_PUSH.equals(entity.getSourceType())) {
nextStatus = SourceStatus.SOURCE_DISABLE;
}
- if (!SourceStatus.isAllowedTransition(curStatus, nextStatus)) {
- throw new BusinessException(
- String.format("current source status=%s for id=%s is not allowed to delete", entity.getStatus(),
- entity.getId()));
- }
entity.setPreviousStatus(curStatus.getCode());
entity.setStatus(nextStatus.getCode());
@@ -360,7 +355,9 @@
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
sourceFieldMapper.deleteAll(id);
-
+ SourceRequest request = CommonBeanUtils.copyProperties(entity, SourceRequest::new, true);
+ StreamSourceOperator sourceOperator = operatorFactory.getInstance(request.getSourceType());
+ sourceOperator.updateAgentTaskConfig(request, operator);
LOGGER.info("success to delete source for id={} by user={}", id, operator);
return true;
}