| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! CSV Reader |
| //! |
| //! This CSV reader allows CSV files to be read into the Arrow memory model. Records are |
| //! loaded in batches and are then converted from row-based data to columnar data. |
| //! |
| //! Example: |
| //! |
| //! ``` |
| //! use arrow::csv; |
| //! use arrow::datatypes::{DataType, Field, Schema}; |
| //! use std::fs::File; |
| //! use std::sync::Arc; |
| //! |
| //! let schema = Schema::new(vec![ |
| //! Field::new("city", DataType::Utf8, false), |
| //! Field::new("lat", DataType::Float64, false), |
| //! Field::new("lng", DataType::Float64, false), |
| //! ]); |
| //! |
| //! let file = File::open("test/data/uk_cities.csv").unwrap(); |
| //! |
| //! let mut csv = csv::Reader::new(file, Arc::new(schema), false, None, 1024, None, None); |
| //! let batch = csv.next().unwrap().unwrap(); |
| //! ``` |
| |
| use core::cmp::min; |
| use lazy_static::lazy_static; |
| use regex::{Regex, RegexBuilder}; |
| use std::collections::HashSet; |
| use std::fmt; |
| use std::fs::File; |
| use std::io::{Read, Seek, SeekFrom}; |
| use std::sync::Arc; |
| |
| use crate::array::{ |
| ArrayRef, BooleanArray, DictionaryArray, PrimitiveArray, StringArray, |
| }; |
| use crate::datatypes::*; |
| use crate::error::{ArrowError, Result}; |
| use crate::record_batch::RecordBatch; |
| |
| use csv_crate::{ByteRecord, StringRecord}; |
| |
| lazy_static! { |
| static ref DECIMAL_RE: Regex = Regex::new(r"^-?(\d+\.\d+)$").unwrap(); |
| static ref INTEGER_RE: Regex = Regex::new(r"^-?(\d+)$").unwrap(); |
| static ref BOOLEAN_RE: Regex = RegexBuilder::new(r"^(true)$|^(false)$") |
| .case_insensitive(true) |
| .build() |
| .unwrap(); |
| static ref DATE_RE: Regex = Regex::new(r"^\d{4}-\d\d-\d\d$").unwrap(); |
| static ref DATETIME_RE: Regex = |
| Regex::new(r"^\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d$").unwrap(); |
| } |
| |
| /// Infer the data type of a record |
| fn infer_field_schema(string: &str) -> DataType { |
| // when quoting is enabled in the reader, these quotes aren't escaped, we default to |
| // Utf8 for them |
| if string.starts_with('"') { |
| return DataType::Utf8; |
| } |
| // match regex in a particular order |
| if BOOLEAN_RE.is_match(string) { |
| DataType::Boolean |
| } else if DECIMAL_RE.is_match(string) { |
| DataType::Float64 |
| } else if INTEGER_RE.is_match(string) { |
| DataType::Int64 |
| } else if DATETIME_RE.is_match(string) { |
| DataType::Date64 |
| } else if DATE_RE.is_match(string) { |
| DataType::Date32 |
| } else { |
| DataType::Utf8 |
| } |
| } |
| |
| /// Infer the schema of a CSV file by reading through the first n records of the file, |
| /// with `max_read_records` controlling the maximum number of records to read. |
| /// |
| /// If `max_read_records` is not set, the whole file is read to infer its schema. |
| /// |
| /// Return infered schema and number of records used for inference. This function does not change |
| /// reader cursor offset. |
| pub fn infer_file_schema<R: Read + Seek>( |
| reader: &mut R, |
| delimiter: u8, |
| max_read_records: Option<usize>, |
| has_header: bool, |
| ) -> Result<(Schema, usize)> { |
| infer_file_schema_with_csv_options( |
| reader, |
| delimiter, |
| max_read_records, |
| has_header, |
| None, |
| None, |
| None, |
| ) |
| } |
| |
| fn infer_file_schema_with_csv_options<R: Read + Seek>( |
| reader: &mut R, |
| delimiter: u8, |
| max_read_records: Option<usize>, |
| has_header: bool, |
| escape: Option<u8>, |
| quote: Option<u8>, |
| terminator: Option<u8>, |
| ) -> Result<(Schema, usize)> { |
| let saved_offset = reader.seek(SeekFrom::Current(0))?; |
| |
| let (schema, records_count) = infer_reader_schema_with_csv_options( |
| reader, |
| delimiter, |
| max_read_records, |
| has_header, |
| escape, |
| quote, |
| terminator, |
| )?; |
| |
| // return the reader seek back to the start |
| reader.seek(SeekFrom::Start(saved_offset))?; |
| |
| Ok((schema, records_count)) |
| } |
| |
| /// Infer schema of CSV records provided by struct that implements `Read` trait. |
| /// |
| /// `max_read_records` controlling the maximum number of records to read. If `max_read_records` is |
| /// not set, all records are read to infer the schema. |
| /// |
| /// Return infered schema and number of records used for inference. |
| pub fn infer_reader_schema<R: Read>( |
| reader: &mut R, |
| delimiter: u8, |
| max_read_records: Option<usize>, |
| has_header: bool, |
| ) -> Result<(Schema, usize)> { |
| infer_reader_schema_with_csv_options( |
| reader, |
| delimiter, |
| max_read_records, |
| has_header, |
| None, |
| None, |
| None, |
| ) |
| } |
| |
| fn infer_reader_schema_with_csv_options<R: Read>( |
| reader: &mut R, |
| delimiter: u8, |
| max_read_records: Option<usize>, |
| has_header: bool, |
| escape: Option<u8>, |
| quote: Option<u8>, |
| terminator: Option<u8>, |
| ) -> Result<(Schema, usize)> { |
| let mut csv_reader = Reader::build_csv_reader( |
| reader, |
| has_header, |
| Some(delimiter), |
| escape, |
| quote, |
| terminator, |
| ); |
| |
| // get or create header names |
| // when has_header is false, creates default column names with column_ prefix |
| let headers: Vec<String> = if has_header { |
| let headers = &csv_reader.headers()?.clone(); |
| headers.iter().map(|s| s.to_string()).collect() |
| } else { |
| let first_record_count = &csv_reader.headers()?.len(); |
| (0..*first_record_count) |
| .map(|i| format!("column_{}", i + 1)) |
| .collect() |
| }; |
| |
| let header_length = headers.len(); |
| // keep track of inferred field types |
| let mut column_types: Vec<HashSet<DataType>> = vec![HashSet::new(); header_length]; |
| // keep track of columns with nulls |
| let mut nulls: Vec<bool> = vec![false; header_length]; |
| |
| let mut records_count = 0; |
| let mut fields = vec![]; |
| |
| let mut record = StringRecord::new(); |
| let max_records = max_read_records.unwrap_or(usize::MAX); |
| while records_count < max_records { |
| if !csv_reader.read_record(&mut record)? { |
| break; |
| } |
| records_count += 1; |
| |
| for i in 0..header_length { |
| if let Some(string) = record.get(i) { |
| if string.is_empty() { |
| nulls[i] = true; |
| } else { |
| column_types[i].insert(infer_field_schema(string)); |
| } |
| } |
| } |
| } |
| |
| // build schema from inference results |
| for i in 0..header_length { |
| let possibilities = &column_types[i]; |
| let has_nulls = nulls[i]; |
| let field_name = &headers[i]; |
| |
| // determine data type based on possible types |
| // if there are incompatible types, use DataType::Utf8 |
| match possibilities.len() { |
| 1 => { |
| for dtype in possibilities.iter() { |
| fields.push(Field::new(&field_name, dtype.clone(), has_nulls)); |
| } |
| } |
| 2 => { |
| if possibilities.contains(&DataType::Int64) |
| && possibilities.contains(&DataType::Float64) |
| { |
| // we have an integer and double, fall down to double |
| fields.push(Field::new(&field_name, DataType::Float64, has_nulls)); |
| } else { |
| // default to Utf8 for conflicting datatypes (e.g bool and int) |
| fields.push(Field::new(&field_name, DataType::Utf8, has_nulls)); |
| } |
| } |
| _ => fields.push(Field::new(&field_name, DataType::Utf8, has_nulls)), |
| } |
| } |
| |
| Ok((Schema::new(fields), records_count)) |
| } |
| |
| /// Infer schema from a list of CSV files by reading through first n records |
| /// with `max_read_records` controlling the maximum number of records to read. |
| /// |
| /// Files will be read in the given order untill n records have been reached. |
| /// |
| /// If `max_read_records` is not set, all files will be read fully to infer the schema. |
| pub fn infer_schema_from_files( |
| files: &[String], |
| delimiter: u8, |
| max_read_records: Option<usize>, |
| has_header: bool, |
| ) -> Result<Schema> { |
| let mut schemas = vec![]; |
| let mut records_to_read = max_read_records.unwrap_or(std::usize::MAX); |
| |
| for fname in files.iter() { |
| let (schema, records_read) = infer_file_schema( |
| &mut File::open(fname)?, |
| delimiter, |
| Some(records_to_read), |
| has_header, |
| )?; |
| if records_read == 0 { |
| continue; |
| } |
| schemas.push(schema.clone()); |
| records_to_read -= records_read; |
| if records_to_read == 0 { |
| break; |
| } |
| } |
| |
| Schema::try_merge(schemas) |
| } |
| |
| // optional bounds of the reader, of the form (min line, max line). |
| type Bounds = Option<(usize, usize)>; |
| |
| /// CSV file reader |
| pub struct Reader<R: Read> { |
| /// Explicit schema for the CSV file |
| schema: SchemaRef, |
| /// Optional projection for which columns to load (zero-based column indices) |
| projection: Option<Vec<usize>>, |
| /// File reader |
| reader: csv_crate::Reader<R>, |
| /// Current line number |
| line_number: usize, |
| /// Maximum number of rows to read |
| end: usize, |
| /// Number of records per batch |
| batch_size: usize, |
| /// Vector that can hold the `StringRecord`s of the batches |
| batch_records: Vec<StringRecord>, |
| } |
| |
| impl<R> fmt::Debug for Reader<R> |
| where |
| R: Read, |
| { |
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| f.debug_struct("Reader") |
| .field("schema", &self.schema) |
| .field("projection", &self.projection) |
| .field("line_number", &self.line_number) |
| .finish() |
| } |
| } |
| |
| impl<R: Read> Reader<R> { |
| /// Create a new CsvReader from any value that implements the `Read` trait. |
| /// |
| /// If reading a `File` or an input that supports `std::io::Read` and `std::io::Seek`; |
| /// you can customise the Reader, such as to enable schema inference, use |
| /// `ReaderBuilder`. |
| pub fn new( |
| reader: R, |
| schema: SchemaRef, |
| has_header: bool, |
| delimiter: Option<u8>, |
| batch_size: usize, |
| bounds: Bounds, |
| projection: Option<Vec<usize>>, |
| ) -> Self { |
| Self::from_reader( |
| reader, schema, has_header, delimiter, batch_size, bounds, projection, |
| ) |
| } |
| |
| /// Returns the schema of the reader, useful for getting the schema without reading |
| /// record batches |
| pub fn schema(&self) -> SchemaRef { |
| match &self.projection { |
| Some(projection) => { |
| let fields = self.schema.fields(); |
| let projected_fields: Vec<Field> = |
| projection.iter().map(|i| fields[*i].clone()).collect(); |
| |
| Arc::new(Schema::new(projected_fields)) |
| } |
| None => self.schema.clone(), |
| } |
| } |
| |
| /// Create a new CsvReader from a Reader |
| /// |
| /// This constructor allows you more flexibility in what records are processed by the |
| /// csv reader. |
| pub fn from_reader( |
| reader: R, |
| schema: SchemaRef, |
| has_header: bool, |
| delimiter: Option<u8>, |
| batch_size: usize, |
| bounds: Bounds, |
| projection: Option<Vec<usize>>, |
| ) -> Self { |
| let csv_reader = |
| Self::build_csv_reader(reader, has_header, delimiter, None, None, None); |
| Self::from_csv_reader( |
| csv_reader, schema, has_header, batch_size, bounds, projection, |
| ) |
| } |
| |
| fn build_csv_reader( |
| reader: R, |
| has_header: bool, |
| delimiter: Option<u8>, |
| escape: Option<u8>, |
| quote: Option<u8>, |
| terminator: Option<u8>, |
| ) -> csv_crate::Reader<R> { |
| let mut reader_builder = csv_crate::ReaderBuilder::new(); |
| reader_builder.has_headers(has_header); |
| |
| if let Some(c) = delimiter { |
| reader_builder.delimiter(c); |
| } |
| reader_builder.escape(escape); |
| if let Some(c) = quote { |
| reader_builder.quote(c); |
| } |
| if let Some(t) = terminator { |
| reader_builder.terminator(csv_crate::Terminator::Any(t)); |
| } |
| reader_builder.from_reader(reader) |
| } |
| |
| fn from_csv_reader( |
| mut csv_reader: csv_crate::Reader<R>, |
| schema: SchemaRef, |
| has_header: bool, |
| batch_size: usize, |
| bounds: Bounds, |
| projection: Option<Vec<usize>>, |
| ) -> Self { |
| let (start, end) = match bounds { |
| None => (0, usize::MAX), |
| Some((start, end)) => (start, end), |
| }; |
| |
| // First we will skip `start` rows |
| // note that this skips by iteration. This is because in general it is not possible |
| // to seek in CSV. However, skiping still saves the burden of creating arrow arrays, |
| // which is a slow operation that scales with the number of columns |
| |
| let mut record = ByteRecord::new(); |
| // Skip first start items |
| for _ in 0..start { |
| let res = csv_reader.read_byte_record(&mut record); |
| if !res.unwrap_or(false) { |
| break; |
| } |
| } |
| |
| // Initialize batch_records with StringRecords so they |
| // can be reused across batches |
| let mut batch_records = Vec::with_capacity(batch_size); |
| batch_records.resize_with(batch_size, Default::default); |
| |
| Self { |
| schema, |
| projection, |
| reader: csv_reader, |
| line_number: if has_header { start + 1 } else { start }, |
| batch_size, |
| end, |
| batch_records, |
| } |
| } |
| } |
| |
| impl<R: Read> Iterator for Reader<R> { |
| type Item = Result<RecordBatch>; |
| |
| fn next(&mut self) -> Option<Self::Item> { |
| let remaining = self.end - self.line_number; |
| |
| let mut read_records = 0; |
| for i in 0..min(self.batch_size, remaining) { |
| match self.reader.read_record(&mut self.batch_records[i]) { |
| Ok(true) => { |
| read_records += 1; |
| } |
| Ok(false) => break, |
| Err(e) => { |
| return Some(Err(ArrowError::ParseError(format!( |
| "Error parsing line {}: {:?}", |
| self.line_number + i, |
| e |
| )))); |
| } |
| } |
| } |
| |
| // return early if no data was loaded |
| if read_records == 0 { |
| return None; |
| } |
| |
| // parse the batches into a RecordBatch |
| let result = parse( |
| &self.batch_records[..read_records], |
| &self.schema.fields(), |
| Some(self.schema.metadata.clone()), |
| &self.projection, |
| self.line_number, |
| ); |
| |
| self.line_number += read_records; |
| |
| Some(result) |
| } |
| } |
| |
| /// parses a slice of [csv_crate::StringRecord] into a [array::record_batch::RecordBatch]. |
| fn parse( |
| rows: &[StringRecord], |
| fields: &[Field], |
| metadata: Option<std::collections::HashMap<String, String>>, |
| projection: &Option<Vec<usize>>, |
| line_number: usize, |
| ) -> Result<RecordBatch> { |
| let projection: Vec<usize> = match projection { |
| Some(ref v) => v.clone(), |
| None => fields.iter().enumerate().map(|(i, _)| i).collect(), |
| }; |
| |
| let arrays: Result<Vec<ArrayRef>> = projection |
| .iter() |
| .map(|i| { |
| let i = *i; |
| let field = &fields[i]; |
| match field.data_type() { |
| DataType::Boolean => build_boolean_array(line_number, rows, i), |
| DataType::Int8 => build_primitive_array::<Int8Type>(line_number, rows, i), |
| DataType::Int16 => { |
| build_primitive_array::<Int16Type>(line_number, rows, i) |
| } |
| DataType::Int32 => { |
| build_primitive_array::<Int32Type>(line_number, rows, i) |
| } |
| DataType::Int64 => { |
| build_primitive_array::<Int64Type>(line_number, rows, i) |
| } |
| DataType::UInt8 => { |
| build_primitive_array::<UInt8Type>(line_number, rows, i) |
| } |
| DataType::UInt16 => { |
| build_primitive_array::<UInt16Type>(line_number, rows, i) |
| } |
| DataType::UInt32 => { |
| build_primitive_array::<UInt32Type>(line_number, rows, i) |
| } |
| DataType::UInt64 => { |
| build_primitive_array::<UInt64Type>(line_number, rows, i) |
| } |
| DataType::Float32 => { |
| build_primitive_array::<Float32Type>(line_number, rows, i) |
| } |
| DataType::Float64 => { |
| build_primitive_array::<Float64Type>(line_number, rows, i) |
| } |
| DataType::Date32 => { |
| build_primitive_array::<Date32Type>(line_number, rows, i) |
| } |
| DataType::Date64 => { |
| build_primitive_array::<Date64Type>(line_number, rows, i) |
| } |
| DataType::Timestamp(TimeUnit::Microsecond, _) => build_primitive_array::< |
| TimestampMicrosecondType, |
| >( |
| line_number, rows, i |
| ), |
| DataType::Timestamp(TimeUnit::Nanosecond, _) => { |
| build_primitive_array::<TimestampNanosecondType>(line_number, rows, i) |
| } |
| DataType::Utf8 => Ok(Arc::new( |
| rows.iter().map(|row| row.get(i)).collect::<StringArray>(), |
| ) as ArrayRef), |
| DataType::Dictionary(key_type, value_type) |
| if value_type.as_ref() == &DataType::Utf8 => |
| { |
| match key_type.as_ref() { |
| DataType::Int8 => Ok(Arc::new( |
| rows.iter() |
| .map(|row| row.get(i)) |
| .collect::<DictionaryArray<Int8Type>>(), |
| ) as ArrayRef), |
| DataType::Int16 => Ok(Arc::new( |
| rows.iter() |
| .map(|row| row.get(i)) |
| .collect::<DictionaryArray<Int16Type>>(), |
| ) as ArrayRef), |
| DataType::Int32 => Ok(Arc::new( |
| rows.iter() |
| .map(|row| row.get(i)) |
| .collect::<DictionaryArray<Int32Type>>(), |
| ) as ArrayRef), |
| DataType::Int64 => Ok(Arc::new( |
| rows.iter() |
| .map(|row| row.get(i)) |
| .collect::<DictionaryArray<Int64Type>>(), |
| ) as ArrayRef), |
| DataType::UInt8 => Ok(Arc::new( |
| rows.iter() |
| .map(|row| row.get(i)) |
| .collect::<DictionaryArray<UInt8Type>>(), |
| ) as ArrayRef), |
| DataType::UInt16 => Ok(Arc::new( |
| rows.iter() |
| .map(|row| row.get(i)) |
| .collect::<DictionaryArray<UInt16Type>>(), |
| ) as ArrayRef), |
| DataType::UInt32 => Ok(Arc::new( |
| rows.iter() |
| .map(|row| row.get(i)) |
| .collect::<DictionaryArray<UInt32Type>>(), |
| ) as ArrayRef), |
| DataType::UInt64 => Ok(Arc::new( |
| rows.iter() |
| .map(|row| row.get(i)) |
| .collect::<DictionaryArray<UInt64Type>>(), |
| ) as ArrayRef), |
| _ => Err(ArrowError::ParseError(format!( |
| "Unsupported dictionary key type {:?}", |
| key_type |
| ))), |
| } |
| } |
| other => Err(ArrowError::ParseError(format!( |
| "Unsupported data type {:?}", |
| other |
| ))), |
| } |
| }) |
| .collect(); |
| |
| let projected_fields: Vec<Field> = |
| projection.iter().map(|i| fields[*i].clone()).collect(); |
| |
| let projected_schema = Arc::new(match metadata { |
| None => Schema::new(projected_fields), |
| Some(metadata) => Schema::new_with_metadata(projected_fields, metadata), |
| }); |
| |
| arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr)) |
| } |
| |
| /// Specialized parsing implementations |
| trait Parser: ArrowPrimitiveType { |
| fn parse(string: &str) -> Option<Self::Native> { |
| string.parse::<Self::Native>().ok() |
| } |
| } |
| |
| impl Parser for Float32Type { |
| fn parse(string: &str) -> Option<f32> { |
| lexical_core::parse(string.as_bytes()).ok() |
| } |
| } |
| |
| impl Parser for Float64Type { |
| fn parse(string: &str) -> Option<f64> { |
| lexical_core::parse(string.as_bytes()).ok() |
| } |
| } |
| |
| impl Parser for UInt64Type {} |
| |
| impl Parser for UInt32Type {} |
| |
| impl Parser for UInt16Type {} |
| |
| impl Parser for UInt8Type {} |
| |
| impl Parser for Int64Type {} |
| |
| impl Parser for Int32Type {} |
| |
| impl Parser for Int16Type {} |
| |
| impl Parser for Int8Type {} |
| |
| /// Number of days between 0001-01-01 and 1970-01-01 |
| const EPOCH_DAYS_FROM_CE: i32 = 719_163; |
| |
| impl Parser for Date32Type { |
| fn parse(string: &str) -> Option<i32> { |
| use chrono::Datelike; |
| |
| match Self::DATA_TYPE { |
| DataType::Date32 => { |
| let date = string.parse::<chrono::NaiveDate>().ok()?; |
| Self::Native::from_i32(date.num_days_from_ce() - EPOCH_DAYS_FROM_CE) |
| } |
| _ => None, |
| } |
| } |
| } |
| |
| impl Parser for Date64Type { |
| fn parse(string: &str) -> Option<i64> { |
| match Self::DATA_TYPE { |
| DataType::Date64 => { |
| let date_time = string.parse::<chrono::NaiveDateTime>().ok()?; |
| Self::Native::from_i64(date_time.timestamp_millis()) |
| } |
| _ => None, |
| } |
| } |
| } |
| |
| impl Parser for TimestampNanosecondType { |
| fn parse(string: &str) -> Option<i64> { |
| match Self::DATA_TYPE { |
| DataType::Timestamp(TimeUnit::Nanosecond, None) => { |
| let date_time = string.parse::<chrono::NaiveDateTime>().ok()?; |
| Self::Native::from_i64(date_time.timestamp_nanos()) |
| } |
| _ => None, |
| } |
| } |
| } |
| |
| impl Parser for TimestampMicrosecondType { |
| fn parse(string: &str) -> Option<i64> { |
| match Self::DATA_TYPE { |
| DataType::Timestamp(TimeUnit::Microsecond, None) => { |
| let date_time = string.parse::<chrono::NaiveDateTime>().ok()?; |
| Self::Native::from_i64(date_time.timestamp_nanos() / 1000) |
| } |
| _ => None, |
| } |
| } |
| } |
| |
| fn parse_item<T: Parser>(string: &str) -> Option<T::Native> { |
| T::parse(string) |
| } |
| |
| fn parse_bool(string: &str) -> Option<bool> { |
| if string.eq_ignore_ascii_case("false") { |
| Some(false) |
| } else if string.eq_ignore_ascii_case("true") { |
| Some(true) |
| } else { |
| None |
| } |
| } |
| |
| // parses a specific column (col_idx) into an Arrow Array. |
| fn build_primitive_array<T: ArrowPrimitiveType + Parser>( |
| line_number: usize, |
| rows: &[StringRecord], |
| col_idx: usize, |
| ) -> Result<ArrayRef> { |
| rows.iter() |
| .enumerate() |
| .map(|(row_index, row)| { |
| match row.get(col_idx) { |
| Some(s) => { |
| if s.is_empty() { |
| return Ok(None); |
| } |
| |
| let parsed = parse_item::<T>(s); |
| match parsed { |
| Some(e) => Ok(Some(e)), |
| None => Err(ArrowError::ParseError(format!( |
| // TODO: we should surface the underlying error here. |
| "Error while parsing value {} for column {} at line {}", |
| s, |
| col_idx, |
| line_number + row_index |
| ))), |
| } |
| } |
| None => Ok(None), |
| } |
| }) |
| .collect::<Result<PrimitiveArray<T>>>() |
| .map(|e| Arc::new(e) as ArrayRef) |
| } |
| |
| // parses a specific column (col_idx) into an Arrow Array. |
| fn build_boolean_array( |
| line_number: usize, |
| rows: &[StringRecord], |
| col_idx: usize, |
| ) -> Result<ArrayRef> { |
| rows.iter() |
| .enumerate() |
| .map(|(row_index, row)| { |
| match row.get(col_idx) { |
| Some(s) => { |
| if s.is_empty() { |
| return Ok(None); |
| } |
| |
| let parsed = parse_bool(s); |
| match parsed { |
| Some(e) => Ok(Some(e)), |
| None => Err(ArrowError::ParseError(format!( |
| // TODO: we should surface the underlying error here. |
| "Error while parsing value {} for column {} at line {}", |
| s, |
| col_idx, |
| line_number + row_index |
| ))), |
| } |
| } |
| None => Ok(None), |
| } |
| }) |
| .collect::<Result<BooleanArray>>() |
| .map(|e| Arc::new(e) as ArrayRef) |
| } |
| |
| /// CSV file reader builder |
| #[derive(Debug)] |
| pub struct ReaderBuilder { |
| /// Optional schema for the CSV file |
| /// |
| /// If the schema is not supplied, the reader will try to infer the schema |
| /// based on the CSV structure. |
| schema: Option<SchemaRef>, |
| /// Whether the file has headers or not |
| /// |
| /// If schema inference is run on a file with no headers, default column names |
| /// are created. |
| has_header: bool, |
| /// An optional column delimiter. Defaults to `b','` |
| delimiter: Option<u8>, |
| /// An optional escape charactor. Defaults None |
| escape: Option<u8>, |
| /// An optional quote charactor. Defaults b'\"' |
| quote: Option<u8>, |
| /// An optional record terminator. Defaults CRLF |
| terminator: Option<u8>, |
| /// Optional maximum number of records to read during schema inference |
| /// |
| /// If a number is not provided, all the records are read. |
| max_records: Option<usize>, |
| /// Batch size (number of records to load each time) |
| /// |
| /// The default batch size when using the `ReaderBuilder` is 1024 records |
| batch_size: usize, |
| /// The bounds over which to scan the reader. `None` starts from 0 and runs until EOF. |
| bounds: Bounds, |
| /// Optional projection for which columns to load (zero-based column indices) |
| projection: Option<Vec<usize>>, |
| } |
| |
| impl Default for ReaderBuilder { |
| fn default() -> Self { |
| Self { |
| schema: None, |
| has_header: false, |
| delimiter: None, |
| escape: None, |
| quote: None, |
| terminator: None, |
| max_records: None, |
| batch_size: 1024, |
| bounds: None, |
| projection: None, |
| } |
| } |
| } |
| |
| impl ReaderBuilder { |
| /// Create a new builder for configuring CSV parsing options. |
| /// |
| /// To convert a builder into a reader, call `ReaderBuilder::build` |
| /// |
| /// # Example |
| /// |
| /// ``` |
| /// extern crate arrow; |
| /// |
| /// use arrow::csv; |
| /// use std::fs::File; |
| /// |
| /// fn example() -> csv::Reader<File> { |
| /// let file = File::open("test/data/uk_cities_with_headers.csv").unwrap(); |
| /// |
| /// // create a builder, inferring the schema with the first 100 records |
| /// let builder = csv::ReaderBuilder::new().infer_schema(Some(100)); |
| /// |
| /// let reader = builder.build(file).unwrap(); |
| /// |
| /// reader |
| /// } |
| /// ``` |
| pub fn new() -> ReaderBuilder { |
| ReaderBuilder::default() |
| } |
| |
| /// Set the CSV file's schema |
| pub fn with_schema(mut self, schema: SchemaRef) -> Self { |
| self.schema = Some(schema); |
| self |
| } |
| |
| /// Set whether the CSV file has headers |
| pub fn has_header(mut self, has_header: bool) -> Self { |
| self.has_header = has_header; |
| self |
| } |
| |
| /// Set the CSV file's column delimiter as a byte character |
| pub fn with_delimiter(mut self, delimiter: u8) -> Self { |
| self.delimiter = Some(delimiter); |
| self |
| } |
| |
| pub fn with_escape(mut self, escape: u8) -> Self { |
| self.escape = Some(escape); |
| self |
| } |
| |
| pub fn with_quote(mut self, quote: u8) -> Self { |
| self.quote = Some(quote); |
| self |
| } |
| |
| pub fn with_terminator(mut self, terminator: u8) -> Self { |
| self.terminator = Some(terminator); |
| self |
| } |
| |
| /// Set the CSV reader to infer the schema of the file |
| pub fn infer_schema(mut self, max_records: Option<usize>) -> Self { |
| // remove any schema that is set |
| self.schema = None; |
| self.max_records = max_records; |
| self |
| } |
| |
| /// Set the batch size (number of records to load at one time) |
| pub fn with_batch_size(mut self, batch_size: usize) -> Self { |
| self.batch_size = batch_size; |
| self |
| } |
| |
| /// Set the reader's column projection |
| pub fn with_projection(mut self, projection: Vec<usize>) -> Self { |
| self.projection = Some(projection); |
| self |
| } |
| |
| /// Create a new `Reader` from the `ReaderBuilder` |
| pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<Reader<R>> { |
| // check if schema should be inferred |
| let delimiter = self.delimiter.unwrap_or(b','); |
| let schema = match self.schema { |
| Some(schema) => schema, |
| None => { |
| let (inferred_schema, _) = infer_file_schema_with_csv_options( |
| &mut reader, |
| delimiter, |
| self.max_records, |
| self.has_header, |
| self.escape, |
| self.quote, |
| self.terminator, |
| )?; |
| |
| Arc::new(inferred_schema) |
| } |
| }; |
| let csv_reader = Reader::build_csv_reader( |
| reader, |
| self.has_header, |
| self.delimiter, |
| self.escape, |
| self.quote, |
| self.terminator, |
| ); |
| Ok(Reader::from_csv_reader( |
| csv_reader, |
| schema, |
| self.has_header, |
| self.batch_size, |
| None, |
| self.projection.clone(), |
| )) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| |
| use std::fs::File; |
| use std::io::{Cursor, Write}; |
| use tempfile::NamedTempFile; |
| |
| use crate::array::*; |
| use crate::compute::cast; |
| use crate::datatypes::Field; |
| |
| #[test] |
| fn test_csv() { |
| let schema = Schema::new(vec![ |
| Field::new("city", DataType::Utf8, false), |
| Field::new("lat", DataType::Float64, false), |
| Field::new("lng", DataType::Float64, false), |
| ]); |
| |
| let file = File::open("test/data/uk_cities.csv").unwrap(); |
| |
| let mut csv = Reader::new( |
| file, |
| Arc::new(schema.clone()), |
| false, |
| None, |
| 1024, |
| None, |
| None, |
| ); |
| assert_eq!(Arc::new(schema), csv.schema()); |
| let batch = csv.next().unwrap().unwrap(); |
| assert_eq!(37, batch.num_rows()); |
| assert_eq!(3, batch.num_columns()); |
| |
| // access data from a primitive array |
| let lat = batch |
| .column(1) |
| .as_any() |
| .downcast_ref::<Float64Array>() |
| .unwrap(); |
| assert!(57.653484 - lat.value(0) < f64::EPSILON); |
| |
| // access data from a string array (ListArray<u8>) |
| let city = batch |
| .column(0) |
| .as_any() |
| .downcast_ref::<StringArray>() |
| .unwrap(); |
| |
| assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13)); |
| } |
| |
| #[test] |
| fn test_csv_schema_metadata() { |
| let mut metadata = std::collections::HashMap::new(); |
| metadata.insert("foo".to_owned(), "bar".to_owned()); |
| let schema = Schema::new_with_metadata( |
| vec![ |
| Field::new("city", DataType::Utf8, false), |
| Field::new("lat", DataType::Float64, false), |
| Field::new("lng", DataType::Float64, false), |
| ], |
| metadata.clone(), |
| ); |
| |
| let file = File::open("test/data/uk_cities.csv").unwrap(); |
| |
| let mut csv = Reader::new( |
| file, |
| Arc::new(schema.clone()), |
| false, |
| None, |
| 1024, |
| None, |
| None, |
| ); |
| assert_eq!(Arc::new(schema), csv.schema()); |
| let batch = csv.next().unwrap().unwrap(); |
| assert_eq!(37, batch.num_rows()); |
| assert_eq!(3, batch.num_columns()); |
| |
| assert_eq!(&metadata, batch.schema().metadata()); |
| } |
| |
| #[test] |
| fn test_csv_from_buf_reader() { |
| let schema = Schema::new(vec![ |
| Field::new("city", DataType::Utf8, false), |
| Field::new("lat", DataType::Float64, false), |
| Field::new("lng", DataType::Float64, false), |
| ]); |
| |
| let file_with_headers = |
| File::open("test/data/uk_cities_with_headers.csv").unwrap(); |
| let file_without_headers = File::open("test/data/uk_cities.csv").unwrap(); |
| let both_files = file_with_headers |
| .chain(Cursor::new("\n".to_string())) |
| .chain(file_without_headers); |
| let mut csv = Reader::from_reader( |
| both_files, |
| Arc::new(schema), |
| true, |
| None, |
| 1024, |
| None, |
| None, |
| ); |
| let batch = csv.next().unwrap().unwrap(); |
| assert_eq!(74, batch.num_rows()); |
| assert_eq!(3, batch.num_columns()); |
| } |
| |
| #[test] |
| fn test_csv_with_schema_inference() { |
| let file = File::open("test/data/uk_cities_with_headers.csv").unwrap(); |
| |
| let builder = ReaderBuilder::new().has_header(true).infer_schema(None); |
| |
| let mut csv = builder.build(file).unwrap(); |
| let expected_schema = Schema::new(vec![ |
| Field::new("city", DataType::Utf8, false), |
| Field::new("lat", DataType::Float64, false), |
| Field::new("lng", DataType::Float64, false), |
| ]); |
| assert_eq!(Arc::new(expected_schema), csv.schema()); |
| let batch = csv.next().unwrap().unwrap(); |
| assert_eq!(37, batch.num_rows()); |
| assert_eq!(3, batch.num_columns()); |
| |
| // access data from a primitive array |
| let lat = batch |
| .column(1) |
| .as_any() |
| .downcast_ref::<Float64Array>() |
| .unwrap(); |
| assert!(57.653484 - lat.value(0) < f64::EPSILON); |
| |
| // access data from a string array (ListArray<u8>) |
| let city = batch |
| .column(0) |
| .as_any() |
| .downcast_ref::<StringArray>() |
| .unwrap(); |
| |
| assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13)); |
| } |
| |
| #[test] |
| fn test_csv_with_schema_inference_no_headers() { |
| let file = File::open("test/data/uk_cities.csv").unwrap(); |
| |
| let builder = ReaderBuilder::new().infer_schema(None); |
| |
| let mut csv = builder.build(file).unwrap(); |
| |
| // csv field names should be 'column_{number}' |
| let schema = csv.schema(); |
| assert_eq!("column_1", schema.field(0).name()); |
| assert_eq!("column_2", schema.field(1).name()); |
| assert_eq!("column_3", schema.field(2).name()); |
| let batch = csv.next().unwrap().unwrap(); |
| let batch_schema = batch.schema(); |
| |
| assert_eq!(schema, batch_schema); |
| assert_eq!(37, batch.num_rows()); |
| assert_eq!(3, batch.num_columns()); |
| |
| // access data from a primitive array |
| let lat = batch |
| .column(1) |
| .as_any() |
| .downcast_ref::<Float64Array>() |
| .unwrap(); |
| assert!(57.653484 - lat.value(0) < f64::EPSILON); |
| |
| // access data from a string array (ListArray<u8>) |
| let city = batch |
| .column(0) |
| .as_any() |
| .downcast_ref::<StringArray>() |
| .unwrap(); |
| |
| assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13)); |
| } |
| |
| #[test] |
| fn test_csv_with_projection() { |
| let schema = Schema::new(vec![ |
| Field::new("city", DataType::Utf8, false), |
| Field::new("lat", DataType::Float64, false), |
| Field::new("lng", DataType::Float64, false), |
| ]); |
| |
| let file = File::open("test/data/uk_cities.csv").unwrap(); |
| |
| let mut csv = Reader::new( |
| file, |
| Arc::new(schema), |
| false, |
| None, |
| 1024, |
| None, |
| Some(vec![0, 1]), |
| ); |
| let projected_schema = Arc::new(Schema::new(vec![ |
| Field::new("city", DataType::Utf8, false), |
| Field::new("lat", DataType::Float64, false), |
| ])); |
| assert_eq!(projected_schema, csv.schema()); |
| let batch = csv.next().unwrap().unwrap(); |
| assert_eq!(projected_schema, batch.schema()); |
| assert_eq!(37, batch.num_rows()); |
| assert_eq!(2, batch.num_columns()); |
| } |
| |
| #[test] |
| fn test_csv_with_dictionary() { |
| let schema = Schema::new(vec![ |
| Field::new( |
| "city", |
| DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), |
| false, |
| ), |
| Field::new("lat", DataType::Float64, false), |
| Field::new("lng", DataType::Float64, false), |
| ]); |
| |
| let file = File::open("test/data/uk_cities.csv").unwrap(); |
| |
| let mut csv = Reader::new( |
| file, |
| Arc::new(schema), |
| false, |
| None, |
| 1024, |
| None, |
| Some(vec![0, 1]), |
| ); |
| let projected_schema = Arc::new(Schema::new(vec![ |
| Field::new( |
| "city", |
| DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), |
| false, |
| ), |
| Field::new("lat", DataType::Float64, false), |
| ])); |
| assert_eq!(projected_schema, csv.schema()); |
| let batch = csv.next().unwrap().unwrap(); |
| assert_eq!(projected_schema, batch.schema()); |
| assert_eq!(37, batch.num_rows()); |
| assert_eq!(2, batch.num_columns()); |
| |
| let strings = cast(batch.column(0), &DataType::Utf8).unwrap(); |
| let strings = strings.as_any().downcast_ref::<StringArray>().unwrap(); |
| |
| assert_eq!(strings.value(0), "Elgin, Scotland, the UK"); |
| assert_eq!(strings.value(4), "Eastbourne, East Sussex, UK"); |
| assert_eq!(strings.value(29), "Uckfield, East Sussex, UK"); |
| } |
| |
| #[test] |
| fn test_nulls() { |
| let schema = Schema::new(vec![ |
| Field::new("c_int", DataType::UInt64, false), |
| Field::new("c_float", DataType::Float32, false), |
| Field::new("c_string", DataType::Utf8, false), |
| ]); |
| |
| let file = File::open("test/data/null_test.csv").unwrap(); |
| |
| let mut csv = Reader::new(file, Arc::new(schema), true, None, 1024, None, None); |
| let batch = csv.next().unwrap().unwrap(); |
| |
| assert_eq!(false, batch.column(1).is_null(0)); |
| assert_eq!(false, batch.column(1).is_null(1)); |
| assert_eq!(true, batch.column(1).is_null(2)); |
| assert_eq!(false, batch.column(1).is_null(3)); |
| assert_eq!(false, batch.column(1).is_null(4)); |
| } |
| |
| #[test] |
| fn test_nulls_with_inference() { |
| let file = File::open("test/data/various_types.csv").unwrap(); |
| |
| let builder = ReaderBuilder::new() |
| .infer_schema(None) |
| .has_header(true) |
| .with_delimiter(b'|') |
| .with_batch_size(512) |
| .with_projection(vec![0, 1, 2, 3, 4, 5]); |
| |
| let mut csv = builder.build(file).unwrap(); |
| let batch = csv.next().unwrap().unwrap(); |
| |
| assert_eq!(5, batch.num_rows()); |
| assert_eq!(6, batch.num_columns()); |
| |
| let schema = batch.schema(); |
| |
| assert_eq!(&DataType::Int64, schema.field(0).data_type()); |
| assert_eq!(&DataType::Float64, schema.field(1).data_type()); |
| assert_eq!(&DataType::Float64, schema.field(2).data_type()); |
| assert_eq!(&DataType::Boolean, schema.field(3).data_type()); |
| assert_eq!(&DataType::Date32, schema.field(4).data_type()); |
| assert_eq!(&DataType::Date64, schema.field(5).data_type()); |
| |
| let names: Vec<&str> = |
| schema.fields().iter().map(|x| x.name().as_str()).collect(); |
| assert_eq!( |
| names, |
| vec![ |
| "c_int", |
| "c_float", |
| "c_string", |
| "c_bool", |
| "c_date", |
| "c_datetime" |
| ] |
| ); |
| |
| assert_eq!(false, schema.field(0).is_nullable()); |
| assert_eq!(true, schema.field(1).is_nullable()); |
| assert_eq!(true, schema.field(2).is_nullable()); |
| assert_eq!(false, schema.field(3).is_nullable()); |
| assert_eq!(true, schema.field(4).is_nullable()); |
| assert_eq!(true, schema.field(5).is_nullable()); |
| |
| assert_eq!(false, batch.column(1).is_null(0)); |
| assert_eq!(false, batch.column(1).is_null(1)); |
| assert_eq!(true, batch.column(1).is_null(2)); |
| assert_eq!(false, batch.column(1).is_null(3)); |
| assert_eq!(false, batch.column(1).is_null(4)); |
| } |
| |
| #[test] |
| fn test_parse_invalid_csv() { |
| let file = File::open("test/data/various_types_invalid.csv").unwrap(); |
| |
| let schema = Schema::new(vec![ |
| Field::new("c_int", DataType::UInt64, false), |
| Field::new("c_float", DataType::Float32, false), |
| Field::new("c_string", DataType::Utf8, false), |
| Field::new("c_bool", DataType::Boolean, false), |
| ]); |
| |
| let builder = ReaderBuilder::new() |
| .with_schema(Arc::new(schema)) |
| .has_header(true) |
| .with_delimiter(b'|') |
| .with_batch_size(512) |
| .with_projection(vec![0, 1, 2, 3]); |
| |
| let mut csv = builder.build(file).unwrap(); |
| match csv.next() { |
| Some(e) => match e { |
| Err(e) => assert_eq!( |
| "ParseError(\"Error while parsing value 4.x4 for column 1 at line 4\")", |
| format!("{:?}", e) |
| ), |
| Ok(_) => panic!("should have failed"), |
| }, |
| None => panic!("should have failed"), |
| } |
| } |
| |
| #[test] |
| fn test_infer_field_schema() { |
| assert_eq!(infer_field_schema("A"), DataType::Utf8); |
| assert_eq!(infer_field_schema("\"123\""), DataType::Utf8); |
| assert_eq!(infer_field_schema("10"), DataType::Int64); |
| assert_eq!(infer_field_schema("10.2"), DataType::Float64); |
| assert_eq!(infer_field_schema("true"), DataType::Boolean); |
| assert_eq!(infer_field_schema("false"), DataType::Boolean); |
| assert_eq!(infer_field_schema("2020-11-08"), DataType::Date32); |
| assert_eq!(infer_field_schema("2020-11-08T14:20:01"), DataType::Date64); |
| } |
| |
| #[test] |
| fn parse_date32() { |
| assert_eq!(parse_item::<Date32Type>("1970-01-01").unwrap(), 0); |
| assert_eq!(parse_item::<Date32Type>("2020-03-15").unwrap(), 18336); |
| assert_eq!(parse_item::<Date32Type>("1945-05-08").unwrap(), -9004); |
| } |
| |
| #[test] |
| fn parse_date64() { |
| assert_eq!(parse_item::<Date64Type>("1970-01-01T00:00:00").unwrap(), 0); |
| assert_eq!( |
| parse_item::<Date64Type>("2018-11-13T17:11:10").unwrap(), |
| 1542129070000 |
| ); |
| assert_eq!( |
| parse_item::<Date64Type>("2018-11-13T17:11:10.011").unwrap(), |
| 1542129070011 |
| ); |
| assert_eq!( |
| parse_item::<Date64Type>("1900-02-28T12:34:56").unwrap(), |
| -2203932304000 |
| ); |
| } |
| |
| #[test] |
| fn test_infer_schema_from_multiple_files() -> Result<()> { |
| let mut csv1 = NamedTempFile::new()?; |
| let mut csv2 = NamedTempFile::new()?; |
| let csv3 = NamedTempFile::new()?; // empty csv file should be skipped |
| let mut csv4 = NamedTempFile::new()?; |
| writeln!(csv1, "c1,c2,c3")?; |
| writeln!(csv1, "1,\"foo\",0.5")?; |
| writeln!(csv1, "3,\"bar\",1")?; |
| // reading csv2 will set c2 to optional |
| writeln!(csv2, "c1,c2,c3,c4")?; |
| writeln!(csv2, "10,,3.14,true")?; |
| // reading csv4 will set c3 to optional |
| writeln!(csv4, "c1,c2,c3")?; |
| writeln!(csv4, "10,\"foo\",")?; |
| |
| let schema = infer_schema_from_files( |
| &[ |
| csv3.path().to_str().unwrap().to_string(), |
| csv1.path().to_str().unwrap().to_string(), |
| csv2.path().to_str().unwrap().to_string(), |
| csv4.path().to_str().unwrap().to_string(), |
| ], |
| b',', |
| Some(3), // only csv1 and csv2 should be read |
| true, |
| )?; |
| |
| assert_eq!(schema.fields().len(), 4); |
| assert_eq!(false, schema.field(0).is_nullable()); |
| assert_eq!(true, schema.field(1).is_nullable()); |
| assert_eq!(false, schema.field(2).is_nullable()); |
| assert_eq!(false, schema.field(3).is_nullable()); |
| |
| assert_eq!(&DataType::Int64, schema.field(0).data_type()); |
| assert_eq!(&DataType::Utf8, schema.field(1).data_type()); |
| assert_eq!(&DataType::Float64, schema.field(2).data_type()); |
| assert_eq!(&DataType::Boolean, schema.field(3).data_type()); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_bounded() { |
| let schema = Schema::new(vec![Field::new("int", DataType::UInt32, false)]); |
| let data = vec![ |
| vec!["0"], |
| vec!["1"], |
| vec!["2"], |
| vec!["3"], |
| vec!["4"], |
| vec!["5"], |
| vec!["6"], |
| ]; |
| |
| let data = data |
| .iter() |
| .map(|x| x.join(",")) |
| .collect::<Vec<_>>() |
| .join("\n"); |
| let data = data.as_bytes(); |
| |
| let reader = std::io::Cursor::new(data); |
| |
| let mut csv = Reader::new( |
| reader, |
| Arc::new(schema), |
| false, |
| None, |
| 2, |
| // starting at row 2 and up to row 6. |
| Some((2, 6)), |
| Some(vec![0]), |
| ); |
| |
| let batch = csv.next().unwrap().unwrap(); |
| let a = batch.column(0); |
| let a = a.as_any().downcast_ref::<UInt32Array>().unwrap(); |
| assert_eq!(a, &UInt32Array::from(vec![2, 3])); |
| |
| let batch = csv.next().unwrap().unwrap(); |
| let a = batch.column(0); |
| let a = a.as_any().downcast_ref::<UInt32Array>().unwrap(); |
| assert_eq!(a, &UInt32Array::from(vec![4, 5])); |
| |
| assert!(csv.next().is_none()); |
| } |
| |
| #[test] |
| fn test_parsing_bool() { |
| // Encode the expected behavior of boolean parsing |
| assert_eq!(Some(true), parse_bool("true")); |
| assert_eq!(Some(true), parse_bool("tRUe")); |
| assert_eq!(Some(true), parse_bool("True")); |
| assert_eq!(Some(true), parse_bool("TRUE")); |
| assert_eq!(None, parse_bool("t")); |
| assert_eq!(None, parse_bool("T")); |
| assert_eq!(None, parse_bool("")); |
| |
| assert_eq!(Some(false), parse_bool("false")); |
| assert_eq!(Some(false), parse_bool("fALse")); |
| assert_eq!(Some(false), parse_bool("False")); |
| assert_eq!(Some(false), parse_bool("FALSE")); |
| assert_eq!(None, parse_bool("f")); |
| assert_eq!(None, parse_bool("F")); |
| assert_eq!(None, parse_bool("")); |
| } |
| |
| #[test] |
| fn test_parsing_float() { |
| assert_eq!(Some(12.34), parse_item::<Float64Type>("12.34")); |
| assert_eq!(Some(-12.34), parse_item::<Float64Type>("-12.34")); |
| assert_eq!(Some(12.0), parse_item::<Float64Type>("12")); |
| assert_eq!(Some(0.0), parse_item::<Float64Type>("0")); |
| assert!(parse_item::<Float64Type>("nan").unwrap().is_nan()); |
| assert!(parse_item::<Float64Type>("NaN").unwrap().is_nan()); |
| assert!(parse_item::<Float64Type>("inf").unwrap().is_infinite()); |
| assert!(parse_item::<Float64Type>("inf").unwrap().is_sign_positive()); |
| assert!(parse_item::<Float64Type>("-inf").unwrap().is_infinite()); |
| assert!(parse_item::<Float64Type>("-inf") |
| .unwrap() |
| .is_sign_negative()); |
| assert_eq!(None, parse_item::<Float64Type>("")); |
| assert_eq!(None, parse_item::<Float64Type>("dd")); |
| assert_eq!(None, parse_item::<Float64Type>("12.34.56")); |
| } |
| |
| #[test] |
| fn test_non_std_quote() { |
| let schema = Schema::new(vec![ |
| Field::new("text1", DataType::Utf8, false), |
| Field::new("text2", DataType::Utf8, false), |
| ]); |
| let builder = ReaderBuilder::new() |
| .with_schema(Arc::new(schema)) |
| .has_header(false) |
| .with_quote(b'~'); // default is ", change to ~ |
| |
| let mut csv_text = Vec::new(); |
| let mut csv_writer = std::io::Cursor::new(&mut csv_text); |
| for index in 0..10 { |
| let text1 = format!("id{:}", index); |
| let text2 = format!("value{:}", index); |
| csv_writer |
| .write_fmt(format_args!("~{}~,~{}~\r\n", text1, text2)) |
| .unwrap(); |
| } |
| let mut csv_reader = std::io::Cursor::new(&csv_text); |
| let mut reader = builder.build(&mut csv_reader).unwrap(); |
| let batch = reader.next().unwrap().unwrap(); |
| let col0 = batch.column(0); |
| assert_eq!(col0.len(), 10); |
| let col0_arr = col0.as_any().downcast_ref::<StringArray>().unwrap(); |
| assert_eq!(col0_arr.value(0), "id0"); |
| let col1 = batch.column(1); |
| assert_eq!(col1.len(), 10); |
| let col1_arr = col1.as_any().downcast_ref::<StringArray>().unwrap(); |
| assert_eq!(col1_arr.value(5), "value5"); |
| } |
| |
| #[test] |
| fn test_non_std_escape() { |
| let schema = Schema::new(vec![ |
| Field::new("text1", DataType::Utf8, false), |
| Field::new("text2", DataType::Utf8, false), |
| ]); |
| let builder = ReaderBuilder::new() |
| .with_schema(Arc::new(schema)) |
| .has_header(false) |
| .with_escape(b'\\'); // default is None, change to \ |
| |
| let mut csv_text = Vec::new(); |
| let mut csv_writer = std::io::Cursor::new(&mut csv_text); |
| for index in 0..10 { |
| let text1 = format!("id{:}", index); |
| let text2 = format!("value\\\"{:}", index); |
| csv_writer |
| .write_fmt(format_args!("\"{}\",\"{}\"\r\n", text1, text2)) |
| .unwrap(); |
| } |
| let mut csv_reader = std::io::Cursor::new(&csv_text); |
| let mut reader = builder.build(&mut csv_reader).unwrap(); |
| let batch = reader.next().unwrap().unwrap(); |
| let col0 = batch.column(0); |
| assert_eq!(col0.len(), 10); |
| let col0_arr = col0.as_any().downcast_ref::<StringArray>().unwrap(); |
| assert_eq!(col0_arr.value(0), "id0"); |
| let col1 = batch.column(1); |
| assert_eq!(col1.len(), 10); |
| let col1_arr = col1.as_any().downcast_ref::<StringArray>().unwrap(); |
| assert_eq!(col1_arr.value(5), "value\"5"); |
| } |
| |
| #[test] |
| fn test_non_std_terminator() { |
| let schema = Schema::new(vec![ |
| Field::new("text1", DataType::Utf8, false), |
| Field::new("text2", DataType::Utf8, false), |
| ]); |
| let builder = ReaderBuilder::new() |
| .with_schema(Arc::new(schema)) |
| .has_header(false) |
| .with_terminator(b'\n'); // default is CRLF, change to LF |
| |
| let mut csv_text = Vec::new(); |
| let mut csv_writer = std::io::Cursor::new(&mut csv_text); |
| for index in 0..10 { |
| let text1 = format!("id{:}", index); |
| let text2 = format!("value{:}", index); |
| csv_writer |
| .write_fmt(format_args!("\"{}\",\"{}\"\n", text1, text2)) |
| .unwrap(); |
| } |
| let mut csv_reader = std::io::Cursor::new(&csv_text); |
| let mut reader = builder.build(&mut csv_reader).unwrap(); |
| let batch = reader.next().unwrap().unwrap(); |
| let col0 = batch.column(0); |
| assert_eq!(col0.len(), 10); |
| let col0_arr = col0.as_any().downcast_ref::<StringArray>().unwrap(); |
| assert_eq!(col0_arr.value(0), "id0"); |
| let col1 = batch.column(1); |
| assert_eq!(col1.len(), 10); |
| let col1_arr = col1.as_any().downcast_ref::<StringArray>().unwrap(); |
| assert_eq!(col1_arr.value(5), "value5"); |
| } |
| } |