/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.metastore;
import static org.apache.hadoop.hive.common.AcidConstants.BASE_PREFIX;
import static org.apache.hadoop.hive.common.AcidConstants.DELETE_DELTA_PREFIX;
import static org.apache.hadoop.hive.common.AcidConstants.DELTA_PREFIX;
import static org.apache.hadoop.hive.common.AcidConstants.VISIBILITY_PREFIX;
import static org.apache.hadoop.hive.metastore.PartFilterExprUtil.createExpressionProxy;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getAllPartitionsOf;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getDataLocation;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartColNames;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartCols;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionColtoTypeMap;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionListByFilterExp;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionName;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPartitionSpec;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.getPath;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.isPartitioned;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.MetastoreException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.FileUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
 * Verify that the information in the metastore matches what is on the
 * filesystem. Return a {@link CheckResult} object listing any missing or
 * unexpected tables and partitions.
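 *
 * <p>A minimal usage sketch (the {@code client}, {@code conf}, and names below
 * are illustrative; a null table name checks every table in the database):
 * <pre>{@code
 * HiveMetaStoreChecker checker = new HiveMetaStoreChecker(client, conf);
 * CheckResult result = checker.checkMetastore("hive", "default", null, null, null);
 * }</pre>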
*/
public class HiveMetaStoreChecker {
public static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreChecker.class);
private final IMetaStoreClient msc;
private final Configuration conf;
private final long partitionExpirySeconds;
public static final PathFilter HIDDEN_FILES_PATH_FILTER =
p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");
public HiveMetaStoreChecker(IMetaStoreClient msc, Configuration conf) {
this(msc, conf, -1);
}
public HiveMetaStoreChecker(IMetaStoreClient msc, Configuration conf, long partitionExpirySeconds) {
this.msc = msc;
this.conf = conf;
this.partitionExpirySeconds = partitionExpirySeconds;
}
public IMetaStoreClient getMsc() {
return msc;
}
/**
   * Check the metastore for inconsistencies: data missing from either the
   * metastore or the dfs.
*
   * @param catName
   *          name of the catalog; if not specified, the default catalog will be used.
   * @param dbName
   *          name of the database; if not specified, the default database will be used.
   * @param tableName
   *          Table we want to run the check for. If null we'll check all the
   *          tables in the database.
   * @param filterExp
   *          Filter expression used to prune the partitions from the
   *          metastore and the filesystem.
   * @param table
   *          Table object; if null, it is fetched from the metastore.
* @return Results of the check
* @throws MetastoreException
* Failed to get required information from the metastore.
* @throws IOException
* Most likely filesystem related
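   *
   * <p>A minimal illustrative call, assuming a {@code checker} instance and
   * hypothetical catalog/database/table names; a null table is fetched from the
   * metastore, and a null filter checks all partitions:
   * <pre>{@code
   * CheckResult res = checker.checkMetastore("hive", "default", "web_logs", null, null);
   * for (CheckResult.PartitionResult pr : res.getPartitionsNotInMs()) {
   *   LOG.warn("On the filesystem but not in the metastore: {}", pr.getPartitionName());
   * }
   * }</pre>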
*/
public CheckResult checkMetastore(String catName, String dbName, String tableName,
byte[] filterExp, Table table)
throws MetastoreException, IOException {
CheckResult result = new CheckResult();
if (dbName == null || "".equalsIgnoreCase(dbName)) {
dbName = Warehouse.DEFAULT_DATABASE_NAME;
}
try {
if (tableName == null || "".equals(tableName)) {
// TODO: I do not think this is used by anything other than tests
// no table specified, check all tables and all partitions.
List<String> tables = new ArrayList<>();
        try {
          tables = getMsc().getTables(catName, dbName, ".*");
        } catch (UnknownDBException ex) {
          // ignore database exception.
        }
for (String currentTableName : tables) {
checkTable(catName, dbName, currentTableName, null, null, result);
}
findUnknownTables(catName, dbName, tables, result);
} else if (filterExp != null) {
// check for specified partitions which matches filter expression
checkTable(catName, dbName, tableName, filterExp, table, result);
} else {
// only one table, let's check all partitions
checkTable(catName, dbName, tableName, null, table, result);
}
      LOG.info("Number of partitionsNotInMs={}, partitionsNotOnFs={}, tablesNotInMs={}, "
          + "tablesNotOnFs={}, expiredPartitions={}", result.getPartitionsNotInMs(),
          result.getPartitionsNotOnFs(), result.getTablesNotInMs(), result.getTablesNotOnFs(),
          result.getExpiredPartitions());
} catch (TException e) {
throw new MetastoreException(e);
}
return result;
}
/**
* Check for table directories that aren't in the metastore.
*
   * @param catName
   *          name of the catalog; if not specified, the default catalog will be used.
   * @param dbName
   *          Name of the database
   * @param tables
   *          List of table names
   * @param result
   *          Any tables found on the filesystem but not in the metastore are added here
* @throws IOException
* Most likely filesystem related
* @throws MetaException
* Failed to get required information from the metastore.
* @throws NoSuchObjectException
* Failed to get required information from the metastore.
* @throws TException
* Thrift communication error.
*/
void findUnknownTables(String catName, String dbName, List<String> tables, CheckResult result)
throws IOException, MetaException, TException {
Set<Path> dbPaths = new HashSet<>();
Set<String> tableNames = new HashSet<>(tables);
for (String tableName : tables) {
Table table = getMsc().getTable(catName, dbName, tableName);
// hack, instead figure out a way to get the db paths
String isExternal = table.getParameters().get("EXTERNAL");
if (!"TRUE".equalsIgnoreCase(isExternal)) {
Path tablePath = getPath(table);
if (tablePath != null) {
dbPaths.add(tablePath.getParent());
}
}
}
for (Path dbPath : dbPaths) {
FileSystem fs = dbPath.getFileSystem(conf);
FileStatus[] statuses = fs.listStatus(dbPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
for (FileStatus status : statuses) {
if (status.isDirectory() && !tableNames.contains(status.getPath().getName())) {
result.getTablesNotInMs().add(status.getPath().getName());
}
}
}
}
/**
   * Check the metastore for inconsistencies: data missing from either the
   * metastore or the dfs.
*
   * @param catName
   *          name of the catalog; if not specified, the default catalog will be used.
   * @param dbName
   *          Name of the database
   * @param tableName
   *          Name of the table
   * @param filterExp
   *          Filter expression used to prune the partitions from the
   *          metastore and the filesystem.
   * @param table Table we want to run the check for; if null, it is fetched from the metastore.
   * @param result
   *          Result object
* @throws MetastoreException
* Failed to get required information from the metastore.
* @throws IOException
* Most likely filesystem related
* @throws MetaException
* Failed to get required information from the metastore.
*/
void checkTable(String catName, String dbName, String tableName, byte[] filterExp, Table table, CheckResult result)
throws MetaException, IOException, MetastoreException {
if (table == null) {
try {
table = getMsc().getTable(catName, dbName, tableName);
} catch (TException e) {
result.getTablesNotInMs().add(tableName);
return;
}
}
PartitionIterable parts;
if (isPartitioned(table)) {
if (filterExp != null) {
List<Partition> results = new ArrayList<>();
getPartitionListByFilterExp(getMsc(), table, filterExp,
MetastoreConf.getVar(conf, MetastoreConf.ConfVars.DEFAULTPARTITIONNAME), results);
parts = new PartitionIterable(results);
} else {
int batchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
if (batchSize > 0) {
parts = new PartitionIterable(getMsc(), table, batchSize);
} else {
List<Partition> loadedPartitions = getAllPartitionsOf(getMsc(), table);
parts = new PartitionIterable(loadedPartitions);
}
}
} else {
parts = new PartitionIterable(Collections.emptyList());
}
checkTable(table, parts, filterExp, result);
}
/**
   * Check the metastore for inconsistencies: data missing from either the
   * metastore or the dfs.
*
* @param table
* Table to check
* @param parts
* Partitions to check
* @param result
* Result object
   * @param filterExp
   *          Filter expression used to prune the partitions from the
   *          metastore and the filesystem.
* @throws IOException
* Could not get information from filesystem
* @throws MetastoreException
* Could not create Partition object
*/
void checkTable(Table table, PartitionIterable parts, byte[] filterExp, CheckResult result) throws IOException,
MetastoreException, MetaException {
Path tablePath = getPath(table);
if (tablePath == null) {
return;
}
FileSystem fs = tablePath.getFileSystem(conf);
if (!fs.exists(tablePath)) {
result.getTablesNotOnFs().add(table.getTableName());
return;
}
Set<Path> partPaths = new HashSet<>();
// check that the partition folders exist on disk
for (Partition partition : parts) {
if (partition == null) {
// most likely the user specified an invalid partition
continue;
}
Path partPath = getDataLocation(table, partition);
if (partPath == null) {
continue;
}
fs = partPath.getFileSystem(conf);
CheckResult.PartitionResult prFromMetastore = new CheckResult.PartitionResult();
prFromMetastore.setPartitionName(getPartitionName(table, partition));
prFromMetastore.setTableName(partition.getTableName());
if (!fs.exists(partPath)) {
result.getPartitionsNotOnFs().add(prFromMetastore);
} else {
result.getCorrectPartitions().add(prFromMetastore);
}
if (partitionExpirySeconds > 0) {
long currentEpochSecs = Instant.now().getEpochSecond();
long createdTime = partition.getCreateTime();
long partitionAgeSeconds = currentEpochSecs - createdTime;
if (partitionAgeSeconds > partitionExpirySeconds) {
CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
pr.setPartitionName(getPartitionName(table, partition));
pr.setTableName(partition.getTableName());
result.getExpiredPartitions().add(pr);
if (LOG.isDebugEnabled()) {
LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s expiry: {}s", partition.getCatName(),
partition.getDbName(), partition.getTableName(), pr.getPartitionName(), createdTime, currentEpochSecs,
partitionAgeSeconds, partitionExpirySeconds);
}
}
}
for (int i = 0; i < getPartitionSpec(table, partition).size(); i++) {
Path qualifiedPath = partPath.makeQualified(fs);
partPaths.add(qualifiedPath);
partPath = partPath.getParent();
}
}
findUnknownPartitions(table, partPaths, filterExp, result);
if (!isPartitioned(table) && TxnUtils.isTransactionalTable(table)) {
// Check for writeIds in the table directory
CheckResult.PartitionResult tableResult = new CheckResult.PartitionResult();
setMaxTxnAndWriteIdFromPartition(tablePath, tableResult);
result.setMaxWriteId(tableResult.getMaxWriteId());
result.setMaxTxnId(tableResult.getMaxTxnId());
}
}
/**
* Find partitions on the fs that are unknown to the metastore.
*
* @param table
* Table where the partitions would be located
* @param partPaths
* Paths of the partitions the ms knows about
   * @param filterExp
   *          Filter expression used to prune the partitions from the
   *          metastore and the filesystem.
* @param result
* Result object
* @throws IOException
* Thrown if we fail at fetching listings from the fs.
* @throws MetastoreException ex
*/
void findUnknownPartitions(Table table, Set<Path> partPaths, byte[] filterExp,
CheckResult result) throws IOException, MetastoreException, MetaException {
Path tablePath = getPath(table);
if (tablePath == null) {
return;
}
boolean transactionalTable = TxnUtils.isTransactionalTable(table);
// now check the table folder and see if we find anything
// that isn't in the metastore
Set<Path> allPartDirs = new HashSet<>();
List<FieldSchema> partColumns = table.getPartitionKeys();
checkPartitionDirs(tablePath, allPartDirs, Collections.unmodifiableList(getPartColNames(table)));
if (filterExp != null) {
PartitionExpressionProxy expressionProxy = createExpressionProxy(conf);
      List<String> partitions = new ArrayList<>();
      Set<Path> partDirs = new HashSet<>();
      String tablePathStr = tablePath.toString();
      for (Path path : allPartDirs) {
        // remove the table's path from the partition path
        // eg: <tablePath>/p1=1/p2=2/p3=3 ---> p1=1/p2=2/p3=3
        if (tablePathStr.endsWith("/")) {
          partitions.add(path.toString().substring(tablePathStr.length()));
        } else {
          partitions.add(path.toString().substring(tablePathStr.length() + 1));
        }
      }
      // Remove all partition paths that do not match the filter expression.
      expressionProxy.filterPartitionsByExpr(partColumns, filterExp,
          conf.get(MetastoreConf.ConfVars.DEFAULTPARTITIONNAME.getVarname()), partitions);
      // The partition list now contains only the paths that match the filter expression;
      // add them back to partDirs.
      for (String path : partitions) {
        partDirs.add(new Path(tablePath, path));
      }
      allPartDirs = partDirs;
}
// don't want the table dir
allPartDirs.remove(tablePath);
// remove the partition paths we know about
allPartDirs.removeAll(partPaths);
Set<String> partColNames = Sets.newHashSet();
    for (FieldSchema fSchema : getPartCols(table)) {
partColNames.add(fSchema.getName());
}
Map<String, String> partitionColToTypeMap = getPartitionColtoTypeMap(table.getPartitionKeys());
// we should now only have the unexpected folders left
for (Path partPath : allPartDirs) {
FileSystem fs = partPath.getFileSystem(conf);
String partitionName = getPartitionName(fs.makeQualified(tablePath),
partPath, partColNames, partitionColToTypeMap);
LOG.debug("PartitionName: " + partitionName);
if (partitionName != null) {
CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
pr.setPartitionName(partitionName);
pr.setTableName(table.getTableName());
// Also set the correct partition path here as creating path from Warehouse.makePartPath will always return
// lowercase keys/path. Even if we add the new partition with lowerkeys, get queries on such partition
// will not return any results.
pr.setPath(partPath);
        // Check if the partition already exists. There is no need to check partitions that are
        // present in the db but not on the fs, as msck will override the partition location in the db.
        if (result.getCorrectPartitions().contains(pr)) {
          String msg = "The partition '" + pr.toString() + "' already exists for table " + table.getTableName();
          throw new MetastoreException(msg);
        } else if (result.getPartitionsNotInMs().contains(pr)) {
          String msg = "Found two paths for the same partition '" + pr.toString() + "' for table " + table.getTableName();
          throw new MetastoreException(msg);
}
if (transactionalTable) {
setMaxTxnAndWriteIdFromPartition(partPath, pr);
}
result.getPartitionsNotInMs().add(pr);
        // remove(pr) is a no-op if pr is absent, so no containment check is needed
        result.getPartitionsNotOnFs().remove(pr);
}
}
LOG.debug("Number of partitions not in metastore : " + result.getPartitionsNotInMs().size());
}
/**
* Calculate the maximum seen writeId from the acid directory structure
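   * e.g. (illustrative directory names) a partition containing {@code base_0000005}
   * and {@code delta_0000006_0000006_v0000012} yields a max writeId of 6 and a max
   * txnId (visibility id) of 12.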
* @param partPath Path of the partition directory
* @param res Partition result to write the max ids
* @throws IOException ex
*/
private void setMaxTxnAndWriteIdFromPartition(Path partPath, CheckResult.PartitionResult res) throws IOException {
FileSystem fs = partPath.getFileSystem(conf);
FileStatus[] deltaOrBaseFiles = fs.listStatus(partPath, HIDDEN_FILES_PATH_FILTER);
// Read the writeIds from every base and delta directory and find the max
long maxWriteId = 0L;
long maxVisibilityId = 0L;
for (FileStatus fileStatus : deltaOrBaseFiles) {
if (!fileStatus.isDirectory()) {
continue;
}
long writeId = 0L;
long visibilityId = 0L;
String folder = fileStatus.getPath().getName();
      String[] visParts = folder.split(VISIBILITY_PREFIX);
if (visParts.length > 1) {
visibilityId = Long.parseLong(visParts[1]);
folder = visParts[0];
}
if (folder.startsWith(BASE_PREFIX)) {
writeId = Long.parseLong(folder.substring(BASE_PREFIX.length()));
} else if (folder.startsWith(DELTA_PREFIX) || folder.startsWith(DELETE_DELTA_PREFIX)) {
// See AcidUtils.parseDelta
boolean isDeleteDelta = folder.startsWith(DELETE_DELTA_PREFIX);
String rest = folder.substring((isDeleteDelta ? DELETE_DELTA_PREFIX : DELTA_PREFIX).length());
String[] nameParts = rest.split("_");
// We always want the second part (it is either the same or greater if it is a compacted delta)
writeId = Long.parseLong(nameParts.length > 1 ? nameParts[1] : nameParts[0]);
}
if (writeId > maxWriteId) {
maxWriteId = writeId;
}
if (visibilityId > maxVisibilityId) {
maxVisibilityId = visibilityId;
}
}
LOG.debug("Max writeId {}, max txnId {} found in partition {}", maxWriteId, maxVisibilityId,
partPath.toUri().toString());
res.setMaxWriteId(maxWriteId);
res.setMaxTxnId(maxVisibilityId);
}
/**
* Assume that depth is 2, i.e., partition columns are a and b
* tblPath/a=1 => throw exception
* tblPath/a=1/file => throw exception
* tblPath/a=1/b=2/file => return a=1/b=2
* tblPath/a=1/b=2/c=3 => return a=1/b=2
* tblPath/a=1/b=2/c=3/file => return a=1/b=2
*
* @param basePath
* Start directory
* @param allDirs
* This set will contain the leaf paths at the end.
* @param partColNames
* Partition column names
* @throws IOException
* Thrown if we can't get lists from the fs.
* @throws MetastoreException ex
*/
private void checkPartitionDirs(Path basePath, Set<Path> allDirs, final List<String> partColNames)
throws IOException, MetastoreException {
    // We reuse the METASTORE_FS_HANDLER_THREADS_COUNT configuration here since this results in
    // better performance. The missing partitions discovered here are later added by the
    // metastore using a thread pool of size METASTORE_FS_HANDLER_THREADS_COUNT; if the two
    // pools were sized differently, the smaller of the two would become a bottleneck.
int poolSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.FS_HANDLER_THREADS_COUNT);
ExecutorService executor;
if (poolSize <= 1) {
LOG.debug("Using single-threaded version of MSCK-GetPaths");
executor = MoreExecutors.newDirectExecutorService();
} else {
LOG.debug("Using multi-threaded version of MSCK-GetPaths with number of threads " + poolSize);
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build();
executor = Executors.newFixedThreadPool(poolSize, threadFactory);
}
checkPartitionDirs(executor, basePath, allDirs, basePath.getFileSystem(conf), partColNames);
executor.shutdown();
}
private final class PathDepthInfoCallable implements Callable<Path> {
private final List<String> partColNames;
private final FileSystem fs;
private final ConcurrentLinkedQueue<PathDepthInfo> pendingPaths;
private final boolean throwException;
private final PathDepthInfo pd;
private PathDepthInfoCallable(PathDepthInfo pd, List<String> partColNames, FileSystem fs,
ConcurrentLinkedQueue<PathDepthInfo> basePaths) {
this.partColNames = partColNames;
this.pd = pd;
this.fs = fs;
this.pendingPaths = basePaths;
this.throwException = "throw".equals(MetastoreConf.getVar(conf, MetastoreConf.ConfVars.MSCK_PATH_VALIDATION));
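      // any other MSCK_PATH_VALIDATION value makes invalid paths log a warning instead of
      // failing the check (see logOrThrowExceptionWithMsg below)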
}
@Override
public Path call() throws Exception {
return processPathDepthInfo(pd);
}
private Path processPathDepthInfo(final PathDepthInfo pd)
throws IOException, MetastoreException {
final Path currentPath = pd.p;
final int currentDepth = pd.depth;
if (currentDepth == partColNames.size()) {
return currentPath;
}
FileStatus[] fileStatuses = fs.listStatus(currentPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
// found no files under a sub-directory under table base path; it is possible that the table
// is empty and hence there are no partition sub-directories created under base path
if (fileStatuses.length == 0 && currentDepth > 0) {
// since maxDepth is not yet reached, we are missing partition
// columns in currentPath
logOrThrowExceptionWithMsg(
"MSCK is missing partition columns under " + currentPath.toString());
} else {
        // found entries under currentPath; add each one to the queue if it is a directory
for (FileStatus fileStatus : fileStatuses) {
if (!fileStatus.isDirectory()) {
// found a file at depth which is less than number of partition keys
logOrThrowExceptionWithMsg(
"MSCK finds a file rather than a directory when it searches for "
+ fileStatus.getPath().toString());
} else {
// found a sub-directory at a depth less than number of partition keys
// validate if the partition directory name matches with the corresponding
// partition colName at currentDepth
Path nextPath = fileStatus.getPath();
String[] parts = nextPath.getName().split("=");
if (parts.length != 2) {
logOrThrowExceptionWithMsg("Invalid partition name " + nextPath);
} else if (!parts[0].equalsIgnoreCase(partColNames.get(currentDepth))) {
logOrThrowExceptionWithMsg(
"Unexpected partition key " + parts[0] + " found at " + nextPath);
} else {
// add sub-directory to the work queue if maxDepth is not yet reached
pendingPaths.add(new PathDepthInfo(nextPath, currentDepth + 1));
}
}
}
}
return null;
}
private void logOrThrowExceptionWithMsg(String msg) throws MetastoreException {
      if (throwException) {
throw new MetastoreException(msg);
} else {
LOG.warn(msg);
}
}
}
private static class PathDepthInfo {
private final Path p;
private final int depth;
PathDepthInfo(Path p, int depth) {
this.p = p;
this.depth = depth;
}
}
@VisibleForTesting
void checkPartitionDirs(final ExecutorService executor,
final Path basePath, final Set<Path> result,
final FileSystem fs, final List<String> partColNames) throws MetastoreException {
try {
Queue<Future<Path>> futures = new LinkedList<>();
ConcurrentLinkedQueue<PathDepthInfo> nextLevel = new ConcurrentLinkedQueue<>();
nextLevel.add(new PathDepthInfo(basePath, 0));
      // Uses a level-parallel implementation of BFS. Recursive DFS implementations have an
      // issue where the pool can run out of threads if the number of nested sub-directories
      // is greater than the pool size.
      // A two-queue implementation is simpler than a single queue: with one queue we would
      // need complex notify()/wait() mechanisms to let the free worker threads know when new
      // levels are discovered, which can potentially lead to bugs if not done right.
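      // Illustrative trace for partition columns (a, b):
      //   level 0: {tblPath}                  -> discovers tblPath/a=1, tblPath/a=2
      //   level 1: {tblPath/a=1, tblPath/a=2} -> discovers a .../b=... child under each
      //   level 2: currentDepth == partColNames.size(), so each path is returned as a leaf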
      while (!nextLevel.isEmpty()) {
        ConcurrentLinkedQueue<PathDepthInfo> tempQueue = new ConcurrentLinkedQueue<>();
        // process each level in parallel
        while (!nextLevel.isEmpty()) {
          futures.add(
              executor.submit(new PathDepthInfoCallable(nextLevel.poll(), partColNames, fs, tempQueue)));
        }
        while (!futures.isEmpty()) {
          Path p = futures.poll().get();
          if (p != null) {
            result.add(p);
          }
        }
        // update nextLevel with the newly discovered sub-directories from the level above
        nextLevel = tempQueue;
      }
}
} catch (InterruptedException | ExecutionException e) {
LOG.error("Exception received while listing partition directories", e);
executor.shutdownNow();
throw new MetastoreException(e.getCause());
}
}
}