blob: 1d579234be03fd3dfdc219d23a426d14c0ca61ee [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
any::Any,
fmt::{Debug, Formatter},
sync::Arc,
};
use arrow::{
array::{Array, RecordBatch, RecordBatchOptions, StructArray},
datatypes::{DataType, SchemaRef},
ffi::{FFI_ArrowArray, from_ffi_and_data_type},
};
use auron_jni_bridge::{jni_call, jni_call_static, jni_new_global_ref, jni_new_string};
use datafusion::{
error::Result,
execution::context::TaskContext,
physical_expr::EquivalenceProperties,
physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan,
Partitioning::UnknownPartitioning,
PlanProperties, SendableRecordBatchStream, Statistics,
execution_plan::{Boundedness, EmissionType},
metrics::{ExecutionPlanMetricsSet, MetricsSet},
},
};
use datafusion_ext_commons::arrow::array_size::BatchSize;
use jni::objects::GlobalRef;
use once_cell::sync::OnceCell;
use crate::common::execution_context::ExecutionContext;
pub struct FFIReaderExec {
num_partitions: usize,
schema: SchemaRef,
export_iter_provider_resource_id: String,
metrics: ExecutionPlanMetricsSet,
props: OnceCell<PlanProperties>,
}
impl FFIReaderExec {
pub fn new(
num_partitions: usize,
export_iter_provider_resource_id: String,
schema: SchemaRef,
) -> FFIReaderExec {
FFIReaderExec {
num_partitions,
export_iter_provider_resource_id,
schema,
metrics: ExecutionPlanMetricsSet::new(),
props: OnceCell::new(),
}
}
}
impl Debug for FFIReaderExec {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "FFIReader")
}
}
impl DisplayAs for FFIReaderExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
write!(f, "FFIReader")
}
}
impl ExecutionPlan for FFIReaderExec {
fn name(&self) -> &str {
"FFIReaderExec"
}
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn properties(&self) -> &PlanProperties {
self.props.get_or_init(|| {
PlanProperties::new(
EquivalenceProperties::new(self.schema()),
UnknownPartitioning(self.num_partitions),
EmissionType::Both,
Boundedness::Bounded,
)
})
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self::new(
self.num_partitions,
self.export_iter_provider_resource_id.clone(),
self.schema.clone(),
)))
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics);
let resource_id = jni_new_string!(&self.export_iter_provider_resource_id)?;
let exporter = jni_new_global_ref!(
jni_call_static!(JniBridge.getResource(resource_id.as_obj()) -> JObject)?.as_obj()
)?;
read_ffi(self.schema(), exporter, exec_ctx.clone())
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
fn statistics(&self) -> Result<Statistics> {
todo!()
}
}
/// Reads Arrow data from a Java-side exporter via FFI (Foreign Function
/// Interface).
///
/// This function establishes a bridge between Java and Rust to read Arrow data
/// efficiently using the Arrow C Data Interface. It continuously fetches record
/// batches from the Java exporter, converts them from FFI format to native Rust
/// Arrow format, and streams them back to the execution engine.
///
/// # Arguments
///
/// * `schema` - The Arrow schema reference defining the structure of the data
/// * `exporter` - A JNI global reference to the Java-side Arrow FFI exporter
/// object
/// * `exec_ctx` - The execution context for metrics collection and stream
/// management
///
/// # Returns
///
/// Returns a `Result<SendableRecordBatchStream>` containing a stream of record
/// batches on success, or an error if the FFI operation fails.
///
/// # Behavior
///
/// - Continuously polls the Java exporter for new batches until no more data is
/// available
/// - Converts FFI Arrow arrays to native Rust Arrow data structures
/// - Tracks memory usage and output row counts via metrics
/// - Automatically closes the Java exporter resource when the stream ends or
/// fails
/// - Processes batches asynchronously using tokio's blocking task spawning
///
/// # FFI Safety
///
/// This function uses unsafe operations to convert FFI Arrow data. The safety
/// is ensured by:
/// - Proper FFI Arrow array initialization and cleanup
/// - Correct data type matching between Java and Rust sides
/// - Automatic resource management through RAII patterns
fn read_ffi(
schema: SchemaRef,
exporter: GlobalRef,
exec_ctx: Arc<ExecutionContext>,
) -> Result<SendableRecordBatchStream> {
let size_counter = exec_ctx.register_counter_metric("size");
let exec_ctx_cloned = exec_ctx.clone();
Ok(exec_ctx
.clone()
.output_with_sender("FFIReader", move |sender| async move {
struct AutoCloseableExporter(GlobalRef);
impl Drop for AutoCloseableExporter {
fn drop(&mut self) {
let _ = jni_call!(JavaAutoCloseable(self.0.as_obj()).close() -> ());
}
}
let exporter = AutoCloseableExporter(exporter);
loop {
let batch = {
// load batch from ffi
let mut ffi_arrow_array = FFI_ArrowArray::empty();
let ffi_arrow_array_ptr = &mut ffi_arrow_array as *mut FFI_ArrowArray as i64;
let exporter_obj = exporter.0.clone();
let has_next = tokio::task::spawn_blocking(move || {
jni_call!(
AuronArrowFFIExporter(exporter_obj.as_obj())
.exportNextBatch(ffi_arrow_array_ptr) -> bool
)
})
.await
.expect("tokio spawn_blocking error")?;
if !has_next {
break;
}
let import_data_type = DataType::Struct(schema.fields().clone());
let imported =
unsafe { from_ffi_and_data_type(ffi_arrow_array, import_data_type)? };
let struct_array = StructArray::from(imported);
let batch = RecordBatch::try_new_with_options(
schema.clone(),
struct_array.columns().to_vec(),
&RecordBatchOptions::new().with_row_count(Some(struct_array.len())),
)?;
size_counter.add(batch.get_batch_mem_size());
exec_ctx_cloned
.baseline_metrics()
.record_output(batch.num_rows());
batch
};
sender.send(batch).await;
}
Ok(())
}))
}