| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.dolphinscheduler.api.service.impl; |
| |
| import static java.util.stream.Collectors.toSet; |
| import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_MOVE; |
| import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_DELETE; |
| import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_LIST; |
| import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_BATCH_COPY; |
| import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_CREATE; |
| import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION; |
| import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_DELETE; |
| import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_EXPORT; |
| import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_IMPORT; |
| import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_ONLINE_OFFLINE; |
| import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION; |
| import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW; |
| import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE; |
| import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EXIST; |
| import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; |
| import static org.apache.dolphinscheduler.common.constants.Constants.COPY_SUFFIX; |
| import static org.apache.dolphinscheduler.common.constants.Constants.DATA_LIST; |
| import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP; |
| import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING; |
| import static org.apache.dolphinscheduler.common.constants.Constants.GLOBAL_PARAMS; |
| import static org.apache.dolphinscheduler.common.constants.Constants.IMPORT_SUFFIX; |
| import static org.apache.dolphinscheduler.common.constants.Constants.LOCAL_PARAMS; |
| import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LOCAL_PARAMS_LIST; |
| import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE; |
| import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SQL; |
| |
| import org.apache.dolphinscheduler.api.dto.DagDataSchedule; |
| import org.apache.dolphinscheduler.api.dto.ScheduleParam; |
| import org.apache.dolphinscheduler.api.dto.treeview.Instance; |
| import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto; |
| import org.apache.dolphinscheduler.api.dto.workflow.WorkflowCreateRequest; |
| import org.apache.dolphinscheduler.api.dto.workflow.WorkflowFilterRequest; |
| import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest; |
| import org.apache.dolphinscheduler.api.enums.Status; |
| import org.apache.dolphinscheduler.api.exceptions.ServiceException; |
| import org.apache.dolphinscheduler.api.service.MetricsCleanUpService; |
| import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; |
| import org.apache.dolphinscheduler.api.service.ProcessInstanceService; |
| import org.apache.dolphinscheduler.api.service.ProjectService; |
| import org.apache.dolphinscheduler.api.service.SchedulerService; |
| import org.apache.dolphinscheduler.api.service.TaskDefinitionLogService; |
| import org.apache.dolphinscheduler.api.service.TaskDefinitionService; |
| import org.apache.dolphinscheduler.api.service.WorkFlowLineageService; |
| import org.apache.dolphinscheduler.api.utils.CheckUtils; |
| import org.apache.dolphinscheduler.api.utils.FileUtils; |
| import org.apache.dolphinscheduler.api.utils.PageInfo; |
| import org.apache.dolphinscheduler.api.utils.Result; |
| import org.apache.dolphinscheduler.common.constants.Constants; |
| import org.apache.dolphinscheduler.common.enums.ConditionType; |
| import org.apache.dolphinscheduler.common.enums.FailureStrategy; |
| import org.apache.dolphinscheduler.common.enums.Flag; |
| import org.apache.dolphinscheduler.common.enums.Priority; |
| import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; |
| import org.apache.dolphinscheduler.common.enums.ReleaseState; |
| import org.apache.dolphinscheduler.common.enums.TimeoutFlag; |
| import org.apache.dolphinscheduler.common.enums.UserType; |
| import org.apache.dolphinscheduler.common.enums.WarningType; |
| import org.apache.dolphinscheduler.common.graph.DAG; |
| import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; |
| import org.apache.dolphinscheduler.common.model.TaskNodeRelation; |
| import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; |
| import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException; |
| import org.apache.dolphinscheduler.common.utils.DateUtils; |
| import org.apache.dolphinscheduler.common.utils.JSONUtils; |
| import org.apache.dolphinscheduler.dao.entity.DagData; |
| import org.apache.dolphinscheduler.dao.entity.DataSource; |
| import org.apache.dolphinscheduler.dao.entity.DependentSimplifyDefinition; |
| import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; |
| import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; |
| import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
| import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; |
| import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; |
| import org.apache.dolphinscheduler.dao.entity.Project; |
| import org.apache.dolphinscheduler.dao.entity.Schedule; |
| import org.apache.dolphinscheduler.dao.entity.TaskDefinition; |
| import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; |
| import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
| import org.apache.dolphinscheduler.dao.entity.TaskMainInfo; |
| import org.apache.dolphinscheduler.dao.entity.User; |
| import org.apache.dolphinscheduler.dao.entity.UserWithProcessDefinitionCode; |
| import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; |
| import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; |
| import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; |
| import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; |
| import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; |
| import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; |
| import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; |
| import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; |
| import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; |
| import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; |
| import org.apache.dolphinscheduler.dao.mapper.UserMapper; |
| import org.apache.dolphinscheduler.dao.model.PageListingResult; |
| import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao; |
| import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao; |
| import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; |
| import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; |
| import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType; |
| import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; |
| import org.apache.dolphinscheduler.plugin.task.api.model.Property; |
| import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; |
| import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters; |
| import org.apache.dolphinscheduler.service.cron.CronUtils; |
| import org.apache.dolphinscheduler.service.model.TaskNode; |
| import org.apache.dolphinscheduler.service.process.ProcessService; |
| |
| import org.apache.commons.collections4.CollectionUtils; |
| import org.apache.commons.lang3.ObjectUtils; |
| import org.apache.commons.lang3.StringUtils; |
| |
| import java.io.BufferedOutputStream; |
| import java.io.BufferedReader; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| import java.util.zip.ZipEntry; |
| import java.util.zip.ZipInputStream; |
| |
| import javax.servlet.ServletOutputStream; |
| import javax.servlet.http.HttpServletResponse; |
| |
| import lombok.NonNull; |
| import lombok.extern.slf4j.Slf4j; |
| |
| import org.springframework.beans.factory.annotation.Autowired; |
| import org.springframework.context.annotation.Lazy; |
| import org.springframework.http.MediaType; |
| import org.springframework.stereotype.Service; |
| import org.springframework.transaction.annotation.Transactional; |
| import org.springframework.web.multipart.MultipartFile; |
| |
| import com.baomidou.mybatisplus.core.metadata.IPage; |
| import com.baomidou.mybatisplus.extension.plugins.pagination.Page; |
| import com.fasterxml.jackson.databind.JsonNode; |
| import com.fasterxml.jackson.databind.node.ArrayNode; |
| import com.fasterxml.jackson.databind.node.ObjectNode; |
| import com.google.common.collect.Lists; |
| |
| /** |
| * process definition service impl |
| */ |
| @Service |
| @Slf4j |
| public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements ProcessDefinitionService { |
| |
| private static final String RELEASESTATE = "releaseState"; |
| |
| @Autowired |
| private ProjectMapper projectMapper; |
| |
| @Autowired |
| private ProjectService projectService; |
| |
| @Autowired |
| private UserMapper userMapper; |
| |
| @Autowired |
| private ProcessDefinitionLogMapper processDefinitionLogMapper; |
| |
| @Autowired |
| private ProcessDefinitionMapper processDefinitionMapper; |
| |
| @Autowired |
| private ProcessDefinitionDao processDefinitionDao; |
| |
| @Autowired |
| private ProcessDefinitionLogDao processDefinitionLogDao; |
| @Lazy |
| @Autowired |
| private ProcessInstanceService processInstanceService; |
| |
| @Autowired |
| private TaskInstanceMapper taskInstanceMapper; |
| |
| @Autowired |
| private ScheduleMapper scheduleMapper; |
| |
| @Autowired |
| private ProcessService processService; |
| |
| @Autowired |
| private TaskDefinitionLogDao taskDefinitionLogDao; |
| |
| @Autowired |
| private ProcessTaskRelationMapper processTaskRelationMapper; |
| |
| @Autowired |
| private ProcessTaskRelationLogMapper processTaskRelationLogMapper; |
| |
| @Autowired |
| TaskDefinitionLogMapper taskDefinitionLogMapper; |
| |
| @Lazy |
| @Autowired |
| private TaskDefinitionService taskDefinitionService; |
| |
| @Autowired |
| private TaskDefinitionLogService taskDefinitionLogService; |
| |
| @Autowired |
| private TaskDefinitionMapper taskDefinitionMapper; |
| |
| @Lazy |
| @Autowired |
| private SchedulerService schedulerService; |
| |
| @Autowired |
| private DataSourceMapper dataSourceMapper; |
| |
| @Autowired |
| private TaskPluginManager taskPluginManager; |
| |
| @Autowired |
| private WorkFlowLineageService workFlowLineageService; |
| |
| @Autowired |
| private MetricsCleanUpService metricsCleanUpService; |
| |
| /** |
| * create process definition |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @param name process definition name |
| * @param description description |
| * @param globalParams global params |
| * @param locations locations for nodes |
| * @param timeout timeout |
| * @param taskRelationJson relation json for nodes |
| * @param taskDefinitionJson taskDefinitionJson |
| * @return create result code |
| */ |
| @Override |
| @Transactional |
| public Map<String, Object> createProcessDefinition(User loginUser, |
| long projectCode, |
| String name, |
| String description, |
| String globalParams, |
| String locations, |
| int timeout, |
| String taskRelationJson, |
| String taskDefinitionJson, |
| String otherParamsJson, |
| ProcessExecutionTypeEnum executionType) { |
| Project project = projectMapper.queryByCode(projectCode); |
| |
| // check if user have write perm for project |
| Map<String, Object> result = |
| projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_CREATE); |
| boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result); |
| if (!hasProjectAndWritePerm) { |
| return result; |
| } |
| if (checkDescriptionLength(description)) { |
| log.warn("Parameter description is too long."); |
| throw new ServiceException(Status.DESCRIPTION_TOO_LONG_ERROR); |
| } |
| // check whether the new process define name exist |
| ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name); |
| if (definition != null) { |
| log.warn("Process definition with the same name {} already exists, processDefinitionCode:{}.", |
| definition.getName(), definition.getCode()); |
| throw new ServiceException(Status.PROCESS_DEFINITION_NAME_EXIST, name); |
| } |
| List<TaskDefinitionLog> taskDefinitionLogs = generateTaskDefinitionList(taskDefinitionJson); |
| List<ProcessTaskRelationLog> taskRelationList = generateTaskRelationList(taskRelationJson, taskDefinitionLogs); |
| |
| long processDefinitionCode = CodeGenerateUtils.getInstance().genCode(); |
| ProcessDefinition processDefinition = |
| new ProcessDefinition(projectCode, name, processDefinitionCode, description, |
| globalParams, locations, timeout, loginUser.getId()); |
| processDefinition.setExecutionType(executionType); |
| |
| return createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs, otherParamsJson); |
| } |
| |
| private void createWorkflowValid(User user, ProcessDefinition processDefinition) { |
| Project project = projectMapper.queryByCode(processDefinition.getProjectCode()); |
| if (project == null) { |
| throw new ServiceException(Status.PROJECT_NOT_FOUND, processDefinition.getProjectCode()); |
| } |
| // check user access for project |
| projectService.checkProjectAndAuthThrowException(user, project, WORKFLOW_CREATE); |
| |
| if (checkDescriptionLength(processDefinition.getDescription())) { |
| throw new ServiceException(Status.DESCRIPTION_TOO_LONG_ERROR); |
| } |
| |
| // check whether the new process define name exist |
| ProcessDefinition definition = |
| processDefinitionMapper.verifyByDefineName(project.getCode(), processDefinition.getName()); |
| if (definition != null) { |
| throw new ServiceException(Status.PROCESS_DEFINITION_NAME_EXIST, processDefinition.getName()); |
| } |
| |
| } |
| |
| private void syncObj2Log(User user, ProcessDefinition processDefinition) { |
| ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition); |
| processDefinitionLog.setOperator(user.getId()); |
| int result = processDefinitionLogMapper.insert(processDefinitionLog); |
| if (result <= 0) { |
| throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_LOG_ERROR); |
| } |
| } |
| |
| /** |
| * create single process definition |
| * |
| * @param loginUser login user |
| * @param workflowCreateRequest the new workflow object will be created |
| * @return New ProcessDefinition object created just now |
| */ |
| @Override |
| @Transactional |
| public ProcessDefinition createSingleProcessDefinition(User loginUser, |
| WorkflowCreateRequest workflowCreateRequest) { |
| ProcessDefinition processDefinition = workflowCreateRequest.convert2ProcessDefinition(); |
| this.createWorkflowValid(loginUser, processDefinition); |
| |
| long processDefinitionCode; |
| try { |
| processDefinitionCode = CodeGenerateUtils.getInstance().genCode(); |
| } catch (CodeGenerateException e) { |
| throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS); |
| } |
| |
| processDefinition.setCode(processDefinitionCode); |
| processDefinition.setUserId(loginUser.getId()); |
| |
| int create = processDefinitionMapper.insert(processDefinition); |
| if (create <= 0) { |
| throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR); |
| } |
| this.syncObj2Log(loginUser, processDefinition); |
| return processDefinition; |
| } |
| |
| protected Map<String, Object> createDagDefine(User loginUser, |
| List<ProcessTaskRelationLog> taskRelationList, |
| ProcessDefinition processDefinition, |
| List<TaskDefinitionLog> taskDefinitionLogs, String otherParamsJson) { |
| Map<String, Object> result = new HashMap<>(); |
| int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), |
| taskDefinitionLogs, Boolean.TRUE); |
| if (saveTaskResult == Constants.EXIT_CODE_SUCCESS) { |
| log.info("The task has not changed, so skip"); |
| } |
| if (saveTaskResult == Constants.DEFINITION_FAILURE) { |
| log.error("Save task definition error."); |
| throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); |
| } |
| int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); |
| if (insertVersion == 0) { |
| log.error("Save process definition error, processCode:{}.", processDefinition.getCode()); |
| throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR); |
| } else { |
| log.info("Save process definition complete, processCode:{}, processVersion:{}.", |
| processDefinition.getCode(), insertVersion); |
| } |
| int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), |
| processDefinition.getCode(), |
| insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE); |
| if (insertResult != Constants.EXIT_CODE_SUCCESS) { |
| log.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.", |
| processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion); |
| throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); |
| } else { |
| log.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.", |
| processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion); |
| } |
| |
| saveOtherRelation(loginUser, processDefinition, result, otherParamsJson); |
| |
| putMsg(result, Status.SUCCESS); |
| result.put(Constants.DATA_LIST, processDefinition); |
| return result; |
| } |
| |
| private List<TaskDefinitionLog> generateTaskDefinitionList(String taskDefinitionJson) { |
| try { |
| List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); |
| if (CollectionUtils.isEmpty(taskDefinitionLogs)) { |
| log.error("Generate task definition list failed, the given taskDefinitionJson is invalided: {}", |
| taskDefinitionJson); |
| throw new ServiceException(Status.DATA_IS_NOT_VALID, taskDefinitionJson); |
| } |
| for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { |
| if (!taskPluginManager.checkTaskParameters(ParametersNode.builder() |
| .taskType(taskDefinitionLog.getTaskType()) |
| .taskParams(taskDefinitionLog.getTaskParams()) |
| .dependence(taskDefinitionLog.getDependence()) |
| .build())) { |
| log.error( |
| "Generate task definition list failed, the given task definition parameter is invalided, taskName: {}, taskDefinition: {}", |
| taskDefinitionLog.getName(), taskDefinitionLog); |
| throw new ServiceException(Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName()); |
| } |
| } |
| return taskDefinitionLogs; |
| } catch (ServiceException ex) { |
| throw ex; |
| } catch (Exception e) { |
| log.error("Generate task definition list failed, meet an unknown exception", e); |
| throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR); |
| } |
| } |
| |
| private List<ProcessTaskRelationLog> generateTaskRelationList(String taskRelationJson, |
| List<TaskDefinitionLog> taskDefinitionLogs) { |
| try { |
| List<ProcessTaskRelationLog> taskRelationList = |
| JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class); |
| if (CollectionUtils.isEmpty(taskRelationList)) { |
| log.error("Generate task relation list failed the taskRelation list is empty, taskRelationJson: {}", |
| taskRelationJson); |
| throw new ServiceException(Status.DATA_IS_NOT_VALID); |
| } |
| List<ProcessTaskRelation> processTaskRelations = taskRelationList.stream() |
| .map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), |
| ProcessTaskRelation.class)) |
| .collect(Collectors.toList()); |
| List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs); |
| if (taskNodeList.size() != taskRelationList.size()) { |
| Set<Long> postTaskCodes = taskRelationList.stream().map(ProcessTaskRelationLog::getPostTaskCode) |
| .collect(Collectors.toSet()); |
| Set<Long> taskNodeCodes = taskNodeList.stream().map(TaskNode::getCode).collect(Collectors.toSet()); |
| Collection<Long> codes = CollectionUtils.subtract(postTaskCodes, taskNodeCodes); |
| if (CollectionUtils.isNotEmpty(codes)) { |
| String taskCodes = StringUtils.join(codes, Constants.COMMA); |
| log.error("Task definitions do not exist, taskCodes:{}.", taskCodes); |
| throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskCodes); |
| } |
| } |
| if (graphHasCycle(taskNodeList)) { |
| log.error("Process DAG has cycle."); |
| throw new ServiceException(Status.PROCESS_NODE_HAS_CYCLE); |
| } |
| |
| // check whether the task relation json is normal |
| for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) { |
| if (processTaskRelationLog.getPostTaskCode() == 0) { |
| log.error("The post_task_code or post_task_version of processTaskRelationLog can not be zero, " + |
| "processTaskRelationLogId:{}.", processTaskRelationLog.getId()); |
| throw new ServiceException(Status.CHECK_PROCESS_TASK_RELATION_ERROR); |
| } |
| } |
| return taskRelationList; |
| } catch (ServiceException ex) { |
| throw ex; |
| } catch (Exception e) { |
| log.error("Check task relation list error, meet an unknown exception, given taskRelationJson: {}", |
| taskRelationJson, e); |
| throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR); |
| } |
| } |
| |
| /** |
| * query process definition list |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @return definition list |
| */ |
| @Override |
| public Map<String, Object> queryProcessDefinitionList(User loginUser, long projectCode) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = |
| projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| List<ProcessDefinition> resourceList = processDefinitionMapper.queryAllDefinitionList(projectCode); |
| List<DagData> dagDataList = resourceList.stream().map(processService::genDagData).collect(Collectors.toList()); |
| result.put(Constants.DATA_LIST, dagDataList); |
| putMsg(result, Status.SUCCESS); |
| return result; |
| } |
| |
| /** |
| * query process definition simple list |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @return definition simple list |
| */ |
| @Override |
| public Map<String, Object> queryProcessDefinitionSimpleList(User loginUser, long projectCode) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = |
| projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| List<ProcessDefinition> processDefinitions = processDefinitionMapper.queryAllDefinitionList(projectCode); |
| ArrayNode arrayNode = JSONUtils.createArrayNode(); |
| for (ProcessDefinition processDefinition : processDefinitions) { |
| ObjectNode processDefinitionNode = JSONUtils.createObjectNode(); |
| processDefinitionNode.put("id", processDefinition.getId()); |
| processDefinitionNode.put("code", processDefinition.getCode()); |
| processDefinitionNode.put("name", processDefinition.getName()); |
| processDefinitionNode.put("projectCode", processDefinition.getProjectCode()); |
| arrayNode.add(processDefinitionNode); |
| } |
| result.put(Constants.DATA_LIST, arrayNode); |
| putMsg(result, Status.SUCCESS); |
| return result; |
| } |
| |
| /** |
| * query process definition list paging |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @param searchVal search value |
| * @param userId user id |
| * @param pageNo page number |
| * @param pageSize page size |
| * @return process definition page |
| */ |
| @Override |
| public PageInfo<ProcessDefinition> queryProcessDefinitionListPaging(@NonNull User loginUser, |
| long projectCode, |
| String searchVal, |
| String otherParamsJson, |
| Integer userId, |
| Integer pageNo, |
| Integer pageSize) { |
| Project project = projectMapper.queryByCode(projectCode); |
| |
| // check user access for project |
| projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION); |
| |
| PageListingResult<ProcessDefinition> processDefinitionsPageListingResult = |
| processDefinitionDao.listingProcessDefinition( |
| pageNo, pageSize, searchVal, userId, projectCode); |
| List<ProcessDefinition> processDefinitions = processDefinitionsPageListingResult.getRecords(); |
| |
| List<Long> processDefinitionCodes = |
| processDefinitions.stream().map(ProcessDefinition::getCode).collect(Collectors.toList()); |
| Map<Long, Schedule> scheduleMap = schedulerService.queryScheduleByProcessDefinitionCodes(processDefinitionCodes) |
| .stream() |
| .collect(Collectors.toMap(Schedule::getProcessDefinitionCode, Function.identity())); |
| List<UserWithProcessDefinitionCode> userWithCodes = userMapper.queryUserWithProcessDefinitionCode( |
| processDefinitionCodes); |
| for (ProcessDefinition pd : processDefinitions) { |
| userWithCodes.stream() |
| .filter(userWithCode -> userWithCode.getProcessDefinitionCode() == pd.getCode() |
| && userWithCode.getProcessDefinitionVersion() == pd.getVersion()) |
| .findAny().ifPresent(userWithCode -> { |
| pd.setModifyBy(userWithCode.getModifierName()); |
| pd.setUserName(userWithCode.getCreatorName()); |
| }); |
| Schedule schedule = scheduleMap.get(pd.getCode()); |
| pd.setScheduleReleaseState(schedule == null ? null : schedule.getReleaseState()); |
| } |
| |
| PageInfo<ProcessDefinition> pageInfo = new PageInfo<>(pageNo, pageSize); |
| pageInfo.setTotal((int) processDefinitionsPageListingResult.getTotalCount()); |
| pageInfo.setTotalList(processDefinitions); |
| |
| return pageInfo; |
| } |
| |
| /** |
| * Filter resource process definitions |
| * |
| * @param loginUser login user |
| * @param workflowFilterRequest workflow filter requests |
| * @return List process definition |
| */ |
| @Override |
| public PageInfo<ProcessDefinition> filterProcessDefinition(User loginUser, |
| WorkflowFilterRequest workflowFilterRequest) { |
| ProcessDefinition processDefinition = workflowFilterRequest.convert2ProcessDefinition(); |
| if (workflowFilterRequest.getProjectName() != null) { |
| Project project = projectMapper.queryByName(workflowFilterRequest.getProjectName()); |
| // check user access for project |
| projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION); |
| processDefinition.setProjectCode(project.getCode()); |
| } |
| |
| Page<ProcessDefinition> page = |
| new Page<>(workflowFilterRequest.getPageNo(), workflowFilterRequest.getPageSize()); |
| IPage<ProcessDefinition> processDefinitionIPage = |
| processDefinitionMapper.filterProcessDefinition(page, processDefinition); |
| |
| List<ProcessDefinition> records = processDefinitionIPage.getRecords(); |
| for (ProcessDefinition pd : records) { |
| ProcessDefinitionLog processDefinitionLog = |
| processDefinitionLogMapper.queryByDefinitionCodeAndVersion(pd.getCode(), pd.getVersion()); |
| User user = userMapper.selectById(processDefinitionLog.getOperator()); |
| pd.setModifyBy(user.getUserName()); |
| } |
| |
| processDefinitionIPage.setRecords(records); |
| PageInfo<ProcessDefinition> pageInfo = |
| new PageInfo<>(workflowFilterRequest.getPageNo(), workflowFilterRequest.getPageSize()); |
| pageInfo.setTotal((int) processDefinitionIPage.getTotal()); |
| pageInfo.setTotalList(processDefinitionIPage.getRecords()); |
| |
| return pageInfo; |
| } |
| |
| /** |
| * query detail of process definition |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @param code process definition code |
| * @return process definition detail |
| */ |
| @Override |
| public Map<String, Object> queryProcessDefinitionByCode(User loginUser, long projectCode, long code) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = |
| projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| |
| ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); |
| if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { |
| log.error("Process definition does not exist, processCode:{}.", code); |
| putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code)); |
| } else { |
| DagData dagData = processService.genDagData(processDefinition); |
| result.put(Constants.DATA_LIST, dagData); |
| putMsg(result, Status.SUCCESS); |
| } |
| return result; |
| } |
| |
| /** |
| * query detail of process definition |
| * |
| * @param loginUser login user |
| * @param code process definition code |
| * @return process definition detail |
| */ |
| @Override |
| public ProcessDefinition getProcessDefinition(User loginUser, long code) { |
| ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); |
| if (processDefinition == null) { |
| throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code)); |
| } |
| |
| Project project = projectMapper.queryByCode(processDefinition.getProjectCode()); |
| // check user access for project |
| projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION); |
| |
| return processDefinition; |
| } |
| |
| @Override |
| public Optional<ProcessDefinition> queryWorkflowDefinition(long workflowDefinitionCode, |
| int workflowDefinitionVersion) { |
| ProcessDefinition workflowDefinition = processDefinitionDao.queryByCode(workflowDefinitionCode).orElse(null); |
| if (workflowDefinition == null || workflowDefinition.getVersion() != workflowDefinitionVersion) { |
| workflowDefinition = processDefinitionLogDao.queryProcessDefinitionLog(workflowDefinitionCode, |
| workflowDefinitionVersion); |
| } |
| return Optional.ofNullable(workflowDefinition); |
| } |
| |
| @Override |
| public ProcessDefinition queryWorkflowDefinitionThrowExceptionIfNotFound(long workflowDefinitionCode, |
| int workflowDefinitionVersion) { |
| return queryWorkflowDefinition(workflowDefinitionCode, workflowDefinitionVersion) |
| .orElseThrow(() -> new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, |
| String.valueOf(workflowDefinitionCode))); |
| } |
| |
| @Override |
| public Map<String, Object> queryProcessDefinitionByName(User loginUser, long projectCode, String name) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = |
| projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, name); |
| |
| if (processDefinition == null) { |
| log.error("Process definition does not exist, projectCode:{}.", projectCode); |
| putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, name); |
| } else { |
| DagData dagData = processService.genDagData(processDefinition); |
| result.put(Constants.DATA_LIST, dagData); |
| putMsg(result, Status.SUCCESS); |
| } |
| return result; |
| } |
| |
| /** |
| * update process definition |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @param name process definition name |
| * @param code process definition code |
| * @param description description |
| * @param globalParams global params |
| * @param locations locations for nodes |
| * @param timeout timeout |
| * @param taskRelationJson relation json for nodes |
| * @param taskDefinitionJson taskDefinitionJson |
| * @param otherParamsJson otherParamsJson handle other params |
| * @return update result code |
| */ |
| @Override |
| @Transactional |
| public Map<String, Object> updateProcessDefinition(User loginUser, |
| long projectCode, |
| String name, |
| long code, |
| String description, |
| String globalParams, |
| String locations, |
| int timeout, |
| String taskRelationJson, |
| String taskDefinitionJson, |
| String otherParamsJson, |
| ProcessExecutionTypeEnum executionType) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check if user have write perm for project |
| Map<String, Object> result = |
| projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_UPDATE); |
| boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result); |
| if (!hasProjectAndWritePerm) { |
| return result; |
| } |
| |
| if (checkDescriptionLength(description)) { |
| log.warn("Parameter description is too long."); |
| putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR); |
| return result; |
| } |
| List<TaskDefinitionLog> taskDefinitionLogs = generateTaskDefinitionList(taskDefinitionJson); |
| List<ProcessTaskRelationLog> taskRelationList = generateTaskRelationList(taskRelationJson, taskDefinitionLogs); |
| |
| ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); |
| // check process definition exists |
| if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { |
| log.error("Process definition does not exist, processCode:{}.", code); |
| putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code)); |
| return result; |
| } |
| if (processDefinition.getReleaseState() == ReleaseState.ONLINE) { |
| // online can not permit edit |
| log.warn("Process definition is not allowed to be modified due to {}, processDefinitionCode:{}.", |
| ReleaseState.ONLINE.getDescp(), processDefinition.getCode()); |
| putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefinition.getName()); |
| return result; |
| } |
| if (!name.equals(processDefinition.getName())) { |
| // check whether the new process define name exist |
| ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name); |
| if (definition != null) { |
| log.warn("Process definition with the same name already exists, processDefinitionCode:{}.", |
| definition.getCode()); |
| putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name); |
| return result; |
| } |
| } |
| ProcessDefinition processDefinitionDeepCopy = |
| JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class); |
| processDefinition.set(projectCode, name, description, globalParams, locations, timeout); |
| processDefinition.setExecutionType(executionType); |
| return updateDagDefine(loginUser, taskRelationList, processDefinition, processDefinitionDeepCopy, |
| taskDefinitionLogs, otherParamsJson); |
| } |
| |
| /** |
| * Task want to delete whether used in other task, should throw exception when have be used. |
| * |
| * This function avoid delete task already dependencies by other tasks by accident. |
| * |
| * @param processDefinition ProcessDefinition you change task definition and task relation |
| * @param taskRelationList All the latest task relation list from process definition |
| */ |
| private void taskUsedInOtherTaskValid(ProcessDefinition processDefinition, |
| List<ProcessTaskRelationLog> taskRelationList) { |
| List<ProcessTaskRelation> oldProcessTaskRelationList = processTaskRelationMapper |
| .queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()); |
| Set<ProcessTaskRelationLog> oldProcessTaskRelationSet = |
| oldProcessTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toSet()); |
| StringBuilder sb = new StringBuilder(); |
| for (ProcessTaskRelationLog oldProcessTaskRelation : oldProcessTaskRelationSet) { |
| boolean oldTaskExists = taskRelationList.stream() |
| .anyMatch(relation -> oldProcessTaskRelation.getPostTaskCode() == relation.getPostTaskCode()); |
| if (!oldTaskExists) { |
| Optional<String> taskDepMsg = workFlowLineageService.taskDepOnTaskMsg( |
| processDefinition.getProjectCode(), oldProcessTaskRelation.getProcessDefinitionCode(), |
| oldProcessTaskRelation.getPostTaskCode()); |
| taskDepMsg.ifPresent(sb::append); |
| } |
| if (sb.length() != 0) { |
| log.error("Task cannot be deleted because it is dependent"); |
| throw new ServiceException(sb.toString()); |
| } |
| } |
| } |
| |
| protected Map<String, Object> updateDagDefine(User loginUser, |
| List<ProcessTaskRelationLog> taskRelationList, |
| ProcessDefinition processDefinition, |
| ProcessDefinition processDefinitionDeepCopy, |
| List<TaskDefinitionLog> taskDefinitionLogs, |
| String otherParamsJson) { |
| Map<String, Object> result = new HashMap<>(); |
| int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), |
| taskDefinitionLogs, Boolean.TRUE); |
| if (saveTaskResult == Constants.EXIT_CODE_SUCCESS) { |
| log.info("The task has not changed, so skip"); |
| } |
| if (saveTaskResult == Constants.DEFINITION_FAILURE) { |
| log.error("Update task definitions error, projectCode:{}, processCode:{}.", |
| processDefinition.getProjectCode(), processDefinition.getCode()); |
| putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); |
| throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); |
| } |
| boolean isChange = false; |
| if (processDefinition.equals(processDefinitionDeepCopy) && saveTaskResult == Constants.EXIT_CODE_SUCCESS) { |
| List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper |
| .queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); |
| if (taskRelationList.size() == processTaskRelationLogList.size()) { |
| Set<ProcessTaskRelationLog> taskRelationSet = new HashSet<>(taskRelationList); |
| Set<ProcessTaskRelationLog> processTaskRelationLogSet = new HashSet<>(processTaskRelationLogList); |
| if (taskRelationSet.size() == processTaskRelationLogSet.size()) { |
| taskRelationSet.removeAll(processTaskRelationLogSet); |
| if (!taskRelationSet.isEmpty()) { |
| isChange = true; |
| } |
| } else { |
| isChange = true; |
| } |
| } else { |
| isChange = true; |
| } |
| } else { |
| isChange = true; |
| } |
| if (isChange) { |
| log.info("Process definition needs to be updated, projectCode:{}, processCode:{}, processVersion:{}.", |
| processDefinition.getProjectCode(), processDefinition.getCode(), processDefinition.getVersion()); |
| processDefinition.setUpdateTime(new Date()); |
| int insertVersion = |
| processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); |
| if (insertVersion <= 0) { |
| log.error("Update process definition error, processCode:{}.", processDefinition.getCode()); |
| putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); |
| throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); |
| } else { |
| log.info("Update process definition complete, processCode:{}, processVersion:{}.", |
| processDefinition.getCode(), insertVersion); |
| } |
| |
| taskUsedInOtherTaskValid(processDefinition, taskRelationList); |
| int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), |
| processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE); |
| if (insertResult == Constants.EXIT_CODE_SUCCESS) { |
| log.info( |
| "Update process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.", |
| processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion); |
| putMsg(result, Status.SUCCESS); |
| result.put(Constants.DATA_LIST, processDefinition); |
| } else { |
| log.error("Update process task relations error, projectCode:{}, processCode:{}, processVersion:{}.", |
| processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion); |
| putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); |
| throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); |
| } |
| saveOtherRelation(loginUser, processDefinition, result, otherParamsJson); |
| } else { |
| log.info( |
| "Process definition does not need to be updated because there is no change, projectCode:{}, processCode:{}, processVersion:{}.", |
| processDefinition.getProjectCode(), processDefinition.getCode(), processDefinition.getVersion()); |
| putMsg(result, Status.SUCCESS); |
| result.put(Constants.DATA_LIST, processDefinition); |
| } |
| return result; |
| } |
| |
| /** |
| * verify process definition name unique |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @param name name |
| * @return true if process definition name not exists, otherwise false |
| */ |
| @Override |
| public Map<String, Object> verifyProcessDefinitionName(User loginUser, long projectCode, String name, |
| long processDefinitionCode) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = |
| projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_CREATE); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| ProcessDefinition processDefinition = |
| processDefinitionMapper.verifyByDefineName(project.getCode(), name.trim()); |
| if (processDefinition == null) { |
| putMsg(result, Status.SUCCESS); |
| return result; |
| } |
| if (processDefinitionCode != 0 && processDefinitionCode == processDefinition.getCode()) { |
| putMsg(result, Status.SUCCESS); |
| return result; |
| } |
| log.warn("Process definition with the same name {} already exists, processDefinitionCode:{}.", |
| processDefinition.getName(), processDefinition.getCode()); |
| putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name.trim()); |
| return result; |
| } |
| |
| @Override |
| @Transactional |
| public Map<String, Object> batchDeleteProcessDefinitionByCodes(User loginUser, long projectCode, String codes) { |
| Map<String, Object> result = new HashMap<>(); |
| if (StringUtils.isEmpty(codes)) { |
| log.error("Parameter processDefinitionCodes is empty, projectCode is {}.", projectCode); |
| putMsg(result, Status.PROCESS_DEFINITION_CODES_IS_EMPTY); |
| return result; |
| } |
| |
| Set<Long> definitionCodes = Lists.newArrayList(codes.split(Constants.COMMA)).stream().map(Long::parseLong) |
| .collect(Collectors.toSet()); |
| List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryByCodes(definitionCodes); |
| Set<Long> queryCodes = |
| processDefinitionList.stream().map(ProcessDefinition::getCode).collect(Collectors.toSet()); |
| // definitionCodes - queryCodes |
| Set<Long> diffCode = |
| definitionCodes.stream().filter(code -> !queryCodes.contains(code)).collect(Collectors.toSet()); |
| |
| if (CollectionUtils.isNotEmpty(diffCode)) { |
| log.error("Process definition does not exist, processCodes:{}.", |
| diffCode.stream().map(String::valueOf).collect(Collectors.joining(Constants.COMMA))); |
| throw new ServiceException(Status.BATCH_DELETE_PROCESS_DEFINE_BY_CODES_ERROR, |
| diffCode.stream().map(code -> code + "[process definition not exist]") |
| .collect(Collectors.joining(Constants.COMMA))); |
| } |
| |
| for (ProcessDefinition process : processDefinitionList) { |
| try { |
| this.deleteProcessDefinitionByCode(loginUser, process.getCode()); |
| metricsCleanUpService.cleanUpWorkflowMetricsByDefinitionCode(String.valueOf(process.getCode())); |
| } catch (Exception e) { |
| throw new ServiceException(Status.DELETE_PROCESS_DEFINE_ERROR, process.getName(), e.getMessage()); |
| } |
| } |
| putMsg(result, Status.SUCCESS); |
| return result; |
| } |
| |
| /** |
| * Process definition want to delete whether used in other task, should throw exception when have be used. |
| * |
| * This function avoid delete process definition already dependencies by other tasks by accident. |
| * |
| * @param processDefinition ProcessDefinition you change task definition and task relation |
| */ |
| private void processDefinitionUsedInOtherTaskValid(ProcessDefinition processDefinition) { |
| // check process definition is already online |
| if (processDefinition.getReleaseState() == ReleaseState.ONLINE) { |
| throw new ServiceException(Status.PROCESS_DEFINE_STATE_ONLINE, processDefinition.getName()); |
| } |
| |
| // check process instances is already running |
| List<ProcessInstance> processInstances = processInstanceService |
| .queryByProcessDefineCodeAndStatus(processDefinition.getCode(), |
| org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES); |
| if (CollectionUtils.isNotEmpty(processInstances)) { |
| throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_EXECUTING_FAIL, processInstances.size()); |
| } |
| |
| // check process used by other task, including subprocess and dependent task type |
| Set<TaskMainInfo> taskDepOnProcess = workFlowLineageService |
| .queryTaskDepOnProcess(processDefinition.getProjectCode(), processDefinition.getCode()); |
| if (CollectionUtils.isNotEmpty(taskDepOnProcess)) { |
| String taskDepDetail = taskDepOnProcess.stream() |
| .map(task -> String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(), |
| task.getTaskName())) |
| .collect(Collectors.joining(Constants.COMMA)); |
| throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL, taskDepDetail); |
| } |
| } |
| |
| public void deleteProcessDefinitionByCode(User loginUser, long code) { |
| ProcessDefinition processDefinition = processDefinitionDao.queryByCode(code) |
| .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, String.valueOf(code))); |
| |
| Project project = projectMapper.queryByCode(processDefinition.getProjectCode()); |
| // check user access for project |
| projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION_DELETE); |
| |
| // Determine if the login user is the owner of the process definition |
| if (loginUser.getId() != processDefinition.getUserId() && loginUser.getUserType() != UserType.ADMIN_USER) { |
| throw new ServiceException(Status.USER_NO_OPERATION_PERM); |
| } |
| |
| processDefinitionUsedInOtherTaskValid(processDefinition); |
| |
| // get the timing according to the process definition |
| Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(code); |
| if (scheduleObj != null) { |
| if (scheduleObj.getReleaseState() == ReleaseState.OFFLINE) { |
| int delete = scheduleMapper.deleteById(scheduleObj.getId()); |
| if (delete == 0) { |
| throw new ServiceException(Status.DELETE_SCHEDULE_BY_ID_ERROR); |
| } |
| } |
| if (scheduleObj.getReleaseState() == ReleaseState.ONLINE) { |
| throw new ServiceException(Status.SCHEDULE_STATE_ONLINE, scheduleObj.getId()); |
| } |
| } |
| |
| // delete workflow instance, will delete workflow instance, sub workflow instance, task instance, alert |
| processInstanceService.deleteProcessInstanceByWorkflowDefinitionCode(processDefinition.getCode()); |
| // delete task definition |
| taskDefinitionService.deleteTaskByWorkflowDefinitionCode(processDefinition.getCode(), |
| processDefinition.getVersion()); |
| // delete task definition log |
| taskDefinitionLogService.deleteTaskByWorkflowDefinitionCode(processDefinition.getCode()); |
| // delete workflow definition log |
| processDefinitionLogDao.deleteByWorkflowDefinitionCode(processDefinition.getCode()); |
| deleteOtherRelation(project, new HashMap<>(), processDefinition); |
| |
| // we delete the workflow definition at last to avoid using transaction here. |
| // If delete error, we can call this interface again. |
| processDefinitionDao.deleteByWorkflowDefinitionCode(processDefinition.getCode()); |
| metricsCleanUpService.cleanUpWorkflowMetricsByDefinitionCode(String.valueOf(code)); |
| log.info("Success delete workflow definition workflowDefinitionCode: {}", code); |
| } |
| |
| /** |
| * release process definition: online / offline |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @param code process definition code |
| * @param releaseState release state |
| * @return release result code |
| */ |
| @Override |
| @Transactional |
| public Map<String, Object> releaseProcessDefinition(User loginUser, long projectCode, long code, |
| ReleaseState releaseState) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = |
| projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_ONLINE_OFFLINE); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| |
| // check state |
| if (null == releaseState) { |
| putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); |
| return result; |
| } |
| |
| ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); |
| if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { |
| log.error("Process definition does not exist, processDefinitionCode:{}.", code); |
| putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code)); |
| return result; |
| } |
| switch (releaseState) { |
| case ONLINE: |
| List<ProcessTaskRelation> relationList = |
| processService.findRelationByCode(code, processDefinition.getVersion()); |
| if (CollectionUtils.isEmpty(relationList)) { |
| log.warn("Process definition has no task relation, processDefinitionCode:{}.", code); |
| putMsg(result, Status.PROCESS_DAG_IS_EMPTY); |
| return result; |
| } |
| processDefinition.setReleaseState(releaseState); |
| processDefinitionMapper.updateById(processDefinition); |
| log.info("Set process definition online, projectCode:{}, processDefinitionCode:{}.", projectCode, |
| code); |
| break; |
| case OFFLINE: |
| processDefinition.setReleaseState(releaseState); |
| int updateProcess = processDefinitionMapper.updateById(processDefinition); |
| Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(code); |
| if (updateProcess > 0) { |
| log.info("Set process definition offline, projectCode:{}, processDefinitionCode:{}.", |
| projectCode, code); |
| if (schedule != null) { |
| // set status |
| schedule.setReleaseState(releaseState); |
| int updateSchedule = scheduleMapper.updateById(schedule); |
| if (updateSchedule == 0) { |
| log.error( |
| "Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", |
| projectCode, code, schedule.getId()); |
| putMsg(result, Status.OFFLINE_SCHEDULE_ERROR); |
| throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR); |
| } else { |
| log.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", |
| projectCode, code, schedule.getId()); |
| } |
| schedulerService.deleteSchedule(project.getId(), schedule.getId()); |
| } |
| } |
| break; |
| default: |
| putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); |
| return result; |
| } |
| |
| putMsg(result, Status.SUCCESS); |
| return result; |
| } |
| |
| /** |
| * batch export process definition by codes |
| */ |
| @Override |
| public void batchExportProcessDefinitionByCodes(User loginUser, long projectCode, String codes, |
| HttpServletResponse response) { |
| if (StringUtils.isEmpty(codes)) { |
| log.warn("Process definition codes to be exported is empty."); |
| return; |
| } |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = |
| projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION_EXPORT); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return; |
| } |
| Set<Long> defineCodeSet = Lists.newArrayList(codes.split(Constants.COMMA)).stream().map(Long::parseLong) |
| .collect(Collectors.toSet()); |
| List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryByCodes(defineCodeSet); |
| if (CollectionUtils.isEmpty(processDefinitionList)) { |
| log.error("Process definitions to be exported do not exist, processDefinitionCodes:{}.", defineCodeSet); |
| return; |
| } |
| // check processDefinition exist in project |
| List<ProcessDefinition> processDefinitionListInProject = processDefinitionList.stream() |
| .filter(o -> projectCode == o.getProjectCode()).collect(Collectors.toList()); |
| List<DagDataSchedule> dagDataSchedules = |
| processDefinitionListInProject.stream().map(this::exportProcessDagData).collect(Collectors.toList()); |
| if (CollectionUtils.isNotEmpty(dagDataSchedules)) { |
| log.info("Start download process definition file, processDefinitionCodes:{}.", defineCodeSet); |
| downloadProcessDefinitionFile(response, dagDataSchedules); |
| } else { |
| log.error("There is no exported process dag data."); |
| } |
| } |
| |
| /** |
| * download the process definition file |
| */ |
| protected void downloadProcessDefinitionFile(HttpServletResponse response, List<DagDataSchedule> dagDataSchedules) { |
| response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE); |
| BufferedOutputStream buff = null; |
| ServletOutputStream out = null; |
| try { |
| out = response.getOutputStream(); |
| buff = new BufferedOutputStream(out); |
| buff.write(JSONUtils.toPrettyJsonString(dagDataSchedules).getBytes(StandardCharsets.UTF_8)); |
| buff.flush(); |
| buff.close(); |
| } catch (IOException e) { |
| log.warn("Export process definition fail", e); |
| } finally { |
| if (null != buff) { |
| try { |
| buff.close(); |
| } catch (Exception e) { |
| log.warn("Buffer does not close", e); |
| } |
| } |
| if (null != out) { |
| try { |
| out.close(); |
| } catch (Exception e) { |
| log.warn("Output stream does not close", e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * get export process dag data |
| * |
| * @param processDefinition process definition |
| * @return DagDataSchedule |
| */ |
| public DagDataSchedule exportProcessDagData(ProcessDefinition processDefinition) { |
| Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinition.getCode()); |
| DagDataSchedule dagDataSchedule = new DagDataSchedule(processService.genDagData(processDefinition)); |
| if (scheduleObj != null) { |
| scheduleObj.setReleaseState(ReleaseState.OFFLINE); |
| dagDataSchedule.setSchedule(scheduleObj); |
| } |
| return dagDataSchedule; |
| } |
| |
| /** |
| * import process definition |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @param file process metadata json file |
| * @return import process |
| */ |
| @Override |
| @Transactional |
| public Map<String, Object> importProcessDefinition(User loginUser, long projectCode, MultipartFile file) { |
| Map<String, Object> result; |
| String dagDataScheduleJson = FileUtils.file2String(file); |
| List<DagDataSchedule> dagDataScheduleList = JSONUtils.toList(dagDataScheduleJson, DagDataSchedule.class); |
| Project project = projectMapper.queryByCode(projectCode); |
| result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_IMPORT); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| // check file content |
| if (CollectionUtils.isEmpty(dagDataScheduleList)) { |
| log.warn("Process definition file content is empty."); |
| putMsg(result, Status.DATA_IS_NULL, "fileContent"); |
| return result; |
| } |
| for (DagDataSchedule dagDataSchedule : dagDataScheduleList) { |
| if (!checkAndImport(loginUser, projectCode, result, dagDataSchedule, EMPTY_STRING)) { |
| return result; |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| @Transactional |
| public Map<String, Object> importSqlProcessDefinition(User loginUser, long projectCode, MultipartFile file) { |
| Map<String, Object> result; |
| Project project = projectMapper.queryByCode(projectCode); |
| result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_IMPORT); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| String processDefinitionName = file.getOriginalFilename() == null ? file.getName() : file.getOriginalFilename(); |
| int index = processDefinitionName.lastIndexOf("."); |
| if (index > 0) { |
| processDefinitionName = processDefinitionName.substring(0, index); |
| } |
| processDefinitionName = getNewName(processDefinitionName, IMPORT_SUFFIX); |
| |
| ProcessDefinition processDefinition; |
| List<TaskDefinitionLog> taskDefinitionList = new ArrayList<>(); |
| List<ProcessTaskRelationLog> processTaskRelationList = new ArrayList<>(); |
| |
| // for Zip Bomb Attack |
| final int THRESHOLD_ENTRIES = 10000; |
| final int THRESHOLD_SIZE = 1000000000; // 1 GB |
| final double THRESHOLD_RATIO = 10; |
| int totalEntryArchive = 0; |
| int totalSizeEntry = 0; |
| // In most cases, there will be only one data source |
| Map<String, DataSource> dataSourceCache = new HashMap<>(1); |
| Map<String, Long> taskNameToCode = new HashMap<>(16); |
| Map<String, List<String>> taskNameToUpstream = new HashMap<>(16); |
| try ( |
| ZipInputStream zIn = new ZipInputStream(file.getInputStream()); |
| BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(zIn))) { |
| // build process definition |
| processDefinition = new ProcessDefinition(projectCode, |
| processDefinitionName, |
| CodeGenerateUtils.getInstance().genCode(), |
| "", |
| "[]", null, |
| 0, loginUser.getId()); |
| |
| ZipEntry entry; |
| while ((entry = zIn.getNextEntry()) != null) { |
| totalEntryArchive++; |
| int totalSizeArchive = 0; |
| if (!entry.isDirectory()) { |
| StringBuilder sql = new StringBuilder(); |
| String taskName = null; |
| String datasourceName = null; |
| List<String> upstreams = Collections.emptyList(); |
| String line; |
| while ((line = bufferedReader.readLine()) != null) { |
| int nBytes = line.getBytes(StandardCharsets.UTF_8).length; |
| totalSizeEntry += nBytes; |
| totalSizeArchive += nBytes; |
| long compressionRatio = totalSizeEntry / entry.getCompressedSize(); |
| if (compressionRatio > THRESHOLD_RATIO) { |
| throw new IllegalStateException( |
| "Ratio between compressed and uncompressed data is highly suspicious, looks like a Zip Bomb Attack."); |
| } |
| int commentIndex = line.indexOf("-- "); |
| if (commentIndex >= 0) { |
| int colonIndex = line.indexOf(":", commentIndex); |
| if (colonIndex > 0) { |
| String key = line.substring(commentIndex + 3, colonIndex).trim().toLowerCase(); |
| String value = line.substring(colonIndex + 1).trim(); |
| switch (key) { |
| case "name": |
| taskName = value; |
| line = line.substring(0, commentIndex); |
| break; |
| case "upstream": |
| upstreams = Arrays.stream(value.split(",")).map(String::trim) |
| .filter(s -> !"".equals(s)).collect(Collectors.toList()); |
| line = line.substring(0, commentIndex); |
| break; |
| case "datasource": |
| datasourceName = value; |
| line = line.substring(0, commentIndex); |
| break; |
| default: |
| break; |
| } |
| } |
| } |
| if (!"".equals(line)) { |
| sql.append(line).append("\n"); |
| } |
| } |
| // import/sql1.sql -> sql1 |
| if (taskName == null) { |
| taskName = entry.getName(); |
| index = taskName.indexOf("/"); |
| if (index > 0) { |
| taskName = taskName.substring(index + 1); |
| } |
| index = taskName.lastIndexOf("."); |
| if (index > 0) { |
| taskName = taskName.substring(0, index); |
| } |
| } |
| DataSource dataSource = dataSourceCache.get(datasourceName); |
| if (dataSource == null) { |
| dataSource = queryDatasourceByNameAndUser(datasourceName, loginUser); |
| } |
| if (dataSource == null) { |
| log.error("Datasource does not found, may be its name is illegal."); |
| putMsg(result, Status.DATASOURCE_NAME_ILLEGAL); |
| return result; |
| } |
| dataSourceCache.put(datasourceName, dataSource); |
| |
| TaskDefinitionLog taskDefinition = |
| buildNormalSqlTaskDefinition(taskName, dataSource, sql.substring(0, sql.length() - 1)); |
| |
| taskDefinitionList.add(taskDefinition); |
| taskNameToCode.put(taskDefinition.getName(), taskDefinition.getCode()); |
| taskNameToUpstream.put(taskDefinition.getName(), upstreams); |
| } |
| |
| if (totalSizeArchive > THRESHOLD_SIZE) { |
| throw new IllegalStateException( |
| "the uncompressed data size is too much for the application resource capacity"); |
| } |
| |
| if (totalEntryArchive > THRESHOLD_ENTRIES) { |
| throw new IllegalStateException( |
| "too much entries in this archive, can lead to inodes exhaustion of the system"); |
| } |
| } |
| } catch (Exception e) { |
| log.error("Import process definition error.", e); |
| putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR); |
| return result; |
| } |
| |
| // build task relation |
| for (Map.Entry<String, Long> entry : taskNameToCode.entrySet()) { |
| List<String> upstreams = taskNameToUpstream.get(entry.getKey()); |
| if (CollectionUtils.isEmpty(upstreams) |
| || (upstreams.size() == 1 && upstreams.contains("root") && !taskNameToCode.containsKey("root"))) { |
| ProcessTaskRelationLog processTaskRelation = buildNormalTaskRelation(0, entry.getValue()); |
| processTaskRelationList.add(processTaskRelation); |
| continue; |
| } |
| for (String upstream : upstreams) { |
| ProcessTaskRelationLog processTaskRelation = |
| buildNormalTaskRelation(taskNameToCode.get(upstream), entry.getValue()); |
| processTaskRelationList.add(processTaskRelation); |
| } |
| } |
| |
| return createDagDefine(loginUser, processTaskRelationList, processDefinition, taskDefinitionList, EMPTY_STRING); |
| } |
| |
| private ProcessTaskRelationLog buildNormalTaskRelation(long preTaskCode, long postTaskCode) { |
| ProcessTaskRelationLog processTaskRelation = new ProcessTaskRelationLog(); |
| processTaskRelation.setPreTaskCode(preTaskCode); |
| processTaskRelation.setPreTaskVersion(0); |
| processTaskRelation.setPostTaskCode(postTaskCode); |
| processTaskRelation.setPostTaskVersion(0); |
| processTaskRelation.setConditionType(ConditionType.NONE); |
| processTaskRelation.setName(""); |
| return processTaskRelation; |
| } |
| |
| private DataSource queryDatasourceByNameAndUser(String datasourceName, User loginUser) { |
| if (isAdmin(loginUser)) { |
| List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(datasourceName); |
| if (CollectionUtils.isNotEmpty(dataSources)) { |
| return dataSources.get(0); |
| } |
| } else { |
| return dataSourceMapper.queryDataSourceByNameAndUserId(loginUser.getId(), datasourceName); |
| } |
| return null; |
| } |
| |
| private TaskDefinitionLog buildNormalSqlTaskDefinition(String taskName, DataSource dataSource, |
| String sql) throws CodeGenerateException { |
| TaskDefinitionLog taskDefinition = new TaskDefinitionLog(); |
| taskDefinition.setName(taskName); |
| taskDefinition.setFlag(Flag.YES); |
| SqlParameters sqlParameters = new SqlParameters(); |
| sqlParameters.setType(dataSource.getType().name()); |
| sqlParameters.setDatasource(dataSource.getId()); |
| sqlParameters.setSql(sql.substring(0, sql.length() - 1)); |
| // it may be a query type, but it can only be determined by parsing SQL |
| sqlParameters.setSqlType(SqlType.NON_QUERY.ordinal()); |
| sqlParameters.setLocalParams(Collections.emptyList()); |
| taskDefinition.setTaskParams(JSONUtils.toJsonString(sqlParameters)); |
| taskDefinition.setCode(CodeGenerateUtils.getInstance().genCode()); |
| taskDefinition.setTaskType(TASK_TYPE_SQL); |
| taskDefinition.setFailRetryTimes(0); |
| taskDefinition.setFailRetryInterval(0); |
| taskDefinition.setTimeoutFlag(TimeoutFlag.CLOSE); |
| taskDefinition.setWorkerGroup(DEFAULT_WORKER_GROUP); |
| taskDefinition.setTaskPriority(Priority.MEDIUM); |
| taskDefinition.setEnvironmentCode(-1); |
| taskDefinition.setTimeout(0); |
| taskDefinition.setDelayTime(0); |
| taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN); |
| taskDefinition.setVersion(0); |
| taskDefinition.setResourceIds(""); |
| return taskDefinition; |
| } |
| |
| /** |
| * check and import |
| */ |
| protected boolean checkAndImport(User loginUser, long projectCode, Map<String, Object> result, |
| DagDataSchedule dagDataSchedule, String otherParamsJson) { |
| if (!checkImportanceParams(dagDataSchedule, result)) { |
| return false; |
| } |
| ProcessDefinition processDefinition = dagDataSchedule.getProcessDefinition(); |
| |
| // generate import processDefinitionName |
| String processDefinitionName = recursionProcessDefinitionName(projectCode, processDefinition.getName(), 1); |
| String importProcessDefinitionName = getNewName(processDefinitionName, IMPORT_SUFFIX); |
| // unique check |
| Map<String, Object> checkResult = |
| verifyProcessDefinitionName(loginUser, projectCode, importProcessDefinitionName, 0); |
| if (Status.SUCCESS.equals(checkResult.get(Constants.STATUS))) { |
| putMsg(result, Status.SUCCESS); |
| } else { |
| result.putAll(checkResult); |
| return false; |
| } |
| processDefinition.setName(importProcessDefinitionName); |
| processDefinition.setId(null); |
| processDefinition.setProjectCode(projectCode); |
| processDefinition.setUserId(loginUser.getId()); |
| try { |
| processDefinition.setCode(CodeGenerateUtils.getInstance().genCode()); |
| } catch (CodeGenerateException e) { |
| log.error( |
| "Save process definition error because generate process definition code error, projectCode:{}.", |
| projectCode, e); |
| putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR); |
| return false; |
| } |
| List<TaskDefinition> taskDefinitionList = dagDataSchedule.getTaskDefinitionList(); |
| Map<Long, Long> taskCodeMap = new HashMap<>(); |
| Date now = new Date(); |
| List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>(); |
| for (TaskDefinition taskDefinition : taskDefinitionList) { |
| TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(taskDefinition); |
| taskDefinitionLog.setName(taskDefinitionLog.getName()); |
| taskDefinitionLog.setProjectCode(projectCode); |
| taskDefinitionLog.setUserId(loginUser.getId()); |
| taskDefinitionLog.setVersion(Constants.VERSION_FIRST); |
| taskDefinitionLog.setCreateTime(now); |
| taskDefinitionLog.setUpdateTime(now); |
| taskDefinitionLog.setOperator(loginUser.getId()); |
| taskDefinitionLog.setOperateTime(now); |
| try { |
| long code = CodeGenerateUtils.getInstance().genCode(); |
| taskCodeMap.put(taskDefinitionLog.getCode(), code); |
| taskDefinitionLog.setCode(code); |
| } catch (CodeGenerateException e) { |
| log.error("Generate task definition code error, projectCode:{}, processDefinitionCode:{}", |
| projectCode, processDefinition.getCode(), e); |
| putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code"); |
| return false; |
| } |
| taskDefinitionLogList.add(taskDefinitionLog); |
| } |
| int insert = taskDefinitionMapper.batchInsert(taskDefinitionLogList); |
| int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogList); |
| if ((logInsert & insert) == 0) { |
| log.error("Save task definition error, projectCode:{}, processDefinitionCode:{}", projectCode, |
| processDefinition.getCode()); |
| putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); |
| throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); |
| } |
| |
| List<ProcessTaskRelation> taskRelationList = dagDataSchedule.getProcessTaskRelationList(); |
| List<ProcessTaskRelationLog> taskRelationLogList = new ArrayList<>(); |
| for (ProcessTaskRelation processTaskRelation : taskRelationList) { |
| ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); |
| if (taskCodeMap.containsKey(processTaskRelationLog.getPreTaskCode())) { |
| processTaskRelationLog.setPreTaskCode(taskCodeMap.get(processTaskRelationLog.getPreTaskCode())); |
| } |
| if (taskCodeMap.containsKey(processTaskRelationLog.getPostTaskCode())) { |
| processTaskRelationLog.setPostTaskCode(taskCodeMap.get(processTaskRelationLog.getPostTaskCode())); |
| } |
| processTaskRelationLog.setPreTaskVersion(Constants.VERSION_FIRST); |
| processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST); |
| taskRelationLogList.add(processTaskRelationLog); |
| } |
| if (StringUtils.isNotEmpty(processDefinition.getLocations()) |
| && JSONUtils.checkJsonValid(processDefinition.getLocations())) { |
| ArrayNode arrayNode = JSONUtils.parseArray(processDefinition.getLocations()); |
| ArrayNode newArrayNode = JSONUtils.createArrayNode(); |
| for (int i = 0; i < arrayNode.size(); i++) { |
| ObjectNode newObjectNode = newArrayNode.addObject(); |
| JsonNode jsonNode = arrayNode.get(i); |
| Long taskCode = taskCodeMap.get(jsonNode.get("taskCode").asLong()); |
| if (Objects.nonNull(taskCode)) { |
| newObjectNode.put("taskCode", taskCode); |
| newObjectNode.set("x", jsonNode.get("x")); |
| newObjectNode.set("y", jsonNode.get("y")); |
| } |
| } |
| processDefinition.setLocations(newArrayNode.toString()); |
| } |
| processDefinition.setCreateTime(new Date()); |
| processDefinition.setUpdateTime(new Date()); |
| Map<String, Object> createDagResult = createDagDefine(loginUser, taskRelationLogList, processDefinition, |
| Lists.newArrayList(), otherParamsJson); |
| if (Status.SUCCESS.equals(createDagResult.get(Constants.STATUS))) { |
| putMsg(createDagResult, Status.SUCCESS); |
| } else { |
| result.putAll(createDagResult); |
| log.error("Import process definition error, projectCode:{}, processDefinitionCode:{}.", projectCode, |
| processDefinition.getCode()); |
| throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR); |
| } |
| |
| Schedule schedule = dagDataSchedule.getSchedule(); |
| if (null != schedule) { |
| ProcessDefinition newProcessDefinition = processDefinitionMapper.queryByCode(processDefinition.getCode()); |
| schedule.setProcessDefinitionCode(newProcessDefinition.getCode()); |
| schedule.setId(null); |
| schedule.setUserId(loginUser.getId()); |
| schedule.setCreateTime(now); |
| schedule.setUpdateTime(now); |
| int scheduleInsert = scheduleMapper.insert(schedule); |
| if (0 == scheduleInsert) { |
| log.error( |
| "Import process definition error due to save schedule fail, projectCode:{}, processDefinitionCode:{}.", |
| projectCode, processDefinition.getCode()); |
| putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR); |
| throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR); |
| } |
| } |
| |
| log.info("Import process definition complete, projectCode:{}, processDefinitionCode:{}.", projectCode, |
| processDefinition.getCode()); |
| return true; |
| } |
| |
| /** |
| * check importance params |
| */ |
| private boolean checkImportanceParams(DagDataSchedule dagDataSchedule, Map<String, Object> result) { |
| if (dagDataSchedule.getProcessDefinition() == null) { |
| log.warn("Process definition is null."); |
| putMsg(result, Status.DATA_IS_NULL, "ProcessDefinition"); |
| return false; |
| } |
| if (CollectionUtils.isEmpty(dagDataSchedule.getTaskDefinitionList())) { |
| log.warn("Task definition list is null."); |
| putMsg(result, Status.DATA_IS_NULL, "TaskDefinitionList"); |
| return false; |
| } |
| if (CollectionUtils.isEmpty(dagDataSchedule.getProcessTaskRelationList())) { |
| log.warn("Process task relation list is null."); |
| putMsg(result, Status.DATA_IS_NULL, "ProcessTaskRelationList"); |
| return false; |
| } |
| return true; |
| } |
| |
| private String recursionProcessDefinitionName(long projectCode, String processDefinitionName, int num) { |
| ProcessDefinition processDefinition = |
| processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName); |
| if (processDefinition != null) { |
| if (num > 1) { |
| String str = processDefinitionName.substring(0, processDefinitionName.length() - 3); |
| processDefinitionName = str + "(" + num + ")"; |
| } else { |
| processDefinitionName = processDefinition.getName() + "(" + num + ")"; |
| } |
| } else { |
| return processDefinitionName; |
| } |
| return recursionProcessDefinitionName(projectCode, processDefinitionName, num + 1); |
| } |
| |
| /** |
| * check the process task relation json |
| * |
| * @param processTaskRelationJson process task relation json |
| * @return check result code |
| */ |
| @Override |
| public Map<String, Object> checkProcessNodeList(String processTaskRelationJson, |
| List<TaskDefinitionLog> taskDefinitionLogsList) { |
| Map<String, Object> result = new HashMap<>(); |
| try { |
| if (processTaskRelationJson == null) { |
| log.error("Process task relation data is null."); |
| putMsg(result, Status.DATA_IS_NOT_VALID, processTaskRelationJson); |
| return result; |
| } |
| |
| List<ProcessTaskRelation> taskRelationList = |
| JSONUtils.toList(processTaskRelationJson, ProcessTaskRelation.class); |
| // Check whether the task node is normal |
| List<TaskNode> taskNodes = processService.transformTask(taskRelationList, taskDefinitionLogsList); |
| |
| if (CollectionUtils.isEmpty(taskNodes)) { |
| log.error("Task node data is empty."); |
| putMsg(result, Status.PROCESS_DAG_IS_EMPTY); |
| return result; |
| } |
| |
| // check has cycle |
| if (graphHasCycle(taskNodes)) { |
| log.error("Process DAG has cycle."); |
| putMsg(result, Status.PROCESS_NODE_HAS_CYCLE); |
| return result; |
| } |
| |
| // check whether the process definition json is normal |
| for (TaskNode taskNode : taskNodes) { |
| if (!taskPluginManager.checkTaskParameters(ParametersNode.builder() |
| .taskType(taskNode.getType()) |
| .taskParams(taskNode.getTaskParams()) |
| .dependence(taskNode.getDependence()) |
| .switchResult(taskNode.getSwitchResult()) |
| .build())) { |
| log.error("Task node {} parameter invalid.", taskNode.getName()); |
| putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskNode.getName()); |
| return result; |
| } |
| |
| // check extra params |
| CheckUtils.checkOtherParams(taskNode.getExtras()); |
| } |
| putMsg(result, Status.SUCCESS); |
| } catch (Exception e) { |
| result.put(Constants.STATUS, Status.INTERNAL_SERVER_ERROR_ARGS); |
| putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, e.getMessage()); |
| log.error(Status.INTERNAL_SERVER_ERROR_ARGS.getMsg(), e); |
| } |
| return result; |
| } |
| |
| /** |
| * get task node details based on process definition |
| * |
| * @param loginUser loginUser |
| * @param projectCode project code |
| * @param code process definition code |
| * @return task node list |
| */ |
| @Override |
| public Map<String, Object> getTaskNodeListByDefinitionCode(User loginUser, long projectCode, long code) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, null); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); |
| if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { |
| log.error("Process definition does not exist, processDefinitionCode:{}.", code); |
| putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code)); |
| return result; |
| } |
| DagData dagData = processService.genDagData(processDefinition); |
| result.put(Constants.DATA_LIST, dagData.getTaskDefinitionList()); |
| putMsg(result, Status.SUCCESS); |
| |
| return result; |
| } |
| |
| /** |
| * get task node details map based on process definition |
| * |
| * @param loginUser loginUser |
| * @param projectCode project code |
| * @param codes define codes |
| * @return task node list |
| */ |
| @Override |
| public Map<String, Object> getNodeListMapByDefinitionCodes(User loginUser, long projectCode, String codes) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, null); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| |
| Set<Long> defineCodeSet = Lists.newArrayList(codes.split(Constants.COMMA)).stream().map(Long::parseLong) |
| .collect(Collectors.toSet()); |
| List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryByCodes(defineCodeSet); |
| if (CollectionUtils.isEmpty(processDefinitionList)) { |
| log.error("Process definitions do not exist, codes:{}.", defineCodeSet); |
| putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes); |
| return result; |
| } |
| HashMap<Long, Project> userProjects = new HashMap<>(Constants.DEFAULT_HASH_MAP_SIZE); |
| projectMapper.queryProjectCreatedAndAuthorizedByUserId(loginUser.getId()) |
| .forEach(userProject -> userProjects.put(userProject.getCode(), userProject)); |
| |
| // check processDefinition exist in project |
| List<ProcessDefinition> processDefinitionListInProject = processDefinitionList.stream() |
| .filter(o -> userProjects.containsKey(o.getProjectCode())).collect(Collectors.toList()); |
| if (CollectionUtils.isEmpty(processDefinitionListInProject)) { |
| Set<Long> codesInProject = processDefinitionListInProject.stream() |
| .map(ProcessDefinition::getCode).collect(Collectors.toSet()); |
| log.error("Process definitions do not exist in project, projectCode:{}, processDefinitionsCodes:{}.", |
| processDefinitionListInProject.get(0).getProjectCode(), codesInProject); |
| putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes); |
| return result; |
| } |
| Map<Long, List<TaskDefinition>> taskNodeMap = new HashMap<>(); |
| for (ProcessDefinition processDefinition : processDefinitionListInProject) { |
| DagData dagData = processService.genDagData(processDefinition); |
| taskNodeMap.put(processDefinition.getCode(), dagData.getTaskDefinitionList()); |
| } |
| |
| result.put(Constants.DATA_LIST, taskNodeMap); |
| putMsg(result, Status.SUCCESS); |
| |
| return result; |
| |
| } |
| |
| /** |
| * query process definition all by project code |
| * |
| * @param loginUser loginUser |
| * @param projectCode project code |
| * @return process definitions in the project |
| */ |
| @Override |
| public Map<String, Object> queryAllProcessDefinitionByProjectCode(User loginUser, long projectCode) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = |
| projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| List<ProcessDefinition> processDefinitions = processDefinitionMapper.queryAllDefinitionList(projectCode); |
| List<DagData> dagDataList = |
| processDefinitions.stream().map(processService::genDagData).collect(Collectors.toList()); |
| result.put(Constants.DATA_LIST, dagDataList); |
| putMsg(result, Status.SUCCESS); |
| return result; |
| } |
| |
| /** |
| * query process definition list by project code |
| * |
| * @param projectCode project code |
| * @return process definition list in the project |
| */ |
| @Override |
| public Map<String, Object> queryProcessDefinitionListByProjectCode(long projectCode) { |
| Map<String, Object> result = new HashMap<>(); |
| List<DependentSimplifyDefinition> processDefinitions = |
| processDefinitionMapper.queryDefinitionListByProjectCodeAndProcessDefinitionCodes(projectCode, null); |
| result.put(Constants.DATA_LIST, processDefinitions); |
| putMsg(result, Status.SUCCESS); |
| return result; |
| } |
| |
| /** |
| * query process definition list by process definition code |
| * |
| * @param projectCode project code |
| * @param processDefinitionCode process definition code |
| * @return task definition list in the process definition |
| */ |
| @Override |
| public Map<String, Object> queryTaskDefinitionListByProcessDefinitionCode(long projectCode, |
| Long processDefinitionCode) { |
| Map<String, Object> result = new HashMap<>(); |
| |
| Set<Long> definitionCodesSet = new HashSet<>(); |
| definitionCodesSet.add(processDefinitionCode); |
| List<DependentSimplifyDefinition> processDefinitions = processDefinitionMapper |
| .queryDefinitionListByProjectCodeAndProcessDefinitionCodes(projectCode, definitionCodesSet); |
| |
| // query process task relation |
| List<ProcessTaskRelation> processTaskRelations = |
| processTaskRelationMapper.queryProcessTaskRelationsByProcessDefinitionCode( |
| processDefinitions.get(0).getCode(), |
| processDefinitions.get(0).getVersion()); |
| |
| // query task definition log |
| List<TaskDefinitionLog> taskDefinitionLogsList = |
| taskDefinitionLogDao.getTaskDefineLogList(processTaskRelations); |
| |
| List<DependentSimplifyDefinition> taskDefinitionList = new ArrayList<>(); |
| for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogsList) { |
| DependentSimplifyDefinition dependentSimplifyDefinition = new DependentSimplifyDefinition(); |
| dependentSimplifyDefinition.setCode(taskDefinitionLog.getCode()); |
| dependentSimplifyDefinition.setName(taskDefinitionLog.getName()); |
| dependentSimplifyDefinition.setVersion(taskDefinitionLog.getVersion()); |
| taskDefinitionList.add(dependentSimplifyDefinition); |
| } |
| |
| result.put(Constants.DATA_LIST, taskDefinitionList); |
| putMsg(result, Status.SUCCESS); |
| return result; |
| } |
| |
| /** |
| * Encapsulates the TreeView structure |
| * |
| * @param projectCode project code |
| * @param code process definition code |
| * @param limit limit |
| * @return tree view json data |
| */ |
| @Override |
| public Map<String, Object> viewTree(User loginUser, long projectCode, long code, Integer limit) { |
| Map<String, Object> result = new HashMap<>(); |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_TREE_VIEW); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); |
| if (null == processDefinition || projectCode != processDefinition.getProjectCode()) { |
| log.error("Process definition does not exist, code:{}.", code); |
| putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code)); |
| return result; |
| } |
| DAG<String, TaskNode, TaskNodeRelation> dag = processService.genDagGraph(processDefinition); |
| // nodes that are running |
| Map<String, List<TreeViewDto>> runningNodeMap = new ConcurrentHashMap<>(); |
| |
| // nodes that are waiting to run |
| Map<String, List<TreeViewDto>> waitingRunningNodeMap = new ConcurrentHashMap<>(); |
| |
| // List of process instances |
| List<ProcessInstance> processInstanceList = processInstanceService.queryByProcessDefineCode(code, limit); |
| processInstanceList.forEach(processInstance -> processInstance |
| .setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime()))); |
| List<TaskDefinitionLog> taskDefinitionList = taskDefinitionLogDao.getTaskDefineLogList(processTaskRelationMapper |
| .queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode())); |
| Map<Long, TaskDefinitionLog> taskDefinitionMap = taskDefinitionList.stream() |
| .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog)); |
| |
| if (limit < 0) { |
| putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR); |
| return result; |
| } |
| if (limit > processInstanceList.size()) { |
| limit = processInstanceList.size(); |
| } |
| |
| TreeViewDto parentTreeViewDto = new TreeViewDto(); |
| parentTreeViewDto.setName("DAG"); |
| parentTreeViewDto.setType(""); |
| parentTreeViewDto.setCode(0L); |
| // Specify the process definition, because it is a TreeView for a process definition |
| for (int i = limit - 1; i >= 0; i--) { |
| ProcessInstance processInstance = processInstanceList.get(i); |
| Date endTime = processInstance.getEndTime() == null ? new Date() : processInstance.getEndTime(); |
| parentTreeViewDto.getInstances() |
| .add(new Instance(processInstance.getId(), processInstance.getName(), |
| processInstance.getProcessDefinitionCode(), |
| "", processInstance.getState().name(), processInstance.getStartTime(), endTime, |
| processInstance.getHost(), |
| DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime()))); |
| } |
| |
| List<TreeViewDto> parentTreeViewDtoList = new ArrayList<>(); |
| parentTreeViewDtoList.add(parentTreeViewDto); |
| // Here is the encapsulation task instance |
| for (String startNode : dag.getBeginNode()) { |
| runningNodeMap.put(startNode, parentTreeViewDtoList); |
| } |
| |
| while (!ServerLifeCycleManager.isStopped()) { |
| Set<String> postNodeList; |
| Iterator<Map.Entry<String, List<TreeViewDto>>> iter = runningNodeMap.entrySet().iterator(); |
| while (iter.hasNext()) { |
| Map.Entry<String, List<TreeViewDto>> en = iter.next(); |
| String nodeCode = en.getKey(); |
| parentTreeViewDtoList = en.getValue(); |
| |
| TreeViewDto treeViewDto = new TreeViewDto(); |
| TaskNode taskNode = dag.getNode(nodeCode); |
| treeViewDto.setType(taskNode.getType()); |
| treeViewDto.setCode(taskNode.getCode()); |
| treeViewDto.setName(taskNode.getName()); |
| // set treeViewDto instances |
| for (int i = limit - 1; i >= 0; i--) { |
| ProcessInstance processInstance = processInstanceList.get(i); |
| TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndCode(processInstance.getId(), |
| Long.parseLong(nodeCode)); |
| if (taskInstance == null) { |
| treeViewDto.getInstances().add(new Instance(-1, "not running", 0, "null")); |
| } else { |
| Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime(); |
| Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime(); |
| |
| long subProcessCode = 0L; |
| // if process is sub process, the return sub id, or sub id=0 |
| if (taskInstance.isSubProcess()) { |
| TaskDefinition taskDefinition = taskDefinitionMap.get(taskInstance.getTaskCode()); |
| subProcessCode = Long.parseLong(JSONUtils.parseObject( |
| taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_CODE).asText()); |
| } |
| treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), |
| taskInstance.getTaskCode(), |
| taskInstance.getTaskType(), taskInstance.getState().name(), |
| taskInstance.getStartTime(), taskInstance.getEndTime(), |
| taskInstance.getHost(), |
| DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessCode)); |
| } |
| } |
| for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) { |
| pTreeViewDto.getChildren().add(treeViewDto); |
| } |
| postNodeList = dag.getSubsequentNodes(nodeCode); |
| if (CollectionUtils.isNotEmpty(postNodeList)) { |
| for (String nextNodeCode : postNodeList) { |
| List<TreeViewDto> treeViewDtoList = waitingRunningNodeMap.get(nextNodeCode); |
| if (CollectionUtils.isEmpty(treeViewDtoList)) { |
| treeViewDtoList = new ArrayList<>(); |
| } |
| treeViewDtoList.add(treeViewDto); |
| waitingRunningNodeMap.put(nextNodeCode, treeViewDtoList); |
| } |
| } |
| runningNodeMap.remove(nodeCode); |
| } |
| if (waitingRunningNodeMap.size() == 0) { |
| break; |
| } else { |
| runningNodeMap.putAll(waitingRunningNodeMap); |
| waitingRunningNodeMap.clear(); |
| } |
| } |
| result.put(Constants.DATA_LIST, parentTreeViewDto); |
| result.put(Constants.STATUS, Status.SUCCESS); |
| result.put(Constants.MSG, Status.SUCCESS.getMsg()); |
| return result; |
| } |
| |
| /** |
| * whether the graph has a ring |
| * |
| * @param taskNodeResponseList task node response list |
| * @return if graph has cycle flag |
| */ |
| private boolean graphHasCycle(List<TaskNode> taskNodeResponseList) { |
| DAG<String, TaskNode, String> graph = new DAG<>(); |
| // Fill the vertices |
| for (TaskNode taskNodeResponse : taskNodeResponseList) { |
| graph.addNode(Long.toString(taskNodeResponse.getCode()), taskNodeResponse); |
| } |
| // Fill edge relations |
| for (TaskNode taskNodeResponse : taskNodeResponseList) { |
| List<String> preTasks = JSONUtils.toList(taskNodeResponse.getPreTasks(), String.class); |
| if (CollectionUtils.isNotEmpty(preTasks)) { |
| for (String preTask : preTasks) { |
| if (!graph.addEdge(preTask, Long.toString(taskNodeResponse.getCode()))) { |
| return true; |
| } |
| } |
| } |
| } |
| return graph.hasCycle(); |
| } |
| |
| /** |
| * batch copy process definition |
| * |
| * @param loginUser loginUser |
| * @param projectCode projectCode |
| * @param codes processDefinitionCodes |
| * @param targetProjectCode targetProjectCode |
| */ |
| @Override |
| @Transactional |
| public Map<String, Object> batchCopyProcessDefinition(User loginUser, |
| long projectCode, |
| String codes, |
| long targetProjectCode) { |
| Map<String, Object> result = checkParams(loginUser, projectCode, codes, targetProjectCode, WORKFLOW_BATCH_COPY); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| List<String> failedProcessList = new ArrayList<>(); |
| doBatchOperateProcessDefinition(loginUser, targetProjectCode, failedProcessList, codes, result, true); |
| checkBatchOperateResult(projectCode, targetProjectCode, result, failedProcessList, true); |
| return result; |
| } |
| |
| /** |
| * batch move process definition |
| * Will be deleted |
| * @param loginUser loginUser |
| * @param projectCode projectCode |
| * @param codes processDefinitionCodes |
| * @param targetProjectCode targetProjectCode |
| */ |
| @Override |
| @Transactional |
| public Map<String, Object> batchMoveProcessDefinition(User loginUser, |
| long projectCode, |
| String codes, |
| long targetProjectCode) { |
| Map<String, Object> result = |
| checkParams(loginUser, projectCode, codes, targetProjectCode, TASK_DEFINITION_MOVE); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| if (projectCode == targetProjectCode) { |
| log.warn("Project code is same as target project code, projectCode:{}.", projectCode); |
| return result; |
| } |
| |
| List<String> failedProcessList = new ArrayList<>(); |
| doBatchOperateProcessDefinition(loginUser, targetProjectCode, failedProcessList, codes, result, false); |
| checkBatchOperateResult(projectCode, targetProjectCode, result, failedProcessList, false); |
| return result; |
| } |
| |
| private Map<String, Object> checkParams(User loginUser, |
| long projectCode, |
| String processDefinitionCodes, |
| long targetProjectCode, String perm) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, perm); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| |
| if (StringUtils.isEmpty(processDefinitionCodes)) { |
| log.error("Parameter processDefinitionCodes is empty, projectCode is {}.", projectCode); |
| putMsg(result, Status.PROCESS_DEFINITION_CODES_IS_EMPTY, processDefinitionCodes); |
| return result; |
| } |
| |
| if (projectCode != targetProjectCode) { |
| Project targetProject = projectMapper.queryByCode(targetProjectCode); |
| // check user access for project |
| Map<String, Object> targetResult = |
| projectService.checkProjectAndAuth(loginUser, targetProject, targetProjectCode, perm); |
| if (targetResult.get(Constants.STATUS) != Status.SUCCESS) { |
| return targetResult; |
| } |
| } |
| return result; |
| } |
| |
| protected void doBatchOperateProcessDefinition(User loginUser, |
| long targetProjectCode, |
| List<String> failedProcessList, |
| String processDefinitionCodes, |
| Map<String, Object> result, |
| boolean isCopy) { |
| Set<Long> definitionCodes = Arrays.stream(processDefinitionCodes.split(Constants.COMMA)).map(Long::parseLong) |
| .collect(Collectors.toSet()); |
| List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryByCodes(definitionCodes); |
| Set<Long> queryCodes = |
| processDefinitionList.stream().map(ProcessDefinition::getCode).collect(Collectors.toSet()); |
| // definitionCodes - queryCodes |
| Set<Long> diffCode = |
| definitionCodes.stream().filter(code -> !queryCodes.contains(code)).collect(Collectors.toSet()); |
| diffCode.forEach(code -> failedProcessList.add(code + "[null]")); |
| for (ProcessDefinition processDefinition : processDefinitionList) { |
| List<ProcessTaskRelation> processTaskRelations = |
| processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), |
| processDefinition.getCode()); |
| List<ProcessTaskRelationLog> taskRelationList = |
| processTaskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); |
| processDefinition.setProjectCode(targetProjectCode); |
| String otherParamsJson = doOtherOperateProcess(loginUser, processDefinition); |
| if (isCopy) { |
| log.info("Copy process definition..."); |
| List<TaskDefinitionLog> taskDefinitionLogs = |
| taskDefinitionLogDao.getTaskDefineLogList(processTaskRelations); |
| Map<Long, Long> taskCodeMap = new HashMap<>(); |
| for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { |
| try { |
| long taskCode = CodeGenerateUtils.getInstance().genCode(); |
| taskCodeMap.put(taskDefinitionLog.getCode(), taskCode); |
| taskDefinitionLog.setCode(taskCode); |
| } catch (CodeGenerateException e) { |
| log.error("Generate task definition code error, projectCode:{}.", targetProjectCode, e); |
| putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS); |
| throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS); |
| } |
| taskDefinitionLog.setProjectCode(targetProjectCode); |
| taskDefinitionLog.setVersion(0); |
| taskDefinitionLog.setName(taskDefinitionLog.getName()); |
| } |
| for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) { |
| if (processTaskRelationLog.getPreTaskCode() > 0) { |
| processTaskRelationLog.setPreTaskCode(taskCodeMap.get(processTaskRelationLog.getPreTaskCode())); |
| } |
| if (processTaskRelationLog.getPostTaskCode() > 0) { |
| processTaskRelationLog |
| .setPostTaskCode(taskCodeMap.get(processTaskRelationLog.getPostTaskCode())); |
| } |
| } |
| final long oldProcessDefinitionCode = processDefinition.getCode(); |
| try { |
| processDefinition.setCode(CodeGenerateUtils.getInstance().genCode()); |
| } catch (CodeGenerateException e) { |
| log.error("Generate process definition code error, projectCode:{}.", targetProjectCode, e); |
| putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS); |
| throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS); |
| } |
| processDefinition.setId(null); |
| processDefinition.setUserId(loginUser.getId()); |
| processDefinition.setName(getNewName(processDefinition.getName(), COPY_SUFFIX)); |
| final Date date = new Date(); |
| processDefinition.setCreateTime(date); |
| processDefinition.setUpdateTime(date); |
| processDefinition.setReleaseState(ReleaseState.OFFLINE); |
| if (StringUtils.isNotBlank(processDefinition.getLocations())) { |
| ArrayNode jsonNodes = JSONUtils.parseArray(processDefinition.getLocations()); |
| for (int i = 0; i < jsonNodes.size(); i++) { |
| ObjectNode node = (ObjectNode) jsonNodes.path(i); |
| node.put("taskCode", taskCodeMap.get(node.get("taskCode").asLong())); |
| jsonNodes.set(i, node); |
| } |
| processDefinition.setLocations(JSONUtils.toJsonString(jsonNodes)); |
| } |
| // copy timing configuration |
| Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(oldProcessDefinitionCode); |
| if (scheduleObj != null) { |
| scheduleObj.setId(null); |
| scheduleObj.setUserId(loginUser.getId()); |
| scheduleObj.setProcessDefinitionCode(processDefinition.getCode()); |
| scheduleObj.setReleaseState(ReleaseState.OFFLINE); |
| scheduleObj.setCreateTime(date); |
| scheduleObj.setUpdateTime(date); |
| int insertResult = scheduleMapper.insert(scheduleObj); |
| if (insertResult != 1) { |
| log.error("Schedule create error, processDefinitionCode:{}.", processDefinition.getCode()); |
| putMsg(result, Status.CREATE_SCHEDULE_ERROR); |
| throw new ServiceException(Status.CREATE_SCHEDULE_ERROR); |
| } |
| } |
| try { |
| result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs, |
| otherParamsJson)); |
| } catch (Exception e) { |
| log.error("Copy process definition error, processDefinitionCode from {} to {}.", |
| oldProcessDefinitionCode, processDefinition.getCode(), e); |
| putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR); |
| throw new ServiceException(Status.COPY_PROCESS_DEFINITION_ERROR); |
| } |
| } else { |
| log.info("Move process definition..."); |
| try { |
| result.putAll(updateDagDefine(loginUser, taskRelationList, processDefinition, null, |
| Lists.newArrayList(), otherParamsJson)); |
| } catch (Exception e) { |
| log.error("Move process definition error, processDefinitionCode:{}.", |
| processDefinition.getCode(), e); |
| putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR); |
| throw new ServiceException(Status.MOVE_PROCESS_DEFINITION_ERROR); |
| } |
| } |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| failedProcessList.add(processDefinition.getCode() + "[" + processDefinition.getName() + "]"); |
| } |
| } |
| } |
| |
| /** |
| * get new Task name or Process name when copy or import operate |
| * @param originalName Task or Process original name |
| * @param suffix "_copy_" or "_import_" |
| * @return new name |
| */ |
| public String getNewName(String originalName, String suffix) { |
| StringBuilder newName = new StringBuilder(); |
| String regex = String.format(".*%s\\d{17}$", suffix); |
| if (originalName.matches(regex)) { |
| // replace timestamp of originalName |
| return newName.append(originalName, 0, originalName.lastIndexOf(suffix)) |
| .append(suffix) |
| .append(DateUtils.getCurrentTimeStamp()) |
| .toString(); |
| } |
| return newName.append(originalName) |
| .append(suffix) |
| .append(DateUtils.getCurrentTimeStamp()) |
| .toString(); |
| } |
| |
| /** |
| * switch the defined process definition version |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @param code process definition code |
| * @param version the version user want to switch |
| * @return switch process definition version result code |
| */ |
| @Override |
| @Transactional |
| public Map<String, Object> switchProcessDefinitionVersion(User loginUser, long projectCode, long code, |
| int version) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = |
| projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_SWITCH_TO_THIS_VERSION); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| |
| ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); |
| if (Objects.isNull(processDefinition) || projectCode != processDefinition.getProjectCode()) { |
| log.error( |
| "Switch process definition error because it does not exist, projectCode:{}, processDefinitionCode:{}.", |
| projectCode, code); |
| putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR, code); |
| return result; |
| } |
| |
| ProcessDefinitionLog processDefinitionLog = |
| processDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, version); |
| if (Objects.isNull(processDefinitionLog)) { |
| log.error( |
| "Switch process definition error because version does not exist, projectCode:{}, processDefinitionCode:{}, version:{}.", |
| projectCode, code, version); |
| putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR, |
| processDefinition.getCode(), version); |
| return result; |
| } |
| int switchVersion = processService.switchVersion(processDefinition, processDefinitionLog); |
| if (switchVersion <= 0) { |
| log.error( |
| "Switch process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.", |
| projectCode, code, version); |
| putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR); |
| throw new ServiceException(Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR); |
| } |
| log.info("Switch process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.", |
| projectCode, code, version); |
| putMsg(result, Status.SUCCESS); |
| return result; |
| } |
| |
| /** |
| * check batch operate result |
| * |
| * @param srcProjectCode srcProjectCode |
| * @param targetProjectCode targetProjectCode |
| * @param result result |
| * @param failedProcessList failedProcessList |
| * @param isCopy isCopy |
| */ |
| private void checkBatchOperateResult(long srcProjectCode, long targetProjectCode, |
| Map<String, Object> result, List<String> failedProcessList, boolean isCopy) { |
| if (!failedProcessList.isEmpty()) { |
| String failedProcess = String.join(",", failedProcessList); |
| if (isCopy) { |
| log.error( |
| "Copy process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.", |
| srcProjectCode, targetProjectCode, failedProcess); |
| putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode, failedProcess); |
| } else { |
| log.error( |
| "Move process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.", |
| srcProjectCode, targetProjectCode, failedProcess); |
| putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode, failedProcess); |
| } |
| } else { |
| log.info("Batch {} process definition complete, srcProjectCode:{}, targetProjectCode:{}.", |
| isCopy ? "copy" : "move", srcProjectCode, targetProjectCode); |
| putMsg(result, Status.SUCCESS); |
| } |
| } |
| |
| /** |
| * query the pagination versions info by one certain process definition code |
| * |
| * @param loginUser login user info to check auth |
| * @param projectCode project code |
| * @param pageNo page number |
| * @param pageSize page size |
| * @param code process definition code |
| * @return the pagination process definition versions info of the certain process definition |
| */ |
| @Override |
| public Result queryProcessDefinitionVersions(User loginUser, long projectCode, int pageNo, int pageSize, |
| long code) { |
| Result result = new Result(); |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> checkResult = |
| projectService.checkProjectAndAuth(loginUser, project, projectCode, VERSION_LIST); |
| Status resultStatus = (Status) checkResult.get(Constants.STATUS); |
| if (resultStatus != Status.SUCCESS) { |
| putMsg(result, resultStatus); |
| return result; |
| } |
| PageInfo<ProcessDefinitionLog> pageInfo = new PageInfo<>(pageNo, pageSize); |
| Page<ProcessDefinitionLog> page = new Page<>(pageNo, pageSize); |
| IPage<ProcessDefinitionLog> processDefinitionVersionsPaging = |
| processDefinitionLogMapper.queryProcessDefinitionVersionsPaging(page, code, projectCode); |
| List<ProcessDefinitionLog> processDefinitionLogs = processDefinitionVersionsPaging.getRecords(); |
| |
| pageInfo.setTotalList(processDefinitionLogs); |
| pageInfo.setTotal((int) processDefinitionVersionsPaging.getTotal()); |
| result.setData(pageInfo); |
| putMsg(result, Status.SUCCESS); |
| return result; |
| } |
| |
| /** |
| * delete one certain process definition by version number and process definition code |
| * |
| * @param loginUser login user info to check auth |
| * @param projectCode project code |
| * @param code process definition code |
| * @param version version number |
| * @return delete result code |
| */ |
| @Override |
| @Transactional |
| public Map<String, Object> deleteProcessDefinitionVersion(User loginUser, long projectCode, long code, |
| int version) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check if user have write perm for project |
| Map<String, Object> result = |
| projectService.checkProjectAndAuth(loginUser, project, projectCode, VERSION_DELETE); |
| boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result); |
| if (!hasProjectAndWritePerm) { |
| return result; |
| } |
| |
| ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); |
| |
| if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { |
| log.error("Process definition does not exist, code:{}.", code); |
| putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code)); |
| } else { |
| if (processDefinition.getVersion() == version) { |
| log.warn( |
| "Process definition can not be deleted due to version is being used, projectCode:{}, processDefinitionCode:{}, version:{}.", |
| projectCode, code, version); |
| putMsg(result, Status.MAIN_TABLE_USING_VERSION); |
| return result; |
| } |
| int deleteLog = processDefinitionLogMapper.deleteByProcessDefinitionCodeAndVersion(code, version); |
| int deleteRelationLog = processTaskRelationLogMapper.deleteByCode(code, version); |
| if (deleteLog == 0 || deleteRelationLog == 0) { |
| log.error( |
| "Delete process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.", |
| projectCode, code, version); |
| putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR); |
| throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR); |
| } |
| deleteOtherRelation(project, result, processDefinition); |
| log.info( |
| "Delete process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.", |
| projectCode, code, version); |
| putMsg(result, Status.SUCCESS); |
| } |
| return result; |
| } |
| |
| /** |
| * create empty process definition |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @param name process definition name |
| * @param description description |
| * @param globalParams globalParams |
| * @param timeout timeout |
| * @param scheduleJson scheduleJson |
| * @return process definition code |
| */ |
| @Override |
| @Transactional |
| public Map<String, Object> createEmptyProcessDefinition(User loginUser, |
| long projectCode, |
| String name, |
| String description, |
| String globalParams, |
| int timeout, |
| String scheduleJson, |
| ProcessExecutionTypeEnum executionType) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check if user have write perm for project |
| Map<String, Object> result = |
| projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_CREATE); |
| boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result); |
| if (!hasProjectAndWritePerm) { |
| return result; |
| } |
| if (checkDescriptionLength(description)) { |
| log.warn("Parameter description is too long."); |
| putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR); |
| return result; |
| } |
| // check whether the new process define name exist |
| ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name); |
| if (definition != null) { |
| log.warn("Process definition with the same name {} already exists, processDefinitionCode:{}.", |
| definition.getName(), definition.getCode()); |
| putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name); |
| return result; |
| } |
| |
| long processDefinitionCode; |
| try { |
| processDefinitionCode = CodeGenerateUtils.getInstance().genCode(); |
| } catch (CodeGenerateException e) { |
| log.error("Generate process definition code error, projectCode:{}.", projectCode, e); |
| putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS); |
| return result; |
| } |
| ProcessDefinition processDefinition = |
| new ProcessDefinition(projectCode, name, processDefinitionCode, description, |
| globalParams, "", timeout, loginUser.getId()); |
| processDefinition.setExecutionType(executionType); |
| result = createEmptyDagDefine(loginUser, processDefinition); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| log.error("Create empty process definition error."); |
| return result; |
| } |
| |
| if (StringUtils.isBlank(scheduleJson)) { |
| return result; |
| } |
| |
| // save dag schedule |
| Map<String, Object> scheduleResult = createDagSchedule(loginUser, processDefinition, scheduleJson); |
| if (scheduleResult.get(Constants.STATUS) != Status.SUCCESS) { |
| Status scheduleResultStatus = (Status) scheduleResult.get(Constants.STATUS); |
| putMsg(result, scheduleResultStatus); |
| throw new ServiceException(scheduleResultStatus); |
| } |
| return result; |
| } |
| |
| protected Map<String, Object> createEmptyDagDefine(User loginUser, ProcessDefinition processDefinition) { |
| Map<String, Object> result = new HashMap<>(); |
| int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); |
| if (insertVersion == 0) { |
| log.error("Save process definition error, processDefinitionCode:{}.", processDefinition.getCode()); |
| putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR); |
| throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR); |
| } |
| putMsg(result, Status.SUCCESS); |
| result.put(Constants.DATA_LIST, processDefinition); |
| return result; |
| } |
| |
| protected Map<String, Object> createDagSchedule(User loginUser, ProcessDefinition processDefinition, |
| String scheduleJson) { |
| Map<String, Object> result = new HashMap<>(); |
| Schedule scheduleObj = JSONUtils.parseObject(scheduleJson, Schedule.class); |
| if (scheduleObj == null) { |
| putMsg(result, Status.DATA_IS_NOT_VALID, scheduleJson); |
| throw new ServiceException(Status.DATA_IS_NOT_VALID); |
| } |
| Date now = new Date(); |
| scheduleObj.setProcessDefinitionCode(processDefinition.getCode()); |
| if (DateUtils.differSec(scheduleObj.getStartTime(), scheduleObj.getEndTime()) == 0) { |
| log.warn("The schedule start time must not be the same as the end, processDefinitionCode:{}.", |
| processDefinition.getCode()); |
| putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME); |
| return result; |
| } |
| if (!CronUtils.isValidExpression(scheduleObj.getCrontab())) { |
| log.error("CronExpression verify failure, cron:{}.", scheduleObj.getCrontab()); |
| putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, scheduleObj.getCrontab()); |
| return result; |
| } |
| scheduleObj |
| .setWarningType(scheduleObj.getWarningType() == null ? WarningType.NONE : scheduleObj.getWarningType()); |
| scheduleObj.setWarningGroupId(scheduleObj.getWarningGroupId() == 0 ? 1 : scheduleObj.getWarningGroupId()); |
| scheduleObj.setFailureStrategy( |
| scheduleObj.getFailureStrategy() == null ? FailureStrategy.CONTINUE : scheduleObj.getFailureStrategy()); |
| scheduleObj.setCreateTime(now); |
| scheduleObj.setUpdateTime(now); |
| scheduleObj.setUserId(loginUser.getId()); |
| scheduleObj.setReleaseState(ReleaseState.OFFLINE); |
| scheduleObj.setProcessInstancePriority(scheduleObj.getProcessInstancePriority() == null ? Priority.MEDIUM |
| : scheduleObj.getProcessInstancePriority()); |
| scheduleObj.setWorkerGroup(scheduleObj.getWorkerGroup() == null ? "default" : scheduleObj.getWorkerGroup()); |
| scheduleObj |
| .setEnvironmentCode(scheduleObj.getEnvironmentCode() == null ? -1 : scheduleObj.getEnvironmentCode()); |
| scheduleMapper.insert(scheduleObj); |
| |
| putMsg(result, Status.SUCCESS); |
| result.put("scheduleId", scheduleObj.getId()); |
| return result; |
| } |
| |
| /** |
| * update process definition basic info |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @param name process definition name |
| * @param code process definition code |
| * @param description description |
| * @param globalParams globalParams |
| * @param timeout timeout |
| * @param scheduleJson scheduleJson |
| * @param otherParamsJson otherParamsJson handle other params |
| * @param executionType executionType |
| * @return update result code |
| */ |
| @Override |
| @Transactional |
| public Map<String, Object> updateProcessDefinitionBasicInfo(User loginUser, |
| long projectCode, |
| String name, |
| long code, |
| String description, |
| String globalParams, |
| int timeout, |
| String scheduleJson, |
| String otherParamsJson, |
| ProcessExecutionTypeEnum executionType) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check if user have write perm for project |
| Map<String, Object> result = |
| projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_UPDATE); |
| boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result); |
| if (!hasProjectAndWritePerm) { |
| return result; |
| } |
| if (checkDescriptionLength(description)) { |
| log.warn("Parameter description is too long."); |
| putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR); |
| return result; |
| } |
| |
| ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); |
| // check process definition exists |
| if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { |
| log.error("Process definition does not exist, code:{}.", code); |
| putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code)); |
| return result; |
| } |
| if (processDefinition.getReleaseState() == ReleaseState.ONLINE) { |
| // online can not permit edit |
| log.warn("Process definition is not allowed to be modified due to {}, processDefinitionCode:{}.", |
| ReleaseState.ONLINE.getDescp(), processDefinition.getCode()); |
| putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefinition.getName()); |
| return result; |
| } |
| if (!name.equals(processDefinition.getName())) { |
| // check whether the new process define name exist |
| ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name); |
| if (definition != null) { |
| log.warn("Process definition with the same name {} already exists, processDefinitionCode:{}.", |
| definition.getName(), definition.getCode()); |
| putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name); |
| return result; |
| } |
| } |
| ProcessDefinition processDefinitionDeepCopy = |
| JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class); |
| processDefinition.set(projectCode, name, description, globalParams, "", timeout); |
| processDefinition.setExecutionType(executionType); |
| List<ProcessTaskRelationLog> taskRelationList = processTaskRelationLogMapper |
| .queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); |
| result = updateDagDefine(loginUser, taskRelationList, processDefinition, processDefinitionDeepCopy, |
| Lists.newArrayList(), otherParamsJson); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| log.error("Update process definition basic info error."); |
| return result; |
| } |
| |
| if (StringUtils.isBlank(scheduleJson)) { |
| return result; |
| } |
| // update dag schedule |
| Map<String, Object> scheduleResult = updateDagSchedule(loginUser, projectCode, code, scheduleJson); |
| if (scheduleResult.get(Constants.STATUS) != Status.SUCCESS) { |
| Status scheduleResultStatus = (Status) scheduleResult.get(Constants.STATUS); |
| putMsg(result, scheduleResultStatus); |
| throw new ServiceException(scheduleResultStatus); |
| } |
| return result; |
| } |
| |
| private void updateWorkflowValid(User user, ProcessDefinition oldProcessDefinition, |
| ProcessDefinition newProcessDefinition) { |
| // online can not permit edit |
| if (oldProcessDefinition.getReleaseState() == ReleaseState.ONLINE) { |
| throw new ServiceException(Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, oldProcessDefinition.getName()); |
| } |
| |
| Project project = projectMapper.queryByCode(oldProcessDefinition.getProjectCode()); |
| // check user access for project |
| projectService.checkProjectAndAuthThrowException(user, project, WORKFLOW_UPDATE); |
| |
| if (checkDescriptionLength(newProcessDefinition.getDescription())) { |
| throw new ServiceException(Status.DESCRIPTION_TOO_LONG_ERROR); |
| } |
| |
| // check whether the new process define name exist |
| if (!oldProcessDefinition.getName().equals(newProcessDefinition.getName())) { |
| ProcessDefinition definition = processDefinitionMapper |
| .verifyByDefineName(newProcessDefinition.getProjectCode(), newProcessDefinition.getName()); |
| if (definition != null) { |
| throw new ServiceException(Status.PROCESS_DEFINITION_NAME_EXIST, newProcessDefinition.getName()); |
| } |
| } |
| } |
| |
| /** |
| * update single resource workflow |
| * |
| * @param loginUser login user |
| * @param workflowCode workflow resource code want to update |
| * @param workflowUpdateRequest workflow update resource object |
| * @return Process definition |
| */ |
| @Override |
| @Transactional |
| public ProcessDefinition updateSingleProcessDefinition(User loginUser, |
| long workflowCode, |
| WorkflowUpdateRequest workflowUpdateRequest) { |
| ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(workflowCode); |
| // check process definition exists |
| if (processDefinition == null) { |
| throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, workflowCode); |
| } |
| |
| ProcessDefinition processDefinitionUpdate = workflowUpdateRequest.mergeIntoProcessDefinition(processDefinition); |
| this.updateWorkflowValid(loginUser, processDefinition, processDefinitionUpdate); |
| |
| int insertVersion = this.saveProcessDefine(loginUser, processDefinitionUpdate); |
| if (insertVersion == 0) { |
| log.error("Update process definition error, projectCode:{}, processDefinitionName:{}.", |
| processDefinitionUpdate.getCode(), |
| processDefinitionUpdate.getName()); |
| throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); |
| } |
| |
| int insertRelationVersion = this.saveTaskRelation(loginUser, processDefinitionUpdate, insertVersion); |
| if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) { |
| log.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.", |
| processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion); |
| throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); |
| } |
| log.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.", |
| processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion); |
| processDefinitionUpdate.setVersion(insertVersion); |
| return processDefinitionUpdate; |
| } |
| |
| public int saveProcessDefine(User loginUser, ProcessDefinition processDefinition) { |
| ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition); |
| Integer version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode()); |
| int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1; |
| processDefinitionLog.setVersion(insertVersion); |
| processDefinition.setVersion(insertVersion); |
| |
| processDefinitionLog.setOperator(loginUser.getId()); |
| processDefinition.setUserId(loginUser.getId()); |
| processDefinitionLog.setOperateTime(processDefinition.getUpdateTime()); |
| processDefinition.setUpdateTime(processDefinition.getUpdateTime()); |
| processDefinitionLog.setId(null); |
| int result = processDefinitionMapper.updateById(processDefinition); |
| |
| int insertLog = processDefinitionLogMapper.insert(processDefinitionLog); |
| processDefinitionLog.setId(processDefinition.getId()); |
| return (insertLog & result) > 0 ? insertVersion : 0; |
| } |
| |
| public int saveTaskRelation(User loginUser, ProcessDefinition processDefinition, |
| int processDefinitionVersion) { |
| long projectCode = processDefinition.getProjectCode(); |
| long processDefinitionCode = processDefinition.getCode(); |
| List<ProcessTaskRelation> taskRelations = |
| processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); |
| List<ProcessTaskRelationLog> taskRelationList = |
| taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); |
| |
| List<Long> taskCodeList = |
| taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList()); |
| List<TaskDefinition> taskDefinitions = taskDefinitionMapper.queryByCodeList(taskCodeList); |
| List<TaskDefinitionLog> taskDefinitionLogs = |
| taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList()); |
| |
| if (taskRelationList.isEmpty()) { |
| return Constants.EXIT_CODE_SUCCESS; |
| } |
| Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null; |
| if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) { |
| taskDefinitionLogMap = taskDefinitionLogs |
| .stream() |
| .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog)); |
| } |
| Date now = new Date(); |
| for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) { |
| processTaskRelationLog.setProjectCode(projectCode); |
| processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode); |
| processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion); |
| if (taskDefinitionLogMap != null) { |
| TaskDefinitionLog preTaskDefinitionLog = |
| taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode()); |
| if (preTaskDefinitionLog != null) { |
| processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion()); |
| } |
| TaskDefinitionLog postTaskDefinitionLog = |
| taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode()); |
| if (postTaskDefinitionLog != null) { |
| processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion()); |
| } |
| } |
| processTaskRelationLog.setCreateTime(now); |
| processTaskRelationLog.setUpdateTime(now); |
| processTaskRelationLog.setOperator(loginUser.getId()); |
| processTaskRelationLog.setOperateTime(now); |
| } |
| if (!taskRelations.isEmpty()) { |
| Set<Integer> processTaskRelationSet = |
| taskRelations.stream().map(ProcessTaskRelation::hashCode).collect(toSet()); |
| Set<Integer> taskRelationSet = |
| taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet()); |
| boolean isSame = CollectionUtils.isEqualCollection(processTaskRelationSet, |
| taskRelationSet); |
| if (isSame) { |
| log.info("process task relations is non-existent, projectCode:{}, processCode:{}.", |
| processDefinition.getProjectCode(), processDefinition.getCode()); |
| return Constants.EXIT_CODE_SUCCESS; |
| } |
| processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode); |
| } |
| List<ProcessTaskRelation> processTaskRelations = |
| taskRelationList.stream().map(ProcessTaskRelation::new).collect(Collectors.toList()); |
| int insert = processTaskRelationMapper.batchInsert(processTaskRelations); |
| int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList); |
| return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE; |
| } |
| |
| protected Map<String, Object> updateDagSchedule(User loginUser, |
| long projectCode, |
| long processDefinitionCode, |
| String scheduleJson) { |
| Map<String, Object> result = new HashMap<>(); |
| Schedule schedule = JSONUtils.parseObject(scheduleJson, Schedule.class); |
| if (schedule == null) { |
| putMsg(result, Status.DATA_IS_NOT_VALID, scheduleJson); |
| throw new ServiceException(Status.DATA_IS_NOT_VALID); |
| } |
| // set default value |
| FailureStrategy failureStrategy = |
| ObjectUtils.defaultIfNull(schedule.getFailureStrategy(), FailureStrategy.CONTINUE); |
| WarningType warningType = ObjectUtils.defaultIfNull(schedule.getWarningType(), WarningType.NONE); |
| Priority processInstancePriority = |
| ObjectUtils.defaultIfNull(schedule.getProcessInstancePriority(), Priority.MEDIUM); |
| int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 : schedule.getWarningGroupId(); |
| String workerGroup = StringUtils.defaultIfBlank(schedule.getWorkerGroup(), DEFAULT_WORKER_GROUP); |
| String tenantCode = StringUtils.defaultIfBlank(schedule.getTenantCode(), Constants.DEFAULT); |
| long environmentCode = schedule.getEnvironmentCode() == null ? -1 : schedule.getEnvironmentCode(); |
| |
| ScheduleParam param = new ScheduleParam(); |
| param.setStartTime(schedule.getStartTime()); |
| param.setEndTime(schedule.getEndTime()); |
| param.setCrontab(schedule.getCrontab()); |
| param.setTimezoneId(schedule.getTimezoneId()); |
| |
| return schedulerService.updateScheduleByProcessDefinitionCode( |
| loginUser, |
| projectCode, |
| processDefinitionCode, |
| JSONUtils.toJsonString(param), |
| warningType, |
| warningGroupId, |
| failureStrategy, |
| processInstancePriority, |
| workerGroup, |
| tenantCode, |
| environmentCode); |
| } |
| |
| /** |
| * release process definition and schedule |
| * |
| * @param loginUser login user |
| * @param projectCode project code |
| * @param code process definition code |
| * @param releaseState releaseState |
| * @return update result code |
| */ |
| @Transactional |
| @Override |
| public Map<String, Object> releaseWorkflowAndSchedule(User loginUser, long projectCode, long code, |
| ReleaseState releaseState) { |
| Project project = projectMapper.queryByCode(projectCode); |
| // check user access for project |
| Map<String, Object> result = |
| projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_ONLINE_OFFLINE); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| // check state |
| if (null == releaseState) { |
| putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); |
| return result; |
| } |
| |
| ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); |
| if (processDefinition == null) { |
| log.error("Process definition does not exist, code:{}.", code); |
| putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code)); |
| return result; |
| } |
| Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(code); |
| if (scheduleObj == null) { |
| log.error("Schedule cron does not exist, processDefinitionCode:{}.", code); |
| putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, "processDefinitionCode:" + code); |
| return result; |
| } |
| switch (releaseState) { |
| case ONLINE: |
| List<ProcessTaskRelation> relationList = |
| processService.findRelationByCode(code, processDefinition.getVersion()); |
| if (CollectionUtils.isEmpty(relationList)) { |
| log.warn("Process definition has no task relation, processDefinitionCode:{}.", code); |
| putMsg(result, Status.PROCESS_DAG_IS_EMPTY); |
| return result; |
| } |
| processDefinition.setReleaseState(releaseState); |
| processDefinitionMapper.updateById(processDefinition); |
| schedulerService.setScheduleState(loginUser, projectCode, scheduleObj.getId(), ReleaseState.ONLINE); |
| break; |
| case OFFLINE: |
| processDefinition.setReleaseState(releaseState); |
| int updateProcess = processDefinitionMapper.updateById(processDefinition); |
| if (updateProcess > 0) { |
| log.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}.", |
| projectCode, code, scheduleObj.getId()); |
| // set status |
| scheduleObj.setReleaseState(ReleaseState.OFFLINE); |
| int updateSchedule = scheduleMapper.updateById(scheduleObj); |
| if (updateSchedule == 0) { |
| log.error( |
| "Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", |
| projectCode, code, scheduleObj.getId()); |
| putMsg(result, Status.OFFLINE_SCHEDULE_ERROR); |
| throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR); |
| } |
| schedulerService.deleteSchedule(project.getId(), scheduleObj.getId()); |
| } |
| break; |
| default: |
| putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); |
| return result; |
| } |
| putMsg(result, Status.SUCCESS); |
| return result; |
| } |
| |
| /** |
| * save other relation |
| * @param loginUser |
| * @param processDefinition |
| * @param result |
| * @param otherParamsJson |
| */ |
| @Override |
| public void saveOtherRelation(User loginUser, ProcessDefinition processDefinition, Map<String, Object> result, |
| String otherParamsJson) { |
| |
| } |
| |
| /** |
| * get Json String |
| * @param loginUser |
| * @param processDefinition |
| * @return Json String |
| */ |
| @Override |
| public String doOtherOperateProcess(User loginUser, ProcessDefinition processDefinition) { |
| return null; |
| } |
| |
| /** |
| * view process variables |
| * @param loginUser login user |
| * @param projectCode project code |
| * @param code process definition code |
| * @return variables data |
| */ |
| @Override |
| public Map<String, Object> viewVariables(User loginUser, long projectCode, long code) { |
| |
| Project project = projectMapper.queryByCode(projectCode); |
| |
| // check user access for project |
| Map<String, Object> result = |
| projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION); |
| if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| return result; |
| } |
| |
| ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); |
| |
| if (Objects.isNull(processDefinition) || projectCode != processDefinition.getProjectCode()) { |
| log.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, |
| code); |
| putMsg(result, PROCESS_DEFINE_NOT_EXIST, code); |
| return result; |
| } |
| |
| // global params |
| List<Property> globalParams = processDefinition.getGlobalParamList(); |
| |
| Map<String, Map<String, Object>> localUserDefParams = getLocalParams(processDefinition); |
| |
| Map<String, Object> resultMap = new HashMap<>(); |
| |
| if (Objects.nonNull(globalParams)) { |
| resultMap.put(GLOBAL_PARAMS, globalParams); |
| } |
| |
| if (Objects.nonNull(localUserDefParams)) { |
| resultMap.put(LOCAL_PARAMS, localUserDefParams); |
| } |
| |
| result.put(DATA_LIST, resultMap); |
| putMsg(result, Status.SUCCESS); |
| return result; |
| } |
| |
| /** |
| * get local params |
| */ |
| private Map<String, Map<String, Object>> getLocalParams(ProcessDefinition processDefinition) { |
| Map<String, Map<String, Object>> localUserDefParams = new HashMap<>(); |
| |
| Set<Long> taskCodeSet = new TreeSet<>(); |
| |
| processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()) |
| .forEach(processTaskRelation -> { |
| if (processTaskRelation.getPreTaskCode() > 0) { |
| taskCodeSet.add(processTaskRelation.getPreTaskCode()); |
| } |
| if (processTaskRelation.getPostTaskCode() > 0) { |
| taskCodeSet.add(processTaskRelation.getPostTaskCode()); |
| } |
| }); |
| |
| taskDefinitionMapper.queryByCodeList(taskCodeSet) |
| .stream().forEach(taskDefinition -> { |
| Map<String, Object> localParamsMap = new HashMap<>(); |
| String localParams = JSONUtils.getNodeString(taskDefinition.getTaskParams(), LOCAL_PARAMS); |
| if (!StringUtils.isEmpty(localParams)) { |
| List<Property> localParamsList = JSONUtils.toList(localParams, Property.class); |
| localParamsMap.put(TASK_TYPE, taskDefinition.getTaskType()); |
| localParamsMap.put(LOCAL_PARAMS_LIST, localParamsList); |
| if (CollectionUtils.isNotEmpty(localParamsList)) { |
| localUserDefParams.put(taskDefinition.getName(), localParamsMap); |
| } |
| } |
| }); |
| |
| return localUserDefParams; |
| } |
| |
| /** |
| * delete other relation |
| * @param project |
| * @param result |
| * @param processDefinition |
| */ |
| @Override |
| public void deleteOtherRelation(Project project, Map<String, Object> result, ProcessDefinition processDefinition) { |
| |
| } |
| |
| } |