blob: ab7d045f74ae8351c5cbdf22d62ea3780214fdf5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.elasticjob.tracing.rdb.storage;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent;
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.Source;
import org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.State;
import org.apache.shardingsphere.elasticjob.tracing.rdb.type.DatabaseType;
import org.apache.shardingsphere.elasticjob.tracing.rdb.type.impl.DefaultDatabaseType;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.UUID;
/**
* RDB job event storage.
*/
@Slf4j
public final class RDBJobEventStorage {
private static final String TABLE_JOB_EXECUTION_LOG = "JOB_EXECUTION_LOG";
private static final String TABLE_JOB_STATUS_TRACE_LOG = "JOB_STATUS_TRACE_LOG";
private static final String TASK_ID_STATE_INDEX = "TASK_ID_STATE_INDEX";
private static final Map<String, DatabaseType> DATABASE_TYPES = new HashMap<>();
private final DataSource dataSource;
private final DatabaseType databaseType;
private final RDBStorageSQLMapper sqlMapper;
static {
for (DatabaseType each : ServiceLoader.load(DatabaseType.class)) {
DATABASE_TYPES.put(each.getType(), each);
}
}
public RDBJobEventStorage(final DataSource dataSource) throws SQLException {
this.dataSource = dataSource;
databaseType = getDatabaseType(dataSource);
sqlMapper = new RDBStorageSQLMapper(databaseType.getSQLPropertiesFile());
initTablesAndIndexes();
}
private DatabaseType getDatabaseType(final DataSource dataSource) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
String databaseProductName = connection.getMetaData().getDatabaseProductName();
for (DatabaseType each : DATABASE_TYPES.values()) {
if (each.getDatabaseProductName().equals(databaseProductName)) {
return each;
}
}
}
return new DefaultDatabaseType();
}
private void initTablesAndIndexes() throws SQLException {
try (Connection connection = dataSource.getConnection()) {
createJobExecutionTableAndIndexIfNeeded(connection);
createJobStatusTraceTableAndIndexIfNeeded(connection);
}
}
private void createJobExecutionTableAndIndexIfNeeded(final Connection connection) throws SQLException {
if (existsTable(connection, TABLE_JOB_EXECUTION_LOG) || existsTable(connection, TABLE_JOB_EXECUTION_LOG.toLowerCase())) {
return;
}
createJobExecutionTable(connection);
}
private void createJobStatusTraceTableAndIndexIfNeeded(final Connection connection) throws SQLException {
if (existsTable(connection, TABLE_JOB_STATUS_TRACE_LOG) || existsTable(connection, TABLE_JOB_STATUS_TRACE_LOG.toLowerCase())) {
return;
}
createJobStatusTraceTable(connection);
createTaskIdIndexIfNeeded(connection);
}
private boolean existsTable(final Connection connection, final String tableName) throws SQLException {
DatabaseMetaData dbMetaData = connection.getMetaData();
try (ResultSet resultSet = dbMetaData.getTables(connection.getCatalog(), null, tableName, new String[]{"TABLE"})) {
return resultSet.next();
}
}
private void createTaskIdIndexIfNeeded(final Connection connection) throws SQLException {
if (existsIndex(connection, TABLE_JOB_STATUS_TRACE_LOG, TASK_ID_STATE_INDEX) || existsIndex(connection, TABLE_JOB_STATUS_TRACE_LOG.toLowerCase(), TASK_ID_STATE_INDEX.toLowerCase())) {
return;
}
createTaskIdAndStateIndex(connection);
}
private boolean existsIndex(final Connection connection, final String tableName, final String indexName) throws SQLException {
DatabaseMetaData dbMetaData = connection.getMetaData();
try (ResultSet resultSet = dbMetaData.getIndexInfo(connection.getCatalog(), null, tableName, false, false)) {
while (resultSet.next()) {
if (indexName.equals(resultSet.getString("INDEX_NAME"))) {
return true;
}
}
}
return false;
}
private void createJobExecutionTable(final Connection connection) throws SQLException {
try (PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getCreateTableForJobExecutionLog())) {
preparedStatement.execute();
}
}
private void createJobStatusTraceTable(final Connection connection) throws SQLException {
try (PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getCreateTableForJobStatusTraceLog())) {
preparedStatement.execute();
}
}
private void createTaskIdAndStateIndex(final Connection connection) throws SQLException {
try (PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getCreateIndexForTaskIdStateIndex())) {
preparedStatement.execute();
}
}
/**
* Add job execution event.
*
* @param jobExecutionEvent job execution event
* @return add success or not
*/
public boolean addJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) {
if (null == jobExecutionEvent.getCompleteTime()) {
return insertJobExecutionEvent(jobExecutionEvent);
} else {
if (jobExecutionEvent.isSuccess()) {
return updateJobExecutionEventWhenSuccess(jobExecutionEvent);
} else {
return updateJobExecutionEventFailure(jobExecutionEvent);
}
}
}
private boolean insertJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) {
boolean result = false;
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getInsertForJobExecutionLog())) {
preparedStatement.setString(1, jobExecutionEvent.getId());
preparedStatement.setString(2, jobExecutionEvent.getJobName());
preparedStatement.setString(3, jobExecutionEvent.getTaskId());
preparedStatement.setString(4, jobExecutionEvent.getHostname());
preparedStatement.setString(5, jobExecutionEvent.getIp());
preparedStatement.setInt(6, jobExecutionEvent.getShardingItem());
preparedStatement.setString(7, jobExecutionEvent.getSource().toString());
preparedStatement.setBoolean(8, jobExecutionEvent.isSuccess());
preparedStatement.setTimestamp(9, new Timestamp(jobExecutionEvent.getStartTime().getTime()));
preparedStatement.execute();
result = true;
} catch (final SQLException ex) {
if (!isDuplicateRecord(ex)) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error(ex.getMessage());
}
}
return result;
}
private boolean updateJobExecutionEventWhenSuccess(final JobExecutionEvent jobExecutionEvent) {
boolean result = false;
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getUpdateForJobExecutionLog())) {
preparedStatement.setBoolean(1, jobExecutionEvent.isSuccess());
preparedStatement.setTimestamp(2, new Timestamp(jobExecutionEvent.getCompleteTime().getTime()));
preparedStatement.setString(3, jobExecutionEvent.getId());
if (0 == preparedStatement.executeUpdate()) {
return insertJobExecutionEventWhenSuccess(jobExecutionEvent);
}
result = true;
} catch (final SQLException ex) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error(ex.getMessage());
}
return result;
}
private boolean insertJobExecutionEventWhenSuccess(final JobExecutionEvent jobExecutionEvent) {
boolean result = false;
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getInsertForJobExecutionLogForComplete())) {
preparedStatement.setString(1, jobExecutionEvent.getId());
preparedStatement.setString(2, jobExecutionEvent.getJobName());
preparedStatement.setString(3, jobExecutionEvent.getTaskId());
preparedStatement.setString(4, jobExecutionEvent.getHostname());
preparedStatement.setString(5, jobExecutionEvent.getIp());
preparedStatement.setInt(6, jobExecutionEvent.getShardingItem());
preparedStatement.setString(7, jobExecutionEvent.getSource().toString());
preparedStatement.setBoolean(8, jobExecutionEvent.isSuccess());
preparedStatement.setTimestamp(9, new Timestamp(jobExecutionEvent.getStartTime().getTime()));
preparedStatement.setTimestamp(10, new Timestamp(jobExecutionEvent.getCompleteTime().getTime()));
preparedStatement.execute();
result = true;
} catch (final SQLException ex) {
if (isDuplicateRecord(ex)) {
return updateJobExecutionEventWhenSuccess(jobExecutionEvent);
}
// TODO log failure directly to output log, consider to be configurable in the future
log.error(ex.getMessage());
}
return result;
}
private boolean updateJobExecutionEventFailure(final JobExecutionEvent jobExecutionEvent) {
boolean result = false;
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getUpdateForJobExecutionLogForFailure())) {
preparedStatement.setBoolean(1, jobExecutionEvent.isSuccess());
preparedStatement.setTimestamp(2, new Timestamp(jobExecutionEvent.getCompleteTime().getTime()));
preparedStatement.setString(3, truncateString(jobExecutionEvent.getFailureCause()));
preparedStatement.setString(4, jobExecutionEvent.getId());
if (0 == preparedStatement.executeUpdate()) {
return insertJobExecutionEventWhenFailure(jobExecutionEvent);
}
result = true;
} catch (final SQLException ex) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error(ex.getMessage());
}
return result;
}
private boolean insertJobExecutionEventWhenFailure(final JobExecutionEvent jobExecutionEvent) {
boolean result = false;
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getInsertForJobExecutionLogForFailure())) {
preparedStatement.setString(1, jobExecutionEvent.getId());
preparedStatement.setString(2, jobExecutionEvent.getJobName());
preparedStatement.setString(3, jobExecutionEvent.getTaskId());
preparedStatement.setString(4, jobExecutionEvent.getHostname());
preparedStatement.setString(5, jobExecutionEvent.getIp());
preparedStatement.setInt(6, jobExecutionEvent.getShardingItem());
preparedStatement.setString(7, jobExecutionEvent.getSource().toString());
preparedStatement.setString(8, truncateString(jobExecutionEvent.getFailureCause()));
preparedStatement.setBoolean(9, jobExecutionEvent.isSuccess());
preparedStatement.setTimestamp(10, new Timestamp(jobExecutionEvent.getStartTime().getTime()));
preparedStatement.execute();
result = true;
} catch (final SQLException ex) {
if (isDuplicateRecord(ex)) {
return updateJobExecutionEventFailure(jobExecutionEvent);
}
// TODO log failure directly to output log, consider to be configurable in the future
log.error(ex.getMessage());
}
return result;
}
private boolean isDuplicateRecord(final SQLException ex) {
return null != databaseType && databaseType.getDuplicateRecordErrorCode() == ex.getErrorCode();
}
/**
* Add job status trace event.
*
* @param jobStatusTraceEvent job status trace event
* @return add success or not
*/
public boolean addJobStatusTraceEvent(final JobStatusTraceEvent jobStatusTraceEvent) {
String originalTaskId = jobStatusTraceEvent.getOriginalTaskId();
if (State.TASK_STAGING != jobStatusTraceEvent.getState()) {
originalTaskId = getOriginalTaskId(jobStatusTraceEvent.getTaskId());
}
boolean result = false;
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getInsertForJobStatusTraceLog())) {
preparedStatement.setString(1, UUID.randomUUID().toString());
preparedStatement.setString(2, jobStatusTraceEvent.getJobName());
preparedStatement.setString(3, originalTaskId);
preparedStatement.setString(4, jobStatusTraceEvent.getTaskId());
preparedStatement.setString(5, jobStatusTraceEvent.getSlaveId());
preparedStatement.setString(6, jobStatusTraceEvent.getSource().toString());
preparedStatement.setString(7, jobStatusTraceEvent.getExecutionType());
preparedStatement.setString(8, jobStatusTraceEvent.getShardingItems());
preparedStatement.setString(9, jobStatusTraceEvent.getState().toString());
preparedStatement.setString(10, truncateString(jobStatusTraceEvent.getMessage()));
preparedStatement.setTimestamp(11, new Timestamp(jobStatusTraceEvent.getCreationTime().getTime()));
preparedStatement.execute();
result = true;
} catch (final SQLException ex) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error(ex.getMessage());
}
return result;
}
private String getOriginalTaskId(final String taskId) {
String result = "";
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getSelectOriginalTaskIdForJobStatusTraceLog())) {
preparedStatement.setString(1, taskId);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet.next()) {
return resultSet.getString("original_task_id");
}
}
} catch (final SQLException ex) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error(ex.getMessage());
}
return result;
}
private String truncateString(final String str) {
return !Strings.isNullOrEmpty(str) && str.length() > 4000 ? str.substring(0, 4000) : str;
}
List<JobStatusTraceEvent> getJobStatusTraceEvents(final String taskId) {
List<JobStatusTraceEvent> result = new ArrayList<>();
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sqlMapper.getSelectForJobStatusTraceLog())) {
preparedStatement.setString(1, taskId);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
while (resultSet.next()) {
JobStatusTraceEvent jobStatusTraceEvent = new JobStatusTraceEvent(resultSet.getString(1), resultSet.getString(2), resultSet.getString(3), resultSet.getString(4),
resultSet.getString(5), Source.valueOf(resultSet.getString(6)), resultSet.getString(7), resultSet.getString(8),
State.valueOf(resultSet.getString(9)), resultSet.getString(10), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(resultSet.getString(11)));
result.add(jobStatusTraceEvent);
}
}
} catch (final SQLException | ParseException ex) {
// TODO log failure directly to output log, consider to be configurable in the future
log.error(ex.getMessage());
}
return result;
}
}