// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Contains a reader which reads parquet data into arrow arrays.
use crate::arrow::array_reader::{build_array_reader, ArrayReader, StructArrayReader};
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::arrow::schema::{
parquet_to_arrow_schema_by_columns, parquet_to_arrow_schema_by_root_columns,
};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::ParquetMetaData;
use crate::file::reader::FileReader;
use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use arrow::{array::StructArray, error::ArrowError};
use std::sync::Arc;
/// Arrow reader API.
/// With this API, the user can get the arrow schema from a parquet file, and read
/// parquet data into arrow arrays.
pub trait ArrowReader {
type RecordReader: RecordBatchReader;
/// Read the parquet schema and convert it into an arrow schema.
fn get_schema(&mut self) -> Result<Schema>;
/// Read the parquet schema and convert it into an arrow schema.
/// This schema only includes the columns identified by `column_indices`.
/// To select leaf columns (i.e. `a.b.c` instead of `a`), set `leaf_columns = true`.
fn get_schema_by_columns<T>(
&mut self,
column_indices: T,
leaf_columns: bool,
) -> Result<Schema>
where
T: IntoIterator<Item = usize>;
/// Returns a record batch reader that reads the whole parquet file.
///
/// # Arguments
///
/// `batch_size`: The size of each record batch returned from this reader. Only the
/// last batch may contain fewer records than this size; all other record batches
/// returned from this reader contain exactly `batch_size` records.
fn get_record_reader(&mut self, batch_size: usize) -> Result<Self::RecordReader>;
/// Returns a record batch reader whose record batches contain only the columns
/// identified by `column_indices`.
///
/// # Arguments
///
/// `column_indices`: The columns that should be included in record batches.
/// `batch_size`: Please refer to `get_record_reader`.
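///
/// For example, to read only the first two leaf columns in batches of 1024
/// rows (a sketch, assuming `arrow_reader` is an already constructed
/// `ParquetFileArrowReader`; the column indices are illustrative):
///
/// ```ignore
/// let reader = arrow_reader.get_record_reader_by_columns(vec![0, 1], 1024)?;
/// for batch in reader {
///     println!("read {} rows", batch?.num_rows());
/// }
/// ```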
fn get_record_reader_by_columns<T>(
&mut self,
column_indices: T,
batch_size: usize,
) -> Result<Self::RecordReader>
where
T: IntoIterator<Item = usize>;
}
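/// [`ArrowReader`] implementation backed by a parquet [`FileReader`].
///
/// A minimal end-to-end sketch (the path `data.parquet` is illustrative and
/// assumed to point at an existing parquet file):
///
/// ```no_run
/// use std::fs::File;
/// use std::sync::Arc;
///
/// use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
/// use parquet::file::reader::SerializedFileReader;
///
/// let file = File::open("data.parquet").unwrap();
/// let file_reader = SerializedFileReader::new(file).unwrap();
/// let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
///
/// // Inspect the arrow schema derived from the parquet metadata.
/// println!("arrow schema: {:?}", arrow_reader.get_schema().unwrap());
///
/// // Read the whole file back as arrow record batches of up to 1024 rows each.
/// let record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
/// for batch in record_batch_reader {
///     println!("read {} rows", batch.unwrap().num_rows());
/// }
/// ```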
pub struct ParquetFileArrowReader {
file_reader: Arc<dyn FileReader>,
}
impl ArrowReader for ParquetFileArrowReader {
type RecordReader = ParquetRecordBatchReader;
fn get_schema(&mut self) -> Result<Schema> {
let file_metadata = self.file_reader.metadata().file_metadata();
parquet_to_arrow_schema(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)
}
fn get_schema_by_columns<T>(
&mut self,
column_indices: T,
leaf_columns: bool,
) -> Result<Schema>
where
T: IntoIterator<Item = usize>,
{
let file_metadata = self.file_reader.metadata().file_metadata();
if leaf_columns {
parquet_to_arrow_schema_by_columns(
file_metadata.schema_descr(),
column_indices,
file_metadata.key_value_metadata(),
)
} else {
parquet_to_arrow_schema_by_root_columns(
file_metadata.schema_descr(),
column_indices,
file_metadata.key_value_metadata(),
)
}
}
fn get_record_reader(
&mut self,
batch_size: usize,
) -> Result<ParquetRecordBatchReader> {
let column_indices = 0..self
.file_reader
.metadata()
.file_metadata()
.schema_descr()
.num_columns();
self.get_record_reader_by_columns(column_indices, batch_size)
}
fn get_record_reader_by_columns<T>(
&mut self,
column_indices: T,
batch_size: usize,
) -> Result<ParquetRecordBatchReader>
where
T: IntoIterator<Item = usize>,
{
let array_reader = build_array_reader(
self.file_reader
.metadata()
.file_metadata()
.schema_descr_ptr(),
self.get_schema()?,
column_indices,
self.file_reader.clone(),
)?;
ParquetRecordBatchReader::try_new(batch_size, array_reader)
}
}
impl ParquetFileArrowReader {
pub fn new(file_reader: Arc<dyn FileReader>) -> Self {
Self { file_reader }
}
/// Expose the reader's parquet metadata
pub fn get_metadata(&mut self) -> ParquetMetaData {
self.file_reader.metadata().clone()
}
}
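/// A [`RecordBatchReader`] (and `Iterator` of `ArrowResult<RecordBatch>`) that
/// decodes at most `batch_size` rows from the wrapped [`ArrayReader`] per batch.
/// Instances are normally obtained via [`ArrowReader::get_record_reader`] rather
/// than constructed directly.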
pub struct ParquetRecordBatchReader {
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
schema: SchemaRef,
}
impl Iterator for ParquetRecordBatchReader {
type Item = ArrowResult<RecordBatch>;
fn next(&mut self) -> Option<Self::Item> {
match self.array_reader.next_batch(self.batch_size) {
Err(error) => Some(Err(error.into())),
Ok(array) => {
let struct_array =
array.as_any().downcast_ref::<StructArray>().ok_or_else(|| {
ArrowError::ParquetError(
"Struct array reader should return struct array".to_string(),
)
});
match struct_array {
Err(err) => Some(Err(err)),
Ok(e) => {
match RecordBatch::try_new(self.schema.clone(), e.columns_ref()) {
Err(err) => Some(Err(err)),
Ok(record_batch) => {
if record_batch.num_rows() > 0 {
Some(Ok(record_batch))
} else {
None
}
}
}
}
}
}
}
}
}
impl RecordBatchReader for ParquetRecordBatchReader {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
impl ParquetRecordBatchReader {
pub fn try_new(
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
) -> Result<Self> {
// Check that the array reader is a struct array reader
array_reader
.as_any()
.downcast_ref::<StructArrayReader>()
.ok_or_else(|| general_err!("The input must be struct array reader!"))?;
let schema = match array_reader.get_data_type() {
ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
_ => unreachable!("Struct array reader's data type is not struct!"),
};
Ok(Self {
batch_size,
array_reader,
schema: Arc::new(schema),
})
}
}
#[cfg(test)]
mod tests {
use crate::arrow::arrow_reader::{ArrowReader, ParquetFileArrowReader};
use crate::arrow::converter::{
Converter, FixedSizeArrayConverter, FromConverter, IntervalDayTimeArrayConverter,
Utf8ArrayConverter,
};
use crate::column::writer::get_typed_column_writer_mut;
use crate::data_type::{
BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray,
FixedLenByteArrayType, Int32Type,
};
use crate::errors::Result;
use crate::file::properties::WriterProperties;
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::file::writer::{FileWriter, SerializedFileWriter};
use crate::schema::parser::parse_message_type;
use crate::schema::types::TypePtr;
use crate::util::test_common::{get_temp_filename, RandGen};
use arrow::array::*;
use arrow::record_batch::RecordBatchReader;
use rand::RngCore;
use serde_json::json;
use serde_json::Value::{Array as JArray, Null as JNull, Object as JObject};
use std::cmp::min;
use std::convert::TryFrom;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;
#[test]
fn test_arrow_reader_all_columns() {
let json_values = get_json_array("parquet/generated_simple_numerics/blogs.json");
let parquet_file_reader =
get_test_reader("parquet/generated_simple_numerics/blogs.parquet");
let max_len = parquet_file_reader.metadata().file_metadata().num_rows() as usize;
let mut arrow_reader = ParquetFileArrowReader::new(parquet_file_reader);
let mut record_batch_reader = arrow_reader
.get_record_reader(60)
.expect("Failed to read into array!");
// Verify that the schema was correctly parsed
let original_schema = arrow_reader.get_schema().unwrap().fields().clone();
assert_eq!(original_schema, *record_batch_reader.schema().fields());
compare_batch_json(&mut record_batch_reader, json_values, max_len);
}
#[test]
fn test_arrow_reader_single_column() {
let json_values = get_json_array("parquet/generated_simple_numerics/blogs.json");
let projected_json_values = json_values
.into_iter()
.map(|value| match value {
JObject(fields) => {
json!({ "blog_id": fields.get("blog_id").unwrap_or(&JNull).clone()})
}
_ => panic!("Input should be json object array!"),
})
.collect::<Vec<_>>();
let parquet_file_reader =
get_test_reader("parquet/generated_simple_numerics/blogs.parquet");
let max_len = parquet_file_reader.metadata().file_metadata().num_rows() as usize;
let mut arrow_reader = ParquetFileArrowReader::new(parquet_file_reader);
let mut record_batch_reader = arrow_reader
.get_record_reader_by_columns(vec![2], 60)
.expect("Failed to read into array!");
// Verify that the schema was correctly parsed
let original_schema = arrow_reader.get_schema().unwrap().fields().clone();
assert_eq!(1, record_batch_reader.schema().fields().len());
assert_eq!(original_schema[1], record_batch_reader.schema().fields()[0]);
compare_batch_json(&mut record_batch_reader, projected_json_values, max_len);
}
#[test]
fn test_bool_single_column_reader_test() {
let message_type = "
message test_schema {
REQUIRED BOOLEAN leaf;
}
";
let converter = FromConverter::new();
run_single_column_reader_tests::<
BoolType,
BooleanArray,
FromConverter<Vec<Option<bool>>, BooleanArray>,
BoolType,
>(2, message_type, &converter);
}
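/// Generates random fixed length byte arrays for tests of
/// FIXED_LEN_BYTE_ARRAY columns.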
struct RandFixedLenGen {}
impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
fn gen(len: i32) -> FixedLenByteArray {
let mut v = vec![0u8; len as usize];
rand::thread_rng().fill_bytes(&mut v);
ByteArray::from(v).into()
}
}
#[test]
fn test_fixed_length_binary_column_reader() {
let message_type = "
message test_schema {
REQUIRED FIXED_LEN_BYTE_ARRAY (20) leaf;
}
";
let converter = FixedSizeArrayConverter::new(20);
run_single_column_reader_tests::<
FixedLenByteArrayType,
FixedSizeBinaryArray,
FixedSizeArrayConverter,
RandFixedLenGen,
>(20, message_type, &converter);
}
#[test]
fn test_interval_day_time_column_reader() {
let message_type = "
message test_schema {
REQUIRED FIXED_LEN_BYTE_ARRAY (12) leaf (INTERVAL);
}
";
let converter = IntervalDayTimeArrayConverter {};
run_single_column_reader_tests::<
FixedLenByteArrayType,
IntervalDayTimeArray,
IntervalDayTimeArrayConverter,
RandFixedLenGen,
>(12, message_type, &converter);
}
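/// Generates random UTF8 byte arrays (the decimal string form of a random
/// i32) for tests of BINARY (UTF8) columns.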
struct RandUtf8Gen {}
impl RandGen<ByteArrayType> for RandUtf8Gen {
fn gen(len: i32) -> ByteArray {
Int32Type::gen(len).to_string().as_str().into()
}
}
#[test]
fn test_utf8_single_column_reader_test() {
let message_type = "
message test_schema {
REQUIRED BINARY leaf (UTF8);
}
";
let converter = Utf8ArrayConverter {};
run_single_column_reader_tests::<
ByteArrayType,
StringArray,
Utf8ArrayConverter,
RandUtf8Gen,
>(2, message_type, &converter);
}
#[test]
fn test_read_decimal_file() {
use arrow::array::DecimalArray;
let testdata = arrow::util::test_util::parquet_test_data();
let file_variants = vec![("fixed_length", 25), ("int32", 4), ("int64", 10)];
for (prefix, target_precision) in file_variants {
let path = format!("{}/{}_decimal.parquet", testdata, prefix);
let parquet_reader =
SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
let mut record_reader = arrow_reader.get_record_reader(32).unwrap();
let batch = record_reader.next().unwrap().unwrap();
assert_eq!(batch.num_rows(), 24);
let col = batch
.column(0)
.as_any()
.downcast_ref::<DecimalArray>()
.unwrap();
let expected = 1..25;
assert_eq!(col.precision(), target_precision);
assert_eq!(col.scale(), 2);
for (i, v) in expected.enumerate() {
assert_eq!(col.value(i), v * 100_i128);
}
}
}
/// Parameters for single_column_reader_test
#[derive(Debug)]
struct TestOptions {
/// Number of row groups to write to the parquet file (row group size =
/// num_rows / num_row_groups)
num_row_groups: usize,
/// Total number of rows
num_rows: usize,
/// Size of batches to read back
record_batch_size: usize,
/// Total number of batches to attempt to read.
/// `record_batch_size` * `num_iterations` should be greater
/// than `num_rows` to ensure the data can be read back completely
num_iterations: usize,
}
/// Create a parquet file and then read it back using
/// `ParquetFileArrowReader` with a standard set of test
/// configurations.
///
/// `rand_max` represents the maximum size of the values passed to the
/// value generator
fn run_single_column_reader_tests<T, A, C, G>(
rand_max: i32,
message_type: &str,
converter: &C,
) where
T: DataType,
G: RandGen<T>,
A: PartialEq + Array + 'static,
C: Converter<Vec<Option<T::T>>, A> + 'static,
{
let all_options = vec![
// Choose record_batch_size (15) so batches cross row
// group boundaries (50 rows in 2 row groups).
TestOptions {
num_row_groups: 2,
num_rows: 100,
record_batch_size: 15,
num_iterations: 50,
},
// Choose record_batch_size (5) so batches sometimes fall
// on row group boundaries (25 rows in 3 row groups
// --> row groups of 10, 10, and 5). Tests buffer
// refilling edge cases.
TestOptions {
num_row_groups: 3,
num_rows: 25,
record_batch_size: 5,
num_iterations: 50,
},
// Choose record_batch_size (25) so all batches fall
// exactly on a row group boundary (25). Tests buffer
// refilling edge cases.
TestOptions {
num_row_groups: 4,
num_rows: 100,
record_batch_size: 25,
num_iterations: 50,
},
];
all_options.into_iter().for_each(|opts| {
// Print out options to facilitate debugging failures on CI
println!("Running with Test Options: {:?}", opts);
single_column_reader_test::<T, A, C, G>(
opts,
rand_max,
message_type,
converter,
)
});
}
/// Create a parquet file and then read it back using
/// `ParquetFileArrowReader` with the parameters described in
/// `opts`.
fn single_column_reader_test<T, A, C, G>(
opts: TestOptions,
rand_max: i32,
message_type: &str,
converter: &C,
) where
T: DataType,
G: RandGen<T>,
A: PartialEq + Array + 'static,
C: Converter<Vec<Option<T::T>>, A> + 'static,
{
let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
.map(|_| G::gen_vec(rand_max, opts.num_rows))
.collect();
let path = get_temp_filename();
let schema = parse_message_type(message_type).map(Arc::new).unwrap();
generate_single_column_file_with_data::<T>(&values, path.as_path(), schema)
.unwrap();
let parquet_reader =
SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
let mut record_reader = arrow_reader
.get_record_reader(opts.record_batch_size)
.unwrap();
let expected_data: Vec<Option<T::T>> = values
.iter()
.flat_map(|v| v.iter())
.map(|b| Some(b.clone()))
.collect();
for i in 0..opts.num_iterations {
let start = i * opts.record_batch_size;
let batch = record_reader.next();
if start < expected_data.len() {
let end = min(start + opts.record_batch_size, expected_data.len());
assert!(batch.is_some());
let mut data = vec![];
data.extend_from_slice(&expected_data[start..end]);
assert_eq!(
&converter.convert(data).unwrap(),
batch
.unwrap()
.unwrap()
.column(0)
.as_any()
.downcast_ref::<A>()
.unwrap()
);
} else {
assert!(batch.is_none());
}
}
}
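/// Writes `values` to a new parquet file at `path`, creating one row group per
/// element of `values`, each containing a single column described by `schema`.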
fn generate_single_column_file_with_data<T: DataType>(
values: &[Vec<T::T>],
path: &Path,
schema: TypePtr,
) -> Result<parquet_format::FileMetaData> {
let file = File::create(path)?;
let writer_props = Arc::new(WriterProperties::builder().build());
let mut writer = SerializedFileWriter::new(file, schema, writer_props)?;
for v in values {
let mut row_group_writer = writer.next_row_group()?;
let mut column_writer = row_group_writer
.next_column()?
.expect("Column writer is none!");
get_typed_column_writer_mut::<T>(&mut column_writer)
.write_batch(v, None, None)?;
row_group_writer.close_column(column_writer)?;
writer.close_row_group(row_group_writer)?
}
writer.close()
}
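/// Opens `file_name` (relative to the arrow test data directory) and wraps it
/// in a `SerializedFileReader`.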
fn get_test_reader(file_name: &str) -> Arc<dyn FileReader> {
let file = get_test_file(file_name);
let reader =
SerializedFileReader::new(file).expect("Failed to create serialized reader");
Arc::new(reader)
}
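/// Returns a `File` handle for `file_name` in the arrow test data directory.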
fn get_test_file(file_name: &str) -> File {
let mut path = PathBuf::new();
path.push(arrow::util::test_util::arrow_test_data());
path.push(file_name);
File::open(path.as_path()).expect("File not found!")
}
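/// Parses the given test data file as a JSON array of values.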
fn get_json_array(filename: &str) -> Vec<serde_json::Value> {
match serde_json::from_reader(get_test_file(filename))
.expect("Failed to read json value from file!")
{
JArray(values) => values,
_ => panic!("Input should be json array!"),
}
}
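/// Compares batches read from `record_batch_reader` (expected to produce
/// batches of 60 rows) against the corresponding slices of `json_values`,
/// up to `max_len` rows.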
fn compare_batch_json(
record_batch_reader: &mut dyn RecordBatchReader,
json_values: Vec<serde_json::Value>,
max_len: usize,
) {
for i in 0..20 {
let array: Option<StructArray> = record_batch_reader
.next()
.map(|r| r.expect("Failed to read record batch!").into());
let (start, end) = (i * 60_usize, (i + 1) * 60_usize);
if start < max_len {
assert!(array.is_some());
assert_ne!(0, array.as_ref().unwrap().len());
let end = min(end, max_len);
let json = JArray(Vec::from(&json_values[start..end]));
assert_eq!(array.unwrap(), json)
} else {
assert!(array.is_none());
}
}
}
#[test]
fn test_read_structs() {
// This particular test file has columns of struct types where there is
// a column that has the same name as one of the struct fields
// (see: ARROW-11452)
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/nested_structs.rust.parquet", testdata);
let parquet_file_reader =
SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_file_reader));
let record_batch_reader = arrow_reader
.get_record_reader(60)
.expect("Failed to read into array!");
for batch in record_batch_reader {
batch.unwrap();
}
}
}