// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::deletion_vector::{DeletionVector, DeletionVectorFactory};
use crate::io::{FileIO, FileRead, FileStatus};
use crate::spec::DataField;
use crate::table::ArrowRecordBatchStream;
use crate::{DataSplit, Error};
use async_stream::try_stream;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{StreamExt, TryFutureExt};
use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, RowSelector};
use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::ParquetMetaDataReader;
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use std::ops::Range;
use std::sync::Arc;
use tokio::try_join;
/// Builder to create ArrowReader
pub struct ArrowReaderBuilder {
batch_size: Option<usize>,
file_io: FileIO,
}
impl ArrowReaderBuilder {
/// Create a new ArrowReaderBuilder
pub(crate) fn new(file_io: FileIO) -> Self {
ArrowReaderBuilder {
batch_size: None,
file_io,
}
}
/// Build the ArrowReader with the given read type (the logical row type or a projected subset of it).
/// The read type is used to clip the Parquet schema to the requested columns only.
pub fn build(self, read_type: Vec<DataField>) -> ArrowReader {
ArrowReader {
batch_size: self.batch_size,
file_io: self.file_io,
read_type,
}
}
}
/// Reads data from Parquet files
#[derive(Clone)]
pub struct ArrowReader {
batch_size: Option<usize>,
file_io: FileIO,
read_type: Vec<DataField>,
}
impl ArrowReader {
/// Takes a slice of [DataSplit]s and reads every data file in each split,
/// returning a stream of Arrow RecordBatches from all files.
/// When a split has deletion files (see [DataSplit::data_deletion_files]), the corresponding
/// deletion vectors are loaded and applied so that deleted rows are filtered out from the stream.
/// Row positions are 0-based within each data file, matching Java's ApplyDeletionVectorReader.
///
/// Matches [RawFileSplitRead.createReader](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java):
/// one DV factory per DataSplit (created from that split's data files and deletion files).
///
/// The Parquet schema is clipped to this reader's read type (column names from [DataField]s).
/// File-only columns are not read. See [ParquetReaderFactory.clipParquetSchema](https://github.com/apache/paimon/blob/master/paimon-format/paimon-format-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java).
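///
/// A minimal usage sketch (illustrative only; how the `FileIO`, the read type and the
/// splits are obtained depends on the surrounding scan code and is assumed here):
///
/// ```ignore
/// use futures::StreamExt;
///
/// // `file_io`, `read_fields` and `splits` are assumed to come from a table scan.
/// let reader = ArrowReaderBuilder::new(file_io).build(read_fields);
/// let mut batches = reader.read(&splits)?;
/// while let Some(batch) = batches.next().await {
///     let batch = batch?; // arrow RecordBatch with deleted rows already filtered out
/// }
/// ```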
pub fn read(self, data_splits: &[DataSplit]) -> crate::Result<ArrowRecordBatchStream> {
let file_io = self.file_io.clone();
let batch_size = self.batch_size;
// Owned list of splits so the stream does not hold references.
let splits: Vec<DataSplit> = data_splits.to_vec();
let read_type = self.read_type;
let projected_column_names = (!read_type.is_empty()).then(|| {
read_type
.iter()
.map(|field| field.name().to_string())
.collect::<Vec<_>>()
});
Ok(try_stream! {
for split in splits {
// Create DV factory for this split only (like Java createReader(partition, bucket, files, deletionFiles)).
let core_data_files = split.data_files();
let dv_factory = if split
.data_deletion_files()
.is_some_and(|files| files.iter().any(Option::is_some))
{
Some(
DeletionVectorFactory::new(
&file_io,
core_data_files,
split.data_deletion_files(),
)
.await?,
)
} else {
None
};
for file_meta in core_data_files {
let path_to_read = split.data_file_path(file_meta);
if !path_to_read.to_ascii_lowercase().ends_with(".parquet") {
Err(Error::Unsupported {
message: format!(
"unsupported file format: only .parquet is supported, got: {path_to_read}"
),
})?
}
let dv = dv_factory
.as_ref()
.and_then(|factory| factory.get_deletion_vector(&file_meta.file_name));
let parquet_file = file_io.new_input(&path_to_read)?;
let (parquet_metadata, parquet_reader) = try_join!(
parquet_file.metadata(),
parquet_file.reader()
)?;
let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
let mut batch_stream_builder =
ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
.await?;
// Clip to read type columns; file-only columns are dropped.
if let Some(projected_column_names) = projected_column_names.as_ref() {
let parquet_schema = batch_stream_builder.parquet_schema();
let mask = ProjectionMask::columns(
parquet_schema,
projected_column_names.iter().map(String::as_str),
);
batch_stream_builder = batch_stream_builder.with_projection(mask);
}
if let Some(dv) = dv {
if !dv.is_empty() {
let row_selection =
build_deletes_row_selection(batch_stream_builder.metadata().row_groups(), dv)?;
batch_stream_builder = batch_stream_builder.with_row_selection(row_selection);
}
}
if let Some(size) = batch_size {
batch_stream_builder = batch_stream_builder.with_batch_size(size);
}
let mut batch_stream = batch_stream_builder.build()?;
while let Some(batch) = batch_stream.next().await {
yield batch?
}
}
}
}
.boxed())
}
}
/// Builds a Parquet [RowSelection] from a deletion vector.
/// Only rows not in the deletion vector are selected; deleted rows are skipped at read time.
/// TODO: use [DeletionVectorIterator] with [advance_to](DeletionVectorIterator::advance_to) to skip whole row groups, similar to iceberg-rust.
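///
/// Worked example (illustrative, not from the codebase): with two row groups of 4 and 3 rows
/// and deleted file positions {1, 5}, the resulting selection is
/// `select(1), skip(1), select(2), select(1), skip(1), select(1)`,
/// i.e. rows 0, 2, 3 survive from the first row group and rows 4, 6 from the second.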
fn build_deletes_row_selection(
row_group_metadata_list: &[RowGroupMetaData],
deletion_vector: &DeletionVector,
) -> crate::Result<RowSelection> {
let mut delete_iter = deletion_vector.iter();
let mut results: Vec<RowSelector> = Vec::new();
let mut current_row_group_base_idx: u64 = 0;
let mut next_deleted_row_idx_opt = delete_iter.next();
for row_group_metadata in row_group_metadata_list {
let row_group_num_rows = row_group_metadata.num_rows() as u64;
let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;
let mut next_deleted_row_idx = match next_deleted_row_idx_opt {
Some(next_deleted_row_idx) => {
if next_deleted_row_idx >= next_row_group_base_idx {
results.push(RowSelector::select(row_group_num_rows as usize));
current_row_group_base_idx += row_group_num_rows;
continue;
}
next_deleted_row_idx
}
None => {
results.push(RowSelector::select(row_group_num_rows as usize));
current_row_group_base_idx += row_group_num_rows;
continue;
}
};
let mut current_idx = current_row_group_base_idx;
'chunks: while next_deleted_row_idx < next_row_group_base_idx {
if current_idx < next_deleted_row_idx {
let run_length = next_deleted_row_idx - current_idx;
results.push(RowSelector::select(run_length as usize));
current_idx += run_length;
}
let mut run_length = 0u64;
while next_deleted_row_idx == current_idx
&& next_deleted_row_idx < next_row_group_base_idx
{
run_length += 1;
current_idx += 1;
next_deleted_row_idx_opt = delete_iter.next();
next_deleted_row_idx = match next_deleted_row_idx_opt {
Some(v) => v,
None => {
results.push(RowSelector::skip(run_length as usize));
break 'chunks;
}
};
}
if run_length > 0 {
results.push(RowSelector::skip(run_length as usize));
}
}
if current_idx < next_row_group_base_idx {
results.push(RowSelector::select(
(next_row_group_base_idx - current_idx) as usize,
));
}
current_row_group_base_idx += row_group_num_rows;
}
Ok(results.into())
}
/// ArrowFileReader is a wrapper around a [FileRead] that implements parquet's [AsyncFileReader].
///
/// # TODO
///
/// [ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64)
/// provides the following hints to speed up metadata loading; similar to iceberg-rust, we could consider adding them to this struct:
///
/// - `metadata_size_hint`: Provide a hint as to the size of the parquet file's footer.
/// - `preload_column_index`: Load the Column Index as part of [`Self::get_metadata`].
/// - `preload_offset_index`: Load the Offset Index as part of [`Self::get_metadata`].
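///
/// A sketch of how the footer-size hint could be wired in (hypothetical `metadata_size_hint`
/// field; assumes `ParquetMetaDataReader::with_prefetch_hint` is available in the parquet
/// version in use):
///
/// ```ignore
/// let metadata = ParquetMetaDataReader::new()
///     .with_prefetch_hint(self.metadata_size_hint) // Option<usize> stored on the struct
///     .load_and_finish(self, file_size)
///     .await?;
/// ```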
struct ArrowFileReader<R: FileRead> {
    /// File status, used for its size when loading the Parquet metadata.
    meta: FileStatus,
    /// Underlying reader for byte-range reads.
    r: R,
}
impl<R: FileRead> ArrowFileReader<R> {
/// Create a new ArrowFileReader
fn new(meta: FileStatus, r: R) -> Self {
Self { meta, r }
}
fn read_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
Box::pin(self.r.read(range.start..range.end).map_err(|err| {
let err_msg = format!("{err}");
parquet::errors::ParquetError::External(err_msg.into())
}))
}
}
impl<R: FileRead> MetadataFetch for ArrowFileReader<R> {
fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
self.read_bytes(range)
}
}
impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
self.read_bytes(range)
}
fn get_metadata(
&mut self,
options: Option<&ArrowReaderOptions>,
) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
let metadata_opts = options.map(|o| o.metadata_options().clone());
Box::pin(async move {
let file_size = self.meta.size;
let metadata = ParquetMetaDataReader::new()
.with_metadata_options(metadata_opts)
.load_and_finish(self, file_size)
.await?;
Ok(Arc::new(metadata))
})
}
}