/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.service.impl;

import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.core.entity.SparkEnv;
import org.apache.streampark.console.core.enums.FlinkEnvCheckEnum;
import org.apache.streampark.console.core.mapper.SparkEnvMapper;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.SparkEnvService;
import org.apache.streampark.console.core.service.application.ApplicationInfoService;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.io.File;
import java.io.IOException;
import java.util.Date;

@Slf4j
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
public class SparkEnvServiceImpl extends ServiceImpl<SparkEnvMapper, SparkEnv>
    implements SparkEnvService {

  @Autowired private FlinkClusterService flinkClusterService;

  @Autowired private ApplicationInfoService applicationInfoService;

  /**
   * Two things are checked here: <br>
   * 1) the spark name is not duplicated <br>
   * 2) the spark home's jars path exists and is a directory <br>
   */
  @Override
  public FlinkEnvCheckEnum check(SparkEnv version) {
    // 1) check name
    LambdaQueryWrapper<SparkEnv> queryWrapper =
        new LambdaQueryWrapper<SparkEnv>().eq(SparkEnv::getSparkName, version.getSparkName());
    if (version.getId() != null) {
      queryWrapper.ne(SparkEnv::getId, version.getId());
    }
    if (this.count(queryWrapper) > 0) {
      return FlinkEnvCheckEnum.NAME_REPEATED;
    }

    String lib = version.getSparkHome().concat("/jars");
    File sparkLib = new File(lib);
    // 2) spark/jars path exists and is a directory
    if (!sparkLib.exists() || !sparkLib.isDirectory()) {
      return FlinkEnvCheckEnum.INVALID_PATH;
    }
    return FlinkEnvCheckEnum.OK;
  }
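
  /** Creates a new spark env record; the first record ever saved is marked as the default. */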
  @Override
  public boolean create(SparkEnv version) throws Exception {
    long count = this.baseMapper.selectCount(null);
    version.setIsDefault(count == 0);
    version.setCreateTime(new Date());
    version.doSetSparkConf();
    version.doSetVersion();
    return save(version);
  }
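
  /**
   * Removes the spark env by id. The deletion is rejected if the env does not exist, or if it is
   * the default env while other envs still remain.
   */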
  @Override
  public void removeById(Long id) {
    SparkEnv sparkEnv = getById(id);
    checkOrElseAlert(sparkEnv);
    Long count = this.baseMapper.selectCount(null);
    ApiAlertException.throwIfFalse(
        !(count > 1 && sparkEnv.getIsDefault()),
        "The spark home is set as default, please change it first.");
    this.baseMapper.deleteById(id);
  }
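
  /**
   * Updates name, description and spark home. When the spark home changes, the stored spark conf
   * and version are re-resolved from the new home.
   */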
  @Override
  public void update(SparkEnv version) throws IOException {
    SparkEnv sparkEnv = getById(version.getId());
    checkOrElseAlert(sparkEnv);
    sparkEnv.setDescription(version.getDescription());
    sparkEnv.setSparkName(version.getSparkName());
    if (!version.getSparkHome().equals(sparkEnv.getSparkHome())) {
      sparkEnv.setSparkHome(version.getSparkHome());
      sparkEnv.doSetSparkConf();
      sparkEnv.doSetVersion();
    }
    updateById(sparkEnv);
  }
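
  /** Marks the spark env with the given id as the default one. */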
  @Override
  public void setDefault(Long id) {
    this.baseMapper.setDefault(id);
  }
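
  /** Returns the spark env associated with the given application id. */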
  @Override
  public SparkEnv getByAppId(Long appId) {
    return this.baseMapper.selectByAppId(appId);
  }
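
  /** Returns the spark env currently marked as default, or null if none is set. */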
  @Override
  public SparkEnv getDefault() {
    return this.baseMapper.selectOne(
        new LambdaQueryWrapper<SparkEnv>().eq(SparkEnv::getIsDefault, true));
  }
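
  /** Returns the spark env by id, falling back to the default env when no record matches. */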
  @Override
  public SparkEnv getByIdOrDefault(Long id) {
    SparkEnv sparkEnv = getById(id);
    return sparkEnv == null ? getDefault() : sparkEnv;
  }
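
  /** Re-reads the spark conf from the spark home and persists the refreshed record. */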
  @Override
  public void syncConf(Long id) {
    SparkEnv sparkEnv = getById(id);
    sparkEnv.doSetSparkConf();
    updateById(sparkEnv);
  }
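
  /** Verifies that the spark env with the given id exists, alerting otherwise. */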
  @Override
  public void validity(Long id) {
    SparkEnv sparkEnv = getById(id);
    checkOrElseAlert(sparkEnv);
  }
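
  /** Shared guard for delete/update/validity: raises an alert when the spark env is missing. */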
  private void checkOrElseAlert(SparkEnv sparkEnv) {
    // 1.check exists
    ApiAlertException.throwIfNull(sparkEnv, "The spark home does not exist, please check.");

    // todo : To be developed
    // 2.check if it is being used by any spark cluster
    // ApiAlertException.throwIfTrue(
    //     flinkClusterService.existsByFlinkEnvId(sparkEnv.getId()),
    //     "The spark home is still in use by some spark cluster, please check.");
    //
    // // 3.check if it is being used by any application
    // ApiAlertException.throwIfTrue(
    //     applicationInfoService.existsBySparkEnvId(sparkEnv.getId()),
    //     "The spark home is still in use by some application, please check.");
  }
}