| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! CSV Reader |
| //! |
| //! This CSV reader allows CSV files to be read into the Arrow memory model. Records are |
| //! loaded in batches and are then converted from row-based data to columnar data. |
| //! |
| //! Example: |
| //! |
| //! ``` |
| //! use arrow::csv; |
| //! use arrow::datatypes::{DataType, Field, Schema}; |
| //! use std::fs::File; |
| //! use std::sync::Arc; |
| //! |
| //! let schema = Schema::new(vec![ |
| //! Field::new("city", DataType::Utf8, false), |
| //! Field::new("lat", DataType::Float64, false), |
| //! Field::new("lng", DataType::Float64, false), |
| //! ]); |
| //! |
| //! let file = File::open("test/data/uk_cities.csv").unwrap(); |
| //! |
| //! let mut csv = csv::Reader::new(file, Arc::new(schema), false, 1024, None); |
| //! let batch = csv.next().unwrap().unwrap(); |
| //! ``` |
| |
| use lazy_static::lazy_static; |
| use regex::{Regex, RegexBuilder}; |
| use std::collections::HashSet; |
| use std::io::{BufReader, Read, Seek, SeekFrom}; |
| use std::sync::Arc; |
| |
| use csv as csv_crate; |
| |
| use crate::array::{ArrayRef, BinaryBuilder, PrimitiveBuilder}; |
| use crate::datatypes::*; |
| use crate::error::{ArrowError, Result}; |
| use crate::record_batch::RecordBatch; |
| |
| use self::csv_crate::{StringRecord, StringRecordsIntoIter}; |
| |
| lazy_static! { |
| static ref DECIMAL_RE: Regex = Regex::new(r"^-?(\d+\.\d+)$").unwrap(); |
| static ref INTEGER_RE: Regex = Regex::new(r"^-?(\d*.)$").unwrap(); |
| static ref BOOLEAN_RE: Regex = RegexBuilder::new(r"^(true)$|^(false)$") |
| .case_insensitive(true) |
| .build() |
| .unwrap(); |
| } |
| |
| /// Infer the data type of a record |
| fn infer_field_schema(string: &str) -> DataType { |
| // when quoting is enabled in the reader, these quotes aren't escaped, we default to |
| // Utf8 for them |
| if string.starts_with("\"") { |
| return DataType::Utf8; |
| } |
| // match regex in a particular order |
| if BOOLEAN_RE.is_match(string) { |
| return DataType::Boolean; |
| } else if DECIMAL_RE.is_match(string) { |
| return DataType::Float64; |
| } else if INTEGER_RE.is_match(string) { |
| return DataType::Int64; |
| } else { |
| return DataType::Utf8; |
| } |
| } |
| |
| /// Infer the schema of a CSV file by reading through the first n records of the file, |
| /// with `max_read_records` controlling the maximum number of records to read. |
| /// |
| /// If `max_read_records` is not set, the whole file is read to infer its schema. |
| fn infer_file_schema<R: Read + Seek>( |
| reader: &mut BufReader<R>, |
| delimiter: u8, |
| max_read_records: Option<usize>, |
| has_headers: bool, |
| ) -> Result<Schema> { |
| let mut csv_reader = csv::ReaderBuilder::new() |
| .delimiter(delimiter) |
| .from_reader(reader); |
| |
| // get or create header names |
| // when has_headers is false, creates default column names with column_ prefix |
| let headers: Vec<String> = if has_headers { |
| let headers = &csv_reader.headers()?.clone(); |
| headers.iter().map(|s| s.to_string()).collect() |
| } else { |
| let first_record_count = &csv_reader.headers()?.len(); |
| (0..*first_record_count) |
| .map(|i| format!("column_{}", i + 1)) |
| .into_iter() |
| .collect() |
| }; |
| |
| // save the csv reader position after reading headers |
| let position = csv_reader.position().clone(); |
| |
| let header_length = headers.len(); |
| // keep track of inferred field types |
| let mut column_types: Vec<HashSet<DataType>> = vec![HashSet::new(); header_length]; |
| // keep track of columns with nulls |
| let mut nulls: Vec<bool> = vec![false; header_length]; |
| |
| // return csv reader position to after headers |
| csv_reader.seek(position)?; |
| |
| let mut fields = vec![]; |
| |
| for result in csv_reader |
| .records() |
| .take(max_read_records.unwrap_or(std::usize::MAX)) |
| { |
| let record = result?; |
| |
| for i in 0..header_length { |
| let string: Option<&str> = record.get(i); |
| match string { |
| Some(s) => { |
| if s == "" { |
| nulls[i] = true; |
| } else { |
| column_types[i].insert(infer_field_schema(s)); |
| } |
| } |
| _ => {} |
| } |
| } |
| } |
| |
| // build schema from inference results |
| for i in 0..header_length { |
| let possibilities = &column_types[i]; |
| let has_nulls = nulls[i]; |
| let field_name = &headers[i]; |
| |
| // determine data type based on possible types |
| // if there are incompatible types, use DataType::Utf8 |
| match possibilities.len() { |
| 1 => { |
| for dtype in possibilities.iter() { |
| fields.push(Field::new(&field_name, dtype.clone(), has_nulls)); |
| } |
| } |
| 2 => { |
| if possibilities.contains(&DataType::Int64) |
| && possibilities.contains(&DataType::Float64) |
| { |
| // we have an integer and double, fall down to double |
| fields.push(Field::new(&field_name, DataType::Float64, has_nulls)); |
| } else { |
| // default to Utf8 for conflicting datatypes (e.g bool and int) |
| fields.push(Field::new(&field_name, DataType::Utf8, has_nulls)); |
| } |
| } |
| _ => fields.push(Field::new(&field_name, DataType::Utf8, has_nulls)), |
| } |
| } |
| |
| // return the reader seek back to the start |
| csv_reader.into_inner().seek(SeekFrom::Start(0))?; |
| |
| Ok(Schema::new(fields)) |
| } |
| |
| /// CSV file reader |
| pub struct Reader<R: Read> { |
| /// Explicit schema for the CSV file |
| schema: Arc<Schema>, |
| /// Optional projection for which columns to load (zero-based column indices) |
| projection: Option<Vec<usize>>, |
| /// File reader |
| record_iter: StringRecordsIntoIter<BufReader<R>>, |
| /// Batch size (number of records to load each time) |
| batch_size: usize, |
| /// Current line number, used in error reporting |
| line_number: usize, |
| } |
| |
| impl<R: Read> Reader<R> { |
| /// Create a new CsvReader from any value that implements the `Read` trait. |
| /// |
| /// If reading a `File` or an input that supports `std::io::Read` and `std::io::Seek`; |
| /// you can customise the Reader, such as to enable schema inference, use |
| /// `ReaderBuilder`. |
| pub fn new( |
| reader: R, |
| schema: Arc<Schema>, |
| has_headers: bool, |
| batch_size: usize, |
| projection: Option<Vec<usize>>, |
| ) -> Self { |
| Self::from_buf_reader( |
| BufReader::new(reader), |
| schema, |
| has_headers, |
| batch_size, |
| projection, |
| ) |
| } |
| |
| /// Returns the schema of the reader, useful for getting the schema without reading |
| /// record batches |
| pub fn schema(&self) -> Arc<Schema> { |
| match &self.projection { |
| Some(projection) => { |
| let fields = self.schema.fields(); |
| let projected_fields: Vec<Field> = |
| projection.iter().map(|i| fields[*i].clone()).collect(); |
| |
| Arc::new(Schema::new(projected_fields)) |
| } |
| None => self.schema.clone(), |
| } |
| } |
| |
| /// Create a new CsvReader from a `BufReader<R: Read> |
| /// |
| /// This constructor allows you more flexibility in what records are processed by the |
| /// csv reader. |
| pub fn from_buf_reader( |
| buf_reader: BufReader<R>, |
| schema: Arc<Schema>, |
| has_headers: bool, |
| batch_size: usize, |
| projection: Option<Vec<usize>>, |
| ) -> Self { |
| let csv_reader = csv::ReaderBuilder::new() |
| .has_headers(has_headers) |
| .from_reader(buf_reader); |
| let record_iter = csv_reader.into_records(); |
| Self { |
| schema, |
| projection, |
| record_iter, |
| batch_size, |
| line_number: if has_headers { 1 } else { 0 }, |
| } |
| } |
| |
| /// Read the next batch of rows |
| pub fn next(&mut self) -> Result<Option<RecordBatch>> { |
| // read a batch of rows into memory |
| let mut rows: Vec<StringRecord> = Vec::with_capacity(self.batch_size); |
| for i in 0..self.batch_size { |
| match self.record_iter.next() { |
| Some(Ok(r)) => { |
| rows.push(r); |
| } |
| Some(Err(e)) => { |
| return Err(ArrowError::ParseError(format!( |
| "Error parsing line {}: {:?}", |
| self.line_number + i, |
| e |
| ))); |
| } |
| None => break, |
| } |
| } |
| |
| // return early if no data was loaded |
| if rows.is_empty() { |
| return Ok(None); |
| } |
| |
| let projection: Vec<usize> = match self.projection { |
| Some(ref v) => v.clone(), |
| None => self |
| .schema |
| .fields() |
| .iter() |
| .enumerate() |
| .map(|(i, _)| i) |
| .collect(), |
| }; |
| |
| let rows = &rows[..]; |
| let arrays: Result<Vec<ArrayRef>> = projection |
| .iter() |
| .map(|i| { |
| let field = self.schema.field(*i); |
| match field.data_type() { |
| &DataType::Boolean => { |
| self.build_primitive_array::<BooleanType>(rows, i) |
| } |
| &DataType::Int8 => self.build_primitive_array::<Int8Type>(rows, i), |
| &DataType::Int16 => self.build_primitive_array::<Int16Type>(rows, i), |
| &DataType::Int32 => self.build_primitive_array::<Int32Type>(rows, i), |
| &DataType::Int64 => self.build_primitive_array::<Int64Type>(rows, i), |
| &DataType::UInt8 => self.build_primitive_array::<UInt8Type>(rows, i), |
| &DataType::UInt16 => { |
| self.build_primitive_array::<UInt16Type>(rows, i) |
| } |
| &DataType::UInt32 => { |
| self.build_primitive_array::<UInt32Type>(rows, i) |
| } |
| &DataType::UInt64 => { |
| self.build_primitive_array::<UInt64Type>(rows, i) |
| } |
| &DataType::Float32 => { |
| self.build_primitive_array::<Float32Type>(rows, i) |
| } |
| &DataType::Float64 => { |
| self.build_primitive_array::<Float64Type>(rows, i) |
| } |
| &DataType::Utf8 => { |
| let mut builder = BinaryBuilder::new(rows.len()); |
| for row_index in 0..rows.len() { |
| match rows[row_index].get(*i) { |
| Some(s) => builder.append_string(s).unwrap(), |
| _ => builder.append(false).unwrap(), |
| } |
| } |
| Ok(Arc::new(builder.finish()) as ArrayRef) |
| } |
| other => Err(ArrowError::ParseError(format!( |
| "Unsupported data type {:?}", |
| other |
| ))), |
| } |
| }) |
| .collect(); |
| |
| self.line_number += rows.len(); |
| |
| let schema_fields = self.schema.fields(); |
| |
| let projected_fields: Vec<Field> = projection |
| .iter() |
| .map(|i| schema_fields[*i].clone()) |
| .collect(); |
| |
| let projected_schema = Arc::new(Schema::new(projected_fields)); |
| |
| match arrays { |
| Ok(arr) => match RecordBatch::try_new(projected_schema, arr) { |
| Ok(batch) => Ok(Some(batch)), |
| Err(e) => Err(e), |
| }, |
| Err(e) => Err(e), |
| } |
| } |
| |
| fn build_primitive_array<T: ArrowPrimitiveType>( |
| &self, |
| rows: &[StringRecord], |
| col_idx: &usize, |
| ) -> Result<ArrayRef> { |
| let mut builder = PrimitiveBuilder::<T>::new(rows.len()); |
| let is_boolean_type = |
| *self.schema.field(*col_idx).data_type() == DataType::Boolean; |
| for row_index in 0..rows.len() { |
| match rows[row_index].get(*col_idx) { |
| Some(s) if s.len() > 0 => { |
| let t = if is_boolean_type { |
| s.to_lowercase().parse::<T::Native>() |
| } else { |
| s.parse::<T::Native>() |
| }; |
| match t { |
| Ok(v) => builder.append_value(v)?, |
| Err(_) => { |
| // TODO: we should surface the underlying error here. |
| return Err(ArrowError::ParseError(format!( |
| "Error while parsing value {} at line {}", |
| s, |
| self.line_number + row_index |
| ))); |
| } |
| } |
| } |
| _ => builder.append_null()?, |
| } |
| } |
| Ok(Arc::new(builder.finish())) |
| } |
| } |
| |
| /// CSV file reader builder |
| pub struct ReaderBuilder { |
| /// Optional schema for the CSV file |
| /// |
| /// If the schema is not supplied, the reader will try to infer the schema |
| /// based on the CSV structure. |
| schema: Option<Arc<Schema>>, |
| /// Whether the file has headers or not |
| /// |
| /// If schema inference is run on a file with no headers, default column names |
| /// are created. |
| has_headers: bool, |
| /// An optional column delimiter. Defauits to `b','` |
| delimiter: Option<u8>, |
| /// Optional maximum number of records to read during schema inference |
| /// |
| /// If a number is not provided, all the records are read. |
| max_records: Option<usize>, |
| /// Batch size (number of records to load each time) |
| /// |
| /// The default batch size when using the `ReaderBuilder` is 1024 records |
| batch_size: usize, |
| /// Optional projection for which columns to load (zero-based column indices) |
| projection: Option<Vec<usize>>, |
| } |
| |
| impl Default for ReaderBuilder { |
| fn default() -> ReaderBuilder { |
| ReaderBuilder { |
| schema: None, |
| has_headers: false, |
| delimiter: None, |
| max_records: None, |
| batch_size: 1024, |
| projection: None, |
| } |
| } |
| } |
| |
| impl ReaderBuilder { |
| /// Create a new builder for configuring CSV parsing options. |
| /// |
| /// To convert a builder into a reader, call `ReaderBuilder::build` |
| /// |
| /// # Example |
| /// |
| /// ``` |
| /// extern crate arrow; |
| /// |
| /// use arrow::csv; |
| /// use std::fs::File; |
| /// |
| /// fn example() -> csv::Reader<File> { |
| /// let file = File::open("test/data/uk_cities_with_headers.csv").unwrap(); |
| /// |
| /// // create a builder, inferring the schema with the first 100 records |
| /// let builder = csv::ReaderBuilder::new().infer_schema(Some(100)); |
| /// |
| /// let reader = builder.build(file).unwrap(); |
| /// |
| /// reader |
| /// } |
| /// ``` |
| pub fn new() -> ReaderBuilder { |
| ReaderBuilder::default() |
| } |
| |
| /// Set the CSV file's schema |
| pub fn with_schema(mut self, schema: Arc<Schema>) -> Self { |
| self.schema = Some(schema); |
| self |
| } |
| |
| /// Set whether the CSV file has headers |
| pub fn has_headers(mut self, has_headers: bool) -> Self { |
| self.has_headers = has_headers; |
| self |
| } |
| |
| /// Set the CSV file's column delimiter as a byte character |
| pub fn with_delimiter(mut self, delimiter: u8) -> Self { |
| self.delimiter = Some(delimiter); |
| self |
| } |
| |
| /// Set the CSV reader to infer the schema of the file |
| pub fn infer_schema(mut self, max_records: Option<usize>) -> Self { |
| // remove any schema that is set |
| self.schema = None; |
| self.max_records = max_records; |
| self |
| } |
| |
| /// Set the batch size (number of records to load at one time) |
| pub fn with_batch_size(mut self, batch_size: usize) -> Self { |
| self.batch_size = batch_size; |
| self |
| } |
| |
| /// Set the reader's column projection |
| pub fn with_projection(mut self, projection: Vec<usize>) -> Self { |
| self.projection = Some(projection); |
| self |
| } |
| |
| /// Create a new `Reader` from the `ReaderBuilder` |
| pub fn build<R: Read + Seek>(self, reader: R) -> Result<Reader<R>> { |
| // check if schema should be inferred |
| let mut buf_reader = BufReader::new(reader); |
| let schema = match self.schema { |
| Some(schema) => schema, |
| None => { |
| let inferred_schema = infer_file_schema( |
| &mut buf_reader, |
| self.delimiter.unwrap_or(b','), |
| self.max_records, |
| self.has_headers, |
| )?; |
| |
| Arc::new(inferred_schema) |
| } |
| }; |
| let csv_reader = csv::ReaderBuilder::new() |
| .delimiter(self.delimiter.unwrap_or(b',')) |
| .has_headers(self.has_headers) |
| .from_reader(buf_reader); |
| let record_iter = csv_reader.into_records(); |
| Ok(Reader { |
| schema, |
| projection: self.projection.clone(), |
| record_iter, |
| batch_size: self.batch_size, |
| line_number: if self.has_headers { 1 } else { 0 }, |
| }) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| |
| use std::fs::File; |
| use std::io::Cursor; |
| |
| use crate::array::*; |
| use crate::datatypes::Field; |
| |
| #[test] |
| fn test_csv() { |
| let schema = Schema::new(vec![ |
| Field::new("city", DataType::Utf8, false), |
| Field::new("lat", DataType::Float64, false), |
| Field::new("lng", DataType::Float64, false), |
| ]); |
| |
| let file = File::open("test/data/uk_cities.csv").unwrap(); |
| |
| let mut csv = Reader::new(file, Arc::new(schema.clone()), false, 1024, None); |
| assert_eq!(Arc::new(schema), csv.schema()); |
| let batch = csv.next().unwrap().unwrap(); |
| assert_eq!(37, batch.num_rows()); |
| assert_eq!(3, batch.num_columns()); |
| |
| // access data from a primitive array |
| let lat = batch |
| .column(1) |
| .as_any() |
| .downcast_ref::<Float64Array>() |
| .unwrap(); |
| assert_eq!(57.653484, lat.value(0)); |
| |
| // access data from a string array (ListArray<u8>) |
| let city = batch |
| .column(0) |
| .as_any() |
| .downcast_ref::<BinaryArray>() |
| .unwrap(); |
| |
| let city_name: String = String::from_utf8(city.value(13).to_vec()).unwrap(); |
| |
| assert_eq!("Aberdeen, Aberdeen City, UK", city_name); |
| } |
| |
| #[test] |
| fn test_csv_from_buf_reader() { |
| let schema = Schema::new(vec![ |
| Field::new("city", DataType::Utf8, false), |
| Field::new("lat", DataType::Float64, false), |
| Field::new("lng", DataType::Float64, false), |
| ]); |
| |
| let file_with_headers = |
| File::open("test/data/uk_cities_with_headers.csv").unwrap(); |
| let file_without_headers = File::open("test/data/uk_cities.csv").unwrap(); |
| let both_files = file_with_headers |
| .chain(Cursor::new("\n".to_string())) |
| .chain(file_without_headers); |
| let mut csv = Reader::from_buf_reader( |
| BufReader::new(both_files), |
| Arc::new(schema), |
| true, |
| 1024, |
| None, |
| ); |
| let batch = csv.next().unwrap().unwrap(); |
| assert_eq!(74, batch.num_rows()); |
| assert_eq!(3, batch.num_columns()); |
| } |
| |
| #[test] |
| fn test_csv_with_schema_inference() { |
| let file = File::open("test/data/uk_cities_with_headers.csv").unwrap(); |
| |
| let builder = ReaderBuilder::new().has_headers(true).infer_schema(None); |
| |
| let mut csv = builder.build(file).unwrap(); |
| let expected_schema = Schema::new(vec![ |
| Field::new("city", DataType::Utf8, false), |
| Field::new("lat", DataType::Float64, false), |
| Field::new("lng", DataType::Float64, false), |
| ]); |
| assert_eq!(Arc::new(expected_schema), csv.schema()); |
| let batch = csv.next().unwrap().unwrap(); |
| assert_eq!(37, batch.num_rows()); |
| assert_eq!(3, batch.num_columns()); |
| |
| // access data from a primitive array |
| let lat = batch |
| .column(1) |
| .as_any() |
| .downcast_ref::<Float64Array>() |
| .unwrap(); |
| assert_eq!(57.653484, lat.value(0)); |
| |
| // access data from a string array (ListArray<u8>) |
| let city = batch |
| .column(0) |
| .as_any() |
| .downcast_ref::<BinaryArray>() |
| .unwrap(); |
| |
| let city_name: String = String::from_utf8(city.value(13).to_vec()).unwrap(); |
| |
| assert_eq!("Aberdeen, Aberdeen City, UK", city_name); |
| } |
| |
| #[test] |
| fn test_csv_with_schema_inference_no_headers() { |
| let file = File::open("test/data/uk_cities.csv").unwrap(); |
| |
| let builder = ReaderBuilder::new().infer_schema(None); |
| |
| let mut csv = builder.build(file).unwrap(); |
| |
| // csv field names should be 'column_{number}' |
| let schema = csv.schema(); |
| assert_eq!("column_1", schema.field(0).name()); |
| assert_eq!("column_2", schema.field(1).name()); |
| assert_eq!("column_3", schema.field(2).name()); |
| let batch = csv.next().unwrap().unwrap(); |
| let batch_schema = batch.schema(); |
| |
| assert_eq!(&schema, batch_schema); |
| assert_eq!(37, batch.num_rows()); |
| assert_eq!(3, batch.num_columns()); |
| |
| // access data from a primitive array |
| let lat = batch |
| .column(1) |
| .as_any() |
| .downcast_ref::<Float64Array>() |
| .unwrap(); |
| assert_eq!(57.653484, lat.value(0)); |
| |
| // access data from a string array (ListArray<u8>) |
| let city = batch |
| .column(0) |
| .as_any() |
| .downcast_ref::<BinaryArray>() |
| .unwrap(); |
| |
| let city_name: String = String::from_utf8(city.value(13).to_vec()).unwrap(); |
| |
| assert_eq!("Aberdeen, Aberdeen City, UK", city_name); |
| } |
| |
| #[test] |
| fn test_csv_with_projection() { |
| let schema = Schema::new(vec![ |
| Field::new("city", DataType::Utf8, false), |
| Field::new("lat", DataType::Float64, false), |
| Field::new("lng", DataType::Float64, false), |
| ]); |
| |
| let file = File::open("test/data/uk_cities.csv").unwrap(); |
| |
| let mut csv = Reader::new(file, Arc::new(schema), false, 1024, Some(vec![0, 1])); |
| let projected_schema = Arc::new(Schema::new(vec![ |
| Field::new("city", DataType::Utf8, false), |
| Field::new("lat", DataType::Float64, false), |
| ])); |
| assert_eq!(projected_schema.clone(), csv.schema()); |
| let batch = csv.next().unwrap().unwrap(); |
| assert_eq!(&projected_schema, batch.schema()); |
| assert_eq!(37, batch.num_rows()); |
| assert_eq!(2, batch.num_columns()); |
| } |
| |
| #[test] |
| fn test_nulls() { |
| let schema = Schema::new(vec![ |
| Field::new("c_int", DataType::UInt64, false), |
| Field::new("c_float", DataType::Float32, false), |
| Field::new("c_string", DataType::Utf8, false), |
| ]); |
| |
| let file = File::open("test/data/null_test.csv").unwrap(); |
| |
| let mut csv = Reader::new(file, Arc::new(schema), true, 1024, None); |
| let batch = csv.next().unwrap().unwrap(); |
| |
| assert_eq!(false, batch.column(1).is_null(0)); |
| assert_eq!(false, batch.column(1).is_null(1)); |
| assert_eq!(true, batch.column(1).is_null(2)); |
| assert_eq!(false, batch.column(1).is_null(3)); |
| assert_eq!(false, batch.column(1).is_null(4)); |
| } |
| |
| #[test] |
| fn test_nulls_with_inference() { |
| let file = File::open("test/data/various_types.csv").unwrap(); |
| |
| let builder = ReaderBuilder::new() |
| .infer_schema(None) |
| .has_headers(true) |
| .with_delimiter(b'|') |
| .with_batch_size(512) |
| .with_projection(vec![0, 1, 2, 3]); |
| |
| let mut csv = builder.build(file).unwrap(); |
| let batch = csv.next().unwrap().unwrap(); |
| |
| assert_eq!(5, batch.num_rows()); |
| assert_eq!(4, batch.num_columns()); |
| |
| let schema = batch.schema(); |
| |
| assert_eq!(&DataType::Int64, schema.field(0).data_type()); |
| assert_eq!(&DataType::Float64, schema.field(1).data_type()); |
| assert_eq!(&DataType::Float64, schema.field(2).data_type()); |
| assert_eq!(&DataType::Boolean, schema.field(3).data_type()); |
| |
| assert_eq!(false, schema.field(0).is_nullable()); |
| assert_eq!(true, schema.field(1).is_nullable()); |
| assert_eq!(true, schema.field(2).is_nullable()); |
| assert_eq!(false, schema.field(3).is_nullable()); |
| |
| assert_eq!(false, batch.column(1).is_null(0)); |
| assert_eq!(false, batch.column(1).is_null(1)); |
| assert_eq!(true, batch.column(1).is_null(2)); |
| assert_eq!(false, batch.column(1).is_null(3)); |
| assert_eq!(false, batch.column(1).is_null(4)); |
| } |
| |
| #[test] |
| fn test_parse_invalid_csv() { |
| let file = File::open("test/data/various_types_invalid.csv").unwrap(); |
| |
| let schema = Schema::new(vec![ |
| Field::new("c_int", DataType::UInt64, false), |
| Field::new("c_float", DataType::Float32, false), |
| Field::new("c_string", DataType::Utf8, false), |
| Field::new("c_bool", DataType::Boolean, false), |
| ]); |
| |
| let builder = ReaderBuilder::new() |
| .with_schema(Arc::new(schema)) |
| .has_headers(true) |
| .with_delimiter(b'|') |
| .with_batch_size(512) |
| .with_projection(vec![0, 1, 2, 3]); |
| |
| let mut csv = builder.build(file).unwrap(); |
| match csv.next() { |
| Err(e) => assert_eq!( |
| "ParseError(\"Error while parsing value 4.x4 at line 4\")", |
| format!("{:?}", e) |
| ), |
| Ok(_) => panic!("should have failed"), |
| } |
| } |
| } |