blob: 7c840fd717449d88cdec796dbfaf8bf81544fa07 [file]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{any::Any, fmt::Formatter, io::Write, sync::Arc};
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use auron_jni_bridge::{
jni_call, jni_call_static, jni_new_direct_byte_buffer, jni_new_global_ref, jni_new_string,
};
use datafusion::{
error::Result,
execution::context::TaskContext,
physical_expr::EquivalenceProperties,
physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
SendableRecordBatchStream, Statistics,
execution_plan::{Boundedness, EmissionType},
metrics::{ExecutionPlanMetricsSet, MetricsSet},
stream::RecordBatchStreamAdapter,
},
};
use datafusion_ext_commons::io::ipc_compression::IpcCompressionWriter;
use futures::{StreamExt, TryStreamExt, stream::once};
use jni::objects::{GlobalRef, JObject};
use once_cell::sync::OnceCell;
use crate::common::{execution_context::ExecutionContext, timer_helper::TimerHelper};
/// Physical plan node that forwards its input's record batches, IPC-encoded,
/// to a consumer object obtained from the JVM side.
#[derive(Debug)]
pub struct IpcWriterExec {
    // Child plan whose output batches are written; also supplies this node's
    // schema and output partitioning.
    input: Arc<dyn ExecutionPlan>,
    // Key handed to `JniBridge.getResource` in `execute` to look up the
    // consumer object.
    ipc_consumer_resource_id: String,
    // Metrics set passed to the `ExecutionContext` created in `execute` and
    // exposed through `metrics()`.
    metrics: ExecutionPlanMetricsSet,
    // Plan properties, computed on the first `properties()` call and cached.
    props: OnceCell<PlanProperties>,
}
impl IpcWriterExec {
    /// Builds a writer node over `input` that will send its output to the
    /// consumer registered under `ipc_consumer_resource_id`.
    ///
    /// The metrics set starts out fresh and the cached plan properties start
    /// out unset.
    pub fn new(input: Arc<dyn ExecutionPlan>, ipc_consumer_resource_id: String) -> Self {
        let metrics = ExecutionPlanMetricsSet::new();
        let props = OnceCell::new();
        Self {
            input,
            ipc_consumer_resource_id,
            metrics,
            props,
        }
    }
}
impl DisplayAs for IpcWriterExec {
    /// Renders this node as the fixed label `IpcWriter`, regardless of the
    /// requested display format.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        f.write_str("IpcWriter")
    }
}
#[async_trait]
impl ExecutionPlan for IpcWriterExec {
fn name(&self) -> &str {
"IpcWriterExec"
}
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.input.schema()
}
fn properties(&self) -> &PlanProperties {
self.props.get_or_init(|| {
PlanProperties::new(
EquivalenceProperties::new(self.schema()),
self.input.output_partitioning().clone(),
EmissionType::Both,
Boundedness::Bounded,
)
})
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(IpcWriterExec::new(
self.input.clone(),
self.ipc_consumer_resource_id.clone(),
)))
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics);
let ipc_consumer_local = jni_call_static!(
JniBridge.getResource(
jni_new_string!(&self.ipc_consumer_resource_id)?.as_obj()) -> JObject
)?;
let ipc_consumer = jni_new_global_ref!(ipc_consumer_local.as_obj())?;
let input = exec_ctx.execute_with_input_stats(&self.input)?;
let coalesced = exec_ctx.coalesce_with_default_batch_size(input);
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
once(write_ipc(coalesced, exec_ctx, ipc_consumer)).try_flatten(),
)))
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
fn statistics(&self) -> Result<Statistics> {
todo!()
}
}
pub async fn write_ipc(
mut input: SendableRecordBatchStream,
exec_ctx: Arc<ExecutionContext>,
ipc_consumer: GlobalRef,
) -> Result<SendableRecordBatchStream> {
Ok(exec_ctx
.clone()
.output_with_sender("IpcWrite", move |_sender| async move {
let _timer = exec_ctx.baseline_metrics().elapsed_compute().timer();
struct IpcConsumerWrite(GlobalRef);
impl Write for IpcConsumerWrite {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let buf_len = buf.len();
let buf = jni_new_direct_byte_buffer!(&buf).map_err(std::io::Error::other)?;
jni_call!(ScalaFunction1(self.0.as_obj()).apply(buf.as_obj()) -> JObject)
.map_err(std::io::Error::other)?;
Ok(buf_len)
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
let mut writer = IpcCompressionWriter::new(IpcConsumerWrite(ipc_consumer));
while let Some(batch) = exec_ctx
.baseline_metrics()
.elapsed_compute()
.exclude_timer_async(input.next())
.await
.transpose()?
{
writer.write_batch(batch.num_rows(), batch.columns())?;
exec_ctx.baseline_metrics().record_output(batch.num_rows());
}
writer.finish_current_buf()?;
Ok(())
}))
}