blob: 3011b289507ff2b392692fed408b0da8b6f008d8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! EmptyRelation execution plan
use std::any::Any;
use std::sync::Arc;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::memory::MemoryStream;
use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning};
use arrow::array::NullArray;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use super::SendableRecordBatchStream;
use async_trait::async_trait;
/// Execution plan for an empty relation.
///
/// Depending on `produce_one_row`, executing this plan yields either no
/// record batches at all or a single one-row placeholder batch.
#[derive(Debug)]
pub struct EmptyExec {
/// Specifies whether this exec produces a row or not
produce_one_row: bool,
/// The schema reported by this plan and handed to the output stream
schema: SchemaRef,
}
impl EmptyExec {
/// Create a new EmptyExec
pub fn new(produce_one_row: bool, schema: SchemaRef) -> Self {
EmptyExec {
produce_one_row,
schema,
}
}
/// Specifies whether this exec produces a row or not
pub fn produce_one_row(&self) -> bool {
self.produce_one_row
}
}
#[async_trait]
impl ExecutionPlan for EmptyExec {
    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Get the schema this plan was constructed with
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// `EmptyExec` is a leaf of the plan tree: it never has children
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![]
    }

    /// No particular distribution is required (there are no children)
    fn required_child_distribution(&self) -> Distribution {
        Distribution::UnspecifiedDistribution
    }

    /// Get the output partitioning of this plan (always a single partition)
    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(1)
    }

    /// Rebuild this plan with the given children.
    ///
    /// Because `EmptyExec` is a leaf, `children` must be empty. The returned
    /// plan keeps both the schema and the `produce_one_row` setting of `self`.
    ///
    /// # Errors
    ///
    /// Returns `DataFusionError::Internal` if any child is supplied.
    fn with_new_children(
        &self,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if !children.is_empty() {
            return Err(DataFusionError::Internal(
                "EmptyExec wrong number of children".to_string(),
            ));
        }
        // Carry over `produce_one_row`; hard-coding `false` here would turn a
        // one-row plan into a zero-row plan whenever the tree is rebuilt.
        Ok(Arc::new(EmptyExec::new(
            self.produce_one_row,
            self.schema.clone(),
        )))
    }

    /// Execute the single output partition of this plan.
    ///
    /// The resulting stream contains one single-row batch when
    /// `produce_one_row` is set, and no batches otherwise.
    ///
    /// # Errors
    ///
    /// Returns `DataFusionError::Internal` if `partition` is not `0`, and
    /// propagates any error from building the batch or the stream.
    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
        // EmptyExec has a single output partition
        if 0 != partition {
            return Err(DataFusionError::Internal(format!(
                "EmptyExec invalid partition {} (expected 0)",
                partition
            )));
        }

        // Build a stream holding exactly one null element if needed.
        // NOTE(review): the one-row batch uses its own single-column
        // "placeholder" schema, while the stream is created with
        // `self.schema` -- confirm consumers tolerate the difference.
        let data = if self.produce_one_row {
            vec![RecordBatch::try_new(
                Arc::new(Schema::new(vec![Field::new(
                    "placeholder",
                    DataType::Null,
                    true,
                )])),
                vec![Arc::new(NullArray::new(1))],
            )?]
        } else {
            vec![]
        };

        Ok(Box::pin(MemoryStream::try_new(
            data,
            self.schema.clone(),
            None,
        )?))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::physical_plan::common;
    use crate::test;

    /// A plan built with `produce_one_row = false` reports its schema and
    /// yields no batches.
    #[tokio::test]
    async fn empty() -> Result<()> {
        let expected_schema = test::aggr_test_schema();
        let exec = EmptyExec::new(false, expected_schema.clone());
        assert_eq!(exec.schema(), expected_schema);

        // executing the only partition must give back nothing
        let stream = exec.execute(0).await?;
        let collected = common::collect(stream).await?;
        assert!(collected.is_empty());
        Ok(())
    }

    /// Zero children is accepted and keeps the schema; any child is rejected.
    #[test]
    fn with_new_children() -> Result<()> {
        let exec = EmptyExec::new(false, test::aggr_test_schema());

        let rebuilt = exec.with_new_children(vec![])?;
        assert_eq!(exec.schema(), rebuilt.schema());

        let too_many_kids = vec![rebuilt];
        let outcome = exec.with_new_children(too_many_kids);
        assert!(
            outcome.is_err(),
            "expected error when providing list of kids"
        );
        Ok(())
    }

    /// Only partition 0 exists; every other index is an error.
    #[tokio::test]
    async fn invalid_execute() -> Result<()> {
        let exec = EmptyExec::new(false, test::aggr_test_schema());

        // ask for the wrong partition
        assert!(exec.execute(1).await.is_err());
        assert!(exec.execute(20).await.is_err());
        Ok(())
    }

    /// A plan built with `produce_one_row = true` yields exactly one batch.
    #[tokio::test]
    async fn produce_one_row() -> Result<()> {
        let exec = EmptyExec::new(true, test::aggr_test_schema());

        let stream = exec.execute(0).await?;
        let collected = common::collect(stream).await?;

        // should have one item
        assert_eq!(collected.len(), 1);
        Ok(())
    }
}