// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{any::Any, fmt::Formatter, sync::Arc};

use arrow::datatypes::{Field, Fields, Schema, SchemaRef};
use datafusion::{
common::{Result, Statistics},
execution::TaskContext,
physical_expr::{PhysicalExprRef, PhysicalSortExpr},
physical_plan::{
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
stream::RecordBatchStreamAdapter,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
},
};
use datafusion_ext_commons::streams::coalesce_stream::CoalesceInput;
use futures::{stream::once, FutureExt, StreamExt, TryStreamExt};
use itertools::Itertools;

use crate::{
common::{
batch_statisitcs::{stat_input, InputBatchStatistics},
cached_exprs_evaluator::CachedExprsEvaluator,
column_pruning::{prune_columns, ExecuteWithColumnPruning},
output::TaskOutputter,
},
filter_exec::FilterExec,
};
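
/// Projection operator: evaluates a list of (expression, output name) pairs
/// against each input batch and emits batches with the derived output schema.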
#[derive(Debug, Clone)]
pub struct ProjectExec {
expr: Vec<(PhysicalExprRef, String)>,
input: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
metrics: ExecutionPlanMetricsSet,
}

impl ProjectExec {
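    /// Creates a new ProjectExec, deriving each output field's data type and
    /// nullability from its expression and the input schema.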
pub fn try_new(
expr: Vec<(PhysicalExprRef, String)>,
input: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
let input_schema = input.schema();
let schema = Arc::new(Schema::new(
expr.iter()
.map(|(e, name)| {
Ok(Field::new(
name,
e.data_type(&input_schema)?,
e.nullable(&input_schema)?,
))
})
.collect::<Result<Fields>>()?,
));
Ok(Self {
expr,
input,
schema,
metrics: ExecutionPlanMetricsSet::new(),
})
}
}

impl DisplayAs for ProjectExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"ProjectExec [{}]",
self.expr
.iter()
.map(|(e, name)| format!("{e} AS {name}"))
.join(", ")
)
}
}

impl ExecutionPlan for ProjectExec {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn output_partitioning(&self) -> Partitioning {
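        // The projection rewrites the column layout, so only the partition
        // count of the input is preserved, not its partitioning expressions.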
Partitioning::UnknownPartitioning(self.input.output_partitioning().partition_count())
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
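        // Any input ordering refers to pre-projection columns, so it is not
        // propagated to the output.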
None
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self::try_new(
self.expr.clone(),
children[0].clone(),
)?))
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
let exprs: Vec<PhysicalExprRef> = self.expr.iter().map(|(e, _name)| e.clone()).collect();
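        // When the input is a FilterExec, fuse the filter into the projection
        // so the predicates and projection expressions share one column-pruned
        // pass over the filter's input.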
let fut = if let Some(filter_exec) = self.input.as_any().downcast_ref::<FilterExec>() {
execute_project_with_filtering(
filter_exec.children()[0].clone(),
partition,
context.clone(),
self.schema(),
filter_exec.predicates().to_vec(),
exprs,
self.metrics.clone(),
)
.boxed()
} else {
execute_project_with_filtering(
self.input.clone(),
partition,
context.clone(),
self.schema(),
vec![],
exprs,
self.metrics.clone(),
)
.boxed()
};
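        // Flatten the future into a stream and coalesce small output batches.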
let output = Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
once(fut).try_flatten(),
));
Ok(context.coalesce_with_default_batch_size(output, &baseline_metrics)?)
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
    fn statistics(&self) -> Result<Statistics> {
        // Statistics are not computed for this operator; report unknown
        // statistics for the output schema instead of panicking.
        Ok(Statistics::new_unknown(&self.schema))
    }
}

impl ExecuteWithColumnPruning for ProjectExec {
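    /// Executes with column pruning by rebuilding a ProjectExec that keeps
    /// only the projection expressions selected by `projection`.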
fn execute_projected(
&self,
partition: usize,
context: Arc<TaskContext>,
projection: &[usize],
) -> Result<SendableRecordBatchStream> {
let projected_project: Arc<dyn ExecutionPlan> = Arc::new(ProjectExec {
input: self.input.clone(),
expr: projection.iter().map(|&i| self.expr[i].clone()).collect(),
schema: Arc::new(self.schema.project(projection)?),
metrics: self.metrics.clone(),
});
projected_project.execute(partition, context)
}
}
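
/// Runs the (optionally filtered) projection over `input`, pruning the input
/// columns down to those actually referenced by the expressions and filters.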
async fn execute_project_with_filtering(
input: Arc<dyn ExecutionPlan>,
partition: usize,
context: Arc<TaskContext>,
output_schema: SchemaRef,
filters: Vec<PhysicalExprRef>,
exprs: Vec<PhysicalExprRef>,
metrics: ExecutionPlanMetricsSet,
) -> Result<SendableRecordBatchStream> {
// execute input with pruning
let baseline_metrics = BaselineMetrics::new(&metrics, partition);
let num_exprs = exprs.len();
let (pruned_exprs, projection) = prune_columns(&[exprs, filters].concat())?;
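    // prune_columns rewrote all expressions against the pruned input schema;
    // split them back into projection expressions and filter predicates.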
let exprs = pruned_exprs
.iter()
.take(num_exprs)
.cloned()
.collect::<Vec<PhysicalExprRef>>();
let filters = pruned_exprs
.iter()
.skip(num_exprs)
.cloned()
.collect::<Vec<PhysicalExprRef>>();
let cached_expr_evaluator =
CachedExprsEvaluator::try_new(filters, exprs, output_schema.clone())?;
let mut input = stat_input(
InputBatchStatistics::from_metrics_set_and_blaze_conf(&metrics, partition)?,
input.execute_projected(partition, context.clone(), &projection)?,
)?;
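    // Evaluate the fused filter + projection on each input batch and forward
    // the results to the output sender.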
context.output_with_sender("Project", output_schema, move |sender| async move {
while let Some(batch) = input.next().await.transpose()? {
let mut timer = baseline_metrics.elapsed_compute().timer();
let output_batch = cached_expr_evaluator.filter_project(&batch)?;
            drop(batch); // release the input batch as early as possible
baseline_metrics.record_output(output_batch.num_rows());
sender.send(Ok(output_batch), Some(&mut timer)).await;
}
Ok(())
})
}