blob: 1cd99d362d6734c6457c9f92df119c6dfe10d39c [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.seatunnel.datasource.plugin.cdc.mysql;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.commons.lang3.StringUtils;
import lombok.NonNull;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class MysqlCDCDataSourceChannel implements DataSourceChannel {
public static final Set<String> MYSQL_SYSTEM_DATABASES =
Sets.newHashSet("information_schema", "mysql", "performance_schema", "sys");
public boolean canAbleGetSchema() {
return true;
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return MysqlCDCOptionRule.optionRule();
public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) {
return MysqlCDCOptionRule.metadataRule();
public List<String> getTables(
String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> options) {
return this.getTableNames(requestParams, database, options);
public List<String> getDatabases(String pluginName, Map<String, String> requestParams) {
try {
return this.getDataBaseNames(requestParams);
} catch (SQLException e) {
throw new DataSourcePluginException("get databases failed", e);
public boolean checkDataSourceConnectivity(
String pluginName, Map<String, String> requestParams) {
return this.checkJdbcConnectivity(requestParams);
public List<TableField> getTableFields(
String pluginName, Map<String, String> requestParams, String database, String table) {
return getTableFields(requestParams, database, table);
public Map<String, List<TableField>> getTableFields(
String pluginName,
Map<String, String> requestParams,
String database,
List<String> tables) {
Map<String, List<TableField>> tableFields = new HashMap<>(tables.size());
for (String table : tables) {
tableFields.put(table, getTableFields(requestParams, database, table));
return tableFields;
protected boolean checkJdbcConnectivity(Map<String, String> requestParams) {
try (Connection connection = init(requestParams);
Statement statement = connection.createStatement()) {
try (ResultSet resultSet = statement.executeQuery("SHOW MASTER STATUS"); ) {
if ( {
String binlogFile = resultSet.getString("File");
if (StringUtils.isBlank(binlogFile)) {
throw new DataSourcePluginException("binlog must be enabled");
} else {
throw new DataSourcePluginException("binlog must be enabled");
try (ResultSet resultSet =
statement.executeQuery("SHOW VARIABLES LIKE 'binlog_format'")) {
if ( {
String binlogFormat = resultSet.getString("Value");
if (!"ROW".equalsIgnoreCase(binlogFormat)) {
throw new DataSourcePluginException("binlog_format must be ROW");
} else {
throw new DataSourcePluginException("binlog_format must be ROW");
try (ResultSet resultSet =
statement.executeQuery("SHOW VARIABLES LIKE 'binlog_row_image'")) {
if ( {
String binlogRowImage = resultSet.getString("Value");
if (!"FULL".equalsIgnoreCase(binlogRowImage)) {
throw new DataSourcePluginException("binlog_row_image must be FULL");
} else {
throw new DataSourcePluginException("binlog_row_image must be FULL");
return true;
} catch (Exception e) {
throw new DataSourcePluginException(
"check jdbc connectivity failed, " + e.getMessage(), e);
protected Connection init(Map<String, String> requestParams) throws SQLException {
if (null == requestParams.get(MysqlCDCOptionRule.BASE_URL.key())) {
throw new DataSourcePluginException("Jdbc url is null");
String url = requestParams.get(MysqlCDCOptionRule.BASE_URL.key());
if (null != requestParams.get(MysqlCDCOptionRule.PASSWORD.key())
&& null != requestParams.get(MysqlCDCOptionRule.USERNAME.key())) {
String username = requestParams.get(MysqlCDCOptionRule.USERNAME.key());
String password = requestParams.get(MysqlCDCOptionRule.PASSWORD.key());
return DriverManager.getConnection(url, username, password);
return DriverManager.getConnection(url);
protected List<String> getDataBaseNames(Map<String, String> requestParams) throws SQLException {
List<String> dbNames = new ArrayList<>();
try (Connection connection = init(requestParams);
PreparedStatement statement = connection.prepareStatement("SHOW DATABASES;");
ResultSet re = statement.executeQuery()) {
// filter system databases
while ( {
String dbName = re.getString("database");
if (StringUtils.isNotBlank(dbName) && isNotSystemDatabase(dbName)) {
return dbNames;
protected List<String> getTableNames(
Map<String, String> requestParams, String dbName, Map<String, String> options) {
List<String> tableNames = new ArrayList<>();
String filterName = options.get("filterName");
String size = options.get("size");
boolean isSize = StringUtils.isNotEmpty(size);
if (StringUtils.isNotEmpty(filterName) && !filterName.contains("%")) {
filterName = "%" + filterName + "%";
} else if (StringUtils.equals(filterName, "")) {
filterName = null;
try (Connection connection = init(requestParams);
ResultSet resultSet =
.getTables(dbName, null, filterName, new String[] {"TABLE"})) {
while ( {
String tableName = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) {
if (isSize && tableNames.size() >= Integer.parseInt(size)) {
return tableNames;
} catch (SQLException e) {
throw new DataSourcePluginException("get table names failed", e);
protected List<TableField> getTableFields(
Map<String, String> requestParams, String dbName, String tableName) {
List<TableField> tableFields = new ArrayList<>();
try (Connection connection = init(requestParams); ) {
DatabaseMetaData metaData = connection.getMetaData();
String primaryKey = getPrimaryKey(metaData, dbName, tableName);
ResultSet resultSet = metaData.getColumns(dbName, null, tableName, null);
while ( {
TableField tableField = new TableField();
String columnName = resultSet.getString("COLUMN_NAME");
if (StringUtils.isNotBlank(primaryKey) && primaryKey.equals(columnName)) {
Object nullable = resultSet.getObject("IS_NULLABLE");
boolean isNullable = convertToBoolean(nullable);
} catch (SQLException e) {
throw new DataSourcePluginException("get table fields failed", e);
return tableFields;
private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String tableName)
throws SQLException {
ResultSet primaryKeysInfo = metaData.getPrimaryKeys(dbName, "%", tableName);
while ( {
return primaryKeysInfo.getString("COLUMN_NAME");
return null;
private boolean isNotSystemDatabase(String dbName) {
.noneMatch(systemDatabase -> StringUtils.equalsIgnoreCase(systemDatabase, dbName));
private boolean convertToBoolean(Object value) {
if (value instanceof Boolean) {
return (Boolean) value;
if (value instanceof String) {
return value.equals("TRUE");
return false;