| /* |
| * Copyright (c) 2019 The StreamX Project |
| * <p> |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package com.streamxhub.streamx.console.core.task; |
| |
| import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; |
| import com.github.benmanes.caffeine.cache.Cache; |
| import com.github.benmanes.caffeine.cache.Caffeine; |
| import com.streamxhub.streamx.common.util.ThreadUtils; |
| import com.streamxhub.streamx.console.core.entity.Application; |
| import com.streamxhub.streamx.console.core.entity.SavePoint; |
| import com.streamxhub.streamx.console.core.enums.DeployState; |
| import com.streamxhub.streamx.console.core.enums.FlinkAppState; |
| import com.streamxhub.streamx.console.core.enums.OptionState; |
| import com.streamxhub.streamx.console.core.enums.StopFrom; |
| import com.streamxhub.streamx.console.core.metrics.flink.CheckPoints; |
| import com.streamxhub.streamx.console.core.metrics.flink.JobsOverview; |
| import com.streamxhub.streamx.console.core.metrics.flink.Overview; |
| import com.streamxhub.streamx.console.core.metrics.yarn.AppInfo; |
| import com.streamxhub.streamx.console.core.service.AlertService; |
| import com.streamxhub.streamx.console.core.service.ApplicationService; |
| import com.streamxhub.streamx.console.core.service.SavePointService; |
| import lombok.extern.slf4j.Slf4j; |
| import org.springframework.beans.factory.annotation.Autowired; |
| import org.springframework.context.annotation.DependsOn; |
| import org.springframework.scheduling.annotation.Scheduled; |
| import org.springframework.stereotype.Component; |
| |
| import javax.annotation.PostConstruct; |
| import javax.annotation.PreDestroy; |
| import java.io.IOException; |
| import java.util.Date; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.concurrent.*; |
| |
| /** |
| * <pre><b> |
| * 十步杀一人 |
| * 千里不留行 |
| * 事了拂衣去 |
| * 深藏身与名 |
| * </b></pre> |
| * <br> |
| * <strong> |
| * NOTE:未曾想该类竟然是改动最多的 |
| * 最容易出问题的<br> |
| * 看似一个简单的功能<br> |
| * 每个状态的处理<br> |
| * 每个操作<br> |
| * 都是经过反复的思考<br> |
| * 反复的测试<br> |
| * 反复的修改<br> |
| * 重要的事情说三遍:<br> |
| * 魔鬼在细节中<br> |
| * 魔鬼在细节中<br> |
| * 魔鬼在细节中...<br> |
| * </strong> |
| * |
| * @author benjobs |
| */ |
| @Slf4j |
| @Component |
| @DependsOn({"flyway", "flywayInitializer"}) |
| public class FlinkTrackingTask { |
| |
| /** |
| * 记录任务是否需要savePoint<br> |
| * 只有在RUNNING状态下才会真正使用,如检查到任务正在运行,且需要savePoint,则设置该任务的状态为"savepoint"<br> |
| */ |
| private static final Cache<Long, Byte> SAVEPOINT_CACHE = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build(); |
| |
| /** |
| * 记录第一次跟踪任务的状态,因为在任务启动后会在第一次跟踪时会获取任务的overview |
| */ |
| private static final Cache<Long, Byte> STARTING_CACHE = Caffeine.newBuilder().expireAfterWrite(3, TimeUnit.MINUTES).build(); |
| |
| /** |
| * 跟踪任务列表 |
| */ |
| private static final Map<Long, Application> TRACKING_MAP = new ConcurrentHashMap<>(0); |
| |
| /** |
| * StopFrom: 用来记录任务是从StreamX web管理端停止的还是其他方式停止<br> |
| * 如从StreamX web管理端停止可以知道在停止任务时是否做savepoint,如做了savepoint,则将该savepoint设置为最后有效的savepoint,下次启动时,自动选择从该savepoint<br> |
| * 如:其他方式停止则,无法知道是否savepoint,直接将所有的savepoint设置为过期,任务再次启动时需要手动指定<br> |
| */ |
| private static final Map<Long, StopFrom> STOP_FROM_MAP = new ConcurrentHashMap<>(0); |
| |
| /** |
| * 检查到正在canceling的任务放到该cache中,过期时间为10秒(2次任务监控轮询的时间). |
| */ |
| private final Cache<Long, Byte> cancelingCache = Caffeine.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build(); |
| |
| @Autowired |
| private SavePointService savePointService; |
| |
| @Autowired |
| private AlertService alertService; |
| |
| private static ApplicationService applicationService; |
| |
| private static final Map<String, Long> CHECK_POINT_MAP = new ConcurrentHashMap<>(); |
| |
| private static final Map<Long, OptionState> OPTIONING = new ConcurrentHashMap<>(); |
| /** |
| * 10秒之内 |
| */ |
| private final Long OPTION_INTERVAL = 1000L * 10; |
| |
| private final Long STARTING_INTERVAL = 1000L * 30; |
| |
| /** |
| * 正常5秒钟获取一次信息 |
| */ |
| private final Long TRACK_INTERVAL = 1000L * 5; |
| |
| private Long lastTrackTime = 0L; |
| |
| private Long lastOptionTime = 0L; |
| |
| private static Long optioningTime = 0L; |
| |
| private static final Byte DEFAULT_FLAG_BYTE = Byte.valueOf("0"); |
| |
| private final ExecutorService executor = new ThreadPoolExecutor( |
| Runtime.getRuntime().availableProcessors() * 2, |
| 200, |
| 60L, |
| TimeUnit.SECONDS, |
| new LinkedBlockingQueue<>(1024), |
| ThreadUtils.threadFactory("flink-tracking-executor")); |
| |
| @Autowired |
| public void setApplicationService(ApplicationService appService) { |
| applicationService = appService; |
| } |
| |
| @PostConstruct |
| public void initialization() { |
| getAllApplications().forEach((app) -> TRACKING_MAP.put(app.getId(), app)); |
| } |
| |
| @PreDestroy |
| public void ending() { |
| log.info("flinkTrackingTask StreamXConsole will be shutdown,persistent application to database."); |
| TRACKING_MAP.forEach((k, v) -> persistent(v)); |
| } |
| |
| /** |
| * <p> <strong> NOTE: 执行必须满足以下条件</strong> |
| * <p> <strong>1) 工程刚启动或者管理端页面正常操作任务(启动|停止),该操作需要非常实时的返回状态,频率1秒一次,持续10秒种(10次)</strong></p> |
| * <p> <strong>2) 正常的状态信息获取,5秒执行一次</strong></p> |
| */ |
| @Scheduled(fixedDelay = 1000) |
| public void execute() { |
| //1) 项目刚启动第一次执行,或者前端正在操作...(启动,停止)需要立即返回状态信息. |
| if (lastTrackTime == null || !OPTIONING.isEmpty()) { |
| tracking(); |
| } else if (System.currentTimeMillis() - lastOptionTime <= OPTION_INTERVAL) { |
| //2) 如果在管理端正在操作时间的10秒中之内(每秒执行一次) |
| tracking(); |
| } else if (System.currentTimeMillis() - lastTrackTime >= TRACK_INTERVAL) { |
| //3) 正常信息获取,判断本次时间和上次时间是否间隔5秒(正常监控信息获取,每5秒一次) |
| tracking(); |
| } |
| } |
| |
| private void tracking() { |
| Long now = System.currentTimeMillis(); |
| lastTrackTime = now; |
| TRACKING_MAP.forEach((key, application) -> executor.execute(() -> { |
| final StopFrom stopFrom = STOP_FROM_MAP.getOrDefault(key, null) == null ? StopFrom.NONE : STOP_FROM_MAP.get(key); |
| final OptionState optionState = OPTIONING.get(key); |
| try { |
| // 1) 到flink的REST Api中查询状态 |
| assert application.getId() != null; |
| getFromFlinkRestApi(application, stopFrom); |
| } catch (Exception flinkException) { |
| // 2) 到 YARN REST api中查询状态 |
| try { |
| getFromYarnRestApi(application, stopFrom); |
| } catch (Exception yarnException) { |
| /** |
| * 3) 从flink的restAPI和yarn的restAPI都查询失败</br> |
| * 此时需要根据管理端正在操作的状态来决定是否返回最终状态,需满足:</br> |
| * 1: 操作状态为为取消和正常的状态跟踪(操作状态不为STARTING)</br> |
| * 2: 如果操作状态为STARTING,则需要判断操作间隔是否在30秒之内(启动可能需要时间,这里给足够多的时间去完成启动)</br> |
| */ |
| if (optionState == null |
| || !optionState.equals(OptionState.STARTING) |
| || now - optioningTime >= STARTING_INTERVAL) { |
| //非正在手动映射appId |
| if (application.getState() != FlinkAppState.MAPPING.getValue()) { |
| log.error("flinkTrackingTask getFromFlinkRestApi and getFromYarnRestApi error,job failed,savePoint obsoleted!"); |
| if (StopFrom.NONE.equals(stopFrom)) { |
| savePointService.obsolete(application.getId()); |
| application.setState(FlinkAppState.LOST.getValue()); |
| alertService.alert(application, FlinkAppState.LOST); |
| } else { |
| application.setState(FlinkAppState.CANCELED.getValue()); |
| } |
| } |
| /** |
| * 进入到这一步说明前两种方式获取信息都失败,此步是最后一步,直接会判别任务取消或失联</br> |
| * 需清空savepoint. |
| */ |
| cleanSavepoint(application); |
| cleanOptioning(optionState, key); |
| application.setEndTime(new Date()); |
| this.persistentAndClean(application); |
| |
| FlinkAppState appState = FlinkAppState.of(application.getState()); |
| if (appState.equals(FlinkAppState.FAILED) || appState.equals(FlinkAppState.LOST)) { |
| alertService.alert(application, FlinkAppState.of(application.getState())); |
| if (appState.equals(FlinkAppState.FAILED)) { |
| try { |
| applicationService.start(application, true); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| } |
| } |
| } |
| } |
| })); |
| } |
| |
| /** |
| * 从flink restapi成功拿到当前任务的运行状态信息... |
| * |
| * @param application |
| * @param stopFrom |
| * @throws Exception |
| */ |
| private void getFromFlinkRestApi(Application application, StopFrom stopFrom) throws Exception { |
| JobsOverview jobsOverview = application.httpJobsOverview(); |
| Optional<JobsOverview.Job> optional = jobsOverview.getJobs().stream().findFirst(); |
| |
| if (optional.isPresent()) { |
| |
| JobsOverview.Job jobOverview = optional.get(); |
| FlinkAppState currentState = FlinkAppState.of(jobOverview.getState()); |
| |
| if (!FlinkAppState.OTHER.equals(currentState)) { |
| // 1) set info from JobOverview |
| handleJobOverview(application, jobOverview); |
| |
| //2) CheckPoints |
| handleCheckPoints(application); |
| |
| //3) savePoint obsolete check and NEED_START check |
| OptionState optionState = OPTIONING.get(application.getId()); |
| // cpu分支预测,将Running的状态单独拿出来 |
| if (currentState.equals(FlinkAppState.RUNNING)) { |
| handleRunningState(application, optionState, currentState); |
| } else { |
| handleNotRunState(application, optionState, currentState, stopFrom); |
| } |
| } |
| } |
| } |
| |
| /** |
| * 基本信息回写等处理 |
| * |
| * @param application |
| * @param jobOverview |
| * @throws IOException |
| */ |
| private void handleJobOverview(Application application, JobsOverview.Job jobOverview) throws IOException { |
| // 1) jobId以restapi返回的状态为准 |
| application.setJobId(jobOverview.getId()); |
| application.setTotalTask(jobOverview.getTasks().getTotal()); |
| application.setOverview(jobOverview.getTasks()); |
| |
| // 2) duration |
| long startTime = jobOverview.getStartTime(); |
| long endTime = jobOverview.getEndTime(); |
| if (application.getStartTime() == null) { |
| application.setStartTime(new Date(startTime)); |
| } else if (startTime != application.getStartTime().getTime()) { |
| application.setStartTime(new Date(startTime)); |
| } |
| if (endTime != -1) { |
| if (application.getEndTime() == null || endTime != application.getEndTime().getTime()) { |
| application.setEndTime(new Date(endTime)); |
| } |
| } |
| application.setDuration(jobOverview.getDuration()); |
| |
| // 3) overview,刚启动第一次获取Overview信息. |
| if (STARTING_CACHE.getIfPresent(application.getId()) != null) { |
| Overview override = application.httpOverview(); |
| if (override.getSlotsTotal() > 0) { |
| STARTING_CACHE.invalidate(application.getId()); |
| application.setTotalTM(override.getTaskmanagers()); |
| application.setTotalSlot(override.getSlotsTotal()); |
| application.setAvailableSlot(override.getSlotsAvailable()); |
| } |
| } |
| } |
| |
| /** |
| * 获取最新的checkPoint |
| * |
| * @param application |
| * @throws IOException |
| */ |
| private void handleCheckPoints(Application application) throws IOException { |
| CheckPoints checkPoints = application.httpCheckpoints(); |
| if (checkPoints != null) { |
| CheckPoints.Latest latest = checkPoints.getLatest(); |
| if (latest != null ) { |
| CheckPoints.CheckPoint checkPoint = latest.getCompleted(); |
| if (checkPoint != null && checkPoint.isCompleted()) { |
| Long latestId = CHECK_POINT_MAP.get(application.getJobId()); |
| if (latestId == null || latestId < checkPoint.getId()) { |
| SavePoint savePoint = new SavePoint(); |
| savePoint.setAppId(application.getId()); |
| savePoint.setLatest(true); |
| savePoint.setType(checkPoint.getCheckPointType().get()); |
| savePoint.setPath(checkPoint.getPath()); |
| savePoint.setTriggerTime(new Date(checkPoint.getTriggerTimestamp())); |
| savePoint.setCreateTime(new Date()); |
| savePointService.save(savePoint); |
| CHECK_POINT_MAP.put(application.getJobId(), checkPoint.getId()); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * 当前任务正在运行,一系列状态处理. |
| * |
| * @param application |
| * @param optionState |
| * @param currentState |
| */ |
| private void handleRunningState(Application application, OptionState optionState, FlinkAppState currentState) { |
| /** |
| * 上次记录的状态的 "STARTING" 本次获取到最新的状态为"RUNNING",说明是重启后的第一次跟踪 |
| * 则:job之前一下状态需要请求重启状态: |
| * NEED_RESTART_AFTER_CONF_UPDATE(配置文件修改后需要重新启动) |
| * NEED_RESTART_AFTER_SQL_UPDATE(flink sql修改后需要重启) |
| * NEED_RESTART_AFTER_ROLLBACK(任务回滚后需要重启) |
| * NEED_RESTART_AFTER_DEPLOY(任务重新发布后需要回滚) |
| */ |
| if (OptionState.STARTING.equals(optionState)) { |
| DeployState deployState = DeployState.of(application.getDeploy()); |
| //如果任务更新后需要重新启动 或 发布后需要重新启动 |
| switch (deployState) { |
| case NEED_RESTART_AFTER_CONF_UPDATE: |
| case NEED_RESTART_AFTER_SQL_UPDATE: |
| case NEED_RESTART_AFTER_ROLLBACK: |
| case NEED_RESTART_AFTER_DEPLOY: |
| case NEED_ROLLBACK: |
| //清空需要重新启动的状态. |
| application.setDeploy(DeployState.DONE.get()); |
| break; |
| default: |
| break; |
| } |
| } |
| // 当前状态为running,且savePointCache里有当前任务,说明该任务正在做savepoint |
| if (SAVEPOINT_CACHE.getIfPresent(application.getId()) != null) { |
| application.setOptionState(OptionState.SAVEPOINTING.getValue()); |
| } else { |
| application.setOptionState(OptionState.NONE.getValue()); |
| } |
| application.setState(currentState.getValue()); |
| TRACKING_MAP.put(application.getId(), application); |
| cleanOptioning(optionState, application.getId()); |
| } |
| |
| |
| /** |
| * 当前任务未运行,状态处理 |
| * |
| * @param application |
| * @param optionState |
| * @param currentState |
| * @param stopFrom |
| */ |
| private void handleNotRunState(Application application, |
| OptionState optionState, |
| FlinkAppState currentState, |
| StopFrom stopFrom) throws Exception { |
| switch (currentState) { |
| case CANCELLING: |
| cancelingCache.put(application.getId(), DEFAULT_FLAG_BYTE); |
| cleanSavepoint(application); |
| application.setState(currentState.getValue()); |
| TRACKING_MAP.put(application.getId(), application); |
| break; |
| case CANCELED: |
| log.info("flinkTrackingTask getFromFlinkRestApi, job state {}, stop tracking and delete stopFrom!", currentState.name()); |
| cleanSavepoint(application); |
| application.setState(currentState.getValue()); |
| if (StopFrom.NONE.equals(stopFrom)) { |
| log.info("flinkTrackingTask getFromFlinkRestApi, job cancel is not form streamX,savePoint obsoleted!"); |
| savePointService.obsolete(application.getId()); |
| alertService.alert(application, FlinkAppState.CANCELED); |
| } |
| //清理stopFrom |
| STOP_FROM_MAP.remove(application.getId()); |
| //持久化application并且移除跟踪监控 |
| persistentAndClean(application); |
| cleanOptioning(optionState, application.getId()); |
| break; |
| case FAILED: |
| cleanSavepoint(application); |
| //清理stopFrom |
| STOP_FROM_MAP.remove(application.getId()); |
| application.setState(FlinkAppState.FAILED.getValue()); |
| //持久化application并且移除跟踪监控 |
| persistentAndClean(application); |
| alertService.alert(application, FlinkAppState.FAILED); |
| applicationService.start(application, true); |
| break; |
| case RESTARTING: |
| log.info("flinkTrackingTask getFromFlinkRestApi, job state {},add to starting", currentState.name()); |
| STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE); |
| break; |
| default: |
| application.setState(currentState.getValue()); |
| TRACKING_MAP.put(application.getId(), application); |
| } |
| } |
| |
| /** |
| * <p><strong>到 yarn中查询job的历史记录,说明flink任务已经停止,任务的最终状态为"CANCELED"</strong> |
| * |
| * @param application |
| * @param stopFrom |
| */ |
| private void getFromYarnRestApi(Application application, StopFrom stopFrom) throws Exception { |
| log.debug("flinkTrackingTask getFromYarnRestApi starting..."); |
| OptionState optionState = OPTIONING.get(application.getId()); |
| |
| /** |
| * 上一次的状态为canceling(在获取信息时flink restServer还未关闭为canceling) |
| * 且本次如获取不到状态(flink restServer已关闭),则认为任务已经CANCELED |
| */ |
| Byte flag = cancelingCache.getIfPresent(application.getId()); |
| if (flag != null) { |
| log.info("flinkTrackingTask previous state: canceling."); |
| if (StopFrom.NONE.equals(stopFrom)) { |
| log.error("flinkTrackingTask query previous state was canceling and stopFrom NotFound,savePoint obsoleted!"); |
| savePointService.obsolete(application.getId()); |
| } |
| application.setState(FlinkAppState.CANCELED.getValue()); |
| cleanSavepoint(application); |
| cleanOptioning(optionState, application.getId()); |
| this.persistentAndClean(application); |
| } else { |
| // 2)到yarn的restApi中查询状态 |
| AppInfo appInfo = application.httpYarnAppInfo(); |
| if (appInfo == null) { |
| throw new RuntimeException("flinkTrackingTask getFromYarnRestApi failed "); |
| } else { |
| try { |
| String state = appInfo.getApp().getFinalStatus(); |
| FlinkAppState flinkAppState = FlinkAppState.of(state); |
| if (FlinkAppState.OTHER.equals(flinkAppState)) { |
| return; |
| } |
| if (FlinkAppState.KILLED.equals(flinkAppState)) { |
| if (StopFrom.NONE.equals(stopFrom)) { |
| log.error("flinkTrackingTask getFromYarnRestApi,job was killed and stopFrom NotFound,savePoint obsoleted!"); |
| savePointService.obsolete(application.getId()); |
| } |
| flinkAppState = FlinkAppState.CANCELED; |
| cleanSavepoint(application); |
| application.setEndTime(new Date()); |
| } |
| application.setState(flinkAppState.getValue()); |
| //能运行到这一步,说明到YARN REST api中成功查询到信息 |
| cleanOptioning(optionState, application.getId()); |
| this.persistentAndClean(application); |
| |
| if (flinkAppState.equals(FlinkAppState.FAILED) || flinkAppState.equals(FlinkAppState.LOST)) { |
| alertService.alert(application, flinkAppState); |
| if (flinkAppState.equals(FlinkAppState.FAILED)) { |
| applicationService.start(application, true); |
| } |
| } |
| } catch (Exception e) { |
| throw new RuntimeException("flinkTrackingTask getFromYarnRestApi error,", e); |
| } |
| } |
| } |
| |
| } |
| |
| private void cleanOptioning(OptionState optionState, Long key) { |
| if (optionState != null) { |
| lastOptionTime = System.currentTimeMillis(); |
| OPTIONING.remove(key); |
| } |
| } |
| |
| private void cleanSavepoint(Application application) { |
| SAVEPOINT_CACHE.invalidate(application.getId()); |
| application.setOptionState(OptionState.NONE.getValue()); |
| } |
| |
| private static List<Application> getAllApplications() { |
| QueryWrapper<Application> queryWrapper = new QueryWrapper<>(); |
| queryWrapper.eq("tracking", 1); |
| return applicationService.list(queryWrapper); |
| } |
| |
| private static void persistent(Application application) { |
| applicationService.updateTracking(application); |
| } |
| |
| private void persistentAndClean(Application application) { |
| persistent(application); |
| stopTracking(application.getId()); |
| } |
| |
| |
| /** |
| * <p><strong>1分钟往数据库同步一次状态</strong></p></br> |
| * <p><strong>NOTE:该操作可能会导致当程序挂了,所监控的状态没及时往数据库同步的情况,造成被监控的实际的application和数控库状态不一致的情况 |
| * 但是这种操作也仅在每次程序挂和升级手动停止的情况,但是带的是减少了对数据库读写的次数,减小了数据的压力. |
| * </strong></p> |
| */ |
| @Scheduled(fixedDelay = 1000 * 60) |
| public void persistent() { |
| TRACKING_MAP.forEach((k, v) -> persistent(v)); |
| } |
| |
| // =============================== static public method... ========================================= |
| |
| /** |
| * 设置正在操作中... |
| */ |
| public static void setOptionState(Long appId, OptionState state) { |
| log.info("flinkTrackingTask setOptioning"); |
| optioningTime = System.currentTimeMillis(); |
| OPTIONING.put(appId, state); |
| //从streamx停止 |
| if (state.equals(OptionState.CANCELLING)) { |
| STOP_FROM_MAP.put(appId, StopFrom.STREAMX); |
| } |
| } |
| |
| public static void addTracking(Application application) { |
| log.info("flinkTrackingTask add app to tracking,appId:{}", application.getId()); |
| TRACKING_MAP.put(application.getId(), application); |
| STARTING_CACHE.put(application.getId(), DEFAULT_FLAG_BYTE); |
| } |
| |
| public static void addSavepoint(Long appId) { |
| log.info("flinkTrackingTask add app to savepoint,appId:{}", appId); |
| SAVEPOINT_CACHE.put(appId, DEFAULT_FLAG_BYTE); |
| } |
| |
| /** |
| * 重新加载最新的application到数据库,防止如修改等操作,导致cache和实际数据库中信息不一致的问题. |
| * |
| * @param appId |
| * @param callable |
| */ |
| public static Object refreshTracking(Long appId, Callable callable) throws Exception { |
| log.info("flinkTrackingTask flushing app,appId:{}", appId); |
| Application application = TRACKING_MAP.get(appId); |
| if (application != null) { |
| persistent(application); |
| Object result = callable.call(); |
| TRACKING_MAP.put(appId, applicationService.getById(appId)); |
| return result; |
| } |
| return callable.call(); |
| } |
| |
| public static void refreshTracking(Runnable runnable) { |
| log.info("flinkTrackingTask flushing all application starting"); |
| getAllTrackingApp().values().forEach(app -> { |
| Application application = TRACKING_MAP.get(app.getId()); |
| if (application != null) { |
| persistent(application); |
| } |
| }); |
| |
| runnable.run(); |
| |
| getAllApplications().forEach((app) -> { |
| if (TRACKING_MAP.get(app.getId()) != null) { |
| TRACKING_MAP.put(app.getId(), app); |
| } |
| }); |
| log.info("flinkTrackingTask flushing all application end!"); |
| } |
| |
| public static void stopTracking(Long appId) { |
| log.info("flinkTrackingTask stop app,appId:{}", appId); |
| TRACKING_MAP.remove(appId); |
| } |
| |
| public static Map<Long, Application> getAllTrackingApp() { |
| return TRACKING_MAP; |
| } |
| |
| public static Application getTracking(Long appId) { |
| return TRACKING_MAP.get(appId); |
| } |
| } |