blob: b739185a624b4efe250f5f6353d2a766e83aeb47 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.catalog.hive;
import org.apache.flink.table.api.DatabaseAlreadyExistException;
import org.apache.flink.table.api.DatabaseNotExistException;
import org.apache.flink.table.api.TableAlreadyExistException;
import org.apache.flink.table.api.TableNotExistException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.exceptions.PartitionAlreadyExistException;
import org.apache.flink.table.api.exceptions.PartitionNotExistException;
import org.apache.flink.table.api.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ReadableWritableCatalog;
import org.apache.flink.table.plan.stats.ColumnStats;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.parquet.Strings;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A catalog that connects to Hive MetaStore and reads/writes Hive tables.
*/
public class HiveCatalog implements ReadableWritableCatalog {
private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
private static final String DEFAULT_DB = "default";
private final String catalogName;
private String defaultDatabaseName = DEFAULT_DB;
private HiveConf hiveConf;
private IMetaStoreClient client;
public HiveCatalog(String catalogName, String hiveMetastoreURI) {
this(catalogName, getHiveConf(hiveMetastoreURI));
}
public HiveCatalog(String catalogName, HiveConf hiveConf) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
this.catalogName = catalogName;
this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
LOG.info("Created HiveCatalog '{}'", catalogName);
}
private static HiveConf getHiveConf(String hiveMetastoreURI) {
checkArgument(!Strings.isNullOrEmpty(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty");
HiveConf hiveConf = new HiveConf();
hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI);
return hiveConf;
}
private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
try {
return RetryingMetaStoreClient.getProxy(
hiveConf,
null,
null,
HiveMetaStoreClient.class.getName(),
true);
} catch (MetaException e) {
throw new FlinkHiveException("Failed to create Hive metastore client", e);
}
}
@Override
public String getDefaultDatabaseName() {
return defaultDatabaseName;
}
@Override
public void setDefaultDatabaseName(String databaseName) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
defaultDatabaseName = databaseName;
}
@Override
public void open() {
if (client == null) {
client = getMetastoreClient(hiveConf);
LOG.info("Connect to Hive metastore");
}
}
@Override
public void close() {
if (client != null) {
client.close();
client = null;
LOG.info("Close connection to Hive metastore");
}
}
// ------ tables ------
@Override
public CatalogTable getTable(ObjectPath path) throws TableNotExistException {
Table hiveTable = getHiveTable(path);
TableSchema tableSchema = HiveMetadataUtil.createTableSchema(hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
// Get the column statistics for a set of columns in a table. This only works for non-partitioned tables.
Map<String, ColumnStats> colStats = new HashMap<>();
if (hiveTable.getPartitionKeysSize() == 0) {
colStats = HiveMetadataUtil.createColumnStats(
getHiveTableColumnStats(path.getDbName(), path.getObjectName(), Arrays.asList(tableSchema.getFieldNames())));
}
TableStats tableStats = new TableStats(getRowCount(hiveTable), colStats);
CatalogTable catalogTable = HiveMetadataUtil.createCatalogTable(hiveTable, tableSchema, tableStats);
catalogTable.getProperties().put(HiveConf.ConfVars.METASTOREURIS.varname,
hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
return catalogTable;
}
/**
* Get the column statistics for a set of columns in a table. This only works for non-partitioned tables.
*/
private List<ColumnStatisticsObj> getHiveTableColumnStats(String dbName, String tableName, List<String> cols) {
try {
return client.getTableColumnStatistics(dbName, tableName, cols);
} catch (TException e) {
throw new FlinkHiveException(
String.format("Failed to get table column stats for columns %s of table %s", cols,
new ObjectPath(dbName, tableName)));
}
}
private Table getHiveTable(ObjectPath path) throws TableNotExistException {
try {
return client.getTable(path.getDbName(), path.getObjectName());
} catch (NoSuchObjectException e) {
throw new TableNotExistException(catalogName, path.getFullName());
} catch (TException e) {
throw new FlinkHiveException(
String.format("Failed to get table %s", path.getFullName()), e);
}
}
private Long getRowCount(Table hiveTable) {
if (hiveTable.getParameters().get(StatsSetupConst.ROW_COUNT) != null) {
return Math.max(0L, Long.parseLong(hiveTable.getParameters().get(StatsSetupConst.ROW_COUNT)));
} else {
return 0L;
}
}
@Override
public List<ObjectPath> listTables(String dbName) throws DatabaseNotExistException {
try {
return client.getAllTables(dbName).stream()
.map(t -> new ObjectPath(dbName, t))
.collect(Collectors.toList());
} catch (UnknownDBException e) {
throw new DatabaseNotExistException(catalogName, dbName);
} catch (TException e) {
throw new FlinkHiveException(
String.format("Failed to list tables in database %s", dbName), e);
}
}
@Override
public List<ObjectPath> listAllTables() {
List<String> dbs = listDatabases();
List<ObjectPath> result = new ArrayList<>();
for (String db : dbs) {
result.addAll(listTables(db));
}
return result;
}
@Override
public void createTable(ObjectPath path, CatalogTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException {
try {
if (tableExists(path)) {
if (!ignoreIfExists) {
throw new TableAlreadyExistException(catalogName, path.getFullName());
}
} else {
// Testing shows that createTable() API in Hive 2.3.4 doesn't throw UnknownDBException as it claims
// Thus we have to manually check if the db exists or not
if (!dbExists(path.getDbName())) {
throw new DatabaseNotExistException(catalogName, path.getDbName());
}
client.createTable(HiveMetadataUtil.createHiveTable(path, table));
}
} catch (TException e) {
throw new FlinkHiveException(
String.format("Failed to create table %s", path.getFullName()), e);
}
}
@Override
public void dropTable(ObjectPath path, boolean ignoreIfNotExists) throws TableNotExistException {
try {
client.dropTable(path.getDbName(), path.getObjectName(), true, ignoreIfNotExists);
} catch (NoSuchObjectException e) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(catalogName, path.getFullName());
}
} catch (TException e) {
throw new FlinkHiveException(
String.format("Failed to drop table %s", path.getFullName()), e);
}
}
@Override
public void alterTable(ObjectPath path, CatalogTable newTable, boolean ignoreIfNotExists) throws TableNotExistException {
try {
if (tableExists(path)) {
client.alter_table(path.getDbName(), path.getObjectName(), HiveMetadataUtil.createHiveTable(path, newTable));
} else if (!ignoreIfNotExists) {
throw new TableNotExistException(catalogName, path.getFullName());
}
} catch (TException e) {
throw new FlinkHiveException(
String.format("Failed to alter table %s", path.getFullName()), e);
}
}
@Override
public void renameTable(ObjectPath tableName, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException, DatabaseNotExistException {
// Hive metastore client doesn't support renaming yet
throw new UnsupportedOperationException();
}
@Override
public boolean tableExists(ObjectPath path) {
try {
return client.tableExists(path.getDbName(), path.getObjectName());
} catch (UnknownDBException e) {
return false;
} catch (TException e) {
throw new FlinkHiveException(
String.format("Failed to check if table %s exists in database %s", path.getObjectName(), path.getDbName()), e);
}
}
// ------ table and column stats ------
@Override
public TableStats getTableStats(ObjectPath path) throws TableNotExistException {
Table hiveTable = getHiveTable(path);
// Get the column statistics for a set of columns in a table. This only works for non-partitioned tables.
// For partitioned tables, get partition columns stats from HiveCatalog.getPartition()
Map<String, ColumnStats> colStats = new HashMap<>();
if (!isTablePartitioned(hiveTable)) {
colStats = HiveMetadataUtil.createColumnStats(
getHiveTableColumnStats(
path.getDbName(),
path.getObjectName(),
hiveTable.getSd().getCols().stream()
.map(fs -> fs.getName())
.collect(Collectors.toList())
));
}
return new TableStats(getRowCount(hiveTable), colStats);
}
@Override
public void alterTableStats(ObjectPath path, TableStats newtTableStats, boolean ignoreIfNotExists) throws TableNotExistException {
try {
Table hiveTable = getHiveTable(path);
// Set table column stats. This only works for non-partitioned tables.
if (!isTablePartitioned(hiveTable)) {
client.updateTableColumnStatistics(HiveMetadataUtil.createColumnStats(hiveTable, newtTableStats.colStats()));
}
// Set table stats
String oldRowCount = hiveTable.getParameters().getOrDefault(StatsSetupConst.ROW_COUNT, "0");
if (newtTableStats.rowCount() != Long.parseLong(oldRowCount)) {
hiveTable.getParameters().put(StatsSetupConst.ROW_COUNT, String.valueOf(newtTableStats.rowCount()));
client.alter_table(path.getDbName(), path.getObjectName(), hiveTable);
}
} catch (TableNotExistException e) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(catalogName, path.getFullName());
}
} catch (TException e) {
throw new FlinkHiveException(
String.format("Failed to alter table stats of table %s", path.getFullName()), e);
}
}
// ------ databases ------
@Override
public void createDatabase(String dbName, CatalogDatabase db, boolean ignoreIfExists) throws DatabaseAlreadyExistException {
try {
client.createDatabase(HiveMetadataUtil.createHiveDatabase(dbName, db));
} catch (AlreadyExistsException e) {
if (!ignoreIfExists) {
throw new DatabaseAlreadyExistException(catalogName, dbName);
}
} catch (TException e) {
throw new FlinkHiveException(String.format("Failed to create database %s", dbName), e);
}
}
@Override
public void dropDatabase(String dbName, boolean ignoreIfNotExists) throws DatabaseNotExistException {
try {
client.dropDatabase(dbName, true, ignoreIfNotExists);
} catch (NoSuchObjectException e) {
if (!ignoreIfNotExists) {
throw new DatabaseNotExistException(catalogName, dbName);
}
} catch (TException e) {
throw new FlinkHiveException(String.format("Failed to drop database %s", dbName), e);
}
}
@Override
public void alterDatabase(String dbName, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException {
try {
if (dbExists(dbName)) {
client.alterDatabase(dbName, HiveMetadataUtil.createHiveDatabase(dbName, newDatabase));
} else if (!ignoreIfNotExists) {
throw new DatabaseNotExistException(catalogName, dbName);
}
} catch (TException e) {
throw new FlinkHiveException(String.format("Failed to alter database %s", dbName), e);
}
}
@Override
public void renameDatabase(String dbName, String newDbName, boolean ignoreIfNotExists) throws DatabaseNotExistException {
// Hive metastore client doesn't support renaming yet
throw new UnsupportedOperationException();
}
@Override
public List<String> listDatabases() {
try {
return client.getAllDatabases();
} catch (TException e) {
throw new FlinkHiveException(
String.format("Failed to list all databases in HiveCatalog %s", catalogName));
}
}
@Override
public CatalogDatabase getDatabase(String dbName) throws DatabaseNotExistException {
Database hiveDb;
try {
hiveDb = client.getDatabase(dbName);
} catch (NoSuchObjectException e) {
throw new DatabaseNotExistException(catalogName, dbName);
} catch (TException e) {
throw new FlinkHiveException(
String.format("Failed to get database %s from HiveCatalog %s", dbName, catalogName), e);
}
return HiveMetadataUtil.createCatalogDatabase(hiveDb);
}
@Override
public boolean dbExists(String dbName) {
try {
return client.getDatabase(dbName) != null;
} catch (NoSuchObjectException e) {
return false;
} catch (TException e) {
throw new FlinkHiveException(
String.format("Failed to get database %s", dbName), e);
}
}
// ------ partitions ------
@Override
public void createPartition(ObjectPath path, CatalogPartition partition, boolean ignoreIfExists)
throws TableNotExistException, TableNotPartitionedException, PartitionAlreadyExistException {
Table hiveTable = getHiveTable(path);
if (hiveTable.getPartitionKeysSize() == 0) {
throw new TableNotPartitionedException(catalogName, path);
}
try {
client.add_partition(
HiveMetadataUtil.createHivePartition(hiveTable, partition));
} catch (AlreadyExistsException e) {
if (!ignoreIfExists) {
throw new PartitionAlreadyExistException(catalogName, path, partition.getPartitionSpec());
}
} catch (TException e) {
throw new FlinkHiveException(
String.format("Failed to create partition %s of table %s", partition.getPartitionSpec(), path));
}
}
@Override
public void dropPartition(ObjectPath path, CatalogPartition.PartitionSpec partitionSpec, boolean ignoreIfNotExists)
throws TableNotExistException, TableNotPartitionedException, PartitionNotExistException {
Table hiveTable = getHiveTable(path);
if (!isTablePartitioned(hiveTable)) {
throw new TableNotPartitionedException(catalogName, path);
}
try {
client.dropPartition(path.getDbName(), path.getObjectName(), getOrderedPartitionValues(hiveTable, partitionSpec), true);
} catch (NoSuchObjectException e) {
if (!ignoreIfNotExists) {
throw new PartitionNotExistException(catalogName, path, partitionSpec);
}
} catch (TException e) {
throw new FlinkHiveException(
String.format("Failed to drop partition %s of table %s", partitionSpec, path));
}
}
@Override
public void alterPartition(ObjectPath path, CatalogPartition newPartition, boolean ignoreIfNotExists)
throws TableNotExistException, TableNotPartitionedException, PartitionNotExistException {
Table hiveTable = getHiveTable(path);
if (hiveTable.getPartitionKeysSize() == 0) {
throw new TableNotPartitionedException(catalogName, path);
}
// Explicitly check if the parititon exists or not
// because alter_partition() doesn't throw NoSuchObjectException like dropPartition() when the target doesn't exist
if (partitionExists(path, newPartition.getPartitionSpec())) {
try {
client.alter_partition(
path.getDbName(),
path.getObjectName(),
HiveMetadataUtil.createHivePartition(hiveTable, newPartition)
);
} catch (TException e) {
throw new FlinkHiveException(
String.format("Failed to alter existing partition with new partition %s of table %s", newPartition.getPartitionSpec(), path), e);
}
} else if (!ignoreIfNotExists) {
throw new PartitionNotExistException(catalogName, path, newPartition.getPartitionSpec());
}
}
@Override
public List<CatalogPartition.PartitionSpec> listPartitions(ObjectPath path)
throws TableNotExistException, TableNotPartitionedException {
Table hiveTable = getHiveTable(path);
if (!isTablePartitioned(hiveTable)) {
throw new TableNotPartitionedException(catalogName, path);
}
try {
return client.listPartitionNames(path.getDbName(), path.getObjectName(), (short) -1).stream()
.map(n -> HiveMetadataUtil.createPartitionSpec(n))
.collect(Collectors.toList());
} catch (TException e) {
throw new FlinkHiveException(
String.format("Failed to list partitions of table %s", path), e);
}
}
@Override
public List<CatalogPartition.PartitionSpec> listPartitions(ObjectPath path, CatalogPartition.PartitionSpec partitionSpec)
throws TableNotExistException, TableNotPartitionedException {
Table hiveTable = getHiveTable(path);
if (!isTablePartitioned(hiveTable)) {
throw new TableNotPartitionedException(catalogName, path);
}
try {
return client.listPartitionNames(path.getDbName(), path.getObjectName(), getOrderedPartitionValues(hiveTable, partitionSpec), (short) -1).stream()
.map(n -> HiveMetadataUtil.createPartitionSpec(n))
.collect(Collectors.toList());
} catch (TException e) {
throw new FlinkHiveException(
String.format("Failed to list partitions of table %s", path), e);
}
}
@Override
public CatalogPartition getPartition(ObjectPath path, CatalogPartition.PartitionSpec partitionSpec)
throws TableNotExistException, TableNotPartitionedException, PartitionNotExistException {
Table hiveTable = getHiveTable(path);
if (!isTablePartitioned(hiveTable)) {
throw new TableNotPartitionedException(catalogName, path);
}
try {
return HiveMetadataUtil.createCatalogPartition(
partitionSpec,
client.getPartition(path.getDbName(), path.getObjectName(), getOrderedPartitionValues(hiveTable, partitionSpec)));
} catch (NoSuchObjectException e) {
throw new PartitionNotExistException(catalogName, path, partitionSpec);
} catch (TException e) {
throw new FlinkHiveException(
String.format("Failed to get partition %s of table %s", partitionSpec, path), e);
}
}
@Override
public boolean partitionExists(ObjectPath path, CatalogPartition.PartitionSpec partitionSpec) {
try {
Table hiveTable = getHiveTable(path);
return client.getPartition(path.getDbName(), path.getObjectName(), getOrderedPartitionValues(hiveTable, partitionSpec)) != null;
} catch (NoSuchObjectException | TableNotExistException e) {
return false;
} catch (TException e) {
throw new FlinkHiveException(
String.format("Failed to get partition %s of table %s", partitionSpec, path), e);
}
}
private boolean isTablePartitioned(Table hiveTable) throws TableNotExistException {
return hiveTable.getPartitionKeysSize() != 0;
}
private List<String> getOrderedPartitionValues(Table hiveTable, CatalogPartition.PartitionSpec partitionSpec) {
return partitionSpec.getOrderedValues(HiveMetadataUtil.getPartitionKeys(hiveTable.getPartitionKeys()));
}
}