// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.impala.analysis.ColumnDef;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.LiteralExpr;
import org.apache.impala.analysis.NullLiteral;
import org.apache.impala.analysis.NumericLiteral;
import org.apache.impala.analysis.PartitionKeyValue;
import org.apache.impala.catalog.HdfsPartition.FileBlock;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.Pair;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.common.Reference;
import org.apache.impala.fb.FbFileBlock;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.ImpalaInternalServiceConstants;
import org.apache.impala.thrift.TAccessLevel;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.THdfsPartition;
import org.apache.impala.thrift.THdfsTable;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TPartitionKeyValue;
import org.apache.impala.thrift.TResultRow;
import org.apache.impala.thrift.TResultSet;
import org.apache.impala.thrift.TResultSetMetadata;
import org.apache.impala.thrift.TTable;
import org.apache.impala.thrift.TTableDescriptor;
import org.apache.impala.thrift.TTableType;
import org.apache.impala.util.AvroSchemaConverter;
import org.apache.impala.util.AvroSchemaParser;
import org.apache.impala.util.AvroSchemaUtils;
import org.apache.impala.util.FsPermissionChecker;
import org.apache.impala.util.HdfsCachingUtil;
import org.apache.impala.util.ListMap;
import org.apache.impala.util.MetaStoreUtil;
import org.apache.impala.util.TAccessLevelUtil;
import org.apache.impala.util.TResultRowBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
* Internal representation of table-related metadata of a file-resident table on a
 * Hadoop filesystem. The table data can be accessed through libHDFS (which is an
 * abstraction over Hadoop's FileSystem class rather than HDFS specifically). A
* partitioned table can even span multiple filesystems.
*
* This class is not thread-safe. Clients of this class need to protect against
* concurrent updates using external locking (see CatalogOpExecutor class).
*
* Owned by Catalog instance.
* The partition keys constitute the clustering columns.
*
*/
public class HdfsTable extends Table {
// Hive's default value for the table property 'serialization.null.format'
private static final String DEFAULT_NULL_COLUMN_VALUE = "\\N";
// Name of default partition for unpartitioned tables
private static final String DEFAULT_PARTITION_NAME = "";
// Number of times to retry fetching the partitions from the HMS should an error occur.
private final static int NUM_PARTITION_FETCH_RETRIES = 5;
// Maximum number of errors logged when loading partitioned tables.
private static final int MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG = 100;
// Table property key for skip.header.line.count
public static final String TBL_PROP_SKIP_HEADER_LINE_COUNT = "skip.header.line.count";
// String used to indicate NULL. Set in load() from the table properties.
private String nullColumnValue_;
// Hive uses this string for NULL partition keys. Set in load().
private String nullPartitionKeyValue_;
// Avro schema of this table if this is an Avro table, otherwise null. Set in load().
private String avroSchema_ = null;
// Set to true if any of the partitions have Avro data.
private boolean hasAvroData_ = false;
// True if this table's metadata is marked as cached. Does not necessarily mean the
// data is cached or that all/any partitions are cached.
private boolean isMarkedCached_ = false;
// Array of sorted maps storing the association between partition values and
// partition ids. There is one sorted map per partition key. It is only populated if
// this table object is stored in ImpaladCatalog.
private final ArrayList<TreeMap<LiteralExpr, HashSet<Long>>> partitionValuesMap_ =
Lists.newArrayList();
// Array of partition id sets that correspond to partitions with null values
// in the partition keys; one set per partition key. It is not populated if the table is
// stored in the catalog server.
private final ArrayList<HashSet<Long>> nullPartitionIds_ = Lists.newArrayList();
// Map of partition ids to HdfsPartitions.
private final HashMap<Long, HdfsPartition> partitionMap_ = Maps.newHashMap();
// Map of partition name to HdfsPartition object. Used for speeding up
// table metadata loading.
private final HashMap<String, HdfsPartition> nameToPartitionMap_ = Maps.newHashMap();
// Store all the partition ids of an HdfsTable.
private final HashSet<Long> partitionIds_ = Sets.newHashSet();
// Estimate (in bytes) of the incremental stats size per column per partition
public static final long STATS_SIZE_PER_COLUMN_BYTES = 400;
// Bi-directional map between an integer index and a unique datanode TNetworkAddress;
// each address hosts blocks of one or more files in this table. The network addresses
// are stored using the IP address as the host name. Each FileBlock specifies a list of
// indices within this hostIndex_ to indicate which nodes contain replicas of the block.
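// For example (hypothetical addresses), if hostIndex_ maps index 0 to 10.0.0.1 and
// index 1 to 10.0.0.2, a FileBlock whose replica host indices are [0, 1] has replicas
// on both of those datanodes.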
private final ListMap<TNetworkAddress> hostIndex_ = new ListMap<TNetworkAddress>();
private HdfsPartitionLocationCompressor partitionLocationCompressor_;
// Total number of Hdfs files in this table. Accounted only in the Impalad catalog
// cache. Set to -1 on Catalogd.
private long numHdfsFiles_;
// Sum of sizes of all Hdfs files in this table. Accounted only in the Impalad
// catalog cache. Set to -1 on Catalogd.
private long totalHdfsBytes_;
// True iff the table's partitions are located on more than one filesystem.
private boolean multipleFileSystems_ = false;
// Base Hdfs directory where files of this table are stored.
// For unpartitioned tables it is simply the path where all files live.
// For partitioned tables it is the root directory
// under which partition dirs are placed.
protected String hdfsBaseDir_;
// List of FieldSchemas that correspond to the non-partition columns. Used when
// describing this table and its partitions to the HMS (e.g. as part of an alter table
// operation), when only non-partition columns are required.
private final List<FieldSchema> nonPartFieldSchemas_ = Lists.newArrayList();
// Flag to check if the table schema has been loaded. Used as a precondition
// for setAvroSchema().
private boolean isSchemaLoaded_ = false;
private final static Logger LOG = LoggerFactory.getLogger(HdfsTable.class);
// Caching this configuration object makes calls to getFileSystem much quicker
// (saves ~50ms on a standard plan)
// TODO(henry): confirm that this is thread safe - cursory inspection of the class
// and its usage in getFileSystem suggests it should be.
private static final Configuration CONF = new Configuration();
private static final int MAX_HDFS_PARTITIONS_PARALLEL_LOAD =
BackendConfig.INSTANCE.maxHdfsPartsParallelLoad();
private static final int MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD =
BackendConfig.INSTANCE.maxNonHdfsPartsParallelLoad();
// File/Block metadata loading stats for a single HDFS path.
private class FileMetadataLoadStats {
// Path corresponding to this metadata load request.
private final Path hdfsPath;
// Number of files for which the metadata was loaded.
public int loadedFiles = 0;
// Number of hidden files excluded from file metadata loading. More details at
// isValidDataFile().
public int hiddenFiles = 0;
// Number of files skipped from file metadata loading because the files have not
// changed since the last load. More details at hasFileChanged().
public int skippedFiles = 0;
// Number of unknown disk IDs encountered while loading block
// metadata for this path.
public long unknownDiskIds = 0;
public FileMetadataLoadStats(Path path) { hdfsPath = path; }
public String debugString() {
Preconditions.checkNotNull(hdfsPath);
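      // Example of the resulting string (hypothetical path and counts):
      //   Path: hdfs://nn:8020/warehouse/t/p=1: Loaded files: 10 Hidden files: 1
      //   Skipped files: 0 Unknown diskIDs: 0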
return String.format("Path: %s: Loaded files: %s Hidden files: %s " +
"Skipped files: %s Unknown diskIDs: %s", hdfsPath, loadedFiles, hiddenFiles,
skippedFiles, unknownDiskIds);
}
}
// A callable implementation of file metadata loading request for a given
// HDFS path.
public class FileMetadataLoadRequest
implements Callable<FileMetadataLoadStats> {
private final Path hdfsPath_;
// All the partitions mapped to the above path
private final List<HdfsPartition> partitionList_;
// If set to true, reloads the file metadata only when the files in this path
// have changed since last load (more details in hasFileChanged()).
private final boolean reuseFileMd_;
public FileMetadataLoadRequest(Path path, List<HdfsPartition> partitions,
boolean reuseFileMd) {
hdfsPath_ = path;
partitionList_ = partitions;
reuseFileMd_ = reuseFileMd;
}
@Override
public FileMetadataLoadStats call() throws IOException {
FileMetadataLoadStats loadingStats =
reuseFileMd_ ? refreshFileMetadata(hdfsPath_, partitionList_) :
resetAndLoadFileMetadata(hdfsPath_, partitionList_);
return loadingStats;
}
public String debugString() {
String loadType = reuseFileMd_ ? "Refreshed" : "Loaded";
return String.format("%s file metadata for path: %s", loadType,
hdfsPath_.toString());
}
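    // Usage sketch (illustrative; mirrors what loadMetadataAndDiskIds() below does):
    // requests are submitted to a thread pool and their stats collected via futures.
    //   ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    //   Future<FileMetadataLoadStats> future = pool.submit(
    //       new FileMetadataLoadRequest(path, partitions, /*reuseFileMd=*/true));
    //   FileMetadataLoadStats stats = future.get();
    // 'poolSize', 'path' and 'partitions' above are placeholders.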
}
public HdfsTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
Db db, String name, String owner) {
super(msTbl, db, name, owner);
partitionLocationCompressor_ =
new HdfsPartitionLocationCompressor(numClusteringCols_);
}
/**
* Returns true if the table resides at a location which supports caching (e.g. HDFS).
*/
public boolean isLocationCacheable() {
return FileSystemUtil.isPathCacheable(new Path(getLocation()));
}
/**
* Returns true if the table and all its partitions reside at locations which
* support caching (e.g. HDFS).
*/
public boolean isCacheable() {
if (!isLocationCacheable()) return false;
if (!isMarkedCached() && numClusteringCols_ > 0) {
for (HdfsPartition partition: getPartitions()) {
if (partition.getId() == ImpalaInternalServiceConstants.DEFAULT_PARTITION_ID) {
continue;
}
if (!partition.isCacheable()) {
return false;
}
}
}
return true;
}
/**
* Updates numHdfsFiles_ and totalHdfsBytes_ based on the partition information.
* This is used only for the frontend tests that do not spawn a separate Catalog
* instance.
*/
public void computeHdfsStatsForTesting() {
Preconditions.checkState(numHdfsFiles_ == -1 && totalHdfsBytes_ == -1);
numHdfsFiles_ = 0;
totalHdfsBytes_ = 0;
for (HdfsPartition partition: partitionMap_.values()) {
numHdfsFiles_ += partition.getNumFileDescriptors();
totalHdfsBytes_ += partition.getSize();
}
}
/**
* Drops and re-loads the file metadata of all the partitions in 'partitions' that
* map to the path 'partDir'. 'partDir' may belong to any file system that
* implements the hadoop's FileSystem interface (like HDFS, S3, ADLS etc.). It involves
* the following steps:
* - Clear the current file metadata of the partitions.
* - Call FileSystem.listFiles() on 'partDir' to fetch the FileStatus and BlockLocations
* for each file under it.
 * - For every valid data file, enumerate all its blocks and their corresponding hosts
 *   and disk IDs if the underlying file system supports the block locations API
 *   (e.g. HDFS). For file systems that don't support that API (e.g. S3), synthesize the
 *   file blocks by manually splitting the file range into fixed-size blocks, so that
 *   scan ranges can still be derived from file blocks as usual. All synthesized blocks
 *   are given an invalid network address so that the scheduler will treat them as
 *   remote.
*/
private FileMetadataLoadStats resetAndLoadFileMetadata(
Path partDir, List<HdfsPartition> partitions) throws IOException {
FileMetadataLoadStats loadStats = new FileMetadataLoadStats(partDir);
// No need to load blocks for empty partitions list.
if (partitions == null || partitions.isEmpty()) return loadStats;
if (LOG.isTraceEnabled()) {
LOG.trace("Loading block md for " + getFullName() + " path: " + partDir.toString());
}
FileSystem fs = partDir.getFileSystem(CONF);
boolean synthesizeFileMd = !FileSystemUtil.supportsStorageIds(fs);
RemoteIterator<LocatedFileStatus> fileStatusIter =
FileSystemUtil.listFiles(fs, partDir, false);
if (fileStatusIter == null) return loadStats;
Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
List<FileDescriptor> newFileDescs = Lists.newArrayList();
while (fileStatusIter.hasNext()) {
LocatedFileStatus fileStatus = fileStatusIter.next();
if (!FileSystemUtil.isValidDataFile(fileStatus)) {
++loadStats.hiddenFiles;
continue;
}
FileDescriptor fd;
// Block locations are manually synthesized if the underlying fs does not support
// the block location API.
if (synthesizeFileMd) {
fd = FileDescriptor.createWithSynthesizedBlockMd(fileStatus,
partitions.get(0).getFileFormat(), hostIndex_);
} else {
fd = FileDescriptor.create(fileStatus,
fileStatus.getBlockLocations(), fs, hostIndex_, numUnknownDiskIds);
}
newFileDescs.add(fd);
++loadStats.loadedFiles;
}
for (HdfsPartition partition: partitions) partition.setFileDescriptors(newFileDescs);
loadStats.unknownDiskIds += numUnknownDiskIds.getRef();
if (LOG.isTraceEnabled()) {
LOG.trace("Loaded file metadata for " + getFullName() + " " +
loadStats.debugString());
}
return loadStats;
}
/**
* Refreshes file metadata information for 'path'. This method is optimized for
* the case where the files in the partition have not changed dramatically. It first
* uses a listStatus() call on the partition directory to detect the modified files
* (look at hasFileChanged()) and fetches their block locations using the
* getFileBlockLocations() method. Our benchmarks suggest that the listStatus() call
 * is much faster than listFiles() (up to ~40x faster in some cases).
*/
private FileMetadataLoadStats refreshFileMetadata(
Path partDir, List<HdfsPartition> partitions) throws IOException {
FileMetadataLoadStats loadStats = new FileMetadataLoadStats(partDir);
// No need to load blocks for empty partitions list.
if (partitions == null || partitions.isEmpty()) return loadStats;
if (LOG.isTraceEnabled()) {
LOG.trace("Refreshing block md for " + getFullName() + " path: " +
partDir.toString());
}
// Index the partition file descriptors by their file names for O(1) look ups.
// We just pick the first partition to generate the fileDescsByName lookup table
// since all the partitions map to the same partDir.
ImmutableMap<String, FileDescriptor> fileDescsByName = Maps.uniqueIndex(
partitions.get(0).getFileDescriptors(), new Function<FileDescriptor, String>() {
@Override
public String apply(FileDescriptor desc) { return desc.getFileName(); }
});
FileSystem fs = partDir.getFileSystem(CONF);
FileStatus[] fileStatuses = FileSystemUtil.listStatus(fs, partDir);
if (fileStatuses == null) return loadStats;
boolean synthesizeFileMd = !FileSystemUtil.supportsStorageIds(fs);
Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
List<FileDescriptor> newFileDescs = Lists.newArrayList();
HdfsFileFormat fileFormat = partitions.get(0).getFileFormat();
// If there is a cached partition mapped to this path, we recompute the block
// locations even if the underlying files have not changed (hasFileChanged()).
// This is done to keep the cached block metadata up to date.
boolean isPartitionMarkedCached = false;
for (HdfsPartition partition: partitions) {
if (partition.isMarkedCached()) {
isPartitionMarkedCached = true;
break;
}
}
for (FileStatus fileStatus: fileStatuses) {
if (!FileSystemUtil.isValidDataFile(fileStatus)) {
++loadStats.hiddenFiles;
continue;
}
String fileName = fileStatus.getPath().getName().toString();
FileDescriptor fd = fileDescsByName.get(fileName);
if (isPartitionMarkedCached || hasFileChanged(fd, fileStatus)) {
if (synthesizeFileMd) {
fd = FileDescriptor.createWithSynthesizedBlockMd(fileStatus,
fileFormat, hostIndex_);
} else {
BlockLocation[] locations =
fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
fd = FileDescriptor.create(fileStatus, locations, fs, hostIndex_,
numUnknownDiskIds);
}
++loadStats.loadedFiles;
} else {
++loadStats.skippedFiles;
}
Preconditions.checkNotNull(fd);
newFileDescs.add(fd);
}
loadStats.unknownDiskIds += numUnknownDiskIds.getRef();
for (HdfsPartition partition: partitions) partition.setFileDescriptors(newFileDescs);
if (LOG.isTraceEnabled()) {
LOG.trace("Refreshed file metadata for " + getFullName() + " "
+ loadStats.debugString());
}
return loadStats;
}
/**
* Compares the modification time and file size between the FileDescriptor and the
* FileStatus to determine if the file has changed. Returns true if the file has changed
* and false otherwise.
*/
private static boolean hasFileChanged(FileDescriptor fd, FileStatus status) {
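    // 'fd == null' means there is no cached descriptor for this file (i.e. a newly
    // added file); a changed length or modification time indicates the file contents
    // were rewritten since the last load.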
return (fd == null) || (fd.getFileLength() != status.getLen()) ||
(fd.getModificationTime() != status.getModificationTime());
}
/**
* Helper method to reload the file metadata of a single partition.
*/
private void refreshPartitionFileMetadata(HdfsPartition partition)
throws CatalogException {
try {
Path partDir = partition.getLocationPath();
FileMetadataLoadStats stats = refreshFileMetadata(partDir,
Lists.newArrayList(partition));
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Refreshed file metadata for %s %s",
getFullName(), stats.debugString()));
}
} catch (IOException e) {
throw new CatalogException("Encountered invalid partition path", e);
}
}
@Override
public TCatalogObjectType getCatalogObjectType() {
return TCatalogObjectType.TABLE;
}
public boolean isMarkedCached() { return isMarkedCached_; }
public Collection<HdfsPartition> getPartitions() { return partitionMap_.values(); }
public Map<Long, HdfsPartition> getPartitionMap() { return partitionMap_; }
public Set<Long> getNullPartitionIds(int i) { return nullPartitionIds_.get(i); }
public HdfsPartitionLocationCompressor getPartitionLocationCompressor() {
return partitionLocationCompressor_;
}
public Set<Long> getPartitionIds() { return partitionIds_; }
public TreeMap<LiteralExpr, HashSet<Long>> getPartitionValueMap(int i) {
return partitionValuesMap_.get(i);
}
/**
* Returns the value Hive is configured to use for NULL partition key values.
* Set during load.
*/
public String getNullPartitionKeyValue() { return nullPartitionKeyValue_; }
/**
* Returns the storage location (HDFS path) of this table.
*/
public String getLocation() {
return super.getMetaStoreTable().getSd().getLocation();
}
List<FieldSchema> getNonPartitionFieldSchemas() { return nonPartFieldSchemas_; }
// True if Impala has HDFS write permissions on the hdfsBaseDir (for an unpartitioned
// table) or if Impala has write permissions on all partition directories (for
// a partitioned table).
public boolean hasWriteAccess() {
return TAccessLevelUtil.impliesWriteAccess(accessLevel_);
}
/**
* Returns the first location (HDFS path) that Impala does not have WRITE access
 * to, or null if none is found. For an unpartitioned table, this just
* checks the hdfsBaseDir. For a partitioned table it checks all partition directories.
*/
public String getFirstLocationWithoutWriteAccess() {
if (getMetaStoreTable() == null) return null;
if (getMetaStoreTable().getPartitionKeysSize() == 0) {
if (!TAccessLevelUtil.impliesWriteAccess(accessLevel_)) {
return hdfsBaseDir_;
}
} else {
for (HdfsPartition partition: partitionMap_.values()) {
if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
return partition.getLocation();
}
}
}
return null;
}
/**
* Gets the HdfsPartition matching the given partition spec. Returns null if no match
* was found.
*/
public HdfsPartition getPartition(
List<PartitionKeyValue> partitionSpec) {
List<TPartitionKeyValue> partitionKeyValues = Lists.newArrayList();
for (PartitionKeyValue kv: partitionSpec) {
String value = PartitionKeyValue.getPartitionKeyValueString(
kv.getLiteralValue(), getNullPartitionKeyValue());
partitionKeyValues.add(new TPartitionKeyValue(kv.getColName(), value));
}
return getPartitionFromThriftPartitionSpec(partitionKeyValues);
}
/**
* Gets the HdfsPartition matching the Thrift version of the partition spec.
* Returns null if no match was found.
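 * For example (hypothetical table partitioned by (year, month)), the spec
 * [TPartitionKeyValue("year", "2017"), TPartitionKeyValue("month", "3")] returns the
 * partition (year=2017, month=3) if it is loaded, and null otherwise.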
*/
public HdfsPartition getPartitionFromThriftPartitionSpec(
List<TPartitionKeyValue> partitionSpec) {
// First, build a list of the partition values to search for in the same order they
// are defined in the table.
List<String> targetValues = Lists.newArrayList();
Set<String> keys = Sets.newHashSet();
for (FieldSchema fs: getMetaStoreTable().getPartitionKeys()) {
for (TPartitionKeyValue kv: partitionSpec) {
if (fs.getName().toLowerCase().equals(kv.getName().toLowerCase())) {
targetValues.add(kv.getValue());
// Same key was specified twice
if (!keys.add(kv.getName().toLowerCase())) {
return null;
}
}
}
}
// Make sure the number of values match up and that some values were found.
if (targetValues.size() == 0 ||
(targetValues.size() != getMetaStoreTable().getPartitionKeysSize())) {
return null;
}
// Search through all the partitions and check if their partition key values
// match the values being searched for.
for (HdfsPartition partition: partitionMap_.values()) {
if (partition.isDefaultPartition()) continue;
List<LiteralExpr> partitionValues = partition.getPartitionValues();
Preconditions.checkState(partitionValues.size() == targetValues.size());
boolean matchFound = true;
for (int i = 0; i < targetValues.size(); ++i) {
String value;
if (partitionValues.get(i) instanceof NullLiteral) {
value = getNullPartitionKeyValue();
} else {
value = partitionValues.get(i).getStringValue();
Preconditions.checkNotNull(value);
// See IMPALA-252: we deliberately map empty strings onto
// NULL when they're in partition columns. This is for
// backwards compatibility with Hive, and is clearly broken.
if (value.isEmpty()) value = getNullPartitionKeyValue();
}
if (!targetValues.get(i).equals(value)) {
matchFound = false;
break;
}
}
if (matchFound) {
return partition;
}
}
return null;
}
/**
* Gets hdfs partitions by the given partition set.
*/
public List<HdfsPartition> getPartitionsFromPartitionSet(
List<List<TPartitionKeyValue>> partitionSet) {
List<HdfsPartition> partitions = Lists.newArrayList();
for (List<TPartitionKeyValue> kv : partitionSet) {
HdfsPartition partition =
getPartitionFromThriftPartitionSpec(kv);
if (partition != null) partitions.add(partition);
}
return partitions;
}
/**
* Create columns corresponding to fieldSchemas. Throws a TableLoadingException if the
* metadata is incompatible with what we support.
*/
private void addColumnsFromFieldSchemas(List<FieldSchema> fieldSchemas)
throws TableLoadingException {
int pos = colsByPos_.size();
for (FieldSchema s: fieldSchemas) {
Type type = parseColumnType(s);
// Check if we support partitioning on columns of such a type.
if (pos < numClusteringCols_ && !type.supportsTablePartitioning()) {
throw new TableLoadingException(
String.format("Failed to load metadata for table '%s' because of " +
"unsupported partition-column type '%s' in partition column '%s'",
getFullName(), type.toString(), s.getName()));
}
Column col = new Column(s.getName(), type, s.getComment(), pos);
addColumn(col);
++pos;
}
}
/**
* Clear the partitions of an HdfsTable and the associated metadata.
*/
private void resetPartitions() {
partitionIds_.clear();
partitionMap_.clear();
nameToPartitionMap_.clear();
partitionValuesMap_.clear();
nullPartitionIds_.clear();
if (isStoredInImpaladCatalogCache()) {
// Initialize partitionValuesMap_ and nullPartitionIds_. Also reset column stats.
for (int i = 0; i < numClusteringCols_; ++i) {
getColumns().get(i).getStats().setNumNulls(0);
getColumns().get(i).getStats().setNumDistinctValues(0);
partitionValuesMap_.add(Maps.<LiteralExpr, HashSet<Long>>newTreeMap());
nullPartitionIds_.add(Sets.<Long>newHashSet());
}
}
numHdfsFiles_ = 0;
totalHdfsBytes_ = 0;
}
/**
* Resets any partition metadata, creates the default partition and sets the base
* table directory path as well as the caching info from the HMS table.
*/
private void initializePartitionMetadata(
org.apache.hadoop.hive.metastore.api.Table msTbl) throws CatalogException {
Preconditions.checkNotNull(msTbl);
resetPartitions();
hdfsBaseDir_ = msTbl.getSd().getLocation();
// INSERT statements need to refer to this if they try to write to new partitions
// Scans don't refer to this because by definition all partitions they refer to
// exist.
addDefaultPartition(msTbl.getSd());
// We silently ignore cache directives that no longer exist in HDFS, and remove
// non-existing cache directives from the parameters.
isMarkedCached_ = HdfsCachingUtil.validateCacheParams(msTbl.getParameters());
}
/**
* Create HdfsPartition objects corresponding to 'msPartitions' and add them to this
* table's partition list. Any partition metadata will be reset and loaded from
* scratch. For each partition created, we load the block metadata for each data file
* under it.
*
* If there are no partitions in the Hive metadata, a single partition is added with no
* partition keys.
*/
private void loadAllPartitions(
List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions,
org.apache.hadoop.hive.metastore.api.Table msTbl) throws IOException,
CatalogException {
Preconditions.checkNotNull(msTbl);
initializePartitionMetadata(msTbl);
// Map of partition paths to their corresponding HdfsPartition objects. Populated
// using createPartition() calls. A single partition path can correspond to multiple
// partitions.
HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
if (msTbl.getPartitionKeysSize() == 0) {
Preconditions.checkArgument(msPartitions == null || msPartitions.isEmpty());
// This table has no partition key, which means it has no declared partitions.
// We model partitions slightly differently to Hive - every file must exist in a
// partition, so add a single partition with no keys which will get all the
// files in the table's root directory.
Path tblLocation = FileSystemUtil.createFullyQualifiedPath(getHdfsBaseDirPath());
HdfsPartition part = createPartition(msTbl.getSd(), null);
partsByPath.put(tblLocation, Lists.newArrayList(part));
if (isMarkedCached_) part.markCached();
addPartition(part);
FileSystem fs = tblLocation.getFileSystem(CONF);
accessLevel_ = getAvailableAccessLevel(fs, tblLocation);
} else {
for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) {
        HdfsPartition partition = createPartition(msPartition.getSd(), msPartition);
        // If the partition is null, its HDFS path does not exist, and it should not be
        // added to this table's partition list. Skip the partition.
        if (partition == null) continue;
        addPartition(partition);
if (msPartition.getParameters() != null) {
partition.setNumRows(getRowCount(msPartition.getParameters()));
}
if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
        // TODO: READ_ONLY isn't exactly correct because it's possible the
// partition does not have READ permissions either. When we start checking
// whether we can READ from a table, this should be updated to set the
// table's access level to the "lowest" effective level across all
// partitions. That is, if one partition has READ_ONLY and another has
// WRITE_ONLY the table's access level should be NONE.
accessLevel_ = TAccessLevel.READ_ONLY;
}
Path partDir = FileSystemUtil.createFullyQualifiedPath(
new Path(msPartition.getSd().getLocation()));
List<HdfsPartition> parts = partsByPath.get(partDir);
if (parts == null) {
partsByPath.put(partDir, Lists.newArrayList(partition));
} else {
parts.add(partition);
}
}
}
// Load the file metadata from scratch.
loadMetadataAndDiskIds(partsByPath, false);
}
/**
* Helper method to load the partition file metadata from scratch. This method is
* optimized for loading newly added partitions. For refreshing existing partitions
* use refreshPartitionFileMetadata(HdfsPartition).
*/
private void resetAndLoadPartitionFileMetadata(HdfsPartition partition) {
Path partDir = partition.getLocationPath();
try {
FileMetadataLoadStats stats =
resetAndLoadFileMetadata(partDir, Lists.newArrayList(partition));
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Loaded file metadata for %s %s", getFullName(),
stats.debugString()));
}
} catch (Exception e) {
LOG.error("Error loading file metadata for path: " + partDir.toString() +
". Partitions file metadata could be partially loaded.", e);
}
}
/**
* Returns the thread pool size to load the file metadata of this table.
* 'numPaths' is the number of paths for which the file metadata should be loaded.
*
* We use different thread pool sizes for HDFS and non-HDFS tables since the latter
* supports much higher throughput of RPC calls for listStatus/listFiles. For
* simplicity, the filesystem type is determined based on the table's root path and
* not for each partition individually. Based on our experiments, S3 showed a linear
 * speed-up (up to ~100x) with an increasing number of loading threads, whereas the
 * HDFS throughput was limited to ~5x in unsecured clusters and up to ~3.7x in secure
 * clusters. We narrowed this down to scalability bottlenecks in the HDFS RPC
 * implementation
* (HADOOP-14558) on both the server and the client side.
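 * For example (hypothetical values), with MAX_HDFS_PARTITIONS_PARALLEL_LOAD = 5 and
 * three HDFS paths to load, the returned pool size is min(3, 5) = 3.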
*/
private int getLoadingThreadPoolSize(int numPaths) throws CatalogException {
Preconditions.checkState(numPaths > 0);
FileSystem tableFs;
try {
tableFs = (new Path(getLocation())).getFileSystem(CONF);
} catch (IOException e) {
throw new CatalogException("Invalid table path for table: " + getFullName(), e);
}
int threadPoolSize = FileSystemUtil.supportsStorageIds(tableFs) ?
MAX_HDFS_PARTITIONS_PARALLEL_LOAD : MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD;
// Thread pool size need not exceed the number of paths to be loaded.
return Math.min(numPaths, threadPoolSize);
}
/**
* Helper method to load the block locations for each partition directory in
* partsByPath using a thread pool. 'partsByPath' maps each partition directory to
* the corresponding HdfsPartition objects. If 'reuseFileMd' is true, the block
* metadata is incrementally refreshed, else it is reloaded from scratch.
*/
private void loadMetadataAndDiskIds(Map<Path, List<HdfsPartition>> partsByPath,
boolean reuseFileMd) throws CatalogException {
int numPathsToLoad = partsByPath.size();
// For tables without partitions we have no file metadata to load.
if (numPathsToLoad == 0) return;
int threadPoolSize = getLoadingThreadPoolSize(numPathsToLoad);
LOG.info(String.format("Loading file and block metadata for %s paths for table %s " +
"using a thread pool of size %s", numPathsToLoad, getFullName(),
threadPoolSize));
ExecutorService partitionLoadingPool = Executors.newFixedThreadPool(threadPoolSize);
try {
List<Future<FileMetadataLoadStats>> pendingMdLoadTasks = Lists.newArrayList();
for (Path p: partsByPath.keySet()) {
FileMetadataLoadRequest blockMdLoadReq =
new FileMetadataLoadRequest(p, partsByPath.get(p), reuseFileMd);
pendingMdLoadTasks.add(partitionLoadingPool.submit(blockMdLoadReq));
}
// Wait for the partition load tasks to finish.
int failedLoadTasks = 0;
for (Future<FileMetadataLoadStats> task: pendingMdLoadTasks) {
try {
FileMetadataLoadStats loadStats = task.get();
if (LOG.isTraceEnabled()) LOG.trace(loadStats.debugString());
} catch (ExecutionException | InterruptedException e) {
++failedLoadTasks;
if (failedLoadTasks <= MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG) {
LOG.error("Encountered an error loading block metadata for table: " +
getFullName(), e);
}
}
}
if (failedLoadTasks > 0) {
int errorsNotLogged = failedLoadTasks - MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG;
if (errorsNotLogged > 0) {
LOG.error(String.format("Error loading file metadata for %s paths for table " +
"%s. Only the first %s errors were logged.", failedLoadTasks, getFullName(),
MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG));
}
throw new TableLoadingException(String.format("Failed to load file metadata "
+ "for %s paths for table %s. Table's file metadata could be partially "
+ "loaded. Check the Catalog server log for more details.", failedLoadTasks,
getFullName()));
}
} finally {
partitionLoadingPool.shutdown();
}
LOG.info(String.format("Loaded file and block metadata for %s", getFullName()));
}
/**
* Gets the AccessLevel that is available for Impala for this table based on the
* permissions Impala has on the given path. If the path does not exist, recurses up
 * the path until an existing parent directory is found, and inherits the access
 * permissions from that directory.
* Always returns READ_WRITE for S3 and ADLS files.
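 * For example (hypothetical path), if a partition location /warehouse/t/p=1 does not
 * exist yet (e.g. it is about to be created by an INSERT), the permissions of the
 * nearest existing ancestor, such as /warehouse/t, are used instead.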
*/
private TAccessLevel getAvailableAccessLevel(FileSystem fs, Path location)
throws IOException {
// Avoid calling getPermissions() on file path for S3 files, as that makes a round
// trip to S3. Also, the S3A connector is currently unable to manage S3 permissions,
    // so for now it is safe to assume that all files (objects) have READ_WRITE
// permissions, as that's what the S3A connector will always return too.
// TODO: Revisit if the S3A connector is updated to be able to manage S3 object
// permissions. (see HADOOP-13892)
if (FileSystemUtil.isS3AFileSystem(fs)) return TAccessLevel.READ_WRITE;
// The ADLS connector currently returns ACLs for files in ADLS, but can only map
// them to the ADLS client SPI and not the Hadoop users/groups, causing unexpected
// behavior. So ADLS ACLs are unsupported until the connector is able to map
// permissions to hadoop users/groups (HADOOP-14437).
if (FileSystemUtil.isADLFileSystem(fs)) return TAccessLevel.READ_WRITE;
FsPermissionChecker permissionChecker = FsPermissionChecker.getInstance();
while (location != null) {
try {
FsPermissionChecker.Permissions perms =
permissionChecker.getPermissions(fs, location);
if (perms.canReadAndWrite()) {
return TAccessLevel.READ_WRITE;
} else if (perms.canRead()) {
return TAccessLevel.READ_ONLY;
} else if (perms.canWrite()) {
return TAccessLevel.WRITE_ONLY;
}
return TAccessLevel.NONE;
} catch (FileNotFoundException e) {
location = location.getParent();
}
}
// Should never get here.
Preconditions.checkNotNull(location, "Error: no path ancestor exists");
return TAccessLevel.NONE;
}
/**
* Creates a new HdfsPartition object to be added to HdfsTable's partition list.
* Partitions may be empty, or may not even exist in the filesystem (a partition's
* location may have been changed to a new path that is about to be created by an
* INSERT). Also loads the file metadata for this partition. Returns new partition
* if successful or null if none was created.
*
* Throws CatalogException if the supplied storage descriptor contains metadata that
* Impala can't understand.
*/
public HdfsPartition createAndLoadPartition(
org.apache.hadoop.hive.metastore.api.Partition msPartition)
throws CatalogException {
HdfsPartition hdfsPartition = createPartition(msPartition.getSd(), msPartition);
resetAndLoadPartitionFileMetadata(hdfsPartition);
return hdfsPartition;
}
/**
* Same as createAndLoadPartition() but is optimized for loading file metadata of
* newly created HdfsPartitions in parallel.
*/
public List<HdfsPartition> createAndLoadPartitions(
List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions)
throws CatalogException {
HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
List<HdfsPartition> addedParts = Lists.newArrayList();
for (org.apache.hadoop.hive.metastore.api.Partition partition: msPartitions) {
HdfsPartition hdfsPartition = createPartition(partition.getSd(), partition);
Preconditions.checkNotNull(hdfsPartition);
addedParts.add(hdfsPartition);
Path partitionPath = hdfsPartition.getLocationPath();
List<HdfsPartition> hdfsPartitions = partsByPath.get(partitionPath);
if (hdfsPartitions == null) {
partsByPath.put(partitionPath, Lists.newArrayList(hdfsPartition));
} else {
hdfsPartitions.add(hdfsPartition);
}
}
loadMetadataAndDiskIds(partsByPath, false);
return addedParts;
}
/**
* Creates a new HdfsPartition from a specified StorageDescriptor and an HMS partition
* object.
*/
private HdfsPartition createPartition(StorageDescriptor storageDescriptor,
org.apache.hadoop.hive.metastore.api.Partition msPartition)
throws CatalogException {
HdfsStorageDescriptor fileFormatDescriptor =
HdfsStorageDescriptor.fromStorageDescriptor(this.name_, storageDescriptor);
List<LiteralExpr> keyValues = Lists.newArrayList();
if (msPartition != null) {
// Load key values
for (String partitionKey: msPartition.getValues()) {
Type type = getColumns().get(keyValues.size()).getType();
// Deal with Hive's special NULL partition key.
if (partitionKey.equals(nullPartitionKeyValue_)) {
keyValues.add(NullLiteral.create(type));
} else {
try {
keyValues.add(LiteralExpr.create(partitionKey, type));
} catch (Exception ex) {
LOG.warn("Failed to create literal expression of type: " + type, ex);
throw new CatalogException("Invalid partition key value of type: " + type,
ex);
}
}
}
for (Expr v: keyValues) v.analyzeNoThrow(null);
}
Path partDirPath = new Path(storageDescriptor.getLocation());
try {
FileSystem fs = partDirPath.getFileSystem(CONF);
multipleFileSystems_ = multipleFileSystems_ ||
!FileSystemUtil.isPathOnFileSystem(new Path(getLocation()), fs);
if (msPartition != null) {
HdfsCachingUtil.validateCacheParams(msPartition.getParameters());
}
HdfsPartition partition =
new HdfsPartition(this, msPartition, keyValues, fileFormatDescriptor,
new ArrayList<FileDescriptor>(), getAvailableAccessLevel(fs, partDirPath));
partition.checkWellFormed();
return partition;
} catch (IOException e) {
throw new CatalogException("Error initializing partition", e);
}
}
/**
* Adds the partition to the HdfsTable. Throws a CatalogException if the partition
* already exists in this table.
*/
public void addPartition(HdfsPartition partition) throws CatalogException {
if (partitionMap_.containsKey(partition.getId())) {
throw new CatalogException(String.format("Partition %s already exists in table %s",
partition.getPartitionName(), getFullName()));
}
if (partition.getFileFormat() == HdfsFileFormat.AVRO) hasAvroData_ = true;
partitionMap_.put(partition.getId(), partition);
totalHdfsBytes_ += partition.getSize();
numHdfsFiles_ += partition.getNumFileDescriptors();
updatePartitionMdAndColStats(partition);
}
/**
* Updates the HdfsTable's partition metadata, i.e. adds the id to the HdfsTable and
* populates structures used for speeding up partition pruning/lookup. Also updates
* column stats.
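 * For example (hypothetical table partitioned by (year, month)), adding partitions
 * (2017, 1) and (2017, 2) maps the literal 2017 to both partition ids in
 * partitionValuesMap_.get(0), yielding numDistinctValues = 1 for 'year' and 2 for
 * 'month'.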
*/
private void updatePartitionMdAndColStats(HdfsPartition partition) {
if (partition.getPartitionValues().size() != numClusteringCols_) return;
partitionIds_.add(partition.getId());
nameToPartitionMap_.put(partition.getPartitionName(), partition);
if (!isStoredInImpaladCatalogCache()) return;
for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
ColumnStats stats = getColumns().get(i).getStats();
LiteralExpr literal = partition.getPartitionValues().get(i);
// Store partitions with null partition values separately
if (literal instanceof NullLiteral) {
stats.setNumNulls(stats.getNumNulls() + 1);
if (nullPartitionIds_.get(i).isEmpty()) {
stats.setNumDistinctValues(stats.getNumDistinctValues() + 1);
}
nullPartitionIds_.get(i).add(Long.valueOf(partition.getId()));
continue;
}
HashSet<Long> partitionIds = partitionValuesMap_.get(i).get(literal);
if (partitionIds == null) {
partitionIds = Sets.newHashSet();
partitionValuesMap_.get(i).put(literal, partitionIds);
stats.setNumDistinctValues(stats.getNumDistinctValues() + 1);
}
partitionIds.add(Long.valueOf(partition.getId()));
}
}
/**
* Drops the partition having the given partition spec from HdfsTable. Cleans up its
* metadata from all the mappings used to speed up partition pruning/lookup.
* Also updates partition column statistics. Given partitionSpec must match exactly
* one partition.
* Returns the HdfsPartition that was dropped. If the partition does not exist, returns
* null.
*/
public HdfsPartition dropPartition(List<TPartitionKeyValue> partitionSpec) {
return dropPartition(getPartitionFromThriftPartitionSpec(partitionSpec));
}
/**
* Drops a partition and updates partition column statistics. Returns the
* HdfsPartition that was dropped or null if the partition does not exist.
*/
private HdfsPartition dropPartition(HdfsPartition partition) {
if (partition == null) return null;
totalHdfsBytes_ -= partition.getSize();
numHdfsFiles_ -= partition.getNumFileDescriptors();
Preconditions.checkArgument(partition.getPartitionValues().size() ==
numClusteringCols_);
Long partitionId = partition.getId();
// Remove the partition id from the list of partition ids and other mappings.
partitionIds_.remove(partitionId);
partitionMap_.remove(partitionId);
nameToPartitionMap_.remove(partition.getPartitionName());
if (!isStoredInImpaladCatalogCache()) return partition;
for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
ColumnStats stats = getColumns().get(i).getStats();
LiteralExpr literal = partition.getPartitionValues().get(i);
// Check if this is a null literal.
if (literal instanceof NullLiteral) {
nullPartitionIds_.get(i).remove(partitionId);
stats.setNumNulls(stats.getNumNulls() - 1);
if (nullPartitionIds_.get(i).isEmpty()) {
stats.setNumDistinctValues(stats.getNumDistinctValues() - 1);
}
continue;
}
HashSet<Long> partitionIds = partitionValuesMap_.get(i).get(literal);
// If there are multiple partition ids corresponding to a literal, remove
// only this id. Otherwise, remove the <literal, id> pair.
if (partitionIds.size() > 1) partitionIds.remove(partitionId);
else {
partitionValuesMap_.get(i).remove(literal);
stats.setNumDistinctValues(stats.getNumDistinctValues() - 1);
}
}
return partition;
}
/**
* Drops the given partitions from this table. Cleans up its metadata from all the
* mappings used to speed up partition pruning/lookup. Also updates partitions column
* statistics. Returns the list of partitions that were dropped.
*/
public List<HdfsPartition> dropPartitions(List<HdfsPartition> partitions) {
ArrayList<HdfsPartition> droppedPartitions = Lists.newArrayList();
for (HdfsPartition partition: partitions) {
HdfsPartition hdfsPartition = dropPartition(partition);
if (hdfsPartition != null) droppedPartitions.add(hdfsPartition);
}
return droppedPartitions;
}
/**
* Adds or replaces the default partition.
*/
public void addDefaultPartition(StorageDescriptor storageDescriptor)
throws CatalogException {
// Default partition has no files and is not referred to by scan nodes. Data sinks
// refer to this to understand how to create new partitions.
HdfsStorageDescriptor hdfsStorageDescriptor =
HdfsStorageDescriptor.fromStorageDescriptor(this.name_, storageDescriptor);
HdfsPartition partition = HdfsPartition.defaultPartition(this,
hdfsStorageDescriptor);
partitionMap_.put(partition.getId(), partition);
}
@Override
public void load(boolean reuseMetadata, IMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
load(reuseMetadata, client, msTbl, true, true, null);
}
/**
* Loads table metadata from the Hive Metastore.
*
* If 'reuseMetadata' is false, performs a full metadata load from the Hive Metastore,
* including partition and file metadata. Otherwise, loads metadata incrementally and
* updates this HdfsTable in place so that it is in sync with the Hive Metastore.
*
* Depending on the operation that triggered the table metadata load, not all the
* metadata may need to be updated. If 'partitionsToUpdate' is not null, it specifies a
* list of partitions for which metadata should be updated. Otherwise, all partition
* metadata will be updated from the Hive Metastore.
*
 * If 'loadParitionFileMetadata' is true, the file metadata of the specified partitions
 * is reloaded from scratch. If 'partitionsToUpdate' is not specified, the file metadata
 * of all the partitions is loaded.
*
* If 'loadTableSchema' is true, the table schema is loaded from the Hive Metastore.
*
* There are several cases where existing file descriptors might be reused incorrectly:
* 1. an ALTER TABLE ADD PARTITION or dynamic partition insert is executed through
* Hive. This does not update the lastDdlTime.
* 2. Hdfs rebalancer is executed. This changes the block locations but doesn't update
* the mtime (file modification time).
 * If any of these occur, the user has to execute "invalidate metadata" to invalidate the
* metadata cache of the table and trigger a fresh load.
*/
public void load(boolean reuseMetadata, IMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl,
boolean loadParitionFileMetadata, boolean loadTableSchema,
Set<String> partitionsToUpdate) throws TableLoadingException {
// turn all exceptions into TableLoadingException
msTable_ = msTbl;
try {
if (loadTableSchema) loadSchema(client, msTbl);
if (reuseMetadata && getCatalogVersion() == Catalog.INITIAL_CATALOG_VERSION) {
// This is the special case of CTAS that creates a 'temp' table that does not
// actually exist in the Hive Metastore.
initializePartitionMetadata(msTbl);
setTableStats(msTbl);
return;
}
// Load partition and file metadata
if (reuseMetadata) {
// Incrementally update this table's partitions and file metadata
LOG.info("Incrementally loading table metadata for: " + getFullName());
Preconditions.checkState(
partitionsToUpdate == null || loadParitionFileMetadata);
updateMdFromHmsTable(msTbl);
if (msTbl.getPartitionKeysSize() == 0) {
if (loadParitionFileMetadata) updateUnpartitionedTableFileMd();
} else {
updatePartitionsFromHms(
client, partitionsToUpdate, loadParitionFileMetadata);
}
LOG.info("Incrementally loaded table metadata for: " + getFullName());
} else {
// Load all partitions from Hive Metastore, including file metadata.
LOG.info("Fetching partition metadata from the Metastore: " + getFullName());
List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
MetaStoreUtil.fetchAllPartitions(
client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES);
LOG.info("Fetched partition metadata from the Metastore: " + getFullName());
loadAllPartitions(msPartitions, msTbl);
}
if (loadTableSchema) setAvroSchema(client, msTbl);
setTableStats(msTbl);
numHdfsFiles_ = -1;
totalHdfsBytes_ = -1;
} catch (TableLoadingException e) {
throw e;
} catch (Exception e) {
throw new TableLoadingException("Failed to load metadata for table: "
+ getFullName(), e);
}
}
/**
* Updates the table metadata, including 'hdfsBaseDir_', 'isMarkedCached_',
* and 'accessLevel_' from 'msTbl'. Throws an IOException if there was an error
* accessing the table location path.
*/
private void updateMdFromHmsTable(org.apache.hadoop.hive.metastore.api.Table msTbl)
throws IOException {
Preconditions.checkNotNull(msTbl);
hdfsBaseDir_ = msTbl.getSd().getLocation();
isMarkedCached_ = HdfsCachingUtil.validateCacheParams(msTbl.getParameters());
if (msTbl.getPartitionKeysSize() == 0) {
Path location = new Path(hdfsBaseDir_);
FileSystem fs = location.getFileSystem(CONF);
accessLevel_ = getAvailableAccessLevel(fs, location);
}
setMetaStoreTable(msTbl);
}
/**
* Updates the file metadata of an unpartitioned HdfsTable.
*/
private void updateUnpartitionedTableFileMd() throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("update unpartitioned table: " + getFullName());
}
resetPartitions();
org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
Preconditions.checkNotNull(msTbl);
addDefaultPartition(msTbl.getSd());
HdfsPartition part = createPartition(msTbl.getSd(), null);
addPartition(part);
if (isMarkedCached_) part.markCached();
refreshPartitionFileMetadata(part);
}
/**
* Updates the partitions of an HdfsTable so that they are in sync with the Hive
* Metastore. It reloads partitions that were marked 'dirty' by doing a DROP + CREATE.
* It removes from this table partitions that no longer exist in the Hive Metastore and
* adds partitions that were added externally (e.g. using Hive) to the Hive Metastore
* but do not exist in this table. If 'loadParitionFileMetadata' is true, it triggers
* file/block metadata reload for the partitions specified in 'partitionsToUpdate', if
* any, or for all the table partitions if 'partitionsToUpdate' is null.
*/
private void updatePartitionsFromHms(IMetaStoreClient client,
Set<String> partitionsToUpdate, boolean loadParitionFileMetadata)
throws Exception {
if (LOG.isTraceEnabled()) LOG.trace("Sync table partitions: " + getFullName());
org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
Preconditions.checkNotNull(msTbl);
Preconditions.checkState(msTbl.getPartitionKeysSize() != 0);
Preconditions.checkState(loadParitionFileMetadata || partitionsToUpdate == null);
// Retrieve all the partition names from the Hive Metastore. We need this to
// identify the delta between partitions of the local HdfsTable and the table entry
// in the Hive Metastore. Note: This is a relatively "cheap" operation
// (~.3 secs for 30K partitions).
Set<String> msPartitionNames = Sets.newHashSet();
msPartitionNames.addAll(
client.listPartitionNames(db_.getName(), name_, (short) -1));
// Names of loaded partitions in this table
Set<String> partitionNames = Sets.newHashSet();
// Partitions for which file metadata must be loaded, grouped by partition paths.
Map<Path, List<HdfsPartition>> partitionsToUpdateFileMdByPath = Maps.newHashMap();
// Partitions that need to be dropped and recreated from scratch
List<HdfsPartition> dirtyPartitions = Lists.newArrayList();
// Partitions that need to be removed from this table. That includes dirty
// partitions as well as partitions that were removed from the Hive Metastore.
List<HdfsPartition> partitionsToRemove = Lists.newArrayList();
// Identify dirty partitions that need to be loaded from the Hive Metastore and
// partitions that no longer exist in the Hive Metastore.
for (HdfsPartition partition: partitionMap_.values()) {
// Ignore the default partition
if (partition.isDefaultPartition()) continue;
// Remove partitions that don't exist in the Hive Metastore. These are partitions
// that were removed from HMS using some external process, e.g. Hive.
if (!msPartitionNames.contains(partition.getPartitionName())) {
partitionsToRemove.add(partition);
}
if (partition.isDirty()) {
// Dirty partitions are updated by removing them from table's partition
// list and loading them from the Hive Metastore.
dirtyPartitions.add(partition);
} else {
if (partitionsToUpdate == null && loadParitionFileMetadata) {
Path partitionPath = partition.getLocationPath();
List<HdfsPartition> partitions =
partitionsToUpdateFileMdByPath.get(partitionPath);
if (partitions == null) {
partitionsToUpdateFileMdByPath.put(
partitionPath, Lists.newArrayList(partition));
} else {
partitions.add(partition);
}
}
}
Preconditions.checkNotNull(partition.getCachedMsPartitionDescriptor());
partitionNames.add(partition.getPartitionName());
}
partitionsToRemove.addAll(dirtyPartitions);
dropPartitions(partitionsToRemove);
// Load dirty partitions from Hive Metastore
loadPartitionsFromMetastore(dirtyPartitions, client);
// Identify and load partitions that were added in the Hive Metastore but don't
// exist in this table.
Set<String> newPartitionsInHms = Sets.difference(msPartitionNames, partitionNames);
loadPartitionsFromMetastore(newPartitionsInHms, client);
// If a list of modified partitions (old and new) is specified, don't reload file
// metadata for the new ones as they have already been detected in HMS and have been
// reloaded by loadPartitionsFromMetastore().
if (partitionsToUpdate != null) {
partitionsToUpdate.removeAll(newPartitionsInHms);
}
// Load file metadata. Until we have a notification mechanism for when a
// file changes in hdfs, it is sometimes required to reload all the file
// descriptors and block metadata of a table (e.g. REFRESH statement).
if (loadParitionFileMetadata) {
if (partitionsToUpdate != null) {
// Only reload file metadata of partitions specified in 'partitionsToUpdate'
Preconditions.checkState(partitionsToUpdateFileMdByPath.isEmpty());
partitionsToUpdateFileMdByPath = getPartitionsByPath(partitionsToUpdate);
}
loadMetadataAndDiskIds(partitionsToUpdateFileMdByPath, true);
}
}
/**
* Given a set of partition names, returns the corresponding HdfsPartition
* objects grouped by their base directory path.
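 * Partition names are expected to carry a trailing '/' (e.g. the hypothetical name
 * "year=2017/month=3/" is looked up as "year=2017/month=3"); an empty name maps to the
 * default partition name (the empty string).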
*/
private HashMap<Path, List<HdfsPartition>> getPartitionsByPath(
Collection<String> partitionNames) {
HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
for (String partitionName: partitionNames) {
String partName = DEFAULT_PARTITION_NAME;
if (partitionName.length() > 0) {
// Trim the last trailing char '/' from each partition name
partName = partitionName.substring(0, partitionName.length()-1);
}
HdfsPartition partition = nameToPartitionMap_.get(partName);
Preconditions.checkNotNull(partition, "Invalid partition name: " + partName);
Path partitionPath = partition.getLocationPath();
List<HdfsPartition> partitions = partsByPath.get(partitionPath);
if (partitions == null) {
partsByPath.put(partitionPath, Lists.newArrayList(partition));
} else {
partitions.add(partition);
}
}
return partsByPath;
}
@Override
public void setTableStats(org.apache.hadoop.hive.metastore.api.Table msTbl) {
super.setTableStats(msTbl);
    // For unpartitioned tables, set the numRows of their partitions
    // to the table's numRows.
if (numClusteringCols_ == 0 && !partitionMap_.isEmpty()) {
// Unpartitioned tables have a 'dummy' partition and a default partition.
// Temp tables used in CTAS statements have one partition.
Preconditions.checkState(partitionMap_.size() == 2 || partitionMap_.size() == 1);
for (HdfsPartition p: partitionMap_.values()) {
p.setNumRows(getNumRows());
}
}
}
/**
* Returns whether the table has the 'skip.header.line.count' property set.
*/
private boolean hasSkipHeaderLineCount() {
org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
if (msTbl == null) return false;
return msTbl.getParameters().containsKey(TBL_PROP_SKIP_HEADER_LINE_COUNT);
}
/**
* Parses and returns the value of the 'skip.header.line.count' table property. If the
* value is not set for the table, returns 0. If parsing fails or a value < 0 is found,
* the error parameter is updated to contain an error message.
*/
public int parseSkipHeaderLineCount(StringBuilder error) {
if (!hasSkipHeaderLineCount()) return 0;
return parseSkipHeaderLineCount(getMetaStoreTable().getParameters(), error);
}
/**
* Parses and returns the value of the 'skip.header.line.count' table property. The
* caller must ensure that the property is contained in the 'tblProperties' map. If
* parsing fails or a value < 0 is found, the error parameter is updated to contain an
* error message.
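 * Usage sketch (illustrative values):
 *   StringBuilder error = new StringBuilder();
 *   int n = parseSkipHeaderLineCount(
 *       ImmutableMap.of(TBL_PROP_SKIP_HEADER_LINE_COUNT, "2"), error);
 *   // n == 2 and 'error' stays empty; a non-numeric or negative value would append
 *   // an error message instead.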
*/
public static int parseSkipHeaderLineCount(Map<String, String> tblProperties,
StringBuilder error) {
Preconditions.checkState(tblProperties != null);
Preconditions.checkState(tblProperties.containsKey(TBL_PROP_SKIP_HEADER_LINE_COUNT));
// Try to parse.
    String stringValue = tblProperties.get(TBL_PROP_SKIP_HEADER_LINE_COUNT);
    int skipHeaderLineCount = 0;
    String errorMsg = String.format("Invalid value for table property %s: %s (value " +
        "must be an integer >= 0)", TBL_PROP_SKIP_HEADER_LINE_COUNT, stringValue);
    try {
      skipHeaderLineCount = Integer.parseInt(stringValue);
    } catch (NumberFormatException exc) {
      error.append(errorMsg);
    }
    if (skipHeaderLineCount < 0) error.append(errorMsg);
return skipHeaderLineCount;
}
/**
* Sets avroSchema_ if the table or any of the partitions in the table are stored
   * as Avro. Additionally, this method reconciles the schema if the column
* definitions from the metastore differ from the Avro schema.
*/
private void setAvroSchema(IMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception {
Preconditions.checkState(isSchemaLoaded_);
String inputFormat = msTbl.getSd().getInputFormat();
if (HdfsFileFormat.fromJavaClassName(inputFormat) == HdfsFileFormat.AVRO
|| hasAvroData_) {
// Look for Avro schema in TBLPROPERTIES and in SERDEPROPERTIES, with the latter
// taking precedence.
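      // The schema is typically stored in the 'avro.schema.literal' or
      // 'avro.schema.url' property (resolution is handled by AvroSchemaUtils).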
List<Map<String, String>> schemaSearchLocations = Lists.newArrayList();
schemaSearchLocations.add(
getMetaStoreTable().getSd().getSerdeInfo().getParameters());
schemaSearchLocations.add(getMetaStoreTable().getParameters());
avroSchema_ = AvroSchemaUtils.getAvroSchema(schemaSearchLocations);
if (avroSchema_ == null) {
// No Avro schema was explicitly set in the table metadata, so infer the Avro
// schema from the column definitions.
Schema inferredSchema = AvroSchemaConverter.convertFieldSchemas(
msTbl.getSd().getCols(), getFullName());
avroSchema_ = inferredSchema.toString();
}
String serdeLib = msTbl.getSd().getSerdeInfo().getSerializationLib();
if (serdeLib == null ||
serdeLib.equals("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) {
        // If the SerDe library is null or set to LazySimpleSerDe, it indicates there
        // is an issue with the table metadata since Avro tables need a non-native
        // SerDe. Instead of failing to load the table, fall back to using the fields
        // from the storage descriptor (same as Hive).
return;
} else {
// Generate new FieldSchemas from the Avro schema. This step reconciles
// differences in the column definitions and the Avro schema. For
// Impala-created tables this step is not necessary because the same
// resolution is done during table creation. But Hive-created tables
// store the original column definitions, and not the reconciled ones.
List<ColumnDef> colDefs =
ColumnDef.createFromFieldSchemas(msTbl.getSd().getCols());
List<ColumnDef> avroCols = AvroSchemaParser.parse(avroSchema_);
StringBuilder warning = new StringBuilder();
List<ColumnDef> reconciledColDefs =
AvroSchemaUtils.reconcileSchemas(colDefs, avroCols, warning);
if (warning.length() != 0) {
LOG.warn(String.format("Warning while loading table %s:\n%s",
getFullName(), warning.toString()));
}
AvroSchemaUtils.setFromSerdeComment(reconciledColDefs);
        // Reset and update nonPartFieldSchemas_ to the reconciled colDefs.
nonPartFieldSchemas_.clear();
nonPartFieldSchemas_.addAll(ColumnDef.toFieldSchemas(reconciledColDefs));
// Update the columns as per the reconciled colDefs and re-load stats.
clearColumns();
addColumnsFromFieldSchemas(msTbl.getPartitionKeys());
addColumnsFromFieldSchemas(nonPartFieldSchemas_);
loadAllColumnStats(client);
}
}
}
/**
* Loads table schema and column stats from Hive Metastore.
*/
private void loadSchema(IMetaStoreClient client,
org.apache.hadoop.hive.metastore.api.Table msTbl) throws Exception {
nonPartFieldSchemas_.clear();
// set nullPartitionKeyValue from the hive conf.
nullPartitionKeyValue_ = client.getConfigValue(
"hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__");
// set NULL indicator string from table properties
nullColumnValue_ =
msTbl.getParameters().get(serdeConstants.SERIALIZATION_NULL_FORMAT);
if (nullColumnValue_ == null) nullColumnValue_ = DEFAULT_NULL_COLUMN_VALUE;
// Excludes partition columns.
nonPartFieldSchemas_.addAll(msTbl.getSd().getCols());
// The number of clustering columns is the number of partition keys.
numClusteringCols_ = msTbl.getPartitionKeys().size();
partitionLocationCompressor_.setClusteringColumns(numClusteringCols_);
clearColumns();
// Add all columns to the table. Ordering is important: partition columns first,
// then all other columns.
addColumnsFromFieldSchemas(msTbl.getPartitionKeys());
addColumnsFromFieldSchemas(nonPartFieldSchemas_);
loadAllColumnStats(client);
isSchemaLoaded_ = true;
}
/**
* Loads partitions from the Hive Metastore and adds them to the internal list of
* table partitions.
*/
private void loadPartitionsFromMetastore(List<HdfsPartition> partitions,
IMetaStoreClient client) throws Exception {
Preconditions.checkNotNull(partitions);
if (partitions.isEmpty()) return;
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Incrementally updating %d/%d partitions.",
partitions.size(), partitionMap_.size()));
}
Set<String> partitionNames = Sets.newHashSet();
for (HdfsPartition part: partitions) {
partitionNames.add(part.getPartitionName());
}
loadPartitionsFromMetastore(partitionNames, client);
}
/**
* Loads from the Hive Metastore the partitions that correspond to the specified
* 'partitionNames' and adds them to the internal list of table partitions.
*/
private void loadPartitionsFromMetastore(Set<String> partitionNames,
IMetaStoreClient client) throws Exception {
Preconditions.checkNotNull(partitionNames);
if (partitionNames.isEmpty()) return;
// Load partition metadata from Hive Metastore.
List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
Lists.newArrayList();
msPartitions.addAll(MetaStoreUtil.fetchPartitionsByName(client,
Lists.newArrayList(partitionNames), db_.getName(), name_));
for (org.apache.hadoop.hive.metastore.api.Partition msPartition: msPartitions) {
      HdfsPartition partition = createPartition(msPartition.getSd(), msPartition);
      // If the partition is null, its HDFS path does not exist. Skip it without
      // adding it to this table's partition list.
      if (partition == null) continue;
      addPartition(partition);
if (msPartition.getParameters() != null) {
partition.setNumRows(getRowCount(msPartition.getParameters()));
}
if (!TAccessLevelUtil.impliesWriteAccess(partition.getAccessLevel())) {
        // TODO: READ_ONLY isn't exactly correct because it's possible the
// partition does not have READ permissions either. When we start checking
// whether we can READ from a table, this should be updated to set the
// table's access level to the "lowest" effective level across all
// partitions. That is, if one partition has READ_ONLY and another has
// WRITE_ONLY the table's access level should be NONE.
accessLevel_ = TAccessLevel.READ_ONLY;
}
refreshPartitionFileMetadata(partition);
}
}
@Override
protected List<String> getColumnNamesWithHmsStats() {
List<String> ret = Lists.newArrayList();
// Only non-partition columns have column stats in the HMS.
for (Column column: getColumns().subList(numClusteringCols_, getColumns().size())) {
ret.add(column.getName().toLowerCase());
}
return ret;
}
@Override
protected synchronized void loadFromThrift(TTable thriftTable)
throws TableLoadingException {
super.loadFromThrift(thriftTable);
THdfsTable hdfsTable = thriftTable.getHdfs_table();
partitionLocationCompressor_ = new HdfsPartitionLocationCompressor(
numClusteringCols_, hdfsTable.getPartition_prefixes());
hdfsBaseDir_ = hdfsTable.getHdfsBaseDir();
nullColumnValue_ = hdfsTable.nullColumnValue;
nullPartitionKeyValue_ = hdfsTable.nullPartitionKeyValue;
multipleFileSystems_ = hdfsTable.multiple_filesystems;
hostIndex_.populate(hdfsTable.getNetwork_addresses());
resetPartitions();
try {
for (Map.Entry<Long, THdfsPartition> part: hdfsTable.getPartitions().entrySet()) {
HdfsPartition hdfsPart =
HdfsPartition.fromThrift(this, part.getKey(), part.getValue());
addPartition(hdfsPart);
}
} catch (CatalogException e) {
throw new TableLoadingException(e.getMessage());
}
avroSchema_ = hdfsTable.isSetAvroSchema() ? hdfsTable.getAvroSchema() : null;
isMarkedCached_ =
HdfsCachingUtil.validateCacheParams(getMetaStoreTable().getParameters());
}
@Override
public TTableDescriptor toThriftDescriptor(int tableId,
Set<Long> referencedPartitions) {
// Create thrift descriptors to send to the BE. The BE does not
// need any information below the THdfsPartition level.
TTableDescriptor tableDesc = new TTableDescriptor(tableId, TTableType.HDFS_TABLE,
getTColumnDescriptors(), numClusteringCols_, name_, db_.getName());
tableDesc.setHdfsTable(getTHdfsTable(false, referencedPartitions));
return tableDesc;
}
@Override
public TTable toThrift() {
// Send all metadata between the catalog service and the FE.
TTable table = super.toThrift();
table.setTable_type(TTableType.HDFS_TABLE);
table.setHdfs_table(getTHdfsTable(true, null));
return table;
}
/**
* Create a THdfsTable corresponding to this HdfsTable. If includeFileDesc is true,
   * then all partitions and THdfsFileDescs of each partition should be included.
* Otherwise, don't include any THdfsFileDescs, and include only those partitions in
* the refPartitions set (the backend doesn't need metadata for unreferenced
* partitions). To prevent the catalog from hitting an OOM error while trying to
* serialize large partition incremental stats, we estimate the stats size and filter
* the incremental stats data from partition objects if the estimate exceeds
* --inc_stats_size_limit_bytes
*/
private THdfsTable getTHdfsTable(boolean includeFileDesc, Set<Long> refPartitions) {
// includeFileDesc implies all partitions should be included (refPartitions == null).
Preconditions.checkState(!includeFileDesc || refPartitions == null);
int numPartitions =
(refPartitions == null) ? partitionMap_.values().size() : refPartitions.size();
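    // Rough estimate of the serialized size of the incremental stats: each column of
    // each included partition contributes about STATS_SIZE_PER_COLUMN_BYTES.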
long statsSizeEstimate =
numPartitions * getColumns().size() * STATS_SIZE_PER_COLUMN_BYTES;
boolean includeIncrementalStats =
(statsSizeEstimate < BackendConfig.INSTANCE.getIncStatsMaxSize());
Map<Long, THdfsPartition> idToPartition = Maps.newHashMap();
for (HdfsPartition partition: partitionMap_.values()) {
long id = partition.getId();
if (refPartitions == null || refPartitions.contains(id)) {
idToPartition.put(id,
partition.toThrift(includeFileDesc, includeIncrementalStats));
}
}
THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
nullPartitionKeyValue_, nullColumnValue_, idToPartition);
hdfsTable.setAvroSchema(avroSchema_);
hdfsTable.setMultiple_filesystems(multipleFileSystems_);
if (includeFileDesc) {
// Network addresses are used only by THdfsFileBlocks which are inside
      // THdfsFileDesc, so include network addresses only when including THdfsFileDesc.
hdfsTable.setNetwork_addresses(hostIndex_.getList());
}
hdfsTable.setPartition_prefixes(partitionLocationCompressor_.getPrefixes());
return hdfsTable;
}
public long getTotalHdfsBytes() { return totalHdfsBytes_; }
public String getHdfsBaseDir() { return hdfsBaseDir_; }
public Path getHdfsBaseDirPath() { return new Path(hdfsBaseDir_); }
public boolean isAvroTable() { return avroSchema_ != null; }
/**
* Get the index of hosts that store replicas of blocks of this table.
*/
public ListMap<TNetworkAddress> getHostIndex() { return hostIndex_; }
/**
* Returns the file format that the majority of partitions are stored in.
*/
public HdfsFileFormat getMajorityFormat() {
Map<HdfsFileFormat, Integer> numPartitionsByFormat = Maps.newHashMap();
for (HdfsPartition partition: partitionMap_.values()) {
HdfsFileFormat format = partition.getInputFormatDescriptor().getFileFormat();
Integer numPartitions = numPartitionsByFormat.get(format);
if (numPartitions == null) {
numPartitions = Integer.valueOf(1);
} else {
numPartitions = Integer.valueOf(numPartitions.intValue() + 1);
}
numPartitionsByFormat.put(format, numPartitions);
}
int maxNumPartitions = Integer.MIN_VALUE;
HdfsFileFormat majorityFormat = null;
for (Map.Entry<HdfsFileFormat, Integer> entry: numPartitionsByFormat.entrySet()) {
if (entry.getValue().intValue() > maxNumPartitions) {
majorityFormat = entry.getKey();
maxNumPartitions = entry.getValue().intValue();
}
}
Preconditions.checkNotNull(majorityFormat);
return majorityFormat;
}
/**
* Returns the HDFS paths corresponding to HdfsTable partitions that don't exist in
   * the Hive Metastore. An HDFS path is represented as a list of string values, one per
* partition key column.
*/
public List<List<String>> getPathsWithoutPartitions() throws CatalogException {
HashSet<List<LiteralExpr>> existingPartitions = new HashSet<List<LiteralExpr>>();
// Get the list of partition values of existing partitions in Hive Metastore.
for (HdfsPartition partition: partitionMap_.values()) {
if (partition.isDefaultPartition()) continue;
existingPartitions.add(partition.getPartitionValues());
}
List<String> partitionKeys = Lists.newArrayList();
for (int i = 0; i < numClusteringCols_; ++i) {
partitionKeys.add(getColumns().get(i).getName());
}
Path basePath = new Path(hdfsBaseDir_);
List<List<String>> partitionsNotInHms = new ArrayList<List<String>>();
try {
getAllPartitionsNotInHms(basePath, partitionKeys, existingPartitions,
partitionsNotInHms);
} catch (Exception e) {
throw new CatalogException(String.format("Failed to recover partitions for %s " +
"with exception:%s.", getFullName(), e));
}
return partitionsNotInHms;
}
/**
   * Collects into 'partitionsNotInHms' all partitions that match the partition key
   * directory structure, pass the type compatibility check, and are not already part
   * of the table.
*/
private void getAllPartitionsNotInHms(Path path, List<String> partitionKeys,
HashSet<List<LiteralExpr>> existingPartitions,
List<List<String>> partitionsNotInHms) throws IOException {
FileSystem fs = path.getFileSystem(CONF);
List<String> partitionValues = Lists.newArrayList();
List<LiteralExpr> partitionExprs = Lists.newArrayList();
getAllPartitionsNotInHms(path, partitionKeys, 0, fs, partitionValues,
partitionExprs, existingPartitions, partitionsNotInHms);
}
/**
   * Recursively collects into 'partitionsNotInHms' all partitions that match the
   * partition key directory structure and pass the type compatibility check.
*
* path e.g. c1=1/c2=2/c3=3
* partitionKeys The ordered partition keys. e.g.("c1", "c2", "c3")
* depth The start position in partitionKeys to match the path name.
* partitionValues The partition values used to create a partition.
* partitionExprs The list of LiteralExprs which is used to avoid duplicate partitions.
* E.g. Having /c1=0001 and /c1=01, we should make sure only one partition
* will be added.
* existingPartitions All partitions which exist in Hive Metastore or newly added.
* partitionsNotInHms Contains all the recovered partitions.
*/
private void getAllPartitionsNotInHms(Path path, List<String> partitionKeys,
int depth, FileSystem fs, List<String> partitionValues,
List<LiteralExpr> partitionExprs, HashSet<List<LiteralExpr>> existingPartitions,
List<List<String>> partitionsNotInHms) throws IOException {
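    // Base case: every partition key has been matched against a path component, so
    // 'partitionValues'/'partitionExprs' describe a complete candidate partition.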
if (depth == partitionKeys.size()) {
if (existingPartitions.contains(partitionExprs)) {
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Skip recovery of path '%s' because it already "
+ "exists in metastore", path.toString()));
}
} else {
partitionsNotInHms.add(partitionValues);
existingPartitions.add(partitionExprs);
}
return;
}
FileStatus[] statuses = FileSystemUtil.listStatus(fs, path);
if (statuses == null) return;
for (FileStatus status: statuses) {
if (!status.isDirectory()) continue;
Pair<String, LiteralExpr> keyValues =
getTypeCompatibleValue(status.getPath(), partitionKeys.get(depth));
if (keyValues == null) continue;
List<String> currentPartitionValues = Lists.newArrayList(partitionValues);
List<LiteralExpr> currentPartitionExprs = Lists.newArrayList(partitionExprs);
currentPartitionValues.add(keyValues.first);
currentPartitionExprs.add(keyValues.second);
getAllPartitionsNotInHms(status.getPath(), partitionKeys, depth + 1, fs,
currentPartitionValues, currentPartitionExprs,
existingPartitions, partitionsNotInHms);
}
}
/**
   * Checks that the last component of 'path' is of the form "<partitionKey>=<value>",
   * where 'value' is a type-compatible value from the domain of the 'partitionKey'
   * column. If not, returns null; otherwise returns a Pair whose first element is the
   * original string value and whose second element is the LiteralExpr created from
   * that value.
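   * E.g. for a path ending in "month=1" and an INT partition column 'month', returns
   * the pair ("1", NumericLiteral(1)).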
*/
private Pair<String, LiteralExpr> getTypeCompatibleValue(Path path,
String partitionKey) {
    String[] partName = path.getName().split("=");
if (partName.length != 2 || !partName[0].equals(partitionKey)) return null;
// Check Type compatibility for Partition value.
Column column = getColumn(partName[0]);
Preconditions.checkNotNull(column);
Type type = column.getType();
LiteralExpr expr = null;
if (!partName[1].equals(getNullPartitionKeyValue())) {
try {
expr = LiteralExpr.create(partName[1], type);
        // Skip values that overflow the maximum value of the specified type.
if (expr instanceof NumericLiteral) {
if (NumericLiteral.isOverflow(((NumericLiteral)expr).getValue(), type)) {
LOG.warn(String.format("Skip the overflow value (%s) for Type (%s).",
partName[1], type.toSql()));
return null;
}
}
} catch (Exception ex) {
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Invalid partition value (%s) for Type (%s).",
partName[1], type.toSql()));
}
return null;
}
} else {
expr = new NullLiteral();
}
return new Pair<String, LiteralExpr>(partName[1], expr);
}
/**
* Returns an estimated row count for the given number of file bytes. The row count is
* extrapolated using the table-level row count and file bytes statistics.
* Returns zero only if the given file bytes is zero.
* Returns -1 if:
* - stats extrapolation has been disabled
   * - the given file bytes is negative
   * - the table's row count statistic is missing (negative)
   * - the table's total file bytes statistic is zero or negative
   * - the table's row count statistic is zero while its total file bytes is non-zero
* Otherwise, returns a value >= 1.
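   * E.g. with table stats of 1,000 rows over 10,000 total file bytes, a 'fileBytes'
   * value of 500 extrapolates to 500 * (1000 / 10000) = 50 rows.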
*/
public long getExtrapolatedNumRows(long fileBytes) {
if (!BackendConfig.INSTANCE.enableStatsExtrapolation()) return -1;
if (fileBytes == 0) return 0;
if (fileBytes < 0) return -1;
if (tableStats_.num_rows < 0 || tableStats_.total_file_bytes <= 0) return -1;
if (tableStats_.num_rows == 0 && tableStats_.total_file_bytes != 0) return -1;
double rowsPerByte = tableStats_.num_rows / (double) tableStats_.total_file_bytes;
double extrapolatedNumRows = fileBytes * rowsPerByte;
return (long) Math.max(1, Math.round(extrapolatedNumRows));
}
/**
* Returns statistics on this table as a tabular result set. Used for the
* SHOW TABLE STATS statement. The schema of the returned TResultSet is set
* inside this method.
*/
public TResultSet getTableStats() {
TResultSet result = new TResultSet();
TResultSetMetadata resultSchema = new TResultSetMetadata();
result.setSchema(resultSchema);
for (int i = 0; i < numClusteringCols_; ++i) {
// Add the partition-key values as strings for simplicity.
Column partCol = getColumns().get(i);
TColumn colDesc = new TColumn(partCol.getName(), Type.STRING.toThrift());
resultSchema.addToColumns(colDesc);
}
boolean statsExtrap = BackendConfig.INSTANCE.enableStatsExtrapolation();
resultSchema.addToColumns(new TColumn("#Rows", Type.BIGINT.toThrift()));
if (statsExtrap) {
resultSchema.addToColumns(new TColumn("Extrap #Rows", Type.BIGINT.toThrift()));
}
resultSchema.addToColumns(new TColumn("#Files", Type.BIGINT.toThrift()));
resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Bytes Cached", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Cache Replication", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Format", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Incremental stats", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Location", Type.STRING.toThrift()));
// Pretty print partitions and their stats.
ArrayList<HdfsPartition> orderedPartitions =
Lists.newArrayList(partitionMap_.values());
Collections.sort(orderedPartitions);
long totalCachedBytes = 0L;
for (HdfsPartition p: orderedPartitions) {
// Ignore dummy default partition.
if (p.isDefaultPartition()) continue;
TResultRowBuilder rowBuilder = new TResultRowBuilder();
// Add the partition-key values (as strings for simplicity).
for (LiteralExpr expr: p.getPartitionValues()) {
rowBuilder.add(expr.getStringValue());
}
// Add rows, extrapolated rows, files, bytes, cache stats, and file format.
rowBuilder.add(p.getNumRows());
// Compute and report the extrapolated row count because the set of files could
// have changed since we last computed stats for this partition. We also follow
// this policy during scan-cardinality estimation.
if (statsExtrap) rowBuilder.add(getExtrapolatedNumRows(p.getSize()));
rowBuilder.add(p.getFileDescriptors().size()).addBytes(p.getSize());
if (!p.isMarkedCached()) {
// Helps to differentiate partitions that have 0B cached versus partitions
// that are not marked as cached.
rowBuilder.add("NOT CACHED");
rowBuilder.add("NOT CACHED");
} else {
        // Calculate the number of bytes that are cached.
long cachedBytes = 0L;
for (FileDescriptor fd: p.getFileDescriptors()) {
int numBlocks = fd.getNumFileBlocks();
for (int i = 0; i < numBlocks; ++i) {
FbFileBlock block = fd.getFbFileBlock(i);
if (FileBlock.hasCachedReplica(block)) {
cachedBytes += FileBlock.getLength(block);
}
}
}
totalCachedBytes += cachedBytes;
rowBuilder.addBytes(cachedBytes);
        // Extract the cache replication factor from the table's parameters if the
        // table is not partitioned, or from the partition's parameters otherwise.
Short rep = HdfsCachingUtil.getCachedCacheReplication(
numClusteringCols_ == 0 ?
p.getTable().getMetaStoreTable().getParameters() :
p.getParameters());
rowBuilder.add(rep.toString());
}
rowBuilder.add(p.getInputFormatDescriptor().getFileFormat().toString());
rowBuilder.add(String.valueOf(p.hasIncrementalStats()));
rowBuilder.add(p.getLocation());
result.addToRows(rowBuilder.get());
}
// For partitioned tables add a summary row at the bottom.
if (numClusteringCols_ > 0) {
TResultRowBuilder rowBuilder = new TResultRowBuilder();
int numEmptyCells = numClusteringCols_ - 1;
rowBuilder.add("Total");
for (int i = 0; i < numEmptyCells; ++i) {
rowBuilder.add("");
}
// Total rows, extrapolated rows, files, bytes, cache stats.
// Leave format empty.
rowBuilder.add(getNumRows());
      // Compute and report the extrapolated row count because the set of files could
      // have changed since we last computed stats for this table. We also follow
      // this policy during scan-cardinality estimation.
if (statsExtrap) rowBuilder.add(getExtrapolatedNumRows(totalHdfsBytes_));
rowBuilder.add(numHdfsFiles_).addBytes(totalHdfsBytes_)
.addBytes(totalCachedBytes).add("").add("").add("").add("");
result.addToRows(rowBuilder.get());
}
return result;
}
/**
   * Returns file info for this table and the given partition set. If the partition
   * set is null, returns file info for all partitions, ordered by partition.
*/
public TResultSet getFiles(List<List<TPartitionKeyValue>> partitionSet)
throws CatalogException {
TResultSet result = new TResultSet();
TResultSetMetadata resultSchema = new TResultSetMetadata();
result.setSchema(resultSchema);
resultSchema.addToColumns(new TColumn("Path", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Size", Type.STRING.toThrift()));
resultSchema.addToColumns(new TColumn("Partition", Type.STRING.toThrift()));
result.setRows(Lists.<TResultRow>newArrayList());
List<HdfsPartition> orderedPartitions;
if (partitionSet == null) {
orderedPartitions = Lists.newArrayList(partitionMap_.values());
} else {
// Get a list of HdfsPartition objects for the given partition set.
orderedPartitions = getPartitionsFromPartitionSet(partitionSet);
}
Collections.sort(orderedPartitions);
for (HdfsPartition p: orderedPartitions) {
List<FileDescriptor> orderedFds = Lists.newArrayList(p.getFileDescriptors());
Collections.sort(orderedFds);
for (FileDescriptor fd: orderedFds) {
TResultRowBuilder rowBuilder = new TResultRowBuilder();
rowBuilder.add(p.getLocation() + "/" + fd.getFileName());
rowBuilder.add(PrintUtils.printBytes(fd.getFileLength()));
rowBuilder.add(p.getPartitionName());
result.addToRows(rowBuilder.get());
}
}
return result;
}
/**
* Constructs a partition name from a list of TPartitionKeyValue objects.
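   * E.g. the partition spec [("year", "2017"), ("month", "1")] yields the partition
   * name "year=2017/month=1".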
*/
public static String constructPartitionName(List<TPartitionKeyValue> partitionSpec) {
List<String> partitionCols = Lists.newArrayList();
List<String> partitionVals = Lists.newArrayList();
for (TPartitionKeyValue kv: partitionSpec) {
partitionCols.add(kv.getName());
partitionVals.add(kv.getValue());
}
return org.apache.hadoop.hive.common.FileUtils.makePartName(partitionCols,
partitionVals);
}
/**
* Reloads the metadata of partition 'oldPartition' by removing
* it from the table and reconstructing it from the HMS partition object
   * 'hmsPartition'. If 'oldPartition' is null, nothing is removed and the partition
   * constructed from 'hmsPartition' is simply added.
*/
public void reloadPartition(HdfsPartition oldPartition, Partition hmsPartition)
throws CatalogException {
HdfsPartition refreshedPartition = createPartition(
hmsPartition.getSd(), hmsPartition);
refreshPartitionFileMetadata(refreshedPartition);
Preconditions.checkArgument(oldPartition == null
|| oldPartition.compareTo(refreshedPartition) == 0);
dropPartition(oldPartition);
addPartition(refreshedPartition);
}
/**
* Selects a random sample of files from the given list of partitions such that the sum
* of file sizes is at least 'percentBytes' percent of the total number of bytes in
* those partitions. The sample is returned as a map from partition id to a list of
* file descriptors selected from that partition.
* This function allocates memory proportional to the number of files in 'inputParts'.
* Its implementation tries to minimize the constant factor and object generation.
* The given 'randomSeed' is used for random number generation.
* The 'percentBytes' parameter must be between 0 and 100.
*/
public Map<Long, List<FileDescriptor>> getFilesSample(
Collection<HdfsPartition> inputParts, long percentBytes, long randomSeed) {
Preconditions.checkState(percentBytes >= 0 && percentBytes <= 100);
// Conservative max size for Java arrays. The actual maximum varies
    // between JVM versions and sometimes between configurations.
final long JVM_MAX_ARRAY_SIZE = Integer.MAX_VALUE - 10;
if (numHdfsFiles_ > JVM_MAX_ARRAY_SIZE) {
throw new IllegalStateException(String.format(
"Too many files to generate a table sample. " +
"Table '%s' has %s files, but a maximum of %s files are supported.",
getTableName().toString(), numHdfsFiles_, JVM_MAX_ARRAY_SIZE));
}
int totalNumFiles = (int) numHdfsFiles_;
// Ensure a consistent ordering of files for repeatable runs. The files within a
// partition are already ordered based on how they are loaded in the catalog.
List<HdfsPartition> orderedParts = Lists.newArrayList(inputParts);
Collections.sort(orderedParts);
    // parts[i] is the partition that contains the i'th file and fileIdxs[i] is the
    // index of that file within the file descriptor list of parts[i].
    // The arrays are sized for the maximum possible number of files to avoid looping
    // over inputParts just to compute the exact size. Their purpose is to efficiently
    // avoid selecting the same file multiple times during sampling, regardless of the
    // sample percent, and without generating per-file objects.
int[] fileIdxs = new int[totalNumFiles];
HdfsPartition[] parts = new HdfsPartition[totalNumFiles];
int idx = 0;
long totalBytes = 0;
for (HdfsPartition part: orderedParts) {
totalBytes += part.getSize();
int numFds = part.getNumFileDescriptors();
for (int fileIdx = 0; fileIdx < numFds; ++fileIdx) {
fileIdxs[idx] = fileIdx;
parts[idx] = part;
++idx;
}
}
int numFilesRemaining = idx;
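    // E.g. percentBytes=10 with totalBytes=1000 gives targetBytes=100.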
double fracPercentBytes = (double) percentBytes / 100;
long targetBytes = (long) Math.round(totalBytes * fracPercentBytes);
// Randomly select files until targetBytes has been reached or all files have been
// selected.
Random rnd = new Random(randomSeed);
long selectedBytes = 0;
Map<Long, List<FileDescriptor>> result = Maps.newHashMap();
while (selectedBytes < targetBytes && numFilesRemaining > 0) {
      // nextInt(bound) avoids the modulo bias and the Integer.MIN_VALUE corner case
      // of Math.abs(rnd.nextInt()) % bound.
      int selectedIdx = rnd.nextInt(numFilesRemaining);
HdfsPartition part = parts[selectedIdx];
Long partId = Long.valueOf(part.getId());
      List<FileDescriptor> sampleFds = result.get(partId);
      if (sampleFds == null) {
        sampleFds = Lists.newArrayList();
        result.put(partId, sampleFds);
      }
      FileDescriptor fd = part.getFileDescriptors().get(fileIdxs[selectedIdx]);
      sampleFds.add(fd);
selectedBytes += fd.getFileLength();
// Avoid selecting the same file multiple times.
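      // This is done by overwriting the selected slot with the last entry of the
      // remaining range and then shrinking that range by one.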
fileIdxs[selectedIdx] = fileIdxs[numFilesRemaining - 1];
parts[selectedIdx] = parts[numFilesRemaining - 1];
--numFilesRemaining;
}
return result;
}
}