/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.console.core.watcher;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.component.FlinkCheckpointProcessor;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
import org.apache.streampark.console.core.enums.OptionStateEnum;
import org.apache.streampark.console.core.enums.ReleaseStateEnum;
import org.apache.streampark.console.core.enums.StopFromEnum;
import org.apache.streampark.console.core.metrics.flink.CheckPoints;
import org.apache.streampark.console.core.metrics.flink.JobsOverview;
import org.apache.streampark.console.core.metrics.flink.Overview;
import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.SavePointService;
import org.apache.streampark.console.core.service.alert.AlertService;
import org.apache.streampark.console.core.service.application.ApplicationActionService;
import org.apache.streampark.console.core.service.application.ApplicationInfoService;
import org.apache.streampark.console.core.service.application.ApplicationManageService;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.hc.client5.http.config.RequestConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/** This implementation is currently used for tracking flink jobs in yarn, standalone and remote modes */
@Slf4j
@Component
public class FlinkAppHttpWatcher {
@Autowired private ApplicationManageService applicationManageService;
@Autowired private ApplicationActionService applicationActionService;
@Autowired private ApplicationInfoService applicationInfoService;
@Autowired private AlertService alertService;
@Autowired private FlinkCheckpointProcessor checkpointProcessor;
@Autowired private FlinkClusterService flinkClusterService;
@Autowired private SavePointService savePointService;
@Autowired private FlinkClusterWatcher flinkClusterWatcher;
@Qualifier("flinkRestAPIWatchingExecutor")
@Autowired
private Executor executorService;
// watch interval: poll job state every 5 seconds
public static final Duration WATCHING_INTERVAL = Duration.ofSeconds(5);
// option interval: after a start/stop operation, keep polling on every 1-second tick for 10 seconds
private static final Duration OPTION_INTERVAL = Duration.ofSeconds(10);
/**
*
*
* <pre>
* Records tasks for which a savepoint has been requested.
* Only consulted in the RUNNING state: if a watched task is running and a savepoint has been
* requested for it, the task's option state is set to SAVEPOINTING.
* </pre>
*/
private static final Cache<Long, Byte> SAVEPOINT_CACHE =
Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
/**
* Records tasks being tracked for the first time after start, because the task overview is
* fetched once during the first successful tracking poll
*/
private static final Cache<Long, Byte> STARTING_CACHE =
Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
/** Tracked application list: Map<applicationId, Application> */
private static final Map<Long, Application> WATCHING_APPS = new ConcurrentHashMap<>(0);
/**
*
*
* <pre>
* StopFrom: marks whether a flink job was canceled from StreamPark or by other means:
* 1) If stopped from StreamPark, we know whether a savepoint was made on cancel; if one was made,
* it is recorded as the latest savepoint and will be chosen automatically on the next start.
* 2) If stopped by other means, there is no way to know whether a savepoint was made, so all
* savepoints are marked expired and one must be specified manually on the next start.
* </pre>
*/
private static final Map<Long, StopFromEnum> STOP_FROM_MAP = new ConcurrentHashMap<>(0);
/**
* Tasks in the process of being canceled are placed in this cache with an expiration time of 10
* seconds (two watch intervals).
*/
private static final Cache<Long, Byte> CANCELING_CACHE =
Caffeine.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();
/**
* Canceled-task tracking list, recording who canceled each tracked task: Map<applicationId, userId>
*/
private static final Map<Long, Long> CANCELLED_JOB_MAP = new ConcurrentHashMap<>(0);
private static final Map<Long, FlinkCluster> FLINK_CLUSTER_MAP = new ConcurrentHashMap<>(0);
private static final Map<Long, OptionStateEnum> OPTIONING = new ConcurrentHashMap<>(0);
private Long lastWatchTime = 0L;
private Long lastOptionTime = 0L;
private static final Byte DEFAULT_FLAG_BYTE = Byte.valueOf("0");
@PostConstruct
public void init() {
WATCHING_APPS.clear();
List<Application> applications =
applicationManageService.list(
new LambdaQueryWrapper<Application>()
.eq(Application::getTracking, 1)
.ne(Application::getState, FlinkAppStateEnum.LOST.getValue())
.notIn(Application::getExecutionMode, FlinkExecutionMode.getKubernetesMode()));
applications.forEach(
(app) -> {
WATCHING_APPS.put(app.getId(), app);
STARTING_CACHE.put(app.getId(), DEFAULT_FLAG_BYTE);
});
}
@PreDestroy
public void doStop() {
log.info(
"FlinkAppHttpWatcher StreamPark Console will be shut down, persisting watched applications to database.");
WATCHING_APPS.forEach((k, v) -> applicationManageService.persistMetrics(v));
}
/**
* <strong>NOTE: a poll only runs when one of the following conditions is met</strong>
*
* <p><strong>1) The program has just started, or a task was operated from the page (start/stop)
* and its state must be reported promptly: poll once per second for 10 seconds (10 times)</strong>
*
* <p><strong>2) Regular monitoring: poll once every 5 seconds</strong>
*/
@Scheduled(fixedDelay = 1000)
public void start() {
Long timeMillis = System.currentTimeMillis();
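// poll now if: an option (start/stop) is still in progress, an option finished within the
// last 10 seconds (fast-feedback window), or the regular 5-second watch interval has elapsed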
if (lastWatchTime == null
|| !OPTIONING.isEmpty()
|| timeMillis - lastOptionTime <= OPTION_INTERVAL.toMillis()
|| timeMillis - lastWatchTime >= WATCHING_INTERVAL.toMillis()) {
lastWatchTime = timeMillis;
WATCHING_APPS.forEach(this::watch);
}
}
@VisibleForTesting
public @Nullable FlinkAppStateEnum tryQueryFlinkAppState(@Nonnull Long appId) {
Application app = WATCHING_APPS.get(appId);
return (app == null || app.getState() == null) ? null : app.getStateEnum();
}
private void watch(Long id, Application application) {
executorService.execute(
() -> {
try {
// query status from flink rest api
getStateFromFlink(application);
} catch (Exception flinkException) {
// query status from yarn rest api
try {
getStateFromYarn(application);
} catch (Exception e) {
doStateFailed(application);
}
}
});
}
private StopFromEnum getAppStopFrom(Long appId) {
return STOP_FROM_MAP.getOrDefault(appId, StopFromEnum.NONE);
}
/**
* Get the current task's running state from the Flink REST API.
*
* @param application The application for which to retrieve the information
* @throws Exception if an error occurs while retrieving the information from the Flink REST API
*/
private void getStateFromFlink(Application application) throws Exception {
JobsOverview jobsOverview = httpJobsOverview(application);
Optional<JobsOverview.Job> optional;
FlinkExecutionMode execMode = application.getFlinkExecutionMode();
if (FlinkExecutionMode.YARN_APPLICATION == execMode
|| FlinkExecutionMode.YARN_PER_JOB == execMode) {
optional =
!jobsOverview.getJobs().isEmpty()
? jobsOverview.getJobs().stream()
.filter(a -> StringUtils.equals(application.getJobId(), a.getId()))
.findFirst()
: Optional.empty();
} else {
optional =
jobsOverview.getJobs().stream()
.filter(x -> x.getId().equals(application.getJobId()))
.findFirst();
}
if (optional.isPresent()) {
JobsOverview.Job jobOverview = optional.get();
FlinkAppStateEnum currentState = FlinkAppStateEnum.of(jobOverview.getState());
if (FlinkAppStateEnum.OTHER != currentState) {
try {
// 1) set info from JobOverview
handleJobOverview(application, jobOverview);
} catch (Exception e) {
log.error("get flink jobOverview error: {}", e.getMessage(), e);
}
try {
// 2) CheckPoints
handleCheckPoints(application);
} catch (Exception e) {
log.error("get flink jobOverview error: {}", e.getMessage(), e);
}
// 3) check savepoint expiry and handle release/restart state
OptionStateEnum optionStateEnum = OPTIONING.get(application.getId());
if (FlinkAppStateEnum.RUNNING == currentState) {
handleRunningState(application, optionStateEnum, currentState);
} else {
handleNotRunState(application, optionStateEnum, currentState);
}
}
}
}
/**
* <strong>Query the job state from the YARN REST API. Reaching this point means the Flink REST
* API could not be queried, i.e. the task has stopped; its final status is resolved from YARN
* and defaults to CANCELED.</strong>
*
* @param application application
*/
private void getStateFromYarn(Application application) throws Exception {
OptionStateEnum optionStateEnum = OPTIONING.get(application.getId());
/*
If the state recorded last time was CANCELING (the flink rest server was still reachable then)
and no state can be obtained this time (the flink rest server has shut down),
the task is considered CANCELED
*/
Byte flag = CANCELING_CACHE.getIfPresent(application.getId());
StopFromEnum stopFromEnum = getAppStopFrom(application.getId());
if (flag != null) {
log.info("FlinkAppHttpWatcher previous state: canceling.");
FlinkAppStateEnum flinkAppStateEnum = FlinkAppStateEnum.CANCELED;
try {
YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
if (yarnAppInfo != null) {
String state = yarnAppInfo.getApp().getFinalStatus();
flinkAppStateEnum = FlinkAppStateEnum.of(state);
}
} finally {
if (stopFromEnum.isNone()) {
log.error(
"FlinkAppHttpWatcher query previous state was canceling and stopFrom not found, savepoint expired!");
savePointService.expire(application.getId());
if (flinkAppStateEnum == FlinkAppStateEnum.KILLED
|| flinkAppStateEnum == FlinkAppStateEnum.FAILED) {
doAlert(application, flinkAppStateEnum);
}
}
application.setState(flinkAppStateEnum.getValue());
cleanSavepoint(application);
cleanOptioning(optionStateEnum, application.getId());
doPersistMetrics(application, true);
}
} else {
// query the status from the yarn REST API
YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
if (yarnAppInfo == null) {
if (FlinkExecutionMode.REMOTE != application.getFlinkExecutionMode()) {
throw new RuntimeException("FlinkAppHttpWatcher getStateFromYarn failed ");
}
} else {
try {
String state = yarnAppInfo.getApp().getFinalStatus();
FlinkAppStateEnum flinkAppStateEnum = FlinkAppStateEnum.of(state);
if (FlinkAppStateEnum.OTHER == flinkAppStateEnum) {
return;
}
if (FlinkAppStateEnum.KILLED == flinkAppStateEnum) {
if (stopFromEnum.isNone()) {
log.error(
"FlinkAppHttpWatcher getStateFromYarn, job was killed and stopFrom not found, savepoint expired!");
savePointService.expire(application.getId());
}
flinkAppStateEnum = FlinkAppStateEnum.CANCELED;
cleanSavepoint(application);
application.setEndTime(new Date());
}
if (FlinkAppStateEnum.SUCCEEDED == flinkAppStateEnum) {
flinkAppStateEnum = FlinkAppStateEnum.FINISHED;
}
application.setState(flinkAppStateEnum.getValue());
cleanOptioning(optionStateEnum, application.getId());
doPersistMetrics(application, true);
if (FlinkAppStateEnum.FAILED == flinkAppStateEnum
|| FlinkAppStateEnum.LOST == flinkAppStateEnum
|| (FlinkAppStateEnum.CANCELED == flinkAppStateEnum && stopFromEnum.isNone())
|| applicationInfoService.checkAlter(application)) {
doAlert(application, flinkAppStateEnum);
stopCanceledJob(application.getId());
if (FlinkAppStateEnum.FAILED == flinkAppStateEnum) {
applicationActionService.start(application, true);
}
}
} catch (Exception e) {
if (FlinkExecutionMode.REMOTE != application.getFlinkExecutionMode()) {
throw new RuntimeException("FlinkAppHttpWatcher getStateFromYarn error,", e);
}
}
}
}
}
private void doStateFailed(Application application) {
/*
Queries against both flink's restAPI and yarn's restAPI have failed.
In this case, whether to settle on a final state depends on the option state currently being processed
*/
final OptionStateEnum optionStateEnum = OPTIONING.get(application.getId());
if (OptionStateEnum.STARTING != optionStateEnum) {
// state is not MAPPING
if (application.getStateEnum() != FlinkAppStateEnum.MAPPING) {
log.error(
"FlinkAppHttpWatcher getStateFromFlink and getStateFromYarn both failed, job lost, savepoint expired!");
StopFromEnum stopFromEnum = getAppStopFrom(application.getId());
if (stopFromEnum.isNone()) {
savePointService.expire(application.getId());
application.setState(FlinkAppStateEnum.LOST.getValue());
doAlert(application, FlinkAppStateEnum.LOST);
} else {
application.setState(FlinkAppStateEnum.CANCELED.getValue());
}
}
/*
Reaching this step means both ways of getting information above have failed; as the last step,
the job is directly marked as canceled or lost, and its savepoint record must be cleaned.
*/
application.setEndTime(new Date());
cleanSavepoint(application);
cleanOptioning(optionStateEnum, application.getId());
doPersistMetrics(application, true);
FlinkAppStateEnum appState = application.getStateEnum();
if (FlinkAppStateEnum.FAILED == appState || FlinkAppStateEnum.LOST == appState) {
doAlert(application, application.getStateEnum());
if (FlinkAppStateEnum.FAILED == appState) {
try {
applicationActionService.start(application, true);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
}
}
/**
* handle job overview
*
* @param application application
* @param jobOverview jobOverview
*/
private void handleJobOverview(Application application, JobsOverview.Job jobOverview)
throws IOException {
// compute duration
long startTime = jobOverview.getStartTime();
long endTime = jobOverview.getEndTime();
if (application.getStartTime() == null || startTime != application.getStartTime().getTime()) {
application.setStartTime(new Date(startTime));
}
if (endTime != -1
&& (application.getEndTime() == null || endTime != application.getEndTime().getTime())) {
application.setEndTime(new Date(endTime));
}
application.setJobId(jobOverview.getId());
application.setDuration(jobOverview.getDuration());
application.setTotalTask(jobOverview.getTasks().getTotal());
application.setOverview(jobOverview.getTasks());
// fetch the cluster overview once, on the first poll after the job starts
if (STARTING_CACHE.getIfPresent(application.getId()) != null) {
STARTING_CACHE.invalidate(application.getId());
Overview overview = httpOverview(application);
if (overview != null && overview.getSlotsTotal() > 0) {
application.setTotalTM(overview.getTaskmanagers());
application.setTotalSlot(overview.getSlotsTotal());
application.setAvailableSlot(overview.getSlotsAvailable());
}
}
}
/** get latest checkpoint */
private void handleCheckPoints(Application application) throws Exception {
CheckPoints checkPoints = httpCheckpoints(application);
if (checkPoints != null) {
checkpointProcessor.process(application, checkPoints);
}
}
/**
* Handle a running task
*
* @param application application
* @param optionStateEnum optionState
* @param currentState currentState
*/
private void handleRunningState(
Application application, OptionStateEnum optionStateEnum, FlinkAppStateEnum currentState) {
/*
if the last recorded state is STARTING and the latest state obtained this time is RUNNING,
this is the first tracking after a restart.
Then the release state of the job needs to be reset to DONE for the restart statuses:
NEED_RESTART_AFTER_CONF_UPDATE (need to restart after modified configuration)
NEED_RESTART_AFTER_SQL_UPDATE (need to restart after modified flink sql)
NEED_RESTART_AFTER_ROLLBACK (need to restart after rollback)
NEED_RESTART_AFTER_DEPLOY (need to restart after deploy)
*/
if (OptionStateEnum.STARTING == optionStateEnum) {
Application latestApp = WATCHING_APPS.get(application.getId());
ReleaseStateEnum releaseStateEnum = latestApp.getReleaseState();
switch (releaseStateEnum) {
case NEED_RESTART:
case NEED_ROLLBACK:
LambdaUpdateWrapper<Application> updateWrapper =
new LambdaUpdateWrapper<Application>()
.eq(Application::getId, application.getId())
.set(Application::getRelease, ReleaseStateEnum.DONE.get());
applicationManageService.update(updateWrapper);
break;
default:
break;
}
}
// The current state is RUNNING and the task is present in SAVEPOINT_CACHE,
// indicating that a savepoint is in progress
if (SAVEPOINT_CACHE.getIfPresent(application.getId()) != null) {
application.setOptionState(OptionStateEnum.SAVEPOINTING.getValue());
} else {
application.setOptionState(OptionStateEnum.NONE.getValue());
}
application.setState(currentState.getValue());
doPersistMetrics(application, false);
cleanOptioning(optionStateEnum, application.getId());
}
private void doPersistMetrics(Application application, boolean stopWatch) {
if (FlinkAppStateEnum.isEndState(application.getState())) {
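// terminal state: clear the runtime metrics and stop watching this application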
application.setOverview(null);
application.setTotalTM(null);
application.setTotalSlot(null);
application.setTotalTask(null);
application.setAvailableSlot(null);
application.setJmMemory(null);
application.setTmMemory(null);
unWatching(application.getId());
} else if (stopWatch) {
unWatching(application.getId());
} else {
WATCHING_APPS.put(application.getId(), application);
}
applicationManageService.persistMetrics(application);
}
/**
* Handle a task that is not running
*
* @param application application
* @param optionStateEnum optionState
* @param currentState currentState
*/
private void handleNotRunState(
Application application, OptionStateEnum optionStateEnum, FlinkAppStateEnum currentState)
throws Exception {
switch (currentState) {
case CANCELLING:
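// remember the in-progress cancellation so getStateFromYarn can finalize the job as CANCELED
// if the flink rest server disappears before the next poll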
CANCELING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
cleanSavepoint(application);
application.setState(currentState.getValue());
doPersistMetrics(application, false);
break;
case CANCELED:
log.info(
"FlinkAppHttpWatcher getFromFlinkRestApi, job state {}, stop tracking and delete stopFrom!",
currentState.name());
cleanSavepoint(application);
application.setState(currentState.getValue());
StopFromEnum stopFromEnum = getAppStopFrom(application.getId());
if (stopFromEnum.isNone() || applicationInfoService.checkAlter(application)) {
if (stopFromEnum.isNone()) {
log.info(
"FlinkAppHttpWatcher getFromFlinkRestApi, job cancel is not from StreamPark, savepoint expired!");
savePointService.expire(application.getId());
}
stopCanceledJob(application.getId());
doAlert(application, FlinkAppStateEnum.CANCELED);
}
STOP_FROM_MAP.remove(application.getId());
doPersistMetrics(application, true);
cleanOptioning(optionStateEnum, application.getId());
break;
case FAILED:
cleanSavepoint(application);
STOP_FROM_MAP.remove(application.getId());
application.setState(FlinkAppStateEnum.FAILED.getValue());
doPersistMetrics(application, true);
doAlert(application, FlinkAppStateEnum.FAILED);
applicationActionService.start(application, true);
break;
case RESTARTING:
log.info(
"FlinkAppHttpWatcher getFromFlinkRestApi, job state {}, add to starting cache",
currentState.name());
STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
break;
default:
application.setState(currentState.getValue());
doPersistMetrics(application, false);
}
}
private void cleanOptioning(OptionStateEnum optionStateEnum, Long key) {
if (optionStateEnum != null) {
lastOptionTime = System.currentTimeMillis();
OPTIONING.remove(key);
}
}
public void cleanSavepoint(Application application) {
SAVEPOINT_CACHE.invalidate(application.getId());
application.setOptionState(OptionStateEnum.NONE.getValue());
}
/** set current option state */
public static void setOptionState(Long appId, OptionStateEnum state) {
if (isKubernetesApp(appId)) {
return;
}
log.info("FlinkAppHttpWatcher setOptioning");
OPTIONING.put(appId, state);
if (OptionStateEnum.CANCELLING == state) {
STOP_FROM_MAP.put(appId, StopFromEnum.STREAMPARK);
}
}
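/**
* Add an application to the HTTP tracking list. A minimal sketch (illustrative only, not the
* definitive start flow) of how a caller might combine the static hooks when starting a job:
*
* <pre>{@code
* // mark the option in progress so the watcher polls every second for fast feedback
* FlinkAppHttpWatcher.setOptionState(app.getId(), OptionStateEnum.STARTING);
* // ... submit the job to the target cluster ...
* // then register the application for tracking
* FlinkAppHttpWatcher.doWatching(app);
* }</pre>
*/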
public static void doWatching(Application application) {
if (isKubernetesApp(application)) {
return;
}
log.info("FlinkAppHttpWatcher add app to tracking,appId:{}", application.getId());
WATCHING_APPS.put(application.getId(), application);
STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE);
}
public static void addSavepoint(Long appId) {
if (isKubernetesApp(appId)) {
return;
}
log.info("FlinkAppHttpWatcher add app to savepoint,appId:{}", appId);
SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE);
}
public static void unWatching(Long appId) {
if (isKubernetesApp(appId)) {
return;
}
log.info("FlinkAppHttpWatcher stop app,appId:{}", appId);
WATCHING_APPS.remove(appId);
}
public static void stopCanceledJob(Long appId) {
if (!CANCELLED_JOB_MAP.containsKey(appId)) {
return;
}
log.info("flink job canceled app appId:{} by useId:{}", appId, CANCELLED_JOB_MAP.get(appId));
CANCELLED_JOB_MAP.remove(appId);
}
public static void addCanceledApp(Long appId, Long userId) {
log.info("flink job addCanceledApp app appId:{}, useId:{}", appId, userId);
CANCELLED_JOB_MAP.put(appId, userId);
}
public static Long getCanceledJobUserId(Long appId) {
return CANCELLED_JOB_MAP.get(appId) == null ? Long.valueOf(-1) : CANCELLED_JOB_MAP.get(appId);
}
public static Collection<Application> getWatchingApps() {
return WATCHING_APPS.values();
}
private static boolean isKubernetesApp(Application application) {
return FlinkK8sWatcherWrapper.isKubernetesApp(application);
}
private static boolean isKubernetesApp(Long appId) {
Application app = WATCHING_APPS.get(appId);
return FlinkK8sWatcherWrapper.isKubernetesApp(app);
}
private YarnAppInfo httpYarnAppInfo(Application application) throws Exception {
String reqURL = "ws/v1/cluster/apps/".concat(application.getAppId());
return yarnRestRequest(reqURL, YarnAppInfo.class);
}
private Overview httpOverview(Application application) throws IOException {
String appId = application.getAppId();
if (appId != null
&& (FlinkExecutionMode.YARN_APPLICATION == application.getFlinkExecutionMode()
|| FlinkExecutionMode.YARN_PER_JOB == application.getFlinkExecutionMode())) {
String reqURL;
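// prefer the direct JobManager URL when it is known and valid, otherwise go through the
// YARN web proxy (httpJobsOverview and httpCheckpoints below follow the same pattern)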
String jmURL = application.getJobManagerUrl();
if (StringUtils.isNotBlank(jmURL) && Utils.checkHttpURL(jmURL)) {
String format = "%s/overview";
reqURL = String.format(format, jmURL);
} else {
String format = "proxy/%s/overview";
reqURL = String.format(format, appId);
}
return yarnRestRequest(reqURL, Overview.class);
}
return null;
}
private JobsOverview httpJobsOverview(Application application) throws Exception {
final String flinkUrl = "jobs/overview";
FlinkExecutionMode execMode = application.getFlinkExecutionMode();
if (FlinkExecutionMode.isYarnMode(execMode)) {
String reqURL;
String jmURL = application.getJobManagerUrl();
if (StringUtils.isNotBlank(jmURL) && Utils.checkHttpURL(jmURL)) {
String format = "%s/" + flinkUrl;
reqURL = String.format(format, jmURL);
} else {
String format = "proxy/%s/" + flinkUrl;
reqURL = String.format(format, application.getAppId());
}
return yarnRestRequest(reqURL, JobsOverview.class);
}
if (application.getJobId() != null && FlinkExecutionMode.isRemoteMode(execMode)) {
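// remote (standalone) mode: query the cluster's REST endpoint and keep only this
// application's job from the cluster-wide overview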
return httpRemoteCluster(
application.getFlinkClusterId(),
cluster -> {
String remoteUrl = cluster.getAddress() + "/" + flinkUrl;
JobsOverview jobsOverview = httpRestRequest(remoteUrl, JobsOverview.class);
if (jobsOverview != null) {
List<JobsOverview.Job> jobs =
jobsOverview.getJobs().stream()
.filter(x -> x.getId().equals(application.getJobId()))
.collect(Collectors.toList());
jobsOverview.setJobs(jobs);
}
return jobsOverview;
});
}
return null;
}
private CheckPoints httpCheckpoints(Application application) throws Exception {
final String flinkUrl = "jobs/%s/checkpoints";
FlinkExecutionMode execMode = application.getFlinkExecutionMode();
if (FlinkExecutionMode.isYarnMode(execMode)) {
String reqURL;
String jmURL = application.getJobManagerUrl();
if (StringUtils.isNotBlank(jmURL) && Utils.checkHttpURL(jmURL)) {
String format = "%s/" + flinkUrl;
reqURL = String.format(format, jmURL, application.getJobId());
} else {
String format = "proxy/%s/" + flinkUrl;
reqURL = String.format(format, application.getAppId(), application.getJobId());
}
return yarnRestRequest(reqURL, CheckPoints.class);
}
if (application.getJobId() != null && FlinkExecutionMode.isRemoteMode(execMode)) {
return httpRemoteCluster(
application.getFlinkClusterId(),
cluster -> {
String remoteUrl =
cluster.getAddress() + "/" + String.format(flinkUrl, application.getJobId());
return httpRestRequest(remoteUrl, CheckPoints.class);
});
}
return null;
}
private <T> T yarnRestRequest(String url, Class<T> clazz) throws IOException {
String result = YarnUtils.restRequest(url);
if (null == result) {
return null;
}
return JacksonUtils.read(result, clazz);
}
private <T> T httpRestRequest(String url, Class<T> clazz) throws IOException {
String result =
HttpClientUtils.httpGetRequest(
url, RequestConfig.custom().setConnectTimeout(5000, TimeUnit.MILLISECONDS).build());
if (null == result) {
return null;
}
return JacksonUtils.read(result, clazz);
}
public boolean isWatchingApp(Long id) {
return WATCHING_APPS.containsKey(id);
}
private <T> T httpRemoteCluster(Long clusterId, Callback<FlinkCluster, T> function)
throws Exception {
FlinkCluster flinkCluster = getFlinkRemoteCluster(clusterId, false);
try {
return function.call(flinkCluster);
} catch (Exception e) {
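// the cached cluster info may be stale; refresh it from the database and retry once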
flinkCluster = getFlinkRemoteCluster(clusterId, true);
return function.call(flinkCluster);
}
}
private FlinkCluster getFlinkRemoteCluster(Long clusterId, boolean flush) {
FlinkCluster flinkCluster = FLINK_CLUSTER_MAP.get(clusterId);
if (flinkCluster == null || flush) {
flinkCluster = flinkClusterService.getById(clusterId);
FLINK_CLUSTER_MAP.put(clusterId, flinkCluster);
}
return flinkCluster;
}
interface Callback<T, R> {
R call(T e) throws Exception;
}
/**
* Describes the alerting behavior when a job runs abnormally, per execution mode:
*
* <p>- <strong>yarn per job</strong> or <strong>yarn application</strong>
*
* <p>Triggers an alert directly when the job encounters an abnormal condition.<br>
*
* <p>- <strong>yarn session</strong> or <strong>remote</strong>
*
* <p>If the Flink cluster configuration lacks alert information, an alert is triggered directly
* when the job is abnormal.<br>
* If the Flink cluster configuration has alert information:
*
* <p>When the job is abnormal because of an issue in the Flink cluster itself, the job's alert
* is held back in favor of the Flink cluster's alert.<br>
* When the job is abnormal due to the job itself and the Flink cluster is running normally, an
* alert specific to the job is triggered.
*
* @param app application
* @param appState application state
*/
private void doAlert(Application app, FlinkAppStateEnum appState) {
if (app.getProbing()) {
log.info("application with id {} is probing, don't send alert", app.getId());
return;
}
switch (app.getFlinkExecutionMode()) {
case YARN_APPLICATION:
case YARN_PER_JOB:
alertService.alert(app.getAlertId(), AlertTemplate.of(app, appState));
return;
case YARN_SESSION:
case REMOTE:
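// session/remote mode: only alert for the job itself when the cluster is reachable; if the
// cluster is down, the cluster-level alert (FlinkClusterWatcher) is expected to fire instead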
FlinkCluster flinkCluster = flinkClusterService.getById(app.getFlinkClusterId());
if (flinkClusterWatcher.verifyClusterConnection(flinkCluster)) {
log.info(
"application with id {} is yarn session or remote mode and flink cluster with id {} is alive, sending application alert",
app.getId(),
app.getFlinkClusterId());
alertService.alert(app.getAlertId(), AlertTemplate.of(app, appState));
}
break;
default:
break;
}
}
}