blob: 257ae32c75d652f71161dd986c0be2e6cfd5d5d8 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use datafusion::error::DataFusionError;
use datafusion::execution::options::AvroReadOptions;
use datafusion::prelude::SessionContext;
use jni::objects::{JByteArray, JClass, JString};
use jni::sys::jlong;
use jni::JNIEnv;
use prost::Message;
use crate::proto_gen::AvroReadOptionsProto;
use crate::runtime;
use crate::schema::decode_optional_schema;
use datafusion_jni_common::errors::{try_unwrap_or_throw, JniResult};
/// Decodes the serialized Avro read options and the optional schema bytes,
/// assembles an [`AvroReadOptions`] from them, and hands it to `f`.
///
/// The assembled options borrow the file extension and the schema from locals
/// of this function, so they are passed to a callback instead of being
/// returned to the caller.
///
/// # Errors
///
/// Propagates any failure from reading `options_bytes` out of the JVM, from
/// decoding the `AvroReadOptionsProto` message, from
/// `decode_optional_schema`, or from `f` itself.
fn with_avro_options<R>(
    env: &mut JNIEnv,
    options_bytes: JByteArray,
    schema_ipc_bytes: JByteArray,
    f: impl FnOnce(AvroReadOptions) -> JniResult<R>,
) -> JniResult<R> {
    let raw_options = env.convert_byte_array(&options_bytes)?;
    let proto = AvroReadOptionsProto::decode(raw_options.as_slice())?;
    let decoded_schema = decode_optional_schema(env, schema_ipc_bytes)?;

    // `file_extension` is assigned as a struct field while every other field
    // keeps its default; struct-update syntax is used here to avoid
    // clippy::field_reassign_with_default.
    let extension = proto.file_extension;
    let base = AvroReadOptions {
        file_extension: &extension,
        ..AvroReadOptions::default()
    };

    // The schema is only attached when one was actually decoded.
    let options = match decoded_schema {
        Some(ref schema) => base.schema(schema),
        None => base,
    };

    f(options)
}
/// JNI entry point for `org.apache.datafusion.SessionContext.registerAvroWithOptions`.
///
/// Reads `name` and `path` from the JVM, builds the Avro read options via
/// `with_avro_options`, and blocks on `SessionContext::register_avro` using
/// the runtime returned by `runtime()`.
///
/// A `handle` of `0` is rejected with the error
/// "SessionContext handle is null". Errors are routed through
/// `try_unwrap_or_throw` (presumably surfaced to Java as an exception —
/// confirm against that helper).
#[no_mangle]
pub extern "system" fn Java_org_apache_datafusion_SessionContext_registerAvroWithOptions<'local>(
    mut env: JNIEnv<'local>,
    _class: JClass<'local>,
    handle: jlong,
    name: JString<'local>,
    path: JString<'local>,
    options_bytes: JByteArray<'local>,
    schema_ipc_bytes: JByteArray<'local>,
) {
    try_unwrap_or_throw(&mut env, (), |env| -> JniResult<()> {
        if handle == 0 {
            return Err("SessionContext handle is null".into());
        }
        // SAFETY (assumed, not verifiable from this file): a non-zero `handle`
        // is expected to be a pointer to a live `SessionContext` that outlives
        // this call — TODO confirm against the code that creates the handle.
        let session = unsafe { &*(handle as *const SessionContext) };

        let table_name = String::from(env.get_string(&name)?);
        let table_path = String::from(env.get_string(&path)?);

        with_avro_options(env, options_bytes, schema_ipc_bytes, |avro_options| {
            // Drive the async registration to completion on the shared runtime.
            let outcome: Result<(), DataFusionError> =
                runtime().block_on(session.register_avro(&table_name, &table_path, avro_options));
            outcome?;
            Ok(())
        })
    })
}
/// JNI entry point for `org.apache.datafusion.SessionContext.readAvroWithOptions`.
///
/// Reads `path` from the JVM, builds the Avro read options via
/// `with_avro_options`, and blocks on `SessionContext::read_avro` using the
/// runtime returned by `runtime()`.
///
/// On success the value produced by `read_avro` is moved into a `Box` and the
/// raw pointer is returned as a `jlong`; nothing in this function frees it.
/// A `handle` of `0` is rejected with the error
/// "SessionContext handle is null". Errors are routed through
/// `try_unwrap_or_throw` with `0` as the fallback value (presumably returned
/// after an exception is raised on the Java side — confirm against that
/// helper).
#[no_mangle]
pub extern "system" fn Java_org_apache_datafusion_SessionContext_readAvroWithOptions<'local>(
    mut env: JNIEnv<'local>,
    _class: JClass<'local>,
    handle: jlong,
    path: JString<'local>,
    options_bytes: JByteArray<'local>,
    schema_ipc_bytes: JByteArray<'local>,
) -> jlong {
    try_unwrap_or_throw(&mut env, 0, |env| -> JniResult<jlong> {
        if handle == 0 {
            return Err("SessionContext handle is null".into());
        }
        // SAFETY (assumed, not verifiable from this file): a non-zero `handle`
        // is expected to be a pointer to a live `SessionContext` that outlives
        // this call — TODO confirm against the code that creates the handle.
        let session = unsafe { &*(handle as *const SessionContext) };

        let avro_path = String::from(env.get_string(&path)?);

        with_avro_options(env, options_bytes, schema_ipc_bytes, |avro_options| {
            let frame = runtime().block_on(session.read_avro(avro_path, avro_options))?;
            // Hand the heap allocation out as an opaque handle.
            let boxed = Box::new(frame);
            Ok(Box::into_raw(boxed) as jlong)
        })
    })
}