/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.ExceptionUtils;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.InternalException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.ApplicationConfig;
import org.apache.streampark.console.core.entity.ApplicationLog;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.entity.SavePoint;
import org.apache.streampark.console.core.enums.CheckPointTypeEnum;
import org.apache.streampark.console.core.enums.OperationEnum;
import org.apache.streampark.console.core.enums.OptionStateEnum;
import org.apache.streampark.console.core.mapper.SavePointMapper;
import org.apache.streampark.console.core.service.ApplicationConfigService;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
import org.apache.streampark.console.core.service.SavePointService;
import org.apache.streampark.console.core.service.application.ApplicationActionService;
import org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.SavepointResponse;
import org.apache.streampark.flink.client.bean.TriggerSavepointRequest;
import org.apache.streampark.flink.util.FlinkUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.RestOptions;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URI;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.apache.flink.configuration.CheckpointingOptions.MAX_RETAINED_CHECKPOINTS;
import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
import static org.apache.streampark.common.util.PropertiesUtils.extractDynamicPropertiesAsJava;
import static org.apache.streampark.console.core.enums.CheckPointTypeEnum.CHECKPOINT;
@Slf4j
@Service
@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
public class SavePointServiceImpl extends ServiceImpl<SavePointMapper, SavePoint>
implements SavePointService {
@Autowired private FlinkEnvService flinkEnvService;
@Autowired private ApplicationManageService applicationManageService;
@Autowired private ApplicationActionService applicationActionService;
@Autowired private ApplicationConfigService configService;
@Autowired private FlinkClusterService flinkClusterService;
@Autowired private ApplicationLogService applicationLogService;
@Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
@Autowired private CommonServiceImpl commonService;
@Qualifier("triggerSavepointExecutor")
@Autowired
private Executor executorService;
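  /** Mark all savepoint records of the given application as no longer the latest. */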
@Override
public void expire(Long appId) {
SavePoint savePoint = new SavePoint();
savePoint.setLatest(false);
LambdaQueryWrapper<SavePoint> queryWrapper =
new LambdaQueryWrapper<SavePoint>().eq(SavePoint::getAppId, appId);
this.update(savePoint, queryWrapper);
}
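  // Before persisting, evict stale checkpoint records (see expire(SavePoint)) and clear the
  // previous "latest" flag so the new record becomes the latest one.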
@Override
public boolean save(SavePoint entity) {
this.expire(entity);
this.expire(entity.getAppId());
return super.save(entity);
}
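  /** Get the savepoint record currently flagged as latest for the given application. */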
@Override
public SavePoint getLatest(Long id) {
LambdaQueryWrapper<SavePoint> queryWrapper =
new LambdaQueryWrapper<SavePoint>()
.eq(SavePoint::getAppId, id)
.eq(SavePoint::getLatest, true);
return this.getOne(queryWrapper);
}
@Override
public String getSavePointPath(Application appParam) throws Exception {
Application application = applicationManageService.getById(appParam.getId());
    // 1) Dynamic properties have the highest priority: read `-Dstate.savepoints.dir` if it is set.
String savepointPath = getSavepointFromDynamicProps(application.getDynamicProperties());
if (StringUtils.isNotBlank(savepointPath)) {
return savepointPath;
}
    // 2) The application conf has the second priority: if it is a streampark|flinksql type
    // task, check whether the conf was configured when the task was defined; if checkpoints
    // are configured and enabled, read `state.savepoints.dir`.
savepointPath = getSavepointFromConfig(application);
if (StringUtils.isNotBlank(savepointPath)) {
return savepointPath;
}
    // 3) If no savepoint path was obtained above, resolve it from the deployment layer
    // (remote|on yarn):
    // 3.1) in remote mode, request the flink webui interface to get the savepoint path;
    // 3.2) in yarn or k8s mode, read the savepoint path from flink-conf.yaml of the bound flink
    // home.
return getSavepointFromDeployLayer(application);
}
@Override
public void trigger(Long appId, @Nullable String savepointPath, @Nullable Boolean nativeFormat) {
log.info("Start to trigger savepoint for app {}", appId);
Application application = applicationManageService.getById(appId);
ApplicationLog applicationLog = getApplicationLog(application);
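    // Mark the application as savepointing so the HTTP watcher can track the in-flight operation.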
FlinkAppHttpWatcher.addSavepoint(application.getId());
application.setOptionState(OptionStateEnum.SAVEPOINTING.getValue());
application.setOptionTime(new Date());
this.applicationActionService.updateById(application);
flinkAppHttpWatcher.init();
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
    // Build the trigger-savepoint request, inferring the savepoint dir if none was supplied.
TriggerSavepointRequest request =
renderTriggerSavepointRequest(savepointPath, nativeFormat, application, flinkEnv);
CompletableFuture<SavepointResponse> savepointFuture =
CompletableFuture.supplyAsync(() -> FlinkClient.triggerSavepoint(request), executorService);
handleSavepointResponseFuture(application, applicationLog, savepointFuture);
}
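  /** Build an audit log entry recording this savepoint operation. */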
@NotNull
private ApplicationLog getApplicationLog(Application application) {
ApplicationLog applicationLog = new ApplicationLog();
applicationLog.setOptionName(OperationEnum.SAVEPOINT.getValue());
applicationLog.setAppId(application.getId());
applicationLog.setJobManagerUrl(application.getJobManagerUrl());
applicationLog.setOptionTime(new Date());
applicationLog.setYarnAppId(application.getClusterId());
applicationLog.setUserId(commonService.getUserId());
return applicationLog;
}
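  // Delete the savepoint file from storage (if a path is recorded), then remove the DB record.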
@Override
public Boolean remove(Long id, Application appParam) throws InternalException {
SavePoint savePoint = getById(id);
try {
if (StringUtils.isNotBlank(savePoint.getPath())) {
appParam.getFsOperator().delete(savePoint.getPath());
}
return removeById(id);
} catch (Exception e) {
throw new InternalException(e.getMessage());
}
}
@Override
public IPage<SavePoint> getPage(SavePoint savePoint, RestRequest request) {
request.setSortField("trigger_time");
Page<SavePoint> page = MybatisPager.getPage(request);
LambdaQueryWrapper<SavePoint> queryWrapper =
new LambdaQueryWrapper<SavePoint>().eq(SavePoint::getAppId, savePoint.getAppId());
return this.page(page, queryWrapper);
}
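  // Remove all savepoint records of the application and best-effort delete its savepoint
  // directory on the file system.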
@Override
public void remove(Application appParam) {
Long appId = appParam.getId();
LambdaQueryWrapper<SavePoint> queryWrapper =
new LambdaQueryWrapper<SavePoint>().eq(SavePoint::getAppId, appId);
this.remove(queryWrapper);
try {
appParam
.getFsOperator()
.delete(appParam.getWorkspace().APP_SAVEPOINTS().concat("/").concat(appId.toString()));
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
// private methods.
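  // Wait up to 10 minutes for the savepoint to complete; record the outcome in the application
  // log, then reset the application's option state and refresh the watcher.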
private void handleSavepointResponseFuture(
Application application,
ApplicationLog applicationLog,
CompletableFuture<SavepointResponse> savepointFuture) {
CompletableFutureUtils.runTimeout(
savepointFuture,
10L,
TimeUnit.MINUTES,
savepointResponse -> {
if (savepointResponse != null && savepointResponse.savePointDir() != null) {
applicationLog.setSuccess(true);
String savePointDir = savepointResponse.savePointDir();
log.info("Request savepoint successful, savepointDir: {}", savePointDir);
}
},
e -> {
log.error("Trigger savepoint for flink job failed.", e);
String exception = ExceptionUtils.stringifyException(e);
applicationLog.setException(exception);
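          // Presumably a timeout does not prove failure (the savepoint may still complete),
          // so only non-timeout errors mark the log entry as failed.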
if (!(e instanceof TimeoutException)) {
applicationLog.setSuccess(false);
}
})
.whenComplete(
(t, e) -> {
applicationLogService.save(applicationLog);
application.setOptionState(OptionStateEnum.NONE.getValue());
application.setOptionTime(new Date());
applicationManageService.update(application);
flinkAppHttpWatcher.init();
});
}
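  // Use the user-supplied savepoint path if present; otherwise fall back to the path resolved
  // by getSavePointPath().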
private String getFinalSavepointDir(@Nullable String savepointPath, Application application) {
String result = savepointPath;
if (StringUtils.isBlank(savepointPath)) {
try {
result = this.getSavePointPath(application);
} catch (Exception e) {
throw new ApiAlertException(
"Error in getting savepoint path for triggering savepoint for app "
+ application.getId(),
e);
}
}
return result;
}
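  // In remote mode the Flink client needs the cluster's REST address and port; other modes
  // need no extra REST properties, so an empty map is returned.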
@Nonnull
private Map<String, Object> tryGetRestProps(Application application, FlinkCluster cluster) {
Map<String, Object> properties = new HashMap<>();
if (FlinkExecutionMode.isRemoteMode(application.getFlinkExecutionMode())) {
AssertUtils.notNull(
cluster,
          String.format(
              "The clusterId=%s cannot be found, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
              application.getFlinkClusterId()));
URI activeAddress = cluster.getRemoteURI();
properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
properties.put(RestOptions.PORT.key(), activeAddress.getPort());
}
return properties;
}
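  // Resolve the target cluster id by execution mode: k8s modes use the application's cluster
  // id, yarn-session uses the bound FlinkCluster's id, and other yarn modes use the
  // application's appId (presumably the yarn application id); otherwise null.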
private String getClusterId(Application application, FlinkCluster cluster) {
if (FlinkExecutionMode.isKubernetesMode(application.getExecutionMode())) {
return application.getClusterId();
}
if (FlinkExecutionMode.isYarnMode(application.getExecutionMode())) {
if (FlinkExecutionMode.YARN_SESSION == application.getFlinkExecutionMode()) {
AssertUtils.notNull(
cluster,
            String.format(
                "The yarn session clusterId=%s cannot be found, maybe the clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
                application.getFlinkClusterId()));
return cluster.getClusterId();
}
return application.getAppId();
}
return null;
}
/**
* Try to get the savepoint config item from the dynamic properties.
*
* @param dynamicProps dynamic properties string.
* @return the value of the savepoint in the dynamic properties.
*/
@VisibleForTesting
@Nullable
public String getSavepointFromDynamicProps(String dynamicProps) {
return extractDynamicPropertiesAsJava(dynamicProps).get(SAVEPOINT_DIRECTORY.key());
}
/**
   * Try to obtain the savepoint path from the effective application conf of a streampark|flinksql
   * type task. If the conf was configured when the task was defined and checkpoints are enabled,
   * read `state.savepoints.dir`.
*
* @param application the target application.
* @return the value of the savepoint if existed.
*/
@VisibleForTesting
@Nullable
public String getSavepointFromConfig(Application application) {
if (!application.isStreamParkJob() && !application.isFlinkSqlJob()) {
return null;
}
ApplicationConfig applicationConfig = configService.getEffective(application.getId());
if (applicationConfig == null) {
return null;
}
Map<String, String> configMap = applicationConfig.readConfig();
return FlinkUtils.isCheckpointEnabled(configMap)
? configMap.get(SAVEPOINT_DIRECTORY.key())
: null;
}
/**
   * Try to obtain the savepoint path according to the deployment type (remote|on yarn). In remote
   * mode, request the flink webui interface to get the savepoint path; in yarn or k8s mode, read
   * the savepoint path from flink-conf.yaml of the bound flink home.
*
* @param application the target application.
* @return the value of the savepoint if existed.
*/
@VisibleForTesting
@Nullable
public String getSavepointFromDeployLayer(Application application)
throws JsonProcessingException {
    // In yarn or k8s mode, read the savepoint path from flink-conf.yaml of the bound flink home.
if (!FlinkExecutionMode.isRemoteMode(application.getExecutionMode())) {
FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
return flinkEnv.convertFlinkYamlAsMap().get(SAVEPOINT_DIRECTORY.key());
}
    // In remote mode, request the flink webui interface to get the savepoint path.
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
AssertUtils.notNull(
cluster,
String.format(
"The clusterId=%s cannot be find, maybe the clusterId is wrong or "
+ "the cluster has been deleted. Please contact the Admin.",
application.getFlinkClusterId()));
Map<String, String> config = cluster.getFlinkConfig();
return config.isEmpty() ? null : config.get(SAVEPOINT_DIRECTORY.key());
}
  /** Try to get the 'state.checkpoints.num-retained' value from the dynamic properties. */
private Optional<Integer> tryGetChkNumRetainedFromDynamicProps(String dynamicProps) {
String rawCfgValue =
extractDynamicPropertiesAsJava(dynamicProps).get(MAX_RETAINED_CHECKPOINTS.key());
if (StringUtils.isBlank(rawCfgValue)) {
return Optional.empty();
}
try {
int value = Integer.parseInt(rawCfgValue.trim());
if (value > 0) {
return Optional.of(value);
}
      log.warn(
          "The value of dynamic properties key: state.checkpoints.num-retained is invalid, it must be greater than 0.");
} catch (NumberFormatException e) {
      log.warn(
          "The value of dynamic properties key: state.checkpoints.num-retained is invalid, it must be a number.");
}
return Optional.empty();
}
  /** Try to get the 'state.checkpoints.num-retained' value from the flink env. */
private int getChkNumRetainedFromFlinkEnv(
@Nonnull FlinkEnv flinkEnv, @Nonnull Application application) {
String flinkConfNumRetained =
flinkEnv.convertFlinkYamlAsMap().get(MAX_RETAINED_CHECKPOINTS.key());
if (StringUtils.isBlank(flinkConfNumRetained)) {
log.info(
"The application: {} is not set {} in dynamicProperties or value is invalid, and flink-conf.yaml is the same problem of flink env: {}, default value: {} will be use.",
application.getJobName(),
MAX_RETAINED_CHECKPOINTS.key(),
flinkEnv.getFlinkHome(),
MAX_RETAINED_CHECKPOINTS.defaultValue());
return MAX_RETAINED_CHECKPOINTS.defaultValue();
}
try {
int value = Integer.parseInt(flinkConfNumRetained.trim());
if (value > 0) {
return value;
}
log.warn(
"The value of key: state.checkpoints.num-retained in flink-conf.yaml is invalid, must be gt 0, default value: {} will be use",
MAX_RETAINED_CHECKPOINTS.defaultValue());
} catch (NumberFormatException e) {
      log.warn(
          "The value of key: state.checkpoints.num-retained in flink-conf.yaml of flink env: {} is not a valid number: {}; the default value: {} will be used.",
          flinkEnv.getFlinkHome(),
          flinkConfNumRetained,
          MAX_RETAINED_CHECKPOINTS.defaultValue());
}
return MAX_RETAINED_CHECKPOINTS.defaultValue();
}
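  /**
   * Enforce the checkpoint retention threshold ('state.checkpoints.num-retained'): keep only the
   * newest checkpoint records of the application and delete the older ones.
   */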
private void expire(SavePoint entity) {
FlinkEnv flinkEnv = flinkEnvService.getByAppId(entity.getAppId());
Application application = applicationManageService.getById(entity.getAppId());
AssertUtils.notNull(flinkEnv);
AssertUtils.notNull(application);
int cpThreshold =
tryGetChkNumRetainedFromDynamicProps(application.getDynamicProperties())
.orElse(getChkNumRetainedFromFlinkEnv(flinkEnv, application));
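    // The record being saved will occupy one retention slot itself, so for CHECKPOINT type the
    // effective threshold is reduced by one (presumably because save() runs expire() first).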
cpThreshold =
CHECKPOINT == CheckPointTypeEnum.of(entity.getType()) ? cpThreshold - 1 : cpThreshold;
if (cpThreshold == 0) {
LambdaQueryWrapper<SavePoint> queryWrapper =
new LambdaQueryWrapper<SavePoint>()
.eq(SavePoint::getAppId, entity.getAppId())
.eq(SavePoint::getType, CHECKPOINT.get());
this.remove(queryWrapper);
return;
}
LambdaQueryWrapper<SavePoint> queryWrapper =
new LambdaQueryWrapper<SavePoint>()
.select(SavePoint::getTriggerTime)
.eq(SavePoint::getAppId, entity.getAppId())
.eq(SavePoint::getType, CHECKPOINT.get())
.orderByDesc(SavePoint::getTriggerTime);
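    // Fetch the newest (threshold + 1) checkpoint records; if no more than `cpThreshold`
    // exist, nothing needs to be expired.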
Page<SavePoint> savePointPage =
this.baseMapper.selectPage(new Page<>(1, cpThreshold + 1), queryWrapper);
if (CollectionUtils.isEmpty(savePointPage.getRecords())
|| savePointPage.getRecords().size() <= cpThreshold) {
return;
}
SavePoint savePoint = savePointPage.getRecords().get(cpThreshold - 1);
LambdaQueryWrapper<SavePoint> lambdaQueryWrapper =
new LambdaQueryWrapper<SavePoint>()
.eq(SavePoint::getAppId, entity.getAppId())
.eq(SavePoint::getType, CHECKPOINT.get())
.lt(SavePoint::getTriggerTime, savePoint.getTriggerTime());
this.remove(lambdaQueryWrapper);
}
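  // Assemble the trigger-savepoint request: resolve the final savepoint dir, the cluster id and
  // the REST properties for the application's execution mode.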
@Nonnull
private TriggerSavepointRequest renderTriggerSavepointRequest(
@Nullable String savepointPath,
Boolean nativeFormat,
Application application,
FlinkEnv flinkEnv) {
String customSavepoint = this.getFinalSavepointDir(savepointPath, application);
FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId());
String clusterId = getClusterId(application, cluster);
Map<String, Object> properties = this.tryGetRestProps(application, cluster);
return new TriggerSavepointRequest(
application.getId(),
flinkEnv.getFlinkVersion(),
application.getFlinkExecutionMode(),
properties,
clusterId,
application.getJobId(),
customSavepoint,
nativeFormat,
application.getK8sNamespace());
}
}