blob: b22f477bdb3a6c799f7eac97a216ba7976f3849a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Defines the projection execution plan. A projection determines which columns or expressions
//! are returned from a query. The SQL statement `SELECT a, b, a+b FROM t1` is an example
//! of a projection on table `t1` where the expressions `a`, `b`, and `a+b` are the
//! projection expressions. `SELECT` without `FROM` will only evaluate expressions.
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr};
use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use super::{RecordBatchStream, SendableRecordBatchStream};
use async_trait::async_trait;
use futures::stream::Stream;
use futures::stream::StreamExt;
/// Execution plan for a projection
///
/// Evaluates a set of physical expressions against each input batch,
/// producing output batches with one column per expression.
#[derive(Debug)]
pub struct ProjectionExec {
    /// The projection expressions stored as tuples of (expression, output column name)
    expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
    /// The schema once the projection has been applied to the input
    schema: SchemaRef,
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
}
impl ProjectionExec {
    /// Create a projection on an input
    ///
    /// Builds the output schema by resolving each expression's data type and
    /// nullability against the input plan's schema.
    ///
    /// # Errors
    ///
    /// Returns an error if any expression cannot be resolved against the
    /// input schema.
    pub fn try_new(
        expr: Vec<(Arc<dyn PhysicalExpr>, String)>,
        input: Arc<dyn ExecutionPlan>,
    ) -> Result<Self> {
        let input_schema = input.schema();
        // One output field per (expression, name) pair; collect() short-circuits
        // on the first expression that fails to resolve.
        let fields: Result<Vec<_>> = expr
            .iter()
            .map(|(e, name)| {
                Ok(Field::new(
                    name,
                    e.data_type(&input_schema)?,
                    e.nullable(&input_schema)?,
                ))
            })
            .collect();
        let schema = Arc::new(Schema::new(fields?));
        // `input` is owned here, so move it directly instead of cloning the Arc.
        Ok(Self {
            expr,
            schema,
            input,
        })
    }

    /// The projection expressions stored as tuples of (expression, output column name)
    pub fn expr(&self) -> &[(Arc<dyn PhysicalExpr>, String)] {
        &self.expr
    }

    /// The input plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }
}
#[async_trait]
impl ExecutionPlan for ProjectionExec {
    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Get the schema for this execution plan
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![Arc::clone(&self.input)]
    }

    /// Get the output partitioning of this plan
    fn output_partitioning(&self) -> Partitioning {
        // Projection is a per-batch map: partitioning passes through unchanged.
        self.input.output_partitioning()
    }

    fn with_new_children(
        &self,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // A projection has exactly one child; anything else is a planner bug.
        if let [child] = children.as_slice() {
            let exec = ProjectionExec::try_new(self.expr.clone(), Arc::clone(child))?;
            Ok(Arc::new(exec))
        } else {
            Err(DataFusionError::Internal(
                "ProjectionExec wrong number of children".to_string(),
            ))
        }
    }

    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
        // Only the expressions are needed at runtime; the output names are
        // already baked into `self.schema`.
        let expr = self.expr.iter().map(|(e, _)| Arc::clone(e)).collect();
        let input = self.input.execute(partition).await?;
        Ok(Box::pin(ProjectionStream {
            schema: Arc::clone(&self.schema),
            expr,
            input,
        }))
    }
}
/// Evaluate `expressions` against `batch` and assemble the resulting columns
/// into a new `RecordBatch` with the given (projected) `schema`.
///
/// # Errors
///
/// Returns an error if any expression fails to evaluate (converted into an
/// Arrow external error) or if the produced arrays do not match `schema`.
fn batch_project(
    batch: &RecordBatch,
    expressions: &[Arc<dyn PhysicalExpr>],
    schema: &SchemaRef,
) -> ArrowResult<RecordBatch> {
    expressions
        .iter()
        // `batch` is already a reference; no extra borrow needed.
        .map(|expr| {
            expr.evaluate(batch)
                .map(|value| value.into_array(batch.num_rows()))
        })
        // Short-circuits on the first expression that fails.
        .collect::<Result<Vec<_>>>()
        .map_err(DataFusionError::into_arrow_external_error)
        .and_then(|arrays| RecordBatch::try_new(schema.clone(), arrays))
}
/// Stream that applies the projection expressions to every batch produced by
/// the wrapped input stream.
struct ProjectionStream {
    /// Schema of the projected output batches
    schema: SchemaRef,
    /// Expressions to evaluate, one per output column
    expr: Vec<Arc<dyn PhysicalExpr>>,
    /// The stream of input batches being projected
    input: SendableRecordBatchStream,
}
impl Stream for ProjectionStream {
    type Item = ArrowResult<RecordBatch>;

    /// Poll the inner stream and, when a batch is ready, run the projection
    /// expressions over it. Errors and end-of-stream pass through untouched.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let polled = self.input.poll_next_unpin(cx);
        match polled {
            Poll::Ready(Some(Ok(batch))) => {
                Poll::Ready(Some(batch_project(&batch, &self.expr, &self.schema)))
            }
            // Some(Err(_)) and None are forwarded unchanged.
            Poll::Ready(other) => Poll::Ready(other),
            Poll::Pending => Poll::Pending,
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // The projection neither adds nor drops batches, so the input's
        // size hint applies unchanged.
        self.input.size_hint()
    }
}
impl RecordBatchStream for ProjectionStream {
/// Get the schema
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::physical_plan::csv::{CsvExec, CsvReadOptions};
    use crate::physical_plan::expressions::col;
    use crate::test;

    /// Project column `c1` out of the partitioned CSV test data and verify
    /// every partition yields single-column batches totalling 100 rows.
    #[tokio::test]
    async fn project_first_column() -> Result<()> {
        let schema = test::aggr_test_schema();
        let num_partitions = 4;
        let path =
            test::create_partitioned_csv("aggregate_test_100.csv", num_partitions)?;
        let csv =
            CsvExec::try_new(&path, CsvReadOptions::new().schema(&schema), None, 1024)?;

        // pick column c1 and name it column c1 in the output schema
        let projection =
            ProjectionExec::try_new(vec![(col("c1"), "c1".to_string())], Arc::new(csv))?;

        let mut partitions_seen = 0;
        let mut total_rows = 0;
        for partition in 0..projection.output_partitioning().partition_count() {
            partitions_seen += 1;
            let mut stream = projection.execute(partition).await?;
            while let Some(batch) = stream.next().await {
                let batch = batch.unwrap();
                assert_eq!(1, batch.num_columns());
                total_rows += batch.num_rows();
            }
        }

        assert_eq!(num_partitions, partitions_seen);
        assert_eq!(100, total_rows);
        Ok(())
    }
}