blob: a9202bc1deae80dd138ab9403e1069f68306f5b7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.parquet.metadata;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator.Feature;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.collections.Collectors;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.util.DrillVersionInfo;
import org.apache.drill.common.util.JacksonUtils;
import org.apache.drill.exec.serialization.PathSerDe;
import org.apache.drill.exec.store.TimedCallable;
import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.SUPPORTED_VERSIONS;
import static org.apache.drill.exec.store.parquet.metadata.Metadata_V4.ColumnMetadata_v4;
import static org.apache.drill.exec.store.parquet.metadata.Metadata_V4.ColumnTypeMetadata_v4;
import static org.apache.drill.exec.store.parquet.metadata.Metadata_V4.FileMetadata;
import static org.apache.drill.exec.store.parquet.metadata.Metadata_V4.MetadataSummary;
import static org.apache.drill.exec.store.parquet.metadata.Metadata_V4.ParquetFileAndRowCountMetadata;
import static org.apache.drill.exec.store.parquet.metadata.Metadata_V4.ParquetFileMetadata_v4;
import static org.apache.drill.exec.store.parquet.metadata.Metadata_V4.ParquetTableMetadata_v4;
/**
* This is an utility class, holder for Parquet Table Metadata and {@link ParquetReaderConfig}. All the creation of
* parquet metadata cache using create api's are forced to happen using the process user since only that user will have
* write permission for the cache file
*/
public class Metadata {
private static final Logger logger = LoggerFactory.getLogger(Metadata.class);
public static final String[] OLD_METADATA_FILENAMES = {".drill.parquet_metadata", ".drill.parquet_metadata.v2"};
public static final String OLD_METADATA_FILENAME = ".drill.parquet_metadata";
public static final String METADATA_DIRECTORIES_FILENAME = ".drill.parquet_metadata_directories";
public static final String METADATA_FILENAME = ".drill.parquet_file_metadata.v4";
public static final String METADATA_SUMMARY_FILENAME = ".drill.parquet_summary_metadata.v4";
public static final String[] CURRENT_METADATA_FILENAMES = {METADATA_SUMMARY_FILENAME, METADATA_FILENAME};
public static final Long DEFAULT_NULL_COUNT = 0L;
public static final Long NULL_COUNT_NOT_EXISTS = -1L;
private final ParquetReaderConfig readerConfig;
private ParquetTableMetadataBase parquetTableMetadata;
private ParquetTableMetadataDirs parquetTableMetadataDirs;
private Metadata(ParquetReaderConfig readerConfig) {
this.readerConfig = readerConfig;
}
/**
* Create the parquet metadata file for the directory at the given path, and for any subdirectories.
* @param fs file system
* @param path path
* @param readerConfig parquet reader configuration
* @param allColumnsInteresting if set, store column metadata for all the columns
* @param columnSet Set of columns for which column metadata has to be stored
*/
public static void createMeta(FileSystem fs, Path path, ParquetReaderConfig readerConfig,
boolean allColumnsInteresting, Set<SchemaPath> columnSet) throws IOException {
Metadata metadata = new Metadata(readerConfig);
metadata.createMetaFilesRecursivelyAsProcessUser(path, fs, allColumnsInteresting, columnSet, false );
}
/**
* Get the parquet metadata for the parquet files in the given directory, including those in subdirectories.
*
* @param fs file system
* @param path path
* @param readerConfig parquet reader configuration
*
* @return parquet table metadata
*/
public static ParquetTableMetadata_v4 getParquetTableMetadata(FileSystem fs, Path path, ParquetReaderConfig readerConfig) throws IOException {
Metadata metadata = new Metadata(readerConfig);
return metadata.getParquetTableMetadata(path, fs);
}
/**
* Get the parquet metadata for a list of parquet files.
*
* @param fileStatusMap file statuses and corresponding file systems
* @param readerConfig parquet reader configuration
* @return parquet table metadata
*/
public static ParquetTableMetadata_v4 getParquetTableMetadata(Map<FileStatus, FileSystem> fileStatusMap,
ParquetReaderConfig readerConfig) throws IOException {
Metadata metadata = new Metadata(readerConfig);
return metadata.getParquetTableMetadata(fileStatusMap);
}
/**
* Get the parquet metadata for the table by reading the metadata file
*
* @param fs current file system
* @param paths The path to the metadata file, located in the directory that contains the parquet files
* @param metaContext metadata context
* @param readerConfig parquet reader configuration
* @return parquet table metadata. Null if metadata cache is missing, unsupported or corrupted
*/
public static ParquetTableMetadataBase readBlockMeta(FileSystem fs,
List<Path> paths,
MetadataContext metaContext,
ParquetReaderConfig readerConfig) {
Metadata metadata = new Metadata(readerConfig);
if (paths.isEmpty()) {
metaContext.setMetadataCacheCorrupted(true);
}
for (Path path: paths) {
// readBlockMeta updates the metadataCacheCorrupted flag. So while reading the cache files
// metadataCacheCorrupted should be checked if any of the previous cache files are corrupted.
if (ignoreReadingMetadata(metaContext, path)) {
return null;
}
metadata.readBlockMeta(path, false, metaContext, fs);
}
return metadata.parquetTableMetadata;
}
/**
* Get the parquet metadata for all subdirectories by reading the metadata file
*
* @param fs current file system
* @param path The path to the metadata file, located in the directory that contains the parquet files
* @param metaContext metadata context
* @param readerConfig parquet reader configuration
* @return parquet metadata for a directory. Null if metadata cache is missing, unsupported or corrupted
*/
public static ParquetTableMetadataDirs readMetadataDirs(FileSystem fs,
Path path,
MetadataContext metaContext,
ParquetReaderConfig readerConfig) {
if (ignoreReadingMetadata(metaContext, path)) {
return null;
}
Metadata metadata = new Metadata(readerConfig);
metadata.readBlockMeta(path, true, metaContext, fs);
return metadata.parquetTableMetadataDirs;
}
/**
* Ignore reading metadata files, if metadata is missing, unsupported or corrupted
*
* @param metaContext Metadata context
* @param path The path to the metadata file, located in the directory that contains the parquet files
* @return true if parquet metadata is missing or corrupted, false otherwise
*/
private static boolean ignoreReadingMetadata(MetadataContext metaContext, Path path) {
if (metaContext.isMetadataCacheCorrupted()) {
logger.warn("Ignoring of reading '{}' metadata file. Parquet metadata cache files are unsupported or corrupted. " +
"Query performance may be slow. Make sure the cache files are up-to-date by running the 'REFRESH TABLE " +
"METADATA' command", path);
return true;
}
return false;
}
/**
* Wrapper which makes sure that in all cases metadata file is created as a process user no matter what the caller
* is passing.
* @param path to the directory of the parquet table
* @param fs file system
* @param allColumnsInteresting if set, store column metadata for all the columns
* @param columnSet Set of columns for which column metadata has to be stored
* @return Pair of parquet metadata. The left one is a parquet metadata for the table. The right one of the Pair is
* a metadata for all subdirectories (if they are present and there are no any parquet files in the
* {@code path} directory).
* @throws IOException if parquet metadata can't be serialized and written to the json file
*/
private Pair<ParquetTableMetadata_v4, ParquetTableMetadataDirs> createMetaFilesRecursivelyAsProcessUser(
Path path, FileSystem fs, boolean allColumnsInteresting, Set<SchemaPath> columnSet, boolean autoRefreshTriggered)
throws IOException {
final FileSystem processUserFileSystem = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(),
fs.getConf());
// Get the allColumnsInteresting flag and columnSet from the existing summary if the auto refresh is triggered.
if (autoRefreshTriggered) {
allColumnsInteresting = isAllColumnsInteresting(fs, path, true);
columnSet = null;
if (!allColumnsInteresting) {
columnSet = getInterestingColumns(fs, path, true);
}
}
return createMetaFilesRecursively(path, processUserFileSystem, allColumnsInteresting, columnSet);
}
/**
* Create the parquet metadata files for the directory at the given path and for any subdirectories.
* Metadata cache files written to the disk contain relative paths. Returned Pair of metadata contains absolute paths.
*
* @param path to the directory of the parquet table
* @param fs file system
* @param allColumnsInteresting if set, store column metadata for all the columns
* @param columnSet Set of columns for which column metadata has to be stored
* @return Pair of parquet metadata. The left one is a parquet metadata for the table. The right one of the Pair is
* a metadata for all subdirectories (if they are present and there are no any parquet files in the
* {@code path} directory).
* @throws IOException if parquet metadata can't be serialized and written to the json file
*/
private Pair<ParquetTableMetadata_v4, ParquetTableMetadataDirs> createMetaFilesRecursively(
Path path, FileSystem fs, boolean allColumnsInteresting, Set<SchemaPath> columnSet) throws IOException {
Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
List<ParquetFileMetadata_v4> metaDataList = Lists.newArrayList();
List<Path> directoryList = Lists.newArrayList();
ConcurrentHashMap<ColumnTypeMetadata_v4.Key, ColumnTypeMetadata_v4> columnTypeInfoSet =
new ConcurrentHashMap<>();
FileStatus fileStatus = fs.getFileStatus(path);
long dirTotalRowCount = 0;
assert fileStatus.isDirectory() : "Expected directory";
final Map<FileStatus, FileSystem> childFiles = new LinkedHashMap<>();
for (final FileStatus file : DrillFileSystemUtil.listAll(fs, path, false)) {
if (file.isDirectory()) {
ParquetTableMetadata_v4 subTableMetadata = (createMetaFilesRecursively(file.getPath(), fs, allColumnsInteresting, columnSet)).getLeft();
ConcurrentHashMap<ColumnTypeMetadata_v4.Key, ColumnTypeMetadata_v4> subTableColumnTypeInfo = subTableMetadata.getColumnTypeInfoMap();
metaDataList.addAll((List<ParquetFileMetadata_v4>) subTableMetadata.getFiles());
directoryList.addAll(subTableMetadata.getDirectories());
directoryList.add(file.getPath());
//TODO: We need a merge method that merges two columns with the same name but different types
if (columnTypeInfoSet.isEmpty()) {
columnTypeInfoSet.putAll(subTableColumnTypeInfo);
} else {
for (ColumnTypeMetadata_v4.Key key : subTableColumnTypeInfo.keySet()) {
ColumnTypeMetadata_v4 columnTypeMetadata_v4 = columnTypeInfoSet.get(key);
if (columnTypeMetadata_v4 == null) {
columnTypeMetadata_v4 = subTableColumnTypeInfo.get(key);
} else {
// If the existing total null count or the null count of the child file is unknown(-1), update the total null count
// as unknown
if (subTableColumnTypeInfo.get(key).totalNullCount < 0 || columnTypeMetadata_v4.totalNullCount < 0) {
columnTypeMetadata_v4.totalNullCount = NULL_COUNT_NOT_EXISTS;
} else {
columnTypeMetadata_v4.totalNullCount = columnTypeMetadata_v4.totalNullCount + subTableColumnTypeInfo.get(key).totalNullCount;
}
}
columnTypeInfoSet.put(key, columnTypeMetadata_v4);
}
}
dirTotalRowCount = dirTotalRowCount + subTableMetadata.getTotalRowCount();
} else {
childFiles.put(file, fs);
}
}
Metadata_V4.MetadataSummary metadataSummary = new Metadata_V4.MetadataSummary(SUPPORTED_VERSIONS.last().toString(),
DrillVersionInfo.getVersion(), allColumnsInteresting || columnSet == null);
ParquetTableMetadata_v4 parquetTableMetadata = new ParquetTableMetadata_v4(metadataSummary);
if (childFiles.size() > 0) {
List<ParquetFileAndRowCountMetadata> childFileAndRowCountMetadata = getParquetFileMetadata_v4(parquetTableMetadata, childFiles, allColumnsInteresting, columnSet);
// If the columnTypeInfoSet is empty, add the columnTypeInfo from the parquetTableMetadata
if (columnTypeInfoSet.isEmpty()) {
columnTypeInfoSet.putAll(parquetTableMetadata.getColumnTypeInfoMap());
}
for (ParquetFileAndRowCountMetadata parquetFileAndRowCountMetadata : childFileAndRowCountMetadata) {
metaDataList.add(parquetFileAndRowCountMetadata.getFileMetadata());
dirTotalRowCount = dirTotalRowCount + parquetFileAndRowCountMetadata.getFileRowCount();
Map<ColumnTypeMetadata_v4.Key, Long> totalNullCountMap = parquetFileAndRowCountMetadata.getTotalNullCountMap();
for (ColumnTypeMetadata_v4.Key column: totalNullCountMap.keySet()) {
ColumnTypeMetadata_v4 columnTypeMetadata_v4 = columnTypeInfoSet.get(column);
// If the column is not present in columnTypeInfoSet, get it from parquetTableMetadata
if (columnTypeMetadata_v4 == null) {
columnTypeMetadata_v4 = parquetTableMetadata.getColumnTypeInfoMap().get(column);
}
// If the existing total null count or the null count of the child file is unknown(-1), update the total null count
// as unknown
if ( columnTypeMetadata_v4.totalNullCount < 0 || totalNullCountMap.get(column) < 0) {
columnTypeMetadata_v4.totalNullCount = NULL_COUNT_NOT_EXISTS;
} else {
columnTypeMetadata_v4.totalNullCount += totalNullCountMap.get(column);
}
columnTypeInfoSet.put(column, columnTypeMetadata_v4);
}
}
}
metadataSummary.directories = directoryList;
parquetTableMetadata.assignFiles(metaDataList);
// TODO: We need a merge method that merges two columns with the same name but different types
if (metadataSummary.columnTypeInfo == null) {
metadataSummary.columnTypeInfo = new ConcurrentHashMap<>();
}
metadataSummary.columnTypeInfo.putAll(columnTypeInfoSet);
metadataSummary.allColumnsInteresting = allColumnsInteresting;
metadataSummary.totalRowCount = dirTotalRowCount;
parquetTableMetadata.metadataSummary = metadataSummary;
for (String oldName : OLD_METADATA_FILENAMES) {
fs.delete(new Path(path, oldName), false);
}
// relative paths in the metadata are only necessary for meta cache files.
ParquetTableMetadata_v4 metadataTableWithRelativePaths =
MetadataPathUtils.createMetadataWithRelativePaths(parquetTableMetadata, path);
writeFile(metadataTableWithRelativePaths.fileMetadata, new Path(path, METADATA_FILENAME), fs);
writeFile(metadataTableWithRelativePaths.getSummary(), new Path(path, METADATA_SUMMARY_FILENAME), fs);
Metadata_V4.MetadataSummary metadataSummaryWithRelativePaths = metadataTableWithRelativePaths.getSummary();
// Directories list will be empty at the leaf level directories. For sub-directories with both files and directories,
// only the directories will be included in the list.
writeFile(new ParquetTableMetadataDirs(metadataSummaryWithRelativePaths.directories),
new Path(path, METADATA_DIRECTORIES_FILENAME), fs);
if (timer != null) {
logger.debug("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS));
timer.stop();
}
return Pair.of(parquetTableMetadata, new ParquetTableMetadataDirs(directoryList));
}
/**
* Get the parquet metadata for the parquet files in a directory.
*
* @param path the path of the directory
* @return metadata object for an entire parquet directory structure
* @throws IOException in case of problems during accessing files
*/
private ParquetTableMetadata_v4 getParquetTableMetadata(Path path, FileSystem fs) throws IOException {
FileStatus fileStatus = fs.getFileStatus(path);
Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
List<FileStatus> fileStatuses = new ArrayList<>();
if (fileStatus.isFile()) {
fileStatuses.add(fileStatus);
} else {
// the thing we need!?
fileStatuses.addAll(DrillFileSystemUtil.listFiles(fs, path, true));
}
if (watch != null) {
logger.debug("Took {} ms to get file statuses", watch.elapsed(TimeUnit.MILLISECONDS));
watch.reset();
watch.start();
}
Map<FileStatus, FileSystem> fileStatusMap = fileStatuses.stream()
.collect(
java.util.stream.Collectors.toMap(
Function.identity(),
s -> fs,
(oldFs, newFs) -> newFs,
LinkedHashMap::new));
ParquetTableMetadata_v4 metadata_v4 = getParquetTableMetadata(fileStatusMap);
if (watch != null) {
logger.debug("Took {} ms to read file metadata", watch.elapsed(TimeUnit.MILLISECONDS));
watch.stop();
}
return metadata_v4;
}
/**
* Get the parquet metadata for a list of parquet files
*
* @param fileStatusMap file statuses and corresponding file systems
* @return parquet table metadata object
* @throws IOException if parquet file metadata can't be obtained
*/
private ParquetTableMetadata_v4 getParquetTableMetadata(Map<FileStatus, FileSystem> fileStatusMap)
throws IOException {
Metadata_V4.MetadataSummary tableMetadataSummary = new Metadata_V4.MetadataSummary(SUPPORTED_VERSIONS.last().toString(), DrillVersionInfo.getVersion(), new ArrayList<>(), true);
ParquetTableMetadata_v4 tableMetadata = new ParquetTableMetadata_v4(tableMetadataSummary);
List<ParquetFileAndRowCountMetadata> parquetFileAndRowCountMetadata = getParquetFileMetadata_v4(tableMetadata, fileStatusMap, true, null);
List<ParquetFileMetadata_v4> parquetFileMetadata = new ArrayList<>();
for (ParquetFileAndRowCountMetadata fileAndGlobalMetadata : parquetFileAndRowCountMetadata) {
parquetFileMetadata.add(fileAndGlobalMetadata.getFileMetadata());
}
tableMetadata.assignFiles(parquetFileMetadata);
return tableMetadata;
}
/**
* Get a list of file metadata for a list of parquet files
*
* @param parquetTableMetadata_v4 can store column schema info from all the files and row groups
* @param fileStatusMap parquet files statuses and corresponding file systems
*
* @param allColumnsInteresting if set, store column metadata for all the columns
* @param columnSet Set of columns for which column metadata has to be stored
* @return list of the parquet file metadata with absolute paths
* @throws IOException is thrown in case of issues while executing the list of runnables
*/
private List<ParquetFileAndRowCountMetadata> getParquetFileMetadata_v4(ParquetTableMetadata_v4 parquetTableMetadata_v4,
Map<FileStatus, FileSystem> fileStatusMap, boolean allColumnsInteresting, Set<SchemaPath> columnSet) throws IOException {
return TimedCallable.run("Fetch parquet metadata", logger,
Collectors.toList(fileStatusMap,
(fileStatus, fileSystem) -> new MetadataGatherer(parquetTableMetadata_v4, fileStatus, fileSystem, allColumnsInteresting, columnSet)),
16
);
}
/**
* TimedRunnable that reads the footer from parquet and collects file metadata
*/
private class MetadataGatherer extends TimedCallable<ParquetFileAndRowCountMetadata> {
private final ParquetTableMetadata_v4 parquetTableMetadata;
private final FileStatus fileStatus;
private final FileSystem fs;
private final boolean allColumnsInteresting;
private final Set<SchemaPath> columnSet;
MetadataGatherer(ParquetTableMetadata_v4 parquetTableMetadata, FileStatus fileStatus, FileSystem fs,
boolean allColumnsInteresting, Set<SchemaPath> columnSet) {
this.parquetTableMetadata = parquetTableMetadata;
this.fileStatus = fileStatus;
this.fs = fs;
this.allColumnsInteresting = allColumnsInteresting;
this.columnSet = columnSet;
}
@Override
protected ParquetFileAndRowCountMetadata runInner() throws Exception {
return getParquetFileMetadata_v4(parquetTableMetadata, fileStatus, fs, allColumnsInteresting, columnSet, readerConfig);
}
public String toString() {
return new ToStringBuilder(this, SHORT_PREFIX_STYLE).append("path", fileStatus.getPath()).toString();
}
}
// A private version of the following static method, with no footer given
private ParquetFileAndRowCountMetadata getParquetFileMetadata_v4(ParquetTableMetadata_v4 parquetTableMetadata,
FileStatus file, final FileSystem fs, boolean allColumnsInteresting, Set<SchemaPath> columnSet, ParquetReaderConfig readerConfig)
throws IOException, InterruptedException {
return getParquetFileMetadata_v4(parquetTableMetadata, null /* no footer */,
file, fs, allColumnsInteresting, false, columnSet, readerConfig);
}
/**
* Get the file metadata for a single file
*
* @param parquetTableMetadata The table metadata to be updated with all the columns' info
* @param footer If non null, use this footer instead of reading it from the file
* @param file The file
* @param allColumnsInteresting If true, read the min/max metadata for all the columns
* @param skipNonInteresting If true, collect info only for the interesting columns
* @param columnSet Specifies specific columns for which min/max metadata is collected
* @param readerConfig for the options
* @return the file metadata
*/
public static ParquetFileAndRowCountMetadata getParquetFileMetadata_v4(ParquetTableMetadata_v4 parquetTableMetadata,
ParquetMetadata footer,
FileStatus file,
FileSystem fs,
boolean allColumnsInteresting,
boolean skipNonInteresting,
Set<SchemaPath> columnSet,
ParquetReaderConfig readerConfig)
throws IOException, InterruptedException {
ParquetMetadata metadata = footer; // if a non-null footer is given, no need to read it again from the file
if (metadata == null) {
UserGroupInformation processUserUgi = ImpersonationUtil.getProcessUserUGI();
Configuration conf = new Configuration(fs.getConf());
try {
metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>) () -> {
try (ParquetFileReader parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromStatus(file, conf), readerConfig.toReadOptions())) {
return parquetFileReader.getFooter();
}
});
} catch (Exception e) {
logger.error("Exception while reading footer of parquet file [Details - path: {}, owner: {}] as process user {}",
file.getPath(), file.getOwner(), processUserUgi.getShortUserName(), e);
throw e;
}
}
FileMetadataCollector metadataCollector = new FileMetadataCollector(metadata, file, fs, allColumnsInteresting,
skipNonInteresting, columnSet, readerConfig);
parquetTableMetadata.metadataSummary.columnTypeInfo.putAll(metadataCollector.getColumnTypeInfo());
return metadataCollector.getFileMetadata();
}
/**
* Serialize parquet metadata to json and write to a file.
*
* @param parquetMetadata parquet table or directory metadata
* @param p file path
* @param fs Drill file system
* @throws IOException if metadata can't be serialized
*/
private void writeFile(Object parquetMetadata, Path p, FileSystem fs) throws IOException {
JsonFactory jsonFactory = new JsonFactory();
jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false);
jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
SimpleModule module = new SimpleModule();
module.addSerializer(Path.class, new PathSerDe.Se());
if (parquetMetadata instanceof Metadata_V4.FileMetadata) {
module.addSerializer(ColumnMetadata_v4.class, new ColumnMetadata_v4.Serializer());
}
ObjectMapper mapper = JacksonUtils.createJsonMapperBuilder(jsonFactory)
.addModule(module)
.build();
OutputStream os = fs.create(p);
mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetMetadata);
os.flush();
os.close();
}
/**
* Read the parquet metadata from a file
*
* @param path to metadata file
* @param dirsOnly true for {@link Metadata#METADATA_DIRECTORIES_FILENAME}
* or false for {@link Metadata#OLD_METADATA_FILENAME} files reading
* @param metaContext current metadata context
*/
private void readBlockMeta(Path path, boolean dirsOnly, MetadataContext metaContext, FileSystem fs) {
Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
Path metadataParentDir = Path.getPathWithoutSchemeAndAuthority(path.getParent());
String metadataParentDirPath = metadataParentDir.toUri().getPath();
ObjectMapper mapper = JacksonUtils.createObjectMapper();
final SimpleModule serialModule = new SimpleModule();
serialModule.addDeserializer(SchemaPath.class, new SchemaPath.De());
serialModule.addKeyDeserializer(Metadata_V2.ColumnTypeMetadata_v2.Key.class, new Metadata_V2.ColumnTypeMetadata_v2.Key.DeSerializer());
serialModule.addKeyDeserializer(Metadata_V3.ColumnTypeMetadata_v3.Key.class, new Metadata_V3.ColumnTypeMetadata_v3.Key.DeSerializer());
serialModule.addKeyDeserializer(ColumnTypeMetadata_v4.Key.class, new ColumnTypeMetadata_v4.Key.DeSerializer());
AfterburnerModule module = new AfterburnerModule();
module.setUseOptimizedBeanDeserializer(true);
boolean isFileMetadata = path.toString().endsWith(METADATA_FILENAME);
boolean isSummaryFile = path.toString().endsWith(METADATA_SUMMARY_FILENAME);
mapper.registerModule(serialModule);
mapper.registerModule(module);
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
try (InputStream is = fs.open(path)) {
boolean alreadyCheckedModification;
boolean newMetadata = false;
alreadyCheckedModification = metaContext.getStatus(metadataParentDir);
if (dirsOnly) {
parquetTableMetadataDirs = mapper.readValue(is, ParquetTableMetadataDirs.class);
if (timer != null) {
logger.debug("Took {} ms to read directories from directory cache file", timer.elapsed(TimeUnit.MILLISECONDS));
timer.stop();
}
parquetTableMetadataDirs.updateRelativePaths(metadataParentDirPath);
if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), path, metadataParentDir, metaContext, fs)) {
parquetTableMetadataDirs =
(createMetaFilesRecursivelyAsProcessUser(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null, true)).getRight();
newMetadata = true;
}
} else {
if (isFileMetadata) {
parquetTableMetadata.assignFiles((mapper.readValue(is, FileMetadata.class)).getFiles());
if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).isAtLeast(4, 0)) {
((ParquetTableMetadata_v4) parquetTableMetadata).updateRelativePaths(metadataParentDirPath);
}
if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), path, metadataParentDir, metaContext, fs)) {
parquetTableMetadata =
(createMetaFilesRecursivelyAsProcessUser(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null, true)).getLeft();
newMetadata = true;
}
} else if (isSummaryFile) {
MetadataSummary metadataSummary = mapper.readValue(is, Metadata_V4.MetadataSummary.class);
parquetTableMetadata = new ParquetTableMetadata_v4(metadataSummary);
} else {
parquetTableMetadata = mapper.readValue(is, ParquetTableMetadataBase.class);
if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).isAtLeast(3, 0)) {
((Metadata_V3.ParquetTableMetadata_v3) parquetTableMetadata).updateRelativePaths(metadataParentDirPath);
}
if (!alreadyCheckedModification && tableModified((parquetTableMetadata.getDirectories()), path, metadataParentDir, metaContext, fs)) {
parquetTableMetadata =
(createMetaFilesRecursivelyAsProcessUser(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null, true)).getLeft();
newMetadata = true;
}
}
if (timer != null) {
logger.debug("Took {} ms to read metadata from cache file", timer.elapsed(TimeUnit.MILLISECONDS));
timer.stop();
}
if (!isSummaryFile) {
List<? extends ParquetFileMetadata> files = parquetTableMetadata.getFiles();
if (files != null) {
for (ParquetFileMetadata file : files) {
// DRILL-5009: Remove empty row groups unless it is the only row group
List<? extends RowGroupMetadata> rowGroups = file.getRowGroups();
if (rowGroups.size() == 1) {
continue;
}
rowGroups.removeIf(r -> r.getRowCount() == 0);
}
}
}
if (newMetadata) {
// if new metadata files were created, invalidate the existing metadata context
metaContext.clear();
}
}
} catch (IOException e) {
logger.error("Failed to read '{}' metadata file", path, e);
metaContext.setMetadataCacheCorrupted(true);
}
}
private Set<SchemaPath> getInterestingColumns(FileSystem fs, Path metadataParentDir, boolean autoRefreshTriggered) {
Metadata_V4.MetadataSummary metadataSummary = getSummary(fs, metadataParentDir, autoRefreshTriggered, null);
if (metadataSummary == null) {
return null;
} else {
Set<SchemaPath> interestingColumns = new HashSet<>();
for (ColumnTypeMetadata_v4 columnTypeMetadata_v4: metadataSummary.columnTypeInfo.values()) {
if (columnTypeMetadata_v4.isInteresting) {
interestingColumns.add(SchemaPath.getSimplePath(SchemaPath.getCompoundPath(columnTypeMetadata_v4.name).getRootSegmentPath()));
}
}
return interestingColumns;
}
}
private boolean isAllColumnsInteresting(FileSystem fs, Path metadataParentDir, boolean autoRefreshTriggered) {
Metadata_V4.MetadataSummary metadataSummary = getSummary(fs, metadataParentDir, autoRefreshTriggered, null);
if (metadataSummary == null) {
return true;
}
return metadataSummary.isAllColumnsInteresting();
}
public static Path getSummaryFileName(Path metadataParentDir) {
return new Path(metadataParentDir, METADATA_SUMMARY_FILENAME);
}
public static Path getDirFileName(Path metadataParentDir) {
return new Path(metadataParentDir, METADATA_DIRECTORIES_FILENAME);
}
private static Path getFileMetadataFileName(Path metadataParentDir) {
return new Path(metadataParentDir, METADATA_FILENAME);
}
/**
* Returns if metadata exists or not in that directory
* @param fs file system
* @param metadataParentDir parent directory that holds metadata files
* @return true if metadata exists in that directory
* @throws IOException when unable to check metadata exists status
*/
private static boolean metadataExists(FileSystem fs, Path metadataParentDir) throws IOException {
Path summaryFile = new Path(metadataParentDir, METADATA_SUMMARY_FILENAME);
Path metadataDirFile = new Path(metadataParentDir, METADATA_DIRECTORIES_FILENAME);
Path fileMetadataFile = new Path(metadataParentDir, METADATA_FILENAME);
if (!fs.exists(summaryFile) && !fs.exists(metadataDirFile) && !fs.exists(fileMetadataFile)) {
return false;
}
return true;
}
/**
* Reads the summary from the metadata cache file, if the cache file is stale recreates the metadata
* @param fs file system
* @param metadataParentDir parent directory that holds metadata files
* @param autoRefreshTriggered true if the auto-refresh is already triggered
* @param readerConfig Parquet reader config
* @return returns metadata summary
*/
public static Metadata_V4.MetadataSummary getSummary(FileSystem fs, Path metadataParentDir, boolean autoRefreshTriggered, ParquetReaderConfig readerConfig) {
Path summaryFile = getSummaryFileName(metadataParentDir);
Path metadataDirFile = getDirFileName(metadataParentDir);
MetadataContext metaContext = new MetadataContext();
try {
// If autoRefresh is not triggered and none of the metadata files exist
if (!autoRefreshTriggered && !metadataExists(fs, metadataParentDir)) {
logger.debug("Metadata doesn't exist in {}", metadataParentDir);
return null;
} else if (autoRefreshTriggered && !fs.exists(summaryFile)) {
logger.debug("Metadata Summary file {} does not exist", summaryFile);
return null;
} else {
// If the autorefresh is not triggered, check if the cache file is stale and trigger auto-refresh
if (!autoRefreshTriggered) {
Metadata metadata = new Metadata(readerConfig);
if (!fs.exists(metadataDirFile)) {
return null;
}
ParquetTableMetadataDirs metadataDirs = readMetadataDirs(fs, metadataDirFile, metaContext, readerConfig);
if (metadata.tableModified(metadataDirs.getDirectories(), summaryFile, metadataParentDir, metaContext, fs) && true) {
ParquetTableMetadata_v4 parquetTableMetadata = (metadata.createMetaFilesRecursivelyAsProcessUser(Path.getPathWithoutSchemeAndAuthority(summaryFile.getParent()), fs, true, null, true)).getLeft();
return parquetTableMetadata.getSummary();
}
}
// Read the existing metadataSummary cache file to get the metadataSummary
final SimpleModule serialModule = new SimpleModule();
serialModule.addDeserializer(SchemaPath.class, new SchemaPath.De());
serialModule.addKeyDeserializer(ColumnTypeMetadata_v4.Key.class, new ColumnTypeMetadata_v4.Key.DeSerializer());
AfterburnerModule module = new AfterburnerModule();
module.setUseOptimizedBeanDeserializer(true);
ObjectMapper mapper = JacksonUtils.createJsonMapperBuilder()
.addModule(serialModule)
.addModule(module)
.build();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
InputStream is = fs.open(summaryFile);
return mapper.readValue(is, Metadata_V4.MetadataSummary.class);
}
} catch (IOException e) {
logger.debug("Failed to read '{}' summary metadata file", summaryFile, e);
return null;
}
}
/**
* Check if the parquet metadata needs to be updated by comparing the modification time of the directories with
* the modification time of the metadata file
*
* @param directories List of directories
* @param metaFilePath path of parquet metadata cache file
* @return true if metadata needs to be updated, false otherwise
* @throws IOException if some resources are not accessible
*/
private boolean tableModified(List<Path> directories, Path metaFilePath, Path parentDir,
MetadataContext metaContext, FileSystem fs) throws IOException {
Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
metaContext.setStatus(parentDir);
long metaFileModifyTime = fs.getFileStatus(metaFilePath).getModificationTime();
FileStatus directoryStatus = fs.getFileStatus(parentDir);
int numDirs = 1;
if (directoryStatus.getModificationTime() > metaFileModifyTime) {
return logAndStopTimer(true, directoryStatus.getPath().toString(), timer, numDirs);
}
boolean isModified = false;
for (Path directory : directories) {
numDirs++;
metaContext.setStatus(directory);
directoryStatus = fs.getFileStatus(directory);
if (directoryStatus.getModificationTime() > metaFileModifyTime) {
isModified = true;
break;
}
}
return logAndStopTimer(isModified, directoryStatus.getPath().toString(), timer, numDirs);
}
private boolean logAndStopTimer(boolean isModified, String directoryName,
Stopwatch timer, int numDirectories) {
if (timer != null) {
logger.debug("{} directory was modified. Took {} ms to check modification time of {} directories",
isModified ? directoryName : "No", timer.elapsed(TimeUnit.MILLISECONDS), numDirectories);
timer.stop();
}
return isModified;
}
}