fix: avoid whole-file reads for Mosaic (#424)

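Bridge Mosaic's random-access InputFile to Paimon's FileRead range API so projection and row-group pruning no longer require eager full-file reads. The synchronous Mosaic reader now runs on a Tokio blocking task and dispatches each range read back onto the calling runtime, so reading Mosaic files requires an active Tokio runtime.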

Add a large-file regression test that verifies projected Mosaic reads do not issue a 0..file_size storage read, and a current-thread runtime test that verifies FileRead futures are polled on the runtime thread.
diff --git a/crates/paimon/src/arrow/format/mosaic.rs b/crates/paimon/src/arrow/format/mosaic.rs
index 23f9780..2ab9ae8 100644
--- a/crates/paimon/src/arrow/format/mosaic.rs
+++ b/crates/paimon/src/arrow/format/mosaic.rs
@@ -28,13 +28,15 @@
 use async_stream::try_stream;
 use async_trait::async_trait;
 use bytes::Bytes;
-use futures::StreamExt;
+use futures::{SinkExt, StreamExt};
 use paimon_mosaic_core::reader::{InputFile, MosaicReader, ReaderAccess};
 use paimon_mosaic_core::schema::MosaicSchema;
 use paimon_mosaic_core::stats::ColumnStats;
 use paimon_mosaic_core::values::Value as MosaicValue;
 use std::collections::{HashMap, HashSet};
 use std::io;
+use std::ops::Range;
+use std::sync::Arc;
 
 pub(crate) struct MosaicFormatReader;
 
@@ -51,107 +53,168 @@
         batch_size: Option<usize>,
         row_selection: Option<Vec<RowRange>>,
     ) -> crate::Result<ArrowRecordBatchStream> {
-        let file_bytes = reader.read(0..file_size).await?;
-        let mosaic_reader = MosaicReader::new(MemoryInputFile::new(file_bytes), file_size)
-            .map_err(mosaic_read_error)?;
-
-        let file_column_names = mosaic_reader
-            .schema()
-            .columns
-            .iter()
-            .map(|column| column.name.as_str())
-            .collect::<HashSet<_>>();
-        let existing_read_fields = read_fields
-            .iter()
-            .filter(|field| file_column_names.contains(field.name()))
-            .cloned()
-            .collect::<Vec<_>>();
-        let read_schema = build_target_arrow_schema(&existing_read_fields)?;
-        validate_mosaic_schema(&read_schema)?;
-        let projected_names = existing_read_fields
-            .iter()
-            .map(|field| field.name().to_string())
-            .collect::<Vec<_>>();
-        let all_projected_columns_missing = !read_fields.is_empty() && projected_names.is_empty();
+        let handle = tokio::runtime::Handle::try_current().map_err(|e| Error::UnexpectedError {
+            message: "Mosaic reader requires a Tokio runtime".to_string(),
+            source: Some(Box::new(e)),
+        })?;
+        let read_fields = read_fields.to_vec();
+        let predicates = predicates.map(|predicates| FilePredicates {
+            predicates: predicates.predicates.clone(),
+            file_fields: predicates.file_fields.clone(),
+        });
         let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);
-        let predicate_state = predicates.map(|predicates| {
-            let file_fields = predicates.file_fields.clone();
-            let file_column_indices =
-                build_file_column_indices(mosaic_reader.schema(), &file_fields);
-            (
-                predicates.predicates.clone(),
-                file_fields,
-                file_column_indices,
-            )
+
+        let (mut batch_tx, mut batch_rx) = futures::channel::mpsc::channel(1);
+        let read_task = tokio::task::spawn_blocking(move || {
+            let result = read_mosaic_batches_blocking(
+                MosaicReadRequest {
+                    reader,
+                    file_size,
+                    read_fields,
+                    predicates,
+                    batch_size,
+                    row_selection,
+                    handle,
+                },
+                |batch| futures::executor::block_on(batch_tx.send(Ok(batch))).is_ok(),
+            );
+            if let Err(error) = result {
+                let _ = futures::executor::block_on(batch_tx.send(Err(error)));
+            }
         });
 
         Ok(try_stream! {
-            let mut row_group_start = 0usize;
-            for row_group_index in 0..mosaic_reader.num_row_groups() {
-                let row_group_rows = mosaic_reader
-                    .row_group_num_rows(row_group_index)
-                    .map_err(mosaic_read_error)?;
-                let selected_slices = selected_slices_for_row_group(
-                    row_group_rows,
-                    row_group_start,
-                    row_selection.as_deref(),
-                )?;
-                row_group_start = row_group_start
-                    .checked_add(row_group_rows)
-                    .ok_or_else(|| Error::DataInvalid {
-                        message: "Mosaic row group row count overflow".to_string(),
-                        source: None,
-                    })?;
-
-                if let Some(slices) = selected_slices.as_ref() {
-                    if slices.is_empty() {
-                        continue;
-                    }
-                }
-
-                if let Some((predicates, file_fields, file_column_indices)) = &predicate_state {
-                    let row_group_stats = mosaic_reader
-                        .row_group_stats(row_group_index)
-                        .map_err(mosaic_read_error)?;
-                    if !row_group_may_match(
-                        row_group_rows,
-                        row_group_stats,
-                        file_column_indices,
-                        predicates,
-                        file_fields,
-                    )? {
-                        continue;
-                    }
-                }
-
-                let batch = if all_projected_columns_missing {
-                    let row_count = selected_slices
-                        .as_ref()
-                        .map_or(row_group_rows, |slices| selected_row_count(slices));
-                    empty_batch(read_schema.clone(), row_count)?
-                } else {
-                    let names = projected_names
-                        .iter()
-                        .map(String::as_str)
-                        .collect::<Vec<_>>();
-                    let mut row_group_reader = mosaic_reader
-                        .row_group_reader_by_names(row_group_index, &names)
-                        .map_err(mosaic_read_error)?;
-
-                    let batch = row_group_reader
-                        .read_columns()
-                        .map_err(mosaic_read_error)?;
-                    take_row_slices(batch, selected_slices.as_deref(), &read_schema)?
-                };
-                for chunk in split_batch(batch, batch_size) {
-                    yield chunk;
-                }
+            while let Some(batch) = batch_rx.next().await {
+                yield batch?;
             }
+            read_task.await.map_err(|e| Error::DataInvalid {
+                message: format!("Mosaic read task failed: {e}"),
+                source: None,
+            })?;
         }
         .boxed())
     }
 }
 
+struct MosaicReadRequest {
+    reader: Box<dyn FileRead>,
+    file_size: u64,
+    read_fields: Vec<DataField>,
+    predicates: Option<FilePredicates>,
+    batch_size: usize,
+    row_selection: Option<Vec<RowRange>>,
+    handle: tokio::runtime::Handle,
+}
+
+fn read_mosaic_batches_blocking(
+    request: MosaicReadRequest,
+    mut send_batch: impl FnMut(RecordBatch) -> bool,
+) -> crate::Result<()> {
+    let MosaicReadRequest {
+        reader,
+        file_size,
+        read_fields,
+        predicates,
+        batch_size,
+        row_selection,
+        handle,
+    } = request;
+    let mosaic_reader = MosaicReader::new(FileReadInputFile::new(reader, handle), file_size)
+        .map_err(mosaic_read_error)?;
+
+    let file_column_names = mosaic_reader
+        .schema()
+        .columns
+        .iter()
+        .map(|column| column.name.as_str())
+        .collect::<HashSet<_>>();
+    let existing_read_fields = read_fields
+        .iter()
+        .filter(|field| file_column_names.contains(field.name()))
+        .cloned()
+        .collect::<Vec<_>>();
+    let read_schema = build_target_arrow_schema(&existing_read_fields)?;
+    validate_mosaic_schema(&read_schema)?;
+    let projected_names = existing_read_fields
+        .iter()
+        .map(|field| field.name().to_string())
+        .collect::<Vec<_>>();
+    let all_projected_columns_missing = !read_fields.is_empty() && projected_names.is_empty();
+    let predicate_state = predicates.map(|predicates| {
+        let file_column_indices =
+            build_file_column_indices(mosaic_reader.schema(), &predicates.file_fields);
+        (
+            predicates.predicates,
+            predicates.file_fields,
+            file_column_indices,
+        )
+    });
+
+    let mut row_group_start = 0usize;
+    for row_group_index in 0..mosaic_reader.num_row_groups() {
+        let row_group_rows = mosaic_reader
+            .row_group_num_rows(row_group_index)
+            .map_err(mosaic_read_error)?;
+        let selected_slices = selected_slices_for_row_group(
+            row_group_rows,
+            row_group_start,
+            row_selection.as_deref(),
+        )?;
+        row_group_start =
+            row_group_start
+                .checked_add(row_group_rows)
+                .ok_or_else(|| Error::DataInvalid {
+                    message: "Mosaic row group row count overflow".to_string(),
+                    source: None,
+                })?;
+
+        if let Some(slices) = selected_slices.as_ref() {
+            if slices.is_empty() {
+                continue;
+            }
+        }
+
+        if let Some((predicates, file_fields, file_column_indices)) = &predicate_state {
+            let row_group_stats = mosaic_reader
+                .row_group_stats(row_group_index)
+                .map_err(mosaic_read_error)?;
+            if !row_group_may_match(
+                row_group_rows,
+                row_group_stats,
+                file_column_indices,
+                predicates,
+                file_fields,
+            )? {
+                continue;
+            }
+        }
+
+        let batch = if all_projected_columns_missing {
+            let row_count = selected_slices
+                .as_ref()
+                .map_or(row_group_rows, |slices| selected_row_count(slices));
+            empty_batch(read_schema.clone(), row_count)?
+        } else {
+            let names = projected_names
+                .iter()
+                .map(String::as_str)
+                .collect::<Vec<_>>();
+            let mut row_group_reader = mosaic_reader
+                .row_group_reader_by_names(row_group_index, &names)
+                .map_err(mosaic_read_error)?;
+
+            let batch = row_group_reader.read_columns().map_err(mosaic_read_error)?;
+            take_row_slices(batch, selected_slices.as_deref(), &read_schema)?
+        };
+        for chunk in split_batch(batch, batch_size) {
+            if !send_batch(chunk) {
+                return Ok(());
+            }
+        }
+    }
+    Ok(())
+}
+
 struct MosaicRowGroupStats<'a> {
     row_count: i64,
     row_group_stats: &'a [ColumnStats],
@@ -313,39 +376,65 @@
     )
 }
 
-#[derive(Clone)]
-struct MemoryInputFile {
-    data: Bytes,
+struct FileReadInputFile {
+    reader: Arc<dyn FileRead>,
+    handle: tokio::runtime::Handle,
 }
 
-impl MemoryInputFile {
-    fn new(data: Bytes) -> Self {
-        Self { data }
+impl FileReadInputFile {
+    fn new(reader: Box<dyn FileRead>, handle: tokio::runtime::Handle) -> Self {
+        Self {
+            reader: Arc::from(reader),
+            handle,
+        }
     }
 }
 
-impl InputFile for MemoryInputFile {
+impl InputFile for FileReadInputFile {
     fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
-        let offset = usize::try_from(offset).map_err(|_| {
+        let len = u64::try_from(buf.len()).map_err(|_| {
             io::Error::new(
                 io::ErrorKind::InvalidInput,
-                "mosaic read offset exceeds usize",
+                "mosaic read length exceeds u64",
             )
         })?;
-        let end = offset.checked_add(buf.len()).ok_or_else(|| {
+        let end = offset.checked_add(len).ok_or_else(|| {
             io::Error::new(io::ErrorKind::InvalidInput, "mosaic read range overflows")
         })?;
-        let src = self.data.get(offset..end).ok_or_else(|| {
-            io::Error::new(
+        let bytes = block_on_file_read(&self.reader, &self.handle, offset..end)?;
+        if bytes.len() != buf.len() {
+            return Err(io::Error::new(
                 io::ErrorKind::UnexpectedEof,
-                "mosaic read range exceeds file size",
-            )
-        })?;
-        buf.copy_from_slice(src);
+                format!(
+                    "mosaic read expected {} bytes, got {}",
+                    buf.len(),
+                    bytes.len()
+                ),
+            ));
+        }
+        buf.copy_from_slice(&bytes);
         Ok(())
     }
 }
 
+fn block_on_file_read(
+    reader: &Arc<dyn FileRead>,
+    handle: &tokio::runtime::Handle,
+    range: Range<u64>,
+) -> io::Result<Bytes> {
+    let reader = Arc::clone(reader);
+    let (tx, rx) = std::sync::mpsc::sync_channel(1);
+    handle.spawn(async move {
+        let result = reader
+            .read(range)
+            .await
+            .map_err(|e| io::Error::other(e.to_string()));
+        let _ = tx.send(result);
+    });
+    rx.recv()
+        .map_err(|_| io::Error::other("mosaic async read task was cancelled"))?
+}
+
 fn validate_mosaic_schema(schema: &SchemaRef) -> crate::Result<()> {
     for field in schema.fields() {
         validate_mosaic_arrow_type(field.data_type()).map_err(|message| Error::Unsupported {
@@ -550,7 +639,7 @@
     use paimon_mosaic_core::spec::COMPRESSION_NONE;
     use paimon_mosaic_core::writer::{MosaicWriter, OutputFile, WriterOptions};
     use std::ops::Range;
-    use std::sync::Arc;
+    use std::sync::{Arc, Mutex};
 
     struct TestFileRead {
         data: Bytes,
@@ -565,6 +654,42 @@
         }
     }
 
+    struct TrackingFileRead {
+        data: Bytes,
+        calls: Arc<Mutex<Vec<Range<u64>>>>,
+    }
+
+    #[async_trait]
+    impl FileRead for TrackingFileRead {
+        async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
+            self.calls.lock().unwrap().push(range.clone());
+            let start = usize::try_from(range.start).unwrap();
+            let end = usize::try_from(range.end).unwrap();
+            Ok(self.data.slice(start..end))
+        }
+    }
+
+    struct RuntimeThreadFileRead {
+        data: Bytes,
+        runtime_thread: std::thread::ThreadId,
+    }
+
+    #[async_trait]
+    impl FileRead for RuntimeThreadFileRead {
+        async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
+            if std::thread::current().id() != self.runtime_thread {
+                return Err(Error::UnexpectedError {
+                    message: "Mosaic FileRead was not polled on the runtime thread".to_string(),
+                    source: None,
+                });
+            }
+            tokio::time::sleep(std::time::Duration::from_millis(1)).await;
+            let start = usize::try_from(range.start).unwrap();
+            let end = usize::try_from(range.end).unwrap();
+            Ok(self.data.slice(start..end))
+        }
+    }
+
     struct MemOutputFile {
         data: Vec<u8>,
     }
@@ -634,6 +759,25 @@
         .unwrap()
     }
 
+    fn large_batch(row_count: i32) -> RecordBatch {
+        let ids = (0..row_count).collect::<Vec<_>>();
+        let names = ids
+            .iter()
+            .map(|id| format!("name-{id:05}-payload-for-range-read"))
+            .collect::<Vec<_>>();
+        let scores = ids.iter().map(|id| id * 10).collect::<Vec<_>>();
+
+        RecordBatch::try_new(
+            arrow_schema(),
+            vec![
+                Arc::new(Int32Array::from(ids)),
+                Arc::new(StringArray::from(names)),
+                Arc::new(Int32Array::from(scores)),
+            ],
+        )
+        .unwrap()
+    }
+
     fn batch(ids: Vec<i32>, names: Vec<&str>, scores: Vec<i32>) -> RecordBatch {
         RecordBatch::try_new(
             arrow_schema(),
@@ -826,6 +970,94 @@
     }
 
     #[tokio::test]
+    async fn test_mosaic_projection_uses_range_reads_for_large_file() {
+        let fields = data_fields();
+        let projected = vec![fields[0].clone()];
+        let row_count = 20_000;
+        let data = write_mosaic(&large_batch(row_count));
+        let file_size = data.len() as u64;
+        let calls = Arc::new(Mutex::new(Vec::new()));
+        assert!(file_size > 64 * 1024);
+
+        let batches = MosaicFormatReader
+            .read_batch_stream(
+                Box::new(TrackingFileRead {
+                    data,
+                    calls: Arc::clone(&calls),
+                }),
+                file_size,
+                &projected,
+                None,
+                None,
+                None,
+            )
+            .await
+            .unwrap()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+
+        let ids = collect_i32_column(&batches, 0);
+        assert_eq!(ids.len(), row_count as usize);
+        assert_eq!(ids.first(), Some(&0));
+        assert_eq!(ids.last(), Some(&(row_count - 1)));
+
+        let calls = calls.lock().unwrap().clone();
+        assert!(
+            !calls
+                .iter()
+                .any(|range| range.start == 0 && range.end == file_size),
+            "projection should not read the whole Mosaic file, got {calls:?}"
+        );
+        assert!(
+            calls
+                .iter()
+                .any(|range| range.end == file_size && range.start > 0),
+            "expected a tail metadata read, got {calls:?}"
+        );
+        assert!(
+            calls
+                .iter()
+                .all(|range| range.start < range.end && range.end <= file_size),
+            "invalid read ranges: {calls:?}"
+        );
+    }
+
+    #[test]
+    fn test_current_thread_runtime_dispatches_mosaic_reads_to_runtime() {
+        let data = write_mosaic(&sample_batch());
+        let file_size = data.len() as u64;
+        let fields = data_fields();
+        let runtime_thread = std::thread::current().id();
+        let runtime = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+
+        let batches = runtime.block_on(async move {
+            MosaicFormatReader
+                .read_batch_stream(
+                    Box::new(RuntimeThreadFileRead {
+                        data,
+                        runtime_thread,
+                    }),
+                    file_size,
+                    &fields,
+                    None,
+                    None,
+                    None,
+                )
+                .await
+                .unwrap()
+                .try_collect::<Vec<_>>()
+                .await
+                .unwrap()
+        });
+
+        assert_eq!(collect_i32_column(&batches, 0), vec![1, 2, 3, 4, 5]);
+    }
+
+    #[tokio::test]
     async fn test_read_empty_projection() {
         let data = write_mosaic(&sample_batch());
         let batches = read_batches(data, &[], None).await.unwrap();