blob: 5654cc2d01580a70684f26e0faf855bca85532a0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{any::Any, fmt::Formatter, sync::Arc};
use arrow::{datatypes::SchemaRef, util::pretty::pretty_format_batches};
use async_trait::async_trait;
use datafusion::{
error::Result,
execution::context::TaskContext,
physical_expr::EquivalenceProperties,
physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
SendableRecordBatchStream, Statistics,
execution_plan::{Boundedness, EmissionType},
metrics::{ExecutionPlanMetricsSet, MetricsSet},
},
};
use futures::StreamExt;
use once_cell::sync::OnceCell;
use crate::common::execution_context::ExecutionContext;
/// Pass-through operator that forwards its single child's batches unchanged
/// while logging each of them; `debug_id` tags the log output.
#[derive(Debug)]
pub struct DebugExec {
    /// The child plan whose output is forwarded.
    input: Arc<dyn ExecutionPlan>,
    /// Label used to tag this operator's log lines.
    debug_id: String,
    /// Metrics set handed to the execution context when executing.
    metrics: ExecutionPlanMetricsSet,
    /// Plan properties, computed lazily on the first `properties()` call.
    props: OnceCell<PlanProperties>,
}

impl DebugExec {
    /// Creates a `DebugExec` wrapping `input`, tagging its log output with `debug_id`.
    ///
    /// Metrics start empty and plan properties are left uncomputed until needed.
    pub fn new(input: Arc<dyn ExecutionPlan>, debug_id: String) -> Self {
        let metrics = ExecutionPlanMetricsSet::new();
        let props = OnceCell::new();
        Self {
            input,
            debug_id,
            metrics,
            props,
        }
    }
}
impl DisplayAs for DebugExec {
    /// Writes the bare operator name `DebugExec`; the requested display
    /// format does not affect the output.
    fn fmt_as(&self, _format_type: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        f.write_str("DebugExec")
    }
}
#[async_trait]
impl ExecutionPlan for DebugExec {
fn name(&self) -> &str {
"DebugExec"
}
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.input.schema()
}
fn properties(&self) -> &PlanProperties {
self.props.get_or_init(|| {
PlanProperties::new(
EquivalenceProperties::new(self.schema()),
self.input.output_partitioning().clone(),
EmissionType::Both,
Boundedness::Bounded,
)
})
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(DebugExec::new(
self.input.clone(),
self.debug_id.clone(),
)))
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics);
let debug_id = self.debug_id.clone();
let mut input = exec_ctx.execute(&self.input)?;
Ok(
exec_ctx.output_with_sender("Debug", move |sender| async move {
while let Some(batch) = input.next().await.transpose()? {
let table_str = pretty_format_batches(&[batch.clone()])?
.to_string()
.replace('\n', &format!("\n{debug_id} - "));
log::info!("DebugExec(partition={partition}):\n{table_str}");
sender.send(batch).await;
}
Ok(())
}),
)
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
fn statistics(&self) -> Result<Statistics> {
todo!()
}
}