blob: 78a7f62e9f0ce6f2e2887f8533eff716426ac820 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.mysql.jdbc;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils;
import org.apache.commons.lang3.StringUtils;
import lombok.NonNull;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
public class MysqlJdbcDataSourceChannel implements DataSourceChannel {
@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return MysqlDataSourceConfig.OPTION_RULE;
}
@Override
public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) {
return MysqlDataSourceConfig.METADATA_RULE;
}
@Override
public List<String> getTables(
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> options) {
List<String> tableNames = new ArrayList<>();
String filterName = options.get("filterName");
String size = options.get("size");
boolean isSize = StringUtils.isNotEmpty(size);
if (StringUtils.isNotEmpty(filterName) && !filterName.contains("%")) {
filterName = "%" + filterName + "%";
} else if (StringUtils.equals(filterName, "")) {
filterName = null;
}
try (Connection connection = getConnection(requestParams);
ResultSet resultSet =
connection
.getMetaData()
.getTables(database, null, filterName, new String[] {"TABLE"})) {
while (resultSet.next()) {
String tableName = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) {
tableNames.add(tableName);
if (isSize && tableNames.size() >= Integer.parseInt(size)) {
break;
}
}
}
return tableNames;
} catch (ClassNotFoundException | SQLException e) {
throw new DataSourcePluginException("get table names failed", e);
}
}
@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
List<String> dbNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams);
PreparedStatement statement = connection.prepareStatement("SHOW DATABASES;");
ResultSet re = statement.executeQuery()) {
// filter system databases
while (re.next()) {
String dbName = re.getString("database");
if (StringUtils.isNotBlank(dbName)
&& !MysqlDataSourceConfig.MYSQL_SYSTEM_DATABASES.contains(dbName)) {
dbNames.add(dbName);
}
}
return dbNames;
} catch (SQLException | ClassNotFoundException e) {
throw new DataSourcePluginException("Get databases failed", e);
}
}
@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
try (Connection ignored = getConnection(requestParams)) {
return true;
} catch (Exception e) {
throw new DataSourcePluginException("check jdbc connectivity failed", e);
}
}
@Override
public List<TableField> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull String table) {
List<TableField> tableFields = new ArrayList<>();
try (Connection connection = getConnection(requestParams, database)) {
DatabaseMetaData metaData = connection.getMetaData();
String primaryKey = getPrimaryKey(metaData, database, table);
try (ResultSet resultSet = metaData.getColumns(database, null, table, null)) {
while (resultSet.next()) {
TableField tableField = new TableField();
String columnName = resultSet.getString("COLUMN_NAME");
tableField.setPrimaryKey(false);
if (StringUtils.isNotBlank(primaryKey) && primaryKey.equals(columnName)) {
tableField.setPrimaryKey(true);
}
tableField.setName(columnName);
tableField.setType(resultSet.getString("TYPE_NAME"));
tableField.setComment(resultSet.getString("REMARKS"));
Object nullable = resultSet.getObject("IS_NULLABLE");
tableField.setNullable(Boolean.TRUE.toString().equals(nullable.toString()));
tableFields.add(tableField);
}
}
} catch (ClassNotFoundException | SQLException e) {
throw new DataSourcePluginException("get table fields failed", e);
}
return tableFields;
}
@Override
public Map<String, List<TableField>> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull List<String> tables) {
return tables.parallelStream()
.collect(
Collectors.toMap(
Function.identity(),
table ->
getTableFields(
pluginName, requestParams, database, table)));
}
private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String tableName)
throws SQLException {
ResultSet primaryKeysInfo = metaData.getPrimaryKeys(dbName, "%", tableName);
while (primaryKeysInfo.next()) {
return primaryKeysInfo.getString("COLUMN_NAME");
}
return null;
}
private Connection getConnection(Map<String, String> requestParams)
throws SQLException, ClassNotFoundException {
return getConnection(requestParams, null);
}
private Connection getConnection(Map<String, String> requestParams, String databaseName)
throws SQLException, ClassNotFoundException {
checkNotNull(requestParams.get(MysqlOptionRule.DRIVER.key()));
checkNotNull(requestParams.get(MysqlOptionRule.URL.key()), "Jdbc url cannot be null");
String url =
JdbcUtils.replaceDatabase(
requestParams.get(MysqlOptionRule.URL.key()), databaseName);
if (requestParams.containsKey(MysqlOptionRule.USER.key())) {
String username = requestParams.get(MysqlOptionRule.USER.key());
String password = requestParams.get(MysqlOptionRule.PASSWORD.key());
return DriverManager.getConnection(url, username, password);
}
return DriverManager.getConnection(url);
}
}