ARROW-4219: [Rust] [Parquet] Initial support for arrow reader.
Initial support of arrow reader, which reads parquet into arrow record batch.
Closes #5523 from liurenjie1024/arrow-4219 and squashes the following commits:
8b915b399 <Renjie Liu> Fix format
cecdd97e3 <Renjie Liu> Fix test
cf7f8f406 <Renjie Liu> Finish arrow reader
Authored-by: Renjie Liu <liurenjie2008@gmail.com>
Signed-off-by: Andy Grove <andygrove73@gmail.com>
diff --git a/rust/arrow/src/array/array.rs b/rust/arrow/src/array/array.rs
index 19ab72f..d0ae1e3 100644
--- a/rust/arrow/src/array/array.rs
+++ b/rust/arrow/src/array/array.rs
@@ -1056,6 +1056,11 @@
self.boxed_fields.iter().collect()
}
+ /// Returns child array refs of the struct array
+ pub fn columns_ref(&self) -> Vec<ArrayRef> {
+ self.boxed_fields.clone()
+ }
+
/// Return field names in this struct array
pub fn column_names(&self) -> Vec<&str> {
match self.data.data_type() {
diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs
index 40e0153..abd526f 100644
--- a/rust/arrow/src/datatypes.rs
+++ b/rust/arrow/src/datatypes.rs
@@ -34,6 +34,7 @@
use serde_json::{json, Number, Value, Value::Number as VNumber};
use crate::error::{ArrowError, Result};
+use std::sync::Arc;
/// The possible relative types that are supported.
///
@@ -869,6 +870,8 @@
}
}
+pub type SchemaRef = Arc<Schema>;
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/rust/arrow/src/error.rs b/rust/arrow/src/error.rs
index 2f758d4..e9a9235 100644
--- a/rust/arrow/src/error.rs
+++ b/rust/arrow/src/error.rs
@@ -31,6 +31,7 @@
JsonError(String),
IoError(String),
InvalidArgumentError(String),
+ ParquetError(String),
}
impl From<::std::io::Error> for ArrowError {
diff --git a/rust/arrow/src/record_batch.rs b/rust/arrow/src/record_batch.rs
index e718118..7ac1d41 100644
--- a/rust/arrow/src/record_batch.rs
+++ b/rust/arrow/src/record_batch.rs
@@ -113,9 +113,32 @@
}
}
+impl Into<StructArray> for RecordBatch {
+ fn into(self) -> StructArray {
+ self.schema
+ .fields
+ .iter()
+ .zip(self.columns.iter())
+ .map(|t| (t.0.clone(), t.1.clone()))
+ .collect::<Vec<(Field, ArrayRef)>>()
+ .into()
+ }
+}
+
unsafe impl Send for RecordBatch {}
unsafe impl Sync for RecordBatch {}
+/// Definition of record batch reader.
+pub trait RecordBatchReader {
+ /// Returns schemas of this record batch reader.
+ /// Implementation of this trait should guarantee that all record batches returned
+ /// by this reader should have same schema as returned from this method.
+ fn schema(&mut self) -> SchemaRef;
+
+ /// Returns next record batch.
+ fn next_batch(&mut self) -> Result<Option<RecordBatch>>;
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/rust/parquet/Cargo.toml b/rust/parquet/Cargo.toml
index 66292c0..549cc44 100644
--- a/rust/parquet/Cargo.toml
+++ b/rust/parquet/Cargo.toml
@@ -41,6 +41,7 @@
chrono = "0.4"
num-bigint = "0.2"
arrow = { path = "../arrow", version = "1.0.0-SNAPSHOT" }
+serde_json = { version = "1.0.13", features = ["preserve_order"] }
[dev-dependencies]
lazy_static = "1"
diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs
index 3a4a786..2e58c0c 100644
--- a/rust/parquet/src/arrow/array_reader.rs
+++ b/rust/parquet/src/arrow/array_reader.rs
@@ -52,9 +52,12 @@
ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr,
};
use crate::schema::visitor::TypeVisitor;
+use std::any::Any;
/// Array reader reads parquet data into arrow array.
pub trait ArrayReader {
+ fn as_any(&self) -> &dyn Any;
+
/// Returns the arrow type of this array reader.
fn get_data_type(&self) -> &ArrowType;
@@ -115,6 +118,10 @@
/// Implementation of primitive array reader.
impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
/// Returns data type of primitive array.
fn get_data_type(&self) -> &ArrowType {
&self.data_type
@@ -232,7 +239,7 @@
}
/// Implementation of struct array reader.
-struct StructArrayReader {
+pub struct StructArrayReader {
children: Vec<Box<dyn ArrayReader>>,
data_type: ArrowType,
struct_def_level: i16,
@@ -261,6 +268,10 @@
}
impl ArrayReader for StructArrayReader {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
/// Returns data type.
/// This must be a struct.
fn get_data_type(&self) -> &ArrowType {
@@ -705,6 +716,7 @@
use arrow::array::{Array, ArrayRef, PrimitiveArray, StructArray};
use arrow::datatypes::{DataType as ArrowType, Field, Int32Type as ArrowInt32};
use rand::distributions::range::SampleRange;
+ use std::any::Any;
use std::collections::VecDeque;
use std::rc::Rc;
use std::sync::Arc;
@@ -953,6 +965,10 @@
}
impl ArrayReader for InMemoryArrayReader {
+ fn as_any(&self) -> &Any {
+ self
+ }
+
fn get_data_type(&self) -> &ArrowType {
&self.data_type
}
diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs
new file mode 100644
index 0000000..4d12195
--- /dev/null
+++ b/rust/parquet/src/arrow/arrow_reader.rs
@@ -0,0 +1,279 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains reader which reads parquet data into arrow array.
+
+use crate::arrow::array_reader::{build_array_reader, ArrayReader, StructArrayReader};
+use crate::arrow::schema::parquet_to_arrow_schema;
+use crate::arrow::schema::parquet_to_arrow_schema_by_columns;
+use crate::errors::{ParquetError, Result};
+use crate::file::reader::FileReader;
+use arrow::array::StructArray;
+use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::{RecordBatch, RecordBatchReader};
+use std::rc::Rc;
+use std::sync::Arc;
+
+/// Arrow reader api.
+/// With this api, user can get arrow schema from parquet file, and read parquet data
+/// into arrow arrays.
+pub trait ArrowReader {
+ /// Read parquet schema and convert it into arrow schema.
+ fn get_schema(&mut self) -> Result<Schema>;
+
+ /// Read parquet schema and convert it into arrow schema.
+ /// This schema only includes columns identified by `column_indices`.
+ fn get_schema_by_columns<T>(&mut self, column_indices: T) -> Result<Schema>
+ where
+ T: IntoIterator<Item = usize>;
+
+ /// Returns record batch reader from whole parquet file.
+ ///
+ /// # Arguments
+ ///
+ /// `batch_size`: The size of each record batch returned from this reader. Only the
+ /// last batch may contain records less than this size, otherwise record batches
+ /// returned from this reader should contains exactly `batch_size` elements.
+ fn get_record_reader(
+ &mut self,
+ batch_size: usize,
+ ) -> Result<Box<dyn RecordBatchReader>>;
+
+ /// Returns record batch reader whose record batch contains columns identified by
+ /// `column_indices`.
+ ///
+ /// # Arguments
+ ///
+ /// `column_indices`: The columns that should be included in record batches.
+ /// `batch_size`: Please refer to `get_record_reader`.
+ fn get_record_reader_by_columns<T>(
+ &mut self,
+ column_indices: T,
+ batch_size: usize,
+ ) -> Result<Box<dyn RecordBatchReader>>
+ where
+ T: IntoIterator<Item = usize>;
+}
+
+pub struct ParquetFileArrowReader {
+ file_reader: Rc<dyn FileReader>,
+}
+
+impl ArrowReader for ParquetFileArrowReader {
+ fn get_schema(&mut self) -> Result<Schema> {
+ parquet_to_arrow_schema(
+ self.file_reader
+ .metadata()
+ .file_metadata()
+ .schema_descr_ptr(),
+ )
+ }
+
+ fn get_schema_by_columns<T>(&mut self, column_indices: T) -> Result<Schema>
+ where
+ T: IntoIterator<Item = usize>,
+ {
+ parquet_to_arrow_schema_by_columns(
+ self.file_reader
+ .metadata()
+ .file_metadata()
+ .schema_descr_ptr(),
+ column_indices,
+ )
+ }
+
+ fn get_record_reader(
+ &mut self,
+ batch_size: usize,
+ ) -> Result<Box<dyn RecordBatchReader>> {
+ let column_indices = 0..self
+ .file_reader
+ .metadata()
+ .file_metadata()
+ .schema_descr_ptr()
+ .num_columns();
+
+ self.get_record_reader_by_columns(column_indices, batch_size)
+ }
+
+ fn get_record_reader_by_columns<T>(
+ &mut self,
+ column_indices: T,
+ batch_size: usize,
+ ) -> Result<Box<dyn RecordBatchReader>>
+ where
+ T: IntoIterator<Item = usize>,
+ {
+ let array_reader = build_array_reader(
+ self.file_reader
+ .metadata()
+ .file_metadata()
+ .schema_descr_ptr(),
+ column_indices,
+ self.file_reader.clone(),
+ )?;
+
+ Ok(Box::new(ParquetRecordBatchReader::try_new(
+ batch_size,
+ array_reader,
+ )?))
+ }
+}
+
+impl ParquetFileArrowReader {
+ pub fn new(file_reader: Rc<dyn FileReader>) -> Self {
+ Self { file_reader }
+ }
+}
+
+struct ParquetRecordBatchReader {
+ batch_size: usize,
+ array_reader: Box<dyn ArrayReader>,
+ schema: SchemaRef,
+}
+
+impl RecordBatchReader for ParquetRecordBatchReader {
+ fn schema(&mut self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ fn next_batch(&mut self) -> ArrowResult<Option<RecordBatch>> {
+ self.array_reader
+ .next_batch(self.batch_size)
+ .map_err(|err| err.into())
+ .and_then(|array| {
+ array
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .ok_or_else(|| {
+ general_err!("Struct array reader should return struct array")
+ .into()
+ })
+ .and_then(|struct_array| {
+ RecordBatch::try_new(
+ self.schema.clone(),
+ struct_array.columns_ref(),
+ )
+ })
+ })
+ .map(|record_batch| {
+ if record_batch.num_rows() > 0 {
+ Some(record_batch)
+ } else {
+ None
+ }
+ })
+ }
+}
+
+impl ParquetRecordBatchReader {
+ pub fn try_new(
+ batch_size: usize,
+ array_reader: Box<dyn ArrayReader>,
+ ) -> Result<Self> {
+ // Check that array reader is struct array reader
+ array_reader
+ .as_any()
+ .downcast_ref::<StructArrayReader>()
+ .ok_or_else(|| general_err!("The input must be struct array reader!"))?;
+
+ let schema = match array_reader.get_data_type() {
+ &ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
+ _ => unreachable!("Struct array reader's data type is not struct!"),
+ };
+
+ Ok(Self {
+ batch_size,
+ array_reader,
+ schema: Arc::new(schema),
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::arrow::arrow_reader::{ArrowReader, ParquetFileArrowReader};
+ use crate::file::reader::{FileReader, SerializedFileReader};
+ use arrow::array::Array;
+ use arrow::array::StructArray;
+ use serde_json::Value::Array as JArray;
+ use std::cmp::min;
+ use std::env;
+ use std::fs::File;
+ use std::path::PathBuf;
+ use std::rc::Rc;
+
+ #[test]
+ fn test_arrow_reader() {
+ let json_values = match serde_json::from_reader(get_test_file(
+ "parquet/generated_simple_numerics/blogs.json",
+ ))
+ .expect("Failed to read json value from file!")
+ {
+ JArray(values) => values,
+ _ => panic!("Input should be json array!"),
+ };
+
+ let parquet_file_reader =
+ get_test_reader("parquet/generated_simple_numerics/blogs.parquet");
+
+ let max_len = parquet_file_reader.metadata().file_metadata().num_rows() as usize;
+
+ let mut arrow_reader = ParquetFileArrowReader::new(parquet_file_reader);
+
+ let mut record_batch_reader = arrow_reader
+ .get_record_reader(60)
+ .expect("Failed to read into array!");
+
+ for i in 0..20 {
+ let array: Option<StructArray> = record_batch_reader
+ .next_batch()
+ .expect("Failed to read record batch!")
+ .map(|r| r.into());
+
+ let (start, end) = (i * 60 as usize, (i + 1) * 60 as usize);
+
+ if start < max_len {
+ assert!(array.is_some());
+ assert_ne!(0, array.as_ref().unwrap().len());
+ let end = min(end, max_len);
+ let json = JArray(Vec::from(&json_values[start..end]));
+ assert_eq!(array.unwrap(), json)
+ } else {
+ assert!(array.is_none());
+ }
+ }
+ }
+
+ fn get_test_reader(file_name: &str) -> Rc<dyn FileReader> {
+ let file = get_test_file(file_name);
+
+ let reader =
+ SerializedFileReader::new(file).expect("Failed to create serialized reader");
+
+ Rc::new(reader)
+ }
+
+ fn get_test_file(file_name: &str) -> File {
+ let mut path = PathBuf::new();
+ path.push(env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined!"));
+ path.push(file_name);
+
+ File::open(path.as_path()).expect("File not found!")
+ }
+}
diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs
index a2c6031..02f50fd 100644
--- a/rust/parquet/src/arrow/mod.rs
+++ b/rust/parquet/src/arrow/mod.rs
@@ -19,10 +19,42 @@
//! in-memory data.
//!
//! This mod provides API for converting between arrow and parquet.
+//!
+//! # Example of reading parquet file into arrow record batch
+//!
+//! ```rust, no_run
+//! use arrow::record_batch::RecordBatchReader;
+//! use parquet::file::reader::SerializedFileReader;
+//! use parquet::arrow::{ParquetFileArrowReader, ArrowReader};
+//! use std::rc::Rc;
+//! use std::fs::File;
+//!
+//! let file = File::open("parquet.file").unwrap();
+//! let file_reader = SerializedFileReader::new(file).unwrap();
+//! let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(file_reader));
+//!
+//! println!("Converted arrow schema is: {}", arrow_reader.get_schema().unwrap());
+//! println!("Arrow schema after projection is: {}",
+//! arrow_reader.get_schema_by_columns(vec![2, 4, 6]).unwrap());
+//!
+//! let mut record_batch_reader = arrow_reader.get_record_reader(2048).unwrap();
+//!
+//! loop {
+//! let record_batch = record_batch_reader.next_batch().unwrap().unwrap();
+//! if record_batch.num_rows() > 0 {
+//! println!("Read {} records.", record_batch.num_rows());
+//! } else {
+//! println!("End of file!");
+//! }
+//!}
+//! ```
pub(in crate::arrow) mod array_reader;
+pub mod arrow_reader;
pub(in crate::arrow) mod converter;
pub(in crate::arrow) mod record_reader;
pub mod schema;
+pub use self::arrow_reader::ArrowReader;
+pub use self::arrow_reader::ParquetFileArrowReader;
pub use self::schema::{parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns};
diff --git a/rust/parquet/src/errors.rs b/rust/parquet/src/errors.rs
index fe19c1f..4e0e2fe 100644
--- a/rust/parquet/src/errors.rs
+++ b/rust/parquet/src/errors.rs
@@ -22,6 +22,7 @@
use arrow::error::ArrowError;
use quick_error::quick_error;
use snap;
+use std::error::Error;
use thrift;
quick_error! {
@@ -97,3 +98,12 @@
($fmt:expr) => (ParquetError::EOF($fmt.to_owned()));
($fmt:expr, $($args:expr),*) => (ParquetError::EOF(format!($fmt, $($args),*)));
}
+
+// ----------------------------------------------------------------------
+// Convert parquet error into other errors
+
+impl Into<ArrowError> for ParquetError {
+ fn into(self) -> ArrowError {
+ ArrowError::ParquetError(self.description().to_string())
+ }
+}
diff --git a/testing b/testing
index 93f33ae..90ae758 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 93f33ae922226686e623d6b1d0307f48030a8a67
+Subproject commit 90ae758c55aebf40e926ce049a662726b26f485f