feat(dataframe): add writeCsv with CsvWriteOptions (#53)
diff --git a/core/src/main/java/org/apache/datafusion/CsvWriteOptions.java b/core/src/main/java/org/apache/datafusion/CsvWriteOptions.java
new file mode 100644
index 0000000..169f985
--- /dev/null
+++ b/core/src/main/java/org/apache/datafusion/CsvWriteOptions.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datafusion;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Configuration knobs for writing CSV, passed to {@link DataFrame#writeCsv(String,
+ * CsvWriteOptions)}.
+ *
+ * <p>Mirrors a subset of DataFusion's {@code DataFrameWriteOptions} and the writer-side {@code
+ * CsvOptions}. All setters return {@code this} for fluent chaining. Defaults: every field {@code
+ * null} or empty (meaning the DataFusion default is used).
+ *
+ * <p>Path semantics: when {@link #singleFileOutput(boolean)} is {@code true}, the path passed to
+ * {@code writeCsv} is the literal output filename. When left unset (the default) and there are no
+ * partition columns, the path is treated as a directory that DataFusion populates with one or more
+ * part-files.
+ *
+ * <p>Compression reuses {@link FileCompressionType} -- both the read and write sides accept the
+ * same codec set ({@code UNCOMPRESSED}, {@code GZIP}, {@code BZIP2}, {@code XZ}, {@code ZSTD}).
+ */
+public final class CsvWriteOptions {
+
+  private Boolean singleFileOutput;
+  private final List<String> partitionCols = new ArrayList<>();
+  private Boolean hasHeader;
+  private Byte delimiter;
+  private Byte quote;
+  private Byte escape;
+  private String nullValue;
+  private FileCompressionType fileCompressionType;
+
+  /**
+   * When {@code true}, write to a single file at the supplied path. When left unset (the default)
+   * and no partition columns are configured, the path is treated as a directory and DataFusion
+   * writes one or more part-files.
+   */
+  public CsvWriteOptions singleFileOutput(boolean v) {
+    this.singleFileOutput = v;
+    return this;
+  }
+
+  /**
+   * Hive-style partition columns. Each column listed here is removed from the data rows and encoded
+   * into the directory layout (one subdirectory per distinct value). Mutually exclusive with {@link
+   * #singleFileOutput(boolean)} -- DataFusion rejects the combination at write time.
+   */
+  public CsvWriteOptions partitionCols(String... cols) {
+    this.partitionCols.clear();
+    for (String c : cols) {
+      this.partitionCols.add(c);
+    }
+    return this;
+  }
+
+  /** Whether to write a header row. Defaults to DataFusion's setting (typically {@code true}). */
+  public CsvWriteOptions hasHeader(boolean v) {
+    this.hasHeader = v;
+    return this;
+  }
+
+  /** Field delimiter byte. Defaults to {@code ','}. */
+  public CsvWriteOptions delimiter(byte b) {
+    this.delimiter = b;
+    return this;
+  }
+
+  /** Quote character byte. Defaults to {@code '"'}. */
+  public CsvWriteOptions quote(byte b) {
+    this.quote = b;
+    return this;
+  }
+
+  /** Escape character byte. Defaults to none. */
+  public CsvWriteOptions escape(byte b) {
+    this.escape = b;
+    return this;
+  }
+
+  /** String to write for SQL NULL values. Defaults to the empty string. */
+  public CsvWriteOptions nullValue(String s) {
+    this.nullValue = s;
+    return this;
+  }
+
+  /** Output compression codec. Defaults to uncompressed. */
+  public CsvWriteOptions fileCompressionType(FileCompressionType t) {
+    this.fileCompressionType = t;
+    return this;
+  }
+
+  byte[] toBytes() {
+    org.apache.datafusion.protobuf.CsvWriteOptionsProto.Builder b =
+        org.apache.datafusion.protobuf.CsvWriteOptionsProto.newBuilder();
+    if (singleFileOutput != null) {
+      b.setSingleFileOutput(singleFileOutput);
+    }
+    b.addAllPartitionCols(partitionCols);
+    if (hasHeader != null) {
+      b.setHasHeader(hasHeader);
+    }
+    if (delimiter != null) {
+      b.setDelimiter(delimiter & 0xFF);
+    }
+    if (quote != null) {
+      b.setQuote(quote & 0xFF);
+    }
+    if (escape != null) {
+      b.setEscape(escape & 0xFF);
+    }
+    if (nullValue != null) {
+      b.setNullValue(nullValue);
+    }
+    if (fileCompressionType != null) {
+      b.setFileCompressionType(FileCompressionTypes.toProto(fileCompressionType));
+    }
+    return b.build().toByteArray();
+  }
+}
diff --git a/core/src/main/java/org/apache/datafusion/DataFrame.java b/core/src/main/java/org/apache/datafusion/DataFrame.java
index b263564..c85692c 100644
--- a/core/src/main/java/org/apache/datafusion/DataFrame.java
+++ b/core/src/main/java/org/apache/datafusion/DataFrame.java
@@ -250,6 +250,38 @@
         options.singleFileOutput() != null && options.singleFileOutput());
   }
 
+  /**
+   * Materialize this DataFrame as CSV at {@code path}. The path is treated as a directory unless
+   * overridden via {@link CsvWriteOptions#singleFileOutput(boolean)}. The receiver remains usable
+   * and must still be closed independently.
+   *
+   * @throws RuntimeException if the write fails.
+   */
+  public void writeCsv(String path) {
+    writeCsv(path, new CsvWriteOptions());
+  }
+
+  /**
+   * Materialize this DataFrame as CSV at {@code path} with the supplied {@link CsvWriteOptions}.
+   * The receiver remains usable and must still be closed independently.
+   *
+   * @throws IllegalArgumentException if {@code path} or {@code options} is {@code null}.
+   * @throws RuntimeException if the write fails (path inaccessible, invalid compression spec,
+   *     etc.).
+   */
+  public void writeCsv(String path, CsvWriteOptions options) {
+    if (nativeHandle == 0) {
+      throw new IllegalStateException("DataFrame is closed or already collected");
+    }
+    if (path == null) {
+      throw new IllegalArgumentException("writeCsv path must be non-null");
+    }
+    if (options == null) {
+      throw new IllegalArgumentException("writeCsv options must be non-null");
+    }
+    writeCsvWithOptions(nativeHandle, path, options.toBytes());
+  }
+
   @Override
   public void close() {
     if (nativeHandle != 0) {
@@ -290,4 +322,6 @@
       String compression,
       boolean singleFileOutputSet,
       boolean singleFileOutputValue);
+
+  private static native void writeCsvWithOptions(long handle, String path, byte[] optionsBytes);
 }
diff --git a/core/src/test/java/org/apache/datafusion/CsvWriteOptionsTest.java b/core/src/test/java/org/apache/datafusion/CsvWriteOptionsTest.java
new file mode 100644
index 0000000..babbe1a
--- /dev/null
+++ b/core/src/test/java/org/apache/datafusion/CsvWriteOptionsTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datafusion;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.datafusion.protobuf.CsvWriteOptionsProto;
+import org.junit.jupiter.api.Test;
+
+class CsvWriteOptionsTest {
+
+  @Test
+  void defaultsLeaveEverythingUnset() throws Exception {
+    CsvWriteOptionsProto p = CsvWriteOptionsProto.parseFrom(new CsvWriteOptions().toBytes());
+
+    assertFalse(p.hasSingleFileOutput());
+    assertEquals(0, p.getPartitionColsCount());
+    assertFalse(p.hasHasHeader());
+    assertFalse(p.hasDelimiter());
+    assertFalse(p.hasQuote());
+    assertFalse(p.hasEscape());
+    assertFalse(p.hasNullValue());
+    assertFalse(p.hasFileCompressionType());
+  }
+
+  @Test
+  void fluentSettersRoundTripThroughProto() throws Exception {
+    CsvWriteOptions opts =
+        new CsvWriteOptions()
+            .singleFileOutput(true)
+            .partitionCols("region", "year")
+            .hasHeader(false)
+            .delimiter((byte) '|')
+            .quote((byte) '\'')
+            .escape((byte) '\\')
+            .nullValue("\\N")
+            .fileCompressionType(FileCompressionType.GZIP);
+
+    CsvWriteOptionsProto p = CsvWriteOptionsProto.parseFrom(opts.toBytes());
+
+    assertTrue(p.getSingleFileOutput());
+    assertEquals(2, p.getPartitionColsCount());
+    assertEquals("region", p.getPartitionCols(0));
+    assertEquals("year", p.getPartitionCols(1));
+    assertFalse(p.getHasHeader());
+    assertEquals((int) '|', p.getDelimiter());
+    assertEquals((int) '\'', p.getQuote());
+    assertEquals((int) '\\', p.getEscape());
+    assertEquals("\\N", p.getNullValue());
+    assertEquals(
+        org.apache.datafusion.protobuf.FileCompressionType.FILE_COMPRESSION_TYPE_GZIP,
+        p.getFileCompressionType());
+  }
+
+  @Test
+  void partitionColsResetOnSubsequentCalls() throws Exception {
+    CsvWriteOptions opts = new CsvWriteOptions().partitionCols("a", "b", "c").partitionCols("only");
+
+    CsvWriteOptionsProto p = CsvWriteOptionsProto.parseFrom(opts.toBytes());
+    assertEquals(1, p.getPartitionColsCount());
+    assertEquals("only", p.getPartitionCols(0));
+  }
+
+  @Test
+  void delimiterAcceptsHighByteWithoutSignExtension() throws Exception {
+    CsvWriteOptions opts = new CsvWriteOptions().delimiter((byte) 0xC2);
+    CsvWriteOptionsProto p = CsvWriteOptionsProto.parseFrom(opts.toBytes());
+    assertEquals(0xC2, p.getDelimiter());
+  }
+}
diff --git a/core/src/test/java/org/apache/datafusion/DataFrameWriteCsvTest.java b/core/src/test/java/org/apache/datafusion/DataFrameWriteCsvTest.java
new file mode 100644
index 0000000..0e6de69
--- /dev/null
+++ b/core/src/test/java/org/apache/datafusion/DataFrameWriteCsvTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datafusion;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+class DataFrameWriteCsvTest {
+
+  private static Path writeCsv(Path dir, String name, String contents) throws IOException {
+    Path file = dir.resolve(name);
+    Files.writeString(file, contents);
+    return file;
+  }
+
+  private static long countRowsAt(Path dirOrFile, CsvReadOptions readOpts) throws Exception {
+    try (BufferAllocator allocator = new RootAllocator();
+        SessionContext ctx = new SessionContext()) {
+      ctx.registerCsv("t", dirOrFile.toAbsolutePath().toString(), readOpts);
+      try (DataFrame df = ctx.sql("SELECT COUNT(*) FROM t");
+          ArrowReader reader = df.collect(allocator)) {
+        assertTrue(reader.loadNextBatch());
+        VectorSchemaRoot root = reader.getVectorSchemaRoot();
+        assertEquals(1, root.getRowCount());
+        return ((BigIntVector) root.getVector(0)).get(0);
+      }
+    }
+  }
+
+  @Test
+  void writeCsvRoundTripsRowCount(@TempDir Path tempDir) throws Exception {
+    Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n2,bob\n3,carol\n");
+    Path out = tempDir.resolve("out");
+
+    try (SessionContext ctx = new SessionContext();
+        DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) {
+      df.writeCsv(out.toString());
+    }
+
+    assertEquals(3L, countRowsAt(out, new CsvReadOptions()));
+  }
+
+  @Test
+  void writeCsvDefaultsToDirectoryEvenWithExtensionInPath(@TempDir Path tempDir) throws Exception {
+    // The Javadoc promises "directory unless overridden via singleFileOutput(true)". DataFusion's
+    // own DataFrameWriteOptions defaults to Automatic mode, where an extension in the path
+    // (".csv" here) silently flips the output to a single file. The native handler explicitly
+    // pins the default to directory mode so this contract holds regardless of path shape.
+    Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n2,bob\n3,carol\n");
+    Path out = tempDir.resolve("out.csv");
+
+    try (SessionContext ctx = new SessionContext();
+        DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) {
+      df.writeCsv(out.toString());
+    }
+
+    assertTrue(Files.isDirectory(out), "expected directory output at " + out + ", got a file");
+    assertEquals(3L, countRowsAt(out, new CsvReadOptions()));
+  }
+
+  @Test
+  void writeCsvSingleFileProducesOneFile(@TempDir Path tempDir) throws Exception {
+    Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n2,bob\n3,carol\n");
+    Path out = tempDir.resolve("out.csv");
+
+    try (SessionContext ctx = new SessionContext();
+        DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) {
+      df.writeCsv(out.toString(), new CsvWriteOptions().singleFileOutput(true));
+    }
+
+    assertTrue(Files.isRegularFile(out), "expected single file at " + out);
+    assertEquals(3L, countRowsAt(out, new CsvReadOptions()));
+  }
+
+  @Test
+  void writeCsvWithCustomDelimiter(@TempDir Path tempDir) throws Exception {
+    Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n2,bob\n");
+    Path out = tempDir.resolve("out.csv");
+
+    try (SessionContext ctx = new SessionContext();
+        DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) {
+      df.writeCsv(
+          out.toString(), new CsvWriteOptions().singleFileOutput(true).delimiter((byte) '|'));
+    }
+
+    String written = Files.readString(out);
+    assertTrue(written.contains("|"), "expected pipe delimiter in output, got: " + written);
+
+    assertEquals(2L, countRowsAt(out, new CsvReadOptions().delimiter((byte) '|')));
+  }
+
+  @Test
+  void writeCsvWithGzipCompressionRoundTrips(@TempDir Path tempDir) throws Exception {
+    Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n2,bob\n3,carol\n4,dan\n");
+    Path out = tempDir.resolve("gz-out");
+
+    try (SessionContext ctx = new SessionContext();
+        DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) {
+      df.writeCsv(
+          out.toString(), new CsvWriteOptions().fileCompressionType(FileCompressionType.GZIP));
+    }
+
+    try (Stream<Path> stream = Files.walk(out)) {
+      List<Path> files = stream.filter(Files::isRegularFile).toList();
+      assertTrue(!files.isEmpty(), "expected at least one part-file under " + out);
+      for (Path p : files) {
+        assertTrue(
+            p.getFileName().toString().endsWith(".gz"),
+            "expected .gz suffix on " + p.getFileName());
+      }
+    }
+
+    CsvReadOptions readOpts =
+        new CsvReadOptions().fileCompressionType(FileCompressionType.GZIP).fileExtension(".csv.gz");
+    assertEquals(4L, countRowsAt(out, readOpts));
+  }
+
+  @Test
+  void writeCsvRetainsDataFrame(@TempDir Path tempDir) throws Exception {
+    Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n2,bob\n3,carol\n");
+    Path out = tempDir.resolve("retained");
+
+    try (SessionContext ctx = new SessionContext();
+        DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) {
+      df.writeCsv(out.toString());
+      assertEquals(3L, df.count());
+    }
+  }
+
+  @Test
+  void writeCsvRejectsNullPath(@TempDir Path tempDir) throws Exception {
+    Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n");
+    try (SessionContext ctx = new SessionContext();
+        DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) {
+      assertThrows(IllegalArgumentException.class, () -> df.writeCsv(null));
+    }
+  }
+
+  @Test
+  void writeCsvRejectsNullOptions(@TempDir Path tempDir) throws Exception {
+    Path src = writeCsv(tempDir, "src.csv", "id,name\n1,alice\n");
+    Path out = tempDir.resolve("out");
+    try (SessionContext ctx = new SessionContext();
+        DataFrame df = ctx.readCsv(src.toAbsolutePath().toString())) {
+      assertThrows(IllegalArgumentException.class, () -> df.writeCsv(out.toString(), null));
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/datafusion/DataFrameWriteParquetTest.java b/core/src/test/java/org/apache/datafusion/DataFrameWriteParquetTest.java
index 5be934e..acbf6c8 100644
--- a/core/src/test/java/org/apache/datafusion/DataFrameWriteParquetTest.java
+++ b/core/src/test/java/org/apache/datafusion/DataFrameWriteParquetTest.java
@@ -109,4 +109,27 @@
       assertEquals(LINEITEM_ROWS, df.count());
     }
   }
+
+  @Test
+  void writeParquetDefaultsToDirectoryEvenWithExtensionInPath(@TempDir Path tempDir)
+      throws Exception {
+    // The Javadoc promises "directory unless overridden via singleFileOutput(true)". DataFusion's
+    // own DataFrameWriteOptions defaults to Automatic mode, where an extension in the path
+    // (".parquet" here) silently flips the output to a single file. The native handler explicitly
+    // pins the default to directory mode so this contract holds regardless of path shape.
+    //
+    // Uses an in-memory 3-row source so this regression test runs without TPC-H fixtures.
+    Path out = tempDir.resolve("out.parquet");
+
+    try (SessionContext ctx = new SessionContext();
+        DataFrame df =
+            ctx.sql(
+                "SELECT * FROM (VALUES (CAST(1 AS BIGINT), 'alice'), (CAST(2 AS BIGINT), 'bob'),"
+                    + " (CAST(3 AS BIGINT), 'carol')) AS t(id, name)")) {
+      df.writeParquet(out.toString());
+    }
+
+    assertTrue(Files.isDirectory(out), "expected directory output at " + out + ", got a file");
+    assertEquals(3L, countRowsAt(out));
+  }
 }
diff --git a/native/build.rs b/native/build.rs
index 92146b3..52b1127 100644
--- a/native/build.rs
+++ b/native/build.rs
@@ -21,6 +21,7 @@
         "../proto/file_compression_type.proto",
         "../proto/arrow_read_options.proto",
         "../proto/csv_read_options.proto",
+        "../proto/csv_write_options.proto",
         "../proto/json_read_options.proto",
         "../proto/parquet_read_options.proto",
     ];
diff --git a/native/src/csv.rs b/native/src/csv.rs
index 3201951..3ae4627 100644
--- a/native/src/csv.rs
+++ b/native/src/csv.rs
@@ -15,6 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::common::config::CsvOptions;
+use datafusion::common::parsers::CompressionTypeVariant;
+use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
 use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
 use datafusion::error::DataFusionError;
 use datafusion::prelude::{CsvReadOptions, SessionContext};
@@ -24,7 +27,9 @@
 use prost::Message;
 
 use crate::errors::{try_unwrap_or_throw, JniResult};
-use crate::proto_gen::{CsvReadOptionsProto, FileCompressionType as ProtoFileCompressionType};
+use crate::proto_gen::{
+    CsvReadOptionsProto, CsvWriteOptionsProto, FileCompressionType as ProtoFileCompressionType,
+};
 use crate::runtime;
 use crate::schema::decode_optional_schema;
 
@@ -128,3 +133,91 @@
         })
     })
 }
+
+fn proto_compression_to_variant(p: ProtoFileCompressionType) -> JniResult<CompressionTypeVariant> {
+    match p {
+        ProtoFileCompressionType::Unspecified => {
+            Err("CsvWriteOptionsProto.file_compression_type is UNSPECIFIED".into())
+        }
+        ProtoFileCompressionType::Uncompressed => Ok(CompressionTypeVariant::UNCOMPRESSED),
+        ProtoFileCompressionType::Gzip => Ok(CompressionTypeVariant::GZIP),
+        ProtoFileCompressionType::Bzip2 => Ok(CompressionTypeVariant::BZIP2),
+        ProtoFileCompressionType::Xz => Ok(CompressionTypeVariant::XZ),
+        ProtoFileCompressionType::Zstd => Ok(CompressionTypeVariant::ZSTD),
+    }
+}
+
+#[no_mangle]
+pub extern "system" fn Java_org_apache_datafusion_DataFrame_writeCsvWithOptions<'local>(
+    mut env: JNIEnv<'local>,
+    _class: JClass<'local>,
+    handle: jlong,
+    path: JString<'local>,
+    options_bytes: JByteArray<'local>,
+) {
+    try_unwrap_or_throw(&mut env, (), |env| -> JniResult<()> {
+        if handle == 0 {
+            return Err("DataFrame handle is null".into());
+        }
+        let df = unsafe { &*(handle as *const DataFrame) }.clone();
+        let path: String = env.get_string(&path)?.into();
+        let bytes: Vec<u8> = env.convert_byte_array(&options_bytes)?;
+        let p = CsvWriteOptionsProto::decode(bytes.as_slice())?;
+
+        // Decode the file_compression_type field eagerly so an unknown wire
+        // value surfaces as a clear error rather than a silent default.
+        let compression = if p.file_compression_type.is_some() {
+            Some(proto_compression_to_variant(p.file_compression_type())?)
+        } else {
+            None
+        };
+
+        // When the caller left `singleFileOutput` unset, force directory output (`false`)
+        // rather than leaving DataFusion in `Automatic` mode. Automatic mode treats paths
+        // with an extension (e.g. `out.csv`) as single-file targets, which would silently
+        // contradict the documented "directory unless overridden" default and surprise any
+        // caller that hands writeCsv a `.csv` path.
+        let mut write_opts = DataFrameWriteOptions::new()
+            .with_single_file_output(p.single_file_output.unwrap_or(false));
+        if !p.partition_cols.is_empty() {
+            write_opts = write_opts.with_partition_by(p.partition_cols.clone());
+        }
+
+        // Build CsvOptions only when at least one writer-side knob is set, so
+        // the DataFusion default is preserved when the caller passes
+        // `new CsvWriteOptions()`.
+        let writer_opts: Option<CsvOptions> = if p.has_header.is_some()
+            || p.delimiter.is_some()
+            || p.quote.is_some()
+            || p.escape.is_some()
+            || p.null_value.is_some()
+            || compression.is_some()
+        {
+            let mut o = CsvOptions::default();
+            if let Some(v) = p.has_header {
+                o = o.with_has_header(v);
+            }
+            if let Some(v) = p.delimiter {
+                o = o.with_delimiter(v as u8);
+            }
+            if let Some(v) = p.quote {
+                o = o.with_quote(v as u8);
+            }
+            if let Some(v) = p.escape {
+                o = o.with_escape(Some(v as u8));
+            }
+            if let Some(v) = p.null_value {
+                o.null_value = Some(v);
+            }
+            if let Some(v) = compression {
+                o = o.with_file_compression_type(v);
+            }
+            Some(o)
+        } else {
+            None
+        };
+
+        runtime().block_on(df.write_csv(&path, write_opts, writer_opts))?;
+        Ok(())
+    })
+}
diff --git a/native/src/lib.rs b/native/src/lib.rs
index fe46a07..dba3c08 100644
--- a/native/src/lib.rs
+++ b/native/src/lib.rs
@@ -446,10 +446,17 @@
         let df = unsafe { &*(handle as *const DataFrame) }.clone();
         let path: String = env.get_string(&path)?.into();
 
-        let mut write_opts = DataFrameWriteOptions::new();
-        if single_file_output_set != 0 {
-            write_opts = write_opts.with_single_file_output(single_file_output_value != 0);
-        }
+        // When the caller left `singleFileOutput` unset, force directory output (`false`)
+        // rather than leaving DataFusion in `Automatic` mode. Automatic mode treats paths
+        // with an extension (e.g. `out.parquet`) as single-file targets, which would silently
+        // contradict the documented "directory unless overridden" default and surprise any
+        // caller that hands writeParquet a `.parquet` path.
+        let single_file = if single_file_output_set != 0 {
+            single_file_output_value != 0
+        } else {
+            false
+        };
+        let write_opts = DataFrameWriteOptions::new().with_single_file_output(single_file);
 
         let writer_opts: Option<TableParquetOptions> = if !compression.is_null() {
             let c: String = env.get_string(&compression)?.into();
diff --git a/proto/csv_write_options.proto b/proto/csv_write_options.proto
new file mode 100644
index 0000000..199d532
--- /dev/null
+++ b/proto/csv_write_options.proto
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+package datafusion_java;
+
+import "file_compression_type.proto";
+
+option java_package = "org.apache.datafusion.protobuf";
+option java_multiple_files = true;
+
+// Options used to write CSV files. Fields with non-null Java defaults are
+// always sent; fields marked `optional` preserve unset-ness so the Rust side
+// can leave a DataFusion default in place. `FileCompressionType` is shared
+// with the read side since the codec set is identical. Note: `partition_cols`
+// is sent as a repeated string so an empty list (the default) round-trips
+// unambiguously.
+message CsvWriteOptionsProto {
+  optional bool single_file_output = 1;
+  repeated string partition_cols = 2;
+  optional bool has_header = 3;
+  optional uint32 delimiter = 4;
+  optional uint32 quote = 5;
+  optional uint32 escape = 6;
+  optional string null_value = 7;
+  optional FileCompressionType file_compression_type = 8;
+}