blob: 3a6d71f7d8f2d1e84ca9fb57f0b28ac8f173c369 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.LiteralExpr;
import org.apache.impala.analysis.NullLiteral;
import org.apache.impala.analysis.NumericLiteral;
import org.apache.impala.analysis.PartitionKeyValue;
import org.apache.impala.catalog.HdfsPartition.FileBlock;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.Pair;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.fb.FbFileBlock;
import org.apache.impala.thrift.CatalogLookupStatus;
import org.apache.impala.thrift.CatalogObjectsConstants;
import org.apache.impala.thrift.TAccessLevel;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
import org.apache.impala.thrift.THdfsPartition;
import org.apache.impala.thrift.THdfsTable;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TPartialPartitionInfo;
import org.apache.impala.thrift.TPartitionKeyValue;
import org.apache.impala.thrift.TResultSet;
import org.apache.impala.thrift.TResultSetMetadata;
import org.apache.impala.thrift.TTable;
import org.apache.impala.thrift.TTableDescriptor;
import org.apache.impala.thrift.TTableType;
import org.apache.impala.util.AvroSchemaConverter;
import org.apache.impala.util.AvroSchemaUtils;
import org.apache.impala.util.FsPermissionCache;
import org.apache.impala.util.FsPermissionChecker;
import org.apache.impala.util.HdfsCachingUtil;
import org.apache.impala.util.ListMap;
import org.apache.impala.util.MetaStoreUtil;
import org.apache.impala.util.TAccessLevelUtil;
import org.apache.impala.util.TResultRowBuilder;
import org.apache.impala.util.ThreadNameAnnotator;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Clock;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
/**
* Internal representation of table-related metadata of a file-resident table on a
* Hadoop filesystem. The table data can be accessed through libHDFS (which is more of
* an abstraction over Hadoop's FileSystem class rather than DFS specifically). A
* partitioned table can even span multiple filesystems.
*
* This class is not thread-safe. Clients of this class need to protect against
* concurrent updates using external locking (see CatalogOpExecutor class).
*
* Owned by Catalog instance.
* The partition keys constitute the clustering columns.
*
*/
public class HdfsTable extends Table implements FeFsTable {
// Name of default partition for unpartitioned tables
private static final String DEFAULT_PARTITION_NAME = "";
// Number of times to retry fetching the partitions from the HMS should an error occur.
private final static int NUM_PARTITION_FETCH_RETRIES = 5;
// Table property key for overriding the Impalad-wide --enable_stats_extrapolation
// setting for a specific table. By default, tables do not have the property set and
// rely on the Impalad-wide --enable_stats_extrapolation flag.
public static final String TBL_PROP_ENABLE_STATS_EXTRAPOLATION =
"impala.enable.stats.extrapolation";
// Similar to above: a table property that overwrites --recursively_list_partitions
// for a specific table. This is an escape hatch in case it turns out that someone
// was relying on the previous non-recursive behavior, even though it's known to
// be inconsistent with modern versions of Spark, Hive, etc.
public static final String TBL_PROP_DISABLE_RECURSIVE_LISTING =
"impala.disable.recursive.listing";
// Average memory requirements (in bytes) for storing the metadata of a partition.
private static final long PER_PARTITION_MEM_USAGE_BYTES = 2048;
// Average memory requirements (in bytes) for storing a file descriptor.
private static final long PER_FD_MEM_USAGE_BYTES = 500;
// Average memory requirements (in bytes) for storing a block.
private static final long PER_BLOCK_MEM_USAGE_BYTES = 150;
// Hdfs table specific metrics
public static final String CATALOG_UPDATE_DURATION_METRIC = "catalog-update-duration";
public static final String NUM_PARTITIONS_METRIC = "num-partitions";
public static final String NUM_FILES_METRIC = "num-files";
public static final String NUM_BLOCKS_METRIC = "num-blocks";
public static final String TOTAL_FILE_BYTES_METRIC = "total-file-size-bytes";
public static final String MEMORY_ESTIMATE_METRIC = "memory-estimate-bytes";
public static final String HAS_INCREMENTAL_STATS_METRIC = "has-incremental-stats";
// Load all partitions time, including fetching all partitions
// from HMS and loading all partitions. The code path is
// MetaStoreUtil.fetchAllPartitions() and HdfsTable.loadAllPartitions()
public static final String LOAD_DURATION_ALL_PARTITIONS =
"load-duration.all-partitions";
// The file metadata loading for all all partitions. This is part of
// LOAD_DURATION_ALL_PARTITIONS
// Code path: loadFileMetadataForPartitions() inside loadAllPartitions()
public static final String LOAD_DURATION_FILE_METADATA_ALL_PARTITIONS =
"load-duration.all-partitions.file-metadata";
// string to indicate NULL. set in load() from table properties
private String nullColumnValue_;
// hive uses this string for NULL partition keys. Set in load().
private String nullPartitionKeyValue_;
// Avro schema of this table if this is an Avro table, otherwise null. Set in load().
private String avroSchema_ = null;
// Set to true if any of the partitions have Avro data.
private boolean hasAvroData_ = false;
// True if this table's metadata is marked as cached. Does not necessarily mean the
// data is cached or that all/any partitions are cached.
private boolean isMarkedCached_ = false;
// Array of sorted maps storing the association between partition values and
// partition ids. There is one sorted map per partition key. It is only populated if
// this table object is stored in ImpaladCatalog.
private final List<TreeMap<LiteralExpr, Set<Long>>> partitionValuesMap_ =
new ArrayList<>();
// Array of partition id sets that correspond to partitions with null values
// in the partition keys; one set per partition key. It is not populated if the table is
// stored in the catalog server.
private final List<Set<Long>> nullPartitionIds_ = new ArrayList<>();
// Map of partition ids to HdfsPartitions.
private final Map<Long, HdfsPartition> partitionMap_ = new HashMap<>();
// Map of partition name to HdfsPartition object. Used for speeding up
// table metadata loading. It is only populated if this table object is stored in
// catalog server.
private final Map<String, HdfsPartition> nameToPartitionMap_ = new HashMap<>();
// The partition used as a prototype when creating new partitions during
// insertion. New partitions inherit file format and other settings from
// the prototype.
@VisibleForTesting
HdfsPartition prototypePartition_;
// Empirical estimate (in bytes) of the incremental stats size per column per
// partition.
public static final long STATS_SIZE_PER_COLUMN_BYTES = 200;
// Bi-directional map between an integer index and a unique datanode
// TNetworkAddresses, each of which contains blocks of 1 or more
// files in this table. The network addresses are stored using IP
// address as the host name. Each FileBlock specifies a list of
// indices within this hostIndex_ to specify which nodes contain
// replicas of the block.
private final ListMap<TNetworkAddress> hostIndex_ = new ListMap<TNetworkAddress>();
// True iff this table has incremental stats in any of its partitions.
private boolean hasIncrementalStats_ = false;
private HdfsPartitionLocationCompressor partitionLocationCompressor_;
// Base Hdfs directory where files of this table are stored.
// For unpartitioned tables it is simply the path where all files live.
// For partitioned tables it is the root directory
// under which partition dirs are placed.
protected String hdfsBaseDir_;
// List of FieldSchemas that correspond to the non-partition columns. Used when
// describing this table and its partitions to the HMS (e.g. as part of an alter table
// operation), when only non-partition columns are required.
private final List<FieldSchema> nonPartFieldSchemas_ = new ArrayList<>();
// Flag to check if the table schema has been loaded. Used as a precondition
// for setAvroSchema().
private boolean isSchemaLoaded_ = false;
// Primary Key and Foreign Key information. Set in load() method.
private final List<SQLPrimaryKey> primaryKeys_ = new ArrayList<>();
private final List<SQLForeignKey> foreignKeys_ = new ArrayList<>();
// Represents a set of storage-related statistics aggregated at the table or partition
// level.
public final static class FileMetadataStats {
// Nuber of files in a table/partition.
public long numFiles;
// Number of blocks in a table/partition.
public long numBlocks;
// Total size (in bytes) of all files in a table/partition.
public long totalFileBytes;
// Unsets the storage stats to indicate that their values are unknown.
public void unset() {
numFiles = -1;
numBlocks = -1;
totalFileBytes = -1;
}
// Initializes the values of the storage stats.
public void init() {
numFiles = 0;
numBlocks = 0;
totalFileBytes = 0;
}
public void set(FileMetadataStats stats) {
numFiles = stats.numFiles;
numBlocks = stats.numBlocks;
totalFileBytes = stats.totalFileBytes;
}
}
// Table level storage-related statistics. Depending on whether the table is stored in
// the catalog server or the impalad catalog cache, these statistics serve different
// purposes and, hence, are managed differently.
// Table stored in impalad catalog cache:
// - Used in planning.
// - Stats are modified real-time by the operations that modify table metadata
// (e.g. add partition).
// Table stored in the the catalog server:
// - Used for reporting through catalog web UI.
// - Stats are reset whenever the table is loaded (due to a metadata operation) and
// are set when the table is serialized to Thrift.
private final FileMetadataStats fileMetadataStats_ = new FileMetadataStats();
private final static Logger LOG = LoggerFactory.getLogger(HdfsTable.class);
public final static long LOADING_WARNING_TIME_NS = 5000000000L;
// Caching this configuration object makes calls to getFileSystem much quicker
// (saves ~50ms on a standard plan)
// TODO(henry): confirm that this is thread safe - cursory inspection of the class
// and its usage in getFileSystem suggests it should be.
private static final Configuration CONF = new Configuration();
public HdfsTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
Db db, String name, String owner) {
super(msTbl, db, name, owner);
partitionLocationCompressor_ =
new HdfsPartitionLocationCompressor(numClusteringCols_);
}
@Override // FeFsTable
public boolean isLocationCacheable() {
return FileSystemUtil.isPathCacheable(new Path(getLocation()));
}
@Override // FeFsTable
public boolean isCacheable() {
if (!isLocationCacheable()) return false;
if (!isMarkedCached() && numClusteringCols_ > 0) {
for (FeFsPartition partition: partitionMap_.values()) {
if (!partition.isCacheable()) {
return false;
}
}
}
return true;
}
/**
* Updates the storage stats of this table based on the partition information.
* This is used only for the frontend tests that do not spawn a separate Catalog
* instance.
*/
public void computeHdfsStatsForTesting() {
Preconditions.checkState(fileMetadataStats_.numFiles == -1
&& fileMetadataStats_.totalFileBytes == -1);
fileMetadataStats_.init();
for (HdfsPartition partition: partitionMap_.values()) {
fileMetadataStats_.numFiles += partition.getNumFileDescriptors();
fileMetadataStats_.totalFileBytes += partition.getSize();
}
}
@Override
public TCatalogObjectType getCatalogObjectType() {
return TCatalogObjectType.TABLE;
}
@Override // FeFsTable
public boolean isMarkedCached() { return isMarkedCached_; }
@Override // FeFsTable
public Collection<? extends PrunablePartition> getPartitions() {
return partitionMap_.values();
}
@Override // FeFsTable
public Map<Long, ? extends PrunablePartition> getPartitionMap() {
return partitionMap_;
}
@Override // FeFsTable
public List<FeFsPartition> loadPartitions(Collection<Long> ids) {
List<FeFsPartition> partitions = Lists.newArrayListWithCapacity(ids.size());
for (Long id : ids) {
HdfsPartition partition = partitionMap_.get(id);
if (partition == null) {
throw new IllegalArgumentException("no such partition id " + id);
}
partitions.add(partition);
}
return partitions;
}
@Override // FeFsTable
public Set<Long> getNullPartitionIds(int i) { return nullPartitionIds_.get(i); }
public HdfsPartitionLocationCompressor getPartitionLocationCompressor() {
return partitionLocationCompressor_;
}
// Returns an unmodifiable set of the partition IDs from partitionMap_.
@Override // FeFsTable
public Set<Long> getPartitionIds() {
return Collections.unmodifiableSet(partitionMap_.keySet());
}
@Override // FeFsTable
public TreeMap<LiteralExpr, Set<Long>> getPartitionValueMap(int i) {
return partitionValuesMap_.get(i);
}
@Override // FeFsTable
public String getNullPartitionKeyValue() {
return nullPartitionKeyValue_; // Set during load.
}
@Override // FeFsTable
public String getLocation() {
return super.getMetaStoreTable().getSd().getLocation();
}
List<FieldSchema> getNonPartitionFieldSchemas() { return nonPartFieldSchemas_; }
// True if Impala has HDFS write permissions on the hdfsBaseDir
@Override
public boolean hasWriteAccessToBaseDir() {
return TAccessLevelUtil.impliesWriteAccess(accessLevel_);
}
/**
* Returns the first location (HDFS path) that Impala does not have WRITE access
* to, or an null if none is found. For an unpartitioned table, this just
* checks the hdfsBaseDir. For a partitioned table it checks the base directory
* as well as all partition directories.
*/
@Override
public String getFirstLocationWithoutWriteAccess() {
if (!hasWriteAccessToBaseDir()) {
return hdfsBaseDir_;
}
for (HdfsPartition partition: partitionMap_.values()) {
if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
return partition.getLocation();
}
}
return null;
}
/**
* Gets the HdfsPartition matching the given partition spec. Returns null if no match
* was found.
*/
public HdfsPartition getPartition(List<PartitionKeyValue> partitionSpec) {
return (HdfsPartition)getPartition(this, partitionSpec);
}
public static PrunablePartition getPartition(FeFsTable table,
List<PartitionKeyValue> partitionSpec) {
List<TPartitionKeyValue> partitionKeyValues = new ArrayList<>();
for (PartitionKeyValue kv: partitionSpec) {
Preconditions.checkArgument(kv.isStatic(), "unexpected dynamic partition: %s",
kv);
String value = PartitionKeyValue.getPartitionKeyValueString(
kv.getLiteralValue(), table.getNullPartitionKeyValue());
partitionKeyValues.add(new TPartitionKeyValue(kv.getColName(), value));
}
return Utils.getPartitionFromThriftPartitionSpec(table, partitionKeyValues);
}
/**
* Gets the HdfsPartition matching the Thrift version of the partition spec.
* Returns null if no match was found.
*/
public HdfsPartition getPartitionFromThriftPartitionSpec(
List<TPartitionKeyValue> partitionSpec) {
return (HdfsPartition)Utils.getPartitionFromThriftPartitionSpec(this, partitionSpec);
}
/**
* Gets hdfs partitions by the given partition set.
*/
@SuppressWarnings("unchecked")
public List<HdfsPartition> getPartitionsFromPartitionSet(
List<List<TPartitionKeyValue>> partitionSet) {
return (List<HdfsPartition>)Utils.getPartitionsFromPartitionSet(this, partitionSet);
}
/**
* Create columns corresponding to fieldSchemas. Throws a TableLoadingException if the
* metadata is incompatible with what we support.
*/
private void addColumnsFromFieldSchemas(List<FieldSchema> fieldSchemas)
throws TableLoadingException {
int pos = colsByPos_.size();
for (FieldSchema s: fieldSchemas) {
Type type = parseColumnType(s);
// Check if we support partitioning on columns of such a type.
if (pos < numClusteringCols_ && !type.supportsTablePartitioning()) {
throw new TableLoadingException(
String.format("Failed to load metadata for table '%s' because of " +
"unsupported partition-column type '%s' in partition column '%s'",
getFullName(), type.toString(), s.getName()));
}
Column col = new Column(s.getName(), type, s.getComment(), pos);
addColumn(col);
++pos;
}
}
/**
* Clear the partitions of an HdfsTable and the associated metadata.
*/
private void resetPartitions() {
partitionMap_.clear();
nameToPartitionMap_.clear();
partitionValuesMap_.clear();
nullPartitionIds_.clear();
if (isStoredInImpaladCatalogCache()) {
// Initialize partitionValuesMap_ and nullPartitionIds_. Also reset column stats.
for (int i = 0; i < numClusteringCols_; ++i) {
getColumns().get(i).getStats().setNumNulls(0);
getColumns().get(i).getStats().setNumDistinctValues(0);
partitionValuesMap_.add(new TreeMap<>());
nullPartitionIds_.add(new HashSet<>());
}
}
fileMetadataStats_.init();
}
/**
* Resets any partition metadata, creates the prototype partition and sets the base
* table directory path as well as the caching info from the HMS table.
*/
public void initializePartitionMetadata(
org.apache.hadoop.hive.metastore.api.Table msTbl) throws CatalogException {
Preconditions.checkNotNull(msTbl);
resetPartitions();
hdfsBaseDir_ = msTbl.getSd().getLocation();
setPrototypePartition(msTbl.getSd());
// We silently ignore cache directives that no longer exist in HDFS, and remove
// non-existing cache directives from the parameters.
isMarkedCached_ = HdfsCachingUtil.validateCacheParams(msTbl.getParameters());
}
/**
* Create HdfsPartition objects corresponding to 'msPartitions' and add them to this
* table's partition list. Any partition metadata will be reset and loaded from
* scratch. For each partition created, we load the block metadata for each data file
* under it. Returns time spent loading the filesystem metadata in nanoseconds.
*
* If there are no partitions in the Hive metadata, a single partition is added with no
* partition keys.
*/
private long loadAllPartitions(IMetaStoreClient client,
List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions,
org.apache.hadoop.hive.metastore.api.Table msTbl) throws IOException,
CatalogException {
Preconditions.checkNotNull(msTbl);
final Clock clock = Clock.defaultClock();
long startTime = clock.getTick();
initializePartitionMetadata(msTbl);
FsPermissionCache permCache = preloadPermissionsCache(msPartitions);
Path tblLocation = FileSystemUtil.createFullyQualifiedPath(getHdfsBaseDirPath());
accessLevel_ = getAvailableAccessLevel(getFullName(), tblLocation, permCache);
if (msTbl.getPartitionKeysSize() == 0) {
Preconditions.checkArgument(msPartitions == null || msPartitions.isEmpty());
// This table has no partition key, which means it has no declared partitions.
// We model partitions slightly differently to Hive - every file must exist in a
// partition, so add a single partition with no keys which will get all the
// files in the table's root directory.
HdfsPartition part = createPartition(msTbl.getSd(), null, permCache);
if (isMarkedCached_) part.markCached();
addPartition(part);
} else {
for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) {
HdfsPartition partition = createPartition(msPartition.getSd(), msPartition,
permCache);
addPartition(partition);
// If the partition is null, its HDFS path does not exist, and it was not added
// to this table's partition list. Skip the partition.
if (partition == null) continue;
}
}
// Load the file metadata from scratch.
Timer.Context fileMetadataLdContext = getMetrics().getTimer(
HdfsTable.LOAD_DURATION_FILE_METADATA_ALL_PARTITIONS).time();
loadFileMetadataForPartitions(client, partitionMap_.values(), /*isRefresh=*/false);
fileMetadataLdContext.stop();
return clock.getTick() - startTime;
}
/**
* Loads valid txn list from HMS. Re-throws exceptions as CatalogException.
*/
private ValidTxnList loadValidTxns(IMetaStoreClient client) throws CatalogException {
try {
return MetastoreShim.getValidTxns(client);
} catch (TException exception) {
throw new CatalogException(exception.getMessage());
}
}
/**
* Helper method to load the block locations for each partition in 'parts'.
* New file descriptor lists are loaded and the partitions are updated in place.
*
* @param isRefresh whether this is a refresh operation or an initial load. This only
* affects logging.
*/
private void loadFileMetadataForPartitions(IMetaStoreClient client,
Iterable<HdfsPartition> parts, boolean isRefresh) throws CatalogException {
final Clock clock = Clock.defaultClock();
long startTime = clock.getTick();
// Group the partitions by their path (multiple partitions may point to the same
// path).
Map<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
for (HdfsPartition p : parts) {
Path partPath = FileSystemUtil.createFullyQualifiedPath(new Path(p.getLocation()));
partsByPath.computeIfAbsent(partPath, (path) -> new ArrayList<HdfsPartition>())
.add(p);
}
ValidWriteIdList writeIds = validWriteIds_ != null
? MetastoreShim.getValidWriteIdListFromString(validWriteIds_) : null;
//TODO: maybe it'd be better to load the valid txn list in the context of a
// transaction to have consistent valid write ids and valid transaction ids.
// Currently tables are loaded when they are first referenced and stay in catalog
// until certain actions occur (refresh, invalidate, insert, etc.). However,
// Impala doesn't notice when HMS's cleaner removes old transactional directories,
// which might lead to FileNotFound exceptions.
ValidTxnList validTxnList = writeIds != null ? loadValidTxns(client) : null;
// Create a FileMetadataLoader for each path.
Map<Path, FileMetadataLoader> loadersByPath = Maps.newHashMap();
for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath.entrySet()) {
List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors();
FileMetadataLoader loader = new FileMetadataLoader(e.getKey(),
Utils.shouldRecursivelyListPartitions(this),
oldFds, hostIndex_, validTxnList, writeIds);
// If there is a cached partition mapped to this path, we recompute the block
// locations even if the underlying files have not changed.
// This is done to keep the cached block metadata up to date.
boolean hasCachedPartition = Iterables.any(e.getValue(),
HdfsPartition::isMarkedCached);
loader.setForceRefreshBlockLocations(hasCachedPartition);
loadersByPath.put(e.getKey(), loader);
}
String logPrefix = String.format(
"%s file and block metadata for %s paths for table %s",
isRefresh ? "Refreshing" : "Loading", partsByPath.size(),
getFullName());
FileSystem tableFs;
try {
tableFs = (new Path(getLocation())).getFileSystem(CONF);
} catch (IOException e) {
throw new CatalogException("Invalid table path for table: " + getFullName(), e);
}
// Actually load the partitions.
// TODO(IMPALA-8406): if this fails to load files from one or more partitions, then
// we'll throw an exception here and end up bailing out of whatever catalog operation
// we're in the middle of. This could cause a partial metadata update -- eg we may
// have refreshed the top-level table properties without refreshing the files.
new ParallelFileMetadataLoader(logPrefix, tableFs, loadersByPath.values())
.load();
// Store the loaded FDs into the partitions.
for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath.entrySet()) {
Path p = e.getKey();
FileMetadataLoader loader = loadersByPath.get(p);
for (HdfsPartition part : e.getValue()) {
part.setFileDescriptors(loader.getLoadedFds());
}
}
// TODO(todd): would be good to log a summary of the loading process:
// - how many block locations did we reuse/load individually/load via batch
// - how many partitions did we read metadata for
// - etc...
String partNames = Joiner.on(", ").join(
Iterables.limit(Iterables.transform(parts, HdfsPartition::getPartitionName), 3));
if (partsByPath.size() > 3) {
partNames += String.format(", and %s others",
Iterables.size(parts) - 3);
}
long duration = clock.getTick() - startTime;
LOG.info("Loaded file and block metadata for {} partitions: {}. Time taken: {}",
getFullName(), partNames, PrintUtils.printTimeNs(duration));
}
/**
* Gets the AccessLevel that is available for Impala for this table based on the
* permissions Impala has on the given path. If the path does not exist, recurses up
* the path until a existing parent directory is found, and inherit access permissions
* from that.
* Always returns READ_WRITE for S3 and ADLS files.
*/
private static TAccessLevel getAvailableAccessLevel(String tableName,
Path location, FsPermissionCache permCache) throws IOException {
Preconditions.checkNotNull(location);
FileSystem fs = location.getFileSystem(CONF);
// Avoid calling getPermissions() on file path for S3 files, as that makes a round
// trip to S3. Also, the S3A connector is currently unable to manage S3 permissions,
// so for now it is safe to assume that all files(objects) have READ_WRITE
// permissions, as that's what the S3A connector will always return too.
// TODO: Revisit if the S3A connector is updated to be able to manage S3 object
// permissions. (see HADOOP-13892)
if (FileSystemUtil.isS3AFileSystem(fs)) return TAccessLevel.READ_WRITE;
// The ADLS connector currently returns ACLs for files in ADLS, but can only map
// them to the ADLS client SPI and not the Hadoop users/groups, causing unexpected
// behavior. So ADLS ACLs are unsupported until the connector is able to map
// permissions to hadoop users/groups (HADOOP-14437).
if (FileSystemUtil.isADLFileSystem(fs)) return TAccessLevel.READ_WRITE;
if (FileSystemUtil.isABFSFileSystem(fs)) return TAccessLevel.READ_WRITE;
while (location != null) {
try {
FsPermissionChecker.Permissions perms = permCache.getPermissions(location);
if (perms.canReadAndWrite()) {
return TAccessLevel.READ_WRITE;
} else if (perms.canRead()) {
return TAccessLevel.READ_ONLY;
} else if (perms.canWrite()) {
return TAccessLevel.WRITE_ONLY;
}
return TAccessLevel.NONE;
} catch (FileNotFoundException e) {
location = location.getParent();
}
}
// Should never get here.
throw new NullPointerException("Error determining access level for table " +
tableName + ": no path ancestor exists for path: " + location);
}
/**
* Creates new HdfsPartition objects to be added to HdfsTable's partition list.
* Partitions may be empty, or may not even exist in the filesystem (a partition's
* location may have been changed to a new path that is about to be created by an
* INSERT). Also loads the file metadata for this partition. Returns new partition
* if successful or null if none was created.
*
* Throws CatalogException if one of the supplied storage descriptors contains metadata
* that Impala can't understand.
*/
public List<HdfsPartition> createAndLoadPartitions(IMetaStoreClient client,
List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions)
throws CatalogException {
List<HdfsPartition> addedParts = new ArrayList<>();
FsPermissionCache permCache = preloadPermissionsCache(msPartitions);
for (org.apache.hadoop.hive.metastore.api.Partition partition: msPartitions) {
HdfsPartition hdfsPartition = createPartition(partition.getSd(), partition,
permCache);
Preconditions.checkNotNull(hdfsPartition);
addedParts.add(hdfsPartition);
}
loadFileMetadataForPartitions(client, addedParts, /* isRefresh = */ false);
return addedParts;
}
/**
* Creates a new HdfsPartition from a specified StorageDescriptor and an HMS partition
* object.
*/
private HdfsPartition createPartition(StorageDescriptor storageDescriptor,
org.apache.hadoop.hive.metastore.api.Partition msPartition,
FsPermissionCache permCache) throws CatalogException {
HdfsStorageDescriptor fileFormatDescriptor =
HdfsStorageDescriptor.fromStorageDescriptor(this.name_, storageDescriptor);
List<LiteralExpr> keyValues;
if (msPartition != null) {
keyValues = FeCatalogUtils.parsePartitionKeyValues(this, msPartition.getValues());
} else {
keyValues = Collections.emptyList();
}
Path partDirPath = new Path(storageDescriptor.getLocation());
try {
if (msPartition != null) {
HdfsCachingUtil.validateCacheParams(msPartition.getParameters());
}
TAccessLevel accessLevel = getAvailableAccessLevel(getFullName(), partDirPath,
permCache);
HdfsPartition partition =
new HdfsPartition(this, msPartition, keyValues, fileFormatDescriptor,
new ArrayList<FileDescriptor>(), accessLevel);
partition.checkWellFormed();
// Set the partition's #rows.
if (msPartition != null && msPartition.getParameters() != null) {
partition.setNumRows(FeCatalogUtils.getRowCount(msPartition.getParameters()));
}
if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
// TODO: READ_ONLY isn't exactly correct because the it's possible the
// partition does not have READ permissions either. When we start checking
// whether we can READ from a table, this should be updated to set the
// table's access level to the "lowest" effective level across all
// partitions. That is, if one partition has READ_ONLY and another has
// WRITE_ONLY the table's access level should be NONE.
accessLevel_ = TAccessLevel.READ_ONLY;
}
return partition;
} catch (IOException e) {
throw new CatalogException("Error initializing partition", e);
}
}
/**
* Adds the partition to the HdfsTable. Throws a CatalogException if the partition
* already exists in this table.
*/
public void addPartition(HdfsPartition partition) throws CatalogException {
if (partitionMap_.containsKey(partition.getId())) {
throw new CatalogException(String.format("Partition %s already exists in table %s",
partition.getPartitionName(), getFullName()));
}
if (partition.getFileFormat() == HdfsFileFormat.AVRO) hasAvroData_ = true;
partitionMap_.put(partition.getId(), partition);
fileMetadataStats_.totalFileBytes += partition.getSize();
fileMetadataStats_.numFiles += partition.getNumFileDescriptors();
updatePartitionMdAndColStats(partition);
}
/**
* Updates the HdfsTable's partition metadata, i.e. adds the id to the HdfsTable and
* populates structures used for speeding up partition pruning/lookup. Also updates
* column stats.
*/
private void updatePartitionMdAndColStats(HdfsPartition partition) {
if (partition.getPartitionValues().size() != numClusteringCols_) return;
nameToPartitionMap_.put(partition.getPartitionName(), partition);
if (!isStoredInImpaladCatalogCache()) return;
for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
ColumnStats stats = getColumns().get(i).getStats();
LiteralExpr literal = partition.getPartitionValues().get(i);
// Store partitions with null partition values separately
if (Expr.IS_NULL_LITERAL.apply(literal)) {
stats.setNumNulls(stats.getNumNulls() + 1);
if (nullPartitionIds_.get(i).isEmpty()) {
stats.setNumDistinctValues(stats.getNumDistinctValues() + 1);
}
nullPartitionIds_.get(i).add(Long.valueOf(partition.getId()));
continue;
}
Set<Long> partitionIds = partitionValuesMap_.get(i).get(literal);
if (partitionIds == null) {
partitionIds = new HashSet<>();
partitionValuesMap_.get(i).put(literal, partitionIds);
stats.setNumDistinctValues(stats.getNumDistinctValues() + 1);
}
partitionIds.add(Long.valueOf(partition.getId()));
}
}
/**
* Drops the partition having the given partition spec from HdfsTable. Cleans up its
* metadata from all the mappings used to speed up partition pruning/lookup.
* Also updates partition column statistics. Given partitionSpec must match exactly
* one partition.
* Returns the HdfsPartition that was dropped. If the partition does not exist, returns
* null.
*/
public HdfsPartition dropPartition(List<TPartitionKeyValue> partitionSpec) {
return dropPartition(getPartitionFromThriftPartitionSpec(partitionSpec));
}
/**
* Drops a partition and updates partition column statistics. Returns the
* HdfsPartition that was dropped or null if the partition does not exist.
* If removeCacheDirective = true, any cache directive on the partition is removed.
*/
private HdfsPartition dropPartition(HdfsPartition partition,
boolean removeCacheDirective) {
if (partition == null) return null;
fileMetadataStats_.totalFileBytes -= partition.getSize();
fileMetadataStats_.numFiles -= partition.getNumFileDescriptors();
Preconditions.checkArgument(partition.getPartitionValues().size() ==
numClusteringCols_);
Long partitionId = partition.getId();
partitionMap_.remove(partitionId);
nameToPartitionMap_.remove(partition.getPartitionName());
if (removeCacheDirective && partition.isMarkedCached()) {
try {
HdfsCachingUtil.removePartitionCacheDirective(partition);
} catch (ImpalaException e) {
LOG.error("Unable to remove the cache directive on table " + getFullName() +
", partition " + partition.getPartitionName() + ": ", e);
}
}
if (!isStoredInImpaladCatalogCache()) return partition;
for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
ColumnStats stats = getColumns().get(i).getStats();
LiteralExpr literal = partition.getPartitionValues().get(i);
// Check if this is a null literal.
if (Expr.IS_NULL_LITERAL.apply(literal)) {
nullPartitionIds_.get(i).remove(partitionId);
stats.setNumNulls(stats.getNumNulls() - 1);
if (nullPartitionIds_.get(i).isEmpty()) {
stats.setNumDistinctValues(stats.getNumDistinctValues() - 1);
}
continue;
}
Set<Long> partitionIds = partitionValuesMap_.get(i).get(literal);
// If there are multiple partition ids corresponding to a literal, remove
// only this id. Otherwise, remove the <literal, id> pair.
if (partitionIds.size() > 1) partitionIds.remove(partitionId);
else {
partitionValuesMap_.get(i).remove(literal);
stats.setNumDistinctValues(stats.getNumDistinctValues() - 1);
}
}
return partition;
}
private HdfsPartition dropPartition(HdfsPartition partition) {
return dropPartition(partition, true);
}
/**
* Drops the given partitions from this table. Cleans up its metadata from all the
* mappings used to speed up partition pruning/lookup. Also updates partitions column
* statistics. Returns the list of partitions that were dropped.
*/
public List<HdfsPartition> dropPartitions(List<HdfsPartition> partitions,
boolean removeCacheDirective) {
List<HdfsPartition> droppedPartitions = new ArrayList<>();
for (HdfsPartition partition: partitions) {
HdfsPartition hdfsPartition = dropPartition(partition, removeCacheDirective);
if (hdfsPartition != null) droppedPartitions.add(hdfsPartition);
}
return droppedPartitions;
}
public List<HdfsPartition> dropPartitions(List<HdfsPartition> partitions) {
return dropPartitions(partitions, true);
}
/**
* Update the prototype partition used when creating new partitions for
* this table. New partitions will inherit storage properties from the
* provided descriptor.
*/
public void setPrototypePartition(StorageDescriptor storageDescriptor)
throws CatalogException {
HdfsStorageDescriptor hdfsStorageDescriptor =
HdfsStorageDescriptor.fromStorageDescriptor(this.name_, storageDescriptor);
prototypePartition_ = HdfsPartition.prototypePartition(this, hdfsStorageDescriptor);
}
@Override
public void load(boolean reuseMetadata, IMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
throws TableLoadingException {
load(reuseMetadata, client, msTbl, true, true, null, reason);
}
/**
* Loads table metadata from the Hive Metastore.
*
* If 'reuseMetadata' is false, performs a full metadata load from the Hive Metastore,
* including partition and file metadata. Otherwise, loads metadata incrementally and
* updates this HdfsTable in place so that it is in sync with the Hive Metastore.
*
* Depending on the operation that triggered the table metadata load, not all the
* metadata may need to be updated. If 'partitionsToUpdate' is not null, it specifies a
* list of partitions for which metadata should be updated. Otherwise, all partition
* metadata will be updated from the Hive Metastore.
*
* If 'loadParitionFileMetadata' is true, file metadata of the specified partitions
* are reloaded from scratch. If 'partitionsToUpdate' is not specified, file metadata
* of all the partitions are loaded.
*
* If 'loadTableSchema' is true, the table schema is loaded from the Hive Metastore.
*
* Existing file descriptors might be reused incorrectly if Hdfs rebalancer was
* executed, as it changes the block locations but doesn't update the mtime (file
* modification time).
* If this occurs, user has to execute "invalidate metadata" to invalidate the
* metadata cache of the table and trigger a fresh load.
*/
public void load(boolean reuseMetadata, IMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl,
boolean loadParitionFileMetadata, boolean loadTableSchema,
Set<String> partitionsToUpdate, String reason) throws TableLoadingException {
final Timer.Context context =
getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
String annotation = String.format("%s metadata for %s%s partition(s) of %s.%s (%s)",
reuseMetadata ? "Reloading" : "Loading",
loadTableSchema ? "table definition and " : "",
partitionsToUpdate == null ? "all" : String.valueOf(partitionsToUpdate.size()),
msTbl.getDbName(), msTbl.getTableName(), reason);
LOG.info(annotation);
final Timer storageLdTimer =
getMetrics().getTimer(Table.LOAD_DURATION_STORAGE_METADATA);
storageMetadataLoadTime_ = 0;
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) {
// turn all exceptions into TableLoadingException
msTable_ = msTbl;
try {
if (loadTableSchema) {
// set nullPartitionKeyValue from the hive conf.
nullPartitionKeyValue_ =
MetaStoreUtil.getNullPartitionKeyValue(client).intern();
loadSchema(msTbl);
loadAllColumnStats(client);
loadPkFkInfo(client, msTbl);
}
loadValidWriteIdList(client);
// Load partition and file metadata
if (reuseMetadata) {
// Incrementally update this table's partitions and file metadata
Preconditions.checkState(
partitionsToUpdate == null || loadParitionFileMetadata);
storageMetadataLoadTime_ += updateMdFromHmsTable(msTbl);
if (msTbl.getPartitionKeysSize() == 0) {
if (loadParitionFileMetadata) {
storageMetadataLoadTime_ += updateUnpartitionedTableFileMd(client);
}
} else {
storageMetadataLoadTime_ += updatePartitionsFromHms(
client, partitionsToUpdate, loadParitionFileMetadata);
}
LOG.info("Incrementally loaded table metadata for: " + getFullName());
} else {
LOG.info("Fetching partition metadata from the Metastore: " + getFullName());
final Timer.Context allPartitionsLdContext =
getMetrics().getTimer(HdfsTable.LOAD_DURATION_ALL_PARTITIONS).time();
// Load all partitions from Hive Metastore, including file metadata.
List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
MetaStoreUtil.fetchAllPartitions(
client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES);
LOG.info("Fetched partition metadata from the Metastore: " + getFullName());
storageMetadataLoadTime_ = loadAllPartitions(client, msPartitions, msTbl);
allPartitionsLdContext.stop();
}
if (loadTableSchema) setAvroSchema(client, msTbl);
setTableStats(msTbl);
fileMetadataStats_.unset();
refreshLastUsedTime();
} catch (TableLoadingException e) {
throw e;
} catch (Exception e) {
throw new TableLoadingException("Failed to load metadata for table: "
+ getFullName(), e);
}
} finally {
storageLdTimer.update(storageMetadataLoadTime_, TimeUnit.NANOSECONDS);
long load_time_duration = context.stop();
if (load_time_duration > LOADING_WARNING_TIME_NS) {
LOG.warn("Time taken on loading table " + getFullName() + " exceeded " +
"warning threshold. Time: " + PrintUtils.printTimeNs(load_time_duration));
}
updateTableLoadingTime();
}
}
/**
* Load Primary Key and Foreign Key information for table. Throws TableLoadingException
* if the load fails.
*/
private void loadPkFkInfo(IMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException{
try {
// Reset and add primary keys info and foreign keys info.
primaryKeys_.clear();
foreignKeys_.clear();
primaryKeys_.addAll(client.getPrimaryKeys(
new PrimaryKeysRequest(msTbl.getDbName(), msTbl.getTableName())));
foreignKeys_.addAll(client.getForeignKeys(new ForeignKeysRequest(null, null,
msTbl.getDbName(), msTbl.getTableName())));
} catch (Exception e) {
throw new TableLoadingException("Failed to load primary keys/foreign keys for "
+ "table: " + getFullName(), e);
}
}
/**
* Updates the table metadata, including 'hdfsBaseDir_', 'isMarkedCached_',
* and 'accessLevel_' from 'msTbl'. Returns time spent accessing file system
* in nanoseconds. Throws an IOException if there was an error accessing
* the table location path.
*/
private long updateMdFromHmsTable(org.apache.hadoop.hive.metastore.api.Table msTbl)
throws IOException {
Preconditions.checkNotNull(msTbl);
final Clock clock = Clock.defaultClock();
long filesystemAccessTime = 0;
long startTime = clock.getTick();
hdfsBaseDir_ = msTbl.getSd().getLocation();
isMarkedCached_ = HdfsCachingUtil.validateCacheParams(msTbl.getParameters());
Path location = new Path(hdfsBaseDir_);
accessLevel_ = getAvailableAccessLevel(getFullName(), location,
new FsPermissionCache());
filesystemAccessTime = clock.getTick() - startTime;
setMetaStoreTable(msTbl);
return filesystemAccessTime;
}
/**
* Incrementally updates the file metadata of an unpartitioned HdfsTable.
* Returns time spent updating the file metadata in nanoseconds.
*
* This is optimized for the case where few files have changed. See
* {@link #refreshFileMetadata(Path, List)} above for details.
*/
private long updateUnpartitionedTableFileMd(IMetaStoreClient client)
throws CatalogException {
Preconditions.checkState(getNumClusteringCols() == 0);
if (LOG.isTraceEnabled()) {
LOG.trace("update unpartitioned table: " + getFullName());
}
final Clock clock = Clock.defaultClock();
long startTime = clock.getTick();
HdfsPartition oldPartition = Iterables.getOnlyElement(partitionMap_.values());
// Instead of updating the existing partition in place, we create a new one
// so that we reflect any changes in the msTbl object and also assign a new
// ID. This is one step towards eventually implementing IMPALA-7533.
resetPartitions();
org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
Preconditions.checkNotNull(msTbl);
setPrototypePartition(msTbl.getSd());
HdfsPartition part = createPartition(msTbl.getSd(), null, new FsPermissionCache());
// Copy over the FDs from the old partition to the new one, so that
// 'refreshPartitionFileMetadata' below can compare modification times and
// reload the locations only for those that changed.
part.setFileDescriptors(oldPartition.getFileDescriptors());
addPartition(part);
if (isMarkedCached_) part.markCached();
loadFileMetadataForPartitions(client, ImmutableList.of(part), /*isRefresh=*/true);
return clock.getTick() - startTime;
}
/**
* Updates the partitions of an HdfsTable so that they are in sync with the
* Hive Metastore. It reloads partitions that were marked 'dirty' by doing a
* DROP + CREATE. It removes from this table partitions that no longer exist
* in the Hive Metastore and adds partitions that were added externally (e.g.
* using Hive) to the Hive Metastore but do not exist in this table. If
* 'loadParitionFileMetadata' is true, it triggers file/block metadata reload
* for the partitions specified in 'partitionsToUpdate', if any, or for all
* the table partitions if 'partitionsToUpdate' is null. Returns time
* spent loading file metadata in nanoseconds.
*/
private long updatePartitionsFromHms(IMetaStoreClient client,
Set<String> partitionsToUpdate, boolean loadPartitionFileMetadata)
throws Exception {
if (LOG.isTraceEnabled()) LOG.trace("Sync table partitions: " + getFullName());
org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
Preconditions.checkNotNull(msTbl);
Preconditions.checkState(msTbl.getPartitionKeysSize() != 0);
Preconditions.checkState(loadPartitionFileMetadata || partitionsToUpdate == null);
// Retrieve all the partition names from the Hive Metastore. We need this to
// identify the delta between partitions of the local HdfsTable and the table entry
// in the Hive Metastore. Note: This is a relatively "cheap" operation
// (~.3 secs for 30K partitions).
Set<String> msPartitionNames = new HashSet<>();
msPartitionNames.addAll(client.listPartitionNames(db_.getName(), name_, (short) -1));
// Names of loaded partitions in this table
Set<String> partitionNames = new HashSet<>();
// Partitions for which file metadata must be loaded
List<HdfsPartition> partitionsToLoadFiles = Lists.newArrayList();
// Partitions that need to be dropped and recreated from scratch
List<HdfsPartition> dirtyPartitions = new ArrayList<>();
// Partitions removed from the Hive Metastore.
List<HdfsPartition> removedPartitions = new ArrayList<>();
// Identify dirty partitions that need to be loaded from the Hive Metastore and
// partitions that no longer exist in the Hive Metastore.
for (HdfsPartition partition: partitionMap_.values()) {
// Remove partitions that don't exist in the Hive Metastore. These are partitions
// that were removed from HMS using some external process, e.g. Hive.
if (!msPartitionNames.contains(partition.getPartitionName())) {
removedPartitions.add(partition);
}
if (partition.isDirty()) {
// Dirty partitions are updated by removing them from table's partition
// list and loading them from the Hive Metastore.
dirtyPartitions.add(partition);
} else {
if (partitionsToUpdate == null && loadPartitionFileMetadata) {
partitionsToLoadFiles.add(partition);
}
}
Preconditions.checkNotNull(partition.getCachedMsPartitionDescriptor());
partitionNames.add(partition.getPartitionName());
}
dropPartitions(removedPartitions);
// dirtyPartitions are reloaded and hence cache directives are not dropped.
dropPartitions(dirtyPartitions, false);
// Load dirty partitions from Hive Metastore
// TODO(todd): the logic around "dirty partitions" is highly suspicious.
loadPartitionsFromMetastore(dirtyPartitions, client);
// Identify and load partitions that were added in the Hive Metastore but don't
// exist in this table.
Set<String> newPartitionsInHms = Sets.difference(msPartitionNames, partitionNames);
loadPartitionsFromMetastore(newPartitionsInHms, client);
// If a list of modified partitions (old and new) is specified, don't reload file
// metadata for the new ones as they have already been detected in HMS and have been
// reloaded by loadPartitionsFromMetastore().
if (partitionsToUpdate != null) {
partitionsToUpdate.removeAll(newPartitionsInHms);
}
// Load file metadata. Until we have a notification mechanism for when a
// file changes in hdfs, it is sometimes required to reload all the file
// descriptors and block metadata of a table (e.g. REFRESH statement).
long fileLoadMdTime = 0;
if (loadPartitionFileMetadata) {
final Clock clock = Clock.defaultClock();
long startTime = clock.getTick();
if (partitionsToUpdate != null) {
Preconditions.checkState(partitionsToLoadFiles.isEmpty());
// Only reload file metadata of partitions specified in 'partitionsToUpdate'
partitionsToLoadFiles = getPartitionsForNames(partitionsToUpdate);
}
loadFileMetadataForPartitions(client, partitionsToLoadFiles, /* isRefresh=*/true);
fileLoadMdTime = clock.getTick() - startTime;
}
return fileLoadMdTime;
}
/**
* Given a set of partition names, returns the corresponding HdfsPartition
* objects.
*/
public List<HdfsPartition> getPartitionsForNames(Collection<String> partitionNames) {
List<HdfsPartition> parts = Lists.newArrayListWithCapacity(partitionNames.size());
for (String partitionName: partitionNames) {
String partName = DEFAULT_PARTITION_NAME;
if (partitionName.length() > 0) {
// Trim the last trailing char '/' from each partition name
partName = partitionName.substring(0, partitionName.length()-1);
}
HdfsPartition partition = nameToPartitionMap_.get(partName);
Preconditions.checkNotNull(partition, "Invalid partition name: " + partName);
parts.add(partition);
}
return parts;
}
@Override
public void setTableStats(org.apache.hadoop.hive.metastore.api.Table msTbl) {
super.setTableStats(msTbl);
// For unpartitioned tables set the numRows in its single partition
// to the table's numRows.
if (numClusteringCols_ == 0 && !partitionMap_.isEmpty()) {
// Unpartitioned tables have a default partition.
Preconditions.checkState(partitionMap_.size() == 1);
for (HdfsPartition p: partitionMap_.values()) {
p.setNumRows(getNumRows());
}
}
}
/**
* Sets avroSchema_ if the table or any of the partitions in the table are stored
* as Avro. Additionally, this method also reconciles the schema if the column
* definitions from the metastore differ from the Avro schema.
*/
private void setAvroSchema(IMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception {
Preconditions.checkState(isSchemaLoaded_);
String inputFormat = msTbl.getSd().getInputFormat();
if (HdfsFileFormat.fromJavaClassName(inputFormat) == HdfsFileFormat.AVRO
|| hasAvroData_) {
// Look for Avro schema in TBLPROPERTIES and in SERDEPROPERTIES, with the latter
// taking precedence.
List<Map<String, String>> schemaSearchLocations = new ArrayList<>();
schemaSearchLocations.add(
getMetaStoreTable().getSd().getSerdeInfo().getParameters());
schemaSearchLocations.add(getMetaStoreTable().getParameters());
avroSchema_ = AvroSchemaUtils.getAvroSchema(schemaSearchLocations);
if (avroSchema_ == null) {
// No Avro schema was explicitly set in the table metadata, so infer the Avro
// schema from the column definitions.
Schema inferredSchema = AvroSchemaConverter.convertFieldSchemas(
msTbl.getSd().getCols(), getFullName());
avroSchema_ = inferredSchema.toString();
// NOTE: below we reconcile this inferred schema back into the table
// schema in the case of Avro-formatted tables. This has the side effect
// of promoting types like TINYINT to INT.
}
String serdeLib = msTbl.getSd().getSerdeInfo().getSerializationLib();
if (serdeLib == null ||
serdeLib.equals("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) {
// If the SerDe library is null or set to LazySimpleSerDe or is null, it
// indicates there is an issue with the table metadata since Avro table need a
// non-native serde. Instead of failing to load the table, fall back to
// using the fields from the storage descriptor (same as Hive).
return;
} else {
List<FieldSchema> reconciledFieldSchemas = AvroSchemaUtils.reconcileAvroSchema(
msTbl, avroSchema_);
// Reset and update nonPartFieldSchemas_ to the reconciled colDefs.
nonPartFieldSchemas_.clear();
nonPartFieldSchemas_.addAll(reconciledFieldSchemas);
// Update the columns as per the reconciled colDefs and re-load stats.
clearColumns();
addColumnsFromFieldSchemas(msTbl.getPartitionKeys());
addColumnsFromFieldSchemas(nonPartFieldSchemas_);
loadAllColumnStats(client);
}
}
}
/**
* Loads table schema.
*/
private void loadSchema(org.apache.hadoop.hive.metastore.api.Table msTbl)
throws TableLoadingException {
nonPartFieldSchemas_.clear();
// set NULL indicator string from table properties
nullColumnValue_ =
msTbl.getParameters().get(serdeConstants.SERIALIZATION_NULL_FORMAT);
if (nullColumnValue_ == null) nullColumnValue_ = FeFsTable.DEFAULT_NULL_COLUMN_VALUE;
// Excludes partition columns.
nonPartFieldSchemas_.addAll(msTbl.getSd().getCols());
// The number of clustering columns is the number of partition keys.
numClusteringCols_ = msTbl.getPartitionKeys().size();
partitionLocationCompressor_.setClusteringColumns(numClusteringCols_);
clearColumns();
// Add all columns to the table. Ordering is important: partition columns first,
// then all other columns.
addColumnsFromFieldSchemas(msTbl.getPartitionKeys());
addColumnsFromFieldSchemas(nonPartFieldSchemas_);
isSchemaLoaded_ = true;
}
/**
* Loads partitions from the Hive Metastore and adds them to the internal list of
* table partitions.
*/
private void loadPartitionsFromMetastore(List<HdfsPartition> partitions,
IMetaStoreClient client) throws Exception {
Preconditions.checkNotNull(partitions);
if (partitions.isEmpty()) return;
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Incrementally updating %d/%d partitions.",
partitions.size(), partitionMap_.size()));
}
Set<String> partitionNames = new HashSet<>();
for (HdfsPartition part: partitions) {
partitionNames.add(part.getPartitionName());
}
loadPartitionsFromMetastore(partitionNames, client);
}
/**
* Loads from the Hive Metastore the partitions that correspond to the specified
* 'partitionNames' and adds them to the internal list of table partitions.
*/
private void loadPartitionsFromMetastore(Set<String> partitionNames,
IMetaStoreClient client) throws Exception {
Preconditions.checkNotNull(partitionNames);
if (partitionNames.isEmpty()) return;
// Load partition metadata from Hive Metastore.
List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
new ArrayList<>();
msPartitions.addAll(MetaStoreUtil.fetchPartitionsByName(client,
Lists.newArrayList(partitionNames), db_.getName(), name_));
FsPermissionCache permCache = preloadPermissionsCache(msPartitions);
List<HdfsPartition> partitions = new ArrayList<>(msPartitions.size());
for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) {
HdfsPartition partition = createPartition(msPartition.getSd(), msPartition,
permCache);
// If the partition is null, its HDFS path does not exist, and it was not added to
// this table's partition list. Skip the partition.
if (partition == null) continue;
partitions.add(partition);
}
loadFileMetadataForPartitions(client, partitions, /* isRefresh=*/false);
for (HdfsPartition partition : partitions) addPartition(partition);
}
/**
* For each of the partitions in 'msPartitions' with a location inside the table's
* base directory, attempt to pre-cache the associated file permissions into the
* returned cache. This takes advantage of the fact that many partition directories will
* be in the same parent directories, and we can bulk fetch the permissions with a
* single round trip to the filesystem instead of individually looking up each.
*/
private FsPermissionCache preloadPermissionsCache(List<Partition> msPartitions) {
FsPermissionCache permCache = new FsPermissionCache();
// Only preload permissions if the number of partitions to be added is
// large (3x) relative to the number of existing partitions. This covers
// two common cases:
//
// 1) initial load of a table (no existing partition metadata)
// 2) ALTER TABLE RECOVER PARTITIONS after creating a table pointing to
// an already-existing partition directory tree
//
// Without this heuristic, we would end up using a "listStatus" call to
// potentially fetch a bunch of irrelevant information about existing
// partitions when we only want to know about a small number of newly-added
// partitions.
if (msPartitions.size() < partitionMap_.size() * 3) return permCache;
// TODO(todd): when HDFS-13616 (batch listing of multiple directories)
// is implemented, we could likely implement this with a single round
// trip.
Multiset<Path> parentPaths = HashMultiset.create();
for (Partition p : msPartitions) {
// We only do this optimization for partitions which are within the table's base
// directory. Otherwise we risk a case where a user has specified an external
// partition location that is in a directory containing a high number of irrelevant
// files, and we'll potentially regress performance compared to just looking up
// the partition file directly.
String loc = p.getSd().getLocation();
if (!loc.startsWith(hdfsBaseDir_)) continue;
Path parent = new Path(loc).getParent();
if (parent == null) continue;
parentPaths.add(parent);
}
// For any paths that contain more than one partition, issue a listStatus
// and pre-cache the resulting permissions.
for (Multiset.Entry<Path> entry : parentPaths.entrySet()) {
if (entry.getCount() == 1) continue;
Path p = entry.getElement();
try {
FileSystem fs = p.getFileSystem(CONF);
permCache.precacheChildrenOf(fs, p);
} catch (IOException ioe) {
// If we fail to pre-warm the cache we'll just wait for later when we
// try to actually load the individual permissions, at which point
// we can handle the issue accordingly.
LOG.debug("Unable to bulk-load permissions for parent path: " + p, ioe);
}
}
return permCache;
}
@Override
protected List<String> getColumnNamesWithHmsStats() {
List<String> ret = new ArrayList<>();
// Only non-partition columns have column stats in the HMS.
for (Column column: getColumns().subList(numClusteringCols_, getColumns().size())) {
ret.add(column.getName().toLowerCase());
}
return ret;
}
@Override
protected synchronized void loadFromThrift(TTable thriftTable)
throws TableLoadingException {
super.loadFromThrift(thriftTable);
THdfsTable hdfsTable = thriftTable.getHdfs_table();
partitionLocationCompressor_ = new HdfsPartitionLocationCompressor(
numClusteringCols_, hdfsTable.getPartition_prefixes());
hdfsBaseDir_ = hdfsTable.getHdfsBaseDir();
nullColumnValue_ = hdfsTable.nullColumnValue;
nullPartitionKeyValue_ = hdfsTable.nullPartitionKeyValue;
hostIndex_.populate(hdfsTable.getNetwork_addresses());
primaryKeys_.clear();
primaryKeys_.addAll(hdfsTable.getPrimary_keys());
foreignKeys_.clear();
foreignKeys_.addAll(hdfsTable.getForeign_keys());
resetPartitions();
try {
for (Map.Entry<Long, THdfsPartition> part: hdfsTable.getPartitions().entrySet()) {
HdfsPartition hdfsPart =
HdfsPartition.fromThrift(this, part.getKey(), part.getValue());
addPartition(hdfsPart);
}
prototypePartition_ = HdfsPartition.fromThrift(this,
CatalogObjectsConstants.PROTOTYPE_PARTITION_ID,
hdfsTable.prototype_partition);
} catch (CatalogException e) {
throw new TableLoadingException(e.getMessage());
}
avroSchema_ = hdfsTable.isSetAvroSchema() ? hdfsTable.getAvroSchema() : null;
isMarkedCached_ =
HdfsCachingUtil.validateCacheParams(getMetaStoreTable().getParameters());
}
@Override
public TTableDescriptor toThriftDescriptor(int tableId,
Set<Long> referencedPartitions) {
// Create thrift descriptors to send to the BE. The BE does not
// need any information below the THdfsPartition level.
TTableDescriptor tableDesc = new TTableDescriptor(tableId, TTableType.HDFS_TABLE,
getTColumnDescriptors(), numClusteringCols_, name_, db_.getName());
tableDesc.setHdfsTable(getTHdfsTable(ThriftObjectType.DESCRIPTOR_ONLY,
referencedPartitions));
return tableDesc;
}
@Override
public TTable toThrift() {
// Send all metadata between the catalog service and the FE.
TTable table = super.toThrift();
table.setTable_type(TTableType.HDFS_TABLE);
table.setHdfs_table(getTHdfsTable(ThriftObjectType.FULL, null));
return table;
}
@Override
public TGetPartialCatalogObjectResponse getPartialInfo(
TGetPartialCatalogObjectRequest req) throws TableLoadingException {
TGetPartialCatalogObjectResponse resp = super.getPartialInfo(req);
boolean wantPartitionInfo = req.table_info_selector.want_partition_files ||
req.table_info_selector.want_partition_metadata ||
req.table_info_selector.want_partition_names ||
req.table_info_selector.want_partition_stats;
Collection<Long> partIds = req.table_info_selector.partition_ids;
if (partIds == null && wantPartitionInfo) {
// Caller specified at least one piece of partition info but didn't specify
// any partition IDs. That means they want the info for all partitions.
partIds = partitionMap_.keySet();
}
if (partIds != null) {
resp.table_info.partitions = Lists.newArrayListWithCapacity(partIds.size());
for (long partId : partIds) {
HdfsPartition part = partitionMap_.get(partId);
if (part == null) {
LOG.warn(String.format("Missing partition ID: %s, Table: %s", partId,
getFullName()));
return new TGetPartialCatalogObjectResponse().setLookup_status(
CatalogLookupStatus.PARTITION_NOT_FOUND);
}
TPartialPartitionInfo partInfo = new TPartialPartitionInfo(partId);
if (req.table_info_selector.want_partition_names) {
partInfo.setName(part.getPartitionName());
}
if (req.table_info_selector.want_partition_metadata) {
partInfo.hms_partition = part.toHmsPartition();
partInfo.setHas_incremental_stats(part.hasIncrementalStats());
}
if (req.table_info_selector.want_partition_files) {
List<FileDescriptor> fds = part.getFileDescriptors();
partInfo.file_descriptors = Lists.newArrayListWithCapacity(fds.size());
for (FileDescriptor fd: fds) {
partInfo.file_descriptors.add(fd.toThrift());
}
}
if (req.table_info_selector.want_partition_stats) {
partInfo.setPartition_stats(part.getPartitionStatsCompressed());
}
resp.table_info.partitions.add(partInfo);
}
}
if (req.table_info_selector.want_partition_files) {
// TODO(todd) we are sending the whole host index even if we returned only
// one file -- maybe not so efficient, but the alternative is to do a bunch
// of cloning of file descriptors which might increase memory pressure.
resp.table_info.setNetwork_addresses(hostIndex_.getList());
}
return resp;
}
/**
* Create a THdfsTable corresponding to this HdfsTable. If serializing the "FULL"
* information, then then all partitions and THdfsFileDescs of each partition should be
* included. Otherwise, don't include any THdfsFileDescs, and include only those
* partitions in the refPartitions set (the backend doesn't need metadata for
* unreferenced partitions). In addition, metadata that is not used by the backend will
* be omitted.
*
* To prevent the catalog from hitting an OOM error while trying to
* serialize large partition incremental stats, we estimate the stats size and filter
* the incremental stats data from partition objects if the estimate exceeds
* --inc_stats_size_limit_bytes. This function also collects storage related statistics
* (e.g. number of blocks, files, etc) in order to compute an estimate of the metadata
* size of this table.
*/
private THdfsTable getTHdfsTable(ThriftObjectType type, Set<Long> refPartitions) {
if (type == ThriftObjectType.FULL) {
// "full" implies all partitions should be included.
Preconditions.checkArgument(refPartitions == null);
}
long memUsageEstimate = 0;
int numPartitions =
(refPartitions == null) ? partitionMap_.values().size() : refPartitions.size();
memUsageEstimate += numPartitions * PER_PARTITION_MEM_USAGE_BYTES;
FileMetadataStats stats = new FileMetadataStats();
Map<Long, THdfsPartition> idToPartition = new HashMap<>();
for (HdfsPartition partition: partitionMap_.values()) {
long id = partition.getId();
if (refPartitions == null || refPartitions.contains(id)) {
THdfsPartition tHdfsPartition = FeCatalogUtils.fsPartitionToThrift(
partition, type);
if (partition.hasIncrementalStats()) {
memUsageEstimate += getColumns().size() * STATS_SIZE_PER_COLUMN_BYTES;
hasIncrementalStats_ = true;
}
if (type == ThriftObjectType.FULL) {
Preconditions.checkState(tHdfsPartition.isSetNum_blocks() &&
tHdfsPartition.isSetTotal_file_size_bytes());
stats.numBlocks += tHdfsPartition.getNum_blocks();
stats.numFiles +=
tHdfsPartition.isSetFile_desc() ? tHdfsPartition.getFile_desc().size() : 0;
stats.totalFileBytes += tHdfsPartition.getTotal_file_size_bytes();
}
idToPartition.put(id, tHdfsPartition);
}
}
if (type == ThriftObjectType.FULL) fileMetadataStats_.set(stats);
THdfsPartition prototypePartition = FeCatalogUtils.fsPartitionToThrift(
prototypePartition_, ThriftObjectType.DESCRIPTOR_ONLY);
memUsageEstimate += fileMetadataStats_.numFiles * PER_FD_MEM_USAGE_BYTES +
fileMetadataStats_.numBlocks * PER_BLOCK_MEM_USAGE_BYTES;
setEstimatedMetadataSize(memUsageEstimate);
setNumFiles(fileMetadataStats_.numFiles);
THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
nullPartitionKeyValue_, nullColumnValue_, idToPartition, prototypePartition);
hdfsTable.setAvroSchema(avroSchema_);
hdfsTable.setPrimary_keys(primaryKeys_);
hdfsTable.setForeign_keys(foreignKeys_);
if (type == ThriftObjectType.FULL) {
// Network addresses are used only by THdfsFileBlocks which are inside
// THdfsFileDesc, so include network addreses only when including THdfsFileDesc.
hdfsTable.setNetwork_addresses(hostIndex_.getList());
}
hdfsTable.setPartition_prefixes(partitionLocationCompressor_.getPrefixes());
return hdfsTable;
}
@Override // FeFsTable
public long getTotalHdfsBytes() { return fileMetadataStats_.totalFileBytes; }
@Override // FeFsTable
public String getHdfsBaseDir() { return hdfsBaseDir_; }
public Path getHdfsBaseDirPath() {
Preconditions.checkNotNull(hdfsBaseDir_, "HdfsTable base dir is null");
return new Path(hdfsBaseDir_);
}
@Override // FeFsTable
public boolean usesAvroSchemaOverride() { return avroSchema_ != null; }
@Override // FeFsTable
public ListMap<TNetworkAddress> getHostIndex() { return hostIndex_; }
@Override
public List<SQLPrimaryKey> getPrimaryKeys() {
return ImmutableList.copyOf(primaryKeys_);
}
@Override
public List<SQLForeignKey> getForeignKeys() {
return ImmutableList.copyOf(foreignKeys_);
}
/**
* Get primary keys column names, useful for toSqlUtils.
*/
public List<String> getPrimaryKeysSql() {
List<String> primaryKeyColNames = new ArrayList<>();
if (getPrimaryKeys() != null && !getPrimaryKeys().isEmpty()) {
getPrimaryKeys().stream().forEach(p -> primaryKeyColNames.add(p.getColumn_name()));
}
return primaryKeyColNames;
}
/**
* Get foreign keys information as strings. Useful for toSqlUtils.
* @return List of strings of the form "(col1, col2,..) REFERENCES [pk_db].pk_table
* (colA, colB,..)".
*/
public List<String> getForeignKeysSql() {
List<String> foreignKeysSql = new ArrayList<>();
// Iterate through foreign keys list. This list may contain multiple foreign keys
// and each foreign key may contain multiple columns. The outer loop collects
// information common to a foreign key (pk table information). The inner
// loop collects column information.
List<SQLForeignKey> foreignKeys = getForeignKeys();
for (int i = 0; i < foreignKeys.size(); i++) {
String pkTableDb = foreignKeys.get(i).getPktable_db();
String pkTableName = foreignKeys.get(i).getPktable_name();
List<String> pkList = new ArrayList<>();
List<String> fkList = new ArrayList<>();
StringBuilder sb = new StringBuilder();
sb.append("(");
for (; i<foreignKeys.size(); i++) {
fkList.add(foreignKeys.get(i).getFkcolumn_name());
pkList.add(foreignKeys.get(i).getPkcolumn_name());
// Bail out of inner loop if the key_seq of the next ForeignKey is 1.
if (i + 1 < foreignKeys.size() && foreignKeys.get(i + 1).getKey_seq() == 1) {
break;
}
}
Joiner.on(", ").appendTo(sb, fkList).append(") ");
sb.append("REFERENCES ");
if (pkTableDb != null) sb.append(pkTableDb + ".");
sb.append(pkTableName + "(");
Joiner.on(", ").appendTo(sb, pkList).append(")");
foreignKeysSql.add(sb.toString());
}
return foreignKeysSql;
}
/**
* Returns the set of file formats that the partitions are stored in.
*/
@Override
public Set<HdfsFileFormat> getFileFormats() {
// In the case that we have no partitions added to the table yet, it's
// important to add the "prototype" partition as a fallback.
Iterable<HdfsPartition> partitionsToConsider = Iterables.concat(
partitionMap_.values(), Collections.singleton(prototypePartition_));
return FeCatalogUtils.getFileFormats(partitionsToConsider);
}
/**
* Returns the HDFS paths corresponding to HdfsTable partitions that don't exist in
* the Hive Metastore. An HDFS path is represented as a list of strings values, one per
* partition key column.
*/
public List<List<String>> getPathsWithoutPartitions() throws CatalogException {
Set<List<LiteralExpr>> existingPartitions = new HashSet<>();
// Get the list of partition values of existing partitions in Hive Metastore.
for (HdfsPartition partition: partitionMap_.values()) {
existingPartitions.add(partition.getPartitionValues());
}
List<String> partitionKeys = new ArrayList<>();
for (int i = 0; i < numClusteringCols_; ++i) {
partitionKeys.add(getColumns().get(i).getName());
}
Path basePath = new Path(hdfsBaseDir_);
List<List<String>> partitionsNotInHms = new ArrayList<>();
try {
getAllPartitionsNotInHms(basePath, partitionKeys, existingPartitions,
partitionsNotInHms);
} catch (Exception e) {
throw new CatalogException(String.format("Failed to recover partitions for %s " +
"with exception:%s.", getFullName(), e));
}
return partitionsNotInHms;
}
/**
* Returns all partitions which match the partition keys directory structure and pass
* type compatibility check. Also these partitions are not already part of the table.
*/
private void getAllPartitionsNotInHms(Path path, List<String> partitionKeys,
Set<List<LiteralExpr>> existingPartitions,
List<List<String>> partitionsNotInHms) throws IOException {
FileSystem fs = path.getFileSystem(CONF);
List<String> partitionValues = new ArrayList<>();
List<LiteralExpr> partitionExprs = new ArrayList<>();
getAllPartitionsNotInHms(path, partitionKeys, 0, fs, partitionValues,
partitionExprs, existingPartitions, partitionsNotInHms);
}
/**
* Returns all partitions which match the partition keys directory structure and pass
* the type compatibility check.
*
* path e.g. c1=1/c2=2/c3=3
* partitionKeys The ordered partition keys. e.g.("c1", "c2", "c3")
* depth. The start position in partitionKeys to match the path name.
* partitionValues The partition values used to create a partition.
* partitionExprs The list of LiteralExprs which is used to avoid duplicate partitions.
* E.g. Having /c1=0001 and /c1=01, we should make sure only one partition
* will be added.
* existingPartitions All partitions which exist in Hive Metastore or newly added.
* partitionsNotInHms Contains all the recovered partitions.
*/
private void getAllPartitionsNotInHms(Path path, List<String> partitionKeys,
int depth, FileSystem fs, List<String> partitionValues,
List<LiteralExpr> partitionExprs, Set<List<LiteralExpr>> existingPartitions,
List<List<String>> partitionsNotInHms) throws IOException {
if (depth == partitionKeys.size()) {
if (existingPartitions.contains(partitionExprs)) {
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Skip recovery of path '%s' because it already "
+ "exists in metastore", path.toString()));
}
} else {
partitionsNotInHms.add(partitionValues);
existingPartitions.add(partitionExprs);
}
return;
}
RemoteIterator <? extends FileStatus> statuses = fs.listStatusIterator(path);
if (statuses == null) return;
while (statuses.hasNext()) {
FileStatus status = statuses.next();
if (!status.isDirectory()) continue;
Pair<String, LiteralExpr> keyValues =
getTypeCompatibleValue(status.getPath(), partitionKeys.get(depth));
if (keyValues == null) continue;
List<String> currentPartitionValues = Lists.newArrayList(partitionValues);
List<LiteralExpr> currentPartitionExprs = Lists.newArrayList(partitionExprs);
currentPartitionValues.add(keyValues.first);
currentPartitionExprs.add(keyValues.second);
getAllPartitionsNotInHms(status.getPath(), partitionKeys, depth + 1, fs,
currentPartitionValues, currentPartitionExprs,
existingPartitions, partitionsNotInHms);
}
}
/**
* Checks that the last component of 'path' is of the form "<partitionkey>=<v>"
* where 'v' is a type-compatible value from the domain of the 'partitionKey' column.
* If not, returns null, otherwise returns a Pair instance, the first element is the
* original value, the second element is the LiteralExpr created from the original
* value.
*/
private Pair<String, LiteralExpr> getTypeCompatibleValue(Path path,
String partitionKey) throws UnsupportedEncodingException {
String partName[] = path.getName().split("=");
if (partName.length != 2 || !partName[0].equals(partitionKey)) return null;
// Check Type compatibility for Partition value.
Column column = getColumn(partName[0]);
Preconditions.checkNotNull(column);
Type type = column.getType();
LiteralExpr expr = null;
// URL decode the partition value since it may contain encoded URL.
String value = URLDecoder.decode(partName[1], StandardCharsets.UTF_8.name());
if (!value.equals(getNullPartitionKeyValue())) {
try {
expr = LiteralExpr.create(value, type);
// Skip large value which exceeds the MAX VALUE of specified Type.
if (expr instanceof NumericLiteral) {
if (NumericLiteral.isOverflow(((NumericLiteral) expr).getValue(), type)) {
LOG.warn(String.format("Skip the overflow value (%s) for Type (%s).",
value, type.toSql()));
return null;
}
}
} catch (Exception ex) {
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Invalid partition value (%s) for Type (%s).",
value, type.toSql()));
}
return null;
}
} else {
expr = new NullLiteral();
}
return new Pair<String, LiteralExpr>(value, expr);
}
@Override // FeFsTable
public TResultSet getTableStats() {
return getTableStats(this);
}
@Override
public FileSystemUtil.FsType getFsType() {
Preconditions.checkNotNull(getHdfsBaseDirPath().toUri().getScheme(),
"Cannot get scheme from path " + getHdfsBaseDirPath());
return FileSystemUtil.FsType.getFsType(getHdfsBaseDirPath().toUri().getScheme());
}
// TODO(todd): move to FeCatalogUtils. Upon moving to Java 8, could be
// a default method of FeFsTable.
public static TResultSet getTableStats(FeFsTable table) {
TResultSet result = new TResultSet();
TResultSetMetadata resultSchema = new TResultSetMetadata();
result.setSchema(resultSchema);
for (int i = 0; i < table.getNumClusteringCols(); ++i) {
// Add the partition-key values as strings for simplicity.
Column partCol = table.getColumns().get(i);
TColumn colDesc = new TColumn(partCol.getName(), Type.STRING.toThrift());
resultSchema.addToColumns(colDesc);
}
boolean statsExtrap = Utils.isStatsExtrapolationEnabled(table);
resultSchema.addToColumns(new TColumn("#Rows", Type.BIGINT.toThrift()));
if (statsExtrap) {
resultSchema.addToColumns(new TColumn("Extrap #Rows", Type.BIGINT.toThrift()));
}
resultSchema.addToColumns(new TColumn("#Files", Type.BIGINT.toThrift()));
resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Bytes Cached", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Cache Replication", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Format", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Incremental stats", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Location", Type.STRING.toThrift()));
// Pretty print partitions and their stats.
List<FeFsPartition> orderedPartitions = new ArrayList<>(
FeCatalogUtils.loadAllPartitions(table));
Collections.sort(orderedPartitions, HdfsPartition.KV_COMPARATOR);
long totalCachedBytes = 0L;
long totalBytes = 0L;
long totalNumFiles = 0L;
for (FeFsPartition p: orderedPartitions) {
int numFiles = p.getFileDescriptors().size();
long size = p.getSize();
totalNumFiles += numFiles;
totalBytes += size;
TResultRowBuilder rowBuilder = new TResultRowBuilder();
// Add the partition-key values (as strings for simplicity).
for (LiteralExpr expr: p.getPartitionValues()) {
rowBuilder.add(expr.getStringValue());
}
// Add rows, extrapolated rows, files, bytes, cache stats, and file format.
rowBuilder.add(p.getNumRows());
// Compute and report the extrapolated row count because the set of files could
// have changed since we last computed stats for this partition. We also follow
// this policy during scan-cardinality estimation.
if (statsExtrap) {
rowBuilder.add(Utils.getExtrapolatedNumRows(table, size));
}
rowBuilder.add(numFiles).addBytes(size);
if (!p.isMarkedCached()) {
// Helps to differentiate partitions that have 0B cached versus partitions
// that are not marked as cached.
rowBuilder.add("NOT CACHED");
rowBuilder.add("NOT CACHED");
} else {
// Calculate the number the number of bytes that are cached.
long cachedBytes = 0L;
for (FileDescriptor fd: p.getFileDescriptors()) {
int numBlocks = fd.getNumFileBlocks();
for (int i = 0; i < numBlocks; ++i) {
FbFileBlock block = fd.getFbFileBlock(i);
if (FileBlock.hasCachedReplica(block)) {
cachedBytes += FileBlock.getLength(block);
}
}
}
totalCachedBytes += cachedBytes;
rowBuilder.addBytes(cachedBytes);
// Extract cache replication factor from the parameters of the table
// if the table is not partitioned or directly from the partition.
Short rep = HdfsCachingUtil.getCachedCacheReplication(
table.getNumClusteringCols() == 0 ?
p.getTable().getMetaStoreTable().getParameters() :
p.getParameters());
rowBuilder.add(rep.toString());
}
rowBuilder.add(p.getFileFormat().toString());
rowBuilder.add(String.valueOf(p.hasIncrementalStats()));
rowBuilder.add(p.getLocation());
result.addToRows(rowBuilder.get());
}
// For partitioned tables add a summary row at the bottom.
if (table.getNumClusteringCols() > 0) {
TResultRowBuilder rowBuilder = new TResultRowBuilder();
int numEmptyCells = table.getNumClusteringCols() - 1;
rowBuilder.add("Total");
for (int i = 0; i < numEmptyCells; ++i) {
rowBuilder.add("");
}
// Total rows, extrapolated rows, files, bytes, cache stats.
// Leave format empty.
rowBuilder.add(table.getNumRows());
// Compute and report the extrapolated row count because the set of files could
// have changed since we last computed stats for this partition. We also follow
// this policy during scan-cardinality estimation.
if (statsExtrap) {
rowBuilder.add(Utils.getExtrapolatedNumRows(
table, table.getTotalHdfsBytes()));
}
rowBuilder.add(totalNumFiles)
.addBytes(totalBytes)
.addBytes(totalCachedBytes).add("").add("").add("").add("");
result.addToRows(rowBuilder.get());
}
return result;
}
/**
* Constructs a partition name from a list of TPartitionKeyValue objects.
*/
public static String constructPartitionName(List<TPartitionKeyValue> partitionSpec) {
List<String> partitionCols = new ArrayList<>();
List<String> partitionVals = new ArrayList<>();
for (TPartitionKeyValue kv: partitionSpec) {
partitionCols.add(kv.getName());
partitionVals.add(kv.getValue());
}
return org.apache.hadoop.hive.common.FileUtils.makePartName(partitionCols,
partitionVals);
}
/**
* Reloads the metadata of partition 'oldPartition' by removing
* it from the table and reconstructing it from the HMS partition object
* 'hmsPartition'. If old partition is null then nothing is removed and
* and partition constructed from 'hmsPartition' is simply added.
*/
public void reloadPartition(IMetaStoreClient client, HdfsPartition oldPartition,
Partition hmsPartition) throws CatalogException {
// Instead of updating the existing partition in place, we create a new one
// so that we reflect any changes in the hmsPartition object and also assign a new
// ID. This is one step towards eventually implementing IMPALA-7533.
HdfsPartition refreshedPartition = createPartition(
hmsPartition.getSd(), hmsPartition, new FsPermissionCache());
Preconditions.checkArgument(oldPartition == null
|| HdfsPartition.KV_COMPARATOR.compare(oldPartition, refreshedPartition) == 0);
if (oldPartition != null) {
refreshedPartition.setFileDescriptors(oldPartition.getFileDescriptors());
}
loadFileMetadataForPartitions(client, ImmutableList.of(refreshedPartition),
/*isRefresh=*/true);
dropPartition(oldPartition, false);
addPartition(refreshedPartition);
}
/**
* Registers table metrics.
*/
@Override
public void initMetrics() {
super.initMetrics();
metrics_.addGauge(NUM_PARTITIONS_METRIC, new Gauge<Integer>() {
@Override
public Integer getValue() { return partitionMap_.values().size(); }
});
metrics_.addGauge(NUM_FILES_METRIC, new Gauge<Long>() {
@Override
public Long getValue() { return fileMetadataStats_.numFiles; }
});
metrics_.addGauge(NUM_BLOCKS_METRIC, new Gauge<Long>() {
@Override
public Long getValue() { return fileMetadataStats_.numBlocks; }
});
metrics_.addGauge(TOTAL_FILE_BYTES_METRIC, new Gauge<Long>() {
@Override
public Long getValue() { return fileMetadataStats_.totalFileBytes; }
});
metrics_.addGauge(MEMORY_ESTIMATE_METRIC, new Gauge<Long>() {
@Override
public Long getValue() { return getEstimatedMetadataSize(); }
});
metrics_.addGauge(HAS_INCREMENTAL_STATS_METRIC, new Gauge<Boolean>() {
@Override
public Boolean getValue() { return hasIncrementalStats_; }
});
metrics_.addTimer(CATALOG_UPDATE_DURATION_METRIC);
}
/**
* Creates a temporary HdfsTable object populated with the specified properties.
* This is used for CTAS statements.
*/
public static HdfsTable createCtasTarget(Db db,
org.apache.hadoop.hive.metastore.api.Table msTbl) throws CatalogException {
HdfsTable tmpTable = new HdfsTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
HiveConf hiveConf = new HiveConf(HdfsTable.class);
// set nullPartitionKeyValue from the hive conf.
tmpTable.nullPartitionKeyValue_ = hiveConf.get(
MetaStoreUtil.NULL_PARTITION_KEY_VALUE_CONF_KEY,
MetaStoreUtil.DEFAULT_NULL_PARTITION_KEY_VALUE);
tmpTable.loadSchema(msTbl);
tmpTable.initializePartitionMetadata(msTbl);
tmpTable.setTableStats(msTbl);
return tmpTable;
}
/**
* Returns true if the table is partitioned, false otherwise.
*/
public boolean isPartitioned() {
return getMetaStoreTable().getPartitionKeysSize() > 0;
}
}