blob: 7ece9b6456fe8a56cd984b8efbd635c18aae4870 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.databases.postgres.catalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeMapper;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/** Catalog for PostgreSQL. */
@Internal
public class PostgresCatalog extends AbstractJdbcCatalog {
private static final Logger LOG = LoggerFactory.getLogger(PostgresCatalog.class);
public static final String DEFAULT_DATABASE = "postgres";
// ------ Postgres default objects that shouldn't be exposed to users ------
private static final Set<String> builtinDatabases =
new HashSet<String>() {
{
add("template0");
add("template1");
}
};
private static final Set<String> builtinSchemas =
new HashSet<String>() {
{
add("pg_toast");
add("pg_temp_1");
add("pg_toast_temp_1");
add("pg_catalog");
add("information_schema");
}
};
protected final JdbcDialectTypeMapper dialectTypeMapper;
public PostgresCatalog(
ClassLoader userClassLoader,
String catalogName,
String defaultDatabase,
String username,
String pwd,
String baseUrl) {
this(
userClassLoader,
catalogName,
defaultDatabase,
username,
pwd,
baseUrl,
new PostgresTypeMapper());
}
protected PostgresCatalog(
ClassLoader userClassLoader,
String catalogName,
String defaultDatabase,
String username,
String pwd,
String baseUrl,
JdbcDialectTypeMapper dialectTypeMapper) {
super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
this.dialectTypeMapper = dialectTypeMapper;
}
// ------ databases ------
@Override
public List<String> listDatabases() throws CatalogException {
return extractColumnValuesBySQL(
defaultUrl,
"SELECT datname FROM pg_database;",
1,
dbName -> !builtinDatabases.contains(dbName));
}
// ------ schemas ------
protected Set<String> getBuiltinSchemas() {
return builtinSchemas;
}
// ------ tables ------
protected List<String> getPureTables(Connection conn, List<String> schemas)
throws SQLException {
List<String> tables = Lists.newArrayList();
// position 1 is database name, position 2 is schema name, position 3 is table name
try (PreparedStatement ps =
conn.prepareStatement(
"SELECT * FROM information_schema.tables "
+ "WHERE table_type = 'BASE TABLE' "
+ "AND table_schema = ? "
+ "ORDER BY table_type, table_name;")) {
for (String schema : schemas) {
// Column index 1 is database name, 2 is schema name, 3 is table name
extractColumnValuesByStatement(ps, 3, null, schema).stream()
.map(pureTable -> schema + "." + pureTable)
.forEach(tables::add);
}
return tables;
}
}
@Override
public List<String> listTables(String databaseName)
throws DatabaseNotExistException, CatalogException {
Preconditions.checkState(
StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
if (!databaseExists(databaseName)) {
throw new DatabaseNotExistException(getName(), databaseName);
}
final String url = baseUrl + databaseName;
try (Connection conn = DriverManager.getConnection(url, username, pwd)) {
// get all schemas
List<String> schemas;
try (PreparedStatement ps =
conn.prepareStatement("SELECT schema_name FROM information_schema.schemata;")) {
schemas =
extractColumnValuesByStatement(
ps, 1, pgSchema -> !getBuiltinSchemas().contains(pgSchema));
}
// get all tables
return getPureTables(conn, schemas);
} catch (Exception e) {
throw new CatalogException(
String.format("Failed to list tables for database %s", databaseName), e);
}
}
/**
* Converts Postgres type to Flink {@link DataType}.
*
* @see org.postgresql.jdbc.TypeInfoCache
*/
@Override
protected DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
throws SQLException {
return dialectTypeMapper.mapping(tablePath, metadata, colIndex);
}
@Override
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
List<String> tables = null;
try {
tables = listTables(tablePath.getDatabaseName());
} catch (DatabaseNotExistException e) {
return false;
}
return tables.contains(getSchemaTableName(tablePath));
}
@Override
protected String getTableName(ObjectPath tablePath) {
return PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgTableName();
}
@Override
protected String getSchemaName(ObjectPath tablePath) {
return PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgSchemaName();
}
@Override
protected String getSchemaTableName(ObjectPath tablePath) {
return PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getFullPath();
}
}