| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.streampark.console.core.watcher; |
| |
| import org.apache.streampark.common.conf.CommonConfig; |
| import org.apache.streampark.common.conf.InternalConfigHolder; |
| import org.apache.streampark.common.enums.ClusterState; |
| import org.apache.streampark.common.enums.FlinkExecutionMode; |
| import org.apache.streampark.common.util.HadoopUtils; |
| import org.apache.streampark.common.util.HttpClientUtils; |
| import org.apache.streampark.common.util.YarnUtils; |
| import org.apache.streampark.console.base.util.JacksonUtils; |
| import org.apache.streampark.console.core.bean.AlertTemplate; |
| import org.apache.streampark.console.core.entity.FlinkCluster; |
| import org.apache.streampark.console.core.metrics.flink.Overview; |
| import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo; |
| import org.apache.streampark.console.core.service.FlinkClusterService; |
| import org.apache.streampark.console.core.service.alert.AlertService; |
| import org.apache.streampark.console.core.service.application.ApplicationInfoService; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.hadoop.yarn.api.records.YarnApplicationState; |
| import org.apache.hc.client5.http.config.RequestConfig; |
| |
| import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
| import com.github.benmanes.caffeine.cache.Cache; |
| import com.github.benmanes.caffeine.cache.Caffeine; |
| import lombok.extern.slf4j.Slf4j; |
| import org.springframework.beans.factory.annotation.Autowired; |
| import org.springframework.beans.factory.annotation.Qualifier; |
| import org.springframework.scheduling.annotation.Scheduled; |
| import org.springframework.stereotype.Component; |
| |
| import javax.annotation.PostConstruct; |
| |
| import java.time.Duration; |
| import java.util.Date; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.TimeUnit; |
| |
| /** This implementation is currently used for tracing Cluster on yarn,remote,K8s mode */ |
| @Slf4j |
| @Component |
| public class FlinkClusterWatcher { |
| |
| @Autowired private FlinkClusterService flinkClusterService; |
| |
| @Autowired private AlertService alertService; |
| |
| @Autowired private ApplicationInfoService applicationInfoService; |
| |
| @Qualifier("flinkClusterWatchingExecutor") |
| @Autowired |
| private Executor executorService; |
| |
| private Long lastWatchTime = 0L; |
| |
| // Track interval every 30 seconds |
| private static final Duration WATCHER_INTERVAL = Duration.ofSeconds(30); |
| |
| /** Watcher cluster lists */ |
| private static final Map<Long, FlinkCluster> WATCHER_CLUSTERS = new ConcurrentHashMap<>(8); |
| |
| private static final Cache<Long, ClusterState> FAILED_STATES = |
| Caffeine.newBuilder().expireAfterWrite(WATCHER_INTERVAL).build(); |
| |
| private boolean immediateWatch = false; |
| |
| /** Initialize cluster cache */ |
| @PostConstruct |
| private void init() { |
| WATCHER_CLUSTERS.clear(); |
| List<FlinkCluster> flinkClusters = |
| flinkClusterService.list( |
| new LambdaQueryWrapper<FlinkCluster>() |
| .eq(FlinkCluster::getClusterState, ClusterState.RUNNING.getState()) |
| // excluding flink clusters on kubernetes |
| .notIn(FlinkCluster::getExecutionMode, FlinkExecutionMode.getKubernetesMode())); |
| flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), cluster)); |
| } |
| |
| @Scheduled(fixedDelay = 1000) |
| private void start() { |
| Long timeMillis = System.currentTimeMillis(); |
| if (immediateWatch || timeMillis - lastWatchTime >= WATCHER_INTERVAL.toMillis()) { |
| lastWatchTime = timeMillis; |
| immediateWatch = false; |
| WATCHER_CLUSTERS.forEach( |
| (aLong, flinkCluster) -> |
| executorService.execute( |
| () -> { |
| ClusterState state = getClusterState(flinkCluster); |
| switch (state) { |
| case FAILED: |
| case LOST: |
| case UNKNOWN: |
| case KILLED: |
| flinkClusterService.updateClusterState(flinkCluster.getId(), state); |
| unWatching(flinkCluster); |
| alert(flinkCluster, state); |
| break; |
| default: |
| break; |
| } |
| })); |
| } |
| } |
| |
| private void alert(FlinkCluster cluster, ClusterState state) { |
| if (cluster.getAlertId() != null) { |
| cluster.setAllJobs(applicationInfoService.countByClusterId(cluster.getId())); |
| cluster.setAffectedJobs( |
| applicationInfoService.countAffectedByClusterId( |
| cluster.getId(), InternalConfigHolder.get(CommonConfig.SPRING_PROFILES_ACTIVE()))); |
| cluster.setClusterState(state.getState()); |
| cluster.setEndTime(new Date()); |
| alertService.alert(cluster.getAlertId(), AlertTemplate.of(cluster, state)); |
| } |
| } |
| |
| /** |
| * Retrieves the state of a cluster from the Flink or YARN API. |
| * |
| * @param flinkCluster The FlinkCluster object representing the cluster. |
| * @return The ClusterState object representing the state of the cluster. |
| */ |
| public ClusterState getClusterState(FlinkCluster flinkCluster) { |
| ClusterState state = FAILED_STATES.getIfPresent(flinkCluster.getId()); |
| if (state != null) { |
| return state; |
| } |
| state = httpClusterState(flinkCluster); |
| if (ClusterState.isRunning(state)) { |
| FAILED_STATES.invalidate(flinkCluster.getId()); |
| } else { |
| immediateWatch = true; |
| FAILED_STATES.put(flinkCluster.getId(), state); |
| } |
| return state; |
| } |
| |
| /** |
| * Retrieves the state of a cluster from the Flink or YARN API using the remote HTTP endpoint. |
| * |
| * @param flinkCluster The FlinkCluster object representing the cluster. |
| * @return The ClusterState object representing the state of the cluster. |
| */ |
| private ClusterState httpRemoteClusterState(FlinkCluster flinkCluster) { |
| return getStateFromFlinkRestApi(flinkCluster); |
| } |
| |
| /** |
| * get yarn session cluster state |
| * |
| * @param flinkCluster |
| * @return |
| */ |
| private ClusterState httpYarnSessionClusterState(FlinkCluster flinkCluster) { |
| ClusterState state = getStateFromFlinkRestApi(flinkCluster); |
| if (ClusterState.LOST == state) { |
| return getStateFromYarnRestApi(flinkCluster); |
| } |
| return state; |
| } |
| |
| /** |
| * get flink cluster state |
| * |
| * @param flinkCluster |
| * @return |
| */ |
| private ClusterState httpClusterState(FlinkCluster flinkCluster) { |
| switch (flinkCluster.getFlinkExecutionModeEnum()) { |
| case REMOTE: |
| return httpRemoteClusterState(flinkCluster); |
| case YARN_SESSION: |
| return httpYarnSessionClusterState(flinkCluster); |
| default: |
| return ClusterState.UNKNOWN; |
| } |
| } |
| |
| /** |
| * cluster get state from flink rest api |
| * |
| * @param flinkCluster |
| * @return |
| */ |
| private ClusterState getStateFromFlinkRestApi(FlinkCluster flinkCluster) { |
| String address = flinkCluster.getAddress(); |
| String jobManagerUrl = flinkCluster.getJobManagerUrl(); |
| String flinkUrl = |
| StringUtils.isBlank(jobManagerUrl) |
| ? address.concat("/overview") |
| : jobManagerUrl.concat("/overview"); |
| try { |
| String res = |
| HttpClientUtils.httpGetRequest( |
| flinkUrl, |
| RequestConfig.custom().setConnectTimeout(5000, TimeUnit.MILLISECONDS).build()); |
| JacksonUtils.read(res, Overview.class); |
| return ClusterState.RUNNING; |
| } catch (Exception ignored) { |
| log.error("cluster id:{} get state from flink api failed", flinkCluster.getId()); |
| } |
| return ClusterState.LOST; |
| } |
| |
| /** |
| * cluster get state from yarn rest api |
| * |
| * @param flinkCluster |
| * @return |
| */ |
| private ClusterState getStateFromYarnRestApi(FlinkCluster flinkCluster) { |
| String yarnUrl = "ws/v1/cluster/apps/".concat(flinkCluster.getClusterId()); |
| try { |
| String result = YarnUtils.restRequest(yarnUrl); |
| if (null == result) { |
| return ClusterState.UNKNOWN; |
| } |
| YarnAppInfo yarnAppInfo = JacksonUtils.read(result, YarnAppInfo.class); |
| YarnApplicationState status = HadoopUtils.toYarnState(yarnAppInfo.getApp().getState()); |
| if (status == null) { |
| log.error( |
| "cluster id:{} final application status convert failed, invalid string ", |
| flinkCluster.getId()); |
| return ClusterState.UNKNOWN; |
| } |
| return yarnStateConvertClusterState(status); |
| } catch (Exception e) { |
| return ClusterState.LOST; |
| } |
| } |
| |
| /** |
| * add flinkCluster to watching |
| * |
| * @param flinkCluster |
| */ |
| public static void addWatching(FlinkCluster flinkCluster) { |
| if (!FlinkExecutionMode.isKubernetesMode(flinkCluster.getFlinkExecutionModeEnum()) |
| && !WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) { |
| log.info("add the cluster with id:{} to watcher cluster cache", flinkCluster.getId()); |
| WATCHER_CLUSTERS.put(flinkCluster.getId(), flinkCluster); |
| } |
| } |
| |
| /** @param flinkCluster */ |
| public static void unWatching(FlinkCluster flinkCluster) { |
| if (WATCHER_CLUSTERS.containsKey(flinkCluster.getId())) { |
| log.info("remove the cluster with id:{} from watcher cluster cache", flinkCluster.getId()); |
| WATCHER_CLUSTERS.remove(flinkCluster.getId()); |
| } |
| } |
| |
| /** |
| * yarn application state convert cluster state |
| * |
| * @param state |
| * @return |
| */ |
| private ClusterState yarnStateConvertClusterState(YarnApplicationState state) { |
| return state == YarnApplicationState.FINISHED |
| ? ClusterState.CANCELED |
| : ClusterState.of(state.toString()); |
| } |
| |
| /** |
| * Verify the cluster connection whether is valid. |
| * |
| * @return <code>false</code> if the connection of the cluster is invalid, <code>true</code> else. |
| */ |
| public Boolean verifyClusterConnection(FlinkCluster flinkCluster) { |
| ClusterState clusterStateEnum = httpClusterState(flinkCluster); |
| return ClusterState.isRunning(clusterStateEnum); |
| } |
| } |