blob: a7eba1f53469143b040a538267702f62d784c520 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* dependent item execute
*/
public class DependentExecute {
/**
* process service
*/
private final ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
/**
* depend item list
*/
private List<DependentItem> dependItemList;
/**
* dependent relation
*/
private DependentRelation relation;
/**
* depend result
*/
private DependResult modelDependResult = DependResult.WAITING;
/**
* depend result map
*/
private Map<String, DependResult> dependResultMap = new HashMap<>();
/**
* logger
*/
private Logger logger = LoggerFactory.getLogger(DependentExecute.class);
/**
* constructor
*
* @param itemList item list
* @param relation relation
*/
public DependentExecute(List<DependentItem> itemList, DependentRelation relation) {
this.dependItemList = itemList;
this.relation = relation;
}
/**
* get dependent item for one dependent item
*
* @param dependentItem dependent item
* @param currentTime current time
* @return DependResult
*/
private DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime) {
List<DateInterval> dateIntervals = DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue());
return calculateResultForTasks(dependentItem, dateIntervals);
}
/**
* calculate dependent result for one dependent item.
*
* @param dependentItem dependent item
* @param dateIntervals date intervals
* @return dateIntervals
*/
private DependResult calculateResultForTasks(DependentItem dependentItem,
List<DateInterval> dateIntervals) {
DependResult result = DependResult.FAILED;
for (DateInterval dateInterval : dateIntervals) {
ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionCode(),
dateInterval);
if (processInstance == null) {
return DependResult.WAITING;
}
// need to check workflow for updates, so get all task and check the task state
if (dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)) {
result = dependResultByProcessInstance(processInstance);
} else {
result = getDependTaskResult(dependentItem.getDepTasks(), processInstance);
}
if (result != DependResult.SUCCESS) {
break;
}
}
return result;
}
/**
* depend type = depend_all
*
* @return
*/
private DependResult dependResultByProcessInstance(ProcessInstance processInstance) {
if (!processInstance.getState().typeIsFinished()) {
return DependResult.WAITING;
}
if (processInstance.getState().typeIsSuccess()) {
return DependResult.SUCCESS;
}
return DependResult.FAILED;
}
/**
* get depend task result
*
* @param taskName
* @param processInstance
* @return
*/
private DependResult getDependTaskResult(String taskName, ProcessInstance processInstance) {
DependResult result;
TaskInstance taskInstance = null;
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : taskInstanceList) {
if (task.getName().equals(taskName)) {
taskInstance = task;
break;
}
}
if (taskInstance == null) {
// cannot find task in the process instance
// maybe because process instance is running or failed.
if (processInstance.getState().typeIsFinished()) {
result = DependResult.FAILED;
} else {
return DependResult.WAITING;
}
} else {
result = getDependResultByState(taskInstance.getState());
}
return result;
}
/**
* find the last one process instance that :
* 1. manual run and finish between the interval
* 2. schedule run and schedule time between the interval
*
* @param definitionCode definition code
* @param dateInterval date interval
* @return ProcessInstance
*/
private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval) {
ProcessInstance runningProcess = processService.findLastRunningProcess(definitionCode, dateInterval.getStartTime(), dateInterval.getEndTime());
if (runningProcess != null) {
return runningProcess;
}
ProcessInstance lastSchedulerProcess = processService.findLastSchedulerProcessInterval(definitionCode, dateInterval);
ProcessInstance lastManualProcess = processService.findLastManualProcessInterval(definitionCode, dateInterval);
if (lastManualProcess == null) {
return lastSchedulerProcess;
}
if (lastSchedulerProcess == null) {
return lastManualProcess;
}
return (lastManualProcess.getEndTime().after(lastSchedulerProcess.getEndTime())) ? lastManualProcess : lastSchedulerProcess;
}
/**
* get dependent result by task/process instance state
*
* @param state state
* @return DependResult
*/
private DependResult getDependResultByState(ExecutionStatus state) {
if (!state.typeIsFinished()) {
return DependResult.WAITING;
} else if (state.typeIsSuccess()) {
return DependResult.SUCCESS;
} else {
return DependResult.FAILED;
}
}
/**
* get dependent result by task instance state when task instance is null
*
* @param state state
* @return DependResult
*/
private DependResult getDependResultByProcessStateWhenTaskNull(ExecutionStatus state) {
if (state.typeIsRunning()
|| state == ExecutionStatus.SUBMITTED_SUCCESS
|| state == ExecutionStatus.WAITTING_THREAD) {
return DependResult.WAITING;
} else {
return DependResult.FAILED;
}
}
/**
* judge depend item finished
*
* @param currentTime current time
* @return boolean
*/
public boolean finish(Date currentTime) {
if (modelDependResult == DependResult.WAITING) {
modelDependResult = getModelDependResult(currentTime);
return false;
}
return true;
}
/**
* get model depend result
*
* @param currentTime current time
* @return DependResult
*/
public DependResult getModelDependResult(Date currentTime) {
List<DependResult> dependResultList = new ArrayList<>();
for (DependentItem dependentItem : dependItemList) {
DependResult dependResult = getDependResultForItem(dependentItem, currentTime);
if (dependResult != DependResult.WAITING) {
dependResultMap.put(dependentItem.getKey(), dependResult);
}
dependResultList.add(dependResult);
}
modelDependResult = DependentUtils.getDependResultForRelation(this.relation, dependResultList);
return modelDependResult;
}
/**
* get dependent item result
*
* @param item item
* @param currentTime current time
* @return DependResult
*/
private DependResult getDependResultForItem(DependentItem item, Date currentTime) {
String key = item.getKey();
if (dependResultMap.containsKey(key)) {
return dependResultMap.get(key);
}
return getDependentResultForItem(item, currentTime);
}
public Map<String, DependResult> getDependResultMap() {
return dependResultMap;
}
}