blob: 2eb8f84810ea28cc10a84db39a09c6e81c877b88 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ENVIRONMENT_CREATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ENVIRONMENT_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ENVIRONMENT_UPDATE;
import org.apache.dolphinscheduler.api.dto.EnvironmentDto;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.EnvironmentService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper;
import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.SetUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.core.type.TypeReference;
/**
* task definition service impl
*/
@Service
@Slf4j
public class EnvironmentServiceImpl extends BaseServiceImpl implements EnvironmentService {
@Autowired
private EnvironmentMapper environmentMapper;
@Autowired
private EnvironmentWorkerGroupRelationMapper relationMapper;
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
/**
* create environment
*
* @param loginUser login user
* @param name environment name
* @param config environment config
* @param desc environment desc
* @param workerGroups worker groups
*/
@Override
@Transactional
public Long createEnvironment(User loginUser,
String name,
String config,
String desc,
String workerGroups) {
if (!canOperatorPermissions(loginUser, null, AuthorizationType.ENVIRONMENT, ENVIRONMENT_CREATE)) {
throw new ServiceException(Status.USER_NO_OPERATION_PERM);
}
if (checkDescriptionLength(desc)) {
throw new ServiceException(Status.DESCRIPTION_TOO_LONG_ERROR);
}
checkParams(name, config, workerGroups);
Environment environment = environmentMapper.queryByEnvironmentName(name);
if (environment != null) {
throw new ServiceException(Status.ENVIRONMENT_NAME_EXISTS, name);
}
Environment env = new Environment();
env.setName(name);
env.setConfig(config);
env.setDescription(desc);
env.setOperator(loginUser.getId());
env.setCreateTime(new Date());
env.setUpdateTime(new Date());
long code = 0L;
try {
code = CodeGenerateUtils.genCode();
env.setCode(code);
} catch (CodeGenerateException e) {
log.error("Generate environment code error.", e);
}
if (code == 0L) {
throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating environment code");
}
if (environmentMapper.insert(env) > 0) {
if (!StringUtils.isEmpty(workerGroups)) {
List<String> workerGroupList = JSONUtils.parseObject(workerGroups, new TypeReference<List<String>>() {
});
if (CollectionUtils.isNotEmpty(workerGroupList)) {
workerGroupList.stream().forEach(workerGroup -> {
if (!StringUtils.isEmpty(workerGroup)) {
EnvironmentWorkerGroupRelation relation = new EnvironmentWorkerGroupRelation();
relation.setEnvironmentCode(env.getCode());
relation.setWorkerGroup(workerGroup);
relation.setOperator(loginUser.getId());
relation.setCreateTime(new Date());
relation.setUpdateTime(new Date());
relationMapper.insert(relation);
log.info(
"Environment-WorkerGroup relation create complete, environmentName:{}, workerGroup:{}.",
env.getName(), relation.getWorkerGroup());
}
});
}
}
return env.getCode();
}
throw new ServiceException(Status.CREATE_ENVIRONMENT_ERROR);
}
/**
* query environment paging
*
* @param pageNo page number
* @param searchVal search value
* @param pageSize page size
* @return environment list page
*/
@Override
public Result queryEnvironmentListPaging(User loginUser, Integer pageNo, Integer pageSize, String searchVal) {
Result<Object> result = new Result();
Page<Environment> page = new Page<>(pageNo, pageSize);
PageInfo<EnvironmentDto> pageInfo = new PageInfo<>(pageNo, pageSize);
IPage<Environment> environmentIPage;
if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
environmentIPage = environmentMapper.queryEnvironmentListPaging(page, searchVal);
} else {
Set<Integer> ids = resourcePermissionCheckService
.userOwnedResourceIdsAcquisition(AuthorizationType.ENVIRONMENT, loginUser.getId(), log);
if (ids.isEmpty()) {
result.setData(pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
environmentIPage = environmentMapper.queryEnvironmentListPagingByIds(page, new ArrayList<>(ids), searchVal);
}
pageInfo.setTotal((int) environmentIPage.getTotal());
if (CollectionUtils.isNotEmpty(environmentIPage.getRecords())) {
Map<Long, List<String>> relationMap = relationMapper.selectList(null).stream()
.collect(Collectors.groupingBy(EnvironmentWorkerGroupRelation::getEnvironmentCode,
Collectors.mapping(EnvironmentWorkerGroupRelation::getWorkerGroup, Collectors.toList())));
List<EnvironmentDto> dtoList = environmentIPage.getRecords().stream().map(environment -> {
EnvironmentDto dto = new EnvironmentDto();
BeanUtils.copyProperties(environment, dto);
List<String> workerGroups = relationMap.getOrDefault(environment.getCode(), new ArrayList<String>());
dto.setWorkerGroups(workerGroups);
return dto;
}).collect(Collectors.toList());
pageInfo.setTotalList(dtoList);
} else {
pageInfo.setTotalList(new ArrayList<>());
}
result.setData(pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query all environment
*
* @param loginUser
* @return all environment list
*/
@Override
public Map<String, Object> queryAllEnvironmentList(User loginUser) {
Map<String, Object> result = new HashMap<>();
Set<Integer> ids = resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.ENVIRONMENT,
loginUser.getId(), log);
if (ids.isEmpty()) {
result.put(Constants.DATA_LIST, Collections.emptyList());
putMsg(result, Status.SUCCESS);
return result;
}
List<Environment> environmentList = environmentMapper.selectBatchIds(ids);
if (CollectionUtils.isNotEmpty(environmentList)) {
Map<Long, List<String>> relationMap = relationMapper.selectList(null).stream()
.collect(Collectors.groupingBy(EnvironmentWorkerGroupRelation::getEnvironmentCode,
Collectors.mapping(EnvironmentWorkerGroupRelation::getWorkerGroup, Collectors.toList())));
List<EnvironmentDto> dtoList = environmentList.stream().map(environment -> {
EnvironmentDto dto = new EnvironmentDto();
BeanUtils.copyProperties(environment, dto);
List<String> workerGroups = relationMap.getOrDefault(environment.getCode(), new ArrayList<String>());
dto.setWorkerGroups(workerGroups);
return dto;
}).collect(Collectors.toList());
result.put(Constants.DATA_LIST, dtoList);
} else {
result.put(Constants.DATA_LIST, new ArrayList<>());
}
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query environment
*
* @param code environment code
*/
@Override
public Map<String, Object> queryEnvironmentByCode(Long code) {
Map<String, Object> result = new HashMap<>();
Environment env = environmentMapper.queryByEnvironmentCode(code);
if (env == null) {
putMsg(result, Status.QUERY_ENVIRONMENT_BY_CODE_ERROR, code);
} else {
List<String> workerGroups = relationMapper.queryByEnvironmentCode(env.getCode()).stream()
.map(item -> item.getWorkerGroup())
.collect(Collectors.toList());
EnvironmentDto dto = new EnvironmentDto();
BeanUtils.copyProperties(env, dto);
dto.setWorkerGroups(workerGroups);
result.put(Constants.DATA_LIST, dto);
putMsg(result, Status.SUCCESS);
}
return result;
}
/**
* query environment
*
* @param name environment name
*/
@Override
public Map<String, Object> queryEnvironmentByName(String name) {
Map<String, Object> result = new HashMap<>();
Environment env = environmentMapper.queryByEnvironmentName(name);
if (env == null) {
putMsg(result, Status.QUERY_ENVIRONMENT_BY_NAME_ERROR, name);
} else {
List<String> workerGroups = relationMapper.queryByEnvironmentCode(env.getCode()).stream()
.map(item -> item.getWorkerGroup())
.collect(Collectors.toList());
EnvironmentDto dto = new EnvironmentDto();
BeanUtils.copyProperties(env, dto);
dto.setWorkerGroups(workerGroups);
result.put(Constants.DATA_LIST, dto);
putMsg(result, Status.SUCCESS);
}
return result;
}
/**
* delete environment
*
* @param loginUser login user
* @param code environment code
*/
@Transactional
@Override
public Map<String, Object> deleteEnvironmentByCode(User loginUser, Long code) {
Map<String, Object> result = new HashMap<>();
if (!canOperatorPermissions(loginUser, null, AuthorizationType.ENVIRONMENT, ENVIRONMENT_DELETE)) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
Long relatedTaskNumber = taskDefinitionMapper
.selectCount(new QueryWrapper<TaskDefinition>().lambda().eq(TaskDefinition::getEnvironmentCode, code));
if (relatedTaskNumber > 0) {
log.warn("Delete environment failed because {} tasks is using it, environmentCode:{}.",
relatedTaskNumber, code);
putMsg(result, Status.DELETE_ENVIRONMENT_RELATED_TASK_EXISTS);
return result;
}
int delete = environmentMapper.deleteByCode(code);
if (delete > 0) {
relationMapper.delete(new QueryWrapper<EnvironmentWorkerGroupRelation>()
.lambda()
.eq(EnvironmentWorkerGroupRelation::getEnvironmentCode, code));
log.info("Environment and relations delete complete, environmentCode:{}.", code);
putMsg(result, Status.SUCCESS);
} else {
log.error("Environment delete error, environmentCode:{}.", code);
putMsg(result, Status.DELETE_ENVIRONMENT_ERROR);
}
return result;
}
/**
* update environment
*
* @param loginUser login user
* @param code environment code
* @param name environment name
* @param config environment config
* @param desc environment desc
* @param workerGroups worker groups
*/
@Transactional
@Override
public Environment updateEnvironmentByCode(User loginUser, Long code, String name, String config,
String desc, String workerGroups) {
if (!canOperatorPermissions(loginUser, null, AuthorizationType.ENVIRONMENT, ENVIRONMENT_UPDATE)) {
throw new ServiceException(Status.USER_NO_OPERATION_PERM);
}
checkParams(name, config, workerGroups);
if (checkDescriptionLength(desc)) {
throw new ServiceException(Status.DESCRIPTION_TOO_LONG_ERROR);
}
Environment environment = environmentMapper.queryByEnvironmentName(name);
if (environment != null && !environment.getCode().equals(code)) {
throw new ServiceException(Status.ENVIRONMENT_NAME_EXISTS, name);
}
Set<String> workerGroupSet;
if (!StringUtils.isEmpty(workerGroups)) {
workerGroupSet = JSONUtils.parseObject(workerGroups, new TypeReference<Set<String>>() {
});
} else {
workerGroupSet = new TreeSet<>();
}
Set<String> existWorkerGroupSet = relationMapper
.queryByEnvironmentCode(code)
.stream()
.map(EnvironmentWorkerGroupRelation::getWorkerGroup)
.collect(Collectors.toSet());
Set<String> deleteWorkerGroupSet = SetUtils.difference(existWorkerGroupSet, workerGroupSet).toSet();
Set<String> addWorkerGroupSet = SetUtils.difference(workerGroupSet, existWorkerGroupSet).toSet();
// verify whether the relation of this environment and worker groups can be adjusted
checkUsedEnvironmentWorkerGroupRelation(deleteWorkerGroupSet, name, code);
Environment env = new Environment();
env.setCode(code);
env.setName(name);
env.setConfig(config);
env.setDescription(desc);
env.setOperator(loginUser.getId());
env.setUpdateTime(new Date());
int update =
environmentMapper.update(env, new UpdateWrapper<Environment>().lambda().eq(Environment::getCode, code));
if (update <= 0) {
throw new ServiceException(Status.UPDATE_ENVIRONMENT_ERROR, name);
}
deleteWorkerGroupSet.forEach(key -> {
if (StringUtils.isNotEmpty(key)) {
relationMapper.delete(new QueryWrapper<EnvironmentWorkerGroupRelation>()
.lambda()
.eq(EnvironmentWorkerGroupRelation::getEnvironmentCode, code)
.eq(EnvironmentWorkerGroupRelation::getWorkerGroup, key));
}
});
addWorkerGroupSet.forEach(key -> {
if (StringUtils.isNotEmpty(key)) {
EnvironmentWorkerGroupRelation relation = new EnvironmentWorkerGroupRelation();
relation.setEnvironmentCode(code);
relation.setWorkerGroup(key);
relation.setUpdateTime(new Date());
relation.setCreateTime(new Date());
relation.setOperator(loginUser.getId());
relationMapper.insert(relation);
}
});
return env;
}
/**
* verify environment name
*
* @param environmentName environment name
* @return true if the environment name not exists, otherwise return false
*/
@Override
public Map<String, Object> verifyEnvironment(String environmentName) {
Map<String, Object> result = new HashMap<>();
if (StringUtils.isEmpty(environmentName)) {
log.warn("parameter environment name is empty.");
putMsg(result, Status.ENVIRONMENT_NAME_IS_NULL);
return result;
}
Environment environment = environmentMapper.queryByEnvironmentName(environmentName);
if (environment != null) {
log.warn("Environment with the same name already exist, name:{}.", environment.getName());
putMsg(result, Status.ENVIRONMENT_NAME_EXISTS, environmentName);
return result;
}
result.put(Constants.STATUS, Status.SUCCESS);
return result;
}
private void checkUsedEnvironmentWorkerGroupRelation(Set<String> deleteKeySet,
String environmentName, Long environmentCode) {
for (String workerGroup : deleteKeySet) {
List<TaskDefinition> taskDefinitionList = taskDefinitionMapper
.selectList(new QueryWrapper<TaskDefinition>().lambda()
.eq(TaskDefinition::getEnvironmentCode, environmentCode)
.eq(TaskDefinition::getWorkerGroup, workerGroup));
if (Objects.nonNull(taskDefinitionList) && taskDefinitionList.size() != 0) {
Set<String> collect =
taskDefinitionList.stream().map(TaskDefinition::getName).collect(Collectors.toSet());
throw new ServiceException(Status.UPDATE_ENVIRONMENT_WORKER_GROUP_RELATION_ERROR, workerGroup,
environmentName, collect);
}
}
}
protected void checkParams(String name, String config, String workerGroups) {
if (StringUtils.isEmpty(name)) {
throw new ServiceException(Status.ENVIRONMENT_NAME_IS_NULL);
}
if (StringUtils.isEmpty(config)) {
throw new ServiceException(Status.ENVIRONMENT_CONFIG_IS_NULL);
}
if (!StringUtils.isEmpty(workerGroups)) {
List<String> workerGroupList = JSONUtils.parseObject(workerGroups, new TypeReference<List<String>>() {
});
if (Objects.isNull(workerGroupList)) {
throw new ServiceException(Status.ENVIRONMENT_WORKER_GROUPS_IS_INVALID);
}
}
}
}