blob: b65375104f1232dfc76faad9c4371dcf27f529e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.console.core.service.application.impl;
import org.apache.streampark.common.Constant;
import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkDevelopmentMode;
import org.apache.streampark.common.enums.ResolveOrder;
import org.apache.streampark.common.enums.SparkExecutionMode;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.PropertiesUtils;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApplicationException;
import org.apache.streampark.console.core.entity.AppBuildPipeline;
import org.apache.streampark.console.core.entity.ApplicationConfig;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.FlinkSql;
import org.apache.streampark.console.core.entity.Resource;
import org.apache.streampark.console.core.entity.SavePoint;
import org.apache.streampark.console.core.entity.SparkApplication;
import org.apache.streampark.console.core.entity.SparkEnv;
import org.apache.streampark.console.core.enums.CheckPointTypeEnum;
import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
import org.apache.streampark.console.core.enums.OperationEnum;
import org.apache.streampark.console.core.enums.OptionStateEnum;
import org.apache.streampark.console.core.enums.ReleaseStateEnum;
import org.apache.streampark.console.core.mapper.SparkApplicationMapper;
import org.apache.streampark.console.core.service.AppBuildPipeService;
import org.apache.streampark.console.core.service.ApplicationConfigService;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.CommonService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkSqlService;
import org.apache.streampark.console.core.service.ResourceService;
import org.apache.streampark.console.core.service.SparkEnvService;
import org.apache.streampark.console.core.service.VariableService;
import org.apache.streampark.console.core.service.application.SparkApplicationActionService;
import org.apache.streampark.console.core.service.application.SparkApplicationInfoService;
import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
import org.apache.streampark.console.core.watcher.FlinkClusterWatcher;
import org.apache.streampark.flink.packer.pipeline.BuildResult;
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse;
import org.apache.streampark.spark.client.SparkClient;
import org.apache.streampark.spark.client.bean.CancelRequest;
import org.apache.streampark.spark.client.bean.CancelResponse;
import org.apache.streampark.spark.client.bean.SubmitRequest;
import org.apache.streampark.spark.client.bean.SubmitResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.io.File;
import java.net.URI;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@Slf4j
@Service
public class SparkApplicationActionServiceImpl
extends ServiceImpl<SparkApplicationMapper, SparkApplication>
implements SparkApplicationActionService {
@Qualifier("streamparkDeployExecutor")
@Autowired
private Executor executorService;
@Autowired private SparkApplicationInfoService applicationInfoService;
@Autowired private ApplicationConfigService configService;
@Autowired private ApplicationLogService applicationLogService;
@Autowired private SparkEnvService sparkEnvService;
@Autowired private FlinkSqlService flinkSqlService;
@Autowired private CommonService commonService;
@Autowired private AppBuildPipeService appBuildPipeService;
@Autowired private FlinkClusterService flinkClusterService;
@Autowired private VariableService variableService;
@Autowired private ResourceService resourceService;
@Autowired private FlinkClusterWatcher flinkClusterWatcher;
private final Map<Long, CompletableFuture<SubmitResponse>> startFutureMap =
new ConcurrentHashMap<>();
private final Map<Long, CompletableFuture<CancelResponse>> cancelFutureMap =
new ConcurrentHashMap<>();
@Override
public void revoke(Long appId) throws ApplicationException {
SparkApplication application = getById(appId);
ApiAlertException.throwIfNull(
application, String.format("The application id=%s not found, revoke failed.", appId));
// 1) delete files that have been published to workspace
application.getFsOperator().delete(application.getAppHome());
// 2) rollback the files to the workspace
// backUpService.revoke(application);
// 3) restore related status
LambdaUpdateWrapper<SparkApplication> updateWrapper = Wrappers.lambdaUpdate();
updateWrapper.eq(SparkApplication::getId, application.getId());
if (application.isSparkSqlJob()) {
updateWrapper.set(SparkApplication::getRelease, ReleaseStateEnum.FAILED.get());
} else {
updateWrapper.set(SparkApplication::getRelease, ReleaseStateEnum.NEED_RELEASE.get());
}
if (!application.isRunning()) {
updateWrapper.set(SparkApplication::getState, FlinkAppStateEnum.REVOKED.getValue());
}
baseMapper.update(null, updateWrapper);
}
@Override
public void restart(SparkApplication appParam) throws Exception {
this.cancel(appParam);
this.start(appParam, false);
}
@Override
public void forcedStop(Long id) {
CompletableFuture<SubmitResponse> startFuture = startFutureMap.remove(id);
CompletableFuture<CancelResponse> cancelFuture = cancelFutureMap.remove(id);
SparkApplication application = this.baseMapper.selectApp(id);
if (startFuture != null) {
startFuture.cancel(true);
}
if (cancelFuture != null) {
cancelFuture.cancel(true);
}
if (startFuture == null && cancelFuture == null) {
this.doStopped(id);
}
}
@Override
public void cancel(SparkApplication appParam) throws Exception {
FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.CANCELLING);
SparkApplication application = getById(appParam.getId());
application.setState(FlinkAppStateEnum.CANCELLING.getValue());
ApplicationLog applicationLog = new ApplicationLog();
applicationLog.setOptionName(OperationEnum.CANCEL.getValue());
applicationLog.setAppId(application.getId());
applicationLog.setJobManagerUrl(application.getJobManagerUrl());
applicationLog.setOptionTime(new Date());
applicationLog.setYarnAppId(application.getClusterId());
applicationLog.setUserId(commonService.getUserId());
if (appParam.getSavePointed()) {
FlinkAppHttpWatcher.addSavepoint(application.getId());
application.setOptionState(OptionStateEnum.SAVEPOINTING.getValue());
} else {
application.setOptionState(OptionStateEnum.CANCELLING.getValue());
}
application.setOptionTime(new Date());
this.baseMapper.updateById(application);
Long userId = commonService.getUserId();
if (!application.getUserId().equals(userId)) {
FlinkAppHttpWatcher.addCanceledApp(application.getId(), userId);
}
SparkEnv sparkEnv = sparkEnvService.getById(application.getVersionId());
String clusterId = null;
if (SparkExecutionMode.isYarnMode(application.getExecutionMode())) {
clusterId = application.getAppId();
}
Map<String, Object> properties = new HashMap<>();
if (SparkExecutionMode.isRemoteMode(application.getSparkExecutionMode())) {
FlinkCluster cluster = flinkClusterService.getById(application.getSparkClusterId());
ApiAlertException.throwIfNull(
cluster,
String.format(
"The clusterId=%s cannot be find, maybe the clusterId is wrong or "
+ "the cluster has been deleted. Please contact the Admin.",
application.getSparkClusterId()));
URI activeAddress = cluster.getRemoteURI();
properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
properties.put(RestOptions.PORT.key(), activeAddress.getPort());
}
CancelRequest cancelRequest =
new CancelRequest(
application.getId(),
sparkEnv.getSparkVersion(),
SparkExecutionMode.of(application.getExecutionMode()),
properties,
clusterId,
application.getJobId(),
appParam.getDrain(),
appParam.getNativeFormat());
final Date triggerTime = new Date();
CompletableFuture<CancelResponse> cancelFuture =
CompletableFuture.supplyAsync(() -> SparkClient.cancel(cancelRequest), executorService);
cancelFutureMap.put(application.getId(), cancelFuture);
cancelFuture.whenComplete(
(cancelResponse, throwable) -> {
cancelFutureMap.remove(application.getId());
if (throwable != null) {
String exception = ExceptionUtils.stringifyException(throwable);
applicationLog.setException(exception);
applicationLog.setSuccess(false);
applicationLogService.save(applicationLog);
if (throwable instanceof CancellationException) {
doStopped(application.getId());
} else {
log.error("stop flink job failed.", throwable);
application.setOptionState(OptionStateEnum.NONE.getValue());
application.setState(FlinkAppStateEnum.FAILED.getValue());
updateById(application);
FlinkAppHttpWatcher.unWatching(application.getId());
}
return;
}
applicationLog.setSuccess(true);
// save log...
applicationLogService.save(applicationLog);
if (cancelResponse != null && cancelResponse.savePointDir() != null) {
String savePointDir = cancelResponse.savePointDir();
log.info("savePoint path: {}", savePointDir);
SavePoint savePoint = new SavePoint();
savePoint.setPath(savePointDir);
savePoint.setAppId(application.getId());
savePoint.setLatest(true);
savePoint.setType(CheckPointTypeEnum.SAVEPOINT.get());
savePoint.setCreateTime(new Date());
savePoint.setTriggerTime(triggerTime);
}
});
}
@Override
public void start(SparkApplication appParam, boolean auto) throws Exception {
// 1) check application
final SparkApplication application = getById(appParam.getId());
AssertUtils.notNull(application);
ApiAlertException.throwIfTrue(
!application.isCanBeStart(), "[StreamPark] The application cannot be started repeatedly.");
if (SparkExecutionMode.isRemoteMode(application.getSparkExecutionMode())) {
checkBeforeStart(application);
}
if (SparkExecutionMode.isYarnMode(application.getSparkExecutionMode())) {
ApiAlertException.throwIfTrue(
!applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(),
"[StreamPark] The same task name is already running in the yarn queue");
}
AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
AssertUtils.notNull(buildPipeline);
SparkEnv sparkEnv = sparkEnvService.getByIdOrDefault(application.getVersionId());
ApiAlertException.throwIfNull(sparkEnv, "[StreamPark] can no found flink version");
// if manually started, clear the restart flag
if (!auto) {
application.setRestartCount(0);
} else {
if (!application.isNeedRestartOnFailed()) {
return;
}
appParam.setSavePointed(true);
application.setRestartCount(application.getRestartCount() + 1);
}
// 2) update app state to starting...
starting(application);
String jobId = new JobID().toHexString();
ApplicationLog applicationLog = new ApplicationLog();
applicationLog.setOptionName(OperationEnum.START.getValue());
applicationLog.setAppId(application.getId());
applicationLog.setOptionTime(new Date());
applicationLog.setUserId(commonService.getUserId());
// set the latest to Effective, (it will only become the current effective at this time)
// applicationManageService.toEffective(application);
Map<String, Object> extraParameter = new HashMap<>(0);
if (application.isSparkSqlJob()) {
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true);
// Get the sql of the replaced placeholder
String realSql = variableService.replaceVariable(application.getTeamId(), flinkSql.getSql());
flinkSql.setSql(DeflaterUtils.zipString(realSql));
extraParameter.put(ConfigKeys.KEY_FLINK_SQL(null), flinkSql.getSql());
}
Tuple2<String, String> userJarAndAppConf = getUserJarAndAppConf(sparkEnv, application);
String flinkUserJar = userJarAndAppConf.f0;
String appConf = userJarAndAppConf.f1;
BuildResult buildResult = buildPipeline.getBuildResult();
if (SparkExecutionMode.YARN_CLUSTER == application.getSparkExecutionMode()
|| SparkExecutionMode.YARN_CLIENT == application.getSparkExecutionMode()) {
buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
}
// Get the args after placeholder replacement
String applicationArgs =
variableService.replaceVariable(application.getTeamId(), application.getArgs());
SubmitRequest submitRequest =
new SubmitRequest(
sparkEnv.getSparkVersion(),
SparkExecutionMode.of(application.getExecutionMode()),
getProperties(application),
sparkEnv.getSparkConf(),
FlinkDevelopmentMode.of(application.getJobType()),
application.getId(),
jobId,
application.getJobName(),
appConf,
application.getApplicationType(),
applicationArgs,
application.getHadoopUser(),
buildResult,
extraParameter);
CompletableFuture<SubmitResponse> future =
CompletableFuture.supplyAsync(() -> SparkClient.submit(submitRequest), executorService);
startFutureMap.put(application.getId(), future);
future.whenComplete(
(response, throwable) -> {
// 1) remove Future
startFutureMap.remove(application.getId());
// 2) exception
if (throwable != null) {
String exception = ExceptionUtils.stringifyException(throwable);
applicationLog.setException(exception);
applicationLog.setSuccess(false);
applicationLogService.save(applicationLog);
if (throwable instanceof CancellationException) {
doStopped(application.getId());
} else {
SparkApplication app = getById(appParam.getId());
app.setState(FlinkAppStateEnum.FAILED.getValue());
app.setOptionState(OptionStateEnum.NONE.getValue());
updateById(app);
FlinkAppHttpWatcher.unWatching(appParam.getId());
}
return;
}
// 3) success
applicationLog.setSuccess(true);
if (response.sparkConfig() != null) {
String jmMemory = response.sparkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY());
if (jmMemory != null) {
application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
}
String tmMemory = response.sparkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY());
if (tmMemory != null) {
application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
}
}
application.setAppId(response.clusterId());
if (StringUtils.isNoneEmpty(response.jobId())) {
application.setJobId(response.jobId());
}
if (StringUtils.isNoneEmpty(response.jobManagerUrl())) {
application.setJobManagerUrl(response.jobManagerUrl());
applicationLog.setJobManagerUrl(response.jobManagerUrl());
}
applicationLog.setYarnAppId(response.clusterId());
application.setStartTime(new Date());
application.setEndTime(null);
// if start completed, will be added task to tracking queue
FlinkAppHttpWatcher.setOptionState(appParam.getId(), OptionStateEnum.STARTING);
// FlinkAppHttpWatcher.doWatching(application);
// update app
updateById(application);
// save log
applicationLogService.save(applicationLog);
});
}
/**
* Check whether a job with the same name is running in the yarn queue
*
* @param jobName
* @return
*/
private boolean checkAppRepeatInYarn(String jobName) {
try {
YarnClient yarnClient = HadoopUtils.yarnClient();
Set<String> types =
Sets.newHashSet(
ApplicationType.STREAMPARK_FLINK.getName(), ApplicationType.APACHE_FLINK.getName());
EnumSet<YarnApplicationState> states =
EnumSet.of(YarnApplicationState.RUNNING, YarnApplicationState.ACCEPTED);
List<ApplicationReport> applications = yarnClient.getApplications(types, states);
for (ApplicationReport report : applications) {
if (report.getName().equals(jobName)) {
return true;
}
}
return false;
} catch (Exception e) {
throw new RuntimeException("The yarn api is abnormal. Ensure that yarn is running properly.");
}
}
private void starting(SparkApplication application) {
application.setState(FlinkAppStateEnum.STARTING.getValue());
application.setOptionTime(new Date());
updateById(application);
}
private Tuple2<String, String> getUserJarAndAppConf(
SparkEnv sparkEnv, SparkApplication application) {
SparkExecutionMode executionModeEnum = application.getSparkExecutionMode();
ApplicationConfig applicationConfig = configService.getEffective(application.getId());
ApiAlertException.throwIfNull(
executionModeEnum, "ExecutionMode can't be null, start application failed.");
String flinkUserJar = null;
String appConf = null;
switch (application.getDevelopmentMode()) {
case FLINK_SQL:
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
AssertUtils.notNull(flinkSql);
// 1) dist_userJar
String sqlDistJar = commonService.getSqlClientJar(sparkEnv);
// 2) appConfig
appConf =
applicationConfig == null
? null
: String.format("yaml://%s", applicationConfig.getContent());
// 3) client
if (SparkExecutionMode.YARN_CLUSTER == executionModeEnum) {
String clientPath = Workspace.remote().APP_CLIENT();
flinkUserJar = String.format("%s/%s", clientPath, sqlDistJar);
}
break;
case PYFLINK:
Resource resource =
resourceService.findByResourceName(application.getTeamId(), application.getJar());
ApiAlertException.throwIfNull(
resource, "pyflink file can't be null, start application failed.");
ApiAlertException.throwIfNull(
resource.getFilePath(), "pyflink file can't be null, start application failed.");
ApiAlertException.throwIfFalse(
resource.getFilePath().endsWith(Constant.PYTHON_SUFFIX),
"pyflink format error, must be a \".py\" suffix, start application failed.");
flinkUserJar = resource.getFilePath();
break;
case CUSTOM_CODE:
if (application.isUploadJob()) {
appConf =
String.format(
"json://{\"%s\":\"%s\"}",
ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
} else {
switch (application.getApplicationType()) {
case STREAMPARK_SPARK:
ConfigFileTypeEnum fileType = ConfigFileTypeEnum.of(applicationConfig.getFormat());
if (fileType != null && ConfigFileTypeEnum.UNKNOWN != fileType) {
appConf =
String.format(
"%s://%s", fileType.getTypeName(), applicationConfig.getContent());
} else {
throw new IllegalArgumentException(
"application' config type error,must be ( yaml| properties| hocon )");
}
break;
case APACHE_SPARK:
appConf =
String.format(
"json://{\"%s\":\"%s\"}",
ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
break;
default:
throw new IllegalArgumentException(
"[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... ");
}
}
if (SparkExecutionMode.YARN_CLUSTER == executionModeEnum) {
switch (application.getApplicationType()) {
case STREAMPARK_SPARK:
flinkUserJar =
String.format(
"%s/%s",
application.getAppLib(), application.getModule().concat(Constant.JAR_SUFFIX));
break;
case APACHE_SPARK:
flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar());
if (!FsOperator.hdfs().exists(flinkUserJar)) {
resource =
resourceService.findByResourceName(
application.getTeamId(), application.getJar());
if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) {
flinkUserJar =
String.format(
"%s/%s",
application.getAppHome(), new File(resource.getFilePath()).getName());
}
}
break;
default:
throw new IllegalArgumentException(
"[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... ");
}
}
break;
}
return Tuple2.of(flinkUserJar, appConf);
}
private Map<String, Object> getProperties(SparkApplication application) {
Map<String, Object> properties = new HashMap<>(application.getOptionMap());
if (SparkExecutionMode.isRemoteMode(application.getSparkExecutionMode())) {
FlinkCluster cluster = flinkClusterService.getById(application.getSparkClusterId());
ApiAlertException.throwIfNull(
cluster,
String.format(
"The clusterId=%s can't be find, maybe the clusterId is wrong or "
+ "the cluster has been deleted. Please contact the Admin.",
application.getSparkClusterId()));
URI activeAddress = cluster.getRemoteURI();
properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
properties.put(RestOptions.PORT.key(), activeAddress.getPort());
} else if (SparkExecutionMode.isYarnMode(application.getSparkExecutionMode())) {
String yarnQueue =
(String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_QUEUE());
String yarnLabelExpr =
(String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_NODE_LABEL());
Optional.ofNullable(yarnQueue)
.ifPresent(yq -> properties.put(ConfigKeys.KEY_YARN_APP_QUEUE(), yq));
Optional.ofNullable(yarnLabelExpr)
.ifPresent(yLabel -> properties.put(ConfigKeys.KEY_YARN_APP_NODE_LABEL(), yLabel));
}
if (application.getAllowNonRestored()) {
properties.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), true);
}
Map<String, String> dynamicProperties =
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties());
properties.putAll(dynamicProperties);
ResolveOrder resolveOrder = ResolveOrder.of(application.getResolveOrder());
if (resolveOrder != null) {
properties.put(CoreOptions.CLASSLOADER_RESOLVE_ORDER.key(), resolveOrder.getName());
}
return properties;
}
private void doStopped(Long id) {
SparkApplication application = getById(id);
application.setOptionState(OptionStateEnum.NONE.getValue());
application.setState(FlinkAppStateEnum.CANCELED.getValue());
application.setOptionTime(new Date());
updateById(application);
// re-tracking flink job on kubernetes and logging exception
FlinkAppHttpWatcher.unWatching(application.getId());
// kill application
if (SparkExecutionMode.isYarnMode(application.getSparkExecutionMode())) {
try {
List<ApplicationReport> applications =
applicationInfoService.getYarnAppReport(application.getJobName());
if (!applications.isEmpty()) {
YarnClient yarnClient = HadoopUtils.yarnClient();
yarnClient.killApplication(applications.get(0).getApplicationId());
}
} catch (Exception ignored) {
}
}
}
/* check flink cluster before job start job */
private void checkBeforeStart(SparkApplication application) {
SparkEnv sparkEnv = sparkEnvService.getByAppId(application.getId());
ApiAlertException.throwIfNull(sparkEnv, "[StreamPark] can no found flink version");
ApiAlertException.throwIfFalse(
flinkClusterService.existsByFlinkEnvId(sparkEnv.getId()),
"[StreamPark] The flink cluster don't exist, please check it");
FlinkCluster flinkCluster = flinkClusterService.getById(application.getSparkClusterId());
ApiAlertException.throwIfFalse(
flinkClusterWatcher.getClusterState(flinkCluster) == ClusterState.RUNNING,
"[StreamPark] The flink cluster not running, please start it");
}
}