blob: 2a4dd9d2e069afa4bc8828198f72c0bdffb43763 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.console.core.service.application.impl;
import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.HdfsOperator;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.base.util.ObjectUtils;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.bean.AppControl;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationConfig;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.FlinkSql;
import org.apache.streampark.console.core.entity.Resource;
import org.apache.streampark.console.core.enums.CandidateTypeEnum;
import org.apache.streampark.console.core.enums.ChangeTypeEnum;
import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
import org.apache.streampark.console.core.enums.OptionStateEnum;
import org.apache.streampark.console.core.enums.ReleaseStateEnum;
import org.apache.streampark.console.core.mapper.ApplicationMapper;
import org.apache.streampark.console.core.service.AppBuildPipeService;
import org.apache.streampark.console.core.service.ApplicationBackUpService;
import org.apache.streampark.console.core.service.ApplicationConfigService;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.CommonService;
import org.apache.streampark.console.core.service.EffectiveService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkSqlService;
import org.apache.streampark.console.core.service.ProjectService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.SavePointService;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.YarnQueueService;
import org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverterStub;
import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
import org.apache.streampark.console.core.watcher.FlinkClusterWatcher;
import org.apache.streampark.console.core.watcher.FlinkK8sObserverStub;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
import org.apache.streampark.flink.packer.pipeline.PipelineStatusEnum;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.annotations.VisibleForTesting;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper.Bridge.toTrackId;
import static org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper.isKubernetesApp;
@Slf4j
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
public class ApplicationManageServiceImpl extends ServiceImpl<ApplicationMapper, Application>
implements ApplicationManageService {
private static final String ERROR_APP_QUEUE_HINT =
"Queue label '%s' isn't available for teamId '%d', please add it into the team first.";
@Autowired private ProjectService projectService;
@Autowired private ApplicationBackUpService backUpService;
@Autowired private ApplicationConfigService configService;
@Autowired private ApplicationLogService applicationLogService;
@Autowired private FlinkSqlService flinkSqlService;
@Autowired private SavePointService savePointService;
@Autowired private EffectiveService effectiveService;
@Autowired private SettingService settingService;
@Autowired private CommonService commonService;
@Autowired private FlinkK8sWatcher k8SFlinkTrackMonitor;
@Autowired private AppBuildPipeService appBuildPipeService;
@Autowired private YarnQueueService yarnQueueService;
@Autowired private ResourceService resourceService;
@Autowired private FlinkK8sObserverStub flinkK8sObserver;
@Autowired private FlinkK8sDataTypeConverterStub flinkK8sDataTypeConverter;
@Autowired private FlinkClusterWatcher flinkClusterWatcher;
@Autowired private FlinkClusterService flinkClusterService;
@PostConstruct
public void resetOptionState() {
this.baseMapper.resetOptionState();
}
@Override
public void toEffective(Application appParam) {
// set latest to Effective
ApplicationConfig config = configService.getLatest(appParam.getId());
if (config != null) {
this.configService.toEffective(appParam.getId(), config.getId());
}
if (appParam.isFlinkSqlJob()) {
FlinkSql flinkSql = flinkSqlService.getCandidate(appParam.getId(), null);
if (flinkSql != null) {
flinkSqlService.toEffective(appParam.getId(), flinkSql.getId());
// clean candidate
flinkSqlService.cleanCandidate(flinkSql.getId());
}
}
}
@Override
public void persistMetrics(Application appParam) {
this.baseMapper.persistMetrics(appParam);
}
@Override
public boolean mapping(Application appParam) {
boolean mapping = this.baseMapper.mapping(appParam);
Application application = getById(appParam.getId());
if (isKubernetesApp(application)) {
// todo mark
k8SFlinkTrackMonitor.doWatching(toTrackId(application));
if (K8sFlinkConfig.isV2Enabled()) {
flinkK8sObserver.watchApplication(application);
}
} else {
FlinkAppHttpWatcher.doWatching(application);
}
return mapping;
}
@Override
public Boolean remove(Application appParam) {
Application application = getById(appParam.getId());
// 1) remove flink sql
flinkSqlService.removeByAppId(application.getId());
// 2) remove log
applicationLogService.removeByAppId(application.getId());
// 3) remove config
configService.removeByAppId(application.getId());
// 4) remove effective
effectiveService.removeByAppId(application.getId());
// remove related hdfs
// 5) remove backup
backUpService.remove(application);
// 6) remove savepoint
savePointService.remove(application);
// 7) remove BuildPipeline
appBuildPipeService.removeByAppId(application.getId());
// 8) remove app
removeApp(application);
if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.unWatching(toTrackId(application));
if (K8sFlinkConfig.isV2Enabled()) {
flinkK8sObserver.unWatchById(application.getId());
}
} else {
FlinkAppHttpWatcher.unWatching(appParam.getId());
}
return true;
}
private void removeApp(Application application) {
Long appId = application.getId();
removeById(appId);
try {
application
.getFsOperator()
.delete(application.getWorkspace().APP_WORKSPACE().concat("/").concat(appId.toString()));
// try to delete yarn-application, and leave no trouble.
String path =
Workspace.of(StorageType.HDFS).APP_WORKSPACE().concat("/").concat(appId.toString());
if (HdfsOperator.exists(path)) {
HdfsOperator.delete(path);
}
} catch (Exception e) {
// skip
}
}
@Override
public IPage<Application> page(Application appParam, RestRequest request) {
if (appParam.getTeamId() == null) {
return null;
}
Page<Application> page = new MybatisPager<Application>().getDefaultPage(request);
if (ArrayUtils.isNotEmpty(appParam.getStateArray())
&& Arrays.stream(appParam.getStateArray())
.anyMatch(x -> x == FlinkAppStateEnum.FINISHED.getValue())) {
Integer[] newArray =
ArrayUtils.insert(
appParam.getStateArray().length,
appParam.getStateArray(),
FlinkAppStateEnum.POS_TERMINATED.getValue());
appParam.setStateArray(newArray);
}
this.baseMapper.selectPage(page, appParam);
List<Application> records = page.getRecords();
long now = System.currentTimeMillis();
List<Long> appIds = records.stream().map(Application::getId).collect(Collectors.toList());
Map<Long, PipelineStatusEnum> pipeStates =
appBuildPipeService.listAppIdPipelineStatusMap(appIds);
List<Application> newRecords =
records.stream()
.peek(
record -> {
// status of flink job on kubernetes mode had been automatically persisted to db
// in time.
if (isKubernetesApp(record)) {
// set duration
String restUrl = k8SFlinkTrackMonitor.getRemoteRestUrl(toTrackId(record));
record.setFlinkRestUrl(restUrl);
if (record.getTracking() == 1
&& record.getStartTime() != null
&& record.getStartTime().getTime() > 0) {
record.setDuration(now - record.getStartTime().getTime());
}
}
if (pipeStates.containsKey(record.getId())) {
record.setBuildStatus(pipeStates.get(record.getId()).getCode());
}
AppControl appControl =
new AppControl()
.setAllowBuild(
record.getBuildStatus() == null
|| !PipelineStatusEnum.running
.getCode()
.equals(record.getBuildStatus()))
.setAllowStart(
!record.shouldTracking()
&& PipelineStatusEnum.success
.getCode()
.equals(record.getBuildStatus()))
.setAllowStop(record.isRunning());
record.setAppControl(appControl);
})
.collect(Collectors.toList());
page.setRecords(newRecords);
return page;
}
@Override
public void changeOwnership(Long userId, Long targetUserId) {
LambdaUpdateWrapper<Application> updateWrapper =
new LambdaUpdateWrapper<Application>()
.eq(Application::getUserId, userId)
.set(Application::getUserId, targetUserId);
this.baseMapper.update(null, updateWrapper);
}
@SneakyThrows
@Override
public boolean create(Application appParam) {
ApiAlertException.throwIfNull(
appParam.getTeamId(), "The teamId can't be null. Create application failed.");
appParam.setUserId(commonService.getUserId());
appParam.setState(FlinkAppStateEnum.ADDED.getValue());
appParam.setRelease(ReleaseStateEnum.NEED_RELEASE.get());
appParam.setOptionState(OptionStateEnum.NONE.getValue());
appParam.setCreateTime(new Date());
appParam.setDefaultModeIngress(settingService.getIngressModeDefault());
boolean success = validateQueueIfNeeded(appParam);
ApiAlertException.throwIfFalse(
success,
String.format(ERROR_APP_QUEUE_HINT, appParam.getYarnQueue(), appParam.getTeamId()));
appParam.doSetHotParams();
if (appParam.isUploadJob()) {
String jarPath =
String.format(
"%s/%d/%s", Workspace.local().APP_UPLOADS(), appParam.getTeamId(), appParam.getJar());
if (!new File(jarPath).exists()) {
Resource resource =
resourceService.findByResourceName(appParam.getTeamId(), appParam.getJar());
if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) {
jarPath = resource.getFilePath();
}
}
appParam.setJarCheckSum(org.apache.commons.io.FileUtils.checksumCRC32(new File(jarPath)));
}
if (shouldHandleK8sName(appParam)) {
switch (appParam.getFlinkExecutionMode()) {
case KUBERNETES_NATIVE_APPLICATION:
appParam.setK8sName(appParam.getClusterId());
break;
case KUBERNETES_NATIVE_SESSION:
appParam.setK8sName(
flinkK8sDataTypeConverter.genSessionJobK8sCRName(appParam.getClusterId()));
break;
}
}
if (save(appParam)) {
if (appParam.isFlinkSqlJobOrPyFlinkJob()) {
FlinkSql flinkSql = new FlinkSql(appParam);
flinkSqlService.create(flinkSql);
}
if (appParam.getConfig() != null) {
configService.create(appParam, true);
}
return true;
} else {
throw new ApiAlertException("create application failed");
}
}
private boolean shouldHandleK8sName(Application app) {
return K8sFlinkConfig.isV2Enabled()
&& FlinkExecutionMode.isKubernetesMode(app.getExecutionMode());
}
private boolean existsByJobName(String jobName) {
return baseMapper.exists(
new LambdaQueryWrapper<Application>().eq(Application::getJobName, jobName));
}
@SuppressWarnings("checkstyle:WhitespaceAround")
@Override
@SneakyThrows
public Long copy(Application appParam) {
boolean existsByJobName = this.existsByJobName(appParam.getJobName());
ApiAlertException.throwIfFalse(
!existsByJobName,
"[StreamPark] Application names can't be repeated, copy application failed.");
Application oldApp = getById(appParam.getId());
Application newApp = new Application();
String jobName = appParam.getJobName();
newApp.setJobName(jobName);
newApp.setClusterId(
FlinkExecutionMode.isSessionMode(oldApp.getFlinkExecutionMode())
? oldApp.getClusterId()
: jobName);
newApp.setArgs(appParam.getArgs() != null ? appParam.getArgs() : oldApp.getArgs());
newApp.setVersionId(oldApp.getVersionId());
newApp.setFlinkClusterId(oldApp.getFlinkClusterId());
newApp.setRestartSize(oldApp.getRestartSize());
newApp.setJobType(oldApp.getJobType());
newApp.setOptions(oldApp.getOptions());
newApp.setDynamicProperties(oldApp.getDynamicProperties());
newApp.setResolveOrder(oldApp.getResolveOrder());
newApp.setExecutionMode(oldApp.getExecutionMode());
newApp.setFlinkImage(oldApp.getFlinkImage());
newApp.setK8sNamespace(oldApp.getK8sNamespace());
newApp.setK8sRestExposedType(oldApp.getK8sRestExposedType());
newApp.setK8sPodTemplate(oldApp.getK8sPodTemplate());
newApp.setK8sJmPodTemplate(oldApp.getK8sJmPodTemplate());
newApp.setK8sTmPodTemplate(oldApp.getK8sTmPodTemplate());
newApp.setK8sHadoopIntegration(oldApp.getK8sHadoopIntegration());
newApp.setDescription(oldApp.getDescription());
newApp.setAlertId(oldApp.getAlertId());
newApp.setCpFailureAction(oldApp.getCpFailureAction());
newApp.setCpFailureRateInterval(oldApp.getCpFailureRateInterval());
newApp.setCpMaxFailureInterval(oldApp.getCpMaxFailureInterval());
newApp.setMainClass(oldApp.getMainClass());
newApp.setAppType(oldApp.getAppType());
newApp.setResourceFrom(oldApp.getResourceFrom());
newApp.setProjectId(oldApp.getProjectId());
newApp.setModule(oldApp.getModule());
newApp.setUserId(commonService.getUserId());
newApp.setState(FlinkAppStateEnum.ADDED.getValue());
newApp.setRelease(ReleaseStateEnum.NEED_RELEASE.get());
newApp.setOptionState(OptionStateEnum.NONE.getValue());
newApp.setCreateTime(new Date());
newApp.setHotParams(oldApp.getHotParams());
newApp.setJar(oldApp.getJar());
newApp.setJarCheckSum(oldApp.getJarCheckSum());
newApp.setTags(oldApp.getTags());
newApp.setTeamId(oldApp.getTeamId());
boolean saved = save(newApp);
if (saved) {
if (newApp.isFlinkSqlJob()) {
FlinkSql copyFlinkSql = flinkSqlService.getLatestFlinkSql(appParam.getId(), true);
newApp.setFlinkSql(copyFlinkSql.getSql());
newApp.setTeamResource(copyFlinkSql.getTeamResource());
newApp.setDependency(copyFlinkSql.getDependency());
FlinkSql flinkSql = new FlinkSql(newApp);
flinkSqlService.create(flinkSql);
}
ApplicationConfig copyConfig = configService.getEffective(appParam.getId());
if (copyConfig != null) {
ApplicationConfig config = new ApplicationConfig();
config.setAppId(newApp.getId());
config.setFormat(copyConfig.getFormat());
config.setContent(copyConfig.getContent());
config.setCreateTime(new Date());
config.setVersion(1);
configService.save(config);
configService.setLatestOrEffective(true, config.getId(), newApp.getId());
}
return newApp.getId();
} else {
throw new ApiAlertException(
"create application from copy failed, copy source app: " + oldApp.getJobName());
}
}
@Override
public boolean update(Application appParam) {
Application application = getById(appParam.getId());
/* If the original mode is remote, k8s-session, yarn-session, check cluster status */
FlinkExecutionMode flinkExecutionMode = application.getFlinkExecutionMode();
switch (flinkExecutionMode) {
case REMOTE:
case YARN_SESSION:
case KUBERNETES_NATIVE_SESSION:
FlinkCluster flinkCluster = flinkClusterService.getById(application.getFlinkClusterId());
ApiAlertException.throwIfFalse(
flinkClusterWatcher.getClusterState(flinkCluster) == ClusterState.RUNNING,
"[StreamPark] update failed, because bind flink cluster not running");
break;
default:
}
boolean success = validateQueueIfNeeded(application, appParam);
ApiAlertException.throwIfFalse(
success,
String.format(ERROR_APP_QUEUE_HINT, appParam.getYarnQueue(), appParam.getTeamId()));
application.setRelease(ReleaseStateEnum.NEED_RELEASE.get());
// 1) jar job jar file changed
if (application.isUploadJob()) {
if (!Objects.equals(application.getJar(), appParam.getJar())) {
application.setBuild(true);
} else {
File jarFile = new File(WebUtils.getAppTempDir(), appParam.getJar());
if (jarFile.exists()) {
try {
long checkSum = org.apache.commons.io.FileUtils.checksumCRC32(jarFile);
if (!Objects.equals(checkSum, application.getJarCheckSum())) {
application.setBuild(true);
}
} catch (IOException e) {
log.error("Error in checksumCRC32 for {}.", jarFile);
throw new RuntimeException(e);
}
}
}
}
// 2) k8s podTemplate changed.
if (application.getBuild() && isK8sPodTemplateChanged(application, appParam)) {
application.setBuild(true);
}
// 3) flink version changed
if (!application.getBuild()
&& !Objects.equals(application.getVersionId(), appParam.getVersionId())) {
application.setBuild(true);
}
// 4) yarn application mode change
if (!application.getBuild() && isYarnApplicationModeChange(application, appParam)) {
application.setBuild(true);
}
appParam.setJobType(application.getJobType());
// changes to the following parameters need to be re-release to take effect
application.setJobName(appParam.getJobName());
application.setVersionId(appParam.getVersionId());
application.setArgs(appParam.getArgs());
application.setOptions(appParam.getOptions());
application.setDynamicProperties(appParam.getDynamicProperties());
application.setResolveOrder(appParam.getResolveOrder());
application.setExecutionMode(appParam.getExecutionMode());
application.setClusterId(appParam.getClusterId());
application.setFlinkImage(appParam.getFlinkImage());
application.setK8sNamespace(appParam.getK8sNamespace());
application.updateHotParams(appParam);
application.setK8sRestExposedType(appParam.getK8sRestExposedType());
application.setK8sPodTemplate(appParam.getK8sPodTemplate());
application.setK8sJmPodTemplate(appParam.getK8sJmPodTemplate());
application.setK8sTmPodTemplate(appParam.getK8sTmPodTemplate());
application.setK8sHadoopIntegration(appParam.getK8sHadoopIntegration());
// changes to the following parameters do not affect running tasks
application.setModifyTime(new Date());
application.setDescription(appParam.getDescription());
application.setAlertId(appParam.getAlertId());
application.setRestartSize(appParam.getRestartSize());
application.setCpFailureAction(appParam.getCpFailureAction());
application.setCpFailureRateInterval(appParam.getCpFailureRateInterval());
application.setCpMaxFailureInterval(appParam.getCpMaxFailureInterval());
application.setTags(appParam.getTags());
switch (appParam.getFlinkExecutionMode()) {
case YARN_APPLICATION:
case YARN_PER_JOB:
case KUBERNETES_NATIVE_APPLICATION:
application.setFlinkClusterId(null);
break;
case REMOTE:
case YARN_SESSION:
case KUBERNETES_NATIVE_SESSION:
application.setFlinkClusterId(appParam.getFlinkClusterId());
break;
default:
break;
}
if (shouldHandleK8sName(appParam)) {
switch (appParam.getFlinkExecutionMode()) {
case KUBERNETES_NATIVE_APPLICATION:
application.setK8sName(appParam.getClusterId());
break;
case KUBERNETES_NATIVE_SESSION:
if (!Objects.equals(application.getClusterId(), appParam.getClusterId())) {
application.setK8sName(
flinkK8sDataTypeConverter.genSessionJobK8sCRName(appParam.getClusterId()));
}
break;
}
}
// Flink Sql job...
if (application.isFlinkSqlJob()) {
updateFlinkSqlJob(application, appParam);
return true;
}
if (application.isStreamParkJob()) {
configService.update(appParam, application.isRunning());
} else {
application.setJar(appParam.getJar());
application.setMainClass(appParam.getMainClass());
}
this.updateById(application);
return true;
}
/**
* update FlinkSql type jobs, there are 3 aspects to consider<br>
* 1. flink sql has changed <br>
* 2. dependency has changed<br>
* 3. parameter has changed<br>
*
* @param application
* @param appParam
*/
private void updateFlinkSqlJob(Application application, Application appParam) {
FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(application.getId(), true);
if (effectiveFlinkSql == null) {
effectiveFlinkSql = flinkSqlService.getCandidate(application.getId(), CandidateTypeEnum.NEW);
flinkSqlService.removeById(effectiveFlinkSql.getId());
FlinkSql sql = new FlinkSql(appParam);
flinkSqlService.create(sql);
application.setBuild(true);
} else {
// get previous flink sql and decode
FlinkSql copySourceFlinkSql = flinkSqlService.getById(appParam.getSqlId());
ApiAlertException.throwIfNull(
copySourceFlinkSql, "Flink sql is null, update flink sql job failed.");
copySourceFlinkSql.decode();
// get submit flink sql
FlinkSql targetFlinkSql = new FlinkSql(appParam);
// judge sql and dependency has changed
ChangeTypeEnum changeTypeEnum = copySourceFlinkSql.checkChange(targetFlinkSql);
log.info("updateFlinkSqlJob changeTypeEnum: {}", changeTypeEnum);
// if has been changed
if (changeTypeEnum.hasChanged()) {
// check if there is a candidate version for the newly added record
FlinkSql newFlinkSql =
flinkSqlService.getCandidate(application.getId(), CandidateTypeEnum.NEW);
// If the candidate version of the new record exists, it will be deleted directly,
// and only one candidate version will be retained. If the new candidate version is not
// effective,
// if it is edited again and the next record comes in, the previous candidate version will
// be deleted.
if (newFlinkSql != null) {
// delete all records about candidates
flinkSqlService.removeById(newFlinkSql.getId());
}
FlinkSql historyFlinkSql =
flinkSqlService.getCandidate(application.getId(), CandidateTypeEnum.HISTORY);
// remove candidate flags that already exist but are set as candidates
if (historyFlinkSql != null) {
flinkSqlService.cleanCandidate(historyFlinkSql.getId());
}
FlinkSql sql = new FlinkSql(appParam);
flinkSqlService.create(sql);
if (changeTypeEnum.isDependencyChanged()) {
application.setBuild(true);
}
} else {
// judge version has changed
boolean versionChanged = !effectiveFlinkSql.getId().equals(appParam.getSqlId());
if (versionChanged) {
// sql and dependency not changed, but version changed, means that rollback to the version
CandidateTypeEnum type = CandidateTypeEnum.HISTORY;
flinkSqlService.setCandidate(type, appParam.getId(), appParam.getSqlId());
application.setRelease(ReleaseStateEnum.NEED_ROLLBACK.get());
application.setBuild(true);
}
}
}
this.updateById(application);
this.configService.update(appParam, application.isRunning());
}
@Override
public void updateRelease(Application appParam) {
LambdaUpdateWrapper<Application> updateWrapper = Wrappers.lambdaUpdate();
updateWrapper.eq(Application::getId, appParam.getId());
updateWrapper.set(Application::getRelease, appParam.getRelease());
updateWrapper.set(Application::getBuild, appParam.getBuild());
if (appParam.getOptionState() != null) {
updateWrapper.set(Application::getOptionState, appParam.getOptionState());
}
this.update(updateWrapper);
}
@Override
public List<Application> listByProjectId(Long id) {
return baseMapper.selectAppsByProjectId(id);
}
@Override
public List<Application> listByTeamId(Long teamId) {
return baseMapper.selectAppsByTeamId(teamId);
}
@Override
public List<Application> listByTeamIdAndExecutionModes(
Long teamId, @Nonnull Collection<FlinkExecutionMode> executionModeEnums) {
return getBaseMapper()
.selectList(
new LambdaQueryWrapper<Application>()
.eq((SFunction<Application, Long>) Application::getTeamId, teamId)
.in(
Application::getExecutionMode,
executionModeEnums.stream()
.map(FlinkExecutionMode::getMode)
.collect(Collectors.toSet())));
}
public List<Application> listProbeApps() {
return this.baseMapper.selectProbeApps();
}
@Override
public boolean checkBuildAndUpdate(Application appParam) {
boolean build = appParam.getBuild();
if (!build) {
LambdaUpdateWrapper<Application> updateWrapper = Wrappers.lambdaUpdate();
updateWrapper.eq(Application::getId, appParam.getId());
if (appParam.isRunning()) {
updateWrapper.set(Application::getRelease, ReleaseStateEnum.NEED_RESTART.get());
} else {
updateWrapper.set(Application::getRelease, ReleaseStateEnum.DONE.get());
updateWrapper.set(Application::getOptionState, OptionStateEnum.NONE.getValue());
}
this.update(updateWrapper);
// backup
if (appParam.isFlinkSqlJob()) {
FlinkSql newFlinkSql =
flinkSqlService.getCandidate(appParam.getId(), CandidateTypeEnum.NEW);
if (!appParam.isNeedRollback() && newFlinkSql != null) {
backUpService.backup(appParam, newFlinkSql);
}
}
// If the current task is not running, or the task has just been added,
// directly set the candidate version to the official version
FlinkSql flinkSql = flinkSqlService.getEffective(appParam.getId(), false);
if (!appParam.isRunning() || flinkSql == null) {
this.toEffective(appParam);
}
}
return build;
}
@Override
public void clean(Application appParam) {
appParam.setRelease(ReleaseStateEnum.DONE.get());
this.updateRelease(appParam);
}
@Override
public Application getApp(Application appParam) {
Application application = this.baseMapper.selectApp(appParam);
ApplicationConfig config = configService.getEffective(appParam.getId());
config = config == null ? configService.getLatest(appParam.getId()) : config;
if (config != null) {
config.setToApplication(application);
}
if (application.isFlinkSqlJob()) {
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true);
if (flinkSql == null) {
flinkSql = flinkSqlService.getCandidate(application.getId(), CandidateTypeEnum.NEW);
flinkSql.setSql(DeflaterUtils.unzipString(flinkSql.getSql()));
}
flinkSql.setToApplication(application);
} else {
if (application.isCICDJob()) {
String path =
this.projectService.getAppConfPath(application.getProjectId(), application.getModule());
application.setConfPath(path);
}
}
// add flink web url info for k8s-mode
if (isKubernetesApp(application)) {
String restUrl = k8SFlinkTrackMonitor.getRemoteRestUrl(toTrackId(application));
application.setFlinkRestUrl(restUrl);
// set duration
long now = System.currentTimeMillis();
if (application.getTracking() == 1
&& application.getStartTime() != null
&& application.getStartTime().getTime() > 0) {
application.setDuration(now - application.getStartTime().getTime());
}
}
application.setYarnQueueByHotParams();
return application;
}
/**
* Check queue label validation when create the application if needed.
*
* @param appParam the app to create.
* @return <code>true</code> if validate it successfully, <code>false</code> else.
*/
@VisibleForTesting
public boolean validateQueueIfNeeded(Application appParam) {
yarnQueueService.checkQueueLabel(appParam.getFlinkExecutionMode(), appParam.getYarnQueue());
if (!isYarnNotDefaultQueue(appParam)) {
return true;
}
return yarnQueueService.existByTeamIdQueueLabel(appParam.getTeamId(), appParam.getYarnQueue());
}
/**
* Check queue label validation when update the application if needed.
*
* @param oldApp the old app to update.
* @param newApp the new app payload.
* @return <code>true</code> if validate it successfully, <code>false</code> else.
*/
@VisibleForTesting
public boolean validateQueueIfNeeded(Application oldApp, Application newApp) {
yarnQueueService.checkQueueLabel(newApp.getFlinkExecutionMode(), newApp.getYarnQueue());
if (!isYarnNotDefaultQueue(newApp)) {
return true;
}
oldApp.setYarnQueueByHotParams();
if (FlinkExecutionMode.isYarnPerJobOrAppMode(newApp.getFlinkExecutionMode())
&& StringUtils.equals(oldApp.getYarnQueue(), newApp.getYarnQueue())) {
return true;
}
return yarnQueueService.existByTeamIdQueueLabel(newApp.getTeamId(), newApp.getYarnQueue());
}
/**
* Judge the execution mode whether is the Yarn PerJob or Application mode with not default or
* empty queue label.
*
* @param application application entity.
* @return If the executionMode is (Yarn PerJob or application mode) and the queue label is not
* (empty or default), return true, false else.
*/
private boolean isYarnNotDefaultQueue(Application application) {
return FlinkExecutionMode.isYarnPerJobOrAppMode(application.getFlinkExecutionMode())
&& !yarnQueueService.isDefaultQueue(application.getYarnQueue());
}
private boolean isK8sPodTemplateChanged(Application application, Application appParam) {
return FlinkExecutionMode.isKubernetesMode(appParam.getExecutionMode())
&& (ObjectUtils.trimNoEquals(
application.getK8sRestExposedType(), appParam.getK8sRestExposedType())
|| ObjectUtils.trimNoEquals(
application.getK8sJmPodTemplate(), appParam.getK8sJmPodTemplate())
|| ObjectUtils.trimNoEquals(
application.getK8sTmPodTemplate(), appParam.getK8sTmPodTemplate())
|| ObjectUtils.trimNoEquals(
application.getK8sPodTemplates(), appParam.getK8sPodTemplates())
|| ObjectUtils.trimNoEquals(
application.getK8sHadoopIntegration(), appParam.getK8sHadoopIntegration())
|| ObjectUtils.trimNoEquals(application.getFlinkImage(), appParam.getFlinkImage()));
}
private boolean isYarnApplicationModeChange(Application application, Application appParam) {
return !application.getExecutionMode().equals(appParam.getExecutionMode())
&& (FlinkExecutionMode.YARN_APPLICATION == appParam.getFlinkExecutionMode()
|| FlinkExecutionMode.YARN_APPLICATION == application.getFlinkExecutionMode());
}
}