| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.dolphinscheduler.api.service.impl; |
| |
| import static java.util.stream.Collectors.toSet; |
| import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS; |
| import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; |
| import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS; |
| |
| import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationCreateRequest; |
| import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationFilterRequest; |
| import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationUpdateUpstreamRequest; |
| import org.apache.dolphinscheduler.api.enums.Status; |
| import org.apache.dolphinscheduler.api.exceptions.ServiceException; |
| import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService; |
| import org.apache.dolphinscheduler.api.service.ProjectService; |
| import org.apache.dolphinscheduler.common.constants.Constants; |
| import org.apache.dolphinscheduler.common.enums.ConditionType; |
| import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; |
| import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; |
| import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; |
| import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; |
| import org.apache.dolphinscheduler.dao.entity.Project; |
| import org.apache.dolphinscheduler.dao.entity.TaskDefinition; |
| import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; |
| import org.apache.dolphinscheduler.dao.entity.User; |
| import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; |
| import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; |
| import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; |
| import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; |
| import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; |
| import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; |
| import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; |
| import org.apache.dolphinscheduler.service.process.ProcessService; |
| |
| import org.apache.commons.collections4.CollectionUtils; |
| import org.apache.commons.lang3.StringUtils; |
| |
| import java.util.ArrayList; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| |
| import lombok.extern.slf4j.Slf4j; |
| |
| import org.springframework.beans.factory.annotation.Autowired; |
| import org.springframework.stereotype.Service; |
| import org.springframework.transaction.annotation.Transactional; |
| |
| import com.baomidou.mybatisplus.core.metadata.IPage; |
| import com.baomidou.mybatisplus.extension.plugins.pagination.Page; |
| import com.google.common.collect.Lists; |
| |
| /** |
| * process task relation service impl |
| */ |
| @Service |
| @Slf4j |
| public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements ProcessTaskRelationService { |
| |
| @Autowired |
| private ProjectMapper projectMapper; |
| |
| @Autowired |
| private ProjectService projectService; |
| |
| @Autowired |
| private ProcessTaskRelationMapper processTaskRelationMapper; |
| |
| @Autowired |
| private TaskDefinitionLogMapper taskDefinitionLogMapper; |
| |
| @Autowired |
| private TaskDefinitionMapper taskDefinitionMapper; |
| |
| @Autowired |
| private ProcessDefinitionMapper processDefinitionMapper; |
| |
| @Autowired |
| private ProcessDefinitionLogMapper processDefinitionLogMapper; |
| |
| @Autowired |
| private ProcessService processService; |
| |
| @Autowired |
| private ProcessTaskRelationLogMapper processTaskRelationLogMapper; |
| |
| /** |
| * create process task relation |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @param processDefinitionCode processDefinitionCode |
| * @param preTaskCode preTaskCode |
| * @param postTaskCode postTaskCode |
| * @return create result code |
| */ |
| @Transactional |
| @Override |
| public Map<String, Object> createProcessTaskRelation(User loginUser, long projectCode, long processDefinitionCode, |
| long preTaskCode, long postTaskCode) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, null); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); |
| if (processDefinition == null) { |
| log.error("Process definition does not exist, processCode:{}.", processDefinitionCode); |
| putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode)); |
| return result; |
| } |
| if (processDefinition.getProjectCode() != projectCode) { |
| log.error("Process definition's project does not match project {}.", projectCode); |
| putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH); |
| return result; |
| } |
| updateProcessDefiniteVersion(loginUser, result, processDefinition); |
| List<ProcessTaskRelation> processTaskRelationList = |
| processTaskRelationMapper.queryByProcessCode(processDefinitionCode); |
| List<ProcessTaskRelation> processTaskRelations = Lists.newArrayList(processTaskRelationList); |
| if (!processTaskRelations.isEmpty()) { |
| Map<Long, ProcessTaskRelation> preTaskCodeMap = |
| processTaskRelations.stream().filter(r -> r.getPostTaskCode() == postTaskCode) |
| .collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode, |
| processTaskRelation -> processTaskRelation)); |
| if (!preTaskCodeMap.isEmpty()) { |
| if (preTaskCodeMap.containsKey(preTaskCode) || (!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) { |
| putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, String.valueOf(processDefinitionCode)); |
| return result; |
| } |
| if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) { |
| // delete no upstream |
| processTaskRelations.remove(preTaskCodeMap.get(0L)); |
| } |
| } |
| } |
| TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode); |
| ProcessTaskRelation processTaskRelation = setRelation(processDefinition, postTaskDefinition); |
| if (preTaskCode != 0L) { |
| TaskDefinition preTaskDefinition = taskDefinitionMapper.queryByCode(preTaskCode); |
| List<ProcessTaskRelation> upstreamTaskRelationList = processTaskRelations.stream() |
| .filter(r -> r.getPostTaskCode() == preTaskCode).collect(Collectors.toList()); |
| // upstream is or not exist |
| if (upstreamTaskRelationList.isEmpty()) { |
| ProcessTaskRelation preProcessTaskRelation = setRelation(processDefinition, preTaskDefinition); |
| preProcessTaskRelation.setPreTaskCode(0L); |
| preProcessTaskRelation.setPreTaskVersion(0); |
| processTaskRelations.add(preProcessTaskRelation); |
| } |
| processTaskRelation.setPreTaskCode(preTaskDefinition.getCode()); |
| processTaskRelation.setPreTaskVersion(preTaskDefinition.getVersion()); |
| } else { |
| processTaskRelation.setPreTaskCode(0L); |
| processTaskRelation.setPreTaskVersion(0); |
| } |
| processTaskRelations.add(processTaskRelation); |
| updateRelation(loginUser, result, processDefinition, processTaskRelations); |
| return result; |
| } |
| |
| private ProcessTaskRelationLog persist2ProcessTaskRelationLog(User user, ProcessTaskRelation processTaskRelation) { |
| ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); |
| processTaskRelationLog.setOperator(user.getId()); |
| processTaskRelationLog.setOperateTime(new Date()); |
| int result = processTaskRelationLogMapper.insert(processTaskRelationLog); |
| if (result <= 0) { |
| throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_LOG_ERROR, |
| processTaskRelationLog.getPreTaskCode(), processTaskRelationLog.getPostTaskCode()); |
| } |
| return processTaskRelationLog; |
| } |
| |
| private List<ProcessTaskRelationLog> batchPersist2ProcessTaskRelationLog(User user, |
| List<ProcessTaskRelation> processTaskRelations) { |
| Date now = new Date(); |
| List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>(); |
| |
| for (ProcessTaskRelation processTaskRelation : processTaskRelations) { |
| ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); |
| processTaskRelationLog.setOperator(user.getId()); |
| processTaskRelationLog.setOperateTime(now); |
| processTaskRelationLogs.add(processTaskRelationLog); |
| } |
| int result = processTaskRelationLogMapper.batchInsert(processTaskRelationLogs); |
| if (result != processTaskRelationLogs.size()) { |
| throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_LOG_ERROR); |
| } |
| return processTaskRelationLogs; |
| } |
| |
| private void updateVersions(ProcessTaskRelation processTaskRelation) { |
| // workflow |
| ProcessDefinition processDefinition = |
| processDefinitionMapper.queryByCode(processTaskRelation.getProcessDefinitionCode()); |
| processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion()); |
| |
| // tasks |
| TaskDefinition preTaskDefinition = taskDefinitionMapper.queryByCode(processTaskRelation.getPreTaskCode()); |
| processTaskRelation.setPreTaskVersion(preTaskDefinition.getVersion()); |
| TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(processTaskRelation.getPostTaskCode()); |
| processTaskRelation.setPostTaskVersion(postTaskDefinition.getVersion()); |
| } |
| |
| /** |
| * create resource process task relation |
| * |
| * @param loginUser login user |
| * @param taskRelationCreateRequest project code |
| * @return ProcessTaskRelation object |
| */ |
| @Override |
| @Transactional |
| public ProcessTaskRelation createProcessTaskRelationV2(User loginUser, |
| TaskRelationCreateRequest taskRelationCreateRequest) { |
| ProcessTaskRelation processTaskRelation = taskRelationCreateRequest.convert2ProcessTaskRelation(); |
| ProcessDefinition processDefinition = |
| processDefinitionMapper.queryByCode(processTaskRelation.getProcessDefinitionCode()); |
| if (processDefinition == null) { |
| throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, |
| String.valueOf(processTaskRelation.getProcessDefinitionCode())); |
| } |
| if (processTaskRelation.getProjectCode() == 0) { |
| processTaskRelation.setProjectCode(processDefinition.getProjectCode()); |
| } |
| Project project = projectMapper.queryByCode(processTaskRelation.getProjectCode()); |
| projectService.checkProjectAndAuthThrowException(loginUser, project, null); |
| |
| // persistence process task relation and process task relation log to database |
| this.updateVersions(processTaskRelation); |
| int insert = processTaskRelationMapper.insert(processTaskRelation); |
| if (insert <= 0) { |
| throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR, processTaskRelation.getPreTaskCode(), |
| processTaskRelation.getPostTaskCode()); |
| } |
| this.persist2ProcessTaskRelationLog(loginUser, processTaskRelation); |
| |
| return processTaskRelation; |
| } |
| |
| private ProcessTaskRelation setRelation(ProcessDefinition processDefinition, TaskDefinition taskDefinition) { |
| Date now = new Date(); |
| ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); |
| processTaskRelation.setProjectCode(processDefinition.getProjectCode()); |
| processTaskRelation.setProcessDefinitionCode(processDefinition.getCode()); |
| processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion()); |
| processTaskRelation.setPostTaskCode(taskDefinition.getCode()); |
| processTaskRelation.setPostTaskVersion(taskDefinition.getVersion()); |
| processTaskRelation.setConditionType(ConditionType.NONE); |
| processTaskRelation.setConditionParams("{}"); |
| processTaskRelation.setCreateTime(now); |
| processTaskRelation.setUpdateTime(now); |
| return processTaskRelation; |
| } |
| |
| private void updateProcessDefiniteVersion(User loginUser, Map<String, Object> result, |
| ProcessDefinition processDefinition) { |
| int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); |
| if (insertVersion <= 0) { |
| log.error("Update process definition error, projectCode:{}, processDefinitionCode:{}.", |
| processDefinition.getProjectCode(), processDefinition.getCode()); |
| putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); |
| throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); |
| } else |
| log.info( |
| "Update process definition complete, new version is {}, projectCode:{}, processDefinitionCode:{}.", |
| insertVersion, processDefinition.getProjectCode(), processDefinition.getCode()); |
| processDefinition.setVersion(insertVersion); |
| } |
| |
| /** |
| * delete process task relation |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @param processDefinitionCode process definition code |
| * @param taskCode the post task code |
| * @return delete result code |
| */ |
| @Transactional |
| @Override |
| public Map<String, Object> deleteTaskProcessRelation(User loginUser, long projectCode, long processDefinitionCode, |
| long taskCode) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, null); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| if (taskCode == 0) { |
| log.error( |
| "Delete task process relation error due to parameter taskCode is 0, projectCode:{}, processDefinitionCode:{}.", |
| projectCode, processDefinitionCode); |
| putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR); |
| return result; |
| } |
| ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); |
| if (processDefinition == null) { |
| log.error("Process definition does not exist, processDefinitionCode:{}.", processDefinitionCode); |
| putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode)); |
| return result; |
| } |
| TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); |
| if (null == taskDefinition) { |
| log.error("Task definition does not exist, taskDefinitionCode:{}.", taskCode); |
| putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode)); |
| return result; |
| } |
| List<ProcessTaskRelation> processTaskRelations = |
| processTaskRelationMapper.queryByProcessCode(processDefinitionCode); |
| List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations); |
| if (CollectionUtils.isEmpty(processTaskRelationList)) { |
| log.error("Process task relations are empty, projectCode:{}, processDefinitionCode:{}.", projectCode, |
| processDefinitionCode); |
| putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList"); |
| return result; |
| } |
| List<Long> downstreamList = Lists.newArrayList(); |
| for (ProcessTaskRelation processTaskRelation : processTaskRelations) { |
| if (processTaskRelation.getPreTaskCode() == taskCode) { |
| downstreamList.add(processTaskRelation.getPostTaskCode()); |
| } |
| if (processTaskRelation.getPostTaskCode() == taskCode) { |
| processTaskRelationList.remove(processTaskRelation); |
| } |
| } |
| if (CollectionUtils.isNotEmpty(downstreamList)) { |
| String downstream = StringUtils.join(downstreamList, ","); |
| log.warn( |
| "Relation can not be deleted because task has downstream tasks:[{}], projectCode:{}, processDefinitionCode:{}, taskDefinitionCode:{}.", |
| downstream, projectCode, processDefinitionCode, taskCode); |
| putMsg(result, Status.TASK_HAS_DOWNSTREAM, downstream); |
| return result; |
| } |
| updateProcessDefiniteVersion(loginUser, result, processDefinition); |
| updateRelation(loginUser, result, processDefinition, processTaskRelationList); |
| if (TASK_TYPE_CONDITIONS.equals(taskDefinition.getTaskType()) |
| || TASK_TYPE_DEPENDENT.equals(taskDefinition.getTaskType()) |
| || TASK_TYPE_SUB_PROCESS.equals(taskDefinition.getTaskType())) { |
| int deleteTaskDefinition = taskDefinitionMapper.deleteByCode(taskCode); |
| if (0 == deleteTaskDefinition) { |
| log.error("Delete task definition error, taskDefinitionCode:{}.", taskCode); |
| putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); |
| throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); |
| } else |
| log.info("Delete {} type task definition complete, taskDefinitionCode:{}.", |
| taskDefinition.getTaskType(), taskCode); |
| } |
| putMsg(result, Status.SUCCESS); |
| return result; |
| } |
| |
| /** |
| * delete process task relation, will delete exists relation preTaskCode -> postTaskCode, throw error if not exists |
| * |
| * @param loginUser login user |
| * @param preTaskCode relation upstream code |
| * @param postTaskCode relation downstream code |
| */ |
| @Override |
| @Transactional |
| public void deleteTaskProcessRelationV2(User loginUser, |
| long preTaskCode, |
| long postTaskCode) { |
| ProcessTaskRelation processTaskRelation = |
| new TaskRelationFilterRequest(preTaskCode, postTaskCode).convert2TaskDefinition(); |
| |
| Page<ProcessTaskRelation> page = |
| new Page<>(new TaskRelationFilterRequest(preTaskCode, postTaskCode).getPageNo(), |
| new TaskRelationFilterRequest(preTaskCode, postTaskCode).getPageSize()); |
| IPage<ProcessTaskRelation> processTaskRelationIPage = |
| processTaskRelationMapper.filterProcessTaskRelation(page, processTaskRelation); |
| |
| List<ProcessTaskRelation> processTaskRelations = processTaskRelationIPage.getRecords(); |
| if (processTaskRelations.size() != 1) { |
| throw new ServiceException(Status.PROCESS_TASK_RELATION_NOT_EXPECT, 1, processTaskRelations.size()); |
| } |
| |
| ProcessTaskRelation processTaskRelationDb = processTaskRelations.get(0); |
| Project project = projectMapper.queryByCode(processTaskRelationDb.getProjectCode()); |
| projectService.checkProjectAndAuthThrowException(loginUser, project, null); |
| processTaskRelationMapper.deleteById(processTaskRelationDb.getId()); |
| } |
| |
| /** |
| * delete process task relation, will delete exists relation upstream -> downstream, throw error if not exists |
| * |
| * @param loginUser login user |
| * @param taskCode relation upstream code |
| * @param needSyncDag needSyncDag |
| * @param taskRelationUpdateUpstreamRequest relation downstream code |
| */ |
| @Override |
| @Transactional |
| public List<ProcessTaskRelation> updateUpstreamTaskDefinitionWithSyncDag(User loginUser, |
| long taskCode, |
| Boolean needSyncDag, |
| TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) { |
| TaskDefinition downstreamTask = taskDefinitionMapper.queryByCode(taskCode); |
| if (downstreamTask == null) { |
| throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskCode); |
| } |
| List<Long> upstreamTaskCodes = taskRelationUpdateUpstreamRequest.getUpstreams(); |
| |
| ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); |
| processTaskRelation.setPostTaskCode(taskCode); |
| |
| Page<ProcessTaskRelation> page = new Page<>(taskRelationUpdateUpstreamRequest.getPageNo(), |
| taskRelationUpdateUpstreamRequest.getPageSize()); |
| IPage<ProcessTaskRelation> processTaskRelationExistsIPage = |
| processTaskRelationMapper.filterProcessTaskRelation(page, processTaskRelation); |
| List<ProcessTaskRelation> processTaskRelationExists = processTaskRelationExistsIPage.getRecords(); |
| |
| ProcessDefinition processDefinition = null; |
| if (CollectionUtils.isNotEmpty(processTaskRelationExists)) { |
| processDefinition = |
| processDefinitionMapper.queryByCode(processTaskRelationExists.get(0).getProcessDefinitionCode()); |
| } else if (taskRelationUpdateUpstreamRequest.getWorkflowCode() != 0L) { |
| processDefinition = |
| processDefinitionMapper.queryByCode(taskRelationUpdateUpstreamRequest.getWorkflowCode()); |
| } |
| if (processDefinition == null) { |
| throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR, |
| taskRelationUpdateUpstreamRequest.toString()); |
| } |
| processDefinition.setUpdateTime(new Date()); |
| int insertVersion = processDefinition.getVersion(); |
| if (needSyncDag) { |
| insertVersion = |
| this.saveProcessDefine(loginUser, processDefinition); |
| if (insertVersion <= 0) { |
| throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); |
| } |
| } |
| // get new relation to create and out of date relation to delete |
| List<Long> taskCodeCreates = upstreamTaskCodes |
| .stream() |
| .filter(upstreamTaskCode -> processTaskRelationExists.stream().noneMatch( |
| processTaskRelationExist -> processTaskRelationExist.getPreTaskCode() == upstreamTaskCode)) |
| .collect(Collectors.toList()); |
| List<Integer> taskCodeDeletes = processTaskRelationExists.stream() |
| .filter(ptr -> !upstreamTaskCodes.contains(ptr.getPreTaskCode())) |
| .map(ProcessTaskRelation::getId) |
| .collect(Collectors.toList()); |
| |
| // delete relation not exists |
| if (CollectionUtils.isNotEmpty(taskCodeDeletes)) { |
| int delete = processTaskRelationMapper.deleteBatchIds(taskCodeDeletes); |
| if (delete != taskCodeDeletes.size()) { |
| throw new ServiceException(Status.PROCESS_TASK_RELATION_BATCH_DELETE_ERROR, taskCodeDeletes); |
| } |
| } |
| |
| // create relation not exists |
| List<ProcessTaskRelation> processTaskRelations = new ArrayList<>(); |
| for (long createCode : taskCodeCreates) { |
| long upstreamCode = 0L; |
| int version = 0; |
| if (createCode != 0L) { |
| // 0 for DAG root, should not, it may already exists and skip to create anymore |
| TaskDefinition upstreamTask = taskDefinitionMapper.queryByCode(createCode); |
| if (upstreamTask == null) { |
| throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, createCode); |
| } |
| upstreamCode = upstreamTask.getCode(); |
| version = upstreamTask.getVersion(); |
| } |
| ProcessTaskRelation processTaskRelationCreate = |
| new ProcessTaskRelation(null, processDefinition.getVersion(), downstreamTask.getProjectCode(), |
| processDefinition.getCode(), upstreamCode, version, |
| downstreamTask.getCode(), downstreamTask.getVersion(), null, null); |
| processTaskRelations.add(processTaskRelationCreate); |
| } |
| int batchInsert = processTaskRelationMapper.batchInsert(processTaskRelations); |
| if (batchInsert != processTaskRelations.size()) { |
| throw new ServiceException(Status.PROCESS_TASK_RELATION_BATCH_CREATE_ERROR, taskCodeCreates); |
| } |
| |
| // batch sync to process task relation log |
| int saveTaskRelationResult = saveTaskRelation(loginUser, processDefinition, insertVersion); |
| if (saveTaskRelationResult != Constants.EXIT_CODE_SUCCESS) { |
| log.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.", |
| processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion); |
| throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); |
| } |
| log.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.", |
| processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion); |
| processTaskRelations.get(0).setProcessDefinitionVersion(insertVersion); |
| return processTaskRelations; |
| } |
| |
| public int saveTaskRelation(User loginUser, ProcessDefinition processDefinition, |
| int processDefinitionVersion) { |
| long projectCode = processDefinition.getProjectCode(); |
| long processDefinitionCode = processDefinition.getCode(); |
| List<ProcessTaskRelation> taskRelations = |
| processTaskRelationMapper.queryByProcessCode(processDefinitionCode); |
| List<ProcessTaskRelationLog> taskRelationList = |
| taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); |
| |
| List<Long> taskCodeList = |
| taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList()); |
| List<TaskDefinition> taskDefinitions = taskDefinitionMapper.queryByCodeList(taskCodeList); |
| List<TaskDefinitionLog> taskDefinitionLogs = |
| taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList()); |
| |
| if (taskRelationList.isEmpty()) { |
| return Constants.EXIT_CODE_SUCCESS; |
| } |
| Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null; |
| if (org.apache.commons.collections.CollectionUtils.isNotEmpty(taskDefinitionLogs)) { |
| taskDefinitionLogMap = taskDefinitionLogs |
| .stream() |
| .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog)); |
| } |
| Date now = new Date(); |
| for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) { |
| processTaskRelationLog.setProjectCode(projectCode); |
| processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode); |
| processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion); |
| if (taskDefinitionLogMap != null) { |
| TaskDefinitionLog preTaskDefinitionLog = |
| taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode()); |
| if (preTaskDefinitionLog != null) { |
| processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion()); |
| } |
| TaskDefinitionLog postTaskDefinitionLog = |
| taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode()); |
| if (postTaskDefinitionLog != null) { |
| processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion()); |
| } |
| } |
| processTaskRelationLog.setCreateTime(now); |
| processTaskRelationLog.setUpdateTime(now); |
| processTaskRelationLog.setOperator(loginUser.getId()); |
| processTaskRelationLog.setOperateTime(now); |
| } |
| if (CollectionUtils.isNotEmpty(taskRelations)) { |
| Set<Integer> processTaskRelationSet = |
| taskRelations.stream().map(ProcessTaskRelation::hashCode).collect(toSet()); |
| Set<Integer> taskRelationSet = |
| taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet()); |
| boolean isSame = org.apache.commons.collections.CollectionUtils.isEqualCollection(processTaskRelationSet, |
| taskRelationSet); |
| if (isSame) { |
| return Constants.EXIT_CODE_SUCCESS; |
| } |
| processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode); |
| } |
| List<ProcessTaskRelation> processTaskRelations = |
| taskRelationList.stream().map(ProcessTaskRelation::new).collect(Collectors.toList()); |
| int insert = processTaskRelationMapper.batchInsert(processTaskRelations); |
| int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList); |
| return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE; |
| } |
| |
| public int saveProcessDefine(User loginUser, ProcessDefinition processDefinition) { |
| ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition); |
| Integer version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode()); |
| int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1; |
| processDefinitionLog.setVersion(insertVersion); |
| processDefinitionLog.setOperator(loginUser.getId()); |
| processDefinitionLog.setOperateTime(processDefinition.getUpdateTime()); |
| processDefinitionLog.setId(null); |
| int insertLog = processDefinitionLogMapper.insert(processDefinitionLog); |
| |
| processDefinitionLog.setId(processDefinition.getId()); |
| int result = processDefinitionMapper.updateById(processDefinitionLog); |
| return (insertLog & result) > 0 ? insertVersion : 0; |
| } |
| |
| private void updateRelation(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition, |
| List<ProcessTaskRelation> processTaskRelationList) { |
| List<ProcessTaskRelationLog> relationLogs = |
| processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); |
| int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), |
| processDefinition.getCode(), |
| processDefinition.getVersion(), relationLogs, Lists.newArrayList(), Boolean.TRUE); |
| if (insertResult == Constants.EXIT_CODE_SUCCESS) { |
| log.info( |
| "Update task relations complete, projectCode:{}, processDefinitionCode:{}, processDefinitionVersion:{}.", |
| processDefinition.getProjectCode(), processDefinition.getCode(), processDefinition.getVersion()); |
| putMsg(result, Status.SUCCESS); |
| result.put(Constants.DATA_LIST, processDefinition); |
| } else { |
| log.error( |
| "Update task relations error, projectCode:{}, processDefinitionCode:{}, processDefinitionVersion:{}.", |
| processDefinition.getProjectCode(), processDefinition.getCode(), processDefinition.getVersion()); |
| putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); |
| throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); |
| } |
| } |
| |
| /** |
| * delete task upstream relation |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @param preTaskCodes the pre task codes, sep ',' |
| * @param taskCode the post task code |
| * @return delete result code |
| */ |
| @Transactional |
| @Override |
| public Map<String, Object> deleteUpstreamRelation(User loginUser, long projectCode, String preTaskCodes, |
| long taskCode) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, null); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| if (StringUtils.isEmpty(preTaskCodes)) { |
| log.warn("Parameter preTaskCodes is empty."); |
| putMsg(result, Status.DATA_IS_NULL, "preTaskCodes"); |
| return result; |
| } |
| List<ProcessTaskRelation> upstreamList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); |
| if (CollectionUtils.isEmpty(upstreamList)) { |
| log.error("Upstream tasks based on the task do not exist, theTaskDefinitionCode:{}.", taskCode); |
| putMsg(result, Status.DATA_IS_NULL, "taskCode"); |
| return result; |
| } |
| |
| List<Long> preTaskCodeList = Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream() |
| .map(Long::parseLong).collect(Collectors.toList()); |
| if (preTaskCodeList.contains(0L)) { |
| log.warn("Parameter preTaskCodes contain 0."); |
| putMsg(result, Status.DATA_IS_NULL, "preTaskCodes"); |
| return result; |
| } |
| List<Long> currentUpstreamList = |
| upstreamList.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toList()); |
| if (currentUpstreamList.contains(0L)) { |
| log.error("Upstream taskCodes based on the task contain, theTaskDefinitionCode:{}.", taskCode); |
| putMsg(result, Status.DATA_IS_NOT_VALID, "currentUpstreamList"); |
| return result; |
| } |
| List<Long> tmpCurrent = Lists.newArrayList(currentUpstreamList); |
| tmpCurrent.removeAll(preTaskCodeList); |
| preTaskCodeList.removeAll(currentUpstreamList); |
| if (!preTaskCodeList.isEmpty()) { |
| String invalidPreTaskCodes = StringUtils.join(preTaskCodeList, Constants.COMMA); |
| log.error("Some upstream taskCodes are invalid, preTaskCodeList:{}.", invalidPreTaskCodes); |
| putMsg(result, Status.DATA_IS_NOT_VALID, invalidPreTaskCodes); |
| return result; |
| } |
| ProcessDefinition processDefinition = |
| processDefinitionMapper.queryByCode(upstreamList.get(0).getProcessDefinitionCode()); |
| if (processDefinition == null) { |
| log.error("Process definition does not exist, processDefinitionCode:{}.", |
| upstreamList.get(0).getProcessDefinitionCode()); |
| putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, |
| String.valueOf(upstreamList.get(0).getProcessDefinitionCode())); |
| return result; |
| } |
| List<ProcessTaskRelation> processTaskRelations = |
| processTaskRelationMapper.queryByProcessCode(processDefinition.getCode()); |
| List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations); |
| List<ProcessTaskRelation> processTaskRelationWaitRemove = Lists.newArrayList(); |
| for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { |
| if (currentUpstreamList.size() > 1) { |
| if (currentUpstreamList.contains(processTaskRelation.getPreTaskCode())) { |
| currentUpstreamList.remove(processTaskRelation.getPreTaskCode()); |
| processTaskRelationWaitRemove.add(processTaskRelation); |
| } |
| } else { |
| if (processTaskRelation.getPostTaskCode() == taskCode |
| && (currentUpstreamList.isEmpty() || tmpCurrent.isEmpty())) { |
| processTaskRelation.setPreTaskVersion(0); |
| processTaskRelation.setPreTaskCode(0L); |
| } |
| } |
| } |
| processTaskRelationList.removeAll(processTaskRelationWaitRemove); |
| updateProcessDefiniteVersion(loginUser, result, processDefinition); |
| updateRelation(loginUser, result, processDefinition, processTaskRelationList); |
| return result; |
| } |
| |
| /** |
| * delete task downstream relation |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @param postTaskCodes the post task codes, sep ',' |
| * @param taskCode the pre task code |
| * @return delete result code |
| */ |
| @Transactional |
| @Override |
| public Map<String, Object> deleteDownstreamRelation(User loginUser, long projectCode, String postTaskCodes, |
| long taskCode) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, null); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| if (StringUtils.isEmpty(postTaskCodes)) { |
| log.warn("Parameter postTaskCodes is empty."); |
| putMsg(result, Status.DATA_IS_NULL, "postTaskCodes"); |
| return result; |
| } |
| List<ProcessTaskRelation> downstreamList = |
| processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode); |
| if (CollectionUtils.isEmpty(downstreamList)) { |
| log.error("Downstream tasks based on the task do not exist, theTaskDefinitionCode:{}.", taskCode); |
| putMsg(result, Status.DATA_IS_NULL, "taskCode"); |
| return result; |
| } |
| List<Long> postTaskCodeList = Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream() |
| .map(Long::parseLong).collect(Collectors.toList()); |
| if (postTaskCodeList.contains(0L)) { |
| log.warn("Parameter postTaskCodes contains 0."); |
| putMsg(result, Status.DATA_IS_NULL, "postTaskCodes"); |
| return result; |
| } |
| ProcessDefinition processDefinition = |
| processDefinitionMapper.queryByCode(downstreamList.get(0).getProcessDefinitionCode()); |
| if (processDefinition == null) { |
| log.error("Process definition does not exist, processDefinitionCode:{}.", |
| downstreamList.get(0).getProcessDefinitionCode()); |
| putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, |
| String.valueOf(downstreamList.get(0).getProcessDefinitionCode())); |
| return result; |
| } |
| List<ProcessTaskRelation> processTaskRelations = |
| processTaskRelationMapper.queryByProcessCode(processDefinition.getCode()); |
| List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations); |
| processTaskRelationList |
| .removeIf(processTaskRelation -> postTaskCodeList.contains(processTaskRelation.getPostTaskCode()) |
| && processTaskRelation.getPreTaskCode() == taskCode); |
| updateProcessDefiniteVersion(loginUser, result, processDefinition); |
| updateRelation(loginUser, result, processDefinition, processTaskRelationList); |
| return result; |
| } |
| |
| /** |
| * query task upstream relation |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @param taskCode current task code (post task code) |
| * @return the upstream task definitions |
| */ |
| @Override |
| public Map<String, Object> queryUpstreamRelation(User loginUser, long projectCode, long taskCode) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, null); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| List<ProcessTaskRelation> processTaskRelationList = |
| processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); |
| List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>(); |
| if (CollectionUtils.isNotEmpty(processTaskRelationList)) { |
| Set<TaskDefinition> taskDefinitions = processTaskRelationList |
| .stream() |
| .map(processTaskRelation -> { |
| TaskDefinition taskDefinition = buildTaskDefinition(); |
| taskDefinition.setProjectCode(processTaskRelation.getProjectCode()); |
| taskDefinition.setCode(processTaskRelation.getPreTaskCode()); |
| taskDefinition.setVersion(processTaskRelation.getPreTaskVersion()); |
| return taskDefinition; |
| }) |
| .collect(Collectors.toSet()); |
| taskDefinitionLogList = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitions); |
| } |
| result.put(Constants.DATA_LIST, taskDefinitionLogList); |
| putMsg(result, Status.SUCCESS); |
| return result; |
| } |
| |
| /** |
| * query task downstream relation |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @param taskCode pre task code |
| * @return the downstream task definitions |
| */ |
| @Override |
| public Map<String, Object> queryDownstreamRelation(User loginUser, long projectCode, long taskCode) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, null); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| List<ProcessTaskRelation> processTaskRelationList = |
| processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode); |
| List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>(); |
| if (CollectionUtils.isNotEmpty(processTaskRelationList)) { |
| Set<TaskDefinition> taskDefinitions = processTaskRelationList |
| .stream() |
| .map(processTaskRelation -> { |
| TaskDefinition taskDefinition = buildTaskDefinition(); |
| taskDefinition.setProjectCode(processTaskRelation.getProjectCode()); |
| taskDefinition.setCode(processTaskRelation.getPostTaskCode()); |
| taskDefinition.setVersion(processTaskRelation.getPostTaskVersion()); |
| return taskDefinition; |
| }) |
| .collect(Collectors.toSet()); |
| taskDefinitionLogList = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitions); |
| } |
| result.put(Constants.DATA_LIST, taskDefinitionLogList); |
| putMsg(result, Status.SUCCESS); |
| return result; |
| } |
| |
| /** |
| * delete edge |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @param processDefinitionCode process definition code |
| * @param preTaskCode pre task code |
| * @param postTaskCode post task code |
| * @return delete result code |
| */ |
| @Transactional |
| @Override |
| public Map<String, Object> deleteEdge(User loginUser, long projectCode, long processDefinitionCode, |
| long preTaskCode, long postTaskCode) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, null); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); |
| if (processDefinition == null) { |
| log.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, |
| processDefinitionCode); |
| putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode)); |
| return result; |
| } |
| List<ProcessTaskRelation> processTaskRelations = |
| processTaskRelationMapper.queryByProcessCode(processDefinitionCode); |
| List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations); |
| if (CollectionUtils.isEmpty(processTaskRelationList)) { |
| log.error("Process task relations are empty, projectCode:{}, processDefinitionCode:{}.", projectCode, |
| processDefinitionCode); |
| putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList"); |
| return result; |
| } |
| Map<Long, List<ProcessTaskRelation>> taskRelationMap = new HashMap<>(); |
| for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { |
| taskRelationMap.compute(processTaskRelation.getPostTaskCode(), (k, v) -> { |
| if (v == null) { |
| v = new ArrayList<>(); |
| } |
| v.add(processTaskRelation); |
| return v; |
| }); |
| } |
| if (!taskRelationMap.containsKey(postTaskCode)) { |
| putMsg(result, Status.DATA_IS_NULL, "postTaskCode"); |
| return result; |
| } |
| if (taskRelationMap.get(postTaskCode).size() > 1) { |
| for (ProcessTaskRelation processTaskRelation : taskRelationMap.get(postTaskCode)) { |
| if (processTaskRelation.getPreTaskCode() == preTaskCode) { |
| int delete = processTaskRelationMapper.deleteById(processTaskRelation.getId()); |
| if (delete == 0) { |
| log.error( |
| "Delete task relation edge error, processTaskRelationId:{}, preTaskCode:{}, postTaskCode:{}", |
| processTaskRelation.getId(), preTaskCode, postTaskCode); |
| putMsg(result, Status.DELETE_EDGE_ERROR); |
| throw new ServiceException(Status.DELETE_EDGE_ERROR); |
| } else |
| log.info( |
| "Delete task relation edge complete, processTaskRelationId:{}, preTaskCode:{}, postTaskCode:{}", |
| processTaskRelation.getId(), preTaskCode, postTaskCode); |
| processTaskRelationList.remove(processTaskRelation); |
| } |
| } |
| } else { |
| ProcessTaskRelation processTaskRelation = taskRelationMap.get(postTaskCode).get(0); |
| processTaskRelationList.remove(processTaskRelation); |
| processTaskRelation.setPreTaskVersion(0); |
| processTaskRelation.setPreTaskCode(0L); |
| processTaskRelationList.add(processTaskRelation); |
| log.info( |
| "Delete task relation through set invalid value for it: preTaskCode from {} to 0, processTaskRelationId:{}.", |
| preTaskCode, processTaskRelation.getId()); |
| } |
| updateProcessDefiniteVersion(loginUser, result, processDefinition); |
| updateRelation(loginUser, result, processDefinition, processTaskRelationList); |
| return result; |
| } |
| |
| @Override |
| public List<ProcessTaskRelation> queryByWorkflowDefinitionCode(long workflowDefinitionCode, |
| int workflowDefinitionVersion) { |
| return processTaskRelationMapper.queryProcessTaskRelationsByProcessDefinitionCode(workflowDefinitionCode, |
| workflowDefinitionVersion); |
| } |
| |
| @Override |
| public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode, int workflowDefinitionVersion) { |
| processTaskRelationMapper.deleteByWorkflowDefinitionCode(workflowDefinitionCode, workflowDefinitionVersion); |
| } |
| |
| /** |
| * build task definition |
| * |
| * @return task definition |
| */ |
| private TaskDefinition buildTaskDefinition() { |
| |
| return new TaskDefinition() { |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (!(o instanceof TaskDefinition)) { |
| return false; |
| } |
| TaskDefinition that = (TaskDefinition) o; |
| return getCode() == that.getCode() |
| && getVersion() == that.getVersion() |
| && getProjectCode() == that.getProjectCode(); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(getCode(), getVersion(), getProjectCode()); |
| } |
| }; |
| } |
| } |