blob: bf4e3ac4d60558b7d3096e380bed9d99216f6f83 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.dto.ClusterDto;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.k8s.K8sManager;
import org.apache.dolphinscheduler.api.service.ClusterService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.dao.entity.Cluster;
import org.apache.dolphinscheduler.dao.entity.K8sNamespace;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ClusterMapper;
import org.apache.dolphinscheduler.dao.mapper.K8sNamespaceMapper;
import org.apache.dolphinscheduler.service.utils.ClusterConfUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* cluster definition service impl
*/
@Service
@Slf4j
public class ClusterServiceImpl extends BaseServiceImpl implements ClusterService {
@Autowired
private ClusterMapper clusterMapper;
@Autowired
private K8sManager k8sManager;
@Autowired
private K8sNamespaceMapper k8sNamespaceMapper;
/**
* create cluster
*
* @param loginUser login user
* @param name cluster name
* @param config cluster config
* @param desc cluster desc
*/
@Transactional
@Override
public Long createCluster(User loginUser, String name, String config, String desc) {
if (isNotAdmin(loginUser)) {
throw new ServiceException(Status.USER_NO_OPERATION_PERM);
}
checkParams(name, config);
Cluster clusterExistByName = clusterMapper.queryByClusterName(name);
if (clusterExistByName != null) {
throw new ServiceException(Status.CLUSTER_NAME_EXISTS, name);
}
Cluster cluster = new Cluster();
cluster.setName(name);
cluster.setConfig(config);
cluster.setDescription(desc);
cluster.setOperator(loginUser.getId());
cluster.setCreateTime(new Date());
cluster.setUpdateTime(new Date());
cluster.setCode(CodeGenerateUtils.genCode());
if (clusterMapper.insert(cluster) > 0) {
return cluster.getCode();
}
throw new ServiceException(Status.CREATE_CLUSTER_ERROR);
}
/**
* query cluster paging
*
* @param pageNo page number
* @param searchVal search value
* @param pageSize page size
* @return cluster list page
*/
@Override
public PageInfo<ClusterDto> queryClusterListPaging(Integer pageNo, Integer pageSize, String searchVal) {
Page<Cluster> page = new Page<>(pageNo, pageSize);
IPage<Cluster> clusterIPage = clusterMapper.queryClusterListPaging(page, searchVal);
PageInfo<ClusterDto> pageInfo = new PageInfo<>(pageNo, pageSize);
pageInfo.setTotal((int) clusterIPage.getTotal());
if (CollectionUtils.isEmpty(clusterIPage.getRecords())) {
return pageInfo;
}
List<ClusterDto> dtoList = clusterIPage.getRecords().stream().map(cluster -> {
ClusterDto dto = new ClusterDto();
BeanUtils.copyProperties(cluster, dto);
return dto;
}).collect(Collectors.toList());
pageInfo.setTotalList(dtoList);
return pageInfo;
}
/**
* query all cluster
*
* @return all cluster list
*/
@Override
public List<ClusterDto> queryAllClusterList() {
List<Cluster> clusterList = clusterMapper.queryAllClusterList();
if (CollectionUtils.isEmpty(clusterList)) {
return Collections.emptyList();
}
return clusterList.stream()
.map(cluster -> {
ClusterDto dto = new ClusterDto();
// todo: Don't use copy
BeanUtils.copyProperties(cluster, dto);
return dto;
}).collect(Collectors.toList());
}
/**
* query cluster
*
* @param code cluster code
*/
@Override
public ClusterDto queryClusterByCode(Long code) {
Cluster cluster = clusterMapper.queryByClusterCode(code);
if (cluster == null) {
throw new ServiceException(Status.QUERY_CLUSTER_BY_CODE_ERROR, code);
}
ClusterDto dto = new ClusterDto();
BeanUtils.copyProperties(cluster, dto);
return dto;
}
/**
* query cluster
*
* @param name cluster name
*/
@Override
public ClusterDto queryClusterByName(String name) {
Cluster cluster = clusterMapper.queryByClusterName(name);
if (cluster == null) {
throw new ServiceException(Status.QUERY_CLUSTER_BY_NAME_ERROR, name);
}
ClusterDto dto = new ClusterDto();
BeanUtils.copyProperties(cluster, dto);
return dto;
}
/**
* delete cluster
*
* @param loginUser login user
* @param code cluster code
*/
@Override
public void deleteClusterByCode(User loginUser, Long code) {
if (isNotAdmin(loginUser)) {
throw new ServiceException(Status.USER_NO_OPERATION_PERM);
}
Long relatedNamespaceNumber = k8sNamespaceMapper
.selectCount(new QueryWrapper<K8sNamespace>().lambda().eq(K8sNamespace::getClusterCode, code));
if (relatedNamespaceNumber > 0) {
throw new ServiceException(Status.DELETE_CLUSTER_RELATED_NAMESPACE_EXISTS);
}
int delete = clusterMapper.deleteByCode(code);
if (delete > 0) {
return;
}
throw new ServiceException(Status.DELETE_CLUSTER_ERROR);
}
/**
* update cluster
*
* @param loginUser login user
* @param code cluster code
* @param name cluster name
* @param config cluster config
* @param desc cluster desc
*/
@Override
public Cluster updateClusterByCode(User loginUser,
Long code,
String name,
String config,
String desc) {
if (isNotAdmin(loginUser)) {
throw new ServiceException(Status.USER_NO_OPERATION_PERM);
}
if (checkDescriptionLength(desc)) {
throw new ServiceException(Status.DESCRIPTION_TOO_LONG_ERROR);
}
checkParams(name, config);
Cluster clusterExistByName = clusterMapper.queryByClusterName(name);
if (clusterExistByName != null && !clusterExistByName.getCode().equals(code)) {
throw new ServiceException(Status.CLUSTER_NAME_EXISTS, name);
}
Cluster clusterExist = clusterMapper.queryByClusterCode(code);
if (clusterExist == null) {
throw new ServiceException(Status.CLUSTER_NOT_EXISTS, name);
}
if (!Constants.K8S_LOCAL_TEST_CLUSTER_CODE.equals(clusterExist.getCode())
&& !config.equals(ClusterConfUtils.getK8sConfig(clusterExist.getConfig()))) {
try {
k8sManager.getAndUpdateK8sClient(code, true);
} catch (Exception e) {
throw new ServiceException(Status.K8S_CLIENT_OPS_ERROR, name);
}
}
// update cluster
// need not update relation
clusterExist.setConfig(config);
clusterExist.setName(name);
clusterExist.setDescription(desc);
clusterMapper.updateById(clusterExist);
return clusterExist;
}
/**
* verify cluster name
*
* @param clusterName cluster name
* @return true if the cluster name not exists, otherwise return false
*/
@Override
public void verifyCluster(String clusterName) {
if (StringUtils.isEmpty(clusterName)) {
throw new ServiceException(Status.CLUSTER_NAME_IS_NULL);
}
Cluster cluster = clusterMapper.queryByClusterName(clusterName);
if (cluster != null) {
throw new ServiceException(Status.CLUSTER_NAME_EXISTS);
}
}
protected void checkParams(String name, String config) {
if (StringUtils.isEmpty(name)) {
throw new ServiceException(Status.CLUSTER_NAME_IS_NULL);
}
if (StringUtils.isEmpty(config)) {
throw new ServiceException(Status.CLUSTER_CONFIG_IS_NULL);
}
}
}