| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.dolphinscheduler.api.service.impl; |
| |
| import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; |
| |
| import org.apache.dolphinscheduler.api.enums.Status; |
| import org.apache.dolphinscheduler.api.exceptions.ServiceException; |
| import org.apache.dolphinscheduler.api.service.WorkFlowLineageService; |
| import org.apache.dolphinscheduler.common.constants.Constants; |
| import org.apache.dolphinscheduler.common.utils.JSONUtils; |
| import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition; |
| import org.apache.dolphinscheduler.dao.entity.ProcessLineage; |
| import org.apache.dolphinscheduler.dao.entity.Project; |
| import org.apache.dolphinscheduler.dao.entity.TaskDefinition; |
| import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; |
| import org.apache.dolphinscheduler.dao.entity.TaskMainInfo; |
| import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; |
| import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; |
| import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; |
| import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; |
| import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; |
| import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper; |
| import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; |
| import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; |
| import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; |
| |
| import org.apache.commons.lang3.StringUtils; |
| |
| import java.text.MessageFormat; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| |
| import lombok.extern.slf4j.Slf4j; |
| |
| import org.springframework.beans.factory.annotation.Autowired; |
| import org.springframework.stereotype.Service; |
| import org.springframework.util.CollectionUtils; |
| |
| /** |
| * work flow lineage service impl |
| */ |
| @Slf4j |
| @Service |
| public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkFlowLineageService { |
| |
| @Autowired |
| private WorkFlowLineageMapper workFlowLineageMapper; |
| |
| @Autowired |
| private ProjectMapper projectMapper; |
| |
| @Autowired |
| private TaskDefinitionLogMapper taskDefinitionLogMapper; |
| |
| @Autowired |
| private TaskDefinitionMapper taskDefinitionMapper; |
| |
| @Override |
| public List<WorkFlowLineage> queryWorkFlowLineageByName(long projectCode, String workFlowName) { |
| Project project = projectMapper.queryByCode(projectCode); |
| if (project == null) { |
| throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode); |
| } |
| return workFlowLineageMapper.queryWorkFlowLineageByName(projectCode, workFlowName); |
| } |
| |
| @Override |
| public Map<String, Object> queryWorkFlowLineageByCode(long projectCode, long sourceWorkFlowCode) { |
| Project project = projectMapper.queryByCode(projectCode); |
| if (project == null) { |
| throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode); |
| } |
| List<WorkFlowLineage> workFlowLineages = new ArrayList<>(); |
| Set<WorkFlowRelation> workFlowRelations = new HashSet<>(); |
| recursiveWorkFlow(projectCode, sourceWorkFlowCode, workFlowLineages, workFlowRelations); |
| Map<String, Object> workFlowLists = new HashMap<>(); |
| // todo: use vo |
| workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineages); |
| workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations); |
| return workFlowLists; |
| } |
| |
| private void recursiveWorkFlow(long projectCode, |
| long sourceWorkFlowCode, |
| List<WorkFlowLineage> workFlowLineages, |
| Set<WorkFlowRelation> workFlowRelations) { |
| workFlowLineages.add(workFlowLineageMapper.queryWorkFlowLineageByCode(projectCode, sourceWorkFlowCode)); |
| |
| List<WorkFlowLineage> downStreamWorkFlowLineages = |
| workFlowLineageMapper.queryDownstreamLineageByProcessDefinitionCode(sourceWorkFlowCode, "DEPENDENT"); |
| workFlowLineages.addAll(downStreamWorkFlowLineages); |
| downStreamWorkFlowLineages.forEach(workFlowLineage -> workFlowRelations |
| .add(new WorkFlowRelation(sourceWorkFlowCode, workFlowLineage.getWorkFlowCode()))); |
| |
| List<WorkFlowLineage> upstreamWorkFlowLineages = new ArrayList<>(); |
| getUpstreamLineages(sourceWorkFlowCode, upstreamWorkFlowLineages); |
| workFlowLineages.addAll(upstreamWorkFlowLineages); |
| upstreamWorkFlowLineages.forEach(workFlowLineage -> workFlowRelations |
| .add(new WorkFlowRelation(workFlowLineage.getWorkFlowCode(), sourceWorkFlowCode))); |
| } |
| |
| private void getUpstreamLineages(long sourceWorkFlowCode, |
| List<WorkFlowLineage> upstreamWorkFlowLineages) { |
| List<DependentProcessDefinition> workFlowDependentDefinitionList = |
| workFlowLineageMapper.queryUpstreamDependentParamsByProcessDefinitionCode(sourceWorkFlowCode, |
| "DEPENDENT"); |
| |
| List<Long> upstreamProcessDefinitionCodes = new ArrayList<>(); |
| |
| getProcessDefinitionCodeByDependentDefinitionList(workFlowDependentDefinitionList, |
| upstreamProcessDefinitionCodes); |
| |
| if (!upstreamProcessDefinitionCodes.isEmpty()) { |
| upstreamWorkFlowLineages.addAll( |
| workFlowLineageMapper.queryWorkFlowLineageByProcessDefinitionCodes(upstreamProcessDefinitionCodes)); |
| } |
| } |
| |
| /** |
| * get dependent process definition code by dependent process definition list |
| */ |
| private void getProcessDefinitionCodeByDependentDefinitionList(List<DependentProcessDefinition> dependentDefinitionList, |
| List<Long> processDefinitionCodes) { |
| for (DependentProcessDefinition dependentProcessDefinition : dependentDefinitionList) { |
| for (DependentTaskModel dependentTaskModel : dependentProcessDefinition.getDependentParameters() |
| .getDependTaskList()) { |
| for (DependentItem dependentItem : dependentTaskModel.getDependItemList()) { |
| if (!processDefinitionCodes.contains(dependentItem.getDefinitionCode())) { |
| processDefinitionCodes.add(dependentItem.getDefinitionCode()); |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public Map<String, Object> queryWorkFlowLineage(long projectCode) { |
| Map<String, Object> result = new HashMap<>(); |
| Project project = projectMapper.queryByCode(projectCode); |
| if (project == null) { |
| log.error("Project does not exist, projectCode:{}.", projectCode); |
| putMsg(result, Status.PROJECT_NOT_FOUND, projectCode); |
| return result; |
| } |
| List<ProcessLineage> processLineages = workFlowLineageMapper.queryProcessLineage(projectCode); |
| Map<Long, WorkFlowLineage> workFlowLineagesMap = new HashMap<>(); |
| Set<WorkFlowRelation> workFlowRelations = new HashSet<>(); |
| if (!processLineages.isEmpty()) { |
| List<WorkFlowLineage> workFlowLineages = |
| workFlowLineageMapper.queryWorkFlowLineageByLineage(processLineages); |
| workFlowLineagesMap = workFlowLineages.stream() |
| .collect(Collectors.toMap(WorkFlowLineage::getWorkFlowCode, workFlowLineage -> workFlowLineage)); |
| Map<Long, List<TaskDefinition>> workFlowMap = new HashMap<>(); |
| for (ProcessLineage processLineage : processLineages) { |
| workFlowMap.compute(processLineage.getProcessDefinitionCode(), (k, v) -> { |
| if (v == null) { |
| v = new ArrayList<>(); |
| } |
| if (processLineage.getPreTaskCode() > 0) { |
| v.add(new TaskDefinition(processLineage.getPreTaskCode(), processLineage.getPreTaskVersion())); |
| } |
| if (processLineage.getPostTaskCode() > 0) { |
| v.add(new TaskDefinition(processLineage.getPostTaskCode(), |
| processLineage.getPostTaskVersion())); |
| } |
| return v; |
| }); |
| } |
| for (Entry<Long, List<TaskDefinition>> workFlow : workFlowMap.entrySet()) { |
| Set<Long> sourceWorkFlowCodes = |
| querySourceWorkFlowCodes(projectCode, workFlow.getKey(), workFlow.getValue()); |
| if (sourceWorkFlowCodes.isEmpty()) { |
| workFlowRelations.add(new WorkFlowRelation(0L, workFlow.getKey())); |
| } else { |
| workFlowLineagesMap.get(workFlow.getKey()) |
| .setSourceWorkFlowCode(StringUtils.join(sourceWorkFlowCodes, Constants.COMMA)); |
| sourceWorkFlowCodes |
| .forEach(code -> workFlowRelations.add(new WorkFlowRelation(code, workFlow.getKey()))); |
| } |
| } |
| } |
| Map<String, Object> workFlowLists = new HashMap<>(); |
| workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineagesMap.values()); |
| workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations); |
| result.put(Constants.DATA_LIST, workFlowLists); |
| putMsg(result, Status.SUCCESS); |
| return result; |
| } |
| |
| private Set<Long> querySourceWorkFlowCodes(long projectCode, long workFlowCode, |
| List<TaskDefinition> taskDefinitionList) { |
| Set<Long> sourceWorkFlowCodes = new HashSet<>(); |
| if (taskDefinitionList == null || taskDefinitionList.isEmpty()) { |
| return sourceWorkFlowCodes; |
| } |
| List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionList); |
| for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { |
| if (taskDefinitionLog.getProjectCode() == projectCode) { |
| if (taskDefinitionLog.getTaskType().equals(TASK_TYPE_DEPENDENT)) { |
| DependentParameters dependentParameters = |
| JSONUtils.parseObject(taskDefinitionLog.getDependence(), DependentParameters.class); |
| if (dependentParameters != null) { |
| List<DependentTaskModel> dependTaskList = |
| dependentParameters.getDependTaskList(); |
| if (!CollectionUtils.isEmpty(dependTaskList)) { |
| for (DependentTaskModel taskModel : dependTaskList) { |
| List<DependentItem> dependItemList = taskModel.getDependItemList(); |
| for (DependentItem dependentItem : dependItemList) { |
| if (dependentItem.getProjectCode() == projectCode |
| && dependentItem.getDefinitionCode() != workFlowCode) { |
| sourceWorkFlowCodes.add(dependentItem.getDefinitionCode()); |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| return sourceWorkFlowCodes; |
| } |
| |
| /** |
| * Query and return tasks dependence with string format, is a wrapper of queryTaskDepOnTask and task query method. |
| * |
| * @param projectCode Project code want to query tasks dependence |
| * @param processDefinitionCode Process definition code want to query tasks dependence |
| * @param taskCode Task code want to query tasks dependence |
| * @return Optional of formatter message |
| */ |
| @Override |
| public Optional<String> taskDepOnTaskMsg(long projectCode, long processDefinitionCode, long taskCode) { |
| List<TaskMainInfo> tasksDep = |
| workFlowLineageMapper.queryTaskDepOnTask(projectCode, processDefinitionCode, taskCode); |
| if (CollectionUtils.isEmpty(tasksDep)) { |
| return Optional.empty(); |
| } |
| |
| String taskDepStr = tasksDep.stream().map( |
| task -> String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(), task.getTaskName())) |
| .collect(Collectors.joining(Constants.COMMA)); |
| TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); |
| return Optional.of(MessageFormat.format(Status.DELETE_TASK_USE_BY_OTHER_FAIL.getMsg(), taskDefinition.getName(), |
| taskDepStr)); |
| } |
| |
| /** |
| * Query tasks depend on process definition, include upstream or downstream |
| * |
| * @param projectCode Project code want to query tasks dependence |
| * @param processDefinitionCode Process definition code want to query tasks dependence |
| * @return Set of TaskMainInfo |
| */ |
| @Override |
| public Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long processDefinitionCode) { |
| Set<TaskMainInfo> taskMainInfos = new HashSet<>(); |
| List<TaskMainInfo> taskDependents = |
| workFlowLineageMapper.queryTaskDependentOnProcess(processDefinitionCode, 0); |
| List<TaskMainInfo> taskSubProcess = |
| workFlowLineageMapper.queryTaskSubProcessDepOnProcess(projectCode, processDefinitionCode); |
| taskMainInfos.addAll(taskDependents); |
| taskMainInfos.addAll(taskSubProcess); |
| return taskMainInfos; |
| } |
| |
| /** |
| * Query downstream tasks depend on a process definition or a task |
| * |
| * @param processDefinitionCode Process definition code want to query tasks dependence |
| * @param taskCode Task code want to query tasks dependence |
| * @return downstream dependent tasks |
| */ |
| @Override |
| public Map<String, Object> queryDownstreamDependentTasks(Long processDefinitionCode, Long taskCode) { |
| Map<String, Object> result = new HashMap<>(); |
| List<TaskMainInfo> taskDependents = |
| workFlowLineageMapper.queryTaskDependentOnProcess(processDefinitionCode, |
| Objects.isNull(taskCode) ? 0 : taskCode.longValue()); |
| result.put(Constants.DATA_LIST, taskDependents); |
| putMsg(result, Status.SUCCESS); |
| return result; |
| } |
| } |