/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.DevelopmentMode;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.enums.ResolveOrder;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.HdfsOperator;
import org.apache.streampark.common.fs.LfsOperator;
import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.common.util.FlinkUtils;
import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApiDetailException;
import org.apache.streampark.console.base.exception.ApplicationException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.base.util.CommonUtils;
import org.apache.streampark.console.base.util.ObjectUtils;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.entity.AppBuildPipeline;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationConfig;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.entity.FlinkSql;
import org.apache.streampark.console.core.entity.Project;
import org.apache.streampark.console.core.entity.SavePoint;
import org.apache.streampark.console.core.enums.AppExistsState;
import org.apache.streampark.console.core.enums.CandidateType;
import org.apache.streampark.console.core.enums.ChangedType;
import org.apache.streampark.console.core.enums.CheckPointType;
import org.apache.streampark.console.core.enums.ConfigFileType;
import org.apache.streampark.console.core.enums.FlinkAppState;
import org.apache.streampark.console.core.enums.LaunchState;
import org.apache.streampark.console.core.enums.OptionState;
import org.apache.streampark.console.core.mapper.ApplicationMapper;
import org.apache.streampark.console.core.metrics.flink.JobsOverview;
import org.apache.streampark.console.core.runner.EnvInitializer;
import org.apache.streampark.console.core.service.AppBuildPipeService;
import org.apache.streampark.console.core.service.ApplicationBackUpService;
import org.apache.streampark.console.core.service.ApplicationConfigService;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.CommonService;
import org.apache.streampark.console.core.service.EffectiveService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.FlinkSqlService;
import org.apache.streampark.console.core.service.LogClientService;
import org.apache.streampark.console.core.service.ProjectService;
import org.apache.streampark.console.core.service.SavePointService;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.VariableService;
import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher;
import org.apache.streampark.flink.core.conf.ParameterCli;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
import org.apache.streampark.flink.kubernetes.IngressController;
import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper;
import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV;
import org.apache.streampark.flink.kubernetes.model.TrackId;
import org.apache.streampark.flink.packer.pipeline.BuildResult;
import org.apache.streampark.flink.packer.pipeline.DockerImageBuildResponse;
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse;
import org.apache.streampark.flink.submit.FlinkSubmitter;
import org.apache.streampark.flink.submit.bean.CancelRequest;
import org.apache.streampark.flink.submit.bean.CancelResponse;
import org.apache.streampark.flink.submit.bean.KubernetesSubmitParam;
import org.apache.streampark.flink.submit.bean.SubmitRequest;
import org.apache.streampark.flink.submit.bean.SubmitResponse;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import io.fabric8.kubernetes.client.KubernetesClientException;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;
import javax.annotation.PostConstruct;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.jar.Manifest;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.streampark.common.enums.StorageType.LFS;
import static org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.Bridge.toTrackId;
import static org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper.isKubernetesApp;
@Slf4j
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Application>
implements ApplicationService {
private final Map<Long, Long> tailOutMap = new ConcurrentHashMap<>();
private final Map<Long, Boolean> tailBeginning = new ConcurrentHashMap<>();
private static final int DEFAULT_HISTORY_RECORD_LIMIT = 25;
private static final int DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT = 5;
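// bounded executor for job submit/cancel calls; when saturated, new tasks are rejected (AbortPolicy)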
private final ExecutorService executorService =
new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 5,
Runtime.getRuntime().availableProcessors() * 10,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
ThreadUtils.threadFactory("streampark-deploy-executor"),
new ThreadPoolExecutor.AbortPolicy());
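// JOB_NAME_PATTERN allows CJK characters, letters, digits, '.', '_', '-' and whitespace;
// SINGLE_SPACE_PATTERN additionally rejects leading/trailing or consecutive spaces,
// e.g. "my-flink_job 1" is accepted while " my job" or "my  job" is rejected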
private static final Pattern JOB_NAME_PATTERN =
Pattern.compile("^[.\\x{4e00}-\\x{9fa5}A-Za-z\\d_\\-\\s]+$");
private static final Pattern SINGLE_SPACE_PATTERN = Pattern.compile("^\\S+(\\s\\S+)*$");
@Autowired private ProjectService projectService;
@Autowired private ApplicationBackUpService backUpService;
@Autowired private ApplicationConfigService configService;
@Autowired private ApplicationLogService applicationLogService;
@Autowired private FlinkEnvService flinkEnvService;
@Autowired private FlinkSqlService flinkSqlService;
@Autowired private SavePointService savePointService;
@Autowired private EffectiveService effectiveService;
@Autowired private SettingService settingService;
@Autowired private CommonService commonService;
@Autowired private EnvInitializer envInitializer;
@Autowired private FlinkK8sWatcher k8SFlinkTrackMonitor;
@Autowired private AppBuildPipeService appBuildPipeService;
@Autowired private FlinkClusterService flinkClusterService;
@Autowired private VariableService variableService;
@Autowired private LogClientService logClient;
@PostConstruct
public void resetOptionState() {
this.baseMapper.resetOptionState();
}
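// in-flight start/cancel futures keyed by application id; forcedStop uses these to abort
// submissions or cancellations that are still pending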
private final Map<Long, CompletableFuture<SubmitResponse>> startFutureMap =
new ConcurrentHashMap<>();
private final Map<Long, CompletableFuture<CancelResponse>> cancelFutureMap =
new ConcurrentHashMap<>();
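/**
 * Aggregate the dashboard metrics (JM/TM memory, slots, running jobs, task overview) for a team,
 * merging the applications watched by the REST API watcher with the accumulated metrics of the
 * Flink Kubernetes clusters.
 */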
@Override
public Map<String, Serializable> dashboard(Long teamId) {
JobsOverview.Task overview = new JobsOverview.Task();
Integer totalJmMemory = 0;
Integer totalTmMemory = 0;
Integer totalTm = 0;
Integer totalSlot = 0;
Integer availableSlot = 0;
Integer runningJob = 0;
// collect metrics from applications running in non-kubernetes modes
for (Application app : FlinkRESTAPIWatcher.getWatchingApps()) {
if (!teamId.equals(app.getTeamId())) {
continue;
}
if (app.getJmMemory() != null) {
totalJmMemory += app.getJmMemory();
}
if (app.getTmMemory() != null) {
totalTmMemory += app.getTmMemory() * (app.getTotalTM() == null ? 1 : app.getTotalTM());
}
if (app.getTotalTM() != null) {
totalTm += app.getTotalTM();
}
if (app.getTotalSlot() != null) {
totalSlot += app.getTotalSlot();
}
if (app.getAvailableSlot() != null) {
availableSlot += app.getAvailableSlot();
}
if (app.getState() == FlinkAppState.RUNNING.getValue()) {
runningJob++;
}
JobsOverview.Task task = app.getOverview();
if (task != null) {
overview.setTotal(overview.getTotal() + task.getTotal());
overview.setCreated(overview.getCreated() + task.getCreated());
overview.setScheduled(overview.getScheduled() + task.getScheduled());
overview.setDeploying(overview.getDeploying() + task.getDeploying());
overview.setRunning(overview.getRunning() + task.getRunning());
overview.setFinished(overview.getFinished() + task.getFinished());
overview.setCanceling(overview.getCanceling() + task.getCanceling());
overview.setCanceled(overview.getCanceled() + task.getCanceled());
overview.setFailed(overview.getFailed() + task.getFailed());
overview.setReconciling(overview.getReconciling() + task.getReconciling());
}
}
// merge metrics from flink kubernetes cluster
FlinkMetricCV k8sMetric = k8SFlinkTrackMonitor.getAccClusterMetrics();
totalJmMemory += k8sMetric.totalJmMemory();
totalTmMemory += k8sMetric.totalTmMemory();
totalTm += k8sMetric.totalTm();
totalSlot += k8sMetric.totalSlot();
availableSlot += k8sMetric.availableSlot();
runningJob += k8sMetric.runningJob();
overview.setTotal(overview.getTotal() + k8sMetric.totalJob());
overview.setRunning(overview.getRunning() + k8sMetric.runningJob());
overview.setFinished(overview.getFinished() + k8sMetric.finishedJob());
overview.setCanceled(overview.getCanceled() + k8sMetric.cancelledJob());
overview.setFailed(overview.getFailed() + k8sMetric.failedJob());
Map<String, Serializable> map = new HashMap<>(8);
map.put("task", overview);
map.put("jmMemory", totalJmMemory);
map.put("tmMemory", totalTmMemory);
map.put("totalTM", totalTm);
map.put("availableSlot", availableSlot);
map.put("totalSlot", totalSlot);
map.put("runningJob", runningJob);
return map;
}
@Override
public void tailMvnDownloading(Long id) {
this.tailOutMap.put(id, id);
// on the first read, start from the beginning of the buffer (only once)
this.tailBeginning.put(id, true);
}
@Override
public String upload(MultipartFile file) throws Exception {
File temp = WebUtils.getAppTempDir();
String fileName = FilenameUtils.getName(Objects.requireNonNull(file.getOriginalFilename()));
File saveFile = new File(temp, fileName);
// delete the existing file if present
if (saveFile.exists()) {
saveFile.delete();
}
// save file to temp dir
try {
file.transferTo(saveFile);
} catch (Exception e) {
throw new ApiDetailException(e);
}
return saveFile.getAbsolutePath();
}
@Override
public void toEffective(Application application) {
// set latest to Effective
ApplicationConfig config = configService.getLatest(application.getId());
if (config != null) {
this.configService.toEffective(application.getId(), config.getId());
}
if (application.isFlinkSqlJob()) {
FlinkSql flinkSql = flinkSqlService.getCandidate(application.getId(), null);
if (flinkSql != null) {
flinkSqlService.toEffective(application.getId(), flinkSql.getId());
// clean candidate
flinkSqlService.cleanCandidate(flinkSql.getId());
}
}
}
@Override
public void revoke(Application appParam) throws ApplicationException {
Application application = getById(appParam.getId());
AssertUtils.state(
application != null,
String.format("The application id=%s cannot be found in the database.", appParam.getId()));
// 1) delete files that have been published to workspace
application.getFsOperator().delete(application.getAppHome());
// 2) rollback the files to the workspace
backUpService.revoke(application);
// 3) restore related status
LambdaUpdateWrapper<Application> updateWrapper = Wrappers.lambdaUpdate();
updateWrapper.eq(Application::getId, application.getId());
if (application.isFlinkSqlJob()) {
updateWrapper.set(Application::getLaunch, LaunchState.FAILED.get());
} else {
updateWrapper.set(Application::getLaunch, LaunchState.NEED_LAUNCH.get());
}
if (!application.isRunning()) {
updateWrapper.set(Application::getState, FlinkAppState.REVOKED.getValue());
}
baseMapper.update(null, updateWrapper);
}
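/**
 * Delete an application together with all of its associated records: flink sql, logs, configs,
 * effective records, backups, savepoints and build pipelines; finally stop watching the job.
 */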
@Override
@Transactional(rollbackFor = {Exception.class})
public Boolean delete(Application paramApp) {
Application application = getById(paramApp.getId());
// 1) remove flink sql
flinkSqlService.removeApp(application.getId());
// 2) remove log
applicationLogService.removeApp(application.getId());
// 3) remove config
configService.removeApp(application.getId());
// 4) remove effective
effectiveService.removeApp(application.getId());
// 5) remove backup (this also removes the related backup files on hdfs)
backUpService.removeApp(application);
// 6) remove savepoint
savePointService.removeApp(application);
// 7) remove BuildPipeline
appBuildPipeService.removeApp(application.getId());
// 8) remove app
removeApp(application);
if (isKubernetesApp(paramApp)) {
k8SFlinkTrackMonitor.unWatching(toTrackId(application));
} else {
FlinkRESTAPIWatcher.unWatching(paramApp.getId());
}
return true;
}
@Override
public void restart(Application application) throws Exception {
this.cancel(application);
this.start(application, false);
}
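/**
 * Check that the environment required by the application is ready: the flink version exists,
 * the storage workspace is initialized, and (for yarn-session/remote mode) the target cluster
 * is reachable.
 */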
@Override
public boolean checkEnv(Application appParam) throws ApplicationException {
Application application = getById(appParam.getId());
try {
FlinkEnv flinkEnv;
if (application.getVersionId() != null) {
flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
} else {
flinkEnv = flinkEnvService.getDefault();
}
if (flinkEnv == null) {
return false;
}
envInitializer.checkFlinkEnv(application.getStorageType(), flinkEnv);
envInitializer.storageInitialize(application.getStorageType());
if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())
|| ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
FlinkCluster flinkCluster = flinkClusterService.getById(application.getFlinkClusterId());
boolean connected = flinkCluster.verifyClusterConnection();
if (!connected) {
throw new ApiAlertException("The target cluster is unavailable, please check!");
}
}
return true;
} catch (Exception e) {
log.error(ExceptionUtils.stringifyException(e));
throw new ApiDetailException(e);
}
}
@Override
public boolean checkAlter(Application application) {
Long appId = application.getId();
FlinkAppState state = FlinkAppState.of(application.getState());
if (!FlinkAppState.CANCELED.equals(state)) {
return false;
}
long cancelUserId = FlinkRESTAPIWatcher.getCanceledJobUserId(appId);
long appUserId = application.getUserId();
return cancelUserId != -1 && cancelUserId != appUserId;
}
private void removeApp(Application application) {
Long appId = application.getId();
removeById(appId);
try {
application
.getFsOperator()
.delete(application.getWorkspace().APP_WORKSPACE().concat("/").concat(appId.toString()));
// also try to delete the hdfs workspace of the yarn application so that no residue is left.
String path =
Workspace.of(StorageType.HDFS).APP_WORKSPACE().concat("/").concat(appId.toString());
if (HdfsOperator.exists(path)) {
HdfsOperator.delete(path);
}
} catch (Exception e) {
// skip
}
}
@Override
public IPage<Application> page(Application appParam, RestRequest request) {
if (appParam.getTeamId() == null) {
return null;
}
Page<Application> page = new MybatisPager<Application>().getDefaultPage(request);
if (CommonUtils.notEmpty(appParam.getStateArray())) {
if (Arrays.stream(appParam.getStateArray())
.anyMatch(x -> x == FlinkAppState.FINISHED.getValue())) {
Integer[] newArray =
CommonUtils.arrayInsertIndex(
appParam.getStateArray(),
appParam.getStateArray().length,
FlinkAppState.POS_TERMINATED.getValue());
appParam.setStateArray(newArray);
}
}
this.baseMapper.page(page, appParam);
List<Application> records = page.getRecords();
long now = System.currentTimeMillis();
List<Application> newRecords =
records.stream()
.peek(
record -> {
// the status of flink jobs in kubernetes mode has already been persisted to the db
// automatically and in time.
if (isKubernetesApp(record)) {
// set duration
String restUrl = k8SFlinkTrackMonitor.getRemoteRestUrl(toTrackId(record));
record.setFlinkRestUrl(restUrl);
if (record.getTracking() == 1
&& record.getStartTime() != null
&& record.getStartTime().getTime() > 0) {
record.setDuration(now - record.getStartTime().getTime());
}
}
})
.collect(Collectors.toList());
page.setRecords(newRecords);
return page;
}
@Override
public boolean existsByTeamId(Long teamId) {
return baseMapper.existsByTeamId(teamId);
}
@Override
public boolean existsRunningJobByClusterId(Long clusterId) {
boolean exists = baseMapper.existsRunningJobByClusterId(clusterId);
if (exists) {
return true;
}
for (Application application : FlinkRESTAPIWatcher.getWatchingApps()) {
if (clusterId.equals(application.getFlinkClusterId())
&& FlinkAppState.RUNNING.equals(application.getFlinkAppStateEnum())) {
return true;
}
}
return false;
}
@Override
public boolean existsJobByClusterId(Long clusterId) {
return baseMapper.existsJobByClusterId(clusterId);
}
@Override
public List<String> getRecentK8sNamespace() {
return baseMapper.getRecentK8sNamespace(DEFAULT_HISTORY_RECORD_LIMIT);
}
@Override
public List<String> getRecentK8sClusterId(Integer executionMode) {
return baseMapper.getRecentK8sClusterId(executionMode, DEFAULT_HISTORY_RECORD_LIMIT);
}
@Override
public List<String> getRecentFlinkBaseImage() {
return baseMapper.getRecentFlinkBaseImage(DEFAULT_HISTORY_RECORD_LIMIT);
}
@Override
public List<String> getRecentK8sPodTemplate() {
return baseMapper.getRecentK8sPodTemplate(DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT);
}
@Override
public List<String> getRecentK8sJmPodTemplate() {
return baseMapper.getRecentK8sJmPodTemplate(DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT);
}
@Override
public List<String> getRecentK8sTmPodTemplate() {
return baseMapper.getRecentK8sTmPodTemplate(DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT);
}
@Override
public List<String> historyUploadJars() {
return Arrays.stream(LfsOperator.listDir(Workspace.of(LFS).APP_UPLOADS()))
.filter(File::isFile)
.sorted(Comparator.comparingLong(File::lastModified).reversed())
.map(File::getName)
.filter(fn -> fn.endsWith(".jar"))
.limit(DEFAULT_HISTORY_RECORD_LIMIT)
.collect(Collectors.toList());
}
@Override
public String k8sStartLog(Long id, Integer offset, Integer limit) throws Exception {
Application application = getById(id);
AssertUtils.state(application != null);
if (ExecutionMode.isKubernetesMode(application.getExecutionModeEnum())) {
CompletableFuture<String> future =
CompletableFuture.supplyAsync(
() ->
KubernetesDeploymentHelper.watchDeploymentLog(
application.getK8sNamespace(),
application.getJobName(),
application.getJobId()));
return future
.exceptionally(
e -> {
String errorLog =
String.format(
"%s/%s_err.log",
WebUtils.getAppTempDir().getAbsolutePath(), application.getJobId());
File file = new File(errorLog);
if (file.exists() && file.isFile()) {
return file.getAbsolutePath();
}
return null;
})
.thenApply(
path -> {
if (!future.isDone()) {
future.cancel(true);
}
if (path != null) {
return logClient.rollViewLog(path, offset, limit);
}
return null;
})
.toCompletableFuture()
.get(5, TimeUnit.SECONDS);
} else {
throw new ApiAlertException(
"job executionMode must be kubernetes-session|kubernetes-application");
}
}
@Override
public String getYarnName(Application appParam) {
String[] args = new String[2];
args[0] = "--name";
args[1] = appParam.getConfig();
return ParameterCli.read(args);
}
/** Check if the current jobName and other key identifiers already exist in db and yarn/k8s */
@Override
public AppExistsState checkExists(Application appParam) {
if (!checkJobName(appParam.getJobName())) {
return AppExistsState.INVALID;
}
boolean existsByJobName = this.existsByJobName(appParam.getJobName());
if (appParam.getId() != null) {
Application app = getById(appParam.getId());
if (app.getJobName().equals(appParam.getJobName())) {
return AppExistsState.NO;
}
if (existsByJobName) {
return AppExistsState.IN_DB;
}
FlinkAppState state = FlinkAppState.of(app.getState());
// the application is in a terminal (stopped) state
if (state.equals(FlinkAppState.ADDED)
|| state.equals(FlinkAppState.CREATED)
|| state.equals(FlinkAppState.FAILED)
|| state.equals(FlinkAppState.CANCELED)
|| state.equals(FlinkAppState.LOST)
|| state.equals(FlinkAppState.KILLED)) {
// check whether jobName exists on yarn
if (ExecutionMode.isYarnMode(appParam.getExecutionMode())
&& YarnUtils.isContains(appParam.getJobName())) {
return AppExistsState.IN_YARN;
}
// check whether clusterId, namespace, jobId on kubernetes
else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
&& k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) {
return AppExistsState.IN_KUBERNETES;
}
}
} else {
if (existsByJobName) {
return AppExistsState.IN_DB;
}
// check whether jobName exists on yarn
if (ExecutionMode.isYarnMode(appParam.getExecutionMode())
&& YarnUtils.isContains(appParam.getJobName())) {
return AppExistsState.IN_YARN;
}
// check whether clusterId, namespace, jobId on kubernetes
else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
&& k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) {
return AppExistsState.IN_KUBERNETES;
}
}
return AppExistsState.NO;
}
@SneakyThrows
@Override
@Transactional(rollbackFor = {Exception.class})
public boolean create(Application appParam) {
AssertUtils.checkArgument(appParam.getTeamId() != null, "The teamId cannot be null");
appParam.setUserId(commonService.getUserId());
appParam.setState(FlinkAppState.ADDED.getValue());
appParam.setLaunch(LaunchState.NEED_LAUNCH.get());
appParam.setOptionState(OptionState.NONE.getValue());
appParam.setCreateTime(new Date());
appParam.setDefaultModeIngress(settingService.getIngressModeDefault());
appParam.doSetHotParams();
if (appParam.isUploadJob()) {
String jarPath =
WebUtils.getAppTempDir().getAbsolutePath().concat("/").concat(appParam.getJar());
appParam.setJarCheckSum(FileUtils.checksumCRC32(new File(jarPath)));
}
boolean saved = save(appParam);
if (saved) {
if (appParam.isFlinkSqlJob()) {
FlinkSql flinkSql = new FlinkSql(appParam);
flinkSqlService.create(flinkSql);
}
if (appParam.getConfig() != null) {
configService.create(appParam, true);
}
AssertUtils.state(appParam.getId() != null);
return true;
}
return false;
}
private boolean existsByJobName(String jobName) {
return this.baseMapper.existsByJobName(jobName);
}
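/**
 * Copy an existing application under a new (unique) job name, duplicating its parameters,
 * flink sql and effective config, and return the id of the new application (0 if saving failed).
 */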
@SuppressWarnings("checkstyle:WhitespaceAround")
@Override
@SneakyThrows
@Transactional(rollbackFor = {Exception.class})
public Long copy(Application appParam) {
boolean existsByJobName = this.existsByJobName(appParam.getJobName());
if (existsByJobName) {
throw new ApiAlertException("[StreamPark] Application names cannot be repeated");
}
Application oldApp = getById(appParam.getId());
Application newApp = new Application();
String jobName = appParam.getJobName();
newApp.setJobName(jobName);
newApp.setClusterId(
ExecutionMode.isSessionMode(oldApp.getExecutionModeEnum())
? oldApp.getClusterId()
: jobName);
newApp.setArgs(appParam.getArgs() != null ? appParam.getArgs() : oldApp.getArgs());
newApp.setVersionId(oldApp.getVersionId());
newApp.setFlinkClusterId(oldApp.getFlinkClusterId());
newApp.setRestartSize(oldApp.getRestartSize());
newApp.setJobType(oldApp.getJobType());
newApp.setOptions(oldApp.getOptions());
newApp.setDynamicProperties(oldApp.getDynamicProperties());
newApp.setResolveOrder(oldApp.getResolveOrder());
newApp.setExecutionMode(oldApp.getExecutionMode());
newApp.setFlinkImage(oldApp.getFlinkImage());
newApp.setK8sNamespace(oldApp.getK8sNamespace());
newApp.setK8sRestExposedType(oldApp.getK8sRestExposedType());
newApp.setK8sPodTemplate(oldApp.getK8sPodTemplate());
newApp.setK8sJmPodTemplate(oldApp.getK8sJmPodTemplate());
newApp.setK8sTmPodTemplate(oldApp.getK8sTmPodTemplate());
newApp.setK8sHadoopIntegration(oldApp.getK8sHadoopIntegration());
newApp.setDescription(oldApp.getDescription());
newApp.setAlertId(oldApp.getAlertId());
newApp.setCpFailureAction(oldApp.getCpFailureAction());
newApp.setCpFailureRateInterval(oldApp.getCpFailureRateInterval());
newApp.setCpMaxFailureInterval(oldApp.getCpMaxFailureInterval());
newApp.setMainClass(oldApp.getMainClass());
newApp.setAppType(oldApp.getAppType());
newApp.setResourceFrom(oldApp.getResourceFrom());
newApp.setProjectId(oldApp.getProjectId());
newApp.setModule(oldApp.getModule());
newApp.setDefaultModeIngress(oldApp.getDefaultModeIngress());
newApp.setUserId(commonService.getUserId());
newApp.setState(FlinkAppState.ADDED.getValue());
newApp.setLaunch(LaunchState.NEED_LAUNCH.get());
newApp.setOptionState(OptionState.NONE.getValue());
newApp.setCreateTime(new Date());
newApp.setHotParams(oldApp.getHotParams());
newApp.setJar(oldApp.getJar());
newApp.setJarCheckSum(oldApp.getJarCheckSum());
newApp.setTags(oldApp.getTags());
newApp.setTeamId(oldApp.getTeamId());
boolean saved = save(newApp);
if (saved) {
if (newApp.isFlinkSqlJob()) {
FlinkSql copyFlinkSql = flinkSqlService.getLatestFlinkSql(appParam.getId(), true);
newApp.setFlinkSql(copyFlinkSql.getSql());
newApp.setDependency(copyFlinkSql.getDependency());
FlinkSql flinkSql = new FlinkSql(newApp);
flinkSqlService.create(flinkSql);
}
if (newApp.getConfig() != null) {
ApplicationConfig copyConfig = configService.getEffective(appParam.getId());
ApplicationConfig config = new ApplicationConfig();
config.setAppId(newApp.getId());
config.setFormat(copyConfig.getFormat());
config.setContent(copyConfig.getContent());
config.setCreateTime(new Date());
config.setVersion(1);
configService.save(config);
configService.setLatestOrEffective(true, config.getId(), newApp.getId());
}
AssertUtils.state(newApp.getId() != null);
return newApp.getId();
}
return 0L;
}
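/**
 * Update an application. Decides whether a rebuild is required (jar changed, execution mode
 * switched to/from yarn-application, or any kubernetes template changed) and persists the new
 * parameters; flink sql jobs are delegated to {@link #updateFlinkSqlJob}.
 */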
@Override
@Transactional(rollbackFor = {Exception.class})
public boolean update(Application appParam) {
try {
Application application = getById(appParam.getId());
application.setLaunch(LaunchState.NEED_LAUNCH.get());
if (application.isUploadJob()) {
if (!ObjectUtils.safeEquals(application.getJar(), appParam.getJar())) {
application.setBuild(true);
} else {
File jarFile = new File(WebUtils.getAppTempDir(), appParam.getJar());
if (jarFile.exists()) {
long checkSum = FileUtils.checksumCRC32(jarFile);
if (!ObjectUtils.safeEquals(checkSum, application.getJarCheckSum())) {
application.setBuild(true);
}
}
}
}
if (!application.getBuild()) {
if (!application.getExecutionMode().equals(appParam.getExecutionMode())) {
if (appParam.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)
|| application.getExecutionModeEnum().equals(ExecutionMode.YARN_APPLICATION)) {
application.setBuild(true);
}
}
}
if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())) {
if (!ObjectUtils.safeTrimEquals(
application.getK8sRestExposedType(), appParam.getK8sRestExposedType())
|| !ObjectUtils.safeTrimEquals(
application.getK8sJmPodTemplate(), appParam.getK8sJmPodTemplate())
|| !ObjectUtils.safeTrimEquals(
application.getK8sTmPodTemplate(), appParam.getK8sTmPodTemplate())
|| !ObjectUtils.safeTrimEquals(
application.getK8sPodTemplate(), appParam.getK8sPodTemplate())
|| !ObjectUtils.safeTrimEquals(
application.getK8sHadoopIntegration(), appParam.getK8sHadoopIntegration())
|| !ObjectUtils.safeTrimEquals(application.getFlinkImage(), appParam.getFlinkImage())) {
application.setBuild(true);
}
}
appParam.setJobType(application.getJobType());
// changes to the following parameters require a re-launch to take effect
application.setJobName(appParam.getJobName());
application.setVersionId(appParam.getVersionId());
application.setArgs(appParam.getArgs());
application.setOptions(appParam.getOptions());
application.setDynamicProperties(appParam.getDynamicProperties());
application.setResolveOrder(appParam.getResolveOrder());
application.setExecutionMode(appParam.getExecutionMode());
application.setClusterId(appParam.getClusterId());
application.setFlinkImage(appParam.getFlinkImage());
application.setK8sNamespace(appParam.getK8sNamespace());
application.updateHotParams(appParam);
application.setK8sRestExposedType(appParam.getK8sRestExposedType());
application.setK8sPodTemplate(appParam.getK8sPodTemplate());
application.setK8sJmPodTemplate(appParam.getK8sJmPodTemplate());
application.setK8sTmPodTemplate(appParam.getK8sTmPodTemplate());
application.setK8sHadoopIntegration(appParam.getK8sHadoopIntegration());
// changes to the following parameters do not affect running tasks
application.setModifyTime(new Date());
application.setDescription(appParam.getDescription());
application.setAlertId(appParam.getAlertId());
application.setRestartSize(appParam.getRestartSize());
application.setCpFailureAction(appParam.getCpFailureAction());
application.setCpFailureRateInterval(appParam.getCpFailureRateInterval());
application.setCpMaxFailureInterval(appParam.getCpMaxFailureInterval());
application.setTags(appParam.getTags());
switch (appParam.getExecutionModeEnum()) {
case YARN_APPLICATION:
case YARN_PER_JOB:
case KUBERNETES_NATIVE_APPLICATION:
application.setFlinkClusterId(null);
break;
case REMOTE:
case YARN_SESSION:
case KUBERNETES_NATIVE_SESSION:
application.setFlinkClusterId(appParam.getFlinkClusterId());
break;
default:
break;
}
// Flink Sql job...
if (application.isFlinkSqlJob()) {
updateFlinkSqlJob(application, appParam);
} else {
if (application.isStreamParkJob()) {
configService.update(appParam, application.isRunning());
} else {
application.setJar(appParam.getJar());
application.setMainClass(appParam.getMainClass());
}
}
baseMapper.updateById(application);
return true;
} catch (Exception e) {
log.error(e.getMessage(), e);
return false;
}
}
/**
 * Update a FlinkSql-type job. There are three aspects to consider:<br>
 * 1. the flink sql has changed<br>
 * 2. the dependencies have changed<br>
 * 3. the parameters have changed<br>
 *
 * @param application the application record stored in the database
 * @param appParam the application parameters submitted from the frontend
 */
private void updateFlinkSqlJob(Application application, Application appParam) {
FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(application.getId(), true);
if (effectiveFlinkSql == null) {
effectiveFlinkSql = flinkSqlService.getCandidate(application.getId(), CandidateType.NEW);
flinkSqlService.removeById(effectiveFlinkSql.getId());
FlinkSql sql = new FlinkSql(appParam);
flinkSqlService.create(sql);
application.setBuild(true);
} else {
// get previous flink sql and decode
FlinkSql copySourceFlinkSql = flinkSqlService.getById(appParam.getSqlId());
AssertUtils.state(copySourceFlinkSql != null);
copySourceFlinkSql.decode();
// get submit flink sql
FlinkSql targetFlinkSql = new FlinkSql(appParam);
// judge sql and dependency has changed
ChangedType changedType = copySourceFlinkSql.checkChange(targetFlinkSql);
log.info("updateFlinkSqlJob changedType: {}", changedType);
// if the sql or dependencies have changed
if (changedType.hasChanged()) {
// check if there is a candidate version for the newly added record
FlinkSql newFlinkSql = flinkSqlService.getCandidate(application.getId(), CandidateType.NEW);
// If a candidate version of the new record already exists, delete it directly so that only
// one candidate version is retained. If the existing candidate version has not become
// effective and the job is edited again, the previous candidate version is deleted when
// the next record comes in.
if (newFlinkSql != null) {
// delete all records about candidates
flinkSqlService.removeById(newFlinkSql.getId());
}
FlinkSql historyFlinkSql =
flinkSqlService.getCandidate(application.getId(), CandidateType.HISTORY);
// remove the candidate flag from records that already exist but are still marked as candidates
if (historyFlinkSql != null) {
flinkSqlService.cleanCandidate(historyFlinkSql.getId());
}
FlinkSql sql = new FlinkSql(appParam);
flinkSqlService.create(sql);
if (changedType.isDependencyChanged()) {
application.setBuild(true);
}
} else {
// judge version has changed
boolean versionChanged = !effectiveFlinkSql.getId().equals(appParam.getSqlId());
if (versionChanged) {
// the sql and dependencies are unchanged, but the version changed: this means a rollback to that version
CandidateType type = CandidateType.HISTORY;
flinkSqlService.setCandidate(type, appParam.getId(), appParam.getSqlId());
application.setLaunch(LaunchState.NEED_ROLLBACK.get());
application.setBuild(true);
}
}
}
this.configService.update(appParam, application.isRunning());
}
@Override
public void updateLaunch(Application application) {
LambdaUpdateWrapper<Application> updateWrapper = Wrappers.lambdaUpdate();
updateWrapper.eq(Application::getId, application.getId());
updateWrapper.set(Application::getLaunch, application.getLaunch());
updateWrapper.set(Application::getBuild, application.getBuild());
if (application.getOptionState() != null) {
updateWrapper.set(Application::getOptionState, application.getOptionState());
}
this.update(updateWrapper);
}
@Override
public List<Application> getByProjectId(Long id) {
return baseMapper.getByProjectId(id);
}
@Override
public List<Application> getByTeamId(Long teamId) {
return baseMapper.getByTeamId(teamId);
}
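/**
 * Check whether the application needs to be rebuilt. If not, update its launch/option state
 * directly, back up the candidate flink sql if necessary, and promote the candidate version to
 * effective when the job is not running.
 */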
@Override
public boolean checkBuildAndUpdate(Application application) {
boolean build = application.getBuild();
if (!build) {
LambdaUpdateWrapper<Application> updateWrapper = Wrappers.lambdaUpdate();
updateWrapper.eq(Application::getId, application.getId());
if (application.isRunning()) {
updateWrapper.set(Application::getLaunch, LaunchState.NEED_RESTART.get());
} else {
updateWrapper.set(Application::getLaunch, LaunchState.DONE.get());
updateWrapper.set(Application::getOptionState, OptionState.NONE.getValue());
}
this.update(updateWrapper);
// backup
if (application.isFlinkSqlJob()) {
FlinkSql newFlinkSql = flinkSqlService.getCandidate(application.getId(), CandidateType.NEW);
if (!application.isNeedRollback() && newFlinkSql != null) {
backUpService.backup(application, newFlinkSql);
}
}
// If the current task is not running, or the task has just been added,
// promote the candidate version to the effective version directly
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
if (!application.isRunning() || flinkSql == null) {
this.toEffective(application);
}
}
return build;
}
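/**
 * Forcibly stop an application: cancel any in-flight start/cancel futures and, for kubernetes
 * jobs, clean up the deployment, config map and ingress.
 */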
@Override
public void forcedStop(Application app) {
CompletableFuture<SubmitResponse> startFuture = startFutureMap.remove(app.getId());
CompletableFuture<CancelResponse> cancelFuture = cancelFutureMap.remove(app.getId());
Application application = this.baseMapper.getApp(app);
if (isKubernetesApp(application)) {
KubernetesDeploymentHelper.watchPodTerminatedLog(
application.getK8sNamespace(), application.getJobName(), application.getJobId());
KubernetesDeploymentHelper.deleteTaskDeployment(
application.getK8sNamespace(), application.getJobName());
KubernetesDeploymentHelper.deleteTaskConfigMap(
application.getK8sNamespace(), application.getJobName());
IngressController.deleteIngress(application.getK8sNamespace(), application.getJobName());
}
if (startFuture != null) {
startFuture.cancel(true);
}
if (cancelFuture != null) {
cancelFuture.cancel(true);
}
if (startFuture == null && cancelFuture == null) {
this.updateToStopped(app);
}
}
@Override
public void clean(Application appParam) {
appParam.setLaunch(LaunchState.DONE.get());
this.updateLaunch(appParam);
}
@Override
public String readConf(Application appParam) throws IOException {
File file = new File(appParam.getConfig());
String conf = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
return Base64.getEncoder().encodeToString(conf.getBytes(StandardCharsets.UTF_8));
}
@Override
public Application getApp(Application appParam) {
Application application = this.baseMapper.getApp(appParam);
ApplicationConfig config = configService.getEffective(appParam.getId());
config = config == null ? configService.getLatest(appParam.getId()) : config;
if (config != null) {
config.setToApplication(application);
}
if (application.isFlinkSqlJob()) {
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true);
if (flinkSql == null) {
flinkSql = flinkSqlService.getCandidate(application.getId(), CandidateType.NEW);
flinkSql.setSql(DeflaterUtils.unzipString(flinkSql.getSql()));
}
flinkSql.setToApplication(application);
} else {
if (application.isCICDJob()) {
String path =
this.projectService.getAppConfPath(application.getProjectId(), application.getModule());
application.setConfPath(path);
}
}
// add flink web url info for k8s-mode
if (isKubernetesApp(application)) {
String restUrl = k8SFlinkTrackMonitor.getRemoteRestUrl(toTrackId(application));
application.setFlinkRestUrl(restUrl);
// set duration
long now = System.currentTimeMillis();
if (application.getTracking() == 1
&& application.getStartTime() != null
&& application.getStartTime().getTime() > 0) {
application.setDuration(now - application.getStartTime().getTime());
}
}
if (!application.getHotParamsMap().isEmpty()) {
if (ExecutionMode.YARN_APPLICATION.equals(application.getExecutionModeEnum())) {
if (application.getHotParamsMap().containsKey(ConfigConst.KEY_YARN_APP_QUEUE())) {
application.setYarnQueue(
application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_QUEUE()).toString());
}
}
}
return application;
}
@Override
public String getMain(Application application) {
File jarFile;
if (application.getProjectId() == null) {
jarFile = new File(application.getJar());
} else {
Project project = new Project();
project.setId(application.getProjectId());
String modulePath =
project.getDistHome().getAbsolutePath().concat("/").concat(application.getModule());
jarFile = new File(modulePath, application.getJar());
}
Manifest manifest = Utils.getJarManifest(jarFile);
return manifest.getMainAttributes().getValue("Main-Class");
}
@Override
public boolean mapping(Application appParam) {
boolean mapping = this.baseMapper.mapping(appParam);
Application application = getById(appParam.getId());
if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.doWatching(toTrackId(application));
} else {
FlinkRESTAPIWatcher.doWatching(application);
}
return mapping;
}
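/**
 * Cancel a running job, optionally triggering a savepoint first. The cancel request is executed
 * asynchronously with a 10-minute timeout; on success the savepoint is recorded, on failure the
 * application is marked FAILED and the exception is written to the application log.
 */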
@Override
public void cancel(Application appParam) throws Exception {
FlinkRESTAPIWatcher.setOptionState(appParam.getId(), OptionState.CANCELLING);
Application application = getById(appParam.getId());
application.setState(FlinkAppState.CANCELLING.getValue());
if (appParam.getSavePointed()) {
FlinkRESTAPIWatcher.addSavepoint(application.getId());
application.setOptionState(OptionState.SAVEPOINTING.getValue());
} else {
application.setOptionState(OptionState.CANCELLING.getValue());
}
application.setOptionTime(new Date());
this.baseMapper.updateById(application);
Long userId = commonService.getUserId();
if (!application.getUserId().equals(userId)) {
FlinkRESTAPIWatcher.addCanceledApp(application.getId(), userId);
}
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
// infer savepoint
String customSavepoint = null;
if (appParam.getSavePointed()) {
customSavepoint = appParam.getSavePoint();
if (StringUtils.isBlank(customSavepoint)) {
customSavepoint = getSavePointPath(appParam);
}
}
String clusterId = null;
if (ExecutionMode.isKubernetesMode(application.getExecutionMode())) {
clusterId = application.getClusterId();
} else if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
AssertUtils.state(
cluster != null,
String.format(
"The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or "
+ "the cluster has been deleted. Please contact the Admin.",
application.getFlinkClusterId()));
clusterId = cluster.getClusterId();
} else {
clusterId = application.getAppId();
}
}
Map<String, Object> properties = new HashMap<>();
if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
AssertUtils.state(
cluster != null,
String.format(
"The clusterId=%s cannot be find, maybe the clusterId is wrong or "
+ "the cluster has been deleted. Please contact the Admin.",
application.getFlinkClusterId()));
URI activeAddress = cluster.getRemoteURI();
properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
properties.put(RestOptions.PORT.key(), activeAddress.getPort());
}
CancelRequest cancelRequest =
new CancelRequest(
flinkEnv.getFlinkVersion(),
ExecutionMode.of(application.getExecutionMode()),
clusterId,
application.getJobId(),
appParam.getSavePointed(),
appParam.getDrain(),
customSavepoint,
application.getK8sNamespace(),
properties);
CompletableFuture<CancelResponse> cancelFuture =
CompletableFuture.supplyAsync(() -> FlinkSubmitter.cancel(cancelRequest), executorService);
cancelFutureMap.put(application.getId(), cancelFuture);
CompletableFutureUtils.runTimeout(
cancelFuture,
10L,
TimeUnit.MINUTES,
cancelResponse -> {
if (cancelResponse != null && cancelResponse.savePointDir() != null) {
String savePointDir = cancelResponse.savePointDir();
log.info("savePoint path: {}", savePointDir);
SavePoint savePoint = new SavePoint();
Date now = new Date();
savePoint.setPath(savePointDir);
savePoint.setAppId(application.getId());
savePoint.setLatest(true);
savePoint.setType(CheckPointType.SAVEPOINT.get());
savePoint.setTriggerTime(now);
savePoint.setCreateTime(now);
savePointService.save(savePoint);
}
if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.unWatching(toTrackId(application));
}
},
e -> {
if (e.getCause() instanceof CancellationException) {
updateToStopped(application);
} else {
log.error("stop flink job fail.", e);
application.setOptionState(OptionState.NONE.getValue());
application.setState(FlinkAppState.FAILED.getValue());
updateById(application);
if (appParam.getSavePointed()) {
savePointService.expire(application.getId());
}
// re-watch the flink job on kubernetes and record the exception
if (isKubernetesApp(application)) {
TrackId id = toTrackId(application);
k8SFlinkTrackMonitor.unWatching(id);
k8SFlinkTrackMonitor.doWatching(id);
} else {
FlinkRESTAPIWatcher.unWatching(application.getId());
}
ApplicationLog log = new ApplicationLog();
log.setAppId(application.getId());
log.setYarnAppId(application.getClusterId());
log.setOptionTime(new Date());
String exception = ExceptionUtils.stringifyException(e);
log.setException(exception);
log.setSuccess(false);
applicationLogService.save(log);
}
})
.whenComplete(
(t, e) -> {
if (isKubernetesApp(application)) {
IngressController.deleteIngress(
application.getK8sNamespace(), application.getJobName());
}
cancelFutureMap.remove(application.getId());
});
}
@Override
public String checkSavepointPath(Application appParam) throws Exception {
String savepointPath = appParam.getSavePoint();
if (StringUtils.isBlank(savepointPath)) {
savepointPath = getSavePointPath(appParam);
}
if (StringUtils.isNotBlank(savepointPath)) {
final URI uri = URI.create(savepointPath);
final String scheme = uri.getScheme();
final String pathPart = uri.getPath();
String error = null;
if (scheme == null) {
error =
"The scheme (hdfs://, file://, etc.) of the state.savepoints.dir value "
+ savepointPath
+ " is null. Please specify the file system scheme explicitly in the URI.";
} else if (pathPart == null) {
error =
"The path part of the state.savepoints.dir value "
+ savepointPath
+ " (where the checkpoint data is stored) is null. Please specify a directory path for the checkpoint data.";
} else if (pathPart.length() == 0 || "/".equals(pathPart)) {
error =
"Invalid state.savepoints.dir value "
+ savepointPath
+ ": cannot use the root directory for checkpoints.";
}
return error;
} else {
return "When custom savepoint is not set, state.savepoints.dir needs to be set in properties or flink-conf.yaml of application";
}
}
@Override
public void persistMetrics(Application appParam) {
this.baseMapper.persistMetrics(appParam);
}
/**
 * Mark the task as starting (for the webUI "state" display)
 *
 * @param appParam the application whose state is being updated
 */
@Override
public void starting(Application appParam) {
Application application = getById(appParam.getId());
AssertUtils.state(application != null);
application.setState(FlinkAppState.STARTING.getValue());
application.setOptionTime(new Date());
updateById(application);
}
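/**
 * Start (submit) an application. Resolves the effective config, the user jar and the build
 * result for the current execution mode, then submits asynchronously with a 2-minute timeout;
 * on success the job is added to the corresponding tracking watcher.
 *
 * @param appParam the application to start
 * @param auto whether this start was triggered automatically by the failure-restart mechanism
 */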
@Override
@Transactional(rollbackFor = {Exception.class})
public void start(Application appParam, boolean auto) throws Exception {
final Application application = getById(appParam.getId());
AssertUtils.state(application != null);
FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
if (flinkEnv == null) {
throw new ApiAlertException("[StreamPark] can no found flink version");
}
// if manually started, clear the restart flag
if (!auto) {
application.setRestartCount(0);
} else {
if (!application.isNeedRestartOnFailed()) {
return;
}
application.setRestartCount(application.getRestartCount() + 1);
application.setSavePointed(true);
}
String appConf;
String flinkUserJar = null;
String jobId = new JobID().toHexString();
ApplicationLog applicationLog = new ApplicationLog();
applicationLog.setAppId(application.getId());
applicationLog.setOptionTime(new Date());
// set the latest config to Effective (it only becomes the currently effective config at this point)
this.toEffective(application);
ApplicationConfig applicationConfig = configService.getEffective(application.getId());
ExecutionMode executionMode = ExecutionMode.of(application.getExecutionMode());
AssertUtils.state(executionMode != null);
if (application.isCustomCodeJob()) {
if (application.isUploadJob()) {
appConf =
String.format(
"json://{\"%s\":\"%s\"}",
ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
} else {
switch (application.getApplicationType()) {
case STREAMPARK_FLINK:
ConfigFileType fileType = ConfigFileType.of(applicationConfig.getFormat());
if (fileType != null && !fileType.equals(ConfigFileType.UNKNOWN)) {
appConf =
String.format("%s://%s", fileType.getTypeName(), applicationConfig.getContent());
} else {
throw new IllegalArgumentException(
"application' config type error,must be ( yaml| properties| hocon )");
}
break;
case APACHE_FLINK:
appConf =
String.format(
"json://{\"%s\":\"%s\"}",
ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
break;
default:
throw new IllegalArgumentException(
"[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... ");
}
}
if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
switch (application.getApplicationType()) {
case STREAMPARK_FLINK:
flinkUserJar =
String.format(
"%s/%s", application.getAppLib(), application.getModule().concat(".jar"));
break;
case APACHE_FLINK:
flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar());
break;
default:
throw new IllegalArgumentException(
"[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... ");
}
}
} else if (application.isFlinkSqlJob()) {
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
AssertUtils.state(flinkSql != null);
// 1) dist_userJar
String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
// 2) appConfig
appConf =
applicationConfig == null
? null
: String.format("yaml://%s", applicationConfig.getContent());
// 3) client
if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
String clientPath = Workspace.remote().APP_CLIENT();
flinkUserJar = String.format("%s/%s", clientPath, sqlDistJar);
}
} else {
throw new UnsupportedOperationException("Unsupported...");
}
Map<String, Object> extraParameter = new HashMap<>(0);
if (application.isFlinkSqlJob()) {
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true);
// Get the sql of the replaced placeholder
String realSql = variableService.replaceVariable(application.getTeamId(), flinkSql.getSql());
flinkSql.setSql(DeflaterUtils.zipString(realSql));
extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());
}
KubernetesSubmitParam kubernetesSubmitParam =
new KubernetesSubmitParam(
application.getClusterId(),
application.getK8sNamespace(),
application.getK8sRestExposedTypeEnum());
AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
AssertUtils.state(buildPipeline != null);
BuildResult buildResult = buildPipeline.getBuildResult();
if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
} else {
if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
AssertUtils.state(buildResult != null);
DockerImageBuildResponse result = buildResult.as(DockerImageBuildResponse.class);
String ingressTemplates = application.getIngressTemplate();
String domainName = application.getDefaultModeIngress();
if (StringUtils.isNotBlank(ingressTemplates)) {
String ingressOutput = result.workspacePath() + "/ingress.yaml";
IngressController.configureIngress(ingressOutput);
}
if (StringUtils.isNotBlank(domainName)) {
try {
IngressController.configureIngress(
domainName, application.getClusterId(), application.getK8sNamespace());
} catch (KubernetesClientException e) {
log.info("Failed to create ingress, stack info:{}", e.getMessage());
applicationLog.setException(e.getMessage());
applicationLog.setSuccess(false);
applicationLogService.save(applicationLog);
application.setState(FlinkAppState.FAILED.getValue());
application.setOptionState(OptionState.NONE.getValue());
updateById(application);
return;
}
}
}
}
// Get the args after placeholder replacement
String applicationArgs =
variableService.replaceVariable(application.getTeamId(), application.getArgs());
SubmitRequest submitRequest =
new SubmitRequest(
flinkEnv.getFlinkVersion(),
flinkEnv.getFlinkConf(),
DevelopmentMode.of(application.getJobType()),
ExecutionMode.of(application.getExecutionMode()),
application.getId(),
jobId,
application.getJobName(),
appConf,
application.getApplicationType(),
getSavePointed(appParam),
getProperties(application),
applicationArgs,
buildResult,
kubernetesSubmitParam,
extraParameter);
CompletableFuture<SubmitResponse> future =
CompletableFuture.supplyAsync(() -> FlinkSubmitter.submit(submitRequest), executorService);
startFutureMap.put(application.getId(), future);
CompletableFutureUtils.runTimeout(
future,
2L,
TimeUnit.MINUTES,
submitResponse -> {
if (submitResponse.flinkConfig() != null) {
String jmMemory =
submitResponse.flinkConfig().get(ConfigConst.KEY_FLINK_JM_PROCESS_MEMORY());
if (jmMemory != null) {
application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
}
String tmMemory =
submitResponse.flinkConfig().get(ConfigConst.KEY_FLINK_TM_PROCESS_MEMORY());
if (tmMemory != null) {
application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
}
}
application.setAppId(submitResponse.clusterId());
if (StringUtils.isNoneEmpty(submitResponse.jobId())) {
application.setJobId(submitResponse.jobId());
}
if (StringUtils.isNoneEmpty(submitResponse.jobManagerUrl())) {
application.setJobManagerUrl(submitResponse.jobManagerUrl());
applicationLog.setJobManagerUrl(submitResponse.jobManagerUrl());
}
applicationLog.setYarnAppId(submitResponse.clusterId());
application.setStartTime(new Date());
application.setEndTime(null);
if (isKubernetesApp(application)) {
application.setLaunch(LaunchState.DONE.get());
}
updateById(application);
// once the start has completed, add the task to the tracking queue
if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.doWatching(toTrackId(application));
} else {
FlinkRESTAPIWatcher.setOptionState(appParam.getId(), OptionState.STARTING);
FlinkRESTAPIWatcher.doWatching(application);
}
applicationLog.setSuccess(true);
applicationLogService.save(applicationLog);
// set savepoint to expire
savePointService.expire(application.getId());
},
e -> {
if (e.getCause() instanceof CancellationException) {
updateToStopped(application);
} else {
String exception = ExceptionUtils.stringifyException(e);
applicationLog.setException(exception);
applicationLog.setSuccess(false);
applicationLogService.save(applicationLog);
Application app = getById(appParam.getId());
app.setState(FlinkAppState.FAILED.getValue());
app.setOptionState(OptionState.NONE.getValue());
updateById(app);
if (isKubernetesApp(app)) {
k8SFlinkTrackMonitor.unWatching(toTrackId(app));
} else {
FlinkRESTAPIWatcher.unWatching(appParam.getId());
}
}
})
.whenComplete(
(t, e) -> {
startFutureMap.remove(application.getId());
});
}
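/**
 * Assemble the effective flink properties for submission from the application's option map, the
 * cluster rest address (remote mode), the yarn queue and session cluster id (yarn mode), or the
 * image pull policy (kubernetes mode), plus any user-supplied dynamic properties.
 */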
private Map<String, Object> getProperties(Application application) {
Map<String, Object> properties = application.getOptionMap();
if (ExecutionMode.isRemoteMode(application.getExecutionModeEnum())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
AssertUtils.state(
cluster != null,
String.format(
"The clusterId=%s cannot be find, maybe the clusterId is wrong or "
+ "the cluster has been deleted. Please contact the Admin.",
application.getFlinkClusterId()));
URI activeAddress = cluster.getRemoteURI();
properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
properties.put(RestOptions.PORT.key(), activeAddress.getPort());
} else if (ExecutionMode.isYarnMode(application.getExecutionModeEnum())) {
String yarnQueue =
(String) application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_QUEUE());
if (yarnQueue != null) {
properties.put(ConfigConst.KEY_YARN_APP_QUEUE(), yarnQueue);
}
if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
AssertUtils.state(
cluster != null,
String.format(
"The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or "
+ "the cluster has been deleted. Please contact the Admin.",
application.getFlinkClusterId()));
properties.put(ConfigConst.KEY_YARN_APP_ID(), cluster.getClusterId());
}
} else if (ExecutionMode.isKubernetesMode(application.getExecutionModeEnum())) {
properties.put(ConfigConst.KEY_K8S_IMAGE_PULL_POLICY(), "Always");
}
if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
try {
HadoopUtils.yarnClient();
properties.put(JobManagerOptions.ARCHIVE_DIR.key(), Workspace.ARCHIVES_FILE_PATH());
} catch (Exception e) {
// skip
}
}
if (application.getAllowNonRestored()) {
properties.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), true);
}
Map<String, String> dynamicProperties =
FlinkSubmitter.extractDynamicPropertiesAsJava(application.getDynamicProperties());
properties.putAll(dynamicProperties);
ResolveOrder resolveOrder = ResolveOrder.of(application.getResolveOrder());
if (resolveOrder != null) {
properties.put(CoreOptions.CLASSLOADER_RESOLVE_ORDER.key(), resolveOrder.getName());
}
return properties;
}
private void updateToStopped(Application app) {
Application application = getById(app.getId());
application.setOptionState(OptionState.NONE.getValue());
application.setState(FlinkAppState.CANCELED.getValue());
application.setOptionTime(new Date());
updateById(application);
savePointService.expire(application.getId());
// re-watch the flink job on kubernetes
if (isKubernetesApp(application)) {
TrackId id = toTrackId(application);
k8SFlinkTrackMonitor.unWatching(id);
k8SFlinkTrackMonitor.doWatching(id);
} else {
FlinkRESTAPIWatcher.unWatching(application.getId());
}
}
private Boolean checkJobName(String jobName) {
if (StringUtils.isNotBlank(jobName)) {
return JOB_NAME_PATTERN.matcher(jobName).matches()
&& SINGLE_SPACE_PATTERN.matcher(jobName).matches();
}
return false;
}
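/**
 * Resolve the savepoint path with the following priority: 1) dynamic properties (e.g. a value
 * set via -Dstate.savepoints.dir=hdfs:///flink/savepoints), 2) the effective application conf
 * when checkpointing is enabled, 3) the cluster config (remote mode) or the flink-conf.yaml of
 * the bound flink version (yarn/k8s mode).
 */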
private String getSavePointPath(Application appParam) throws Exception {
Application application = getById(appParam.getId());
// 1) dynamic properties have the highest priority, read the value set via: -Dstate.savepoints.dir
String savepointPath =
FlinkSubmitter.extractDynamicPropertiesAsJava(application.getDynamicProperties())
.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
// 2) the application conf has the second priority. If it is a streampark|flinksql type task,
// check whether the application conf was configured when the task was defined; if checkpoints
// are configured and enabled, read `state.savepoints.dir`
if (StringUtils.isBlank(savepointPath)) {
if (application.isStreamParkJob() || application.isFlinkSqlJob()) {
ApplicationConfig applicationConfig = configService.getEffective(application.getId());
if (applicationConfig != null) {
Map<String, String> map = applicationConfig.readConfig();
if (FlinkUtils.isCheckpointEnabled(map)) {
savepointPath = map.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
}
}
}
}
// 3) if the savepoint path was not obtained above, try to obtain it according to the
// deployment type (remote | on yarn)
if (StringUtils.isBlank(savepointPath)) {
// 3.1) in remote mode, request the flink webui interface to get the savepoint path
if (ExecutionMode.isRemoteMode(application.getExecutionMode())) {
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
AssertUtils.state(
cluster != null,
String.format(
"The clusterId=%s cannot be find, maybe the clusterId is wrong or "
+ "the cluster has been deleted. Please contact the Admin.",
application.getFlinkClusterId()));
Map<String, String> config = cluster.getFlinkConfig();
if (!config.isEmpty()) {
savepointPath = config.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
}
} else {
// 3.2) in yarn or k8s mode, read the savepoint path from the flink-conf.yaml of the bound
// flink version
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
savepointPath =
flinkEnv.convertFlinkYamlAsMap().get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
}
}
return savepointPath;
}
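/**
 * Resolve the savepoint to restore from: the explicitly provided path if set, otherwise the
 * latest recorded savepoint of the application, or null when restoring is not requested.
 */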
private String getSavePointed(Application appParam) {
if (appParam.getSavePointed()) {
if (appParam.getSavePoint() == null) {
SavePoint savePoint = savePointService.getLatest(appParam.getId());
if (savePoint != null) {
return savePoint.getPath();
}
} else {
return appParam.getSavePoint();
}
}
return null;
}
}