// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.impala.analysis.TableName;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.Pair;
import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.thrift.TAccessLevel;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TColumnDescriptor;
import org.apache.impala.thrift.TTable;
import org.apache.impala.thrift.TTableDescriptor;
import org.apache.impala.thrift.TTableStats;
import org.apache.impala.util.HdfsCachingUtil;
import org.apache.log4j.Logger;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
* Base class for table metadata.
*
* This includes the concept of clustering columns, which are columns by which the table
* data is physically clustered. In other words, if two rows share the same values
* for the clustering columns, those two rows are most likely colocated. Note that this
* is more general than Hive's CLUSTER BY ... INTO BUCKETS clause (which partitions
* a key range into a fixed number of buckets).
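 *
 * As an illustrative (hypothetical) example: in an HDFS table created with
 * PARTITIONED BY (year INT, month INT), 'year' and 'month' are the clustering
 * columns, so rows sharing the same (year, month) values are stored under the
 * same partition directory.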
*/
public abstract class Table implements CatalogObject {
private static final Logger LOG = Logger.getLogger(Table.class);
// Catalog version assigned to this table
private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
protected org.apache.hadoop.hive.metastore.api.Table msTable_;
protected final Db db_;
protected final String name_;
protected final String owner_;
protected TAccessLevel accessLevel_ = TAccessLevel.READ_WRITE;
// Lock protecting this table
private final ReentrantLock tableLock_ = new ReentrantLock();
// Number of clustering columns.
protected int numClusteringCols_;
// Contains the estimated number of rows and optional file bytes. Non-null. Member
// values of -1 indicate an unknown statistic.
protected TTableStats tableStats_;
// colsByPos[i] refers to the ith column in the table. The first numClusteringCols are
// the clustering columns.
protected final ArrayList<Column> colsByPos_ = Lists.newArrayList();
// map from lowercase column name to Column object.
private final Map<String, Column> colsByName_ = Maps.newHashMap();
// Type of this table (array of struct) that mirrors the columns. Useful for analysis.
protected final ArrayType type_ = new ArrayType(new StructType());
// The lastDdlTime for this table; -1 if not set
protected long lastDdlTime_;
// True if this object is stored in an Impalad catalog cache.
protected boolean storedInImpaladCatalogCache_ = false;
protected Table(org.apache.hadoop.hive.metastore.api.Table msTable, Db db,
String name, String owner) {
msTable_ = msTable;
db_ = db;
name_ = name.toLowerCase();
owner_ = owner;
lastDdlTime_ = (msTable_ != null) ?
CatalogServiceCatalog.getLastDdlTime(msTable_) : -1;
tableStats_ = new TTableStats(-1);
tableStats_.setTotal_file_bytes(-1);
}
public ReentrantLock getLock() { return tableLock_; }
public abstract TTableDescriptor toThriftDescriptor(
int tableId, Set<Long> referencedPartitions);
public abstract TCatalogObjectType getCatalogObjectType();
  // Returns true if this table reference comes from the impalad catalog cache or if it
  // was loaded by the testing framework. Returns false if this table reference points
  // to a table stored in the catalog server.
public boolean isStoredInImpaladCatalogCache() {
return storedInImpaladCatalogCache_ || RuntimeEnv.INSTANCE.isTestEnv();
}
/**
* Populate members of 'this' from metastore info. If 'reuseMetadata' is true, reuse
* valid existing metadata.
*/
public abstract void load(boolean reuseMetadata, IMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException;
/**
* Sets 'tableStats_' by extracting the table statistics from the given HMS table.
*/
public void setTableStats(org.apache.hadoop.hive.metastore.api.Table msTbl) {
tableStats_ = new TTableStats(getRowCount(msTbl.getParameters()));
tableStats_.setTotal_file_bytes(getTotalSize(msTbl.getParameters()));
}
public void addColumn(Column col) {
colsByPos_.add(col);
colsByName_.put(col.getName().toLowerCase(), col);
((StructType) type_.getItemType()).addField(
new StructField(col.getName(), col.getType(), col.getComment()));
}
public void clearColumns() {
colsByPos_.clear();
colsByName_.clear();
((StructType) type_.getItemType()).clearFields();
}
/**
* Updates the lastDdlTime for this Table, if the new value is greater
* than the existing value. Does nothing if the new value is less than
* or equal to the existing value.
*/
public void updateLastDdlTime(long ddlTime) {
// Ensure the lastDdlTime never goes backwards.
if (ddlTime > lastDdlTime_) lastDdlTime_ = ddlTime;
}
  // Returns a list of all column names for this table which we expect to have column
  // stats in the HMS. This exists because, when we request column stats from the HMS,
  // including even a single column name that has no stats causes
  // getTableColumnStatistics() to return no results at all. For HDFS tables, partition
  // columns do not have column stats in the HMS, but HBase table clustering columns
  // do. This method allows each table type to volunteer the set of columns we should
  // ask the metastore for in loadAllColumnStats().
protected List<String> getColumnNamesWithHmsStats() {
    List<String> ret = Lists.newArrayList(colsByName_.keySet());
return ret;
}
/**
* Loads column statistics for all columns in this table from the Hive metastore. Any
* errors are logged and ignored, since the absence of column stats is not critical to
* the correctness of the system.
*/
protected void loadAllColumnStats(IMetaStoreClient client) {
if (LOG.isTraceEnabled()) LOG.trace("Loading column stats for table: " + name_);
List<ColumnStatisticsObj> colStats;
    // Only query those columns which may have stats; asking the HMS for a column
    // without stats causes getTableColumnStatistics() to return no results at all.
List<String> colNames = getColumnNamesWithHmsStats();
try {
colStats = client.getTableColumnStatistics(db_.getName(), name_, colNames);
} catch (Exception e) {
LOG.warn("Could not load column statistics for: " + getFullName(), e);
return;
}
for (ColumnStatisticsObj stats: colStats) {
Column col = getColumn(stats.getColName());
Preconditions.checkNotNull(col);
if (!ColumnStats.isSupportedColType(col.getType())) {
LOG.warn(String.format("Statistics for %s, column %s are not supported as " +
"column has type %s", getFullName(), col.getName(), col.getType()));
continue;
}
if (!col.updateStats(stats.getStatsData())) {
LOG.warn(String.format("Failed to load column stats for %s, column %s. Stats " +
"may be incompatible with column type %s. Consider regenerating statistics " +
"for %s.", getFullName(), col.getName(), col.getType(), getFullName()));
continue;
}
}
}
/**
   * Returns the value of the StatsSetupConst.ROW_COUNT parameter, or -1 if not found.
*/
protected static long getRowCount(Map<String, String> parameters) {
return getLongParam(StatsSetupConst.ROW_COUNT, parameters);
}
protected static long getTotalSize(Map<String, String> parameters) {
return getLongParam(StatsSetupConst.TOTAL_SIZE, parameters);
}
private static long getLongParam(String key, Map<String, String> parameters) {
if (parameters == null) return -1;
String value = parameters.get(key);
if (value == null) return -1;
try {
      return Long.parseLong(value);
} catch (NumberFormatException exc) {
// ignore
}
return -1;
}
/**
   * Creates a table of the appropriate type based on the given hive.metastore.api.Table
   * object. Returns null if the table type is not supported.
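   *
   * Usage sketch (illustrative only; 'db' and 'msTbl' are assumed to be in scope):
   *
   *   Table tbl = Table.fromMetastoreTable(db, msTbl);
   *   if (tbl == null) {
   *     // the metastore table's type is not supported by Impala
   *   }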
*/
public static Table fromMetastoreTable(Db db,
org.apache.hadoop.hive.metastore.api.Table msTbl) {
// Create a table of appropriate type
Table table = null;
if (TableType.valueOf(msTbl.getTableType()) == TableType.VIRTUAL_VIEW) {
table = new View(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
} else if (HBaseTable.isHBaseTable(msTbl)) {
table = new HBaseTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
} else if (KuduTable.isKuduTable(msTbl)) {
table = new KuduTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
} else if (DataSourceTable.isDataSourceTable(msTbl)) {
// It's important to check if this is a DataSourceTable before HdfsTable because
// DataSourceTables are still represented by HDFS tables in the metastore but
// have a special table property to indicate that Impala should use an external
// data source.
table = new DataSourceTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
} else if (HdfsFileFormat.isHdfsInputFormatClass(msTbl.getSd().getInputFormat())) {
table = new HdfsTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
}
return table;
}
/**
* Factory method that creates a new Table from its Thrift representation.
* Determines the type of table to create based on the Thrift table provided.
*/
public static Table fromThrift(Db parentDb, TTable thriftTable)
throws TableLoadingException {
Table newTable;
if (!thriftTable.isSetLoad_status() && thriftTable.isSetMetastore_table()) {
newTable = Table.fromMetastoreTable(parentDb, thriftTable.getMetastore_table());
} else {
newTable =
IncompleteTable.createUninitializedTable(parentDb, thriftTable.getTbl_name());
}
newTable.loadFromThrift(thriftTable);
newTable.validate();
return newTable;
}
public boolean isClusteringColumn(Column c) {
return c.getPosition() < numClusteringCols_;
}
protected void loadFromThrift(TTable thriftTable) throws TableLoadingException {
List<TColumn> columns = new ArrayList<TColumn>();
columns.addAll(thriftTable.getClustering_columns());
columns.addAll(thriftTable.getColumns());
colsByPos_.clear();
colsByPos_.ensureCapacity(columns.size());
try {
for (int i = 0; i < columns.size(); ++i) {
Column col = Column.fromThrift(columns.get(i));
colsByPos_.add(col.getPosition(), col);
colsByName_.put(col.getName().toLowerCase(), col);
((StructType) type_.getItemType()).addField(
new StructField(col.getName(), col.getType(), col.getComment()));
}
} catch (ImpalaRuntimeException e) {
throw new TableLoadingException(String.format("Error loading schema for " +
"table '%s'", getName()), e);
}
numClusteringCols_ = thriftTable.getClustering_columns().size();
if (thriftTable.isSetTable_stats()) tableStats_ = thriftTable.getTable_stats();
// Default to READ_WRITE access if the field is not set.
accessLevel_ = thriftTable.isSetAccess_level() ? thriftTable.getAccess_level() :
TAccessLevel.READ_WRITE;
storedInImpaladCatalogCache_ = true;
}
/**
* Checks preconditions for this table to function as expected. Currently only checks
* that all entries in colsByName_ use lower case keys.
*/
public void validate() throws TableLoadingException {
for (String colName: colsByName_.keySet()) {
if (!colName.equals(colName.toLowerCase())) {
throw new TableLoadingException(
"Expected lower case column name but found: " + colName);
}
}
}
/**
* Must be called with 'tableLock_' held to protect against concurrent modifications
* while producing the TTable result.
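   *
   * A sketch of the expected call pattern (illustrative only):
   *
   *   table.getLock().lock();
   *   try {
   *     TTable thriftTable = table.toThrift();
   *   } finally {
   *     table.getLock().unlock();
   *   }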
*/
public TTable toThrift() {
    // It would be simple to acquire and release the lock in this function.
    // However, in most cases toThrift() is called after modifying a table, at which
    // point the table lock should already be held, and we want the toThrift() result
    // to be consistent with the modification. So this check helps us identify places
    // where the lock acquisition is probably missing entirely.
if (!tableLock_.isHeldByCurrentThread()) {
throw new IllegalStateException(
"Table.toThrift() called without holding the table lock: " +
getFullName() + " " + getClass().getName());
}
TTable table = new TTable(db_.getName(), name_);
table.setAccess_level(accessLevel_);
// Populate both regular columns and clustering columns (if there are any).
table.setColumns(new ArrayList<TColumn>());
table.setClustering_columns(new ArrayList<TColumn>());
for (int i = 0; i < colsByPos_.size(); ++i) {
TColumn colDesc = colsByPos_.get(i).toThrift();
// Clustering columns come first.
if (i < numClusteringCols_) {
table.addToClustering_columns(colDesc);
} else {
table.addToColumns(colDesc);
}
}
table.setMetastore_table(getMetaStoreTable());
table.setTable_stats(tableStats_);
return table;
}
public TCatalogObject toTCatalogObject() {
TCatalogObject catalogObject = new TCatalogObject();
catalogObject.setType(getCatalogObjectType());
catalogObject.setCatalog_version(getCatalogVersion());
catalogObject.setTable(toThrift());
return catalogObject;
}
/**
   * Gets the Type from the given FieldSchema by using Impala's SQL parser.
   * Throws a TableLoadingException if the FieldSchema could not be parsed.
   * The type can be:
   *   - Supported by Impala, in which case the type is returned.
   *   - A type Impala understands but has not yet implemented (e.g. DATE); the type
   *     is returned, but type.isSupported() returns false.
   *   - A supported type that exceeds an Impala limit, e.g., the maximum nesting
   *     depth, in which case a TableLoadingException is thrown.
   *   - A type Impala cannot understand at all, in which case a TableLoadingException
   *     is thrown.
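   *
   * For example (hypothetical schema), a FieldSchema whose type string is
   * 'array<struct<f1:int>>' parses into a nested ARRAY-of-STRUCT type.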
*/
protected Type parseColumnType(FieldSchema fs) throws TableLoadingException {
Type type = Type.parseColumnType(fs.getType());
if (type == null) {
throw new TableLoadingException(String.format(
"Unsupported type '%s' in column '%s' of table '%s'",
fs.getType(), fs.getName(), getName()));
}
if (type.exceedsMaxNestingDepth()) {
throw new TableLoadingException(String.format(
"Type exceeds the maximum nesting depth of %s:\n%s",
Type.MAX_NESTING_DEPTH, type.toSql()));
}
return type;
}
public Db getDb() { return db_; }
public String getName() { return name_; }
public String getFullName() { return (db_ != null ? db_.getName() + "." : "") + name_; }
public TableName getTableName() {
return new TableName(db_ != null ? db_.getName() : null, name_);
}
public ArrayList<Column> getColumns() { return colsByPos_; }
/**
* Returns a list of the column names ordered by position.
*/
public List<String> getColumnNames() { return Column.toColumnNames(colsByPos_); }
/**
* Returns a list of thrift column descriptors ordered by position.
*/
public List<TColumnDescriptor> getTColumnDescriptors() {
    List<TColumnDescriptor> colDescs = Lists.newArrayList();
for (Column col: colsByPos_) {
colDescs.add(new TColumnDescriptor(col.getName(), col.getType().toThrift()));
}
return colDescs;
}
/**
* Subclasses should override this if they provide a storage handler class. Currently
* only HBase tables need to provide a storage handler.
*/
public String getStorageHandlerClassName() { return null; }
/**
* Returns the list of all columns, but with partition columns at the end of
* the list rather than the beginning. This is equivalent to the order in
* which Hive enumerates columns.
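   *
   * For example (hypothetical schema): a table with columns (p1, p2, c1, c2),
   * where p1 and p2 are partition columns, is returned as [c1, c2, p1, p2].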
*/
public ArrayList<Column> getColumnsInHiveOrder() {
ArrayList<Column> columns = Lists.newArrayList(getNonClusteringColumns());
columns.addAll(getClusteringColumns());
return columns;
}
/**
* Returns a struct type with the columns in the same order as getColumnsInHiveOrder().
*/
public StructType getHiveColumnsAsStruct() {
ArrayList<StructField> fields = Lists.newArrayListWithCapacity(colsByPos_.size());
for (Column col: getColumnsInHiveOrder()) {
fields.add(new StructField(col.getName(), col.getType(), col.getComment()));
}
return new StructType(fields);
}
/**
* Returns the list of all partition columns.
*/
public List<Column> getClusteringColumns() {
return colsByPos_.subList(0, numClusteringCols_);
}
/**
* Returns the list of all columns excluding any partition columns.
*/
public List<Column> getNonClusteringColumns() {
return colsByPos_.subList(numClusteringCols_, colsByPos_.size());
}
/**
* Case-insensitive lookup.
*/
public Column getColumn(String name) { return colsByName_.get(name.toLowerCase()); }
/**
* Returns the metastore.api.Table object this Table was created from. Returns null
   * if the derived Table object was not created from a metastore Table (e.g., inline
   * views).
*/
public org.apache.hadoop.hive.metastore.api.Table getMetaStoreTable() {
return msTable_;
}
public void setMetaStoreTable(org.apache.hadoop.hive.metastore.api.Table msTbl) {
msTable_ = msTbl;
}
public int getNumClusteringCols() { return numClusteringCols_; }
/**
* Sets the number of clustering columns. This method should only be used for tests and
* the caller must make sure that the value matches any columns that were added to the
* table.
*/
public void setNumClusteringCols(int n) {
Preconditions.checkState(RuntimeEnv.INSTANCE.isTestEnv());
numClusteringCols_ = n;
}
public long getNumRows() { return tableStats_.num_rows; }
public TTableStats getTTableStats() { return tableStats_; }
public ArrayType getType() { return type_; }
@Override
public long getCatalogVersion() { return catalogVersion_; }
@Override
public void setCatalogVersion(long catalogVersion) {
catalogVersion_ = catalogVersion;
}
@Override
public boolean isLoaded() { return true; }
public static boolean isExternalTable(
org.apache.hadoop.hive.metastore.api.Table msTbl) {
return msTbl.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString());
}
/**
   * If the table is cached, returns a <cache pool name, replication factor> pair
   * and adds the table's cache directive ID to 'cacheDirIds'. Otherwise, returns
   * a <null, 0> pair.
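   *
   * Usage sketch (illustrative only):
   *
   *   List<Long> cacheDirIds = Lists.newArrayList();
   *   Pair<String, Short> cacheInfo = table.getTableCacheInfo(cacheDirIds);
   *   if (cacheInfo.first != null) {
   *     // cached in pool 'cacheInfo.first' with replication factor 'cacheInfo.second'
   *   }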
*/
public Pair<String, Short> getTableCacheInfo(List<Long> cacheDirIds) {
String cachePoolName = null;
Short cacheReplication = 0;
Long cacheDirId = HdfsCachingUtil.getCacheDirectiveId(msTable_.getParameters());
if (cacheDirId != null) {
try {
cachePoolName = HdfsCachingUtil.getCachePool(cacheDirId);
cacheReplication = HdfsCachingUtil.getCacheReplication(cacheDirId);
Preconditions.checkNotNull(cacheReplication);
if (numClusteringCols_ == 0) cacheDirIds.add(cacheDirId);
      } catch (ImpalaRuntimeException e) {
        // Catch the error so that the actual update to the catalog can proceed;
        // note that this resets caching for the table.
        LOG.error(
            String.format("Cache directive %d was not found. Uncache the table %s " +
                "to remove this message.", cacheDirId, getFullName()));
cacheDirId = null;
}
}
return new Pair<String, Short>(cachePoolName, cacheReplication);
}
}