feat: Support Parquet writer options (#1123)

* feat: Support Parquet writer options

* Create dedicated write_parquet_options function

* Rename write_parquet_options to write_parquet_with_options

* Merge remote-tracking branch 'origin/main' into write_parquet_options

* Fix ruff errors
diff --git a/python/datafusion/__init__.py b/python/datafusion/__init__.py
index 4f77002..16d65f6 100644
--- a/python/datafusion/__init__.py
+++ b/python/datafusion/__init__.py
@@ -46,7 +46,7 @@
     SessionContext,
     SQLOptions,
 )
-from .dataframe import DataFrame
+from .dataframe import DataFrame, ParquetColumnOptions, ParquetWriterOptions
 from .expr import (
     Expr,
     WindowFrame,
@@ -80,6 +80,8 @@
     "ExecutionPlan",
     "Expr",
     "LogicalPlan",
+    "ParquetColumnOptions",
+    "ParquetWriterOptions",
     "RecordBatch",
     "RecordBatchStream",
     "RuntimeEnvBuilder",
diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index a1df7e0..769271c 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -38,6 +38,8 @@
     from typing_extensions import deprecated  # Python 3.12
 
 from datafusion._internal import DataFrame as DataFrameInternal
+from datafusion._internal import ParquetColumnOptions as ParquetColumnOptionsInternal
+from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
 from datafusion.expr import Expr, SortExpr, sort_or_default
 from datafusion.plan import ExecutionPlan, LogicalPlan
 from datafusion.record_batch import RecordBatchStream
@@ -114,6 +116,173 @@
         return None
 
 
+class ParquetWriterOptions:
+    """Advanced parquet writer options.
+
+    Allows settings the writer options that apply to the entire file. Some options can
+    also be set on a column by column basis, with the field `column_specific_options`
+    (see `ParquetColumnOptions`).
+
+    Attributes:
+        data_pagesize_limit: Sets best effort maximum size of data page in bytes.
+        write_batch_size: Sets write_batch_size in bytes.
+        writer_version: Sets parquet writer version. Valid values are `1.0` and
+            `2.0`.
+        skip_arrow_metadata: Skip encoding the embedded arrow metadata in the
+            KV_meta.
+        compression: Compression type to use. Default is "zstd(3)".
+            Available compression types are
+            - "uncompressed": No compression.
+            - "snappy": Snappy compression.
+            - "gzip(n)": Gzip compression with level n.
+            - "brotli(n)": Brotli compression with level n.
+            - "lz4": LZ4 compression.
+            - "lz4_raw": LZ4_RAW compression.
+            - "zstd(n)": Zstandard compression with level n.
+        dictionary_enabled: Sets if dictionary encoding is enabled. If None, uses
+            the default parquet writer setting.
+        dictionary_page_size_limit: Sets best effort maximum dictionary page size,
+            in bytes.
+        statistics_enabled: Sets if statistics are enabled for any column Valid
+            values are `none`, `chunk`, and `page`. If None, uses the default
+            parquet writer setting.
+        max_row_group_size: Target maximum number of rows in each row group
+            (defaults to 1M rows). Writing larger row groups requires more memory to
+            write, but can get better compression and be faster to read.
+        created_by: Sets "created by" property.
+        column_index_truncate_length: Sets column index truncate length.
+        statistics_truncate_length: Sets statistics truncate length. If None, uses
+            the default parquet writer setting.
+        data_page_row_count_limit: Sets best effort maximum number of rows in a data
+            page.
+        encoding: Sets default encoding for any column. Valid values are `plain`,
+            `plain_dictionary`, `rle`, `bit_packed`, `delta_binary_packed`,
+            `delta_length_byte_array`, `delta_byte_array`, `rle_dictionary`, and
+            `byte_stream_split`. If None, uses the default parquet writer setting.
+        bloom_filter_on_write: Write bloom filters for all columns when creating
+            parquet files.
+        bloom_filter_fpp: Sets bloom filter false positive probability. If None,
+            uses the default parquet writer setting
+        bloom_filter_ndv: Sets bloom filter number of distinct values. If None, uses
+            the default parquet writer setting.
+        allow_single_file_parallelism: Controls whether DataFusion will attempt to
+            speed up writing parquet files by serializing them in parallel. Each
+            column in each row group in each output file are serialized in parallel
+            leveraging a maximum possible core count of n_files * n_row_groups *
+            n_columns.
+        maximum_parallel_row_group_writers: By default parallel parquet writer is
+            tuned for minimum memory usage in a streaming execution plan. You may
+            see a performance benefit when writing large parquet files by increasing
+            `maximum_parallel_row_group_writers` and
+            `maximum_buffered_record_batches_per_stream` if your system has idle
+            cores and can tolerate additional memory usage. Boosting these values is
+            likely worthwhile when writing out already in-memory data, such as from
+            a cached data frame.
+        maximum_buffered_record_batches_per_stream: See
+            `maximum_parallel_row_group_writers`.
+        column_specific_options: Overrides options for specific columns. If a column
+            is not a part of this dictionary, it will use the parameters provided here.
+    """
+
+    def __init__(
+        self,
+        data_pagesize_limit: int = 1024 * 1024,
+        write_batch_size: int = 1024,
+        writer_version: str = "1.0",
+        skip_arrow_metadata: bool = False,
+        compression: Optional[str] = "zstd(3)",
+        dictionary_enabled: Optional[bool] = True,
+        dictionary_page_size_limit: int = 1024 * 1024,
+        statistics_enabled: Optional[str] = "page",
+        max_row_group_size: int = 1024 * 1024,
+        created_by: str = "datafusion-python",
+        column_index_truncate_length: Optional[int] = 64,
+        statistics_truncate_length: Optional[int] = None,
+        data_page_row_count_limit: int = 20_000,
+        encoding: Optional[str] = None,
+        bloom_filter_on_write: bool = False,
+        bloom_filter_fpp: Optional[float] = None,
+        bloom_filter_ndv: Optional[int] = None,
+        allow_single_file_parallelism: bool = True,
+        maximum_parallel_row_group_writers: int = 1,
+        maximum_buffered_record_batches_per_stream: int = 2,
+        column_specific_options: Optional[dict[str, ParquetColumnOptions]] = None,
+    ) -> None:
+        """Initialize the ParquetWriterOptions."""
+        self.data_pagesize_limit = data_pagesize_limit
+        self.write_batch_size = write_batch_size
+        self.writer_version = writer_version
+        self.skip_arrow_metadata = skip_arrow_metadata
+        self.compression = compression
+        self.dictionary_enabled = dictionary_enabled
+        self.dictionary_page_size_limit = dictionary_page_size_limit
+        self.statistics_enabled = statistics_enabled
+        self.max_row_group_size = max_row_group_size
+        self.created_by = created_by
+        self.column_index_truncate_length = column_index_truncate_length
+        self.statistics_truncate_length = statistics_truncate_length
+        self.data_page_row_count_limit = data_page_row_count_limit
+        self.encoding = encoding
+        self.bloom_filter_on_write = bloom_filter_on_write
+        self.bloom_filter_fpp = bloom_filter_fpp
+        self.bloom_filter_ndv = bloom_filter_ndv
+        self.allow_single_file_parallelism = allow_single_file_parallelism
+        self.maximum_parallel_row_group_writers = maximum_parallel_row_group_writers
+        self.maximum_buffered_record_batches_per_stream = (
+            maximum_buffered_record_batches_per_stream
+        )
+        self.column_specific_options = column_specific_options
+
+
+class ParquetColumnOptions:
+    """Parquet options for individual columns.
+
+    Contains the available options that can be applied for an individual Parquet column,
+    replacing the global options in `ParquetWriterOptions`.
+
+    Attributes:
+        encoding: Sets encoding for the column path. Valid values are: `plain`,
+            `plain_dictionary`, `rle`, `bit_packed`, `delta_binary_packed`,
+            `delta_length_byte_array`, `delta_byte_array`, `rle_dictionary`, and
+            `byte_stream_split`. These values are not case-sensitive. If `None`, uses
+            the default parquet options
+        dictionary_enabled: Sets if dictionary encoding is enabled for the column path.
+            If `None`, uses the default parquet options
+        compression: Sets default parquet compression codec for the column path. Valid
+            values are `uncompressed`, `snappy`, `gzip(level)`, `lzo`, `brotli(level)`,
+            `lz4`, `zstd(level)`, and `lz4_raw`. These values are not case-sensitive. If
+            `None`, uses the default parquet options.
+        statistics_enabled: Sets if statistics are enabled for the column Valid values
+            are: `none`, `chunk`, and `page` These values are not case sensitive. If
+            `None`, uses the default parquet options.
+        bloom_filter_enabled: Sets if bloom filter is enabled for the column path. If
+            `None`, uses the default parquet options.
+        bloom_filter_fpp: Sets bloom filter false positive probability for the column
+            path. If `None`, uses the default parquet options.
+        bloom_filter_ndv: Sets bloom filter number of distinct values. If `None`, uses
+            the default parquet options.
+    """
+
+    def __init__(
+        self,
+        encoding: Optional[str] = None,
+        dictionary_enabled: Optional[bool] = None,
+        compression: Optional[str] = None,
+        statistics_enabled: Optional[str] = None,
+        bloom_filter_enabled: Optional[bool] = None,
+        bloom_filter_fpp: Optional[float] = None,
+        bloom_filter_ndv: Optional[int] = None,
+    ) -> None:
+        """Initialize the ParquetColumnOptions."""
+        self.encoding = encoding
+        self.dictionary_enabled = dictionary_enabled
+        self.compression = compression
+        self.statistics_enabled = statistics_enabled
+        self.bloom_filter_enabled = bloom_filter_enabled
+        self.bloom_filter_fpp = bloom_filter_fpp
+        self.bloom_filter_ndv = bloom_filter_ndv
+
+
 class DataFrame:
     """Two dimensional table representation of data.
 
@@ -737,6 +906,58 @@
 
         self.df.write_parquet(str(path), compression.value, compression_level)
 
+    def write_parquet_with_options(
+        self, path: str | pathlib.Path, options: ParquetWriterOptions
+    ) -> None:
+        """Execute the :py:class:`DataFrame` and write the results to a Parquet file.
+
+        Allows advanced writer options to be set with `ParquetWriterOptions`.
+
+        Args:
+            path: Path of the Parquet file to write.
+            options: Sets the writer parquet options (see `ParquetWriterOptions`).
+        """
+        options_internal = ParquetWriterOptionsInternal(
+            options.data_pagesize_limit,
+            options.write_batch_size,
+            options.writer_version,
+            options.skip_arrow_metadata,
+            options.compression,
+            options.dictionary_enabled,
+            options.dictionary_page_size_limit,
+            options.statistics_enabled,
+            options.max_row_group_size,
+            options.created_by,
+            options.column_index_truncate_length,
+            options.statistics_truncate_length,
+            options.data_page_row_count_limit,
+            options.encoding,
+            options.bloom_filter_on_write,
+            options.bloom_filter_fpp,
+            options.bloom_filter_ndv,
+            options.allow_single_file_parallelism,
+            options.maximum_parallel_row_group_writers,
+            options.maximum_buffered_record_batches_per_stream,
+        )
+
+        column_specific_options_internal = {}
+        for column, opts in (options.column_specific_options or {}).items():
+            column_specific_options_internal[column] = ParquetColumnOptionsInternal(
+                bloom_filter_enabled=opts.bloom_filter_enabled,
+                encoding=opts.encoding,
+                dictionary_enabled=opts.dictionary_enabled,
+                compression=opts.compression,
+                statistics_enabled=opts.statistics_enabled,
+                bloom_filter_fpp=opts.bloom_filter_fpp,
+                bloom_filter_ndv=opts.bloom_filter_ndv,
+            )
+
+        self.df.write_parquet_with_options(
+            str(path),
+            options_internal,
+            column_specific_options_internal,
+        )
+
     def write_json(self, path: str | pathlib.Path) -> None:
         """Execute the :py:class:`DataFrame` and write the results to a JSON file.
 
diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py
index 64220ce..3c9b97f 100644
--- a/python/tests/test_dataframe.py
+++ b/python/tests/test_dataframe.py
@@ -27,6 +27,8 @@
 import pytest
 from datafusion import (
     DataFrame,
+    ParquetColumnOptions,
+    ParquetWriterOptions,
     SessionContext,
     WindowFrame,
     column,
@@ -67,6 +69,21 @@
 
 
 @pytest.fixture
+def large_df():
+    ctx = SessionContext()
+
+    rows = 100000
+    data = {
+        "a": list(range(rows)),
+        "b": [f"s-{i}" for i in range(rows)],
+        "c": [float(i + 0.1) for i in range(rows)],
+    }
+    batch = pa.record_batch(data)
+
+    return ctx.from_arrow(batch)
+
+
+@pytest.fixture
 def struct_df():
     ctx = SessionContext()
 
@@ -1632,6 +1649,395 @@
     df.write_parquet(str(path), compression=compression)
 
 
+def test_write_parquet_with_options_default_compression(df, tmp_path):
+    """Test that the default compression is ZSTD."""
+    df.write_parquet(tmp_path)
+
+    for file in tmp_path.rglob("*.parquet"):
+        metadata = pq.ParquetFile(file).metadata.to_dict()
+        for row_group in metadata["row_groups"]:
+            for col in row_group["columns"]:
+                assert col["compression"].lower() == "zstd"
+
+
+@pytest.mark.parametrize(
+    "compression",
+    ["gzip(6)", "brotli(7)", "zstd(15)", "snappy", "uncompressed"],
+)
+def test_write_parquet_with_options_compression(df, tmp_path, compression):
+    import re
+
+    path = tmp_path
+    df.write_parquet_with_options(
+        str(path), ParquetWriterOptions(compression=compression)
+    )
+
+    # test that the actual compression scheme is the one written
+    for _root, _dirs, files in os.walk(path):
+        for file in files:
+            if file.endswith(".parquet"):
+                metadata = pq.ParquetFile(tmp_path / file).metadata.to_dict()
+                for row_group in metadata["row_groups"]:
+                    for col in row_group["columns"]:
+                        assert col["compression"].lower() == re.sub(
+                            r"\(\d+\)", "", compression
+                        )
+
+    result = pq.read_table(str(path)).to_pydict()
+    expected = df.to_pydict()
+
+    assert result == expected
+
+
+@pytest.mark.parametrize(
+    "compression",
+    ["gzip(12)", "brotli(15)", "zstd(23)"],
+)
+def test_write_parquet_with_options_wrong_compression_level(df, tmp_path, compression):
+    path = tmp_path
+
+    with pytest.raises(Exception, match=r"valid compression range .*? exceeded."):
+        df.write_parquet_with_options(
+            str(path), ParquetWriterOptions(compression=compression)
+        )
+
+
+@pytest.mark.parametrize("compression", ["wrong", "wrong(12)"])
+def test_write_parquet_with_options_invalid_compression(df, tmp_path, compression):
+    path = tmp_path
+
+    with pytest.raises(Exception, match="Unknown or unsupported parquet compression"):
+        df.write_parquet_with_options(
+            str(path), ParquetWriterOptions(compression=compression)
+        )
+
+
+@pytest.mark.parametrize(
+    ("writer_version", "format_version"),
+    [("1.0", "1.0"), ("2.0", "2.6"), (None, "1.0")],
+)
+def test_write_parquet_with_options_writer_version(
+    df, tmp_path, writer_version, format_version
+):
+    """Test the Parquet writer version. Note that writer_version=2.0 results in
+    format_version=2.6"""
+    if writer_version is None:
+        df.write_parquet_with_options(tmp_path, ParquetWriterOptions())
+    else:
+        df.write_parquet_with_options(
+            tmp_path, ParquetWriterOptions(writer_version=writer_version)
+        )
+
+    for file in tmp_path.rglob("*.parquet"):
+        parquet = pq.ParquetFile(file)
+        metadata = parquet.metadata.to_dict()
+        assert metadata["format_version"] == format_version
+
+
+@pytest.mark.parametrize("writer_version", ["1.2.3", "custom-version", "0"])
+def test_write_parquet_with_options_wrong_writer_version(df, tmp_path, writer_version):
+    """Test that invalid writer versions in Parquet throw an exception."""
+    with pytest.raises(
+        Exception, match="Unknown or unsupported parquet writer version"
+    ):
+        df.write_parquet_with_options(
+            tmp_path, ParquetWriterOptions(writer_version=writer_version)
+        )
+
+
+@pytest.mark.parametrize("dictionary_enabled", [True, False, None])
+def test_write_parquet_with_options_dictionary_enabled(
+    df, tmp_path, dictionary_enabled
+):
+    """Test enabling/disabling the dictionaries in Parquet."""
+    df.write_parquet_with_options(
+        tmp_path, ParquetWriterOptions(dictionary_enabled=dictionary_enabled)
+    )
+    # by default, the dictionary is enabled, so None results in True
+    result = dictionary_enabled if dictionary_enabled is not None else True
+
+    for file in tmp_path.rglob("*.parquet"):
+        parquet = pq.ParquetFile(file)
+        metadata = parquet.metadata.to_dict()
+
+        for row_group in metadata["row_groups"]:
+            for col in row_group["columns"]:
+                assert col["has_dictionary_page"] == result
+
+
+@pytest.mark.parametrize(
+    ("statistics_enabled", "has_statistics"),
+    [("page", True), ("chunk", True), ("none", False), (None, True)],
+)
+def test_write_parquet_with_options_statistics_enabled(
+    df, tmp_path, statistics_enabled, has_statistics
+):
+    """Test configuring the statistics in Parquet. In pyarrow we can only check for
+    column-level statistics, so "page" and "chunk" are tested in the same way."""
+    df.write_parquet_with_options(
+        tmp_path, ParquetWriterOptions(statistics_enabled=statistics_enabled)
+    )
+
+    for file in tmp_path.rglob("*.parquet"):
+        parquet = pq.ParquetFile(file)
+        metadata = parquet.metadata.to_dict()
+
+        for row_group in metadata["row_groups"]:
+            for col in row_group["columns"]:
+                if has_statistics:
+                    assert col["statistics"] is not None
+                else:
+                    assert col["statistics"] is None
+
+
+@pytest.mark.parametrize("max_row_group_size", [1000, 5000, 10000, 100000])
+def test_write_parquet_with_options_max_row_group_size(
+    large_df, tmp_path, max_row_group_size
+):
+    """Test configuring the max number of rows per group in Parquet. These test cases
+    guarantee that the number of rows for each row group is max_row_group_size, given
+    the total number of rows is a multiple of max_row_group_size."""
+    large_df.write_parquet_with_options(
+        tmp_path, ParquetWriterOptions(max_row_group_size=max_row_group_size)
+    )
+
+    for file in tmp_path.rglob("*.parquet"):
+        parquet = pq.ParquetFile(file)
+        metadata = parquet.metadata.to_dict()
+        for row_group in metadata["row_groups"]:
+            assert row_group["num_rows"] == max_row_group_size
+
+
+@pytest.mark.parametrize("created_by", ["datafusion", "datafusion-python", "custom"])
+def test_write_parquet_with_options_created_by(df, tmp_path, created_by):
+    """Test configuring the created by metadata in Parquet."""
+    df.write_parquet_with_options(tmp_path, ParquetWriterOptions(created_by=created_by))
+
+    for file in tmp_path.rglob("*.parquet"):
+        parquet = pq.ParquetFile(file)
+        metadata = parquet.metadata.to_dict()
+        assert metadata["created_by"] == created_by
+
+
+@pytest.mark.parametrize("statistics_truncate_length", [5, 25, 50])
+def test_write_parquet_with_options_statistics_truncate_length(
+    df, tmp_path, statistics_truncate_length
+):
+    """Test configuring the truncate limit in Parquet's row-group-level statistics."""
+    ctx = SessionContext()
+    data = {
+        "a": [
+            "a_the_quick_brown_fox_jumps_over_the_lazy_dog",
+            "m_the_quick_brown_fox_jumps_over_the_lazy_dog",
+            "z_the_quick_brown_fox_jumps_over_the_lazy_dog",
+        ],
+        "b": ["a_smaller", "m_smaller", "z_smaller"],
+    }
+    df = ctx.from_arrow(pa.record_batch(data))
+    df.write_parquet_with_options(
+        tmp_path,
+        ParquetWriterOptions(statistics_truncate_length=statistics_truncate_length),
+    )
+
+    for file in tmp_path.rglob("*.parquet"):
+        parquet = pq.ParquetFile(file)
+        metadata = parquet.metadata.to_dict()
+
+        for row_group in metadata["row_groups"]:
+            for col in row_group["columns"]:
+                statistics = col["statistics"]
+                assert len(statistics["min"]) <= statistics_truncate_length
+                assert len(statistics["max"]) <= statistics_truncate_length
+
+
+def test_write_parquet_with_options_default_encoding(tmp_path):
+    """Test that, by default, Parquet files are written with dictionary encoding.
+    Note that dictionary encoding is not used for boolean values, so it is not tested
+    here."""
+    ctx = SessionContext()
+    data = {
+        "a": [1, 2, 3],
+        "b": ["1", "2", "3"],
+        "c": [1.01, 2.02, 3.03],
+    }
+    df = ctx.from_arrow(pa.record_batch(data))
+    df.write_parquet_with_options(tmp_path, ParquetWriterOptions())
+
+    for file in tmp_path.rglob("*.parquet"):
+        parquet = pq.ParquetFile(file)
+        metadata = parquet.metadata.to_dict()
+
+        for row_group in metadata["row_groups"]:
+            for col in row_group["columns"]:
+                assert col["encodings"] == ("PLAIN", "RLE", "RLE_DICTIONARY")
+
+
+@pytest.mark.parametrize(
+    ("encoding", "data_types", "result"),
+    [
+        ("plain", ["int", "float", "str", "bool"], ("PLAIN", "RLE")),
+        ("rle", ["bool"], ("RLE",)),
+        ("delta_binary_packed", ["int"], ("RLE", "DELTA_BINARY_PACKED")),
+        ("delta_length_byte_array", ["str"], ("RLE", "DELTA_LENGTH_BYTE_ARRAY")),
+        ("delta_byte_array", ["str"], ("RLE", "DELTA_BYTE_ARRAY")),
+        ("byte_stream_split", ["int", "float"], ("RLE", "BYTE_STREAM_SPLIT")),
+    ],
+)
+def test_write_parquet_with_options_encoding(tmp_path, encoding, data_types, result):
+    """Test different encodings in Parquet in their respective support column types."""
+    ctx = SessionContext()
+
+    data = {}
+    for data_type in data_types:
+        if data_type == "int":
+            data["int"] = [1, 2, 3]
+        elif data_type == "float":
+            data["float"] = [1.01, 2.02, 3.03]
+        elif data_type == "str":
+            data["str"] = ["a", "b", "c"]
+        elif data_type == "bool":
+            data["bool"] = [True, False, True]
+
+    df = ctx.from_arrow(pa.record_batch(data))
+    df.write_parquet_with_options(
+        tmp_path, ParquetWriterOptions(encoding=encoding, dictionary_enabled=False)
+    )
+
+    for file in tmp_path.rglob("*.parquet"):
+        parquet = pq.ParquetFile(file)
+        metadata = parquet.metadata.to_dict()
+
+        for row_group in metadata["row_groups"]:
+            for col in row_group["columns"]:
+                assert col["encodings"] == result
+
+
+@pytest.mark.parametrize("encoding", ["bit_packed"])
+def test_write_parquet_with_options_unsupported_encoding(df, tmp_path, encoding):
+    """Test that unsupported Parquet encodings do not work."""
+    # BaseException is used since this throws a Rust panic: https://github.com/PyO3/pyo3/issues/3519
+    with pytest.raises(BaseException, match="Encoding .*? is not supported"):
+        df.write_parquet_with_options(tmp_path, ParquetWriterOptions(encoding=encoding))
+
+
+@pytest.mark.parametrize("encoding", ["non_existent", "unknown", "plain123"])
+def test_write_parquet_with_options_invalid_encoding(df, tmp_path, encoding):
+    """Test that invalid Parquet encodings do not work."""
+    with pytest.raises(Exception, match="Unknown or unsupported parquet encoding"):
+        df.write_parquet_with_options(tmp_path, ParquetWriterOptions(encoding=encoding))
+
+
+@pytest.mark.parametrize("encoding", ["plain_dictionary", "rle_dictionary"])
+def test_write_parquet_with_options_dictionary_encoding_fallback(
+    df, tmp_path, encoding
+):
+    """Test that the dictionary encoding cannot be used as fallback in Parquet."""
+    # BaseException is used since this throws a Rust panic: https://github.com/PyO3/pyo3/issues/3519
+    with pytest.raises(
+        BaseException, match="Dictionary encoding can not be used as fallback encoding"
+    ):
+        df.write_parquet_with_options(tmp_path, ParquetWriterOptions(encoding=encoding))
+
+
+def test_write_parquet_with_options_bloom_filter(df, tmp_path):
+    """Test Parquet files with and without (default) bloom filters. Since pyarrow does
+    not expose any information about bloom filters, the easiest way to confirm that they
+    are actually written is to compare the file size."""
+    path_no_bloom_filter = tmp_path / "1"
+    path_bloom_filter = tmp_path / "2"
+
+    df.write_parquet_with_options(path_no_bloom_filter, ParquetWriterOptions())
+    df.write_parquet_with_options(
+        path_bloom_filter, ParquetWriterOptions(bloom_filter_on_write=True)
+    )
+
+    size_no_bloom_filter = 0
+    for file in path_no_bloom_filter.rglob("*.parquet"):
+        size_no_bloom_filter += os.path.getsize(file)
+
+    size_bloom_filter = 0
+    for file in path_bloom_filter.rglob("*.parquet"):
+        size_bloom_filter += os.path.getsize(file)
+
+    assert size_no_bloom_filter < size_bloom_filter
+
+
+def test_write_parquet_with_options_column_options(df, tmp_path):
+    """Test writing Parquet files with different options for each column, which replace
+    the global configs (when provided)."""
+    data = {
+        "a": [1, 2, 3],
+        "b": ["a", "b", "c"],
+        "c": [False, True, False],
+        "d": [1.01, 2.02, 3.03],
+        "e": [4, 5, 6],
+    }
+
+    column_specific_options = {
+        "a": ParquetColumnOptions(statistics_enabled="none"),
+        "b": ParquetColumnOptions(encoding="plain", dictionary_enabled=False),
+        "c": ParquetColumnOptions(
+            compression="snappy", encoding="rle", dictionary_enabled=False
+        ),
+        "d": ParquetColumnOptions(
+            compression="zstd(6)",
+            encoding="byte_stream_split",
+            dictionary_enabled=False,
+            statistics_enabled="none",
+        ),
+        # column "e" will use the global configs
+    }
+
+    results = {
+        "a": {
+            "statistics": False,
+            "compression": "brotli",
+            "encodings": ("PLAIN", "RLE", "RLE_DICTIONARY"),
+        },
+        "b": {
+            "statistics": True,
+            "compression": "brotli",
+            "encodings": ("PLAIN", "RLE"),
+        },
+        "c": {
+            "statistics": True,
+            "compression": "snappy",
+            "encodings": ("RLE",),
+        },
+        "d": {
+            "statistics": False,
+            "compression": "zstd",
+            "encodings": ("RLE", "BYTE_STREAM_SPLIT"),
+        },
+        "e": {
+            "statistics": True,
+            "compression": "brotli",
+            "encodings": ("PLAIN", "RLE", "RLE_DICTIONARY"),
+        },
+    }
+
+    ctx = SessionContext()
+    df = ctx.from_arrow(pa.record_batch(data))
+    df.write_parquet_with_options(
+        tmp_path,
+        ParquetWriterOptions(
+            compression="brotli(8)", column_specific_options=column_specific_options
+        ),
+    )
+
+    for file in tmp_path.rglob("*.parquet"):
+        parquet = pq.ParquetFile(file)
+        metadata = parquet.metadata.to_dict()
+
+        for row_group in metadata["row_groups"]:
+            for col in row_group["columns"]:
+                column_name = col["path_in_schema"]
+                result = results[column_name]
+                assert (col["statistics"] is not None) == result["statistics"]
+                assert col["compression"].lower() == result["compression"].lower()
+                assert col["encodings"] == result["encodings"]
+
+
 def test_dataframe_export(df) -> None:
     # Guarantees that we have the canonical implementation
     # reading our dataframe export
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 7711a07..3d68db2 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
 use std::ffi::CString;
 use std::sync::Arc;
 
@@ -27,7 +28,7 @@
 use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow};
 use datafusion::arrow::util::pretty;
 use datafusion::common::UnnestOptions;
-use datafusion::config::{CsvOptions, TableParquetOptions};
+use datafusion::config::{CsvOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions};
 use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
 use datafusion::datasource::TableProvider;
 use datafusion::error::DataFusionError;
@@ -185,6 +186,101 @@
     Ok(config)
 }
 
+/// Python mapping of `ParquetOptions` (includes just the writer-related options).
+#[pyclass(name = "ParquetWriterOptions", module = "datafusion", subclass)]
+#[derive(Clone, Default)]
+pub struct PyParquetWriterOptions {
+    options: ParquetOptions,
+}
+
+#[pymethods]
+impl PyParquetWriterOptions {
+    #[new]
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
+        data_pagesize_limit: usize,
+        write_batch_size: usize,
+        writer_version: String,
+        skip_arrow_metadata: bool,
+        compression: Option<String>,
+        dictionary_enabled: Option<bool>,
+        dictionary_page_size_limit: usize,
+        statistics_enabled: Option<String>,
+        max_row_group_size: usize,
+        created_by: String,
+        column_index_truncate_length: Option<usize>,
+        statistics_truncate_length: Option<usize>,
+        data_page_row_count_limit: usize,
+        encoding: Option<String>,
+        bloom_filter_on_write: bool,
+        bloom_filter_fpp: Option<f64>,
+        bloom_filter_ndv: Option<u64>,
+        allow_single_file_parallelism: bool,
+        maximum_parallel_row_group_writers: usize,
+        maximum_buffered_record_batches_per_stream: usize,
+    ) -> Self {
+        Self {
+            options: ParquetOptions {
+                data_pagesize_limit,
+                write_batch_size,
+                writer_version,
+                skip_arrow_metadata,
+                compression,
+                dictionary_enabled,
+                dictionary_page_size_limit,
+                statistics_enabled,
+                max_row_group_size,
+                created_by,
+                column_index_truncate_length,
+                statistics_truncate_length,
+                data_page_row_count_limit,
+                encoding,
+                bloom_filter_on_write,
+                bloom_filter_fpp,
+                bloom_filter_ndv,
+                allow_single_file_parallelism,
+                maximum_parallel_row_group_writers,
+                maximum_buffered_record_batches_per_stream,
+                ..Default::default()
+            },
+        }
+    }
+}
+
+/// Python mapping of `ParquetColumnOptions`.
+#[pyclass(name = "ParquetColumnOptions", module = "datafusion", subclass)]
+#[derive(Clone, Default)]
+pub struct PyParquetColumnOptions {
+    options: ParquetColumnOptions,
+}
+
+#[pymethods]
+impl PyParquetColumnOptions {
+    #[new]
+    pub fn new(
+        bloom_filter_enabled: Option<bool>,
+        encoding: Option<String>,
+        dictionary_enabled: Option<bool>,
+        compression: Option<String>,
+        statistics_enabled: Option<String>,
+        bloom_filter_fpp: Option<f64>,
+        bloom_filter_ndv: Option<u64>,
+    ) -> Self {
+        Self {
+            options: ParquetColumnOptions {
+                bloom_filter_enabled,
+                encoding,
+                dictionary_enabled,
+                compression,
+                statistics_enabled,
+                bloom_filter_fpp,
+                bloom_filter_ndv,
+                ..Default::default()
+            },
+        }
+    }
+}
+
 /// A PyDataFrame is a representation of a logical plan and an API to compose statements.
 /// Use it to build a plan and `.collect()` to execute the plan and collect the result.
 /// The actual execution of a plan runs natively on Rust and Arrow on a multi-threaded environment.
@@ -689,6 +785,34 @@
         Ok(())
     }
 
+    /// Write a `DataFrame` to a Parquet file, using advanced options.
+    fn write_parquet_with_options(
+        &self,
+        path: &str,
+        options: PyParquetWriterOptions,
+        column_specific_options: HashMap<String, PyParquetColumnOptions>,
+        py: Python,
+    ) -> PyDataFusionResult<()> {
+        let table_options = TableParquetOptions {
+            global: options.options,
+            column_specific_options: column_specific_options
+                .into_iter()
+                .map(|(k, v)| (k, v.options))
+                .collect(),
+            ..Default::default()
+        };
+
+        wait_for_future(
+            py,
+            self.df.as_ref().clone().write_parquet(
+                path,
+                DataFrameWriteOptions::new(),
+                Option::from(table_options),
+            ),
+        )??;
+        Ok(())
+    }
+
     /// Executes a query and writes the results to a partitioned JSON file.
     fn write_json(&self, path: &str, py: Python) -> PyDataFusionResult<()> {
         wait_for_future(
diff --git a/src/lib.rs b/src/lib.rs
index 7dced1f..1293eee 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -86,6 +86,8 @@
     m.add_class::<context::PySessionContext>()?;
     m.add_class::<context::PySQLOptions>()?;
     m.add_class::<dataframe::PyDataFrame>()?;
+    m.add_class::<dataframe::PyParquetColumnOptions>()?;
+    m.add_class::<dataframe::PyParquetWriterOptions>()?;
     m.add_class::<udf::PyScalarUDF>()?;
     m.add_class::<udaf::PyAggregateUDF>()?;
     m.add_class::<udwf::PyWindowUDF>()?;