| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.catalog; |
| |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| |
| import org.apache.hadoop.hive.common.StatsSetupConst; |
| import org.apache.hadoop.hive.metastore.IMetaStoreClient; |
| import org.apache.hadoop.hive.metastore.TableType; |
| import org.apache.hadoop.hive.metastore.api.FieldSchema; |
| import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; |
| import org.apache.impala.analysis.ColumnDef; |
| import org.apache.impala.analysis.KuduPartitionParam; |
| import org.apache.impala.common.ImpalaRuntimeException; |
| import org.apache.impala.thrift.TCatalogObjectType; |
| import org.apache.impala.thrift.TKuduPartitionByHashParam; |
| import org.apache.impala.thrift.TKuduPartitionByRangeParam; |
| import org.apache.impala.thrift.TKuduPartitionParam; |
| import org.apache.impala.thrift.TKuduTable; |
| import org.apache.impala.thrift.TTable; |
| import org.apache.impala.thrift.TTableDescriptor; |
| import org.apache.impala.thrift.TTableType; |
| import org.apache.impala.util.KuduUtil; |
| import org.apache.kudu.ColumnSchema; |
| import org.apache.kudu.client.HiveMetastoreConfig; |
| import org.apache.kudu.client.KuduClient; |
| import org.apache.kudu.client.KuduException; |
| import org.apache.thrift.TException; |
| |
| import com.codahale.metrics.Timer; |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Lists; |
| |
| /** |
| * Representation of a Kudu table in the catalog cache. |
| */ |
| public class KuduTable extends Table implements FeKuduTable { |
| |
| // Alias to the string key that identifies the storage handler for Kudu tables. |
| public static final String KEY_STORAGE_HANDLER = |
| hive_metastoreConstants.META_TABLE_STORAGE; |
| |
| // Key to access the table name from the table properties. |
| public static final String KEY_TABLE_NAME = "kudu.table_name"; |
| |
| // Key to access the table ID from the table properties. |
| public static final String KEY_TABLE_ID = "kudu.table_id"; |
| |
| // Key to access the columns used to build the (composite) key of the table. |
| // Deprecated - Used only for error checking. |
| public static final String KEY_KEY_COLUMNS = "kudu.key_columns"; |
| |
| // Key to access the master host from the table properties. Error handling for |
| // this string is done in the KuduClient library. |
| // TODO: Rename kudu.master_addresses to kudu.master_host will break compatibility |
| // with older versions. |
| public static final String KEY_MASTER_HOSTS = "kudu.master_addresses"; |
| |
| // Kudu specific value for the legacy storage handler table property keyed by |
| // KEY_STORAGE_HANDLER. This is expected to be deprecated eventually. |
| public static final String KUDU_LEGACY_STORAGE_HANDLER = |
| "com.cloudera.kudu.hive.KuduStorageHandler"; |
| |
| // Kudu specific value for the storage handler table property keyed by |
| // KEY_STORAGE_HANDLER. |
| public static final String KUDU_STORAGE_HANDLER = |
| "org.apache.hadoop.hive.kudu.KuduStorageHandler"; |
| |
| // Key to specify the number of tablet replicas. |
| public static final String KEY_TABLET_REPLICAS = "kudu.num_tablet_replicas"; |
| |
| // Table name in the Kudu storage engine. It may not neccessarily be the same as the |
| // table name specified in the CREATE TABLE statement; the latter |
| // is stored in Table.name_. Reasons why KuduTable.kuduTableName_ and Table.name_ may |
| // differ: |
| // 1. For managed tables, 'kuduTableName_' is prefixed with 'impala::<db_name>' to |
| // avoid conficts. TODO: Remove this when Kudu supports databases. |
| // 2. The user may specify a table name using the 'kudu.table_name' table property. |
| private String kuduTableName_; |
| |
| // Comma separated list of Kudu master hosts with optional ports. |
| private String kuduMasters_; |
| |
| // Primary key column names, the column names are all in lower case. |
| private final List<String> primaryKeyColumnNames_ = new ArrayList<>(); |
| |
| // Partitioning schemes of this Kudu table. Both range and hash-based partitioning are |
| // supported. |
| private List<KuduPartitionParam> partitionBy_; |
| |
| // Schema of the underlying Kudu table. |
| private org.apache.kudu.Schema kuduSchema_; |
| |
| protected KuduTable(org.apache.hadoop.hive.metastore.api.Table msTable, |
| Db db, String name, String owner) { |
| super(msTable, db, name, owner); |
| kuduTableName_ = msTable.getParameters().get(KuduTable.KEY_TABLE_NAME); |
| kuduMasters_ = msTable.getParameters().get(KuduTable.KEY_MASTER_HOSTS); |
| } |
| |
| /** |
| * A synchronized Kudu table is a table where operations like (drop, rename) |
| * on the table are pushed down to Kudu along with HMS. This method returns |
| * true if given metastore table is a synchronized Kudu table. For older HMS |
| * versions (HMS 2 and below) this is just checking if the table is managed or not. |
| * However, since HIVE-22158 this also checks if the external table has |
| * <code>external.table.purge</code> property set to true. After HIVE-22158 HMS |
| * transforms a managed table into external table if it is not transactional and sets |
| * <code>external.table.purge</code> to true to indicate that table data will be |
| * dropped when it is dropped. From the perspective of |
| * Impala, if a Kudu table has <code>external.table.purge</code> set to true and it |
| * is an external HMS table, it should treat it like a managed table so the user facing |
| * behavior is not changed when compared to previous versions of HMS. |
| * |
| * A table is synchronized table if its Managed table or if its a external table with |
| * <code>external.table.purge</code> property set to true. |
| */ |
| public static boolean isSynchronizedTable( |
| org.apache.hadoop.hive.metastore.api.Table msTbl) { |
| Preconditions.checkState(isKuduTable(msTbl)); |
| // HIVE-22158: A translated table can have external purge property set to true |
| // in such case we sync operations in Impala and Kudu |
| // it is possible that in older versions of HMS a managed Kudu table is present |
| return isManagedTable(msTbl) || (isExternalTable(msTbl) && Boolean |
| .parseBoolean(msTbl.getParameters().get(TBL_PROP_EXTERNAL_TABLE_PURGE))); |
| } |
| |
| /** |
| * Returns if this metastore table has managed table type |
| */ |
| private static boolean isManagedTable( |
| org.apache.hadoop.hive.metastore.api.Table msTbl) { |
| return msTbl.getTableType().equalsIgnoreCase(TableType.MANAGED_TABLE.toString()); |
| } |
| |
| /** |
| * Returns if the given HMS table is external table or not based on table type or table |
| * properties. Implementation is based on org.apache.hadoop.hive.metastore.utils |
| * .MetaStoreUtils.isExternalTable() |
| */ |
| public static boolean isExternalTable( |
| org.apache.hadoop.hive.metastore.api.Table msTbl) { |
| // HIVE-19253: table property can also indicate an external table. |
| return (msTbl.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString()) || |
| ("TRUE").equalsIgnoreCase(msTbl.getParameters().get(TBL_PROP_EXTERNAL_TABLE))); |
| } |
| |
| @Override |
| public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.TABLE; } |
| |
| @Override |
| public String getStorageHandlerClassName() { return KUDU_STORAGE_HANDLER; } |
| |
| /** |
| * Returns the columns in the order they have been created |
| */ |
| @Override |
| public List<Column> getColumnsInHiveOrder() { return getColumns(); } |
| |
| public static boolean isKuduStorageHandler(String handler) { |
| return handler != null && (handler.equals(KUDU_LEGACY_STORAGE_HANDLER) || |
| handler.equals(KUDU_STORAGE_HANDLER)); |
| } |
| |
| public static boolean isKuduTable(org.apache.hadoop.hive.metastore.api.Table msTbl) { |
| return isKuduStorageHandler(msTbl.getParameters().get(KEY_STORAGE_HANDLER)); |
| } |
| |
| @Override |
| public String getKuduTableName() { return kuduTableName_; } |
| @Override |
| public String getKuduMasterHosts() { return kuduMasters_; } |
| |
| public org.apache.kudu.Schema getKuduSchema() { return kuduSchema_; } |
| |
| @Override |
| public List<String> getPrimaryKeyColumnNames() { |
| return ImmutableList.copyOf(primaryKeyColumnNames_); |
| } |
| |
| @Override |
| public List<KuduPartitionParam> getPartitionBy() { |
| Preconditions.checkState(partitionBy_ != null); |
| return ImmutableList.copyOf(partitionBy_); |
| } |
| |
| /** |
| * Get the Hive Metastore configuration from Kudu masters. |
| */ |
| private static HiveMetastoreConfig getHiveMetastoreConfig(String kuduMasters) |
| throws ImpalaRuntimeException { |
| Preconditions.checkNotNull(kuduMasters); |
| Preconditions.checkArgument(!kuduMasters.isEmpty()); |
| KuduClient kuduClient = KuduUtil.getKuduClient(kuduMasters); |
| HiveMetastoreConfig hmsConfig; |
| try { |
| hmsConfig = kuduClient.getHiveMetastoreConfig(); |
| } catch (KuduException e) { |
| throw new ImpalaRuntimeException( |
| String.format("Error determining if Kudu's integration with " + |
| "the Hive Metastore is enabled: %s", e.getMessage())); |
| } |
| return hmsConfig; |
| } |
| |
| /** |
| * Check with Kudu master to see if Kudu's integration with the Hive Metastore |
| * is enabled. |
| */ |
| public static boolean isHMSIntegrationEnabled(String kuduMasters) |
| throws ImpalaRuntimeException { |
| return getHiveMetastoreConfig(kuduMasters) != null; |
| } |
| |
| /** |
| * Check with the Kudu master to see if its Kudu-HMS integration is enabled; |
| * if so, validate that it is integrated with the same Hive Metastore that |
| * Impala is configured to use. |
| */ |
| public static boolean isHMSIntegrationEnabledAndValidate(String kuduMasters, |
| String hmsUris) throws ImpalaRuntimeException { |
| Preconditions.checkNotNull(hmsUris); |
| Preconditions.checkArgument(!hmsUris.isEmpty()); |
| HiveMetastoreConfig hmsConfig = getHiveMetastoreConfig(kuduMasters); |
| if (hmsConfig == null) { |
| return false; |
| } |
| // Validate Kudu is configured to use the same HMS as Impala. We consider |
| // it is the case as long as Kudu and Impala are configured to talk to |
| // the HMS with the same host address(es). |
| final String kuduHmsUris = hmsConfig.getHiveMetastoreUris(); |
| Set<String> hmsHosts; |
| Set<String> kuduHmsHosts; |
| try { |
| hmsHosts = parseHosts(hmsUris); |
| kuduHmsHosts = parseHosts(kuduHmsUris); |
| } catch (URISyntaxException e) { |
| throw new ImpalaRuntimeException( |
| String.format("Error parsing URI: %s", e.getMessage())); |
| } |
| if (hmsHosts != null && kuduHmsHosts != null && hmsHosts.equals(kuduHmsHosts)) { |
| return true; |
| } |
| throw new ImpalaRuntimeException( |
| String.format("Kudu is integrated with a different Hive Metastore " + |
| "than that used by Impala, Kudu is configured to use the HMS: " + |
| "%s, while Impala is configured to use the HMS: %s", |
| kuduHmsUris, hmsUris)); |
| } |
| |
| /** |
| * Parse the given URIs and return a set of hosts in the URIs. |
| */ |
| private static Set<String> parseHosts(String uris) throws URISyntaxException { |
| String[] urisString = uris.split(","); |
| Set<String> parsedHosts = new HashSet<>(); |
| |
| for (String s : urisString) { |
| s.trim(); |
| URI tmpUri = new URI(s); |
| parsedHosts.add(tmpUri.getHost()); |
| } |
| return parsedHosts; |
| } |
| |
| /** |
| * Load schema and partitioning schemes directly from Kudu. |
| */ |
| public void loadSchemaFromKudu() throws ImpalaRuntimeException { |
| // This is set to 0 for Kudu tables. |
| // TODO: Change this to reflect the number of pk columns and modify all the |
| // places (e.g. insert stmt) that currently make use of this parameter. |
| numClusteringCols_ = 0; |
| org.apache.kudu.client.KuduTable kuduTable = null; |
| // Connect to Kudu to retrieve table metadata |
| KuduClient kuduClient = KuduUtil.getKuduClient(getKuduMasterHosts()); |
| try { |
| kuduTable = kuduClient.openTable(kuduTableName_); |
| } catch (KuduException e) { |
| throw new ImpalaRuntimeException( |
| String.format("Error opening Kudu table '%s', Kudu error: %s", kuduTableName_, |
| e.getMessage())); |
| } |
| Preconditions.checkNotNull(kuduTable); |
| |
| loadSchema(kuduTable); |
| Preconditions.checkState(!colsByPos_.isEmpty()); |
| partitionBy_ = Utils.loadPartitionByParams(kuduTable); |
| } |
| |
| /** |
| * Loads the metadata of a Kudu table. |
| * |
| * Schema and partitioning schemes are loaded directly from Kudu whereas column stats |
| * are loaded from HMS. The function also updates the table schema in HMS in order to |
| * propagate alterations made to the Kudu table to HMS. |
| */ |
| @Override |
| public void load(boolean dummy /* not used */, IMetaStoreClient msClient, |
| org.apache.hadoop.hive.metastore.api.Table msTbl, String reason) |
| throws TableLoadingException { |
| final Timer.Context context = |
| getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time(); |
| try { |
| // Copy the table to check later if anything has changed. |
| msTable_ = msTbl.deepCopy(); |
| kuduTableName_ = msTable_.getParameters().get(KuduTable.KEY_TABLE_NAME); |
| kuduMasters_ = msTable_.getParameters().get(KuduTable.KEY_MASTER_HOSTS); |
| if (kuduMasters_ == null || kuduMasters_.isEmpty()) { |
| throw new TableLoadingException("No " + KuduTable.KEY_MASTER_HOSTS + |
| " property found for Kudu table " + kuduTableName_); |
| } |
| setTableStats(msTable_); |
| // Load metadata from Kudu |
| final Timer.Context ctxStorageLdTime = |
| getMetrics().getTimer(Table.LOAD_DURATION_STORAGE_METADATA).time(); |
| try { |
| loadSchemaFromKudu(); |
| } catch (ImpalaRuntimeException e) { |
| throw new TableLoadingException("Error loading metadata for Kudu table " + |
| kuduTableName_, e); |
| } finally { |
| storageMetadataLoadTime_ = ctxStorageLdTime.stop(); |
| } |
| // Load from HMS |
| loadAllColumnStats(msClient); |
| refreshLastUsedTime(); |
| // Avoid updating HMS if the schema didn't change. |
| if (msTable_.equals(msTbl)) return; |
| |
| // Update the table schema in HMS. |
| try { |
| // HMS would fill this table property if it was not set, but as metadata written |
| // with alter_table is not read back in case of Kudu tables, it has to be set here |
| // to ensure that HMS and catalogd have the same timestamp. |
| updateTimestampProperty(msTable_, TBL_PROP_LAST_DDL_TIME); |
| msTable_.putToParameters(StatsSetupConst.DO_NOT_UPDATE_STATS, |
| StatsSetupConst.TRUE); |
| msClient.alter_table(msTable_.getDbName(), msTable_.getTableName(), msTable_); |
| } catch (TException e) { |
| throw new TableLoadingException(e.getMessage()); |
| } |
| } finally { |
| context.stop(); |
| } |
| } |
| |
| /** |
| * Loads the schema from the Kudu table including column definitions and primary key |
| * columns. Replaces the columns in the HMS table with the columns from the Kudu table. |
| * Throws an ImpalaRuntimeException if Kudu column data types cannot be mapped to |
| * Impala data types. |
| */ |
| private void loadSchema(org.apache.kudu.client.KuduTable kuduTable) |
| throws ImpalaRuntimeException { |
| Preconditions.checkNotNull(kuduTable); |
| clearColumns(); |
| primaryKeyColumnNames_.clear(); |
| boolean isHMSIntegrationEnabled = KuduTable.isHMSIntegrationEnabled(kuduMasters_); |
| List<FieldSchema> cols = msTable_.getSd().getCols(); |
| if (!isHMSIntegrationEnabled) { |
| cols.clear(); |
| } |
| |
| int pos = 0; |
| kuduSchema_ = kuduTable.getSchema(); |
| for (ColumnSchema colSchema: kuduSchema_.getColumns()) { |
| KuduColumn kuduCol = KuduColumn.fromColumnSchema(colSchema, pos); |
| Preconditions.checkNotNull(kuduCol); |
| // Only update the HMS column definition when Kudu/HMS integration |
| // is disabled. |
| if (!isHMSIntegrationEnabled) { |
| cols.add(new FieldSchema(kuduCol.getName(), |
| kuduCol.getType().toSql().toLowerCase(), null)); |
| } |
| if (kuduCol.isKey()) primaryKeyColumnNames_.add(kuduCol.getName()); |
| addColumn(kuduCol); |
| ++pos; |
| } |
| } |
| |
| |
| /** |
| * Creates a temporary KuduTable object populated with the specified properties but has |
| * an invalid TableId and is not added to the Kudu storage engine or the |
| * HMS. This is used for CTAS statements. |
| */ |
| public static KuduTable createCtasTarget(Db db, |
| org.apache.hadoop.hive.metastore.api.Table msTbl, List<ColumnDef> columnDefs, |
| List<ColumnDef> primaryKeyColumnDefs, List<KuduPartitionParam> partitionParams) { |
| KuduTable tmpTable = new KuduTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner()); |
| int pos = 0; |
| for (ColumnDef colDef: columnDefs) { |
| tmpTable.addColumn(new Column(colDef.getColName(), colDef.getType(), pos++)); |
| } |
| for (ColumnDef pkColDef: primaryKeyColumnDefs) { |
| tmpTable.primaryKeyColumnNames_.add(pkColDef.getColName()); |
| } |
| tmpTable.partitionBy_ = ImmutableList.copyOf(partitionParams); |
| return tmpTable; |
| } |
| |
| @Override |
| public TTable toThrift() { |
| TTable table = super.toThrift(); |
| table.setTable_type(TTableType.KUDU_TABLE); |
| table.setKudu_table(getTKuduTable()); |
| return table; |
| } |
| |
| @Override |
| protected void loadFromThrift(TTable thriftTable) throws TableLoadingException { |
| super.loadFromThrift(thriftTable); |
| TKuduTable tkudu = thriftTable.getKudu_table(); |
| kuduTableName_ = tkudu.getTable_name(); |
| kuduMasters_ = Joiner.on(',').join(tkudu.getMaster_addresses()); |
| primaryKeyColumnNames_.clear(); |
| primaryKeyColumnNames_.addAll(tkudu.getKey_columns()); |
| partitionBy_ = loadPartitionByParamsFromThrift(tkudu.getPartition_by()); |
| } |
| |
| private static List<KuduPartitionParam> loadPartitionByParamsFromThrift( |
| List<TKuduPartitionParam> params) { |
| List<KuduPartitionParam> ret= new ArrayList<>(); |
| for (TKuduPartitionParam param: params) { |
| if (param.isSetBy_hash_param()) { |
| TKuduPartitionByHashParam hashParam = param.getBy_hash_param(); |
| ret.add(KuduPartitionParam.createHashParam( |
| hashParam.getColumns(), hashParam.getNum_partitions())); |
| } else { |
| Preconditions.checkState(param.isSetBy_range_param()); |
| TKuduPartitionByRangeParam rangeParam = param.getBy_range_param(); |
| ret.add(KuduPartitionParam.createRangeParam(rangeParam.getColumns(), |
| null)); |
| } |
| } |
| return ret; |
| } |
| |
| @Override |
| public TTableDescriptor toThriftDescriptor(int tableId, |
| Set<Long> referencedPartitions) { |
| TTableDescriptor desc = new TTableDescriptor(tableId, TTableType.KUDU_TABLE, |
| getTColumnDescriptors(), numClusteringCols_, name_, db_.getName()); |
| desc.setKuduTable(getTKuduTable()); |
| return desc; |
| } |
| |
| private TKuduTable getTKuduTable() { |
| TKuduTable tbl = new TKuduTable(); |
| tbl.setKey_columns(Preconditions.checkNotNull(primaryKeyColumnNames_)); |
| tbl.setMaster_addresses(Lists.newArrayList(kuduMasters_.split(","))); |
| tbl.setTable_name(kuduTableName_); |
| Preconditions.checkNotNull(partitionBy_); |
| // IMPALA-5154: partitionBy_ may be empty if Kudu table created outside Impala, |
| // partition_by must be explicitly created because the field is required. |
| tbl.partition_by = new ArrayList<>(); |
| for (KuduPartitionParam partitionParam: partitionBy_) { |
| tbl.addToPartition_by(partitionParam.toThrift()); |
| } |
| return tbl; |
| } |
| } |