ARROW-4219: [Rust] [Parquet] Initial support for arrow reader.

Initial support for the Arrow reader, which reads Parquet data into Arrow record batches.

Closes #5523 from liurenjie1024/arrow-4219 and squashes the following commits:

8b915b399 <Renjie Liu> Fix format
cecdd97e3 <Renjie Liu> Fix test
cf7f8f406 <Renjie Liu> Finish arrow reader

Authored-by: Renjie Liu <liurenjie2008@gmail.com>
Signed-off-by: Andy Grove <andygrove73@gmail.com>
diff --git a/rust/arrow/src/array/array.rs b/rust/arrow/src/array/array.rs
index 19ab72f..d0ae1e3 100644
--- a/rust/arrow/src/array/array.rs
+++ b/rust/arrow/src/array/array.rs
@@ -1056,6 +1056,11 @@
         self.boxed_fields.iter().collect()
     }
 
+    /// Returns cloned `ArrayRef`s of the struct array's child arrays
+    pub fn columns_ref(&self) -> Vec<ArrayRef> {
+        self.boxed_fields.clone()
+    }
+
     /// Return field names in this struct array
     pub fn column_names(&self) -> Vec<&str> {
         match self.data.data_type() {
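`columns_ref` complements the existing `columns()` accessor: it hands back owned `Arc` clones rather than borrows, which is what `RecordBatch::try_new` consumes below. A minimal sketch of the accessor in use (field names and values are made up for illustration):

```rust
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int32Array, StructArray};
use arrow::datatypes::{DataType, Field};

fn main() {
    // Build a two-column struct array from (Field, ArrayRef) pairs.
    let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let b: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
    let s = StructArray::from(vec![
        (Field::new("a", DataType::Int32, false), a),
        (Field::new("b", DataType::Int32, false), b),
    ]);

    // columns_ref clones the Arc'd children, so the copies are cheap
    // and independent of the struct array's lifetime.
    let cols: Vec<ArrayRef> = s.columns_ref();
    assert_eq!(2, cols.len());
    assert_eq!(3, cols[0].len());
}
```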
diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs
index 40e0153..abd526f 100644
--- a/rust/arrow/src/datatypes.rs
+++ b/rust/arrow/src/datatypes.rs
@@ -34,6 +34,7 @@
 use serde_json::{json, Number, Value, Value::Number as VNumber};
 
 use crate::error::{ArrowError, Result};
+use std::sync::Arc;
 
 /// The possible relative types that are supported.
 ///
@@ -869,6 +870,8 @@
     }
 }
 
+pub type SchemaRef = Arc<Schema>;
+
 #[cfg(test)]
 mod tests {
     use super::*;
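The new `SchemaRef` alias gives readers and record batches a shared, cheaply clonable handle on a schema. A small sketch of what the alias buys (the `id` field is illustrative):

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

fn main() {
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "id",
        DataType::Int64,
        false,
    )]));

    // Cloning a SchemaRef only bumps a reference count; the Schema itself
    // is shared between the reader and every RecordBatch it produces.
    let shared = schema.clone();
    assert_eq!(schema.fields().len(), shared.fields().len());
}
```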
diff --git a/rust/arrow/src/error.rs b/rust/arrow/src/error.rs
index 2f758d4..e9a9235 100644
--- a/rust/arrow/src/error.rs
+++ b/rust/arrow/src/error.rs
@@ -31,6 +31,7 @@
     JsonError(String),
     IoError(String),
     InvalidArgumentError(String),
+    ParquetError(String),
 }
 
 impl From<::std::io::Error> for ArrowError {
diff --git a/rust/arrow/src/record_batch.rs b/rust/arrow/src/record_batch.rs
index e718118..7ac1d41 100644
--- a/rust/arrow/src/record_batch.rs
+++ b/rust/arrow/src/record_batch.rs
@@ -113,9 +113,32 @@
     }
 }
 
+impl From<RecordBatch> for StructArray {
+    fn from(batch: RecordBatch) -> Self {
+        batch.schema
+            .fields
+            .iter()
+            .zip(batch.columns.iter())
+            .map(|t| (t.0.clone(), t.1.clone()))
+            .collect::<Vec<(Field, ArrayRef)>>()
+            .into()
+    }
+}
+
 unsafe impl Send for RecordBatch {}
 unsafe impl Sync for RecordBatch {}
 
+/// Definition of a record batch reader.
+pub trait RecordBatchReader {
+    /// Returns the schema of this record batch reader.
+    /// Implementations of this trait must guarantee that all record batches
+    /// returned by this reader have the same schema as returned from this method.
+    fn schema(&mut self) -> SchemaRef;
+
+    /// Returns the next record batch, or `None` when the reader is exhausted.
+    fn next_batch(&mut self) -> Result<Option<RecordBatch>>;
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
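`RecordBatchReader` is the streaming contract that the parquet crate implements further down. To make the contract concrete, here is a hypothetical in-memory implementation that drains a pre-built queue of batches; `InMemoryReader` is invented for illustration and is not part of this change:

```rust
use arrow::datatypes::SchemaRef;
use arrow::error::Result;
use arrow::record_batch::{RecordBatch, RecordBatchReader};

/// Hypothetical reader that serves batches from a pre-built queue.
struct InMemoryReader {
    schema: SchemaRef,
    batches: Vec<RecordBatch>,
}

impl RecordBatchReader for InMemoryReader {
    fn schema(&mut self) -> SchemaRef {
        self.schema.clone()
    }

    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        // Ok(None) signals end of stream, mirroring the parquet-backed
        // reader; every Some(batch) must match self.schema.
        if self.batches.is_empty() {
            Ok(None)
        } else {
            Ok(Some(self.batches.remove(0)))
        }
    }
}
```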
diff --git a/rust/parquet/Cargo.toml b/rust/parquet/Cargo.toml
index 66292c0..549cc44 100644
--- a/rust/parquet/Cargo.toml
+++ b/rust/parquet/Cargo.toml
@@ -41,6 +41,7 @@
 chrono = "0.4"
 num-bigint = "0.2"
 arrow = { path = "../arrow", version = "1.0.0-SNAPSHOT" }
+serde_json = { version = "1.0.13", features = ["preserve_order"] }
 
 [dev-dependencies]
 lazy_static = "1"
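The `preserve_order` feature makes `serde_json` keep object keys in insertion order rather than sorted order, presumably so that the JSON rows in the new test compare deterministically against struct arrays. A quick illustration of the feature's effect:

```rust
fn main() {
    let v: serde_json::Value = serde_json::from_str(r#"{"b": 1, "a": 2}"#).unwrap();
    let keys: Vec<&str> = v.as_object().unwrap().keys().map(|k| k.as_str()).collect();
    // Without `preserve_order` this would come back sorted as ["a", "b"].
    assert_eq!(vec!["b", "a"], keys);
}
```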
diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs
index 3a4a786..2e58c0c 100644
--- a/rust/parquet/src/arrow/array_reader.rs
+++ b/rust/parquet/src/arrow/array_reader.rs
@@ -52,9 +52,12 @@
     ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr,
 };
 use crate::schema::visitor::TypeVisitor;
+use std::any::Any;
 
 /// Array reader reads parquet data into arrow array.
 pub trait ArrayReader {
+    fn as_any(&self) -> &dyn Any;
+
     /// Returns the arrow type of this array reader.
     fn get_data_type(&self) -> &ArrowType;
 
@@ -115,6 +118,10 @@
 
 /// Implementation of primitive array reader.
 impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
     /// Returns data type of primitive array.
     fn get_data_type(&self) -> &ArrowType {
         &self.data_type
@@ -232,7 +239,7 @@
 }
 
 /// Implementation of struct array reader.
-struct StructArrayReader {
+pub struct StructArrayReader {
     children: Vec<Box<dyn ArrayReader>>,
     data_type: ArrowType,
     struct_def_level: i16,
@@ -261,6 +268,10 @@
 }
 
 impl ArrayReader for StructArrayReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
     /// Returns data type.
     /// This must be a struct.
     fn get_data_type(&self) -> &ArrowType {
@@ -705,6 +716,7 @@
     use arrow::array::{Array, ArrayRef, PrimitiveArray, StructArray};
     use arrow::datatypes::{DataType as ArrowType, Field, Int32Type as ArrowInt32};
     use rand::distributions::range::SampleRange;
+    use std::any::Any;
     use std::collections::VecDeque;
     use std::rc::Rc;
     use std::sync::Arc;
@@ -953,6 +965,10 @@
     }
 
     impl ArrayReader for InMemoryArrayReader {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
         fn get_data_type(&self) -> &ArrowType {
             &self.data_type
         }
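The new `as_any` hook lets a `Box<dyn ArrayReader>` be probed for its concrete type; `ParquetRecordBatchReader::try_new` in the next file uses it to insist on a `StructArrayReader` at the root of the reader tree. A stripped-down sketch of the pattern, with simplified stand-in types:

```rust
use std::any::Any;

// Simplified stand-in for the ArrayReader trait.
trait Reader {
    fn as_any(&self) -> &dyn Any;
}

struct StructReader;

impl Reader for StructReader {
    fn as_any(&self) -> &dyn Any {
        self // the concrete &Self coerces to &dyn Any
    }
}

fn main() {
    let reader: Box<dyn Reader> = Box::new(StructReader);
    // Downcasting succeeds only if the boxed value really is a StructReader,
    // which is exactly the guard ParquetRecordBatchReader::try_new applies.
    assert!(reader.as_any().downcast_ref::<StructReader>().is_some());
}
```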
diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs
new file mode 100644
index 0000000..4d12195
--- /dev/null
+++ b/rust/parquet/src/arrow/arrow_reader.rs
@@ -0,0 +1,279 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains a reader that reads Parquet data into Arrow arrays.
+
+use crate::arrow::array_reader::{build_array_reader, ArrayReader, StructArrayReader};
+use crate::arrow::schema::parquet_to_arrow_schema;
+use crate::arrow::schema::parquet_to_arrow_schema_by_columns;
+use crate::errors::{ParquetError, Result};
+use crate::file::reader::FileReader;
+use arrow::array::StructArray;
+use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::{RecordBatch, RecordBatchReader};
+use std::rc::Rc;
+use std::sync::Arc;
+
+/// Arrow reader API.
+/// With this API, the user can get the Arrow schema from a Parquet file, and read
+/// Parquet data into Arrow arrays.
+pub trait ArrowReader {
+    /// Reads the Parquet schema and converts it into an Arrow schema.
+    fn get_schema(&mut self) -> Result<Schema>;
+
+    /// Reads the Parquet schema and converts it into an Arrow schema.
+    /// This schema only includes the columns identified by `column_indices`.
+    fn get_schema_by_columns<T>(&mut self, column_indices: T) -> Result<Schema>
+    where
+        T: IntoIterator<Item = usize>;
+
+    /// Returns a record batch reader over the whole Parquet file.
+    ///
+    /// # Arguments
+    ///
+    /// `batch_size`: The size of each record batch returned from this reader. Only the
+    /// last batch may contain fewer records than this size; every other batch returned
+    /// from this reader contains exactly `batch_size` rows.
+    fn get_record_reader(
+        &mut self,
+        batch_size: usize,
+    ) -> Result<Box<dyn RecordBatchReader>>;
+
+    /// Returns a record batch reader whose record batches contain only the columns
+    /// identified by `column_indices`.
+    ///
+    /// # Arguments
+    ///
+    /// `column_indices`: The columns that should be included in record batches.
+    /// `batch_size`: Please refer to `get_record_reader`.
+    fn get_record_reader_by_columns<T>(
+        &mut self,
+        column_indices: T,
+        batch_size: usize,
+    ) -> Result<Box<dyn RecordBatchReader>>
+    where
+        T: IntoIterator<Item = usize>;
+}
+
+pub struct ParquetFileArrowReader {
+    file_reader: Rc<dyn FileReader>,
+}
+
+impl ArrowReader for ParquetFileArrowReader {
+    fn get_schema(&mut self) -> Result<Schema> {
+        parquet_to_arrow_schema(
+            self.file_reader
+                .metadata()
+                .file_metadata()
+                .schema_descr_ptr(),
+        )
+    }
+
+    fn get_schema_by_columns<T>(&mut self, column_indices: T) -> Result<Schema>
+    where
+        T: IntoIterator<Item = usize>,
+    {
+        parquet_to_arrow_schema_by_columns(
+            self.file_reader
+                .metadata()
+                .file_metadata()
+                .schema_descr_ptr(),
+            column_indices,
+        )
+    }
+
+    fn get_record_reader(
+        &mut self,
+        batch_size: usize,
+    ) -> Result<Box<dyn RecordBatchReader>> {
+        let column_indices = 0..self
+            .file_reader
+            .metadata()
+            .file_metadata()
+            .schema_descr_ptr()
+            .num_columns();
+
+        self.get_record_reader_by_columns(column_indices, batch_size)
+    }
+
+    fn get_record_reader_by_columns<T>(
+        &mut self,
+        column_indices: T,
+        batch_size: usize,
+    ) -> Result<Box<dyn RecordBatchReader>>
+    where
+        T: IntoIterator<Item = usize>,
+    {
+        let array_reader = build_array_reader(
+            self.file_reader
+                .metadata()
+                .file_metadata()
+                .schema_descr_ptr(),
+            column_indices,
+            self.file_reader.clone(),
+        )?;
+
+        Ok(Box::new(ParquetRecordBatchReader::try_new(
+            batch_size,
+            array_reader,
+        )?))
+    }
+}
+
+impl ParquetFileArrowReader {
+    pub fn new(file_reader: Rc<dyn FileReader>) -> Self {
+        Self { file_reader }
+    }
+}
+
+struct ParquetRecordBatchReader {
+    batch_size: usize,
+    array_reader: Box<dyn ArrayReader>,
+    schema: SchemaRef,
+}
+
+impl RecordBatchReader for ParquetRecordBatchReader {
+    fn schema(&mut self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn next_batch(&mut self) -> ArrowResult<Option<RecordBatch>> {
+        self.array_reader
+            .next_batch(self.batch_size)
+            .map_err(|err| err.into())
+            .and_then(|array| {
+                array
+                    .as_any()
+                    .downcast_ref::<StructArray>()
+                    .ok_or_else(|| {
+                        general_err!("Struct array reader should return struct array")
+                            .into()
+                    })
+                    .and_then(|struct_array| {
+                        RecordBatch::try_new(
+                            self.schema.clone(),
+                            struct_array.columns_ref(),
+                        )
+                    })
+            })
+            .map(|record_batch| {
+                if record_batch.num_rows() > 0 {
+                    Some(record_batch)
+                } else {
+                    None
+                }
+            })
+    }
+}
+
+impl ParquetRecordBatchReader {
+    pub fn try_new(
+        batch_size: usize,
+        array_reader: Box<dyn ArrayReader>,
+    ) -> Result<Self> {
+        // Check that the array reader is a struct array reader
+        array_reader
+            .as_any()
+            .downcast_ref::<StructArrayReader>()
+            .ok_or_else(|| general_err!("The input must be a struct array reader!"))?;
+
+        let schema = match array_reader.get_data_type() {
+            &ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
+            _ => unreachable!("Struct array reader's data type is not struct!"),
+        };
+
+        Ok(Self {
+            batch_size,
+            array_reader,
+            schema: Arc::new(schema),
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::arrow::arrow_reader::{ArrowReader, ParquetFileArrowReader};
+    use crate::file::reader::{FileReader, SerializedFileReader};
+    use arrow::array::Array;
+    use arrow::array::StructArray;
+    use serde_json::Value::Array as JArray;
+    use std::cmp::min;
+    use std::env;
+    use std::fs::File;
+    use std::path::PathBuf;
+    use std::rc::Rc;
+
+    #[test]
+    fn test_arrow_reader() {
+        let json_values = match serde_json::from_reader(get_test_file(
+            "parquet/generated_simple_numerics/blogs.json",
+        ))
+        .expect("Failed to read json value from file!")
+        {
+            JArray(values) => values,
+            _ => panic!("Input should be json array!"),
+        };
+
+        let parquet_file_reader =
+            get_test_reader("parquet/generated_simple_numerics/blogs.parquet");
+
+        let max_len = parquet_file_reader.metadata().file_metadata().num_rows() as usize;
+
+        let mut arrow_reader = ParquetFileArrowReader::new(parquet_file_reader);
+
+        let mut record_batch_reader = arrow_reader
+            .get_record_reader(60)
+            .expect("Failed to read into array!");
+
+        for i in 0..20 {
+            let array: Option<StructArray> = record_batch_reader
+                .next_batch()
+                .expect("Failed to read record batch!")
+                .map(|r| r.into());
+
+            let (start, end) = (i * 60_usize, (i + 1) * 60_usize);
+
+            if start < max_len {
+                assert!(array.is_some());
+                assert_ne!(0, array.as_ref().unwrap().len());
+                let end = min(end, max_len);
+                let json = JArray(Vec::from(&json_values[start..end]));
+                assert_eq!(array.unwrap(), json)
+            } else {
+                assert!(array.is_none());
+            }
+        }
+    }
+
+    fn get_test_reader(file_name: &str) -> Rc<dyn FileReader> {
+        let file = get_test_file(file_name);
+
+        let reader =
+            SerializedFileReader::new(file).expect("Failed to create serialized reader");
+
+        Rc::new(reader)
+    }
+
+    fn get_test_file(file_name: &str) -> File {
+        let mut path = PathBuf::new();
+        path.push(env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined!"));
+        path.push(file_name);
+
+        File::open(path.as_path()).expect("File not found!")
+    }
+}
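Besides the whole-file path exercised by the test above, the API supports projection through `get_record_reader_by_columns`. A hedged sketch of that path; the path "data.parquet" and the leaf column indices are placeholders:

```rust
use arrow::record_batch::RecordBatchReader;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
use parquet::file::reader::SerializedFileReader;
use std::fs::File;
use std::rc::Rc;

fn main() {
    let file = File::open("data.parquet").unwrap();
    let file_reader = Rc::new(SerializedFileReader::new(file).unwrap());
    let mut arrow_reader = ParquetFileArrowReader::new(file_reader);

    // Keep only leaf columns 0 and 2; the reported schema and every batch
    // are restricted to that projection.
    println!(
        "Projected schema: {}",
        arrow_reader.get_schema_by_columns(vec![0, 2]).unwrap()
    );
    let mut reader = arrow_reader
        .get_record_reader_by_columns(vec![0, 2], 1024)
        .unwrap();

    // next_batch returns Ok(None) once the file is exhausted.
    while let Some(batch) = reader.next_batch().unwrap() {
        println!("{} rows x {} columns", batch.num_rows(), batch.num_columns());
    }
}
```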
diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs
index a2c6031..02f50fd 100644
--- a/rust/parquet/src/arrow/mod.rs
+++ b/rust/parquet/src/arrow/mod.rs
@@ -19,10 +19,38 @@
 //! in-memory data.
 //!
 //! This mod provides API for converting between arrow and parquet.
+//!
+//! # Example of reading a Parquet file into Arrow record batches
+//!
+//! ```rust, no_run
+//! use arrow::record_batch::RecordBatchReader;
+//! use parquet::file::reader::SerializedFileReader;
+//! use parquet::arrow::{ParquetFileArrowReader, ArrowReader};
+//! use std::rc::Rc;
+//! use std::fs::File;
+//!
+//! let file = File::open("parquet.file").unwrap();
+//! let file_reader = SerializedFileReader::new(file).unwrap();
+//! let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(file_reader));
+//!
+//! println!("Converted arrow schema is: {}", arrow_reader.get_schema().unwrap());
+//! println!("Arrow schema after projection is: {}",
+//!    arrow_reader.get_schema_by_columns(vec![2, 4, 6]).unwrap());
+//!
+//! let mut record_batch_reader = arrow_reader.get_record_reader(2048).unwrap();
+//!
+//! while let Some(record_batch) = record_batch_reader.next_batch().unwrap() {
+//!    println!("Read {} records.", record_batch.num_rows());
+//! }
+//! println!("End of file!");
+//! ```
 
 pub(in crate::arrow) mod array_reader;
+pub mod arrow_reader;
 pub(in crate::arrow) mod converter;
 pub(in crate::arrow) mod record_reader;
 pub mod schema;
 
+pub use self::arrow_reader::ArrowReader;
+pub use self::arrow_reader::ParquetFileArrowReader;
 pub use self::schema::{parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns};
diff --git a/rust/parquet/src/errors.rs b/rust/parquet/src/errors.rs
index fe19c1f..4e0e2fe 100644
--- a/rust/parquet/src/errors.rs
+++ b/rust/parquet/src/errors.rs
@@ -22,6 +22,7 @@
 use arrow::error::ArrowError;
 use quick_error::quick_error;
 use snap;
+use std::error::Error;
 use thrift;
 
 quick_error! {
@@ -97,3 +98,12 @@
     ($fmt:expr) => (ParquetError::EOF($fmt.to_owned()));
     ($fmt:expr, $($args:expr),*) => (ParquetError::EOF(format!($fmt, $($args),*)));
 }
+
+// ----------------------------------------------------------------------
+// Convert parquet error into other errors
+
+impl From<ParquetError> for ArrowError {
+    fn from(err: ParquetError) -> Self {
+        ArrowError::ParquetError(err.description().to_string())
+    }
+}
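With this conversion in place, parquet-level failures surface through Arrow's error type, which is what `ParquetRecordBatchReader::next_batch` relies on. A small sketch; the `read_count` helper is invented for illustration:

```rust
use arrow::error::{ArrowError, Result as ArrowResult};
use parquet::errors::ParquetError;

// Invented helper: lifts a parquet-level result into an arrow-level one.
fn read_count(res: Result<u32, ParquetError>) -> ArrowResult<u32> {
    res.map_err(|e| e.into())
}

fn main() {
    let err = read_count(Err(ParquetError::General("oops".to_string()))).unwrap_err();
    match err {
        ArrowError::ParquetError(msg) => println!("parquet failure: {}", msg),
        other => unreachable!("{:?}", other),
    }
}
```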
diff --git a/testing b/testing
index 93f33ae..90ae758 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 93f33ae922226686e623d6b1d0307f48030a8a67
+Subproject commit 90ae758c55aebf40e926ce049a662726b26f485f