// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Contains a reader that reads Parquet data into Arrow arrays.
use crate::arrow::array_reader::{build_array_reader, ArrayReader, StructArrayReader};
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::arrow::schema::parquet_to_arrow_schema_by_columns;
use crate::errors::{ParquetError, Result};
use crate::file::reader::FileReader;
use arrow::array::StructArray;
use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use std::rc::Rc;
use std::sync::Arc;
/// Arrow reader API.
///
/// With this API, a user can read the schema of a Parquet file as an Arrow schema
/// and read Parquet data into Arrow arrays.
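///
/// # Example
///
/// A minimal sketch (not compiled as a doc test; the file path and batch size are
/// illustrative) that reads a whole Parquet file into Arrow record batches:
///
/// ```ignore
/// use std::fs::File;
/// use std::rc::Rc;
///
/// use parquet::arrow::arrow_reader::{ArrowReader, ParquetFileArrowReader};
/// use parquet::file::reader::SerializedFileReader;
///
/// let file = File::open("data.parquet").unwrap();
/// let file_reader = SerializedFileReader::new(file).unwrap();
/// let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(file_reader));
///
/// println!("Arrow schema: {:?}", arrow_reader.get_schema().unwrap());
///
/// let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
/// while let Some(batch) = record_batch_reader.next_batch().unwrap() {
///     println!("Read a batch of {} rows.", batch.num_rows());
/// }
/// ```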
pub trait ArrowReader {
type RecordReader: RecordBatchReader;
    /// Read the Parquet schema and convert it into an Arrow schema.
fn get_schema(&mut self) -> Result<Schema>;
    /// Read the Parquet schema and convert it into an Arrow schema.
    /// The returned schema only includes the columns identified by `column_indices`.
fn get_schema_by_columns<T>(&mut self, column_indices: T) -> Result<Schema>
where
T: IntoIterator<Item = usize>;
    /// Returns a record batch reader that reads the whole Parquet file.
///
/// # Arguments
///
    /// `batch_size`: The size of each record batch returned from this reader. Only the
    /// last batch may contain fewer than `batch_size` records; every other batch
    /// returned from this reader contains exactly `batch_size` records.
fn get_record_reader(&mut self, batch_size: usize) -> Result<Self::RecordReader>;
    /// Returns a record batch reader whose record batches contain only the columns
    /// identified by `column_indices`.
///
/// # Arguments
///
/// `column_indices`: The columns that should be included in record batches.
/// `batch_size`: Please refer to `get_record_reader`.
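    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doc test), assuming `arrow_reader` is a
    /// `ParquetFileArrowReader` over a file with at least three leaf columns; the
    /// indices and batch size are illustrative:
    ///
    /// ```ignore
    /// // Project only the first and third leaf columns, 1024 rows per batch.
    /// let mut record_batch_reader = arrow_reader
    ///     .get_record_reader_by_columns(vec![0, 2], 1024)
    ///     .unwrap();
    /// ```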
fn get_record_reader_by_columns<T>(
&mut self,
column_indices: T,
batch_size: usize,
) -> Result<Self::RecordReader>
where
T: IntoIterator<Item = usize>;
}
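
/// An `ArrowReader` implementation backed by a Parquet `FileReader`.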
pub struct ParquetFileArrowReader {
file_reader: Rc<dyn FileReader>,
}
impl ArrowReader for ParquetFileArrowReader {
type RecordReader = ParquetRecordBatchReader;
fn get_schema(&mut self) -> Result<Schema> {
parquet_to_arrow_schema(
self.file_reader.metadata().file_metadata().schema_descr(),
)
}
fn get_schema_by_columns<T>(&mut self, column_indices: T) -> Result<Schema>
where
T: IntoIterator<Item = usize>,
{
parquet_to_arrow_schema_by_columns(
self.file_reader.metadata().file_metadata().schema_descr(),
column_indices,
)
}
fn get_record_reader(
&mut self,
batch_size: usize,
) -> Result<ParquetRecordBatchReader> {
let column_indices = 0..self
.file_reader
.metadata()
.file_metadata()
.schema_descr()
.num_columns();
self.get_record_reader_by_columns(column_indices, batch_size)
}
fn get_record_reader_by_columns<T>(
&mut self,
column_indices: T,
batch_size: usize,
) -> Result<ParquetRecordBatchReader>
where
T: IntoIterator<Item = usize>,
{
let array_reader = build_array_reader(
self.file_reader
.metadata()
.file_metadata()
.schema_descr_ptr(),
column_indices,
self.file_reader.clone(),
)?;
Ok(ParquetRecordBatchReader::try_new(batch_size, array_reader)?)
}
}
impl ParquetFileArrowReader {
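    /// Create a new `ParquetFileArrowReader` that reads from the given Parquet file reader.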
pub fn new(file_reader: Rc<dyn FileReader>) -> Self {
Self { file_reader }
}
}
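
/// A `RecordBatchReader` that reads batches of up to `batch_size` rows from an
/// underlying `ArrayReader`.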
pub struct ParquetRecordBatchReader {
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
schema: SchemaRef,
}
impl RecordBatchReader for ParquetRecordBatchReader {
fn schema(&mut self) -> SchemaRef {
self.schema.clone()
}
fn next_batch(&mut self) -> ArrowResult<Option<RecordBatch>> {
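        // Read up to `batch_size` rows from the array reader, downcast the result to a
        // struct array, and convert its child arrays into a `RecordBatch`. A zero-row
        // result marks the end of the data and is mapped to `None`.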
self.array_reader
.next_batch(self.batch_size)
.map_err(|err| err.into())
.and_then(|array| {
array
.as_any()
.downcast_ref::<StructArray>()
.ok_or_else(|| {
general_err!("Struct array reader should return struct array")
.into()
})
.and_then(|struct_array| {
RecordBatch::try_new(
self.schema.clone(),
struct_array.columns_ref(),
)
})
})
.map(|record_batch| {
if record_batch.num_rows() > 0 {
Some(record_batch)
} else {
None
}
})
}
}
impl ParquetRecordBatchReader {
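    /// Create a new `ParquetRecordBatchReader`. Returns an error if `array_reader`
    /// is not a `StructArrayReader`.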
pub fn try_new(
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
) -> Result<Self> {
// Check that array reader is struct array reader
array_reader
.as_any()
.downcast_ref::<StructArrayReader>()
.ok_or_else(|| general_err!("The input must be struct array reader!"))?;
let schema = match array_reader.get_data_type() {
&ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
_ => unreachable!("Struct array reader's data type is not struct!"),
};
Ok(Self {
batch_size,
array_reader,
schema: Arc::new(schema),
})
}
}
#[cfg(test)]
mod tests {
use crate::arrow::arrow_reader::{ArrowReader, ParquetFileArrowReader};
use crate::arrow::converter::{Converter, FromConverter, Utf8ArrayConverter};
use crate::column::writer::get_typed_column_writer_mut;
use crate::data_type::{BoolType, ByteArray, ByteArrayType, DataType, Int32Type};
use crate::errors::Result;
use crate::file::properties::WriterProperties;
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::file::writer::{FileWriter, SerializedFileWriter};
use crate::schema::parser::parse_message_type;
use crate::schema::types::TypePtr;
use crate::util::test_common::{get_temp_filename, RandGen};
use arrow::array::{Array, BooleanArray, StringArray, StructArray};
use arrow::record_batch::RecordBatchReader;
use serde_json::Value::Array as JArray;
use std::cmp::min;
use std::convert::TryFrom;
use std::env;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::rc::Rc;
#[test]
fn test_arrow_reader() {
let json_values = match serde_json::from_reader(get_test_file(
"parquet/generated_simple_numerics/blogs.json",
))
.expect("Failed to read json value from file!")
{
JArray(values) => values,
_ => panic!("Input should be json array!"),
};
let parquet_file_reader =
get_test_reader("parquet/generated_simple_numerics/blogs.parquet");
let max_len = parquet_file_reader.metadata().file_metadata().num_rows() as usize;
let mut arrow_reader = ParquetFileArrowReader::new(parquet_file_reader);
let mut record_batch_reader = arrow_reader
.get_record_reader(60)
.expect("Failed to read into array!");
for i in 0..20 {
let array: Option<StructArray> = record_batch_reader
.next_batch()
.expect("Failed to read record batch!")
.map(|r| r.into());
            let (start, end) = (i * 60, (i + 1) * 60);
if start < max_len {
assert!(array.is_some());
assert_ne!(0, array.as_ref().unwrap().len());
let end = min(end, max_len);
let json = JArray(Vec::from(&json_values[start..end]));
assert_eq!(array.unwrap(), json)
} else {
assert!(array.is_none());
}
}
}
#[test]
fn test_bool_single_column_reader_test() {
let message_type = "
message test_schema {
REQUIRED BOOLEAN leaf;
}
";
single_column_reader_test::<
BoolType,
BooleanArray,
FromConverter<Vec<Option<bool>>, BooleanArray>,
BoolType,
>(2, 100, 2, message_type, 15, 50);
}
struct RandUtf8Gen {}
impl RandGen<ByteArrayType> for RandUtf8Gen {
fn gen(len: i32) -> ByteArray {
Int32Type::gen(len).to_string().as_str().into()
}
}
#[test]
fn test_utf8_single_column_reader_test() {
let message_type = "
message test_schema {
REQUIRED BINARY leaf (UTF8);
}
";
single_column_reader_test::<
ByteArrayType,
StringArray,
Utf8ArrayConverter,
RandUtf8Gen,
>(2, 100, 2, message_type, 15, 50);
}
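    // Writes `num_row_groups` row groups of random values for a single leaf column,
    // reads them back through the Arrow reader in batches of `record_batch_size`, and
    // checks every returned batch against the expected slice of values.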
fn single_column_reader_test<T, A, C, G>(
num_row_groups: usize,
num_rows: usize,
rand_max: i32,
message_type: &str,
record_batch_size: usize,
num_iterations: usize,
) where
T: DataType,
G: RandGen<T>,
A: PartialEq + Array + 'static,
C: Converter<Vec<Option<T::T>>, A> + 'static,
{
let values: Vec<Vec<T::T>> = (0..num_row_groups)
.map(|_| G::gen_vec(rand_max, num_rows))
.collect();
let path = get_temp_filename();
let schema = parse_message_type(message_type)
.map(|t| Rc::new(t))
.unwrap();
generate_single_column_file_with_data::<T>(&values, path.as_path(), schema)
.unwrap();
let parquet_reader =
SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(parquet_reader));
let mut record_reader =
arrow_reader.get_record_reader(record_batch_size).unwrap();
let expected_data: Vec<Option<T::T>> = values
.iter()
.flat_map(|v| v.iter())
.map(|b| Some(b.clone()))
.collect();
for i in 0..num_iterations {
let start = i * record_batch_size;
let batch = record_reader.next_batch().unwrap();
if start < expected_data.len() {
let end = min(start + record_batch_size, expected_data.len());
assert!(batch.is_some());
let mut data = vec![];
data.extend_from_slice(&expected_data[start..end]);
assert_eq!(
&C::convert(data).unwrap(),
batch
.unwrap()
.column(0)
.as_any()
.downcast_ref::<A>()
.unwrap()
);
} else {
assert!(batch.is_none());
}
}
}
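    // Writes each inner vector in `values` as its own row group containing the single
    // column described by `schema`.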
fn generate_single_column_file_with_data<T: DataType>(
values: &Vec<Vec<T::T>>,
path: &Path,
schema: TypePtr,
) -> Result<()> {
let file = File::create(path)?;
let writer_props = Rc::new(WriterProperties::builder().build());
let mut writer = SerializedFileWriter::new(file, schema, writer_props.clone())?;
for v in values {
let mut row_group_writer = writer.next_row_group()?;
let mut column_writer = row_group_writer
.next_column()?
.expect("Column writer is none!");
get_typed_column_writer_mut::<T>(&mut column_writer)
.write_batch(v, None, None)?;
row_group_writer.close_column(column_writer)?;
writer.close_row_group(row_group_writer)?
}
writer.close()
}
fn get_test_reader(file_name: &str) -> Rc<dyn FileReader> {
let file = get_test_file(file_name);
let reader =
SerializedFileReader::new(file).expect("Failed to create serialized reader");
Rc::new(reader)
}
fn get_test_file(file_name: &str) -> File {
let mut path = PathBuf::new();
path.push(env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined!"));
path.push(file_name);
File::open(path.as_path()).expect("File not found!")
}
}