allow reading non-standard CSV (#326)

* refactor Reader::from_reader

split into build_csv_reader, from_csv_reader
add escape, quote, terminator arg to build_csv_reader

* add escape, quote, terminator field to ReaderBuilder

schema inference support for non-standard CSV

  add fn infer_file_schema_with_csv_options
  add fn infer_reader_schema_with_csv_options

ReaderBuilder support for non-standard CSV

add escape, quote, terminator field
add fn with_escape, with_quote, with_terminator
change ReaderBuilder::build for non-standard CSV

* minimize API change

* add tests

add #[test] fn test_non_std_quote
add #[test] fn test_non_std_escape
add #[test] fn test_non_std_terminator

* apply cargo fmt
diff --git a/arrow/src/csv/reader.rs b/arrow/src/csv/reader.rs
index 5b9fb5d..b2f7a7a 100644
--- a/arrow/src/csv/reader.rs
+++ b/arrow/src/csv/reader.rs
@@ -106,10 +106,37 @@
     max_read_records: Option<usize>,
     has_header: bool,
 ) -> Result<(Schema, usize)> {
+    infer_file_schema_with_csv_options(
+        reader,
+        delimiter,
+        max_read_records,
+        has_header,
+        None,
+        None,
+        None,
+    )
+}
+
+fn infer_file_schema_with_csv_options<R: Read + Seek>(
+    reader: &mut R,
+    delimiter: u8,
+    max_read_records: Option<usize>,
+    has_header: bool,
+    escape: Option<u8>,
+    quote: Option<u8>,
+    terminator: Option<u8>,
+) -> Result<(Schema, usize)> {
     let saved_offset = reader.seek(SeekFrom::Current(0))?;
 
-    let (schema, records_count) =
-        infer_reader_schema(reader, delimiter, max_read_records, has_header)?;
+    let (schema, records_count) = infer_reader_schema_with_csv_options(
+        reader,
+        delimiter,
+        max_read_records,
+        has_header,
+        escape,
+        quote,
+        terminator,
+    )?;
 
     // return the reader seek back to the start
     reader.seek(SeekFrom::Start(saved_offset))?;
@@ -129,9 +156,34 @@
     max_read_records: Option<usize>,
     has_header: bool,
 ) -> Result<(Schema, usize)> {
-    let mut csv_reader = csv_crate::ReaderBuilder::new()
-        .delimiter(delimiter)
-        .from_reader(reader);
+    infer_reader_schema_with_csv_options(
+        reader,
+        delimiter,
+        max_read_records,
+        has_header,
+        None,
+        None,
+        None,
+    )
+}
+
+fn infer_reader_schema_with_csv_options<R: Read>(
+    reader: &mut R,
+    delimiter: u8,
+    max_read_records: Option<usize>,
+    has_header: bool,
+    escape: Option<u8>,
+    quote: Option<u8>,
+    terminator: Option<u8>,
+) -> Result<(Schema, usize)> {
+    let mut csv_reader = Reader::build_csv_reader(
+        reader,
+        has_header,
+        Some(delimiter),
+        escape,
+        quote,
+        terminator,
+    );
 
     // get or create header names
     // when has_header is false, creates default column names with column_ prefix
@@ -322,15 +374,45 @@
         bounds: Bounds,
         projection: Option<Vec<usize>>,
     ) -> Self {
+        let csv_reader =
+            Self::build_csv_reader(reader, has_header, delimiter, None, None, None);
+        Self::from_csv_reader(
+            csv_reader, schema, has_header, batch_size, bounds, projection,
+        )
+    }
+
+    fn build_csv_reader(
+        reader: R,
+        has_header: bool,
+        delimiter: Option<u8>,
+        escape: Option<u8>,
+        quote: Option<u8>,
+        terminator: Option<u8>,
+    ) -> csv_crate::Reader<R> {
         let mut reader_builder = csv_crate::ReaderBuilder::new();
         reader_builder.has_headers(has_header);
 
         if let Some(c) = delimiter {
             reader_builder.delimiter(c);
         }
+        reader_builder.escape(escape);
+        if let Some(c) = quote {
+            reader_builder.quote(c);
+        }
+        if let Some(t) = terminator {
+            reader_builder.terminator(csv_crate::Terminator::Any(t));
+        }
+        reader_builder.from_reader(reader)
+    }
 
-        let mut csv_reader = reader_builder.from_reader(reader);
-
+    fn from_csv_reader(
+        mut csv_reader: csv_crate::Reader<R>,
+        schema: SchemaRef,
+        has_header: bool,
+        batch_size: usize,
+        bounds: Bounds,
+        projection: Option<Vec<usize>>,
+    ) -> Self {
         let (start, end) = match bounds {
             None => (0, usize::MAX),
             Some((start, end)) => (start, end),
@@ -729,6 +811,12 @@
     has_header: bool,
     /// An optional column delimiter. Defaults to `b','`
     delimiter: Option<u8>,
+    /// An optional escape character. Defaults to None
+    escape: Option<u8>,
+    /// An optional quote character. Defaults to b'\"'
+    quote: Option<u8>,
+    /// An optional record terminator. Defaults to CRLF
+    terminator: Option<u8>,
     /// Optional maximum number of records to read during schema inference
     ///
     /// If a number is not provided, all the records are read.
@@ -749,6 +837,9 @@
             schema: None,
             has_header: false,
             delimiter: None,
+            escape: None,
+            quote: None,
+            terminator: None,
             max_records: None,
             batch_size: 1024,
             bounds: None,
@@ -803,6 +894,21 @@
         self
     }
 
+    pub fn with_escape(mut self, escape: u8) -> Self {
+        self.escape = Some(escape);
+        self
+    }
+
+    pub fn with_quote(mut self, quote: u8) -> Self {
+        self.quote = Some(quote);
+        self
+    }
+
+    pub fn with_terminator(mut self, terminator: u8) -> Self {
+        self.terminator = Some(terminator);
+        self
+    }
+
     /// Set the CSV reader to infer the schema of the file
     pub fn infer_schema(mut self, max_records: Option<usize>) -> Self {
         // remove any schema that is set
@@ -830,21 +936,31 @@
         let schema = match self.schema {
             Some(schema) => schema,
             None => {
-                let (inferred_schema, _) = infer_file_schema(
+                let (inferred_schema, _) = infer_file_schema_with_csv_options(
                     &mut reader,
                     delimiter,
                     self.max_records,
                     self.has_header,
+                    self.escape,
+                    self.quote,
+                    self.terminator,
                 )?;
 
                 Arc::new(inferred_schema)
             }
         };
-        Ok(Reader::from_reader(
+        let csv_reader = Reader::build_csv_reader(
             reader,
-            schema,
             self.has_header,
             self.delimiter,
+            self.escape,
+            self.quote,
+            self.terminator,
+        );
+        Ok(Reader::from_csv_reader(
+            csv_reader,
+            schema,
+            self.has_header,
             self.batch_size,
             None,
             self.projection.clone(),
@@ -1381,4 +1497,103 @@
         assert_eq!(None, parse_item::<Float64Type>("dd"));
         assert_eq!(None, parse_item::<Float64Type>("12.34.56"));
     }
+
+    #[test]
+    fn test_non_std_quote() {
+        let schema = Schema::new(vec![
+            Field::new("text1", DataType::Utf8, false),
+            Field::new("text2", DataType::Utf8, false),
+        ]);
+        let builder = ReaderBuilder::new()
+            .with_schema(Arc::new(schema))
+            .has_header(false)
+            .with_quote(b'~'); // default is ", change to ~
+
+        let mut csv_text = Vec::new();
+        let mut csv_writer = std::io::Cursor::new(&mut csv_text);
+        for index in 0..10 {
+            let text1 = format!("id{:}", index);
+            let text2 = format!("value{:}", index);
+            csv_writer
+                .write_fmt(format_args!("~{}~,~{}~\r\n", text1, text2))
+                .unwrap();
+        }
+        let mut csv_reader = std::io::Cursor::new(&csv_text);
+        let mut reader = builder.build(&mut csv_reader).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+        let col0 = batch.column(0);
+        assert_eq!(col0.len(), 10);
+        let col0_arr = col0.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(col0_arr.value(0), "id0");
+        let col1 = batch.column(1);
+        assert_eq!(col1.len(), 10);
+        let col1_arr = col1.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(col1_arr.value(5), "value5");
+    }
+
+    #[test]
+    fn test_non_std_escape() {
+        let schema = Schema::new(vec![
+            Field::new("text1", DataType::Utf8, false),
+            Field::new("text2", DataType::Utf8, false),
+        ]);
+        let builder = ReaderBuilder::new()
+            .with_schema(Arc::new(schema))
+            .has_header(false)
+            .with_escape(b'\\'); // default is None, change to \
+
+        let mut csv_text = Vec::new();
+        let mut csv_writer = std::io::Cursor::new(&mut csv_text);
+        for index in 0..10 {
+            let text1 = format!("id{:}", index);
+            let text2 = format!("value\\\"{:}", index);
+            csv_writer
+                .write_fmt(format_args!("\"{}\",\"{}\"\r\n", text1, text2))
+                .unwrap();
+        }
+        let mut csv_reader = std::io::Cursor::new(&csv_text);
+        let mut reader = builder.build(&mut csv_reader).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+        let col0 = batch.column(0);
+        assert_eq!(col0.len(), 10);
+        let col0_arr = col0.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(col0_arr.value(0), "id0");
+        let col1 = batch.column(1);
+        assert_eq!(col1.len(), 10);
+        let col1_arr = col1.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(col1_arr.value(5), "value\"5");
+    }
+
+    #[test]
+    fn test_non_std_terminator() {
+        let schema = Schema::new(vec![
+            Field::new("text1", DataType::Utf8, false),
+            Field::new("text2", DataType::Utf8, false),
+        ]);
+        let builder = ReaderBuilder::new()
+            .with_schema(Arc::new(schema))
+            .has_header(false)
+            .with_terminator(b'\n'); // default is CRLF, change to LF
+
+        let mut csv_text = Vec::new();
+        let mut csv_writer = std::io::Cursor::new(&mut csv_text);
+        for index in 0..10 {
+            let text1 = format!("id{:}", index);
+            let text2 = format!("value{:}", index);
+            csv_writer
+                .write_fmt(format_args!("\"{}\",\"{}\"\n", text1, text2))
+                .unwrap();
+        }
+        let mut csv_reader = std::io::Cursor::new(&csv_text);
+        let mut reader = builder.build(&mut csv_reader).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+        let col0 = batch.column(0);
+        assert_eq!(col0.len(), 10);
+        let col0_arr = col0.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(col0_arr.value(0), "id0");
+        let col1 = batch.column(1);
+        assert_eq!(col1.len(), 10);
+        let col1_arr = col1.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(col1_arr.value(5), "value5");
+    }
 }