blob: bf8f781574c3f6d7d14fa39f0d1a32b0cd7aefa9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! CSV data source
//!
//! This CSV data source allows CSV files to be used as input for queries.
//!
//! Example:
//!
//! ```
//! use datafusion::datasource::TableProvider;
//! use datafusion::datasource::csv::{CsvFile, CsvReadOptions};
//!
//! let testdata = arrow::util::test_util::arrow_test_data();
//! let csvdata = CsvFile::try_new(
//! &format!("{}/csv/aggregate_test_100.csv", testdata),
//! CsvReadOptions::new().delimiter(b'|'),
//! ).unwrap();
//! let schema = csvdata.schema();
//! ```
use arrow::datatypes::SchemaRef;
use std::any::Any;
use std::string::String;
use std::sync::Arc;
use crate::datasource::datasource::Statistics;
use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
use crate::logical_plan::Expr;
use crate::physical_plan::csv::CsvExec;
pub use crate::physical_plan::csv::CsvReadOptions;
use crate::physical_plan::{common, ExecutionPlan};
/// Represents a CSV file with a provided schema
pub struct CsvFile {
/// Path to a single CSV file or a directory containing one of more CSV files
path: String,
schema: SchemaRef,
has_header: bool,
delimiter: u8,
file_extension: String,
statistics: Statistics,
}
impl CsvFile {
/// Attempt to initialize a new `CsvFile` from a file path
pub fn try_new(path: &str, options: CsvReadOptions) -> Result<Self> {
let schema = Arc::new(match options.schema {
Some(s) => s.clone(),
None => {
let mut filenames: Vec<String> = vec![];
common::build_file_list(path, &mut filenames, options.file_extension)?;
if filenames.is_empty() {
return Err(DataFusionError::Plan(format!(
"No files found at {path} with file extension {file_extension}",
path = path,
file_extension = options.file_extension
)));
}
CsvExec::try_infer_schema(&filenames, &options)?
}
});
Ok(Self {
path: String::from(path),
schema,
has_header: options.has_header,
delimiter: options.delimiter,
file_extension: String::from(options.file_extension),
statistics: Statistics::default(),
})
}
/// Get the path for the CSV file(s) represented by this CsvFile instance
pub fn path(&self) -> &str {
&self.path
}
/// Determine whether the CSV file(s) represented by this CsvFile instance have a header row
pub fn has_header(&self) -> bool {
self.has_header
}
/// Get the delimiter for the CSV file(s) represented by this CsvFile instance
pub fn delimiter(&self) -> u8 {
self.delimiter
}
/// Get the file extension for the CSV file(s) represented by this CsvFile instance
pub fn file_extension(&self) -> &str {
&self.file_extension
}
}
impl TableProvider for CsvFile {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn scan(
&self,
projection: &Option<Vec<usize>>,
batch_size: usize,
_filters: &[Expr],
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(CsvExec::try_new(
&self.path,
CsvReadOptions::new()
.schema(&self.schema)
.has_header(self.has_header)
.delimiter(self.delimiter)
.file_extension(self.file_extension.as_str()),
projection.clone(),
batch_size,
)?))
}
fn statistics(&self) -> Statistics {
self.statistics.clone()
}
}