blob: 46833a1d15a58dbf2714fe28708d91bd182a4b54 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.elasticjob.cloud.statistics.rdb;
import com.google.common.base.Optional;
import org.apache.shardingsphere.elasticjob.cloud.statistics.StatisticInterval;
import org.apache.shardingsphere.elasticjob.cloud.statistics.type.job.JobRegisterStatistics;
import org.apache.shardingsphere.elasticjob.cloud.statistics.type.job.JobRunningStatistics;
import org.apache.shardingsphere.elasticjob.cloud.statistics.type.task.TaskResultStatistics;
import org.apache.shardingsphere.elasticjob.cloud.statistics.type.task.TaskRunningStatistics;
import lombok.extern.slf4j.Slf4j;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
/**
* Statistic RDB repository.
*/
@Slf4j
public class StatisticRdbRepository {
private static final String TABLE_TASK_RESULT_STATISTICS = "TASK_RESULT_STATISTICS";
private static final String TABLE_TASK_RUNNING_STATISTICS = "TASK_RUNNING_STATISTICS";
private static final String TABLE_JOB_RUNNING_STATISTICS = "JOB_RUNNING_STATISTICS";
private static final String TABLE_JOB_REGISTER_STATISTICS = "JOB_REGISTER_STATISTICS";
private final DataSource dataSource;
public StatisticRdbRepository(final DataSource dataSource) throws SQLException {
this.dataSource = dataSource;
initTables();
}
private void initTables() throws SQLException {
try (Connection conn = dataSource.getConnection()) {
createTaskResultTableIfNeeded(conn);
createTaskRunningTableIfNeeded(conn);
createJobRunningTableIfNeeded(conn);
createJobRegisterTableIfNeeded(conn);
}
}
private void createTaskResultTableIfNeeded(final Connection conn) throws SQLException {
DatabaseMetaData dbMetaData = conn.getMetaData();
for (StatisticInterval each : StatisticInterval.values()) {
try (ResultSet resultSet = dbMetaData.getTables(null, null, TABLE_TASK_RESULT_STATISTICS + "_" + each, new String[]{"TABLE"})) {
if (!resultSet.next()) {
createTaskResultTable(conn, each);
}
}
}
}
private void createTaskResultTable(final Connection conn, final StatisticInterval statisticInterval) throws SQLException {
String dbSchema = "CREATE TABLE `" + TABLE_TASK_RESULT_STATISTICS + "_" + statisticInterval + "` ("
+ "`id` BIGINT NOT NULL AUTO_INCREMENT, "
+ "`success_count` INT(11),"
+ "`failed_count` INT(11),"
+ "`statistics_time` TIMESTAMP NOT NULL,"
+ "`creation_time` TIMESTAMP NOT NULL,"
+ "PRIMARY KEY (`id`));";
try (PreparedStatement preparedStatement = conn.prepareStatement(dbSchema)) {
preparedStatement.execute();
}
}
private void createTaskRunningTableIfNeeded(final Connection conn) throws SQLException {
DatabaseMetaData dbMetaData = conn.getMetaData();
try (ResultSet resultSet = dbMetaData.getTables(null, null, TABLE_TASK_RUNNING_STATISTICS, new String[]{"TABLE"})) {
if (!resultSet.next()) {
createTaskRunningTable(conn);
}
}
}
private void createTaskRunningTable(final Connection conn) throws SQLException {
String dbSchema = "CREATE TABLE `" + TABLE_TASK_RUNNING_STATISTICS + "` ("
+ "`id` BIGINT NOT NULL AUTO_INCREMENT, "
+ "`running_count` INT(11),"
+ "`statistics_time` TIMESTAMP NOT NULL,"
+ "`creation_time` TIMESTAMP NOT NULL,"
+ "PRIMARY KEY (`id`));";
try (PreparedStatement preparedStatement = conn.prepareStatement(dbSchema)) {
preparedStatement.execute();
}
}
private void createJobRunningTableIfNeeded(final Connection conn) throws SQLException {
DatabaseMetaData dbMetaData = conn.getMetaData();
try (ResultSet resultSet = dbMetaData.getTables(null, null, TABLE_JOB_RUNNING_STATISTICS, new String[]{"TABLE"})) {
if (!resultSet.next()) {
createJobRunningTable(conn);
}
}
}
private void createJobRunningTable(final Connection conn) throws SQLException {
String dbSchema = "CREATE TABLE `" + TABLE_JOB_RUNNING_STATISTICS + "` ("
+ "`id` BIGINT NOT NULL AUTO_INCREMENT, "
+ "`running_count` INT(11),"
+ "`statistics_time` TIMESTAMP NOT NULL,"
+ "`creation_time` TIMESTAMP NOT NULL,"
+ "PRIMARY KEY (`id`));";
try (PreparedStatement preparedStatement = conn.prepareStatement(dbSchema)) {
preparedStatement.execute();
}
}
private void createJobRegisterTableIfNeeded(final Connection conn) throws SQLException {
DatabaseMetaData dbMetaData = conn.getMetaData();
try (ResultSet resultSet = dbMetaData.getTables(null, null, TABLE_JOB_REGISTER_STATISTICS, new String[]{"TABLE"})) {
if (!resultSet.next()) {
createJobRegisterTable(conn);
}
}
}
private void createJobRegisterTable(final Connection conn) throws SQLException {
String dbSchema = "CREATE TABLE `" + TABLE_JOB_REGISTER_STATISTICS + "` ("
+ "`id` BIGINT NOT NULL AUTO_INCREMENT, "
+ "`registered_count` INT(11),"
+ "`statistics_time` TIMESTAMP NOT NULL,"
+ "`creation_time` TIMESTAMP NOT NULL,"
+ "PRIMARY KEY (`id`));";
try (PreparedStatement preparedStatement = conn.prepareStatement(dbSchema)) {
preparedStatement.execute();
}
}
/**
* Add task result statistics.
*
* @param taskResultStatistics task result statistics
* @return add success or not
*/
public boolean add(final TaskResultStatistics taskResultStatistics) {
boolean result = false;
String sql = "INSERT INTO `" + TABLE_TASK_RESULT_STATISTICS + "_" + taskResultStatistics.getStatisticInterval()
+ "` (`success_count`, `failed_count`, `statistics_time`, `creation_time`) VALUES (?, ?, ?, ?);";
try (
Connection conn = dataSource.getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
preparedStatement.setInt(1, taskResultStatistics.getSuccessCount());
preparedStatement.setInt(2, taskResultStatistics.getFailedCount());
preparedStatement.setTimestamp(3, new Timestamp(taskResultStatistics.getStatisticsTime().getTime()));
preparedStatement.setTimestamp(4, new Timestamp(taskResultStatistics.getCreationTime().getTime()));
preparedStatement.execute();
result = true;
} catch (final SQLException ex) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error("Insert taskResultStatistics to DB error:", ex);
}
return result;
}
/**
* Add task running statistics.
*
* @param taskRunningStatistics task running statistics
* @return add success or not
*/
public boolean add(final TaskRunningStatistics taskRunningStatistics) {
boolean result = false;
String sql = "INSERT INTO `" + TABLE_TASK_RUNNING_STATISTICS + "` (`running_count`, `statistics_time`, `creation_time`) VALUES (?, ?, ?);";
try (
Connection conn = dataSource.getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
preparedStatement.setInt(1, taskRunningStatistics.getRunningCount());
preparedStatement.setTimestamp(2, new Timestamp(taskRunningStatistics.getStatisticsTime().getTime()));
preparedStatement.setTimestamp(3, new Timestamp(taskRunningStatistics.getCreationTime().getTime()));
preparedStatement.execute();
result = true;
} catch (final SQLException ex) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error("Insert taskRunningStatistics to DB error:", ex);
}
return result;
}
/**
* Add job running statistics.
*
* @param jobRunningStatistics job running statistics
* @return add success or not
*/
public boolean add(final JobRunningStatistics jobRunningStatistics) {
boolean result = false;
String sql = "INSERT INTO `" + TABLE_JOB_RUNNING_STATISTICS + "` (`running_count`, `statistics_time`, `creation_time`) VALUES (?, ?, ?);";
try (
Connection conn = dataSource.getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
preparedStatement.setInt(1, jobRunningStatistics.getRunningCount());
preparedStatement.setTimestamp(2, new Timestamp(jobRunningStatistics.getStatisticsTime().getTime()));
preparedStatement.setTimestamp(3, new Timestamp(jobRunningStatistics.getCreationTime().getTime()));
preparedStatement.execute();
result = true;
} catch (final SQLException ex) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error("Insert jobRunningStatistics to DB error:", ex);
}
return result;
}
/**
* Add job register statistics.
*
* @param jobRegisterStatistics job register statistics
* @return add success or not
*/
public boolean add(final JobRegisterStatistics jobRegisterStatistics) {
boolean result = false;
String sql = "INSERT INTO `" + TABLE_JOB_REGISTER_STATISTICS + "` (`registered_count`, `statistics_time`, `creation_time`) VALUES (?, ?, ?);";
try (
Connection conn = dataSource.getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
preparedStatement.setInt(1, jobRegisterStatistics.getRegisteredCount());
preparedStatement.setTimestamp(2, new Timestamp(jobRegisterStatistics.getStatisticsTime().getTime()));
preparedStatement.setTimestamp(3, new Timestamp(jobRegisterStatistics.getCreationTime().getTime()));
preparedStatement.execute();
result = true;
} catch (final SQLException ex) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error("Insert jobRegisterStatistics to DB error:", ex);
}
return result;
}
/**
* Find task result statistics.
*
* @param from from date to statistics
* @param statisticInterval statistic interval
* @return task result statistics
*/
public List<TaskResultStatistics> findTaskResultStatistics(final Date from, final StatisticInterval statisticInterval) {
List<TaskResultStatistics> result = new LinkedList<>();
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String sql = String.format("SELECT id, success_count, failed_count, statistics_time, creation_time FROM %s WHERE statistics_time >= '%s' order by id ASC",
TABLE_TASK_RESULT_STATISTICS + "_" + statisticInterval, formatter.format(from));
try (
Connection conn = dataSource.getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery()
) {
while (resultSet.next()) {
TaskResultStatistics taskResultStatistics = new TaskResultStatistics(resultSet.getLong(1), resultSet.getInt(2), resultSet.getInt(3),
statisticInterval, new Date(resultSet.getTimestamp(4).getTime()), new Date(resultSet.getTimestamp(5).getTime()));
result.add(taskResultStatistics);
}
} catch (final SQLException ex) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error("Fetch taskResultStatistics from DB error:", ex);
}
return result;
}
/**
* Get summed task result statistics.
*
* @param from from date to statistics
* @param statisticInterval statistic interval
* @return summed task result statistics
*/
public TaskResultStatistics getSummedTaskResultStatistics(final Date from, final StatisticInterval statisticInterval) {
TaskResultStatistics result = new TaskResultStatistics(0, 0, statisticInterval, new Date());
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String sql = String.format("SELECT sum(success_count), sum(failed_count) FROM %s WHERE statistics_time >= '%s'",
TABLE_TASK_RESULT_STATISTICS + "_" + statisticInterval, formatter.format(from));
try (
Connection conn = dataSource.getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery()
) {
while (resultSet.next()) {
result = new TaskResultStatistics(resultSet.getInt(1), resultSet.getInt(2), statisticInterval, new Date());
}
} catch (final SQLException ex) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error("Fetch summed taskResultStatistics from DB error:", ex);
}
return result;
}
/**
* Find latest task result statistics.
*
* @param statisticInterval statistic interval
* @return task result statistics
*/
public Optional<TaskResultStatistics> findLatestTaskResultStatistics(final StatisticInterval statisticInterval) {
TaskResultStatistics result = null;
String sql = String.format("SELECT id, success_count, failed_count, statistics_time, creation_time FROM %s order by id DESC LIMIT 1",
TABLE_TASK_RESULT_STATISTICS + "_" + statisticInterval);
try (
Connection conn = dataSource.getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery()
) {
while (resultSet.next()) {
result = new TaskResultStatistics(resultSet.getLong(1), resultSet.getInt(2), resultSet.getInt(3),
statisticInterval, new Date(resultSet.getTimestamp(4).getTime()), new Date(resultSet.getTimestamp(5).getTime()));
}
} catch (final SQLException ex) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error("Fetch latest taskResultStatistics from DB error:", ex);
}
return Optional.fromNullable(result);
}
/**
* Find task running statistics.
*
* @param from from date to statistics
* @return Task running statistics
*/
public List<TaskRunningStatistics> findTaskRunningStatistics(final Date from) {
List<TaskRunningStatistics> result = new LinkedList<>();
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String sql = String.format("SELECT id, running_count, statistics_time, creation_time FROM %s WHERE statistics_time >= '%s' order by id ASC",
TABLE_TASK_RUNNING_STATISTICS, formatter.format(from));
try (
Connection conn = dataSource.getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery()
) {
while (resultSet.next()) {
TaskRunningStatistics taskRunningStatistics = new TaskRunningStatistics(resultSet.getLong(1), resultSet.getInt(2),
new Date(resultSet.getTimestamp(3).getTime()), new Date(resultSet.getTimestamp(4).getTime()));
result.add(taskRunningStatistics);
}
} catch (final SQLException ex) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error("Fetch taskRunningStatistics from DB error:", ex);
}
return result;
}
/**
* Find job running statistics.
*
* @param from from date to statistics
* @return job running statistics
*/
public List<JobRunningStatistics> findJobRunningStatistics(final Date from) {
List<JobRunningStatistics> result = new LinkedList<>();
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String sql = String.format("SELECT id, running_count, statistics_time, creation_time FROM %s WHERE statistics_time >= '%s' order by id ASC",
TABLE_JOB_RUNNING_STATISTICS, formatter.format(from));
try (
Connection conn = dataSource.getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery()
) {
while (resultSet.next()) {
JobRunningStatistics jobRunningStatistics = new JobRunningStatistics(resultSet.getLong(1), resultSet.getInt(2),
new Date(resultSet.getTimestamp(3).getTime()), new Date(resultSet.getTimestamp(4).getTime()));
result.add(jobRunningStatistics);
}
} catch (final SQLException ex) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error("Fetch jobRunningStatistics from DB error:", ex);
}
return result;
}
/**
* Find latest task running statistics.
*
* @return latest task running statistics
*/
public Optional<TaskRunningStatistics> findLatestTaskRunningStatistics() {
TaskRunningStatistics result = null;
String sql = String.format("SELECT id, running_count, statistics_time, creation_time FROM %s order by id DESC LIMIT 1",
TABLE_TASK_RUNNING_STATISTICS);
try (
Connection conn = dataSource.getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery()
) {
while (resultSet.next()) {
result = new TaskRunningStatistics(resultSet.getLong(1), resultSet.getInt(2),
new Date(resultSet.getTimestamp(3).getTime()), new Date(resultSet.getTimestamp(4).getTime()));
}
} catch (final SQLException ex) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error("Fetch latest taskRunningStatistics from DB error:", ex);
}
return Optional.fromNullable(result);
}
/**
* Find latest job running statistics.
*
* @return job running statistics
*/
public Optional<JobRunningStatistics> findLatestJobRunningStatistics() {
JobRunningStatistics result = null;
String sql = String.format("SELECT id, running_count, statistics_time, creation_time FROM %s order by id DESC LIMIT 1",
TABLE_JOB_RUNNING_STATISTICS);
try (
Connection conn = dataSource.getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery()
) {
while (resultSet.next()) {
result = new JobRunningStatistics(resultSet.getLong(1), resultSet.getInt(2),
new Date(resultSet.getTimestamp(3).getTime()), new Date(resultSet.getTimestamp(4).getTime()));
}
} catch (final SQLException ex) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error("Fetch latest jobRunningStatistics from DB error:", ex);
}
return Optional.fromNullable(result);
}
/**
* Find job register statistics.
*
* @param from from date to statistics
* @return job register statistics
*/
public List<JobRegisterStatistics> findJobRegisterStatistics(final Date from) {
List<JobRegisterStatistics> result = new LinkedList<>();
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String sql = String.format("SELECT id, registered_count, statistics_time, creation_time FROM %s WHERE statistics_time >= '%s' order by id ASC",
TABLE_JOB_REGISTER_STATISTICS, formatter.format(from));
try (
Connection conn = dataSource.getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery()
) {
while (resultSet.next()) {
JobRegisterStatistics jobRegisterStatistics = new JobRegisterStatistics(resultSet.getLong(1), resultSet.getInt(2),
new Date(resultSet.getTimestamp(3).getTime()), new Date(resultSet.getTimestamp(4).getTime()));
result.add(jobRegisterStatistics);
}
} catch (final SQLException ex) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error("Fetch jobRegisterStatistics from DB error:", ex);
}
return result;
}
/**
* Find latest job register statistics.
*
* @return job register statistics
*/
public Optional<JobRegisterStatistics> findLatestJobRegisterStatistics() {
JobRegisterStatistics result = null;
String sql = String.format("SELECT id, registered_count, statistics_time, creation_time FROM %s order by id DESC LIMIT 1",
TABLE_JOB_REGISTER_STATISTICS);
try (
Connection conn = dataSource.getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery()
) {
while (resultSet.next()) {
result = new JobRegisterStatistics(resultSet.getLong(1), resultSet.getInt(2),
new Date(resultSet.getTimestamp(3).getTime()), new Date(resultSet.getTimestamp(4).getTime()));
}
} catch (final SQLException ex) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error("Fetch latest jobRegisterStatistics from DB error:", ex);
}
return Optional.fromNullable(result);
}
}