| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.shardingsphere.elasticjob.cloud.event.rdb; |
| |
| import com.google.common.base.CaseFormat; |
| import com.google.common.base.Strings; |
| import com.google.common.collect.Lists; |
| import org.apache.shardingsphere.elasticjob.cloud.context.ExecutionType; |
| import org.apache.shardingsphere.elasticjob.cloud.event.type.JobExecutionEvent; |
| import org.apache.shardingsphere.elasticjob.cloud.event.type.JobExecutionEventThrowable; |
| import org.apache.shardingsphere.elasticjob.cloud.event.type.JobStatusTraceEvent; |
| import lombok.Getter; |
| import lombok.RequiredArgsConstructor; |
| import lombok.extern.slf4j.Slf4j; |
| |
| import javax.sql.DataSource; |
| import java.sql.Connection; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Timestamp; |
| import java.util.Collection; |
| import java.util.Date; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| |
| /** |
| * Job event RDB search. |
| */ |
| @RequiredArgsConstructor |
| @Slf4j |
| public final class JobEventRdbSearch { |
| |
| private static final String TABLE_JOB_EXECUTION_LOG = "JOB_EXECUTION_LOG"; |
| |
| private static final String TABLE_JOB_STATUS_TRACE_LOG = "JOB_STATUS_TRACE_LOG"; |
| |
| private static final List<String> FIELDS_JOB_EXECUTION_LOG = |
| Lists.newArrayList("id", "hostname", "ip", "task_id", "job_name", "execution_source", "sharding_item", "start_time", "complete_time", "is_success", "failure_cause"); |
| |
| private static final List<String> FIELDS_JOB_STATUS_TRACE_LOG = |
| Lists.newArrayList("id", "job_name", "original_task_id", "task_id", "slave_id", "source", "execution_type", "sharding_item", "state", "message", "creation_time"); |
| |
| private final DataSource dataSource; |
| |
| /** |
| * Find job execution events. |
| * |
| * @param condition query condition |
| * @return job execution events |
| */ |
| public Result<JobExecutionEvent> findJobExecutionEvents(final Condition condition) { |
| return new Result<>(getEventCount(TABLE_JOB_EXECUTION_LOG, FIELDS_JOB_EXECUTION_LOG, condition), getJobExecutionEvents(condition)); |
| } |
| |
| /** |
| * Find job status trace events. |
| * |
| * @param condition query condition |
| * @return job status trace events |
| */ |
| public Result<JobStatusTraceEvent> findJobStatusTraceEvents(final Condition condition) { |
| return new Result<>(getEventCount(TABLE_JOB_STATUS_TRACE_LOG, FIELDS_JOB_STATUS_TRACE_LOG, condition), getJobStatusTraceEvents(condition)); |
| } |
| |
| private List<JobExecutionEvent> getJobExecutionEvents(final Condition condition) { |
| List<JobExecutionEvent> result = new LinkedList<>(); |
| try ( |
| Connection conn = dataSource.getConnection(); |
| PreparedStatement preparedStatement = createDataPreparedStatement(conn, TABLE_JOB_EXECUTION_LOG, FIELDS_JOB_EXECUTION_LOG, condition); |
| ResultSet resultSet = preparedStatement.executeQuery() |
| ) { |
| while (resultSet.next()) { |
| JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(resultSet.getString(1), resultSet.getString(2), resultSet.getString(3), resultSet.getString(4), |
| resultSet.getString(5), JobExecutionEvent.ExecutionSource.valueOf(resultSet.getString(6)), Integer.valueOf(resultSet.getString(7)), |
| new Date(resultSet.getTimestamp(8).getTime()), resultSet.getTimestamp(9) == null ? null : new Date(resultSet.getTimestamp(9).getTime()), |
| resultSet.getBoolean(10), new JobExecutionEventThrowable(null, resultSet.getString(11)) |
| ); |
| result.add(jobExecutionEvent); |
| } |
| } catch (final SQLException ex) { |
| // TODO log failure directly to output log, consider to be configurable in the future |
| log.error("Fetch JobExecutionEvent from DB error:", ex); |
| } |
| return result; |
| } |
| |
| private List<JobStatusTraceEvent> getJobStatusTraceEvents(final Condition condition) { |
| List<JobStatusTraceEvent> result = new LinkedList<>(); |
| try ( |
| Connection conn = dataSource.getConnection(); |
| PreparedStatement preparedStatement = createDataPreparedStatement(conn, TABLE_JOB_STATUS_TRACE_LOG, FIELDS_JOB_STATUS_TRACE_LOG, condition); |
| ResultSet resultSet = preparedStatement.executeQuery() |
| ) { |
| while (resultSet.next()) { |
| JobStatusTraceEvent jobStatusTraceEvent = new JobStatusTraceEvent(resultSet.getString(1), resultSet.getString(2), resultSet.getString(3), resultSet.getString(4), |
| resultSet.getString(5), JobStatusTraceEvent.Source.valueOf(resultSet.getString(6)), ExecutionType.valueOf(resultSet.getString(7)), resultSet.getString(8), |
| JobStatusTraceEvent.State.valueOf(resultSet.getString(9)), resultSet.getString(10), new Date(resultSet.getTimestamp(11).getTime())); |
| result.add(jobStatusTraceEvent); |
| } |
| } catch (final SQLException ex) { |
| // TODO log failure directly to output log, consider to be configurable in the future |
| log.error("Fetch JobStatusTraceEvent from DB error:", ex); |
| } |
| return result; |
| } |
| |
| private int getEventCount(final String tableName, final Collection<String> tableFields, final Condition condition) { |
| int result = 0; |
| try ( |
| Connection conn = dataSource.getConnection(); |
| PreparedStatement preparedStatement = createCountPreparedStatement(conn, tableName, tableFields, condition); |
| ResultSet resultSet = preparedStatement.executeQuery() |
| ) { |
| resultSet.next(); |
| result = resultSet.getInt(1); |
| } catch (final SQLException ex) { |
| // TODO log failure directly to output log, consider to be configurable in the future |
| log.error("Fetch EventCount from DB error:", ex); |
| } |
| return result; |
| } |
| |
| private PreparedStatement createDataPreparedStatement(final Connection conn, final String tableName, final Collection<String> tableFields, final Condition condition) throws SQLException { |
| String sql = buildDataSql(tableName, tableFields, condition); |
| PreparedStatement preparedStatement = conn.prepareStatement(sql); |
| setBindValue(preparedStatement, tableFields, condition); |
| return preparedStatement; |
| } |
| |
| private PreparedStatement createCountPreparedStatement(final Connection conn, final String tableName, final Collection<String> tableFields, final Condition condition) throws SQLException { |
| String sql = buildCountSql(tableName, tableFields, condition); |
| PreparedStatement preparedStatement = conn.prepareStatement(sql); |
| setBindValue(preparedStatement, tableFields, condition); |
| return preparedStatement; |
| } |
| |
| private String buildDataSql(final String tableName, final Collection<String> tableFields, final Condition condition) { |
| StringBuilder sqlBuilder = new StringBuilder(); |
| String selectSql = buildSelect(tableName, tableFields); |
| String whereSql = buildWhere(tableName, tableFields, condition); |
| String orderSql = buildOrder(tableFields, condition.getSort(), condition.getOrder()); |
| String limitSql = buildLimit(condition.getPage(), condition.getPerPage()); |
| sqlBuilder.append(selectSql).append(whereSql).append(orderSql).append(limitSql); |
| return sqlBuilder.toString(); |
| } |
| |
| private String buildCountSql(final String tableName, final Collection<String> tableFields, final Condition condition) { |
| StringBuilder sqlBuilder = new StringBuilder(); |
| String selectSql = buildSelectCount(tableName); |
| String whereSql = buildWhere(tableName, tableFields, condition); |
| sqlBuilder.append(selectSql).append(whereSql); |
| return sqlBuilder.toString(); |
| } |
| |
| private String buildSelectCount(final String tableName) { |
| return String.format("SELECT COUNT(1) FROM %s", tableName); |
| } |
| |
| private String buildSelect(final String tableName, final Collection<String> tableFields) { |
| StringBuilder sqlBuilder = new StringBuilder(); |
| sqlBuilder.append("SELECT "); |
| for (String each : tableFields) { |
| sqlBuilder.append(each).append(","); |
| } |
| sqlBuilder.deleteCharAt(sqlBuilder.length() - 1); |
| sqlBuilder.append(" FROM ").append(tableName); |
| return sqlBuilder.toString(); |
| } |
| |
| private String buildWhere(final String tableName, final Collection<String> tableFields, final Condition condition) { |
| StringBuilder sqlBuilder = new StringBuilder(); |
| sqlBuilder.append(" WHERE 1=1"); |
| if (null != condition.getFields() && !condition.getFields().isEmpty()) { |
| for (Map.Entry<String, Object> entry : condition.getFields().entrySet()) { |
| String lowerUnderscore = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, entry.getKey()); |
| if (null != entry.getValue() && tableFields.contains(lowerUnderscore)) { |
| sqlBuilder.append(" AND ").append(lowerUnderscore).append("=?"); |
| } |
| } |
| } |
| if (null != condition.getStartTime()) { |
| sqlBuilder.append(" AND ").append(getTableTimeField(tableName)).append(">=?"); |
| } |
| if (null != condition.getEndTime()) { |
| sqlBuilder.append(" AND ").append(getTableTimeField(tableName)).append("<=?"); |
| } |
| return sqlBuilder.toString(); |
| } |
| |
| private void setBindValue(final PreparedStatement preparedStatement, final Collection<String> tableFields, final Condition condition) throws SQLException { |
| int index = 1; |
| if (null != condition.getFields() && !condition.getFields().isEmpty()) { |
| for (Map.Entry<String, Object> entry : condition.getFields().entrySet()) { |
| String lowerUnderscore = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, entry.getKey()); |
| if (null != entry.getValue() && tableFields.contains(lowerUnderscore)) { |
| preparedStatement.setString(index++, String.valueOf(entry.getValue())); |
| } |
| } |
| } |
| if (null != condition.getStartTime()) { |
| preparedStatement.setTimestamp(index++, new Timestamp(condition.getStartTime().getTime())); |
| } |
| if (null != condition.getEndTime()) { |
| preparedStatement.setTimestamp(index, new Timestamp(condition.getEndTime().getTime())); |
| } |
| } |
| |
| private String getTableTimeField(final String tableName) { |
| String result = ""; |
| if (TABLE_JOB_EXECUTION_LOG.equals(tableName)) { |
| result = "start_time"; |
| } else if (TABLE_JOB_STATUS_TRACE_LOG.equals(tableName)) { |
| result = "creation_time"; |
| } |
| return result; |
| } |
| |
| private String buildOrder(final Collection<String> tableFields, final String sortName, final String sortOrder) { |
| if (Strings.isNullOrEmpty(sortName)) { |
| return ""; |
| } |
| String lowerUnderscore = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, sortName); |
| if (!tableFields.contains(lowerUnderscore)) { |
| return ""; |
| } |
| StringBuilder sqlBuilder = new StringBuilder(); |
| sqlBuilder.append(" ORDER BY ").append(lowerUnderscore); |
| switch (sortOrder.toUpperCase()) { |
| case "ASC": |
| sqlBuilder.append(" ASC"); |
| break; |
| case "DESC": |
| sqlBuilder.append(" DESC"); |
| break; |
| default : |
| sqlBuilder.append(" ASC"); |
| } |
| return sqlBuilder.toString(); |
| } |
| |
| private String buildLimit(final int page, final int perPage) { |
| StringBuilder sqlBuilder = new StringBuilder(); |
| if (page > 0 && perPage > 0) { |
| sqlBuilder.append(" LIMIT ").append((page - 1) * perPage).append(",").append(perPage); |
| } else { |
| sqlBuilder.append(" LIMIT ").append(Condition.DEFAULT_PAGE_SIZE); |
| } |
| return sqlBuilder.toString(); |
| } |
| |
| /** |
| * Query condition. |
| */ |
| @RequiredArgsConstructor |
| @Getter |
| public static class Condition { |
| |
| private static final int DEFAULT_PAGE_SIZE = 10; |
| |
| private final int perPage; |
| |
| private final int page; |
| |
| private final String sort; |
| |
| private final String order; |
| |
| private final Date startTime; |
| |
| private final Date endTime; |
| |
| private final Map<String, Object> fields; |
| } |
| |
| @RequiredArgsConstructor |
| @Getter |
| public static class Result<T> { |
| |
| private final Integer total; |
| |
| private final List<T> rows; |
| } |
| } |