| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.dolphinscheduler.tools.datasource.dao; |
| |
| import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS; |
| import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; |
| import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS; |
| |
| import org.apache.dolphinscheduler.common.Constants; |
| import org.apache.dolphinscheduler.common.enums.ConditionType; |
| import org.apache.dolphinscheduler.common.enums.Flag; |
| import org.apache.dolphinscheduler.common.enums.Priority; |
| import org.apache.dolphinscheduler.common.enums.TimeoutFlag; |
| import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; |
| import org.apache.dolphinscheduler.common.utils.ConnectionUtils; |
| import org.apache.dolphinscheduler.common.utils.JSONUtils; |
| import org.apache.dolphinscheduler.common.utils.ScriptRunner; |
| import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; |
| import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; |
| import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; |
| import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; |
| import org.apache.dolphinscheduler.dao.upgrade.JsonSplitDao; |
| import org.apache.dolphinscheduler.dao.upgrade.ProcessDefinitionDao; |
| import org.apache.dolphinscheduler.dao.upgrade.ProjectDao; |
| import org.apache.dolphinscheduler.dao.upgrade.ScheduleDao; |
| import org.apache.dolphinscheduler.dao.upgrade.SchemaUtils; |
| import org.apache.dolphinscheduler.dao.upgrade.WorkerGroupDao; |
| import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; |
| import org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter; |
| import org.apache.dolphinscheduler.spi.enums.DbType; |
| |
| import org.apache.commons.collections.CollectionUtils; |
| import org.apache.commons.lang.StringUtils; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.io.Reader; |
| import java.sql.Connection; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.util.ArrayList; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.stream.Collectors; |
| |
| import javax.sql.DataSource; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.core.io.ClassPathResource; |
| import org.springframework.core.io.Resource; |
| |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.JsonNode; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.databind.node.ArrayNode; |
| import com.fasterxml.jackson.databind.node.ObjectNode; |
| |
| public abstract class UpgradeDao { |
| public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class); |
| private static final String T_VERSION_NAME = "t_escheduler_version"; |
| private static final String T_NEW_VERSION_NAME = "t_ds_version"; |
| |
| protected final DataSource dataSource; |
| |
| protected UpgradeDao(DataSource dataSource) { |
| this.dataSource = dataSource; |
| } |
| |
| protected abstract String initSqlPath(); |
| |
| public abstract DbType getDbType(); |
| |
| public void initSchema() { |
| // Execute the dolphinscheduler full sql |
| runInitSql(getDbType()); |
| } |
| |
| /** |
| * run init sql to init db schema |
| * @param dbType db type |
| */ |
| private void runInitSql(DbType dbType) { |
| String sqlFile = String.format("dolphinscheduler_%s.sql",dbType.getDescp()); |
| Resource mysqlSQLFilePath = new ClassPathResource("sql/" + sqlFile); |
| try (Connection conn = dataSource.getConnection()) { |
| // Execute the dolphinscheduler_ddl.sql script to create the table structure of dolphinscheduler |
| ScriptRunner initScriptRunner = new ScriptRunner(conn, true, true); |
| Reader initSqlReader = new InputStreamReader(mysqlSQLFilePath.getInputStream()); |
| initScriptRunner.runScript(initSqlReader); |
| } catch (Exception e) { |
| logger.error(e.getMessage(), e); |
| throw new RuntimeException(e.getMessage(), e); |
| } |
| } |
| |
| public abstract boolean isExistsTable(String tableName); |
| |
| public abstract boolean isExistsColumn(String tableName, String columnName); |
| |
| public String getCurrentVersion(String versionName) { |
| String sql = String.format("select version from %s", versionName); |
| Connection conn = null; |
| ResultSet rs = null; |
| PreparedStatement pstmt = null; |
| String version = null; |
| try { |
| conn = dataSource.getConnection(); |
| pstmt = conn.prepareStatement(sql); |
| rs = pstmt.executeQuery(); |
| |
| if (rs.next()) { |
| version = rs.getString(1); |
| } |
| |
| return version; |
| |
| } catch (SQLException e) { |
| logger.error(e.getMessage(), e); |
| throw new RuntimeException("sql: " + sql, e); |
| } finally { |
| ConnectionUtils.releaseResource(rs, pstmt, conn); |
| } |
| } |
| |
| public void upgradeDolphinScheduler(String schemaDir) { |
| upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl.sql"); |
| upgradeDolphinSchedulerDML(schemaDir); |
| } |
| |
| /** |
| * upgrade DolphinScheduler worker group |
| * ds-1.3.0 modify the worker group for process definition json |
| */ |
| public void upgradeDolphinSchedulerWorkerGroup() { |
| updateProcessDefinitionJsonWorkerGroup(); |
| } |
| |
| /** |
| * upgrade DolphinScheduler resource list |
| * ds-1.3.2 modify the resource list for process definition json |
| */ |
| public void upgradeDolphinSchedulerResourceList() { |
| updateProcessDefinitionJsonResourceList(); |
| } |
| |
| /** |
| * upgrade DolphinScheduler to 2.0.0 |
| */ |
| public void upgradeDolphinSchedulerTo200(String schemaDir) { |
| processDefinitionJsonSplit(); |
| upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl_post.sql"); |
| } |
| |
| /** |
| * upgrade DolphinScheduler to 2.0.6 |
| */ |
| public void upgradeDolphinSchedulerResourceFileSize() { |
| ResourceDao resourceDao = new ResourceDao(); |
| try { |
| // update the size of the folder that is the type of file. |
| resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 0); |
| // update the size of the folder that is the type of udf. |
| resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 1); |
| } catch (Exception ex) { |
| logger.error("Failed to upgrade because of failing to update the folder's size of resource files."); |
| } |
| } |
| |
| /** |
| * updateProcessDefinitionJsonWorkerGroup |
| */ |
| protected void updateProcessDefinitionJsonWorkerGroup() { |
| WorkerGroupDao workerGroupDao = new WorkerGroupDao(); |
| ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao(); |
| Map<Integer, String> replaceProcessDefinitionMap = new HashMap<>(); |
| try { |
| Map<Integer, String> oldWorkerGroupMap = workerGroupDao.queryAllOldWorkerGroup(dataSource.getConnection()); |
| Map<Integer, String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection()); |
| |
| for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()) { |
| ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue()); |
| ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString()); |
| |
| for (int i = 0; i < tasks.size(); i++) { |
| ObjectNode task = (ObjectNode) tasks.path(i); |
| ObjectNode workerGroupNode = (ObjectNode) task.path("workerGroupId"); |
| int workerGroupId = -1; |
| if (workerGroupNode != null && workerGroupNode.canConvertToInt()) { |
| workerGroupId = workerGroupNode.asInt(-1); |
| } |
| if (workerGroupId == -1) { |
| task.put("workerGroup", "default"); |
| } else { |
| task.put("workerGroup", oldWorkerGroupMap.get(workerGroupId)); |
| } |
| } |
| |
| jsonObject.remove("task"); |
| |
| jsonObject.put("tasks", tasks); |
| |
| replaceProcessDefinitionMap.put(entry.getKey(), jsonObject.toString()); |
| } |
| if (replaceProcessDefinitionMap.size() > 0) { |
| processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(), replaceProcessDefinitionMap); |
| } |
| } catch (Exception e) { |
| logger.error("update process definition json workergroup error", e); |
| } |
| } |
| |
| protected void updateProcessDefinitionJsonResourceList() { |
| ResourceDao resourceDao = new ResourceDao(); |
| ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao(); |
| Map<Integer, String> replaceProcessDefinitionMap = new HashMap<>(); |
| try { |
| Map<String, Integer> resourcesMap = resourceDao.listAllResources(dataSource.getConnection()); |
| Map<Integer, String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection()); |
| |
| for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()) { |
| ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue()); |
| ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString()); |
| |
| for (int i = 0; i < tasks.size(); i++) { |
| ObjectNode task = (ObjectNode) tasks.get(i); |
| ObjectNode param = (ObjectNode) task.get("params"); |
| if (param != null) { |
| |
| List<ResourceInfo> resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class); |
| ResourceInfo mainJar = JSONUtils.parseObject(param.get("mainJar").toString(), ResourceInfo.class); |
| if (mainJar != null && mainJar.getId() == 0) { |
| String fullName = mainJar.getRes().startsWith("/") ? mainJar.getRes() : String.format("/%s", mainJar.getRes()); |
| if (resourcesMap.containsKey(fullName)) { |
| mainJar.setId(resourcesMap.get(fullName)); |
| param.put("mainJar", JSONUtils.parseObject(JSONUtils.toJsonString(mainJar))); |
| } |
| } |
| |
| if (CollectionUtils.isNotEmpty(resourceList)) { |
| List<ResourceInfo> newResourceList = resourceList.stream().map(resInfo -> { |
| String fullName = resInfo.getRes().startsWith("/") ? resInfo.getRes() : String.format("/%s", resInfo.getRes()); |
| if (resInfo.getId() == 0 && resourcesMap.containsKey(fullName)) { |
| resInfo.setId(resourcesMap.get(fullName)); |
| } |
| return resInfo; |
| }).collect(Collectors.toList()); |
| param.put("resourceList", JSONUtils.parseObject(JSONUtils.toJsonString(newResourceList))); |
| } |
| } |
| task.put("params", param); |
| |
| } |
| |
| jsonObject.remove("tasks"); |
| |
| jsonObject.put("tasks", tasks); |
| |
| replaceProcessDefinitionMap.put(entry.getKey(), jsonObject.toString()); |
| } |
| if (replaceProcessDefinitionMap.size() > 0) { |
| processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(), replaceProcessDefinitionMap); |
| } |
| } catch (Exception e) { |
| logger.error("update process definition json resource list error", e); |
| } |
| |
| } |
| |
| private void upgradeDolphinSchedulerDML(String schemaDir) { |
| String schemaVersion = schemaDir.split("_")[0]; |
| Resource sqlFilePath = new ClassPathResource(String.format("sql/upgrade/%s/%s/dolphinscheduler_dml.sql", schemaDir, getDbType().name().toLowerCase())); |
| logger.info("sqlSQLFilePath: {}", sqlFilePath); |
| if (!sqlFilePath.exists()) { |
| logger.info("sqlSQLFilePath: {} not exist", sqlFilePath); |
| return; |
| } |
| Connection conn = null; |
| PreparedStatement pstmt = null; |
| try { |
| conn = dataSource.getConnection(); |
| conn.setAutoCommit(false); |
| // Execute the upgraded dolphinscheduler dml |
| ScriptRunner scriptRunner = new ScriptRunner(conn, false, true); |
| Reader sqlReader = new InputStreamReader(sqlFilePath.getInputStream()); |
| scriptRunner.runScript(sqlReader); |
| if (isExistsTable(T_VERSION_NAME)) { |
| // Change version in the version table to the new version |
| String upgradeSQL = String.format("update %s set version = ?", T_VERSION_NAME); |
| pstmt = conn.prepareStatement(upgradeSQL); |
| pstmt.setString(1, schemaVersion); |
| pstmt.executeUpdate(); |
| } else if (isExistsTable(T_NEW_VERSION_NAME)) { |
| // Change version in the version table to the new version |
| String upgradeSQL = String.format("update %s set version = ?", T_NEW_VERSION_NAME); |
| pstmt = conn.prepareStatement(upgradeSQL); |
| pstmt.setString(1, schemaVersion); |
| pstmt.executeUpdate(); |
| } |
| conn.commit(); |
| } catch (FileNotFoundException e) { |
| try { |
| conn.rollback(); |
| } catch (SQLException e1) { |
| logger.error(e1.getMessage(), e1); |
| } |
| logger.error(e.getMessage(), e); |
| throw new RuntimeException("sql file not found ", e); |
| } catch (IOException e) { |
| try { |
| conn.rollback(); |
| } catch (SQLException e1) { |
| logger.error(e1.getMessage(), e1); |
| } |
| logger.error(e.getMessage(), e); |
| throw new RuntimeException(e.getMessage(), e); |
| } catch (Exception e) { |
| try { |
| if (null != conn) { |
| conn.rollback(); |
| } |
| } catch (SQLException e1) { |
| logger.error(e1.getMessage(), e1); |
| } |
| logger.error(e.getMessage(), e); |
| throw new RuntimeException(e.getMessage(), e); |
| } finally { |
| ConnectionUtils.releaseResource(pstmt, conn); |
| } |
| |
| } |
| |
| /** |
| * upgradeDolphinScheduler DDL |
| * |
| * @param schemaDir schemaDir |
| */ |
| private void upgradeDolphinSchedulerDDL(String schemaDir, String scriptFile) { |
| Resource sqlFilePath = new ClassPathResource(String.format("sql/upgrade/%s/%s/%s", schemaDir, getDbType().name().toLowerCase(), scriptFile)); |
| if (!sqlFilePath.exists()) { |
| logger.info("sqlFilePath {} not exist", sqlFilePath); |
| return; |
| } |
| Connection conn = null; |
| PreparedStatement pstmt = null; |
| try { |
| conn = dataSource.getConnection(); |
| String dbName = conn.getCatalog(); |
| logger.info(dbName); |
| conn.setAutoCommit(true); |
| // Execute the dolphinscheduler ddl.sql for the upgrade |
| ScriptRunner scriptRunner = new ScriptRunner(conn, true, true); |
| Reader sqlReader = new InputStreamReader(sqlFilePath.getInputStream()); |
| logger.info("Start ddl file: {}-{}", schemaDir, scriptFile); |
| scriptRunner.runScript(sqlReader); |
| |
| } catch (FileNotFoundException e) { |
| |
| logger.error(e.getMessage(), e); |
| throw new RuntimeException("sql file not found ", e); |
| } catch (Exception e) { |
| logger.error(e.getMessage(), e); |
| throw new RuntimeException(e.getMessage(), e); |
| } finally { |
| ConnectionUtils.releaseResource(pstmt, conn); |
| } |
| } |
| |
| /** |
| * update version |
| * |
| * @param version version |
| */ |
| public void updateVersion(String version) { |
| // Change version in the version table to the new version |
| String versionName = T_VERSION_NAME; |
| if (!SchemaUtils.isAGreatVersion("1.2.0", version)) { |
| versionName = "t_ds_version"; |
| } |
| String upgradeSQL = String.format("update %s set version = ?", versionName); |
| PreparedStatement pstmt = null; |
| Connection conn = null; |
| try { |
| conn = dataSource.getConnection(); |
| pstmt = conn.prepareStatement(upgradeSQL); |
| pstmt.setString(1, version); |
| pstmt.executeUpdate(); |
| |
| } catch (SQLException e) { |
| logger.error(e.getMessage(), e); |
| throw new RuntimeException("sql: " + upgradeSQL, e); |
| } finally { |
| ConnectionUtils.releaseResource(pstmt, conn); |
| } |
| |
| } |
| |
| /** |
| * upgrade DolphinScheduler to 2.0.0, json split |
| */ |
| private void processDefinitionJsonSplit() { |
| ProjectDao projectDao = new ProjectDao(); |
| ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao(); |
| ScheduleDao scheduleDao = new ScheduleDao(); |
| JsonSplitDao jsonSplitDao = new JsonSplitDao(); |
| try { |
| // execute project |
| Map<Integer, Long> projectIdCodeMap = projectDao.queryAllProject(dataSource.getConnection()); |
| projectDao.updateProjectCode(dataSource.getConnection(), projectIdCodeMap); |
| |
| // execute process definition code |
| List<ProcessDefinition> processDefinitions = processDefinitionDao.queryProcessDefinition(dataSource.getConnection()); |
| processDefinitionDao.updateProcessDefinitionCode(dataSource.getConnection(), processDefinitions, projectIdCodeMap); |
| |
| // execute schedule |
| Map<Integer, Long> allSchedule = scheduleDao.queryAllSchedule(dataSource.getConnection()); |
| Map<Integer, Long> processIdCodeMap = processDefinitions.stream().collect(Collectors.toMap(ProcessDefinition::getId, ProcessDefinition::getCode)); |
| scheduleDao.updateScheduleCode(dataSource.getConnection(), allSchedule, processIdCodeMap); |
| |
| // json split |
| Map<Integer, String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection()); |
| List<ProcessDefinitionLog> processDefinitionLogs = new ArrayList<>(); |
| List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>(); |
| List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>(); |
| Map<Integer, Map<Long, Map<String, Long>>> processTaskMap = new HashMap<>(); |
| splitProcessDefinitionJson(processDefinitions, processDefinitionJsonMap, processDefinitionLogs, processTaskRelationLogs, taskDefinitionLogs, processTaskMap); |
| convertDependence(taskDefinitionLogs, projectIdCodeMap, processTaskMap); |
| |
| // execute json split |
| jsonSplitDao.executeJsonSplitProcessDefinition(dataSource.getConnection(), processDefinitionLogs); |
| jsonSplitDao.executeJsonSplitProcessTaskRelation(dataSource.getConnection(), processTaskRelationLogs); |
| jsonSplitDao.executeJsonSplitTaskDefinition(dataSource.getConnection(), taskDefinitionLogs); |
| } catch (Exception e) { |
| logger.error("json split error", e); |
| } |
| } |
| |
| private void splitProcessDefinitionJson(List<ProcessDefinition> processDefinitions, |
| Map<Integer, String> processDefinitionJsonMap, |
| List<ProcessDefinitionLog> processDefinitionLogs, |
| List<ProcessTaskRelationLog> processTaskRelationLogs, |
| List<TaskDefinitionLog> taskDefinitionLogs, |
| Map<Integer, Map<Long, Map<String, Long>>> processTaskMap) throws Exception { |
| Map<Integer, ProcessDefinition> processDefinitionMap = processDefinitions.stream() |
| .collect(Collectors.toMap(ProcessDefinition::getId, processDefinition -> processDefinition)); |
| Date now = new Date(); |
| for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()) { |
| if (entry.getValue() == null) { |
| throw new Exception("processDefinitionJson is null"); |
| } |
| ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue()); |
| ProcessDefinition processDefinition = processDefinitionMap.get(entry.getKey()); |
| if (processDefinition != null) { |
| processDefinition.setTenantId(jsonObject.get("tenantId") == null ? -1 : jsonObject.get("tenantId").asInt()); |
| processDefinition.setTimeout(jsonObject.get("timeout").asInt()); |
| processDefinition.setGlobalParams(jsonObject.get("globalParams").toString()); |
| } else { |
| throw new Exception("It can't find processDefinition, please check !"); |
| } |
| Map<String, Long> taskIdCodeMap = new HashMap<>(); |
| Map<String, List<String>> taskNamePreMap = new HashMap<>(); |
| Map<String, Long> taskNameCodeMap = new HashMap<>(); |
| Map<Long, Map<String, Long>> processCodeTaskNameCodeMap = new HashMap<>(); |
| List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>(); |
| ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString()); |
| for (int i = 0; i < tasks.size(); i++) { |
| ObjectNode task = (ObjectNode) tasks.path(i); |
| ObjectNode param = (ObjectNode) task.get("params"); |
| TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(); |
| String taskType = task.get("type").asText(); |
| if (param != null) { |
| JsonNode resourceJsonNode = param.get("resourceList"); |
| if (resourceJsonNode != null && !resourceJsonNode.isEmpty()) { |
| List<ResourceInfo> resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class); |
| List<Integer> resourceIds = resourceList.stream().map(ResourceInfo::getId).collect(Collectors.toList()); |
| taskDefinitionLog.setResourceIds(StringUtils.join(resourceIds, Constants.COMMA)); |
| } else { |
| taskDefinitionLog.setResourceIds(StringUtils.EMPTY); |
| } |
| if (TASK_TYPE_SUB_PROCESS.equals(taskType)) { |
| JsonNode jsonNodeDefinitionId = param.get("processDefinitionId"); |
| if (jsonNodeDefinitionId != null) { |
| param.put("processDefinitionCode", processDefinitionMap.get(jsonNodeDefinitionId.asInt()).getCode()); |
| param.remove("processDefinitionId"); |
| } |
| } |
| param.put("conditionResult", task.get("conditionResult")); |
| param.put("dependence", task.get("dependence")); |
| taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(param)); |
| } |
| TaskTimeoutParameter timeout = JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")), TaskTimeoutParameter.class); |
| if (timeout != null) { |
| taskDefinitionLog.setTimeout(timeout.getInterval()); |
| taskDefinitionLog.setTimeoutFlag(timeout.getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE); |
| taskDefinitionLog.setTimeoutNotifyStrategy(timeout.getStrategy()); |
| } |
| String desc = task.get("description") != null ? task.get("description").asText() : |
| task.get("desc") != null ? task.get("desc").asText() : ""; |
| taskDefinitionLog.setDescription(desc); |
| taskDefinitionLog.setFlag(Constants.FLOWNODE_RUN_FLAG_NORMAL.equals(task.get("runFlag").asText()) ? Flag.YES : Flag.NO); |
| taskDefinitionLog.setTaskType(taskType); |
| taskDefinitionLog.setFailRetryInterval(TASK_TYPE_SUB_PROCESS.equals(taskType) ? 1 : task.get("retryInterval").asInt()); |
| taskDefinitionLog.setFailRetryTimes(TASK_TYPE_SUB_PROCESS.equals(taskType) ? 0 : task.get("maxRetryTimes").asInt()); |
| taskDefinitionLog.setTaskPriority(JSONUtils.parseObject(JSONUtils.toJsonString(task.get("taskInstancePriority")), Priority.class)); |
| String name = task.get("name").asText(); |
| taskDefinitionLog.setName(name); |
| taskDefinitionLog.setWorkerGroup(task.get("workerGroup") == null ? "default" : task.get("workerGroup").asText()); |
| long taskCode = CodeGenerateUtils.getInstance().genCode(); |
| taskDefinitionLog.setCode(taskCode); |
| taskDefinitionLog.setVersion(Constants.VERSION_FIRST); |
| taskDefinitionLog.setProjectCode(processDefinition.getProjectCode()); |
| taskDefinitionLog.setUserId(processDefinition.getUserId()); |
| taskDefinitionLog.setEnvironmentCode(-1); |
| taskDefinitionLog.setDelayTime(0); |
| taskDefinitionLog.setOperator(1); |
| taskDefinitionLog.setOperateTime(now); |
| taskDefinitionLog.setCreateTime(now); |
| taskDefinitionLog.setUpdateTime(now); |
| taskDefinitionLogList.add(taskDefinitionLog); |
| taskIdCodeMap.put(task.get("id").asText(), taskCode); |
| List<String> preTasks = JSONUtils.toList(task.get("preTasks").toString(), String.class); |
| taskNamePreMap.put(name, preTasks); |
| taskNameCodeMap.put(name, taskCode); |
| } |
| convertConditions(taskDefinitionLogList, taskNameCodeMap); |
| taskDefinitionLogs.addAll(taskDefinitionLogList); |
| processDefinition.setLocations(convertLocations(processDefinition.getLocations(), taskIdCodeMap)); |
| ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition); |
| processDefinitionLog.setOperator(1); |
| processDefinitionLog.setOperateTime(now); |
| processDefinitionLog.setUpdateTime(now); |
| processDefinitionLogs.add(processDefinitionLog); |
| handleProcessTaskRelation(taskNamePreMap, taskNameCodeMap, processDefinition, processTaskRelationLogs); |
| processCodeTaskNameCodeMap.put(processDefinition.getCode(), taskNameCodeMap); |
| processTaskMap.put(entry.getKey(), processCodeTaskNameCodeMap); |
| } |
| } |
| |
| public void convertConditions(List<TaskDefinitionLog> taskDefinitionLogList, Map<String, Long> taskNameCodeMap) throws Exception { |
| for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogList) { |
| if (TASK_TYPE_CONDITIONS.equals(taskDefinitionLog.getTaskType())) { |
| ObjectMapper objectMapper = new ObjectMapper(); |
| ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams()); |
| // reset conditionResult |
| ObjectNode conditionResult = (ObjectNode) taskParams.get("conditionResult"); |
| List<String> successNode = JSONUtils.toList(conditionResult.get("successNode").toString(), String.class); |
| List<Long> nodeCode = new ArrayList<>(); |
| successNode.forEach(node -> nodeCode.add(taskNameCodeMap.get(node))); |
| conditionResult.set("successNode", objectMapper.readTree(objectMapper.writeValueAsString(nodeCode))); |
| List<String> failedNode = JSONUtils.toList(conditionResult.get("failedNode").toString(), String.class); |
| nodeCode.clear(); |
| failedNode.forEach(node -> nodeCode.add(taskNameCodeMap.get(node))); |
| conditionResult.set("failedNode", objectMapper.readTree(objectMapper.writeValueAsString(nodeCode))); |
| // reset dependItemList |
| ObjectNode dependence = (ObjectNode) taskParams.get("dependence"); |
| ArrayNode dependTaskList = JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList"))); |
| for (int i = 0; i < dependTaskList.size(); i++) { |
| ObjectNode dependTask = (ObjectNode) dependTaskList.path(i); |
| ArrayNode dependItemList = JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList"))); |
| for (int j = 0; j < dependItemList.size(); j++) { |
| ObjectNode dependItem = (ObjectNode) dependItemList.path(j); |
| JsonNode depTasks = dependItem.get("depTasks"); |
| dependItem.put("depTaskCode", taskNameCodeMap.get(depTasks.asText())); |
| dependItem.remove("depTasks"); |
| dependItemList.set(j, dependItem); |
| } |
| dependTask.put("dependItemList", dependItemList); |
| dependTaskList.set(i, dependTask); |
| } |
| dependence.put("dependTaskList", dependTaskList); |
| taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams)); |
| } |
| } |
| } |
| |
| private String convertLocations(String locations, Map<String, Long> taskIdCodeMap) { |
| if (StringUtils.isBlank(locations)) { |
| return locations; |
| } |
| Map<String, ObjectNode> locationsMap = JSONUtils.parseObject(locations, new TypeReference<Map<String, ObjectNode>>() { |
| }); |
| if (locationsMap == null) { |
| return locations; |
| } |
| ArrayNode jsonNodes = JSONUtils.createArrayNode(); |
| for (Map.Entry<String, ObjectNode> entry : locationsMap.entrySet()) { |
| ObjectNode nodes = JSONUtils.createObjectNode(); |
| nodes.put("taskCode", taskIdCodeMap.get(entry.getKey())); |
| ObjectNode oldNodes = entry.getValue(); |
| nodes.put("x", oldNodes.get("x").asInt()); |
| nodes.put("y", oldNodes.get("y").asInt()); |
| jsonNodes.add(nodes); |
| } |
| return jsonNodes.toString(); |
| } |
| |
| public void convertDependence(List<TaskDefinitionLog> taskDefinitionLogs, |
| Map<Integer, Long> projectIdCodeMap, |
| Map<Integer, Map<Long, Map<String, Long>>> processTaskMap) { |
| for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { |
| if (TASK_TYPE_DEPENDENT.equals(taskDefinitionLog.getTaskType())) { |
| ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams()); |
| ObjectNode dependence = (ObjectNode) taskParams.get("dependence"); |
| ArrayNode dependTaskList = JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList"))); |
| for (int i = 0; i < dependTaskList.size(); i++) { |
| ObjectNode dependTask = (ObjectNode) dependTaskList.path(i); |
| ArrayNode dependItemList = JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList"))); |
| for (int j = 0; j < dependItemList.size(); j++) { |
| ObjectNode dependItem = (ObjectNode) dependItemList.path(j); |
| dependItem.put("projectCode", projectIdCodeMap.get(dependItem.get("projectId").asInt())); |
| int definitionId = dependItem.get("definitionId").asInt(); |
| Map<Long, Map<String, Long>> processCodeTaskNameCodeMap = processTaskMap.get(definitionId); |
| if (processCodeTaskNameCodeMap == null) { |
| logger.warn("We can't find processDefinition [{}], please check it is not exist, remove this dependence", definitionId); |
| dependItemList.remove(j); |
| continue; |
| } |
| Optional<Map.Entry<Long, Map<String, Long>>> mapEntry = processCodeTaskNameCodeMap.entrySet().stream().findFirst(); |
| if (mapEntry.isPresent()) { |
| Map.Entry<Long, Map<String, Long>> processCodeTaskNameCodeEntry = mapEntry.get(); |
| dependItem.put("definitionCode", processCodeTaskNameCodeEntry.getKey()); |
| String depTasks = dependItem.get("depTasks").asText(); |
| long taskCode = "ALL".equals(depTasks) || processCodeTaskNameCodeEntry.getValue() == null ? 0L : processCodeTaskNameCodeEntry.getValue().get(depTasks); |
| dependItem.put("depTaskCode", taskCode); |
| } |
| dependItem.remove("projectId"); |
| dependItem.remove("definitionId"); |
| dependItem.remove("depTasks"); |
| dependItemList.set(j, dependItem); |
| } |
| dependTask.put("dependItemList", dependItemList); |
| dependTaskList.set(i, dependTask); |
| } |
| dependence.put("dependTaskList", dependTaskList); |
| taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams)); |
| } |
| } |
| } |
| |
| private void handleProcessTaskRelation(Map<String, List<String>> taskNamePreMap, |
| Map<String, Long> taskNameCodeMap, |
| ProcessDefinition processDefinition, |
| List<ProcessTaskRelationLog> processTaskRelationLogs) { |
| Date now = new Date(); |
| for (Map.Entry<String, List<String>> entry : taskNamePreMap.entrySet()) { |
| List<String> entryValue = entry.getValue(); |
| if (CollectionUtils.isNotEmpty(entryValue)) { |
| for (String preTaskName : entryValue) { |
| ProcessTaskRelationLog processTaskRelationLog = setProcessTaskRelationLog(processDefinition, now); |
| processTaskRelationLog.setPreTaskCode(taskNameCodeMap.get(preTaskName)); |
| processTaskRelationLog.setPreTaskVersion(Constants.VERSION_FIRST); |
| processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey())); |
| processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST); |
| processTaskRelationLogs.add(processTaskRelationLog); |
| } |
| } else { |
| ProcessTaskRelationLog processTaskRelationLog = setProcessTaskRelationLog(processDefinition, now); |
| processTaskRelationLog.setPreTaskCode(0); |
| processTaskRelationLog.setPreTaskVersion(0); |
| processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey())); |
| processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST); |
| processTaskRelationLogs.add(processTaskRelationLog); |
| } |
| } |
| } |
| |
| private ProcessTaskRelationLog setProcessTaskRelationLog(ProcessDefinition processDefinition, Date now) { |
| ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); |
| processTaskRelationLog.setProjectCode(processDefinition.getProjectCode()); |
| processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode()); |
| processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion()); |
| processTaskRelationLog.setConditionType(ConditionType.NONE); |
| processTaskRelationLog.setConditionParams("{}"); |
| processTaskRelationLog.setOperator(1); |
| processTaskRelationLog.setOperateTime(now); |
| processTaskRelationLog.setCreateTime(now); |
| processTaskRelationLog.setUpdateTime(now); |
| return processTaskRelationLog; |
| } |
| } |