ARROW-11972: [C++][R][Python][Dataset] Extract IPC/Parquet fragment scan options
Closes #9790 from lidavidm/arrow-11972
Authored-by: David Li <li.davidm96@gmail.com>
Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
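For orientation, a minimal C++ sketch of the API this patch introduces (names taken from the diff below; setup and error handling elided). Format-wide defaults live on `FileFormat::default_fragment_scan_options`, and an individual scan can override them through `ScanOptions::fragment_scan_options`:

```cpp
#include <memory>

#include "arrow/dataset/file_parquet.h"
#include "arrow/dataset/scanner.h"

namespace ds = arrow::dataset;

// Sketch: configure Parquet scan options at two levels of precedence.
void ConfigureScan(const std::shared_ptr<ds::ParquetFileFormat>& format,
                   const std::shared_ptr<ds::ScanOptions>& scan_options) {
  // Format-level default, used by any scan that doesn't override it.
  auto defaults = std::make_shared<ds::ParquetFragmentScanOptions>();
  defaults->arrow_reader_properties->set_pre_buffer(true);
  format->default_fragment_scan_options = defaults;

  // Per-scan override; takes precedence over the format default.
  scan_options->fragment_scan_options =
      std::make_shared<ds::ParquetFragmentScanOptions>();
}
```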
diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h
index b28bf7a..a5ac474 100644
--- a/cpp/src/arrow/dataset/dataset_internal.h
+++ b/cpp/src/arrow/dataset/dataset_internal.h
@@ -191,7 +191,7 @@
/// but of the wrong type, an error is returned.
template <typename T>
arrow::Result<std::shared_ptr<T>> GetFragmentScanOptions(
- const std::string& type_name, ScanOptions* scan_options,
+ const std::string& type_name, const ScanOptions* scan_options,
const std::shared_ptr<FragmentScanOptions>& default_options) {
auto source = default_options;
if (scan_options && scan_options->fragment_scan_options) {
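As a hedged illustration of the helper's contract, mirroring the call sites added in file_ipc.cc and file_parquet.cc below (dataset_internal.h is internal, so this is a sketch only):

```cpp
#include "arrow/dataset/dataset_internal.h"  // internal helper; sketch only
#include "arrow/dataset/file_parquet.h"

namespace arrow {
namespace dataset {

// Sketch: how a format implementation resolves its per-scan options.
Result<std::shared_ptr<ParquetFragmentScanOptions>> ResolveScanOptions(
    const ScanOptions* scan_options,
    const std::shared_ptr<FragmentScanOptions>& default_options) {
  // scan_options->fragment_scan_options wins when it is set; otherwise the
  // format default is used; a mismatched type_name yields an error.
  return GetFragmentScanOptions<ParquetFragmentScanOptions>(
      kParquetTypeName, scan_options, default_options);
}

}  // namespace dataset
}  // namespace arrow
```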
diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc
index a81e8b7..24ea6e3 100644
--- a/cpp/src/arrow/dataset/file_ipc.cc
+++ b/cpp/src/arrow/dataset/file_ipc.cc
@@ -29,6 +29,7 @@
#include "arrow/ipc/writer.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
namespace arrow {
@@ -81,15 +82,28 @@
Result<RecordBatchIterator> Execute() override {
struct Impl {
- static Result<RecordBatchIterator> Make(
- const FileSource& source, std::vector<std::string> materialized_fields,
- MemoryPool* pool) {
+ static Result<RecordBatchIterator> Make(const FileSource& source,
+ FileFormat* format,
+ const ScanOptions* scan_options) {
ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source));
- auto options = default_read_options();
- options.memory_pool = pool;
- ARROW_ASSIGN_OR_RAISE(options.included_fields,
- GetIncludedFields(*reader->schema(), materialized_fields));
+ ARROW_ASSIGN_OR_RAISE(
+ auto ipc_scan_options,
+ GetFragmentScanOptions<IpcFragmentScanOptions>(
+ kIpcTypeName, scan_options, format->default_fragment_scan_options));
+ auto options = ipc_scan_options->options ? *ipc_scan_options->options
+ : default_read_options();
+ options.memory_pool = scan_options->pool;
+ options.use_threads = false;
+ if (!options.included_fields.empty()) {
+ // included_fields cannot be honored here; the scan derives its own set below
+ ARROW_LOG(WARNING) << "IpcFragmentScanOptions.options->included_fields was set "
+ "but will be ignored; included_fields are derived from "
+ "fields referenced by the scan";
+ }
+ ARROW_ASSIGN_OR_RAISE(
+ options.included_fields,
+ GetIncludedFields(*reader->schema(), scan_options->MaterializedFields()));
ARROW_ASSIGN_OR_RAISE(reader, OpenReader(source, options));
return RecordBatchIterator(Impl{std::move(reader), 0});
@@ -107,7 +121,9 @@
int i_;
};
- return Impl::Make(source_, options_->MaterializedFields(), options_->pool);
+ return Impl::Make(
+ source_, internal::checked_pointer_cast<FileFragment>(fragment_)->format().get(),
+ options_.get());
}
private:
diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h
index cbfb6b8..a7bcd04 100644
--- a/cpp/src/arrow/dataset/file_ipc.h
+++ b/cpp/src/arrow/dataset/file_ipc.h
@@ -31,10 +31,12 @@
namespace arrow {
namespace dataset {
+constexpr char kIpcTypeName[] = "ipc";
+
/// \brief A FileFormat implementation that reads from and writes to Ipc files
class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
public:
- std::string type_name() const override { return "ipc"; }
+ std::string type_name() const override { return kIpcTypeName; }
bool Equals(const FileFormat& other) const override {
return type_name() == other.type_name();
@@ -59,6 +61,16 @@
std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override;
};
+/// \brief Per-scan options for IPC fragments
+class ARROW_DS_EXPORT IpcFragmentScanOptions : public FragmentScanOptions {
+ public:
+ std::string type_name() const override { return kIpcTypeName; }
+
+ /// Options passed to the IPC file reader.
+ /// included_fields, memory_pool, and use_threads are ignored.
+ std::shared_ptr<ipc::IpcReadOptions> options;
+};
+
class ARROW_DS_EXPORT IpcFileWriteOptions : public FileWriteOptions {
public:
/// Options passed to ipc::MakeFileWriter. use_threads is ignored
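A hedged usage sketch for the new IPC options (fields as declared above; the surrounding ScanOptions plumbing is assumed from the rest of the patch):

```cpp
#include <memory>

#include "arrow/dataset/file_ipc.h"
#include "arrow/dataset/scanner.h"
#include "arrow/ipc/options.h"

namespace ds = arrow::dataset;

// Sketch: cap the IPC reader's recursion depth for one scan.
void UseIpcScanOptions(const std::shared_ptr<ds::ScanOptions>& scan_options) {
  auto ipc_scan_options = std::make_shared<ds::IpcFragmentScanOptions>();
  ipc_scan_options->options = std::make_shared<arrow::ipc::IpcReadOptions>();
  ipc_scan_options->options->max_recursion_depth = 4;
  // Setting included_fields, memory_pool, or use_threads here has no effect;
  // the scan derives those itself (see file_ipc.cc above).
  scan_options->fragment_scan_options = ipc_scan_options;
}
```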
diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc
index 8a5fd02..502b61c 100644
--- a/cpp/src/arrow/dataset/file_ipc_test.cc
+++ b/cpp/src/arrow/dataset/file_ipc_test.cc
@@ -134,6 +134,28 @@
ASSERT_EQ(row_count, kNumRows);
}
+TEST_F(TestIpcFileFormat, FragmentScanOptions) {
+ auto reader = GetRecordBatchReader(
+ // ARROW-12077: on Windows/mimalloc/release, nullable list column leads to crash
+ schema({field("list", list(float64()), false,
+ key_value_metadata({{"max_length", "1"}})),
+ field("f64", float64())}));
+ auto source = GetFileSource(reader.get());
+
+ SetSchema(reader->schema()->fields());
+ ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+
+ // Set scan options that ensure reading fails
+ auto fragment_scan_options = std::make_shared<IpcFragmentScanOptions>();
+ fragment_scan_options->options = std::make_shared<ipc::IpcReadOptions>();
+ fragment_scan_options->options->max_recursion_depth = 0;
+ opts_->fragment_scan_options = fragment_scan_options;
+ ASSERT_OK_AND_ASSIGN(auto scan_tasks, fragment->Scan(opts_));
+ ASSERT_OK_AND_ASSIGN(auto scan_task, scan_tasks.Next());
+ ASSERT_OK_AND_ASSIGN(auto batches, scan_task->Execute());
+ ASSERT_RAISES(Invalid, batches.Next());
+}
+
TEST_F(TestIpcFileFormat, ScanRecordBatchReaderWithVirtualColumn) {
auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
auto source = GetFileSource(reader.get());
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index d255787..8caae94 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -128,15 +128,18 @@
};
static parquet::ReaderProperties MakeReaderProperties(
- const ParquetFileFormat& format, MemoryPool* pool = default_memory_pool()) {
+ const ParquetFileFormat& format, ParquetFragmentScanOptions* parquet_scan_options,
+ MemoryPool* pool = default_memory_pool()) {
+ // The memory pool can't be changed after construction, so it must be passed here
parquet::ReaderProperties properties(pool);
- if (format.reader_options.use_buffered_stream) {
+ if (parquet_scan_options->reader_properties->is_buffered_stream_enabled()) {
properties.enable_buffered_stream();
} else {
properties.disable_buffered_stream();
}
- properties.set_buffer_size(format.reader_options.buffer_size);
- properties.file_decryption_properties(format.reader_options.file_decryption_properties);
+ properties.set_buffer_size(parquet_scan_options->reader_properties->buffer_size());
+ properties.file_decryption_properties(
+ parquet_scan_options->reader_properties->file_decryption_properties());
return properties;
}
@@ -249,24 +252,23 @@
checked_cast<const ParquetFileFormat&>(other).reader_options;
// FIXME implement comparison for decryption options
- // FIXME extract these to scan time options so comparison is unnecessary
- return reader_options.use_buffered_stream == other_reader_options.use_buffered_stream &&
- reader_options.buffer_size == other_reader_options.buffer_size &&
- reader_options.dict_columns == other_reader_options.dict_columns;
+ return reader_options.dict_columns == other_reader_options.dict_columns;
}
ParquetFileFormat::ParquetFileFormat(const parquet::ReaderProperties& reader_properties) {
- reader_options.use_buffered_stream = reader_properties.is_buffered_stream_enabled();
- reader_options.buffer_size = reader_properties.buffer_size();
- reader_options.file_decryption_properties =
- reader_properties.file_decryption_properties();
+ auto parquet_scan_options = std::make_shared<ParquetFragmentScanOptions>();
+ *parquet_scan_options->reader_properties = reader_properties;
+ default_fragment_scan_options = std::move(parquet_scan_options);
}
Result<bool> ParquetFileFormat::IsSupported(const FileSource& source) const {
try {
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
- auto reader =
- parquet::ParquetFileReader::Open(std::move(input), MakeReaderProperties(*this));
+ ARROW_ASSIGN_OR_RAISE(auto parquet_scan_options,
+ GetFragmentScanOptions<ParquetFragmentScanOptions>(
+ kParquetTypeName, nullptr, default_fragment_scan_options));
+ auto reader = parquet::ParquetFileReader::Open(
+ std::move(input), MakeReaderProperties(*this, parquet_scan_options.get()));
std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
return metadata != nullptr && metadata->can_decompress();
} catch (const ::parquet::ParquetInvalidOrCorruptedFileException& e) {
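Since the ReaderProperties constructor above now just seeds default_fragment_scan_options, the two set-ups below should be equivalent (a sketch, not part of the patch):

```cpp
#include <memory>
#include <utility>

#include "arrow/dataset/file_parquet.h"
#include "parquet/properties.h"

namespace ds = arrow::dataset;

// Sketch: both formats end up with buffered-stream reads by default.
std::pair<std::shared_ptr<ds::ParquetFileFormat>,
          std::shared_ptr<ds::ParquetFileFormat>>
EquivalentFormats() {
  // Via the legacy constructor (memory_pool is still ignored).
  parquet::ReaderProperties props;
  props.enable_buffered_stream();
  auto format_a = std::make_shared<ds::ParquetFileFormat>(props);

  // Via the new scan options directly.
  auto format_b = std::make_shared<ds::ParquetFileFormat>();
  auto defaults = std::make_shared<ds::ParquetFragmentScanOptions>();
  defaults->reader_properties->enable_buffered_stream();
  format_b->default_fragment_scan_options = defaults;
  return {format_a, format_b};
}
```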
@@ -290,8 +292,11 @@
Result<std::unique_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader(
const FileSource& source, ScanOptions* options) const {
+ ARROW_ASSIGN_OR_RAISE(auto parquet_scan_options,
+ GetFragmentScanOptions<ParquetFragmentScanOptions>(
+ kParquetTypeName, options, default_fragment_scan_options));
MemoryPool* pool = options ? options->pool : default_memory_pool();
- auto properties = MakeReaderProperties(*this, pool);
+ auto properties = MakeReaderProperties(*this, parquet_scan_options.get(), pool);
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
std::unique_ptr<parquet::ParquetFileReader> reader;
@@ -310,7 +315,8 @@
}
if (options && !options->use_threads) {
- arrow_properties.set_use_threads(reader_options.enable_parallel_column_conversion);
+ arrow_properties.set_use_threads(
+ parquet_scan_options->enable_parallel_column_conversion);
}
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
@@ -356,15 +362,21 @@
auto column_projection = InferColumnProjection(*reader, *options);
ScanTaskVector tasks(row_groups.size());
+ ARROW_ASSIGN_OR_RAISE(
+ auto parquet_scan_options,
+ GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName, options.get(),
+ default_fragment_scan_options));
std::shared_ptr<std::once_flag> pre_buffer_once = nullptr;
- if (reader_options.pre_buffer) {
+ if (parquet_scan_options->arrow_reader_properties->pre_buffer()) {
pre_buffer_once = std::make_shared<std::once_flag>();
}
for (size_t i = 0; i < row_groups.size(); ++i) {
tasks[i] = std::make_shared<ParquetScanTask>(
row_groups[i], column_projection, reader, pre_buffer_once, row_groups,
- reader_options.io_context, reader_options.cache_options, options, fragment);
+ parquet_scan_options->arrow_reader_properties->io_context(),
+ parquet_scan_options->arrow_reader_properties->cache_options(), options,
+ fragment);
}
return MakeVectorIterator(std::move(tasks));
@@ -587,6 +599,16 @@
}
//
+// ParquetFragmentScanOptions
+//
+
+ParquetFragmentScanOptions::ParquetFragmentScanOptions() {
+ reader_properties = std::make_shared<parquet::ReaderProperties>();
+ arrow_reader_properties =
+ std::make_shared<parquet::ArrowReaderProperties>(/*use_threads=*/false);
+}
+
+//
// ParquetDatasetFactory
//
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index 869857e..fa0d7de 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -57,6 +57,8 @@
namespace arrow {
namespace dataset {
+constexpr char kParquetTypeName[] = "parquet";
+
/// \brief A FileFormat implementation that reads from Parquet files
class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
public:
@@ -66,45 +68,23 @@
/// memory_pool will be ignored.
explicit ParquetFileFormat(const parquet::ReaderProperties& reader_properties);
- std::string type_name() const override { return "parquet"; }
+ std::string type_name() const override { return kParquetTypeName; }
bool splittable() const override { return true; }
bool Equals(const FileFormat& other) const override;
struct ReaderOptions {
- /// \defgroup parquet-file-format-reader-properties properties which correspond to
- /// members of parquet::ReaderProperties.
- ///
- /// We don't embed parquet::ReaderProperties directly because we get memory_pool from
- /// ScanOptions at scan time and provide differing defaults.
- ///
- /// @{
- bool use_buffered_stream = false;
- int64_t buffer_size = 1 << 13;
- std::shared_ptr<parquet::FileDecryptionProperties> file_decryption_properties;
- /// @}
-
/// \defgroup parquet-file-format-arrow-reader-properties properties which correspond
/// to members of parquet::ArrowReaderProperties.
///
- /// We don't embed parquet::ReaderProperties directly because we get batch_size from
- /// ScanOptions at scan time, and we will never pass use_threads == true (since we
- /// defer parallelization of the scan). Additionally column names (rather than
- /// indices) are used to indicate dictionary columns.
+ /// We don't embed parquet::ReaderProperties directly because column names (rather
+ /// than indices) are used to indicate dictionary columns, and other options are
+ /// deferred to scan time.
///
/// @{
std::unordered_set<std::string> dict_columns;
- bool pre_buffer = false;
- arrow::io::CacheOptions cache_options = arrow::io::CacheOptions::Defaults();
- arrow::io::IOContext io_context;
/// @}
-
- /// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a
- /// scan is already parallelized across input files to avoid thread contention. This
- /// option will be removed after support is added for simultaneous parallelization
- /// across files and columns.
- bool enable_parallel_column_conversion = false;
} reader_options;
Result<bool> IsSupported(const FileSource& source) const override;
@@ -206,6 +186,27 @@
friend class ParquetDatasetFactory;
};
+/// \brief Per-scan options for Parquet fragments
+class ARROW_DS_EXPORT ParquetFragmentScanOptions : public FragmentScanOptions {
+ public:
+ ParquetFragmentScanOptions();
+ std::string type_name() const override { return kParquetTypeName; }
+
+ /// Reader properties. Not all properties are respected: memory_pool comes from
+ /// ScanOptions.
+ std::shared_ptr<parquet::ReaderProperties> reader_properties;
+ /// Arrow reader properties. Not all properties are respected: batch_size comes from
+ /// ScanOptions, and use_threads will be overridden based on
+ /// enable_parallel_column_conversion. Additionally, dictionary columns come from
+ /// ParquetFileFormat::ReaderOptions::dict_columns.
+ std::shared_ptr<parquet::ArrowReaderProperties> arrow_reader_properties;
+ /// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a
+ /// scan is already parallelized across input files to avoid thread contention. This
+ /// option will be removed after support is added for simultaneous parallelization
+ /// across files and columns.
+ bool enable_parallel_column_conversion = false;
+};
+
class ARROW_DS_EXPORT ParquetFileWriteOptions : public FileWriteOptions {
public:
std::shared_ptr<parquet::WriterProperties> writer_properties;
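For completeness, a hedged sketch exercising each field declared above (setter names per parquet/properties.h):

```cpp
#include <memory>

#include "arrow/dataset/file_parquet.h"

namespace ds = arrow::dataset;

// Sketch: a fully configured ParquetFragmentScanOptions.
std::shared_ptr<ds::ParquetFragmentScanOptions> MakeScanOptions() {
  auto options = std::make_shared<ds::ParquetFragmentScanOptions>();
  // parquet::ReaderProperties knobs (memory_pool comes from ScanOptions).
  options->reader_properties->enable_buffered_stream();
  options->reader_properties->set_buffer_size(1 << 13);
  // parquet::ArrowReaderProperties knobs (batch_size comes from ScanOptions).
  options->arrow_reader_properties->set_pre_buffer(true);
  // EXPERIMENTAL: ignored when the scan is already parallelized across files.
  options->enable_parallel_column_conversion = true;
  return options;
}
```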
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc
index cf14a2b..bb06e7f 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -272,8 +272,10 @@
SetSchema(reader->schema()->fields());
SetFilter(literal(true));
- format_->reader_options.pre_buffer = true;
ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+ auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>();
+ fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);
+ opts_->fragment_scan_options = fragment_scan_options;
ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_));
int64_t task_count = 0;
@@ -636,6 +638,7 @@
options->arrow_writer_properties = parquet::ArrowWriterProperties::Builder()
.coerce_timestamps(coerce_timestamps_to)
+ ->allow_truncated_timestamps()
->build();
EXPECT_OK_AND_ASSIGN(auto writer, format_->MakeWriter(sink, reader->schema(), options));
diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h
index a6e761c..72cde36 100644
--- a/cpp/src/arrow/dataset/test_util.h
+++ b/cpp/src/arrow/dataset/test_util.h
@@ -42,6 +42,7 @@
#include "arrow/table.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
#include "arrow/util/io_util.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
@@ -102,7 +103,7 @@
std::unique_ptr<RecordBatchReader> MakeGeneratedRecordBatch(
std::shared_ptr<Schema> schema, int64_t batch_size, int64_t batch_repetitions) {
- auto batch = ConstantArrayGenerator::Zeroes(batch_size, schema);
+ auto batch = random::GenerateBatch(schema->fields(), batch_size, /*seed=*/0);
int64_t i = 0;
return MakeGeneratedRecordBatch(
schema, [batch, i, batch_repetitions](std::shared_ptr<RecordBatch>* out) mutable {
diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h
index d148d4e..6ba65a6 100644
--- a/cpp/src/arrow/dataset/type_fwd.h
+++ b/cpp/src/arrow/dataset/type_fwd.h
@@ -65,9 +65,11 @@
class IpcFileFormat;
class IpcFileWriter;
class IpcFileWriteOptions;
+class IpcFragmentScanOptions;
class ParquetFileFormat;
class ParquetFileFragment;
+class ParquetFragmentScanOptions;
class ParquetFileWriter;
class ParquetFileWriteOptions;
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index e0096c7..3320b47 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -23,6 +23,7 @@
from cython.operator cimport dereference as deref
import os
+import warnings
import pyarrow as pa
from pyarrow.lib cimport *
@@ -1113,10 +1114,14 @@
@staticmethod
cdef wrap(const shared_ptr[CFragmentScanOptions]& sp):
+ if not sp:
+ return None
+
type_name = frombytes(sp.get().type_name())
classes = {
'csv': CsvFragmentScanOptions,
+ 'parquet': ParquetFragmentScanOptions,
}
class_ = classes.get(type_name, None)
@@ -1287,56 +1292,20 @@
Parameters
----------
- use_buffered_stream : bool, default False
- Read files through buffered input streams rather than loading entire
- row groups at once. This may be enabled to reduce memory overhead.
- Disabled by default.
- buffer_size : int, default 8192
- Size of buffered stream, if enabled. Default is 8KB.
dictionary_columns : list of string, default None
Names of columns which should be dictionary encoded as
they are read.
- pre_buffer : bool, default False
- If enabled, pre-buffer the raw Parquet data instead of issuing one
- read per column chunk. This can improve performance on high-latency
- filesystems.
- enable_parallel_column_conversion : bool, default False
- EXPERIMENTAL: Parallelize conversion across columns. This option is
- ignored if a scan is already parallelized across input files to avoid
- thread contention. This option will be removed after support is added
- for simultaneous parallelization across files and columns.
"""
cdef public:
- bint use_buffered_stream
- uint32_t buffer_size
set dictionary_columns
- bint pre_buffer
- bint enable_parallel_column_conversion
- def __init__(self, bint use_buffered_stream=False,
- buffer_size=8192,
- dictionary_columns=None,
- bint pre_buffer=False,
- bint enable_parallel_column_conversion=False):
- self.use_buffered_stream = use_buffered_stream
- if buffer_size <= 0:
- raise ValueError("Buffer size must be larger than zero")
- self.buffer_size = buffer_size
+ # Also see _PARQUET_READ_OPTIONS
+ def __init__(self, dictionary_columns=None):
self.dictionary_columns = set(dictionary_columns or set())
- self.pre_buffer = pre_buffer
- self.enable_parallel_column_conversion = \
- enable_parallel_column_conversion
def equals(self, ParquetReadOptions other):
- return (
- self.use_buffered_stream == other.use_buffered_stream and
- self.buffer_size == other.buffer_size and
- self.dictionary_columns == other.dictionary_columns and
- self.pre_buffer == other.pre_buffer and
- self.enable_parallel_column_conversion ==
- other.enable_parallel_column_conversion
- )
+ return self.dictionary_columns == other.dictionary_columns
def __eq__(self, other):
try:
@@ -1344,6 +1313,10 @@
except TypeError:
return False
+ def __repr__(self):
+ return (f"<ParquetReadOptions"
+ f" dictionary_columns={self.dictionary_columns}>")
+
cdef class ParquetFileWriteOptions(FileWriteOptions):
@@ -1425,38 +1398,74 @@
self._set_arrow_properties()
+cdef set _PARQUET_READ_OPTIONS = {'dictionary_columns'}
+
+
cdef class ParquetFileFormat(FileFormat):
cdef:
CParquetFileFormat* parquet_format
- def __init__(self, read_options=None):
+ def __init__(self, read_options=None,
+ default_fragment_scan_options=None, **kwargs):
cdef:
shared_ptr[CParquetFileFormat] wrapped
CParquetFileFormatReaderOptions* options
- # Read options
+ # Read/scan options
+ read_options_args = {option: kwargs[option] for option in kwargs
+ if option in _PARQUET_READ_OPTIONS}
+ scan_args = {option: kwargs[option] for option in kwargs
+ if option not in _PARQUET_READ_OPTIONS}
+ if read_options and read_options_args:
+ duplicates = ', '.join(sorted(read_options_args))
+ raise ValueError(f'If `read_options` is given, '
+ f'cannot specify {duplicates}')
+ if default_fragment_scan_options and scan_args:
+ duplicates = ', '.join(sorted(scan_args))
+ raise ValueError(f'If `default_fragment_scan_options` is given, '
+ f'cannot specify {duplicates}')
if read_options is None:
- read_options = ParquetReadOptions()
+ read_options = ParquetReadOptions(**read_options_args)
elif isinstance(read_options, dict):
- read_options = ParquetReadOptions(**read_options)
+ # For backwards compatibility
+ duplicates = []
+ for option, value in read_options.items():
+ if option in _PARQUET_READ_OPTIONS:
+ read_options_args[option] = value
+ else:
+ duplicates.append(option)
+ scan_args[option] = value
+ if duplicates:
+ duplicates = ", ".join(duplicates)
+ warnings.warn(f'The scan options {duplicates} should be '
+ 'specified directly as keyword arguments')
+ read_options = ParquetReadOptions(**read_options_args)
elif not isinstance(read_options, ParquetReadOptions):
raise TypeError('`read_options` must be either a dictionary or an '
'instance of ParquetReadOptions')
+ if default_fragment_scan_options is None:
+ default_fragment_scan_options = ParquetFragmentScanOptions(
+ **scan_args)
+ elif isinstance(default_fragment_scan_options, dict):
+ default_fragment_scan_options = ParquetFragmentScanOptions(
+ **default_fragment_scan_options)
+ elif not isinstance(default_fragment_scan_options,
+ ParquetFragmentScanOptions):
+ raise TypeError('`default_fragment_scan_options` must be either a '
+ 'dictionary or an instance of '
+ 'ParquetFragmentScanOptions')
+
wrapped = make_shared[CParquetFileFormat]()
options = &(wrapped.get().reader_options)
- options.use_buffered_stream = read_options.use_buffered_stream
- options.buffer_size = read_options.buffer_size
- options.pre_buffer = read_options.pre_buffer
- options.enable_parallel_column_conversion = \
- read_options.enable_parallel_column_conversion
if read_options.dictionary_columns is not None:
for column in read_options.dictionary_columns:
options.dict_columns.insert(tobytes(column))
self.init(<shared_ptr[CFileFormat]> wrapped)
+ self.default_fragment_scan_options = default_fragment_scan_options
cdef void init(self, const shared_ptr[CFileFormat]& sp):
FileFormat.init(self, sp)
@@ -1467,14 +1476,8 @@
cdef CParquetFileFormatReaderOptions* options
options = &self.parquet_format.reader_options
return ParquetReadOptions(
- use_buffered_stream=options.use_buffered_stream,
- buffer_size=options.buffer_size,
dictionary_columns={frombytes(col)
for col in options.dict_columns},
- pre_buffer=options.pre_buffer,
- enable_parallel_column_conversion=(
- options.enable_parallel_column_conversion
- )
)
def make_write_options(self, **kwargs):
@@ -1482,13 +1485,25 @@
(<ParquetFileWriteOptions> opts).update(**kwargs)
return opts
+ cdef _set_default_fragment_scan_options(self, FragmentScanOptions options):
+ if options.type_name == 'parquet':
+ self.parquet_format.default_fragment_scan_options = options.wrapped
+ else:
+ super()._set_default_fragment_scan_options(options)
+
def equals(self, ParquetFileFormat other):
return (
- self.read_options.equals(other.read_options)
+ self.read_options.equals(other.read_options) and
+ self.default_fragment_scan_options ==
+ other.default_fragment_scan_options
)
def __reduce__(self):
- return ParquetFileFormat, (self.read_options, )
+ return ParquetFileFormat, (self.read_options,
+ self.default_fragment_scan_options)
+
+ def __repr__(self):
+ return f"<ParquetFileFormat read_options={self.read_options}>"
def make_fragment(self, file, filesystem=None,
Expression partition_expression=None, row_groups=None):
@@ -1513,6 +1528,109 @@
return Fragment.wrap(move(c_fragment))
+cdef class ParquetFragmentScanOptions(FragmentScanOptions):
+ """Scan-specific options for Parquet fragments.
+
+ Parameters
+ ----------
+ use_buffered_stream : bool, default False
+ Read files through buffered input streams rather than loading entire
+ row groups at once. This may be enabled to reduce memory overhead.
+ Disabled by default.
+ buffer_size : int, default 8192
+ Size of buffered stream, if enabled. Default is 8KB.
+ pre_buffer : bool, default False
+ If enabled, pre-buffer the raw Parquet data instead of issuing one
+ read per column chunk. This can improve performance on high-latency
+ filesystems.
+ enable_parallel_column_conversion : bool, default False
+ EXPERIMENTAL: Parallelize conversion across columns. This option is
+ ignored if a scan is already parallelized across input files to avoid
+ thread contention. This option will be removed after support is added
+ for simultaneous parallelization across files and columns.
+ """
+
+ cdef:
+ CParquetFragmentScanOptions* parquet_options
+
+ # Avoid mistakenly creating attributes
+ __slots__ = ()
+
+ def __init__(self, bint use_buffered_stream=False,
+ buffer_size=8192,
+ bint pre_buffer=False,
+ bint enable_parallel_column_conversion=False):
+ self.init(shared_ptr[CFragmentScanOptions](
+ new CParquetFragmentScanOptions()))
+ self.use_buffered_stream = use_buffered_stream
+ self.buffer_size = buffer_size
+ self.pre_buffer = pre_buffer
+ self.enable_parallel_column_conversion = \
+ enable_parallel_column_conversion
+
+ cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
+ FragmentScanOptions.init(self, sp)
+ self.parquet_options = <CParquetFragmentScanOptions*> sp.get()
+
+ cdef CReaderProperties* reader_properties(self):
+ return self.parquet_options.reader_properties.get()
+
+ cdef ArrowReaderProperties* arrow_reader_properties(self):
+ return self.parquet_options.arrow_reader_properties.get()
+
+ @property
+ def use_buffered_stream(self):
+ return self.reader_properties().is_buffered_stream_enabled()
+
+ @use_buffered_stream.setter
+ def use_buffered_stream(self, bint use_buffered_stream):
+ if use_buffered_stream:
+ self.reader_properties().enable_buffered_stream()
+ else:
+ self.reader_properties().disable_buffered_stream()
+
+ @property
+ def buffer_size(self):
+ return self.reader_properties().buffer_size()
+
+ @buffer_size.setter
+ def buffer_size(self, buffer_size):
+ if buffer_size <= 0:
+ raise ValueError("Buffer size must be larger than zero")
+ self.reader_properties().set_buffer_size(buffer_size)
+
+ @property
+ def pre_buffer(self):
+ return self.arrow_reader_properties().pre_buffer()
+
+ @pre_buffer.setter
+ def pre_buffer(self, bint pre_buffer):
+ self.arrow_reader_properties().set_pre_buffer(pre_buffer)
+
+ @property
+ def enable_parallel_column_conversion(self):
+ return self.parquet_options.enable_parallel_column_conversion
+
+ @enable_parallel_column_conversion.setter
+ def enable_parallel_column_conversion(
+ self, bint enable_parallel_column_conversion):
+ self.parquet_options.enable_parallel_column_conversion = \
+ enable_parallel_column_conversion
+
+ def equals(self, ParquetFragmentScanOptions other):
+ return (
+ self.use_buffered_stream == other.use_buffered_stream and
+ self.buffer_size == other.buffer_size and
+ self.pre_buffer == other.pre_buffer and
+ self.enable_parallel_column_conversion ==
+ other.enable_parallel_column_conversion)
+
+ def __reduce__(self):
+ return ParquetFragmentScanOptions, (
+ self.use_buffered_stream, self.buffer_size, self.pre_buffer,
+ self.enable_parallel_column_conversion)
+
+
cdef class IpcFileWriteOptions(FileWriteOptions):
def __init__(self):
@@ -1543,14 +1661,28 @@
__slots__ = ()
def __init__(self, ParseOptions parse_options=None,
+ default_fragment_scan_options=None,
ConvertOptions convert_options=None,
ReadOptions read_options=None):
self.init(shared_ptr[CFileFormat](new CCsvFileFormat()))
if parse_options is not None:
self.parse_options = parse_options
if convert_options is not None or read_options is not None:
+ if default_fragment_scan_options:
+ raise ValueError('If `default_fragment_scan_options` is '
+ 'given, cannot specify convert_options '
+ 'or read_options')
self.default_fragment_scan_options = CsvFragmentScanOptions(
convert_options=convert_options, read_options=read_options)
+ elif isinstance(default_fragment_scan_options, dict):
+ self.default_fragment_scan_options = CsvFragmentScanOptions(
+ **default_fragment_scan_options)
+ elif isinstance(default_fragment_scan_options, CsvFragmentScanOptions):
+ self.default_fragment_scan_options = default_fragment_scan_options
+ elif default_fragment_scan_options is not None:
+ raise TypeError('`default_fragment_scan_options` must be either '
+ 'a dictionary or an instance of '
+ 'CsvFragmentScanOptions')
cdef void init(self, const shared_ptr[CFileFormat]& sp):
FileFormat.init(self, sp)
@@ -1574,10 +1706,14 @@
super()._set_default_fragment_scan_options(options)
def equals(self, CsvFileFormat other):
- return self.parse_options.equals(other.parse_options)
+ return (
+ self.parse_options.equals(other.parse_options) and
+ self.default_fragment_scan_options ==
+ other.default_fragment_scan_options)
def __reduce__(self):
- return CsvFileFormat, (self.parse_options,)
+ return CsvFileFormat, (self.parse_options,
+ self.default_fragment_scan_options)
def __repr__(self):
return f"<CsvFileFormat parse_options={self.parse_options}>"
@@ -1622,8 +1758,10 @@
self.csv_options.read_options = read_options.options
def equals(self, CsvFragmentScanOptions other):
- return (self.convert_options.equals(other.convert_options) and
- self.read_options.equals(other.read_options))
+ return (
+ other and
+ self.convert_options.equals(other.convert_options) and
+ self.read_options.equals(other.read_options))
def __reduce__(self):
return CsvFragmentScanOptions, (self.convert_options,
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 8fa1c85..96bfd77 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -329,6 +329,7 @@
uint32_t* metadata_len)
cdef cppclass CReaderProperties" parquet::ReaderProperties":
+ c_bool is_buffered_stream_enabled() const
void enable_buffered_stream()
void disable_buffered_stream()
void set_buffer_size(int64_t buf_size)
@@ -342,6 +343,8 @@
c_bool read_dictionary()
void set_batch_size(int64_t batch_size)
int64_t batch_size()
+ void set_pre_buffer(c_bool pre_buffer)
+ c_bool pre_buffer() const
ArrowReaderProperties default_arrow_reader_properties()
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 0c65070..615cb25 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -43,6 +43,7 @@
ParquetFileFormat,
ParquetFileFragment,
ParquetFileWriteOptions,
+ ParquetFragmentScanOptions,
ParquetReadOptions,
Partitioning,
PartitioningFactory,
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index db2e73a..06ec69c 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -237,11 +237,7 @@
cdef cppclass CParquetFileFormatReaderOptions \
"arrow::dataset::ParquetFileFormat::ReaderOptions":
- c_bool use_buffered_stream
- int64_t buffer_size
unordered_set[c_string] dict_columns
- c_bool pre_buffer
- c_bool enable_parallel_column_conversion
cdef cppclass CParquetFileFormat "arrow::dataset::ParquetFileFormat"(
CFileFormat):
@@ -252,6 +248,12 @@
shared_ptr[CSchema] physical_schema,
vector[int] row_groups)
+ cdef cppclass CParquetFragmentScanOptions \
+ "arrow::dataset::ParquetFragmentScanOptions"(CFragmentScanOptions):
+ shared_ptr[CReaderProperties] reader_properties
+ shared_ptr[ArrowReaderProperties] arrow_reader_properties
+ c_bool enable_parallel_column_conversion
+
cdef cppclass CIpcFileWriteOptions \
"arrow::dataset::IpcFileWriteOptions"(CFileWriteOptions):
pass
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 1b0a336..ed8d387 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -1536,7 +1536,7 @@
self._enable_parallel_column_conversion = True
read_options.update(enable_parallel_column_conversion=True)
- parquet_format = ds.ParquetFileFormat(read_options=read_options)
+ parquet_format = ds.ParquetFileFormat(**read_options)
fragment = parquet_format.make_fragment(single_file, filesystem)
self._dataset = ds.FileSystemDataset(
@@ -1548,7 +1548,7 @@
else:
self._enable_parallel_column_conversion = False
- parquet_format = ds.ParquetFileFormat(read_options=read_options)
+ parquet_format = ds.ParquetFileFormat(**read_options)
# check partitioning to enable dictionary encoding
if partitioning == "hive":
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 5f4bcac..31f4e08 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -519,32 +519,38 @@
def test_parquet_read_options():
opts1 = ds.ParquetReadOptions()
- opts2 = ds.ParquetReadOptions(buffer_size=4096,
- dictionary_columns=['a', 'b'])
- opts3 = ds.ParquetReadOptions(buffer_size=2**13, use_buffered_stream=True,
- dictionary_columns={'a', 'b'})
- opts4 = ds.ParquetReadOptions(buffer_size=2**13, pre_buffer=True,
- dictionary_columns={'a', 'b'})
+ opts2 = ds.ParquetReadOptions(dictionary_columns=['a', 'b'])
+
+ assert opts1.dictionary_columns == set()
+
+ assert opts2.dictionary_columns == {'a', 'b'}
+
+ assert opts1 == opts1
+ assert opts1 != opts2
+
+
+def test_parquet_scan_options():
+ opts1 = ds.ParquetFragmentScanOptions()
+ opts2 = ds.ParquetFragmentScanOptions(buffer_size=4096)
+ opts3 = ds.ParquetFragmentScanOptions(
+ buffer_size=2**13, use_buffered_stream=True)
+ opts4 = ds.ParquetFragmentScanOptions(buffer_size=2**13, pre_buffer=True)
assert opts1.use_buffered_stream is False
assert opts1.buffer_size == 2**13
assert opts1.pre_buffer is False
- assert opts1.dictionary_columns == set()
assert opts2.use_buffered_stream is False
assert opts2.buffer_size == 2**12
assert opts2.pre_buffer is False
- assert opts2.dictionary_columns == {'a', 'b'}
assert opts3.use_buffered_stream is True
assert opts3.buffer_size == 2**13
assert opts3.pre_buffer is False
- assert opts3.dictionary_columns == {'a', 'b'}
assert opts4.use_buffered_stream is False
assert opts4.buffer_size == 2**13
assert opts4.pre_buffer is True
- assert opts4.dictionary_columns == {'a', 'b'}
assert opts1 == opts1
assert opts1 != opts2
@@ -563,14 +569,11 @@
ds.CsvFileFormat(read_options=pa.csv.ReadOptions(
skip_rows=3, block_size=2**20)),
ds.ParquetFileFormat(),
+ ds.ParquetFileFormat(dictionary_columns={'a'}),
+ ds.ParquetFileFormat(use_buffered_stream=True),
ds.ParquetFileFormat(
- read_options=ds.ParquetReadOptions(use_buffered_stream=True)
- ),
- ds.ParquetFileFormat(
- read_options={
- 'use_buffered_stream': True,
- 'buffer_size': 4096,
- }
+ use_buffered_stream=True,
+ buffer_size=4096,
)
]
for file_format in formats:
@@ -584,6 +587,8 @@
convert_options=pa.csv.ConvertOptions(strings_can_be_null=True)),
ds.CsvFragmentScanOptions(
read_options=pa.csv.ReadOptions(block_size=2**16)),
+ ds.ParquetFragmentScanOptions(buffer_size=4096),
+ ds.ParquetFragmentScanOptions(pre_buffer=True),
]
for option in options:
assert pickle.loads(pickle.dumps(option)) == option
@@ -599,8 +604,8 @@
@pytest.mark.parametrize('pre_buffer', [False, True])
def test_filesystem_factory(mockfs, paths_or_selector, pre_buffer):
format = ds.ParquetFileFormat(
- read_options=ds.ParquetReadOptions(dictionary_columns={"str"},
- pre_buffer=pre_buffer)
+ read_options=ds.ParquetReadOptions(dictionary_columns={"str"}),
+ pre_buffer=pre_buffer
)
options = ds.FileSystemFactoryOptions('subdir')
@@ -719,10 +724,10 @@
]
dictionary_format = ds.ParquetFileFormat(
read_options=ds.ParquetReadOptions(
- use_buffered_stream=True,
- buffer_size=4096,
dictionary_columns=['alpha', 'animal']
- )
+ ),
+ use_buffered_stream=True,
+ buffer_size=4096,
)
cases = [
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 9cccec9..9811dc9 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -412,8 +412,8 @@
.Call(`_arrow_dataset___FileFormat__DefaultWriteOptions`, fmt)
}
-dataset___ParquetFileFormat__Make <- function(use_buffered_stream, buffer_size, dict_columns){
- .Call(`_arrow_dataset___ParquetFileFormat__Make`, use_buffered_stream, buffer_size, dict_columns)
+dataset___ParquetFileFormat__Make <- function(options, dict_columns){
+ .Call(`_arrow_dataset___ParquetFileFormat__Make`, options, dict_columns)
}
dataset___FileWriteOptions__type_name <- function(options){
@@ -448,6 +448,10 @@
.Call(`_arrow_dataset___CsvFragmentScanOptions__Make`, convert_options, read_options)
}
+dataset___ParquetFragmentScanOptions__Make <- function(use_buffered_stream, buffer_size, pre_buffer){
+ .Call(`_arrow_dataset___ParquetFragmentScanOptions__Make`, use_buffered_stream, buffer_size, pre_buffer)
+}
+
dataset___DirectoryPartitioning <- function(schm){
.Call(`_arrow_dataset___DirectoryPartitioning`, schm)
}
diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R
index cd54a30..854672b 100644
--- a/r/R/dataset-format.R
+++ b/r/R/dataset-format.R
@@ -34,11 +34,8 @@
#' * `...`: Additional format-specific options
#'
#' `format = "parquet"``:
-#' * `use_buffered_stream`: Read files through buffered input streams rather than
-#' loading entire row groups at once. This may be enabled
-#' to reduce memory overhead. Disabled by default.
-#' * `buffer_size`: Size of buffered stream, if enabled. Default is 8KB.
#' * `dict_columns`: Names of columns which should be read as dictionaries.
+#' * Any Parquet options from [FragmentScanOptions].
#'
#' `format = "text"`: see [CsvParseOptions]. Note that you can specify them either
#' with the Arrow C++ library naming ("delimiter", "quoting", etc.) or the
@@ -91,10 +88,10 @@
#' @rdname FileFormat
#' @export
ParquetFileFormat <- R6Class("ParquetFileFormat", inherit = FileFormat)
-ParquetFileFormat$create <- function(use_buffered_stream = FALSE,
- buffer_size = 8196,
+ParquetFileFormat$create <- function(...,
dict_columns = character(0)) {
- dataset___ParquetFileFormat__Make(use_buffered_stream, buffer_size, dict_columns)
+ options <- ParquetFragmentScanOptions$create(...)
+ dataset___ParquetFileFormat__Make(options, dict_columns)
}
#' @usage NULL
@@ -217,9 +214,18 @@
#' @section Factory:
#' `FragmentScanOptions$create()` takes the following arguments:
#' * `format`: A string identifier of the file format. Currently supported values:
+#' * "parquet"
#' * "csv"/"text", aliases for the same format.
#' * `...`: Additional format-specific options
#'
+#' `format = "parquet"``:
+#' * `use_buffered_stream`: Read files through buffered input streams rather than
+#' loading entire row groups at once. This may be enabled
+#' to reduce memory overhead. Disabled by default.
+#' * `buffer_size`: Size of buffered stream, if enabled. Default is 8KB.
+#' * `pre_buffer`: Pre-buffer the raw Parquet data. This can improve performance
+#' on high-latency filesystems. Disabled by default.
+#
#' `format = "text"`: see [CsvConvertOptions]. Note that options can only be
#' specified with the Arrow C++ library naming. Also, "block_size" from
#' [CsvReadOptions] may be given.
@@ -240,6 +246,8 @@
opt_names <- names(list(...))
if (format %in% c("csv", "text", "tsv")) {
CsvFragmentScanOptions$create(...)
+ } else if (format == "parquet") {
+ ParquetFragmentScanOptions$create(...)
} else {
stop("Unsupported file format: ", format, call. = FALSE)
}
@@ -261,6 +269,17 @@
dataset___CsvFragmentScanOptions__Make(convert_opts, read_opts)
}
+#' @usage NULL
+#' @format NULL
+#' @rdname FragmentScanOptions
+#' @export
+ParquetFragmentScanOptions <- R6Class("ParquetFragmentScanOptions", inherit = FragmentScanOptions)
+ParquetFragmentScanOptions$create <- function(use_buffered_stream = FALSE,
+ buffer_size = 8192,
+ pre_buffer = FALSE) {
+ dataset___ParquetFragmentScanOptions__Make(use_buffered_stream, buffer_size, pre_buffer)
+}
+
#' Format-specific write options
#'
#' @description
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index d74ac0e..4c2ebed 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1612,17 +1612,16 @@
// dataset.cpp
#if defined(ARROW_R_WITH_DATASET)
-std::shared_ptr<ds::ParquetFileFormat> dataset___ParquetFileFormat__Make(bool use_buffered_stream, int64_t buffer_size, cpp11::strings dict_columns);
-extern "C" SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP dict_columns_sexp){
+std::shared_ptr<ds::ParquetFileFormat> dataset___ParquetFileFormat__Make(const std::shared_ptr<ds::ParquetFragmentScanOptions>& options, cpp11::strings dict_columns);
+extern "C" SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP options_sexp, SEXP dict_columns_sexp){
BEGIN_CPP11
- arrow::r::Input<bool>::type use_buffered_stream(use_buffered_stream_sexp);
- arrow::r::Input<int64_t>::type buffer_size(buffer_size_sexp);
+ arrow::r::Input<const std::shared_ptr<ds::ParquetFragmentScanOptions>&>::type options(options_sexp);
arrow::r::Input<cpp11::strings>::type dict_columns(dict_columns_sexp);
- return cpp11::as_sexp(dataset___ParquetFileFormat__Make(use_buffered_stream, buffer_size, dict_columns));
+ return cpp11::as_sexp(dataset___ParquetFileFormat__Make(options, dict_columns));
END_CPP11
}
#else
-extern "C" SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP dict_columns_sexp){
+extern "C" SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP options_sexp, SEXP dict_columns_sexp){
Rf_error("Cannot call dataset___ParquetFileFormat__Make(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
}
#endif
@@ -1761,6 +1760,23 @@
// dataset.cpp
#if defined(ARROW_R_WITH_DATASET)
+std::shared_ptr<ds::ParquetFragmentScanOptions> dataset___ParquetFragmentScanOptions__Make(bool use_buffered_stream, int64_t buffer_size, bool pre_buffer);
+extern "C" SEXP _arrow_dataset___ParquetFragmentScanOptions__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP pre_buffer_sexp){
+BEGIN_CPP11
+ arrow::r::Input<bool>::type use_buffered_stream(use_buffered_stream_sexp);
+ arrow::r::Input<int64_t>::type buffer_size(buffer_size_sexp);
+ arrow::r::Input<bool>::type pre_buffer(pre_buffer_sexp);
+ return cpp11::as_sexp(dataset___ParquetFragmentScanOptions__Make(use_buffered_stream, buffer_size, pre_buffer));
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_dataset___ParquetFragmentScanOptions__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP pre_buffer_sexp){
+ Rf_error("Cannot call dataset___ParquetFragmentScanOptions__Make(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
+}
+#endif
+
+// dataset.cpp
+#if defined(ARROW_R_WITH_DATASET)
std::shared_ptr<ds::DirectoryPartitioning> dataset___DirectoryPartitioning(const std::shared_ptr<arrow::Schema>& schm);
extern "C" SEXP _arrow_dataset___DirectoryPartitioning(SEXP schm_sexp){
BEGIN_CPP11
@@ -6659,7 +6675,7 @@
{ "_arrow_dataset___FileSystemDatasetFactory__Make3", (DL_FUNC) &_arrow_dataset___FileSystemDatasetFactory__Make3, 4},
{ "_arrow_dataset___FileFormat__type_name", (DL_FUNC) &_arrow_dataset___FileFormat__type_name, 1},
{ "_arrow_dataset___FileFormat__DefaultWriteOptions", (DL_FUNC) &_arrow_dataset___FileFormat__DefaultWriteOptions, 1},
- { "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 3},
+ { "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 2},
{ "_arrow_dataset___FileWriteOptions__type_name", (DL_FUNC) &_arrow_dataset___FileWriteOptions__type_name, 1},
{ "_arrow_dataset___ParquetFileWriteOptions__update", (DL_FUNC) &_arrow_dataset___ParquetFileWriteOptions__update, 3},
{ "_arrow_dataset___IpcFileWriteOptions__update2", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update2, 4},
@@ -6668,6 +6684,7 @@
{ "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 3},
{ "_arrow_dataset___FragmentScanOptions__type_name", (DL_FUNC) &_arrow_dataset___FragmentScanOptions__type_name, 1},
{ "_arrow_dataset___CsvFragmentScanOptions__Make", (DL_FUNC) &_arrow_dataset___CsvFragmentScanOptions__Make, 2},
+ { "_arrow_dataset___ParquetFragmentScanOptions__Make", (DL_FUNC) &_arrow_dataset___ParquetFragmentScanOptions__Make, 3},
{ "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 1},
{ "_arrow_dataset___DirectoryPartitioning__MakeFactory", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning__MakeFactory, 1},
{ "_arrow_dataset___HivePartitioning", (DL_FUNC) &_arrow_dataset___HivePartitioning, 2},
diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp
index b7b4800..c8fdb7a 100644
--- a/r/src/dataset.cpp
+++ b/r/src/dataset.cpp
@@ -25,6 +25,7 @@
#include <arrow/table.h>
#include <arrow/util/checked_cast.h>
#include <arrow/util/iterator.h>
+#include <parquet/properties.h>
namespace ds = ::arrow::dataset;
namespace fs = ::arrow::fs;
@@ -227,11 +228,10 @@
// [[dataset::export]]
std::shared_ptr<ds::ParquetFileFormat> dataset___ParquetFileFormat__Make(
- bool use_buffered_stream, int64_t buffer_size, cpp11::strings dict_columns) {
+ const std::shared_ptr<ds::ParquetFragmentScanOptions>& options,
+ cpp11::strings dict_columns) {
auto fmt = std::make_shared<ds::ParquetFileFormat>();
-
- fmt->reader_options.use_buffered_stream = use_buffered_stream;
- fmt->reader_options.buffer_size = buffer_size;
+ fmt->default_fragment_scan_options = options;
auto dict_columns_vector = cpp11::as_cpp<std::vector<std::string>>(dict_columns);
auto& d = fmt->reader_options.dict_columns;
@@ -295,7 +295,7 @@
return format;
}
-// FragmentScanOptions, CsvFragmentScanOptions
+// FragmentScanOptions, CsvFragmentScanOptions, ParquetFragmentScanOptions
// [[dataset::export]]
std::string dataset___FragmentScanOptions__type_name(
@@ -313,6 +313,21 @@
return options;
}
+// [[dataset::export]]
+std::shared_ptr<ds::ParquetFragmentScanOptions>
+dataset___ParquetFragmentScanOptions__Make(bool use_buffered_stream, int64_t buffer_size,
+ bool pre_buffer) {
+ auto options = std::make_shared<ds::ParquetFragmentScanOptions>();
+ if (use_buffered_stream) {
+ options->reader_properties->enable_buffered_stream();
+ } else {
+ options->reader_properties->disable_buffered_stream();
+ }
+ options->reader_properties->set_buffer_size(buffer_size);
+ options->arrow_reader_properties->set_pre_buffer(pre_buffer);
+ return options;
+}
+
// DirectoryPartitioning, HivePartitioning
// [[dataset::export]]