| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use std::sync::Arc; |
| |
| use datafusion::arrow::datatypes::SchemaRef; |
| use datafusion::arrow::ipc::writer::StreamWriter; |
| use datafusion::dataframe::DataFrame; |
| use datafusion::prelude::SessionContext; |
| use datafusion_proto::logical_plan::{AsLogicalPlan, DefaultLogicalExtensionCodec}; |
| use datafusion_proto::protobuf::LogicalPlanNode; |
| use jni::objects::{JByteArray, JClass, JString}; |
| use jni::sys::{jbyteArray, jlong}; |
| use jni::JNIEnv; |
| use prost::Message; |
| |
| use crate::runtime; |
| use datafusion_jni_common::errors::{try_unwrap_or_throw, JniResult}; |
| |
| #[no_mangle] |
| pub extern "system" fn Java_org_apache_datafusion_SessionContext_createDataFrameFromProto< |
| 'local, |
| >( |
| mut env: JNIEnv<'local>, |
| _class: JClass<'local>, |
| handle: jlong, |
| plan_bytes: JByteArray<'local>, |
| ) -> jlong { |
| try_unwrap_or_throw(&mut env, 0, |env| -> JniResult<jlong> { |
| if handle == 0 { |
| return Err("SessionContext handle is null".into()); |
| } |
| let ctx = unsafe { &*(handle as *const SessionContext) }; |
| let bytes: Vec<u8> = env.convert_byte_array(&plan_bytes)?; |
| let node = LogicalPlanNode::decode(bytes.as_slice())?; |
| let codec = DefaultLogicalExtensionCodec {}; |
| let task_ctx = ctx.task_ctx(); |
| let plan = node.try_into_logical_plan(task_ctx.as_ref(), &codec)?; |
| let df: DataFrame = runtime().block_on(ctx.execute_logical_plan(plan))?; |
| Ok(Box::into_raw(Box::new(df)) as jlong) |
| }) |
| } |
| |
| /// Decode a Substrait `Plan` proto, translate it to a DataFusion `LogicalPlan` |
| /// against this session's catalog, and wrap the result in a `DataFrame`. |
| /// |
| /// Sibling of `createDataFrameFromProto`, but for the cross-engine Substrait |
| /// wire format rather than DataFusion's native `LogicalPlanNode`. Gated behind |
| /// the `substrait` Cargo feature; the feature-off compile substitutes a stub |
| /// that throws a clear error at runtime so the Java surface is unchanged. |
| #[cfg(feature = "substrait")] |
| #[no_mangle] |
| pub extern "system" fn Java_org_apache_datafusion_SessionContext_createDataFrameFromSubstrait< |
| 'local, |
| >( |
| mut env: JNIEnv<'local>, |
| _class: JClass<'local>, |
| handle: jlong, |
| plan_bytes: JByteArray<'local>, |
| ) -> jlong { |
| try_unwrap_or_throw(&mut env, 0, |env| -> JniResult<jlong> { |
| if handle == 0 { |
| return Err("SessionContext handle is null".into()); |
| } |
| let ctx = unsafe { &*(handle as *const SessionContext) }; |
| let bytes: Vec<u8> = env.convert_byte_array(&plan_bytes)?; |
| |
| let plan = runtime().block_on(async { |
| let substrait_plan = datafusion_substrait::serializer::deserialize_bytes(bytes).await?; |
| datafusion_substrait::logical_plan::consumer::from_substrait_plan( |
| &ctx.state(), |
| &substrait_plan, |
| ) |
| .await |
| })?; |
| let df: DataFrame = runtime().block_on(ctx.execute_logical_plan(plan))?; |
| Ok(Box::into_raw(Box::new(df)) as jlong) |
| }) |
| } |
| |
| #[cfg(not(feature = "substrait"))] |
| #[no_mangle] |
| pub extern "system" fn Java_org_apache_datafusion_SessionContext_createDataFrameFromSubstrait< |
| 'local, |
| >( |
| mut env: JNIEnv<'local>, |
| _class: JClass<'local>, |
| _handle: jlong, |
| _plan_bytes: JByteArray<'local>, |
| ) -> jlong { |
| try_unwrap_or_throw(&mut env, 0, |_env| -> JniResult<jlong> { |
| Err( |
| "datafusion-jni was built without the `substrait` Cargo feature; \ |
| rebuild the native crate with `--features substrait` to enable \ |
| SessionContext.fromSubstrait" |
| .into(), |
| ) |
| }) |
| } |
| |
| #[no_mangle] |
| pub extern "system" fn Java_org_apache_datafusion_SessionContext_tableSchemaIpc<'local>( |
| mut env: JNIEnv<'local>, |
| _class: JClass<'local>, |
| handle: jlong, |
| name: JString<'local>, |
| ) -> jbyteArray { |
| try_unwrap_or_throw( |
| &mut env, |
| std::ptr::null_mut(), |
| |env| -> JniResult<jbyteArray> { |
| if handle == 0 { |
| return Err("SessionContext handle is null".into()); |
| } |
| let ctx = unsafe { &*(handle as *const SessionContext) }; |
| let name: String = env.get_string(&name)?.into(); |
| |
| let df = runtime().block_on(ctx.table(name.as_str()))?; |
| let schema: SchemaRef = Arc::new(df.schema().as_arrow().clone()); |
| |
| let mut buf: Vec<u8> = Vec::new(); |
| { |
| let mut writer = StreamWriter::try_new(&mut buf, schema.as_ref())?; |
| writer.finish()?; |
| } |
| let arr = env.byte_array_from_slice(&buf)?; |
| Ok(arr.into_raw()) |
| }, |
| ) |
| } |