blob: f79c865449cf97321d8ffe3ac966a32f6a114094 [file] [log] [blame]
// Copyright 2022 The Blaze Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
any::Any,
fmt::{Debug, Formatter},
sync::Arc,
};
use arrow::datatypes::SchemaRef;
use blaze_jni_bridge::{jni_call, jni_call_static, jni_new_global_ref, jni_new_string};
use datafusion::{
error::Result,
execution::context::TaskContext,
physical_expr::PhysicalSortExpr,
physical_plan::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
Partitioning::UnknownPartitioning,
SendableRecordBatchStream, Statistics,
},
};
use datafusion_ext_commons::streams::ffi_stream::FFIReaderStream;
use jni::objects::JObject;
pub struct FFIReaderExec {
num_partitions: usize,
schema: SchemaRef,
export_iter_provider_resource_id: String,
metrics: ExecutionPlanMetricsSet,
}
impl FFIReaderExec {
pub fn new(
num_partitions: usize,
export_iter_provider_resource_id: String,
schema: SchemaRef,
) -> FFIReaderExec {
FFIReaderExec {
num_partitions,
export_iter_provider_resource_id,
schema,
metrics: ExecutionPlanMetricsSet::new(),
}
}
}
impl Debug for FFIReaderExec {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "FFIReader")
}
}
impl DisplayAs for FFIReaderExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
write!(f, "FFIReader")
}
}
impl ExecutionPlan for FFIReaderExec {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn output_partitioning(&self) -> Partitioning {
UnknownPartitioning(self.num_partitions)
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self::new(
self.num_partitions,
self.export_iter_provider_resource_id.clone(),
self.schema.clone(),
)))
}
fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let resource_id = jni_new_string!(&self.export_iter_provider_resource_id)?;
let export_iter_provider =
jni_call_static!(JniBridge.getResource(resource_id.as_obj()) -> JObject)?;
let export_iter_local =
jni_call!(ScalaFunction0(export_iter_provider.as_obj()).apply() -> JObject)?;
let export_iter = jni_new_global_ref!(export_iter_local.as_obj())?;
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
let size_counter = MetricBuilder::new(&self.metrics).counter("size", partition);
Ok(Box::pin(FFIReaderStream::new(
self.schema.clone(),
export_iter,
baseline_metrics,
size_counter,
)))
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
fn statistics(&self) -> Result<Statistics> {
todo!()
}
}