/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.parquet;
import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.exec.metastore.ParquetMetadataProvider;
import org.apache.drill.exec.metastore.ParquetTableMetadataProvider;
import org.apache.drill.exec.planner.common.DrillStatsTable;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
import org.apache.drill.exec.store.parquet.metadata.Metadata;
import org.apache.drill.exec.store.parquet.metadata.MetadataBase;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
public class ParquetTableMetadataProviderImpl extends BaseParquetMetadataProvider implements ParquetTableMetadataProvider {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetTableMetadataProviderImpl.class);
private final DrillFileSystem fs;
private final MetadataContext metaContext;
// may change when filter push down / partition pruning is applied
private Path selectionRoot;
private Path cacheFileRoot;
private final boolean corruptDatesAutoCorrected;
private boolean usedMetadataCache; // false by default
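/**
 * Creates a provider from the given list of read entries, e.g. to reuse metadata
 * which was previously collected for the same table.
 */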
private ParquetTableMetadataProviderImpl(List<ReadEntryWithPath> entries,
Path selectionRoot,
Path cacheFileRoot,
ParquetReaderConfig readerConfig,
DrillFileSystem fs,
boolean autoCorrectCorruptedDates,
ParquetMetadataProvider source,
TupleMetadata schema,
DrillStatsTable statsTable) throws IOException {
super(entries, readerConfig, selectionRoot != null ? selectionRoot.toUri().getPath() : "", selectionRoot, schema, statsTable);
this.fs = fs;
this.selectionRoot = selectionRoot;
this.cacheFileRoot = cacheFileRoot;
this.metaContext = new MetadataContext();
this.corruptDatesAutoCorrected = autoCorrectCorruptedDates;
init((BaseParquetMetadataProvider) source);
}
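/**
 * Creates a provider from a file selection, expanding the selection using the
 * metadata cache files when they are present and usable.
 */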
private ParquetTableMetadataProviderImpl(FileSelection selection,
ParquetReaderConfig readerConfig,
DrillFileSystem fs,
boolean autoCorrectCorruptedDates,
ParquetMetadataProvider source,
TupleMetadata schema,
DrillStatsTable statsTable) throws IOException {
super(readerConfig, new ArrayList<>(),
selection.getSelectionRoot() != null ? selection.getSelectionRoot().toUri().getPath() : "", selection.getSelectionRoot(), schema, statsTable);
this.fs = fs;
this.selectionRoot = selection.getSelectionRoot();
this.cacheFileRoot = selection.getCacheFileRoot();
MetadataContext metadataContext = selection.getMetaContext();
this.metaContext = metadataContext != null ? metadataContext : new MetadataContext();
this.corruptDatesAutoCorrected = autoCorrectCorruptedDates;
FileSelection fileSelection = expandIfNecessary(selection);
if (fileSelection != null) {
if (checkForInitializingEntriesWithSelectionRoot()) {
// The fully expanded list is already stored as part of the fileSet
entries.add(new ReadEntryWithPath(fileSelection.getSelectionRoot()));
} else {
for (Path fileName : fileSelection.getFiles()) {
entries.add(new ReadEntryWithPath(fileName));
}
}
init((BaseParquetMetadataProvider) source);
}
}
@Override
public boolean isUsedMetadataCache() {
return usedMetadataCache;
}
@Override
public Path getSelectionRoot() {
return selectionRoot;
}
/**
 * Returns the list of metadata cache file paths found in the given directory.
 *
 * @param p directory path in which to look for metadata cache files
 * @param fs filesystem object
 * @return list of cache file paths found in the given directory path
 * @throws IOException if the file system cannot be accessed
 */
public List<Path> populateMetaPaths(Path p, DrillFileSystem fs) throws IOException {
List<Path> metaFilepaths = new ArrayList<>();
for (String filename : Metadata.CURRENT_METADATA_FILENAMES) {
metaFilepaths.add(new Path(p, filename));
}
for (String filename : Metadata.OLD_METADATA_FILENAMES) {
// Fall back to an older version of the metadata file if the current version of the metadata cache files does not exist.
if (fileExists(fs, metaFilepaths)) {
return metaFilepaths;
}
metaFilepaths.clear();
metaFilepaths.add(new Path(p, filename));
}
if (fileExists(fs, metaFilepaths)) {
return metaFilepaths;
}
return new ArrayList<>();
}
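/**
 * Checks whether all of the given paths exist in the file system.
 *
 * @param fs filesystem object
 * @param paths list of paths to check
 * @return true if every path exists, false otherwise
 * @throws IOException if the file system cannot be accessed
 */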
public boolean fileExists(DrillFileSystem fs, List<Path> paths) throws IOException {
for (Path path : paths) {
if (!fs.exists(path)) {
return false;
}
}
return true;
}
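/**
 * Initializes {@code parquetTableMetadata}: reads it from the metadata cache files when a
 * valid cache is present, otherwise collects the metadata directly from the footers of the
 * selected parquet files.
 */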
@Override
protected void initInternal() throws IOException {
try (FileSystem processUserFileSystem = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fs.getConf())) {
// Depending on the metadata version, this may represent more than one metadata file path.
List<Path> metaPaths = new ArrayList<>();
if (entries.size() == 1 && parquetTableMetadata == null) {
Path p = Path.getPathWithoutSchemeAndAuthority(entries.get(0).getPath());
if (fs.isDirectory(p)) {
// Using the metadata file makes sense when querying a directory; otherwise
// if querying a single file we can look up the metadata directly from the file
metaPaths = populateMetaPaths(p, fs);
}
if (!metaContext.isMetadataCacheCorrupted() && !metaPaths.isEmpty()) {
parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaPaths, metaContext, readerConfig);
if (parquetTableMetadata != null) {
usedMetadataCache = true;
}
}
if (!usedMetadataCache) {
parquetTableMetadata = Metadata.getParquetTableMetadata(processUserFileSystem, p.toString(), readerConfig);
}
} else {
Path p = Path.getPathWithoutSchemeAndAuthority(selectionRoot);
metaPaths = populateMetaPaths(p, fs);
if (!metaContext.isMetadataCacheCorrupted() && fs.isDirectory(selectionRoot) && !metaPaths.isEmpty()) {
if (parquetTableMetadata == null) {
parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaPaths, metaContext, readerConfig);
}
if (parquetTableMetadata != null) {
usedMetadataCache = true;
if (fileSet != null) {
parquetTableMetadata = removeUnneededRowGroups(parquetTableMetadata);
}
}
}
if (!usedMetadataCache) {
final List<FileStatus> fileStatuses = new ArrayList<>();
for (ReadEntryWithPath entry : entries) {
fileStatuses.addAll(
DrillFileSystemUtil.listFiles(fs, Path.getPathWithoutSchemeAndAuthority(entry.getPath()), true));
}
Map<FileStatus, FileSystem> statusMap = fileStatuses.stream()
.collect(
Collectors.toMap(
Function.identity(),
s -> processUserFileSystem,
(oldFs, newFs) -> newFs,
LinkedHashMap::new));
parquetTableMetadata = Metadata.getParquetTableMetadata(statusMap, readerConfig);
}
}
}
}
// private methods block start
/**
* Expands the selection's folders if metadata cache is found for the selection root.<br>
* If the selection has already been expanded or no metadata cache was found, does nothing
*
* @param selection actual selection before expansion
* @return new selection after expansion, if no expansion was done returns the input selection
*/
private FileSelection expandIfNecessary(FileSelection selection) throws IOException {
if (selection.isExpandedFully()) {
return selection;
}
// use the cacheFileRoot if provided (e.g. after partition pruning)
Path path = cacheFileRoot != null ? cacheFileRoot : selectionRoot;
// Depending on the metadata version, this may represent more than one metadata file path.
List<Path> metaPaths = populateMetaPaths(path, fs);
if (metaPaths.isEmpty()) { // no metadata cache
if (selection.isExpandedPartial()) {
logger.error("Metadata cache file(s) do not exist under '{}', but the metadata directories cache file is present", path);
metaContext.setMetadataCacheCorrupted(true);
}
return selection;
}
return expandSelectionFromMetadataCache(selection, metaPaths);
}
/**
 * In two cases the entries should be initialized with just the selection root instead of the fully expanded list:
 * <ul>
 * <li>the metadata cache is corrupted (to use the correct file selection);
 * <li>the metadata cache is correct and used, but pruning was not applicable, or was attempted and nothing was pruned
 * (to reduce overhead in the parquet group scan).
 * </ul>
 *
 * @return true if entries should be initialized with the selection root, false otherwise
 */
private boolean checkForInitializingEntriesWithSelectionRoot() {
return metaContext.isMetadataCacheCorrupted() || (parquetTableMetadata != null &&
(metaContext.getPruneStatus() == MetadataContext.PruneStatus.NOT_STARTED || metaContext.getPruneStatus() == MetadataContext.PruneStatus.NOT_PRUNED));
}
/**
 * Creates and returns a new file selection based on reading the metadata cache files.
 * <p>
 * This method also initializes a few of this provider's fields as appropriate.
 *
 * @param selection initial file selection
 * @param metaFilePaths metadata cache file paths
 * @return file selection read from the cache
 * @throws IOException if the metadata cache files cannot be read
 * @throws org.apache.drill.common.exceptions.UserException when the updated selection is empty; this happens if the user selects an empty folder
 */
private FileSelection expandSelectionFromMetadataCache(FileSelection selection, List<Path> metaFilePaths) throws IOException {
// Get (and set the internal field) the metadata for the root directory by reading the metadata cache files.
// parquetTableMetadata contains the metadata for all files in the selection root folder, but we need to make
// sure we only select the files that are part of the selection (by setting fileSet appropriately).
FileSystem processUserFileSystem = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fs.getConf());
parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaFilePaths, metaContext, readerConfig);
if (ignoreExpandingSelection(parquetTableMetadata)) {
return selection;
}
if (corruptDatesAutoCorrected) {
ParquetReaderUtility.correctDatesInMetadataCache(this.parquetTableMetadata);
}
ParquetReaderUtility.transformBinaryInMetadataCache(parquetTableMetadata, readerConfig);
List<FileStatus> fileStatuses = selection.getStatuses(fs);
if (fileSet == null) {
fileSet = new HashSet<>();
}
final Path first = fileStatuses.get(0).getPath();
if (fileStatuses.size() == 1 && selection.getSelectionRoot().equals(first)) {
// we are selecting all files from selection root. Expand the file list from the cache
for (MetadataBase.ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
fileSet.add(file.getPath());
}
} else if (selection.isExpandedPartial() && !selection.hadWildcard() && cacheFileRoot != null) {
if (selection.wasAllPartitionsPruned()) {
// if all partitions were previously pruned, we only need to read 1 file (for the schema)
fileSet.add(this.parquetTableMetadata.getFiles().get(0).getPath());
} else {
// we are here if the selection is in the expanded_partial state (i.e. it has directories). We get the
// list of files from the metadata cache file that is present in the cacheFileRoot directory and populate
// the fileSet. However, this is *not* the final list of files that will be scanned in execution since the
// second phase of partition pruning will apply on the files and modify the file selection appropriately.
for (MetadataBase.ParquetFileMetadata file : this.parquetTableMetadata.getFiles()) {
fileSet.add(file.getPath());
}
}
} else {
// we need to expand the files from fileStatuses
for (FileStatus status : fileStatuses) {
Path currentCacheFileRoot = status.getPath();
if (status.isDirectory()) {
// TODO [DRILL-4496] read the metadata cache files in parallel
// Depending on the metadata version, this may represent more than one metadata file path.
List<Path> metaPaths = populateMetaPaths(currentCacheFileRoot, fs);
MetadataBase.ParquetTableMetadataBase metadata = Metadata.readBlockMeta(processUserFileSystem, metaPaths, metaContext, readerConfig);
if (ignoreExpandingSelection(metadata)) {
return selection;
}
for (MetadataBase.ParquetFileMetadata file : metadata.getFiles()) {
fileSet.add(file.getPath());
}
} else {
final Path path = Path.getPathWithoutSchemeAndAuthority(currentCacheFileRoot);
fileSet.add(path);
}
}
}
if (fileSet.isEmpty()) {
// no files were found; most likely we tried to query some empty sub folders
logger.warn("The table is empty but has outdated, invalid metadata cache files. Please delete them.");
return null;
}
List<Path> fileNames = new ArrayList<>(fileSet);
// When creating the file selection, set the selection root without the URI prefix.
// The reason is that the file names above have been created in the form
// /a/b/c.parquet, and the format of the selection root must match that of the file names;
// otherwise downstream operations such as partition pruning can break.
Path metaRootPath = Path.getPathWithoutSchemeAndAuthority(selection.getSelectionRoot());
this.selectionRoot = metaRootPath;
// Use the FileSelection constructor directly here instead of the FileSelection.create() method
// because create() changes the root to include the scheme and authority. In the future, if create()
// is the preferred way to instantiate a file selection, we may need to do something different...
// WARNING: file statuses and file names are inconsistent
FileSelection newSelection = new FileSelection(selection.getStatuses(fs), fileNames, metaRootPath, cacheFileRoot,
selection.wasAllPartitionsPruned());
newSelection.setExpandedFully();
newSelection.setMetaContext(metaContext);
return newSelection;
}
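/**
 * Returns a copy of the given table metadata which keeps the file metadata only for
 * the files present in {@code fileSet}.
 *
 * @param parquetTableMetadata table metadata to filter
 * @return filtered copy of the table metadata
 */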
private MetadataBase.ParquetTableMetadataBase removeUnneededRowGroups(MetadataBase.ParquetTableMetadataBase parquetTableMetadata) {
List<MetadataBase.ParquetFileMetadata> newFileMetadataList = new ArrayList<>();
for (MetadataBase.ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
if (fileSet.contains(file.getPath())) {
newFileMetadataList.add(file);
}
}
MetadataBase.ParquetTableMetadataBase metadata = parquetTableMetadata.clone();
metadata.assignFiles(newFileMetadataList);
return metadata;
}
/**
 * If the metadata is missing or corrupted, ignores expanding the selection and resets the
 * {@code parquetTableMetadata} and {@code fileSet} fields.
 *
 * @param metadata parquet table metadata
 * @return true if the parquet metadata is missing or corrupted, false otherwise
 */
private boolean ignoreExpandingSelection(MetadataBase.ParquetTableMetadataBase metadata) {
if (metadata == null || metaContext.isMetadataCacheCorrupted()) {
logger.debug("Selection can't be expanded since metadata file is corrupted or metadata version is not supported");
this.parquetTableMetadata = null;
this.fileSet = null;
return true;
}
return false;
}
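/**
 * Builder for {@link ParquetTableMetadataProviderImpl}. The provider can be built either
 * from a list of read entries (reusing previously collected metadata) or from a
 * {@link FileSelection}.
 * <p>
 * A minimal usage sketch (illustrative only; {@code providerManager}, {@code selection} and
 * {@code fs} are assumed to be configured by the caller):
 * <pre>{@code
 * ParquetTableMetadataProvider provider =
 *     new ParquetTableMetadataProviderImpl.Builder(providerManager)
 *         .withSelection(selection)
 *         .withFileSystem(fs)
 *         .withReaderConfig(ParquetReaderConfig.getDefaultInstance())
 *         .build(); // may throw IOException while collecting metadata
 * }</pre>
 */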
public static class Builder implements ParquetFileTableMetadataProviderBuilder {
private final MetadataProviderManager metadataProviderManager;
private List<ReadEntryWithPath> entries;
private Path selectionRoot;
private Path cacheFileRoot;
private ParquetReaderConfig readerConfig;
private DrillFileSystem fs;
private boolean autoCorrectCorruptedDates;
private TupleMetadata schema;
private FileSelection selection;
public Builder(MetadataProviderManager source) {
this.metadataProviderManager = source;
}
@Override
public ParquetFileTableMetadataProviderBuilder withEntries(List<ReadEntryWithPath> entries) {
this.entries = entries;
return this;
}
@Override
public ParquetFileTableMetadataProviderBuilder withSelectionRoot(Path selectionRoot) {
this.selectionRoot = selectionRoot;
return this;
}
@Override
public ParquetFileTableMetadataProviderBuilder withCacheFileRoot(Path cacheFileRoot) {
this.cacheFileRoot = cacheFileRoot;
return this;
}
@Override
public ParquetFileTableMetadataProviderBuilder withReaderConfig(ParquetReaderConfig readerConfig) {
this.readerConfig = readerConfig;
return this;
}
@Override
public ParquetFileTableMetadataProviderBuilder withFileSystem(DrillFileSystem fs) {
this.fs = fs;
return this;
}
@Override
public ParquetFileTableMetadataProviderBuilder withCorrectCorruptedDates(boolean autoCorrectCorruptedDates) {
this.autoCorrectCorruptedDates = autoCorrectCorruptedDates;
return this;
}
@Override
public ParquetFileTableMetadataProviderBuilder withSelection(FileSelection selection) {
this.selection = selection;
return this;
}
@Override
public ParquetFileTableMetadataProviderBuilder withSchema(TupleMetadata schema) {
this.schema = schema;
return this;
}
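/**
 * Builds the provider. The schema passed into the builder takes priority over the schema
 * read from the {@link SchemaProvider}, and the built provider is stored back into the
 * {@link MetadataProviderManager} when it holds more row group metadata than the source
 * provider, so that it can be reused later.
 *
 * @return the built {@link ParquetTableMetadataProvider}
 * @throws IOException if the table metadata cannot be collected
 */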
@Override
public ParquetTableMetadataProvider build() throws IOException {
ParquetTableMetadataProviderImpl provider;
SchemaProvider schemaProvider = metadataProviderManager.getSchemaProvider();
ParquetMetadataProvider source = (ParquetTableMetadataProvider) metadataProviderManager.getTableMetadataProvider();
DrillStatsTable statsProvider = metadataProviderManager.getStatsProvider();
// the schema passed into the builder takes priority over the one read from the schema provider
TupleMetadata schema = null;
try {
if (this.schema != null) {
schema = this.schema;
} else {
schema = schemaProvider != null ? schemaProvider.read().getSchema() : null;
}
} catch (IOException e) {
logger.debug("Unable to deserialize schema from schema file for table: {}",
selection == null ? selectionRoot : selection.getSelectionRoot(), e);
}
if (entries != null) {
// reuse previously stored metadata
provider = new ParquetTableMetadataProviderImpl(entries, selectionRoot, cacheFileRoot, readerConfig, fs, autoCorrectCorruptedDates,
source, schema, statsProvider);
} else {
provider = new ParquetTableMetadataProviderImpl(selection, readerConfig, fs, autoCorrectCorruptedDates,
source, schema, statsProvider);
}
// store the results in the metadata provider manager so they can be reused when creating new instances
if (source == null || source.getRowGroupsMeta().size() < provider.getRowGroupsMeta().size()) {
metadataProviderManager.setTableMetadataProvider(provider);
}
return provider;
}
}
}