// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::cmp::{max, min};
use std::ops::Range;
use std::sync::Arc;

use futures::FutureExt;
use futures::future::BoxFuture;
use opendal::Reader;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::errors::{ParquetError, Result as ParquetResult};
use parquet::file::FOOTER_SIZE;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
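
/// The default number of bytes to prefetch from the end of the file when
/// reading the parquet footer; a window this large usually captures the whole
/// metadata in a single read.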
const PREFETCH_FOOTER_SIZE: usize = 512 * 1024;

/// `AsyncReader` implements the [`AsyncFileReader`] trait by delegating reads
/// to an opendal [`Reader`].
///
/// ```no_run
/// use std::sync::Arc;
///
/// use arrow::array::{ArrayRef, Int64Array, RecordBatch};
/// use futures::StreamExt;
/// use opendal::{services::S3Config, Operator};
/// use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder};
/// use parquet_opendal::{AsyncReader, AsyncWriter};
///
/// #[tokio::main]
/// async fn main() {
///     let mut cfg = S3Config::default();
///     cfg.access_key_id = Some("my_access_key".to_string());
///     cfg.secret_access_key = Some("my_secret_key".to_string());
///     cfg.endpoint = Some("my_endpoint".to_string());
///     cfg.region = Some("my_region".to_string());
///     cfg.bucket = "my_bucket".to_string();
///
///     // Create a new operator.
///     let operator = Operator::from_config(cfg).unwrap().finish();
///     let path = "/path/to/file.parquet";
///
///     // Create an async writer.
///     let writer = AsyncWriter::new(
///         operator
///             .writer_with(path)
///             .chunk(32 * 1024 * 1024)
///             .concurrent(8)
///             .await
///             .unwrap(),
///     );
///
///     let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
///     let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
///     let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
///     writer.write(&to_write).await.unwrap();
///     writer.close().await.unwrap();
///
///     // gap: allow the underlying reader to merge small IOs
///     // when the gap between multiple IO ranges is less than the threshold.
///     let reader = operator
///         .reader_with(path)
///         .gap(512 * 1024)
///         .chunk(16 * 1024 * 1024)
///         .concurrent(16)
///         .await
///         .unwrap();
///
///     let content_len = operator.stat(path).await.unwrap().content_length();
///     let reader = AsyncReader::new(reader, content_len).with_prefetch_footer_size(512 * 1024);
///     let mut stream = ParquetRecordBatchStreamBuilder::new(reader)
///         .await
///         .unwrap()
///         .build()
///         .unwrap();
///     let read = stream.next().await.unwrap().unwrap();
///     assert_eq!(to_write, read);
/// }
/// ```
pub struct AsyncReader {
    inner: Reader,
    content_length: u64,
    /// The number of bytes to prefetch when reading the file footer.
    prefetch_footer_size: usize,
}
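
/// Clamp the footer prefetch size to the range `[FOOTER_SIZE, content_size]`:
/// never fetch less than the 8-byte parquet footer, and never read past the
/// end of the file.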
fn set_prefetch_footer_size(footer_size: usize, content_size: u64) -> usize {
    let footer_size = max(footer_size, FOOTER_SIZE);
    min(footer_size as u64, content_size) as usize
}

impl AsyncReader {
    /// Create an [`AsyncReader`] from the given [`Reader`] and the file's content length.
    pub fn new(reader: Reader, content_length: u64) -> Self {
        Self {
            inner: reader,
            content_length,
            prefetch_footer_size: set_prefetch_footer_size(PREFETCH_FOOTER_SIZE, content_length),
        }
    }

    /// Set the prefetch size used when fetching the file footer.
    ///
    /// The value is clamped to the range `[FOOTER_SIZE, content_length]`.
    pub fn with_prefetch_footer_size(mut self, footer_size: usize) -> Self {
        self.prefetch_footer_size = set_prefetch_footer_size(footer_size, self.content_length);
        self
    }
}

impl AsyncFileReader for AsyncReader {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, ParquetResult<bytes::Bytes>> {
        async move {
            // opendal returns a `Buffer`; flatten it into contiguous `Bytes` for parquet.
            Ok(self
                .inner
                .read(range)
                .await
                .map_err(|err| ParquetError::External(Box::new(err)))?
                .to_bytes())
        }
        .boxed()
    }

    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, ParquetResult<Vec<bytes::Bytes>>> {
        async move {
            // `fetch` reads all ranges in one call so opendal can merge small,
            // nearby IOs according to the reader's configured gap.
            Ok(self
                .inner
                .fetch(ranges)
                .await
                .map_err(|err| ParquetError::External(Box::new(err)))?
                .into_iter()
                .map(|buf| buf.to_bytes())
                .collect::<Vec<_>>())
        }
        .boxed()
    }

    fn get_metadata<'a>(
        &'a mut self,
        _options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, ParquetResult<Arc<ParquetMetaData>>> {
        let content_length = self.content_length;
        let prefetch_footer_size = self.prefetch_footer_size;
        async move {
            // Hint the metadata reader to prefetch the trailing bytes of the
            // file, which usually loads footer and metadata in a single read.
            let reader =
                ParquetMetaDataReader::new().with_prefetch_hint(Some(prefetch_footer_size));
            let meta = reader.load_and_finish(self, content_length).await?;
            Ok(Arc::new(meta))
        }
        .boxed()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array, RecordBatch};
    use futures::StreamExt;
    use opendal::{Operator, services};
    use parquet::{
        arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder},
        file::{metadata::KeyValue, properties::WriterProperties},
    };
    use rand::{Rng, distributions::Alphanumeric};

    use crate::{AsyncReader, AsyncWriter, async_reader::PREFETCH_FOOTER_SIZE};

    #[tokio::test]
    async fn test_async_reader_with_prefetch_footer_size() {
        let operator = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "/path/to/file.parquet";

        // A file smaller than the default prefetch size: clamp to the content length.
        let reader = AsyncReader::new(operator.reader(path).await.unwrap(), 1024);
        assert_eq!(reader.prefetch_footer_size, 1024);
        assert_eq!(reader.content_length, 1024);

        // A large enough file keeps the default prefetch size.
        let reader = AsyncReader::new(operator.reader(path).await.unwrap(), 1024 * 1024);
        assert_eq!(reader.prefetch_footer_size, PREFETCH_FOOTER_SIZE);
        assert_eq!(reader.content_length, 1024 * 1024);

        // An oversized hint is clamped down to the content length.
        let reader = AsyncReader::new(operator.reader(path).await.unwrap(), 1024 * 1024)
            .with_prefetch_footer_size(2048 * 1024);
        assert_eq!(reader.prefetch_footer_size, 1024 * 1024);
        assert_eq!(reader.content_length, 1024 * 1024);

        // An undersized hint is clamped up to FOOTER_SIZE (8 bytes).
        let reader = AsyncReader::new(operator.reader(path).await.unwrap(), 1024 * 1024)
            .with_prefetch_footer_size(1);
        assert_eq!(reader.prefetch_footer_size, 8);
        assert_eq!(reader.content_length, 1024 * 1024);
    }

    fn gen_fixed_string(size: usize) -> String {
        rand::thread_rng()
            .sample_iter(&Alphanumeric)
            .take(size)
            .map(char::from)
            .collect()
    }

    #[tokio::test]
    async fn test_async_reader() {
        let operator = Operator::new(services::Memory::default()).unwrap().finish();
        let path = "/path/to/file.parquet";

        let writer = AsyncWriter::new(
            operator
                .writer_with(path)
                .chunk(32 * 1024 * 1024)
                .concurrent(8)
                .await
                .unwrap(),
        );

        let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
        let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
        let mut writer = AsyncArrowWriter::try_new(writer, to_write.schema(), None).unwrap();
        writer.write(&to_write).await.unwrap();
        writer.close().await.unwrap();

        let reader = operator.reader(path).await.unwrap();
        let content_len = operator.stat(path).await.unwrap().content_length();
        let reader = AsyncReader::new(reader, content_len);
        let mut stream = ParquetRecordBatchStreamBuilder::new(reader)
            .await
            .unwrap()
            .build()
            .unwrap();
        let read = stream.next().await.unwrap().unwrap();
        assert_eq!(to_write, read);
    }
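
    /// One case for the large-metadata test: how much key-value metadata to
    /// write into the footer and, optionally, the footer prefetch size to use.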
    struct TestCase {
        metadata_size: usize,
        prefetch: Option<usize>,
    }

    #[tokio::test]
    async fn test_async_reader_with_large_metadata() {
        for case in [
            TestCase {
                metadata_size: 256 * 1024,
                prefetch: None,
            },
            TestCase {
                metadata_size: 1024 * 1024,
                prefetch: None,
            },
            TestCase {
                metadata_size: 256 * 1024,
                prefetch: Some(4),
            },
            TestCase {
                metadata_size: 1024 * 1024,
                prefetch: Some(4),
            },
        ] {
            let operator = Operator::new(services::Memory::default()).unwrap().finish();
            let path = "/path/to/file.parquet";

            let writer = AsyncWriter::new(
                operator
                    .writer_with(path)
                    .chunk(32 * 1024 * 1024)
                    .concurrent(8)
                    .await
                    .unwrap(),
            );

            let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
            let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
            // Inflate the footer with large key-value metadata so that metadata
            // loading exercises reads beyond the (possibly tiny) prefetch window.
            let mut writer = AsyncArrowWriter::try_new(
                writer,
                to_write.schema(),
                Some(
                    WriterProperties::builder()
                        .set_key_value_metadata(Some(vec![KeyValue {
                            key: "__metadata".to_string(),
                            value: Some(gen_fixed_string(case.metadata_size)),
                        }]))
                        .build(),
                ),
            )
            .unwrap();
            writer.write(&to_write).await.unwrap();
            writer.close().await.unwrap();

            let reader = operator.reader(path).await.unwrap();
            let content_len = operator.stat(path).await.unwrap().content_length();
            let mut reader = AsyncReader::new(reader, content_len);
            if let Some(footer_size) = case.prefetch {
                reader = reader.with_prefetch_footer_size(footer_size);
            }

            let mut stream = ParquetRecordBatchStreamBuilder::new(reader)
                .await
                .unwrap()
                .build()
                .unwrap();
            let read = stream.next().await.unwrap().unwrap();
            assert_eq!(to_write, read);
        }
    }
}