// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.impala.analysis.ColumnDef;
import org.apache.impala.analysis.KuduPartitionParam;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TKuduPartitionByHashParam;
import org.apache.impala.thrift.TKuduPartitionByRangeParam;
import org.apache.impala.thrift.TKuduPartitionParam;
import org.apache.impala.thrift.TKuduTable;
import org.apache.impala.thrift.TTable;
import org.apache.impala.thrift.TTableDescriptor;
import org.apache.impala.thrift.TTableType;
import org.apache.impala.util.KuduUtil;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.client.HiveMetastoreConfig;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.thrift.TException;
import com.codahale.metrics.Timer;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
/**
* Representation of a Kudu table in the catalog cache.
*/
public class KuduTable extends Table implements FeKuduTable {
// Alias to the string key that identifies the storage handler for Kudu tables.
public static final String KEY_STORAGE_HANDLER =
hive_metastoreConstants.META_TABLE_STORAGE;
// Key to access the table name from the table properties.
public static final String KEY_TABLE_NAME = "kudu.table_name";
// Key to access the table ID from the table properties.
public static final String KEY_TABLE_ID = "kudu.table_id";
// Key to access the columns used to build the (composite) key of the table.
// Deprecated - Used only for error checking.
public static final String KEY_KEY_COLUMNS = "kudu.key_columns";
// Key to access the master host from the table properties. Error handling for
// this string is done in the KuduClient library.
// TODO: Renaming kudu.master_addresses to kudu.master_host would break compatibility
// with older versions.
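// The expected value is a comma-separated list of master addresses with optional
// ports, e.g. 'master-1.example.com:7051,master-2.example.com' (illustrative
// hostnames; 7051 is Kudu's default master RPC port).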
public static final String KEY_MASTER_HOSTS = "kudu.master_addresses";
// Kudu specific value for the legacy storage handler table property keyed by
// KEY_STORAGE_HANDLER. This is expected to be deprecated eventually.
public static final String KUDU_LEGACY_STORAGE_HANDLER =
"com.cloudera.kudu.hive.KuduStorageHandler";
// Kudu specific value for the storage handler table property keyed by
// KEY_STORAGE_HANDLER.
public static final String KUDU_STORAGE_HANDLER =
"org.apache.hadoop.hive.kudu.KuduStorageHandler";
// Key to specify the number of tablet replicas.
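// For example: TBLPROPERTIES ('kudu.num_tablet_replicas' = '3').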
public static final String KEY_TABLET_REPLICAS = "kudu.num_tablet_replicas";
// Table name in the Kudu storage engine. It is not necessarily the same as the
// table name specified in the CREATE TABLE statement; the latter
// is stored in Table.name_. Reasons why KuduTable.kuduTableName_ and Table.name_ may
// differ:
// 1. For managed tables, 'kuduTableName_' is prefixed with 'impala::<db_name>' to
// avoid conflicts. TODO: Remove this when Kudu supports databases.
// 2. The user may specify a table name using the 'kudu.table_name' table property.
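// For example, a managed table 'foo' in database 'bar' is typically stored in Kudu
// as 'impala::bar.foo' (illustrative names).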
private String kuduTableName_;
// Comma separated list of Kudu master hosts with optional ports.
private String kuduMasters_;
// Primary key column names; the column names are all in lower case.
private final List<String> primaryKeyColumnNames_ = new ArrayList<>();
// Partitioning schemes of this Kudu table. Both range and hash-based partitioning are
// supported.
private List<KuduPartitionParam> partitionBy_;
// Schema of the underlying Kudu table.
private org.apache.kudu.Schema kuduSchema_;
protected KuduTable(org.apache.hadoop.hive.metastore.api.Table msTable,
Db db, String name, String owner) {
super(msTable, db, name, owner);
kuduTableName_ = msTable.getParameters().get(KuduTable.KEY_TABLE_NAME);
kuduMasters_ = msTable.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
}
/**
* A synchronized Kudu table is a table for which operations such as drop and rename
* are pushed down to Kudu in addition to the HMS. This method returns true if the
* given metastore table is a synchronized Kudu table. For older HMS versions (HMS 2
* and below) this simply checks whether the table is managed. However, since
* HIVE-22158 this also checks whether an external table has the
* <code>external.table.purge</code> property set to true. After HIVE-22158, HMS
* transforms a non-transactional managed table into an external table and sets
* <code>external.table.purge</code> to true to indicate that the table data will be
* dropped when the table is dropped. From Impala's perspective, if a Kudu table is an
* external HMS table with <code>external.table.purge</code> set to true, it should be
* treated like a managed table so that the user-facing behavior is unchanged compared
* to previous versions of HMS.
*
* In short, a table is a synchronized table if it is a managed table, or if it is an
* external table with the <code>external.table.purge</code> property set to true.
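*
* For example, an external HMS table with <code>external.table.purge</code> set to
* 'true' is synchronized, while an external table without that property (or with it
* set to false) is not.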
*/
public static boolean isSynchronizedTable(
org.apache.hadoop.hive.metastore.api.Table msTbl) {
Preconditions.checkState(isKuduTable(msTbl));
// HIVE-22158: A translated table can have the external.table.purge property set to
// true; in that case we synchronize operations between Impala and Kudu. It is still
// possible for a managed Kudu table to exist in older versions of HMS.
return isManagedTable(msTbl) || (isExternalTable(msTbl) && Boolean
.parseBoolean(msTbl.getParameters().get(TBL_PROP_EXTERNAL_TABLE_PURGE)));
}
/**
* Returns true if this metastore table has the managed table type.
*/
private static boolean isManagedTable(
org.apache.hadoop.hive.metastore.api.Table msTbl) {
return msTbl.getTableType().equalsIgnoreCase(TableType.MANAGED_TABLE.toString());
}
/**
* Returns true if the given HMS table is an external table, based on its table type
* or table properties. Implementation is based on org.apache.hadoop.hive.metastore.utils
* .MetaStoreUtils.isExternalTable()
*/
public static boolean isExternalTable(
org.apache.hadoop.hive.metastore.api.Table msTbl) {
// HIVE-19253: table property can also indicate an external table.
return (msTbl.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString()) ||
("TRUE").equalsIgnoreCase(msTbl.getParameters().get(TBL_PROP_EXTERNAL_TABLE)));
}
@Override
public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.TABLE; }
@Override
public String getStorageHandlerClassName() { return KUDU_STORAGE_HANDLER; }
/**
* Returns the columns in the order in which they were created.
*/
@Override
public List<Column> getColumnsInHiveOrder() { return getColumns(); }
public static boolean isKuduStorageHandler(String handler) {
return handler != null && (handler.equals(KUDU_LEGACY_STORAGE_HANDLER) ||
handler.equals(KUDU_STORAGE_HANDLER));
}
public static boolean isKuduTable(org.apache.hadoop.hive.metastore.api.Table msTbl) {
return isKuduStorageHandler(msTbl.getParameters().get(KEY_STORAGE_HANDLER));
}
@Override
public String getKuduTableName() { return kuduTableName_; }
@Override
public String getKuduMasterHosts() { return kuduMasters_; }
public org.apache.kudu.Schema getKuduSchema() { return kuduSchema_; }
@Override
public List<String> getPrimaryKeyColumnNames() {
return ImmutableList.copyOf(primaryKeyColumnNames_);
}
@Override
public List<KuduPartitionParam> getPartitionBy() {
Preconditions.checkState(partitionBy_ != null);
return ImmutableList.copyOf(partitionBy_);
}
/**
* Get the Hive Metastore configuration from the Kudu masters.
*/
private static HiveMetastoreConfig getHiveMetastoreConfig(String kuduMasters)
throws ImpalaRuntimeException {
Preconditions.checkNotNull(kuduMasters);
Preconditions.checkArgument(!kuduMasters.isEmpty());
KuduClient kuduClient = KuduUtil.getKuduClient(kuduMasters);
HiveMetastoreConfig hmsConfig;
try {
hmsConfig = kuduClient.getHiveMetastoreConfig();
} catch (KuduException e) {
throw new ImpalaRuntimeException(
String.format("Error determining if Kudu's integration with " +
"the Hive Metastore is enabled: %s", e.getMessage()));
}
return hmsConfig;
}
/**
* Check with the Kudu master to see if Kudu's integration with the Hive Metastore
* is enabled.
*/
public static boolean isHMSIntegrationEnabled(String kuduMasters)
throws ImpalaRuntimeException {
return getHiveMetastoreConfig(kuduMasters) != null;
}
/**
* Check with the Kudu master to see if its Kudu-HMS integration is enabled;
* if so, validate that it is integrated with the same Hive Metastore that
* Impala is configured to use.
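*
* Note that only the host names from the configured metastore URIs are compared;
* URI schemes and ports are ignored (see parseHosts()).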
*/
public static boolean isHMSIntegrationEnabledAndValidate(String kuduMasters,
String hmsUris) throws ImpalaRuntimeException {
Preconditions.checkNotNull(hmsUris);
Preconditions.checkArgument(!hmsUris.isEmpty());
HiveMetastoreConfig hmsConfig = getHiveMetastoreConfig(kuduMasters);
if (hmsConfig == null) {
return false;
}
// Validate that Kudu is configured to use the same HMS as Impala. We consider this
// to be the case as long as Kudu and Impala are configured to talk to the HMS using
// the same host address(es).
final String kuduHmsUris = hmsConfig.getHiveMetastoreUris();
Set<String> hmsHosts;
Set<String> kuduHmsHosts;
try {
hmsHosts = parseHosts(hmsUris);
kuduHmsHosts = parseHosts(kuduHmsUris);
} catch (URISyntaxException e) {
throw new ImpalaRuntimeException(
String.format("Error parsing URI: %s", e.getMessage()));
}
if (hmsHosts != null && kuduHmsHosts != null && hmsHosts.equals(kuduHmsHosts)) {
return true;
}
throw new ImpalaRuntimeException(
String.format("Kudu is integrated with a different Hive Metastore " +
"than the one used by Impala. Kudu is configured to use the HMS: " +
"%s, while Impala is configured to use the HMS: %s",
kuduHmsUris, hmsUris));
}
/**
* Parse the given URIs and return a set of hosts in the URIs.
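* For example, 'thrift://hms-1:9083,thrift://hms-2:9083' (hypothetical URIs) yields
* the host set {'hms-1', 'hms-2'}.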
*/
private static Set<String> parseHosts(String uris) throws URISyntaxException {
String[] urisString = uris.split(",");
Set<String> parsedHosts = new HashSet<>();
for (String s : urisString) {
// Trim whitespace around each URI before parsing it.
URI tmpUri = new URI(s.trim());
parsedHosts.add(tmpUri.getHost());
}
return parsedHosts;
}
/**
* Load schema and partitioning schemes directly from Kudu.
*/
public void loadSchemaFromKudu() throws ImpalaRuntimeException {
// This is set to 0 for Kudu tables.
// TODO: Change this to reflect the number of pk columns and modify all the
// places (e.g. insert stmt) that currently make use of this parameter.
numClusteringCols_ = 0;
org.apache.kudu.client.KuduTable kuduTable = null;
// Connect to Kudu to retrieve table metadata
KuduClient kuduClient = KuduUtil.getKuduClient(getKuduMasterHosts());
try {
kuduTable = kuduClient.openTable(kuduTableName_);
} catch (KuduException e) {
throw new ImpalaRuntimeException(
String.format("Error opening Kudu table '%s', Kudu error: %s", kuduTableName_,
e.getMessage()));
}
Preconditions.checkNotNull(kuduTable);
loadSchema(kuduTable);
Preconditions.checkState(!colsByPos_.isEmpty());
partitionBy_ = Utils.loadPartitionByParams(kuduTable);
}
/**
* Loads the metadata of a Kudu table.
*
* Schema and partitioning schemes are loaded directly from Kudu whereas column stats
* are loaded from HMS. The function also updates the table schema in HMS in order to
* propagate alterations made to the Kudu table to HMS.
*/
@Override
public void load(boolean dummy /* not used */, IMetaStoreClient msClient,
org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
throws TableLoadingException {
final Timer.Context context =
getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
try {
// Copy the table to check later if anything has changed.
msTable_ = msTbl.deepCopy();
kuduTableName_ = msTable_.getParameters().get(KuduTable.KEY_TABLE_NAME);
kuduMasters_ = msTable_.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
if (kuduMasters_ == null || kuduMasters_.isEmpty()) {
throw new TableLoadingException("No " + KuduTable.KEY_MASTER_HOSTS +
" property found for Kudu table " + kuduTableName_);
}
setTableStats(msTable_);
// Load metadata from Kudu
final Timer.Context ctxStorageLdTime =
getMetrics().getTimer(Table.LOAD_DURATION_STORAGE_METADATA).time();
try {
loadSchemaFromKudu();
} catch (ImpalaRuntimeException e) {
throw new TableLoadingException("Error loading metadata for Kudu table " +
kuduTableName_, e);
} finally {
storageMetadataLoadTime_ = ctxStorageLdTime.stop();
}
// Load from HMS
loadAllColumnStats(msClient);
refreshLastUsedTime();
// Avoid updating HMS if the schema didn't change.
if (msTable_.equals(msTbl)) return;
// Update the table schema in HMS.
try {
// HMS would fill in this table property if it were not set, but since metadata
// written with alter_table is not read back for Kudu tables, it has to be set here
// to ensure that HMS and catalogd have the same timestamp.
updateTimestampProperty(msTable_, TBL_PROP_LAST_DDL_TIME);
msTable_.putToParameters(StatsSetupConst.DO_NOT_UPDATE_STATS,
StatsSetupConst.TRUE);
msClient.alter_table(msTable_.getDbName(), msTable_.getTableName(), msTable_);
} catch (TException e) {
throw new TableLoadingException(e.getMessage());
}
} finally {
context.stop();
}
}
/**
* Loads the schema from the Kudu table including column definitions and primary key
* columns. Replaces the columns in the HMS table with the columns from the Kudu table.
* Throws an ImpalaRuntimeException if Kudu column data types cannot be mapped to
* Impala data types.
*/
private void loadSchema(org.apache.kudu.client.KuduTable kuduTable)
throws ImpalaRuntimeException {
Preconditions.checkNotNull(kuduTable);
clearColumns();
primaryKeyColumnNames_.clear();
boolean isHMSIntegrationEnabled = KuduTable.isHMSIntegrationEnabled(kuduMasters_);
List<FieldSchema> cols = msTable_.getSd().getCols();
if (!isHMSIntegrationEnabled) {
cols.clear();
}
int pos = 0;
kuduSchema_ = kuduTable.getSchema();
for (ColumnSchema colSchema: kuduSchema_.getColumns()) {
KuduColumn kuduCol = KuduColumn.fromColumnSchema(colSchema, pos);
Preconditions.checkNotNull(kuduCol);
// Only update the HMS column definition when Kudu/HMS integration
// is disabled.
if (!isHMSIntegrationEnabled) {
cols.add(new FieldSchema(kuduCol.getName(),
kuduCol.getType().toSql().toLowerCase(), null));
}
if (kuduCol.isKey()) primaryKeyColumnNames_.add(kuduCol.getName());
addColumn(kuduCol);
++pos;
}
}
/**
* Creates a temporary KuduTable object populated with the specified properties. The
* returned table has an invalid table ID and is not added to the Kudu storage engine
* or the HMS. This is used for CTAS statements.
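* For example, the INSERT portion of a CREATE TABLE ... AS SELECT statement can be
* analyzed against this temporary table before the target table exists in Kudu or
* the HMS.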
*/
public static KuduTable createCtasTarget(Db db,
org.apache.hadoop.hive.metastore.api.Table msTbl, List<ColumnDef> columnDefs,
List<ColumnDef> primaryKeyColumnDefs, List<KuduPartitionParam> partitionParams) {
KuduTable tmpTable = new KuduTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
int pos = 0;
for (ColumnDef colDef: columnDefs) {
tmpTable.addColumn(new Column(colDef.getColName(), colDef.getType(), pos++));
}
for (ColumnDef pkColDef: primaryKeyColumnDefs) {
tmpTable.primaryKeyColumnNames_.add(pkColDef.getColName());
}
tmpTable.partitionBy_ = ImmutableList.copyOf(partitionParams);
return tmpTable;
}
@Override
public TTable toThrift() {
TTable table = super.toThrift();
table.setTable_type(TTableType.KUDU_TABLE);
table.setKudu_table(getTKuduTable());
return table;
}
@Override
protected void loadFromThrift(TTable thriftTable) throws TableLoadingException {
super.loadFromThrift(thriftTable);
TKuduTable tkudu = thriftTable.getKudu_table();
kuduTableName_ = tkudu.getTable_name();
kuduMasters_ = Joiner.on(',').join(tkudu.getMaster_addresses());
primaryKeyColumnNames_.clear();
primaryKeyColumnNames_.addAll(tkudu.getKey_columns());
partitionBy_ = loadPartitionByParamsFromThrift(tkudu.getPartition_by());
}
private static List<KuduPartitionParam> loadPartitionByParamsFromThrift(
List<TKuduPartitionParam> params) {
List<KuduPartitionParam> ret = new ArrayList<>();
for (TKuduPartitionParam param: params) {
if (param.isSetBy_hash_param()) {
TKuduPartitionByHashParam hashParam = param.getBy_hash_param();
ret.add(KuduPartitionParam.createHashParam(
hashParam.getColumns(), hashParam.getNum_partitions()));
} else {
Preconditions.checkState(param.isSetBy_range_param());
TKuduPartitionByRangeParam rangeParam = param.getBy_range_param();
ret.add(KuduPartitionParam.createRangeParam(rangeParam.getColumns(),
null));
}
}
return ret;
}
@Override
public TTableDescriptor toThriftDescriptor(int tableId,
Set<Long> referencedPartitions) {
TTableDescriptor desc = new TTableDescriptor(tableId, TTableType.KUDU_TABLE,
getTColumnDescriptors(), numClusteringCols_, name_, db_.getName());
desc.setKuduTable(getTKuduTable());
return desc;
}
private TKuduTable getTKuduTable() {
TKuduTable tbl = new TKuduTable();
tbl.setKey_columns(Preconditions.checkNotNull(primaryKeyColumnNames_));
tbl.setMaster_addresses(Lists.newArrayList(kuduMasters_.split(",")));
tbl.setTable_name(kuduTableName_);
Preconditions.checkNotNull(partitionBy_);
// IMPALA-5154: partitionBy_ may be empty if the Kudu table was created outside
// Impala. partition_by must be explicitly created because the field is required.
tbl.partition_by = new ArrayList<>();
for (KuduPartitionParam partitionParam: partitionBy_) {
tbl.addToPartition_by(partitionParam.toThrift());
}
return tbl;
}
}