| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.streampark.console.core.service.impl; |
| |
| import org.apache.streampark.console.base.exception.ApiAlertException; |
| import org.apache.streampark.console.core.entity.SparkEnv; |
| import org.apache.streampark.console.core.enums.FlinkEnvCheckEnum; |
| import org.apache.streampark.console.core.mapper.SparkEnvMapper; |
| import org.apache.streampark.console.core.service.FlinkClusterService; |
| import org.apache.streampark.console.core.service.SparkEnvService; |
| import org.apache.streampark.console.core.service.application.ApplicationInfoService; |
| |
| import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
| import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; |
| import lombok.extern.slf4j.Slf4j; |
| import org.springframework.beans.factory.annotation.Autowired; |
| import org.springframework.stereotype.Service; |
| import org.springframework.transaction.annotation.Propagation; |
| import org.springframework.transaction.annotation.Transactional; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.Date; |
| |
| @Slf4j |
| @Service |
| @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class) |
| public class SparkEnvServiceImpl extends ServiceImpl<SparkEnvMapper, SparkEnv> |
| implements SparkEnvService { |
| |
| @Autowired private FlinkClusterService flinkClusterService; |
| @Autowired private ApplicationInfoService applicationInfoService; |
| |
| /** |
| * two places will be checked: <br> |
| * 1) name repeated <br> |
| * 2) spark jars repeated <br> |
| */ |
| @Override |
| public FlinkEnvCheckEnum check(SparkEnv version) { |
| // 1) check name |
| LambdaQueryWrapper<SparkEnv> queryWrapper = |
| new LambdaQueryWrapper<SparkEnv>().eq(SparkEnv::getSparkName, version.getSparkName()); |
| if (version.getId() != null) { |
| queryWrapper.ne(SparkEnv::getId, version.getId()); |
| } |
| if (this.count(queryWrapper) > 0) { |
| return FlinkEnvCheckEnum.NAME_REPEATED; |
| } |
| |
| String lib = version.getSparkHome().concat("/jars"); |
| File sparkLib = new File(lib); |
| // 2) spark/jars path exists and is a directory |
| if (!sparkLib.exists() || !sparkLib.isDirectory()) { |
| return FlinkEnvCheckEnum.INVALID_PATH; |
| } |
| |
| return FlinkEnvCheckEnum.OK; |
| } |
| |
| @Override |
| public boolean create(SparkEnv version) throws Exception { |
| long count = this.baseMapper.selectCount(null); |
| version.setIsDefault(count == 0); |
| version.setCreateTime(new Date()); |
| version.doSetSparkConf(); |
| version.doSetVersion(); |
| return save(version); |
| } |
| |
| @Override |
| public void removeById(Long id) { |
| SparkEnv sparkEnv = getById(id); |
| checkOrElseAlert(sparkEnv); |
| Long count = this.baseMapper.selectCount(null); |
| ApiAlertException.throwIfFalse( |
| !(count > 1 && sparkEnv.getIsDefault()), |
| "The spark home is set as default, please change it first."); |
| |
| this.baseMapper.deleteById(id); |
| } |
| |
| @Override |
| public void update(SparkEnv version) throws IOException { |
| SparkEnv sparkEnv = getById(version.getId()); |
| checkOrElseAlert(sparkEnv); |
| sparkEnv.setDescription(version.getDescription()); |
| sparkEnv.setSparkName(version.getSparkName()); |
| if (!version.getSparkHome().equals(sparkEnv.getSparkHome())) { |
| sparkEnv.setSparkHome(version.getSparkHome()); |
| sparkEnv.doSetSparkConf(); |
| sparkEnv.doSetVersion(); |
| } |
| updateById(sparkEnv); |
| } |
| |
| @Override |
| public void setDefault(Long id) { |
| this.baseMapper.setDefault(id); |
| } |
| |
| @Override |
| public SparkEnv getByAppId(Long appId) { |
| return this.baseMapper.selectByAppId(appId); |
| } |
| |
| @Override |
| public SparkEnv getDefault() { |
| return this.baseMapper.selectOne( |
| new LambdaQueryWrapper<SparkEnv>().eq(SparkEnv::getIsDefault, true)); |
| } |
| |
| @Override |
| public SparkEnv getByIdOrDefault(Long id) { |
| SparkEnv sparkEnv = getById(id); |
| return sparkEnv == null ? getDefault() : sparkEnv; |
| } |
| |
| @Override |
| public void syncConf(Long id) { |
| SparkEnv sparkEnv = getById(id); |
| sparkEnv.doSetSparkConf(); |
| updateById(sparkEnv); |
| } |
| |
| @Override |
| public void validity(Long id) { |
| SparkEnv sparkEnv = getById(id); |
| checkOrElseAlert(sparkEnv); |
| } |
| |
| private void checkOrElseAlert(SparkEnv sparkEnv) { |
| |
| // 1.check exists |
| ApiAlertException.throwIfNull(sparkEnv, "The spark home does not exist, please check."); |
| |
| // todo : To be developed |
| // 2.check if it is being used by any spark cluster |
| // ApiAlertException.throwIfTrue( |
| // flinkClusterService.existsByFlinkEnvId(sparkEnv.getId()), |
| // "The spark home is still in use by some spark cluster, please check."); |
| // |
| // // 3.check if it is being used by any application |
| // ApiAlertException.throwIfTrue( |
| // applicationInfoService.existsBySparkEnvId(sparkEnv.getId()), |
| // "The spark home is still in use by some application, please check."); |
| } |
| } |