// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This API is EXPERIMENTAL.
#pragma once
#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include "arrow/dataset/discovery.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/io/caching.h"
#include "arrow/util/optional.h"
namespace parquet {
class ParquetFileReader;
class Statistics;
class ColumnChunkMetaData;
class RowGroupMetaData;
class FileMetaData;
class FileDecryptionProperties;
class FileEncryptionProperties;
class ReaderProperties;
class ArrowReaderProperties;
class WriterProperties;
class ArrowWriterProperties;
namespace arrow {
class FileReader;
class FileWriter;
struct SchemaManifest;
} // namespace arrow
} // namespace parquet
namespace arrow {
namespace dataset {
/// \brief A FileFormat implementation that reads from Parquet files
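///
/// A minimal usage sketch, assuming a valid FileSource `source` and a caller
/// returning arrow::Status or arrow::Result; the column name "category" is
/// illustrative only:
///
/// \code{.cpp}
/// auto format = std::make_shared<ParquetFileFormat>();
/// // Read the (hypothetical) "category" column as dictionary-encoded.
/// format->reader_options.dict_columns.insert("category");
/// // Coalesce small reads into larger ranged reads.
/// format->reader_options.pre_buffer = true;
/// ARROW_ASSIGN_OR_RAISE(auto physical_schema, format->Inspect(source));
/// \endcode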
class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
public:
ParquetFileFormat() = default;
/// Convenience constructor which copies properties from a parquet::ReaderProperties.
/// The properties' memory_pool is ignored; the pool from ScanOptions is used at
/// scan time instead.
explicit ParquetFileFormat(const parquet::ReaderProperties& reader_properties);
std::string type_name() const override { return "parquet"; }
bool splittable() const override { return true; }
bool Equals(const FileFormat& other) const override;
struct ReaderOptions {
/// \defgroup parquet-file-format-reader-properties properties which correspond to
/// members of parquet::ReaderProperties.
///
/// We don't embed parquet::ReaderProperties directly because we get memory_pool from
/// ScanOptions at scan time and provide differing defaults.
///
/// @{
bool use_buffered_stream = false;
int64_t buffer_size = 1 << 13;  // 8 KiB, used when use_buffered_stream is enabled
std::shared_ptr<parquet::FileDecryptionProperties> file_decryption_properties;
/// @}
/// \defgroup parquet-file-format-arrow-reader-properties properties which correspond
/// to members of parquet::ArrowReaderProperties.
///
/// We don't embed parquet::ArrowReaderProperties directly because we get batch_size from
/// ScanOptions at scan time, and we will never pass use_threads == true (since we
/// defer parallelization of the scan). Additionally column names (rather than
/// indices) are used to indicate dictionary columns.
///
/// @{
std::unordered_set<std::string> dict_columns;  // columns to read as dictionary, by name
bool pre_buffer = false;  // prefetch coalesced byte ranges when scanning
arrow::io::CacheOptions cache_options = arrow::io::CacheOptions::Defaults();
arrow::io::IOContext io_context;
/// @}
/// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a
/// scan is already parallelized across input files to avoid thread contention. This
/// option will be removed after support is added for simultaneous parallelization
/// across files and columns.
bool enable_parallel_column_conversion = false;
} reader_options;
Result<bool> IsSupported(const FileSource& source) const override;
/// \brief Return the schema of the file if possible.
Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override;
/// \brief Open a file for scanning
Result<ScanTaskIterator> ScanFile(
std::shared_ptr<ScanOptions> options,
const std::shared_ptr<FileFragment>& file) const override;
using FileFormat::MakeFragment;
/// \brief Create a Fragment targeting all RowGroups.
Result<std::shared_ptr<FileFragment>> MakeFragment(
FileSource source, Expression partition_expression,
std::shared_ptr<Schema> physical_schema) override;
/// \brief Create a Fragment, restricted to the specified row groups.
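///
/// A hedged sketch, assuming `format` is a std::shared_ptr<ParquetFileFormat>,
/// `source` is a FileSource, and a null physical_schema may be passed to defer
/// schema inspection; the row group indices {0, 2} are illustrative:
///
/// \code{.cpp}
/// ARROW_ASSIGN_OR_RAISE(
///     auto fragment,
///     format->MakeFragment(std::move(source), literal(true),
///                          /*physical_schema=*/nullptr, {0, 2}));
/// \endcode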
Result<std::shared_ptr<ParquetFileFragment>> MakeFragment(
FileSource source, Expression partition_expression,
std::shared_ptr<Schema> physical_schema, std::vector<int> row_groups);
/// \brief Return a FileReader on the given source.
Result<std::unique_ptr<parquet::arrow::FileReader>> GetReader(
const FileSource& source, ScanOptions* = NULLPTR) const;
Result<std::shared_ptr<FileWriter>> MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options) const override;
std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override;
};
/// \brief A FileFragment with parquet logic.
///
/// ParquetFileFragment provides a lazy (with respect to IO) interface to
/// scan parquet files. Any heavy IO calls are deferred to the Scan() method.
///
/// The caller can provide an optional list of selected RowGroups to limit the
/// number of scanned RowGroups, or to partition the scans across multiple
/// threads.
///
/// Metadata can be explicitly provided, enabling predicate pushdown benefits
/// without the potentially heavy IO of loading metadata from the file system.
/// This can yield a significant performance boost when scanning high-latency
/// file systems.
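///
/// A hedged sketch of per-row-group splitting, assuming `fragment` is a
/// std::shared_ptr<ParquetFileFragment> and `predicate` is a bound Expression:
///
/// \code{.cpp}
/// // Load the FileMetaData if it was not provided up front.
/// ARROW_RETURN_NOT_OK(fragment->EnsureCompleteMetadata());
/// // One fragment per selected row group; row groups whose statistics
/// // contradict the predicate are dropped.
/// ARROW_ASSIGN_OR_RAISE(auto per_row_group, fragment->SplitByRowGroup(predicate));
/// // Alternatively, restrict to explicit row group indices.
/// ARROW_ASSIGN_OR_RAISE(auto subset, fragment->Subset(std::vector<int>{0, 1}));
/// \endcode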
class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
public:
Result<FragmentVector> SplitByRowGroup(Expression predicate);
/// \brief Return the RowGroups selected by this fragment.
const std::vector<int>& row_groups() const {
if (row_groups_) return *row_groups_;
static std::vector<int> empty;
return empty;
}
/// \brief Return the FileMetaData associated with this fragment.
const std::shared_ptr<parquet::FileMetaData>& metadata() const { return metadata_; }
/// \brief Ensure this fragment's FileMetaData is in memory.
Status EnsureCompleteMetadata(parquet::arrow::FileReader* reader = NULLPTR);
/// \brief Return a fragment which selects a filtered subset of this fragment's RowGroups.
Result<std::shared_ptr<Fragment>> Subset(Expression predicate);
Result<std::shared_ptr<Fragment>> Subset(std::vector<int> row_group_ids);
private:
ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
Expression partition_expression,
std::shared_ptr<Schema> physical_schema,
util::optional<std::vector<int>> row_groups);
Status SetMetadata(std::shared_ptr<parquet::FileMetaData> metadata,
std::shared_ptr<parquet::arrow::SchemaManifest> manifest);
// Overridden to opportunistically set metadata since a reader must be opened anyway.
Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
ARROW_RETURN_NOT_OK(EnsureCompleteMetadata());
return physical_schema_;
}
// Return a filtered subset of row group indices.
Result<std::vector<int>> FilterRowGroups(Expression predicate);
ParquetFileFormat& parquet_format_;
// Indices of row groups selected by this fragment,
// or util::nullopt if all row groups are selected.
util::optional<std::vector<int>> row_groups_;
std::vector<Expression> statistics_expressions_;
std::vector<bool> statistics_expressions_complete_;
std::shared_ptr<parquet::FileMetaData> metadata_;
std::shared_ptr<parquet::arrow::SchemaManifest> manifest_;
friend class ParquetFileFormat;
friend class ParquetDatasetFactory;
};
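/// \brief FileWriteOptions for writing parquet files, carrying the
/// parquet::WriterProperties and parquet::ArrowWriterProperties to apply.
///
/// A hedged sketch of customizing these options; the static_pointer_cast
/// assumes DefaultWriteOptions() returns parquet-specific options:
///
/// \code{.cpp}
/// auto format = std::make_shared<ParquetFileFormat>();
/// auto options = std::static_pointer_cast<ParquetFileWriteOptions>(
///     format->DefaultWriteOptions());
/// options->writer_properties = parquet::WriterProperties::Builder()
///                                  .compression(arrow::Compression::SNAPPY)
///                                  ->build();
/// \endcode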
class ARROW_DS_EXPORT ParquetFileWriteOptions : public FileWriteOptions {
public:
std::shared_ptr<parquet::WriterProperties> writer_properties;
std::shared_ptr<parquet::ArrowWriterProperties> arrow_writer_properties;
protected:
using FileWriteOptions::FileWriteOptions;
friend class ParquetFileFormat;
};
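/// \brief A FileWriter targeting a single parquet file; instances are created
/// through ParquetFileFormat::MakeWriter rather than constructed directly.
///
/// A hedged sketch, assuming `format`, an output stream `sink`, the `schema`
/// to write, and a matching RecordBatch `batch`:
///
/// \code{.cpp}
/// ARROW_ASSIGN_OR_RAISE(
///     auto writer, format->MakeWriter(sink, schema, format->DefaultWriteOptions()));
/// ARROW_RETURN_NOT_OK(writer->Write(batch));
/// ARROW_RETURN_NOT_OK(writer->Finish());
/// \endcode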
class ARROW_DS_EXPORT ParquetFileWriter : public FileWriter {
public:
const std::shared_ptr<parquet::arrow::FileWriter>& parquet_writer() const {
return parquet_writer_;
}
Status Write(const std::shared_ptr<RecordBatch>& batch) override;
private:
ParquetFileWriter(std::shared_ptr<io::OutputStream> destination,
std::shared_ptr<parquet::arrow::FileWriter> writer,
std::shared_ptr<ParquetFileWriteOptions> options);
Status FinishInternal() override;
std::shared_ptr<parquet::arrow::FileWriter> parquet_writer_;
friend class ParquetFileFormat;
};
struct ParquetFactoryOptions {
// Either an explicit Partitioning or a PartitioningFactory to discover one.
//
// If a factory is provided, it will be used to infer a schema for partition fields
// based on file and directory paths, and then to construct a Partitioning. The
// default is a Partitioning which will yield no partition information.
//
// The (explicit or discovered) partitioning will be applied to discovered files
// and the resulting partition information embedded in the Dataset.
PartitioningOrFactory partitioning{Partitioning::Default()};
// For the purposes of applying the partitioning, paths will be stripped
// of the partition_base_dir. Files not matching the partition_base_dir
// prefix will be skipped for partition discovery. The ignored files will still
// be part of the Dataset, but will not have partition information.
//
// Example:
// partition_base_dir = "/dataset";
//
// - "/dataset/US/sales.csv" -> "US/sales.csv" will be given to the partitioning
//
// - "/home/john/late_sales.csv" -> Will be ignored for partition discovery.
//
// This is useful for partitioning schemes which parse directory names and for
// which ordering is important, e.g. DirectoryPartitioning.
std::string partition_base_dir;
// Assert that all ColumnChunk paths are consistent. The parquet spec allows for
// ColumnChunk data to be stored in multiple files, but ParquetDatasetFactory
// supports only a single file with all ColumnChunk data. If this flag is set,
// construction of a ParquetDatasetFactory will raise an error if ColumnChunk
// data is not resident in a single file.
bool validate_column_chunk_paths = false;
};
/// \brief Create a FileSystemDataset from a custom `_metadata` cache file.
///
/// Dask and other systems will generate a cache metadata file by concatenating
/// the RowGroupMetaData of multiple parquet files into a single parquet file
/// that only contains metadata and no ColumnChunk data.
///
/// ParquetDatasetFactory creates a FileSystemDataset composed of
/// ParquetFileFragments, where each fragment is pre-populated with the exact
/// number of row groups and the statistics for each column.
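///
/// A hedged end-to-end sketch, assuming `fs` is a std::shared_ptr<fs::FileSystem>
/// and the "/dataset" paths are illustrative:
///
/// \code{.cpp}
/// ParquetFactoryOptions factory_options;
/// factory_options.partition_base_dir = "/dataset";
/// ARROW_ASSIGN_OR_RAISE(
///     auto factory,
///     ParquetDatasetFactory::Make("/dataset/_metadata", fs,
///                                 std::make_shared<ParquetFileFormat>(),
///                                 std::move(factory_options)));
/// ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
/// \endcode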
class ARROW_DS_EXPORT ParquetDatasetFactory : public DatasetFactory {
public:
/// \brief Create a ParquetDatasetFactory from a metadata path.
///
/// The `metadata_path` will be read from `filesystem`. The file paths referenced
/// by each RowGroup in the metadata file are resolved relative to
/// `dirname(metadata_path)`.
///
/// \param[in] metadata_path path of the metadata parquet file
/// \param[in] filesystem from which to open/read the path
/// \param[in] format to read the file with.
/// \param[in] options see ParquetFactoryOptions
static Result<std::shared_ptr<DatasetFactory>> Make(
const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
std::shared_ptr<ParquetFileFormat> format, ParquetFactoryOptions options);
/// \brief Create a ParquetDatasetFactory from a metadata source.
///
/// Similar to the previous Make definition, but the metadata can be a Buffer
/// and the base_path is given explicitly instead of being inferred from the
/// metadata path.
///
/// \param[in] metadata source to open the metadata parquet file from
/// \param[in] base_path used as the prefix of every parquet file referenced
/// \param[in] filesystem from which to read the files referenced.
/// \param[in] format to read the file with.
/// \param[in] options see ParquetFactoryOptions
static Result<std::shared_ptr<DatasetFactory>> Make(
const FileSource& metadata, const std::string& base_path,
std::shared_ptr<fs::FileSystem> filesystem,
std::shared_ptr<ParquetFileFormat> format, ParquetFactoryOptions options);
Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
InspectOptions options) override;
Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override;
protected:
ParquetDatasetFactory(
std::shared_ptr<fs::FileSystem> filesystem,
std::shared_ptr<ParquetFileFormat> format,
std::shared_ptr<parquet::FileMetaData> metadata,
std::shared_ptr<parquet::arrow::SchemaManifest> manifest,
std::shared_ptr<Schema> physical_schema, std::string base_path,
ParquetFactoryOptions options,
std::unordered_map<std::string, std::vector<int>> path_to_row_group_ids)
: filesystem_(std::move(filesystem)),
format_(std::move(format)),
metadata_(std::move(metadata)),
manifest_(std::move(manifest)),
physical_schema_(std::move(physical_schema)),
base_path_(std::move(base_path)),
options_(std::move(options)),
path_to_row_group_ids_(std::move(path_to_row_group_ids)) {}
std::shared_ptr<fs::FileSystem> filesystem_;
std::shared_ptr<ParquetFileFormat> format_;
std::shared_ptr<parquet::FileMetaData> metadata_;
std::shared_ptr<parquet::arrow::SchemaManifest> manifest_;
std::shared_ptr<Schema> physical_schema_;
std::string base_path_;
ParquetFactoryOptions options_;
std::unordered_map<std::string, std::vector<int>> path_to_row_group_ids_;
private:
Result<std::vector<std::shared_ptr<FileFragment>>> CollectParquetFragments(
const Partitioning& partitioning);
Result<std::shared_ptr<Schema>> PartitionSchema();
};
} // namespace dataset
} // namespace arrow