ARROW-11972: [C++][R][Python][Dataset] Extract IPC/Parquet fragment scan options

Closes #9790 from lidavidm/arrow-11972

Authored-by: David Li <li.davidm96@gmail.com>
Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h
index b28bf7a..a5ac474 100644
--- a/cpp/src/arrow/dataset/dataset_internal.h
+++ b/cpp/src/arrow/dataset/dataset_internal.h
@@ -191,7 +191,7 @@
 ///     but of the wrong type, an error is returned.
 template <typename T>
 arrow::Result<std::shared_ptr<T>> GetFragmentScanOptions(
-    const std::string& type_name, ScanOptions* scan_options,
+    const std::string& type_name, const ScanOptions* scan_options,
     const std::shared_ptr<FragmentScanOptions>& default_options) {
   auto source = default_options;
   if (scan_options && scan_options->fragment_scan_options) {
diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc
index a81e8b7..24ea6e3 100644
--- a/cpp/src/arrow/dataset/file_ipc.cc
+++ b/cpp/src/arrow/dataset/file_ipc.cc
@@ -29,6 +29,7 @@
 #include "arrow/ipc/writer.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
 
 namespace arrow {
 
@@ -81,15 +82,28 @@
 
   Result<RecordBatchIterator> Execute() override {
     struct Impl {
-      static Result<RecordBatchIterator> Make(
-          const FileSource& source, std::vector<std::string> materialized_fields,
-          MemoryPool* pool) {
+      static Result<RecordBatchIterator> Make(const FileSource& source,
+                                              FileFormat* format,
+                                              const ScanOptions* scan_options) {
         ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source));
 
-        auto options = default_read_options();
-        options.memory_pool = pool;
-        ARROW_ASSIGN_OR_RAISE(options.included_fields,
-                              GetIncludedFields(*reader->schema(), materialized_fields));
+        ARROW_ASSIGN_OR_RAISE(
+            auto ipc_scan_options,
+            GetFragmentScanOptions<IpcFragmentScanOptions>(
+                kIpcTypeName, scan_options, format->default_fragment_scan_options));
+        auto options = ipc_scan_options->options ? *ipc_scan_options->options
+                                                 : default_read_options();
+        options.memory_pool = scan_options->pool;
+        options.use_threads = false;
+        if (!options.included_fields.empty()) {
+          // Cannot set them here
+          ARROW_LOG(WARNING) << "IpcFragmentScanOptions.options->included_fields was set "
+                                "but will be ignored; included_fields are derived from "
+                                "fields referenced by the scan";
+        }
+        ARROW_ASSIGN_OR_RAISE(
+            options.included_fields,
+            GetIncludedFields(*reader->schema(), scan_options->MaterializedFields()));
 
         ARROW_ASSIGN_OR_RAISE(reader, OpenReader(source, options));
         return RecordBatchIterator(Impl{std::move(reader), 0});
@@ -107,7 +121,9 @@
       int i_;
     };
 
-    return Impl::Make(source_, options_->MaterializedFields(), options_->pool);
+    return Impl::Make(
+        source_, internal::checked_pointer_cast<FileFragment>(fragment_)->format().get(),
+        options_.get());
   }
 
  private:
diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h
index cbfb6b8..a7bcd04 100644
--- a/cpp/src/arrow/dataset/file_ipc.h
+++ b/cpp/src/arrow/dataset/file_ipc.h
@@ -31,10 +31,12 @@
 namespace arrow {
 namespace dataset {
 
+constexpr char kIpcTypeName[] = "ipc";
+
 /// \brief A FileFormat implementation that reads from and writes to Ipc files
 class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
  public:
-  std::string type_name() const override { return "ipc"; }
+  std::string type_name() const override { return kIpcTypeName; }
 
   bool Equals(const FileFormat& other) const override {
     return type_name() == other.type_name();
@@ -59,6 +61,16 @@
   std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override;
 };
 
+/// \brief Per-scan options for IPC fragments
+class ARROW_DS_EXPORT IpcFragmentScanOptions : public FragmentScanOptions {
+ public:
+  std::string type_name() const override { return kIpcTypeName; }
+
+  /// Options passed to the IPC file reader.
+  /// included_fields, memory_pool, and use_threads are ignored.
+  std::shared_ptr<ipc::IpcReadOptions> options;
+};
+
 class ARROW_DS_EXPORT IpcFileWriteOptions : public FileWriteOptions {
  public:
   /// Options passed to ipc::MakeFileWriter. use_threads is ignored
diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc
index 8a5fd02..502b61c 100644
--- a/cpp/src/arrow/dataset/file_ipc_test.cc
+++ b/cpp/src/arrow/dataset/file_ipc_test.cc
@@ -134,6 +134,28 @@
   ASSERT_EQ(row_count, kNumRows);
 }
 
+TEST_F(TestIpcFileFormat, FragmentScanOptions) {
+  auto reader = GetRecordBatchReader(
+      // ARROW-12077: on Windows/mimalloc/release, nullable list column leads to crash
+      schema({field("list", list(float64()), false,
+                    key_value_metadata({{"max_length", "1"}})),
+              field("f64", float64())}));
+  auto source = GetFileSource(reader.get());
+
+  SetSchema(reader->schema()->fields());
+  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+
+  // Set scan options that ensure reading fails
+  auto fragment_scan_options = std::make_shared<IpcFragmentScanOptions>();
+  fragment_scan_options->options = std::make_shared<ipc::IpcReadOptions>();
+  fragment_scan_options->options->max_recursion_depth = 0;
+  opts_->fragment_scan_options = fragment_scan_options;
+  ASSERT_OK_AND_ASSIGN(auto scan_tasks, fragment->Scan(opts_));
+  ASSERT_OK_AND_ASSIGN(auto scan_task, scan_tasks.Next());
+  ASSERT_OK_AND_ASSIGN(auto batches, scan_task->Execute());
+  ASSERT_RAISES(Invalid, batches.Next());
+}
+
 TEST_F(TestIpcFileFormat, ScanRecordBatchReaderWithVirtualColumn) {
   auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
   auto source = GetFileSource(reader.get());
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index d255787..8caae94 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -128,15 +128,18 @@
 };
 
 static parquet::ReaderProperties MakeReaderProperties(
-    const ParquetFileFormat& format, MemoryPool* pool = default_memory_pool()) {
+    const ParquetFileFormat& format, ParquetFragmentScanOptions* parquet_scan_options,
+    MemoryPool* pool = default_memory_pool()) {
+  // Can't mutate pool after construction
   parquet::ReaderProperties properties(pool);
-  if (format.reader_options.use_buffered_stream) {
+  if (parquet_scan_options->reader_properties->is_buffered_stream_enabled()) {
     properties.enable_buffered_stream();
   } else {
     properties.disable_buffered_stream();
   }
-  properties.set_buffer_size(format.reader_options.buffer_size);
-  properties.file_decryption_properties(format.reader_options.file_decryption_properties);
+  properties.set_buffer_size(parquet_scan_options->reader_properties->buffer_size());
+  properties.file_decryption_properties(
+      parquet_scan_options->reader_properties->file_decryption_properties());
   return properties;
 }
 
@@ -249,24 +252,23 @@
       checked_cast<const ParquetFileFormat&>(other).reader_options;
 
   // FIXME implement comparison for decryption options
-  // FIXME extract these to scan time options so comparison is unnecessary
-  return reader_options.use_buffered_stream == other_reader_options.use_buffered_stream &&
-         reader_options.buffer_size == other_reader_options.buffer_size &&
-         reader_options.dict_columns == other_reader_options.dict_columns;
+  return reader_options.dict_columns == other_reader_options.dict_columns;
 }
 
 ParquetFileFormat::ParquetFileFormat(const parquet::ReaderProperties& reader_properties) {
-  reader_options.use_buffered_stream = reader_properties.is_buffered_stream_enabled();
-  reader_options.buffer_size = reader_properties.buffer_size();
-  reader_options.file_decryption_properties =
-      reader_properties.file_decryption_properties();
+  auto parquet_scan_options = std::make_shared<ParquetFragmentScanOptions>();
+  *parquet_scan_options->reader_properties = reader_properties;
+  default_fragment_scan_options = std::move(parquet_scan_options);
 }
 
 Result<bool> ParquetFileFormat::IsSupported(const FileSource& source) const {
   try {
     ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
-    auto reader =
-        parquet::ParquetFileReader::Open(std::move(input), MakeReaderProperties(*this));
+    ARROW_ASSIGN_OR_RAISE(auto parquet_scan_options,
+                          GetFragmentScanOptions<ParquetFragmentScanOptions>(
+                              kParquetTypeName, nullptr, default_fragment_scan_options));
+    auto reader = parquet::ParquetFileReader::Open(
+        std::move(input), MakeReaderProperties(*this, parquet_scan_options.get()));
     std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
     return metadata != nullptr && metadata->can_decompress();
   } catch (const ::parquet::ParquetInvalidOrCorruptedFileException& e) {
@@ -290,8 +292,11 @@
 
 Result<std::unique_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader(
     const FileSource& source, ScanOptions* options) const {
+  ARROW_ASSIGN_OR_RAISE(auto parquet_scan_options,
+                        GetFragmentScanOptions<ParquetFragmentScanOptions>(
+                            kParquetTypeName, options, default_fragment_scan_options));
   MemoryPool* pool = options ? options->pool : default_memory_pool();
-  auto properties = MakeReaderProperties(*this, pool);
+  auto properties = MakeReaderProperties(*this, parquet_scan_options.get(), pool);
 
   ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
   std::unique_ptr<parquet::ParquetFileReader> reader;
@@ -310,7 +315,8 @@
   }
 
   if (options && !options->use_threads) {
-    arrow_properties.set_use_threads(reader_options.enable_parallel_column_conversion);
+    arrow_properties.set_use_threads(
+        parquet_scan_options->enable_parallel_column_conversion);
   }
 
   std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
@@ -356,15 +362,21 @@
   auto column_projection = InferColumnProjection(*reader, *options);
   ScanTaskVector tasks(row_groups.size());
 
+  ARROW_ASSIGN_OR_RAISE(
+      auto parquet_scan_options,
+      GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName, options.get(),
+                                                         default_fragment_scan_options));
   std::shared_ptr<std::once_flag> pre_buffer_once = nullptr;
-  if (reader_options.pre_buffer) {
+  if (parquet_scan_options->arrow_reader_properties->pre_buffer()) {
     pre_buffer_once = std::make_shared<std::once_flag>();
   }
 
   for (size_t i = 0; i < row_groups.size(); ++i) {
     tasks[i] = std::make_shared<ParquetScanTask>(
         row_groups[i], column_projection, reader, pre_buffer_once, row_groups,
-        reader_options.io_context, reader_options.cache_options, options, fragment);
+        parquet_scan_options->arrow_reader_properties->io_context(),
+        parquet_scan_options->arrow_reader_properties->cache_options(), options,
+        fragment);
   }
 
   return MakeVectorIterator(std::move(tasks));
@@ -587,6 +599,16 @@
 }
 
 //
+// ParquetFragmentScanOptions
+//
+
+ParquetFragmentScanOptions::ParquetFragmentScanOptions() {
+  reader_properties = std::make_shared<parquet::ReaderProperties>();
+  arrow_reader_properties =
+      std::make_shared<parquet::ArrowReaderProperties>(/*use_threads=*/false);
+}
+
+//
 // ParquetDatasetFactory
 //
 
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index 869857e..fa0d7de 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -57,6 +57,8 @@
 namespace arrow {
 namespace dataset {
 
+constexpr char kParquetTypeName[] = "parquet";
+
 /// \brief A FileFormat implementation that reads from Parquet files
 class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
  public:
@@ -66,45 +68,23 @@
   /// memory_pool will be ignored.
   explicit ParquetFileFormat(const parquet::ReaderProperties& reader_properties);
 
-  std::string type_name() const override { return "parquet"; }
+  std::string type_name() const override { return kParquetTypeName; }
 
   bool splittable() const override { return true; }
 
   bool Equals(const FileFormat& other) const override;
 
   struct ReaderOptions {
-    /// \defgroup parquet-file-format-reader-properties properties which correspond to
-    /// members of parquet::ReaderProperties.
-    ///
-    /// We don't embed parquet::ReaderProperties directly because we get memory_pool from
-    /// ScanOptions at scan time and provide differing defaults.
-    ///
-    /// @{
-    bool use_buffered_stream = false;
-    int64_t buffer_size = 1 << 13;
-    std::shared_ptr<parquet::FileDecryptionProperties> file_decryption_properties;
-    /// @}
-
     /// \defgroup parquet-file-format-arrow-reader-properties properties which correspond
     /// to members of parquet::ArrowReaderProperties.
     ///
-    /// We don't embed parquet::ReaderProperties directly because we get batch_size from
-    /// ScanOptions at scan time, and we will never pass use_threads == true (since we
-    /// defer parallelization of the scan). Additionally column names (rather than
-    /// indices) are used to indicate dictionary columns.
+    /// We don't embed parquet::ReaderProperties directly because column names (rather
+    /// than indices) are used to indicate dictionary columns, and other options are
+    /// deferred to scan time.
     ///
     /// @{
     std::unordered_set<std::string> dict_columns;
-    bool pre_buffer = false;
-    arrow::io::CacheOptions cache_options = arrow::io::CacheOptions::Defaults();
-    arrow::io::IOContext io_context;
     /// @}
-
-    /// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a
-    /// scan is already parallelized across input files to avoid thread contention. This
-    /// option will be removed after support is added for simultaneous parallelization
-    /// across files and columns.
-    bool enable_parallel_column_conversion = false;
   } reader_options;
 
   Result<bool> IsSupported(const FileSource& source) const override;
@@ -206,6 +186,27 @@
   friend class ParquetDatasetFactory;
 };
 
+/// \brief Per-scan options for Parquet fragments
+class ARROW_DS_EXPORT ParquetFragmentScanOptions : public FragmentScanOptions {
+ public:
+  ParquetFragmentScanOptions();
+  std::string type_name() const override { return kParquetTypeName; }
+
+  /// Reader properties. Not all properties are respected: memory_pool comes from
+  /// ScanOptions.
+  std::shared_ptr<parquet::ReaderProperties> reader_properties;
+  /// Arrow reader properties. Not all properties are respected: batch_size comes from
+  /// ScanOptions, and use_threads will be overridden based on
+  /// enable_parallel_column_conversion. Additionally, dictionary columns come from
+  /// ParquetFileFormat::ReaderOptions::dict_columns.
+  std::shared_ptr<parquet::ArrowReaderProperties> arrow_reader_properties;
+  /// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a
+  /// scan is already parallelized across input files to avoid thread contention. This
+  /// option will be removed after support is added for simultaneous parallelization
+  /// across files and columns.
+  bool enable_parallel_column_conversion = false;
+};
+
 class ARROW_DS_EXPORT ParquetFileWriteOptions : public FileWriteOptions {
  public:
   std::shared_ptr<parquet::WriterProperties> writer_properties;
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc
index cf14a2b..bb06e7f 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -272,8 +272,10 @@
   SetSchema(reader->schema()->fields());
   SetFilter(literal(true));
 
-  format_->reader_options.pre_buffer = true;
   ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+  auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>();
+  fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);
+  opts_->fragment_scan_options = fragment_scan_options;
   ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_));
 
   int64_t task_count = 0;
@@ -636,6 +638,7 @@
 
   options->arrow_writer_properties = parquet::ArrowWriterProperties::Builder()
                                          .coerce_timestamps(coerce_timestamps_to)
+                                         ->allow_truncated_timestamps()
                                          ->build();
 
   EXPECT_OK_AND_ASSIGN(auto writer, format_->MakeWriter(sink, reader->schema(), options));
diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h
index a6e761c..72cde36 100644
--- a/cpp/src/arrow/dataset/test_util.h
+++ b/cpp/src/arrow/dataset/test_util.h
@@ -42,6 +42,7 @@
 #include "arrow/table.h"
 #include "arrow/testing/generator.h"
 #include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
 #include "arrow/util/io_util.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/logging.h"
@@ -102,7 +103,7 @@
 
 std::unique_ptr<RecordBatchReader> MakeGeneratedRecordBatch(
     std::shared_ptr<Schema> schema, int64_t batch_size, int64_t batch_repetitions) {
-  auto batch = ConstantArrayGenerator::Zeroes(batch_size, schema);
+  auto batch = random::GenerateBatch(schema->fields(), batch_size, /*seed=*/0);
   int64_t i = 0;
   return MakeGeneratedRecordBatch(
       schema, [batch, i, batch_repetitions](std::shared_ptr<RecordBatch>* out) mutable {
diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h
index d148d4e..6ba65a6 100644
--- a/cpp/src/arrow/dataset/type_fwd.h
+++ b/cpp/src/arrow/dataset/type_fwd.h
@@ -65,9 +65,11 @@
 class IpcFileFormat;
 class IpcFileWriter;
 class IpcFileWriteOptions;
+class IpcFragmentScanOptions;
 
 class ParquetFileFormat;
 class ParquetFileFragment;
+class ParquetFragmentScanOptions;
 class ParquetFileWriter;
 class ParquetFileWriteOptions;
 
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index e0096c7..3320b47 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -23,6 +23,7 @@
 from cython.operator cimport dereference as deref
 
 import os
+import warnings
 
 import pyarrow as pa
 from pyarrow.lib cimport *
@@ -1113,10 +1114,14 @@
 
     @staticmethod
     cdef wrap(const shared_ptr[CFragmentScanOptions]& sp):
+        if not sp:
+            return None
+
         type_name = frombytes(sp.get().type_name())
 
         classes = {
             'csv': CsvFragmentScanOptions,
+            'parquet': ParquetFragmentScanOptions,
         }
 
         class_ = classes.get(type_name, None)
@@ -1287,56 +1292,20 @@
 
     Parameters
     ----------
-    use_buffered_stream : bool, default False
-        Read files through buffered input streams rather than loading entire
-        row groups at once. This may be enabled to reduce memory overhead.
-        Disabled by default.
-    buffer_size : int, default 8192
-        Size of buffered stream, if enabled. Default is 8KB.
     dictionary_columns : list of string, default None
         Names of columns which should be dictionary encoded as
         they are read.
-    pre_buffer : bool, default False
-        If enabled, pre-buffer the raw Parquet data instead of issuing one
-        read per column chunk. This can improve performance on high-latency
-        filesystems.
-    enable_parallel_column_conversion : bool, default False
-        EXPERIMENTAL: Parallelize conversion across columns. This option is
-        ignored if a scan is already parallelized across input files to avoid
-        thread contention. This option will be removed after support is added
-        for simultaneous parallelization across files and columns.
     """
 
     cdef public:
-        bint use_buffered_stream
-        uint32_t buffer_size
         set dictionary_columns
-        bint pre_buffer
-        bint enable_parallel_column_conversion
 
-    def __init__(self, bint use_buffered_stream=False,
-                 buffer_size=8192,
-                 dictionary_columns=None,
-                 bint pre_buffer=False,
-                 bint enable_parallel_column_conversion=False):
-        self.use_buffered_stream = use_buffered_stream
-        if buffer_size <= 0:
-            raise ValueError("Buffer size must be larger than zero")
-        self.buffer_size = buffer_size
+    # Also see _PARQUET_READ_OPTIONS
+    def __init__(self, dictionary_columns=None):
         self.dictionary_columns = set(dictionary_columns or set())
-        self.pre_buffer = pre_buffer
-        self.enable_parallel_column_conversion = \
-            enable_parallel_column_conversion
 
     def equals(self, ParquetReadOptions other):
-        return (
-            self.use_buffered_stream == other.use_buffered_stream and
-            self.buffer_size == other.buffer_size and
-            self.dictionary_columns == other.dictionary_columns and
-            self.pre_buffer == other.pre_buffer and
-            self.enable_parallel_column_conversion ==
-            other.enable_parallel_column_conversion
-        )
+        return self.dictionary_columns == other.dictionary_columns
 
     def __eq__(self, other):
         try:
@@ -1344,6 +1313,10 @@
         except TypeError:
             return False
 
+    def __repr__(self):
+        return (f"<ParquetReadOptions"
+                f" dictionary_columns={self.dictionary_columns}>")
+
 
 cdef class ParquetFileWriteOptions(FileWriteOptions):
 
@@ -1425,38 +1398,74 @@
         self._set_arrow_properties()
 
 
+cdef set _PARQUET_READ_OPTIONS = {'dictionary_columns'}
+
+
 cdef class ParquetFileFormat(FileFormat):
 
     cdef:
         CParquetFileFormat* parquet_format
 
-    def __init__(self, read_options=None):
+    def __init__(self, read_options=None,
+                 default_fragment_scan_options=None, **kwargs):
         cdef:
             shared_ptr[CParquetFileFormat] wrapped
             CParquetFileFormatReaderOptions* options
 
-        # Read options
+        # Read/scan options
+        read_options_args = {option: kwargs[option] for option in kwargs
+                             if option in _PARQUET_READ_OPTIONS}
+        scan_args = {option: kwargs[option] for option in kwargs
+                     if option not in _PARQUET_READ_OPTIONS}
+        if read_options and read_options_args:
+            duplicates = ', '.join(sorted(read_options_args))
+            raise ValueError(f'If `read_options` is given, '
+                             f'cannot specify {duplicates}')
+        if default_fragment_scan_options and scan_args:
+            duplicates = ', '.join(sorted(scan_args))
+            raise ValueError(f'If `default_fragment_scan_options` is given, '
+                             f'cannot specify {duplicates}')
 
         if read_options is None:
-            read_options = ParquetReadOptions()
+            read_options = ParquetReadOptions(**read_options_args)
         elif isinstance(read_options, dict):
-            read_options = ParquetReadOptions(**read_options)
+            # For backwards compatibility
+            duplicates = []
+            for option, value in read_options.items():
+                if option in _PARQUET_READ_OPTIONS:
+                    read_options_args[option] = value
+                else:
+                    duplicates.append(option)
+                    scan_args[option] = value
+            if duplicates:
+                duplicates = ", ".join(duplicates)
+                warnings.warn(f'The scan options {duplicates} should be '
+                              'specified directly as keyword arguments')
+            read_options = ParquetReadOptions(**read_options_args)
         elif not isinstance(read_options, ParquetReadOptions):
             raise TypeError('`read_options` must be either a dictionary or an '
                             'instance of ParquetReadOptions')
 
+        if default_fragment_scan_options is None:
+            default_fragment_scan_options = ParquetFragmentScanOptions(
+                **scan_args)
+        elif isinstance(default_fragment_scan_options, dict):
+            default_fragment_scan_options = ParquetFragmentScanOptions(
+                **default_fragment_scan_options)
+        elif not isinstance(default_fragment_scan_options,
+                            ParquetFragmentScanOptions):
+            raise TypeError('`default_fragment_scan_options` must be either a '
+                            'dictionary or an instance of '
+                            'ParquetFragmentScanOptions')
+
         wrapped = make_shared[CParquetFileFormat]()
         options = &(wrapped.get().reader_options)
-        options.use_buffered_stream = read_options.use_buffered_stream
-        options.buffer_size = read_options.buffer_size
-        options.pre_buffer = read_options.pre_buffer
-        options.enable_parallel_column_conversion = \
-            read_options.enable_parallel_column_conversion
         if read_options.dictionary_columns is not None:
             for column in read_options.dictionary_columns:
                 options.dict_columns.insert(tobytes(column))
 
         self.init(<shared_ptr[CFileFormat]> wrapped)
+        self.default_fragment_scan_options = default_fragment_scan_options
 
     cdef void init(self, const shared_ptr[CFileFormat]& sp):
         FileFormat.init(self, sp)
@@ -1467,14 +1476,8 @@
         cdef CParquetFileFormatReaderOptions* options
         options = &self.parquet_format.reader_options
         return ParquetReadOptions(
-            use_buffered_stream=options.use_buffered_stream,
-            buffer_size=options.buffer_size,
             dictionary_columns={frombytes(col)
                                 for col in options.dict_columns},
-            pre_buffer=options.pre_buffer,
-            enable_parallel_column_conversion=(
-                options.enable_parallel_column_conversion
-            )
         )
 
     def make_write_options(self, **kwargs):
@@ -1482,13 +1485,25 @@
         (<ParquetFileWriteOptions> opts).update(**kwargs)
         return opts
 
+    cdef _set_default_fragment_scan_options(self, FragmentScanOptions options):
+        if options.type_name == 'parquet':
+            self.parquet_format.default_fragment_scan_options = options.wrapped
+        else:
+            super()._set_default_fragment_scan_options(options)
+
     def equals(self, ParquetFileFormat other):
         return (
-            self.read_options.equals(other.read_options)
+            self.read_options.equals(other.read_options) and
+            self.default_fragment_scan_options ==
+            other.default_fragment_scan_options
         )
 
     def __reduce__(self):
-        return ParquetFileFormat, (self.read_options, )
+        return ParquetFileFormat, (self.read_options,
+                                   self.default_fragment_scan_options)
+
+    def __repr__(self):
+        return f"<ParquetFileFormat read_options={self.read_options}>"
 
     def make_fragment(self, file, filesystem=None,
                       Expression partition_expression=None, row_groups=None):
@@ -1513,6 +1528,109 @@
         return Fragment.wrap(move(c_fragment))
 
 
+cdef class ParquetFragmentScanOptions(FragmentScanOptions):
+    """Scan-specific options for Parquet fragments.
+
+    Parameters
+    ----------
+    use_buffered_stream : bool, default False
+        Read files through buffered input streams rather than loading entire
+        row groups at once. This may be enabled to reduce memory overhead.
+        Disabled by default.
+    buffer_size : int, default 8192
+        Size of buffered stream, if enabled. Default is 8KB.
+    pre_buffer : bool, default False
+        If enabled, pre-buffer the raw Parquet data instead of issuing one
+        read per column chunk. This can improve performance on high-latency
+        filesystems.
+    enable_parallel_column_conversion : bool, default False
+        EXPERIMENTAL: Parallelize conversion across columns. This option is
+        ignored if a scan is already parallelized across input files to avoid
+        thread contention. This option will be removed after support is added
+        for simultaneous parallelization across files and columns.
+    """
+
+    cdef:
+        CParquetFragmentScanOptions* parquet_options
+
+    # Avoid mistakingly creating attributes
+    __slots__ = ()
+
+    def __init__(self, bint use_buffered_stream=False,
+                 buffer_size=8192,
+                 bint pre_buffer=False,
+                 bint enable_parallel_column_conversion=False):
+        self.init(shared_ptr[CFragmentScanOptions](
+            new CParquetFragmentScanOptions()))
+        self.use_buffered_stream = use_buffered_stream
+        self.buffer_size = buffer_size
+        self.pre_buffer = pre_buffer
+        self.enable_parallel_column_conversion = \
+            enable_parallel_column_conversion
+
+    cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
+        FragmentScanOptions.init(self, sp)
+        self.parquet_options = <CParquetFragmentScanOptions*> sp.get()
+
+    cdef CReaderProperties* reader_properties(self):
+        return self.parquet_options.reader_properties.get()
+
+    cdef ArrowReaderProperties* arrow_reader_properties(self):
+        return self.parquet_options.arrow_reader_properties.get()
+
+    @property
+    def use_buffered_stream(self):
+        return self.reader_properties().is_buffered_stream_enabled()
+
+    @use_buffered_stream.setter
+    def use_buffered_stream(self, bint use_buffered_stream):
+        if use_buffered_stream:
+            self.reader_properties().enable_buffered_stream()
+        else:
+            self.reader_properties().disable_buffered_stream()
+
+    @property
+    def buffer_size(self):
+        return self.reader_properties().buffer_size()
+
+    @buffer_size.setter
+    def buffer_size(self, buffer_size):
+        if buffer_size <= 0:
+            raise ValueError("Buffer size must be larger than zero")
+        self.reader_properties().set_buffer_size(buffer_size)
+
+    @property
+    def pre_buffer(self):
+        return self.arrow_reader_properties().pre_buffer()
+
+    @pre_buffer.setter
+    def pre_buffer(self, bint pre_buffer):
+        self.arrow_reader_properties().set_pre_buffer(pre_buffer)
+
+    @property
+    def enable_parallel_column_conversion(self):
+        return self.parquet_options.enable_parallel_column_conversion
+
+    @enable_parallel_column_conversion.setter
+    def enable_parallel_column_conversion(
+            self, bint enable_parallel_column_conversion):
+        self.parquet_options.enable_parallel_column_conversion = \
+            enable_parallel_column_conversion
+
+    def equals(self, ParquetFragmentScanOptions other):
+        return (
+            self.use_buffered_stream == other.use_buffered_stream and
+            self.buffer_size == other.buffer_size and
+            self.pre_buffer == other.pre_buffer and
+            self.enable_parallel_column_conversion ==
+            other.enable_parallel_column_conversion)
+
+    def __reduce__(self):
+        return ParquetFragmentScanOptions, (
+            self.use_buffered_stream, self.buffer_size, self.pre_buffer,
+            self.enable_parallel_column_conversion)
+
+
 cdef class IpcFileWriteOptions(FileWriteOptions):
 
     def __init__(self):
@@ -1543,14 +1661,28 @@
     __slots__ = ()
 
     def __init__(self, ParseOptions parse_options=None,
+                 default_fragment_scan_options=None,
                  ConvertOptions convert_options=None,
                  ReadOptions read_options=None):
         self.init(shared_ptr[CFileFormat](new CCsvFileFormat()))
         if parse_options is not None:
             self.parse_options = parse_options
         if convert_options is not None or read_options is not None:
+            if default_fragment_scan_options:
+                raise ValueError('If `default_fragment_scan_options` is '
+                                 'given, cannot specify convert_options '
+                                 'or read_options')
             self.default_fragment_scan_options = CsvFragmentScanOptions(
                 convert_options=convert_options, read_options=read_options)
+        elif isinstance(default_fragment_scan_options, dict):
+            self.default_fragment_scan_options = CsvFragmentScanOptions(
+                **default_fragment_scan_options)
+        elif isinstance(default_fragment_scan_options, CsvFragmentScanOptions):
+            self.default_fragment_scan_options = default_fragment_scan_options
+        elif default_fragment_scan_options is not None:
+            raise TypeError('`default_fragment_scan_options` must be either '
+                            'a dictionary or an instance of '
+                            'CsvFragmentScanOptions')
 
     cdef void init(self, const shared_ptr[CFileFormat]& sp):
         FileFormat.init(self, sp)
@@ -1574,10 +1706,14 @@
             super()._set_default_fragment_scan_options(options)
 
     def equals(self, CsvFileFormat other):
-        return self.parse_options.equals(other.parse_options)
+        return (
+            self.parse_options.equals(other.parse_options) and
+            self.default_fragment_scan_options ==
+            other.default_fragment_scan_options)
 
     def __reduce__(self):
-        return CsvFileFormat, (self.parse_options,)
+        return CsvFileFormat, (self.parse_options,
+                               self.default_fragment_scan_options)
 
     def __repr__(self):
         return f"<CsvFileFormat parse_options={self.parse_options}>"
@@ -1622,8 +1758,10 @@
         self.csv_options.read_options = read_options.options
 
     def equals(self, CsvFragmentScanOptions other):
-        return (self.convert_options.equals(other.convert_options) and
-                self.read_options.equals(other.read_options))
+        return (
+            other and
+            self.convert_options.equals(other.convert_options) and
+            self.read_options.equals(other.read_options))
 
     def __reduce__(self):
         return CsvFragmentScanOptions, (self.convert_options,
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 8fa1c85..96bfd77 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -329,6 +329,7 @@
                                        uint32_t* metadata_len)
 
     cdef cppclass CReaderProperties" parquet::ReaderProperties":
+        c_bool is_buffered_stream_enabled() const
         void enable_buffered_stream()
         void disable_buffered_stream()
         void set_buffer_size(int64_t buf_size)
@@ -342,6 +343,8 @@
         c_bool read_dictionary()
         void set_batch_size(int64_t batch_size)
         int64_t batch_size()
+        void set_pre_buffer(c_bool pre_buffer)
+        c_bool pre_buffer() const
 
     ArrowReaderProperties default_arrow_reader_properties()
 
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 0c65070..615cb25 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -43,6 +43,7 @@
     ParquetFileFormat,
     ParquetFileFragment,
     ParquetFileWriteOptions,
+    ParquetFragmentScanOptions,
     ParquetReadOptions,
     Partitioning,
     PartitioningFactory,
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index db2e73a..06ec69c 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -237,11 +237,7 @@
 
     cdef cppclass CParquetFileFormatReaderOptions \
             "arrow::dataset::ParquetFileFormat::ReaderOptions":
-        c_bool use_buffered_stream
-        int64_t buffer_size
         unordered_set[c_string] dict_columns
-        c_bool pre_buffer
-        c_bool enable_parallel_column_conversion
 
     cdef cppclass CParquetFileFormat "arrow::dataset::ParquetFileFormat"(
             CFileFormat):
@@ -252,6 +248,12 @@
             shared_ptr[CSchema] physical_schema,
             vector[int] row_groups)
 
+    cdef cppclass CParquetFragmentScanOptions \
+            "arrow::dataset::ParquetFragmentScanOptions"(CFragmentScanOptions):
+        shared_ptr[CReaderProperties] reader_properties
+        shared_ptr[ArrowReaderProperties] arrow_reader_properties
+        c_bool enable_parallel_column_conversion
+
     cdef cppclass CIpcFileWriteOptions \
             "arrow::dataset::IpcFileWriteOptions"(CFileWriteOptions):
         pass
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 1b0a336..ed8d387 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -1536,7 +1536,7 @@
             self._enable_parallel_column_conversion = True
             read_options.update(enable_parallel_column_conversion=True)
 
-            parquet_format = ds.ParquetFileFormat(read_options=read_options)
+            parquet_format = ds.ParquetFileFormat(**read_options)
             fragment = parquet_format.make_fragment(single_file, filesystem)
 
             self._dataset = ds.FileSystemDataset(
@@ -1548,7 +1548,7 @@
         else:
             self._enable_parallel_column_conversion = False
 
-        parquet_format = ds.ParquetFileFormat(read_options=read_options)
+        parquet_format = ds.ParquetFileFormat(**read_options)
 
         # check partitioning to enable dictionary encoding
         if partitioning == "hive":
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 5f4bcac..31f4e08 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -519,32 +519,38 @@
 
 def test_parquet_read_options():
     opts1 = ds.ParquetReadOptions()
-    opts2 = ds.ParquetReadOptions(buffer_size=4096,
-                                  dictionary_columns=['a', 'b'])
-    opts3 = ds.ParquetReadOptions(buffer_size=2**13, use_buffered_stream=True,
-                                  dictionary_columns={'a', 'b'})
-    opts4 = ds.ParquetReadOptions(buffer_size=2**13, pre_buffer=True,
-                                  dictionary_columns={'a', 'b'})
+    opts2 = ds.ParquetReadOptions(dictionary_columns=['a', 'b'])
+
+    assert opts1.dictionary_columns == set()
+
+    assert opts2.dictionary_columns == {'a', 'b'}
+
+    assert opts1 == opts1
+    assert opts1 != opts2
+
+
+def test_parquet_scan_options():
+    opts1 = ds.ParquetFragmentScanOptions()
+    opts2 = ds.ParquetFragmentScanOptions(buffer_size=4096)
+    opts3 = ds.ParquetFragmentScanOptions(
+        buffer_size=2**13, use_buffered_stream=True)
+    opts4 = ds.ParquetFragmentScanOptions(buffer_size=2**13, pre_buffer=True)
 
     assert opts1.use_buffered_stream is False
     assert opts1.buffer_size == 2**13
     assert opts1.pre_buffer is False
-    assert opts1.dictionary_columns == set()
 
     assert opts2.use_buffered_stream is False
     assert opts2.buffer_size == 2**12
     assert opts2.pre_buffer is False
-    assert opts2.dictionary_columns == {'a', 'b'}
 
     assert opts3.use_buffered_stream is True
     assert opts3.buffer_size == 2**13
     assert opts3.pre_buffer is False
-    assert opts3.dictionary_columns == {'a', 'b'}
 
     assert opts4.use_buffered_stream is False
     assert opts4.buffer_size == 2**13
     assert opts4.pre_buffer is True
-    assert opts4.dictionary_columns == {'a', 'b'}
 
     assert opts1 == opts1
     assert opts1 != opts2
@@ -563,14 +569,11 @@
         ds.CsvFileFormat(read_options=pa.csv.ReadOptions(
             skip_rows=3, block_size=2**20)),
         ds.ParquetFileFormat(),
+        ds.ParquetFileFormat(dictionary_columns={'a'}),
+        ds.ParquetFileFormat(use_buffered_stream=True),
         ds.ParquetFileFormat(
-            read_options=ds.ParquetReadOptions(use_buffered_stream=True)
-        ),
-        ds.ParquetFileFormat(
-            read_options={
-                'use_buffered_stream': True,
-                'buffer_size': 4096,
-            }
+            use_buffered_stream=True,
+            buffer_size=4096,
         )
     ]
     for file_format in formats:
@@ -584,6 +587,8 @@
             convert_options=pa.csv.ConvertOptions(strings_can_be_null=True)),
         ds.CsvFragmentScanOptions(
             read_options=pa.csv.ReadOptions(block_size=2**16)),
+        ds.ParquetFragmentScanOptions(buffer_size=4096),
+        ds.ParquetFragmentScanOptions(pre_buffer=True),
     ]
     for option in options:
         assert pickle.loads(pickle.dumps(option)) == option
@@ -599,8 +604,8 @@
 @pytest.mark.parametrize('pre_buffer', [False, True])
 def test_filesystem_factory(mockfs, paths_or_selector, pre_buffer):
     format = ds.ParquetFileFormat(
-        read_options=ds.ParquetReadOptions(dictionary_columns={"str"},
-                                           pre_buffer=pre_buffer)
+        read_options=ds.ParquetReadOptions(dictionary_columns={"str"}),
+        pre_buffer=pre_buffer
     )
 
     options = ds.FileSystemFactoryOptions('subdir')
@@ -719,10 +724,10 @@
     ]
     dictionary_format = ds.ParquetFileFormat(
         read_options=ds.ParquetReadOptions(
-            use_buffered_stream=True,
-            buffer_size=4096,
             dictionary_columns=['alpha', 'animal']
-        )
+        ),
+        use_buffered_stream=True,
+        buffer_size=4096,
     )
 
     cases = [
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 9cccec9..9811dc9 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -412,8 +412,8 @@
     .Call(`_arrow_dataset___FileFormat__DefaultWriteOptions`, fmt)
 }
 
-dataset___ParquetFileFormat__Make <- function(use_buffered_stream, buffer_size, dict_columns){
-    .Call(`_arrow_dataset___ParquetFileFormat__Make`, use_buffered_stream, buffer_size, dict_columns)
+dataset___ParquetFileFormat__Make <- function(options, dict_columns){
+    .Call(`_arrow_dataset___ParquetFileFormat__Make`, options, dict_columns)
 }
 
 dataset___FileWriteOptions__type_name <- function(options){
@@ -448,6 +448,10 @@
     .Call(`_arrow_dataset___CsvFragmentScanOptions__Make`, convert_options, read_options)
 }
 
+dataset___ParquetFragmentScanOptions__Make <- function(use_buffered_stream, buffer_size, pre_buffer){
+    .Call(`_arrow_dataset___ParquetFragmentScanOptions__Make`, use_buffered_stream, buffer_size, pre_buffer)
+}
+
 dataset___DirectoryPartitioning <- function(schm){
     .Call(`_arrow_dataset___DirectoryPartitioning`, schm)
 }
diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R
index cd54a30..854672b 100644
--- a/r/R/dataset-format.R
+++ b/r/R/dataset-format.R
@@ -34,11 +34,8 @@
 #' * `...`: Additional format-specific options
 #'
 #'   `format = "parquet"``:
-#'   * `use_buffered_stream`: Read files through buffered input streams rather than
-#'                            loading entire row groups at once. This may be enabled
-#'                            to reduce memory overhead. Disabled by default.
-#'   * `buffer_size`: Size of buffered stream, if enabled. Default is 8KB.
 #'   * `dict_columns`: Names of columns which should be read as dictionaries.
+#'   * Any Parquet options from [FragmentScanOptions].
 #'
 #'   `format = "text"`: see [CsvParseOptions]. Note that you can specify them either
 #'   with the Arrow C++ library naming ("delimiter", "quoting", etc.) or the
@@ -91,10 +88,10 @@
 #' @rdname FileFormat
 #' @export
 ParquetFileFormat <- R6Class("ParquetFileFormat", inherit = FileFormat)
-ParquetFileFormat$create <- function(use_buffered_stream = FALSE,
-                                     buffer_size = 8196,
+ParquetFileFormat$create <- function(...,
                                      dict_columns = character(0)) {
- dataset___ParquetFileFormat__Make(use_buffered_stream, buffer_size, dict_columns)
+ options <- ParquetFragmentScanOptions$create(...)
+ dataset___ParquetFileFormat__Make(options, dict_columns)
 }
 
 #' @usage NULL
@@ -217,9 +214,18 @@
 #' @section Factory:
 #' `FragmentScanOptions$create()` takes the following arguments:
 #' * `format`: A string identifier of the file format. Currently supported values:
+#'   * "parquet"
 #'   * "csv"/"text", aliases for the same format.
 #' * `...`: Additional format-specific options
 #'
+#'   `format = "parquet"``:
+#'   * `use_buffered_stream`: Read files through buffered input streams rather than
+#'                            loading entire row groups at once. This may be enabled
+#'                            to reduce memory overhead. Disabled by default.
+#'   * `buffer_size`: Size of buffered stream, if enabled. Default is 8KB.
+#'   * `pre_buffer`: Pre-buffer the raw Parquet data. This can improve performance
+#'                   on high-latency filesystems. Disabled by default.
+#
 #'   `format = "text"`: see [CsvConvertOptions]. Note that options can only be
 #'   specified with the Arrow C++ library naming. Also, "block_size" from
 #'   [CsvReadOptions] may be given.
@@ -240,6 +246,8 @@
   opt_names <- names(list(...))
   if (format %in% c("csv", "text", "tsv")) {
     CsvFragmentScanOptions$create(...)
+  } else if (format == "parquet") {
+    ParquetFragmentScanOptions$create(...)
   } else {
     stop("Unsupported file format: ", format, call. = FALSE)
   }
@@ -261,6 +269,17 @@
   dataset___CsvFragmentScanOptions__Make(convert_opts, read_opts)
 }
 
+#' @usage NULL
+#' @format NULL
+#' @rdname FragmentScanOptions
+#' @export
+ParquetFragmentScanOptions <- R6Class("ParquetFragmentScanOptions", inherit = FragmentScanOptions)
+ParquetFragmentScanOptions$create <- function(use_buffered_stream = FALSE,
+                                              buffer_size = 8196,
+                                              pre_buffer = FALSE) {
+  dataset___ParquetFragmentScanOptions__Make(use_buffered_stream, buffer_size, pre_buffer)
+}
+
 #' Format-specific write options
 #'
 #' @description
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index d74ac0e..4c2ebed 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1612,17 +1612,16 @@
 
 // dataset.cpp
 #if defined(ARROW_R_WITH_DATASET)
-std::shared_ptr<ds::ParquetFileFormat> dataset___ParquetFileFormat__Make(bool use_buffered_stream, int64_t buffer_size, cpp11::strings dict_columns);
-extern "C" SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP dict_columns_sexp){
+std::shared_ptr<ds::ParquetFileFormat> dataset___ParquetFileFormat__Make(const std::shared_ptr<ds::ParquetFragmentScanOptions>& options, cpp11::strings dict_columns);
+extern "C" SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP options_sexp, SEXP dict_columns_sexp){
 BEGIN_CPP11
-	arrow::r::Input<bool>::type use_buffered_stream(use_buffered_stream_sexp);
-	arrow::r::Input<int64_t>::type buffer_size(buffer_size_sexp);
+	arrow::r::Input<const std::shared_ptr<ds::ParquetFragmentScanOptions>&>::type options(options_sexp);
 	arrow::r::Input<cpp11::strings>::type dict_columns(dict_columns_sexp);
-	return cpp11::as_sexp(dataset___ParquetFileFormat__Make(use_buffered_stream, buffer_size, dict_columns));
+	return cpp11::as_sexp(dataset___ParquetFileFormat__Make(options, dict_columns));
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP dict_columns_sexp){
+extern "C" SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP options_sexp, SEXP dict_columns_sexp){
 	Rf_error("Cannot call dataset___ParquetFileFormat__Make(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
 }
 #endif
@@ -1761,6 +1760,23 @@
 
 // dataset.cpp
 #if defined(ARROW_R_WITH_DATASET)
+std::shared_ptr<ds::ParquetFragmentScanOptions> dataset___ParquetFragmentScanOptions__Make(bool use_buffered_stream, int64_t buffer_size, bool pre_buffer);
+extern "C" SEXP _arrow_dataset___ParquetFragmentScanOptions__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP pre_buffer_sexp){
+BEGIN_CPP11
+	arrow::r::Input<bool>::type use_buffered_stream(use_buffered_stream_sexp);
+	arrow::r::Input<int64_t>::type buffer_size(buffer_size_sexp);
+	arrow::r::Input<bool>::type pre_buffer(pre_buffer_sexp);
+	return cpp11::as_sexp(dataset___ParquetFragmentScanOptions__Make(use_buffered_stream, buffer_size, pre_buffer));
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_dataset___ParquetFragmentScanOptions__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP pre_buffer_sexp){
+	Rf_error("Cannot call dataset___ParquetFragmentScanOptions__Make(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
+}
+#endif
+
+// dataset.cpp
+#if defined(ARROW_R_WITH_DATASET)
 std::shared_ptr<ds::DirectoryPartitioning> dataset___DirectoryPartitioning(const std::shared_ptr<arrow::Schema>& schm);
 extern "C" SEXP _arrow_dataset___DirectoryPartitioning(SEXP schm_sexp){
 BEGIN_CPP11
@@ -6659,7 +6675,7 @@
 		{ "_arrow_dataset___FileSystemDatasetFactory__Make3", (DL_FUNC) &_arrow_dataset___FileSystemDatasetFactory__Make3, 4}, 
 		{ "_arrow_dataset___FileFormat__type_name", (DL_FUNC) &_arrow_dataset___FileFormat__type_name, 1}, 
 		{ "_arrow_dataset___FileFormat__DefaultWriteOptions", (DL_FUNC) &_arrow_dataset___FileFormat__DefaultWriteOptions, 1}, 
-		{ "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 3}, 
+		{ "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 2}, 
 		{ "_arrow_dataset___FileWriteOptions__type_name", (DL_FUNC) &_arrow_dataset___FileWriteOptions__type_name, 1}, 
 		{ "_arrow_dataset___ParquetFileWriteOptions__update", (DL_FUNC) &_arrow_dataset___ParquetFileWriteOptions__update, 3}, 
 		{ "_arrow_dataset___IpcFileWriteOptions__update2", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update2, 4}, 
@@ -6668,6 +6684,7 @@
 		{ "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 3}, 
 		{ "_arrow_dataset___FragmentScanOptions__type_name", (DL_FUNC) &_arrow_dataset___FragmentScanOptions__type_name, 1}, 
 		{ "_arrow_dataset___CsvFragmentScanOptions__Make", (DL_FUNC) &_arrow_dataset___CsvFragmentScanOptions__Make, 2}, 
+		{ "_arrow_dataset___ParquetFragmentScanOptions__Make", (DL_FUNC) &_arrow_dataset___ParquetFragmentScanOptions__Make, 3}, 
 		{ "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 1}, 
 		{ "_arrow_dataset___DirectoryPartitioning__MakeFactory", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning__MakeFactory, 1}, 
 		{ "_arrow_dataset___HivePartitioning", (DL_FUNC) &_arrow_dataset___HivePartitioning, 2}, 
diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp
index b7b4800..c8fdb7a 100644
--- a/r/src/dataset.cpp
+++ b/r/src/dataset.cpp
@@ -25,6 +25,7 @@
 #include <arrow/table.h>
 #include <arrow/util/checked_cast.h>
 #include <arrow/util/iterator.h>
+#include <parquet/properties.h>
 
 namespace ds = ::arrow::dataset;
 namespace fs = ::arrow::fs;
@@ -227,11 +228,10 @@
 
 // [[dataset::export]]
 std::shared_ptr<ds::ParquetFileFormat> dataset___ParquetFileFormat__Make(
-    bool use_buffered_stream, int64_t buffer_size, cpp11::strings dict_columns) {
+    const std::shared_ptr<ds::ParquetFragmentScanOptions>& options,
+    cpp11::strings dict_columns) {
   auto fmt = std::make_shared<ds::ParquetFileFormat>();
-
-  fmt->reader_options.use_buffered_stream = use_buffered_stream;
-  fmt->reader_options.buffer_size = buffer_size;
+  fmt->default_fragment_scan_options = std::move(options);
 
   auto dict_columns_vector = cpp11::as_cpp<std::vector<std::string>>(dict_columns);
   auto& d = fmt->reader_options.dict_columns;
@@ -295,7 +295,7 @@
   return format;
 }
 
-// FragmentScanOptions, CsvFragmentScanOptions
+// FragmentScanOptions, CsvFragmentScanOptions, ParquetFragmentScanOptions
 
 // [[dataset::export]]
 std::string dataset___FragmentScanOptions__type_name(
@@ -313,6 +313,21 @@
   return options;
 }
 
+// [[dataset::export]]
+std::shared_ptr<ds::ParquetFragmentScanOptions>
+dataset___ParquetFragmentScanOptions__Make(bool use_buffered_stream, int64_t buffer_size,
+                                           bool pre_buffer) {
+  auto options = std::make_shared<ds::ParquetFragmentScanOptions>();
+  if (use_buffered_stream) {
+    options->reader_properties->enable_buffered_stream();
+  } else {
+    options->reader_properties->disable_buffered_stream();
+  }
+  options->reader_properties->set_buffer_size(buffer_size);
+  options->arrow_reader_properties->set_pre_buffer(pre_buffer);
+  return options;
+}
+
 // DirectoryPartitioning, HivePartitioning
 
 // [[dataset::export]]