blob: b79ed591b29b9d1ef5d2e03400163139db10b83d [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use datafusion::common::config::CsvOptions;
use datafusion::common::parsers::CompressionTypeVariant;
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::error::DataFusionError;
use datafusion::prelude::{CsvReadOptions, SessionContext};
use jni::objects::{JByteArray, JClass, JString};
use jni::sys::jlong;
use jni::JNIEnv;
use prost::Message;
use crate::proto_gen::{
CsvReadOptionsProto, CsvWriteOptionsProto, FileCompressionType as ProtoFileCompressionType,
};
use crate::runtime;
use crate::schema::decode_optional_schema;
use datafusion_jni_common::errors::{try_unwrap_or_throw, JniResult};
/// Decodes a serialized `CsvReadOptionsProto` plus an optional IPC-encoded schema into DataFusion
/// [`CsvReadOptions`], then passes them to `f` and returns its result.
///
/// A callback is used instead of a return value because the built options borrow the decoded
/// file extension and schema, which are locals of this function.
///
/// # Errors
///
/// Returns an error when:
/// - the JVM byte arrays cannot be read;
/// - the protobuf or the schema fails to decode;
/// - `file_compression_type` is `UNSPECIFIED`;
/// - a single-byte field (`delimiter`, `quote`, `terminator`, `escape`, `comment`) holds a value
///   that does not fit in a `u8`;
/// - or `f` itself fails.
fn with_csv_options<R>(
    env: &mut JNIEnv,
    options_bytes: JByteArray,
    schema_ipc_bytes: JByteArray,
    f: impl FnOnce(CsvReadOptions) -> JniResult<R>,
) -> JniResult<R> {
    // Narrows a protobuf integer field to the single byte the CSV reader works with. A plain
    // `as u8` would silently truncate (e.g. 300 -> b','), making the reader split on a byte the
    // caller never asked for; such values are rejected with `err` instead.
    fn csv_byte<T>(value: T, err: &'static str) -> JniResult<u8>
    where
        T: std::convert::TryInto<u8>,
    {
        value.try_into().map_err(|_| err.into())
    }

    let bytes: Vec<u8> = env.convert_byte_array(&options_bytes)?;
    let p = CsvReadOptionsProto::decode(bytes.as_slice())?;
    let schema = decode_optional_schema(env, schema_ipc_bytes)?;
    // UNSPECIFIED is rejected rather than defaulted so a caller that forgot to set the field
    // gets an error instead of a guessed compression.
    let compression = match p.file_compression_type() {
        ProtoFileCompressionType::Unspecified => {
            return Err("CsvReadOptionsProto.file_compression_type is UNSPECIFIED".into());
        }
        ProtoFileCompressionType::Uncompressed => FileCompressionType::UNCOMPRESSED,
        ProtoFileCompressionType::Gzip => FileCompressionType::GZIP,
        ProtoFileCompressionType::Bzip2 => FileCompressionType::BZIP2,
        ProtoFileCompressionType::Xz => FileCompressionType::XZ,
        ProtoFileCompressionType::Zstd => FileCompressionType::ZSTD,
    };
    let delimiter = csv_byte(
        p.delimiter,
        "CsvReadOptionsProto.delimiter does not fit in a single byte",
    )?;
    let quote = csv_byte(p.quote, "CsvReadOptionsProto.quote does not fit in a single byte")?;
    // Kept in a local because `CsvReadOptions` borrows the extension string.
    let file_ext = p.file_extension;
    let mut opts = CsvReadOptions::new()
        .has_header(p.has_header)
        .delimiter(delimiter)
        .quote(quote)
        .file_extension(&file_ext)
        .file_compression_type(compression);
    if let Some(t) = p.terminator {
        let terminator = csv_byte(
            t,
            "CsvReadOptionsProto.terminator does not fit in a single byte",
        )?;
        opts = opts.terminator(Some(terminator));
    }
    if let Some(e) = p.escape {
        let escape = csv_byte(e, "CsvReadOptionsProto.escape does not fit in a single byte")?;
        opts = opts.escape(escape);
    }
    if let Some(c) = p.comment {
        let comment = csv_byte(c, "CsvReadOptionsProto.comment does not fit in a single byte")?;
        opts = opts.comment(comment);
    }
    if let Some(n) = p.newlines_in_values {
        opts = opts.newlines_in_values(n);
    }
    if let Some(n) = p.schema_infer_max_records {
        opts = opts.schema_infer_max_records(n as usize);
    }
    // An explicit schema, when supplied, replaces inference.
    if let Some(ref s) = schema {
        opts = opts.schema(s);
    }
    f(opts)
}
/// JNI entry point for `SessionContext.registerCsvWithOptions`.
///
/// Registers the CSV data at `path` as table `name` on the native `SessionContext` referenced by
/// `handle`. `options_bytes` carries a serialized `CsvReadOptionsProto`. `schema_ipc_bytes` carries
/// an optional schema, decoded by `decode_optional_schema`.
///
/// Failures (null handle, string/byte conversion, decoding, registration) are handed to
/// `try_unwrap_or_throw`, which presumably rethrows them as a Java exception.
#[no_mangle]
pub extern "system" fn Java_org_apache_datafusion_SessionContext_registerCsvWithOptions<'local>(
    mut env: JNIEnv<'local>,
    _class: JClass<'local>,
    handle: jlong,
    name: JString<'local>,
    path: JString<'local>,
    options_bytes: JByteArray<'local>,
    schema_ipc_bytes: JByteArray<'local>,
) {
    try_unwrap_or_throw(&mut env, (), |env| -> JniResult<()> {
        let ctx = match handle {
            0 => return Err("SessionContext handle is null".into()),
            // SAFETY: assumes a non-zero handle is a live `*const SessionContext` created and
            // still owned by the Java wrapper — TODO confirm it cannot be freed concurrently.
            raw => unsafe { &*(raw as *const SessionContext) },
        };
        let table_name: String = env.get_string(&name)?.into();
        let table_path: String = env.get_string(&path)?.into();
        with_csv_options(env, options_bytes, schema_ipc_bytes, |opts| {
            let outcome: Result<(), DataFusionError> = runtime().block_on(async {
                ctx.register_csv(&table_name, &table_path, opts).await
            });
            outcome?;
            Ok(())
        })
    })
}
#[no_mangle]
pub extern "system" fn Java_org_apache_datafusion_SessionContext_readCsvWithOptions<'local>(
mut env: JNIEnv<'local>,
_class: JClass<'local>,
handle: jlong,
path: JString<'local>,
options_bytes: JByteArray<'local>,
schema_ipc_bytes: JByteArray<'local>,
) -> jlong {
try_unwrap_or_throw(&mut env, 0, |env| -> JniResult<jlong> {
if handle == 0 {
return Err("SessionContext handle is null".into());
}
let ctx = unsafe { &*(handle as *const SessionContext) };
let path: String = env.get_string(&path)?.into();
with_csv_options(env, options_bytes, schema_ipc_bytes, |opts| {
let df = runtime().block_on(ctx.read_csv(path, opts))?;
Ok(Box::into_raw(Box::new(df)) as jlong)
})
})
}
/// Maps the wire-level `FileCompressionType` enum onto DataFusion's [`CompressionTypeVariant`].
///
/// # Errors
///
/// Returns an error for `UNSPECIFIED`, so an unset field is never silently turned into a
/// compression choice. The message names `CsvWriteOptionsProto` because the CSV writer is the
/// caller visible here.
fn proto_compression_to_variant(p: ProtoFileCompressionType) -> JniResult<CompressionTypeVariant> {
    let variant = match p {
        ProtoFileCompressionType::Uncompressed => CompressionTypeVariant::UNCOMPRESSED,
        ProtoFileCompressionType::Gzip => CompressionTypeVariant::GZIP,
        ProtoFileCompressionType::Bzip2 => CompressionTypeVariant::BZIP2,
        ProtoFileCompressionType::Xz => CompressionTypeVariant::XZ,
        ProtoFileCompressionType::Zstd => CompressionTypeVariant::ZSTD,
        ProtoFileCompressionType::Unspecified => {
            return Err("CsvWriteOptionsProto.file_compression_type is UNSPECIFIED".into());
        }
    };
    Ok(variant)
}
/// JNI entry point for `DataFrame.writeCsvWithOptions`.
///
/// Writes the native `DataFrame` referenced by `handle` as CSV to `path`. `options_bytes` carries
/// a serialized `CsvWriteOptionsProto` in which every field is optional. Writer-side knobs that
/// are left unset keep DataFusion's defaults.
///
/// Failures are handed to `try_unwrap_or_throw`, which presumably rethrows them as a Java
/// exception. Possible failures:
/// - a null handle;
/// - byte/string conversion or decoding errors;
/// - `UNSPECIFIED` compression;
/// - a delimiter, quote or escape that does not fit in a single byte;
/// - the write itself.
#[no_mangle]
pub extern "system" fn Java_org_apache_datafusion_DataFrame_writeCsvWithOptions<'local>(
    mut env: JNIEnv<'local>,
    _class: JClass<'local>,
    handle: jlong,
    path: JString<'local>,
    options_bytes: JByteArray<'local>,
) {
    // Narrows a protobuf integer field to the single byte the CSV writer works with. A plain
    // `as u8` would silently truncate (e.g. 300 -> b','), producing output with a separator the
    // caller never asked for; such values are rejected with `err` instead.
    fn csv_byte<T>(value: T, err: &'static str) -> JniResult<u8>
    where
        T: std::convert::TryInto<u8>,
    {
        value.try_into().map_err(|_| err.into())
    }

    try_unwrap_or_throw(&mut env, (), |env| -> JniResult<()> {
        if handle == 0 {
            return Err("DataFrame handle is null".into());
        }
        // SAFETY: assumes a non-zero handle is a live `*const DataFrame` created and still owned
        // by the Java wrapper — TODO confirm it cannot be freed concurrently. The frame is cloned
        // so the write consumes an owned copy and the Java-side handle stays untouched.
        let df = unsafe { &*(handle as *const DataFrame) }.clone();
        let path: String = env.get_string(&path)?.into();
        let bytes: Vec<u8> = env.convert_byte_array(&options_bytes)?;
        let p = CsvWriteOptionsProto::decode(bytes.as_slice())?;
        // Decode the file_compression_type field eagerly so an unknown wire
        // value surfaces as a clear error rather than a silent default.
        let compression = if p.file_compression_type.is_some() {
            Some(proto_compression_to_variant(p.file_compression_type())?)
        } else {
            None
        };
        // Validate the single-byte knobs up front, for the same reason.
        let delimiter = p
            .delimiter
            .map(|v| csv_byte(v, "CsvWriteOptionsProto.delimiter does not fit in a single byte"))
            .transpose()?;
        let quote = p
            .quote
            .map(|v| csv_byte(v, "CsvWriteOptionsProto.quote does not fit in a single byte"))
            .transpose()?;
        let escape = p
            .escape
            .map(|v| csv_byte(v, "CsvWriteOptionsProto.escape does not fit in a single byte"))
            .transpose()?;
        // When the caller left `singleFileOutput` unset, force directory output (`false`)
        // rather than leaving DataFusion in `Automatic` mode. Automatic mode treats paths
        // with an extension (e.g. `out.csv`) as single-file targets, which would silently
        // contradict the documented "directory unless overridden" default and surprise any
        // caller that hands writeCsv a `.csv` path.
        let mut write_opts = DataFrameWriteOptions::new()
            .with_single_file_output(p.single_file_output.unwrap_or(false));
        if !p.partition_cols.is_empty() {
            // `p` is never used as a whole again, so the column list is moved, not cloned.
            write_opts = write_opts.with_partition_by(p.partition_cols);
        }
        // Build CsvOptions only when at least one writer-side knob is set, so
        // the DataFusion default is preserved when the caller passes
        // `new CsvWriteOptions()`.
        let writer_opts: Option<CsvOptions> = if p.has_header.is_some()
            || delimiter.is_some()
            || quote.is_some()
            || escape.is_some()
            || p.null_value.is_some()
            || compression.is_some()
        {
            let mut o = CsvOptions::default();
            if let Some(v) = p.has_header {
                o = o.with_has_header(v);
            }
            if let Some(v) = delimiter {
                o = o.with_delimiter(v);
            }
            if let Some(v) = quote {
                o = o.with_quote(v);
            }
            if let Some(v) = escape {
                o = o.with_escape(Some(v));
            }
            if let Some(v) = p.null_value {
                o.null_value = Some(v);
            }
            if let Some(v) = compression {
                o = o.with_file_compression_type(v);
            }
            Some(o)
        } else {
            None
        };
        runtime().block_on(df.write_csv(&path, write_opts, writer_opts))?;
        Ok(())
    })
}