blob: a1b6fddc73e45a71d6c441af410b65a0eddef904 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::str::FromStr;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::ipc::reader::StreamReader;
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::error::DataFusionError;
use datafusion::prelude::{CsvReadOptions, SessionContext};
use jni::objects::{JByteArray, JClass, JString};
use jni::sys::{jboolean, jbyte, jlong};
use jni::JNIEnv;
use crate::errors::{try_unwrap_or_throw, JniResult};
use crate::runtime;
#[allow(clippy::too_many_arguments)]
fn with_csv_options<R>(
env: &mut JNIEnv,
has_header: jboolean,
delimiter: jbyte,
quote: jbyte,
terminator_set: jboolean,
terminator_value: jbyte,
escape_set: jboolean,
escape_value: jbyte,
comment_set: jboolean,
comment_value: jbyte,
newlines_in_values_set: jboolean,
newlines_in_values_value: jboolean,
schema_infer_max_records: jlong,
file_extension: JString,
file_compression_type: JString,
schema_ipc_bytes: JByteArray,
f: impl FnOnce(CsvReadOptions) -> JniResult<R>,
) -> JniResult<R> {
let file_ext: String = env.get_string(&file_extension)?.into();
let compression: String = env.get_string(&file_compression_type)?.into();
let compression = FileCompressionType::from_str(&compression)?;
let schema: Option<Schema> = if !schema_ipc_bytes.is_null() {
let bytes: Vec<u8> = env.convert_byte_array(&schema_ipc_bytes)?;
let reader = StreamReader::try_new(std::io::Cursor::new(bytes), None)?;
Some((*reader.schema()).clone())
} else {
None
};
let mut opts = CsvReadOptions::new()
.has_header(has_header != 0)
.delimiter(delimiter as u8)
.quote(quote as u8)
.file_extension(&file_ext)
.file_compression_type(compression);
if terminator_set != 0 {
opts = opts.terminator(Some(terminator_value as u8));
}
if escape_set != 0 {
opts = opts.escape(escape_value as u8);
}
if comment_set != 0 {
opts = opts.comment(comment_value as u8);
}
if newlines_in_values_set != 0 {
opts = opts.newlines_in_values(newlines_in_values_value != 0);
}
if schema_infer_max_records >= 0 {
opts = opts.schema_infer_max_records(schema_infer_max_records as usize);
}
if let Some(ref s) = schema {
opts = opts.schema(s);
}
f(opts)
}
#[allow(clippy::too_many_arguments)]
#[no_mangle]
pub extern "system" fn Java_org_apache_datafusion_SessionContext_registerCsvWithOptions<'local>(
mut env: JNIEnv<'local>,
_class: JClass<'local>,
handle: jlong,
name: JString<'local>,
path: JString<'local>,
has_header: jboolean,
delimiter: jbyte,
quote: jbyte,
terminator_set: jboolean,
terminator_value: jbyte,
escape_set: jboolean,
escape_value: jbyte,
comment_set: jboolean,
comment_value: jbyte,
newlines_in_values_set: jboolean,
newlines_in_values_value: jboolean,
schema_infer_max_records: jlong,
file_extension: JString<'local>,
file_compression_type: JString<'local>,
schema_ipc_bytes: JByteArray<'local>,
) {
try_unwrap_or_throw(&mut env, (), |env| -> JniResult<()> {
if handle == 0 {
return Err("SessionContext handle is null".into());
}
let ctx = unsafe { &*(handle as *const SessionContext) };
let name: String = env.get_string(&name)?.into();
let path: String = env.get_string(&path)?.into();
with_csv_options(
env,
has_header,
delimiter,
quote,
terminator_set,
terminator_value,
escape_set,
escape_value,
comment_set,
comment_value,
newlines_in_values_set,
newlines_in_values_value,
schema_infer_max_records,
file_extension,
file_compression_type,
schema_ipc_bytes,
|opts| {
runtime().block_on(async {
ctx.register_csv(&name, &path, opts).await?;
Ok::<(), DataFusionError>(())
})?;
Ok(())
},
)
})
}
#[allow(clippy::too_many_arguments)]
#[no_mangle]
pub extern "system" fn Java_org_apache_datafusion_SessionContext_readCsvWithOptions<'local>(
mut env: JNIEnv<'local>,
_class: JClass<'local>,
handle: jlong,
path: JString<'local>,
has_header: jboolean,
delimiter: jbyte,
quote: jbyte,
terminator_set: jboolean,
terminator_value: jbyte,
escape_set: jboolean,
escape_value: jbyte,
comment_set: jboolean,
comment_value: jbyte,
newlines_in_values_set: jboolean,
newlines_in_values_value: jboolean,
schema_infer_max_records: jlong,
file_extension: JString<'local>,
file_compression_type: JString<'local>,
schema_ipc_bytes: JByteArray<'local>,
) -> jlong {
try_unwrap_or_throw(&mut env, 0, |env| -> JniResult<jlong> {
if handle == 0 {
return Err("SessionContext handle is null".into());
}
let ctx = unsafe { &*(handle as *const SessionContext) };
let path: String = env.get_string(&path)?.into();
with_csv_options(
env,
has_header,
delimiter,
quote,
terminator_set,
terminator_value,
escape_set,
escape_value,
comment_set,
comment_value,
newlines_in_values_set,
newlines_in_values_value,
schema_infer_max_records,
file_extension,
file_compression_type,
schema_ipc_bytes,
|opts| {
let df = runtime().block_on(ctx.read_csv(path, opts))?;
Ok(Box::into_raw(Box::new(df)) as jlong)
},
)
})
}