blob: ab581536a19ceac73f64f11124c1d2d304620468 [file] [log] [blame]
* Copyright 2015 Confluent Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.rocketmq.connect.jdbc.common;
import org.apache.rocketmq.connect.jdbc.config.Config;
import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Date;
* Utilties for interacting with a JDBC database.
public class DBUtils {
private static final Logger log = LoggerFactory.getLogger(JdbcSourceTask.class);
* The default table types to include when listing tables if none are specified. Valid values
* are those specified by the @{java.sql.DatabaseMetaData#getTables} method's TABLE_TYPE column.
* The default only includes standard, user-defined tables.
public static final Set<String> DEFAULT_TABLE_TYPES = Collections.unmodifiableSet(
new HashSet<String>(Arrays.asList("TABLE"))
private static final int GET_TABLES_TYPE_COLUMN = 4;
private static final int GET_TABLES_NAME_COLUMN = 3;
private static final int GET_COLUMNS_COLUMN_NAME = 4;
private static final int GET_COLUMNS_IS_NULLABLE = 18;
private static final int GET_COLUMNS_IS_AUTOINCREMENT = 23;
private static ThreadLocal<SimpleDateFormat> DATE_FORMATTER = new ThreadLocal<SimpleDateFormat>() {
protected SimpleDateFormat initialValue() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
return sdf;
* Get a list of tables in the database. This uses the default filters, which only include
* user-defined tables.
* @param conn database connection
* @return a list of tables
* @throws SQLException
public static List<String> getTables(Connection conn) throws SQLException {
return getTables(conn, DEFAULT_TABLE_TYPES);
* Get a list of table names in the database.
* @param conn database connection
* @param types a set of table types that should be included in the results
* @throws SQLException
public static List<String> getTables(Connection conn, Set<String> types) throws SQLException {
DatabaseMetaData metadata = conn.getMetaData();
ResultSet rs = metadata.getTables(null, null, "%", null);
List<String> tableNames = new ArrayList<String>();
while ( {
if (types.contains(rs.getString(GET_TABLES_TYPE_COLUMN))) {
String colName = rs.getString(GET_TABLES_NAME_COLUMN);
// SQLite JDBC driver does not correctly mark these as system tables
if (metadata.getDatabaseProductName().equals("SQLite") && colName.startsWith("sqlite_")) {
return tableNames;
* Look up the autoincrement column for the specified table.
* @param conn database connection
* @param table the table to
* @return the name of the column that is an autoincrement column, or null if there is no
* autoincrement column or more than one exists
* @throws SQLException
public static String getAutoincrementColumn(Connection conn, String table) throws SQLException {
String result = null;
int matches = 0;
ResultSet rs = conn.getMetaData().getColumns(null, null, table, "%");
// Some database drivers (SQLite) don't include all the columns
if (rs.getMetaData().getColumnCount() >= GET_COLUMNS_IS_AUTOINCREMENT) {
while ( {
if (rs.getString(GET_COLUMNS_IS_AUTOINCREMENT).equals("YES")) {
result = rs.getString(GET_COLUMNS_COLUMN_NAME);
return (matches == 1 ? result : null);
// Fallback approach is to query for a single row. This unfortunately does not work with any
// empty table
log.trace("Falling back to SELECT detection of auto-increment column for {}:{}", conn, table);
Statement stmt = conn.createStatement();
try {
String quoteString = getIdentifierQuoteString(conn);
rs = stmt.executeQuery("SELECT * FROM " + quoteString + table + quoteString + " LIMIT 1");
ResultSetMetaData rsmd = rs.getMetaData();
for (int i = 1; i < rsmd.getColumnCount(); i++) {
if (rsmd.isAutoIncrement(i)) {
result = rsmd.getColumnName(i);
} finally {
return (matches == 1 ? result : null);
public static boolean isColumnNullable(Connection conn, String table, String column)
throws SQLException {
ResultSet rs = conn.getMetaData().getColumns(null, null, table, column);
if (rs.getMetaData().getColumnCount() > GET_COLUMNS_IS_NULLABLE) {
// Should only be one match
if (! {
return false;
String val = rs.getString(GET_COLUMNS_IS_NULLABLE);
return rs.getString(GET_COLUMNS_IS_NULLABLE).equals("YES");
return false;
* Format the given Date assuming UTC timezone in a format supported by SQL.
* @param date the date to convert to a String
* @return the formatted string
public static String formatUTC(Date date) {
return DATE_FORMATTER.get().format(date);
* Get the string used for quoting identifiers in this database's SQL dialect.
* @param connection the database connection
* @return the quote string
* @throws SQLException
public static String getIdentifierQuoteString(Connection connection) throws SQLException {
String quoteString = connection.getMetaData().getIdentifierQuoteString();
quoteString = quoteString == null ? "" : quoteString;
return quoteString;
* Quote the given string.
* @param orig the string to quote
* @param quote the quote character
* @return the quoted string
public static String quoteString(String orig, String quote) {
return quote + orig + quote;
public static DataSource initDataSource(Config config) throws Exception {
Map<String, String> map = new HashMap<>();
map.put("driverClassName", "com.mysql.cj.jdbc.Driver");
"jdbc:mysql://" + config.getDbUrl() + ":" + config.getDbPort() + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8&characterEncoding=utf8");
map.put("username", config.getDbUsername());
map.put("password", config.getDbPassword());
map.put("initialSize", "1");
map.put("maxActive", "1");
map.put("maxWait", "60000");
map.put("timeBetweenEvictionRunsMillis", "60000");
map.put("minEvictableIdleTimeMillis", "300000");
map.put("validationQuery", "SELECT 1 FROM DUAL");
map.put("testWhileIdle", "true");"{} config read successful", map);
DataSource dataSource = DruidDataSourceFactory.createDataSource(map);"init data source success");
return dataSource;