| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.catalog; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| |
| import javax.xml.bind.DatatypeConverter; |
| |
| import org.apache.hadoop.hive.common.StatsSetupConst; |
| import org.apache.hadoop.hive.metastore.IMetaStoreClient; |
| import org.apache.hadoop.hive.metastore.api.FieldSchema; |
| import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; |
| import org.apache.impala.analysis.ColumnDef; |
| import org.apache.impala.analysis.KuduPartitionParam; |
| import org.apache.impala.common.ImpalaRuntimeException; |
| import org.apache.impala.service.BackendConfig; |
| import org.apache.impala.service.CatalogOpExecutor; |
| import org.apache.impala.thrift.TCatalogObjectType; |
| import org.apache.impala.thrift.TColumn; |
| import org.apache.impala.thrift.TKuduPartitionByHashParam; |
| import org.apache.impala.thrift.TKuduPartitionByRangeParam; |
| import org.apache.impala.thrift.TKuduPartitionParam; |
| import org.apache.impala.thrift.TKuduTable; |
| import org.apache.impala.thrift.TResultSet; |
| import org.apache.impala.thrift.TResultSetMetadata; |
| import org.apache.impala.thrift.TTable; |
| import org.apache.impala.thrift.TTableDescriptor; |
| import org.apache.impala.thrift.TTableType; |
| import org.apache.impala.util.KuduUtil; |
| import org.apache.impala.util.TResultRowBuilder; |
| import org.apache.kudu.ColumnSchema; |
| import org.apache.kudu.Schema; |
| import org.apache.kudu.client.KuduClient; |
| import org.apache.kudu.client.KuduException; |
| import org.apache.kudu.client.LocatedTablet; |
| import org.apache.kudu.client.PartitionSchema; |
| import org.apache.kudu.client.PartitionSchema.HashBucketSchema; |
| import org.apache.kudu.client.PartitionSchema.RangeSchema; |
| import org.apache.log4j.Logger; |
| import org.apache.thrift.TException; |
| |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Lists; |
| |
| /** |
| * Representation of a Kudu table in the catalog cache. |
| */ |
| public class KuduTable extends Table { |
| |
| // Alias to the string key that identifies the storage handler for Kudu tables. |
| public static final String KEY_STORAGE_HANDLER = |
| hive_metastoreConstants.META_TABLE_STORAGE; |
| |
| // Key to access the table name from the table properties. |
| public static final String KEY_TABLE_NAME = "kudu.table_name"; |
| |
| // Key to access the columns used to build the (composite) key of the table. |
| // Deprecated - Used only for error checking. |
| public static final String KEY_KEY_COLUMNS = "kudu.key_columns"; |
| |
| // Key to access the master host from the table properties. Error handling for |
| // this string is done in the KuduClient library. |
| // TODO: Rename kudu.master_addresses to kudu.master_host will break compatibility |
| // with older versions. |
| public static final String KEY_MASTER_HOSTS = "kudu.master_addresses"; |
| |
| // Kudu specific value for the storage handler table property keyed by |
| // KEY_STORAGE_HANDLER. |
| // TODO: Fix the storage handler name (see IMPALA-4271). |
| public static final String KUDU_STORAGE_HANDLER = |
| "com.cloudera.kudu.hive.KuduStorageHandler"; |
| |
| // Key to specify the number of tablet replicas. |
| public static final String KEY_TABLET_REPLICAS = "kudu.num_tablet_replicas"; |
| |
| // Table name in the Kudu storage engine. It may not neccessarily be the same as the |
| // table name specified in the CREATE TABLE statement; the latter |
| // is stored in Table.name_. Reasons why KuduTable.kuduTableName_ and Table.name_ may |
| // differ: |
| // 1. For managed tables, 'kuduTableName_' is prefixed with 'impala::<db_name>' to |
| // avoid conficts. TODO: Remove this when Kudu supports databases. |
| // 2. The user may specify a table name using the 'kudu.table_name' table property. |
| private String kuduTableName_; |
| |
| // Comma separated list of Kudu master hosts with optional ports. |
| private String kuduMasters_; |
| |
| // Primary key column names. |
| private final List<String> primaryKeyColumnNames_ = Lists.newArrayList(); |
| |
| // Partitioning schemes of this Kudu table. Both range and hash-based partitioning are |
| // supported. |
| private final List<KuduPartitionParam> partitionBy_ = Lists.newArrayList(); |
| |
| // Schema of the underlying Kudu table. |
| private org.apache.kudu.Schema kuduSchema_; |
| |
| protected KuduTable(org.apache.hadoop.hive.metastore.api.Table msTable, |
| Db db, String name, String owner) { |
| super(msTable, db, name, owner); |
| kuduTableName_ = msTable.getParameters().get(KuduTable.KEY_TABLE_NAME); |
| kuduMasters_ = msTable.getParameters().get(KuduTable.KEY_MASTER_HOSTS); |
| } |
| |
| @Override |
| public TCatalogObjectType getCatalogObjectType() { return TCatalogObjectType.TABLE; } |
| |
| @Override |
| public String getStorageHandlerClassName() { return KUDU_STORAGE_HANDLER; } |
| |
| /** |
| * Returns the columns in the order they have been created |
| */ |
| @Override |
| public ArrayList<Column> getColumnsInHiveOrder() { return getColumns(); } |
| |
| public static boolean isKuduTable(org.apache.hadoop.hive.metastore.api.Table msTbl) { |
| return KUDU_STORAGE_HANDLER.equals(msTbl.getParameters().get(KEY_STORAGE_HANDLER)); |
| } |
| |
| public String getKuduTableName() { return kuduTableName_; } |
| public String getKuduMasterHosts() { return kuduMasters_; } |
| public org.apache.kudu.Schema getKuduSchema() { return kuduSchema_; } |
| |
| public List<String> getPrimaryKeyColumnNames() { |
| return ImmutableList.copyOf(primaryKeyColumnNames_); |
| } |
| |
| public List<KuduPartitionParam> getPartitionBy() { |
| return ImmutableList.copyOf(partitionBy_); |
| } |
| |
| public Set<String> getPartitionColumnNames() { |
| Set<String> ret = new HashSet<String>(); |
| for (KuduPartitionParam partitionParam : partitionBy_) { |
| ret.addAll(partitionParam.getColumnNames()); |
| } |
| return ret; |
| } |
| |
| /** |
| * Returns the range-based partitioning of this table if it exists, null otherwise. |
| */ |
| private KuduPartitionParam getRangePartitioning() { |
| for (KuduPartitionParam partitionParam: partitionBy_) { |
| if (partitionParam.getType() == KuduPartitionParam.Type.RANGE) { |
| return partitionParam; |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Returns the column names of the table's range-based partitioning or an empty |
| * list if the table doesn't have a range-based partitioning. |
| */ |
| public List<String> getRangePartitioningColNames() { |
| KuduPartitionParam rangePartitioning = getRangePartitioning(); |
| if (rangePartitioning == null) return Collections.<String>emptyList(); |
| return rangePartitioning.getColumnNames(); |
| } |
| |
| /** |
| * Load schema and partitioning schemes directly from Kudu. |
| */ |
| public void loadSchemaFromKudu() throws ImpalaRuntimeException { |
| // This is set to 0 for Kudu tables. |
| // TODO: Change this to reflect the number of pk columns and modify all the |
| // places (e.g. insert stmt) that currently make use of this parameter. |
| numClusteringCols_ = 0; |
| org.apache.kudu.client.KuduTable kuduTable = null; |
| // Connect to Kudu to retrieve table metadata |
| KuduClient kuduClient = KuduUtil.getKuduClient(getKuduMasterHosts()); |
| try { |
| kuduTable = kuduClient.openTable(kuduTableName_); |
| } catch (KuduException e) { |
| throw new ImpalaRuntimeException( |
| String.format("Error opening Kudu table '%s', Kudu error: %s", kuduTableName_, |
| e.getMessage())); |
| } |
| Preconditions.checkNotNull(kuduTable); |
| |
| loadSchema(kuduTable); |
| loadPartitionByParams(kuduTable); |
| } |
| |
| /** |
| * Loads the metadata of a Kudu table. |
| * |
| * Schema and partitioning schemes are loaded directly from Kudu whereas column stats |
| * are loaded from HMS. The function also updates the table schema in HMS in order to |
| * propagate alterations made to the Kudu table to HMS. |
| */ |
| @Override |
| public void load(boolean dummy /* not used */, IMetaStoreClient msClient, |
| org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException { |
| msTable_ = msTbl; |
| kuduTableName_ = msTable_.getParameters().get(KuduTable.KEY_TABLE_NAME); |
| Preconditions.checkNotNull(kuduTableName_); |
| kuduMasters_ = msTable_.getParameters().get(KuduTable.KEY_MASTER_HOSTS); |
| Preconditions.checkNotNull(kuduMasters_); |
| setTableStats(msTable_); |
| // Load metadata from Kudu and HMS |
| try { |
| loadSchemaFromKudu(); |
| loadAllColumnStats(msClient); |
| } catch (ImpalaRuntimeException e) { |
| throw new TableLoadingException("Error loading metadata for Kudu table " + |
| kuduTableName_, e); |
| } |
| |
| // Update the table schema in HMS. |
| try { |
| long lastDdlTime = CatalogOpExecutor.calculateDdlTime(msTable_); |
| msTable_.putToParameters("transient_lastDdlTime", Long.toString(lastDdlTime)); |
| msTable_.putToParameters(StatsSetupConst.DO_NOT_UPDATE_STATS, |
| StatsSetupConst.TRUE); |
| msClient.alter_table(msTable_.getDbName(), msTable_.getTableName(), msTable_); |
| } catch (TException e) { |
| throw new TableLoadingException(e.getMessage()); |
| } |
| } |
| |
| /** |
| * Loads the schema from the Kudu table including column definitions and primary key |
| * columns. Replaces the columns in the HMS table with the columns from the Kudu table. |
| * Throws an ImpalaRuntimeException if Kudu column data types cannot be mapped to |
| * Impala data types. |
| */ |
| private void loadSchema(org.apache.kudu.client.KuduTable kuduTable) |
| throws ImpalaRuntimeException { |
| Preconditions.checkNotNull(kuduTable); |
| clearColumns(); |
| primaryKeyColumnNames_.clear(); |
| List<FieldSchema> cols = msTable_.getSd().getCols(); |
| cols.clear(); |
| int pos = 0; |
| kuduSchema_ = kuduTable.getSchema(); |
| for (ColumnSchema colSchema: kuduSchema_.getColumns()) { |
| KuduColumn kuduCol = KuduColumn.fromColumnSchema(colSchema, pos); |
| Preconditions.checkNotNull(kuduCol); |
| // Add the HMS column |
| cols.add(new FieldSchema(kuduCol.getName(), kuduCol.getType().toSql().toLowerCase(), |
| null)); |
| if (kuduCol.isKey()) primaryKeyColumnNames_.add(kuduCol.getName()); |
| addColumn(kuduCol); |
| ++pos; |
| } |
| } |
| |
| private void loadPartitionByParams(org.apache.kudu.client.KuduTable kuduTable) { |
| Preconditions.checkNotNull(kuduTable); |
| Schema tableSchema = kuduTable.getSchema(); |
| PartitionSchema partitionSchema = kuduTable.getPartitionSchema(); |
| Preconditions.checkState(!colsByPos_.isEmpty()); |
| partitionBy_.clear(); |
| for (HashBucketSchema hashBucketSchema: partitionSchema.getHashBucketSchemas()) { |
| List<String> columnNames = Lists.newArrayList(); |
| for (int colId: hashBucketSchema.getColumnIds()) { |
| columnNames.add(getColumnNameById(tableSchema, colId)); |
| } |
| partitionBy_.add(KuduPartitionParam.createHashParam(columnNames, |
| hashBucketSchema.getNumBuckets())); |
| } |
| RangeSchema rangeSchema = partitionSchema.getRangeSchema(); |
| List<Integer> columnIds = rangeSchema.getColumns(); |
| if (columnIds.isEmpty()) return; |
| List<String> columnNames = Lists.newArrayList(); |
| for (int colId: columnIds) columnNames.add(getColumnNameById(tableSchema, colId)); |
| // We don't populate the split values because Kudu's API doesn't currently support |
| // retrieving the split values for range partitions. |
| // TODO: File a Kudu JIRA. |
| partitionBy_.add(KuduPartitionParam.createRangeParam(columnNames, null)); |
| } |
| |
| /** |
| * Returns the name of a Kudu column with id 'colId'. |
| */ |
| private String getColumnNameById(Schema tableSchema, int colId) { |
| Preconditions.checkNotNull(tableSchema); |
| ColumnSchema col = tableSchema.getColumnByIndex(tableSchema.getColumnIndex(colId)); |
| Preconditions.checkNotNull(col); |
| return col.getName(); |
| } |
| |
| /** |
| * Creates a temporary KuduTable object populated with the specified properties but has |
| * an invalid TableId and is not added to the Kudu storage engine or the |
| * HMS. This is used for CTAS statements. |
| */ |
| public static KuduTable createCtasTarget(Db db, |
| org.apache.hadoop.hive.metastore.api.Table msTbl, List<ColumnDef> columnDefs, |
| List<String> primaryKeyColumnNames, List<KuduPartitionParam> partitionParams) { |
| KuduTable tmpTable = new KuduTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner()); |
| int pos = 0; |
| for (ColumnDef colDef: columnDefs) { |
| tmpTable.addColumn(new Column(colDef.getColName(), colDef.getType(), pos++)); |
| } |
| tmpTable.primaryKeyColumnNames_.addAll(primaryKeyColumnNames); |
| tmpTable.partitionBy_.addAll(partitionParams); |
| return tmpTable; |
| } |
| |
| @Override |
| public TTable toThrift() { |
| TTable table = super.toThrift(); |
| table.setTable_type(TTableType.KUDU_TABLE); |
| table.setKudu_table(getTKuduTable()); |
| return table; |
| } |
| |
| @Override |
| protected void loadFromThrift(TTable thriftTable) throws TableLoadingException { |
| super.loadFromThrift(thriftTable); |
| TKuduTable tkudu = thriftTable.getKudu_table(); |
| kuduTableName_ = tkudu.getTable_name(); |
| kuduMasters_ = Joiner.on(',').join(tkudu.getMaster_addresses()); |
| primaryKeyColumnNames_.clear(); |
| primaryKeyColumnNames_.addAll(tkudu.getKey_columns()); |
| loadPartitionByParamsFromThrift(tkudu.getPartition_by()); |
| } |
| |
| private void loadPartitionByParamsFromThrift(List<TKuduPartitionParam> params) { |
| partitionBy_.clear(); |
| for (TKuduPartitionParam param: params) { |
| if (param.isSetBy_hash_param()) { |
| TKuduPartitionByHashParam hashParam = param.getBy_hash_param(); |
| partitionBy_.add(KuduPartitionParam.createHashParam( |
| hashParam.getColumns(), hashParam.getNum_partitions())); |
| } else { |
| Preconditions.checkState(param.isSetBy_range_param()); |
| TKuduPartitionByRangeParam rangeParam = param.getBy_range_param(); |
| partitionBy_.add(KuduPartitionParam.createRangeParam(rangeParam.getColumns(), |
| null)); |
| } |
| } |
| } |
| |
| @Override |
| public TTableDescriptor toThriftDescriptor(int tableId, |
| Set<Long> referencedPartitions) { |
| TTableDescriptor desc = new TTableDescriptor(tableId, TTableType.KUDU_TABLE, |
| getTColumnDescriptors(), numClusteringCols_, kuduTableName_, db_.getName()); |
| desc.setKuduTable(getTKuduTable()); |
| return desc; |
| } |
| |
| private TKuduTable getTKuduTable() { |
| TKuduTable tbl = new TKuduTable(); |
| tbl.setKey_columns(Preconditions.checkNotNull(primaryKeyColumnNames_)); |
| tbl.setMaster_addresses(Lists.newArrayList(kuduMasters_.split(","))); |
| tbl.setTable_name(kuduTableName_); |
| Preconditions.checkNotNull(partitionBy_); |
| // IMPALA-5154: partitionBy_ may be empty if Kudu table created outside Impala, |
| // partition_by must be explicitly created because the field is required. |
| tbl.partition_by = Lists.newArrayList(); |
| for (KuduPartitionParam partitionParam: partitionBy_) { |
| tbl.addToPartition_by(partitionParam.toThrift()); |
| } |
| return tbl; |
| } |
| |
| public boolean isPrimaryKeyColumn(String name) { |
| return primaryKeyColumnNames_.contains(name); |
| } |
| |
| public TResultSet getTableStats() throws ImpalaRuntimeException { |
| TResultSet result = new TResultSet(); |
| TResultSetMetadata resultSchema = new TResultSetMetadata(); |
| result.setSchema(resultSchema); |
| |
| resultSchema.addToColumns(new TColumn("# Rows", Type.INT.toThrift())); |
| resultSchema.addToColumns(new TColumn("Start Key", Type.STRING.toThrift())); |
| resultSchema.addToColumns(new TColumn("Stop Key", Type.STRING.toThrift())); |
| resultSchema.addToColumns(new TColumn("Leader Replica", Type.STRING.toThrift())); |
| resultSchema.addToColumns(new TColumn("# Replicas", Type.INT.toThrift())); |
| |
| KuduClient client = KuduUtil.getKuduClient(getKuduMasterHosts()); |
| try { |
| org.apache.kudu.client.KuduTable kuduTable = client.openTable(kuduTableName_); |
| List<LocatedTablet> tablets = |
| kuduTable.getTabletsLocations(BackendConfig.INSTANCE.getKuduClientTimeoutMs()); |
| if (tablets.isEmpty()) { |
| TResultRowBuilder builder = new TResultRowBuilder(); |
| result.addToRows( |
| builder.add("-1").add("N/A").add("N/A").add("N/A").add("-1").get()); |
| return result; |
| } |
| for (LocatedTablet tab: tablets) { |
| TResultRowBuilder builder = new TResultRowBuilder(); |
| builder.add("-1"); // The Kudu client API doesn't expose tablet row counts. |
| builder.add(DatatypeConverter.printHexBinary( |
| tab.getPartition().getPartitionKeyStart())); |
| builder.add(DatatypeConverter.printHexBinary( |
| tab.getPartition().getPartitionKeyEnd())); |
| LocatedTablet.Replica leader = tab.getLeaderReplica(); |
| if (leader == null) { |
| // Leader might be null, if it is not yet available (e.g. during |
| // leader election in Kudu) |
| builder.add("Leader n/a"); |
| } else { |
| builder.add(leader.getRpcHost() + ":" + leader.getRpcPort().toString()); |
| } |
| builder.add(tab.getReplicas().size()); |
| result.addToRows(builder.get()); |
| } |
| |
| } catch (Exception e) { |
| throw new ImpalaRuntimeException("Error accessing Kudu for table stats.", e); |
| } |
| return result; |
| } |
| |
| public TResultSet getRangePartitions() throws ImpalaRuntimeException { |
| TResultSet result = new TResultSet(); |
| TResultSetMetadata resultSchema = new TResultSetMetadata(); |
| result.setSchema(resultSchema); |
| |
| // Build column header |
| String header = "RANGE (" + Joiner.on(',').join(getRangePartitioningColNames()) + ")"; |
| resultSchema.addToColumns(new TColumn(header, Type.STRING.toThrift())); |
| KuduClient client = KuduUtil.getKuduClient(getKuduMasterHosts()); |
| try { |
| org.apache.kudu.client.KuduTable kuduTable = client.openTable(kuduTableName_); |
| // The Kudu table API will return the partitions in sorted order by value. |
| List<String> partitions = kuduTable.getFormattedRangePartitions( |
| BackendConfig.INSTANCE.getKuduClientTimeoutMs()); |
| if (partitions.isEmpty()) { |
| TResultRowBuilder builder = new TResultRowBuilder(); |
| result.addToRows(builder.add("").get()); |
| return result; |
| } |
| for (String partition: partitions) { |
| TResultRowBuilder builder = new TResultRowBuilder(); |
| builder.add(partition); |
| result.addToRows(builder.get()); |
| } |
| } catch (Exception e) { |
| throw new ImpalaRuntimeException("Error accessing Kudu for table partitions.", e); |
| } |
| return result; |
| } |
| } |