| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! JNI plumbing shared by this workspace's native crates (`datafusion-jni` |
| //! and `datafusion-spark-bridge`, and through the latter every bridge |
| //! cdylib): the error-to-Java-exception mapping, the per-cdylib Tokio |
| //! runtime singleton, and the async-stream-to-`FFI_ArrowArrayStream` |
| //! bridge. |
| //! |
| //! Each cdylib statically links its own copy of this rlib, so [`runtime`] is |
| //! a per-cdylib singleton -- exactly the behaviour each crate had when this |
| //! code lived inline. Nothing here is exported with `#[no_mangle]`, so |
| //! linking this crate into several cdylibs loaded in one JVM cannot collide. |
| |
| pub mod errors; |
| |
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::OnceLock;

use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::record_batch::RecordBatchReader;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::EmptyRecordBatchStream;
use futures::StreamExt;
use tokio::runtime::{Handle, Runtime};
| |
/// Backing cell for [`runtime`] and [`runtime_with_init`]. It is empty until
/// the first call to either function, then holds the runtime for the rest of
/// the process (a `static` is never dropped, so the runtime is never shut
/// down).
static RT: OnceLock<Runtime> = OnceLock::new();
| |
| /// The cdylib-wide Tokio runtime. |
| pub fn runtime() -> &'static Runtime { |
| runtime_with_init(|_| {}) |
| } |
| |
| /// Same singleton as [`runtime`], with a hook that runs exactly once, when |
| /// the runtime is created. `datafusion-jni` uses it to install its |
| /// runtime-metrics accumulator so the sampling baseline coincides with |
| /// runtime start; every later call (either entry point) returns the existing |
| /// runtime without invoking the hook. |
| pub fn runtime_with_init(init: impl FnOnce(&Handle)) -> &'static Runtime { |
| RT.get_or_init(|| { |
| let rt = Runtime::new().expect("failed to create Tokio runtime"); |
| init(rt.handle()); |
| rt |
| }) |
| } |
| |
/// Bridges DataFusion's async [`SendableRecordBatchStream`] to the synchronous
/// [`RecordBatchReader`] interface that `FFI_ArrowArrayStream` (and therefore
/// the Java `ArrowReader`) consumes. Each call to `next()` drives one
/// `runtime().block_on(stream.next())`, so memory pressure stays bounded by the
/// executor pipeline plus a single in-flight batch.
///
/// Because `next()` blocks on the shared runtime, it is meant to be driven
/// from a plain (non-Tokio) thread. `Runtime::block_on` panics when called
/// from inside an async execution context; `next()` catches that panic and
/// reports it as an error rather than yielding batches.
pub struct StreamingReader {
    /// Schema reported by [`RecordBatchReader::schema`]. Nothing in this file
    /// checks it against `stream`'s own schema. Presumably callers pass
    /// `stream.schema()`; confirm at the construction sites.
    pub schema: SchemaRef,
    /// The DataFusion stream polled, via the shared runtime, once per
    /// `next()` call.
    pub stream: SendableRecordBatchStream,
}
| |
| impl Iterator for StreamingReader { |
| type Item = Result<RecordBatch, ArrowError>; |
| |
| fn next(&mut self) -> Option<Self::Item> { |
| // Arrow's C ABI invokes this iterator through FFI_ArrowArrayStream's |
| // vtable, outside the JNI handler's try_unwrap_or_throw guard. A panic |
| // here (buggy UDF, arrow cast that panics, runtime poison) would |
| // unwind across C/FFI -- undefined behaviour. Catch it and surface as |
| // an ArrowError so the Java side sees a normal exception instead. |
| let next = catch_unwind(AssertUnwindSafe(|| runtime().block_on(self.stream.next()))); |
| match next { |
| Ok(item) => item.map(|r| r.map_err(|e| ArrowError::ExternalError(Box::new(e)))), |
| Err(panic) => { |
| let msg = errors::panic_message(&panic); |
| Some(Err(ArrowError::ExternalError( |
| format!("panic in DataFrame stream: {msg}").into(), |
| ))) |
| } |
| } |
| } |
| } |
| |
| impl RecordBatchReader for StreamingReader { |
| fn schema(&self) -> SchemaRef { |
| self.schema.clone() |
| } |
| } |