blob: f3e713e5c826556bbeb1ade88f2ee2d7dae51424 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.console.core.component;
import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.SavePoint;
import org.apache.streampark.console.core.enums.CheckPointStatusEnum;
import org.apache.streampark.console.core.enums.FailoverStrategyEnum;
import org.apache.streampark.console.core.metrics.flink.CheckPoints;
import org.apache.streampark.console.core.service.SavePointService;
import org.apache.streampark.console.core.service.alert.AlertService;
import org.apache.streampark.console.core.service.application.ApplicationActionService;
import org.apache.streampark.console.core.watcher.FlinkAppHttpWatcher;
import org.apache.flink.shaded.guava30.com.google.common.base.Preconditions;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Component
public class FlinkCheckpointProcessor {
private static final Byte DEFAULT_FLAG_BYTE = Byte.valueOf("0");
private static final Integer SAVEPOINT_CACHE_HOUR = 1;
private final Cache<String, Long> checkPointCache =
Caffeine.newBuilder().expireAfterAccess(1, TimeUnit.DAYS).build();
/**
* Cache to store the savepoint if be stored in the db. Use the {appId}_{jobID}_{chkId} from
* {@link CheckPointKey#getSavePointId()} as the cache key to save the trace of the savepoint. And
* try best to make sure the every savepoint would be stored into DB. Especially for the case
* 'maxConcurrent of Checkpoint' > 1: 1. savepoint(n-1) is completed after completed
* checkpoint(n); 2. savepoint(n-1) is completed after completed savepoint(n).
*/
private final Cache<String, Byte> savepointedCache =
Caffeine.newBuilder().expireAfterWrite(SAVEPOINT_CACHE_HOUR, TimeUnit.HOURS).build();
private final Map<Long, Counter> checkPointFailedCache = new ConcurrentHashMap<>(0);
@Autowired private ApplicationActionService applicationActionService;
@Autowired private AlertService alertService;
@Autowired private SavePointService savePointService;
@Autowired private FlinkAppHttpWatcher flinkAppHttpWatcher;
public void process(Application application, @Nonnull CheckPoints checkPoints) {
checkPoints.getLatestCheckpoint().forEach(checkPoint -> process(application, checkPoint));
}
private void process(Application application, @Nonnull CheckPoints.CheckPoint checkPoint) {
String jobID = application.getJobId();
Long appId = application.getId();
CheckPointStatusEnum status = checkPoint.getCheckPointStatus();
CheckPointKey checkPointKey = new CheckPointKey(appId, jobID, checkPoint.getId());
if (CheckPointStatusEnum.COMPLETED == status) {
if (shouldStoreAsSavepoint(checkPointKey, checkPoint)) {
savepointedCache.put(checkPointKey.getSavePointId(), DEFAULT_FLAG_BYTE);
saveSavepoint(checkPoint, application.getId());
flinkAppHttpWatcher.cleanSavepoint(application);
return;
}
Long latestChkId = getLatestCheckpointedId(appId, checkPointKey.getCheckPointId());
if (shouldStoreAsCheckpoint(checkPoint, latestChkId)) {
checkPointCache.put(checkPointKey.getCheckPointId(), checkPoint.getId());
saveSavepoint(checkPoint, application.getId());
}
} else if (shouldProcessFailedTrigger(checkPoint, application.cpFailedTrigger(), status)) {
processFailedCheckpoint(application, checkPoint, appId);
}
}
private void processFailedCheckpoint(
Application application, @Nonnull CheckPoints.CheckPoint checkPoint, Long appId) {
Counter counter = checkPointFailedCache.get(appId);
if (counter == null) {
checkPointFailedCache.put(appId, new Counter(checkPoint.getTriggerTimestamp()));
return;
}
long minute = counter.getDuration(checkPoint.getTriggerTimestamp());
if (minute > application.getCpFailureRateInterval()
|| counter.getCount() < application.getCpMaxFailureInterval()) {
counter.increment();
return;
}
checkPointFailedCache.remove(appId);
FailoverStrategyEnum failoverStrategyEnum =
FailoverStrategyEnum.of(application.getCpFailureAction());
Preconditions.checkArgument(
failoverStrategyEnum != null,
"Unexpected cpFailureAction: %s",
application.getCpFailureAction());
processFailoverStrategy(application, failoverStrategyEnum);
}
private void processFailoverStrategy(
Application application, FailoverStrategyEnum failoverStrategyEnum) {
switch (failoverStrategyEnum) {
case ALERT:
alertService.alert(
application.getAlertId(), AlertTemplate.of(application, CheckPointStatusEnum.FAILED));
break;
case RESTART:
try {
applicationActionService.restart(application);
} catch (Exception e) {
throw new RuntimeException(e);
}
break;
default:
// do nothing
break;
}
}
private static boolean shouldStoreAsCheckpoint(
@Nonnull CheckPoints.CheckPoint checkPoint, Long latestId) {
return !checkPoint.getIsSavepoint() && (latestId == null || latestId < checkPoint.getId());
}
private boolean shouldStoreAsSavepoint(
CheckPointKey checkPointKey, @Nonnull CheckPoints.CheckPoint checkPoint) {
if (!checkPoint.getIsSavepoint()) {
return false;
}
return savepointedCache.getIfPresent(checkPointKey.getSavePointId()) == null
// If the savepoint triggered before SAVEPOINT_CACHE_HOUR span, we'll see it as out-of-time
// savepoint and ignore it.
&& checkPoint.getTriggerTimestamp()
>= System.currentTimeMillis() - TimeUnit.HOURS.toMillis(SAVEPOINT_CACHE_HOUR);
}
@Nullable
private Long getLatestCheckpointedId(Long appId, String cacheId) {
return checkPointCache.get(
cacheId,
key -> {
SavePoint savePoint = savePointService.getLatest(appId);
return Optional.ofNullable(savePoint).map(SavePoint::getChkId).orElse(null);
});
}
private boolean shouldProcessFailedTrigger(
CheckPoints.CheckPoint checkPoint, boolean cpFailedTrigger, CheckPointStatusEnum status) {
return CheckPointStatusEnum.FAILED == status && !checkPoint.getIsSavepoint() && cpFailedTrigger;
}
private void saveSavepoint(CheckPoints.CheckPoint checkPoint, Long appId) {
SavePoint savePoint = new SavePoint();
savePoint.setAppId(appId);
savePoint.setChkId(checkPoint.getId());
savePoint.setLatest(true);
savePoint.setType(checkPoint.getCheckPointType().get());
savePoint.setPath(checkPoint.getExternalPath());
savePoint.setTriggerTime(new Date(checkPoint.getTriggerTimestamp()));
savePoint.setCreateTime(new Date());
savePointService.save(savePoint);
}
public static class Counter {
private final Long timestamp;
private final AtomicInteger count;
public Counter(Long timestamp) {
this.timestamp = timestamp;
this.count = new AtomicInteger(1);
}
public void increment() {
this.count.incrementAndGet();
}
public Integer getCount() {
return count.get();
}
public long getDuration(Long currentTimestamp) {
return (currentTimestamp - this.timestamp) / 1000 / 60;
}
}
/** Util class for checkpoint key. */
@Data
public static class CheckPointKey {
private Long appId;
private String jobId;
private Long checkId;
public CheckPointKey(Long appId, String jobId, Long checkId) {
this.appId = appId;
this.jobId = jobId;
this.checkId = checkId;
}
/** Get savepoint cache id, see {@link #savepointedCache}. */
public String getSavePointId() {
return String.format("%s_%s_%s", appId, jobId, checkId);
}
/** Get checkpoint cache id. */
public String getCheckPointId() {
return String.format("%s_%s", appId, jobId);
}
}
}