| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| use std::{any::Any, fmt::Formatter, sync::Arc}; |
| |
| use arrow::{datatypes::SchemaRef, util::pretty::pretty_format_batches}; |
| use async_trait::async_trait; |
| use datafusion::{ |
| error::Result, |
| execution::context::TaskContext, |
| physical_expr::EquivalenceProperties, |
| physical_plan::{ |
| DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, |
| SendableRecordBatchStream, Statistics, |
| execution_plan::{Boundedness, EmissionType}, |
| metrics::{ExecutionPlanMetricsSet, MetricsSet}, |
| }, |
| }; |
| use futures::StreamExt; |
| use once_cell::sync::OnceCell; |
| |
| use crate::common::execution_context::ExecutionContext; |
| |
| #[derive(Debug)] |
| pub struct DebugExec { |
| input: Arc<dyn ExecutionPlan>, |
| debug_id: String, |
| metrics: ExecutionPlanMetricsSet, |
| props: OnceCell<PlanProperties>, |
| } |
| |
| impl DebugExec { |
| pub fn new(input: Arc<dyn ExecutionPlan>, debug_id: String) -> Self { |
| Self { |
| input, |
| debug_id, |
| metrics: ExecutionPlanMetricsSet::new(), |
| props: OnceCell::new(), |
| } |
| } |
| } |
| |
| impl DisplayAs for DebugExec { |
| fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { |
| write!(f, "DebugExec") |
| } |
| } |
| |
| #[async_trait] |
| impl ExecutionPlan for DebugExec { |
| fn name(&self) -> &str { |
| "DebugExec" |
| } |
| |
| fn as_any(&self) -> &dyn Any { |
| self |
| } |
| |
| fn schema(&self) -> SchemaRef { |
| self.input.schema() |
| } |
| |
| fn properties(&self) -> &PlanProperties { |
| self.props.get_or_init(|| { |
| PlanProperties::new( |
| EquivalenceProperties::new(self.schema()), |
| self.input.output_partitioning().clone(), |
| EmissionType::Both, |
| Boundedness::Bounded, |
| ) |
| }) |
| } |
| |
| fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { |
| vec![&self.input] |
| } |
| |
| fn with_new_children( |
| self: Arc<Self>, |
| _children: Vec<Arc<dyn ExecutionPlan>>, |
| ) -> Result<Arc<dyn ExecutionPlan>> { |
| Ok(Arc::new(DebugExec::new( |
| self.input.clone(), |
| self.debug_id.clone(), |
| ))) |
| } |
| |
| fn execute( |
| &self, |
| partition: usize, |
| context: Arc<TaskContext>, |
| ) -> Result<SendableRecordBatchStream> { |
| let exec_ctx = ExecutionContext::new(context, partition, self.schema(), &self.metrics); |
| let debug_id = self.debug_id.clone(); |
| |
| let mut input = exec_ctx.execute(&self.input)?; |
| Ok( |
| exec_ctx.output_with_sender("Debug", move |sender| async move { |
| while let Some(batch) = input.next().await.transpose()? { |
| let table_str = pretty_format_batches(&[batch.clone()])? |
| .to_string() |
| .replace('\n', &format!("\n{debug_id} - ")); |
| log::info!("DebugExec(partition={partition}):\n{table_str}"); |
| sender.send(batch).await; |
| } |
| Ok(()) |
| }), |
| ) |
| } |
| |
| fn metrics(&self) -> Option<MetricsSet> { |
| Some(self.metrics.clone_inner()) |
| } |
| |
| fn statistics(&self) -> Result<Statistics> { |
| todo!() |
| } |
| } |