// blob: 562e3969fdb6835556c7cc2df24d2e4a9a10ac05 [file] [log] [blame]
// Copyright 2022 The Blaze Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::fmt::Formatter;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::Partitioning::UnknownPartitioning;
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use futures::Stream;
/// Leaf execution plan that reports a configurable number of output
/// partitions, every one of which yields an empty record batch stream.
#[derive(Debug, Clone)]
pub struct EmptyPartitionsExec {
// Schema returned by `schema()` and attached to every stream from `execute()`.
schema: SchemaRef,
// Partition count reported through `output_partitioning()`.
num_partitions: usize,
}
impl EmptyPartitionsExec {
    /// Creates a plan node that advertises `num_partitions` partitions, each
    /// of which produces no record batches but carries `schema`.
    pub fn new(schema: SchemaRef, num_partitions: usize) -> Self {
        Self { schema, num_partitions }
    }
}
#[async_trait]
impl ExecutionPlan for EmptyPartitionsExec {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn output_partitioning(&self) -> Partitioning {
UnknownPartitioning(self.num_partitions)
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
if !children.is_empty() {
return Err(DataFusionError::Plan(
"EmptyPartitionsExec expects no children".to_string(),
));
}
Ok(self)
}
fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
Ok(Box::pin(EmptyStream(self.schema.clone())))
}
fn metrics(&self) -> Option<MetricsSet> {
None
}
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
write!(
f,
"EmptyPartitionsExec: partitions={}, schema={:?}",
&self.num_partitions, &self.schema
)
}
}
}
fn statistics(&self) -> Statistics {
todo!()
}
}
/// Record batch stream that wraps a schema and finishes without yielding any batch.
struct EmptyStream(SchemaRef);
impl RecordBatchStream for EmptyStream {
fn schema(&self) -> SchemaRef {
self.0.clone()
}
}
impl Stream for EmptyStream {
    type Item = datafusion::arrow::error::Result<RecordBatch>;

    /// Signals end-of-stream on every poll; the task context is never used
    /// and no wake-up is registered.
    fn poll_next(
        self: Pin<&mut Self>,
        _ctx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let end_of_stream: Option<Self::Item> = None;
        Poll::Ready(end_of_stream)
    }
}