| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.catalog; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.UnsupportedEncodingException; |
| import java.net.URLDecoder; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Collectors; |
| import org.apache.avro.Schema; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.RemoteIterator; |
| import org.apache.hadoop.hive.common.ValidTxnList; |
| import org.apache.hadoop.hive.common.ValidWriteIdList; |
| import org.apache.hadoop.hive.conf.HiveConf; |
| import org.apache.hadoop.hive.metastore.IMetaStoreClient; |
| import org.apache.hadoop.hive.metastore.api.FieldSchema; |
| import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest; |
| import org.apache.hadoop.hive.metastore.api.Partition; |
| import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest; |
| import org.apache.hadoop.hive.metastore.api.StorageDescriptor; |
| import org.apache.hadoop.hive.serde.serdeConstants; |
| import org.apache.impala.analysis.Expr; |
| import org.apache.impala.analysis.LiteralExpr; |
| import org.apache.impala.analysis.NullLiteral; |
| import org.apache.impala.analysis.NumericLiteral; |
| import org.apache.impala.analysis.PartitionKeyValue; |
| import org.apache.impala.catalog.HdfsPartition.FileBlock; |
| import org.apache.impala.catalog.HdfsPartition.FileDescriptor; |
| import org.apache.impala.common.FileSystemUtil; |
| import org.apache.impala.common.ImpalaException; |
| import org.apache.impala.common.Pair; |
| import org.apache.impala.common.PrintUtils; |
| import org.apache.impala.compat.MetastoreShim; |
| import org.apache.impala.fb.FbFileBlock; |
| import org.apache.impala.thrift.CatalogLookupStatus; |
| import org.apache.impala.thrift.CatalogObjectsConstants; |
| import org.apache.impala.thrift.TAccessLevel; |
| import org.apache.impala.thrift.TCatalogObject; |
| import org.apache.impala.thrift.TCatalogObjectType; |
| import org.apache.impala.thrift.TColumn; |
| import org.apache.impala.thrift.TGetPartialCatalogObjectRequest; |
| import org.apache.impala.thrift.TGetPartialCatalogObjectResponse; |
| import org.apache.impala.thrift.THdfsFileDesc; |
| import org.apache.impala.thrift.THdfsPartition; |
| import org.apache.impala.thrift.THdfsTable; |
| import org.apache.impala.thrift.TNetworkAddress; |
| import org.apache.impala.thrift.TPartialPartitionInfo; |
| import org.apache.impala.thrift.TPartitionKeyValue; |
| import org.apache.impala.thrift.TResultSet; |
| import org.apache.impala.thrift.TResultSetMetadata; |
| import org.apache.impala.thrift.TSqlConstraints; |
| import org.apache.impala.thrift.TTable; |
| import org.apache.impala.thrift.TTableDescriptor; |
| import org.apache.impala.thrift.TTableType; |
| import org.apache.impala.util.AcidUtils; |
| import org.apache.impala.util.AvroSchemaConverter; |
| import org.apache.impala.util.AvroSchemaUtils; |
| import org.apache.impala.util.FsPermissionCache; |
| import org.apache.impala.util.FsPermissionChecker; |
| import org.apache.impala.util.HdfsCachingUtil; |
| import org.apache.impala.util.ListMap; |
| import org.apache.impala.util.MetaStoreUtil; |
| import org.apache.impala.util.TAccessLevelUtil; |
| import org.apache.impala.util.TResultRowBuilder; |
| import org.apache.impala.util.ThreadNameAnnotator; |
| import org.apache.thrift.TException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.codahale.metrics.Clock; |
| import com.codahale.metrics.Gauge; |
| import com.codahale.metrics.Timer; |
| import com.codahale.metrics.Counter; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Stopwatch; |
| import com.google.common.collect.HashMultiset; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Multiset; |
| import com.google.common.collect.Sets; |
| |
| /** |
| * Internal representation of table-related metadata of a file-resident table on a |
| * Hadoop filesystem. The table data can be accessed through libHDFS (which is more of |
| * an abstraction over Hadoop's FileSystem class rather than DFS specifically). A |
| * partitioned table can even span multiple filesystems. |
| * |
| * This class is not thread-safe. Clients of this class need to protect against |
| * concurrent updates using external locking (see CatalogOpExecutor class). |
| * |
| * Owned by Catalog instance. |
| * The partition keys constitute the clustering columns. |
| * |
| * Partition metadata are propagated to coordinators in different ways depending on the |
| * topic update mode. |
| * 1. In v1 mode (topicMode = full), we only send the partitionIds in the thrift table |
| * which represents the current list of the partitions. Additionally, for each newly |
| * added/removed partition we send a THdfsPartition in the same topic update. However, |
| * coordinators detect the removal of any partitions by absence of an id inside |
| * partitionIds in the table object. |
| * 2. In v2 mode (topicMode = minimal), LocalCatalog coordinators only load what they need |
| * and hence we only send deleted partitionIds. Updated partitions are also treated as a |
| * special case of deleted partitions by sending the previous partitionId for such |
| * partitions so that LocalCatalog coordinators invalidate them proactively. |
| * |
 * In DDL/REFRESH responses, we still send the full thrift tables instead of
 * incremental updates as in the topic updates, because catalogd is not aware of the
 * table states (partitionIds) of each coordinator. Generating incremental table
 * updates requires a base status. This will be improved in IMPALA-9936 and IMPALA-9937.
| */ |
| public class HdfsTable extends Table implements FeFsTable { |
| // Name of default partition for unpartitioned tables |
| private static final String DEFAULT_PARTITION_NAME = ""; |
| |
| // Number of times to retry fetching the partitions from the HMS should an error occur. |
| private final static int NUM_PARTITION_FETCH_RETRIES = 5; |
| |
| // Table property key for overriding the Impalad-wide --enable_stats_extrapolation |
| // setting for a specific table. By default, tables do not have the property set and |
| // rely on the Impalad-wide --enable_stats_extrapolation flag. |
| public static final String TBL_PROP_ENABLE_STATS_EXTRAPOLATION = |
| "impala.enable.stats.extrapolation"; |
| |
| // Similar to above: a table property that overwrites --recursively_list_partitions |
| // for a specific table. This is an escape hatch in case it turns out that someone |
| // was relying on the previous non-recursive behavior, even though it's known to |
| // be inconsistent with modern versions of Spark, Hive, etc. |
| public static final String TBL_PROP_DISABLE_RECURSIVE_LISTING = |
| "impala.disable.recursive.listing"; |
| |
| // Average memory requirements (in bytes) for storing the metadata of a partition. |
| private static final long PER_PARTITION_MEM_USAGE_BYTES = 2048; |
| |
| // Average memory requirements (in bytes) for storing a file descriptor. |
| private static final long PER_FD_MEM_USAGE_BYTES = 500; |
| |
| // Average memory requirements (in bytes) for storing a block. |
| private static final long PER_BLOCK_MEM_USAGE_BYTES = 150; |
| |
| // Hdfs table specific metrics |
| public static final String CATALOG_UPDATE_DURATION_METRIC = "catalog-update-duration"; |
| public static final String NUM_PARTITIONS_METRIC = "num-partitions"; |
| public static final String NUM_FILES_METRIC = "num-files"; |
| public static final String NUM_BLOCKS_METRIC = "num-blocks"; |
| public static final String TOTAL_FILE_BYTES_METRIC = "total-file-size-bytes"; |
| public static final String MEMORY_ESTIMATE_METRIC = "memory-estimate-bytes"; |
| public static final String HAS_INCREMENTAL_STATS_METRIC = "has-incremental-stats"; |
| // metrics used to find out the cache hit rate when file-metadata is requested |
| // for a given ValidWriteIdList |
| public static final String FILEMETADATA_CACHE_MISS_METRIC = "filemetadata-cache-miss"; |
| public static final String FILEMETADATA_CACHE_HIT_METRIC = "filemetadata-cache-hit"; |
| |
| // Load all partitions time, including fetching all partitions |
| // from HMS and loading all partitions. The code path is |
| // MetaStoreUtil.fetchAllPartitions() and HdfsTable.loadAllPartitions() |
| public static final String LOAD_DURATION_ALL_PARTITIONS = |
| "load-duration.all-partitions"; |
| |
| // The file metadata loading for all all partitions. This is part of |
| // LOAD_DURATION_ALL_PARTITIONS |
| // Code path: loadFileMetadataForPartitions() inside loadAllPartitions() |
| public static final String LOAD_DURATION_FILE_METADATA_ALL_PARTITIONS = |
| "load-duration.all-partitions.file-metadata"; |
| |
| // string to indicate NULL. set in load() from table properties |
| private String nullColumnValue_; |
| |
| // hive uses this string for NULL partition keys. Set in load(). |
| private String nullPartitionKeyValue_; |
| |
| // Avro schema of this table if this is an Avro table, otherwise null. Set in load(). |
| private String avroSchema_ = null; |
| |
| // Set to true if any of the partitions have Avro data. |
| private boolean hasAvroData_ = false; |
| |
| // True if this table's metadata is marked as cached. Does not necessarily mean the |
| // data is cached or that all/any partitions are cached. |
| private boolean isMarkedCached_ = false; |
| |
| // Array of sorted maps storing the association between partition values and |
| // partition ids. There is one sorted map per partition key. It is only populated if |
| // this table object is stored in ImpaladCatalog. |
| private final List<TreeMap<LiteralExpr, Set<Long>>> partitionValuesMap_ = |
| new ArrayList<>(); |
| |
| // Array of partition id sets that correspond to partitions with null values |
| // in the partition keys; one set per partition key. It is not populated if the table is |
| // stored in the catalog server. |
| private final List<Set<Long>> nullPartitionIds_ = new ArrayList<>(); |
| |
| // Map of partition ids to HdfsPartitions. |
| private final Map<Long, HdfsPartition> partitionMap_ = new HashMap<>(); |
| |
| // Map of partition name to HdfsPartition object. Used for speeding up |
| // table metadata loading. It is only populated if this table object is stored in |
| // catalog server. |
| private final Map<String, HdfsPartition> nameToPartitionMap_ = new HashMap<>(); |
| |
| // The partition used as a prototype when creating new partitions during |
| // insertion. New partitions inherit file format and other settings from |
| // the prototype. |
| @VisibleForTesting |
| HdfsPartition prototypePartition_; |
| |
| // Empirical estimate (in bytes) of the incremental stats size per column per |
| // partition. |
| public static final long STATS_SIZE_PER_COLUMN_BYTES = 200; |
| |
| // Bi-directional map between an integer index and a unique datanode |
| // TNetworkAddresses, each of which contains blocks of 1 or more |
| // files in this table. The network addresses are stored using IP |
| // address as the host name. Each FileBlock specifies a list of |
| // indices within this hostIndex_ to specify which nodes contain |
| // replicas of the block. |
| private final ListMap<TNetworkAddress> hostIndex_ = new ListMap<TNetworkAddress>(); |
| |
| // True iff this table has incremental stats in any of its partitions. |
| private boolean hasIncrementalStats_ = false; |
| |
| private HdfsPartitionLocationCompressor partitionLocationCompressor_; |
| |
| // Base Hdfs directory where files of this table are stored. |
| // For unpartitioned tables it is simply the path where all files live. |
| // For partitioned tables it is the root directory |
| // under which partition dirs are placed. |
| protected String hdfsBaseDir_; |
| |
| // List of FieldSchemas that correspond to the non-partition columns. Used when |
| // describing this table and its partitions to the HMS (e.g. as part of an alter table |
| // operation), when only non-partition columns are required. |
| private final List<FieldSchema> nonPartFieldSchemas_ = new ArrayList<>(); |
| |
| // Flag to check if the table schema has been loaded. Used as a precondition |
| // for setAvroSchema(). |
| private boolean isSchemaLoaded_ = false; |
| |
| // SQL constraints information for the table. Set in load() method. |
| private SqlConstraints sqlConstraints_ = new SqlConstraints(new ArrayList<>(), |
| new ArrayList<>()); |
| |
| // Valid write id list for this table. |
| // null in the case that this table is not transactional. |
| protected ValidWriteIdList validWriteIds_ = null; |
| |
| // Partitions are marked as "dirty" indicating there are in-progress modifications on |
| // their metadata. The corresponding partition builder contains the new version of the |
| // metadata so represents the in-progress modifications. The modifications will be |
| // finalized in the coming incremental metadata refresh (see updatePartitionsFromHms() |
| // for more details). This map is only maintained in the catalogd. |
| private final Map<Long, HdfsPartition.Builder> dirtyPartitions_ = new HashMap<>(); |
| |
| // The max id of all partitions of this table sent to coordinators. Partitions with ids |
| // larger than this are not known in coordinators. |
| private long maxSentPartitionId_ = HdfsPartition.INITIAL_PARTITION_ID - 1; |
| |
| // Dropped partitions since last catalog update. These partitions need to be removed |
| // in coordinator's cache if there are no updates on them. |
| private final Set<HdfsPartition> droppedPartitions = new HashSet<>(); |
| |
| // Represents a set of storage-related statistics aggregated at the table or partition |
| // level. |
| public final static class FileMetadataStats { |
| // Nuber of files in a table/partition. |
| public long numFiles; |
| // Number of blocks in a table/partition. |
| public long numBlocks; |
| // Total size (in bytes) of all files in a table/partition. |
| public long totalFileBytes; |
| |
| // Unsets the storage stats to indicate that their values are unknown. |
| public void unset() { |
| numFiles = -1; |
| numBlocks = -1; |
| totalFileBytes = -1; |
| } |
| |
| // Initializes the values of the storage stats. |
| public void init() { |
| numFiles = 0; |
| numBlocks = 0; |
| totalFileBytes = 0; |
| } |
| |
| public void set(FileMetadataStats stats) { |
| numFiles = stats.numFiles; |
| numBlocks = stats.numBlocks; |
| totalFileBytes = stats.totalFileBytes; |
| } |
| } |
| |
| // Table level storage-related statistics. Depending on whether the table is stored in |
| // the catalog server or the impalad catalog cache, these statistics serve different |
| // purposes and, hence, are managed differently. |
| // Table stored in impalad catalog cache: |
| // - Used in planning. |
| // - Stats are modified real-time by the operations that modify table metadata |
| // (e.g. add partition). |
| // Table stored in the the catalog server: |
| // - Used for reporting through catalog web UI. |
| // - Stats are reset whenever the table is loaded (due to a metadata operation) and |
| // are set when the table is serialized to Thrift. |
| private final FileMetadataStats fileMetadataStats_ = new FileMetadataStats(); |
| |
| private final static Logger LOG = LoggerFactory.getLogger(HdfsTable.class); |
| |
| public final static long LOADING_WARNING_TIME_NS = 5000000000L; |
| |
| // Caching this configuration object makes calls to getFileSystem much quicker |
| // (saves ~50ms on a standard plan) |
| // TODO(henry): confirm that this is thread safe - cursory inspection of the class |
| // and its usage in getFileSystem suggests it should be. |
| private static final Configuration CONF = new Configuration(); |
| |
/**
 * Creates the catalog representation of the given HMS table and sets up a partition
 * location compressor sized by the number of clustering columns.
 */
public HdfsTable(org.apache.hadoop.hive.metastore.api.Table msTbl, Db db, String name,
    String owner) {
  super(msTbl, db, name, owner);
  partitionLocationCompressor_ = new HdfsPartitionLocationCompressor(numClusteringCols_);
}
| |
| @Override // FeFsTable |
| public boolean isLocationCacheable() { |
| return FileSystemUtil.isPathCacheable(new Path(getLocation())); |
| } |
| |
| @Override // FeFsTable |
| public boolean isCacheable() { |
| if (!isLocationCacheable()) return false; |
| if (!isMarkedCached() && numClusteringCols_ > 0) { |
| for (FeFsPartition partition: partitionMap_.values()) { |
| if (!partition.isCacheable()) { |
| return false; |
| } |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Updates the storage stats of this table based on the partition information. |
| * This is used only for the frontend tests that do not spawn a separate Catalog |
| * instance. |
| */ |
| public void computeHdfsStatsForTesting() { |
| Preconditions.checkState(fileMetadataStats_.numFiles == -1 |
| && fileMetadataStats_.totalFileBytes == -1); |
| fileMetadataStats_.init(); |
| for (HdfsPartition partition: partitionMap_.values()) { |
| fileMetadataStats_.numFiles += partition.getNumFileDescriptors(); |
| fileMetadataStats_.totalFileBytes += partition.getSize(); |
| } |
| } |
| |
| @Override |
| public TCatalogObjectType getCatalogObjectType() { |
| return TCatalogObjectType.TABLE; |
| } |
| |
| @Override // FeFsTable |
| public boolean isMarkedCached() { return isMarkedCached_; } |
| |
| @Override // FeFsTable |
| public Collection<? extends PrunablePartition> getPartitions() { |
| return partitionMap_.values(); |
| } |
| |
| @Override // FeFsTable |
| public Map<Long, ? extends PrunablePartition> getPartitionMap() { |
| return partitionMap_; |
| } |
| |
| @Override // FeFsTable |
| public List<FeFsPartition> loadPartitions(Collection<Long> ids) { |
| List<FeFsPartition> partitions = Lists.newArrayListWithCapacity(ids.size()); |
| for (Long id : ids) { |
| HdfsPartition partition = partitionMap_.get(id); |
| if (partition == null) { |
| throw new IllegalArgumentException("no such partition id " + id); |
| } |
| partitions.add(partition); |
| } |
| return partitions; |
| } |
| |
| @Override // FeFsTable |
| public Set<Long> getNullPartitionIds(int i) { return nullPartitionIds_.get(i); } |
| |
| public HdfsPartitionLocationCompressor getPartitionLocationCompressor() { |
| return partitionLocationCompressor_; |
| } |
| |
| // Returns an unmodifiable set of the partition IDs from partitionMap_. |
| @Override // FeFsTable |
| public Set<Long> getPartitionIds() { |
| return Collections.unmodifiableSet(partitionMap_.keySet()); |
| } |
| |
| @Override // FeFsTable |
| public TreeMap<LiteralExpr, Set<Long>> getPartitionValueMap(int i) { |
| return partitionValuesMap_.get(i); |
| } |
| |
| @Override // FeFsTable |
| public String getNullPartitionKeyValue() { |
| return nullPartitionKeyValue_; // Set during load. |
| } |
| |
| @Override // FeFsTable |
| public String getLocation() { |
| return super.getMetaStoreTable().getSd().getLocation(); |
| } |
| |
| List<FieldSchema> getNonPartitionFieldSchemas() { return nonPartFieldSchemas_; } |
| |
| // True if Impala has HDFS write permissions on the hdfsBaseDir |
| @Override |
| public boolean hasWriteAccessToBaseDir() { |
| return TAccessLevelUtil.impliesWriteAccess(accessLevel_); |
| } |
| |
| /** |
| * Returns the first location (HDFS path) that Impala does not have WRITE access |
| * to, or an null if none is found. For an unpartitioned table, this just |
| * checks the hdfsBaseDir. For a partitioned table it checks the base directory |
| * as well as all partition directories. |
| */ |
| @Override |
| public String getFirstLocationWithoutWriteAccess() { |
| if (!hasWriteAccessToBaseDir()) { |
| return hdfsBaseDir_; |
| } |
| for (HdfsPartition partition: partitionMap_.values()) { |
| if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) { |
| return partition.getLocation(); |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Marks a partition dirty by registering the partition builder for its new instance. |
| */ |
| public void markDirtyPartition(HdfsPartition.Builder partBuilder) { |
| dirtyPartitions_.put(partBuilder.getOldId(), partBuilder); |
| } |
| |
| /** |
| * @return true if whether there are any in-progress modifications on metadata of this |
| * partition. |
| */ |
| public boolean isDirtyPartition(HdfsPartition partition) { |
| return dirtyPartitions_.containsKey(partition.getId()); |
| } |
| |
| /** |
| * Pick up the partition builder to continue the in-progress modifications. |
| * The builder is then unregistered so the callers should guarantee that the in-progress |
| * modifications are finalized (by calling Builder.build() and use the new instance to |
| * replace the old one). |
| * @return the builder for given partition's new instance. |
| */ |
| public HdfsPartition.Builder pickInprogressPartitionBuilder(HdfsPartition partition) { |
| return dirtyPartitions_.remove(partition.getId()); |
| } |
| |
| /** |
| * @return true if any partitions are dirty. |
| */ |
| @Override |
| public boolean hasInProgressModification() { return !dirtyPartitions_.isEmpty(); } |
| |
| /** |
| * Clears all the in-progress modifications by clearing all the partition builders. |
| */ |
| @Override |
| public void resetInProgressModification() { dirtyPartitions_.clear(); } |
| |
| /** |
| * Gets the PrunablePartition matching the given partition spec. Returns null if no |
| * match was found. |
| */ |
| public static PrunablePartition getPartition(FeFsTable table, |
| List<PartitionKeyValue> partitionSpec) { |
| List<TPartitionKeyValue> partitionKeyValues = new ArrayList<>(); |
| for (PartitionKeyValue kv: partitionSpec) { |
| Preconditions.checkArgument(kv.isStatic(), "unexpected dynamic partition: %s", |
| kv); |
| String value = PartitionKeyValue.getPartitionKeyValueString( |
| kv.getLiteralValue(), table.getNullPartitionKeyValue()); |
| partitionKeyValues.add(new TPartitionKeyValue(kv.getColName(), value)); |
| } |
| return Utils.getPartitionFromThriftPartitionSpec(table, partitionKeyValues); |
| } |
| |
| /** |
| * Gets the HdfsPartition matching the Thrift version of the partition spec. |
| * Returns null if no match was found. |
| */ |
| public HdfsPartition getPartitionFromThriftPartitionSpec( |
| List<TPartitionKeyValue> partitionSpec) { |
| return (HdfsPartition)Utils.getPartitionFromThriftPartitionSpec(this, partitionSpec); |
| } |
| |
| /** |
| * Gets hdfs partitions by the given partition set. |
| */ |
| @SuppressWarnings("unchecked") |
| public List<HdfsPartition> getPartitionsFromPartitionSet( |
| List<List<TPartitionKeyValue>> partitionSet) { |
| return (List<HdfsPartition>)Utils.getPartitionsFromPartitionSet(this, partitionSet); |
| } |
| |
| /** |
| * Create columns corresponding to fieldSchemas. Throws a TableLoadingException if the |
| * metadata is incompatible with what we support. |
| */ |
| private void addColumnsFromFieldSchemas(List<FieldSchema> fieldSchemas) |
| throws TableLoadingException { |
| int pos = colsByPos_.size(); |
| for (FieldSchema s: fieldSchemas) { |
| Type type = parseColumnType(s); |
| // Check if we support partitioning on columns of such a type. |
| if (pos < numClusteringCols_ && !type.supportsTablePartitioning()) { |
| throw new TableLoadingException( |
| String.format("Failed to load metadata for table '%s' because of " + |
| "unsupported partition-column type '%s' in partition column '%s'", |
| getFullName(), type.toString(), s.getName())); |
| } |
| |
| Column col = new Column(s.getName(), type, s.getComment(), pos); |
| addColumn(col); |
| ++pos; |
| } |
| } |
| |
| /** |
| * Adds the synthetic "row__id" column to the table schema. Under "row__id" it adds |
| * the ACID hidden columns. |
| * Note that this is the exact opposite of the file schema. In an ACID file, the |
| * hidden columns are top-level while the user columns are embedded inside a struct |
| * typed column called "row". We cheat here because this way we don't need to change |
| * column resolution and everything will work seemlessly. We'll only need to generate |
| * a different schema path for the columns but that's fairly simple. |
| * The hidden columns can be retrieved via 'SELECT row__id.* FROM <table>' which is |
| * similar to Hive's 'SELECT row__id FROM <table>'. |
| */ |
| private void addColumnsForFullAcidTable(List<FieldSchema> fieldSchemas) |
| throws TableLoadingException { |
| addColumn(AcidUtils.getRowIdColumnType(colsByPos_.size())); |
| addColumnsFromFieldSchemas(fieldSchemas); |
| } |
| |
| /** |
| * Clear the partitions of an HdfsTable and the associated metadata. |
| */ |
| private void resetPartitions() { |
| partitionMap_.clear(); |
| nameToPartitionMap_.clear(); |
| partitionValuesMap_.clear(); |
| nullPartitionIds_.clear(); |
| if (isStoredInImpaladCatalogCache()) { |
| // Initialize partitionValuesMap_ and nullPartitionIds_. Also reset column stats. |
| for (int i = 0; i < numClusteringCols_; ++i) { |
| getColumns().get(i).getStats().setNumNulls(0); |
| getColumns().get(i).getStats().setNumDistinctValues(0); |
| partitionValuesMap_.add(new TreeMap<>()); |
| nullPartitionIds_.add(new HashSet<>()); |
| } |
| } |
| fileMetadataStats_.init(); |
| } |
| |
| /** |
| * Resets any partition metadata, creates the prototype partition and sets the base |
| * table directory path as well as the caching info from the HMS table. |
| */ |
| public void initializePartitionMetadata( |
| org.apache.hadoop.hive.metastore.api.Table msTbl) throws CatalogException { |
| Preconditions.checkNotNull(msTbl); |
| resetPartitions(); |
| hdfsBaseDir_ = msTbl.getSd().getLocation(); |
| setPrototypePartition(msTbl.getSd()); |
| |
| // We silently ignore cache directives that no longer exist in HDFS, and remove |
| // non-existing cache directives from the parameters. |
| isMarkedCached_ = HdfsCachingUtil.validateCacheParams(msTbl.getParameters()); |
| } |
| |
| /** |
| * Create HdfsPartition objects corresponding to 'msPartitions' and add them to this |
| * table's partition list. Any partition metadata will be reset and loaded from |
| * scratch. For each partition created, we load the block metadata for each data file |
| * under it. Returns time spent loading the filesystem metadata in nanoseconds. |
| * |
| * If there are no partitions in the Hive metadata, a single partition is added with no |
| * partition keys. |
| */ |
| private long loadAllPartitions(IMetaStoreClient client, List<Partition> msPartitions, |
| org.apache.hadoop.hive.metastore.api.Table msTbl) throws IOException, |
| CatalogException { |
| Preconditions.checkNotNull(msTbl); |
| final Clock clock = Clock.defaultClock(); |
| long startTime = clock.getTick(); |
| initializePartitionMetadata(msTbl); |
| FsPermissionCache permCache = preloadPermissionsCache(msPartitions); |
| |
| Path tblLocation = FileSystemUtil.createFullyQualifiedPath(getHdfsBaseDirPath()); |
| accessLevel_ = getAvailableAccessLevel(getFullName(), tblLocation, permCache); |
| |
| List<HdfsPartition.Builder> partBuilders = new ArrayList<>(); |
| if (msTbl.getPartitionKeysSize() == 0) { |
| Preconditions.checkArgument(msPartitions == null || msPartitions.isEmpty()); |
| // This table has no partition key, which means it has no declared partitions. |
| // We model partitions slightly differently to Hive - every file must exist in a |
| // partition, so add a single partition with no keys which will get all the |
| // files in the table's root directory. |
| HdfsPartition.Builder partBuilder = createPartitionBuilder(msTbl.getSd(), |
| /*msPartition=*/null, permCache); |
| partBuilder.setIsMarkedCached(isMarkedCached_); |
| setUnpartitionedTableStats(partBuilder); |
| partBuilders.add(partBuilder); |
| } else { |
| for (Partition msPartition: msPartitions) { |
| partBuilders.add(createPartitionBuilder( |
| msPartition.getSd(), msPartition, permCache)); |
| } |
| } |
| // Load the file metadata from scratch. |
| Timer.Context fileMetadataLdContext = getMetrics().getTimer( |
| HdfsTable.LOAD_DURATION_FILE_METADATA_ALL_PARTITIONS).time(); |
| loadFileMetadataForPartitions(client, partBuilders, /*isRefresh=*/false); |
| fileMetadataLdContext.stop(); |
| for (HdfsPartition.Builder p : partBuilders) addPartition(p.build()); |
| return clock.getTick() - startTime; |
| } |
| |
| /** |
| * Loads valid txn list from HMS. Re-throws exceptions as CatalogException. |
| */ |
| private ValidTxnList loadValidTxns(IMetaStoreClient client) throws CatalogException { |
| try { |
| return MetastoreShim.getValidTxns(client); |
| } catch (TException exception) { |
| throw new CatalogException(exception.getMessage()); |
| } |
| } |
| |
| /** |
| * Helper method to load the block locations for each partition in 'parts'. |
| * New file descriptor lists are loaded and the partitions are updated in place. |
| * |
| * @param isRefresh whether this is a refresh operation or an initial load. This only |
| * affects logging. |
| * @return time in nanoseconds spent in loading file metadata. |
| */ |
| private long loadFileMetadataForPartitions(IMetaStoreClient client, |
| Collection<HdfsPartition.Builder> partBuilders, boolean isRefresh) |
| throws CatalogException { |
| final Clock clock = Clock.defaultClock(); |
| long startTime = clock.getTick(); |
| |
| //TODO: maybe it'd be better to load the valid txn list in the context of a |
| // transaction to have consistent valid write ids and valid transaction ids. |
| // Currently tables are loaded when they are first referenced and stay in catalog |
| // until certain actions occur (refresh, invalidate, insert, etc.). However, |
| // Impala doesn't notice when HMS's cleaner removes old transactional directories, |
| // which might lead to FileNotFound exceptions. |
| ValidTxnList validTxnList = validWriteIds_ != null ? loadValidTxns(client) : null; |
| String logPrefix = String.format( |
| "%s file and block metadata for %s paths for table %s", |
| isRefresh ? "Refreshing" : "Loading", partBuilders.size(), |
| getFullName()); |
| |
| // Actually load the partitions. |
| // TODO(IMPALA-8406): if this fails to load files from one or more partitions, then |
| // we'll throw an exception here and end up bailing out of whatever catalog operation |
| // we're in the middle of. This could cause a partial metadata update -- eg we may |
| // have refreshed the top-level table properties without refreshing the files. |
| new ParallelFileMetadataLoader( |
| this, partBuilders, validWriteIds_, validTxnList, logPrefix) |
| .load(); |
| |
| // TODO(todd): would be good to log a summary of the loading process: |
| // - how many block locations did we reuse/load individually/load via batch |
| // - how many partitions did we read metadata for |
| // - etc... |
| String partNames = Joiner.on(", ").join( |
| Iterables.limit( |
| Iterables.transform(partBuilders, HdfsPartition.Builder::getPartitionName), |
| 3)); |
| if (partBuilders.size() > 3) { |
| partNames += String.format(", and %s others", |
| Iterables.size(partBuilders) - 3); |
| } |
| |
| long duration = clock.getTick() - startTime; |
| LOG.info("Loaded file and block metadata for {} partitions: {}. Time taken: {}", |
| getFullName(), partNames, PrintUtils.printTimeNs(duration)); |
| return duration; |
| } |
| |
| public FileSystem getFileSystem() throws CatalogException { |
| FileSystem tableFs; |
| try { |
| tableFs = (new Path(getLocation())).getFileSystem(CONF); |
| } catch (IOException e) { |
| throw new CatalogException("Invalid table path for table: " + getFullName(), e); |
| } |
| return tableFs; |
| } |
| |
| /** |
| * Gets the AccessLevel that is available for Impala for this table based on the |
| * permissions Impala has on the given path. If the path does not exist, recurses up |
| * the path until a existing parent directory is found, and inherit access permissions |
| * from that. |
| * Always returns READ_WRITE for S3 and ADLS files. |
| */ |
| private static TAccessLevel getAvailableAccessLevel(String tableName, |
| Path location, FsPermissionCache permCache) throws IOException { |
| Preconditions.checkNotNull(location); |
| FileSystem fs = location.getFileSystem(CONF); |
| // Avoid calling getPermissions() on file path for S3 files, as that makes a round |
| // trip to S3. Also, the S3A connector is currently unable to manage S3 permissions, |
| // so for now it is safe to assume that all files(objects) have READ_WRITE |
| // permissions, as that's what the S3A connector will always return too. |
| // TODO: Revisit if the S3A connector is updated to be able to manage S3 object |
| // permissions. (see HADOOP-13892) |
| if (FileSystemUtil.isS3AFileSystem(fs)) return TAccessLevel.READ_WRITE; |
| |
| // The ADLS connector currently returns ACLs for files in ADLS, but can only map |
| // them to the ADLS client SPI and not the Hadoop users/groups, causing unexpected |
| // behavior. So ADLS ACLs are unsupported until the connector is able to map |
| // permissions to hadoop users/groups (HADOOP-14437). |
| if (FileSystemUtil.isADLFileSystem(fs)) return TAccessLevel.READ_WRITE; |
| if (FileSystemUtil.isABFSFileSystem(fs)) return TAccessLevel.READ_WRITE; |
| |
| while (location != null) { |
| try { |
| FsPermissionChecker.Permissions perms = permCache.getPermissions(location); |
| if (perms.canReadAndWrite()) { |
| return TAccessLevel.READ_WRITE; |
| } else if (perms.canRead()) { |
| return TAccessLevel.READ_ONLY; |
| } else if (perms.canWrite()) { |
| return TAccessLevel.WRITE_ONLY; |
| } |
| return TAccessLevel.NONE; |
| } catch (FileNotFoundException e) { |
| location = location.getParent(); |
| } |
| } |
| // Should never get here. |
| throw new NullPointerException("Error determining access level for table " + |
| tableName + ": no path ancestor exists for path: " + location); |
| } |
| |
| /** |
| * Creates new HdfsPartition objects to be added to HdfsTable's partition list. |
| * Partitions may be empty, or may not even exist in the filesystem (a partition's |
| * location may have been changed to a new path that is about to be created by an |
| * INSERT). Also loads the file metadata for this partition. Returns new partition |
| * if successful or null if none was created. |
| * |
| * Throws CatalogException if one of the supplied storage descriptors contains metadata |
| * that Impala can't understand. |
| */ |
| public List<HdfsPartition> createAndLoadPartitions(IMetaStoreClient client, |
| List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions) |
| throws CatalogException { |
| List<HdfsPartition.Builder> addedPartBuilders = new ArrayList<>(); |
| FsPermissionCache permCache = preloadPermissionsCache(msPartitions); |
| for (org.apache.hadoop.hive.metastore.api.Partition partition: msPartitions) { |
| HdfsPartition.Builder partBuilder = createPartitionBuilder(partition.getSd(), |
| partition, permCache); |
| Preconditions.checkNotNull(partBuilder); |
| addedPartBuilders.add(partBuilder); |
| } |
| loadFileMetadataForPartitions(client, addedPartBuilders, /*isRefresh=*/false); |
| return addedPartBuilders.stream() |
| .map(HdfsPartition.Builder::build) |
| .collect(Collectors.toList()); |
| } |
| |
| /** |
| * Creates a new HdfsPartition.Builder from a specified StorageDescriptor and an HMS |
| * partition object. |
| */ |
| private HdfsPartition.Builder createPartitionBuilder( |
| StorageDescriptor storageDescriptor, Partition msPartition, |
| FsPermissionCache permCache) throws CatalogException { |
| return createOrUpdatePartitionBuilder( |
| storageDescriptor, msPartition, permCache, null); |
| } |
| |
| private HdfsPartition.Builder createOrUpdatePartitionBuilder( |
| StorageDescriptor storageDescriptor, |
| org.apache.hadoop.hive.metastore.api.Partition msPartition, |
| FsPermissionCache permCache, HdfsPartition.Builder partBuilder) |
| throws CatalogException { |
| if (partBuilder == null) partBuilder = new HdfsPartition.Builder(this); |
| partBuilder |
| .setMsPartition(msPartition) |
| .setFileFormatDescriptor( |
| HdfsStorageDescriptor.fromStorageDescriptor(this.name_, storageDescriptor)); |
| Path partDirPath = new Path(storageDescriptor.getLocation()); |
| try { |
| if (msPartition != null) { |
| // Update the parameters based on validations with hdfs. |
| boolean isCached = HdfsCachingUtil.validateCacheParams( |
| partBuilder.getParameters()); |
| partBuilder.setIsMarkedCached(isCached); |
| } |
| TAccessLevel accessLevel = getAvailableAccessLevel(getFullName(), partDirPath, |
| permCache); |
| partBuilder.setAccessLevel(accessLevel); |
| partBuilder.checkWellFormed(); |
| if (!TAccessLevelUtil.impliesWriteAccess(accessLevel)) { |
| // TODO: READ_ONLY isn't exactly correct because the it's possible the |
| // partition does not have READ permissions either. When we start checking |
| // whether we can READ from a table, this should be updated to set the |
| // table's access level to the "lowest" effective level across all |
| // partitions. That is, if one partition has READ_ONLY and another has |
| // WRITE_ONLY the table's access level should be NONE. |
| accessLevel_ = TAccessLevel.READ_ONLY; |
| } |
| return partBuilder; |
| } catch (IOException e) { |
| throw new CatalogException("Error initializing partition", e); |
| } |
| } |
| |
| /** |
| * Adds the partition to the HdfsTable. Throws a CatalogException if the partition |
| * already exists in this table. |
| */ |
| public void addPartition(HdfsPartition partition) throws CatalogException { |
| if (partitionMap_.containsKey(partition.getId())) { |
| throw new CatalogException(String.format("Partition %s already exists in table %s", |
| partition.getPartitionName(), getFullName())); |
| } |
| addPartitionNoThrow(partition); |
| } |
| |
| /** |
| * Adds the partition to the HdfsTable. Skips if a partition with the same partition id |
| * already exists. |
| */ |
| public boolean addPartitionNoThrow(HdfsPartition partition) { |
| if (partitionMap_.containsKey(partition.getId())) return false; |
| if (partition.getFileFormat() == HdfsFileFormat.AVRO) hasAvroData_ = true; |
| partitionMap_.put(partition.getId(), partition); |
| fileMetadataStats_.totalFileBytes += partition.getSize(); |
| fileMetadataStats_.numFiles += partition.getNumFileDescriptors(); |
| updatePartitionMdAndColStats(partition); |
| return true; |
| } |
| |
| /** |
| * Updates the HdfsTable's partition metadata, i.e. adds the id to the HdfsTable and |
| * populates structures used for speeding up partition pruning/lookup. Also updates |
| * column stats. |
| */ |
| private void updatePartitionMdAndColStats(HdfsPartition partition) { |
| if (partition.getPartitionValues().size() != numClusteringCols_) return; |
| nameToPartitionMap_.put(partition.getPartitionName(), partition); |
| if (!isStoredInImpaladCatalogCache()) return; |
| for (int i = 0; i < partition.getPartitionValues().size(); ++i) { |
| ColumnStats stats = getColumns().get(i).getStats(); |
| LiteralExpr literal = partition.getPartitionValues().get(i); |
| // Store partitions with null partition values separately |
| if (Expr.IS_NULL_LITERAL.apply(literal)) { |
| stats.setNumNulls(stats.getNumNulls() + 1); |
| if (nullPartitionIds_.get(i).isEmpty()) { |
| stats.setNumDistinctValues(stats.getNumDistinctValues() + 1); |
| } |
| nullPartitionIds_.get(i).add(Long.valueOf(partition.getId())); |
| continue; |
| } |
| Set<Long> partitionIds = partitionValuesMap_.get(i).get(literal); |
| if (partitionIds == null) { |
| partitionIds = new HashSet<>(); |
| partitionValuesMap_.get(i).put(literal, partitionIds); |
| stats.setNumDistinctValues(stats.getNumDistinctValues() + 1); |
| } |
| partitionIds.add(Long.valueOf(partition.getId())); |
| } |
| } |
| |
| public void updatePartitions(List<HdfsPartition.Builder> partBuilders) |
| throws CatalogException { |
| for (HdfsPartition.Builder p : partBuilders) updatePartition(p); |
| } |
| |
| public void updatePartition(HdfsPartition.Builder partBuilder) throws CatalogException { |
| HdfsPartition oldPartition = partBuilder.getOldInstance(); |
| Preconditions.checkNotNull(oldPartition); |
| Preconditions.checkState(partitionMap_.containsKey(oldPartition.getId())); |
| HdfsPartition newPartition = partBuilder.build(); |
| // Partition is reloaded and hence cache directives are not dropped. |
| dropPartition(oldPartition, false); |
| addPartition(newPartition); |
| } |
| |
| /** |
| * Drops the partition having the given partition spec from HdfsTable. Cleans up its |
| * metadata from all the mappings used to speed up partition pruning/lookup. |
| * Also updates partition column statistics. Given partitionSpec must match exactly |
| * one partition. |
| * Returns the HdfsPartition that was dropped. If the partition does not exist, returns |
| * null. |
| */ |
| public HdfsPartition dropPartition(List<TPartitionKeyValue> partitionSpec) { |
| return dropPartition(getPartitionFromThriftPartitionSpec(partitionSpec)); |
| } |
| |
| /** |
| * The same as the above method but specifies the partition using the partition id. |
| */ |
| public HdfsPartition dropPartition(long partitionId) { |
| return dropPartition(partitionMap_.get(partitionId)); |
| } |
| |
| /** |
| * Drops a partition and updates partition column statistics. Returns the |
| * HdfsPartition that was dropped or null if the partition does not exist. |
| * If removeCacheDirective = true, any cache directive on the partition is removed. |
| */ |
| private HdfsPartition dropPartition(HdfsPartition partition, |
| boolean removeCacheDirective) { |
| if (partition == null) return null; |
| fileMetadataStats_.totalFileBytes -= partition.getSize(); |
| fileMetadataStats_.numFiles -= partition.getNumFileDescriptors(); |
| Preconditions.checkArgument(partition.getPartitionValues().size() == |
| numClusteringCols_); |
| Long partitionId = partition.getId(); |
| partitionMap_.remove(partitionId); |
| nameToPartitionMap_.remove(partition.getPartitionName()); |
| if (removeCacheDirective && partition.isMarkedCached()) { |
| try { |
| // Partition's parameters map is immutable. Create a temp one for the cleanup. |
| HdfsCachingUtil.removePartitionCacheDirective(Maps.newHashMap( |
| partition.getParameters())); |
| } catch (ImpalaException e) { |
| LOG.error("Unable to remove the cache directive on table " + getFullName() + |
| ", partition " + partition.getPartitionName() + ": ", e); |
| } |
| } |
| // dirtyPartitions_ and droppedPartitionIds are only maintained in the catalogd. |
| // nullPartitionIds_ and partitionValuesMap_ are only maintained in coordinators. |
| if (!isStoredInImpaladCatalogCache()) { |
| dirtyPartitions_.remove(partitionId); |
| droppedPartitions.add(partition.genMinimalPartition()); |
| return partition; |
| } |
| for (int i = 0; i < partition.getPartitionValues().size(); ++i) { |
| ColumnStats stats = getColumns().get(i).getStats(); |
| LiteralExpr literal = partition.getPartitionValues().get(i); |
| // Check if this is a null literal. |
| if (Expr.IS_NULL_LITERAL.apply(literal)) { |
| nullPartitionIds_.get(i).remove(partitionId); |
| stats.setNumNulls(stats.getNumNulls() - 1); |
| if (nullPartitionIds_.get(i).isEmpty()) { |
| stats.setNumDistinctValues(stats.getNumDistinctValues() - 1); |
| } |
| continue; |
| } |
| Set<Long> partitionIds = partitionValuesMap_.get(i).get(literal); |
| // If there are multiple partition ids corresponding to a literal, remove |
| // only this id. Otherwise, remove the <literal, id> pair. |
| if (partitionIds.size() > 1) partitionIds.remove(partitionId); |
| else { |
| partitionValuesMap_.get(i).remove(literal); |
| stats.setNumDistinctValues(stats.getNumDistinctValues() - 1); |
| } |
| } |
| return partition; |
| } |
| |
| private HdfsPartition dropPartition(HdfsPartition partition) { |
| return dropPartition(partition, true); |
| } |
| |
| /** |
| * Drops the given partitions from this table. Cleans up its metadata from all the |
| * mappings used to speed up partition pruning/lookup. Also updates partitions column |
| * statistics. Returns the list of partitions that were dropped. |
| */ |
| public List<HdfsPartition> dropPartitions(List<HdfsPartition> partitions, |
| boolean removeCacheDirective) { |
| List<HdfsPartition> droppedPartitions = new ArrayList<>(); |
| for (HdfsPartition partition: partitions) { |
| HdfsPartition hdfsPartition = dropPartition(partition, removeCacheDirective); |
| if (hdfsPartition != null) droppedPartitions.add(hdfsPartition); |
| } |
| return droppedPartitions; |
| } |
| |
| public List<HdfsPartition> dropPartitions(List<HdfsPartition> partitions) { |
| return dropPartitions(partitions, true); |
| } |
| |
| /** |
| * Update the prototype partition used when creating new partitions for |
| * this table. New partitions will inherit storage properties from the |
| * provided descriptor. |
| */ |
| public void setPrototypePartition(StorageDescriptor storageDescriptor) |
| throws CatalogException { |
| HdfsStorageDescriptor hdfsStorageDescriptor = |
| HdfsStorageDescriptor.fromStorageDescriptor(this.name_, storageDescriptor); |
| prototypePartition_ = HdfsPartition.prototypePartition(this, hdfsStorageDescriptor); |
| } |
| |
| @Override |
| public void load(boolean reuseMetadata, IMetaStoreClient client, |
| org.apache.hadoop.hive.metastore.api.Table msTbl, String reason) |
| throws TableLoadingException { |
| load(reuseMetadata, client, msTbl, /* loadPartitionFileMetadata */ |
| true, /* loadTableSchema*/true, false, |
| /* partitionsToUpdate*/null, reason); |
| } |
| |
| public void load(boolean reuseMetadata, IMetaStoreClient client, |
| org.apache.hadoop.hive.metastore.api.Table msTbl, boolean refreshUpdatedPartitions, |
| String reason) throws TableLoadingException { |
| load(reuseMetadata, client, msTbl, /* loadPartitionFileMetadata */ |
| true, /* loadTableSchema*/true, refreshUpdatedPartitions, |
| /* partitionsToUpdate*/null, reason); |
| } |
| |
| /** |
| * Loads table metadata from the Hive Metastore. |
| * |
| * If 'reuseMetadata' is false, performs a full metadata load from the Hive Metastore, |
| * including partition and file metadata. Otherwise, loads metadata incrementally and |
| * updates this HdfsTable in place so that it is in sync with the Hive Metastore. |
| * |
| * Depending on the operation that triggered the table metadata load, not all the |
| * metadata may need to be updated. If 'partitionsToUpdate' is not null, it specifies a |
| * list of partitions for which metadata should be updated. Otherwise, all partition |
| * metadata will be updated from the Hive Metastore. |
| * |
| * If 'loadParitionFileMetadata' is true, file metadata of the specified partitions |
| * are reloaded from scratch. If 'partitionsToUpdate' is not specified, file metadata |
| * of all the partitions are loaded. |
| * |
| * If 'loadTableSchema' is true, the table schema is loaded from the Hive Metastore. |
| * |
| * Existing file descriptors might be reused incorrectly if Hdfs rebalancer was |
| * executed, as it changes the block locations but doesn't update the mtime (file |
| * modification time). |
| * If this occurs, user has to execute "invalidate metadata" to invalidate the |
| * metadata cache of the table and trigger a fresh load. |
| */ |
| public void load(boolean reuseMetadata, IMetaStoreClient client, |
| org.apache.hadoop.hive.metastore.api.Table msTbl, |
| boolean loadParitionFileMetadata, boolean loadTableSchema, |
| boolean refreshUpdatedPartitions, Set<String> partitionsToUpdate, String reason) |
| throws TableLoadingException { |
| final Timer.Context context = |
| getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time(); |
| String annotation = String.format("%s metadata for %s%s partition(s) of %s.%s (%s)", |
| reuseMetadata ? "Reloading" : "Loading", |
| loadTableSchema ? "table definition and " : "", |
| partitionsToUpdate == null ? "all" : String.valueOf(partitionsToUpdate.size()), |
| msTbl.getDbName(), msTbl.getTableName(), reason); |
| LOG.info(annotation); |
| final Timer storageLdTimer = |
| getMetrics().getTimer(Table.LOAD_DURATION_STORAGE_METADATA); |
| storageMetadataLoadTime_ = 0; |
| try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) { |
| // turn all exceptions into TableLoadingException |
| msTable_ = msTbl; |
| try { |
| if (loadTableSchema) { |
| // set nullPartitionKeyValue from the hive conf. |
| nullPartitionKeyValue_ = |
| MetaStoreUtil.getNullPartitionKeyValue(client).intern(); |
| loadSchema(msTbl); |
| loadAllColumnStats(client); |
| loadConstraintsInfo(client, msTbl); |
| } |
| loadValidWriteIdList(client); |
| // Set table-level stats first so partition stats can inherit it. |
| setTableStats(msTbl); |
| // Load partition and file metadata |
| if (reuseMetadata) { |
| // Incrementally update this table's partitions and file metadata |
| Preconditions.checkState( |
| partitionsToUpdate == null || loadParitionFileMetadata); |
| storageMetadataLoadTime_ += updateMdFromHmsTable(msTbl); |
| if (msTbl.getPartitionKeysSize() == 0) { |
| if (loadParitionFileMetadata) { |
| storageMetadataLoadTime_ += updateUnpartitionedTableFileMd(client); |
| } else { // Update the single partition stats in case table stats changes. |
| updateUnpartitionedTableStats(); |
| } |
| } else { |
| storageMetadataLoadTime_ += updatePartitionsFromHms( |
| client, partitionsToUpdate, loadParitionFileMetadata, |
| refreshUpdatedPartitions); |
| } |
| LOG.info("Incrementally loaded table metadata for: " + getFullName()); |
| } else { |
| LOG.info("Fetching partition metadata from the Metastore: " + getFullName()); |
| final Timer.Context allPartitionsLdContext = |
| getMetrics().getTimer(HdfsTable.LOAD_DURATION_ALL_PARTITIONS).time(); |
| // Load all partitions from Hive Metastore, including file metadata. |
| List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions = |
| MetaStoreUtil.fetchAllPartitions( |
| client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES); |
| LOG.info("Fetched partition metadata from the Metastore: " + getFullName()); |
| storageMetadataLoadTime_ = loadAllPartitions(client, msPartitions, msTbl); |
| allPartitionsLdContext.stop(); |
| } |
| if (loadTableSchema) setAvroSchema(client, msTbl); |
| fileMetadataStats_.unset(); |
| refreshLastUsedTime(); |
| // Make sure all the partition modifications are done. |
| Preconditions.checkState(dirtyPartitions_.isEmpty()); |
| } catch (TableLoadingException e) { |
| throw e; |
| } catch (Exception e) { |
| throw new TableLoadingException("Failed to load metadata for table: " |
| + getFullName(), e); |
| } |
| } finally { |
| storageLdTimer.update(storageMetadataLoadTime_, TimeUnit.NANOSECONDS); |
| long load_time_duration = context.stop(); |
| if (load_time_duration > LOADING_WARNING_TIME_NS) { |
| LOG.warn("Time taken on loading table " + getFullName() + " exceeded " + |
| "warning threshold. Time: " + PrintUtils.printTimeNs(load_time_duration)); |
| } |
| updateTableLoadingTime(); |
| } |
| } |
| |
| /** |
| * Load Primary Key and Foreign Key information for table. Throws TableLoadingException |
| * if the load fails. |
| */ |
| private void loadConstraintsInfo(IMetaStoreClient client, |
| org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException{ |
| try { |
| sqlConstraints_ = new SqlConstraints(client.getPrimaryKeys( |
| new PrimaryKeysRequest(msTbl.getDbName(), msTbl.getTableName())), |
| client.getForeignKeys(new ForeignKeysRequest(null, null, |
| msTbl.getDbName(), msTbl.getTableName()))); |
| } catch (Exception e) { |
| throw new TableLoadingException("Failed to load primary keys/foreign keys for " |
| + "table: " + getFullName(), e); |
| } |
| } |
| |
| /** |
| * Updates the table metadata, including 'hdfsBaseDir_', 'isMarkedCached_', |
| * and 'accessLevel_' from 'msTbl'. Returns time spent accessing file system |
| * in nanoseconds. Throws an IOException if there was an error accessing |
| * the table location path. |
| */ |
| private long updateMdFromHmsTable(org.apache.hadoop.hive.metastore.api.Table msTbl) |
| throws IOException { |
| Preconditions.checkNotNull(msTbl); |
| final Clock clock = Clock.defaultClock(); |
| long filesystemAccessTime = 0; |
| long startTime = clock.getTick(); |
| hdfsBaseDir_ = msTbl.getSd().getLocation(); |
| isMarkedCached_ = HdfsCachingUtil.validateCacheParams(msTbl.getParameters()); |
| Path location = new Path(hdfsBaseDir_); |
| accessLevel_ = getAvailableAccessLevel(getFullName(), location, |
| new FsPermissionCache()); |
| filesystemAccessTime = clock.getTick() - startTime; |
| setMetaStoreTable(msTbl); |
| return filesystemAccessTime; |
| } |
| |
| /** |
| * Incrementally updates the file metadata of an unpartitioned HdfsTable. |
| * Returns time spent updating the file metadata in nanoseconds. |
| * |
| * This is optimized for the case where few files have changed. See |
| * {@link FileMetadataLoader#load} for details. |
| */ |
| private long updateUnpartitionedTableFileMd(IMetaStoreClient client) |
| throws CatalogException { |
| Preconditions.checkState(getNumClusteringCols() == 0); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("update unpartitioned table: " + getFullName()); |
| } |
| HdfsPartition oldPartition = Iterables.getOnlyElement(partitionMap_.values()); |
| resetPartitions(); |
| org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable(); |
| Preconditions.checkNotNull(msTbl); |
| setPrototypePartition(msTbl.getSd()); |
| HdfsPartition.Builder partBuilder = createPartitionBuilder(msTbl.getSd(), |
| /*msPartition=*/null, new FsPermissionCache()); |
| // Copy over the FDs from the old partition to the new one, so that |
| // 'refreshPartitionFileMetadata' below can compare modification times and |
| // reload the locations only for those that changed. |
| partBuilder.setFileDescriptors(oldPartition); |
| partBuilder.setIsMarkedCached(isMarkedCached_); |
| // Keep track of the previous partition id, so we can send invalidation on the old |
| // partition instance to local catalog coordinators. |
| partBuilder.setPrevId(oldPartition.getId()); |
| long fileMdLoadTime = loadFileMetadataForPartitions(client, |
| ImmutableList.of(partBuilder), /*isRefresh=*/true); |
| setUnpartitionedTableStats(partBuilder); |
| addPartition(partBuilder.build()); |
| return fileMdLoadTime; |
| } |
| |
| /** |
| * Updates the single partition stats of an unpartitioned HdfsTable. |
| */ |
| private void updateUnpartitionedTableStats() throws CatalogException { |
| // Just update the single partition if its #rows is stale. |
| HdfsPartition oldPartition = Iterables.getOnlyElement(partitionMap_.values()); |
| if (oldPartition.getNumRows() != getNumRows()) { |
| HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(oldPartition) |
| .setNumRows(getNumRows()); |
| updatePartition(partBuilder); |
| } |
| } |
| |
| /** |
| * Updates the partitions of an HdfsTable so that they are in sync with the |
| * Hive Metastore. It reloads partitions that were marked 'dirty' by doing a |
| * DROP + CREATE. It removes from this table partitions that no longer exist |
| * in the Hive Metastore and adds partitions that were added externally (e.g. |
| * using Hive) to the Hive Metastore but do not exist in this table. If |
| * 'loadParitionFileMetadata' is true, it triggers file/block metadata reload |
| * for the partitions specified in 'partitionsToUpdate', if any, or for all |
| * the table partitions if 'partitionsToUpdate' is null. Returns time |
| * spent loading file metadata in nanoseconds. |
| */ |
| private long updatePartitionsFromHms(IMetaStoreClient client, |
| Set<String> partitionsToUpdate, boolean loadPartitionFileMetadata, |
| boolean refreshUpdatedPartitions) throws Exception { |
| if (LOG.isTraceEnabled()) LOG.trace("Sync table partitions: " + getFullName()); |
| org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable(); |
| Preconditions.checkNotNull(msTbl); |
| Preconditions.checkState(msTbl.getPartitionKeysSize() != 0); |
| Preconditions.checkState(loadPartitionFileMetadata || partitionsToUpdate == null); |
| PartitionDeltaUpdater deltaUpdater = |
| refreshUpdatedPartitions ? new PartBasedDeltaUpdater(client, |
| loadPartitionFileMetadata, partitionsToUpdate) |
| : new PartNameBasedDeltaUpdater(client, loadPartitionFileMetadata, |
| partitionsToUpdate); |
| deltaUpdater.apply(); |
| return deltaUpdater.loadTimeForFileMdNs_; |
| } |
| |
| /** |
| * Util class to compute the delta of partitions known to this table and the partitions |
| * in Hive Metastore. This is used to incrementally refresh the table. It identifies |
| * the partitions which are removed [added] from metastore and removes [adds] them. |
| * Additionally, it also updates partitions which are provided or found to be stale. |
| */ |
| private abstract class PartitionDeltaUpdater { |
| // flag used to determine if the file-metadata needs to be reloaded for stale |
| // partitions |
| private final boolean loadFileMd_; |
| // total time taken to load file-metadata in nano-seconds. |
| private long loadTimeForFileMdNs_; |
| // metastore client used to fetch partition information from metastore. |
| protected final IMetaStoreClient client_; |
| // Nullable set of partition names which when set is used to force load partitions. |
| // if loadFileMd_ flag is set, files for these partitions will also be |
| // reloaded. |
| private final Set<String> partitionsToUpdate_; |
| |
| PartitionDeltaUpdater(IMetaStoreClient client, boolean loadPartitionFileMetadata, |
| Set<String> partitionsToUpdate) { |
| this.client_ = client; |
| this.loadFileMd_ = loadPartitionFileMetadata; |
| this.partitionsToUpdate_ = partitionsToUpdate; |
| } |
| |
| /** |
| * This method used to determine if the given HdfsPartition has been removed from |
| * hive metastore. |
| * @return true if partition does not exist in metastore, else false. |
| */ |
| public abstract boolean isRemoved(HdfsPartition hdfsPartition); |
| |
| /** |
| * Loads any partitions which are known to metastore but not provided in |
| * knownPartitions. All such new partitions will be added in the given |
| * {@code addedPartNames} set. |
| * @param knownPartitions Known set of partition names to this Table. |
| * @param addedPartNames Set of part names which is used to return the newly added |
| * partNames |
| * @return Time taken in nanoseconds for file-metadata loading for new partitions. |
| */ |
| public abstract long loadNewPartitions(Set<String> knownPartitions, |
| Set<String> addedPartNames) throws Exception; |
| |
| /** |
| * Gets a {@link HdfsPartition.Builder} to construct a updated HdfsPartition for |
| * the given partition. |
| */ |
| public abstract HdfsPartition.Builder getUpdatedPartition(HdfsPartition partition) |
| throws Exception; |
| |
| /** |
| * Loads both the HMS and file-metadata of the partitions provided by the given |
| * map of HdfsPartition.Builders. |
| * @param updatedPartitionBuilders The map of partition names and the corresponding |
| * HdfsPartition.Builders which need to be loaded. |
| * @return Time taken to load file-metadata in nanoseconds. |
| */ |
| public abstract long loadUpdatedPartitions( |
| Map<String, HdfsPartition.Builder> updatedPartitionBuilders) throws Exception; |
| |
| /** |
| * This method applies the partition delta (create new, remove old, update stale) |
| * when compared to the current state of partitions in the metastore. |
| */ |
| public void apply() throws Exception { |
| List<HdfsPartition> removedPartitions = new ArrayList<>(); |
| Map<String, HdfsPartition.Builder> updatedPartitions = new HashMap<>(); |
| List<HdfsPartition.Builder> partitionsToLoadFiles = new ArrayList<>(); |
| Set<String> partitionNames = new HashSet<>(); |
| for (HdfsPartition partition: partitionMap_.values()) { |
| // Remove partitions that don't exist in the Hive Metastore. These are partitions |
| // that were removed from HMS using some external process, e.g. Hive. |
| if (isRemoved(partition)) { |
| removedPartitions.add(partition); |
| } else { |
| HdfsPartition.Builder updatedPartBuilder = getUpdatedPartition(partition); |
| if (updatedPartBuilder != null) { |
| // If there are any self-updated (dirty) or externally updated partitions |
| // add them to the list of updatedPartitions so that they are reloaded later. |
| updatedPartitions.put(partition.getPartitionName(), updatedPartBuilder); |
| } else if (loadFileMd_ && partitionsToUpdate_ == null) { |
| partitionsToLoadFiles.add(new HdfsPartition.Builder(partition)); |
| } |
| } |
| Preconditions.checkNotNull(partition.getCachedMsPartitionDescriptor()); |
| partitionNames.add(partition.getPartitionName()); |
| } |
| dropPartitions(removedPartitions); |
| // Load dirty partitions from Hive Metastore. File metadata of dirty partitions will |
| // always be reloaded (ignore the loadPartitionFileMetadata flag). |
| loadTimeForFileMdNs_ = loadUpdatedPartitions(updatedPartitions); |
| Preconditions.checkState(!hasInProgressModification()); |
| Set<String> addedPartitions = new HashSet<>(); |
| loadTimeForFileMdNs_ += loadNewPartitions(partitionNames, addedPartitions); |
| // If a list of modified partitions (old and new) is specified, don't reload file |
| // metadata for the new ones as they have already been detected in HMS and have been |
| // reloaded by loadNewPartitions(). |
| if (partitionsToUpdate_ != null) { |
| partitionsToUpdate_.removeAll(addedPartitions); |
| } |
| // Load file metadata. Until we have a notification mechanism for when a |
| // file changes in hdfs, it is sometimes required to reload all the file |
| // descriptors and block metadata of a table (e.g. REFRESH statement). |
| if (loadFileMd_) { |
| if (partitionsToUpdate_ != null) { |
| Preconditions.checkState(partitionsToLoadFiles.isEmpty()); |
| // Only reload file metadata of partitions specified in 'partitionsToUpdate' |
| List<HdfsPartition> parts = getPartitionsForNames(partitionsToUpdate_); |
| partitionsToLoadFiles = parts.stream().map(HdfsPartition.Builder::new) |
| .collect(Collectors.toList()); |
| } |
| loadTimeForFileMdNs_ += loadFileMetadataForPartitions(client_, |
| partitionsToLoadFiles,/* isRefresh=*/true); |
| updatePartitions(partitionsToLoadFiles); |
| } |
| } |
| |
| /** |
| * Returns the total time taken to load file-metadata in nanoseconds. Mostly used |
| * for legacy reasons to return to the coordinators the time taken load file-metadata. |
| */ |
| public long getTotalFileMdLoadTime() { |
| return loadTimeForFileMdNs_; |
| } |
| } |
| |
| /** |
| * Util class which computes the delta of partitions for this table when compared to |
| * HMS. This class fetches all the partition objects from metastore and then evaluates |
| * the delta with what is known to this HdfsTable. It also detects changed partitions |
| * unlike {@link PartNameBasedDeltaUpdater} which only determines change in list |
| * of partition names. |
| */ |
| private class PartBasedDeltaUpdater extends PartitionDeltaUpdater { |
| private final Map<String, Partition> msPartitions_ = new HashMap<>(); |
| private final FsPermissionCache permCache_ = new FsPermissionCache(); |
| |
| public PartBasedDeltaUpdater( |
| IMetaStoreClient client, boolean loadPartitionFileMetadata, |
| Set<String> partitionsToUpdate) throws Exception { |
| super(client, loadPartitionFileMetadata, partitionsToUpdate); |
| Stopwatch sw = Stopwatch.createStarted(); |
| List<Partition> partitionList; |
| if (partitionsToUpdate != null) { |
| partitionList = MetaStoreUtil |
| .fetchPartitionsByName(client, Lists.newArrayList(partitionsToUpdate), |
| db_.getName(), name_); |
| } else { |
| partitionList = |
| MetaStoreUtil.fetchAllPartitions( |
| client_, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES); |
| } |
| LOG.debug("Time taken to fetch all partitions of table {}: {} msec", getFullName(), |
| sw.stop().elapsed(TimeUnit.MILLISECONDS)); |
| List<String> partitionColNames = getClusteringColNames(); |
| for (Partition part : partitionList) { |
| msPartitions_ |
| .put(MetastoreShim.makePartName(partitionColNames, part.getValues()), part); |
| } |
| } |
| |
| @Override |
| public boolean isRemoved(HdfsPartition hdfsPartition) { |
| return !msPartitions_.containsKey(hdfsPartition.getPartitionName()); |
| } |
| |
| /** |
| * In addition to the dirty partitions (representing partitions which are updated |
| * via on-going table metadata changes in this Catalog), this also detects staleness |
| * by comparing the {@link StorageDescriptor} of the given HdfsPartition with what is |
| * present in the HiveMetastore. This is useful to perform "deep" refresh table so |
| * that outside changes to existing partitions (eg. location update) are detected. |
| */ |
| @Override |
| public HdfsPartition.Builder getUpdatedPartition(HdfsPartition hdfsPartition) |
| throws Exception { |
| HdfsPartition.Builder updatedPartitionBuilder = pickInprogressPartitionBuilder( |
| hdfsPartition); |
| Partition msPartition = Preconditions |
| .checkNotNull(msPartitions_.get(hdfsPartition.getPartitionName())); |
| Preconditions.checkNotNull(msPartition.getSd()); |
| // we compare the StorageDescriptor from HdfsPartition object to the one |
| // from HMS and if they don't match we assume that the partition has been updated |
| // in HMS. This would catch the cases where partition fields, locations or |
| // file-format are changed from external systems. |
| if(!hdfsPartition.compareSd(msPartition.getSd())) { |
| // if the updatePartitionBuilder is null, it means that this partition update |
| // was not from an in-progress modification in this catalog, but rather from |
| // and outside update to the partition. |
| if (updatedPartitionBuilder == null) { |
| updatedPartitionBuilder = new HdfsPartition.Builder(hdfsPartition); |
| } |
| // msPartition is different than what we have in HdfsTable |
| updatedPartitionBuilder = createOrUpdatePartitionBuilder(msPartition.getSd(), |
| msPartition, permCache_, updatedPartitionBuilder); |
| } |
| return updatedPartitionBuilder; |
| } |
| |
| @Override |
| public long loadNewPartitions(Set<String> knownPartitions, Set<String> addedPartNames) |
| throws Exception { |
| // get the names of the partitions which present in HMS but not in this table. |
| List<Partition> newMsPartitions = new ArrayList<>(); |
| for (String partNameInMs : msPartitions_.keySet()) { |
| if (!knownPartitions.contains(partNameInMs)) { |
| newMsPartitions.add(msPartitions_.get(partNameInMs)); |
| addedPartNames.add(partNameInMs); |
| } |
| } |
| return loadPartitionsFromMetastore(newMsPartitions, |
| /*inprogressPartBuilders=*/null, client_); |
| } |
| |
| @Override |
| public long loadUpdatedPartitions( |
| Map<String, HdfsPartition.Builder> updatedPartBuilders) throws Exception { |
| List<Partition> updatedPartitions = new ArrayList<>(); |
| for (String partName : updatedPartBuilders.keySet()) { |
| updatedPartitions.add(Preconditions |
| .checkNotNull(msPartitions_.get(partName))); |
| } |
| return loadPartitionsFromMetastore(updatedPartitions, updatedPartBuilders, client_); |
| } |
| } |
| |
| |
| /** |
| * This DeltaChecker uses partition names to determine the delta between metastore |
| * and catalog. As such this is faster than {@link PartBasedDeltaUpdater} but it cannot |
| * detect partition updates other than partition names (e.g. outside partition location |
| * updates will not be detected). |
| */ |
| private class PartNameBasedDeltaUpdater extends PartitionDeltaUpdater { |
| private final Set<String> partitionNamesFromHms_; |
| |
| public PartNameBasedDeltaUpdater( |
| IMetaStoreClient client, boolean loadPartitionFileMetadata, |
| Set<String> partitionsToUpdate) throws Exception { |
| super(client, loadPartitionFileMetadata, partitionsToUpdate); |
| // Retrieve all the partition names from the Hive Metastore. We need this to |
| // identify the delta between partitions of the local HdfsTable and the table entry |
| // in the Hive Metastore. Note: This is a relatively "cheap" operation |
| // (~.3 secs for 30K partitions). |
| partitionNamesFromHms_ = new HashSet<>(client_ |
| .listPartitionNames(db_.getName(), name_, (short) -1)); |
| } |
| |
| @Override |
| public boolean isRemoved(HdfsPartition hdfsPartition) { |
| return !partitionNamesFromHms_.contains(hdfsPartition.getPartitionName()); |
| } |
| |
| @Override |
| public HdfsPartition.Builder getUpdatedPartition(HdfsPartition hdfsPartition) { |
| return pickInprogressPartitionBuilder(hdfsPartition); |
| } |
| |
| @Override |
| public long loadNewPartitions(Set<String> knownPartitionNames, |
| Set<String> addedPartNames) throws Exception { |
| // Identify and load partitions that were added in the Hive Metastore but don't |
| // exist in this table. File metadata of them will be loaded. |
| addedPartNames.addAll(Sets |
| .difference(partitionNamesFromHms_, knownPartitionNames)); |
| return loadPartitionsFromMetastore(addedPartNames, |
| /*inprogressPartBuilders=*/null, client_); |
| } |
| |
| @Override |
| public long loadUpdatedPartitions( |
| Map<String, HdfsPartition.Builder> updatedPartitionBuilders) throws Exception { |
| return loadPartitionsFromMetastore(updatedPartitionBuilders.keySet(), |
| updatedPartitionBuilders, client_); |
| } |
| } |
| |
| /** |
| * Gets the names of partition columns. |
| */ |
| public List<String> getClusteringColNames() { |
| List<String> colNames = new ArrayList<>(getNumClusteringCols()); |
| for (Column column : getClusteringColumns()) { |
| colNames.add(column.name_); |
| } |
| return colNames; |
| } |
| |
| /** |
| * Given a set of partition names, returns the corresponding HdfsPartition |
| * objects. |
| */ |
| public List<HdfsPartition> getPartitionsForNames(Collection<String> partitionNames) { |
| List<HdfsPartition> parts = Lists.newArrayListWithCapacity(partitionNames.size()); |
| for (String partitionName: partitionNames) { |
| String partName = DEFAULT_PARTITION_NAME; |
| if (partitionName.length() > 0) { |
| // Trim the last trailing char '/' from each partition name |
| partName = partitionName.substring(0, partitionName.length()-1); |
| } |
| HdfsPartition partition = nameToPartitionMap_.get(partName); |
| Preconditions.checkNotNull(partition, "Invalid partition name: " + partName); |
| parts.add(partition); |
| } |
| return parts; |
| } |
| |
| private void setUnpartitionedTableStats(HdfsPartition.Builder partBuilder) { |
| Preconditions.checkState(numClusteringCols_ == 0); |
| // For unpartitioned tables set the numRows in its single partition |
| // to the table's numRows. |
| partBuilder.setNumRows(getNumRows()); |
| } |
| |
| /** |
| * Sets avroSchema_ if the table or any of the partitions in the table are stored |
| * as Avro. Additionally, this method also reconciles the schema if the column |
| * definitions from the metastore differ from the Avro schema. |
| */ |
| private void setAvroSchema(IMetaStoreClient client, |
| org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception { |
| Preconditions.checkState(isSchemaLoaded_); |
| String inputFormat = msTbl.getSd().getInputFormat(); |
| if (HdfsFileFormat.fromJavaClassName(inputFormat) == HdfsFileFormat.AVRO |
| || hasAvroData_) { |
| // Look for Avro schema in TBLPROPERTIES and in SERDEPROPERTIES, with the latter |
| // taking precedence. |
| List<Map<String, String>> schemaSearchLocations = new ArrayList<>(); |
| schemaSearchLocations.add( |
| getMetaStoreTable().getSd().getSerdeInfo().getParameters()); |
| schemaSearchLocations.add(getMetaStoreTable().getParameters()); |
| |
| avroSchema_ = AvroSchemaUtils.getAvroSchema(schemaSearchLocations); |
| |
| if (avroSchema_ == null) { |
| // No Avro schema was explicitly set in the table metadata, so infer the Avro |
| // schema from the column definitions. |
| Schema inferredSchema = AvroSchemaConverter.convertFieldSchemas( |
| msTbl.getSd().getCols(), getFullName()); |
| avroSchema_ = inferredSchema.toString(); |
| // NOTE: below we reconcile this inferred schema back into the table |
| // schema in the case of Avro-formatted tables. This has the side effect |
| // of promoting types like TINYINT to INT. |
| } |
| String serdeLib = msTbl.getSd().getSerdeInfo().getSerializationLib(); |
| if (serdeLib == null || |
| serdeLib.equals("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) { |
| // If the SerDe library is null or set to LazySimpleSerDe or is null, it |
| // indicates there is an issue with the table metadata since Avro table need a |
| // non-native serde. Instead of failing to load the table, fall back to |
| // using the fields from the storage descriptor (same as Hive). |
| return; |
| } else { |
| List<FieldSchema> reconciledFieldSchemas = AvroSchemaUtils.reconcileAvroSchema( |
| msTbl, avroSchema_); |
| |
| // Reset and update nonPartFieldSchemas_ to the reconciled colDefs. |
| nonPartFieldSchemas_.clear(); |
| nonPartFieldSchemas_.addAll(reconciledFieldSchemas); |
| // Update the columns as per the reconciled colDefs and re-load stats. |
| clearColumns(); |
| addColumnsFromFieldSchemas(msTbl.getPartitionKeys()); |
| addColumnsFromFieldSchemas(nonPartFieldSchemas_); |
| loadAllColumnStats(client); |
| } |
| } |
| } |
| |
| /** |
| * Loads table schema. |
| */ |
| private void loadSchema(org.apache.hadoop.hive.metastore.api.Table msTbl) |
| throws TableLoadingException { |
| nonPartFieldSchemas_.clear(); |
| // set NULL indicator string from table properties |
| nullColumnValue_ = |
| msTbl.getParameters().get(serdeConstants.SERIALIZATION_NULL_FORMAT); |
| if (nullColumnValue_ == null) nullColumnValue_ = FeFsTable.DEFAULT_NULL_COLUMN_VALUE; |
| |
| // Excludes partition columns. |
| nonPartFieldSchemas_.addAll(msTbl.getSd().getCols()); |
| |
| // The number of clustering columns is the number of partition keys. |
| numClusteringCols_ = msTbl.getPartitionKeys().size(); |
| partitionLocationCompressor_.setClusteringColumns(numClusteringCols_); |
| clearColumns(); |
| // Add all columns to the table. Ordering is important: partition columns first, |
| // then all other columns. |
| addColumnsFromFieldSchemas(msTbl.getPartitionKeys()); |
| if (AcidUtils.isFullAcidTable(msTbl.getParameters())) { |
| addColumnsForFullAcidTable(nonPartFieldSchemas_); |
| } else { |
| addColumnsFromFieldSchemas(nonPartFieldSchemas_); |
| } |
| isSchemaLoaded_ = true; |
| } |
| |
| /** |
| * Loads from the Hive Metastore and file system the partitions that correspond to |
| * the specified 'partitionNames' and adds/updates them to the internal list of table |
| * partitions. |
| * If 'inprogressPartBuilders' is null, new partitions will be created. |
| * If 'inprogressPartBuilders' is not null, take over the in-progress modifications |
| * and finalized them by updating the existing partitions. |
| * @return time in nanoseconds spent in loading file metadata. |
| */ |
| private long loadPartitionsFromMetastore(Set<String> partitionNames, |
| Map<String, HdfsPartition.Builder> inprogressPartBuilders, IMetaStoreClient client) |
| throws Exception { |
| Preconditions.checkNotNull(partitionNames); |
| if (partitionNames.isEmpty()) return 0; |
| // Load partition metadata from Hive Metastore. |
| List<Partition> msPartitions = new ArrayList<>( |
| MetaStoreUtil.fetchPartitionsByName( |
| client, Lists.newArrayList(partitionNames), db_.getName(), name_)); |
| return loadPartitionsFromMetastore(msPartitions, inprogressPartBuilders, client); |
| } |
| |
| private long loadPartitionsFromMetastore(List<Partition> msPartitions, |
| Map<String, HdfsPartition.Builder> inprogressPartBuilders, IMetaStoreClient client) |
| throws Exception { |
| FsPermissionCache permCache = preloadPermissionsCache(msPartitions); |
| List<HdfsPartition.Builder> partBuilders = new ArrayList<>(msPartitions.size()); |
| for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) { |
| String partName = FeCatalogUtils.getPartitionName(this, msPartition.getValues()); |
| HdfsPartition.Builder partBuilder = null; |
| if (inprogressPartBuilders != null) { |
| // If we have a in-progress partition modification, update the partition builder. |
| partBuilder = inprogressPartBuilders.get(partName); |
| Preconditions.checkNotNull(partBuilder); |
| } |
| partBuilder = createOrUpdatePartitionBuilder( |
| msPartition.getSd(), msPartition, permCache, partBuilder); |
| partBuilders.add(partBuilder); |
| } |
| long fileMdLoadTime = loadFileMetadataForPartitions(client, partBuilders, |
| /* isRefresh=*/false); |
| for (HdfsPartition.Builder p : partBuilders) { |
| if (inprogressPartBuilders == null) { |
| addPartition(p.build()); |
| } else { |
| updatePartition(p); |
| } |
| } |
| return fileMdLoadTime; |
| } |
| |
| /** |
| * For each of the partitions in 'msPartitions' with a location inside the table's |
| * base directory, attempt to pre-cache the associated file permissions into the |
| * returned cache. This takes advantage of the fact that many partition directories will |
| * be in the same parent directories, and we can bulk fetch the permissions with a |
| * single round trip to the filesystem instead of individually looking up each. |
| */ |
| private FsPermissionCache preloadPermissionsCache(List<Partition> msPartitions) { |
| FsPermissionCache permCache = new FsPermissionCache(); |
| // Only preload permissions if the number of partitions to be added is |
| // large (3x) relative to the number of existing partitions. This covers |
| // two common cases: |
| // |
| // 1) initial load of a table (no existing partition metadata) |
| // 2) ALTER TABLE RECOVER PARTITIONS after creating a table pointing to |
| // an already-existing partition directory tree |
| // |
| // Without this heuristic, we would end up using a "listStatus" call to |
| // potentially fetch a bunch of irrelevant information about existing |
| // partitions when we only want to know about a small number of newly-added |
| // partitions. |
| if (msPartitions.size() < partitionMap_.size() * 3) return permCache; |
| |
| // TODO(todd): when HDFS-13616 (batch listing of multiple directories) |
| // is implemented, we could likely implement this with a single round |
| // trip. |
| Multiset<Path> parentPaths = HashMultiset.create(); |
| for (Partition p : msPartitions) { |
| // We only do this optimization for partitions which are within the table's base |
| // directory. Otherwise we risk a case where a user has specified an external |
| // partition location that is in a directory containing a high number of irrelevant |
| // files, and we'll potentially regress performance compared to just looking up |
| // the partition file directly. |
| String loc = p.getSd().getLocation(); |
| if (!loc.startsWith(hdfsBaseDir_)) continue; |
| Path parent = new Path(loc).getParent(); |
| if (parent == null) continue; |
| parentPaths.add(parent); |
| } |
| |
| // For any paths that contain more than one partition, issue a listStatus |
| // and pre-cache the resulting permissions. |
| for (Multiset.Entry<Path> entry : parentPaths.entrySet()) { |
| if (entry.getCount() == 1) continue; |
| Path p = entry.getElement(); |
| try { |
| FileSystem fs = p.getFileSystem(CONF); |
| permCache.precacheChildrenOf(fs, p); |
| } catch (IOException ioe) { |
| // If we fail to pre-warm the cache we'll just wait for later when we |
| // try to actually load the individual permissions, at which point |
| // we can handle the issue accordingly. |
| LOG.debug("Unable to bulk-load permissions for parent path: " + p, ioe); |
| } |
| } |
| return permCache; |
| } |
| |
| @Override |
| protected List<String> getColumnNamesWithHmsStats() { |
| List<String> ret = new ArrayList<>(); |
| // Only non-partition columns have column stats in the HMS. |
| for (Column column: getColumns().subList(numClusteringCols_, getColumns().size())) { |
| ret.add(column.getName().toLowerCase()); |
| } |
| return ret; |
| } |
| |
| @Override |
| protected synchronized void loadFromThrift(TTable thriftTable) |
| throws TableLoadingException { |
| super.loadFromThrift(thriftTable); |
| THdfsTable hdfsTable = thriftTable.getHdfs_table(); |
| partitionLocationCompressor_ = new HdfsPartitionLocationCompressor( |
| numClusteringCols_, hdfsTable.getPartition_prefixes()); |
| hdfsBaseDir_ = hdfsTable.getHdfsBaseDir(); |
| nullColumnValue_ = hdfsTable.nullColumnValue; |
| nullPartitionKeyValue_ = hdfsTable.nullPartitionKeyValue; |
| hostIndex_.populate(hdfsTable.getNetwork_addresses()); |
| sqlConstraints_ = SqlConstraints.fromThrift(hdfsTable.getSql_constraints()); |
| resetPartitions(); |
| try { |
| if (hdfsTable.has_full_partitions) { |
| for (THdfsPartition tPart : hdfsTable.getPartitions().values()) { |
| addPartition(new HdfsPartition.Builder(this, tPart.id) |
| .fromThrift(tPart) |
| .build()); |
| } |
| } |
| prototypePartition_ = |
| new HdfsPartition.Builder(this, CatalogObjectsConstants.PROTOTYPE_PARTITION_ID) |
| .fromThrift(hdfsTable.prototype_partition) |
| .build(); |
| } catch (CatalogException e) { |
| throw new TableLoadingException(e.getMessage()); |
| } |
| avroSchema_ = hdfsTable.isSetAvroSchema() ? hdfsTable.getAvroSchema() : null; |
| isMarkedCached_ = |
| HdfsCachingUtil.validateCacheParams(getMetaStoreTable().getParameters()); |
| if (hdfsTable.isSetValid_write_ids()) { |
| validWriteIds_ = MetastoreShim.getValidWriteIdListFromThrift( |
| getFullName(), hdfsTable.getValid_write_ids()); |
| } |
| } |
| |
| /** |
| * Validate that all expected partitions are set and not have any stale partitions. |
| */ |
| public void validatePartitions(Set<Long> expectedPartitionIds) |
| throws TableLoadingException { |
| if (!partitionMap_.keySet().equals(expectedPartitionIds)) { |
| throw new TableLoadingException(String.format("Error applying incremental updates" + |
| " on table %s: missing partition ids %s, stale partition ids %s", |
| getFullName(), expectedPartitionIds.removeAll(partitionMap_.keySet()), |
| partitionMap_.keySet().removeAll(expectedPartitionIds))); |
| } |
| } |
| |
| @Override |
| public TTableDescriptor toThriftDescriptor(int tableId, |
| Set<Long> referencedPartitions) { |
| // Create thrift descriptors to send to the BE. The BE does not |
| // need any information below the THdfsPartition level. |
| TTableDescriptor tableDesc = new TTableDescriptor(tableId, TTableType.HDFS_TABLE, |
| getTColumnDescriptors(), numClusteringCols_, name_, db_.getName()); |
| tableDesc.setHdfsTable(getTHdfsTable(ThriftObjectType.DESCRIPTOR_ONLY, |
| referencedPartitions)); |
| return tableDesc; |
| } |
| |
| @Override |
| public TTable toThrift() { |
| // Send all metadata between the catalog service and the FE. |
| TTable table = super.toThrift(); |
| table.setTable_type(TTableType.HDFS_TABLE); |
| table.setHdfs_table(getTHdfsTable(ThriftObjectType.FULL, null)); |
| return table; |
| } |
| |
| /** |
| * Just like toThrift but unset the full partition metadata in the partition map. |
| * So only the partition ids are contained. Used in catalogd to send partition updates |
| * individually in catalog topic updates. |
| */ |
| public TTable toThriftWithMinimalPartitions() { |
| TTable table = super.toThrift(); |
| table.setTable_type(TTableType.HDFS_TABLE); |
| // Specify an empty set to exclude all partitions. |
| THdfsTable hdfsTable = getTHdfsTable(ThriftObjectType.DESCRIPTOR_ONLY, |
| Collections.emptySet()); |
| // Host indexes are still required by resolving incremental partition updates. |
| hdfsTable.setNetwork_addresses(hostIndex_.getList()); |
| // Coordinators use the partition ids to detect stale and new partitions. Thrift |
| // doesn't allow null values in maps. So we still set non-null values here. |
| for (long partId : partitionMap_.keySet()) { |
| THdfsPartition part = new THdfsPartition(); |
| part.setId(partId); |
| hdfsTable.putToPartitions(partId, part); |
| } |
| hdfsTable.setHas_full_partitions(false); |
| hdfsTable.setHas_partition_names(false); |
| table.setHdfs_table(hdfsTable); |
| return table; |
| } |
| |
| /** |
| * Just likes super.toMinimalTCatalogObject() but also contains the minimal catalog |
| * objects of partitions in the returned result. |
| */ |
| @Override |
| public TCatalogObject toMinimalTCatalogObject() { |
| TCatalogObject catalogObject = super.toMinimalTCatalogObject(); |
| catalogObject.getTable().setTable_type(TTableType.HDFS_TABLE); |
| THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(), |
| nullPartitionKeyValue_, nullColumnValue_, |
| /*idToPartition=*/ new HashMap<>(), |
| /*prototypePartition=*/ new THdfsPartition()); |
| for (HdfsPartition part : partitionMap_.values()) { |
| hdfsTable.partitions.put(part.getId(), part.toMinimalTHdfsPartition()); |
| } |
| hdfsTable.setHas_full_partitions(false); |
| // The minimal catalog object of partitions contain the partition names. |
| hdfsTable.setHas_partition_names(true); |
| catalogObject.getTable().setHdfs_table(hdfsTable); |
| return catalogObject; |
| } |
| |
| /** |
| * Gets the max sent partition id in previous catalog topic updates. |
| */ |
| public long getMaxSentPartitionId() { return maxSentPartitionId_; } |
| |
| /** |
| * Updates the max sent partition id in catalog topic updates. |
| */ |
| public void setMaxSentPartitionId(long maxSentPartitionId) { |
| this.maxSentPartitionId_ = maxSentPartitionId; |
| } |
| |
| /** |
| * Resets the max sent partition id in catalog topic updates. Used when statestore |
| * resarts and requires a full update from the catalogd. |
| */ |
| public void resetMaxSentPartitionId() { |
| maxSentPartitionId_ = Catalog.INITIAL_CATALOG_VERSION - 1; |
| } |
| |
| /** |
| * Gets the deleted/replaced partition instances since last catalog topic update. |
| */ |
| public List<HdfsPartition> getDroppedPartitions() { |
| return ImmutableList.copyOf(droppedPartitions); |
| } |
| |
| /** |
| * Clears the deleted/replaced partition instance set. |
| */ |
| public void resetDroppedPartitions() { droppedPartitions.clear(); } |
| |
| /** |
| * Gets catalog objects of new partitions since last catalog update. They are partitions |
| * that coordinators are not aware of. |
| */ |
| public List<TCatalogObject> getNewPartitionsSinceLastUpdate() { |
| List<TCatalogObject> result = new ArrayList<>(); |
| int numSkippedParts = 0; |
| for (HdfsPartition partition: partitionMap_.values()) { |
| if (partition.getId() <= maxSentPartitionId_) { |
| numSkippedParts++; |
| continue; |
| } |
| TCatalogObject catalogPart = |
| new TCatalogObject(TCatalogObjectType.HDFS_PARTITION, getCatalogVersion()); |
| partition.setTCatalogObject(catalogPart); |
| result.add(catalogPart); |
| } |
| LOG.info("Skipped {} partitions of table {} in the incremental update", |
| numSkippedParts, getFullName()); |
| return result; |
| } |
| |
/**
 * Builds the response to a partial catalog object request for this table. It starts
 * from super.getPartialInfo(req). Depending on req.table_info_selector, it adds:
 * <ul>
 *   <li>per-partition names, HMS metadata, file descriptors and stats;</li>
 *   <li>the host index, when files are requested;</li>
 *   <li>the SQL constraints, when requested.</li>
 * </ul>
 * The table's is_marked_cached flag is always set.
 *
 * If no partition ids are given but some partition info is wanted, all partitions are
 * returned. If a requested partition id is unknown, a new response that only carries
 * CatalogLookupStatus.PARTITION_NOT_FOUND is returned instead.
 *
 * File descriptors are filtered with the request's valid write id list, if any. If that
 * fails with a CatalogException for a partition, a file-metadata cache miss is counted.
 * The partition and its TPartialPartitionInfo are then put into 'missingPartitionInfos';
 * that TPartialPartitionInfo is still added to the response.
 *
 * @param req the partial catalog object request
 * @param missingPartitionInfos non-null output map receiving the partitions whose cached
 *     file descriptors could not be used for this request
 * @return the response for this table, or a PARTITION_NOT_FOUND response
 * @throws CatalogException if one of the calls made here throws it
 */
| public TGetPartialCatalogObjectResponse getPartialInfo(
| TGetPartialCatalogObjectRequest req,
| Map<HdfsPartition, TPartialPartitionInfo> missingPartitionInfos)
| throws CatalogException {
| Preconditions.checkNotNull(missingPartitionInfos);
| TGetPartialCatalogObjectResponse resp = super.getPartialInfo(req);
| boolean wantPartitionInfo = req.table_info_selector.want_partition_files ||
| req.table_info_selector.want_partition_metadata ||
| req.table_info_selector.want_partition_names ||
| req.table_info_selector.want_partition_stats;
| 
| Collection<Long> partIds = req.table_info_selector.partition_ids;
| if (partIds == null && wantPartitionInfo) {
| // Caller specified at least one piece of partition info but didn't specify
| // any partition IDs. That means they want the info for all partitions.
| partIds = partitionMap_.keySet();
| }
| 
// Write id list used to filter file descriptors by ACID state; null when the request
// does not carry one.
| ValidWriteIdList reqWriteIdList = req.table_info_selector.valid_write_ids == null ?
| null : MetastoreShim.getValidWriteIdListFromThrift(getFullName(),
| req.table_info_selector.valid_write_ids);
| Counter misses = metrics_.getCounter(FILEMETADATA_CACHE_MISS_METRIC);
| Counter hits = metrics_.getCounter(FILEMETADATA_CACHE_HIT_METRIC);
| int numFilesFiltered = 0;
| if (partIds != null) {
| resp.table_info.partitions = Lists.newArrayListWithCapacity(partIds.size());
| for (long partId : partIds) {
| HdfsPartition part = partitionMap_.get(partId);
| if (part == null) {
| LOG.warn(String.format("Missing partition ID: %s, Table: %s", partId,
| getFullName()));
// Abort the whole request. Note that 'missingPartitionInfos' may already hold
// entries added for partitions processed earlier in this loop.
| return new TGetPartialCatalogObjectResponse().setLookup_status(
| CatalogLookupStatus.PARTITION_NOT_FOUND);
| }
| TPartialPartitionInfo partInfo = new TPartialPartitionInfo(partId);
| 
| if (req.table_info_selector.want_partition_names) {
| partInfo.setName(part.getPartitionName());
| }
| 
| if (req.table_info_selector.want_partition_metadata) {
| partInfo.hms_partition = part.toHmsPartition();
| partInfo.setHas_incremental_stats(part.hasIncrementalStats());
| }
| 
| if (req.table_info_selector.want_partition_files) {
| try {
// With insert file descriptors present, the files are sent as separate insert and
// delete lists and 'file_descriptors' stays empty. NOTE(review): presumably this is
// the full ACID layout; confirm.
| if (!part.getInsertFileDescriptors().isEmpty()) {
| partInfo.file_descriptors = new ArrayList<>();
| partInfo.insert_file_descriptors = new ArrayList<>();
| numFilesFiltered += addFilteredFds(part.getInsertFileDescriptors(),
| partInfo.insert_file_descriptors, reqWriteIdList);
| partInfo.delete_file_descriptors = new ArrayList<>();
| numFilesFiltered += addFilteredFds(part.getDeleteFileDescriptors(),
| partInfo.delete_file_descriptors, reqWriteIdList);
| } else {
| partInfo.file_descriptors = new ArrayList<>();
| numFilesFiltered += addFilteredFds(part.getFileDescriptors(),
| partInfo.file_descriptors, reqWriteIdList);
| partInfo.insert_file_descriptors = new ArrayList<>();
| partInfo.delete_file_descriptors = new ArrayList<>();
| }
| hits.inc();
| } catch (CatalogException ex) {
| if (LOG.isDebugEnabled()) {
| LOG.debug("Could not use cached file descriptors of partition {} of table"
| + " {} for writeIdList {}", part.getPartitionName(), getFullName(),
| reqWriteIdList, ex);
| }
// The cached file descriptors could not be used for 'reqWriteIdList'. Record
// the partition so the caller can deal with it; its partInfo is still added
// to the response below.
| misses.inc();
| missingPartitionInfos.put(part, partInfo);
| }
| }
| 
| if (req.table_info_selector.want_partition_stats) {
| partInfo.setPartition_stats(part.getPartitionStatsCompressed());
| }
| 
| partInfo.setIs_marked_cached(part.isMarkedCached());
| resp.table_info.partitions.add(partInfo);
| }
| }
| 
| if (reqWriteIdList != null) {
| LOG.debug("{} files filtered out of table {} for {}. Hit rate : {}",
| numFilesFiltered, getFullName(), reqWriteIdList, getFileMetadataCacheHitRate());
| }
| 
| if (req.table_info_selector.want_partition_files) {
| // TODO(todd) we are sending the whole host index even if we returned only
| // one file -- maybe not so efficient, but the alternative is to do a bunch
| // of cloning of file descriptors which might increase memory pressure.
| resp.table_info.setNetwork_addresses(hostIndex_.getList());
| }
| 
| if (req.table_info_selector.want_table_constraints) {
| TSqlConstraints sqlConstraints =
| new TSqlConstraints(sqlConstraints_.getPrimaryKeys(),
| sqlConstraints_.getForeignKeys());
| resp.table_info.setSql_constraints(sqlConstraints);
| }
| // Publish the isMarkedCached_ marker so coordinators don't need to validate
| // it again which requires additional HDFS RPCs.
| resp.table_info.setIs_marked_cached(isMarkedCached_);
| return resp;
| }
| |
| private int addFilteredFds(List<FileDescriptor> fds, List<THdfsFileDesc> thriftFds, |
| ValidWriteIdList writeIdList) throws CatalogException { |
| List<FileDescriptor> filteredFds = new ArrayList<>(fds); |
| int numFilesFiltered = AcidUtils.filterFdsForAcidState(filteredFds, writeIdList); |
| for (FileDescriptor fd: filteredFds) { |
| thriftFds.add(fd.toThrift()); |
| } |
| return numFilesFiltered; |
| } |
| |
| private double getFileMetadataCacheHitRate() { |
| long hits = metrics_.getCounter(FILEMETADATA_CACHE_HIT_METRIC).getCount(); |
| long misses = metrics_.getCounter(FILEMETADATA_CACHE_MISS_METRIC).getCount(); |
| return ((double) hits) / (double) (hits+misses); |
| } |
| |
| /**
| * Create a THdfsTable corresponding to this HdfsTable. If serializing the "FULL"
| * information, then all partitions and THdfsFileDescs of each partition should be
| * included. Otherwise, don't include any THdfsFileDescs, and include only those
| * partitions in the refPartitions set (the backend doesn't need metadata for
| * unreferenced partitions); a null refPartitions includes every partition. In
| * addition, metadata that is not used by the backend will be omitted.
| *
| * To prevent the catalog from hitting an OOM error while trying to
| * serialize large partition incremental stats, we estimate the stats size and filter
| * the incremental stats data from partition objects if the estimate exceeds
| * --inc_stats_size_limit_bytes. This function also collects storage related statistics
| * (e.g. number of blocks, files, etc) in order to compute an estimate of the metadata
| * size of this table.
| * NOTE(review): no such size-limit filtering is visible in this method itself;
| * confirm where --inc_stats_size_limit_bytes is enforced.
| *
| * Side effects: sets hasIncrementalStats_ when an included partition has incremental
| * stats. For FULL it also replaces fileMetadataStats_ and updates the estimated
| * metadata size and the number of files of this table.
| *
| * @param type the kind of serialization (e.g. FULL or DESCRIPTOR_ONLY)
| * @param refPartitions ids of the partitions to include; must be null for FULL
| * @return the Thrift representation of this table
| */
| protected THdfsTable getTHdfsTable(ThriftObjectType type, Set<Long> refPartitions) {
| if (type == ThriftObjectType.FULL) {
| // "full" implies all partitions should be included.
| Preconditions.checkArgument(refPartitions == null);
| }
| long memUsageEstimate = 0;
| int numPartitions =
| (refPartitions == null) ? partitionMap_.values().size() : refPartitions.size();
| memUsageEstimate += numPartitions * PER_PARTITION_MEM_USAGE_BYTES;
| FileMetadataStats stats = new FileMetadataStats();
| Map<Long, THdfsPartition> idToPartition = new HashMap<>();
| for (HdfsPartition partition: partitionMap_.values()) {
| long id = partition.getId();
| if (refPartitions == null || refPartitions.contains(id)) {
| THdfsPartition tHdfsPartition = FeCatalogUtils.fsPartitionToThrift(
| partition, type);
| if (partition.hasIncrementalStats()) {
| memUsageEstimate += getColumns().size() * STATS_SIZE_PER_COLUMN_BYTES;
| hasIncrementalStats_ = true;
| }
| if (type == ThriftObjectType.FULL) {
| Preconditions.checkState(tHdfsPartition.isSetNum_blocks() &&
| tHdfsPartition.isSetTotal_file_size_bytes());
| stats.numBlocks += tHdfsPartition.getNum_blocks();
| stats.numFiles +=
| tHdfsPartition.isSetFile_desc() ? tHdfsPartition.getFile_desc().size() : 0;
| stats.numFiles += tHdfsPartition.isSetInsert_file_desc() ?
| tHdfsPartition.getInsert_file_desc().size() : 0;
| stats.numFiles += tHdfsPartition.isSetDelete_file_desc() ?
| tHdfsPartition.getDelete_file_desc().size() : 0;
| stats.totalFileBytes += tHdfsPartition.getTotal_file_size_bytes();
| }
| idToPartition.put(id, tHdfsPartition);
| }
| }
| if (type == ThriftObjectType.FULL) fileMetadataStats_.set(stats);
| 
| THdfsPartition prototypePartition = FeCatalogUtils.fsPartitionToThrift(
| prototypePartition_, ThriftObjectType.DESCRIPTOR_ONLY);
| 
// Uses the table-level fileMetadataStats_, which was only refreshed above for FULL;
// for other types it holds whatever values it currently has.
| memUsageEstimate += fileMetadataStats_.numFiles * PER_FD_MEM_USAGE_BYTES +
| fileMetadataStats_.numBlocks * PER_BLOCK_MEM_USAGE_BYTES;
| if (type == ThriftObjectType.FULL) {
| // These metrics only make sense when we are collecting a FULL object.
| setEstimatedMetadataSize(memUsageEstimate);
| setNumFiles(fileMetadataStats_.numFiles);
| }
| THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
| nullPartitionKeyValue_, nullColumnValue_, idToPartition, prototypePartition);
| hdfsTable.setAvroSchema(avroSchema_);
| hdfsTable.setSql_constraints(sqlConstraints_.toThrift());
| if (type == ThriftObjectType.FULL) {
| hdfsTable.setHas_full_partitions(true);
| // Network addresses are used only by THdfsFileBlocks which are inside
| // THdfsFileDesc, so include network addresses only when including THdfsFileDesc.
| hdfsTable.setNetwork_addresses(hostIndex_.getList());
| }
| hdfsTable.setPartition_prefixes(partitionLocationCompressor_.getPrefixes());
| if (AcidUtils.isFullAcidTable(getMetaStoreTable().getParameters())) {
| hdfsTable.setIs_full_acid(true);
| }
| if (validWriteIds_ != null) {
| hdfsTable.setValid_write_ids(
| MetastoreShim.convertToTValidWriteIdList(validWriteIds_));
| }
| return hdfsTable;
| }
| |
| @Override // FeFsTable |
| public long getTotalHdfsBytes() { return fileMetadataStats_.totalFileBytes; } |
| |
| @Override // FeFsTable |
| public String getHdfsBaseDir() { return hdfsBaseDir_; } |
| |
| public Path getHdfsBaseDirPath() { |
| Preconditions.checkNotNull(hdfsBaseDir_, "HdfsTable base dir is null"); |
| return new Path(hdfsBaseDir_); |
| } |
| |
| @Override // FeFsTable |
| public boolean usesAvroSchemaOverride() { return avroSchema_ != null; } |
| |
| @Override // FeFsTable |
| public ListMap<TNetworkAddress> getHostIndex() { return hostIndex_; } |
| |
| @Override |
| public SqlConstraints getSqlConstraints() { |
| return sqlConstraints_; |
| } |
| |
| /** |
| * Returns the set of file formats that the partitions are stored in. |
| */ |
| @Override |
| public Set<HdfsFileFormat> getFileFormats() { |
| // In the case that we have no partitions added to the table yet, it's |
| // important to add the "prototype" partition as a fallback. |
| Iterable<HdfsPartition> partitionsToConsider = Iterables.concat( |
| partitionMap_.values(), Collections.singleton(prototypePartition_)); |
| return FeCatalogUtils.getFileFormats(partitionsToConsider); |
| } |
| |
| /** |
| * Returns the HDFS paths corresponding to HdfsTable partitions that don't exist in |
| * the Hive Metastore. An HDFS path is represented as a list of strings values, one per |
| * partition key column. |
| */ |
| public List<List<String>> getPathsWithoutPartitions() throws CatalogException { |
| Set<List<LiteralExpr>> existingPartitions = new HashSet<>(); |
| // Get the list of partition values of existing partitions in Hive Metastore. |
| for (HdfsPartition partition: partitionMap_.values()) { |
| existingPartitions.add(partition.getPartitionValues()); |
| } |
| |
| List<String> partitionKeys = new ArrayList<>(); |
| for (int i = 0; i < numClusteringCols_; ++i) { |
| partitionKeys.add(getColumns().get(i).getName()); |
| } |
| Path basePath = new Path(hdfsBaseDir_); |
| List<List<String>> partitionsNotInHms = new ArrayList<>(); |
| try { |
| getAllPartitionsNotInHms(basePath, partitionKeys, existingPartitions, |
| partitionsNotInHms); |
| } catch (Exception e) { |
| throw new CatalogException(String.format("Failed to recover partitions for %s " + |
| "with exception:%s.", getFullName(), e)); |
| } |
| return partitionsNotInHms; |
| } |
| |
| /** |
| * Returns all partitions which match the partition keys directory structure and pass |
| * type compatibility check. Also these partitions are not already part of the table. |
| */ |
| private void getAllPartitionsNotInHms(Path path, List<String> partitionKeys, |
| Set<List<LiteralExpr>> existingPartitions, |
| List<List<String>> partitionsNotInHms) throws IOException { |
| FileSystem fs = path.getFileSystem(CONF); |
| List<String> partitionValues = new ArrayList<>(); |
| List<LiteralExpr> partitionExprs = new ArrayList<>(); |
| getAllPartitionsNotInHms(path, partitionKeys, 0, fs, partitionValues, |
| partitionExprs, existingPartitions, partitionsNotInHms); |
| } |
| |
| /** |
| * Returns all partitions which match the partition keys directory structure and pass |
| * the type compatibility check. |
| * |
| * path e.g. c1=1/c2=2/c3=3 |
| * partitionKeys The ordered partition keys. e.g.("c1", "c2", "c3") |
| * depth. The start position in partitionKeys to match the path name. |
| * partitionValues The partition values used to create a partition. |
| * partitionExprs The list of LiteralExprs which is used to avoid duplicate partitions. |
| * E.g. Having /c1=0001 and /c1=01, we should make sure only one partition |
| * will be added. |
| * existingPartitions All partitions which exist in Hive Metastore or newly added. |
| * partitionsNotInHms Contains all the recovered partitions. |
| */ |
| private void getAllPartitionsNotInHms(Path path, List<String> partitionKeys, |
| int depth, FileSystem fs, List<String> partitionValues, |
| List<LiteralExpr> partitionExprs, Set<List<LiteralExpr>> existingPartitions, |
| List<List<String>> partitionsNotInHms) throws IOException { |
| if (depth == partitionKeys.size()) { |
| if (existingPartitions.contains(partitionExprs)) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace(String.format("Skip recovery of path '%s' because it already " |
| + "exists in metastore", path.toString())); |
| } |
| } else { |
| partitionsNotInHms.add(partitionValues); |
| existingPartitions.add(partitionExprs); |
| } |
| return; |
| } |
| |
| RemoteIterator <? extends FileStatus> statuses = fs.listStatusIterator(path); |
| |
| if (statuses == null) return; |
| while (statuses.hasNext()) { |
| FileStatus status = statuses.next(); |
| if (!status.isDirectory()) continue; |
| Pair<String, LiteralExpr> keyValues = |
| getTypeCompatibleValue(status.getPath(), partitionKeys.get(depth)); |
| if (keyValues == null) continue; |
| |
| List<String> currentPartitionValues = Lists.newArrayList(partitionValues); |
| List<LiteralExpr> currentPartitionExprs = Lists.newArrayList(partitionExprs); |
| currentPartitionValues.add(keyValues.first); |
| currentPartitionExprs.add(keyValues.second); |
| getAllPartitionsNotInHms(status.getPath(), partitionKeys, depth + 1, fs, |
| currentPartitionValues, currentPartitionExprs, |
| existingPartitions, partitionsNotInHms); |
| } |
| } |
| |
| /** |
| * Checks that the last component of 'path' is of the form "<partitionkey>=<v>" |
| * where 'v' is a type-compatible value from the domain of the 'partitionKey' column. |
| * If not, returns null, otherwise returns a Pair instance, the first element is the |
| * original value, the second element is the LiteralExpr created from the original |
| * value. |
| */ |
| private Pair<String, LiteralExpr> getTypeCompatibleValue(Path path, |
| String partitionKey) throws UnsupportedEncodingException { |
| String partName[] = path.getName().split("="); |
| if (partName.length != 2 || !partName[0].equals(partitionKey)) return null; |
| |
| // Check Type compatibility for Partition value. |
| Column column = getColumn(partName[0]); |
| Preconditions.checkNotNull(column); |
| Type type = column.getType(); |
| LiteralExpr expr = null; |
| // URL decode the partition value since it may contain encoded URL. |
| String value = URLDecoder.decode(partName[1], StandardCharsets.UTF_8.name()); |
| if (!value.equals(getNullPartitionKeyValue())) { |
| try { |
| expr = LiteralExpr.createFromUnescapedStr(value, type); |
| // Skip large value which exceeds the MAX VALUE of specified Type. |
| if (expr instanceof NumericLiteral) { |
| if (NumericLiteral.isOverflow(((NumericLiteral) expr).getValue(), type)) { |
| LOG.warn(String.format("Skip the overflow value (%s) for Type (%s).", |
| value, type.toSql())); |
| return null; |
| } |
| } |
| } catch (Exception ex) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace(String.format("Invalid partition value (%s) for Type (%s).", |
| value, type.toSql())); |
| } |
| return null; |
| } |
| } else { |
| expr = new NullLiteral(); |
| } |
| return new Pair<String, LiteralExpr>(value, expr); |
| } |
| |
| @Override // FeFsTable |
| public TResultSet getTableStats() { |
| return getTableStats(this); |
| } |
| |
| @Override |
| public FileSystemUtil.FsType getFsType() { |
| Preconditions.checkNotNull(getHdfsBaseDirPath().toUri().getScheme(), |
| "Cannot get scheme from path " + getHdfsBaseDirPath()); |
| return FileSystemUtil.FsType.getFsType(getHdfsBaseDirPath().toUri().getScheme()); |
| } |
| |
// TODO(todd): move to FeCatalogUtils. Upon moving to Java 8, could be
// a default method of FeFsTable.
/**
 * Builds the result set describing 'table''s partitions and their stats.
 *
 * Columns, in order: one STRING column per partition key, "#Rows", "Extrap #Rows"
 * (only when stats extrapolation is enabled for the table), "#Files", "Size",
 * "Bytes Cached", "Cache Replication", "Format", "Incremental stats", "Location".
 * Rows are emitted one per partition, sorted by HdfsPartition.KV_COMPARATOR. For
 * partitioned tables a final "Total" row is appended. The values added to each row
 * must stay in exactly the column order above.
 */
public static TResultSet getTableStats(FeFsTable table) {
  TResultSet result = new TResultSet();
  TResultSetMetadata resultSchema = new TResultSetMetadata();
  result.setSchema(resultSchema);

  for (int i = 0; i < table.getNumClusteringCols(); ++i) {
    // Add the partition-key values as strings for simplicity.
    Column partCol = table.getColumns().get(i);
    TColumn colDesc = new TColumn(partCol.getName(), Type.STRING.toThrift());
    resultSchema.addToColumns(colDesc);
  }

  boolean statsExtrap = Utils.isStatsExtrapolationEnabled(table);

  resultSchema.addToColumns(new TColumn("#Rows", Type.BIGINT.toThrift()));
  if (statsExtrap) {
    resultSchema.addToColumns(new TColumn("Extrap #Rows", Type.BIGINT.toThrift()));
  }
  resultSchema.addToColumns(new TColumn("#Files", Type.BIGINT.toThrift()));
  resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift()));
  resultSchema.addToColumns(new TColumn("Bytes Cached", Type.STRING.toThrift()));
  resultSchema.addToColumns(new TColumn("Cache Replication", Type.STRING.toThrift()));
  resultSchema.addToColumns(new TColumn("Format", Type.STRING.toThrift()));
  resultSchema.addToColumns(new TColumn("Incremental stats", Type.STRING.toThrift()));
  resultSchema.addToColumns(new TColumn("Location", Type.STRING.toThrift()));

  // Pretty print partitions and their stats.
  List<FeFsPartition> orderedPartitions = new ArrayList<>(
      FeCatalogUtils.loadAllPartitions(table));
  Collections.sort(orderedPartitions, HdfsPartition.KV_COMPARATOR);

  // Running totals for the summary row.
  long totalCachedBytes = 0L;
  long totalBytes = 0L;
  long totalNumFiles = 0L;
  for (FeFsPartition p: orderedPartitions) {
    int numFiles = p.getNumFileDescriptors();
    long size = p.getSize();
    totalNumFiles += numFiles;
    totalBytes += size;

    TResultRowBuilder rowBuilder = new TResultRowBuilder();

    // Add the partition-key values (as strings for simplicity).
    for (LiteralExpr expr: p.getPartitionValues()) {
      rowBuilder.add(expr.getStringValue());
    }

    // Add rows, extrapolated rows, files, bytes, cache stats, and file format.
    rowBuilder.add(p.getNumRows());
    // Compute and report the extrapolated row count because the set of files could
    // have changed since we last computed stats for this partition. We also follow
    // this policy during scan-cardinality estimation.
    if (statsExtrap) {
      rowBuilder.add(Utils.getExtrapolatedNumRows(table, size));
    }

    rowBuilder.add(numFiles).addBytes(size);
    if (!p.isMarkedCached()) {
      // Helps to differentiate partitions that have 0B cached versus partitions
      // that are not marked as cached. One value each for the "Bytes Cached" and
      // "Cache Replication" columns.
      rowBuilder.add("NOT CACHED");
      rowBuilder.add("NOT CACHED");
    } else {
      // Calculate the number of bytes that are cached: the length of every block
      // that has at least one cached replica.
      long cachedBytes = 0L;
      for (FileDescriptor fd: p.getFileDescriptors()) {
        int numBlocks = fd.getNumFileBlocks();
        for (int i = 0; i < numBlocks; ++i) {
          FbFileBlock block = fd.getFbFileBlock(i);
          if (FileBlock.hasCachedReplica(block)) {
            cachedBytes += FileBlock.getLength(block);
          }
        }
      }
      totalCachedBytes += cachedBytes;
      rowBuilder.addBytes(cachedBytes);

      // Extract cache replication factor from the parameters of the table
      // if the table is not partitioned or directly from the partition.
      Short rep = HdfsCachingUtil.getCachedCacheReplication(
          table.getNumClusteringCols() == 0 ?
              p.getTable().getMetaStoreTable().getParameters() :
              p.getParameters());
      rowBuilder.add(rep.toString());
    }
    rowBuilder.add(p.getFileFormat().toString());
    rowBuilder.add(String.valueOf(p.hasIncrementalStats()));
    rowBuilder.add(p.getLocation());
    result.addToRows(rowBuilder.get());
  }

  // For partitioned tables add a summary row at the bottom.
  if (table.getNumClusteringCols() > 0) {
    TResultRowBuilder rowBuilder = new TResultRowBuilder();
    // "Total" occupies the first partition-key column; the remaining key columns
    // are left blank.
    int numEmptyCells = table.getNumClusteringCols() - 1;
    rowBuilder.add("Total");
    for (int i = 0; i < numEmptyCells; ++i) {
      rowBuilder.add("");
    }

    // Total rows, extrapolated rows, files, bytes, cache stats.
    // Leave format empty.
    rowBuilder.add(table.getNumRows());
    // Compute and report the extrapolated row count because the set of files could
    // have changed since we last computed stats for this partition. We also follow
    // this policy during scan-cardinality estimation.
    // Note: this uses table.getTotalHdfsBytes(), not the 'totalBytes' summed above.
    if (statsExtrap) {
      rowBuilder.add(Utils.getExtrapolatedNumRows(
          table, table.getTotalHdfsBytes()));
    }
    // The four trailing empty strings fill "Cache Replication", "Format",
    // "Incremental stats" and "Location".
    rowBuilder.add(totalNumFiles)
        .addBytes(totalBytes)
        .addBytes(totalCachedBytes).add("").add("").add("").add("");
    result.addToRows(rowBuilder.get());
  }
  return result;
}
| |
| /** |
| * Constructs a partition name from a list of TPartitionKeyValue objects. |
| */ |
| public static String constructPartitionName(List<TPartitionKeyValue> partitionSpec) { |
| List<String> partitionCols = new ArrayList<>(); |
| List<String> partitionVals = new ArrayList<>(); |
| for (TPartitionKeyValue kv: partitionSpec) { |
| partitionCols.add(kv.getName()); |
| partitionVals.add(kv.getValue()); |
| } |
| return org.apache.hadoop.hive.common.FileUtils.makePartName(partitionCols, |
| partitionVals); |
| } |
| |
/**
 * Reloads the metadata of partition 'oldPartition' by removing it from the table and
 * reconstructing it from the HMS partition object 'hmsPartition'. If 'oldPartition'
 * is null then nothing is removed and the partition constructed from 'hmsPartition'
 * is simply added.
 *
 * When 'oldPartition' is non-null, its file descriptors are copied into the new
 * partition's builder before file metadata is loaded with isRefresh=true
 * (presumably so unchanged file metadata can be reused; confirm in
 * loadFileMetadataForPartitions).
 *
 * @param client the client to access HMS
 * @param oldPartition the partition being replaced, or null
 * @param hmsPartition the HMS partition to build the new partition from; must have
 *     the same partition key values as 'oldPartition' when that is non-null
 * @throws CatalogException if building the partition or loading its file metadata
 *     fails
 */
public void reloadPartition(IMetaStoreClient client, HdfsPartition oldPartition,
    Partition hmsPartition) throws CatalogException {
  HdfsPartition.Builder partBuilder = createPartitionBuilder(
      hmsPartition.getSd(), hmsPartition, new FsPermissionCache());
  Preconditions.checkArgument(oldPartition == null
      || HdfsPartition.comparePartitionKeyValues(
      oldPartition.getPartitionValues(), partBuilder.getPartitionValues()) == 0);
  if (oldPartition != null) {
    partBuilder.setFileDescriptors(oldPartition);
  }
  loadFileMetadataForPartitions(client, ImmutableList.of(partBuilder),
      /*isRefresh=*/true);
  // Drop before add; dropPartition receives null when there is no old partition.
  dropPartition(oldPartition, false);
  addPartition(partBuilder.build());
}
| |
/**
 * Registers table metrics: gauges for partition count, file count, block count,
 * total file bytes, estimated metadata size and whether incremental stats are
 * present, plus the catalog-update timer and file-metadata cache hit/miss counters.
 * Each gauge reads the current field value every time it is sampled.
 */
@Override
public void initMetrics() {
  super.initMetrics();
  metrics_.addGauge(NUM_PARTITIONS_METRIC, new Gauge<Integer>() {
    @Override
    public Integer getValue() { return partitionMap_.values().size(); }
  });
  metrics_.addGauge(NUM_FILES_METRIC, new Gauge<Long>() {
    @Override
    public Long getValue() { return fileMetadataStats_.numFiles; }
  });
  metrics_.addGauge(NUM_BLOCKS_METRIC, new Gauge<Long>() {
    @Override
    public Long getValue() { return fileMetadataStats_.numBlocks; }
  });
  metrics_.addGauge(TOTAL_FILE_BYTES_METRIC, new Gauge<Long>() {
    @Override
    public Long getValue() { return fileMetadataStats_.totalFileBytes; }
  });
  metrics_.addGauge(MEMORY_ESTIMATE_METRIC, new Gauge<Long>() {
    @Override
    public Long getValue() { return getEstimatedMetadataSize(); }
  });
  metrics_.addGauge(HAS_INCREMENTAL_STATS_METRIC, new Gauge<Boolean>() {
    @Override
    public Boolean getValue() { return hasIncrementalStats_; }
  });
  metrics_.addTimer(CATALOG_UPDATE_DURATION_METRIC);
  metrics_.addCounter(FILEMETADATA_CACHE_HIT_METRIC);
  metrics_.addCounter(FILEMETADATA_CACHE_MISS_METRIC);
}
| |
| /** |
| * Creates a temporary HdfsTable object populated with the specified properties. |
| * This is used for CTAS statements. |
| */ |
| public static HdfsTable createCtasTarget(Db db, |
| org.apache.hadoop.hive.metastore.api.Table msTbl) throws CatalogException { |
| HdfsTable tmpTable = new HdfsTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner()); |
| HiveConf hiveConf = new HiveConf(HdfsTable.class); |
| // set nullPartitionKeyValue from the hive conf. |
| tmpTable.nullPartitionKeyValue_ = hiveConf.get( |
| MetaStoreUtil.NULL_PARTITION_KEY_VALUE_CONF_KEY, |
| MetaStoreUtil.DEFAULT_NULL_PARTITION_KEY_VALUE); |
| tmpTable.loadSchema(msTbl); |
| tmpTable.initializePartitionMetadata(msTbl); |
| tmpTable.setTableStats(msTbl); |
| return tmpTable; |
| } |
| |
| /** |
| * Returns true if the table is partitioned, false otherwise. |
| */ |
| public boolean isPartitioned() { |
| return getMetaStoreTable().getPartitionKeysSize() > 0; |
| } |
| |
| /** |
| * Get valid write ids for the acid table. |
| * @param client the client to access HMS |
| * @return the list of valid write IDs for the table |
| */ |
| protected ValidWriteIdList fetchValidWriteIds(IMetaStoreClient client) |
| throws TableLoadingException { |
| String tblFullName = getFullName(); |
| if (LOG.isTraceEnabled()) LOG.trace("Get valid writeIds for table: " + tblFullName); |
| ValidWriteIdList validWriteIds = null; |
| try { |
| validWriteIds = MetastoreShim.fetchValidWriteIds(client, tblFullName); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Valid writeIds: " + validWriteIds.writeToString()); |
| } |
| return validWriteIds; |
| } catch (Exception e) { |
| throw new TableLoadingException(String.format("Error loading ValidWriteIds for " + |
| "table '%s'", getName()), e); |
| } |
| } |
| |
| /** |
| * Set ValistWriteIdList with stored writeId |
| * @param client the client to access HMS |
| */ |
| protected boolean loadValidWriteIdList(IMetaStoreClient client) |
| throws TableLoadingException { |
| Stopwatch sw = Stopwatch.createStarted(); |
| Preconditions.checkState(msTable_ != null && msTable_.getParameters() != null); |
| boolean prevWriteIdChanged = false; |
| if (MetastoreShim.getMajorVersion() > 2 && |
| AcidUtils.isTransactionalTable(msTable_.getParameters())) { |
| ValidWriteIdList writeIdList = fetchValidWriteIds(client); |
| prevWriteIdChanged = writeIdList.toString().equals(validWriteIds_); |
| validWriteIds_ = writeIdList; |
| } else { |
| validWriteIds_ = null; |
| } |
| LOG.debug("Load Valid Write Id List Done. Time taken: " + |
| PrintUtils.printTimeNs(sw.elapsed(TimeUnit.NANOSECONDS))); |
| return prevWriteIdChanged; |
| } |
| |
| @Override |
| public ValidWriteIdList getValidWriteIds() { |
| return validWriteIds_; |
| } |
| } |