blob: d9f3fb72649ce6a12410aa7cb494d99958202372 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.catalog;
import org.apache.flink.table.api.CatalogAlreadyExistException;
import org.apache.flink.table.api.CatalogNotExistException;
import org.apache.flink.util.StringUtils;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.schema.SchemaPlus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* CatalogManager manages all the registered ReadableCatalog instances in a table environment.
* It also has a concept of default catalog, which will be selected when a catalog name isn’t given
* in a meta-object reference.
*
* <p>CatalogManger also encapsulates Calcite’s schema framework such that no code outside CatalogManager
* needs to interact with Calcite’s schema except the parser which needs all catalogs. (All catalogs will
* be added to Calcite schema so that all external tables and tables can be resolved by Calcite during
* query parsing and analysis.)
*/
public class CatalogManager {
private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class);
public static final String BUILTIN_CATALOG_NAME = "builtin";
// The catalog to hold all registered and translated tables
// We disable caching here to prevent side effects
private CalciteSchema internalSchema = CalciteSchema.createRootSchema(true, false);
private SchemaPlus rootSchema = internalSchema.plus();
// A list of named catalogs.
private Map<String, ReadableCatalog> catalogs;
// The name of the default catalog and schema
private String defaultCatalogName;
private String defaultDbName;
public CatalogManager() {
LOG.info("Initializing CatalogManager");
catalogs = new HashMap<>();
FlinkInMemoryCatalog inMemoryCatalog = new FlinkInMemoryCatalog(BUILTIN_CATALOG_NAME);
catalogs.put(BUILTIN_CATALOG_NAME, inMemoryCatalog);
defaultCatalogName = BUILTIN_CATALOG_NAME;
defaultDbName = inMemoryCatalog.getDefaultDatabaseName();
CatalogCalciteSchema.registerCatalog(rootSchema, BUILTIN_CATALOG_NAME, inMemoryCatalog);
}
public void registerCatalog(String catalogName, ReadableCatalog catalog) throws CatalogAlreadyExistException {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
checkNotNull(catalog, "catalog cannot be null");
if (catalogs.containsKey(catalogName)) {
throw new CatalogAlreadyExistException(catalogName);
}
catalogs.put(catalogName, catalog);
catalog.open();
CatalogCalciteSchema.registerCatalog(rootSchema, catalogName, catalog);
}
public ReadableCatalog getCatalog(String catalogName) throws CatalogNotExistException {
if (!catalogs.keySet().contains(catalogName)) {
throw new CatalogNotExistException(catalogName);
}
return catalogs.get(catalogName);
}
public Set<String> getCatalogs() {
return catalogs.keySet();
}
public void setDefaultCatalog(String catalogName) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
checkArgument(catalogs.keySet().contains(catalogName),
String.format("Cannot find registered catalog %s", catalogName));
if (!defaultCatalogName.equals(catalogName)) {
defaultCatalogName = catalogName;
defaultDbName = catalogs.get(catalogName).getDefaultDatabaseName();
LOG.info("Set default catalog as '{}' and default database as '{}'", defaultCatalogName, defaultDbName);
}
}
public ReadableCatalog getDefaultCatalog() {
return catalogs.get(defaultCatalogName);
}
public String getDefaultCatalogName() {
return defaultCatalogName;
}
public void setDefaultDatabase(String catalogName, String dbName) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
checkArgument(!StringUtils.isNullOrWhitespaceOnly(dbName), "dbName cannot be null or empty");
checkArgument(catalogs.containsKey(catalogName),
String.format("Cannot find registered catalog %s", catalogName));
checkArgument(catalogs.get(catalogName).listDatabases().contains(dbName),
String.format("Cannot find registered database %s", dbName));
defaultCatalogName = catalogName;
defaultDbName = dbName;
LOG.info("Set default catalog as '{}' and default database as '{}'", defaultCatalogName, defaultDbName);
}
public String getDefaultDatabaseName() {
return defaultDbName;
}
public SchemaPlus getRootSchema() {
return rootSchema;
}
/**
* Returns the full name of the given table name.
*
* @param paths Table paths whose format can be among "catalog.db.table", "db.table", or "table"
* @return An array of complete table path
*/
public String[] resolveTableName(String... paths) {
return resolveTableName(Arrays.asList(paths));
}
/**
* Returns the full name of the given table name.
*
* @param paths Table paths whose format can be among "catalog.db.table", "db.table", or "table"
* @return An array of complete table path
*/
public String[] resolveTableName(List<String> paths) {
checkNotNull(paths, "paths cannot be null");
checkArgument(paths.size() >= 1 && paths.size() <= 3, "paths length has to be between 1 and 3");
checkArgument(!paths.stream().anyMatch(p -> StringUtils.isNullOrWhitespaceOnly(p)),
"Paths contains null or while-space-only string");
if (paths.size() == 3) {
return new String[] {paths.get(0), paths.get(1), paths.get(2)};
}
String catalogName;
String dbName;
String tableName;
if (paths.size() == 1) {
catalogName = getDefaultCatalogName();
dbName = getDefaultDatabaseName();
tableName = paths.get(0);
} else {
catalogName = getDefaultCatalogName();
dbName = paths.get(0);
tableName = paths.get(1);
}
return new String[]{catalogName, dbName, tableName};
}
/**
* Returns the full name of the given table name.
*
* @param paths Table paths whose format can be among "catalog.db.table", "db.table", or "table"
* @return A string of complete table path
*/
public String resolveTableNameAsString(String[] paths) {
return String.join(".", resolveTableName(paths));
}
public List<List<String>> getCalciteReaderDefaultPaths(SchemaPlus defaultSchema) {
List<List<String>> paths = new ArrayList<>();
// Add both catalog and catalog.db, if there's a default db, as default schema paths
paths.add(new ArrayList<>(CalciteSchema.from(defaultSchema).path(getDefaultCatalogName())));
if (getDefaultDatabaseName() != null && defaultSchema.getSubSchema(getDefaultCatalogName()) != null) {
paths.add(new ArrayList<>(
CalciteSchema.from(defaultSchema.getSubSchema(getDefaultCatalogName())).path(getDefaultDatabaseName())));
}
return paths;
}
}