blob: 9d37967d58e82f17140bd6b8d3e2d18213929c68 [file] [log] [blame]
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.elasticjob.cloud.scheduler.statistics;
import io.elasticjob.cloud.api.JobType;
import io.elasticjob.cloud.scheduler.statistics.job.JobRunningStatisticJob;
import io.elasticjob.cloud.event.rdb.JobEventRdbConfiguration;
import io.elasticjob.cloud.scheduler.config.job.CloudJobConfiguration;
import io.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationService;
import io.elasticjob.cloud.scheduler.config.job.CloudJobExecutionType;
import io.elasticjob.cloud.scheduler.statistics.job.RegisteredJobStatisticJob;
import io.elasticjob.cloud.scheduler.statistics.job.TaskResultStatisticJob;
import io.elasticjob.cloud.scheduler.statistics.util.StatisticTimeUtils;
import io.elasticjob.cloud.statistics.type.job.JobRegisterStatistics;
import io.elasticjob.cloud.statistics.type.task.TaskResultStatistics;
import io.elasticjob.cloud.reg.base.CoordinatorRegistryCenter;
import io.elasticjob.cloud.statistics.StatisticInterval;
import io.elasticjob.cloud.statistics.rdb.StatisticRdbRepository;
import io.elasticjob.cloud.statistics.type.job.JobExecutionTypeStatistics;
import io.elasticjob.cloud.statistics.type.job.JobRunningStatistics;
import io.elasticjob.cloud.statistics.type.job.JobTypeStatistics;
import io.elasticjob.cloud.statistics.type.task.TaskRunningStatistics;
import com.google.common.base.Optional;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.sql.SQLException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Statistic job scheduling manager.
 *
 * <p>Singleton obtained through {@link #getInstance(CoordinatorRegistryCenter, Optional)}. It registers the
 * statistic jobs with the {@link StatisticsScheduler}, holds the in-memory task result counters for the
 * {@link StatisticInterval#MINUTE}, {@link StatisticInterval#HOUR} and {@link StatisticInterval#DAY} intervals,
 * and answers statistic queries. Queries that need the statistic repository return empty or zeroed results
 * when the repository is not available.</p>
 *
 * @author liguangyun
 */
@Slf4j
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public final class StatisticManager {
    
    private static final String ONLINE_DATE_PATTERN = "yyyy-MM-dd";
    
    private static final String ONLINE_DATE = "2016-12-16";
    
    private static volatile StatisticManager instance;
    
    private final CoordinatorRegistryCenter registryCenter;
    
    private final CloudJobConfigurationService configurationService;
    
    private final Optional<JobEventRdbConfiguration> jobEventRdbConfiguration;
    
    private final StatisticsScheduler scheduler;
    
    private final Map<StatisticInterval, TaskResultMetaData> statisticData;
    
    // Stays null when no event RDB configuration is present or when creating the repository failed.
    private StatisticRdbRepository rdbRepository;
    
    private StatisticManager(final CoordinatorRegistryCenter registryCenter, final Optional<JobEventRdbConfiguration> jobEventRdbConfiguration,
                             final StatisticsScheduler scheduler, final Map<StatisticInterval, TaskResultMetaData> statisticData) {
        this.registryCenter = registryCenter;
        this.configurationService = new CloudJobConfigurationService(registryCenter);
        this.jobEventRdbConfiguration = jobEventRdbConfiguration;
        this.scheduler = scheduler;
        this.statisticData = statisticData;
    }
    
    /**
     * Get the statistic job scheduling manager.
     *
     * <p>The arguments are only used by the call that creates the instance; once an instance exists it is
     * returned as-is and the arguments are ignored.</p>
     *
     * @param regCenter registry center
     * @param jobEventRdbConfiguration job event RDB configuration
     * @return statistic manager instance
     */
    public static StatisticManager getInstance(final CoordinatorRegistryCenter regCenter, final Optional<JobEventRdbConfiguration> jobEventRdbConfiguration) {
        if (null == instance) {
            synchronized (StatisticManager.class) {
                if (null == instance) {
                    Map<StatisticInterval, TaskResultMetaData> statisticData = new HashMap<>();
                    statisticData.put(StatisticInterval.MINUTE, new TaskResultMetaData());
                    statisticData.put(StatisticInterval.HOUR, new TaskResultMetaData());
                    statisticData.put(StatisticInterval.DAY, new TaskResultMetaData());
                    StatisticManager manager = new StatisticManager(regCenter, jobEventRdbConfiguration, new StatisticsScheduler(), statisticData);
                    init(manager);
                    // Publish only after initialization has finished: a thread that reads the volatile field
                    // without taking the lock must never observe a manager whose rdbRepository is not set yet.
                    instance = manager;
                }
            }
        }
        return instance;
    }
    
    /**
     * Create the statistic repository for the given manager when an event RDB configuration is present.
     *
     * <p>A {@link SQLException} raised while creating the repository is logged and the repository is left unset.</p>
     *
     * @param manager manager being initialized, not yet published
     */
    private static void init(final StatisticManager manager) {
        if (manager.jobEventRdbConfiguration.isPresent()) {
            try {
                manager.rdbRepository = new StatisticRdbRepository(manager.jobEventRdbConfiguration.get().getDataSource());
            } catch (final SQLException ex) {
                log.error("Init StatisticRdbRepository error:", ex);
            }
        }
    }
    
    /**
     * Start the statistic job scheduling.
     *
     * <p>Does nothing when the statistic repository is not available.</p>
     */
    public void startup() {
        if (isRdbConfigured()) {
            scheduler.start();
            scheduler.register(new TaskResultStatisticJob(StatisticInterval.MINUTE, statisticData.get(StatisticInterval.MINUTE), rdbRepository));
            scheduler.register(new TaskResultStatisticJob(StatisticInterval.HOUR, statisticData.get(StatisticInterval.HOUR), rdbRepository));
            scheduler.register(new TaskResultStatisticJob(StatisticInterval.DAY, statisticData.get(StatisticInterval.DAY), rdbRepository));
            scheduler.register(new JobRunningStatisticJob(registryCenter, rdbRepository));
            scheduler.register(new RegisteredJobStatisticJob(configurationService, rdbRepository));
        }
    }
    
    /**
     * Stop the statistic job scheduling.
     */
    public void shutdown() {
        scheduler.shutdown();
    }
    
    /**
     * Record a successful task run in the minute, hour and day counters.
     */
    public void taskRunSuccessfully() {
        statisticData.get(StatisticInterval.MINUTE).incrementAndGetSuccessCount();
        statisticData.get(StatisticInterval.HOUR).incrementAndGetSuccessCount();
        statisticData.get(StatisticInterval.DAY).incrementAndGetSuccessCount();
    }
    
    /**
     * Record a failed task run in the minute, hour and day counters.
     */
    public void taskRunFailed() {
        statisticData.get(StatisticInterval.MINUTE).incrementAndGetFailedCount();
        statisticData.get(StatisticInterval.HOUR).incrementAndGetFailedCount();
        statisticData.get(StatisticInterval.DAY).incrementAndGetFailedCount();
    }
    
    private boolean isRdbConfigured() {
        return null != rdbRepository;
    }
    
    /**
     * Get the summed task result statistics of the last week.
     *
     * @return task result statistics; zero counts when the statistic repository is not available
     */
    public TaskResultStatistics getTaskResultStatisticsWeekly() {
        if (!isRdbConfigured()) {
            return new TaskResultStatistics(0, 0, StatisticInterval.DAY, new Date());
        }
        return rdbRepository.getSummedTaskResultStatistics(StatisticTimeUtils.getStatisticTime(StatisticInterval.DAY, -7), StatisticInterval.DAY);
    }
    
    /**
     * Get the summed task result statistics since going online.
     *
     * @return task result statistics; zero counts when the statistic repository is not available
     */
    public TaskResultStatistics getTaskResultStatisticsSinceOnline() {
        if (!isRdbConfigured()) {
            return new TaskResultStatistics(0, 0, StatisticInterval.DAY, new Date());
        }
        return rdbRepository.getSummedTaskResultStatistics(getOnlineDate(), StatisticInterval.DAY);
    }
    
    /**
     * Find the latest task result statistics of a statistic interval.
     *
     * @param statisticInterval statistic interval
     * @return latest task result statistics; zero counts for the given interval when the statistic repository
     *     is not available or holds no record
     */
    public TaskResultStatistics findLatestTaskResultStatistics(final StatisticInterval statisticInterval) {
        if (isRdbConfigured()) {
            Optional<TaskResultStatistics> result = rdbRepository.findLatestTaskResultStatistics(statisticInterval);
            if (result.isPresent()) {
                return result.get();
            }
        }
        return new TaskResultStatistics(0, 0, statisticInterval, new Date());
    }
    
    /**
     * Find the task result statistics of the last day, queried with the minute interval.
     *
     * @return task result statistics; empty list when the statistic repository is not available
     */
    public List<TaskResultStatistics> findTaskResultStatisticsDaily() {
        if (!isRdbConfigured()) {
            return Collections.emptyList();
        }
        return rdbRepository.findTaskResultStatistics(StatisticTimeUtils.getStatisticTime(StatisticInterval.HOUR, -24), StatisticInterval.MINUTE);
    }
    
    /**
     * Get the job type statistics.
     *
     * <p>Counts the script, simple and dataflow jobs among all loaded job configurations.</p>
     *
     * @return job type statistics
     */
    public JobTypeStatistics getJobTypeStatistics() {
        int scriptJobCnt = 0;
        int simpleJobCnt = 0;
        int dataflowJobCnt = 0;
        for (CloudJobConfiguration each : configurationService.loadAll()) {
            JobType jobType = each.getTypeConfig().getJobType();
            if (JobType.SCRIPT.equals(jobType)) {
                scriptJobCnt++;
            } else if (JobType.SIMPLE.equals(jobType)) {
                simpleJobCnt++;
            } else if (JobType.DATAFLOW.equals(jobType)) {
                dataflowJobCnt++;
            }
        }
        return new JobTypeStatistics(scriptJobCnt, simpleJobCnt, dataflowJobCnt);
    }
    
    /**
     * Get the job execution type statistics.
     *
     * <p>Counts the transient and daemon jobs among all loaded job configurations.</p>
     *
     * @return job execution type statistics
     */
    public JobExecutionTypeStatistics getJobExecutionTypeStatistics() {
        int transientJobCnt = 0;
        int daemonJobCnt = 0;
        for (CloudJobConfiguration each : configurationService.loadAll()) {
            if (CloudJobExecutionType.TRANSIENT.equals(each.getJobExecutionType())) {
                transientJobCnt++;
            } else if (CloudJobExecutionType.DAEMON.equals(each.getJobExecutionType())) {
                daemonJobCnt++;
            }
        }
        return new JobExecutionTypeStatistics(transientJobCnt, daemonJobCnt);
    }
    
    /**
     * Find the running task statistics of the last week.
     *
     * @return running task statistics; empty list when the statistic repository is not available
     */
    public List<TaskRunningStatistics> findTaskRunningStatisticsWeekly() {
        if (!isRdbConfigured()) {
            return Collections.emptyList();
        }
        return rdbRepository.findTaskRunningStatistics(StatisticTimeUtils.getStatisticTime(StatisticInterval.DAY, -7));
    }
    
    /**
     * Find the running job statistics of the last week.
     *
     * @return running job statistics; empty list when the statistic repository is not available
     */
    public List<JobRunningStatistics> findJobRunningStatisticsWeekly() {
        if (!isRdbConfigured()) {
            return Collections.emptyList();
        }
        return rdbRepository.findJobRunningStatistics(StatisticTimeUtils.getStatisticTime(StatisticInterval.DAY, -7));
    }
    
    /**
     * Find the job register statistics since going online.
     *
     * @return job register statistics; empty list when the statistic repository is not available
     */
    public List<JobRegisterStatistics> findJobRegisterStatisticsSinceOnline() {
        if (!isRdbConfigured()) {
            return Collections.emptyList();
        }
        return rdbRepository.findJobRegisterStatistics(getOnlineDate());
    }
    
    /**
     * Get the fixed online date used as the lower bound of the "since online" queries.
     *
     * @return the parsed online date, or {@code null} if the date literal cannot be parsed
     */
    private Date getOnlineDate() {
        // A new formatter per call: SimpleDateFormat is not safe to share between threads.
        SimpleDateFormat formatter = new SimpleDateFormat(ONLINE_DATE_PATTERN);
        try {
            return formatter.parse(ONLINE_DATE);
        } catch (final ParseException ex) {
            return null;
        }
    }
}