| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.connector.jdbc.catalog; |
| |
| import org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory; |
| import org.apache.flink.table.api.Schema; |
| import org.apache.flink.table.api.ValidationException; |
| import org.apache.flink.table.catalog.AbstractCatalog; |
| import org.apache.flink.table.catalog.CatalogBaseTable; |
| import org.apache.flink.table.catalog.CatalogDatabase; |
| import org.apache.flink.table.catalog.CatalogDatabaseImpl; |
| import org.apache.flink.table.catalog.CatalogFunction; |
| import org.apache.flink.table.catalog.CatalogPartition; |
| import org.apache.flink.table.catalog.CatalogPartitionSpec; |
| import org.apache.flink.table.catalog.CatalogTable; |
| import org.apache.flink.table.catalog.ObjectPath; |
| import org.apache.flink.table.catalog.UniqueConstraint; |
| import org.apache.flink.table.catalog.exceptions.CatalogException; |
| import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; |
| import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; |
| import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; |
| import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; |
| import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; |
| import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; |
| import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; |
| import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; |
| import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; |
| import org.apache.flink.table.catalog.exceptions.TableNotExistException; |
| import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; |
| import org.apache.flink.table.catalog.exceptions.TablePartitionedException; |
| import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; |
| import org.apache.flink.table.catalog.stats.CatalogTableStatistics; |
| import org.apache.flink.table.expressions.Expression; |
| import org.apache.flink.table.factories.Factory; |
| import org.apache.flink.table.types.DataType; |
| import org.apache.flink.util.Preconditions; |
| import org.apache.flink.util.StringUtils; |
| import org.apache.flink.util.TemporaryClassLoaderContext; |
| |
| import org.apache.commons.compress.utils.Lists; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.sql.Connection; |
| import java.sql.DatabaseMetaData; |
| import java.sql.DriverManager; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.ResultSetMetaData; |
| import java.sql.SQLException; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.function.Predicate; |
| |
| import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD; |
| import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME; |
| import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL; |
| import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME; |
| import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER; |
| import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; |
| import static org.apache.flink.util.Preconditions.checkArgument; |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| |
| /** Abstract catalog for any JDBC catalogs. */ |
| public abstract class AbstractJdbcCatalog extends AbstractCatalog { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class); |
| |
| protected final ClassLoader userClassLoader; |
| protected final String username; |
| protected final String pwd; |
| protected final String baseUrl; |
| protected final String defaultUrl; |
| |
| public AbstractJdbcCatalog( |
| ClassLoader userClassLoader, |
| String catalogName, |
| String defaultDatabase, |
| String username, |
| String pwd, |
| String baseUrl) { |
| super(catalogName, defaultDatabase); |
| |
| checkNotNull(userClassLoader); |
| checkArgument(!StringUtils.isNullOrWhitespaceOnly(username)); |
| checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd)); |
| checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl)); |
| |
| JdbcCatalogUtils.validateJdbcUrl(baseUrl); |
| |
| this.userClassLoader = userClassLoader; |
| this.username = username; |
| this.pwd = pwd; |
| this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/"; |
| this.defaultUrl = this.baseUrl + defaultDatabase; |
| } |
| |
| @Override |
| public void open() throws CatalogException { |
| // load the Driver use userClassLoader explicitly, see FLINK-15635 for more detail |
| try (TemporaryClassLoaderContext ignored = |
| TemporaryClassLoaderContext.of(userClassLoader)) { |
| // test connection, fail early if we cannot connect to database |
| try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { |
| } catch (SQLException e) { |
| throw new ValidationException( |
| String.format("Failed connecting to %s via JDBC.", defaultUrl), e); |
| } |
| LOG.info("Catalog {} established connection to {}", getName(), defaultUrl); |
| } |
| } |
| |
| @Override |
| public void close() throws CatalogException { |
| LOG.info("Catalog {} closing", getName()); |
| } |
| |
| // ----- getters ------ |
| |
| public String getUsername() { |
| return username; |
| } |
| |
| public String getPassword() { |
| return pwd; |
| } |
| |
| public String getBaseUrl() { |
| return baseUrl; |
| } |
| |
| // ------ retrieve PK constraint ------ |
| |
| protected Optional<UniqueConstraint> getPrimaryKey( |
| DatabaseMetaData metaData, String database, String schema, String table) |
| throws SQLException { |
| |
| // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys, |
| // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ. |
| // We need to sort them based on the KEY_SEQ value. |
| // In the currently supported database dialects MySQL and Postgres, |
| // the database term is equivalent to catalog term. |
| // We need to pass the database name as catalog parameter for retrieving primary keys by |
| // full table identifier. |
| ResultSet rs = metaData.getPrimaryKeys(database, schema, table); |
| |
| Map<Integer, String> keySeqColumnName = new HashMap<>(); |
| String pkName = null; |
| while (rs.next()) { |
| String columnName = rs.getString("COLUMN_NAME"); |
| pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same |
| int keySeq = rs.getInt("KEY_SEQ"); |
| Preconditions.checkState( |
| !keySeqColumnName.containsKey(keySeq - 1), |
| "The field(s) of primary key must be from the same table."); |
| keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index |
| } |
| List<String> pkFields = |
| Arrays.asList(new String[keySeqColumnName.size()]); // initialize size |
| keySeqColumnName.forEach(pkFields::set); |
| if (!pkFields.isEmpty()) { |
| // PK_NAME maybe null according to the javadoc, generate an unique name in that case |
| pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName; |
| return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields)); |
| } |
| return Optional.empty(); |
| } |
| |
| // ------ table factory ------ |
| |
| @Override |
| public Optional<Factory> getFactory() { |
| return Optional.of(new JdbcDynamicTableFactory()); |
| } |
| |
| // ------ databases ------ |
| |
| @Override |
| public boolean databaseExists(String databaseName) throws CatalogException { |
| checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName)); |
| |
| return listDatabases().contains(databaseName); |
| } |
| |
| @Override |
| public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) |
| throws DatabaseAlreadyExistException, CatalogException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) |
| throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) |
| throws DatabaseNotExistException, CatalogException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public CatalogDatabase getDatabase(String databaseName) |
| throws DatabaseNotExistException, CatalogException { |
| |
| Preconditions.checkState( |
| !StringUtils.isNullOrWhitespaceOnly(databaseName), |
| "Database name must not be blank."); |
| if (listDatabases().contains(databaseName)) { |
| return new CatalogDatabaseImpl(Collections.emptyMap(), null); |
| } else { |
| throw new DatabaseNotExistException(getName(), databaseName); |
| } |
| } |
| |
| // ------ tables and views ------ |
| |
| @Override |
| public CatalogBaseTable getTable(ObjectPath tablePath) |
| throws TableNotExistException, CatalogException { |
| |
| if (!tableExists(tablePath)) { |
| throw new TableNotExistException(getName(), tablePath); |
| } |
| |
| String databaseName = tablePath.getDatabaseName(); |
| String dbUrl = baseUrl + databaseName; |
| |
| try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) { |
| DatabaseMetaData metaData = conn.getMetaData(); |
| Optional<UniqueConstraint> primaryKey = |
| getPrimaryKey( |
| metaData, |
| databaseName, |
| getSchemaName(tablePath), |
| getTableName(tablePath)); |
| |
| PreparedStatement ps = |
| conn.prepareStatement( |
| String.format("SELECT * FROM %s;", getSchemaTableName(tablePath))); |
| |
| ResultSetMetaData resultSetMetaData = ps.getMetaData(); |
| |
| String[] columnNames = new String[resultSetMetaData.getColumnCount()]; |
| DataType[] types = new DataType[resultSetMetaData.getColumnCount()]; |
| |
| for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { |
| columnNames[i - 1] = resultSetMetaData.getColumnName(i); |
| types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i); |
| if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) { |
| types[i - 1] = types[i - 1].notNull(); |
| } |
| } |
| |
| Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types); |
| primaryKey.ifPresent( |
| pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns())); |
| Schema tableSchema = schemaBuilder.build(); |
| |
| Map<String, String> props = new HashMap<>(); |
| props.put(CONNECTOR.key(), IDENTIFIER); |
| props.put(URL.key(), dbUrl); |
| props.put(USERNAME.key(), username); |
| props.put(PASSWORD.key(), pwd); |
| props.put(TABLE_NAME.key(), getSchemaTableName(tablePath)); |
| return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props); |
| } catch (Exception e) { |
| throw new CatalogException( |
| String.format("Failed getting table %s", tablePath.getFullName()), e); |
| } |
| } |
| |
| @Override |
| public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) |
| throws TableNotExistException, CatalogException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) |
| throws TableNotExistException, TableAlreadyExistException, CatalogException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) |
| throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void alterTable( |
| ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) |
| throws TableNotExistException, CatalogException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public List<String> listViews(String databaseName) |
| throws DatabaseNotExistException, CatalogException { |
| return Collections.emptyList(); |
| } |
| |
| // ------ partitions ------ |
| |
| @Override |
| public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) |
| throws TableNotExistException, TableNotPartitionedException, CatalogException { |
| return Collections.emptyList(); |
| } |
| |
| @Override |
| public List<CatalogPartitionSpec> listPartitions( |
| ObjectPath tablePath, CatalogPartitionSpec partitionSpec) |
| throws TableNotExistException, TableNotPartitionedException, |
| PartitionSpecInvalidException, CatalogException { |
| return Collections.emptyList(); |
| } |
| |
| @Override |
| public List<CatalogPartitionSpec> listPartitionsByFilter( |
| ObjectPath tablePath, List<Expression> filters) |
| throws TableNotExistException, TableNotPartitionedException, CatalogException { |
| return Collections.emptyList(); |
| } |
| |
| @Override |
| public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) |
| throws PartitionNotExistException, CatalogException { |
| throw new PartitionNotExistException(getName(), tablePath, partitionSpec); |
| } |
| |
| @Override |
| public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) |
| throws CatalogException { |
| return false; |
| } |
| |
| @Override |
| public void createPartition( |
| ObjectPath tablePath, |
| CatalogPartitionSpec partitionSpec, |
| CatalogPartition partition, |
| boolean ignoreIfExists) |
| throws TableNotExistException, TableNotPartitionedException, |
| PartitionSpecInvalidException, PartitionAlreadyExistsException, |
| CatalogException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void dropPartition( |
| ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) |
| throws PartitionNotExistException, CatalogException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void alterPartition( |
| ObjectPath tablePath, |
| CatalogPartitionSpec partitionSpec, |
| CatalogPartition newPartition, |
| boolean ignoreIfNotExists) |
| throws PartitionNotExistException, CatalogException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| // ------ functions ------ |
| |
| @Override |
| public List<String> listFunctions(String dbName) |
| throws DatabaseNotExistException, CatalogException { |
| return Collections.emptyList(); |
| } |
| |
| @Override |
| public CatalogFunction getFunction(ObjectPath functionPath) |
| throws FunctionNotExistException, CatalogException { |
| throw new FunctionNotExistException(getName(), functionPath); |
| } |
| |
| @Override |
| public boolean functionExists(ObjectPath functionPath) throws CatalogException { |
| return false; |
| } |
| |
| @Override |
| public void createFunction( |
| ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) |
| throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void alterFunction( |
| ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) |
| throws FunctionNotExistException, CatalogException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) |
| throws FunctionNotExistException, CatalogException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| // ------ stats ------ |
| |
| @Override |
| public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) |
| throws TableNotExistException, CatalogException { |
| return CatalogTableStatistics.UNKNOWN; |
| } |
| |
| @Override |
| public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) |
| throws TableNotExistException, CatalogException { |
| return CatalogColumnStatistics.UNKNOWN; |
| } |
| |
| @Override |
| public CatalogTableStatistics getPartitionStatistics( |
| ObjectPath tablePath, CatalogPartitionSpec partitionSpec) |
| throws PartitionNotExistException, CatalogException { |
| return CatalogTableStatistics.UNKNOWN; |
| } |
| |
| @Override |
| public CatalogColumnStatistics getPartitionColumnStatistics( |
| ObjectPath tablePath, CatalogPartitionSpec partitionSpec) |
| throws PartitionNotExistException, CatalogException { |
| return CatalogColumnStatistics.UNKNOWN; |
| } |
| |
| @Override |
| public void alterTableStatistics( |
| ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) |
| throws TableNotExistException, CatalogException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void alterTableColumnStatistics( |
| ObjectPath tablePath, |
| CatalogColumnStatistics columnStatistics, |
| boolean ignoreIfNotExists) |
| throws TableNotExistException, CatalogException, TablePartitionedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void alterPartitionStatistics( |
| ObjectPath tablePath, |
| CatalogPartitionSpec partitionSpec, |
| CatalogTableStatistics partitionStatistics, |
| boolean ignoreIfNotExists) |
| throws PartitionNotExistException, CatalogException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void alterPartitionColumnStatistics( |
| ObjectPath tablePath, |
| CatalogPartitionSpec partitionSpec, |
| CatalogColumnStatistics columnStatistics, |
| boolean ignoreIfNotExists) |
| throws PartitionNotExistException, CatalogException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| protected List<String> extractColumnValuesBySQL( |
| String connUrl, |
| String sql, |
| int columnIndex, |
| Predicate<String> filterFunc, |
| Object... params) { |
| |
| try (Connection conn = DriverManager.getConnection(connUrl, username, pwd); |
| PreparedStatement ps = conn.prepareStatement(sql)) { |
| return extractColumnValuesByStatement(ps, columnIndex, filterFunc, params); |
| |
| } catch (Exception e) { |
| throw new CatalogException( |
| String.format( |
| "The following SQL query could not be executed (%s): %s", connUrl, sql), |
| e); |
| } |
| } |
| |
| protected static List<String> extractColumnValuesByStatement( |
| PreparedStatement ps, int columnIndex, Predicate<String> filterFunc, Object... params) |
| throws SQLException { |
| List<String> columnValues = Lists.newArrayList(); |
| if (Objects.nonNull(params) && params.length > 0) { |
| for (int i = 0; i < params.length; i++) { |
| ps.setObject(i + 1, params[i]); |
| } |
| } |
| try (ResultSet rs = ps.executeQuery()) { |
| while (rs.next()) { |
| String columnValue = rs.getString(columnIndex); |
| if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) { |
| columnValues.add(columnValue); |
| } |
| } |
| } |
| return columnValues; |
| } |
| |
| protected DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex) |
| throws SQLException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| protected String getTableName(ObjectPath tablePath) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| protected String getSchemaName(ObjectPath tablePath) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| protected String getSchemaTableName(ObjectPath tablePath) { |
| throw new UnsupportedOperationException(); |
| } |
| } |