| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.catalog; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import org.apache.hadoop.hive.common.ValidWriteIdList; |
| import org.apache.hadoop.hive.metastore.IMetaStoreClient; |
| import org.apache.hadoop.hive.metastore.TableType; |
| import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; |
| import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; |
| import org.apache.hadoop.hive.metastore.api.FieldSchema; |
| import org.apache.hadoop.hive.metastore.api.SQLForeignKey; |
| import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; |
| import org.apache.impala.analysis.TableName; |
| import org.apache.impala.common.ImpalaRuntimeException; |
| import org.apache.impala.common.Metrics; |
| import org.apache.impala.common.Pair; |
| import org.apache.impala.common.PrintUtils; |
| import org.apache.impala.common.RuntimeEnv; |
| import org.apache.impala.compat.MetastoreShim; |
| import org.apache.impala.service.MetadataOp; |
| import org.apache.impala.thrift.TAccessLevel; |
| import org.apache.impala.thrift.TCatalogObject; |
| import org.apache.impala.thrift.TCatalogObjectType; |
| import org.apache.impala.thrift.TColumn; |
| import org.apache.impala.thrift.TColumnDescriptor; |
| import org.apache.impala.thrift.TGetPartialCatalogObjectRequest; |
| import org.apache.impala.thrift.TGetPartialCatalogObjectResponse; |
| import org.apache.impala.thrift.TPartialTableInfo; |
| import org.apache.impala.thrift.TTable; |
| import org.apache.impala.thrift.TTableDescriptor; |
| import org.apache.impala.thrift.TTableInfoSelector; |
| import org.apache.impala.thrift.TTableStats; |
| import org.apache.impala.util.AcidUtils; |
| import org.apache.impala.util.HdfsCachingUtil; |
| import org.apache.log4j.Logger; |
| |
| import com.codahale.metrics.Timer; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Stopwatch; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Lists; |
| |
| /** |
| * Base class for table metadata. |
| * |
| * This includes the concept of clustering columns, which are columns by which the table |
| * data is physically clustered. In other words, if two rows share the same values |
| * for the clustering columns, those two rows are most likely colocated. Note that this |
| * is more general than Hive's CLUSTER BY ... INTO BUCKETS clause (which partitions |
| * a key range into a fixed number of buckets). |
| */ |
| public abstract class Table extends CatalogObjectImpl implements FeTable { |
| private static final Logger LOG = Logger.getLogger(Table.class); |
| protected org.apache.hadoop.hive.metastore.api.Table msTable_; |
| protected final Db db_; |
| protected final String name_; |
| protected final String owner_; |
| protected TAccessLevel accessLevel_ = TAccessLevel.READ_WRITE; |
| // Lock protecting this table |
| private final ReentrantLock tableLock_ = new ReentrantLock(); |
| |
| // Number of clustering columns. |
| protected int numClusteringCols_; |
| |
| // Contains the estimated number of rows and optional file bytes. Non-null. Member |
| // values of -1 indicate an unknown statistic. |
| protected TTableStats tableStats_; |
| |
| // Estimated size (in bytes) of this table metadata. Stored in an AtomicLong to allow |
| // this field to be accessed without holding the table lock. |
| protected AtomicLong estimatedMetadataSize_ = new AtomicLong(0); |
| |
| // Number of metadata operations performed on that table since it was loaded. |
| // Stored in an AtomicLong to allow this field to be accessed without holding the |
| // table lock. |
| protected AtomicLong metadataOpsCount_ = new AtomicLong(0); |
| |
| // Number of files that the table has. |
| // Stored in an AtomicLong to allow this field to be accessed without holding the |
| // table lock. |
| protected AtomicLong numFiles_ = new AtomicLong(0); |
| |
| // Metrics for this table |
| protected final Metrics metrics_ = new Metrics(); |
| |
| // colsByPos[i] refers to the ith column in the table. The first numClusteringCols are |
| // the clustering columns. |
| protected final ArrayList<Column> colsByPos_ = new ArrayList<>(); |
| |
| // map from lowercase column name to Column object. |
| private final Map<String, Column> colsByName_ = new HashMap<>(); |
| |
| // List of primary keys associated with the table. |
| protected final List<SQLPrimaryKey> primaryKeys_ = new ArrayList<>(); |
| |
| // List of foreign keys associated with the table. |
| protected final List<SQLForeignKey> foreignKeys_ = new ArrayList<>(); |
| |
| // Type of this table (array of struct) that mirrors the columns. Useful for analysis. |
| protected final ArrayType type_ = new ArrayType(new StructType()); |
| |
| // True if this object is stored in an Impalad catalog cache. |
| protected boolean storedInImpaladCatalogCache_ = false; |
| |
| // Time spent in the source systems loading/reloading the fs metadata for the table. |
| protected long storageMetadataLoadTime_ = 0; |
| |
| // Last used time of this table in nanoseconds as returned by |
| // CatalogdTableInvalidator.nanoTime(). This is only set in catalogd and not used by |
| // impalad. |
| protected long lastUsedTime_; |
| |
| // Valid write id list for this table. |
| // null in the case that this table is not transactional. |
| // TODO(todd) this should probably be a ValidWriteIdList in memory instead of a String. |
| protected String validWriteIds_ = null; |
| |
| // maximum number of catalog versions to store for in-flight events for this table |
| private static final int MAX_NUMBER_OF_INFLIGHT_EVENTS = 10; |
| |
| // FIFO list of versions for all the in-flight metastore events in this table |
| // This queue can only grow up to MAX_NUMBER_OF_INFLIGHT_EVENTS size. Anything which |
| // is attempted to be added to this list when its at maximum capacity is ignored |
| private final LinkedList<Long> versionsForInflightEvents_ = new LinkedList<>(); |
| |
| // Table metrics. These metrics are applicable to all table types. Each subclass of |
| // Table can define additional metrics specific to that table type. |
| public static final String REFRESH_DURATION_METRIC = "refresh-duration"; |
| public static final String ALTER_DURATION_METRIC = "alter-duration"; |
| |
| // The time to load all the table metadata. |
| public static final String LOAD_DURATION_METRIC = "load-duration"; |
| |
| // Storage related to file system operations during metadata loading. |
| // The amount of time spent loading metadata from the underlying storage layer. |
| public static final String LOAD_DURATION_STORAGE_METADATA = |
| "load-duration.storage-metadata"; |
| |
| // The time for HMS to fetch table object and the real schema loading time. |
| // Normally, the code path is "msClient.getHiveClient().getTable(dbName, tblName)" |
| public static final String HMS_LOAD_TBL_SCHEMA = "hms-load-tbl-schema"; |
| |
| // Load all column stats, this is part of table metadata loading |
| // The code path is HdfsTable.loadAllColumnStats() |
| public static final String LOAD_DURATION_ALL_COLUMN_STATS = |
| "load-duration.all-column-stats"; |
| |
| // Table property key for storing the time of the last DDL operation. |
| public static final String TBL_PROP_LAST_DDL_TIME = "transient_lastDdlTime"; |
| |
| // Table property key for storing the last time when Impala executed COMPUTE STATS. |
| public static final String TBL_PROP_LAST_COMPUTE_STATS_TIME = |
| "impala.lastComputeStatsTime"; |
| |
| // Table property key for storing table type externality. |
| public static final String TBL_PROP_EXTERNAL_TABLE = "EXTERNAL"; |
| |
| // Table property key to determined if HMS translated a managed table to external table |
| public static final String TBL_PROP_EXTERNAL_TABLE_PURGE = "external.table.purge"; |
| |
/**
 * Stores the identity of the table, marks the row count and the total file bytes as
 * unknown (-1) and registers the common timers.
 *
 * @param msTable metastore object backing this table
 * @param db parent database
 * @param name table name; kept in lower case
 * @param owner owner of the table
 */
protected Table(org.apache.hadoop.hive.metastore.api.Table msTable, Db db,
    String name, String owner) {
  this.msTable_ = msTable;
  this.db_ = db;
  this.name_ = name.toLowerCase();
  this.owner_ = owner;
  // -1 is the marker for "statistic unknown".
  this.tableStats_ = new TTableStats(-1);
  this.tableStats_.setTotal_file_bytes(-1);
  initMetrics();
}
| |
| public ReentrantLock getLock() { return tableLock_; } |
| @Override |
| public abstract TTableDescriptor toThriftDescriptor( |
| int tableId, Set<Long> referencedPartitions); |
| |
| @Override // FeTable |
| public abstract TCatalogObjectType getCatalogObjectType(); |
| |
| public long getMetadataOpsCount() { return metadataOpsCount_.get(); } |
| public long getEstimatedMetadataSize() { return estimatedMetadataSize_.get(); } |
| public long getNumFiles() { return numFiles_.get(); } |
| public long getMedianTableLoadingTime() { |
| return (long)metrics_.getTimer(LOAD_DURATION_METRIC).getSnapshot().getMedian(); |
| } |
| public long getMaxTableLoadingTime() { |
| return metrics_.getTimer(LOAD_DURATION_METRIC).getSnapshot().getMax(); |
| } |
| public long get75TableLoadingTime() { |
| return (long)metrics_.getTimer(LOAD_DURATION_METRIC). |
| getSnapshot().get75thPercentile(); |
| } |
| public long get95TableLoadingTime() { |
| return (long)metrics_.getTimer(LOAD_DURATION_METRIC). |
| getSnapshot().get95thPercentile(); |
| } |
| public long get99TableLoadingTime() { |
| return (long)metrics_.getTimer(LOAD_DURATION_METRIC). |
| getSnapshot().get99thPercentile(); |
| } |
| public long getTableLoadingCounts() { |
| return metrics_.getTimer(LOAD_DURATION_METRIC).getCount(); |
| } |
| |
| public void setEstimatedMetadataSize(long estimatedMetadataSize) { |
| estimatedMetadataSize_.set(estimatedMetadataSize); |
| if (!isStoredInImpaladCatalogCache()) { |
| CatalogUsageMonitor.INSTANCE.updateLargestTables(this); |
| } |
| } |
| |
| public void incrementMetadataOpsCount() { |
| metadataOpsCount_.incrementAndGet(); |
| if (!isStoredInImpaladCatalogCache()) { |
| CatalogUsageMonitor.INSTANCE.updateFrequentlyAccessedTables(this); |
| } |
| } |
| |
| public void updateTableLoadingTime() { |
| if (!isStoredInImpaladCatalogCache()) { |
| CatalogUsageMonitor.INSTANCE.updateLongMetadataLoadingTables(this); |
| } |
| } |
| |
| public void setNumFiles(long numFiles) { |
| numFiles_.set(numFiles); |
| if (!isStoredInImpaladCatalogCache()) { |
| CatalogUsageMonitor.INSTANCE.updateHighFileCountTables(this); |
| } |
| } |
| |
| public void initMetrics() { |
| metrics_.addTimer(REFRESH_DURATION_METRIC); |
| metrics_.addTimer(ALTER_DURATION_METRIC); |
| metrics_.addTimer(LOAD_DURATION_METRIC); |
| metrics_.addTimer(LOAD_DURATION_STORAGE_METADATA); |
| metrics_.addTimer(HMS_LOAD_TBL_SCHEMA); |
| metrics_.addTimer(LOAD_DURATION_ALL_COLUMN_STATS); |
| } |
| |
| public Metrics getMetrics() { return metrics_; } |
| |
| // Returns storage wait time during metadata load. |
| public long getStorageLoadTime() { return storageMetadataLoadTime_; } |
| |
| // Returns true if this table reference comes from the impalad catalog cache or if it |
| // is loaded from the testing framework. Returns false if this table reference points |
| // to a table stored in the catalog server. |
| public boolean isStoredInImpaladCatalogCache() { |
| return storedInImpaladCatalogCache_ || RuntimeEnv.INSTANCE.isTestEnv(); |
| } |
| |
| public long getLastUsedTime() { |
| Preconditions.checkState(lastUsedTime_ != 0 && |
| !isStoredInImpaladCatalogCache()); |
| return lastUsedTime_; |
| } |
| |
| public void updateHMSLoadTableSchemaTime(long hmsLoadTimeNS) { |
| this.metrics_.getTimer(Table.HMS_LOAD_TBL_SCHEMA). |
| update(hmsLoadTimeNS, TimeUnit.NANOSECONDS); |
| } |
| |
| /** |
| * Populate members of 'this' from metastore info. If 'reuseMetadata' is true, reuse |
| * valid existing metadata. |
| */ |
| public abstract void load(boolean reuseMetadata, IMetaStoreClient client, |
| org.apache.hadoop.hive.metastore.api.Table msTbl, String reason) |
| throws TableLoadingException; |
| |
| /** |
| * Sets 'tableStats_' by extracting the table statistics from the given HMS table. |
| */ |
| public void setTableStats(org.apache.hadoop.hive.metastore.api.Table msTbl) { |
| tableStats_ = new TTableStats(FeCatalogUtils.getRowCount(msTbl.getParameters())); |
| tableStats_.setTotal_file_bytes(FeCatalogUtils.getTotalSize(msTbl.getParameters())); |
| } |
| |
| public void addColumn(Column col) { |
| colsByPos_.add(col); |
| colsByName_.put(col.getName().toLowerCase(), col); |
| ((StructType) type_.getItemType()).addField( |
| new StructField(col.getName(), col.getType(), col.getComment())); |
| } |
| |
| public void clearColumns() { |
| colsByPos_.clear(); |
| colsByName_.clear(); |
| ((StructType) type_.getItemType()).clearFields(); |
| } |
| |
| // Returns a list of all column names for this table which we expect to have column |
| // stats in the HMS. This exists because, when we request the column stats from HMS, |
| // including a column name that does not have stats causes the |
| // getTableColumnStatistics() to return nothing. For Hdfs tables, partition columns do |
| // not have column stats in the HMS, but HBase table clustering columns do have column |
| // stats. This method allows each table type to volunteer the set of columns we should |
| // ask the metastore for in loadAllColumnStats(). |
| protected List<String> getColumnNamesWithHmsStats() { |
| List<String> ret = new ArrayList<>(); |
| for (String name: colsByName_.keySet()) ret.add(name); |
| return ret; |
| } |
| |
| /** |
| * Loads column statistics for all columns in this table from the Hive metastore. Any |
| * errors are logged and ignored, since the absence of column stats is not critical to |
| * the correctness of the system. |
| */ |
| protected void loadAllColumnStats(IMetaStoreClient client) { |
| final Timer.Context columnStatsLdContext = |
| getMetrics().getTimer(LOAD_DURATION_ALL_COLUMN_STATS).time(); |
| try { |
| if (LOG.isTraceEnabled()) LOG.trace("Loading column stats for table: " + name_); |
| List<ColumnStatisticsObj> colStats; |
| |
| // We need to only query those columns which may have stats; asking HMS for other |
| // columns causes loadAllColumnStats() to return nothing. |
| // TODO(todd): this no longer seems to be true - asking for a non-existent column |
| // is just ignored, and the columns that do exist are returned. |
| List<String> colNames = getColumnNamesWithHmsStats(); |
| |
| try { |
| colStats = MetastoreShim.getTableColumnStatistics(client, db_.getName(), name_, |
| colNames); |
| } catch (Exception e) { |
| LOG.warn("Could not load column statistics for: " + getFullName(), e); |
| return; |
| } |
| FeCatalogUtils.injectColumnStats(colStats, this); |
| } finally { |
| columnStatsLdContext.stop(); |
| } |
| } |
| |
| /** |
| * Get valid write ids for the acid table. |
| * @param client the client to access HMS |
| * @return the list of valid write IDs for the table |
| */ |
| protected String fetchValidWriteIds(IMetaStoreClient client) |
| throws TableLoadingException { |
| String tblFullName = getFullName(); |
| if (LOG.isTraceEnabled()) LOG.trace("Get valid writeIds for table: " + tblFullName); |
| String writeIds = null; |
| try { |
| ValidWriteIdList validWriteIds = MetastoreShim.fetchValidWriteIds(client, |
| tblFullName); |
| writeIds = validWriteIds == null ? null : validWriteIds.writeToString(); |
| } catch (Exception e) { |
| throw new TableLoadingException(String.format("Error loading ValidWriteIds for " + |
| "table '%s'", getName()), e); |
| } |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Valid writeIds: " + writeIds); |
| } |
| return writeIds; |
| } |
| |
| /** |
| * Set ValistWriteIdList with stored writeId |
| * @param client the client to access HMS |
| */ |
| protected void loadValidWriteIdList(IMetaStoreClient client) |
| throws TableLoadingException { |
| Stopwatch sw = new Stopwatch().start(); |
| Preconditions.checkState(msTable_ != null && msTable_.getParameters() != null); |
| if (MetastoreShim.getMajorVersion() > 2 && |
| AcidUtils.isTransactionalTable(msTable_.getParameters())) { |
| validWriteIds_ = fetchValidWriteIds(client); |
| } else { |
| validWriteIds_ = null; |
| } |
| LOG.debug("Load Valid Write Id List Done. Time taken: " + |
| PrintUtils.printTimeNs(sw.elapsed(TimeUnit.NANOSECONDS))); |
| } |
| |
| /** |
| * Creates a table of the appropriate type based on the given hive.metastore.api.Table |
| * object. |
| */ |
| public static Table fromMetastoreTable(Db db, |
| org.apache.hadoop.hive.metastore.api.Table msTbl) { |
| CatalogInterners.internFieldsInPlace(msTbl); |
| Table table = null; |
| // Create a table of appropriate type |
| if (MetadataOp.TABLE_TYPE_VIEW.equals( |
| MetastoreShim.mapToInternalTableType(msTbl.getTableType()))) { |
| table = new View(msTbl, db, msTbl.getTableName(), msTbl.getOwner()); |
| } else if (HBaseTable.isHBaseTable(msTbl)) { |
| table = new HBaseTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner()); |
| } else if (KuduTable.isKuduTable(msTbl)) { |
| table = new KuduTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner()); |
| } else if (DataSourceTable.isDataSourceTable(msTbl)) { |
| // It's important to check if this is a DataSourceTable before HdfsTable because |
| // DataSourceTables are still represented by HDFS tables in the metastore but |
| // have a special table property to indicate that Impala should use an external |
| // data source. |
| table = new DataSourceTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner()); |
| } else if (HdfsFileFormat.isHdfsInputFormatClass(msTbl.getSd().getInputFormat())) { |
| table = new HdfsTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner()); |
| } |
| return table; |
| } |
| |
| /** |
| * Factory method that creates a new Table from its Thrift representation. |
| * Determines the type of table to create based on the Thrift table provided. |
| */ |
| public static Table fromThrift(Db parentDb, TTable thriftTable) |
| throws TableLoadingException { |
| CatalogInterners.internFieldsInPlace(thriftTable); |
| Table newTable; |
| if (!thriftTable.isSetLoad_status() && thriftTable.isSetMetastore_table()) { |
| newTable = Table.fromMetastoreTable(parentDb, thriftTable.getMetastore_table()); |
| } else { |
| newTable = |
| IncompleteTable.createUninitializedTable(parentDb, thriftTable.getTbl_name()); |
| } |
| newTable.loadFromThrift(thriftTable); |
| newTable.validate(); |
| return newTable; |
| } |
| |
| @Override // FeTable |
| public boolean isClusteringColumn(Column c) { |
| return c.getPosition() < numClusteringCols_; |
| } |
| |
| protected void loadFromThrift(TTable thriftTable) throws TableLoadingException { |
| List<TColumn> columns = new ArrayList<TColumn>(); |
| columns.addAll(thriftTable.getClustering_columns()); |
| columns.addAll(thriftTable.getColumns()); |
| |
| colsByPos_.clear(); |
| colsByPos_.ensureCapacity(columns.size()); |
| try { |
| for (int i = 0; i < columns.size(); ++i) { |
| Column col = Column.fromThrift(columns.get(i)); |
| colsByPos_.add(col.getPosition(), col); |
| colsByName_.put(col.getName().toLowerCase(), col); |
| ((StructType) type_.getItemType()).addField( |
| new StructField(col.getName(), col.getType(), col.getComment())); |
| } |
| } catch (ImpalaRuntimeException e) { |
| throw new TableLoadingException(String.format("Error loading schema for " + |
| "table '%s'", getName()), e); |
| } |
| |
| numClusteringCols_ = thriftTable.getClustering_columns().size(); |
| if (thriftTable.isSetTable_stats()) tableStats_ = thriftTable.getTable_stats(); |
| |
| // Default to READ_WRITE access if the field is not set. |
| accessLevel_ = thriftTable.isSetAccess_level() ? thriftTable.getAccess_level() : |
| TAccessLevel.READ_WRITE; |
| |
| storageMetadataLoadTime_ = thriftTable.getStorage_metadata_load_time_ns(); |
| |
| storedInImpaladCatalogCache_ = true; |
| validWriteIds_ = thriftTable.isSetValid_write_ids() ? |
| thriftTable.getValid_write_ids() : null; |
| } |
| |
| /** |
| * Checks preconditions for this table to function as expected. Currently only checks |
| * that all entries in colsByName_ use lower case keys. |
| */ |
| public void validate() throws TableLoadingException { |
| for (String colName: colsByName_.keySet()) { |
| if (!colName.equals(colName.toLowerCase())) { |
| throw new TableLoadingException( |
| "Expected lower case column name but found: " + colName); |
| } |
| } |
| } |
| |
| /** |
| * Must be called with 'tableLock_' held to protect against concurrent modifications |
| * while producing the TTable result. |
| */ |
| public TTable toThrift() { |
| // It would be simple to acquire and release the lock in this function. |
| // However, in most cases toThrift() is called after modifying a table for which |
| // the table lock should already be held, and we want the toThrift() to be consistent |
| // with the modification. So this check helps us identify places where the lock |
| // acquisition is probably missing entirely. |
| if (!tableLock_.isHeldByCurrentThread()) { |
| throw new IllegalStateException( |
| "Table.toThrift() called without holding the table lock: " + |
| getFullName() + " " + getClass().getName()); |
| } |
| |
| TTable table = new TTable(db_.getName(), name_); |
| table.setAccess_level(accessLevel_); |
| table.setStorage_metadata_load_time_ns(storageMetadataLoadTime_); |
| |
| // Populate both regular columns and clustering columns (if there are any). |
| table.setColumns(new ArrayList<>()); |
| table.setClustering_columns(new ArrayList<>()); |
| for (int i = 0; i < colsByPos_.size(); ++i) { |
| TColumn colDesc = colsByPos_.get(i).toThrift(); |
| // Clustering columns come first. |
| if (i < numClusteringCols_) { |
| table.addToClustering_columns(colDesc); |
| } else { |
| table.addToColumns(colDesc); |
| } |
| } |
| |
| table.setMetastore_table(getMetaStoreTable()); |
| table.setTable_stats(tableStats_); |
| if (validWriteIds_ != null) { |
| table.setValid_write_ids(validWriteIds_); |
| } |
| return table; |
| } |
| |
| public TCatalogObject toMinimalTCatalogObject() { |
| TCatalogObject catalogObject = |
| new TCatalogObject(getCatalogObjectType(), getCatalogVersion()); |
| catalogObject.setTable(new TTable()); |
| catalogObject.getTable().setDb_name(getDb().getName()); |
| catalogObject.getTable().setTbl_name(getName()); |
| return catalogObject; |
| } |
| |
| /** |
| * Override parent implementation that will finally call toThrift() which requires |
| * holding the table lock. However, it's not guaranteed that caller holds the table |
| * lock (IMPALA-9136). Here we use toMinimalTCatalogObject() directly since only db |
| * name and table name are needed. |
| */ |
| @Override |
| public final String getUniqueName() { |
| return Catalog.toCatalogObjectKey(toMinimalTCatalogObject()); |
| } |
| |
| @Override |
| protected void setTCatalogObject(TCatalogObject catalogObject) { |
| catalogObject.setTable(toThrift()); |
| } |
| |
| /** |
| * Return partial info about this table. This is called only on the catalogd to |
| * service GetPartialCatalogObject RPCs. |
| */ |
| public TGetPartialCatalogObjectResponse getPartialInfo( |
| TGetPartialCatalogObjectRequest req) throws TableLoadingException { |
| Preconditions.checkState(isLoaded(), "unloaded table: %s", getFullName()); |
| TTableInfoSelector selector = Preconditions.checkNotNull(req.table_info_selector, |
| "no table_info_selector"); |
| |
| TGetPartialCatalogObjectResponse resp = new TGetPartialCatalogObjectResponse(); |
| resp.setObject_version_number(getCatalogVersion()); |
| resp.table_info = new TPartialTableInfo(); |
| resp.table_info.setStorage_metadata_load_time_ns(storageMetadataLoadTime_); |
| storageMetadataLoadTime_ = 0; |
| if (selector.want_hms_table) { |
| // TODO(todd): the deep copy could be a bit expensive. Unfortunately if we took |
| // a reference to this object, and let it escape out of the lock, it would be |
| // racy since the TTable is modified in place by some DDLs (eg 'dropTableStats'). |
| // We either need to ensure that TTable is cloned on every write, or we need to |
| // ensure that serialization of the GetPartialCatalogObjectResponse object |
| // is done while we continue to hold the table lock. |
| resp.table_info.setHms_table(getMetaStoreTable().deepCopy()); |
| } |
| if (selector.want_stats_for_column_names != null) { |
| List<ColumnStatisticsObj> statsList = Lists.newArrayListWithCapacity( |
| selector.want_stats_for_column_names.size()); |
| for (String colName: selector.want_stats_for_column_names) { |
| Column col = getColumn(colName); |
| if (col == null) continue; |
| |
| // Don't return stats for HDFS partitioning columns, since these are computed |
| // by the coordinator based on the partition map itself. This makes the |
| // behavior consistent with the HMS stats-fetching APIs. |
| // |
| // NOTE: we _do_ have to return stats for HBase clustering columns, because |
| // those aren't simple value partitions. |
| if (this instanceof FeFsTable && isClusteringColumn(col)) continue; |
| |
| ColumnStatisticsData tstats = col.getStats().toHmsCompatibleThrift(col.getType()); |
| if (tstats == null) continue; |
| // TODO(todd): it seems like the column type is not used? maybe worth not |
| // setting it here to save serialization. |
| statsList.add(new ColumnStatisticsObj(colName, col.getType().toString(), tstats)); |
| } |
| resp.table_info.setColumn_stats(statsList); |
| } |
| |
| return resp; |
| } |
| /** |
| * @see FeCatalogUtils#parseColumnType(FieldSchema, String) |
| */ |
| protected Type parseColumnType(FieldSchema fs) throws TableLoadingException { |
| return FeCatalogUtils.parseColumnType(fs, getName()); |
| } |
| |
| @Override // FeTable |
| public Db getDb() { return db_; } |
| |
| @Override // FeTable |
| public String getName() { return name_; } |
| |
| @Override // FeTable |
| public String getFullName() { return (db_ != null ? db_.getName() + "." : "") + name_; } |
| |
| @Override // FeTable |
| public TableName getTableName() { |
| return new TableName(db_ != null ? db_.getName() : null, name_); |
| } |
| |
| @Override // FeTable |
| public List<Column> getColumns() { return colsByPos_; } |
| |
| @Override // FeTable |
| public List<SQLPrimaryKey> getPrimaryKeys() { |
| // Prevent clients from modifying the primary keys list. |
| return ImmutableList.copyOf(primaryKeys_); |
| } |
| |
| @Override // FeTable |
| public List<SQLForeignKey> getForeignKeys() { |
| // Prevent clients from modifying the foreign keys list. |
| return ImmutableList.copyOf(foreignKeys_); |
| } |
| |
| @Override // FeTable |
| public List<String> getColumnNames() { return Column.toColumnNames(colsByPos_); } |
| |
| /** |
| * Returns a list of thrift column descriptors ordered by position. |
| */ |
| public List<TColumnDescriptor> getTColumnDescriptors() { |
| return FeCatalogUtils.getTColumnDescriptors(this); |
| } |
| |
| /** |
| * Subclasses should override this if they provide a storage handler class. Currently |
| * only HBase tables need to provide a storage handler. |
| */ |
| @Override // FeTable |
| public String getStorageHandlerClassName() { return null; } |
| |
| @Override // FeTable |
| public List<Column> getColumnsInHiveOrder() { |
| List<Column> columns = Lists.newArrayList(getNonClusteringColumns()); |
| columns.addAll(getClusteringColumns()); |
| return Collections.unmodifiableList(columns); |
| } |
| |
| @Override // FeTable |
| public List<Column> getClusteringColumns() { |
| return Collections.unmodifiableList(colsByPos_.subList(0, numClusteringCols_)); |
| } |
| |
| @Override // FeTable |
| public List<Column> getNonClusteringColumns() { |
| return Collections.unmodifiableList(colsByPos_.subList(numClusteringCols_, |
| colsByPos_.size())); |
| } |
| |
| @Override // FeTable |
| public Column getColumn(String name) { return colsByName_.get(name.toLowerCase()); } |
| |
| @Override // FeTable |
| public org.apache.hadoop.hive.metastore.api.Table getMetaStoreTable() { |
| return msTable_; |
| } |
| |
| @Override // FeTable |
| public String getOwnerUser() { |
| if (msTable_ == null) return null; |
| return msTable_.getOwner(); |
| } |
| |
| public void setMetaStoreTable(org.apache.hadoop.hive.metastore.api.Table msTbl) { |
| msTable_ = msTbl; |
| CatalogInterners.internFieldsInPlace(msTable_); |
| } |
| |
| @Override // FeTable |
| public int getNumClusteringCols() { return numClusteringCols_; } |
| |
| /** |
| * Sets the number of clustering columns. This method should only be used for tests and |
| * the caller must make sure that the value matches any columns that were added to the |
| * table. |
| */ |
| public void setNumClusteringCols(int n) { |
| Preconditions.checkState(RuntimeEnv.INSTANCE.isTestEnv()); |
| numClusteringCols_ = n; |
| } |
| |
| @Override // FeTable |
| public long getNumRows() { return tableStats_.num_rows; } |
| |
| @Override // FeTable |
| public TTableStats getTTableStats() { return tableStats_; } |
| |
| @Override // FeTable |
| public ArrayType getType() { return type_; } |
| |
| /** |
| * If the table is cached, it returns a <cache pool name, replication factor> pair |
| * and adds the table cached directive ID to 'cacheDirIds'. Otherwise, it |
| * returns a <null, 0> pair. |
| */ |
| public Pair<String, Short> getTableCacheInfo(List<Long> cacheDirIds) { |
| String cachePoolName = null; |
| Short cacheReplication = 0; |
| Long cacheDirId = HdfsCachingUtil.getCacheDirectiveId(msTable_.getParameters()); |
| if (cacheDirId != null) { |
| try { |
| cachePoolName = HdfsCachingUtil.getCachePool(cacheDirId); |
| cacheReplication = HdfsCachingUtil.getCacheReplication(cacheDirId); |
| Preconditions.checkNotNull(cacheReplication); |
| if (numClusteringCols_ == 0) cacheDirIds.add(cacheDirId); |
| } catch (ImpalaRuntimeException e) { |
| // Catch the error so that the actual update to the catalog can progress, |
| // this resets caching for the table though |
| LOG.error( |
| String.format("Cache directive %d was not found, uncache the table %s " + |
| "to remove this message.", cacheDirId, getFullName())); |
| cacheDirId = null; |
| } |
| } |
| return new Pair<String, Short>(cachePoolName, cacheReplication); |
| } |
| |
| /** |
| * The implementations of hashCode() and equals() functions are using table names as |
| * unique identifiers of tables. Hence, they should be used with caution and not in |
| * cases where truly unique table objects are needed. |
| */ |
| @Override |
| public int hashCode() { return getFullName().hashCode(); } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (obj == null) return false; |
| if (!(obj instanceof Table)) return false; |
| return getFullName().equals(((Table) obj).getFullName()); |
| } |
| |
| /** |
| * Updates a table property with the current system time in seconds precision. |
| */ |
| public static void updateTimestampProperty( |
| org.apache.hadoop.hive.metastore.api.Table msTbl, String propertyKey) { |
| msTbl.putToParameters(propertyKey, Long.toString(System.currentTimeMillis() / 1000)); |
| } |
| |
| public void refreshLastUsedTime() { |
| lastUsedTime_ = CatalogdTableInvalidator.nanoTime(); |
| } |
| |
| /** |
| * Gets the current list of versions for in-flight events for this table |
| */ |
| public List<Long> getVersionsForInflightEvents() { |
| return Collections.unmodifiableList(versionsForInflightEvents_); |
| } |
| |
| /** |
| * Removes a given version from the collection of version numbers for in-flight events |
| * @param versionNumber version number to remove from the collection |
| * @return true if version was successfully removed, false if didn't exist |
| */ |
| public boolean removeFromVersionsForInflightEvents(long versionNumber) { |
| return versionsForInflightEvents_.remove(versionNumber); |
| } |
| |
| /** |
| * Adds a version number to the collection of versions for in-flight events. If the |
| * collection is already at the max size defined by |
| * <code>MAX_NUMBER_OF_INFLIGHT_EVENTS</code>, then it ignores the given version and |
| * does not add it |
| * @param versionNumber version number to add |
| * @return True if version number was added, false if the collection is at its max |
| * capacity |
| */ |
| public boolean addToVersionsForInflightEvents(long versionNumber) { |
| if (versionsForInflightEvents_.size() == MAX_NUMBER_OF_INFLIGHT_EVENTS) { |
| LOG.warn(String.format("Number of versions to be stored for table %s is at " |
| + " its max capacity %d. Ignoring add request for version number %d. This " |
| + "could cause unnecessary table invalidation when the event is processed", |
| getFullName(), MAX_NUMBER_OF_INFLIGHT_EVENTS, versionNumber)); |
| return false; |
| } |
| versionsForInflightEvents_.add(versionNumber); |
| return true; |
| } |
| |
| @Override |
| public long getWriteId() { |
| return MetastoreShim.getWriteIdFromMSTable(msTable_); |
| } |
| |
| @Override |
| public String getValidWriteIds() { |
| return validWriteIds_; |
| } |
| } |