blob: c2417f6a7ee5420f0fbcf57b1bff831968110b5d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.impala.analysis.ColumnDef;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.LiteralExpr;
import org.apache.impala.analysis.NullLiteral;
import org.apache.impala.analysis.NumericLiteral;
import org.apache.impala.analysis.PartitionKeyValue;
import org.apache.impala.catalog.HdfsPartition.FileBlock;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.Pair;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.common.Reference;
import org.apache.impala.fb.FbFileBlock;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.ImpalaInternalServiceConstants;
import org.apache.impala.thrift.TAccessLevel;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.THdfsPartition;
import org.apache.impala.thrift.THdfsTable;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TPartitionKeyValue;
import org.apache.impala.thrift.TResultRow;
import org.apache.impala.thrift.TResultSet;
import org.apache.impala.thrift.TResultSetMetadata;
import org.apache.impala.thrift.TTable;
import org.apache.impala.thrift.TTableDescriptor;
import org.apache.impala.thrift.TTableType;
import org.apache.impala.util.AvroSchemaConverter;
import org.apache.impala.util.AvroSchemaParser;
import org.apache.impala.util.AvroSchemaUtils;
import org.apache.impala.util.FsPermissionChecker;
import org.apache.impala.util.HdfsCachingUtil;
import org.apache.impala.util.ListMap;
import org.apache.impala.util.MetaStoreUtil;
import org.apache.impala.util.TAccessLevelUtil;
import org.apache.impala.util.TResultRowBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
 * Internal representation of table-related metadata of a file-resident table on a
 * Hadoop filesystem. The table data can be accessed through libHDFS (which is more of
 * an abstraction over Hadoop's FileSystem class rather than DFS specifically). A
 * partitioned table can even span multiple filesystems.
 *
 * This class is not thread-safe. Clients of this class need to protect against
 * concurrent updates using external locking (see CatalogOpExecutor class).
 *
 * Owned by Catalog instance.
 * The partition keys constitute the clustering columns.
 */
public class HdfsTable extends Table {
// hive's default value for table property 'serialization.null.format'
private static final String DEFAULT_NULL_COLUMN_VALUE = "\\N";
// Name of default partition for unpartitioned tables
private static final String DEFAULT_PARTITION_NAME = "";
// Number of times to retry fetching the partitions from the HMS should an error occur.
private final static int NUM_PARTITION_FETCH_RETRIES = 5;
// Maximum number of errors logged when loading partitioned tables.
private static final int MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG = 100;
// Table property key for skip.header.line.count
public static final String TBL_PROP_SKIP_HEADER_LINE_COUNT = "skip.header.line.count";
// string to indicate NULL. set in load() from table properties
private String nullColumnValue_;
// hive uses this string for NULL partition keys. Set in load().
private String nullPartitionKeyValue_;
// Avro schema of this table if this is an Avro table, otherwise null. Set in load().
private String avroSchema_ = null;
// Set to true if any of the partitions have Avro data.
private boolean hasAvroData_ = false;
// True if this table's metadata is marked as cached. Does not necessarily mean the
// data is cached or that all/any partitions are cached.
private boolean isMarkedCached_ = false;
// Array of sorted maps storing the association between partition values and
// partition ids. There is one sorted map per partition key. It is only populated if
// this table object is stored in ImpaladCatalog.
private final ArrayList<TreeMap<LiteralExpr, HashSet<Long>>> partitionValuesMap_ =
// Array of partition id sets that correspond to partitions with null values
// in the partition keys; one set per partition key. It is not populated if the table is
// stored in the catalog server.
private final ArrayList<HashSet<Long>> nullPartitionIds_ = Lists.newArrayList();
// Map of partition ids to HdfsPartitions.
private final HashMap<Long, HdfsPartition> partitionMap_ = Maps.newHashMap();
// Map of partition name to HdfsPartition object. Used for speeding up
// table metadata loading.
private final HashMap<String, HdfsPartition> nameToPartitionMap_ = Maps.newHashMap();
// Store all the partition ids of an HdfsTable.
private final HashSet<Long> partitionIds_ = Sets.newHashSet();
// Estimate (in bytes) of the incremental stats size per column per partition
public static final long STATS_SIZE_PER_COLUMN_BYTES = 400;
// Bi-directional map between an integer index and a unique datanode
// TNetworkAddresses, each of which contains blocks of 1 or more
// files in this table. The network addresses are stored using IP
// address as the host name. Each FileBlock specifies a list of
// indices within this hostIndex_ to specify which nodes contain
// replicas of the block.
private final ListMap<TNetworkAddress> hostIndex_ = new ListMap<TNetworkAddress>();
private HdfsPartitionLocationCompressor partitionLocationCompressor_;
// Total number of Hdfs files in this table. Accounted only in the Impalad catalog
// cache. Set to -1 on Catalogd.
private long numHdfsFiles_;
// Sum of sizes of all Hdfs files in this table. Accounted only in the Impalad
// catalog cache. Set to -1 on Catalogd.
private long totalHdfsBytes_;
// True iff the table's partitions are located on more than one filesystem.
private boolean multipleFileSystems_ = false;
// Base Hdfs directory where files of this table are stored.
// For unpartitioned tables it is simply the path where all files live.
// For partitioned tables it is the root directory
// under which partition dirs are placed.
protected String hdfsBaseDir_;
// List of FieldSchemas that correspond to the non-partition columns. Used when
// describing this table and its partitions to the HMS (e.g. as part of an alter table
// operation), when only non-partition columns are required.
private final List<FieldSchema> nonPartFieldSchemas_ = Lists.newArrayList();
// Flag to check if the table schema has been loaded. Used as a precondition
// for setAvroSchema().
private boolean isSchemaLoaded_ = false;
private final static Logger LOG = LoggerFactory.getLogger(HdfsTable.class);
// Caching this configuration object makes calls to getFileSystem much quicker
// (saves ~50ms on a standard plan)
// TODO(henry): confirm that this is thread safe - cursory inspection of the class
// and its usage in getFileSystem suggests it should be.
private static final Configuration CONF = new Configuration();
private static final int MAX_HDFS_PARTITIONS_PARALLEL_LOAD =
private static final int MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD =
// File/Block metadata loading stats for a single HDFS path.
private class FileMetadataLoadStats {
// Path corresponding to this metadata load request.
private final Path hdfsPath;
// Number of files for which the metadata was loaded.
public int loadedFiles = 0;
// Number of hidden files excluded from file metadata loading. More details at
// isValidDataFile().
public int hiddenFiles = 0;
// Number of files skipped from file metadata loading because the files have not
// changed since the last load. More details at hasFileChanged().
public int skippedFiles = 0;
// Number of unknown disk IDs encountered while loading block
// metadata for this path.
public long unknownDiskIds = 0;
public FileMetadataLoadStats(Path path) { hdfsPath = path; }
public String debugString() {
return String.format("Path: %s: Loaded files: %s Hidden files: %s " +
"Skipped files: %s Unknown diskIDs: %s", hdfsPath, loadedFiles, hiddenFiles,
skippedFiles, unknownDiskIds);
// A callable implementation of file metadata loading request for a given
// HDFS path.
public class FileMetadataLoadRequest
implements Callable<FileMetadataLoadStats> {
private final Path hdfsPath_;
// All the partitions mapped to the above path
private final List<HdfsPartition> partitionList_;
// If set to true, reloads the file metadata only when the files in this path
// have changed since last load (more details in hasFileChanged()).
private final boolean reuseFileMd_;
public FileMetadataLoadRequest(Path path, List<HdfsPartition> partitions,
boolean reuseFileMd) {
hdfsPath_ = path;
partitionList_ = partitions;
reuseFileMd_ = reuseFileMd;
public FileMetadataLoadStats call() throws IOException {
FileMetadataLoadStats loadingStats =
reuseFileMd_ ? refreshFileMetadata(hdfsPath_, partitionList_) :
resetAndLoadFileMetadata(hdfsPath_, partitionList_);
return loadingStats;
public String debugString() {
String loadType = reuseFileMd_? "Refreshed" : "Loaded";
return String.format("%s file metadata for path: %s", loadType,
/**
 * Creates the table from its metastore representation and sets up the partition
 * location compressor for the table's number of clustering columns.
 */
public HdfsTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
    Db db, String name, String owner) {
  super(msTbl, db, name, owner);
  partitionLocationCompressor_ =
      new HdfsPartitionLocationCompressor(numClusteringCols_);
}
* Returns true if the table resides at a location which supports caching (e.g. HDFS).
public boolean isLocationCacheable() {
return FileSystemUtil.isPathCacheable(new Path(getLocation()));
* Returns true if the table and all its partitions reside at locations which
* support caching (e.g. HDFS).
public boolean isCacheable() {
if (!isLocationCacheable()) return false;
if (!isMarkedCached() && numClusteringCols_ > 0) {
for (HdfsPartition partition: getPartitions()) {
if (partition.getId() == ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID) {
if (!partition.isCacheable()) {
return false;
return true;
* Updates numHdfsFiles_ and totalHdfsBytes_ based on the partition information.
* This is used only for the frontend tests that do not spawn a separate Catalog
* instance.
public void computeHdfsStatsForTesting() {
Preconditions.checkState(numHdfsFiles_ == -1 && totalHdfsBytes_ == -1);
numHdfsFiles_ = 0;
totalHdfsBytes_ = 0;
for (HdfsPartition partition: partitionMap_.values()) {
numHdfsFiles_ += partition.getNumFileDescriptors();
totalHdfsBytes_ += partition.getSize();
* Drops and re-loads the file metadata of all the partitions in 'partitions' that
* map to the path 'partDir'. 'partDir' may belong to any file system that
* implements the hadoop's FileSystem interface (like HDFS, S3, ADLS etc.). It involves
* the following steps:
* - Clear the current file metadata of the partitions.
* - Call FileSystem.listFiles() on 'partDir' to fetch the FileStatus and BlockLocations
* for each file under it.
* - For every valid data file, enumerate all its blocks and their corresponding hosts
* and disk IDs if the underlying file system supports the block locations API
* (for ex: HDFS). For other file systems (like S3) we synthesize the file metadata
* manually by splitting the file ranges into fixed size blocks.
* For filesystems that don't support BlockLocation API, synthesize file blocks
* by manually splitting the file range into fixed-size blocks. That way, scan
* ranges can be derived from file blocks as usual. All synthesized blocks are given
* an invalid network address so that the scheduler will treat them as remote.
private FileMetadataLoadStats resetAndLoadFileMetadata(
Path partDir, List<HdfsPartition> partitions) throws IOException {
FileMetadataLoadStats loadStats = new FileMetadataLoadStats(partDir);
// No need to load blocks for empty partitions list.
if (partitions == null || partitions.isEmpty()) return loadStats;
if (LOG.isTraceEnabled()) {
LOG.trace("Loading block md for " + getFullName() + " path: " + partDir.toString());
FileSystem fs = partDir.getFileSystem(CONF);
boolean synthesizeFileMd = !FileSystemUtil.supportsStorageIds(fs);
RemoteIterator<LocatedFileStatus> fileStatusIter =
FileSystemUtil.listFiles(fs, partDir, false);
if (fileStatusIter == null) return loadStats;
Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
List<FileDescriptor> newFileDescs = Lists.newArrayList();
while (fileStatusIter.hasNext()) {
LocatedFileStatus fileStatus =;
if (!FileSystemUtil.isValidDataFile(fileStatus)) {
FileDescriptor fd;
// Block locations are manually synthesized if the underlying fs does not support
// the block location API.
if (synthesizeFileMd) {
fd = FileDescriptor.createWithSynthesizedBlockMd(fileStatus,
partitions.get(0).getFileFormat(), hostIndex_);
} else {
fd = FileDescriptor.create(fileStatus,
fileStatus.getBlockLocations(), fs, hostIndex_, numUnknownDiskIds);
for (HdfsPartition partition: partitions) partition.setFileDescriptors(newFileDescs);
loadStats.unknownDiskIds += numUnknownDiskIds.getRef();
if (LOG.isTraceEnabled()) {
LOG.trace("Loaded file metadata for " + getFullName() + " " +
return loadStats;
* Refreshes file metadata information for 'path'. This method is optimized for
* the case where the files in the partition have not changed dramatically. It first
* uses a listStatus() call on the partition directory to detect the modified files
* (look at hasFileChanged()) and fetches their block locations using the
* getFileBlockLocations() method. Our benchmarks suggest that the listStatus() call
* is much faster then the listFiles() (up to ~40x faster in some cases).
private FileMetadataLoadStats refreshFileMetadata(
Path partDir, List<HdfsPartition> partitions) throws IOException {
FileMetadataLoadStats loadStats = new FileMetadataLoadStats(partDir);
// No need to load blocks for empty partitions list.
if (partitions == null || partitions.isEmpty()) return loadStats;
if (LOG.isTraceEnabled()) {
LOG.trace("Refreshing block md for " + getFullName() + " path: " +
// Index the partition file descriptors by their file names for O(1) look ups.
// We just pick the first partition to generate the fileDescByName lookup table
// since all the partitions map to the same partDir.
ImmutableMap<String, FileDescriptor> fileDescsByName = Maps.uniqueIndex(
partitions.get(0).getFileDescriptors(), new Function<FileDescriptor, String>() {
public String apply(FileDescriptor desc) { return desc.getFileName(); }
FileSystem fs = partDir.getFileSystem(CONF);
FileStatus[] fileStatuses = FileSystemUtil.listStatus(fs, partDir);
if (fileStatuses == null) return loadStats;
boolean synthesizeFileMd = !FileSystemUtil.supportsStorageIds(fs);
Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
List<FileDescriptor> newFileDescs = Lists.newArrayList();
HdfsFileFormat fileFormat = partitions.get(0).getFileFormat();
// If there is a cached partition mapped to this path, we recompute the block
// locations even if the underlying files have not changed (hasFileChanged()).
// This is done to keep the cached block metadata up to date.
boolean isPartitionMarkedCached = false;
for (HdfsPartition partition: partitions) {
if (partition.isMarkedCached()) {
isPartitionMarkedCached = true;
for (FileStatus fileStatus: fileStatuses) {
if (!FileSystemUtil.isValidDataFile(fileStatus)) {
String fileName = fileStatus.getPath().getName().toString();
FileDescriptor fd = fileDescsByName.get(fileName);
if (isPartitionMarkedCached || hasFileChanged(fd, fileStatus)) {
if (synthesizeFileMd) {
fd = FileDescriptor.createWithSynthesizedBlockMd(fileStatus,
fileFormat, hostIndex_);
} else {
BlockLocation[] locations =
fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
fd = FileDescriptor.create(fileStatus, locations, fs, hostIndex_,
} else {
loadStats.unknownDiskIds += numUnknownDiskIds.getRef();
for (HdfsPartition partition: partitions) partition.setFileDescriptors(newFileDescs);
if (LOG.isTraceEnabled()) {
LOG.trace("Refreshed file metadata for " + getFullName() + " "
+ loadStats.debugString());
return loadStats;
* Compares the modification time and file size between the FileDescriptor and the
* FileStatus to determine if the file has changed. Returns true if the file has changed
* and false otherwise.
private static boolean hasFileChanged(FileDescriptor fd, FileStatus status) {
return (fd == null) || (fd.getFileLength() != status.getLen()) ||
(fd.getModificationTime() != status.getModificationTime());
* Helper method to reload the file metadata of a single partition.
private void refreshPartitionFileMetadata(HdfsPartition partition)
throws CatalogException {
try {
Path partDir = partition.getLocationPath();
FileMetadataLoadStats stats = refreshFileMetadata(partDir,
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Refreshed file metadata for %s %s",
getFullName(), stats.debugString()));
} catch (IOException e) {
throw new CatalogException("Encountered invalid partition path", e);
public TCatalogObjectType getCatalogObjectType() {
return TCatalogObjectType.TABLE;
public boolean isMarkedCached() { return isMarkedCached_; }
public Collection<HdfsPartition> getPartitions() { return partitionMap_.values(); }
public Map<Long, HdfsPartition> getPartitionMap() { return partitionMap_; }
public Set<Long> getNullPartitionIds(int i) { return nullPartitionIds_.get(i); }
public HdfsPartitionLocationCompressor getPartitionLocationCompressor() {
return partitionLocationCompressor_;
public Set<Long> getPartitionIds() { return partitionIds_; }
public TreeMap<LiteralExpr, HashSet<Long>> getPartitionValueMap(int i) {
return partitionValuesMap_.get(i);
* Returns the value Hive is configured to use for NULL partition key values.
* Set during load.
public String getNullPartitionKeyValue() { return nullPartitionKeyValue_; }
* Returns the storage location (HDFS path) of this table.
public String getLocation() {
return super.getMetaStoreTable().getSd().getLocation();
List<FieldSchema> getNonPartitionFieldSchemas() { return nonPartFieldSchemas_; }
// True if Impala has HDFS write permissions on the hdfsBaseDir (for an unpartitioned
// table) or if Impala has write permissions on all partition directories (for
// a partitioned table).
public boolean hasWriteAccess() {
return TAccessLevelUtil.impliesWriteAccess(accessLevel_);
* Returns the first location (HDFS path) that Impala does not have WRITE access
* to, or an null if none is found. For an unpartitioned table, this just
* checks the hdfsBaseDir. For a partitioned table it checks all partition directories.
public String getFirstLocationWithoutWriteAccess() {
if (getMetaStoreTable() == null) return null;
if (getMetaStoreTable().getPartitionKeysSize() == 0) {
if (!TAccessLevelUtil.impliesWriteAccess(accessLevel_)) {
return hdfsBaseDir_;
} else {
for (HdfsPartition partition: partitionMap_.values()) {
if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
return partition.getLocation();
return null;
* Gets the HdfsPartition matching the given partition spec. Returns null if no match
* was found.
public HdfsPartition getPartition(
List<PartitionKeyValue> partitionSpec) {
List<TPartitionKeyValue> partitionKeyValues = Lists.newArrayList();
for (PartitionKeyValue kv: partitionSpec) {
String value = PartitionKeyValue.getPartitionKeyValueString(
kv.getLiteralValue(), getNullPartitionKeyValue());
partitionKeyValues.add(new TPartitionKeyValue(kv.getColName(), value));
return getPartitionFromThriftPartitionSpec(partitionKeyValues);
* Gets the HdfsPartition matching the Thrift version of the partition spec.
* Returns null if no match was found.
public HdfsPartition getPartitionFromThriftPartitionSpec(
List<TPartitionKeyValue> partitionSpec) {
// First, build a list of the partition values to search for in the same order they
// are defined in the table.
List<String> targetValues = Lists.newArrayList();
Set<String> keys = Sets.newHashSet();
for (FieldSchema fs: getMetaStoreTable().getPartitionKeys()) {
for (TPartitionKeyValue kv: partitionSpec) {
if (fs.getName().toLowerCase().equals(kv.getName().toLowerCase())) {
// Same key was specified twice
if (!keys.add(kv.getName().toLowerCase())) {
return null;
// Make sure the number of values match up and that some values were found.
if (targetValues.size() == 0 ||
(targetValues.size() != getMetaStoreTable().getPartitionKeysSize())) {
return null;
// Search through all the partitions and check if their partition key values
// match the values being searched for.
for (HdfsPartition partition: partitionMap_.values()) {
if (partition.isDefaultPartition()) continue;
List<LiteralExpr> partitionValues = partition.getPartitionValues();
Preconditions.checkState(partitionValues.size() == targetValues.size());
boolean matchFound = true;
for (int i = 0; i < targetValues.size(); ++i) {
String value;
if (partitionValues.get(i) instanceof NullLiteral) {
value = getNullPartitionKeyValue();
} else {
value = partitionValues.get(i).getStringValue();
// See IMPALA-252: we deliberately map empty strings on to
// NULL when they're in partition columns. This is for
// backwards compatibility with Hive, and is clearly broken.
if (value.isEmpty()) value = getNullPartitionKeyValue();
if (!targetValues.get(i).equals(value)) {
matchFound = false;
if (matchFound) {
return partition;
return null;
* Gets hdfs partitions by the given partition set.
public List<HdfsPartition> getPartitionsFromPartitionSet(
List<List<TPartitionKeyValue>> partitionSet) {
List<HdfsPartition> partitions = Lists.newArrayList();
for (List<TPartitionKeyValue> kv : partitionSet) {
HdfsPartition partition =
if (partition != null) partitions.add(partition);
return partitions;
* Create columns corresponding to fieldSchemas. Throws a TableLoadingException if the
* metadata is incompatible with what we support.
private void addColumnsFromFieldSchemas(List<FieldSchema> fieldSchemas)
throws TableLoadingException {
int pos = colsByPos_.size();
for (FieldSchema s: fieldSchemas) {
Type type = parseColumnType(s);
// Check if we support partitioning on columns of such a type.
if (pos < numClusteringCols_ && !type.supportsTablePartitioning()) {
throw new TableLoadingException(
String.format("Failed to load metadata for table '%s' because of " +
"unsupported partition-column type '%s' in partition column '%s'",
getFullName(), type.toString(), s.getName()));
Column col = new Column(s.getName(), type, s.getComment(), pos);
/**
 * Clear the partitions of an HdfsTable and the associated metadata.
 *
 * NOTE(review): this copy of the method looks truncated. No statement that clears
 * the partition collections is visible, and the closing braces are missing -
 * confirm against the upstream file before relying on this block.
 */
private void resetPartitions() {
if (isStoredInImpaladCatalogCache()) {
// Initialize partitionValuesMap_ and nullPartitionIds_. Also reset column stats.
// NOTE(review): only the partitionValuesMap_ initialization is visible below; the
// nullPartitionIds_ and column-stats statements appear to be missing - TODO confirm.
for (int i = 0; i < numClusteringCols_; ++i) {
partitionValuesMap_.add(Maps.<LiteralExpr, HashSet<Long>>newTreeMap());
// Zero the table-wide file count and byte total.
numHdfsFiles_ = 0;
totalHdfsBytes_ = 0;
/**
 * Resets any partition metadata, creates the default partition and sets the base
 * table directory path as well as the caching info from the HMS table.
 *
 * NOTE(review): the statements that reset the partition metadata and create the
 * default partition are not visible in this copy of the method, and the closing
 * brace is missing - confirm against the upstream file.
 */
private void initializePartitionMetadata(
org.apache.hadoop.hive.metastore.api.Table msTbl) throws CatalogException {
// The table's base directory is taken from the HMS storage descriptor.
hdfsBaseDir_ = msTbl.getSd().getLocation();
// INSERT statements need to refer to this if they try to write to new partitions
// Scans don't refer to this because by definition all partitions they refer to
// exist.
// We silently ignore cache directives that no longer exist in HDFS, and remove
// non-existing cache directives from the parameters.
isMarkedCached_ = HdfsCachingUtil.validateCacheParams(msTbl.getParameters());
/**
 * Create HdfsPartition objects corresponding to 'msPartitions' and add them to this
 * table's partition list. Any partition metadata will be reset and loaded from
 * scratch. For each partition created, we load the block metadata for each data file
 * under it.
 *
 * If there are no partitions in the Hive metadata, a single partition is added with no
 * partition keys.
 *
 * NOTE(review): this copy of the method looks truncated. Several statements are
 * not visible (e.g. the reset of the partition metadata, adding each partition to
 * the table, the body of the getParameters() check, the else-branch that appends to
 * an existing path entry) and the closing braces are missing - confirm against the
 * upstream file.
 */
private void loadAllPartitions(
List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions,
org.apache.hadoop.hive.metastore.api.Table msTbl) throws IOException,
CatalogException {
// Map of partition paths to their corresponding HdfsPartition objects. Populated
// using createPartition() calls. A single partition path can correspond to multiple
// partitions.
HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
if (msTbl.getPartitionKeysSize() == 0) {
Preconditions.checkArgument(msPartitions == null || msPartitions.isEmpty());
// This table has no partition key, which means it has no declared partitions.
// We model partitions slightly differently to Hive - every file must exist in a
// partition, so add a single partition with no keys which will get all the
// files in the table's root directory.
Path tblLocation = FileSystemUtil.createFullyQualifiedPath(getHdfsBaseDirPath());
HdfsPartition part = createPartition(msTbl.getSd(), null);
partsByPath.put(tblLocation, Lists.newArrayList(part));
if (isMarkedCached_) part.markCached();
// The table's access level is derived from the permissions on its root directory.
FileSystem fs = tblLocation.getFileSystem(CONF);
accessLevel_ = getAvailableAccessLevel(fs, tblLocation);
} else {
for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) {
HdfsPartition partition = createPartition(msPartition.getSd(), msPartition);
// If the partition is null, its HDFS path does not exist, and it was not added
// to this table's partition list. Skip the partition.
if (partition == null) continue;
if (msPartition.getParameters() != null) {
if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
// TODO: READ_ONLY isn't exactly correct because the it's possible the
// partition does not have READ permissions either. When we start checking
// whether we can READ from a table, this should be updated to set the
// table's access level to the "lowest" effective level across all
// partitions. That is, if one partition has READ_ONLY and another has
// WRITE_ONLY the table's access level should be NONE.
accessLevel_ = TAccessLevel.READ_ONLY;
// Group partitions by their fully qualified directory so that each directory is
// listed only once during file metadata loading.
Path partDir = FileSystemUtil.createFullyQualifiedPath(
new Path(msPartition.getSd().getLocation()));
List<HdfsPartition> parts = partsByPath.get(partDir);
if (parts == null) {
partsByPath.put(partDir, Lists.newArrayList(partition));
} else {
// Load the file metadata from scratch.
loadMetadataAndDiskIds(partsByPath, false);
* Helper method to load the partition file metadata from scratch. This method is
* optimized for loading newly added partitions. For refreshing existing partitions
* use refreshPartitionFileMetadata(HdfsPartition).
private void resetAndLoadPartitionFileMetadata(HdfsPartition partition) {
Path partDir = partition.getLocationPath();
try {
FileMetadataLoadStats stats =
resetAndLoadFileMetadata(partDir, Lists.newArrayList(partition));
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Loaded file metadata for %s %s", getFullName(),
} catch (Exception e) {
LOG.error("Error loading file metadata for path: " + partDir.toString() +
". Partitions file metadata could be partially loaded.", e);
* Returns the thread pool size to load the file metadata of this table.
* 'numPaths' is the number of paths for which the file metadata should be loaded.
* We use different thread pool sizes for HDFS and non-HDFS tables since the latter
* supports much higher throughput of RPC calls for listStatus/listFiles. For
* simplicity, the filesystem type is determined based on the table's root path and
* not for each partition individually. Based on our experiments, S3 showed a linear
* speed up (up to ~100x) with increasing number of loading threads where as the HDFS
* throughput was limited to ~5x in un-secure clusters and up to ~3.7x in secure
* clusters. We narrowed it down to scalability bottlenecks in HDFS RPC implementation
* (HADOOP-14558) on both the server and the client side.
private int getLoadingThreadPoolSize(int numPaths) throws CatalogException {
Preconditions.checkState(numPaths > 0);
FileSystem tableFs;
try {
tableFs = (new Path(getLocation())).getFileSystem(CONF);
} catch (IOException e) {
throw new CatalogException("Invalid table path for table: " + getFullName(), e);
int threadPoolSize = FileSystemUtil.supportsStorageIds(tableFs) ?
// Thread pool size need not exceed the number of paths to be loaded.
return Math.min(numPaths, threadPoolSize);
* Helper method to load the block locations for each partition directory in
* partsByPath using a thread pool. 'partsByPath' maps each partition directory to
* the corresponding HdfsPartition objects. If 'reuseFileMd' is true, the block
* metadata is incrementally refreshed, else it is reloaded from scratch.
private void loadMetadataAndDiskIds(Map<Path, List<HdfsPartition>> partsByPath,
boolean reuseFileMd) throws CatalogException {
int numPathsToLoad = partsByPath.size();
// For tables without partitions we have no file metadata to load.
if (numPathsToLoad == 0) return;
int threadPoolSize = getLoadingThreadPoolSize(numPathsToLoad);"Loading file and block metadata for %s paths for table %s " +
"using a thread pool of size %s", numPathsToLoad, getFullName(),
ExecutorService partitionLoadingPool = Executors.newFixedThreadPool(threadPoolSize);
try {
List<Future<FileMetadataLoadStats>> pendingMdLoadTasks = Lists.newArrayList();
for (Path p: partsByPath.keySet()) {
FileMetadataLoadRequest blockMdLoadReq =
new FileMetadataLoadRequest(p, partsByPath.get(p), reuseFileMd);
// Wait for the partition load tasks to finish.
int failedLoadTasks = 0;
for (Future<FileMetadataLoadStats> task: pendingMdLoadTasks) {
try {
FileMetadataLoadStats loadStats = task.get();
if (LOG.isTraceEnabled()) LOG.trace(loadStats.debugString());
} catch (ExecutionException | InterruptedException e) {
LOG.error("Encountered an error loading block metadata for table: " +
getFullName(), e);
if (failedLoadTasks > 0) {
int errorsNotLogged = failedLoadTasks - MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG;
if (errorsNotLogged > 0) {
LOG.error(String.format("Error loading file metadata for %s paths for table " +
"%s. Only the first %s errors were logged.", failedLoadTasks, getFullName(),
throw new TableLoadingException(String.format("Failed to load file metadata "
+ "for %s paths for table %s. Table's file metadata could be partially "
+ "loaded. Check the Catalog server log for more details.", failedLoadTasks,
} finally {
}"Loaded file and block metadata for %s", getFullName()));
* Gets the AccessLevel that is available for Impala for this table based on the
* permissions Impala has on the given path. If the path does not exist, recurses up
* the path until a existing parent directory is found, and inherit access permissions
* from that.
* Always returns READ_WRITE for S3 and ADLS files.
// Maps the permissions reported by FsPermissionChecker for 'location' on 'fs' to a
// TAccessLevel. S3A and ADLS filesystems short-circuit to READ_WRITE without a
// permission lookup. For other filesystems, a FileNotFoundException on 'location'
// causes the lookup to be retried on the parent path.
private TAccessLevel getAvailableAccessLevel(FileSystem fs, Path location)
throws IOException {
// Avoid calling getPermissions() on file path for S3 files, as that makes a round
// trip to S3. Also, the S3A connector is currently unable to manage S3 permissions,
// so for now it is safe to assume that all files(objects) have READ_WRITE
// permissions, as that's what the S3A connector will always return too.
// TODO: Revisit if the S3A connector is updated to be able to manage S3 object
// permissions. (see HADOOP-13892)
if (FileSystemUtil.isS3AFileSystem(fs)) return TAccessLevel.READ_WRITE;
// The ADLS connector currently returns ACLs for files in ADLS, but can only map
// them to the ADLS client SPI and not the Hadoop users/groups, causing unexpected
// behavior. So ADLS ACLs are unsupported until the connector is able to map
// permissions to hadoop users/groups (HADOOP-14437).
if (FileSystemUtil.isADLFileSystem(fs)) return TAccessLevel.READ_WRITE;
FsPermissionChecker permissionChecker = FsPermissionChecker.getInstance();
// Walk up the path hierarchy until a path is found whose permissions can be read.
while (location != null) {
try {
FsPermissionChecker.Permissions perms =
permissionChecker.getPermissions(fs, location);
// Checked from the most to the least permissive combination.
if (perms.canReadAndWrite()) {
return TAccessLevel.READ_WRITE;
} else if (perms.canRead()) {
return TAccessLevel.READ_ONLY;
} else if (perms.canWrite()) {
return TAccessLevel.WRITE_ONLY;
return TAccessLevel.NONE;
} catch (FileNotFoundException e) {
// The path does not exist: fall back to its parent.
location = location.getParent();
// Should never get here.
Preconditions.checkNotNull(location, "Error: no path ancestor exists");
return TAccessLevel.NONE;
* Creates a new HdfsPartition object to be added to HdfsTable's partition list.
* Partitions may be empty, or may not even exist in the filesystem (a partition's
* location may have been changed to a new path that is about to be created by an
* INSERT). Also loads the file metadata for this partition. Returns new partition
* if successful or null if none was created.
* Throws CatalogException if the supplied storage descriptor contains metadata that
* Impala can't understand.
// Builds an HdfsPartition for 'msPartition' via createPartition(), using the
// partition's own storage descriptor, and returns it.
// NOTE(review): the method name and javadoc say file metadata is also loaded, but no
// load call is visible in this body -- confirm.
public HdfsPartition createAndLoadPartition(
org.apache.hadoop.hive.metastore.api.Partition msPartition)
throws CatalogException {
HdfsPartition hdfsPartition = createPartition(msPartition.getSd(), msPartition);
return hdfsPartition;
* Same as createAndLoadPartition() but is optimized for loading file metadata of
* newly created HdfsPartitions in parallel.
// Creates an HdfsPartition for each entry of 'msPartitions', groups the new partitions
// by location path in 'partsByPath', and then issues a single
// loadMetadataAndDiskIds() call (with reuseFileMd == false) for all of them.
// Returns 'addedParts'.
public List<HdfsPartition> createAndLoadPartitions(
List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions)
throws CatalogException {
HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
List<HdfsPartition> addedParts = Lists.newArrayList();
for (org.apache.hadoop.hive.metastore.api.Partition partition: msPartitions) {
HdfsPartition hdfsPartition = createPartition(partition.getSd(), partition);
// NOTE(review): no statement adding 'hdfsPartition' to 'addedParts' is visible
// here -- confirm.
Path partitionPath = hdfsPartition.getLocationPath();
// Several partitions may map to the same path, hence a list per path.
List<HdfsPartition> hdfsPartitions = partsByPath.get(partitionPath);
if (hdfsPartitions == null) {
partsByPath.put(partitionPath, Lists.newArrayList(hdfsPartition));
} else {
// NOTE(review): the else branch (presumably appending to 'hdfsPartitions') is not
// visible here -- confirm.
loadMetadataAndDiskIds(partsByPath, false);
return addedParts;
* Creates a new HdfsPartition from a specified StorageDescriptor and an HMS partition
* object.
// Builds an HdfsPartition from 'storageDescriptor' and, when non-null, 'msPartition'.
// Partition key strings from 'msPartition' are converted to LiteralExprs typed by the
// table column at the same position. A key that cannot be converted is logged and
// reported as a CatalogException; an IOException raised while accessing the partition
// path is wrapped in a CatalogException.
private HdfsPartition createPartition(StorageDescriptor storageDescriptor,
org.apache.hadoop.hive.metastore.api.Partition msPartition)
throws CatalogException {
HdfsStorageDescriptor fileFormatDescriptor =
HdfsStorageDescriptor.fromStorageDescriptor(this.name_, storageDescriptor);
List<LiteralExpr> keyValues = Lists.newArrayList();
if (msPartition != null) {
// Load key values
for (String partitionKey: msPartition.getValues()) {
// keyValues.size() is the index of the key being processed, so this picks the
// column at the same position.
Type type = getColumns().get(keyValues.size()).getType();
// Deal with Hive's special NULL partition key.
if (partitionKey.equals(nullPartitionKeyValue_)) {
// NOTE(review): the body of the NULL-key branch is not visible here -- confirm.
} else {
try {
keyValues.add(LiteralExpr.create(partitionKey, type));
} catch (Exception ex) {
LOG.warn("Failed to create literal expression of type: " + type, ex);
throw new CatalogException("Invalid partition key value of type: " + type,
for (Expr v: keyValues) v.analyzeNoThrow(null);
Path partDirPath = new Path(storageDescriptor.getLocation());
try {
FileSystem fs = partDirPath.getFileSystem(CONF);
// Once set, the flag stays true: it is OR-ed with its previous value.
multipleFileSystems_ = multipleFileSystems_ ||
!FileSystemUtil.isPathOnFileSystem(new Path(getLocation()), fs);
if (msPartition != null) {
// The partition starts with an empty file descriptor list; the access level is
// derived from the partition directory.
HdfsPartition partition =
new HdfsPartition(this, msPartition, keyValues, fileFormatDescriptor,
new ArrayList<FileDescriptor>(), getAvailableAccessLevel(fs, partDirPath));
return partition;
} catch (IOException e) {
throw new CatalogException("Error initializing partition", e);
* Adds the partition to the HdfsTable. Throws a CatalogException if the partition
* already exists in this table.
// Registers 'partition' in partitionMap_ under its id and adds its size and file
// descriptor count to the table-level totals. Throws a CatalogException when the id
// is already present. Sets hasAvroData_ when the partition's file format is AVRO.
public void addPartition(HdfsPartition partition) throws CatalogException {
if (partitionMap_.containsKey(partition.getId())) {
throw new CatalogException(String.format("Partition %s already exists in table %s",
partition.getPartitionName(), getFullName()));
// hasAvroData_ is only ever set to true here, never reset.
if (partition.getFileFormat() == HdfsFileFormat.AVRO) hasAvroData_ = true;
partitionMap_.put(partition.getId(), partition);
totalHdfsBytes_ += partition.getSize();
numHdfsFiles_ += partition.getNumFileDescriptors();
* Updates the HdfsTable's partition metadata, i.e. adds the id to the HdfsTable and
* populates structures used for speeding up partition pruning/lookup. Also updates
* column stats.
// Records 'partition' in nameToPartitionMap_ and, when
// isStoredInImpaladCatalogCache() is true, updates the per-column lookup structures
// and the stats of the partition columns. Does nothing when the number of partition
// values differs from numClusteringCols_.
private void updatePartitionMdAndColStats(HdfsPartition partition) {
if (partition.getPartitionValues().size() != numClusteringCols_) return;
nameToPartitionMap_.put(partition.getPartitionName(), partition);
// The remaining bookkeeping is skipped unless this check passes.
if (!isStoredInImpaladCatalogCache()) return;
// Column i of the table corresponds to partition value i.
for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
ColumnStats stats = getColumns().get(i).getStats();
LiteralExpr literal = partition.getPartitionValues().get(i);
// Store partitions with null partition values separately
if (literal instanceof NullLiteral) {
stats.setNumNulls(stats.getNumNulls() + 1);
// The first NULL seen for this column counts as one more distinct value.
if (nullPartitionIds_.get(i).isEmpty()) {
stats.setNumDistinctValues(stats.getNumDistinctValues() + 1);
HashSet<Long> partitionIds = partitionValuesMap_.get(i).get(literal);
// A literal not seen before for this column gets a fresh id set and counts as one
// more distinct value.
if (partitionIds == null) {
partitionIds = Sets.newHashSet();
partitionValuesMap_.get(i).put(literal, partitionIds);
stats.setNumDistinctValues(stats.getNumDistinctValues() + 1);
* Drops the partition having the given partition spec from HdfsTable. Cleans up its
* metadata from all the mappings used to speed up partition pruning/lookup.
* Also updates partition column statistics. Given partitionSpec must match exactly
* one partition.
* Returns the HdfsPartition that was dropped. If the partition does not exist, returns
* null.
// Resolves 'partitionSpec' with getPartitionFromThriftPartitionSpec() and delegates to
// dropPartition(HdfsPartition), returning its result.
public HdfsPartition dropPartition(List<TPartitionKeyValue> partitionSpec) {
return dropPartition(getPartitionFromThriftPartitionSpec(partitionSpec));
* Drops a partition and updates partition column statistics. Returns the
* HdfsPartition that was dropped or null if the partition does not exist.
// Removes the contribution of 'partition' from the table-level totals and, when
// isStoredInImpaladCatalogCache() is true, from the partition column stats.
// Returns null for a null 'partition', otherwise returns 'partition'.
private HdfsPartition dropPartition(HdfsPartition partition) {
if (partition == null) return null;
totalHdfsBytes_ -= partition.getSize();
numHdfsFiles_ -= partition.getNumFileDescriptors();
Preconditions.checkArgument(partition.getPartitionValues().size() ==
Long partitionId = partition.getId();
// Remove the partition id from the list of partition ids and other mappings.
// NOTE(review): the removal statements themselves are not visible here -- confirm.
if (!isStoredInImpaladCatalogCache()) return partition;
for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
ColumnStats stats = getColumns().get(i).getStats();
LiteralExpr literal = partition.getPartitionValues().get(i);
// Check if this is a null literal.
if (literal instanceof NullLiteral) {
stats.setNumNulls(stats.getNumNulls() - 1);
// When no NULL partition ids remain for this column, NULL no longer counts as a
// distinct value.
if (nullPartitionIds_.get(i).isEmpty()) {
stats.setNumDistinctValues(stats.getNumDistinctValues() - 1);
HashSet<Long> partitionIds = partitionValuesMap_.get(i).get(literal);
// If there are multiple partition ids corresponding to a literal, remove
// only this id. Otherwise, remove the <literal, id> pair.
if (partitionIds.size() > 1) partitionIds.remove(partitionId);
else {
stats.setNumDistinctValues(stats.getNumDistinctValues() - 1);
return partition;
* Drops the given partitions from this table. Cleans up its metadata from all the
* mappings used to speed up partition pruning/lookup. Also updates partition column
* statistics. Returns the list of partitions that were dropped.
// Calls dropPartition() on each element of 'partitions' and collects the non-null
// results, in iteration order, into the returned list.
public List<HdfsPartition> dropPartitions(List<HdfsPartition> partitions) {
ArrayList<HdfsPartition> droppedPartitions = Lists.newArrayList();
for (HdfsPartition partition: partitions) {
HdfsPartition hdfsPartition = dropPartition(partition);
// dropPartition() returns null for a null input; such entries are skipped.
if (hdfsPartition != null) droppedPartitions.add(hdfsPartition);
return droppedPartitions;
* Adds or replaces the default partition.
// Builds the default partition from 'storageDescriptor' and stores it in
// partitionMap_ under its id, replacing any entry already stored under that id.
public void addDefaultPartition(StorageDescriptor storageDescriptor)
throws CatalogException {
// Default partition has no files and is not referred to by scan nodes. Data sinks
// refer to this to understand how to create new partitions.
HdfsStorageDescriptor hdfsStorageDescriptor =
HdfsStorageDescriptor.fromStorageDescriptor(this.name_, storageDescriptor);
HdfsPartition partition = HdfsPartition.defaultPartition(this,
partitionMap_.put(partition.getId(), partition);
// Convenience overload: delegates to the six-argument load() with
// loadParitionFileMetadata == true, loadTableSchema == true and
// partitionsToUpdate == null.
public void load(boolean reuseMetadata, IMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
load(reuseMetadata, client, msTbl, true, true, null);
* Loads table metadata from the Hive Metastore.
* If 'reuseMetadata' is false, performs a full metadata load from the Hive Metastore,
* including partition and file metadata. Otherwise, loads metadata incrementally and
* updates this HdfsTable in place so that it is in sync with the Hive Metastore.
* Depending on the operation that triggered the table metadata load, not all the
* metadata may need to be updated. If 'partitionsToUpdate' is not null, it specifies a
* list of partitions for which metadata should be updated. Otherwise, all partition
* metadata will be updated from the Hive Metastore.
* If 'loadParitionFileMetadata' is true, file metadata of the specified partitions
* are reloaded from scratch. If 'partitionsToUpdate' is not specified, file metadata
* of all the partitions are loaded.
* If 'loadTableSchema' is true, the table schema is loaded from the Hive Metastore.
* There are several cases where existing file descriptors might be reused incorrectly:
* 1. an ALTER TABLE ADD PARTITION or dynamic partition insert is executed through
* Hive. This does not update the lastDdlTime.
* 2. Hdfs rebalancer is executed. This changes the block locations but doesn't update
* the mtime (file modification time).
* If any of these occur, user has to execute "invalidate metadata" to invalidate the
* metadata cache of the table and trigger a fresh load.
// Loads or incrementally refreshes this table's metadata from 'msTbl' and 'client'.
// 'reuseMetadata' selects the incremental path; otherwise all partitions are fetched
// and loaded via loadAllPartitions(). A TableLoadingException is rethrown unchanged;
// any other exception is wrapped in a TableLoadingException carrying the table name.
public void load(boolean reuseMetadata, IMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl,
boolean loadParitionFileMetadata, boolean loadTableSchema,
Set<String> partitionsToUpdate) throws TableLoadingException {
// turn all exceptions into TableLoadingException
msTable_ = msTbl;
try {
if (loadTableSchema) loadSchema(client, msTbl);
if (reuseMetadata && getCatalogVersion() == Catalog.INITIAL_CATALOG_VERSION) {
// This is the special case of CTAS that creates a 'temp' table that does not
// actually exist in the Hive Metastore.
// NOTE(review): the body of this special case is not visible here -- confirm.
// Load partition and file metadata
if (reuseMetadata) {
// Incrementally update this table's partitions and file metadata"Incrementally loading table metadata for: " + getFullName());
partitionsToUpdate == null || loadParitionFileMetadata);
// Tables without partition keys only refresh file metadata, and only on request.
if (msTbl.getPartitionKeysSize() == 0) {
if (loadParitionFileMetadata) updateUnpartitionedTableFileMd();
} else {
client, partitionsToUpdate, loadParitionFileMetadata);
}"Incrementally loaded table metadata for: " + getFullName());
} else {
// Load all partitions from Hive Metastore, including file metadata."Fetching partition metadata from the Metastore: " + getFullName());
List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES);"Fetched partition metadata from the Metastore: " + getFullName());
loadAllPartitions(msPartitions, msTbl);
if (loadTableSchema) setAvroSchema(client, msTbl);
// NOTE(review): the condition under which the counters are reset to -1 is not
// visible here -- confirm.
numHdfsFiles_ = -1;
totalHdfsBytes_ = -1;
} catch (TableLoadingException e) {
// Already the declared exception type: propagate as-is.
throw e;
} catch (Exception e) {
throw new TableLoadingException("Failed to load metadata for table: "
+ getFullName(), e);
* Updates the table metadata, including 'hdfsBaseDir_', 'isMarkedCached_',
* and 'accessLevel_' from 'msTbl'. Throws an IOException if there was an error
* accessing the table location path.
// Refreshes hdfsBaseDir_ and isMarkedCached_ from 'msTbl'. For a table with no
// partition keys, also recomputes accessLevel_ from the table location.
private void updateMdFromHmsTable(org.apache.hadoop.hive.metastore.api.Table msTbl)
throws IOException {
hdfsBaseDir_ = msTbl.getSd().getLocation();
isMarkedCached_ = HdfsCachingUtil.validateCacheParams(msTbl.getParameters());
// Only tables without partition keys derive their access level from the base dir.
if (msTbl.getPartitionKeysSize() == 0) {
Path location = new Path(hdfsBaseDir_);
FileSystem fs = location.getFileSystem(CONF);
accessLevel_ = getAvailableAccessLevel(fs, location);
* Updates the file metadata of an unpartitioned HdfsTable.
// Rebuilds the partition object of an unpartitioned table from the metastore table's
// storage descriptor and marks it cached when isMarkedCached_ is set.
// NOTE(review): the steps that install the new partition and load its file metadata
// are not visible here -- confirm.
private void updateUnpartitionedTableFileMd() throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("update unpartitioned table: " + getFullName());
org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
// A null msPartition is passed, so no partition key values are loaded.
HdfsPartition part = createPartition(msTbl.getSd(), null);
if (isMarkedCached_) part.markCached();
* Updates the partitions of an HdfsTable so that they are in sync with the Hive
* Metastore. It reloads partitions that were marked 'dirty' by doing a DROP + CREATE.
* It removes from this table partitions that no longer exist in the Hive Metastore and
* adds partitions that were added externally (e.g. using Hive) to the Hive Metastore
* but do not exist in this table. If 'loadParitionFileMetadata' is true, it triggers
* file/block metadata reload for the partitions specified in 'partitionsToUpdate', if
* any, or for all the table partitions if 'partitionsToUpdate' is null.
// Brings this table's partitions in sync with the Hive Metastore: reloads dirty
// partitions, loads partitions that exist in HMS but not locally, and, when
// 'loadParitionFileMetadata' is true, reloads file metadata through
// loadMetadataAndDiskIds() with reuseFileMd == true. Requires a partitioned table,
// and requires 'loadParitionFileMetadata' whenever 'partitionsToUpdate' is non-null.
private void updatePartitionsFromHms(IMetaStoreClient client,
Set<String> partitionsToUpdate, boolean loadParitionFileMetadata)
throws Exception {
if (LOG.isTraceEnabled()) LOG.trace("Sync table partitions: " + getFullName());
org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
Preconditions.checkState(msTbl.getPartitionKeysSize() != 0);
Preconditions.checkState(loadParitionFileMetadata || partitionsToUpdate == null);
// Retrieve all the partition names from the Hive Metastore. We need this to
// identify the delta between partitions of the local HdfsTable and the table entry
// in the Hive Metastore. Note: This is a relatively "cheap" operation
// (~.3 secs for 30K partitions).
Set<String> msPartitionNames = Sets.newHashSet();
// A max-parts argument of -1 is passed; presumably this requests all names -- confirm.
client.listPartitionNames(db_.getName(), name_, (short) -1));
// Names of loaded partitions in this table
Set<String> partitionNames = Sets.newHashSet();
// Partitions for which file metadata must be loaded, grouped by partition paths.
Map<Path, List<HdfsPartition>> partitionsToUpdateFileMdByPath = Maps.newHashMap();
// Partitions that need to be dropped and recreated from scratch
List<HdfsPartition> dirtyPartitions = Lists.newArrayList();
// Partitions that need to be removed from this table. That includes dirty
// partitions as well as partitions that were removed from the Hive Metastore.
List<HdfsPartition> partitionsToRemove = Lists.newArrayList();
// Identify dirty partitions that need to be loaded from the Hive Metastore and
// partitions that no longer exist in the Hive Metastore.
for (HdfsPartition partition: partitionMap_.values()) {
// Ignore the default partition
if (partition.isDefaultPartition()) continue;
// Remove partitions that don't exist in the Hive Metastore. These are partitions
// that were removed from HMS using some external process, e.g. Hive.
if (!msPartitionNames.contains(partition.getPartitionName())) {
if (partition.isDirty()) {
// Dirty partitions are updated by removing them from table's partition
// list and loading them from the Hive Metastore.
} else {
// Without an explicit partition list, every clean partition is queued for a file
// metadata reload, grouped by its location path.
if (partitionsToUpdate == null && loadParitionFileMetadata) {
Path partitionPath = partition.getLocationPath();
List<HdfsPartition> partitions =
if (partitions == null) {
partitionPath, Lists.newArrayList(partition));
} else {
// Load dirty partitions from Hive Metastore
loadPartitionsFromMetastore(dirtyPartitions, client);
// Identify and load partitions that were added in the Hive Metastore but don't
// exist in this table.
Set<String> newPartitionsInHms = Sets.difference(msPartitionNames, partitionNames);
loadPartitionsFromMetastore(newPartitionsInHms, client);
// If a list of modified partitions (old and new) is specified, don't reload file
// metadata for the new ones as they have already been detected in HMS and have been
// reloaded by loadPartitionsFromMetastore().
if (partitionsToUpdate != null) {
// Load file metadata. Until we have a notification mechanism for when a
// file changes in hdfs, it is sometimes required to reload all the file
// descriptors and block metadata of a table (e.g. REFRESH statement).
if (loadParitionFileMetadata) {
if (partitionsToUpdate != null) {
// Only reload file metadata of partitions specified in 'partitionsToUpdate'
partitionsToUpdateFileMdByPath = getPartitionsByPath(partitionsToUpdate);
loadMetadataAndDiskIds(partitionsToUpdateFileMdByPath, true);
* Given a set of partition names, returns the corresponding HdfsPartition
* objects grouped by their base directory path.
// Looks up each name of 'partitionNames' in nameToPartitionMap_ and groups the
// resulting partitions by their location path. A name that resolves to no partition
// fails the Preconditions.checkNotNull() check.
private HashMap<Path, List<HdfsPartition>> getPartitionsByPath(
Collection<String> partitionNames) {
HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
for (String partitionName: partitionNames) {
// Guard against empty names before dropping the last character.
if (partitionName.length() > 0) {
// Trim the last trailing char '/' from each partition name
partName = partitionName.substring(0, partitionName.length()-1);
HdfsPartition partition = nameToPartitionMap_.get(partName);
Preconditions.checkNotNull(partition, "Invalid partition name: " + partName);
Path partitionPath = partition.getLocationPath();
List<HdfsPartition> partitions = partsByPath.get(partitionPath);
if (partitions == null) {
partsByPath.put(partitionPath, Lists.newArrayList(partition));
} else {
// NOTE(review): the else branch (presumably appending to 'partitions') is not
// visible here -- confirm.
return partsByPath;
// Applies table stats from 'msTbl'. For a table with no clustering columns and a
// non-empty partitionMap_, iterates over all of its partitions.
// NOTE(review): the per-partition statement inside the loop is not visible here --
// confirm.
public void setTableStats(org.apache.hadoop.hive.metastore.api.Table msTbl) {
// For unpartitioned tables set the numRows in its partitions
// to the table's numRows.
if (numClusteringCols_ == 0 && !partitionMap_.isEmpty()) {
// Unpartitioned tables have a 'dummy' partition and a default partition.
// Temp tables used in CTAS statements have one partition.
Preconditions.checkState(partitionMap_.size() == 2 || partitionMap_.size() == 1);
for (HdfsPartition p: partitionMap_.values()) {
* Returns whether the table has the 'skip.header.line.count' property set.
// Returns true when the metastore table's parameters contain the key
// TBL_PROP_SKIP_HEADER_LINE_COUNT; returns false when there is no metastore table.
private boolean hasSkipHeaderLineCount() {
org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
if (msTbl == null) return false;
return msTbl.getParameters().containsKey(TBL_PROP_SKIP_HEADER_LINE_COUNT);
* Parses and returns the value of the 'skip.header.line.count' table property. If the
* value is not set for the table, returns 0. If parsing fails or a value < 0 is found,
* the error parameter is updated to contain an error message.
// Returns 0 when hasSkipHeaderLineCount() is false; otherwise delegates to the static
// overload with the metastore table's parameters and 'error'.
public int parseSkipHeaderLineCount(StringBuilder error) {
if (!hasSkipHeaderLineCount()) return 0;
return parseSkipHeaderLineCount(getMetaStoreTable().getParameters(), error);
* Parses and returns the value of the 'skip.header.line.count' table property. The
* caller must ensure that the property is contained in the 'tblProperties' map. If
* parsing fails or a value < 0 is found, the error parameter is updated to contain an
* error message.
// Parses the value stored under TBL_PROP_SKIP_HEADER_LINE_COUNT in 'tblProperties' as
// an int and returns it. 'tblProperties' must not be null. A negative parsed value
// appends an error message to 'error'; the value itself is still returned.
public static int parseSkipHeaderLineCount(Map<String, String> tblProperties,
StringBuilder error) {
Preconditions.checkState(tblProperties != null);
// Try to parse.
String string_value = tblProperties.get(TBL_PROP_SKIP_HEADER_LINE_COUNT);
// Stays 0 when parsing does not assign a value.
int skipHeaderLineCount = 0;
// The message is built up front so that it can include the raw property value.
String error_msg = String.format("Invalid value for table property %s: %s (value " +
"must be an integer >= 0)", TBL_PROP_SKIP_HEADER_LINE_COUNT, string_value);
try {
skipHeaderLineCount = Integer.parseInt(string_value);
} catch (NumberFormatException exc) {
// NOTE(review): the javadoc says a parse failure updates 'error', but the handler
// body is not visible here -- confirm.
if (skipHeaderLineCount < 0) error.append(error_msg);
return skipHeaderLineCount;
* Sets avroSchema_ if the table or any of the partitions in the table are stored
* as Avro. Additionally, this method also reconciles the schema if the column
* definitions from the metastore differ from the Avro schema.
// Populates avroSchema_ when the table's input format is Avro or hasAvroData_ is set.
// The schema comes from AvroSchemaUtils.getAvroSchema() or, when that returns null,
// is inferred from the storage descriptor's columns. Unless the SerDe library is null
// or LazySimpleSerDe, the column definitions are reconciled against the Avro schema
// and any reconciliation warning is logged.
private void setAvroSchema(IMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception {
String inputFormat = msTbl.getSd().getInputFormat();
if (HdfsFileFormat.fromJavaClassName(inputFormat) == HdfsFileFormat.AVRO
|| hasAvroData_) {
// Look for Avro schema in TBLPROPERTIES and in SERDEPROPERTIES, with the latter
// taking precedence.
// NOTE(review): the statements populating 'schemaSearchLocations' are not visible
// here -- confirm.
List<Map<String, String>> schemaSearchLocations = Lists.newArrayList();
avroSchema_ = AvroSchemaUtils.getAvroSchema(schemaSearchLocations);
if (avroSchema_ == null) {
// No Avro schema was explicitly set in the table metadata, so infer the Avro
// schema from the column definitions.
Schema inferredSchema = AvroSchemaConverter.convertFieldSchemas(
msTbl.getSd().getCols(), getFullName());
avroSchema_ = inferredSchema.toString();
String serdeLib = msTbl.getSd().getSerdeInfo().getSerializationLib();
if (serdeLib == null ||
serdeLib.equals("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) {
// If the SerDe library is null or set to LazySimpleSerDe, it
// indicates there is an issue with the table metadata since Avro tables need a
// non-native serde. Instead of failing to load the table, fall back to
// using the fields from the storage descriptor (same as Hive).
} else {
// Generate new FieldSchemas from the Avro schema. This step reconciles
// differences in the column definitions and the Avro schema. For
// Impala-created tables this step is not necessary because the same
// resolution is done during table creation. But Hive-created tables
// store the original column definitions, and not the reconciled ones.
List<ColumnDef> colDefs =
List<ColumnDef> avroCols = AvroSchemaParser.parse(avroSchema_);
// Collects any message produced while reconciling the two column lists.
StringBuilder warning = new StringBuilder();
List<ColumnDef> reconciledColDefs =
AvroSchemaUtils.reconcileSchemas(colDefs, avroCols, warning);
if (warning.length() != 0) {
LOG.warn(String.format("Warning while loading table %s:\n%s",
getFullName(), warning.toString()));
// Reset and update nonPartFieldSchemas_ to the reconciled colDefs.
// Update the columns as per the reconciled colDefs and re-load stats.
* Loads table schema and column stats from Hive Metastore.
// Loads schema-level settings for this table: the null partition key value (from a
// metastore config value), the NULL column indicator (defaulting to
// DEFAULT_NULL_COLUMN_VALUE), and numClusteringCols_ (the number of partition keys of
// 'msTbl'). Sets isSchemaLoaded_ to true at the end.
// NOTE(review): the statements that add columns and load column stats are not visible
// here -- confirm.
private void loadSchema(IMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception {
// set nullPartitionKeyValue from the hive conf.
nullPartitionKeyValue_ = client.getConfigValue(
// set NULL indicator string from table properties
nullColumnValue_ =
if (nullColumnValue_ == null) nullColumnValue_ = DEFAULT_NULL_COLUMN_VALUE;
// Excludes partition columns.
// The number of clustering columns is the number of partition keys.
numClusteringCols_ = msTbl.getPartitionKeys().size();
// Add all columns to the table. Ordering is important: partition columns first,
// then all other columns.
isSchemaLoaded_ = true;
* Loads partitions from the Hive Metastore and adds them to the internal list of
* table partitions.
// Overload taking HdfsPartition objects: builds a set of partition names from
// 'partitions' and delegates to the Set<String> overload. Does nothing for an empty
// list.
private void loadPartitionsFromMetastore(List<HdfsPartition> partitions,
IMetaStoreClient client) throws Exception {
if (partitions.isEmpty()) return;
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Incrementally updating %d/%d partitions.",
partitions.size(), partitionMap_.size()));
Set<String> partitionNames = Sets.newHashSet();
for (HdfsPartition part: partitions) {
// NOTE(review): the statement adding each partition's name to 'partitionNames' is
// not visible here -- confirm.
loadPartitionsFromMetastore(partitionNames, client);
* Loads from the Hive Metastore the partitions that correspond to the specified
* 'partitionNames' and adds them to the internal list of table partitions.
// Fetches the metastore partitions named in 'partitionNames' and creates an
// HdfsPartition for each one via createPartition(). Partitions for which
// createPartition() returns null are skipped. Lowers accessLevel_ to READ_ONLY when a
// loaded partition's access level does not imply write access. Does nothing for an
// empty set.
private void loadPartitionsFromMetastore(Set<String> partitionNames,
IMetaStoreClient client) throws Exception {
if (partitionNames.isEmpty()) return;
// Load partition metadata from Hive Metastore.
List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
Lists.newArrayList(partitionNames), db_.getName(), name_));
for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) {
HdfsPartition partition = createPartition(msPartition.getSd(), msPartition);
// If the partition is null, its HDFS path does not exist, and it was not added to
// this table's partition list. Skip the partition.
if (partition == null) continue;
if (msPartition.getParameters() != null) {
if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
// TODO: READ_ONLY isn't exactly correct because it's possible the
// partition does not have READ permissions either. When we start checking
// whether we can READ from a table, this should be updated to set the
// table's access level to the "lowest" effective level across all
// partitions. That is, if one partition has READ_ONLY and another has
// WRITE_ONLY the table's access level should be NONE.
accessLevel_ = TAccessLevel.READ_ONLY;
// Returns the list 'ret' built while iterating over the columns that follow the first
// numClusteringCols_ columns.
// NOTE(review): the statement adding each column's name to 'ret' is not visible
// here -- confirm.
protected List<String> getColumnNamesWithHmsStats() {
List<String> ret = Lists.newArrayList();
// Only non-partition columns have column stats in the HMS.
for (Column column: getColumns().subList(numClusteringCols_, getColumns().size())) {
return ret;
// Populates this table from the THdfsTable carried by 'thriftTable': the partition
// location compressor, base dir, NULL markers, the multiple-filesystems flag, the
// partitions and the Avro schema. A CatalogException raised while converting a
// partition is rethrown as a TableLoadingException carrying only its message.
protected synchronized void loadFromThrift(TTable thriftTable)
throws TableLoadingException {
THdfsTable hdfsTable = thriftTable.getHdfs_table();
partitionLocationCompressor_ = new HdfsPartitionLocationCompressor(
numClusteringCols_, hdfsTable.getPartition_prefixes());
hdfsBaseDir_ = hdfsTable.getHdfsBaseDir();
nullColumnValue_ = hdfsTable.nullColumnValue;
nullPartitionKeyValue_ = hdfsTable.nullPartitionKeyValue;
multipleFileSystems_ = hdfsTable.multiple_filesystems;
try {
// Each map entry pairs a partition id (key) with its thrift representation (value).
for (Map.Entry<Long, THdfsPartition> part: hdfsTable.getPartitions().entrySet()) {
HdfsPartition hdfsPart =
HdfsPartition.fromThrift(this, part.getKey(), part.getValue());
} catch (CatalogException e) {
// NOTE(review): only the message is forwarded; the original exception is not
// attached as the cause.
throw new TableLoadingException(e.getMessage());
// avroSchema_ is null when the thrift table carries no Avro schema.
avroSchema_ = hdfsTable.isSetAvroSchema() ? hdfsTable.getAvroSchema() : null;
isMarkedCached_ =
// Builds a TTableDescriptor of type HDFS_TABLE for 'tableId' and attaches a THdfsTable
// restricted to 'referencedPartitions', without file descriptors
// (includeFileDesc == false).
public TTableDescriptor toThriftDescriptor(int tableId,
Set<Long> referencedPartitions) {
// Create thrift descriptors to send to the BE. The BE does not
// need any information below the THdfsPartition level.
TTableDescriptor tableDesc = new TTableDescriptor(tableId, TTableType.HDFS_TABLE,
getTColumnDescriptors(), numClusteringCols_, name_, db_.getName());
tableDesc.setHdfsTable(getTHdfsTable(false, referencedPartitions));
return tableDesc;
// Extends the TTable produced by the superclass with a THdfsTable that includes file
// descriptors for all partitions (includeFileDesc == true, refPartitions == null).
public TTable toThrift() {
// Send all metadata between the catalog service and the FE.
TTable table = super.toThrift();
table.setHdfs_table(getTHdfsTable(true, null));
return table;
* Create a THdfsTable corresponding to this HdfsTable. If includeFileDesc is true,
* then all partitions and THdfsFileDescs of each partition should be included.
* Otherwise, don't include any THdfsFileDescs, and include only those partitions in
* the refPartitions set (the backend doesn't need metadata for unreferenced
* partitions). To prevent the catalog from hitting an OOM error while trying to
* serialize large partition incremental stats, we estimate the stats size and filter
* the incremental stats data from partition objects if the estimate exceeds
* --inc_stats_size_limit_bytes
// Builds the THdfsTable for this table. Partitions are included when 'refPartitions'
// is null or contains their id. Incremental stats are included only when the size
// estimate (partitions * columns * STATS_SIZE_PER_COLUMN_BYTES) is below the limit
// returned by BackendConfig.INSTANCE.getIncStatsMaxSize(). 'includeFileDesc' may only
// be true when 'refPartitions' is null.
private THdfsTable getTHdfsTable(boolean includeFileDesc, Set<Long> refPartitions) {
// includeFileDesc implies all partitions should be included (refPartitions == null).
Preconditions.checkState(!includeFileDesc || refPartitions == null);
int numPartitions =
(refPartitions == null) ? partitionMap_.values().size() : refPartitions.size();
// NOTE(review): numPartitions and getColumns().size() are ints, so the first
// multiplication happens in int arithmetic before the result widens to long -- confirm
// this cannot overflow for very large tables.
long statsSizeEstimate =
numPartitions * getColumns().size() * STATS_SIZE_PER_COLUMN_BYTES;
boolean includeIncrementalStats =
(statsSizeEstimate < BackendConfig.INSTANCE.getIncStatsMaxSize());
Map<Long, THdfsPartition> idToPartition = Maps.newHashMap();
for (HdfsPartition partition: partitionMap_.values()) {
long id = partition.getId();
if (refPartitions == null || refPartitions.contains(id)) {
partition.toThrift(includeFileDesc, includeIncrementalStats));
THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
nullPartitionKeyValue_, nullColumnValue_, idToPartition);
if (includeFileDesc) {
// Network addresses are used only by THdfsFileBlocks which are inside
// THdfsFileDesc, so include network addresses only when including THdfsFileDesc.
return hdfsTable;
/** Returns the current value of totalHdfsBytes_. */
public long getTotalHdfsBytes() {
  return totalHdfsBytes_;
}
/** Returns the table's base directory as the raw string stored in hdfsBaseDir_. */
public String getHdfsBaseDir() {
  return hdfsBaseDir_;
}
/** Returns a new Path built from hdfsBaseDir_ on every call. */
public Path getHdfsBaseDirPath() {
  Path baseDirPath = new Path(hdfsBaseDir_);
  return baseDirPath;
}
/** Returns true when avroSchema_ has been set (is non-null). */
public boolean isAvroTable() {
  return avroSchema_ != null;
}
* Get the index of hosts that store replicas of blocks of this table.
/** Returns hostIndex_ itself, not a copy. */
public ListMap<TNetworkAddress> getHostIndex() {
  return hostIndex_;
}
* Returns the file format that the majority of partitions are stored in.
// Counts the partitions in partitionMap_ per file format and returns the format with
// the highest count. Returns null when partitionMap_ has no partitions, because
// 'majorityFormat' is only assigned inside the second loop.
public HdfsFileFormat getMajorityFormat() {
Map<HdfsFileFormat, Integer> numPartitionsByFormat = Maps.newHashMap();
// First pass: tally partitions by file format.
for (HdfsPartition partition: partitionMap_.values()) {
HdfsFileFormat format = partition.getInputFormatDescriptor().getFileFormat();
Integer numPartitions = numPartitionsByFormat.get(format);
if (numPartitions == null) {
numPartitions = Integer.valueOf(1);
} else {
numPartitions = Integer.valueOf(numPartitions.intValue() + 1);
numPartitionsByFormat.put(format, numPartitions);
// Second pass: pick the format with the strictly largest tally. On a tie the entry
// seen first in the map's iteration order is kept.
int maxNumPartitions = Integer.MIN_VALUE;
HdfsFileFormat majorityFormat = null;
for (Map.Entry<HdfsFileFormat, Integer> entry: numPartitionsByFormat.entrySet()) {
if (entry.getValue().intValue() > maxNumPartitions) {
majorityFormat = entry.getKey();
maxNumPartitions = entry.getValue().intValue();
return majorityFormat;
* Returns the HDFS paths corresponding to HdfsTable partitions that don't exist in
* the Hive Metastore. An HDFS path is represented as a list of string values, one per
* partition key column.
// Scans the directory tree under hdfsBaseDir_ through getAllPartitionsNotInHms() and
// returns the partition value lists collected in 'partitionsNotInHms'. Any exception
// raised during the scan is reported as a CatalogException whose message embeds the
// table name and the exception text.
public List<List<String>> getPathsWithoutPartitions() throws CatalogException {
HashSet<List<LiteralExpr>> existingPartitions = new HashSet<List<LiteralExpr>>();
// Get the list of partition values of existing partitions in Hive Metastore.
for (HdfsPartition partition: partitionMap_.values()) {
if (partition.isDefaultPartition()) continue;
// NOTE(review): the statements filling 'existingPartitions' and 'partitionKeys' are
// not visible here -- confirm.
List<String> partitionKeys = Lists.newArrayList();
for (int i = 0; i < numClusteringCols_; ++i) {
Path basePath = new Path(hdfsBaseDir_);
List<List<String>> partitionsNotInHms = new ArrayList<List<String>>();
try {
getAllPartitionsNotInHms(basePath, partitionKeys, existingPartitions,
} catch (Exception e) {
// NOTE(review): 'e' is formatted into the message instead of being passed as the
// cause.
throw new CatalogException(String.format("Failed to recover partitions for %s " +
"with exception:%s.", getFullName(), e));
return partitionsNotInHms;
* Returns all partitions which match the partition keys directory structure and pass
* type compatibility check. Also these partitions are not already part of the table.
// Entry point of the recursive directory scan: resolves the FileSystem of 'path' and
// starts the eight-argument overload at depth 0 with empty value and expression lists.
private void getAllPartitionsNotInHms(Path path, List<String> partitionKeys,
HashSet<List<LiteralExpr>> existingPartitions,
List<List<String>> partitionsNotInHms) throws IOException {
FileSystem fs = path.getFileSystem(CONF);
List<String> partitionValues = Lists.newArrayList();
List<LiteralExpr> partitionExprs = Lists.newArrayList();
getAllPartitionsNotInHms(path, partitionKeys, 0, fs, partitionValues,
partitionExprs, existingPartitions, partitionsNotInHms);
* Returns all partitions which match the partition keys directory structure and pass
* the type compatibility check.
* path e.g. c1=1/c2=2/c3=3
* partitionKeys The ordered partition keys. e.g.("c1", "c2", "c3")
* depth The start position in partitionKeys to match the path name.
* partitionValues The partition values used to create a partition.
* partitionExprs The list of LiteralExprs which is used to avoid duplicate partitions.
* E.g. Having /c1=0001 and /c1=01, we should make sure only one partition
* will be added.
* existingPartitions All partitions which exist in Hive Metastore or newly added.
* partitionsNotInHms Contains all the recovered partitions.
private void getAllPartitionsNotInHms(Path path, List<String> partitionKeys,
int depth, FileSystem fs, List<String> partitionValues,
List<LiteralExpr> partitionExprs, HashSet<List<LiteralExpr>> existingPartitions,
List<List<String>> partitionsNotInHms) throws IOException {
if (depth == partitionKeys.size()) {
if (existingPartitions.contains(partitionExprs)) {
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Skip recovery of path '%s' because it already "
+ "exists in metastore", path.toString()));
} else {
FileStatus[] statuses = FileSystemUtil.listStatus(fs, path);
if (statuses == null) return;
for (FileStatus status: statuses) {
if (!status.isDirectory()) continue;
Pair<String, LiteralExpr> keyValues =
getTypeCompatibleValue(status.getPath(), partitionKeys.get(depth));
if (keyValues == null) continue;
List<String> currentPartitionValues = Lists.newArrayList(partitionValues);
List<LiteralExpr> currentPartitionExprs = Lists.newArrayList(partitionExprs);
getAllPartitionsNotInHms(status.getPath(), partitionKeys, depth + 1, fs,
currentPartitionValues, currentPartitionExprs,
existingPartitions, partitionsNotInHms);
* Checks that the last component of 'path' is of the form "<partitionkey>=<v>"
* where 'v' is a type-compatible value from the domain of the 'partitionKey' column.
* If not, returns null, otherwise returns a Pair instance, the first element is the
* original value, the second element is the LiteralExpr created from the original
* value.
private Pair<String, LiteralExpr> getTypeCompatibleValue(Path path,
String partitionKey) {
String partName[] = path.getName().split("=");
if (partName.length != 2 || !partName[0].equals(partitionKey)) return null;
// Check Type compatibility for Partition value.
Column column = getColumn(partName[0]);
Type type = column.getType();
LiteralExpr expr = null;
if (!partName[1].equals(getNullPartitionKeyValue())) {
try {
expr = LiteralExpr.create(partName[1], type);
// Skip large value which exceeds the MAX VALUE of specified Type.
if (expr instanceof NumericLiteral) {
if (NumericLiteral.isOverflow(((NumericLiteral)expr).getValue(), type)) {
LOG.warn(String.format("Skip the overflow value (%s) for Type (%s).",
partName[1], type.toSql()));
return null;
} catch (Exception ex) {
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Invalid partition value (%s) for Type (%s).",
partName[1], type.toSql()));
return null;
} else {
expr = new NullLiteral();
return new Pair<String, LiteralExpr>(partName[1], expr);
* Returns an estimated row count for the given number of file bytes. The row count is
* extrapolated using the table-level row count and file bytes statistics.
* Returns zero only if the given file bytes is zero.
* Returns -1 if:
* - stats extrapolation has been disabled
* - the given file bytes statistic is negative
* - the row count or the file byte statistic is missing
* - the file bytes statistic is zero or negative
* - the row count statistic is zero and the file bytes is non-zero
* Otherwise, returns a value >= 1.
public long getExtrapolatedNumRows(long fileBytes) {
if (!BackendConfig.INSTANCE.enableStatsExtrapolation()) return -1;
if (fileBytes == 0) return 0;
if (fileBytes < 0) return -1;
if (tableStats_.num_rows < 0 || tableStats_.total_file_bytes <= 0) return -1;
if (tableStats_.num_rows == 0 && tableStats_.total_file_bytes != 0) return -1;
double rowsPerByte = tableStats_.num_rows / (double) tableStats_.total_file_bytes;
double extrapolatedNumRows = fileBytes * rowsPerByte;
return (long) Math.max(1, Math.round(extrapolatedNumRows));
/**
 * Returns statistics on this table as a tabular result set. Used for the
 * SHOW TABLE STATS statement. The schema of the returned TResultSet is set
 * inside this method.
 */
public TResultSet getTableStats() {
// NOTE(review): several statements of this method (e.g. the initializer of
// 'orderedPartitions' and most rowBuilder.add(...) calls) are not present in this
// text; the comments below describe only what is visible. Confirm against the
// complete file before changing behavior.
TResultSet result = new TResultSet();
TResultSetMetadata resultSchema = new TResultSetMetadata();
// Schema part 1: one STRING-typed column descriptor per clustering column.
for (int i = 0; i < numClusteringCols_; ++i) {
// Add the partition-key values as strings for simplicity.
Column partCol = getColumns().get(i);
TColumn colDesc = new TColumn(partCol.getName(), Type.STRING.toThrift());
// Schema part 2: the fixed stats columns. "Extrap #Rows" is only part of the schema
// when stats extrapolation is enabled in the backend config.
boolean statsExtrap = BackendConfig.INSTANCE.enableStatsExtrapolation();
resultSchema.addToColumns(new TColumn("#Rows", Type.BIGINT.toThrift()));
if (statsExtrap) {
resultSchema.addToColumns(new TColumn("Extrap #Rows", Type.BIGINT.toThrift()));
resultSchema.addToColumns(new TColumn("#Files", Type.BIGINT.toThrift()));
resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Bytes Cached", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Cache Replication", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Format", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Incremental stats", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Location", Type.STRING.toThrift()));
// Pretty print partitions and their stats.
ArrayList<HdfsPartition> orderedPartitions =
// Running total of cached bytes across all partitions.
long totalCachedBytes = 0L;
for (HdfsPartition p: orderedPartitions) {
// Ignore dummy default partition.
if (p.isDefaultPartition()) continue;
TResultRowBuilder rowBuilder = new TResultRowBuilder();
// Add the partition-key values (as strings for simplicity).
for (LiteralExpr expr: p.getPartitionValues()) {
// Add rows, extrapolated rows, files, bytes, cache stats, and file format.
// Compute and report the extrapolated row count because the set of files could
// have changed since we last computed stats for this partition. We also follow
// this policy during scan-cardinality estimation.
if (statsExtrap) rowBuilder.add(getExtrapolatedNumRows(p.getSize()));
if (!p.isMarkedCached()) {
// Helps to differentiate partitions that have 0B cached versus partitions
// that are not marked as cached.
// One cell for "Bytes Cached" and one for "Cache Replication".
rowBuilder.add("NOT CACHED");
rowBuilder.add("NOT CACHED");
} else {
// Calculate the number of bytes that are cached.
long cachedBytes = 0L;
for (FileDescriptor fd: p.getFileDescriptors()) {
int numBlocks = fd.getNumFileBlocks();
for (int i = 0; i < numBlocks; ++i) {
FbFileBlock block = fd.getFbFileBlock(i);
// Only blocks that have a cached replica contribute their length.
if (FileBlock.hasCachedReplica(block)) {
cachedBytes += FileBlock.getLength(block);
totalCachedBytes += cachedBytes;
// Extract cache replication factor from the parameters of the table
// if the table is not partitioned or directly from the partition.
Short rep = HdfsCachingUtil.getCachedCacheReplication(
numClusteringCols_ == 0 ?
p.getTable().getMetaStoreTable().getParameters() :
// For partitioned tables add a summary row at the bottom.
if (numClusteringCols_ > 0) {
TResultRowBuilder rowBuilder = new TResultRowBuilder();
// All but one of the partition-key cells of the summary row are handled by this
// loop; presumably they are left empty and the remaining cell carries a label.
// TODO confirm, the loop body is not visible here.
int numEmptyCells = numClusteringCols_ - 1;
for (int i = 0; i < numEmptyCells; ++i) {
// Total rows, extrapolated rows, files, bytes, cache stats.
// Leave format empty.
// Compute and report the extrapolated row count because the set of files could
// have changed since we last computed stats for this partition. We also follow
// this policy during scan-cardinality estimation.
if (statsExtrap) rowBuilder.add(getExtrapolatedNumRows(totalHdfsBytes_));
return result;
/**
 * Returns file info for the partitions of this table selected by 'partitionSet'.
 * If 'partitionSet' is null, returns file info for all partitions, ordered
 * by partition.
 */
public TResultSet getFiles(List<List<TPartitionKeyValue>> partitionSet)
throws CatalogException {
// NOTE(review): some statements of this method (e.g. attaching the schema to the
// result and the remaining row cells) are not present in this text; the comments
// below describe only what is visible.
TResultSet result = new TResultSet();
TResultSetMetadata resultSchema = new TResultSetMetadata();
// The result has three STRING columns: file path, size and owning partition.
resultSchema.addToColumns(new TColumn("Path", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Partition", Type.STRING.toThrift()));
List<HdfsPartition> orderedPartitions;
// A null partition set selects every partition of the table.
if (partitionSet == null) {
orderedPartitions = Lists.newArrayList(partitionMap_.values());
} else {
// Get a list of HdfsPartition objects for the given partition set.
orderedPartitions = getPartitionsFromPartitionSet(partitionSet);
for (HdfsPartition p: orderedPartitions) {
// Work on a copy of the partition's file descriptor list.
List<FileDescriptor> orderedFds = Lists.newArrayList(p.getFileDescriptors());
for (FileDescriptor fd: orderedFds) {
TResultRowBuilder rowBuilder = new TResultRowBuilder();
// "Path" cell: partition location joined with the file name.
rowBuilder.add(p.getLocation() + "/" + fd.getFileName());
return result;
/**
 * Constructs a partition name from a list of TPartitionKeyValue objects.
 */
public static String constructPartitionName(List<TPartitionKeyValue> partitionSpec) {
// Parallel lists of partition column names and values handed to Hive's
// FileUtils.makePartName().
// NOTE(review): the loop body that fills these lists and the end of the
// makePartName(...) call are not present in this text. Presumably each entry's
// name and value are appended in order; confirm against the complete file.
List<String> partitionCols = Lists.newArrayList();
List<String> partitionVals = Lists.newArrayList();
for (TPartitionKeyValue kv: partitionSpec) {
return org.apache.hadoop.hive.common.FileUtils.makePartName(partitionCols,
/**
 * Reloads the metadata of partition 'oldPartition' by removing
 * it from the table and reconstructing it from the HMS partition object
 * 'hmsPartition'. If 'oldPartition' is null then nothing is removed and
 * the partition constructed from 'hmsPartition' is simply added.
 */
public void reloadPartition(HdfsPartition oldPartition, Partition hmsPartition)
throws CatalogException {
// Build the replacement from the HMS partition and its storage descriptor.
HdfsPartition refreshedPartition = createPartition(
hmsPartition.getSd(), hmsPartition);
// When an old partition is given, the rebuilt partition must compare equal to it
// (compareTo() == 0); otherwise the caller passed mismatched arguments.
Preconditions.checkArgument(oldPartition == null
|| oldPartition.compareTo(refreshedPartition) == 0);
// NOTE(review): the statements that remove 'oldPartition' and add
// 'refreshedPartition' to the table are not present in this text; confirm
// against the complete file.
* Selects a random sample of files from the given list of partitions such that the sum
* of file sizes is at least 'percentBytes' percent of the total number of bytes in
* those partitions. The sample is returned as a map from partition id to a list of
* file descriptors selected from that partition.
* This function allocates memory proportional to the number of files in 'inputParts'.
* Its implementation tries to minimize the constant factor and object generation.
* The given 'randomSeed' is used for random number generation.
* The 'percentBytes' parameter must be between 0 and 100.
public Map<Long, List<FileDescriptor>> getFilesSample(
Collection<HdfsPartition> inputParts, long percentBytes, long randomSeed) {
Preconditions.checkState(percentBytes >= 0 && percentBytes <= 100);
// Conservative max size for Java arrays. The actual maximum varies
// from JVM version and sometimes between configurations.
final long JVM_MAX_ARRAY_SIZE = Integer.MAX_VALUE - 10;
if (numHdfsFiles_ > JVM_MAX_ARRAY_SIZE) {
throw new IllegalStateException(String.format(
"Too many files to generate a table sample. " +
"Table '%s' has %s files, but a maximum of %s files are supported.",
getTableName().toString(), numHdfsFiles_, JVM_MAX_ARRAY_SIZE));
int totalNumFiles = (int) numHdfsFiles_;
// Ensure a consistent ordering of files for repeatable runs. The files within a
// partition are already ordered based on how they are loaded in the catalog.
List<HdfsPartition> orderedParts = Lists.newArrayList(inputParts);
// fileIdxs contains indexes into the file descriptor lists of all inputParts
// parts[i] contains the partition corresponding to fileIdxs[i]
// fileIdxs[i] is an index into the file descriptor list of the partition parts[i]
// Use max size to avoid looping over inputParts for the exact size.
// The purpose of these arrays is to efficiently avoid selecting the same file
// multiple times during the sampling, regardless of the sample percent. We purposely
// avoid generating objects proportional to the number of files.
int[] fileIdxs = new int[totalNumFiles];
HdfsPartition[] parts = new HdfsPartition[totalNumFiles];
int idx = 0;
long totalBytes = 0;
for (HdfsPartition part: orderedParts) {
totalBytes += part.getSize();
int numFds = part.getNumFileDescriptors();
for (int fileIdx = 0; fileIdx < numFds; ++fileIdx) {
fileIdxs[idx] = fileIdx;
parts[idx] = part;
int numFilesRemaining = idx;
double fracPercentBytes = (double) percentBytes / 100;
long targetBytes = (long) Math.round(totalBytes * fracPercentBytes);
// Randomly select files until targetBytes has been reached or all files have been
// selected.
Random rnd = new Random(randomSeed);
long selectedBytes = 0;
Map<Long, List<FileDescriptor>> result = Maps.newHashMap();
while (selectedBytes < targetBytes && numFilesRemaining > 0) {
int selectedIdx = Math.abs(rnd.nextInt()) % numFilesRemaining;
HdfsPartition part = parts[selectedIdx];
Long partId = Long.valueOf(part.getId());
List<FileDescriptor> sampleFileIdxs = result.get(partId);
if (sampleFileIdxs == null) {
sampleFileIdxs = Lists.newArrayList();
result.put(partId, sampleFileIdxs);
FileDescriptor fd = part.getFileDescriptors().get(fileIdxs[selectedIdx]);
selectedBytes += fd.getFileLength();
// Avoid selecting the same file multiple times.
fileIdxs[selectedIdx] = fileIdxs[numFilesRemaining - 1];
parts[selectedIdx] = parts[numFilesRemaining - 1];
return result;