// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Execution plan for reading CSV files
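//!
//! A minimal end-to-end sketch (the directory path is illustrative; an async
//! context and `futures::StreamExt` are assumed to be in scope):
//!
//! ```ignore
//! let exec = CsvExec::try_new(
//!     "/path/to/csv_dir",    // directory (or single file) containing CSV data
//!     CsvReadOptions::new(), // defaults: header row present, ',' delimiter
//!     Some(vec![0, 2]),      // optional projection: read only columns 0 and 2
//!     1024,                  // batch size
//! )?;
//! let mut stream = exec.execute(0).await?;
//! while let Some(batch) = stream.next().await {
//!     println!("read a batch with {} rows", batch?.num_rows());
//! }
//! ```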
use std::any::Any;
use std::fs::File;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::error::{DataFusionError, Result};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::{common, Partitioning};
use arrow::csv;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use futures::Stream;
use super::{RecordBatchStream, SendableRecordBatchStream};
use async_trait::async_trait;
/// Options that control how CSV files are read
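///
/// A minimal sketch of the builder-style API (the tab delimiter and `.tsv`
/// extension are purely illustrative):
///
/// ```ignore
/// let options = CsvReadOptions::new()
///     .has_header(false)
///     .delimiter(b'\t')
///     .file_extension(".tsv")
///     .schema_infer_max_records(100);
/// ```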
#[derive(Copy, Clone)]
pub struct CsvReadOptions<'a> {
/// Does the CSV file have a header?
///
/// If schema inference is run on a file with no headers, default column names
/// are created.
pub has_header: bool,
/// The column delimiter. Defaults to `b','`.
pub delimiter: u8,
/// An optional schema representing the CSV files. If `None`, the CSV reader will try to infer it
/// based on the data in the files.
pub schema: Option<&'a Schema>,
/// Maximum number of records to read from CSV files for schema inference, if needed. Defaults to 1000.
pub schema_infer_max_records: usize,
/// File extension; only files with this extension are selected for data input.
/// Defaults to ".csv".
pub file_extension: &'a str,
}
impl<'a> CsvReadOptions<'a> {
/// Create CSV read options with default settings
pub fn new() -> Self {
Self {
has_header: true,
schema: None,
schema_infer_max_records: 1000,
delimiter: b',',
file_extension: ".csv",
}
}
/// Configure the `has_header` setting
pub fn has_header(mut self, has_header: bool) -> Self {
self.has_header = has_header;
self
}
/// Specify the delimiter to use when reading CSV
pub fn delimiter(mut self, delimiter: u8) -> Self {
self.delimiter = delimiter;
self
}
/// Specify the file extension for CSV file selection
pub fn file_extension(mut self, file_extension: &'a str) -> Self {
self.file_extension = file_extension;
self
}
/// Configure the delimiter from an `Option`; a `None` value is ignored and the current delimiter is kept
pub fn delimiter_option(mut self, delimiter: Option<u8>) -> Self {
if let Some(d) = delimiter {
self.delimiter = d;
}
self
}
/// Specify the schema to use when reading CSV
pub fn schema(mut self, schema: &'a Schema) -> Self {
self.schema = Some(schema);
self
}
/// Configure the maximum number of records to read for schema inference
pub fn schema_infer_max_records(mut self, max_records: usize) -> Self {
self.schema_infer_max_records = max_records;
self
}
}
/// Execution plan for scanning a set of CSV files that share the same schema
#[derive(Debug, Clone)]
pub struct CsvExec {
/// Path to directory containing partitioned CSV files with the same schema
path: String,
/// The individual files under path
filenames: Vec<String>,
/// Schema representing the CSV file
schema: SchemaRef,
/// Does the CSV file have a header?
has_header: bool,
/// An optional column delimiter. Defaults to `b','`
delimiter: Option<u8>,
/// File extension
file_extension: String,
/// Optional projection for which columns to load
projection: Option<Vec<usize>>,
/// Schema after the projection has been applied
projected_schema: SchemaRef,
/// Batch size
batch_size: usize,
}
impl CsvExec {
/// Create a new execution plan for reading a set of CSV files
pub fn try_new(
path: &str,
options: CsvReadOptions,
projection: Option<Vec<usize>>,
batch_size: usize,
) -> Result<Self> {
let file_extension = String::from(options.file_extension);
let mut filenames: Vec<String> = vec![];
common::build_file_list(path, &mut filenames, file_extension.as_str())?;
if filenames.is_empty() {
return Err(DataFusionError::Execution(format!(
"No files found at {path} with file extension {file_extension}",
path = path,
file_extension = file_extension.as_str()
)));
}
let schema = match options.schema {
Some(s) => s.clone(),
None => CsvExec::try_infer_schema(&filenames, &options)?,
};
let projected_schema = match &projection {
None => schema.clone(),
Some(p) => Schema::new(p.iter().map(|i| schema.field(*i).clone()).collect()),
};
Ok(Self {
path: path.to_string(),
filenames,
schema: Arc::new(schema),
has_header: options.has_header,
delimiter: Some(options.delimiter),
file_extension,
projection,
projected_schema: Arc::new(projected_schema),
batch_size,
})
}
/// Path to directory containing partitioned CSV files with the same schema
pub fn path(&self) -> &str {
&self.path
}
/// The individual files under path
pub fn filenames(&self) -> &[String] {
&self.filenames
}
/// Does the CSV file have a header?
pub fn has_header(&self) -> bool {
self.has_header
}
/// An optional column delimiter. Defaults to `b','`
pub fn delimiter(&self) -> Option<&u8> {
self.delimiter.as_ref()
}
/// File extension
pub fn file_extension(&self) -> &str {
&self.file_extension
}
/// Get the full schema of the CSV files, before any projection is applied
pub fn file_schema(&self) -> SchemaRef {
self.schema.clone()
}
/// Optional projection for which columns to load
pub fn projection(&self) -> Option<&Vec<usize>> {
self.projection.as_ref()
}
/// Batch size
pub fn batch_size(&self) -> usize {
self.batch_size
}
/// Infer the schema for the given CSV dataset
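///
/// A minimal sketch, assuming the listed files exist on disk:
///
/// ```ignore
/// let files = vec!["data/part-0.csv".to_string(), "data/part-1.csv".to_string()];
/// let schema = CsvExec::try_infer_schema(&files, &CsvReadOptions::new())?;
/// ```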
pub fn try_infer_schema(
filenames: &[String],
options: &CsvReadOptions,
) -> Result<Schema> {
Ok(csv::infer_schema_from_files(
filenames,
options.delimiter,
Some(options.schema_infer_max_records),
options.has_header,
)?)
}
}
#[async_trait]
impl ExecutionPlan for CsvExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}
/// Get the schema produced by this execution plan (i.e. after projection)
fn schema(&self) -> SchemaRef {
self.projected_schema.clone()
}
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
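// One partition per discovered file; each partition can be executed independently.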
Partitioning::UnknownPartitioning(self.filenames.len())
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
// this is a leaf node and has no children
vec![]
}
fn with_new_children(
&self,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
if children.is_empty() {
Ok(Arc::new(self.clone()))
} else {
Err(DataFusionError::Internal(format!(
"Children cannot be replaced in {:?}",
self
)))
}
}
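/// Execute the scan for one partition: open the CSV file that corresponds to
/// `partition` and return a stream of record batches read from it.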
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
Ok(Box::pin(CsvStream::try_new(
&self.filenames[partition],
self.schema.clone(),
self.has_header,
self.delimiter,
&self.projection,
self.batch_size,
)?))
}
}
/// Stream that yields record batches read from a CSV file
struct CsvStream {
/// Arrow CSV reader
reader: csv::Reader<File>,
}
impl CsvStream {
/// Create a stream that reads record batches from a CSV file
pub fn try_new(
filename: &str,
schema: SchemaRef,
has_header: bool,
delimiter: Option<u8>,
projection: &Option<Vec<usize>>,
batch_size: usize,
) -> Result<Self> {
let file = File::open(filename)?;
let reader = csv::Reader::new(
file,
schema,
has_header,
delimiter,
batch_size,
None,
projection.clone(),
);
Ok(Self { reader })
}
}
impl Stream for CsvStream {
type Item = ArrowResult<RecordBatch>;
fn poll_next(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
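// The underlying Arrow CSV reader is synchronous, so each poll performs a
// blocking read of the next batch and is immediately ready.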
Poll::Ready(self.reader.next())
}
}
impl RecordBatchStream for CsvStream {
/// Get the schema
fn schema(&self) -> SchemaRef {
self.reader.schema()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test::aggr_test_schema;
use futures::StreamExt;
#[tokio::test]
async fn csv_exec_with_projection() -> Result<()> {
let schema = aggr_test_schema();
let testdata = arrow::util::test_util::arrow_test_data();
let filename = "aggregate_test_100.csv";
let path = format!("{}/csv/{}", testdata, filename);
let csv = CsvExec::try_new(
&path,
CsvReadOptions::new().schema(&schema),
Some(vec![0, 2, 4]),
1024,
)?;
assert_eq!(13, csv.schema.fields().len());
assert_eq!(3, csv.projected_schema.fields().len());
assert_eq!(13, csv.file_schema().fields().len());
assert_eq!(3, csv.schema().fields().len());
let mut stream = csv.execute(0).await?;
let batch = stream.next().await.unwrap()?;
assert_eq!(3, batch.num_columns());
let batch_schema = batch.schema();
assert_eq!(3, batch_schema.fields().len());
assert_eq!("c1", batch_schema.field(0).name());
assert_eq!("c3", batch_schema.field(1).name());
assert_eq!("c5", batch_schema.field(2).name());
Ok(())
}
#[tokio::test]
async fn csv_exec_without_projection() -> Result<()> {
let schema = aggr_test_schema();
let testdata = arrow::util::test_util::arrow_test_data();
let filename = "aggregate_test_100.csv";
let path = format!("{}/csv/{}", testdata, filename);
let csv =
CsvExec::try_new(&path, CsvReadOptions::new().schema(&schema), None, 1024)?;
assert_eq!(13, csv.schema.fields().len());
assert_eq!(13, csv.projected_schema.fields().len());
assert_eq!(13, csv.file_schema().fields().len());
assert_eq!(13, csv.schema().fields().len());
let mut it = csv.execute(0).await?;
let batch = it.next().await.unwrap()?;
assert_eq!(13, batch.num_columns());
let batch_schema = batch.schema();
assert_eq!(13, batch_schema.fields().len());
assert_eq!("c1", batch_schema.field(0).name());
assert_eq!("c2", batch_schema.field(1).name());
assert_eq!("c3", batch_schema.field(2).name());
Ok(())
}
}