fix: avoid whole-file reads for Mosaic (#424)
Bridge Mosaic's random-access InputFile to Paimon's FileRead range API so projection and row-group pruning no longer require eager full-file reads.
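Add a large-file regression test that verifies projected Mosaic reads do not issue a 0..file_size storage read, and a current-thread runtime test that verifies the FileRead futures are polled on the runtime thread.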
diff --git a/crates/paimon/src/arrow/format/mosaic.rs b/crates/paimon/src/arrow/format/mosaic.rs
index 23f9780..2ab9ae8 100644
--- a/crates/paimon/src/arrow/format/mosaic.rs
+++ b/crates/paimon/src/arrow/format/mosaic.rs
@@ -28,13 +28,15 @@
use async_stream::try_stream;
use async_trait::async_trait;
use bytes::Bytes;
-use futures::StreamExt;
+use futures::{SinkExt, StreamExt};
use paimon_mosaic_core::reader::{InputFile, MosaicReader, ReaderAccess};
use paimon_mosaic_core::schema::MosaicSchema;
use paimon_mosaic_core::stats::ColumnStats;
use paimon_mosaic_core::values::Value as MosaicValue;
use std::collections::{HashMap, HashSet};
use std::io;
+use std::ops::Range;
+use std::sync::Arc;
pub(crate) struct MosaicFormatReader;
@@ -51,107 +53,186 @@
batch_size: Option<usize>,
row_selection: Option<Vec<RowRange>>,
) -> crate::Result<ArrowRecordBatchStream> {
- let file_bytes = reader.read(0..file_size).await?;
- let mosaic_reader = MosaicReader::new(MemoryInputFile::new(file_bytes), file_size)
- .map_err(mosaic_read_error)?;
-
- let file_column_names = mosaic_reader
- .schema()
- .columns
- .iter()
- .map(|column| column.name.as_str())
- .collect::<HashSet<_>>();
- let existing_read_fields = read_fields
- .iter()
- .filter(|field| file_column_names.contains(field.name()))
- .cloned()
- .collect::<Vec<_>>();
- let read_schema = build_target_arrow_schema(&existing_read_fields)?;
- validate_mosaic_schema(&read_schema)?;
- let projected_names = existing_read_fields
- .iter()
- .map(|field| field.name().to_string())
- .collect::<Vec<_>>();
- let all_projected_columns_missing = !read_fields.is_empty() && projected_names.is_empty();
+ // Mosaic's `InputFile` is synchronous, so decoding runs on a blocking thread.
+ // Capture the runtime handle while still inside the runtime so that thread
+ // can dispatch the async `FileRead` range reads back onto it.
+ let handle = tokio::runtime::Handle::try_current().map_err(|e| Error::UnexpectedError {
+ message: "Mosaic reader requires a Tokio runtime".to_string(),
+ source: Some(Box::new(e)),
+ })?;
+ let read_fields = read_fields.to_vec();
+ let predicates = predicates.map(|predicates| FilePredicates {
+ predicates: predicates.predicates.clone(),
+ file_fields: predicates.file_fields.clone(),
+ });
let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
- let predicate_state = predicates.map(|predicates| {
- let file_fields = predicates.file_fields.clone();
- let file_column_indices =
- build_file_column_indices(mosaic_reader.schema(), &file_fields);
- (
- predicates.predicates.clone(),
- file_fields,
- file_column_indices,
- )
+
+ // A small bounded channel applies backpressure: the blocking decoder waits
+ // until the stream consumer has taken earlier batches.
+ let (mut batch_tx, mut batch_rx) = futures::channel::mpsc::channel(1);
+ let read_task = tokio::task::spawn_blocking(move || {
+ let result = read_mosaic_batches_blocking(
+ MosaicReadRequest {
+ reader,
+ file_size,
+ read_fields,
+ predicates,
+ batch_size,
+ row_selection,
+ handle,
+ },
+ // `send` fails once the stream (and with it the receiver) has been
+ // dropped; returning `false` tells the decoder to stop early.
+ |batch| futures::executor::block_on(batch_tx.send(Ok(batch))).is_ok(),
+ );
+ if let Err(error) = result {
+ let _ = futures::executor::block_on(batch_tx.send(Err(error)));
+ }
});
Ok(try_stream! {
- let mut row_group_start = 0usize;
- for row_group_index in 0..mosaic_reader.num_row_groups() {
- let row_group_rows = mosaic_reader
- .row_group_num_rows(row_group_index)
- .map_err(mosaic_read_error)?;
- let selected_slices = selected_slices_for_row_group(
- row_group_rows,
- row_group_start,
- row_selection.as_deref(),
- )?;
- row_group_start = row_group_start
- .checked_add(row_group_rows)
- .ok_or_else(|| Error::DataInvalid {
- message: "Mosaic row group row count overflow".to_string(),
- source: None,
- })?;
-
- if let Some(slices) = selected_slices.as_ref() {
- if slices.is_empty() {
- continue;
- }
- }
-
- if let Some((predicates, file_fields, file_column_indices)) = &predicate_state {
- let row_group_stats = mosaic_reader
- .row_group_stats(row_group_index)
- .map_err(mosaic_read_error)?;
- if !row_group_may_match(
- row_group_rows,
- row_group_stats,
- file_column_indices,
- predicates,
- file_fields,
- )? {
- continue;
- }
- }
-
- let batch = if all_projected_columns_missing {
- let row_count = selected_slices
- .as_ref()
- .map_or(row_group_rows, |slices| selected_row_count(slices));
- empty_batch(read_schema.clone(), row_count)?
- } else {
- let names = projected_names
- .iter()
- .map(String::as_str)
- .collect::<Vec<_>>();
- let mut row_group_reader = mosaic_reader
- .row_group_reader_by_names(row_group_index, &names)
- .map_err(mosaic_read_error)?;
-
- let batch = row_group_reader
- .read_columns()
- .map_err(mosaic_read_error)?;
- take_row_slices(batch, selected_slices.as_deref(), &read_schema)?
- };
- for chunk in split_batch(batch, batch_size) {
- yield chunk;
- }
+ while let Some(batch) = batch_rx.next().await {
+ yield batch?;
}
+ // The sender is dropped when the blocking task returns or panics; await the
+ // task so a panic surfaces as an error rather than a silently short stream.
+ read_task.await.map_err(|e| Error::UnexpectedError {
+ message: format!("Mosaic read task failed: {e}"),
+ source: Some(Box::new(e)),
+ })?;
}
.boxed())
}
}
+/// Everything the blocking Mosaic decode needs, moved onto the
+/// `spawn_blocking` thread by `read_batch_stream`.
+struct MosaicReadRequest {
+ reader: Box<dyn FileRead>,
+ file_size: u64,
+ read_fields: Vec<DataField>,
+ predicates: Option<FilePredicates>,
+ batch_size: usize,
+ row_selection: Option<Vec<RowRange>>,
+ handle: tokio::runtime::Handle,
+}
+
+/// Decodes the Mosaic file synchronously and hands each output batch to
+/// `send_batch`.
+///
+/// Row groups outside `row_selection`, or whose statistics rule out the
+/// predicates, are skipped. Stops early without error when `send_batch`
+/// returns `false`. Must run off the runtime's worker threads, because every
+/// `InputFile::read_at` blocks on a read dispatched through `handle`.
+fn read_mosaic_batches_blocking(
+ request: MosaicReadRequest,
+ mut send_batch: impl FnMut(RecordBatch) -> bool,
+) -> crate::Result<()> {
+ let MosaicReadRequest {
+ reader,
+ file_size,
+ read_fields,
+ predicates,
+ batch_size,
+ row_selection,
+ handle,
+ } = request;
+ let mosaic_reader = MosaicReader::new(FileReadInputFile::new(reader, handle), file_size)
+ .map_err(mosaic_read_error)?;
+
+ let file_column_names = mosaic_reader
+ .schema()
+ .columns
+ .iter()
+ .map(|column| column.name.as_str())
+ .collect::<HashSet<_>>();
+ let existing_read_fields = read_fields
+ .iter()
+ .filter(|field| file_column_names.contains(field.name()))
+ .cloned()
+ .collect::<Vec<_>>();
+ let read_schema = build_target_arrow_schema(&existing_read_fields)?;
+ validate_mosaic_schema(&read_schema)?;
+ let projected_names = existing_read_fields
+ .iter()
+ .map(|field| field.name().to_string())
+ .collect::<Vec<_>>();
+ let all_projected_columns_missing = !read_fields.is_empty() && projected_names.is_empty();
+ let predicate_state = predicates.map(|predicates| {
+ let file_column_indices =
+ build_file_column_indices(mosaic_reader.schema(), &predicates.file_fields);
+ (
+ predicates.predicates,
+ predicates.file_fields,
+ file_column_indices,
+ )
+ });
+
+ let mut row_group_start = 0usize;
+ for row_group_index in 0..mosaic_reader.num_row_groups() {
+ let row_group_rows = mosaic_reader
+ .row_group_num_rows(row_group_index)
+ .map_err(mosaic_read_error)?;
+ let selected_slices = selected_slices_for_row_group(
+ row_group_rows,
+ row_group_start,
+ row_selection.as_deref(),
+ )?;
+ row_group_start =
+ row_group_start
+ .checked_add(row_group_rows)
+ .ok_or_else(|| Error::DataInvalid {
+ message: "Mosaic row group row count overflow".to_string(),
+ source: None,
+ })?;
+
+ if let Some(slices) = selected_slices.as_ref() {
+ if slices.is_empty() {
+ continue;
+ }
+ }
+
+ if let Some((predicates, file_fields, file_column_indices)) = &predicate_state {
+ let row_group_stats = mosaic_reader
+ .row_group_stats(row_group_index)
+ .map_err(mosaic_read_error)?;
+ if !row_group_may_match(
+ row_group_rows,
+ row_group_stats,
+ file_column_indices,
+ predicates,
+ file_fields,
+ )? {
+ continue;
+ }
+ }
+
+ let batch = if all_projected_columns_missing {
+ let row_count = selected_slices
+ .as_ref()
+ .map_or(row_group_rows, |slices| selected_row_count(slices));
+ empty_batch(read_schema.clone(), row_count)?
+ } else {
+ let names = projected_names
+ .iter()
+ .map(String::as_str)
+ .collect::<Vec<_>>();
+ let mut row_group_reader = mosaic_reader
+ .row_group_reader_by_names(row_group_index, &names)
+ .map_err(mosaic_read_error)?;
+
+ let batch = row_group_reader.read_columns().map_err(mosaic_read_error)?;
+ take_row_slices(batch, selected_slices.as_deref(), &read_schema)?
+ };
+ for chunk in split_batch(batch, batch_size) {
+ if !send_batch(chunk) {
+ return Ok(());
+ }
+ }
+ }
+ Ok(())
+}
+
struct MosaicRowGroupStats<'a> {
row_count: i64,
row_group_stats: &'a [ColumnStats],
@@ -313,39 +376,81 @@
)
}
-#[derive(Clone)]
-struct MemoryInputFile {
- data: Bytes,
+/// Adapts a Paimon `FileRead` to Mosaic's synchronous `InputFile`, so the
+/// Mosaic reader fetches only the byte ranges it asks for instead of the
+/// whole file.
+struct FileReadInputFile {
+ reader: Arc<dyn FileRead>, // `Arc` so each read can hand a `'static` clone to the spawned task.
+ handle: tokio::runtime::Handle,
}
-impl MemoryInputFile {
- fn new(data: Bytes) -> Self {
- Self { data }
+impl FileReadInputFile {
+ fn new(reader: Box<dyn FileRead>, handle: tokio::runtime::Handle) -> Self {
+ Self {
+ reader: Arc::from(reader),
+ handle,
+ }
}
}
-impl InputFile for MemoryInputFile {
+impl InputFile for FileReadInputFile {
+ /// Fills `buf` with the bytes at `offset..offset + buf.len()`, failing with
+ /// `UnexpectedEof` if storage returns a different number of bytes.
fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
+ // Nothing to fetch: skip the storage round trip instead of issuing an
+ // empty range read.
+ if buf.is_empty() {
+ return Ok(());
+ }
- let offset = usize::try_from(offset).map_err(|_| {
+ let len = u64::try_from(buf.len()).map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidInput,
- "mosaic read offset exceeds usize",
+ "mosaic read length exceeds u64",
)
})?;
- let end = offset.checked_add(buf.len()).ok_or_else(|| {
+ let end = offset.checked_add(len).ok_or_else(|| {
io::Error::new(io::ErrorKind::InvalidInput, "mosaic read range overflows")
})?;
- let src = self.data.get(offset..end).ok_or_else(|| {
- io::Error::new(
+ let bytes = block_on_file_read(&self.reader, &self.handle, offset..end)?;
+ if bytes.len() != buf.len() {
+ return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
- "mosaic read range exceeds file size",
- )
- })?;
- buf.copy_from_slice(src);
+ format!(
+ "mosaic read expected {} bytes, got {}",
+ buf.len(),
+ bytes.len()
+ ),
+ ));
+ }
+ buf.copy_from_slice(&bytes);
Ok(())
}
}
+/// Runs `reader.read(range)` on the Tokio runtime behind `handle` and blocks
+/// the current thread until the read finishes.
+///
+/// The future is spawned onto the runtime instead of being driven here, so on
+/// a current-thread runtime it is polled by the thread driving that runtime.
+/// Callers must not be runtime worker threads (use `spawn_blocking`), since
+/// `recv` blocks the calling thread until the spawned read reports back.
+fn block_on_file_read(
+ reader: &Arc<dyn FileRead>,
+ handle: &tokio::runtime::Handle,
+ range: Range<u64>,
+) -> io::Result<Bytes> {
+ let reader = Arc::clone(reader);
+ let (tx, rx) = std::sync::mpsc::sync_channel(1);
+ handle.spawn(async move {
+ let result = reader
+ .read(range)
+ .await
+ .map_err(|e| io::Error::other(e.to_string()));
+ let _ = tx.send(result);
+ });
+ rx.recv()
+ .map_err(|_| io::Error::other("mosaic async read task was cancelled"))?
+}
+
fn validate_mosaic_schema(schema: &SchemaRef) -> crate::Result<()> {
for field in schema.fields() {
validate_mosaic_arrow_type(field.data_type()).map_err(|message| Error::Unsupported {
@@ -550,7 +639,7 @@
use paimon_mosaic_core::spec::COMPRESSION_NONE;
use paimon_mosaic_core::writer::{MosaicWriter, OutputFile, WriterOptions};
use std::ops::Range;
- use std::sync::Arc;
+ use std::sync::{Arc, Mutex};
struct TestFileRead {
data: Bytes,
@@ -565,6 +654,42 @@
}
}
+ struct TrackingFileRead { // Test FileRead that records every range it is asked to read.
+ data: Bytes,
+ calls: Arc<Mutex<Vec<Range<u64>>>>, // Shared with the test so the recorded ranges can be inspected afterwards.
+ }
+
+ #[async_trait]
+ impl FileRead for TrackingFileRead {
+ async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
+ self.calls.lock().unwrap().push(range.clone()); // Record before serving so every request is captured.
+ let start = usize::try_from(range.start).unwrap();
+ let end = usize::try_from(range.end).unwrap();
+ Ok(self.data.slice(start..end)) // `Bytes::slice` panics on an out-of-bounds range, failing the test.
+ }
+ }
+
+ struct RuntimeThreadFileRead { // Test FileRead that errors unless it is polled on `runtime_thread`.
+ data: Bytes,
+ runtime_thread: std::thread::ThreadId,
+ }
+
+ #[async_trait]
+ impl FileRead for RuntimeThreadFileRead {
+ async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
+ if std::thread::current().id() != self.runtime_thread { // Detects reads polled off the runtime thread.
+ return Err(Error::UnexpectedError {
+ message: "Mosaic FileRead was not polled on the runtime thread".to_string(),
+ source: None,
+ });
+ }
+ tokio::time::sleep(std::time::Duration::from_millis(1)).await; // A real suspension point that needs the runtime's timer driver.
+ let start = usize::try_from(range.start).unwrap();
+ let end = usize::try_from(range.end).unwrap();
+ Ok(self.data.slice(start..end))
+ }
+ }
+
struct MemOutputFile {
data: Vec<u8>,
}
@@ -634,6 +759,25 @@
.unwrap()
}
+ fn large_batch(row_count: i32) -> RecordBatch { // `row_count` rows of (id, long padded name, score = id * 10).
+ let ids = (0..row_count).collect::<Vec<_>>();
+ let names = ids
+ .iter()
+ .map(|id| format!("name-{id:05}-payload-for-range-read")) // Long strings inflate the file so whole-file reads stand out.
+ .collect::<Vec<_>>();
+ let scores = ids.iter().map(|id| id * 10).collect::<Vec<_>>();
+
+ RecordBatch::try_new(
+ arrow_schema(),
+ vec![
+ Arc::new(Int32Array::from(ids)),
+ Arc::new(StringArray::from(names)),
+ Arc::new(Int32Array::from(scores)),
+ ],
+ )
+ .unwrap()
+ }
+
fn batch(ids: Vec<i32>, names: Vec<&str>, scores: Vec<i32>) -> RecordBatch {
RecordBatch::try_new(
arrow_schema(),
@@ -826,6 +970,94 @@
}
#[tokio::test]
+ async fn test_mosaic_projection_uses_range_reads_for_large_file() { // A single-column projection of a large file must use targeted range reads.
+ let fields = data_fields();
+ let projected = vec![fields[0].clone()]; // Only the first column (read back as the ids asserted below).
+ let row_count = 20_000;
+ let data = write_mosaic(&large_batch(row_count));
+ let file_size = data.len() as u64;
+ let calls = Arc::new(Mutex::new(Vec::new())); // Ranges requested from storage, filled in by `TrackingFileRead`.
+ assert!(file_size > 64 * 1024); // Sanity check: the file must be large enough for the range checks to mean something.
+
+ let batches = MosaicFormatReader
+ .read_batch_stream(
+ Box::new(TrackingFileRead {
+ data,
+ calls: Arc::clone(&calls),
+ }),
+ file_size,
+ &projected,
+ None,
+ None,
+ None,
+ )
+ .await
+ .unwrap()
+ .try_collect::<Vec<_>>()
+ .await
+ .unwrap();
+
+ let ids = collect_i32_column(&batches, 0);
+ assert_eq!(ids.len(), row_count as usize);
+ assert_eq!(ids.first(), Some(&0));
+ assert_eq!(ids.last(), Some(&(row_count - 1)));
+
+ let calls = calls.lock().unwrap().clone(); // Snapshot the recorded ranges so the lock is not held by the asserts.
+ assert!(
+ !calls
+ .iter()
+ .any(|range| range.start == 0 && range.end == file_size), // No eager 0..file_size read.
+ "projection should not read the whole Mosaic file, got {calls:?}"
+ );
+ assert!(
+ calls
+ .iter()
+ .any(|range| range.end == file_size && range.start > 0), // Some read ends at EOF (the tail metadata).
+ "expected a tail metadata read, got {calls:?}"
+ );
+ assert!(
+ calls
+ .iter()
+ .all(|range| range.start < range.end && range.end <= file_size), // Every range is non-empty and in bounds.
+ "invalid read ranges: {calls:?}"
+ );
+ }
+
+ #[test]
+ fn test_current_thread_runtime_dispatches_mosaic_reads_to_runtime() { // Reads issued by the blocking decoder must be polled on the runtime's thread.
+ let data = write_mosaic(&sample_batch());
+ let file_size = data.len() as u64;
+ let fields = data_fields();
+ let runtime_thread = std::thread::current().id(); // `block_on` on a current-thread runtime drives it from this thread.
+ let runtime = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap();
+
+ let batches = runtime.block_on(async move {
+ MosaicFormatReader
+ .read_batch_stream(
+ Box::new(RuntimeThreadFileRead {
+ data,
+ runtime_thread,
+ }),
+ file_size,
+ &fields,
+ None,
+ None,
+ None,
+ )
+ .await
+ .unwrap()
+ .try_collect::<Vec<_>>()
+ .await
+ .unwrap()
+ });
+
+ assert_eq!(collect_i32_column(&batches, 0), vec![1, 2, 3, 4, 5]);
+ }
+
+ #[tokio::test]
async fn test_read_empty_projection() {
let data = write_mosaic(&sample_batch());
let batches = read_batches(data, &[], None).await.unwrap();