blob: dada561c07885cfbc9809c3c4692d93e119570a4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.console.core.watcher;
import org.apache.streampark.common.conf.CommonConfig;
import org.apache.streampark.common.conf.InternalConfigHolder;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.core.bean.AlertTemplate;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.metrics.flink.Overview;
import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.alert.AlertService;
import org.apache.streampark.console.core.service.application.ApplicationInfoService;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hc.client5.http.config.RequestConfig;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
 * Periodically probes the Flink clusters registered in the watch list and reacts when one of them
 * is no longer reachable.
 *
 * <p>Only clusters running in {@code REMOTE} or {@code YARN_SESSION} mode are actually probed (see
 * {@link #httpClusterState(FlinkCluster)}); clusters in a Kubernetes execution mode are never put
 * on the watch list (see {@link #init()} and {@link #addWatching(FlinkCluster)}).
 *
 * <p>The watch list and the failed-state cache are static, i.e. shared by every instance of this
 * class in the JVM.
 */
@Slf4j
@Component
public class FlinkClusterWatcher {

  @Autowired private FlinkClusterService flinkClusterService;

  @Autowired private AlertService alertService;

  @Autowired private ApplicationInfoService applicationInfoService;

  /** Executor on which the per-cluster probes are run. */
  @Qualifier("flinkClusterWatchingExecutor")
  @Autowired
  private Executor executorService;

  /** Timestamp (epoch millis) of the last watch round that was dispatched by {@link #start()}. */
  private long lastWatchTime = 0L;

  /** Interval between two regular watch rounds; also the TTL of {@link #FAILED_STATES}. */
  private static final Duration WATCHER_INTERVAL = Duration.ofSeconds(30);

  /** Connect timeout, in milliseconds, used when calling the Flink REST API. */
  private static final long FLINK_REST_CONNECT_TIMEOUT_MILLIS = 5000L;

  /** Clusters currently being watched, keyed by cluster id. */
  private static final Map<Long, FlinkCluster> WATCHER_CLUSTERS = new ConcurrentHashMap<>(8);

  /** Most recent non-running state per cluster id; entries expire after one watch interval. */
  private static final Cache<Long, ClusterState> FAILED_STATES =
      Caffeine.newBuilder().expireAfterWrite(WATCHER_INTERVAL).build();

  /**
   * Set by {@link #getClusterState(FlinkCluster)} when a probe yields a non-running state so that
   * the next scheduler tick starts a watch round without waiting for {@link #WATCHER_INTERVAL}.
   *
   * <p>Declared {@code volatile} because it is written from tasks handed to {@link
   * #executorService} and read by the scheduled {@link #start()} method.
   */
  private volatile boolean immediateWatch = false;

  /**
   * Rebuilds the watch list from the clusters persisted with state {@code RUNNING}, excluding
   * clusters in a Kubernetes execution mode.
   */
  @PostConstruct
  private void init() {
    WATCHER_CLUSTERS.clear();
    List<FlinkCluster> flinkClusters =
        flinkClusterService.list(
            new LambdaQueryWrapper<FlinkCluster>()
                .eq(FlinkCluster::getClusterState, ClusterState.RUNNING.getState())
                // excluding flink clusters on kubernetes
                .notIn(FlinkCluster::getExecutionMode, FlinkExecutionMode.getKubernetesMode()));
    flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), cluster));
  }

  /**
   * Scheduler tick (every second). Dispatches one probe task per watched cluster when either an
   * immediate round was requested or {@link #WATCHER_INTERVAL} has elapsed since the last round.
   */
  @Scheduled(fixedDelay = 1000)
  private void start() {
    long now = System.currentTimeMillis();
    if (!immediateWatch && now - lastWatchTime < WATCHER_INTERVAL.toMillis()) {
      return;
    }
    lastWatchTime = now;
    immediateWatch = false;
    WATCHER_CLUSTERS.forEach(
        (clusterId, flinkCluster) -> executorService.execute(() -> watch(flinkCluster)));
  }

  /**
   * Probes a single cluster and, when it is in a failure state, persists that state, removes the
   * cluster from the watch list and sends an alert.
   *
   * <p>Any exception is logged here, because a task submitted through {@link Executor#execute}
   * has no caller that could observe it.
   *
   * @param flinkCluster the cluster to probe
   */
  private void watch(FlinkCluster flinkCluster) {
    try {
      ClusterState state = getClusterState(flinkCluster);
      switch (state) {
        case FAILED:
        case LOST:
        case UNKNOWN:
        case KILLED:
          flinkClusterService.updateClusterState(flinkCluster.getId(), state);
          unWatching(flinkCluster);
          alert(flinkCluster, state);
          break;
        default:
          // NOTE(review): every other state leaves the cluster on the watch list, including
          // CANCELED (produced for a FINISHED YARN application) - confirm this is intended.
          break;
      }
    } catch (Exception e) {
      log.error("cluster id:{} watching failed", flinkCluster.getId(), e);
    }
  }

  /**
   * Sends an alert for the given cluster if it has an alert configuration. The cluster object is
   * updated in place with job counts, the new state and the end time before the alert is built.
   *
   * @param cluster the affected cluster
   * @param state the state that triggered the alert
   */
  private void alert(FlinkCluster cluster, ClusterState state) {
    if (cluster.getAlertId() == null) {
      return;
    }
    cluster.setAllJobs(applicationInfoService.countByClusterId(cluster.getId()));
    cluster.setAffectedJobs(
        applicationInfoService.countAffectedByClusterId(
            cluster.getId(), InternalConfigHolder.get(CommonConfig.SPRING_PROFILES_ACTIVE())));
    cluster.setClusterState(state.getState());
    cluster.setEndTime(new Date());
    alertService.alert(cluster.getAlertId(), AlertTemplate.of(cluster, state));
  }

  /**
   * Retrieves the state of a cluster, preferring a recently cached non-running state over a new
   * remote call.
   *
   * <p>If the freshly probed state is not running, it is cached for one watch interval and an
   * immediate watch round is requested.
   *
   * @param flinkCluster The FlinkCluster object representing the cluster.
   * @return The ClusterState object representing the state of the cluster.
   */
  public ClusterState getClusterState(FlinkCluster flinkCluster) {
    ClusterState state = FAILED_STATES.getIfPresent(flinkCluster.getId());
    if (state != null) {
      return state;
    }
    state = httpClusterState(flinkCluster);
    if (ClusterState.isRunning(state)) {
      FAILED_STATES.invalidate(flinkCluster.getId());
    } else {
      immediateWatch = true;
      FAILED_STATES.put(flinkCluster.getId(), state);
    }
    return state;
  }

  /**
   * Retrieves the state of a remote-mode cluster through the Flink REST API.
   *
   * @param flinkCluster The FlinkCluster object representing the cluster.
   * @return {@code RUNNING} if the Flink REST API answered, {@code LOST} otherwise.
   */
  private ClusterState httpRemoteClusterState(FlinkCluster flinkCluster) {
    return getStateFromFlinkRestApi(flinkCluster);
  }

  /**
   * Retrieves the state of a YARN session cluster: the Flink REST API is asked first and, only if
   * that reports {@code LOST}, the YARN REST API is consulted for the application state.
   *
   * @param flinkCluster The FlinkCluster object representing the cluster.
   * @return the resolved cluster state
   */
  private ClusterState httpYarnSessionClusterState(FlinkCluster flinkCluster) {
    ClusterState state = getStateFromFlinkRestApi(flinkCluster);
    if (ClusterState.LOST == state) {
      return getStateFromYarnRestApi(flinkCluster);
    }
    return state;
  }

  /**
   * Probes the cluster according to its execution mode.
   *
   * @param flinkCluster The FlinkCluster object representing the cluster.
   * @return the probed state; {@code UNKNOWN} for any mode other than {@code REMOTE} and {@code
   *     YARN_SESSION}
   */
  private ClusterState httpClusterState(FlinkCluster flinkCluster) {
    switch (flinkCluster.getFlinkExecutionModeEnum()) {
      case REMOTE:
        return httpRemoteClusterState(flinkCluster);
      case YARN_SESSION:
        return httpYarnSessionClusterState(flinkCluster);
      default:
        return ClusterState.UNKNOWN;
    }
  }

  /**
   * Asks the Flink REST API for {@code /overview}. The job manager URL is used when present,
   * otherwise the cluster address.
   *
   * @param flinkCluster The FlinkCluster object representing the cluster.
   * @return {@code RUNNING} if the response could be read as an {@link Overview}; {@code LOST} if
   *     no URL is configured or the request/parsing failed
   */
  private ClusterState getStateFromFlinkRestApi(FlinkCluster flinkCluster) {
    String jobManagerUrl = flinkCluster.getJobManagerUrl();
    String baseUrl = StringUtils.isBlank(jobManagerUrl) ? flinkCluster.getAddress() : jobManagerUrl;
    if (StringUtils.isBlank(baseUrl)) {
      // Without any URL there is nothing to call; report it like an unreachable cluster instead
      // of failing with a NullPointerException.
      log.error(
          "cluster id:{} get state from flink api failed, no address configured",
          flinkCluster.getId());
      return ClusterState.LOST;
    }
    String flinkUrl = baseUrl.concat("/overview");
    try {
      String res =
          HttpClientUtils.httpGetRequest(
              flinkUrl,
              RequestConfig.custom()
                  .setConnectTimeout(FLINK_REST_CONNECT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
                  .build());
      // Only used as a sanity check that the response is a valid overview document.
      JacksonUtils.read(res, Overview.class);
      return ClusterState.RUNNING;
    } catch (Exception e) {
      log.error("cluster id:{} get state from flink api failed", flinkCluster.getId(), e);
    }
    return ClusterState.LOST;
  }

  /**
   * Asks the YARN REST API ({@code ws/v1/cluster/apps/<clusterId>}) for the application state and
   * converts it to a cluster state.
   *
   * @param flinkCluster The FlinkCluster object representing the cluster.
   * @return {@code UNKNOWN} if YARN returned no result or an unrecognised state; {@code LOST} if
   *     the cluster has no YARN application id or the request/parsing failed; otherwise the
   *     converted state
   */
  private ClusterState getStateFromYarnRestApi(FlinkCluster flinkCluster) {
    String applicationId = flinkCluster.getClusterId();
    if (StringUtils.isBlank(applicationId)) {
      log.error(
          "cluster id:{} get state from yarn api failed, no yarn application id",
          flinkCluster.getId());
      return ClusterState.LOST;
    }
    String yarnUrl = "ws/v1/cluster/apps/".concat(applicationId);
    try {
      String result = YarnUtils.restRequest(yarnUrl);
      if (null == result) {
        return ClusterState.UNKNOWN;
      }
      YarnAppInfo yarnAppInfo = JacksonUtils.read(result, YarnAppInfo.class);
      YarnApplicationState status = HadoopUtils.toYarnState(yarnAppInfo.getApp().getState());
      if (status == null) {
        log.error(
            "cluster id:{} final application status convert failed, invalid string ",
            flinkCluster.getId());
        return ClusterState.UNKNOWN;
      }
      return yarnStateConvertClusterState(status);
    } catch (Exception e) {
      log.error("cluster id:{} get state from yarn api failed", flinkCluster.getId(), e);
      return ClusterState.LOST;
    }
  }

  /**
   * Adds a cluster to the watch list unless it runs in a Kubernetes mode or is already watched.
   *
   * @param flinkCluster the cluster to watch
   */
  public static void addWatching(FlinkCluster flinkCluster) {
    if (FlinkExecutionMode.isKubernetesMode(flinkCluster.getFlinkExecutionModeEnum())) {
      return;
    }
    // putIfAbsent makes the "add only when missing" step atomic on the concurrent map.
    if (WATCHER_CLUSTERS.putIfAbsent(flinkCluster.getId(), flinkCluster) == null) {
      log.info("add the cluster with id:{} to watcher cluster cache", flinkCluster.getId());
    }
  }

  /**
   * Removes a cluster from the watch list if it is present.
   *
   * @param flinkCluster the cluster to stop watching
   */
  public static void unWatching(FlinkCluster flinkCluster) {
    // remove returns the previous mapping, so a non-null result means it was actually watched.
    if (WATCHER_CLUSTERS.remove(flinkCluster.getId()) != null) {
      log.info("remove the cluster with id:{} from watcher cluster cache", flinkCluster.getId());
    }
  }

  /**
   * Converts a YARN application state to a cluster state: {@code FINISHED} maps to {@code
   * CANCELED}, every other state is looked up by its name.
   *
   * @param state the YARN application state
   * @return the corresponding cluster state
   */
  private ClusterState yarnStateConvertClusterState(YarnApplicationState state) {
    return state == YarnApplicationState.FINISHED
        ? ClusterState.CANCELED
        : ClusterState.of(state.toString());
  }

  /**
   * Verify the cluster connection whether is valid. Always performs a fresh probe; the
   * failed-state cache is not consulted.
   *
   * @param flinkCluster the cluster whose connection is verified
   * @return <code>false</code> if the connection of the cluster is invalid, <code>true</code> else.
   */
  public Boolean verifyClusterConnection(FlinkCluster flinkCluster) {
    ClusterState clusterStateEnum = httpClusterState(flinkCluster);
    return ClusterState.isRunning(clusterStateEnum);
  }
}