blob: 4ee6beaede6efaeea04228661783457b0241bd62 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.impala.catalog.HdfsPartition.Builder;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.catalog.iceberg.GroupedContentFiles;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.Pair;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.util.ListMap;
import org.apache.impala.util.ThreadNameAnnotator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Utility to coordinate the issuing of parallel metadata loading requests
 * on a thread pool.
 *
 * This may safely be used even to load a single path: if only one path is to
 * be loaded, this avoids creating any extra threads and uses the current thread
 * instead.
 */
public class ParallelFileMetadataLoader {
private final static Logger LOG = LoggerFactory.getLogger(
// NOTE(review): going by their names, the two limits below presumably cap the loading
// pool size for HDFS and non-HDFS filesystems respectively -- confirm where the
// values come from.
private static final int MAX_HDFS_PARTITIONS_PARALLEL_LOAD =
private static final int MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD =
// Public counters, both starting at 0.
// NOTE(review): presumably these track the threads and tables currently involved in
// parallel loading -- confirm against the code that updates them.
public static final AtomicInteger TOTAL_THREADS = new AtomicInteger(0);
public static final AtomicInteger TOTAL_TABLES = new AtomicInteger(0);
// Maximum number of errors logged when loading partitioned tables.
private static final int MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG = 100;
// Prefix prepended to this loader's log and exception messages.
private final String logPrefix_;
// One loader per distinct fully qualified partition path.
private final Map<Path, FileMetadataLoader> loaders_;
// Partition builders grouped by their fully qualified path; several builders may share
// one path.
private final Map<Path, List<HdfsPartition.Builder>> partsByPath_;
// Filesystem used to choose the loading pool size.
private final FileSystem fs_;
/**
 * Convenience constructor: delegates to the full constructor with an empty
 * GroupedContentFiles and 'canDataBeOutsideOfTableLocation' set to false.
 */
public ParallelFileMetadataLoader(FileSystem fs,
Collection<Builder> partBuilders,
ValidWriteIdList writeIdList, ValidTxnList validTxnList, boolean isRecursive,
@Nullable ListMap<TNetworkAddress> hostIndex, String debugAction,
String logPrefix) {
this(fs, partBuilders, writeIdList, validTxnList, isRecursive, hostIndex, debugAction,
logPrefix, new GroupedContentFiles(), false);
/**
 * Groups the given partition builders by fully qualified location and creates one
 * file metadata loader per distinct path.
 *
 * 'writeIdList' and 'validTxnList' must either both be null or both be non-null.
 * 'icebergFiles' must be non-null when a path's file format is ICEBERG.
 */
public ParallelFileMetadataLoader(FileSystem fs,
Collection<Builder> partBuilders,
ValidWriteIdList writeIdList, ValidTxnList validTxnList, boolean isRecursive,
@Nullable ListMap<TNetworkAddress> hostIndex, String debugAction, String logPrefix,
GroupedContentFiles icebergFiles, boolean canDataBeOutsideOfTableLocation) {
if (writeIdList != null || validTxnList != null) {
// Make sure that writeIdList and validTxnList are either both set or both
// unset.
Preconditions.checkState(writeIdList != null && validTxnList != null);
// Group the partitions by their path (multiple partitions may point to the same
// path).
partsByPath_ = Maps.newHashMap();
for (HdfsPartition.Builder p : partBuilders) {
Path partPath = FileSystemUtil.createFullyQualifiedPath(new Path(p.getLocation()));
partsByPath_.computeIfAbsent(partPath, (path) -> new ArrayList<>())
// Create a FileMetadataLoader for each path.
loaders_ = Maps.newHashMap();
for (Map.Entry<Path, List<HdfsPartition.Builder>> e : partsByPath_.entrySet()) {
// The first builder registered for a path supplies the old file descriptors and the
// file format used for that path's loader.
// NOTE(review): this assumes all builders sharing a path agree on file format --
// confirm.
List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors();
FileMetadataLoader loader;
HdfsFileFormat format = e.getValue().get(0).getFileFormat();
if (format.equals(HdfsFileFormat.ICEBERG)) {
// Iceberg paths get a dedicated loader that also receives the Iceberg content files.
loader = new IcebergFileMetadataLoader(e.getKey(), isRecursive, oldFds, hostIndex,
validTxnList, writeIdList, Preconditions.checkNotNull(icebergFiles),
} else {
loader = new FileMetadataLoader(e.getKey(), isRecursive, oldFds, hostIndex,
validTxnList, writeIdList, format);
// If there is a cached partition mapped to this path, we recompute the block
// locations even if the underlying files have not changed.
// This is done to keep the cached block metadata up to date.
boolean hasCachedPartition = Iterables.any(e.getValue(),
loaders_.put(e.getKey(), loader);
this.logPrefix_ = logPrefix;
this.fs_ = fs;
/**
 * Loads the file metadata for the partitions given to the constructor. If the
 * load is successful, also sets the fileDescriptors in the HdfsPartition.Builders.
 * @throws TableLoadingException
 */
void load() throws TableLoadingException {
// Store the loaded FDs into the partitions.
// Walk each path together with the loader that was created for it.
for (Map.Entry<Path, List<HdfsPartition.Builder>> e : partsByPath_.entrySet()) {
Path p = e.getKey();
FileMetadataLoader loader = loaders_.get(p);
for (HdfsPartition.Builder partBuilder : e.getValue()) {
// Checks if we can reuse the old file descriptors. Partition builders in the list
// may have different old file descriptors. We need to verify them one by one.
if ((!loader.hasFilesChangedCompareTo(partBuilder.getFileDescriptors()))) {
LOG.trace("Detected files unchanged on partition {}",
// Delete-delta descriptors reported by the loader are handled separately when the
// list is non-null and non-empty.
List<FileDescriptor> deleteDescriptors = loader.getLoadedDeleteDeltaFds();
if (deleteDescriptors != null && !deleteDescriptors.isEmpty()) {
} else {
/**
 * Call 'load()' in parallel on all of the loaders. If any loaders fail, throws
 * an exception. However, any successful loaders are guaranteed to complete
 * before any exception is thrown.
 */
private void loadInternal() throws TableLoadingException {
// Nothing to do when no loaders were created.
if (loaders_.isEmpty()) return;
int failedLoadTasks = 0;
// Size the pool from the number of loaders and the filesystem, then create it.
int poolSize = getPoolSize(loaders_.size(), fs_);
ExecutorService pool = createPool(poolSize, logPrefix_);
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(logPrefix_)) {
// Each future is paired with its loader so that a failure can be reported together
// with the path it belongs to.
List<Pair<FileMetadataLoader, Future<Void>>> futures =
new ArrayList<>(loaders_.size());
for (FileMetadataLoader loader : loaders_.values()) {
futures.add(new Pair<>(
loader, pool.submit(() -> { loader.load(); return null; })));
// Wait for the loaders to finish.
for (int i = 0; i < futures.size(); i++) {
try {
} catch (ExecutionException | InterruptedException e) {
// NOTE(review): an InterruptedException is only logged here; consider restoring the
// interrupt flag with Thread.currentThread().interrupt() -- confirm intent.
LOG.error(logPrefix_ + " encountered an error loading data for path " +
futures.get(i).first.getPartDir(), e);
} finally {
if (failedLoadTasks > 0) {
// Number of failures beyond the per-load logging cap.
int errorsNotLogged = failedLoadTasks - MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG;
if (errorsNotLogged > 0) {
LOG.error(logPrefix_ + " error loading {} paths. Only the first {} errors " +
"were logged", failedLoadTasks, MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG);
// The exception carries only the failure count; the individual causes are in the log.
throw new TableLoadingException(logPrefix_ + ": failed to load " + failedLoadTasks
+ " paths. Check the catalog server log for more details.");
/**
 * Returns the thread pool to load the file metadata. Callers should use
 * {@link #getPoolSize(int, FileSystem)} to get a correct pool size.
 */
public static ExecutorService createPool(int poolSize, String logPrefix) {
Preconditions.checkState(poolSize > 0, "Illegal poolSize: {}", poolSize);
if (poolSize == 1) {
return MoreExecutors.newDirectExecutorService();
} else {"{} using a thread pool of size {}", logPrefix, poolSize);
return Executors.newFixedThreadPool(poolSize);
/**
 * Returns the thread pool size to load the file metadata.
 *
 * We use different thread pool sizes for HDFS and non-HDFS tables since the latter
 * supports much higher throughput of RPC calls for listStatus/listFiles. For
 * simplicity, the filesystem type is determined based on the table's root path and
 * not for each partition individually. Based on our experiments, S3 showed a linear
 * speed up (up to ~100x) with increasing number of loading threads, whereas the HDFS
 * throughput was limited to ~5x in unsecured clusters and up to ~3.7x in secure
 * clusters. We narrowed it down to scalability bottlenecks in HDFS RPC implementation
 * (HADOOP-14558) on both the server and the client side.
 */
public static int getPoolSize(int numLoaders, FileSystem fs) {
// Start from a filesystem-dependent limit, chosen by whether 'fs' supports storage IDs.
// NOTE(review): the two alternatives are presumably the HDFS and non-HDFS maxima
// declared at the top of the class -- confirm.
int poolSize = FileSystemUtil.supportsStorageIds(fs) ?
// Thread pool size need not exceed the number of paths to be loaded.
poolSize = Math.min(numLoaders, poolSize);
return poolSize;