| /* |
| * Copyright 1999-2015 dangdang.com. |
| * <p> |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * </p> |
| */ |
| |
| package io.elasticjob.cloud.scheduler.statistics; |
| |
| import io.elasticjob.cloud.api.JobType; |
| import io.elasticjob.cloud.scheduler.statistics.job.JobRunningStatisticJob; |
| import io.elasticjob.cloud.event.rdb.JobEventRdbConfiguration; |
| import io.elasticjob.cloud.scheduler.config.job.CloudJobConfiguration; |
| import io.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationService; |
| import io.elasticjob.cloud.scheduler.config.job.CloudJobExecutionType; |
| import io.elasticjob.cloud.scheduler.statistics.job.RegisteredJobStatisticJob; |
| import io.elasticjob.cloud.scheduler.statistics.job.TaskResultStatisticJob; |
| import io.elasticjob.cloud.scheduler.statistics.util.StatisticTimeUtils; |
| import io.elasticjob.cloud.statistics.type.job.JobRegisterStatistics; |
| import io.elasticjob.cloud.statistics.type.task.TaskResultStatistics; |
| import io.elasticjob.cloud.reg.base.CoordinatorRegistryCenter; |
| import io.elasticjob.cloud.statistics.StatisticInterval; |
| import io.elasticjob.cloud.statistics.rdb.StatisticRdbRepository; |
| import io.elasticjob.cloud.statistics.type.job.JobExecutionTypeStatistics; |
| import io.elasticjob.cloud.statistics.type.job.JobRunningStatistics; |
| import io.elasticjob.cloud.statistics.type.job.JobTypeStatistics; |
| import io.elasticjob.cloud.statistics.type.task.TaskRunningStatistics; |
| import com.google.common.base.Optional; |
| import lombok.AccessLevel; |
| import lombok.RequiredArgsConstructor; |
| import lombok.extern.slf4j.Slf4j; |
| |
| import java.sql.SQLException; |
| import java.text.ParseException; |
| import java.text.SimpleDateFormat; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| /** |
| * 统计作业调度管理器. |
| * |
| * @author liguangyun |
| */ |
| @Slf4j |
| @RequiredArgsConstructor(access = AccessLevel.PRIVATE) |
| public final class StatisticManager { |
| |
| private static volatile StatisticManager instance; |
| |
| private final CoordinatorRegistryCenter registryCenter; |
| |
| private final CloudJobConfigurationService configurationService; |
| |
| private final Optional<JobEventRdbConfiguration> jobEventRdbConfiguration; |
| |
| private final StatisticsScheduler scheduler; |
| |
| private final Map<StatisticInterval, TaskResultMetaData> statisticData; |
| |
| private StatisticRdbRepository rdbRepository; |
| |
| private StatisticManager(final CoordinatorRegistryCenter registryCenter, final Optional<JobEventRdbConfiguration> jobEventRdbConfiguration, |
| final StatisticsScheduler scheduler, final Map<StatisticInterval, TaskResultMetaData> statisticData) { |
| this.registryCenter = registryCenter; |
| this.configurationService = new CloudJobConfigurationService(registryCenter); |
| this.jobEventRdbConfiguration = jobEventRdbConfiguration; |
| this.scheduler = scheduler; |
| this.statisticData = statisticData; |
| } |
| |
| /** |
| * 获取统计作业调度管理器. |
| * |
| * @param regCenter 注册中心 |
| * @param jobEventRdbConfiguration 作业数据库事件配置 |
| * @return 调度管理器对象 |
| */ |
| public static StatisticManager getInstance(final CoordinatorRegistryCenter regCenter, final Optional<JobEventRdbConfiguration> jobEventRdbConfiguration) { |
| if (null == instance) { |
| synchronized (StatisticManager.class) { |
| if (null == instance) { |
| Map<StatisticInterval, TaskResultMetaData> statisticData = new HashMap<>(); |
| statisticData.put(StatisticInterval.MINUTE, new TaskResultMetaData()); |
| statisticData.put(StatisticInterval.HOUR, new TaskResultMetaData()); |
| statisticData.put(StatisticInterval.DAY, new TaskResultMetaData()); |
| instance = new StatisticManager(regCenter, jobEventRdbConfiguration, new StatisticsScheduler(), statisticData); |
| init(); |
| } |
| } |
| } |
| return instance; |
| } |
| |
| private static void init() { |
| if (instance.jobEventRdbConfiguration.isPresent()) { |
| try { |
| instance.rdbRepository = new StatisticRdbRepository(instance.jobEventRdbConfiguration.get().getDataSource()); |
| } catch (final SQLException ex) { |
| log.error("Init StatisticRdbRepository error:", ex); |
| } |
| } |
| } |
| |
| /** |
| * 启动统计作业调度. |
| */ |
| public void startup() { |
| if (null != rdbRepository) { |
| scheduler.start(); |
| scheduler.register(new TaskResultStatisticJob(StatisticInterval.MINUTE, statisticData.get(StatisticInterval.MINUTE), rdbRepository)); |
| scheduler.register(new TaskResultStatisticJob(StatisticInterval.HOUR, statisticData.get(StatisticInterval.HOUR), rdbRepository)); |
| scheduler.register(new TaskResultStatisticJob(StatisticInterval.DAY, statisticData.get(StatisticInterval.DAY), rdbRepository)); |
| scheduler.register(new JobRunningStatisticJob(registryCenter, rdbRepository)); |
| scheduler.register(new RegisteredJobStatisticJob(configurationService, rdbRepository)); |
| } |
| } |
| |
| /** |
| * 停止统计作业调度. |
| */ |
| public void shutdown() { |
| scheduler.shutdown(); |
| } |
| |
| /** |
| * 任务运行成功. |
| */ |
| public void taskRunSuccessfully() { |
| statisticData.get(StatisticInterval.MINUTE).incrementAndGetSuccessCount(); |
| statisticData.get(StatisticInterval.HOUR).incrementAndGetSuccessCount(); |
| statisticData.get(StatisticInterval.DAY).incrementAndGetSuccessCount(); |
| } |
| |
| /** |
| * 作业运行失败. |
| */ |
| public void taskRunFailed() { |
| statisticData.get(StatisticInterval.MINUTE).incrementAndGetFailedCount(); |
| statisticData.get(StatisticInterval.HOUR).incrementAndGetFailedCount(); |
| statisticData.get(StatisticInterval.DAY).incrementAndGetFailedCount(); |
| } |
| |
| private boolean isRdbConfigured() { |
| return null != rdbRepository; |
| } |
| |
| /** |
| * 获取最近一周的任务运行结果统计数据. |
| * |
| * @return 任务运行结果统计数据对象 |
| */ |
| public TaskResultStatistics getTaskResultStatisticsWeekly() { |
| if (!isRdbConfigured()) { |
| return new TaskResultStatistics(0, 0, StatisticInterval.DAY, new Date()); |
| } |
| return rdbRepository.getSummedTaskResultStatistics(StatisticTimeUtils.getStatisticTime(StatisticInterval.DAY, -7), StatisticInterval.DAY); |
| } |
| |
| /** |
| * 获取自上线以来的任务运行结果统计数据. |
| * |
| * @return 任务运行结果统计数据对象 |
| */ |
| public TaskResultStatistics getTaskResultStatisticsSinceOnline() { |
| if (!isRdbConfigured()) { |
| return new TaskResultStatistics(0, 0, StatisticInterval.DAY, new Date()); |
| } |
| return rdbRepository.getSummedTaskResultStatistics(getOnlineDate(), StatisticInterval.DAY); |
| } |
| |
| /** |
| * 获取最近一个统计周期的任务运行结果统计数据. |
| * |
| * @param statisticInterval 统计周期 |
| * @return 任务运行结果统计数据对象 |
| */ |
| public TaskResultStatistics findLatestTaskResultStatistics(final StatisticInterval statisticInterval) { |
| if (isRdbConfigured()) { |
| Optional<TaskResultStatistics> result = rdbRepository.findLatestTaskResultStatistics(statisticInterval); |
| if (result.isPresent()) { |
| return result.get(); |
| } |
| } |
| return new TaskResultStatistics(0, 0, statisticInterval, new Date()); |
| } |
| |
| /** |
| * 获取最近一天的任务运行结果统计数据集合. |
| * |
| * @return 任务运行结果统计数据对象集合 |
| */ |
| public List<TaskResultStatistics> findTaskResultStatisticsDaily() { |
| if (!isRdbConfigured()) { |
| return Collections.emptyList(); |
| } |
| return rdbRepository.findTaskResultStatistics(StatisticTimeUtils.getStatisticTime(StatisticInterval.HOUR, -24), StatisticInterval.MINUTE); |
| } |
| |
| /** |
| * 获取作业类型统计数据. |
| * |
| * @return 作业类型统计数据对象 |
| */ |
| public JobTypeStatistics getJobTypeStatistics() { |
| int scriptJobCnt = 0; |
| int simpleJobCnt = 0; |
| int dataflowJobCnt = 0; |
| for (CloudJobConfiguration each : configurationService.loadAll()) { |
| if (JobType.SCRIPT.equals(each.getTypeConfig().getJobType())) { |
| scriptJobCnt++; |
| } else if (JobType.SIMPLE.equals(each.getTypeConfig().getJobType())) { |
| simpleJobCnt++; |
| } else if (JobType.DATAFLOW.equals(each.getTypeConfig().getJobType())) { |
| dataflowJobCnt++; |
| } |
| } |
| return new JobTypeStatistics(scriptJobCnt, simpleJobCnt, dataflowJobCnt); |
| } |
| |
| /** |
| * 获取作业执行类型统计数据. |
| * |
| * @return 作业执行类型统计数据对象 |
| */ |
| public JobExecutionTypeStatistics getJobExecutionTypeStatistics() { |
| int transientJobCnt = 0; |
| int daemonJobCnt = 0; |
| for (CloudJobConfiguration each : configurationService.loadAll()) { |
| if (CloudJobExecutionType.TRANSIENT.equals(each.getJobExecutionType())) { |
| transientJobCnt++; |
| } else if (CloudJobExecutionType.DAEMON.equals(each.getJobExecutionType())) { |
| daemonJobCnt++; |
| } |
| } |
| return new JobExecutionTypeStatistics(transientJobCnt, daemonJobCnt); |
| } |
| |
| /** |
| * 获取最近一周的运行中的任务统计数据集合. |
| * |
| * @return 运行中的任务统计数据对象集合 |
| */ |
| public List<TaskRunningStatistics> findTaskRunningStatisticsWeekly() { |
| if (!isRdbConfigured()) { |
| return Collections.emptyList(); |
| } |
| return rdbRepository.findTaskRunningStatistics(StatisticTimeUtils.getStatisticTime(StatisticInterval.DAY, -7)); |
| } |
| |
| /** |
| * 获取最近一周的运行中的作业统计数据集合. |
| * |
| * @return 运行中的任务统计数据对象集合 |
| */ |
| public List<JobRunningStatistics> findJobRunningStatisticsWeekly() { |
| if (!isRdbConfigured()) { |
| return Collections.emptyList(); |
| } |
| return rdbRepository.findJobRunningStatistics(StatisticTimeUtils.getStatisticTime(StatisticInterval.DAY, -7)); |
| } |
| |
| /** |
| * 获取自上线以来的运行中的任务统计数据集合. |
| * |
| * @return 运行中的任务统计数据对象集合 |
| */ |
| public List<JobRegisterStatistics> findJobRegisterStatisticsSinceOnline() { |
| if (!isRdbConfigured()) { |
| return Collections.emptyList(); |
| } |
| return rdbRepository.findJobRegisterStatistics(getOnlineDate()); |
| } |
| |
| private Date getOnlineDate() { |
| SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd"); |
| try { |
| return formatter.parse("2016-12-16"); |
| } catch (final ParseException ex) { |
| return null; |
| } |
| } |
| } |